
Pregel: A System for Large-Scale Graph Processing

by 이망고_ 2022. 2. 15.


 

ABSTRACT
Many practical computing problems concern large graphs. Standard examples include the Web graph and various social networks. The scale of these graphs—in some cases billions of vertices, trillions of edges—poses challenges to their efficient processing. In this paper we present a computational model suitable for this task. Programs are expressed as a sequence of iterations, in each of which a vertex can receive messages sent in the previous iteration, send messages to other vertices, and modify its own state and that of its outgoing edges or mutate graph topology. This vertex-centric approach is flexible enough to express a broad set of algorithms. The model has been designed for efficient, scalable, and fault-tolerant implementation on clusters of thousands of commodity computers, and its implied synchronicity makes reasoning about programs easier. Distribution-related details are hidden behind an abstract API. The result is a framework for processing large graphs that is expressive and easy to program.

 

 

1. INTRODUCTION

The Internet made the Web graph a popular object of
analysis and research. Web 2.0 fueled interest in social networks. Other large graphs—for example, induced by transportation routes, the similarity of newspaper articles, paths of disease outbreaks, or citation relationships among published scientific work—have been processed for decades. Frequently applied algorithms include shortest paths computations, different flavors of clustering, and variations on the page rank theme. There are many other graph computing problems of practical value, e.g., minimum cut and connected components.

Efficient processing of large graphs is challenging. Graph algorithms often exhibit poor locality of memory access, very little work per vertex, and a changing degree of parallelism over the course of execution [31, 39]. Distribution over many machines exacerbates the locality issue and increases the probability that a machine will fail during computation.

Despite the ubiquity of large graphs and their commercial importance, we know of no scalable general-purpose system for implementing arbitrary graph algorithms over arbitrary graph representations in a large-scale distributed environment. Implementing an algorithm to process a large graph typically means choosing among the following options:
1. Crafting a custom distributed infrastructure, typically requiring a substantial implementation effort that must be repeated for each new algorithm or graph representation.
2. Relying on an existing distributed computing platform, often ill-suited for graph processing. MapReduce [14], for example, is a very good fit for a wide array of large-scale computing problems. It is sometimes used to mine large graphs [11, 30], but this can lead to sub-optimal performance and usability issues. The basic models for processing data have been extended to facilitate aggregation [41] and SQL-like queries [40, 47], but these extensions are usually not ideal for graph algorithms that often better fit a message-passing model.
3. Using a single-computer graph algorithm library, such as BGL [43], LEDA [35], NetworkX [25], JDSL [20], Stanford GraphBase [29], or FGL [16], limiting the scale of problems that can be addressed.
4. Using an existing parallel graph system. The Parallel BGL [22] and CGMgraph [8] libraries address parallel graph algorithms, but do not address fault tolerance or other issues that are important for very large-scale distributed systems.
None of these alternatives fit our purposes. To address distributed processing of large-scale graphs, we built a scalable and fault-tolerant platform with an API that is sufficiently flexible to express arbitrary graph algorithms. This paper describes the resulting system, called Pregel, and reports our experience with it.

The high-level organization of Pregel programs is inspired
by Valiant’s Bulk Synchronous Parallel model [45]. Pregel computations consist of a sequence of iterations, called supersteps. During a superstep the framework invokes a user-defined function for each vertex, conceptually in parallel. The function specifies behavior at a single vertex V and a single superstep S. It can read messages sent to V in superstep S − 1, send messages to other vertices that will be received at superstep S + 1, and modify the state of V and its outgoing edges. Messages are typically sent along outgoing edges, but a message may be sent to any vertex whose identifier is known. The vertex-centric approach is reminiscent of MapReduce
in that users focus on a local action, processing each item independently, and the system composes these actions to lift computation to a large dataset. By design the model is well suited for distributed implementations: it doesn’t expose any mechanism for detecting order of execution within a superstep, and all communication is from superstep S to superstep S + 1. The synchronicity of this model makes it easier to reason
about program semantics when implementing algorithms, and ensures that Pregel programs are inherently free of deadlocks and data races common in asynchronous systems. In principle, the performance of Pregel programs should be competitive with that of asynchronous systems given enough parallel slack [28, 34]. Because typical graph computations have many more vertices than machines, one should be able to balance the machine loads so that the synchronization between supersteps does not add excessive latency.

The rest of the paper is structured as follows. Section 2
describes the model. Section 3 describes its expression as a C++ API. Section 4 discusses implementation issues, including performance and fault tolerance. In Section 5 we present several applications of this model to graph algorithm problems, and in Section 6 we present performance results. Finally, we discuss related work and future directions.

 

2. MODEL OF COMPUTATION

The input to a Pregel computation is a directed graph in which each vertex is uniquely identified by a string vertex identifier. Each vertex is associated with a modifiable, user-defined value. The directed edges are associated with their source vertices, and each edge consists of a modifiable, user-defined value and a target vertex identifier. A typical Pregel computation consists of input, when the
graph is initialized, followed by a sequence of supersteps separated by global synchronization points until the algorithm terminates, and finishing with output. Within each superstep the vertices compute in parallel,
each executing the same user-defined function that expresses the logic of a given algorithm. A vertex can modify its state or that of its outgoing edges, receive messages sent to it in the previous superstep, send messages to other vertices (to be received in the next superstep), or even mutate topology of the graph. Edges are not first-class citizens in this model, having no associated computation. 

Algorithm termination is based on every vertex voting to halt. In superstep 0, every vertex is in the active state; all active vertices participate in the computation of any given superstep. A vertex deactivates itself by voting to halt. This means that the vertex has no further work to do unless triggered externally, and the Pregel framework will not execute that vertex in subsequent supersteps unless it receives a message. If reactivated by a message, a vertex must explicitly deactivate itself again. The algorithm as a whole terminates when all vertices are simultaneously inactive and there are no messages in transit. This simple state machine is illustrated in Figure 1.

The output of a Pregel program is the set of values explicitly output by the vertices. It is often a directed graph isomorphic to the input, but this is not a necessary property of the system because vertices and edges can be added and removed during computation. A clustering algorithm, for example, might generate a small set of disconnected vertices selected from a large graph. A graph mining algorithm might simply output aggregated statistics mined from the graph.
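To make this state machine concrete, here is a small sketch of the framework-side superstep loop. It is a single-process toy, not Pregel's distributed implementation, and every name in it is invented for illustration; it only demonstrates that messages sent in superstep S are delivered in superstep S + 1, that a halted vertex is skipped, and that an incoming message reactivates it.

// Toy superstep driver (illustration only; not Pregel's implementation).
#include <functional>
#include <string>
#include <unordered_map>
#include <vector>

struct VertexState {
  double value = 0;
  bool active = true;  // a vertex votes to halt by clearing this flag
};

// Outgoing messages, keyed by destination vertex identifier.
using Messages = std::unordered_map<std::string, std::vector<double>>;

using ComputeFn = std::function<void(const std::string& id, VertexState&,
                                     const std::vector<double>& inbox,
                                     Messages& outbox)>;

void RunSupersteps(std::unordered_map<std::string, VertexState>& vertices,
                   const ComputeFn& compute) {
  Messages inbox;  // messages sent in the previous superstep
  bool any_active = true;
  // Terminate when every vertex is inactive and no messages are in transit.
  while (any_active || !inbox.empty()) {
    Messages outbox;
    any_active = false;
    for (auto& [id, state] : vertices) {
      auto it = inbox.find(id);
      if (it != inbox.end()) state.active = true;  // a message reactivates
      if (!state.active) continue;
      static const std::vector<double> kEmpty;
      compute(id, state, it != inbox.end() ? it->second : kEmpty, outbox);
      any_active = any_active || state.active;
    }
    inbox = std::move(outbox);  // the global synchronization point
  }
}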
Figure 2 illustrates these concepts using a simple example: given a strongly connected graph where each vertex contains a value, it propagates the largest value to every vertex. In each superstep, any vertex that has learned a larger value from its messages sends it to all its neighbors. When no further vertices change in a superstep, the algorithm terminates.
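Expressed against the C++ API introduced in Section 3, the Figure 2 computation might look like the sketch below. The method names follow the paper's Vertex interface (Figure 3), but since the figure is not reproduced in this post, treat this as a reconstruction rather than verbatim code.

#include <algorithm>

// Sketch of the Figure 2 maximum-value computation (approximate).
class MaxValueVertex : public Vertex<int, void, int> {
 public:
  virtual void Compute(MessageIterator* msgs) {
    int max_seen = GetValue();
    for (; !msgs->Done(); msgs->Next())
      max_seen = std::max(max_seen, msgs->Value());
    if (superstep() == 0 || max_seen > GetValue()) {
      // Learned a larger value (or starting up): record and propagate it.
      *MutableValue() = max_seen;
      SendMessageToAllNeighbors(max_seen);
    } else {
      VoteToHalt();  // no change this superstep; sleep until a message arrives
    }
  }
};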
We chose a pure message-passing model, omitting remote reads and other ways of emulating shared memory, for two reasons. First, message passing is sufficiently expressive that there is no need for remote reads. We have not found any graph algorithms for which message passing is insufficient. Second, this choice is better for performance. In a cluster environment, reading a value from a remote machine incurs high latency that can't easily be hidden. Our message-passing model allows us to amortize latency by delivering messages asynchronously in batches.

 Graph algorithms can be written as a series of chained
MapReduce invocations [11, 30]. We chose a different model for reasons of usability and performance. Pregel keeps vertices and edges on the machine that performs computation and uses network transfers only for messages. MapReduce, however, is essentially functional, so expressing a graph algorithm as a chained MapReduce requires passing the entire state of the graph from one stage to the next—in general, requiring much more communication and associated serialization overhead. In addition, the need to coordinate the steps of a chained MapReduce adds programming complexity that is avoided by Pregel’s iteration over supersteps.

3. THE C++ API 

 This section discusses the most important aspects of Pregel’s C++ API, omitting relatively mechanical issues. Writing a Pregel program involves subclassing the predefined Vertex class (see Figure 3). Its template arguments define three value types, associated with vertices, edges, and messages. Each vertex has an associated value of the specified type. This uniformity may seem restrictive, but users can manage it by using flexible types like protocol buffers [42]. The edge and message types behave similarly. The user overrides the virtual Compute() method, which
will be executed at each active vertex in every superstep. Predefined Vertex methods allow Compute() to query information about the current vertex and its edges, and to send messages to other vertices. Compute() can inspect the value associated with its vertex via GetValue() or modify it via MutableValue(). It can inspect and modify the values of out-edges using methods supplied by the out-edge iterator. These state updates are visible immediately. Since their visibility is confined to the modified vertex, there are no data races on concurrent value access from different vertices. The values associated with the vertex and its edges are the
only per-vertex state that persists across supersteps. Limiting the graph state managed by the framework to a single value per vertex or edge simplifies the main computation cycle, graph distribution, and failure recovery.
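The post references Figure 3 but does not include it, so here is a reconstruction of the Vertex base class declaration from the paper; the exact signatures should be treated as approximate.

// Reconstruction of the Vertex base class from Figure 3 (approximate).
template <typename VertexValue, typename EdgeValue, typename MessageValue>
class Vertex {
 public:
  // User-defined logic, executed at each active vertex in every superstep.
  virtual void Compute(MessageIterator* msgs) = 0;

  const string& vertex_id() const;
  int64 superstep() const;

  const VertexValue& GetValue();
  VertexValue* MutableValue();
  OutEdgeIterator GetOutEdgeIterator();

  void SendMessageTo(const string& dest_vertex,
                     const MessageValue& message);
  void VoteToHalt();
};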
3.1 Message Passing 

 Vertices communicate directly with one another by sending messages, each of which consists of a message value and the name of the destination vertex. The type of message value is specified by the user as a template parameter of the Vertex class. A vertex can send any number of messages in a superstep.
All messages sent to vertex V in superstep S are available, via an iterator, when V’s Compute() method is called in superstep S + 1. There is no guaranteed order of messages in the iterator, but it is guaranteed that messages will be delivered and that they will not be duplicated. A common usage pattern is for a vertex V to iterate over
its outgoing edges, sending a message to the destination vertex of each edge, as shown in the PageRank algorithm in Figure 4 (Section 5.1 below). However, dest_vertex need not be a neighbor of V. A vertex could learn the identifier of a non-neighbor from a message received earlier, or vertex identifiers could be known implicitly. For example, the graph could be a clique, with well-known vertex identifiers V1 through Vn, in which case there may be no need to even keep explicit edges in the graph. When the destination vertex of any message does not exist, we execute user-defined handlers. A handler could, for example, create the missing vertex or remove the dangling edge from its source vertex.
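Figure 4 itself is not reproduced in this post; the sketch below reconstructs the paper's PageRank vertex from the original, so details may be approximate.

// Reconstruction of the PageRank vertex from Figure 4 (approximate).
class PageRankVertex : public Vertex<double, void, double> {
 public:
  virtual void Compute(MessageIterator* msgs) {
    if (superstep() >= 1) {
      double sum = 0;
      for (; !msgs->Done(); msgs->Next())
        sum += msgs->Value();
      *MutableValue() = 0.15 / NumVertices() + 0.85 * sum;
    }
    if (superstep() < 30) {
      const int64 n = GetOutEdgeIterator().size();
      // Distribute this vertex's rank evenly along its out-edges.
      SendMessageToAllNeighbors(GetValue() / n);
    } else {
      VoteToHalt();  // stop after a fixed number of iterations
    }
  }
};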

 

3.2 Combiners 

Sending a message, especially to a vertex on another machine, incurs some overhead. This can be reduced in some cases with help from the user. For example, suppose that Compute() receives integer messages and that only the sum matters, as opposed to the individual values. In that case, the system can combine several messages intended for a vertex V into a single message containing their sum, reducing the number of messages that must be transmitted and buffered. Combiners are not enabled by default, because there is
no mechanical way to find a useful combining function that is consistent with the semantics of the user’s Compute() method. To enable this optimization the user subclasses the Combiner class, overriding a virtual Combine() method. There are no guarantees about which (if any) messages are combined, the groupings presented to the combiner, or the order of combining, so combiners should only be enabled for commutative and associative operations. For some algorithms, such as single-source shortest paths
(Section 5.2), we have observed more than a fourfold reduction in message traffic by using combiners.
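For the integer-sum example described above, a combiner might look like the sketch below. It is modeled on the shape of the shortest-paths combiner the paper shows for Section 5.2, so the exact Combine() signature and the Output() call are reconstructions.

// Sum combiner for the integer-message example (approximate signatures).
class IntSumCombiner : public Combiner<int> {
 public:
  virtual void Combine(MessageIterator* msgs) {
    int sum = 0;
    for (; !msgs->Done(); msgs->Next())
      sum += msgs->Value();  // safe: addition is commutative and associative
    Output("combined_sum", sum);
  }
};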

 

3.3 Aggregators 

Pregel aggregators are a mechanism for global communication, monitoring, and data. Each vertex can provide a value to an aggregator in superstep S, the system combines those values using a reduction operator, and the resulting value is made available to all vertices in superstep S + 1. Pregel includes a number of predefined aggregators, such as min, max, or sum operations on various integer or string types.

Aggregators can be used for statistics. For instance, a sum aggregator applied to the out-degree of each vertex yields the total number of edges in the graph. More complex reduction operators can generate histograms of a statistic.

Aggregators can also be used for global coordination. For
instance, one branch of Compute() can be executed for the supersteps until an and aggregator determines that all vertices satisfy some condition, and then another branch can be executed until termination. A min or max aggregator, applied to the vertex ID, can be used to select a vertex to play a distinguished role in an algorithm.

To define a new aggregator, a user subclasses the predefined Aggregator class, and specifies how the aggregated value is initialized from the first input value and how multiple partially aggregated values are reduced to one. Aggregation operators should be commutative and associative.

By default an aggregator only reduces input values from a single superstep, but it is also possible to define a sticky aggregator that uses input values from all supersteps. This is useful, for example, for maintaining a global edge count that is adjusted only when edges are added or removed. More advanced uses are possible. For example, an aggregator can be used to implement a distributed priority queue for the ∆-stepping shortest paths algorithm [37]. Each vertex is assigned to a priority bucket based on its tentative distance. In one superstep, the vertices contribute their indices to a min aggregator. The minimum is broadcast to all workers in the next superstep, and the vertices in the lowest-index bucket relax edges.
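A sketch of a user-defined aggregator for the edge-count example above follows. The paper specifies only the contract (initialize from the first input value, reduce partial aggregates to one); the method names Init(), Reduce(), and GetValue() below are hypothetical.

// Hypothetical sum aggregator; the Aggregator interface is not spelled
// out in the paper, so these method names are illustrative only.
class EdgeCountAggregator : public Aggregator<int64> {
 public:
  // Initialize the running total from the first input value.
  virtual void Init(const int64& first) { total_ = first; }
  // Fold another partially aggregated value into the running total.
  virtual void Reduce(const int64& partial) { total_ += partial; }
  // Value made available to every vertex in superstep S + 1.
  virtual const int64& GetValue() const { return total_; }

 private:
  int64 total_ = 0;
};

Each vertex would contribute its out-degree in Compute(); the aggregated sum visible in the next superstep is then the total number of edges in the graph.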

 

3.4 Topology Mutations 

 Some graph algorithms need to change the graph’s topology. A clustering algorithm, for example, might replace each cluster with a single vertex, and a minimum spanning tree algorithm might remove all but the tree edges. Just as a user’s Compute() function can send messages, it can also issue requests to add or remove vertices or edges. Multiple vertices may issue conflicting requests in the same
superstep (e.g., two requests to add a vertex V, with different initial values). We use two mechanisms to achieve determinism: partial ordering and handlers.

As with messages, mutations become effective in the superstep after the requests were issued. Within that superstep removals are performed first, with edge removal before vertex removal, since removing a vertex implicitly removes all of its out-edges. Additions follow removals, with vertex addition before edge addition, and all mutations precede calls to Compute(). This partial ordering yields deterministic results for most conflicts.

The remaining conflicts are resolved by user-defined handlers. If there are multiple requests to create the same vertex in the same superstep, then by default the system just picks one arbitrarily, but users with special needs may specify a better conflict resolution policy by defining an appropriate handler method in their Vertex subclass. The same handler mechanism is used to resolve conflicts caused by multiple vertex removal requests, or by multiple edge addition or removal requests. We delegate the resolution to handlers to keep the code of Compute() simple, which limits the interaction between a handler and Compute(), but has not been an issue in practice.

Our coordination mechanism is lazy: global mutations do
not require coordination until the point when they are applied. This design choice facilitates stream processing. The
intuition is that conflicts involving modification of a vertex V are handled by V itself. Pregel also supports purely local mutations, i.e., a vertex
adding or removing its own outgoing edges or removing itself. Local mutations cannot introduce conflicts and making them immediately effective simplifies distributed programming by using easier sequential programming semantics.
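As a concrete illustration of the clustering example, a vertex might request its own removal once a cluster representative has been chosen. The paper does not name the mutation methods, so RemoveVertexRequest() below is hypothetical; only the timing rules (requests issued in superstep S take effect in S + 1, removals first) come from the text.

// Hypothetical topology-mutation sketch; the request method name is
// invented, since the paper does not specify it.
struct ClusterState {
  bool in_cluster = false;         // member of some cluster
  bool is_representative = false;  // chosen to stand in for the cluster
};

class ClusterCollapseVertex : public Vertex<ClusterState, void, int> {
 public:
  virtual void Compute(MessageIterator* msgs) {
    if (GetValue().in_cluster && !GetValue().is_representative) {
      // The request takes effect at the start of the next superstep;
      // removing this vertex implicitly removes its out-edges.
      RemoveVertexRequest(vertex_id());
    }
    VoteToHalt();
  }
};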

 

3.5 Input and output 

There are many possible file formats for graphs, such as
a text file, a set of vertices in a relational database, or rows in Bigtable [9]. To avoid imposing a specific choice of file format, Pregel decouples the task of interpreting an input file as a graph from the task of graph computation. Similarly, output can be generated in an arbitrary format and stored in the form most suitable for a given application. The Pregel library provides readers and writers for many common file formats, but users with unusual needs can write their own by subclassing the abstract base classes Reader and Writer.
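The paper names only the abstract base classes Reader and Writer, so the sketch below of a custom reader for a "src dst value" text edge list is hypothetical throughout: VertexData, the NextVertex() method, and the constructor are illustrative stand-ins for whatever the real interface exposes.

#include <istream>
#include <sstream>
#include <string>

// Hypothetical record produced by the reader: one vertex plus one out-edge.
struct VertexData {
  std::string id;
  std::string edge_target;
  double edge_value = 0;
};

class EdgeListReader : public Reader {
 public:
  explicit EdgeListReader(std::istream& in) : input_(in) {}

  // Parse one line of the input into a vertex and one outgoing edge.
  bool NextVertex(VertexData* out) {
    std::string line;
    if (!std::getline(input_, line)) return false;
    std::istringstream iss(line);
    iss >> out->id >> out->edge_target >> out->edge_value;
    return true;
  }

 private:
  std::istream& input_;
};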