开发部署HBase Endpoint Coprocessor

一、前言

本文示例协处理器实现根据scan条件对指点列进行count、avg。这里推荐HBase技术社区文章,该文非常详细的介绍了如何开发并部署一个HBase Endpoint Coprocessor。本文基于该文基础上,介绍如何在参数中传入Scan、Filter等参数。

二、环境准备

1、下载安装Protobuf,请根据当前hbase集群使用的Protobuf版本来进行安装。
2、配置环境变量
3、创建测试表

三、Protobuf生成序列化类

protobuf本身支持的数据类型不多,如果参数需要使用一个对象或者scan range,scan filter怎么办?本文主要介绍如何传入一个Scan对象

1、构建proto环境
可以找到hbase中找到hbase-protocol项目,protobuf目录下有着hbase已经定义好的许多proto,本文需要使用的Scan对象在Client.proto中定义,将需要的或者所有proto文件拷贝到上述安装Protobuf环境的机器上。

image

2、创建ClifeProto.proto文件,内容如下:

syntax = "proto2";
option java_package = "com.clife.data.hbase.coprocessor.aggregate";
option java_outer_classname = "ClifeProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

import "Client.proto";

message ClifeAvgRequest {
    required string family = 1;
    required string columns = 2;
    optional Scan scan = 3;
}

message ClifeAvgResponse {
    required int64 count = 1;
    required string values = 2;
}

service ClifeService {
  rpc getAvg(ClifeAvgRequest)
    returns (ClifeAvgResponse);
}

注意ClifeProto.proto需要与Client.proto在同一目录下,如图:


image.png

3、生成java类

protoc --java_out=./ ClifeProto.proto

--java_out后面是指定生成java类的输出目录
执行完后可以在上图的com目录下找到对应的java类。

四、Endpoint Coprocessor服务端实现

1、构建maven项目

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0-cdh5.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0-cdh5.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.0-cdh5.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-examples</artifactId>
    <version>1.2.0-cdh5.7.2</version>
</dependency>
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>2.5.0</version>
</dependency>

在maven package的时候可以将上述依赖排除掉,避免打包后jar很大。
将步骤三中生成的java类,拷贝到ClifeProto.proto文件中配置的com.clife.data.hbase.coprocessor.aggregate包下。
2、构建Endpoint实现类

public class ClifeProtosEndPoint extends ClifeProtos.ClifeService implements Coprocessor, CoprocessorService {

    protected static final Log log = LogFactory.getLog(ClifeProtosEndPoint.class);
    private RegionCoprocessorEnvironment env;

    /**
     * 计算平均值
     * 根据客户端传入的scan条件,对指定字段进行求和,
     * 将求和结果与数据条数返回客户端,由客户端完成求平均
     * 1、客户端需要传入参数:
     *    1)scan(可选),可以通过scan设置rowkeyRange、timeRange、filter等
     *    2)family(必须),每次rpc请求只允许操作一个列簇
     *    3)colums(必须),需要统计的列,多个列之间用逗号分隔,如:"weight,age"
     * 2、返回值:
     *    1)Count: Long,查询的数据条数
     *    2)Values:String,columns的求和结果,如:"weight:234,age:345"
     * @param controller
     * @param request
     * @param done
     */
    @Override
    public void getAvg(RpcController controller, ClifeProtos.ClifeAvgRequest request, RpcCallback<ClifeProtos.ClifeAvgResponse> done) {
        ClifeProtos.ClifeAvgResponse response = null;
        long counter = 0L;
        List<Cell> results = new ArrayList<>();
        InternalScanner scanner = null;
        try {
            log.info("Start Clife avg endpoint.........................");
            Scan scan = null;
            ClientProtos.Scan cScan = request.getScan();
            if (cScan != null) {
                scan = ProtobufUtil.toScan(request.getScan());
                byte[] startRow = scan.getStartRow();
                byte[] stopRow = scan.getStopRow();
                if (startRow != null && stopRow != null)
                    log.info("StartRow = " +  RowKeyUtil.convertByteRunDataRowKeyToString(startRow) +
                            ", StopRow = " + RowKeyUtil.convertByteRunDataRowKeyToString(stopRow));
            } else {
                scan = new Scan();
            }

            byte[] cf = Bytes.toBytes(request.getFamily());
            scan.addFamily(cf);
            //传入列的方式   sales,sales
            String colums = request.getColumns();
            log.info("Input colums: " + colums);
            Map<String, Long> columnMaps = new HashedMap();
            for (String column : colums.split(",")) {
                columnMaps.put(column, 0L);
                scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(column));
            }
            scanner = this.env.getRegion().getScanner(scan);
            boolean hasMoreRows = false;
            do {
                hasMoreRows = scanner.next(results);

                if (results.size() > 0) {
                    counter++;
                }
                for (Cell cell : results) {
                    String column = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    Long temp = Long.parseLong(value);
                    columnMaps.put(column, columnMaps.get(column) + temp);
                }

                results.clear();
            } while (hasMoreRows);
            StringBuffer values = new StringBuffer();
            for (String key : columnMaps.keySet()) {
                Long value = columnMaps.get(key);
                values.append(key).append(":").append(value).append(",");
            }
            log.info("Clife avg server result: " + values);
            response = ClifeProtos.ClifeAvgResponse.newBuilder()
                    .setCount(counter)
                    .setValues(values.toString())
                    .build();
        } catch (IOException e) {
            ResponseConverter.setControllerException(controller, e);
        } finally {
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (IOException ignored) {
                }
            }
        }
        log.info("Row counter from this region is "
                + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + counter);
        done.run(response);
    }

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        if (env instanceof RegionCoprocessorEnvironment) {
            this.env = (RegionCoprocessorEnvironment) env;
        } else {
            throw new CoprocessorException("Must be loaded on a table region!");
        }
    }

    @Override
    public void stop(CoprocessorEnvironment coprocessorEnvironment) throws IOException {
    }

    @Override
    public Service getService() {
        return this;
    }
}

3、Endpoint Coprocessor客户端实现

public class ClifeProtosExample {

    /**
     * 效率最高的方式
     * 通过HBase的coprocessorService(Class, byte[],byte[],Batch.Call,Callback<R>)方法获取表的总条数
     * @param table HBase表名
     * @return 返回表的总条数
     */
    public static long execFastEndpointCoprocessor(Table table, final Scan scan, final String family, final String cloumes) {
        long start_t = System.currentTimeMillis();
        //定义总的 rowCount 变量
        final AtomicLong totalRowCount = new AtomicLong();
        final Map<String, AtomicLong> sumMap = new HashMap<>();
        try {
            Batch.Callback<ClifeProtos.ClifeAvgResponse> callback = new Batch.Callback<ClifeProtos.ClifeAvgResponse>() {
                @Override
                public void update(byte[] bytes, byte[] bytes1, ClifeProtos.ClifeAvgResponse clifeAvgResponse) {
                    //更新Count值
                    totalRowCount.getAndAdd(clifeAvgResponse.getCount());
                    String values = clifeAvgResponse.getValues();
                    for(String kv : values.split(",")) {
                        String[] kvs = kv.split(":");
                        String key = kvs[0];
                        Long value = Long.parseLong(kvs[1]);
                        if (!sumMap.containsKey(key)) {
                            final AtomicLong sum = new AtomicLong();
                            sum.getAndAdd(value);
                            sumMap.put(key, sum);
                        } else {
                            sumMap.get(key).getAndAdd(value);
                        }
                    }
                }
            };

            final ClientProtos.Scan cScan = ProtobufUtil.toScan(scan);

            table.coprocessorService(ClifeProtos.ClifeService.class, scan.getStartRow(), scan.getStopRow(),
                    new Batch.Call<ClifeProtos.ClifeService, ClifeProtos.ClifeAvgResponse>() {
                        @Override
                        public ClifeProtos.ClifeAvgResponse call(ClifeProtos.ClifeService aggregationService) throws IOException {
                            ClifeProtos.ClifeAvgRequest requet =
                                    ClifeProtos.ClifeAvgRequest.newBuilder()
                                            .setScan(cScan)
                                            .setFamily(family)
                                            .setColumns(cloumes)
                                            .build();
                            BlockingRpcCallback<ClifeProtos.ClifeAvgResponse> rpcCallback = new BlockingRpcCallback<>();
                            aggregationService.getAvg(null, requet, rpcCallback);
                            ClifeProtos.ClifeAvgResponse response = rpcCallback.get();
                            return response;
                        }
                    }, callback);
        } catch (Throwable throwable) {
            throwable.printStackTrace();
        }
        System.out.println("耗时:" + (System.currentTimeMillis() - start_t));
        System.out.println("totalRowCount:" + totalRowCount.longValue());
        for (String key : sumMap.keySet()) {
            Double value = sumMap.get(key).doubleValue();
            System.out.println(key + " avg = " + value / totalRowCount.longValue());
        }

        return totalRowCount.longValue();
    }
}

注意:
1)本文协处理器在服务端只做求和,平均值在客户端完成;
2)传入的Scan对象,支持rowkey range、time range、filter等scan的条件过滤

五、部署及调用

1、maven编译
2、将编译好的jar上传到hdfs某目录下,注意文件所属用户组。如:/hbase/coprocessor

image.png

3、协处理器装载
协处理器的装载分为动态和静态两种,参照这篇文章。本文介绍的协处理器存在很重的业务性,并不适合动态加载。本文的目的也只是介绍如何在协处理器中使用Scan等对象。

alter 'test_coprocessor',METHOD=>'table_att','COPROCESSOR'=>'hdfs://nameservice1/hbase/coprocessor/clife-data-hbase-1.0.8.jar||com.clife.data.hbase.coprocessor.aggregate.ClifeProtosEndPoint||'
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343