Source code for hail.methods.qc

import abc
import logging
import os
from collections import Counter
from shlex import quote as shq
from typing import Dict, List, Optional, Tuple, Union

import hail as hl
import hailtop.batch_client as bc
from hail.backend.service_backend import ServiceBackend
from hail.expr import Float64Expression
from hail.expr.expressions.expression_typecheck import expr_float64
from hail.expr.functions import numeric_allele_type
from hail.expr.types import tarray, tfloat, tint32, tstr, tstruct
from hail.genetics.allele_type import AlleleType
from hail.ir import TableToTableApply
from hail.matrixtable import MatrixTable
from hail.table import Table
from hail.typecheck import anytype, nullable, numeric, oneof, typecheck
from hail.utils import FatalError
from hail.utils.java import Env, info, warning
from hail.utils.misc import divide_null, guess_cloud_spark_provider, new_temp_file
from hailtop import pip_version, yamlx
from hailtop.config import get_deploy_config
from hailtop.utils import async_to_blocking

from .misc import (
    require_alleles_field,
    require_biallelic,
    require_col_key_str,
    require_row_key_variant,
    require_table_key_variant,
)

log = logging.getLogger('methods.qc')


HAIL_GENETICS_VEP_GRCH37_85_IMAGE = os.environ.get(
    'HAIL_GENETICS_VEP_GRCH37_85_IMAGE', f'hailgenetics/vep-grch37-85:{pip_version()}'
)
HAIL_GENETICS_VEP_GRCH38_95_IMAGE = os.environ.get(
    'HAIL_GENETICS_VEP_GRCH38_95_IMAGE', f'hailgenetics/vep-grch38-95:{pip_version()}'
)


def _qc_allele_type(ref, alt):
    return hl.bind(
        lambda at: hl.if_else(
            at == AlleleType.SNP,
            hl.if_else(hl.is_transition(ref, alt), AlleleType.TRANSITION, AlleleType.TRANSVERSION),
            at,
        ),
        numeric_allele_type(ref, alt),
    )


[docs]@typecheck(mt=MatrixTable, name=str) def sample_qc(mt, name='sample_qc') -> MatrixTable: """Compute per-sample metrics useful for quality control. .. include:: ../_templates/req_tvariant.rst Examples -------- Compute sample QC metrics and remove low-quality samples: >>> dataset = hl.sample_qc(dataset, name='sample_qc') >>> filtered_dataset = dataset.filter_cols((dataset.sample_qc.dp_stats.mean > 20) & (dataset.sample_qc.r_ti_tv > 1.5)) Notes ----- This method computes summary statistics per sample from a genetic matrix and stores the results as a new column-indexed struct field in the matrix, named based on the `name` parameter. If `mt` contains an entry field `DP` of type :py:data:`.tint32`, then the field `dp_stats` is computed. If `mt` contains an entry field `GQ` of type :py:data:`.tint32`, then the field `gq_stats` is computed. Both `dp_stats` and `gq_stats` are structs with with four fields: - `mean` (``float64``) -- Mean value. - `stdev` (``float64``) -- Standard deviation (zero degrees of freedom). - `min` (``int32``) -- Minimum value. - `max` (``int32``) -- Maximum value. If the dataset does not contain an entry field `GT` of type :py:data:`.tcall`, then an error is raised. The following fields are always computed from `GT`: - `call_rate` (``float64``) -- Fraction of calls not missing or filtered. Equivalent to `n_called` divided by :meth:`.count_rows`. - `n_called` (``int64``) -- Number of non-missing calls. - `n_not_called` (``int64``) -- Number of missing calls. - `n_filtered` (``int64``) -- Number of filtered entries. - `n_hom_ref` (``int64``) -- Number of homozygous reference calls. - `n_het` (``int64``) -- Number of heterozygous calls. - `n_hom_var` (``int64``) -- Number of homozygous alternate calls. - `n_non_ref` (``int64``) -- Sum of `n_het` and `n_hom_var`. - `n_snp` (``int64``) -- Number of SNP alternate alleles. - `n_insertion` (``int64``) -- Number of insertion alternate alleles. - `n_deletion` (``int64``) -- Number of deletion alternate alleles. - `n_singleton` (``int64``) -- Number of private alleles. Reference alleles are never counted as singletons, even if every other allele at a site is non-reference. - `n_transition` (``int64``) -- Number of transition (A-G, C-T) alternate alleles. - `n_transversion` (``int64``) -- Number of transversion alternate alleles. - `n_star` (``int64``) -- Number of star (upstream deletion) alleles. - `r_ti_tv` (``float64``) -- Transition/Transversion ratio. - `r_het_hom_var` (``float64``) -- Het/HomVar call ratio. - `r_insertion_deletion` (``float64``) -- Insertion/Deletion allele ratio. Missing values ``NA`` may result from division by zero. Parameters ---------- mt : :class:`.MatrixTable` Dataset. name : :class:`str` Name for resulting field. Returns ------- :class:`.MatrixTable` Dataset with a new column-indexed field `name`. """ require_row_key_variant(mt, 'sample_qc') variant_ac = Env.get_uid() variant_atypes = Env.get_uid() mt = mt.annotate_rows(**{ variant_ac: hl.agg.call_stats(mt.GT, mt.alleles).AC, variant_atypes: mt.alleles[1:].map(lambda alt: _qc_allele_type(mt.alleles[0], alt)), }) bound_exprs = {} gq_dp_exprs = {} def has_field_of_type(name, dtype): return name in mt.entry and mt[name].dtype == dtype if has_field_of_type('DP', hl.tint32): gq_dp_exprs['dp_stats'] = hl.agg.stats(mt.DP).select('mean', 'stdev', 'min', 'max') if has_field_of_type('GQ', hl.tint32): gq_dp_exprs['gq_stats'] = hl.agg.stats(mt.GQ).select('mean', 'stdev', 'min', 'max') if not has_field_of_type('GT', hl.tcall): raise ValueError("'sample_qc': expect an entry field 'GT' of type 'call'") bound_exprs['n_called'] = hl.agg.count_where(hl.is_defined(mt['GT'])) bound_exprs['n_not_called'] = hl.agg.count_where(hl.is_missing(mt['GT'])) n_rows_ref = hl.expr.construct_expr( hl.ir.Ref('n_rows', hl.tint64), hl.tint64, mt._col_indices, hl.utils.LinkedList(hl.expr.expressions.Aggregation) ) bound_exprs['n_filtered'] = n_rows_ref - hl.agg.count() bound_exprs['n_hom_ref'] = hl.agg.count_where(mt['GT'].is_hom_ref()) bound_exprs['n_het'] = hl.agg.count_where(mt['GT'].is_het()) bound_exprs['n_singleton'] = hl.agg.sum( hl.rbind( mt['GT'], lambda gt: hl.sum( hl.range(0, gt.ploidy).map( lambda i: hl.rbind(gt[i], lambda gti: (gti != 0) & (mt[variant_ac][gti] == 1)) ) ), ) ) bound_exprs['allele_type_counts'] = hl.agg.explode( lambda allele_type: hl.tuple(hl.agg.count_where(allele_type == i) for i in range(len(AlleleType))), ( hl.range(0, mt['GT'].ploidy) .map(lambda i: mt['GT'][i]) .filter(lambda allele_idx: allele_idx > 0) .map(lambda allele_idx: mt[variant_atypes][allele_idx - 1]) ), ) result_struct = hl.rbind( hl.struct(**bound_exprs), lambda x: hl.rbind( hl.struct(**{ **gq_dp_exprs, 'call_rate': hl.float64(x.n_called) / (x.n_called + x.n_not_called + x.n_filtered), 'n_called': x.n_called, 'n_not_called': x.n_not_called, 'n_filtered': x.n_filtered, 'n_hom_ref': x.n_hom_ref, 'n_het': x.n_het, 'n_hom_var': x.n_called - x.n_hom_ref - x.n_het, 'n_non_ref': x.n_called - x.n_hom_ref, 'n_singleton': x.n_singleton, 'n_snp': x.allele_type_counts[AlleleType.TRANSITION] + x.allele_type_counts[AlleleType.TRANSVERSION], 'n_insertion': x.allele_type_counts[AlleleType.INSERTION], 'n_deletion': x.allele_type_counts[AlleleType.DELETION], 'n_transition': x.allele_type_counts[AlleleType.TRANSITION], 'n_transversion': x.allele_type_counts[AlleleType.TRANSVERSION], 'n_star': x.allele_type_counts[AlleleType.STAR], }), lambda s: s.annotate( r_ti_tv=divide_null(hl.float64(s.n_transition), s.n_transversion), r_het_hom_var=divide_null(hl.float64(s.n_het), s.n_hom_var), r_insertion_deletion=divide_null(hl.float64(s.n_insertion), s.n_deletion), ), ), ) mt = mt.annotate_cols(**{name: result_struct}) mt = mt.drop(variant_ac, variant_atypes) return mt
[docs]@typecheck(mt=MatrixTable, name=str) def variant_qc(mt, name='variant_qc') -> MatrixTable: """Compute common variant statistics (quality control metrics). .. include:: ../_templates/req_tvariant.rst Examples -------- >>> dataset_result = hl.variant_qc(dataset) Notes ----- This method computes variant statistics from the genotype data, returning a new struct field `name` with the following metrics based on the fields present in the entry schema. If `mt` contains an entry field `DP` of type :py:data:`.tint32`, then the field `dp_stats` is computed. If `mt` contains an entry field `GQ` of type :py:data:`.tint32`, then the field `gq_stats` is computed. Both `dp_stats` and `gq_stats` are structs with with four fields: - `mean` (``float64``) -- Mean value. - `stdev` (``float64``) -- Standard deviation (zero degrees of freedom). - `min` (``int32``) -- Minimum value. - `max` (``int32``) -- Maximum value. If the dataset does not contain an entry field `GT` of type :py:data:`.tcall`, then an error is raised. The following fields are always computed from `GT`: - `AF` (``array<float64>``) -- Calculated allele frequency, one element per allele, including the reference. Sums to one. Equivalent to `AC` / `AN`. - `AC` (``array<int32>``) -- Calculated allele count, one element per allele, including the reference. Sums to `AN`. - `AN` (``int32``) -- Total number of called alleles. - `homozygote_count` (``array<int32>``) -- Number of homozygotes per allele. One element per allele, including the reference. - `call_rate` (``float64``) -- Fraction of calls neither missing nor filtered. Equivalent to `n_called` / :meth:`.count_cols`. - `n_called` (``int64``) -- Number of samples with a defined `GT`. - `n_not_called` (``int64``) -- Number of samples with a missing `GT`. - `n_filtered` (``int64``) -- Number of filtered entries. - `n_het` (``int64``) -- Number of heterozygous samples. - `n_non_ref` (``int64``) -- Number of samples with at least one called non-reference allele. - `het_freq_hwe` (``float64``) -- Expected frequency of heterozygous samples under Hardy-Weinberg equilibrium. See :func:`.functions.hardy_weinberg_test` for details. - `p_value_hwe` (``float64``) -- p-value from two-sided test of Hardy-Weinberg equilibrium. See :func:`.functions.hardy_weinberg_test` for details. - `p_value_excess_het` (``float64``) -- p-value from one-sided test of Hardy-Weinberg equilibrium for excess heterozygosity. See :func:`.functions.hardy_weinberg_test` for details. Warning ------- `het_freq_hwe` and `p_value_hwe` are calculated as in :func:`.functions.hardy_weinberg_test`, with non-diploid calls (``ploidy != 2``) ignored in the counts. As this test is only statistically rigorous in the biallelic setting, :func:`.variant_qc` sets both fields to missing for multiallelic variants. Consider using :func:`~hail.methods.split_multi` to split multi-allelic variants beforehand. Parameters ---------- mt : :class:`.MatrixTable` Dataset. name : :class:`str` Name for resulting field. Returns ------- :class:`.MatrixTable` """ require_alleles_field(mt, 'variant_qc') bound_exprs = {} gq_dp_exprs = {} def has_field_of_type(name, dtype): return name in mt.entry and mt[name].dtype == dtype if has_field_of_type('DP', hl.tint32): gq_dp_exprs['dp_stats'] = hl.agg.stats(mt.DP).select('mean', 'stdev', 'min', 'max') if has_field_of_type('GQ', hl.tint32): gq_dp_exprs['gq_stats'] = hl.agg.stats(mt.GQ).select('mean', 'stdev', 'min', 'max') if not has_field_of_type('GT', hl.tcall): raise ValueError("'variant_qc': expect an entry field 'GT' of type 'call'") bound_exprs['n_called'] = hl.agg.count_where(hl.is_defined(mt['GT'])) bound_exprs['n_not_called'] = hl.agg.count_where(hl.is_missing(mt['GT'])) n_cols_ref = hl.expr.construct_expr( hl.ir.Ref('n_cols', hl.tint32), hl.tint32, mt._row_indices, hl.utils.LinkedList(hl.expr.expressions.Aggregation) ) bound_exprs['n_filtered'] = hl.int64(n_cols_ref) - hl.agg.count() bound_exprs['call_stats'] = hl.agg.call_stats(mt.GT, mt.alleles) result = hl.rbind( hl.struct(**bound_exprs), lambda e1: hl.rbind( hl.case() .when( hl.len(mt.alleles) == 2, ( hl.hardy_weinberg_test( e1.call_stats.homozygote_count[0], e1.call_stats.AC[1] - 2 * e1.call_stats.homozygote_count[1], e1.call_stats.homozygote_count[1], ), hl.hardy_weinberg_test( e1.call_stats.homozygote_count[0], e1.call_stats.AC[1] - 2 * e1.call_stats.homozygote_count[1], e1.call_stats.homozygote_count[1], one_sided=True, ), ), ) .or_missing(), lambda hwe: hl.struct(**{ **gq_dp_exprs, **e1.call_stats, 'call_rate': hl.float(e1.n_called) / (e1.n_called + e1.n_not_called + e1.n_filtered), 'n_called': e1.n_called, 'n_not_called': e1.n_not_called, 'n_filtered': e1.n_filtered, 'n_het': e1.n_called - hl.sum(e1.call_stats.homozygote_count), 'n_non_ref': e1.n_called - e1.call_stats.homozygote_count[0], 'het_freq_hwe': hwe[0].het_freq_hwe, 'p_value_hwe': hwe[0].p_value, 'p_value_excess_het': hwe[1].p_value, }), ), ) return mt.annotate_rows(**{name: result})
[docs]@typecheck(left=MatrixTable, right=MatrixTable, _localize_global_statistics=bool) def concordance(left, right, *, _localize_global_statistics=True) -> Tuple[List[List[int]], Table, Table]: """Calculate call concordance with another dataset. .. include:: ../_templates/req_tstring.rst .. include:: ../_templates/req_tvariant.rst .. include:: ../_templates/req_biallelic.rst .. include:: ../_templates/req_unphased_diploid_gt.rst Examples -------- Compute concordance between two datasets and output the global concordance statistics and two tables with concordance computed per column key and per row key: >>> global_conc, cols_conc, rows_conc = hl.concordance(dataset, dataset2) Notes ----- This method computes the genotype call concordance (from the entry field **GT**) between two biallelic variant datasets. It requires unique sample IDs and performs an inner join on samples (only samples in both datasets will be considered). In addition, all genotype calls must be **diploid** and **unphased**. It performs an ordered zip join of the variants. That means the variants of each dataset are sorted, with duplicate variants appearing in some random relative order, and then zipped together. When a variant appears a different number of times between the two datasets, the dataset with the fewer number of instances is padded with "no data". For example, if a variant is only in one dataset, then each genotype is treated as "no data" in the other. This method returns a tuple of three objects: a nested list of list of int with global concordance summary statistics, a table with concordance statistics per column key, and a table with concordance statistics per row key. **Using the global summary result** The global summary is a list of list of int (conceptually a 5 by 5 matrix), where the indices have special meaning: 0. No Data (missing variant or filtered entry) 1. No Call (missing genotype call) 2. Hom Ref 3. Heterozygous 4. Hom Var The first index is the state in the left dataset and the second index is the state in the right dataset. Typical uses of the summary list are shown below. >>> summary, samples, variants = hl.concordance(dataset, dataset2) >>> left_homref_right_homvar = summary[2][4] >>> left_het_right_missing = summary[3][1] >>> left_het_right_something_else = sum(summary[3][:]) - summary[3][3] >>> total_concordant = summary[2][2] + summary[3][3] + summary[4][4] >>> total_discordant = sum([sum(s[2:]) for s in summary[2:]]) - total_concordant **Using the table results** Table 1: Concordance statistics by column This table contains the column key field of `left`, and the following fields: - `n_discordant` (:py:data:`.tint64`) -- Count of discordant calls (see below for full definition). - `concordance` (:class:`.tarray` of :class:`.tarray` of :py:data:`.tint64`) -- Array of concordance per state on left and right, matching the structure of the global summary defined above. Table 2: Concordance statistics by row This table contains the row key fields of `left`, and the following fields: - `n_discordant` (:py:data:`.tfloat64`) -- Count of discordant calls (see below for full definition). - `concordance` (:class:`.tarray` of :class:`.tarray` of :py:data:`.tint64`) -- Array of concordance per state on left and right, matching the structure of the global summary defined above. In these tables, the column **n_discordant** is provided as a convenience, because this is often one of the most useful concordance statistics. This value is the number of genotypes which were called (homozygous reference, heterozygous, or homozygous variant) in both datasets, but where the call did not match between the two. The column `concordance` matches the structure of the global summmary, which is detailed above. Once again, the first index into this array is the state on the left, and the second index is the state on the right. For example, ``concordance[1][4]`` is the number of "no call" genotypes on the left that were called homozygous variant on the right. Parameters ---------- left : :class:`.MatrixTable` First dataset to compare. right : :class:`.MatrixTable` Second dataset to compare. Returns ------- (list of list of int, :class:`.Table`, :class:`.Table`) The global concordance statistics, a table with concordance statistics per column key, and a table with concordance statistics per row key. """ require_col_key_str(left, 'concordance, left') require_col_key_str(right, 'concordance, right') left_sample_counter = left.aggregate_cols(hl.agg.counter(left.col_key[0])) right_sample_counter = right.aggregate_cols(hl.agg.counter(right.col_key[0])) left_bad = [f'{k!r}: {v}' for k, v in left_sample_counter.items() if v > 1] right_bad = [f'{k!r}: {v}' for k, v in right_sample_counter.items() if v > 1] if left_bad or right_bad: raise ValueError( f"Found duplicate sample IDs:\n" f" left: {', '.join(left_bad)}\n" f" right: {', '.join(right_bad)}" ) included = set(left_sample_counter.keys()).intersection(set(right_sample_counter.keys())) info( f"concordance: including {len(included)} shared samples " f"({len(left_sample_counter)} total on left, {len(right_sample_counter)} total on right)" ) left = require_biallelic(left, 'concordance, left') right = require_biallelic(right, 'concordance, right') lit = hl.literal(included, dtype=hl.tset(hl.tstr)) left = left.filter_cols(lit.contains(left.col_key[0])) right = right.filter_cols(lit.contains(right.col_key[0])) left = left.select_entries('GT').select_rows().select_cols() right = right.select_entries('GT').select_rows().select_cols() joined = hl.experimental.full_outer_join_mt(left, right) def get_idx(struct): return hl.if_else(hl.is_missing(struct), 0, hl.coalesce(2 + struct.GT.n_alt_alleles(), 1)) aggr = hl.agg.counter(get_idx(joined.left_entry) + 5 * get_idx(joined.right_entry)) def concordance_array(counter): return hl.range(0, 5).map(lambda i: hl.range(0, 5).map(lambda j: counter.get(i + 5 * j, 0))) discordant_indices = set() for i in range(5): for j in range(5): if i > 1 and j > 1 and i != j: discordant_indices.add(i + 5 * j) def n_discordant(counter): return hl.sum( hl.array(counter) .filter(lambda tup: hl.literal(discordant_indices).contains(tup[0])) .map(lambda tup: tup[1]) ) glob = joined.aggregate_entries(concordance_array(aggr), _localize=_localize_global_statistics) if _localize_global_statistics: total_conc = [x[1:] for x in glob[1:]] on_diag = sum(total_conc[i][i] for i in range(len(total_conc))) total_obs = sum(sum(x) for x in total_conc) pct = on_diag / total_obs * 100 if total_obs > 0 else float('nan') info(f"concordance: total concordance {pct:.2f}%") per_variant = joined.annotate_rows(concordance=aggr) per_variant = per_variant.select_rows( concordance=concordance_array(per_variant.concordance), n_discordant=n_discordant(per_variant.concordance) ) per_sample = joined.annotate_cols(concordance=aggr) per_sample = per_sample.select_cols( concordance=concordance_array(per_sample.concordance), n_discordant=n_discordant(per_sample.concordance) ) return glob, per_sample.cols(), per_variant.rows()
vep_json_typ = tstruct( assembly_name=tstr, allele_string=tstr, ancestral=tstr, colocated_variants=tarray( tstruct( aa_allele=tstr, aa_maf=tfloat, afr_allele=tstr, afr_maf=tfloat, allele_string=tstr, amr_allele=tstr, amr_maf=tfloat, clin_sig=tarray(tstr), end=tint32, eas_allele=tstr, eas_maf=tfloat, ea_allele=tstr, ea_maf=tfloat, eur_allele=tstr, eur_maf=tfloat, exac_adj_allele=tstr, exac_adj_maf=tfloat, exac_allele=tstr, exac_afr_allele=tstr, exac_afr_maf=tfloat, exac_amr_allele=tstr, exac_amr_maf=tfloat, exac_eas_allele=tstr, exac_eas_maf=tfloat, exac_fin_allele=tstr, exac_fin_maf=tfloat, exac_maf=tfloat, exac_nfe_allele=tstr, exac_nfe_maf=tfloat, exac_oth_allele=tstr, exac_oth_maf=tfloat, exac_sas_allele=tstr, exac_sas_maf=tfloat, id=tstr, minor_allele=tstr, minor_allele_freq=tfloat, phenotype_or_disease=tint32, pubmed=tarray(tint32), sas_allele=tstr, sas_maf=tfloat, somatic=tint32, start=tint32, strand=tint32, ) ), context=tstr, end=tint32, id=tstr, input=tstr, intergenic_consequences=tarray( tstruct( allele_num=tint32, consequence_terms=tarray(tstr), impact=tstr, minimised=tint32, variant_allele=tstr, ) ), most_severe_consequence=tstr, motif_feature_consequences=tarray( tstruct( allele_num=tint32, consequence_terms=tarray(tstr), high_inf_pos=tstr, impact=tstr, minimised=tint32, motif_feature_id=tstr, motif_name=tstr, motif_pos=tint32, motif_score_change=tfloat, strand=tint32, variant_allele=tstr, ) ), regulatory_feature_consequences=tarray( tstruct( allele_num=tint32, biotype=tstr, consequence_terms=tarray(tstr), impact=tstr, minimised=tint32, regulatory_feature_id=tstr, variant_allele=tstr, ) ), seq_region_name=tstr, start=tint32, strand=tint32, transcript_consequences=tarray( tstruct( allele_num=tint32, amino_acids=tstr, biotype=tstr, canonical=tint32, ccds=tstr, cdna_start=tint32, cdna_end=tint32, cds_end=tint32, cds_start=tint32, codons=tstr, consequence_terms=tarray(tstr), distance=tint32, domains=tarray(tstruct(db=tstr, name=tstr)), exon=tstr, gene_id=tstr, gene_pheno=tint32, gene_symbol=tstr, gene_symbol_source=tstr, hgnc_id=tstr, hgvsc=tstr, hgvsp=tstr, hgvs_offset=tint32, impact=tstr, intron=tstr, lof=tstr, lof_flags=tstr, lof_filter=tstr, lof_info=tstr, minimised=tint32, polyphen_prediction=tstr, polyphen_score=tfloat, protein_end=tint32, protein_start=tint32, protein_id=tstr, sift_prediction=tstr, sift_score=tfloat, strand=tint32, swissprot=tstr, transcript_id=tstr, trembl=tstr, uniparc=tstr, variant_allele=tstr, ) ), variant_class=tstr, )
[docs]class VEPConfig(abc.ABC): """Base class for configuring VEP. To define a custom VEP configuration to for Query on Batch, construct a new class that inherits from :class:`.VEPConfig` and has the following parameters defined: - `json_type` (:class:`.HailType`): The type of the VEP JSON schema (as produced by VEP when invoked with the `--json` option). - `data_bucket` (:obj:`.str`) -- The location where the VEP data is stored. - `data_mount` (:obj:`.str`) -- The location in the container where the data should be mounted. - `batch_run_command` (:obj:`.list` of :obj:`.str`) -- The command line to run for a VEP job for a partition. - `batch_run_csq_header_command` (:obj:`.list` of :obj:`.str`) -- The command line to run when generating the consequence header. - `env` (dict of :obj:`.str` to :obj:`.str`) -- A map of environment variables to values to add to the environment when invoking the command. - `cloud` (:obj:`.str`) -- The cloud where the Batch Service is located. - `image` (:obj:`.str`) -- The docker image to run VEP. - `data_bucket_is_requester_pays` (:obj:`.bool`) -- True if the data bucket is requester pays. - `regions` (:obj:`.list` of :obj:`.str`) -- A list of regions the VEP jobs can run in. In addition, the method `command` must be defined with the following signature. The output is the exact command to run the VEP executable. The inputs are `consequence` and `tolerate_parse_error` which are user-defined parameters to :func:`.vep`, `part_id` which is the partition ID, `input_file` which is the path to the input file where the input data can be found, and `output_file` is the path to the output file where the VEP annotations are written to. An example is shown below: .. code-block:: python3 def command(self, consequence: bool, tolerate_parse_error: bool, part_id: int, input_file: Optional[str], output_file: str) -> List[str]: vcf_or_json = '--vcf' if consequence else '--json' input_file = f'--input_file {input_file}' if input_file else '' return f'''/vep/vep {input_file} \ --format vcf \ {vcf_or_json} \ --everything \ --allele_number \ --no_stats \ --cache \ --offline \ --minimal \ --assembly GRCh37 \ --dir={self.data_mount} \ --plugin LoF,human_ancestor_fa:{self.data_mount}/loftee_data/human_ancestor.fa.gz,filter_position:0.05,min_intron_size:15,conservation_file:{self.data_mount}/loftee_data/phylocsf_gerp.sql,gerp_file:{self.data_mount}/loftee_data/GERP_scores.final.sorted.txt.gz \ -o STDOUT ''' The following environment variables are added to the job's environment: - `VEP_BLOCK_SIZE` - The maximum number of variants provided as input to each invocation of VEP. - `VEP_PART_ID` - Partition ID. - `VEP_DATA_MOUNT` - Location where the vep data is mounted (same as `data_mount` in the config). - `VEP_CONSEQUENCE` - Integer equal to 0 or 1 on whether `csq` is False or True. - `VEP_TOLERATE_PARSE_ERROR` - Integer equal to 0 or 1 on whether `tolerate_parse_error` is False or True. - `VEP_OUTPUT_FILE` - String specifying the local path where the output TSV file with the VEP result should be located. - `VEP_INPUT_FILE` - String specifying the local path where the input VCF shard is located for all jobs. The `VEP_INPUT_FILE` environment variable is not available for the single job that computes the consequence header when ``csq=True`` """ json_typ: hl.expr.HailType data_bucket: str data_mount: str regions: List[str] image: str env: Dict[str, str] data_bucket_is_requester_pays: bool cloud: str batch_run_command: List[str] batch_run_csq_header_command: List[str] @abc.abstractmethod def command( self, consequence: bool, tolerate_parse_error: bool, part_id: int, input_file: Optional[str], output_file: str ) -> List[str]: raise NotImplementedError
[docs]class VEPConfigGRCh37Version85(VEPConfig): """ The Hail-maintained VEP configuration for GRCh37 for VEP version 85. This class takes the following constructor arguments: - `data_bucket` (:obj:`.str`) -- The location where the VEP data is stored. - `data_mount` (:obj:`.str`) -- The location in the container where the data should be mounted. - `image` (:obj:`.str`) -- The docker image to run VEP. - `cloud` (:obj:`.str`) -- The cloud where the Batch Service is located. - `data_bucket_is_requester_pays` (:obj:`.bool`) -- True if the data bucket is requester pays. - `regions` (:obj:`.list` of :obj:`.str`) -- A list of regions the VEP jobs can run in. """ def __init__( self, *, data_bucket: str, data_mount: str, image: str, regions: List[str], cloud: str, data_bucket_is_requester_pays: bool, ): self.data_bucket = data_bucket self.data_mount = data_mount self.image = image self.regions = regions self.env = {} self.data_bucket_is_requester_pays = data_bucket_is_requester_pays self.cloud = cloud self.batch_run_command = ['python3', '/hail-vep/vep.py', 'vep'] self.batch_run_csq_header_command = ['python3', '/hail-vep/vep.py', 'csq_header'] self.json_typ = vep_json_typ def command( self, *, consequence: bool, tolerate_parse_error: bool, part_id: int, input_file: Optional[str], output_file: str, ) -> str: vcf_or_json = '--vcf' if consequence else '--json' input_file = f'--input_file {input_file}' if input_file else '' return f"""/vep/vep {input_file} \ --format vcf \ {vcf_or_json} \ --everything \ --allele_number \ --no_stats \ --cache \ --offline \ --minimal \ --assembly GRCh37 \ --dir={self.data_mount} \ --plugin LoF,human_ancestor_fa:{self.data_mount}/loftee_data/human_ancestor.fa.gz,filter_position:0.05,min_intron_size:15,conservation_file:{self.data_mount}/loftee_data/phylocsf_gerp.sql,gerp_file:{self.data_mount}/loftee_data/GERP_scores.final.sorted.txt.gz \ -o STDOUT """
[docs]class VEPConfigGRCh38Version95(VEPConfig): """ The Hail-maintained VEP configuration for GRCh38 for VEP version 95. This class takes the following constructor arguments: - `data_bucket` (:obj:`.str`) -- The location where the VEP data is stored. - `data_mount` (:obj:`.str`) -- The location in the container where the data should be mounted. - `image` (:obj:`.str`) -- The docker image to run VEP. - `cloud` (:obj:`.str`) -- The cloud where the Batch Service is located. - `data_bucket_is_requester_pays` (:obj:`.bool`) -- True if the data bucket is set to requester pays. - `regions` (:obj:`.list` of :obj:`.str`) -- A list of regions the VEP jobs can run in. """ def __init__( self, *, data_bucket: str, data_mount: str, image: str, regions: List[str], cloud: str, data_bucket_is_requester_pays: bool, ): self.data_bucket = data_bucket self.data_mount = data_mount self.image = image self.regions = regions self.env = {} self.data_bucket_is_requester_pays = data_bucket_is_requester_pays self.cloud = cloud self.batch_run_command = ['python3', '/hail-vep/vep.py', 'vep'] self.batch_run_csq_header_command = ['python3', '/hail-vep/vep.py', 'csq_header'] self.json_typ = vep_json_typ._insert_field( 'transcript_consequences', tarray( vep_json_typ['transcript_consequences'].element_type._insert_fields( appris=tstr, tsl=tint32, ) ), ) def command( self, *, consequence: bool, tolerate_parse_error: bool, part_id: int, input_file: Optional[str], output_file: str, ) -> str: vcf_or_json = '--vcf' if consequence else '--json' input_file = f'--input_file {input_file}' if input_file else '' return f"""/vep/vep {input_file} \ --format vcf \ {vcf_or_json} \ --everything \ --allele_number \ --no_stats \ --cache \ --offline \ --minimal \ --assembly GRCh38 \ --fasta {self.data_mount}homo_sapiens/95_GRCh38/Homo_sapiens.GRCh38.dna.toplevel.fa.gz \ --plugin "LoF,loftee_path:/vep/ensembl-vep/Plugins/,gerp_bigwig:{self.data_mount}/gerp_conservation_scores.homo_sapiens.GRCh38.bw,human_ancestor_fa:{self.data_mount}/human_ancestor.fa.gz,conservation_file:{self.data_mount}/loftee.sql" \ --dir_plugins /vep/ensembl-vep/Plugins/ \ --dir_cache {self.data_mount} \ -o STDOUT """
supported_vep_configs = { ('GRCh37', 'gcp', 'us-central1', 'hail.is'): VEPConfigGRCh37Version85( data_bucket='hail-qob-vep-grch37-us-central1', data_mount='/vep_data/', image=HAIL_GENETICS_VEP_GRCH37_85_IMAGE, regions=['us-central1'], cloud='gcp', data_bucket_is_requester_pays=True, ), ('GRCh38', 'gcp', 'us-central1', 'hail.is'): VEPConfigGRCh38Version95( data_bucket='hail-qob-vep-grch38-us-central1', data_mount='/vep_data/', image=HAIL_GENETICS_VEP_GRCH38_95_IMAGE, regions=['us-central1'], cloud='gcp', data_bucket_is_requester_pays=True, ), } def _supported_vep_config(cloud: str, reference_genome: str, *, regions: List[str]) -> VEPConfig: domain = get_deploy_config()._domain for region in regions: config_params = (reference_genome, cloud, region, domain) if config_params in supported_vep_configs: return supported_vep_configs[config_params] raise ValueError( f'could not find a supported vep configuration for reference genome {reference_genome}, ' f'cloud {cloud}, regions {regions}, and domain {domain}' ) def _service_vep( backend: ServiceBackend, ht: Table, config: Optional[VEPConfig], block_size: int, csq: bool, tolerate_parse_error: bool, temp_input_directory: str, temp_output_directory: str, ) -> Table: reference_genome = ht.locus.dtype.reference_genome.name cloud = async_to_blocking(backend._batch_client.cloud()) regions = backend.regions if config is not None: vep_config = config else: vep_config = _supported_vep_config(cloud, reference_genome, regions=regions) requester_pays_project = backend.flags.get('gcs_requester_pays_project') if requester_pays_project is None and vep_config.data_bucket_is_requester_pays and vep_config.cloud == 'gcp': raise ValueError( "No requester pays project has been set. " "Use hl.init(gcs_requester_pays_configuration='MY_PROJECT') " "to set the requester pays project to use." ) if csq: vep_typ = hl.tarray(hl.tstr) else: vep_typ = vep_config.json_typ def build_vep_batch(b: bc.aioclient.Batch, vep_input_path: str, vep_output_path: str): if csq: local_output_file = '/io/output' vep_command = vep_config.command( consequence=csq, part_id=-1, input_file=None, output_file=local_output_file, tolerate_parse_error=tolerate_parse_error, ) env = { 'VEP_BLOCK_SIZE': str(block_size), 'VEP_DATA_MOUNT': shq(vep_config.data_mount), 'VEP_CONSEQUENCE': str(int(csq)), 'VEP_TOLERATE_PARSE_ERROR': str(int(tolerate_parse_error)), 'VEP_PART_ID': str(-1), 'VEP_OUTPUT_FILE': local_output_file, 'VEP_COMMAND': vep_command, } env.update(vep_config.env) b.create_job( vep_config.image, vep_config.batch_run_csq_header_command, attributes={'name': 'csq-header'}, resources={'cpu': '1', 'memory': 'standard'}, cloudfuse=[(vep_config.data_bucket, vep_config.data_mount, True)], output_files=[(local_output_file, f'{vep_output_path}/csq-header')], regions=vep_config.regions, requester_pays_project=requester_pays_project, env=env, ) for f in hl.hadoop_ls(vep_input_path): path = f['path'] part_name = os.path.basename(path) if not part_name.startswith('part-'): continue part_id = int(part_name.split('-')[1]) local_input_file = '/io/input' local_output_file = '/io/output.gz' vep_command = vep_config.command( consequence=csq, part_id=part_id, input_file=local_input_file, output_file=local_output_file, tolerate_parse_error=tolerate_parse_error, ) env = { 'VEP_BLOCK_SIZE': str(block_size), 'VEP_DATA_MOUNT': shq(vep_config.data_mount), 'VEP_CONSEQUENCE': str(int(csq)), 'VEP_TOLERATE_PARSE_ERROR': str(int(tolerate_parse_error)), 'VEP_PART_ID': str(-1), 'VEP_INPUT_FILE': local_input_file, 'VEP_OUTPUT_FILE': local_output_file, 'VEP_COMMAND': vep_command, } env.update(vep_config.env) b.create_job( vep_config.image, vep_config.batch_run_command, attributes={'name': f'vep-{part_id}'}, resources={'cpu': '1', 'memory': 'standard'}, input_files=[(path, local_input_file)], output_files=[(local_output_file, f'{vep_output_path}/annotations/{part_name}.tsv.gz')], cloudfuse=[(vep_config.data_bucket, vep_config.data_mount, True)], regions=vep_config.regions, requester_pays_project=requester_pays_project, env=env, ) hl.export_vcf(ht, temp_input_directory, parallel='header_per_shard') starting_job_id = async_to_blocking(backend._batch.status())['n_jobs'] + 1 b = bc.client.Batch(backend._batch) build_vep_batch(b, temp_input_directory, temp_output_directory) b.submit(disable_progress_bar=True) try: status = b.wait( description='vep(...)', disable_progress_bar=backend.disable_progress_bar, progress=None, starting_job=starting_job_id, ) except BaseException as e: if isinstance(e, KeyboardInterrupt): print("Received a keyboard interrupt, cancelling the batch...") b.cancel() backend._batch = None raise if status['n_succeeded'] != status['n_jobs']: failing_job = next(iter(b.jobs('!success'))) failing_job = b.get_job(failing_job['job_id']) message = {'batch_status': status, 'job_status': failing_job.status(), 'log': failing_job.log()} raise FatalError(yamlx.dump(message)) annotations = hl.import_table( f'{temp_output_directory}/annotations/*', types={'variant': hl.tstr, 'vep': vep_typ, 'part_id': hl.tint, 'block_id': hl.tint}, force=True, ) annotations = annotations.annotate( vep_proc_id=hl.struct(part_id=annotations.part_id, block_id=annotations.block_id) ) annotations = annotations.drop('part_id', 'block_id') annotations = annotations.key_by(**hl.parse_variant(annotations.variant, reference_genome=reference_genome)) annotations = annotations.drop('variant') if csq: with hl.hadoop_open(f'{temp_output_directory}/csq-header') as f: vep_csq_header = f.read().rstrip() annotations = annotations.annotate_globals(vep_csq_header=vep_csq_header) return annotations
[docs]@typecheck( dataset=oneof(Table, MatrixTable), config=nullable(oneof(str, VEPConfig)), block_size=int, name=str, csq=bool, tolerate_parse_error=bool, ) def vep( dataset: Union[Table, MatrixTable], config: Optional[Union[str, VEPConfig]] = None, block_size: int = 1000, name: str = 'vep', csq: bool = False, tolerate_parse_error: bool = False, ): """Annotate variants with VEP. .. include:: ../_templates/req_tvariant.rst :func:`.vep` runs `Variant Effect Predictor <http://www.ensembl.org/info/docs/tools/vep/index.html>`__ on the current dataset and adds the result as a row field. Examples -------- Add VEP annotations to the dataset: >>> result = hl.vep(dataset, "data/vep-configuration.json") # doctest: +SKIP Notes ----- **Installation** This VEP command only works if you have already installed VEP on your computing environment. If you use `hailctl dataproc` to start Hail clusters, installing VEP is achieved by specifying the `--vep` flag. For more detailed instructions, see :ref:`vep_dataproc`. If you use `hailctl hdinsight`, see :ref:`vep_hdinsight`. **Spark Configuration** :func:`.vep` needs a configuration file to tell it how to run VEP. This is the ``config`` argument to the VEP function. If you are using `hailctl dataproc` as mentioned above, you can just use the default argument for ``config`` and everything will work. If you need to run VEP with Hail in other environments, there are detailed instructions below. The format of the configuration file is JSON, and :func:`.vep` expects a JSON object with three fields: - `command` (array of string) -- The VEP command line to run. The string literal `__OUTPUT_FORMAT_FLAG__` is replaced with `--json` or `--vcf` depending on `csq`. - `env` (object) -- A map of environment variables to values to add to the environment when invoking the command. The value of each object member must be a string. - `vep_json_schema` (string): The type of the VEP JSON schema (as produced by the VEP when invoked with the `--json` option). Note: This is the old-style 'parseable' Hail type syntax. This will change. Here is an example configuration file for invoking VEP release 85 installed in `/vep` with the Loftee plugin: .. code-block:: text { "command": [ "/vep", "--format", "vcf", "__OUTPUT_FORMAT_FLAG__", "--everything", "--allele_number", "--no_stats", "--cache", "--offline", "--minimal", "--assembly", "GRCh37", "--plugin", "LoF,human_ancestor_fa:/root/.vep/loftee_data/human_ancestor.fa.gz,filter_position:0.05,min_intron_size:15,conservation_file:/root/.vep/loftee_data/phylocsf_gerp.sql,gerp_file:/root/.vep/loftee_data/GERP_scores.final.sorted.txt.gz", "-o", "STDOUT" ], "env": { "PERL5LIB": "/vep_data/loftee" }, "vep_json_schema": "Struct{assembly_name:String,allele_string:String,ancestral:String,colocated_variants:Array[Struct{aa_allele:String,aa_maf:Float64,afr_allele:String,afr_maf:Float64,allele_string:String,amr_allele:String,amr_maf:Float64,clin_sig:Array[String],end:Int32,eas_allele:String,eas_maf:Float64,ea_allele:String,ea_maf:Float64,eur_allele:String,eur_maf:Float64,exac_adj_allele:String,exac_adj_maf:Float64,exac_allele:String,exac_afr_allele:String,exac_afr_maf:Float64,exac_amr_allele:String,exac_amr_maf:Float64,exac_eas_allele:String,exac_eas_maf:Float64,exac_fin_allele:String,exac_fin_maf:Float64,exac_maf:Float64,exac_nfe_allele:String,exac_nfe_maf:Float64,exac_oth_allele:String,exac_oth_maf:Float64,exac_sas_allele:String,exac_sas_maf:Float64,id:String,minor_allele:String,minor_allele_freq:Float64,phenotype_or_disease:Int32,pubmed:Array[Int32],sas_allele:String,sas_maf:Float64,somatic:Int32,start:Int32,strand:Int32}],context:String,end:Int32,id:String,input:String,intergenic_consequences:Array[Struct{allele_num:Int32,consequence_terms:Array[String],impact:String,minimised:Int32,variant_allele:String}],most_severe_consequence:String,motif_feature_consequences:Array[Struct{allele_num:Int32,consequence_terms:Array[String],high_inf_pos:String,impact:String,minimised:Int32,motif_feature_id:String,motif_name:String,motif_pos:Int32,motif_score_change:Float64,strand:Int32,variant_allele:String}],regulatory_feature_consequences:Array[Struct{allele_num:Int32,biotype:String,consequence_terms:Array[String],impact:String,minimised:Int32,regulatory_feature_id:String,variant_allele:String}],seq_region_name:String,start:Int32,strand:Int32,transcript_consequences:Array[Struct{allele_num:Int32,amino_acids:String,biotype:String,canonical:Int32,ccds:String,cdna_start:Int32,cdna_end:Int32,cds_end:Int32,cds_start:Int32,codons:String,consequence_terms:Array[String],distance:Int32,domains:Array[Struct{db:String,name:String}],exon:String,gene_id:String,gene_pheno:Int32,gene_symbol:String,gene_symbol_source:String,hgnc_id:String,hgvsc:String,hgvsp:String,hgvs_offset:Int32,impact:String,intron:String,lof:String,lof_flags:String,lof_filter:String,lof_info:String,minimised:Int32,polyphen_prediction:String,polyphen_score:Float64,protein_end:Int32,protein_start:Int32,protein_id:String,sift_prediction:String,sift_score:Float64,strand:Int32,swissprot:String,transcript_id:String,trembl:String,uniparc:String,variant_allele:String}],variant_class:String}" } The configuration files used by``hailctl dataproc`` can be found at the following locations: - ``GRCh37``: ``gs://hail-us-central1-vep/vep85-loftee-gcloud.json`` - ``GRCh38``: ``gs://hail-us-central1-vep/vep95-GRCh38-loftee-gcloud.json`` If no config file is specified, this function will check to see if environment variable `VEP_CONFIG_URI` is set with a path to a config file. **Batch Service Configuration** If no config is specified, Hail will use the user's Service configuration parameters to find a supported VEP configuration. However, if you wish to use your own implementation of VEP, then see the documentation for :class:`.VEPConfig`. **Annotations** A new row field is added in the location specified by `name` with type given by the type given by the `json_vep_schema` (if `csq` is ``False``) or :class:`.tarray` of :py:data:`.tstr` (if `csq` is ``True``). If csq is ``True``, then the CSQ header string is also added as a global field with name ``name + '_csq_header'``. Parameters ---------- dataset : :class:`.MatrixTable` or :class:`.Table` Dataset. config : :class:`str` or :class:`.VEPConfig`, optional Path to VEP configuration file or a VEPConfig object. block_size : :obj:`int` Number of rows to process per VEP invocation. name : :class:`str` Name for resulting row field. csq : :obj:`bool` If ``True``, annotates with the VCF CSQ field as a :py:data:`.tstr`. If ``False``, annotates as the `vep_json_schema`. tolerate_parse_error : :obj:`bool` If ``True``, ignore invalid JSON produced by VEP and return a missing annotation. Returns ------- :class:`.MatrixTable` or :class:`.Table` Dataset with new row-indexed field `name` containing VEP annotations. """ if isinstance(dataset, MatrixTable): require_row_key_variant(dataset, 'vep') ht = dataset.select_rows().rows() else: require_table_key_variant(dataset, 'vep') ht = dataset.select() ht = ht.distinct() backend = hl.current_backend() if isinstance(backend, ServiceBackend): with hl.TemporaryDirectory(prefix='qob/vep/inputs/') as vep_input_path: with hl.TemporaryDirectory(prefix='qob/vep/outputs/') as vep_output_path: annotations = _service_vep( backend, ht, config, block_size, csq, tolerate_parse_error, vep_input_path, vep_output_path ) annotations = annotations.checkpoint(new_temp_file()) else: if config is None: maybe_cloud_spark_provider = guess_cloud_spark_provider() maybe_config = os.getenv("VEP_CONFIG_URI") if maybe_config is not None: config = maybe_config elif maybe_cloud_spark_provider == 'hdinsight': warning( 'Assuming you are in a hailctl hdinsight cluster. If not, specify the config parameter to `hl.vep`.' ) config = 'file:/vep_data/vep-azure.json' else: raise ValueError("No config set and VEP_CONFIG_URI was not set.") annotations = Table( TableToTableApply( ht._tir, { 'name': 'VEP', 'config': config, 'csq': csq, 'blockSize': block_size, 'tolerateParseError': tolerate_parse_error, }, ) ).persist() if csq: dataset = dataset.annotate_globals(**{name + '_csq_header': annotations.index_globals()['vep_csq_header']}) if isinstance(dataset, MatrixTable): vep = annotations[dataset.row_key] return dataset.annotate_rows(**{name: vep.vep, name + '_proc_id': vep.vep_proc_id}) else: vep = annotations[dataset.key] return dataset.annotate(**{name: vep.vep, name + '_proc_id': vep.vep_proc_id})
[docs]@typecheck(dataset=oneof(Table, MatrixTable), config=str, block_size=int, name=str) def nirvana(dataset: Union[MatrixTable, Table], config, block_size=500000, name='nirvana'): """Annotate variants using `Nirvana <https://github.com/Illumina/Nirvana>`_. .. include:: ../_templates/experimental.rst .. include:: ../_templates/req_tvariant.rst :func:`.nirvana` runs `Nirvana <https://github.com/Illumina/Nirvana>`_ on the current dataset and adds a new row field in the location specified by `name`. Examples -------- Add Nirvana annotations to the dataset: >>> result = hl.nirvana(dataset, "data/nirvana.properties") # doctest: +SKIP **Configuration** :func:`.nirvana` requires a configuration file. The format is a `.properties file <https://en.wikipedia.org/wiki/.properties>`__, where each line defines a property as a key-value pair of the form ``key = value``. :func:`.nirvana` supports the following properties: - **hail.nirvana.dotnet** -- Location of dotnet. Optional, default: dotnet. - **hail.nirvana.path** -- Value of the PATH environment variable when invoking Nirvana. Optional, by default PATH is not set. - **hail.nirvana.location** -- Location of Nirvana.dll. Required. - **hail.nirvana.reference** -- Location of reference genome. Required. - **hail.nirvana.cache** -- Location of cache. Required. - **hail.nirvana.supplementaryAnnotationDirectory** -- Location of Supplementary Database. Optional, no supplementary database by default. Here is an example ``nirvana.properties`` configuration file: .. code-block:: text hail.nirvana.location = /path/to/dotnet/netcoreapp2.0/Nirvana.dll hail.nirvana.reference = /path/to/nirvana/References/Homo_sapiens.GRCh37.Nirvana.dat hail.nirvana.cache = /path/to/nirvana/Cache/GRCh37/Ensembl hail.nirvana.supplementaryAnnotationDirectory = /path/to/nirvana/SupplementaryDatabase/GRCh37 **Annotations** A new row field is added in the location specified by `name` with the following schema: .. code-block:: text struct { chromosome: str, refAllele: str, position: int32, altAlleles: array<str>, cytogeneticBand: str, quality: float64, filters: array<str>, jointSomaticNormalQuality: int32, copyNumber: int32, strandBias: float64, recalibratedQuality: float64, variants: array<struct { altAllele: str, refAllele: str, chromosome: str, begin: int32, end: int32, phylopScore: float64, isReferenceMinor: bool, variantType: str, vid: str, hgvsg: str, isRecomposedVariant: bool, isDecomposedVariant: bool, regulatoryRegions: array<struct { id: str, type: str, consequence: set<str> }>, clinvar: array<struct { id: str, reviewStatus: str, isAlleleSpecific: bool, alleleOrigins: array<str>, refAllele: str, altAllele: str, phenotypes: array<str>, medGenIds: array<str>, omimIds: array<str>, orphanetIds: array<str>, significance: str, lastUpdatedDate: str, pubMedIds: array<str> }>, cosmic: array<struct { id: str, isAlleleSpecific: bool, refAllele: str, altAllele: str, gene: str, sampleCount: int32, studies: array<struct { id: int32, histology: str, primarySite: str }> }>, dbsnp: struct { ids: array<str> }, globalAllele: struct { globalMinorAllele: str, globalMinorAlleleFrequency: float64 }, gnomad: struct { coverage: str, allAf: float64, allAc: int32, allAn: int32, allHc: int32, afrAf: float64, afrAc: int32, afrAn: int32, afrHc: int32, amrAf: float64, amrAc: int32, amrAn: int32, amrHc: int32, easAf: float64, easAc: int32, easAn: int32, easHc: int32, finAf: float64, finAc: int32, finAn: int32, finHc: int32, nfeAf: float64, nfeAc: int32, nfeAn: int32, nfeHc: int32, othAf: float64, othAc: int32, othAn: int32, othHc: int32, asjAf: float64, asjAc: int32, asjAn: int32, asjHc: int32, failedFilter: bool }, gnomadExome: struct { coverage: str, allAf: float64, allAc: int32, allAn: int32, allHc: int32, afrAf: float64, afrAc: int32, afrAn: int32, afrHc: int32, amrAf: float64, amrAc: int32, amrAn: int32, amrHc: int32, easAf: float64, easAc: int32, easAn: int32, easHc: int32, finAf: float64, finAc: int32, finAn: int32, finHc: int32, nfeAf: float64, nfeAc: int32, nfeAn: int32, nfeHc: int32, othAf: float64, othAc: int32, othAn: int32, othHc: int32, asjAf: float64, asjAc: int32, asjAn: int32, asjHc: int32, sasAf: float64, sasAc: int32, sasAn: int32, sasHc: int32, failedFilter: bool }, topmed: struct { failedFilter: bool, allAc: int32, allAn: int32, allAf: float64, allHc: int32 }, oneKg: struct { ancestralAllele: str, allAf: float64, allAc: int32, allAn: int32, afrAf: float64, afrAc: int32, afrAn: int32, amrAf: float64, amrAc: int32, amrAn: int32, easAf: float64, easAc: int32, easAn: int32, eurAf: float64, eurAc: int32, eurAn: int32, sasAf: float64, sasAc: int32, sasAn: int32 }, mitomap: array<struct { refAllele: str, altAllele: str, diseases : array<str>, hasHomoplasmy: bool, hasHeteroplasmy: bool, status: str, clinicalSignificance: str, scorePercentile: float64, isAlleleSpecific: bool, chromosome: str, begin: int32, end: int32, variantType: str } transcripts: struct { refSeq: array<struct { transcript: str, bioType: str, aminoAcids: str, cdnaPos: str, codons: str, cdsPos: str, exons: str, introns: str, geneId: str, hgnc: str, consequence: array<str>, hgvsc: str, hgvsp: str, isCanonical: bool, polyPhenScore: float64, polyPhenPrediction: str, proteinId: str, proteinPos: str, siftScore: float64, siftPrediction: str }>, ensembl: array<struct { transcript: str, bioType: str, aminoAcids: str, cdnaPos: str, codons: str, cdsPos: str, exons: str, introns: str, geneId: str, hgnc: str, consequence: array<str>, hgvsc: str, hgvsp: str, isCanonical: bool, polyPhenScore: float64, polyPhenPrediction: str, proteinId: str, proteinPos: str, siftScore: float64, siftPrediction: str }> }, overlappingGenes: array<str> }> genes: array<struct { name: str, omim: array<struct { mimNumber: int32, hgnc: str, description: str, phenotypes: array<struct { mimNumber: int32, phenotype: str, mapping: str, inheritance: array<str>, comments: str }> }> exac: struct { pLi: float64, pRec: float64, pNull: float64 } }> } Parameters ---------- dataset : :class:`.MatrixTable` or :class:`.Table` Dataset. config : :class:`str` Path to Nirvana configuration file. block_size : :obj:`int` Number of rows to process per Nirvana invocation. name : :class:`str` Name for resulting row field. Returns ------- :class:`.MatrixTable` or :class:`.Table` Dataset with new row-indexed field `name` containing Nirvana annotations. """ if isinstance(dataset, MatrixTable): require_row_key_variant(dataset, 'nirvana') ht = dataset.select_rows().rows() else: require_table_key_variant(dataset, 'nirvana') ht = dataset.select() annotations = Table( TableToTableApply(ht._tir, {'name': 'Nirvana', 'config': config, 'blockSize': block_size}) ).persist() if isinstance(dataset, MatrixTable): return dataset.annotate_rows(**{name: annotations[dataset.row_key].nirvana}) else: return dataset.annotate(**{name: annotations[dataset.key].nirvana})
class _VariantSummary(object): def __init__(self, rg, n_variants, alleles_per_variant, variants_per_contig, allele_types, nti, ntv): self.rg = rg self.n_variants = n_variants self.alleles_per_variant = alleles_per_variant self.variants_per_contig = variants_per_contig self.allele_types = allele_types self.nti = nti self.ntv = ntv def __repr__(self): return self.__str__() def _repr_html_(self): return self._html_string() def __str__(self): contig_idx = {contig: i for i, contig in enumerate(self.rg.contigs)} max_contig_len = max(len(contig) for contig in self.variants_per_contig) contig_formatter = f'%{max_contig_len}s' max_allele_count_len = max(len(str(x)) for x in self.alleles_per_variant) allele_count_formatter = f'%{max_allele_count_len}s' max_allele_type_len = max(len(x) for x in self.allele_types) allele_type_formatter = f'%{max_allele_type_len}s' line_break = '==============================' builder = [] builder.append(line_break) builder.append(f'Number of variants: {self.n_variants}') builder.append(line_break) builder.append('Alleles per variant') builder.append('-------------------') for n_alleles, count in sorted(self.alleles_per_variant.items(), key=lambda x: x[0]): builder.append(f' {allele_count_formatter % n_alleles} alleles: {count} variants') builder.append(line_break) builder.append('Variants per contig') builder.append('-------------------') for contig, count in sorted(self.variants_per_contig.items(), key=lambda x: contig_idx[x[0]]): builder.append(f' {contig_formatter % contig}: {count} variants') builder.append(line_break) builder.append('Allele type distribution') builder.append('------------------------') for allele_type, count in Counter(self.allele_types).most_common(): summary = f' {allele_type_formatter % allele_type}: {count} alternate alleles' if allele_type == 'SNP': nti = self.nti ntv = self.ntv summary += f' (Ti: {nti}, Tv: {ntv}, ratio: {nti / ntv:.2f})' builder.append(summary) builder.append(line_break) return '\n'.join(builder) def _html_string(self): contig_idx = {contig: i for i, contig in enumerate(self.rg.contigs)} import html builder = [] builder.append('<p><b>Variant summary:</b></p>') builder.append('<ul>') builder.append(f'<li><p>Total variants: {self.n_variants}</p></li>') builder.append('<li><p>Alleles per variant:</p>') builder.append('<table><thead style="font-weight: bold;">') builder.append('<tr><th>Number of alleles</th><th>Count</th></tr></thead><tbody>') for n_alleles, count in sorted(self.alleles_per_variant.items(), key=lambda x: x[0]): builder.append('<tr>') builder.append(f'<td>{n_alleles}</td>') builder.append(f'<td>{count}</td>') builder.append('</tr>') builder.append('</tbody></table>') builder.append('</li>') builder.append('<li><p>Counts by allele type:</p>') builder.append('<table><thead style="font-weight: bold;">') builder.append('<tr><th>Allele type</th><th>Count</th></tr></thead><tbody>') for allele_type, count in Counter(self.allele_types).most_common(): builder.append('<tr>') builder.append(f'<td>{html.escape(allele_type)}</td>') builder.append(f'<td>{count}</td>') builder.append('</tr>') builder.append('</tbody></table>') builder.append('</li>') builder.append('<li><p>Transitions/Transversions:</p>') builder.append('<table><thead style="font-weight: bold;">') builder.append('<tr><th>Metric</th><th>Value</th></tr></thead><tbody>') builder.append(f'<tr><td>Transitions</td><td>{self.nti}</td></tr>') builder.append(f'<tr><td>Transversions</td><td>{self.ntv}</td></tr>') builder.append(f'<tr><td>Ratio</td><td>{self.nti / self.ntv:.2f}</td></tr>') builder.append('</tbody></table>') builder.append('</li>') builder.append('<li><p>Variants per contig:</p>') builder.append('<table><thead style="font-weight: bold;">') builder.append('<tr><th>Contig</th><th>Count</th></tr></thead><tbody>') for contig, count in sorted(self.variants_per_contig.items(), key=lambda x: contig_idx[x[0]]): builder.append('<tr>') builder.append(f'<td>{html.escape(contig)}</td>') builder.append(f'<td>{count}</td>') builder.append('</tr>') builder.append('</tbody></table>') builder.append('</li>') builder.append('</ul>') return ''.join(builder)
[docs]@typecheck(mt=oneof(Table, MatrixTable), show=bool, handler=anytype) def summarize_variants(mt: Union[MatrixTable, MatrixTable], show=True, *, handler=None): """Summarize the variants present in a dataset and print the results. Examples -------- >>> hl.summarize_variants(dataset) # doctest: +SKIP ============================== Number of variants: 346 ============================== Alleles per variant ------------------- 2 alleles: 346 variants ============================== Variants per contig ------------------- 20: 346 variants ============================== Allele type distribution ------------------------ SNP: 301 alleles Deletion: 27 alleles Insertion: 18 alleles ============================== Parameters ---------- mt : :class:`.MatrixTable` or :class:`.Table` Matrix table with a variant (locus / alleles) row key. show : :obj:`bool` If ``True``, print results instead of returning them. handler Notes ----- The result returned if `show` is ``False`` is a :class:`.Struct` with five fields: - `n_variants` (:obj:`int`): Number of variants present in the matrix table. - `allele_types` (:obj:`dict` [:obj:`str`, :obj:`int`]): Number of alternate alleles in each allele allele category. - `contigs` (:obj:`dict` [:obj:`str`, :obj:`int`]): Number of variants on each contig. - `allele_counts` (:obj:`dict` [:obj:`int`, :obj:`int`]): Number of variants broken down by number of alleles (biallelic is 2, for example). - `r_ti_tv` (:obj:`float`): Ratio of transition alternate alleles to transversion alternate alleles. Returns ------- :obj:`None` or :class:`.Struct` Returns ``None`` if `show` is ``True``, or returns results as a struct. """ require_row_key_variant(mt, 'summarize_variants') if isinstance(mt, MatrixTable): ht = mt.rows() else: ht = mt allele_pairs = hl.range(1, hl.len(ht.alleles)).map(lambda i: (ht.alleles[0], ht.alleles[i])) def explode_result(alleles): ref, alt = alleles return ( hl.agg.counter(hl.allele_type(ref, alt)), hl.agg.count_where(hl.is_transition(ref, alt)), hl.agg.count_where(hl.is_transversion(ref, alt)), ) (allele_types, nti, ntv), contigs, allele_counts, n_variants = ht.aggregate(( hl.agg.explode(explode_result, allele_pairs), hl.agg.counter(ht.locus.contig), hl.agg.counter(hl.len(ht.alleles)), hl.agg.count(), )) rg = ht.locus.dtype.reference_genome if show: summary = _VariantSummary(rg, n_variants, allele_counts, contigs, allele_types, nti, ntv) if handler is None: handler = hl.utils.default_handler() handler(summary) else: return hl.Struct( allele_types=allele_types, contigs=contigs, allele_counts=allele_counts, n_variants=n_variants, r_ti_tv=nti / ntv, )
[docs]@typecheck( ds=oneof(MatrixTable, lambda: hl.vds.VariantDataset), min_af=numeric, max_af=numeric, min_dp=int, max_dp=int, min_gq=int, ref_AF=nullable(expr_float64), ) def compute_charr( ds: Union[MatrixTable, 'hl.vds.VariantDataset'], min_af: float = 0.05, max_af: float = 0.95, min_dp: int = 10, max_dp: int = 100, min_gq: int = 20, ref_AF: Optional[Float64Expression] = None, ): """Compute CHARR, the DNA sample contamination estimator. .. include:: ../_templates/experimental.rst Notes ----- The returned table has the sample ID field, plus the field: - `charr` (float64): CHARR contamination estimation. Note ----- It is possible to use gnomAD reference allele frequencies with the following: >>> gnomad_sites = hl.experimental.load_dataset('gnomad_genome_sites', version='3.1.2') # doctest: +SKIP >>> charr_result = hl.compute_charr(mt, ref_af=(1 - gnomad_sites[mt.row_key].freq[0].AF)) # doctest: +SKIP If the dataset is loaded from a gvcf and has NON_REF alleles, drop the last allele with the following or load it with the hail vcf combiner: >>> mt = mt.key_rows_by(locus=mt.locus, alleles=mt.alleles[:-1]) Parameters ---------- ds : :class:`.MatrixTable` or :class:`.VariantDataset` Dataset. min_af Minimum reference allele frequency to filter variants. max_af Maximum reference allele frequency to filter variants. min_dp Minimum sequencing depth to filter variants. max_dp Maximum sequencing depth to filter variants. min_gq Minimum genotype quality to filter variants ref_AF Reference AF expression. Necessary when the sample size is below 10,000. Returns ------- :class:`.Table` """ # Determine whether the input data is in the VDS format; if not, convert matrixtable to VDS and extract only the variant call information if isinstance(ds, hl.vds.VariantDataset): mt = ds.variant_data else: mt = ds if all(x in mt.entry for x in ['LA', 'LAD', 'LGT', 'GQ']): ad_field = 'LAD' gt_field = 'LGT' elif all(x in mt.entry for x in ['AD', 'GT', 'GQ']): ad_field = 'AD' gt_field = 'GT' else: raise ValueError( f"'compute_charr': require a VDS or MatrixTable with fields LAD/LAD/LGT/GQ/DP or AD/GT/GQ/DP," f" found entry fields {list(mt.entry)}" ) # Annotate reference allele frequency when it is not defined in the original data, and name it 'ref_AF'. ref_af_field = '__ref_af' if ref_AF is None: n_samples = mt.count_cols() if n_samples < 10000: raise ValueError( "'compute_charr': with fewer than 10,000 samples, require a reference AF in 'reference_data_source'." ) n_alleles = 2 * n_samples mt = mt.annotate_rows(**{ref_af_field: 1 - hl.agg.sum(mt[gt_field].n_alt_alleles()) / n_alleles}) else: mt = mt.annotate_rows(**{ref_af_field: ref_AF}) # Filter to autosomal biallelic SNVs with reference allele frequency within the range (min_af, max_af) rg = mt.locus.dtype.reference_genome.name if rg == 'GRCh37': mt = hl.filter_intervals(mt, [hl.parse_locus_interval('1-22', reference_genome=rg)]) elif rg == 'GRCh38': mt = hl.filter_intervals(mt, [hl.parse_locus_interval('chr1-chr22', reference_genome=rg)]) else: mt = mt.filter_rows(mt.locus.in_autosome()) mt = mt.filter_rows( (hl.len(mt.alleles) == 2) & hl.is_snp(mt.alleles[0], mt.alleles[1]) & (mt[ref_af_field] > min_af) & (mt[ref_af_field] < max_af) ) # Filter to variant calls with GQ above min_gq and DP within the range (min_dp, max_dp) ad_dp = mt['DP'] if 'DP' in mt.entry else hl.sum(mt[ad_field]) mt = mt.filter_entries(mt[gt_field].is_hom_var() & (mt.GQ >= min_gq) & (ad_dp >= min_dp) & (ad_dp <= max_dp)) # Compute CHARR mt = mt.select_cols(charr=hl.agg.mean((mt[ad_field][0] / (mt[ad_field][0] + mt[ad_field][1])) / mt[ref_af_field])) mt = mt.select_globals( af_min=min_af, af_max=max_af, dp_min=min_dp, dp_max=max_dp, gq_min=min_gq, ) return mt.cols()