Batch

class hailtop.batch.batch.Batch(name=None, backend=None, attributes=None, requester_pays_project=None, default_image=None, default_memory=None, default_cpu=None, default_storage=None, default_regions=None, default_timeout=None, default_shell=None, default_python_image=None, default_spot=None, project=None, cancel_after_n_failures=None)

Bases: object

Object representing the distributed acyclic graph (DAG) of jobs to run.

Examples

Create a batch object:

>>> import hailtop.batch as hb
>>> p = hb.Batch()

Create a new job that prints “hello”:

>>> t = p.new_job()
>>> t.command(f'echo "hello" ')

Execute the DAG:

>>> p.run()

Require all jobs in this batch to execute in us-central1:

>>> b = hb.Batch(backend=hb.ServiceBackend(), default_regions=['us-central1'])

Notes

The methods Batch.read_input() and Batch.read_input_group() are for adding input files to a batch. An input file is a file that already exists before executing a batch and is not present in the docker container the job is being run in.

Files generated by executing a job are temporary files and must be written to a permanent location using the method Batch.write_output().

Parameters:
  • name (Optional[str]) – Name of the batch.

  • backend (Union[LocalBackend, ServiceBackend, None]) – Backend used to execute the jobs. If no backend is specified, a backend will be created by first looking at the environment variable HAIL_BATCH_BACKEND, then the hailctl config variable batch/backend. These configurations, if set, can be either local or service, and will result in the use of a LocalBackend and ServiceBackend respectively. If no argument is given and no configurations are set, the default is LocalBackend.

  • attributes (Optional[Dict[str, str]]) – Key-value pairs of additional attributes. ‘name’ is not a valid keyword. Use the name argument instead.

  • requester_pays_project (Optional[str]) – The name of the Google project to be billed when accessing requester pays buckets.

  • default_image (Optional[str]) – Default docker image to use for Bash jobs. This must be the full name of the image including any repository prefix and tags if desired (default tag is latest).

  • default_memory (Union[str, int, None]) – Memory setting to use by default if not specified by a job. Only applicable if a docker image is specified for the LocalBackend or the ServiceBackend. See Job.memory().

  • default_cpu (Union[str, int, float, None]) – CPU setting to use by default if not specified by a job. Only applicable if a docker image is specified for the LocalBackend or the ServiceBackend. See Job.cpu().

  • default_storage (Union[str, int, None]) – Storage setting to use by default if not specified by a job. Only applicable for the ServiceBackend. See Job.storage().

  • default_regions (Optional[List[str]]) – Cloud regions in which jobs may run. When unspecified or None, use the regions attribute of ServiceBackend. See ServiceBackend for details.

  • default_timeout (Union[int, float, None]) – Maximum time in seconds for a job to run before being killed. Only applicable for the ServiceBackend. If None, there is no timeout.

  • default_python_image (Optional[str]) – Default image to use for all Python jobs. This must be the full name of the image including any repository prefix and tags if desired (default tag is latest). The image must have the dill Python package installed and have the same version of Python installed that is currently running. If None, a tag of the hailgenetics/hail image will be chosen according to the current Hail and Python version.

  • default_spot (Optional[bool]) – If unspecified or True, jobs will run by default on spot instances. If False, jobs will run by default on non-spot instances. Each job can override this setting with Job.spot().

  • project (Optional[str]) – DEPRECATED: please specify google_project on the ServiceBackend instead. If specified, the project to use when authenticating with Google Storage. Google Storage is used to transfer serialized values between this computer and the cloud machines that execute Python jobs.

  • cancel_after_n_failures (Optional[int]) – Automatically cancel the batch after N failures have occurred. The default behavior is there is no limit on the number of failures. Only applicable for the ServiceBackend. Must be greater than 0.

Methods

from_batch_id

Create a Batch from an existing batch id.

new_bash_job

Initialize a BashJob object with default memory, storage, image, and CPU settings (defined in Batch) upon batch creation.

new_job

Alias for Batch.new_bash_job()

new_python_job

Initialize a new PythonJob object with default Python image, memory, storage, and CPU settings (defined in Batch) upon batch creation.

read_input

Create a new input resource file object representing a single file.

read_input_group

Create a new resource group representing a mapping of identifier to input resource files.

run

Execute a batch.

select_jobs

Select all jobs in the batch whose name matches pattern.

write_output

Write resource file or resource file group to an output destination.

static from_batch_id(batch_id, *args, **kwargs)

Create a Batch from an existing batch id.

Notes

Can only be used with the ServiceBackend.

Examples

Create a batch object from an existing batch id:

>>> b = Batch.from_batch_id(1)  
Parameters:

batch_id (int) – ID of an existing Batch

Return type:

Batch

Returns:

A Batch object that can append jobs to an existing batch.

new_bash_job(name=None, attributes=None, shell=None)

Initialize a BashJob object with default memory, storage, image, and CPU settings (defined in Batch) upon batch creation.

Examples

Create and execute a batch b with one job j that prints “hello world”:

>>> b = Batch()
>>> j = b.new_bash_job(name='hello', attributes={'language': 'english'})
>>> j.command('echo "hello world"')
>>> b.run()
Parameters:
  • name (Optional[str]) – Name of the job.

  • attributes (Optional[Dict[str, str]]) – Key-value pairs of additional attributes. ‘name’ is not a valid keyword. Use the name argument instead.

Return type:

BashJob

new_job(name=None, attributes=None, shell=None)

Alias for Batch.new_bash_job()

Return type:

BashJob

new_python_job(name=None, attributes=None)

Initialize a new PythonJob object with default Python image, memory, storage, and CPU settings (defined in Batch) upon batch creation.

Examples

Create and execute a batch b with one job j that prints “hello alice”:

b = Batch(default_python_image='hailgenetics/python-dill:3.9-slim')

def hello(name):
    return f'hello {name}'

j = b.new_python_job()
output = j.call(hello, 'alice')

# Write out the str representation of result to a file

b.write_output(output.as_str(), 'hello.txt')

b.run()

Notes

The image to use for Python jobs can be specified by default_python_image when constructing a Batch. The image specified must have the dill package installed. If default_python_image is not specified, then a Docker image will automatically be created for you with the base image hailgenetics/python-dill:[major_version].[minor_version]-slim and the Python packages specified by python_requirements will be installed. The default name of the image is batch-python with a random string for the tag unless python_build_image_name is specified. If the ServiceBackend is the backend, the locally built image will be pushed to the repository specified by image_repository.

Parameters:
  • name (Optional[str]) – Name of the job.

  • attributes (Optional[Dict[str, str]]) – Key-value pairs of additional attributes. ‘name’ is not a valid keyword. Use the name argument instead.

Return type:

PythonJob

read_input(path)

Create a new input resource file object representing a single file.

Warning

To avoid expensive egress charges, input files should be located in buckets that are multi-regional in the United States because Batch runs jobs in any US region.

Examples

Read the file hello.txt:

>>> b = Batch()
>>> input = b.read_input('data/hello.txt')
>>> j = b.new_job()
>>> j.command(f'cat {input}')
>>> b.run()
Parameters:

path (str) – File path to read.

Return type:

InputResourceFile

read_input_group(**kwargs)

Create a new resource group representing a mapping of identifier to input resource files.

Warning

To avoid expensive egress charges, input files should be located in buckets that are multi-regional in the United States because Batch runs jobs in any US region.

Examples

Read a binary PLINK file:

>>> b = Batch()
>>> bfile = b.read_input_group(bed="data/example.bed",
...                            bim="data/example.bim",
...                            fam="data/example.fam")
>>> j = b.new_job()
>>> j.command(f"plink --bfile {bfile} --geno --make-bed --out {j.geno}")
>>> j.command(f"wc -l {bfile.fam}")
>>> j.command(f"wc -l {bfile.bim}")
>>> b.run() 

Read a FASTA file and it’s index (file extensions matter!):

>>> fasta = b.read_input_group(**{'fasta': 'data/example.fasta',
...                               'fasta.idx': 'data/example.fasta.idx'})

Create a resource group where the identifiers don’t match the file extensions:

>>> rg = b.read_input_group(foo='data/foo.txt',
...                         bar='data/bar.txt')

rg.foo and rg.bar will not have the .txt file extension and instead will be {root}.foo and {root}.bar where {root} is a random identifier.

Notes

The identifier is used to refer to a specific resource file. For example, given the resource group rg, you can use the attribute notation rg.identifier or the get item notation rg[identifier].

The file extensions for each file are derived from the identifier. This is equivalent to “{root}.identifier” from BashJob.declare_resource_group(). We are planning on adding flexibility to incorporate more complicated extensions in the future such as .vcf.bgz. For now, use JobResourceFile.add_extension() to add an extension to a resource file.

Parameters:

kwargs (str) – Key word arguments where the name/key is the identifier and the value is the file path.

Return type:

ResourceGroup

run(dry_run=False, verbose=False, delete_scratch_on_exit=True, **backend_kwargs)

Execute a batch.

Examples

Create a simple batch with one job and execute it:

>>> b = Batch()
>>> j = b.new_job()
>>> j.command('echo "hello world"')
>>> b.run()
Parameters:
  • dry_run (bool) – If True, don’t execute code.

  • verbose (bool) – If True, print debugging output.

  • delete_scratch_on_exit (bool) – If True, delete temporary directories with intermediate files.

  • backend_kwargs (Any) – See Backend._run() for backend-specific arguments.

Return type:

Optional[Batch]

select_jobs(pattern)

Select all jobs in the batch whose name matches pattern.

Examples

Select jobs in batch matching qc:

>>> b = Batch()
>>> j = b.new_job(name='qc')
>>> qc_jobs = b.select_jobs('qc')
>>> assert qc_jobs == [j]
Parameters:

pattern (str) – Regex pattern matching job names.

Return type:

List[Job]

write_output(resource, dest)

Write resource file or resource file group to an output destination.

Examples

Write a single job intermediate to a local file:

>>> b = Batch()
>>> j = b.new_job()
>>> j.command(f'echo "hello" > {j.ofile}')
>>> b.write_output(j.ofile, 'output/hello.txt')
>>> b.run()

Write a single job intermediate to a permanent location in GCS:

b = Batch()
j = b.new_job()
j.command(f'echo "hello" > {j.ofile}')
b.write_output(j.ofile, 'gs://mybucket/output/hello.txt')
b.run()

Write a single job intermediate to a permanent location in Azure:

b = Batch()
j = b.new_job()
j.command(f'echo "hello" > {j.ofile}')
b.write_output(j.ofile, 'https://my-account.blob.core.windows.net/my-container/output/hello.txt')
b.run()  # doctest: +SKIP

Warning

To avoid expensive egress charges, output files should be located in buckets that are multi-regional in the United States because Batch runs jobs in any US region.

Notes

All JobResourceFile are temporary files and must be written to a permanent location using write_output() if the output needs to be saved.

Parameters:
  • resource (Resource) – Resource to be written to a file.

  • dest (str) – Destination file path. For a single ResourceFile, this will simply be dest. For a ResourceGroup, dest is the file root and each resource file will be written to {root}.identifier where identifier is the identifier of the file in the ResourceGroup map.