ESIEE Paris — Data Engineering I — Assignment 2¶
Author : Badr TAJINI
Academic year: 2025–2026
Program: Data & Applications - Engineering - (FD)
Course: Data Engineering I
from pyspark.sql import SparkSession, functions as F, types as T
spark = SparkSession.builder.appName("de1-lab2-assignment").getOrCreate()
Data inputs¶
Define your input paths. Use small CSV/JSON/Parquet files so the notebook runs locally. If your dataset requires credentials, create a sample subset and commit only that.
Paths to set:
- SOURCE_A_PATH (fact-like dataset)
- SOURCE_B_PATH (dimension-like dataset)
- OUTPUT_BASE (directory for Parquet output)
# Read carefully the helper to review what is missing here
SOURCE_A_PATH = 'data/events.csv'
SOURCE_B_PATH = 'data/user.csv'
OUTPUT_BASE = 'outputs'
Pipeline API (implementations required)¶
Implement the following functions. Keep signatures stable. Use explicit schemas when possible. Log counts at each stage.
from typing import Tuple
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
def ingest(spark: SparkSession, path_a: str, path_b: str) -> Tuple[DataFrame, DataFrame]:
"""Load SOURCE_A and SOURCE_B. Apply explicit schemas where possible.
Return two DataFrames with uniform column naming.
"""
events_schema = T.StructType([
T.StructField("event_time", T.TimestampType(), True),
T.StructField("event_type", T.StringType(), True),
T.StructField("session_id", T.StringType(), True),
T.StructField("product_id", T.StringType(), True),
T.StructField("price", T.DoubleType(), True)
])
df_events = spark.read.csv(path_a, header=True, schema=events_schema)
print("Events count:", df_events.count())
# Explicit schema for users
users_schema = T.StructType([
T.StructField("user_id", T.StringType(), True),
T.StructField("first_name", T.StringType(), True),
T.StructField("last_name", T.StringType(), True),
T.StructField("birthdate", T.DateType(), True)
])
df_users = spark.read.csv(path_b, header=True, schema=users_schema)
print("Users count:", df_users.count())
return df_events, df_users
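# The ingest above logs row counts but not malformed records. A minimal sketch
# of one way to count them (an optional helper, not part of the required API):
# read in PERMISSIVE mode with a corrupt-record column, which Spark populates
# for rows that fail to parse under the schema.
def count_malformed(spark: SparkSession, path: str, schema: T.StructType) -> int:
    schema_with_corrupt = T.StructType(
        schema.fields + [T.StructField("_corrupt_record", T.StringType(), True)]
    )
    df = (spark.read
          .option("header", True)
          .option("mode", "PERMISSIVE")
          .option("columnNameOfCorruptRecord", "_corrupt_record")
          .schema(schema_with_corrupt)
          .csv(path))
    # Cache first: Spark disallows queries that reference only the internal
    # corrupt-record column of a raw CSV scan.
    return df.cache().filter(F.col("_corrupt_record").isNotNull()).count()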
def transform(df_events: DataFrame, df_users: DataFrame) -> DataFrame:
"""Clean, deduplicate, and normalize. Add parsed timestamps.
Drop obvious null records. Prepare keys for join.
"""
df_events_clean = (
df_events
.dropna(subset=["event_time", "event_type", "session_id"])
.dropDuplicates(["event_time", "session_id", "product_id"])
)
    # Normalize event_time to UTC. The source data is assumed to already be in
    # UTC, so this is a no-op; swap "UTC" for the source timezone if that changes.
df_events_clean = df_events_clean.withColumn("event_time", F.to_utc_timestamp("event_time", "UTC"))
print("Cleaned events count:", df_events_clean.count())
return df_events_clean
def join_and_aggregate(df_events: DataFrame, df_users: DataFrame) -> DataFrame:
"""Join with dim table. Handle potential skew (hint: salting or AQE).
Compute business aggregates with window or groupBy.
"""
# Example join: left join events with users on user_id
# Here we assume events somehow have user_id (or you can skip join for now)
# If events don't have user_id, this join is skipped
if "user_id" in df_events.columns:
df_joined = df_events.join(df_users, on="user_id", how="left")
else:
df_joined = df_events
# Compute age_on_event if birthdate exists
if "birthdate" in df_users.columns and "user_id" in df_joined.columns:
df_joined = df_joined.withColumn(
"age_on_event",
F.floor(F.months_between(F.col("event_time"), F.col("birthdate")) / 12)
)
# Example aggregation: count events per day
df_agg = (
df_joined
.withColumn("event_date", F.to_date("event_time"))
.groupBy("event_date")
.agg(F.count("*").alias("events_count"))
)
return df_agg
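# The docstring above hints at skew mitigation. A minimal salting sketch (a
# hypothetical helper, one option besides Spark's AQE skew-join handling):
# spread each hot key over `n_salts` buckets on the fact side and replicate
# the dimension side once per bucket, so no single task gets a whole hot key.
def salted_join(df_fact: DataFrame, df_dim: DataFrame, key: str, n_salts: int = 8) -> DataFrame:
    fact_salted = df_fact.withColumn("salt", (F.rand() * n_salts).cast("int"))
    dim_salted = df_dim.withColumn(
        "salt", F.explode(F.array([F.lit(i) for i in range(n_salts)]))
    )
    return fact_salted.join(dim_salted, on=[key, "salt"], how="left").drop("salt")
# With AQE enabled (spark.sql.adaptive.skewJoin.enabled is true by default on
# recent Spark), skewed shuffle partitions are also split automatically.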
def write_out(df: DataFrame, base: str, partitions: list[str]) -> None:
"""Write Parquet, overwrite mode, partitioned by `partitions`.
Optimize small files if needed (coalesce).
"""
df.write.mode("overwrite").partitionBy(partitions).parquet(base)
print(f"Data written to {base} partitioned by {partitions}")
Tasks¶
- Ingest: read SOURCE_A_PATH and SOURCE_B_PATH. Provide explicit schemas. Count rows and malformed records.
- Transform: standardize column names, cast types, parse timestamps into UTC, deduplicate using keys.
- Join + Aggregate: explain your join strategy. Mitigate skew. Produce a tidy table with daily metrics.
- Store: write partitioned Parquet to OUTPUT_BASE, e.g., partition by date and one categorical column.
- Explain plans: capture df.explain(mode='formatted') for transform, join, and final write.
- Quality gates: implement three checks (row count non-zero, null rate thresholds, referential coverage). Abort if a gate fails (see the sketch after this list).
- Reproducibility: document your Spark config and any seeds. Describe how to re-run.
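A minimal sketch of the three quality gates, assuming the fact/dimension DataFrames built above; the thresholds are illustrative, not prescribed:
def run_quality_gates(df_fact: DataFrame, df_dim: DataFrame, key: str,
                      max_null_rate: float = 0.05, min_coverage: float = 0.95) -> None:
    """Raise (aborting the pipeline) if any quality gate fails."""
    total = df_fact.count()
    # Gate 1: row count must be non-zero.
    if total == 0:
        raise ValueError("Quality gate failed: fact table is empty")
    # Gate 2: null rate on the key must stay under the threshold.
    null_rate = df_fact.filter(F.col(key).isNull()).count() / total
    if null_rate > max_null_rate:
        raise ValueError(f"Quality gate failed: {key} null rate {null_rate:.2%}")
    # Gate 3: referential coverage, i.e. the share of fact rows whose key exists in the dimension.
    covered = df_fact.join(df_dim.select(key), on=key, how="left_semi").count() / total
    if covered < min_coverage:
        raise ValueError(f"Quality gate failed: {key} coverage {covered:.2%}")
# Hypothetical usage, assuming the events carry a user_id column:
# run_quality_gates(df_events_clean, df_users, key="user_id")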
# ----------------------------
# Orchestration
# ----------------------------
if spark is not None:
# Ingest
df_events, df_users = ingest(spark, SOURCE_A_PATH, SOURCE_B_PATH)
# Transform
df_events_clean = transform(df_events, df_users)
# Join + Aggregate
df_final = join_and_aggregate(df_events_clean, df_users)
print("Final count:", df_final.count())
df_final.explain(mode="formatted")
# Write output
write_out(df_final, OUTPUT_BASE, partitions=["event_date"])
Events count: 42418541
Users count: 3022290
Cleaned events count: 42412833
Final count: 32
== Physical Plan ==
AdaptiveSparkPlan (10)
+- HashAggregate (9)
+- Exchange (8)
+- HashAggregate (7)
+- HashAggregate (6)
+- Exchange (5)
+- HashAggregate (4)
+- Project (3)
+- Filter (2)
+- Scan csv (1)
(1) Scan csv
Output [4]: [event_time#0, event_type#1, session_id#2, product_id#3]
Batched: false
Location: InMemoryFileIndex [file:/home/justine/Data_engineering/lab2_assignment/data/events.csv]
ReadSchema: struct<event_time:timestamp,event_type:string,session_id:string,product_id:string>
(2) Filter
Input [4]: [event_time#0, event_type#1, session_id#2, product_id#3]
Condition : atleastnnonnulls(3, event_time#0, event_type#1, session_id#2)
(3) Project
Output [3]: [event_time#0, session_id#2, product_id#3]
Input [4]: [event_time#0, event_type#1, session_id#2, product_id#3]
(4) HashAggregate
Input [3]: [event_time#0, session_id#2, product_id#3]
Keys [3]: [event_time#0, session_id#2, product_id#3]
Functions: []
Aggregate Attributes: []
Results [3]: [event_time#0, session_id#2, product_id#3]
(5) Exchange
Input [3]: [event_time#0, session_id#2, product_id#3]
Arguments: hashpartitioning(event_time#0, session_id#2, product_id#3, 200), ENSURE_REQUIREMENTS, [plan_id=284]
(6) HashAggregate
Input [3]: [event_time#0, session_id#2, product_id#3]
Keys [3]: [event_time#0, session_id#2, product_id#3]
Functions: []
Aggregate Attributes: []
Results [1]: [cast(to_utc_timestamp(event_time#0, UTC) as date) AS event_date#45]
(7) HashAggregate
Input [1]: [event_date#45]
Keys [1]: [event_date#45]
Functions [1]: [partial_count(1)]
Aggregate Attributes [1]: [count#71L]
Results [2]: [event_date#45, count#72L]
(8) Exchange
Input [2]: [event_date#45, count#72L]
Arguments: hashpartitioning(event_date#45, 200), ENSURE_REQUIREMENTS, [plan_id=288]
(9) HashAggregate
Input [2]: [event_date#45, count#72L]
Keys [1]: [event_date#45]
Functions [1]: [count(1)]
Aggregate Attributes [1]: [count(1)#53L]
Results [2]: [event_date#45, count(1)#53L AS events_count#46L]
(10) AdaptiveSparkPlan
Output [2]: [event_date#45, events_count#46L]
Arguments: isFinalPlan=false
Data written to outputs partitioned by ['event_date']
Assignment 2: ETL¶
1. Querying the Operational Database¶
Let's run a query to verify that the operational database has been properly restored and that we can issue a query to PostgreSQL:
import os
os.environ['PGHOST'] = '127.0.0.1' # PostgreSQL server address
os.environ['PGPORT'] = '5433' # or 5432, depending on your installation
os.environ['PGUSER'] = 'esiee_reader'
os.environ['PGPASSWORD'] = 'azerty123'
!psql "esiee_full" -v ON_ERROR_STOP=1 -c "SELECT COUNT(DISTINCT user_id) AS number_users FROM retail.user;"
number_users
--------------
3022290
(1 row)
The correct answer should be 3022290.
If running the cell above gives you the same answer, then everything should be in order.
If you're getting an error, fix it before moving on.
As a warmup exercise, write SQL queries against the operational database to answer the following questions and report the answers. Place both your SQL queries and answers in the following cell, replacing the placeholder texts that exist there. Each question needs to be answered by a single SQL query (that is, it is not acceptable to run multiple SQL queries and then compute the answer yourself).
- For session_id 789d3699-028e-4367-b515-b82e2cb5225f, what was the purchase price?
- How many products are sold by the brand "sokolov"?
- What is the average purchase price of items purchased from the brand "febest"?
- What is the average number of events per user? (Report answer to two digits after the decimal point, i.e., XX.XX)
write some code here
!psql "esiee_full" -v ON_ERROR_STOP=1 -c "SELECT price FROM retail.events WHERE session_id = '789d3699-028e-4367-b515-b82e2cb5225f' AND event_type = 'purchase';"
price
--------
100.39
(1 row)
!psql "esiee_full" -v ON_ERROR_STOP=1 -c "SELECT COUNT(DISTINCT p.product_id) AS nb_produits FROM retail.product p JOIN retail.events e ON p.product_id = e.product_id WHERE p.brand = 'sokolov' AND e.event_type = 'purchase';"
nb_produits
-------------
354
(1 row)
!psql "esiee_full" -v ON_ERROR_STOP=1 -c "SELECT ROUND(AVG(e.price), 2) AS prix_moyen FROM retail.product p JOIN retail.events e ON p.product_id = e.product_id WHERE p.brand = 'febest' AND e.event_type = 'purchase';"
prix_moyen
------------
20.39
(1 row)
!psql "esiee_full" -v ON_ERROR_STOP=1 -c "SELECT ROUND(AVG(ev_count), 2) AS events_par_user FROM (SELECT user_id, COUNT(*) AS ev_count FROM retail.session s JOIN retail.events e ON s.session_id = e.session_id GROUP BY user_id) t;"
events_par_user
-----------------
14.04
(1 row)
// qcell_1b76x2 (keep this id for tracking purposes)
Q1 SQL:
SELECT price FROM retail.events WHERE session_id = '789d3699-028e-4367-b515-b82e2cb5225f' AND event_type = 'purchase';
Q1 answer:
100.39
Q2 SQL:
SELECT COUNT(DISTINCT p.product_id) AS nb_produits FROM retail.product p JOIN retail.events e ON p.product_id = e.product_id WHERE p.brand = 'sokolov' AND e.event_type = 'purchase';
Q2 answer:
354
Q3 SQL:
SELECT ROUND(AVG(e.price), 2) AS prix_moyen FROM retail.product p JOIN retail.events e ON p.product_id = e.product_id WHERE p.brand = 'febest' AND e.event_type = 'purchase';
Q3 answer:
20.39
Q4 SQL:
SELECT ROUND(AVG(ev_count), 2) AS events_par_user FROM ( SELECT user_id, COUNT(*) AS ev_count FROM retail.session s JOIN retail.events e ON s.session_id = e.session_id GROUP BY user_id ) t;
Q4 answer:
14.04
2. Setup¶
The following cell contains setup to measure wall clock time and memory usage. (Don't worry about the details, just run the cell)
!pip install -U numpy pandas pyarrow matplotlib scipy
import sys, subprocess
try:
import psutil # noqa: F401
except Exception:
subprocess.check_call([sys.executable, "-m", "pip", "install", "psutil"])
print("psutil is installed.")
from IPython.core.magic import register_cell_magic
import time, os, platform
# Try to import optional modules
try:
import psutil
except Exception:
psutil = None
try:
import resource # not available on Windows
except Exception:
resource = None
def _rss_bytes():
"""Resident Set Size in bytes (cross-platform via psutil if available)."""
if psutil is not None:
return psutil.Process(os.getpid()).memory_info().rss
# Fallback: unknown RSS → 0
return 0
def _peak_bytes():
"""
Best-effort peak memory in bytes.
- Windows: psutil peak working set (peak_wset)
- Linux: resource.ru_maxrss (KB → bytes)
- macOS: resource.ru_maxrss (bytes)
Fallback to current RSS if unavailable.
"""
sysname = platform.system()
# Windows path: use psutil peak_wset if present
if sysname == "Windows" and psutil is not None:
mi = psutil.Process(os.getpid()).memory_info()
peak = getattr(mi, "peak_wset", None) # should be available on Windows
if peak is not None:
return int(peak)
return int(mi.rss)
# POSIX path: resource may be available
if resource is not None:
try:
ru = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
# On Linux ru_maxrss is in kilobytes; on macOS/BSD it is bytes
if sysname == "Linux":
return int(ru) * 1024
else:
return int(ru)
except Exception:
pass
# Last resort
return _rss_bytes()
@register_cell_magic
def timemem(line, cell):
"""
Measure wall time and memory around the execution of this cell.
%%timemem
<your code>
Notes:
- RSS = resident memory after the cell.
- Peak is OS-dependent (see _peak_bytes docstring).
"""
ip = get_ipython()
rss_before = _rss_bytes()
peak_before = _peak_bytes()
t0 = time.perf_counter()
# Execute the cell body
result = ip.run_cell(cell)
t1 = time.perf_counter()
rss_after = _rss_bytes()
peak_after = _peak_bytes()
wall = t1 - t0
rss_delta_mb = (rss_after - rss_before) / (1024 * 1024)
peak_delta_mb = (peak_after - peak_before) / (1024 * 1024)
print("======================================")
print(f"Wall time: {wall:.3f} s")
print(f"RSS Δ: {rss_delta_mb:+.2f} MB")
print(f"Peak memory Δ: {peak_delta_mb:+.2f} MB (OS-dependent)")
print("======================================")
return result
psutil is installed.
3. The "Extract" in ETL¶
The operational database comprises the tables described in the helper.
For the "Extract" in ETL, we're going to extract the following CSV files, each corresponding to a table in the operational database:
- user.csv: user_id, gender, birthdate
- session.csv: session_id, user_id
- product.csv: product_id, brand, category, product_name
- product_name.csv: category, product_name, description
- events.csv: event_time, event_type, session_id, product_id, price
- category.csv: category, description
- brand.csv: brand, description
From these files, we'll build a data warehouse organized in a standard star schema that has the following tables:
- Dimension tables: dim_user, dim_age, dim_brand, dim_category, dim_product, dim_date, dim_session
- The main fact table fact_events with foreign keys: date_key, user_key, age_key, product_key, brand_key, category_key, session_key
Let's specify a "base directory":
# Change to path on your local machine.
BASE_DIR = "/home/justine/de1-website/DE1/labs-final/lab2-assignment/csv"
These are the commands that perform the "extraction":
!psql "esiee_full" -v ON_ERROR_STOP=1 -c '\copy "retail"."user" TO '\''{BASE_DIR}/user.csv'\'' WITH (FORMAT csv, HEADER true)'
!psql "esiee_full" -v ON_ERROR_STOP=1 -c '\copy "retail"."session" TO '\''{BASE_DIR}/session.csv'\'' WITH (FORMAT csv, HEADER true)'
!psql "esiee_full" -v ON_ERROR_STOP=1 -c '\copy "retail"."category" TO '\''{BASE_DIR}/category.csv'\'' WITH (FORMAT csv, HEADER true)'
!psql "esiee_full" -v ON_ERROR_STOP=1 -c '\copy "retail"."brand" TO '\''{BASE_DIR}/brand.csv'\'' WITH (FORMAT csv, HEADER true)'
!psql "esiee_full" -v ON_ERROR_STOP=1 -c '\copy "retail"."product_name" TO '\''{BASE_DIR}/product_name.csv'\'' WITH (FORMAT csv, HEADER true)'
!psql "esiee_full" -v ON_ERROR_STOP=1 -c '\copy "retail"."product" TO '\''{BASE_DIR}/product.csv'\'' WITH (FORMAT csv, HEADER true)'
!psql "esiee_full" -v ON_ERROR_STOP=1 -c '\copy "retail"."events" TO '\''{BASE_DIR}/events.csv'\'' WITH (FORMAT csv, HEADER true)'
(Note that the quote style above will not work for Windows machines. Please adjust accordingly.)
After the extraction, you should have 7 CSV files, each corresponding to a table in the operational database.
The CSV files should be stored in BASE_DIR.
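If the nested single quotes above fail on your shell (Windows in particular), one portable alternative is to put the \copy commands in a script and run it with psql -f. A sketch (the file name extract_copy.sql is hypothetical):
import pathlib
copy_sql = "\n".join(
    f"\\copy \"retail\".\"{t}\" TO '{BASE_DIR}/{t}.csv' WITH (FORMAT csv, HEADER true)"
    for t in ["user", "session", "category", "brand", "product_name", "product", "events"]
)
pathlib.Path("extract_copy.sql").write_text(copy_sql)
!psql "esiee_full" -v ON_ERROR_STOP=1 -f extract_copy.sql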
The following code snippet should "just work" to initialize Spark.
import findspark, os, sys
# Change to path on your local machine.
os.environ["SPARK_HOME"] = "home/justine/spark-4.0.1-bin-hadoop3"
findspark.init()
import os
from pyspark.sql import SparkSession, functions as F, types as T
from pyspark.sql.functions import col
py = sys.executable # the Python of this notebook (e.g., .../envs/yourenv/bin/python)
os.environ["PYSPARK_DRIVER_PYTHON"] = py
os.environ["PYSPARK_PYTHON"] = py
spark = SparkSession.getActiveSession() or (
SparkSession.builder
.appName("A2")
.master("local[*]")
.config("spark.driver.memory", "8g") # or 12g+
.config("spark.sql.shuffle.partitions","400")
.config("spark.sql.adaptive.enabled", "true")
.config("spark.pyspark.driver.python", py)
.config("spark.pyspark.python", py)
.config("spark.executorEnv.PYSPARK_PYTHON", py)
.getOrCreate()
)
spark
SparkSession - in-memory
At this point, Spark should be initialized.
Let's then load in CSV files into DataFrames.
write some code here
%%timemem
# codecell_30z8le (keep this id for tracking purposes)
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType
USER_CSV = f"{BASE_DIR}/user.csv"
SESSION_CSV = f"{BASE_DIR}/session.csv"
PRODUCT_CSV = f"{BASE_DIR}/product.csv"
PRODUCT_NAME_CSV= f"{BASE_DIR}/product_name.csv"
EVENTS_CSV = f"{BASE_DIR}/events.csv"
CATEGORY_CSV = f"{BASE_DIR}/category.csv"
BRAND_CSV = f"{BASE_DIR}/brand.csv"
events_schema = StructType([
StructField("event_time", TimestampType(), True),
StructField("event_type", StringType(), True),
StructField("session_id", StringType(), True),
StructField("product_id", StringType(), True),
StructField("price", DoubleType(), True)
])
df_user = spark.read.csv(USER_CSV, header=True, inferSchema=True)
df_session = spark.read.csv(SESSION_CSV, header=True, inferSchema=True)
df_product = spark.read.csv(PRODUCT_CSV, header=True, inferSchema=True)
df_product_name = spark.read.csv(PRODUCT_NAME_CSV, header=True, inferSchema=True)
df_events = spark.read.csv(EVENTS_CSV, header=True, schema=events_schema)
df_category = spark.read.csv(CATEGORY_CSV, header=True, inferSchema=True)
df_brand = spark.read.csv(BRAND_CSV, header=True, inferSchema=True)
# Check the row counts
print(f"user: {df_user.count()}")
print(f"session: {df_session.count()}")
print(f"product: {df_product.count()}")
print(f"product_name: {df_product_name.count()}")
print(f"events: {df_events.count()}")
print(f"category: {df_category.count()}")
print(f"brand: {df_brand.count()}")
user: 3022290
session: 9244421
product: 166794
product_name: 127
events: 42418541
category: 13
brand: 3444
======================================
Wall time: 11.014 s
RSS Δ: +0.12 MB
Peak memory Δ: +0.12 MB (OS-dependent)
======================================
How do you know if you've done everything correctly?
Well, issue the SQL query select count(*) from retail.user; to count the number of rows in the user table in the operational database.
It should match the output of df_user.count(); same for the other tables.
If the counts match, then you know everything is in order.
!psql "esiee_full" -v ON_ERROR_STOP=1 -c "SELECT COUNT(*) FROM retail.user;"
count
---------
3022290
(1 row)
4. Build the Dimensions Tables¶
4.1 The user Dimension Table¶
Build the dim_user dimension table.
This table should include user_key, user_id, gender, birthdate, and generation.
Set generation to one of the following values based on the birth year:
- "Traditionalists": born 1925 to 1945
- "Boomers": born 1946 to 1964
- "GenX": born 1965 to 1980
- "Millennials": born 1981 to 2000
- "GenZ": born 2001 to 2020
write some code here
%%timemem
# codecell_41ax14 (keep this id for tracking purposes)
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = Window.orderBy("user_id")
dim_user = df_user.withColumn("user_key", F.row_number().over(w))
dim_user = dim_user.withColumn("birth_year", F.year("birthdate"))
dim_user = dim_user.withColumn(
"generation",
F.when((F.col("birth_year") >= 1925) & (F.col("birth_year") <= 1945), "Traditionalists")
.when((F.col("birth_year") >= 1946) & (F.col("birth_year") <= 1964), "Boomers")
.when((F.col("birth_year") >= 1965) & (F.col("birth_year") <= 1980), "GenX")
.when((F.col("birth_year") >= 1981) & (F.col("birth_year") <= 2000), "Millennials")
.when((F.col("birth_year") >= 2001) & (F.col("birth_year") <= 2020), "GenZ")
.otherwise("unknown")
)
dim_user = dim_user.select("user_key", "user_id", "gender", "birthdate", "generation")
# By the time we get to here, "dim_user" should hold the user dimensions table according to the specification above.
dim_user.count()
3022290
======================================
Wall time: 0.557 s
RSS Δ: +0.00 MB
Peak memory Δ: +0.00 MB (OS-dependent)
======================================
The correct answer should be 3022290.
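A side note on performance: the "WindowExec: No Partition Defined for Window operation" warnings that show up later (Section 6) come from this pattern. F.row_number() over a global Window.orderBy funnels all 3M rows through a single partition each time dim_user is recomputed. A minimal alternative sketch, assuming dense 1..N keys are not strictly required:
# Hypothetical variant: surrogate keys that are unique but not consecutive,
# generated without a global sort or single-partition shuffle.
dim_user_alt = (
    df_user
    .withColumn("user_key", F.monotonically_increasing_id())
    .select("user_key", "user_id", "gender", "birthdate")
)
The generation column would then be derived exactly as above.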
4.2 The age Dimension Table¶
Even though birthdate exists in dim_user, a separate dim_age is helpful because it:
- Simplifies analysis with ready-made bands.
- Ensures consistency across all queries.
- Improves performance via small surrogate keys.
- Preserves history by fixing age at event time.
- Adds flexibility to adjust bands without changing facts.
We're going to build a dim_age table that has 4 columns:
- age_key: (INT, surrogate PK)
- age_band: (STRING) following the age band rules below
- min_age: (INT)
- max_age: (INT)
Bands:
- "<18": min_age = NULL, max_age = 17
- "18-24": 18, 24
- "25-34": 25, 34
- "35-44": 35, 44
- "45-54": 45, 54
- "55-64": 55, 64
- "65-74": 65, 74
- "75-84": 75, 84
- "85-94": 85, 94
- "unknown": NULL, NULL
The construction of this table is a bit tricky, so we're going to show you how to do it, as follows:
%%timemem
# Static age bands
age_band_rows = [
("<18", None, 17),
("18-24", 18, 24),
("25-34", 25, 34),
("35-44", 35, 44),
("45-54", 45, 54),
("55-64", 55, 64),
("65-74", 65, 74),
("75-84", 75, 84),
("85-94", 85, 94),
("unknown", None, None),
]
dim_age = spark.createDataFrame(age_band_rows, ["age_band", "min_age", "max_age"])
w_age = Window.orderBy(F.col("age_band"))
dim_age = dim_age.withColumn("age_key", F.dense_rank().over(w_age))
dim_age.count()
10
======================================
Wall time: 1.482 s
RSS Δ: +26.00 MB
Peak memory Δ: +20.29 MB (OS-dependent)
======================================
The correct answer should be 10.
4.3 The brand, product, and category Dimension Tables¶
Build the following dimension tables:
dim_brand:
- brand_key (INT, surrogate PK)
- brand_code (STRING)
- brand_desc (STRING)

dim_category:
- category_key (INT, surrogate PK)
- category_code (STRING)
- category_desc (STRING)

dim_product:
- product_key (INT, surrogate PK)
- product_id (STRING)
- product_desc (STRING)
- brand_key (INT, FK → dim_brand)
- category_key (INT, FK → dim_category)
The learning goal for dim_product: keep all products from product, add details from product_name, then join the result with the brand and category dimension tables.
write some code here
%%timemem
# codecell_43k3n9 (keep this id for tracking purposes)
# 1. dim_brand
w_brand = Window.orderBy("brand")
dim_brand = df_brand.withColumn("brand_key", F.row_number().over(w_brand))
dim_brand = dim_brand.select(
"brand_key",
F.col("brand").alias("brand_code"),
F.col("description").alias("brand_desc")
)
# 2. dim_category
w_cat = Window.orderBy("category")
dim_category = df_category.withColumn("category_key", F.row_number().over(w_cat))
dim_category = dim_category.select(
"category_key",
F.col("category").alias("category_code"),
F.col("description").alias("category_desc")
)
# 3. dim_product
df_product_full = df_product.join(
df_product_name,
on=["category", "product_name"],
how="left"
)
df_product_full = df_product_full.join(
dim_brand,
df_product_full.brand == dim_brand.brand_code,
how="left"
).join(
dim_category,
df_product_full.category == dim_category.category_code,
how="left"
)
w_prod = Window.orderBy("product_id")
dim_product = df_product_full.withColumn("product_key", F.row_number().over(w_prod))
dim_product = dim_product.select(
"product_key",
"product_id",
F.col("description").alias("product_desc"),
"brand_key",
"category_key"
)
# By the time we get to here, "dim_brand", "dim_category", and "dim_product" should hold
# the dimension tables according to the specifications above.
print(f"Number of rows in dim_brand: {dim_brand.count()}")
print(f"Number of rows in dim_category: {dim_category.count()}")
print(f"Number of rows in dim_product: {dim_product.count()}")
Number of rows in dim_brand: 3444
Number of rows in dim_category: 13
Number of rows in dim_product: 166794
======================================
Wall time: 1.130 s
RSS Δ: +0.25 MB
Peak memory Δ: +0.25 MB (OS-dependent)
======================================
Correct answers:
- Number of rows in dim_brand: 3444
- Number of rows in dim_category: 13
- Number of rows in dim_product: 166794
4.4 The date Dimension Table¶
This table is expected to have one row per calendar date.
dim_date:
- date_key (INT, surrogate PK; format YYYYMMDD)
- date (DATE, the actual calendar date)
- day (INT, 1–31)
- day_of_week (INT, 1=Mon … 7=Sun)
- day_name (STRING, e.g., Monday)
- is_weekend (BOOLEAN)
- week_of_year (INT, 1–53, ISO week)
- month (INT, 1–12)
- month_name (STRING, e.g., January)
- quarter (INT, 1–4)
- year (INT)
A full calendar would span 2025 years, each with roughly 365 days. Do we need to have a table that big? We can, but we do not have to!
Instead, follow these instructions to create only as many rows as we need:
- Determine the date range (from the min and max event_date in df_events).
- Generate all dates in that range with F.sequence().
- Derive attributes (day, day_of_week, ...).
- Create date_key = year * 10000 + month * 100 + day (i.e., YYYYMMDD).
- Assign date_key as the surrogate PK.
Build the dim_date table conforming to the specifications above.
write some code here
%%timemem
# codecell_44qm5c (keep this id for tracking purposes)
min_date, max_date = df_events.select(
F.min(F.to_date("event_time")).alias("min_date"),
F.max(F.to_date("event_time")).alias("max_date")
).first()
dates_df = spark.createDataFrame([(min_date, max_date)], ["start", "end"]) \
.withColumn("date", F.explode(F.sequence(F.col("start"), F.col("end"), F.expr("interval 1 day")))) \
.select("date")
dim_date = dates_df.withColumn("day", F.dayofmonth("date")) \
.withColumn("day_of_week", F.date_format("date", "u").cast("int")) \
.withColumn("day_name", F.date_format("date", "EEEE")) \
.withColumn("is_weekend", (F.col("day_of_week") >= 6).cast("boolean")) \
.withColumn("week_of_year", F.weekofyear("date")) \
.withColumn("month", F.month("date")) \
.withColumn("month_name", F.date_format("date", "MMMM")) \
.withColumn("quarter", F.quarter("date")) \
.withColumn("year", F.year("date")) \
.withColumn("date_key", F.col("year") * 10000 + F.col("month") * 100 + F.col("day"))
dim_date = dim_date.select(
"date_key", "date", "day", "day_of_week", "day_name", "is_weekend",
"week_of_year", "month", "month_name", "quarter", "year"
)
# By the time we get to here, "dim_date" should hold the dates dimension table according to the specification above.
print(dim_date.count())
32
======================================
Wall time: 20.889 s
RSS Δ: +0.00 MB
Peak memory Δ: +0.00 MB (OS-dependent)
======================================
The correct answer should be 32.
If you reach here, congratulations! You have created all the dimension tables!
%%timemem
print(f"dim_user: {dim_user.count()}")
print(f"dim_age: {dim_age.count()}")
print(f"dim_brand: {dim_brand.count()}")
print(f"dim_category: {dim_category.count()}")
print(f"dim_product: {dim_product.count()}")
print(f"dim_date: {dim_date.count()}")
dim_user: 3022290
dim_age: 10
dim_brand: 3444
dim_category: 13
dim_product: 166794
dim_date: 32
======================================
Wall time: 1.472 s
RSS Δ: +0.00 MB
Peak memory Δ: +0.00 MB (OS-dependent)
======================================
Correct answers:
- dim_user: 3022290
- dim_age: 10
- dim_brand: 3444
- dim_category: 13
- dim_product: 166794
- dim_date: 32
5. Build the Fact Table¶
Now it's time to build the fact table!
Our goal in this step is to create a clean fact_events table that joins the events from the operational database to the dimension tables you've just built above.
Along the way, we're going to enforce data quality and do a bit of data cleaning.
5.1 Clean Events¶
Create events_clean by removing any record that "does not make sense".
Specifically:
- Start from the df_events DataFrame.
- Keep only rows with non-null timestamps, session_id, and product_id.
- Cast price to double; keep NULL prices (views/carts can be price-less) and non-negative values only.
- Drop dates in the future.
- Restrict to valid event types: view, cart, purchase, remove.
write some code here
%%timemem
# codecell_51ep7v (keep this id for tracking purposes)
from pyspark.sql import functions as F
valid_types = ["view", "cart", "purchase", "remove"]
events_clean = df_events
events_clean = events_clean.filter(
(F.col("event_time").isNotNull()) &
(F.col("session_id").isNotNull()) &
(F.col("product_id").isNotNull())
)
events_clean = events_clean.withColumn("price", F.col("price").cast("double")) \
.filter((F.col("price").isNull()) | (F.col("price") >= 0))
events_clean = events_clean.filter(F.to_date("event_time") <= F.current_date())
events_clean = events_clean.filter(F.col("event_type").isin(valid_types))
# By the time we get to here, "events_clean" should conform to the specification above.
events_clean.count()
42418541
======================================
Wall time: 21.164 s
RSS Δ: +0.00 MB
Peak memory Δ: +0.00 MB (OS-dependent)
======================================
5.2 Cap Silly Prices¶
Next, let us check some statistics about prices and then decide what we want to do.
What is the minimum, maximum, and average price in this database?
write some code here
%%timemem
# codecell_52hg6x (keep this id for tracking purposes)
stats = events_clean.select(
F.min("price").alias("minimum"),
F.max("price").alias("maximum"),
F.avg("price").alias("average")
).collect()[0]
minimum = stats["minimum"]
maximum = stats["maximum"]
average = stats["average"]
# By the time we get to here, "minimum", "maximum", and "average" should conform to the specification above.
print(f"minimum: {minimum}")
print(f"maximum: {maximum}")
print(f"average: {average}")
minimum: 0.0
maximum: 257407.0
average: 864.2732006942781
======================================
Wall time: 24.174 s
RSS Δ: +0.00 MB
Peak memory Δ: +0.00 MB (OS-dependent)
======================================
Wait, something's not right! The average price is 864.27 but the maximum seems suss... It is possible these high prices are just errors.
For simplicity, let us assume a threshold value equal to 100x the average, and remove anything more than that.
Filter events_clean as described.
write some code here
%%timemem
# codecell_52bf5d (keep this id for tracking purposes)
price_threshold = average * 100
events_clean = events_clean.filter(
    (F.col("price").isNull()) |
    ((F.col("price") >= 0) & (F.col("price") <= price_threshold))
)
# By the time we get to here, "events_clean" should conform to the specification above.
events_clean.count()
42351862
======================================
Wall time: 20.906 s
RSS Δ: +0.00 MB
Peak memory Δ: +0.00 MB (OS-dependent)
======================================
Good, we still have about 42.4M records, but we've done some basic data cleaning. Let us continue...
5.3 Build Tiny Lookup Tables (LKPs)¶
Create lookup tables that help us connect events_clean with the dimension tables we created:
- user_lkp: (user_id → user_key) from dim_user.
- prod_lkp: (product_id → product_key, brand_key, category_key) from dim_product.
- date_lkp: (date → date_key) from dim_date.
- session-to-user bridge: use the raw df_session (session_id, user_id) CSV (not a dimension) to pull user_id.
Hint: These LKPs are just calling select from the right sources with the right parameters.
write some code here
%%timemem
# codecell_53l2kp (keep this id for tracking purposes)
user_lkp = dim_user.select("user_id", "user_key")
prod_lkp = dim_product.select("product_id", "product_key", "brand_key", "category_key")
date_lkp = dim_date.select("date", "date_key")
session_bridge = df_session.select("session_id", "user_id")
# By the time we get to here, the following variables should conform to the specification above.
print(session_bridge.count(), user_lkp.count(), prod_lkp.count(), date_lkp.count())
9244421 3022290 166794 32
======================================
Wall time: 2.033 s
RSS Δ: +0.00 MB
Peak memory Δ: +0.00 MB (OS-dependent)
======================================
5.4 Join Everything Together¶
Finally, join everything together to create fact_events.
Follow the following steps:
- Start from clean events with these columns: (event_time, event_type, session_id, product_id, price, date).
- Join sessions first (to get user_id).
- Then join product, date, and user.
- Join with dim_user to find the birthdate and compute the user's age on the day of the event in age_on_event.
- Join with dim_age to find the age band based on age_on_event.
Hints:
- You built the LKPs for a reason... use them.
- Left, right, or natural joins?
The final part above is a bit tricky, so we'll just give you the answer. But you'll need to figure out how it integrates with everything above.
.withColumn("age_on_event", F.floor(F.months_between(F.col("date"), F.to_date("birthdate"))/12))
.join(
dim_age.select("age_key", "age_band", "min_age", "max_age"),
(
((F.col("age_on_event") > F.col("min_age"))) &
((F.col("age_on_event") <= F.col("max_age")))
),
"left"
)
The final result (fact_events) should include the following columns:
date_key, user_key, age_key, product_key, brand_key, category_key, session_id, event_time, event_type, price
write some code here
%%timemem
# codecell_54aaaa (keep this id for tracking purposes)
events_fact = events_clean.withColumn("date", F.to_date("event_time"))
events_fact = events_fact.join(session_bridge, on="session_id", how="left")
events_fact = events_fact.join(prod_lkp, on="product_id", how="left")
events_fact = events_fact.join(date_lkp, on="date", how="left")
events_fact = events_fact.join(user_lkp, on="user_id", how="left")
events_fact = events_fact.join(dim_user.select("user_id", "birthdate"), on="user_id", how="left")
events_fact = events_fact.withColumn(
"age_on_event", F.floor(F.months_between(F.col("date"), F.to_date(F.col("birthdate")))/12)
)
events_fact = events_fact.join(
dim_age.select("age_key", "min_age", "max_age"),
(F.col("age_on_event") > F.col("min_age")) & (F.col("age_on_event") <= F.col("max_age")),
how="left"
)
fact_events = events_fact.select(
"date_key",
"user_key",
"age_key",
"product_key",
"brand_key",
"category_key",
"session_id",
"event_time",
"event_type",
"price"
)
# By the time we get to here, "fact_events" should conform to the specification above.
print(fact_events.count())
42351862
======================================
Wall time: 99.582 s
RSS Δ: -24.38 MB
Peak memory Δ: +0.12 MB (OS-dependent)
======================================
print(df_events.count())
print(events_clean.count())
print(fact_events.count())
42418541
42351862
42351862
Congrats, you've done it! You've created the fact table successfully! 🚀
Here is the summary of the schema:
- date_key (FK → dim_date)
- user_key (FK → dim_user)
- age_key (FK → dim_age)
- product_key (FK → dim_product)
- brand_key (FK → dim_brand)
- category_key (FK → dim_category)
- session_id (STRING, business key, kept directly in this table)
- event_time (TIMESTAMP)
- event_type (STRING)
- price (DOUBLE)
6. Export the Fact Table¶
You now have a shiny fact_events table!
But how should you store it?
(Remember our discussion in class about row vs. column representations?)
Let's store fact_events in a few different ways and compare data sizes.
First, let's try writing out as CSV files, both compressed and uncompressed, per below.
Note that in Spark, we specify the output directory, which is then populated with many "part" files.
fact_events.write.mode("overwrite").option("header", True).csv(BASE_DIR + "/fact_events.csv")
fact_events.write.mode("overwrite").option("header", True).option("compression", "snappy").csv(BASE_DIR + "/fact_events.csv.snappy")
25/11/05 15:29:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation. (warning repeated many times for the duration of the write)
[4639.331s][warning][gc,alloc] Executor task launch worker for task 9.0 in stage 379.0 (TID 1468): Retried waiting for GCLocker too often allocating 2097154 words
[4639.356s][warning][gc,alloc] Executor task launch worker for task 8.0 in stage 379.0 (TID 1467): Retried waiting for GCLocker too often allocating 2097154 words
[4639.374s][warning][gc,alloc] Executor task launch worker for task 11.0 in stage 379.0 (TID 1470): Retried waiting for GCLocker too often allocating 2097154 words
25/11/05 15:33:08 WARN TaskMemoryManager: Failed to allocate a page (16777216 bytes), try again.
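The wall of WindowExec warnings above means a window function was defined without a partitionBy, so Spark funnels the entire dataset through a single partition; the GCLocker and TaskMemoryManager warnings are very likely the resulting memory pressure. A minimal sketch of the fix, assuming a per-session window is what the analysis needs (session_id as the key is an assumption):
from pyspark.sql import Window, functions as F
# An unpartitioned spec such as Window.orderBy("event_time") sends every
# row to one partition, which is exactly what the warnings flag.
# Partitioning the window distributes the work across sessions.
w = Window.partitionBy("session_id").orderBy("event_time")
ranked = df_events.withColumn("event_rank", F.row_number().over(w))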
Next, let's try Parquet:
fact_events.write.mode("overwrite").parquet(BASE_DIR + "/fact_events.parquet")
25/11/05 15:36:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/05 15:38:39 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
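The MemoryManager warnings are the Parquet writer scaling down its row-group buffers under heap pressure; the write still succeeds. One hedged variant worth noting before we compare sizes: partitioning the Parquet output by a low-cardinality column (here event_type and the output name are assumptions, not part of the assignment) lets downstream readers prune whole directories when filtering on that column.
# Hypothetical: queries that filter on event_type can skip entire
# directories at read time.
(fact_events
    .write
    .mode("overwrite")
    .partitionBy("event_type")
    .parquet(BASE_DIR + "/fact_events_by_type.parquet"))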
Let's compare the output sizes using the following bit of code:
import os
# Sum the sizes of every part file under each output directory.
for f in [BASE_DIR + "/fact_events.csv",
          BASE_DIR + "/fact_events.csv.snappy",
          BASE_DIR + "/fact_events.parquet"]:
    if not os.path.exists(f):
        continue  # os.walk silently yields nothing for a missing path
    size = sum(os.path.getsize(os.path.join(dp, fn))
               for dp, _, filenames in os.walk(f)
               for fn in filenames)
    print(f"{f}: {size / (1024 * 1024 * 1024):.1f} GB")
/home/justine/de1-website/DE1/labs-final/lab2-assignment/csv/fact_events.csv: 4.3 GB
/home/justine/de1-website/DE1/labs-final/lab2-assignment/csv/fact_events.csv.snappy: 1.2 GB
/home/justine/de1-website/DE1/labs-final/lab2-assignment/csv/fact_events.parquet: 1.0 GB
Your answers below!
// qcell_6a9876 (keep this id for tracking purposes)
- Size of CSV output, no compression: 4.3 GB
- Size of CSV output, Snappy compression: 1.2 GB
- Size of Parquet output: 1.0 GB
Answer the following questions:
Q6.1 Why is columnar storage (Parquet) usually much smaller?
Q6.2 Which format is better for analytical queries and why?
Your answers below!
// qcell_6b1234 (keep this id for tracking purposes)
Q6.1 Answer: Parquet is much smaller because data is stored column by column, so similar values sit next to each other and compress far better, especially when a column contains many repeated values. Parquet also applies per-column encodings (dictionary, run-length) and stores typed binary values with metadata, whereas CSV stores every value as plain text with no encoding at all.
Q6.2 Answer: Parquet is the better format for analytical queries: column pruning means a query reads only the columns it needs, predicate pushdown uses row-group statistics to skip irrelevant data, and compression cuts I/O and memory usage. CSV remains convenient for small files and interchange, but on large datasets Parquet wins on both query speed and storage footprint.
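To make the column-pruning claim concrete, a small sketch (the path matches the Parquet output written above; the filter threshold is arbitrary):
from pyspark.sql import functions as F
# ReadSchema in the physical plan shows only the pruned column, and the
# price > 100 predicate is pushed down to the Parquet row-group statistics.
df = spark.read.parquet(BASE_DIR + "/fact_events.parquet")
df.select("price").where(F.col("price") > 100).explain()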
7. Submission¶
Details about the submission of this assignment are outlined in the helper.
%%timemem
spark.stop()
---------------------------------------------------------------------------
ConnectionRefusedError                    Traceback (most recent call last)
Cell In[79], line 1
----> 1 spark.stop()

File ~/miniconda3/envs/de1-env/lib/python3.10/site-packages/pyspark/sql/session.py:2023, in SparkSession.stop(self)
   2009 """
   2010 Stop the underlying :class:`SparkContext`.
   2011 (...)
   2019 >>> spark.stop()  # doctest: +SKIP
   2020 """
   2021 from pyspark.sql.context import SQLContext
-> 2023 self._sc.stop()
   2024 # We should clean the default session up. See SPARK-23228.
   2025 assert self._jvm is not None

File ~/miniconda3/envs/de1-env/lib/python3.10/site-packages/pyspark/core/context.py:684, in SparkContext.stop(self)
    682 if getattr(self, "_jsc", None):
    683     try:
--> 684         self._jsc.stop()
    685     except Py4JError:
    686         # Case: SPARK-18523
    687         warnings.warn(
    688             "Unable to cleanly shutdown Spark JVM process."
    689             " It is possible that the process has crashed,"
    690             " been killed or may also be in a zombie state.",
    691             RuntimeWarning,
    692         )

File ~/miniconda3/envs/de1-env/lib/python3.10/site-packages/py4j/java_gateway.py:1361, in JavaMember.__call__(self, *args)
   1354 args_command, temp_args = self._build_args(*args)
   1356 command = proto.CALL_COMMAND_NAME +\
   1357     self.command_header +\
   1358     args_command +\
   1359     proto.END_COMMAND_PART
-> 1361 answer = self.gateway_client.send_command(command)
   1362 return_value = get_return_value(
   1363     answer, self.gateway_client, self.target_id, self.name)
   1365 for temp_arg in temp_args:

File ~/miniconda3/envs/de1-env/lib/python3.10/site-packages/py4j/java_gateway.py:1036, in GatewayClient.send_command(self, command, retry, binary)
   1015 def send_command(self, command, retry=True, binary=False):
   1016     """Sends a command to the JVM. This method is not intended to be
   1017     called directly by Py4J users. It is usually called by
   1018     :class:`JavaMember` instances.
   (...)
   1034     if `binary` is `True`.
   1035     """
-> 1036 connection = self._get_connection()
   1037 try:
   1038     response = connection.send_command(command)

File ~/miniconda3/envs/de1-env/lib/python3.10/site-packages/py4j/clientserver.py:284, in JavaClient._get_connection(self)
    281     pass
    283 if connection is None or connection.socket is None:
--> 284     connection = self._create_new_connection()
    285 return connection

File ~/miniconda3/envs/de1-env/lib/python3.10/site-packages/py4j/clientserver.py:291, in JavaClient._create_new_connection(self)
    287 def _create_new_connection(self):
    288     connection = ClientServerConnection(
    289         self.java_parameters, self.python_parameters,
    290         self.gateway_property, self)
--> 291     connection.connect_to_java_server()
    292     self.set_thread_connection(connection)
    293     return connection

File ~/miniconda3/envs/de1-env/lib/python3.10/site-packages/py4j/clientserver.py:438, in ClientServerConnection.connect_to_java_server(self)
    435 if self.ssl_context:
    436     self.socket = self.ssl_context.wrap_socket(
    437         self.socket, server_hostname=self.java_address)
--> 438 self.socket.connect((self.java_address, self.java_port))
    439 self.stream = self.socket.makefile("rb")
    440 self.is_connected = True

ConnectionRefusedError: [Errno 111] Connection refused
======================================
Wall time: 0.094 s
RSS Δ: +2.75 MB
Peak memory Δ: +0.00 MB (OS-dependent)
======================================
<ExecutionResult object at 712f3044ca30, execution_count=None error_before_exec=None error_in_exec=[Errno 111] Connection refused info=<ExecutionInfo object at 712f3044c9a0, raw_cell="spark.stop() " store_history=False silent=False shell_futures=True cell_id=None> result=None>
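The ConnectionRefusedError above suggests the Spark JVM had already died (or been killed) before spark.stop() ran, so the Py4J client could not reach it. A defensive shutdown, as a sketch:
# Tolerate an already-dead JVM so the notebook still finishes cleanly.
try:
    spark.stop()
except Exception as exc:  # e.g. ConnectionRefusedError from Py4J
    print(f"Spark already stopped or unreachable: {exc}")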
Deliverables¶
- This notebook with all code cells executed.
- A brief REPORT.md with: inputs, assumptions, plan screenshots, quality results, and performance choices.
- Output folder with Parquet sample (≤20 MB).
Evaluation¶
- Correctness and clarity of pipeline (40%).
- Data‑quality gates and rationale (20%).
- Performance reasoning and plan analysis (20%).
- Reproducibility and organization (20%).
Performance notes¶
- Record spark.sql.shuffle.partitions and justify your value.
- Show one example of avoiding UDFs by using built-ins (see the sketch after the next cell).
- If you use broadcast join, explain why it is safe.
spark.conf.get("spark.sql.shuffle.partitions")
'200'
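For the other two notes, a hedged sketch (assuming user_id is the join key prepared in transform, and that the users dimension fits comfortably in executor memory):
from pyspark.sql import functions as F
# Avoiding a UDF: a Python UDF such as
#   @F.udf("double")
#   def with_tax(p): return None if p is None else p * 1.2
# serializes rows out to Python; the built-in equivalent stays in the JVM
# and remains visible to the Catalyst optimizer.
priced = df_events.withColumn("price_with_tax", F.round(F.col("price") * 1.2, 2))
# Broadcast join: safe only because the dimension side is small enough to
# ship to every executor, which removes the shuffle of the large fact table.
joined = priced.join(F.broadcast(df_user), on="user_id", how="left")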
Reproducibility checklist¶
- List Spark version and key configs.
- Fix time zone to UTC.
- Control randomness if used.
- Provide exact commands to run the notebook end‑to‑end.
spark.conf.set("spark.sql.session.timeZone", "UTC")
print("Spark version:", spark.version)
print("Spark session time zone:", spark.conf.get("spark.sql.session.timeZone"))
print("BASE_DIR:", BASE_DIR)
print("Dataframes loaded:", df_user.count(), df_events.count())
Spark version: 4.0.1
Spark session time zone: UTC
BASE_DIR: /home/justine/de1-website/DE1/labs-final/lab2-assignment/csv
Dataframes loaded: 3022290 42418541
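One checklist item the cells above don't demonstrate is controlled randomness; if the pipeline samples anywhere, pin an explicit seed so reruns produce identical subsets (a minimal, hypothetical example):
# Seeded sampling: every rerun draws the same 1% subset.
sample = df_events.sample(fraction=0.01, seed=42)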
print(spark.sparkContext.getConf().getAll())
[('spark.driver.port', '46809'), ('spark.driver.host', '10.255.255.254'), ('spark.rdd.compress', 'True'), ('spark.hadoop.fs.s3a.vectored.read.min.seek.size', '128K'), ('spark.app.name', 'de1-lab2-assignment'), ('spark.executor.extraJavaOptions', '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-modules=jdk.incubator.vector --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false -Dio.netty.tryReflectionSetAccessible=true'), ('spark.sql.artifact.isolation.enabled', 'false'), ('spark.sql.warehouse.dir', 'file:/home/justine/Data_engineering/lab2_assignment/spark-warehouse'), ('spark.app.startTime', '1762348551846'), ('spark.master', 'local[*]'), ('spark.app.submitTime', '1762348551222'), ('spark.executor.id', 'driver'), ('spark.submit.pyFiles', ''), ('spark.driver.extraJavaOptions', '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-modules=jdk.incubator.vector --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false -Dio.netty.tryReflectionSetAccessible=true'), ('spark.hadoop.fs.s3a.vectored.read.max.merged.size', '2M'), ('spark.app.id', 'local-1762348552951'), ('spark.submit.deployMode', 'client'), ('spark.serializer.objectStreamReset', '100'), ('spark.ui.showConsoleProgress', 'true')]