Counting rows (RowCount) in HBase

Counting the number of row keys in a table has always been a common task for an HBase system. There are currently three ways to do it.


Test environment:

Apache hadoop-2.6.0 (the CDH build hadoop-2.6.0-cdh5.5.2 also works)
Apache hbase-1.0.0 (I started with the CDH build hbase-1.0.0-cdh5.5.2, ran into all kinds of bugs, and had to fall back to the Apache release)
jdk1.7.0_25


Create the table used for testing in HBase:

create 'scores','grade','course'
put 'scores','zhangsan01','course:math','99'
put 'scores','zhangsan01','course:art','90'
put 'scores','zhangsan01','grade:','101'
put 'scores','zhangsan02','course:math','66'
put 'scores','zhangsan02','course:art','60'
put 'scores','lisi01','course:math','89'
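
With the table in place, the quickest sanity check is the HBase shell's own count command (it scans the whole table from the client side, so it is only practical on small tables); for the data above it should report 3 rows:

hbase(main):001:0> count 'scores'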


Method 1: MapReduce. TableInputFormat can be used to split the scan by row-key ranges, but the job consumes cluster resources; because it is submitted to the Hadoop cluster it often has to wait for resources, the latency is high, and it is not suitable for frequent calls from an application. (I did not use the TableInputFormat approach directly and have not worked out how to do a rowcount with it; something to look into when there is time.)
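
For completeness: HBase also ships a ready-made MapReduce row counter, so if all you need is the number (and a MapReduce job is acceptable) you do not have to write the job yourself. As far as I know it is invoked like this (not something tested in this post):

[hadoop@h40 ~]$ hbase org.apache.hadoop.hbase.mapreduce.RowCounter scores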

Java code (a MapReduce job that counts the rows of one HBase table and writes the result into another HBase table):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class ReadWriteHBase1 {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String hbaseTableName1 = "scores";   // source table
        String hbaseTableName2 = "mytb2";    // result table
        prepareTB2(hbaseTableName2);

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(ReadWriteHBase1.class);
        job.setJobName("mrreadwritehbase");

        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false);

        TableMapReduceUtil.initTableMapperJob(hbaseTableName1, scan, doMapper.class, Text.class, IntWritable.class, job);
        TableMapReduceUtil.initTableReducerJob(hbaseTableName2, doReducer.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    // Emit ("count", 1) for every row of the source table
    public static class doMapper extends TableMapper<Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private final static Text hui = new Text("count");

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            context.write(hui, one);
        }
    }

    // Sum the 1s and write the total into the result table
    public static class doReducer extends TableReducer<Text, IntWritable, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            System.out.println(key.toString());
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes("mycolumnfamily"), Bytes.toBytes("sum"), Bytes.toBytes(String.valueOf(sum)));
            context.write(NullWritable.get(), put);
        }
    }

    // Create (or recreate) the result table
    public static void prepareTB2(String hbaseTableName) throws IOException {
        HTableDescriptor tableDesc = new HTableDescriptor(hbaseTableName);
        HColumnDescriptor columnDesc = new HColumnDescriptor("mycolumnfamily");
        tableDesc.addFamily(columnDesc);
        Configuration cfg = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(cfg);
        if (admin.tableExists(hbaseTableName)) {
            System.out.println("Table exists, trying drop and create!");
            admin.disableTable(hbaseTableName);
            admin.deleteTable(hbaseTableName);
            admin.createTable(tableDesc);
        } else {
            System.out.println("create table: " + hbaseTableName);
            admin.createTable(tableDesc);
        }
    }
}

Run:
[hadoop@h40 q1]$ /usr/jdk1.7.0_25/bin/javac ReadWriteHBase1.java
[hadoop@h40 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar ReadWriteHBase1*class
[hadoop@h40 q1]$ hadoop jar xx.jar ReadWriteHBase1 

hbase(main):021:0> scan 'scores'
ROW                  COLUMN+CELL
 lisi01              column=course:math, timestamp=1492420326312, value=89
 zhangsan01          column=course:art, timestamp=1492420325361, value=90
 zhangsan01          column=course:math, timestamp=1492420325331, value=99
 zhangsan01          column=grade:, timestamp=1492420325397, value=101
 zhangsan02          column=course:art, timestamp=1492420325500, value=60
 zhangsan02          column=course:math, timestamp=1492420325441, value=66
3 row(s) in 0.0120 seconds

The scores table has three rows, and table mytb2 does not exist yet. After running the job, check the mytb2 table:

hbase(main):003:0> scan 'mytb2'
ROW                  COLUMN+CELL
 count               column=mycolumnfamily:sum, timestamp=1489762697539, value=3
1 row(s) in 0.0280 seconds

Method 2: Scan with a FirstKeyOnlyFilter. The filter pushes as much of the work as possible to the RegionServer side, reducing the load on the client. However, every region of the table still has to be scanned, so on a large table the latency becomes very long and the user experience suffers.

Java code:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;

public class rowCount {

    public static Configuration getConfiguration() {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://192.168.8.40:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "h40:2181,h41:2181,h42:2181");
        return conf;
    }

    public static void main(String[] args) throws Exception {
        rowCount("mytb1");  // table name
    }

    public static long rowCount(String tableName) {
        long rowCount = 0;
        try {
            HTable table = new HTable(getConfiguration(), tableName);
            Scan scan = new Scan();
            // FirstKeyOnlyFilter returns only the first cell of each row,
            // so each Result corresponds to exactly one row
            scan.setFilter(new FirstKeyOnlyFilter());
            ResultScanner resultScanner = table.getScanner(scan);
            for (Result result : resultScanner) {
                rowCount += result.size();
            }
            System.out.println("rowCount-->" + rowCount);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return rowCount;
    }
}
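
The HTable constructor is deprecated in HBase 1.0. A minimal sketch of the same counting logic using the Connection/Table API (my own variant, assuming the same ZooKeeper quorum as above) looks like this:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;

public class RowCountWithConnection {

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "h40:2181,h41:2181,h42:2181");

        long rowCount = 0;
        // try-with-resources closes the connection, table and scanner for us
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("scores"))) {
            Scan scan = new Scan();
            scan.setFilter(new FirstKeyOnlyFilter());  // one cell per row is enough for counting
            try (ResultScanner scanner = table.getScanner(scan)) {
                for (Result r : scanner) {
                    rowCount++;  // each Result is one row
                }
            }
        }
        System.out.println("rowCount-->" + rowCount);
    }
}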

Method 3: Coprocessors

1. HBase's built-in org.apache.hadoop.hbase.coprocessor.AggregateImplementation; with this class you can count the total number of rows in a table.

Option 1:

Enable aggregation per table, so it only takes effect on the specified table. This is done through the HBase shell.
(1) Disable the table
hbase(main):036:0> disable 'scores'
(2) Add the aggregation coprocessor
alter 'scores', METHOD => 'table_att','coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||'
Note: some sites (CSDN included) automatically highlight keywords and capitalize their first letter, so be careful: the h in the hadoop of org.apache.hadoop.hbase.coprocessor.AggregateImplementation is lowercase, not uppercase.
(3) Re-enable the table
hbase(main):038:0> enable 'scores'
(4) Check that the coprocessor was loaded
hbase(main):039:0> describe 'scores'
Table scores is ENABLED                                                                                                                                                                                                                      
scores, {TABLE_ATTRIBUTES => {coprocessor$1 => '|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||'}                                                                                                                             
COLUMN FAMILIES DESCRIPTION                                                                                                                                                                                                                  
{NAME => 'course', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}
{NAME => 'grade', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}
2 row(s) in 0.0350 seconds
(5) If you want to remove the coprocessor:
hbase(main):040:0> alter 'scores' ,METHOD=>'table_att_unset',NAME=>'coprocessor$1'

Option 2: enable aggregation globally, so it applies to all tables. This is done by editing hbase-site.xml on every node of the HBase cluster and adding the following:

<property>
  <name>hbase.coprocessor.user.region.classes</name>
  <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>
</property>
Note: Option 1 does not require restarting the HBase cluster; Option 2 does.

Java code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.util.Bytes;

public class MyAggregationClient {

    public static void main(String[] args) throws Throwable {
        Configuration customConf = new Configuration();
        customConf.set("hbase.zookeeper.quorum", "h40:2181,h41:2181,h42:2181");
        // raise the RPC timeout
        customConf.setLong("hbase.rpc.timeout", 600000);
        // set scanner caching
        customConf.setLong("hbase.client.scanner.caching", 1000);
        Configuration configuration = HBaseConfiguration.create(customConf);

        AggregationClient aggregationClient = new AggregationClient(configuration);
        Scan scan = new Scan();
        // count rows within one column family
        // scan.addFamily(Bytes.toBytes("grade"));
        // count rows for one column
        // scan.addColumn(Bytes.toBytes("course"), Bytes.toBytes("art"));

        // Note: in this HBase version rowCount() takes a TableName, not a byte[] table name,
        // so passing Bytes.toBytes("scores") does not compile; use TableName.valueOf("scores")
        // together with a LongColumnInterpreter.
        long rowCount = aggregationClient.rowCount(TableName.valueOf("scores"),
                new LongColumnInterpreter(), scan);
        System.out.println("row count is " + rowCount);
    }
}
I ran this code directly in MyEclipse; you can also run it on Linux.

Counting by table name gives: row count is 3
Counting by column family gives: row count is 1 (just uncomment scan.addFamily(Bytes.toBytes("grade")); — only zhangsan01 has a cell in the grade family)
Counting by column (uncomment scan.addColumn(Bytes.toBytes("course"), Bytes.toBytes("art")); ):
Expected: row count is 2. Actual: row count is 3.
(I do not understand this one: HBase provides addColumn(byte[] family, byte[] qualifier), yet the result is not what I expected. If anyone knows why, please let me know — thanks.)


2. Custom Endpoint coprocessor:

(1) First download the protobuf compiler, protobuf-2.5.0.tar.gz, from: http://download.csdn.net/detail/m0_37739193/9901063

Then install it as follows:
[root@h40 usr]# tar -zxvf protobuf-2.5.0.tar.gz
[root@h40 usr]# cd protobuf-2.5.0
Before compiling, make sure the build toolchain is installed; my Linux here is a Red Hat variant:
[root@h40 ~]# yum -y install gcc*
[root@h40 protobuf-2.5.0]# ./configure --prefix=/usr/protobuf-2.5.0
(Without this option, protobuf installs to /usr/local/bin by default.)
[root@h40 protobuf-2.5.0]# make && make install


(2) Write the .proto file
[root@h40 protobuf-2.5.0]# vi hehe.proto

option java_package = "com.cxk.coprocessor.test.generated";
option java_outer_classname = "CXKTestProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

message CountRequest {
}

message CountResponse {
  required int64 count = 1 [default = 0];
}

service RowCountService {
  rpc getRowCount(CountRequest) returns (CountResponse);
}
After writing this definition, use the protoc tool we just installed to compile it into the Java code we need:
[root@h40 protobuf-2.5.0]# bin/protoc --java_out=/usr hehe.proto


CXKTestProtos.java is generated under com/cxk/coprocessor/test/generated:
[root@h40 generated]# ls
CXKTestProtos.java
[root@h40 generated]# pwd
/usr/com/cxk/coprocessor/test/generated


(3) Define your own Endpoint class (implementing the service method):

package com.cxk.coprocessor.test.generated;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;

public class RowCountEndpoint extends CXKTestProtos.RowCountService
        implements Coprocessor, CoprocessorService {

    private RegionCoprocessorEnvironment env;

    public RowCountEndpoint() {
    }

    @Override
    public Service getService() {
        return this;
    }

    /**
     * Count the total number of rows in this region.
     */
    @Override
    public void getRowCount(RpcController controller, CXKTestProtos.CountRequest request,
            RpcCallback<CXKTestProtos.CountResponse> done) {
        Scan scan = new Scan();
        scan.setFilter(new FirstKeyOnlyFilter());
        CXKTestProtos.CountResponse response = null;
        InternalScanner scanner = null;
        try {
            scanner = env.getRegion().getScanner(scan);
            List<Cell> results = new ArrayList<Cell>();
            boolean hasMore = false;
            byte[] lastRow = null;
            long count = 0;
            do {
                hasMore = scanner.next(results);
                for (Cell kv : results) {
                    byte[] currentRow = CellUtil.cloneRow(kv);
                    // only count a row once, even if several cells come back for it
                    if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
                        lastRow = currentRow;
                        count++;
                    }
                }
                results.clear();
            } while (hasMore);
            response = CXKTestProtos.CountResponse.newBuilder().setCount(count).build();
        } catch (IOException ioe) {
            ResponseConverter.setControllerException(controller, ioe);
        } finally {
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (IOException ignored) {
                }
            }
        }
        done.run(response);
    }

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        if (env instanceof RegionCoprocessorEnvironment) {
            this.env = (RegionCoprocessorEnvironment) env;
        } else {
            throw new CoprocessorException("Must be loaded on a table region!");
        }
    }

    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        // nothing to do
    }
}
(4) Implement the client.
TestEndPoint.java is as follows:

package com.cxk.coprocessor.test.generated;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;

import com.cxk.coprocessor.test.generated.CXKTestProtos.RowCountService;
import com.google.protobuf.ServiceException;

public class TestEndPoint {

    /**
     * @param args args[0] master ip, args[1] zk ip, args[2] table name
     */
    public static void main(String[] args) throws ServiceException, Throwable {
        System.out.println("begin.....");
        long begin_time = System.currentTimeMillis();

        Configuration config = HBaseConfiguration.create();
        String master_ip = args[0];
        // String master_ip = "h40";
        String zk_ip = args[1];
        // String zk_ip = "h40";
        String table_name = args[2];
        // String table_name = "scores";
        config.set("hbase.zookeeper.property.clientPort", "2181");
        config.set("hbase.zookeeper.quorum", zk_ip);
        config.set("hbase.master", master_ip + ":600000");

        final CXKTestProtos.CountRequest request = CXKTestProtos.CountRequest.getDefaultInstance();
        HTable table = new HTable(config, table_name);

        // invoke getRowCount on every region and collect one partial count per region
        Map<byte[], Long> results = table.coprocessorService(RowCountService.class,
                null, null,
                new Batch.Call<CXKTestProtos.RowCountService, Long>() {
                    public Long call(CXKTestProtos.RowCountService counter) throws IOException {
                        ServerRpcController controller = new ServerRpcController();
                        BlockingRpcCallback<CXKTestProtos.CountResponse> rpcCallback =
                                new BlockingRpcCallback<CXKTestProtos.CountResponse>();
                        counter.getRowCount(controller, request, rpcCallback);
                        CXKTestProtos.CountResponse response = rpcCallback.get();
                        if (controller.failedOnException()) {
                            throw controller.getFailedOn();
                        }
                        return (response != null && response.hasCount()) ? response.getCount() : 0;
                    }
                });
        table.close();

        if (results.size() > 0) {
            // sum the per-region counts to get the table total
            long total = 0;
            for (Long regionCount : results.values()) {
                total += regionCount;
            }
            System.out.println(total);
        } else {
            System.out.println("no result returned");
        }
        long end_time = System.currentTimeMillis();
        System.out.println("end:" + (end_time - begin_time));
    }
}
(5) Put RowCountEndpoint.java, TestEndPoint.java and the generated CXKTestProtos.java into the same MyEclipse project.

Package the project as rowcount.jar.
Note: in MyEclipse, use the second export option (a runnable jar that bundles the dependency jars), otherwise the java -jar command used later will not work. Packaging this way is slower because the dependencies are included; if you do not need java -jar, the first (plain) export option works and is faster.
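
If you would rather skip the fat jar altogether, one alternative (a sketch, assuming the hbase launcher script is on the PATH of the user running the command) is to put the HBase client jars on the classpath at run time via hbase classpath:

[hadoop@h40 q1]$ /usr/jdk1.7.0_25/bin/java -cp rowcount.jar:$(hbase classpath) com.cxk.coprocessor.test.generated.TestEndPoint h40 h40 scores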


(6) Deploying the endpoint:

1.1 For a specific table:
1.1.1
Copy rowcount.jar into HBase's lib directory on all three machines, and be sure to restart the HBase cluster.
Then run:
alter 'scores', METHOD => 'table_att','coprocessor'=>'|com.cxk.coprocessor.test.generated.RowCountEndpoint||'
1.1.2
Upload rowcount.jar to HDFS:
[hadoop@h40 ~]$ hadoop fs -mkdir /in
[hadoop@h40 ~]$ hadoop fs -put rowcount.jar /in
Then run:
alter 'scores', METHOD => 'table_att', 'coprocessor' => 'hdfs://h40:9000/in/rowcount.jar|com.cxk.coprocessor.test.generated.RowCountEndpoint|1001|arg1=1,arg2=2'
(This approach did not work for me and threw errors I could not recover from, so use it with caution.)


1.2 For all tables:
Modify hbase-site.xml so that the endpoint is loaded for all tables.
On the master node:
[hadoop@h40 ~]$ vi hbase-1.0.0/conf/hbase-site.xml 
Add the following:

<property>
  <name>hbase.coprocessor.user.region.classes</name>
  <value>com.cxk.coprocessor.test.generated.RowCountEndpoint</value>
</property>
Do the same on the other two nodes (my HBase cluster uses three machines: h40, h41 and h42):
[hadoop@h40 ~]$ scp hbase-1.0.0/conf/hbase-site.xml h41:/home/hadoop/hbase-1.0.0/conf/
[hadoop@h40 ~]$ scp hbase-1.0.0/conf/hbase-site.xml h42:/home/hadoop/hbase-1.0.0/conf/


Copy rowcount.jar into HBase's lib directory on all three machines, then restart the HBase cluster.

(7) Running it:
1.1
Run TestEndPoint.java directly in MyEclipse. This requires adding the following entries to C:\Windows\System32\drivers\etc\hosts to map your VM IPs:
192.168.8.40 h40
192.168.8.41 h41
192.168.8.42 h42
Also, when run this way there is no need to pass command-line arguments to TestEndPoint.java; comment out the args[...] lines and hard-code the values instead:
String master_ip = "h40";
String zk_ip = "h40";
String table_name = "scores";
Result: the row count and the elapsed time are printed to the console.

1.2
Run TestEndPoint.java on Linux:
[hadoop@h40 q1]$ /usr/jdk1.7.0_25/bin/javac TestEndPoint.java 
(If this step fails to compile, add /home/hadoop/hbase-1.0.0/lib/* to CLASSPATH in .bash_profile.)
[hadoop@h40 q1]$ /usr/jdk1.7.0_25/bin/java TestEndPoint h40 h40 scores


1.3
Run the java -jar command on Linux:
[hadoop@h40 lib]$ /usr/jdk1.7.0_25/bin/java -jar rowcount.jar h40 h40 scores


The code above counts rows for the whole table; the code below counts rows by column family and by column. The deployment steps are the same as above, so they are not repeated; only the main code is given.
(In principle all of this could be merged into a single endpoint; I will look into that when there is time. For now, here are the separate examples.)

option java_package = "com.tencent.yun.endpoint.proto";
option java_outer_classname = "RowCountService";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

message RowCountRequest {
  required string family = 1;
  required string column = 2;
}

message RowCountResponse {
  required int64 rowCount = 1 [default = 0];
}

service RowCount {
  rpc getRowCount(RowCountRequest) returns (RowCountResponse);
}
(If you only want to filter by column family and not by column, remove the line required string column = 2; — or see the variant sketched below.)
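
An alternative sketch (my own variant, not from the original post): declare the column as optional so one message covers both cases; on the server side you would then only call addColumn when request.hasColumn() is true.

message RowCountRequest {
  required string family = 1;
  optional string column = 2;   // leave unset to count by family only
}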

RowCountEndPoint.java (the endpoint):

package com.tencent.yun.endpoint.proto;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.tencent.yun.endpoint.proto.RowCountService.RowCountRequest;
import com.tencent.yun.endpoint.proto.RowCountService.RowCountResponse;

public class RowCountEndPoint extends RowCountService.RowCount
        implements Coprocessor, CoprocessorService {

    private RegionCoprocessorEnvironment env;

    public Service getService() {
        return this;
    }

    public void start(CoprocessorEnvironment env) throws IOException {
        if (env instanceof RegionCoprocessorEnvironment) {
            this.env = (RegionCoprocessorEnvironment) env;
        } else {
            throw new CoprocessorException("Must be loaded on a table region!");
        }
    }

    public void stop(CoprocessorEnvironment arg0) throws IOException {
        // do nothing
    }

    @Override
    public void getRowCount(RpcController controller, RowCountRequest request,
            RpcCallback<RowCountResponse> done) {
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes(request.getFamily()));
        scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn()));
        // scan.setMaxVersions(1);
        InternalScanner scanner = null;
        RowCountResponse response = null;
        long count = 0L;
        try {
            List<Cell> results = new ArrayList<Cell>();
            boolean hasMore = false;
            scanner = env.getRegion().getScanner(scan);
            do {
                hasMore = scanner.next(results);
                for (Cell cell : results) {
                    // NOTE: this counts cells, not distinct row keys
                    count++;
                    // count = count + Bytes.toLong(CellUtil.cloneValue(cell));
                }
                results.clear();
            } while (hasMore);
            response = RowCountResponse.newBuilder().setRowCount(count).build();
        } catch (IOException e) {
            ResponseConverter.setControllerException(controller, e);
        } finally {
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (IOException ignored) {
                }
            }
        }
        done.run(response);
    }
}
RowCountClient.java (the client):

package com.tencent.yun.endpoint.proto;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;

import com.google.protobuf.ServiceException;
import com.tencent.yun.endpoint.proto.RowCountService.RowCount;
import com.tencent.yun.endpoint.proto.RowCountService.RowCountRequest;
import com.tencent.yun.endpoint.proto.RowCountService.RowCountResponse;

public class RowCountClient {

    public static void testRowCountEndpoint(String tableName, String family, String col) throws IOException {
        System.out.println("begin test.....");
        long t1 = System.currentTimeMillis();

        Configuration config = HBaseConfiguration.create();
        // HBase ZooKeeper quorum
        config.set("hbase.zookeeper.quorum", "192.168.8.40:2181,192.168.8.41:2181,192.168.8.42:2181");

        // count by column
        final RowCountRequest req = RowCountRequest.newBuilder().setFamily(family).setColumn(col).build();
        // count by column family only
        // final RowCountRequest req = RowCountRequest.newBuilder().setFamily(family).build();

        Connection con = null;
        Table table = null;
        try {
            con = ConnectionFactory.createConnection(config);
            table = con.getTable(TableName.valueOf(tableName));
            Map<byte[], Long> results = table.coprocessorService(RowCount.class, null, null,
                    new Batch.Call<RowCount, Long>() {
                        public Long call(RowCount instance) throws IOException {
                            ServerRpcController controller = new ServerRpcController();
                            BlockingRpcCallback<RowCountResponse> rpccall = new BlockingRpcCallback<RowCountResponse>();
                            instance.getRowCount(controller, req, rpccall);
                            RowCountResponse resp = rpccall.get();
                            return resp.hasRowCount() ? resp.getRowCount() : 0L;
                        }
                    });
            // sum the per-region partial counts
            long count = 0L;
            for (Long sum : results.values()) {
                // System.out.println("region row Sum = " + sum);
                count += sum;
            }
            System.out.println("total count = " + count);
            long t2 = System.currentTimeMillis();
            System.out.println("use time = " + (t2 - t1));
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ServiceException e) {
            e.printStackTrace();
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            if (table != null) table.close();
            if (con != null) con.close();
        }
    }

    public static void main(String[] args) throws IOException {
        String tableName = "scores";
        String family = "course";
        String col = "math";
        testRowCountEndpoint(tableName, family, col);
    }
}
Counting by column: with tableName = "scores", family = "course", col = "art", the result is: total count = 2.
Counting by column family only: with tableName = "scores", family = "course" and the family-only request (column removed from the proto), the expected result is total count = 3 but the actual result is total count = 5. The reason is that this endpoint increments its counter once per Cell rather than once per distinct row key: the course family holds five cells spread over three rows, so a family-wide scan yields 5. Deduplicating on the row key, as the table-level RowCountEndpoint above does, gives the per-row count; a sketch of the fix follows.
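
A minimal fix (my own sketch, not from the original post): inside getRowCount, track the last row key the same way the table-level RowCountEndpoint does, so each row is counted once no matter how many cells the family scan returns. It needs an extra import of org.apache.hadoop.hbase.CellUtil.

// inside getRowCount(), replacing the per-cell counting loop
byte[] lastRow = null;
long count = 0L;
do {
    hasMore = scanner.next(results);
    for (Cell cell : results) {
        byte[] currentRow = CellUtil.cloneRow(cell);
        // only advance the counter when the row key changes
        if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
            lastRow = currentRow;
            count++;
        }
    }
    results.clear();
} while (hasMore);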

To compare the filtered-Scan approach with the Coprocessor approach, the time of 500 RowCount operations was recorded for each. The performance comparison chart (latency of the 500 calls, Scan vs. Coprocessor) is not reproduced here.
From that chart, most Coprocessor-based RowCount calls take less than 1 s, while the Scan-based approach takes roughly 4-5 s. (Note: the table checked has about 30,000 row keys.)
So why does the Coprocessor have such a clear advantage when counting row keys?
Because once the coprocessor is registered on the table, an AggregationClient call spreads the RowCount work across every region of the table; within each region the count is computed by the hosting RegionServer, invoked over RPC and executed with an InternalScanner.
The performance gain therefore comes from two things:
1) Distributed counting. Instead of a single client scanning the row-key range and tallying the result, all RegionServers that host regions of the table compute their partial counts in parallel.
2) The InternalScanner runs inside the RegionServer. It is the scanner interface closest to the actual storage, so access is faster.


References:
http://www.binospace.com/index.php/make-your-hbase-better-2/
http://blog.csdn.net/jameshadoop/article/details/42645551
http://blog.csdn.net/yangzongzhao/article/details/24306775
https://www.qcloud.com/document/product/407/4785
