Batch
- class hailtop.batch.batch.Batch(name=None, backend=None, attributes=None, requester_pays_project=None, default_image=None, default_memory=None, default_cpu=None, default_storage=None, default_regions=None, default_timeout=None, default_shell=None, default_python_image=None, default_spot=None, project=None, cancel_after_n_failures=None)
Bases: object
Object representing the directed acyclic graph (DAG) of jobs to run.
Examples
Create a batch object:
>>> import hailtop.batch as hb
>>> p = hb.Batch()
Create a new job that prints “hello”:
>>> t = p.new_job()
>>> t.command('echo "hello"')
Execute the DAG:
>>> p.run()
Require all jobs in this batch to execute in us-central1:
>>> b = hb.Batch(backend=hb.ServiceBackend(), default_regions=['us-central1'])
Notes
The methods Batch.read_input() and Batch.read_input_group() are for adding input files to a batch. An input file is a file that already exists before executing a batch and is not present in the docker container the job is being run in. Files generated by executing a job are temporary files and must be written to a permanent location using the method Batch.write_output().
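For example, a minimal sketch of this lifecycle (the file paths are illustrative):

import hailtop.batch as hb

b = hb.Batch()
inp = b.read_input('data/hello.txt')            # existing file becomes an input resource
j = b.new_job()
j.command(f'wc -l {inp} > {j.ofile}')           # j.ofile is a temporary file
b.write_output(j.ofile, 'output/counts.txt')    # persist it to a permanent location
b.run()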
- Parameters:
backend (Union[LocalBackend, ServiceBackend, None]) – Backend used to execute the jobs. If no backend is specified, a backend will be created by first looking at the environment variable HAIL_BATCH_BACKEND, then the hailctl config variable batch/backend. These configurations, if set, can be either local or service, and will result in the use of a LocalBackend or ServiceBackend respectively. If no argument is given and no configurations are set, the default is LocalBackend.
attributes (Optional[Dict[str, str]]) – Key-value pairs of additional attributes. 'name' is not a valid keyword. Use the name argument instead.
requester_pays_project (Optional[str]) – The name of the Google project to be billed when accessing requester pays buckets.
default_image (Optional[str]) – Default docker image to use for Bash jobs. This must be the full name of the image including any repository prefix and tags if desired (default tag is latest).
default_memory (Union[str, int, None]) – Memory setting to use by default if not specified by a job. Only applicable if a docker image is specified for the LocalBackend or the ServiceBackend. See Job.memory().
default_cpu (Union[str, int, float, None]) – CPU setting to use by default if not specified by a job. Only applicable if a docker image is specified for the LocalBackend or the ServiceBackend. See Job.cpu().
default_storage (Union[str, int, None]) – Storage setting to use by default if not specified by a job. Only applicable for the ServiceBackend. See Job.storage().
default_regions (Optional[List[str]]) – Cloud regions in which jobs may run. When unspecified or None, use the regions attribute of ServiceBackend. See ServiceBackend for details.
default_timeout (Union[int, float, None]) – Maximum time in seconds for a job to run before being killed. Only applicable for the ServiceBackend. If None, there is no timeout.
default_python_image (Optional[str]) – Default image to use for all Python jobs. This must be the full name of the image including any repository prefix and tags if desired (default tag is latest). The image must have the dill Python package installed and have the same version of Python installed that is currently running. If None, a tag of the hailgenetics/hail image will be chosen according to the current Hail and Python version.
default_spot (Optional[bool]) – If unspecified or True, jobs will run by default on spot instances. If False, jobs will run by default on non-spot instances. Each job can override this setting with Job.spot().
project (Optional[str]) – DEPRECATED: please specify google_project on the ServiceBackend instead. If specified, the project to use when authenticating with Google Storage. Google Storage is used to transfer serialized values between this computer and the cloud machines that execute Python jobs.
cancel_after_n_failures (Optional[int]) – Automatically cancel the batch after N failures have occurred. By default, there is no limit on the number of failures. Only applicable for the ServiceBackend. Must be greater than 0.
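As an illustration, a batch might combine several of these defaults; the image, region, and resource values below are placeholders, not recommendations:

import hailtop.batch as hb

b = hb.Batch(
    name='alignment',
    backend=hb.ServiceBackend(),       # or hb.LocalBackend()
    default_image='ubuntu:22.04',      # illustrative image
    default_cpu=2,
    default_memory='4Gi',
    default_regions=['us-central1'],
    cancel_after_n_failures=1,
)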
Methods
- from_batch_id(): Create a Batch from an existing batch id.
- new_bash_job(): Initialize a BashJob object with default memory, storage, image, and CPU settings (defined in Batch) upon batch creation.
- new_job(): Alias for Batch.new_bash_job().
- new_python_job(): Initialize a new PythonJob object with default Python image, memory, storage, and CPU settings (defined in Batch) upon batch creation.
- read_input(): Create a new input resource file object representing a single file.
- read_input_group(): Create a new resource group representing a mapping of identifier to input resource files.
- run(): Execute a batch.
- select_jobs(): Select all jobs in the batch whose name matches pattern.
- write_output(): Write resource file or resource file group to an output destination.
- static from_batch_id(batch_id, *args, **kwargs)
Create a Batch from an existing batch id.
Notes
Can only be used with the ServiceBackend.
Examples
Create a batch object from an existing batch id:
>>> b = Batch.from_batch_id(1)
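A sketch of appending work to an existing batch follows; it assumes the extra keyword arguments are forwarded to the Batch constructor (consistent with the *args, **kwargs signature), and the batch id is illustrative:

import hailtop.batch as hb

# Attach to an existing batch (id 1 is illustrative) and append a job.
b = hb.Batch.from_batch_id(1, backend=hb.ServiceBackend())
j = b.new_job(name='follow-up')
j.command('echo "appended to an existing batch"')
b.run()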
- new_bash_job(name=None, attributes=None, shell=None)
Initialize a BashJob object with default memory, storage, image, and CPU settings (defined in Batch) upon batch creation.
Examples
Create and execute a batch b with one job j that prints “hello world”:
>>> b = Batch()
>>> j = b.new_bash_job(name='hello', attributes={'language': 'english'})
>>> j.command('echo "hello world"')
>>> b.run()
- new_job(name=None, attributes=None, shell=None)
Alias for Batch.new_bash_job().
- Return type:
BashJob
- new_python_job(name=None, attributes=None)
Initialize a new PythonJob object with default Python image, memory, storage, and CPU settings (defined in Batch) upon batch creation.
Examples
Create and execute a batch b with one job j that prints “hello alice”:
b = Batch(default_python_image='hailgenetics/python-dill:3.9-slim')

def hello(name):
    return f'hello {name}'

j = b.new_python_job()
output = j.call(hello, 'alice')

# Write out the str representation of result to a file
b.write_output(output.as_str(), 'hello.txt')

b.run()
Notes
The image to use for Python jobs can be specified by default_python_image when constructing a Batch. The image specified must have the dill package installed. If default_python_image is not specified, then a Docker image will automatically be created for you with the base image hailgenetics/python-dill:[major_version].[minor_version]-slim and the Python packages specified by python_requirements will be installed. The default name of the image is batch-python with a random string for the tag unless python_build_image_name is specified. If the ServiceBackend is the backend, the locally built image will be pushed to the repository specified by image_repository.
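As a further sketch, a result returned by call() can be passed as an argument to a later call() in the same batch; the functions here are illustrative:

b = Batch(default_python_image='hailgenetics/python-dill:3.9-slim')

def add(x, y):
    return x + y

def square(x):
    return x * x

j = b.new_python_job()
total = j.call(add, 2, 3)
result = j.call(square, total)  # a previous result can be an argument
b.write_output(result.as_str(), 'output/result.txt')
b.run()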
- read_input(path)
Create a new input resource file object representing a single file.
Warning
To avoid expensive egress charges, input files should be located in buckets that are in the same region in which your Batch jobs run.
Examples
Read the file hello.txt:
>>> b = Batch()
>>> input = b.read_input('data/hello.txt')
>>> j = b.new_job()
>>> j.command(f'cat {input}')
>>> b.run()
- Parameters:
path (str) – File path to read.
- Return type:
InputResourceFile
- read_input_group(**kwargs)
Create a new resource group representing a mapping of identifier to input resource files.
Warning
To avoid expensive egress charges, input files should be located in buckets that are in the same region in which your Batch jobs run.
Examples
Read a binary PLINK file:
>>> b = Batch()
>>> bfile = b.read_input_group(bed="data/example.bed",
...                            bim="data/example.bim",
...                            fam="data/example.fam")
>>> j = b.new_job()
>>> j.command(f"plink --bfile {bfile} --geno --make-bed --out {j.geno}")
>>> j.command(f"wc -l {bfile.fam}")
>>> j.command(f"wc -l {bfile.bim}")
>>> b.run()
Read a FASTA file and its index (file extensions matter!):
>>> fasta = b.read_input_group(**{'fasta': 'data/example.fasta',
...                               'fasta.idx': 'data/example.fasta.idx'})
Create a resource group where the identifiers don’t match the file extensions:
>>> rg = b.read_input_group(foo='data/foo.txt',
...                         bar='data/bar.txt')
rg.foo and rg.bar will not have the .txt file extension and instead will be {root}.foo and {root}.bar where {root} is a random identifier.
Notes
The identifier is used to refer to a specific resource file. For example, given the resource group rg, you can use the attribute notation rg.identifier or the get item notation rg[identifier].
The file extensions for each file are derived from the identifier. This is equivalent to "{root}.identifier" from BashJob.declare_resource_group(). We are planning on adding flexibility to incorporate more complicated extensions in the future such as .vcf.bgz. For now, use JobResourceFile.add_extension() to add an extension to a resource file.
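For example, a sketch of adding a compound extension to a job output file (the bgzip command and paths are illustrative, and assume an image with bgzip installed):

b = Batch()
vcf = b.read_input('data/example.vcf')
j = b.new_job()
j.command(f'bgzip -c {vcf} > {j.ofile}')
j.ofile.add_extension('.vcf.bgz')   # give the temporary file a compound extension
b.write_output(j.ofile, 'output/example.vcf.bgz')
b.run()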
- run(dry_run=False, verbose=False, delete_scratch_on_exit=True, **backend_kwargs)
Execute a batch.
Examples
Create a simple batch with one job and execute it:
>>> b = Batch()
>>> j = b.new_job()
>>> j.command('echo "hello world"')
>>> b.run()
- Parameters:
dry_run (bool) – If True, don't execute code.
verbose (bool) – If True, print debugging output.
delete_scratch_on_exit (bool) – If True, delete temporary directories with intermediate files.
backend_kwargs (Any) – See Backend._run() for backend-specific arguments.
- Return type:
Optional[Batch]
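For instance, a quick sketch of a dry run, which shows what would execute without running it:

b = Batch()
j = b.new_job()
j.command('echo "hello world"')
b.run(dry_run=True, verbose=True)  # inspect the plan; nothing is executed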
- select_jobs(pattern)
Select all jobs in the batch whose name matches pattern.
Examples
Select jobs in batch matching qc:
>>> b = Batch()
>>> j = b.new_job(name='qc')
>>> qc_jobs = b.select_jobs('qc')
>>> assert qc_jobs == [j]
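A common pattern, sketched below, is to make a downstream job depend on every selected job; this assumes the pattern is matched against job names by prefix (as with a regular-expression match), and the job names are illustrative:

b = Batch()
for i in range(3):
    j = b.new_job(name=f'qc-{i}')
    j.command('echo "qc step"')

summary = b.new_job(name='summary')
summary.depends_on(*b.select_jobs('qc'))  # run after every qc-* job
summary.command('echo "all qc done"')
b.run()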
- write_output(resource, dest)
Write resource file or resource file group to an output destination.
Examples
Write a single job intermediate to a local file:
>>> b = Batch()
>>> j = b.new_job()
>>> j.command(f'echo "hello" > {j.ofile}')
>>> b.write_output(j.ofile, 'output/hello.txt')
>>> b.run()
Write a single job intermediate to a permanent location in GCS:
b = Batch()
j = b.new_job()
j.command(f'echo "hello" > {j.ofile}')
b.write_output(j.ofile, 'gs://mybucket/output/hello.txt')
b.run()
Write a single job intermediate to a permanent location in Azure:
b = Batch()
j = b.new_job()
j.command(f'echo "hello" > {j.ofile}')
b.write_output(j.ofile, 'https://my-account.blob.core.windows.net/my-container/output/hello.txt')
b.run()
Warning
To avoid expensive egress charges, output files should be located in buckets that are in the same region in which your Batch jobs run.
Notes
All JobResourceFile objects are temporary files and must be written to a permanent location using write_output() if the output needs to be saved.
- Parameters:
resource (Resource) – Resource to be written to a file.
dest (str) – Destination file path. For a single ResourceFile, this will simply be dest. For a ResourceGroup, dest is the file root and each resource file will be written to {root}.identifier where identifier is the identifier of the file in the ResourceGroup map.
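To make the {root}.identifier behavior concrete, here is a sketch that writes out a declared resource group (names are illustrative):

b = Batch()
j = b.new_job()
j.declare_resource_group(out={'bed': '{root}.bed', 'bim': '{root}.bim'})
j.command(f'touch {j.out.bed} {j.out.bim}')
b.write_output(j.out, 'output/example')  # writes output/example.bed and output/example.bim
b.run()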