
SQL

Tables explained

The schema of a table consists of the table name and its attributes. Terminology:

  • A tuple = a record = row
  • A table = a set of tuples

Operators

Like operator

SELECT   *
FROM     Product
WHERE    PName LIKE '%gizmo%'

s LIKE p: pattern matching on strings. p may contain two special symbols:

  • % = any sequence of characters
  • _ = any single character
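The two wildcards can be tried out with Python's built-in sqlite3 module. This is a minimal sketch: the rows are hypothetical, and note that SQLite's LIKE is case-insensitive for ASCII by default, which may differ from other databases.

```python
import sqlite3

# Hypothetical sample rows, for illustration only.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Product (PName TEXT)")
conn.executemany(
    "INSERT INTO Product (PName) VALUES (?)",
    [("gizmo",), ("super gizmo 2",), ("gadget",), ("gizmX",)],
)


def names_like(pattern):
    """Return product names matching a LIKE pattern, sorted for stable output."""
    rows = conn.execute(
        "SELECT PName FROM Product WHERE PName LIKE ? ORDER BY PName", (pattern,)
    )
    return [name for (name,) in rows]


contains_gizmo = names_like("%gizmo%")  # % matches any sequence, including an empty one
five_chars_giz = names_like("giz__")    # each _ matches exactly one character
print(contains_gizmo)
print(five_chars_giz)
```

The pattern is passed as a bound parameter rather than pasted into the SQL string, which avoids quoting mistakes like an unquoted %gizmo%.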

Distinct

SELECT   DISTINCT category
FROM     Product

Order

SELECT   pname, price, manufacturer
FROM     Product
WHERE   category = 'gizmo' AND price > 50
ORDER BY  price, pname
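To see DISTINCT and a two-key ORDER BY in action, here is a small sketch using Python's sqlite3 module. The rows and manufacturer names are made up for illustration; only the column names follow the queries above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE Product (pname TEXT, price REAL, category TEXT, manufacturer TEXT)"
)
# Hypothetical rows, for illustration only.
conn.executemany(
    "INSERT INTO Product VALUES (?, ?, ?, ?)",
    [
        ("widget", 80.0, "gizmo", "Acme"),
        ("bolt", 80.0, "gizmo", "Acme"),
        ("cog", 60.0, "gizmo", "Initech"),
        ("cheap", 10.0, "gizmo", "Initech"),
        ("lamp", 90.0, "home", "Acme"),
    ],
)

# DISTINCT collapses the four 'gizmo' rows into a single value.
categories = sorted(
    c for (c,) in conn.execute("SELECT DISTINCT category FROM Product")
)

# ORDER BY price, pname: sort by price first, then break ties by pname.
ordered = conn.execute(
    """
    SELECT pname, price, manufacturer
    FROM Product
    WHERE category = 'gizmo' AND price > 50
    ORDER BY price, pname
    """
).fetchall()

print(categories)
print(ordered)
```

The two products priced at 80 come out as bolt, then widget, because pname is only consulted when prices tie.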

Join

Suppose we have two tables with the following schema:

drop table if exists product, location;

create table location(
    id int primary key auto_increment,
    location text not null
);

create  table product(
    id int primary key auto_increment,
    name text,
    location_id int,
    foreign key (location_id) references location(id)
);

insert into location  (location) values ('Shenzhen');
insert into location  (location) values ('Shanghai');
insert into location  (location) values ('Beijing');

insert into product (name, location_id) values ('iPad', 1);
insert into product (name, location_id) values ('iPhone', 2);
insert into product (name, location_id) values ('iMac', 3);

First way

select name from product inner join location l on product.location_id = l.id where location = 'Shenzhen';

Second way

select name from product, location l where location_id =l.id and location = 'Shenzhen';

Outer join (assuming an additional location, Hangzhou, that has no product)

iPad    Shenzhen
iPhone  Shanghai
iMac    Beijing
Null    Hangzhou

Inner join

iPad    Shenzhen
iPhone  Shanghai
iMac    Beijing
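Both result sets can be reproduced with Python's sqlite3 module. This sketch adapts the MySQL schema above to SQLite (INTEGER PRIMARY KEY instead of auto_increment) and assumes a fourth location, Hangzhou, with no product, which is what the outer-join output implies.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE location (id INTEGER PRIMARY KEY, location TEXT NOT NULL);
    CREATE TABLE product (
        id INTEGER PRIMARY KEY,
        name TEXT,
        location_id INTEGER REFERENCES location(id)
    );
    -- Hangzhou is an assumed extra row with no matching product.
    INSERT INTO location (location)
        VALUES ('Shenzhen'), ('Shanghai'), ('Beijing'), ('Hangzhou');
    INSERT INTO product (name, location_id)
        VALUES ('iPad', 1), ('iPhone', 2), ('iMac', 3);
    """
)

# Inner join: only locations that have a matching product appear.
inner = conn.execute(
    """
    SELECT p.name, l.location
    FROM product p INNER JOIN location l ON p.location_id = l.id
    ORDER BY l.id
    """
).fetchall()

# Left outer join from location: unmatched locations are kept, with NULL for name.
outer = conn.execute(
    """
    SELECT p.name, l.location
    FROM location l LEFT OUTER JOIN product p ON p.location_id = l.id
    ORDER BY l.id
    """
).fetchall()

print(inner)
print(outer)
```

Python shows the SQL NULL as None, so the last outer-join row is (None, 'Hangzhou').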

Aggregation

sum, count, min, max, avg

SELECT  avg(price)
FROM      Product
WHERE   maker = 'Toyota'

  • COUNT applies to duplicates unless stated otherwise; use COUNT(DISTINCT attribute) to count each value only once.
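A quick sketch of the difference, using Python's sqlite3 module with made-up rows: COUNT(category) counts every row, duplicates included, while COUNT(DISTINCT category) counts each category once.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Product (pname TEXT, category TEXT)")
# Hypothetical rows: three products in 'gizmo', one in 'gadget'.
conn.executemany(
    "INSERT INTO Product VALUES (?, ?)",
    [("a", "gizmo"), ("b", "gizmo"), ("c", "gizmo"), ("d", "gadget")],
)

count_with_duplicates, count_distinct = conn.execute(
    "SELECT COUNT(category), COUNT(DISTINCT category) FROM Product"
).fetchone()

print(count_with_duplicates, count_distinct)
```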

Having

SELECT       product, Sum(price * quantity)
FROM          Purchase
WHERE       date > '10/1/2005'
GROUP BY product
HAVING      Sum(quantity) > 30
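The order of evaluation matters here: WHERE filters individual rows before grouping, and HAVING filters whole groups afterwards. The sketch below uses Python's sqlite3 module with hypothetical purchases; dates are stored as ISO strings (an assumption) so that plain string comparison orders them correctly.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE Purchase (product TEXT, date TEXT, price REAL, quantity INTEGER)"
)
# Hypothetical rows, for illustration only.
conn.executemany(
    "INSERT INTO Purchase VALUES (?, ?, ?, ?)",
    [
        ("bagel", "2005-10-21", 1.0, 20),
        ("bagel", "2005-10-25", 1.5, 20),
        ("banana", "2005-10-03", 0.5, 10),
        ("banana", "2005-10-10", 1.0, 10),
        ("bagel", "2005-09-01", 1.0, 100),  # before the cutoff: dropped by WHERE
    ],
)

totals = conn.execute(
    """
    SELECT product, SUM(price * quantity) AS total_sales
    FROM Purchase
    WHERE date > '2005-10-01'      -- row-level filter, applied before grouping
    GROUP BY product
    HAVING SUM(quantity) > 30      -- group-level filter, applied after grouping
    """
).fetchall()

print(totals)
```

Only bagel survives: its two remaining rows sum to a quantity of 40, while banana reaches only 20.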

PySpark SQL

RDDs can be used to achieve the same functionality. What are the benefits of using DataFrames?

  • Readability
  • Flexibility
  • Columnar storage
  • Catalyst optimizer

Plan Optimization

[Figure: plan optimization (screenshot, image not included)]

Tree transformations

[Figure: tree transformations (screenshot, image not included)]

Catalyst Rules

  • Pattern matching functions that transform subtrees into specific structures.

  • Multiple patterns in the same transform call.

  • May take multiple batches to reach a fixed point.

  • transform can contain arbitrary Scala code.
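The points above can be imitated with a toy expression tree in plain Python. This is only an analogy: Catalyst itself is written in Scala, and the class and function names here (Literal, Attribute, Add, simplify, transform, to_fixed_point) are hypothetical, not Catalyst's API. One rule holds several patterns, and it is re-applied until the tree stops changing.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Literal:
    value: int


@dataclass(frozen=True)
class Attribute:
    name: str


@dataclass(frozen=True)
class Add:
    left: object
    right: object


def simplify(node):
    """A single rule holding several patterns."""
    if isinstance(node, Add):
        if isinstance(node.left, Literal) and isinstance(node.right, Literal):
            return Literal(node.left.value + node.right.value)  # constant folding
        if node.right == Literal(0):
            return node.left   # x + 0 -> x
        if node.left == Literal(0):
            return node.right  # 0 + x -> x
    return node


def transform(node, rule):
    """Apply the rule to this node first, then to the children of the result."""
    node = rule(node)
    if isinstance(node, Add):
        return Add(transform(node.left, rule), transform(node.right, rule))
    return node


def to_fixed_point(node, rule, max_passes=100):
    """Repeat full passes until one pass leaves the tree unchanged."""
    passes = 0
    while passes < max_passes:
        new_node = transform(node, rule)
        passes += 1
        if new_node == node:
            break
        node = new_node
    return node, passes


# x + ((1 + 2) + -3)
expr = Add(Attribute("x"), Add(Add(Literal(1), Literal(2)), Literal(-3)))
result, passes = to_fixed_point(expr, simplify)
print(result, passes)
```

The expression needs three rewriting passes (fold 1 + 2, fold 3 + -3, drop the + 0) and a fourth pass that confirms nothing changes, which is why a single pass is not enough.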

Rule-based optimization

[Figure: rule-based optimization (screenshot, image not included)]

Cost-based optimization

Currently only used for choosing join algorithms:

  • Big table joining big table: shuffle
  • Small table joining big table: broadcast the small table
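The idea behind broadcasting the small table can be sketched in plain Python. This is a toy model, not Spark's implementation: the function name and the sample rows are hypothetical, and "partitions" are just lists. The small table is turned into a hash map once, and every partition of the big table probes it locally, so the big table never has to be shuffled.

```python
def broadcast_hash_join(big_partitions, small_table, key):
    """Toy broadcast hash join on a shared key column."""
    # Build phase: hash the small table once. In a cluster, this map is the
    # piece that would be copied (broadcast) to every worker.
    lookup = {}
    for row in small_table:
        lookup.setdefault(row[key], []).append(row)

    # Probe phase: each partition of the big table is scanned independently.
    joined = []
    for partition in big_partitions:
        for row in partition:
            for match in lookup.get(row[key], []):
                joined.append({**row, **match})
    return joined


# Hypothetical data: order details split into two partitions, plus a small product table.
details = [
    [{"SalesOrderID": 1, "ProductID": 10}, {"SalesOrderID": 2, "ProductID": 11}],
    [{"SalesOrderID": 3, "ProductID": 10}, {"SalesOrderID": 4, "ProductID": 99}],
]
products = [
    {"ProductID": 10, "Name": "Helmet"},
    {"ProductID": 11, "Name": "Frame"},
]

joined = broadcast_hash_join(details, products, "ProductID")
names = [row["Name"] for row in joined]
print(names)
```

Order 4 refers to a product that is not in the small table, so it is dropped, as in an inner join.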

PySpark demo

!pip install pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1
from pyspark.sql import Row
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
row = Row(name='Alice', age=11)
row.name, row.age
('Alice', 11)

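Access a field with attribute syntax (row.name) or with dict-style indexing (row['name']). Indexing is the safer choice when a field name collides with a built-in method such as count, as the output shows: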

row = Row(name='Alice', age=11, count=1)
print(f"Row.count: {row.count}")
print(f"Row['count']: {row['count']}")
Row.count: <built-in method count of Row object at 0x7fd3091ecaf0>
Row['count']: 1

Load csv

df = spark.read.csv('/content/drive/MyDrive/courses/HKUST/MSBD5003/data/building.csv', header=True, inferSchema=True)

Create rdd from dataframe

dfrdd = df.rdd
dfrdd.take(3)
[Row(BuildingID=1, BuildingMgr='M1', BuildingAge=25, HVACproduct='AC1000', Country='USA'),
 Row(BuildingID=2, BuildingMgr='M2', BuildingAge=27, HVACproduct='FN39TG', Country='France'),
 Row(BuildingID=3, BuildingMgr='M3', BuildingAge=28, HVACproduct='JDNS77', Country='Brazil')]

Select

from pyspark.sql.functions import *
df.select('BuildingID', 'Country').show()
+----------+------------+
|BuildingID|     Country|
+----------+------------+
|         1|         USA|
|         2|      France|
|         3|      Brazil|
|         4|     Finland|
|         5|   Hong Kong|
|         6|   Singapore|
|         7|South Africa|
|         8|   Australia|
|         9|      Mexico|
|        10|       China|
|        11|     Belgium|
|        12|     Finland|
|        13|Saudi Arabia|
|        14|     Germany|
|        15|      Israel|
|        16|      Turkey|
|        17|       Egypt|
|        18|   Indonesia|
|        19|      Canada|
|        20|   Argentina|
+----------+------------+

Using where clause

lit: Creates a Column of literal value.

df.where("Country<'USA'").select('BuildingID', lit('OK')).show()
+----------+---+
|BuildingID| OK|
+----------+---+
|         2| OK|
|         3| OK|
|         4| OK|
|         5| OK|
|         6| OK|
|         7| OK|
|         8| OK|
|         9| OK|
|        10| OK|
|        11| OK|
|        12| OK|
|        13| OK|
|        14| OK|
|        15| OK|
|        16| OK|
|        17| OK|
|        18| OK|
|        19| OK|
|        20| OK|
+----------+---+

Raw SQL to Spark

dfCustomer = spark.read.csv('/content/drive/MyDrive/courses/HKUST/MSBD5003/data/Customer.csv', header=True, inferSchema=True)
dfProduct = spark.read.csv('/content/drive/MyDrive/courses/HKUST/MSBD5003/data/Product.csv', header=True, inferSchema=True)
dfDetail = spark.read.csv('/content/drive/MyDrive/courses/HKUST/MSBD5003/data/SalesOrderDetail.csv', header=True, inferSchema=True)
dfHeader = spark.read.csv('/content/drive/MyDrive/courses/HKUST/MSBD5003/data/SalesOrderHeader.csv', header=True, inferSchema=True)
SELECT ProductID, Name, ListPrice 
FROM Product 
WHERE Color = 'Black'
dfProduct.filter("Color = 'Black'")\
         .select('ProductID', 'Name', 'ListPrice')\
         .show(truncate=False)
+---------+-----------------------------+---------+
|ProductID|Name                         |ListPrice|
+---------+-----------------------------+---------+
|680      |HL Road Frame - Black, 58    |1431.5   |
|708      |Sport-100 Helmet, Black      |34.99    |
|722      |LL Road Frame - Black, 58    |337.22   |
|723      |LL Road Frame - Black, 60    |337.22   |
|724      |LL Road Frame - Black, 62    |337.22   |
|736      |LL Road Frame - Black, 44    |337.22   |
|737      |LL Road Frame - Black, 48    |337.22   |
|738      |LL Road Frame - Black, 52    |337.22   |
|743      |HL Mountain Frame - Black, 42|1349.6   |
|744      |HL Mountain Frame - Black, 44|1349.6   |
|745      |HL Mountain Frame - Black, 48|1349.6   |
|746      |HL Mountain Frame - Black, 46|1349.6   |
|747      |HL Mountain Frame - Black, 38|1349.6   |
|765      |Road-650 Black, 58           |782.99   |
|766      |Road-650 Black, 60           |782.99   |
|767      |Road-650 Black, 62           |782.99   |
|768      |Road-650 Black, 44           |782.99   |
|769      |Road-650 Black, 48           |782.99   |
|770      |Road-650 Black, 52           |782.99   |
|775      |Mountain-100 Black, 38       |3374.99  |
+---------+-----------------------------+---------+
only showing top 20 rows
dfProduct.where(dfProduct.Color=='Black') \
         .select(dfProduct.ProductID, dfProduct['Name'], (dfProduct['ListPrice'] * 2).alias('Double price')) \
         .show(truncate=False)
+---------+-----------------------------+------------+
|ProductID|Name                         |Double price|
+---------+-----------------------------+------------+
|680      |HL Road Frame - Black, 58    |2863.0      |
|708      |Sport-100 Helmet, Black      |69.98       |
|722      |LL Road Frame - Black, 58    |674.44      |
|723      |LL Road Frame - Black, 60    |674.44      |
|724      |LL Road Frame - Black, 62    |674.44      |
|736      |LL Road Frame - Black, 44    |674.44      |
|737      |LL Road Frame - Black, 48    |674.44      |
|738      |LL Road Frame - Black, 52    |674.44      |
|743      |HL Mountain Frame - Black, 42|2699.2      |
|744      |HL Mountain Frame - Black, 44|2699.2      |
|745      |HL Mountain Frame - Black, 48|2699.2      |
|746      |HL Mountain Frame - Black, 46|2699.2      |
|747      |HL Mountain Frame - Black, 38|2699.2      |
|765      |Road-650 Black, 58           |1565.98     |
|766      |Road-650 Black, 60           |1565.98     |
|767      |Road-650 Black, 62           |1565.98     |
|768      |Road-650 Black, 44           |1565.98     |
|769      |Road-650 Black, 48           |1565.98     |
|770      |Road-650 Black, 52           |1565.98     |
|775      |Mountain-100 Black, 38       |6749.98     |
+---------+-----------------------------+------------+
only showing top 20 rows
dfProduct.where(dfProduct.ListPrice * 2 > 100) \
         .select(dfProduct.ProductID, dfProduct['Name'], dfProduct.ListPrice * 2) \
         .show(truncate=False)
+---------+-------------------------+---------------+
|ProductID|Name                     |(ListPrice * 2)|
+---------+-------------------------+---------------+
|680      |HL Road Frame - Black, 58|2863.0         |
|706      |HL Road Frame - Red, 58  |2863.0         |
|717      |HL Road Frame - Red, 62  |2863.0         |
|718      |HL Road Frame - Red, 44  |2863.0         |
|719      |HL Road Frame - Red, 48  |2863.0         |
|720      |HL Road Frame - Red, 52  |2863.0         |
|721      |HL Road Frame - Red, 56  |2863.0         |
|722      |LL Road Frame - Black, 58|674.44         |
|723      |LL Road Frame - Black, 60|674.44         |
|724      |LL Road Frame - Black, 62|674.44         |
|725      |LL Road Frame - Red, 44  |674.44         |
|726      |LL Road Frame - Red, 48  |674.44         |
|727      |LL Road Frame - Red, 52  |674.44         |
|728      |LL Road Frame - Red, 58  |674.44         |
|729      |LL Road Frame - Red, 60  |674.44         |
|730      |LL Road Frame - Red, 62  |674.44         |
|731      |ML Road Frame - Red, 44  |1189.66        |
|732      |ML Road Frame - Red, 48  |1189.66        |
|733      |ML Road Frame - Red, 52  |1189.66        |
|734      |ML Road Frame - Red, 58  |1189.66        |
+---------+-------------------------+---------------+
only showing top 20 rows
SELECT ProductID, Name, ListPrice 
FROM Product 
WHERE Color = 'Black'
ORDER BY ListPrice
dfProduct.filter("Color = 'Black'")\
         .select('ProductID', 'Name', 'ListPrice')\
         .orderBy('ListPrice')\
         .show(truncate=False)
+---------+--------------------------+---------+
|ProductID|Name                      |ListPrice|
+---------+--------------------------+---------+
|860      |Half-Finger Gloves, L     |24.49    |
|859      |Half-Finger Gloves, M     |24.49    |
|858      |Half-Finger Gloves, S     |24.49    |
|708      |Sport-100 Helmet, Black   |34.99    |
|862      |Full-Finger Gloves, M     |37.99    |
|861      |Full-Finger Gloves, S     |37.99    |
|863      |Full-Finger Gloves, L     |37.99    |
|841      |Men's Sports Shorts, S    |59.99    |
|849      |Men's Sports Shorts, M    |59.99    |
|851      |Men's Sports Shorts, XL   |59.99    |
|850      |Men's Sports Shorts, L    |59.99    |
|815      |LL Mountain Front Wheel   |60.745   |
|868      |Women's Mountain Shorts, M|69.99    |
|869      |Women's Mountain Shorts, L|69.99    |
|867      |Women's Mountain Shorts, S|69.99    |
|853      |Women's Tights, M         |74.99    |
|854      |Women's Tights, L         |74.99    |
|852      |Women's Tights, S         |74.99    |
|818      |LL Road Front Wheel       |85.565   |
|823      |LL Mountain Rear Wheel    |87.745   |
+---------+--------------------------+---------+
only showing top 20 rows

Find all order details for black products; return SalesOrderID, SalesOrderDetailID, Name, UnitPrice, and OrderQty

SELECT SalesOrderID, SalesOrderDetailID, Name, UnitPrice, OrderQty 
FROM SalesLT.SalesOrderDetail, SalesLT.Product
WHERE SalesOrderDetail.ProductID = Product.ProductID AND Color = 'Black'

SELECT SalesOrderID, SalesOrderDetailID, Name, UnitPrice, OrderQty 
FROM SalesLT.SalesOrderDetail
JOIN SalesLT.Product ON SalesOrderDetail.ProductID = Product.ProductID
WHERE Color = 'Black'
dfDetail.join(dfProduct, 'ProductID') \
        .filter("Color='Black'")\
        .select('SalesOrderID', 'SalesOrderDetailID', 'Name', 'UnitPrice', 'OrderQty') \
        .show()
+------------+------------------+--------------------+---------+--------+
|SalesOrderID|SalesOrderDetailID|                Name|UnitPrice|OrderQty|
+------------+------------------+--------------------+---------+--------+
|       71938|            113295|Sport-100 Helmet,...|   29.994|       5|
|       71902|            112988|Sport-100 Helmet,...|   20.994|       4|
|       71797|            111082|Sport-100 Helmet,...|  20.2942|      12|
|       71784|            110795|Sport-100 Helmet,...|  20.2942|      12|
|       71783|            110752|Sport-100 Helmet,...|  20.2942|      11|
|       71782|            110690|Sport-100 Helmet,...|   20.994|       7|
|       71797|            111045|LL Road Frame - B...|  202.332|       3|
|       71783|            110730|LL Road Frame - B...|  202.332|       6|
|       71938|            113297|LL Road Frame - B...|  202.332|       3|
|       71915|            113090|LL Road Frame - B...|  202.332|       2|
|       71815|            111451|LL Road Frame - B...|  202.332|       1|
|       71797|            111044|LL Road Frame - B...|  202.332|       1|
|       71783|            110710|LL Road Frame - B...|  202.332|       4|
|       71936|            113260|HL Mountain Frame...|   809.76|       4|
|       71899|            112937|HL Mountain Frame...|   809.76|       1|
|       71845|            112137|HL Mountain Frame...|   809.76|       2|
|       71832|            111806|HL Mountain Frame...|   809.76|       4|
|       71780|            110622|HL Mountain Frame...|   809.76|       1|
|       71936|            113235|HL Mountain Frame...|   809.76|       4|
|       71845|            112134|HL Mountain Frame...|   809.76|       3|
+------------+------------------+--------------------+---------+--------+
only showing top 20 rows

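Move filter to after select

The filter on Color still works although Color is not in the select list: Spark resolves it against the joined input, and the physical plan further down shows the filter pushed down to the Product scan.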

query = dfDetail.join(dfProduct, 'ProductID') \
        .select('SalesOrderID', 'SalesOrderDetailID', 'Name', 'UnitPrice', 'OrderQty') \
        .filter("Color='Black'")

query.show()
+------------+------------------+--------------------+---------+--------+
|SalesOrderID|SalesOrderDetailID|                Name|UnitPrice|OrderQty|
+------------+------------------+--------------------+---------+--------+
|       71938|            113295|Sport-100 Helmet,...|   29.994|       5|
|       71902|            112988|Sport-100 Helmet,...|   20.994|       4|
|       71797|            111082|Sport-100 Helmet,...|  20.2942|      12|
|       71784|            110795|Sport-100 Helmet,...|  20.2942|      12|
|       71783|            110752|Sport-100 Helmet,...|  20.2942|      11|
|       71782|            110690|Sport-100 Helmet,...|   20.994|       7|
|       71797|            111045|LL Road Frame - B...|  202.332|       3|
|       71783|            110730|LL Road Frame - B...|  202.332|       6|
|       71938|            113297|LL Road Frame - B...|  202.332|       3|
|       71915|            113090|LL Road Frame - B...|  202.332|       2|
|       71815|            111451|LL Road Frame - B...|  202.332|       1|
|       71797|            111044|LL Road Frame - B...|  202.332|       1|
|       71783|            110710|LL Road Frame - B...|  202.332|       4|
|       71936|            113260|HL Mountain Frame...|   809.76|       4|
|       71899|            112937|HL Mountain Frame...|   809.76|       1|
|       71845|            112137|HL Mountain Frame...|   809.76|       2|
|       71832|            111806|HL Mountain Frame...|   809.76|       4|
|       71780|            110622|HL Mountain Frame...|   809.76|       1|
|       71936|            113235|HL Mountain Frame...|   809.76|       4|
|       71845|            112134|HL Mountain Frame...|   809.76|       3|
+------------+------------------+--------------------+---------+--------+
only showing top 20 rows

Query plan

query.explain()
== Physical Plan ==
*(2) Project [SalesOrderID#183, SalesOrderDetailID#184, Name#134, UnitPrice#187, OrderQty#185]
+- *(2) BroadcastHashJoin [ProductID#186], [ProductID#133], Inner, BuildLeft
   :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[3, int, true] as bigint))), [id=#485]
   :  +- *(1) Project [SalesOrderID#183, SalesOrderDetailID#184, OrderQty#185, ProductID#186, UnitPrice#187]
   :     +- *(1) Filter isnotnull(ProductID#186)
   :        +- FileScan csv [SalesOrderID#183,SalesOrderDetailID#184,OrderQty#185,ProductID#186,UnitPrice#187] Batched: false, DataFilters: [isnotnull(ProductID#186)], Format: CSV, Location: InMemoryFileIndex[file:/content/drive/MyDrive/courses/HKUST/MSBD5003/data/SalesOrderDetail.csv], PartitionFilters: [], PushedFilters: [IsNotNull(ProductID)], ReadSchema: struct<SalesOrderID:int,SalesOrderDetailID:int,OrderQty:int,ProductID:int,UnitPrice:double>
   +- *(2) Project [ProductID#133, Name#134]
      +- *(2) Filter ((isnotnull(Color#136) AND (Color#136 = Black)) AND isnotnull(ProductID#133))
         +- FileScan csv [ProductID#133,Name#134,Color#136] Batched: false, DataFilters: [isnotnull(Color#136), (Color#136 = Black), isnotnull(ProductID#133)], Format: CSV, Location: InMemoryFileIndex[file:/content/drive/MyDrive/courses/HKUST/MSBD5003/data/Product.csv], PartitionFilters: [], PushedFilters: [IsNotNull(Color), EqualTo(Color,Black), IsNotNull(ProductID)], ReadSchema: struct<ProductID:int,Name:string,Color:string>

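Spark SQL optimizes a query differently depending on whether an intermediate DataFrame is cached. Here d1 is persisted, but the filter on Color (a column d1 does not expose) is still resolved against the underlying join, so the plan scans the source files rather than the cached d1. Filtering on a column that d1 does contain, as in the commented-out line, is expected to read from the cached data instead: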

d1 = dfDetail.join(dfProduct, 'ProductID') \
             .select('SalesOrderID', 'SalesOrderDetailID', 'Name', 'UnitPrice', 'OrderQty')
d1.persist()
DataFrame[SalesOrderID: int, SalesOrderDetailID: int, Name: string, UnitPrice: double, OrderQty: int]
d2 = d1.filter("Color = 'Black'")
#d2 = d1.filter("OrderQty >= 10")
d2.explain()
== Physical Plan ==
*(2) Project [SalesOrderID#183, SalesOrderDetailID#184, Name#134, UnitPrice#187, OrderQty#185]
+- *(2) BroadcastHashJoin [ProductID#186], [ProductID#133], Inner, BuildLeft
   :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[3, int, true] as bigint))), [id=#564]
   :  +- *(1) Project [SalesOrderID#183, SalesOrderDetailID#184, OrderQty#185, ProductID#186, UnitPrice#187]
   :     +- *(1) Filter isnotnull(ProductID#186)
   :        +- FileScan csv [SalesOrderID#183,SalesOrderDetailID#184,OrderQty#185,ProductID#186,UnitPrice#187] Batched: false, DataFilters: [isnotnull(ProductID#186)], Format: CSV, Location: InMemoryFileIndex[file:/content/drive/MyDrive/courses/HKUST/MSBD5003/data/SalesOrderDetail.csv], PartitionFilters: [], PushedFilters: [IsNotNull(ProductID)], ReadSchema: struct<SalesOrderID:int,SalesOrderDetailID:int,OrderQty:int,ProductID:int,UnitPrice:double>
   +- *(2) Project [ProductID#133, Name#134]
      +- *(2) Filter ((isnotnull(Color#136) AND (Color#136 = Black)) AND isnotnull(ProductID#133))
         +- FileScan csv [ProductID#133,Name#134,Color#136] Batched: false, DataFilters: [isnotnull(Color#136), (Color#136 = Black), isnotnull(ProductID#133)], Format: CSV, Location: InMemoryFileIndex[file:/content/drive/MyDrive/courses/HKUST/MSBD5003/data/Product.csv], PartitionFilters: [], PushedFilters: [IsNotNull(Color), EqualTo(Color,Black), IsNotNull(ProductID)], ReadSchema: struct<ProductID:int,Name:string,Color:string>

This raises an error, because Color does not exist in the result that was written to disk and read back:

d1 = dfDetail.join(dfProduct, 'ProductID') \
             .select('SalesOrderID', 'SalesOrderDetailID', 'Name', 'UnitPrice', 'OrderQty')
d1.write.csv('temp.csv', mode = 'overwrite', header = True)
d2 = spark.read.csv('temp.csv', header = True, inferSchema = True)
d2.filter("Color = 'Black'").show()
---------------------------------------------------------------------------

AnalysisException                         Traceback (most recent call last)

<ipython-input-26-30770f7af6e5> in <module>()
      2 d1.write.csv('temp.csv', mode = 'overwrite', header = True)
      3 d2 = spark.read.csv('temp.csv', header = True, inferSchema = True)
----> 4 d2.filter("Color = 'Black'").show()


/usr/local/lib/python3.6/dist-packages/pyspark/sql/dataframe.py in filter(self, condition)
   1457         """
   1458         if isinstance(condition, basestring):
-> 1459             jdf = self._jdf.filter(condition)
   1460         elif isinstance(condition, Column):
   1461             jdf = self._jdf.filter(condition._jc)


/usr/local/lib/python3.6/dist-packages/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:


/usr/local/lib/python3.6/dist-packages/pyspark/sql/utils.py in deco(*a, **kw)
    132                 # Hide where the exception came from that shows a non-Pythonic
    133                 # JVM exception message.
--> 134                 raise_from(converted)
    135             else:
    136                 raise


/usr/local/lib/python3.6/dist-packages/pyspark/sql/utils.py in raise_from(e)


AnalysisException: cannot resolve '`Color`' given input columns: [Name, OrderQty, SalesOrderDetailID, SalesOrderID, UnitPrice]; line 1 pos 0;
'Filter ('Color = Black)
+- Relation[SalesOrderID#959,SalesOrderDetailID#960,Name#961,UnitPrice#962,OrderQty#963] csv

Find all orders that include at least one black product; return the SalesOrderID of each such order

SELECT DISTINCT SalesOrderID
FROM SalesLT.SalesOrderDetail
JOIN SalesLT.Product ON SalesOrderDetail.ProductID = Product.ProductID
WHERE Color = 'Black'
dfDetail.join(dfProduct.filter("Color='Black'"), 'ProductID') \
        .select('SalesOrderID') \
        .distinct() \
        .show()
+------------+
|SalesOrderID|
+------------+
|       71902|
|       71832|
|       71915|
|       71831|
|       71898|
|       71935|
|       71938|
|       71845|
|       71783|
|       71815|
|       71936|
|       71863|
|       71780|
|       71782|
|       71899|
|       71784|
|       71797|
+------------+
SELECT COUNT(DISTINCT Color)
FROM SalesLT.Product

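The DataFrame version returns 10, which is 1 more than the SQL query: distinct() keeps NULL as a value of its own, whereas COUNT(DISTINCT Color) in standard SQL does not count NULLs.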

dfProduct.select('Color').distinct().count()
10
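The off-by-one can be reproduced without Spark. In this sketch, which uses Python's sqlite3 module and a handful of made-up rows, counting the rows of SELECT DISTINCT plays the role of select('Color').distinct().count(), while COUNT(DISTINCT Color) is the plain SQL aggregate.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Product (ProductID INTEGER, Color TEXT)")
# Hypothetical rows: two real colors, plus products with no color (NULL).
conn.executemany(
    "INSERT INTO Product VALUES (?, ?)",
    [(1, "Black"), (2, "Red"), (3, "Black"), (4, None), (5, None)],
)

# The aggregate skips NULLs entirely.
sql_count = conn.execute(
    "SELECT COUNT(DISTINCT Color) FROM Product"
).fetchone()[0]

# SELECT DISTINCT keeps a single NULL row, and COUNT(*) then counts that row too.
distinct_row_count = conn.execute(
    "SELECT COUNT(*) FROM (SELECT DISTINCT Color FROM Product)"
).fetchone()[0]

print(sql_count, distinct_row_count)
```

If the NULL group should be excluded on the DataFrame side as well, filtering out null colors before calling distinct() is one way to make the two counts agree.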

Find the total price of each order; return SalesOrderID and the total price (in a column named TotalPrice)

SELECT SalesOrderID, SUM(UnitPrice*OrderQty*(1-UnitPriceDiscount)) AS TotalPrice
FROM SalesLT.SalesOrderDetail
GROUP BY SalesOrderID
dfDetail.select('*', (dfDetail.UnitPrice * dfDetail.OrderQty
                      * (1 - dfDetail.UnitPriceDiscount)).alias('netprice'))\
        .groupBy('SalesOrderID').sum('netprice') \
        .withColumnRenamed('sum(netprice)', 'TotalPrice')\
        .show()
+------------+------------------+
|SalesOrderID|        TotalPrice|
+------------+------------------+
|       71867|             858.9|
|       71902|59894.209199999976|
|       71832|      28950.678108|
|       71915|1732.8899999999999|
|       71946|            31.584|
|       71895|221.25600000000003|
|       71816|2847.4079999999994|
|       71831|          1712.946|
|       71923|         96.108824|
|       71858|11528.844000000001|
|       71917|            37.758|
|       71897|          10585.05|
|       71885|           524.664|
|       71856|500.30400000000003|
|       71898| 53248.69200000002|
|       71774|           713.796|
|       71796| 47848.02600000001|
|       71935|5533.8689079999995|
|       71938|         74205.228|
|       71845|        34118.5356|
+------------+------------------+
only showing top 20 rows

Find the total price of each order where the total price > 10000

SELECT SalesOrderID, SUM(UnitPrice*OrderQty*(1-UnitPriceDiscount)) AS TotalPrice
FROM SalesLT.SalesOrderDetail
GROUP BY SalesOrderID
HAVING SUM(UnitPrice*OrderQty*(1-UnitPriceDiscount)) > 10000
dfDetail.select('*', (dfDetail.UnitPrice * dfDetail.OrderQty
                      * (1 - dfDetail.UnitPriceDiscount)).alias('netprice'))\
        .groupBy('SalesOrderID').sum('netprice') \
        .withColumnRenamed('sum(netprice)', 'TotalPrice')\
        .where('TotalPrice > 10000')\
        .show()
+------------+------------------+
|SalesOrderID|        TotalPrice|
+------------+------------------+
|       71902|59894.209199999976|
|       71832|      28950.678108|
|       71858|11528.844000000001|
|       71897|          10585.05|
|       71898| 53248.69200000002|
|       71796| 47848.02600000001|
|       71938|         74205.228|
|       71845|        34118.5356|
|       71783|      65683.367986|
|       71936| 79589.61602399996|
|       71780|29923.007999999998|
|       71782| 33319.98600000001|
|       71784| 89869.27631400003|
|       71797| 65123.46341800001|
+------------+------------------+

Find the total price on the black products of each order where the total price > 10000

SELECT SalesOrderID, SUM(UnitPrice*OrderQty*(1-UnitPriceDiscount)) AS TotalPrice
FROM SalesLT.SalesOrderDetail, SalesLT.Product
WHERE SalesLT.SalesOrderDetail.ProductID = SalesLT.Product.ProductID AND Color = 'Black'
GROUP BY SalesOrderID
HAVING SUM(UnitPrice*OrderQty*(1-UnitPriceDiscount)) > 10000
dfDetail.select('*', (dfDetail.UnitPrice * dfDetail.OrderQty
                      * (1 - dfDetail.UnitPriceDiscount)).alias('netprice'))\
        .join(dfProduct, 'ProductID') \
        .where("Color = 'Black'")\
        .groupBy('SalesOrderID').sum('netprice') \
        .withColumnRenamed('sum(netprice)', 'TotalPrice')\
        .where('TotalPrice > 10000')\
        .show()
+------------+------------------+
|SalesOrderID|        TotalPrice|
+------------+------------------+
|       71902|26677.883999999995|
|       71832|      16883.748108|
|       71938|         33824.448|
|       71845|         18109.836|
|       71783|15524.117476000003|
|       71936|      44490.290424|
|       71780|         16964.322|
|       71797|      27581.613792|
+------------+------------------+

For each customer, find the total quantity of black products bought. Report CustomerID, FirstName, LastName, and total quantity

select saleslt.customer.customerid, FirstName, LastName, sum(orderqty)
from saleslt.customer
left outer join 
(
saleslt.salesorderheader
join saleslt.salesorderdetail
on saleslt.salesorderdetail.salesorderid = saleslt.salesorderheader.salesorderid
join saleslt.product
on saleslt.product.productid = saleslt.salesorderdetail.productid and color = 'black'
)
on saleslt.customer.customerid = saleslt.salesorderheader.customerid
group by saleslt.customer.customerid, FirstName, LastName
order by sum(orderqty) desc
d1 = dfDetail.join(dfProduct, 'ProductID')\
             .where('Color = "Black"') \
             .join(dfHeader, 'SalesOrderID')\
             .groupBy('CustomerID').sum('OrderQty')
dfCustomer.join(d1, 'CustomerID', 'left_outer')\
          .select('CustomerID', 'FirstName', 'LastName', 'sum(OrderQty)')\
          .orderBy('sum(OrderQty)', ascending=False)\
          .show()
+----------+------------+------------+-------------+
|CustomerID|   FirstName|    LastName|sum(OrderQty)|
+----------+------------+------------+-------------+
|     30050|     Krishna|Sunkammurali|           89|
|     29796|         Jon|      Grande|           65|
|     29957|       Kevin|         Liu|           62|
|     29929|     Jeffrey|       Kurtz|           46|
|     29546| Christopher|        Beck|           45|
|     29922|      Pamala|        Kotc|           34|
|     30113|        Raja|   Venugopal|           34|
|     29938|       Frank|    Campbell|           29|
|     29736|       Terry|   Eminhizer|           23|
|     29485|   Catherine|        Abel|           10|
|     30019|     Matthew|      Miller|            9|
|     29932|     Rebecca|      Laszlo|            7|
|     29975|      Walter|        Mays|            5|
|     29638|    Rosmarie|     Carroll|            2|
|     30089|Michael John|      Troyer|            1|
|     29531|        Cory|       Booth|            1|
|     29568|      Donald|     Blanton|            1|
|       137|       Gytis|   Barzdukas|         null|
|     29834|      Cheryl|     Herring|         null|
|       451|        John|       Emory|         null|
+----------+------------+------------+-------------+
only showing top 20 rows

Run SQL Query

df.createOrReplaceTempView('HVAC')
spark.sql('SELECT * FROM HVAC WHERE BuildingAge >= 10').show()
+----------+-----------+-----------+-----------+------------+
|BuildingID|BuildingMgr|BuildingAge|HVACproduct|     Country|
+----------+-----------+-----------+-----------+------------+
|         1|         M1|         25|     AC1000|         USA|
|         2|         M2|         27|     FN39TG|      France|
|         3|         M3|         28|     JDNS77|      Brazil|
|         4|         M4|         17|     GG1919|     Finland|
|         7|         M7|         13|     FN39TG|South Africa|
|         8|         M8|         25|     JDNS77|   Australia|
|         9|         M9|         11|     GG1919|      Mexico|
|        10|        M10|         23|    ACMAX22|       China|
|        11|        M11|         14|     AC1000|     Belgium|
|        12|        M12|         26|     FN39TG|     Finland|
|        13|        M13|         25|     JDNS77|Saudi Arabia|
|        14|        M14|         17|     GG1919|     Germany|
|        15|        M15|         19|    ACMAX22|      Israel|
|        16|        M16|         23|     AC1000|      Turkey|
|        17|        M17|         11|     FN39TG|       Egypt|
|        18|        M18|         25|     JDNS77|   Indonesia|
|        19|        M19|         14|     GG1919|      Canada|
|        20|        M20|         19|    ACMAX22|   Argentina|
+----------+-----------+-----------+-----------+------------+

Mix DF with SQL

df.where('BuildingAge >= 10').createOrReplaceTempView('OldBuildings')
spark.sql('SELECT HVACproduct, COUNT(*) FROM OldBuildings GROUP BY HVACproduct').show()
+-----------+--------+
|HVACproduct|count(1)|
+-----------+--------+
|    ACMAX22|       3|
|     AC1000|       3|
|     JDNS77|       4|
|     FN39TG|       4|
|     GG1919|       4|
+-----------+--------+
d1 = spark.sql('SELECT * FROM HVAC WHERE BuildingAge >= 10')
d1.groupBy('HVACproduct').count().show()
+-----------+-----+
|HVACproduct|count|
+-----------+-----+
|    ACMAX22|    3|
|     AC1000|    3|
|     JDNS77|    4|
|     FN39TG|    4|
|     GG1919|    4|
+-----------+-----+

User Defined Function

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

slen = udf(lambda s: len(s)+2, IntegerType())
df.select('*', slen(df['Country']).alias('slen')).show()
+----------+-----------+-----------+-----------+------------+----+
|BuildingID|BuildingMgr|BuildingAge|HVACproduct|     Country|slen|
+----------+-----------+-----------+-----------+------------+----+
|         1|         M1|         25|     AC1000|         USA|   5|
|         2|         M2|         27|     FN39TG|      France|   8|
|         3|         M3|         28|     JDNS77|      Brazil|   8|
|         4|         M4|         17|     GG1919|     Finland|   9|
|         5|         M5|          3|    ACMAX22|   Hong Kong|  11|
|         6|         M6|          9|     AC1000|   Singapore|  11|
|         7|         M7|         13|     FN39TG|South Africa|  14|
|         8|         M8|         25|     JDNS77|   Australia|  11|
|         9|         M9|         11|     GG1919|      Mexico|   8|
|        10|        M10|         23|    ACMAX22|       China|   7|
|        11|        M11|         14|     AC1000|     Belgium|   9|
|        12|        M12|         26|     FN39TG|     Finland|   9|
|        13|        M13|         25|     JDNS77|Saudi Arabia|  14|
|        14|        M14|         17|     GG1919|     Germany|   9|
|        15|        M15|         19|    ACMAX22|      Israel|   8|
|        16|        M16|         23|     AC1000|      Turkey|   8|
|        17|        M17|         11|     FN39TG|       Egypt|   7|
|        18|        M18|         25|     JDNS77|   Indonesia|  11|
|        19|        M19|         14|     GG1919|      Canada|   8|
|        20|        M20|         19|    ACMAX22|   Argentina|  11|
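+----------+-----------+-----------+-----------+------------+----+

A UDF can also be registered by name so that SQL queries can call it (this version returns the plain length, without the +2):

spark.udf.register('slen', lambda s: len(s), IntegerType())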
spark.sql('SELECT *, slen(Country) AS slen FROM HVAC').show()
+----------+-----------+-----------+-----------+------------+----+
|BuildingID|BuildingMgr|BuildingAge|HVACproduct|     Country|slen|
+----------+-----------+-----------+-----------+------------+----+
|         1|         M1|         25|     AC1000|         USA|   3|
|         2|         M2|         27|     FN39TG|      France|   6|
|         3|         M3|         28|     JDNS77|      Brazil|   6|
|         4|         M4|         17|     GG1919|     Finland|   7|
|         5|         M5|          3|    ACMAX22|   Hong Kong|   9|
|         6|         M6|          9|     AC1000|   Singapore|   9|
|         7|         M7|         13|     FN39TG|South Africa|  12|
|         8|         M8|         25|     JDNS77|   Australia|   9|
|         9|         M9|         11|     GG1919|      Mexico|   6|
|        10|        M10|         23|    ACMAX22|       China|   5|
|        11|        M11|         14|     AC1000|     Belgium|   7|
|        12|        M12|         26|     FN39TG|     Finland|   7|
|        13|        M13|         25|     JDNS77|Saudi Arabia|  12|
|        14|        M14|         17|     GG1919|     Germany|   7|
|        15|        M15|         19|    ACMAX22|      Israel|   6|
|        16|        M16|         23|     AC1000|      Turkey|   6|
|        17|        M17|         11|     FN39TG|       Egypt|   5|
|        18|        M18|         25|     JDNS77|   Indonesia|   9|
|        19|        M19|         14|     GG1919|      Canada|   6|
|        20|        M20|         19|    ACMAX22|   Argentina|   9|
+----------+-----------+-----------+-----------+------------+----+
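
Both lambdas above call `len` directly, which raises a `TypeError` if a value is null, because a SQL NULL reaches the Python function as `None`. A minimal sketch of a null-tolerant helper follows, assuming that returning `None` for a null input is acceptable. `safe_len` is a hypothetical name, and the helper is plain Python so it can be checked without a Spark session:

```python
def safe_len(s):
    """Return the length of s, or None when s is None."""
    return None if s is None else len(s)

# Two real country names from the table plus a missing value.
lengths = [safe_len(v) for v in ["USA", "Hong Kong", None]]
print(lengths)  # [3, 9, None]

# Wrapping it for Spark would follow the pattern already used above, e.g.
#   slen = udf(safe_len, IntegerType())
```

For a plain string length, the built-in `pyspark.sql.functions.length` avoids the Python UDF altogether.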

Convert between RDD and DataFrame

From DataFrame to RDD

rdd = df.rdd
rdd.take(10)
[Row(BuildingID=1, BuildingMgr='M1', BuildingAge=25, HVACproduct='AC1000', Country='USA'),
 Row(BuildingID=2, BuildingMgr='M2', BuildingAge=27, HVACproduct='FN39TG', Country='France'),
 Row(BuildingID=3, BuildingMgr='M3', BuildingAge=28, HVACproduct='JDNS77', Country='Brazil'),
 Row(BuildingID=4, BuildingMgr='M4', BuildingAge=17, HVACproduct='GG1919', Country='Finland'),
 Row(BuildingID=5, BuildingMgr='M5', BuildingAge=3, HVACproduct='ACMAX22', Country='Hong Kong'),
 Row(BuildingID=6, BuildingMgr='M6', BuildingAge=9, HVACproduct='AC1000', Country='Singapore'),
 Row(BuildingID=7, BuildingMgr='M7', BuildingAge=13, HVACproduct='FN39TG', Country='South Africa'),
 Row(BuildingID=8, BuildingMgr='M8', BuildingAge=25, HVACproduct='JDNS77', Country='Australia'),
 Row(BuildingID=9, BuildingMgr='M9', BuildingAge=11, HVACproduct='GG1919', Country='Mexico'),
 Row(BuildingID=10, BuildingMgr='M10', BuildingAge=23, HVACproduct='ACMAX22', Country='China')]

From RDD to DataFrame

df = spark.createDataFrame(rdd)
df.printSchema()
df.show()
root
 |-- BuildingID: long (nullable = true)
 |-- BuildingMgr: string (nullable = true)
 |-- BuildingAge: long (nullable = true)
 |-- HVACproduct: string (nullable = true)
 |-- Country: string (nullable = true)

+----------+-----------+-----------+-----------+------------+
|BuildingID|BuildingMgr|BuildingAge|HVACproduct|     Country|
+----------+-----------+-----------+-----------+------------+
|         1|         M1|         25|     AC1000|         USA|
|         2|         M2|         27|     FN39TG|      France|
|         3|         M3|         28|     JDNS77|      Brazil|
|         4|         M4|         17|     GG1919|     Finland|
|         5|         M5|          3|    ACMAX22|   Hong Kong|
|         6|         M6|          9|     AC1000|   Singapore|
|         7|         M7|         13|     FN39TG|South Africa|
|         8|         M8|         25|     JDNS77|   Australia|
|         9|         M9|         11|     GG1919|      Mexico|
|        10|        M10|         23|    ACMAX22|       China|
|        11|        M11|         14|     AC1000|     Belgium|
|        12|        M12|         26|     FN39TG|     Finland|
|        13|        M13|         25|     JDNS77|Saudi Arabia|
|        14|        M14|         17|     GG1919|     Germany|
|        15|        M15|         19|    ACMAX22|      Israel|
|        16|        M16|         23|     AC1000|      Turkey|
|        17|        M17|         11|     FN39TG|       Egypt|
|        18|        M18|         25|     JDNS77|   Indonesia|
|        19|        M19|         14|     GG1919|      Canada|
|        20|        M20|         19|    ACMAX22|   Argentina|
+----------+-----------+-----------+-----------+------------+

Note:

Cached DataFrames are stored in a compressed, columnar in-memory format

RDDs are stored row by row, without compression by default

The RDD view of a DataFrame (df.rdd) only provides an interface: the Row objects are constructed on the fly and do not necessarily reflect the internal storage format of the data

Closure in DataFrames

data = range(10)
df = spark.createDataFrame(zip(data, data))
df.printSchema()
df.show()
root
 |-- _1: long (nullable = true)
 |-- _2: long (nullable = true)

+---+---+
| _1| _2|
+---+---+
|  0|  0|
|  1|  1|
|  2|  2|
|  3|  3|
|  4|  4|
|  5|  5|
|  6|  6|
|  7|  7|
|  8|  8|
|  9|  9|
+---+---+

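The 'closure' behaviour seen with RDDs does not appear with DataFrames: df._1 < x is evaluated as soon as the line runs, so the current value of x is embedded as a literal in the plan that Catalyst later optimizes, and later changes to x have no effect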

# Using DataFrame
x = 5
df1 = df.filter(df._1 < x)
df1.show()
x = 3
df1.show()
+---+---+
| _1| _2|
+---+---+
|  0|  0|
|  1|  1|
|  2|  2|
|  3|  3|
|  4|  4|
+---+---+

+---+---+
| _1| _2|
+---+---+
|  0|  0|
|  1|  1|
|  2|  2|
|  3|  3|
|  4|  4|
+---+---+

With an RDD, a is recomputed for each action (twice here), and the lambda reads whatever value x has at that time, so changing x changes the second result

# Using RDD
rdd = sc.parallelize(range(10))
x = 5
a = rdd.filter(lambda z: z < x)
print(a.take(10))
x = 3
print(a.take(10))
[0, 1, 2, 3, 4]
[0, 1, 2]
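
The difference between the two outputs comes down to when `x` is read. The sketch below reproduces it in plain Python, with no Spark involved: `take_filtered` is a hypothetical stand-in for an action that re-runs the filter on every call, `late` looks `x` up when it is called (like the RDD lambda), and `frozen` fixes the value at definition time (like the DataFrame expression).

```python
data = list(range(10))

def take_filtered(pred, n):
    """Re-evaluate the filter on every call, as an action re-runs its lineage."""
    return [z for z in data if pred(z)][:n]

x = 5
late = lambda z: z < x                  # x is looked up each time the lambda runs
frozen = lambda z, limit=x: z < limit   # limit is fixed to 5 at definition time

before = (take_filtered(late, 10), take_filtered(frozen, 10))
x = 3
after = (take_filtered(late, 10), take_filtered(frozen, 10))

print(before)  # ([0, 1, 2, 3, 4], [0, 1, 2, 3, 4])
print(after)   # ([0, 1, 2], [0, 1, 2, 3, 4])
```

Binding the value through a default argument, as `frozen` does, is one way to make an RDD lambda insensitive to later changes of `x`.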
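Plain Python calls inside a column expression, such as f() below, run once on the driver when the expression is built; the physical plan contains only the resulting constant (2.5)

def f():
    return x / 2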
x = 5
df1 = df.select(df._1 * 2 + f() + 1 + 1)
df1.explain()
df1.show()
== Physical Plan ==
*(1) Project [(((cast((_1#1540L * 2) as double) + 2.5) + 1.0) + 1.0) AS ((((_1 * 2) + 2.5) + 1) + 1)#1573]
+- *(1) Scan ExistingRDD[_1#1540L,_2#1541L]


+----------------------------+
|((((_1 * 2) + 2.5) + 1) + 1)|
+----------------------------+
|                         4.5|
|                         6.5|
|                         8.5|
|                        10.5|
|                        12.5|
|                        14.5|
|                        16.5|
|                        18.5|
|                        20.5|
|                        22.5|
+----------------------------+

Updating a driver-side global from inside foreach has no visible effect on the driver:

counter = 0

def increment_counter(x):
    global counter
    counter += 1

df.foreach(increment_counter)

print(counter)
0
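
The result is 0 because `increment_counter` runs in worker processes, each of which updates its own copy of `counter`; the driver's variable is never touched. The sketch below imitates that with plain Python and `pickle` rather than Spark. `state` and `run_on_worker` are hypothetical names used only for illustration.

```python
import pickle

state = {"counter": 0}  # driver-side state

def run_on_worker(payload, rows):
    """Stand-in for an executor: it only ever sees a deserialized copy."""
    worker_state = pickle.loads(payload)
    for _ in rows:
        worker_state["counter"] += 1
    return worker_state["counter"]

worker_total = run_on_worker(pickle.dumps(state), range(10))

print(worker_total)      # 10, counted on the worker's copy
print(state["counter"])  # 0, the driver's value is untouched
```

In Spark itself, an accumulator created with `sc.accumulator(0)` is the supported way to collect such counts on the driver.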