‘壹’ Spark 处理小文件
不论是Hive还是Spark sql在使用过程中都可能会遇到小文件过多的问题。小文件过多最直接的表现是任务执行时间长,查看Spark log会发现大量的数据移动的日志。我们可以查看log中展现的日志信息,去对应的路径下查看文件的大小和个数。
通过上述命令可以查看文件的个数以及大小。count查看出的文件大小单位是B,需要转换为MB。
在spark官方的推荐文档中,parquet格式的文件推荐大小是128MB,小于该大小的均可以称之为小文件,在实际的工作,往往小文件的大小仅仅为几KB,表现为,可能文件大小为几百MB,但是文件个数可能到达了几十万个。一般来说,我们可以通过简单相除获得文件的平均大小,如果文件数目不多,我们也可以通过下述命令获得每个文件的大小。
1.任务执行时间长
2.真实的文件大小独占一个数据存储块,存放到DataNode节点中。同时 DataNode一般默认存三份副本,以保障数据安全。同时该文件所存放的位置也写入到NameNode的内存中,如果有Secondary NameNode高可用节点,也可同时复制一份过去。NameNode的内存数据将会存放到硬盘中,如果HDFS发生重启,将产生较长时间的元数据从硬盘读到内存的过程。
3.不论在Hive还是在Spark中,每一个存储块都对应一个Map程序,一个Map呈现就需要一个JVM,启动一个JVM去读取或者写小文件是吃力不讨好的行为。在实际的生产中,为了更好的管理集群资源,一般会要求程序执行时限制Executor数量和每个Executor的核心数量,需要频繁创建Executor来读取写入。
5.影响磁盘寻址时间
小文件合并,本质上就是通过某种操作,将一系列小文件合并成大文件。我们知道,以MapRece为代表的大数据系统,都习惯用K-V键值对的形式来处理文件,最后文件落盘,也是一个rece对应一个输出文件。所以直观上,我们可以减少rece数量,达到减少文件数量的目的。
从Map到Rece需要一个Shuffle过程,所以我们将小文件合并理解为通过一个Shuffle,合并小文件成一个大文件。基于这样的思想,我们的策略可以分为两类:一类是原来的计算已经有Shuffle了,那么我们可以认为控制输出文件的数量;二类是强制触发Shuffle,进行小文件合并。
1-设置参数 (一般用于Hive)
2-distribute by rand()
往动态分区插入数据时,在已经写好的SQL末尾加上distribute by rand()
该算子只是起到打散的效果,但是我们还要设置文件的大小,以免打散后仍然有小文件。
表示每个rece的大小,Hive可以数据总量,得到rece个数,假设hive认为会有10个rece,那么,这里rand()则会为 x % 10
3-group by
我们知道,group by算子会触发Shuffle,因此只要我们设置好Shuffle时的文件个数就好,在Spark SQL中,我们可以设置partition个数,因为一个partition会对应一个文件。
上述的操作,会触发shuffle,因此我们再设置partition个数。
则表示,shuffle后,只会产生10个partition.
4-repartition()
5-coalesce()
需要注意的是,4和5都是spark 2.4以及以后才会支持的。
‘贰’ AWS Glue中使用Spark SQL
AWS Glue 是一项完全托管的提取、转换和加载 (ETL) 服务,让客户能够轻松准备和加载数据进行分析。您只需在 AWS 管理控制台中单击几次,即可创建并运行 ETL 作业。您只需将 AWS Glue 指向存储在 AWS 上的数据,AWS Glue 便会发现您的数据,并将关联的元数据(例如表定义和架构)存储到 AWS Glue 数据目录中。存入目录后,您的数据可立即供 ETL 搜索、查询和使用。
Glue提供了DynamicFrame来操作数据,但如果用户习惯用Spark SQL来做ETL,那是否可行呢? 本文就做了一个尝试:
首先我们创建一个基本的Glue Job,选择Spark,这里要注意在Job parameters里面加上
--enable-glue-datacatalog = true
这是为了在Spark SQL中使用Glue的元数据。
之后其他步骤都随意选择,进入脚本编辑环境,将脚本替换成如下:
这里做了一个简单的insert overwrite操作,从表testdata1中选择数据到表table_6。
尝试运行Job,等待7-8分钟后就可以看到任务完成了。此时去检查table_6的数据,已经有了。
‘叁’ Spark SQL CLI的元数据库和数据默认情况下分别存在什么地方
默认使用derby数据库,存在本地文件
‘肆’ 怎样的架构设计才是真正的数据仓库架构
一直想整理一下这块内容,既然是漫谈,就想起什么说什么吧。我一直是在互联网行业,就以互联网行业来说。
先大概列一下互联网行业数据仓库、数据平台的用途:
整合公司所有业务数据,建立统一的数据中心;
提供各种报表,有给高层的,有给各个业务的;
为网站运营提供运营上的数据支持,就是通过数据,让运营及时了解网站和产品的运营效果;
为各个业务提供线上或线下的数据支持,成为公司统一的数据交换与提供平台;
分析用户行为数据,通过数据挖掘来降低投入成本,提高投入效果;比如广告定向精准投放、用户个性化推荐等;
开发数据产品,直接或间接为公司盈利;
建设开放数据平台,开放公司数据;
。。。。。。
- 上面列出的内容看上去和传统行业数据仓库用途差不多,并且都要求数据仓库/数据平台有很好的稳定性、可靠性;但在互联网行业,除了数据量大之外,越来越多的业务要求时效性,甚至很多是要求实时的 ,另外,互联网行业的业务变化非常快,不可能像传统行业一样,可以使用自顶向下的方法建立数据仓库,一劳永逸,它要求新的业务很快能融入数据仓库中来,老的下线的业务,能很方便的从现有的数据仓库中下线;
- 其实,互联网行业的数据仓库就是所谓的敏捷数据仓库,不但要求能快速的响应数据,也要求能快速的响应业务;
- 建设敏捷数据仓库,除了对架构技术上的要求之外,还有一个很重要的方面,就是数据建模,如果一上来就想着建立一套能兼容所有数据和业务的数据模型,那就又回到传统数据仓库的建设上了,很难满足对业务变化的快速响应。应对这种情况,一般是先将核心的持久化的业务进行深度建模(比如:基于网站日志建立的网站统计分析模型和用户浏览轨迹模型;基于公司核心用户数据建立的用户模型),其它的业务一般都采用维度+宽表的方式来建立数据模型。这块是后话。
- 整体架构下面的图是我们目前使用的数据平台架构图,其实大多公司应该都差不多:
- 逻辑上,一般都有数据采集层、数据存储与分析层、数据共享层、数据应用层。可能叫法有所不同,本质上的角色都大同小异。
- 我们从下往上看:
- 数据采集数据采集层的任务就是把数据从各种数据源中采集和存储到数据存储上,期间有可能会做一些简单的清洗。
- 数据源的种类比较多:
网站日志:
- 作为互联网行业,网站日志占的份额最大,网站日志存储在多台网站日志服务器上,
- 一般是在每台网站日志服务器上部署flume agent,实时的收集网站日志并存储到HDFS上;
业务数据库:
- 业务数据库的种类也是多种多样,有Mysql、Oracle、SqlServer等,这时候,我们迫切的需要一种能从各种数据库中将数据同步到HDFS上的工具,Sqoop是一种,但是Sqoop太过繁重,而且不管数据量大小,都需要启动MapRece来执行,而且需要Hadoop集群的每台机器都能访问业务数据库;应对此场景,淘宝开源的DataX,是一个很好的解决方案(可参考文章 《异构数据源海量数据交换工具-Taobao DataX 下载和使用》),有资源的话,可以基于DataX之上做二次开发,就能非常好的解决,我们目前使用的DataHub也是。
- 当然,Flume通过配置与开发,也可以实时的从数据库中同步数据到HDFS。
来自于Ftp/Http的数据源:
- 有可能一些合作伙伴提供的数据,需要通过Ftp/Http等定时获取,DataX也可以满足该需求;
其他数据源:
- 比如一些手工录入的数据,只需要提供一个接口或小程序,即可完成;
- 数据存储与分析毋庸置疑,HDFS是大数据环境下数据仓库/数据平台最完美的数据存储解决方案。
- 离线数据分析与计算,也就是对实时性要求不高的部分,在我看来,Hive还是首当其冲的选择,丰富的数据类型、内置函数;压缩比非常高的ORC文件存储格式;非常方便的SQL支持,使得Hive在基于结构化数据上的统计分析远远比MapRece要高效的多,一句SQL可以完成的需求,开发MR可能需要上百行代码;
- 当然,使用Hadoop框架自然而然也提供了MapRece接口,如果真的很乐意开发Java,或者对SQL不熟,那么也可以使用MapRece来做分析与计算;Spark是这两年非常火的,经过实践,它的性能的确比MapRece要好很多,而且和Hive、Yarn结合的越来越好,因此,必须支持使用Spark和SparkSQL来做分析和计算。因为已经有Hadoop Yarn,使用Spark其实是非常容易的,不用单独部署Spark集群,关于Spark On Yarn的相关文章,可参考:《Spark On Yarn系列文章》
- 实时计算部分,后面单独说。
- 数据共享这里的数据共享,其实指的是前面数据分析与计算后的结果存放的地方,其实就是关系型数据库和NOSQL数据库;
- 前面使用Hive、MR、Spark、SparkSQL分析和计算的结果,还是在HDFS上,但大多业务和应用不可能直接从HDFS上获取数据,那么就需要一个数据共享的地方,使得各业务和产品能方便的获取数据;和数据采集层到HDFS刚好相反,这里需要一个从HDFS将数据同步至其他目标数据源的工具,同样,DataX也可以满足。
- 另外,一些实时计算的结果数据可能由实时计算模块直接写入数据共享。
- 数据应用
业务产品
- 业务产品所使用的数据,已经存在于数据共享层,他们直接从数据共享层访问即可;
报表
- 同业务产品,报表所使用的数据,一般也是已经统计汇总好的,存放于数据共享层;
即席查询
- 即席查询的用户有很多,有可能是数据开发人员、网站和产品运营人员、数据分析人员、甚至是部门老大,他们都有即席查询数据的需求;
- 这种即席查询通常是现有的报表和数据共享层的数据并不能满足他们的需求,需要从数据存储层直接查询。
- 即席查询一般是通过SQL完成,最大的难度在于响应速度上,使用Hive有点慢,目前我的解决方案是SparkSQL,它的响应速度较Hive快很多,而且能很好的与Hive兼容。
- 当然,你也可以使用Impala,如果不在乎平台中再多一个框架的话。
OLAP
- 目前,很多的OLAP工具不能很好的支持从HDFS上直接获取数据,都是通过将需要的数据同步到关系型数据库中做OLAP,但如果数据量巨大的话,关系型数据库显然不行;
- 这时候,需要做相应的开发,从HDFS或者HBase中获取数据,完成OLAP的功能;
- 比如:根据用户在界面上选择的不定的维度和指标,通过开发接口,从HBase中获取数据来展示。
其它数据接口
- 这种接口有通用的,有定制的。比如:一个从Redis中获取用户属性的接口是通用的,所有的业务都可以调用这个接口来获取用户属性。
- 实时计算现在业务对数据仓库实时性的需求越来越多,比如:实时的了解网站的整体流量;实时的获取一个广告的曝光和点击;在海量数据下,依靠传统数据库和传统实现方法基本完成不了,需要的是一种分布式的、高吞吐量的、延时低的、高可靠的实时计算框架;Storm在这块是比较成熟了,但我选择Spark Streaming,原因很简单,不想多引入一个框架到平台中,另外,Spark Streaming比Storm延时性高那么一点点,那对于我们的需要可以忽略。
- 我们目前使用Spark Streaming实现了实时的网站流量统计、实时的广告效果统计两块功能。
- 做法也很简单,由Flume在前端日志服务器上收集网站日志和广告日志,实时的发送给Spark Streaming,由Spark Streaming完成统计,将数据存储至Redis,业务通过访问Redis实时获取。
- 任务调度与监控在数据仓库/数据平台中,有各种各样非常多的程序和任务,比如:数据采集任务、数据同步任务、数据分析任务等;
- 这些任务除了定时调度,还存在非常复杂的任务依赖关系,比如:数据分析任务必须等相应的数据采集任务完成后才能开始;数据同步任务需要等数据分析任务完成后才能开始;这就需要一个非常完善的任务调度与监控系统,它作为数据仓库/数据平台的中枢,负责调度和监控所有任务的分配与运行。
- 前面有写过文章,《大数据平台中的任务调度与监控》,这里不再累赘。
- 总结在我看来架构并不是技术越多越新越好,而是在可以满足需求的情况下,越简单越稳定越好。目前在我们的数据平台中,开发更多的是关注业务,而不是技术,他们把业务和需求搞清楚了,基本上只需要做简单的SQL开发,然后配置到调度系统就可以了,如果任务异常,会收到告警。这样,可以使更多的资源专注于业务之上。
‘伍’ 技术选型 - OLAP大数据技术哪家强
Lambda架构的核心理念是“流批一体化”,因为随着机器性能和数据框架的不断完善,用户其实不关心底层是如何运行的,批处理也好,流式处理也罢,能按照统一的模型返回结果就可以了,这就是Lambda架构诞生的原因。现在很多应用,例如Spark和Flink,都支持这种结构,也就是数据进入平台后,可以选择批处理运行,也可以选择流式处理运行,但不管怎样,一致性都是相同的。
Kylin
Kylin的主要特点是预计算,提前计算好各个cube,这样的优点是查询快速,秒级延迟;缺点也非常明显,灵活性不足,无法做一些 探索 式的,关联性的数据分析。
适合的场景也是比较固定的,场景清晰的地方。
ClickHouse
Clickhouse由俄罗斯yandex公司开发。专为在线数据分析而设计。
Clickhouse最大的特点首先是快 ,为了快采用了列式储存,列式储存更好的支持压缩,压缩后的数据传输量变小,所以更快;同时支持分片,支持分布式执行,支持SQL。
ClickHouse很轻量级,支持数据压缩和最终数据一致性,其数据量级在PB级别。
另外Clickhouse不是为关联分析而生,所以多表关联支持的不太好。
同样Clickhouse不能修改或者删除数据,仅能用于批量删除或修改。没有完整的事务支持,不支持二级索引等等,缺点也非常明显。
与Kylin相比ClickHouse更加的灵活,sql支持的更好,但是相比Kylin,ClickHouse不支持大并发,也就是不能很多访问同时在线。
总之ClickHouse用于在线数据分析,支持功能简单。CPU 利用率高,速度极快。最好的场景用于行为统计分析。
Hive
Hive这个工具,大家一定很熟悉,大数据仓库的首选工具。可以将结构化的数据文件映射为一张数据库表,并提供完整的sql查询功能。
主要功能是可以将sql语句转换为相对应的MapRece任务进行运行,这样可能处理海量的数据批量,
Hive与HDFS结合紧密,在大数据开始初期,提供一种直接使用sql就能访问HDFS的方案,摆脱了写MapRece任务的方式,极大的降低了大数据的门槛。
当然Hive的缺点非常明显,定义的是分钟级别的查询延迟,估计都是在比较理想的情况。 但是作为数据仓库的每日批量工具,的确是一个稳定合格的产品。
Presto
Presto极大的改进了Hive的查询速度,而且Presto 本身并不存储数据,但是可以接入多种数据源,并且支持跨数据源的级联查询,支持包括复杂查询、聚合、连接等等。
Presto没有使用MapRece,它是通过一个定制的查询和执行引擎来完成的。它的所有的查询处理是在内存中,这也是它的性能很高的一个主要原因。
Presto由于是基于内存的,缺点可能是多张大表关联操作时易引起内存溢出错误。
另外Presto不支持OLTP的场景,所以不要把Presto当做数据库来使用。
Presto相比ClickHouse优点主要是多表join效果好。相比ClickHouse的支持功能简单,场景支持单一,Presto支持复杂的查询,应用范围更广。
Impala
Impala是Cloudera 公司推出,提供对 HDFS、Hbase 数据的高性能、低延迟的交互式 SQL 查询功能。
Impala 使用 Hive的元数据, 完全在内存中计算。是CDH 平台首选的 PB 级大数据实时查询分析引擎。
Impala 的缺点也很明显,首先严重依赖Hive,而且稳定性也稍差,元数据需要单独的mysql/pgsql来存储,对数据源的支持比较少,很多nosql是不支持的。但是,估计是cloudera的国内市场推广做的不错,Impala在国内的市场不错。
SparkSQL
SparkSQL的前身是Shark,它将 SQL 查询与 Spark 程序无缝集成,可以将结构化数据作为 Spark 的 RDD 进行查询。
SparkSQL后续不再受限于Hive,只是兼容Hive。
SparkSQL提供了sql访问和API访问的接口。
支持访问各式各样的数据源,包括Hive, Avro, Parquet, ORC, JSON, and JDBC。
Drill
Drill好像国内使用的很少,根据定义,Drill是一个低延迟的分布式海量数据交互式查询引擎,支持多种数据源,包括hadoop,NoSQL存储等等。
除了支持多种的数据源,Drill跟BI工具集成比较好。
Druid
Druid是专为海量数据集上的做高性能 OLAP而设计的数据存储和分析系统。
Druid 的架构是 Lambda 架构,分成实时层和批处理层。
Druid的核心设计结合了数据仓库,时间序列数据库和搜索系统的思想,以创建一个统一的系统,用于针对各种用例的实时分析。Druid将这三个系统中每个系统的关键特征合并到其接收层,存储格式,查询层和核心体系结构中。
目前 Druid 的去重都是非精确的,Druid 适合处理星型模型的数据,不支持关联操作。也不支持数据的更新。
Druid最大的优点还是支持实时与查询功能,解约了很多开发工作。
Ku
ku是一套完全独立的分布式存储引擎,很多设计概念上借鉴了HBase,但是又跟HBase不同,不需要HDFS,通过raft做数据复制;分片策略支持keyrange和hash等多种。
数据格式在parquet基础上做了些修改,支持二级索引,更像一个列式存储,而不是HBase schema-free的kv方式。
ku也是cloudera主导的项目,跟Impala结合比较好,通过impala可以支持update操作。
ku相对于原有parquet和ORC格式主要还是做增量更新的。
Hbase
Hbase使用的很广,更多的是作为一个KV数据库来使用,查询的速度很快。
Hawq
Hawq是一个Hadoop原生大规模并行SQL分析引擎,Hawq采用 MPP 架构,改进了针对 Hadoop 的基于成本的查询优化器。
除了能高效处理本身的内部数据,还可通过 PXF 访问 HDFS、Hive、HBase、JSON 等外部数据源。HAWQ全面兼容 SQL 标准,还可用 SQL 完成简单的数据挖掘和机器学习。无论是功能特性,还是性能表现,HAWQ 都比较适用于构建 Hadoop 分析型数据仓库应用。
‘陆’ sparkSQL用jdbc连接hive和用元数据连接hive的区别,各自优缺点
spark on hive : 是spark 通过spark-sql 使用hive 语句操作hive ,底层运行的还是 spark rdd.
*(1)就是通过sparksql,加载hive的配置文件,获取到hive的元数据信息
* (2)spark sql获取到hive的元数据信息之后就可以拿到hive的所有表的数据
* (3)接下来就可以通过spark sql来操作hive表中的数据
hive on spark: 是hive 等的执行引擎变成spark , 不再是maprece. 相对于上一项,这个要实现责麻烦很多, 必须重新编译你的spark. 和导入jar包,
‘柒’ spark 怎么获取dataframe的元数据信息
而case class类就是继承了Proct。我们所熟悉的TupleN类型也是继承了scala.Proct类的,所以我们也可以通过TupleN来创建DataFrame:
[python] view plain
val mobiles=sqlContext.createDataFrame(Seq((1,"Android"), (2, "iPhone"))) mobiles.printSchema mobiles.show()
root
|-- _1: integer (nullable = false)
|-- _2: string (nullable = true)
+---+-------+
| _1| _2|
‘捌’ hive kerberos sparksql怎么创建hivecontext
park+shark ,可以直接用hive原来的表。
phpHiveAdmin将HQL请求发送给HAproxy负载的Hive server集群。 三、phpHiveAdmin读取Metadata的数据,注意这里是只读,并不存在对Metadata的读写。因为元数据非常重要,涉及到底层数据的正确性,所以不能随意修改。
‘玖’ spark与hive查询得出的数据不同
在实际工作的情况中,经常有spark与hive查询出来的数据存在不一样的情况,基本的原因如下: 1、由于精度不一样导致的 2、更多的时候确实是由于元数据混乱导致的 (就是说hive中能读到这个字段的值,但是在spark中却无法读取到该字段的值。 很多时候可能还是由于大小写的混乱所导致的) 同一条sql,hive能生成表,而spark却生成的一张空表,或者数据缺少,存在null值,与hive结果不一致 设置 spark.sql.hive.convertMetastoreOrc=false convertMetastoreParquet=false 原因: spark用自己的格式读取hive文件后进行自动转换后进行操作 官方说明‘拾’ 【数仓】对比spark-hive的两种分布式计算模式
最近在学习过程中发现SparkSQL、Hive on Spark、Spark on Hive是非常容易混淆的的概念。了解三者的关系前,先要先明白几个概念。
相对于HIve on MapRece,本质上来说,Hive on Spark是Hive把自己的引擎从MapRece替换为更高效的SparkRDD。数据源是hive本身,当我们执行HQL时底层已经不再是将HQL转换为MapRece任务,而是跑SparkRDD任务。
在hive-site-xml中把hive.execution.engine配置换成spark,在hive所在节点安装Spark并配置环境变量。外部远程登录或者hive命令行模式就会执行spark任务了。
即:Hive on Spark = HQL解析 + SparkRDD引擎
Spark on Hive是以Spark角度看Hive是数据源,在Spark中配置Hive,并获取Hive中的元数据,然后用SparkSQL操作hive表的数据并直接翻译成SparkRDD任务。Hive只是作为一个Spark的数据源。
bin/spark-sql、bin/spark-submit采用的是这种方式。提交任务的jar必须带着hive-site.xml的配置。
即:Spark on Hive = SparkSql解析 + SparkRDD引擎
Spark on Hive的模式更加丝滑,性能更好。
HIve on Spark的模式对大数据周边组件的支持兼容性更好。