DE1 — Lab 2: PostgreSQL → Star Schema ETL¶

Author : Badr TAJINI - Data Engineering I - ESIEE 2025-2026


Execute all cells. Attach evidence and fill metrics.

0. Setup and schemas¶

In [44]:
from pyspark.sql import SparkSession, functions as F, types as T
spark = SparkSession.builder.appName("de1-lab2").getOrCreate()
base = "data/"
# Explicit schemas
customers_schema = T.StructType([
    T.StructField("customer_id", T.IntegerType(), False),
    T.StructField("name", T.StringType(), True),
    T.StructField("email", T.StringType(), True),
    T.StructField("created_at", T.TimestampType(), True),
])
brands_schema = T.StructType([
    T.StructField("brand_id", T.IntegerType(), False),
    T.StructField("brand_name", T.StringType(), True),
])
categories_schema = T.StructType([
    T.StructField("category_id", T.IntegerType(), False),
    T.StructField("category_name", T.StringType(), True),
])
products_schema = T.StructType([
    T.StructField("product_id", T.IntegerType(), False),
    T.StructField("product_name", T.StringType(), True),
    T.StructField("brand_id", T.IntegerType(), True),
    T.StructField("category_id", T.IntegerType(), True),
    T.StructField("price", T.DoubleType(), True),
])
orders_schema = T.StructType([
    T.StructField("order_id", T.IntegerType(), False),
    T.StructField("customer_id", T.IntegerType(), True),
    T.StructField("order_date", T.TimestampType(), True),
])
order_items_schema = T.StructType([
    T.StructField("order_item_id", T.IntegerType(), False),
    T.StructField("order_id", T.IntegerType(), True),
    T.StructField("product_id", T.IntegerType(), True),
    T.StructField("quantity", T.IntegerType(), True),
    T.StructField("unit_price", T.DoubleType(), True),
])
In [32]:
import os

# Affiche le chemin absolu du répertoire courant
print(os.getcwd())
/mnt/c/Users/Justine/Data_engineering/lab2

1. Ingest operational tables (from CSV exports)¶

In [45]:
customers = spark.read.schema(customers_schema).option("header","true").csv(base+"lab2_customers.csv")
brands = spark.read.schema(brands_schema).option("header","true").csv(base+"lab2_brands.csv")
categories = spark.read.schema(categories_schema).option("header","true").csv(base+"lab2_categories.csv")
products = spark.read.schema(products_schema).option("header","true").csv(base+"lab2_products.csv")
orders = spark.read.schema(orders_schema).option("header","true").csv(base+"lab2_orders.csv")
order_items = spark.read.schema(order_items_schema).option("header","true").csv(base+"lab2_order_items.csv")

for name, df in [("customers",customers),("brands",brands),("categories",categories),("products",products),("orders",orders),("order_items",order_items)]:
    print(name, df.count())
customers 24
brands 8
categories 9
products 60
orders 220
order_items 638
In [34]:
import requests
import pandas as pd
import os
from datetime import datetime

def export_spark_metrics_lab2(spark, run_id, task, note="", output_csv="lab2_metrics_log.csv"):
    """
    Exporte les métriques Spark (stages) pour Lab2 dans un CSV.
    - spark: SparkSession active
    - run_id: identifiant du run (ex: "r1")
    - task: nom de la tâche (ex: "ingest_plan")
    - note: commentaire ou variante testée
    - output_csv: nom du fichier de sortie
    """
    # Trigger computation pour s'assurer que Spark a fini le job
    try:
        spark.sparkContext.statusTracker().getJobIdsForGroup(None)
    except:
        pass
    
    ui_url = spark.sparkContext.uiWebUrl
    app_id = spark.sparkContext.applicationId
    
    if not ui_url:
        print("Pas d'UI Spark détectée (aucun job en cours ?)")
        return
    
    try:
        stages_url = f"{ui_url}/api/v1/applications/{app_id}/stages"
        stages = requests.get(stages_url).json()
    except Exception as e:
        print(f"Erreur API Spark: {e}")
        return
    
    data = []
    for s in stages:
        stage_id = s.get("stageId")
        job_ids = s.get("jobIds", [])
        
        data.append({
            "run_id": task,  # on met le nom de la task ici
            "task": task,
            "note": note,
            "files_read": s.get("inputRecords", 0),
            "input_size_bytes": s.get("inputBytes", 0),
            "shuffle_read_bytes": s.get("shuffleReadBytes", 0),
            "shuffle_write_bytes": s.get("shuffleWriteBytes", 0),
            "timestamp": datetime.now().isoformat()
        })
    
    df = pd.DataFrame(data)
    
    if df.empty:
        print(f"Aucune métrique trouvée pour {task} (job peut-être pas encore exécuté)")
        return
    
    # Écrire ou ajouter au CSV
    if os.path.exists(output_csv):
        df.to_csv(output_csv, mode="a", header=False, index=False)
    else:
        df.to_csv(output_csv, index=False)
    
    print(f"{len(df)} lignes exportées vers {output_csv} pour la tâche {task}")

Evidence: ingestion plan¶

In [46]:
ingest = orders.join(order_items, "order_id").select("order_id").distinct()
ingest.explain("formatted")
export_spark_metrics_lab2(spark, run_id="r1", task="ingest_plan", note="ingest operational tables")

from datetime import datetime as _dt
import pathlib
pathlib.Path("proof").mkdir(exist_ok=True)
with open("proof/plan_ingest.txt","w") as f:
    f.write(str(_dt.now())+"\n")
    f.write(ingest._jdf.queryExecution().executedPlan().toString())
print("Saved proof/plan_ingest.txt")
== Physical Plan ==
AdaptiveSparkPlan (11)
+- HashAggregate (10)
   +- Exchange (9)
      +- HashAggregate (8)
         +- Project (7)
            +- BroadcastHashJoin Inner BuildLeft (6)
               :- BroadcastExchange (3)
               :  +- Filter (2)
               :     +- Scan csv  (1)
               +- Filter (5)
                  +- Scan csv  (4)


(1) Scan csv 
Output [1]: [order_id#403]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab2/data/lab2_orders.csv]
PushedFilters: [IsNotNull(order_id)]
ReadSchema: struct<order_id:int>

(2) Filter
Input [1]: [order_id#403]
Condition : isnotnull(order_id#403)

(3) BroadcastExchange
Input [1]: [order_id#403]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=3615]

(4) Scan csv 
Output [1]: [order_id#407]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab2/data/lab2_order_items.csv]
PushedFilters: [IsNotNull(order_id)]
ReadSchema: struct<order_id:int>

(5) Filter
Input [1]: [order_id#407]
Condition : isnotnull(order_id#407)

(6) BroadcastHashJoin
Left keys [1]: [order_id#403]
Right keys [1]: [order_id#407]
Join type: Inner
Join condition: None

(7) Project
Output [1]: [order_id#403]
Input [2]: [order_id#403, order_id#407]

(8) HashAggregate
Input [1]: [order_id#403]
Keys [1]: [order_id#403]
Functions: []
Aggregate Attributes: []
Results [1]: [order_id#403]

(9) Exchange
Input [1]: [order_id#403]
Arguments: hashpartitioning(order_id#403, 200), ENSURE_REQUIREMENTS, [plan_id=3620]

(10) HashAggregate
Input [1]: [order_id#403]
Keys [1]: [order_id#403]
Functions: []
Aggregate Attributes: []
Results [1]: [order_id#403]

(11) AdaptiveSparkPlan
Output [1]: [order_id#403]
Arguments: isFinalPlan=false


Saved proof/plan_ingest.txt

2. Surrogate key function¶

In [48]:
def sk(cols):
    # stable 64-bit positive surrogate key from natural keys
    return F.abs(F.xxhash64(*[F.col(c) for c in cols]))

3. Build dimensions¶

In [49]:
dim_customer = customers.select(
    sk(["customer_id"]).alias("customer_sk"),
    "customer_id","name","email","created_at"
)

dim_brand = brands.select(
    sk(["brand_id"]).alias("brand_sk"),
    "brand_id","brand_name"
)

dim_category = categories.select(
    sk(["category_id"]).alias("category_sk"),
    "category_id","category_name"
)

dim_product = products.select(
    sk(["product_id"]).alias("product_sk"),
    "product_id","product_name",
    sk(["brand_id"]).alias("brand_sk"),
    sk(["category_id"]).alias("category_sk"),
    "price"
)

4. Build date dimension¶

In [50]:
from pyspark.sql import Window as W
dates = orders.select(F.to_date("order_date").alias("date")).distinct()
dim_date = dates.select(
    sk(["date"]).alias("date_sk"),
    F.col("date"),
    F.year("date").alias("year"),
    F.month("date").alias("month"),
    F.dayofmonth("date").alias("day"),
    F.date_format("date","E").alias("dow")
)

5. Build fact_sales with broadcast joins where appropriate¶

In [51]:
oi = order_items.alias("oi")
p = products.alias("p")
o = orders.alias("o")
c = customers.alias("c")

# Join with small dimensions using DF copies to compute SKs, then broadcast dims by size heuristic
df_fact = (oi
    .join(p, F.col("oi.product_id")==F.col("p.product_id"))
    .drop(F.col("p.product_id"))
    .join(o, "order_id")
    .join(c, "customer_id")
    .withColumn("date", F.to_date("order_date"))
)

# Attach surrogate keys
df_fact = (df_fact
    .withColumn("date_sk", sk(["date"]))
    .withColumn("customer_sk", sk(["customer_id"]))
    .withColumn("product_sk", sk(["product_id"]))
    .withColumn("quantity", F.col("quantity").cast("int"))
    .withColumn("unit_price", F.col("unit_price").cast("double"))
    .withColumn("subtotal", F.col("quantity")*F.col("unit_price"))
    .withColumn("year", F.year("date"))
    .withColumn("month", F.month("date"))
    .select("order_id","date_sk","customer_sk","product_sk","quantity","unit_price","subtotal","year","month")
)

df_fact.explain("formatted")
export_spark_metrics_lab2(spark, run_id="r1", task="fact_join", note="build fact_sales with SKs")
with open("proof/plan_fact_join.txt","w") as f:
    from datetime import datetime as _dt
    f.write(str(_dt.now())+"\n")
    f.write(df_fact._jdf.queryExecution().executedPlan().toString())
print("Saved proof/plan_fact_join.txt")
== Physical Plan ==
AdaptiveSparkPlan (19)
+- Project (18)
   +- Project (17)
      +- BroadcastHashJoin Inner BuildRight (16)
         :- Project (12)
         :  +- BroadcastHashJoin Inner BuildRight (11)
         :     :- Project (7)
         :     :  +- BroadcastHashJoin Inner BuildRight (6)
         :     :     :- Filter (2)
         :     :     :  +- Scan csv  (1)
         :     :     +- BroadcastExchange (5)
         :     :        +- Filter (4)
         :     :           +- Scan csv  (3)
         :     +- BroadcastExchange (10)
         :        +- Filter (9)
         :           +- Scan csv  (8)
         +- BroadcastExchange (15)
            +- Filter (14)
               +- Scan csv  (13)


(1) Scan csv 
Output [4]: [order_id#407, product_id#408, quantity#409, unit_price#410]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab2/data/lab2_order_items.csv]
PushedFilters: [IsNotNull(product_id), IsNotNull(order_id)]
ReadSchema: struct<order_id:int,product_id:int,quantity:int,unit_price:double>

(2) Filter
Input [4]: [order_id#407, product_id#408, quantity#409, unit_price#410]
Condition : (isnotnull(product_id#408) AND isnotnull(order_id#407))

(3) Scan csv 
Output [1]: [product_id#398]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab2/data/lab2_products.csv]
PushedFilters: [IsNotNull(product_id)]
ReadSchema: struct<product_id:int>

(4) Filter
Input [1]: [product_id#398]
Condition : isnotnull(product_id#398)

(5) BroadcastExchange
Input [1]: [product_id#398]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=3689]

(6) BroadcastHashJoin
Left keys [1]: [product_id#408]
Right keys [1]: [product_id#398]
Join type: Inner
Join condition: None

(7) Project
Output [4]: [order_id#407, product_id#408, quantity#409, unit_price#410]
Input [5]: [order_id#407, product_id#408, quantity#409, unit_price#410, product_id#398]

(8) Scan csv 
Output [3]: [order_id#403, customer_id#404, order_date#405]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab2/data/lab2_orders.csv]
PushedFilters: [IsNotNull(order_id), IsNotNull(customer_id)]
ReadSchema: struct<order_id:int,customer_id:int,order_date:timestamp>

(9) Filter
Input [3]: [order_id#403, customer_id#404, order_date#405]
Condition : (isnotnull(order_id#403) AND isnotnull(customer_id#404))

(10) BroadcastExchange
Input [3]: [order_id#403, customer_id#404, order_date#405]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=3693]

(11) BroadcastHashJoin
Left keys [1]: [order_id#407]
Right keys [1]: [order_id#403]
Join type: Inner
Join condition: None

(12) Project
Output [6]: [order_id#407, product_id#408, quantity#409, unit_price#410, customer_id#404, order_date#405]
Input [7]: [order_id#407, product_id#408, quantity#409, unit_price#410, order_id#403, customer_id#404, order_date#405]

(13) Scan csv 
Output [1]: [customer_id#390]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab2/data/lab2_customers.csv]
PushedFilters: [IsNotNull(customer_id)]
ReadSchema: struct<customer_id:int>

(14) Filter
Input [1]: [customer_id#390]
Condition : isnotnull(customer_id#390)

(15) BroadcastExchange
Input [1]: [customer_id#390]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=3697]

(16) BroadcastHashJoin
Left keys [1]: [customer_id#404]
Right keys [1]: [customer_id#390]
Join type: Inner
Join condition: None

(17) Project
Output [6]: [customer_id#404, order_id#407, product_id#408, quantity#409, unit_price#410, cast(order_date#405 as date) AS date#474]
Input [7]: [order_id#407, product_id#408, quantity#409, unit_price#410, customer_id#404, order_date#405, customer_id#390]

(18) Project
Output [9]: [order_id#407, abs(xxhash64(date#474, 42)) AS date_sk#475L, abs(xxhash64(customer_id#404, 42)) AS customer_sk#476L, abs(xxhash64(product_id#408, 42)) AS product_sk#477L, quantity#409, unit_price#410, (cast(quantity#409 as double) * unit_price#410) AS subtotal#480, year(date#474) AS year#481, month(date#474) AS month#482]
Input [6]: [customer_id#404, order_id#407, product_id#408, quantity#409, unit_price#410, date#474]

(19) AdaptiveSparkPlan
Output [9]: [order_id#407, date_sk#475L, customer_sk#476L, product_sk#477L, quantity#409, unit_price#410, subtotal#480, year#481, month#482]
Arguments: isFinalPlan=false


Saved proof/plan_fact_join.txt

6. Write Parquet outputs (partitioned by year, month)¶

In [52]:
base_out = "outputs"
(dim_customer.write.mode("overwrite").parquet(f"{base_out}/dim_customer"))
(dim_brand.write.mode("overwrite").parquet(f"{base_out}/dim_brand"))
(dim_category.write.mode("overwrite").parquet(f"{base_out}/dim_category"))
(dim_product.write.mode("overwrite").parquet(f"{base_out}/dim_product"))
(dim_date.write.mode("overwrite").parquet(f"{base_out}/dim_date"))
(df_fact.write.mode("overwrite").partitionBy("year","month").parquet(f"{base_out}/fact_sales"))
print("Parquet written under outputs")
                                                                                
Parquet written under outputs

7. Plan comparison: projection and layout¶

In [53]:
# Case A: join and then project
a = (orders.join(order_items, "order_id")
            .join(products, "product_id")
            .groupBy(F.to_date("order_date").alias("d"))
            .agg(F.sum(F.col("quantity")*F.col("price")).alias("gmv")))
a.explain("formatted")
_ = a.count()
export_spark_metrics_lab2(spark, run_id="r1", task="caseA_join_then_project", note="join then aggregate")

# Case B: project early
b = (orders.select("order_id","order_date")
            .join(order_items.select("order_id","product_id","quantity"), "order_id")
            .join(products.select("product_id","price"), "product_id")
            .groupBy(F.to_date("order_date").alias("d"))
            .agg(F.sum(F.col("quantity")*F.col("price")).alias("gmv")))
b.explain("formatted")
_ = b.count()
export_spark_metrics_lab2(spark, run_id="r1", task="caseB_project_then_join", note="project early then join")

print("Record Spark UI metrics for both runs in lab2_metrics_log.csv")
== Physical Plan ==
AdaptiveSparkPlan (16)
+- HashAggregate (15)
   +- Exchange (14)
      +- HashAggregate (13)
         +- Project (12)
            +- BroadcastHashJoin Inner BuildRight (11)
               :- Project (7)
               :  +- BroadcastHashJoin Inner BuildLeft (6)
               :     :- BroadcastExchange (3)
               :     :  +- Filter (2)
               :     :     +- Scan csv  (1)
               :     +- Filter (5)
               :        +- Scan csv  (4)
               +- BroadcastExchange (10)
                  +- Filter (9)
                     +- Scan csv  (8)


(1) Scan csv 
Output [2]: [order_id#403, order_date#405]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab2/data/lab2_orders.csv]
PushedFilters: [IsNotNull(order_id)]
ReadSchema: struct<order_id:int,order_date:timestamp>

(2) Filter
Input [2]: [order_id#403, order_date#405]
Condition : isnotnull(order_id#403)

(3) BroadcastExchange
Input [2]: [order_id#403, order_date#405]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=4214]

(4) Scan csv 
Output [3]: [order_id#407, product_id#408, quantity#409]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab2/data/lab2_order_items.csv]
PushedFilters: [IsNotNull(order_id), IsNotNull(product_id)]
ReadSchema: struct<order_id:int,product_id:int,quantity:int>

(5) Filter
Input [3]: [order_id#407, product_id#408, quantity#409]
Condition : (isnotnull(order_id#407) AND isnotnull(product_id#408))

(6) BroadcastHashJoin
Left keys [1]: [order_id#403]
Right keys [1]: [order_id#407]
Join type: Inner
Join condition: None

(7) Project
Output [3]: [order_date#405, product_id#408, quantity#409]
Input [5]: [order_id#403, order_date#405, order_id#407, product_id#408, quantity#409]

(8) Scan csv 
Output [2]: [product_id#398, price#402]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab2/data/lab2_products.csv]
PushedFilters: [IsNotNull(product_id)]
ReadSchema: struct<product_id:int,price:double>

(9) Filter
Input [2]: [product_id#398, price#402]
Condition : isnotnull(product_id#398)

(10) BroadcastExchange
Input [2]: [product_id#398, price#402]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=4218]

(11) BroadcastHashJoin
Left keys [1]: [product_id#408]
Right keys [1]: [product_id#398]
Join type: Inner
Join condition: None

(12) Project
Output [3]: [quantity#409, price#402, cast(order_date#405 as date) AS _groupingexpression#520]
Input [5]: [order_date#405, product_id#408, quantity#409, product_id#398, price#402]

(13) HashAggregate
Input [3]: [quantity#409, price#402, _groupingexpression#520]
Keys [1]: [_groupingexpression#520]
Functions [1]: [partial_sum((cast(quantity#409 as double) * price#402))]
Aggregate Attributes [1]: [sum#521]
Results [2]: [_groupingexpression#520, sum#522]

(14) Exchange
Input [2]: [_groupingexpression#520, sum#522]
Arguments: hashpartitioning(_groupingexpression#520, 200), ENSURE_REQUIREMENTS, [plan_id=4223]

(15) HashAggregate
Input [2]: [_groupingexpression#520, sum#522]
Keys [1]: [_groupingexpression#520]
Functions [1]: [sum((cast(quantity#409 as double) * price#402))]
Aggregate Attributes [1]: [sum((cast(quantity#409 as double) * price#402))#519]
Results [2]: [_groupingexpression#520 AS d#506, sum((cast(quantity#409 as double) * price#402))#519 AS gmv#507]

(16) AdaptiveSparkPlan
Output [2]: [d#506, gmv#507]
Arguments: isFinalPlan=false


== Physical Plan ==
AdaptiveSparkPlan (16)
+- HashAggregate (15)
   +- Exchange (14)
      +- HashAggregate (13)
         +- Project (12)
            +- BroadcastHashJoin Inner BuildRight (11)
               :- Project (7)
               :  +- BroadcastHashJoin Inner BuildLeft (6)
               :     :- BroadcastExchange (3)
               :     :  +- Filter (2)
               :     :     +- Scan csv  (1)
               :     +- Filter (5)
               :        +- Scan csv  (4)
               +- BroadcastExchange (10)
                  +- Filter (9)
                     +- Scan csv  (8)


(1) Scan csv 
Output [2]: [order_id#403, order_date#405]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab2/data/lab2_orders.csv]
PushedFilters: [IsNotNull(order_id)]
ReadSchema: struct<order_id:int,order_date:timestamp>

(2) Filter
Input [2]: [order_id#403, order_date#405]
Condition : isnotnull(order_id#403)

(3) BroadcastExchange
Input [2]: [order_id#403, order_date#405]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=4589]

(4) Scan csv 
Output [3]: [order_id#407, product_id#408, quantity#409]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab2/data/lab2_order_items.csv]
PushedFilters: [IsNotNull(order_id), IsNotNull(product_id)]
ReadSchema: struct<order_id:int,product_id:int,quantity:int>

(5) Filter
Input [3]: [order_id#407, product_id#408, quantity#409]
Condition : (isnotnull(order_id#407) AND isnotnull(product_id#408))

(6) BroadcastHashJoin
Left keys [1]: [order_id#403]
Right keys [1]: [order_id#407]
Join type: Inner
Join condition: None

(7) Project
Output [3]: [order_date#405, product_id#408, quantity#409]
Input [5]: [order_id#403, order_date#405, order_id#407, product_id#408, quantity#409]

(8) Scan csv 
Output [2]: [product_id#398, price#402]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab2/data/lab2_products.csv]
PushedFilters: [IsNotNull(product_id)]
ReadSchema: struct<product_id:int,price:double>

(9) Filter
Input [2]: [product_id#398, price#402]
Condition : isnotnull(product_id#398)

(10) BroadcastExchange
Input [2]: [product_id#398, price#402]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=4593]

(11) BroadcastHashJoin
Left keys [1]: [product_id#408]
Right keys [1]: [product_id#398]
Join type: Inner
Join condition: None

(12) Project
Output [3]: [quantity#409, price#402, cast(order_date#405 as date) AS _groupingexpression#543]
Input [5]: [order_date#405, product_id#408, quantity#409, product_id#398, price#402]

(13) HashAggregate
Input [3]: [quantity#409, price#402, _groupingexpression#543]
Keys [1]: [_groupingexpression#543]
Functions [1]: [partial_sum((cast(quantity#409 as double) * price#402))]
Aggregate Attributes [1]: [sum#544]
Results [2]: [_groupingexpression#543, sum#545]

(14) Exchange
Input [2]: [_groupingexpression#543, sum#545]
Arguments: hashpartitioning(_groupingexpression#543, 200), ENSURE_REQUIREMENTS, [plan_id=4598]

(15) HashAggregate
Input [2]: [_groupingexpression#543, sum#545]
Keys [1]: [_groupingexpression#543]
Functions [1]: [sum((cast(quantity#409 as double) * price#402))]
Aggregate Attributes [1]: [sum((cast(quantity#409 as double) * price#402))#542]
Results [2]: [_groupingexpression#543 AS d#535, sum((cast(quantity#409 as double) * price#402))#542 AS gmv#536]

(16) AdaptiveSparkPlan
Output [2]: [d#535, gmv#536]
Arguments: isFinalPlan=false


Record Spark UI metrics for both runs in lab2_metrics_log.csv

8. Cleanup¶

In [54]:
spark.stop()
print("Spark session stopped.")
Spark session stopped.
In [ ]: