Source code for hail.experimental.import_gtf

import functools
import operator

import hail as hl
from hail.genetics.reference_genome import reference_genome_type
from hail.table import Table
from hail.typecheck import nullable, sequenceof, typecheck
from hail.utils import new_temp_file
from hail.utils.java import info


[docs]@typecheck( path=str, reference_genome=nullable(reference_genome_type), skip_invalid_contigs=bool, min_partitions=nullable(int), force_bgz=bool, force=bool, ) def import_gtf( path, reference_genome=None, skip_invalid_contigs=False, min_partitions=None, force_bgz=False, force=False ) -> Table: """Import a GTF file. The GTF file format is identical to the GFF version 2 file format, and so this function can be used to import GFF version 2 files as well. See https://www.ensembl.org/info/website/upload/gff.html for more details on the GTF/GFF2 file format. The :class:`.Table` returned by this function will be keyed by the ``interval`` row field and will include the following row fields: .. code-block:: text 'source': str 'feature': str 'score': float64 'strand': str 'frame': int32 'interval': interval<> There will also be corresponding fields for every tag found in the attribute field of the GTF file. Note ---- This function will return an ``interval`` field of type :class:`.tinterval` constructed from the ``seqname``, ``start``, and ``end`` fields in the GTF file. This interval is inclusive of both the start and end positions in the GTF file. If the ``reference_genome`` parameter is specified, the start and end points of the ``interval`` field will be of type :class:`.tlocus`. Otherwise, the start and end points of the ``interval`` field will be of type :class:`.tstruct` with fields ``seqname`` (type :class:`str`) and ``position`` (type :obj:`.tint32`). Furthermore, if the ``reference_genome`` parameter is specified and ``skip_invalid_contigs`` is ``True``, this import function will skip lines in the GTF where ``seqname`` is not consistent with the reference genome specified. Example ------- >>> ht = hl.experimental.import_gtf('data/test.gtf', ... reference_genome='GRCh37', ... skip_invalid_contigs=True) >>> ht.describe() # doctest: +SKIP_OUTPUT_CHECK ---------------------------------------- Global fields: None ---------------------------------------- Row fields: 'source': str 'feature': str 'score': float64 'strand': str 'frame': int32 'gene_type': str 'exon_id': str 'havana_transcript': str 'level': str 'transcript_name': str 'gene_status': str 'gene_id': str 'transcript_type': str 'tag': str 'transcript_status': str 'gene_name': str 'transcript_id': str 'exon_number': str 'havana_gene': str 'interval': interval<locus<GRCh37>> ---------------------------------------- Key: ['interval'] ---------------------------------------- Parameters ---------- path : :class:`str` File to import. reference_genome : :class:`str` or :class:`.ReferenceGenome`, optional Reference genome to use. skip_invalid_contigs : :obj:`bool` If ``True`` and `reference_genome` is not ``None``, skip lines where ``seqname`` is not consistent with the reference genome. min_partitions : :obj:`int` or :obj:`None` Minimum number of partitions (passed to import_table). force_bgz : :obj:`bool` If ``True``, load files as blocked gzip files, assuming that they were actually compressed using the BGZ codec. This option is useful when the file extension is not ``'.bgz'``, but the file is blocked gzip, so that the file can be read in parallel and not on a single node. force : :obj:`bool` If ``True``, load gzipped files serially on one core. This should be used only when absolutely necessary, as processing time will be increased due to lack of parallelism. Returns ------- :class:`.Table` """ ht = hl.import_table( path, min_partitions=min_partitions, comment='#', no_header=True, types={'f3': hl.tint, 'f4': hl.tint, 'f5': hl.tfloat, 'f7': hl.tint}, missing='.', delimiter='\t', force_bgz=force_bgz, force=force, ) ht = ht.rename({ 'f0': 'seqname', 'f1': 'source', 'f2': 'feature', 'f3': 'start', 'f4': 'end', 'f5': 'score', 'f6': 'strand', 'f7': 'frame', 'f8': 'attribute', }) def parse_attributes(unparsed_attributes): def parse_attribute(attribute): key_and_value = attribute.split(' ') key = key_and_value[0] value = key_and_value[1] return (key, value.replace('"|;\\$', '')) return hl.dict(unparsed_attributes.split('; ').map(parse_attribute)) ht = ht.annotate(attribute=parse_attributes(ht['attribute'])) ht = ht.checkpoint(new_temp_file()) attributes = ht.aggregate(hl.agg.explode(lambda x: hl.agg.collect_as_set(x), ht['attribute'].keys())) ht = ht.transmute(**{x: hl.or_missing(ht['attribute'].contains(x), ht['attribute'][x]) for x in attributes if x}) if reference_genome: if reference_genome.name == 'GRCh37': ht = ht.annotate( seqname=hl.case() .when((ht['seqname'] == 'M') | (ht['seqname'] == 'chrM'), 'MT') .when(ht['seqname'].startswith('chr'), ht['seqname'].replace('^chr', '')) .default(ht['seqname']) ) else: ht = ht.annotate( seqname=hl.case() .when(ht['seqname'].startswith('HLA'), ht['seqname']) .when(ht['seqname'].startswith('chrHLA'), ht['seqname'].replace('^chr', '')) .when(ht['seqname'].startswith('chr'), ht['seqname']) .default('chr' + ht['seqname']) ) if skip_invalid_contigs: valid_contigs = hl.literal(set(reference_genome.contigs)) ht = ht.filter(valid_contigs.contains(ht['seqname'])) ht = ht.transmute( interval=hl.locus_interval( ht['seqname'], ht['start'], ht['end'], includes_start=True, includes_end=True, reference_genome=reference_genome, ) ) else: ht = ht.transmute( interval=hl.interval( hl.struct(seqname=ht['seqname'], position=ht['start']), hl.struct(seqname=ht['seqname'], position=ht['end']), includes_start=True, includes_end=True, ) ) ht = ht.key_by('interval') return ht
[docs]@typecheck( gene_symbols=nullable(sequenceof(str)), gene_ids=nullable(sequenceof(str)), transcript_ids=nullable(sequenceof(str)), verbose=bool, reference_genome=nullable(reference_genome_type), gtf_file=nullable(str), ) def get_gene_intervals( gene_symbols=None, gene_ids=None, transcript_ids=None, verbose=True, reference_genome=None, gtf_file=None ): """Get intervals of genes or transcripts. Get the boundaries of genes or transcripts from a GTF file, for quick filtering of a Table or MatrixTable. On Google Cloud platform: Gencode v19 (GRCh37) GTF available at: gs://hail-common/references/gencode/gencode.v19.annotation.gtf.bgz Gencode v29 (GRCh38) GTF available at: gs://hail-common/references/gencode/gencode.v29.annotation.gtf.bgz Example ------- >>> hl.filter_intervals(ht, get_gene_intervals(gene_symbols=['PCSK9'], reference_genome='GRCh37')) # doctest: +SKIP Parameters ---------- gene_symbols : :obj:`list` of :class:`str`, optional Gene symbols (e.g. PCSK9). gene_ids : :obj:`list` of :class:`str`, optional Gene IDs (e.g. ENSG00000223972). transcript_ids : :obj:`list` of :class:`str`, optional Transcript IDs (e.g. ENSG00000223972). verbose : :obj:`bool` If ``True``, print which genes and transcripts were matched in the GTF file. reference_genome : :class:`str` or :class:`.ReferenceGenome`, optional Reference genome to use (passed along to import_gtf). gtf_file : :class:`str` GTF file to load. If none is provided, but `reference_genome` is one of `GRCh37` or `GRCh38`, a default will be used (on Google Cloud Platform). Returns ------- :obj:`list` of :class:`.Interval` """ if gene_symbols is None and gene_ids is None and transcript_ids is None: raise ValueError('get_gene_intervals requires at least one of gene_symbols, gene_ids, or transcript_ids') ht = _load_gencode_gtf(gtf_file, reference_genome) criteria = [] if gene_symbols: criteria.append(hl.any(lambda y: (ht.feature == 'gene') & (ht.gene_name == y), gene_symbols)) if gene_ids: criteria.append(hl.any(lambda y: (ht.feature == 'gene') & (ht.gene_id == y.split('\\.')[0]), gene_ids)) if transcript_ids: criteria.append( hl.any(lambda y: (ht.feature == 'transcript') & (ht.transcript_id == y.split('\\.')[0]), transcript_ids) ) ht = ht.filter(functools.reduce(operator.ior, criteria)) gene_info = ht.aggregate(hl.agg.collect((ht.feature, ht.gene_name, ht.gene_id, ht.transcript_id, ht.interval))) if verbose: info( f'get_gene_intervals found {len(gene_info)} entries:\n' + "\n".join(map(lambda x: f'{x[0]}: {x[1]} ({x[2] if x[0] == "gene" else x[3]})', gene_info)) ) intervals = list(map(lambda x: x[-1], gene_info)) return intervals
def _load_gencode_gtf(gtf_file=None, reference_genome=None): """ Get Gencode GTF (from file or reference genome) Parameters ---------- reference_genome : :class:`.ReferenceGenome`, optional Reference genome to use (passed along to import_gtf). gtf_file : :class:`str` GTF file to load. If none is provided, but `reference_genome` is one of `GRCh37` or `GRCh38`, a default will be used (on Google Cloud Platform). Returns ------- :class:`.Table` """ GTFS = { 'GRCh37': 'gs://hail-common/references/gencode/gencode.v19.annotation.gtf.bgz', 'GRCh38': 'gs://hail-common/references/gencode/gencode.v29.annotation.gtf.bgz', } if reference_genome is None: reference_genome = hl.default_reference().name else: reference_genome = reference_genome.name if gtf_file is None: gtf_file = GTFS.get(reference_genome) if gtf_file is None: raise ValueError( 'get_gene_intervals requires a GTF file, or the reference genome be one of GRCh37 or GRCh38 (when on Google Cloud Platform)' ) ht = hl.experimental.import_gtf( gtf_file, reference_genome=reference_genome, skip_invalid_contigs=True, min_partitions=12 ) ht = ht.annotate(gene_id=ht.gene_id.split('\\.')[0], transcript_id=ht.transcript_id.split('\\.')[0]) return ht