Job

class hailtop.batch.job.Job(batch, token, *, name=None, attributes=None, shell=None)

Bases: object

Object representing a single job to execute.

Notes

This class should never be created directly by the user. Use Batch.new_job(), Batch.new_bash_job(), or Batch.new_python_job() instead.

Methods

always_copy_output

Set the job to always copy output to cloud storage, even if the job failed.

always_run

Set the job to always run, even if dependencies fail.

cloudfuse

Add a bucket to mount with gcsfuse in GCP or a storage container with blobfuse in Azure.

cpu

Set the job's CPU requirements.

depends_on

Explicitly set dependencies on other jobs.

env

gcsfuse

Add a bucket to mount with gcsfuse.

memory

Set the job's memory requirements.

regions

Set the cloud regions a job can run in.

spot

Set whether a job is run on spot instances.

storage

Set the job's storage size.

timeout

Set the maximum amount of time this job can run for in seconds.

always_copy_output(always_copy_output=True)

Set the job to always copy output to cloud storage, even if the job failed.

Notes

Can only be used with the backend.ServiceBackend.

Examples

>>> b = Batch(backend=backend.ServiceBackend('test'))
>>> j = b.new_job()
>>> (j.always_copy_output()
...   .command(f'echo "hello" > {j.ofile} && false'))
Parameters:

always_copy_output (bool) – If True, set job to always copy output to cloud storage regardless of whether the job succeeded.

Return type:

Self

Returns:

Same job object set to always copy output.

always_run(always_run=True)

Set the job to always run, even if dependencies fail.

Warning

Jobs set to always run are not cancellable!

Examples

>>> b = Batch(backend=backend.ServiceBackend('test'))
>>> j = b.new_job()
>>> (j.always_run()
...   .command(f'echo "hello"'))
Parameters:

always_run (bool) – If True, set job to always run.

Return type:

Self

Returns:

Same job object set to always run.

cloudfuse(bucket, mount_point, *, read_only=True)

Add a bucket to mount with gcsfuse in GCP or a storage container with blobfuse in Azure.

Notes

Can only be used with the backend.ServiceBackend. This method can be called more than once.

Warning

There are performance and cost implications of using gcsfuse or blobfuse.

Examples

Google Cloud Platform:

>>> b = Batch(backend=backend.ServiceBackend('test'))
>>> j = b.new_job()
>>> (j.cloudfuse('my-bucket', '/my-bucket')
...   .command(f'cat /my-bucket/my-blob-object'))

Azure:

>>> b = Batch(backend=backend.ServiceBackend('test'))
>>> j = b.new_job()
>>> (j.cloudfuse('my-account/my-container', '/dest')
...   .command(f'cat /dest/my-blob-object'))
Parameters:
  • bucket (str) – Name of the google storage bucket to mount or the path to an Azure container in the format of <account>/<container>.

  • mount_point (str) – The path at which the cloud blob storage should be mounted to in the Docker container.

  • read_only (bool) – If True, mount the cloud blob storage in read-only mode.

Return type:

Self

Returns:

Same job object set with a cloud storage path to mount with either gcsfuse or blobfuse.

cpu(cores)

Set the job’s CPU requirements.

Notes

The string expression must be of the form {number}{suffix} where the optional suffix is m representing millicpu. Omitting a suffix means the value is in cpu.

For the ServiceBackend, cores must be a power of two between 0.25 and 16.

Examples

Set the job’s CPU requirement to 250 millicpu:

>>> b = Batch()
>>> j = b.new_job()
>>> (j.cpu('250m')
...   .command(f'echo "hello"'))
>>> b.run()
Parameters:

cores (Union[str, int, float, None]) – Units are in cpu if cores is numeric. If None, use the default value for the ServiceBackend (1 cpu).

Return type:

Self

Returns:

Same job object with CPU requirements set.

depends_on(*jobs)

Explicitly set dependencies on other jobs.

Examples

Initialize the batch:

>>> b = Batch()

Create the first job:

>>> j1 = b.new_job()
>>> j1.command(f'echo "hello"')

Create the second job j2 that depends on j1:

>>> j2 = b.new_job()
>>> j2.depends_on(j1)
>>> j2.command(f'echo "world"')

Execute the batch:

>>> b.run()

Notes

Dependencies between jobs are automatically created when resources from one job are used in a subsequent job. This method is only needed when no intermediate resource exists and the dependency needs to be explicitly set.

Parameters:

jobs (Job) – Sequence of jobs to depend on.

Return type:

Self

Returns:

Same job object with dependencies set.

env(variable, value)
gcsfuse(bucket, mount_point, read_only=True)

Add a bucket to mount with gcsfuse.

Notes

Can only be used with the backend.ServiceBackend. This method can be called more than once. This method has been deprecated. Use Job.cloudfuse() instead.

Warning

There are performance and cost implications of using gcsfuse.

Examples

>>> b = Batch(backend=backend.ServiceBackend('test'))
>>> j = b.new_job()
>>> (j.gcsfuse('my-bucket', '/my-bucket')
...   .command(f'cat /my-bucket/my-file'))
Parameters:
  • bucket – Name of the google storage bucket to mount.

  • mount_point – The path at which the bucket should be mounted to in the Docker container.

  • read_only – If True, mount the bucket in read-only mode.

Return type:

Self

Returns:

Same job object set with a bucket to mount with gcsfuse.

memory(memory)

Set the job’s memory requirements.

Examples

Set the job’s memory requirement to be 3Gi:

>>> b = Batch()
>>> j = b.new_job()
>>> (j.memory('3Gi')
...   .command(f'echo "hello"'))
>>> b.run()

Notes

The memory expression must be of the form {number}{suffix} where valid optional suffixes are K, Ki, M, Mi, G, Gi, T, Ti, P, and Pi. Omitting a suffix means the value is in bytes.

For the ServiceBackend, the values ‘lowmem’, ‘standard’, and ‘highmem’ are also valid arguments. ‘lowmem’ corresponds to approximately 1 Gi/core, ‘standard’ corresponds to approximately 4 Gi/core, and ‘highmem’ corresponds to approximately 7 Gi/core. The default value is ‘standard’.

Parameters:

memory (Union[int, str, None]) – Units are in bytes if memory is an int. If None, use the default value for the ServiceBackend (‘standard’).

Return type:

Self

Returns:

Same job object with memory requirements set.

regions(regions)

Set the cloud regions a job can run in.

Notes

Can only be used with the backend.ServiceBackend.

This method may be used to ensure code executes in the same region as the data it reads. This can avoid egress charges as well as improve latency.

Examples

Require the job to run in ‘us-central1’:

>>> b = Batch(backend=backend.ServiceBackend('test'))
>>> j = b.new_job()
>>> (j.regions(['us-central1'])
...   .command(f'echo "hello"'))

Specify the job can run in any region:

>>> b = Batch(backend=backend.ServiceBackend('test'))
>>> j = b.new_job()
>>> (j.regions(None)
...   .command(f'echo "hello"'))
Parameters:

regions (Optional[List[str]]) – The cloud region(s) to run this job in. Use None to signify the job can run in any available region. Use py:staticmethod:.ServiceBackend.supported_regions to list the available regions to choose from. The default is the job can run in any region.

Return type:

Self

Returns:

Same job object with the cloud regions the job can run in set.

spot(is_spot)

Set whether a job is run on spot instances. By default, all jobs run on spot instances.

Examples

Ensure a job only runs on non-spot instances:

>>> b = Batch(backend=backend.ServiceBackend('test'))
>>> j = b.new_job()
>>> j = j.spot(False)
>>> j = j.command(f'echo "hello"')
Parameters:

is_spot (bool) – If False, this job will be run on non-spot instances.

Return type:

Self

Returns:

Same job object.

storage(storage)

Set the job’s storage size.

Examples

Set the job’s disk requirements to 10 Gi:

>>> b = Batch()
>>> j = b.new_job()
>>> (j.storage('10Gi')
...   .command(f'echo "hello"'))
>>> b.run()

Notes

The storage expression must be of the form {number}{suffix} where valid optional suffixes are K, Ki, M, Mi, G, Gi, T, Ti, P, and Pi. Omitting a suffix means the value is in bytes.

For the ServiceBackend, jobs requesting one or more cores receive 5 GiB of storage for the root file system /. Jobs requesting a fraction of a core receive the same fraction of 5 GiB of storage. If you need additional storage, you can explicitly request more storage using this method and the extra storage space will be mounted at /io. Batch automatically writes all ResourceFile to /io.

The default storage size is 0 Gi. The minimum storage size is 0 Gi and the maximum storage size is 64 Ti. If storage is set to a value between 0 Gi and 10 Gi, the storage request is rounded up to 10 Gi. All values are rounded up to the nearest Gi.

Parameters:

storage (Union[int, str, None]) – Units are in bytes if storage is an int. If None, use the default storage size for the ServiceBackend (0 Gi).

Return type:

Self

Returns:

Same job object with storage set.

timeout(timeout)

Set the maximum amount of time this job can run for in seconds.

Notes

Can only be used with the backend.ServiceBackend.

Examples

>>> b = Batch(backend=backend.ServiceBackend('test'))
>>> j = b.new_job()
>>> (j.timeout(10)
...   .command(f'echo "hello"'))
Parameters:

timeout (Union[int, float, None]) – Maximum amount of time in seconds for a job to run before being killed. If None, there is no timeout.

Return type:

Self

Returns:

Same job object set with a timeout in seconds.