DE2 - Lab 2: Text Processing - Inverted Index Pipeline (15%)
Author : Badr TAJINI - Data Engineering II (Data-Intensive Workloads) - ESIEE 2025-2026
Track: D - Aviation (METAR weather reports + airport descriptions)
Goal: Ingest a text corpus, tokenize and normalize, build an inverted index, measure query latency, compare Parquet vs CSV storage.
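To make the goal concrete, here is a minimal in-memory sketch of an inverted index in plain Python, with no Spark. Each token maps to the sorted list of distinct doc_ids that contain it, and a single-term query is a dictionary lookup. The helper names build_index and lookup, the naive whitespace tokenizer and the three toy documents are illustrative assumptions. They are not part of the lab's pipeline.

```python
from collections import defaultdict

def build_index(docs):
    """Map each token to the sorted list of distinct doc_ids that contain it."""
    postings = defaultdict(set)
    for doc_id, text in docs:
        for token in text.lower().split():  # naive tokenizer, for illustration only
            postings[token].add(doc_id)     # a set, so repeated tokens count once per doc
    return {token: sorted(ids) for token, ids in postings.items()}

def lookup(index, term):
    """Single-term query: return the postings list, or [] if the term is absent."""
    return index.get(term, [])

docs = [
    ("d1", "aircraft airborne squawk 7700"),
    ("d2", "aircraft on ground"),
    ("d3", "airborne airborne"),
]
index = build_index(docs)
print(lookup(index, "aircraft"))  # ['d1', 'd2']
print(lookup(index, "airborne"))  # ['d1', 'd3']
print(lookup(index, "landing"))   # []
```

The Spark cells below compute roughly the same mapping at scale with groupBy + collect_list, which does not deduplicate doc_ids. They persist the result as Parquet and CSV instead of keeping a dict in memory.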
In [1]:
import os, time, pathlib, json
import pandas as pd
from uuid import UUID
from contextlib import redirect_stdout
from urllib.parse import urlparse
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType
DE2_SPARK_DRIVER_HOST = os.environ.get("DE2_SPARK_DRIVER_HOST", "127.0.0.1")
DE2_SPARK_BIND_ADDRESS = os.environ.get("DE2_SPARK_BIND_ADDRESS", "0.0.0.0")
os.environ.setdefault("SPARK_LOCAL_IP", DE2_SPARK_DRIVER_HOST)
def show_spark_ui(spark_session):
    ui_url = spark_session.sparkContext.uiWebUrl
    print("Spark version:", spark_session.version)
    if ui_url:
        ui_port = urlparse(ui_url).port or 4040
        print("Spark UI:", ui_url)
        print("Spark UI (WSL/Windows browser):", f"http://localhost:{ui_port}")
    else:
        print("Spark UI: not available")
spark = (
    SparkSession.builder
    .appName("DE2-Lab2-TextProcessing")
    .master("local[*]")
    .config("spark.driver.host", DE2_SPARK_DRIVER_HOST)
    .config("spark.driver.bindAddress", DE2_SPARK_BIND_ADDRESS)
    .config("spark.ui.bindAddress", DE2_SPARK_BIND_ADDRESS)
    # Memory: in local mode the executors run inside the driver JVM,
    # so spark.driver.memory is the setting that actually takes effect
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.maxResultSize", "2g")
    # Lower shuffle partitions from the default 200 to 8 (local mode, no more than 8 cores)
    .config("spark.sql.shuffle.partitions", "8")
    .getOrCreate()
)
show_spark_ui(spark)
WARNING: Using incubator modules: jdk.incubator.vector Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 26/04/29 10:57:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark version: 4.0.1
Spark UI: http://127.0.0.1:4040
Spark UI (WSL/Windows browser): http://localhost:4040
1. Corpus Ingestion
Track D - Aviation:
- Source: METAR weather reports + airport descriptions
- Each row = one METAR report (one document), identified by doc_id (station code + timestamp; the synthetic corpus built below uses icao24 + Unix timestamp instead)
- If you only have the OpenSky CSV from Lab 1, the cell below generates a synthetic METAR-like corpus from it for testing (or a small demo corpus if that CSV is missing).
Expected file: data/corpus/aviation_metar.csv with columns doc_id, text
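Every posting in the index refers to a doc_id, so it is worth checking that the expected file has exactly the doc_id, text header and no repeated doc_id before indexing. The following is a minimal sketch using the standard csv module. It assumes the header written by pandas' to_csv. The helper name check_corpus and the three sample rows are hypothetical, and the third row is a deliberate duplicate.

```python
import csv
import io
from collections import Counter

def check_corpus(fileobj):
    """Return (n_rows, sorted duplicate doc_ids) for a doc_id,text CSV; raise on a bad header."""
    reader = csv.DictReader(fileobj)
    if reader.fieldnames != ["doc_id", "text"]:
        raise ValueError(f"unexpected columns: {reader.fieldnames}")
    counts = Counter(row["doc_id"] for row in reader)  # streams rows, keeps only the ids
    dupes = sorted(d for d, n in counts.items() if n > 1)
    return sum(counts.values()), dupes

sample = io.StringIO(
    "doc_id,text\n"
    "4bccb9_1496620800,Aircraft SXS2WY is airborne\n"
    "502cb2_1496620800,Aircraft MON55BR is airborne\n"
    "4bccb9_1496620800,Aircraft SXS2WY is airborne\n"
)
print(check_corpus(sample))  # (3, ['4bccb9_1496620800'])
```

To run it on the real file, use `with open("data/corpus/aviation_metar.csv", newline="") as f: print(check_corpus(f))`.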
In [2]:
# ── Generate a synthetic METAR corpus from your Lab 1 dataset ───
import os, pathlib
import pandas as pd
METAR_CSV = "data/corpus/aviation_metar.csv"
pathlib.Path("data/corpus").mkdir(parents=True, exist_ok=True)
if not os.path.exists(METAR_CSV):
    # Build synthetic documents from the OpenSky CSV used in Lab 1
    LAB1_CSV = "data/raw_aviation/states_2017-06-05-00.csv"
    try:
        df_raw = pd.read_csv(LAB1_CSV)
        # Build one synthetic METAR-like sentence per aircraft record
        def make_metar(row):
            # Caution: NaN is truthy, so `NaN or 0` stays NaN and is rendered as "nan"
            cs = str(row.get('callsign', 'UNKNOWN')).strip() or 'UNKNOWN'
            alt = row.get('baroaltitude', 0) or 0
            vel = row.get('velocity', 0) or 0
            hdg = row.get('heading', 0) or 0
            vrt = row.get('vertrate', 0) or 0
            gnd = 'on ground' if row.get('onground') else 'airborne'
            sq = str(row.get('squawk', '0000'))  # squawk is read as a float, e.g. "1000.0"
            return (
                f"Aircraft {cs} is {gnd} squawk {sq} altitude {alt:.0f} meters "
                f"velocity {vel:.1f} knots heading {hdg:.0f} degrees "
                f"vertical rate {vrt:.1f} meters per second"
            )
        df_raw['doc_id'] = df_raw['icao24'].astype(str) + '_' + df_raw['time'].astype(str)
        df_raw['text'] = df_raw.apply(make_metar, axis=1)
        df_corpus = df_raw[['doc_id', 'text']].dropna()
        df_corpus.to_csv(METAR_CSV, index=False)
        print(f"Synthetic METAR corpus generated: {len(df_corpus)} documents → {METAR_CSV}")
    except FileNotFoundError:
        # Fallback: generate a small demo corpus
        metar_samples = [
            ("LFPG_001", "METAR LFPG 051200Z 27010KT 9999 FEW025 15/08 Q1015 NOSIG aircraft approaching runway two seven"),
            ("LFPO_001", "METAR LFPO 051200Z 26015KT 8000 SCT020 14/09 Q1014 landing clearance issued aircraft on final"),
            ("LFMN_001", "METAR LFMN 051200Z 08010KT 9999 SKC 22/15 Q1018 visibility good aircraft taxiing runway zero four"),
            ("LFLL_001", "METAR LFLL 051200Z 20008KT 6000 BKN015 12/10 Q1013 fog patch aircraft holding short runway three six"),
            ("LFRS_001", "METAR LFRS 051200Z 31020KT 9999 FEW030 10/05 Q1020 wind shear alert aircraft go around"),
            ("LFRN_001", "METAR LFRN 051200Z 29012KT 9999 SCT035 11/06 Q1019 aircraft squawk seven seven zero zero emergency"),
            ("LFBO_001", "METAR LFBO 051200Z 24018KT 9999 FEW020 16/10 Q1016 aircraft airborne departure runway one four right"),
            ("LFBD_001", "METAR LFBD 051200Z 22014KT 9999 SCT025 17/11 Q1015 aircraft landing runway two three heading two two zero"),
        ] * 50  # repeat to get a meaningful corpus
        # Make doc_ids unique
        metar_samples = [(f"{d}_{i}", t) for i, (d, t) in enumerate(metar_samples)]
        pd.DataFrame(metar_samples, columns=['doc_id', 'text']).to_csv(METAR_CSV, index=False)
        print(f"Demo METAR corpus generated: {len(metar_samples)} documents → {METAR_CSV}")
else:
    print(f"Corpus already exists: {METAR_CSV}")
Synthetic METAR corpus generated: 1526689 documents → data/corpus/aviation_metar.csv
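The make_metar helper above uses `value or 0` to default missing fields. NaN is truthy in Python, however, so `NaN or 0` stays NaN and gets formatted as the string "nan". That fits with "nan" being the most frequent term in the index of section 3, with 2,429,395 postings for 1,526,689 documents. The squawk column is also read as a float, so codes appear as 1000.0 and become the token 10000 once punctuation is stripped.

A NaN-safe variant might look like the sketch below. The names make_metar_safe, is_missing and fmt_num are hypothetical. Writing "unknown" for missing values is an arbitrary choice, and dropping the clause would also work. The sentence template is kept unchanged, although OpenSky documents velocity in m/s, so the "knots" label may deserve a second look.

```python
import math

def is_missing(value):
    """True for None and float NaN (numpy.float64 subclasses float, so it is covered)."""
    return value is None or (isinstance(value, float) and math.isnan(value))

def fmt_num(value, spec, default="unknown"):
    """Format a numeric field, emitting `default` instead of the literal 'nan'."""
    return default if is_missing(value) else format(value, spec)

def make_metar_safe(row):
    """Hypothetical NaN-safe variant of make_metar; `row` may be a dict or a pandas Series."""
    cs = row.get("callsign")
    cs = "UNKNOWN" if is_missing(cs) else (str(cs).strip() or "UNKNOWN")
    og = row.get("onground")
    gnd = "unknown" if is_missing(og) else ("on ground" if og in (True, "True") else "airborne")
    return (
        f"Aircraft {cs} is {gnd} squawk {fmt_num(row.get('squawk'), '04.0f')} "  # 1000.0 -> 1000
        f"altitude {fmt_num(row.get('baroaltitude'), '.0f')} meters "
        f"velocity {fmt_num(row.get('velocity'), '.1f')} knots "
        f"heading {fmt_num(row.get('heading'), '.0f')} degrees "
        f"vertical rate {fmt_num(row.get('vertrate'), '.1f')} meters per second"
    )

row = {"callsign": float("nan"), "onground": False, "squawk": 1000.0,
       "baroaltitude": 7460.2, "velocity": float("nan"), "heading": 305.0, "vertrate": 0.0}
print(make_metar_safe(row))
```

The generation cell skips existing files, so regenerating the corpus would require deleting data/corpus/aviation_metar.csv first and then using df_raw.apply(make_metar_safe, axis=1).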
In [2]:
# ── Load corpus with explicit schema ────────────────────────────────────────
schema = StructType([
StructField("doc_id", StringType(), False),
StructField("text", StringType(), True),
])
CORPUS_PATH = "data/corpus/aviation_metar.csv"
df_corpus = (
spark.read
.schema(schema)
.option("header", "true")
.csv(CORPUS_PATH)
)
# Basic stats
df_corpus = df_corpus.withColumn("doc_len", F.length("text"))
n_docs = df_corpus.count()
avg_len = df_corpus.select(F.avg("doc_len")).first()[0] or 0
print(f"Documents : {n_docs:,}")
print(f"Avg length : {avg_len:.0f} chars")
df_corpus.show(5, truncate=100)
Documents : 1,526,689
Avg length : 137 chars
+-----------------+----------------------------------------------------------------------------------------------------+-------+
|           doc_id|                                                                                                text|doc_len|
+-----------------+----------------------------------------------------------------------------------------------------+-------+
|4bccb9_1496620800|Aircraft SXS2WY is airborne squawk 1000.0 altitude 7460 meters velocity 175.3 knots heading 305 d...|    139|
|502cb2_1496620800|Aircraft MON55BR is airborne squawk 2770.0 altitude 10988 meters velocity 232.9 knots heading 305...|    142|
|4bccaf_1496620800|Aircraft SXS7R is airborne squawk 3215.0 altitude 11582 meters velocity 196.7 knots heading 289 d...|    139|
|4008e1_1496620800|Aircraft TCX229 is airborne squawk 3462.0 altitude 10363 meters velocity 214.2 knots heading 293 ...|    140|
|4070e3_1496620800|Aircraft EXS14D is airborne squawk 7775.0 altitude 10973 meters velocity 221.8 knots heading 298 ...|    140|
+-----------------+----------------------------------------------------------------------------------------------------+-------+
only showing top 5 rows
2. Text Normalization - Lowercase, Tokenize, Remove Stop-Words
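Pipeline:
- Lowercase + strip non-alphanumeric characters with regexp_replace
- Split on whitespace/punctuation: split(text, "[\\s\\W]+")
- Explode array → one row per (doc_id, token)
- Drop empty and single-character tokens, then filter stop-words with isin() against a small Python set (Spark embeds the values as literals in the filter predicate rather than broadcasting them)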
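The same normalization can be reproduced in plain Python to see what it does to METAR-style text. Non-alphanumeric characters are deleted before splitting, replaced by the empty string rather than a space, so adjacent pieces get glued together. The squawk value 1000.0 becomes the token 10000 (visible in the output below), and a temperature/dew-point group such as 15/08 would become 1508. After this cleaning step, the split pattern only ever encounters whitespace.

This sketch assumes Python's re behaves like Spark's Java regex for this ASCII-only input. The function normalize and the reduced STOP_WORDS subset are illustrative.

```python
import re

# Small illustrative subset of the notebook's STOP_WORDS set
STOP_WORDS = {"is", "on", "per", "meters", "knots", "degrees", "rate", "second"}

def normalize(text):
    """Mirror of the Spark steps: lowercase, strip non-alphanumerics, split, filter."""
    cleaned = re.sub(r"[^a-z0-9\s]", "", text.lower())  # like F.regexp_replace(F.lower(...), ..., "")
    tokens = re.split(r"[\s\W]+", cleaned)              # like F.split(..., r"[\s\W]+")
    return [t for t in tokens if len(t) > 1 and t not in STOP_WORDS]

print(normalize("Aircraft SXS2WY is airborne squawk 1000.0 altitude 7460 meters"))
# ['aircraft', 'sxs2wy', 'airborne', 'squawk', '10000', 'altitude', '7460']
print(normalize("METAR LFPG 051200Z 27010KT 15/08 Q1015"))
# ['metar', 'lfpg', '051200z', '27010kt', '1508', 'q1015']
```

Passing " " instead of "" as the replacement in F.regexp_replace would keep such pieces as separate tokens. "15/08" would then yield "15" and "08", and the ".0" of a float squawk would become a single-character token that the length filter drops.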
In [3]:
# ── Step 1-2: Lowercase + tokenize ──────────────────────────────────────────
df_clean = df_corpus.withColumn(
"text_clean",
F.regexp_replace(F.lower(F.col("text")), r"[^a-z0-9\s]", "")
)
df_tokens = df_clean.withColumn(
"tokens",
F.split(F.col("text_clean"), r"[\s\W]+") # split on whitespace/punctuation
)
# ── Step 3: Explode → (doc_id, token) ────────────────────────────────────────
df_exploded = (
df_tokens
.select("doc_id", F.explode("tokens").alias("token"))
.filter(F.length("token") > 1) # drop single-char and empty tokens
)
total_before = df_exploded.count()
print(f"Total tokens BEFORE stop-word removal: {total_before:,}")
# ── Step 4: Remove stop-words ────────────────────────────────────────────────
# Aviation-aware stop list: generic English + common METAR filler words
STOP_WORDS = {
"the", "a", "an", "is", "are", "was", "were", "in", "on", "at",
"to", "for", "of", "and", "or", "not", "it", "this", "that", "with",
"be", "by", "as", "from", "but", "its", "if", "do", "no", "so",
"per", "has", "have", "had", "will", "would", "may", "can",
# METAR fillers
"meters", "meter", "degrees", "knots", "rate", "second", "seconds"
}
df_filtered = df_exploded.filter(~F.col("token").isin(STOP_WORDS))
total_after = df_filtered.count()
print(f"Total tokens AFTER stop-word removal: {total_after:,}")
print(f"Stop-words removed : {total_before - total_after:,} ({(total_before - total_after)/total_before*100:.1f}%)")
df_filtered.show(10, truncate=60)
Total tokens BEFORE stop-word removal: 32,173,700
Total tokens AFTER stop-word removal: 19,818,447
Stop-words removed : 12,355,253 (38.4%)
+-----------------+--------+
|           doc_id|   token|
+-----------------+--------+
|4bccb9_1496620800|aircraft|
|4bccb9_1496620800|  sxs2wy|
|4bccb9_1496620800|airborne|
|4bccb9_1496620800|  squawk|
|4bccb9_1496620800|   10000|
|4bccb9_1496620800|altitude|
|4bccb9_1496620800|    7460|
|4bccb9_1496620800|velocity|
|4bccb9_1496620800|    1753|
|4bccb9_1496620800| heading|
+-----------------+--------+
only showing top 10 rows
3. Build Inverted Index
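Group by token → collect the list of doc_ids (postings list) + freq, the number of postings for that token. collect_list keeps one entry per token occurrence, so freq equals the document frequency only when a token appears at most once per document. "nan", which can occur several times in one report, ends up with more postings (2,429,395) than there are documents (1,526,689).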
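To see the difference between a postings count and a document frequency, here is a plain-Python model of the aggregation over exploded (doc_id, token) rows. index_stats is a hypothetical helper, and the pairs are toy data. In Spark, one way to get a document-frequency column would be to apply .distinct() to df_filtered before the groupBy. Another would be to aggregate with F.collect_set("doc_id") and take F.size of the result. Neither variant was run in this lab.

```python
from collections import defaultdict

def index_stats(pairs):
    """From exploded (doc_id, token) pairs, compute per token the postings count
    (what F.count("*") after groupBy gives) and the document frequency (distinct doc_ids)."""
    postings = defaultdict(list)
    for doc_id, token in pairs:
        postings[token].append(doc_id)           # like F.collect_list("doc_id"): keeps duplicates
    return {
        token: {"n_postings": len(ids),           # like F.count("*")
                "doc_freq": len(set(ids))}        # like F.size(F.collect_set("doc_id"))
        for token, ids in postings.items()
    }

# One document where 'nan' occurs twice (e.g. missing velocity and heading)
pairs = [("d1", "aircraft"), ("d1", "nan"), ("d1", "nan"), ("d2", "aircraft")]
print(index_stats(pairs)["nan"])  # {'n_postings': 2, 'doc_freq': 1}
```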
In [4]:
# ── Build inverted index ─────────────────────────────────────────────────────
inverted_index = (
    df_filtered
    .groupBy("token")
    .agg(
        F.collect_list("doc_id").alias("doc_ids"),  # postings list (one entry per occurrence)
        F.count("*").alias("freq")                  # number of postings, not distinct documents
    )
    .orderBy(F.desc("freq"))  # most frequent terms first
)
n_terms = inverted_index.count()
print(f"Unique terms in index: {n_terms:,}")
# Show top terms (most frequent)
inverted_index.show(20, truncate=80)
Unique terms in index: 11,843
+--------+--------------------------------------------------------------------------------+-------+
|   token|                                                                         doc_ids|   freq|
+--------+--------------------------------------------------------------------------------+-------+
|     nan|[7c6c27_1496621230, ad4fe1_1496620810, 7c6c27_1496621230, ad4fe1_1496620810, ...|2429395|
|aircraft|[7c6c27_1496621230, 4ca351_1496620800, a0f46f_1496621230, 400ff4_1496620800, ...|1526689|
|  squawk|[7c6d9c_1496621230, 4ca351_1496620800, 7c6c27_1496621230, 400ff4_1496620800, ...|1526689|
|altitude|[7c6d9c_1496621230, 4ca351_1496620800, 7c6c27_1496621230, 400ff4_1496620800, ...|1526689|
|velocity|[7c6d9c_1496621230, 4ca351_1496620800, 7c6c27_1496621230, 400ff4_1496620800, ...|1526689|
|vertical|[7c6d9c_1496621230, 4ca351_1496620800, 7c6c27_1496621230, 400ff4_1496620800, ...|1526689|
| heading|[7c6d9c_1496621230, 4ca351_1496620800, 7c6c27_1496621230, 400ff4_1496620800, ...|1526689|
|airborne|[7c6d9c_1496621230, 4ca351_1496620800, a0f46f_1496621230, 400ff4_1496620800, ...|1384951|
|      00|[71c258_1496621240, a0f566_1496621650, a684c4_1496621240, 4baa90_1496621650, ...| 405336|
|      03|[a657bf_1496621650, abd90c_1496621240, ad5623_1496621650, a1fb8f_1496621240, ...| 234044|
|  ground|[7c6c27_1496621230, ad4fe1_1496620810, adda1e_1496621230, a95d95_1496620810, ...| 141738|
|   10973|[ab6cc9_1496621650, 4841c5_1496621240, 4841c5_1496621650, 40643e_1496621240, ...|  72734|
|   11278|[06a046_1496621650, a3ef1b_1496621240, a93d9f_1496621650, ad4cf2_1496621240, ...|  58302|
|   10668|[a699b0_1496621650, abe3e6_1496621240, 4ca068_1496621650, 71c043_1496621240, ...|  57388|
|   10363|[a33327_1496621650, 71c258_1496621240, a1512f_1496621650, a5a11a_1496621240, ...|  51652|
|   11582|[401000_1496621650, a82520_1496621240, c0809d_1496621650, a5fd2d_1496621240, ...|  49322|
|   11887|[a2396e_1496621650, 8a04bd_1496621240, a1c66a_1496621650, a582a9_1496621240, ...|  28690|
|      07|[aa7115_1496621650, a9c5c5_1496621240, a05d0c_1496621650, acdde3_1496621240, ...|  28106|
|     180|[338086_1496621230, 578055_1496620810, 3382f1_1496621230, a4caec_1496620810, ...|  28089|
|   10058|[a0f566_1496621650, a769a3_1496621240, a68daf_1496621650, a00ce0_1496621240, ...|  26290|
+--------+--------------------------------------------------------------------------------+-------+
only showing top 20 rows
4. Write Inverted Index - Parquet and CSV
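- Parquet: native array type for doc_ids; the columnar layout allows column pruning and pushdown of the token filter
- CSV: the doc_ids array is serialized as a pipe-separated string (concat_ws("|", ...)) → larger footprint, no column pruning, and the array must be re-split on read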
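A small round-trip sketch shows what the CSV layout costs on the read side. The array type is lost, so every consumer has to split doc_ids on "|" again and convert freq back to an integer. The sketch assumes no doc_id contains "|", which holds for the icao24_timestamp ids used here. write_index_csv and read_index_csv are hypothetical helpers built on Python's csv module, not Spark's CSV reader.

```python
import csv
import io

def write_index_csv(rows, fileobj):
    """Write (token, doc_ids, freq) rows, joining each postings list with '|'
    (the same layout F.concat_ws("|", "doc_ids") produces)."""
    writer = csv.writer(fileobj)
    writer.writerow(["token", "doc_ids", "freq"])
    for token, doc_ids, freq in rows:
        writer.writerow([token, "|".join(doc_ids), freq])

def read_index_csv(fileobj):
    """Parse the CSV back: doc_ids must be split again and freq converted from text."""
    return {
        row["token"]: (row["doc_ids"].split("|") if row["doc_ids"] else [], int(row["freq"]))
        for row in csv.DictReader(fileobj)
    }

buf = io.StringIO()
write_index_csv([("aircraft", ["d1", "d2"], 2), ("squawk", ["d1"], 1)], buf)
buf.seek(0)
print(read_index_csv(buf)["aircraft"])  # (['d1', 'd2'], 2)
```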
In [5]:
PARQUET_PATH = "outputs/lab2/inverted_index"
CSV_PATH = "outputs/lab2/inverted_index_csv"
pathlib.Path(PARQUET_PATH).mkdir(parents=True, exist_ok=True)
pathlib.Path(CSV_PATH).mkdir(parents=True, exist_ok=True)
# ── Write Parquet ────────────────────────────────────────────────────────────
t0 = time.time()
inverted_index.write.mode("overwrite").parquet(PARQUET_PATH)
parquet_write_ms = (time.time() - t0) * 1000
print(f"Parquet written in {parquet_write_ms:.0f} ms → {PARQUET_PATH}")
# ── Write CSV (doc_ids array → pipe-separated string) ────────────────────────
t0 = time.time()
(
inverted_index
.withColumn("doc_ids", F.concat_ws("|", "doc_ids")) # serialize array for CSV
.write
.mode("overwrite")
.option("header", "true")
.csv(CSV_PATH)
)
csv_write_ms = (time.time() - t0) * 1000
print(f"CSV written in {csv_write_ms:.0f} ms → {CSV_PATH}")
Parquet written in 36354 ms → outputs/lab2/inverted_index
CSV written in 29962 ms → outputs/lab2/inverted_index_csv
5. Query Latency Measurement
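Read the index back from Parquet (simulating a production query), cache it, then look up aviation-relevant terms. "landing" only occurs in the fallback demo corpus, so it is expected to be missing from the OpenSky-based synthetic corpus.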
Measure wall-clock latency with time.time() and capture the physical plan.
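time.time() reads the wall clock, which can be adjusted while a measurement runs. time.perf_counter() is monotonic and intended for measuring intervals. Single measurements are also noisy. In the results below, lookups for terms with similar postings sizes drop from about 4.5 s to 1.0 s as the loop proceeds, which suggests warm-up effects. Each successful lookup also collects a postings list of up to about 1.5 million doc_ids to the driver.

The sketch below repeats a query and reports the median. time_query is a hypothetical name, and the notebook would call it as time_query(lambda: idx.filter(F.col("token") == term).collect()). Timing a variant that selects only the freq column as well would help separate lookup cost from transfer cost.

```python
import statistics
import time

def time_query(fn, repeats=5, warmup=1):
    """Hypothetical helper: call `fn` warmup+repeats times and return
    (median_ms, samples_ms) over the timed runs, using the monotonic perf_counter."""
    for _ in range(warmup):
        fn()  # discarded: the first call may pay one-off costs (e.g. JVM warm-up)
    samples = []
    for _ in range(repeats):
        t0 = time.perf_counter()
        fn()
        samples.append((time.perf_counter() - t0) * 1000)
    return statistics.median(samples), samples

median_ms, samples = time_query(lambda: sum(range(100_000)))
print(f"median {median_ms:.2f} ms over {len(samples)} runs")
```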
In [ ]:
# ── Reload index from Parquet + cache ────────────────────────────────────────
idx = spark.read.parquet(PARQUET_PATH)
idx.cache()
idx.count() # trigger cache population
print("Index cached.")
print(f"Schema: {idx.dtypes}")
# ── Save index build plan ────────────────────────────────────────────────────
pathlib.Path("proof").mkdir(parents=True, exist_ok=True)
with open("proof/plan_index_build.txt", "w") as f:
    with redirect_stdout(f):
        inverted_index.explain(mode="formatted")
print("plan_index_build.txt saved.")
# ── Query 6 aviation-relevant terms ─────────────────────────────────────────
# Terms expected to appear in METAR reports / aviation text
QUERY_TERMS = ["aircraft", "landing", "squawk", "altitude", "heading", "airborne"]
query_results = []
for term in QUERY_TERMS:
    t0 = time.time()
    rows = idx.filter(F.col("token") == term).collect()
    latency_ms = (time.time() - t0) * 1000
    if rows:
        r = rows[0]
        print(f" '{term}': freq={r['freq']:,}, postings={len(r['doc_ids']):,}, latency={latency_ms:.1f} ms")
        query_results.append({"term": term, "freq": r["freq"],
                              "n_postings": len(r["doc_ids"]), "latency_ms": latency_ms})
    else:
        print(f" '{term}': NOT FOUND, latency={latency_ms:.1f} ms")
        query_results.append({"term": term, "freq": 0, "n_postings": 0, "latency_ms": latency_ms})
# ── Save query plan (first term) ─────────────────────────────────────────────
with open("proof/plan_query.txt", "w") as f:
    with redirect_stdout(f):
        idx.filter(F.col("token") == QUERY_TERMS[0]).explain(mode="formatted")
print("proof/plan_query.txt saved.")
Index cached.
Schema: [('token', 'string'), ('doc_ids', 'array<string>'), ('freq', 'bigint')]
plan_index_build.txt saved.
'aircraft': freq=1,526,689, postings=1,526,689, latency=4494.6 ms
'landing': NOT FOUND, latency=87.3 ms
'squawk': freq=1,526,689, postings=1,526,689, latency=3309.1 ms
'altitude': freq=1,526,689, postings=1,526,689, latency=2026.1 ms
'heading': freq=1,526,689, postings=1,526,689, latency=1341.2 ms
'airborne': freq=1,384,951, postings=1,384,951, latency=1001.5 ms
proof/plan_query.txt saved.
6. Storage Footprint Comparison - Parquet vs CSV
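The helpers in the next cell sum every file under each output directory. On a local filesystem, Spark writes usually leave a _SUCCESS marker and Hadoop .crc checksum files next to the part-* data files, so the reported file counts likely include them. The sketch below separates data files from everything else. It assumes data files are the ones whose names start with part-. footprint is a hypothetical helper, and the demo builds a fake output folder in a temporary directory.

```python
import pathlib
import tempfile

def footprint(path):
    """Split an output directory's size into part-* data files and everything else
    (e.g. the _SUCCESS marker and hidden .crc checksum files)."""
    sizes = {"data_bytes": 0, "other_bytes": 0}
    for f in pathlib.Path(path).rglob("*"):  # pathlib's "*" also matches dotfiles
        if f.is_file():
            key = "data_bytes" if f.name.startswith("part-") else "other_bytes"
            sizes[key] += f.stat().st_size
    return sizes

# Demo: a fake Spark-style output folder
with tempfile.TemporaryDirectory() as d:
    out = pathlib.Path(d)
    (out / "part-00000.snappy.parquet").write_bytes(b"x" * 100)
    (out / ".part-00000.snappy.parquet.crc").write_bytes(b"y" * 12)
    (out / "_SUCCESS").write_bytes(b"")
    result = footprint(out)
print(result)  # {'data_bytes': 100, 'other_bytes': 12}
```

Applied to PARQUET_PATH and CSV_PATH, this would give a Parquet vs CSV ratio based on data files only.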
In [7]:
# ── On-disk footprint ────────────────────────────────────────────────────────
def dir_size_bytes(path):
    return sum(
        f.stat().st_size
        for f in pathlib.Path(path).rglob("*")
        if f.is_file()
    )
def dir_file_count(path):
    return sum(1 for f in pathlib.Path(path).rglob("*") if f.is_file())
parquet_bytes = dir_size_bytes(PARQUET_PATH)
csv_bytes = dir_size_bytes(CSV_PATH)
parquet_files = dir_file_count(PARQUET_PATH)
csv_files = dir_file_count(CSV_PATH)
ratio = parquet_bytes / csv_bytes if csv_bytes > 0 else 0
print(f"Parquet : {parquet_bytes:>12,} bytes ({parquet_files} files)")
print(f"CSV : {csv_bytes:>12,} bytes ({csv_files} files)")
print(f"Ratio : {ratio:.2%} (Parquet is {(1-ratio)*100:.1f}% smaller)" if ratio < 1
else f"Ratio : {ratio:.2%}")
# ── Fill metrics log ─────────────────────────────────────────────────────────
import datetime
rows = [
{"run_id": "r1", "task": "ingest", "term": "",
"corpus_rows": n_docs, "unique_terms": 0,
"latency_ms": 0, "parquet_bytes": 0, "csv_bytes": 0,
"shuffle_read_bytes": 0, "shuffle_write_bytes": 0,
"note": f"corpus ingestion ({n_docs} docs)",
"timestamp": datetime.datetime.now().isoformat()},
{"run_id": "r2", "task": "build_index", "term": "",
"corpus_rows": n_docs, "unique_terms": n_terms,
"latency_ms": parquet_write_ms, "parquet_bytes": parquet_bytes, "csv_bytes": 0,
"shuffle_read_bytes": 0, "shuffle_write_bytes": 0,
"note": f"inverted index construction ({n_terms} unique terms)",
"timestamp": datetime.datetime.now().isoformat()},
{"run_id": "r6", "task": "footprint", "term": "",
"corpus_rows": n_docs, "unique_terms": n_terms,
"latency_ms": 0, "parquet_bytes": parquet_bytes, "csv_bytes": csv_bytes,
"shuffle_read_bytes": 0, "shuffle_write_bytes": 0,
"note": f"Parquet vs CSV — ratio {ratio:.2%}",
"timestamp": datetime.datetime.now().isoformat()},
]
# Query rows get run_ids r3..r8. Since `rows` already holds 3 entries, insert(i) with
# i >= 3 places them after the footprint row, whose hard-coded "r6" then repeats.
for i, qr in enumerate(query_results, start=3):
    rows.insert(i, {
        "run_id": f"r{i}", "task": "query", "term": qr["term"],
        "corpus_rows": n_docs, "unique_terms": n_terms,
        "latency_ms": qr["latency_ms"], "parquet_bytes": parquet_bytes, "csv_bytes": 0,
        "shuffle_read_bytes": 0, "shuffle_write_bytes": 0,
        "note": f"single-term lookup '{qr['term']}' freq={qr['freq']} postings={qr['n_postings']}",
        "timestamp": datetime.datetime.now().isoformat(),
    })
df_metrics = pd.DataFrame(rows)
df_metrics.to_csv("lab2_metrics_log.csv", index=False)
print("\nlab2_metrics_log.csv written:")
print(df_metrics.to_string(index=False))
Parquet : 123,192,287 bytes (6 files)
CSV : 359,635,037 bytes (6 files)
Ratio : 34.25% (Parquet is 65.7% smaller)
lab2_metrics_log.csv written:
run_id task term corpus_rows unique_terms latency_ms parquet_bytes csv_bytes shuffle_read_bytes shuffle_write_bytes note timestamp
r1 ingest 1526689 0 0.000000 0 0 0 0 corpus ingestion (1526689 docs) 2026-04-29T11:03:06.186400
r2 build_index 1526689 11843 36353.927851 123192287 0 0 0 inverted index construction (11843 unique terms) 2026-04-29T11:03:06.186868
r6 footprint 1526689 11843 0.000000 123192287 359635037 0 0 Parquet vs CSV — ratio 34.25% 2026-04-29T11:03:06.186873
r3 query aircraft 1526689 11843 4494.558096 123192287 0 0 0 single-term lookup 'aircraft' freq=1526689 postings=1526689 2026-04-29T11:03:06.187299
r4 query landing 1526689 11843 87.278605 123192287 0 0 0 single-term lookup 'landing' freq=0 postings=0 2026-04-29T11:03:06.187306
r5 query squawk 1526689 11843 3309.077024 123192287 0 0 0 single-term lookup 'squawk' freq=1526689 postings=1526689 2026-04-29T11:03:06.187308
r6 query altitude 1526689 11843 2026.100159 123192287 0 0 0 single-term lookup 'altitude' freq=1526689 postings=1526689 2026-04-29T11:03:06.187311
r7 query heading 1526689 11843 1341.185093 123192287 0 0 0 single-term lookup 'heading' freq=1526689 postings=1526689 2026-04-29T11:03:06.187314
r8 query airborne 1526689 11843 1001.450777 123192287 0 0 0 single-term lookup 'airborne' freq=1384951 postings=1384951 2026-04-29T11:03:06.187316
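In the metrics cell, the footprint row is created with a hard-coded run_id "r6". rows.insert(i, ...) with i starting at 3 then places every query row after it, because the list already holds three rows. With six query terms, the printed log therefore shows the footprint row before the queries and two rows labelled r6 (footprint and the 'altitude' query).

The sketch below builds the log in execution order and derives the ids instead. build_metric_rows is a hypothetical helper, and the dicts are placeholders for the full log rows.

```python
def build_metric_rows(ingest, build, queries, footprint):
    """Hypothetical helper: return metric rows in execution order, numbering
    run_id sequentially so no id is hard-coded or duplicated."""
    ordered = [ingest, build, *queries, footprint]
    return [{**row, "run_id": f"r{i}"} for i, row in enumerate(ordered, start=1)]

# Placeholder rows: in the notebook each dict would carry all the log columns
terms = ["aircraft", "landing", "squawk", "altitude", "heading", "airborne"]
metric_rows = build_metric_rows(
    {"task": "ingest"},
    {"task": "build_index"},
    [{"task": "query", "term": t} for t in terms],
    {"task": "footprint"},
)
print([(r["run_id"], r["task"]) for r in metric_rows][-2:])  # [('r8', 'query'), ('r9', 'footprint')]
```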
In [8]:
spark.stop()
print("Lab 2 complete.")
print("\nDeliverables checklist:")
print(" [x] outputs/lab2/inverted_index/ — Parquet index")
print(" [x] outputs/lab2/inverted_index_csv/ — CSV index")
print(" [x] proof/plan_index_build.txt")
print(" [x] proof/plan_query.txt")
print(" [x] lab2_metrics_log.csv")
print(" [ ] Spark UI screenshots — capture manually from http://localhost:4040")
Lab 2 complete.

Deliverables checklist:
 [x] outputs/lab2/inverted_index/ — Parquet index
 [x] outputs/lab2/inverted_index_csv/ — CSV index
 [x] proof/plan_index_build.txt
 [x] proof/plan_query.txt
 [x] lab2_metrics_log.csv
 [ ] Spark UI screenshots — capture manually from http://localhost:4040