Pregel

In this post, I describe a simple but powerful framework for distributed computing called Pregel. Pregel was developed by Google, and is described in a 2010 paper written by seven Googlers. In 2009, the Google Research blog announced that the Pregel system was being used in dozens of applications within Google.

Pregel is a framework oriented toward graph-based algorithms. I won’t formally define graph-based algorithms here – we’ll see an example soon enough – but roughly speaking a graph-based algorithm is one which can be easily expressed in terms of the vertices of a graph, and their adjacent edges and vertices. Examples of problems which can be solved by graph-based algorithms include determining whether two vertices in a graph are connected, where there are clusters of connected vertices in a graph, and many other well-known graph problems. As a concrete example, in this post I describe how Pregel can be used to determine the PageRank of a web page.

What makes Pregel special is that it’s designed to scale very easily on a large-scale computer cluster. Typically, writing programs for clusters requires the programmer to get their hands dirty worrying about details of the cluster architecture, communication between machines in the cluster, considerations of fault-tolerance, and so on. The great thing about Pregel is that Pregel programs can be scaled (within limits) automatically on a cluster, without requiring the programmer to worry about the details of distributing the computation. Instead, they can concentrate on the algorithm they want to implement. In this, Pregel is similar to the MapReduce framework. Like MapReduce, Pregel gains this ability by concentrating on a narrow slice of problems. What makes Pregel interesting and different to MapReduce is that it is well-adapted to a somewhat different class of problems.

Using Pregel to compute PageRank

A Pregel program takes as input a graph, with many vertices and (directed) edges. The graph might, for example, be the link graph of the web, with the vertices representing web pages, and the edges representing links between those pages. Each vertex is also initialized with a value. For our PageRank example, the value will just be an initial guess for the PageRank.

Since I’m using PageRank as an example, let me briefly remind you how PageRank works. Imagine a websurfer surfing the web. A simple model of surfing behaviour might involve them either: (1) following a random link from the page they’re currently on; or (2) deciding they’re bored, and “teleporting” to a completely random page elsewhere on the web. Furthermore, the websurfer chooses to do the first type of action with probability 0.85, and the second type of action with probability 0.15. If they repeat this random browsing behaviour enough times, it turns out that the probability they’re on a given webpage eventually converges to a steady value, regardless of where they started. This probability is the PageRank for the page, and, roughly speaking, it’s a measure of the importance of the page: the higher the PageRank, the more important the page.

I won’t get into any more of the ins-and-outs of PageRank, but will assume that you’re happy enough with the description above. If you’re looking for more details, I’ve written an extended introduction.

We’re going to use Pregel to compute PageRank. During the initialization stage for the graph we’ll start each vertex (i.e., webpage) off with an estimate for its PageRank. It won’t make any difference to the final result what that starting estimate it, so let’s just use a very easy-to-compute probability distribution, say, initializing each vertex with a value that is just one over the total number of web pages (i.e., the number of vertices).

Once the input graph is initialized, the Pregel computation proceeds through a series of supersteps. During each superstep each vertex does two things: (1) it updates its own value; and (2) it can send messages to adjacent vertices. The way it updates its own value is by computing a user-specified function which depends on the value of the vertex at the end of the previous superstep, as well as the messages sent to the vertex during the last superstep. Similarly, the messages the vertex sends can depend both on its value and the messages it was sent during the last superstep.

To make this more concrete, here’s some python-like pseudocode showing how Pregel can be used to compute the PageRank of a link graph:

import pregel # includes pregel.Vertex class which we'll subclass

class PageRankVertex(pregel.Vertex):

    def update(self):
        if self.superstep < 50:
            # compute a new estimate of PageRank, based on the most recent
            # estimated PageRank of adjacent vertices
            self.value = 0.15 * 1/num_vertices + 0.85 * sum(self.messages)
            # send the new estimated PageRank to adjacent vertices, dividing
            # it equally between vertices
            self.messages = [(self.value / num_adjacent_vertices) 
                for each adjacent vertex]
        else:
            # stop after 50 supersteps
            self.active = False

initialize link structure for a 10-vertex graph
num_vertices = 10
for each vertex: # initialize values in the graph
    vertex.value = 1/num_vertices
run pregel
output values for all vertices

The code should be self-explanatory: we initialize the graph, with each vertex starting with an initial estimate for its own PageRank. We update that by imagining the websurfer making a single step, either teleporting to a new random vertex, or else randomly following a link to one of the adjacent vertices. This is repeated many times - I've arbitrarily chosen 50 supersteps - before halting.

With the PageRank pseudocode under our belts, let's return to the bigger picture and summarize the basic Pregel model a bit more formally.

Input phase: The input to a Pregel computation is a set of vertices. Each vertex has an initial value, and may also have an associated set of outgoing edges.

Computation phase: This is split up into supersteps. In any given superstep each vertex can update its value and also its outgoing edges. It can also emit messages, which are sent to adjacent vertices. The updated value, outgoing edges and messages are determined by a user-defined function of the vertice's value, edges, and incoming messages at the start of the superstep.

Note that Google's Pregel system allows messages to be sent to any other vertex, not just adjacent vertices, although the paper implies that in most cases messages are usually sent only to adjacent vertices. Also, Google's Pregel system allows both vertices and edges to have values. I've omitted both these for simplicity in the code below, although both are easily restored.

Halting: Each vertex has an attribute which determines whether it is active or not. Vertices start active, but can change to inactive at any superstep. The computation halts when every vertex is inactive. The paper notes that inactive vertices can be reactivated, although it is a little vague on when this happens.

Pregel's synchronous nature makes Pregel programs easy to think about. Although updates are done at many vertices in any single superstep, it doesn't matter in what order the updates are done, or if they're done in parallel, because the update at any specific vertex doesn't affect the result of updates at other vertices. That means there's no possibility of race conditions arising.

Single-machine Pregel library

I'll now describe a toy single-machine Pregel library, written in Python (v 2.6). The main additional feature beyond the description of Pregel given above is that this library partitions the vertices and assigns the different parts of the partition to workers, which in this implementation are separate Python threads. As we'll see below, on a cluster this idea is extended so the partitioned vertices aren't just assigned to different threads, but may be assigned to different machines. Here's the code (see also GitHub):

"""pregel.py is a python 2.6 module implementing a toy single-machine
version of Google's Pregel system for large-scale graph processing."""

import collections
import threading

class Vertex():

    def __init__(self,id,value,out_vertices):
        # This is mostly self-explanatory, but has a few quirks:
        #
        # self.id is included mainly because it's described in the
        # Pregel paper.  It is used briefly in the pagerank example,
        # but not in any essential way, and I was tempted to omit it.
        #
        # Each vertex stores the current superstep number in
        # self.superstep.  It's arguably not wise to store many copies
        # of global state in instance variables, but Pregel's
        # synchronous nature lets us get away with it.
        self.id = id 
        self.value = value
        self.out_vertices = out_vertices
        self.incoming_messages = []
        self.outgoing_messages = []
        self.active = True
        self.superstep = 0
   
class Pregel():

    def __init__(self,vertices,num_workers):
        self.vertices = vertices
        self.num_workers = num_workers

    def run(self):
        """Runs the Pregel instance."""
        self.partition = self.partition_vertices()
        while self.check_active():
            self.superstep()
            self.redistribute_messages()

    def partition_vertices(self):
        """Returns a dict with keys 0,...,self.num_workers-1
        representing the worker threads.  The corresponding values are
        lists of vertices assigned to that worker."""
        partition = collections.defaultdict(list)
        for vertex in self.vertices:
            partition[self.worker(vertex)].append(vertex)
        return partition

    def worker(self,vertex):
        """Returns the id of the worker that vertex is assigned to."""
        return hash(vertex) % self.num_workers

    def superstep(self):
        """Completes a single superstep.  

        Note that in this implementation, worker threads are spawned,
        and then destroyed during each superstep.  This creation and
        destruction causes some overhead, and it would be better to
        make the workers persistent, and to use a locking mechanism to
        synchronize.  The Pregel paper suggests that this is how
        Google's Pregel implementation works."""
        workers = []
        for vertex_list in self.partition.values():
            worker = Worker(vertex_list)
            workers.append(worker)
            worker.start()
        for worker in workers:
            worker.join()

    def redistribute_messages(self):
        """Updates the message lists for all vertices."""
        for vertex in self.vertices:
            vertex.superstep +=1
            vertex.incoming_messages = []
        for vertex in self.vertices:
            for (receiving_vertix,message) in vertex.outgoing_messages:
                receiving_vertix.incoming_messages.append((vertex,message))

    def check_active(self):
        """Returns True if there are any active vertices, and False
        otherwise."""
        return any([vertex.active for vertex in self.vertices])

class Worker(threading.Thread):

    def __init__(self,vertices):
        threading.Thread.__init__(self)
        self.vertices = vertices

    def run(self):
        self.superstep()

    def superstep(self):
        """Completes a single superstep for all the vertices in
        self."""
        for vertex in self.vertices:
            if vertex.active:
                vertex.update()

Here's the Python code for a computation of PageRank using both the Pregel library just given and, as a test, a more conventional matrix-based approach. You should not worry too much about the test code (at least initially), and concentrate on the bits related to Pregel.

"""pagerank.py illustrates how to use the pregel.py library, and tests
that the library works.

It illustrates pregel.py by computing the PageRank for a randomly
chosen 10-vertex web graph.

It tests pregel.py by computing the PageRank for the same graph in a
different, more conventional way, and showing that the two outputs are
near-identical."""

from pregel import *

# The next two imports are only needed for the test.  
from numpy import * 
import random

num_workers = 4
num_vertices = 10

def main():
    vertices = [PageRankVertex(j,1.0/num_vertices,[]) 
                for j in range(num_vertices)]
    create_edges(vertices)
    pr_test = pagerank_test(vertices)
    print "Test computation of pagerank:\n%s" % pr_test
    pr_pregel = pagerank_pregel(vertices)
    print "Pregel computation of pagerank:\n%s" % pr_pregel
    diff = pr_pregel-pr_test
    print "Difference between the two pagerank vectors:\n%s" % diff
    print "The norm of the difference is: %s" % linalg.norm(diff)

def create_edges(vertices):
    """Generates 4 randomly chosen outgoing edges from each vertex in
    vertices."""
    for vertex in vertices:
        vertex.out_vertices = random.sample(vertices,4)

def pagerank_test(vertices):
    """Computes the pagerank vector associated to vertices, using a
    standard matrix-theoretic approach to computing pagerank.  This is
    used as a basis for comparison."""
    I = mat(eye(num_vertices))
    G = zeros((num_vertices,num_vertices))
    for vertex in vertices:
        num_out_vertices = len(vertex.out_vertices)
        for out_vertex in vertex.out_vertices:
            G[out_vertex.id,vertex.id] = 1.0/num_out_vertices
    P = (1.0/num_vertices)*mat(ones((num_vertices,1)))
    return 0.15*((I-0.85*G).I)*P

def pagerank_pregel(vertices):
    """Computes the pagerank vector associated to vertices, using
    Pregel."""
    p = Pregel(vertices,num_workers)
    p.run()
    return mat([vertex.value for vertex in p.vertices]).transpose()

class PageRankVertex(Vertex):

    def update(self):
        # This routine has a bug when there are pages with no outgoing
        # links (never the case for our tests).  This problem can be
        # solved by introducing Aggregators into the Pregel framework,
        # but as an initial demonstration this works fine.
        if self.superstep < 50:
            self.value = 0.15 / num_vertices + 0.85*sum(
                [pagerank for (vertex,pagerank) in self.incoming_messages])
            outgoing_pagerank = self.value / len(self.out_vertices)
            self.outgoing_messages = [(vertex,outgoing_pagerank) 
                                      for vertex in self.out_vertices]
        else:
            self.active = False

if __name__ == "__main__":
    main()

You might observe that in the PageRank example the matrix-based code is no more complex than the Pregel-based code. So why bother with Pregel? What's different is that the Pregel code can, in principle, be automatically scaled on a cluster. And that's not so easy for the matrix-based approach.

The PageRank example also illustrates another point made in the Pregel paper. The authors note that users of Pregel at Google find it easy to program using Pregel once they begin to "think like a vertex". In a sense, the update method for instances of PageRankVertex is simply the vertex asking itself "What should I do in this superstep?"

There is, incidentally, a slight bug in the PageRank program. The bug arises because of a quirk in PageRank. Sometimes the random websurfer may come to a webpage that has no outgoing links. When that happens, they can't simply choose a random outgoing link. Instead, the PageRank algorithm is modified in this instance so that they always teleport to a page chosen completely at random.

A standard way of modelling this is to model a page with no outgoing links as a page that is linked to every single other page. It sounds counterintuitive, but it works - if it is connected to every single other page, then a random websurfer will, indeed, teleport to a completely random page.

Unfortunately, while this modification is fine in principle, it's not so good for Pregel. When Pregel is put on a cluster, it requires a lot of network overhead to send messages to large numbers of vertices. But we'll see below that there's a way of modifying Pregel so it can deal with this.

Problems

  • Write a Pregel program to determine whether two vertices in a graph are connected.

Distributed implementation of Pregel

To implement Pregel on a cluster, we need a way of assigning vertices to different machines and threads in the cluster. This can be done using a hashing scheme, as was done in the code above to assign vertices to different worker threads. It can also be done using other approaches, if desired, such as consistent hashing.

Between supersteps, messages between vertices on different machines are passed over the network. For efficiency this is done (in part) asynchronously, during the superstep, as batches of vertices finish being updated. Provided the messages are short, and the graph is relatively sparse, as in the PageRank example, then the network overhead will be relatively small. Pregel will incur much more network overhead for highly-connected graphs, or if the messages are very large, or both.

In the case of PageRank, the network overhead can be reduced by changing the vertex-assignment procedure so that pages from the same domain are assigned to the same machine in the cluster. This reduces overhead because most links on the web are between pages from the same domain.

Notes on implementation

Combiners: In some cases, network communication can be substantially reduced by combining messages. For instance, in computing PageRank, suppose vertices v_1,v_2 and v_3 on one machine all send messages to a vertex v on another machine. It would cut down on bandwidth to simply add all those messages together before they're sent. Google's Pregel implementation allows the user to define a way to combine messages in this way. Note that in general there is no way for this combining to be done automatically - the user must specify how it is to be done.

Aggregators: These are a modification of the basic Pregel framework described above. The idea is that during each superstep each vertex can provide a value to a single central aggegrator. The aggregator combines all the values using a reducing function, and makes the result available to all vertices at the beginning of the next superstep. Aggregators could, for example, be used to check that the computation of PageRank is converging.

Exercises

  • Modify the pregel.py program so that Pregel instances can include aggregators, as described above.
  • How could an aggregator be used to fix the bug in the PageRank example above, so webpages with no outgoing links are dealt with correctly?
  • How could an aggregator be used to detect whether or not the computation of PageRank has converged, instead of doing 50 supersteps?

Fault-tolerance: For very large clusters, Pregel must be able to recover if one or more machines in the cluster die. To do this, Pregel uses a checkpointing procedure. The idea is that between supersteps Pregel occasionally saves the entire state of the graph to persistent storage. If a machine dies, Pregel returns to the most recent checkpoint, repartitions the vertices, reloads the state of the graph, and continues the computation. Note that there is a tradeoff between how frequently checkpoints are done and how often machines die. According to the paper the implementation tries to determine a checkpointing frequency that ensures minimal average cost.

Miscellanea:

  • Google implement Pregel in C++.
  • The state of the graph is held in-memory, although the authors note that buffered messages are sometimes held on local disk.
  • Everything is co-ordinated by a single master machine.
  • The authors note that they've scaled Pregel to run on clusters containing hundreds of machines, and with graphs containing billions of vertices.
  • The authors compare Pregel to other graph-based frameworks, and note that theirs seems to be the only available framework which has been tested on clusters containing hundreds of machines.
  • Pregel is based on the Bulk Synchronous Parallel model of programming developed by Les Valiant.
  • There is a potential for bottlenecks when the computation at some vertex takes far longer than at most other vertices, stopping the superstep from completing. This may require careful design and profiling to avoid.
  • The paper is a bit handwavy on why and when Pregel is superior to MapReduce, although it is quite clear that the authors believe this is often the case for graph-based algorithms.

Concluding thought: Pregel can be used to do much more than just computing PageRank. In the Pregel paper, the authors describe how Pregel can be used to solve other problems, including finding shortest paths in a graph, and finding clusters in social networks. I'll conclude by noting a fun problem that they don't address, but which I believe could be addressed using Pregel or a similar system:

Problems

  • Can you use Pregel to compute (say) book recommendations, based on a set of user-supplied rating data for books?

Follow me on Twitter; Subscribe to RSS feed

14 comments

  1. Pingback: Quora
  2. Pingback: Quora
  3. Could you use Pregel to run on a medical data-base that would alert the doctor to abnormal changes in blood/liver/enzyme counts?

    Normally this would be done by eye when the doctor compares the latest blood counts.

    1. Pregel’s big advantage is scaling well on distributed clusters of computers. At a guess, the kind of medical database you’re talking about would be a single machine database (?), so Pregel wouldn’t really be applicable. But I don’t know for sure.

  4. Well I had in mind a large hospital data-base, or one such as the large blood labratorys such as sullivan nicholades. I’m pretty sure these would be run on fairly large servers due to the large volume of daily queries.

    The reason I saw pregel was its ability to index that kind of data fast. I originally wanted to create a graph layer that would analyse individual blood counts and plot them using moving averages to determain drastic changes in blood levels before they go outside the usual ‘blood count range’

    The solution I had was to have it on the doctor side rather than, server side, but a server-side implementation might be easier to implement.

  5. If it’s volume of queries that’s the problem, then that problem should be soluble by using replication and slave databases.

    But for very large datasets, where the problem isn’t the query volume, but rather that the data won’t all fit on one machine, or even on a small number of machines (where sharding might be the answer), then maybe Pregel would be useful. Of course, it depends on the details of the application. At the moment there’s no open source version of Pregel equivalent to Hadoop (although some people are working on interesting-looking projects), so Hadoop might be a better way to go in this scenario.

  6. Ok, so it’s more fitted for web-based mining than open-source. What about using pregel to data-mine for stock tips? I know there are some financial institutions running web-based analyzers for automated trading. How about creating a database of companies from the asx and correlating positive or negative data based on variables like hits, actual stock price/volume website statitistcs and other web-mined statistics?
    Or for FOREX analysis based on economic news and each countries cross-pair?

    1. The open source issue I mentioned isn’t about what the Pregel framework is well adapted for, algorithmically. It’s whether or not there is a version of Pregel that’s publicly available for people to play with (i.e., not restricted to people who work for Google). Although there are several experimental open source implementations of Pregel, they all look to be in early days. I don’t know of any proprietary implementation, apart from Google’s, and they’re not selling it, so far as I know.

      Regarding whether Pregel can be used to do web-mining of financial data, the devil is, as always, in the details of exactly what you’re computing.

    2. It sounds like you need time series analysis tool, not really a graph tool. Something like Aster Data’s npath functionality to spot large variations.

  7. Brendan, my guess is that you need a high performance graph database (GDB) oriented to transactional read queries. This type of GDBs allow for performing this type of pattern graph queries that are placed by navigating part of the graph. Please, see http://www.sparsity-technologies.com, where Dex shows to be a GDB which has a huge capacity (in the billions of nodes, edges and attributes) and a high performance analytical capability with queries answered in less than a second. It runs in off the shelve single processor hardware.

Comments are closed.