先说结论:

  由于mapreduce中没有办法直接控制map数量,所以只能曲线救国,通过设置每个map中处理的数据量进行设置;reduce是可以直接设置的。
控制map和reduce的参数

set mapred.max.split.size=256000000;        -- 决定每个map处理的最大的文件大小,单位为B
set mapred.min.split.size.per.node=1;         -- 节点中可以处理的最小的文件大小
set mapred.min.split.size.per.rack=1;         -- 机架中可以处理的最小的文件大小
方法1
set mapred.reduce.tasks=10;  -- 设置reduce的数量
方法2
set hive.exec.reducers.bytes.per.reducer=1073741824 -- 每个reduce处理的数据量,默认1GB

补充说明:一个集群可以有多个机架,一个机架有1至多个节点,这里的集群是mapreduce不是yarn,yarn没有详细了解过,另外如果想要实现map中的数据合并需要设置下面的参数,集群默认就是这个格式

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

需要确认的问题:
 a.我们该设置多少个map多少个reduce才合适?
  map数普遍是通过执行时长来确认的,至少应当保证每个map执行时长在1分钟以上,太短的话意味着大量重复的jvm启用和销毁。具体设置要根据具体任务来处理,有些任务占用cpu大,有些占用io大。
  我这边的话因为大任务经常搞了上千个map,作为小集群影响还是蛮大的,所以只对监控到的hql产生过大的map和reduce进行调整,经过一些简单测试,map数保证在三四百个其实不影响执行效率。

 b.设置了上面的参数会带来什么影响?
  设置map的话,影响不是很大,可能会带来更多的集群之间的io,毕竟要做节点之间的文件合并
  设置reduce的话,如果使用mapred.reduce.tasks,这个影响就大了,至少会造成同一个session每一个mr的job的reduce都是这么多个,而且reduce个数意味着最后的文件数量的输出,如果小文件太多的话可以打开reduce端的小文件合并参数,set hive.merge.mapredfiles=true
1、控制map数量的三个参数的逻辑概念

  可以简单的理解为集群对一个表分区下面的文件进行分发到各个节点,之后根据mapred.max.split.size确认要启动多少个map数,逻辑如下
  a.假设有两个文件大小分别为(256M,280M)被分配到节点A,那么会启动两个map,剩余的文件大小为10MB和35MB因为每个大小都不足241MB会先做保留
  b.根据参数set mapred.min.split.size.per.node看剩余的大小情况并进行合并,如果值为1,表示a中每个剩余文件都会自己起一个map,这里会起两个,如果设置为大于4510241024则会合并成一个块,并产生一个map
  如果mapred.min.split.size.per.node为1010241024,那么在这个节点上一共会有4个map,处理的大小为(245MB,245MB,10MB,10MB,10MB,10MB),余下9MB
  如果mapred.min.split.size.per.node为4510241024,那么会有三个map,处理的大小为(245MB,245MB,45MB)
  实际中mapred.min.split.size.per.node无法准确地设置成4510241024,会有剩余并保留带下一步进行判断处理
  c. 对b中余出来的文件与其它节点余出来的文件根据mapred.min.split.size.per.rack大小进行判断是否合并,对再次余出来的文件独自产生一个map处理
2、控制map数量的简单实用方式

  我们执行一个hive语句,发现起了1000个map,这种情况对于当前的数据量来说是完全不必要的,同时还会影响其它用户提交任务
  这个时候希望map减小到250个左右,很简单
  将map处理的最大的文件大小增大,256000000*4=1024000000
参数修改为如下

set mapred.max.split.size=1024000000;
set mapred.min.split.size.per.node=1024000000;
set mapred.min.split.size.per.rack=1024000000;

3、控制reduce参数

  修改reduce的个数就简单很多,直接根据可能的情况作个简单的判断确认需要的reduce数量,如果无法判断,根据当前map数减小10倍,保持在1~100个reduce即可(注意,这个不同的集群需求不同哈)
设置参数如下
set mapred.reduce.tasks=10
不建议随意设置reduce参数哈,可能调整参数更好一点
set hive.exec.reducers.bytes.per.reducer=1073741824
4、控制map和rduce数的说明

  1、因为set参数的设置是session级别的,Toad for Cloud(青蛙)第三方软件中暂时没有发现如何使set的参数设置有效的,所以请使用CRT等工具连接到linux的界面中进行使用,使用hive命令连接hive集群,一次设置只要不结束这次连接,那么参数是始终有效的。
  2、注意各参数的设置大小,不要冲突,否则会异常,大小顺序如下
mapred.max.split.size <= mapred.min.split.size.per.node <= mapred.min.split.size.per.rack
  3、这种减少map数的行为是否能带来更短的执行时间,需要具体分析,map数也不是越少越好,减少了map数,单个map处理的数据量就上升,需要更多的时间,同时也因为需要合并节点内、节点间甚至机架之间的数据需要更多的IO和带宽。
  参数的设置本质是根据文件情况、系统情况、数据计算情况进行的一个平衡考虑,有取有舍,我们需要遵循的规则就是:使大数据量利用合适的map数;使单个map任务处理合适的数据量
  4、reduce数同3


1、hive.merge.mapfiles,True时会合并map输出。
2、hive.merge.mapredfiles,True时会合并reduce输出。
3、hive.merge.size.per.task,合并操作后的单个文件大小。
4、hive.merge.size.smallfiles.avgsize,当输出文件平均大小小于设定值时,启动合并操作。这一设定只有当hive.merge.mapfiles或hive.merge.mapredfiles设定为true时,才会对相应的操作有效。
5、mapred.reduce.tasks=30; 设置Reduce Task个数
6、hive.exec.compress.output=’false’; 设置数据不作压缩,要是压缩了我们拿出来的文件就只能通过HIVE-JDBC来解析
7、mapred.map.tasks=1200;
8、hive.optimize.skewjoin=true;这个是给join优化的 0.6官方版本好像有个bug悲哀啊
9、hive.groupby.skewindata=true;这个是给groupby优化的

优化案例一:

使用的生产Hive环境的几个参数配置如下:

        dfs.block.size=268435456

        hive.merge.mapredfiles=true

        hive.merge.mapfiles=true

        hive.merge.size.per.task=256000000

        mapred.map.tasks=2 

因为合并小文件默认为true,而dfs.block.size与hive.merge.size.per.task的搭配使得合并后的绝大部分文件都在300MB左右。

CASE 1:

    现在我们假设有3个300MB大小的文件,那么goalsize = min(900MB/2,256MB) = 256MB (具体如何计算map数请参见http://blog.sina.com.cn/s/blog_6ff05a2c010178qd.html)

    所以整个JOB会有6个map,其中3个map分别处理256MB的数据,还有3个map分别处理44MB的数据。

    这时候木桶效应就来了,整个JOB的map阶段的执行时间不是看最短的1个map的执行时间,而是看最长的1个map的执行时间。所以,虽然有3个map分别只处理44MB的数据,可以很快跑完,但它们还是要等待另外3个处理256MB的map。显然,处理256MB的3个map拖了整个JOB的后腿。

CASE 2:

    如果我们把mapred.map.tasks设置成6,再来看一下有什么变化:

    goalsize = min(900MB/6,256MB) = 150MB

    整个JOB同样会分配6个map来处理,每个map处理150MB的数据,非常均匀,谁都不会拖后腿,最合理地分配了资源,执行时间大约为CASE 1的59%(150/256) 

    案例分析:

    虽然mapred.map.tasks从2调整到了6,但是CASE 2并没有比CASE 1多用map资源,同样都是使用6个map。而CASE 2的执行时间约为CASE 1执行时间的59%。

从这个案例可以看出,对mapred.map.tasks进行自动化的优化设置其实是可以很明显地提高作业执行效率的。

案例二(处理小文件):

最近仓库里面新建了一张分区表,数据量大约是12亿行,分区比较多,从2008年7月开始 一天一个分区。

配置了一个任务

对这个表进行group by 的时候 发现启动了2800多个maps .

执行的时间也高大10分钟。

然后我在hdfs文件里面看到 这个表的每个分区里面都有20多个小文件,每个文件都不大 300KB--1MB

 

之前的hive的参数:

hive.merge.mapfiles=true

hive.merge.mapredfiles=false

hive.merge.rcfile.block.level=true

hive.merge.size.per.task=256000000

hive.merge.smallfiles.avgsize=16000000

hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat

mapred.max.split.size=256000000

mapred.min.split.size=1

mapred.min.split.size.per.node=1

mapred.min.split.size.per.rack=1

 

hive.merge.mapredfiles 这个指的是 在Map-Reduce的任务结束时合并小文件

解决办法:

1.修改参数hive.merge.mapredfiles=true

2.通过map_reduece的办法生成一张新的表 此时生成的文件变成了每个分区一个文件

 

再次执行group by 发现效率得到了大大的提升。

小结:

正确处理hive小文件 是 控制map数的一个重要环节

处理的不好 会大大影响任务的执行效率