跳转至

Spark on cluster

随着测序数据量的增长,目前的软件处理这些大批量的数据越来越费时间,因此不少人将大数据相关的技术应用在生物信息行业,以期加速测序数据的处理。早期apache基金会根据google的三篇论文开发了hadoop套件(HDFS集群文件系统、MapReduce数据处理框架、yarn集群作业调度器)利用大量廉价的服务器和网络,以较低的成本满足互联网行业大数据的处理需求。hadoop的数据处理模型只有map和reduce两种,对数据做复杂处理时编程较为复杂,且hadoop运行时需要在磁盘上频繁地来回读写数据(计算中间结果落盘),导致程序运行时间较长,另外hadoop也不适合迭代计算(如机器学习、图计算等),交互式处理(数据挖掘) 和流式处理(点击日志分析),因此目前生产上使用较少(单指MapReduce数据处理框架,HDFS和yarn还是用得比较多)。

针对hadoop的这些缺点,加州大学柏克莱分校技术人员开发了基于存储器内运算技术的spark大数据处理框架,所有数据最后落盘之前都在内存中分析运算,大大加快了数据处理速度,相比于hadoop,其速度可提高上100倍。Spark由Scala语言编写,提供了多种数据处理方式(hadoop只有2种),大大降低了复杂任务的编程难度。spark可以使用shell交互运行,方便程序调试,同时也原生支持java、python的API。Spark提供了多个领域的计算解决方案,包括批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(Spark GraphX),因此目前spark是实际项目中使用很频繁的一种大数据框架。

由于spark可脱离HDFS和yarn调度器独立运行,因此spark可运行于普通HPC集群上。重点实验室集群的作业调度系统lsf提供了运行spark集群和程序的相关脚本(lsf-spark-submit.sh,lsf-spark-shell.sh,以及我改写的lsf-spark-app.sh)。GATK4提供了多个工具的spark版本(工具名以spark结尾),使其可以轻松地运行在spark集群上,相比单机版本其运算速度有了很大的提高。目前集群上划了部分节点放入spark队列,运行spark作业。下面以HaplotypeCallerSpark为例说明,spark作业如何在集群上运行(lsf-spark-app.sh 后面部分都需要在一行,不能用\断行,这里为了排版能看清人为断行了)。

#BSUB -J gatk
#BSUB -n 100
#BSUB -o %J.gatk.out
#BSUB -e %J.gatk.err
#BSUB -R span[ptile=20]
#BSUB -q spark 

datadir=/public/home/test/haoliu/work/callsnp/data/
master=`echo $LSB_MCPU_HOSTS|awk '{print $1}'`

echo spark master is $master
echo "gatk HaplotypeCaller"

lsf-spark-app.sh --executor-memory 20g --driver-memory 20g --conf spark.driver.userClassPathFirst=false --conf spark.io.compression.codec=lzf 
--conf spark.driver.maxResultSize=0 
--conf spark.executor.extraJavaOptions='-DGATK_STACKTRACE_ON_USER_EXCEPTION=true -Dsamjdk.use_async_io_read_samtools=false 
-Dsamjdk.use_async_io_write_samtools=false -Dsamjdk.use_async_io_write_tribble=false -Dsamjdk.compression_level=2'  
--conf spark.driver.extraJavaOptions='-DGATK_STACKTRACE_ON_USER_EXCEPTION=true -Dsamjdk.use_async_io_read_samtools=false 
-Dsamjdk.use_async_io_write_samtools=false -Dsamjdk.use_async_io_write_tribble=false -Dsamjdk.compression_level=2'  
--conf spark.kryoserializer.buffer.max=1024m --conf spark.yarn.executor.memoryOverhead=600 
/public/home/software/opt/bio/software/GATK/4.1.3.0/gatk-package-4.1.3.0-local.jar HaplotypeCallerSpark 
-R ${datadir}/hg38.fa -I ${datadir}/ERR194146_sort_redup.bam -O ${datadir}/ERR194146.vcf --spark-master spark://${master}:6311

测试数据为人30x的全基因组数据,52GB的fq.gz。HaplotypeCallerSpark测试是用了120核,运行时间为2054s;同样的数据HaplotypeCaller,胖节点上使用100核,运行时间138501s。HaplotypeCallerSpark加速了60多倍,加速效果比较明显。

本文阅读量  次
本站总访问量  次