DE2 - Lab 3: Graphs or Clustering - Instrumented Iterative Workload (15%)

Author : Badr TAJINI - Data Engineering II - ESIEE 2025-2026

Track: D - Aviation (OpenSky Network)
Path chosen: A - Graph Processing (Aircraft Airspace Proximity Network + manual iterative PageRank)

Graph model:

  • Vertices = unique aircraft (icao24 identifiers)
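  • Edges = two aircraft occupied the same 2°×2° lat/lon airspace sector within the same 5-minute time window; each unordered pair is stored once as (src, dst) with src < dst
  • PageRank on this graph ranks aircraft by how central they are in the airspace traffic flow. Because every edge points from the lower to the higher icao24, rank flows toward higher identifiers; the reverse edges would have to be added for an orientation-free centrality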

Goal: Execute an iterative workload with a platform-engineering focus: partitioning strategies, skew analysis, per-iteration shuffle costs, convergence, and a before/after comparative report.

In [1]:
import os, time, pathlib, datetime, socket
import pandas as pd
import numpy as np
from contextlib import redirect_stdout
from urllib.parse import urlparse
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, BooleanType

# ── WSL / WINDOWS IP CONFIGURATION ──────────────────────────────────────────
# Make the Spark UI reachable from a Windows browser when running under WSL:
# detect the IP of the outbound network interface and bind Spark to it.
try:
    # Connecting a UDP socket sends no packets; it only makes the OS pick the
    # outbound interface (eth0 under WSL), whose address is then read back.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect(("8.8.8.8", 80))
        DE2_IP = s.getsockname()[0]
except OSError:
    DE2_IP = "127.0.0.1"   # fall back to loopback if no route is available

os.environ["SPARK_LOCAL_IP"] = DE2_IP

def show_spark_ui(s):
    """
    Prints the Spark UI links. 
    Click the 'WSL/Windows Access' link to open the UI in Chrome/Edge.
    """
    ui_url = s.sparkContext.uiWebUrl
    print(f"Spark version: {s.version}")
    if ui_url:
        ui_port = urlparse(ui_url).port or 4040
        print(f"Spark UI (Internal WSL): {ui_url}")
        print(f"Spark UI (WSL/Windows Access): http://localhost:{ui_port}")
    else:
        print("Spark UI: Not available")

# ── SPARK SESSION INITIALIZATION ──────────────────────────────────────────────
# We use 'local[*]' to utilize all available CPU cores.
# 4GB of RAM is allocated to prevent 'Java Heap Space' errors during joins.
spark = (
    SparkSession.builder
    .appName("DE2-Lab3-GraphProcessing")
    .master("local[*]")
    .config("spark.driver.host",                DE2_IP)
    .config("spark.driver.bindAddress",         "0.0.0.0")
    .config("spark.ui.bindAddress",             "0.0.0.0")
    .config("spark.driver.memory",              "4g") 
    .config("spark.sql.shuffle.partitions",     "8") # Optimized for local 8-core machines
    .getOrCreate()
)

show_spark_ui(spark)

# ── DIRECTORY SETUP ──────────────────────────────────────────────────────────
for p in ["outputs/lab3", "proof"]:
    pathlib.Path(p).mkdir(parents=True, exist_ok=True)
    print(f"Directory ready: {p}")
Spark version: 4.0.1
Spark UI (Internal WSL): http://172.30.130.70:4040
Spark UI (WSL/Windows Access): http://localhost:4040
Directory ready: outputs/lab3
Directory ready: proof

0. Data Preparation

Load the OpenSky CSV from Lab 1.
If it is unavailable, generate a synthetic dataset with the same schema. The synthetic values are drawn uniformly at random, so they match the column layout but not real traffic patterns.

In [2]:
LAB1_CSV   = "data/raw_aviation/states_2017-06-05-00.csv"
SYNTH_PATH = "data/raw_aviation/synthetic_opensky.csv"
pathlib.Path("data/raw_aviation").mkdir(parents=True, exist_ok=True)

if not pathlib.Path(LAB1_CSV).exists() and not pathlib.Path(SYNTH_PATH).exists():
    print("Generating synthetic OpenSky dataset (≈ 5 000 records)...")
    rng = np.random.default_rng(42)
    n   = 5000
    icao24_pool  = [f"a{i:05x}" for i in range(300)]   # 300 unique aircraft
    squawk_pool  = [str(s).zfill(4) for s in rng.integers(1000, 7777, 60)]
    rows = {
        "time":         rng.integers(1496620800, 1496624400, n),   # 1h window
        "icao24":       rng.choice(icao24_pool, n),
        "lat":          rng.uniform(43.0, 52.0, n),                # Europe
        "lon":          rng.uniform(-5.0, 15.0, n),
        "velocity":     rng.uniform(100, 280, n),
        "heading":      rng.uniform(0, 360, n),
        "vertrate":     rng.uniform(-10, 10, n),
        "callsign":     [f"AF{rng.integers(100,999)}" for _ in range(n)],
        "onground":     rng.choice([True, False], n, p=[0.05, 0.95]),
        "alert":        rng.choice([True, False], n, p=[0.01, 0.99]),
        "spi":          [False] * n,
        "squawk":       rng.choice(squawk_pool, n),
        "baroaltitude": rng.uniform(1000, 12000, n),
        "geoaltitude":  rng.uniform(1000, 12000, n),
        "lastposupdate":rng.uniform(1496620800, 1496624400, n),
        "lastcontact":  rng.uniform(1496620800, 1496624400, n),
    }
    pd.DataFrame(rows).to_csv(SYNTH_PATH, index=False)
    print(f"Synthetic dataset written → {SYNTH_PATH}")
    SOURCE_CSV = SYNTH_PATH
else:
    SOURCE_CSV = LAB1_CSV if pathlib.Path(LAB1_CSV).exists() else SYNTH_PATH
    print(f"Using dataset: {SOURCE_CSV}")
Using dataset: data/raw_aviation/states_2017-06-05-00.csv

A.1 - Build Edge and Vertex DataFrames

Proximity graph construction:

  1. Snap each aircraft's position to a 2°×2° lat/lon sector grid and a 5-minute time bucket
  2. Two aircraft sharing the same (sector_lat, sector_lon, time_bucket) tuple → edge between them
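  3. This is a coarse co-occupancy signal (2° of latitude is roughly 222 km), so an edge suggests shared traffic flows or approach paths rather than an actual ATC separation conflict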
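
The three steps above can be sketched in plain Python on a handful of hand-made position reports. This is an illustration only, not the Spark pipeline used in this lab: the `reports` list and the helper names `sector_key` and `proximity_edges` are hypothetical, and the two constants simply mirror the 2° / 300 s values of this notebook.

```python
import math
from collections import defaultdict
from itertools import combinations

SECTOR_DEG = 2      # degrees per sector cell (same value as in this lab)
TIME_BUCKET = 300   # seconds per time bucket (5 minutes)

def sector_key(lat, lon, t):
    """Snap a position report to its (sector_lat, sector_lon, time_bucket) cell."""
    return (
        math.floor(lat / SECTOR_DEG) * SECTOR_DEG,
        math.floor(lon / SECTOR_DEG) * SECTOR_DEG,
        (t // TIME_BUCKET) * TIME_BUCKET,
    )

def proximity_edges(reports):
    """Return the set of (src, dst) pairs, src < dst, that share at least one cell."""
    cells = defaultdict(set)
    for icao24, lat, lon, t in reports:
        cells[sector_key(lat, lon, t)].add(icao24)
    edges = set()
    for aircraft in cells.values():
        # sorted() guarantees src < dst, so each unordered pair appears once
        edges.update(combinations(sorted(aircraft), 2))
    return edges

# Hypothetical reports: (icao24, lat, lon, unix_time)
reports = [
    ("a00001", 48.7, 2.3, 1496620810),
    ("a00002", 49.9, 3.9, 1496621000),   # same sector, same 5-minute bucket
    ("a00003", 48.1, -0.5, 1496620900),  # negative longitude floors to -2, not 0
    ("a00001", 48.9, 2.1, 1496621200),   # same sector but the next time bucket
]

edges = proximity_edges(reports)
print(sector_key(48.1, -0.5, 1496620900))
print(sorted(edges))
```

Flooring rather than truncating matters for negative coordinates: a longitude of -0.5 belongs to the cell starting at -2, which keeps sectors west of the Greenwich meridian the same width as those east of it.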
In [3]:
# ── Load raw OpenSky data ─────────────────────────────────────────────────────
opensky_schema = StructType([
    StructField("time",          LongType(),   True),
    StructField("icao24",        StringType(), True),
    StructField("lat",           DoubleType(), True),
    StructField("lon",           DoubleType(), True),
    StructField("velocity",      DoubleType(), True),
    StructField("heading",       DoubleType(), True),
    StructField("vertrate",      DoubleType(), True),
    StructField("callsign",      StringType(), True),
    StructField("onground",      BooleanType(),True),
    StructField("alert",         BooleanType(),True),
    StructField("spi",           BooleanType(),True),
    StructField("squawk",        StringType(), True),
    StructField("baroaltitude",  DoubleType(), True),
    StructField("geoaltitude",   DoubleType(), True),
    StructField("lastposupdate", DoubleType(), True),
    StructField("lastcontact",   DoubleType(), True),
])

df_raw = (
    spark.read
    .schema(opensky_schema)
    .option("header", "true")
    .csv(SOURCE_CSV)
    .filter(F.col("icao24").isNotNull() & F.col("lat").isNotNull() & F.col("lon").isNotNull())
    .filter(~F.col("onground"))                  # airborne aircraft only
)

print(f"Raw records (airborne): {df_raw.count():,}")

# ── Snap to 2°×2° sectors + 5-minute time buckets ────────────────────────────
SECTOR_DEG  = 2     # degrees per sector cell
TIME_BUCKET = 300   # seconds (5 minutes)

df_sectors = df_raw.select(
    F.col("icao24"),
    (F.floor(F.col("lat") / SECTOR_DEG) * SECTOR_DEG).alias("sector_lat"),
    (F.floor(F.col("lon") / SECTOR_DEG) * SECTOR_DEG).alias("sector_lon"),
    (F.floor(F.col("time")  / TIME_BUCKET) * TIME_BUCKET).alias("time_bucket"),
)

# ── Build proximity edges via self-join on (sector, time_bucket) ──────────────
# Alias the same DataFrame to perform the self-join
left  = df_sectors.alias("l")
right = df_sectors.alias("r")

edges_raw = (
    left.join(
        right,
        on=(
            (F.col("l.sector_lat")  == F.col("r.sector_lat")) &
            (F.col("l.sector_lon")  == F.col("r.sector_lon")) &
            (F.col("l.time_bucket") == F.col("r.time_bucket")) &
            (F.col("l.icao24")      <  F.col("r.icao24"))     # keep each unordered pair once (src < dst) and drop self-loops
        )
    )
    .select(
        F.col("l.icao24").alias("src"),
        F.col("r.icao24").alias("dst"),
    )
    .distinct()
)

# Materialize edge list
edges_raw.cache()
n_edges = edges_raw.count()
print(f"Edges (unique aircraft pairs in same sector/window): {n_edges:,}")
edges_raw.show(10)

# ── Build vertex DataFrame ────────────────────────────────────────────────────
vertices = (
    edges_raw.select("src").withColumnRenamed("src", "id")
    .union(edges_raw.select("dst").withColumnRenamed("dst", "id"))
    .distinct()
)
vertices.cache()
n_vertices = vertices.count()
print(f"Vertices (aircraft in the graph): {n_vertices:,}")

# ── Skew analysis: out-degree distribution ────────────────────────────────────
# Note: since src < dst, out_degree counts only neighbours with a higher icao24.
out_degree = (
    edges_raw.groupBy("src")
    .count()
    .withColumnRenamed("count", "out_degree")
    .orderBy(F.desc("out_degree"))
)
out_degree.cache()
print("\nTop 10 aircraft by out-degree (hot vertices = potential skew sources):")
out_degree.show(10)

deg_stats = out_degree.select(
    F.mean("out_degree").alias("mean"),
    F.stddev("out_degree").alias("stddev"),
    F.max("out_degree").alias("max")
).first()
print(f"Degree — mean: {deg_stats['mean']:.2f}, std: {deg_stats['stddev']:.2f}, max: {deg_stats['max']}")
                                                                                
Raw records (airborne): 992,613
Edges (unique aircraft pairs in same sector/window): 256,461
+------+------+
|   src|   dst|
+------+------+
|7c39fa|7c4eeb|
|7503a9|7c7a45|
|7c6d26|7c6db0|
|7c6d26|7c752d|
|7c5b44|7c6d26|
|7c6c99|7c6d75|
|780aa2|7c6d29|
|780aa2|7c7a3e|
|780aa2|7c23be|
|7c7bd3|7cc3ff|
+------+------+
only showing top 10 rows
Vertices (aircraft in the graph): 6,264

Top 10 aircraft by out-degree (hot vertices = potential skew sources):
+------+----------+
|   src|out_degree|
+------+----------+
|0d0760|       438|
|2ac0af|       382|
|4245fb|       343|
|2ac64c|       335|
|2ac55b|       329|
|4b1901|       326|
|2ac126|       325|
|2980c5|       316|
|394a02|       304|
|a0420e|       304|
+------+----------+
only showing top 10 rows
Degree — mean: 42.69, std: 49.60, max: 438

A.2 - Iterative PageRank (manual implementation)

Algorithm:

  1. Initialize: rank[v] = 1 / N for all vertices
  2. Each iteration: rank[v] = (1-d)/N + d × Σ(rank[u]/out_degree[u]) for all in-neighbours u
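  3. Damping factor d = 0.85 (the conventional value from web-graph PageRank, used here as a default rather than tuned for airspace traffic)
  4. Stopping criterion: max(|new_rank - old_rank|) < ε = 0.001. With N = 6,264 vertices the initial rank 1/N is about 0.00016, so this threshold is coarse relative to typical rank values

Each iteration is timed with time.time(). Shuffle read/write bytes are logged as 0 placeholders, to be filled in manually from the Spark UI.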
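
The update rule above can be checked by hand on a toy graph. The following is a minimal pure-Python sketch, not the Spark loop used in this lab: the four-vertex edge list is hypothetical, every vertex is deliberately given at least one in-edge and one out-edge so that no rank mass is lost, and the function name `pagerank` and its default `epsilon` are illustrative choices.

```python
def pagerank(edges, damping=0.85, epsilon=1e-6, max_iter=100):
    """Iterate rank[v] = (1-d)/N + d * sum(rank[u] / out_degree[u]) over in-neighbours u."""
    vertices = sorted({v for edge in edges for v in edge})
    n = len(vertices)
    out_degree = {v: 0 for v in vertices}
    for src, _ in edges:
        out_degree[src] += 1

    ranks = {v: 1.0 / n for v in vertices}   # step 1: uniform initialization
    for iteration in range(max_iter):
        # Every vertex starts from the teleport term, then receives contributions.
        new_ranks = {v: (1.0 - damping) / n for v in vertices}
        for src, dst in edges:
            new_ranks[dst] += damping * ranks[src] / out_degree[src]
        # Stopping criterion: largest absolute change across all vertices.
        delta = max(abs(new_ranks[v] - ranks[v]) for v in vertices)
        ranks = new_ranks
        if delta < epsilon:
            break
    return ranks, iteration + 1

# Hypothetical directed toy graph (every vertex has in- and out-edges).
toy_edges = [("a", "b"), ("b", "c"), ("c", "a"), ("a", "c"), ("d", "a"), ("c", "d")]
ranks, n_iter = pagerank(toy_edges)
for vertex, rank in sorted(ranks.items(), key=lambda kv: -kv[1]):
    print(f"{vertex}: {rank:.4f}")
print(f"iterations: {n_iter}, total rank: {sum(ranks.values()):.6f}")
```

In this sketch every vertex is seeded with the teleport term, so the ranks keep summing to 1. A loop that builds the new ranks only from aggregated contributions would omit vertices with no in-edges unless they are re-added, for example by a left join from the vertex list.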

In [5]:
# ── PageRank constants ────────────────────────────────────────────────────────
DAMPING   = 0.85
MAX_ITER  = 10
EPSILON   = 0.001          # convergence threshold

# ── Use the DEFAULT partitioning (baseline) ───────────────────────────────────
edges_baseline = edges_raw                  # no explicit repartition

# ── Initialize ranks ─────────────────────────────────────────────────────────
ranks = vertices.withColumn("rank", F.lit(1.0 / n_vertices))

# Pre-compute out-degree (constant across iterations)
# Already computed above as `out_degree`

pr_metrics_baseline = []   # log per-iteration metrics
converged = False

for i in range(MAX_ITER):
    t0 = time.time()

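    # 1. Join edges with current ranks (src → rank) and with the out-degrees.
    #    edges_baseline and out_degree share the same lineage, so the condition
    #    `edges_baseline.src == out_degree.src` triggers Spark's "trivially true
    #    equals predicate" warning; joining on the column name (on="src") or
    #    aliasing both sides would avoid it.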
    contribs = (
        edges_baseline
        .join(ranks,      edges_baseline.src == ranks.id, "inner")
        .join(out_degree, edges_baseline.src == out_degree.src, "inner")
        .select(
            edges_baseline.dst.alias("id"),
            (F.col("rank") / F.col("out_degree")).alias("contrib")
        )
    )

    # 2. Aggregate contributions per destination
    new_ranks = (
        contribs
        .groupBy("id")
        .agg(F.sum("contrib").alias("sum_contrib"))
        .withColumn(
            "rank",
            F.lit((1.0 - DAMPING) / n_vertices) + F.lit(DAMPING) * F.col("sum_contrib")
        )
        .select("id", "rank")
    )

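    # 3. Cache and force an action so the next iteration reads materialized ranks.
    #    Note: cache() does not truncate lineage, so the plan still grows with each
    #    iteration; checkpoint() or localCheckpoint() would be needed to cut it.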
    new_ranks.cache()
    new_ranks.count()    # force action
    elapsed_ms = (time.time() - t0) * 1000

    # 4. Convergence: max absolute rank change.
    #    Rename the old rank column so both rank columns can be referenced
    #    unambiguously after the join (avoids [AMBIGUOUS_REFERENCE]).
    old_r = ranks.withColumnRenamed("rank", "old_rank")

    delta = (
        new_ranks.join(old_r, "id")
        .select(F.max(F.abs(F.col("rank") - F.col("old_rank"))).alias("delta"))
        .first()["delta"] or 0.0
    )

    # Log per-iteration metrics
    pr_metrics_baseline.append({
        "run_id":              f"baseline_iter_{i}",
        "path":                "A",
        "algorithm":           "pagerank_manual",
        "task":                f"pagerank_iter_{i}",
        "notes":               "default partitioning",
        "iteration_or_k":      i,
        "seed":                0,
        "metric_value":        delta,
        "shuffle_read_bytes":  0,   # to be filled manually from Spark UI
        "shuffle_write_bytes": 0,   # to be filled manually from Spark UI
        "elapsed_ms":          elapsed_ms,
        "timestamp":           datetime.datetime.now().isoformat(),
    })

    print(f"Iter {i:2d}: elapsed={elapsed_ms:7.0f} ms  |  delta={delta:.6f}")

    # Update ranks for next iteration and clean up cache
    ranks.unpersist()
    ranks = new_ranks

    if delta < EPSILON:
        print(f"  → Converged at iteration {i} (delta={delta:.6f} < ε={EPSILON})")
        converged = True
        break

if not converged:
    print(f"  → Did not converge within {MAX_ITER} iterations")

print("\nTop 15 aircraft by PageRank (most central in the airspace network):")
ranks.orderBy(F.desc("rank")).show(15)
26/04/29 19:06:48 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
26/04/29 19:06:48 WARN CacheManager: Asked to cache already cached data.
Iter  0: elapsed=    265 ms  |  delta=0.001349
26/04/29 19:06:48 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
Iter  1: elapsed=    584 ms  |  delta=0.002393
26/04/29 19:06:49 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
Iter  2: elapsed=    591 ms  |  delta=0.002487
26/04/29 19:06:50 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
Iter  3: elapsed=    555 ms  |  delta=0.002112
26/04/29 19:06:51 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
Iter  4: elapsed=    593 ms  |  delta=0.000971
  → Converged at iteration 4 (delta=0.000971 < ε=0.001)

Top 15 aircraft by PageRank (most central in the airspace network):
+------+--------------------+
|    id|                rank|
+------+--------------------+
|c81e22|0.007440856845081005|
|cda28b|0.007002487247207178|
|e8043b|0.005849323222585...|
|e61f99|0.005281231065134172|
|c0884a|0.005247960316645426|
|c07fbf|0.004481929543099...|
|d2579e|0.003832716281315...|
|c080a0|0.003708087800843093|
|c052e0|0.003416716491559...|
|e8043a|0.003241819131628802|
|ae0211|0.003066926241153087|
|adea26|0.003027870476320895|
|c087bc|0.002991025535631735|
|c067e7|0.002811286913353...|
|c080a9|0.002766710516659172|
+------+--------------------+
only showing top 15 rows
In [6]:
# ── Save final PageRank results ───────────────────────────────────────────────
ranks.write.mode("overwrite").parquet("outputs/lab3/pagerank_results")
print("PageRank results written → outputs/lab3/pagerank_results")

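# ── Save iteration plan (formatted physical plan for one iteration's join) ────
# `ranks` is the final DataFrame here, so the plan carries the lineage of all completed iterations.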
contribs_plan = (
    edges_baseline
    .join(ranks,      edges_baseline.src == ranks.id, "inner")
    .join(out_degree, edges_baseline.src == out_degree.src, "inner")
    .select(
        edges_baseline.dst.alias("id"),
        (F.col("rank") / F.col("out_degree")).alias("contrib")
    )
)
with open("proof/plan_iteration.txt", "w") as f:
    with redirect_stdout(f):
        contribs_plan.explain(mode="formatted")
print("proof/plan_iteration.txt saved.")
PageRank results written → outputs/lab3/pagerank_results
proof/plan_iteration.txt saved.
26/04/29 19:07:46 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.

A.3 - Partitioning Experiment

Hypothesis: Repartitioning edges by the join key (src) before the iterative loop collocates all out-edges of the same aircraft in the same partition, reducing shuffle during the join with ranks.

Strategies compared:

Strategy      | Description
Baseline      | Default partitioning (no explicit repartition)
Repartitioned | edges.repartition(8, "src") + out_degree.repartition(8, "src") before the loop
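
To make the hypothesis concrete, the sketch below simulates hash-partitioning an edge list by `src` in plain Python. It is an illustration only: `zlib.crc32` stands in for Spark's internal hash function, the edge list is synthetic with one deliberately hot vertex, and `partition_of` is a hypothetical helper.

```python
import zlib
from collections import Counter

N_PARTS = 8

def partition_of(key, n_parts=N_PARTS):
    """Deterministic stand-in for a hash partitioner (not Spark's actual hash)."""
    return zlib.crc32(key.encode("utf-8")) % n_parts

# Synthetic edges: 40 ordinary sources with 5 out-edges each, plus one hot source with 400.
edges = [(f"src{i:02d}", f"dst{j}") for i in range(40) for j in range(5)]
edges += [("hot", f"dst{j}") for j in range(400)]

rows_per_partition = Counter(partition_of(src) for src, _ in edges)
partitions_per_key = {}
for src, _ in edges:
    partitions_per_key.setdefault(src, set()).add(partition_of(src))

# Co-location: every key maps to exactly one partition.
colocated = all(len(parts) == 1 for parts in partitions_per_key.values())
print(f"all keys co-located: {colocated}")

# Skew: the partition holding the hot key carries far more rows than the average.
mean_rows = len(edges) / N_PARTS
print(f"max rows: {max(rows_per_partition.values())}, mean rows: {mean_rows:.1f}")
```

Co-location means rows with the same key no longer have to move at join time, but it cannot split a hot key: all out-edges of the hot vertex land in one partition, which stays larger than the rest.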
In [7]:
# ── Save plan BEFORE repartitioning ──────────────────────────────────────────
ranks_init = vertices.withColumn("rank", F.lit(1.0 / n_vertices))
plan_before_q = (
    edges_raw
    .join(ranks_init,  edges_raw.src == ranks_init.id, "inner")
    .join(out_degree,  edges_raw.src == out_degree.src, "inner")
    .select(edges_raw.dst.alias("id"),
            (F.col("rank") / F.col("out_degree")).alias("contrib"))
)
with open("proof/plan_before.txt", "w") as f:
    with redirect_stdout(f):
        plan_before_q.explain(mode="formatted")
print("proof/plan_before.txt saved.")

# ── Repartition edges and out_degree by join key ──────────────────────────────
N_PARTS = 8
edges_repartitioned = edges_raw.repartition(N_PARTS, "src").cache()
edges_repartitioned.count()   # materialize
out_degree_repartitioned = out_degree.repartition(N_PARTS, "src").cache()
out_degree_repartitioned.count()
print(f"Edges repartitioned to {N_PARTS} partitions by 'src'.")

# ── Save plan AFTER repartitioning ────────────────────────────────────────────
plan_after_q = (
    edges_repartitioned
    .join(ranks_init,              edges_repartitioned.src == ranks_init.id, "inner")
    .join(out_degree_repartitioned, edges_repartitioned.src == out_degree_repartitioned.src, "inner")
    .select(edges_repartitioned.dst.alias("id"),
            (F.col("rank") / F.col("out_degree")).alias("contrib"))
)
with open("proof/plan_after.txt", "w") as f:
    with redirect_stdout(f):
        plan_after_q.explain(mode="formatted")
print("proof/plan_after.txt saved.")

# ── Run PageRank with repartitioned edges ─────────────────────────────────────
ranks2 = vertices.withColumn("rank", F.lit(1.0 / n_vertices))
pr_metrics_repartitioned = []

for i in range(MAX_ITER):
    t0 = time.time()

    contribs2 = (
        edges_repartitioned
        .join(ranks2,                    edges_repartitioned.src == ranks2.id, "inner")
        .join(out_degree_repartitioned,  edges_repartitioned.src == out_degree_repartitioned.src, "inner")
        .select(
            edges_repartitioned.dst.alias("id"),
            (F.col("rank") / F.col("out_degree")).alias("contrib")
        )
    )
    new_ranks2 = (
        contribs2.groupBy("id")
        .agg(F.sum("contrib").alias("sum_contrib"))
        .withColumn("rank",
                    F.lit((1.0 - DAMPING) / n_vertices) + F.lit(DAMPING) * F.col("sum_contrib"))
        .select("id", "rank")
    )
    new_ranks2.cache()
    new_ranks2.count()
    elapsed_ms = (time.time() - t0) * 1000

    old_r2 = ranks2.withColumnRenamed("rank", "old_rank")
    delta2  = (
        new_ranks2.join(old_r2, "id")
        .select(F.max(F.abs(new_ranks2.rank - F.col("old_rank"))).alias("delta"))
        .first()["delta"] or 0.0
    )

    pr_metrics_repartitioned.append({
        "run_id":              f"repartitioned_iter_{i}",
        "path":                "A",
        "algorithm":           "pagerank_manual",
        "task":                f"pagerank_iter_{i}",
        "notes":               f"repartition({N_PARTS}, 'src')",
        "iteration_or_k":      i,
        "seed":                0,
        "metric_value":        delta2,
        "shuffle_read_bytes":  0,
        "shuffle_write_bytes": 0,
        "elapsed_ms":          elapsed_ms,
        "timestamp":           datetime.datetime.now().isoformat(),
    })

    print(f"[REPARTITIONED] Iter {i:2d}: elapsed={elapsed_ms:7.0f} ms  |  delta={delta2:.6f}")
    ranks2.unpersist()
    ranks2 = new_ranks2
    if delta2 < EPSILON:
        print(f"  → Converged at iteration {i}")
        break

print("\nTop 10 aircraft (repartitioned run):")
ranks2.orderBy(F.desc("rank")).show(10)
26/04/29 19:07:57 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
proof/plan_before.txt saved.
Edges repartitioned to 8 partitions by 'src'.
proof/plan_after.txt saved.
26/04/29 19:07:58 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
26/04/29 19:07:58 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
[REPARTITIONED] Iter  0: elapsed=    632 ms  |  delta=0.001349
26/04/29 19:07:59 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
[REPARTITIONED] Iter  1: elapsed=    361 ms  |  delta=0.002393
26/04/29 19:08:00 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
[REPARTITIONED] Iter  2: elapsed=    440 ms  |  delta=0.002487
26/04/29 19:08:00 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
[REPARTITIONED] Iter  3: elapsed=    659 ms  |  delta=0.002112
26/04/29 19:08:01 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
[REPARTITIONED] Iter  4: elapsed=    757 ms  |  delta=0.000971
  → Converged at iteration 4

Top 10 aircraft (repartitioned run):
+------+--------------------+
|    id|                rank|
+------+--------------------+
|c81e22|0.007440856845081...|
|cda28b|0.007002487247207179|
|e8043b|0.005849323222585473|
|e61f99|0.005281231065134172|
|c0884a|0.005247960316645426|
|c07fbf| 0.00448192954309929|
|d2579e|0.003832716281315...|
|c080a0|0.003708087800843093|
|c052e0|0.003416716491559356|
|e8043a|0.003241819131628801|
+------+--------------------+
only showing top 10 rows

A.4 - Convergence & Skew Analysis

In [8]:
# ── Convergence table ─────────────────────────────────────────────────────────
df_base = pd.DataFrame(pr_metrics_baseline)
df_rept = pd.DataFrame(pr_metrics_repartitioned)

print("=== Baseline convergence ===")
print(df_base[["iteration_or_k", "elapsed_ms", "metric_value"]].to_string(index=False))
print("\n=== Repartitioned convergence ===")
print(df_rept[["iteration_or_k", "elapsed_ms", "metric_value"]].to_string(index=False))

# ── Skew summary ──────────────────────────────────────────────────────────────
print(f"\nSkew analysis:")
print(f"  Mean out-degree : {deg_stats['mean']:.2f}")
print(f"  Std out-degree  : {deg_stats['stddev']:.2f}")
print(f"  Max out-degree  : {deg_stats['max']}")
skew_ratio = deg_stats["max"] / deg_stats["mean"] if deg_stats["mean"] else 0
print(f"  Skew ratio      : {skew_ratio:.1f}x (max/mean)")
if skew_ratio > 10:
    print("  ⚠ High skew — hot vertices will cause uneven shuffle partitions")
else:
    print("  ✓ Moderate skew — repartitioning by 'src' should balance load")

# ── Performance comparison ────────────────────────────────────────────────────
if len(df_base) > 0 and len(df_rept) > 0:
    avg_base = df_base["elapsed_ms"].mean()
    avg_rept = df_rept["elapsed_ms"].mean()
    gain = (avg_base - avg_rept) / avg_base * 100 if avg_base > 0 else 0
    print(f"\nPerformance (avg per iteration):")
    print(f"  Baseline      : {avg_base:.0f} ms")
    print(f"  Repartitioned : {avg_rept:.0f} ms")
    print(f"  Gain          : {gain:+.1f}%")
=== Baseline convergence ===
 iteration_or_k  elapsed_ms  metric_value
              0  264.610529      0.001349
              1  584.051847      0.002393
              2  590.568781      0.002487
              3  555.108547      0.002112
              4  592.921972      0.000971

=== Repartitioned convergence ===
 iteration_or_k  elapsed_ms  metric_value
              0  632.028103      0.001349
              1  361.491442      0.002393
              2  439.799070      0.002487
              3  659.268141      0.002112
              4  756.708622      0.000971

Skew analysis:
  Mean out-degree : 42.69
  Std out-degree  : 49.60
  Max out-degree  : 438
  Skew ratio      : 10.3x (max/mean)
  ⚠ High skew — hot vertices will cause uneven shuffle partitions

Performance (avg per iteration):
  Baseline      : 517 ms
  Repartitioned : 570 ms
  Gain          : -10.1%

Shared - Evidence & Metrics

In [9]:
# ── Write lab3_metrics_log.csv ────────────────────────────────────────────────
all_metrics = pr_metrics_baseline + pr_metrics_repartitioned

# Add skew and vertex/edge count rows
all_metrics.insert(0, {
    "run_id": "graph_build", "path": "A", "algorithm": "graph_construction",
    "task": "build_graph", "notes": f"{n_vertices} vertices, {n_edges} edges, sector=2deg/5min",
    "iteration_or_k": 0, "seed": 0,
    "metric_value": skew_ratio,
    "shuffle_read_bytes": 0, "shuffle_write_bytes": 0,
    "elapsed_ms": 0, "timestamp": datetime.datetime.now().isoformat(),
})

df_metrics = pd.DataFrame(all_metrics)
df_metrics.to_csv("lab3_metrics_log.csv", index=False)
print("lab3_metrics_log.csv written:")
print(df_metrics[["run_id","task","iteration_or_k","metric_value","elapsed_ms"]].to_string(index=False))

# ── Verify proof files ────────────────────────────────────────────────────────
print("\nProof files:")
for f in ["proof/plan_iteration.txt", "proof/plan_before.txt", "proof/plan_after.txt"]:
    p = pathlib.Path(f)
    print(f"  {'[x]' if p.exists() else '[ ]'} {f}  ({p.stat().st_size if p.exists() else 0} bytes)")
print("  [ ] proof/spark_ui_shuffle_before.png  — capture manually")
print("  [ ] proof/spark_ui_shuffle_after.png   — capture manually")
lab3_metrics_log.csv written:
              run_id            task  iteration_or_k  metric_value  elapsed_ms
         graph_build     build_graph               0     10.259127    0.000000
     baseline_iter_0 pagerank_iter_0               0      0.001349  264.610529
     baseline_iter_1 pagerank_iter_1               1      0.002393  584.051847
     baseline_iter_2 pagerank_iter_2               2      0.002487  590.568781
     baseline_iter_3 pagerank_iter_3               3      0.002112  555.108547
     baseline_iter_4 pagerank_iter_4               4      0.000971  592.921972
repartitioned_iter_0 pagerank_iter_0               0      0.001349  632.028103
repartitioned_iter_1 pagerank_iter_1               1      0.002393  361.491442
repartitioned_iter_2 pagerank_iter_2               2      0.002487  439.799070
repartitioned_iter_3 pagerank_iter_3               3      0.002112  659.268141
repartitioned_iter_4 pagerank_iter_4               4      0.000971  756.708622

Proof files:
  [x] proof/plan_iteration.txt  (1183547 bytes)
  [x] proof/plan_before.txt  (33583 bytes)
  [x] proof/plan_after.txt  (44730 bytes)
  [ ] proof/spark_ui_shuffle_before.png  — capture manually
  [ ] proof/spark_ui_shuffle_after.png   — capture manually

Cleanup

In [10]:
spark.stop()
print("Lab 3 complete.")
Lab 3 complete.
In [ ]: