Graph
!pip install pyspark
Collecting pyspark
  Downloading pyspark-3.0.1.tar.gz (204.2MB)
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1
!pip install graphframes
Collecting graphframes
  Downloading graphframes-0.6-py2.py3-none-any.whl
Requirement already satisfied: numpy in /usr/local/lib/python3.6/dist-packages (from graphframes) (1.18.5)
Collecting nose
  Downloading nose-1.3.7-py3-none-any.whl (154kB)
Installing collected packages: nose, graphframes
Successfully installed graphframes-0.6 nose-1.3.7
!wget https://dl.bintray.com/spark-packages/maven/graphframes/graphframes/0.8.0-spark3.0-s_2.12/graphframes-0.8.0-spark3.0-s_2.12.jar
--2020-12-02 15:45:29--  https://dl.bintray.com/spark-packages/maven/graphframes/graphframes/0.8.0-spark3.0-s_2.12/graphframes-0.8.0-spark3.0-s_2.12.jar
Resolving dl.bintray.com (dl.bintray.com)... 3.122.43.129, 35.157.127.85
Connecting to dl.bintray.com (dl.bintray.com)|3.122.43.129|:443... connected.
HTTP request sent, awaiting response... 302 (redirected to CloudFront)
HTTP request sent, awaiting response... 200 OK
Length: 243265 (238K)
Saving to: ‘graphframes-0.8.0-spark3.0-s_2.12.jar’
2020-12-02 15:45:31 (563 KB/s) - ‘graphframes-0.8.0-spark3.0-s_2.12.jar’ saved [243265/243265]
import os
# Tell spark-submit to pull in the graphframes package; the version here
# matches the jar downloaded above
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages graphframes:graphframes:0.8.0-spark3.0-s_2.12 pyspark-shell'
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import Row
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
# Ship the jar to the workers before importing the Python wrapper
sc.addPyFile("https://dl.bintray.com/spark-packages/maven/graphframes/graphframes/0.8.0-spark3.0-s_2.12/graphframes-0.8.0-spark3.0-s_2.12.jar")
from graphframes import *
# Vertices DataFrame
v = spark.createDataFrame([
("a", "Alice", 34),
("b", "Bob", 36),
("c", "Charlie", 37),
("d", "David", 29),
("e", "Esther", 32),
("f", "Fanny", 38),
("g", "Gabby", 60)
], ["id", "name", "age"])
# Edges DataFrame
e = spark.createDataFrame([
("a", "b", "friend"),
("b", "c", "follow"),
("c", "b", "follow"),
("f", "c", "follow"),
("e", "f", "follow"),
("e", "d", "friend"),
("d", "a", "friend"),
("a", "e", "friend"),
("g", "e", "follow")
], ["src", "dst", "relationship"])
# Create a GraphFrame
g = GraphFrame(v, e)
g.vertices.show()
g.edges.show()
# If the graphframes jar is not on the JVM classpath (e.g. PYSPARK_SUBMIT_ARGS
# was set after the SparkContext had already been created), constructing a
# GraphFrame fails with:
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-14-eccf3cb921e3> in <module>()
---> 26 g = GraphFrame(v, e)
Py4JJavaError: An error occurred while calling o148.loadClass.
: java.lang.ClassNotFoundException: org.graphframes.GraphFramePythonAPI
# Fix: run the setup cells above first, then restart the runtime and re-run.
# g.vertices and g.edges are just DataFrames
# You can use any DataFrame API on them
g.edges.filter("src = 'a'").show()
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
| a| b| friend|
| a| e| friend|
+---+---+------------+
g.edges.filter("src = 'a'").count()
# Count the number of followers of c.
# This queries the edge DataFrame.
print(g.edges.filter("relationship = 'follow' and dst = 'c'").count())
2
# A GraphFrame has additional attributes
g.outDegrees.show()
+---+---------+
| id|outDegree|
+---+---------+
| g| 1|
| f| 1|
| e| 2|
| d| 1|
| c| 1|
| b| 1|
| a| 2|
+---+---------+
g.inDegrees.show()
+---+--------+
| id|inDegree|
+---+--------+
| f| 1|
| e| 2|
| d| 1|
| c| 2|
| b| 2|
| a| 1|
+---+--------+
g.inDegrees.explain()
== Physical Plan ==
*(2) HashAggregate(keys=[dst#45], functions=[count(1)])
+- Exchange hashpartitioning(dst#45, 200), true, [id=#171]
+- *(1) HashAggregate(keys=[dst#45], functions=[partial_count(1)])
+- *(1) Project [dst#45]
+- *(1) Scan ExistingRDD[src#44,dst#45,relationship#46]
myInDegrees = g.edges.groupBy('dst').count()\
.withColumnRenamed('dst', 'id').withColumnRenamed('count', 'inDegree')
myInDegrees.show()
+---+--------+
| id|inDegree|
+---+--------+
| f| 1|
| e| 2|
| d| 1|
| c| 2|
| b| 2|
| a| 1|
+---+--------+
# Note: the physical plan is identical to that of g.inDegrees
myInDegrees.explain()
== Physical Plan ==
*(2) HashAggregate(keys=[dst#45], functions=[count(1)])
+- Exchange hashpartitioning(dst#45, 200), true, [id=#218]
+- *(1) HashAggregate(keys=[dst#45], functions=[partial_count(1)])
+- *(1) Project [dst#45]
+- *(1) Scan ExistingRDD[src#44,dst#45,relationship#46]
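Besides inDegrees and outDegrees, a GraphFrame also exposes a degrees attribute; a minimal sketch:
# Total degree (in + out) per vertex; vertices with no edges do not appear
g.degrees.show()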
print(g.inDegrees)
DataFrame[id: string, inDegree: int]
# Cache the graph (persists both the vertex and the edge DataFrames)
g.cache()
GraphFrame(v:[id: string, name: string ... 1 more field], e:[src: string, dst: string ... 1 more field])
print(g.vertices.storageLevel)
print(g.edges.storageLevel)
Disk Memory Deserialized 1x Replicated
Disk Memory Deserialized 1x Replicated
# A triplet view of the graph
g.triplets.show()
+----------------+--------------+----------------+
| src| edge| dst|
+----------------+--------------+----------------+
| [e, Esther, 32]|[e, f, follow]| [f, Fanny, 38]|
| [g, Gabby, 60]|[g, e, follow]| [e, Esther, 32]|
| [a, Alice, 34]|[a, e, friend]| [e, Esther, 32]|
| [e, Esther, 32]|[e, d, friend]| [d, David, 29]|
| [f, Fanny, 38]|[f, c, follow]|[c, Charlie, 37]|
| [b, Bob, 36]|[b, c, follow]|[c, Charlie, 37]|
|[c, Charlie, 37]|[c, b, follow]| [b, Bob, 36]|
| [a, Alice, 34]|[a, b, friend]| [b, Bob, 36]|
| [d, David, 29]|[d, a, friend]| [a, Alice, 34]|
+----------------+--------------+----------------+
g.triplets.explain()
== Physical Plan ==
*(9) Project [src#217, edge#215, dst#219]
+- *(9) SortMergeJoin [edge#215.dst], [dst#219.id], Inner
:- *(6) Sort [edge#215.dst ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(edge#215.dst, 200), true, [id=#312]
: +- *(5) SortMergeJoin [edge#215.src], [src#217.id], Inner
: :- *(2) Sort [edge#215.src ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(edge#215.src, 200), true, [id=#297]
: : +- *(1) Project [struct(src, src#44, dst, dst#45, relationship, relationship#46) AS edge#215]
: : +- InMemoryTableScan [dst#45, relationship#46, src#44]
: : +- InMemoryRelation [src#44, dst#45, relationship#46], StorageLevel(disk, memory, deserialized, 1 replicas)
: : +- *(1) Scan ExistingRDD[src#44,dst#45,relationship#46]
: +- *(4) Sort [src#217.id ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(src#217.id, 200), true, [id=#305]
: +- *(3) Project [struct(id, id#38, name, name#39, age, age#40L) AS src#217]
: +- InMemoryTableScan [age#40L, id#38, name#39]
: +- InMemoryRelation [id#38, name#39, age#40L], StorageLevel(disk, memory, deserialized, 1 replicas)
: +- *(1) Scan ExistingRDD[id#38,name#39,age#40L]
+- *(8) Sort [dst#219.id ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(dst#219.id, 200), true, [id=#320]
+- *(7) Project [struct(id, id#38, name, name#39, age, age#40L) AS dst#219]
+- InMemoryTableScan [age#40L, id#38, name#39]
+- InMemoryRelation [id#38, name#39, age#40L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Scan ExistingRDD[id#38,name#39,age#40L]
Motif Finding
# Search for pairs of vertices with edges in both directions between them.
motifs = g.find("(a)-[]->(b); (b)-[]->(a)").filter('a.id < b.id')
motifs.show()
+------------+----------------+
| a| b|
+------------+----------------+
|[b, Bob, 36]|[c, Charlie, 37]|
+------------+----------------+
# Find triangles
triangles = g.find("(a)-[]->(b); (b)-[]->(c); (c)-[]->(a)")
triangles = triangles.filter("a.id < b.id AND a.id < c.id")
triangles.show()
+--------------+---------------+--------------+
| a| b| c|
+--------------+---------------+--------------+
|[a, Alice, 34]|[e, Esther, 32]|[d, David, 29]|
+--------------+---------------+--------------+
triangles.explain()
== Physical Plan ==
*(6) Project [a#630, b#632, c#657]
+- *(6) BroadcastHashJoin [c#657.id, a#630.id], [__tmp-6526019406657860729#687.src, __tmp-6526019406657860729#687.dst], Inner, BuildRight
:- *(6) Project [a#630, b#632, c#657]
: +- *(6) BroadcastHashJoin [__tmp-430217833014886237#655.dst], [c#657.id], Inner, BuildRight, (a#630.id < c#657.id)
: :- *(6) BroadcastHashJoin [b#632.id], [__tmp-430217833014886237#655.src], Inner, BuildRight
: : :- *(6) Project [a#630, b#632]
: : : +- *(6) BroadcastHashJoin [__tmp-1043886091038848698#628.dst], [b#632.id], Inner, BuildRight, (a#630.id < b#632.id)
: : : :- *(6) BroadcastHashJoin [__tmp-1043886091038848698#628.src], [a#630.id], Inner, BuildRight
: : : : :- *(6) Project [struct(src, src#44, dst, dst#45, relationship, relationship#46) AS __tmp-1043886091038848698#628]
: : : : : +- InMemoryTableScan [dst#45, relationship#46, src#44]
: : : : : +- InMemoryRelation [src#44, dst#45, relationship#46], StorageLevel(disk, memory, deserialized, 1 replicas)
: : : : : +- *(1) Scan ExistingRDD[src#44,dst#45,relationship#46]
: : : : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct<id:string,name:string,age:bigint>, false].id)), [id=#628]
: : : : +- *(1) Project [struct(id, id#38, name, name#39, age, age#40L) AS a#630]
: : : : +- InMemoryTableScan [age#40L, id#38, name#39]
: : : : +- InMemoryRelation [id#38, name#39, age#40L], StorageLevel(disk, memory, deserialized, 1 replicas)
: : : : +- *(1) Scan ExistingRDD[id#38,name#39,age#40L]
: : : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct<id:string,name:string,age:bigint>, false].id)), [id=#634]
: : : +- *(2) Project [struct(id, id#38, name, name#39, age, age#40L) AS b#632]
: : : +- InMemoryTableScan [age#40L, id#38, name#39]
: : : +- InMemoryRelation [id#38, name#39, age#40L], StorageLevel(disk, memory, deserialized, 1 replicas)
: : : +- *(1) Scan ExistingRDD[id#38,name#39,age#40L]
: : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct<src:string,dst:string,relationship:string>, false].src)), [id=#641]
: : +- *(3) Project [struct(src, src#44, dst, dst#45, relationship, relationship#46) AS __tmp-430217833014886237#655]
: : +- InMemoryTableScan [dst#45, relationship#46, src#44]
: : +- InMemoryRelation [src#44, dst#45, relationship#46], StorageLevel(disk, memory, deserialized, 1 replicas)
: : +- *(1) Scan ExistingRDD[src#44,dst#45,relationship#46]
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct<id:string,name:string,age:bigint>, false].id)), [id=#647]
: +- *(4) Project [struct(id, id#38, name, name#39, age, age#40L) AS c#657]
: +- InMemoryTableScan [age#40L, id#38, name#39]
: +- InMemoryRelation [id#38, name#39, age#40L], StorageLevel(disk, memory, deserialized, 1 replicas)
: +- *(1) Scan ExistingRDD[id#38,name#39,age#40L]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct<src:string,dst:string,relationship:string>, false].src, input[0, struct<src:string,dst:string,relationship:string>, false].dst)), [id=#654]
+- *(5) Project [struct(src, src#44, dst, dst#45, relationship, relationship#46) AS __tmp-6526019406657860729#687]
+- InMemoryTableScan [dst#45, relationship#46, src#44]
+- InMemoryRelation [src#44, dst#45, relationship#46], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Scan ExistingRDD[src#44,dst#45,relationship#46]
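For comparison, a minimal sketch of the built-in triangle counter, which ignores edge direction (so its notion of a triangle is broader than the directed motif above):
# Number of triangles passing through each vertex, ignoring edge direction
g.triangleCount().show()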
# Negation
oneway = g.find("(a)-[]->(b); !(b)-[]->(a)")
oneway.show()
+---------------+----------------+
| a| b|
+---------------+----------------+
| [a, Alice, 34]| [e, Esther, 32]|
|[e, Esther, 32]| [d, David, 29]|
| [a, Alice, 34]| [b, Bob, 36]|
| [g, Gabby, 60]| [e, Esther, 32]|
|[e, Esther, 32]| [f, Fanny, 38]|
| [f, Fanny, 38]|[c, Charlie, 37]|
| [d, David, 29]| [a, Alice, 34]|
+---------------+----------------+
# Find vertices without incoming edges:
g.find("!()-[]->(a)").show()
+--------------+
| a|
+--------------+
|[g, Gabby, 60]|
+--------------+
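The same negation syntax can find sinks; a minimal sketch (in this graph every user has an outgoing edge, so the result is empty):
# Find vertices without outgoing edges
g.find("!(a)-[]->()").show()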
# More meaningful queries can be expressed by applying filters.
# Question: where is this filter applied?
g.find("(a)-[e]->(b); (b)-[]->(a)").filter("b.age > 36").show()
+------------+--------------+----------------+
| a| e| b|
+------------+--------------+----------------+
|[b, Bob, 36]|[b, c, follow]|[c, Charlie, 37]|
+------------+--------------+----------------+
g.find("(a)-[e]->(b); (b)-[]->(a)").filter("b.age > 36").select("e.relationship").show()
+------------+
|relationship|
+------------+
| follow|
+------------+
g.find("(a)-[]->(b); (b)-[]->(a)").filter("b.age > 36").explain()
== Physical Plan ==
*(4) Project [a#2584, b#2586]
+- *(4) BroadcastHashJoin [b#2586.id, a#2584.id], [__tmp2506060614762666678#2609.src, __tmp2506060614762666678#2609.dst], Inner, BuildRight
:- *(4) Project [a#2584, b#2586]
: +- *(4) BroadcastHashJoin [__tmp-3851898762290097694#2582.dst], [b#2586.id], Inner, BuildRight
: :- *(4) BroadcastHashJoin [__tmp-3851898762290097694#2582.src], [a#2584.id], Inner, BuildRight
: : :- *(4) Project [struct(src, src#44, dst, dst#45, relationship, relationship#46) AS __tmp-3851898762290097694#2582]
: : : +- InMemoryTableScan [dst#45, relationship#46, src#44]
: : : +- InMemoryRelation [src#44, dst#45, relationship#46], StorageLevel(disk, memory, deserialized, 1 replicas)
: : : +- *(1) Scan ExistingRDD[src#44,dst#45,relationship#46]
: : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct<id:string,name:string,age:bigint>, false].id)), [id=#1356]
: : +- *(1) Project [struct(id, id#38, name, name#39, age, age#40L) AS a#2584]
: : +- InMemoryTableScan [age#40L, id#38, name#39]
: : +- InMemoryRelation [id#38, name#39, age#40L], StorageLevel(disk, memory, deserialized, 1 replicas)
: : +- *(1) Scan ExistingRDD[id#38,name#39,age#40L]
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct<id:string,name:string,age:bigint>, false].id)), [id=#1363]
: +- *(2) Project [struct(id, id#38, name, name#39, age, age#40L) AS b#2586]
: +- *(2) Filter (isnotnull(age#40L) AND (age#40L > 36))
: +- InMemoryTableScan [age#40L, id#38, name#39], [isnotnull(age#40L), (age#40L > 36)]
: +- InMemoryRelation [id#38, name#39, age#40L], StorageLevel(disk, memory, deserialized, 1 replicas)
: +- *(1) Scan ExistingRDD[id#38,name#39,age#40L]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct<src:string,dst:string,relationship:string>, false].src, input[0, struct<src:string,dst:string,relationship:string>, false].dst)), [id=#1370]
+- *(3) Project [struct(src, src#44, dst, dst#45, relationship, relationship#46) AS __tmp2506060614762666678#2609]
+- InMemoryTableScan [dst#45, relationship#46, src#44]
+- InMemoryRelation [src#44, dst#45, relationship#46], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Scan ExistingRDD[src#44,dst#45,relationship#46]
# Find chains of 4 vertices such that at least 2 of the 3 edges are "friend" relationships.
# The when function is similar to CASE WHEN in SQL.
from pyspark.sql.functions import when
chain4 = g.find("(a)-[e1]->(b); (b)-[e2]->(c); (c)-[e3]->(d)").where('a!=d AND a!=c AND b!=d')
friendTo1 = lambda e: when(e['relationship'] == 'friend', 1).otherwise(0)
chain4.select('*', friendTo1(chain4['e1']).alias('f1'), \
              friendTo1(chain4['e2']).alias('f2'), \
              friendTo1(chain4['e3']).alias('f3')) \
      .where('f1 + f2 + f3 >= 2').show()
+---------------+--------------+---------------+--------------+---------------+--------------+----------------+---+---+---+
| a| e1| b| e2| c| e3| d| f1| f2| f3|
+---------------+--------------+---------------+--------------+---------------+--------------+----------------+---+---+---+
| [d, David, 29]|[d, a, friend]| [a, Alice, 34]|[a, e, friend]|[e, Esther, 32]|[e, f, follow]| [f, Fanny, 38]| 1| 1| 0|
| [d, David, 29]|[d, a, friend]| [a, Alice, 34]|[a, b, friend]| [b, Bob, 36]|[b, c, follow]|[c, Charlie, 37]| 1| 1| 0|
|[e, Esther, 32]|[e, d, friend]| [d, David, 29]|[d, a, friend]| [a, Alice, 34]|[a, b, friend]| [b, Bob, 36]| 1| 1| 1|
| [g, Gabby, 60]|[g, e, follow]|[e, Esther, 32]|[e, d, friend]| [d, David, 29]|[d, a, friend]| [a, Alice, 34]| 0| 1| 1|
+---------------+--------------+---------------+--------------+---------------+--------------+----------------+---+---+---+
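The same condition can also be built programmatically by folding over the edge columns, along the lines of the example in the GraphFrames user guide; a sketch:
from functools import reduce
from pyspark.sql.functions import col, lit, when
# Increment the count cnt whenever the edge's relationship is 'friend'
sumFriends = lambda cnt, rel: when(rel == 'friend', cnt + 1).otherwise(cnt)
# Fold over the three edge columns, starting the count at 0
condition = reduce(lambda cnt, ename: sumFriends(cnt, col(ename)['relationship']),
                   ['e1', 'e2', 'e3'], lit(0))
chain4.where(condition >= 2).select('a', 'b', 'c', 'd').show()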
Subgraphs
# Select subgraph of users older than 30, and relationships of type "friend".
# Drop isolated vertices (users) which are not contained in any edges (relationships).
g1 = g.filterVertices("age > 30").filterEdges("relationship = 'friend'")\
.dropIsolatedVertices()
g1.vertices.show()
g1.edges.show()
+---+------+---+
| id| name|age|
+---+------+---+
| e|Esther| 32|
| b| Bob| 36|
| a| Alice| 34|
+---+------+---+
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
| a| e| friend|
| a| b| friend|
+---+---+------------+
# Select subgraph based on edges "e" of type "follow"
# pointing from a younger user "a" to an older user "b".
paths = g.find("(a)-[e]->(b)")\
.filter("e.relationship = 'follow'")\
.filter("a.age < b.age")
# "paths" contains vertex info. Extract the edges.
e2 = paths.select("e.*")
# Construct the subgraph
g2 = GraphFrame(g.vertices, e2).dropIsolatedVertices()
g2.vertices.show()
g2.edges.show()
+---+-------+---+
| id| name|age|
+---+-------+---+
| f| Fanny| 38|
| e| Esther| 32|
| c|Charlie| 37|
| b| Bob| 36|
+---+-------+---+
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
| e| f| follow|
| b| c| follow|
+---+---+------------+
BFS
# Starting vertex is 'a'
layers = [g.vertices.select('id').where("id = 'a'")]
visited = layers[0]
while layers[-1].count() > 0:
    # Display the current layer
    layers[-1].show()
    # From the current layer, get all the one-hop neighbors
    d1 = layers[-1].join(g.edges, layers[-1]['id'] == g.edges['src'])
    # Rename the column as 'id', and remove visited vertices and duplicates
    d2 = d1.select(d1['dst'].alias('id')) \
           .subtract(visited).distinct().cache()
    layers += [d2]
    visited = visited.union(layers[-1]).cache()
+---+
| id|
+---+
| a|
+---+
+---+
| id|
+---+
| e|
| b|
+---+
+---+
| id|
+---+
| f|
| d|
| c|
+---+
# GraphFrames provides its own BFS:
paths = g.bfs("id = 'a'", "age > 36")
paths.show()
+--------------+--------------+---------------+--------------+----------------+
| from| e0| v1| e1| to|
+--------------+--------------+---------------+--------------+----------------+
|[a, Alice, 34]|[a, b, friend]| [b, Bob, 36]|[b, c, follow]|[c, Charlie, 37]|
|[a, Alice, 34]|[a, e, friend]|[e, Esther, 32]|[e, f, follow]| [f, Fanny, 38]|
+--------------+--------------+---------------+--------------+----------------+
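bfs also takes an edgeFilter and a maxPathLength; a minimal sketch that restricts the search to 'friend' edges (no user older than 36 is reachable this way, so the result is empty):
# Only traverse 'friend' edges, and stop after at most 3 hops
g.bfs("id = 'a'", "age > 36", edgeFilter="relationship = 'friend'",
      maxPathLength=3).show()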
List Ranking
# -1 denotes the end of the list
from pyspark.sql.functions import col, when
# (Note: this overwrites the social-network v and e defined above)
data = [(0, 5), (1, 0), (3, 4), (4, 6), (5, -1), (6, 1)]
e = spark.createDataFrame(data, ['src', 'dst'])
# d = 1 for every element, except 0 for the last element of the list
v = e.select(col('src').alias('id'), when(e.dst == -1, 0).otherwise(1).alias('d'))
# Add the sentinel vertex -1 with d = 0
v1 = spark.createDataFrame([(-1, 0)], ['id', 'd'])
v = v.union(v1)
v.show()
e.show()
+---+---+
| id| d|
+---+---+
| 0| 1|
| 1| 1|
| 3| 1|
| 4| 1|
| 5| 0|
| 6| 1|
| -1| 0|
+---+---+
+---+---+
|src|dst|
+---+---+
| 0| 5|
| 1| 0|
| 3| 4|
| 4| 6|
| 5| -1|
| 6| 1|
+---+---+
# Pointer jumping: every iteration replaces each edge by a two-hop edge, so
# the number of remaining hops halves each round (O(log n) iterations)
while e.filter('dst != -1').count() > 0:
    g = GraphFrame(v, e)
    g.cache()
    # New distance = own distance + successor's distance
    v = g.triplets.select(col('src.id').alias('id'),
                          (col('src.d') + col('dst.d')).alias('d')) \
         .union(v1)
    # Replace each path a -> b -> c by the shortcut edge a -> c, keeping
    # the edges that already point at the terminator
    e = g.find('(a)-[]->(b); (b)-[]->(c)') \
         .select(col('a.id').alias('src'), col('c.id').alias('dst')) \
         .union(e.filter('dst = -1'))
v.show()
+---+---+
| id| d|
+---+---+
| 0| 1|
| 1| 2|
| 3| 5|
| 4| 4|
| 5| 0|
| 6| 3|
| -1| 0|
+---+---+
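As a sanity check, the same ranks can be computed sequentially in plain Python from the data list above:
# Follow each chain of successor pointers to the terminator -1, counting hops
succ = dict(data)
def dist_to_end(x):
    return 0 if succ[x] == -1 else 1 + dist_to_end(succ[x])
print({x: dist_to_end(x) for x in succ})  # {0: 1, 1: 2, 3: 5, 4: 4, 5: 0, 6: 3}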
Message passing via AggregateMessages
from pyspark.sql.functions import coalesce, col, lit, sum, when, min, max
from graphframes.lib import AggregateMessages as AM
# AggregateMessages has the following members: src, dst, edge, msg
# (g, v, e here again refer to the social network from the beginning;
# re-run the earlier cells if they were overwritten by list ranking)
# For each user, sum the ages of the adjacent users.
agg = g.aggregateMessages(
sum(AM.msg).alias("summedAges"),
sendToSrc = AM.dst['age'],
sendToDst = AM.src['age'])
agg.show()
+---+----------+
| id|summedAges|
+---+----------+
| g| 32|
| f| 69|
| e| 161|
| d| 66|
| c| 110|
| b| 108|
| a| 97|
+---+----------+
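The same mechanism can reproduce inDegree: send the constant 1 along every edge to its destination and sum the messages; a minimal sketch:
from pyspark.sql.functions import lit, sum
# Every edge sends the message 1 to its destination vertex
indeg = g.aggregateMessages(sum(AM.msg).alias('inDegree'), sendToDst=lit(1))
indeg.show()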
The Pregel Model for Graph Computation
# Pagerank in the Pregel model
from pyspark.sql.functions import coalesce, col, lit, sum, when, min
from graphframes.lib import Pregel
# Need to set up a directory for Pregel computation
sc.setCheckpointDir("checkpoint")
'''
Use the builder pattern to describe the operations.
Call run() to start a run. It returns a DataFrame of vertices from the last iteration.

When a run starts, it expands the vertices DataFrame using column expressions
defined by withVertexColumn(). Those additional vertex properties can be
changed during Pregel iterations. In each Pregel iteration, there are three
phases:
* Given each edge triplet, generate messages and specify target vertices to
  send them to, described by sendMsgToDst() and sendMsgToSrc().
* Aggregate messages by target vertex IDs, described by aggMsgs().
* Update additional vertex properties based on aggregated messages and the
  state from the previous iteration, described by withVertexColumn().
'''
# Attach outDegree to each vertex; e is the social-network edge DataFrame
# from the beginning (re-run that cell if it was overwritten by list ranking)
v = g.outDegrees
g = GraphFrame(v, e)
ranks = g.pregel \
.setMaxIter(5) \
.sendMsgToDst(Pregel.src("rank") / Pregel.src("outDegree")) \
.aggMsgs(sum(Pregel.msg())) \
.withVertexColumn("rank", lit(1.0), \
coalesce(Pregel.msg(), lit(0.0)) * lit(0.85) + lit(0.15)) \
.run()
ranks.show()
# pyspark.sql.functions.coalesce(*cols): returns the first column that is not null.
# Not to be confused with DataFrame.coalesce(numPartitions).
+---+---------+-------------------+
| id|outDegree| rank|
+---+---------+-------------------+
| g| 1| 0.15|
| f| 1|0.41104330078124995|
| e| 2| 0.5032932031249999|
| d| 1|0.41104330078124995|
| c| 1| 2.780783203124999|
| b| 1| 2.2680220312499997|
| a| 2| 0.4758149609375|
+---+---------+-------------------+
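For comparison, a minimal sketch using the built-in PageRank; its results need not match the hand-rolled Pregel version above exactly (e.g. the two can treat normalization differently):
# Built-in PageRank; vertices of the result carry a 'pagerank' column
results = g.pageRank(resetProbability=0.15, maxIter=5)
results.vertices.select('id', 'pagerank').show()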
# BFS in the Pregel model: only 'active' vertices (those whose distance
# improved in the previous round) send messages
g = GraphFrame(v, e)
dist = g.pregel \
.sendMsgToDst(when(Pregel.src('active'), Pregel.src('d') + 1)) \
.aggMsgs(min(Pregel.msg())) \
.withVertexColumn('d', when(v['id'] == 'a', 0).otherwise(99999), \
when(Pregel.msg() < col('d'), Pregel.msg()).otherwise(col('d'))) \
.withVertexColumn('active', when(v['id'] == 'a', True).otherwise(False), \
when(Pregel.msg() < col('d'), True).otherwise(False)) \
.run()
dist.show()
+---+---------+-----+------+
| id|outDegree| d|active|
+---+---------+-----+------+
| g| 1|99999| false|
| f| 1| 2| false|
| e| 2| 1| false|
| d| 1| 2| false|
| c| 1| 2| false|
| b| 1| 1| false|
| a| 2| 0| false|
+---+---------+-----+------+
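For comparison, a minimal sketch using the built-in shortestPaths; note that it returns, for every vertex, the distance to each landmark following edge directions, whereas the Pregel code above computed distances from 'a':
# Distance from every vertex to the landmark 'a' (as a map in column 'distances')
g.shortestPaths(landmarks=['a']).select('id', 'distances').show()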