DE2 -- Assignment 1: Streaming Pipeline

Author: Badr TAJINI - Data Engineering II (Data-Intensive Workloads) - ESIEE 2025-2026

Track: D - Aviation (OpenSky Network)

Names: Justine Guirauden and Volcy Desmazures

0. Setup

In [1]:
import os, sys, time, datetime, pathlib, json, shutil
import pandas as pd
import numpy as np
from uuid import UUID
from contextlib import redirect_stdout
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType,
    TimestampType, DoubleType, LongType, BooleanType
)

spark = (
    SparkSession.builder
    .appName("de2-assignment1-aviation")
    .master("local[*]")
    # Baseline: default shuffle partitions (200)
    .getOrCreate()
)

print("Spark:", spark.version)
print("Spark UI:", spark.sparkContext.uiWebUrl)
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/04/29 09:53:11 WARN Utils: Your hostname, OrdideJustine, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
26/04/29 09:53:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/04/29 09:53:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark: 4.0.1
Spark UI: http://10.255.255.254:4040

1. Define Your Schema and Stream Source

Track D: Aviation (OpenSky Network)

In [2]:
# ── Schema for Track D (OpenSky aviation data) ─────────────────────────────
event_schema = StructType([
    StructField("time",          LongType(),   True),  # Unix epoch (seconds)
    StructField("icao24",        StringType(), True),  # Unique aircraft identifier
    StructField("lat",           DoubleType(), True),
    StructField("lon",           DoubleType(), True),
    StructField("velocity",      DoubleType(), True),  # m/s
    StructField("heading",       DoubleType(), True),  # degrees
    StructField("vertrate",      DoubleType(), True),  # m/s vertical speed
    StructField("callsign",      StringType(), True),
    StructField("onground",      BooleanType(),True),
    StructField("alert",         BooleanType(),True),
    StructField("spi",           BooleanType(),True),
    StructField("squawk",        StringType(), True),
    StructField("baroaltitude",  DoubleType(), True),  # meters
    StructField("geoaltitude",   DoubleType(), True),  # meters
    StructField("lastposupdate", DoubleType(), True),
    StructField("lastcontact",   DoubleType(), True)
])

# ── Streaming parameters ────────────────────────────────────────────────────
EVENT_TIME_COL   = "event_time"     # Derived timestamp column
WINDOW_DURATION  = "10 minutes"     # Tumbling window size
WATERMARK_DELAY  = "10 minutes"     # Late-data tolerance (baseline)

LANDING_DIR      = "data/landing/"
pathlib.Path(LANDING_DIR).mkdir(parents=True, exist_ok=True)
print(f"Landing directory: {LANDING_DIR}")

# ── Streaming source ────────────────────────────────────────────────────────
# maxFilesPerTrigger=1 → one CSV per micro-batch (better UI visibility)
df_stream = (
    spark.readStream
    .schema(event_schema)
    .option("header", "true")
    .option("maxFilesPerTrigger", 1)
    .csv(LANDING_DIR)
    # Convert Unix epoch seconds (long) → TimestampType for windowing
    .withColumn(EVENT_TIME_COL, F.timestamp_seconds(F.col("time")))
)

print("Is streaming:", df_stream.isStreaming)
df_stream.printSchema()
Landing directory: data/landing/
Is streaming: True
root
 |-- time: long (nullable = true)
 |-- icao24: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- velocity: double (nullable = true)
 |-- heading: double (nullable = true)
 |-- vertrate: double (nullable = true)
 |-- callsign: string (nullable = true)
 |-- onground: boolean (nullable = true)
 |-- alert: boolean (nullable = true)
 |-- spi: boolean (nullable = true)
 |-- squawk: string (nullable = true)
 |-- baroaltitude: double (nullable = true)
 |-- geoaltitude: double (nullable = true)
 |-- lastposupdate: double (nullable = true)
 |-- lastcontact: double (nullable = true)
 |-- event_time: timestamp (nullable = true)

2. Windowed Aggregation + Watermark

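A 10-minute tumbling window groups position updates per aircraft (icao24).
The 10-minute watermark lets Spark finalize a window and drop its state once the maximum event time seen so far is more than 10 minutes past the window end; records that arrive for an already finalized window are discarded. This bounds state-store memory while still tolerating realistic ATC delays.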
Three aggregates are computed: position update count, average velocity, and maximum barometric altitude.
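To make the window and watermark semantics concrete, here is a minimal pure-Python sketch (no Spark involved). The helper names `tumbling_window` and `window_is_closed` are hypothetical, and the sketch assumes windows aligned to the Unix epoch and a watermark equal to the maximum event time minus the delay. It only illustrates the idea; it is not how Spark implements it internally.

```python
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(minutes=10)
WATERMARK_DELAY = timedelta(minutes=10)

def tumbling_window(epoch_seconds, size=WINDOW):
    """Return (start, end) of the epoch-aligned tumbling window containing the event."""
    size_s = int(size.total_seconds())
    start_s = (epoch_seconds // size_s) * size_s
    start = datetime.fromtimestamp(start_s, tz=timezone.utc)
    return start, start + size

def window_is_closed(window_end, max_event_time, delay=WATERMARK_DELAY):
    """A window is treated as final once the watermark has reached its end."""
    watermark = max_event_time - delay
    return window_end <= watermark

# Hypothetical event at 2017-06-05 00:07:30 UTC
event = int(datetime(2017, 6, 5, 0, 7, 30, tzinfo=timezone.utc).timestamp())
start, end = tumbling_window(event)
print(start.isoformat(), "->", end.isoformat())

# Hypothetical newest event time seen by the stream so far
latest_seen = datetime(2017, 6, 5, 0, 25, tzinfo=timezone.utc)
print("closed:", window_is_closed(end, latest_seen))
```

In this example the event falls into the 00:00 to 00:10 window, and that window counts as closed once events from 00:20 or later have been seen.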

In [3]:
# ── Baseline windowed aggregation (r1) ─────────────────────────────────────
windowed = (
    df_stream
    .withWatermark(EVENT_TIME_COL, WATERMARK_DELAY)      # Late-data bound
    .groupBy(
        F.window(EVENT_TIME_COL, WINDOW_DURATION),       # 10-min tumbling window
        F.col("icao24")                                  # Group key: aircraft ID
    )
    .agg(
        F.count("*")           .alias("position_updates"),
        F.avg("velocity")      .alias("avg_velocity_ms"),
        F.max("baroaltitude")  .alias("max_baro_altitude_m")
    )
)

print("Windowed aggregation schema:")
windowed.printSchema()
Windowed aggregation schema:
root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- icao24: string (nullable = true)
 |-- position_updates: long (nullable = false)
 |-- avg_velocity_ms: double (nullable = true)
 |-- max_baro_altitude_m: double (nullable = true)

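2b. Data Preparation: Simulate a Streaming Source

Split the raw CSV into 10 files so they can be dropped one at a time into data/landing/. Rows are shuffled before splitting, so every file mixes event times from the whole input; records that belong to windows already finalized by the watermark are dropped as late data.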

In [4]:
# ── Prepare split files (run once before starting the stream) ───────────────
INPUT_FILE = "data/raw_aviation/states_2017-06-05-00.csv"   # dataset
SPLIT_DIR  = "data/raw_splits/"
N_FILES    = 10

os.makedirs(SPLIT_DIR, exist_ok=True)

try:
    df_raw = pd.read_csv(INPUT_FILE)
    df_raw = df_raw.sample(frac=1).reset_index(drop=True)   # shuffle
    for i, chunk in enumerate(np.array_split(df_raw, N_FILES)):
        path = os.path.join(SPLIT_DIR, f"aviation_batch_{i+1:02d}.csv")
        chunk.to_csv(path, index=False)
        print(f"  Generated {path}  ({len(chunk)} rows)")
    print(f"\n{N_FILES} files ready in {SPLIT_DIR}")
    print("Move them one-by-one into data/landing/ while the stream is running.")
except FileNotFoundError:
    print(f"[WARN] {INPUT_FILE} not found — place your dataset there first.")
except Exception as exc:
    print(f"[ERROR] {exc}")
/home/justine/miniconda3/envs/de1-env/lib/python3.10/site-packages/numpy/_core/fromnumeric.py:57: FutureWarning: 'DataFrame.swapaxes' is deprecated and will be removed in a future version. Please use 'DataFrame.transpose' instead.
  return bound(*args, **kwds)
  Generated data/raw_splits/aviation_batch_01.csv  (152669 rows)
  Generated data/raw_splits/aviation_batch_02.csv  (152669 rows)
  Generated data/raw_splits/aviation_batch_03.csv  (152669 rows)
  Generated data/raw_splits/aviation_batch_04.csv  (152669 rows)
  Generated data/raw_splits/aviation_batch_05.csv  (152669 rows)
  Generated data/raw_splits/aviation_batch_06.csv  (152669 rows)
  Generated data/raw_splits/aviation_batch_07.csv  (152669 rows)
  Generated data/raw_splits/aviation_batch_08.csv  (152669 rows)
  Generated data/raw_splits/aviation_batch_09.csv  (152669 rows)
  Generated data/raw_splits/aviation_batch_10.csv  (152668 rows)

10 files ready in data/raw_splits/
Move them one-by-one into data/landing/ while the stream is running.
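Moving the files by hand works, but the feed can also be scripted. The sketch below is one possible way to do it, not part of the assignment template: `feed_landing` is a hypothetical helper that copies each split file under a temporary dot-prefixed name and then renames it, so the final file name only appears once the file is complete. It assumes the file source skips names starting with a dot, and that both names live on the same filesystem so the rename is a single step.

```python
import shutil
import time
from pathlib import Path

def feed_landing(split_dir, landing_dir, delay_s=15.0, pattern="aviation_batch_*.csv"):
    """Copy split files into the landing directory one at a time, in name order."""
    src, dst = Path(split_dir), Path(landing_dir)
    dst.mkdir(parents=True, exist_ok=True)
    delivered = []
    for path in sorted(src.glob(pattern)):
        # Write under a temporary hidden name first (assumed to be ignored by the stream),
        # then rename so the final name shows up only when the copy is finished.
        tmp = dst / f".{path.name}.tmp"
        shutil.copy2(path, tmp)
        tmp.rename(dst / path.name)
        delivered.append(path.name)
        time.sleep(delay_s)  # leave time for one micro-batch per file
    return delivered

# Example call (blocks for roughly 10 * delay_s seconds):
# feed_landing("data/raw_splits/", "data/landing/", delay_s=15)
```

Because the call blocks while it sleeps, it is easier to run it from a separate terminal or a background thread than from the notebook cell that monitors the query.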

3. Write Stream to Parquet: Baseline (r1)

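  • outputMode = append: the Parquet file sink only supports append mode, and append on a windowed aggregation requires a watermark so that Spark knows when a window is final.
  • checkpointLocation: stores source offsets and aggregation state so a restarted query resumes where it stopped; combined with a replayable file source and the file sink's commit log, this is what gives exactly-once output.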
  • trigger 10 s: baseline micro-batch interval.
In [5]:
# ── Output / checkpoint paths ───────────────────────────────────────────────
SINK_PATH  = "outputs/lab1/stream_sink"
CKPT_PATH  = "outputs/lab1/checkpoint"

pathlib.Path(SINK_PATH).mkdir(parents=True, exist_ok=True)
pathlib.Path(CKPT_PATH).mkdir(parents=True, exist_ok=True)

# ── Start baseline query (r1) ───────────────────────────────────────────────
query = (
    windowed.writeStream
    .format("parquet")
    .outputMode("append")                       # Only new closed windows
    .option("path", SINK_PATH)
    .option("checkpointLocation", CKPT_PATH)    # Exactly-once guarantee
    .trigger(processingTime="10 seconds")        # Baseline: 10-s micro-batch
    .start()
)

print("Query name :", query.name)
print("Query ID   :", query.id)
print("Status     :", query.status)
print(f"\nStreaming UI → http://localhost:4040/StreamingQuery/")
print("Drop aviation_batch_*.csv files into data/landing/ to feed the stream.")
26/04/29 09:55:55 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
Query name : None
Query ID   : 77ff833e-cf3d-438d-931c-ca9f0108186a
Status     : {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}

Streaming UI → http://localhost:4040/StreamingQuery/
Drop aviation_batch_*.csv files into data/landing/ to feed the stream.
26/04/29 09:57:42 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 22964 milliseconds
26/04/29 09:57:44 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
26/04/29 09:58:06 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 23972 milliseconds
26/04/29 09:59:05 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 15965 milliseconds
26/04/29 09:59:45 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 15909 milliseconds
26/04/29 10:00:14 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 14994 milliseconds
26/04/29 10:01:04 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 14632 milliseconds
26/04/29 10:01:33 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 13998 milliseconds
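The "falling behind" warnings above mean that a micro-batch took longer than the 10 s trigger interval. The same check can be made from the progress records instead of the logs. The sketch below is a minimal illustration: `batches_behind` is a hypothetical helper, it assumes each progress entry behaves like a dict with a `durationMs.triggerExecution` field (the field `capture_evidence()` reads later), and the sample dicts are hand-written with made-up batch ids.

```python
def batches_behind(progress_list, interval_ms):
    """Return (batchId, triggerExecution ms) for batches slower than the trigger interval."""
    slow = []
    for p in progress_list:
        took = p.get("durationMs", {}).get("triggerExecution", 0)
        if took > interval_ms:
            slow.append((p.get("batchId"), took))
    return slow

# Illustrative sample; two durations are copied from the warnings above
sample = [
    {"batchId": 0, "durationMs": {"triggerExecution": 22964}},
    {"batchId": 1, "durationMs": {"triggerExecution": 84}},
    {"batchId": 2, "durationMs": {"triggerExecution": 15965}},
]
print(batches_behind(sample, interval_ms=10_000))
```

Against the live query, the call would look like `batches_behind(query.recentProgress, 10_000)`.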

4. Monitor and Capture Evidence

In [6]:
def capture_evidence(query, run_id, trigger_val, watermark_val, note):
    """Save metrics CSV row, JSON progress, and formatted execution plan."""
    pathlib.Path("proof").mkdir(parents=True, exist_ok=True)

    progress = query.lastProgress
    if not progress:
        print(f"[{run_id}] No progress yet — drop a file and wait 15 s.")
        return

    # 1. JSON progress dump
    def _serial(obj):
        if isinstance(obj, UUID): return str(obj)
        raise TypeError
    json_path = f"proof/query_progress_{run_id}.json"
    with open(json_path, "w") as f:
        json.dump(progress, f, indent=2, default=_serial)

    # 2. Formatted physical plan
    # NOTE: this explains the notebook-level `windowed` DataFrame, so for r3 the
    # saved plan does not reflect `windowed_r3`.
    plan_path = f"proof/plan_streaming_{run_id}.txt"
    with open(plan_path, "w") as f:
        with redirect_stdout(f):
            windowed.explain(mode="formatted")

    # Also write the canonical proof/plan_streaming.txt expected by the rubric
    shutil.copyfile(plan_path, "proof/plan_streaming.txt")

    # 3. CSV metrics log
    state_ops = progress.get("stateOperators", [{}])
    metrics = {
        "run_id":                  run_id,
        "trigger_interval":        trigger_val,
        "watermark_duration":      watermark_val,
        "inputRowsPerSecond":      progress.get("inputRowsPerSecond", 0),
        "processedRowsPerSecond":  progress.get("processedRowsPerSecond", 0),
        "numInputRows":            progress.get("numInputRows", 0),
        "stateRows":               state_ops[0].get("numRowsTotal", 0) if state_ops else 0,
        "durationMs_trigger":      progress.get("durationMs", {}).get("triggerExecution", 0),
        "durationMs_getBatch":     progress.get("durationMs", {}).get("getBatch", 0),
        "note":                    note,
        "timestamp":               datetime.datetime.now().isoformat()
    }
    csv_path = "lab1_metrics_log.csv"
    df_m = pd.DataFrame([metrics])
    header = not os.path.exists(csv_path)
    df_m.to_csv(csv_path, mode="a", header=header, index=False)

    print(f"[{run_id}] Evidence captured.")
    print(f"  Metrics  → {csv_path}")
    print(f"  JSON     → {json_path}")
    print(f"  Plan     → {plan_path}")
    return df_m


# ── Quick progress check ────────────────────────────────────────────────────
def show_progress(query):
    p = query.lastProgress
    if p:
        print(f"  inputRowsPerSecond      : {p.get('inputRowsPerSecond')}")
        print(f"  processedRowsPerSecond  : {p.get('processedRowsPerSecond')}")
        print(f"  numInputRows            : {p.get('numInputRows')}")
        print(f"  triggerExecution (ms)   : {p.get('durationMs', {}).get('triggerExecution')}")
        print(f"  recentProgress count    : {len(query.recentProgress)}")
    else:
        print("No batch completed yet — drop a file into data/landing/ and wait.")

show_progress(query)
  inputRowsPerSecond      : 0.0
  processedRowsPerSecond  : 0.0
  numInputRows            : 0
  triggerExecution (ms)   : 84
[Stage 15:====>                                                  (16 + 8) / 200]
  recentProgress count    : 19
26/04/29 10:02:25 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 15692 milliseconds
26/04/29 10:03:05 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 15536 milliseconds
In [10]:
# Run after >= 5 micro-batches have completed for run r1
capture_evidence(
    query,
    run_id="r1",
    trigger_val="10s",
    watermark_val="10min",
    note="Baseline — trigger 10s, watermark 10min, shuffle.partitions=200"
)
[Stage 21:================================>                     (120 + 8) / 200]
[r1] Evidence captured.
  Metrics  → lab1_metrics_log.csv
  JSON     → proof/query_progress_r1.json
  Plan     → proof/plan_streaming_r1.txt
[Stage 21:================================>                     (120 + 9) / 200]
Out[10]:
   run_id  trigger_interval  watermark_duration  inputRowsPerSecond  processedRowsPerSecond  numInputRows  stateRows  durationMs_trigger  durationMs_getBatch  note                                               timestamp
0      r1               10s               10min                 0.0                     0.0             0      10679                 127                    0  Baseline — trigger 10s, watermark 10min, shuff...  2026-04-29T10:04:19.559382
26/04/29 10:04:23 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 13955 milliseconds
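The r1 row above reports zero rates and numInputRows = 0 because `query.lastProgress` happened to describe an idle trigger, not a batch that read a file. One way around this, sketched below under the assumption that progress entries behave like dicts, is to scan `query.recentProgress` backwards for the latest entry that actually processed rows. `last_nonempty_progress` is a hypothetical helper and the sample values are illustrative.

```python
def last_nonempty_progress(recent):
    """Return the most recent progress entry with numInputRows > 0, or None."""
    for p in reversed(list(recent)):
        if p.get("numInputRows", 0) > 0:
            return p
    return None

# Illustrative sample: one real batch followed by an idle trigger
sample = [
    {"batchId": 3, "numInputRows": 152669, "processedRowsPerSecond": 61934.7},
    {"batchId": 4, "numInputRows": 0, "processedRowsPerSecond": 0.0},
]
best = last_nonempty_progress(sample)
print(best["batchId"], best["numInputRows"])
```

Passing the result of `last_nonempty_progress(query.recentProgress)` to the metrics code instead of `query.lastProgress` would make the logged throughput comparable across runs.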

5. Optimize and Re-Measure

Two optimizations applied for r2:

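  1. spark.sql.shuffle.partitions = 5 (the default of 200 creates many near-empty tasks per micro-batch on a single-node local[*] session).
  2. Trigger interval increased to 30 s. With maxFilesPerTrigger=1 each micro-batch still reads one file, but the baseline batches took roughly 14 to 24 s against a 10 s interval, so the longer interval stops the query from falling behind.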

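r3 further reduces the watermark to 5 minutes to test how quickly window state is finalized and evicted. It also switches to a sliding window (10 minutes, sliding every 5 minutes), so each event belongs to two windows and the r3 state size is not directly comparable with r1 and r2.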
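The r3 query uses `F.window(EVENT_TIME_COL, "10 minutes", "5 minutes")`, which changes how many windows each event contributes to. The pure-Python sketch below illustrates the assignment rule for epoch-aligned windows; `sliding_windows` is a hypothetical helper and the timestamps are small illustrative numbers of seconds, not real data.

```python
def sliding_windows(epoch_seconds, size_s=600, slide_s=300):
    """Return the (start, end) pairs of every window that contains the event."""
    windows = []
    # Latest window start that is <= the event time, aligned to the slide interval
    start = (epoch_seconds // slide_s) * slide_s
    # A window [start, start + size) contains the event while start > t - size
    while start > epoch_seconds - size_s:
        windows.append((start, start + size_s))
        start -= slide_s
    return sorted(windows)

event = 450  # 7.5 minutes after the epoch, illustrative
print("sliding :", sliding_windows(event))
print("tumbling:", sliding_windows(event, size_s=600, slide_s=600))
```

With a 10-minute size and a 5-minute slide every event lands in two windows, versus one for the tumbling case, which is consistent with the higher stateRows value reported for r3.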

In [11]:
# ── Stop baseline, clean outputs, apply optimizations ──────────────────────
try:
    query.stop()
    print("Baseline query stopped.")
except Exception:
    pass

for folder in [SINK_PATH, CKPT_PATH]:
    if pathlib.Path(folder).exists():
        shutil.rmtree(folder)
        print(f"Cleaned: {folder}")

# Optimization 1 — reduce shuffle partitions (most impactful on local mode)
spark.conf.set("spark.sql.shuffle.partitions", "5")
print("shuffle.partitions set to 5")

pathlib.Path(SINK_PATH).mkdir(parents=True, exist_ok=True)
pathlib.Path(CKPT_PATH).mkdir(parents=True, exist_ok=True)

# Optimization 2 — larger trigger interval → fewer scheduling overheads
query = (
    windowed.writeStream
    .format("parquet")
    .outputMode("append")
    .option("path", SINK_PATH)
    .option("checkpointLocation", CKPT_PATH)
    .trigger(processingTime="30 seconds")     # CHANGED from 10s
    .start()
)
print("Optimized query (r2) started — trigger=30s, shuffle.partitions=5")
26/04/29 10:08:16 WARN DAGScheduler: Failed to cancel job group 8641015d-2d9f-437f-9c01-c53d74bbde31. Cannot find active jobs for it.
26/04/29 10:08:16 WARN DAGScheduler: Failed to cancel job group 8641015d-2d9f-437f-9c01-c53d74bbde31. Cannot find active jobs for it.
Baseline query stopped.
Cleaned: outputs/lab1/stream_sink
Cleaned: outputs/lab1/checkpoint
shuffle.partitions set to 5
26/04/29 10:08:27 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
Optimized query (r2) started — trigger=30s, shuffle.partitions=5
                                                                                
In [14]:
# Capture r2 metrics (after >= 5 batches)
capture_evidence(
    query,
    run_id="r2",
    trigger_val="30s",
    watermark_val="10min",
    note="Optimized — trigger 30s, shuffle.partitions=5"
)
[r2] Evidence captured.
  Metrics  → lab1_metrics_log.csv
  JSON     → proof/query_progress_r2.json
  Plan     → proof/plan_streaming_r2.txt
Out[14]:
   run_id  trigger_interval  watermark_duration  inputRowsPerSecond  processedRowsPerSecond  numInputRows  stateRows  durationMs_trigger  durationMs_getBatch  note                                           timestamp
0      r2               30s               10min                 0.0                     0.0             0      10658                  46                    0  Optimized — trigger 30s, shuffle.partitions=5  2026-04-29T10:14:18.343398
In [15]:
# ── r3: reduced watermark ───────────────────────────────────────────────────
try:
    query.stop()
except Exception:
    pass

for folder in [SINK_PATH, CKPT_PATH]:
    if pathlib.Path(folder).exists():
        shutil.rmtree(folder)

pathlib.Path(SINK_PATH).mkdir(parents=True, exist_ok=True)
pathlib.Path(CKPT_PATH).mkdir(parents=True, exist_ok=True)

# Redefine aggregation with shorter watermark
windowed_r3 = (
    df_stream
    .withWatermark(EVENT_TIME_COL, "5 minutes")          # CHANGED watermark
    .groupBy(
        F.window(EVENT_TIME_COL, "10 minutes", "5 minutes"),  # sliding window
        F.col("icao24")
    )
    .agg(
        F.count("*")          .alias("position_updates"),
        F.avg("velocity")     .alias("avg_velocity_ms"),
        F.max("baroaltitude") .alias("max_baro_altitude_m")
    )
)

query = (
    windowed_r3.writeStream
    .format("parquet")
    .outputMode("append")
    .option("path", SINK_PATH)
    .option("checkpointLocation", CKPT_PATH)
    .trigger(processingTime="30 seconds")
    .start()
)
print("r3 started — watermark=5min, sliding window 10min/5min, trigger=30s")
26/04/29 10:14:20 WARN DAGScheduler: Failed to cancel job group fda2d31f-34e2-4fe4-88fc-a63250dc8b31. Cannot find active jobs for it.
26/04/29 10:14:20 WARN DAGScheduler: Failed to cancel job group fda2d31f-34e2-4fe4-88fc-a63250dc8b31. Cannot find active jobs for it.
r3 started — watermark=5min, sliding window 10min/5min, trigger=30s
26/04/29 10:14:20 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
In [18]:
capture_evidence(
    query,
    run_id="r3",
    trigger_val="30s",
    watermark_val="5min",
    note="Watermark reduced to 5min + sliding window 10min/5min"
)
[r3] Evidence captured.
  Metrics  → lab1_metrics_log.csv
  JSON     → proof/query_progress_r3.json
  Plan     → proof/plan_streaming_r3.txt
Out[18]:
   run_id  trigger_interval  watermark_duration  inputRowsPerSecond  processedRowsPerSecond  numInputRows  stateRows  durationMs_trigger  durationMs_getBatch  note                                               timestamp
0      r3               30s                5min         5089.136305            80183.298319        152669      15056                1903                   60  Watermark reduced to 5min + sliding window 10m...  2026-04-29T10:18:22.012668

6. Fill Metrics Log

The CSV lab1_metrics_log.csv is appended by capture_evidence() above.
This cell reads it back for a final display.

In [21]:
import pandas as pd
df_log = pd.read_csv("lab1_metrics_log.csv")
print(df_log[['run_id', 'numInputRows']]) 
   run_id  numInputRows
r1    10s         10674
r1    10s         10679
r2    30s         10630
r2    30s         10658
r3    30s         15012
r3    30s         15056
In [19]:
# ── Read and display final metrics table ────────────────────────────────────
csv_path = "lab1_metrics_log.csv"

if os.path.exists(csv_path):
    df_log = pd.read_csv(csv_path)
    print("=== lab1_metrics_log.csv ===")
    print(df_log.to_string(index=False))

    # Delta comparison r1 → r2
    if len(df_log) >= 2:
        r1 = df_log[df_log.run_id == "r1"].iloc[-1]
        r2 = df_log[df_log.run_id == "r2"].iloc[-1]
        delta_proc = r2.processedRowsPerSecond - r1.processedRowsPerSecond
        delta_dur  = r2.durationMs_trigger - r1.durationMs_trigger
        print(f"\nOptimization gain (r1 → r2):")
        print(f"  processedRowsPerSecond : {delta_proc:+.2f} rows/s")
        print(f"  triggerExecution       : {delta_dur:+.0f} ms")
else:
    print("lab1_metrics_log.csv not yet created — run capture_evidence() first.")
=== lab1_metrics_log.csv ===
run_id trigger_interval  watermark_duration  inputRowsPerSecond  processedRowsPerSecond  numInputRows  stateRows  durationMs                                                            note                  timestamp
   10s            10min            0.000000            0.000000                       0         10674        101           0 Baseline — trigger 10s, watermark 10min, shuffle.partitions=200 2026-04-29T10:03:27.749116
   10s            10min            0.000000            0.000000                       0         10679        127           0 Baseline — trigger 10s, watermark 10min, shuffle.partitions=200 2026-04-29T10:04:19.559382
   30s            10min         5088.966667        61934.685598                  152669         10630       2465          64                   Optimized — trigger 30s, shuffle.partitions=5 2026-04-29T10:13:20.019672
   30s            10min            0.000000            0.000000                       0         10658         46           0                   Optimized — trigger 30s, shuffle.partitions=5 2026-04-29T10:14:13.341648
   30s             5min         5089.136305       112091.776799                  152669         15012       1361          53           Watermark reduced to 5min + sliding window 10min/5min 2026-04-29T10:17:11.624451
   30s             5min         5089.136305        80183.298319                  152669         15056       1903          60           Watermark reduced to 5min + sliding window 10min/5min 2026-04-29T10:18:02.824017
---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
Cell In[19], line 11
      9 # Delta comparison r1 → r2
     10 if len(df_log) >= 2:
---> 11     r1 = df_log[df_log.run_id == "r1"].iloc[-1]
     12     r2 = df_log[df_log.run_id == "r2"].iloc[-1]
     13     delta_proc = r2.processedRowsPerSecond - r1.processedRowsPerSecond

File ~/miniconda3/envs/de1-env/lib/python3.10/site-packages/pandas/core/indexing.py:1192, in _LocationIndexer.__getitem__(self, key)
   1190 maybe_callable = com.apply_if_callable(key, self.obj)
   1191 maybe_callable = self._check_deprecated_callable_usage(key, maybe_callable)
-> 1192 return self._getitem_axis(maybe_callable, axis=axis)

File ~/miniconda3/envs/de1-env/lib/python3.10/site-packages/pandas/core/indexing.py:1753, in _iLocIndexer._getitem_axis(self, key, axis)
   1750     raise TypeError("Cannot index by location index with a non-integer key")
   1752 # validate the location
-> 1753 self._validate_integer(key, axis)
   1755 return self.obj._ixs(key, axis=axis)

File ~/miniconda3/envs/de1-env/lib/python3.10/site-packages/pandas/core/indexing.py:1686, in _iLocIndexer._validate_integer(self, key, axis)
   1684 len_axis = len(self.obj._get_axis(axis))
   1685 if key >= len_axis or key < -len_axis:
-> 1686     raise IndexError("single positional indexer is out-of-bounds")

IndexError: single positional indexer is out-of-bounds
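The traceback follows from the table printed just before it: the header line of lab1_metrics_log.csv has 10 column names (with a single `durationMs`), while every data row has 11 fields. A likely cause is that the file was first created by an earlier version of `capture_evidence()` and later rows were appended with the current column set. When rows have one more field than the header, pandas uses the first field as the index, so `run_id` ends up holding `10s`/`30s` and the filter `run_id == "r1"` matches nothing. The sketch below shows one way to read such a file safely with the standard `csv` module; `load_metrics` and `EXPECTED` are hypothetical names, and the in-memory sample is a shortened stand-in for the real file.

```python
import csv
import io

# Column names written by the current capture_evidence()
EXPECTED = [
    "run_id", "trigger_interval", "watermark_duration", "inputRowsPerSecond",
    "processedRowsPerSecond", "numInputRows", "stateRows",
    "durationMs_trigger", "durationMs_getBatch", "note", "timestamp",
]

def load_metrics(handle, expected=EXPECTED):
    """Read the metrics log, ignoring a stale header and labelling fields by position."""
    rows = list(csv.reader(handle))
    header, data = rows[0], rows[1:]
    bad = [i for i, r in enumerate(data, start=2) if len(r) != len(expected)]
    if bad:
        raise ValueError(f"rows with unexpected field count at lines {bad}")
    header_is_stale = header != expected
    return header_is_stale, [dict(zip(expected, r)) for r in data]

# Shortened stand-in for lab1_metrics_log.csv with the stale 10-column header
stale = io.StringIO(
    "run_id,trigger_interval,watermark_duration,inputRowsPerSecond,"
    "processedRowsPerSecond,numInputRows,stateRows,durationMs,note,timestamp\n"
    "r1,10s,10min,0.0,0.0,0,10679,127,0,baseline,2026-04-29T10:04:19\n"
)
header_was_stale, records = load_metrics(stale)
print(header_was_stale, records[0]["run_id"], records[0]["stateRows"])
```

The returned records hold strings, so numeric columns still need converting (for example via `pd.DataFrame(records)` followed by `pd.to_numeric`). Deleting the stale file before the first run, so that the header is rewritten with the current columns, avoids the problem altogether.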

7. Cleanup

In [22]:
try:
    query.stop()
    print("Streaming query stopped.")
except Exception:
    pass

spark.stop()
print("SparkSession closed.")
print("\nDeliverables checklist:")
print("  [x] assignment1_esiee.ipynb — this notebook")
print("  [x] outputs/lab1/stream_sink/ — Parquet output")
print("  [x] outputs/lab1/checkpoint/ — checkpoint directory")
print("  [x] proof/plan_streaming.txt — formatted physical plan")
print("  [x] lab1_metrics_log.csv — >= 3 runs (r1, r2, r3)")
print("  [x] engineering_note.md — one-page design note")
print("  [x] GENAI.md — AI usage declaration")
26/04/29 10:33:12 WARN DAGScheduler: Failed to cancel job group 707bce97-5f0b-46a6-913d-d9d5e65aea1a. Cannot find active jobs for it.
26/04/29 10:33:12 WARN DAGScheduler: Failed to cancel job group 707bce97-5f0b-46a6-913d-d9d5e65aea1a. Cannot find active jobs for it.
Streaming query stopped.
SparkSession closed.

Deliverables checklist:
  [x] assignment1_esiee.ipynb — this notebook
  [x] outputs/lab1/stream_sink/ — Parquet output
  [x] outputs/lab1/checkpoint/ — checkpoint directory
  [x] proof/plan_streaming.txt — formatted physical plan
  [x] lab1_metrics_log.csv — >= 3 runs (r1, r2, r3)
  [x] engineering_note.md — one-page design note
  [x] GENAI.md — AI usage declaration