DE2 - Lab 1: Structured Streaming Pipeline (10%)
Author: Badr TAJINI - Data Engineering II (Data-Intensive Workloads) - ESIEE 2025-2026
Track: D (aviation, OpenSky 'States' data)
Goal: Build a Structured Streaming pipeline with windowed aggregation, watermarks, and a Parquet sink. Monitor it via query.lastProgress and the Streaming UI, and deliver a before/after optimization report.
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import *
import time, pathlib, json
spark = SparkSession.builder \
.appName("DE2-Lab1-Streaming") \
.master("local[*]") \
.getOrCreate()
print("Spark version:", spark.version)
print("Spark UI:", spark.sparkContext.uiWebUrl)
Spark version: 4.0.1
Spark UI: http://10.255.255.254:4040
1. Define Schema & Prepare Landing Directory
Define an explicit StructType schema for your track data. Create a landing directory where files will be dropped to simulate a streaming source.
# 1. Define schema based on OpenSky 'States' columns
# Note: Using DoubleType for coordinates and metrics as per Track D requirements
schema = StructType([
StructField("time", LongType(), True), # Unix Timestamp
StructField("icao24", StringType(), True), # Unique aircraft ID
StructField("lat", DoubleType(), True),
StructField("lon", DoubleType(), True),
StructField("velocity", DoubleType(), True),
StructField("heading", DoubleType(), True),
StructField("vertrate", DoubleType(), True),
StructField("callsign", StringType(), True),
StructField("onground", BooleanType(), True),
StructField("alert", BooleanType(), True),
StructField("spi", BooleanType(), True),
StructField("squawk", StringType(), True),
StructField("baroaltitude", DoubleType(), True),
StructField("geoaltitude", DoubleType(), True),
StructField("lastposupdate", DoubleType(), True),
StructField("lastcontact", DoubleType(), True)
])
# 2. Create landing directory for simulation
landing_dir = "data/landing/"
pathlib.Path(landing_dir).mkdir(parents=True, exist_ok=True)
print("Aviation Schema (Track D) defined.")
print(f"Landing directory created at: {landing_dir}")
Aviation Schema (Track D) defined.
Landing directory created at: data/landing/
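To see what the explicit schema does to each record, here is a small pure-Python sketch that types one CSV line the same way. It is only an illustration: `SCHEMA`, `parse_row` and `_to_bool` are hypothetical helpers, the converters are simplified (no malformed-record handling), and the sample values are made up. It does reflect one real detail: with a user-supplied schema, Spark's CSV reader applies it by column position by default (`enforceSchema=true`), so the StructField order must match the OpenSky file's column order.

```python
import csv
import io

def _to_bool(text):
    # Simplified: "true" in any case is True, anything else is False
    return text.strip().lower() == "true"

# Hypothetical mirror of the Track D StructType, in the same column order
SCHEMA = [
    ("time", int), ("icao24", str), ("lat", float), ("lon", float),
    ("velocity", float), ("heading", float), ("vertrate", float),
    ("callsign", str), ("onground", _to_bool), ("alert", _to_bool),
    ("spi", _to_bool), ("squawk", str), ("baroaltitude", float),
    ("geoaltitude", float), ("lastposupdate", float), ("lastcontact", float),
]

def parse_row(values):
    """Type one CSV record by position; empty fields become None (all fields are nullable)."""
    return {name: (None if raw == "" else convert(raw))
            for (name, convert), raw in zip(SCHEMA, values)}

# Illustrative record (made-up values) in OpenSky column order
line = "1496620800,a1b2c3,48.85,2.35,230.5,90.0,0.0,AFR123,False,False,False,,10500.0,10600.0,1496620799.5,1496620800.0"
row = parse_row(next(csv.reader(io.StringIO(line))))
print(row["time"], row["onground"], row["squawk"])  # 1496620800 False None
```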
2. Create Streaming Source
Use spark.readStream to create a streaming DataFrame from the landing directory.
# Create streaming source from the landing directory
df_stream = (spark.readStream
.schema(schema)
.option("header", "true")
.option("maxFilesPerTrigger", 1) # Process one file per micro-batch for better UI visualization
.csv(landing_dir)
# Crucial step: Convert Unix Long timestamp (seconds) to a Spark Timestamp
.withColumn("event_time", F.from_unixtime(F.col("time")).cast("timestamp"))
)
print("Streaming source created successfully.")
print("Is streaming active:", df_stream.isStreaming)
# Display the schema to verify 'event_time' is now a TimestampType
df_stream.printSchema()
Streaming source created successfully.
Is streaming active: True
root
 |-- time: long (nullable = true)
 |-- icao24: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- velocity: double (nullable = true)
 |-- heading: double (nullable = true)
 |-- vertrate: double (nullable = true)
 |-- callsign: string (nullable = true)
 |-- onground: boolean (nullable = true)
 |-- alert: boolean (nullable = true)
 |-- spi: boolean (nullable = true)
 |-- squawk: string (nullable = true)
 |-- baroaltitude: double (nullable = true)
 |-- geoaltitude: double (nullable = true)
 |-- lastposupdate: double (nullable = true)
 |-- lastcontact: double (nullable = true)
 |-- event_time: timestamp (nullable = true)
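`maxFilesPerTrigger=1` caps how many new files each micro-batch admits; anything beyond the cap waits for later triggers. The sketch below simulates that admission rule in plain Python. `plan_micro_batches` is a hypothetical helper, and it assumes files are taken oldest first (Spark's default, `latestFirst=false`) and that a file is never read twice. An empty list means that trigger found no new file, so no data batch would run for it.

```python
def plan_micro_batches(arrivals, max_files_per_trigger=1):
    """Simulate file-source admission control (hypothetical helper).

    arrivals[i] lists the file names that appeared in the landing directory
    just before trigger i. Returns the files processed by each trigger.
    """
    seen = set()       # files already discovered, never re-read
    backlog = []       # discovered but not yet processed, oldest first
    batches = []
    for new_files in arrivals:
        for name in new_files:
            if name not in seen:
                seen.add(name)
                backlog.append(name)
        batches.append(backlog[:max_files_per_trigger])
        backlog = backlog[max_files_per_trigger:]
    # No more arrivals: keep triggering until the backlog is drained
    while backlog:
        batches.append(backlog[:max_files_per_trigger])
        backlog = backlog[max_files_per_trigger:]
    return batches
```

For example, `plan_micro_batches([["f1", "f2", "f3"], [], ["f4"]])` yields four single-file batches, which is why dropping several files at once still appears as several batches in the Streaming UI.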
3. Watermark + Windowed Aggregation
Apply a watermark on the event-time column, then group by a tumbling/sliding window and your track-specific key.
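A tumbling window assigns every event to exactly one window; a sliding window assigns it to `size / slide` windows. The helper below (hypothetical, plain Python, epoch seconds) computes the `[start, end)` windows for one event, aligned to the Unix epoch as `window()` does with its default `startTime`. The sample timestamp is illustrative (00:07 UTC on 2017-06-05, the date of the raw file).

```python
from datetime import datetime, timezone

def assign_windows(event_ts, size_s, slide_s=None):
    """Return the [start, end) windows, in epoch seconds, that contain event_ts.

    slide_s=None means a tumbling window (slide == size). Window starts are
    aligned to the Unix epoch, like window() with its default startTime.
    """
    slide_s = size_s if slide_s is None else slide_s
    start = event_ts - event_ts % slide_s  # latest window start <= event_ts
    windows = []
    while start > event_ts - size_s:       # window [start, start + size) still covers the event
        windows.append((start, start + size_s))
        start -= slide_s
    return sorted(windows)

def fmt(ts):
    """Format epoch seconds as HH:MM in UTC (Spark displays timestamps in the session time zone)."""
    return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%H:%M")

t = 1496621220  # 2017-06-05 00:07:00 UTC (illustrative event time)
print([(fmt(s), fmt(e)) for s, e in assign_windows(t, 600)])       # [('00:00', '00:10')]
print([(fmt(s), fmt(e)) for s, e in assign_windows(t, 600, 300)])  # [('00:00', '00:10'), ('00:05', '00:15')]
```

This is one plausible reason the r3 run further down (10-minute window sliding every 5 minutes) keeps more rows in state than r1/r2: each position update lands in two (window, icao24) groups instead of one.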
# Apply watermark and windowed aggregation for Track D
# A 10-minute watermark is used to handle late-arriving flight data
windowed = (df_stream
.withWatermark("event_time", "10 minutes")
.groupBy(
F.window("event_time", "10 minutes"), # 10-minute tumbling window as per Track D requirements
F.col("icao24") # Group by aircraft ID
)
.agg(
F.count("*").alias("position_updates"),
F.avg("velocity").alias("avg_velocity"),
F.max("baroaltitude").alias("max_altitude")
))
print("Windowed aggregation defined with a 10-minute tumbling window.")
# Display the resulting schema
print("Windowed schema:")
windowed.printSchema()
Windowed aggregation defined with a 10-minute tumbling window.
Windowed schema:
root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- icao24: string (nullable = true)
 |-- position_updates: long (nullable = false)
 |-- avg_velocity: double (nullable = true)
 |-- max_altitude: double (nullable = true)
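With `withWatermark("event_time", "10 minutes")`, Spark tracks the maximum event time seen and sets the watermark to that value minus 10 minutes. In append mode, a window's row is written to the sink only after the watermark passes the window end, and its state is then dropped; rows older than the watermark may be discarded as too late. The simulation below approximates that logic in plain Python. `simulate_append` is hypothetical and simplified: Spark's exact boundary comparisons differ, and the sketch attributes each emission to the batch that advanced the watermark, whereas Spark usually writes it in the following (possibly no-data) micro-batch.

```python
from collections import defaultdict

def simulate_append(batches, size_s, delay_s):
    """Approximate an append-mode tumbling-window count with a watermark.

    batches: list of micro-batches, each a list of (event_ts, key) pairs in epoch seconds.
    Returns (emitted, dropped): emitted lists (batch_index, window_start, key, count)
    in the order windows are finalized; dropped counts rows behind the watermark.
    """
    state = defaultdict(int)        # (window_start, key) -> running count
    watermark = float("-inf")       # no watermark until the first batch has run
    max_event = None                # highest event time seen so far
    emitted, dropped = [], 0
    for i, batch in enumerate(batches):
        for ts, key in batch:
            if ts < watermark:      # late row: ignored (boundary handling simplified)
                dropped += 1
                continue
            start = ts - ts % size_s
            state[(start, key)] += 1
            max_event = ts if max_event is None else max(max_event, ts)
        # End of batch: advance the watermark (it never moves backwards)
        if max_event is not None:
            watermark = max(watermark, max_event - delay_s)
        # A window is final once the watermark reaches its end: emit it and drop its state
        for start, key in sorted(state):
            if start + size_s <= watermark:
                emitted.append((i, start, key, state.pop((start, key))))
    return emitted, dropped
```

Shrinking `delay_s` (as r3 does with 5 minutes) lets windows become final sooner and keeps less state, at the cost of dropping more late rows.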
4. Write to Parquet Sink
Write the windowed results to a Parquet sink with a trigger interval. Use checkpointLocation for fault tolerance.
import os
import numpy as np
import pandas as pd
# Prepare the simulated stream: split one raw OpenSky file into n_files smaller CSVs
# that can later be dropped into landing_dir one at a time.
# 1. Configuration
input_file = "data/raw_aviation/states_2017-06-05-00.csv"
split_dir = "data/raw_splits/"
n_files = 10
# Create the directory for the split files
os.makedirs(split_dir, exist_ok=True)
try:
    # 2. Load the data
    print(f"Reading {input_file}...")
    df = pd.read_csv(input_file)
    # 3. Shuffle rows to simulate out-of-order flight updates
    df = df.sample(frac=1).reset_index(drop=True)
    # 4. Split into n_files chunks and save each one with a header row.
    #    Split the row positions rather than the DataFrame itself, which newer pandas deprecates.
    for i, idx in enumerate(np.array_split(np.arange(len(df)), n_files)):
        chunk = df.iloc[idx]
        chunk_path = os.path.join(split_dir, f"aviation_batch_{i+1}.csv")
        chunk.to_csv(chunk_path, index=False)
        print(f"Generated: {chunk_path} ({len(chunk)} rows)")
    print(f"\nSuccessfully created {n_files} files in {split_dir}")
    print("You can now move these files one by one to your 'landing_dir' during the stream.")
except FileNotFoundError:
    print(f"Error: The file '{input_file}' was not found. Please check the file name.")
except Exception as e:
    print(f"An error occurred: {e}")
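The split files still have to be fed to the stream. A minimal sketch, assuming the `split_dir`/`landing_dir` layout above: `drop_files` is a hypothetical helper that copies one file at a time, writing it under a hidden temporary name first and then renaming it with `os.replace`, so the file source never picks up a half-written CSV (the rename is atomic on the same filesystem, and Spark's file listing normally skips names starting with `.` or `_`). Copying rather than moving keeps `split_dir` intact for the r2 and r3 re-runs.

```python
import os
import shutil
import time

def drop_files(split_dir, landing_dir, delay_s=15.0, limit=None):
    """Copy split CSVs into landing_dir one at a time (hypothetical helper).

    Each file is written under a hidden temp name, then renamed with os.replace,
    so a reader listing landing_dir never sees a partially written CSV.
    Returns the destination paths in the order they were dropped.
    """
    os.makedirs(landing_dir, exist_ok=True)
    names = [n for n in os.listdir(split_dir) if n.endswith(".csv")]
    # Natural order for aviation_batch_1..10: shorter names first, then lexicographic
    names.sort(key=lambda n: (len(n), n))
    dropped = []
    for name in names[:limit]:
        tmp_path = os.path.join(landing_dir, f".{name}.tmp")
        shutil.copyfile(os.path.join(split_dir, name), tmp_path)
        final_path = os.path.join(landing_dir, name)
        os.replace(tmp_path, final_path)  # atomic rename on the same filesystem
        dropped.append(final_path)
        time.sleep(delay_s)  # pacing sets the simulated arrival rate
    return dropped
```

Call it as `drop_files(split_dir, landing_dir, delay_s=15)` while the query is running. `delay_s` sets the arrival rate and therefore directly affects `inputRowsPerSecond`, so keeping it the same across r1, r2 and r3 makes their metrics comparable.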
# Create output and checkpoint directories
pathlib.Path("outputs/stream_sink").mkdir(parents=True, exist_ok=True)
pathlib.Path("outputs/checkpoint").mkdir(parents=True, exist_ok=True)
# Define and start the streaming query
query = (windowed.writeStream
.format("parquet")
.outputMode("append") # File sinks only support append; with an aggregation, append needs the watermark to know when a window is final
.option("path", "outputs/stream_sink")
.option("checkpointLocation", "outputs/checkpoint") # Ensures fault tolerance
.trigger(processingTime="10 seconds") # Baseline trigger interval
.start())
print(f"Query started successfully.")
print(f"Query Name: {query.name}")
print(f"Query ID: {query.id}")
print(f"Status: {query.status}")
#query.awaitTermination(timeout=300)
#query.stop()
Query started successfully.
Query Name: None
Query ID: 06aedff3-1197-4122-a85b-f6c8395607b8
Status: {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}
26/04/27 18:14:52 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
import shutil
# --- STEP 1: CLEAN UP THE PREVIOUS RUN ---
# Stop the running query and delete its output and checkpoint before restarting.
# This also avoids "Failed to cancel job" warnings from a half-stopped query.
try:
    query.stop()
except NameError:
    pass  # no query defined yet
# Delete old outputs and checkpoints so the "After" run starts clean.
# The checkpoint must go: for stateful queries Spark restores the number of shuffle
# partitions from it, so the new setting below would otherwise be overridden.
folders_to_clean = ["outputs/stream_sink", "outputs/checkpoint"]
for folder in folders_to_clean:
    if pathlib.Path(folder).exists():
        shutil.rmtree(folder)
        print(f"Cleaned up: {folder}")
# --- STEP 2: APPLY OPTIMIZATIONS ---
# 1. Reduce shuffle partitions: the default of 200 means 200 tasks (each with its own
#    state store) per stateful stage in every micro-batch, far too many for local mode.
spark.conf.set("spark.sql.shuffle.partitions", "5")
# --- STEP 3: DEFINE OPTIMIZED STREAM ---
# Same aggregation (windowed) and 10-minute watermark; besides the shuffle partitions
# above, only the trigger interval changes.
pathlib.Path("outputs/stream_sink").mkdir(parents=True, exist_ok=True)
pathlib.Path("outputs/checkpoint").mkdir(parents=True, exist_ok=True)
print("Starting Optimized Query (Run r2)...")
query = (windowed.writeStream
    .format("parquet")
    .outputMode("append")
    .option("path", "outputs/stream_sink")
    .option("checkpointLocation", "outputs/checkpoint")
    .trigger(processingTime="30 seconds")  # OPTIMIZATION: fewer, larger micro-batches, less fixed per-batch overhead
    .start())
print(f"Status: {query.status}")
# --- STEP 4: MONITOR ---
# Let it run for about 5 minutes while dropping files, then stop it properly.
# query.awaitTermination(timeout=300)
# query.stop()
Cleaned up: outputs/stream_sink
Cleaned up: outputs/checkpoint
Starting Optimized Query (Run r2)...
Status: {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}
26/04/27 18:17:56 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
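Why 5 partitions instead of 200: the stateful aggregation shuffles rows by `(window, icao24)`, and every shuffle partition becomes one task with its own state store in every micro-batch, whether or not it received any keys. The sketch below counts keys per partition under hash partitioning. It is an illustration only: Spark assigns partitions with a Murmur3 hash, not the `zlib.crc32` used here, and the aircraft IDs are made up. `partition_sizes` is a hypothetical helper.

```python
import zlib
from collections import Counter

def partition_sizes(keys, num_partitions):
    """Number of keys landing in each partition under hash partitioning.

    Illustration only: Spark uses Murmur3 (pmod(hash, n)), not CRC32, so the
    exact assignment differs; the number of partitions is what matters here.
    """
    counts = Counter(zlib.crc32(k.encode("utf-8")) % num_partitions for k in keys)
    return [counts.get(p, 0) for p in range(num_partitions)]

keys = [f"icao{i:03d}" for i in range(20)]  # hypothetical aircraft IDs
for n in (5, 200):
    sizes = partition_sizes(keys, n)
    # With 200 partitions and 20 keys, at least 180 partitions stay empty,
    # yet each one is still scheduled as a task in every micro-batch.
    print(n, "partitions:", sum(1 for s in sizes if s == 0), "empty")
```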
query.stop()
26/04/27 18:28:14 WARN DAGScheduler: Failed to cancel job group f3e9a7d9-37b2-455e-a5a5-6e1734189b49. Cannot find active jobs for it. 26/04/27 18:28:14 WARN DAGScheduler: Failed to cancel job group f3e9a7d9-37b2-455e-a5a5-6e1734189b49. Cannot find active jobs for it.
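import shutil
from pyspark.sql.functions import window
# --- STEP 1: STOP r2 AND CLEAN UP ---
# r3 uses a different aggregation: Spark does not support changing a stateful
# aggregation while restarting from the same checkpoint, and r3 should not append
# to r2's Parquet output either.
try:
    query.stop()
except NameError:
    pass
for folder in ["outputs/stream_sink", "outputs/checkpoint"]:
    if pathlib.Path(folder).exists():
        shutil.rmtree(folder)
# --- STEP 2: DEFINE r3 ---
# Note: besides the shorter watermark (5 min), this run also switches to a 10-minute
# window sliding every 5 minutes and a count-only aggregation, so it changes more
# than one parameter relative to r2.
windowed_r3 = (
    df_stream
    .withWatermark("event_time", "5 minutes")
    .groupBy(window("event_time", "10 minutes", "5 minutes"), "icao24")
    .count()
)
# --- STEP 3: START r3 ---
print("Starting Optimized Query (Run r3: Watermark 5min)...")
query = (windowed_r3.writeStream
    .format("parquet")
    .outputMode("append")
    .option("path", "outputs/stream_sink")
    .option("checkpointLocation", "outputs/checkpoint")
    .trigger(processingTime="30 seconds")
    .start())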
Starting Optimized Query (Run r3: Watermark 5min)...
26/04/27 18:04:55 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
# Force an immediate check of the current status
progress = query.lastProgress
if progress is not None:
    print("--- METRICS FOUND ---")
    print(f"Input Rate: {progress.get('inputRowsPerSecond')} rows/sec")
    print(f"Processing Rate: {progress.get('processedRowsPerSecond')} rows/sec")
    print(f"Batch Duration: {progress.get('durationMs', {}).get('triggerExecution')} ms")
    print(f"Rows in Batch: {progress.get('numInputRows')}")
else:
    print("--- NO DATA ACCESSIBLE ---")
    print("The query is active but no micro-batch has completed yet.")
    print("Drop a file into the landing folder and wait at least one trigger interval (30 s for r2/r3).")
# Also check how many progress reports are kept in the history
print(f"\nRecent Progress Count: {len(query.recentProgress)}")
--- METRICS FOUND ---
Input Rate: 5088.457820884578 rows/sec
Processing Rate: 102188.08567603749 rows/sec
Batch Duration: 1494 ms
Rows in Batch: 152669

Recent Progress Count: 3
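The same fields can be pulled out by a small helper that also copes with several stateful operators and computes one derived number. `summarize_progress` and its `headroom` key are hypothetical additions: `inputRowsPerSecond` reflects how fast rows arrived since the previous trigger and `processedRowsPerSecond` how fast this batch was processed, so a ratio above 1 suggests the query is keeping up. It assumes `lastProgress` behaves like a dict, as in the cell above.

```python
def summarize_progress(progress):
    """Headline numbers from one lastProgress report (None if no batch has run yet)."""
    if not progress:
        return None
    duration = progress.get("durationMs") or {}
    state_ops = progress.get("stateOperators") or []
    inp = progress.get("inputRowsPerSecond")
    proc = progress.get("processedRowsPerSecond")
    return {
        "batchId": progress.get("batchId"),
        "numInputRows": progress.get("numInputRows", 0),
        "inputRowsPerSecond": inp,
        "processedRowsPerSecond": proc,
        "triggerExecutionMs": duration.get("triggerExecution"),
        # Sum over all stateful operators instead of reading only the first one
        "stateRowsTotal": sum(op.get("numRowsTotal", 0) for op in state_ops),
        # > 1 means rows are processed faster than they arrive
        "headroom": proc / inp if inp and proc is not None else None,
    }
```

Call it as `summarize_progress(query.lastProgress)`; with the metrics printed above, the headroom would be about 20.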
5. Monitor: query.lastProgress & Streaming UI
Capture streaming metrics from query.lastProgress and take screenshots of the Streaming tab in Spark UI.
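A single `lastProgress` covers one micro-batch, so before/after comparisons are steadier when averaged over `query.recentProgress`, the list of the most recent progress reports. The helper below is a hypothetical sketch, assuming each entry supports `.get` like `lastProgress` does in this notebook; it skips batches with zero input rows so that no-data batches do not pull the averages down.

```python
from statistics import mean

def summarize_recent(recent):
    """Aggregate a list of progress reports such as query.recentProgress."""
    batches = [p for p in recent if (p.get("numInputRows") or 0) > 0]
    if not batches:
        return None
    return {
        "batches": len(batches),
        "totalInputRows": sum(p["numInputRows"] for p in batches),
        "meanProcessedRowsPerSecond": mean(p.get("processedRowsPerSecond") or 0.0 for p in batches),
        "maxTriggerExecutionMs": max((p.get("durationMs") or {}).get("triggerExecution", 0) for p in batches),
    }
```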
import os
import json
import pathlib
import pandas as pd
from datetime import datetime
from uuid import UUID
from contextlib import redirect_stdout
from IPython.display import display

def capture_lab_evidence(query, run_id, trigger_val, watermark_val, note):
    """
    Captures all required evidence: CSV metrics, JSON progress, and execution plan.
    """
    # 1. Setup folders
    pathlib.Path("proof").mkdir(parents=True, exist_ok=True)
    # 2. Capture progress data
    progress = query.lastProgress
    if not progress:
        print(f"No progress found for {run_id}. Waiting for a batch...")
        return
    # 3. Save JSON progress (UUID values are not JSON-serializable by default)
    def uuid_serializer(obj):
        if isinstance(obj, UUID):
            return str(obj)
        raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")
    json_path = f"proof/query_progress_{run_id}.json"
    with open(json_path, "w") as f:
        json.dump(progress, f, indent=2, default=uuid_serializer)
    # 4. Save the plan of the query that is actually running. windowed.explain() would
    #    always describe the r1/r2 aggregation, even when capturing r3 (windowed_r3).
    plan_path = f"proof/plan_streaming_{run_id}.txt"
    with open(plan_path, "w") as f:
        with redirect_stdout(f):
            query.explain(extended=True)
    # 5. Append one row to the CSV metrics log
    state_ops = progress.get("stateOperators") or []
    metrics = {
        "run_id": run_id,
        "trigger_interval": trigger_val,
        "watermark_duration": watermark_val,
        "inputRowsPerSecond": progress.get("inputRowsPerSecond", 0),
        "processedRowsPerSecond": progress.get("processedRowsPerSecond", 0),
        "numInputRows": progress.get("numInputRows", 0),
        "stateRows": state_ops[0].get("numRowsTotal", 0) if state_ops else 0,
        "durationMs": progress.get("durationMs", {}).get("triggerExecution", 0),
        "note": note,
        "timestamp": datetime.now().isoformat(),
    }
    csv_file = "lab1_metrics_log.csv"
    df = pd.DataFrame([metrics])
    header_needed = not os.path.exists(csv_file)
    df.to_csv(csv_file, mode="a", header=header_needed, index=False)
    print(f"Run {run_id} captured successfully!")
    print(f"- Metrics: {csv_file}")
    print(f"- JSON: {json_path}")
    print(f"- Plan: {plan_path}")
    display(df)
# --- CALL THIS AFTER DROPPING FILES FOR R3 ---
# capture_lab_evidence(query, "r3", "30s", "5min", "Watermark reduction test")
capture_lab_evidence(
query,
run_id="r1",
trigger_val="10s",
watermark_val="10min",
note="Baseline run with default shuffle partitions"
)
Run r1 captured successfully!
- Metrics: lab1_metrics_log.csv
- JSON: proof/query_progress_r1.json
- Plan: proof/plan_streaming_r1.txt
|   | run_id | trigger_interval | watermark_duration | inputRowsPerSecond | processedRowsPerSecond | numInputRows | stateRows | durationMs | note | timestamp |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | r1 | 10s | 10min | 15269.953991 | 110469.609262 | 152669 | 10481 | 1382 | Baseline run with default shuffle partitions | 2026-04-27T18:16:28.138927 |
capture_lab_evidence(
query,
run_id="r2",
trigger_val="30s",
watermark_val="10min",
note="Optimized shuffle (5) and trigger (30s)"
)
Run r2 captured successfully!
- Metrics: lab1_metrics_log.csv
- JSON: proof/query_progress_r2.json
- Plan: proof/plan_streaming_r2.txt
|   | run_id | trigger_interval | watermark_duration | inputRowsPerSecond | processedRowsPerSecond | numInputRows | stateRows | durationMs | note | timestamp |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | r2 | 30s | 10min | 52991.669559 | 102393.695506 | 152669 | 10342 | 1490 | Optimized shuffle (5) and trigger (30s) | 2026-04-27T18:18:30.645266 |
capture_lab_evidence(
query,
run_id="r3",
trigger_val="30s",
watermark_val="5min",
note="Reduced watermark to 5min - Final Optimization"
)
Run r3 captured successfully!
- Metrics: lab1_metrics_log.csv
- JSON: proof/query_progress_r3.json
- Plan: proof/plan_streaming_r3.txt
|   | run_id | trigger_interval | watermark_duration | inputRowsPerSecond | processedRowsPerSecond | numInputRows | stateRows | durationMs | note | timestamp |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | r3 | 30s | 5min | 5087.949077 | 115308.912387 | 152669 | 14997 | 1324 | Reduced watermark to 5min - Final Optimization | 2026-04-27T18:07:18.381641 |
# TODO: Capture query.lastProgress
# progress = query.lastProgress
# print(json.dumps(progress, indent=2))
#
# # Save progress JSON as evidence
# pathlib.Path("proof").mkdir(parents=True, exist_ok=True)
# with open("proof/query_progress.json", "w") as f:
# json.dump(progress, f, indent=2)
#
# # Key metrics to record:
# # - inputRowsPerSecond
# # - processedRowsPerSecond
# # - durationMs (triggerExecution, getBatch, etc.)
# # - stateOperators (numRowsTotal, numRowsUpdated)
#
# # TODO: Save streaming plan
# # windowed.explain("formatted")
# # Copy to proof/plan_streaming.txt
#
# # TODO: Open Spark UI → Streaming tab, take screenshots → proof/
6. Before/After Optimization
Re-run the pipeline with a different trigger interval or watermark duration. Compare throughput and latency metrics.
Before: trigger(processingTime="10 seconds"), watermark "10 minutes"
After: Change one parameter (e.g., trigger(processingTime="30 seconds") or watermark "5 minutes") and compare.
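Once the runs are in `lab1_metrics_log.csv` (written by `capture_lab_evidence` in section 5), the comparison can be reduced to percent changes against the baseline. `compare_runs` is a hypothetical helper using only the standard library; it keeps the last logged row per `run_id` and reports `(run - baseline) / baseline` in percent. Because `inputRowsPerSecond` depends mostly on how fast files were dropped into the landing directory, `processedRowsPerSecond`, `durationMs` and `stateRows` are the more meaningful columns for judging an optimization.

```python
import csv

METRICS = ("inputRowsPerSecond", "processedRowsPerSecond", "durationMs", "stateRows")

def compare_runs(csv_path, baseline="r1"):
    """Percent change of each metric versus the baseline run (last row per run_id wins)."""
    rows = {}
    with open(csv_path, newline="") as f:
        for row in csv.DictReader(f):
            rows[row["run_id"]] = row
    base = rows[baseline]
    report = {}
    for run_id, row in rows.items():
        if run_id == baseline:
            continue
        report[run_id] = {}
        for m in METRICS:
            b, v = float(base[m]), float(row[m])
            # None when the baseline value is zero (percent change undefined)
            report[run_id][m] = round(100.0 * (v - b) / b, 1) if b else None
    return report
```

Usage: `compare_runs("lab1_metrics_log.csv")` returns one dict of percent changes per non-baseline run, ready to paste into the before/after report.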
# TODO: Re-run with different trigger/watermark, capture metrics
# Compare inputRowsPerSecond, processedRowsPerSecond, durationMs
# Record both runs in lab1_metrics_log.csv
# Example metrics log structure:
# run_id, trigger_interval, watermark_duration, inputRowsPerSecond,
# processedRowsPerSecond, numInputRows, stateRows, durationMs, timestamp
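# Stop any streaming query still running (e.g. r3) before shutting down the session
for q in spark.streams.active:
    q.stop()
spark.stop()
print("Lab 1 complete.")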
Lab 1 complete.