DE2 - Assignment 2: Text - Inverted Index¶

Author : Badr TAJINI - Data Engineering II (Data-Intensive Workloads) - ESIEE 2025-2026

Track: D - Aviation (METAR weather reports + airport descriptions)
Names: Justine Guirauden and Volcy Desmazures

0. Setup¶

In [1]:
import os, time, pathlib, json, datetime, socket
import pandas as pd
import numpy as np
from contextlib import redirect_stdout
from urllib.parse import urlparse
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType

# --- WSL NETWORK FIX ---
# Get the actual IP address of the WSL container
def get_wsl_ip():
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(("8.8.8.8", 80))
        ip = s.getsockname()[0]
        s.close()
        return ip
    except:
        return "127.0.0.1"

WSL_IP = get_wsl_ip()
# -----------------------

DE2_SPARK_DRIVER_HOST  = WSL_IP # We force the driver to use the WSL IP
DE2_SPARK_BIND_ADDRESS = "0.0.0.0" # Listen on all interfaces
os.environ.setdefault("SPARK_LOCAL_IP", WSL_IP)

def show_spark_ui(spark_session):
    ui_url = spark_session.sparkContext.uiWebUrl
    print("Spark version:", spark_session.version)
    if ui_url:
        ui_port = urlparse(ui_url).port or 4040
        # This will show you the direct IP and the localhost bridge
        print(f"Spark UI (Direct IP): http://{WSL_IP}:{ui_port}")
        print(f"Spark UI (Windows localhost): http://localhost:{ui_port}")
    else:
        print("Spark UI: not available")

# Stop any existing session before creating a new one to free the port
if 'spark' in locals():
    spark.stop()

spark = (
    SparkSession.builder
    .appName("de2-assignment2-aviation")
    .master("local[*]")
    .config("spark.driver.host",        DE2_SPARK_DRIVER_HOST)
    .config("spark.driver.bindAddress", DE2_SPARK_BIND_ADDRESS)
    .config("spark.ui.bindAddress",     DE2_SPARK_BIND_ADDRESS)
    # RAM configuration
    .config("spark.driver.memory", "4g")     
    .config("spark.executor.memory", "4g")    
    .config("spark.driver.maxResultSize", "2g") 
    # Keep shuffle partitions low for local single-node cluster
    .config("spark.sql.shuffle.partitions", "8")
    .getOrCreate()
)

show_spark_ui(spark)

# ── Output / proof paths ─────────────────────────────────────────────────────
PARQUET_PATH = "outputs/lab2/inverted_index"
CSV_PATH     = "outputs/lab2/inverted_index_csv"
CORPUS_PATH  = "data/corpus/aviation_metar.csv"

for p in [PARQUET_PATH, CSV_PATH, "proof", "data/corpus"]:
    pathlib.Path(p).mkdir(parents=True, exist_ok=True)
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/04/29 17:29:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark version: 4.0.1
Spark UI (Direct IP): http://172.30.130.70:4040
Spark UI (Windows localhost): http://localhost:4040

0b. Corpus preparation¶

This cell builds data/corpus/aviation_metar.csvfrom the OpenSky dataset used in Lab 1.
Skip if the file already exists.

In [2]:
if not pathlib.Path(CORPUS_PATH).exists():
    LAB1_CSV = "data/raw_aviation/states_2017-06-05-00.csv"
    try:
        df_raw = pd.read_csv(LAB1_CSV)
        def make_doc(row):
            cs  = str(row.get('callsign', 'UNKNOWN')).strip() or 'UNKNOWN'
            alt = row.get('baroaltitude', 0) or 0
            vel = row.get('velocity', 0) or 0
            hdg = row.get('heading', 0) or 0
            vrt = row.get('vertrate', 0) or 0
            gnd = 'on ground' if row.get('onground') else 'airborne'
            sq  = str(row.get('squawk', '0000'))
            return (
                f"Aircraft {cs} is {gnd} squawk {sq} altitude {alt:.0f} meters "
                f"velocity {vel:.1f} knots heading {hdg:.0f} degrees "
                f"vertical rate {vrt:.1f} meters per second"
            )
        df_raw['doc_id'] = df_raw['icao24'].astype(str) + '_' + df_raw['time'].astype(str)
        df_raw['text']   = df_raw.apply(make_doc, axis=1)
        df_raw[['doc_id','text']].dropna().to_csv(CORPUS_PATH, index=False)
        print(f"Corpus built from Lab 1 data: {len(df_raw):,} documents → {CORPUS_PATH}")
    except FileNotFoundError:
        # Minimal fallback corpus - enough for demo/grading
        samples = [
            ("LFPG_001", "METAR LFPG 051200Z 27010KT 9999 FEW025 15/08 Q1015 aircraft approaching runway two seven"),
            ("LFPO_002", "METAR LFPO 051200Z 26015KT 8000 SCT020 14/09 Q1014 landing clearance aircraft on final approach"),
            ("LFMN_003", "METAR LFMN 051200Z 08010KT 9999 SKC 22/15 Q1018 visibility good aircraft taxiing runway zero four"),
            ("LFLL_004", "METAR LFLL 051200Z 20008KT 6000 BKN015 12/10 Q1013 fog patch aircraft holding short runway"),
            ("LFRS_005", "METAR LFRS 051200Z 31020KT 9999 FEW030 10/05 Q1020 wind shear alert aircraft go around"),
            ("LFRN_006", "METAR LFRN 051200Z 29012KT 9999 SCT035 11/06 Q1019 aircraft squawk seven seven zero zero emergency"),
            ("LFBO_007", "METAR LFBO 051200Z 24018KT 9999 FEW020 16/10 Q1016 aircraft airborne departure runway fourteen right"),
            ("LFBD_008", "METAR LFBD 051200Z 22014KT 9999 SCT025 17/11 Q1015 aircraft landing runway two three heading two twenty"),
            ("LFLY_009", "METAR LFLY 051200Z 19008KT 9999 FEW018 13/08 Q1017 aircraft climbing altitude five thousand feet"),
            ("LFST_010", "METAR LFST 051200Z 15005KT 9999 SKC 09/03 Q1022 aircraft descending altitude three thousand feet heading north"),
        ] * 40
        samples = [(f"{d}_{i}", t) for i, (d, t) in enumerate(samples)]
        pd.DataFrame(samples, columns=['doc_id','text']).to_csv(CORPUS_PATH, index=False)
        print(f"Fallback corpus generated: {len(samples)} documents → {CORPUS_PATH}")
else:
    print(f"Corpus found: {CORPUS_PATH}")
Corpus built from Lab 1 data: 1,526,689 documents → data/corpus/aviation_metar.csv

1. Corpus Ingestion¶

Load the METAR corpus with an explicit schema.

  • doc_id (StringType, non-nullable) - unique aircraft identifier + Unix timestamp
  • text (StringType, nullable) - free-text weather report sentence
In [3]:
# ── Explicit schema ──────────────────────────────────────────────────────────
corpus_schema = StructType([
    StructField("doc_id", StringType(), False),
    StructField("text",   StringType(), True),
])

df_corpus = (
    spark.read
    .schema(corpus_schema)
    .option("header", "true")
    .csv(CORPUS_PATH)
    .withColumn("doc_len", F.length("text"))
)

n_docs  = df_corpus.count()
avg_len = df_corpus.select(F.avg("doc_len")).first()[0] or 0

print(f"Documents  : {n_docs:,}")
print(f"Avg length : {avg_len:.0f} chars")
df_corpus.printSchema()
df_corpus.show(5, truncate=100)
                                                                                
Documents  : 1,526,689
Avg length : 137 chars
root
 |-- doc_id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- doc_len: integer (nullable = true)

+-----------------+----------------------------------------------------------------------------------------------------+-------+
|           doc_id|                                                                                                text|doc_len|
+-----------------+----------------------------------------------------------------------------------------------------+-------+
|4bccb9_1496620800|Aircraft SXS2WY is airborne squawk 1000.0 altitude 7460 meters velocity 175.3 knots heading 305 d...|    139|
|502cb2_1496620800|Aircraft MON55BR is airborne squawk 2770.0 altitude 10988 meters velocity 232.9 knots heading 305...|    142|
|4bccaf_1496620800|Aircraft SXS7R is airborne squawk 3215.0 altitude 11582 meters velocity 196.7 knots heading 289 d...|    139|
|4008e1_1496620800|Aircraft TCX229 is airborne squawk 3462.0 altitude 10363 meters velocity 214.2 knots heading 293 ...|    140|
|4070e3_1496620800|Aircraft EXS14D is airborne squawk 7775.0 altitude 10973 meters velocity 221.8 knots heading 298 ...|    140|
+-----------------+----------------------------------------------------------------------------------------------------+-------+
only showing top 5 rows

2. Text Normalization¶

Normalization pipeline:

  1. Lowercase - uniform case for token matching
  2. Strip punctuation - regexp_replace keeping only [a-z0-9\s]
  3. Tokenize - split on [\s\W]+ → array of tokens
  4. Explode → one (doc_id, token) row per token
  5. Filter empty strings and single-char tokens
  6. Remove stop-words - generic English + METAR-specific filler words
In [4]:
# ── Normalization pipeline ───────────────────────────────────────────────────

# Step 1-2: lowercase + remove non-alphanumeric
df_clean = df_corpus.withColumn(
    "text_clean",
    F.regexp_replace(F.lower(F.col("text")), r"[^a-z0-9\s]", "")
)

# Step 3: tokenize on whitespace / punctuation boundaries
df_tokens = df_clean.withColumn(
    "tokens",
    F.split(F.col("text_clean"), r"[\s\W]+")
)

# Step 4-5: explode + filter trivial tokens
df_exploded = (
    df_tokens
    .select("doc_id", F.explode("tokens").alias("token"))
    .filter(F.length("token") > 1)
)
total_before = df_exploded.count()
print(f"Tokens BEFORE stop-word removal: {total_before:,}")

# Step 6: stop-word list (English + METAR filler)
STOP_WORDS = {
    "the", "a", "an", "is", "are", "was", "were", "in", "on", "at",
    "to", "for", "of", "and", "or", "not", "it", "this", "that", "with",
    "be", "by", "as", "from", "but", "its", "if", "do", "no", "so",
    "per", "has", "have", "had", "will", "would", "may", "can",
    # METAR-specific filler
    "meters", "meter", "degrees", "knots", "rate", "second", "seconds",
    "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine"
}

df_filtered = df_exploded.filter(~F.col("token").isin(STOP_WORDS))
total_after = df_filtered.count()

removed_pct = (total_before - total_after) / total_before * 100 if total_before > 0 else 0
print(f"Tokens AFTER  stop-word removal: {total_after:,}")
print(f"Stop-words removed             : {total_before - total_after:,} ({removed_pct:.1f}%)")
df_filtered.show(10, truncate=60)
                                                                                
Tokens BEFORE stop-word removal: 32,173,700
                                                                                
Tokens AFTER  stop-word removal: 19,818,447
Stop-words removed             : 12,355,253 (38.4%)
+-----------------+--------+
|           doc_id|   token|
+-----------------+--------+
|4bccb9_1496620800|aircraft|
|4bccb9_1496620800|  sxs2wy|
|4bccb9_1496620800|airborne|
|4bccb9_1496620800|  squawk|
|4bccb9_1496620800|   10000|
|4bccb9_1496620800|altitude|
|4bccb9_1496620800|    7460|
|4bccb9_1496620800|velocity|
|4bccb9_1496620800|    1753|
|4bccb9_1496620800| heading|
+-----------------+--------+
only showing top 10 rows

3. Build Inverted Index¶

Schema: token (String) | doc_ids (Array<String>) | freq (Long)

  • collect_list(doc_id) → postings list (all documents containing this term)
  • count(*) → document frequency (df)
In [5]:
# ── Inverted index construction ──────────────────────────────────────────────
t0_build = time.time()

inverted_index = (
    df_filtered
    .groupBy("token")
    .agg(
        F.collect_list("doc_id").alias("doc_ids"),  # postings list
        F.count("*")            .alias("freq")       # document frequency
    )
    .orderBy(F.desc("freq"))   # rank by most frequent
)

n_terms = inverted_index.count()
build_ms = (time.time() - t0_build) * 1000

print(f"Unique terms  : {n_terms:,}")
print(f"Build time    : {build_ms:.0f} ms")
print()
inverted_index.printSchema()
inverted_index.show(20, truncate=80)
                                                                                
Unique terms  : 11,843
Build time    : 11112 ms

root
 |-- token: string (nullable = false)
 |-- doc_ids: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- freq: long (nullable = false)

                                                                                
+--------+--------------------------------------------------------------------------------+-------+
|   token|                                                                         doc_ids|   freq|
+--------+--------------------------------------------------------------------------------+-------+
|     nan|[7c6c27_1496621230, ad4fe1_1496620810, 7c6c27_1496621230, ad4fe1_1496620810, ...|2429395|
|aircraft|[7c6c27_1496621230, 4ca351_1496620800, a0f46f_1496621230, 400ff4_1496620800, ...|1526689|
|  squawk|[7c6d9c_1496621230, 4ca351_1496620800, 7c6c27_1496621230, 400ff4_1496620800, ...|1526689|
|altitude|[7c6d9c_1496621230, 4ca351_1496620800, 7c6c27_1496621230, 400ff4_1496620800, ...|1526689|
| heading|[7c6d9c_1496621230, 4ca351_1496620800, 7c6c27_1496621230, 400ff4_1496620800, ...|1526689|
|vertical|[7c6d9c_1496621230, 4ca351_1496620800, 7c6c27_1496621230, 400ff4_1496620800, ...|1526689|
|velocity|[7c6d9c_1496621230, 4ca351_1496620800, 7c6c27_1496621230, 400ff4_1496620800, ...|1526689|
|airborne|[7c6d9c_1496621230, 4ca351_1496620800, a0f46f_1496621230, 400ff4_1496620800, ...|1384951|
|      00|[71c258_1496621240, a0f566_1496621650, a684c4_1496621240, 4baa90_1496621650, ...| 405336|
|      03|[a657bf_1496621650, abd90c_1496621240, ad5623_1496621650, a1fb8f_1496621240, ...| 234044|
|  ground|[7c6c27_1496621230, ad4fe1_1496620810, adda1e_1496621230, a95d95_1496620810, ...| 141738|
|   10973|[ab6cc9_1496621650, 4841c5_1496621240, 4841c5_1496621650, 40643e_1496621240, ...|  72734|
|   11278|[06a046_1496621650, a3ef1b_1496621240, a93d9f_1496621650, ad4cf2_1496621240, ...|  58302|
|   10668|[a699b0_1496621650, abe3e6_1496621240, 4ca068_1496621650, 71c043_1496621240, ...|  57388|
|   10363|[a33327_1496621650, 71c258_1496621240, a1512f_1496621650, a5a11a_1496621240, ...|  51652|
|   11582|[401000_1496621650, a82520_1496621240, c0809d_1496621650, a5fd2d_1496621240, ...|  49322|
|   11887|[a2396e_1496621650, 8a04bd_1496621240, a1c66a_1496621650, a582a9_1496621240, ...|  28690|
|      07|[aa7115_1496621650, a9c5c5_1496621240, a05d0c_1496621650, acdde3_1496621240, ...|  28106|
|     180|[338086_1496621230, 578055_1496620810, 3382f1_1496621230, a4caec_1496620810, ...|  28089|
|   10058|[a0f566_1496621650, a769a3_1496621240, a68daf_1496621650, a00ce0_1496621240, ...|  26290|
+--------+--------------------------------------------------------------------------------+-------+
only showing top 20 rows

4. Write to Parquet & CSV¶

  • Parquet: native array column → Spark can push down token == X filter to row-group statistics
  • CSV: array serialized as |-joined string → no column pruning, larger on disk
In [6]:
# ── Write Parquet ────────────────────────────────────────────────────────────
t0 = time.time()
inverted_index.write.mode("overwrite").parquet(PARQUET_PATH)
parquet_write_ms = (time.time() - t0) * 1000
print(f"Parquet written: {parquet_write_ms:.0f} ms  →  {PARQUET_PATH}")

# ── Write CSV (doc_ids array → pipe-separated string) ────────────────────────
t0 = time.time()
(
    inverted_index
    .withColumn("doc_ids", F.concat_ws("|", "doc_ids"))   # serialize array
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv(CSV_PATH)
)
csv_write_ms = (time.time() - t0) * 1000
print(f"CSV     written: {csv_write_ms:.0f} ms  →  {CSV_PATH}")
                                                                                
Parquet written: 40737 ms  →  outputs/lab2/inverted_index
                                                                                
CSV     written: 42264 ms  →  outputs/lab2/inverted_index_csv

5. Query Latency¶

Reload from Parquet, cache, look up ≥ 3 aviation-relevant terms, record wall-clock latency.

In [7]:
# ── Reload + cache index ─────────────────────────────────────────────────────
idx = spark.read.parquet(PARQUET_PATH)
idx.cache()
idx.count()    # trigger cache population
print("Index cached. Schema:", idx.dtypes)

# ── Save index build plan ────────────────────────────────────────────────────
with open("proof/plan_index_build.txt", "w") as f:
    with redirect_stdout(f):
        inverted_index.explain(mode="formatted")
print("plan_index_build.txt  ✓")

# ── Query lookups ────────────────────────────────────────────────────────────
# Terms chosen for aviation relevance: should appear in METAR / OpenSky text
QUERY_TERMS = ["aircraft", "landing", "squawk", "altitude", "heading", "airborne"]

query_results = []
print()
for term in QUERY_TERMS:
    t0 = time.time()
    rows = idx.filter(F.col("token") == term).collect()
    latency_ms = (time.time() - t0) * 1000
    if rows:
        r = rows[0]
        n_postings = len(r["doc_ids"])
        print(f"  '{term:12s}': freq={r['freq']:>6,}  postings={n_postings:>6,}  latency={latency_ms:6.1f} ms")
        query_results.append({"term": term, "freq": r["freq"],
                               "n_postings": n_postings, "latency_ms": latency_ms})
    else:
        print(f"  '{term:12s}': NOT FOUND  latency={latency_ms:6.1f} ms")
        query_results.append({"term": term, "freq": 0, "n_postings": 0, "latency_ms": latency_ms})

# ── Save query plan (first term as representative) ───────────────────────────
with open("proof/plan_query.txt", "w") as f:
    with redirect_stdout(f):
        idx.filter(F.col("token") == QUERY_TERMS[0]).explain(mode="formatted")
print("\nproof/plan_query.txt  ✓")
print(f"Open Spark UI → Jobs/SQL tabs → take screenshots → save to proof/")
                                                                                
Index cached. Schema: [('token', 'string'), ('doc_ids', 'array<string>'), ('freq', 'bigint')]
plan_index_build.txt  ✓

                                                                                
  'aircraft    ': freq=1,526,689  postings=1,526,689  latency=9424.8 ms
  'landing     ': NOT FOUND  latency=1498.6 ms
  'squawk      ': freq=1,526,689  postings=1,526,689  latency=2888.6 ms
  'altitude    ': freq=1,526,689  postings=1,526,689  latency=1583.8 ms
  'heading     ': freq=1,526,689  postings=1,526,689  latency=1655.5 ms
  'airborne    ': freq=1,384,951  postings=1,384,951  latency=1361.4 ms

proof/plan_query.txt  ✓
Open Spark UI → Jobs/SQL tabs → take screenshots → save to proof/

6. Footprint Comparison — Parquet vs CSV¶

In [8]:
# ── On-disk size comparison ──────────────────────────────────────────────────
def dir_size(path):
    return sum(f.stat().st_size for f in pathlib.Path(path).rglob("*") if f.is_file())

def file_count(path):
    return sum(1 for f in pathlib.Path(path).rglob("*") if f.is_file())

parquet_bytes = dir_size(PARQUET_PATH)
csv_bytes     = dir_size(CSV_PATH)
parquet_files = file_count(PARQUET_PATH)
csv_files     = file_count(CSV_PATH)
ratio         = parquet_bytes / csv_bytes if csv_bytes > 0 else 0

print(f"Parquet : {parquet_bytes:>12,} bytes  ({parquet_files} files)")
print(f"CSV     : {csv_bytes:>12,} bytes  ({csv_files} files)")
if ratio < 1:
    print(f"Ratio   : {ratio:.2%}  →  Parquet is {(1-ratio)*100:.1f}% smaller than CSV")
else:
    print(f"Ratio   : {ratio:.2%}  →  Parquet is {(ratio-1)*100:.1f}% larger (small corpus)")
Parquet :  123,192,287 bytes  (6 files)
CSV     :  359,635,037 bytes  (6 files)
Ratio   : 34.25%  →  Parquet is 65.7% smaller than CSV

7. Evidence & Metrics¶

Save all plans, write the final lab2_metrics_log.csv, print the checklist.

In [9]:
# ── Build complete metrics log ───────────────────────────────────────────────
metric_rows = [
    # r1 — corpus ingestion
    {"run_id": "r1", "task": "ingest", "term": "",
     "corpus_rows": n_docs, "unique_terms": 0,
     "latency_ms": 0, "parquet_bytes": 0, "csv_bytes": 0,
     "shuffle_read_bytes": 0, "shuffle_write_bytes": 0,
     "note": f"corpus ingestion — {n_docs} docs, avg {avg_len:.0f} chars",
     "timestamp": datetime.datetime.now().isoformat()},
    # r2 — index build
    {"run_id": "r2", "task": "build_index", "term": "",
     "corpus_rows": n_docs, "unique_terms": n_terms,
     "latency_ms": build_ms, "parquet_bytes": parquet_bytes, "csv_bytes": 0,
     "shuffle_read_bytes": 0, "shuffle_write_bytes": 0,
     "note": f"inverted index — {n_terms} unique terms, shuffle.partitions=8",
     "timestamp": datetime.datetime.now().isoformat()},
]

# r3..r(2+N) — query lookups
for i, qr in enumerate(query_results, start=3):
    metric_rows.append({
        "run_id": f"r{i}", "task": "query", "term": qr["term"],
        "corpus_rows": n_docs, "unique_terms": n_terms,
        "latency_ms": qr["latency_ms"],
        "parquet_bytes": parquet_bytes, "csv_bytes": 0,
        "shuffle_read_bytes": 0, "shuffle_write_bytes": 0,
        "note": f"term='{qr['term']}' freq={qr['freq']} postings={qr['n_postings']}",
        "timestamp": datetime.datetime.now().isoformat(),
    })

# footprint row
metric_rows.append({
    "run_id": f"r{len(metric_rows)+1}", "task": "footprint", "term": "",
    "corpus_rows": n_docs, "unique_terms": n_terms,
    "latency_ms": 0, "parquet_bytes": parquet_bytes, "csv_bytes": csv_bytes,
    "shuffle_read_bytes": 0, "shuffle_write_bytes": 0,
    "note": f"Parquet={parquet_bytes:,}B  CSV={csv_bytes:,}B  ratio={ratio:.2%}",
    "timestamp": datetime.datetime.now().isoformat(),
})

df_metrics = pd.DataFrame(metric_rows)
df_metrics.to_csv("lab2_metrics_log.csv", index=False)

print("lab2_metrics_log.csv written:")
print(df_metrics.to_string(index=False))

# ── Verify proof files ───────────────────────────────────────────────────────
print("\nProof files:")
for f in ["proof/plan_index_build.txt", "proof/plan_query.txt"]:
    exists = pathlib.Path(f).exists()
    size   = pathlib.Path(f).stat().st_size if exists else 0
    print(f"  {'[x]' if exists else '[ ]'} {f}  ({size} bytes)")
lab2_metrics_log.csv written:
run_id        task     term  corpus_rows  unique_terms   latency_ms  parquet_bytes  csv_bytes  shuffle_read_bytes  shuffle_write_bytes                                                      note                  timestamp
    r1      ingest               1526689             0     0.000000              0          0                   0                    0            corpus ingestion — 1526689 docs, avg 137 chars 2026-04-29T17:48:22.593488
    r2 build_index               1526689         11843 11111.599684      123192287          0                   0                    0 inverted index — 11843 unique terms, shuffle.partitions=8 2026-04-29T17:48:22.593504
    r3       query aircraft      1526689         11843  9424.816847      123192287          0                   0                    0             term='aircraft' freq=1526689 postings=1526689 2026-04-29T17:48:22.593827
    r4       query  landing      1526689         11843  1498.634577      123192287          0                   0                    0                          term='landing' freq=0 postings=0 2026-04-29T17:48:22.593840
    r5       query   squawk      1526689         11843  2888.571024      123192287          0                   0                    0               term='squawk' freq=1526689 postings=1526689 2026-04-29T17:48:22.593845
    r6       query altitude      1526689         11843  1583.830118      123192287          0                   0                    0             term='altitude' freq=1526689 postings=1526689 2026-04-29T17:48:22.593850
    r7       query  heading      1526689         11843  1655.486584      123192287          0                   0                    0              term='heading' freq=1526689 postings=1526689 2026-04-29T17:48:22.593875
    r8       query airborne      1526689         11843  1361.358166      123192287          0                   0                    0             term='airborne' freq=1384951 postings=1384951 2026-04-29T17:48:22.593883
    r9   footprint               1526689         11843     0.000000      123192287  359635037                   0                    0      Parquet=123,192,287B  CSV=359,635,037B  ratio=34.25% 2026-04-29T17:48:22.594100

Proof files:
  [x] proof/plan_index_build.txt  (2553 bytes)
  [x] proof/plan_query.txt  (990 bytes)

8. Cleanup¶

In [10]:
spark.stop()
print("SparkSession closed.")

print("\n── Deliverables checklist ──────────────────────────────────")
checks = [
    ("assignment2_esiee.ipynb",                 "this notebook"),
    (PARQUET_PATH,                              "Parquet inverted index"),
    (CSV_PATH,                                  "CSV inverted index"),
    ("proof/plan_index_build.txt",              "formatted index build plan"),
    ("proof/plan_query.txt",                    "formatted query plan"),
    ("lab2_metrics_log.csv",                    "latency + footprint metrics"),
    ("engineering_note_lab2.md",                "one-page engineering note"),
    ("GENAI.md",                                "AI usage declaration"),
]
for path, desc in checks:
    ok = pathlib.Path(path).exists()
    print(f"  {'[x]' if ok else '[ ]'} {path:45s} — {desc}")

print("\n  [ ] proof/spark_ui_jobs.png    - capture manually from http://localhost:4040")
print("  [ ] proof/spark_ui_sql.png     - capture manually from http://localhost:4040/SQL")
SparkSession closed.

── Deliverables checklist ──────────────────────────────────
  [x] assignment2_esiee.ipynb                       — this notebook
  [x] outputs/lab2/inverted_index                   — Parquet inverted index
  [x] outputs/lab2/inverted_index_csv               — CSV inverted index
  [x] proof/plan_index_build.txt                    — formatted index build plan
  [x] proof/plan_query.txt                          — formatted query plan
  [x] lab2_metrics_log.csv                          — latency + footprint metrics
  [x] engineering_note_lab2.md                      — one-page engineering note
  [x] GENAI.md                                      — AI usage declaration

  [ ] proof/spark_ui_jobs.png    - capture manually from http://localhost:4040
  [ ] proof/spark_ui_sql.png     - capture manually from http://localhost:4040/SQL
In [ ]: