Batch

class hailtop.batch.batch.Batch(name=None, backend=None, attributes=None, requester_pays_project=None, default_image=None, default_memory=None, default_cpu=None, default_storage=None, default_timeout=None, default_shell=None, default_python_image=None, project=None, cancel_after_n_failures=None)

Bases: object

Object representing the directed acyclic graph (DAG) of jobs to run.

Examples

Create a batch object:

>>> p = Batch()

Create a new job that prints “hello”:

>>> t = p.new_job()
>>> t.command('echo "hello"')

Execute the DAG:

>>> p.run()
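
To make the DAG idea concrete, the following standard-library sketch orders a set of jobs so that each one comes after the jobs it depends on. It does not use hailtop; the job names and the `dependencies` mapping are hypothetical, and the ordering shown is only an illustration of what a scheduler must respect, not the library's own implementation.

```python
from graphlib import TopologicalSorter

# Hypothetical job names, each mapped to the set of jobs it depends on.
dependencies = {
    "align": set(),
    "call_variants": {"align"},
    "qc": {"align"},
    "report": {"call_variants", "qc"},
}

# static_order() yields each job only after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Jobs with no path between them (here `call_variants` and `qc`) may appear in either order, which is what allows a backend to run them in parallel.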

Notes

The methods Batch.read_input() and Batch.read_input_group() add input files to a batch. An input file is a file that already exists before the batch is executed and is not present in the Docker container in which the job runs.

Files generated by executing a job are temporary files and must be written to a permanent location using the method Batch.write_output().

Parameters
  • name (Optional[str]) – Name of the batch.

  • backend (Optional[Backend]) – Backend used to execute the jobs. Default is LocalBackend.

  • attributes (Optional[Dict[str, str]]) – Key-value pairs of additional attributes. ‘name’ is not a valid keyword. Use the name argument instead.

  • requester_pays_project (Optional[str]) – The name of the Google project to be billed when accessing requester pays buckets.

  • default_image (Optional[str]) – Default Docker image to use for Bash jobs. This must be the full name of the image including any repository prefix and tags if desired (default tag is latest).

  • default_memory (Union[str, int, None]) – Memory setting to use by default if not specified by a job. Only applicable if a Docker image is specified for the LocalBackend or the ServiceBackend. See Job.memory().

  • default_cpu (Union[str, int, float, None]) – CPU setting to use by default if not specified by a job. Only applicable if a Docker image is specified for the LocalBackend or the ServiceBackend. See Job.cpu().

  • default_storage (Union[str, int, None]) – Storage setting to use by default if not specified by a job. Only applicable for the ServiceBackend. See Job.storage().

  • default_timeout (Union[int, float, None]) – Maximum time in seconds for a job to run before being killed. Only applicable for the ServiceBackend. If None, there is no timeout.

  • default_python_image (Optional[str]) – Default image to use for all Python jobs. This must be the full name of the image including any repository prefix and tags if desired (default tag is latest). The image must have the dill Python package installed and have the same version of Python installed that is currently running. If None, a compatible Python image with dill pre-installed will automatically be used if the current Python version is 3.6, 3.7, or 3.8.

  • project (Optional[str]) – DEPRECATED: please specify google_project on the ServiceBackend instead. If specified, the project to use when authenticating with Google Storage. Google Storage is used to transfer serialized values between this computer and the cloud machines that execute Python jobs.

  • cancel_after_n_failures (Optional[int]) – Automatically cancel the batch after N failures have occurred. Must be greater than 0. By default, there is no limit on the number of failures. Only applicable for the ServiceBackend.
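
The behavior described for cancel_after_n_failures can be pictured with a small sequential model. This is a sketch under stated assumptions, not the ServiceBackend's implementation: the function `run_with_failure_limit`, the status strings, and the one-job-at-a-time execution are all hypothetical, whereas a real backend runs jobs concurrently.

```python
from typing import Callable, List, Optional, Tuple


def run_with_failure_limit(
    jobs: List[Tuple[str, Callable[[], bool]]],
    cancel_after_n_failures: Optional[int] = None,
) -> List[Tuple[str, str]]:
    """Run jobs in order; mark the remainder cancelled once the limit is hit."""
    if cancel_after_n_failures is not None and cancel_after_n_failures <= 0:
        raise ValueError("cancel_after_n_failures must be greater than 0")

    failures = 0
    statuses = []
    for name, job in jobs:
        # None means no limit on the number of failures.
        if cancel_after_n_failures is not None and failures >= cancel_after_n_failures:
            statuses.append((name, "cancelled"))
            continue
        succeeded = job()
        if not succeeded:
            failures += 1
        statuses.append((name, "success" if succeeded else "failed"))
    return statuses


# Hypothetical jobs: each callable returns True on success, False on failure.
jobs = [("a", lambda: True), ("b", lambda: False),
        ("c", lambda: False), ("d", lambda: True)]

limited = run_with_failure_limit(jobs, cancel_after_n_failures=2)
unlimited = run_with_failure_limit(jobs)
print(limited)
print(unlimited)
```

With a limit of 2, job `d` never runs because two failures have already occurred; with no limit, every job runs.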

Methods

new_bash_job

Initialize a BashJob object with default memory, storage, image, and CPU settings (defined in Batch) upon batch creation.

new_job

Alias for Batch.new_bash_job()

new_python_job

Initialize a new PythonJob object with default Python image, memory, storage, and CPU settings (defined in Batch) upon batch creation.

read_input

Create a new input resource file object representing a single file.

read_input_group

Create a new resource group representing a mapping of identifier to input resource files.

run

Execute a batch.

select_jobs

Select all jobs in the batch whose name matches pattern.

write_output

Write resource file or resource file group to an output destination.

new_bash_job(name=None, attributes=None, shell=None)

Initialize a BashJob object with default memory, storage, image, and CPU settings (defined in Batch) upon batch creation.

Examples

Create and execute a batch b with one job j that prints “hello world”:

>>> b = Batch()
>>> j = b.new_bash_job(name='hello', attributes={'language': 'english'})
>>> j.command('echo "hello world"')
>>> b.run()

Parameters
  • name (Optional[str]) – Name of the job.

  • attributes (Optional[Dict[str, str]]) – Key-value pairs of additional attributes. ‘name’ is not a valid keyword. Use the name argument instead.

Return type

BashJob
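
The relationship between batch-level defaults and per-job settings can be sketched as a simple fallback: a job's own value wins, and the batch default is used only where the job leaves a setting unspecified. The helper `effective_settings` and the example values are hypothetical and serve only to illustrate that precedence; they are not part of hailtop.

```python
from typing import Any, Dict


def effective_settings(
    batch_defaults: Dict[str, Any], job_overrides: Dict[str, Any]
) -> Dict[str, Any]:
    """Return the batch defaults overlaid with any non-None job settings."""
    merged = dict(batch_defaults)
    merged.update({k: v for k, v in job_overrides.items() if v is not None})
    return merged


# Placeholder values chosen only for illustration.
defaults = {"image": "ubuntu:22.04", "cpu": 1, "memory": "4Gi"}
settings = effective_settings(defaults, {"cpu": 4, "memory": None})
print(settings)
```

Here the job overrides only `cpu`, so `image` and `memory` fall back to the batch defaults.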

new_job(name=None, attributes=None, shell=None)

Alias for Batch.new_bash_job()

Return type

BashJob

new_python_job(name=None, attributes=None)

Initialize a new PythonJob object with default Python image, memory, storage, and CPU settings (defined in Batch) upon batch creation.

Examples

Create and execute a batch b with one job j that returns “hello alice” and writes it to a file:

b = Batch(default_python_image='gcr.io/hail-vdc/python-dill:3.7-slim')

def hello(name):
    return f'hello {name}'

j = b.new_python_job()
output = j.call(hello, 'alice')

# Write the str representation of the result to a file
b.write_output(output.as_str(), 'hello.txt')

b.run()

Notes

The image to use for Python jobs can be specified by default_python_image when constructing a Batch. The image specified must have the dill package installed. If default_python_image is not specified, then a Docker image will automatically be created for you with the base image hailgenetics/python-dill:[major_version].[minor_version]-slim and the Python packages specified by python_requirements will be installed. The default name of the image is batch-python with a random string for the tag unless python_build_image_name is specified. If the ServiceBackend is the backend, the locally built image will be pushed to the repository specified by image_repository.
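
The base image name pattern given above, hailgenetics/python-dill:[major_version].[minor_version]-slim, can be filled in from the running interpreter's version. The helper name `default_python_dill_image` is hypothetical; this sketch only formats the string and makes no claim that an image exists for every Python version.

```python
import sys


def default_python_dill_image(major: int, minor: int) -> str:
    """Format the base image name for a given Python major.minor version."""
    return f"hailgenetics/python-dill:{major}.{minor}-slim"


# Image name matching the interpreter that is currently running.
image = default_python_dill_image(sys.version_info.major, sys.version_info.minor)
print(image)
```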

Parameters
  • name (Optional[str]) – Name of the job.

  • attributes (Optional[Dict[str, str]]) – Key-value pairs of additional attributes. ‘name’ is not a valid keyword. Use the name argument instead.

Return type

PythonJob

read_input(path)

Create a new input resource file object representing a single file.

Warning

To avoid expensive egress charges, input files should be located in buckets that are multi-regional in the United States because Batch runs jobs in any US region.

Examples

Read the file hello.txt:

>>> b = Batch()
>>> input_file = b.read_input('data/hello.txt')
>>> j = b.new_job()
>>> j.command(f'cat {input_file}')
>>> b.run()

Parameters
  • path (str) – File path to read.

  • extension (str, optional) – File extension to use.

Return type

InputResourceFile

read_input_group(**kwargs)

Create a new resource group representing a mapping of identifier to input resource files.

Warning

To avoid expensive egress charges, input files should be located in buckets that are multi-regional in the United States because Batch runs jobs in any US region.

Examples

Read a binary PLINK file:

>>> b = Batch()
>>> bfile = b.read_input_group(bed="data/example.bed",
...                            bim="data/example.bim",
...                            fam="data/example.fam")
>>> j = b.new_job()
>>> j.command(f"plink --bfile {bfile} --geno --make-bed --out {j.geno}")
>>> j.command(f"wc -l {bfile.fam}")
>>> j.command(f"wc -l {bfile.bim}")
>>> b.run() 

Read a FASTA file and its index (file extensions matter!):

>>> fasta = b.read_input_group(**{'fasta': 'data/example.fasta',
...                               'fasta.idx': 'data/example.fasta.idx'})

Create a resource group where the identifiers don’t match the file extensions:

>>> rg = b.read_input_group(foo='data/foo.txt',
...                         bar='data/bar.txt')

rg.foo and rg.bar will not have the .txt file extension and instead will be {root}.foo and {root}.bar where {root} is a random identifier.

Notes

The identifier is used to refer to a specific resource file. For example, given the resource group rg, you can use the attribute notation rg.identifier or the get item notation rg[identifier].

The file extensions for each file are derived from the identifier. This is equivalent to “{root}.identifier” from BashJob.declare_resource_group(). We are planning on adding flexibility to incorporate more complicated extensions in the future such as .vcf.bgz. For now, use JobResourceFile.add_extension() to add an extension to a resource file.
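
The naming rule and the two access notations can be modeled with a small stand-in class. `SketchResourceGroup` is hypothetical and is not hailtop's ResourceGroup; the root is fixed here for readability, whereas the text above says the real root is a random identifier.

```python
class SketchResourceGroup:
    """Toy model: each identifier maps to the path '{root}.{identifier}'."""

    def __init__(self, root: str, **identifier_to_path: str):
        self._root = root
        # The original file name is ignored; only the identifier is kept.
        self._files = {ident: f"{root}.{ident}" for ident in identifier_to_path}

    def __getitem__(self, identifier: str) -> str:
        return self._files[identifier]

    def __getattr__(self, identifier: str) -> str:
        # Only called when normal attribute lookup fails.
        if identifier.startswith("_"):
            raise AttributeError(identifier)
        try:
            return self._files[identifier]
        except KeyError:
            raise AttributeError(identifier) from None


rg = SketchResourceGroup("abc123", foo="data/foo.txt", bar="data/bar.txt")
print(rg.foo, rg["bar"])

# Identifiers containing a dot are reachable only through get item notation.
fasta = SketchResourceGroup("abc123", **{"fasta": "data/example.fasta",
                                         "fasta.idx": "data/example.fasta.idx"})
print(fasta["fasta.idx"])
```

This also shows why file extensions matter: the identifier `fasta.idx` yields a path ending in `.fasta.idx`, next to the `.fasta` file that shares the same root.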

Parameters

kwargs (str) – Keyword arguments where the name/key is the identifier and the value is the file path.

Return type

ResourceGroup

run(dry_run=False, verbose=False, delete_scratch_on_exit=True, **backend_kwargs)

Execute a batch.

Examples

Create a simple batch with one job and execute it:

>>> b = Batch()
>>> j = b.new_job()
>>> j.command('echo "hello world"')
>>> b.run()

Parameters
  • dry_run (bool) – If True, don’t execute code.

  • verbose (bool) – If True, print debugging output.

  • delete_scratch_on_exit (bool) – If True, delete temporary directories with intermediate files.

  • backend_kwargs (Any) – See Backend._run() for backend-specific arguments.

select_jobs(pattern)

Select all jobs in the batch whose name matches pattern.

Examples

Select jobs in batch matching qc:

>>> b = Batch()
>>> j = b.new_job(name='qc')
>>> qc_jobs = b.select_jobs('qc')
>>> assert qc_jobs == [j]

Parameters

pattern (str) – Regex pattern matching job names.

Return type

List[Job]
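
Because pattern is a regular expression, the exact matching rule affects which jobs are selected. The sketch below assumes matching is anchored at the start of the job name, as `re.match` does; the text above does not state whether the library uses `re.match` or `re.search`, so treat that as an assumption. The helper `select_names` and the job names are hypothetical.

```python
import re
from typing import List, Optional


def select_names(names: List[Optional[str]], pattern: str) -> List[str]:
    """Keep the names that match pattern from the start; skip unnamed jobs."""
    return [n for n in names if n is not None and re.match(pattern, n) is not None]


# Hypothetical job names; None stands for a job created without a name.
names = ["qc", "qc_filter", "post_qc", None, "align"]

prefix_matches = select_names(names, "qc")
anywhere_matches = select_names(names, r".*qc")
exact_matches = select_names(names, r"qc$")
print(prefix_matches, anywhere_matches, exact_matches)
```

Under this assumption, a bare pattern such as `qc` behaves like a prefix match, so anchoring with `$` is needed to select only the job named exactly `qc`.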

write_output(resource, dest)

Write resource file or resource file group to an output destination.

Examples

Write a single job intermediate to a local file:

>>> b = Batch()
>>> j = b.new_job()
>>> j.command(f'echo "hello" > {j.ofile}')
>>> b.write_output(j.ofile, 'output/hello.txt')
>>> b.run()

Write a single job intermediate to a permanent location in GCS:

>>> b = Batch()
>>> j = b.new_job()
>>> j.command(f'echo "hello" > {j.ofile}')
>>> b.write_output(j.ofile, 'gs://mybucket/output/hello.txt')
>>> b.run()  

Write a single job intermediate to a permanent location in Azure:

>>> b = Batch()
>>> j = b.new_job()
>>> j.command(f'echo "hello" > {j.ofile}')
>>> b.write_output(j.ofile, 'hail-az://my-account/my-container/output/hello.txt')
>>> b.run()  

Warning

To avoid expensive egress charges, output files should be located in buckets that are multi-regional in the United States because Batch runs jobs in any US region.

Notes

All JobResourceFile objects are temporary files and must be written to a permanent location using write_output() if the output needs to be saved.

Parameters
  • resource (Resource) – Resource to be written to a file.

  • dest (str) – Destination file path. For a single ResourceFile, this will simply be dest. For a ResourceGroup, dest is the file root and each resource file will be written to {root}.identifier where identifier is the identifier of the file in the ResourceGroup map.
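
For a ResourceGroup, the destination rule {root}.identifier can be spelled out as follows. The helper `group_destinations` is hypothetical and only computes the target paths described above; it does not copy any files. The bucket name is the placeholder already used in the examples.

```python
from typing import Dict, Iterable


def group_destinations(identifiers: Iterable[str], dest: str) -> Dict[str, str]:
    """Map each identifier in a resource group to '{dest}.{identifier}'."""
    return {ident: f"{dest}.{ident}" for ident in identifiers}


# dest acts as the file root shared by every file in the group.
targets = group_destinations(["bed", "bim", "fam"], "gs://mybucket/output/geno")
for identifier, path in targets.items():
    print(identifier, "->", path)
```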