您好,欢迎来电子发烧友网! ,新用户?[免费注册]

您的位置:电子发烧友网>源码下载>数值算法/人工智能>

如何使用Apache Spark中的DataSource API以实现数据源混合计算的实践

大小:0.6 MB 人气: 2017-10-10 需要积分:1

  本文主要介绍如何使用Apache Spark中的DataSource API以实现多个数据源混合计算的实践,那么这么做的意义何在,其主要归结于3个方面:

  首先,我们身边存在大量的数据,结构化、非结构化,各种各样的数据结构、格局格式,这种数据的多样性本身即是大数据的特性之一,从而也决定了一种存储方式不可能通吃所有。因此,数据本身决定了多种数据源存在的必然性。 其次:从业务需求来看,因为每天会开发各种各样的应用系统,应用系统中所遇到的业务场景是互不相同的,各种各样的需求决定了目前市面上不可能有一种软件架构同时能够解决这么多种业务场景,所以在数据存储包括数据查询、计算这一块也不可能只有一种威廉希尔官方网站 就能解决所有问题。最后,从软件的发展来看,现在市面上出现了越来越多面对某一个细分领域的软件威廉希尔官方网站 ,比如像数据存储、查询搜索引擎,MPP数据库,以及各种各样的查询引擎。这么多不同的软件中,每一个软件都相对擅长处理某一个领域的业务场景,只是涉及的领域大小不相同。因此,越来越多软件的产生也决定了我们所接受的数据会存储到越来越多不同的数据源。

  Apache Spark的多数据源方案

  传统方案中,实现多数据源通常有两种方案:冗余存储,一份业务数据有多个存储,或者内部互相引用;集中的计算,不同的数据使用不同存储,但是会在统一的地方集中计算,算的时候把这些数据从不同位置读取出来。下面一起讨论这两种解决方案中存在的问题:

  如何使用Apache Spark中的DataSource API以实现数据源混合计算的实践

  图1 多数据源方案

  第一种方案中存在的一个问题是数据一致性,一样的数据放在不同的存储里面或多或少会有格式上的不兼容,或者查询的差异,从而导致从不同位置查询的数据可能出现不一致。比如有两个报表相同的指标,但是因为是放在不同存储里查出来的结果对不上,这点非常致命。第二个问题是存储的成本,随着存储成本越来越低,这点倒是容易解决。

  第二种方案也存在两个问题,其一是不同存储出来的数据类型不同,从而在计算时需求相互转换,因此如何转换至关重要。第二个问题是读取效率,需要高性能的数据抽取机制,尽量避免从远端读取不必要的数据,并且需要保证一定的并发性。

  Spark在1.2.0版本首次发布了一个新的DataSourceAPI,这个API提供了非常灵活的方案,让Spark可以通过一个标准的接口访问各种外部数据源,目标是让Spark各个组件以非常方便的通过SparkSQL访问外部数据源。很显然,Spark的DataSourceAPI其采用的是方案二,那么它是如何解决其中那个的问题的呢?

  如何使用Apache Spark中的DataSource API以实现数据源混合计算的实践

  图2 External Datasource API

  首先,数据类型转换,Spark中定义了一个统一的数据类型标准,不同的数据源自己定义数据类型的转换方法,这样解决数据源之间相互类型转换的问题;

  关于数据处理效率的问题,Spark定义了一个比较简单的API的接口,主要有3个方式:

  1./* 全量数据抽取 */

  2.trait TableScan {

  3.def buildScan(): RDD[Row]

  4.}

  5.

  6./* 列剪枝数据抽取 */

  7.trait PrunedScan {

  8.def buildScan(requiredColumns: Array[String]): RDD[Row]

  9.}

  10.

  11./* 列剪枝+行过滤数据抽取 */

  12.trait PrunedFilteredScan {

  13.def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]

  14.}

  TableScan。这种方式需要将1TB的数据从数据抽取,再把这些数据传到Spark中。在把这1TB的数据穿过网络IO传给Spark端之后,Spark还要逐行的进行过滤,从而消耗大量的计算资源,这是目前最低效的方式。

  PrunedScan。这个方式有一个好处是数据源只需要从磁盘读取1TB的数据,并只返回一些列的数据,Spark不需要计算就可以使用1GB的数据,这个过程中节省了大量的网络IO。

  PrunedFilteredScan。它需要数据源既支持列过滤也支持行过滤,其好处是在磁盘IO这一层进行数据过滤,因此如果需要1GB数据,可能只抽出2GB大小,经过列过滤的规则再抽出1GB的数据,随后传给Spark,因此这种数据源接口最高效,这也是目前市面上实现的最高效的数据接口。

  可直接使用的DataSource实现

  目前市面上可以找到的Spark DataSource实现代码有三大类:Spark自带;Spark Packages(http://Spark-packages.org/)网站中存放的第三方软件包;跟随其他项目一同发布的内置的Spark的实现。这里介绍其中几个:

  1.JDBCRelation

  1.private[sql] case class JDBCRelation(

  2.url: String,

  3.table: String,

  4.parts: Array[Partition],

  5.properties: Properties = new Properties())(@transient val sqlContext: SQLContext)

  6.extends BaseRelation

  7.with PrunedFilteredScan

  8.with InsertableRelation {

  9…。

  10.}

  以JDBC方式连接外部数据源在国内十分流行,Spark也内置了最高效的PrunedFilteredScan接口,同时还实现了数据插入的接口,使用起来非常方便,可以方便地把数据库中的表用到Spark。以Postgres为例:

  1.sqlContext.read.jdbc(

  2.“jdbc:postgresql://testhost:7531/testdb”,

  3.“testTable”,

  4.“idField”, ——-索引列

  5.10000, ——-起始index

  6.1000000, ——-结束index

  7.10, ——-partition数量

  8.new Properties

  9.).registerTempTable(“testTable”)

  实现机制:默认使用单个Task从远端数据库读取数据,如果设定了partitionColumn、lowerBound、upperBound、numPartitions这4个参数,那么还可以控制Spark把针对这个数据源的访问任务进行拆分,得到numPartitions个任务,每个Executor收到任务之后会并发的去连接数据库的Server读取数据。

  具体类型:PostgreSQL, MySQL。

  问题:在实际使用中需要注意一个问题,所有的Spark都会并发连接一个Server,并发过高时可能会对数据库造成较大的冲击(对于MPP等新型的关系型数据库还好)。

  建议:个人感觉,JDBC的数据源适合从MPP等分布式数据库中读取数据,对于传统意义上单机的数据库建议只处理一些相对较小的数据。

  2.HadoopFsRelation

  第二个在Spark内置的数据源实现,HadoopFs,也是实现中最高效的PrunedFilteredScan接口,使用起来相对来说比JDBC更方便。

  1.sqlContext

  2..read

  3..parquet(“hdfs://testFS/testPath”)

  4..registerTempTable(“test”)

  实现机制:执行的时候Spark在Driver端会直接获取列表,根据文件的格式类型和压缩方式生成多个TASK,再把这些TASK分配下去。Executor端会根据文件列表访问,这种方式访问HDFS不会出现IO集中的地方,所以具备很好的扩展性,可以处理相当大规模的数据。

  具体类型:ORC,Parquet,JSon。

  问题:在实时场景下如果使用HDFS作为数据输出的数据源,在写数据就会产生非常大量零散的数据,在HDFS上积累大量的零碎文件,就会带来很大的压力,后续处理这些小文件的时候也非常头疼。

  建议:这种方式适合离线数据处理程序输入和输出数据,还有一些数据处理Pipeline中的临时数据,数据量比较大,可以临时放在HDFS。实时场景下不推荐使用HDFS作为数据输出。

  3.ElasticSearch

  越来越多的互联网公司开始使用ELK(ElasticSearch+LogStash+Kibana)作为基础数据分析查询的工具,但是有多少人知道其实ElasticSearch也支持在Spark中挂载为一个DataSource进行查询呢?

  1.EsSparkSQL

  2..esDF(hc,indexName,esQuery)

  3..registerTempTable(”testTable”)

  实现机制:ES DataSource的实现机制是通过对esQuery进行解析,将实际要发往多个ES Nodes的请求分为多个Task,在每个Executor上并行执行。

非常好我支持^.^

(0) 0%

不好我反对

(0) 0%

      发表评论

      用户评论
      评价:好评中评差评

      发表评论,获取积分! 请遵守相关规定!