HIVE自定义函数类型
1)Hive 自带了一些函数,比如:max/min等,但是数量有限,自己可以通过自定义UDF来方便的扩展。
2)当Hive提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数(UDF:user-defined function)。
3)根据用户自定义函数类别分为以下三种:
(1)UDF(User-Defined-Function)
一进一出
(2)UDAF(User-Defined Aggregation Function)
聚集函数,多进一出
类似于:count/max/min
(3)UDTF(User-Defined Table-Generating Functions)
一进多出
如lateral view explore()
4)官方文档地址
https://cwiki.apache.org/confluence/display/Hive/HivePlugins
5)编程步骤:
(1)继承org.apache.hadoop.hive.ql.exec.UDF
(2)需要实现evaluate函数;evaluate函数支持重载;
(3)在hive的命令行窗口创建函数
添加jar
add jar linux_jar_path
创建function
create [temporary] function [dbname.]function_name AS class_name;
(4)在hive的命令行窗口删除函数
Drop [temporary] function [if exists] [dbname.]function_name;
6)注意事项:UDF必须要有返回类型,可以返回null,但是返回类型不能为void;
1)在项目中是否自定义过UDF、UDTF函数,以及用他们处理了什么问题,及自定义步骤?
(1)用UDF函数解析公共字段;用UDTF函数解析事件字段。
(2)自定义UDF:继承UDF,重写evaluate方法
(3)自定义UDTF:继承自GenericUDTF,重写3个方法:initialize(自定义输出的列名和类型),process(将结果返回forward(result)),close
2)为什么要自定义UDF/UDTF?
因为自定义函数,可以自己埋点Log打印日志,出错或者数据异常,方便调试。
示例:
1.Maven 依赖
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
</dependencies>
UDF 一进一出
进行数据结构的转换
把JSON数组字符串转换为 hive 中结构体格式
jsonstr ==> array<struct<action_id:string,item:string,item_type:string,ts:bigint>>
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import org.json.JSONObject;
import java.util.ArrayList;
import java.util.List;
@Description(name = "json_array_to_struct_array", value = "-- convert json_array to struct_array")
public class JsonArrayToStructArray extends GenericUDF {
/*
对输入检测
返回输出的值的对象检测器
*/
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
// 1. 对输入检测
if (arguments.length < 3) {
throw new UDFArgumentException("参数个数必须执行3个");
}
for (ObjectInspector argument : arguments) {
if (!"string".equals(argument.getTypeName())) {
throw new UDFArgumentException("参数必须是 string");
}
}
// 2. 返回输出的值的对象检测器
// array(struct(k:v, k:v), struct(...))
List<String> fieldNames = new ArrayList<>(); // 结构体的每个k的名字
List<ObjectInspector> oiList = new ArrayList<>(); // 结构体的每个k的名字
int size = arguments.length;
/*for (int i = 1; i < (size - 1) / 2 + 1; i++) {
String fieldName = getConstantStringValue(arguments, i).split(":")[0];
fieldNames.add(fieldName);
}*/
for (int i = (size - 1) / 2 + 1; i < size; i++) {
String fieldName = getConstantStringValue(arguments, i).split(":")[0];
fieldNames.add(fieldName);
// 不同的类型, 使用不同的检测器
String type = getConstantStringValue(arguments, i).split(":")[1];
switch (type) {
case "string":
oiList.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
break;
case "int":
oiList.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
break;
case "bigint":
oiList.add(PrimitiveObjectInspectorFactory.javaLongObjectInspector);
break;
default:
throw new UDFArgumentException("未知的不支持的类型....");
}
}
return ObjectInspectorFactory
.getStandardListObjectInspector(ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, oiList));
}
/*
对传入的数据做计算, 返回函数最终的值
*/
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
if (arguments[0].get() == null) {
return null;
}
// 1.获取传入的json数组
String jsonArrayString = arguments[0].get().toString();
JSONArray jsonArray = new JSONArray(jsonArrayString);
// 2. 解析数组中的数据
// 2.1 最终的数组
List<List<Object>> result = new ArrayList<>();
// 2.2 解析出来需要的每个结构体
for(int i = 0; i < jsonArray.length(); i++){
List<Object> struct = new ArrayList<>();
result.add(struct);
JSONObject obj = jsonArray.getJSONObject(i);
// 表示结构体应该有多个少个字段
for(int j = 1; j < (arguments.length - 1)/2 + 1; j++){
// 获取字段名
String name = arguments[j].get().toString();
if(obj.has(name)){
struct.add(obj.get(name));
}else{
struct.add(null);
}
}
/*
{
"displayType":"promotion",
"item":"3",
"item_type":"sku_id",
"order":1
}
json_array_to_struct_array(
get_json_object(line,'$.actions'),
'action_id',
'item',
'item_type',
'ts',
'action_id:string',
'item:string',
'item_type:string',
'ts:bigint')
array(struct(..), struct(....))
*/
}
return result;
}
/*
select a(...)
返回要展示的字符串
*/
@Override
public String getDisplayString(String[] children) {
return getStandardDisplayString("json_array_to_struct_array", children);
}
}
/*
[
{
"displayType":"promotion",
"item":"3",
"item_type":"sku_id",
"order":1
},
{
"displayType":"promotion",
"item":"1",
"item_type":"sku_id",
"order":2
},
{
"displayType":"query",
"item":"7",
"item_type":"sku_id",
"order":3
},
{
"displayType":"promotion",
"item":"5",
"item_type":"sku_id",
"order":4
}
]
*/
UDTF 一进多出
例如:把JSON数组:
[{"action_id":"favor_add","item":"6","item_type":"sku_id","ts":1592841743610},
{"action_id":"cart_add","item":"6","item_type":"sku_id","ts":1592841746886}]
转换为多行JSON对象:
{"action_id":"favor_add","item":"6","item_type":"sku_id","ts":1592841743610}
{"action_id":"cart_add","item":"6","item_type":"sku_id","ts":1592841746886}
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import java.util.ArrayList;
import java.util.List;
/**
* udtf: 一进多出
* 1进: [{}, {}]
* 多出: "{}", "{}"
*/
@Description(name = "explode_json_array", value = "explode json array ....")
public class ExplodeJsonArray extends GenericUDTF {
/*
作用:
1.检测输入
2. 返回期望的数据类型的检测器
*/
@Override
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
// 1. 检测输入
// 1.1 获取到传入的参数
List<? extends StructField> inputFields = argOIs.getAllStructFieldRefs();
if(inputFields.size() != 1)
throw new UDFArgumentException("函数 explode_json_array 需要正好1个参数");
ObjectInspector inspector = inputFields.get(0).getFieldObjectInspector();
if(inspector.getCategory() != ObjectInspector.Category.PRIMITIVE || !inspector.getTypeName().equals("string")){
throw new UDFArgumentException("函数 explode_json_array 参数类型不匹配, 必须是一个字符串");
}
// 2. 返回期望数据类型的检测器
List<String> names = new ArrayList<>();
names.add("action");
List<ObjectInspector> inspectors = new ArrayList<>();
inspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors);
}
/*
处理数据 [{}, {}, ...]
*/
@Override
public void process(Object[] args) throws HiveException {
String jsonArrayString = args[0].toString();
JSONArray jsonArray = new JSONArray(jsonArrayString);
for (int i = 0; i < jsonArray.length(); i++) {
String col = jsonArray.getString(i);
String[] cols = new String[1];
cols[0] = col;
forward(cols); // 为什么要传递数组? 有可能炸裂出来多列数据, 所以才需要传递数字
}
}
/**
* 关闭资源
* 不用实现
*/
@Override
public void close() throws HiveException {
}
}