在当前企业生产数据膨胀的时代,数据即使企业的价值所在,也是一家企业的技术挑战所在。所以在海量数据处理场景上,人们意识到单机计算能力再强也无法满足日益增长的数据处理需求,分布式才是解决该类问题的根本解决方案。
而在分布式领域,有两类产品是至关重要的,分别分布式存储和分布式计算,用户只有将两者的特性充分利用,才可以真正发挥分布式架构的存储和计算能力。
本文主要向读者们介绍SequoiaDB(分布式存储)和Spark(分布式计算)两款产品的对接使用,以及介绍在海量数据场景下如何提高统计分析性能。
SequoiaDB是国内为数不多的自主研发的分布式数据库,特点是同时支持文档存储和块存储,支持标准SQL和事务功能,支持复杂索引查询、与Hadoop、Hive、Spark都有较深度的集成。目前SequoiaDB已经在Github开源。
SequoiaDB在分布式存储功能上,较一般的大数据产品提供更多的数据切分规则,包括:水平切分、范围切分、多维分区(类似partition分区)和多维切分方式,用户可以根据不用的场景选择相应的切分方式,以提高系统的存储能力和操作性能。
Spark 近年来发展特别迅猛,特别在正式发布Spark 1.0 版本后,得到了众多硅谷巨头支持,例如:Cloudera、IBM、Hortonworks、Intel等,而且在Spark 2.0宣布支持TPC-DS99后,使用SparkSQL做大数据处理和分析的开发者越来越多,可以预见,Spark将会成为继Hadoop之后最重要和流行的分布式计算框架。
SparkSQL是Spark产品中一个组成部分,SQL的执行引擎使用Spark的RDD和Dataframe实现。目前SparkSQL已经可以完整运行TPC-DS99测试,标志着SparkSQL在数据分析和数据处理场景上技术进一步成熟。
SparkSQL和另外一款流行的大数据SQL产品--Hive有相似之处,例如两者都使用Thriftserver作为JDBC服务,两个产品都使用相同的metadata代码(实际上SparkSQL复用了Hive的metadata代码)。但是两款产品还是有本质上的区别,最大的不同点在于执行引擎,Hive默认支持Hadoop和Tez计算框架,而SparkSQL只支持Spark RDD计算框架,但是SparkSQL的拥有更加深度的执行计划优化和处理引擎优化。
原理介绍
了解Spark技术原理的读者们应该清楚,Spark本身是一款分布式计算框架。它不像Hadoop一样,同时为开发者提供分布式计算和分布式存储,而是开放了存储层的开发接口,只要开发者按照Spark的接口规范实现了接口方法,任何存储产品都可以成为Spark数据计算的来源,同时也包括SparkSQL的数据来源。
SequoiaDB是一款分布式数据库,能够为用户存储海量的数据,但是如果要对海量数据做统计、分析,还是需要借助分布式计算框架的并发计算性能,提高计算效率。
所以SequoiaDB为Spark开发了SequoiaDB for Spark的连接器,让Spark支持从SequoiaDB中并发获取数据,再完成相应的数据计算。
对接方式
Spark和SequoiaDB对接方式比较简单,用户只要将SequoiaDB for Spark 连接器spark-sequoiadb.jar 和SequoiaDB的java 驱动sequoiadb.jar 加入到每个Spark Worker的CLASSPATH 中即可。
例如,用户希望SparkSQL对接到SequoiaDB,可以为spark-env.sh 配置文件中增加SPARK_CLASSPATH参数,如果该参数已经存在,则将新jar 包添加到SPARK_CLASSPATH 参数上,如:
SPARK_CLASSPATH="/media/psf/mnt/sequoiadb-driver-2.9.0-SNAPSHOT.jar:/media/psf/mnt/spark-sequoiadb_2.11-2.9.0-SNAPSHOT.jar"
用户修改完spark-env.sh 配置后,重启spark-sql 或者 thriftserver 就完成了Spark和SequoiaDB的对接。
Spark SQL+SequoiaDB的性能优化将会从connector 计算技术原理、SparkSQL优化、SequoiaDB优化和connector参数优化4个方面进行介绍。
SequoiaDB for SparkSQL connector介绍
1)connector工作原理
Spark产品虽然为用户提供了多种功能模块,但是都只是数据计算的功能模块。Spark产品本身没有任何的存储功能,在默认情况下,Spark是从本地文件服务器或者HDFS上读取数据。而Spark也将它与存储层的接口开放给广大开发者,开发者只要按照Spark接口规范实现其存储层连接器,任何数据源均可称为Spark计算的数据来源。
下图为Spark worker与存储层中datanode的关系。
Spark计算框架与存储层的关系,可以从下图中了解其原理。
Spark master在接收到一个计算任务后,首先会与存储层做一次通讯,从存储层的访问快照或者是存储规划中,得到本次计算任务所设计的所有数据的存储情况。存储层返回给Spark master的结果为数据存储的partition队列。
然后Spark master会将数据存储的partition队列中的partition逐个分配给给Spark worker。Spark work在接收到数据的partition信息后,就能够了解如何获取计算数据。然后Spark work会主动与存储层的node节点进行连接,获取数据,再结合Spark master下发给Spark worker的计算任务,开始数据计算工作。
SequoiaDB for Spark的连接器的实现原理和上述描述基本一致,只是在生成数据计算的partition任务时,连接器会根据Spark下压的查询条件到SequoiaDB中生成查询计划。
如果SequoiaDB能够根据查询条件做索引扫描,连接器生成的partition任务将是让Spark work直接连接SequoiaDB的数据节点。
如果SequoiaDB无法根据查询条件做索引扫描,连接器将获取相关数据表的所有数据块信息,然后根据partitionblocknum和partitionmaxnum参数生成包含若干个数据块连接信息的partititon计算任务。
2)connector参数说明
SequoiaDB for Spark 连接器在SequoiaDB 2.10之后进行了重构,提高Spark并发从SequoiaDB获取数据的性能,参数也有相应的调整。
用户在SparkSQL上创建数据源为SequoiaDB的table,建表模版如下:
create [temporary] <table|view> <name>[(schema)] using com.sequoiadb.spark options (<options>);
SparkSQL创表命令的关键字介绍:
1. temporary 关键字,代表该表或者视图是否为邻时创建的,如果用户标记了temporary 关键字,则该表或者视图在客户端重启后将自动被删除;
2. 建表时用户可以选择不指定表结构,因为如果用户不显式指定表结构,SparkSQL将在建表时自动检测已经存在数据的表结构;
3. com.sequoiadb.spark 关键字为SequoiaDB for Spark connector 的入口类;
4. options 为SequoiaDB for Spark connector的配置参数;
SparkSQL建表例子如下:
create table tableName (name string, id int) using com.sequoiadb.spark options (host 'sdb1:11810,sdb2:11810,sdb3:11810', collectionspace 'foo', collection 'bar', username 'sdbadmin', password 'sdbadmin');
SparkSQL for SequoiaDB的建表options参数列表如下:
名称 | 说明 | 类型 | 默认值 | 是否必填 |
host | SequoiaDB协调节点/独立节点地址,多个地址以”,”分隔。例如:”server1:11810,server2:11810” | string | - | 是 |
collectionspace | 集合空间名称 | string | - | 是 |
collection | 集合名称(不包含集合空间名称) | string | - | 是 |
username | 用户名 | string | “” | 否 |
password | 用户名对应的密码 | string | “” | 否 |
samplingratio | schema采样率,取值(0, 1.0] | double | 1.0 | 否 |
samplingnum | schema采样数量(每个分区),取值大于0。 | long | 1000 | 否 |
samplingwithid | schema采样时是否带”_id”字段,取值为”true”或”false”。 | boolean | false | 否 |
samplingsingle | schema采样时使用一个分区,取值为”true”或”false”。 | boolean | true | 否 |
bulksize | 向SequoiaDB集合插入数据时批插的数据量,取值大于0。 | int | 500 | 否 |
partitionmode | 分区模式,取值可以是”single”,”sharding”,”datablock”,”auto”。 设为auto时根据情况自动选择”sharding”或”datablock”。 | string | auto | 否 |
partitionblocknum | 每个分区的数据块数,在分区模式为”datablock”时有效。取值大于0。 | int | 4 | 否 |
partitionmaxnum | 最大分区数量,在分区模式为”datablock”时有效。取值大于等于0,等于0时表示不限制分区最大数量。由于partitionMaxNum的限制,每个分区的数据块数可能与partitionBlockNum不同。 | int | 1000 | 否 |
用户如果要使用SparkSQL对海量数据做统计分析操作,那么应该从3个方面进行性能调优
1. 调大Spark Worker 最大可用内存大小,防止在计算过程中数据超出内存范围,需要将部分数据写入到临时文件上;
2. 增加Spark Worker 数目,并且设置每个Worker均可以使用当前服务器左右CPU资源,以提高并发能力;
3. 调整Spark的运行参数;
用户可以对spark-env.sh 配置文件进行设置,SPARK_WORKER_MEMORY为控制Worker可用内存的参数,SPARK_WORKER_INSTANCES为每台服务器启动多少个Worker的参数。
如果用户需要调整Spark的运行参数,则应该修改spark-defaults.conf 配置文件,对优化海量数据统计计算有较明显提升的参数有
1) spark.storage.memoryFraction,该参数控制Worker多少内存比例用户存储临时计算数据,默认为0.6,代表60%的含义;
2) spark.shuffle.memoryFraction,该参数控制计算过程中shuffle时能够占用每个Worker的内存比例,默认为0.2,代表20%的含义,如果临时存储的计算数据较少,而计算中有较多的group by、sort、join等操作,应该考虑将spark.shuffle.memoryFraction 调大,spark.storage.memoryFraction调小,避免超出内存部分需要写入临时文件中;
3) spark.serializer,该参数设置Spark在运行时使用哪种序列化方法,默认为org.apache.spark.serializer.JavaSerializer,但是为了提升性能,应该选择org.apache.spark.serializer.KryoSerializer 序列化
SparkSQL+SequoiaDB这种组合,由于数据读取是从SequoiaDB中进行,所以在性能优化应该考虑三点
1. 尽可能将大表的数据分布式存储,所以建议符合二维切分条件的table应该采用多维+Hash切分两种数据均衡方式进行数据分布式存储;
2. 数据导入时,应该避免同时对相同集合空间的多个集合做数据导入,因为同一个集合空间下的多个集合是共用相同一个数据文件,如果同时向相同集合空间的多个集合做数据导入,会导致每个集合下的数据块存储过于离散,从而导致在Spark SQL从SequoiaDB获取海量数据时,需要读取的数据块过多;
3. 如果SparkSQL的查询命令中包含查询条件,应该对应地在SequoiaDB中建立对应字段的索引;
SequoiaDB for Spark 连接器的参数优化,主要分两个场景,一是数据读,另外一个是数据写入。
数据写入的优化空间较少,只有一个参数可以调整,即bulksize参数,该参数默认值为500,代表连接器向SequoiaDB写入数据时,以500条记录组成一个网络包,再向SequoiaDB发送写入请求,通常设置bulksize参数,以一个网络包不超过2MB为准。
数据读取的参数优化,用户则需要关注partitionmode、partitionblocknum和partitionmaxnum三个参数。
partitionmode,连接器的分区模式,可选值有single、sharding、datablock、auto,默认值为auto,代表连接器智能识别。
1. single值代表SparkSQL在访问SequoiaDB数据时,不考虑并发性能,只用一个线程连接SequoiaDB的Coord节点,一般该参数在建表做表结构数据抽样时采用;
2. sharding值代表SparkSQL访问SequoiaDB数据时,采用直接连接SequoiaDB各个datanode的方式,该参数一般采用在SQL命令包含查询条件,并且该查询可以在SequoiaDB中使用索引查询的场景;
3. datablock值代表SparkSQL访问SequoiaDB数据时,采用并发连接SequoiaDB的数据块进行数据读取,该参数一般使用在SQL命令无法在SequoiaDB中使用索引查询,并且查询的数据量较大的场景;
4. auto值代表SparkSQL在向SequoiaDB查询数据时,访问SequoiaDB的方式将由连接器根据不同的情况分析决定;
partitionblocknum,该参数只有在partitionmode=datablock时才会生效,代表每个Worker在做数据计算时,一次获取多少个SequoiaDB数据块读取任务,该参数默认值为4。如果SequoiaDB中存储的数据量较大,计算时涉及到的数据块较多,用户应该调大该参数,使得SparkSQL的计算任务保持在一个合理范围,提高数据读取效率。
partitionmaxnum,该参数只有在partitionmode=datablock时才会生效,代表连接器最多能够生成多少个数据块读取任务,该参数的默认值为1000。该参数主要是为了避免由于SequoiaDB中的数据量过大,导致总的数据块数量太大,从而导致SparkSQL的计算任务过多,而导致总体计算性能下降。
本文从Spark、SequoiaDB以及SequoiaDB for Spark connector三个方面向读者们介绍了海量数据下使用SparkSQL+SequoiaDB的性能调优方法。
文章中介绍的方法具有一定的参考意义,但是性能调优一直都是最考验技术人员的工作。技术人员在对分布式环境做性能调优时,需要综合考虑多个方面的数据,例如:服务器的硬件资源使用情况、Spark运行状况、SequoiaDB数据分布是否合理、连机器的参数设置是否正确、SQL命令是否有调优的空间等,要想性能提升,重点是要求技术人员找到整个系统中的性能短板,然后通过调整不同的参数或者修改存储方案,从而让系统运行得更加高效。