㈠ sparksql jdbc 求助
将hive-site.xml拷贝到Spark目录下conf文件夹
local模式
spark-sql --driver-class-path /usr/local/hive-1.2.1/lib/mysql-connector-java-5.1.31-bin.jar
或者
需要在$SPARK_HOME/conf/spark-env.sh中的SPARK_CLASSPATH添加jdbc驱动的jar包
export export SPARK_CLASSPATH=$SPARK_CLASSPATH:/usr/local/hive-1.2.1/lib/mysql-connector-java-5.1.31-bin.jar
连接到集群:
spark-sql --master spark://10.8.2.100:7077 --driver-class-path /usr/local/hive-1.2.1/lib/mysql-connector-java-5.1.31-bin.jar
开启thriftserver,指定服务器为Hadoop-master
内网连接:
sbin/start-thriftserver.sh --master spark://10.9.2.100:7077 --driver-class-path /usr/local/hive-1.2.1/lib/mysql-connector-java-5.1.31-bin.jar
外网连接:
sbin/start-thriftserver.sh --hiveconf hive.server2.thrift.port=10000 --hiveconf hive.server2.thrift.bind.host=hadoop-master --master spark://10.9.2.100:7077 --driver-class-path /usr/local/hive-1.2.1/lib/mysql-connector-java-5.1.31-bin.jar
停止thriftserver
㈡ tableau连接spark sql 报错,怎么处理
这个分好几种情况,最常见的是你的数据量太多(小数据正常) 或是需要执行大存储过程(简单的存储过程也正常)时候出现,你需要去官网下在安装对应数据库的驱动程序.
㈢ Ku:Spark SQL操作Ku
摘要: Spark SQL , Ku
参考 https://github.com/xieenze/SparkOnKu/blob/master/src/main/scala/com/spark/test/KuCRUD.scala
引入 spark-core_2.11 , spark-sql_2.11 , ku-spark2_2.11 , hadoop-client 依赖包
指定 ku.master" , ku.table ,如果读取超时加入 ku.operation.timeout.ms 参数
或者
写入数据可以使用dataframe的 write 方法,也可以使用 kuContext 的 updateRows , insertRows , upsertRows , insertIgnoreRows 方法
直接调用dataframe的write方法指定 ku.master , ku.table ,只支持 append 模式,对已有key的数据自动更新
调用kuContext的 upsertRows 方法,效果和dataframe调用write append模式一样
调用kuContext insertRows , insertIgnoreRows 方法,如果插入的数据key已存在insertRows直接报错,insertIgnoreRows忽略已存在的key,只插入不存在的key
调用kuContext updateRows 方法,对已经存在的key数据做更新,如果key不存在直接报错
使用已有dataframe的schema建表
使用 StructType 自定义schema
删除表和判断表是否存在
㈣ spark从hive数据仓库中读取的数据可以使用sparksql进行查询吗
1、为了让Spark能够连接到Hive的原有数据仓库,我们需要将Hive中的hive-site.xml文件拷贝到Spark的conf目录下,这样就可以通过这个配置文件找到Hive的元数据以及数据存放。
在这里由于我的Spark是自动安装和部署的,因此需要知道CDH将hive-site.xml放在哪里。经过摸索。该文件默认所在的路径是:/etc/hive/conf 下。
同理,spark的conf也是在/etc/spark/conf。
此时,如上所述,将对应的hive-site.xml拷贝到spark/conf目录下即可
如果Hive的元数据存放在Mysql中,我们还需要准备好Mysql相关驱动,比如:mysql-connector-java-5.1.22-bin.jar。
2、编写测试代码
val conf=new SparkConf().setAppName("Spark-Hive").setMaster("local")
val sc=new SparkContext(conf)
//create hivecontext
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ") //这里需要注意数据的间隔符
sqlContext.sql("LOAD DATA INPATH '/user/liujiyu/spark/kv1.txt' INTO TABLE src ");
sqlContext.sql(" SELECT * FROM jn1").collect().foreach(println)
sc.stop()
3、下面列举一下出现的问题:
(1)如果没有将hive-site.xml拷贝到spark/conf目录下,会出现:
分析:从错误提示上面就知道,spark无法知道hive的元数据的位置,所以就无法实例化对应的client。
解决的办法就是必须将hive-site.xml拷贝到spark/conf目录下
(2)测试代码中没有加sc.stop会出现如下错误:
ERROR scheler.LiveListenerBus: Listener EventLoggingListener threw an exception
java.lang.reflect.InvocationTargetException
在代码最后一行添加sc.stop()解决了该问题。
㈤ 如何使用 Spark SQL
一、启动方法
/data/spark-1.4.0-bin-cdh4/bin/spark-sql --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2
注:/data/spark-1.4.0-bin-cdh4/为spark的安装路径
/data/spark-1.4.0-bin-cdh4/bin/spark-sql –help 查看启动选项
--master MASTER_URL 指定master url
--executor-memory MEM 每个executor的内存,默认为1G
--total-executor-cores NUM 所有executor的总核数
-e <quoted-query-string> 直接执行查询SQL
-f <filename> 以文件方式批量执行SQL
二、Spark sql对hive支持的功能
1、查询语句:SELECT GROUP BY ORDER BY CLUSTER BY SORT BY
2、hive操作运算:
1) 关系运算:= ==, <>, <, >, >=, <=
2) 算术运算:+, -, *, /, %
3) 逻辑运算:AND, &&, OR, ||
4) 复杂的数据结构
5) 数学函数:(sign, ln, cos, etc)
6) 字符串函数:
3、 UDF
4、 UDAF
5、 用户定义的序列化格式
6、join操作:JOIN {LEFT|RIGHT|FULL} OUTER JOIN LEFT SEMI JOIN CROSS JOIN
7、 unions操作:
8、 子查询: SELECT col FROM ( SELECT a + b AS col from t1) t2
9、Sampling
10、 Explain
11、 分区表
12、 视图
13、 hive ddl功能:CREATE TABLE、CREATE TABLE AS SELECT、ALTER TABLE
14、 支持的数据类型:TINYINT SMALLINT INT BIGINT BOOLEAN FLOAT DOUBLE STRING BINARY TIMESTAMPDATE ARRAY MAP STRUCT
三、Spark sql 在客户端编程方式进行查询数据
1、启动spark-shell
./spark-shell --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2
2、编写程序
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("../examples/src/main/resources/people.json")
查看所有数据:df.show()
查看表结构:df.printSchema()
只看name列:df.select("name").show()
对数据运算:df.select(df("name"), df("age") + 1).show()
过滤数据:df.filter(df("age") > 21).show()
分组统计:df.groupBy("age").count().show()
1、查询txt数据
import sqlContext.implicits._
case class Person(name: String, age: Int)
val people = sc.textFile("../examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
2、parquet文件
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")
3、hdfs文件
val df = sqlContext.read.load("hdfs://namenode.Hadoop:9000/user/hive/warehouse/spark_test.db/test_parquet/part-r-00001.gz.parquet")
4、保存查询结果数据
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet“)
四、Spark sql性能调优
缓存数据表:sqlContext.cacheTable("tableName")
取消缓存表:sqlContext.uncacheTable("tableName")
spark.sql.inMemoryColumnarStorage.compressedtrue当设置为true时,Spark SQL将为基于数据统计信息的每列自动选择一个压缩算法。
spark.sql.inMemoryColumnarStorage.batchSize10000柱状缓存的批数据大小。更大的批数据可以提高内存的利用率以及压缩效率,但有OOMs的风险