DE2 -- Assignment 1: Streaming Pipeline
Author : Badr TAJINI - Data Engineering II (Data-Intensive Workloads) - ESIEE 2025-2026
Track: D - Aviation (OpenSky Network)
Names: Justine Guirauden and Volcy Desmazures
0. Setup
import os, sys, time, datetime, pathlib, json, shutil
import pandas as pd
import numpy as np
from uuid import UUID
from contextlib import redirect_stdout
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType,
    TimestampType, DoubleType, LongType, BooleanType
)
spark = (
    SparkSession.builder
    .appName("de2-assignment1-aviation")
    .master("local[*]")
    # Baseline: default shuffle partitions (200)
    .getOrCreate()
)
print("Spark:", spark.version)
print("Spark UI:", spark.sparkContext.uiWebUrl)
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/04/29 09:53:11 WARN Utils: Your hostname, OrdideJustine, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
26/04/29 09:53:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/04/29 09:53:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark: 4.0.1
Spark UI: http://10.255.255.254:4040
1. Define Your Schema and Stream Source
Track D: Aviation (OpenSky Network)
# ── Schema for Track D (OpenSky aviation data) ─────────────────────────────
event_schema = StructType([
    StructField("time",          LongType(),    True),  # Unix epoch (seconds)
    StructField("icao24",        StringType(),  True),  # Unique aircraft identifier
    StructField("lat",           DoubleType(),  True),
    StructField("lon",           DoubleType(),  True),
    StructField("velocity",      DoubleType(),  True),  # m/s
    StructField("heading",       DoubleType(),  True),  # degrees
    StructField("vertrate",      DoubleType(),  True),  # m/s vertical speed
    StructField("callsign",      StringType(),  True),
    StructField("onground",      BooleanType(), True),
    StructField("alert",         BooleanType(), True),
    StructField("spi",           BooleanType(), True),
    StructField("squawk",        StringType(),  True),
    StructField("baroaltitude",  DoubleType(),  True),  # meters
    StructField("geoaltitude",   DoubleType(),  True),  # meters
    StructField("lastposupdate", DoubleType(),  True),
    StructField("lastcontact",   DoubleType(),  True)
])
# ── Streaming parameters ────────────────────────────────────────────────────
EVENT_TIME_COL = "event_time" # Derived timestamp column
WINDOW_DURATION = "10 minutes" # Tumbling window size
WATERMARK_DELAY = "10 minutes" # Late-data tolerance (baseline)
LANDING_DIR = "data/landing/"
pathlib.Path(LANDING_DIR).mkdir(parents=True, exist_ok=True)
print(f"Landing directory: {LANDING_DIR}")
# ── Streaming source ────────────────────────────────────────────────────────
# maxFilesPerTrigger=1 → one CSV per micro-batch (better UI visibility)
df_stream = (
    spark.readStream
    .schema(event_schema)
    .option("header", "true")
    .option("maxFilesPerTrigger", 1)
    .csv(LANDING_DIR)
    # Convert Unix Long (seconds) → TimestampType for windowing
    .withColumn(EVENT_TIME_COL, F.from_unixtime(F.col("time")).cast("timestamp"))
)
print("Is streaming:", df_stream.isStreaming)
df_stream.printSchema()
Landing directory: data/landing/
Is streaming: True
root
 |-- time: long (nullable = true)
 |-- icao24: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- velocity: double (nullable = true)
 |-- heading: double (nullable = true)
 |-- vertrate: double (nullable = true)
 |-- callsign: string (nullable = true)
 |-- onground: boolean (nullable = true)
 |-- alert: boolean (nullable = true)
 |-- spi: boolean (nullable = true)
 |-- squawk: string (nullable = true)
 |-- baroaltitude: double (nullable = true)
 |-- geoaltitude: double (nullable = true)
 |-- lastposupdate: double (nullable = true)
 |-- lastcontact: double (nullable = true)
 |-- event_time: timestamp (nullable = true)
2. Windowed Aggregation + Watermark
A 10-minute tumbling window groups position updates per aircraft (icao24).
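The 10-minute watermark sets the late-data threshold at the maximum event time seen so far minus 10 minutes. Once a window ends before that threshold, Spark can drop its state and ignore rows that still arrive for it, which bounds memory usage while still tolerating realistic ATC delays.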
Three aggregates are computed: position update count, average velocity, and maximum barometric altitude.
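To make the window and watermark arithmetic concrete, here is a minimal pure-Python sketch (not Spark code). It assumes epoch-second timestamps as in the `time` column and UTC for the dataset's file name; the helper names `tumbling_window` and `is_too_late` are illustrative, and the lateness rule is a simplified model of what Spark does rather than its actual implementation.

```python
WINDOW_S = 10 * 60      # 10-minute tumbling window, in seconds
WATERMARK_S = 10 * 60   # 10-minute watermark delay, in seconds


def tumbling_window(event_ts: int, size_s: int = WINDOW_S) -> tuple[int, int]:
    """Return the [start, end) window, in epoch seconds, that contains event_ts."""
    start = event_ts - (event_ts % size_s)
    return start, start + size_s


def is_too_late(event_ts: int, max_event_ts_seen: int,
                delay_s: int = WATERMARK_S, size_s: int = WINDOW_S) -> bool:
    """Simplified model: an event is dropped when its window ended at or
    before the watermark (max event time seen minus the delay)."""
    watermark = max_event_ts_seen - delay_s
    _, window_end = tumbling_window(event_ts, size_s)
    return window_end <= watermark


if __name__ == "__main__":
    base = 1496620800  # 2017-06-05 00:00:00 UTC (assumed from the file name)
    print(tumbling_window(base + 125))           # window starts at `base`
    print(is_too_late(base + 125, base + 900))   # watermark is before the window end
    print(is_too_late(base + 125, base + 1200))  # watermark has reached the window end
```

In this model an event at 00:02:05 is still accepted while the newest event seen is 00:15:00, and is dropped once the newest event seen reaches 00:20:00.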
# ── Baseline windowed aggregation (r1) ─────────────────────────────────────
windowed = (
    df_stream
    .withWatermark(EVENT_TIME_COL, WATERMARK_DELAY)     # Late-data bound
    .groupBy(
        F.window(EVENT_TIME_COL, WINDOW_DURATION),      # 10-min tumbling window
        F.col("icao24")                                 # Group key: aircraft ID
    )
    .agg(
        F.count("*").alias("position_updates"),
        F.avg("velocity").alias("avg_velocity_ms"),
        F.max("baroaltitude").alias("max_baro_altitude_m")
    )
)
print("Windowed aggregation schema:")
windowed.printSchema()
Windowed aggregation schema:
root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- icao24: string (nullable = true)
 |-- position_updates: long (nullable = false)
 |-- avg_velocity_ms: double (nullable = true)
 |-- max_baro_altitude_m: double (nullable = true)
2b. Data preparation: simulate streaming source
Split the raw CSV into 10 batches so files can be dropped one at a time into data/landing/.
# ── Prepare split files (run once before starting the stream) ───────────────
INPUT_FILE = "data/raw_aviation/states_2017-06-05-00.csv" # dataset
SPLIT_DIR = "data/raw_splits/"
N_FILES = 10
os.makedirs(SPLIT_DIR, exist_ok=True)
try:
    df_raw = pd.read_csv(INPUT_FILE)
    df_raw = df_raw.sample(frac=1).reset_index(drop=True)  # shuffle
    for i, chunk in enumerate(np.array_split(df_raw, N_FILES)):
        path = os.path.join(SPLIT_DIR, f"aviation_batch_{i+1:02d}.csv")
        chunk.to_csv(path, index=False)
        print(f" Generated {path} ({len(chunk)} rows)")
    print(f"\n{N_FILES} files ready in {SPLIT_DIR}")
    print("Move them one-by-one into data/landing/ while the stream is running.")
except FileNotFoundError:
    print(f"[WARN] {INPUT_FILE} not found — place your dataset there first.")
except Exception as exc:
    print(f"[ERROR] {exc}")
/home/justine/miniconda3/envs/de1-env/lib/python3.10/site-packages/numpy/_core/fromnumeric.py:57: FutureWarning: 'DataFrame.swapaxes' is deprecated and will be removed in a future version. Please use 'DataFrame.transpose' instead.
  return bound(*args, **kwds)
Generated data/raw_splits/aviation_batch_01.csv (152669 rows)
Generated data/raw_splits/aviation_batch_02.csv (152669 rows)
Generated data/raw_splits/aviation_batch_03.csv (152669 rows)
Generated data/raw_splits/aviation_batch_04.csv (152669 rows)
Generated data/raw_splits/aviation_batch_05.csv (152669 rows)
Generated data/raw_splits/aviation_batch_06.csv (152669 rows)
Generated data/raw_splits/aviation_batch_07.csv (152669 rows)
Generated data/raw_splits/aviation_batch_08.csv (152669 rows)
Generated data/raw_splits/aviation_batch_09.csv (152669 rows)
Generated data/raw_splits/aviation_batch_10.csv (152668 rows)

10 files ready in data/raw_splits/
Move them one-by-one into data/landing/ while the stream is running.
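Moving the files by hand works, but the step can also be scripted. The following is a minimal sketch using only the standard library; the function name `feed_landing` and the 15-second default pause are assumptions chosen for illustration, not part of the assignment.

```python
import shutil
import time
from pathlib import Path


def feed_landing(split_dir: str, landing_dir: str, delay_s: float = 15.0) -> list[str]:
    """Move CSV files from split_dir into landing_dir one at a time.

    Files are moved in sorted name order, with a pause after each move so
    that each file can be picked up by its own micro-batch.
    Returns the names of the moved files.
    """
    src, dst = Path(split_dir), Path(landing_dir)
    dst.mkdir(parents=True, exist_ok=True)
    moved = []
    for csv_file in sorted(src.glob("*.csv")):
        shutil.move(str(csv_file), str(dst / csv_file.name))
        moved.append(csv_file.name)
        time.sleep(delay_s)
    return moved
```

Because the streaming query runs in the background, a call such as `feed_landing("data/raw_splits/", "data/landing/")` could be run from another notebook cell once the query has started.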
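3. Write Stream to Parquet: Baseline (r1)
- outputMode = append: the Parquet file sink only supports append mode, and append mode on a windowed aggregation requires a watermark.
- checkpointLocation: stores offsets and aggregation state so that a restarted query resumes where it stopped; combined with the replayable file source and the file sink, this gives exactly-once output (WAL + offset tracking).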
- trigger 10 s: baseline micro-batch interval.
# ── Output / checkpoint paths ───────────────────────────────────────────────
SINK_PATH = "outputs/lab1/stream_sink"
CKPT_PATH = "outputs/lab1/checkpoint"
pathlib.Path(SINK_PATH).mkdir(parents=True, exist_ok=True)
pathlib.Path(CKPT_PATH).mkdir(parents=True, exist_ok=True)
# ── Start baseline query (r1) ───────────────────────────────────────────────
query = (
    windowed.writeStream
    .format("parquet")
    .outputMode("append")                       # Only new closed windows
    .option("path", SINK_PATH)
    .option("checkpointLocation", CKPT_PATH)    # Exactly-once guarantee
    .trigger(processingTime="10 seconds")       # Baseline: 10-s micro-batch
    .start()
)
print("Query name :", query.name)
print("Query ID :", query.id)
print("Status :", query.status)
print("\nStreaming UI → http://localhost:4040/StreamingQuery/")
print("Drop aviation_batch_*.csv files into data/landing/ to feed the stream.")
26/04/29 09:55:55 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
Query name : None
Query ID : 77ff833e-cf3d-438d-931c-ca9f0108186a
Status : {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}
Streaming UI → http://localhost:4040/StreamingQuery/
Drop aviation_batch_*.csv files into data/landing/ to feed the stream.
26/04/29 09:57:42 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 22964 milliseconds
26/04/29 09:57:44 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers
26/04/29 09:58:06 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 23972 milliseconds
26/04/29 09:59:05 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 15965 milliseconds
26/04/29 09:59:45 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 15909 milliseconds
26/04/29 10:00:14 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 14994 milliseconds
26/04/29 10:01:04 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 14632 milliseconds
26/04/29 10:01:33 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 13998 milliseconds
4. Monitor and Capture Evidence
def capture_evidence(query, run_id, trigger_val, watermark_val, note):
    """Save metrics CSV row, JSON progress, and formatted execution plan."""
    pathlib.Path("proof").mkdir(parents=True, exist_ok=True)
    progress = query.lastProgress
    if not progress:
        print(f"[{run_id}] No progress yet — drop a file and wait 15 s.")
        return
    # 1. JSON progress dump
    def _serial(obj):
        if isinstance(obj, UUID):
            return str(obj)
        raise TypeError
    json_path = f"proof/query_progress_{run_id}.json"
    with open(json_path, "w") as f:
        json.dump(progress, f, indent=2, default=_serial)
    # 2. Formatted physical plan
    # NOTE: this always explains the global `windowed` DataFrame, so the plan
    # saved for r3 does not reflect `windowed_r3`.
    plan_path = f"proof/plan_streaming_{run_id}.txt"
    with open(plan_path, "w") as f:
        with redirect_stdout(f):
            windowed.explain(mode="formatted")
    # Also write the canonical proof/plan_streaming.txt expected by the rubric
    with open("proof/plan_streaming.txt", "w") as f:
        with redirect_stdout(f):
            windowed.explain(mode="formatted")
    # 3. CSV metrics log (one row per call, taken from the latest micro-batch)
    state_ops = progress.get("stateOperators", [{}])
    metrics = {
        "run_id": run_id,
        "trigger_interval": trigger_val,
        "watermark_duration": watermark_val,
        "inputRowsPerSecond": progress.get("inputRowsPerSecond", 0),
        "processedRowsPerSecond": progress.get("processedRowsPerSecond", 0),
        "numInputRows": progress.get("numInputRows", 0),
        "stateRows": state_ops[0].get("numRowsTotal", 0) if state_ops else 0,
        "durationMs_trigger": progress.get("durationMs", {}).get("triggerExecution", 0),
        "durationMs_getBatch": progress.get("durationMs", {}).get("getBatch", 0),
        "note": note,
        "timestamp": datetime.datetime.now().isoformat()
    }
    csv_path = "lab1_metrics_log.csv"
    df_m = pd.DataFrame([metrics])
    # Write the header only when the file does not exist yet
    header = not os.path.exists(csv_path)
    df_m.to_csv(csv_path, mode="a", header=header, index=False)
    print(f"[{run_id}] Evidence captured.")
    print(f" Metrics → {csv_path}")
    print(f" JSON → {json_path}")
    print(f" Plan → {plan_path}")
    return df_m
# ── Quick progress check ────────────────────────────────────────────────────
def show_progress(query):
    p = query.lastProgress
    if p:
        print(f" inputRowsPerSecond : {p.get('inputRowsPerSecond')}")
        print(f" processedRowsPerSecond : {p.get('processedRowsPerSecond')}")
        print(f" numInputRows : {p.get('numInputRows')}")
        print(f" triggerExecution (ms) : {p.get('durationMs', {}).get('triggerExecution')}")
        print(f" recentProgress count : {len(query.recentProgress)}")
    else:
        print("No batch completed yet — drop a file into data/landing/ and wait.")
show_progress(query)
inputRowsPerSecond : 0.0
processedRowsPerSecond : 0.0
numInputRows : 0
triggerExecution (ms) : 84
recentProgress count : 19
26/04/29 10:02:25 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 15692 milliseconds
26/04/29 10:03:05 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 15536 milliseconds
# Run after >= 5 micro-batches have completed for run r1
capture_evidence(
    query,
    run_id="r1",
    trigger_val="10s",
    watermark_val="10min",
    note="Baseline — trigger 10s, watermark 10min, shuffle.partitions=200"
)
[r1] Evidence captured.
Metrics → lab1_metrics_log.csv
JSON → proof/query_progress_r1.json
Plan → proof/plan_streaming_r1.txt
| | run_id | trigger_interval | watermark_duration | inputRowsPerSecond | processedRowsPerSecond | numInputRows | stateRows | durationMs_trigger | durationMs_getBatch | note | timestamp |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | r1 | 10s | 10min | 0.0 | 0.0 | 0 | 10679 | 127 | 0 | Baseline — trigger 10s, watermark 10min, shuff... | 2026-04-29T10:04:19.559382 |
26/04/29 10:04:23 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 13955 milliseconds
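The r1 row above reports `numInputRows = 0` and zero throughput, which suggests that `lastProgress` described an idle trigger at the moment of capture. One way to avoid this is to scan `recentProgress` for the latest batch that actually ingested rows. The sketch below is a hypothetical helper (`latest_nonempty_progress` is not part of the notebook) and assumes each progress entry behaves like a dict, as `capture_evidence()` already does.

```python
from typing import Optional


def latest_nonempty_progress(progress_list: list) -> Optional[dict]:
    """Return the most recent progress entry with numInputRows > 0, else None."""
    for progress in reversed(progress_list):
        if progress.get("numInputRows", 0) > 0:
            return progress
    return None


if __name__ == "__main__":
    # Hypothetical entries, shaped like the dicts read in capture_evidence()
    sample = [
        {"batchId": 3, "numInputRows": 152669},
        {"batchId": 4, "numInputRows": 0},  # idle trigger
    ]
    print(latest_nonempty_progress(sample)["batchId"])  # prints 3
```

With a live query this would be called as `latest_nonempty_progress(query.recentProgress)`, and the result could replace `query.lastProgress` when building the metrics row.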
5. Optimize and Re-Measure
Two optimizations applied for r2:
- spark.sql.shuffle.partitions = 5 (default 200 is wasteful on a local 1-node cluster).
- Trigger increased to 30 s → fewer trigger-overhead cycles, larger batches → higher throughput per trigger.
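r3 further reduces the watermark to 5 minutes to test how quickly old window state is evicted. It also switches from a tumbling window to a 10-minute window sliding every 5 minutes, so r3 differs from r2 in two ways and its metrics cannot be attributed to the watermark change alone.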
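The cost of 200 shuffle partitions on one machine can be estimated with simple arithmetic: each shuffle stage runs one task per partition, and tasks beyond the number of cores wait for a later wave. The sketch below assumes 8 local cores, a figure inferred from the "8 writers" Parquet warning in the baseline logs rather than stated anywhere in the notebook; `task_waves` is an illustrative name.

```python
import math


def task_waves(num_partitions: int, num_cores: int) -> int:
    """Number of sequential task waves needed to run one shuffle stage."""
    return math.ceil(num_partitions / num_cores)


if __name__ == "__main__":
    CORES = 8  # assumption: inferred from the "8 writers" log line
    for partitions in (200, 5):
        print(partitions, "partitions ->", task_waves(partitions, CORES), "wave(s)")
```

Under that assumption, 200 partitions need 25 waves of small tasks per stage while 5 partitions fit in a single wave, at the price of leaving some cores idle.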
# ── Stop baseline, clean outputs, apply optimizations ──────────────────────
try:
    query.stop()
    print("Baseline query stopped.")
except Exception:
    pass
for folder in [SINK_PATH, CKPT_PATH]:
    if pathlib.Path(folder).exists():
        shutil.rmtree(folder)
        print(f"Cleaned: {folder}")
# Optimization 1 — reduce shuffle partitions (most impactful on local mode)
spark.conf.set("spark.sql.shuffle.partitions", "5")
print("shuffle.partitions set to 5")
pathlib.Path(SINK_PATH).mkdir(parents=True, exist_ok=True)
pathlib.Path(CKPT_PATH).mkdir(parents=True, exist_ok=True)
# Optimization 2 — larger trigger interval → fewer scheduling overheads
query = (
    windowed.writeStream
    .format("parquet")
    .outputMode("append")
    .option("path", SINK_PATH)
    .option("checkpointLocation", CKPT_PATH)
    .trigger(processingTime="30 seconds")   # CHANGED from 10s
    .start()
)
print("Optimized query (r2) started — trigger=30s, shuffle.partitions=5")
26/04/29 10:08:16 WARN DAGScheduler: Failed to cancel job group 8641015d-2d9f-437f-9c01-c53d74bbde31. Cannot find active jobs for it.
26/04/29 10:08:16 WARN DAGScheduler: Failed to cancel job group 8641015d-2d9f-437f-9c01-c53d74bbde31. Cannot find active jobs for it.
Baseline query stopped.
Cleaned: outputs/lab1/stream_sink
Cleaned: outputs/lab1/checkpoint
shuffle.partitions set to 5
26/04/29 10:08:27 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
Optimized query (r2) started — trigger=30s, shuffle.partitions=5
# Capture r2 metrics (after >= 5 batches)
capture_evidence(
    query,
    run_id="r2",
    trigger_val="30s",
    watermark_val="10min",
    note="Optimized — trigger 30s, shuffle.partitions=5"
)
[r2] Evidence captured.
Metrics → lab1_metrics_log.csv
JSON → proof/query_progress_r2.json
Plan → proof/plan_streaming_r2.txt
| | run_id | trigger_interval | watermark_duration | inputRowsPerSecond | processedRowsPerSecond | numInputRows | stateRows | durationMs_trigger | durationMs_getBatch | note | timestamp |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | r2 | 30s | 10min | 0.0 | 0.0 | 0 | 10658 | 46 | 0 | Optimized — trigger 30s, shuffle.partitions=5 | 2026-04-29T10:14:18.343398 |
# ── r3: reduced watermark ───────────────────────────────────────────────────
try:
    query.stop()
except Exception:
    pass
for folder in [SINK_PATH, CKPT_PATH]:
    if pathlib.Path(folder).exists():
        shutil.rmtree(folder)
pathlib.Path(SINK_PATH).mkdir(parents=True, exist_ok=True)
pathlib.Path(CKPT_PATH).mkdir(parents=True, exist_ok=True)
# Redefine aggregation with shorter watermark
windowed_r3 = (
    df_stream
    .withWatermark(EVENT_TIME_COL, "5 minutes")                 # CHANGED watermark
    .groupBy(
        F.window(EVENT_TIME_COL, "10 minutes", "5 minutes"),    # sliding window
        F.col("icao24")
    )
    .agg(
        F.count("*").alias("position_updates"),
        F.avg("velocity").alias("avg_velocity_ms"),
        F.max("baroaltitude").alias("max_baro_altitude_m")
    )
)
query = (
    windowed_r3.writeStream
    .format("parquet")
    .outputMode("append")
    .option("path", SINK_PATH)
    .option("checkpointLocation", CKPT_PATH)
    .trigger(processingTime="30 seconds")
    .start()
)
print("r3 started — watermark=5min, sliding window 10min/5min, trigger=30s")
26/04/29 10:14:20 WARN DAGScheduler: Failed to cancel job group fda2d31f-34e2-4fe4-88fc-a63250dc8b31. Cannot find active jobs for it.
26/04/29 10:14:20 WARN DAGScheduler: Failed to cancel job group fda2d31f-34e2-4fe4-88fc-a63250dc8b31. Cannot find active jobs for it.
r3 started — watermark=5min, sliding window 10min/5min, trigger=30s
26/04/29 10:14:20 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
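With a 10-minute window sliding every 5 minutes, each position update belongs to two overlapping windows instead of one, which would be consistent with the larger state-row counts logged for r3. The following pure-Python sketch (not Spark code) enumerates the windows for a single epoch-second timestamp; `sliding_windows` is an illustrative helper that assumes window starts are aligned to multiples of the slide.

```python
def sliding_windows(event_ts: int, size_s: int, slide_s: int) -> list[tuple[int, int]]:
    """Return every [start, end) window, in epoch seconds, that contains event_ts.

    Window starts are assumed to be aligned to multiples of slide_s.
    """
    start = event_ts - (event_ts % slide_s)  # latest window start <= event_ts
    windows = []
    while start > event_ts - size_s:         # window still covers event_ts
        windows.append((start, start + size_s))
        start -= slide_s
    return sorted(windows)


if __name__ == "__main__":
    ts = 1496620800 + 125  # 125 s after 2017-06-05 00:00:00 UTC (assumed)
    for window in sliding_windows(ts, size_s=600, slide_s=300):
        print(window)
```

Setting `slide_s` equal to `size_s` reduces this to the tumbling case, where the function returns exactly one window.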
capture_evidence(
    query,
    run_id="r3",
    trigger_val="30s",
    watermark_val="5min",
    note="Watermark reduced to 5min + sliding window 10min/5min"
)
[r3] Evidence captured.
Metrics → lab1_metrics_log.csv
JSON → proof/query_progress_r3.json
Plan → proof/plan_streaming_r3.txt
| | run_id | trigger_interval | watermark_duration | inputRowsPerSecond | processedRowsPerSecond | numInputRows | stateRows | durationMs_trigger | durationMs_getBatch | note | timestamp |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | r3 | 30s | 5min | 5089.136305 | 80183.298319 | 152669 | 15056 | 1903 | 60 | Watermark reduced to 5min + sliding window 10m... | 2026-04-29T10:18:22.012668 |
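The two rate columns are easier to interpret once converted back into durations. As far as the logged numbers show, `inputRowsPerSecond` divides the batch's rows by the time since the previous trigger, while `processedRowsPerSecond` divides them by the time spent processing the batch. The sketch below checks this reading against the r3 row above; it is a sanity check on the logged values, and `implied_seconds` is an illustrative helper.

```python
def implied_seconds(num_rows: int, rows_per_second: float) -> float:
    """Duration implied by a row count and a reported rate."""
    return num_rows / rows_per_second


if __name__ == "__main__":
    # Values copied from the r3 row above
    num_input_rows = 152669
    input_rate = 5089.136305       # inputRowsPerSecond
    processed_rate = 80183.298319  # processedRowsPerSecond

    # Roughly the 30 s trigger interval
    print(round(implied_seconds(num_input_rows, input_rate), 3))
    # Roughly the logged triggerExecution of 1903 ms
    print(round(implied_seconds(num_input_rows, processed_rate), 3))
```

Read this way, a higher `processedRowsPerSecond` means the batch finished faster, whereas `inputRowsPerSecond` mostly reflects how often files are picked up.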
6. Fill Metrics Log
The CSV lab1_metrics_log.csv is appended by capture_evidence() above.
This cell reads it back for a final display.
import pandas as pd
df_log = pd.read_csv("lab1_metrics_log.csv")
print(df_log[['run_id', 'numInputRows']])
    run_id  numInputRows
r1     10s         10674
r1     10s         10679
r2     30s         10630
r2     30s         10658
r3     30s         15012
r3     30s         15056
# ── Read and display final metrics table ────────────────────────────────────
csv_path = "lab1_metrics_log.csv"
if os.path.exists(csv_path):
    df_log = pd.read_csv(csv_path)
    print("=== lab1_metrics_log.csv ===")
    print(df_log.to_string(index=False))
    # Delta comparison r1 → r2
    if len(df_log) >= 2:
        r1 = df_log[df_log.run_id == "r1"].iloc[-1]
        r2 = df_log[df_log.run_id == "r2"].iloc[-1]
        delta_proc = r2.processedRowsPerSecond - r1.processedRowsPerSecond
        delta_dur = r2.durationMs_trigger - r1.durationMs_trigger
        print(f"\nOptimization gain (r1 → r2):")
        print(f" processedRowsPerSecond : {delta_proc:+.2f} rows/s")
        print(f" triggerExecution : {delta_dur:+.0f} ms")
else:
    print("lab1_metrics_log.csv not yet created — run capture_evidence() first.")
=== lab1_metrics_log.csv ===
| run_id | trigger_interval | watermark_duration | inputRowsPerSecond | processedRowsPerSecond | numInputRows | stateRows | durationMs | note | timestamp |
|---|---|---|---|---|---|---|---|---|---|
| 10s | 10min | 0.000000 | 0.000000 | 0 | 10674 | 101 | 0 | Baseline — trigger 10s, watermark 10min, shuffle.partitions=200 | 2026-04-29T10:03:27.749116 |
| 10s | 10min | 0.000000 | 0.000000 | 0 | 10679 | 127 | 0 | Baseline — trigger 10s, watermark 10min, shuffle.partitions=200 | 2026-04-29T10:04:19.559382 |
| 30s | 10min | 5088.966667 | 61934.685598 | 152669 | 10630 | 2465 | 64 | Optimized — trigger 30s, shuffle.partitions=5 | 2026-04-29T10:13:20.019672 |
| 30s | 10min | 0.000000 | 0.000000 | 0 | 10658 | 46 | 0 | Optimized — trigger 30s, shuffle.partitions=5 | 2026-04-29T10:14:13.341648 |
| 30s | 5min | 5089.136305 | 112091.776799 | 152669 | 15012 | 1361 | 53 | Watermark reduced to 5min + sliding window 10min/5min | 2026-04-29T10:17:11.624451 |
| 30s | 5min | 5089.136305 | 80183.298319 | 152669 | 15056 | 1903 | 60 | Watermark reduced to 5min + sliding window 10min/5min | 2026-04-29T10:18:02.824017 |
---------------------------------------------------------------------------
IndexError Traceback (most recent call last)
Cell In[19], line 11
      9 # Delta comparison r1 → r2
     10 if len(df_log) >= 2:
---> 11 r1 = df_log[df_log.run_id == "r1"].iloc[-1]
     12 r2 = df_log[df_log.run_id == "r2"].iloc[-1]
     13 delta_proc = r2.processedRowsPerSecond - r1.processedRowsPerSecond

File ~/miniconda3/envs/de1-env/lib/python3.10/site-packages/pandas/core/indexing.py:1192, in _LocationIndexer.__getitem__(self, key)
   1190 maybe_callable = com.apply_if_callable(key, self.obj)
   1191 maybe_callable = self._check_deprecated_callable_usage(key, maybe_callable)
-> 1192 return self._getitem_axis(maybe_callable, axis=axis)

File ~/miniconda3/envs/de1-env/lib/python3.10/site-packages/pandas/core/indexing.py:1753, in _iLocIndexer._getitem_axis(self, key, axis)
   1750 raise TypeError("Cannot index by location index with a non-integer key")
   1752 # validate the location
-> 1753 self._validate_integer(key, axis)
   1755 return self.obj._ixs(key, axis=axis)

File ~/miniconda3/envs/de1-env/lib/python3.10/site-packages/pandas/core/indexing.py:1686, in _iLocIndexer._validate_integer(self, key, axis)
   1684 len_axis = len(self.obj._get_axis(axis))
   1685 if key >= len_axis or key < -len_axis:
-> 1686 raise IndexError("single positional indexer is out-of-bounds")

IndexError: single positional indexer is out-of-bounds
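The IndexError is consistent with a stale CSV header. The printed table has 10 column names (a single `durationMs`), whereas `capture_evidence()` writes 11 fields per row (`durationMs_trigger` and `durationMs_getBatch`). When data rows have exactly one more field than the header, `pandas.read_csv` uses the first column as the index, so `run_id` ends up holding the trigger values (10s, 30s) and the filter on "r1" matches nothing. A likely cause is that the file was first created by an earlier version of the function, although the notebook does not confirm this. The sketch below detects such a mismatch with the standard library only; `header_vs_row_width` is a hypothetical helper and the sample row is abbreviated for illustration.

```python
import csv
import io


def header_vs_row_width(csv_text: str) -> tuple[int, int]:
    """Return (number of header names, number of fields in the first data row)."""
    rows = list(csv.reader(io.StringIO(csv_text)))
    return len(rows[0]), len(rows[1])


if __name__ == "__main__":
    # Header as printed in the table (10 names) and a row shaped like the one
    # capture_evidence() writes (11 fields); values abbreviated for illustration.
    sample = (
        "run_id,trigger_interval,watermark_duration,inputRowsPerSecond,"
        "processedRowsPerSecond,numInputRows,stateRows,durationMs,note,timestamp\n"
        'r1,10s,10min,0.0,0.0,0,10679,127,0,"Baseline",2026-04-29T10:04:19\n'
    )
    n_header, n_fields = header_vs_row_width(sample)
    print(n_header, n_fields)  # 10 11
    if n_header != n_fields:
        print("Stale header: delete the CSV (or rewrite its header) before re-running.")
```

Deleting `lab1_metrics_log.csv` before the first run of a session would let `capture_evidence()` write a header that matches its current columns.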
7. Cleanup
try:
    query.stop()
    print("Streaming query stopped.")
except Exception:
    pass
spark.stop()
print("SparkSession closed.")
print("\nDeliverables checklist:")
print(" [x] assignment1_esiee.ipynb — this notebook")
print(" [x] outputs/lab1/stream_sink/ — Parquet output")
print(" [x] outputs/lab1/checkpoint/ — checkpoint directory")
print(" [x] proof/plan_streaming.txt — formatted physical plan")
print(" [x] lab1_metrics_log.csv — >= 3 runs (r1, r2, r3)")
print(" [x] engineering_note.md — one-page design note")
print(" [x] GENAI.md — AI usage declaration")
26/04/29 10:33:12 WARN DAGScheduler: Failed to cancel job group 707bce97-5f0b-46a6-913d-d9d5e65aea1a. Cannot find active jobs for it.
26/04/29 10:33:12 WARN DAGScheduler: Failed to cancel job group 707bce97-5f0b-46a6-913d-d9d5e65aea1a. Cannot find active jobs for it.
Streaming query stopped.
SparkSession closed.

Deliverables checklist:
[x] assignment1_esiee.ipynb — this notebook
[x] outputs/lab1/stream_sink/ — Parquet output
[x] outputs/lab1/checkpoint/ — checkpoint directory
[x] proof/plan_streaming.txt — formatted physical plan
[x] lab1_metrics_log.csv — >= 3 runs (r1, r2, r3)
[x] engineering_note.md — one-page design note
[x] GENAI.md — AI usage declaration