DE2 — Lab 1: Structured Streaming Pipeline (10%)

Author: Badr TAJINI - Data Engineering II (Data-Intensive Workloads) - ESIEE 2025-2026

Track: D (Aviation, OpenSky states data)

Goal: Build a Structured Streaming pipeline with windowed aggregation, watermarks, and a Parquet sink. Monitor via query.lastProgress and the Streaming UI. Deliver a before/after optimization report.

In [1]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import *
import time, pathlib, json

spark = SparkSession.builder \
    .appName("DE2-Lab1-Streaming") \
    .master("local[*]") \
    .getOrCreate()

print("Spark version:", spark.version)
print("Spark UI:", spark.sparkContext.uiWebUrl)
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/04/27 17:36:15 WARN Utils: Your hostname, OrdideJustine, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
26/04/27 17:36:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/04/27 17:36:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark version: 4.0.1
Spark UI: http://10.255.255.254:4040

1. Define Schema & Prepare Landing Directory

Define an explicit StructType schema for your track data. Create a landing directory where files will be dropped to simulate a streaming source.

In [2]:
# 1. Define schema based on OpenSky 'States' columns
# Note: Using DoubleType for coordinates and metrics as per Track D requirements
schema = StructType([
    StructField("time", LongType(), True),          # Unix Timestamp
    StructField("icao24", StringType(), True),      # Unique aircraft ID
    StructField("lat", DoubleType(), True),
    StructField("lon", DoubleType(), True),
    StructField("velocity", DoubleType(), True),
    StructField("heading", DoubleType(), True),
    StructField("vertrate", DoubleType(), True),
    StructField("callsign", StringType(), True),
    StructField("onground", BooleanType(), True),
    StructField("alert", BooleanType(), True),
    StructField("spi", BooleanType(), True),
    StructField("squawk", StringType(), True),
    StructField("baroaltitude", DoubleType(), True),
    StructField("geoaltitude", DoubleType(), True),
    StructField("lastposupdate", DoubleType(), True),
    StructField("lastcontact", DoubleType(), True)
])

# 2. Create landing directory for simulation
landing_dir = "data/landing/"
pathlib.Path(landing_dir).mkdir(parents=True, exist_ok=True)

print(f"Aviation Schema (Track D) defined.")
print(f"Landing directory created at: {landing_dir}")
Aviation Schema (Track D) defined.
Landing directory created at: data/landing/
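Because the schema is supplied explicitly, Spark's CSV reader (with its default `enforceSchema=true`) applies it by column position and merely skips the header line, so a file whose columns come in a different order would be mislabelled without any error. A minimal pre-flight check in plain Python, assuming the landing files are CSVs whose header should match the OpenSky column order above; `EXPECTED_COLUMNS` and `check_header` are hypothetical names, not part of the lab template:

```python
import csv
from typing import List, Optional, Tuple

# Column order of the OpenSky 'states' schema defined above (assumed to match the CSV header)
EXPECTED_COLUMNS = [
    "time", "icao24", "lat", "lon", "velocity", "heading", "vertrate",
    "callsign", "onground", "alert", "spi", "squawk", "baroaltitude",
    "geoaltitude", "lastposupdate", "lastcontact",
]


def check_header(csv_path, expected=EXPECTED_COLUMNS) -> List[Tuple[int, Optional[str], Optional[str]]]:
    """Return (position, expected, found) for every header column that does not match."""
    with open(csv_path, newline="") as f:
        header = next(csv.reader(f), [])  # first row, or [] for an empty file
    mismatches = []
    for pos in range(max(len(header), len(expected))):
        want = expected[pos] if pos < len(expected) else None
        got = header[pos] if pos < len(header) else None
        if want != got:
            mismatches.append((pos, want, got))
    return mismatches
```

For example, `[p for p in pathlib.Path(landing_dir).glob("*.csv") if check_header(p)]` would list the files that Spark would silently misread.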

2. Create Streaming Source

Use spark.readStream to create a streaming DataFrame from the landing directory.

In [3]:
# Create streaming source from the landing directory
df_stream = (spark.readStream
    .schema(schema)
    .option("header", "true")
    .option("maxFilesPerTrigger", 1)  # Process one file per micro-batch for better UI visualization
    .csv(landing_dir)
    # Crucial step: convert the Unix timestamp (LongType, seconds) to a TimestampType column.
    # timestamp_seconds avoids the string round-trip of from_unixtime(...).cast("timestamp").
    .withColumn("event_time", F.timestamp_seconds("time"))
)

print("Streaming source created successfully.")
print("Is a streaming DataFrame:", df_stream.isStreaming)

# Display the schema to verify 'event_time' is now a TimestampType
df_stream.printSchema()
Streaming source created successfully.
Is a streaming DataFrame: True
root
 |-- time: long (nullable = true)
 |-- icao24: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- velocity: double (nullable = true)
 |-- heading: double (nullable = true)
 |-- vertrate: double (nullable = true)
 |-- callsign: string (nullable = true)
 |-- onground: boolean (nullable = true)
 |-- alert: boolean (nullable = true)
 |-- spi: boolean (nullable = true)
 |-- squawk: string (nullable = true)
 |-- baroaltitude: double (nullable = true)
 |-- geoaltitude: double (nullable = true)
 |-- lastposupdate: double (nullable = true)
 |-- lastcontact: double (nullable = true)
 |-- event_time: timestamp (nullable = true)
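`maxFilesPerTrigger=1` caps how many new files each micro-batch reads. On each trigger, Spark's file source lists the directory, skips files it has already recorded in the checkpoint, and (with the default `latestFirst=false`) takes the oldest files by modification time, up to the cap. The sketch below is a simplified plain-Python model of that planning, not Spark's implementation; `plan_batches` is a hypothetical helper, and each returned entry corresponds to one trigger, assuming no new files arrive in between:

```python
from typing import Dict, List, Set


def plan_batches(files: Dict[str, float], seen: Set[str], max_files: int = 1) -> List[List[str]]:
    """Group unseen files into batches of at most max_files, oldest modification time first.

    `files` maps a file name to its modification time; `seen` is updated in place,
    playing the role of the source log kept in the checkpoint.
    """
    new_files = sorted((mtime, name) for name, mtime in files.items() if name not in seen)
    batches = []
    for start in range(0, len(new_files), max_files):
        batch = [name for _, name in new_files[start:start + max_files]]
        seen.update(batch)
        batches.append(batch)
    return batches
```

Because `seen` stands in for the checkpoint, deleting the checkpoint between runs corresponds to starting again with an empty set: every file already in the landing directory is then treated as new.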

3. Watermark + Windowed Aggregation

Apply a watermark on the event-time column, then group by a tumbling/sliding window and your track-specific key.

In [4]:
# Apply watermark and windowed aggregation for Track D
# A 10-minute watermark is used to handle late-arriving flight data
windowed = (df_stream
    .withWatermark("event_time", "10 minutes")
    .groupBy(
        F.window("event_time", "10 minutes"),  # 10-minute tumbling window as per Track D requirements
        F.col("icao24")                        # Group by aircraft ID
    )
    .agg(
        F.count("*").alias("position_updates"),
        F.avg("velocity").alias("avg_velocity"),
        F.max("baroaltitude").alias("max_altitude")
    ))

print("Windowed aggregation defined with a 10-minute tumbling window.")

# Display the resulting schema
print("Windowed schema:")
windowed.printSchema()
Windowed aggregation defined with a 10-minute tumbling window.
Windowed schema:
root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- icao24: string (nullable = true)
 |-- position_updates: long (nullable = false)
 |-- avg_velocity: double (nullable = true)
 |-- max_altitude: double (nullable = true)
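To make the window and watermark semantics concrete, here is a simplified plain-Python model (not Spark code) of how an event time maps to tumbling or sliding windows aligned to the Unix epoch (Spark's default `startTime`), how the watermark is derived, and when a window can be emitted in append mode. Boundary handling is approximate, and all names are hypothetical. Note that with a 10-minute window and a 5-minute slide each event lands in two windows, which is one plausible contributor to r3 (sliding window) reporting more state rows than r1 (14997 vs 10481) despite its shorter watermark.

```python
from typing import List, Optional, Tuple

Window = Tuple[int, int]  # [start, end) in epoch seconds


def assign_windows(ts: int, duration: int, slide: Optional[int] = None) -> List[Window]:
    """Windows containing event time ts, aligned to the Unix epoch.

    slide=None gives a tumbling window (slide == duration); otherwise each event
    falls into duration // slide overlapping windows (when duration is a multiple of slide).
    """
    slide = slide or duration
    windows = []
    start = ts - (ts % slide)  # latest window start at or before ts
    while start > ts - duration:  # [start, start + duration) still contains ts
        windows.append((start, start + duration))
        start -= slide
    return sorted(windows)


def watermark(max_event_time: int, delay: int) -> int:
    """Watermark = largest event time seen so far minus the allowed delay."""
    return max_event_time - delay


def is_finalized(window: Window, wm: int) -> bool:
    """Simplified: in append mode a window is emitted once the watermark reaches its end."""
    return window[1] <= wm


def is_late(ts: int, wm: int) -> bool:
    """Simplified: rows older than the watermark are dropped by the stateful aggregation."""
    return ts < wm
```

With the baseline settings, `assign_windows(t, 600)` returns one window per event; with r3's settings, `assign_windows(t, 600, 300)` returns two.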

4. Write to Parquet Sink

Write the windowed results to a Parquet sink with a trigger interval. Use checkpointLocation for fault tolerance.

In [ ]:
import os

import numpy as np
import pandas as pd

# 1. Configuration
input_file = "data/raw_aviation/states_2017-06-05-00.csv"
split_dir = "data/raw_splits/"
n_files = 10

# Create the directory for the split files
os.makedirs(split_dir, exist_ok=True)

try:
    # 2. Load the data
    print(f"Reading {input_file}...")
    df = pd.read_csv(input_file)

    # 3. Shuffle rows to simulate randomly ordered flight updates.
    # A fixed seed keeps the split files identical across before/after runs.
    # Note: every chunk then spans the whole hour, so after the first batch the
    # watermark is already close to the end of the hour and older rows in later
    # files can be dropped as late data. Use df.sort_values("time") instead to
    # simulate in-order arrival.
    df = df.sample(frac=1, random_state=42).reset_index(drop=True)

    # 4. Split into n_files chunks of (almost) equal size and save them.
    # Splitting row positions avoids calling np.array_split on a DataFrame,
    # which triggers a deprecation warning in recent pandas versions.
    for i, idx in enumerate(np.array_split(np.arange(len(df)), n_files)):
        chunk = df.iloc[idx]
        chunk_path = os.path.join(split_dir, f"aviation_batch_{i+1}.csv")
        chunk.to_csv(chunk_path, index=False)
        print(f"Generated: {chunk_path} ({len(chunk)} rows)")

    print(f"\nSuccessfully created {n_files} files in {split_dir}")
    print("Move these files one at a time into landing_dir while the query is running.")

except FileNotFoundError:
    print(f"Error: the file '{input_file}' was not found. Please check the path.")
except Exception as e:
    print(f"An error occurred: {e}")
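The Spark documentation for the file source requires files to be placed in the watched directory atomically; otherwise a trigger may pick up a half-written CSV. A minimal sketch of the "move files one by one" step, assuming `split_dir` and `landing_dir` sit on the same file system and that Spark's file listing skips names starting with a dot (so the temporary copy stays invisible until the rename). `drop_files` and `_batch_number` are hypothetical helpers:

```python
import os
import re
import shutil
import time
from typing import List, Optional


def _batch_number(name: str) -> int:
    """Numeric suffix of 'aviation_batch_<n>.csv', so batch_10 sorts after batch_2."""
    match = re.search(r"(\d+)\.csv$", name)
    return int(match.group(1)) if match else -1


def drop_files(src_dir: str, dst_dir: str, delay_s: float = 15.0,
               limit: Optional[int] = None) -> List[str]:
    """Publish CSV files from src_dir into dst_dir one at a time and return their names.

    Each file is copied under a hidden temporary name first and then renamed, so the
    streaming source only ever sees complete files. Originals stay in src_dir.
    """
    os.makedirs(dst_dir, exist_ok=True)
    names = sorted((n for n in os.listdir(src_dir) if n.endswith(".csv")), key=_batch_number)
    selected = names if limit is None else names[:limit]
    for i, name in enumerate(selected):
        tmp_path = os.path.join(dst_dir, f".{name}.tmp")  # dot prefix: hidden from the listing
        shutil.copyfile(os.path.join(src_dir, name), tmp_path)
        os.replace(tmp_path, os.path.join(dst_dir, name))  # atomic rename within dst_dir
        if i < len(selected) - 1:
            time.sleep(delay_s)  # leave the query time to pick the file up
    return selected
```

For example, `drop_files(split_dir, landing_dir, delay_s=15)`. Copying rather than moving keeps the splits available for the next run; since a query started with a fresh checkpoint treats every file already in `landing_dir` as new, the landing directory may need clearing between runs as well.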
In [26]:
# Create output and checkpoint directories
pathlib.Path("outputs/stream_sink").mkdir(parents=True, exist_ok=True)
pathlib.Path("outputs/checkpoint").mkdir(parents=True, exist_ok=True)

# Define and start the streaming query
query = (windowed.writeStream
    .format("parquet")
    # File sinks only support append mode; for an aggregation, append mode
    # in turn requires the watermark defined on event_time
    .outputMode("append")
    .option("path", "outputs/stream_sink")
    .option("checkpointLocation", "outputs/checkpoint") # Ensures fault tolerance 
    .trigger(processingTime="10 seconds") # Baseline trigger interval 
    .start())

print(f"Query started successfully.")
print(f"Query Name: {query.name}")
print(f"Query ID: {query.id}")
print(f"Status: {query.status}")

#query.awaitTermination(timeout=300)
#query.stop()
Query started successfully.
Query Name: None
Query ID: 06aedff3-1197-4122-a85b-f6c8395607b8
Status: {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}
26/04/27 18:14:52 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
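Instead of guessing how long to wait before capturing metrics, one option is to poll the query until a given number of non-empty micro-batches have completed. A minimal sketch, assuming only the `recentProgress` and `isActive` attributes of a PySpark `StreamingQuery` (PySpark also offers `query.processAllAvailable()`, which blocks until all available input is processed and is mainly intended for testing). `wait_for_batches` is a hypothetical helper; note that `recentProgress` only retains the last `spark.sql.streaming.numRecentProgressUpdates` entries.

```python
import time
from typing import Any


def wait_for_batches(query: Any, n_batches: int, timeout_s: float = 300.0,
                     poll_s: float = 5.0) -> int:
    """Poll query.recentProgress until n_batches non-empty micro-batches are reported.

    Returns the number of non-empty batches seen; stops early on timeout or when
    the query is no longer active.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        done = sum(1 for p in query.recentProgress if p.get("numInputRows", 0) > 0)
        if done >= n_batches or not query.isActive or time.monotonic() >= deadline:
            return done
        time.sleep(poll_s)
```

For example, `wait_for_batches(query, 3)` before calling `capture_lab_evidence` makes it more likely that the captured progress reflects a batch with data.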
In [32]:
import shutil

# --- STEP 1: CLEANUP PREVIOUS RUNS ---
# Stop the r1 query and delete its sink and checkpoint, so that r2 starts from
# a clean state instead of resuming r1's offsets and aggregation state.
try:
    query.stop()
except NameError:
    pass

# Delete old outputs and checkpoints to start a clean "After" run
folders_to_clean = ["outputs/stream_sink", "outputs/checkpoint"]
for folder in folders_to_clean:
    if pathlib.Path(folder).exists():
        shutil.rmtree(folder)
        print(f"Cleaned up: {folder}")

# --- STEP 2: APPLY OPTIMIZATIONS ---
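# 1. Reduce shuffle partitions (default: 200). Each partition of the stateful
#    aggregation runs as its own task with its own state store, which is mostly
#    overhead in local mode. The new value only applies because the checkpoint
#    was deleted above: a stateful query restarted from an existing checkpoint
#    keeps the partition count recorded there.
spark.conf.set("spark.sql.shuffle.partitions", "5")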

# --- STEP 3: DEFINE OPTIMIZED STREAM ---
# r2 keeps the 10-minute watermark and raises the trigger interval from 10 s to 30 s
pathlib.Path("outputs/stream_sink").mkdir(parents=True, exist_ok=True)
pathlib.Path("outputs/checkpoint").mkdir(parents=True, exist_ok=True)

print("Starting Optimized Query (Run r2)...")

query = (windowed.writeStream
    .format("parquet")
    .outputMode("append")
    .option("path", "outputs/stream_sink")
    .option("checkpointLocation", "outputs/checkpoint")
    # OPTIMIZATION: a new micro-batch starts at most every 30 s; each batch still
    # reads a single file because maxFilesPerTrigger=1
    .trigger(processingTime="30 seconds")
    .start())

print(f"Status: {query.status}")

# --- STEP 4: MONITOR ---
# Let it run for 5 minutes to process your files
# Then stop it properly
# query.awaitTermination(timeout=300)
# query.stop()
Cleaned up: outputs/stream_sink
Cleaned up: outputs/checkpoint
Starting Optimized Query (Run r2)...
Status: {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}
26/04/27 18:17:56 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
In [38]:
query.stop()
26/04/27 18:28:14 WARN DAGScheduler: Failed to cancel job group f3e9a7d9-37b2-455e-a5a5-6e1734189b49. Cannot find active jobs for it.
26/04/27 18:28:14 WARN DAGScheduler: Failed to cancel job group f3e9a7d9-37b2-455e-a5a5-6e1734189b49. Cannot find active jobs for it.
In [20]:
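import shutil
from pyspark.sql.functions import window

# --- STEP 1: STOP THE PREVIOUS QUERY AND RESET OUTPUTS ---
# r3 changes the stateful aggregation, which cannot be restarted from the
# checkpoint of a different aggregation, and its output schema differs from the
# files already in the sink, so start from empty sink/checkpoint directories.
try:
    query.stop()
except NameError:
    pass
for folder in ["outputs/stream_sink", "outputs/checkpoint"]:
    shutil.rmtree(folder, ignore_errors=True)

# --- STEP 2: DEFINE r3 ---
# Changes vs. the baseline: 5-minute watermark, sliding window
# (10-minute window, 5-minute slide) and a count-only aggregation.
windowed_r3 = (
    df_stream
    .withWatermark("event_time", "5 minutes")
    .groupBy(window("event_time", "10 minutes", "5 minutes"), "icao24")
    .count()
)

# --- STEP 3: START r3 ---
print("Starting Optimized Query (Run r3: Watermark 5min)...")

query = (windowed_r3.writeStream
    .format("parquet")
    .outputMode("append")
    .option("path", "outputs/stream_sink")
    .option("checkpointLocation", "outputs/checkpoint")
    .trigger(processingTime="30 seconds")
    .start())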
Starting Optimized Query (Run r3: Watermark 5min)...
26/04/27 18:04:55 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                
In [35]:
# Read the most recent micro-batch progress (None until a batch has completed)
progress = query.lastProgress

if progress is not None:
    print("--- METRICS FOUND ---")
    print(f"Input Rate: {progress.get('inputRowsPerSecond')} rows/sec")
    print(f"Processing Rate: {progress.get('processedRowsPerSecond')} rows/sec")
    print(f"Batch Duration: {progress.get('durationMs', {}).get('triggerExecution')} ms")
    print(f"Rows in Batch: {progress.get('numInputRows')}")
else:
    print("--- NO DATA ACCESSIBLE ---")
    print("The query is active but no micro-batch has completed yet.")
    print("Drop a file into the landing folder and wait at least one trigger interval.")

# recentProgress holds the progress reports of the last few micro-batches
print(f"\nRecent Progress Count: {len(query.recentProgress)}")
--- METRICS FOUND ---
Input Rate: 5088.457820884578 rows/sec
Processing Rate: 102188.08567603749 rows/sec
Batch Duration: 1494 ms
Rows in Batch: 152669

Recent Progress Count: 3
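`query.lastProgress` describes only the latest micro-batch, whereas `query.recentProgress` keeps several (3 in the run above), so averaging over them gives a slightly less noisy picture for the before/after comparison. A minimal sketch, assuming each progress entry behaves like a dict with the keys used in the cell above; `summarize_progress` is a hypothetical helper that ignores empty batches:

```python
from statistics import mean
from typing import Dict, Iterable, Mapping


def summarize_progress(progresses: Iterable[Mapping]) -> Dict[str, float]:
    """Average key metrics over the non-empty micro-batches in query.recentProgress."""
    batches = [p for p in progresses if p.get("numInputRows", 0) > 0]
    if not batches:
        return {"batches": 0}
    return {
        "batches": len(batches),
        "avg_processedRowsPerSecond": mean(p.get("processedRowsPerSecond", 0.0) for p in batches),
        "avg_triggerExecution_ms": mean(p.get("durationMs", {}).get("triggerExecution", 0) for p in batches),
        "total_numInputRows": sum(p.get("numInputRows", 0) for p in batches),
    }
```

Usage: `summarize_progress(query.recentProgress)`.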

5. Monitor — query.lastProgress & Streaming UI

Capture streaming metrics from query.lastProgress and take screenshots of the Streaming tab in Spark UI.

In [6]:
import os
import json
import pathlib
import pandas as pd
from datetime import datetime
from uuid import UUID
from contextlib import redirect_stdout
from IPython.display import display  # renders the metrics row in Jupyter

def capture_lab_evidence(query, run_id, trigger_val, watermark_val, note):
    """
    Captures all required evidence: CSV metrics, JSON progress, and execution plan.
    """
    # 1. Setup folders
    pathlib.Path("proof").mkdir(parents=True, exist_ok=True)
    
    # 2. Capture progress data
    progress = query.lastProgress
    if not progress:
        print(f" No progress found for {run_id}. Waiting for a batch...")
        return

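    # 3. Save JSON progress. The id/runId fields can be uuid.UUID objects,
    #    which json cannot serialize natively, so convert them to strings.
    def uuid_serializer(obj):
        if isinstance(obj, UUID):
            return str(obj)
        raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")

    json_path = f"proof/query_progress_{run_id}.json"
    with open(json_path, "w") as f:
        json.dump(progress, f, indent=2, default=uuid_serializer)

    # 4. Save the plan of the query that is actually running. Explaining the
    #    global `windowed` DataFrame would record the wrong plan for r3, which
    #    runs `windowed_r3`. StreamingQuery.explain only takes `extended`.
    plan_path = f"proof/plan_streaming_{run_id}.txt"
    with open(plan_path, "w") as f:
        with redirect_stdout(f):
            query.explain(extended=True)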

    # 5. Update CSV Metrics Log
    metrics = {
        "run_id": run_id,
        "trigger_interval": trigger_val,
        "watermark_duration": watermark_val,
        "inputRowsPerSecond": progress.get("inputRowsPerSecond", 0),
        "processedRowsPerSecond": progress.get("processedRowsPerSecond", 0),
        "numInputRows": progress.get("numInputRows", 0),
        "stateRows": progress.get("stateOperators", [{}])[0].get("numRowsTotal", 0) if progress.get("stateOperators") else 0,
        "durationMs": progress.get("durationMs", {}).get("triggerExecution", 0),
        "note": note,
        "timestamp": datetime.now().isoformat()
    }

    csv_file = "lab1_metrics_log.csv"
    df = pd.DataFrame([metrics])
    header_needed = not os.path.exists(csv_file)
    df.to_csv(csv_file, mode='a', header=header_needed, index=False)
    
    print(f" Run {run_id} captured successfully!")
    print(f"- Metrics: {csv_file}")
    print(f"- JSON: {json_path}")
    print(f"- Plan: {plan_path}")
    display(df)

# --- CALL THIS AFTER DROPPING FILES FOR R3 ---
# capture_lab_evidence(query, "r3", "30s", "5min", "Watermark reduction test")
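`capture_lab_evidence` appends a row on every call, so re-running a capture cell leaves several rows for the same `run_id` in `lab1_metrics_log.csv`. A minimal sketch for building the report from the newest capture of each run, assuming the log keeps the `run_id` and ISO-format `timestamp` columns written above; `latest_runs` is a hypothetical helper:

```python
import csv
from datetime import datetime
from typing import Dict, List


def latest_runs(csv_path: str) -> List[Dict[str, str]]:
    """Return the newest row per run_id from the append-only metrics log, sorted by run_id."""
    latest: Dict[str, Dict[str, str]] = {}
    with open(csv_path, newline="") as f:
        for row in csv.DictReader(f):
            prev = latest.get(row["run_id"])
            if prev is None or (datetime.fromisoformat(row["timestamp"])
                                > datetime.fromisoformat(prev["timestamp"])):
                latest[row["run_id"]] = row
    return [latest[run_id] for run_id in sorted(latest)]
```

Usage: `latest_runs("lab1_metrics_log.csv")`.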
In [29]:
capture_lab_evidence(
    query, 
    run_id="r1", 
    trigger_val="10s", 
    watermark_val="10min", 
    note="Baseline run with default shuffle partitions"
)
 Run r1 captured successfully!
- Metrics: lab1_metrics_log.csv
- JSON: proof/query_progress_r1.json
- Plan: proof/plan_streaming_r1.txt
run_id trigger_interval watermark_duration inputRowsPerSecond processedRowsPerSecond numInputRows stateRows durationMs note timestamp
0 r1 10s 10min 15269.953991 110469.609262 152669 10481 1382 Baseline run with default shuffle partitions 2026-04-27T18:16:28.138927
In [34]:
capture_lab_evidence(
    query, 
    run_id="r2", 
    trigger_val="30s", 
    watermark_val="10min", 
    note="Optimized shuffle (5) and trigger (30s)"
)
 Run r2 captured successfully!
- Metrics: lab1_metrics_log.csv
- JSON: proof/query_progress_r2.json
- Plan: proof/plan_streaming_r2.txt
run_id trigger_interval watermark_duration inputRowsPerSecond processedRowsPerSecond numInputRows stateRows durationMs note timestamp
0 r2 30s 10min 52991.669559 102393.695506 152669 10342 1490 Optimized shuffle (5) and trigger (30s) 2026-04-27T18:18:30.645266
In [23]:
capture_lab_evidence(
    query, 
    run_id="r3", 
    trigger_val="30s", 
    watermark_val="5min", 
    note="Reduced watermark to 5min - Final Optimization"
)
 Run r3 captured successfully!
- Metrics: lab1_metrics_log.csv
- JSON: proof/query_progress_r3.json
- Plan: proof/plan_streaming_r3.txt
run_id trigger_interval watermark_duration inputRowsPerSecond processedRowsPerSecond numInputRows stateRows durationMs note timestamp
0 r3 30s 5min 5087.949077 115308.912387 152669 14997 1324 Reduced watermark to 5min - Final Optimization 2026-04-27T18:07:18.381641
In [ ]:
# TODO: Capture query.lastProgress
# progress = query.lastProgress
# print(json.dumps(progress, indent=2))
#
# # Save progress JSON as evidence
# pathlib.Path("proof").mkdir(parents=True, exist_ok=True)
# with open("proof/query_progress.json", "w") as f:
#     json.dump(progress, f, indent=2)
#
# # Key metrics to record:
# # - inputRowsPerSecond
# # - processedRowsPerSecond
# # - durationMs (triggerExecution, getBatch, etc.)
# # - stateOperators (numRowsTotal, numRowsUpdated)
#
# # TODO: Save streaming plan
# # windowed.explain("formatted")
# # Copy to proof/plan_streaming.txt
#
# # TODO: Open Spark UI → Streaming tab, take screenshots → proof/

6. Before/After Optimization

Re-run the pipeline with a different trigger interval or watermark duration. Compare throughput and latency metrics.

Before: trigger(processingTime="10 seconds"), watermark "10 minutes".
After: change one parameter (e.g., trigger(processingTime="30 seconds") or watermark "5 minutes") and compare.
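Before comparing runs, it helps to know what each metric measures. Spark derives `inputRowsPerSecond` from the time elapsed since the previous trigger started, so `numInputRows / inputRowsPerSecond` recovers that interval; for r1 and r3 it comes out at about 10 s and 30 s, i.e. it mostly tracks the trigger interval rather than engine speed, while `processedRowsPerSecond` and `durationMs` are closer to throughput and latency. The sketch below computes these figures from the values recorded by `capture_lab_evidence` above; `RUNS` and `compare_to_baseline` are illustrative names. Two caveats: each run is represented by a single micro-batch, and r2 (shuffle partitions and trigger) and r3 (watermark, sliding window and count-only aggregation) each change more than one parameter, so small differences should be read cautiously.

```python
from typing import Dict, List

# Values copied from the capture_lab_evidence outputs recorded above
RUNS: List[Dict] = [
    {"run_id": "r1", "numInputRows": 152669, "inputRowsPerSecond": 15269.953991,
     "processedRowsPerSecond": 110469.609262, "durationMs": 1382, "stateRows": 10481},
    {"run_id": "r2", "numInputRows": 152669, "inputRowsPerSecond": 52991.669559,
     "processedRowsPerSecond": 102393.695506, "durationMs": 1490, "stateRows": 10342},
    {"run_id": "r3", "numInputRows": 152669, "inputRowsPerSecond": 5087.949077,
     "processedRowsPerSecond": 115308.912387, "durationMs": 1324, "stateRows": 14997},
]


def compare_to_baseline(runs: List[Dict], baseline_id: str = "r1") -> Dict[str, Dict[str, float]]:
    """Percent change of each metric versus the baseline, plus the implied input interval."""
    base = next(r for r in runs if r["run_id"] == baseline_id)
    report = {}
    for r in runs:
        # numInputRows / inputRowsPerSecond = seconds the input rate was measured over
        row = {"implied_interval_s": r["numInputRows"] / r["inputRowsPerSecond"]}
        for metric in ("processedRowsPerSecond", "durationMs", "stateRows"):
            row[f"{metric}_pct"] = 100.0 * (r[metric] - base[metric]) / base[metric]
        report[r["run_id"]] = row
    return report


for run_id, row in compare_to_baseline(RUNS).items():
    print(run_id, {k: round(v, 1) for k, v in row.items()})
```

On these numbers r3 shows a processing rate about 4% higher than r1 and roughly 43% more state rows, while r2 is slightly slower than the baseline.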

In [39]:
# Runs r1 (baseline), r2 (30 s trigger, 5 shuffle partitions) and r3 (5-minute
# watermark) are recorded in lab1_metrics_log.csv with the columns:
# run_id, trigger_interval, watermark_duration, inputRowsPerSecond,
# processedRowsPerSecond, numInputRows, stateRows, durationMs, note, timestamp

# Stop any streaming query that is still active, then shut down Spark
for q in spark.streams.active:
    q.stop()

spark.stop()
print("Lab 1 complete.")
Lab 1 complete.
26/04/27 18:29:27 WARN StateStore: Error running maintenance thread
java.lang.IllegalStateException: SparkEnv not active, cannot do maintenance on StateStores
	at org.apache.spark.sql.execution.streaming.state.StateStore$.doMaintenance(StateStore.scala:971)
	at org.apache.spark.sql.execution.streaming.state.StateStore$.$anonfun$startMaintenanceIfNeeded$1(StateStore.scala:945)
	at org.apache.spark.sql.execution.streaming.state.StateStore$MaintenanceTask$$anon$1.run(StateStore.scala:746)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:358)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)