Tutorial

This tutorial goes through the basic concepts of Batch with examples.

Import

Batch is located inside the hailtop module, which can be installed as described in the Getting Started section.

>>> import hailtop.batch as hb

f-strings

f-strings were added to Python in version 3.6 and are denoted by the ‘f’ character before a string literal. When constructing the string, Python evaluates any expressions in single curly braces {…} using the current variable scope. When Python evaluates the example below, the string ‘Alice’ is substituted for {name} because the variable name is set to ‘Alice’ on the line above.

>>> name = 'Alice'
>>> print(f'hello {name}')
hello Alice

You can put any Python expression inside the curly braces and Python will evaluate it when constructing the string. For example, below Python evaluates x + 1 before building the string. Therefore, we get ‘x = 6’ as the resulting string.

>>> x = 5
>>> print(f'x = {x + 1}')
x = 6

To output a literal curly brace from an f-string, escape the curly brace by doubling the character. For example, { becomes {{ in the string definition, but will print as {. Likewise, } becomes }}, but will print as }.

>>> x = 5
>>> print(f'x = {{x + 1}} plus {x}')
x = {x + 1} plus 5

To learn more about f-strings, check out this tutorial.

Hello World

A Batch consists of a set of Job objects to execute. There can be an arbitrary number of jobs in the batch, and they are executed in order of their dependencies. A dependency between two jobs states that the dependent job should not run until the previous job completes. Thus, under the covers a batch is a directed acyclic graph (DAG) of jobs.

In the example below, we have defined a Batch b with the name ‘hello’. We use the method Batch.new_job() to create a job object which we call j and then use the method BashJob.command() to tell Batch that we want to execute echo “hello world”. However, at this point, Batch hasn’t actually run the job to print “hello world”. All we have done is specified the jobs and the order in which they should be run. To actually execute the Batch, we call Batch.run(). The name arguments to both Batch and Job are used in the Batch Service UI.

>>> b = hb.Batch(name='hello')
>>> j = b.new_job(name='j1')
>>> j.command('echo "hello world"')
>>> b.run()

Now that we know how to create a batch with a single job, we call Batch.new_job() twice to create two jobs s and t, which each print a variant of hello world to stdout. Calling b.run() executes the batch. By default, batches are executed by the LocalBackend, which runs jobs on your local computer. Even though these two jobs could be run in parallel, the LocalBackend runs them sequentially. However, if the batch is executed by the ServiceBackend using the Batch Service, then s and t can be run in parallel as there are no dependencies between them.

>>> b = hb.Batch(name='hello-parallel')
>>> s = b.new_job(name='j1')
>>> s.command('echo "hello world 1"')
>>> t = b.new_job(name='j2')
>>> t.command('echo "hello world 2"')
>>> b.run()

To create a dependency between s and t, we use the method Job.depends_on() to explicitly state that t depends on s. In both the LocalBackend and ServiceBackend, s will always run before t.

>>> b = hb.Batch(name='hello-serial')
>>> s = b.new_job(name='j1')
>>> s.command('echo "hello world 1"')
>>> t = b.new_job(name='j2')
>>> t.command('echo "hello world 2"')
>>> t.depends_on(s)
>>> b.run()

File Dependencies

So far we have created batches with two jobs where the dependencies between them were declared explicitly. However, in many computational pipelines, we want to have a file generated by one job be the input to a downstream job. Batch has a mechanism for tracking file outputs and then inferring job dependencies from the usage of those files.

In the example below, we have specified two jobs: s and t. s prints “hello world” as in previous examples. However, instead of printing to stdout, this time s redirects the output to a temporary file defined by s.ofile. s.ofile is a Python object of type JobResourceFile that was created on the fly when we accessed an attribute of the Job that did not already exist. Any time we access that attribute again (in this example ofile), we get the same JobResourceFile that was previously created. However, be aware that you cannot use the name of an existing method or property of Job objects, such as BashJob.command() or BashJob.image(), as an attribute.

Note the ‘f’ character before the string in the command for s! We placed s.ofile in curly braces so that when Python interpolates the f-string, it replaces the JobResourceFile object with an actual file path in the command for s. We use another f-string in t’s command, where we print the contents of s.ofile to stdout. s.ofile in t’s command refers to the same temporary file that was created in the command for s. Therefore, Batch deduces that t must depend on s and creates an implicit dependency for t on s. In both the LocalBackend and ServiceBackend, s will always run before t.

>>> b = hb.Batch(name='hello-serial')
>>> s = b.new_job(name='j1')
>>> s.command(f'echo "hello world" > {s.ofile}')
>>> t = b.new_job(name='j2')
>>> t.command(f'cat {s.ofile}')
>>> b.run()

Scatter / Gather

Batch is implemented in Python making it easy to use for loops to create more complicated dependency graphs between jobs. A scatter is a set of jobs with the same command but varying input parameters. A gather is a final job or “sink” that waits for all of the jobs in the scatter to be complete before executing.

In the example below, we use a for loop to create a job for each of ‘Alice’, ‘Bob’, and ‘Dan’ that prints the name of the user programmatically, thereby scattering the echo command over users.

>>> b = hb.Batch(name='scatter')
>>> for name in ['Alice', 'Bob', 'Dan']:
...     j = b.new_job(name=name)
...     j.command(f'echo "hello {name}"')
>>> b.run()

In the previous example, we did not assign the jobs we created for each user to a unique variable name; instead we named each one j in the for loop. However, if we want to add a final gather job (sink) that depends on the completion of all of the user jobs, then we need to keep track of those jobs. In the example below, they are stored in the jobs list so that we can use the Job.depends_on() method to explicitly make the sink job depend on them. The single asterisk before jobs is the Python syntax for unpacking all of the elements in the list as separate arguments to the function, in this case Job.depends_on().

>>> b = hb.Batch(name='scatter-gather-1')
>>> jobs = []
>>> for name in ['Alice', 'Bob', 'Dan']:
...     j = b.new_job(name=name)
...     j.command(f'echo "hello {name}"')
...     jobs.append(j)
>>> sink = b.new_job(name='sink')
>>> sink.command(f'echo "I wait for everyone"')
>>> sink.depends_on(*jobs)
>>> b.run()

Now that we know how to create a sink job that depends on an arbitrary number of jobs, we want the output files of each of the per-user jobs to be implicit file dependencies of the sink job (see the section on file dependencies). The change from the previous example is that each job j now uses an f-string to redirect the output of echo to a temporary output file j.ofile. We then use all of the output files in the sink command by building a single string from the temporary output file names of each job. A JobResourceFile is a Batch-specific object that inherits from str, so you can use JobResourceFiles as if they were strings, which we do here with the join method for strings.

>>> b = hb.Batch(name='scatter-gather-2')
>>> jobs = []
>>> for name in ['Alice', 'Bob', 'Dan']:
...     j = b.new_job(name=name)
...     j.command(f'echo "hello {name}" > {j.ofile}')
...     jobs.append(j)
>>> sink = b.new_job(name='sink')
>>> sink.command('cat {}'.format(' '.join([j.ofile for j in jobs])))
>>> b.run()

Nested Scatters

We can also create a nested scatter where we have a series of jobs per user. This is equivalent to a nested for loop. In the example below, we instantiate a new Batch object b. Then, for each user in ‘Alice’, ‘Bob’, and ‘Dan’, we create new jobs for making the bed, doing laundry, and grocery shopping. In total, we will have created 9 jobs that can run in parallel, as we did not define any dependencies between them.

>>> b = hb.Batch(name='nested-scatter-1')
>>> for user in ['Alice', 'Bob', 'Dan']:
...     for chore in ['make-bed', 'laundry', 'grocery-shop']:
...         j = b.new_job(name=f'{user}-{chore}')
...         j.command(f'echo "user {user} is doing chore {chore}"')
>>> b.run()

We can implement the same example as above with a function that implements the inner for loop. The do_chores function takes a Batch object to add new jobs to and the name of a user for whom to create chore jobs. Like above, we create 9 independent jobs. However, by structuring the code into smaller functions that take batch objects, we can create more complicated dependency graphs and reuse components across various computational pipelines.

>>> def do_chores(b, user):
...     for chore in ['make-bed', 'laundry', 'grocery-shop']:
...         j = b.new_job(name=f'{user}-{chore}')
...         j.command(f'echo "user {user} is doing chore {chore}"')

>>> b = hb.Batch(name='nested-scatter-2')
>>> for user in ['Alice', 'Bob', 'Dan']:
...     do_chores(b, user)
>>> b.run()

Lastly, we provide an example of a more complicated batch that has an initial job, then scatters jobs per user, and finally has a series of gather / sink jobs that wait for the per-user jobs to be done before completing.

>>> def do_chores(b, head, user):
...     chores = []
...     for chore in ['make-bed', 'laundry', 'grocery-shop']:
...         j = b.new_job(name=f'{user}-{chore}')
...         j.command(f'echo "user {user} is doing chore {chore}"')
...         j.depends_on(head)
...         chores.append(j)
...     sink = b.new_job(name=f'{user}-sink')
...     sink.depends_on(*chores)
...     return sink

>>> b = hb.Batch(name='nested-scatter-3')
>>> head = b.new_job(name='head')
>>> user_sinks = []
>>> for user in ['Alice', 'Bob', 'Dan']:
...     user_sink = do_chores(b, head, user)
...     user_sinks.append(user_sink)
>>> final_sink = b.new_job(name='final-sink')
>>> final_sink.depends_on(*user_sinks)
>>> b.run()

Input Files

Previously, we discussed that JobResourceFiles are temporary files created from Job objects. However, in order to read a file that was not generated by a job in the batch (an input file), we use the method Batch.read_input() to create an InputResourceFile. An input resource file can be used in exactly the same way as a JobResourceFile: we refer to it in a command using an f-string. In the example below, we add the file data/hello.txt as an input resource file called input. We then print the contents of input to stdout in job j.

>>> b = hb.Batch(name='hello-input')
>>> input = b.read_input('data/hello.txt')
>>> j = b.new_job(name='hello')
>>> j.command(f'cat {input}')
>>> b.run()

Why do we need to explicitly add input files to batches rather than referring directly to the path in the command? You could refer directly to the path when using the LocalBackend, but only if you are not specifying a Docker image with BashJob.image() to run the command in. This is because Batch copies any input files to a special temporary directory that gets mounted into the Docker container. When using the ServiceBackend, input files are files in Google Storage, and many commands do not know how to handle Google Storage file paths. Therefore, we suggest explicitly adding all input files as input resource files to the batch to make sure the same code can run in all scenarios. Files that are already present in a Docker image do not need to be read as inputs to the batch.
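For example, when running on the ServiceBackend, an input file would typically live in Google Storage. The sketch below mirrors the example above; the bucket path is only a placeholder and assumes a backend that can read from Google Storage:

>>> b = hb.Batch(name='hello-gcs-input')
>>> input = b.read_input('gs://my-bucket/hello.txt')
>>> j = b.new_job(name='hello')
>>> j.command(f'cat {input}')
>>> b.run()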

Output Files

All files generated by Batch are temporary files! They are copied between jobs as needed for downstream jobs’ use, but are removed when the batch has completed. In order to save files generated by a batch for future use, you need to explicitly call Batch.write_output(). The first argument to Batch.write_output() can be any type of ResourceFile, which includes input resource files and job resource files, as well as resource groups as described below. With the LocalBackend, the second argument can be either a local file path or a Google Storage file path. With the ServiceBackend, the second argument must be a Google Storage file path.

>>> b = hb.Batch(name='hello-output')
>>> j = b.new_job(name='hello')
>>> j.command(f'echo "hello" > {j.ofile}')
>>> b.write_output(j.ofile, 'output/hello.txt')
>>> b.run()
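When running on the ServiceBackend, the destination must instead be a Google Storage path, so the write_output call above would look like the following (the bucket is only a placeholder):

>>> b.write_output(j.ofile, 'gs://my-bucket/outputs/hello.txt')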

Resource Groups

Many bioinformatics tools treat a group of files with a common file root and specific file extensions as a single unit. For example, PLINK stores genetic data in three files: *.bed has the genotype data, *.bim has the variant information, and *.fam has the sample information. PLINK can take as input the common file root, expecting there to be three files with the appropriate extensions. It also writes files with a common file root and specific file extensions, for example when writing out a new dataset or outputting summary statistics.

To enable Batch to work with file groups, we added a ResourceGroup object, which is essentially a dictionary mapping file extension names to file paths. When creating a ResourceGroup in a Job (the group analogue of a JobResourceFile), you first need to use the method BashJob.declare_resource_group() to declare the files in the resource group explicitly before referring to the resource group in a command. This is because the default behavior when referring to a previously undefined attribute on a job is to create a JobResourceFile, not a resource group.

In the example below, we first declare that create.bfile will be a resource group with three files. The attribute name comes from the name of the keyword argument bfile. The value of the keyword argument is expected to be a dictionary defining the name of each file and the file path where it should be located. In this example, the file paths contain {root}, which is the common temporary file root that will be substituted in to create the final file paths. Do not use f-strings here, as a value for {root} is substituted when the resource group is created!

We can then refer to create.bfile in commands, where it gets interpolated as the common temporary file root path (equivalent to {root}), or we can refer to a specific file in the resource group such as create.bfile.fam.

>>> b = hb.Batch(name='resource-groups')
>>> create = b.new_job(name='create-dummy')
>>> create.declare_resource_group(bfile={'bed': '{root}.bed',
...                                      'bim': '{root}.bim',
...                                      'fam': '{root}.fam'})
>>> create.command(f'plink --dummy 10 100 --make-bed --out {create.bfile}')
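>>> # A downstream job can refer to a single member of the group, such as the .fam
>>> # file; Batch then infers that count depends on create. The wc command and the
>>> # count job below are purely illustrative additions, not part of the original
>>> # example.
>>> count = b.new_job(name='count-samples')
>>> count.command(f'wc -l {create.bfile.fam}')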
>>> b.run() 

As described previously for input files, we need a separate mechanism for creating a resource group from a set of input files: the method Batch.read_input_group(). It takes keyword arguments mapping the name of each file, such as bed, to the path where that file is located. The resource group is then a dictionary from attribute name to InputResourceFile.

In the example below, we create an input resource group bfile with three files. The group’s common root file path can be referred to with bfile in a command, or you can reference a specific input file such as bfile.fam.

>>> b = hb.Batch(name='resource-groups')
>>> bfile = b.read_input_group(bed='data/example.bed',
...                            bim='data/example.bim',
...                            fam='data/example.fam')
>>> wc_bim = b.new_job(name='wc-bim')
>>> wc_bim.command(f'wc -l {bfile.bim}')
>>> wc_fam = b.new_job(name='wc-fam')
>>> wc_fam.command(f'wc -l {bfile.fam}')
>>> b.run()

Resource File Extensions

If your tool requires a specific file extension, such as txt.gz, for the input files in a resource group, the extension may not be a valid Python identifier, so it cannot be passed as a keyword argument or accessed as an attribute. In that case, create the resource group by unpacking a dictionary with ** and access the file with bracket notation:

>>> b = hb.Batch(name='resource-file-extensions')
>>> rg = b.read_input_group(**{'txt.gz': 'data/hello.txt.gz'})
>>> rg['txt.gz']
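Continuing the example, such a file can be used in a command like any other resource file. The sketch below assumes zcat is available in the job’s environment and is purely illustrative:

>>> j = b.new_job(name='count-lines')
>>> j.command(f'zcat {rg["txt.gz"]} | wc -l')
>>> b.run()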

Python Jobs

Up until now, we have used the Batch.new_job() method to create a new BashJob, which runs a command that is assumed to be a bash command. However, Batch also has an alternative type of job called a PythonJob. Unlike BashJob, PythonJob does not have a command() method; instead it has a PythonJob.call() method that takes a Python function to call along with the positional and keyword arguments to provide to the function. The result of PythonJob.call() is a PythonResult, which can be used either as an argument to another PythonJob or in a BashJob by using one of the methods that convert a PythonResult to a file: PythonResult.as_str(), PythonResult.as_repr(), and PythonResult.as_json().

In the example below, we first define two Python functions: hello_world() and upper(). Next, we create a batch and then create a new PythonJob with Batch.new_python_job(). Then we use PythonJob.call() and pass the hello_world function that we want to call. Notice that we pass a reference to the function, not hello_world(). We also pass the Python string 'alice' as an argument to the function. The result of j.call() is a PythonResult, which we've assigned to the variable hello_str.

We want to use the hello_str result and make all the letters in upper case. We call PythonJob.call() and pass a reference to the upper function. But now the argument is hello_str which holds the result from calling hello_world above. We assign the new output to the variable result.

At this point, we want to write the transformed hello world result to a text file. However, result is a PythonResult. Therefore, we use PythonResult.as_str() to convert result to a JobResourceFile with the string output HELLO ALICE. Now we can write the result to a file.

def hello_world(name):
    return f'hello {name}'


def upper(s):
    return s.upper()


b = hb.Batch(name='hello')
j = b.new_python_job()
hello_str = j.call(hello_world, 'alice')
result = j.call(upper, hello_str)
b.write_output(result.as_str(), 'output/hello-alice.txt')
b.run()
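The two calls above were made on the same PythonJob. As noted earlier, a PythonResult can also be passed to a different PythonJob, in which case Batch infers the dependency between the jobs. A minimal sketch, reusing the hello_world and upper functions defined above (the batch name and output path are only illustrative):

b = hb.Batch(name='hello-two-jobs')
j1 = b.new_python_job()
hello_str = j1.call(hello_world, 'alice')

j2 = b.new_python_job()
# passing hello_str from j1 into j2 creates an implicit dependency of j2 on j1
result = j2.call(upper, hello_str)

b.write_output(result.as_str(), 'output/hello-alice-upper.txt')
b.run()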

Backends

There are two backends that execute batches: the LocalBackend and the ServiceBackend. The local backend is used by default and executes jobs on your local computer. The service backend executes jobs in a shared compute cluster managed by the Hail team. To use the Batch Service, follow the directions here.
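As a minimal sketch of selecting a backend explicitly, a backend object can be passed to the Batch constructor. The constructor arguments shown below, such as billing_project and remote_tmpdir, depend on your Hail version and configuration, so treat them as assumptions; the bucket and billing project names are placeholders:

>>> backend = hb.ServiceBackend(billing_project='my-billing-project',
...                             remote_tmpdir='gs://my-bucket/batch-tmp')
>>> b = hb.Batch(backend=backend, name='hello-service')
>>> j = b.new_job(name='hello')
>>> j.command('echo "hello from the Batch Service"')
>>> b.run()
>>> backend.close()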