Using the expression language to slice, dice, and query genetic data

This notebook uses the Hail expression language to query, filter, and annotate the same thousand genomes dataset from the overview. We also cover how to compute aggregate statistics from a dataset using the expression language.

Every Hail practical notebook starts the same way: import the necessary modules and construct a HailContext. This is the entry point for Hail functionality. This object also wraps a SparkContext, which can be accessed with hc.sc.

In [1]:
from hail import *
hc = HailContext()
Running on Apache Spark version 2.0.2
SparkUI available at http://10.56.135.40:4040
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.1-5a67787

If the above cell ran without error, we’re ready to go!
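
Since hc wraps a SparkContext, you can also poke at Spark directly when needed. A minimal check, assuming the HailContext above was constructed successfully:

# hc.sc is the underlying pyspark SparkContext; for example, print its Spark version.
print(hc.sc.version)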

Before using Hail, we import some standard Python libraries for use throughout the notebook.

In [2]:
from pprint import pprint

Check for tutorial data or download if necessary

This cell downloads the necessary data from Google Storage if it isn’t found in the current working directory.

In [3]:
import os
if os.path.isdir('data/1kg.vds') and os.path.isfile('data/1kg_annotations.txt'):
    print('All files are present and accounted for!')
else:
    import sys
    sys.stderr.write('Downloading data (~50M) from Google Storage...\n')
    import urllib
    import tarfile
    urllib.urlretrieve('https://storage.googleapis.com/hail-1kg/tutorial_data.tar',
                       'tutorial_data.tar')
    sys.stderr.write('Download finished!\n')
    sys.stderr.write('Extracting...\n')
    tarfile.open('tutorial_data.tar').extractall()
    if not (os.path.isdir('data/1kg.vds') and os.path.isfile('data/1kg_annotations.txt')):
        raise RuntimeError('Something went wrong!')
    else:
        sys.stderr.write('Done!\n')
Downloading data (~50M) from Google Storage...
Download finished!
Extracting...
Done!

We will read a dataset from disk, and print some summary statistics about it to re-familiarize ourselves.

In [4]:
vds = hc.read('data/1kg.vds')
vds.summarize().report()

         Samples: 1000
        Variants: 10961
       Call Rate: 0.983163
         Contigs: ['X', '12', '8', '19', '4', '15', '11', '9', '22', '13', '16', '5', '10', '21', '6', '1', '17', '14', '20', '2', '18', '7', '3']
   Multiallelics: 0
            SNPs: 10961
            MNPs: 0
      Insertions: 0
       Deletions: 0
 Complex Alleles: 0
    Star Alleles: 0
     Max Alleles: 2

Types in action

We’ll produce some variant and sample annotations with the variant_qc and sample_qc methods, then use these annotations to demonstrate some of the expression language features.

In [5]:
vds = vds.variant_qc().cache().sample_qc()
In [6]:
pprint(vds.sample_schema)
Struct{
     qc: Struct{
         callRate: Double,
         nCalled: Int,
         nNotCalled: Int,
         nHomRef: Int,
         nHet: Int,
         nHomVar: Int,
         nSNP: Int,
         nInsertion: Int,
         nDeletion: Int,
         nSingleton: Int,
         nTransition: Int,
         nTransversion: Int,
         dpMean: Double,
         dpStDev: Double,
         gqMean: Double,
         gqStDev: Double,
         nNonRef: Int,
         rTiTv: Double,
         rHetHomVar: Double,
         rInsertionDeletion: Double
     }
 }

Filtering with expressions

The schema printed above is the type of the sample annotations, which are given the variable name ‘sa’ wherever they appear. Here, we use the filter_samples_expr method to filter samples based on these annotations. If we want to filter on the “dpMean” above, we need to select the ‘qc’ field from the ‘sa’ struct, then select the ‘dpMean’ field from the ‘qc’ struct. These selections use dots: sa.qc.dpMean.

There are four Hail methods that use the expression language to filter a dataset:

  • filter_variants_expr
  • filter_samples_expr
  • filter_genotypes
  • filter_alleles

All these methods take a Hail expression as a string argument, and return a filtered dataset.

In [7]:
# unfiltered
vds.num_samples
Out[7]:
1000
In [8]:
vds.filter_samples_expr('sa.qc.dpMean > 5', keep=True).num_samples
Out[8]:
699
In [9]:
vds.filter_samples_expr('sa.qc.dpMean <= 5', keep=False).num_samples
Out[9]:
699
In [10]:
vds.filter_samples_expr('sa.qc.callRate > 0.95', keep=True).num_samples
Out[10]:
928
In [11]:
vds.filter_samples_expr('sa.qc.callRate > 0.95 && sa.qc.dpMean > 5', keep=True).num_samples
Out[11]:
696

Let’s add some sample annotations from the metadata file to allow for some more interesting expressions.

In [12]:
kt = hc.import_table('data/1kg_annotations.txt', impute=True).key_by('Sample')
vds = vds.annotate_samples_table(kt, root='sa.metadata')
2018-10-18 01:25:55 Hail: INFO: Reading table to impute column types
2018-10-18 01:25:55 Hail: INFO: Finished type imputation
  Loading column `Sample' as type String (imputed)
  Loading column `Population' as type String (imputed)
  Loading column `SuperPopulation' as type String (imputed)
  Loading column `isFemale' as type Boolean (imputed)
  Loading column `PurpleHair' as type Boolean (imputed)
  Loading column `CaffeineConsumption' as type Int (imputed)
In [13]:
pprint(vds.sample_schema)
Struct{
     qc: Struct{
         callRate: Double,
         nCalled: Int,
         nNotCalled: Int,
         nHomRef: Int,
         nHet: Int,
         nHomVar: Int,
         nSNP: Int,
         nInsertion: Int,
         nDeletion: Int,
         nSingleton: Int,
         nTransition: Int,
         nTransversion: Int,
         dpMean: Double,
         dpStDev: Double,
         gqMean: Double,
         gqStDev: Double,
         nNonRef: Int,
         rTiTv: Double,
         rHetHomVar: Double,
         rInsertionDeletion: Double
     },
     metadata: Struct{
         Population: String,
         SuperPopulation: String,
         isFemale: Boolean,
         PurpleHair: Boolean,
         CaffeineConsumption: Int
     }
 }

We can apply conditional filters on things like population with if/else:

In [14]:
vds.filter_samples_expr('if (sa.metadata.Population == "EAS") sa.qc.dpMean > 8 else sa.qc.dpMean > 4').num_samples
Out[14]:
897

Filtering variants and genotypes

One of the advantages of Hail’s filtering interface is that it’s equally easy to filter samples, variants, or genotypes. If one is handed a fresh VCF text file, it’s pretty easy to write a program to filter variants, but much harder to filter samples or genotypes. Other data representations make a different one of these operations easy and leave the rest hard. In Hail, all of this is abstracted away: it’s easy to filter anything!

In [15]:
vds.count_variants()
Out[15]:
10961L
In [16]:
# Filter on allele frequency
vds.filter_variants_expr('va.qc.AF > 0.1', keep=True).count_variants()
Out[16]:
7993L
In [17]:
# Filter on allele frequency and GQ mean
vds.filter_variants_expr('va.qc.AF > 0.1 && va.qc.gqMean > 20').count_variants()
Out[17]:
7879L
In [18]:
# Genotype call rate across the entire dataset
vds.summarize().call_rate
Out[18]:
0.9831634887327798

As we can see in the previous cell, the overall call rate of this dataset is about 98.3%.

In [19]:
vds.filter_genotypes('g.gq >= 20', keep=True).summarize().call_rate
Out[19]:
0.5495507709150625

However, filtering to GQ of at least 20 drops the call rate from 98.3% to about 55%: roughly 44% of the called genotypes have GQ below 20, which corresponds to less than 99% confidence in the call.
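
We can check this figure directly with aggregables, which are covered in detail below. A sketch using the fraction aggregator that appears later in this notebook:

# Among called genotypes, what fraction have GQ below 20? Expect roughly 0.44.
vds.query_genotypes('gs.filter(g => g.isCalled()).fraction(g => g.gq < 20)')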

Annotating with expressions

It is also possible to produce new annotations with the expression language, using annotate_samples_expr and annotate_variants_expr. These methods take an expression of the form:

<new annotation name> = <expression>

To annotate samples, the new annotation name must begin with sa. To annotate variants, it must begin with va.

Here are some simple examples.

In [20]:
(vds.annotate_samples_expr('sa.keepThisSample = sa.qc.callRate > 0.95 && sa.qc.dpMean > 5')
    .filter_samples_expr('sa.keepThisSample', keep=True).num_samples)
Out[20]:
696
In [21]:
(vds.annotate_variants_expr('va.keepThisVariant = va.qc.AF > 0.1 && va.qc.gqMean > 20')
    .filter_variants_expr('va.keepThisVariant').count_variants())
Out[21]:
7879L

Key tables also have an annotate method. We can use this to produce new columns or redefine old ones:

In [22]:
kt.to_dataframe().show(5)
+-------+----------+---------------+--------+----------+-------------------+
| Sample|Population|SuperPopulation|isFemale|PurpleHair|CaffeineConsumption|
+-------+----------+---------------+--------+----------+-------------------+
|NA19784|       MXL|            AMR|   false|     false|                  8|
|NA19102|       YRI|            AFR|    true|     false|                  6|
|HG00141|       GBR|            EUR|   false|     false|                  6|
|HG01890|       ACB|            AFR|   false|     false|                  8|
|HG00263|       GBR|            EUR|    true|      true|                  6|
+-------+----------+---------------+--------+----------+-------------------+
only showing top 5 rows

In [23]:
kt.annotate('is_american = SuperPopulation == "AMR"').to_dataframe().show(5)
+-------+----------+---------------+--------+----------+-------------------+-----------+
| Sample|Population|SuperPopulation|isFemale|PurpleHair|CaffeineConsumption|is_american|
+-------+----------+---------------+--------+----------+-------------------+-----------+
|NA19784|       MXL|            AMR|   false|     false|                  8|       true|
|NA19102|       YRI|            AFR|    true|     false|                  6|      false|
|HG00141|       GBR|            EUR|   false|     false|                  6|      false|
|HG01890|       ACB|            AFR|   false|     false|                  8|      false|
|HG00263|       GBR|            EUR|    true|      true|                  6|      false|
+-------+----------+---------------+--------+----------+-------------------+-----------+
only showing top 5 rows
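
Redefining an existing column works the same way, assuming annotate overwrites a column when its name is reused. A sketch (the rescaling factor here is purely illustrative):

# Overwrite an existing column: rescale caffeine consumption for the example.
kt.annotate('CaffeineConsumption = CaffeineConsumption * 7').to_dataframe().show(5)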

Aggregables

We’ve now seen how it’s possible to use the Hail expression language to manipulate various things like numbers and arrays. We can compute the mean of an array of numbers with .mean(), find their max with .max(), and so on.

But what if we wanted to compute the mean of 5 trillion numbers? That’s a lot of data, and it is roughly the number of genotypes in the preprocessed gnomAD VCF, which contained about 20 thousand samples and 250 million variants. Hail is designed to handle datasets of this size and larger, and does so by computing in parallel on many computers using Apache Spark.

But we still want a simple programming model that allows us to query and transform such distributed data. That is where the Aggregable comes in. First, an example:

In [24]:
vds.query_genotypes('gs.map(g => g.gq).stats()').mean
Out[24]:
30.682263230349086

The above statement computes the mean GQ of all genotypes in a dataset. This code can compute the mean GQ of a megabyte-scale thousand genomes subset on a laptop, or compute the mean GQ of a 300 TB .vcf on a massive cloud cluster. Hail is scalable!

An Aggregable[T] is a distributed collection of elements of type T. The interface is modeled on Array[T], but aggregables can be arbitrarily large and they are unordered, so they don’t support operations like indexing.

Aggregables support map and filter. Like sum, max, etc. on arrays, aggregables also support operations, which we call “aggregators”, that operate on the entire collection and produce a summary or derived statistic. See the documentation for a complete list of aggregators.
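
Aggregator calls compose with map and filter just as array operations do. A small sketch that finds the maximum sequencing depth among called genotypes:

# Chain filter -> map -> stats() and pull out the maximum depth from the result.
vds.query_genotypes('gs.filter(g => g.isCalled()).map(g => g.dp).stats()').max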

Aggregables are exposed in the expressions used by various methods on VariantDataset. Above, query_genotypes exposes the aggregable gs: Aggregable[Genotype], which is the collection of all the genotypes in the dataset.

First, we map the genotypes to their GQ values. Then, we use the stats() aggregator to compute a struct with information like mean and standard deviation. We can see the other values in the struct produced as well:

In [25]:
pprint(vds.query_genotypes('gs.map(g => g.gq).stats()'))
{u'max': 99.0,
 u'mean': 30.682263230349086,
 u'min': 0.0,
 u'nNotMissing': 10776455L,
 u'stdev': 26.544770565260993,
 u'sum': 330646029.00001156}

Count

The count aggregator is pretty simple - it counts the number of elements in the aggregable.

In [26]:
vds.query_genotypes('gs.count()')
Out[26]:
10961000L
In [27]:
vds.num_samples * vds.count_variants()
Out[27]:
10961000L

There’s one genotype per sample per variant, so the count of gs is equal to the number of samples times the number of variants, or about 11 million. How can we make this more useful? With filter!

In [28]:
vds.query_genotypes('gs.filter(g => g.isHet()).count()')
Out[28]:
2583309L

Of the 11 million genotypes in the dataset, about 2.6 million are heterozygous.

What about combining sample annotations with genotype information? How many heterozygous genotypes are found in the American samples? A simple way to implement this is to filter to American samples first and then run the same query.

In [29]:
(vds.filter_samples_expr('sa.metadata.SuperPopulation == "AMR"')
    .query_genotypes('gs.filter(g => g.isHet()).count()'))
Out[29]:
754850L

The next cell is a bit tricky: aggregables carry around an extra “context”. We can actually access the sample, sample annotations, variant, and variant annotations inside of operations on gs. We don’t need to filter samples first; we can do it inside the query:

In [30]:
vds.query_genotypes('gs.filter(g => g.isHet() && sa.metadata.SuperPopulation == "AMR").count()')
Out[30]:
754850L

Here’s an example where we use the variant annotations to count the number of heterozygous genotypes in Americans at rare loci.

In [31]:
vds.query_genotypes('''gs.filter(g => g.isHet()
    && sa.metadata.SuperPopulation == "AMR"
    && va.qc.AF < 0.01).count()''')
Out[31]:
1879L

Sum

The sum aggregator can be used to compute useful statistics per sample or variant. For example, we may want to count the total number of non-reference alleles per sample:

In [32]:
(vds.annotate_samples_expr('sa.nNonRefAlleles = gs.map(g => g.nNonRefAlleles()).sum()')
    .query_samples('samples.map(s => sa.nNonRefAlleles).take(10)'))
Out[32]:
[6423, 6530, 6606, 6638, 6570, 6572, 6542, 6490, 6606, 6464]
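
The same pattern works per variant. A sketch that sums the non-reference alleles carried at each variant across samples (the annotation name nNonRefAlleles is just illustrative):

# Count non-reference alleles at each variant, then peek at a few values.
(vds.annotate_variants_expr('va.nNonRefAlleles = gs.map(g => g.nNonRefAlleles()).sum()')
    .query_variants('variants.map(v => va.nNonRefAlleles).take(10)'))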

Fraction

The fraction aggregator could be implemented in terms of a filter and two counts, but it’s a common enough operation that it has its own function.

In [33]:
vds.summarize().call_rate
Out[33]:
0.9831634887327798
In [34]:
vds.query_genotypes('gs.fraction(g => g.isCalled())')
Out[34]:
0.9831634887327798
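
To make the equivalence concrete, here is a sketch that reproduces the call rate from two counts, assuming query_genotypes accepts a list of expressions and returns one result per expression:

# Call rate as (number of called genotypes) / (total genotype slots).
called, total = vds.query_genotypes(['gs.filter(g => g.isCalled()).count()',
                                     'gs.count()'])
print(float(called) / total)   # should match gs.fraction(g => g.isCalled())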

Stats

The stats aggregator computes six useful statistics about a numeric aggregable. We can get quality distributions per sample or variant easily with this function.

In [35]:
pprint((vds.annotate_variants_expr('va.gq_stats = gs.map(g => g.gq).stats()')
 .query_variants('variants.map(v => {v: v, stats: va.gq_stats}).take(3)')))
[{u'stats': {u'max': 99.0,
             u'mean': 39.998995983935735,
             u'min': 1.0,
             u'nNotMissing': 996L,
             u'stdev': 31.792957831360212,
             u'sum': 39838.99999999999},
  u'v': Variant(contig=14, start=30524538, ref=A, alts=[AltAllele(ref=A, alt=G)])},
 {u'stats': {u'max': 99.0,
             u'mean': 26.887550200803233,
             u'min': 3.0,
             u'nNotMissing': 996L,
             u'stdev': 19.34736902981885,
             u'sum': 26780.00000000002},
  u'v': Variant(contig=14, start=30887546, ref=A, alts=[AltAllele(ref=A, alt=C)])},
 {u'stats': {u'max': 99.0,
             u'mean': 34.281504065040686,
             u'min': 0.0,
             u'nNotMissing': 984L,
             u'stdev': 29.878905727852295,
             u'sum': 33733.00000000004},
  u'v': Variant(contig=14, start=31099738, ref=A, alts=[AltAllele(ref=A, alt=G)])}]

Counter

The counter aggregator counts the number of occurrences of each unique key in an aggregable. You may have seen it in the overview:

In [36]:
kt.query('SuperPopulation.counter()')
Out[36]:
{u'AFR': 1018L, u'AMR': 535L, u'EAS': 617L, u'EUR': 669L, u'SAS': 661L}
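
The result is an ordinary Python dict, so it can be post-processed directly. For example, to order the populations from largest to smallest group:

# Sort the counter results by count, descending.
pop_counts = kt.query('SuperPopulation.counter()')
sorted(pop_counts.items(), key=lambda kv: kv[1], reverse=True)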

It’s useful for other computations, too. We can compute the het counts by population per variant:

In [37]:
pprint((vds.annotate_variants_expr('''
    va.pop_counts = gs.filter(g => g.isHet())
                      .map(g => sa.metadata.SuperPopulation).counter()''')
 .query_variants('variants.map(v => {at: str(v), populations: va.pop_counts}).take(3)')))
[{u'at': u'8:95009020:T:C',
  u'populations': {u'AFR': 50L,
                   u'AMR': 143L,
                   u'EAS': 98L,
                   u'EUR': 102L,
                   u'SAS': 2L}},
 {u'at': u'8:95150036:A:G',
  u'populations': {u'AFR': 42L,
                   u'AMR': 99L,
                   u'EAS': 81L,
                   u'EUR': 61L,
                   u'SAS': 3L}},
 {u'at': u'8:95470511:C:G',
  u'populations': {u'AFR': 2L, u'AMR': 7L, u'EAS': 1L, u'EUR': 1L}}]

FlatMap

flatMap is not an aggregator, but a transformer. It is like map, but takes a lambda function that returns either an Array[T] or Set[T], and flattens the elements. Here’s a didactic example:

In [38]:
vds.query_samples('samples.count()')
Out[38]:
1000L
In [39]:
vds.query_samples('samples.flatMap(s => [1,2,3]).count()')
Out[39]:
3000L

Take

take is an aggregator that takes n elements of an aggregable, non-deterministically.

In [40]:
vds.query_genotypes('gs.take(5)')
Out[40]:
[Genotype(GT=1, AD=[2, 2], DP=4, GQ=59, PL=[59, 0, 79]),
 Genotype(GT=0, AD=[7, 0], DP=7, GQ=21, PL=[0, 21, 259]),
 Genotype(GT=1, AD=[6, 5], DP=11, GQ=99, PL=[123, 0, 200]),
 Genotype(GT=0, AD=[9, 0], DP=9, GQ=27, PL=[0, 27, 340]),
 Genotype(GT=1, AD=[2, 6], DP=8, GQ=51, PL=[165, 0, 51])]

Collect

collect is an aggregator that collects all elements of an aggregable into an array. It’s usually not a good idea to use this without filtering the aggregable first. For example, after restricting to rare variants, we can collect the IDs of the samples called non-reference at each variant.

In [41]:
pprint((vds.filter_variants_expr('va.qc.AF < 0.01')
    .annotate_variants_expr('''
    va.nonref_samples = gs.filter(g => g.isCalledNonRef())
                      .map(g => s).collect()''')
    .query_variants('variants.map(v => {at: str(v), homvars: va.nonref_samples}).take(3)')))
[{u'at': u'8:95470511:C:G',
  u'homvars': [u'HG00657',
               u'HG01051',
               u'HG01113',
               u'HG01173',
               u'HG01251',
               u'HG01383',
               u'HG01468',
               u'HG01479',
               u'HG01773',
               u'HG01924',
               u'HG02282',
               u'HG02477']},
 {u'at': u'8:95863909:A:T', u'homvars': []},
 {u'at': u'8:97172671:C:T', u'homvars': []}]

takeBy

takeBy is an aggregator that takes n elements of an aggregable, ordered by the value of a lambda function (smallest first). We can easily select the variants with the lowest p-values after regression:

In [42]:
top_5_pvals = (vds.linreg('sa.metadata.CaffeineConsumption')
               .query_variants('variants.map(v => {at: str(v), pval: va.linreg.pval}).takeBy(x => x.pval, 5)'))
pprint(top_5_pvals)
2018-10-18 01:26:07 Hail: INFO: Running linear regression on 1000 samples with 1 covariate including intercept...
[{u'at': u'10:56025604:A:C', u'pval': 5.595049078641033e-05},
 {u'at': u'20:55431571:A:C', u'pval': 0.00010899661736561121},
 {u'at': u'10:91099630:T:C', u'pval': 0.00013497679316886596},
 {u'at': u'4:149350527:T:C', u'pval': 0.00017786066989195366},
 {u'at': u'7:152600817:G:A', u'pval': 0.0002252314501866726}]

Aggregating by key

The aggregate_by_key method is likely the most powerful piece of query functionality in Hail. It’s a method on KeyTable. You can produce key tables from a VariantDataset with three methods:

  • variants_table(): a key table with the variant and variant annotations as columns. There is one row per variant.
  • samples_table(): a key table with the sample and sample annotations as columns. There is one row per sample.
  • genotypes_table(): a key table that is the coordinate representation of the genetic matrix. The columns are the variant, variant annotations, sample, sample annotations, and genotype. There is one row per variant/sample combination: (N * M) total rows!

Using aggregate_by_key with genotypes_table can produce counts of loss-of-function variants in cases and controls per gene, compute the mean depth per sample per exon, and much more. You define the aggregation keys, and you define how to combine the rows. This method produces another KeyTable.
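
As a warm-up before the genotype-level example below, here is a sketch using samples_table(), assuming its sa column is exposed as an aggregable in the same way g is for genotypes_table():

# Mean sample call rate per super-population.
(vds.samples_table()
    .aggregate_by_key(key_expr='pop = sa.metadata.SuperPopulation',
                      agg_expr='meanCallRate = sa.map(sa => sa.qc.callRate).stats().mean')
    .to_dataframe().show())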

We use it here to compute the mean depth and quality by alt allele type by population. This aggregation isn’t especially exciting, but it illustrates the complete flexibility of the model. You can group by gene, or by gene and consequence, or by frequency bin and gene, or any combination of groupings you think may be scientifically useful.

In [43]:
agg = (vds.genotypes_table()
       .aggregate_by_key(key_expr=['alt = v.altAllele()', 'pop = sa.metadata.Population'],
                         agg_expr=['meanDP = g.map(g => g.dp).stats().mean',
                                   'meanGQ = g.map(g => g.gq).stats().mean']))

agg.to_dataframe().show(20)
+-------+-------+---+-----------------+------------------+
|alt.ref|alt.alt|pop|           meanDP|            meanGQ|
+-------+-------+---+-----------------+------------------+
|      G|      A|CLM|6.713602921189501|29.557196640602385|
|      G|      A|CHS|6.055483222825004|  26.1616413529854|
|      A|      G|ACB|8.219300572540169| 36.69049236462031|
|      G|      C|KHV| 8.11193496515991|27.014288265761753|
|      T|      A|GBR|6.724943693693696| 25.52567890811876|
|      G|      T|GBR|6.645626690712366| 30.06765181251444|
|      G|      T|GWD|8.781482584102415| 39.77106281631438|
|      G|      T|ACB|8.187543469969247| 35.60651584116235|
|      T|      C|ACB|8.175691316854016| 36.45534497566568|
|      C|      A|CLM|6.717877643669381| 30.58616631799159|
|      A|      T|CLM| 6.62070440610295| 23.03519521231121|
|      T|      A|PEL|7.990459153249849|28.495455222768577|
|      G|      T|PJL|7.010545742156603|  31.2713382507903|
|      C|      T|FIN|5.444408373156044|24.476829768405626|
|      A|      T|PEL|7.915141172820342|27.473113280644665|
|      G|      A|FIN|5.432682961313487|24.082344645218175|
|      T|      A|PUR|6.788825990647302| 25.04023624953857|
|      A|      G|CHS|6.165009609788766|27.951926034044316|
|      C|      A|GWD|8.835015863859248| 39.03259301990194|
|      A|      G|GWD|8.968527097580713| 41.12155305537184|
+-------+-------+---+-----------------+------------------+
only showing top 20 rows