跳转至

生信流程软件-nextflow

当生信分析的内容数据比较多、相关的软件参数以及流程比较成熟时,我们可以采用一些现有的流程管理软件来编写生信分析流程,方便项目管理和后来者接手,相比于自己用perl、python什么的来串分析流程,专用的流程管理软件可以使流程结构更清晰、适应很多不同环境的系统以及一些其它很有用的功能。常用的流程软件有nextflow、snakemake、makefile、bpipe、WDL等,见过用得比较多的是snakemake和nextflow,前者基于python,后者基于java拓展语言。

nextflow对云环境和容器的支持比较好,方便流程迁移,还有例如动态资源申请、错误重试(某个步骤跑出错了,可以自动重跑)、类似于程序断点的resume功能(修改了流程脚本后重跑可以利用之前跑出来的数据只从这一步开始跑)、部署简单、 对系统环境依赖很小(只要求有Java8)、不需要系统API就可以支持PBS、SGE、slurm、LSF等作业调度系统以及本地运行,因此这里选择研究使用nextflow。

研究这个软件的初衷是近期看了一些用户跑callsnp的流程,流程写在一个shell脚本里,因为bwa使用20个核心,因此在申请资源的时候写20核,bwa跑完之后资源也无法释放,占着整个节点跑后面samtools、picad之类的一些单线程的程序,造成集群资源的极大浪费,生信分析里面这种情况很常见。还有一个用户痛点,就是在跑trimommatic、GATK等java程序时,极容易因为内存的原因导致程序运行失败,nextflow的动态资源申请、错误重试功能(每次重跑申请更多内存或者提交到大内存队列)可以有效缓解整个问题。

这个软件发到了NBT上,还是可以的,这篇文章里还比较了一下几种主流流程软件,https://www.nature.com/articles/nbt.3820.epdf

nextflow安装

nextflow自己安装,一行命令搞定。如果系统上java版本较低,运行时可能需要安装高版本的java。

安装过程中主要是下载各种java依赖包,依赖包的安装目录为 ~/.nextflow/

curl -s https://get.nextflow.io | bash

也可以使用集群上安装好的,第一次使用时会自动下载java依赖包。

module load nextflow

nextflow基本概念

nextflow与我们平常写的shell、perl、python等脚本,逻辑上不太一样,需要在用的过程中细细体会。

nextflow中有2个核心概念process和channel。

nextflow的主体部分由一个个process组成,process之间相互独立,如果没有channel的关联原则上所有process可以同时并行运行,将任务分发到整个集群上,原则上一个process跑一个软件如bwa、GATK等,可以随便自己写。

process之间由channel沟通,其实channel就是个异步命名管道(asynchronous FIFO queues),负责process的输入与输出,将一个的process的输出channel作为另外一个process的输入channel,实现process之间的先后运行关系。有个特别需要注意的地方是channel在一个process输出后,后面只能被一个process使用,如要被多个process同时使用,可以将整个channel复制成多个不同名字的channel,分别用到不同的channel中。

nextflow中主要内容虽然都是process和channel,但因为各种需要,还是需要写一些脚本的脚本来写能实际使用且功能完善的流程,主要是字符串、变量、数组、maps(perl中hash或python中的字典)正则表达式、条件判断,以及文件操作。细节可以参照https://www.nextflow.io/docs/latest/script.html#pipeline-page

nextflow的脚本语言从一个叫Groovy的东西拓展而来,如果觉得上面那个链接上的东西不够详细,可以参考Groovy的文档 http://groovy-lang.org/syntax.html

process介绍

example1

#!/usr/bin/env nextflow

params.str = 'Hello world!'
process splitLetters {
    output:
    file 'chunk_*' into letters mode flatten
    """    
    printf '${params.str}' | split -b 6 - chunk_    
    """
}

process convertToUpper {
    input:
    file x from letters

    output:
    stdout result
    """    
        cat $x | tr '[a-z]' '[A-Z]'    
    """
}

result.subscribe {
    println it.trim()}

process是整个nextflow的核心部分,流程的主要执行部分在这里完成,定义形式有点像其它脚本语言里的函数,以process关键字开头,可以看看下面这个example,process内从上到下依次是

各种执行(Directives)、输入channel(Inputs)、输出channel(Outputs)、执行条件(when)、脚本(script)

Directives主要有

afterScript(这个process执行后运行的自定义内容,一般为shell脚本,比如用于删除这一步产生的后面不需要的数据)、

beforeScript(这个process执行前运行的自定义内容,一般为shell脚本,比如载入这个process的运行环境)、

executor(指定process执行方式,比如本地执行(local)、提交到pbs(pbs)、提交到(sge)、提交到k8s(k8s)等)

cpus(这个process申请的核心数,在pbs、sge中比较有用)、

memory(这个process申请的内存)、

queue(指定作业提交的集群队列)、

clusterOptions(一般用于指定特定的集群作业提交参数,比如pbs错误输出文件)

MaxErrors(这个process进行错误重试的次数)、

errorStrategy(process出错后的处理策略,比如重试(retry)、忽略(ingore)、终止process(terminate)等)、

tag(使用变量设置process标签,方便在日志中区分process执行过程,比如某个process跑的bwa,其中有个数据的bwa跑错了,因为设置了标签很容易在nextflow的运行日志中找到是哪个数据跑出了问题)、

container(在docker中执行process)

等,其它的Directives见https://www.nextflow.io/docs/latest/process.html#directives

Input和Output关键字分别定义输入、输出channel,详细用法见https://www.nextflow.io/docs/latest/process.html#inputs

when关键字定义process的执行条件,如这个例子中需要同时满足这个两个条件这个process才能被执行。还有另一个比较用途是在调试nextflow过程中注释process,只需要when:false,这样process就不会被执行。

process find {
  input:
  file proteins
  val type from dbtype

  when:
  proteins.name =~ /^BB11.*/ && type == 'nr'

  script:
  """  blastp -query $proteins -db nr  """

}

Script中主要写命令、代码等,需要用一对"""包裹起来,默认是shell脚本,如上一个脚本所示,当然也可以是perl、python等,如下所示

process perlStuff {
    """    #!/usr/bin/perl
    print 'Hi there!' . '\n';    """
}

process pyStuff {
    """    #!/usr/bin/python
    x = 'Hello'    y = 'world!'    print "%s - %s" % (x,y)    """
}

此外,还可以用于条件控制,如下所示,可以根据输入不同,运行不同的程序

seq_to_align = ...
mode = 'tcoffee'

process align {
    input:
    file seq_to_aln from sequences

    script:
    if( mode == 'tcoffee' )
        """        
        t_coffee -in $seq_to_aln > out_file        
        """
    else if( mode == 'mafft' )
        """
        mafft --anysymbol --parttree --quiet $seq_to_aln > out_file        
        """
    else if( mode == 'clustalo' )
        """        
        clustalo -i $seq_to_aln -o out_file        
        """
    else
        error "Invalid alignment mode: ${mode}"
}

channel这东西我也讲不太好,可以结合给的例子和文档看,https://www.nextflow.io/docs/latest/channel.html

config配置文件

为了更方便管理,nextflow脚本写好之后尽量不再修改,所有很多参数的修改可以在config文件中进行。如每个process使用的资源,可以根据需要在config(随便命名)文件中修改,如下 所有的process都提到pbs中,Run_bwa这个process申请4个核,Run_GATK这个process申请8个核

params.genome="/public/home/software/test/callsnp/data/Gbarbadense_genome.fasta"
params.read_file="/public/home/software/test/callsnp/data/*_{1,2}.fastq.gz"
process {
    executor = 'pbs'
    withName: 'Run_bwa' {
        cpus = 4
        memory = 8.GB
        queue = 'batch'
    }
    withName: 'Run_GATK' {
        cpus = 8
        queue = 'high'
    }
}

使用方法为

nextflow run -c config callsnp.nf

更详细的介绍见 https://www.nextflow.io/docs/latest/config.html

执行nextflow

nextflow run

还有一些clean、clone、cloud、log等执行。

run的主要参数有

-c 指定配置文件

-with-trace process执行跟踪文件

-with-timeline process执行时间线,方便看各步骤跑的时间

--,指定输入参数,比如后面的例子中的params.genome这个参数,可以--genome '/public/home/software/test/callsnp/data/Gbarbadense_genome.fasta'这样在运行时指定

more example

这个里面有不少常用的流程

https://github.com/nextflow-io/awesome-nextflow/

下面这个是根据集群上一个同学跑的流程改写的一个nextflow脚本,看着有点长,其实逻辑很简单,写了详细的注释,可以参照改写自己的流程

nextflow_example.nf
//结果输出目录
params.output = "/public/home/software/test/callsnp/output"

//参考基因组
params.genome="/public/home/software/test/callsnp/data/Gbarbadense_genome.fasta"

//reads路径
params.read_file="/public/home/software/test/callsnp/data/*_{1,2}.fastq.gz"

//将参考基因组转换成file对象,以便使用getParent、getBaseName方法获取参考基因组所在路径和去掉.fa .fasta后的basename
ref_file=file(params.genome)
ref_dir=ref_file.getParent()
ref_base=ref_file.getBaseName()

//fromFilePairs默认读入一对reads,可以通过size参数设置
Channel.fromFilePairs(params.read_file)
    .ifEmpty{error "Cannot find  reads $params.read_file"}
    .into {reads}
//判断是否已经存在bwa索引文件,存在则将所有索引文件放入index_exi 这个channel;
//否则运行Run_bwa_index这process来建索引,然后输出到名字为index_exi的channel
Run_bwa_index
if(file(params.genome+".amb").exists()){
    index_exi = Channel.fromPath("${params.genome}.*")
}else{
process Run_bwa_index {

    //申请一个CPU核心
    cpus 1

    //使用PBS作业调度系统,使用local可在本地跑,也可以支持sge、slurm、lsf等调度系统
    executor = 'pbs'    

    //将output文件复制到ref_dir这个目录,mode还可以选move等
    publishDir "ref_dir",mode:'copy'

    //输入文件
    input:
    file genome from ref_file

    //输出
    output:
    file "${genome}.*" into index_exi

    //该process运行的条件,bwa index不存在时运行,其实这个地方有点重复
    when:
    !file(params.genome+".amb").exists()

    //两个"""包裹的部分是用户自己的各种脚本,一般用户需要跑的程序都写在这里,默认是shell脚本,还可以是perl、python、R等
    """
    module purge
    module load BWA
    bwa index $genome
    """
}
}

//给后面GATK运行创建genome index
process Run_dict_fai {

    cpus 1
    executor = 'pbs' 

    //pbs申请的memory,可以直接定义数字,也可以如下动态定义
    //如果这个process运行失败,可以重新开始,每次重新开始申请的内存增加2GB
    memory { 2.GB * task.attempt }

    //process运行出错时的行为,默认直接退出。因为java作业容易因为内存的原因而运行出错,
    //因此这里设置为retry,失败重新运行
    //还可以有ignore、finish、terminate
    errorStrategy 'retry'

    //process运行失败后,重新尝试的次数
    maxRetries 3

    //这个process的output的内容copy到ref_dir目录
    publishDir "ref_dir",mode:'copy'


    input:
    file genome from ref_file

    //将定义的这些文件作为一个整体输出到GATK_index这个channel
    output:
    set file("${genome}"),file("${genome}.fai"),file("${ref_base}.dict") into GATK_index

    //nextflow脚本,这里根据上面memory的值定义了一个内存值给下面的picard使用
    script:
    picardmem = "${task.memory}".replaceAll(/ GB/,"g")

    """
    module purge
    module load picard
    java -Xmx${picardmem} -jar \${EBROOTPICARD}/picard.jar CreateSequenceDictionary R=${genome} O=${ref_base}.dict
    module purge
    module load SAMtools/1.8
    samtools faidx ${genome} > ${genome}.fai
    """
}

//运行BWA比对
process Run_bwa {

    cpus 6
    executor = 'pbs'    

    input:
    //从reads中获取read id和两条read序列
    set val(id),file(readfile) from reads

    //输入bwa index文件,collect方法将这个channel中的所有item集合为一个list
    //否则就会一个个文件循环emit,不符合我们的要求
    file index from index_exi.collect()

    //将read id和生成的Sam文件放入channel bwa_sam中,给后面的process使用
    output:
    set val("${id}"),file("${id}.sam") into bwa_sam

    //process运行条件,true为运行,false为不运行,在此可以不需要这个when条件,仅用于测试
    when:
    true

    //通过输入的bwa_index文件提取bwa build生成的index文件的prefix,注意这里是Gbarbadense_genome.fasta,而不是Gbarbadense_genome
    //当然可以通过在前面的bwa build 加-p 指定生成的index文件的prefix为前面提取的ref_base,那么久可以不用这一步,下面bwa mem的index_base改为ref_base即可
    script:
    index_base = index[0].toString() - ~/.amb/

    """
    module purge
    module load BWA
    bwa mem -M -t ${task.cpus} -R '@RG\\tID:$id\\tLB:HISEQ10X\\tSM:$id' ${index_base} $readfile > ${id}.sam
    """
}

//运行picar,给sam文件排序、去重
process Run_picard {

    executor = 'pbs'    
    cpus 1

    //失败重试,动态资源申请
    memory { 2.GB * task.attempt }
    errorStrategy 'retry'
    maxRetries 3

    //这2个结果后面的process不会用到,但结果可能有用,可以move到自己指定的目录
    publishDir "$params.output/metrics",pattern:"*_metrics.txt",mode:'move'

    //删除不需要的文件
    afterScript     "rm ${samfile};rm ${id}_srt.bam"

    input:

    //Run_bwa生成的output channel作为这一步的input channel
    set val(id),file(samfile) from bwa_sam

    //输入参考基因组
    file genome from ref_file

    output:

    //输出排序后的bam
    file "${id}_srt.bam" into srtbam

    //输出metrics文件
    file "${id}_metrics.txt" into metrics

    //将id和相应的排序去重后的bam文件输入到2个channel中,因为一个channel只能使用一次,后面有2个process需要用到这个output channel的内容
    //所以需要输出2个内容相同、名称不同的channel
    set val("${id}"), file("${id}_srt_redup.bam") into samtools_input,GATK_input

    //所用同上,调试之用
    when:
    true

    //作用同上,用于给java程序设定内存参数
    script:
    picardmem = "${task.memory}".replaceAll(/ GB/,"g")

    """
    module purge
    module load picard
    java -Xmx${picardmem} -jar \${EBROOTPICARD}/picard.jar SortSam I=${samfile} O=${id}_srt.bam SORT_ORDER=coordinate
    java -Xmx${picardmem} -jar \${EBROOTPICARD}/picard.jar MarkDuplicates INPUT=${id}_srt.bam OUTPUT=${id}_srt_redup.bam CREATE_INDEX=true VALIDATION_STRINGENCY=SILENT METRICS_FILE=${id}_metrics.txt
    """
}

//samtools callsnp
process Run_samtools {

    cpus 1
    executor = 'pbs'    

    //output channel中的文件move到指定目录
    publishDir "$params.output/samtools",mode:'move'

    input:
    set val(id),file(bam) from samtools_input
    file genome from ref_file

    output:
    file "${id}.samtools.vcf" into samtoolsvcf

    when:
    true

    """
    module purge
    module load SAMtools/1.8
    module load BCFtools/1.6
    samtools mpileup -t DP -t SP -ugf ${genome} ${bam} |bcftools call --threads ${task.cpus} -mo ${id}.samtools.vcf
    """
}

process Run_GATK {

    cpus 4
    executor = 'pbs'    
    memory { 20.GB * task.attempt }
    errorStrategy 'retry'
    maxRetries 3

    publishDir "$params.output/GATK",mode:'move'

    input:

    //输入bam
    set val(id),file(bam) from GATK_input

    //输入index
    set file(genome),file(fai),file(dict) from GATK_index

    //将所有生成文件输出到GATKvcf channel中
    output:
    file "*" into GATKvcf

    when:
    true

    script:
    gatkmem = "${task.memory}".replaceAll(/ GB/,"g")

    """
    module purge
    module load GATK/3.8-0-Java-1.8.0_92
    java -Xmx${gatkmem} -jar \$EBROOTGATK/GenomeAnalysisTK.jar -R ${genome} -T HaplotypeCaller --emitRefConfidence GVCF -I ${bam} -o ${id}.gatk.vcf --variant_index_type LINEAR --variant_index_parameter 128000 -nct ${task.cpus}
    """
}
本文阅读量  次
本站总访问量  次