import functools
import operator
import hail as hl
from hail.genetics.reference_genome import reference_genome_type
from hail.table import Table
from hail.typecheck import nullable, sequenceof, typecheck
from hail.utils import new_temp_file
from hail.utils.java import info
[docs]@typecheck(
    path=str,
    reference_genome=nullable(reference_genome_type),
    skip_invalid_contigs=bool,
    min_partitions=nullable(int),
    force_bgz=bool,
    force=bool,
)
def import_gtf(
    path, reference_genome=None, skip_invalid_contigs=False, min_partitions=None, force_bgz=False, force=False
) -> Table:
    """Import a GTF file.
    The GTF file format is identical to the GFF version 2 file format,
    and so this function can be used to import GFF version 2 files as
    well.
    See https://www.ensembl.org/info/website/upload/gff.html for more
    details on the GTF/GFF2 file format.
    The :class:`.Table` returned by this function will be keyed by the
    ``interval`` row field and will include the following row fields:
    .. code-block:: text
        'source': str
        'feature': str
        'score': float64
        'strand': str
        'frame': int32
        'interval': interval<>
    There will also be corresponding fields for every tag found in the
    attribute field of the GTF file.
    Note
    ----
    This function will return an ``interval`` field of type :class:`.tinterval`
    constructed from the ``seqname``, ``start``, and ``end`` fields in the
    GTF file. This interval is inclusive of both the start and end positions
    in the GTF file.
    If the ``reference_genome`` parameter is specified, the start and end
    points of the ``interval`` field will be of type :class:`.tlocus`.
    Otherwise, the start and end points of the ``interval`` field will be of
    type :class:`.tstruct` with fields ``seqname`` (type :class:`str`) and
    ``position`` (type :obj:`.tint32`).
    Furthermore, if the ``reference_genome`` parameter is specified and
    ``skip_invalid_contigs`` is ``True``, this import function will skip
    lines in the GTF where ``seqname`` is not consistent with the reference
    genome specified.
    Example
    -------
    >>> ht = hl.experimental.import_gtf('data/test.gtf',
    ...                                 reference_genome='GRCh37',
    ...                                 skip_invalid_contigs=True)
    >>> ht.describe()  # doctest: +SKIP_OUTPUT_CHECK
    ----------------------------------------
    Global fields:
    None
    ----------------------------------------
    Row fields:
        'source': str
        'feature': str
        'score': float64
        'strand': str
        'frame': int32
        'gene_type': str
        'exon_id': str
        'havana_transcript': str
        'level': str
        'transcript_name': str
        'gene_status': str
        'gene_id': str
        'transcript_type': str
        'tag': str
        'transcript_status': str
        'gene_name': str
        'transcript_id': str
        'exon_number': str
        'havana_gene': str
        'interval': interval<locus<GRCh37>>
    ----------------------------------------
    Key: ['interval']
    ----------------------------------------
    Parameters
    ----------
    path : :class:`str`
        File to import.
    reference_genome : :class:`str` or :class:`.ReferenceGenome`, optional
        Reference genome to use.
    skip_invalid_contigs : :obj:`bool`
        If ``True`` and `reference_genome` is not ``None``, skip lines where
        ``seqname`` is not consistent with the reference genome.
    min_partitions : :obj:`int` or :obj:`None`
        Minimum number of partitions (passed to import_table).
    force_bgz : :obj:`bool`
        If ``True``, load files as blocked gzip files, assuming
        that they were actually compressed using the BGZ codec. This option is
        useful when the file extension is not ``'.bgz'``, but the file is
        blocked gzip, so that the file can be read in parallel and not on a
        single node.
    force : :obj:`bool`
        If ``True``, load gzipped files serially on one core. This should
        be used only when absolutely necessary, as processing time will be
        increased due to lack of parallelism.
    Returns
    -------
    :class:`.Table`
    """
    ht = hl.import_table(
        path,
        min_partitions=min_partitions,
        comment='#',
        no_header=True,
        types={'f3': hl.tint, 'f4': hl.tint, 'f5': hl.tfloat, 'f7': hl.tint},
        missing='.',
        delimiter='\t',
        force_bgz=force_bgz,
        force=force,
    )
    ht = ht.rename({
        'f0': 'seqname',
        'f1': 'source',
        'f2': 'feature',
        'f3': 'start',
        'f4': 'end',
        'f5': 'score',
        'f6': 'strand',
        'f7': 'frame',
        'f8': 'attribute',
    })
    def parse_attributes(unparsed_attributes):
        def parse_attribute(attribute):
            key_and_value = attribute.split(' ')
            key = key_and_value[0]
            value = key_and_value[1]
            return (key, value.replace('"|;\\$', ''))
        return hl.dict(unparsed_attributes.split('; ').map(parse_attribute))
    ht = ht.annotate(attribute=parse_attributes(ht['attribute']))
    ht = ht.checkpoint(new_temp_file())
    attributes = ht.aggregate(hl.agg.explode(lambda x: hl.agg.collect_as_set(x), ht['attribute'].keys()))
    ht = ht.transmute(**{x: hl.or_missing(ht['attribute'].contains(x), ht['attribute'][x]) for x in attributes if x})
    if reference_genome:
        if reference_genome.name == 'GRCh37':
            ht = ht.annotate(
                seqname=hl.case()
                .when((ht['seqname'] == 'M') | (ht['seqname'] == 'chrM'), 'MT')
                .when(ht['seqname'].startswith('chr'), ht['seqname'].replace('^chr', ''))
                .default(ht['seqname'])
            )
        else:
            ht = ht.annotate(
                seqname=hl.case()
                .when(ht['seqname'].startswith('HLA'), ht['seqname'])
                .when(ht['seqname'].startswith('chrHLA'), ht['seqname'].replace('^chr', ''))
                .when(ht['seqname'].startswith('chr'), ht['seqname'])
                .default('chr' + ht['seqname'])
            )
        if skip_invalid_contigs:
            valid_contigs = hl.literal(set(reference_genome.contigs))
            ht = ht.filter(valid_contigs.contains(ht['seqname']))
        ht = ht.transmute(
            interval=hl.locus_interval(
                ht['seqname'],
                ht['start'],
                ht['end'],
                includes_start=True,
                includes_end=True,
                reference_genome=reference_genome,
            )
        )
    else:
        ht = ht.transmute(
            interval=hl.interval(
                hl.struct(seqname=ht['seqname'], position=ht['start']),
                hl.struct(seqname=ht['seqname'], position=ht['end']),
                includes_start=True,
                includes_end=True,
            )
        )
    ht = ht.key_by('interval')
    return ht 
[docs]@typecheck(
    gene_symbols=nullable(sequenceof(str)),
    gene_ids=nullable(sequenceof(str)),
    transcript_ids=nullable(sequenceof(str)),
    verbose=bool,
    reference_genome=nullable(reference_genome_type),
    gtf_file=nullable(str),
)
def get_gene_intervals(
    gene_symbols=None, gene_ids=None, transcript_ids=None, verbose=True, reference_genome=None, gtf_file=None
):
    """Get intervals of genes or transcripts.
    Get the boundaries of genes or transcripts from a GTF file, for quick filtering of a Table or MatrixTable.
    On Google Cloud platform:
    Gencode v19 (GRCh37) GTF available at: gs://hail-common/references/gencode/gencode.v19.annotation.gtf.bgz
    Gencode v29 (GRCh38) GTF available at: gs://hail-common/references/gencode/gencode.v29.annotation.gtf.bgz
    Example
    -------
    >>> hl.filter_intervals(ht, get_gene_intervals(gene_symbols=['PCSK9'], reference_genome='GRCh37'))  # doctest: +SKIP
    Parameters
    ----------
    gene_symbols : :obj:`list` of :class:`str`, optional
       Gene symbols (e.g. PCSK9).
    gene_ids : :obj:`list` of :class:`str`, optional
       Gene IDs (e.g. ENSG00000223972).
    transcript_ids : :obj:`list` of :class:`str`, optional
       Transcript IDs (e.g. ENSG00000223972).
    verbose : :obj:`bool`
       If ``True``, print which genes and transcripts were matched in the GTF file.
    reference_genome : :class:`str` or :class:`.ReferenceGenome`, optional
       Reference genome to use (passed along to import_gtf).
    gtf_file : :class:`str`
       GTF file to load. If none is provided, but `reference_genome` is one of
       `GRCh37` or `GRCh38`, a default will be used (on Google Cloud Platform).
    Returns
    -------
    :obj:`list` of :class:`.Interval`
    """
    if gene_symbols is None and gene_ids is None and transcript_ids is None:
        raise ValueError('get_gene_intervals requires at least one of gene_symbols, gene_ids, or transcript_ids')
    ht = _load_gencode_gtf(gtf_file, reference_genome)
    criteria = []
    if gene_symbols:
        criteria.append(hl.any(lambda y: (ht.feature == 'gene') & (ht.gene_name == y), gene_symbols))
    if gene_ids:
        criteria.append(hl.any(lambda y: (ht.feature == 'gene') & (ht.gene_id == y.split('\\.')[0]), gene_ids))
    if transcript_ids:
        criteria.append(
            hl.any(lambda y: (ht.feature == 'transcript') & (ht.transcript_id == y.split('\\.')[0]), transcript_ids)
        )
    ht = ht.filter(functools.reduce(operator.ior, criteria))
    gene_info = ht.aggregate(hl.agg.collect((ht.feature, ht.gene_name, ht.gene_id, ht.transcript_id, ht.interval)))
    if verbose:
        info(
            f'get_gene_intervals found {len(gene_info)} entries:\n'
            + "\n".join(map(lambda x: f'{x[0]}: {x[1]} ({x[2] if x[0] == "gene" else x[3]})', gene_info))
        )
    intervals = list(map(lambda x: x[-1], gene_info))
    return intervals 
def _load_gencode_gtf(gtf_file=None, reference_genome=None):
    """
    Get Gencode GTF (from file or reference genome)
    Parameters
    ----------
    reference_genome : :class:`.ReferenceGenome`, optional
       Reference genome to use (passed along to import_gtf).
    gtf_file : :class:`str`
       GTF file to load. If none is provided, but `reference_genome` is one of
       `GRCh37` or `GRCh38`, a default will be used (on Google Cloud Platform).
    Returns
    -------
    :class:`.Table`
    """
    GTFS = {
        'GRCh37': 'gs://hail-common/references/gencode/gencode.v19.annotation.gtf.bgz',
        'GRCh38': 'gs://hail-common/references/gencode/gencode.v29.annotation.gtf.bgz',
    }
    if reference_genome is None:
        reference_genome = hl.default_reference().name
    else:
        reference_genome = reference_genome.name
    if gtf_file is None:
        gtf_file = GTFS.get(reference_genome)
        if gtf_file is None:
            raise ValueError(
                'get_gene_intervals requires a GTF file, or the reference genome be one of GRCh37 or GRCh38 (when on Google Cloud Platform)'
            )
    ht = hl.experimental.import_gtf(
        gtf_file, reference_genome=reference_genome, skip_invalid_contigs=True, min_partitions=12
    )
    ht = ht.annotate(gene_id=ht.gene_id.split('\\.')[0], transcript_id=ht.transcript_id.split('\\.')[0])
    return ht