【大数据干货】Hive中的 UDTF用户定义表函数使用指南

时间:2018-07-02 20:18:27   来源:上海尚学堂   阅读:

在这篇文章中,我们将深入了解用户定义表函数(UDTF),该函数的实现是通过继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF这个抽象通用类,UDTF相对UDF更为复杂,但是通过它,我们读入一个数据域,输出多行多列,而UDF只能输出单行单列。

 

1、代码

文章中所有的代码可以在这里找到:hive examplesGitHub repository

 

2、示例数据

首先先创建一张包含示例数据的表:people,该表只有name一列,该列中包含了一个或多个名字,该表数据保存在people.txt文件中。
~$ cat ./people.txt

John Smith
John and Ann White
Ted Green
Dorothy
把该文件上载到hdfs目录/user/matthew/people中:
hadoop fs -mkdir people
hadoop fs -put ./people.txt people

下面要创建hive外部表,在hive shell中执行

  1.  
    CREATE EXTERNAL TABLE people (name string)
  2.  
    ROW FORMAT DELIMITED FIELDS
  3.  
    TERMINATED BY '\t'
  4.  
    ESCAPED BY ''
  5.  
    LINES TERMINATED BY '\n'
  6.  
    STORED AS TEXTFILE
  7.  
    LOCATION '/user/matthew/people';

3、UDTF的输出值

上一文章讲解的UDF与GenericUDF函数是操作单个数据域。它们必须要返回一个值。但是这并不适用于所用的数据处理任务。Hive可以存储许多类型的数据,而有时候我们并不想单数据域输入、单数据域输出。对于每一行的输入,可能我们想输出多行,又或是不输出,举个例子,想一下函数explode(一个hive内置函数)的作用。
同样,可能我们也想输出多列,而不是输出单列。
以上所有的要求我们可以用UDTF去完成。
 

4、实例

首先我们先假设我们想清洗people这张表中的人名,这个新的表有:
1、姓和名 两个分开的列
2、所有记录都包含姓名
3、每条记录或有包含多个人名(eg Nick and Nicole Smith)
为了达到这个实例目的,我们将实现以下API:
org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
我们将覆盖以下三个方法:

  1.  
    //该方法中,我们将指定输入输出参数:输入参数的ObjectInspector与输出参数的StructObjectInspector
  2.  
    abstract StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException;
  3.  
     
  4.  
    //我们将处理一条输入记录,输出若干条结果记录
  5.  
    abstract void process(Object[] record) throws HiveException;
  6.  
     
  7.  
    //当没有记录处理的时候该方法会被调用,用来清理代码或者产生额外的输出
  8.  
    abstract void close() throws HiveException;

5、代码实现

 

5.1 完整代码


  1.  
    public class NameParserGenericUDTF extends GenericUDTF {
  2.  
     
  3.  
    private PrimitiveObjectInspector stringOI = null;
  4.  
     
  5.  
    @Override
  6.  
    public StructObjectInspector initialize(ObjectInspector[] args) UDFArgumentException {
  7.  
     
  8.  
    if (args.length != 1) {
  9.  
    throw new UDFArgumentException("NameParserGenericUDTF() takes exactly one argument");
  10.  
    }
  11.  
     
  12.  
    if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE
  13.  
    && ((PrimitiveObjectInspector) args[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
  14.  
    throw new UDFArgumentException("NameParserGenericUDTF() takes a string as a parameter");
  15.  
    }
  16.  
     
  17.  
    // 输入格式(inspectors)
  18.  
    stringOI = (PrimitiveObjectInspector) args[0];
  19.  
     
  20.  
    // 输出格式(inspectors) -- 有两个属性的对象
  21.  
    List fieldNames = new ArrayList(2);
  22.  
    List fieldOIs = new ArrayList(2);
  23.  
    fieldNames.add("name");
  24.  
    fieldNames.add("surname");
  25.  
    fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
  26.  
    fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
  27.  
    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
  28.  
    }
  29.  
     
  30.  
    public ArrayList processInputRecord(String name){
  31.  
    ArrayList result = new ArrayList();
  32.  
     
  33.  
    // 忽略null值与空值
  34.  
    if (name == null || name.isEmpty()) {
  35.  
    return result;
  36.  
    }
  37.  
     
  38.  
    String[] tokens = name.split("\\s+");
  39.  
     
  40.  
    if (tokens.length == 2){
  41.  
    result.add(new Object[] { tokens[0], tokens[1] });
  42.  
    }else if (tokens.length == 4 && tokens[1].equals("and")){
  43.  
    result.add(new Object[] { tokens[0], tokens[3] });
  44.  
    result.add(new Object[] { tokens[2], tokens[3] });
  45.  
    }
  46.  
     
  47.  
    return result;
  48.  
    }
  49.  
     
  50.  
    @Override
  51.  
    public void process(Object[] record) throws HiveException {
  52.  
     
  53.  
    final String name = stringOI.getPrimitiveJavaObject(record[0]).toString();
  54.  
     
  55.  
    ArrayList results = processInputRecord(name);
  56.  
     
  57.  
    Iterator it = results.iterator();
  58.  
     
  59.  
    while (it.hasNext()){
  60.  
    Object[] r = it.next();
  61.  
    forward(r);
  62.  
    }
  63.  
    }
  64.  
     
  65.  
    @Override
  66.  
    public void close() throws HiveException {
  67.  
    // do nothing
  68.  
    }
  69.  
    }
以上代码可以从:github目录 check 下来。
 

5.2 代码走读

该UDTF以string类型作为参数,返回一个拥有两个属性的对象,与GenericUDF比较相似,指定输入输出数据格式(objectinspector),以便hive能识别输入与输出。

我们为输入的string参数定义了数据格式PrimitiveObjectInspector
stringOI = (PrimitiveObjectInspector) args[0]

定义输出数据格式(objectinspectors) 需要我们先定义两个属性名称,因为(objectinspectors)需要读取每一个属性(在这个实例中,两个属性都是string类型)。

  1.  
    List fieldNames = new ArrayList(2);
  2.  
    fieldNames.add("name");
  3.  
    fieldNames.add("surname");
  4.  
     
  5.  
    List fieldOIs = new ArrayList(2);
  6.  
    fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
  7.  
    fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
  8.  
     
  9.  
    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);

我们主要的处理逻辑放在这个比较直观的processInputRecord方法当中。分开逻辑处理有利我们进行更简单的单元测试,而不用涉及到繁琐的objectinspector。
最后,一旦得到结果就可以对其进行forward,把基注册为hive处理后的输出记录对象。

  1.  
    while (it.hasNext()){
  2.  
    Object[] r = it.next();
  3.  
    forward(r);
  4.  
    }
  5.  
    }

5.3 使用该UDTF函数

我们可以在hive中创建我们自己的函数
mvn package
cp target/hive-extensions-1.0-SNAPSHOT-jar-with-dependencies.jar ./ext.jar
然后在hive中使用

  1.  
    ADD JAR ./ext.jar;
  2.  
     
  3.  
    CREATE TEMPORARY FUNCTION process_names as 'com.matthewrathbone.example.NameParserGenericUDTF';
  4.  
     
  5.  
    SELECT
  6.  
    adTable.name,
  7.  
    adTable.surname
  8.  
    FROM people
  9.  
    lateral view process_names(name) adTable as name, surname;
输出
OK
John	Smith
John	White
Ann		White
Ted		Green

6、Hive优化中的UDTF

Hive优化:看做mapreduce处理
 a\排序优化:sort by 效率高于 order by
 b\分区:使用静态分区 (statu_date="20160516",location="beijin"),每个分区对应hdfs上的一个目录
 c\减少job和task数量:使用表链接操作
 d\解决groupby数据倾斜问题:设置hive.groupby.skewindata=true ,那么hive会自动负载均衡
 e\小文件合并成大文件:表连接操作
 f\使用UDF或UDAF函数:http://www.shsxt.com/it/bigdata/1226.html


7. UDTF使用规则

UDTF有两种使用方法,或者说是规则,一种直接放到select后面,一种和lateral view一起使用。

 

7.1:直接select中使用

select explode_map(properties) as (col1,col2) from src;

不可以添加其他字段使用

select a, explode_map(properties) as (col1,col2) from src

不可以嵌套调用

select explode_map(explode_map(properties)) from src

不可以和group by/cluster by/distribute by/sort by一起使用

select explode_map(properties) as (col1,col2) from src group by col1, col2

 

7.2:和lateral view一起使用

select src.id, mytable.col1, mytable.col2 from src lateral view explode_map(properties) mytable as col1, col2;

此方法更为方便日常使用。执行过程相当于单独执行了两次抽取,然后union到一个表里。


上海尚学堂大数据培训整理编辑,更多大数据文章阅读请返回本栏目查看。如需要大数据课程详情和学习资料,请联系咨询客服。
分享:0

电话咨询

客服热线服务时间

周一至周五 9:00-21:00

周六至周日 9:00-18:00

咨询电话

021-67690939
15201841284

微信扫一扫