DE1 - Lab 3: Physical Representations and Batch II Costs

Author: Badr TAJINI - Data Engineering I - ESIEE 2025-2026


Execute all cells. Capture plans and Spark UI evidence.

0. Setup and explicit schema

In [1]:
from pyspark.sql import SparkSession, functions as F, types as T
spark = SparkSession.builder.appName("de1-lab3").getOrCreate()

clicks_schema = T.StructType([
    T.StructField("prev_title", T.StringType(), True),
    T.StructField("curr_title", T.StringType(), True),
    T.StructField("type", T.StringType(), True),
    T.StructField("n", T.IntegerType(), True),
    T.StructField("ts", T.TimestampType(), True),
])
dim_schema = T.StructType([
    T.StructField("curr_title", T.StringType(), True),
    T.StructField("curr_category", T.StringType(), True),
])
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/10/27 19:04:25 WARN Utils: Your hostname, OrdideJustine, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/10/27 19:04:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/27 19:04:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

1. Ingest monthly CSVs (row format baseline)

In [2]:
base = "data/"
paths = [f"{base}lab3_clicks_2025-05.csv", f"{base}lab3_clicks_2025-06.csv", f"{base}lab3_clicks_2025-07.csv"]
row_df = (spark.read.schema(clicks_schema).option("header","true").csv(paths)
            .withColumn("year", F.year("ts")).withColumn("month", F.month("ts")))
row_df.cache()
print("Rows:", row_df.count())
row_df.printSchema()
row_df.show(5, truncate=False)
Rows: 15000
root
 |-- prev_title: string (nullable = true)
 |-- curr_title: string (nullable = true)
 |-- type: string (nullable = true)
 |-- n: integer (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)

+-----------------------------+--------------+----+---+-------------------+----+-----+
|prev_title                   |curr_title    |type|n  |ts                 |year|month|
+-----------------------------+--------------+----+---+-------------------+----+-----+
|ETL                          |PySpark       |link|431|2025-06-01 02:57:00|2025|6    |
|Data_engineering             |Broadcast_join|link|347|2025-06-15 13:40:00|2025|6    |
|Python_(programming_language)|MapReduce     |link|39 |2025-06-07 15:14:00|2025|6    |
|ETL                          |Data_warehouse|link|401|2025-06-07 04:59:00|2025|6    |
|Python_(programming_language)|Dataframe     |link|155|2025-06-06 06:40:00|2025|6    |
+-----------------------------+--------------+----+---+-------------------+----+-----+
only showing top 5 rows
                                                                                
In [3]:
import requests
import pandas as pd
import os
from datetime import datetime

def export_lab3_metrics(spark, run_id, query, representation="", notes="", output_csv="lab3_metrics_log.csv"):
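    """
    Export selected Spark stage metrics to a CSV for Lab 3.
    - spark: active SparkSession
    - run_id: identifier for the run (e.g. 'r1')
    - query: query name (Q1, Q2, Q3, Join)
    - representation: 'row' or 'column', or 'normal'/'broadcast' for Join
    - notes: optional comment
    - output_csv: path to the CSV file (appended to if it already exists)

    Note: the REST endpoint returns every stage the application has run so far,
    so each call also re-exports the stages of earlier queries.
    """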
    ui_url = spark.sparkContext.uiWebUrl
    app_id = spark.sparkContext.applicationId
    
    if not ui_url:
        print("No Spark UI detected. Make sure a job is running.")
        return
    
    try:
        stages_url = f"{ui_url}/api/v1/applications/{app_id}/stages"
        # A timeout keeps the cell from hanging if the UI is unreachable.
        stages = requests.get(stages_url, timeout=10).json()
    except Exception as e:
        print(f"Error accessing Spark API: {e}")
        return
    
    data = []
    for s in stages:
        input_bytes = s.get("inputBytes", 0)
        shuffle_read = s.get("shuffleReadBytes", 0)
        shuffle_write = s.get("shuffleWriteBytes", 0)
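        # The stage API reports input records, not a file count;
        # the value is stored under the "files_read" column of the log.
        files_read = s.get("inputRecords", 0)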
        
        data.append({
            "run_id": run_id,
            "query": query,
            "representation": representation,
            "files_read": files_read,
            "input_size_bytes": input_bytes,
            "shuffle_read_bytes": shuffle_read,
            "shuffle_write_bytes": shuffle_write,
            "notes": notes,
            "timestamp": datetime.now().isoformat()
        })
    
    df = pd.DataFrame(data)
    if df.empty:
        print(f"No metrics found for {query}-{representation}")
        return
    
    if os.path.exists(output_csv):
        df.to_csv(output_csv, mode="a", header=False, index=False)
    else:
        df.to_csv(output_csv, index=False)
    
    print(f"{len(df)} rows exported to {output_csv}")

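The `/stages` endpoint used by `export_lab3_metrics` lists every stage the application has run so far, which is why the export counts printed later in this notebook keep growing (5, 10, 22, 28). A minimal sketch of one way to keep only the stages added by a single query, assuming the stage list is captured once before and once after the query is triggered. `new_stages` and the sample dictionaries are hypothetical; only the `stageId` and `attemptId` field names come from the Spark REST API.

```python
def new_stages(before, after):
    """Return the stages present in `after` but not in `before`.

    Both arguments are lists of stage dicts, in the shape returned by
    /api/v1/applications/<app-id>/stages (assumed here).
    """
    seen = {(s["stageId"], s.get("attemptId", 0)) for s in before}
    return [s for s in after
            if (s["stageId"], s.get("attemptId", 0)) not in seen]


# Hypothetical snapshots, for illustration only.
before = [{"stageId": 0, "attemptId": 0, "inputBytes": 100}]
after = before + [
    {"stageId": 1, "attemptId": 0, "inputBytes": 250},
    {"stageId": 2, "attemptId": 0, "inputBytes": 0},
]

fresh = new_stages(before, after)
print([s["stageId"] for s in fresh])
```

With a live session, the two snapshots would be taken around an action such as `count()`, since `explain()` alone does not run a job.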
Evidence: row representation plan

In [4]:
# Query Q1: top 50 (year, month, transition) groups by total n, for type 'link'
q1_row = (row_df.filter(F.col("type")=="link")
           .groupBy("year","month","prev_title","curr_title")
           .agg(F.sum("n").alias("n"))
           .orderBy(F.desc("n"))
           .limit(50))
q1_row.explain("formatted")
export_lab3_metrics(spark, "r1", "Q1", representation="row")


import pathlib, datetime as _dt
pathlib.Path("proof").mkdir(exist_ok=True)
with open("proof/plan_row.txt","w") as f:
    f.write(str(_dt.datetime.now())+"\n")
    f.write(q1_row._jdf.queryExecution().executedPlan().toString())
print("Saved proof/plan_row.txt")
== Physical Plan ==
AdaptiveSparkPlan (11)
+- TakeOrderedAndProject (10)
   +- HashAggregate (9)
      +- Exchange (8)
         +- HashAggregate (7)
            +- Project (6)
               +- Filter (5)
                  +- InMemoryTableScan (1)
                        +- InMemoryRelation (2)
                              +- * Project (4)
                                 +- Scan csv  (3)


(1) InMemoryTableScan
Output [6]: [curr_title#1, month#7, n#3, prev_title#0, type#2, year#6]
Arguments: [curr_title#1, month#7, n#3, prev_title#0, type#2, year#6], [isnotnull(type#2), (type#2 = link)]

(2) InMemoryRelation
Arguments: [prev_title#0, curr_title#1, type#2, n#3, ts#4, year#6, month#7], StorageLevel(disk, memory, deserialized, 1 replicas)

(3) Scan csv 
Output [5]: [prev_title#0, curr_title#1, type#2, n#3, ts#4]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab3/data/lab3_clicks_2025-05.csv, ... 2 entries]
ReadSchema: struct<prev_title:string,curr_title:string,type:string,n:int,ts:timestamp>

(4) Project [codegen id : 1]
Output [7]: [prev_title#0, curr_title#1, type#2, n#3, ts#4, year(cast(ts#4 as date)) AS year#6, month(cast(ts#4 as date)) AS month#7]
Input [5]: [prev_title#0, curr_title#1, type#2, n#3, ts#4]

(5) Filter
Input [6]: [curr_title#1, month#7, n#3, prev_title#0, type#2, year#6]
Condition : (isnotnull(type#2) AND (type#2 = link))

(6) Project
Output [5]: [prev_title#0, curr_title#1, n#3, year#6, month#7]
Input [6]: [curr_title#1, month#7, n#3, prev_title#0, type#2, year#6]

(7) HashAggregate
Input [5]: [prev_title#0, curr_title#1, n#3, year#6, month#7]
Keys [4]: [year#6, month#7, prev_title#0, curr_title#1]
Functions [1]: [partial_sum(n#3)]
Aggregate Attributes [1]: [sum#510L]
Results [5]: [year#6, month#7, prev_title#0, curr_title#1, sum#511L]

(8) Exchange
Input [5]: [year#6, month#7, prev_title#0, curr_title#1, sum#511L]
Arguments: hashpartitioning(year#6, month#7, prev_title#0, curr_title#1, 200), ENSURE_REQUIREMENTS, [plan_id=85]

(9) HashAggregate
Input [5]: [year#6, month#7, prev_title#0, curr_title#1, sum#511L]
Keys [4]: [year#6, month#7, prev_title#0, curr_title#1]
Functions [1]: [sum(n#3)]
Aggregate Attributes [1]: [sum(n#3)#404L]
Results [5]: [year#6, month#7, prev_title#0, curr_title#1, sum(n#3)#404L AS n#396L]

(10) TakeOrderedAndProject
Input [5]: [year#6, month#7, prev_title#0, curr_title#1, n#396L]
Arguments: 50, [n#396L DESC NULLS LAST], [year#6, month#7, prev_title#0, curr_title#1, n#396L]

(11) AdaptiveSparkPlan
Output [5]: [year#6, month#7, prev_title#0, curr_title#1, n#396L]
Arguments: isFinalPlan=false


5 rows exported to lab3_metrics_log.csv
Saved proof/plan_row.txt

2. Column representation: Parquet with partitioning and optional sort

In [5]:
col_base = "outputs/columnar"
# Write columnar
(row_df
 .write.mode("overwrite")
 .partitionBy("year","month")
 .parquet(f"{col_base}/clicks_parquet"))

# Re-read columnar for fair comparison.
# Build a new schema: StructType.add() would mutate clicks_schema in place.
col_schema = T.StructType(clicks_schema.fields + [
    T.StructField("year", T.IntegerType(), True),
    T.StructField("month", T.IntegerType(), True),
])
col_df = spark.read.schema(col_schema).parquet(f"{col_base}/clicks_parquet")
col_df.cache()
print("Columnar rows:", col_df.count())
                                                                                
Columnar rows: 15000

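`partitionBy("year","month")` writes one Hive-style directory per distinct `(year, month)` pair under the output path. The sketch below only builds the directory names one would expect to find, assuming every `ts` value falls in the month named by its source file (May to July 2025). `partition_dir` is a hypothetical helper, not part of Spark.

```python
def partition_dir(base, **keys):
    """Build a Hive-style partition path such as base/year=2025/month=6."""
    parts = [f"{name}={value}" for name, value in keys.items()]
    return "/".join([base.rstrip("/")] + parts)


months = [5, 6, 7]  # assumption: the three months ingested above
dirs = [partition_dir("outputs/columnar/clicks_parquet", year=2025, month=m)
        for m in months]
for d in dirs:
    print(d)
```

A filter on `year` or `month` can let Spark skip whole directories when it reads the Parquet files directly; once `col_df` is cached, later queries read from the in-memory relation instead.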
Evidence: column representation plan

In [6]:
q1_col = (col_df.filter(F.col("type")=="link")
           .groupBy("year","month","prev_title","curr_title")
           .agg(F.sum("n").alias("n"))
           .orderBy(F.desc("n"))
           .limit(50))
q1_col.explain("formatted")
export_lab3_metrics(spark, "r1", "Q1", representation="column")

with open("proof/plan_column.txt","w") as f:
    from datetime import datetime as _dt
    f.write(str(_dt.now())+"\n")
    f.write(q1_col._jdf.queryExecution().executedPlan().toString())
print("Saved proof/plan_column.txt")
== Physical Plan ==
AdaptiveSparkPlan (11)
+- TakeOrderedAndProject (10)
   +- HashAggregate (9)
      +- Exchange (8)
         +- HashAggregate (7)
            +- Project (6)
               +- Filter (5)
                  +- InMemoryTableScan (1)
                        +- InMemoryRelation (2)
                              +- * ColumnarToRow (4)
                                 +- Scan parquet  (3)


(1) InMemoryTableScan
Output [6]: [curr_title#653, month#658, n#655, prev_title#652, type#654, year#657]
Arguments: [curr_title#653, month#658, n#655, prev_title#652, type#654, year#657], [isnotnull(type#654), (type#654 = link)]

(2) InMemoryRelation
Arguments: [prev_title#652, curr_title#653, type#654, n#655, ts#656, year#657, month#658], StorageLevel(disk, memory, deserialized, 1 replicas)

(3) Scan parquet 
Output [7]: [prev_title#652, curr_title#653, type#654, n#655, ts#656, year#657, month#658]
Batched: true
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab3/outputs/columnar/clicks_parquet]
ReadSchema: struct<prev_title:string,curr_title:string,type:string,n:int,ts:timestamp>

(4) ColumnarToRow [codegen id : 1]
Input [7]: [prev_title#652, curr_title#653, type#654, n#655, ts#656, year#657, month#658]

(5) Filter
Input [6]: [curr_title#653, month#658, n#655, prev_title#652, type#654, year#657]
Condition : (isnotnull(type#654) AND (type#654 = link))

(6) Project
Output [5]: [prev_title#652, curr_title#653, n#655, year#657, month#658]
Input [6]: [curr_title#653, month#658, n#655, prev_title#652, type#654, year#657]

(7) HashAggregate
Input [5]: [prev_title#652, curr_title#653, n#655, year#657, month#658]
Keys [4]: [year#657, month#658, prev_title#652, curr_title#653]
Functions [1]: [partial_sum(n#655)]
Aggregate Attributes [1]: [sum#995L]
Results [5]: [year#657, month#658, prev_title#652, curr_title#653, sum#996L]

(8) Exchange
Input [5]: [year#657, month#658, prev_title#652, curr_title#653, sum#996L]
Arguments: hashpartitioning(year#657, month#658, prev_title#652, curr_title#653, 200), ENSURE_REQUIREMENTS, [plan_id=181]

(9) HashAggregate
Input [5]: [year#657, month#658, prev_title#652, curr_title#653, sum#996L]
Keys [4]: [year#657, month#658, prev_title#652, curr_title#653]
Functions [1]: [sum(n#655)]
Aggregate Attributes [1]: [sum(n#655)#889L]
Results [5]: [year#657, month#658, prev_title#652, curr_title#653, sum(n#655)#889L AS n#881L]

(10) TakeOrderedAndProject
Input [5]: [year#657, month#658, prev_title#652, curr_title#653, n#881L]
Arguments: 50, [n#881L DESC NULLS LAST], [year#657, month#658, prev_title#652, curr_title#653, n#881L]

(11) AdaptiveSparkPlan
Output [5]: [year#657, month#658, prev_title#652, curr_title#653, n#881L]
Arguments: isFinalPlan=false


10 rows exported to lab3_metrics_log.csv
Saved proof/plan_column.txt
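Both Q1 plans read from `InMemoryTableScan` because both DataFrames are cached, so operators (5) to (11) are the same and the only differences sit below `InMemoryRelation`. A small sketch for listing operator names from a formatted plan tree so that two plans can be compared programmatically. `plan_operators` is a hypothetical helper, and the two plan strings are abbreviated from the outputs above.

```python
import re


def plan_operators(plan_text):
    """Extract operator names, top to bottom, from a formatted plan tree."""
    ops = []
    for line in plan_text.splitlines():
        # Skip tree-drawing characters and the codegen marker '*',
        # then capture the name that precedes the "(n)" operator id.
        m = re.match(r"^[\s:|+\-*]*([A-Za-z][A-Za-z ]*?)\s*\(\d+\)\s*$", line)
        if m:
            ops.append(m.group(1).strip())
    return ops


row_plan = """\
InMemoryTableScan (1)
      +- InMemoryRelation (2)
            +- * Project (4)
               +- Scan csv  (3)
"""
col_plan = """\
InMemoryTableScan (1)
      +- InMemoryRelation (2)
            +- * ColumnarToRow (4)
               +- Scan parquet  (3)
"""

row_ops = plan_operators(row_plan)
col_ops = plan_operators(col_plan)
only_row = [op for op in row_ops if op not in col_ops]
only_col = [op for op in col_ops if op not in row_ops]
print("row only:", only_row)
print("column only:", only_col)
```

The same function could be applied to the contents of `proof/plan_row.txt` and `proof/plan_column.txt`, provided they are in the tree format shown by `explain("formatted")`.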

3. Join strategy: normal vs broadcast

In [7]:
dim = spark.read.schema(dim_schema).option("header","true").csv("data/lab3_dim_curr_category.csv")
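# Non-broadcast join (no hint). Spark may still broadcast `dim` on its own when it is
# smaller than spark.sql.autoBroadcastJoinThreshold, as the first plan printed below shows.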
j1 = (col_df.join(dim, "curr_title", "left")
      .groupBy("curr_category")
      .agg(F.sum("n").alias("total_n"))
      .orderBy(F.desc("total_n")))
j1.explain("formatted")
export_lab3_metrics(spark, "r1", "Join", representation="normal")


# Broadcast join
from pyspark.sql.functions import broadcast
j2 = (col_df.join(broadcast(dim), "curr_title", "left")
      .groupBy("curr_category")
      .agg(F.sum("n").alias("total_n"))
      .orderBy(F.desc("total_n")))
j2.explain("formatted")
export_lab3_metrics(spark, "r1", "Join", representation="broadcast")


# Save one plan for evidence
with open("proof/plan_broadcast.txt","w") as f:
    from datetime import datetime as _dt
    f.write(str(_dt.now())+"\n")
    f.write(j2._jdf.queryExecution().executedPlan().toString())
print("Saved proof/plan_broadcast.txt")
== Physical Plan ==
AdaptiveSparkPlan (15)
+- Sort (14)
   +- Exchange (13)
      +- HashAggregate (12)
         +- Exchange (11)
            +- HashAggregate (10)
               +- Project (9)
                  +- BroadcastHashJoin LeftOuter BuildRight (8)
                     :- InMemoryTableScan (1)
                     :     +- InMemoryRelation (2)
                     :           +- * ColumnarToRow (4)
                     :              +- Scan parquet  (3)
                     +- BroadcastExchange (7)
                        +- Filter (6)
                           +- Scan csv  (5)


(1) InMemoryTableScan
Output [2]: [curr_title#653, n#655]
Arguments: [curr_title#653, n#655]

(2) InMemoryRelation
Arguments: [prev_title#652, curr_title#653, type#654, n#655, ts#656, year#657, month#658], StorageLevel(disk, memory, deserialized, 1 replicas)

(3) Scan parquet 
Output [7]: [prev_title#652, curr_title#653, type#654, n#655, ts#656, year#657, month#658]
Batched: true
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab3/outputs/columnar/clicks_parquet]
ReadSchema: struct<prev_title:string,curr_title:string,type:string,n:int,ts:timestamp>

(4) ColumnarToRow [codegen id : 1]
Input [7]: [prev_title#652, curr_title#653, type#654, n#655, ts#656, year#657, month#658]

(5) Scan csv 
Output [2]: [curr_title#997, curr_category#998]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab3/data/lab3_dim_curr_category.csv]
PushedFilters: [IsNotNull(curr_title)]
ReadSchema: struct<curr_title:string,curr_category:string>

(6) Filter
Input [2]: [curr_title#997, curr_category#998]
Condition : isnotnull(curr_title#997)

(7) BroadcastExchange
Input [2]: [curr_title#997, curr_category#998]
Arguments: HashedRelationBroadcastMode(List(input[0, string, false]),false), [plan_id=220]

(8) BroadcastHashJoin
Left keys [1]: [curr_title#653]
Right keys [1]: [curr_title#997]
Join type: LeftOuter
Join condition: None

(9) Project
Output [2]: [n#655, curr_category#998]
Input [4]: [curr_title#653, n#655, curr_title#997, curr_category#998]

(10) HashAggregate
Input [2]: [n#655, curr_category#998]
Keys [1]: [curr_category#998]
Functions [1]: [partial_sum(n#655)]
Aggregate Attributes [1]: [sum#1115L]
Results [2]: [curr_category#998, sum#1116L]

(11) Exchange
Input [2]: [curr_category#998, sum#1116L]
Arguments: hashpartitioning(curr_category#998, 200), ENSURE_REQUIREMENTS, [plan_id=225]

(12) HashAggregate
Input [2]: [curr_category#998, sum#1116L]
Keys [1]: [curr_category#998]
Functions [1]: [sum(n#655)]
Aggregate Attributes [1]: [sum(n#655)#1009L]
Results [2]: [curr_category#998, sum(n#655)#1009L AS total_n#1000L]

(13) Exchange
Input [2]: [curr_category#998, total_n#1000L]
Arguments: rangepartitioning(total_n#1000L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=228]

(14) Sort
Input [2]: [curr_category#998, total_n#1000L]
Arguments: [total_n#1000L DESC NULLS LAST], true, 0

(15) AdaptiveSparkPlan
Output [2]: [curr_category#998, total_n#1000L]
Arguments: isFinalPlan=false


10 rows exported to lab3_metrics_log.csv
== Physical Plan ==
AdaptiveSparkPlan (15)
+- Sort (14)
   +- Exchange (13)
      +- HashAggregate (12)
         +- Exchange (11)
            +- HashAggregate (10)
               +- Project (9)
                  +- BroadcastHashJoin LeftOuter BuildRight (8)
                     :- InMemoryTableScan (1)
                     :     +- InMemoryRelation (2)
                     :           +- * ColumnarToRow (4)
                     :              +- Scan parquet  (3)
                     +- BroadcastExchange (7)
                        +- Filter (6)
                           +- Scan csv  (5)


(1) InMemoryTableScan
Output [2]: [curr_title#653, n#655]
Arguments: [curr_title#653, n#655]

(2) InMemoryRelation
Arguments: [prev_title#652, curr_title#653, type#654, n#655, ts#656, year#657, month#658], StorageLevel(disk, memory, deserialized, 1 replicas)

(3) Scan parquet 
Output [7]: [prev_title#652, curr_title#653, type#654, n#655, ts#656, year#657, month#658]
Batched: true
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab3/outputs/columnar/clicks_parquet]
ReadSchema: struct<prev_title:string,curr_title:string,type:string,n:int,ts:timestamp>

(4) ColumnarToRow [codegen id : 1]
Input [7]: [prev_title#652, curr_title#653, type#654, n#655, ts#656, year#657, month#658]

(5) Scan csv 
Output [2]: [curr_title#997, curr_category#998]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab3/data/lab3_dim_curr_category.csv]
PushedFilters: [IsNotNull(curr_title)]
ReadSchema: struct<curr_title:string,curr_category:string>

(6) Filter
Input [2]: [curr_title#997, curr_category#998]
Condition : isnotnull(curr_title#997)

(7) BroadcastExchange
Input [2]: [curr_title#997, curr_category#998]
Arguments: HashedRelationBroadcastMode(List(input[0, string, false]),false), [plan_id=266]

(8) BroadcastHashJoin
Left keys [1]: [curr_title#653]
Right keys [1]: [curr_title#997]
Join type: LeftOuter
Join condition: None

(9) Project
Output [2]: [n#655, curr_category#998]
Input [4]: [curr_title#653, n#655, curr_title#997, curr_category#998]

(10) HashAggregate
Input [2]: [n#655, curr_category#998]
Keys [1]: [curr_category#998]
Functions [1]: [partial_sum(n#655)]
Aggregate Attributes [1]: [sum#1232L]
Results [2]: [curr_category#998, sum#1233L]

(11) Exchange
Input [2]: [curr_category#998, sum#1233L]
Arguments: hashpartitioning(curr_category#998, 200), ENSURE_REQUIREMENTS, [plan_id=271]

(12) HashAggregate
Input [2]: [curr_category#998, sum#1233L]
Keys [1]: [curr_category#998]
Functions [1]: [sum(n#655)]
Aggregate Attributes [1]: [sum(n#655)#1126L]
Results [2]: [curr_category#998, sum(n#655)#1126L AS total_n#1117L]

(13) Exchange
Input [2]: [curr_category#998, total_n#1117L]
Arguments: rangepartitioning(total_n#1117L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=274]

(14) Sort
Input [2]: [curr_category#998, total_n#1117L]
Arguments: [total_n#1117L DESC NULLS LAST], true, 0

(15) AdaptiveSparkPlan
Output [2]: [curr_category#998, total_n#1117L]
Arguments: isFinalPlan=false


10 rows exported to lab3_metrics_log.csv
Saved proof/plan_broadcast.txt
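Both plans above contain `BroadcastHashJoin`, so the "normal" and "broadcast" runs used the same join strategy. A minimal sketch of a check that reports which join operator a plan string contains. `join_strategy` is a hypothetical helper, and the sample line is copied from the output above.

```python
# Join operator names checked in this order (physical operators in Spark SQL).
JOIN_OPERATORS = (
    "BroadcastHashJoin",
    "SortMergeJoin",
    "ShuffledHashJoin",
    "BroadcastNestedLoopJoin",
    "CartesianProduct",
)


def join_strategy(plan_text):
    """Return the first name from JOIN_OPERATORS that occurs in the plan, or None."""
    for name in JOIN_OPERATORS:
        if name in plan_text:
            return name
    return None


sample = "+- BroadcastHashJoin LeftOuter BuildRight (8)"
print(join_strategy(sample))

# With a live session, one option (an assumption about the intended comparison)
# is to turn off automatic broadcasting before building j1, then re-check:
#   spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
#   join_strategy(j1._jdf.queryExecution().executedPlan().toString())
```

Setting the threshold to `-1` disables automatic broadcasting, so the unhinted join would be expected to fall back to a shuffle-based strategy; the explicit `broadcast(dim)` hint in `j2` is still honored.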

4. Additional queries for metrics

In [8]:
# Q2: monthly GMV-like metric (sum of n) for two selected titles
q2_row = (row_df.filter((F.col("type")=="link") & F.col("curr_title").isin("Apache_Spark","PySpark"))
           .groupBy("year","month","curr_title").agg(F.sum("n").alias("n")).orderBy("year","month","curr_title"))
q2_col = (col_df.filter((F.col("type")=="link") & F.col("curr_title").isin("Apache_Spark","PySpark"))
           .groupBy("year","month","curr_title").agg(F.sum("n").alias("n")).orderBy("year","month","curr_title"))

# Trigger
_ = q2_row.count(); _ = q2_col.count()
export_lab3_metrics(spark, "r1", "Q2", representation="row")
export_lab3_metrics(spark, "r1", "Q2", representation="column")

# Q3: high-cardinality grouping
q3_row = row_df.groupBy("prev_title","curr_title").agg(F.sum("n").alias("n")).orderBy(F.desc("n")).limit(100)
q3_col = col_df.groupBy("prev_title","curr_title").agg(F.sum("n").alias("n")).orderBy(F.desc("n")).limit(100)
_ = q3_row.count(); _ = q3_col.count()
export_lab3_metrics(spark, "r1", "Q3", representation="row")
export_lab3_metrics(spark, "r1", "Q3", representation="column")

print("Open Spark UI at http://localhost:4040 while each job runs and record metrics into lab3_metrics_log.csv")
22 rows exported to lab3_metrics_log.csv
22 rows exported to lab3_metrics_log.csv
28 rows exported to lab3_metrics_log.csv
28 rows exported to lab3_metrics_log.csv
Open Spark UI at http://localhost:4040 while each job runs and record metrics into lab3_metrics_log.csv

5. Save sample outputs

In [14]:
import pathlib, pandas as pd
pathlib.Path("outputs").mkdir(parents=True, exist_ok=True)
q1_row.limit(10).toPandas().to_csv("outputs/q1_row_top10.csv", index=False)
q1_col.limit(10).toPandas().to_csv("outputs/q1_col_top10.csv", index=False)
j2.limit(20).toPandas().to_csv("outputs/j2_broadcast_sample.csv", index=False)
print("Saved sample outputs in outputs")
Saved sample outputs in outputs

6. Cleanup

In [9]:
spark.stop()
print("Spark session stopped.")
Spark session stopped.