Speeding Up HBase Queries with Multithreading

HBase multithreaded queries

Query methods

1. From the hbase shell:

get "data.md5_id2","000045d573fb248697bb9c3f8536c1b5"

2. Over REST. The REST server must be running first: hbase-daemon.sh start rest -p 8887

http://localhost:8887/data.md5_id2/000045d573fb248697bb9c3f8536c1b5
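http://localhost:8887/data.md5_id2/000045d573fb248697bb9c3f8536c1b5/id:id
The first URL returns the whole row. The second URL appends id:id, so it returns only the column with family id and qualifier id.
The row key, column names and values in the response are Base64-encoded (encoded, not encrypted). Cloudera's three-part how-to series covers the REST operations in more detail. A response for the whole row looks like this: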

<CellSet>
<Row key="MDAwMDQ1ZDU3M2ZiMjQ4Njk3YmI5YzNmODUzNmMxYjU=">
<Cell column="aWQ6YzE=" timestamp="1450359965709">MDA=</Cell>
<Cell column="aWQ6YzI=" timestamp="1450359965709">MDA=</Cell>
<Cell column="aWQ6aWQ=" timestamp="1450359965709">MTEwMTAwMTk1NTAzMjA2MjI0</Cell>
</Row>
</CellSet>
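The Base64 fields in a response like the one above can be decoded with the JDK alone. Below is a minimal sketch. It assumes the values are UTF-8 text, which holds for this table even though HBase itself stores arbitrary bytes. The class name RestCellDecoder is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper: decodes the Base64 row keys, column names and values
// that the HBase REST server returns in its XML CellSet.
class RestCellDecoder {
    static String decode(String base64) {
        byte[] raw = Base64.getDecoder().decode(base64);
        // Assumption: the stored bytes are UTF-8 text (true for this table's data)
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(decode("MDAwMDQ1ZDU3M2ZiMjQ4Njk3YmI5YzNmODUzNmMxYjU=")); // row key
        System.out.println(decode("aWQ6aWQ="));                                     // column family:qualifier
        System.out.println(decode("MTEwMTAwMTk1NTAzMjA2MjI0"));                     // cell value
    }
}
```

For the third cell, the column aWQ6aWQ= decodes to id:id and the value decodes to 110100195503206224.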

3. Through the Java API

Create a Get for the row key and add the column family and column qualifier to fetch. Then call HTable.get, which returns a Result.

Single-threaded

The row keys to look up are stored in a file, one per line. Read the file line by line and build a Get for each key:

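import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

// table (HTable), queryFamily, queryColumn, fileResultPath and append(path, text)
// are fields/helpers of the enclosing class and are not shown here.
public static void selectFromFile(String filePath) throws Exception {
    StringBuilder sb = new StringBuilder();
    // try-with-resources closes the reader and the underlying stream, even on error
    try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(new FileInputStream(filePath), StandardCharsets.UTF_8))) {
        String rowKey;
        while ((rowKey = reader.readLine()) != null) {
            // One Get, and therefore one round trip to HBase, per row key
            Get g = new Get(Bytes.toBytes(rowKey));
            g.addColumn(Bytes.toBytes(queryFamily), Bytes.toBytes(queryColumn));
            Result r = table.get(g);
            byte[] value = r.getValue(Bytes.toBytes(queryFamily), Bytes.toBytes(queryColumn));
            if (value != null) {
                sb.append(rowKey).append(",").append(Bytes.toString(value)).append("\n");
            }
        }
    }
    append(fileResultPath, sb.toString());
}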

Multithreaded, version 1

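import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

// config (HBase Configuration) and tableName are fields of the enclosing class (not shown).
public static void runMultiThread(String filePath) throws Exception {
    // One bucket per hex digit. Assumes every line is a non-empty, lowercase hex MD5 row key.
    final String[] number = {"0","1","2","3","4","5","6","7","8","9","a","b","c","d","e","f"};
    Map<String, List<String>> maps = new HashMap<>();
    for (String n : number) {
        maps.put(n, new ArrayList<>());
    }
    try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(new FileInputStream(filePath), StandardCharsets.UTF_8))) {
        String line;
        while ((line = reader.readLine()) != null) {
            maps.get(line.substring(0, 1)).add(line);
        }
    }
    // The input file is now split into 16 buckets; from here on, only the map is used.

    int tableN = maps.size();
    // HTable is not thread-safe, so each thread gets its own instance
    final HTable[] readTables = new HTable[tableN];
    for (int i = 0; i < tableN; i++) {
        readTables[i] = new HTable(config, tableName);
        // Scanner caching only affects Scans; it has no effect on the Gets issued below
        readTables[i].setScannerCaching(100);
    }

    // Thread factory; %d gives every worker thread a distinct name
    ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("ParallelBatchQuery-%d").build();
    ExecutorService executor = Executors.newFixedThreadPool(tableN, factory);

    // One task per bucket, each with its own HTable
    int i = 0;
    for (final Map.Entry<String, List<String>> entry : maps.entrySet()) {
        final HTable readTable = readTables[i++];
        executor.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    HBaseQuery.selectByKeys(entry.getValue(), readTable);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
    executor.shutdown();
    // Block until every task is done instead of busy-spinning on isTerminated()
    executor.awaitTermination(1, TimeUnit.DAYS);
    for (HTable t : readTables) {
        t.close();
    }
    System.out.println("Finished all threads");
}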

// Batched lookup of one bucket of row keys.
// Problem: runMultiThread calls this from several threads at once, and every call appends to the same file.
public static void selectByKeys(List<String> rowKeys, HTable table) throws Exception {
    StringBuilder sb = new StringBuilder();
    // Batch reads are supported, so build a List<Get> up front
    List<Get> lstGet = new ArrayList<Get>();
    for (String rowKey : rowKeys) {
        Get g = new Get(Bytes.toBytes(rowKey));
        g.addColumn(Bytes.toBytes(queryFamily), Bytes.toBytes(queryColumn));
        lstGet.add(g);
    }
    // The table reads the whole List<Get> in one batched call instead of issuing the Gets one by one
    Result[] res = table.get(lstGet);
    // One Get yields one Result, so the results come back as an array, in the same order as the Gets
    for (Result re : res) {
        if (re != null && !re.isEmpty()) {
            byte[] key = re.getRow();
            byte[] value = re.getValue(Bytes.toBytes(queryFamily), Bytes.toBytes(queryColumn));
            if (key != null && value != null) {
                sb.append(Bytes.toString(key)).append(",").append(Bytes.toString(value)).append("\n");
            }
        }
    }
    append(fileResultPath, sb.toString());
}
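The bucketing step at the start of runMultiThread can be tried on its own. The sketch below uses a hypothetical class, HexPrefixPartitioner, to group row keys by their first hex digit.

- Because MD5 digests are close to uniformly distributed, each of the 16 buckets receives roughly 1/16 of the keys. This keeps the per-thread workload balanced.
- This version skips blank lines and rejects non-hex keys with an explicit error rather than a NullPointerException.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: groups MD5 hex row keys by their first character,
// one bucket (and later one thread) per hex digit.
class HexPrefixPartitioner {
    static Map<Character, List<String>> partition(List<String> rowKeys) {
        Map<Character, List<String>> buckets = new LinkedHashMap<>();
        // Pre-create one bucket per hex digit so every valid key has a home
        for (char c : "0123456789abcdef".toCharArray()) {
            buckets.put(c, new ArrayList<>());
        }
        for (String key : rowKeys) {
            if (key.isEmpty()) {
                continue; // skip blank lines
            }
            char first = Character.toLowerCase(key.charAt(0));
            List<String> bucket = buckets.get(first);
            if (bucket == null) {
                throw new IllegalArgumentException("not a hex row key: " + key);
            }
            bucket.add(key);
        }
        return buckets;
    }
}
```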

Problem with appending from multiple threads: some output lines are garbled.

Input data:
13100004006,908ad680fc3e4101e097a139b93432b5,90,8a
13000004594,d4da62ff5a0cda04921b16426c5c853f,d4,da
13200003898,4b68843dd4eb1b1ef0a204f356f89f37,4b,68

Output data:
40ea3fbf209d1d17ee3e4101e097a139b93432b5,13100004006
d4da62ff5a0cda04921b16426c5c853f,13000004594
de12f0a204f356f89f37,13200003898

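The cause is that every thread calls selectByKeys concurrently, and each call appends to the same file, so writes from different threads interleave. The fix is to stop appending inside selectByKeys.
Instead, selectByKeys returns a List holding that thread's results. The file is written only after those results are retrieved with future.get().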
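The fix described above can be sketched with the JDK alone:

- Each task returns its own list of lines.
- The calling thread collects the lists through Future.get() in submission order.
- Only that thread writes the file.

Two details differ from the post. The query parameter is a hypothetical stand-in for selectByKeys. The sketch writes with Files.write, which replaces the file, instead of the post's append helper.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical sketch: workers only compute; a single thread does all file output.
class SingleWriterCollector {
    static void runAndWrite(List<List<String>> partitions,
                            Function<List<String>, List<String>> query, // stand-in for selectByKeys
                            Path out) throws IOException, InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, partitions.size()));
        try {
            List<Future<List<String>>> futures = new ArrayList<>();
            for (List<String> part : partitions) {
                // The lambda returns a value, so this is submitted as a Callable
                futures.add(pool.submit(() -> query.apply(part)));
            }
            List<String> all = new ArrayList<>();
            for (Future<List<String>> f : futures) {
                all.addAll(f.get()); // blocks until that task has finished
            }
            // Single writer: no other thread touches the file, so lines cannot interleave
            Files.write(out, all, StandardCharsets.UTF_8);
        } finally {
            pool.shutdown();
        }
    }
}
```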

Multithreaded, version 2

Because selectByKeys now returns a value, the tasks must be Callables instead of the Runnables used above.
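The difference between the two task types shows up in what Future.get() returns:

- For a Runnable passed to ExecutorService.submit, get() returns null once the task completes.
- For a Callable, get() returns the task's result.

Below is a minimal stdlib sketch. The class name and the placeholder string are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: what Future.get() yields for a Runnable vs. a Callable.
class RunnableVsCallable {
    static final AtomicInteger runnableCalls = new AtomicInteger();

    static Object[] demo() throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Runnable: side effects only, nothing to hand back
            Runnable noResult = () -> { runnableCalls.incrementAndGet(); };
            // Callable: returns a value, like selectByKeys returning List<String>
            Callable<String> withResult = () -> "rowkey,value";
            Future<?> f1 = pool.submit(noResult);
            Future<String> f2 = pool.submit(withResult);
            // get() blocks until each task finishes; the Runnable's Future yields null
            return new Object[] { f1.get(), f2.get() };
        } finally {
            pool.shutdown();
        }
    }
}
```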

// Replaces the task-submission part of runMultiThread; maps, tableN, readTables and executor are set up as before.
// One Future per task
List<Future<List<String>>> futures = new ArrayList<Future<List<String>>>(tableN);
int i = 0;
for (final Map.Entry<String, List<String>> entry : maps.entrySet()) {
    // Each task gets its own HTable instance. Use Callable when a result is needed, Runnable otherwise
    MyCallable callable = new MyCallable(entry.getValue(), readTables[i++]);
    futures.add(executor.submit(callable));
}
executor.shutdown();
// Future.get() blocks until its task has finished, so no separate wait loop is needed.
// Only this thread writes to the file, so output lines can no longer interleave.
for (Future<List<String>> f : futures) {
    List<String> result = f.get();
    if (result != null) {
        StringBuilder sb = new StringBuilder();
        for (String row : result) {
            sb.append(row).append("\n");
        }
        append(fileResultPath, sb.toString());
    }
}
for (HTable t : readTables) {
    t.close();
}
System.out.println("Finished all threads");

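The custom Callable has two fields. keys is the list of row keys this thread looks up. table is the thread's own HTable instance, which is needed because HTable is not thread-safe and cannot be shared across threads.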

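// Looks up one bucket of row keys using its own HTable
class MyCallable implements Callable<List<String>> {
    private final List<String> keys;
    private final HTable table;

    public MyCallable(List<String> lstKeys, HTable table) {
        this.keys = lstKeys;
        this.table = table;
    }

    @Override
    public List<String> call() throws Exception {
        // selectByKeys now returns this bucket's result lines instead of appending them to the file
        return HBaseQuery.selectByKeys(keys, table);
    }
}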

With the optimized multithreaded query, looking up 180,000 keys took more than 800 seconds.
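For scale, the timing above corresponds to the throughput below. The run took more than 800 seconds, so both numbers are upper bounds. The per-thread figure also assumes 16 equally sized buckets that all run for the whole period.

$$\frac{180\,000\ \text{keys}}{800\ \text{s}} = 225\ \text{keys/s}, \qquad \frac{225\ \text{keys/s}}{16\ \text{threads}} \approx 14\ \text{keys/s per thread}$$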

RegionSplit

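When md5_id2 was created, it was pre-split into 16 regions. After part of the data had been imported, however, the region count grew to more than 700, an average of over 50 regions per node.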
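The post does not show how md5_id2 was pre-split. One common approach, assumed here, is to cut the hex key space at its first character. That gives 15 split keys and therefore 16 regions. The sketch below builds those keys using a hypothetical class, HexSplitKeys.

An array like this can be passed to Admin.createTable(TableDescriptor, byte[][]) in HBase 2.x, or to HBaseAdmin.createTable(HTableDescriptor, byte[][]) in older clients. Pre-splitting only fixes the initial region boundaries. Regions still split automatically as they grow, which is consistent with the growth from 16 regions to more than 700 described above.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: split keys "1".."f" for a table whose row keys are lowercase hex.
// Resulting regions: [start,"1"), ["1","2"), ..., ["e","f"), ["f",end) -> 16 regions.
class HexSplitKeys {
    static byte[][] splitKeys() {
        String hex = "0123456789abcdef";
        byte[][] splits = new byte[hex.length() - 1][];
        for (int i = 1; i < hex.length(); i++) {
            // Each split key is the single ASCII byte of the next hex digit
            splits[i - 1] = hex.substring(i, i + 1).getBytes(StandardCharsets.US_ASCII);
        }
        return splits;
    }
}
```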

[Figure: mob_regions]

The same thing happened to md5_mob2 (16 regions at creation, now 67). Some regions have received zero (read) requests.

[Figure: mob_request]

The per-RegionServer request distribution is also very uneven. Could this be related to our replication factor being set to 1?

[Figure: regionserver_request]

