Cassandra Migration Tool

Cassandra BulkLoad: Bulk Data Import Tool (1)

Cassandra BulkLoad Example

https://github.com/yukim/cassandra-bulkload-example
http://www.datastax.com/dev/blog/using-the-cassandra-bulk-loader-updated

Generate SSTable files, then import them into Cassandra with sstableloader:

cd cassandra-bulkload-example
./gradlew run
cqlsh 192.168.6.52 -f schema.cql
sstableloader -d 192.168.6.52 data/quote/historical_prices

Notes:

When building the gradle project with ./gradlew run, change the Cassandra version in build.gradle to match the version running on the target cluster.
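A minimal sketch of the line to align, assuming the example project's build.gradle pulls in cassandra-all (the actual dependency declaration in the project may differ):

```groovy
dependencies {
    // match this version to the Cassandra release running on the target cluster
    compile 'org.apache.cassandra:cassandra-all:2.0.16'
}
```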

Increase sstableloader's heap to 50G (related issue: https://issues.apache.org/jira/browse/CASSANDRA-7385):
sed -i -e 's/-Xmx256M/-Xmx50G/g' /usr/install/apache-cassandra-2.0.16/bin/sstableloader

sstableloader does not have to be run on one of the target nodes.

The fix is simply more memory: starting from 10G and raising the heap step by step, the error only went away at 50G. With too little heap it fails with an OutOfMemoryError:

Established connection to initial hosts
Opening sstables and calculating sections to stream
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at org.apache.cassandra.utils.EstimatedHistogram$EstimatedHistogramSerializer.deserialize(EstimatedHistogram.java:335)
at org.apache.cassandra.io.sstable.SSTableMetadata$SSTableMetadataSerializer.deserialize(SSTableMetadata.java:463)
at org.apache.cassandra.io.sstable.SSTableMetadata$SSTableMetadataSerializer.deserialize(SSTableMetadata.java:448)
at org.apache.cassandra.io.sstable.SSTableMetadata$SSTableMetadataSerializer.deserialize(SSTableMetadata.java:432)
at org.apache.cassandra.io.sstable.SSTableReader.openMetadata(SSTableReader.java:225)
at org.apache.cassandra.io.sstable.SSTableReader.openForBatch(SSTableReader.java:160)
at org.apache.cassandra.io.sstable.SSTableLoader$1.accept(SSTableLoader.java:107)
at java.io.File.list(File.java:1155)
at org.apache.cassandra.io.sstable.SSTableLoader.openSSTables(SSTableLoader.java:68)
at org.apache.cassandra.io.sstable.SSTableLoader.stream(SSTableLoader.java:150)
at org.apache.cassandra.tools.BulkLoader.main(BulkLoader.java:95)

Import the data with the sstableloader command:

[qihuang.zheng@dp0652 cassandra-bulkload-example]$ sstableloader -d 192.168.6.52 data/quote/historical_prices
Established connection to initial hosts
Opening sstables and calculating sections to stream
Streaming relevant part of data/quote/historical_prices/quote-historical_prices-jb-1-Data.db to [/192.168.6.52, /192.168.6.53, /192.168.6.55, /192.168.6.56, /192.168.6.57]
Streaming session ID: c530bf30-1b09-11e5-9bb4-f9a3de842dc7
progress: [/192.168.6.55 1/1 (100%)] [/192.168.6.53 1/1 (100%)] [total: 100% - 8MB/s (avg: 0MB/s)]
Summary statistics:
Connections per host: : 1
Total files transferred: : 2
Total bytes transferred: : 1113374
Total duration (ms): : 3651
Average transfer rate (MB/s): : 0
Peak transfer rate (MB/s): : 0

[qihuang.zheng@dp0652 cassandra-bulkload-example]$ cqlsh 192.168.6.52
cqlsh> describe KEYSPACES ;
system quote mykeyspace async_examples system_traces
cqlsh> use quote ;
cqlsh:quote> DESCRIBE TABLES ;
historical_prices
cqlsh:quote> select count(*) from historical_prices ;
count
-------
10000
Default LIMIT of 10000 was used. Specify your own LIMIT clause to get more results.

cqlsh:quote> SELECT * FROM historical_prices WHERE ticker = 'ORCL' LIMIT 3;

ticker | date | adj_close | close | high | low | open | volume
--------+--------------------------+-----------+-----------+-----------+-----------+-----------+----------
ORCL | 2015-06-24 00:00:00+0800 | 41.200001 | 41.200001 | 41.77 | 41.200001 | 41.540001 | 17689400
ORCL | 2015-06-23 00:00:00+0800 | 41.709999 | 41.709999 | 42.060001 | 41.48 | 41.939999 | 16803100
ORCL | 2015-06-22 00:00:00+0800 | 41.490002 | 41.490002 | 41.959999 | 41.450001 | 41.799999 | 22333400

The dataset used by this project is Yahoo stock data for three tickers: ORCL, YHOO, and GOOG.
Counting the raw files with Linux wc -l gives 12534 rows in total, but the cqlsh count(*) above shows only 10000 because the query is capped at the default LIMIT of 10000.

➜  Downloads  wc -l table*.csv
7385 table (1).csv
315 table (2).csv
4834 table.csv
12534 total

BulkLoad Example Code

Running cqlsh 192.168.6.52 -f schema.cql creates the quote keyspace and its table:

CREATE TABLE IF NOT EXISTS historical_prices (
ticker ascii,
date timestamp,
open decimal,
high decimal,
low decimal,
close decimal,
volume bigint,
adj_close decimal,
PRIMARY KEY (ticker, date)
) WITH CLUSTERING ORDER BY (date DESC);

Running ./gradlew run creates a data directory under the current directory:

[qihuang.zheng@dp0652 cassandra-bulkload-example]$ ll data/quote/historical_prices/
-rw-r--r--. 1 qihuang.zheng users 467 6月 25 15:13 quote-historical_prices-jb-1-CompressionInfo.db
-rw-r--r--. 1 qihuang.zheng users 1072677 6月 25 15:13 quote-historical_prices-jb-1-Data.db
-rw-r--r--. 1 qihuang.zheng users 16 6月 25 15:13 quote-historical_prices-jb-1-Filter.db
-rw-r--r--. 1 qihuang.zheng users 3224 6月 25 15:13 quote-historical_prices-jb-1-Index.db
-rw-r--r--. 1 qihuang.zheng users 4411 6月 25 15:13 quote-historical_prices-jb-1-Statistics.db
-rw-r--r--. 1 qihuang.zheng users 79 6月 25 15:13 quote-historical_prices-jb-1-TOC.txt

The files above have the same structure as the SSTables we saw earlier, produced by creating the table and inserting data through the cqlsh shell.
The ticker parameter takes the values ORCL, YHOO, and GOOG; every row is written through CQLSSTableWriter into the specified output directory data:

// Prepare the SSTable writer
CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder();
builder.inDirectory(outputDir)                      // output directory
       .forTable(SCHEMA)                            // target schema
       .using(INSERT_STMT)                          // CQL statement used to add rows
       .withPartitioner(new Murmur3Partitioner());  // optional; Murmur3Partitioner is the default
CQLSSTableWriter writer = builder.build();

// For each CSV line: parse the fields and append one row
writer.addRow(ticker,
              DATE_FORMAT.parse(line.get(0)),
              new BigDecimal(line.get(1)),
              new BigDecimal(line.get(2)),
              new BigDecimal(line.get(3)),
              new BigDecimal(line.get(4)),
              Long.parseLong(line.get(5)),
              new BigDecimal(line.get(6)));

// Close the writer after the last row so buffered data is flushed to the SSTable
writer.close();
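Each addRow call consumes one parsed CSV record. A stdlib-only sketch of that parsing step, assuming the column order of Yahoo's historical CSV (Date,Open,High,Low,Close,Volume,Adj Close); the class and variable names here are illustrative, not from the project:

```java
import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

public class ParseQuoteLine {
    // Yahoo-style historical CSV row: Date,Open,High,Low,Close,Volume,Adj Close
    private static final SimpleDateFormat DATE_FORMAT =
            new SimpleDateFormat("yyyy-MM-dd", Locale.US);

    public static void main(String[] args) throws ParseException {
        String[] line =
            "2015-06-24,41.540001,41.77,41.200001,41.200001,17689400,41.200001".split(",");
        Date date = DATE_FORMAT.parse(line[0]);         // column 0: trade date
        BigDecimal open = new BigDecimal(line[1]);      // columns 1-4: prices
        BigDecimal close = new BigDecimal(line[4]);
        long volume = Long.parseLong(line[5]);          // column 5: volume
        BigDecimal adjClose = new BigDecimal(line[6]);  // column 6: adjusted close
        System.out.println(DATE_FORMAT.format(date)
                + " open=" + open + " close=" + close
                + " volume=" + volume + " adjClose=" + adjClose);
    }
}
```

Decimal columns are kept as BigDecimal rather than double, matching the table's decimal columns and avoiding floating-point rounding on the way into the SSTable.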

KassandraMRHelper

https://www.fullcontact.com/blog/cassandra-sstables-offline/
https://tech.knewton.com/blog/2013/11/cassandra-and-hadoop-introducing-the-kassandramrhelper/
https://github.com/Knewton/KassandraMRHelper

Read local SSTable files and convert them to CSV or other formats.

Change the Thrift version to 0.9.2 and build the jars:

git clone https://github.com/Knewton/KassandraMRHelper.git
mvn clean package -P HadoopMapReduce
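The Thrift bump is a one-line change to the Maven build. This fragment is only a sketch, assuming the project declares libthrift directly; the actual pom.xml may manage the version differently:

```xml
<!-- sketch only: align libthrift with the Thrift version used by the cluster -->
<dependency>
  <groupId>org.apache.thrift</groupId>
  <artifactId>libthrift</artifactId>
  <version>0.9.2</version>
</dependency>
```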

Generate sample SSTable data files in Cassandra's format:

java -cp ./KassandraMRHelper-with-dependencies.jar:./KassandraMRHelper-0.3.2-tests.jar com.knewton.mapreduce.cassandra.WriteSampleSSTable
usage: WriteSampleSSTable [OPTIONS] <output_dir>
-e,--studentEvents <arg> The number of student events per student to be
generated. Default value is 10
-h,--help Prints this help message.
-s,--students <arg> The number of students (rows) to be generated.
Default value is 100.
mkdir ~/data/kassandra
java -cp ./KassandraMRHelper-with-dependencies.jar:./KassandraMRHelper-0.3.2-tests.jar com.knewton.mapreduce.cassandra.WriteSampleSSTable ~/data/kassandra
ll ~/data/kassandra
-rw-r--r-- 1 zhengqh staff 81K 9 9 16:07 demoKeyspace-StudentEvents-ic-1-Data.db
-rw-r--r-- 1 zhengqh staff 81B 9 9 16:07 demoKeyspace-StudentEvents-ic-1-Digest.sha1
-rw-r--r-- 1 zhengqh staff 16B 9 9 16:07 demoKeyspace-StudentEvents-ic-1-Filter.db
-rw-r--r-- 1 zhengqh staff 2.1K 9 9 16:07 demoKeyspace-StudentEvents-ic-1-Index.db
-rw-r--r-- 1 zhengqh staff 4.3K 9 9 16:07 demoKeyspace-StudentEvents-ic-1-Statistics.db
-rw-r--r-- 1 zhengqh staff 104B 9 9 16:07 demoKeyspace-StudentEvents-ic-1-Summary.db
-rw-r--r-- 1 zhengqh staff 72B 9 9 16:07 demoKeyspace-StudentEvents-ic-1-TOC.txt

Use KassandraMRHelper's example job to read the data files and produce the output:

java -cp ./KassandraMRHelper-with-dependencies.jar:./KassandraMRHelper-0.3.2-tests.jar com.knewton.mapreduce.example.SSTableMRExample ~/data/kassandra ~/output/kassandra
ll ~/output/kassandra
total 184
-rw-r--r-- 1 zhengqh staff 0B 9 9 16:13 _SUCCESS
-rw-r--r-- 1 zhengqh staff 90K 9 9 16:13 part-r-00000

Inspect the output locally with cat:

➜  ~  cat output/kassandra/kassandra | wc -l
1000

Copy the output to the Hadoop cluster and view it as HDFS files:

/usr/install/hadoop/bin/hadoop fs -cat /user/qihuang.zheng/kassandra
1000099 1441786051350,4240636200168406977,YnOpcnqvJF,qGKjtMQujw,SCVNaYwPmQ,88,1441786050968
1000099 1441786051351,7970945131406557059,wedwqXPAEU,kkxVHMWNPT,vlSDpZWBmw,56,1441786050968
1000099 1441786051352,5922454426850141034,LkQKbFNjFg,HycByuodgg,nvVSgDCRJg,57,1441786050968

/usr/install/hadoop/bin/hadoop fs -cat /user/qihuang.zheng/kassandra | wc -l
1000

In the HDFS output, key and value are tab-separated. By default 100 students are generated with 10 events each, 1000 records in total.
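A quick stdlib sketch of splitting one such output line back into its key and value (the sample line is taken from the HDFS listing above; the class name is illustrative):

```java
public class SplitMROutputLine {
    public static void main(String[] args) {
        // One reducer output line: studentId TAB comma-separated event fields
        String line = "1000099\t1441786051350,4240636200168406977,YnOpcnqvJF,"
                + "qGKjtMQujw,SCVNaYwPmQ,88,1441786050968";
        String[] kv = line.split("\t", 2);   // split key from value on the first tab only
        String studentId = kv[0];
        String[] fields = kv[1].split(",");  // event fields within the value
        System.out.println("student=" + studentId
                + " eventId=" + fields[0] + " fieldCount=" + fields.length);
    }
}
```

Limiting split("\t", 2) to two parts keeps any tabs inside the value intact, which matters if a string field ever contains one.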

TODO: this reads local SSTable files and writes locally. Could it read SSTable files stored on HDFS instead?

Other Tool

