BatchPoolExecutor

class hailtop.batch.batch_pool_executor.BatchPoolExecutor(*, name=None, backend=None, image=None, cpus_per_job=None, wait_on_exit=True, cleanup_bucket=True, project=None)

Bases: object

An executor which executes Python functions in the cloud.

concurrent.futures.ProcessPoolExecutor and concurrent.futures.ThreadPoolExecutor enable the use of all the cores available on a single machine. BatchPoolExecutor enables the use of an effectively arbitrary number of cloud compute cores.
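Because BatchPoolExecutor mirrors the concurrent.futures Executor API, the call pattern can be previewed locally. A minimal sketch using ThreadPoolExecutor as a stand-in, requiring no cloud resources:

```python
# Local analogue of the BatchPoolExecutor call pattern: submit() returns
# a future, map() applies a function across an iterable. ThreadPoolExecutor
# stands in for BatchPoolExecutor here; no cloud backend is contacted.
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as pool:
    future_nine = pool.submit(lambda: 3 + 6)
    tripled = list(pool.map(lambda x: x * 3, range(4)))

print(future_nine.result())  # 9
print(tripled)               # [0, 3, 6, 9]
```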

Functions provided to submit() are serialized using dill, sent to a Python docker container in the cloud, deserialized, and executed. The results are serialized and returned to the machine from which submit() was called. The Python version in the docker container will share a major and minor version with the local process. The image parameter overrides this behavior.

When used as a context manager (the with syntax), the executor will wait for all jobs to finish before finishing the with statement. This behavior can be controlled by the wait_on_exit parameter.

This class creates a folder batch-pool-executor at the root of the bucket specified by the backend. This folder can be safely deleted after all jobs have completed.

Examples

Add 3 to 6 on a machine in the cloud and send the result back to this machine:

>>> with BatchPoolExecutor() as bpe:  
...     future_nine = bpe.submit(lambda: 3 + 6)
>>> future_nine.result()  
9

map() facilitates the common case of executing a function on many values in parallel:

>>> with BatchPoolExecutor() as bpe:  
...     list(bpe.map(lambda x: x * 3, range(4)))
[0, 3, 6, 9]
Parameters:
  • name (Optional[str]) – A name for the executor. Executors produce many batches and each batch will include this name as a prefix.

  • backend (Optional[ServiceBackend]) – Backend used to execute the jobs. Must be a ServiceBackend.

  • image (Optional[str]) – The name of a Docker image used for each submitted job. The image must include Python 3.9 or later and must have the dill Python package installed. If you intend to use numpy, ensure that OpenBLAS is also installed. If unspecified, an image with a matching Python version and numpy, scipy, and sklearn installed is used.

  • cpus_per_job (Union[int, str, None]) – The number of CPU cores to allocate to each job. The default value is 1. The parameter is passed unaltered to Job.cpu(). This parameter’s value is used to set several environment variables instructing BLAS and LAPACK to limit core use.

  • wait_on_exit (bool) – If True or unspecified, wait for all jobs to complete when exiting a context. If False, do not wait. This option has no effect if this executor is not used with the with syntax.

  • cleanup_bucket (bool) – If True or unspecified, delete all temporary files in the cloud storage bucket when this executor fully shuts down. If Python crashes before the executor is shut down, the files will not be deleted.

  • project (Optional[str]) – DEPRECATED. Please specify gcs_requester_pays_configuration in ServiceBackend.
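The parameters above can be combined as in the following hypothetical configuration sketch. The executor name and core count are example values, not defaults, and the construction is wrapped in a function so the sketch does not contact a cloud backend:

```python
# Hypothetical configuration sketch: 'my-analysis' and cpus_per_job=4
# are example values, not defaults. Wrapped in a function so the sketch
# can be shown without contacting a cloud backend.
def make_executor():
    from hailtop.batch import ServiceBackend
    from hailtop.batch.batch_pool_executor import BatchPoolExecutor
    return BatchPoolExecutor(
        name='my-analysis',       # prefix for the batches this executor creates
        backend=ServiceBackend(),
        cpus_per_job=4,           # passed through unaltered to Job.cpu()
        wait_on_exit=True,        # block at the end of a `with` block
    )
```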

Methods

async_map

Asyncio-compatible version of map().

async_submit

Asyncio-compatible version of BatchPoolExecutor.submit().

map

Call fn on cloud machines with arguments from iterables.

shutdown

Allow temporary resources to be cleaned up.

submit

Call fn on a cloud machine with all remaining arguments and keyword arguments.

async async_map(fn, iterables, timeout=None, chunksize=1)

Asyncio-compatible version of map().

Return type:

AsyncGenerator[int, None]

async async_submit(unapplied, *args, **kwargs)

Asyncio-compatible version of BatchPoolExecutor.submit().

Return type:

BatchPoolFuture
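Since async_map returns an async generator, its results are consumed with `async for`. A local sketch of that consumption pattern, using a stand-in generator in place of a cloud-backed executor:

```python
# Stand-in for bpe.async_map: yields fn applied to each element.
# Shows the `async for` consumption pattern without a cloud backend.
import asyncio

async def fake_async_map(fn, iterable):
    for x in iterable:
        yield fn(x)

async def main():
    results = []
    async for r in fake_async_map(lambda x: x * 3, range(4)):
        results.append(r)
    return results

print(asyncio.run(main()))  # [0, 3, 6, 9]
```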

map(fn, *iterables, timeout=None, chunksize=1)

Call fn on cloud machines with arguments from iterables.

This function returns a generator which will produce each result in the same order as the iterables, only blocking if the result is not yet ready. You can convert the generator to a list with list.

Examples

Do nothing, but on the cloud:

>>> with BatchPoolExecutor() as bpe:  
...     list(bpe.map(lambda x: x, range(4)))
[0, 1, 2, 3]

Call a function with two parameters, on the cloud:

>>> with BatchPoolExecutor() as bpe:  
...     list(bpe.map(lambda x, y: x + y,
...                  ["white", "cat", "best"],
...                  ["house", "dog", "friend"]))
["whitehouse", "catdog", "bestfriend"]

Generate products of random matrices, on the cloud:

>>> import numpy as np
>>> def random_product(seed):
...     np.random.seed(seed)
...     w = np.random.rand(1, 100)
...     u = np.random.rand(100, 1)
...     return float(w @ u)
>>> with BatchPoolExecutor() as bpe:  
...     list(bpe.map(random_product, range(4)))
[24.440006386777277, 23.325755364428026, 23.920184804993806, 25.47912882125101]
Parameters:
  • fn (Callable) – The function to execute.

  • iterables (Iterable[Any]) – The iterables are zipped together and each tuple is used as arguments to fn. See the second example for more detail. It is not possible to pass keyword arguments. All of the iterables must have the same length.

  • timeout (Union[int, float, None]) – This is roughly a timeout on how long we wait on each function call. Specifically, each call to the returned generator's __next__() invokes BatchPoolFuture.result() with this timeout.

  • chunksize (int) – The number of tasks to schedule in the same docker container. Docker containers take about 5 seconds to start. Ideally, each task should take an order of magnitude more time than start-up time. You can make the chunksize larger to reduce parallelism but increase the amount of meaningful work done per container.
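The chunksize trade-off can be made concrete: with chunksize tasks per container, n tasks need ceil(n / chunksize) containers, and each container pays roughly 5 seconds of start-up time. A small arithmetic sketch:

```python
# n tasks grouped into chunks of size `chunksize` need
# ceil(n / chunksize) containers; each container pays ~5 s of
# start-up time, so larger chunks amortize that cost at the
# expense of parallelism.
import math

def containers_needed(n_tasks, chunksize):
    return math.ceil(n_tasks / chunksize)

print(containers_needed(100, 1))   # 100 containers, maximal parallelism
print(containers_needed(100, 10))  # 10 containers, a tenth of the start-up overhead
```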

shutdown(wait=True)

Allow temporary resources to be cleaned up.

Until shutdown is called, some temporary cloud storage files will persist. After shutdown has been called and all outstanding jobs have completed, these files will be deleted.

Parameters:

wait (bool) – If true, wait for all jobs to complete before returning from this method.
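shutdown follows the concurrent.futures.Executor convention. A local sketch of explicit shutdown without a `with` block, using ThreadPoolExecutor as a stand-in:

```python
# Explicit shutdown pattern, shown with a local ThreadPoolExecutor
# stand-in: shutdown(wait=True) blocks until outstanding work completes,
# after which results remain available on the futures.
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor()
future = pool.submit(lambda: 2 + 2)
pool.shutdown(wait=True)  # blocks until outstanding work completes
print(future.result())    # 4
```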

submit(fn, *args, **kwargs)

Call fn on a cloud machine with all remaining arguments and keyword arguments.

The function, any objects it references, the arguments, and the keyword arguments will be serialized to the cloud machine. Python modules are not serialized, so you must ensure that any needed Python modules and packages are already present in the underlying Docker image. For more details see the image parameter to BatchPoolExecutor.

This function does not return the function’s output, it returns a BatchPoolFuture whose BatchPoolFuture.result() method can be used to access the value.

Examples

Do nothing, but on the cloud:

>>> with BatchPoolExecutor() as bpe:  
...     future = bpe.submit(lambda x: x, 4)
...     future.result()
4

Call a function with two arguments and one keyword argument, on the cloud:

>>> with BatchPoolExecutor() as bpe:  
...     future = bpe.submit(lambda x, y, z: x + y + z,
...                         "poly", "ethyl", z="ene")
...     future.result()
"polyethylene"

Generate a product of two random matrices, on the cloud:

>>> import numpy as np
>>> def random_product(seed):
...     np.random.seed(seed)
...     w = np.random.rand(1, 100)
...     u = np.random.rand(100, 1)
...     return float(w @ u)
>>> with BatchPoolExecutor() as bpe:  
...     future = bpe.submit(random_product, 1)
...     future.result()
23.325755364428026
Parameters:
  • fn (Callable) – The function to execute.

  • args (Any) – Arguments for the function.

  • kwargs (Any) – Keyword arguments for the function.

Return type:

BatchPoolFuture