Filter的作用是谓词下推,就是在Scan查询数据时,将过滤数据的操作放到服务端进行,减少数据的传输,减少网络IO。
介绍Filter使用方法的文章很多,就不再赘述了,主要记录下如何自定义Filter。
解析
在一次Scan的过程中,Filter存在于2个地方:
- RegionScannerImpl中,用来做整理流程的把控、StoreScanner的过滤和行级别的过滤处理。
- ScanQueryMatcher中,做Cell级别的过滤处理。
RegionScannerImpl中调用Filter的方法:
- 首先是isFamilyEssential,在HRegion创建Scanner时,用于筛选StoreScanner,当有多个列族时,筛选不通过的StoreScanner将放进joinedHeap中。joinedHeap中的StoreScanner不参与主流程的查询,只在主要StoreScanner查询完,确定了要这行数据,才会执行查询,并且只执行ScanQueryMatcher中调用Filter的方法。
- hasFilterRow。返回true的话,才有机会执行filterRowCells、filterRowCellsWithRet。
- filterRowCells。
- filterRowKey。这里Filter判断是否要过滤掉这一行,true表示过滤掉这一行。
- filterRowCellsWithRet。
- filterRow。执行差不多了,再判断一次,是否要过滤掉这一行。
- 在完成了一行的查询之后,会调用reset,清空临时状态。另外有种特殊情况,当Size、Time或Batch达到上限时,查询会提前返回,这时候,是不调用reset的,状态会持续到下一次请求。
- filterAllRemaining。true表示筛选是否完成了,没有更多数据了,整个Scan完成。
ScanQueryMatcher中调用Filter的方法:
- filterAllRemaining。里面也给了一次机会,判断是否执行完成了Scan。
- filterKeyValue。ColumnTracker返回值是MatchCode.INCLUDE时,默认一定是MatchCode.INCLUDE,会调用filterKeyValue,执行Filter的过滤操作,返回值是MatchCode。
- transformCell、getNextCellHint。严格来说,这2个方法是在StoreScanner里执行的
- 判断返回值为INCLUDE_AND_SEEK_NEXT_COL时,执行Filter.transformCell。
- 判断返回值为SEEK_NEXT_USING_HINT时,执行Filter.getNextCellHint。
Filter还有2个方法,用于RPC时,将Filter发送到服务端
toByteArray。首先需要转成字节码发送,调用的就是Filter的toByteArray方法。
parseFrom。到服务端再转成Filter,调用的是Filter的类方法parseFrom。
Protobuf安装
既然要发送实例到服务端,就需要Protobuf了。HBase1.3.2版本使用的是protobuf2.5.0,所以这里装个2.5.0。其他版本在Git Release上找下,安装流程是一样的。2.x和3.x生成的Java代码并不兼容,已有3.x也按照下面步骤执行就行。
- 下载protobuf-2.5.0.zip,解压。
- 检查下是否安装autoreconf和make,执行
brew install autoconf && brew install automake
- 到目录下执行
./autogen.sh && ./configure && make
- 检查下make状态,
make check
,没问题执行make install
- 检查下是否安装成功,
protoc --version
显示libprotoc 2.5.0
基本示例
先写个Filter.proto文件,版本proto2。外部类名字不能和内部类一样,名就取个Filter。
syntax = "proto2";
option java_package = "cn.edu.bupt.hbase.protobuf.generated";
option java_outer_classname = "Filter";
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
message AnFilter {
}
执行protoc编译命令,生成Protobuf版的Filter类。
protoc --proto_path=src/main/protobuf/ --java_out=src/main/java/ src/main/protobuf/Filter.proto
写个Java的Filter类,继承FilterBase,引用前面生成的Filter.AnFilter。
public class AnFilter extends FilterBase {
@Override
public ReturnCode filterKeyValue(Cell cell) throws IOException {
// 默认值
return null;
}
public byte[] toByteArray() {
// 使用Filter.AnFilter
Filter.AnFilter.Builder builder = Filter.AnFilter.newBuilder();
return builder.build().toByteArray();
}
public static AnFilter parseFrom(byte[] pbBytes) throws DeserializationException {
// 这里暂时不解析了,直接new一个
return new AnFilter();
}
}
把代码打包,放入HBase的classpath里,重启HBase。
在ProtobufUtil.toFilter打个断点,客户端执行Scan代码,即进入断点,看到成功在Server端生成了Filter。
进阶示例
让我们写个a+b=c的Filter,并且只返回一行。随便放些数据进去
f:a | f:b | f:v | |
---|---|---|---|
row0 | value0 | ||
row1 | 1 | 1 | |
row2 | 2 | 2 | value2 |
row3 | 3 | value3 | |
row4 | 4 | ||
row5 | 5 | 5 | value5 |
row6 | 4 | 6 | value6 |
更改Filter.proto,增加一个变量c,这个变量是要传给服务端的
message AnFilter {
required int32 c = 1;
}
重新执行protoc编译命令,编译生成Filter。
修改Java的AnFilter,也增加上变量c,修改toByteArray把c设置进去、parseFrom再从byte[]里把c读出来。
public class AnFilter extends FilterBase {
private int c;
public AnFilter(int c) {
this.c = c;
}
public byte[] toByteArray() {
Filter.AnFilter.Builder builder = Filter.AnFilter.newBuilder();
builder.setC(c);
return builder.build().toByteArray();
}
public static AnFilter parseFrom(byte[] pbBytes) throws DeserializationException {
Filter.AnFilter filter;
try {
filter = Filter.AnFilter.parseFrom(pbBytes);
} catch (InvalidProtocolBufferException e) {
throw new DeserializationException(e);
}
return new AnFilter(filter.getC());
}
首先要找到a+b=c的行,找到qualifier是a,记录下值;找到qualifier是b,记录下值。a、b都找到了,判断下和是不是等于c,等于就返回ReturnCode.INCLUDE,把当前行找齐;否则跳过,返回ReturnCode.NEXT_ROW,继续找下一行。如果这一行还没找到a、b,还是得先返回ReturnCode.INCLUDE,不然这列就不会包含进去了。
private int sum;
private boolean hasA;
private boolean hasB;
private boolean foundC;
@Override
public ReturnCode filterKeyValue(Cell cell) throws IOException {
String qualifier = Bytes.toString(cell.getQualifierArray(),
cell.getQualifierOffset(),
cell.getQualifierLength());
if ("a".equals(qualifier)) {
sum += Bytes.toInt(cell.getValueArray(), cell.getValueOffset(),
cell.getValueLength());
hasA = true;
} else if ("b".equals(qualifier)) {
sum += Bytes.toInt(cell.getValueArray(), cell.getValueOffset(),
cell.getValueLength());
hasB = true;
}
if (hasA && hasB) {
if (sum == c) {
foundC = true;
return ReturnCode.INCLUDE;
} else {
return ReturnCode.NEXT_ROW;
}
}
return ReturnCode.INCLUDE;
}
那列都包含进去了,如果后面发现a+b!=c怎么办?没事,还有filterRow方法,没找到就返回true,把当前列过滤掉。
@Override
public boolean filterRow() throws IOException {
return !foundC;
}
一行执行完成,要重置下临时状态。
@Override
public void reset() throws IOException {
sum = 0;
hasA = false;
hasB = false;
}
找齐了这行就不继续找了,filterAllRemaining返回true。找到了a、b,不能直接返回true,因为可能还有其他列,filterAllRemaining是每个cell都执行的方法。所以在找齐一列的时候设置下值。
private boolean filterAllRemaining;
@Override
public void reset() throws IOException {
……
filterAllRemaining = foundC;
}
@Override
public boolean filterAllRemaining() throws IOException {
return filterAllRemaining;
}
打包放进HBase,重启HBase,客户端执行代码。
scan.setFilter(new AnFilter(10));
返回找到的那行。
row5 5 5 value5