Clumping GWAS Results

Introduction

After performing a genome-wide association study (GWAS) for a given phenotype, an analyst might want to clump the association results based on the correlation between variants and p-values. The goal is to get a list of independent associated loci accounting for linkage disequilibrium between variants.

For example, given a region of the genome with three variants: SNP1, SNP2, and SNP3. SNP1 has a p-value of 1e-8, SNP2 has a p-value of 1e-7, and SNP3 has a p-value of 1e-6. The correlation between SNP1 and SNP2 is 0.95, SNP1 and SNP3 is 0.8, and SNP2 and SNP3 is 0.7. We would want to report SNP1 is the most associated variant with the phenotype and “clump” SNP2 and SNP3 with the association for SNP1.

Hail is a highly flexible tool for performing analyses on genetic datasets in a parallel manner that takes advantage of a scalable compute cluster. However, LD-based clumping is one example of many algorithms that are not available in Hail, but are implemented by other bioinformatics tools such as PLINK. We use Batch to enable functionality unavailable directly in Hail while still being able to take advantage of a scalable compute cluster.

To demonstrate how to perform LD-based clumping with Batch, we’ll use the 1000 Genomes dataset from the Hail GWAS tutorial. First, we’ll write a Python Hail script that performs a GWAS for caffeine consumption and exports the results as a binary PLINK file and a TSV with the association results. Second, we’ll build a docker image containing the custom GWAS script and Hail pre-installed and then push that image to the Google Container Repository. Lastly, we’ll write a Python script that creates a Batch workflow for LD-based clumping with parallelism across chromosomes and execute it with the Batch Service. The job computation graph will look like the one depicted in the image below:

../_images/cookbook_clumping.png

Hail GWAS Script

We wrote a stand-alone Python script run_gwas.py that takes a VCF file, a phenotypes file, the output destination file root, and the number of cores to use as input arguments. The Hail code for performing the GWAS is described here. We export two sets of files to the file root defined by --output-file. The first is a binary PLINK file set with three files ending in .bed, .bim, and .fam. We also export a file with two columns SNP and P which contain the GWAS p-values per variant.

Notice the lines highlighted below. Hail will attempt to use all cores on the computer if no defaults are given. However, with Batch, we only get a subset of the computer, so we must explicitly specify how much resources Hail can use based on the input argument --cores.

run_gwas.py
import argparse
import hail as hl


def run_gwas(vcf_file, phenotypes_file, output_file):
    table = hl.import_table(phenotypes_file, impute=True).key_by('Sample')

    hl.import_vcf(vcf_file).write('tmp.mt')
    mt = hl.read_matrix_table('tmp.mt')

    mt = mt.annotate_cols(pheno=table[mt.s])
    mt = hl.sample_qc(mt)
    mt = mt.filter_cols((mt.sample_qc.dp_stats.mean >= 4) & (mt.sample_qc.call_rate >= 0.97))
    ab = mt.AD[1] / hl.sum(mt.AD)
    filter_condition_ab = (
        (mt.GT.is_hom_ref() & (ab <= 0.1))
        | (mt.GT.is_het() & (ab >= 0.25) & (ab <= 0.75))
        | (mt.GT.is_hom_var() & (ab >= 0.9))
    )
    mt = mt.filter_entries(filter_condition_ab)
    mt = hl.variant_qc(mt)
    mt = mt.filter_rows(mt.variant_qc.AF[1] > 0.01)

    eigenvalues, pcs, _ = hl.hwe_normalized_pca(mt.GT)

    mt = mt.annotate_cols(scores=pcs[mt.s].scores)

    gwas = hl.linear_regression_rows(
        y=mt.pheno.CaffeineConsumption,
        x=mt.GT.n_alt_alleles(),
        covariates=[1.0, mt.pheno.isFemale, mt.scores[0], mt.scores[1], mt.scores[2]],
    )

    gwas = gwas.select(SNP=hl.variant_str(gwas.locus, gwas.alleles), P=gwas.p_value)
    gwas = gwas.key_by(gwas.SNP)
    gwas = gwas.select(gwas.P)
    gwas.export(f'{output_file}.assoc', header=True)

    hl.export_plink(mt, output_file, fam_id=mt.s, ind_id=mt.s)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--vcf', required=True)
    parser.add_argument('--phenotypes', required=True)
    parser.add_argument('--output-file', required=True)
    parser.add_argument('--cores', required=False)
    args = parser.parse_args()

    if args.cores:
        hl.init(master=f'local[{args.cores}]')

    run_gwas(args.vcf, args.phenotypes, args.output_file)

Docker Image

A Python script alone does not define its dependencies such as on third-party packages. For example, to execute the run_gwas.py script above, Hail must be installed as well as the libraries Hail depends on. Batch uses Docker images to define these dependencies including the type of operating system and any third-party software dependencies. The Hail team maintains a Docker image, hailgenetics/hail, for public use with Hail already installed. We extend this Docker image to include the run_gwas.py script.

Dockerfile
FROM hailgenetics/hail:0.2.37

COPY run_gwas.py /

The following Docker command builds this image:

docker pull hailgenetics/hail:0.2.37
docker build -t 1kg-gwas -f Dockerfile .

Batch can only access images pushed to a Docker repository. You have two repositories available to you: the public Docker Hub repository and your project’s private Google Container Repository (GCR). It is not advisable to put credentials inside any Docker image, even if it is only pushed to a private repository.

The following Docker command pushes the image to GCR:

docker tag 1kg-gwas us-docker.pkg.dev/<MY_PROJECT>/1kg-gwas
docker push us-docker.pkg.dev/<MY_PROJECT>/1kg-gwas

Replace <MY_PROJECT> with the name of your Google project. Ensure your Batch service account can access images in GCR.

Batch Script

The next thing we want to do is write a Hail Batch script to execute LD-based clumping of association results for the 1000 genomes dataset.

Functions

GWAS

To start, we will write a function that creates a new Job on an existing Batch that takes as arguments the VCF file and the phenotypes file. The return value of this function is the Job that is created in the function, which can be used later to access the binary PLINK file output and association results in downstream jobs.

def gwas(batch, vcf, phenotypes):
    """
    QC data and get association test statistics
    """
    cores = 2
    g = batch.new_job(name='run-gwas')
    g.image('us-docker.pkg.dev/<MY_PROJECT>/1kg-gwas:latest')
    g.cpu(cores)
    g.declare_resource_group(ofile={
        'bed': '{root}.bed',
        'bim': '{root}.bim',
        'fam': '{root}.fam',
        'assoc': '{root}.assoc'
    })
    g.command(f'''
python3 /run_gwas.py \
    --vcf {vcf} \
    --phenotypes {phenotypes} \
    --output-file {g.ofile} \
    --cores {cores}
''')
    return g

A couple of things to note about this function:

  • The image is the image created in the previous step. We copied the run_gwas.py script into the root directory /. Therefore, to execute the run_gwas.py script, we call /run_gwas.py.

  • The run_gwas.py script takes an output-file parameter and then creates files ending with the extensions .bed, .bim, .fam, and .assoc. In order for Batch to know the script is creating files as a group with a common file root, we need to use the BashJob.declare_resource_group() method. We then pass g.ofile as the output file root to run_gwas.py as that represents the temporary file root given to all files in the resource group ({root} when declaring the resource group).

Clumping By Chromosome

The second function performs clumping for a given chromosome. The input arguments are the Batch for which to create a new BashJob, the PLINK binary file root, the association results with at least two columns (SNP and P), and the chromosome for which to do the clumping for. The return value is the new BashJob created.

def clump(batch, bfile, assoc, chr):
    """
    Clump association results with PLINK
    """
    c = batch.new_job(name=f'clump-{chr}')
    c.image('hailgenetics/genetics:0.2.37')
    c.memory('1Gi')
    c.command(f'''
plink --bfile {bfile} \
    --clump {assoc} \
    --chr {chr} \
    --clump-p1 0.01 \
    --clump-p2 0.01 \
    --clump-r2 0.5 \
    --clump-kb 1000 \
    --memory 1024

mv plink.clumped {c.clumped}
''')
    return c

A couple of things to note about this function:

  • We use the image hailgenetics/genetics which is a publicly available Docker image from Docker Hub maintained by the Hail team that contains many useful bioinformatics tools including PLINK.

  • We explicitly tell PLINK to only use 1Gi of memory because PLINK defaults to using half of the machine’s memory. PLINK’s memory-available detection mechanism is unfortunately unaware of the memory limit imposed by Batch. Not specifying resource requirements correctly can cause performance degradations with PLINK.

  • PLINK creates a hard-coded file plink.clumped. We have to move that file to a temporary Batch file {c.clumped} in order to use that file in downstream jobs.

Merge Clumping Results

The third function concatenates all of the clumping results per chromosome into a single file with one header line. The inputs are the Batch for which to create a new BashJob and a list containing all of the individual clumping results files. We use the ubuntu:22.04 Docker image for this job. The return value is the new BashJob created.

def merge(batch, results):
    """
    Merge clumped results files together
    """
    merger = batch.new_job(name='merge-results')
    merger.image('ubuntu:22.04')
    if results:
        merger.command(f'''
head -n 1 {results[0]} > {merger.ofile}
for result in {" ".join(results)}
do
    tail -n +2 "$result" >> {merger.ofile}
done
sed -i -e '/^$/d' {merger.ofile}
''')
    return merger

Control Code

The last thing we want to do is use the functions we wrote above to create new jobs on a Batch which can be executed with the ServiceBackend.

First, we import the Batch module as hb.

import hailtop.batch as hb

Next, we create a Batch specifying the backend is the ServiceBackend and give it the name ‘clumping’.

backend = hb.ServiceBackend()
batch = hb.Batch(backend=backend, name='clumping')

We create InputResourceFile objects for the VCF file and phenotypes file using the Batch.read_input() method. These are the inputs to the entire Batch and are not outputs of a BashJob.

vcf = batch.read_input('gs://hail-tutorial/1kg.vcf.bgz')
phenotypes = batch.read_input('gs://hail-tutorial/1kg_annotations.txt')

We use the gwas function defined above to create a new job on the batch to perform a GWAS that outputs a binary PLINK file and association results:

g = gwas(batch, vcf, phenotypes)

We call the clump function once per chromosome and aggregate a list of the clumping results files passing the outputs from the g job defined above as inputs to the clump function:

results = []
for chr in range(1, 23):
    c = clump(batch, g.ofile, g.ofile.assoc, chr)
    results.append(c.clumped)

Finally, we use the merge function to concatenate the results into a single file and then write this output to a permanent location using Batch.write_output(). The inputs to the merge function are the clumped output files from each of the clump jobs.

m = merge(batch, results)
batch.write_output(m.ofile, 'gs://<MY_BUCKET>/batch-clumping/1kg-caffeine-consumption.clumped')

The last thing we do is submit the Batch to the service and then close the Backend:

batch.run(open=True, wait=False)  # doctest: +SKIP
backend.close()

Synopsis

We provide the code used above in one place for your reference:

run_gwas.py
import argparse
import hail as hl


def run_gwas(vcf_file, phenotypes_file, output_file):
    table = hl.import_table(phenotypes_file, impute=True).key_by('Sample')

    hl.import_vcf(vcf_file).write('tmp.mt')
    mt = hl.read_matrix_table('tmp.mt')

    mt = mt.annotate_cols(pheno=table[mt.s])
    mt = hl.sample_qc(mt)
    mt = mt.filter_cols((mt.sample_qc.dp_stats.mean >= 4) & (mt.sample_qc.call_rate >= 0.97))
    ab = mt.AD[1] / hl.sum(mt.AD)
    filter_condition_ab = (
        (mt.GT.is_hom_ref() & (ab <= 0.1))
        | (mt.GT.is_het() & (ab >= 0.25) & (ab <= 0.75))
        | (mt.GT.is_hom_var() & (ab >= 0.9))
    )
    mt = mt.filter_entries(filter_condition_ab)
    mt = hl.variant_qc(mt)
    mt = mt.filter_rows(mt.variant_qc.AF[1] > 0.01)

    eigenvalues, pcs, _ = hl.hwe_normalized_pca(mt.GT)

    mt = mt.annotate_cols(scores=pcs[mt.s].scores)

    gwas = hl.linear_regression_rows(
        y=mt.pheno.CaffeineConsumption,
        x=mt.GT.n_alt_alleles(),
        covariates=[1.0, mt.pheno.isFemale, mt.scores[0], mt.scores[1], mt.scores[2]],
    )

    gwas = gwas.select(SNP=hl.variant_str(gwas.locus, gwas.alleles), P=gwas.p_value)
    gwas = gwas.key_by(gwas.SNP)
    gwas = gwas.select(gwas.P)
    gwas.export(f'{output_file}.assoc', header=True)

    hl.export_plink(mt, output_file, fam_id=mt.s, ind_id=mt.s)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--vcf', required=True)
    parser.add_argument('--phenotypes', required=True)
    parser.add_argument('--output-file', required=True)
    parser.add_argument('--cores', required=False)
    args = parser.parse_args()

    if args.cores:
        hl.init(master=f'local[{args.cores}]')

    run_gwas(args.vcf, args.phenotypes, args.output_file)
Dockerfile
FROM hailgenetics/hail:0.2.37

COPY run_gwas.py /
batch_clumping.py
import hailtop.batch as hb


def gwas(batch, vcf, phenotypes):
    """
    QC data and get association test statistics
    """
    cores = 2
    g = batch.new_job(name='run-gwas')
    g.image('us-docker.pkg.dev/<MY_PROJECT>/1kg-gwas:latest')
    g.cpu(cores)
    g.declare_resource_group(
        ofile={'bed': '{root}.bed', 'bim': '{root}.bim', 'fam': '{root}.fam', 'assoc': '{root}.assoc'}
    )
    g.command(f"""
python3 /run_gwas.py \
    --vcf {vcf} \
    --phenotypes {phenotypes} \
    --output-file {g.ofile} \
    --cores {cores}
""")
    return g


def clump(batch, bfile, assoc, chr):
    """
    Clump association results with PLINK
    """
    c = batch.new_job(name=f'clump-{chr}')
    c.image('hailgenetics/genetics:0.2.37')
    c.memory('1Gi')
    c.command(f"""
plink --bfile {bfile} \
    --clump {assoc} \
    --chr {chr} \
    --clump-p1 0.01 \
    --clump-p2 0.01 \
    --clump-r2 0.5 \
    --clump-kb 1000 \
    --memory 1024

mv plink.clumped {c.clumped}
""")
    return c


def merge(batch, results):
    """
    Merge clumped results files together
    """
    merger = batch.new_job(name='merge-results')
    merger.image('ubuntu:22.04')
    if results:
        merger.command(f"""
head -n 1 {results[0]} > {merger.ofile}
for result in {" ".join(results)}
do
    tail -n +2 "$result" >> {merger.ofile}
done
sed -i -e '/^$/d' {merger.ofile}
""")
    return merger


if __name__ == '__main__':
    backend = hb.ServiceBackend()
    batch = hb.Batch(backend=backend, name='clumping')

    vcf = batch.read_input('gs://hail-tutorial/1kg.vcf.bgz')
    phenotypes = batch.read_input('gs://hail-tutorial/1kg_annotations.txt')

    g = gwas(batch, vcf, phenotypes)

    results = []
    for chr in range(1, 23):
        c = clump(batch, g.ofile, g.ofile.assoc, chr)
        results.append(c.clumped)

    m = merge(batch, results)
    batch.write_output(m.ofile, 'gs://<MY_BUCKET>/batch-clumping/1kg-caffeine-consumption.clumped')

    batch.run(open=True, wait=False)
    backend.close()