DE2 - Assignment 2: Text - Inverted Index
Author : Badr TAJINI - Data Engineering II (Data-Intensive Workloads) - ESIEE 2025-2026
Track: D - Aviation (METAR weather reports + airport descriptions)
Names: Justine Guirauden and Volcy Desmazures
0. Setup
In [1]:
import os, time, pathlib, json, datetime, socket
import pandas as pd
import numpy as np
from contextlib import redirect_stdout
from urllib.parse import urlparse
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType
# --- WSL NETWORK FIX ---
# Get the actual IP address of the WSL container
def get_wsl_ip():
    # Connecting a UDP socket sends no packets; it only makes the OS choose
    # the outbound interface, whose address getsockname() then reports.
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            return s.getsockname()[0]
    except OSError:
        # No route available: fall back to loopback
        return "127.0.0.1"

WSL_IP = get_wsl_ip()
# -----------------------
DE2_SPARK_DRIVER_HOST = WSL_IP  # We force the driver to use the WSL IP
DE2_SPARK_BIND_ADDRESS = "0.0.0.0"  # Listen on all interfaces
os.environ.setdefault("SPARK_LOCAL_IP", WSL_IP)
def show_spark_ui(spark_session):
    ui_url = spark_session.sparkContext.uiWebUrl
    print("Spark version:", spark_session.version)
    if ui_url:
        ui_port = urlparse(ui_url).port or 4040
        # This will show you the direct IP and the localhost bridge
        print(f"Spark UI (Direct IP): http://{WSL_IP}:{ui_port}")
        print(f"Spark UI (Windows localhost): http://localhost:{ui_port}")
    else:
        print("Spark UI: not available")

# Stop any existing session before creating a new one to free the port
if 'spark' in locals():
    spark.stop()

spark = (
    SparkSession.builder
    .appName("de2-assignment2-aviation")
    .master("local[*]")
    .config("spark.driver.host", DE2_SPARK_DRIVER_HOST)
    .config("spark.driver.bindAddress", DE2_SPARK_BIND_ADDRESS)
    .config("spark.ui.bindAddress", DE2_SPARK_BIND_ADDRESS)
    # RAM configuration
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.maxResultSize", "2g")
    # Keep shuffle partitions low for local single-node cluster
    .config("spark.sql.shuffle.partitions", "8")
    .getOrCreate()
)
show_spark_ui(spark)
# ── Output / proof paths ─────────────────────────────────────────────────────
PARQUET_PATH = "outputs/lab2/inverted_index"
CSV_PATH = "outputs/lab2/inverted_index_csv"
CORPUS_PATH = "data/corpus/aviation_metar.csv"
for p in [PARQUET_PATH, CSV_PATH, "proof", "data/corpus"]:
    pathlib.Path(p).mkdir(parents=True, exist_ok=True)
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/04/29 17:29:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark version: 4.0.1
Spark UI (Direct IP): http://172.30.130.70:4040
Spark UI (Windows localhost): http://localhost:4040
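0b. Corpus preparation
This cell builds data/corpus/aviation_metar.csv from the OpenSky dataset used in Lab 1. If the Lab 1 CSV is missing, it falls back to a small synthetic METAR corpus (10 sample reports repeated 40 times).
The cell does nothing if the corpus file already exists.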
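One caveat when templating rows into sentences: a missing numeric field read by pandas is a float NaN, which is truthy, so a `value or 0` fallback leaves it unchanged and it is written into the document as the literal text "nan". The following is a minimal stdlib sketch of a NaN-safe formatter; the helper name `fmt_num` is hypothetical and not part of the notebook.

```python
import math

def fmt_num(value, spec=".0f", default=0.0):
    """Format a numeric field, substituting `default` for None or NaN."""
    if value is None or (isinstance(value, float) and math.isnan(value)):
        value = default
    return format(value, spec)

nan = float("nan")
print(nan or 0)          # NaN is truthy, so `or` does not replace it
print(f"{nan:.0f}")      # formatting NaN yields the text "nan"
print(fmt_num(nan))      # NaN-safe version falls back to the default
print(fmt_num(7460.2))   # ordinary values are formatted as usual
```

Whether a missing value should become 0 or be dropped from the sentence altogether is a design choice; substituting 0 keeps the sentence shape but adds a value that was never observed.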
In [2]:
if not pathlib.Path(CORPUS_PATH).exists():
    LAB1_CSV = "data/raw_aviation/states_2017-06-05-00.csv"
    try:
        df_raw = pd.read_csv(LAB1_CSV)

        def make_doc(row):
            # Turn one state-vector row into a short English sentence.
            # Note: NaN is truthy, so the `or` fallbacks below only replace
            # None/0/empty values; NaN fields are rendered as the text "nan".
            cs = str(row.get('callsign', 'UNKNOWN')).strip() or 'UNKNOWN'
            alt = row.get('baroaltitude', 0) or 0
            vel = row.get('velocity', 0) or 0
            hdg = row.get('heading', 0) or 0
            vrt = row.get('vertrate', 0) or 0
            gnd = 'on ground' if row.get('onground') else 'airborne'
            sq = str(row.get('squawk', '0000'))
            return (
                f"Aircraft {cs} is {gnd} squawk {sq} altitude {alt:.0f} meters "
                f"velocity {vel:.1f} knots heading {hdg:.0f} degrees "
                f"vertical rate {vrt:.1f} meters per second"
            )

        # doc_id = ICAO24 address + Unix timestamp of the observation
        df_raw['doc_id'] = df_raw['icao24'].astype(str) + '_' + df_raw['time'].astype(str)
        df_raw['text'] = df_raw.apply(make_doc, axis=1)
        df_raw[['doc_id','text']].dropna().to_csv(CORPUS_PATH, index=False)
        print(f"Corpus built from Lab 1 data: {len(df_raw):,} documents → {CORPUS_PATH}")
    except FileNotFoundError:
        # Minimal fallback corpus - enough for demo/grading
        samples = [
            ("LFPG_001", "METAR LFPG 051200Z 27010KT 9999 FEW025 15/08 Q1015 aircraft approaching runway two seven"),
            ("LFPO_002", "METAR LFPO 051200Z 26015KT 8000 SCT020 14/09 Q1014 landing clearance aircraft on final approach"),
            ("LFMN_003", "METAR LFMN 051200Z 08010KT 9999 SKC 22/15 Q1018 visibility good aircraft taxiing runway zero four"),
            ("LFLL_004", "METAR LFLL 051200Z 20008KT 6000 BKN015 12/10 Q1013 fog patch aircraft holding short runway"),
            ("LFRS_005", "METAR LFRS 051200Z 31020KT 9999 FEW030 10/05 Q1020 wind shear alert aircraft go around"),
            ("LFRN_006", "METAR LFRN 051200Z 29012KT 9999 SCT035 11/06 Q1019 aircraft squawk seven seven zero zero emergency"),
            ("LFBO_007", "METAR LFBO 051200Z 24018KT 9999 FEW020 16/10 Q1016 aircraft airborne departure runway fourteen right"),
            ("LFBD_008", "METAR LFBD 051200Z 22014KT 9999 SCT025 17/11 Q1015 aircraft landing runway two three heading two twenty"),
            ("LFLY_009", "METAR LFLY 051200Z 19008KT 9999 FEW018 13/08 Q1017 aircraft climbing altitude five thousand feet"),
            ("LFST_010", "METAR LFST 051200Z 15005KT 9999 SKC 09/03 Q1022 aircraft descending altitude three thousand feet heading north"),
        ] * 40
        # Suffix each ID with its position so the repeated samples stay unique
        samples = [(f"{d}_{i}", t) for i, (d, t) in enumerate(samples)]
        pd.DataFrame(samples, columns=['doc_id','text']).to_csv(CORPUS_PATH, index=False)
        print(f"Fallback corpus generated: {len(samples)} documents → {CORPUS_PATH}")
else:
    print(f"Corpus found: {CORPUS_PATH}")
Corpus built from Lab 1 data: 1,526,689 documents → data/corpus/aviation_metar.csv
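1. Corpus Ingestion
Load the corpus with an explicit schema.
- doc_id (StringType, declared non-nullable) - ICAO24 aircraft identifier + Unix timestamp
- text (StringType, nullable) - free-text sentence describing the aircraft state (a METAR report in the fallback corpus)
The non-nullable flag is not enforced when reading the CSV: the schema printed below reports doc_id as nullable = true.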
In [3]:
# ── Explicit schema ──────────────────────────────────────────────────────────
corpus_schema = StructType([
    StructField("doc_id", StringType(), False),
    StructField("text", StringType(), True),
])
df_corpus = (
    spark.read
    .schema(corpus_schema)
    .option("header", "true")
    .csv(CORPUS_PATH)
    .withColumn("doc_len", F.length("text"))
)
n_docs = df_corpus.count()
avg_len = df_corpus.select(F.avg("doc_len")).first()[0] or 0
print(f"Documents : {n_docs:,}")
print(f"Avg length : {avg_len:.0f} chars")
df_corpus.printSchema()
df_corpus.show(5, truncate=100)
Documents : 1,526,689
Avg length : 137 chars
root
 |-- doc_id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- doc_len: integer (nullable = true)

+-----------------+----------------------------------------------------------------------------------------------------+-------+
|           doc_id|                                                                                                text|doc_len|
+-----------------+----------------------------------------------------------------------------------------------------+-------+
|4bccb9_1496620800|Aircraft SXS2WY is airborne squawk 1000.0 altitude 7460 meters velocity 175.3 knots heading 305 d...|    139|
|502cb2_1496620800|Aircraft MON55BR is airborne squawk 2770.0 altitude 10988 meters velocity 232.9 knots heading 305...|    142|
|4bccaf_1496620800|Aircraft SXS7R is airborne squawk 3215.0 altitude 11582 meters velocity 196.7 knots heading 289 d...|    139|
|4008e1_1496620800|Aircraft TCX229 is airborne squawk 3462.0 altitude 10363 meters velocity 214.2 knots heading 293 ...|    140|
|4070e3_1496620800|Aircraft EXS14D is airborne squawk 7775.0 altitude 10973 meters velocity 221.8 knots heading 298 ...|    140|
+-----------------+----------------------------------------------------------------------------------------------------+-------+
only showing top 5 rows
2. Text Normalization
Normalization pipeline:
- Lowercase - uniform case for token matching
- Strip punctuation - regexp_replace keeping only [a-z0-9\s]
- Tokenize - split on [\s\W]+ → array of tokens
- Explode → one (doc_id, token) row per token
- Filter empty strings and single-char tokens
- Remove stop-words - generic English + METAR-specific filler words
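The steps above can be traced on a single sentence with a minimal pure-Python sketch. It uses the same two regular expressions as the pipeline, but the function name `normalize` and the reduced stop-word set are illustrative assumptions, not the notebook's Spark code.

```python
import re

# Small illustrative subset of the stop-word list
STOP_WORDS = {"is", "per", "meters", "knots", "degrees", "rate", "second"}

def normalize(text, stop_words=STOP_WORDS):
    """Lowercase, delete non-alphanumerics, split, drop short tokens and stop-words."""
    cleaned = re.sub(r"[^a-z0-9\s]", "", text.lower())
    tokens = re.split(r"[\s\W]+", cleaned)
    return [t for t in tokens if len(t) > 1 and t not in stop_words]

tokens = normalize(
    "Aircraft SXS2WY is airborne squawk 1000.0 altitude 7460 meters velocity 175.3 knots"
)
print(tokens)
```

Because punctuation is deleted rather than replaced by a space, the decimal point disappears and the digits fuse: 1000.0 becomes 10000 and 175.3 becomes 1753, which matches the tokens in the cell output. Replacing punctuation with a space instead would keep the integer and fractional parts as separate tokens.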
In [4]:
# ── Normalization pipeline ───────────────────────────────────────────────────
# Step 1-2: lowercase + remove non-alphanumeric
df_clean = df_corpus.withColumn(
    "text_clean",
    F.regexp_replace(F.lower(F.col("text")), r"[^a-z0-9\s]", "")
)
# Step 3: tokenize on whitespace / punctuation boundaries
df_tokens = df_clean.withColumn(
    "tokens",
    F.split(F.col("text_clean"), r"[\s\W]+")
)
# Step 4-5: explode + filter trivial tokens
df_exploded = (
    df_tokens
    .select("doc_id", F.explode("tokens").alias("token"))
    .filter(F.length("token") > 1)
)
total_before = df_exploded.count()
print(f"Tokens BEFORE stop-word removal: {total_before:,}")
# Step 6: stop-word list (English + METAR filler)
STOP_WORDS = {
    "the", "a", "an", "is", "are", "was", "were", "in", "on", "at",
    "to", "for", "of", "and", "or", "not", "it", "this", "that", "with",
    "be", "by", "as", "from", "but", "its", "if", "do", "no", "so",
    "per", "has", "have", "had", "will", "would", "may", "can",
    # METAR-specific filler
    "meters", "meter", "degrees", "knots", "rate", "second", "seconds",
    "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine"
}
df_filtered = df_exploded.filter(~F.col("token").isin(STOP_WORDS))
total_after = df_filtered.count()
removed_pct = (total_before - total_after) / total_before * 100 if total_before > 0 else 0
print(f"Tokens AFTER stop-word removal: {total_after:,}")
print(f"Stop-words removed : {total_before - total_after:,} ({removed_pct:.1f}%)")
df_filtered.show(10, truncate=60)
Tokens BEFORE stop-word removal: 32,173,700
Tokens AFTER stop-word removal: 19,818,447
Stop-words removed : 12,355,253 (38.4%)
+-----------------+--------+
|           doc_id|   token|
+-----------------+--------+
|4bccb9_1496620800|aircraft|
|4bccb9_1496620800|  sxs2wy|
|4bccb9_1496620800|airborne|
|4bccb9_1496620800|  squawk|
|4bccb9_1496620800|   10000|
|4bccb9_1496620800|altitude|
|4bccb9_1496620800|    7460|
|4bccb9_1496620800|velocity|
|4bccb9_1496620800|    1753|
|4bccb9_1496620800| heading|
+-----------------+--------+
only showing top 10 rows
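3. Build Inverted Index
Schema: token (String) | doc_ids (Array<String>) | freq (Long)
- collect_list(doc_id) → postings list with one entry per occurrence, so a document that contains the term twice is listed twice
- count(*) → number of occurrences of the term in the corpus; this equals the document frequency (df) only when no document repeats the term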
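To make the difference between a postings list, a document frequency and a raw occurrence count concrete, here is a minimal pure-Python sketch on toy (doc_id, token) pairs. The function name `build_index` and the toy data are illustrative assumptions; in Spark, `collect_set` and `countDistinct` are the deduplicating counterparts of `collect_list` and `count`.

```python
from collections import defaultdict

def build_index(pairs):
    """Map each token to its unique doc_ids, document frequency and occurrence count."""
    postings = defaultdict(list)
    for doc_id, token in pairs:
        postings[token].append(doc_id)
    index = {}
    for token, doc_ids in postings.items():
        unique_docs = sorted(set(doc_ids))
        index[token] = {
            "doc_ids": unique_docs,          # deduplicated postings list
            "doc_freq": len(unique_docs),    # number of distinct documents
            "occurrences": len(doc_ids),     # what a plain count(*) measures
        }
    return index

pairs = [("d1", "nan"), ("d1", "nan"), ("d1", "aircraft"), ("d2", "aircraft"), ("d2", "nan")]
index = build_index(pairs)
print(index["nan"])
print(index["aircraft"])
```

In the toy data "nan" occurs three times but in only two documents, the same pattern as the notebook's top term, whose freq (2,429,395) exceeds the number of documents (1,526,689).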
In [5]:
# ── Inverted index construction ──────────────────────────────────────────────
t0_build = time.time()
inverted_index = (
    df_filtered
    .groupBy("token")
    .agg(
        F.collect_list("doc_id").alias("doc_ids"),  # postings list (duplicates kept)
        F.count("*").alias("freq")                  # occurrences of the term
    )
    .orderBy(F.desc("freq"))                        # rank by most frequent
)
n_terms = inverted_index.count()
build_ms = (time.time() - t0_build) * 1000
print(f"Unique terms : {n_terms:,}")
print(f"Build time : {build_ms:.0f} ms")
print()
inverted_index.printSchema()
inverted_index.show(20, truncate=80)
Unique terms : 11,843
Build time : 11112 ms

root
 |-- token: string (nullable = false)
 |-- doc_ids: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- freq: long (nullable = false)

+--------+--------------------------------------------------------------------------------+-------+
|   token|                                                                         doc_ids|   freq|
+--------+--------------------------------------------------------------------------------+-------+
|     nan|[7c6c27_1496621230, ad4fe1_1496620810, 7c6c27_1496621230, ad4fe1_1496620810, ...|2429395|
|aircraft|[7c6c27_1496621230, 4ca351_1496620800, a0f46f_1496621230, 400ff4_1496620800, ...|1526689|
|  squawk|[7c6d9c_1496621230, 4ca351_1496620800, 7c6c27_1496621230, 400ff4_1496620800, ...|1526689|
|altitude|[7c6d9c_1496621230, 4ca351_1496620800, 7c6c27_1496621230, 400ff4_1496620800, ...|1526689|
| heading|[7c6d9c_1496621230, 4ca351_1496620800, 7c6c27_1496621230, 400ff4_1496620800, ...|1526689|
|vertical|[7c6d9c_1496621230, 4ca351_1496620800, 7c6c27_1496621230, 400ff4_1496620800, ...|1526689|
|velocity|[7c6d9c_1496621230, 4ca351_1496620800, 7c6c27_1496621230, 400ff4_1496620800, ...|1526689|
|airborne|[7c6d9c_1496621230, 4ca351_1496620800, a0f46f_1496621230, 400ff4_1496620800, ...|1384951|
|      00|[71c258_1496621240, a0f566_1496621650, a684c4_1496621240, 4baa90_1496621650, ...| 405336|
|      03|[a657bf_1496621650, abd90c_1496621240, ad5623_1496621650, a1fb8f_1496621240, ...| 234044|
|  ground|[7c6c27_1496621230, ad4fe1_1496620810, adda1e_1496621230, a95d95_1496620810, ...| 141738|
|   10973|[ab6cc9_1496621650, 4841c5_1496621240, 4841c5_1496621650, 40643e_1496621240, ...|  72734|
|   11278|[06a046_1496621650, a3ef1b_1496621240, a93d9f_1496621650, ad4cf2_1496621240, ...|  58302|
|   10668|[a699b0_1496621650, abe3e6_1496621240, 4ca068_1496621650, 71c043_1496621240, ...|  57388|
|   10363|[a33327_1496621650, 71c258_1496621240, a1512f_1496621650, a5a11a_1496621240, ...|  51652|
|   11582|[401000_1496621650, a82520_1496621240, c0809d_1496621650, a5fd2d_1496621240, ...|  49322|
|   11887|[a2396e_1496621650, 8a04bd_1496621240, a1c66a_1496621650, a582a9_1496621240, ...|  28690|
|      07|[aa7115_1496621650, a9c5c5_1496621240, a05d0c_1496621650, acdde3_1496621240, ...|  28106|
|     180|[338086_1496621230, 578055_1496620810, 3382f1_1496621230, a4caec_1496620810, ...|  28089|
|   10058|[a0f566_1496621650, a769a3_1496621240, a68daf_1496621650, a00ce0_1496621240, ...|  26290|
+--------+--------------------------------------------------------------------------------+-------+
only showing top 20 rows
4. Write to Parquet & CSV
- Parquet: native array column → Spark can push down a token == X filter to row-group statistics
- CSV: array serialized as a |-joined string → no column pruning, larger on disk
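Because CSV has no array type, a reader has to split the doc_ids string back into a list. Below is a minimal stdlib sketch of that round trip, assuming doc IDs never contain the | separator; the function names are illustrative and not part of the notebook.

```python
import csv
import io

def write_index_csv(index_rows):
    """Serialize (token, doc_ids, freq) rows, joining each postings list with '|'."""
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(["token", "doc_ids", "freq"])
    for token, doc_ids, freq in index_rows:
        writer.writerow([token, "|".join(doc_ids), freq])
    return buf.getvalue()

def read_index_csv(text):
    """Parse the CSV back, splitting the joined string into a list again."""
    reader = csv.DictReader(io.StringIO(text))
    return [
        (
            row["token"],
            row["doc_ids"].split("|") if row["doc_ids"] else [],
            int(row["freq"]),
        )
        for row in reader
    ]

rows = [("airborne", ["4bccb9_1496620800", "502cb2_1496620800"], 2)]
csv_text = write_index_csv(rows)
restored = read_index_csv(csv_text)
print(restored == rows)
```

The extra parsing step and the loss of types (freq comes back as text until it is cast) are the cost of the CSV form; Parquet stores the array and the integer natively.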
In [6]:
# ── Write Parquet ────────────────────────────────────────────────────────────
t0 = time.time()
inverted_index.write.mode("overwrite").parquet(PARQUET_PATH)
parquet_write_ms = (time.time() - t0) * 1000
print(f"Parquet written: {parquet_write_ms:.0f} ms → {PARQUET_PATH}")
# ── Write CSV (doc_ids array → pipe-separated string) ────────────────────────
t0 = time.time()
(
    inverted_index
    .withColumn("doc_ids", F.concat_ws("|", "doc_ids"))  # serialize array
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv(CSV_PATH)
)
csv_write_ms = (time.time() - t0) * 1000
print(f"CSV written: {csv_write_ms:.0f} ms → {CSV_PATH}")
Parquet written: 40737 ms → outputs/lab2/inverted_index
CSV written: 42264 ms → outputs/lab2/inverted_index_csv
5. Query Latency
Reload from Parquet, cache, look up ≥ 3 aviation-relevant terms, record wall-clock latency.
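A single wall-clock sample per term is sensitive to warm-up effects: in the recorded run the first lookup took 9424.8 ms, against 1361 to 2889 ms for the later ones. One way to reduce that noise is to discard a warm-up call and report the median of several repeats. The helper below is a minimal sketch of that idea on a toy dictionary index; `time_lookup` and `toy_index` are hypothetical names, and the same pattern would wrap the Spark filter-and-collect call.

```python
import statistics
import time

def time_lookup(lookup, term, repeats=5, warmup=1):
    """Return (result, median latency in ms) for lookup(term)."""
    for _ in range(warmup):
        lookup(term)  # untimed call to absorb one-off start-up costs
    samples = []
    result = None
    for _ in range(repeats):
        t0 = time.perf_counter()
        result = lookup(term)
        samples.append((time.perf_counter() - t0) * 1000)
    return result, statistics.median(samples)

toy_index = {"aircraft": ["d1", "d2"], "airborne": ["d1"]}
postings, latency_ms = time_lookup(lambda t: toy_index.get(t, []), "aircraft")
print(postings, f"{latency_ms:.3f} ms")
```

`time.perf_counter()` is used instead of `time.time()` because it is a monotonic, high-resolution clock intended for measuring short durations.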
In [7]:
# ── Reload + cache index ─────────────────────────────────────────────────────
idx = spark.read.parquet(PARQUET_PATH)
idx.cache()
idx.count() # trigger cache population
print("Index cached. Schema:", idx.dtypes)
# ── Save index build plan ────────────────────────────────────────────────────
with open("proof/plan_index_build.txt", "w") as f:
    with redirect_stdout(f):
        inverted_index.explain(mode="formatted")
print("plan_index_build.txt ✓")
# ── Query lookups ────────────────────────────────────────────────────────────
# Terms chosen for aviation relevance: should appear in METAR / OpenSky text
QUERY_TERMS = ["aircraft", "landing", "squawk", "altitude", "heading", "airborne"]
query_results = []
print()
for term in QUERY_TERMS:
    t0 = time.time()
    rows = idx.filter(F.col("token") == term).collect()
    latency_ms = (time.time() - t0) * 1000
    if rows:
        r = rows[0]
        n_postings = len(r["doc_ids"])
        print(f" '{term:12s}': freq={r['freq']:>6,} postings={n_postings:>6,} latency={latency_ms:6.1f} ms")
        query_results.append({"term": term, "freq": r["freq"],
                              "n_postings": n_postings, "latency_ms": latency_ms})
    else:
        print(f" '{term:12s}': NOT FOUND latency={latency_ms:6.1f} ms")
        query_results.append({"term": term, "freq": 0, "n_postings": 0, "latency_ms": latency_ms})
# ── Save query plan (first term as representative) ───────────────────────────
with open("proof/plan_query.txt", "w") as f:
    with redirect_stdout(f):
        idx.filter(F.col("token") == QUERY_TERMS[0]).explain(mode="formatted")
print("\nproof/plan_query.txt ✓")
print("Open Spark UI → Jobs/SQL tabs → take screenshots → save to proof/")
Index cached. Schema: [('token', 'string'), ('doc_ids', 'array<string>'), ('freq', 'bigint')]
plan_index_build.txt ✓
 'aircraft ': freq=1,526,689 postings=1,526,689 latency=9424.8 ms
 'landing ': NOT FOUND latency=1498.6 ms
 'squawk ': freq=1,526,689 postings=1,526,689 latency=2888.6 ms
 'altitude ': freq=1,526,689 postings=1,526,689 latency=1583.8 ms
 'heading ': freq=1,526,689 postings=1,526,689 latency=1655.5 ms
 'airborne ': freq=1,384,951 postings=1,384,951 latency=1361.4 ms

proof/plan_query.txt ✓
Open Spark UI → Jobs/SQL tabs → take screenshots → save to proof/
6. Footprint Comparison - Parquet vs CSV
In [8]:
# ── On-disk size comparison ──────────────────────────────────────────────────
def dir_size(path):
    # Total size in bytes of every file under `path` (recursive)
    return sum(f.stat().st_size for f in pathlib.Path(path).rglob("*") if f.is_file())

def file_count(path):
    # Number of files under `path` (recursive)
    return sum(1 for f in pathlib.Path(path).rglob("*") if f.is_file())

parquet_bytes = dir_size(PARQUET_PATH)
csv_bytes = dir_size(CSV_PATH)
parquet_files = file_count(PARQUET_PATH)
csv_files = file_count(CSV_PATH)
ratio = parquet_bytes / csv_bytes if csv_bytes > 0 else 0
print(f"Parquet : {parquet_bytes:>12,} bytes ({parquet_files} files)")
print(f"CSV : {csv_bytes:>12,} bytes ({csv_files} files)")
if ratio < 1:
    print(f"Ratio : {ratio:.2%} → Parquet is {(1-ratio)*100:.1f}% smaller than CSV")
else:
    print(f"Ratio : {ratio:.2%} → Parquet is {(ratio-1)*100:.1f}% larger (small corpus)")
Parquet : 123,192,287 bytes (6 files)
CSV : 359,635,037 bytes (6 files)
Ratio : 34.25% → Parquet is 65.7% smaller than CSV
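The recursive sum above counts every file in each output directory. Spark output directories usually also contain a `_SUCCESS` marker and, on a local filesystem, hidden `.crc` checksum files; whether they account for part of the six files reported here is an assumption, since the output does not list file names. A minimal sketch that counts only the data files, demonstrated on a temporary directory (the helper name `data_file_size` is hypothetical):

```python
import pathlib
import tempfile

def data_file_size(path, suffixes=(".parquet", ".csv")):
    """Sum the sizes of files under `path` whose names end with one of `suffixes`."""
    return sum(
        f.stat().st_size
        for f in pathlib.Path(path).rglob("*")
        if f.is_file() and f.name.endswith(suffixes)
    )

with tempfile.TemporaryDirectory() as tmp:
    root = pathlib.Path(tmp)
    (root / "part-00000.parquet").write_bytes(b"x" * 100)      # data file
    (root / ".part-00000.parquet.crc").write_bytes(b"x" * 12)  # checksum sidecar
    (root / "_SUCCESS").write_bytes(b"")                       # job marker
    total = data_file_size(root)

print(total)  # 100: only the data file is counted
```

The sidecar files are small relative to the data, so the ratio changes little either way; the filter mainly makes the file counts comparable between formats.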
7. Evidence & Metrics
Save all plans, write the final lab2_metrics_log.csv, print the checklist.
In [9]:
# ── Build complete metrics log ───────────────────────────────────────────────
metric_rows = [
# r1 — corpus ingestion
{"run_id": "r1", "task": "ingest", "term": "",
"corpus_rows": n_docs, "unique_terms": 0,
"latency_ms": 0, "parquet_bytes": 0, "csv_bytes": 0,
"shuffle_read_bytes": 0, "shuffle_write_bytes": 0,
"note": f"corpus ingestion — {n_docs} docs, avg {avg_len:.0f} chars",
"timestamp": datetime.datetime.now().isoformat()},
# r2 — index build
{"run_id": "r2", "task": "build_index", "term": "",
"corpus_rows": n_docs, "unique_terms": n_terms,
"latency_ms": build_ms, "parquet_bytes": parquet_bytes, "csv_bytes": 0,
"shuffle_read_bytes": 0, "shuffle_write_bytes": 0,
"note": f"inverted index — {n_terms} unique terms, shuffle.partitions=8",
"timestamp": datetime.datetime.now().isoformat()},
]
# r3..r(2+N) - query lookups
for i, qr in enumerate(query_results, start=3):
    metric_rows.append({
        "run_id": f"r{i}", "task": "query", "term": qr["term"],
        "corpus_rows": n_docs, "unique_terms": n_terms,
        "latency_ms": qr["latency_ms"],
        "parquet_bytes": parquet_bytes, "csv_bytes": 0,
        "shuffle_read_bytes": 0, "shuffle_write_bytes": 0,
        "note": f"term='{qr['term']}' freq={qr['freq']} postings={qr['n_postings']}",
        "timestamp": datetime.datetime.now().isoformat(),
    })
# footprint row
metric_rows.append({
    "run_id": f"r{len(metric_rows)+1}", "task": "footprint", "term": "",
    "corpus_rows": n_docs, "unique_terms": n_terms,
    "latency_ms": 0, "parquet_bytes": parquet_bytes, "csv_bytes": csv_bytes,
    "shuffle_read_bytes": 0, "shuffle_write_bytes": 0,
    "note": f"Parquet={parquet_bytes:,}B CSV={csv_bytes:,}B ratio={ratio:.2%}",
    "timestamp": datetime.datetime.now().isoformat(),
})
df_metrics = pd.DataFrame(metric_rows)
df_metrics.to_csv("lab2_metrics_log.csv", index=False)
print("lab2_metrics_log.csv written:")
print(df_metrics.to_string(index=False))
# ── Verify proof files ───────────────────────────────────────────────────────
print("\nProof files:")
for f in ["proof/plan_index_build.txt", "proof/plan_query.txt"]:
    exists = pathlib.Path(f).exists()
    size = pathlib.Path(f).stat().st_size if exists else 0
    print(f" {'[x]' if exists else '[ ]'} {f} ({size} bytes)")
lab2_metrics_log.csv written:
run_id task term corpus_rows unique_terms latency_ms parquet_bytes csv_bytes shuffle_read_bytes shuffle_write_bytes note timestamp
r1 ingest 1526689 0 0.000000 0 0 0 0 corpus ingestion — 1526689 docs, avg 137 chars 2026-04-29T17:48:22.593488
r2 build_index 1526689 11843 11111.599684 123192287 0 0 0 inverted index — 11843 unique terms, shuffle.partitions=8 2026-04-29T17:48:22.593504
r3 query aircraft 1526689 11843 9424.816847 123192287 0 0 0 term='aircraft' freq=1526689 postings=1526689 2026-04-29T17:48:22.593827
r4 query landing 1526689 11843 1498.634577 123192287 0 0 0 term='landing' freq=0 postings=0 2026-04-29T17:48:22.593840
r5 query squawk 1526689 11843 2888.571024 123192287 0 0 0 term='squawk' freq=1526689 postings=1526689 2026-04-29T17:48:22.593845
r6 query altitude 1526689 11843 1583.830118 123192287 0 0 0 term='altitude' freq=1526689 postings=1526689 2026-04-29T17:48:22.593850
r7 query heading 1526689 11843 1655.486584 123192287 0 0 0 term='heading' freq=1526689 postings=1526689 2026-04-29T17:48:22.593875
r8 query airborne 1526689 11843 1361.358166 123192287 0 0 0 term='airborne' freq=1384951 postings=1384951 2026-04-29T17:48:22.593883
r9 footprint 1526689 11843 0.000000 123192287 359635037 0 0 Parquet=123,192,287B CSV=359,635,037B ratio=34.25% 2026-04-29T17:48:22.594100
Proof files:
[x] proof/plan_index_build.txt (2553 bytes)
[x] proof/plan_query.txt (990 bytes)
8. Cleanup
In [10]:
spark.stop()
print("SparkSession closed.")
print("\n── Deliverables checklist ──────────────────────────────────")
checks = [
    ("assignment2_esiee.ipynb", "this notebook"),
    (PARQUET_PATH, "Parquet inverted index"),
    (CSV_PATH, "CSV inverted index"),
    ("proof/plan_index_build.txt", "formatted index build plan"),
    ("proof/plan_query.txt", "formatted query plan"),
    ("lab2_metrics_log.csv", "latency + footprint metrics"),
    ("engineering_note_lab2.md", "one-page engineering note"),
    ("GENAI.md", "AI usage declaration"),
]
for path, desc in checks:
    ok = pathlib.Path(path).exists()
    print(f" {'[x]' if ok else '[ ]'} {path:45s} — {desc}")
print("\n [ ] proof/spark_ui_jobs.png - capture manually from http://localhost:4040")
print(" [ ] proof/spark_ui_sql.png - capture manually from http://localhost:4040/SQL")
SparkSession closed.

── Deliverables checklist ──────────────────────────────────
 [x] assignment2_esiee.ipynb — this notebook
 [x] outputs/lab2/inverted_index — Parquet inverted index
 [x] outputs/lab2/inverted_index_csv — CSV inverted index
 [x] proof/plan_index_build.txt — formatted index build plan
 [x] proof/plan_query.txt — formatted query plan
 [x] lab2_metrics_log.csv — latency + footprint metrics
 [x] engineering_note_lab2.md — one-page engineering note
 [x] GENAI.md — AI usage declaration

 [ ] proof/spark_ui_jobs.png - capture manually from http://localhost:4040
 [ ] proof/spark_ui_sql.png - capture manually from http://localhost:4040/SQL