Table Joins Tutorial

This tutorial walks through some ways to join Hail tables. We’ll use a simple movie dataset to illustrate. The movie dataset comes in multiple parts. Here are a few questions we might naturally ask about the dataset:

  • What is the mean rating per genre?

  • What is the favorite movie for each occupation?

  • What genres are most preferred by women vs men?

We’ll use joins to combine datasets in order to answer these questions.

Let’s initialize Hail, fetch the tutorial data, and load three tables: users, movies, and ratings.

[1]:
import hail as hl

hl.utils.get_movie_lens('data/')

users = hl.read_table('data/users.ht')
movies = hl.read_table('data/movies.ht')
ratings = hl.read_table('data/ratings.ht')
Loading BokehJS ...
Initializing Hail with default parameters...
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Running on Apache Spark version 3.5.0
SparkUI available at http://hostname-09f2439d4b:4040
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2.133-4c60fddb171a
LOGGING: writing to /io/hail/python/hail/docs/tutorials/hail-20241004-2010-0.2.133-4c60fddb171a.log
2024-10-04 20:10:22.038 Hail: INFO: Movie Lens files found!

The Key to Understanding Joins

To understand joins in Hail, we need to revisit one of the crucial properties of tables: the key.

A table has an ordered list of fields known as the key. Our users table has one key, the id field. We can see all the fields, as well as the keys, of a table by calling describe().

[2]:
users.describe()
----------------------------------------
Global fields:
    None
----------------------------------------
Row fields:
    'id': int32
    'age': int32
    'sex': str
    'occupation': str
    'zipcode': str
----------------------------------------
Key: ['id']
----------------------------------------

key is a struct expression of all of the key fields, so we can refer to the key of a table without explicitly specifying the names of the key fields.

[3]:
users.key.describe()
--------------------------------------------------------
Type:
        struct {
        id: int32
    }
--------------------------------------------------------
Source:
    <hail.table.Table object at 0x7f5beee034f0>
Index:
    ['row']
--------------------------------------------------------

Keys need not be unique or non-missing, although in many applications they will be both.

When tables are joined in Hail, they are joined based on their keys. In order to join two tables, they must share the same number of keys, same key types (i.e. string vs integer), and the same order of keys.

Let’s look at a simple example of a join. We’ll use the Table.parallelize() method to create two small tables, t1 and t2.

[4]:
t1 = hl.Table.parallelize([
    {'a': 'foo', 'b': 1},
    {'a': 'bar', 'b': 2},
    {'a': 'bar', 'b': 2}],
    hl.tstruct(a=hl.tstr, b=hl.tint32),
    key='a')
t2 = hl.Table.parallelize([
    {'t': 'foo', 'x': 3.14},
    {'t': 'bar', 'x': 2.78},
    {'t': 'bar', 'x': -1},
    {'t': 'quam', 'x': 0}],
    hl.tstruct(t=hl.tstr, x=hl.tfloat64),
    key='t')
[5]:
t1.show()
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
a
b
strint32
"bar"2
"bar"2
"foo"1
[6]:
t2.show()
t
x
strfloat64
"bar"2.78e+00
"bar"-1.00e+00
"foo"3.14e+00
"quam"0.00e+00

Now, we can join the tables.

[7]:
j = t1.annotate(t2_x = t2[t1.a].x)
j.show()
[Stage 3:==========================================>              (12 + 4) / 16]
a
b
t2_x
strint32float64
"bar"22.78e+00
"bar"22.78e+00
"foo"13.14e+00

Let’s break this syntax down.

t2[t1.a] is an expression referring to the row of table t2 with value t1.a. So this expression will create a map between the keys of t1 and the rows of t2. You can view this mapping directly:

[8]:
t2[t1.a].show()
<expr>
a
x
strfloat64
"bar"2.78e+00
"bar"2.78e+00
"foo"3.14e+00

Since we only want the field x from t2, we can select it with t2[t1.a].x. Then we add this field to t1 with the anntotate_rows() method. The new joined table j has a field t2_x that comes from the rows of t2. The tables could be joined, because they shared the same number of keys (1) and the same key type (string). The keys do not need to share the same name. Notice that the rows with keys present in t2 but not in t1 do not show up in the final result. This join syntax performs a left join. Tables also have a SQL-style inner/left/right/outer join() method.

The magic of keys is that they can be used to create a mapping, like a Python dictionary, between the keys of one table and the row values of another table: table[expr] will refer to the row of table that has a key value of expr. If the row is not unique, one such row is chosen arbitrarily.

Here’s a subtle bit: if expr is an expression indexed by a row of table2, then table[expr] is also an expression indexed by a row of table2.

Also note that while they look similar, table['field'] and table1[table2.key] are doing very different things!

table['field'] selects a field from the table, while table1[table2.key] creates a mapping between the keys of table2 and the rows of table1.

[9]:
t1['a'].describe()
--------------------------------------------------------
Type:
        str
--------------------------------------------------------
Source:
    <hail.table.Table object at 0x7f5bee73d130>
Index:
    ['row']
--------------------------------------------------------
[10]:
t2[t1.a].describe()
--------------------------------------------------------
Type:
        struct {
        x: float64
    }
--------------------------------------------------------
Source:
    <hail.table.Table object at 0x7f5bee73d130>
Index:
    ['row']
--------------------------------------------------------

Joining Tables

Now that we understand the basics of how joins work, let’s use a join to compute the average movie rating per genre.

We have a table ratings, which contains user_id, movie_id, and rating fields. Group by movie_id and aggregate to get the mean rating of each movie.

[11]:
t = (ratings.group_by(ratings.movie_id)
     .aggregate(rating = hl.agg.mean(ratings.rating)))
t.describe()
----------------------------------------
Global fields:
    None
----------------------------------------
Row fields:
    'movie_id': int32
    'rating': float64
----------------------------------------
Key: ['movie_id']
----------------------------------------

To get the mean rating by genre, we need to join in the genre field from the movies table.

[12]:
t = t.annotate(genres = movies[t.movie_id].genres)
t.describe()
----------------------------------------
Global fields:
    None
----------------------------------------
Row fields:
    'movie_id': int32
    'rating': float64
    'genres': array<str>
----------------------------------------
Key: ['movie_id']
----------------------------------------
[13]:
t.show()
[Stage 12:>                                                         (0 + 1) / 1]
movie_id
rating
genres
int32float64array<str>
13.88e+00["Animation","Children's","Comedy"]
23.21e+00["Action","Adventure","Thriller"]
33.03e+00["Thriller"]
43.55e+00["Action","Comedy","Drama"]
53.30e+00["Crime","Drama","Thriller"]
63.58e+00["Drama"]
73.80e+00["Drama","Sci-Fi"]
84.00e+00["Children's","Comedy","Drama"]
93.90e+00["Drama"]
103.83e+00["Drama","War"]

showing top 10 rows

We want to group the ratings by genre, but they’re packed up in an array. To unpack the genres, we can use explode.

explode creates a new row for each element in the value of the field, which must be a collection (array or set).

[14]:
t = t.explode(t.genres)
t.show()
movie_id
rating
genres
int32float64str
13.88e+00"Animation"
13.88e+00"Children's"
13.88e+00"Comedy"
23.21e+00"Action"
23.21e+00"Adventure"
23.21e+00"Thriller"
33.03e+00"Thriller"
43.55e+00"Action"
43.55e+00"Comedy"
43.55e+00"Drama"

showing top 10 rows

Finally, we can get group by genre and aggregate to get the mean rating per genre.

[15]:
t = (t.group_by(t.genres)
     .aggregate(rating = hl.agg.mean(t.rating)))
t.show(n=100)
[Stage 24:>                                                         (0 + 1) / 1]
genres
rating
strfloat64
"Action"2.97e+00
"Adventure"3.14e+00
"Animation"3.30e+00
"Children's"2.92e+00
"Comedy"3.00e+00
"Crime"3.21e+00
"Documentary"3.23e+00
"Drama"3.19e+00
"Fantasy"2.85e+00
"Film-Noir"3.55e+00
"Horror"2.73e+00
"Musical"3.38e+00
"Mystery"3.34e+00
"Romance"3.24e+00
"Sci-Fi"3.17e+00
"Thriller"3.14e+00
"War"3.49e+00
"Western"3.19e+00

Let’s do another example. This time, we’ll see if we can determine what the highest rated movies are, on average, for each occupation. We start by joining the two tables movies and users.

[16]:
movie_data = ratings.annotate(
    movie = movies[ratings.movie_id].title,
    occupation = users[ratings.user_id].occupation)

movie_data.show()
[Stage 31:>                                                         (0 + 1) / 1]
user_id
movie_id
rating
movie
occupation
int32int32int32strstr
115"Toy Story (1995)""technician"
123"GoldenEye (1995)""technician"
134"Four Rooms (1995)""technician"
143"Get Shorty (1995)""technician"
153"Copycat (1995)""technician"
165"Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)""technician"
174"Twelve Monkeys (1995)""technician"
181"Babe (1995)""technician"
195"Dead Man Walking (1995)""technician"
1103"Richard III (1995)""technician"

showing top 10 rows

Next, we’ll use group_by along with the aggregator hl.agg.mean to determine the average rating of each movie by occupation. Remember that the group_by operation is always associated with an aggregation.

[17]:
ratings_by_job = movie_data.group_by(
    movie_data.occupation, movie_data.movie).aggregate(
    mean = hl.agg.mean(movie_data.rating))

ratings_by_job.show()
[Stage 44:>                                                         (0 + 1) / 1]
occupation
movie
mean
strstrfloat64
"administrator""101 Dalmatians (1996)"2.75e+00
"administrator""12 Angry Men (1957)"4.56e+00
"administrator""187 (1997)"2.00e+00
"administrator""2 Days in the Valley (1996)"3.25e+00
"administrator""20,000 Leagues Under the Sea (1954)"3.63e+00
"administrator""2001: A Space Odyssey (1968)"4.04e+00
"administrator""39 Steps, The (1935)"3.80e+00
"administrator""8 1/2 (1963)"4.00e+00
"administrator""8 Seconds (1994)"4.00e+00
"administrator""A Chef in Love (1996)"5.00e+00

showing top 10 rows

[ ]:

Now we can use another group_by to determine the highest rated movie, on average, for each occupation.

The syntax here needs some explaining. The second step in the cell below is just to clean up the table created by the preceding step. If you examine the intermediate result (for example, by giving a new name to the output of the first step), you will see that there are two columns corresponding to occupation, occupation and val.occupation. This is an artifact of the aggregator syntax and the fact that we are retaining the entire row from ratings_by_job. So in the second line, we use select to keep those columns that we want, and also rename them to drop the val. syntax. Since occupation is a key of this table, we don’t need to select for it.

[18]:
highest_rated = ratings_by_job.group_by(
    ratings_by_job.occupation).aggregate(
    val = hl.agg.take(ratings_by_job.row,1, ordering = -ratings_by_job.mean)[0]
)

highest_rated = highest_rated.select(movie = highest_rated.val.movie,
                                    mean = highest_rated.val.mean)

highest_rated.show()
[Stage 67:>                                                         (0 + 1) / 1]
occupation
movie
mean
strstrfloat64
"administrator""A Chef in Love (1996)"5.00e+00
"artist""39 Steps, The (1935)"5.00e+00
"doctor""Alien (1979)"5.00e+00
"educator""Aparajito (1956)"5.00e+00
"engineer""Charade (1963)"5.00e+00
"entertainment""American in Paris, An (1951)"5.00e+00
"executive""A Chef in Love (1996)"5.00e+00
"healthcare""39 Steps, The (1935)"5.00e+00
"homemaker""Beautiful Girls (1996)"5.00e+00
"lawyer""Anastasia (1997)"5.00e+00

showing top 10 rows

Let’s try to get a deeper understanding of this result. Notice that every movie displayed has an average rating of 5, which means that every person gave these movies the highest rating. Is that unlikely? We can determine how many people rated each of these movies by working backwards and filtering our original movie_data table by fields in highest_rated.

Note that in the second line below, we are taking advantage of the fact that Hail tables are keyed.

[19]:
highest_rated = highest_rated.key_by(
    highest_rated.occupation, highest_rated.movie)

counts_temp = movie_data.filter(
    hl.is_defined(highest_rated[movie_data.occupation, movie_data.movie]))

counts = counts_temp.group_by(counts_temp.occupation, counts_temp.movie).aggregate(
    counts = hl.agg.count())

counts.show()
[Stage 108:>                                                        (0 + 1) / 1]
occupation
movie
counts
strstrint64
"administrator""A Chef in Love (1996)"1
"artist""39 Steps, The (1935)"1
"doctor""Alien (1979)"1
"educator""Aparajito (1956)"2
"engineer""Charade (1963)"1
"entertainment""American in Paris, An (1951)"1
"executive""A Chef in Love (1996)"1
"healthcare""39 Steps, The (1935)"1
"homemaker""Beautiful Girls (1996)"1
"lawyer""Anastasia (1997)"1

showing top 10 rows

So it looks like the highest rated movies, when computed naively, mostly have a single viewer rating them. To get a better understanding of the data, we can recompute this list but only include movies which have more than 1 viewer (left as an exercise).

Exercises

  • What is the favorite movie for each occupation, conditional on there being more than one viewer?

  • What genres are rated most differently by men and women?