Bulk Synchronous Parallel and Pregel

Large-scale graph processing

Paul Krzyzanowski

March 24, 2021

Goal: Create a software framework for fault tolerant, deadlock-free parallel processing. Then, adapt that to create an software framework makes it easy to operate on graphs on a massive scale.

Bulk Synchronous Parallel (BSP)


Bulk Synchronous Parallel (BSP) is a programming model and computation framework for parallel computing. Computation is divided into a sequence of supersteps. In each superstep, a set of processes, running the same code, executes concurrently and creates messages that are sent to other processes. The superstep ends when all the computation in the superstep is complete and all messages have been sent. A barrier synchronization at the end of the superstep ensures that all messages have been transmitted (but not yet delivered to the processes). The next superstep begins with the delivery of all those messages to the processes, that then execute their superstep and send messages that will be delivered at the start of the next superstep. This process continues until all processors vote to halt.

Note that no process is permitted to send and receive messages from or to another process during a superstep. Any sent messages will be delivered only at the start of the next superstep. This restriction ensures deadlock-free execution.

A popular implementation of the BSP framework is Apache Hama.


What’s a graph?

A graph is a set of vertices connected by edges. Edges are directed from one vertex to another or bidirectional. In computing, a vertex is represented by an object and a directed edge is a link to another object.

Graphs are all around us

They represent computer networks, social groups, roads, disease outbreaks, phone call connections, Twitter followers, Facebook friends, web page links, etc. Some of these graphs have an enormous scale. The world wide web has billions of pages and Facebook has around a billion users.

What’s wrong with MapReduce?

Nothing is wrong with MapReduce. It’s great for many problems. However, many graph traversal problems, when written for MapReduce, end up having to take multiple iterations of MapReduce, with the output of one iteration feeding the input of another. This is not only slow, but it is also inefficient as data has to be written to files at the end of every map and reduce operation and moved between machines. The entire state of the graph needs to be transmitted in each stage, which requires a lot more communication overhead than Pregel, where the vertices and edges remain on the machine that performs the computation. Moreover, MapReduce is not the most direct way of thinking about and logically solving many problems.

Introducing Pregel

Pregel is a software framework created by Google to make it easy to work on huge graphs (e.g., ones with billions of vertices) that span many machines (e.g., tens of thousands). Like MapReduce, the framework relieves the programmer from having to deal with assigning tasks to processors, monitoring jobs, handling failures, and managing communications.

The Pregel framework allows you to write “vertex-centric” code. That is, the same user code, a compute() function, is run concurrently on each vertex of the graph. Each instance of this function keeps track of information, can iterate over outgoing edges (each of which has a value), and can send messages to the vertices connected to those edges or to any other vertices it may know about (e.g., having received a vertex ID via a message).

When a function does not have any more work to do, it votes to halt. This puts the corresponding vertex in an inactive state. When all vertices are in an inactive state, the framework terminates. However, if a vertex’s compute function sends a message to an inactive vertex, that vertex will be made active at the next superstep.

Pregel is an adaptation of the Bulk Synchronous Parallel (BSP) model that is specifically designed with graph processing in mind. Like BSP, Pregel executes in supersteps. Each superstep consists of computation followed by message sending. All messages are synchronized with a barrier, which marks the end of the superstep. At the next superstep, the messages from the previous superstep are delivered to, and available at, the compute function. The downside of using BSP directly for graph processing is that significant additional work would be needed to define, propagate, and maintain the topology (connections) of a graph and map vertices to compute nodes.

Advanced APIs

Since there is overhead in sending messages to vertices, particularly when they are on other machines, Pregel supports the optional use of combiners. A combiner is a user-defined function that is applied to a bunch of messages all targeted to the same vertex. The Combine method processes the values and creates a single input to that vertex. For example, if the goal is to take the sum of a set of values or to choose data from the highest-numbered vertex, the combiner can merge several messages into one before they are transmitted over the network. This does not alter the compute function since it still has to be prepared to receive multiple messages from multiple sources.

To manage global state, such as overall statistics, total edges of a graph, global flags, or minimum or maximum values of a vertex, Pregel allows a user to define an aggregator. An aggregator combines received values into one value and makes that value available to all vertices at the next superstep.

Pregel design

Pregel uses a master-worker architecture. Many copies of the program are started on a cluster of machines. One copy becomes the master and is responsible for coordinating activity rather than processing the graph. The others are workers. The master registers itself with a name server (Chubby). Each worker process contacts the name server to find the master.

The Pregel framework divides the graph into partitions. Each partition contains a set of vertices and their outgoing edges. A vertex is assigned to a partition based on its ID. By default, this is simply a hash function: hash(vertex_ID) mod N, where N is the number of partitions.

The master assigns one or more partitions to each worker. Each partition will run in a separate thread. Assigning multiple partitions per worker enables better use of the CPU and better load balancing and more powerful machines can be assigned more partitions.

The master assigns chunks of input, which is usually resident in GFS or Bigtable (a large structured table), to each worker. Each input item is a set of vertices and its edges. Workers read this input and create either local messages for the vertices they manage or, if the input record is for a remote vertex, send the message to the worker that owns that vertex.

All vertices are initially marked as active. The master then asks each worker to perform a superstep. To do this, the worker will run concurrent threads, one for each partition. Each thread loops through all the active vertices in that partition, calling the Compute() method for the vertex. The compute function consumes input, runs its algorithm, and generates zero or more messages to other vertices. Workers send these messages asynchronously but they will not be delivered to their target functions until the next superstep starts. When a worker is done with its processing for one superstep, it informs the master. It also tells the master how many of the vertices it manages will be in the active state in the next superstep. The master waits for all workers to complete before starting the next superstep. This cycle continues until there are no more vertices in the active state.

Note that the programmer does not need to be aware of any of this – the framework takes care of starting the workers and tracking their execution. Programmers only need to know that their Compute() method will be invoked once for each vertex at each superstep.

Fault tolerance

Pregel uses checkpointing for fault tolerance. The master tells each worker to checkpoint itself every N supersteps. Checkpointing means that each vertex must save its entire state in stable storage at the start of the superstep. This state will include vertex values, edge values, incoming messages, and possibly any other state that the algorithm needs to track. A master will periodically send ping messages to workers to see if they are alive. If a master does not hear from a worker within a certain window of time, it assumes that the worker has failed. In this case, the master reassigns the set of vertices to other workers and tells all workers to restart their computation from the superstep at the most recent checkpoint.

A popular implementation of Pregel is Apache Giraph, which has been used by Facebook to analyze its graph of social connections among its user community. Facebook improved the platform to allow Giraph to handle over a billion users, hundreds of billions of social connections, and a trillion edges.

Last modified November 11, 2021.
recycled pixels