DE1 — Final Project Notebook¶
Author : Badr TAJINI - Data Engineering I - ESIEE 2025-2026
This is the primary executable artifact. Fill config, run baseline, then optimized pipeline, and record evidence.
0. Load config¶
In [2]:
import yaml, pathlib, datetime
from pyspark.sql import SparkSession, functions as F, types as T

# Read the project configuration (paths, layout, silver schema).
with open("de1_project_config.yml") as cfg_file:
    CFG = yaml.safe_load(cfg_file)

# Single Spark session reused by every cell in this notebook.
spark = SparkSession.builder.appName("de1-project").getOrCreate()

# Show the loaded configuration as the cell output.
CFG
WARNING: Using incubator modules: jdk.incubator.vector Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties 25/11/19 14:40:42 WARN Utils: Your hostname, OrdideJustine, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo) 25/11/19 14:40:42 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 25/11/19 14:40:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Out[2]:
{'paths': {'raw_csv_glob': 'data/en.openfoodfacts.org.products.csv',
'bronze': 'outputs/project/bronze',
'silver': 'outputs/project/silver',
'gold': 'outputs/project/gold',
'proof': 'outputs/project/proof'},
'layout': {'partition_by': ['countries_en'],
'sort_by': [],
'compression': 'snappy'},
'queries': {'schema_silver': {'code': 'string',
'product_name': 'string',
'countries_en': 'string',
'fat_100g': 'double',
'sugars_100g': 'double',
'created_t': 'long'}}}
In [22]:
import time
from pyspark.sql import SparkSession
class SparkMetricsCollector:
    """
    Robust tool to automatically measure Spark job performance.

    Tags jobs with a job group between ``start_task`` and ``stop_task``,
    then sums shuffle/input byte counters over every stage of those jobs
    via the JVM StatusTracker (reached through the internal ``_jsc``
    gateway). Works around ``clearJobGroup`` missing from the PySpark
    Python API.
    """

    def __init__(self, spark: SparkSession):
        self.spark = spark
        self.sc = spark.sparkContext
        # Initialize tracking state so that calling stop_task() before
        # start_task() fails predictably (TypeError on arithmetic with
        # None) rather than raising AttributeError.
        self.group_id = None
        self.start_time = None
        # statusTracker lives behind the internal Java gateway and may be
        # unavailable depending on the PySpark build, so guard the lookup.
        try:
            self.tracker = self.sc._jsc.sc().statusTracker()
        except Exception:  # was a bare except: avoid trapping SystemExit/KeyboardInterrupt
            self.tracker = None

    def start_task(self, group_id, description):
        """Starts the timer and tags subsequent Spark jobs with group_id."""
        self.group_id = group_id
        self.sc.setJobGroup(group_id, description)
        self.start_time = time.time()
        print(f"Starting tracking for: {description}")

    def stop_task(self):
        """Stops tracking and attempts to collect metrics safely.

        Returns:
            dict with keys ``elapsed_ms``, ``shuffle_write_bytes``,
            ``shuffle_read_bytes`` and ``input_bytes``; the byte counters
            are 0 when the status tracker is unavailable.
        """
        end_time = time.time()
        elapsed_ms = (end_time - self.start_time) * 1000
        # FIX: clearJobGroup() does not exist in the PySpark Python API.
        # Replacing the current group with an empty one stops the tagging.
        try:
            self.sc.setJobGroup("", "")
        except Exception:
            pass
        shuffle_write = 0
        shuffle_read = 0
        input_bytes = 0
        # Attempt to get metrics via the internal JVM API (safe mode).
        if self.tracker:
            try:
                job_ids = list(self.tracker.getJobIdsForGroup(self.group_id))
                for job_id in job_ids:
                    job_info = self.tracker.getJobInfo(job_id)
                    if job_info.isEmpty():
                        continue
                    stage_ids = job_info.get().stageIds()
                    for stage_id in stage_ids:
                        stage_info = self.tracker.getStageInfo(stage_id)
                        if stage_info.isEmpty():
                            continue
                        # SAFE BLOCK: skip any stage whose taskMetrics
                        # accessor fails rather than aborting collection.
                        try:
                            stage_obj = stage_info.get()
                            metrics = stage_obj.taskMetrics()
                            shuffle_write += metrics.shuffleWriteMetrics().bytesWritten()
                            shuffle_read += metrics.shuffleReadMetrics().totalBytesRead()
                            input_bytes += metrics.inputMetrics().bytesRead()
                        except Exception:
                            pass
            except Exception:
                # Was `except Exception as e` with an unused binding and a
                # placeholder-free f-string; plain print is equivalent.
                print("Warning: Could not collect detailed Shuffle metrics. Using Time only.")
        results = {
            "elapsed_ms": round(elapsed_ms, 2),
            "shuffle_write_bytes": shuffle_write,
            "shuffle_read_bytes": shuffle_read,
            "input_bytes": input_bytes
        }
        print("Tracking completed.")
        print(f" - Time: {results['elapsed_ms']} ms")
        if shuffle_write > 0:
            print(f" - Shuffle Write: {results['shuffle_write_bytes'] / (1024*1024):.2f} MB")
        else:
            print(" - Shuffle metrics: 0 (or unavailable)")
        return results
# Initialize the collector
# Single shared instance, reused by the baseline and optimized query cells.
metrics_collector = SparkMetricsCollector(spark)
In [18]:
import pandas as pd
import datetime
def update_metrics_log(query_name, phase, metrics_dict, note=""):
    """
    Updates the project_metrics_log.csv file with the collected metrics.

    Args:
        query_name (str): The query identifier (e.g., 'Q1', 'Q2').
        phase (str): The execution phase ('baseline' or 'optimized').
        metrics_dict (dict): Dictionary containing metrics (elapsed_ms, shuffle_bytes, etc.).
        note (str): Optional note to append to the log.
    """
    csv_path = "project_metrics_log.csv"
    try:
        # Load the existing log and locate the row for (query, phase).
        log_df = pd.read_csv(csv_path)
        row_filter = (log_df['query'] == query_name) & (log_df['phase'] == phase)

        # Guard clause: nothing to update if the row does not exist.
        if log_df[row_filter].empty:
            print(f"[WARN] Row not found for Query: {query_name}, Phase: {phase}. Check spelling.")
            return

        # Copy the core timing/shuffle metrics into the matched row.
        log_df.loc[row_filter, 'elapsed_ms'] = metrics_dict.get('elapsed_ms', 0)
        log_df.loc[row_filter, 'shuffle_write_bytes'] = metrics_dict.get('shuffle_write_bytes', 0)
        log_df.loc[row_filter, 'shuffle_read_bytes'] = metrics_dict.get('shuffle_read_bytes', 0)

        # Record input size only when the Spark collector reported one.
        if 'input_bytes' in metrics_dict and metrics_dict['input_bytes'] > 0:
            log_df.loc[row_filter, 'input_size_bytes'] = metrics_dict['input_bytes']

        # Stamp the row with capture time and any caller-provided note.
        log_df.loc[row_filter, 'timestamp'] = datetime.datetime.now().isoformat()
        log_df.loc[row_filter, 'notes'] = f"Auto-captured. {note}"

        # Persist the whole log back without the pandas index column.
        log_df.to_csv(csv_path, index=False)
        print(f"[INFO] CSV updated successfully for {query_name} ({phase})")
    except FileNotFoundError:
        print(f"[ERROR] File {csv_path} not found. Please ensure it exists in the project root.")
    except Exception as e:
        print(f"[ERROR] An unexpected error occurred while updating CSV: {e}")
1. Bronze — landing raw data¶
In [4]:
# Resolve the configured input/output paths once.
raw_glob = CFG["paths"]["raw_csv_glob"]
bronze = CFG["paths"]["bronze"]
proof = CFG["paths"]["proof"]

# Land the raw tab-separated dump; pathGlobFilter "[!.]*" skips
# hidden (dot-prefixed) files in the glob.
df_raw = (
    spark.read
    .option("header", "true")
    .option("sep", "\t")
    .option("pathGlobFilter", "[!.]*")
    .csv(raw_glob)
)

# Keep an untouched CSV copy as the bronze layer.
df_raw.write.mode("overwrite").csv(bronze)
print("Bronze written:", bronze)
[Stage 3:========================================================>(88 + 1) / 89]
Bronze written: outputs/project/bronze
In [5]:
# Inspect the inferred schema (all columns land as strings — no
# inferSchema option was set on the reader) and preview a few rows.
df_raw.printSchema()
df_raw.show(5)
root |-- code: string (nullable = true) |-- url: string (nullable = true) |-- creator: string (nullable = true) |-- created_t: string (nullable = true) |-- created_datetime: string (nullable = true) |-- last_modified_t: string (nullable = true) |-- last_modified_datetime: string (nullable = true) |-- last_modified_by: string (nullable = true) |-- last_updated_t: string (nullable = true) |-- last_updated_datetime: string (nullable = true) |-- product_name: string (nullable = true) |-- abbreviated_product_name: string (nullable = true) |-- generic_name: string (nullable = true) |-- quantity: string (nullable = true) |-- packaging: string (nullable = true) |-- packaging_tags: string (nullable = true) |-- packaging_en: string (nullable = true) |-- packaging_text: string (nullable = true) |-- brands: string (nullable = true) |-- brands_tags: string (nullable = true) |-- brands_en: string (nullable = true) |-- categories: string (nullable = true) |-- categories_tags: string (nullable = true) |-- categories_en: string (nullable = true) |-- origins: string (nullable = true) |-- origins_tags: string (nullable = true) |-- origins_en: string (nullable = true) |-- manufacturing_places: string (nullable = true) |-- manufacturing_places_tags: string (nullable = true) |-- labels: string (nullable = true) |-- labels_tags: string (nullable = true) |-- labels_en: string (nullable = true) |-- emb_codes: string (nullable = true) |-- emb_codes_tags: string (nullable = true) |-- first_packaging_code_geo: string (nullable = true) |-- cities: string (nullable = true) |-- cities_tags: string (nullable = true) |-- purchase_places: string (nullable = true) |-- stores: string (nullable = true) |-- countries: string (nullable = true) |-- countries_tags: string (nullable = true) |-- countries_en: string (nullable = true) |-- ingredients_text: string (nullable = true) |-- ingredients_tags: string (nullable = true) |-- ingredients_analysis_tags: string (nullable = true) |-- allergens: string 
(nullable = true) |-- allergens_en: string (nullable = true) |-- traces: string (nullable = true) |-- traces_tags: string (nullable = true) |-- traces_en: string (nullable = true) |-- serving_size: string (nullable = true) |-- serving_quantity: string (nullable = true) |-- no_nutrition_data: string (nullable = true) |-- additives_n: string (nullable = true) |-- additives: string (nullable = true) |-- additives_tags: string (nullable = true) |-- additives_en: string (nullable = true) |-- nutriscore_score: string (nullable = true) |-- nutriscore_grade: string (nullable = true) |-- nova_group: string (nullable = true) |-- pnns_groups_1: string (nullable = true) |-- pnns_groups_2: string (nullable = true) |-- food_groups: string (nullable = true) |-- food_groups_tags: string (nullable = true) |-- food_groups_en: string (nullable = true) |-- states: string (nullable = true) |-- states_tags: string (nullable = true) |-- states_en: string (nullable = true) |-- brand_owner: string (nullable = true) |-- environmental_score_score: string (nullable = true) |-- environmental_score_grade: string (nullable = true) |-- nutrient_levels_tags: string (nullable = true) |-- product_quantity: string (nullable = true) |-- owner: string (nullable = true) |-- data_quality_errors_tags: string (nullable = true) |-- unique_scans_n: string (nullable = true) |-- popularity_tags: string (nullable = true) |-- completeness: string (nullable = true) |-- last_image_t: string (nullable = true) |-- last_image_datetime: string (nullable = true) |-- main_category: string (nullable = true) |-- main_category_en: string (nullable = true) |-- image_url: string (nullable = true) |-- image_small_url: string (nullable = true) |-- image_ingredients_url: string (nullable = true) |-- image_ingredients_small_url: string (nullable = true) |-- image_nutrition_url: string (nullable = true) |-- image_nutrition_small_url: string (nullable = true) |-- energy-kj_100g: string (nullable = true) |-- energy-kcal_100g: 
string (nullable = true) |-- energy_100g: string (nullable = true) |-- energy-from-fat_100g: string (nullable = true) |-- fat_100g: string (nullable = true) |-- saturated-fat_100g: string (nullable = true) |-- butyric-acid_100g: string (nullable = true) |-- caproic-acid_100g: string (nullable = true) |-- caprylic-acid_100g: string (nullable = true) |-- capric-acid_100g: string (nullable = true) |-- lauric-acid_100g: string (nullable = true) |-- myristic-acid_100g: string (nullable = true) |-- palmitic-acid_100g: string (nullable = true) |-- stearic-acid_100g: string (nullable = true) |-- arachidic-acid_100g: string (nullable = true) |-- behenic-acid_100g: string (nullable = true) |-- lignoceric-acid_100g: string (nullable = true) |-- cerotic-acid_100g: string (nullable = true) |-- montanic-acid_100g: string (nullable = true) |-- melissic-acid_100g: string (nullable = true) |-- unsaturated-fat_100g: string (nullable = true) |-- monounsaturated-fat_100g: string (nullable = true) |-- omega-9-fat_100g: string (nullable = true) |-- polyunsaturated-fat_100g: string (nullable = true) |-- omega-3-fat_100g: string (nullable = true) |-- omega-6-fat_100g: string (nullable = true) |-- alpha-linolenic-acid_100g: string (nullable = true) |-- eicosapentaenoic-acid_100g: string (nullable = true) |-- docosahexaenoic-acid_100g: string (nullable = true) |-- linoleic-acid_100g: string (nullable = true) |-- arachidonic-acid_100g: string (nullable = true) |-- gamma-linolenic-acid_100g: string (nullable = true) |-- dihomo-gamma-linolenic-acid_100g: string (nullable = true) |-- oleic-acid_100g: string (nullable = true) |-- elaidic-acid_100g: string (nullable = true) |-- gondoic-acid_100g: string (nullable = true) |-- mead-acid_100g: string (nullable = true) |-- erucic-acid_100g: string (nullable = true) |-- nervonic-acid_100g: string (nullable = true) |-- trans-fat_100g: string (nullable = true) |-- cholesterol_100g: string (nullable = true) |-- carbohydrates_100g: string (nullable = 
true) |-- sugars_100g: string (nullable = true) |-- added-sugars_100g: string (nullable = true) |-- sucrose_100g: string (nullable = true) |-- glucose_100g: string (nullable = true) |-- fructose_100g: string (nullable = true) |-- galactose_100g: string (nullable = true) |-- lactose_100g: string (nullable = true) |-- maltose_100g: string (nullable = true) |-- maltodextrins_100g: string (nullable = true) |-- psicose_100g: string (nullable = true) |-- starch_100g: string (nullable = true) |-- polyols_100g: string (nullable = true) |-- erythritol_100g: string (nullable = true) |-- isomalt_100g: string (nullable = true) |-- maltitol_100g: string (nullable = true) |-- sorbitol_100g: string (nullable = true) |-- fiber_100g: string (nullable = true) |-- soluble-fiber_100g: string (nullable = true) |-- polydextrose_100g: string (nullable = true) |-- insoluble-fiber_100g: string (nullable = true) |-- proteins_100g: string (nullable = true) |-- casein_100g: string (nullable = true) |-- serum-proteins_100g: string (nullable = true) |-- nucleotides_100g: string (nullable = true) |-- salt_100g: string (nullable = true) |-- added-salt_100g: string (nullable = true) |-- sodium_100g: string (nullable = true) |-- alcohol_100g: string (nullable = true) |-- vitamin-a_100g: string (nullable = true) |-- beta-carotene_100g: string (nullable = true) |-- vitamin-d_100g: string (nullable = true) |-- vitamin-e_100g: string (nullable = true) |-- vitamin-k_100g: string (nullable = true) |-- vitamin-c_100g: string (nullable = true) |-- vitamin-b1_100g: string (nullable = true) |-- vitamin-b2_100g: string (nullable = true) |-- vitamin-pp_100g: string (nullable = true) |-- vitamin-b6_100g: string (nullable = true) |-- vitamin-b9_100g: string (nullable = true) |-- folates_100g: string (nullable = true) |-- vitamin-b12_100g: string (nullable = true) |-- biotin_100g: string (nullable = true) |-- pantothenic-acid_100g: string (nullable = true) |-- silica_100g: string (nullable = true) |-- 
bicarbonate_100g: string (nullable = true) |-- potassium_100g: string (nullable = true) |-- chloride_100g: string (nullable = true) |-- calcium_100g: string (nullable = true) |-- phosphorus_100g: string (nullable = true) |-- iron_100g: string (nullable = true) |-- magnesium_100g: string (nullable = true) |-- zinc_100g: string (nullable = true) |-- copper_100g: string (nullable = true) |-- manganese_100g: string (nullable = true) |-- fluoride_100g: string (nullable = true) |-- selenium_100g: string (nullable = true) |-- chromium_100g: string (nullable = true) |-- molybdenum_100g: string (nullable = true) |-- iodine_100g: string (nullable = true) |-- caffeine_100g: string (nullable = true) |-- taurine_100g: string (nullable = true) |-- methylsulfonylmethane_100g: string (nullable = true) |-- ph_100g: string (nullable = true) |-- fruits-vegetables-nuts_100g: string (nullable = true) |-- fruits-vegetables-nuts-dried_100g: string (nullable = true) |-- fruits-vegetables-nuts-estimate_100g: string (nullable = true) |-- fruits-vegetables-nuts-estimate-from-ingredients_100g: string (nullable = true) |-- collagen-meat-protein-ratio_100g: string (nullable = true) |-- cocoa_100g: string (nullable = true) |-- chlorophyl_100g: string (nullable = true) |-- carbon-footprint_100g: string (nullable = true) |-- carbon-footprint-from-meat-or-fish_100g: string (nullable = true) |-- nutrition-score-fr_100g: string (nullable = true) |-- nutrition-score-uk_100g: string (nullable = true) |-- glycemic-index_100g: string (nullable = true) |-- water-hardness_100g: string (nullable = true) |-- choline_100g: string (nullable = true) |-- phylloquinone_100g: string (nullable = true) |-- beta-glucan_100g: string (nullable = true) |-- inositol_100g: string (nullable = true) |-- carnitine_100g: string (nullable = true) |-- sulphate_100g: string (nullable = true) |-- nitrate_100g: string (nullable = true) |-- acidity_100g: string (nullable = true) |-- carbohydrates-total_100g: string (nullable = true)
+--------+--------------------+--------------------+----------+--------------------+---------------+----------------------+----------------+--------------+---------------------+--------------------+------------------------+------------+--------+---------+--------------+------------+--------------+--------------------+--------------------+--------------------+-------------------+--------------------+-------------------+---------------+--------------------+----------------+--------------------+-------------------------+--------------------+--------------------+--------------------+---------+--------------+------------------------+------+-----------+---------------+------+------------------+--------------------+--------------+--------------------+--------------------+-------------------------+-------------------+------------+------+-----------+---------+------------+----------------+-----------------+-----------+---------+--------------+------------+----------------+----------------+----------+-------------+-------------+-----------+----------------+--------------+--------------------+--------------------+--------------------+-----------+-------------------------+-------------------------+--------------------+----------------+-----+------------------------+--------------+--------------------+------------+------------+--------------------+--------------------+-------------------+--------------------+--------------------+---------------------+---------------------------+--------------------+-------------------------+--------------+----------------+-----------+--------------------+--------+------------------+-----------------+-----------------+------------------+----------------+----------------+------------------+------------------+-----------------+-------------------+-----------------+--------------------+-----------------+------------------+------------------+--------------------+------------------------+----------------+------------------------+----------------+-----
-----------+-------------------------+--------------------------+-------------------------+------------------+---------------------+-------------------------+--------------------------------+---------------+-----------------+-----------------+--------------+----------------+------------------+--------------+----------------+------------------+-----------+-----------------+------------+------------+-------------+--------------+------------+------------+------------------+------------+-----------+------------+---------------+------------+-------------+-------------+----------+------------------+-----------------+--------------------+-------------+-----------+-------------------+----------------+---------+---------------+-----------+------------+--------------+------------------+--------------+--------------+--------------+--------------+---------------+---------------+---------------+---------------+---------------+------------+----------------+-----------+---------------------+-----------+----------------+--------------+-------------+------------+---------------+---------+--------------+---------+-----------+--------------+-------------+-------------+-------------+---------------+-----------+-------------+------------+--------------------------+-------+---------------------------+---------------------------------+------------------------------------+-----------------------------------------------------+--------------------------------+----------+---------------+---------------------+---------------------------------------+-----------------------+-----------------------+-------------------+-------------------+------------+------------------+----------------+-------------+--------------+-------------+------------+------------+------------------------+ | code| url| creator| created_t| created_datetime|last_modified_t|last_modified_datetime|last_modified_by|last_updated_t|last_updated_datetime| 
product_name|abbreviated_product_name|generic_name|quantity|packaging|packaging_tags|packaging_en|packaging_text| brands| brands_tags| brands_en| categories| categories_tags| categories_en| origins| origins_tags| origins_en|manufacturing_places|manufacturing_places_tags| labels| labels_tags| labels_en|emb_codes|emb_codes_tags|first_packaging_code_geo|cities|cities_tags|purchase_places|stores| countries| countries_tags| countries_en| ingredients_text| ingredients_tags|ingredients_analysis_tags| allergens|allergens_en|traces|traces_tags|traces_en|serving_size|serving_quantity|no_nutrition_data|additives_n|additives|additives_tags|additives_en|nutriscore_score|nutriscore_grade|nova_group|pnns_groups_1|pnns_groups_2|food_groups|food_groups_tags|food_groups_en| states| states_tags| states_en|brand_owner|environmental_score_score|environmental_score_grade|nutrient_levels_tags|product_quantity|owner|data_quality_errors_tags|unique_scans_n| popularity_tags|completeness|last_image_t| last_image_datetime| main_category| main_category_en| image_url| image_small_url|image_ingredients_url|image_ingredients_small_url| 
image_nutrition_url|image_nutrition_small_url|energy-kj_100g|energy-kcal_100g|energy_100g|energy-from-fat_100g|fat_100g|saturated-fat_100g|butyric-acid_100g|caproic-acid_100g|caprylic-acid_100g|capric-acid_100g|lauric-acid_100g|myristic-acid_100g|palmitic-acid_100g|stearic-acid_100g|arachidic-acid_100g|behenic-acid_100g|lignoceric-acid_100g|cerotic-acid_100g|montanic-acid_100g|melissic-acid_100g|unsaturated-fat_100g|monounsaturated-fat_100g|omega-9-fat_100g|polyunsaturated-fat_100g|omega-3-fat_100g|omega-6-fat_100g|alpha-linolenic-acid_100g|eicosapentaenoic-acid_100g|docosahexaenoic-acid_100g|linoleic-acid_100g|arachidonic-acid_100g|gamma-linolenic-acid_100g|dihomo-gamma-linolenic-acid_100g|oleic-acid_100g|elaidic-acid_100g|gondoic-acid_100g|mead-acid_100g|erucic-acid_100g|nervonic-acid_100g|trans-fat_100g|cholesterol_100g|carbohydrates_100g|sugars_100g|added-sugars_100g|sucrose_100g|glucose_100g|fructose_100g|galactose_100g|lactose_100g|maltose_100g|maltodextrins_100g|psicose_100g|starch_100g|polyols_100g|erythritol_100g|isomalt_100g|maltitol_100g|sorbitol_100g|fiber_100g|soluble-fiber_100g|polydextrose_100g|insoluble-fiber_100g|proteins_100g|casein_100g|serum-proteins_100g|nucleotides_100g|salt_100g|added-salt_100g|sodium_100g|alcohol_100g|vitamin-a_100g|beta-carotene_100g|vitamin-d_100g|vitamin-e_100g|vitamin-k_100g|vitamin-c_100g|vitamin-b1_100g|vitamin-b2_100g|vitamin-pp_100g|vitamin-b6_100g|vitamin-b9_100g|folates_100g|vitamin-b12_100g|biotin_100g|pantothenic-acid_100g|silica_100g|bicarbonate_100g|potassium_100g|chloride_100g|calcium_100g|phosphorus_100g|iron_100g|magnesium_100g|zinc_100g|copper_100g|manganese_100g|fluoride_100g|selenium_100g|chromium_100g|molybdenum_100g|iodine_100g|caffeine_100g|taurine_100g|methylsulfonylmethane_100g|ph_100g|fruits-vegetables-nuts_100g|fruits-vegetables-nuts-dried_100g|fruits-vegetables-nuts-estimate_100g|fruits-vegetables-nuts-estimate-from-ingredients_100g|collagen-meat-protein-ratio_100g|cocoa_100g|chlorophyl_100g|carbon
-footprint_100g|carbon-footprint-from-meat-or-fish_100g|nutrition-score-fr_100g|nutrition-score-uk_100g|glycemic-index_100g|water-hardness_100g|choline_100g|phylloquinone_100g|beta-glucan_100g|inositol_100g|carnitine_100g|sulphate_100g|nitrate_100g|acidity_100g|carbohydrates-total_100g| +--------+--------------------+--------------------+----------+--------------------+---------------+----------------------+----------------+--------------+---------------------+--------------------+------------------------+------------+--------+---------+--------------+------------+--------------+--------------------+--------------------+--------------------+-------------------+--------------------+-------------------+---------------+--------------------+----------------+--------------------+-------------------------+--------------------+--------------------+--------------------+---------+--------------+------------------------+------+-----------+---------------+------+------------------+--------------------+--------------+--------------------+--------------------+-------------------------+-------------------+------------+------+-----------+---------+------------+----------------+-----------------+-----------+---------+--------------+------------+----------------+----------------+----------+-------------+-------------+-----------+----------------+--------------+--------------------+--------------------+--------------------+-----------+-------------------------+-------------------------+--------------------+----------------+-----+------------------------+--------------+--------------------+------------+------------+--------------------+--------------------+-------------------+--------------------+--------------------+---------------------+---------------------------+--------------------+-------------------------+--------------+----------------+-----------+--------------------+--------+------------------+-----------------+-----------------+------------------+----------------+----------
------+------------------+------------------+-----------------+-------------------+-----------------+--------------------+-----------------+------------------+------------------+--------------------+------------------------+----------------+------------------------+----------------+----------------+-------------------------+--------------------------+-------------------------+------------------+---------------------+-------------------------+--------------------------------+---------------+-----------------+-----------------+--------------+----------------+------------------+--------------+----------------+------------------+-----------+-----------------+------------+------------+-------------+--------------+------------+------------+------------------+------------+-----------+------------+---------------+------------+-------------+-------------+----------+------------------+-----------------+--------------------+-------------+-----------+-------------------+----------------+---------+---------------+-----------+------------+--------------+------------------+--------------+--------------+--------------+--------------+---------------+---------------+---------------+---------------+---------------+------------+----------------+-----------+---------------------+-----------+----------------+--------------+-------------+------------+---------------+---------+--------------+---------+-----------+--------------+-------------+-------------+-------------+---------------+-----------+-------------+------------+--------------------------+-------+---------------------------+---------------------------------+------------------------------------+-----------------------------------------------------+--------------------------------+----------+---------------+---------------------+---------------------------------------+-----------------------+-----------------------+-------------------+-------------------+------------+------------------+----------------+-------------+--------------
+-------------+------------+------------+------------------------+ |00000002|http://world-en.o...|openfoodfacts-con...|1760861583|2025-10-19T08:13:03Z| 1760861586| 2025-10-19T08:13:06Z| NULL| 1760861586| 2025-10-19T08:13:06Z| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| en:Germany| en:germany| Germany| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL|en:to-be-complete...|en:to-be-complete...|To be completed,N...| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| 0.05| 1760861586|2025-10-19T08:13:06Z| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| |00000003|http://world-en.o...|openfoodfacts-con...|1752485388|2025-07-14T09:29:48Z| 1752485389| 2025-07-14T09:29:49Z| NULL| 1752485389| 2025-07-14T09:29:49Z| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| en:France| en:france| France| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| 
NULL| NULL| NULL| NULL| NULL| NULL| NULL|en:to-be-complete...|en:to-be-complete...|To be completed,N...| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| 0.05| 1752485389|2025-07-14T09:29:49Z| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| |00000004|http://world-en.o...| elcoco|1560176426|2019-06-10T14:20:26Z| 1748094869| 2025-05-24T13:54:29Z| smoothie-app| 1748094869| 2025-05-24T13:54:29Z|Entrecôesteack - ...| NULL| NULL| 1000g| Glas| en:glass| Glass| NULL|PG Tips, green or...|xx:pg-tips,xx:gre...|pg-tips,green-org...|Nutrition drink mix|en:nutrition-drin...|Nutrition-drink-mix|Nizozemsko,Peru|en:netherlands,en...|Netherlands,Peru| NULL| NULL|Fair trade, Organ...|en:fair-trade,en:...|Fair trade,Organi...| NULL| NULL| NULL| NULL| NULL| United Kingdom|Amazon|Brasilien, Germany|en:brazil,en:germany|Brazil,Germany|Organic Ashwagand...|es:organic-ashwag...| en:palm-oil-conte...|en:nuts,en:soybeans| NULL| NULL| NULL| NULL| 0g| 0| NULL| 0| NULL| NULL| NULL| 15| d| NULL| unknown| unknown| NULL| NULL| NULL|en:to-be-complete...|en:to-be-complete...|To be completed,N...| NULL| NULL| unknown|en:fat-in-moderat...| 1000| NULL| en:energy-value-i...| 1|top-75-percent-sc...| 0.8875| 
1748094869|2025-05-24T13:54:29Z|en:nutrition-drin...|Nutrition-drink-mix|https://images.op...|https://images.op...| https://images.op...| https://images.op...|https://images.op...| https://images.op...| 2401| 324| 2401| NULL| 12| 10.5| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| 0| 0| 13| 9| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| 36| NULL| NULL| NULL| 23| NULL| NULL| NULL| 0.3| NULL| 0.12| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| 0| NULL| NULL| NULL| NULL| NULL| 15| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| |00000005|http://world-en.o...|openfoodfacts-con...|1754314021|2025-08-04T13:27:01Z| 1754314023| 2025-08-04T13:27:03Z| NULL| 1754314023| 2025-08-04T13:27:03Z| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| en:France| en:france| France| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL|en:to-be-complete...|en:to-be-complete...|To be completed,N...| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| 0.05| 1754314023|2025-08-04T13:27:03Z| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| 
NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| |00000006|http://world-en.o...| moon-rabbit|1760212975|2025-10-11T20:02:55Z| 1760218930| 2025-10-11T21:42:10Z| ascharao| 1760218930| 2025-10-11T21:42:10Z| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| en:Germany| en:germany| Germany| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL|en:to-be-complete...|en:to-be-complete...|To be completed,N...| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| 0.05| 1760212978|2025-10-11T20:02:58Z| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| NULL| 
+--------+--------------------+--------------------+----------+--------------------+---------------+----------------------+----------------+--------------+---------------------+--------------------+------------------------+------------+--------+---------+--------------+------------+--------------+--------------------+--------------------+--------------------+-------------------+--------------------+-------------------+---------------+--------------------+----------------+--------------------+-------------------------+--------------------+--------------------+--------------------+---------+--------------+------------------------+------+-----------+---------------+------+------------------+--------------------+--------------+--------------------+--------------------+-------------------------+-------------------+------------+------+-----------+---------+------------+----------------+-----------------+-----------+---------+--------------+------------+----------------+----------------+----------+-------------+-------------+-----------+----------------+--------------+--------------------+--------------------+--------------------+-----------+-------------------------+-------------------------+--------------------+----------------+-----+------------------------+--------------+--------------------+------------+------------+--------------------+--------------------+-------------------+--------------------+--------------------+---------------------+---------------------------+--------------------+-------------------------+--------------+----------------+-----------+--------------------+--------+------------------+-----------------+-----------------+------------------+----------------+----------------+------------------+------------------+-----------------+-------------------+-----------------+--------------------+-----------------+------------------+------------------+--------------------+------------------------+----------------+------------------------+----------------+-----
-----------+-------------------------+--------------------------+-------------------------+------------------+---------------------+-------------------------+--------------------------------+---------------+-----------------+-----------------+--------------+----------------+------------------+--------------+----------------+------------------+-----------+-----------------+------------+------------+-------------+--------------+------------+------------+------------------+------------+-----------+------------+---------------+------------+-------------+-------------+----------+------------------+-----------------+--------------------+-------------+-----------+-------------------+----------------+---------+---------------+-----------+------------+--------------+------------------+--------------+--------------+--------------+--------------+---------------+---------------+---------------+---------------+---------------+------------+----------------+-----------+---------------------+-----------+----------------+--------------+-------------+------------+---------------+---------+--------------+---------+-----------+--------------+-------------+-------------+-------------+---------------+-----------+-------------+------------+--------------------------+-------+---------------------------+---------------------------------+------------------------------------+-----------------------------------------------------+--------------------------------+----------+---------------+---------------------+---------------------------------------+-----------------------+-----------------------+-------------------+-------------------+------------+------------------+----------------+-------------+--------------+-------------+------------+------------+------------------------+ only showing top 5 rows
2. Silver — cleaning and typing¶
In [7]:
# --- Verification Step: Inspect Salt vs. Sodium Data Quality ---
# 1. Preview the relevant columns to see the raw state (lots of NULLs expected)
print("Preview of Salt and Sodium columns:")
salt_sodium_preview = df_raw.select("product_name", "salt_100g", "sodium_100g")
salt_sodium_preview.show(20, truncate=False)

# 2. Count rows where Salt is missing, but Sodium is present.
# This metric justifies the 'coalesce' logic in the Silver step:
# we can "recover" these products by calculating Salt = Sodium * 2.5.
recoverable_mask = F.col("salt_100g").isNull() & F.col("sodium_100g").isNotNull()
missing_salt_but_has_sodium = df_raw.filter(recoverable_mask).count()
print(f"Number of recoverable products (Sodium present but Salt missing): {missing_salt_but_has_sodium}")
Preview of Salt and Sodium columns: +---------------------------------+---------+-----------+ |product_name |salt_100g|sodium_100g| +---------------------------------+---------+-----------+ |NULL |NULL |NULL | |NULL |NULL |NULL | |Entrecôesteack - Highland Beef |0.3 |0.12 | |NULL |NULL |NULL | |NULL |NULL |NULL | |6666 |NULL |NULL | |granola Bio le Chocolaté |1 |0.4 | |NULL |1.5 |0.6 | |Protein Plant Powered Wrap |NULL |NULL | |xytitol pastilles |0.275 |0.11 | |xxx |NULL |NULL | |NULL |NULL |NULL | |Besan chilla with stuffed chciken|NULL |NULL | |Powdered peanut butter |0.0625 |0.025 | |Madeleines ChocoLait |0.48 |0.192 | |velvety vanilla cake mix |NULL |NULL | |Collagen For Her |0.0882 |0.0353 | |Chocolate peanut butter protein |0.45 |0.18 | |Erdbeeren |1.083325 |0.43333 | |Nesquik moins de sucre |1.79 |0.714 | +---------------------------------+---------+-----------+ only showing top 20 rows
[Stage 10:=======================================================>(88 + 1) / 89]
Number of recoverable products (Sodium present but Salt missing): 0
In [8]:
# --- Inspection: Preview Fat, Sugars, Salt, and Sodium ---
cols_to_check = ["product_name", "fat_100g", "sugars_100g", "salt_100g", "sodium_100g"]
nutrient_view = df_raw.select(cols_to_check)

print("1. Raw preview (top 20 rows - likely containing NULLs):")
nutrient_view.show(20, truncate=False)

print("2. Preview of rows containing actual data (to verify format):")
# Restrict to rows where both fat and sugars are populated so the numeric
# format of real measurements is visible.
has_macros = F.col("fat_100g").isNotNull() & F.col("sugars_100g").isNotNull()
nutrient_view.filter(has_macros).show(20, truncate=False)
1. Raw preview (top 20 rows - likely containing NULLs): +---------------------------------+--------+-----------+---------+-----------+ |product_name |fat_100g|sugars_100g|salt_100g|sodium_100g| +---------------------------------+--------+-----------+---------+-----------+ |NULL |NULL |NULL |NULL |NULL | |NULL |NULL |NULL |NULL |NULL | |Entrecôesteack - Highland Beef |12 |9 |0.3 |0.12 | |NULL |NULL |NULL |NULL |NULL | |NULL |NULL |NULL |NULL |NULL | |6666 |NULL |NULL |NULL |NULL | |granola Bio le Chocolaté |1 |1 |1 |0.4 | |NULL |2 |1.7 |1.5 |0.6 | |Protein Plant Powered Wrap |NULL |NULL |NULL |NULL | |xytitol pastilles |0.5 |0.24 |0.275 |0.11 | |xxx |1.4 |NULL |NULL |NULL | |NULL |NULL |NULL |NULL |NULL | |Besan chilla with stuffed chciken|NULL |NULL |NULL |NULL | |Powdered peanut butter |13 |3.6 |0.0625 |0.025 | |Madeleines ChocoLait |24 |31 |0.48 |0.192 | |velvety vanilla cake mix |NULL |NULL |NULL |NULL | |Collagen For Her |1.76 |0 |0.0882 |0.0353 | |Chocolate peanut butter protein |5.36 |NULL |0.45 |0.18 | |Erdbeeren |6.9 |NULL |1.083325 |0.43333 | |Nesquik moins de sucre |5.71 |8.57 |1.79 |0.714 | +---------------------------------+--------+-----------+---------+-----------+ only showing top 20 rows 2. 
Preview of rows containing actual data (to verify format): +-------------------------------------------+----------------+----------------+---------+-----------+ |product_name |fat_100g |sugars_100g |salt_100g|sodium_100g| +-------------------------------------------+----------------+----------------+---------+-----------+ |Entrecôesteack - Highland Beef |12 |9 |0.3 |0.12 | |granola Bio le Chocolaté |1 |1 |1 |0.4 | |NULL |2 |1.7 |1.5 |0.6 | |xytitol pastilles |0.5 |0.24 |0.275 |0.11 | |Powdered peanut butter |13 |3.6 |0.0625 |0.025 | |Madeleines ChocoLait |24 |31 |0.48 |0.192 | |Collagen For Her |1.76 |0 |0.0882 |0.0353 | |Nesquik moins de sucre |5.71 |8.57 |1.79 |0.714 | |Farandole de madeleine |16.7 |1.85 |0.88 |0.352 | |The Smartest Cookie |5.36 |25 |2.68 |1.07 | |Hershey’s Syrup |2 |6 |0 |0 | |Anthony's Organic Cocoa Butter Chunks |100 |0 |0.075 |0.03 | |Lindt Vollmilch Schokolade |0.3 |0.2 |0.8 |0.32 | |Multi Patents Collagen Peptides |0 |9.09 |0 |0 | |Yerba Mate |42.4 |0 |0.455 |0.182 | |Confiture de Grenade aux fruits secs Rihana|6.1 |0.5 |0.49 |0.196 | |Multivitamins & Minerals |9 |4.05 |0.125 |0.05 | |Butter Burst |100 |18.6 |8.57 |3.43 | |1up |17.6470588235294|5.88235294117647|NULL |NULL | |Mre Lite |6.9 |6.9 |0.733 |0.293 | +-------------------------------------------+----------------+----------------+---------+-----------+ only showing top 20 rows
In [9]:
# Build the Silver layer: typed, cleaned, narrow projection of the raw CSV.
# Reads `df_raw` (Bronze) and writes Parquet to the configured silver path.
silver = CFG["paths"]["silver"]
df_silver = (df_raw
    # 1. TYPE CASTING & DATA TRANSFORMATION
    # Convert key numerical metrics from String to Double
    .withColumn("fat_100g", F.col("fat_100g").cast("double"))
    .withColumn("sugars_100g", F.col("sugars_100g").cast("double"))
    .withColumn("salt_100g", F.col("salt_100g").cast("double"))
    # Convert Unix timestamp (string) to Date type
    # Casting to 'long' first is necessary before using F.from_unixtime
    .withColumn("created_t_long", F.col("created_t").cast("long"))
    .withColumn("creation_date", F.from_unixtime(F.col("created_t_long")).cast(T.DateType()))
    # Extract the year for easy partitioning and aggregation
    .withColumn("year", F.year("creation_date").cast(T.IntegerType()))
    # 2. DATA QUALITY CHECKS (Handling NULLs)
    # Drop rows where key analytical fields are missing (Null Policy)
    # This cleans out products with no nutritional data or creation date
    .dropna(subset=[
        "fat_100g",
        "sugars_100g",
        "salt_100g",  # Added because it is used in the cast above
        "countries_en",
        "creation_date"
    ])
    # 3. NARROW PROJECTION (Crucial for later optimization)
    # Select only the columns needed for the Gold layer and future joins.
    # Note: `created_t_long` is intentionally dropped here — it was only a
    # casting intermediate.
    .select(
        "code",
        "countries_en",
        "creation_date",
        "year",
        "fat_100g",
        "sugars_100g",
        "salt_100g",
        "product_name",
        F.col("nutriscore_grade").alias("nutriscore")  # Rename for clarity
    )
)
df_silver.write.mode("overwrite").parquet(silver)
print("Silver written:", silver)
25/11/19 16:43:06 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:13 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:19 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:19 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:20 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:24 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:25 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:25 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:26 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:31 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:32 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:32 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:33 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 
95.00% for 8 writers 25/11/19 16:43:33 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:37 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:37 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:42 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:42 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:42 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:42 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:44 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:45 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:45 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:45 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row 
group sizes to 95.00% for 8 writers 25/11/19 16:43:47 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:47 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:47 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:48 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:48 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:48 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:50 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:50 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:50 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:51 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:51 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:58 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:58 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap 
memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:58 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:59 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:59 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:43:59 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 25/11/19 16:44:00 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers [Stage 15:=======================================================>(88 + 1) / 89]
Silver written: outputs/project/silver
3. Gold — analytics tables¶
Q1 - Nutritional Trends by Country and Year¶
In [23]:
# Gold Q1 (baseline): aggregate Silver by country/year and persist partitioned
# Parquet, timing the write with the metrics collector defined earlier.
gold = CFG["paths"]["gold"]
partition_by = CFG["layout"]["partition_by"]  # Should be ["countries_en"]
# --- Gold Q1: Nutritional Trends by Country & Year (Baseline) ---
# 1. AGGREGATION
# We group by Country and Year to calculate average metrics
gold_q1 = (df_silver
    .groupBy("countries_en", "year")
    .agg(
        F.avg("fat_100g").alias("avg_fat"),
        F.avg("sugars_100g").alias("avg_sugars"),
        F.avg("salt_100g").alias("avg_salt"),
        F.count("*").alias("product_count")
    )
    # 2. FILTERING
    # We only keep meaningful groups (e.g., countries with at least 50 products in that year)
    # This avoids statistical noise from countries with just 1 or 2 products.
    .filter(F.col("product_count") >= 50)
    # 3. SORTING (Logical)
    # Showing the countries with the most products first
    # NOTE(review): the partitioned write below regroups rows by countries_en,
    # so this global sort is presumably not preserved in the output files —
    # it mainly matters for .show(); confirm whether the extra shuffle is wanted.
    .orderBy(F.col("product_count").desc())
)
# Start tracking BEFORE the write action
metrics_collector.start_task("q1_baseline", "Gold Q1 Baseline Run")
# 4. WRITE (Baseline)
# We write the result partitioned by the config key (countries_en)
# This is the standard "Baseline" approach before we try to optimize it further.
(gold_q1.write.mode("overwrite")
    .partitionBy(*partition_by)
    .parquet(f"{gold}/q1_nutrition_stats_baseline"))
# Stop tracking and get the dictionary of results
metrics_baseline = metrics_collector.stop_task()
print(f"Gold Q1 Baseline written to: {gold}/q1_nutrition_stats_baseline")
update_metrics_log("Q1", "baseline", metrics_baseline, note="Automated run")
Starting tracking for: Gold Q1 Baseline Run
Tracking completed. - Time: 26743.38 ms - Shuffle metrics: 0 (or unavailable) Gold Q1 Baseline written to: outputs/project/gold/q1_nutrition_stats_baseline [INFO] CSV updated successfully for Q1 (baseline)
/tmp/ipykernel_1213/1846102498.py:35: FutureWarning: Setting an item of incompatible dtype is deprecated and will raise an error in a future version of pandas. Value 'Auto-captured. Automated run' has dtype incompatible with float64, please explicitly cast to a compatible dtype first.
df.loc[mask, 'notes'] = f"Auto-captured. {note}"
In [11]:
# Preview the Gold Q1 aggregate (triggers a fresh computation of the plan;
# row order follows the orderBy on product_count above).
gold_q1.show(10)
[Stage 24:=======================================================>(88 + 1) / 89]
+-------------+----+------------------+------------------+------------------+-------------+ | countries_en|year| avg_fat| avg_sugars| avg_salt|product_count| +-------------+----+------------------+------------------+------------------+-------------+ | France|2018|14.354150251290514|14.149931639526505|1.3311780389470138| 204521| |United States|2017| 18.0728667062721| 24.12250687790912|1.3209166810404191| 151097| |United States|2020|11.638441415720914|15.905750505214517|1.5747855141861344| 133271| |United States|2022|13.302800285068166|14.431219986889642|2.3446040975735656| 109931| |United States|2025|14.528111577862248|12.866601166540855| 3.296180411257816| 106890| | France|2019|15.064196279512842|14.191240245807117| 1.178798975764673| 98310| | Spain|2019|16.023631487578804| 9.429283120699823| 1.22837424507755| 78539| | Italy|2021| 14.6995937971515|10.865084877778656|1.2862531430161348| 77448| | France|2020| 15.14001272649394|12.147366598237145|1.3085055379823032| 68368| | France|2017|13.489183847348515|13.902984526912686|1.1307879524743631| 66232| +-------------+----+------------------+------------------+------------------+-------------+ only showing top 10 rows
4. Baseline plans and metrics¶
In [12]:
import os, datetime as _dt, pathlib

# Resolve the proof directory from config here, so this cell does not depend
# on a LATER cell having defined `proof` (it was previously only assigned in
# the optimization section — a fresh "Restart & Run All" would raise
# NameError at this point).
proof = CFG["paths"]["proof"]
pathlib.Path(proof).mkdir(parents=True, exist_ok=True)

# Example baseline plan: capture the executed physical plan of Gold Q1 as
# evidence. NOTE: `_jdf` is a private PySpark handle to the underlying Java
# DataFrame; there is no stable public API for the executed-plan string.
plan = gold_q1._jdf.queryExecution().executedPlan().toString()
with open(f"{proof}/baseline_q1_plan.txt", "w") as f:
    f.write(str(_dt.datetime.now()) + "\n")  # timestamp the evidence file
    f.write(plan)
print("Saved baseline plan. Record Spark UI metrics now.")
Saved baseline plan. Record Spark UI metrics now.
In [16]:
import os

# Define the exact path to the Gold Baseline table.
# This combines the 'gold' folder path from the config with the specific table name.
gold_path = f"{CFG['paths']['gold']}/q1_nutrition_stats_baseline"
def get_folder_size_mb(folder_path):
    """Recursively calculates the size of a directory in MB, excluding system files."""
    def _is_data_file(name):
        # Exclude Hadoop checksums (.crc), success flags (_SUCCESS), and hidden files.
        return not (name.endswith('.crc') or name.startswith('_') or name.startswith('.'))

    # Sum the sizes of all qualifying files under folder_path (bytes).
    size_bytes = sum(
        os.path.getsize(os.path.join(root, name))
        for root, _, names in os.walk(folder_path)
        for name in names
        if _is_data_file(name)
    )
    # Convert bytes to Megabytes
    return size_bytes / (1024 * 1024)
# Execute calculation: report Gold baseline table size vs. raw input size.
if os.path.exists(gold_path):
    gold_size_mb = get_folder_size_mb(gold_path)
    # Reference Raw Size (Bronze) - approx 1126 MB based on your dataset
    # NOTE(review): hardcoded constant — if the raw CSV changes, this ratio
    # is stale; consider measuring the bronze folder with get_folder_size_mb.
    raw_size_mb = 1126.0
    print("--- STORAGE METRICS (BASELINE) ---")
    print(f"Path measured: {gold_path}")
    print(f"1. Gold Table Size (Baseline): {gold_size_mb:.2f} MB")
    print(f"2. Compression Ratio: {(gold_size_mb / raw_size_mb) * 100:.2f}%")
    print(" Target: <= 60%")
else:
    print(f"Error: The folder '{gold_path}' does not exist.")
    print("Please verify that Step 3 (Gold Baseline) finished writing successfully.")
--- STORAGE METRICS (BASELINE) --- Path measured: outputs/project/gold/q1_nutrition_stats_baseline 1. Gold Table Size (Baseline): 0.34 MB 2. Compression Ratio: 0.03% Target: <= 60%
5. Optimization — layout and joins¶
In [25]:
import datetime

# Gold Q1 (optimized): same aggregation as the baseline, but fed from a
# narrow projection of Silver and written with sortWithinPartitions.
gold = CFG["paths"]["gold"]
partition_by = CFG["layout"]["partition_by"]
proof = CFG["paths"]["proof"]
# Optimization for Q1: project only the five columns the aggregation needs,
# so Parquet reads fewer column chunks.
df_silver_min = df_silver.select(
    "countries_en",
    "year",
    "fat_100g",
    "sugars_100g",
    "salt_100g"
)
gold_q1_opt = (df_silver_min
    .groupBy("countries_en", "year")
    .agg(
        F.avg("fat_100g").alias("avg_fat"),
        F.avg("sugars_100g").alias("avg_sugars"),
        F.avg("salt_100g").alias("avg_salt"),
        F.count("*").alias("product_count")
    )
    .filter(F.col("product_count") >= 50)
    # NOTE(review): this global sort is followed by sortWithinPartitions and a
    # partitioned write below, which presumably discard the global order —
    # confirm whether the extra shuffle is intended in the "optimized" path.
    .orderBy(F.col("product_count").desc())
)
# --- EXECUTION WITH METRICS ---
# A. Start tracking
metrics_collector.start_task("q1_optimized", "Gold Q1 Optimized Run")
# B. Trigger the computation (Write to disk)
(gold_q1_opt
    .sortWithinPartitions("year")
    .write
    .mode("overwrite")
    .partitionBy(*partition_by)
    .parquet(f"{gold}/q1_nutrition_stats_optimized")
)
# C. Stop tracking and capture numbers
metrics_opt = metrics_collector.stop_task()
print(f"Gold Q1 Optimized written to: {gold}/q1_nutrition_stats_optimized")
# --- SAVE PROOF (Physical Plan) ---
# NOTE(review): this captures the plan of `gold_q1_opt` only — it does NOT
# include the sortWithinPartitions stage that was actually written above.
plan_opt = gold_q1_opt._jdf.queryExecution().executedPlan().toString()
with open(f"{proof}/optimized_q1_plan.txt", "w") as f:
    f.write(str(datetime.datetime.now()) + "\n")
    f.write(plan_opt)
print(f"Optimized plan saved to: {proof}/optimized_q1_plan.txt")
# --- UPDATE METRICS LOG ---
update_metrics_log("Q1", "optimized", metrics_opt, note="Narrow Projection + SortWithinPartitions")
Starting tracking for: Gold Q1 Optimized Run
Tracking completed. - Time: 29775.38 ms - Shuffle metrics: 0 (or unavailable) Gold Q1 Optimized written to: outputs/project/gold/q1_nutrition_stats_optimized Optimized plan saved to: outputs/project/proof/optimized_q1_plan.txt [INFO] CSV updated successfully for Q1 (optimized)
In [32]:
path_base_q1 = f"{CFG['paths']['gold']}/q1_nutrition_stats_baseline"
path_opt_q1 = f"{CFG['paths']['gold']}/q1_nutrition_stats_optimized"

def benchmark_q1_read(name, path, target_year):
    """Time a cold read of a Gold Q1 table filtered on `year`.

    Parameters:
        name (str): label printed in the benchmark header.
        path (str): Parquet path of the table to benchmark.
        target_year (int): year value used in the pushdown filter.

    Prints the row count, elapsed time, and the physical plan (to verify
    that the filter is pushed down to the Parquet reader).
    """
    print(f"\n=== Q1 BENCHMARK: {name} ===")
    # 1. Clear cache (simulate cold read)
    spark.catalog.clearCache()
    # 2. Define Query: Filter by YEAR to test the 'sortWithinPartitions' impact
    # We assume the user wants data for a specific year across all countries.
    # FIX: use time.perf_counter() instead of time.time() — perf_counter is a
    # monotonic clock intended for interval measurement; time.time() is wall
    # clock and can jump (e.g. NTP adjustments), skewing benchmark numbers.
    start = time.perf_counter()
    df = spark.read.parquet(path).filter(F.col("year") == target_year)
    # 3. Force execution
    count = df.count()
    duration = (time.perf_counter() - start) * 1000
    print(f"Target Year: {target_year}")
    print(f"Rows found: {count}")
    print(f"Read Time: {duration:.4f} ms")
    # 4. Verify Predicate Pushdown
    print(f"--- Execution Plan ({name}) ---")
    df.explain()

# --- RUN COMPARISON ---
# Test with a recent year (likely present in the data)
target_year = 2023
benchmark_q1_read("Baseline (Unsorted Year)", path_base_q1, target_year)
benchmark_q1_read("Optimized (Sorted Year)", path_opt_q1, target_year)
=== Q1 BENCHMARK: Baseline (Unsorted Year) === Target Year: 2023 Rows found: 86 Read Time: 1240.8607 ms --- Execution Plan (Baseline (Unsorted Year)) --- == Physical Plan == *(1) Filter (isnotnull(year#2812) AND (year#2812 = 2023)) +- *(1) ColumnarToRow +- FileScan parquet [year#2812,avg_fat#2813,avg_sugars#2814,avg_salt#2815,product_count#2816L,countries_en#2817] Batched: true, DataFilters: [isnotnull(year#2812), (year#2812 = 2023)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/justine/Data_engineering/Project/outputs/project/gold/q1_nu..., PartitionFilters: [], PushedFilters: [IsNotNull(year), EqualTo(year,2023)], ReadSchema: struct<year:int,avg_fat:double,avg_sugars:double,avg_salt:double,product_count:bigint> === Q1 BENCHMARK: Optimized (Sorted Year) === Target Year: 2023 Rows found: 86 Read Time: 636.9076 ms --- Execution Plan (Optimized (Sorted Year)) --- == Physical Plan == *(1) Filter (isnotnull(year#2829) AND (year#2829 = 2023)) +- *(1) ColumnarToRow +- FileScan parquet [year#2829,avg_fat#2830,avg_sugars#2831,avg_salt#2832,product_count#2833L,countries_en#2834] Batched: true, DataFilters: [isnotnull(year#2829), (year#2829 = 2023)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/justine/Data_engineering/Project/outputs/project/gold/q1_nu..., PartitionFilters: [], PushedFilters: [IsNotNull(year), EqualTo(year,2023)], ReadSchema: struct<year:int,avg_fat:double,avg_sugars:double,avg_salt:double,product_count:bigint>
Q2 - Nutriscore Analysis¶
In [28]:
# --- Q2: NUTRISCORE PROFILE ANALYSIS ---
# Requires `df_silver`, `gold`, `metrics_collector`, and `update_metrics_log`
# from earlier cells.
# 1. BASELINE EXECUTION
print("--- Running Q2 Baseline ---")
# Logic: Group by Nutriscore to see avg fat/sugar/salt
gold_q2 = (df_silver
    .groupBy("nutriscore")
    .agg(
        F.avg("fat_100g").alias("avg_fat"),
        F.avg("sugars_100g").alias("avg_sugars"),
        F.count("*").alias("product_count")
    )
    # Drop the NULL nutriscore group before persisting
    .filter(F.col("nutriscore").isNotNull())
    .orderBy("nutriscore")
)
# Start/stop bracketing the write is load-bearing: the write is the action
# that triggers the job whose metrics we want.
metrics_collector.start_task("q2_baseline", "Gold Q2 Baseline")
(gold_q2.write.mode("overwrite").parquet(f"{gold}/q2_nutriscore_baseline"))
metrics_q2_base = metrics_collector.stop_task()
update_metrics_log("Q2", "baseline", metrics_q2_base)
# 2. OPTIMIZED EXECUTION
print("\n--- Running Q2 Optimized ---")
# Optimization: Narrow Projection (select only needed cols)
df_silver_q2 = df_silver.select("nutriscore", "fat_100g", "sugars_100g")
gold_q2_opt = (df_silver_q2
    .groupBy("nutriscore")
    .agg(
        F.avg("fat_100g").alias("avg_fat"),
        F.avg("sugars_100g").alias("avg_sugars"),
        F.count("*").alias("product_count")
    )
    .filter(F.col("nutriscore").isNotNull())
    .orderBy("nutriscore")
)
metrics_collector.start_task("q2_optimized", "Gold Q2 Optimized")
# Optimization: sortWithinPartitions on the grouping key ('nutriscore')
(gold_q2_opt
    .sortWithinPartitions("nutriscore")
    .write.mode("overwrite").parquet(f"{gold}/q2_nutriscore_optimized")
)
metrics_q2_opt = metrics_collector.stop_task()
update_metrics_log("Q2", "optimized", metrics_q2_opt, note="Projection + Sort(Nutriscore)")
--- Running Q2 Baseline --- Starting tracking for: Gold Q2 Baseline
Tracking completed. - Time: 28746.2 ms - Shuffle metrics: 0 (or unavailable) [INFO] CSV updated successfully for Q2 (baseline) --- Running Q2 Optimized --- Starting tracking for: Gold Q2 Optimized
Tracking completed. - Time: 31492.24 ms - Shuffle metrics: 0 (or unavailable) [INFO] CSV updated successfully for Q2 (optimized)
In [36]:
# --- SAVE PROOFS FOR Q2 (Physical Plans) ---
# One loop handles both variants; each plan file starts with a timestamp line.
for var_name, label, fname in (
    ("gold_q2", "Baseline", "baseline_q2_plan.txt"),
    ("gold_q2_opt", "Optimized", "optimized_q2_plan.txt"),
):
    # Only dump a plan if the corresponding DataFrame cell was executed.
    if var_name in locals():
        plan_text = locals()[var_name]._jdf.queryExecution().executedPlan().toString()
        with open(f"{proof}/{fname}", "w") as f:
            f.write(str(datetime.datetime.now()) + "\n")
            f.write(plan_text)
        print(f" {label} Q2 plan saved to: {proof}/{fname}")
Baseline Q2 plan saved to: outputs/project/proof/baseline_q2_plan.txt Optimized Q2 plan saved to: outputs/project/proof/optimized_q2_plan.txt
In [35]:
# Define paths for Q2 (Nutriscore Analysis)
path_base = f"{CFG['paths']['gold']}/q2_nutriscore_baseline"
path_opt = f"{CFG['paths']['gold']}/q2_nutriscore_optimized"

def benchmark_read(name, path, target_score):
    """Time a cold read of a Gold Q2 table filtered on `nutriscore`.

    Parameters:
        name (str): label printed in the benchmark header.
        path (str): Parquet path of the table to benchmark.
        target_score (str): nutriscore letter used in the pushdown filter.

    Prints the row count, elapsed time, and the physical plan (to verify
    predicate pushdown into the Parquet scan).
    """
    print(f"\n=== Q2 BENCHMARK: {name} ===")
    # 1. Clear cache to ensure cold read (simulation)
    spark.catalog.clearCache()
    # 2. Define the query: A user wants ONLY 'target_score' data
    # FIX: perf_counter() is the monotonic clock meant for measuring
    # intervals; time.time() is wall clock and can jump mid-measurement.
    start = time.perf_counter()
    df = spark.read.parquet(path).filter(F.col("nutriscore") == target_score)
    # 3. Force execution (count the rows)
    count = df.count()
    duration = (time.perf_counter() - start) * 1000
    print(f"Rows found: {count}")
    print(f"Read Time: {duration:.4f} ms")
    # 4. Proof of Predicate Pushdown
    # We verify that Spark pushes the filter to the Parquet reader
    print(f"--- Execution Plan ({name}) ---")
    df.explain()

# --- RUN COMPARISON ---
# We assume Q2 has been generated. We look for Nutriscore 'e'.
benchmark_read("Baseline (Unsorted)", path_base, "e")
benchmark_read("Optimized (Sorted)", path_opt, "e")
=== Q2 BENCHMARK: Baseline (Unsorted) === Rows found: 1 Read Time: 549.2725 ms --- Execution Plan (Baseline (Unsorted)) --- == Physical Plan == *(1) Filter (isnotnull(nutriscore#2925) AND (nutriscore#2925 = e)) +- *(1) ColumnarToRow +- FileScan parquet [nutriscore#2925,avg_fat#2926,avg_sugars#2927,product_count#2928L] Batched: true, DataFilters: [isnotnull(nutriscore#2925), (nutriscore#2925 = e)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/justine/Data_engineering/Project/outputs/project/gold/q2_nu..., PartitionFilters: [], PushedFilters: [IsNotNull(nutriscore), EqualTo(nutriscore,e)], ReadSchema: struct<nutriscore:string,avg_fat:double,avg_sugars:double,product_count:bigint> === Q2 BENCHMARK: Optimized (Sorted) === Rows found: 1 Read Time: 272.1484 ms --- Execution Plan (Optimized (Sorted)) --- == Physical Plan == *(1) Filter (isnotnull(nutriscore#2938) AND (nutriscore#2938 = e)) +- *(1) ColumnarToRow +- FileScan parquet [nutriscore#2938,avg_fat#2939,avg_sugars#2940,product_count#2941L] Batched: true, DataFilters: [isnotnull(nutriscore#2938), (nutriscore#2938 = e)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/justine/Data_engineering/Project/outputs/project/gold/q2_nu..., PartitionFilters: [], PushedFilters: [IsNotNull(nutriscore), EqualTo(nutriscore,e)], ReadSchema: struct<nutriscore:string,avg_fat:double,avg_sugars:double,product_count:bigint>
Q3 - Global Yearly Trend¶
In [33]:
# --- Q3: GLOBAL YEARLY TREND ANALYSIS ---
# 1. BASELINE EXECUTION
print("--- Running Q3 Baseline ---")
# Global trend by year, across all countries: average sugars per 100g.
gold_q3 = (
    df_silver
    .groupBy("year")
    .agg(F.avg("sugars_100g").alias("avg_sugars"),
         F.count("*").alias("total_products"))
    .filter("year >= 2000")          # restrict to the modern era
    .orderBy(F.col("year").desc())
)
metrics_collector.start_task("q3_baseline", "Gold Q3 Baseline")
gold_q3.write.mode("overwrite").parquet(f"{gold}/q3_trends_baseline")
metrics_q3_base = metrics_collector.stop_task()
update_metrics_log("Q3", "baseline", metrics_q3_base)

# 2. OPTIMIZED EXECUTION
print("\n--- Running Q3 Optimized ---")
# Optimization: narrow projection — keep only the two columns Q3 needs.
df_silver_q3 = df_silver.select("year", "sugars_100g")
gold_q3_opt = (
    df_silver_q3
    .groupBy("year")
    .agg(F.avg("sugars_100g").alias("avg_sugars"),
         F.count("*").alias("total_products"))
    .filter("year >= 2000")
    .orderBy(F.col("year").desc())
)
metrics_collector.start_task("q3_optimized", "Gold Q3 Optimized")
# Optimization: sortWithinPartitions clusters each year's rows inside the
# written files, helping later reads that filter on 'year'.
(gold_q3_opt
    .sortWithinPartitions("year")
    .write.mode("overwrite")
    .parquet(f"{gold}/q3_trends_optimized"))
metrics_q3_opt = metrics_collector.stop_task()
update_metrics_log("Q3", "optimized", metrics_q3_opt, note="Projection + Sort(Year)")
--- Running Q3 Baseline --- Starting tracking for: Gold Q3 Baseline
Tracking completed. - Time: 38191.44 ms - Shuffle metrics: 0 (or unavailable) [INFO] CSV updated successfully for Q3 (baseline) --- Running Q3 Optimized --- Starting tracking for: Gold Q3 Optimized
Tracking completed. - Time: 24528.79 ms - Shuffle metrics: 0 (or unavailable) [INFO] CSV updated successfully for Q3 (optimized)
In [37]:
# --- SAVE PROOFS FOR Q3 (Physical Plans) ---
# NOTE: `_jdf` is a private PySpark bridge; used only to capture the
# executed-plan string as evidence.
# 1. Baseline plan — only if the baseline cell ran in this kernel session.
if 'gold_q3' in locals():
    baseline_path = f"{proof}/baseline_q3_plan.txt"
    with open(baseline_path, "w") as f:
        f.write(str(datetime.datetime.now()) + "\n")
        f.write(gold_q3._jdf.queryExecution().executedPlan().toString())
    print(f" Baseline Q3 plan saved to: {baseline_path}")
# 2. Optimized plan
if 'gold_q3_opt' in locals():
    optimized_path = f"{proof}/optimized_q3_plan.txt"
    with open(optimized_path, "w") as f:
        f.write(str(datetime.datetime.now()) + "\n")
        f.write(gold_q3_opt._jdf.queryExecution().executedPlan().toString())
    print(f" Optimized Q3 plan saved to: {optimized_path}")
Baseline Q3 plan saved to: outputs/project/proof/baseline_q3_plan.txt Optimized Q3 plan saved to: outputs/project/proof/optimized_q3_plan.txt
In [34]:
# Paths of the two Q3 gold outputs (yearly-trend analysis)
path_base_q3 = f"{CFG['paths']['gold']}/q3_trends_baseline"
path_opt_q3 = f"{CFG['paths']['gold']}/q3_trends_optimized"

def benchmark_q3_read(name, path, target_year):
    """Time a cold read of a Q3 gold table filtered to a single year.

    Clears the Spark cache, counts the rows for `target_year`, reports
    the elapsed time, and prints the physical plan as pushdown evidence.
    """
    print(f"\n=== Q3 BENCHMARK: {name} ===")
    # Cold-read simulation: drop anything cached by earlier cells.
    spark.catalog.clearCache()
    t0 = time.time()
    # Read the parquet output and filter immediately on the target year.
    one_year = spark.read.parquet(path).filter(F.col("year") == target_year)
    try:
        n_rows = one_year.count()  # forces execution
        elapsed_ms = (time.time() - t0) * 1000
        print(f"Target Year: {target_year}")
        print(f"Rows found: {n_rows}")
        print(f"Read Time: {elapsed_ms:.4f} ms")
        # Evidence that the filter is pushed down to the Parquet scan.
        print(f"--- Execution Plan ({name}) ---")
        one_year.explain()
    except Exception as e:
        print(f"Error reading path: {e}")

# --- RUN COMPARISON ---
# Let's look for a specific year, e.g., 2018
target_year = 2018
benchmark_q3_read("Baseline (Unsorted)", path_base_q3, target_year)
benchmark_q3_read("Optimized (Sorted)", path_opt_q3, target_year)
=== Q3 BENCHMARK: Baseline (Unsorted) === Target Year: 2018 Rows found: 1 Read Time: 784.1933 ms --- Execution Plan (Baseline (Unsorted)) --- == Physical Plan == *(1) Filter (isnotnull(year#2903) AND (year#2903 = 2018)) +- *(1) ColumnarToRow +- FileScan parquet [year#2903,avg_sugars#2904,total_products#2905L] Batched: true, DataFilters: [isnotnull(year#2903), (year#2903 = 2018)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/justine/Data_engineering/Project/outputs/project/gold/q3_tr..., PartitionFilters: [], PushedFilters: [IsNotNull(year), EqualTo(year,2018)], ReadSchema: struct<year:int,avg_sugars:double,total_products:bigint> === Q3 BENCHMARK: Optimized (Sorted) === Target Year: 2018 Rows found: 1 Read Time: 227.7348 ms --- Execution Plan (Optimized (Sorted)) --- == Physical Plan == *(1) Filter (isnotnull(year#2914) AND (year#2914 = 2018)) +- *(1) ColumnarToRow +- FileScan parquet [year#2914,avg_sugars#2915,total_products#2916L] Batched: true, DataFilters: [isnotnull(year#2914), (year#2914 = 2018)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/justine/Data_engineering/Project/outputs/project/gold/q3_tr..., PartitionFilters: [], PushedFilters: [IsNotNull(year), EqualTo(year,2018)], ReadSchema: struct<year:int,avg_sugars:double,total_products:bigint>
6. Cleanup¶
In [39]:
# Release driver/executor resources; any cell run after this point that
# needs Spark must recreate the session.
spark.stop()
print("Spark session stopped.")
Spark session stopped.
In [ ]: