Skip to content

Graph


!pip install pyspark
Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
     |████████████████████████████████| 204.2MB 38kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
     |████████████████████████████████| 204kB 51.8MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=d4a7bf9d9286fd24237188aa6c7e6fce433cbe8062ab1b5448e6c06d960012fe
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1
!pip install graphframes
Collecting graphframes
  Downloading https://files.pythonhosted.org/packages/0b/27/c7c7e1ced2fe9a905f865dd91faaec2ac8a8e313f511678c8ec92a41a153/graphframes-0.6-py2.py3-none-any.whl
Requirement already satisfied: numpy in /usr/local/lib/python3.6/dist-packages (from graphframes) (1.18.5)
Collecting nose
[?25l  Downloading https://files.pythonhosted.org/packages/15/d8/dd071918c040f50fa1cf80da16423af51ff8ce4a0f2399b7bf8de45ac3d9/nose-1.3.7-py3-none-any.whl (154kB)
     |████████████████████████████████| 163kB 15.3MB/s 
[?25hInstalling collected packages: nose, graphframes
Successfully installed graphframes-0.6 nose-1.3.7
!wget https://dl.bintray.com/spark-packages/maven/graphframes/graphframes/0.8.0-spark3.0-s_2.12/graphframes-0.8.0-spark3.0-s_2.12.jar
--2020-12-02 15:45:29--  https://dl.bintray.com/spark-packages/maven/graphframes/graphframes/0.8.0-spark3.0-s_2.12/graphframes-0.8.0-spark3.0-s_2.12.jar
Resolving dl.bintray.com (dl.bintray.com)... 3.122.43.129, 35.157.127.85
Connecting to dl.bintray.com (dl.bintray.com)|3.122.43.129|:443... connected.
HTTP request sent, awaiting response... 302 
Location: https://d29vzk4ow07wi7.cloudfront.net/b62d4bb1c4fdd74c9ce5aa4adee520a7a4375c2de73487381644e5220c67c1dd?response-content-disposition=attachment%3Bfilename%3D%22graphframes-0.8.0-spark3.0-s_2.12.jar%22&Policy=eyJTdGF0ZW1lbnQiOiBbeyJSZXNvdXJjZSI6Imh0dHAqOi8vZDI5dnprNG93MDd3aTcuY2xvdWRmcm9udC5uZXQvYjYyZDRiYjFjNGZkZDc0YzljZTVhYTRhZGVlNTIwYTdhNDM3NWMyZGU3MzQ4NzM4MTY0NGU1MjIwYzY3YzFkZD9yZXNwb25zZS1jb250ZW50LWRpc3Bvc2l0aW9uPWF0dGFjaG1lbnQlM0JmaWxlbmFtZSUzRCUyMmdyYXBoZnJhbWVzLTAuOC4wLXNwYXJrMy4wLXNfMi4xMi5qYXIlMjIiLCJDb25kaXRpb24iOnsiRGF0ZUxlc3NUaGFuIjp7IkFXUzpFcG9jaFRpbWUiOjE2MDY5MjQ2NDl9LCJJcEFkZHJlc3MiOnsiQVdTOlNvdXJjZUlwIjoiMC4wLjAuMC8wIn19fV19&Signature=dJQvqo-UV~oRF0t93Qw-YSGpDE1pvQCuS0t~kNdiC2cDsUoJ5CdWB-RinPlpEX5TY8-oIQBnpefd7ljNKnJ5t7L1ae4ZcIQLfqBdvJdMK7AcKzytxy3cc17j7Zc80hbTMYVkhThFSJS0Loz6fvPyedCqfjI8G66Mrp46VgSpTBCqHxF0bKusZuM4w82M9d-iLmYELnyDNPHTbLIAjlMh24CcxETAKmI~AN-pZPjaGz6YMc9rFuyROe8FE4p2B5jbmjzo5LB0AHNdJll~GXtqGFKPsdJavvoCVDqbAdyJxL3XtGZdMLwbSHO6WMhbJRetQc5mEgqSLPsrNXrCvD2Q-g__&Key-Pair-Id=APKAIFKFWOMXM2UMTSFA [following]
--2020-12-02 15:45:29--  https://d29vzk4ow07wi7.cloudfront.net/b62d4bb1c4fdd74c9ce5aa4adee520a7a4375c2de73487381644e5220c67c1dd?response-content-disposition=attachment%3Bfilename%3D%22graphframes-0.8.0-spark3.0-s_2.12.jar%22&Policy=eyJTdGF0ZW1lbnQiOiBbeyJSZXNvdXJjZSI6Imh0dHAqOi8vZDI5dnprNG93MDd3aTcuY2xvdWRmcm9udC5uZXQvYjYyZDRiYjFjNGZkZDc0YzljZTVhYTRhZGVlNTIwYTdhNDM3NWMyZGU3MzQ4NzM4MTY0NGU1MjIwYzY3YzFkZD9yZXNwb25zZS1jb250ZW50LWRpc3Bvc2l0aW9uPWF0dGFjaG1lbnQlM0JmaWxlbmFtZSUzRCUyMmdyYXBoZnJhbWVzLTAuOC4wLXNwYXJrMy4wLXNfMi4xMi5qYXIlMjIiLCJDb25kaXRpb24iOnsiRGF0ZUxlc3NUaGFuIjp7IkFXUzpFcG9jaFRpbWUiOjE2MDY5MjQ2NDl9LCJJcEFkZHJlc3MiOnsiQVdTOlNvdXJjZUlwIjoiMC4wLjAuMC8wIn19fV19&Signature=dJQvqo-UV~oRF0t93Qw-YSGpDE1pvQCuS0t~kNdiC2cDsUoJ5CdWB-RinPlpEX5TY8-oIQBnpefd7ljNKnJ5t7L1ae4ZcIQLfqBdvJdMK7AcKzytxy3cc17j7Zc80hbTMYVkhThFSJS0Loz6fvPyedCqfjI8G66Mrp46VgSpTBCqHxF0bKusZuM4w82M9d-iLmYELnyDNPHTbLIAjlMh24CcxETAKmI~AN-pZPjaGz6YMc9rFuyROe8FE4p2B5jbmjzo5LB0AHNdJll~GXtqGFKPsdJavvoCVDqbAdyJxL3XtGZdMLwbSHO6WMhbJRetQc5mEgqSLPsrNXrCvD2Q-g__&Key-Pair-Id=APKAIFKFWOMXM2UMTSFA
Resolving d29vzk4ow07wi7.cloudfront.net (d29vzk4ow07wi7.cloudfront.net)... 54.240.168.145, 54.240.168.82, 54.240.168.77, ...
Connecting to d29vzk4ow07wi7.cloudfront.net (d29vzk4ow07wi7.cloudfront.net)|54.240.168.145|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 243265 (238K) [multipart/form-data]
Saving to: ‘graphframes-0.8.0-spark3.0-s_2.12.jar’

graphframes-0.8.0-s 100%[===================>] 237.56K   563KB/s    in 0.4s

2020-12-02 15:45:31 (563 KB/s) - ‘graphframes-0.8.0-spark3.0-s_2.12.jar’ saved [243265/243265]
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages graphframes:graphframes:0.8.1-spark3.0-s_2.12 pyspark-shell'
---------------------------------------------------------------------------

NameError                                 Traceback (most recent call last)

<ipython-input-15-4d1ba1edf8bb> in <module>()
----> 1 os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages graphframes:graphframes:0.8.1-spark3.0-s_2.12 pyspark-shell'


NameError: name 'os' is not defined
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import Row

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
from graphframes import *
sc.addPyFile("https://dl.bintray.com/spark-packages/maven/graphframes/graphframes/0.8.0-spark3.0-s_2.12/graphframes-0.8.0-spark3.0-s_2.12.jar")
# Vertics DataFrame
v = spark.createDataFrame([
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 37),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 38),
  ("g", "Gabby", 60)
], ["id", "name", "age"])

# Edges DataFrame
e = spark.createDataFrame([
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend"),
  ("g", "e", "follow")
], ["src", "dst", "relationship"])

# Create a GraphFrame
g = GraphFrame(v, e)

g.vertices.show()
g.edges.show()
---------------------------------------------------------------------------

Py4JJavaError                             Traceback (most recent call last)

<ipython-input-14-eccf3cb921e3> in <module>()
     24 
     25 # Create a GraphFrame
---> 26 g = GraphFrame(v, e)
     27 
     28 g.vertices.show()


/usr/local/lib/python3.6/dist-packages/graphframes/graphframe.py in __init__(self, v, e)
     63         self._sqlContext = v.sql_ctx
     64         self._sc = self._sqlContext._sc
---> 65         self._jvm_gf_api = _java_api(self._sc)
     66 
     67         self.ID = self._jvm_gf_api.ID()


/usr/local/lib/python3.6/dist-packages/graphframes/graphframe.py in _java_api(jsc)
     36 def _java_api(jsc):
     37     javaClassName = "org.graphframes.GraphFramePythonAPI"
---> 38     return jsc._jvm.Thread.currentThread().getContextClassLoader().loadClass(javaClassName) \
     39             .newInstance()
     40


/usr/local/lib/python3.6/dist-packages/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:


/usr/local/lib/python3.6/dist-packages/pyspark/sql/utils.py in deco(*a, **kw)
    126     def deco(*a, **kw):
    127         try:
--> 128             return f(*a, **kw)
    129         except py4j.protocol.Py4JJavaError as e:
    130             converted = convert_exception(e.java_exception)


/usr/local/lib/python3.6/dist-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(


Py4JJavaError: An error occurred while calling o148.loadClass.
: java.lang.ClassNotFoundException: org.graphframes.GraphFramePythonAPI
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:834)
3# g.vertices and g.edges are just DataFrames
# You can use any DataFrame API on them

g.edges.filter("src = 'a'").show()
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  a|  e|      friend|
+---+---+------------+
g.edges.filter("src = 'a'").count()
2
# Count the number of followers of c.
# This queries the edge DataFrame.
print(g.edges.filter("relationship = 'follow' and dst = 'c'").count())
2
# A GraphFrame has additional attributes

g.outDegrees.show()
+---+---------+
| id|outDegree|
+---+---------+
|  g|        1|
|  f|        1|
|  e|        2|
|  d|        1|
|  c|        1|
|  b|        1|
|  a|        2|
+---+---------+
g.inDegrees.show()
+---+--------+
| id|inDegree|
+---+--------+
|  f|       1|
|  e|       2|
|  d|       1|
|  c|       2|
|  b|       2|
|  a|       1|
+---+--------+
g.inDegrees.explain()
== Physical Plan ==
*(2) HashAggregate(keys=[dst#45], functions=[count(1)])
+- Exchange hashpartitioning(dst#45, 200), true, [id=#171]
   +- *(1) HashAggregate(keys=[dst#45], functions=[partial_count(1)])
      +- *(1) Project [dst#45]
         +- *(1) Scan ExistingRDD[src#44,dst#45,relationship#46]
myInDegrees = g.edges.groupBy('dst').count()\
               .withColumnRenamed('dst', 'id').withColumnRenamed('count', 'inDegree')
myInDegrees.show()
+---+--------+
| id|inDegree|
+---+--------+
|  f|       1|
|  e|       2|
|  d|       1|
|  c|       2|
|  b|       2|
|  a|       1|
+---+--------+
myInDegrees.explain()
== Physical Plan ==
*(2) HashAggregate(keys=[dst#45], functions=[count(1)])
+- Exchange hashpartitioning(dst#45, 200), true, [id=#218]
   +- *(1) HashAggregate(keys=[dst#45], functions=[partial_count(1)])
      +- *(1) Project [dst#45]
         +- *(1) Scan ExistingRDD[src#44,dst#45,relationship#46]
print(g.inDegrees.storageLevel)
Serialized 1x Replicated
g.inDegrees.cache()
DataFrame[id: string, inDegree: int]
print(g.inDegrees.storageLevel)
Disk Memory Deserialized 1x Replicated
print(g.vertices.storageLevel)
Serialized 1x Replicated
g.cache()
GraphFrame(v:[id: string, name: string ... 1 more field], e:[src: string, dst: string ... 1 more field])
print(g.vertices.storageLevel)
print(g.edges.storageLevel)
Disk Memory Deserialized 1x Replicated
Disk Memory Deserialized 1x Replicated
# A triplet view of the graph

g.triplets.show()
+----------------+--------------+----------------+
|             src|          edge|             dst|
+----------------+--------------+----------------+
| [e, Esther, 32]|[e, f, follow]|  [f, Fanny, 38]|
|  [g, Gabby, 60]|[g, e, follow]| [e, Esther, 32]|
|  [a, Alice, 34]|[a, e, friend]| [e, Esther, 32]|
| [e, Esther, 32]|[e, d, friend]|  [d, David, 29]|
|  [f, Fanny, 38]|[f, c, follow]|[c, Charlie, 37]|
|    [b, Bob, 36]|[b, c, follow]|[c, Charlie, 37]|
|[c, Charlie, 37]|[c, b, follow]|    [b, Bob, 36]|
|  [a, Alice, 34]|[a, b, friend]|    [b, Bob, 36]|
|  [d, David, 29]|[d, a, friend]|  [a, Alice, 34]|
+----------------+--------------+----------------+
g.triplets.explain()
== Physical Plan ==
*(9) Project [src#217, edge#215, dst#219]
+- *(9) SortMergeJoin [edge#215.dst], [dst#219.id], Inner
   :- *(6) Sort [edge#215.dst ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(edge#215.dst, 200), true, [id=#312]
   :     +- *(5) SortMergeJoin [edge#215.src], [src#217.id], Inner
   :        :- *(2) Sort [edge#215.src ASC NULLS FIRST], false, 0
   :        :  +- Exchange hashpartitioning(edge#215.src, 200), true, [id=#297]
   :        :     +- *(1) Project [struct(src, src#44, dst, dst#45, relationship, relationship#46) AS edge#215]
   :        :        +- InMemoryTableScan [dst#45, relationship#46, src#44]
   :        :              +- InMemoryRelation [src#44, dst#45, relationship#46], StorageLevel(disk, memory, deserialized, 1 replicas)
   :        :                    +- *(1) Scan ExistingRDD[src#44,dst#45,relationship#46]
   :        +- *(4) Sort [src#217.id ASC NULLS FIRST], false, 0
   :           +- Exchange hashpartitioning(src#217.id, 200), true, [id=#305]
   :              +- *(3) Project [struct(id, id#38, name, name#39, age, age#40L) AS src#217]
   :                 +- InMemoryTableScan [age#40L, id#38, name#39]
   :                       +- InMemoryRelation [id#38, name#39, age#40L], StorageLevel(disk, memory, deserialized, 1 replicas)
   :                             +- *(1) Scan ExistingRDD[id#38,name#39,age#40L]
   +- *(8) Sort [dst#219.id ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(dst#219.id, 200), true, [id=#320]
         +- *(7) Project [struct(id, id#38, name, name#39, age, age#40L) AS dst#219]
            +- InMemoryTableScan [age#40L, id#38, name#39]
                  +- InMemoryRelation [id#38, name#39, age#40L], StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- *(1) Scan ExistingRDD[id#38,name#39,age#40L]

Motif Finding

# Search for pairs of vertices with edges in both directions between them.
motifs = g.find("(a)-[]->(b); (b)-[]->(a)").filter('a.id < b.id')
motifs.show()
+------------+----------------+
|           a|               b|
+------------+----------------+
|[b, Bob, 36]|[c, Charlie, 37]|
+------------+----------------+
# Find triangles

triangles = g.find("(a)-[]->(b); (b)-[]->(c); (c)-[]->(a)")
triangles = triangles.filter("a.id < b.id AND a.id < c.id")
triangles.show()
+--------------+---------------+--------------+
|             a|              b|             c|
+--------------+---------------+--------------+
|[a, Alice, 34]|[e, Esther, 32]|[d, David, 29]|
+--------------+---------------+--------------+
triangles.explain()
== Physical Plan ==
*(6) Project [a#630, b#632, c#657]
+- *(6) BroadcastHashJoin [c#657.id, a#630.id], [__tmp-6526019406657860729#687.src, __tmp-6526019406657860729#687.dst], Inner, BuildRight
   :- *(6) Project [a#630, b#632, c#657]
   :  +- *(6) BroadcastHashJoin [__tmp-430217833014886237#655.dst], [c#657.id], Inner, BuildRight, (a#630.id < c#657.id)
   :     :- *(6) BroadcastHashJoin [b#632.id], [__tmp-430217833014886237#655.src], Inner, BuildRight
   :     :  :- *(6) Project [a#630, b#632]
   :     :  :  +- *(6) BroadcastHashJoin [__tmp-1043886091038848698#628.dst], [b#632.id], Inner, BuildRight, (a#630.id < b#632.id)
   :     :  :     :- *(6) BroadcastHashJoin [__tmp-1043886091038848698#628.src], [a#630.id], Inner, BuildRight
   :     :  :     :  :- *(6) Project [struct(src, src#44, dst, dst#45, relationship, relationship#46) AS __tmp-1043886091038848698#628]
   :     :  :     :  :  +- InMemoryTableScan [dst#45, relationship#46, src#44]
   :     :  :     :  :        +- InMemoryRelation [src#44, dst#45, relationship#46], StorageLevel(disk, memory, deserialized, 1 replicas)
   :     :  :     :  :              +- *(1) Scan ExistingRDD[src#44,dst#45,relationship#46]
   :     :  :     :  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct<id:string,name:string,age:bigint>, false].id)), [id=#628]
   :     :  :     :     +- *(1) Project [struct(id, id#38, name, name#39, age, age#40L) AS a#630]
   :     :  :     :        +- InMemoryTableScan [age#40L, id#38, name#39]
   :     :  :     :              +- InMemoryRelation [id#38, name#39, age#40L], StorageLevel(disk, memory, deserialized, 1 replicas)
   :     :  :     :                    +- *(1) Scan ExistingRDD[id#38,name#39,age#40L]
   :     :  :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct<id:string,name:string,age:bigint>, false].id)), [id=#634]
   :     :  :        +- *(2) Project [struct(id, id#38, name, name#39, age, age#40L) AS b#632]
   :     :  :           +- InMemoryTableScan [age#40L, id#38, name#39]
   :     :  :                 +- InMemoryRelation [id#38, name#39, age#40L], StorageLevel(disk, memory, deserialized, 1 replicas)
   :     :  :                       +- *(1) Scan ExistingRDD[id#38,name#39,age#40L]
   :     :  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct<src:string,dst:string,relationship:string>, false].src)), [id=#641]
   :     :     +- *(3) Project [struct(src, src#44, dst, dst#45, relationship, relationship#46) AS __tmp-430217833014886237#655]
   :     :        +- InMemoryTableScan [dst#45, relationship#46, src#44]
   :     :              +- InMemoryRelation [src#44, dst#45, relationship#46], StorageLevel(disk, memory, deserialized, 1 replicas)
   :     :                    +- *(1) Scan ExistingRDD[src#44,dst#45,relationship#46]
   :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct<id:string,name:string,age:bigint>, false].id)), [id=#647]
   :        +- *(4) Project [struct(id, id#38, name, name#39, age, age#40L) AS c#657]
   :           +- InMemoryTableScan [age#40L, id#38, name#39]
   :                 +- InMemoryRelation [id#38, name#39, age#40L], StorageLevel(disk, memory, deserialized, 1 replicas)
   :                       +- *(1) Scan ExistingRDD[id#38,name#39,age#40L]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct<src:string,dst:string,relationship:string>, false].src, input[0, struct<src:string,dst:string,relationship:string>, false].dst)), [id=#654]
      +- *(5) Project [struct(src, src#44, dst, dst#45, relationship, relationship#46) AS __tmp-6526019406657860729#687]
         +- InMemoryTableScan [dst#45, relationship#46, src#44]
               +- InMemoryRelation [src#44, dst#45, relationship#46], StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- *(1) Scan ExistingRDD[src#44,dst#45,relationship#46]
# Negation
oneway = g.find("(a)-[]->(b); !(b)-[]->(a)")
oneway.show()
+---------------+----------------+
|              a|               b|
+---------------+----------------+
| [a, Alice, 34]| [e, Esther, 32]|
|[e, Esther, 32]|  [d, David, 29]|
| [a, Alice, 34]|    [b, Bob, 36]|
| [g, Gabby, 60]| [e, Esther, 32]|
|[e, Esther, 32]|  [f, Fanny, 38]|
| [f, Fanny, 38]|[c, Charlie, 37]|
| [d, David, 29]|  [a, Alice, 34]|
+---------------+----------------+
# Find vertices without incoming edges:
g.find("!()-[]->(a)").show()
+--------------+
|             a|
+--------------+
|[g, Gabby, 60]|
+--------------+
# More meaningful queries can be expressed by applying filters.
# Question: where is this filter applied?

g.find("(a)-[e]->(b); (b)-[]->(a)").filter("b.age > 36").show()
+------------+--------------+----------------+
|           a|             e|               b|
+------------+--------------+----------------+
|[b, Bob, 36]|[b, c, follow]|[c, Charlie, 37]|
+------------+--------------+----------------+

+------------+
|relationship|
+------------+
|      follow|
+------------+
g.find("(a)-[]->(b); (b)-[]->(a)").filter("b.age > 36").explain()
== Physical Plan ==
*(4) Project [a#2584, b#2586]
+- *(4) BroadcastHashJoin [b#2586.id, a#2584.id], [__tmp2506060614762666678#2609.src, __tmp2506060614762666678#2609.dst], Inner, BuildRight
   :- *(4) Project [a#2584, b#2586]
   :  +- *(4) BroadcastHashJoin [__tmp-3851898762290097694#2582.dst], [b#2586.id], Inner, BuildRight
   :     :- *(4) BroadcastHashJoin [__tmp-3851898762290097694#2582.src], [a#2584.id], Inner, BuildRight
   :     :  :- *(4) Project [struct(src, src#44, dst, dst#45, relationship, relationship#46) AS __tmp-3851898762290097694#2582]
   :     :  :  +- InMemoryTableScan [dst#45, relationship#46, src#44]
   :     :  :        +- InMemoryRelation [src#44, dst#45, relationship#46], StorageLevel(disk, memory, deserialized, 1 replicas)
   :     :  :              +- *(1) Scan ExistingRDD[src#44,dst#45,relationship#46]
   :     :  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct<id:string,name:string,age:bigint>, false].id)), [id=#1356]
   :     :     +- *(1) Project [struct(id, id#38, name, name#39, age, age#40L) AS a#2584]
   :     :        +- InMemoryTableScan [age#40L, id#38, name#39]
   :     :              +- InMemoryRelation [id#38, name#39, age#40L], StorageLevel(disk, memory, deserialized, 1 replicas)
   :     :                    +- *(1) Scan ExistingRDD[id#38,name#39,age#40L]
   :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct<id:string,name:string,age:bigint>, false].id)), [id=#1363]
   :        +- *(2) Project [struct(id, id#38, name, name#39, age, age#40L) AS b#2586]
   :           +- *(2) Filter (isnotnull(age#40L) AND (age#40L > 36))
   :              +- InMemoryTableScan [age#40L, id#38, name#39], [isnotnull(age#40L), (age#40L > 36)]
   :                    +- InMemoryRelation [id#38, name#39, age#40L], StorageLevel(disk, memory, deserialized, 1 replicas)
   :                          +- *(1) Scan ExistingRDD[id#38,name#39,age#40L]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct<src:string,dst:string,relationship:string>, false].src, input[0, struct<src:string,dst:string,relationship:string>, false].dst)), [id=#1370]
      +- *(3) Project [struct(src, src#44, dst, dst#45, relationship, relationship#46) AS __tmp2506060614762666678#2609]
         +- InMemoryTableScan [dst#45, relationship#46, src#44]
               +- InMemoryRelation [src#44, dst#45, relationship#46], StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- *(1) Scan ExistingRDD[src#44,dst#45,relationship#46]
# Find chains of 4 vertices such that at least 2 of the 3 edges are "friend" relationships.
# The when function is similar to the CASE WHEN in SQL

chain4 = g.find("(a)-[e1]->(b); (b)-[e2]->(c); (c)-[e3]->(d)").where('a!=d AND a!=c AND b!=d')

friendTo1 = lambda e: when(e['relationship'] == 'friend', 1).otherwise(0)

chain4.select('*',friendTo1(chain4['e1']).alias('f1'), \
                  friendTo1(chain4['e2']).alias('f2'), \
                  friendTo1(chain4['e3']).alias('f3')) \
      .where('f1 + f2 + f3 >= 2').select('a', 'b', 'c', 'd').show()
+---------------+--------------+---------------+--------------+---------------+--------------+----------------+---+---+---+
|              a|            e1|              b|            e2|              c|            e3|               d| f1| f2| f3|
+---------------+--------------+---------------+--------------+---------------+--------------+----------------+---+---+---+
| [d, David, 29]|[d, a, friend]| [a, Alice, 34]|[a, e, friend]|[e, Esther, 32]|[e, f, follow]|  [f, Fanny, 38]|  1|  1|  0|
| [d, David, 29]|[d, a, friend]| [a, Alice, 34]|[a, b, friend]|   [b, Bob, 36]|[b, c, follow]|[c, Charlie, 37]|  1|  1|  0|
|[e, Esther, 32]|[e, d, friend]| [d, David, 29]|[d, a, friend]| [a, Alice, 34]|[a, b, friend]|    [b, Bob, 36]|  1|  1|  1|
| [g, Gabby, 60]|[g, e, follow]|[e, Esther, 32]|[e, d, friend]| [d, David, 29]|[d, a, friend]|  [a, Alice, 34]|  0|  1|  1|
+---------------+--------------+---------------+--------------+---------------+--------------+----------------+---+---+---+

Subgraphs

# Select subgraph of users older than 30, and relationships of type "friend".
# Drop isolated vertices (users) which are not contained in any edges (relationships).

g1 = g.filterVertices("age > 30").filterEdges("relationship = 'friend'")\
      .dropIsolatedVertices()

g1.vertices.show()
g1.edges.show()
+---+------+---+
| id|  name|age|
+---+------+---+
|  e|Esther| 32|
|  b|   Bob| 36|
|  a| Alice| 34|
+---+------+---+

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  e|      friend|
|  a|  b|      friend|
+---+---+------------+
# Select subgraph based on edges "e" of type "follow"
# pointing from a younger user "a" to an older user "b".

paths = g.find("(a)-[e]->(b)")\
  .filter("e.relationship = 'follow'")\
  .filter("a.age < b.age")

# "paths" contains vertex info. Extract the edges.

e2 = paths.select("e.*")

# Construct the subgraph
g2 = GraphFrame(g.vertices, e2).dropIsolatedVertices()

g2.vertices.show()
g2.edges.show()
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  f|  Fanny| 38|
|  e| Esther| 32|
|  c|Charlie| 37|
|  b|    Bob| 36|
+---+-------+---+

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  e|  f|      follow|
|  b|  c|      follow|
+---+---+------------+

BFS

# Starting vertex is 'a'
layers = [g.vertices.select('id').where("id = 'a'")]
visited =  layers[0]

while layers[-1].count() > 0:
    # From the current layer, get all the one-hop neighbors
    d1 = layers[-1].join(g.edges, layers[-1]['id'] == g.edges['src'])
    # Rename the column as 'id', and remove visited verices and duplicates
    d2 = d1.select(d1['dst'].alias('id')) \
           .subtract(visited).distinct().cache()
    layers += [d2]
    visited = visited.union(layers[-1]).cache()
layers[0].show()
+---+
| id|
+---+
|  a|
+---+
layers[1].show()
+---+
| id|
+---+
|  e|
|  b|
+---+
layers[2].show()
+---+
| id|
+---+
|  f|
|  d|
|  c|
+---+
layers[3].show()
+---+
| id|
+---+
+---+
# GraphFrames provides own BFS:

paths = g.bfs("id = 'a'", "age > 36")
paths.show()
+--------------+--------------+---------------+--------------+----------------+
|          from|            e0|             v1|            e1|              to|
+--------------+--------------+---------------+--------------+----------------+
|[a, Alice, 34]|[a, b, friend]|   [b, Bob, 36]|[b, c, follow]|[c, Charlie, 37]|
|[a, Alice, 34]|[a, e, friend]|[e, Esther, 32]|[e, f, follow]|  [f, Fanny, 38]|
+--------------+--------------+---------------+--------------+----------------+

List Ranking

# -1 denotes end of list
data = [(0, 5), (1, 0), (3, 4), (4, 6), (5, -1), (6,1)]
e = spark.createDataFrame(data, ['src', 'dst'])
v = e.select(col('src').alias('id'), when(e.dst == -1, 0).otherwise(1).alias('d'))
v1 = spark.createDataFrame([(-1, 0)], ['id', 'd'])
v = v.union(v1)
v.show()
e.show()
+---+---+
| id|  d|
+---+---+
|  0|  1|
|  1|  1|
|  3|  1|
|  4|  1|
|  5|  0|
|  6|  1|
| -1|  0|
+---+---+

+---+---+
|src|dst|
+---+---+
|  0|  5|
|  1|  0|
|  3|  4|
|  4|  6|
|  5| -1|
|  6|  1|
+---+---+
while e.filter('dst != -1').count() > 0:
    g = GraphFrame(v, e)
    g.cache()
    v = g.triplets.select(col('src.id').alias('id'), 
                          (col('src.d') + col('dst.d')).alias('d')) \
         .union(v1)
    e = g.find('(a)-[]->(b); (b)-[]->(c)') \
         .select(col('a.id').alias('src'), col('c.id').alias('dst')) \
         .union(e.filter('dst = -1'))
v.show()
+---+---+
| id|  d|
+---+---+
|  0|  1|
|  1|  2|
|  3|  5|
|  4|  4|
|  5|  0|
|  6|  3|
| -1|  0|
+---+---+

Message passing via AggregateMessages

from pyspark.sql.functions import coalesce, col, lit, sum, when, min, max
from graphframes.lib import AggregateMessages as AM

# AggregateMessages has the following members: src, dst, edge, msg
# For each user, sum the ages of the adjacent users.
agg = g.aggregateMessages(
    sum(AM.msg).alias("summedAges"),
    sendToSrc = AM.dst['age'],
    sendToDst = AM.src['age'])
agg.show()
+---+----------+
| id|summedAges|
+---+----------+
|  g|        32|
|  f|        69|
|  e|       161|
|  d|        66|
|  c|       110|
|  b|       108|
|  a|        97|
+---+----------+

The Pregel Model for Graph Computation

# Pagerank in the Pregel model 

from pyspark.sql.functions import coalesce, col, lit, sum, when, min
from graphframes.lib import Pregel

# Need to set up a directory for Pregel computation
sc.setCheckpointDir("checkpoint")

'''
Use builder pattern to describe the operations.
Call run() to start a run. It returns a DataFrame of vertices from the last iteration.

When a run starts, it expands the vertices DataFrame using column expressions 
defined by withVertexColumn(). Those additional vertex properties can be 
changed during Pregel iterations. In each Pregel iteration, there are three 
phases:

* Given each edge triplet, generate messages and specify target vertices to 
  send, described by sendMsgToDst() and sendMsgToSrc().
* Aggregate messages by target vertex IDs, described by aggMsgs().
* Update additional vertex properties based on aggregated messages and states 
  from previous iteration, described by withVertexColumn().
'''
v = g.outDegrees
g = GraphFrame(v,e)
ranks = g.pregel \
        .setMaxIter(5) \
        .sendMsgToDst(Pregel.src("rank") / Pregel.src("outDegree")) \
        .aggMsgs(sum(Pregel.msg())) \
        .withVertexColumn("rank", lit(1.0), \
            coalesce(Pregel.msg(), lit(0.0)) * lit(0.85) + lit(0.15)) \
        .run()
ranks.show()

# pyspark.sql.functions.coalesce(*cols): Returns the first column that is not null.
# Not to be confused with spark.sql.coalesce(numPartitions)
+---+---------+-------------------+
| id|outDegree|               rank|
+---+---------+-------------------+
|  g|        1|               0.15|
|  f|        1|0.41104330078124995|
|  e|        2| 0.5032932031249999|
|  d|        1|0.41104330078124995|
|  c|        1|  2.780783203124999|
|  b|        1| 2.2680220312499997|
|  a|        2|    0.4758149609375|
+---+---------+-------------------+
# BFS in the Pregel model

g = GraphFrame(v,e)

dist = g.pregel \
        .sendMsgToDst(when(Pregel.src('active'), Pregel.src('d') + 1)) \
        .aggMsgs(min(Pregel.msg())) \
        .withVertexColumn('d', when(v['id'] == 'a', 0).otherwise(99999), \
            when(Pregel.msg() < col('d'), Pregel.msg()).otherwise(col('d'))) \
        .withVertexColumn('active', when(v['id'] == 'a', True).otherwise(False), \
            when(Pregel.msg() < col('d'), True).otherwise(False)) \
        .run()
dist.show()
+---+---------+-----+------+
| id|outDegree|    d|active|
+---+---------+-----+------+
|  g|        1|99999| false|
|  f|        1|    2| false|
|  e|        2|    1| false|
|  d|        1|    2| false|
|  c|        1|    2| false|
|  b|        1|    1| false|
|  a|        2|    0| false|
+---+---------+-----+------+