Random Forest Model
Introduction
We want to use a random forest model to predict regional mutability of the genome (at a scale of 50kb) using a series of genomic features. Specifically, we divide the genome into non-overlapping 50kb windows and we regress the observed/expected variant count ratio (which indicates the mutability of a specific window) against a number of genomic features measured on each corresponding window (such as replication timing, recombination rate, and various histone marks). For each window under investigation, we fit the model using all the rest of the windows and then apply the model to that window to predict its mutability as a function of its genomic features.
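In symbols, a minimal sketch of the setup (the notation below is ours, added only to summarize the text above): for window i with feature vector x_i,

y_i = \frac{\text{observed variant count}_i}{\text{expected variant count}_i}, \qquad \hat{y}_i = f_{-i}(x_i),

where f_{-i} denotes the random forest trained on every window except window i, and y_i is the mutability response being predicted.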
To perform this analysis with Batch, we will first use a PythonJob
to execute a Python function directly for each window of interest. Next,
we will add a mechanism for checkpointing files as the number of windows
of interest is quite large (~52,000). Lastly, we will add a mechanism to batch windows
into groups of 10 to amortize the amount of time spent copying input
and output files compared to the time of the actual computation per window
(~30 seconds).
Batch Code
Imports
We import all the modules we will need. The random forest model code comes from the sklearn package.
import hailtop.batch as hb
import hailtop.fs as hfs
from hailtop.utils import grouped
import pandas as pd
from typing import List, Optional, Tuple
import argparse
from sklearn.ensemble import RandomForestRegressor
Random Forest Function
The inputs to the random forest function are two data frame files. df_x is the path to a file containing a Pandas data frame whose columns are the genomic features measured on each window. df_y is the path to a file containing a Pandas data frame with the observed/expected variant count ratio (the oe column) for each window.
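For concreteness, here is a minimal sketch of what the two tables might contain. The window names and feature columns below are made up for illustration; the only name the code actually relies on is the oe column in df_y.

import pandas as pd

# Hypothetical df_x: one row per 50kb window, one column per genomic feature.
df_x = pd.DataFrame(
    {'replication_timing': [0.31, -0.12, 0.85],
     'recombination_rate': [1.2, 0.4, 2.1],
     'H3K9me3': [0.05, 0.22, 0.11]},
    index=['window_1', 'window_2', 'window_3'])

# Hypothetical df_y: the same windows with a single "oe" column holding the
# observed/expected variant count ratio.
df_y = pd.DataFrame({'oe': [0.97, 1.05, 0.92]}, index=df_x.index)

# Both files are tab-delimited with the window name as the index column,
# matching pd.read_table(..., header=0, index_col=0) in the function below.
df_x.to_csv('df_x.tsv', sep='\t')
df_y.to_csv('df_y.tsv', sep='\t')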
We write a function that runs the random forest model, leaving the window of interest (window_name) out of the training data.
An important thing to note in the code below is that the number of cores is a parameter to the function; it should match the number of cores we give the job in the Batch control code below.
def random_forest(df_x_path: str, df_y_path: str, window_name: str, cores: int = 1) -> Tuple[str, float, float]:
    # read in data
    df_x = pd.read_table(df_x_path, header=0, index_col=0)
    df_y = pd.read_table(df_y_path, header=0, index_col=0)

    # split training and testing data for the current window
    x_train = df_x[df_x.index != window_name]
    x_test = df_x[df_x.index == window_name]
    y_train = df_y[df_y.index != window_name]
    y_test = df_y[df_y.index == window_name]

    # run random forest
    rf = RandomForestRegressor(n_estimators=100,
                               n_jobs=cores,
                               max_features=3/4,
                               oob_score=True,
                               verbose=False)
    rf.fit(x_train, y_train)

    # apply the trained random forest on testing data
    y_pred = rf.predict(x_test)

    # store obs and pred values for this window
    obs = y_test["oe"].to_list()[0]
    pred = y_pred[0]

    return (window_name, obs, pred)
Format Result Function
The function below takes the expected output of the function random_forest and returns a tab-delimited string that will be used later on when concatenating results.
def as_tsv(input: Tuple[str, float, float]) -> str:
    return '\t'.join(str(i) for i in input)
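For example, with a made-up window name:

as_tsv(('window_1', 0.97, 1.01))   # returns 'window_1\t0.97\t1.01'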
Build Python Image
In order to run a PythonJob
, Batch needs an image that has the
same version of Python as the version of Python running on your computer
and the Python package dill installed. Batch will automatically
choose a suitable image for you if your Python version is 3.9 or newer.
You can supply your own image that meets the requirements listed above to the
method PythonJob.image()
or as the argument default_python_image when
constructing a Batch. We also provide a convenience function docker.build_python_image()
for building an image that has the correct version of Python and dill installed
along with any desired Python packages.
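For example, if you already have a suitable image (the image name below is a placeholder, not a real published image), you could skip building one and supply it directly:

# Hypothetical: reuse an existing image with a matching Python version and dill installed.
b = hb.Batch(name='rf', default_python_image='us-docker.pkg.dev/my-project/my-python-image:latest')

# ...or set the image on an individual Python job:
j = b.new_python_job()
j.image('us-docker.pkg.dev/my-project/my-python-image:latest')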
For running the random forest, we need both the sklearn and pandas Python
packages installed in the image. We use docker.build_python_image()
to build
an image and push it automatically to the location specified (ex: us-docker.pkg.dev/hail-vdc/random-forest).
image = hb.build_python_image('us-docker.pkg.dev/hail-vdc/random-forest',
                              requirements=['sklearn', 'pandas'])
Control Code
We start by defining a backend.
backend = hb.ServiceBackend()
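For quick local testing (without submitting work to the Batch service), a LocalBackend could be used instead; the rest of the pipeline stays the same.

# Alternative for local test runs; jobs execute on your own machine.
backend = hb.LocalBackend()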
Second, we create a Batch
and specify the default Python image to
use for Python jobs with default_python_image. image is the return value
from building the Python image above and is the full name of where the newly
built image was pushed to.
b = hb.Batch(backend=backend,
             name='rf',
             default_python_image=image)
Next, we read the y dataframe locally in order to get the list of windows to run. The file containing the dataframe could be stored in the cloud, so we use the function hfs.open to read the data regardless of where it's located.
with hfs.open(df_y_path) as f:
    local_df_y = pd.read_table(f, header=0, index_col=0)
Now we have a y dataframe object on our local computer, but we need a way for Batch to localize the files as inputs to a Job. Therefore, we use the method Batch.read_input() to tell Batch to localize these files when they are referenced by a Job.
df_x_input = b.read_input(df_x_path)
df_y_input = b.read_input(df_y_path)
We initialize a list to keep track of all of the output files to concatenate later on.
results = []
We now have all of our inputs ready and can iterate through each window in the y dataframe. For each window, we create a new PythonJob using the method Batch.new_python_job(). We then use the method PythonJob.call() to run the function random_forest. The inputs to random_forest are the Batch inputs df_x_input and df_y_input as well as the window name. Notice that the first argument to PythonJob.call() is the reference to the function to call (i.e. random_forest and not random_forest(…)). The rest of the arguments are the usual positional and keyword arguments to the function. Lastly, we assign the result of calling the function to the variable result, which is a PythonResult. A PythonResult can be thought of as a Python object and used in subsequent calls to PythonJob.call().
Since the type of result is a tuple of (str, float, float), we need to convert the Python tuple to a tab-delimited string that can later be concatenated. We use the as_tsv function we wrote above to do so. The input to as_tsv is result and we assign the output to tsv_result.
Lastly, for each window in the for loop, we append tsv_result to the results list. However, tsv_result is a Python object, so we use the PythonResult.as_str() method to convert it to a text file containing the str() output of the Python object.
for window in local_df_y.index.to_list():
    j = b.new_python_job()
    result = j.call(random_forest, df_x_input, df_y_input, window)
    tsv_result = j.call(as_tsv, result)
    results.append(tsv_result.as_str())
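If we wanted more cores per job, for example to speed up fitting the forest, the number of cores requested for the job should match the cores argument passed to random_forest. A sketch assuming 4 cores:

j = b.new_python_job()
j.cpu(4)  # request 4 cores for this job
result = j.call(random_forest, df_x_input, df_y_input, window, cores=4)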
Now that we have computed the random forest results for each window, we can concatenate
the outputs together into a single file using the concatenate()
function and then
write the concatenated results file to a permanent output location.
output = hb.concatenate(b, results)
b.write_output(output, results_path)
Finally, we call Batch.run()
to execute the batch and then close the backend.
b.run(wait=False)
backend.close()
Add Checkpointing
The pipeline we wrote above is not resilient to failing jobs. Therefore, we can add a way to checkpoint the results so that future runs of the pipeline only run jobs that haven't already succeeded. The way we do this is by having Batch write the computed result to a file and using the function hfs.exists to check whether that file already exists before adding the job to the DAG.
First, we define the checkpoint path for each window.
def checkpoint_path(window):
    return f'gs://my_bucket/checkpoints/random-forest/{window}'
Next, we define the list of results we’ll append to:
results = []
Now we take the for loop over the windows from before, but we check whether the checkpoint file already exists. If it does, we read the checkpointed file as an InputResourceFile using Batch.read_input() and append the input to the results list. If the checkpoint doesn't exist, we add the job to the batch and write the results file to the checkpoint location using Batch.write_output().
for window in local_df_y.index.to_list():
    checkpoint = checkpoint_path(window)
    if hfs.exists(checkpoint):
        result = b.read_input(checkpoint)
        results.append(result)
        continue

    j = b.new_python_job()
    result = j.call(random_forest, df_x_input, df_y_input, window)
    tsv_result = j.call(as_tsv, result)
    tsv_result = tsv_result.as_str()
    b.write_output(tsv_result, checkpoint)
    results.append(tsv_result)
Add Batching of Jobs
If we have a lot of short-running jobs, we might want to run the calls for multiple windows in a single job, while keeping the checkpointing mechanism that skips windows whose results have already been computed.
Building on the checkpointing solution above, we need two for loops instead of one so that the remaining windows are grouped evenly across jobs while previously completed windows are not rerun.
First, we create a results list whose length is the number of windows.
indices = local_df_y.index.to_list()
results = [None] * len(indices)
We identify all of the windows whose checkpoint file already exists and insert the corresponding inputs into the results list at the correct positions so that the ordering of results stays consistent. We also create a list that holds tuples of the window to compute, the index of that window, and the checkpoint path.
inputs = []
for i, window in enumerate(indices):
    checkpoint = checkpoint_path(window)
    if hfs.exists(checkpoint):
        result = b.read_input(checkpoint)
        results[i] = result
        continue

    inputs.append((window, i, checkpoint))
Then we have another for loop that uses the hailtop.utils.grouped function to partition the remaining inputs into groups of 10, creating one job per group. For each group, we create a PythonJob and use PythonJob.call() to run the random forest function for each window in the group. Lastly, we place each result at its correct index in the results list.
for inputs in grouped(10, inputs):
    j = b.new_python_job()
    for window, i, checkpoint in inputs:
        result = j.call(random_forest, df_x_input, df_y_input, window)
        tsv_result = j.call(as_tsv, result)
        tsv_result = tsv_result.as_str()
        b.write_output(tsv_result, checkpoint)
        results[i] = tsv_result
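To illustrate what grouped does (assuming it simply yields successive chunks of at most n items, which is how it is used above):

from hailtop.utils import grouped

for chunk in grouped(3, ['a', 'b', 'c', 'd', 'e', 'f', 'g']):
    print(list(chunk))
# ['a', 'b', 'c']
# ['d', 'e', 'f']
# ['g']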
Now we only create jobs, in groups of 10 windows each, for windows that have no existing checkpoint file. The results will be concatenated in the correct order.
Synopsis
We have presented three different ways with increasing complexity to write a pipeline that runs a random forest for various windows in the genome. The complete code is provided here for your reference.
# Version 1: one PythonJob per window
from typing import Tuple

import pandas as pd
from sklearn.ensemble import RandomForestRegressor

import hailtop.batch as hb
import hailtop.fs as hfs


def random_forest(df_x_path: str, df_y_path: str, window_name: str, cores: int = 1) -> Tuple[str, float, float]:
    # read in data
    df_x = pd.read_table(df_x_path, header=0, index_col=0)
    df_y = pd.read_table(df_y_path, header=0, index_col=0)

    # split training and testing data for the current window
    x_train = df_x[df_x.index != window_name]
    x_test = df_x[df_x.index == window_name]
    y_train = df_y[df_y.index != window_name]
    y_test = df_y[df_y.index == window_name]

    # run random forest
    max_features = 3 / 4
    rf = RandomForestRegressor(n_estimators=100, n_jobs=cores, max_features=max_features, oob_score=True, verbose=False)
    rf.fit(x_train, y_train)

    # apply the trained random forest on testing data
    y_pred = rf.predict(x_test)

    # store obs and pred values for this window
    obs = y_test["oe"].to_list()[0]
    pred = y_pred[0]

    return (window_name, obs, pred)


def as_tsv(input: Tuple[str, float, float]) -> str:
    return '\t'.join(str(i) for i in input)


def main(df_x_path, df_y_path, output_path, python_image):
    backend = hb.ServiceBackend()
    b = hb.Batch(backend=backend, name='rf-loo', default_python_image=python_image)

    with hfs.open(df_y_path) as f:
        local_df_y = pd.read_table(f, header=0, index_col=0)

    df_x_input = b.read_input(df_x_path)
    df_y_input = b.read_input(df_y_path)

    results = []

    for window in local_df_y.index.to_list():
        j = b.new_python_job()
        result = j.call(random_forest, df_x_input, df_y_input, window)
        tsv_result = j.call(as_tsv, result)
        results.append(tsv_result.as_str())

    output = hb.concatenate(b, results)
    b.write_output(output, output_path)

    b.run(wait=False)
    backend.close()
# Version 2: with checkpointing
from typing import Tuple

import pandas as pd
from sklearn.ensemble import RandomForestRegressor

import hailtop.batch as hb
import hailtop.fs as hfs


def random_forest(df_x_path: str, df_y_path: str, window_name: str, cores: int = 1) -> Tuple[str, float, float]:
    # read in data
    df_x = pd.read_table(df_x_path, header=0, index_col=0)
    df_y = pd.read_table(df_y_path, header=0, index_col=0)

    # split training and testing data for the current window
    x_train = df_x[df_x.index != window_name]
    x_test = df_x[df_x.index == window_name]
    y_train = df_y[df_y.index != window_name]
    y_test = df_y[df_y.index == window_name]

    # run random forest
    max_features = 3 / 4
    rf = RandomForestRegressor(n_estimators=100, n_jobs=cores, max_features=max_features, oob_score=True, verbose=False)
    rf.fit(x_train, y_train)

    # apply the trained random forest on testing data
    y_pred = rf.predict(x_test)

    # store obs and pred values for this window
    obs = y_test["oe"].to_list()[0]
    pred = y_pred[0]

    return (window_name, obs, pred)


def as_tsv(input: Tuple[str, float, float]) -> str:
    return '\t'.join(str(i) for i in input)


def checkpoint_path(window):
    return f'gs://my_bucket/checkpoints/random-forest/{window}'


def main(df_x_path, df_y_path, output_path, python_image):
    backend = hb.ServiceBackend()
    b = hb.Batch(backend=backend, name='rf-loo', default_python_image=python_image)

    with hfs.open(df_y_path) as f:
        local_df_y = pd.read_table(f, header=0, index_col=0)

    df_x_input = b.read_input(df_x_path)
    df_y_input = b.read_input(df_y_path)

    results = []

    for window in local_df_y.index.to_list():
        checkpoint = checkpoint_path(window)
        if hfs.exists(checkpoint):
            result = b.read_input(checkpoint)
            results.append(result)
            continue

        j = b.new_python_job()
        result = j.call(random_forest, df_x_input, df_y_input, window)
        tsv_result = j.call(as_tsv, result)
        tsv_result = tsv_result.as_str()
        b.write_output(tsv_result, checkpoint)
        results.append(tsv_result)

    output = hb.concatenate(b, results)
    b.write_output(output, output_path)

    b.run(wait=False)
    backend.close()
# Version 3: with checkpointing and batching windows into groups of 10
from typing import Tuple

import pandas as pd
from sklearn.ensemble import RandomForestRegressor

import hailtop.batch as hb
import hailtop.fs as hfs
from hailtop.utils import grouped


def random_forest(df_x_path: str, df_y_path: str, window_name: str, cores: int = 1) -> Tuple[str, float, float]:
    # read in data
    df_x = pd.read_table(df_x_path, header=0, index_col=0)
    df_y = pd.read_table(df_y_path, header=0, index_col=0)

    # split training and testing data for the current window
    x_train = df_x[df_x.index != window_name]
    x_test = df_x[df_x.index == window_name]
    y_train = df_y[df_y.index != window_name]
    y_test = df_y[df_y.index == window_name]

    # run random forest
    max_features = 3 / 4
    rf = RandomForestRegressor(n_estimators=100, n_jobs=cores, max_features=max_features, oob_score=True, verbose=False)
    rf.fit(x_train, y_train)

    # apply the trained random forest on testing data
    y_pred = rf.predict(x_test)

    # store obs and pred values for this window
    obs = y_test["oe"].to_list()[0]
    pred = y_pred[0]

    return (window_name, obs, pred)


def as_tsv(input: Tuple[str, float, float]) -> str:
    return '\t'.join(str(i) for i in input)


def checkpoint_path(window):
    return f'gs://my_bucket/checkpoints/random-forest/{window}'


def main(df_x_path, df_y_path, output_path, python_image):
    backend = hb.ServiceBackend()
    b = hb.Batch(backend=backend, name='rf-loo', default_python_image=python_image)

    with hfs.open(df_y_path) as f:
        local_df_y = pd.read_table(f, header=0, index_col=0)

    df_x_input = b.read_input(df_x_path)
    df_y_input = b.read_input(df_y_path)

    indices = local_df_y.index.to_list()
    results = [None] * len(indices)

    inputs = []
    for i, window in enumerate(indices):
        checkpoint = checkpoint_path(window)
        if hfs.exists(checkpoint):
            result = b.read_input(checkpoint)
            results[i] = result
            continue

        inputs.append((window, i, checkpoint))

    for inputs in grouped(10, inputs):
        j = b.new_python_job()
        for window, i, checkpoint in inputs:
            result = j.call(random_forest, df_x_input, df_y_input, window)
            tsv_result = j.call(as_tsv, result)
            tsv_result = tsv_result.as_str()
            b.write_output(tsv_result, checkpoint)
            results[i] = tsv_result

    output = hb.concatenate(b, results)
    b.write_output(output, output_path)

    b.run(wait=False)
    backend.close()
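The argparse import in the Imports section suggests these scripts end with a command-line entry point. A minimal sketch of what that could look like (the flag names here are our own, not from the original):

if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description='Leave-one-out random forest over 50kb windows')
    parser.add_argument('--df-x', required=True, help='path to the feature table (df_x)')
    parser.add_argument('--df-y', required=True, help='path to the observed/expected table (df_y)')
    parser.add_argument('--output', required=True, help='where to write the concatenated results')
    parser.add_argument('--image', required=True, help='Python image built with hb.build_python_image')
    args = parser.parse_args()

    main(args.df_x, args.df_y, args.output, args.image)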