Caches the vertices and edges associated with this graph at the previously-specified target
storage levels, which default to MEMORY_ONLY
.
Caches the vertices and edges associated with this graph at the previously-specified target
storage levels, which default to MEMORY_ONLY
. This is used to pin a graph in memory enabling
multiple queries to reuse the same construction process.
Mark this Graph for checkpointing.
Mark this Graph for checkpointing. It will be saved to a file inside the checkpoint directory set with SparkContext.setCheckpointDir() and all references to its parent RDDs will be removed. It is strongly recommended that this Graph is persisted in memory, otherwise saving it on a file will require recomputation.
An RDD containing the edges and their associated attributes.
An RDD containing the edges and their associated attributes. The entries in the RDD contain just the source id and target id along with the edge data.
an RDD containing the edges in this graph
Merges multiple edges between two vertices into a single edge.
Merges multiple edges between two vertices into a single edge. For correct results, the graph must have been partitioned using partitionBy.
the user-supplied commutative associative function to merge edge attributes for duplicate edges.
The resulting graph with a single edge for each (source, dest) vertex pair.
Transforms each edge attribute using the map function, passing it a whole partition at a time.
Transforms each edge attribute using the map function, passing it a whole partition at a
time. The map function is given an iterator over edges within a logical partition as well as
the partition's ID, and it should return a new iterator over the new values of each edge. The
new iterator's elements must correspond one-to-one with the old iterator's elements. If
adjacent vertex values are desired, use mapTriplets
.
the new edge data type
a function that takes a partition id and an iterator over all the edges in the partition, and must return an iterator over the new values for each edge in the order of the input iterator
This does not change the structure of the graph or modify the values of this graph. As a consequence the underlying index structures can be reused.
Transforms each edge attribute a partition at a time using the map function, passing it the adjacent vertex attributes as well.
Transforms each edge attribute a partition at a time using the map function, passing it the
adjacent vertex attributes as well. The map function is given an iterator over edge triplets
within a logical partition and should yield a new iterator over the new values of each edge in
the order in which they are provided. If adjacent vertex values are not required, consider
using mapEdges
instead.
the new edge data type
the iterator transform
which fields should be included in the edge triplet passed to the map function. If not all fields are needed, specifying this can improve performance.
This does not change the structure of the graph or modify the values of this graph. As a consequence the underlying index structures can be reused.
Transforms each vertex attribute in the graph using the map function.
Transforms each vertex attribute in the graph using the map function.
the new vertex data type
the function from a vertex object to a new vertex value
We might use this operation to change the vertex values from one type to another to initialize an algorithm.
val rawGraph: Graph[(), ()] = Graph.textFile("hdfs://file") val root = 42 var bfsGraph = rawGraph.mapVertices[Int]((vid, data) => if (vid == root) 0 else Math.MaxValue)
The new graph has the same structure. As a consequence the underlying index structures can be reused.
Restricts the graph to only the vertices and edges that are also in other
, but keeps the
attributes from this graph.
Restricts the graph to only the vertices and edges that are also in other
, but keeps the
attributes from this graph.
the graph to project this graph onto
a graph with vertices and edges that exist in both the current graph and other
,
with vertex and edge data from the current graph
Joins the vertices with entries in the table
RDD and merges the results using mapFunc
.
Joins the vertices with entries in the table
RDD and merges the results using mapFunc
.
The input table should contain at most one entry for each vertex. If no entry in other
is
provided for a particular vertex in the graph, the map function receives None
.
the type of entry in the table of updates
the new vertex value type
the table to join with the vertices in the graph. The table should contain at most one entry for each vertex.
the function used to compute the new vertex values. The map function is invoked for all vertices, even those that do not have a corresponding entry in the table.
This function is used to update the vertices with new values based on external data. For example we could add the out-degree to each vertex record:
val rawGraph: Graph[_, _] = Graph.textFile("webgraph") val outDeg: RDD[(VertexId, Int)] = rawGraph.outDegrees val graph = rawGraph.outerJoinVertices(outDeg) { (vid, data, optDeg) => optDeg.getOrElse(0) }
Repartitions the edges in the graph according to partitionStrategy
.
Repartitions the edges in the graph according to partitionStrategy
.
the partitioning strategy to use when partitioning the edges in the graph.
the number of edge partitions in the new graph.
Repartitions the edges in the graph according to partitionStrategy
.
Repartitions the edges in the graph according to partitionStrategy
.
the partitioning strategy to use when partitioning the edges in the graph.
Caches the vertices and edges associated with this graph at the specified storage level, ignoring any target storage levels previously set.
Caches the vertices and edges associated with this graph at the specified storage level, ignoring any target storage levels previously set.
the level at which to cache the graph.
A reference to this graph for convenience.
Reverses all edges in the graph.
Reverses all edges in the graph. If this graph contains an edge from a to b then the returned graph contains an edge from b to a.
Restricts the graph to only the vertices and edges satisfying the predicates.
Restricts the graph to only the vertices and edges satisfying the predicates. The resulting subgraph satisifies
V' = {v : for all v in V where vpred(v)} E' = {(u,v): for all (u,v) in E where epred((u,v)) && vpred(u) && vpred(v)}
the edge predicate, which takes a triplet and evaluates to true if the edge is to remain in the subgraph. Note that only edges where both vertices satisfy the vertex predicate are considered.
the vertex predicate, which takes a vertex object and evaluates to true if the vertex is to be included in the subgraph
the subgraph containing only the vertices and edges that satisfy the predicates
An RDD containing the edge triplets, which are edges along with the vertex data associated with the adjacent vertices.
An RDD containing the edge triplets, which are edges along with the vertex data associated with the adjacent vertices. The caller should use edges if the vertex data are not needed, i.e. if only the edge data and adjacent vertex ids are needed.
an RDD containing edge triplets
This operation might be used to evaluate a graph coloring where we would like to check that both vertices are a different color.
type Color = Int val graph: Graph[Color, Int] = GraphLoader.edgeListFile("hdfs://file.tsv") val numInvalid = graph.triplets.map(e => if (e.src.data == e.dst.data) 1 else 0).sum
Uncaches both vertices and edges of this graph.
Uncaches both vertices and edges of this graph. This is useful in iterative algorithms that build a new graph in each iteration.
Uncaches only the vertices of this graph, leaving the edges alone.
Uncaches only the vertices of this graph, leaving the edges alone. This is useful in iterative algorithms that modify the vertex attributes but reuse the edges. This method can be used to uncache the vertex attributes of previous iterations once they are no longer needed, improving GC performance.
An RDD containing the vertices and their associated attributes.
An RDD containing the vertices and their associated attributes.
an RDD containing the vertices in this graph
vertex ids are unique.
Aggregates values from the neighboring edges and vertices of each vertex.
Aggregates values from the neighboring edges and vertices of each vertex. The user supplied
mapFunc
function is invoked on each edge of the graph, generating 0 or more "messages" to be
"sent" to either vertex in the edge. The reduceFunc
is then used to combine the output of
the map phase destined to each vertex.
This function is deprecated in 1.2.0 because of SPARK-3936. Use aggregateMessages instead.
the type of "message" to be sent to each vertex
the user defined map function which returns 0 or more messages to neighboring vertices
the user defined reduce function which should be commutative and associative and is used to combine the output of the map phase
an efficient way to run the aggregation on a subset of the edges if
desired. This is done by specifying a set of "active" vertices and an edge direction. The
sendMsg
function will then run only on edges connected to active vertices by edges in the
specified direction. If the direction is In
, sendMsg
will only be run on edges with
destination in the active set. If the direction is Out
, sendMsg
will only be run on edges
originating from vertices in the active set. If the direction is Either
, sendMsg
will be
run on edges with *either* vertex in the active set. If the direction is Both
, sendMsg
will be run on edges with *both* vertices in the active set. The active set must have the
same index as the graph's vertices.
(Since version 1.2.0) use aggregateMessages
We can use this function to compute the in-degree of each vertex
val rawGraph: Graph[(),()] = Graph.textFile("twittergraph") val inDeg: RDD[(VertexId, Int)] = mapReduceTriplets[Int](et => Iterator((et.dst.id, 1)), _ + _)
By expressing computation at the edge level we achieve maximum parallelism. This is one of the core functions in the Graph API in that enables neighborhood level computation. For example this function can be used to count neighbors satisfying a predicate or implement PageRank.
Aggregates values from the neighboring edges and vertices of each vertex.
Aggregates values from the neighboring edges and vertices of each vertex. The user-supplied
sendMsg
function is invoked on each edge of the graph, generating 0 or more messages to be
sent to either vertex in the edge. The mergeMsg
function is then used to combine all messages
destined to the same vertex.
the type of message to be sent to each vertex
runs on each edge, sending messages to neighboring vertices using the EdgeContext.
used to combine messages from sendMsg
destined to the same vertex. This
combiner should be commutative and associative.
which fields should be included in the EdgeContext passed to the
sendMsg
function. If not all fields are needed, specifying this can improve performance.
We can use this function to compute the in-degree of each vertex
val rawGraph: Graph[_, _] = Graph.textFile("twittergraph") val inDeg: RDD[(VertexId, Int)] = aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _)
By expressing computation at the edge level we achieve maximum parallelism. This is one of the core functions in the Graph API in that enables neighborhood level computation. For example this function can be used to count neighbors satisfying a predicate or implement PageRank.
Transforms each edge attribute in the graph using the map function.
Transforms each edge attribute in the graph using the map function. The map function is not
passed the vertex value for the vertices adjacent to the edge. If vertex values are desired,
use mapTriplets
.
the new edge data type
the function from an edge object to a new edge value.
This function might be used to initialize edge attributes.
This graph is not changed and that the new graph has the same structure. As a consequence the underlying index structures can be reused.
Transforms each edge attribute using the map function, passing it the adjacent vertex attributes as well.
Transforms each edge attribute using the map function, passing it the adjacent vertex
attributes as well. If adjacent vertex values are not required,
consider using mapEdges
instead.
the new edge data type
the function from an edge object to a new edge value.
which fields should be included in the edge triplet passed to the map function. If not all fields are needed, specifying this can improve performance.
This function might be used to initialize edge attributes based on the attributes associated with each vertex.
val rawGraph: Graph[Int, Int] = someLoadFunction() val graph = rawGraph.mapTriplets[Int]( edge => edge.src.data - edge.dst.data)
This does not change the structure of the graph or modify the values of this graph. As a consequence the underlying index structures can be reused.
Transforms each edge attribute using the map function, passing it the adjacent vertex attributes as well.
Transforms each edge attribute using the map function, passing it the adjacent vertex
attributes as well. If adjacent vertex values are not required,
consider using mapEdges
instead.
the new edge data type
the function from an edge object to a new edge value.
This function might be used to initialize edge attributes based on the attributes associated with each vertex.
val rawGraph: Graph[Int, Int] = someLoadFunction() val graph = rawGraph.mapTriplets[Int]( edge => edge.src.data - edge.dst.data)
This does not change the structure of the graph or modify the values of this graph. As a consequence the underlying index structures can be reused.
The associated GraphOps object.
The Graph abstractly represents a graph with arbitrary objects associated with vertices and edges. The graph provides basic operations to access and manipulate the data associated with vertices and edges as well as the underlying structure. Like Spark RDDs, the graph is a functional data-structure in which mutating operations return new graphs.
the vertex attribute type
the edge attribute type
GraphOps contains additional convenience operations and graph algorithms.