分区是hive存放数据的一种方式。将列值作为目录来存放数据,就是一个分区。这样查询时使用分区列进行过滤,只需根据列值直接扫描对应目录下的数据,不扫描其他不关心的分区,快速定位,提高查询效率。分动态和静态分区两种:
1. 静态分区:若分区的值是确定的,那么称为静态分区。新增分区或者是加载分区数据时,已经指定分区名。
create table if not exists day_part1(
uid int,
uname string
)
partitioned by(year int,month int)
row format delimited fields terminated by '\t'
;
##加载数据指定分区
load data local inpath '/root/Desktop/student.txt' into table day_part1 partition(year=2017,month=04);
##新增分区指定分区名
alter table day_part1 add partition(year=2017,month=1) partition(year=2016,month=12);
1. 动态分区:分区的值是非确定的,由输入数据来确定
2.1 动态分区的相关属性:
hive.exec.dynamic.partition=true :是否允许动态分区
hive.exec.dynamic.partition.mode=strict :分区模式设置
strict:最少需要有一个是静态分区
nostrict:可以全部是动态分区
hive.exec.max.dynamic.partitions=1000 :允许动态分区的最大数量
hive.exec.max.dynamic.partitions.pernode =100 :单个节点上的mapper/reducer允许创建的最大分区
2.2 动态分区的操作
##创建临时表
create table if not exists tmp(
uid int,
commentid bigint,
recommentid bigint,
year int,
month int,
day int
)
row format delimited fields terminated by '\t';
##加载数据
load data local inpath '/root/Desktop/comm' into table tmp;
##创建动态分区表
create table if not exists dyp1(
uid int,
commentid bigint,
recommentid bigint
)
partitioned by(year int,month int,day int)
row format delimited fields terminated by '\t'
;
##严格模式
insert into table dyp1 partition(year=2016,month,day)
select uid,commentid,recommentid,month,day from tmp;
##非严格模式
##设置非严格模式动态分区
set hive.exec.dynamic.partition.mode=nostrict;
##创建动态分区表
create table if not exists dyp2(
uid int,
commentid bigint,
recommentid bigint
)
partitioned by(year int,month int,day int)
row format delimited fields terminated by '\t';
##为非严格模式动态分区加载数据
insert into table dyp2 partition(year,month,day)
select uid,commentid,recommentid,year,month,day from tmp;
3.分区注意细节
(1)、尽量不要是用动态分区,因为动态分区的时候,将会为每一个分区分配reducer数量,当分区数量多的时候,reducer数量将会增加,对服务器是一种灾难。
(2)、动态分区和静态分区的区别,静态分区不管有没有数据都将会创建该分区,动态分区是有结果集将创建,否则不创建。
(3)、hive动态分区的严格模式和hive提供的hive.mapred.mode的严格模式。
hive提供我们一个严格模式:为了阻止用户不小心提交恶意hql
hive.mapred.mode=nostrict : strict
如果该模式值为strict,将会阻止以下三种查询:
(1)、对分区表查询,where中过滤字段不是分区字段。
(2)、笛卡尔积join查询,join查询语句,不带on条件 或者 where条件。
(3)、对order by查询,有order by的查询不带limit语句。
hive中创建分区表没有什么复杂的分区类型(范围分区、列表分区、hash分区、混合分区等)。分区列也不是表中的一个实际的字段,而是一个或者多个伪列。意思是说在表的数据文件中实际上并不保存分区列的信息与数据。
下面的语句创建了一个简单的分区表: create table partition_test (member_id string, name string ) partitioned by ( stat_date string, province string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','; 这个例子中创建了stat_date和province两个字段作为分区列。通常情况下需要先预先创建好分区,然后才能使用该分区,例如: alter table partition_test add partition(stat_date='20110728',province='zhejiang'); 这样就创建好了一个分区。这时我们会看到hive在HDFS存储中创建了一个相应的文件夹: $ hadoop fs -ls/user/hive/warehouse/partition_test/stat_date=20110728 Found 1 items drwxr-xr-x - admin supergroup 0 2011-07-29 09:53/user/hive/warehouse/partition_test/stat_date=20110728/province=zhejiang 每一个分区都会有一个独立的文件夹,下面是该分区所有的数据文件。在这个例子中stat_date是主层次,province是副层次,所有stat_date='20110728',而province不同的分区都会在/user/hive/warehouse/partition_test/stat_date=20110728下面,而stat_date不同的分区都会在/user/hive/warehouse/partition_test/下面,如: $ hadoop fs -ls /user/hive/warehouse/partition_test/ Found 2 items drwxr-xr-x - admin supergroup 0 2011-07-28 19:46/user/hive/warehouse/partition_test/stat_date=20110526 drwxr-xr-x - admin supergroup 0 2011-07-29 09:53/user/hive/warehouse/partition_test/stat_date=20110728 注意,因为分区列的值要转化为文件夹的存储路径,所以如果分区列的值中包含特殊值,如 '%', ':', '/','#',它将会被使用%加上2字节的ASCII码进行转义,如: hive> alter table partition_test add partition(stat_date='2011/07/28',province='zhejiang'); OK Time taken: 4.644 seconds $hadoop fs -ls /user/hive/warehouse/partition_test/ Found 3 items drwxr-xr-x - admin supergroup 0 2011-07-29 10:06/user/hive/warehouse/partition_test/stat_date=2011% 2F07%2F28 drwxr-xr-x - admin supergroup 0 2011-07-28 19:46/user/hive/warehouse/partition_test/stat_date=20110526 drwxr-xr-x - admin supergroup 0 2011-07-29 09:53/user/hive/warehouse/partition_test/stat_date=20110728 我使用一个辅助的非分区表partition_test_input准备向partition_test中插入数据: hive> desc partition_test_input; OK stat_date string member_id string name string province string hive> select * from partition_test_input; OK 20110526 1 liujiannan liaoning 20110526 2 wangchaoqun hubei 20110728 3 xuhongxing sichuan 20110728 4 zhudaoyong henan 20110728 5 zhouchengyu heilongjiang 然后我向partition_test的分区中插入数据: hive> insert overwrite table partition_testpartition(stat_date='20110728',province='henan') selectmember_id,name from partition_test_input where stat_date='20110728'and province='henan'; Total MapReduce jobs = 2 ... 1 Rows loaded to partition_test OK 还可以同时向多个分区插入数据,0.7版本以后不存在的分区会自动创建,0.6之前的版本官方文档上说必须要预先创建好分区: hive> > from partition_test_input > insert overwrite table partition_test partition(stat_date='20110526',province='liaoning') > select member_id,name where stat_date='20110526'and province='liaoning' > insert overwrite table partition_test partition(stat_date='20110728',province='sichuan') > select member_id,name where stat_date='20110728'and province='sichuan' > insert overwrite table partition_test partition(stat_date='20110728',province='heilongjiang') > select member_id,name where stat_date='20110728'and province='heilongjiang'; Total MapReduce jobs = 4 ... 3 Rows loaded to partition_test OK 特别要注意,在其他数据库中,一般向分区表中插入数据时系统会校验数据是否符合该分区,如果不符合会报错。而在hive中,向某个分区中插入什么样的数据完全是由人来控制的,因为分区键是伪列,不实际存储在文件中,如: hive> insert overwrite table partition_testpartition(stat_date='20110527',province='liaoning') selectmember_id,name from partition_test_input; Total MapReduce jobs = 2 ... 5 Rows loaded to partition_test OK hive> select * from partition_test wherestat_date='20110527' and province='liaoning'; OK 1 liujiannan 20110527 liaoning 2 wangchaoqun 20110527 liaoning 3 xuhongxing 20110527 liaoning 4 zhudaoyong 20110527 liaoning 5 zhouchengyu 20110527 liaoning 可以看到在partition_test_input中的5条数据有着不同的stat_date和province,但是在插入到partition(stat_date='20110527',province='liaoning')这个分区后,5条数据的stat_date和province都变成相同的了,因为这两列的数据是根据文件夹的名字读取来的,而不是实际从数据文件中读取来的: $ hadoop fs -cat/user/hive/warehouse/partition_test/stat_date=20110527/province=liaoning/000000_0 1,liujiannan 2,wangchaoqun 3,xuhongxing 4,zhudaoyong 5,zhouchengyu 下面介绍一下动态分区,因为按照上面的方法向分区表中插入数据,如果源数据量很大,那么针对一个分区就要写一个insert,非常麻烦。况且在之前的版本中,必须先手动创建好所有的分区后才能插入,这就更麻烦了,你必须先要知道源数据中都有什么样的数据才能创建分区。 使用动态分区可以很好的解决上述问题。动态分区可以根据查询得到的数据自动匹配到相应的分区中去。 使用动态分区要先设置hive.exec.dynamic.partition参数值为true,默认值为false,即不允许使用: hive> set hive.exec.dynamic.partition; hive.exec.dynamic.partition=false hive> set hive.exec.dynamic.partition=true; hive> set hive.exec.dynamic.partition; hive.exec.dynamic.partition=true 动态分区的使用方法很简单,假设我想向stat_date='20110728'这个分区下面插入数据,至于province插入到哪个子分区下面让数据库自己来判断,那可以这样写: hive> insert overwrite table partition_testpartition(stat_date='20110728',province) > select member_id,name,province frompartition_test_input where stat_date='20110728'; Total MapReduce jobs = 2 ... 3 Rows loaded to partition_test OK stat_date叫做静态分区列,province叫做动态分区列。select子句中需要把动态分区列按照分区的顺序写出来,静态分区列不用写出来。这样stat_date='20110728'的所有数据,会根据province的不同分别插入到/user/hive/warehouse/partition_test/stat_date=20110728/下面的不同的子文件夹下,如果源数据对应的province子分区不存在,则会自动创建,非常方便,而且避免了人工控制插入数据与分区的映射关系存在的潜在风险。 注意,动态分区不允许主分区采用动态列而副分区采用静态列,这样将导致所有的主分区都要创建副分区静态列所定义的分区: hive> insert overwrite table partition_testpartition(stat_date,province='liaoning') > select member_id,name,province frompartition_test_input where province='liaoning'; FAILED: Error in semantic analysis: Line 1:48 Dynamic partitioncannot be the parent of a static partition 'liaoning' 动态分区可以允许所有的分区列都是动态分区列,但是要首先设置一个参数hive.exec.dynamic.partition.mode: hive> set hive.exec.dynamic.partition.mode; hive.exec.dynamic.partition.mode=strict 它的默认值是strick,即不允许分区列全部是动态的,这是为了防止用户有可能原意是只在子分区内进行动态建分区,但是由于疏忽忘记为主分区列指定值了,这将导致一个dml语句在短时间内创建大量的新的分区(对应大量新的文件夹),对系统性能带来影响。 所以我们要设置: hive> sethive.exec.dynamic.partition.mode=nostrick; 再介绍3个参数: hive.exec.max.dynamic.partitions.pernode (缺省值100):每一个mapreducejob允许创建的分区的最大数量,如果超过了这个数量就会报错 hive.exec.max.dynamic.partitions(缺省值1000):一个dml语句允许创建的所有分区的最大数量 hive.exec.max.created.files (缺省值100000):所有的mapreducejob允许创建的文件的最大数量 当源表数据量很大时,单独一个mapreducejob中生成的数据在分区列上可能很分散,举个简单的例子,比如下面的表要用3个map: 1 1 1 2 2 2 3 3 3 如果数据这样分布,那每个mapreduce只需要创建1个分区就可以了: |1 map1 --> |1 |1 |2 map2 --> |2 |2 |3 map3 --> |3 |3 但是如果数据按下面这样分布,那第一个mapreduce就要创建3个分区: |1 map1 --> |2 |3 |1 map2 --> |2 |3 |1 map3 --> |2 |3 下面给出了一个报错的例子: hive> sethive.exec.max.dynamic.partitions.pernode=4; hive> insert overwrite table partition_testpartition(stat_date,province) > select member_id,name,stat_date,province frompartition_test_input distribute by stat_date,province; Total MapReduce jobs = 1 ... [Fatal Error] Operator FS_4 (id=4): Number of dynamic partitionsexceeded hive.exec.max.dynamic.partitions.pernode.. Killing thejob. Ended Job = job_201107251641_0083 with errors FAILED: Execution Error, return code 2 fromorg.apache.hadoop.hive.ql.exec.MapRedTask 为了让分区列的值相同的数据尽量在同一个mapreduce中,这样每一个mapreduce可以尽量少的产生新的文件夹,可以借助distributeby的功能,将分区列值相同的数据放到一起: hive> insert overwrite table partition_testpartition(stat_date,province) > select member_id,name,stat_date,province frompartition_test_input distribute by stat_date,province; Total MapReduce jobs = 1 ... 18 Rows loaded to partition_test OK 好了,关于hive的分区表先简单介绍到这里,后续版本如果有功能的更新我也会再更新。
为何分区分桶
我们知道传统的DBMS系统一般都具有表分区的功能,通过表分区能够在特定的区域检索数据,减少扫描成本,在一定程度上提高查询效率,当然我们还可以通过进一步在分区上建立索引进一步提升查询效率。在此就不赘述了。在Hive数仓中也有分区分桶的概念,在逻辑上分区表与未分区表没有区别,在物理上分区表会将数据按照分区键的列值存储在表目录的子目录中,目录名=“分区键=键值”。其中需要注意的是分区键的值不一定要基于表的某一列(字段),它可以指定任意值,只要查询的时候指定相应的分区键来查询即可。我们可以对分区进行添加、删除、重命名、清空等操作。因为分区在特定的区域(子目录)下检索数据,它作用同DNMS分区一样,都是为了减少扫描成本。
分桶则是指定分桶表的某一列,让该列数据按照哈希取模的方式随机、均匀地分发到各个桶文件中。因为分桶操作需要根据某一列具体数据来进行哈希取模操作,故指定的分桶列必须基于表中的某一列(字段)。因为分桶改变了数据的存储方式,它会把哈希取模相同或者在某一区间的数据行放在同一个桶文件中。如此一来便可提高查询效率,如:我们要对两张在同一列上进行了分桶操作的表进行JOIN操作的时候,只需要对保存相同列值的桶进行JOIN操作即可。同时分桶也能让取样(Sampling)更高效。
分区
Hive(Inceptor)分区又分为单值分区、范围分区。单值分区又分为静态分区和动态分区。我们先看下分区长啥样。如下,假如有一张表名为persionrank表,记录每个人的评级,有id、name、score字段。我们便可以创建分区rank(注意rank不是表中的列,我们可以把它当做虚拟列),并将相应数据导入指定分区(将数据插入指定目录)。单值分区
单值分区根据插入时是否需要手动指定分区可以分为:单值静态分区:导入数据时需要手动指定分区。单值动态分区:导入数据时,系统可以动态判断目标分区。单值分区表的建表方式有两种:直接定义列和 CREATE TABLE LIKE。注意,单值分区表不能用 CREATE
TABLE AS SELECT 建表。而范围分区表只能通过直接定义列来建表。1、静态分区创建
直接在 PARTITIONED BY 后面跟上分区键、类型即可。(分区键不能和任何列重名)
CREATE [EXTERNAL] TABLE <table_name>
(<col_name> <data_type> [, <col_name> <data_type> ...]) -- 指定分区键和数据类型 PARTITIONED BY (<partition_key> <data_type>, ...) [CLUSTERED BY ...] [ROW FORMAT <row_format>] [STORED AS TEXTFILE|ORC|CSVFILE] [LOCATION '<file_path>'] [TBLPROPERTIES ('<property_name>'='<property_value>', ...)]; 2、静态分区写入-- 覆盖写入
INSERT OVERWRITE TABLE <table_name> PARTITION (<partition_key>=<partition_value>[, <partition_key>=<partition_value>, ...]) SELECT <select_statement>; -- 追加写入 INSERT INTO TABLE <table_name> PARTITION (<partition_key>=<partition_value>[, <partition_key>=<partition_value>, ...]) SELECT <select_statement>; 3、动态分区创建创建方式与静态分区表完全一样,一张表可同时被静态和动态分区键分区,只是动态分区键需要放在静态分区建的后面(因为HDFS上的动态分区目录下不能包含静态分区的子目录),如下 spk 即 static partition key, dpk 即 dynamic partition key。
CREATE TABLE <table_name>
PARTITIONED BY ([<spk> <data_type>, ... ,] <dpk> <data_type>, [<dpk> <data_type>,...]); -- ...略 4、动态分区写入静态分区键要用 <spk>=<value> 指定分区值;动态分区只需要给出分出分区键名称 <dpk>。
-- 开启动态分区支持,并设置最大分区数
set hive.exec.dynamic.partition=true; set hive.exec.max.dynamic.partitions=2000; -- <dpk>为动态分区键, <spk>为静态分区键 INSERT (OVERWRITE | INTO) TABLE <table_name> PARTITION ([<spk>=<value>, ..., ] <dpk>, [..., <dpk>]) SELECT <select_statement>; 范围分区 单值分区每个分区对应于分区键的一个取值,而每个范围分区则对应分区键的一个区间,只要落在指定区间内的记录都被存储在对应的分区下。分区范围需要手动指定,分区的范围为前闭后开区间 [最小值, 最大值)。最后出现的分区可以使用 MAXVALUE 作为上限,MAXVALUE 代表该分区键的数据类型所允许的最大 值。CREATE [EXTERNAL] TABLE <table_name>
(<col_name> <data_type>, <col_name> <data_type>, ...) PARTITIONED BY RANGE (<partition_key> <data_type>, ...) (PARTITION [<partition_name>] VALUES LESS THAN (<cutoff>), [PARTITION [<partition_name>] VALUES LESS THAN (<cutoff>), ... ] PARTITION [<partition_name>] VALUES LESS THAN (<cutoff>|MAXVALUE) ) [ROW FORMAT <row_format>] [STORED AS TEXTFILE|ORC|CSVFILE] [LOCATION '<file_path>'] [TBLPROPERTIES ('<property_name>'='<property_value>', ...)]; eg:多个范围分区键的情况:DROP TABLE IF EXISTS test_demo;
CREATE TABLE test_demo (value INT) PARTITIONED BY RANGE (id1 INT, id2 INT, id3 INT) ( -- id1在(--∞,5]之间,id2在(-∞,105]之间,id3在(-∞,205]之间 PARTITION p5_105_205 VALUES LESS THAN (5, 105, 205), -- id1在(--∞,5]之间,id2在(-∞,105]之间,id3在(205,215]之间 PARTITION p5_105_215 VALUES LESS THAN (5, 105, 215), PARTITION p5_115_max VALUES LESS THAN (5, 115, MAXVALUE), PARTITION p10_115_205 VALUES LESS THAN (10, 115, 205), PARTITION p10_115_215 VALUES LESS THAN (10, 115, 215), PARTITION pall_max values less than (MAXVALUE, MAXVALUE, MAXVALUE) );分桶
说完分区,我们来继续搞分桶。对Hive(Inceptor)表分桶可以将表中记录按分桶键的哈希值分散进多个文件中,这些小文件称为桶。创建分桶表
我们先看一下创建分桶表的创建,分桶表的建表有三种方式:直接建表,CREATE TABLE LIKE 和 CREATE TABLE AS SELECT ,单值分区表不能用 CREATETABLE AS SELECT 建表。这里以直接建表为例:CREATE [EXTERNAL] TABLE <table_name>
(<col_name> <data_type> [, <col_name> <data_type> ...])] [PARTITIONED BY ...] CLUSTERED BY (<col_name>) [SORTED BY (<col_name> [ASC|DESC] [, <col_name> [ASC|DESC]...])] INTO <num_buckets> BUCKETS [ROW FORMAT <row_format>] [STORED AS TEXTFILE|ORC|CSVFILE] [LOCATION '<file_path>'] [TBLPROPERTIES ('<property_name>'='<property_value>', ...)]; 分桶键只能有一个即<col_name>。表可以同时分区和分桶,当表分区时,每个分区下都会有<num_buckets> 个桶。我们也可以选择使用 SORTED BY … 在桶内排序,排序键和分桶键无需相同。ASC 为升序选项,DESC 为降序选项,默认排序方式是升序。<num_buckets> 指定分桶个数,也就是表目录下小文件的个数。向分桶表写入数据
因为分桶表在创建的时候只会定义Scheme,且写入数据的时候不会自动进行分桶、排序,需要人工先进行分桶、排序后再写入数据。确保目标表中的数据和它定义的分布一致。目前有两种方式往分桶表中插入数据:
方法一:打开enforce bucketing开关。
SET hive.enforce.bucketing=true; ①
INSERT (INTO|OVERWRITE) TABLE <bucketed_table> SELECT <select_statement> [SORT BY <sort_key> [ASC|DESC], [<sort_key> [ASC|DESC], ...]]; ② 方法二:将reducer个数设置为目标表的桶数,并在 SELECT 语句中用 DISTRIBUTE BY <bucket_key>对查询结果按目标表的分桶键分进reducer中。SET mapred.reduce.tasks = <num_buckets>;
INSERT (INTO|OVERWRITE) TABLE <bucketed_table> SELECT <select_statement> DISTRIBUTE BY <bucket_key>, [<bucket_key>, ...] [SORT BY <sort_key> [ASC|DESC], [<sort_key> [ASC|DESC], ...]]; 如果分桶表创建时定义了排序键,那么数据不仅要分桶,还要排序 如果分桶键和排序键不同,且按降序排列,使用Distribute by … Sort by分桶排序 如果分桶键和排序键相同,且按升序排列(默认),使用 Cluster by 分桶排序,即如下: SET mapred.reduce.tasks = <num_buckets>; INSERT (INTO|OVERWRITE) TABLE <bucketed_table> SELECT <select_statement> CLUSTER BY <bucket_sort_key>, [<bucket_sort_key>, ...];另外补充说明一下,在Hive(Inceptor)中,ORC事务表必须进行分桶(为了提高效率)。每个桶的文件大小应在100~200MB之间(ORC表压缩后的数据)。通常做法是先分区后分桶。
---------------------使用Hive SQL插入动态分区的Parquet表OOM异常分析
温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。 Fayson的github: 提示:代码块部分可以左右滑动查看噢
1.异常描述
当运行“INSERT ... SELECT”语句向Parquet或者ORC格式的表中插入数据时,如果启用了动态分区,你可能会碰到以下错误,而导致作业无法正常执行。
Hive客户端:
-
Task with the most failures(4):
-
Diagnostic Messages for this Task:
-
Error: GC overhead limit exceeded
-
...
-
FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
-
MapReduce Jobs Launched:
-
Stage-Stage-1: Map: 1 HDFS Read: 0 HDFS Write: 0 FAIL
-
Total MapReduce CPU Time Spent: 0 msec
(可左右滑动)
YARN的8088中查看具体map task报错:
2017-10-27 17:08:04,317 FATAL [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.OutOfMemoryError: GC overhead limit exceeded
(可左右滑动)
2.异常分析
Parquet和ORC是列式批处理文件格式。这些格式要求在写入文件之前将批次的行(batches of rows)缓存在内存中。在执行INSERT语句时,动态分区目前的实现是:至少为每个动态分区目录打开一个文件写入器(file writer)。由于这些缓冲区是按分区维护的,因此在运行时所需的内存量随着分区数量的增加而增加。所以经常会导致mappers或reducers的OOM,具体取决于打开的文件写入器(file writer)的数量。
通过INSERT语句插入数据到动态分区表中,也可能会超过HDFS同时打开文件数的限制。
如果没有join或聚合,INSERT ... SELECT语句会被转换为只有map任务的作业。mapper任务会读取输入记录然后将它们发送到目标分区目录。在这种情况下,每个mapper必须为遇到的每个动态分区创建一个新的文件写入器(file writer)。mapper在运行时所需的内存量随着它遇到的分区数量的增加而增加。
3.异常重现与解决
3.1.生成动态分区的几个参数说明
hive.exec.dynamic.partition
默认值:false
是否开启动态分区功能,默认false关闭。
使用动态分区时候,该参数必须设置成true;
hive.exec.dynamic.partition.mode
默认值:strict
动态分区的模式,默认strict,表示必须指定至少一个分区为静态分区,nonstrict模式表示允许所有的分区字段都可以使用动态分区。
一般需要设置为nonstrict
hive.exec.max.dynamic.partitions.pernode
默认值:100
在每个执行MR的节点上,最大可以创建多少个动态分区。
该参数需要根据实际的数据来设定。
比如:源数据中包含了一年的数据,即day字段有365个值,那么该参数就需要设置成大于365,如果使用默认值100,则会报错。
hive.exec.max.dynamic.partitions
默认值:1000
在所有执行MR的节点上,最大一共可以创建多少个动态分区。
同上参数解释。
hive.exec.max.created.files
默认值:100000
整个MR Job中,最大可以创建多少个HDFS文件。
一般默认值足够了,除非你的数据量非常大,需要创建的文件数大于100000,可根据实际情况加以调整。
mapreduce.map.memory.mb
map任务的物理内存分配值,常见设置为1GB,2GB,4GB等。
mapreduce.map.java.opts
map任务的Java堆栈大小设置,一般设置为小于等于上面那个值的75%,这样可以保证map任务有足够的堆栈外内存空间。
mapreduce.input.fileinputformat.split.maxsize
mapreduce.input.fileinputformat.split.minsize
这个两个参数联合起来用,主要是为了方便控制mapreduce的map数量。比如我设置为1073741824,就是为了让每个map处理1GB的文件。
3.2.一个例子
Fayson在前两天给人调一个使用Hive SQL插入动态分区的Parquet表时,总是报错OOM,也是折腾了很久。以下我们来看看整个过程。
1.首先我们看看执行脚本的内容,基本其实就是使用Hive的insert语句将文本数据表插入到另外一张parquet表中,当然使用了动态分区。
2.我们看看原始数据文件,是文本文件,一共120个,每个30GB大小,总共差不多3.6TB。
3.我们看看报错
4.因为是一个只有map的mapreduce任务,当我们从YARN的8088观察这个作业时可以发现,基本没有一个map能够执行成功,全部都是失败的。报上面的错误。
5.把mapreduce.map.memory.mb从2GB增大到4GB,8GB,16GB,相应mapreduce.map.java.opts增大到3GB,6GB,12GB。依旧报错OOM。
6.后面又将mapreduce.input.fileinputformat.split.maxsize从1GB,减少为512MB,256MB,从而增大map数量,缩小单个map处理文件的大小。依旧报错OOM。
7.最后启用hive.optimize.sort.dynamic.partition,增加reduce过程,作业执行成功。
8.最后查看结果文件大约1.2TB,约为输入文件的三分之一。一共1557个分区,最大的分区文件为2GB。
4.异常总结
对于这个异常,我们建议有以下三种方式来处理:
1.启用hive.optimize.sort.dynamic.partition,将其设置为true。通过这个优化,这个只有map任务的mapreduce会引入reduce过程,这样动态分区的那个字段比如日期在传到reducer时会被排序。由于分区字段是排序的,因此每个reducer只需要保持一个文件写入器(file writer)随时处于打开状态,在收到来自特定分区的所有行后,关闭记录写入器(record writer),从而减小内存压力。这种优化方式在写parquet文件时使用的内存要相对少一些,但代价是要对分区字段进行排序。
-
SET hive.optimize.sort.dynamic.partition=true;
-
INSERT OVERWRITE TABLE [table] SELECT ...
2.第二种方式就是增加每个mapper的内存分配,即增大mapreduce.map.memory.mb和mapreduce.map.java.opts,这样所有文件写入器(filewriter)缓冲区对应的内存会更充沛。
3.将查询分解为几个较小的查询,以减少每个查询创建的分区数量。这样可以让每个mapper打开较少的文件写入器(file writer)。
备注:
默认情况下,Hive为每个打开的Parquet文件缓冲区(file buffer)分配128MB。这个buffer大小由参数parquet.block.size控制。为获得最佳性能,parquet的buffer size需要与HDFS的block size保持对齐(比如相等),从而使每个parquet文件在单个HDFS的块中,以便每个I/O请求都可以读取整个数据文件,而无需通过网络传输访问后续的block。
-
-- set Parquetbuffer size to 256MB (in bytes)
-
set parquet.block.size=268435456