Table Joins Tutorial

This tutorial walks through some ways to join Hail tables. We’ll use a simple movie dataset to illustrate. The movie dataset comes in multiple parts. Here are a few questions we might naturally ask about the dataset:

  • What is the mean rating per genre?
  • What is the favorite movie for each occupation?
  • What genres are most preferred by women vs men?

We’ll use joins to combine datasets in order to answer these questions.

Let’s initialize Hail, fetch the tutorial data, and load three tables: users, movies, and ratings.

In [1]:
import hail as hl
import seaborn

hl.utils.get_movie_lens('data/')

users = hl.read_table('data/users.ht')
movies = hl.read_table('data/movies.ht')
ratings = hl.read_table('data/ratings.ht')
Initializing Spark and Hail with default parameters...
Running on Apache Spark version 2.2.0
SparkUI available at http://10.32.4.4:4040
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2.5-2595d91d83e0
LOGGING: writing to /hail/repo/hail/build/tmp/python/hail/docs/tutorials/hail-20181215-1613-0.2.5-2595d91d83e0.log
2018-12-15 16:13:44 Hail: INFO: Movie Lens files found!

The Key to Understanding Joins

To understand joins in Hail, we need to revisit one of the crucial properties of tables: the key.

A table has an ordered list of key fields, collectively known as its key. Our users table has a single key field, id. We can see all of a table's fields, as well as its key, by calling describe().

In [2]:
users.describe()
----------------------------------------
Global fields:
    None
----------------------------------------
Row fields:
    'id': int32
    'age': int32
    'sex': str
    'occupation': str
    'zipcode': str
----------------------------------------
Key: ['id']
----------------------------------------

key is a struct expression of all of the key fields, so we can refer to the key of a table without explicitly specifying the names of the key fields.

In [3]:
users.key.describe()
--------------------------------------------------------
Type:
    struct {
        id: int32
    }
--------------------------------------------------------
Source:
    <hail.table.Table object at 0x7fedbdcd79b0>
Index:
    ['row']
--------------------------------------------------------

Keys need not be unique or non-missing, although in many applications they will be both.

When tables are joined in Hail, they are joined based on their keys. To join two tables, their keys must have the same number of fields, with the same types (e.g. string vs. integer), in the same order.

Let’s look at a simple example of a join. We’ll use the Table.parallelize() method to create two small tables, t1 and t2.

In [4]:
t1 = hl.Table.parallelize([
    {'a': 'foo', 'b': 1},
    {'a': 'bar', 'b': 2},
    {'a': 'bar', 'b': 2}],
    hl.tstruct(a=hl.tstr, b=hl.tint32),
    key='a')
t2 = hl.Table.parallelize([
    {'t': 'foo', 'x': 3.14},
    {'t': 'bar', 'x': 2.78},
    {'t': 'bar', 'x': -1},
    {'t': 'quam', 'x': 0}],
    hl.tstruct(t=hl.tstr, x=hl.tfloat64),
    key='t')
In [5]:
t1.show()
2018-12-15 16:13:46 Hail: INFO: Coerced sorted dataset
2018-12-15 16:13:46 Hail: INFO: Coerced dataset with out-of-order partitions.
+-------+-------+
| a     |     b |
+-------+-------+
| str   | int32 |
+-------+-------+
| "bar" |     2 |
| "bar" |     2 |
| "foo" |     1 |
+-------+-------+

In [6]:
t2.show()
2018-12-15 16:13:47 Hail: INFO: Ordering unsorted dataset with network shuffle
+--------+-----------+
| t      |         x |
+--------+-----------+
| str    |   float64 |
+--------+-----------+
| "bar"  |  2.78e+00 |
| "bar"  | -1.00e+00 |
| "foo"  |  3.14e+00 |
| "quam" |  0.00e+00 |
+--------+-----------+

Now, we can join the tables.

In [7]:
j = t1.annotate(t2_x = t2[t1.a].x)
j.show()
2018-12-15 16:13:47 Hail: INFO: Coerced sorted dataset
2018-12-15 16:13:47 Hail: INFO: Coerced dataset with out-of-order partitions.
2018-12-15 16:13:47 Hail: INFO: Ordering unsorted dataset with network shuffle
+-------+-------+----------+
| a     |     b |     t2_x |
+-------+-------+----------+
| str   | int32 |  float64 |
+-------+-------+----------+
| "bar" |     2 | 2.78e+00 |
| "bar" |     2 | 2.78e+00 |
| "foo" |     1 | 3.14e+00 |
+-------+-------+----------+

Let’s break this syntax down.

t2[t1.a] is an expression that refers to the row of table t2 whose key equals the value of t1.a. This expression creates a mapping between the keys of t1 and the rows of t2. You can view this mapping directly:

In [8]:
t2[t1.a].show()
2018-12-15 16:13:48 Hail: INFO: Coerced sorted dataset
2018-12-15 16:13:48 Hail: INFO: Coerced dataset with out-of-order partitions.
2018-12-15 16:13:48 Hail: INFO: Ordering unsorted dataset with network shuffle
+-------+----------+
| a     | <expr>.x |
+-------+----------+
| str   |  float64 |
+-------+----------+
| "bar" | 2.78e+00 |
| "bar" | 2.78e+00 |
| "foo" | 3.14e+00 |
+-------+----------+

Since we only want the field x from t2, we can select it with t2[t1.a].x. Then we add this field to t1 with the annotate() method. The new joined table j has a field t2_x that comes from the rows of t2. The tables could be joined because they share the same number of key fields (1) and the same key type (string); the key fields do not need to share the same name. Notice that rows whose keys are present in t2 but not in t1 do not show up in the final result: this join syntax performs a left join. Tables also have a SQL-style inner/left/right/outer join() method.
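
The expression-indexing syntax above is what the rest of this tutorial uses, but as a rough sketch, the SQL-style method looks something like this (renaming t2's key field so the names line up with t1's is mainly for readability, since Hail matches join keys by position and type rather than by name):

t2_renamed = t2.rename({'t': 'a'})          # make t2's key field name match t1's
j_outer = t1.join(t2_renamed, how='outer')  # 'how' also accepts 'inner', 'left', 'right'
j_outer.show()

With how='outer', the "quam" row that appears only in t2 is kept, with a missing value for b, unlike the left-join behavior of the annotate approach above.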

The magic of keys is that they can be used to create a mapping, like a Python dictionary, between the keys of one table and the row values of another table: table[expr] will refer to the row of table that has a key value of expr. If the row is not unique, one such row is chosen arbitrarily.
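
To make the dictionary analogy concrete, here is a plain-Python illustration (not Hail code) of what indexing t2 by a key value conceptually does:

t2_lookup = {'foo': {'x': 3.14}, 'bar': {'x': 2.78}}  # one row kept per duplicated key
t2_lookup['foo']['x']   # 3.14, analogous to t2[t1.a].x when t1.a == 'foo'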

Here’s a subtle bit: if expr is an expression indexed by a row of table2, then table[expr] is also an expression indexed by a row of table2.

Also note that while they look similar, table['field'] and table1[table2.key] are doing very different things!

table['field'] selects a field from the table, while table1[table2.key] creates a mapping between the keys of table2 and the rows of table1.

In [9]:
t1['a'].describe()
--------------------------------------------------------
Type:
    str
--------------------------------------------------------
Source:
    <hail.table.Table object at 0x7fede4e20b70>
Index:
    ['row']
--------------------------------------------------------
In [10]:
t2[t1.a].describe()
--------------------------------------------------------
Type:
    struct {
        x: float64
    }
--------------------------------------------------------
Source:
    <hail.table.Table object at 0x7fede4e20b70>
Index:
    ['row']
--------------------------------------------------------

Joining Tables

Now that we understand the basics of how joins work, let’s use a join to compute the average movie rating per genre.

We have a table ratings, which contains user_id, movie_id, and rating fields. First, we group by movie_id and aggregate to get the mean rating of each movie.

In [11]:
t = (ratings.group_by(ratings.movie_id)
     .aggregate(rating = hl.agg.mean(ratings.rating)))
t.describe()
----------------------------------------
Global fields:
    None
----------------------------------------
Row fields:
    'movie_id': int32
    'rating': float64
----------------------------------------
Key: ['movie_id']
----------------------------------------

To get the mean rating by genre, we need to join in the genre field from the movies table.

In [12]:
t = t.annotate(genres = movies[t.movie_id].genres)
t.describe()
----------------------------------------
Global fields:
    None
----------------------------------------
Row fields:
    'movie_id': int32
    'rating': float64
    'genres': array<str>
----------------------------------------
Key: ['movie_id']
----------------------------------------
In [13]:
t.show()
2018-12-15 16:13:50 Hail: INFO: Ordering unsorted dataset with network shuffle
+----------+----------+-------------------------------------+
| movie_id |   rating | genres                              |
+----------+----------+-------------------------------------+
|    int32 |  float64 | array<str>                          |
+----------+----------+-------------------------------------+
|        1 | 3.88e+00 | ["Animation","Children's","Comedy"] |
|        2 | 3.21e+00 | ["Action","Adventure","Thriller"]   |
|        3 | 3.03e+00 | ["Thriller"]                        |
|        4 | 3.55e+00 | ["Action","Comedy","Drama"]         |
|        5 | 3.30e+00 | ["Crime","Drama","Thriller"]        |
|        6 | 3.58e+00 | ["Drama"]                           |
|        7 | 3.80e+00 | ["Drama","Sci-Fi"]                  |
|        8 | 4.00e+00 | ["Children's","Comedy","Drama"]     |
|        9 | 3.90e+00 | ["Drama"]                           |
|       10 | 3.83e+00 | ["Drama","War"]                     |
+----------+----------+-------------------------------------+
showing top 10 rows

We want to group the ratings by genre, but they’re packed up in an array. To unpack the genres, we can use explode.

explode creates a new row for each element in the value of the field, which must be a collection (array or set).

In [14]:
t = t.explode(t.genres)
t.show()
2018-12-15 16:13:52 Hail: INFO: Ordering unsorted dataset with network shuffle
+----------+----------+--------------+
| movie_id |   rating | genres       |
+----------+----------+--------------+
|    int32 |  float64 | str          |
+----------+----------+--------------+
|        1 | 3.88e+00 | "Animation"  |
|        1 | 3.88e+00 | "Children's" |
|        1 | 3.88e+00 | "Comedy"     |
|        2 | 3.21e+00 | "Action"     |
|        2 | 3.21e+00 | "Adventure"  |
|        2 | 3.21e+00 | "Thriller"   |
|        3 | 3.03e+00 | "Thriller"   |
|        4 | 3.55e+00 | "Action"     |
|        4 | 3.55e+00 | "Comedy"     |
|        4 | 3.55e+00 | "Drama"      |
+----------+----------+--------------+
showing top 10 rows

Finally, we can group by genre and aggregate to get the mean rating per genre.

In [15]:
t = (t.group_by(t.genres)
     .aggregate(rating = hl.agg.mean(t.rating)))
t.show(n=100)
2018-12-15 16:13:53 Hail: INFO: Ordering unsorted dataset with network shuffle
+---------------+----------+
| genres        |   rating |
+---------------+----------+
| str           |  float64 |
+---------------+----------+
| "Action"      | 2.97e+00 |
| "Adventure"   | 3.14e+00 |
| "Animation"   | 3.30e+00 |
| "Children's"  | 2.92e+00 |
| "Comedy"      | 3.00e+00 |
| "Crime"       | 3.21e+00 |
| "Documentary" | 3.23e+00 |
| "Drama"       | 3.19e+00 |
| "Fantasy"     | 2.85e+00 |
| "Film-Noir"   | 3.55e+00 |
| "Horror"      | 2.73e+00 |
| "Musical"     | 3.38e+00 |
| "Mystery"     | 3.34e+00 |
| "Romance"     | 3.24e+00 |
| "Sci-Fi"      | 3.17e+00 |
| "Thriller"    | 3.14e+00 |
| "War"         | 3.49e+00 |
| "Western"     | 3.19e+00 |
+---------------+----------+

2018-12-15 16:13:53 Hail: INFO: Ordering unsorted dataset with network shuffle

Exercises

  • What is the favorite movie for each occupation? (One possible starting point is sketched after this list.)
  • What genres are rated most differently by men and women?
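
As a hedged starting point for the first exercise, one approach is sketched below. It assumes the movies table also includes a title field (check movies.describe() to confirm), and it ignores how many ratings each title received, which a more careful analysis would account for.

# Join occupation and title onto the ratings table.
r = ratings.annotate(
    occupation = users[ratings.user_id].occupation,
    title = movies[ratings.movie_id].title)

# Mean rating for each (occupation, title) pair.
per_title = (r.group_by(r.occupation, r.title)
             .aggregate(mean_rating = hl.agg.mean(r.rating)))

# For each occupation, take the title with the highest mean rating.
favorites = (per_title.group_by(per_title.occupation)
             .aggregate(favorite = hl.agg.take(per_title.title, 1,
                                               ordering=-per_title.mean_rating)[0]))
favorites.show()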