Counting the number of RowKeys in a table has always been an important task for HBase systems. There are currently three ways to perform this operation.
Test environment:
Apache hadoop-2.6.0 (the CDH build, hadoop-2.6.0-cdh5.5.2, also works)
Apache hbase-1.0.0 (I started with the CDH build, hbase-1.0.0-cdh5.5.2, but ran into all kinds of bugs, so I had to fall back to the Apache build)
jdk1.7.0_25
Create the table used for testing in HBase:
create 'scores','grade','course'
put 'scores','zhangsan01','course:math','99'
put 'scores','zhangsan01','course:art','90'
put 'scores','zhangsan01','grade:','101'
put 'scores','zhangsan02','course:math','66'
put 'scores','zhangsan02','course:art','60'
put 'scores','lisi01','course:math','89'
Method 1: MapReduce. Java code (a MapReduce job that counts the rows of one HBase table and writes the result into another HBase table):
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class ReadWriteHBase1 {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String hbaseTableName1 = "scores";
        String hbaseTableName2 = "mytb2";
        prepareTB2(hbaseTableName2);

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(ReadWriteHBase1.class);
        job.setJobName("mrreadwritehbase");

        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false);

        TableMapReduceUtil.initTableMapperJob(hbaseTableName1, scan, doMapper.class, Text.class, IntWritable.class, job);
        TableMapReduceUtil.initTableReducerJob(hbaseTableName2, doReducer.class, job);
        // exit with 0 on success, 1 on failure
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class doMapper extends TableMapper<Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private final static Text hui = new Text("count");

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            // emit ("count", 1) for every row of the source table
            context.write(hui, one);
        }
    }

    public static class doReducer extends TableReducer<Text, IntWritable, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            System.out.println(key.toString());
            int sum = 0;
            Iterator<IntWritable> haha = values.iterator();
            while (haha.hasNext()) {
                sum += haha.next().get();
            }
            // write the total row count into the target table
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes("mycolumnfamily"), Bytes.toBytes("sum"), Bytes.toBytes(String.valueOf(sum)));
            context.write(NullWritable.get(), put);
        }
    }

    public static void prepareTB2(String hbaseTableName) throws IOException {
        HTableDescriptor tableDesc = new HTableDescriptor(hbaseTableName);
        HColumnDescriptor columnDesc = new HColumnDescriptor("mycolumnfamily");
        tableDesc.addFamily(columnDesc);
        Configuration cfg = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(cfg);
        if (admin.tableExists(hbaseTableName)) {
            System.out.println("Table exists, trying drop and create!");
            admin.disableTable(hbaseTableName);
            admin.deleteTable(hbaseTableName);
            admin.createTable(tableDesc);
        } else {
            System.out.println("create table: " + hbaseTableName);
            admin.createTable(tableDesc);
        }
        admin.close();
    }
}
Run:
[hadoop@h40 q1]$ /usr/jdk1.7.0_25/bin/javac ReadWriteHBase1.java
[hadoop@h40 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar ReadWriteHBase1*class
[hadoop@h40 q1]$ hadoop jar xx.jar ReadWriteHBase1
hbase(main):021:0> scan 'scores'
ROW                     COLUMN+CELL
 lisi01                 column=course:math, timestamp=1492420326312, value=89
 zhangsan01             column=course:art, timestamp=1492420325361, value=90
 zhangsan01             column=course:math, timestamp=1492420325331, value=99
 zhangsan01             column=grade:, timestamp=1492420325397, value=101
 zhangsan02             column=course:art, timestamp=1492420325500, value=60
 zhangsan02             column=course:math, timestamp=1492420325441, value=66
3 row(s) in 0.0120 seconds
The scores table has three rows of data, and the table mytb2 does not exist yet. After running the code, check the mytb2 table:
hbase(main):003:0> scan 'mytb2'
ROW                     COLUMN+CELL
 count                  column=mycolumnfamily:sum, timestamp=1489762697539, value=3
1 row(s) in 0.0280 seconds
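As an aside, if you only need the number and not a custom job, HBase also ships a ready-made MapReduce row counter, org.apache.hadoop.hbase.mapreduce.RowCounter. A minimal invocation (assuming the hbase launcher script is on the PATH, as in the commands above) looks like this, and the count appears in the job counters:
hbase org.apache.hadoop.hbase.mapreduce.RowCounter scores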
Method 2: Scan with a KeyOnlyFilter (the code below actually uses FirstKeyOnlyFilter, which returns only the first key of each row). With the help of a filter, as much of the counting as possible happens on the RegionServer side, which reduces the load on the client. However, in most cases every region still has to be scanned, so when the table is large the latency becomes very long and the user experience suffers.
Java code:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;

public class rowCount {

    public static Configuration getConfiguration() {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://192.168.8.40:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "h40:2181,h41:2181,h42:2181");
        return conf;
    }

    public static void main(String[] args) throws Exception {
        // table name
        rowCount("mytb1");
    }

    public static long rowCount(String tableName) {
        long rowCount = 0;
        try {
            HTable table = new HTable(getConfiguration(), tableName);
            Scan scan = new Scan();
            // FirstKeyOnlyFilter returns only the first KeyValue of each row,
            // so each Result carries exactly one cell and result.size() is 1 per row
            scan.setFilter(new FirstKeyOnlyFilter());
            ResultScanner resultScanner = table.getScanner(scan);
            for (Result result : resultScanner) {
                rowCount += result.size();
            }
            resultScanner.close();
            table.close();
            System.out.println("rowCount-->" + rowCount);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return rowCount;
    }
}
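For a quick comparison, the HBase shell's built-in count command takes the same client-side scanning approach, so it shares the latency problem on large tables; CACHE and INTERVAL only tune the scanner caching and how often progress is printed. A typical call against the test table would be:
count 'scores', INTERVAL => 1000, CACHE => 500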
Method 3: Coprocessors
1. HBase's built-in org.apache.hadoop.hbase.coprocessor.AggregateImplementation, which can be used to count the total number of rows in a table.
Option 1:
Enable aggregation for a specific table only, via the HBase shell.
(1) Disable the target table
hbase(main):036:0> disable 'scores'
(2) Add the aggregation coprocessor
alter 'scores', METHOD => 'table_att','coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||'
Note: copy the class name exactly as written; the 'h' in org.apache.hadoop.hbase.coprocessor.AggregateImplementation is lowercase (some blog platforms automatically highlight and capitalize keywords, which silently corrupts the class name).
(3) Re-enable the table
hbase(main):038:0> enable 'scores'
(4) Check whether the coprocessor was loaded
hbase(main):039:0> describe 'scores'
Table scores is ENABLED
scores, {TABLE_ATTRIBUTES => {coprocessor$1 => '|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||'}
COLUMN FAMILIES DESCRIPTION
{NAME => 'course', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}
{NAME => 'grade', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}
2 row(s) in 0.0350 seconds
(5) If you want to remove the coprocessor:
hbase(main):040:0> alter 'scores' ,METHOD=>'table_att_unset',NAME=>'coprocessor$1'
Option 2: enable aggregation globally, so it can operate on the data of all tables. This is done by modifying hbase-site.xml on every node of the HBase cluster; just add the following:
<property>
  <name>hbase.coprocessor.user.region.classes</name>
  <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>
</property>
Note: Option 1 does not require restarting the HBase cluster, whereas Option 2 does.
Java code:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
import org.apache.hadoop.hbase.util.Bytes;

public class MyAggregationClient {
    public static void main(String[] args) throws Throwable {
        Configuration customConf = new Configuration();
        customConf.set("hbase.zookeeper.quorum", "h40:2181,h41:2181,h42:2181");
        // raise the RPC timeout
        customConf.setLong("hbase.rpc.timeout", 600000);
        // set the scanner caching
        customConf.setLong("hbase.client.scanner.caching", 1000);
        Configuration configuration = HBaseConfiguration.create(customConf);
        AggregationClient aggregationClient = new AggregationClient(configuration);

        Scan scan = new Scan();
        // count rows for a single column family
//      scan.addFamily(Bytes.toBytes("grade"));
        // count rows for a single column
//      scan.addColumn(Bytes.toBytes("course"), Bytes.toBytes("art"));

        /*
         * Note: some examples found online pass a byte[] table name and a null
         * interpreter, e.g.
         *     byte[] TABLE_NAME = Bytes.toBytes("scores");
         *     long rowCount = aggregationClient.rowCount(TABLE_NAME, null, scan);
         * but that does not compile here: rowCount() expects a TableName and a
         * ColumnInterpreter. The call below is the one that works.
         */
        long rowCount = aggregationClient.rowCount(TableName.valueOf("scores"),
                new LongColumnInterpreter(), scan);
        System.out.println("row count is " + rowCount);
    }
}
I ran this code directly in MyEclipse; you can also run it on Linux.
Counting by table name, the result is: row count is 3
Counting by column family, the result is: row count is 1 (just uncomment scan.addFamily(Bytes.toBytes("grade"));)
Counting by column qualifier (uncomment scan.addColumn(Bytes.toBytes("course"), Bytes.toBytes("art"));):
the expected result is row count is 2, but the actual result is row count is 3.
(I don't understand this one: HBase provides addColumn(byte[] family, byte[] qualifier), yet the result is not what I expected. If anyone knows why, please let me know, thanks.)
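To double-check what the expected value should be, here is a minimal cross-check sketch that counts the rows actually containing course:art with a plain client-side scan (the class name ColumnRowCountCheck is just for illustration, and the ZooKeeper quorum is assumed to be the same as above). On the test data it prints 2:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class ColumnRowCountCheck {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "h40:2181,h41:2181,h42:2181");
        Scan scan = new Scan();
        // only rows that actually contain a course:art cell are returned
        scan.addColumn(Bytes.toBytes("course"), Bytes.toBytes("art"));
        HTable table = new HTable(conf, "scores");
        ResultScanner scanner = table.getScanner(scan);
        long rows = 0;
        for (Result r : scanner) {
            rows++; // one Result per matching row
        }
        scanner.close();
        table.close();
        System.out.println("rows with course:art = " + rows); // 2 on the test data
    }
}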
2. Custom Endpoint:
(1) First download the protobuf compiler, protobuf-2.5.0.tar.gz, from: http://download.csdn.net/detail/m0_37739193/9901063
[root@h40 usr]# tar -zxvf protobuf-2.5.0.tar.gz
[root@h40 usr]# cd protobuf-2.5.0
Before building, the build toolchain has to be installed; my Linux here is a Red Hat release:
[root@h40 ~]# yum -y install gcc*
[root@h40 protobuf-2.5.0]# ./configure --prefix=/usr/protobuf-2.5.0
(Without this option, protobuf installs its binaries into /usr/local/bin by default.)
[root@h40 protobuf-2.5.0]# make && make install
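A quick way to confirm the build worked (the path follows the --prefix chosen above):
/usr/protobuf-2.5.0/bin/protoc --version    # should print libprotoc 2.5.0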
(2) Write the .proto file
[root@h40 protobuf-2.5.0]# vi hehe.proto
option java_package = "com.cxk.coprocessor.test.generated";
option java_outer_classname = "CXKTestProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
message CountRequest {
}

message CountResponse {
  required int64 count = 1 [default = 0];
}

service RowCountService {
  rpc getRowCount(CountRequest) returns (CountResponse);
}
After writing this definition, use the protoc tool we just installed to compile it into the Java code we need, with the following command:
[root@h40 protobuf-2.5.0]# bin/protoc --java_out=/usr hehe.proto
CXKTestProtos.java is generated under com.cxk.coprocessor.test.generated:
[root@h40 generated]# ls
CXKTestProtos.java
[root@h40 generated]# pwd
/usr/com/cxk/coprocessor/test/generated
(3) Define your own Endpoint class (implementing your own service method):
package com.cxk.coprocessor.test.generated;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;

public class RowCountEndpoint extends CXKTestProtos.RowCountService implements Coprocessor, CoprocessorService {

    private RegionCoprocessorEnvironment env;

    public RowCountEndpoint() {
    }

    @Override
    public Service getService() {
        return this;
    }

    /**
     * Count the total number of rows in the HBase table.
     */
    @Override
    public void getRowCount(RpcController controller, CXKTestProtos.CountRequest request,
            RpcCallback<CXKTestProtos.CountResponse> done) {
        Scan scan = new Scan();
        scan.setFilter(new FirstKeyOnlyFilter());
        CXKTestProtos.CountResponse response = null;
        InternalScanner scanner = null;
        try {
            scanner = env.getRegion().getScanner(scan);
            List<Cell> results = new ArrayList<Cell>();
            boolean hasMore = false;
            byte[] lastRow = null;
            long count = 0;
            do {
                hasMore = scanner.next(results);
                for (Cell kv : results) {
                    byte[] currentRow = CellUtil.cloneRow(kv);
                    // count each row key only once
                    if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
                        lastRow = currentRow;
                        count++;
                    }
                }
                results.clear();
            } while (hasMore);
            response = CXKTestProtos.CountResponse.newBuilder().setCount(count).build();
        } catch (IOException ioe) {
            ResponseConverter.setControllerException(controller, ioe);
        } finally {
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (IOException ignored) {
                }
            }
        }
        done.run(response);
    }

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        if (env instanceof RegionCoprocessorEnvironment) {
            this.env = (RegionCoprocessorEnvironment) env;
        } else {
            throw new CoprocessorException("Must be loaded on a table region!");
        }
    }

    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        // nothing to do
    }
}
(4) Implement your own client.
TestEndPoint.java is as follows:
package com.cxk.coprocessor.test.generated;

import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import com.cxk.coprocessor.test.generated.CXKTestProtos.RowCountService;
import com.google.protobuf.ServiceException;

public class TestEndPoint {
    /**
     * @param args args[0] master ip, args[1] zk ip, args[2] table name
     * @throws ServiceException
     * @throws Throwable
     */
    public static void main(String[] args) throws ServiceException, Throwable {
        System.out.println("begin.....");
        long begin_time = System.currentTimeMillis();

        Configuration config = HBaseConfiguration.create();
        String master_ip = args[0];
        // String master_ip = "h40";
        String zk_ip = args[1];
        // String zk_ip = "h40";
        String table_name = args[2];
        // String table_name = "scores";
        config.set("hbase.zookeeper.property.clientPort", "2181");
        config.set("hbase.zookeeper.quorum", zk_ip);
        config.set("hbase.master", master_ip + ":600000");

        final CXKTestProtos.CountRequest request = CXKTestProtos.CountRequest.getDefaultInstance();
        HTable table = new HTable(config, table_name);

        Map<byte[], Long> results = table.coprocessorService(RowCountService.class, null, null,
                new Batch.Call<CXKTestProtos.RowCountService, Long>() {
                    public Long call(CXKTestProtos.RowCountService counter) throws IOException {
                        ServerRpcController controller = new ServerRpcController();
                        BlockingRpcCallback<CXKTestProtos.CountResponse> rpcCallback =
                                new BlockingRpcCallback<CXKTestProtos.CountResponse>();
                        counter.getRowCount(controller, request, rpcCallback);
                        CXKTestProtos.CountResponse response = rpcCallback.get();
                        if (controller.failedOnException()) {
                            throw controller.getFailedOn();
                        }
                        return (response != null && response.hasCount()) ? response.getCount() : 0;
                    }
                });
        table.close();

        if (results.size() > 0) {
            // results.values() prints as "[n]"; strip the brackets so only the number is shown.
            // Note: with more than one region this prints the per-region counts; sum them for
            // the total, as RowCountClient further below does.
            String hui = results.values().toString();
            String qiang = hui.substring(1, hui.length() - 1);
            System.out.println(qiang);
        } else {
            System.out.println("no result returned");
        }
        long end_time = System.currentTimeMillis();
        System.out.println("end:" + (end_time - begin_time));
    }
}
(5) Put RowCountEndpoint.java, TestEndPoint.java, and the generated CXKTestProtos.java into the same MyEclipse project,
then package the project as rowcount.jar.
Note: use the second export/packaging method in MyEclipse (the one that bundles the dependency jars into the archive); otherwise the java -jar command used later will not run. I chose it here precisely because I want to run that command; since all the required jars are bundled, packaging is slower. If you do not need java -jar, the first packaging method works just as well and is faster.
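If you prefer the command line to MyEclipse, a rough equivalent (a sketch, assuming the three sources sit in the current directory and the hbase script is on the PATH so `hbase classpath` can supply the dependencies) is to compile the sources into their package directories and jar them up. A plain class jar like this is fine for the "copy into HBase's lib directory" deployment below, but not for java -jar, which needs the bundled runnable jar described above:
/usr/jdk1.7.0_25/bin/javac -cp "$(hbase classpath)" -d . CXKTestProtos.java RowCountEndpoint.java TestEndPoint.java
/usr/jdk1.7.0_25/bin/jar cvf rowcount.jar com/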
(6) Deploying the endpoint:
1.1 For a specific table:
1.1.1
Copy rowcount.jar into HBase's lib directory on all three machines, and be sure to restart the HBase cluster.
Then run:
alter 'scores', METHOD => 'table_att','coprocessor'=>'|com.cxk.coprocessor.test.generated.RowCountEndpoint||'
1.1.2
Upload rowcount.jar to HDFS:
[hadoop@h40 ~]$ hadoop fs -mkdir /in
[hadoop@h40 ~]$ hadoop fs -put rowcount.jar /in
Then run:
alter 'scores' , METHOD =>'table_att','coprocessor'=>
'hdfs://h40:9000/in/rowcount.jar|com.cxk.coprocessor.test.generated.RowCountEndpoint|1001|arg1=1,arg2=2'
(This approach did not work for me and threw errors I could not recover from, so use it with caution.)
1.2 For all tables:
Modify hbase-site.xml so that this endpoint is loaded for every table.
On the master node:
[hadoop@h40 ~]$ vi hbase-1.0.0/conf/hbase-site.xml
Add the following:
<property>
  <name>hbase.coprocessor.user.region.classes</name>
  <value>com.cxk.coprocessor.test.generated.RowCountEndpoint</value>
</property>
Do the same on the other two nodes (my HBase cluster is built on three machines: h40, h41, and h42):
[hadoop@h40 ~]$ scp hbase-1.0.0/conf/hbase-site.xml h41:/home/hadoop/hbase-1.0.0/conf/
[hadoop@h40 ~]$ scp hbase-1.0.0/conf/hbase-site.xml h42:/home/hadoop/hbase-1.0.0/conf/
Copy rowcount.jar into HBase's lib directory on all three machines, then restart the HBase cluster.
(7) Running it:
1.1
Run TestEndPoint.java directly in MyEclipse. This requires mapping your VM IPs in C:\Windows\System32\drivers\etc\hosts by adding:
192.168.8.40 h40
192.168.8.41 h41
192.168.8.42 h42
Also, TestEndPoint.java then no longer needs command-line arguments; comment out the args-based lines and hard-code the values like this:
String master_ip = "h40";
String zk_ip = "h40";
String table_name = "scores";
Result:
1.2
Run TestEndPoint.java on Linux:
[hadoop@h40 q1]$ /usr/jdk1.7.0_25/bin/javac TestEndPoint.java
(This step may fail to compile; if so, add /home/hadoop/hbase-1.0.0/lib/* to the CLASSPATH in your .bash_profile.)
[hadoop@h40 q1]$ /usr/jdk1.7.0_25/bin/java TestEndPoint h40 h40 scores
1.3
Run it with java -jar on Linux:
[hadoop@h40 lib]$ /usr/jdk1.7.0_25/bin/java -jar rowcount.jar h40 h40 scores
The code above counts rows by table name; the code below counts rows by column family and column qualifier. The deployment steps are the same as above, so I will not repeat them; here is the key code.
(In principle all of these could be combined into a single endpoint; I will dig into that when I have time. For now, here are the separate examples.)
option java_package = "com.tencent.yun.endpoint.proto";
option java_outer_classname = "RowCountService";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

message RowCountRequest {
  required string family = 1;
  required string column = 2;
}

message RowCountResponse {
  required int64 rowCount = 1 [default = 0];
}

service RowCount {
  rpc getRowCount(RowCountRequest) returns (RowCountResponse);
}
(If you only want to count by column family and not filter on a column qualifier, remove the line required string column = 2;.)
package com.tencent.yun.endpoint.proto;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.tencent.yun.endpoint.proto.RowCountService;
import com.tencent.yun.endpoint.proto.RowCountService.RowCountRequest;
import com.tencent.yun.endpoint.proto.RowCountService.RowCountResponse;

public class RowCountEndPoint extends RowCountService.RowCount implements Coprocessor, CoprocessorService {

    private RegionCoprocessorEnvironment env;

    public Service getService() {
        return this;
    }

    public void start(CoprocessorEnvironment env) throws IOException {
        if (env instanceof RegionCoprocessorEnvironment) {
            this.env = (RegionCoprocessorEnvironment) env;
        } else {
            throw new CoprocessorException("Must be loaded on a table region!");
        }
    }

    public void stop(CoprocessorEnvironment arg0) throws IOException {
        // do nothing
    }

    @Override
    public void getRowCount(RpcController controller, RowCountRequest request,
            RpcCallback<RowCountResponse> done) {
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes(request.getFamily()));
        scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn()));
        // scan.setMaxVersions(1);
        InternalScanner scanner = null;
        RowCountResponse response = null;
        long count = 0L;
        try {
            List<Cell> results = new ArrayList<Cell>();
            boolean hasMore = false;
            scanner = env.getRegion().getScanner(scan);
            do {
                hasMore = scanner.next(results);
                for (Cell cell : results) {
                    // note: this increments once per Cell, not once per row, so with a
                    // family-only scan the result is a cell count rather than a row count
                    count++;
                    // count = count + Bytes.toLong(CellUtil.cloneValue(cell));
                }
                results.clear();
            } while (hasMore);
            response = RowCountResponse.newBuilder().setRowCount(count).build();
        } catch (IOException e) {
            ResponseConverter.setControllerException(controller, e);
        } finally {
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (IOException ignored) {
                }
            }
        }
        done.run(response);
    }
}
package com.tencent.yun.endpoint.proto;

import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import com.google.protobuf.ServiceException;
import com.tencent.yun.endpoint.proto.RowCountService.RowCount;
import com.tencent.yun.endpoint.proto.RowCountService.RowCountRequest;
import com.tencent.yun.endpoint.proto.RowCountService.RowCountResponse;
public class RowCountClient {

    public static void testRowCountEndpoint(String tableName, String family, String col) throws IOException {
        System.out.println("begin test.....");
        long t1 = System.currentTimeMillis();
        Configuration config = HBaseConfiguration.create();
        // HBase ZooKeeper quorum
        config.set("hbase.zookeeper.quorum", "192.168.8.40:2181,192.168.8.41:2181,192.168.8.42:2181");

        // count by column qualifier
        final RowCountRequest req = RowCountRequest.newBuilder().setFamily(family).setColumn(col).build();
        // count by column family only
        // final RowCountRequest req = RowCountRequest.newBuilder().setFamily(family).build();

        Connection con = null;
        Table table = null;
        try {
            con = ConnectionFactory.createConnection(config);
            table = con.getTable(TableName.valueOf(tableName));
            Map<byte[], Long> results = table.coprocessorService(RowCount.class, null, null,
                    new Batch.Call<RowCount, Long>() {
                        public Long call(RowCount instance) throws IOException {
                            ServerRpcController controller = new ServerRpcController();
                            BlockingRpcCallback<RowCountResponse> rpccall = new BlockingRpcCallback<RowCountResponse>();
                            instance.getRowCount(controller, req, rpccall);
                            RowCountResponse resp = rpccall.get();
                            return resp.hasRowCount() ? resp.getRowCount() : 0L;
                        }
                    });
            // sum the per-region counts
            long count = 0L;
            for (Long sum : results.values()) {
                // System.out.println("region row Sum = " + sum);
                count += sum;
            }
            System.out.println("total count = " + count);
            long t2 = System.currentTimeMillis();
            System.out.println("use time = " + (t2 - t1));
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ServiceException e) {
            e.printStackTrace();
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            if (table != null) {
                table.close();
            }
            if (con != null) {
                con.close();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        String tableName = "scores";
        String family = "course";
        String col = "math";
        testRowCountEndpoint(tableName, family, col);
    }
}
Counting by column qualifier, with String tableName = "scores"; String family = "course"; String col = "art";
the result is: total count = 2
Counting by column family only, with String tableName = "scores"; String family = "course"; String col = null; the expected result is total count = 3, but the actual result is total count = 5. The reason is visible in the endpoint code above: getRowCount increments count once per Cell rather than once per row, and the course family holds five cells across the three rows, so a family-only scan returns a cell count instead of a row count. Counting by a single qualifier still comes out right because each row contributes at most one cell for that column.
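Both points above (merging the two variants into one endpoint, and getting a row count instead of a cell count when only a family is given) can be handled in one place. A minimal sketch, assuming the column field in the .proto is changed from required to optional so that the generated request exposes hasColumn(), and that CellUtil is imported as in the first endpoint: add the qualifier only when it is present, and count distinct row keys rather than cells.

// sketch of the scan/count portion of getRowCount(), not the original code above
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes(request.getFamily()));
if (request.hasColumn()) {
    // a qualifier was supplied: restrict the scan to that single column
    scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn()));
}
InternalScanner scanner = env.getRegion().getScanner(scan);
List<Cell> results = new ArrayList<Cell>();
long count = 0;
byte[] lastRow = null;
boolean hasMore;
do {
    hasMore = scanner.next(results);
    for (Cell cell : results) {
        byte[] row = CellUtil.cloneRow(cell);
        // count each row key once, no matter how many cells it has in the family
        if (lastRow == null || !Bytes.equals(lastRow, row)) {
            lastRow = row;
            count++;
        }
    }
    results.clear();
} while (hasMore);
scanner.close();
// wrap the result as before: RowCountResponse.newBuilder().setRowCount(count).build()

With this change, the family-only case returns 3 on the test data and the single-column case still returns 2.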
To quantify the difference between the Scan + KeyOnlyFilter approach and the Coprocessor approach, the time of 500 operations of each kind was recorded and compared.
The comparison shows that most Coprocessor-based RowCount calls finish in under 1 second, while the Scan-based approach takes roughly 4 to 5 seconds to return the RowKey count. (Note: the table examined has about 30,000 RowKeys.)
So what exactly gives the Coprocessor such a clear advantage when counting RowKeys?
Once the coprocessor is registered on the table, an AggregationClient call spreads the RowCount work across every Region of the table; within each Region the count is computed by the RegionServer hosting it, invoked over RPC and executed with an InternalScanner.
The performance gain therefore comes from two things:
1) Distributed counting. Instead of a single client scanning the whole RowKey range and then tallying, every RegionServer hosting the table's Regions computes its share at the same time.
2) The scan runs inside the RegionServer through InternalScanner, the scanner interface closest to the actual storage, so data access is faster.
References:
http://www.binospace.com/index.php/make-your-hbase-better-2/
http://blog.csdn.net/jameshadoop/article/details/42645551
http://blog.csdn.net/yangzongzhao/article/details/24306775
https://www.qcloud.com/document/product/407/4785