Mongo on Hadoop

At Foursquare, one of our most important pieces of data infrastructure is the job that copies our production Mongo database into Hadoop. Today, we’re open-sourcing two parts of it: a utility to dump Mongo to Hadoop, and code to read that data from MapReduce jobs.

Why Mongo on Hadoop?

Our Mongo database is the source of truth for everything at Foursquare. How many users do we have? How many people are checked into JFK right now? What’s the most-liked tip left by a friend at this bar? Ask Mongo. However, the database has to stay extremely responsive to reads and writes at all times; no one wants their check-ins to show up ten minutes after they happen! That means more expensive queries (such as joining every user with every venue they’ve visited) can’t run on Mongo. So how do we answer these questions? We dump the data to Hadoop!

What is Hadoop?

Hadoop is an open-source framework for computation over massive data sets. It has two main components: HDFS (the Hadoop Distributed File System) for storing data, and MapReduce for running computations on that data. Foursquare’s current Hadoop cluster is about 100 servers in our datacenter with 2.5 petabytes of raw storage. Hadoop powers a lot of what we do here, from business and product decisions to some of our coolest features (like the real-time notifications in the all-new Foursquare).

Having our Mongo data in Hadoop gives us exactly the same view of the world that the app has, and lets us run massive queries without any impact on the production database.

Getting The Data There

It starts with the Mongo replicas, which also run in our datacenter. These nodes all run independent LVM stacks beneath each mongod process (one LVM stack per physical SSD volume). Every six hours, a process running on each node issues an LVM snapshot command for the disk on which a given Mongo database runs, and a central coordinator ensures that this happens as close to simultaneously as possible across all clusters. This produces a “sane” snapshot of each cluster: if shards were snapshotted at very different times, there could be records with foreign keys that don’t exist. The snapshots are archived and compressed locally, then uploaded to an HDFS directory named for the snapshot group being taken.
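
To make that concrete, here is a rough Scala sketch of what one node’s six-hourly job could look like, shelling out to LVM, tar, and the hadoop CLI. The volume group, mount point, shard name, and HDFS paths are placeholders, not our actual tooling.

    import scala.sys.process._
    import java.time.LocalDate

    // Hypothetical sketch of the per-node snapshot-and-upload step. The volume
    // group, mount point, and HDFS layout are made up for illustration.
    object SnapshotAndUpload {
      def main(args: Array[String]): Unit = {
        val shard = "checkins-shard-03"      // the mongod this node hosts (assumed name)
        val group = LocalDate.now.toString   // snapshot group label, e.g. 2014-01-01
        val snap  = s"$shard-$group"

        // 1. Take an LVM snapshot of the volume backing this mongod's data files.
        Seq("lvcreate", "--snapshot", "--size", "10G",
            "--name", snap, s"/dev/vg_mongo/$shard").!

        // 2. Mount the snapshot read-only, then archive and compress its contents locally.
        Seq("mkdir", "-p", s"/mnt/snapshots/$snap").!
        Seq("mount", "-o", "ro", s"/dev/vg_mongo/$snap", s"/mnt/snapshots/$snap").!
        Seq("bash", "-c",
            s"tar -C /mnt/snapshots/$snap -cf - . | gzip > /tmp/$snap.tar.gz").!

        // 3. Upload the archive to the HDFS directory for this snapshot group.
        Seq("hadoop", "fs", "-put", s"/tmp/$snap.tar.gz",
            s"/datasets/mongo/snapshots/version=$group/$shard.tar.gz").!
      }
    }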

A separate process continuously monitors this directory, waiting for a complete cluster to be available (i.e., for every shard in the cluster to exist). Once that happens, the files are downloaded, decompressed, and extracted in parallel across several servers. As each file finishes downloading, we run the mongodump utility against it and write the result back to HDFS, in a format that’s easier to consume in MapReduce jobs. For example, all our checkin data up to and including 2014-01-01 is in a single directory, stored in Mongo’s BSON format: /datasets/mongo/bson/ident=checkins/version=2014-01-01/col=checkins/
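
The “wait for a complete cluster” check can be sketched with the standard Hadoop FileSystem API; the real watcher is more involved, but the core idea is just polling HDFS until every shard’s archive has arrived. The paths, archive naming, and shard count below are assumptions.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Illustrative sketch of the completeness check: the directory layout,
    // archive naming, and shard count are assumptions, not the production code.
    object ClusterWatcher {
      def clusterComplete(fs: FileSystem, snapshotDir: Path, expectedShards: Int): Boolean = {
        if (!fs.exists(snapshotDir)) return false
        // Count the shard archives that have been uploaded into this snapshot group.
        val uploaded = fs.listStatus(snapshotDir).count(_.getPath.getName.endsWith(".tar.gz"))
        uploaded == expectedShards
      }

      def main(args: Array[String]): Unit = {
        val fs  = FileSystem.get(new Configuration())
        val dir = new Path("/datasets/mongo/snapshots/version=2014-01-01")
        while (!clusterComplete(fs, dir, expectedShards = 16)) {
          Thread.sleep(60 * 1000)  // poll until every shard's archive has arrived
        }
        // ...download, decompress, and run mongodump as described above...
      }
    }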

Reading the Data

Having our entire Mongo database available in Hadoop is nice, but it isn’t very useful unless MapReduce jobs can read it. Every Mongo collection that gets read in Hadoop has an associated Thrift definition, and a custom input format turns the BSON into Thrift objects. Since we use Scala, the Thrift definitions are compiled into Scala classes with Spindle.
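
To give a feel for what this buys you, here is a small sketch of a mapper that counts checkins per venue. Checkin stands in for a Spindle-generated Thrift record, and the input format is assumed to hand the mapper one deserialized record per BSON document; both are illustrative placeholders rather than the actual open-sourced classes.

    import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
    import org.apache.hadoop.mapreduce.Mapper

    // Placeholder for a Spindle-generated Thrift record; the real class is
    // produced from the collection's Thrift definition.
    case class Checkin(venueId: String, userId: Long)

    // A mapper that counts checkins per venue. We assume the custom input format
    // hands the mapper one deserialized Checkin per BSON document.
    class VenueCheckinMapper extends Mapper[NullWritable, Checkin, Text, LongWritable] {
      private val one = new LongWritable(1)

      override def map(key: NullWritable, checkin: Checkin,
                       ctx: Mapper[NullWritable, Checkin, Text, LongWritable]#Context): Unit = {
        // The input format has already turned raw BSON into a typed object, so the
        // mapper works with named fields instead of poking at a BSONObject.
        ctx.write(new Text(checkin.venueId), one)
      }
    }

Downstream code can then filter, join, and aggregate these typed records like any other Scala objects.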

When an entire cluster is finished for a particular day, we write a marker file to signal this to Hadoop jobs. Another process then updates our Hive tables to point at the new data, a change that is entirely transparent to people writing Hive queries.
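
The hand-off itself can be as simple as an empty marker file in the dataset’s directory, followed by a Hive partition update. The marker name and the Hive statement in this sketch are illustrative guesses, not our production code.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Sketch of the hand-off: drop an empty marker file once every shard of a
    // cluster has been dumped for the day. The marker name is an assumption.
    object MarkComplete {
      def main(args: Array[String]): Unit = {
        val fs      = FileSystem.get(new Configuration())
        val dataset = new Path("/datasets/mongo/bson/ident=checkins/version=2014-01-01")

        // Downstream jobs only check for the marker's existence, so an empty file is enough.
        fs.create(new Path(dataset, "_COMPLETE")).close()

        // A separate process then points Hive at the new data with something like:
        //   ALTER TABLE checkins ADD IF NOT EXISTS PARTITION (version='2014-01-01')
        //   LOCATION '/datasets/mongo/bson/ident=checkins/version=2014-01-01/col=checkins';
      }
    }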

– Joe Ennever (@TDJoe), Data Infrastructure Engineer