HIVE 自定义UDF、UDTF函数

HIVE自定义函数类型

1)Hive 自带了一些函数,比如:max/min等,但是数量有限,自己可以通过自定义UDF来方便的扩展。

2)当Hive提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数(UDF:user-defined function)。

3)根据用户自定义函数类别分为以下三种:

(1)UDF(User-Defined-Function)

一进一出

(2)UDAF(User-Defined Aggregation Function)

聚集函数,多进一出

类似于:count/max/min

(3)UDTF(User-Defined Table-Generating Functions)

一进多出

如lateral view explore()

4)官方文档地址

https://cwiki.apache.org/confluence/display/Hive/HivePlugins

5)编程步骤:

(1)继承org.apache.hadoop.hive.ql.exec.UDF

(2)需要实现evaluate函数;evaluate函数支持重载;

(3)在hive的命令行窗口创建函数

添加jar

add jar linux_jar_path

创建function

create [temporary] function [dbname.]function_name AS class_name;

(4)在hive的命令行窗口删除函数

Drop [temporary] function [if exists] [dbname.]function_name;

6)注意事项:UDF必须要有返回类型,可以返回null,但是返回类型不能为void;

1)在项目中是否自定义过UDF、UDTF函数,以及用他们处理了什么问题,及自定义步骤?
(1)用UDF函数解析公共字段;用UDTF函数解析事件字段。
(2)自定义UDF:继承UDF,重写evaluate方法
(3)自定义UDTF:继承自GenericUDTF,重写3个方法:initialize(自定义输出的列名和类型),process(将结果返回forward(result)),close
2)为什么要自定义UDF/UDTF?
因为自定义函数,可以自己埋点Log打印日志,出错或者数据异常,方便调试。

示例:

1.Maven 依赖

<dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.2</version>
        </dependency>
</dependencies>

UDF 一进一出

进行数据结构的转换
把JSON数组字符串转换为 hive 中结构体格式

 jsonstr  ==>  array<struct<action_id:string,item:string,item_type:string,ts:bigint>>

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import org.json.JSONObject;

import java.util.ArrayList;
import java.util.List;

@Description(name = "json_array_to_struct_array", value = "-- convert json_array to struct_array")
public class JsonArrayToStructArray extends GenericUDF {
    /*
        对输入检测
        返回输出的值的对象检测器
     */
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // 1. 对输入检测
        if (arguments.length < 3) {
            throw new UDFArgumentException("参数个数必须执行3个");
        }
        for (ObjectInspector argument : arguments) {
            if (!"string".equals(argument.getTypeName())) {
                throw new UDFArgumentException("参数必须是 string");
            }
        }
        // 2. 返回输出的值的对象检测器
        // array(struct(k:v, k:v), struct(...))
        List<String> fieldNames = new ArrayList<>();  // 结构体的每个k的名字
        List<ObjectInspector> oiList = new ArrayList<>();  // 结构体的每个k的名字

        int size = arguments.length;
        /*for (int i = 1; i < (size - 1) / 2 + 1; i++) {
            String fieldName = getConstantStringValue(arguments, i).split(":")[0];
            fieldNames.add(fieldName);
        }*/

        for (int i = (size - 1) / 2 + 1; i < size; i++) {

            String fieldName = getConstantStringValue(arguments, i).split(":")[0];
            fieldNames.add(fieldName);

            // 不同的类型, 使用不同的检测器
            String type = getConstantStringValue(arguments, i).split(":")[1];
            switch (type) {
                case "string":
                    oiList.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
                    break;
                case "int":
                    oiList.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
                    break;
                case "bigint":
                    oiList.add(PrimitiveObjectInspectorFactory.javaLongObjectInspector);
                    break;

                default:
                    throw new UDFArgumentException("未知的不支持的类型....");
            }
        }

        return ObjectInspectorFactory
                .getStandardListObjectInspector(ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, oiList));
    }

    /*
       对传入的数据做计算, 返回函数最终的值
     */
    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {

        if (arguments[0].get() == null) {
            return null;
        }

        // 1.获取传入的json数组
        String jsonArrayString = arguments[0].get().toString();
        JSONArray jsonArray = new JSONArray(jsonArrayString);

        // 2. 解析数组中的数据
        // 2.1 最终的数组

        List<List<Object>> result = new ArrayList<>();
        // 2.2 解析出来需要的每个结构体
        for(int i = 0; i < jsonArray.length(); i++){
            List<Object> struct = new ArrayList<>();
            result.add(struct);

            JSONObject obj = jsonArray.getJSONObject(i);

            // 表示结构体应该有多个少个字段
            for(int j = 1; j < (arguments.length - 1)/2 + 1; j++){
                // 获取字段名
                String name = arguments[j].get().toString();
                if(obj.has(name)){
                    struct.add(obj.get(name));
                }else{
                    struct.add(null);
                }
            }
            /*
            {
                    "displayType":"promotion",
                    "item":"3",
                    "item_type":"sku_id",
                    "order":1
            }

             json_array_to_struct_array(
                       get_json_object(line,'$.actions'),
                       'action_id',
                      'item',
                      'item_type',
                      'ts',
                      'action_id:string',
                      'item:string',
                      'item_type:string',
                      'ts:bigint')
        array(struct(..), struct(....))

             */

        }

        return result;
    }

    /*
    select  a(...)
    返回要展示的字符串
     */
    @Override
    public String getDisplayString(String[] children) {
        return getStandardDisplayString("json_array_to_struct_array", children);
    }
}
/*
[
        {
            "displayType":"promotion",
            "item":"3",
            "item_type":"sku_id",
            "order":1
        },
        {
            "displayType":"promotion",
            "item":"1",
            "item_type":"sku_id",
            "order":2
        },
        {
            "displayType":"query",
            "item":"7",
            "item_type":"sku_id",
            "order":3
        },
        {
            "displayType":"promotion",
            "item":"5",
            "item_type":"sku_id",
            "order":4
        }
]
 */

UDTF 一进多出

例如:把JSON数组:
[{"action_id":"favor_add","item":"6","item_type":"sku_id","ts":1592841743610},
{"action_id":"cart_add","item":"6","item_type":"sku_id","ts":1592841746886}]

转换为多行JSON对象:
{"action_id":"favor_add","item":"6","item_type":"sku_id","ts":1592841743610}
{"action_id":"cart_add","item":"6","item_type":"sku_id","ts":1592841746886}

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;

import java.util.ArrayList;
import java.util.List;

/**
 * udtf:  一进多出
 *  1进:   [{}, {}]
 *  多出:  "{}", "{}"
 */
@Description(name = "explode_json_array", value = "explode json array ....")
public class ExplodeJsonArray extends GenericUDTF {
    /*
      作用:
      1.检测输入
      2. 返回期望的数据类型的检测器
     */
    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        // 1. 检测输入
        // 1.1 获取到传入的参数
        List<? extends StructField> inputFields = argOIs.getAllStructFieldRefs();
        if(inputFields.size() != 1)
            throw new UDFArgumentException("函数 explode_json_array 需要正好1个参数");

        ObjectInspector inspector = inputFields.get(0).getFieldObjectInspector();
        if(inspector.getCategory() != ObjectInspector.Category.PRIMITIVE || !inspector.getTypeName().equals("string")){
            throw new UDFArgumentException("函数 explode_json_array 参数类型不匹配, 必须是一个字符串");
        }
        // 2. 返回期望数据类型的检测器
        List<String> names = new ArrayList<>();
        names.add("action");
        List<ObjectInspector> inspectors = new ArrayList<>();
        inspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors);

    }
    /*
        处理数据   [{}, {}, ...]
     */
    @Override
    public void process(Object[] args) throws HiveException {
        String jsonArrayString = args[0].toString();
        JSONArray jsonArray = new JSONArray(jsonArrayString);
        for (int i = 0; i < jsonArray.length(); i++) {
            String col = jsonArray.getString(i);
            String[] cols = new String[1];
            cols[0] = col;
            forward(cols);  // 为什么要传递数组? 有可能炸裂出来多列数据, 所以才需要传递数字
        }
    }
    /**
     * 关闭资源
     * 不用实现
     */
    @Override
    public void close() throws HiveException {

    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,980评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,178评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,868评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,498评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,492评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,521评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,910评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,569评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,793评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,559评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,639评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,342评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,931评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,904评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,144评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,833评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,350评论 2 342