import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._

val carbon = SparkSession.builder().config(sc.getConf).getOrCreateCarbonSession("/tmp/carbon")

carbon.sql("CREATE TABLE IF NOT EXISTS test_table(id string, name string, city string, age Int) STORED BY 'carbondata'")
carbon.sql("LOAD DATA INPATH '/Users/zhengqh/Downloads/spark-2.1.1-bin-hadoop2.7/sample.csv' INTO TABLE test_table")
carbon.sql("SELECT city, avg(age), sum(age) FROM test_table GROUP BY city").show()
sql("CREATE TABLE IF NOT EXISTS test_table1(id string,name string,city string,age Int)") sql("insert into table test_table1 values('1','david','shenzhen',31)") sql("insert into table test_table1 values('2','eason','shenzhen',20)") sql("insert into table test_table1 values('3','jarry','wuhan',35)")
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._

val carbon = SparkSession.builder().config(sc.getConf).getOrCreateCarbonSession("hdfs://tdhdfs/user/tongdun/carbon", "/home/admin/carbon")
carbon.sql("CREATE TABLE IF NOT EXISTS test_table2(id string,name string,city string,age Int)STORED BY 'carbondata'") carbon.sql("INSERT INTO test_table2 SELECT * FROM test_table1") // insert #1 carbon.sql("select * from test_table2").show carbon.sql("INSERT INTO test_table2 SELECT * FROM test_table1") // insert again carbon.sql("select * from test_table2").show
carbon.sql("INSERT overwrite table test_table2 SELECT * FROM test_table1") // overwrite
carbon.sql("insert into cross_partner_carbon select * from crosspartner")
spark.sql("select count(*) from cross2partner_dt").show carbon.sql("select count(*) from cross_partner_carbon_dm").show
spark.sql("select * from cross2partner_dt").show carbon.sql("select * from cross_partner_carbon_dm").show
val idnumber="" spark.sql(s"select sequenceId from cross2partner_dt where partnerCode='007fenqi' and eventType='Loan' and idNumber='$idnumber'").show carbon.sql(s"select sequenceId from cross_partner_carbon_dm where partnerCode='007fenqi' and eventType='Loan' and idNumber='$idnumber'").show
Compare the filter query on crosspartner_hdfs with the same query against CarbonData (a side-by-side timing sketch follows the Parquet queries below):
carbon.sql("select sequenceId from cross_partner_carbon where partnerCode='qufenqi' and eventType='Loan' and idNumber=''").show
spark.sql("""insert into table crosspartner_hdfs partition(ds='201706') select activity_map.partnerCode as partnerCode, activity_map.eventType as eventType, activity_map.idNumber as idNumber, activity_map.accountMobile as accountMobile, activity_map.accountEmail as accountEmail, activity_map.accountPhone as accountPhone, activity_map.deviceId as deviceId, activity_map.cardNumber as cardNumber, activity_map.contact1Mobile as contact1Mobile, activity_map.contact2Mobile as contact2Mobile, activity_map.contact3Mobile as contact3Mobile, activity_map.contact4Mobile as contact4Mobile, activity_map.contact5Mobile as contact5Mobile, activity_map.contact1IdNumber as contact1IdNumber, activity_map.contact2IdNumber as contact2IdNumber, activity_map.contact3IdNumber as contact3IdNumber, activity_map.contact4IdNumber as contact4IdNumber, activity_map.contact5IdNumber as contact5IdNumber, activity_map.sequenceId as sequenceId from activity where year=2017 and month=6 and activity_map.eventType in('Loan','Lending') """)
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

def year(ymd: String) = ymd.substring(0, 4)

// activity is partitioned by numeric year/month/day, so strip leading zeros
def month(ymd: String) = {
  var month = ymd.substring(4, 6)
  if (month.startsWith("0")) month = ymd.substring(5, 6)
  month
}

def day(ymd: String) = {
  var d = ymd.substring(6, 8)
  if (d.startsWith("0")) d = ymd.substring(7, 8)
  d
}

// Write each day's rows out as one Parquet directory (ds=yyyyMMdd)
def genCrossData(beg: String, end: String) = {
  var cal = Calendar.getInstance()
  var datef = new SimpleDateFormat("yyyyMMdd")
  var beginTime = datef.parse(beg)
  var endTime = datef.parse(end)
  while (beginTime.compareTo(endTime) <= 0) {
    cal.setTime(beginTime)
    var ymd = datef.format(beginTime)
    println(ymd)
    var y = year(ymd)
    var m = month(ymd)
    var d = day(ymd)
    spark.sql(s"""
      select
        activity_map.partnerCode as partnerCode,
        activity_map.eventType as eventType,
        activity_map.idNumber as idNumber,
        activity_map.accountMobile as accountMobile,
        activity_map.accountEmail as accountEmail,
        activity_map.accountPhone as accountPhone,
        activity_map.deviceId as deviceId,
        activity_map.cardNumber as cardNumber,
        activity_map.contact1Mobile as contact1Mobile,
        activity_map.contact2Mobile as contact2Mobile,
        activity_map.contact3Mobile as contact3Mobile,
        activity_map.contact4Mobile as contact4Mobile,
        activity_map.contact5Mobile as contact5Mobile,
        activity_map.contact1IdNumber as contact1IdNumber,
        activity_map.contact2IdNumber as contact2IdNumber,
        activity_map.contact3IdNumber as contact3IdNumber,
        activity_map.contact4IdNumber as contact4IdNumber,
        activity_map.contact5IdNumber as contact5IdNumber,
        activity_map.sequenceId as sequenceId
      from activity
      where year=$y and month=$m and day=$d and activity_map.eventType in ('Loan','Lending')
    """).repartition(1).write.mode("overwrite").parquet(s"/user/hive/warehouse/cross_partner_hdfs/ds=$ymd")
    cal.add(Calendar.DATE, 1)
    beginTime = cal.getTime()
  }
}

genCrossData("20170101", "20170630")
genCrossData("20170621","20170630")
Query the Parquet output: create a temporary view and query it with Spark SQL.
val df = spark.read.parquet("/user/hive/warehouse/cross_partner_hdfs/*")
df.createOrReplaceTempView("cross_partner_hdfs")
spark.sql("select * from cross_partner_hdfs").show
spark.sql("select sequenceId from cross_partner_hdfs where partnerCode='qufenqi' and eventType='Loan' and idNumber=''").show
Insert the temporary view's data into the CarbonData table.
val df = spark.read.parquet("/user/hive/warehouse/cross_partner_hdfs/*")
df.createOrReplaceTempView("cross_partner_hdfs")
carbon.sql("insert into cross_partner_carbon select * from cross_partner_hdfs")
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

// Register one Hive partition per day so the Parquet directories become queryable
def genCrossData(beg: String, end: String) = {
  var cal = Calendar.getInstance()
  var datef = new SimpleDateFormat("yyyyMMdd")
  var beginTime = datef.parse(beg)
  var endTime = datef.parse(end)
  while (beginTime.compareTo(endTime) <= 0) {
    cal.setTime(beginTime)
    var ymd = datef.format(beginTime)
    spark.sql(s"alter table cross2partner_dt add partition(ds='$ymd')")
    cal.add(Calendar.DATE, 1)
    beginTime = cal.getTime()
  }
}

genCrossData("20170102", "20170630")
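To confirm the partitions were registered, standard Spark SQL can list them:

spark.sql("show partitions cross2partner_dt").show(200, false)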
Inserting all of the data into CarbonData in one pass is too slow:
carbon.sql(s"insert into cross_partner_carbon select * from crosspartner where ds like '$ymd%'")
Insert into the CarbonData table month by month (or day by day) instead:
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

def genCrossCarbonData(beg: String, end: String) = {
  var cal = Calendar.getInstance()
  var datef = new SimpleDateFormat("yyyyMM")
  var beginTime = datef.parse(beg)
  var endTime = datef.parse(end)
  while (beginTime.compareTo(endTime) <= 0) {
    cal.setTime(beginTime)
    var ymd = datef.format(beginTime)
    println(ymd)
    // ds is yyyyMMdd, so 'yyyyMM%' matches every day of that month
    carbon.sql(s"insert into cross_partner_carbon select * from cross2partner_dt where ds like '$ymd%'")
    cal.add(Calendar.MONTH, 1) // advance by one month; Calendar.DATE here would re-insert the same month ~30 times
    beginTime = cal.getTime()
  }
}

genCrossCarbonData("201701", "201706")
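Once the month-by-month loads finish, the row counts of source and target should match; a quick sanity check in the same style as the count queries earlier:

spark.sql("select count(*) from cross2partner_dt").show
carbon.sql("select count(*) from cross_partner_carbon").show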
carbon.sql("""CREATE TABLE IF NOT EXISTS crosspartner1( ... STORED BY 'carbondata' TBLPROPERTIES ('DICTIONARY_EXCLUDE'='sequenceId,idNumber,accountMobile,accountEmail,accountPhone,deviceId,cardNumber,contact1Mobile,contact2Mobile,contact3Mobile,contact4Mobile,contact5Mobile,contact1IdNumber,contact2IdNumber,contact3IdNumber,contact4IdNumber,contact5IdNumber') """)
carbon.sql("insert into crosspartner1 select * from cross_partner_hdfs")
[WARNING] /Users/zhengqh/Github/carbondata-parent-1.1.1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala:23: warning: imported `RDD' is permanently hidden by definition of class RDD in package rdd
[INFO] import org.apache.spark.rdd.{CoalescedRDDPartition, DataLoadPartitionCoalescer, RDD}
[INFO] ^
[WARNING] /Users/zhengqh/Github/carbondata-parent-1.1.1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala:125: warning: non-variable type argument Any in type pattern scala.collection.Map[Any,Any] is unchecked since it is eliminated by erasure
[INFO] case m: scala.collection.Map[Any, Any] =>
[INFO] ^
[ERROR] /Users/zhengqh/Github/carbondata-parent-1.1.1/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala:87: error: value child is not a member of org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
[INFO] case i: InsertIntoTable => process(i.child, nodeList)
[INFO] ^
[WARNING] 11 warnings found
[ERROR] one error found
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Apache CarbonData :: Parent ........................ SUCCESS [ 5.140 s]
[INFO] Apache CarbonData :: Common ........................ SUCCESS [ 10.114 s]
[INFO] Apache CarbonData :: Core .......................... SUCCESS [ 29.232 s]
[INFO] Apache CarbonData :: Processing .................... SUCCESS [ 9.828 s]
[INFO] Apache CarbonData :: Hadoop ........................ SUCCESS [ 5.719 s]
[INFO] Apache CarbonData :: Spark Common .................. FAILURE [01:10 min]
[INFO] Apache CarbonData :: Spark Common Test ............. SKIPPED
[INFO] Apache CarbonData :: Assembly ...................... SKIPPED
[INFO] Apache CarbonData :: Spark2 ........................ SKIPPED
[INFO] Apache CarbonData :: Spark2 Examples ............... SKIPPED
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 02:10 min
[INFO] Finished at: 2017-08-03T14:39:55+08:00
[INFO] Final Memory: 72M/786M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.scala-tools:maven-scala-plugin:2.15.2:compile (default) on project carbondata-spark-common: wrap: org.apache.commons.exec.ExecuteException: Process exited with an error: 1(Exit value: 1) -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR] mvn <goals> -rf :carbondata-spark-common
Likewise, taking the binary package compiled against spark-2.1.1 and running it under spark-2.2.0 errors out; compare how InsertIntoTable is defined across Spark versions:
spark-1.6.2
case class InsertIntoTable(
    table: LogicalPlan,
    partition: Map[String, Option[String]],
    child: LogicalPlan,
    overwrite: Boolean,
    ifNotExists: Boolean)
  extends LogicalPlan {
spark-2.2.0

case class InsertIntoTable(
    table: LogicalPlan,
    partition: Map[String, Option[String]],
    query: LogicalPlan,
    overwrite: Boolean,
    ifPartitionNotExists: Boolean)
  extends LogicalPlan {
  // We don't want `table` in children as sometimes we don't want to transform it.
  override def children: Seq[LogicalPlan] = query :: Nil
  override def output: Seq[Attribute] = Seq.empty
  override lazy val resolved: Boolean = false
}
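The compile error above follows directly from this rename: CarbonDecoderOptimizerHelper.scala line 87 still reads the old child field. A minimal sketch of the change that error points to (just swapping in the renamed field; an actual port to Spark 2.2 may well require more than this):

// Before: compiles against Spark 2.1 and earlier, where InsertIntoTable has `child`
case i: InsertIntoTable => process(i.child, nodeList)

// After: Spark 2.2 renamed the field to `query`
case i: InsertIntoTable => process(i.query, nodeList)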
[INFO] Apache CarbonData :: Assembly ...................... FAILURE [ 2.180 s]
[INFO] Apache CarbonData :: Spark2 ........................ SKIPPED
[INFO] Apache CarbonData :: Spark2 Examples ............... SKIPPED
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:57 min
[INFO] Finished at: 2017-08-03T15:33:59+08:00
[INFO] Final Memory: 83M/728M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal on project carbondata-assembly: Could not resolve dependencies for project org.apache.carbondata:carbondata-assembly:pom:1.1.1: Could not find artifact org.apache.carbondata:carbondata-spark:jar:1.1.1 in central (http://repo1.maven.org/maven2) -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR] mvn <goals> -rf :carbondata-assembly