Source code for hail.utils.hadoop_utils

import gzip
import io
import os.path
import sys
from typing import Any, Dict, List

from hail.fs.hadoop_fs import HadoopFS
from hail.typecheck import enumeration, typecheck
from hail.utils.java import Env, info


@typecheck(path=str, mode=enumeration('r', 'w', 'x', 'rb', 'wb', 'xb'), buffer_size=int)
def hadoop_open(path: str, mode: str = 'r', buffer_size: int = 8192):
    """Open a file through the Hadoop filesystem API. Supports distributed
    file systems like hdfs, gs, and s3.

    Warning
    -------
    Due to an implementation limitation, :func:`hadoop_open` may be quite
    slow for large data sets (anything larger than 50 MB).

    Examples
    --------
    Write a Pandas DataFrame as a CSV directly into Google Cloud Storage:

    >>> with hadoop_open('gs://my-bucket/df.csv', 'w') as f: # doctest: +SKIP
    ...     pandas_df.to_csv(f)

    Read and print the lines of a text file stored in Google Cloud Storage:

    >>> with hadoop_open('gs://my-bucket/notes.txt') as f: # doctest: +SKIP
    ...     for line in f:
    ...         print(line.strip())

    Write two lines directly to a file in Google Cloud Storage:

    >>> with hadoop_open('gs://my-bucket/notes.txt', 'w') as f: # doctest: +SKIP
    ...     f.write('result1: %s\\n' % result1)
    ...     f.write('result2: %s\\n' % result2)

    Unpack a packed Python struct directly from a file in Google Cloud
    Storage:

    >>> from struct import unpack
    >>> with hadoop_open('gs://my-bucket/notes.txt', 'rb') as f: # doctest: +SKIP
    ...     print(unpack('<f', bytearray(f.read())))

    Notes
    -----
    The supported modes are:

    - ``'r'`` -- Readable text file (:class:`io.TextIOWrapper`). Default behavior.
    - ``'w'`` -- Writable text file (:class:`io.TextIOWrapper`).
    - ``'x'`` -- Exclusive writable text file (:class:`io.TextIOWrapper`).
      Throws an error if a file already exists at the path.
    - ``'rb'`` -- Readable binary file (:class:`io.BufferedReader`).
    - ``'wb'`` -- Writable binary file (:class:`io.BufferedWriter`).
    - ``'xb'`` -- Exclusive writable binary file (:class:`io.BufferedWriter`).
      Throws an error if a file already exists at the path.

    The provided destination file path must be a URI (uniform resource identifier).

    .. caution::

        These file handles are slower than standard Python file handles. If you
        are writing a large file (larger than ~50 MB), it will be faster to write
        to a local file using standard Python I/O and use :func:`.hadoop_copy`
        to move your file to a distributed file system.

    Parameters
    ----------
    path : :class:`str`
        Path to file.
    mode : :class:`str`
        File access mode.
    buffer_size : :obj:`int`
        Buffer size, in bytes.

    Returns
    -------
    Readable or writable file handle.
    """
    # pile of hacks to preserve some legacy behavior, like auto gzip
    fs = Env.fs()
    if isinstance(fs, HadoopFS):
        return fs.legacy_open(path, mode, buffer_size)
    _, ext = os.path.splitext(path)
    if ext in ('.gz', '.bgz'):
        binary_mode = 'wb' if mode[0] == 'w' else 'rb'
        file = fs.open(path, binary_mode, buffer_size)
        file = gzip.GzipFile(fileobj=file, mode=mode)
        if 'b' not in mode:
            file = io.TextIOWrapper(file, encoding='utf-8')
    else:
        file = fs.open(path, mode, buffer_size)
    return file
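

# Illustrative usage sketch (not part of this module's API; the bucket path is
# hypothetical). It exercises the auto-gzip branch above: a '.gz' or '.bgz'
# extension makes hadoop_open wrap the stream in gzip.GzipFile and, in text
# mode, io.TextIOWrapper.
def _example_count_bgz_lines(path: str = 'gs://my-bucket/variants.tsv.bgz') -> int:
    n = 0
    with hadoop_open(path) as f:  # default mode 'r': decompressed text
        for _ in f:
            n += 1
    return n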


@typecheck(src=str, dest=str)
def hadoop_copy(src, dest):
    """Copy a file through the Hadoop filesystem API.
    Supports distributed file systems like hdfs, gs, and s3.

    Examples
    --------
    Copy a file from Google Cloud Storage to a local file:

    >>> hadoop_copy('gs://hail-common/LCR.interval_list',
    ...             'file:///mnt/data/LCR.interval_list')  # doctest: +SKIP

    Notes
    -----
    Try using :func:`.hadoop_open` first; it's simpler, but not great
    for large data! For example:

    >>> with hadoop_open('gs://my_bucket/results.csv', 'w') as f: #doctest: +SKIP
    ...     pandas_df.to_csv(f)

    The provided source and destination file paths must be URIs
    (uniform resource identifiers).

    Parameters
    ----------
    src: :class:`str`
        Source file URI.
    dest: :class:`str`
        Destination file URI.
    """
    return Env.fs().copy(src, dest)
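

# Illustrative sketch (not part of this module's API; paths are hypothetical).
# Following the caution in hadoop_open, large outputs are faster to write with
# standard local I/O and then move with hadoop_copy.
def _example_stage_then_copy(local_path: str = '/tmp/results.csv',
                             dest: str = 'gs://my-bucket/results.csv') -> None:
    with open(local_path, 'w') as f:
        f.write('id,score\n')  # stand-in for a large local write
    hadoop_copy('file://' + local_path, dest)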


def hadoop_exists(path: str) -> bool:
    """Returns ``True`` if `path` exists.

    Parameters
    ----------
    path : :class:`str`

    Returns
    -------
    :obj:`.bool`
    """
    return Env.fs().exists(path)


def hadoop_is_file(path: str) -> bool:
    """Returns ``True`` if `path` both exists and is a file.

    Parameters
    ----------
    path : :class:`str`

    Returns
    -------
    :obj:`.bool`
    """
    return Env.fs().is_file(path)


def hadoop_is_dir(path: str) -> bool:
    """Returns ``True`` if `path` both exists and is a directory.

    Parameters
    ----------
    path : :class:`str`

    Returns
    -------
    :obj:`.bool`
    """
    return Env.fs().is_dir(path)
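

# Illustrative sketch (not part of this module's API; the path is hypothetical):
# combining the three predicates above to classify a path.
def _example_classify_path(path: str = 'gs://my-bucket/data') -> str:
    if not hadoop_exists(path):
        return 'missing'
    if hadoop_is_file(path):
        return 'file'
    if hadoop_is_dir(path):
        return 'directory'
    return 'other'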


def hadoop_stat(path: str) -> Dict[str, Any]:
    """Returns information about the file or directory at a given path.

    Notes
    -----
    Raises an error if `path` does not exist.

    The resulting dictionary contains the following data:

    - is_dir (:obj:`bool`) -- Path is a directory.
    - size_bytes (:obj:`int`) -- Size in bytes.
    - size (:class:`str`) -- Size as a readable string.
    - modification_time (:class:`str`) -- Time of last file modification.
    - owner (:class:`str`) -- Owner.
    - path (:class:`str`) -- Path.

    Parameters
    ----------
    path : :class:`str`

    Returns
    -------
    :obj:`dict`
    """
    return Env.fs().stat(path).to_legacy_dict()
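

# Illustrative sketch (not part of this module's API; the path is hypothetical):
# reading fields from the legacy stat dictionary documented above.
def _example_describe(path: str = 'gs://my-bucket/data.tsv') -> str:
    st = hadoop_stat(path)
    return f"{st['path']}: {st['size']} ({st['size_bytes']} bytes), modified {st['modification_time']}"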


def hadoop_ls(path: str) -> List[Dict[str, Any]]:
    """Returns information about files at `path`.

    Notes
    -----
    Raises an error if `path` does not exist.

    If `path` is a file, returns a list with one element. If `path` is a
    directory, returns an element for each file contained in `path` (does not
    search recursively).

    Each dict element of the result list contains the following data:

    - is_dir (:obj:`bool`) -- Path is a directory.
    - size_bytes (:obj:`int`) -- Size in bytes.
    - size (:class:`str`) -- Size as a readable string.
    - modification_time (:class:`str`) -- Time of last file modification.
    - owner (:class:`str`) -- Owner.
    - path (:class:`str`) -- Path.

    Parameters
    ----------
    path : :class:`str`

    Returns
    -------
    :obj:`list` [:obj:`dict`]
    """
    return [sr.to_legacy_dict() for sr in Env.fs().ls(path)]
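

# Illustrative sketch (not part of this module's API; the directory and
# extension are hypothetical): since hadoop_ls is non-recursive, a single call
# lists one directory level, which can then be filtered client-side.
def _example_list_by_extension(dir_path: str = 'gs://my-bucket/vcfs',
                               ext: str = '.vcf.bgz') -> List[str]:
    return [entry['path']
            for entry in hadoop_ls(dir_path)
            if not entry['is_dir'] and entry['path'].endswith(ext)]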


def hadoop_scheme_supported(scheme: str) -> bool:
    """Returns ``True`` if the Hadoop filesystem supports URLs with the given
    scheme.

    Examples
    --------
    >>> hadoop_scheme_supported('gs') # doctest: +SKIP

    Notes
    -----
    URLs with the `https` scheme are only supported if they are specifically
    Azure Blob Storage URLs of the form
    `https://<ACCOUNT_NAME>.blob.core.windows.net/<CONTAINER_NAME>/<PATH>`

    Parameters
    ----------
    scheme : :class:`str`

    Returns
    -------
    :obj:`.bool`
    """
    return Env.fs().supports_scheme(scheme)
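

# Illustrative sketch (not part of this module's API): checking a preferred
# scheme before constructing output URLs, falling back to local 'file' paths.
def _example_pick_scheme(candidates=('gs', 's3', 'hdfs')) -> str:
    for scheme in candidates:
        if hadoop_scheme_supported(scheme):
            return scheme
    return 'file'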


def copy_log(path: str) -> None:
    """Attempt to copy the session log to a hadoop-API-compatible location.

    Examples
    --------
    Specify a manual path:

    >>> hl.copy_log('gs://my-bucket/analysis-10-jan19.log')  # doctest: +SKIP
    INFO: copying log to 'gs://my-bucket/analysis-10-jan19.log'...

    Copy to a directory:

    >>> hl.copy_log('gs://my-bucket/')  # doctest: +SKIP
    INFO: copying log to 'gs://my-bucket/hail-20180924-2018-devel-46e5fad57524.log'...

    Notes
    -----
    Since Hail cannot currently log directly to distributed file systems, this
    function is provided as a utility for offloading logs from ephemeral nodes.

    If `path` is a directory, then the log file will be copied using its
    base name to the directory (e.g. ``/home/hail.log`` would be copied as
    ``gs://my-bucket/hail.log`` if `path` is ``gs://my-bucket``).

    Parameters
    ----------
    path: :class:`str`
    """
    from hail.utils import local_path_uri

    log = os.path.realpath(Env.hc()._log)
    try:
        if hadoop_is_dir(path):
            _, tail = os.path.split(log)
            path = os.path.join(path, tail)

        info(f"copying log to {path!r}...")
        hadoop_copy(local_path_uri(log), path)
    except Exception as e:
        sys.stderr.write(f'Could not copy log: encountered error:\n {e}')
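

# Illustrative sketch (not part of this module's API; the bucket is
# hypothetical): passing a directory to copy_log keeps the log's base name,
# as described in the notes above.
def _example_archive_log(log_dir: str = 'gs://my-bucket/logs/') -> None:
    copy_log(log_dir)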