DE1 — Lab 2: PostgreSQL → Star Schema ETL
Author: Badr TAJINI - Data Engineering I - ESIEE 2025-2026
Execute all cells. Attach evidence and fill metrics.
0. Setup and schemas
In [44]:
from pyspark.sql import SparkSession, functions as F, types as T
spark = SparkSession.builder.appName("de1-lab2").getOrCreate()
base = "data/"
# Explicit schemas
customers_schema = T.StructType([
    T.StructField("customer_id", T.IntegerType(), False),
    T.StructField("name", T.StringType(), True),
    T.StructField("email", T.StringType(), True),
    T.StructField("created_at", T.TimestampType(), True),
])
brands_schema = T.StructType([
    T.StructField("brand_id", T.IntegerType(), False),
    T.StructField("brand_name", T.StringType(), True),
])
categories_schema = T.StructType([
    T.StructField("category_id", T.IntegerType(), False),
    T.StructField("category_name", T.StringType(), True),
])
products_schema = T.StructType([
    T.StructField("product_id", T.IntegerType(), False),
    T.StructField("product_name", T.StringType(), True),
    T.StructField("brand_id", T.IntegerType(), True),
    T.StructField("category_id", T.IntegerType(), True),
    T.StructField("price", T.DoubleType(), True),
])
orders_schema = T.StructType([
    T.StructField("order_id", T.IntegerType(), False),
    T.StructField("customer_id", T.IntegerType(), True),
    T.StructField("order_date", T.TimestampType(), True),
])
order_items_schema = T.StructType([
    T.StructField("order_item_id", T.IntegerType(), False),
    T.StructField("order_id", T.IntegerType(), True),
    T.StructField("product_id", T.IntegerType(), True),
    T.StructField("quantity", T.IntegerType(), True),
    T.StructField("unit_price", T.DoubleType(), True),
])
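Note (an addition, not part of the original lab): Spark's file readers enforce the declared names and types, but may relax nullable=False flags on read, so the non-nullable key fields above are best treated as documented intent. The schema objects themselves can be inspected directly:

# Inspect the declared schemas; StructType supports lookup by field name.
print(customers_schema["customer_id"])   # StructField for the key column, declared nullable=False
print(customers_schema.fieldNames())     # column order expected in the CSV export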
In [32]:
import os
# Print the absolute path of the current working directory
print(os.getcwd())
/mnt/c/Users/Justine/Data_engineering/lab2
1. Ingest operational tables (from CSV exports)
In [45]:
customers = spark.read.schema(customers_schema).option("header","true").csv(base+"lab2_customers.csv")
brands = spark.read.schema(brands_schema).option("header","true").csv(base+"lab2_brands.csv")
categories = spark.read.schema(categories_schema).option("header","true").csv(base+"lab2_categories.csv")
products = spark.read.schema(products_schema).option("header","true").csv(base+"lab2_products.csv")
orders = spark.read.schema(orders_schema).option("header","true").csv(base+"lab2_orders.csv")
order_items = spark.read.schema(order_items_schema).option("header","true").csv(base+"lab2_order_items.csv")
for name, df in [("customers",customers),("brands",brands),("categories",categories),("products",products),("orders",orders),("order_items",order_items)]:
    print(name, df.count())
customers 24
brands 8
categories 9
products 60
orders 220
order_items 638
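With explicit schemas, the default PERMISSIVE CSV mode nulls out malformed fields rather than failing the read, so the counts above can hide bad rows. A minimal sanity check (an added snippet, not part of the original cell) is to assert that the key columns survived the read intact:

# PERMISSIVE mode nulls malformed fields silently; verify the key columns.
for name, df, key in [("customers", customers, "customer_id"),
                      ("products", products, "product_id"),
                      ("orders", orders, "order_id"),
                      ("order_items", order_items, "order_item_id")]:
    bad = df.filter(F.col(key).isNull()).count()
    assert bad == 0, f"{name}: {bad} rows with null {key}"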
In [34]:
import requests
import pandas as pd
import os
from datetime import datetime
def export_spark_metrics_lab2(spark, run_id, task, note="", output_csv="lab2_metrics_log.csv"):
    """
    Export Spark stage metrics for Lab 2 to a CSV file.
    - spark: active SparkSession
    - run_id: run identifier (e.g. "r1")
    - task: task name (e.g. "ingest_plan")
    - note: comment or variant under test
    - output_csv: output file name
    """
    ui_url = spark.sparkContext.uiWebUrl
    app_id = spark.sparkContext.applicationId
    if not ui_url:
        print("No Spark UI detected (no job running?)")
        return
    try:
        stages_url = f"{ui_url}/api/v1/applications/{app_id}/stages"
        stages = requests.get(stages_url).json()
    except Exception as e:
        print(f"Spark API error: {e}")
        return
    data = []
    for s in stages:
        data.append({
            "run_id": run_id,
            "task": task,
            "note": note,
            "input_records": s.get("inputRecords", 0),
            "input_size_bytes": s.get("inputBytes", 0),
            "shuffle_read_bytes": s.get("shuffleReadBytes", 0),
            "shuffle_write_bytes": s.get("shuffleWriteBytes", 0),
            "timestamp": datetime.now().isoformat(),
        })
    df = pd.DataFrame(data)
    if df.empty:
        print(f"No metrics found for {task} (the job may not have run yet)")
        return
    # Create the CSV on first use, append on subsequent calls
    if os.path.exists(output_csv):
        df.to_csv(output_csv, mode="a", header=False, index=False)
    else:
        df.to_csv(output_csv, index=False)
    print(f"{len(df)} rows exported to {output_csv} for task {task}")
Evidence: ingestion plan
In [46]:
ingest = orders.join(order_items, "order_id").select("order_id").distinct()
ingest.explain("formatted")
export_spark_metrics_lab2(spark, run_id="r1", task="ingest_plan", note="ingest operational tables")
from datetime import datetime as _dt
import pathlib
pathlib.Path("proof").mkdir(exist_ok=True)
with open("proof/plan_ingest.txt","w") as f:
f.write(str(_dt.now())+"\n")
f.write(ingest._jdf.queryExecution().executedPlan().toString())
print("Saved proof/plan_ingest.txt")
== Physical Plan ==
AdaptiveSparkPlan (11)
+- HashAggregate (10)
   +- Exchange (9)
      +- HashAggregate (8)
         +- Project (7)
            +- BroadcastHashJoin Inner BuildLeft (6)
               :- BroadcastExchange (3)
               :  +- Filter (2)
               :     +- Scan csv (1)
               +- Filter (5)
                  +- Scan csv (4)
(1) Scan csv
Output [1]: [order_id#403]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab2/data/lab2_orders.csv]
PushedFilters: [IsNotNull(order_id)]
ReadSchema: struct<order_id:int>
(2) Filter
Input [1]: [order_id#403]
Condition : isnotnull(order_id#403)
(3) BroadcastExchange
Input [1]: [order_id#403]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=3615]
(4) Scan csv
Output [1]: [order_id#407]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab2/data/lab2_order_items.csv]
PushedFilters: [IsNotNull(order_id)]
ReadSchema: struct<order_id:int>
(5) Filter
Input [1]: [order_id#407]
Condition : isnotnull(order_id#407)
(6) BroadcastHashJoin
Left keys [1]: [order_id#403]
Right keys [1]: [order_id#407]
Join type: Inner
Join condition: None
(7) Project
Output [1]: [order_id#403]
Input [2]: [order_id#403, order_id#407]
(8) HashAggregate
Input [1]: [order_id#403]
Keys [1]: [order_id#403]
Functions: []
Aggregate Attributes: []
Results [1]: [order_id#403]
(9) Exchange
Input [1]: [order_id#403]
Arguments: hashpartitioning(order_id#403, 200), ENSURE_REQUIREMENTS, [plan_id=3620]
(10) HashAggregate
Input [1]: [order_id#403]
Keys [1]: [order_id#403]
Functions: []
Aggregate Attributes: []
Results [1]: [order_id#403]
(11) AdaptiveSparkPlan
Output [1]: [order_id#403]
Arguments: isFinalPlan=false
Saved proof/plan_ingest.txt
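The plan prints isFinalPlan=false because adaptive query execution (AQE) only settles the plan at runtime. To capture the final adaptive plan as evidence, one option (a sketch, relying on Spark re-printing the executed plan after an action) is:

# Run an action first so AQE finalizes the plan, then explain again:
# the AdaptiveSparkPlan node should now report isFinalPlan=true.
_ = ingest.count()
ingest.explain("formatted")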
2. Surrogate key function
In [48]:
def sk(cols):
    # Stable 64-bit positive surrogate key derived from the natural key columns
    return F.abs(F.xxhash64(*[F.col(c) for c in cols]))
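xxhash64 is deterministic (fixed seed 42, visible in the plans below), which is what lets section 5 recompute the same SKs on the fact side instead of joining back to the dimensions. A quick determinism check (an added snippet, not in the original lab):

# Two independent sk() calls over the same natural key must agree row by row.
chk = customers.select(sk(["customer_id"]).alias("a"), sk(["customer_id"]).alias("b"))
assert chk.filter(F.col("a") != F.col("b")).count() == 0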
3. Build dimensions
In [49]:
dim_customer = customers.select(
    sk(["customer_id"]).alias("customer_sk"),
    "customer_id", "name", "email", "created_at"
)
dim_brand = brands.select(
    sk(["brand_id"]).alias("brand_sk"),
    "brand_id", "brand_name"
)
dim_category = categories.select(
    sk(["category_id"]).alias("category_sk"),
    "category_id", "category_name"
)
dim_product = products.select(
    sk(["product_id"]).alias("product_sk"),
    "product_id", "product_name",
    sk(["brand_id"]).alias("brand_sk"),
    sk(["category_id"]).alias("category_sk"),
    "price"
)
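Hash-derived surrogate keys can collide in principle, so it is worth asserting uniqueness before relying on them as join keys. A minimal guard (added here as a suggestion):

# Each surrogate key must identify exactly one dimension row.
for name, dim, key in [("dim_customer", dim_customer, "customer_sk"),
                       ("dim_brand", dim_brand, "brand_sk"),
                       ("dim_category", dim_category, "category_sk"),
                       ("dim_product", dim_product, "product_sk")]:
    dups = dim.groupBy(key).count().filter("count > 1").count()
    assert dups == 0, f"{name}: {dups} duplicate surrogate keys"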
4. Build date dimension
In [50]:
dates = orders.select(F.to_date("order_date").alias("date")).distinct()
dim_date = dates.select(
    sk(["date"]).alias("date_sk"),
    F.col("date"),
    F.year("date").alias("year"),
    F.month("date").alias("month"),
    F.dayofmonth("date").alias("day"),
    F.date_format("date", "E").alias("dow")
)
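Since dim_date only covers dates that actually occur in orders, its cardinality should equal the number of distinct order dates. A quick consistency check (added, hypothetical):

# dim_date is derived from the observed order dates, so the counts must match.
assert dim_date.count() == dates.count()
dim_date.orderBy("date").show(5)  # spot-check year/month/day/dow decoding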
5. Build fact_sales with broadcast joins where appropriate
In [51]:
oi = order_items.alias("oi")
p = products.alias("p")
o = orders.alias("o")
c = customers.alias("c")
# Join the operational tables; the small sides fall under
# spark.sql.autoBroadcastJoinThreshold, so Spark plans broadcast joins on its own.
# Surrogate keys are recomputed from the natural keys with sk(), which avoids
# joining back to the dimension tables.
df_fact = (oi
    .join(p, F.col("oi.product_id") == F.col("p.product_id"))
    .drop(F.col("p.product_id"))
    .join(o, "order_id")
    .join(c, "customer_id")
    .withColumn("date", F.to_date("order_date"))
)
# Attach surrogate keys, measures, and the partition columns
df_fact = (df_fact
    .withColumn("date_sk", sk(["date"]))
    .withColumn("customer_sk", sk(["customer_id"]))
    .withColumn("product_sk", sk(["product_id"]))
    .withColumn("quantity", F.col("quantity").cast("int"))
    .withColumn("unit_price", F.col("unit_price").cast("double"))
    .withColumn("subtotal", F.col("quantity") * F.col("unit_price"))
    .withColumn("year", F.year("date"))
    .withColumn("month", F.month("date"))
    .select("order_id", "date_sk", "customer_sk", "product_sk",
            "quantity", "unit_price", "subtotal", "year", "month")
)
df_fact.explain("formatted")
export_spark_metrics_lab2(spark, run_id="r1", task="fact_join", note="build fact_sales with SKs")
with open("proof/plan_fact_join.txt","w") as f:
from datetime import datetime as _dt
f.write(str(_dt.now())+"\n")
f.write(df_fact._jdf.queryExecution().executedPlan().toString())
print("Saved proof/plan_fact_join.txt")
== Physical Plan ==
AdaptiveSparkPlan (19)
+- Project (18)
   +- Project (17)
      +- BroadcastHashJoin Inner BuildRight (16)
         :- Project (12)
         :  +- BroadcastHashJoin Inner BuildRight (11)
         :     :- Project (7)
         :     :  +- BroadcastHashJoin Inner BuildRight (6)
         :     :     :- Filter (2)
         :     :     :  +- Scan csv (1)
         :     :     +- BroadcastExchange (5)
         :     :        +- Filter (4)
         :     :           +- Scan csv (3)
         :     +- BroadcastExchange (10)
         :        +- Filter (9)
         :           +- Scan csv (8)
         +- BroadcastExchange (15)
            +- Filter (14)
               +- Scan csv (13)
(1) Scan csv
Output [4]: [order_id#407, product_id#408, quantity#409, unit_price#410]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab2/data/lab2_order_items.csv]
PushedFilters: [IsNotNull(product_id), IsNotNull(order_id)]
ReadSchema: struct<order_id:int,product_id:int,quantity:int,unit_price:double>
(2) Filter
Input [4]: [order_id#407, product_id#408, quantity#409, unit_price#410]
Condition : (isnotnull(product_id#408) AND isnotnull(order_id#407))
(3) Scan csv
Output [1]: [product_id#398]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab2/data/lab2_products.csv]
PushedFilters: [IsNotNull(product_id)]
ReadSchema: struct<product_id:int>
(4) Filter
Input [1]: [product_id#398]
Condition : isnotnull(product_id#398)
(5) BroadcastExchange
Input [1]: [product_id#398]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=3689]
(6) BroadcastHashJoin
Left keys [1]: [product_id#408]
Right keys [1]: [product_id#398]
Join type: Inner
Join condition: None
(7) Project
Output [4]: [order_id#407, product_id#408, quantity#409, unit_price#410]
Input [5]: [order_id#407, product_id#408, quantity#409, unit_price#410, product_id#398]
(8) Scan csv
Output [3]: [order_id#403, customer_id#404, order_date#405]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab2/data/lab2_orders.csv]
PushedFilters: [IsNotNull(order_id), IsNotNull(customer_id)]
ReadSchema: struct<order_id:int,customer_id:int,order_date:timestamp>
(9) Filter
Input [3]: [order_id#403, customer_id#404, order_date#405]
Condition : (isnotnull(order_id#403) AND isnotnull(customer_id#404))
(10) BroadcastExchange
Input [3]: [order_id#403, customer_id#404, order_date#405]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=3693]
(11) BroadcastHashJoin
Left keys [1]: [order_id#407]
Right keys [1]: [order_id#403]
Join type: Inner
Join condition: None
(12) Project
Output [6]: [order_id#407, product_id#408, quantity#409, unit_price#410, customer_id#404, order_date#405]
Input [7]: [order_id#407, product_id#408, quantity#409, unit_price#410, order_id#403, customer_id#404, order_date#405]
(13) Scan csv
Output [1]: [customer_id#390]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab2/data/lab2_customers.csv]
PushedFilters: [IsNotNull(customer_id)]
ReadSchema: struct<customer_id:int>
(14) Filter
Input [1]: [customer_id#390]
Condition : isnotnull(customer_id#390)
(15) BroadcastExchange
Input [1]: [customer_id#390]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=3697]
(16) BroadcastHashJoin
Left keys [1]: [customer_id#404]
Right keys [1]: [customer_id#390]
Join type: Inner
Join condition: None
(17) Project
Output [6]: [customer_id#404, order_id#407, product_id#408, quantity#409, unit_price#410, cast(order_date#405 as date) AS date#474]
Input [7]: [order_id#407, product_id#408, quantity#409, unit_price#410, customer_id#404, order_date#405, customer_id#390]
(18) Project
Output [9]: [order_id#407, abs(xxhash64(date#474, 42)) AS date_sk#475L, abs(xxhash64(customer_id#404, 42)) AS customer_sk#476L, abs(xxhash64(product_id#408, 42)) AS product_sk#477L, quantity#409, unit_price#410, (cast(quantity#409 as double) * unit_price#410) AS subtotal#480, year(date#474) AS year#481, month(date#474) AS month#482]
Input [6]: [customer_id#404, order_id#407, product_id#408, quantity#409, unit_price#410, date#474]
(19) AdaptiveSparkPlan
Output [9]: [order_id#407, date_sk#475L, customer_sk#476L, product_sk#477L, quantity#409, unit_price#410, subtotal#480, year#481, month#482]
Arguments: isFinalPlan=false
Saved proof/plan_fact_join.txt
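All three joins above were planned as BroadcastHashJoin because the build sides sit under spark.sql.autoBroadcastJoinThreshold (10 MB by default). If the small tables ever outgrow that threshold, an explicit hint preserves the same plan shape; a sketch (df_fact_hinted is an illustrative name, not used elsewhere):

from pyspark.sql.functions import broadcast

# Explicit broadcast hints: same join strategy as the automatic plan above,
# robust to the small tables crossing the auto-broadcast threshold.
df_fact_hinted = (order_items
    .join(broadcast(products), "product_id")
    .join(broadcast(orders), "order_id")
    .join(broadcast(customers), "customer_id")
)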
6. Write Parquet outputs (partitioned by year, month)
In [52]:
base_out = "outputs"
dim_customer.write.mode("overwrite").parquet(f"{base_out}/dim_customer")
dim_brand.write.mode("overwrite").parquet(f"{base_out}/dim_brand")
dim_category.write.mode("overwrite").parquet(f"{base_out}/dim_category")
dim_product.write.mode("overwrite").parquet(f"{base_out}/dim_product")
dim_date.write.mode("overwrite").parquet(f"{base_out}/dim_date")
df_fact.write.mode("overwrite").partitionBy("year","month").parquet(f"{base_out}/fact_sales")
print("Parquet written under outputs")
Parquet written under outputs
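Partitioning by year and month pays off when reads filter on those columns: Spark prunes whole directories instead of scanning the full fact table. A read-back sketch (the filter values are placeholders; pick a year/month present in your data):

# A filter on partition columns should appear as PartitionFilters in the scan,
# i.e. directory pruning rather than a full read of fact_sales.
fact = spark.read.parquet(f"{base_out}/fact_sales")
fact.filter((F.col("year") == 2025) & (F.col("month") == 1)).explain("formatted")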
7. Plan comparison: projection and layout
In [53]:
# Case A: join everything, then project during aggregation
a = (orders.join(order_items, "order_id")
     .join(products, "product_id")
     .groupBy(F.to_date("order_date").alias("d"))
     .agg(F.sum(F.col("quantity") * F.col("price")).alias("gmv")))
a.explain("formatted")
_ = a.count()
export_spark_metrics_lab2(spark, run_id="r1", task="caseA_join_then_project", note="join then aggregate")
# Case B: project early, then join
b = (orders.select("order_id","order_date")
     .join(order_items.select("order_id","product_id","quantity"), "order_id")
     .join(products.select("product_id","price"), "product_id")
     .groupBy(F.to_date("order_date").alias("d"))
     .agg(F.sum(F.col("quantity") * F.col("price")).alias("gmv")))
b.explain("formatted")
_ = b.count()
export_spark_metrics_lab2(spark, run_id="r1", task="caseB_project_then_join", note="project early then join")
print("Record Spark UI metrics for both runs in lab2_metrics_log.csv")
== Physical Plan ==
AdaptiveSparkPlan (16)
+- HashAggregate (15)
   +- Exchange (14)
      +- HashAggregate (13)
         +- Project (12)
            +- BroadcastHashJoin Inner BuildRight (11)
               :- Project (7)
               :  +- BroadcastHashJoin Inner BuildLeft (6)
               :     :- BroadcastExchange (3)
               :     :  +- Filter (2)
               :     :     +- Scan csv (1)
               :     +- Filter (5)
               :        +- Scan csv (4)
               +- BroadcastExchange (10)
                  +- Filter (9)
                     +- Scan csv (8)
(1) Scan csv
Output [2]: [order_id#403, order_date#405]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab2/data/lab2_orders.csv]
PushedFilters: [IsNotNull(order_id)]
ReadSchema: struct<order_id:int,order_date:timestamp>
(2) Filter
Input [2]: [order_id#403, order_date#405]
Condition : isnotnull(order_id#403)
(3) BroadcastExchange
Input [2]: [order_id#403, order_date#405]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=4214]
(4) Scan csv
Output [3]: [order_id#407, product_id#408, quantity#409]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab2/data/lab2_order_items.csv]
PushedFilters: [IsNotNull(order_id), IsNotNull(product_id)]
ReadSchema: struct<order_id:int,product_id:int,quantity:int>
(5) Filter
Input [3]: [order_id#407, product_id#408, quantity#409]
Condition : (isnotnull(order_id#407) AND isnotnull(product_id#408))
(6) BroadcastHashJoin
Left keys [1]: [order_id#403]
Right keys [1]: [order_id#407]
Join type: Inner
Join condition: None
(7) Project
Output [3]: [order_date#405, product_id#408, quantity#409]
Input [5]: [order_id#403, order_date#405, order_id#407, product_id#408, quantity#409]
(8) Scan csv
Output [2]: [product_id#398, price#402]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab2/data/lab2_products.csv]
PushedFilters: [IsNotNull(product_id)]
ReadSchema: struct<product_id:int,price:double>
(9) Filter
Input [2]: [product_id#398, price#402]
Condition : isnotnull(product_id#398)
(10) BroadcastExchange
Input [2]: [product_id#398, price#402]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=4218]
(11) BroadcastHashJoin
Left keys [1]: [product_id#408]
Right keys [1]: [product_id#398]
Join type: Inner
Join condition: None
(12) Project
Output [3]: [quantity#409, price#402, cast(order_date#405 as date) AS _groupingexpression#520]
Input [5]: [order_date#405, product_id#408, quantity#409, product_id#398, price#402]
(13) HashAggregate
Input [3]: [quantity#409, price#402, _groupingexpression#520]
Keys [1]: [_groupingexpression#520]
Functions [1]: [partial_sum((cast(quantity#409 as double) * price#402))]
Aggregate Attributes [1]: [sum#521]
Results [2]: [_groupingexpression#520, sum#522]
(14) Exchange
Input [2]: [_groupingexpression#520, sum#522]
Arguments: hashpartitioning(_groupingexpression#520, 200), ENSURE_REQUIREMENTS, [plan_id=4223]
(15) HashAggregate
Input [2]: [_groupingexpression#520, sum#522]
Keys [1]: [_groupingexpression#520]
Functions [1]: [sum((cast(quantity#409 as double) * price#402))]
Aggregate Attributes [1]: [sum((cast(quantity#409 as double) * price#402))#519]
Results [2]: [_groupingexpression#520 AS d#506, sum((cast(quantity#409 as double) * price#402))#519 AS gmv#507]
(16) AdaptiveSparkPlan
Output [2]: [d#506, gmv#507]
Arguments: isFinalPlan=false
== Physical Plan ==
AdaptiveSparkPlan (16)
+- HashAggregate (15)
   +- Exchange (14)
      +- HashAggregate (13)
         +- Project (12)
            +- BroadcastHashJoin Inner BuildRight (11)
               :- Project (7)
               :  +- BroadcastHashJoin Inner BuildLeft (6)
               :     :- BroadcastExchange (3)
               :     :  +- Filter (2)
               :     :     +- Scan csv (1)
               :     +- Filter (5)
               :        +- Scan csv (4)
               +- BroadcastExchange (10)
                  +- Filter (9)
                     +- Scan csv (8)
(1) Scan csv
Output [2]: [order_id#403, order_date#405]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab2/data/lab2_orders.csv]
PushedFilters: [IsNotNull(order_id)]
ReadSchema: struct<order_id:int,order_date:timestamp>
(2) Filter
Input [2]: [order_id#403, order_date#405]
Condition : isnotnull(order_id#403)
(3) BroadcastExchange
Input [2]: [order_id#403, order_date#405]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=4589]
(4) Scan csv
Output [3]: [order_id#407, product_id#408, quantity#409]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab2/data/lab2_order_items.csv]
PushedFilters: [IsNotNull(order_id), IsNotNull(product_id)]
ReadSchema: struct<order_id:int,product_id:int,quantity:int>
(5) Filter
Input [3]: [order_id#407, product_id#408, quantity#409]
Condition : (isnotnull(order_id#407) AND isnotnull(product_id#408))
(6) BroadcastHashJoin
Left keys [1]: [order_id#403]
Right keys [1]: [order_id#407]
Join type: Inner
Join condition: None
(7) Project
Output [3]: [order_date#405, product_id#408, quantity#409]
Input [5]: [order_id#403, order_date#405, order_id#407, product_id#408, quantity#409]
(8) Scan csv
Output [2]: [product_id#398, price#402]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/Data_engineering/lab2/data/lab2_products.csv]
PushedFilters: [IsNotNull(product_id)]
ReadSchema: struct<product_id:int,price:double>
(9) Filter
Input [2]: [product_id#398, price#402]
Condition : isnotnull(product_id#398)
(10) BroadcastExchange
Input [2]: [product_id#398, price#402]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=4593]
(11) BroadcastHashJoin
Left keys [1]: [product_id#408]
Right keys [1]: [product_id#398]
Join type: Inner
Join condition: None
(12) Project
Output [3]: [quantity#409, price#402, cast(order_date#405 as date) AS _groupingexpression#543]
Input [5]: [order_date#405, product_id#408, quantity#409, product_id#398, price#402]
(13) HashAggregate
Input [3]: [quantity#409, price#402, _groupingexpression#543]
Keys [1]: [_groupingexpression#543]
Functions [1]: [partial_sum((cast(quantity#409 as double) * price#402))]
Aggregate Attributes [1]: [sum#544]
Results [2]: [_groupingexpression#543, sum#545]
(14) Exchange
Input [2]: [_groupingexpression#543, sum#545]
Arguments: hashpartitioning(_groupingexpression#543, 200), ENSURE_REQUIREMENTS, [plan_id=4598]
(15) HashAggregate
Input [2]: [_groupingexpression#543, sum#545]
Keys [1]: [_groupingexpression#543]
Functions [1]: [sum((cast(quantity#409 as double) * price#402))]
Aggregate Attributes [1]: [sum((cast(quantity#409 as double) * price#402))#542]
Results [2]: [_groupingexpression#543 AS d#535, sum((cast(quantity#409 as double) * price#402))#542 AS gmv#536]
(16) AdaptiveSparkPlan
Output [2]: [d#535, gmv#536]
Arguments: isFinalPlan=false
Record Spark UI metrics for both runs in lab2_metrics_log.csv
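Note that the two physical plans are identical: Catalyst prunes unused columns on its own, so case A's scans already read only the needed fields (see the ReadSchema lines) and the manual projection in case B changes nothing at the physical level. Any difference should therefore show up in the logged metrics rather than the plan shape. A small pandas summary over the log (column names as produced by export_spark_metrics_lab2 above):

import pandas as pd

# Aggregate the logged stage metrics per task for a side-by-side comparison.
log = pd.read_csv("lab2_metrics_log.csv")
cols = ["input_size_bytes", "shuffle_read_bytes", "shuffle_write_bytes"]
print(log.groupby("task")[cols].sum())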
8. Cleanup
In [54]:
spark.stop()
print("Spark session stopped.")
Spark session stopped.