DE2 — Assignment 3: Graphs or Clustering
Author: Badr TAJINI - Data Engineering II (Data-Intensive Workloads) - ESIEE 2025-2026
Track: D — Aviation (OpenSky Network)
Path chosen: A — Graph Processing (Airspace Proximity Network + manual iterative PageRank)
Names: Justine Guirauden and Volcy Desmazures
Graph model
| Element | Definition |
|---|---|
| Vertex | A unique airborne aircraft identified by its icao24 transponder code |
| Edge | Two aircraft were observed in the same 2°×2° lat/lon sector within the same 5-minute time window |
| Weight | Unweighted (binary co-presence) |
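The edge rule in the table can be tried on a few hand-made observations. The sketch below is plain Python (no Spark) that mirrors Step 1: it snaps each position to a `SECTOR_DEG` grid cell and a `TIME_BUCKET` window with floor division, then emits every pair of distinct aircraft that share a cell, smaller `icao24` first. These are the same pairs the notebook's self-join with `l.icao24 < r.icao24` produces. The `icao24` codes and coordinates are made up for illustration.

```python
import math
from collections import defaultdict
from itertools import combinations

SECTOR_DEG = 2     # degrees per grid cell (same value as the notebook)
TIME_BUCKET = 300  # seconds per time window (same value as the notebook)


def cell_key(time_s: int, lat: float, lon: float) -> tuple:
    """Snap one observation to its (sector_lat, sector_lon, time_bucket) cell."""
    return (
        math.floor(lat / SECTOR_DEG) * SECTOR_DEG,
        math.floor(lon / SECTOR_DEG) * SECTOR_DEG,
        (time_s // TIME_BUCKET) * TIME_BUCKET,
    )


def co_presence_edges(observations) -> set:
    """observations: iterable of (time_s, icao24, lat, lon).

    Returns the set of unique (src, dst) pairs with src < dst.
    """
    cells = defaultdict(set)
    for time_s, icao24, lat, lon in observations:
        # A set per cell: repeated reports of one aircraft never create self-loops.
        cells[cell_key(time_s, lat, lon)].add(icao24)
    edges = set()
    for aircraft in cells.values():
        # combinations() over a sorted list yields each unordered pair once, smaller id first.
        edges.update(combinations(sorted(aircraft), 2))
    return edges


obs = [
    (1496620800, "3c6444", 48.9, 2.5),   # same 2° sector and 5-minute window ...
    (1496620950, "4ca7b5", 49.1, 3.9),   # ... as this report
    (1496621200, "3c6444", 48.9, 2.5),   # next window: no partner
    (1496620810, "a0b1c2", 51.5, -0.1),  # different sector
]
print(sorted(co_presence_edges(obs)))
```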
Applied to this graph, PageRank ranks aircraft by their centrality in the airspace traffic network: aircraft that are frequently co-located with many other aircraft score highest, which should correspond to high-density approach and departure corridors.
Partitioning experiment: default hash partitioning (baseline) vs explicit repartition(N, "src") (optimized).
0. Setup
import os, time, pathlib, datetime, socket
import pandas as pd
import numpy as np
from contextlib import redirect_stdout
from urllib.parse import urlparse
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import (
    StructType, StructField, StringType,
    LongType, DoubleType, BooleanType
)
# ── Network binding (WSL / local compatibility) ───────────────────────────────
try:
    # Discover the outward-facing local IP (a UDP connect sends no packet)
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.connect(("8.8.8.8", 80))
    LOCAL_IP = s.getsockname()[0]
    s.close()
except Exception:
    LOCAL_IP = "127.0.0.1"
DE2_SPARK_DRIVER_HOST = os.environ.get("DE2_SPARK_DRIVER_HOST", LOCAL_IP)
DE2_SPARK_BIND_ADDRESS = os.environ.get("DE2_SPARK_BIND_ADDRESS", "0.0.0.0")
os.environ.setdefault("SPARK_LOCAL_IP", DE2_SPARK_DRIVER_HOST)
def show_spark_ui(spark_session):
    """Print the Spark version and the UI URL (plus a localhost URL for WSL/Windows browsers)."""
    ui_url = spark_session.sparkContext.uiWebUrl
    print("Spark version:", spark_session.version)
    if ui_url:
        ui_port = urlparse(ui_url).port or 4040
        print("Spark UI:", ui_url)
        print("Spark UI (WSL/Windows browser):", f"http://localhost:{ui_port}")
    else:
        print("Spark UI: not available")
spark = (
    SparkSession.builder
    .appName("de2-assignment3-aviation-pagerank")
    .master("local[*]")
    .config("spark.driver.host", DE2_SPARK_DRIVER_HOST)
    .config("spark.driver.bindAddress", DE2_SPARK_BIND_ADDRESS)
    .config("spark.ui.bindAddress", DE2_SPARK_BIND_ADDRESS)
    # Tuned for local mode: avoid 200-task shuffle overhead
    .config("spark.sql.shuffle.partitions", "8")
    .getOrCreate()
)
show_spark_ui(spark)
# ── Directory setup ───────────────────────────────────────────────────────────
for p in ["outputs/lab3", "proof", "data/raw_aviation"]:
    pathlib.Path(p).mkdir(parents=True, exist_ok=True)
# ── PageRank hyper-parameters ────────────────────────────────────────────────
DAMPING = 0.85 # standard damping factor
MAX_ITER = 10 # maximum iterations
EPSILON = 0.001 # convergence threshold (max absolute rank delta)
SECTOR_DEG = 2 # degrees per grid cell
TIME_BUCKET = 300 # 5-minute windows (seconds)
N_PARTS = 8 # repartition target (match local core count)
print("Setup complete.")
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/04/30 09:36:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark version: 4.0.1
Spark UI: http://172.30.130.70:4040
Spark UI (WSL/Windows browser): http://localhost:4040
Setup complete.
Step 1 — Data & Graph Construction
Load the OpenSky dataset from Lab 1.
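If that file is absent (and no synthetic file exists yet), a reproducible synthetic dataset with the same schema is generated instead: 5,000 records drawn with a fixed seed, with values sampled uniformly from plausible ranges.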
# ── Source selection ─────────────────────────────────────────────────────────
LAB1_CSV = "data/raw_aviation/states_2017-06-05-00.csv"
SYNTH_PATH = "data/raw_aviation/synthetic_opensky.csv"
if not pathlib.Path(LAB1_CSV).exists() and not pathlib.Path(SYNTH_PATH).exists():
    print("Generating synthetic OpenSky dataset (5 000 records)...")
    rng = np.random.default_rng(42)
    n = 5000
    icao24_pool = [f"a{i:05x}" for i in range(300)]
    squawk_pool = [str(s).zfill(4) for s in rng.integers(1000, 7777, 60)]
    pd.DataFrame({
        "time": rng.integers(1496620800, 1496624400, n),
        "icao24": rng.choice(icao24_pool, n),
        "lat": rng.uniform(43.0, 52.0, n),
        "lon": rng.uniform(-5.0, 15.0, n),
        "velocity": rng.uniform(100, 280, n),
        "heading": rng.uniform(0, 360, n),
        "vertrate": rng.uniform(-10, 10, n),
        "callsign": [f"AF{rng.integers(100, 999)}" for _ in range(n)],
        "onground": rng.choice([True, False], n, p=[0.05, 0.95]),
        "alert": rng.choice([True, False], n, p=[0.01, 0.99]),
        "spi": [False] * n,
        "squawk": rng.choice(squawk_pool, n),
        "baroaltitude": rng.uniform(1000, 12000, n),
        "geoaltitude": rng.uniform(1000, 12000, n),
        "lastposupdate": rng.uniform(1496620800, 1496624400, n),
        "lastcontact": rng.uniform(1496620800, 1496624400, n),
    }).to_csv(SYNTH_PATH, index=False)
    print(f"Synthetic data → {SYNTH_PATH}")
SOURCE_CSV = LAB1_CSV if pathlib.Path(LAB1_CSV).exists() else SYNTH_PATH
print(f"Source: {SOURCE_CSV}")
Source: data/raw_aviation/states_2017-06-05-00.csv
# ── Schema (identical to Lab 1) ───────────────────────────────────────────────
opensky_schema = StructType([
    StructField("time", LongType(), True),
    StructField("icao24", StringType(), True),
    StructField("lat", DoubleType(), True),
    StructField("lon", DoubleType(), True),
    StructField("velocity", DoubleType(), True),
    StructField("heading", DoubleType(), True),
    StructField("vertrate", DoubleType(), True),
    StructField("callsign", StringType(), True),
    StructField("onground", BooleanType(), True),
    StructField("alert", BooleanType(), True),
    StructField("spi", BooleanType(), True),
    StructField("squawk", StringType(), True),
    StructField("baroaltitude", DoubleType(), True),
    StructField("geoaltitude", DoubleType(), True),
    StructField("lastposupdate", DoubleType(), True),
    StructField("lastcontact", DoubleType(), True),
])
# ── Load + filter (airborne only) ────────────────────────────────────────────
df_raw = (
    spark.read
    .schema(opensky_schema)
    .option("header", "true")
    .csv(SOURCE_CSV)
    .filter(
        F.col("icao24").isNotNull() &
        F.col("lat").isNotNull() &
        F.col("lon").isNotNull() &
        ~F.col("onground")
    )
)
print(f"Raw airborne records: {df_raw.count():,}")
# ── Snap to 2°×2° sector grid + 5-minute time bucket ─────────────────────────
df_sectors = df_raw.select(
    F.col("icao24"),
    (F.floor(F.col("lat") / SECTOR_DEG) * SECTOR_DEG).alias("sector_lat"),
    (F.floor(F.col("lon") / SECTOR_DEG) * SECTOR_DEG).alias("sector_lon"),
    (F.floor(F.col("time") / TIME_BUCKET) * TIME_BUCKET).alias("time_bucket"),
)
# ── Proximity edges via self-join on sector + time_bucket ─────────────────────
# Using strict inequality (l.icao24 < r.icao24) → unique undirected edges
left = df_sectors.alias("l")
right = df_sectors.alias("r")
edges_raw = (
    left.join(
        right,
        (
            (F.col("l.sector_lat") == F.col("r.sector_lat")) &
            (F.col("l.sector_lon") == F.col("r.sector_lon")) &
            (F.col("l.time_bucket") == F.col("r.time_bucket")) &
            (F.col("l.icao24") < F.col("r.icao24"))  # no self-loops, no duplicates
        )
    )
    .select(
        F.col("l.icao24").alias("src"),
        F.col("r.icao24").alias("dst"),
    )
    .distinct()
)
edges_raw.cache()
n_edges = edges_raw.count()
# ── Vertex DataFrame ─────────────────────────────────────────────────────────
vertices = (
    edges_raw.select(F.col("src").alias("id"))
    .union(edges_raw.select(F.col("dst").alias("id")))
    .distinct()
)
vertices.cache()
n_vertices = vertices.count()
print(f"Graph — Vertices: {n_vertices:,} | Edges: {n_edges:,}")
edges_raw.show(8)
# ── Out-degree + Skew analysis ────────────────────────────────────────────────
out_degree = (
    edges_raw.groupBy("src")
    .count()
    .withColumnRenamed("count", "out_degree")
    .orderBy(F.desc("out_degree"))
)
out_degree.cache()
deg_stats = out_degree.select(
    F.mean("out_degree").alias("mean"),
    F.stddev("out_degree").alias("stddev"),
    F.max("out_degree").alias("max")
).first()
skew_ratio = (deg_stats["max"] / deg_stats["mean"]) if deg_stats["mean"] else 0
print(f"\nOut-degree stats — mean: {deg_stats['mean']:.2f} std: {deg_stats['stddev']:.2f} max: {deg_stats['max']}")
print(f"Skew ratio (max/mean): {skew_ratio:.1f}x")
print("\nTop 10 hot vertices (potential skew sources):")
out_degree.show(10)
Raw airborne records: 992,613
Graph — Vertices: 6,264 | Edges: 256,461
+------+------+
|   src|   dst|
+------+------+
|7c39fa|7c4eeb|
|7503a9|7c7a45|
|7c6d26|7c6db0|
|7c6d26|7c752d|
|7c5b44|7c6d26|
|7c6c99|7c6d75|
|780aa2|7c6d29|
|780aa2|7c7a3e|
+------+------+
only showing top 8 rows

Out-degree stats — mean: 42.69 std: 49.60 max: 438
Skew ratio (max/mean): 10.3x

Top 10 hot vertices (potential skew sources):
+------+----------+
|   src|out_degree|
+------+----------+
|0d0760|       438|
|2ac0af|       382|
|4245fb|       343|
|2ac64c|       335|
|2ac55b|       329|
|4b1901|       326|
|2ac126|       325|
|2980c5|       316|
|394a02|       304|
|a0420e|       304|
+------+----------+
only showing top 10 rows
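The self-join keeps each co-presence pair once, oriented from the smaller to the larger `icao24` code. As a result, `out_degree` counts only neighbours with a larger code, and the PageRank loop in Step 2 passes rank in that direction only. This may be why the highest out-degree vertices above (e.g. `0d0760`) and the highest-ranked vertices in Step 2 (e.g. `c81e22`) sit at opposite ends of the hex range. If the co-presence relation is meant to be undirected, the usual approach is to add the reverse of every edge before computing degrees and PageRank (in Spark, for instance, a `union` of `edges_raw` with `src` and `dst` swapped). The toy sketch below uses plain Python and made-up vertex names to show how far the oriented out-degree can drift from the undirected degree.

```python
from collections import Counter

# Hypothetical toy graph: three aircraft that were all co-present with one another.
oriented = {("a", "b"), ("a", "c"), ("b", "c")}  # each pair stored once, src < dst

# Out-degree as the notebook computes it: groupBy("src").count()
oriented_out = Counter(src for src, _ in oriented)

# Symmetrized edge list: every co-presence pair in both directions.
symmetric = oriented | {(dst, src) for src, dst in oriented}
symmetric_out = Counter(src for src, _ in symmetric)

print("oriented out-degree :", dict(sorted(oriented_out.items())))   # "c" has no out-edges
print("symmetric degree    :", dict(sorted(symmetric_out.items())))  # every vertex has degree 2
```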
Step 2 — Iterative PageRank (Baseline)
Manual implementation without explicit repartitioning.
Each iteration records wall-clock time and convergence delta for later comparison.
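Written out, one iteration of the loop below computes the following for every vertex $v$ that receives at least one contribution, where $d$ = `DAMPING`, $N$ = `n_vertices`, $E$ is the directed edge list `edges_baseline`, $\operatorname{outdeg}(u)$ comes from `out_degree`, and $\varepsilon$ = `EPSILON`. The convergence delta $\delta^{(t)}$ is taken over vertices present in both the old and the new rank tables, since it is computed with an inner join on `id`.

$$
r^{(0)}(v) = \frac{1}{N}, \qquad
r^{(t+1)}(v) = \frac{1-d}{N} + d \sum_{u \,:\, (u,v) \in E} \frac{r^{(t)}(u)}{\operatorname{outdeg}(u)}
$$

$$
\delta^{(t)} = \max_{v} \left| r^{(t+1)}(v) - r^{(t)}(v) \right|, \qquad \text{stop when } \delta^{(t)} < \varepsilon
$$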
# ── Baseline PageRank (default hash partitioning) ─────────────────────────────
edges_baseline = edges_raw # no repartition
ranks = vertices.withColumn("rank", F.lit(1.0 / n_vertices))
pr_metrics_baseline = []
converged_baseline = False
print(f"Starting PageRank — {n_vertices} vertices, damping={DAMPING}, ε={EPSILON}")
print(f"Partitions (edges): {edges_baseline.rdd.getNumPartitions()}")
print()
for i in range(MAX_ITER):
    t0 = time.time()
    # 1. Distribute rank from each src proportionally to its out-edges.
    #    out_degree is joined on the column name "src": both DataFrames derive from
    #    edges_raw, so edges_baseline.src == out_degree.src is the trivially true
    #    'src == src' predicate that Spark warns about.
    contribs = (
        edges_baseline
        .join(ranks, edges_baseline.src == ranks.id, "inner")
        .join(out_degree, on="src", how="inner")
        .select(
            edges_baseline.dst.alias("id"),
            (F.col("rank") / F.col("out_degree")).alias("contrib")
        )
    )
    # 2. Aggregate incoming contributions, apply damping
    new_ranks = (
        contribs
        .groupBy("id")
        .agg(F.sum("contrib").alias("sum_contrib"))
        .withColumn(
            "rank",
            F.lit((1.0 - DAMPING) / n_vertices)
            + F.lit(DAMPING) * F.col("sum_contrib")
        )
        .select("id", "rank")
    )
    new_ranks.cache()
    # Materialize the cache so the next iteration reads it instead of recomputing the chain
    # (the logical plan still grows; localCheckpoint() would truncate it)
    new_ranks.count()
    elapsed_ms = (time.time() - t0) * 1000
    # 3. Convergence: max |new_rank - old_rank| over vertices present in both rank tables
    old_r = ranks.withColumnRenamed("rank", "old_rank")
    delta = (
        new_ranks.join(old_r, "id")
        .select(F.max(F.abs(new_ranks.rank - F.col("old_rank"))).alias("delta"))
        .first()["delta"] or 0.0
    )
    pr_metrics_baseline.append({
        "run_id": f"baseline_iter_{i:02d}",
        "path": "A",
        "algorithm": "pagerank_manual",
        "task": f"pagerank_iter_{i}",
        "notes": "default partitioning (baseline)",
        "iteration_or_k": i,
        "seed": 0,
        "metric_value": round(delta, 8),
        "shuffle_read_bytes": 0,  # complete from Spark UI after execution
        "shuffle_write_bytes": 0,
        "elapsed_ms": round(elapsed_ms, 1),
        "timestamp": datetime.datetime.now().isoformat(),
    })
    print(f" [BASELINE] iter {i:2d} | {elapsed_ms:7.0f} ms | delta={delta:.6f}")
    ranks.unpersist()
    ranks = new_ranks
    if delta < EPSILON:
        print(f" → Converged at iteration {i} (delta < ε={EPSILON})")
        converged_baseline = True
        break
if not converged_baseline:
    print(f" → Did not converge within {MAX_ITER} iterations")
print("\nTop 15 aircraft by PageRank (baseline):")
ranks.orderBy(F.desc("rank")).show(15)
# ── Persist final results ─────────────────────────────────────────────────────
ranks.write.mode("overwrite").parquet("outputs/lab3/pagerank_baseline")
print("Results → outputs/lab3/pagerank_baseline")
Starting PageRank — 6264 vertices, damping=0.85, ε=0.001
Partitions (edges): 8
26/04/30 09:53:57 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
[BASELINE] iter 0 | 1030 ms | delta=0.001349
26/04/30 09:53:58 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
[BASELINE] iter 1 | 629 ms | delta=0.002393
26/04/30 09:54:00 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
[BASELINE] iter 2 | 578 ms | delta=0.002487
26/04/30 09:54:00 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
[BASELINE] iter 3 | 543 ms | delta=0.002112
26/04/30 09:54:01 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
[BASELINE] iter 4 | 645 ms | delta=0.000971
→ Converged at iteration 4 (delta < ε=0.001)

Top 15 aircraft by PageRank (baseline):
+------+--------------------+
|    id|                rank|
+------+--------------------+
|c81e22|0.007440856845081005|
|cda28b|0.007002487247207178|
|e8043b|0.005849323222585...|
|e61f99|0.005281231065134173|
|c0884a|0.005247960316645426|
|c07fbf|0.004481929543099...|
|d2579e|0.003832716281315...|
|c080a0|0.003708087800843...|
|c052e0|0.003416716491559...|
|e8043a|0.003241819131628802|
|ae0211|0.003066926241153087|
|adea26|0.003027870476320895|
|c087bc|0.002991025535631735|
|c067e7|0.002811286913353...|
|c080a9|0.002766710516659...|
+------+--------------------+
only showing top 15 rows
26/04/30 09:54:04 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
Results → outputs/lab3/pagerank_baseline
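To sanity-check the Spark loop on graphs small enough to reason about by hand, the same update rule can be reproduced in plain Python. This is a minimal sketch, not the notebook's implementation. One deliberate difference: it keeps every vertex in the rank table on every iteration, so a vertex with no incoming edges holds the teleport value (1 - d)/N, whereas the Spark `groupBy("id")` only produces rows for vertices that receive at least one contribution. The triangle graphs are hypothetical toy inputs: the oriented one mimics edges stored once with src < dst, and the symmetric one contains both directions.

```python
def pagerank(edges, damping=0.85, max_iter=10, epsilon=0.001):
    """Reference PageRank over a directed (src, dst) edge list.

    Returns (ranks, deltas), where deltas[i] is max |r_new - r_old| at iteration i.
    """
    vertices = sorted({v for edge in edges for v in edge})
    n = len(vertices)
    out_degree = {}
    for src, _ in edges:
        out_degree[src] = out_degree.get(src, 0) + 1

    ranks = {v: 1.0 / n for v in vertices}
    deltas = []
    for _ in range(max_iter):
        # 1. Every src sends rank / out_degree along each of its out-edges.
        incoming = {v: 0.0 for v in vertices}
        for src, dst in edges:
            incoming[dst] += ranks[src] / out_degree[src]
        # 2. Teleport term plus damped incoming contributions (all vertices kept).
        new_ranks = {v: (1.0 - damping) / n + damping * incoming[v] for v in vertices}
        # 3. Convergence check on the largest absolute change.
        delta = max(abs(new_ranks[v] - ranks[v]) for v in vertices)
        deltas.append(delta)
        ranks = new_ranks
        if delta < epsilon:
            break
    return ranks, deltas


# Hypothetical toy graphs.
oriented_triangle = [("a", "b"), ("a", "c"), ("b", "c")]  # stored once, src < dst
symmetric_triangle = oriented_triangle + [(dst, src) for src, dst in oriented_triangle]

for name, graph in [("oriented", oriented_triangle), ("symmetric", symmetric_triangle)]:
    ranks, deltas = pagerank(graph)
    rounded = {v: round(r, 4) for v, r in ranks.items()}
    print(name, "iterations:", len(deltas), "rank sum:", round(sum(ranks.values()), 4), rounded)
```

In the oriented triangle, `c` has no out-edges, so its rank is never passed on and the total rank mass drops below 1. In the symmetric triangle, every vertex keeps 1/N and the loop stops after one iteration.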
2b. Save iteration plan
# Rebuild one iteration's contribution query for plan capture, using the initial
# uniform ranks: the join/shuffle structure should be the same in every iteration,
# only the ranks input differs.
_ranks_snap = vertices.withColumn("rank", F.lit(1.0 / n_vertices))
_contribs_plan = (
    edges_baseline
    .join(_ranks_snap, edges_baseline.src == _ranks_snap.id, "inner")
    .join(out_degree, on="src", how="inner")  # join by name: avoids the trivially true 'src == src'
    .select(
        edges_baseline.dst.alias("id"),
        (F.col("rank") / F.col("out_degree")).alias("contrib")
    )
)
with open("proof/plan_iteration.txt", "w") as f:
    with redirect_stdout(f):
        _contribs_plan.explain(mode="formatted")
print("proof/plan_iteration.txt saved.")
proof/plan_iteration.txt saved.
26/04/30 09:58:20 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
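The saved plan files are easier to compare once their operators are counted. The helper below is a heuristic, not a full plan parser. It assumes the `explain(mode="formatted")` layout, in which each operator's detail block starts on its own line as `(<id>) <OperatorName>`, and tallies `Exchange` (shuffle), `BroadcastExchange` and join operators in the files under `proof/`. The function names are illustrative.

```python
import re
from collections import Counter
from pathlib import Path

# Assumption: in explain(mode="formatted") output, each operator's detail block
# begins on its own line as "(<id>) <OperatorName> ...", e.g. "(4) Exchange".
NODE_HEADER = re.compile(r"^\((\d+)\) (\w+)", re.MULTILINE)


def operator_counts(plan_text: str) -> Counter:
    """Count physical operators by name, one count per detail block."""
    return Counter(name for _node_id, name in NODE_HEADER.findall(plan_text))


def summarize_plan(path: str) -> dict:
    """Hypothetical helper: shuffle, broadcast and join counts for one saved plan file."""
    counts = operator_counts(Path(path).read_text())
    return {
        "Exchange": counts["Exchange"],  # shuffle exchanges
        "BroadcastExchange": counts["BroadcastExchange"],
        "SortMergeJoin": counts["SortMergeJoin"],
        "BroadcastHashJoin": counts["BroadcastHashJoin"],
    }


for plan_file in ("proof/plan_iteration.txt", "proof/plan_before.txt", "proof/plan_after.txt"):
    if Path(plan_file).exists():
        print(plan_file, summarize_plan(plan_file))
```

Tree lines such as `+- Exchange (3)` start with tree-drawing characters rather than `(`, so each operator is counted once, from its detail block.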
Step 3 — Partitioning Experiment
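Hypothesis: hash-partitioning edges and out_degree on the join key (src) places all out-edges of a given aircraft in the same partition. If this partitioning matches what the join requires (same key, same number of partitions as spark.sql.shuffle.partitions), Spark can reuse it instead of re-shuffling the edge list in every iteration, which should reduce the shuffle volume of join(ranks, edges.src == ranks.id).
Before: the edge list keeps the partitioning produced by distinct() in Step 1 (hash on (src, dst)), which does not match the join key, so every shuffle-based join on src must re-shuffle the edges in every iteration (unless Spark broadcasts the other side instead).
After: repartition(8, "src") is applied once, before the loop, to both edges and out_degree, and the results are cached, so joins on src can reuse that hash partitioning. repartition hash-partitions rows but does not sort them, so a sort-merge join still sorts within each partition.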
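The co-location argument can be illustrated without Spark. Hash partitioning sends a row to partition `hash(key) mod N`, so two datasets partitioned with the same hash function and the same `N` place matching keys at the same partition index, and each partition pair can be joined on its own. The sketch below uses `zlib.crc32` as a deterministic stand-in for the Murmur3 hash that Spark uses for `hashpartitioning`, and made-up vertex ids. Whether Spark actually drops an exchange still depends on its planner; the saved before/after plans show which exchanges it kept.

```python
import zlib
from collections import defaultdict

N_PARTS = 8  # same value as the notebook's N_PARTS / spark.sql.shuffle.partitions


def partition_of(key: str, n_parts: int = N_PARTS) -> int:
    """Partition index for a key. crc32 stands in for Spark's Murmur3 hash here."""
    return zlib.crc32(key.encode("utf-8")) % n_parts


def hash_partition(rows, key_fn, n_parts: int = N_PARTS) -> dict:
    """Group rows by partition index, as repartition(n_parts, key) would."""
    parts = defaultdict(list)
    for row in rows:
        parts[partition_of(key_fn(row), n_parts)].append(row)
    return parts


def colocated_join(left_parts, right_parts, left_key, right_key) -> list:
    """Join partition p of the left side only with partition p of the right side."""
    out = []
    for p, left_rows in left_parts.items():
        index = defaultdict(list)
        for right_row in right_parts.get(p, []):
            index[right_key(right_row)].append(right_row)
        for left_row in left_rows:
            out.extend((left_row, right_row) for right_row in index.get(left_key(left_row), []))
    return out


# Hypothetical toy data: (src, dst) edges and (src, out_degree) rows.
edges = [("a1", "b2"), ("a1", "c3"), ("b2", "c3"), ("c3", "d4")]
out_degree = [("a1", 2), ("b2", 1), ("c3", 1)]

edge_parts = hash_partition(edges, key_fn=lambda e: e[0])
deg_parts = hash_partition(out_degree, key_fn=lambda d: d[0])
local = colocated_join(edge_parts, deg_parts, lambda e: e[0], lambda d: d[0])

# Reference: a global join over all rows, ignoring partitions.
reference = [(e, d) for e in edges for d in out_degree if e[0] == d[0]]
assert sorted(local) == sorted(reference)
print(f"{len(local)} joined rows, each found within its own partition")
```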
# ── Save BEFORE plan ─────────────────────────────────────────────────────────
_ranks_init = vertices.withColumn("rank", F.lit(1.0 / n_vertices))
_before_q = (
    edges_raw
    .join(_ranks_init, edges_raw.src == _ranks_init.id, "inner")
    .join(out_degree, on="src", how="inner")  # join by name: avoids the trivially true 'src == src'
    .select(
        edges_raw.dst.alias("id"),
        (F.col("rank") / F.col("out_degree")).alias("contrib")
    )
)
with open("proof/plan_before.txt", "w") as f:
    with redirect_stdout(f):
        _before_q.explain(mode="formatted")
print("proof/plan_before.txt saved.")
# ── Repartition edge list and out-degree by 'src' ─────────────────────────────
edges_repartitioned = edges_raw.repartition(N_PARTS, "src").cache()
edges_repartitioned.count() # materialize
out_deg_repartitioned = out_degree.repartition(N_PARTS, "src").cache()
out_deg_repartitioned.count()
print(f"Edges repartitioned → {edges_repartitioned.rdd.getNumPartitions()} partitions by 'src'")
# ── Save AFTER plan ───────────────────────────────────────────────────────────
_after_q = (
    edges_repartitioned
    .join(_ranks_init, edges_repartitioned.src == _ranks_init.id, "inner")
    .join(out_deg_repartitioned, on="src", how="inner")  # join by name: avoids 'src == src'
    .select(
        edges_repartitioned.dst.alias("id"),
        (F.col("rank") / F.col("out_degree")).alias("contrib")
    )
)
with open("proof/plan_after.txt", "w") as f:
    with redirect_stdout(f):
        _after_q.explain(mode="formatted")
print("proof/plan_after.txt saved.")
# ── PageRank with repartitioned edges ─────────────────────────────────────────
ranks2 = vertices.withColumn("rank", F.lit(1.0 / n_vertices))
pr_metrics_repartitioned = []
converged_repartitioned = False
print(f"\nStarting repartitioned PageRank — {N_PARTS} partitions by 'src'")
print()
for i in range(MAX_ITER):
    t0 = time.time()
    # Same update as the baseline; out_deg_repartitioned is joined by column name
    # to avoid the trivially true 'src == src' predicate.
    contribs2 = (
        edges_repartitioned
        .join(ranks2, edges_repartitioned.src == ranks2.id, "inner")
        .join(out_deg_repartitioned, on="src", how="inner")
        .select(
            edges_repartitioned.dst.alias("id"),
            (F.col("rank") / F.col("out_degree")).alias("contrib")
        )
    )
    new_ranks2 = (
        contribs2.groupBy("id")
        .agg(F.sum("contrib").alias("sum_contrib"))
        .withColumn(
            "rank",
            F.lit((1.0 - DAMPING) / n_vertices)
            + F.lit(DAMPING) * F.col("sum_contrib")
        )
        .select("id", "rank")
    )
    new_ranks2.cache()
    new_ranks2.count()  # materialize the cache before the timer stops
    elapsed_ms = (time.time() - t0) * 1000
    old_r2 = ranks2.withColumnRenamed("rank", "old_rank")
    delta2 = (
        new_ranks2.join(old_r2, "id")
        .select(F.max(F.abs(new_ranks2.rank - F.col("old_rank"))).alias("delta"))
        .first()["delta"] or 0.0
    )
    pr_metrics_repartitioned.append({
        "run_id": f"repartitioned_iter_{i:02d}",
        "path": "A",
        "algorithm": "pagerank_manual",
        "task": f"pagerank_iter_{i}",
        "notes": f"repartition({N_PARTS}, 'src') — optimized",
        "iteration_or_k": i,
        "seed": 0,
        "metric_value": round(delta2, 8),
        "shuffle_read_bytes": 0,
        "shuffle_write_bytes": 0,
        "elapsed_ms": round(elapsed_ms, 1),
        "timestamp": datetime.datetime.now().isoformat(),
    })
    print(f" [REPARTITIONED] iter {i:2d} | {elapsed_ms:7.0f} ms | delta={delta2:.6f}")
    ranks2.unpersist()
    ranks2 = new_ranks2
    if delta2 < EPSILON:
        print(f" → Converged at iteration {i}")
        converged_repartitioned = True
        break
if not converged_repartitioned:
    print(f" → Did not converge within {MAX_ITER} iterations")
ranks2.write.mode("overwrite").parquet("outputs/lab3/pagerank_repartitioned")
print("Results → outputs/lab3/pagerank_repartitioned")
26/04/30 09:58:30 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
proof/plan_before.txt saved.
Edges repartitioned → 8 partitions by 'src'
proof/plan_after.txt saved.

Starting repartitioned PageRank — 8 partitions by 'src'

26/04/30 09:58:31 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
26/04/30 09:58:31 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
[REPARTITIONED] iter 0 | 505 ms | delta=0.001349
26/04/30 09:58:32 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
[REPARTITIONED] iter 1 | 315 ms | delta=0.002393
26/04/30 09:58:32 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
[REPARTITIONED] iter 2 | 402 ms | delta=0.002487
26/04/30 09:58:33 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
[REPARTITIONED] iter 3 | 546 ms | delta=0.002112
26/04/30 09:58:34 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
[REPARTITIONED] iter 4 | 687 ms | delta=0.000971
→ Converged at iteration 4
Results → outputs/lab3/pagerank_repartitioned
Step 4 — Convergence & Skew Analysis
Compare per-iteration elapsed time between baseline and repartitioned runs.
Analyse skew via degree distribution.
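To see how much of the headline gain comes from the first iteration, the mean-based gain can be recomputed from the per-iteration `elapsed_ms` values that this step prints, with and without iteration 0 (whose higher baseline time plausibly includes one-off warm-up work). This is plain arithmetic on the logged numbers from a single run per configuration, not a new measurement.

```python
from statistics import mean, median

# elapsed_ms per iteration, taken from the convergence tables printed by this step
baseline_ms = [1030.4, 628.9, 577.8, 543.4, 644.8]
repartitioned_ms = [505.3, 315.0, 401.5, 546.2, 686.9]


def gain_pct(base, new):
    """Relative reduction of the mean iteration time, in percent."""
    return (mean(base) - mean(new)) / mean(base) * 100


gain_all = gain_pct(baseline_ms, repartitioned_ms)
gain_wo_first = gain_pct(baseline_ms[1:], repartitioned_ms[1:])  # drop iteration 0

print(f"gain, all iterations       : {gain_all:+.1f}%")
print(f"gain, excluding iteration 0: {gain_wo_first:+.1f}%")
print(f"median ms: baseline={median(baseline_ms)} repartitioned={median(repartitioned_ms)}")
```

On these numbers, the repartitioned run is faster in iterations 0 to 2 and slightly slower in iterations 3 and 4, so most of the difference in total time comes from iterations 0 and 1. Repeated runs would be needed before attributing the gap to partitioning alone.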
# ── Build comparison DataFrames ───────────────────────────────────────────────
df_base = pd.DataFrame(pr_metrics_baseline)
df_rept = pd.DataFrame(pr_metrics_repartitioned)
print("=" * 65)
print("CONVERGENCE — BASELINE (default hash partitioning)")
print("=" * 65)
if len(df_base):
    print(df_base[["iteration_or_k", "elapsed_ms", "metric_value"]]
          .rename(columns={"iteration_or_k": "iter",
                           "metric_value": "delta"})
          .to_string(index=False))
print()
print("=" * 65)
print(f"CONVERGENCE — REPARTITIONED (repartition({N_PARTS}, 'src'))")
print("=" * 65)
if len(df_rept):
    print(df_rept[["iteration_or_k", "elapsed_ms", "metric_value"]]
          .rename(columns={"iteration_or_k": "iter",
                           "metric_value": "delta"})
          .to_string(index=False))
# ── Per-iteration speed comparison ───────────────────────────────────────────
if len(df_base) and len(df_rept):
    avg_base = df_base["elapsed_ms"].mean()
    avg_rept = df_rept["elapsed_ms"].mean()
    total_base = df_base["elapsed_ms"].sum()
    total_rept = df_rept["elapsed_ms"].sum()
    gain = (avg_base - avg_rept) / avg_base * 100 if avg_base > 0 else 0
    print("\nPerformance summary:")
    print(f" Avg iteration time — Baseline : {avg_base:8.0f} ms")
    print(f" Avg iteration time — Repartitioned : {avg_rept:8.0f} ms")
    print(f" Gain per iteration : {gain:+.1f}%")
    print(f" Total time — Baseline : {total_base:8.0f} ms")
    print(f" Total time — Repartitioned : {total_rept:8.0f} ms")
# ── Skew analysis ─────────────────────────────────────────────────────────────
print(f"\nSkew analysis (out-degree distribution):")
print(f" Mean : {deg_stats['mean']:.2f}")
print(f" Std : {deg_stats['stddev']:.2f}")
print(f" Max : {deg_stats['max']}")
print(f" Skew ratio: {skew_ratio:.1f}x (max / mean)")
if skew_ratio > 10:
    print(" ⚠ High skew — hot vertices inflate specific shuffle partitions")
    print(" Recommendation: filter out top-0.1% degree vertices or apply salting")
else:
    print(" ✓ Moderate skew: hash partitioning by 'src' is unlikely to overload a single partition")
=================================================================
CONVERGENCE — BASELINE (default hash partitioning)
=================================================================
iter elapsed_ms delta
0 1030.4 0.001349
1 628.9 0.002393
2 577.8 0.002487
3 543.4 0.002112
4 644.8 0.000971
=================================================================
CONVERGENCE — REPARTITIONED (repartition(8, 'src'))
=================================================================
iter elapsed_ms delta
0 505.3 0.001349
1 315.0 0.002393
2 401.5 0.002487
3 546.2 0.002112
4 686.9 0.000971
Performance summary:
Avg iteration time — Baseline : 685 ms
Avg iteration time — Repartitioned : 491 ms
Gain per iteration : +28.3%
Total time — Baseline : 3425 ms
Total time — Repartitioned : 2455 ms
Skew analysis (out-degree distribution):
Mean : 42.69
Std : 49.60
Max : 438
Skew ratio: 10.3x (max / mean)
⚠ High skew — hot vertices inflate specific shuffle partitions
Recommendation: filter out top-0.1% degree vertices or apply salting
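The recommendation above mentions salting. As an illustration of the idea (a sketch, not something the notebook runs), a hot join key is split into several sub-keys: a salt column is added to the large side (the edge list), and the matching rows of the small side (ranks or out-degree) are replicated once per salt value. The join result is unchanged, but no single (key, salt) pair carries all of the hot vertex's edges. The vertex names, the round-robin salt and `SALT_BUCKETS = 4` are hypothetical choices; in Spark the salt is often drawn from a random or hash expression instead.

```python
from collections import Counter

SALT_BUCKETS = 4  # hypothetical number of sub-keys per source vertex

# Toy edge list: one hot source vertex with 40 out-edges, two ordinary ones.
edges = [("hot", f"d{i:03d}") for i in range(40)] + [("v1", "d000"), ("v2", "d001")]
ranks = {"hot": 0.5, "v1": 0.25, "v2": 0.25}

# Large side: attach a salt to every edge (round-robin here, so the split is exact).
salted_edges = [(src, i % SALT_BUCKETS, dst) for i, (src, dst) in enumerate(edges)]

# Small side: replicate each rank row once per salt value.
salted_ranks = {(v, s): r for v, r in ranks.items() for s in range(SALT_BUCKETS)}

# Rows per join key before and after salting.
rows_per_key_plain = Counter(src for src, _ in edges)
rows_per_key_salted = Counter((src, salt) for src, salt, _ in salted_edges)

# Joining on (src, salt) returns exactly the same (dst, rank) pairs as joining on src.
plain_join = sorted((dst, ranks[src]) for src, dst in edges)
salted_join = sorted((dst, salted_ranks[(src, salt)]) for src, salt, dst in salted_edges)
assert salted_join == plain_join

print("largest key before salting:", max(rows_per_key_plain.values()))   # 40 rows for "hot"
print("largest key after salting :", max(rows_per_key_salted.values()))  # 10 rows per (hot, salt)
```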
Step 5 — Evidence & Metrics
# ── Compile full metrics log ──────────────────────────────────────────────────
graph_build_row = {
    "run_id": "graph_build",
    "path": "A",
    "algorithm": "graph_construction",
    "task": "build_graph",
    "notes": f"{n_vertices} vertices | {n_edges} edges | sector={SECTOR_DEG}deg/{TIME_BUCKET}s | skew_ratio={skew_ratio:.1f}x",
    "iteration_or_k": 0,
    "seed": 0,
    "metric_value": round(skew_ratio, 3),
    "shuffle_read_bytes": 0,
    "shuffle_write_bytes": 0,
    "elapsed_ms": 0,
    "timestamp": datetime.datetime.now().isoformat(),
}
all_metrics = [graph_build_row] + pr_metrics_baseline + pr_metrics_repartitioned
df_metrics = pd.DataFrame(all_metrics)
df_metrics.to_csv("lab3_metrics_log.csv", index=False)
print("lab3_metrics_log.csv written:")
print(df_metrics[["run_id", "task", "iteration_or_k",
                  "metric_value", "elapsed_ms", "notes"]].to_string(index=False))
# ── Verify all proof files ────────────────────────────────────────────────────
print("\n── Proof file checklist ─────────────────────────────────")
proof_files = [
    ("proof/plan_iteration.txt", "Physical plan for one PageRank iteration"),
    ("proof/plan_before.txt", "Plan BEFORE repartitioning"),
    ("proof/plan_after.txt", "Plan AFTER repartitioning"),
]
for fpath, desc in proof_files:
    p = pathlib.Path(fpath)
    ok = p.exists() and p.stat().st_size > 0
    print(f" {'[x]' if ok else '[ ]'} {fpath:40s} — {desc} ({p.stat().st_size if p.exists() else 0} bytes)")
print("\n── Deliverables checklist ────────────────────────────────")
deliverables = [
    ("assignment3_esiee.ipynb", "this notebook"),
    ("outputs/lab3/pagerank_baseline", "Parquet — baseline PageRank scores"),
    ("outputs/lab3/pagerank_repartitioned", "Parquet — repartitioned PageRank scores"),
    ("lab3_metrics_log.csv", "per-iteration metrics log"),
    ("engineering_note_lab3.md", "before/after comparative report"),
    ("GENAI.md", "AI usage declaration"),
]
for path, desc in deliverables:
    ok = pathlib.Path(path).exists()
    print(f" {'[x]' if ok else '[ ]'} {path:45s} — {desc}")
print()
print(" [ ] proof/spark_ui_jobs_baseline.png — capture manually (Jobs tab)")
print(" [ ] proof/spark_ui_shuffle_before.png — capture manually (SQL/Stages tab)")
print(" [ ] proof/spark_ui_shuffle_after.png — capture manually (SQL/Stages tab)")
lab3_metrics_log.csv written:
run_id task iteration_or_k metric_value elapsed_ms notes
graph_build build_graph 0 10.259000 0.0 6264 vertices | 256461 edges | sector=2deg/300s | skew_ratio=10.3x
baseline_iter_00 pagerank_iter_0 0 0.001349 1030.4 default partitioning (baseline)
baseline_iter_01 pagerank_iter_1 1 0.002393 628.9 default partitioning (baseline)
baseline_iter_02 pagerank_iter_2 2 0.002487 577.8 default partitioning (baseline)
baseline_iter_03 pagerank_iter_3 3 0.002112 543.4 default partitioning (baseline)
baseline_iter_04 pagerank_iter_4 4 0.000971 644.8 default partitioning (baseline)
repartitioned_iter_00 pagerank_iter_0 0 0.001349 505.3 repartition(8, 'src') — optimized
repartitioned_iter_01 pagerank_iter_1 1 0.002393 315.0 repartition(8, 'src') — optimized
repartitioned_iter_02 pagerank_iter_2 2 0.002487 401.5 repartition(8, 'src') — optimized
repartitioned_iter_03 pagerank_iter_3 3 0.002112 546.2 repartition(8, 'src') — optimized
repartitioned_iter_04 pagerank_iter_4 4 0.000971 686.9 repartition(8, 'src') — optimized
── Proof file checklist ─────────────────────────────────
[x] proof/plan_iteration.txt — Physical plan for one PageRank iteration (33587 bytes)
[x] proof/plan_before.txt — Plan BEFORE repartitioning (33587 bytes)
[x] proof/plan_after.txt — Plan AFTER repartitioning (44734 bytes)
── Deliverables checklist ────────────────────────────────
[x] assignment3_esiee.ipynb — this notebook
[x] outputs/lab3/pagerank_baseline — Parquet — baseline PageRank scores
[x] outputs/lab3/pagerank_repartitioned — Parquet — repartitioned PageRank scores
[x] lab3_metrics_log.csv — per-iteration metrics log
[x] engineering_note_lab3.md — before/after comparative report
[x] GENAI.md — AI usage declaration
[ ] proof/spark_ui_jobs_baseline.png — capture manually (Jobs tab)
[ ] proof/spark_ui_shuffle_before.png — capture manually (SQL/Stages tab)
[ ] proof/spark_ui_shuffle_after.png — capture manually (SQL/Stages tab)
spark.stop()
print("Assignment 3 complete.")
Assignment 3 complete.