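167. HBase Observer Coprocessor Examples
Examples
HBase ships examples of Observer Coprocessors.
A more detailed example is given below.
These examples assume a table named users with two column families, personalDet and salaryDet, which hold personal details and salary details respectively. Below is the users table: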
rowkey | personalDet:name | personalDet:lastname | personalDet:dob | salaryDet:gross | salaryDet:net | salaryDet:allowances |
---|---|---|---|---|---|---|
admin | Admin | Admin | | | | |
cdickens | Charles | Dickens | 02/07/1812 | 10000 | 8000 | 2000 |
jverne | Jules | Verne | 02/08/1828 | 12000 | 9000 | 3000 |
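HBase keeps rows sorted by row key, comparing keys as unsigned bytes. A full scan of users therefore visits admin, cdickens and jverne in that order, whatever order the rows were written in. The plain-Java sketch below models only that ordering and needs no HBase dependency. The class and method names (UsersTableModel, scanOrder) are hypothetical and chosen for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model (no HBase types): how row keys of the users table are ordered.
class UsersTableModel {
    // Unsigned, lexicographic byte comparison; a key that is a prefix of another sorts first.
    static final Comparator<byte[]> UNSIGNED_LEXICOGRAPHIC = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    };

    // Returns the row keys in the order a full-table scan would visit them.
    static List<String> scanOrder(String... rowKeys) {
        TreeMap<byte[], String> rows = new TreeMap<>(UNSIGNED_LEXICOGRAPHIC);
        for (String key : rowKeys) {
            rows.put(key.getBytes(StandardCharsets.UTF_8), key);
        }
        return new ArrayList<>(rows.values());
    }
}
```

For example, `scanOrder("jverne", "admin", "cdickens")` yields admin, cdickens, jverne. The admin row is the first row a scan reaches, which is why the observer below has to deal with scans as well as gets.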
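Observer Example
The following Observer coprocessor prevents the details of the user admin from being returned by a Get or Scan on the users table.
1. Write a class that implements the RegionObserver interface. In HBase 1.x this is typically done by extending the BaseRegionObserver adapter class.
2. Override the preGetOp() method (the preGet() method is deprecated) to check whether the client has requested the row whose rowkey is admin. If so, return a placeholder cell instead of the stored data; otherwise, process the request normally.
3. Put your code and its dependencies in a JAR file.
4. Place the JAR in HDFS, where HBase can locate it.
5. Load the coprocessor.
6. Write a simple program to test it.
The following is an implementation of these steps: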
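import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
// HBase 1.x: BaseRegionObserver supplies no-op defaults for every RegionObserver hook
public class RegionObserverExample extends BaseRegionObserver {
private static final byte[] ADMIN = Bytes.toBytes("admin");
private static final byte[] COLUMN_FAMILY = Bytes.toBytes("details");
private static final byte[] COLUMN = Bytes.toBytes("Admin_det");
private static final byte[] VALUE = Bytes.toBytes("You can't see Admin details");
@Override
public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get, final List<Cell> results)
throws IOException {
if (Bytes.equals(get.getRow(), ADMIN)) {
// Hand back one placeholder cell (type Put) instead of the stored admin data
Cell c = CellUtil.createCell(get.getRow(), COLUMN_FAMILY, COLUMN,
System.currentTimeMillis(), KeyValue.Type.Put.getCode(), VALUE);
results.add(c);
// Skip the default Get so the real admin cells are never read
e.bypass();
}
}
// Add the preScannerOpen() and postScannerNext() overrides shown below here
}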
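The preGetOp() decision can be traced without a cluster. The sketch below is a hypothetical plain-Java model: a cell is reduced to its value string, `Arrays.equals` stands in for `Bytes.equals`, and a boolean flag stands in for `e.bypass()`. PreGetOpModel, Outcome and the storedCells parameter are illustrative names, not HBase API.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of RegionObserverExample.preGetOp(); no HBase types are used.
class PreGetOpModel {
    static final byte[] ADMIN = "admin".getBytes(StandardCharsets.UTF_8);
    static final String PLACEHOLDER = "You can't see Admin details";

    // What the hook hands back, and whether the normal Get was skipped.
    static final class Outcome {
        final List<String> results;
        final boolean bypassed;

        Outcome(List<String> results, boolean bypassed) {
            this.results = results;
            this.bypassed = bypassed;
        }
    }

    // storedCells stands in for what the region would return for this row.
    static Outcome preGetOp(byte[] row, List<String> storedCells) {
        List<String> results = new ArrayList<>();
        if (Arrays.equals(row, ADMIN)) {       // Bytes.equals(get.getRow(), ADMIN)
            results.add(PLACEHOLDER);          // results.add(c)
            return new Outcome(results, true); // e.bypass(): stored cells are never read
        }
        results.addAll(storedCells);           // any other row: normal Get
        return new Outcome(results, false);
    }
}
```

A Get for admin therefore comes back with exactly one cell, the placeholder. A Get for jverne comes back unchanged.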
Overriding preGetOp() only affects Get operations. You also need to override the preScannerOpen() method to filter the admin row out of scan results.
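@Override
public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, final Scan scan,
final RegionScanner s) throws IOException {
// Needs imports for Scan, RegionScanner, Filter, RowFilter, BinaryComparator and CompareFilter.CompareOp
Filter filter = new RowFilter(CompareOp.NOT_EQUAL, new BinaryComparator(ADMIN));
// setFilter() replaces any filter already set on the scan
scan.setFilter(filter);
return s;
}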
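To see concretely what `scan.setFilter()` does to a filter the client already supplied, the hypothetical plain-Java model below treats a scan filter as a `Predicate` over row keys, where true means the row is kept. It contrasts replacing the client's filter with AND-ing it with the admin check. In HBase itself, the combined form would usually be built with `FilterList` and `FilterList.Operator.MUST_PASS_ALL` around `scan.getFilter()`. That mapping is given here as a pointer and is not exercised by this sketch. ScanFilterModel and its methods are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model: a scan filter is a Predicate over row keys (true = keep the row).
class ScanFilterModel {
    static final Predicate<String> NOT_ADMIN = row -> !row.equals("admin");

    // Mirrors preScannerOpen() above: the client's filter is discarded.
    static Predicate<String> replaceFilter(Predicate<String> clientFilter) {
        return NOT_ADMIN;
    }

    // Alternative: keep the client's filter and require the admin check as well.
    static Predicate<String> combineFilter(Predicate<String> clientFilter) {
        return clientFilter == null ? NOT_ADMIN : clientFilter.and(NOT_ADMIN);
    }

    // Applies a filter to rows in scan order.
    static List<String> scan(List<String> rows, Predicate<String> filter) {
        List<String> out = new ArrayList<>();
        for (String row : rows) {
            if (filter.test(row)) {
                out.add(row);
            }
        }
        return out;
    }
}
```

Consider a client that scans only rows starting with "c". With replacement, it gets back both cdickens and jverne. With combination, it gets back only cdickens.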
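This approach works, but it has a side effect.
If the client set a filter on its scan, scan.setFilter() replaces it with the admin filter, so the client's own filter is silently lost. Instead, you can explicitly remove any admin results from the scan: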
@Override
public boolean postScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e, final InternalScanner s,
final List<Result> results, final int limit, final boolean hasMore) throws IOException {
// Needs imports for Result, InternalScanner and java.util.Iterator
Iterator<Result> iterator = results.iterator();
while (iterator.hasNext()) {
Result result = iterator.next();
// No break: with Scan.setBatch() one row can be split across several Results
if (Bytes.equals(result.getRow(), ADMIN)) {
iterator.remove();
}
}
return hasMore;
}
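The post-filtering step can be checked in isolation. In the hypothetical plain-Java model below, each Result is reduced to its row key. The hook removes every admin entry from the batch in place, using `Iterator.remove()` so that the list is not modified behind the iterator's back, and it passes hasMore through unchanged. PostScannerNextModel is an illustrative name.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical model of postScannerNext(): a Result is represented by its row key.
class PostScannerNextModel {
    // Removes all admin entries from one batch (mutates the list) and returns hasMore as-is.
    static boolean postScannerNext(List<String> batch, boolean hasMore) {
        Iterator<String> it = batch.iterator();
        while (it.hasNext()) {
            if (it.next().equals("admin")) {
                it.remove(); // safe removal during iteration
            }
        }
        return hasMore; // the hook does not change whether more rows remain
    }
}
```

The model removes every admin entry rather than stopping at the first one. This covers the case where a batched scan returns the same row as more than one partial Result.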
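Endpoint Example
This example also uses the users table. It implements an endpoint coprocessor that computes the sum of all employees' gross salaries (the salaryDet:gross column).
1. Create a .proto file that defines the service: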
option java_package = "org.myname.hbase.coprocessor.autogenerated";
option java_outer_classname = "Sum";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
message SumRequest {
required string family = 1;
required string column = 2;
}
message SumResponse {
required int64 sum = 1 [default = 0];
}
service SumService {
rpc getSum(SumRequest)
returns (SumResponse);
}
2. Run protoc to generate Java code from the .proto file above:
$ mkdir src
$ protoc --java_out=src ./sum.proto
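This generates Sum.java, which contains the outer class Sum. The file is written under src/org/myname/hbase/coprocessor/autogenerated/, following the java_package option.
3. Write a class that extends the generated service class, implements the Coprocessor and CoprocessorService interfaces, and overrides the service method.
**Note:** If you load a coprocessor from hbase-site.xml and then load the same coprocessor again using the HBase Shell, it is loaded a second time. The same class then exists twice, and the second instance has a higher ID (and therefore a lower priority). The effect is that the duplicate coprocessor is effectively ignored.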
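import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.myname.hbase.coprocessor.autogenerated.Sum;
// HBase 1.x endpoint: getService() exposes this object as the RPC service on each region
public class SumEndPoint extends Sum.SumService implements Coprocessor, CoprocessorService {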
private RegionCoprocessorEnvironment env;
@Override
public Service getService() {
return this;
}
@Override
public void start(CoprocessorEnvironment env) throws IOException {
if (env instanceof RegionCoprocessorEnvironment) {
this.env = (RegionCoprocessorEnvironment)env;
} else {
throw new CoprocessorException("Must be loaded on a table region!");
}
}
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
// do nothing
}
@Override
public void getSum(RpcController controller, Sum.SumRequest request, RpcCallback<Sum.SumResponse> done) {
Scan scan = new Scan();
// addColumn() alone restricts the scan to family:column; an extra addFamily() call would be redundant
scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn()));
Sum.SumResponse response = null;
InternalScanner scanner = null;
try {
scanner = env.getRegion().getScanner(scan);
List<Cell> results = new ArrayList<>();
boolean hasMore = false;
long sum = 0L;
do {
hasMore = scanner.next(results);
for (Cell cell : results) {
// Assumes each value was written with Bytes.toBytes(long), i.e. as an 8-byte long
sum = sum + Bytes.toLong(CellUtil.cloneValue(cell));
}
results.clear();
} while (hasMore);
response = Sum.SumResponse.newBuilder().setSum(sum).build();
} catch (IOException ioe) {
ResponseConverter.setControllerException(controller, ioe);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {}
}
}
done.run(response);
}
}
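The summing loop in getSum() only works if every gross value is stored as an 8-byte long. `Bytes.toBytes(long)` writes that layout, and `Bytes.toLong(byte[])` expects it. A value typed into the HBase shell as the text '10000' is 5 bytes of characters and will not decode as intended. The hypothetical plain-Java model below reproduces the loop. It uses `ByteBuffer`, whose default big-endian layout matches `Bytes.toBytes(long)`, and it rejects values of the wrong width. GetSumModel, encode and sum are illustrative names.

```java
import java.nio.ByteBuffer;
import java.util.List;

// Hypothetical model of the loop in SumEndPoint.getSum(); no HBase types are used.
class GetSumModel {
    // Big-endian 8-byte encoding, the same layout as HBase's Bytes.toBytes(long).
    static byte[] encode(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
    }

    // Mirrors: sum = sum + Bytes.toLong(CellUtil.cloneValue(cell))
    static long sum(List<byte[]> cellValues) {
        long sum = 0L;
        for (byte[] value : cellValues) {
            if (value.length != Long.BYTES) {
                // e.g. the text "10000" is 5 bytes, not an encoded long
                throw new IllegalArgumentException("expected 8 bytes, got " + value.length);
            }
            sum += ByteBuffer.wrap(value).getLong();
        }
        return sum;
    }
}
```

With the gross values from the users table (10000 for cdickens and 12000 for jverne), the model returns 22000. The admin row has no salaryDet:gross cell, so it contributes nothing.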
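4. Load the coprocessor.
5. Write client code to call the coprocessor:
import java.io.IOException;
import java.util.Map;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.myname.hbase.coprocessor.autogenerated.Sum;
public class SumClient {
public static void main(String[] args) {
Configuration conf = HBaseConfiguration.create();
TableName tableName = TableName.valueOf("users");
final Sum.SumRequest request = Sum.SumRequest.newBuilder().setFamily("salaryDet").setColumn("gross").build();
// try-with-resources closes the Table and then the Connection
try (Connection connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(tableName)) {
// null start and end keys: call the endpoint on every region of the table
Map<byte[], Long> results = table.coprocessorService(
Sum.SumService.class,
null, /* start key */
null, /* end key */
new Batch.Call<Sum.SumService, Long>() {
@Override
public Long call(Sum.SumService aggregate) throws IOException {
BlockingRpcCallback<Sum.SumResponse> rpcCallback = new BlockingRpcCallback<>();
aggregate.getSum(null, request, rpcCallback);
Sum.SumResponse response = rpcCallback.get();
return response.hasSum() ? response.getSum() : 0L;
}
}
);
// One entry per region; each value is that region's partial sum
for (Long sum : results.values()) {
System.out.println("Sum = " + sum);
}
} catch (ServiceException e) {
e.printStackTrace();
} catch (Throwable e) {
e.printStackTrace();
}
}
}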
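The endpoint runs once per region, so the client receives one partial sum per region rather than a single total. When users spans several regions, the table-wide figure is the sum of the returned map's values. The hypothetical plain-Java sketch below shows that final step. The real map is keyed by region as `byte[]`; this model uses made-up String region names instead, and ClientAggregationModel is an illustrative name.

```java
import java.util.Map;

// Hypothetical model of client-side aggregation over coprocessorService() results.
class ClientAggregationModel {
    // Adds up the per-region partial sums; an empty map (no regions) gives 0.
    static long total(Map<String, Long> partialSumsByRegion) {
        long total = 0L;
        for (Long partial : partialSumsByRegion.values()) {
            total += partial;
        }
        return total;
    }
}
```

For example, if cdickens and jverne lived in two different regions, the two partial results 10000 and 12000 would add up to 22000.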