Source code for hail.context

import sys
import os
from urllib.parse import urlparse, urlunparse

import pkg_resources
from pyspark import SparkContext

import hail
from hail.genetics.reference_genome import ReferenceGenome
from hail.typecheck import nullable, typecheck, typecheck_method, enumeration, dictof
from hail.utils import get_env_or_default
from hail.utils.java import Env, FatalError, warning
from hail.backend import Backend


def _get_tmpdir(tmpdir):
    if tmpdir is None:
        tmpdir = '/tmp'
    return tmpdir


def _get_local_tmpdir(local_tmpdir):
    """Resolve the local temporary directory to a ``file``-scheme URL.

    The value is first passed through ``get_env_or_default`` together with the
    ``TMPDIR`` environment variable name and the default ``'file:///tmp'``
    (presumably: explicit argument, then ``TMPDIR``, then the default --
    confirm against ``hail.utils.get_env_or_default``).

    Parameters
    ----------
    local_tmpdir : str or None
        Requested local temporary directory, with or without a URL scheme.

    Returns
    -------
    str
        The directory as a URL; a missing scheme is filled in as ``file``.

    Raises
    ------
    ValueError
        If the resolved location uses any scheme other than ``file``.
    """
    local_tmpdir = get_env_or_default(local_tmpdir, 'TMPDIR', 'file:///tmp')
    r = urlparse(local_tmpdir)
    if not r.scheme:
        # A bare path such as '/tmp' carries no scheme; treat it as a local file path.
        r = r._replace(scheme='file')
    elif r.scheme != 'file':
        # The f-prefix is required so the offending scheme is interpolated
        # into the message instead of the literal text '{r.scheme}'.
        raise ValueError(
            f'invalid local_tmpdir: must use scheme file, got scheme {r.scheme}')
    return urlunparse(r)


def _get_log(log):
    if log is None:
        py_version = version()
        log = hail.utils.timestamp_path(os.path.join(os.getcwd(), 'hail'),
                                        suffix=f'-{py_version}.log')
    return log


class HailContext(object):
    """Session object that ties the Hail configuration to a backend.

    Constructing an instance registers it as ``Env._hc``, loads the built-in
    reference genomes through the backend, resolves the default reference,
    optionally prints a welcome banner to stderr, and seeds ``Env``.
    """

    @typecheck_method(log=str,
                      quiet=bool,
                      append=bool,
                      tmpdir=str,
                      local_tmpdir=str,
                      default_reference=str,
                      global_seed=nullable(int),
                      backend=Backend)
    def __init__(self, log, quiet, append, tmpdir, local_tmpdir,
                 default_reference, global_seed, backend):
        """Create the context and install it as the active one.

        `append` is accepted but not used inside this constructor. A
        `global_seed` of ``None`` falls back to a fixed constant.
        """
        # Only one context may be active at a time.
        assert not Env._hc

        super().__init__()

        self._log = log

        self._tmpdir = tmpdir
        self._local_tmpdir = local_tmpdir

        self._backend = backend

        self._warn_cols_order = True
        self._warn_entries_order = True

        # Registered before the reference genomes are loaded, as in the
        # original statement order.
        Env._hc = self

        for builtin_name in ('GRCh37', 'GRCh38', 'GRCm38', 'CanFam3'):
            ReferenceGenome._from_config(
                self._backend.get_reference(builtin_name), True)

        # Anything that is not an already-loaded reference name is handed to
        # ReferenceGenome.read.
        loaded = ReferenceGenome._references
        self._default_ref = (loaded[default_reference]
                             if default_reference in loaded
                             else ReferenceGenome.read(default_reference))

        if not quiet:
            hail_version = version()
            banner = (
                'Welcome to\n'
                '     __  __     <>__\n'
                '    / /_/ /__  __/ /\n'
                '   / __  / _ `/ / /\n'
                f'  /_/ /_/\\_,_/_/_/   version {hail_version}\n')
            sys.stderr.write(banner)

            if hail_version.startswith('devel'):
                sys.stderr.write('NOTE: This is a beta version. Interfaces may change\n'
                                 '  during the beta period. We recommend pulling\n'
                                 '  the latest changes weekly.\n')
            sys.stderr.write(f'LOGGING: writing to {log}\n')

        Env.set_seed(6348563392232659379 if global_seed is None else global_seed)

    @property
    def default_reference(self):
        """The reference genome resolved as the default at construction."""
        return self._default_ref

    def stop(self):
        """Stop the backend and reset the session-level state.

        Clears the active context, the dummy table and seed generator held on
        ``Env``, the session functions in ``hail.ir``, and the table of loaded
        reference genomes.
        """
        self._backend.stop()
        self._backend = None
        Env._hc = None
        Env._dummy_table = None
        Env._seed_generator = None
        hail.ir.clear_session_functions()
        ReferenceGenome._references = {}


@typecheck(sc=nullable(SparkContext),
           app_name=str,
           master=nullable(str),
           local=str,
           log=nullable(str),
           quiet=bool,
           append=bool,
           min_block_size=int,
           branching_factor=int,
           tmp_dir=str,
           default_reference=enumeration('GRCh37', 'GRCh38', 'GRCm38', 'CanFam3'),
           idempotent=bool,
           global_seed=nullable(int),
           spark_conf=nullable(dictof(str, str)),
           skip_logging_configuration=bool,
           local_tmpdir=nullable(str),
           _optimizer_iterations=nullable(int))
def init(sc=None, app_name='Hail', master=None, local='local[*]',
         log=None, quiet=False, append=False,
         min_block_size=0, branching_factor=50, tmp_dir='/tmp',
         default_reference='GRCh37', idempotent=False,
         global_seed=6348563392232659379,
         spark_conf=None,
         skip_logging_configuration=False,
         local_tmpdir=None,
         _optimizer_iterations=None):
    """Initialize Hail and Spark.

    Examples
    --------
    Import and initialize Hail using GRCh38 as the default reference genome:

    >>> import hail as hl
    >>> hl.init(default_reference='GRCh38')  # doctest: +SKIP

    Notes
    -----
    Hail is not only a Python library; most of Hail is written in Java/Scala
    and runs together with Apache Spark in the Java Virtual Machine (JVM).
    In order to use Hail, a JVM needs to run as well. The :func:`.init`
    function is used to initialize Hail and Spark.

    This function also sets global configuration parameters used for the Hail
    session, like the default reference genome and log file location.

    This function will be called automatically (with default parameters) if
    any Hail functionality requiring the backend (most of the library!) is
    used. To initialize Hail explicitly with non-default arguments, be sure
    to do so directly after importing the module, as in the above example.

    Note
    ----
    If a :class:`pyspark.SparkContext` is already running, then Hail must be
    initialized with it as an argument:

    >>> hl.init(sc=sc)  # doctest: +SKIP

    See Also
    --------
    :func:`.stop`

    Parameters
    ----------
    sc : pyspark.SparkContext, optional
        Spark context. By default, a Spark context will be created.
    app_name : :obj:`str`
        Spark application name.
    master : :obj:`str`, optional
        Spark master.
    local : :obj:`str`
        Local-mode master, used if `master` is not defined here or in the
        Spark configuration.
    log : :obj:`str`
        Local path for Hail log file. Does not currently support distributed
        file systems like Google Storage, S3, or HDFS.
    quiet : :obj:`bool`
        Print fewer log messages.
    append : :obj:`bool`
        Append to the end of the log file.
    min_block_size : :obj:`int`
        Minimum file block size in MB.
    branching_factor : :obj:`int`
        Branching factor for tree aggregation.
    tmp_dir : :obj:`str`, optional
        Networked temporary directory. Must be a network-visible file path.
        Defaults to /tmp in the default scheme.
    default_reference : :obj:`str`
        Default reference genome. Either ``'GRCh37'``, ``'GRCh38'``,
        ``'GRCm38'``, or ``'CanFam3'``.
    idempotent : :obj:`bool`
        If ``True``, calling this function is a no-op if Hail has already
        been initialized.
    global_seed : :obj:`int`, optional
        Global random seed.
    spark_conf : :obj:`dict[str, str]`, optional
        Spark configuration parameters.
    skip_logging_configuration : :obj:`bool`
        Skip logging configuration in java and python.
    local_tmpdir : :obj:`str`, optional
        Local temporary directory. Used on driver and executor nodes.
        Must use the file scheme. Defaults to TMPDIR, or /tmp.
    """
    # Imported lazily so the Spark backend is only loaded when it is used.
    from hail.backend.spark_backend import SparkBackend

    if Env._hc:
        if idempotent:
            return
        else:
            # NOTE(review): execution continues after this warning, and
            # HailContext.__init__ asserts that no context is active, so a
            # second non-idempotent init looks like it ends in an
            # AssertionError -- confirm this is intended.
            warning('Hail has already been initialized. If this call was intended to change configuration,'
                    ' close the session with hl.stop() first.')

    log = _get_log(log)
    tmpdir = _get_tmpdir(tmp_dir)
    local_tmpdir = _get_local_tmpdir(local_tmpdir)
    optimizer_iterations = get_env_or_default(_optimizer_iterations, 'HAIL_OPTIMIZER_ITERATIONS', 3)

    backend = SparkBackend(
        idempotent, sc, spark_conf, app_name, master, local, log,
        quiet, append, min_block_size, branching_factor, tmpdir, local_tmpdir,
        skip_logging_configuration, optimizer_iterations)

    # Pass the normalized `tmpdir` (not the raw `tmp_dir` argument) so the
    # context and the backend agree, matching init_service and init_local.
    HailContext(
        log, quiet, append, tmpdir, local_tmpdir, default_reference,
        global_seed, backend)
@typecheck(
    billing_project=nullable(str),
    bucket=nullable(str),
    log=nullable(str),
    quiet=bool,
    append=bool,
    tmpdir=nullable(str),
    local_tmpdir=nullable(str),
    default_reference=enumeration('GRCh37', 'GRCh38', 'GRCm38', 'CanFam3'),
    global_seed=nullable(int),
    skip_logging_configuration=bool)
def init_service(
        billing_project: str = None,
        bucket: str = None,
        log=None,
        quiet=False,
        append=False,
        tmpdir=None,
        local_tmpdir=None,
        default_reference='GRCh37',
        global_seed=6348563392232659379,
        skip_logging_configuration=False):
    """Initialize Hail on top of a ``ServiceBackend``.

    The backend is built from `billing_project`, `bucket` and
    `skip_logging_configuration`; the remaining arguments are normalized and
    handed to :class:`HailContext`.
    """
    from hail.backend.service_backend import ServiceBackend

    service_backend = ServiceBackend(
        billing_project, bucket,
        skip_logging_configuration=skip_logging_configuration)

    log_path = _get_log(log)
    remote_tmpdir = _get_tmpdir(tmpdir)
    driver_tmpdir = _get_local_tmpdir(local_tmpdir)

    HailContext(
        log_path, quiet, append, remote_tmpdir, driver_tmpdir,
        default_reference, global_seed, service_backend)


@typecheck(
    log=nullable(str),
    quiet=bool,
    append=bool,
    branching_factor=int,
    tmpdir=nullable(str),
    default_reference=enumeration('GRCh37', 'GRCh38', 'GRCm38', 'CanFam3'),
    global_seed=nullable(int),
    skip_logging_configuration=bool,
    _optimizer_iterations=nullable(int))
def init_local(
        log=None,
        quiet=False,
        append=False,
        branching_factor=50,
        tmpdir=None,
        default_reference='GRCh37',
        global_seed=6348563392232659379,
        skip_logging_configuration=False,
        _optimizer_iterations=None):
    """Initialize Hail on top of a ``LocalBackend``.

    The same resolved `tmpdir` is used for both the context's temporary
    directory and its local temporary directory.
    """
    from hail.backend.local_backend import LocalBackend

    log_path = _get_log(log)
    work_dir = _get_tmpdir(tmpdir)
    n_optimizer_passes = get_env_or_default(
        _optimizer_iterations, 'HAIL_OPTIMIZER_ITERATIONS', 3)

    local_backend = LocalBackend(
        work_dir, log_path, quiet, append, branching_factor,
        skip_logging_configuration, n_optimizer_passes)

    HailContext(
        log_path, quiet, append, work_dir, work_dir,
        default_reference, global_seed, local_backend)
def version():
    """Get the installed hail version.

    The value is read from the packaged ``hail_version`` resource the first
    time it is needed and then stored on ``hail.__version__``.

    Returns
    -------
    str
    """
    if hail.__version__ is not None:
        return hail.__version__

    # https://stackoverflow.com/questions/6028000/how-to-read-a-static-file-from-inside-a-python-package
    raw_version = pkg_resources.resource_string(__name__, 'hail_version')
    hail.__version__ = raw_version.decode().strip()
    return hail.__version__
def _hail_cite_url():
    """Build the GitHub URL used when citing this Hail build.

    The version string is split on ``-`` into exactly two parts (tag and SHA
    prefix). When the ``hail-all-spark.jar`` resource is packaged, the release
    tag URL is returned; otherwise the commit URL is.
    """
    tag, sha_prefix = version().split("-")
    jar_is_packaged = pkg_resources.resource_exists(__name__, "hail-all-spark.jar")
    if jar_is_packaged:
        # pip installed
        return f"https://github.com/hail-is/hail/releases/tag/{tag}"
    return f"https://github.com/hail-is/hail/commit/{sha_prefix}"
def citation(*, bibtex=False):
    """Generate a Hail citation.

    Parameters
    ----------
    bibtex : bool
        Generate a citation in BibTeX form.

    Returns
    -------
    str
    """
    if not bibtex:
        return f"Hail Team. Hail {version()}. {_hail_cite_url()}."

    cite_url = _hail_cite_url()
    # Doubled braces are literal braces in the emitted BibTeX entry.
    entry_pieces = (
        f"@misc{{Hail,",
        f" author = {{Hail Team}},",
        f" title = {{Hail}},",
        f" howpublished = {{\\url{{{cite_url}}}}}",
        f"}}",
    )
    return "".join(entry_pieces)
def cite_hail():
    """Return the plain-text Hail citation."""
    plain_text = citation(bibtex=False)
    return plain_text


def cite_hail_bibtex():
    """Return the Hail citation as a BibTeX entry."""
    bibtex_entry = citation(bibtex=True)
    return bibtex_entry
def stop():
    """Stop the currently running Hail session.

    Does nothing when no Hail context is active.
    """
    if not Env._hc:
        return
    Env.hc().stop()
def spark_context():
    """Returns the active Spark context.

    Returns
    -------
    :class:`pyspark.SparkContext`
    """
    backend = Env.spark_backend('spark_context')
    return backend.sc
def current_backend():
    """Return the backend object held by the active Hail context."""
    active_context = Env.hc()
    return active_context._backend
def default_reference():
    """Returns the default reference genome of the active Hail context.

    This is the reference selected at initialization (``'GRCh37'`` unless a
    different `default_reference` was passed).

    Returns
    -------
    :class:`.ReferenceGenome`
    """
    active_context = Env.hc()
    return active_context.default_reference
def get_reference(name) -> 'hail.ReferenceGenome':
    """Returns the reference genome corresponding to `name`.

    Notes
    -----

    Hail's built-in references are ``'GRCh37'``, ``'GRCh38'``, ``'GRCm38'``,
    and ``'CanFam3'``.
    The contig names and lengths come from the GATK resource bundle:
    `human_g1k_v37.dict
    <ftp://gsapubftp-anonymous@ftp.broadinstitute.org/bundle/b37/human_g1k_v37.dict>`__
    and `Homo_sapiens_assembly38.dict
    <ftp://gsapubftp-anonymous@ftp.broadinstitute.org/bundle/hg38/Homo_sapiens_assembly38.dict>`__.

    If ``name='default'``, the value of :func:`.default_reference` is returned.

    Parameters
    ----------
    name : :obj:`str`
        Name of a previously loaded reference genome or one of Hail's built-in
        references: ``'GRCh37'``, ``'GRCh38'``, ``'GRCm38'``, ``'CanFam3'``, and
        ``'default'``.

    Returns
    -------
    :class:`.ReferenceGenome`
    """
    # Called for its effect only; the returned context is not used here.
    Env.hc()

    if name != 'default':
        return ReferenceGenome._references[name]
    return default_reference()
@typecheck(seed=int)
def set_global_seed(seed):
    """Sets Hail's global seed to `seed`.

    Parameters
    ----------
    seed : :obj:`int`
        Integer used to seed Hail's random number generator
    """
    # The seed is forwarded unchanged to Env.
    Env.set_seed(seed)
def _set_flags(**flags):
    """Set backend flags by keyword, rejecting unknown flag names.

    Every recognized flag is set as it is encountered; unrecognized names are
    collected and reported together in a ``FatalError`` after the loop, so
    valid flags are applied even when the call ultimately raises.
    """
    available = set(Env.backend()._jhc.flags().available())
    unknown = []
    for flag_name, flag_value in flags.items():
        if flag_name not in available:
            unknown.append(flag_name)
            continue
        Env.backend()._jhc.flags().set(flag_name, flag_value)

    if unknown:
        unknown_listing = ', '.join(unknown)
        valid_listing = '\n '.join(available)
        raise FatalError("Flags {} not valid. Valid flags: \n {}"
                         .format(unknown_listing, valid_listing))


def _get_flags(*flags):
    """Return a dict mapping each requested flag name to its backend value."""
    current_values = {}
    for flag_name in flags:
        current_values[flag_name] = Env.backend()._jhc.flags().get(flag_name)
    return current_values


def debug_info():
    """Collect Spark configuration, Hail jar path and version for debugging.

    ``'hail_jar_path'`` is ``None`` when the ``hail-all-spark.jar`` resource
    is not packaged alongside this module.
    """
    jar_resource = "hail-all-spark.jar"
    if pkg_resources.resource_exists(__name__, jar_resource):
        hail_jar_path = pkg_resources.resource_filename(__name__, jar_resource)
    else:
        hail_jar_path = None

    return {
        'spark_conf': spark_context()._conf.getAll(),
        'hail_jar_path': hail_jar_path,
        'version': version()
    }