DE1 — Lab 3: Physical Representations and Batch II Costs¶
Author : Badr TAJINI - Data Engineering I - ESIEE 2025-2026
Execute all cells. Capture plans and Spark UI evidence.
0. Setup and explicit schema¶
In [1]:
from pyspark.sql import SparkSession, functions as F, types as T
spark = SparkSession.builder.appName("de1-lab3").getOrCreate()
clicks_schema = T.StructType([
T.StructField("prev_title", T.StringType(), True),
T.StructField("curr_title", T.StringType(), True),
T.StructField("type", T.StringType(), True),
T.StructField("n", T.IntegerType(), True),
T.StructField("ts", T.TimestampType(), True),
])
dim_schema = T.StructType([
T.StructField("curr_title", T.StringType(), True),
T.StructField("curr_category", T.StringType(), True),
])
WARNING: Using incubator modules: jdk.incubator.vector Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties 25/10/27 19:04:25 WARN Utils: Your hostname, OrdideJustine, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo) 25/10/27 19:04:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 25/10/27 19:04:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
1. Ingest monthly CSVs (row format baseline)¶
In [2]:
base = "data/"
paths = [f"{base}lab3_clicks_2025-05.csv", f"{base}lab3_clicks_2025-06.csv", f"{base}lab3_clicks_2025-07.csv"]
row_df = (spark.read.schema(clicks_schema).option("header","true").csv(paths)
.withColumn("year", F.year("ts")).withColumn("month", F.month("ts")))
row_df.cache()
print("Rows:", row_df.count())
row_df.printSchema()
row_df.show(5, truncate=False)
[Stage 0:> (0 + 3) / 3]
Rows: 15000 root |-- prev_title: string (nullable = true) |-- curr_title: string (nullable = true) |-- type: string (nullable = true) |-- n: integer (nullable = true) |-- ts: timestamp (nullable = true) |-- year: integer (nullable = true) |-- month: integer (nullable = true) +-----------------------------+--------------+----+---+-------------------+----+-----+ |prev_title |curr_title |type|n |ts |year|month| +-----------------------------+--------------+----+---+-------------------+----+-----+ |ETL |PySpark |link|431|2025-06-01 02:57:00|2025|6 | |Data_engineering |Broadcast_join|link|347|2025-06-15 13:40:00|2025|6 | |Python_(programming_language)|MapReduce |link|39 |2025-06-07 15:14:00|2025|6 | |ETL |Data_warehouse|link|401|2025-06-07 04:59:00|2025|6 | |Python_(programming_language)|Dataframe |link|155|2025-06-06 06:40:00|2025|6 | +-----------------------------+--------------+----+---+-------------------+----+-----+ only showing top 5 rows
In [3]:
import requests
import pandas as pd
import os
from datetime import datetime
def export_lab3_metrics(spark, run_id, query, representation="", notes="", output_csv="lab3_metrics_log.csv"):
"""
Export selected Spark metrics to a CSV for Lab 3.
- spark: active SparkSession
- run_id: identifier for the run (ex: 'r1')
- query: query name (Q1, Q2, Q3, Join)
- representation: 'row' or 'column', or 'normal'/'broadcast' for Join
- notes: optional comment
- output_csv: path to CSV file
"""
ui_url = spark.sparkContext.uiWebUrl
app_id = spark.sparkContext.applicationId
if not ui_url:
print("No Spark UI detected. Make sure a job is running.")
return
try:
stages_url = f"{ui_url}/api/v1/applications/{app_id}/stages"
stages = requests.get(stages_url).json()
except Exception as e:
print(f"Error accessing Spark API: {e}")
return
data = []
for s in stages:
input_bytes = s.get("inputBytes", 0)
shuffle_read = s.get("shuffleReadBytes", 0)
shuffle_write = s.get("shuffleWriteBytes", 0)
files_read = s.get("inputRecords", 0)
data.append({
"run_id": run_id,
"query": query,
"representation": representation,
"files_read": files_read,
"input_size_bytes": input_bytes,
"shuffle_read_bytes": shuffle_read,
"shuffle_write_bytes": shuffle_write,
"notes": notes,
"timestamp": datetime.now().isoformat()
})
df = pd.DataFrame(data)
if df.empty:
print(f"No metrics found for {query}-{representation}")
return
if os.path.exists(output_csv):
df.to_csv(output_csv, mode="a", header=False, index=False)
else:
df.to_csv(output_csv, index=False)
print(f"{len(df)} rows exported to {output_csv}")
Evidence: row representation plan¶
In [4]:
# Query Q1: top transitions per month for 'link'
q1_row = (row_df.filter(F.col("type")=="link")
.groupBy("year","month","prev_title","curr_title")
.agg(F.sum("n").alias("n"))
.orderBy(F.desc("n"))
.limit(50))
q1_row.explain("formatted")
export_lab3_metrics(spark, "r1", "Q1", representation="row")
import pathlib, datetime as _dt
pathlib.Path("proof").mkdir(exist_ok=True)
with open("proof/plan_row.txt","w") as f:
f.write(str(_dt.datetime.now())+"\n")
f.write(q1_row._jdf.queryExecution().executedPlan().toString())
print("Saved proof/plan_row.txt")
== Physical Plan ==
AdaptiveSparkPlan (11)
+- TakeOrderedAndProject (10)
+- HashAggregate (9)
+- Exchange (8)
+- HashAggregate (7)
+- Project (6)
+- Filter (5)
+- InMemoryTableScan (1)
+- InMemoryRelation (2)
+- * Project (4)
+- Scan csv (3)
(1) InMemoryTableScan
Output [6]: [curr_title#1, month#7, n#3, prev_title#0, type#2, year#6]
Arguments: [curr_title#1, month#7, n#3, prev_title#0, type#2, year#6], [isnotnull(type#2), (type#2 = link)]
(2) InMemoryRelation
Arguments: [prev_title#0, curr_title#1, type#2, n#3, ts#4, year#6, month#7], StorageLevel(disk, memory, deserialized, 1 replicas)
(3) Scan csv
Output [5]: [prev_title#0, curr_title#1, type#2, n#3, ts#4]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab3/data/lab3_clicks_2025-05.csv, ... 2 entries]
ReadSchema: struct<prev_title:string,curr_title:string,type:string,n:int,ts:timestamp>
(4) Project [codegen id : 1]
Output [7]: [prev_title#0, curr_title#1, type#2, n#3, ts#4, year(cast(ts#4 as date)) AS year#6, month(cast(ts#4 as date)) AS month#7]
Input [5]: [prev_title#0, curr_title#1, type#2, n#3, ts#4]
(5) Filter
Input [6]: [curr_title#1, month#7, n#3, prev_title#0, type#2, year#6]
Condition : (isnotnull(type#2) AND (type#2 = link))
(6) Project
Output [5]: [prev_title#0, curr_title#1, n#3, year#6, month#7]
Input [6]: [curr_title#1, month#7, n#3, prev_title#0, type#2, year#6]
(7) HashAggregate
Input [5]: [prev_title#0, curr_title#1, n#3, year#6, month#7]
Keys [4]: [year#6, month#7, prev_title#0, curr_title#1]
Functions [1]: [partial_sum(n#3)]
Aggregate Attributes [1]: [sum#510L]
Results [5]: [year#6, month#7, prev_title#0, curr_title#1, sum#511L]
(8) Exchange
Input [5]: [year#6, month#7, prev_title#0, curr_title#1, sum#511L]
Arguments: hashpartitioning(year#6, month#7, prev_title#0, curr_title#1, 200), ENSURE_REQUIREMENTS, [plan_id=85]
(9) HashAggregate
Input [5]: [year#6, month#7, prev_title#0, curr_title#1, sum#511L]
Keys [4]: [year#6, month#7, prev_title#0, curr_title#1]
Functions [1]: [sum(n#3)]
Aggregate Attributes [1]: [sum(n#3)#404L]
Results [5]: [year#6, month#7, prev_title#0, curr_title#1, sum(n#3)#404L AS n#396L]
(10) TakeOrderedAndProject
Input [5]: [year#6, month#7, prev_title#0, curr_title#1, n#396L]
Arguments: 50, [n#396L DESC NULLS LAST], [year#6, month#7, prev_title#0, curr_title#1, n#396L]
(11) AdaptiveSparkPlan
Output [5]: [year#6, month#7, prev_title#0, curr_title#1, n#396L]
Arguments: isFinalPlan=false
5 rows exported to lab3_metrics_log.csv
Saved proof/plan_row.txt
2. Column representation: Parquet with partitioning and optional sort¶
In [5]:
from pyspark.sql.types import IntegerType
col_base = "outputs/columnar"
# Write columnar
(row_df
.write.mode("overwrite")
.partitionBy("year","month")
.parquet(f"{col_base}/clicks_parquet"))
# Re‑read columnar for fair comparison
col_df = spark.read.schema(clicks_schema.add("year",IntegerType()).add("month",IntegerType())).parquet(f"{col_base}/clicks_parquet")
col_df.cache()
print("Columnar rows:", col_df.count())
Columnar rows: 15000
Evidence: column representation plan¶
In [6]:
q1_col = (col_df.filter(F.col("type")=="link")
.groupBy("year","month","prev_title","curr_title")
.agg(F.sum("n").alias("n"))
.orderBy(F.desc("n"))
.limit(50))
q1_col.explain("formatted")
export_lab3_metrics(spark, "r1", "Q1", representation="column")
with open("proof/plan_column.txt","w") as f:
from datetime import datetime as _dt
f.write(str(_dt.now())+"\n")
f.write(q1_col._jdf.queryExecution().executedPlan().toString())
print("Saved proof/plan_column.txt")
== Physical Plan ==
AdaptiveSparkPlan (11)
+- TakeOrderedAndProject (10)
+- HashAggregate (9)
+- Exchange (8)
+- HashAggregate (7)
+- Project (6)
+- Filter (5)
+- InMemoryTableScan (1)
+- InMemoryRelation (2)
+- * ColumnarToRow (4)
+- Scan parquet (3)
(1) InMemoryTableScan
Output [6]: [curr_title#653, month#658, n#655, prev_title#652, type#654, year#657]
Arguments: [curr_title#653, month#658, n#655, prev_title#652, type#654, year#657], [isnotnull(type#654), (type#654 = link)]
(2) InMemoryRelation
Arguments: [prev_title#652, curr_title#653, type#654, n#655, ts#656, year#657, month#658], StorageLevel(disk, memory, deserialized, 1 replicas)
(3) Scan parquet
Output [7]: [prev_title#652, curr_title#653, type#654, n#655, ts#656, year#657, month#658]
Batched: true
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab3/outputs/columnar/clicks_parquet]
ReadSchema: struct<prev_title:string,curr_title:string,type:string,n:int,ts:timestamp>
(4) ColumnarToRow [codegen id : 1]
Input [7]: [prev_title#652, curr_title#653, type#654, n#655, ts#656, year#657, month#658]
(5) Filter
Input [6]: [curr_title#653, month#658, n#655, prev_title#652, type#654, year#657]
Condition : (isnotnull(type#654) AND (type#654 = link))
(6) Project
Output [5]: [prev_title#652, curr_title#653, n#655, year#657, month#658]
Input [6]: [curr_title#653, month#658, n#655, prev_title#652, type#654, year#657]
(7) HashAggregate
Input [5]: [prev_title#652, curr_title#653, n#655, year#657, month#658]
Keys [4]: [year#657, month#658, prev_title#652, curr_title#653]
Functions [1]: [partial_sum(n#655)]
Aggregate Attributes [1]: [sum#995L]
Results [5]: [year#657, month#658, prev_title#652, curr_title#653, sum#996L]
(8) Exchange
Input [5]: [year#657, month#658, prev_title#652, curr_title#653, sum#996L]
Arguments: hashpartitioning(year#657, month#658, prev_title#652, curr_title#653, 200), ENSURE_REQUIREMENTS, [plan_id=181]
(9) HashAggregate
Input [5]: [year#657, month#658, prev_title#652, curr_title#653, sum#996L]
Keys [4]: [year#657, month#658, prev_title#652, curr_title#653]
Functions [1]: [sum(n#655)]
Aggregate Attributes [1]: [sum(n#655)#889L]
Results [5]: [year#657, month#658, prev_title#652, curr_title#653, sum(n#655)#889L AS n#881L]
(10) TakeOrderedAndProject
Input [5]: [year#657, month#658, prev_title#652, curr_title#653, n#881L]
Arguments: 50, [n#881L DESC NULLS LAST], [year#657, month#658, prev_title#652, curr_title#653, n#881L]
(11) AdaptiveSparkPlan
Output [5]: [year#657, month#658, prev_title#652, curr_title#653, n#881L]
Arguments: isFinalPlan=false
10 rows exported to lab3_metrics_log.csv
Saved proof/plan_column.txt
3. Join strategy: normal vs broadcast¶
In [7]:
dim = spark.read.schema(dim_schema).option("header","true").csv("data/lab3_dim_curr_category.csv")
# Non‑broadcast join
j1 = (col_df.join(dim, "curr_title", "left")
.groupBy("curr_category")
.agg(F.sum("n").alias("total_n"))
.orderBy(F.desc("total_n")))
j1.explain("formatted")
export_lab3_metrics(spark, "r1", "Join", representation="normal")
# Broadcast join
from pyspark.sql.functions import broadcast
j2 = (col_df.join(broadcast(dim), "curr_title", "left")
.groupBy("curr_category")
.agg(F.sum("n").alias("total_n"))
.orderBy(F.desc("total_n")))
j2.explain("formatted")
export_lab3_metrics(spark, "r1", "Join", representation="broadcast")
# Save one plan for evidence
with open("proof/plan_broadcast.txt","w") as f:
from datetime import datetime as _dt
f.write(str(_dt.now())+"\n")
f.write(j2._jdf.queryExecution().executedPlan().toString())
print("Saved proof/plan_broadcast.txt")
== Physical Plan ==
AdaptiveSparkPlan (15)
+- Sort (14)
+- Exchange (13)
+- HashAggregate (12)
+- Exchange (11)
+- HashAggregate (10)
+- Project (9)
+- BroadcastHashJoin LeftOuter BuildRight (8)
:- InMemoryTableScan (1)
: +- InMemoryRelation (2)
: +- * ColumnarToRow (4)
: +- Scan parquet (3)
+- BroadcastExchange (7)
+- Filter (6)
+- Scan csv (5)
(1) InMemoryTableScan
Output [2]: [curr_title#653, n#655]
Arguments: [curr_title#653, n#655]
(2) InMemoryRelation
Arguments: [prev_title#652, curr_title#653, type#654, n#655, ts#656, year#657, month#658], StorageLevel(disk, memory, deserialized, 1 replicas)
(3) Scan parquet
Output [7]: [prev_title#652, curr_title#653, type#654, n#655, ts#656, year#657, month#658]
Batched: true
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab3/outputs/columnar/clicks_parquet]
ReadSchema: struct<prev_title:string,curr_title:string,type:string,n:int,ts:timestamp>
(4) ColumnarToRow [codegen id : 1]
Input [7]: [prev_title#652, curr_title#653, type#654, n#655, ts#656, year#657, month#658]
(5) Scan csv
Output [2]: [curr_title#997, curr_category#998]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab3/data/lab3_dim_curr_category.csv]
PushedFilters: [IsNotNull(curr_title)]
ReadSchema: struct<curr_title:string,curr_category:string>
(6) Filter
Input [2]: [curr_title#997, curr_category#998]
Condition : isnotnull(curr_title#997)
(7) BroadcastExchange
Input [2]: [curr_title#997, curr_category#998]
Arguments: HashedRelationBroadcastMode(List(input[0, string, false]),false), [plan_id=220]
(8) BroadcastHashJoin
Left keys [1]: [curr_title#653]
Right keys [1]: [curr_title#997]
Join type: LeftOuter
Join condition: None
(9) Project
Output [2]: [n#655, curr_category#998]
Input [4]: [curr_title#653, n#655, curr_title#997, curr_category#998]
(10) HashAggregate
Input [2]: [n#655, curr_category#998]
Keys [1]: [curr_category#998]
Functions [1]: [partial_sum(n#655)]
Aggregate Attributes [1]: [sum#1115L]
Results [2]: [curr_category#998, sum#1116L]
(11) Exchange
Input [2]: [curr_category#998, sum#1116L]
Arguments: hashpartitioning(curr_category#998, 200), ENSURE_REQUIREMENTS, [plan_id=225]
(12) HashAggregate
Input [2]: [curr_category#998, sum#1116L]
Keys [1]: [curr_category#998]
Functions [1]: [sum(n#655)]
Aggregate Attributes [1]: [sum(n#655)#1009L]
Results [2]: [curr_category#998, sum(n#655)#1009L AS total_n#1000L]
(13) Exchange
Input [2]: [curr_category#998, total_n#1000L]
Arguments: rangepartitioning(total_n#1000L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=228]
(14) Sort
Input [2]: [curr_category#998, total_n#1000L]
Arguments: [total_n#1000L DESC NULLS LAST], true, 0
(15) AdaptiveSparkPlan
Output [2]: [curr_category#998, total_n#1000L]
Arguments: isFinalPlan=false
10 rows exported to lab3_metrics_log.csv
== Physical Plan ==
AdaptiveSparkPlan (15)
+- Sort (14)
+- Exchange (13)
+- HashAggregate (12)
+- Exchange (11)
+- HashAggregate (10)
+- Project (9)
+- BroadcastHashJoin LeftOuter BuildRight (8)
:- InMemoryTableScan (1)
: +- InMemoryRelation (2)
: +- * ColumnarToRow (4)
: +- Scan parquet (3)
+- BroadcastExchange (7)
+- Filter (6)
+- Scan csv (5)
(1) InMemoryTableScan
Output [2]: [curr_title#653, n#655]
Arguments: [curr_title#653, n#655]
(2) InMemoryRelation
Arguments: [prev_title#652, curr_title#653, type#654, n#655, ts#656, year#657, month#658], StorageLevel(disk, memory, deserialized, 1 replicas)
(3) Scan parquet
Output [7]: [prev_title#652, curr_title#653, type#654, n#655, ts#656, year#657, month#658]
Batched: true
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab3/outputs/columnar/clicks_parquet]
ReadSchema: struct<prev_title:string,curr_title:string,type:string,n:int,ts:timestamp>
(4) ColumnarToRow [codegen id : 1]
Input [7]: [prev_title#652, curr_title#653, type#654, n#655, ts#656, year#657, month#658]
(5) Scan csv
Output [2]: [curr_title#997, curr_category#998]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab3/data/lab3_dim_curr_category.csv]
PushedFilters: [IsNotNull(curr_title)]
ReadSchema: struct<curr_title:string,curr_category:string>
(6) Filter
Input [2]: [curr_title#997, curr_category#998]
Condition : isnotnull(curr_title#997)
(7) BroadcastExchange
Input [2]: [curr_title#997, curr_category#998]
Arguments: HashedRelationBroadcastMode(List(input[0, string, false]),false), [plan_id=266]
(8) BroadcastHashJoin
Left keys [1]: [curr_title#653]
Right keys [1]: [curr_title#997]
Join type: LeftOuter
Join condition: None
(9) Project
Output [2]: [n#655, curr_category#998]
Input [4]: [curr_title#653, n#655, curr_title#997, curr_category#998]
(10) HashAggregate
Input [2]: [n#655, curr_category#998]
Keys [1]: [curr_category#998]
Functions [1]: [partial_sum(n#655)]
Aggregate Attributes [1]: [sum#1232L]
Results [2]: [curr_category#998, sum#1233L]
(11) Exchange
Input [2]: [curr_category#998, sum#1233L]
Arguments: hashpartitioning(curr_category#998, 200), ENSURE_REQUIREMENTS, [plan_id=271]
(12) HashAggregate
Input [2]: [curr_category#998, sum#1233L]
Keys [1]: [curr_category#998]
Functions [1]: [sum(n#655)]
Aggregate Attributes [1]: [sum(n#655)#1126L]
Results [2]: [curr_category#998, sum(n#655)#1126L AS total_n#1117L]
(13) Exchange
Input [2]: [curr_category#998, total_n#1117L]
Arguments: rangepartitioning(total_n#1117L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=274]
(14) Sort
Input [2]: [curr_category#998, total_n#1117L]
Arguments: [total_n#1117L DESC NULLS LAST], true, 0
(15) AdaptiveSparkPlan
Output [2]: [curr_category#998, total_n#1117L]
Arguments: isFinalPlan=false
10 rows exported to lab3_metrics_log.csv
Saved proof/plan_broadcast.txt
4. Additional queries for metrics¶
In [8]:
# Q2: daily GMV‑like metric (sum of n) for a specific title window
q2_row = (row_df.filter((F.col("type")=="link") & F.col("curr_title").isin("Apache_Spark","PySpark"))
.groupBy("year","month","curr_title").agg(F.sum("n").alias("n")).orderBy("year","month","curr_title"))
q2_col = (col_df.filter((F.col("type")=="link") & F.col("curr_title").isin("Apache_Spark","PySpark"))
.groupBy("year","month","curr_title").agg(F.sum("n").alias("n")).orderBy("year","month","curr_title"))
# Trigger
_ = q2_row.count(); _ = q2_col.count()
export_lab3_metrics(spark, "r1", "Q2", representation="row")
export_lab3_metrics(spark, "r1", "Q2", representation="column")
# Q3: heavy cardinality grouping
q3_row = row_df.groupBy("prev_title","curr_title").agg(F.sum("n").alias("n")).orderBy(F.desc("n")).limit(100)
q3_col = col_df.groupBy("prev_title","curr_title").agg(F.sum("n").alias("n")).orderBy(F.desc("n")).limit(100)
_ = q3_row.count(); _ = q3_col.count()
export_lab3_metrics(spark, "r1", "Q3", representation="row")
export_lab3_metrics(spark, "r1", "Q3", representation="column")
print("Open Spark UI at http://localhost:4040 while each job runs and record metrics into lab3_metrics_log.csv")
22 rows exported to lab3_metrics_log.csv 22 rows exported to lab3_metrics_log.csv 28 rows exported to lab3_metrics_log.csv 28 rows exported to lab3_metrics_log.csv Open Spark UI at http://localhost:4040 while each job runs and record metrics into lab3_metrics_log.csv
5. Save sample outputs¶
In [14]:
import pathlib, pandas as pd
pathlib.Path("outputs").mkdir(parents=True, exist_ok=True)
q1_row.limit(10).toPandas().to_csv("outputs/q1_row_top10.csv", index=False)
q1_col.limit(10).toPandas().to_csv("outputs/q1_col_top10.csv", index=False)
j2.limit(20).toPandas().to_csv("outputs/j2_broadcast_sample.csv", index=False)
print("Saved sample outputs in outputs")
Saved sample outputs in outputs
6. Cleanup¶
In [9]:
spark.stop()
print("Spark session stopped.")
Spark session stopped.
In [ ]: