跳转至

snakemake

snakemake是一款功能强大的生物信息学流程软件,支持自动化工作流和数据分析。

snakemake可自动解决分析流程之间的步骤依赖、使各步骤按定义好的顺序运行,同时支持所有样本同时并行提交运行,极大地提高了跑分析流程的效率;

snakemake中各流程和软件版本固定之后,可反复重跑,使数据分析具有良好的可重复性;同时通过相关参数设置可以监控流程中的资源使用;

snakemake具有类似于断点续跑的功能,流程跑出错之后,解决问题后继续跑可以自动跳过已经正常跑完的步骤;

snakemake流程写好之后,无需改动即可在本地、云环境、各种集群(slurm/lsf/sge/pbs等)上运行;

snakemake使用python3编写,可在Snakefile文件中直接编写python代码,解决复杂的功能需求;

官方文档:https://snakemake.readthedocs.io/en/stable/

代码库:https://github.com/snakemake-workflows

常用snakemake流程:https://snakemake.github.io/snakemake-workflow-catalog/

学习准备

安装

pip install snakemake
下载测试数据
$ mkdir snakemake-tutorial && cd snakemake-tutorial
$ curl -L https://github.com/snakemake/snakemake-tutorial-data/archive/v5.24.1.tar.gz -o snakemake-tutorial-data.tar.gz
$ tar xfz snakemake-tutorial-data.tar.gz 
$ tree snakemake-tutorial-data-5.24.1/
snakemake-tutorial-data-5.24.1/
├── data
   ├── genome.fa
   ├── genome.fa.amb
   ├── genome.fa.ann
   ├── genome.fa.bwt
   ├── genome.fa.fai
   ├── genome.fa.pac
   ├── genome.fa.sa
   └── samples
       ├── A.fastq
       ├── B.fastq
       └── C.fastq
├── environment.yaml
├── README.md
├── Snakefile
└── upload_google_storage.py

2 directories, 14 files

Note

snakemake默认读取Snakefile文件进行运行,Snakefile文件中记录程序运行过程中的一切,包括数据来源、程序命令、程序参数、结果输出目录等等。

建议将工作流Snakefile文件命名为.py文件,这样在写Python代码时会有语法高亮。

在使用snakemake时,程序会自动寻找当前目录下是否存在有Snakefile文件,如果有的话则会自动根据这个Snakefile文件进行运行。

Snakefile文件命名为.py文件后,使用snakemake时则不会识别到.py文件,这时需要加上-s参数指定这个文件。如 snakemake -s callsnp_smk.py

基本使用

这里以3个样本的bwa比对流程来说明snakemake的基本使用方式。

test1_smk.py
SAMPLES = ['A', 'B', 'C']

rule all:
    input:
        expand("mapped_reads/{sample}.bam", sample=SAMPLES),

rule bwa_map:
    input:
        REF="data/genome.fa",
        FQ="data/samples/{sample}.fastq",
    output:
        BAM="mapped_reads/{sample}.bam",
    shell:
        "bwa mem {input.REF} {input.FQ}| samtools view -Sb - > {output.BAM}"

snakemake的基本组成单位是rule,表示定义了一条规则,对应一个数据处理步骤。

每一个rule包含三个基本元素,分别是inputoutputshell或run或script,分别表示“输入文件”、“输出文件”和“运行命令”。除all这个特殊rule之外,每个rule可以自定义名称,如这里的bwa_map

Important

  • input: 是这个rule执行所需要的输入文件,一般是我们提供的原始数据文件或其它rule的output文件。每个文件一行,可以对每个文件进行命名,方便在shell执行部分引用,如这里的FQ, REF,同时这里使用了通配符{sample},以自动匹配不同的样本。在shell中引用时可以有2中写法{input[FQ]}{input.FQ},这里使用后者,因为可以少敲一个字符;
  • output: 是这个rule执行完会输出的文件,一般是最终结果文件或其它rule的input文件。相关规则与input一致;
  • `shell或run或script: rule真正执行的地方,一般是需要运行的程序或python代码;

snakemake会自动分析不同rule之间的input和output的依赖关系,即一条rule的input是来自哪条rule的output,从而将一条条rule串成一个完整的流程。

稍微复杂的流程中,一般会有个特殊的rule,rule all,snakemake运行的入口,即第一个被执行的rule。一般它只需要定义input,用来定义流程最终输出的结果,一般都是别的rule的output。snakemake从rule all的input回溯查找它是哪个rule的output,如果这个rule的input又是其它的rule的output,就再往上回溯,一直回溯到无法回溯为止,回溯的最后一个rule的input一般是我们的输入文件,如原始的fq文件,最终理清从输入文件到rule allinput的整个流程并依次执行。如果一个rule的input和output不被所有rule依赖(含all),那么该rule不会被执行。如果流程有不同的分支,最终会生成多个需要的结果,那么这些结果都需要在这里定义。

脚本中用到了snakemake的函数expand,可以方便地利用一些简单的列表和基本模板,得到文件列表。一般用在需要多个输入或输出文件的场合。使用举例

expand("sorted_reads/{sample}.bam", sample=SAMPLES)
# ["sorted_reads/A.bam", "sorted_reads/B.bam"]

# 可以提供多个参数列表
expand("sorted_reads/{sample}.{replicate}.bam", sample=SAMPLES, replicate=[0, 1])
# ["sorted_reads/A.0.bam", "sorted_reads/A.1.bam", "sorted_reads/B.0.bam", "sorted_reads/B.1.bam"]

如果脚本有问题这里会报错提醒,这里可以看到每个rule执行的次数,以及具体的命令、参数、输入输出文件。

$ snakemake -np -s test1_smk.py 

$ snakemake -np -s test1_smk.py 
Building DAG of jobs...
Job counts:
    count   jobs
    1   all
    3   bwa_map
    4

[Sun Dec 11 00:55:32 2022]
rule bwa_map:
    input: data/genome.fa, data/samples/A.fastq
    output: mapped_reads/A.bam
    jobid: 1
    wildcards: sample=A

bwa mem data/samples/A.fastq data/genome.fa| samtools view -Sb - > mapped_reads/A.bam

.
.
.

[Sun Dec 11 00:55:32 2022]
rule bwa_map:
    input: data/genome.fa, data/samples/C.fastq
    output: mapped_reads/C.bam
    jobid: 3
    wildcards: sample=C

bwa mem data/samples/C.fastq data/genome.fa| samtools view -Sb - > mapped_reads/C.bam

[Sun Dec 11 00:55:32 2022]
localrule all:
    input: mapped_reads/A.bam, mapped_reads/B.bam, mapped_reads/C.bam
    jobid: 0

Job counts:
    count   jobs
    1   all
    3   bwa_map
    4
This was a dry-run (flag -n). The order of jobs does not reflect the order of execution.

实际运行

$ snakemake -p -s test1_smk.py 

Info

snakemake 对于不存在路径会首先自动创建

更多用法

glob_wildcards

在上面的脚本中,SAMPLES是在脚本中手动指定的,如果样本数比较多、且有一定的规律,可以使用glob_wildcards自动提取样本名称和路径,其具有类似于bash中通配符的功能。

如下,在fastq目录中一共有两组paired-end测序数据:

fastq
├── Sample1.R1.fastq.gz
├── Sample1.R2.fastq.gz
├── Sample2.R1.fastq.gz
└── Sample2.R2.fastq.gz

glob_wildcards可以接受一个或多个通配符表达式,类似{pattern},最后返回一个由list组成的tuple,所以如果只有一个变量则需要添加逗号。glob_wildcards不支持bash中用的通配符(?,*)

# Example 1
(SAMPLE_LIST,) = glob_wildcards("fastq/{sample}.fastq")
需要注意下SAMPLE_LIST后面的逗号。
# Example 2
(genome_builds, samples, ...) = glob_wildcards("{pattern1}/{pattern2}.whatever")
例子2说明,可以接收多个匹配模板。

如果存在下面文件:

grch37/A.bam
grch37/B.bam
grch38/A.bam
grch38/B.bam
(targets, samples) = glob_wildcards("{target}/{sample}.bam")返回的结果将是:
targets = ['grch37', 'grch38']
samples = ['A', 'B']
改写之前的snakemake文件
(SAMPLES,) = glob_wildcards("data/samples/{sample}.fastq") 

rule all:
    input:
        expand("mapped_reads/{sample}.bam", sample=SAMPLES),

rule bwa_map:
    input:
        REF="data/genome.fa",
        FQ="data/samples/{sample}.fastq",
    output:
        BAM="mapped_reads/{sample}.bam",
    threads: 8
    shell:
        "bwa mem -t {threads} {input.REF} {input.FQ}| samtools view -Sb - > {output.BAM}"

输入函数

输入样本文件比较多、命名规则比较复杂时,可以将样本信息写入单独的文件中,编写python函数读取样本信息传递给input。

新建以制表符分隔的sample.txt文件(也可以在excle里写好后直接复制过来)。

Snakefile中可以直接写python,使用pandas读取sample.txt。

Snakefile中的wildcards是snakemake的object,通过wildcards.sample可以获得样本名。

参考:计算流程组织-snakemake基础篇

id fq1 fq2
test1 0.data/test1_R1.fastq.gz 0.data/test1_R2.fastq.gz
test2 0.data/test2_R1.fastq.gz 0.data/test2_R2.fastq.gz
import pandas

def parse_samples(samples_tsv):
    return pandas.read_csv(samples_tsv, sep='\t').set_index("id", drop=False)

def get_sample_id(sample_df, wildcards, col):
    return sample_df.loc[wildcards.sample, [col]].dropna()[0]

_samples = parse_samples("sample.txt")

rule all:
    input:
        expand("2.rmhost/{sample}_{R}.rmhost.fq.gz", sample=_samples.index, R=["1","2"])

rule trim:
    input:
        r1 = lambda wildcards: get_sample_id(_samples, wildcards, "fq1"),
        r2 = lambda wildcards: get_sample_id(_samples, wildcards, "fq2")

线程数

对于多线程运行的程序,可使用threads定义线程数,如下所示:

(SAMPLES,) = glob_wildcards("data/samples/{sample}.fastq") 

rule all:
    input:
        expand("mapped_reads/{sample}.bam", sample=SAMPLES),

rule bwa_map:
    input:
        REF="data/genome.fa",
        FQ="data/samples/{sample}.fastq",
    output:
        BAM="mapped_reads/{sample}.bam",
    threads: 8
    shell:
        "bwa mem -t {threads} {input.REF} {input.FQ}| samtools view -Sb - > {output.BAM}"
运行时用--cores指定整个流程同时最多使用的核心数,如--cores 10,流程只能并行跑一个,但如果还有2线程的rule,那还可以同时跑这个2线程的rule;--cores 30,流程就可以并行跑3个。使用threads定义rule中程序使用的线程数,方便后面运行时控制流程的资源消耗。

规则参数

有时在shell中,需要根据不同的样本使用不同的参数,该参数其既不是输入文件,也不是输出文件,如bwa mem -R 参数。如果把这些参数放在input里,则会因为找不到文件而出错。snakemake使用params关键字来设置这些参数。

(SAMPLES,) = glob_wildcards("data/samples/{sample}.fastq") 

rule all:
    input:
        expand("mapped_reads/{sample}.bam", sample=SAMPLES),

rule bwa_map:
    input:
        REF="data/genome.fa",
        FQ="data/samples/{sample}.fastq",
    output:
        BAM="mapped_reads/{sample}.bam",
    threads: 8
    params:
        rg="@RG\tID:{sample}\tSM:{sample}"
    shell:
        "bwa mem -R '{params[rg}' -t {threads} {input.REF} {input.FQ}| samtools view -Sb - > {output.BAM}"

日志

当流程较复杂时,可保存每个rule的输出日志,而不是输出到屏幕,方便运行出错时排查。snakemake使用关键字log定义输出的日志。运行出错时,在log里面定义的文件不会被snakemake删掉,而output里面的文件则会被删除。

(SAMPLES,) = glob_wildcards("data/samples/{sample}.fastq") 

rule all:
    input:
        expand("mapped_reads/{sample}.bam", sample=SAMPLES),

rule bwa_map:
    input:
        REF="data/genome.fa",
        FQ="data/samples/{sample}.fastq",
    output:
        BAM="mapped_reads/{sample}.bam",
    threads: 8
    params:
        rg="@RG\tID:{sample}\tSM:{sample}"
    log:
        "logs/bwa_mem/{sample}.log"
    shell:
        "bwa mem -R '{params.rg}' -t {threads} {input.REF} {input.FQ}| samtools view -Sb - > {output.BAM}"

临时文件和受保护的文件

snakemake使用temp()来将一些文件标记成临时文件,在执行结束后自动删除。temp()可以用于删除流程中不需要保留的中间文件。

部分重要的中间文件和结果文件,为避免误删,可以使用protected()保护起来,此时snakemake就会在文件系统中对该输出文件写保护,也就是最后的权限为-r--r--r--, 在删除的时候会提醒rm: remove write-protected regular file ‘A.bam’?

(SAMPLES,) = glob_wildcards("data/samples/{sample}.fastq") 

rule all:
    input:
        expand("mapped_reads/{sample}.bam", sample=SAMPLES),

rule bwa_map:
    input:
        REF="data/genome.fa",
        FQ="data/samples/{sample}.fastq",
    output:
       protected(BAM="mapped_reads/{sample}.bam"),
    threads: 8
    params:
        rg="@RG\tID:{sample}\tSM:{sample}"
    log:
        "logs/bwa_mem/{sample}.log"
    shell:
        "bwa mem -R '{params.rg}' -t {threads} {input.REF} {input.FQ}| samtools view -Sb - > {output.BAM}"

benchmarks

在该关键字下的文件会自动记录该规则运行所消耗的时间和内存。同时,在该关键字下,还可以重复多次运行同一程序,从而评估该程序消耗资源的均值。具体使用 repeat("benchmarks/{sample}.bwa.benchmark.txt", 3) ,这样则会重复运行3次结果。

(SAMPLES,) = glob_wildcards("data/samples/{sample}.fastq") 

rule all:
    input:
        expand("mapped_reads/{sample}.bam", sample=SAMPLES),

rule bwa_map:
    input:
        REF="data/genome.fa",
        FQ="data/samples/{sample}.fastq",
    output:
       protected(BAM="mapped_reads/{sample}.bam"),
    threads: 8
    params:
        rg="@RG\tID:{sample}\tSM:{sample}"
    log:
        "logs/bwa_mem/{sample}.log"
    benchmark:
        "benchmarks/{sample}.bwa.benchmark.txt"
    shell:
        "bwa mem -R '{params.rg}' -t {threads} {input.REF} {input.FQ}| samtools view -Sb - > {output.BAM}"

失败重跑

retries定义rule失败重跑次数 snakemake --retries 定义全局的重跑次数

配置文件

为了使snakemake脚本更加通用,可以使用配置文件,将分析需要用到的原始数据、参考基因组等写入到配置文件中。

配置文件可以用JSON或YAML语法进行写,然后用configfile: "config.yaml"读取成字典,变量名为config

如下所示,将参考基因组、原始数据后缀(复杂的如_R1.fq.gz)和BWA的线程数写在配置文件中,后期更改参数更方便。

ref: "data/genome.fa"
raw_suf: ".fastq"
bwa_threads: 8
configfile: "config.yaml"

raw_suf=config["raw_suf"]

(SAMPLES,) = glob_wildcards("data/samples/{sample}"+raw_suf) 

rule all:
    input:
        expand("mapped_reads/{sample}.bam", sample=SAMPLES),

rule bwa_map:
    input:
        REF=config["ref"],
        FQ="data/samples/{sample}.fastq"+{raw_suf},
    output:
    protected(BAM="mapped_reads/{sample}.bam"),
    threads: config["bwa_threads"]
    params:
        rg="@RG\tID:{sample}\tSM:{sample}"
    log: 
        "logs/bwa_mem/{sample}.log"
    benchmark:
        "benchmarks/{sample}.bwa.benchmark.txt"
    shell:
        "bwa mem -R '{params.rg}' -t {threads} {input.REF} {input.FQ}| samtools view -Sb - > {output.BAM}"

wrapper

将各种比对软件等常用步骤封装好,使用时直接调用,见

https://snakemake-wrappers.readthedocs.io/en/stable/

里面有很多有用的代码可以借鉴。

singularity

snakemake支持使用singularity简化分析环境部署,可以有2种方式使用:利用snakemake特性;将singularity当普通程序原生使用;

snakemake --use-singularity --cores 30 -p -s bwa_map_singularitysmk.py

使用singularity的额外参数有2个:--use-singularity(固定的,必须加); --singularity-args 后面跟一段传递给singularity的参数,一般用双引号括起来(有空格),最常用的是--bind挂载,冒号前后分别是需要挂载的宿主系统的文件地址和对应的挂载到容器虚拟机文件上的地址,如--singularity-args "--bind /data/:/mnt/ "

snakemake使用singularity特性有个限制,就是每个rule只能加载一个镜像,rule需要用到多个程序时,无法使用。

(SAMPLES,) = glob_wildcards("data/samples/{sample}.fastq")

rule all:
    input:
        expand("mapped_reads/{sample}.bam", sample=SAMPLES),

rule bwa_map:
    input:
        REF="data/genome.fa",
        FQ="data/samples/{sample}.fastq",
    output:
    BAM="mapped_reads/{sample}.bam",
    threads: 8
    params:
        rg="@RG\\tID:{sample}\\tSM:{sample}"
    log:
        "logs/bwa_mem/{sample}.log"
    benchmark:
        "benchmarks/{sample}.bwa.benchmark.txt"
    singularity:
        'bwa_0.7.17--h7132678_9'
    shell:
        """
        bwa mem -R '{params.rg}' -t {threads} {input.REF} {input.FQ} > {output.BAM}
        """

将singularity当普通程序使用

(SAMPLES,) = glob_wildcards("data/samples/{sample}.fastq")

rule all:
    input:
        expand("mapped_reads/{sample}.bam", sample=SAMPLES),

rule bwa_map:
    input:
        REF="data/genome.fa",
        FQ="data/samples/{sample}.fastq",
    output:
    BAM="mapped_reads/{sample}.bam",
    threads: 8
    params:
        rg="@RG\\tID:{sample}\\tSM:{sample}"
    log:
        "logs/bwa_mem/{sample}.log"
    benchmark:
        "benchmarks/{sample}.bwa.benchmark.txt"
    shell:
        """
        module load Singularity
        singularity exec ./bwa_0.7.17--h7132678_9 bwa mem -R '{params.rg}' -t {threads} {input.REF} {input.FQ} > {output.BAM}
        """

完整流程

bcftools call snp

snakemake官方文档中bcftools call snp 流程

文件

$ tree snakemake-tutorial-data-5.24.1/
snakemake-tutorial-data-5.24.1/
├── data
   ├── genome.fa
   ├── genome.fa.amb
   ├── genome.fa.ann
   ├── genome.fa.bwt
   ├── genome.fa.fai
   ├── genome.fa.pac
   ├── genome.fa.sa
   └── samples
       ├── A.fastq
       ├── B.fastq
       └── C.fastq
├── environment.yaml
├── README.md
├── Snakefile
└── upload_google_storage.py

2 directories, 14 files
测试检查是否出错
$ snakemake -np -s callsnp_bcftools_smk.py
本地运行,使用20核
$ snakemake --cores 20 -p -s callsnp_bcftools_smk.py
集群运行,同时提交10个作业
$ snakemake   --cluster  'bsub -n {threads}'  -j 10  -s callsnp_bcftools_smk.py

ref: "data/genome.fa"
raw_suf: ".fastq"
bwa_threads: 8
configfile: "config.yaml"

raw_suf=config["raw_suf"]
ref=config["ref"]

(SAMPLES,) = glob_wildcards("data/samples/{sample}"+raw_suf) 

rule all:
    input:
        "calls/all.vcf"

rule bwa_map:
    input:
        REF=ref,
        FQ="data/samples/{sample}"+raw_suf,
    output:
        BAM="mapped_reads/{sample}.bam",
    threads: config["bwa_threads"]
    params:
        rg="@RG\tID:{sample}\tSM:{sample}"
    shell:
        "bwa mem -R '{params[rg}' -t {threads} {input.REF} {input.FQ}| samtools view -Sb - > {output.BAM}"


rule samtools_sort:
    input:
        BAM="mapped_reads/{sample}.bam"
    output:
        SRT_BAM="sorted_reads/{sample}.bam"
    shell:
        "samtools sort -O bam {input.BAM} > {output.SRT_BAM}"

rule samtools_index:
    input:
        BAM="sorted_reads/{sample}.bam"
    output:
        BAI="sorted_reads/{sample}.bam.bai"
    shell:
        "samtools index {input.BAM}"

rule bcftools_call:
    input:
        REF=ref,
        BAM=expand("sorted_reads/{sample}.bam", sample=SAMPLES),
        BAI=expand("sorted_reads/{sample}.bam.bai", sample=SAMPLES)
    output:
        "calls/all.vcf"
    shell:
        "bcftools mpileup -f {input.REF} {input.BAM} | "
        "bcftools call -mv - > {output}"

GATK4 call snp

下面是GATK4 call snp流程,没有使用config文件

测试检查是否出错

$ snakemake -np -s GATK4_smk.py
本地运行,使用20核
$ snakemake --cores 20 -p -s GATK4_smk.py
集群运行,同时提交10个作业
$ snakemake   --cluster  'bsub -n {threads}'  -j 10  -s GATK4_smk.py

REF="ref/genome.assembly.fa"
PFX="genome.assembly"
RAWDATA="data"

(SAMPLES,READS,) = glob_wildcards(RAWDATA+"/{sample}_{read}.fq.gz")
print(SAMPLES)

rule all:
    input:
        expand("{genome}.bwt", genome=REF),# bwa索引
        expand("{genome}.fai", genome=REF),# samtools索引
        "ref/"+PFX+".dict", # GATK索引
        "result/03.snp/raw_variants.vcf", # 最终需要的结果

rule trim_galore:
    input:
        R1 = RAWDATA+"/{sample}_1.fq.gz",
        R2 = RAWDATA+"/{sample}_2.fq.gz"
    output:
        R1 = "result/01.data/trim/{sample}/{sample}_1_val_1.fq.gz",
        R2 = "result/01.data/trim/{sample}/{sample}_2_val_2.fq.gz",
    threads:8
    params:
        dirs = "result/01.data/trim/{sample}"
    log:
        "logs/trim/{sample}.log"
    shell:
        """
        module load TrimGalore/0.6.6
        echo {output.R1} {output.R2}
        trim_galore -j {threads} -q 25 --phred33 --length 36 -e 0.1 --stringency 3 --paired -o {params.dirs} --fastqc {input.R1} {input.R2} 
        """

rule bwa_index:
    input:
        genome = REF
    output:
        "{genome}.bwt",
    shell:
        """
        module load BWA/0.7.17
        bwa index {input.genome}
        """

rule bwa_map:
    input:
        R1 = "result/01.data/trim/{sample}/{sample}_1_val_1.fq.gz",
        R2 = "result/01.data/trim/{sample}/{sample}_2_val_2.fq.gz",
        genome_index = expand("{genome}.bwt",genome=REF)
    output:
        "result/02.align/{sample}_sort.bam"
    params:
        genome = REF,
    threads:16
    shell:
        """
        module load BWA/0.7.17
        module load SAMtools/1.9
        bwa mem -R \'@RG\\tID:foo\\tSM:bar\\tLB:Abace\' -t {threads} {params.genome} {input.R1} {input.R2}| samtools sort -@{threads} -o {output}
        #注意,双引号里面的特殊符号(()''%&等)会产生歧义,需要使用反斜杠符号进行注释。
        """

rule samtools_faidx: # samtools索引
    input:
        genome = REF
    output:
        "{genome}.fai"
    shell:
        """
        module load SAMtools/1.9
        samtools faidx {input.genome}
        """

rule GATK_dict: # GATK建立索引
    input:
        genome = REF
    output:
        dict = "ref/{genome_profix}.dict"
    shell:
        """
        module load GATK/4.1.9.0
        gatk CreateSequenceDictionary -R  {input.genome} -O {output.dict}
        """

rule GATK_MarkDuplicates:
    input:
        sample = "result/02.align/{sample}_sort.bam",
    output:
        dup_bam = "result/02.align/{sample}_sort_dup.bam"
    params:
        genome = REF
    log:
        metrics = "result/02.align/log/{sample}_markdup_metrics.txt"
    shell:
        """
        module load GATK/4.1.9.0
        gatk  MarkDuplicates -I {input.sample} -O {output.dup_bam} -M {log.metrics}
        """

rule samtools_index:
    input:
        "result/02.align/{sample}_sort_dup.bam"
    output:
        "result/02.align/{sample}_sort_dup.bam.bai"
    shell:
        """
        module load SAMtools/1.9
        samtools index {input}
        """

rule GATK_HaplotypeCaller:
    input:
        sample = "result/02.align/{sample}_sort_dup.bam",
        index = "result/02.align/{sample}_sort_dup.bam.bai"
    output:
        sample_vcf = "result/03.snp/{sample}.g.vcf"
    params:
        genome = REF
    threads: 4
    shell:
        """
        module load GATK/4.1.9.0
        gatk HaplotypeCaller -ERC GVCF -R {params.genome} -I {input.sample} -O {output.sample_vcf} --annotate-with-num-discovered-alleles true --native-pair-hmm-threads {threads}
        """
rule GATK_CombineGVCFs:
    input:
        sample_vcf = expand("result/03.snp/{sample}.g.vcf", sample=SAMPLES)
    output:
        cohort_vcf = "result/03.snp/cohort.g.vcf"
    params:
        genome = REF
    log:
        "logs/GATK_CombineGVCFs/GATK_CombineGVCFs.log"
    shell:        
        """
        module load GATK/4.1.9.0
        input_gvcf=$(ls {input.sample_vcf} |uniq|xargs -i echo "--variant {{}}")
        gatk CombineGVCFs  -R {params.genome} $input_gvcf -O {output.cohort_vcf}
        """

rule GATK_GenotypeGVCFs:
    input:
        cohort_vcf = "result/03.snp/cohort.g.vcf"
    output:
        raw_vcf = "result/03.snp/raw_variants.vcf"
    params:
        genome = REF
    shell:        
        """
        module load GATK/4.1.9.0
        gatk GenotypeGVCFs  -R {params.genome} -G StandardAnnotation -V {input.cohort_vcf} -O {output.raw_vcf}
        """

rule gatk_SelectVariants: #筛选snp位点
    input:
        sample = "result/03.snp/raw_variants.vcf",
    output:
        snp = "result/03.snp/snp.vcf"
    params:
        genome = REF
    shell:                
        """
        module load GATK/4.1.9.0
        gatk SelectVariants -R {params.genome} -V {input.sample} -O {output.snp} --select-type-to-include SNP
        """

运行选项

snakemake的运行选项非常多,这里列出一些比较常用的运行方式。

运行前检查潜在错误:

snakemake -n
snakemake -np
snakemake -nr
# --dryrun/-n: 不真正执行
# --printshellcmds/-p: 输出要执行的shell命令
# --reason/-r: 输出每条rule执行的原因
直接运行:
snakemake
snakemake -s Snakefile -j 4
# -s/--snakefile 指定Snakefile,否则是当前目录下的Snakefile
# --cores/--jobs/-j N: 指定并行数,如果不指定N,则使用当前最大可用的核心数
强制重新运行:
snakemake -f
# --forece/-f: 强制执行选定的目标,或是第一个规则,无论是否已经完成
snakemake -F
# --forceall/-F: 也是强制执行,同时该规则所依赖的规则都要重新执行
snakemake -R some_rule
# --forecerun/-R TARGET: 重新执行给定的规则或生成文件。当你修改规则的时候,使用该命令
运行出错后重新运行:
snakemake --restart-times 3 
可视化:
snakemake --dag  | dot -Tsvg > dag.svg
snakemake --dag  | dit -Tpdf > dag.pdf
# --dag: 生成依赖的有向图
snakemake --gui 0.0.0.0:2468
# --gui: 通过网页查看运行状态

lsf集群上运行

使用profile文件

第一次使用需要先配置lsf profile,之后就可以直接使用

cp -r /public/home/software/.config/snakemake/ ~/.config/
使用
snakemake  --profile lsf  -j 100 -s callsnp_smk.py
该profile可以自动获取每个rule的threads并该rule的作业申请相应的核数。

如果对内存有要求,可以在rule中使用resources关键字进行设置

resources:
    mem_mb=10000 # 单位为MB,这里设置内存需求为10G
使用lsf.yaml配置文件可以设置默认的全局参数,以及为每个rule设置不同的参数。注意lsf.yaml文件需要与snakemake文件在同一目录下。

lsf.yaml
__default__:
  - "-q q2680v2"

bwa_map:
  - "-q high"

Warning

这个profile默认申请的内存是1GB,如果在运行过程中内存使用较多,可以适当增大,否则会出现因为内存不够作业被杀死的情况,报错的时候不是很好排查。如果有出现莫名其妙的报错,可以在out文件中看一下maxmem和request mem的大小,两者接近的话,很可能是内存的问题。

更多使用方法见 lsf profile

使用cluster选项

基本使用

snakemake   --cluster  'bsub -n {threads}'  -j 100  -s callsnp_smk.py
使用--cluster-config选项可以自定义更多个性化选项,如

snakemake  --cluster-config cluster.json --cluster  \
'bsub -n {threads} -q {cluster.queue} -R {cluster.resources}'  -j 100  -s callsnp_smk.py
{
    "__default__" :
    {
        "queue"     : "normal",
        "nCPUs"     : "1",
        "memory"    : 20000,
        "resources" : "\"rusage[mem=20000]\"",
        "name"      : "JOBNAME.{rule}.{wildcards}",
        "output"    : "logs/cluster/{rule}.{wildcards}.out",
        "error"     : "logs/cluster/{rule}.{wildcards}.err"
    },


    "bwa_map" :
    {
        "queue"     : "high",
        "memory"    : 30000,
        "resources" : "\"rusage[mem=30000]\"",
    }
}

使用drmaa库

Distributed Resource Management Application API (DRMAA),即分布式资源管理应用程序API,是一种高级 开放网格论坛(Open_Grid_Forum)应用程序接口规范,用于向分布式资源管理(DRM)系统(例如集群或网格计算提交和控制作业)。API的范围涵盖了应用程序提交,控制和监视DRM系统中执行资源上的作业所需的所有高级功能。

各主流作业调度系统一般都有对应的DRMAA库,使用C、C++、Perl、Python等语言实现,由官方或第三方开发维护。

使用集群的module中的snakemake,load时DRMAA库可自动加载。

用户自行安装的snakemake需要自行引入DRMAA库。

export DRMAA_LIBRARY_PATH=/public/apps/lsf-drmaa/lib/libdrmaa.so
使用
snakemake   --drmaa ' -n {threads}' -j 100 -s callsnp_smk.py

Summary

集群运行时目前官方推荐使用profile的方式,使用--cluster选项属于过去式,但这种方式灵活度最高;

drmaa库在各个作业调度系统中的实现完成度各不相同,可酌情选择。lsf的drmaa库由IBM官方维护,相对比较可靠;

注意事项

  • {}使用

    script中将{}作为snakemake定义的变量,因此在shell中使用{}时会出现报错。

    cat {input[chr]}|awk '{print $1"\t0\t"$2}' > {prefix}.bed

    NameError: The name 'print $1"\t0\t"$2' is unknown in this context. Please make sure that you defined that variable. Also note that braces not used for variable access have to be escaped by repeating them, i.e. {{print $1}}
    
    需要将awk中的{}改成{{}},如cat {input[chr]}|awk '{{print $1"\t0\t"$2}}' > {prefix}.bed

本文阅读量  次
本站总访问量  次