!pip install pyspark
Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
     |████████████████████████████████| 204.2MB 62kB/s
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
     |████████████████████████████████| 204kB 38.1MB/s
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=579f4f3993e28b1ba393be6ffc555dc3c75b4997f72e1ad9bd8ec34f37910075
Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
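If no SparkContext exists yet, an equivalent and more common way to obtain a session is through the builder API (a minimal sketch; the application name is only an illustrative choice):
from pyspark.sql import SparkSession
# Creates a new session (and its SparkContext) or returns the existing one
spark = SparkSession.builder.appName('hw2').getOrCreate()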
df = spark.read.csv('/content/drive/My Drive/课程/HKUST/MSBD5003/homeworks/hw2/sales.csv', header=True, inferSchema=True)
df.show()
+----------------+--------+-----+------------+-----------------+--------------------+--------------+--------------+----------------+----------------+-----------+----------+
|Transaction_date| Product|Price|Payment_Type| Name| City| State| Country| Account_Created| Last_Login| Latitude| Longitude|
+----------------+--------+-----+------------+-----------------+--------------------+--------------+--------------+----------------+----------------+-----------+----------+
| 01/02/2009 6:17|Product1| 1200| Mastercard| carolina| Basildon| England|United Kingdom| 01/02/2009 6:00| 01/02/2009 6:08| 51.5|-1.1166667|
| 01/02/2009 4:53|Product1| 1200| Visa| Betina|Parkville ...| MO| United States| 01/02/2009 4:42| 01/02/2009 7:49| 39.195| -94.68194|
|01/02/2009 13:08|Product1| 1200| Mastercard|Federica e Andrea|Astoria ...| OR| United States|01/01/2009 16:21|01/03/2009 12:32| 46.18806| -123.83|
|01/03/2009 14:44|Product1| 1200| Visa| Gouya| Echuca| Victoria| Australia| 9/25/05 21:13|01/03/2009 14:22|-36.1333333| 144.75|
|01/04/2009 12:56|Product2| 3600| Visa| Gerd W |Cahaba Heights ...| AL| United States| 11/15/08 15:47|01/04/2009 12:45| 33.52056| -86.8025|
|01/04/2009 13:19|Product1| 1200| Visa| LAURENCE|Mickleton ...| NJ| United States| 9/24/08 15:19|01/04/2009 13:04| 39.79| -75.23806|
|01/04/2009 20:11|Product1| 1200| Mastercard| Fleur|Peoria ...| IL| United States| 01/03/2009 9:38|01/04/2009 19:45| 40.69361| -89.58889|
|01/02/2009 20:09|Product1| 1200| Mastercard| adam|Martin ...| TN| United States|01/02/2009 17:43|01/04/2009 20:01| 36.34333| -88.85028|
|01/04/2009 13:17|Product1| 1200| Mastercard| Renee Elisabeth| Tel Aviv| Tel Aviv| Israel|01/04/2009 13:03|01/04/2009 22:10| 32.0666667|34.7666667|
|01/04/2009 14:11|Product1| 1200| Visa| Aidan| Chatou| Ile-de-France| France| 06/03/2008 4:22| 01/05/2009 1:17| 48.8833333| 2.15|
| 01/05/2009 2:42|Product1| 1200| Diners| Stacy|New York ...| NY| United States| 01/05/2009 2:23| 01/05/2009 4:59| 40.71417| -74.00639|
| 01/05/2009 5:39|Product1| 1200| Amex| Heidi| Eindhoven| Noord-Brabant| Netherlands| 01/05/2009 4:55| 01/05/2009 8:15| 51.45| 5.4666667|
| 01/02/2009 9:16|Product1| 1200| Mastercard| Sean |Shavano Park ...| TX| United States| 01/02/2009 8:32| 01/05/2009 9:05| 29.42389| -98.49333|
|01/05/2009 10:08|Product1| 1200| Visa| Georgia|Eagle ...| ID| United States|11/11/2008 15:53|01/05/2009 10:05| 43.69556|-116.35306|
|01/02/2009 14:18|Product1| 1200| Visa| Richard|Riverside ...| NJ| United States|12/09/2008 12:07|01/05/2009 11:01| 40.03222| -74.95778|
| 01/04/2009 1:05|Product1| 1200| Diners| Leanne| Julianstown| Meath| Ireland| 01/04/2009 0:00|01/05/2009 13:36| 53.6772222|-6.3191667|
|01/05/2009 11:37|Product1| 1200| Visa| Janet| Ottawa| Ontario| Canada| 01/05/2009 9:35|01/05/2009 19:24| 45.4166667| -75.7|
| 01/06/2009 5:02|Product1| 1200| Diners| barbara| Hyderabad|Andhra Pradesh| India| 01/06/2009 2:41| 01/06/2009 7:52| 17.3833333|78.4666667|
| 01/06/2009 7:45|Product2| 3600| Visa| Sabine| London| England|United Kingdom| 01/06/2009 7:00| 01/06/2009 9:17| 51.52721| 0.14559|
| 01/02/2009 7:35|Product1| 1200| Diners| Hani|Salt Lake City ...| UT| United States| 12/30/08 5:44|01/06/2009 10:52| 40.76083|-111.89028|
+----------------+--------+-----+------------+-----------------+--------------------+--------------+--------------+----------------+----------------+-----------+----------+
only showing top 20 rows
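Since the file is read with inferSchema=True, it is worth checking which types Spark actually inferred for each column (a quick check; output omitted here):
df.printSchema()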
Question 1
Find all distinct countries.
Hint: use select() and distinct().
countries = df.select("Country").distinct()
countries.collect()
[Row(Country='Sweden'),
Row(Country='Jersey'),
Row(Country='Malaysia'),
Row(Country='Turkey'),
Row(Country='Germany'),
Row(Country='France'),
Row(Country='Belgium'),
Row(Country='Finland'),
Row(Country='United States'),
Row(Country='India'),
Row(Country='Kuwait'),
Row(Country='Malta'),
Row(Country='Italy'),
Row(Country='Norway'),
Row(Country='Spain'),
Row(Country='Denmark'),
Row(Country='Ireland'),
Row(Country='Israel'),
Row(Country='Iceland'),
Row(Country='South Korea'),
Row(Country='Switzerland'),
Row(Country='United Arab Emirates'),
Row(Country='Canada'),
Row(Country='Brazil'),
Row(Country='Luxembourg'),
Row(Country='New Zealand'),
Row(Country='Australia'),
Row(Country='Austria'),
Row(Country='South Africa'),
Row(Country='Bahrain'),
Row(Country='Hungary'),
Row(Country='United Kingdom'),
Row(Country='Moldova'),
Row(Country='Netherlands')]
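An equivalent formulation uses dropDuplicates(); countDistinct() gives only the number of countries (a sketch; output omitted):
from pyspark.sql.functions import countDistinct
# Same distinct list of countries
df.select('Country').dropDuplicates().show()
# Just how many distinct countries there are
df.select(countDistinct('Country')).show()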
Question 2
Find the Name and Price of sales records in Brazil.
Hint: use filter().
df.select("Country", "Price").filter("Country = 'Brazil' ").show()
+-------+-----+
|Country|Price|
+-------+-----+
| Brazil| 1200|
| Brazil| 7500|
+-------+-----+
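Note that the question asks for Name and Price, while the cell above returns Country and Price. A version matching the question would be the following (output omitted here):
df.filter(df.Country == 'Brazil').select('Name', 'Price').show()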
Question 3
For each country, find the total Price.
Hint: use groupBy().
df.groupBy('Country').sum('Price').withColumnRenamed('sum(Price)', "Total Price").show()
+-------------+-----------+
| Country|Total Price|
+-------------+-----------+
| Sweden| 8400|
| Jersey| 1200|
| Malaysia| 1200|
| Turkey| 2400|
| Germany| 22800|
| France| 30300|
| Belgium| 3600|
| Finland| 1200|
|United States| 350350|
| India| 2400|
| Kuwait| 1200|
| Malta| 3600|
| Italy| 2400|
| Norway| 12000|
| Spain| 2400|
| Denmark| 8400|
| Ireland| 29100|
| Israel| 1200|
| Iceland| 1200|
| South Korea| 1200|
+-------------+-----------+
only showing top 20 rows
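The rename step can be avoided by writing the aggregation with agg() and alias() (an equivalent sketch):
from pyspark.sql.functions import sum as sum_
# alias() names the aggregate directly instead of renaming 'sum(Price)' afterwards
df.groupBy('Country').agg(sum_('Price').alias('Total Price')).show()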
Question 4
List countries by their total Price in descending order.
Hint: use orderBy().
df.groupBy('Country').sum('Price').withColumnRenamed('sum(Price)', "Total Price").orderBy('Total Price', ascending=False).show()
+--------------------+-----------+
| Country|Total Price|
+--------------------+-----------+
| United States| 350350|
| United Kingdom| 63600|
| Canada| 42000|
| France| 30300|
| Ireland| 29100|
| Germany| 22800|
| Australia| 22800|
| Switzerland| 19200|
| Netherlands| 14400|
| Norway| 12000|
| Brazil| 8700|
| Denmark| 8400|
| Sweden| 8400|
| Austria| 3600|
| South Africa| 3600|
| Malta| 3600|
| Belgium| 3600|
|United Arab Emirates| 3600|
| Turkey| 2400|
| New Zealand| 2400|
+--------------------+-----------+
only showing top 20 rows
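Alternatively, the sort can be expressed with desc() instead of the ascending flag (an equivalent sketch):
from pyspark.sql.functions import desc
df.groupBy('Country').sum('Price').withColumnRenamed('sum(Price)', 'Total Price') \
  .orderBy(desc('Total Price')).show()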
Question 5
Redo Question 3, but replace the country names with their IDs.
Hint: use join().
df2 = spark.read.csv('/content/drive/My Drive/课程/HKUST/MSBD5003/homeworks/hw2/countries.csv', header=True, inferSchema=True)
df2.show()
+--------------+---+
| Country| ID|
+--------------+---+
|United Kingdom| 1|
| United States| 2|
| Australia| 3|
| Israel| 4|
| France| 5|
| Netherlands| 6|
| Ireland| 7|
| Canada| 8|
| India| 9|
| South Africa| 10|
| Finland| 11|
| Switzerland| 12|
| Denmark| 13|
| Belgium| 14|
| Sweden| 15|
| Norway| 16|
| Luxembourg| 17|
| Italy| 18|
| Germany| 19|
| Moldova| 20|
+--------------+---+
only showing top 20 rows
df.join(df2, 'Country').groupBy('ID').sum('Price').withColumnRenamed('sum(Price)', "Total Price").show()
+---+-----------+
| ID|Total Price|
+---+-----------+
| 31| 1200|
| 34| 2400|
| 28| 3600|
| 26| 3600|
| 27| 1200|
| 12| 19200|
| 22| 3600|
| 1| 63600|
| 13| 8400|
| 6| 14400|
| 16| 12000|
| 3| 22800|
| 20| 1200|
| 5| 30300|
| 19| 22800|
| 15| 8400|
| 9| 2400|
| 17| 1200|
| 4| 1200|
| 8| 42000|
+---+-----------+
only showing top 20 rows
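Since the country table is small, a broadcast hint is a cheap optimization with the same result: it ships the countries table to every executor instead of shuffling the sales data (a sketch; output omitted):
from pyspark.sql.functions import broadcast
df.join(broadcast(df2), 'Country').groupBy('ID').sum('Price') \
  .withColumnRenamed('sum(Price)', 'Total Price').show()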
Question 6
Rewrite the PageRank example using the DataFrame API. Here is a skeleton of the code; your job is to fill in the missing part. The data files can be downloaded at:
from pyspark.sql.functions import *
numOfIterations = 10
lines = spark.read.text("pagerank_data.txt")
# You can also test your program on the following larger data set:
# lines = spark.read.text("dblp.in")
a = lines.select(split(lines[0],' '))
links = a.select(a[0][0].alias('src'), a[0][1].alias('dst'))
outdegrees = links.groupBy('src').count()
ranks = outdegrees.select('src', lit(1).alias('rank'))
for iteration in range(numOfIterations):
    # FILL IN THIS PART

ranks.orderBy(desc('rank')).show()
lines = spark.read.text("/content/drive/My Drive/课程/HKUST/MSBD5003/homeworks/hw2/pagerank_data.txt")
lines.show()
+-----+
|value|
+-----+
| 1 2|
| 1 3|
| 2 3|
| 3 4|
| 4 1|
| 2 1|
+-----+
from pyspark.sql.functions import *
from pyspark.sql.types import FloatType
from operator import add
numOfIterations = 10
# You can also test your program on the following larger data set:
# lines = spark.read.text("dblp.in")
# Split each line "src dst" into an array, then name the two fields
a = lines.select(split(lines[0], ' '))
links = a.select(a[0][0].alias('src'), a[0][1].alias('dst'))
# Out-degree of each source node
outdegrees = links.groupBy('src').count()
# Start every node with rank 1
ranks = outdegrees.select('src', lit(1).alias('rank'))
for iteration in range(numOfIterations):
    # Each edge gets the current rank and out-degree of its source node
    contribs = links.join(ranks, 'src').join(outdegrees, 'src').withColumnRenamed('count', 'outdegrees')
    # Send rank/outdegree along every edge and sum the contributions per destination
    contribs = contribs.select('src', 'dst', (contribs.rank / contribs.outdegrees).alias('rank')) \
                       .groupBy('dst').sum().withColumnRenamed('dst', 'src').withColumnRenamed('sum(rank)', 'rank')
    # Apply the damping factor to obtain the new ranks
    ranks = contribs.select('src', (contribs.rank * 0.85 + 0.15).alias('rank'))
ranks.orderBy(desc('rank')).show()
+---+------------------+
|src| rank|
+---+------------------+
| 1|1.2981882732854677|
| 4|0.9999999999999998|
| 3|0.9999999999999998|
| 2|0.7018117267145316|
+---+------------------+
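Because each iteration joins against the previous ranks DataFrame, the query plan grows with the number of iterations. For larger inputs such as dblp.in it may help to truncate the lineage by checkpointing; a sketch under the assumption that a writable checkpoint directory is available (the path below is only illustrative):
sc.setCheckpointDir('/tmp/pagerank_ckpt')  # illustrative path; any writable directory works
ranks = outdegrees.select('src', lit(1).alias('rank'))
for iteration in range(numOfIterations):
    contribs = links.join(ranks, 'src').join(outdegrees, 'src').withColumnRenamed('count', 'outdegrees')
    contribs = contribs.select('src', 'dst', (contribs.rank / contribs.outdegrees).alias('rank')) \
                       .groupBy('dst').sum().withColumnRenamed('dst', 'src').withColumnRenamed('sum(rank)', 'rank')
    # checkpoint() materializes the new ranks and cuts the accumulated plan
    ranks = contribs.select('src', (contribs.rank * 0.85 + 0.15).alias('rank')).checkpoint()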