!pip install networkx
Requirement already satisfied: networkx in /opt/conda/lib/python3.8/site-packages (2.5)
Requirement already satisfied: decorator>=4.3.0 in /opt/conda/lib/python3.8/site-packages (from networkx) (4.4.2)

Import dependencies

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages graphframes:graphframes:0.8.1-spark3.0-s_2.12 pyspark-shell'

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import Row
from graphframes import *
import networkx as nx
import matplotlib.pyplot as plt

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
def PlotGraph(edge_list):
    # Build a NetworkX directed graph from (at most) the first 1000 edges and draw it
    Gplot = nx.DiGraph()
    for row in edge_list.select('src', 'dst').take(1000):
        Gplot.add_edge(row['src'], row['dst'])

    nx.draw(Gplot, with_labels=True, font_weight='bold')
# Vertices DataFrame
v = spark.createDataFrame([
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 37),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 38),
  ("g", "Gabby", 60)
], ["id", "name", "age"])

# Edges DataFrame
e = spark.createDataFrame([
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend"),
  ("g", "e", "follow")
], ["src", "dst", "relationship"])

# Create a GraphFrame
g = GraphFrame(v, e)

g.vertices.show()
g.edges.show()
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  a|  Alice| 34|
|  b|    Bob| 36|
|  c|Charlie| 37|
|  d|  David| 29|
|  e| Esther| 32|
|  f|  Fanny| 38|
|  g|  Gabby| 60|
+---+-------+---+

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  b|  c|      follow|
|  c|  b|      follow|
|  f|  c|      follow|
|  e|  f|      follow|
|  e|  d|      friend|
|  d|  a|      friend|
|  a|  e|      friend|
|  g|  e|      follow|
+---+---+------------+
PlotGraph(g.edges)

(NetworkX plot of the graph's edges)

Question 1

Write code to perform the following tasks using GraphFrames:

Find Alice's two-hop neighbors' names, regardless of the edge type.

friends = g.find("(a)-[]->(b);(b)-[]->(c)").filter("a.name='Alice'")
friends.select('c').collect()
[Row(c=Row(id='f', name='Fanny', age=38)),
 Row(c=Row(id='d', name='David', age=29)),
 Row(c=Row(id='c', name='Charlie', age=37))]
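
If a vertex were reachable through more than one two-hop path, the motif search would return it once per path; selecting only the names and de-duplicating is an optional refinement (not part of the answer above):

friends.select('c.name').distinct().show()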

Question 2

Redo the previous question, but exclude Alice's two-hop neighbors who have an edge back to Alice.

# The negated motif term !(c)-[]->(a) drops two-hop neighbors that have an edge back to Alice
friends = g.find("(a)-[]->(b);(b)-[]->(c);!(c)-[]->(a)").filter("a.name='Alice'")
friends.select('c').collect()
[Row(c=Row(id='f', name='Fanny', age=38)),
 Row(c=Row(id='c', name='Charlie', age=37))]

Question 3

Find all people who follow Charlie.

Hint: Use AND in SQL, or (..) & (..) in DataFrame boolean expressions.

followers = g.find("(a)-[e]->(b)").filter("b.name='Charlie' AND e.relationship='follow' ")
followers.select('a').show()
+--------------+
|             a|
+--------------+
|[f, Fanny, 38]|
|  [b, Bob, 36]|
+--------------+
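
The hint's second form works as well; a minimal equivalent sketch using DataFrame boolean expressions instead of a single SQL string:

from pyspark.sql.functions import col

followers = g.find("(a)-[e]->(b)") \
    .filter((col("b.name") == "Charlie") & (col("e.relationship") == "follow"))
followers.select("a.name").show()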

Question 4

Find all people who are being followed by at least 2 people.

g.edges.filter(g.edges.relationship == 'follow').groupby('dst').count().filter("count>=2").show()
+---+-----+
|dst|count|
+---+-----+
|  c|    2|
+---+-----+
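
An alternative formulation (assuming the filterEdges API introduced in GraphFrames 0.7 is available) restricts the graph to 'follow' edges first and then uses the built-in in-degree computation:

# Keep only 'follow' edges, then count incoming edges per vertex
g.filterEdges("relationship = 'follow'") \
 .inDegrees \
 .filter("inDegree >= 2") \
 .show()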

Question 5

Create a queue of 10 RDDs from this data set and feed it into a Spark Streaming program. Your Spark Streaming algorithm should maintain a state that keeps track of the longest noun seen so far for each distinct adjective. After each RDD, print any 5 adjectives with their associated longest nouns, as well as the longest noun associated with the adjective 'good'. Note that not every line in the data set contains exactly two words, so make sure to clean the data as it is fed into the streaming program. The skeleton code is provided below:

from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 5)
# Provide a checkpointing directory. Required for stateful transformations
ssc.checkpoint("checkpoint")

numPartitions = 8
rdd = sc.textFile('../data/adj_noun_pairs.txt', numPartitions)
rddQueue = rdd.randomSplit([1]*10, 123)
lines = ssc.queueStream(rddQueue)

# FILL IN YOUR CODE

ssc.start()
ssc.awaitTermination(50)
ssc.stop(False)
print("Finished")
Running the skeleton as-is, without registering any output operation on the stream, fails with the error shown below:

from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 5)
# Provide a checkpointing directory. Required for stateful transformations
ssc.checkpoint("checkpoint")

numPartitions = 8
rdd = sc.textFile('adj_noun_pairs.txt', numPartitions)
rddQueue = rdd.randomSplit([1]*10, 123)
lines = ssc.queueStream(rddQueue)



ssc.start()
ssc.awaitTermination(10)
ssc.stop(False)
print("Finished")
---------------------------------------------------------------------------

IllegalArgumentException                  Traceback (most recent call last)

<ipython-input-49-043ad3d9c502> in <module>
     12 
     13 
---> 14 ssc.start()
     15 ssc.awaitTermination(10)
     16 ssc.stop(False)


/usr/local/spark/python/pyspark/streaming/context.py in start(self)
    177         Start the execution of the streams.
    178         """
--> 179         self._jssc.start()
    180         StreamingContext._activeContext = self
    181


/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306


/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    132                 # Hide where the exception came from that shows a non-Pythonic
    133                 # JVM exception message.
--> 134                 raise_from(converted)
    135             else:
    136                 raise


/usr/local/spark/python/pyspark/sql/utils.py in raise_from(e)


IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
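
The IllegalArgumentException above is Spark Streaming's way of saying that no output operation (e.g. pprint or foreachRDD) was registered on any DStream before ssc.start(). A possible way to fill in the missing part is sketched below; this is only a sketch, not the original solution, and it assumes each valid line of adj_noun_pairs.txt is a whitespace-separated adjective followed by a noun. It would go where the skeleton says # FILL IN YOUR CODE:

# Parse lines into (adjective, noun) pairs; drop lines that do not have exactly two words
pairs = lines.map(lambda line: line.strip().split()) \
             .filter(lambda p: len(p) == 2) \
             .map(lambda p: (p[0], p[1]))

# State update: keep the longest noun seen so far for each adjective
def update_longest(new_nouns, current_longest):
    candidates = list(new_nouns)
    if current_longest is not None:
        candidates.append(current_longest)
    return max(candidates, key=len) if candidates else current_longest

longest = pairs.updateStateByKey(update_longest)

# Output operation: after each batch, print 5 adjective/longest-noun pairs
# and the current longest noun for the adjective 'good'
def print_sample(rdd):
    print(rdd.take(5))
    print(rdd.filter(lambda kv: kv[0] == 'good').collect())

longest.foreachRDD(print_sample)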