!pip install pyspark
Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
     |████████████████████████████████| 204.2MB 62kB/s
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
     |████████████████████████████████| 204kB 38.1MB/s
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=579f4f3993e28b1ba393be6ffc555dc3c75b4997f72e1ad9bd8ec34f37910075
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
df = spark.read.csv('/content/drive/My Drive/课程/HKUST/MSBD5003/homeworks/hw2/sales.csv', header=True, inferSchema=True)
df.show()
+----------------+--------+-----+------------+-----------------+--------------------+--------------+--------------+----------------+----------------+-----------+----------+
|Transaction_date| Product|Price|Payment_Type|             Name|                City|         State|       Country| Account_Created|      Last_Login|   Latitude| Longitude|
+----------------+--------+-----+------------+-----------------+--------------------+--------------+--------------+----------------+----------------+-----------+----------+
| 01/02/2009 6:17|Product1| 1200|  Mastercard|         carolina|            Basildon|       England|United Kingdom| 01/02/2009 6:00| 01/02/2009 6:08|       51.5|-1.1166667|
| 01/02/2009 4:53|Product1| 1200|        Visa|           Betina|Parkville        ...|            MO| United States| 01/02/2009 4:42| 01/02/2009 7:49|     39.195| -94.68194|
|01/02/2009 13:08|Product1| 1200|  Mastercard|Federica e Andrea|Astoria          ...|            OR| United States|01/01/2009 16:21|01/03/2009 12:32|   46.18806|   -123.83|
|01/03/2009 14:44|Product1| 1200|        Visa|            Gouya|              Echuca|      Victoria|     Australia|   9/25/05 21:13|01/03/2009 14:22|-36.1333333|    144.75|
|01/04/2009 12:56|Product2| 3600|        Visa|          Gerd W |Cahaba Heights   ...|            AL| United States|  11/15/08 15:47|01/04/2009 12:45|   33.52056|  -86.8025|
|01/04/2009 13:19|Product1| 1200|        Visa|         LAURENCE|Mickleton        ...|            NJ| United States|   9/24/08 15:19|01/04/2009 13:04|      39.79| -75.23806|
|01/04/2009 20:11|Product1| 1200|  Mastercard|            Fleur|Peoria           ...|            IL| United States| 01/03/2009 9:38|01/04/2009 19:45|   40.69361| -89.58889|
|01/02/2009 20:09|Product1| 1200|  Mastercard|             adam|Martin           ...|            TN| United States|01/02/2009 17:43|01/04/2009 20:01|   36.34333| -88.85028|
|01/04/2009 13:17|Product1| 1200|  Mastercard|  Renee Elisabeth|            Tel Aviv|      Tel Aviv|        Israel|01/04/2009 13:03|01/04/2009 22:10| 32.0666667|34.7666667|
|01/04/2009 14:11|Product1| 1200|        Visa|            Aidan|              Chatou| Ile-de-France|        France| 06/03/2008 4:22| 01/05/2009 1:17| 48.8833333|      2.15|
| 01/05/2009 2:42|Product1| 1200|      Diners|            Stacy|New York         ...|            NY| United States| 01/05/2009 2:23| 01/05/2009 4:59|   40.71417| -74.00639|
| 01/05/2009 5:39|Product1| 1200|        Amex|            Heidi|           Eindhoven| Noord-Brabant|   Netherlands| 01/05/2009 4:55| 01/05/2009 8:15|      51.45| 5.4666667|
| 01/02/2009 9:16|Product1| 1200|  Mastercard|            Sean |Shavano Park     ...|            TX| United States| 01/02/2009 8:32| 01/05/2009 9:05|   29.42389| -98.49333|
|01/05/2009 10:08|Product1| 1200|        Visa|          Georgia|Eagle            ...|            ID| United States|11/11/2008 15:53|01/05/2009 10:05|   43.69556|-116.35306|
|01/02/2009 14:18|Product1| 1200|        Visa|          Richard|Riverside        ...|            NJ| United States|12/09/2008 12:07|01/05/2009 11:01|   40.03222| -74.95778|
| 01/04/2009 1:05|Product1| 1200|      Diners|           Leanne|         Julianstown|         Meath|       Ireland| 01/04/2009 0:00|01/05/2009 13:36| 53.6772222|-6.3191667|
|01/05/2009 11:37|Product1| 1200|        Visa|            Janet|              Ottawa|       Ontario|        Canada| 01/05/2009 9:35|01/05/2009 19:24| 45.4166667|     -75.7|
| 01/06/2009 5:02|Product1| 1200|      Diners|          barbara|           Hyderabad|Andhra Pradesh|         India| 01/06/2009 2:41| 01/06/2009 7:52| 17.3833333|78.4666667|
| 01/06/2009 7:45|Product2| 3600|        Visa|           Sabine|              London|       England|United Kingdom| 01/06/2009 7:00| 01/06/2009 9:17|   51.52721|   0.14559|
| 01/02/2009 7:35|Product1| 1200|      Diners|             Hani|Salt Lake City   ...|            UT| United States|   12/30/08 5:44|01/06/2009 10:52|   40.76083|-111.89028|
+----------------+--------+-----+------------+-----------------+--------------------+--------------+--------------+----------------+----------------+-----------+----------+
only showing top 20 rows

Question 1

Find all distinct countries.

Hint: use select(), distinct()

countries = df.select("Country").distinct()
countries.collect()
[Row(Country='Sweden'),
 Row(Country='Jersey'),
 Row(Country='Malaysia'),
 Row(Country='Turkey'),
 Row(Country='Germany'),
 Row(Country='France'),
 Row(Country='Belgium'),
 Row(Country='Finland'),
 Row(Country='United States'),
 Row(Country='India'),
 Row(Country='Kuwait'),
 Row(Country='Malta'),
 Row(Country='Italy'),
 Row(Country='Norway'),
 Row(Country='Spain'),
 Row(Country='Denmark'),
 Row(Country='Ireland'),
 Row(Country='Israel'),
 Row(Country='Iceland'),
 Row(Country='South Korea'),
 Row(Country='Switzerland'),
 Row(Country='United Arab Emirates'),
 Row(Country='Canada'),
 Row(Country='Brazil'),
 Row(Country='Luxembourg'),
 Row(Country='New Zealand'),
 Row(Country='Australia'),
 Row(Country='Austria'),
 Row(Country='South Africa'),
 Row(Country='Bahrain'),
 Row(Country='Hungary'),
 Row(Country='United Kingdom'),
 Row(Country='Moldova'),
 Row(Country='Netherlands')]
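
An equivalent formulation registers the DataFrame as a temporary view and uses Spark SQL (a sketch; the view name sales is arbitrary):

df.createOrReplaceTempView('sales')
spark.sql('SELECT DISTINCT Country FROM sales').show()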

Question 2

Find the Name and Price of sales records in Brazil.

Hint: use filter().

df.select("Country", "Price").filter("Country = 'Brazil' ").show()
+-------+-----+
|Country|Price|
+-------+-----+
| Brazil| 1200|
| Brazil| 7500|
+-------+-----+
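
The cell above projects Country and Price; to return the Name column the question asks for, the same filter works with a different projection (a sketch; output not shown here):

df.select('Name', 'Price').filter(df.Country == 'Brazil').show()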

Question 3

For each country, find the total Price.

Hint: Use groupBy()

df.groupBy('Country').sum('Price').withColumnRenamed('sum(Price)', "Total Price").show()
+-------------+-----------+
|      Country|Total Price|
+-------------+-----------+
|       Sweden|       8400|
|       Jersey|       1200|
|     Malaysia|       1200|
|       Turkey|       2400|
|      Germany|      22800|
|       France|      30300|
|      Belgium|       3600|
|      Finland|       1200|
|United States|     350350|
|        India|       2400|
|       Kuwait|       1200|
|        Malta|       3600|
|        Italy|       2400|
|       Norway|      12000|
|        Spain|       2400|
|      Denmark|       8400|
|      Ireland|      29100|
|       Israel|       1200|
|      Iceland|       1200|
|  South Korea|       1200|
+-------------+-----------+
only showing top 20 rows
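
A slightly more idiomatic variant names the aggregate up front with agg() and alias(), avoiding the withColumnRenamed step (a sketch; sum is imported under an alias so it does not shadow Python's builtin):

from pyspark.sql.functions import sum as _sum
df.groupBy('Country').agg(_sum('Price').alias('Total Price')).show()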

Question 4

List countries by their total Price in descending order.

Hint: Use orderBy()

df.groupBy('Country').sum('Price').withColumnRenamed('sum(Price)', "Total Price").orderBy('Total Price', ascending=False).show()
+--------------------+-----------+
|             Country|Total Price|
+--------------------+-----------+
|       United States|     350350|
|      United Kingdom|      63600|
|              Canada|      42000|
|              France|      30300|
|             Ireland|      29100|
|             Germany|      22800|
|           Australia|      22800|
|         Switzerland|      19200|
|         Netherlands|      14400|
|              Norway|      12000|
|              Brazil|       8700|
|             Denmark|       8400|
|              Sweden|       8400|
|             Austria|       3600|
|        South Africa|       3600|
|               Malta|       3600|
|             Belgium|       3600|
|United Arab Emirates|       3600|
|              Turkey|       2400|
|         New Zealand|       2400|
+--------------------+-----------+
only showing top 20 rows
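
The same ordering can also be expressed with the desc() column method instead of the ascending flag (a sketch, same result):

from pyspark.sql.functions import sum as _sum
totals = df.groupBy('Country').agg(_sum('Price').alias('Total Price'))
totals.orderBy(totals['Total Price'].desc()).show()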

Question 5

Redo Question 3, but replace the country names by their IDs.

Hint: Use join()

df2 = spark.read.csv('/content/drive/My Drive/课程/HKUST/MSBD5003/homeworks/hw2/countries.csv', header=True, inferSchema=True)
df2.show()
+--------------+---+
|       Country| ID|
+--------------+---+
|United Kingdom|  1|
| United States|  2|
|     Australia|  3|
|        Israel|  4|
|        France|  5|
|   Netherlands|  6|
|       Ireland|  7|
|        Canada|  8|
|         India|  9|
|  South Africa| 10|
|       Finland| 11|
|   Switzerland| 12|
|       Denmark| 13|
|       Belgium| 14|
|        Sweden| 15|
|        Norway| 16|
|    Luxembourg| 17|
|         Italy| 18|
|       Germany| 19|
|       Moldova| 20|
+--------------+---+
only showing top 20 rows
df.join(df2, 'Country').groupBy('ID').sum('Price').withColumnRenamed('sum(Price)', "Total Price").show()
+---+-----------+
| ID|Total Price|
+---+-----------+
| 31|       1200|
| 34|       2400|
| 28|       3600|
| 26|       3600|
| 27|       1200|
| 12|      19200|
| 22|       3600|
|  1|      63600|
| 13|       8400|
|  6|      14400|
| 16|      12000|
|  3|      22800|
| 20|       1200|
|  5|      30300|
| 19|      22800|
| 15|       8400|
|  9|       2400|
| 17|       1200|
|  4|       1200|
|  8|      42000|
+---+-----------+
only showing top 20 rows
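
Because the country table is tiny compared with the sales data, a broadcast join hint is a common optimization here (a sketch that produces the same result):

from pyspark.sql.functions import broadcast, sum as _sum
df.join(broadcast(df2), 'Country').groupBy('ID').agg(_sum('Price').alias('Total Price')).show()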

Question 6

Rewrite the PageRank example using the DataFrame API. Here is a skeleton of the code; your job is to fill in the missing part. The data files can be downloaded at:

from pyspark.sql.functions import *

numOfIterations = 10

lines = spark.read.text("pagerank_data.txt")
# You can also test your program on the following larger data set:
# lines = spark.read.text("dblp.in")

a = lines.select(split(lines[0],' '))
links = a.select(a[0][0].alias('src'), a[0][1].alias('dst'))
outdegrees = links.groupBy('src').count()
ranks = outdegrees.select('src', lit(1).alias('rank'))

for iteration in range(numOfIterations):
    # FILL IN THIS PART

ranks.orderBy(desc('rank')).show()
lines = spark.read.text("/content/drive/My Drive/课程/HKUST/MSBD5003/homeworks/hw2/pagerank_data.txt")
lines.show()
+-----+
|value|
+-----+
|  1 2|
|  1 3|
|  2 3|
|  3 4|
|  4 1|
|  2 1|
+-----+
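Each row encodes one directed edge in the form "src dst"; for example, "1 2" means page 1 links to page 2.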
from pyspark.sql.functions import *

numOfIterations = 10

# You can also test your program on the following larger data set:
# lines = spark.read.text("dblp.in")

# Split each line "src dst" into a two-element array column
a = lines.select(split(lines[0], ' '))
# One (src, dst) edge per row
links = a.select(a[0][0].alias('src'), a[0][1].alias('dst'))
# Count outgoing edges per source node
outdegrees = links.groupBy('src').count()
# Initialize every node's rank to 1
ranks = outdegrees.select('src', lit(1).alias('rank'))

for iteration in range(numOfIterations):
    # Attach each source's current rank and outdegree to its outgoing edges
    contribs = links.join(ranks, 'src').join(outdegrees, 'src').withColumnRenamed('count', 'outdegrees')
    # Each edge contributes rank/outdegree to its destination; sum per destination
    contribs = contribs.select('dst', (contribs.rank / contribs.outdegrees).alias('rank')) \
        .groupBy('dst').sum('rank').withColumnRenamed('dst', 'src').withColumnRenamed('sum(rank)', 'rank')
    # Apply the 0.85 damping factor
    ranks = contribs.select('src', (contribs.rank * 0.85 + 0.15).alias('rank'))

ranks.orderBy(desc('rank')).show()
+---+------------------+
|src|              rank|
+---+------------------+
|  1|1.2981882732854677|
|  4|0.9999999999999998|
|  3|0.9999999999999998|
|  2|0.7018117267145316|
+---+------------------+
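
Each iteration implements the damped PageRank update, where every node d collects contributions from the nodes s that link to it:

rank(d) = 0.15 + 0.85 * sum over edges s -> d of rank(s) / outdegree(s)

On the 4-node sample graph above there are no dangling nodes, so the ranks sum to the number of nodes, as the output shows.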