import os
import zipfile
from typing import Optional
from urllib.request import urlretrieve
import hail as hl
from hailtop.utils import sync_retry_transient_errors
from .java import Env, info
from .misc import local_path_uri, new_local_temp_dir, new_temp_file
__all__ = ['get_1kg', 'get_hgdp', 'get_movie_lens']
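# Download URLs for the tutorial datasets fetched by the functions in this module.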
resources = {
'1kg_annotations': 'https://storage.googleapis.com/hail-tutorial/1kg_annotations.txt',
'1kg_matrix_table': 'https://storage.googleapis.com/hail-tutorial/1kg.vcf.bgz',
'1kg_ensembl_gene_annotations': 'https://storage.googleapis.com/hail-tutorial/ensembl_gene_annotations.txt',
'HGDP_annotations': 'https://storage.googleapis.com/hail-tutorial/hgdp/hgdp_pop_and_sex_annotations.tsv',
'HGDP_matrix_table': 'https://storage.googleapis.com/hail-tutorial/hgdp/hgdp_subset.vcf.bgz',
'HGDP_ensembl_gene_annotations': 'https://storage.googleapis.com/hail-tutorial/hgdp/hgdp_gene_annotations.tsv',
'movie_lens_100k': 'https://files.grouplens.org/datasets/movielens/ml-100k.zip',
}
# Path of a local temporary directory shared by the download helpers; created lazily.
tmp_dir: Optional[str] = None


def init_temp_dir():
    global tmp_dir
    if tmp_dir is None:
        tmp_dir = new_local_temp_dir()
def _dir_exists(fs, path):
return fs.exists(path) and fs.is_dir(path)
def _file_exists(fs, path):
return fs.exists(path) and fs.is_file(path)
def _copy_to_tmp(fs, src, extension=None):
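    # Copy ``src`` into a fresh Hail temporary file so the backend (e.g. cluster
    # workers) can read it, and return the new path.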
dst = new_temp_file(extension=extension)
fs.copy(src, dst)
return dst
def get_1kg(output_dir, overwrite: bool = False):
"""Download subset of the `1000 Genomes <http://www.internationalgenome.org/>`__
dataset and sample annotations.
Notes
-----
The download is about 15M.
Parameters
----------
output_dir
Directory in which to write data.
overwrite
If ``True``, overwrite any existing files/directories at `output_dir`.
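
    Examples
    --------
    Illustrative usage sketch (this assumes the function is exposed as
    ``hl.utils.get_1kg`` and that ``data/`` is a writable directory):

    >>> hl.utils.get_1kg('data/')  # doctest: +SKIP
    >>> mt = hl.read_matrix_table('data/1kg.mt')  # doctest: +SKIP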
"""
fs = Env.fs()
if not _dir_exists(fs, output_dir):
fs.mkdir(output_dir)
matrix_table_path = os.path.join(output_dir, '1kg.mt')
vcf_path = os.path.join(output_dir, '1kg.vcf.bgz')
sample_annotations_path = os.path.join(output_dir, '1kg_annotations.txt')
gene_annotations_path = os.path.join(output_dir, 'ensembl_gene_annotations.txt')
if (
overwrite
or not _dir_exists(fs, matrix_table_path)
or not _file_exists(fs, sample_annotations_path)
or not _file_exists(fs, vcf_path)
or not _file_exists(fs, gene_annotations_path)
):
init_temp_dir()
tmp_vcf = os.path.join(tmp_dir, '1kg.vcf.bgz')
source = resources['1kg_matrix_table']
        info(f'downloading 1KG VCF ...\n Source: {source}')
        sync_retry_transient_errors(urlretrieve, source, tmp_vcf)
cluster_readable_vcf = _copy_to_tmp(fs, local_path_uri(tmp_vcf), extension='vcf.bgz')
info('importing VCF and writing to matrix table...')
hl.import_vcf(cluster_readable_vcf, min_partitions=16).write(matrix_table_path, overwrite=True)
tmp_sample_annot = os.path.join(tmp_dir, '1kg_annotations.txt')
source = resources['1kg_annotations']
        info(f'downloading 1KG annotations ...\n Source: {source}')
sync_retry_transient_errors(urlretrieve, source, tmp_sample_annot)
tmp_gene_annot = os.path.join(tmp_dir, 'ensembl_gene_annotations.txt')
source = resources['1kg_ensembl_gene_annotations']
        info(f'downloading Ensembl gene annotations ...\n Source: {source}')
sync_retry_transient_errors(urlretrieve, source, tmp_gene_annot)
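        # Keep the raw downloads in the output directory alongside the matrix table.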
hl.hadoop_copy(local_path_uri(tmp_sample_annot), sample_annotations_path)
hl.hadoop_copy(local_path_uri(tmp_gene_annot), gene_annotations_path)
hl.hadoop_copy(local_path_uri(tmp_vcf), vcf_path)
info('Done!')
else:
info('1KG files found')
def get_hgdp(output_dir, overwrite: bool = False):
"""Download subset of the `Human Genome Diversity Panel
<https://www.internationalgenome.org/data-portal/data-collection/hgdp/>`__
dataset and sample annotations.
Notes
-----
The download is about 30MB.
Parameters
----------
output_dir
Directory in which to write data.
overwrite
If ``True``, overwrite any existing files/directories at `output_dir`.
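
    Examples
    --------
    Illustrative usage sketch (this assumes the function is exposed as
    ``hl.utils.get_hgdp`` and that ``data/`` is a writable directory):

    >>> hl.utils.get_hgdp('data/')  # doctest: +SKIP
    >>> mt = hl.read_matrix_table('data/HGDP.mt')  # doctest: +SKIP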
"""
fs = Env.fs()
if not _dir_exists(fs, output_dir):
fs.mkdir(output_dir)
matrix_table_path = os.path.join(output_dir, 'HGDP.mt')
vcf_path = os.path.join(output_dir, 'HGDP.vcf.bgz')
sample_annotations_path = os.path.join(output_dir, 'HGDP_annotations.txt')
gene_annotations_path = os.path.join(output_dir, 'ensembl_gene_annotations.txt')
if (
overwrite
or not _dir_exists(fs, matrix_table_path)
or not _file_exists(fs, sample_annotations_path)
or not _file_exists(fs, vcf_path)
or not _file_exists(fs, gene_annotations_path)
):
init_temp_dir()
tmp_vcf = os.path.join(tmp_dir, 'HGDP.vcf.bgz')
source = resources['HGDP_matrix_table']
        info(f'downloading HGDP VCF ...\n Source: {source}')
        sync_retry_transient_errors(urlretrieve, source, tmp_vcf)
cluster_readable_vcf = _copy_to_tmp(fs, local_path_uri(tmp_vcf), extension='vcf.bgz')
info('importing VCF and writing to matrix table...')
hl.import_vcf(cluster_readable_vcf, min_partitions=16, reference_genome='GRCh38').write(
matrix_table_path, overwrite=True
)
tmp_sample_annot = os.path.join(tmp_dir, 'HGDP_annotations.txt')
source = resources['HGDP_annotations']
        info(f'downloading HGDP annotations ...\n Source: {source}')
sync_retry_transient_errors(urlretrieve, source, tmp_sample_annot)
tmp_gene_annot = os.path.join(tmp_dir, 'ensembl_gene_annotations.txt')
source = resources['HGDP_ensembl_gene_annotations']
        info(f'downloading Ensembl gene annotations ...\n Source: {source}')
sync_retry_transient_errors(urlretrieve, source, tmp_gene_annot)
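        # Keep the raw downloads in the output directory alongside the matrix table.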
hl.hadoop_copy(local_path_uri(tmp_sample_annot), sample_annotations_path)
hl.hadoop_copy(local_path_uri(tmp_gene_annot), gene_annotations_path)
hl.hadoop_copy(local_path_uri(tmp_vcf), vcf_path)
info('Done!')
else:
info('HGDP files found')
def get_movie_lens(output_dir, overwrite: bool = False):
"""Download public Movie Lens dataset.
Notes
-----
The download is about 6M.
See the
`MovieLens website <https://grouplens.org/datasets/movielens/100k/>`__
for more information about this dataset.
Parameters
----------
output_dir
Directory in which to write data.
overwrite
If ``True``, overwrite existing files/directories at those locations.
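
    Examples
    --------
    Illustrative usage sketch (this assumes the function is exposed as
    ``hl.utils.get_movie_lens`` and that ``data/`` is a writable directory):

    >>> hl.utils.get_movie_lens('data/')  # doctest: +SKIP
    >>> users = hl.read_table('data/users.ht')  # doctest: +SKIP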
"""
fs = Env.fs()
if not _dir_exists(fs, output_dir):
fs.mkdir(output_dir)
paths = [os.path.join(output_dir, x) for x in ['movies.ht', 'ratings.ht', 'users.ht']]
if overwrite or any(not _dir_exists(fs, f) for f in paths):
init_temp_dir()
source = resources['movie_lens_100k']
tmp_path = os.path.join(tmp_dir, 'ml-100k.zip')
        info(f'downloading MovieLens-100k data ...\n Source: {source}')
sync_retry_transient_errors(urlretrieve, source, tmp_path)
with zipfile.ZipFile(tmp_path, 'r') as z:
z.extractall(tmp_dir)
user_table_path = os.path.join(tmp_dir, 'ml-100k', 'u.user')
movie_table_path = os.path.join(tmp_dir, 'ml-100k', 'u.item')
ratings_table_path = os.path.join(tmp_dir, 'ml-100k', 'u.data')
assert os.path.exists(user_table_path)
assert os.path.exists(movie_table_path)
assert os.path.exists(ratings_table_path)
user_cluster_readable = _copy_to_tmp(fs, local_path_uri(user_table_path), extension='txt')
        movie_cluster_readable = _copy_to_tmp(fs, local_path_uri(movie_table_path), extension='txt')
        ratings_cluster_readable = _copy_to_tmp(fs, local_path_uri(ratings_table_path), extension='txt')
[movies_path, ratings_path, users_path] = paths
genres = [
'Action',
'Adventure',
'Animation',
"Children's",
'Comedy',
'Crime',
'Documentary',
'Drama',
'Fantasy',
'Film-Noir',
'Horror',
'Musical',
'Mystery',
'Romance',
'Sci-Fi',
'Thriller',
'War',
'Western',
]
# utility functions for importing movies
def field_to_array(ds, field):
return hl.if_else(ds[field] != 0, hl.array([field]), hl.empty_array(hl.tstr))
def fields_to_array(ds, fields):
return hl.flatten(hl.array([field_to_array(ds, f) for f in fields]))
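        # fields_to_array turns the 0/1 genre indicator columns into a list of genre
        # names, e.g. a movie with Action == 1 and Comedy == 1 (all other genre
        # columns 0) becomes ['Action', 'Comedy'].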
def rename_columns(ht, new_names):
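            # Rename row fields positionally: the i-th existing field takes the i-th new name.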
return ht.rename({k: v for k, v in zip(ht.row, new_names)})
info(f'importing users table and writing to {users_path} ...')
users = rename_columns(
hl.import_table(user_cluster_readable, key=['f0'], no_header=True, impute=True, delimiter='|'),
['id', 'age', 'sex', 'occupation', 'zipcode'],
)
users.write(users_path, overwrite=True)
info(f'importing movies table and writing to {movies_path} ...')
movies = hl.import_table(movie_cluster_readable, key=['f0'], no_header=True, impute=True, delimiter='|')
movies = rename_columns(
movies, ['id', 'title', 'release date', 'video release date', 'IMDb URL', 'unknown', *genres]
)
movies = movies.drop('release date', 'video release date', 'unknown', 'IMDb URL')
movies = movies.transmute(genres=fields_to_array(movies, genres))
movies.write(movies_path, overwrite=True)
info(f'importing ratings table and writing to {ratings_path} ...')
ratings = hl.import_table(ratings_cluster_readable, no_header=True, impute=True)
ratings = rename_columns(ratings, ['user_id', 'movie_id', 'rating', 'timestamp'])
ratings = ratings.drop('timestamp')
ratings.write(ratings_path, overwrite=True)
else:
        info('MovieLens files found')