DE2 - Lab 2: Text Processing - Inverted Index Pipeline (15%)¶

Author: Badr TAJINI - Data Engineering II (Data-Intensive Workloads) - ESIEE 2025-2026

Track: D - Aviation (METAR weather reports + airport descriptions)

Goal: Ingest a text corpus, tokenize and normalize, build an inverted index, measure query latency, compare Parquet vs CSV storage.

In [1]:
import os, time, pathlib, json
import pandas as pd
from uuid import UUID
from contextlib import redirect_stdout
from urllib.parse import urlparse
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType

DE2_SPARK_DRIVER_HOST  = os.environ.get("DE2_SPARK_DRIVER_HOST",  "127.0.0.1")
DE2_SPARK_BIND_ADDRESS = os.environ.get("DE2_SPARK_BIND_ADDRESS", "0.0.0.0")
os.environ.setdefault("SPARK_LOCAL_IP", DE2_SPARK_DRIVER_HOST)

def show_spark_ui(spark_session):
    ui_url = spark_session.sparkContext.uiWebUrl
    print("Spark version:", spark_session.version)
    if ui_url:
        ui_port = urlparse(ui_url).port or 4040
        print("Spark UI:", ui_url)
        print("Spark UI (WSL/Windows browser):", f"http://localhost:{ui_port}")
    else:
        print("Spark UI: not available")

spark = (
    SparkSession.builder
    .appName("DE2-Lab2-TextProcessing")
    .master("local[*]")
    .config("spark.driver.host",        DE2_SPARK_DRIVER_HOST)
    .config("spark.driver.bindAddress", DE2_SPARK_BIND_ADDRESS)
    .config("spark.ui.bindAddress",     DE2_SPARK_BIND_ADDRESS)
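    # Memory configuration (in local mode the executors run inside the driver JVM,
    # so spark.driver.memory is the setting that matters)
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.maxResultSize", "2g")
    # Use 8 shuffle partitions instead of the default 200; this assumes at most 8 local cores
    .config("spark.sql.shuffle.partitions", "8")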
    .getOrCreate()
)

show_spark_ui(spark)
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/04/29 10:57:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark version: 4.0.1
Spark UI: http://127.0.0.1:4040
Spark UI (WSL/Windows browser): http://localhost:4040

1. Corpus Ingestion¶

Track D - Aviation:

  • Source: METAR weather reports + airport descriptions
  • Each row = one METAR report (one document), identified by doc_id (station code + timestamp)
  • If you only have the OpenSky CSV from Lab 1, the cell below generates a synthetic METAR-like corpus from it for testing; if that file is missing too, it falls back to a small demo corpus.

Expected file: data/corpus/aviation_metar.csv with columns doc_id, text
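
Before handing the file to Spark, the header can be checked cheaply with the standard library. The sketch below is only an illustration: `check_corpus_header` and the temporary demo file are hypothetical and not part of the lab code.

```python
import csv
import os
import tempfile

EXPECTED_COLUMNS = ["doc_id", "text"]

def check_corpus_header(path):
    """Return True if the first CSV row is exactly the expected header."""
    with open(path, newline="", encoding="utf-8") as fh:
        header = next(csv.reader(fh), [])
    return header == EXPECTED_COLUMNS

# Demo on a throwaway file so the sketch runs without the real corpus.
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "aviation_metar.csv")
    with open(demo_path, "w", newline="", encoding="utf-8") as fh:
        writer = csv.writer(fh)
        writer.writerow(EXPECTED_COLUMNS)
        writer.writerow(["LFPG_001", "METAR LFPG 051200Z 27010KT 9999"])
    header_ok = check_corpus_header(demo_path)

print("header ok:", header_ok)
```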

In [2]:
# ── Generate a synthetic METAR corpus from your Lab 1 dataset ───
import os, pathlib
import pandas as pd

METAR_CSV = "data/corpus/aviation_metar.csv"
pathlib.Path("data/corpus").mkdir(parents=True, exist_ok=True)

if not os.path.exists(METAR_CSV):
    # Build synthetic documents from the OpenSky CSV used in Lab 1
    LAB1_CSV = "data/raw_aviation/states_2017-06-05-00.csv"
    try:
        df_raw = pd.read_csv(LAB1_CSV)
        # Build one synthetic METAR-like sentence per aircraft record
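        def make_metar(row):
            # Caveat: `x or 0` only replaces None/0/empty values. A missing numeric
            # field read by pandas is NaN, which is truthy, so it is kept and
            # formatted as the string "nan" in the sentence below.
            cs  = str(row.get('callsign', 'UNKNOWN')).strip() or 'UNKNOWN'
            alt = row.get('baroaltitude', 0) or 0
            vel = row.get('velocity', 0) or 0
            hdg = row.get('heading', 0) or 0
            vrt = row.get('vertrate', 0) or 0
            gnd = 'on ground' if row.get('onground') else 'airborne'
            # squawk is formatted as-is, so a float column yields values like "1000.0"
            sq  = str(row.get('squawk', '0000'))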
            return (
                f"Aircraft {cs} is {gnd} squawk {sq} altitude {alt:.0f} meters "
                f"velocity {vel:.1f} knots heading {hdg:.0f} degrees "
                f"vertical rate {vrt:.1f} meters per second"
            )
        df_raw['doc_id'] = df_raw['icao24'].astype(str) + '_' + df_raw['time'].astype(str)
        df_raw['text']   = df_raw.apply(make_metar, axis=1)
        df_corpus = df_raw[['doc_id', 'text']].dropna()
        df_corpus.to_csv(METAR_CSV, index=False)
        print(f"Synthetic METAR corpus generated: {len(df_corpus)} documents → {METAR_CSV}")
    except FileNotFoundError:
        # Fallback: generate a small demo corpus
        metar_samples = [
            ("LFPG_001", "METAR LFPG 051200Z 27010KT 9999 FEW025 15/08 Q1015 NOSIG aircraft approaching runway two seven"),
            ("LFPO_001", "METAR LFPO 051200Z 26015KT 8000 SCT020 14/09 Q1014 landing clearance issued aircraft on final"),
            ("LFMN_001", "METAR LFMN 051200Z 08010KT 9999 SKC 22/15 Q1018 visibility good aircraft taxiing runway zero four"),
            ("LFLL_001", "METAR LFLL 051200Z 20008KT 6000 BKN015 12/10 Q1013 fog patch aircraft holding short runway three six"),
            ("LFRS_001", "METAR LFRS 051200Z 31020KT 9999 FEW030 10/05 Q1020 wind shear alert aircraft go around"),
            ("LFRN_001", "METAR LFRN 051200Z 29012KT 9999 SCT035 11/06 Q1019 aircraft squawk seven seven zero zero emergency"),
            ("LFBO_001", "METAR LFBO 051200Z 24018KT 9999 FEW020 16/10 Q1016 aircraft airborne departure runway one four right"),
            ("LFBD_001", "METAR LFBD 051200Z 22014KT 9999 SCT025 17/11 Q1015 aircraft landing runway two three heading two two zero"),
        ] * 50  # repeat to get a meaningful corpus
        # Make doc_ids unique
        metar_samples = [(f"{d}_{i}", t) for i, (d, t) in enumerate(metar_samples)]
        pd.DataFrame(metar_samples, columns=['doc_id','text']).to_csv(METAR_CSV, index=False)
        print(f"Demo METAR corpus generated: {len(metar_samples)} documents → {METAR_CSV}")
else:
    print(f"Corpus already exists: {METAR_CSV}")
Synthetic METAR corpus generated: 1526689 documents → data/corpus/aviation_metar.csv
In [2]:
# ── Load corpus with explicit schema ────────────────────────────────────────
schema = StructType([
    StructField("doc_id", StringType(), False),
    StructField("text",   StringType(), True),
])

CORPUS_PATH = "data/corpus/aviation_metar.csv"

df_corpus = (
    spark.read
    .schema(schema)
    .option("header", "true")
    .csv(CORPUS_PATH)
)

# Basic stats
df_corpus = df_corpus.withColumn("doc_len", F.length("text"))
n_docs    = df_corpus.count()
avg_len   = df_corpus.select(F.avg("doc_len")).first()[0] or 0

print(f"Documents  : {n_docs:,}")
print(f"Avg length : {avg_len:.0f} chars")
df_corpus.show(5, truncate=100)
Documents  : 1,526,689
Avg length : 137 chars
+-----------------+----------------------------------------------------------------------------------------------------+-------+
|           doc_id|                                                                                                text|doc_len|
+-----------------+----------------------------------------------------------------------------------------------------+-------+
|4bccb9_1496620800|Aircraft SXS2WY is airborne squawk 1000.0 altitude 7460 meters velocity 175.3 knots heading 305 d...|    139|
|502cb2_1496620800|Aircraft MON55BR is airborne squawk 2770.0 altitude 10988 meters velocity 232.9 knots heading 305...|    142|
|4bccaf_1496620800|Aircraft SXS7R is airborne squawk 3215.0 altitude 11582 meters velocity 196.7 knots heading 289 d...|    139|
|4008e1_1496620800|Aircraft TCX229 is airborne squawk 3462.0 altitude 10363 meters velocity 214.2 knots heading 293 ...|    140|
|4070e3_1496620800|Aircraft EXS14D is airborne squawk 7775.0 altitude 10973 meters velocity 221.8 knots heading 298 ...|    140|
+-----------------+----------------------------------------------------------------------------------------------------+-------+
only showing top 5 rows

2. Text Normalization — Lowercase, Tokenize, Remove Stop-Words¶

Pipeline:

  1. Lowercase, then strip every character that is not a letter, digit, or whitespace with regexp_replace
  2. Split on whitespace/punctuation: split(text, "[\\s\\W]+")
  3. Explode array → one row per (doc_id, token)
  4. Drop empty and single-character tokens, then remove stop-words with isin against a Python set
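
The same four steps can be traced on a single document in plain Python. This is a minimal sketch, not the Spark implementation: `normalize` and `STOP_WORDS_DEMO` are hypothetical names, and the stop list is cut down to the three words that occur in the sample.

```python
import re

# Hypothetical, shortened stop list for the demo only.
STOP_WORDS_DEMO = {"is", "meters", "knots"}

def normalize(text):
    # Step 1: lowercase, then drop everything except letters, digits, whitespace.
    cleaned = re.sub(r"[^a-z0-9\s]", "", text.lower())
    # Step 2: split on runs of whitespace / non-word characters.
    tokens = re.split(r"[\s\W]+", cleaned)
    # Step 3-4: keep tokens longer than one character that are not stop-words.
    return [t for t in tokens if len(t) > 1 and t not in STOP_WORDS_DEMO]

sample = ("Aircraft SXS2WY is airborne squawk 1000.0 altitude 7460 meters "
          "velocity 175.3 knots")
tokens = normalize(sample)
print(tokens)
```

Because step 1 also removes the decimal point, "1000.0" and "175.3" come out as the tokens "10000" and "1753". If numeric values should stay intact, one option is to keep "." in the character class.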
In [3]:
# ── Step 1-2: Lowercase + tokenize ──────────────────────────────────────────
# Note: the character class also removes ".", so decimals are fused
# (e.g. "175.3" becomes the token "1753").
df_clean = df_corpus.withColumn(
    "text_clean",
    F.regexp_replace(F.lower(F.col("text")), r"[^a-z0-9\s]", "")
)

df_tokens = df_clean.withColumn(
    "tokens",
    F.split(F.col("text_clean"), r"[\s\W]+")   # split on whitespace (punctuation was removed above)
)

# ── Step 3: Explode → (doc_id, token) ────────────────────────────────────────
df_exploded = (
    df_tokens
    .select("doc_id", F.explode("tokens").alias("token"))
    .filter(F.length("token") > 1)   # drop single-char and empty tokens
)
total_before = df_exploded.count()
print(f"Total tokens BEFORE stop-word removal: {total_before:,}")

# ── Step 4: Remove stop-words ────────────────────────────────────────────────
# Aviation-aware stop list: generic English + common METAR filler words
STOP_WORDS = {
    "the", "a", "an", "is", "are", "was", "were", "in", "on", "at",
    "to", "for", "of", "and", "or", "not", "it", "this", "that", "with",
    "be", "by", "as", "from", "but", "its", "if", "do", "no", "so",
    "per", "has", "have", "had", "will", "would", "may", "can",
    # METAR fillers
    "meters", "meter", "degrees", "knots", "rate", "second", "seconds"
}

df_filtered = df_exploded.filter(~F.col("token").isin(STOP_WORDS))
total_after = df_filtered.count()

print(f"Total tokens AFTER  stop-word removal: {total_after:,}")
print(f"Stop-words removed : {total_before - total_after:,} ({(total_before - total_after)/total_before*100:.1f}%)")
df_filtered.show(10, truncate=60)
Total tokens BEFORE stop-word removal: 32,173,700
Total tokens AFTER  stop-word removal: 19,818,447
Stop-words removed : 12,355,253 (38.4%)
+-----------------+--------+
|           doc_id|   token|
+-----------------+--------+
|4bccb9_1496620800|aircraft|
|4bccb9_1496620800|  sxs2wy|
|4bccb9_1496620800|airborne|
|4bccb9_1496620800|  squawk|
|4bccb9_1496620800|   10000|
|4bccb9_1496620800|altitude|
|4bccb9_1496620800|    7460|
|4bccb9_1496620800|velocity|
|4bccb9_1496620800|    1753|
|4bccb9_1496620800| heading|
+-----------------+--------+
only showing top 10 rows

3. Build Inverted Index¶

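Group by token → collect the list of doc_id values (postings list) + occurrence count (freq). Because collect_list keeps duplicates and count("*") counts rows, freq is the total number of token occurrences, not the number of distinct documents; a token repeated within one document appears several times in its postings list.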
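
A small pure-Python sketch of the same grouping makes the difference between occurrence count and document frequency concrete. The `pairs` data and the `doc_freq` field are hypothetical additions for illustration; the lab's index only stores `doc_ids` and `freq`.

```python
from collections import defaultdict

# Hypothetical (doc_id, token) pairs, as produced by the explode step.
pairs = [
    ("d1", "aircraft"), ("d1", "nan"), ("d1", "nan"),
    ("d2", "aircraft"), ("d2", "ground"),
]

postings = defaultdict(list)
for doc_id, token in pairs:
    postings[token].append(doc_id)  # like collect_list: duplicates are kept

index = {
    token: {
        "doc_ids": ids,
        "freq": len(ids),           # like count("*"): total occurrences
        "doc_freq": len(set(ids)),  # distinct documents containing the token
    }
    for token, ids in postings.items()
}

for token, entry in sorted(index.items(), key=lambda kv: -kv[1]["freq"]):
    print(token, entry)
```

In Spark, a deduplicated postings list and a true document frequency could be obtained with `F.collect_set` and `F.countDistinct`, at the cost of changing the numbers reported in this notebook.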

In [4]:
# ── Build inverted index ─────────────────────────────────────────────────────
inverted_index = (
    df_filtered
    .groupBy("token")
    .agg(
        F.collect_list("doc_id").alias("doc_ids"),  # postings list (duplicates kept)
        F.count("*")            .alias("freq")       # total occurrences of the token
    )
    .orderBy(F.desc("freq"))    # most frequent terms first
)

n_terms = inverted_index.count()
print(f"Unique terms in index: {n_terms:,}")

# Show top terms (most frequent)
inverted_index.show(20, truncate=80)
Unique terms in index: 11,843
+--------+--------------------------------------------------------------------------------+-------+
|   token|                                                                         doc_ids|   freq|
+--------+--------------------------------------------------------------------------------+-------+
|     nan|[7c6c27_1496621230, ad4fe1_1496620810, 7c6c27_1496621230, ad4fe1_1496620810, ...|2429395|
|aircraft|[7c6c27_1496621230, 4ca351_1496620800, a0f46f_1496621230, 400ff4_1496620800, ...|1526689|
|  squawk|[7c6d9c_1496621230, 4ca351_1496620800, 7c6c27_1496621230, 400ff4_1496620800, ...|1526689|
|altitude|[7c6d9c_1496621230, 4ca351_1496620800, 7c6c27_1496621230, 400ff4_1496620800, ...|1526689|
|velocity|[7c6d9c_1496621230, 4ca351_1496620800, 7c6c27_1496621230, 400ff4_1496620800, ...|1526689|
|vertical|[7c6d9c_1496621230, 4ca351_1496620800, 7c6c27_1496621230, 400ff4_1496620800, ...|1526689|
| heading|[7c6d9c_1496621230, 4ca351_1496620800, 7c6c27_1496621230, 400ff4_1496620800, ...|1526689|
|airborne|[7c6d9c_1496621230, 4ca351_1496620800, a0f46f_1496621230, 400ff4_1496620800, ...|1384951|
|      00|[71c258_1496621240, a0f566_1496621650, a684c4_1496621240, 4baa90_1496621650, ...| 405336|
|      03|[a657bf_1496621650, abd90c_1496621240, ad5623_1496621650, a1fb8f_1496621240, ...| 234044|
|  ground|[7c6c27_1496621230, ad4fe1_1496620810, adda1e_1496621230, a95d95_1496620810, ...| 141738|
|   10973|[ab6cc9_1496621650, 4841c5_1496621240, 4841c5_1496621650, 40643e_1496621240, ...|  72734|
|   11278|[06a046_1496621650, a3ef1b_1496621240, a93d9f_1496621650, ad4cf2_1496621240, ...|  58302|
|   10668|[a699b0_1496621650, abe3e6_1496621240, 4ca068_1496621650, 71c043_1496621240, ...|  57388|
|   10363|[a33327_1496621650, 71c258_1496621240, a1512f_1496621650, a5a11a_1496621240, ...|  51652|
|   11582|[401000_1496621650, a82520_1496621240, c0809d_1496621650, a5fd2d_1496621240, ...|  49322|
|   11887|[a2396e_1496621650, 8a04bd_1496621240, a1c66a_1496621650, a582a9_1496621240, ...|  28690|
|      07|[aa7115_1496621650, a9c5c5_1496621240, a05d0c_1496621650, acdde3_1496621240, ...|  28106|
|     180|[338086_1496621230, 578055_1496620810, 3382f1_1496621230, a4caec_1496620810, ...|  28089|
|   10058|[a0f566_1496621650, a769a3_1496621240, a68daf_1496621650, a00ce0_1496621240, ...|  26290|
+--------+--------------------------------------------------------------------------------+-------+
only showing top 20 rows
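
The token nan at the top of the index most likely comes from missing numeric fields in the Lab 1 CSV: NaN is truthy in Python, so an expression such as `value or 0` leaves it in place, and formatting it yields the string "nan". The sketch below shows the effect and one possible guard; `value_or_default` is a hypothetical helper, not part of the lab code.

```python
import math

def value_or_default(value, default=0.0):
    """Replace None and float NaN with a default; pass other values through."""
    if value is None or (isinstance(value, float) and math.isnan(value)):
        return default
    return value

missing = float("nan")

naive = missing or 0                 # NaN is truthy, so this is still NaN
guarded = value_or_default(missing)  # replaced by the default

naive_text = f"altitude {naive:.0f} meters"
guarded_text = f"altitude {guarded:.0f} meters"
print(naive_text)
print(guarded_text)
```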

4. Write Inverted Index - Parquet and CSV¶

  • Parquet: native array type, columnar layout → column pruning and filter pushdown on token
  • CSV: doc_ids array serialized as a pipe-joined string → larger footprint, no column pruning
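
CSV has no array type, so the postings list has to be flattened on write and split again on read. The round trip can be sketched with the standard library; the sample row is hypothetical, and "|" is chosen as the separator on the assumption that it never occurs inside a doc_id.

```python
import csv
import io

# Hypothetical index row with an array-valued column.
row = {"token": "ground", "doc_ids": ["a1_10", "b2_20", "c3_30"], "freq": 3}

# Write: the list is joined into one string field.
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerow(["token", "doc_ids", "freq"])
writer.writerow([row["token"], "|".join(row["doc_ids"]), row["freq"]])

# Read: every field comes back as text and must be parsed again.
buf.seek(0)
restored = [
    {"token": r["token"], "doc_ids": r["doc_ids"].split("|"), "freq": int(r["freq"])}
    for r in csv.DictReader(buf)
]
print(restored[0])
```

Parquet stores the array and the integer with their types, so neither the join nor the re-parsing step is needed when reading the index back.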
In [5]:
PARQUET_PATH = "outputs/lab2/inverted_index"
CSV_PATH     = "outputs/lab2/inverted_index_csv"

pathlib.Path(PARQUET_PATH).mkdir(parents=True, exist_ok=True)
pathlib.Path(CSV_PATH).mkdir(parents=True, exist_ok=True)

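# ── Write Parquet ────────────────────────────────────────────────────────────
# Spark is lazy: this timing covers computing the index (tokenize, group, sort)
# as well as writing it, and the CSV write below recomputes the index.
t0 = time.time()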
inverted_index.write.mode("overwrite").parquet(PARQUET_PATH)
parquet_write_ms = (time.time() - t0) * 1000
print(f"Parquet written in {parquet_write_ms:.0f} ms → {PARQUET_PATH}")

# ── Write CSV (doc_ids array → pipe-separated string) ────────────────────────
t0 = time.time()
(
    inverted_index
    .withColumn("doc_ids", F.concat_ws("|", "doc_ids"))  # serialize array for CSV
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv(CSV_PATH)
)
csv_write_ms = (time.time() - t0) * 1000
print(f"CSV     written in {csv_write_ms:.0f} ms → {CSV_PATH}")
Parquet written in 36354 ms → outputs/lab2/inverted_index
CSV     written in 29962 ms → outputs/lab2/inverted_index_csv

5. Query Latency Measurement¶

Read back from Parquet (simulates production query), cache the index, then look up aviation-relevant terms.
Measure wall-clock latency with time.time() and capture the physical plan.
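
A single timed run can be dominated by warm-up effects, so it may help to repeat each lookup and report the first run and the median separately. The helper below is a sketch under that assumption: `time_call` and `toy_index` are hypothetical, and `time.perf_counter()` is used here instead of `time.time()` because it is a monotonic clock intended for measuring intervals.

```python
import statistics
import time

def time_call(fn, repeats=5):
    """Run fn several times; return its last result and per-run latencies in ms."""
    samples_ms = []
    result = None
    for _ in range(repeats):
        t0 = time.perf_counter()
        result = fn()
        samples_ms.append((time.perf_counter() - t0) * 1000)
    return result, samples_ms

# Hypothetical stand-in for the cached index; in the lab, fn would wrap
# idx.filter(F.col("token") == term).collect().
toy_index = {"aircraft": ["d1", "d2"], "ground": ["d2"]}

postings, samples_ms = time_call(lambda: toy_index.get("aircraft", []))
print(f"first={samples_ms[0]:.4f} ms, median={statistics.median(samples_ms):.4f} ms")
```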

In [ ]:
# ── Reload index from Parquet + cache ────────────────────────────────────────
idx = spark.read.parquet(PARQUET_PATH)
idx.cache()
idx.count()   # trigger cache population
print("Index cached.")
print(f"Schema: {idx.dtypes}")

# ── Save index build plan ────────────────────────────────────────────────────
pathlib.Path("proof").mkdir(parents=True, exist_ok=True)
with open("proof/plan_index_build.txt", "w") as f:
    with redirect_stdout(f):
        inverted_index.explain(mode="formatted")
print("plan_index_build.txt saved.")

# ── Query six aviation-relevant terms ────────────────────────────────────────
# Terms expected to appear in METAR reports / aviation text
QUERY_TERMS = ["aircraft", "landing", "squawk", "altitude", "heading", "airborne"]

query_results = []
for term in QUERY_TERMS:
    t0 = time.time()
    rows = idx.filter(F.col("token") == term).collect()
    latency_ms = (time.time() - t0) * 1000
    if rows:
        r = rows[0]
        print(f"  '{term}': freq={r['freq']:,}, postings={len(r['doc_ids']):,}, latency={latency_ms:.1f} ms")
        query_results.append({"term": term, "freq": r["freq"],
                               "n_postings": len(r["doc_ids"]), "latency_ms": latency_ms})
    else:
        print(f"  '{term}': NOT FOUND, latency={latency_ms:.1f} ms")
        query_results.append({"term": term, "freq": 0, "n_postings": 0, "latency_ms": latency_ms})

# ── Save query plan (first term) ─────────────────────────────────────────────
with open("proof/plan_query.txt", "w") as f:
    with redirect_stdout(f):
        idx.filter(F.col("token") == QUERY_TERMS[0]).explain(mode="formatted")
print("proof/plan_query.txt saved.")
Index cached.
Schema: [('token', 'string'), ('doc_ids', 'array<string>'), ('freq', 'bigint')]
plan_index_build.txt saved.
  'aircraft': freq=1,526,689, postings=1,526,689, latency=4494.6 ms
  'landing': NOT FOUND, latency=87.3 ms
  'squawk': freq=1,526,689, postings=1,526,689, latency=3309.1 ms
  'altitude': freq=1,526,689, postings=1,526,689, latency=2026.1 ms
  'heading': freq=1,526,689, postings=1,526,689, latency=1341.2 ms
  'airborne': freq=1,384,951, postings=1,384,951, latency=1001.5 ms
proof/plan_query.txt saved.

6. Storage Footprint Comparison — Parquet vs CSV¶

In [7]:
# ── On-disk footprint ────────────────────────────────────────────────────────
def dir_size_bytes(path):
    return sum(
        f.stat().st_size
        for f in pathlib.Path(path).rglob("*")
        if f.is_file()
    )

def dir_file_count(path):
    return sum(1 for f in pathlib.Path(path).rglob("*") if f.is_file())

parquet_bytes = dir_size_bytes(PARQUET_PATH)
csv_bytes     = dir_size_bytes(CSV_PATH)
parquet_files = dir_file_count(PARQUET_PATH)
csv_files     = dir_file_count(CSV_PATH)
ratio         = parquet_bytes / csv_bytes if csv_bytes > 0 else 0

print(f"Parquet : {parquet_bytes:>12,} bytes  ({parquet_files} files)")
print(f"CSV     : {csv_bytes:>12,} bytes  ({csv_files} files)")
print(f"Ratio   : {ratio:.2%}  (Parquet is {(1-ratio)*100:.1f}% smaller)" if ratio < 1
      else f"Ratio   : {ratio:.2%}")

# ── Fill metrics log ─────────────────────────────────────────────────────────
import datetime

rows = [
    {"run_id": "r1", "task": "ingest",          "term": "",
     "corpus_rows": n_docs, "unique_terms": 0,
     "latency_ms": 0, "parquet_bytes": 0, "csv_bytes": 0,
     "shuffle_read_bytes": 0, "shuffle_write_bytes": 0,
     "note": f"corpus ingestion ({n_docs} docs)",
     "timestamp": datetime.datetime.now().isoformat()},
    {"run_id": "r2", "task": "build_index",     "term": "",
     "corpus_rows": n_docs, "unique_terms": n_terms,
     "latency_ms": parquet_write_ms, "parquet_bytes": parquet_bytes, "csv_bytes": 0,
     "shuffle_read_bytes": 0, "shuffle_write_bytes": 0,
     "note": f"inverted index construction ({n_terms} unique terms)",
     "timestamp": datetime.datetime.now().isoformat()},
    {"run_id": "r6", "task": "footprint",        "term": "",
     "corpus_rows": n_docs, "unique_terms": n_terms,
     "latency_ms": 0, "parquet_bytes": parquet_bytes, "csv_bytes": csv_bytes,
     "shuffle_read_bytes": 0, "shuffle_write_bytes": 0,
     "note": f"Parquet vs CSV — ratio {ratio:.2%}",
     "timestamp": datetime.datetime.now().isoformat()},
]

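# Note: rows already has three entries, so insert(i, ...) with i >= 3 appends after
# the footprint row, and run_id "r6" ends up used twice (footprint and fourth query).
for i, qr in enumerate(query_results, start=3):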
    rows.insert(i, {
        "run_id": f"r{i}", "task": "query", "term": qr["term"],
        "corpus_rows": n_docs, "unique_terms": n_terms,
        "latency_ms": qr["latency_ms"], "parquet_bytes": parquet_bytes, "csv_bytes": 0,
        "shuffle_read_bytes": 0, "shuffle_write_bytes": 0,
        "note": f"single-term lookup '{qr['term']}' freq={qr['freq']} postings={qr['n_postings']}",
        "timestamp": datetime.datetime.now().isoformat(),
    })

df_metrics = pd.DataFrame(rows)
df_metrics.to_csv("lab2_metrics_log.csv", index=False)
print("\nlab2_metrics_log.csv written:")
print(df_metrics.to_string(index=False))
Parquet :  123,192,287 bytes  (6 files)
CSV     :  359,635,037 bytes  (6 files)
Ratio   : 34.25%  (Parquet is 65.7% smaller)

lab2_metrics_log.csv written:
run_id        task     term  corpus_rows  unique_terms   latency_ms  parquet_bytes  csv_bytes  shuffle_read_bytes  shuffle_write_bytes                                                        note                  timestamp
    r1      ingest               1526689             0     0.000000              0          0                   0                    0                             corpus ingestion (1526689 docs) 2026-04-29T11:03:06.186400
    r2 build_index               1526689         11843 36353.927851      123192287          0                   0                    0            inverted index construction (11843 unique terms) 2026-04-29T11:03:06.186868
    r6   footprint               1526689         11843     0.000000      123192287  359635037                   0                    0                               Parquet vs CSV — ratio 34.25% 2026-04-29T11:03:06.186873
    r3       query aircraft      1526689         11843  4494.558096      123192287          0                   0                    0 single-term lookup 'aircraft' freq=1526689 postings=1526689 2026-04-29T11:03:06.187299
    r4       query  landing      1526689         11843    87.278605      123192287          0                   0                    0              single-term lookup 'landing' freq=0 postings=0 2026-04-29T11:03:06.187306
    r5       query   squawk      1526689         11843  3309.077024      123192287          0                   0                    0   single-term lookup 'squawk' freq=1526689 postings=1526689 2026-04-29T11:03:06.187308
    r6       query altitude      1526689         11843  2026.100159      123192287          0                   0                    0 single-term lookup 'altitude' freq=1526689 postings=1526689 2026-04-29T11:03:06.187311
    r7       query  heading      1526689         11843  1341.185093      123192287          0                   0                    0  single-term lookup 'heading' freq=1526689 postings=1526689 2026-04-29T11:03:06.187314
    r8       query airborne      1526689         11843  1001.450777      123192287          0                   0                    0 single-term lookup 'airborne' freq=1384951 postings=1384951 2026-04-29T11:03:06.187316
In [8]:
# The Spark UI shuts down with the session, so capture screenshots before this call.
spark.stop()
print("Lab 2 complete.")
print("\nDeliverables checklist:")
print("  [x] outputs/lab2/inverted_index/     — Parquet index")
print("  [x] outputs/lab2/inverted_index_csv/ — CSV index")
print("  [x] proof/plan_index_build.txt")
print("  [x] proof/plan_query.txt")
print("  [x] lab2_metrics_log.csv")
print("  [ ] Spark UI screenshots — capture manually from http://localhost:4040")
Lab 2 complete.

Deliverables checklist:
  [x] outputs/lab2/inverted_index/     — Parquet index
  [x] outputs/lab2/inverted_index_csv/ — CSV index
  [x] proof/plan_index_build.txt
  [x] proof/plan_query.txt
  [x] lab2_metrics_log.csv
  [ ] Spark UI screenshots — capture manually from http://localhost:4040
In [ ]: