DE2 — Assignment 3: Graphs or Clustering

Author: Badr TAJINI - Data Engineering II (Data-Intensive Workloads) - ESIEE 2025-2026

Track: D — Aviation (OpenSky Network)
Path chosen: A — Graph Processing (Airspace Proximity Network + manual iterative PageRank)
Names: Justine Guirauden and Volcy Desmazures


Graph model

| Element | Definition |
|---|---|
| Vertex | A unique airborne aircraft identified by its icao24 transponder code |
| Edge | Two aircraft shared the same 2°×2° lat/lon sector within the same 5-minute time window |
| Weight | Unweighted (binary co-presence) |

PageRank on this graph highlights the aircraft that are most central in the airspace traffic network: those most frequently co-located with many other aircraft, which typically corresponds to high-density approach and departure corridors.

Partitioning experiment: default hash partitioning (baseline) vs explicit repartition(N, "src") (optimized).
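The edge rule in the graph model can be illustrated without Spark. The following is a minimal sketch in plain Python, assuming position reports arrive as `(icao24, lat, lon, unix_time)` tuples; the helper names `cell_key` and `proximity_edges` and the sample reports are hypothetical and only mirror the sector/time-bucket logic described above.

```python
from collections import defaultdict
from itertools import combinations
from math import floor

SECTOR_DEG = 2     # degrees per grid cell (same value as the notebook)
TIME_BUCKET = 300  # seconds per time window (same value as the notebook)

def cell_key(lat, lon, ts):
    """Snap one position report to its (sector_lat, sector_lon, time_bucket) cell."""
    return (
        floor(lat / SECTOR_DEG) * SECTOR_DEG,
        floor(lon / SECTOR_DEG) * SECTOR_DEG,
        (ts // TIME_BUCKET) * TIME_BUCKET,
    )

def proximity_edges(reports):
    """Return the set of aircraft pairs that shared a cell, stored as (smaller, larger)."""
    by_cell = defaultdict(set)
    for icao24, lat, lon, ts in reports:
        by_cell[cell_key(lat, lon, ts)].add(icao24)
    edges = set()
    for aircraft in by_cell.values():
        # sorted() guarantees the smaller identifier comes first in each pair
        edges.update(combinations(sorted(aircraft), 2))
    return edges

# Hypothetical position reports: (icao24, lat, lon, unix_time)
reports = [
    ("aaa001", 48.7, 2.3, 1496620810),
    ("bbb002", 49.9, 3.9, 1496620990),  # same 2-degree cell and 5-minute window as aaa001
    ("ccc003", 48.7, 2.3, 1496621200),  # same cell, but a later time window
    ("ddd004", 50.1, 2.3, 1496620810),  # same window, neighbouring cell
]
edges = proximity_edges(reports)
print(sorted(edges))
```

Only the first two aircraft end up connected, because an edge requires the same sector and the same time window at once.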

0. Setup

In [1]:
import os, time, pathlib, datetime, socket
import pandas as pd
import numpy as np
from contextlib import redirect_stdout
from urllib.parse import urlparse
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import (
    StructType, StructField, StringType,
    LongType, DoubleType, BooleanType
)

# ── Network binding (WSL / local compatibility) ───────────────────────────────
try:
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.connect(("8.8.8.8", 80))
    LOCAL_IP = s.getsockname()[0]
    s.close()
except Exception:
    LOCAL_IP = "127.0.0.1"

DE2_SPARK_DRIVER_HOST  = os.environ.get("DE2_SPARK_DRIVER_HOST",  LOCAL_IP)
DE2_SPARK_BIND_ADDRESS = os.environ.get("DE2_SPARK_BIND_ADDRESS", "0.0.0.0")
os.environ.setdefault("SPARK_LOCAL_IP", DE2_SPARK_DRIVER_HOST)

def show_spark_ui(spark_session):
    ui_url = spark_session.sparkContext.uiWebUrl
    print("Spark version:", spark_session.version)
    if ui_url:
        ui_port = urlparse(ui_url).port or 4040
        print("Spark UI:",                      ui_url)
        print("Spark UI (WSL/Windows browser):", f"http://localhost:{ui_port}")
    else:
        print("Spark UI: not available")

spark = (
    SparkSession.builder
    .appName("de2-assignment3-aviation-pagerank")
    .master("local[*]")
    .config("spark.driver.host",               DE2_SPARK_DRIVER_HOST)
    .config("spark.driver.bindAddress",        DE2_SPARK_BIND_ADDRESS)
    .config("spark.ui.bindAddress",            DE2_SPARK_BIND_ADDRESS)
    # Tuned for local mode: avoid 200-task shuffle overhead
    .config("spark.sql.shuffle.partitions",    "8")
    .getOrCreate()
)

show_spark_ui(spark)

# ── Directory setup ───────────────────────────────────────────────────────────
for p in ["outputs/lab3", "proof", "data/raw_aviation"]:
    pathlib.Path(p).mkdir(parents=True, exist_ok=True)

# ── PageRank hyper-parameters ────────────────────────────────────────────────
DAMPING     = 0.85       # standard damping factor
MAX_ITER    = 10         # maximum iterations
EPSILON     = 0.001      # convergence threshold (max absolute rank delta)
SECTOR_DEG  = 2          # degrees per grid cell
TIME_BUCKET = 300        # 5-minute windows (seconds)
N_PARTS     = 8          # repartition target (match local core count)

print("Setup complete.")
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/04/30 09:36:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark version: 4.0.1
Spark UI: http://172.30.130.70:4040
Spark UI (WSL/Windows browser): http://localhost:4040
Setup complete.

Step 1 — Data & Graph Construction

Load the OpenSky dataset from Lab 1.
If it is absent, a reproducible synthetic dataset with the same schema (5,000 uniformly sampled records, fixed seed 42) is generated automatically.

In [2]:
# ── Source selection ─────────────────────────────────────────────────────────
LAB1_CSV   = "data/raw_aviation/states_2017-06-05-00.csv"
SYNTH_PATH = "data/raw_aviation/synthetic_opensky.csv"

if not pathlib.Path(LAB1_CSV).exists() and not pathlib.Path(SYNTH_PATH).exists():
    print("Generating synthetic OpenSky dataset (5 000 records)...")
    rng = np.random.default_rng(42)
    n   = 5000
    icao24_pool = [f"a{i:05x}" for i in range(300)]
    squawk_pool = [str(s).zfill(4) for s in rng.integers(1000, 7777, 60)]
    pd.DataFrame({
        "time":         rng.integers(1496620800, 1496624400, n),
        "icao24":       rng.choice(icao24_pool, n),
        "lat":          rng.uniform(43.0, 52.0, n),
        "lon":          rng.uniform(-5.0, 15.0, n),
        "velocity":     rng.uniform(100, 280, n),
        "heading":      rng.uniform(0, 360, n),
        "vertrate":     rng.uniform(-10, 10, n),
        "callsign":     [f"AF{rng.integers(100,999)}" for _ in range(n)],
        "onground":     rng.choice([True, False], n, p=[0.05, 0.95]),
        "alert":        rng.choice([True, False], n, p=[0.01, 0.99]),
        "spi":          [False] * n,
        "squawk":       rng.choice(squawk_pool, n),
        "baroaltitude": rng.uniform(1000, 12000, n),
        "geoaltitude":  rng.uniform(1000, 12000, n),
        "lastposupdate":rng.uniform(1496620800, 1496624400, n),
        "lastcontact":  rng.uniform(1496620800, 1496624400, n),
    }).to_csv(SYNTH_PATH, index=False)
    print(f"Synthetic data → {SYNTH_PATH}")

SOURCE_CSV = LAB1_CSV if pathlib.Path(LAB1_CSV).exists() else SYNTH_PATH
print(f"Source: {SOURCE_CSV}")
Source: data/raw_aviation/states_2017-06-05-00.csv
In [3]:
# ── Schema (identical to Lab 1) ───────────────────────────────────────────────
opensky_schema = StructType([
    StructField("time",          LongType(),    True),
    StructField("icao24",        StringType(),  True),
    StructField("lat",           DoubleType(),  True),
    StructField("lon",           DoubleType(),  True),
    StructField("velocity",      DoubleType(),  True),
    StructField("heading",       DoubleType(),  True),
    StructField("vertrate",      DoubleType(),  True),
    StructField("callsign",      StringType(),  True),
    StructField("onground",      BooleanType(), True),
    StructField("alert",         BooleanType(), True),
    StructField("spi",           BooleanType(), True),
    StructField("squawk",        StringType(),  True),
    StructField("baroaltitude",  DoubleType(),  True),
    StructField("geoaltitude",   DoubleType(),  True),
    StructField("lastposupdate", DoubleType(),  True),
    StructField("lastcontact",   DoubleType(),  True),
])

# ── Load + filter (airborne only) ────────────────────────────────────────────
df_raw = (
    spark.read
    .schema(opensky_schema)
    .option("header", "true")
    .csv(SOURCE_CSV)
    .filter(
        F.col("icao24").isNotNull() &
        F.col("lat").isNotNull()    &
        F.col("lon").isNotNull()    &
        ~F.col("onground")
    )
)
print(f"Raw airborne records: {df_raw.count():,}")

# ── Snap to 2°×2° sector grid + 5-minute time bucket ─────────────────────────
df_sectors = df_raw.select(
    F.col("icao24"),
    (F.floor(F.col("lat") / SECTOR_DEG) * SECTOR_DEG).alias("sector_lat"),
    (F.floor(F.col("lon") / SECTOR_DEG) * SECTOR_DEG).alias("sector_lon"),
    (F.floor(F.col("time")  / TIME_BUCKET) * TIME_BUCKET).alias("time_bucket"),
)

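# ── Proximity edges via self-join on sector + time_bucket ─────────────────────
# Strict inequality (l.icao24 < r.icao24) keeps each undirected pair once, stored as
# (smaller icao24 → larger icao24), so "out_degree" below counts only higher-ID neighbours.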
left  = df_sectors.alias("l")
right = df_sectors.alias("r")

edges_raw = (
    left.join(
        right,
        (
            (F.col("l.sector_lat")  == F.col("r.sector_lat"))  &
            (F.col("l.sector_lon")  == F.col("r.sector_lon"))  &
            (F.col("l.time_bucket") == F.col("r.time_bucket")) &
            (F.col("l.icao24")      <  F.col("r.icao24"))      # no self-loops, no duplicates
        )
    )
    .select(
        F.col("l.icao24").alias("src"),
        F.col("r.icao24").alias("dst"),
    )
    .distinct()
)
edges_raw.cache()
n_edges = edges_raw.count()

# ── Vertex DataFrame ─────────────────────────────────────────────────────────
vertices = (
    edges_raw.select(F.col("src").alias("id"))
    .union(edges_raw.select(F.col("dst").alias("id")))
    .distinct()
)
vertices.cache()
n_vertices = vertices.count()

print(f"Graph — Vertices: {n_vertices:,}  |  Edges: {n_edges:,}")
edges_raw.show(8)

# ── Out-degree + Skew analysis ────────────────────────────────────────────────
out_degree = (
    edges_raw.groupBy("src")
    .count()
    .withColumnRenamed("count", "out_degree")
    .orderBy(F.desc("out_degree"))
)
out_degree.cache()

deg_stats = out_degree.select(
    F.mean("out_degree").alias("mean"),
    F.stddev("out_degree").alias("stddev"),
    F.max("out_degree").alias("max")
).first()

skew_ratio = (deg_stats["max"] / deg_stats["mean"]) if deg_stats["mean"] else 0
print(f"\nOut-degree stats — mean: {deg_stats['mean']:.2f}  std: {deg_stats['stddev']:.2f}  max: {deg_stats['max']}")
print(f"Skew ratio (max/mean): {skew_ratio:.1f}x")
print("\nTop 10 hot vertices (potential skew sources):")
out_degree.show(10)
Raw airborne records: 992,613
Graph — Vertices: 6,264  |  Edges: 256,461
+------+------+
|   src|   dst|
+------+------+
|7c39fa|7c4eeb|
|7503a9|7c7a45|
|7c6d26|7c6db0|
|7c6d26|7c752d|
|7c5b44|7c6d26|
|7c6c99|7c6d75|
|780aa2|7c6d29|
|780aa2|7c7a3e|
+------+------+
only showing top 8 rows

Out-degree stats — mean: 42.69  std: 49.60  max: 438
Skew ratio (max/mean): 10.3x

Top 10 hot vertices (potential skew sources):
+------+----------+
|   src|out_degree|
+------+----------+
|0d0760|       438|
|2ac0af|       382|
|4245fb|       343|
|2ac64c|       335|
|2ac55b|       329|
|4b1901|       326|
|2ac126|       325|
|2980c5|       316|
|394a02|       304|
|a0420e|       304|
+------+----------+
only showing top 10 rows

Step 2 — Iterative PageRank (Baseline)

Manual implementation without explicit repartitioning.
Each iteration records wall-clock time and convergence delta for later comparison.
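For reference, the update applied in each iteration is rank(v) = (1 - d)/N + d · Σ rank(u)/out_degree(u), summed over the vertices u that point to v. The sketch below shows that rule in plain Python on a toy graph. It rests on two assumptions that differ from the Spark loop in this notebook: each undirected edge is expanded in both directions, and every vertex keeps an entry in the rank table at every iteration (the notebook stores each pair once, as smaller icao24 to larger icao24). The function name `pagerank` and the toy edges are hypothetical.

```python
def pagerank(edges, damping=0.85, max_iter=10, epsilon=0.001):
    """Iterative PageRank on an undirected edge list; returns (ranks, iterations_run)."""
    # Assumption: every undirected edge carries rank in both directions.
    neighbours = {}
    for a, b in edges:
        neighbours.setdefault(a, set()).add(b)
        neighbours.setdefault(b, set()).add(a)

    n = len(neighbours)
    ranks = {v: 1.0 / n for v in neighbours}

    for iteration in range(1, max_iter + 1):
        # Every vertex starts from the teleport term, so none drops out of the table.
        new_ranks = {v: (1.0 - damping) / n for v in neighbours}
        for u, outs in neighbours.items():
            share = damping * ranks[u] / len(outs)  # equal split across out-edges
            for v in outs:
                new_ranks[v] += share
        # Convergence metric: maximum absolute rank change, as in the notebook.
        delta = max(abs(new_ranks[v] - ranks[v]) for v in neighbours)
        ranks = new_ranks
        if delta < epsilon:
            break
    return ranks, iteration

# Hypothetical toy graph: a hub linked to three leaves, plus one leaf-to-leaf edge.
toy_edges = [("hub", "a"), ("hub", "b"), ("hub", "c"), ("a", "b")]
ranks, iterations = pagerank(toy_edges, max_iter=50, epsilon=1e-9)
print(max(ranks, key=ranks.get), round(sum(ranks.values()), 6))
```

Under these assumptions the ranks keep summing to 1, which gives a simple sanity check for any distributed version of the loop.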

In [4]:
# ── Baseline PageRank (default hash partitioning) ─────────────────────────────
edges_baseline = edges_raw   # no repartition

ranks = vertices.withColumn("rank", F.lit(1.0 / n_vertices))
pr_metrics_baseline  = []
converged_baseline   = False

print(f"Starting PageRank — {n_vertices} vertices, damping={DAMPING}, ε={EPSILON}")
print(f"Partitions (edges): {edges_baseline.rdd.getNumPartitions()}")
print()

for i in range(MAX_ITER):
    t0 = time.time()

    # 1. Split each src's rank equally across its out-edges (rank / out_degree)
    contribs = (
        edges_baseline
        .join(ranks,      edges_baseline.src == ranks.id,         "inner")
        .join(out_degree, edges_baseline.src == out_degree.src,   "inner")
        .select(
            edges_baseline.dst.alias("id"),
            (F.col("rank") / F.col("out_degree")).alias("contrib")
        )
    )

    # 2. Aggregate incoming contributions, apply damping
    #    (only vertices that receive at least one contribution appear in new_ranks)
    new_ranks = (
        contribs
        .groupBy("id")
        .agg(F.sum("contrib").alias("sum_contrib"))
        .withColumn(
            "rank",
            F.lit((1.0 - DAMPING) / n_vertices)
            + F.lit(DAMPING) * F.col("sum_contrib")
        )
        .select("id", "rank")
    )
    new_ranks.cache()
    new_ranks.count()   # materialize the cache so later actions reuse it instead of recomputing the lineage
    elapsed_ms = (time.time() - t0) * 1000

    # 3. Convergence: max |new_rank - old_rank|
    old_r = ranks.withColumnRenamed("rank", "old_rank")
    delta = (
        new_ranks.join(old_r, "id")
        .select(F.max(F.abs(new_ranks.rank - F.col("old_rank"))).alias("delta"))
        .first()["delta"] or 0.0
    )

    pr_metrics_baseline.append({
        "run_id":              f"baseline_iter_{i:02d}",
        "path":                "A",
        "algorithm":           "pagerank_manual",
        "task":                f"pagerank_iter_{i}",
        "notes":               "default partitioning (baseline)",
        "iteration_or_k":      i,
        "seed":                0,
        "metric_value":        round(delta, 8),
        "shuffle_read_bytes":  0,   # complete from Spark UI after execution
        "shuffle_write_bytes": 0,
        "elapsed_ms":          round(elapsed_ms, 1),
        "timestamp":           datetime.datetime.now().isoformat(),
    })

    print(f"  [BASELINE] iter {i:2d} | {elapsed_ms:7.0f} ms | delta={delta:.6f}")
    ranks.unpersist()
    ranks = new_ranks

    if delta < EPSILON:
        print(f"  → Converged at iteration {i} (delta < ε={EPSILON})")
        converged_baseline = True
        break

if not converged_baseline:
    print(f"  → Did not converge within {MAX_ITER} iterations")

print("\nTop 15 aircraft by PageRank (baseline):")
ranks.orderBy(F.desc("rank")).show(15)

# ── Persist final results ─────────────────────────────────────────────────────
ranks.write.mode("overwrite").parquet("outputs/lab3/pagerank_baseline")
print("Results → outputs/lab3/pagerank_baseline")
Starting PageRank — 6264 vertices, damping=0.85, ε=0.001
Partitions (edges): 8

26/04/30 09:53:57 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
  [BASELINE] iter  0 |    1030 ms | delta=0.001349
26/04/30 09:53:58 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
  [BASELINE] iter  1 |     629 ms | delta=0.002393
26/04/30 09:54:00 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
  [BASELINE] iter  2 |     578 ms | delta=0.002487
26/04/30 09:54:00 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
  [BASELINE] iter  3 |     543 ms | delta=0.002112
26/04/30 09:54:01 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
  [BASELINE] iter  4 |     645 ms | delta=0.000971
  → Converged at iteration 4 (delta < ε=0.001)

Top 15 aircraft by PageRank (baseline):
+------+--------------------+
|    id|                rank|
+------+--------------------+
|c81e22|0.007440856845081005|
|cda28b|0.007002487247207178|
|e8043b|0.005849323222585...|
|e61f99|0.005281231065134173|
|c0884a|0.005247960316645426|
|c07fbf|0.004481929543099...|
|d2579e|0.003832716281315...|
|c080a0|0.003708087800843...|
|c052e0|0.003416716491559...|
|e8043a|0.003241819131628802|
|ae0211|0.003066926241153087|
|adea26|0.003027870476320895|
|c087bc|0.002991025535631735|
|c067e7|0.002811286913353...|
|c080a9|0.002766710516659...|
+------+--------------------+
only showing top 15 rows
26/04/30 09:54:04 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
Results → outputs/lab3/pagerank_baseline

2b. Save iteration plan

In [5]:
# Rebuild the contribution query from the initial ranks (as in iteration 0) for plan capture
_ranks_snap = vertices.withColumn("rank", F.lit(1.0 / n_vertices))
_contribs_plan = (
    edges_baseline
    .join(_ranks_snap, edges_baseline.src == _ranks_snap.id, "inner")
    .join(out_degree,  edges_baseline.src == out_degree.src, "inner")
    .select(
        edges_baseline.dst.alias("id"),
        (F.col("rank") / F.col("out_degree")).alias("contrib")
    )
)
with open("proof/plan_iteration.txt", "w") as f:
    with redirect_stdout(f):
        _contribs_plan.explain(mode="formatted")
print("proof/plan_iteration.txt saved.")
proof/plan_iteration.txt saved.
26/04/30 09:58:20 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.

Step 3 — Partitioning Experiment

Hypothesis: Pre-partitioning edges and out_degree on the join key (src) places all out-edges of a given aircraft in the same partition, so the subsequent join(ranks, edges.src == ranks.id) can reuse that layout on the edge side and shuffle less data across partitions.

Before: default partitioning, where Spark may have to shuffle both sides of every join.
After: repartition(8, "src") applied once before the loop, so subsequent joins on src start from an edge list that is already hash-partitioned (not sorted) by the join key.
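The co-location idea behind this hypothesis can be sketched in plain Python: when two tables are bucketed with the same hash function and the same bucket count on the join key, matching rows always share a bucket id, so the join can run bucket by bucket. This is only an illustration; `zlib.crc32` is a stand-in chosen for determinism (Spark uses its own Murmur3-based hash), and the helper names and sample rows are hypothetical.

```python
import zlib
from collections import defaultdict

N_PARTS = 8  # same partition count as the notebook

def partition_of(key, n_parts=N_PARTS):
    """Deterministic stand-in for a hash partitioner (not Spark's actual hash)."""
    return zlib.crc32(key.encode("utf-8")) % n_parts

def partition_rows(rows, key_index, n_parts=N_PARTS):
    """Group rows into buckets according to the hash of one column."""
    parts = defaultdict(list)
    for row in rows:
        parts[partition_of(row[key_index], n_parts)].append(row)
    return parts

# Hypothetical edge and out-degree rows, both keyed by src in column 0.
edges = [("a1", "b7"), ("a1", "c3"), ("b7", "c3"), ("c3", "d9")]
out_degree = [("a1", 2), ("b7", 1), ("c3", 1)]

edge_parts = partition_rows(edges, key_index=0)
degree_parts = partition_rows(out_degree, key_index=0)

# Join bucket by bucket: no row has to move, because equal keys share a bucket id.
joined = []
for pid, edge_rows in edge_parts.items():
    degree_lookup = dict(degree_parts.get(pid, []))
    for src, dst in edge_rows:
        joined.append((src, dst, degree_lookup[src]))
print(sorted(joined))
```

The sketch also shows the precondition: both sides must use the same key, hash, and bucket count, otherwise one side still has to be redistributed.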

In [6]:
# ── Save BEFORE plan ─────────────────────────────────────────────────────────
_ranks_init = vertices.withColumn("rank", F.lit(1.0 / n_vertices))
_before_q = (
    edges_raw
    .join(_ranks_init, edges_raw.src == _ranks_init.id, "inner")
    .join(out_degree,  edges_raw.src == out_degree.src, "inner")
    .select(
        edges_raw.dst.alias("id"),
        (F.col("rank") / F.col("out_degree")).alias("contrib")
    )
)
with open("proof/plan_before.txt", "w") as f:
    with redirect_stdout(f):
        _before_q.explain(mode="formatted")
print("proof/plan_before.txt saved.")

# ── Repartition edge list and out-degree by 'src' ─────────────────────────────
edges_repartitioned  = edges_raw.repartition(N_PARTS, "src").cache()
edges_repartitioned.count()   # materialize
out_deg_repartitioned = out_degree.repartition(N_PARTS, "src").cache()
out_deg_repartitioned.count()
print(f"Edges repartitioned → {edges_repartitioned.rdd.getNumPartitions()} partitions by 'src'")

# ── Save AFTER plan ───────────────────────────────────────────────────────────
_after_q = (
    edges_repartitioned
    .join(_ranks_init,         edges_repartitioned.src == _ranks_init.id,            "inner")
    .join(out_deg_repartitioned, edges_repartitioned.src == out_deg_repartitioned.src, "inner")
    .select(
        edges_repartitioned.dst.alias("id"),
        (F.col("rank") / F.col("out_degree")).alias("contrib")
    )
)
with open("proof/plan_after.txt", "w") as f:
    with redirect_stdout(f):
        _after_q.explain(mode="formatted")
print("proof/plan_after.txt saved.")

# ── PageRank with repartitioned edges ─────────────────────────────────────────
ranks2 = vertices.withColumn("rank", F.lit(1.0 / n_vertices))
pr_metrics_repartitioned = []
converged_repartitioned  = False

print(f"\nStarting repartitioned PageRank — {N_PARTS} partitions by 'src'")
print()

for i in range(MAX_ITER):
    t0 = time.time()

    contribs2 = (
        edges_repartitioned
        .join(ranks2,              edges_repartitioned.src == ranks2.id,                "inner")
        .join(out_deg_repartitioned, edges_repartitioned.src == out_deg_repartitioned.src, "inner")
        .select(
            edges_repartitioned.dst.alias("id"),
            (F.col("rank") / F.col("out_degree")).alias("contrib")
        )
    )
    new_ranks2 = (
        contribs2.groupBy("id")
        .agg(F.sum("contrib").alias("sum_contrib"))
        .withColumn(
            "rank",
            F.lit((1.0 - DAMPING) / n_vertices)
            + F.lit(DAMPING) * F.col("sum_contrib")
        )
        .select("id", "rank")
    )
    new_ranks2.cache()
    new_ranks2.count()
    elapsed_ms = (time.time() - t0) * 1000

    old_r2 = ranks2.withColumnRenamed("rank", "old_rank")
    delta2  = (
        new_ranks2.join(old_r2, "id")
        .select(F.max(F.abs(new_ranks2.rank - F.col("old_rank"))).alias("delta"))
        .first()["delta"] or 0.0
    )

    pr_metrics_repartitioned.append({
        "run_id":              f"repartitioned_iter_{i:02d}",
        "path":                "A",
        "algorithm":           "pagerank_manual",
        "task":                f"pagerank_iter_{i}",
        "notes":               f"repartition({N_PARTS}, 'src') — optimized",
        "iteration_or_k":      i,
        "seed":                0,
        "metric_value":        round(delta2, 8),
        "shuffle_read_bytes":  0,
        "shuffle_write_bytes": 0,
        "elapsed_ms":          round(elapsed_ms, 1),
        "timestamp":           datetime.datetime.now().isoformat(),
    })

    print(f"  [REPARTITIONED] iter {i:2d} | {elapsed_ms:7.0f} ms | delta={delta2:.6f}")
    ranks2.unpersist()
    ranks2 = new_ranks2

    if delta2 < EPSILON:
        print(f"  → Converged at iteration {i}")
        converged_repartitioned = True
        break

if not converged_repartitioned:
    print(f"  → Did not converge within {MAX_ITER} iterations")

ranks2.write.mode("overwrite").parquet("outputs/lab3/pagerank_repartitioned")
print("Results → outputs/lab3/pagerank_repartitioned")
26/04/30 09:58:30 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
proof/plan_before.txt saved.
Edges repartitioned → 8 partitions by 'src'
proof/plan_after.txt saved.

Starting repartitioned PageRank — 8 partitions by 'src'

26/04/30 09:58:31 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
26/04/30 09:58:31 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
  [REPARTITIONED] iter  0 |     505 ms | delta=0.001349
26/04/30 09:58:32 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
  [REPARTITIONED] iter  1 |     315 ms | delta=0.002393
26/04/30 09:58:32 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
  [REPARTITIONED] iter  2 |     402 ms | delta=0.002487
26/04/30 09:58:33 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
  [REPARTITIONED] iter  3 |     546 ms | delta=0.002112
26/04/30 09:58:34 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
  [REPARTITIONED] iter  4 |     687 ms | delta=0.000971
  → Converged at iteration 4
Results → outputs/lab3/pagerank_repartitioned

Step 4 — Convergence & Skew Analysis

Compare per-iteration elapsed time between baseline and repartitioned runs.
Analyse skew via degree distribution.
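When comparing the two runs, it can help to check how much of the average gain comes from the first iteration alone. The sketch below recomputes the gain from the per-iteration timings printed by the two loops above, once over all iterations and once without iteration 0. Treating iteration 0 as warm-up is an assumption, not something the notebook measures, and `gain_percent` is a hypothetical helper.

```python
from statistics import mean

# Per-iteration wall-clock times (ms) copied from the two runs in this notebook.
baseline_ms = [1030.4, 628.9, 577.8, 543.4, 644.8]
repartitioned_ms = [505.3, 315.0, 401.5, 546.2, 686.9]

def gain_percent(before, after):
    """Relative reduction of the mean iteration time, in percent."""
    return (mean(before) - mean(after)) / mean(before) * 100

overall = gain_percent(baseline_ms, repartitioned_ms)
# Assumption: iteration 0 carries one-off start-up cost, so it is dropped here.
steady = gain_percent(baseline_ms[1:], repartitioned_ms[1:])
print(f"all iterations: {overall:+.1f}%  |  without iteration 0: {steady:+.1f}%")
```

On these timings the estimated gain falls from about 28% to about 19% once iteration 0 is excluded, and the last two repartitioned iterations are slower than their baseline counterparts. A single run on a local machine may therefore not be enough to attribute the difference to partitioning alone; repeating each variant several times would give a firmer comparison.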

In [7]:
# ── Build comparison DataFrames ───────────────────────────────────────────────
df_base = pd.DataFrame(pr_metrics_baseline)
df_rept = pd.DataFrame(pr_metrics_repartitioned)

print("=" * 65)
print("CONVERGENCE — BASELINE (default hash partitioning)")
print("=" * 65)
if len(df_base):
    print(df_base[["iteration_or_k", "elapsed_ms", "metric_value"]]
          .rename(columns={"iteration_or_k": "iter",
                           "metric_value":   "delta"})
          .to_string(index=False))

print()
print("=" * 65)
print(f"CONVERGENCE — REPARTITIONED (repartition({N_PARTS}, 'src'))")
print("=" * 65)
if len(df_rept):
    print(df_rept[["iteration_or_k", "elapsed_ms", "metric_value"]]
          .rename(columns={"iteration_or_k": "iter",
                           "metric_value":   "delta"})
          .to_string(index=False))

# ── Per-iteration speed comparison ───────────────────────────────────────────
if len(df_base) and len(df_rept):
    avg_base  = df_base["elapsed_ms"].mean()
    avg_rept  = df_rept["elapsed_ms"].mean()
    total_base = df_base["elapsed_ms"].sum()
    total_rept = df_rept["elapsed_ms"].sum()
    gain       = (avg_base - avg_rept) / avg_base * 100 if avg_base > 0 else 0
    print(f"\nPerformance summary:")
    print(f"  Avg iteration time — Baseline      : {avg_base:8.0f} ms")
    print(f"  Avg iteration time — Repartitioned : {avg_rept:8.0f} ms")
    print(f"  Gain per iteration                 : {gain:+.1f}%")
    print(f"  Total time — Baseline              : {total_base:8.0f} ms")
    print(f"  Total time — Repartitioned         : {total_rept:8.0f} ms")

# ── Skew analysis ─────────────────────────────────────────────────────────────
print(f"\nSkew analysis (out-degree distribution):")
print(f"  Mean      : {deg_stats['mean']:.2f}")
print(f"  Std       : {deg_stats['stddev']:.2f}")
print(f"  Max       : {deg_stats['max']}")
print(f"  Skew ratio: {skew_ratio:.1f}x  (max / mean)")
if skew_ratio > 10:
    print("  ⚠ High skew — hot vertices inflate specific shuffle partitions")
    print("    Recommendation: filter out top-0.1% degree vertices or apply salting")
else:
    print("  ✓ Moderate skew — repartitioning by 'src' distributes load evenly")
=================================================================
CONVERGENCE — BASELINE (default hash partitioning)
=================================================================
 iter  elapsed_ms    delta
    0      1030.4 0.001349
    1       628.9 0.002393
    2       577.8 0.002487
    3       543.4 0.002112
    4       644.8 0.000971

=================================================================
CONVERGENCE — REPARTITIONED (repartition(8, 'src'))
=================================================================
 iter  elapsed_ms    delta
    0       505.3 0.001349
    1       315.0 0.002393
    2       401.5 0.002487
    3       546.2 0.002112
    4       686.9 0.000971

Performance summary:
  Avg iteration time — Baseline      :      685 ms
  Avg iteration time — Repartitioned :      491 ms
  Gain per iteration                 : +28.3%
  Total time — Baseline              :     3425 ms
  Total time — Repartitioned         :     2455 ms

Skew analysis (out-degree distribution):
  Mean      : 42.69
  Std       : 49.60
  Max       : 438
  Skew ratio: 10.3x  (max / mean)
  ⚠ High skew — hot vertices inflate specific shuffle partitions
    Recommendation: filter out top-0.1% degree vertices or apply salting
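The salting recommendation printed above can be illustrated with a small key-level sketch: a hot source key is split into several salted keys so that its rows no longer form one oversized group. This is a simplified illustration in plain Python, assuming a round-robin salt; `SALT_BUCKETS`, `salted_key`, and the edge list are hypothetical. In a real join the other side would also need one copy of the hot key's row per salt value.

```python
from collections import Counter

SALT_BUCKETS = 4  # hypothetical number of salt values per hot key

def salted_key(src, row_index, hot_keys):
    """Append a round-robin salt to hot keys; leave other keys unchanged."""
    if src in hot_keys:
        return f"{src}#{row_index % SALT_BUCKETS}"
    return src

# Hypothetical edge list: one hot source with 400 edges, 40 regular sources with 10 each.
edges = [("hot", f"d{i}") for i in range(400)]
edges += [(f"s{j}", f"d{i}") for j in range(40) for i in range(10)]

plain_groups = Counter(src for src, _ in edges)
salted_groups = Counter(
    salted_key(src, i, hot_keys={"hot"}) for i, (src, _) in enumerate(edges)
)
# Largest group before and after salting
print(max(plain_groups.values()), max(salted_groups.values()))
```

The largest group shrinks from 400 rows to 100 while the total row count is unchanged, which is the effect salting aims for on skewed join keys.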

Step 5 — Evidence & Metrics

In [8]:
# ── Compile full metrics log ──────────────────────────────────────────────────
graph_build_row = {
    "run_id":              "graph_build",
    "path":                "A",
    "algorithm":           "graph_construction",
    "task":                "build_graph",
    "notes":               f"{n_vertices} vertices | {n_edges} edges | sector={SECTOR_DEG}deg/{TIME_BUCKET}s | skew_ratio={skew_ratio:.1f}x",
    "iteration_or_k":      0,
    "seed":                0,
    "metric_value":        round(skew_ratio, 3),
    "shuffle_read_bytes":  0,
    "shuffle_write_bytes": 0,
    "elapsed_ms":          0,
    "timestamp":           datetime.datetime.now().isoformat(),
}

all_metrics = [graph_build_row] + pr_metrics_baseline + pr_metrics_repartitioned

df_metrics = pd.DataFrame(all_metrics)
df_metrics.to_csv("lab3_metrics_log.csv", index=False)
print("lab3_metrics_log.csv written:")
print(df_metrics[["run_id", "task", "iteration_or_k",
                  "metric_value", "elapsed_ms", "notes"]].to_string(index=False))

# ── Verify all proof files ────────────────────────────────────────────────────
print("\n── Proof file checklist ─────────────────────────────────")
proof_files = [
    ("proof/plan_iteration.txt", "Physical plan for one PageRank iteration"),
    ("proof/plan_before.txt",    "Plan BEFORE repartitioning"),
    ("proof/plan_after.txt",     "Plan AFTER repartitioning"),
]
for fpath, desc in proof_files:
    p = pathlib.Path(fpath)
    ok = p.exists() and p.stat().st_size > 0
    print(f"  {'[x]' if ok else '[ ]'} {fpath:40s} — {desc}  ({p.stat().st_size if p.exists() else 0} bytes)")

print("\n── Deliverables checklist ────────────────────────────────")
deliverables = [
    ("assignment3_esiee.ipynb",                  "this notebook"),
    ("outputs/lab3/pagerank_baseline",           "Parquet — baseline PageRank scores"),
    ("outputs/lab3/pagerank_repartitioned",      "Parquet — repartitioned PageRank scores"),
    ("lab3_metrics_log.csv",                     "per-iteration metrics log"),
    ("engineering_note_lab3.md",                 "before/after comparative report"),
    ("GENAI.md",                                 "AI usage declaration"),
]
for path, desc in deliverables:
    ok = pathlib.Path(path).exists()
    print(f"  {'[x]' if ok else '[ ]'} {path:45s} — {desc}")

print()
print("  [ ] proof/spark_ui_jobs_baseline.png      — capture manually (Jobs tab)")
print("  [ ] proof/spark_ui_shuffle_before.png     — capture manually (SQL/Stages tab)")
print("  [ ] proof/spark_ui_shuffle_after.png      — capture manually (SQL/Stages tab)")
lab3_metrics_log.csv written:
               run_id            task  iteration_or_k  metric_value  elapsed_ms                                                              notes
          graph_build     build_graph               0     10.259000         0.0 6264 vertices | 256461 edges | sector=2deg/300s | skew_ratio=10.3x
     baseline_iter_00 pagerank_iter_0               0      0.001349      1030.4                                    default partitioning (baseline)
     baseline_iter_01 pagerank_iter_1               1      0.002393       628.9                                    default partitioning (baseline)
     baseline_iter_02 pagerank_iter_2               2      0.002487       577.8                                    default partitioning (baseline)
     baseline_iter_03 pagerank_iter_3               3      0.002112       543.4                                    default partitioning (baseline)
     baseline_iter_04 pagerank_iter_4               4      0.000971       644.8                                    default partitioning (baseline)
repartitioned_iter_00 pagerank_iter_0               0      0.001349       505.3                                  repartition(8, 'src') — optimized
repartitioned_iter_01 pagerank_iter_1               1      0.002393       315.0                                  repartition(8, 'src') — optimized
repartitioned_iter_02 pagerank_iter_2               2      0.002487       401.5                                  repartition(8, 'src') — optimized
repartitioned_iter_03 pagerank_iter_3               3      0.002112       546.2                                  repartition(8, 'src') — optimized
repartitioned_iter_04 pagerank_iter_4               4      0.000971       686.9                                  repartition(8, 'src') — optimized

── Proof file checklist ─────────────────────────────────
  [x] proof/plan_iteration.txt                 — Physical plan for one PageRank iteration  (33587 bytes)
  [x] proof/plan_before.txt                    — Plan BEFORE repartitioning  (33587 bytes)
  [x] proof/plan_after.txt                     — Plan AFTER repartitioning  (44734 bytes)

── Deliverables checklist ────────────────────────────────
  [x] assignment3_esiee.ipynb                       — this notebook
  [x] outputs/lab3/pagerank_baseline                — Parquet — baseline PageRank scores
  [x] outputs/lab3/pagerank_repartitioned           — Parquet — repartitioned PageRank scores
  [x] lab3_metrics_log.csv                          — per-iteration metrics log
  [x] engineering_note_lab3.md                      — before/after comparative report
  [x] GENAI.md                                      — AI usage declaration

  [ ] proof/spark_ui_jobs_baseline.png      — capture manually (Jobs tab)
  [ ] proof/spark_ui_shuffle_before.png     — capture manually (SQL/Stages tab)
  [ ] proof/spark_ui_shuffle_after.png      — capture manually (SQL/Stages tab)
In [9]:
spark.stop()
print("Assignment 3 complete.")
Assignment 3 complete.