Skip to content

Sparksql

!pip install pyspark
Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
     |████████████████████████████████| 204.2MB 56kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
     |████████████████████████████████| 204kB 39.0MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=2f5ff611b6f601d04626ac58c802aeb82b7faa0f99da467b2288b8fc05a7d419
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
# Reuse the running SparkContext if there is one, otherwise start it.
sc = SparkContext.getOrCreate()
# Obtain the session through the builder, the documented entry point, rather
# than calling the SparkSession constructor directly: getOrCreate() returns
# the session that is already active (built on the context above) when this
# cell is run again.
spark = SparkSession.builder.getOrCreate()

DataFrame operations

from pyspark.sql import Row


# A Row is built from keyword arguments; each keyword becomes a field.
row = Row(name="Alice", age=11)
print(row)
# Fields can be read by key ...
print(row['name'], row['age'])
# ... or as attributes.
print(row.name, row.age)

# Pitfall: a field called "count" collides with the Row object's built-in
# count() method.  Attribute access returns that method, not the field value,
# so fields with such names have to be read by key.
row = Row(name="Alice", age=11, count=1)
print(row.count)     # prints the built-in method object
print(row['count'])  # prints 1
Row(name='Alice', age=11)
Alice 11
Alice 11
<built-in method count of Row object at 0x7f3384ce6e08>
1
!wget https://www.cse.ust.hk/msbd5003/data/building.csv
--2020-12-09 02:43:31--  https://www.cse.ust.hk/msbd5003/data/building.csv
Resolving www.cse.ust.hk (www.cse.ust.hk)... 143.89.40.27
Connecting to www.cse.ust.hk (www.cse.ust.hk)|143.89.40.27|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 544 [text/plain]
Saving to: ‘building.csv’

building.csv        100%[===================>]     544  --.-KB/s    in 0s

2020-12-09 02:43:33 (25.1 MB/s) - ‘building.csv’ saved [544/544]
# Source data: https://www.cse.ust.hk/msbd5003/data/building.csv

# Load the CSV, taking the column names from its first line and letting
# Spark infer each column's type.
df = spark.read.options(header=True, inferSchema=True).csv('building.csv')

# Display the DataFrame's rows as a text table.
df.show()
+----------+-----------+-----------+-----------+------------+
|BuildingID|BuildingMgr|BuildingAge|HVACproduct|     Country|
+----------+-----------+-----------+-----------+------------+
|         1|         M1|         25|     AC1000|         USA|
|         2|         M2|         27|     FN39TG|      France|
|         3|         M3|         28|     JDNS77|      Brazil|
|         4|         M4|         17|     GG1919|     Finland|
|         5|         M5|          3|    ACMAX22|   Hong Kong|
|         6|         M6|          9|     AC1000|   Singapore|
|         7|         M7|         13|     FN39TG|South Africa|
|         8|         M8|         25|     JDNS77|   Australia|
|         9|         M9|         11|     GG1919|      Mexico|
|        10|        M10|         23|    ACMAX22|       China|
|        11|        M11|         14|     AC1000|     Belgium|
|        12|        M12|         26|     FN39TG|     Finland|
|        13|        M13|         25|     JDNS77|Saudi Arabia|
|        14|        M14|         17|     GG1919|     Germany|
|        15|        M15|         19|    ACMAX22|      Israel|
|        16|        M16|         23|     AC1000|      Turkey|
|        17|        M17|         11|     FN39TG|       Egypt|
|        18|        M18|         25|     JDNS77|   Indonesia|
|        19|        M19|         14|     GG1919|      Canada|
|        20|        M20|         19|    ACMAX22|   Argentina|
+----------+-----------+-----------+-----------+------------+
# Print the DataFrame's schema as a tree: one line per column, giving its
# type and whether it is nullable.
df.printSchema()
root
 |-- BuildingID: integer (nullable = true)
 |-- BuildingMgr: string (nullable = true)
 |-- BuildingAge: integer (nullable = true)
 |-- HVACproduct: string (nullable = true)
 |-- Country: string (nullable = true)
# Get the DataFrame's contents as an RDD; its elements are Row objects.
dfrdd = df.rdd
# Fetch the first three rows.
dfrdd.take(3)
[Row(BuildingID=1, BuildingMgr='M1', BuildingAge=25, HVACproduct='AC1000', Country='USA'),
 Row(BuildingID=2, BuildingMgr='M2', BuildingAge=27, HVACproduct='FN39TG', Country='France'),
 Row(BuildingID=3, BuildingMgr='M3', BuildingAge=28, HVACproduct='JDNS77', Country='Brazil')]
# Project the DataFrame down to two of its columns and display them.
df.select(df['BuildingID'], df['Country']).show()
+----------+------------+
|BuildingID|     Country|
+----------+------------+
|         1|         USA|
|         2|      France|
|         3|      Brazil|
|         4|     Finland|
|         5|   Hong Kong|
|         6|   Singapore|
|         7|South Africa|
|         8|   Australia|
|         9|      Mexico|
|        10|       China|
|        11|     Belgium|
|        12|     Finland|
|        13|Saudi Arabia|
|        14|     Germany|
|        15|      Israel|
|        16|      Turkey|
|        17|       Egypt|
|        18|   Indonesia|
|        19|      Canada|
|        20|   Argentina|
+----------+------------+
from pyspark.sql.functions import *

# Keep the buildings whose Country sorts before 'USA' as a string, then show
# BuildingID next to a constant column.  lit('OK') turns the Python value
# into a Column; the resulting column is named after the literal.
df.where("Country<'USA'").select('BuildingID', lit('OK')).show()
+----------+---+
|BuildingID| OK|
+----------+---+
|         2| OK|
|         3| OK|
|         4| OK|
|         5| OK|
|         6| OK|
|         7| OK|
|         8| OK|
|         9| OK|
|        10| OK|
|        11| OK|
|        12| OK|
|        13| OK|
|        14| OK|
|        15| OK|
|        16| OK|
|        17| OK|
|        18| OK|
|        19| OK|
|        20| OK|
+----------+---+
# Count the buildings per HVAC product with groupBy().count().
# NOTE: the CSV header spells this column 'HVACproduct'; the differently
# cased name used here still resolves (Spark matches column names
# case-insensitively by default).
df.groupBy('HVACProduct').count().show()
+-----------+-----+
|HVACProduct|count|
+-----------+-----+
|    ACMAX22|    4|
|     AC1000|    4|
|     JDNS77|    4|
|     FN39TG|    4|
|     GG1919|    4|
+-----------+-----+
!wget https://www.cse.ust.hk/msbd5003/data/Customer.csv
!wget https://www.cse.ust.hk/msbd5003/data/Product.csv
!wget https://www.cse.ust.hk/msbd5003/data/SalesOrderDetail.csv
!wget https://www.cse.ust.hk/msbd5003/data/SalesOrderHeader.csv
--2020-12-09 03:07:42--  https://www.cse.ust.hk/msbd5003/data/Customer.csv
Resolving www.cse.ust.hk (www.cse.ust.hk)... 143.89.40.27
Connecting to www.cse.ust.hk (www.cse.ust.hk)|143.89.40.27|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 199491 (195K) [text/plain]
Saving to: ‘Customer.csv’

Customer.csv        100%[===================>] 194.82K   242KB/s    in 0.8s

2020-12-09 03:07:44 (242 KB/s) - ‘Customer.csv’ saved [199491/199491]

--2020-12-09 03:07:44--  https://www.cse.ust.hk/msbd5003/data/Product.csv
Resolving www.cse.ust.hk (www.cse.ust.hk)... 143.89.40.27
Connecting to www.cse.ust.hk (www.cse.ust.hk)|143.89.40.27|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1355634 (1.3M) [text/plain]
Saving to: ‘Product.csv’

Product.csv         100%[===================>]   1.29M  1.06MB/s    in 1.2s

2020-12-09 03:07:46 (1.06 MB/s) - ‘Product.csv’ saved [1355634/1355634]

--2020-12-09 03:07:46--  https://www.cse.ust.hk/msbd5003/data/SalesOrderDetail.csv
Resolving www.cse.ust.hk (www.cse.ust.hk)... 143.89.40.27
Connecting to www.cse.ust.hk (www.cse.ust.hk)|143.89.40.27|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 56766 (55K) [text/plain]
Saving to: ‘SalesOrderDetail.csv’

SalesOrderDetail.cs 100%[===================>]  55.44K   137KB/s    in 0.4s

2020-12-09 03:07:47 (137 KB/s) - ‘SalesOrderDetail.csv’ saved [56766/56766]

--2020-12-09 03:07:47--  https://www.cse.ust.hk/msbd5003/data/SalesOrderHeader.csv
Resolving www.cse.ust.hk (www.cse.ust.hk)... 143.89.40.27
Connecting to www.cse.ust.hk (www.cse.ust.hk)|143.89.40.27|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 8680 (8.5K) [text/plain]
Saving to: ‘SalesOrderHeader.csv’

SalesOrderHeader.cs 100%[===================>]   8.48K  --.-KB/s    in 0s

2020-12-09 03:07:49 (131 MB/s) - ‘SalesOrderHeader.csv’ saved [8680/8680]

Rewriting SQL with DataFrame API

# Read the four sales tables into DataFrames.  The files are published at
# https://www.cse.ust.hk/msbd5003/data; each one has a header line and gets
# its column types inferred.

dfCustomer = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('Customer.csv')
)
dfProduct = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('Product.csv')
)
dfDetail = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('SalesOrderDetail.csv')
)
dfHeader = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('SalesOrderHeader.csv')
)
# SQL being rewritten:
#   SELECT ProductID, Name, ListPrice
#   FROM Product
#   WHERE Color = 'Black'

(
    dfProduct
    .filter("Color = 'Black'")                 # WHERE, given as a SQL string
    .select('ProductID', 'Name', 'ListPrice')  # SELECT list
    .show(truncate=False)                      # do not shorten long values
)
+---------+-----------------------------+---------+
|ProductID|Name                         |ListPrice|
+---------+-----------------------------+---------+
|680      |HL Road Frame - Black, 58    |1431.5   |
|708      |Sport-100 Helmet, Black      |34.99    |
|722      |LL Road Frame - Black, 58    |337.22   |
|723      |LL Road Frame - Black, 60    |337.22   |
|724      |LL Road Frame - Black, 62    |337.22   |
|736      |LL Road Frame - Black, 44    |337.22   |
|737      |LL Road Frame - Black, 48    |337.22   |
|738      |LL Road Frame - Black, 52    |337.22   |
|743      |HL Mountain Frame - Black, 42|1349.6   |
|744      |HL Mountain Frame - Black, 44|1349.6   |
|745      |HL Mountain Frame - Black, 48|1349.6   |
|746      |HL Mountain Frame - Black, 46|1349.6   |
|747      |HL Mountain Frame - Black, 38|1349.6   |
|765      |Road-650 Black, 58           |782.99   |
|766      |Road-650 Black, 60           |782.99   |
|767      |Road-650 Black, 62           |782.99   |
|768      |Road-650 Black, 44           |782.99   |
|769      |Road-650 Black, 48           |782.99   |
|770      |Road-650 Black, 52           |782.99   |
|775      |Mountain-100 Black, 38       |3374.99  |
+---------+-----------------------------+---------+
only showing top 20 rows
# The same query written with Column objects instead of SQL strings, plus a
# computed column that doubles the list price.
(
    dfProduct
    .where(dfProduct.Color == 'Black')
    .select(
        dfProduct.ProductID,
        dfProduct['Name'],
        (dfProduct['ListPrice'] * 2).alias('Double price'),
    )
    .show(truncate=False)
)
+---------+-----------------------------+------------+
|ProductID|Name                         |Double price|
+---------+-----------------------------+------------+
|680      |HL Road Frame - Black, 58    |2863.0      |
|708      |Sport-100 Helmet, Black      |69.98       |
|722      |LL Road Frame - Black, 58    |674.44      |
|723      |LL Road Frame - Black, 60    |674.44      |
|724      |LL Road Frame - Black, 62    |674.44      |
|736      |LL Road Frame - Black, 44    |674.44      |
|737      |LL Road Frame - Black, 48    |674.44      |
|738      |LL Road Frame - Black, 52    |674.44      |
|743      |HL Mountain Frame - Black, 42|2699.2      |
|744      |HL Mountain Frame - Black, 44|2699.2      |
|745      |HL Mountain Frame - Black, 48|2699.2      |
|746      |HL Mountain Frame - Black, 46|2699.2      |
|747      |HL Mountain Frame - Black, 38|2699.2      |
|765      |Road-650 Black, 58           |1565.98     |
|766      |Road-650 Black, 60           |1565.98     |
|767      |Road-650 Black, 62           |1565.98     |
|768      |Road-650 Black, 44           |1565.98     |
|769      |Road-650 Black, 48           |1565.98     |
|770      |Road-650 Black, 52           |1565.98     |
|775      |Mountain-100 Black, 38       |6749.98     |
+---------+-----------------------------+------------+
only showing top 20 rows
# Filter on a computed expression and select that same expression without
# an alias; the output column is then named after the expression itself.
(
    dfProduct
    .where(dfProduct.ListPrice * 2 > 100)
    .select(
        dfProduct.ProductID,
        dfProduct['Name'],
        dfProduct.ListPrice * 2,
    )
    .show(truncate=False)
)
+---------+-------------------------+---------------+
|ProductID|Name                     |(ListPrice * 2)|
+---------+-------------------------+---------------+
|680      |HL Road Frame - Black, 58|2863.0         |
|706      |HL Road Frame - Red, 58  |2863.0         |
|717      |HL Road Frame - Red, 62  |2863.0         |
|718      |HL Road Frame - Red, 44  |2863.0         |
|719      |HL Road Frame - Red, 48  |2863.0         |
|720      |HL Road Frame - Red, 52  |2863.0         |
|721      |HL Road Frame - Red, 56  |2863.0         |
|722      |LL Road Frame - Black, 58|674.44         |
|723      |LL Road Frame - Black, 60|674.44         |
|724      |LL Road Frame - Black, 62|674.44         |
|725      |LL Road Frame - Red, 44  |674.44         |
|726      |LL Road Frame - Red, 48  |674.44         |
|727      |LL Road Frame - Red, 52  |674.44         |
|728      |LL Road Frame - Red, 58  |674.44         |
|729      |LL Road Frame - Red, 60  |674.44         |
|730      |LL Road Frame - Red, 62  |674.44         |
|731      |ML Road Frame - Red, 44  |1189.66        |
|732      |ML Road Frame - Red, 48  |1189.66        |
|733      |ML Road Frame - Red, 52  |1189.66        |
|734      |ML Road Frame - Red, 58  |1189.66        |
+---------+-------------------------+---------------+
only showing top 20 rows
# SQL being rewritten (rows are sorted by price):
#   SELECT ProductID, Name, ListPrice
#   FROM Product
#   WHERE Color = 'Black'
#   ORDER BY ListPrice

(
    dfProduct
    .filter("Color = 'Black'")
    .select('ProductID', 'Name', 'ListPrice')
    .orderBy('ListPrice')  # ascending
    .show(truncate=False)
)
+---------+--------------------------+---------+
|ProductID|Name                      |ListPrice|
+---------+--------------------------+---------+
|860      |Half-Finger Gloves, L     |24.49    |
|859      |Half-Finger Gloves, M     |24.49    |
|858      |Half-Finger Gloves, S     |24.49    |
|708      |Sport-100 Helmet, Black   |34.99    |
|862      |Full-Finger Gloves, M     |37.99    |
|861      |Full-Finger Gloves, S     |37.99    |
|863      |Full-Finger Gloves, L     |37.99    |
|841      |Men's Sports Shorts, S    |59.99    |
|849      |Men's Sports Shorts, M    |59.99    |
|851      |Men's Sports Shorts, XL   |59.99    |
|850      |Men's Sports Shorts, L    |59.99    |
|815      |LL Mountain Front Wheel   |60.745   |
|868      |Women's Mountain Shorts, M|69.99    |
|869      |Women's Mountain Shorts, L|69.99    |
|867      |Women's Mountain Shorts, S|69.99    |
|853      |Women's Tights, M         |74.99    |
|854      |Women's Tights, L         |74.99    |
|852      |Women's Tights, S         |74.99    |
|818      |LL Road Front Wheel       |85.565   |
|823      |LL Mountain Rear Wheel    |87.745   |
+---------+--------------------------+---------+
only showing top 20 rows
# Find every order line for a black product and return SalesOrderID,
# SalesOrderDetailID, Name, UnitPrice, and OrderQty.
#
# Equivalent SQL with an implicit join:
#   SELECT SalesOrderID, SalesOrderDetailID, Name, UnitPrice, OrderQty
#   FROM SalesLT.SalesOrderDetail, SalesLT.Product
#   WHERE SalesOrderDetail.ProductID = Product.ProductID AND Color = 'Black'
#
# Equivalent SQL with an explicit join:
#   SELECT SalesOrderID, SalesOrderDetailID, Name, UnitPrice, OrderQty
#   FROM SalesLT.SalesOrderDetail
#   JOIN SalesLT.Product ON SalesOrderDetail.ProductID = Product.ProductID
#   WHERE Color = 'Black'
#
# Spark SQL supports natural joins: naming the shared column is enough.

(
    dfDetail
    .join(dfProduct, 'ProductID')
    .select('SalesOrderID', 'SalesOrderDetailID', 'Name', 'UnitPrice', 'OrderQty')
    .filter("Color='Black'")
    .show()
)

# The filter on Color comes after the select that dropped Color, and it
# still works.  Why?
+------------+------------------+--------------------+---------+--------+
|SalesOrderID|SalesOrderDetailID|                Name|UnitPrice|OrderQty|
+------------+------------------+--------------------+---------+--------+
|       71938|            113295|Sport-100 Helmet,...|   29.994|       5|
|       71902|            112988|Sport-100 Helmet,...|   20.994|       4|
|       71797|            111082|Sport-100 Helmet,...|  20.2942|      12|
|       71784|            110795|Sport-100 Helmet,...|  20.2942|      12|
|       71783|            110752|Sport-100 Helmet,...|  20.2942|      11|
|       71782|            110690|Sport-100 Helmet,...|   20.994|       7|
|       71797|            111045|LL Road Frame - B...|  202.332|       3|
|       71783|            110730|LL Road Frame - B...|  202.332|       6|
|       71938|            113297|LL Road Frame - B...|  202.332|       3|
|       71915|            113090|LL Road Frame - B...|  202.332|       2|
|       71815|            111451|LL Road Frame - B...|  202.332|       1|
|       71797|            111044|LL Road Frame - B...|  202.332|       1|
|       71783|            110710|LL Road Frame - B...|  202.332|       4|
|       71936|            113260|HL Mountain Frame...|   809.76|       4|
|       71899|            112937|HL Mountain Frame...|   809.76|       1|
|       71845|            112137|HL Mountain Frame...|   809.76|       2|
|       71832|            111806|HL Mountain Frame...|   809.76|       4|
|       71780|            110622|HL Mountain Frame...|   809.76|       1|
|       71936|            113235|HL Mountain Frame...|   809.76|       4|
|       71845|            112134|HL Mountain Frame...|   809.76|       3|
+------------+------------------+--------------------+---------+--------+
only showing top 20 rows
# Splitting the query into named steps works too: d2 filters on Color even
# though d1's select list does not contain that column.

d1 = (
    dfDetail
    .join(dfProduct, 'ProductID')
    .select('SalesOrderID', 'SalesOrderDetailID', 'Name', 'UnitPrice', 'OrderQty')
)
d1.show()

d2 = d1.filter("Color = 'Black'")
d2.show()
# Print the physical plan to see where the Color filter is applied.
d2.explain()
+------------+------------------+--------------------+---------+--------+
|SalesOrderID|SalesOrderDetailID|                Name|UnitPrice|OrderQty|
+------------+------------------+--------------------+---------+--------+
|       71938|            113278|Sport-100 Helmet,...|   20.994|       3|
|       71936|            113228|Sport-100 Helmet,...|   20.994|       1|
|       71902|            112980|Sport-100 Helmet,...|   20.994|       2|
|       71797|            111075|Sport-100 Helmet,...|   20.994|       6|
|       71784|            110794|Sport-100 Helmet,...|   20.994|      10|
|       71783|            110751|Sport-100 Helmet,...|   20.994|      10|
|       71782|            110709|Sport-100 Helmet,...|   20.994|       3|
|       71938|            113295|Sport-100 Helmet,...|   29.994|       5|
|       71902|            112988|Sport-100 Helmet,...|   20.994|       4|
|       71797|            111082|Sport-100 Helmet,...|  20.2942|      12|
|       71784|            110795|Sport-100 Helmet,...|  20.2942|      12|
|       71783|            110752|Sport-100 Helmet,...|  20.2942|      11|
|       71782|            110690|Sport-100 Helmet,...|   20.994|       7|
|       71938|            113282|Sport-100 Helmet,...|   20.994|       3|
|       71902|            112995|Sport-100 Helmet,...|   20.994|       7|
|       71863|            112395|Sport-100 Helmet,...|   20.994|       1|
|       71797|            111038|Sport-100 Helmet,...|   20.994|       4|
|       71784|            110753|Sport-100 Helmet,...|   20.994|       2|
|       71783|            110749|Sport-100 Helmet,...|  19.2445|      15|
|       71782|            110708|Sport-100 Helmet,...|   20.994|       6|
+------------+------------------+--------------------+---------+--------+
only showing top 20 rows

+------------+------------------+--------------------+---------+--------+
|SalesOrderID|SalesOrderDetailID|                Name|UnitPrice|OrderQty|
+------------+------------------+--------------------+---------+--------+
|       71938|            113295|Sport-100 Helmet,...|   29.994|       5|
|       71902|            112988|Sport-100 Helmet,...|   20.994|       4|
|       71797|            111082|Sport-100 Helmet,...|  20.2942|      12|
|       71784|            110795|Sport-100 Helmet,...|  20.2942|      12|
|       71783|            110752|Sport-100 Helmet,...|  20.2942|      11|
|       71782|            110690|Sport-100 Helmet,...|   20.994|       7|
|       71797|            111045|LL Road Frame - B...|  202.332|       3|
|       71783|            110730|LL Road Frame - B...|  202.332|       6|
|       71938|            113297|LL Road Frame - B...|  202.332|       3|
|       71915|            113090|LL Road Frame - B...|  202.332|       2|
|       71815|            111451|LL Road Frame - B...|  202.332|       1|
|       71797|            111044|LL Road Frame - B...|  202.332|       1|
|       71783|            110710|LL Road Frame - B...|  202.332|       4|
|       71936|            113260|HL Mountain Frame...|   809.76|       4|
|       71899|            112937|HL Mountain Frame...|   809.76|       1|
|       71845|            112137|HL Mountain Frame...|   809.76|       2|
|       71832|            111806|HL Mountain Frame...|   809.76|       4|
|       71780|            110622|HL Mountain Frame...|   809.76|       1|
|       71936|            113235|HL Mountain Frame...|   809.76|       4|
|       71845|            112134|HL Mountain Frame...|   809.76|       3|
+------------+------------------+--------------------+---------+--------+
only showing top 20 rows

== Physical Plan ==
*(2) Project [SalesOrderID#217, SalesOrderDetailID#218, Name#168, UnitPrice#221, OrderQty#219]
+- *(2) BroadcastHashJoin [ProductID#220], [ProductID#167], Inner, BuildLeft
   :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[3, int, true] as bigint))), [id=#380]
   :  +- *(1) Project [SalesOrderID#217, SalesOrderDetailID#218, OrderQty#219, ProductID#220, UnitPrice#221]
   :     +- *(1) Filter isnotnull(ProductID#220)
   :        +- FileScan csv [SalesOrderID#217,SalesOrderDetailID#218,OrderQty#219,ProductID#220,UnitPrice#221] Batched: false, DataFilters: [isnotnull(ProductID#220)], Format: CSV, Location: InMemoryFileIndex[file:/content/SalesOrderDetail.csv], PartitionFilters: [], PushedFilters: [IsNotNull(ProductID)], ReadSchema: struct<SalesOrderID:int,SalesOrderDetailID:int,OrderQty:int,ProductID:int,UnitPrice:double>
   +- *(2) Project [ProductID#167, Name#168]
      +- *(2) Filter ((isnotnull(Color#170) AND (Color#170 = Black)) AND isnotnull(ProductID#167))
         +- FileScan csv [ProductID#167,Name#168,Color#170] Batched: false, DataFilters: [isnotnull(Color#170), (Color#170 = Black), isnotnull(ProductID#167)], Format: CSV, Location: InMemoryFileIndex[file:/content/Product.csv], PartitionFilters: [], PushedFilters: [IsNotNull(Color), EqualTo(Color,Black), IsNotNull(ProductID)], ReadSchema: struct<ProductID:int,Name:string,Color:string>
# SparkSQL optimizes differently depending on whether intermediate
# DataFrames are cached or not:

d1 = (
    dfDetail
    .join(dfProduct, 'ProductID')
    .select('SalesOrderID', 'SalesOrderDetailID', 'Name', 'UnitPrice', 'OrderQty')
)
d1.persist()  # mark d1 for caching
d1.show()
+------------+------------------+--------------------+---------+--------+
|SalesOrderID|SalesOrderDetailID|                Name|UnitPrice|OrderQty|
+------------+------------------+--------------------+---------+--------+
|       71938|            113278|Sport-100 Helmet,...|   20.994|       3|
|       71936|            113228|Sport-100 Helmet,...|   20.994|       1|
|       71902|            112980|Sport-100 Helmet,...|   20.994|       2|
|       71797|            111075|Sport-100 Helmet,...|   20.994|       6|
|       71784|            110794|Sport-100 Helmet,...|   20.994|      10|
|       71783|            110751|Sport-100 Helmet,...|   20.994|      10|
|       71782|            110709|Sport-100 Helmet,...|   20.994|       3|
|       71938|            113295|Sport-100 Helmet,...|   20.994|       5|
|       71902|            112988|Sport-100 Helmet,...|   20.994|       4|
|       71797|            111082|Sport-100 Helmet,...|  20.2942|      12|
|       71784|            110795|Sport-100 Helmet,...|  20.2942|      12|
|       71783|            110752|Sport-100 Helmet,...|  20.2942|      11|
|       71782|            110690|Sport-100 Helmet,...|   20.994|       7|
|       71938|            113282|Sport-100 Helmet,...|   20.994|       3|
|       71902|            112995|Sport-100 Helmet,...|   20.994|       7|
|       71863|            112395|Sport-100 Helmet,...|   20.994|       1|
|       71797|            111038|Sport-100 Helmet,...|   20.994|       4|
|       71784|            110753|Sport-100 Helmet,...|   20.994|       2|
|       71783|            110749|Sport-100 Helmet,...|  19.2445|      15|
|       71782|            110708|Sport-100 Helmet,...|   20.994|       6|
+------------+------------------+--------------------+---------+--------+
only showing top 20 rows
# Filter the persisted d1 on Color, a column that is not in d1's select
# list, then print the physical plan that Spark chose.
d2 = d1.filter("Color = 'Black'")
# Alternative to try instead: filter on OrderQty, a column that d1 does keep.
#d2 = d1.filter("OrderQty >= 10")
d2.show()
d2.explain()
+------------+------------------+--------------------+---------+--------+
|SalesOrderID|SalesOrderDetailID|                Name|UnitPrice|OrderQty|
+------------+------------------+--------------------+---------+--------+
|       71938|            113295|Sport-100 Helmet,...|   29.994|       5|
|       71902|            112988|Sport-100 Helmet,...|   20.994|       4|
|       71797|            111082|Sport-100 Helmet,...|  20.2942|      12|
|       71784|            110795|Sport-100 Helmet,...|  20.2942|      12|
|       71783|            110752|Sport-100 Helmet,...|  20.2942|      11|
|       71782|            110690|Sport-100 Helmet,...|   20.994|       7|
|       71797|            111045|LL Road Frame - B...|  202.332|       3|
|       71783|            110730|LL Road Frame - B...|  202.332|       6|
|       71938|            113297|LL Road Frame - B...|  202.332|       3|
|       71915|            113090|LL Road Frame - B...|  202.332|       2|
|       71815|            111451|LL Road Frame - B...|  202.332|       1|
|       71797|            111044|LL Road Frame - B...|  202.332|       1|
|       71783|            110710|LL Road Frame - B...|  202.332|       4|
|       71936|            113260|HL Mountain Frame...|   809.76|       4|
|       71899|            112937|HL Mountain Frame...|   809.76|       1|
|       71845|            112137|HL Mountain Frame...|   809.76|       2|
|       71832|            111806|HL Mountain Frame...|   809.76|       4|
|       71780|            110622|HL Mountain Frame...|   809.76|       1|
|       71936|            113235|HL Mountain Frame...|   809.76|       4|
|       71845|            112134|HL Mountain Frame...|   809.76|       3|
+------------+------------------+--------------------+---------+--------+
only showing top 20 rows

== Physical Plan ==
*(2) Project [SalesOrderID#112, SalesOrderDetailID#113, Name#63, UnitPrice#116, OrderQty#114]
+- *(2) BroadcastHashJoin [ProductID#115], [ProductID#62], Inner, BuildLeft
   :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[3, int, true] as bigint))), [id=#280]
   :  +- *(1) Project [SalesOrderID#112, SalesOrderDetailID#113, OrderQty#114, ProductID#115, UnitPrice#116]
   :     +- *(1) Filter isnotnull(ProductID#115)
   :        +- FileScan csv [SalesOrderID#112,SalesOrderDetailID#113,OrderQty#114,ProductID#115,UnitPrice#116] Batched: false, DataFilters: [isnotnull(ProductID#115)], Format: CSV, Location: InMemoryFileIndex[file:/csproject/msbd5003/public_html/data/SalesOrderDetail.csv], PartitionFilters: [], PushedFilters: [IsNotNull(ProductID)], ReadSchema: struct<SalesOrderID:int,SalesOrderDetailID:int,OrderQty:int,ProductID:int,UnitPrice:double>
   +- *(2) Project [ProductID#62, Name#63]
      +- *(2) Filter ((isnotnull(Color#65) AND (Color#65 = Black)) AND isnotnull(ProductID#62))
         +- FileScan csv [ProductID#62,Name#63,Color#65] Batched: false, DataFilters: [isnotnull(Color#65), (Color#65 = Black), isnotnull(ProductID#62)], Format: CSV, Location: InMemoryFileIndex[file:/csproject/msbd5003/public_html/data/Product.csv], PartitionFilters: [], PushedFilters: [IsNotNull(Color), EqualTo(Color,Black), IsNotNull(ProductID)], ReadSchema: struct<ProductID:int,Name:string,Color:string>
# This will report an error: after the round trip through a CSV file, d2
# contains only the five selected columns, so the filter can no longer
# resolve Color.

d1 = dfDetail.join(dfProduct, 'ProductID') \
             .select('SalesOrderID', 'SalesOrderDetailID', 'Name', 'UnitPrice', 'OrderQty')
# Write d1 out (replacing any earlier output) and read it back as a new
# DataFrame.
d1.write.csv('temp.csv', mode = 'overwrite', header = True)
d2 = spark.read.csv('temp.csv', header = True, inferSchema = True)
d2.filter("Color = 'Black'").show()  # raises AnalysisException
---------------------------------------------------------------------------

AnalysisException                         Traceback (most recent call last)

<ipython-input-18-168595bb0376> in <module>
      5 d1.write.csv('temp.csv', mode = 'overwrite', header = True)
      6 d2 = spark.read.csv('temp.csv', header = True, inferSchema = True)
----> 7 d2.filter("Color = 'Black'").show()


/csproject/msbd5003/python/pyspark/sql/dataframe.py in filter(self, condition)
   1457         """
   1458         if isinstance(condition, basestring):
-> 1459             jdf = self._jdf.filter(condition)
   1460         elif isinstance(condition, Column):
   1461             jdf = self._jdf.filter(condition._jc)


/csproject/msbd5003/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:


/csproject/msbd5003/python/pyspark/sql/utils.py in deco(*a, **kw)
    135                 # Hide where the exception came from that shows a non-Pythonic
    136                 # JVM exception message.
--> 137                 raise_from(converted)
    138             else:
    139                 raise


/csproject/msbd5003/python/pyspark/sql/utils.py in raise_from(e)


AnalysisException: cannot resolve '`Color`' given input columns: [Name, OrderQty, SalesOrderDetailID, SalesOrderID, UnitPrice]; line 1 pos 0;
'Filter ('Color = Black)
+- Relation[SalesOrderID#580,SalesOrderDetailID#581,Name#582,UnitPrice#583,OrderQty#584] csv
# Find all orders that include at least one black product and return their
# SalesOrderID, each listed once.
#
#   SELECT DISTINCT SalesOrderID
#   FROM SalesLT.SalesOrderDetail
#   JOIN SalesLT.Product ON SalesOrderDetail.ProductID = Product.ProductID
#   WHERE Color = 'Black'

(
    dfDetail
    .join(dfProduct.filter("Color='Black'"), 'ProductID')  # filter, then join
    .select('SalesOrderID')
    .distinct()
    .show()
)
+------------+
|SalesOrderID|
+------------+
|       71902|
|       71832|
|       71915|
|       71831|
|       71898|
|       71935|
|       71938|
|       71845|
|       71783|
|       71815|
|       71936|
|       71863|
|       71780|
|       71782|
|       71899|
|       71784|
|       71797|
+------------+
# How many colors are there among the products?

# SELECT COUNT(DISTINCT Color)
# FROM SalesLT.Product

dfProduct.select('Color').distinct().count()

# The result is 1 more than in standard SQL: distinct() keeps NULL as one of
# the distinct values, whereas COUNT() in standard SQL does not count NULLs.
10
# Find the total price of each order; return SalesOrderID and the total
# price (the column name should be 'totalprice').
#
#   SELECT SalesOrderID, SUM(UnitPrice*OrderQty*(1-UnitPriceDiscount)) AS TotalPrice
#   FROM SalesLT.SalesOrderDetail
#   GROUP BY SalesOrderID

(
    dfDetail
    # Add each order line's discounted amount as 'netprice'.
    .select(
        '*',
        (
            dfDetail.UnitPrice
            * dfDetail.OrderQty
            * (1 - dfDetail.UnitPriceDiscount)
        ).alias('netprice'),
    )
    .groupBy('SalesOrderID')
    .sum('netprice')
    # The aggregate column comes back named 'sum(netprice)'; rename it.
    .withColumnRenamed('sum(netprice)', 'TotalPrice')
    .show()
)
+------------+------------------+
|SalesOrderID|     sum(netprice)|
+------------+------------------+
|       71867|             858.9|
|       71902|59894.209199999976|
|       71832|      28950.678108|
|       71915|1732.8899999999999|
|       71946|            31.584|
|       71895|221.25600000000003|
|       71816|2847.4079999999994|
|       71831|          1712.946|
|       71923|         96.108824|
|       71858|11528.844000000001|
|       71917|            37.758|
|       71897|          10585.05|
|       71885|           524.664|
|       71856|500.30400000000003|
|       71898| 53248.69200000002|
|       71774|           713.796|
|       71796| 47848.02600000001|
|       71935|5533.8689079999995|
|       71938|         74160.228|
|       71845|        34118.5356|
+------------+------------------+
only showing top 20 rows
# Find the total price of each order, keeping only orders whose total
# exceeds 10000.
#
#   SELECT SalesOrderID, SUM(UnitPrice*OrderQty*(1-UnitPriceDiscount)) AS TotalPrice
#   FROM SalesLT.SalesOrderDetail
#   GROUP BY SalesOrderID
#   HAVING SUM(UnitPrice*OrderQty*(1-UnitPriceDiscount)) > 10000

(
    dfDetail
    # Add each order line's discounted amount as 'netprice'.
    .select(
        '*',
        (
            dfDetail.UnitPrice
            * dfDetail.OrderQty
            * (1 - dfDetail.UnitPriceDiscount)
        ).alias('netprice'),
    )
    .groupBy('SalesOrderID')
    .sum('netprice')
    .withColumnRenamed('sum(netprice)', 'TotalPrice')
    # HAVING: a filter applied to the aggregated column.
    .where('TotalPrice > 10000')
    .show()
)
+------------+------------------+
|SalesOrderID|        TotalPrice|
+------------+------------------+
|       71902|59894.209199999976|
|       71832|      28950.678108|
|       71858|11528.844000000001|
|       71897|          10585.05|
|       71898| 53248.69200000002|
|       71796| 47848.02600000001|
|       71938|         74160.228|
|       71845|        34118.5356|
|       71783|      65683.367986|
|       71936| 79589.61602399996|
|       71780|29923.007999999998|
|       71782| 33319.98600000001|
|       71784| 89869.27631400003|
|       71797| 65123.46341800001|
+------------+------------------+
# Find the total price on the black products of each order where the total price > 10000

# SELECT SalesOrderID, SUM(UnitPrice*OrderQty*(1-UnitPriceDiscount)) AS TotalPrice
# FROM SalesLT.SalesOrderDetail, SalesLT.Product
# WHERE SalesLT.SalesOrderDetail.ProductID = SalesLT.Product.ProductID AND Color = 'Black'
# GROUP BY SalesOrderID
# HAVING SUM(UnitPrice*OrderQty*(1-UnitPriceDiscount)) > 10000

# netprice is computed on the order-detail rows first; the rows are then joined
# to dfProduct on ProductID and restricted to black products before being
# summed per order.  The final where() filters on the aggregate (SQL HAVING).
dfDetail.select('*', (dfDetail.UnitPrice * dfDetail. OrderQty
                      * (1 - dfDetail.UnitPriceDiscount)).alias('netprice'))\
        .join(dfProduct, 'ProductID') \
        .where("Color = 'Black'")\
        .groupBy('SalesOrderID').sum('netprice') \
        .withColumnRenamed('sum(netprice)', 'TotalPrice')\
        .where('TotalPrice > 10000')\
        .show()
+------------+------------------+
|SalesOrderID|        TotalPrice|
+------------+------------------+
|       71902|26677.883999999995|
|       71832|      16883.748108|
|       71938|         33779.448|
|       71845|         18109.836|
|       71783|15524.117476000003|
|       71936|      44490.290424|
|       71780|         16964.322|
|       71797|      27581.613792|
+------------+------------------+
# For each customer, find the total quantity of black products bought.
# Report CustomerID, FirstName, LastName, and total quantity

# select saleslt.customer.customerid, FirstName, LastName, sum(orderqty)
# from saleslt.customer
# left outer join 
# (
# saleslt.salesorderheader
# join saleslt.salesorderdetail
# on saleslt.salesorderdetail.salesorderid = saleslt.salesorderheader.salesorderid
# join saleslt.product
# on saleslt.product.productid = saleslt.salesorderdetail.productid and color = 'black'
# )
# on saleslt.customer.customerid = saleslt.salesorderheader.customerid
# group by saleslt.customer.customerid, FirstName, LastName
# order by sum(orderqty) desc

# d1: total OrderQty of black products per CustomerID (order details joined to
# products, filtered to black, then joined to the order headers to obtain the
# CustomerID of each order).
d1 = dfDetail.join(dfProduct, 'ProductID')\
             .where('Color = "Black"') \
             .join(dfHeader, 'SalesOrderID')\
             .groupBy('CustomerID').sum('OrderQty')
# The left outer join keeps customers that have no row in d1; their
# sum(OrderQty) is null, as the tail of the output shows.
dfCustomer.join(d1, 'CustomerID', 'left_outer')\
          .select('CustomerID', 'FirstName', 'LastName', 'sum(OrderQty)')\
          .orderBy('sum(OrderQty)', ascending=False)\
          .show()
+----------+------------+------------+-------------+
|CustomerID|   FirstName|    LastName|sum(OrderQty)|
+----------+------------+------------+-------------+
|     30050|     Krishna|Sunkammurali|           89|
|     29796|         Jon|      Grande|           65|
|     29957|       Kevin|         Liu|           62|
|     29929|     Jeffrey|       Kurtz|           46|
|     29546| Christopher|        Beck|           45|
|     29922|      Pamala|        Kotc|           34|
|     30113|        Raja|   Venugopal|           34|
|     29938|       Frank|    Campbell|           29|
|     29736|       Terry|   Eminhizer|           23|
|     29485|   Catherine|        Abel|           10|
|     30019|     Matthew|      Miller|            9|
|     29932|     Rebecca|      Laszlo|            7|
|     29975|      Walter|        Mays|            5|
|     29638|    Rosmarie|     Carroll|            2|
|     29531|        Cory|       Booth|            1|
|     30089|Michael John|      Troyer|            1|
|     29568|      Donald|     Blanton|            1|
|     29868|      Denean|        Ison|         null|
|     29646|      Stacey|   Cereghino|         null|
|     29905|   Elizabeth|      Keyser|         null|
+----------+------------+------------+-------------+
only showing top 20 rows

Embed SQL queries

You can also run SQL queries over DataFrames once you register them as temporary views within the SparkSession.

# Register the dataframe as a temporary view called HVAC
df.createOrReplaceTempView('HVAC')
spark.sql('SELECT * FROM HVAC WHERE BuildingAge >= 10').show()
+----------+-----------+-----------+-----------+------------+
|BuildingID|BuildingMgr|BuildingAge|HVACproduct|     Country|
+----------+-----------+-----------+-----------+------------+
|         1|         M1|         25|     AC1000|         USA|
|         2|         M2|         27|     FN39TG|      France|
|         3|         M3|         28|     JDNS77|      Brazil|
|         4|         M4|         17|     GG1919|     Finland|
|         7|         M7|         13|     FN39TG|South Africa|
|         8|         M8|         25|     JDNS77|   Australia|
|         9|         M9|         11|     GG1919|      Mexico|
|        10|        M10|         23|    ACMAX22|       China|
|        11|        M11|         14|     AC1000|     Belgium|
|        12|        M12|         26|     FN39TG|     Finland|
|        13|        M13|         25|     JDNS77|Saudi Arabia|
|        14|        M14|         17|     GG1919|     Germany|
|        15|        M15|         19|    ACMAX22|      Israel|
|        16|        M16|         23|     AC1000|      Turkey|
|        17|        M17|         11|     FN39TG|       Egypt|
|        18|        M18|         25|     JDNS77|   Indonesia|
|        19|        M19|         14|     GG1919|      Canada|
|        20|        M20|         19|    ACMAX22|   Argentina|
+----------+-----------+-----------+-----------+------------+
# Can even mix DataFrame API with SQL:
df.where('BuildingAge >= 10').createOrReplaceTempView('OldBuildings')
spark.sql('SELECT HVACproduct, COUNT(*) FROM OldBuildings GROUP BY HVACproduct').show()
+-----------+--------+
|HVACproduct|count(1)|
+-----------+--------+
|    ACMAX22|       3|
|     AC1000|       3|
|     JDNS77|       4|
|     FN39TG|       4|
|     GG1919|       4|
+-----------+--------+
d1 = spark.sql('SELECT * FROM HVAC WHERE BuildingAge >= 10')
d1.groupBy('HVACproduct').count().show()
+-----------+-----+
|HVACproduct|count|
+-----------+-----+
|    ACMAX22|    3|
|     AC1000|    3|
|     JDNS77|    4|
|     FN39TG|    4|
|     GG1919|    4|
+-----------+-----+
# UDF

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Python UDF that returns the length of a string plus 2, as an integer column.
# A SQL NULL reaches a Python UDF as None, and len(None) raises TypeError,
# so NULL inputs are mapped to NULL instead of failing the whole job.
slen = udf(lambda s: None if s is None else len(s) + 2, IntegerType())
df.select('*', slen(df['Country']).alias('slen')).show()
+----------+-----------+-----------+-----------+------------+----+
|BuildingID|BuildingMgr|BuildingAge|HVACproduct|     Country|slen|
+----------+-----------+-----------+-----------+------------+----+
|         1|         M1|         25|     AC1000|         USA|   5|
|         2|         M2|         27|     FN39TG|      France|   8|
|         3|         M3|         28|     JDNS77|      Brazil|   8|
|         4|         M4|         17|     GG1919|     Finland|   9|
|         5|         M5|          3|    ACMAX22|   Hong Kong|  11|
|         6|         M6|          9|     AC1000|   Singapore|  11|
|         7|         M7|         13|     FN39TG|South Africa|  14|
|         8|         M8|         25|     JDNS77|   Australia|  11|
|         9|         M9|         11|     GG1919|      Mexico|   8|
|        10|        M10|         23|    ACMAX22|       China|   7|
|        11|        M11|         14|     AC1000|     Belgium|   9|
|        12|        M12|         26|     FN39TG|     Finland|   9|
|        13|        M13|         25|     JDNS77|Saudi Arabia|  14|
|        14|        M14|         17|     GG1919|     Germany|   9|
|        15|        M15|         19|    ACMAX22|      Israel|   8|
|        16|        M16|         23|     AC1000|      Turkey|   8|
|        17|        M17|         11|     FN39TG|       Egypt|   7|
|        18|        M18|         25|     JDNS77|   Indonesia|  11|
|        19|        M19|         14|     GG1919|      Canada|   8|
|        20|        M20|         19|    ACMAX22|   Argentina|  11|
+----------+-----------+-----------+-----------+------------+----+
# Register a Python function under the name 'slen' so that it can be called
# from SQL text.  NULL inputs (None on the Python side) yield NULL rather than
# raising TypeError from len(None).
spark.udf.register('slen', lambda s: None if s is None else len(s), IntegerType())
spark.sql('SELECT *, slen(Country) AS slen FROM HVAC').show()
+----------+-----------+-----------+-----------+------------+----+
|BuildingID|BuildingMgr|BuildingAge|HVACproduct|     Country|slen|
+----------+-----------+-----------+-----------+------------+----+
|         1|         M1|         25|     AC1000|         USA|   3|
|         2|         M2|         27|     FN39TG|      France|   6|
|         3|         M3|         28|     JDNS77|      Brazil|   6|
|         4|         M4|         17|     GG1919|     Finland|   7|
|         5|         M5|          3|    ACMAX22|   Hong Kong|   9|
|         6|         M6|          9|     AC1000|   Singapore|   9|
|         7|         M7|         13|     FN39TG|South Africa|  12|
|         8|         M8|         25|     JDNS77|   Australia|   9|
|         9|         M9|         11|     GG1919|      Mexico|   6|
|        10|        M10|         23|    ACMAX22|       China|   5|
|        11|        M11|         14|     AC1000|     Belgium|   7|
|        12|        M12|         26|     FN39TG|     Finland|   7|
|        13|        M13|         25|     JDNS77|Saudi Arabia|  12|
|        14|        M14|         17|     GG1919|     Germany|   7|
|        15|        M15|         19|    ACMAX22|      Israel|   6|
|        16|        M16|         23|     AC1000|      Turkey|   6|
|        17|        M17|         11|     FN39TG|       Egypt|   5|
|        18|        M18|         25|     JDNS77|   Indonesia|   9|
|        19|        M19|         14|     GG1919|      Canada|   6|
|        20|        M20|         19|    ACMAX22|   Argentina|   9|
+----------+-----------+-----------+-----------+------------+----+

Flexible Data Model

Sample data file at

https://www.cse.ust.hk/msbd5003/data/products.json

df = spark.read.json('../data/products.json')
df.printSchema()
root
 |-- dimensions: struct (nullable = true)
 |    |-- height: double (nullable = true)
 |    |-- length: double (nullable = true)
 |    |-- width: double (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- price: double (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- warehouseLocation: struct (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
df.show()
+----------------+---+----------------+-----+-----------+-----------------+
|      dimensions| id|            name|price|       tags|warehouseLocation|
+----------------+---+----------------+-----+-----------+-----------------+
|[9.5, 7.0, 12.0]|  2|An ice sculpture| 12.5|[cold, ice]|   [-78.75, 20.4]|
| [1.0, 3.1, 1.0]|  3|    A blue mouse| 25.5|       null|    [54.4, -32.7]|
+----------------+---+----------------+-----+-----------+-----------------+
# Accessing nested fields

df.select(df['dimensions.height']).show()
+------+
|height|
+------+
|   9.5|
|   1.0|
+------+
df.select('dimensions.height').show()
+------+
|height|
+------+
|   9.5|
|   1.0|
+------+
# The filter refers to tags and warehouseLocation, which are not in the select
# list; the output below shows the query still runs and returns the one
# matching row.
df.select('dimensions.height')\
  .filter("tags[0] = 'cold' AND warehouseLocation.latitude < 0")\
  .show()
+------+
|height|
+------+
|   9.5|
+------+
df.rdd.take(3)
[Row(dimensions=Row(height=9.5, length=7.0, width=12.0), id=2, name='An ice sculpture', price=12.5, tags=['cold', 'ice'], warehouseLocation=Row(latitude=-78.75, longitude=20.4)),
 Row(dimensions=Row(height=1.0, length=3.1, width=1.0), id=3, name='A blue mouse', price=25.5, tags=None, warehouseLocation=Row(latitude=54.4, longitude=-32.7))]

Converting between RDD and DataFrame

Sample data file at:

https://www.cse.ust.hk/msbd5003/data/people.txt

# Load a text file and convert each line to a Row.
lines = sc.textFile("../data/people.txt")

def parse(l):
    """Turn one comma-separated line into a ``(name, age)`` tuple.

    The first field is returned unchanged as the name; the second field is
    converted with ``int``.  Any further fields are ignored.
    """
    fields = l.split(',')
    name, age_text = fields[0], fields[1]
    return (name, int(age_text))

rdd = lines.map(parse)
rdd.collect()
[('Michael', 29), ('Andy', 30), ('Justin', 19)]
# Create the DataFrame from an RDD of tuples, schema is inferred
df = spark.createDataFrame(rdd)
df.printSchema()
df.show()
root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)

+-------+---+
|     _1| _2|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+
# Create the DataFrame from an RDD of tuples with column names, type is inferred
df = spark.createDataFrame(rdd, ['name', 'age'])
df.printSchema()
df.show()
root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+
# Create the DataFrame from an RDD of Rows, type is given in the Row objects
from pyspark.sql import Row

rdd_rows = rdd.map(lambda p: Row(name = p[0], age = p[1]))
df = spark.createDataFrame(rdd_rows)
df.printSchema()
df.show()
root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+
# Row fields with types incompatible with that of previous rows will be turned into nulls
row1 = Row(name="Alice", age=11)
row2 = Row(name="Bob", age='12')
rdd_rows = sc.parallelize([row1, row2])
df1 = spark.createDataFrame(rdd_rows)
df1.show()
+-----+----+
| name| age|
+-----+----+
|Alice|  11|
|  Bob|null|
+-----+----+
# rdd returns the content as an RDD of Rows
teenagers = df.filter('age >= 13 and age <= 19')

teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name)
teenNames.collect()
['Name: Justin']

Note:

DataFrames are stored using columnar storage with compression

RDDs are stored using row storage without compression

The RDD view of a DataFrame just provides an interface; the Row objects are constructed on the fly and do not necessarily represent the internal storage format of the data.

Closure in DataFrames

data = range(10)
df = spark.createDataFrame(zip(data, data))
df.printSchema()
df.show()
root
 |-- _1: long (nullable = true)
 |-- _2: long (nullable = true)

+---+---+
| _1| _2|
+---+---+
|  0|  0|
|  1|  1|
|  2|  2|
|  3|  3|
|  4|  4|
|  5|  5|
|  6|  6|
|  7|  7|
|  8|  8|
|  9|  9|
+---+---+
# The 'closure' behaviour in RDD doesn't seem to exist for DataFrames

x = 5
# df._1 < x is an ordinary Python expression evaluated on this line, so the
# filter condition is built from the value x has right now.
df1 = df.filter(df._1 < x)
df1.show()
# Reassigning x afterwards does not affect df1: both show() calls print the
# same rows (see the output below).
x = 3
df1.show()
+---+---+
| _1| _2|
+---+---+
|  0|  0|
|  1|  1|
|  2|  2|
|  3|  3|
|  4|  4|
+---+---+

+---+---+
| _1| _2|
+---+---+
|  0|  0|
|  1|  1|
|  2|  2|
|  3|  3|
|  4|  4|
+---+---+
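# Because the expression df._1 < x is evaluated as soon as it is written: the value
# of x at that moment (5) is embedded as a literal in the plan that is handed to the
# Catalyst optimizer, as explain() shows.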

df1.explain()
== Physical Plan ==
*(1) Filter (isnotnull(_1#1265L) AND (_1#1265L < 5))
+- *(1) Scan ExistingRDD[_1#1265L,_2#1266L]
def f():
    """Return half of the module-level ``x`` as it is when ``f`` is called."""
    half = x / 2
    return half
x = 5
df1 = df.select(df._1 * 2 + f() + 1 + 1)
df1.explain()
df1.show()
== Physical Plan ==
*(1) Project [(((cast((_1#1265L * 2) as double) + 2.5) + 1.0) + 1.0) AS ((((_1 * 2) + 2.5) + 1) + 1)#1296]
+- *(1) Scan ExistingRDD[_1#1265L,_2#1266L]


+----------------------------+
|((((_1 * 2) + 2.5) + 1) + 1)|
+----------------------------+
|                         4.5|
|                         6.5|
|                         8.5|
|                        10.5|
|                        12.5|
|                        14.5|
|                        16.5|
|                        18.5|
|                        20.5|
|                        22.5|
+----------------------------+
# RDD counterpart of the DataFrame example: the lambda refers to the global x
# by name, and the two take() calls print different results once x has been
# reassigned (see the output below), i.e. x is read when the job runs rather
# than when filter() is called.
rdd = sc.parallelize(range(10))
x = 5
a = rdd.filter(lambda z: z < x)
print(a.take(10))
x = 3
print(a.take(10))
[0, 1, 2, 3, 4]
[0, 1, 2]
counter = 0

def increment_counter(x):
    """Add one to the module-level ``counter``; the argument ``x`` is not used."""
    global counter
    counter = counter + 1

df.foreach(increment_counter)

print(counter)
0