DE2 - Lab 3: Graphs or Clustering - Instrumented Iterative Workload (15%)¶
Author : Badr TAJINI - Data Engineering II - ESIEE 2025-2026
Track: D - Aviation (OpenSky Network)
Path chosen: A - Graph Processing (Airport Airspace Proximity Network + manual iterative PageRank)
Graph model:
- Vertices = unique aircraft (icao24 identifiers)
- Edges = two aircraft occupied the same 2°×2° lat/lon airspace sector within the same 5-minute time window
- PageRank on this graph reveals the aircraft most central in the airspace traffic flow
Goal: Execute an iterative workload with a platform-engineering focus: partitioning strategies, skew analysis, per-iteration shuffle costs, convergence, and a before/after comparative report.
import os, time, pathlib, datetime, socket
import pandas as pd
import numpy as np
from contextlib import redirect_stdout
from urllib.parse import urlparse
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, BooleanType
# ── WSL / WINDOWS IP CONFIGURATION ──────────────────────────────────────────
# This block ensures the Spark UI is accessible from your Windows browser.
# It detects the WSL internal IP and binds Spark to it.
try:
    # Create a dummy socket to find the eth0 IP address of WSL
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.connect(("8.8.8.8", 80))
    DE2_IP = s.getsockname()[0]
    s.close()
except Exception:
    DE2_IP = "127.0.0.1"
os.environ["SPARK_LOCAL_IP"] = DE2_IP
def show_spark_ui(s):
    """
    Prints the Spark UI links.
    Click the 'WSL/Windows Access' link to open the UI in Chrome/Edge.
    """
    ui_url = s.sparkContext.uiWebUrl
    print(f"Spark version: {s.version}")
    if ui_url:
        ui_port = urlparse(ui_url).port or 4040
        print(f"Spark UI (Internal WSL): {ui_url}")
        print(f"Spark UI (WSL/Windows Access): http://localhost:{ui_port}")
    else:
        print("Spark UI: Not available")
# ── SPARK SESSION INITIALIZATION ──────────────────────────────────────────────
# We use 'local[*]' to utilize all available CPU cores.
# 4GB of RAM is allocated to prevent 'Java Heap Space' errors during joins.
spark = (
SparkSession.builder
.appName("DE2-Lab3-GraphProcessing")
.master("local[*]")
.config("spark.driver.host", DE2_IP)
.config("spark.driver.bindAddress", "0.0.0.0")
.config("spark.ui.bindAddress", "0.0.0.0")
.config("spark.driver.memory", "4g")
.config("spark.sql.shuffle.partitions", "8") # Optimized for local 8-core machines
.getOrCreate()
)
show_spark_ui(spark)
# ── DIRECTORY SETUP ──────────────────────────────────────────────────────────
for p in ["outputs/lab3", "proof"]:
    pathlib.Path(p).mkdir(parents=True, exist_ok=True)
    print(f"Directory ready: {p}")
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/04/29 18:59:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark version: 4.0.1
Spark UI (Internal WSL): http://172.30.130.70:4040
Spark UI (WSL/Windows Access): http://localhost:4040
Directory ready: outputs/lab3
Directory ready: proof
0. Data Preparation¶
Load the OpenSky CSV from Lab 1.
If unavailable, generate a synthetic dataset that reproduces the same schema and statistical structure.
LAB1_CSV = "data/raw_aviation/states_2017-06-05-00.csv"
SYNTH_PATH = "data/raw_aviation/synthetic_opensky.csv"
pathlib.Path("data/raw_aviation").mkdir(parents=True, exist_ok=True)
if not pathlib.Path(LAB1_CSV).exists() and not pathlib.Path(SYNTH_PATH).exists():
    print("Generating synthetic OpenSky dataset (≈ 5 000 records)...")
    rng = np.random.default_rng(42)
    n = 5000
    icao24_pool = [f"a{i:05x}" for i in range(300)]  # 300 unique aircraft
    squawk_pool = [str(s).zfill(4) for s in rng.integers(1000, 7777, 60)]
    rows = {
        "time": rng.integers(1496620800, 1496624400, n),  # 1h window
        "icao24": rng.choice(icao24_pool, n),
        "lat": rng.uniform(43.0, 52.0, n),  # Europe
        "lon": rng.uniform(-5.0, 15.0, n),
        "velocity": rng.uniform(100, 280, n),
        "heading": rng.uniform(0, 360, n),
        "vertrate": rng.uniform(-10, 10, n),
        "callsign": [f"AF{rng.integers(100,999)}" for _ in range(n)],
        "onground": rng.choice([True, False], n, p=[0.05, 0.95]),
        "alert": rng.choice([True, False], n, p=[0.01, 0.99]),
        "spi": [False] * n,
        "squawk": rng.choice(squawk_pool, n),
        "baroaltitude": rng.uniform(1000, 12000, n),
        "geoaltitude": rng.uniform(1000, 12000, n),
        "lastposupdate": rng.uniform(1496620800, 1496624400, n),
        "lastcontact": rng.uniform(1496620800, 1496624400, n),
    }
    pd.DataFrame(rows).to_csv(SYNTH_PATH, index=False)
    print(f"Synthetic dataset written → {SYNTH_PATH}")
    SOURCE_CSV = SYNTH_PATH
else:
    SOURCE_CSV = LAB1_CSV if pathlib.Path(LAB1_CSV).exists() else SYNTH_PATH
print(f"Using dataset: {SOURCE_CSV}")
Using dataset: data/raw_aviation/states_2017-06-05-00.csv
A.1 — Build Edge and Vertex DataFrames¶
Proximity graph construction:
- Snap each aircraft's position to a 2°×2° lat/lon sector grid and a 5-minute time bucket
- Two aircraft sharing the same (sector_lat, sector_lon, time_bucket) tuple → edge between them
- This models potential ATC separation conflicts or shared approach paths
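As a minimal sketch of the bucketing rule above, the plain-Python snippet below snaps a few hypothetical observations to 2°×2° sectors and 5-minute buckets, then emits one edge per unordered aircraft pair sharing a cell. The helper names (`snap`, `proximity_edges`) and the sample records are illustrative assumptions, not part of the lab code.

```python
import math
from collections import defaultdict
from itertools import combinations

SECTOR_DEG = 2     # degrees per sector cell
TIME_BUCKET = 300  # seconds (5 minutes)

def snap(lat, lon, t):
    """Return the (sector_lat, sector_lon, time_bucket) cell for one observation."""
    return (
        math.floor(lat / SECTOR_DEG) * SECTOR_DEG,
        math.floor(lon / SECTOR_DEG) * SECTOR_DEG,
        (t // TIME_BUCKET) * TIME_BUCKET,
    )

def proximity_edges(observations):
    """Build the set of (src, dst) pairs, src < dst, for aircraft sharing a cell."""
    cells = defaultdict(set)
    for icao24, lat, lon, t in observations:
        cells[snap(lat, lon, t)].add(icao24)
    edges = set()
    for aircraft in cells.values():
        # sorted() guarantees src < dst, mirroring the self-join filter
        edges.update(combinations(sorted(aircraft), 2))
    return edges

# Hypothetical sample: (icao24, lat, lon, unix_time)
sample = [
    ("a00001", 48.9, 2.3, 1496620810),
    ("a00002", 49.5, 3.9, 1496620990),  # same sector and window as a00001
    ("a00003", 48.9, 2.3, 1496621200),  # same sector, next 5-minute window
    ("a00004", 50.1, 2.3, 1496620810),  # neighbouring sector
]
print(snap(48.9, 2.3, 1496620810))
print(proximity_edges(sample))
```

Only the first two records share a cell, so a single edge is produced. Flooring (rather than truncating) keeps negative longitudes in the correct cell, for example a longitude of -0.5 falls in the sector starting at -2.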
# ── Load raw OpenSky data ─────────────────────────────────────────────────────
opensky_schema = StructType([
StructField("time", LongType(), True),
StructField("icao24", StringType(), True),
StructField("lat", DoubleType(), True),
StructField("lon", DoubleType(), True),
StructField("velocity", DoubleType(), True),
StructField("heading", DoubleType(), True),
StructField("vertrate", DoubleType(), True),
StructField("callsign", StringType(), True),
StructField("onground", BooleanType(),True),
StructField("alert", BooleanType(),True),
StructField("spi", BooleanType(),True),
StructField("squawk", StringType(), True),
StructField("baroaltitude", DoubleType(), True),
StructField("geoaltitude", DoubleType(), True),
StructField("lastposupdate", DoubleType(), True),
StructField("lastcontact", DoubleType(), True),
])
df_raw = (
spark.read
.schema(opensky_schema)
.option("header", "true")
.csv(SOURCE_CSV)
.filter(F.col("icao24").isNotNull() & F.col("lat").isNotNull() & F.col("lon").isNotNull())
.filter(~F.col("onground")) # airborne aircraft only
)
print(f"Raw records (airborne): {df_raw.count():,}")
# ── Snap to 2°×2° sectors + 5-minute time buckets ────────────────────────────
SECTOR_DEG = 2 # degrees per sector cell
TIME_BUCKET = 300 # seconds (5 minutes)
df_sectors = df_raw.select(
F.col("icao24"),
(F.floor(F.col("lat") / SECTOR_DEG) * SECTOR_DEG).alias("sector_lat"),
(F.floor(F.col("lon") / SECTOR_DEG) * SECTOR_DEG).alias("sector_lon"),
(F.floor(F.col("time") / TIME_BUCKET) * TIME_BUCKET).alias("time_bucket"),
)
# ── Build proximity edges via self-join on (sector, time_bucket) ──────────────
# Alias the same DataFrame to perform the self-join
left = df_sectors.alias("l")
right = df_sectors.alias("r")
edges_raw = (
left.join(
right,
on=(
(F.col("l.sector_lat") == F.col("r.sector_lat")) &
(F.col("l.sector_lon") == F.col("r.sector_lon")) &
(F.col("l.time_bucket") == F.col("r.time_bucket")) &
(F.col("l.icao24") < F.col("r.icao24")) # keep each pair once and drop self-loops; edges therefore always point from the smaller to the larger icao24
)
)
.select(
F.col("l.icao24").alias("src"),
F.col("r.icao24").alias("dst"),
)
.distinct()
)
# Materialize edge list
edges_raw.cache()
n_edges = edges_raw.count()
print(f"Edges (unique aircraft pairs in same sector/window): {n_edges:,}")
edges_raw.show(10)
# ── Build vertex DataFrame ────────────────────────────────────────────────────
vertices = (
edges_raw.select("src").withColumnRenamed("src", "id")
.union(edges_raw.select("dst").withColumnRenamed("dst", "id"))
.distinct()
)
vertices.cache()
n_vertices = vertices.count()
print(f"Vertices (aircraft in the graph): {n_vertices:,}")
# ── Skew analysis: out-degree distribution ────────────────────────────────────
out_degree = (
edges_raw.groupBy("src")
.count()
.withColumnRenamed("count", "out_degree")
.orderBy(F.desc("out_degree"))
)
out_degree.cache()
print("\nTop 10 aircraft by out-degree (hot vertices = potential skew sources):")
out_degree.show(10)
deg_stats = out_degree.select(
F.mean("out_degree").alias("mean"),
F.stddev("out_degree").alias("stddev"),
F.max("out_degree").alias("max")
).first()
print(f"Degree — mean: {deg_stats['mean']:.2f}, std: {deg_stats['stddev']:.2f}, max: {deg_stats['max']}")
Raw records (airborne): 992,613
Edges (unique aircraft pairs in same sector/window): 256,461
+------+------+
|   src|   dst|
+------+------+
|7c39fa|7c4eeb|
|7503a9|7c7a45|
|7c6d26|7c6db0|
|7c6d26|7c752d|
|7c5b44|7c6d26|
|7c6c99|7c6d75|
|780aa2|7c6d29|
|780aa2|7c7a3e|
|780aa2|7c23be|
|7c7bd3|7cc3ff|
+------+------+
only showing top 10 rows
Vertices (aircraft in the graph): 6,264
Top 10 aircraft by out-degree (hot vertices = potential skew sources):
+------+----------+
|   src|out_degree|
+------+----------+
|0d0760|       438|
|2ac0af|       382|
|4245fb|       343|
|2ac64c|       335|
|2ac55b|       329|
|4b1901|       326|
|2ac126|       325|
|2980c5|       316|
|394a02|       304|
|a0420e|       304|
+------+----------+
only showing top 10 rows
Degree — mean: 42.69, std: 49.60, max: 438
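Because the self-join keeps only pairs with `l.icao24 < r.icao24`, every stored edge points from the lexicographically smaller identifier to the larger one, so any rank computed over these edges flows in one direction only. If proximity is meant to be symmetric (an assumption about the modelling goal), one option is to add the reverse of each edge before computing degrees. The sketch below shows the effect on a tiny hypothetical edge list; `symmetrize` and `out_degrees` are illustrative names.

```python
from collections import Counter

def symmetrize(edges):
    """Return the edge set with every (src, dst) mirrored as (dst, src)."""
    return set(edges) | {(dst, src) for src, dst in edges}

def out_degrees(edges):
    """Count outgoing edges per source vertex."""
    return Counter(src for src, _ in edges)

# Hypothetical one-directional edges, as produced by a src < dst self-join
directed = {("a1", "b2"), ("a1", "c3"), ("b2", "c3")}

print(out_degrees(directed))              # c3 never appears as a source
print(out_degrees(symmetrize(directed)))  # every vertex now has out-degree 2
```

On the DataFrames in this notebook, the analogous step would be a union of `edges_raw` with a copy whose `src` and `dst` columns are swapped. Doing so would change the degree statistics and the ranks reported here.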
A.2 — Iterative PageRank (manual implementation)¶
Algorithm:
- Initialize: rank[v] = 1 / N for all vertices
- Each iteration: rank[v] = (1-d)/N + d × Σ(rank[u]/out_degree[u]) for all in-neighbours u
- Damping factor d = 0.85 (the conventional default from web-graph PageRank, reused here without tuning)
- Stopping criterion: max(|new_rank - old_rank|) < ε = 0.001
Each iteration is timed with time.time() and its metrics are logged. The shuffle read/write fields are written as 0 placeholders, to be filled in manually from the Spark UI.
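The update rule and stopping criterion above can be sketched in plain Python on a toy graph. This is a minimal illustration under stated assumptions, not the DataFrame implementation used below: the function name `pagerank`, the toy edge list, and the tighter `epsilon` are all hypothetical.

```python
def pagerank(edges, damping=0.85, max_iter=10, epsilon=0.001):
    """Iterate rank[v] = (1-d)/N + d * sum(rank[u] / out_degree[u]) over in-neighbours u."""
    vertices = sorted({v for edge in edges for v in edge})
    n = len(vertices)
    out_degree = {v: 0 for v in vertices}
    for src, _ in edges:
        out_degree[src] += 1

    ranks = {v: 1.0 / n for v in vertices}
    for iteration in range(max_iter):
        # each source splits its current rank evenly across its out-edges
        contrib = {v: 0.0 for v in vertices}
        for src, dst in edges:
            contrib[dst] += ranks[src] / out_degree[src]
        new_ranks = {v: (1.0 - damping) / n + damping * contrib[v] for v in vertices}
        # stopping criterion: largest absolute change of any vertex
        delta = max(abs(new_ranks[v] - ranks[v]) for v in vertices)
        ranks = new_ranks
        if delta < epsilon:
            break
    return ranks, iteration, delta

# Hypothetical 3-vertex graph: a -> b, a -> c, b -> c, c -> a
toy_edges = [("a", "b"), ("a", "c"), ("b", "c"), ("c", "a")]
ranks, last_iter, delta = pagerank(toy_edges, max_iter=50, epsilon=1e-6)
for vertex, rank in sorted(ranks.items(), key=lambda kv: -kv[1]):
    print(f"{vertex}: {rank:.4f}")
```

In this sketch a vertex without in-neighbours keeps the baseline value (1-d)/N, because every vertex is updated on every iteration. Every vertex in the toy graph has at least one out-edge, so the ranks keep summing to 1.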
# ── PageRank constants ────────────────────────────────────────────────────────
DAMPING = 0.85
MAX_ITER = 10
EPSILON = 0.001 # convergence threshold
# ── Use the DEFAULT partitioning (baseline) ───────────────────────────────────
edges_baseline = edges_raw # no explicit repartition
# ── Initialize ranks ─────────────────────────────────────────────────────────
ranks = vertices.withColumn("rank", F.lit(1.0 / n_vertices))
# Pre-compute out-degree (constant across iterations)
# Already computed above as `out_degree`
pr_metrics_baseline = [] # log per-iteration metrics
converged = False
for i in range(MAX_ITER):
    t0 = time.time()
    # 1. Join edges with current ranks (src → rank) and with out-degrees.
    #    out_degree derives from the same edges DataFrame, hence Spark's
    #    "trivially true equals predicate 'src == src'" warning on the second join.
    contribs = (
        edges_baseline
        .join(ranks, edges_baseline.src == ranks.id, "inner")
        .join(out_degree, edges_baseline.src == out_degree.src, "inner")
        .select(
            edges_baseline.dst.alias("id"),
            (F.col("rank") / F.col("out_degree")).alias("contrib")
        )
    )
    # 2. Aggregate contributions per destination
    new_ranks = (
        contribs
        .groupBy("id")
        .agg(F.sum("contrib").alias("sum_contrib"))
        .withColumn(
            "rank",
            F.lit((1.0 - DAMPING) / n_vertices) + F.lit(DAMPING) * F.col("sum_contrib")
        )
        .select("id", "rank")
    )
    # 3. Cache and force an action so the timing covers the whole iteration.
    #    cache() materializes the result but does not truncate the lineage:
    #    each iteration's plan still contains all previous ones.
    new_ranks.cache()
    new_ranks.count()  # force action
    elapsed_ms = (time.time() - t0) * 1000
    # 4. Convergence: max absolute rank change
    #    Rename the old rank column to 'old_rank' to avoid an [AMBIGUOUS_REFERENCE] error
    old_r = ranks.withColumnRenamed("rank", "old_rank")
    delta = (
        new_ranks.join(old_r, "id")
        .select(F.max(F.abs(F.col("rank") - F.col("old_rank"))).alias("delta"))
        .first()["delta"] or 0.0
    )
    # Log per-iteration metrics
    pr_metrics_baseline.append({
        "run_id": f"baseline_iter_{i}",
        "path": "A",
        "algorithm": "pagerank_manual",
        "task": f"pagerank_iter_{i}",
        "notes": "default partitioning",
        "iteration_or_k": i,
        "seed": 0,
        "metric_value": delta,
        "shuffle_read_bytes": 0,  # to be filled manually from Spark UI
        "shuffle_write_bytes": 0,  # to be filled manually from Spark UI
        "elapsed_ms": elapsed_ms,
        "timestamp": datetime.datetime.now().isoformat(),
    })
    print(f"Iter {i:2d}: elapsed={elapsed_ms:7.0f} ms | delta={delta:.6f}")
    # Update ranks for next iteration and clean up cache
    ranks.unpersist()
    ranks = new_ranks
    if delta < EPSILON:
        print(f" → Converged at iteration {i} (delta={delta:.6f} < ε={EPSILON})")
        converged = True
        break
if not converged:
    print(f" → Did not converge within {MAX_ITER} iterations")
print("\nTop 15 aircraft by PageRank (most central in the airspace network):")
ranks.orderBy(F.desc("rank")).show(15)
26/04/29 19:06:48 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases. 26/04/29 19:06:48 WARN CacheManager: Asked to cache already cached data.
Iter 0: elapsed= 265 ms | delta=0.001349
26/04/29 19:06:48 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
Iter 1: elapsed= 584 ms | delta=0.002393
26/04/29 19:06:49 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
Iter 2: elapsed= 591 ms | delta=0.002487
26/04/29 19:06:50 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
Iter 3: elapsed= 555 ms | delta=0.002112
26/04/29 19:06:51 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
Iter 4: elapsed= 593 ms | delta=0.000971
 → Converged at iteration 4 (delta=0.000971 < ε=0.001)
Top 15 aircraft by PageRank (most central in the airspace network):
+------+--------------------+
|    id|                rank|
+------+--------------------+
|c81e22|0.007440856845081005|
|cda28b|0.007002487247207178|
|e8043b|0.005849323222585...|
|e61f99|0.005281231065134172|
|c0884a|0.005247960316645426|
|c07fbf|0.004481929543099...|
|d2579e|0.003832716281315...|
|c080a0|0.003708087800843093|
|c052e0|0.003416716491559...|
|e8043a|0.003241819131628802|
|ae0211|0.003066926241153087|
|adea26|0.003027870476320895|
|c087bc|0.002991025535631735|
|c067e7|0.002811286913353...|
|c080a9|0.002766710516659172|
+------+--------------------+
only showing top 15 rows
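`cache()` followed by an action materializes each iteration's result, but the logical plan of `ranks` still references every earlier iteration. This is consistent with the proof-file listing at the end of the notebook, where the plan saved after the loop (proof/plan_iteration.txt) is 1,183,547 bytes against 33,583 bytes for the pre-loop plan. One way to cut that chain, sketched here as an assumption rather than as part of the recorded run, is `DataFrame.localCheckpoint()`, which materializes the data and returns a DataFrame with truncated lineage. The helper name `advance_ranks` is hypothetical and relies only on the `localCheckpoint` and `unpersist` methods of the objects it receives.

```python
def advance_ranks(old_ranks, new_ranks):
    """Freeze new_ranks with truncated lineage and release the previous iteration.

    Both arguments are expected to be Spark DataFrames; only their
    localCheckpoint() and unpersist() methods are used.
    """
    # eager=True runs the computation immediately, much like cache() + count()
    frozen = new_ranks.localCheckpoint(eager=True)
    # the previous ranks are no longer needed once the new ones are frozen
    old_ranks.unpersist()
    return frozen

# Intended use inside the loop, after the convergence delta has been computed:
#     ranks = advance_ranks(ranks, new_ranks)
```

Local checkpoints are stored on the executors and are not fault tolerant, so this trades recoverability for shorter plans. A reliable `checkpoint()` to a configured checkpoint directory is the alternative.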
# ── Save final PageRank results ───────────────────────────────────────────────
ranks.write.mode("overwrite").parquet("outputs/lab3/pagerank_results")
print("PageRank results written → outputs/lab3/pagerank_results")
# ── Save iteration plan (formatted physical plan for one iteration's join) ────
contribs_plan = (
edges_baseline
.join(ranks, edges_baseline.src == ranks.id, "inner")
.join(out_degree, edges_baseline.src == out_degree.src, "inner")
.select(
edges_baseline.dst.alias("id"),
(F.col("rank") / F.col("out_degree")).alias("contrib")
)
)
with open("proof/plan_iteration.txt", "w") as f:
    with redirect_stdout(f):
        contribs_plan.explain(mode="formatted")
print("proof/plan_iteration.txt saved.")
PageRank results written → outputs/lab3/pagerank_results proof/plan_iteration.txt saved.
26/04/29 19:07:46 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
A.3 — Partitioning Experiment¶
Hypothesis: Repartitioning edges by the join key (src) before the iterative loop collocates all out-edges of the same aircraft in the same partition, reducing shuffle during the join with ranks.
Strategies compared:
| Strategy | Description |
|---|---|
| Baseline | Default hash partition (no explicit repartition) |
| Repartitioned | edges.repartition(8, "src") + out_degree.repartition(8, "src"), both cached before the loop |
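To illustrate the hypothesis, the sketch below assigns edges to partitions by hashing `src`. All out-edges of one source land in the same partition, which is what makes the join key co-located, but a hot source also makes its partition much heavier than the others. CRC32 is used only as a deterministic stand-in (Spark applies its own hash function), and the edge list and function names are hypothetical.

```python
import zlib
from collections import Counter

def partition_of(key, n_parts):
    """Map a key to a partition index with a deterministic hash (CRC32 stand-in)."""
    return zlib.crc32(key.encode("utf-8")) % n_parts

def partition_sizes(edges, n_parts):
    """Count how many edges land in each partition when partitioning by src."""
    sizes = Counter({p: 0 for p in range(n_parts)})
    for src, _ in edges:
        sizes[partition_of(src, n_parts)] += 1
    return sizes

# Hypothetical edge list: one hot source with 100 out-edges, 20 sources with one each
edges = [("hot", f"d{i}") for i in range(100)] + [(f"s{i}", "x") for i in range(20)]
sizes = partition_sizes(edges, n_parts=8)
print(sorted(sizes.values(), reverse=True))
```

The largest partition holds at least the 100 edges of the hot source, whatever the hash function, because a key-based partitioner cannot split a single key across partitions.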
# ── Save plan BEFORE repartitioning ──────────────────────────────────────────
ranks_init = vertices.withColumn("rank", F.lit(1.0 / n_vertices))
plan_before_q = (
edges_raw
.join(ranks_init, edges_raw.src == ranks_init.id, "inner")
.join(out_degree, edges_raw.src == out_degree.src, "inner")
.select(edges_raw.dst.alias("id"),
(F.col("rank") / F.col("out_degree")).alias("contrib"))
)
with open("proof/plan_before.txt", "w") as f:
    with redirect_stdout(f):
        plan_before_q.explain(mode="formatted")
print("proof/plan_before.txt saved.")
# ── Repartition edges and out_degree by join key ──────────────────────────────
N_PARTS = 8
edges_repartitioned = edges_raw.repartition(N_PARTS, "src").cache()
edges_repartitioned.count() # materialize
out_degree_repartitioned = out_degree.repartition(N_PARTS, "src").cache()
out_degree_repartitioned.count()
print(f"Edges repartitioned to {N_PARTS} partitions by 'src'.")
# ── Save plan AFTER repartitioning ────────────────────────────────────────────
plan_after_q = (
edges_repartitioned
.join(ranks_init, edges_repartitioned.src == ranks_init.id, "inner")
.join(out_degree_repartitioned, edges_repartitioned.src == out_degree_repartitioned.src, "inner")
.select(edges_repartitioned.dst.alias("id"),
(F.col("rank") / F.col("out_degree")).alias("contrib"))
)
with open("proof/plan_after.txt", "w") as f:
    with redirect_stdout(f):
        plan_after_q.explain(mode="formatted")
print("proof/plan_after.txt saved.")
# ── Run PageRank with repartitioned edges ─────────────────────────────────────
ranks2 = vertices.withColumn("rank", F.lit(1.0 / n_vertices))
pr_metrics_repartitioned = []
for i in range(MAX_ITER):
    t0 = time.time()
    # Same join as the baseline, but on the pre-partitioned, cached inputs
    contribs2 = (
        edges_repartitioned
        .join(ranks2, edges_repartitioned.src == ranks2.id, "inner")
        .join(out_degree_repartitioned, edges_repartitioned.src == out_degree_repartitioned.src, "inner")
        .select(
            edges_repartitioned.dst.alias("id"),
            (F.col("rank") / F.col("out_degree")).alias("contrib")
        )
    )
    new_ranks2 = (
        contribs2.groupBy("id")
        .agg(F.sum("contrib").alias("sum_contrib"))
        .withColumn("rank",
            F.lit((1.0 - DAMPING) / n_vertices) + F.lit(DAMPING) * F.col("sum_contrib"))
        .select("id", "rank")
    )
    new_ranks2.cache()
    new_ranks2.count()  # force action so the timing covers the whole iteration
    elapsed_ms = (time.time() - t0) * 1000
    old_r2 = ranks2.withColumnRenamed("rank", "old_rank")
    delta2 = (
        new_ranks2.join(old_r2, "id")
        .select(F.max(F.abs(new_ranks2.rank - F.col("old_rank"))).alias("delta"))
        .first()["delta"] or 0.0
    )
    pr_metrics_repartitioned.append({
        "run_id": f"repartitioned_iter_{i}",
        "path": "A",
        "algorithm": "pagerank_manual",
        "task": f"pagerank_iter_{i}",
        "notes": f"repartition({N_PARTS}, 'src')",
        "iteration_or_k": i,
        "seed": 0,
        "metric_value": delta2,
        "shuffle_read_bytes": 0,  # to be filled manually from Spark UI
        "shuffle_write_bytes": 0,  # to be filled manually from Spark UI
        "elapsed_ms": elapsed_ms,
        "timestamp": datetime.datetime.now().isoformat(),
    })
    print(f"[REPARTITIONED] Iter {i:2d}: elapsed={elapsed_ms:7.0f} ms | delta={delta2:.6f}")
    ranks2.unpersist()
    ranks2 = new_ranks2
    if delta2 < EPSILON:
        print(f" → Converged at iteration {i}")
        break
print("\nTop 10 aircraft (repartitioned run):")
ranks2.orderBy(F.desc("rank")).show(10)
26/04/29 19:07:57 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
proof/plan_before.txt saved. Edges repartitioned to 8 partitions by 'src'. proof/plan_after.txt saved.
26/04/29 19:07:58 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases. 26/04/29 19:07:58 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
[REPARTITIONED] Iter 0: elapsed= 632 ms | delta=0.001349
26/04/29 19:07:59 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
[REPARTITIONED] Iter 1: elapsed= 361 ms | delta=0.002393
26/04/29 19:08:00 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
[REPARTITIONED] Iter 2: elapsed= 440 ms | delta=0.002487
26/04/29 19:08:00 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
[REPARTITIONED] Iter 3: elapsed= 659 ms | delta=0.002112
26/04/29 19:08:01 WARN Column: Constructing trivially true equals predicate, 'src == src'. Perhaps you need to use aliases.
[REPARTITIONED] Iter 4: elapsed= 757 ms | delta=0.000971
 → Converged at iteration 4
Top 10 aircraft (repartitioned run):
+------+--------------------+
|    id|                rank|
+------+--------------------+
|c81e22|0.007440856845081...|
|cda28b|0.007002487247207179|
|e8043b|0.005849323222585473|
|e61f99|0.005281231065134172|
|c0884a|0.005247960316645426|
|c07fbf| 0.00448192954309929|
|d2579e|0.003832716281315...|
|c080a0|0.003708087800843093|
|c052e0|0.003416716491559356|
|e8043a|0.003241819131628801|
+------+--------------------+
only showing top 10 rows
A.4 — Convergence & Skew Analysis¶
# ── Convergence table ─────────────────────────────────────────────────────────
df_base = pd.DataFrame(pr_metrics_baseline)
df_rept = pd.DataFrame(pr_metrics_repartitioned)
print("=== Baseline convergence ===")
print(df_base[["iteration_or_k", "elapsed_ms", "metric_value"]].to_string(index=False))
print("\n=== Repartitioned convergence ===")
print(df_rept[["iteration_or_k", "elapsed_ms", "metric_value"]].to_string(index=False))
# ── Skew summary ──────────────────────────────────────────────────────────────
print(f"\nSkew analysis:")
print(f" Mean out-degree : {deg_stats['mean']:.2f}")
print(f" Std out-degree : {deg_stats['stddev']:.2f}")
print(f" Max out-degree : {deg_stats['max']}")
skew_ratio = deg_stats["max"] / deg_stats["mean"] if deg_stats["mean"] else 0
print(f" Skew ratio : {skew_ratio:.1f}x (max/mean)")
if skew_ratio > 10:
    print(" ⚠ High skew — hot vertices will cause uneven shuffle partitions")
else:
    print(" ✓ Moderate skew — repartitioning by 'src' should balance load")
# ── Performance comparison ────────────────────────────────────────────────────
if len(df_base) > 0 and len(df_rept) > 0:
    avg_base = df_base["elapsed_ms"].mean()
    avg_rept = df_rept["elapsed_ms"].mean()
    gain = (avg_base - avg_rept) / avg_base * 100 if avg_base > 0 else 0
    print(f"\nPerformance (avg per iteration):")
    print(f" Baseline : {avg_base:.0f} ms")
    print(f" Repartitioned : {avg_rept:.0f} ms")
    print(f" Gain : {gain:+.1f}%")
=== Baseline convergence ===
iteration_or_k elapsed_ms metric_value
0 264.610529 0.001349
1 584.051847 0.002393
2 590.568781 0.002487
3 555.108547 0.002112
4 592.921972 0.000971
=== Repartitioned convergence ===
iteration_or_k elapsed_ms metric_value
0 632.028103 0.001349
1 361.491442 0.002393
2 439.799070 0.002487
3 659.268141 0.002112
4 756.708622 0.000971
Skew analysis:
Mean out-degree : 42.69
Std out-degree : 49.60
Max out-degree : 438
Skew ratio : 10.3x (max/mean)
⚠ High skew — hot vertices will cause uneven shuffle partitions
Performance (avg per iteration):
Baseline : 517 ms
Repartitioned : 570 ms
Gain : -10.1%
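The gain figure is computed as (avg_base - avg_rept) / avg_base × 100. With only five iterations per run and overlapping ranges, the mean can be dominated by one or two iterations, so a median and a spread are a useful cross-check. The sketch below recomputes both from the per-iteration timings printed above; the helper name `relative_gain` is illustrative.

```python
import statistics

# Per-iteration elapsed times (ms) copied from the convergence tables above
baseline_ms = [264.610529, 584.051847, 590.568781, 555.108547, 592.921972]
repartitioned_ms = [632.028103, 361.491442, 439.799070, 659.268141, 756.708622]

def relative_gain(before, after):
    """Percentage improvement of `after` over `before` (negative means slower)."""
    return (before - after) / before * 100

mean_gain = relative_gain(statistics.mean(baseline_ms), statistics.mean(repartitioned_ms))
median_gain = relative_gain(statistics.median(baseline_ms), statistics.median(repartitioned_ms))

print(f"Mean gain   : {mean_gain:+.1f}%")
print(f"Median gain : {median_gain:+.1f}%")
print(f"Baseline spread      : {min(baseline_ms):.0f} to {max(baseline_ms):.0f} ms")
print(f"Repartitioned spread : {min(repartitioned_ms):.0f} to {max(repartitioned_ms):.0f} ms")
```

Both summaries point the same way for this run (the repartitioned loop is slower), but the two ranges overlap widely, so repeated runs would be needed before attributing the difference to partitioning.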
Shared — Evidence & Metrics¶
# ── Write lab3_metrics_log.csv ────────────────────────────────────────────────
all_metrics = pr_metrics_baseline + pr_metrics_repartitioned
# Add skew and vertex/edge count rows
all_metrics.insert(0, {
"run_id": "graph_build", "path": "A", "algorithm": "graph_construction",
"task": "build_graph", "notes": f"{n_vertices} vertices, {n_edges} edges, sector=2deg/5min",
"iteration_or_k": 0, "seed": 0,
"metric_value": skew_ratio,
"shuffle_read_bytes": 0, "shuffle_write_bytes": 0,
"elapsed_ms": 0, "timestamp": datetime.datetime.now().isoformat(),
})
df_metrics = pd.DataFrame(all_metrics)
df_metrics.to_csv("lab3_metrics_log.csv", index=False)
print("lab3_metrics_log.csv written:")
print(df_metrics[["run_id","task","iteration_or_k","metric_value","elapsed_ms"]].to_string(index=False))
# ── Verify proof files ────────────────────────────────────────────────────────
print("\nProof files:")
for f in ["proof/plan_iteration.txt", "proof/plan_before.txt", "proof/plan_after.txt"]:
    p = pathlib.Path(f)
    print(f" {'[x]' if p.exists() else '[ ]'} {f} ({p.stat().st_size if p.exists() else 0} bytes)")
print(" [ ] proof/spark_ui_shuffle_before.png — capture manually")
print(" [ ] proof/spark_ui_shuffle_after.png — capture manually")
lab3_metrics_log.csv written:
run_id task iteration_or_k metric_value elapsed_ms
graph_build build_graph 0 10.259127 0.000000
baseline_iter_0 pagerank_iter_0 0 0.001349 264.610529
baseline_iter_1 pagerank_iter_1 1 0.002393 584.051847
baseline_iter_2 pagerank_iter_2 2 0.002487 590.568781
baseline_iter_3 pagerank_iter_3 3 0.002112 555.108547
baseline_iter_4 pagerank_iter_4 4 0.000971 592.921972
repartitioned_iter_0 pagerank_iter_0 0 0.001349 632.028103
repartitioned_iter_1 pagerank_iter_1 1 0.002393 361.491442
repartitioned_iter_2 pagerank_iter_2 2 0.002487 439.799070
repartitioned_iter_3 pagerank_iter_3 3 0.002112 659.268141
repartitioned_iter_4 pagerank_iter_4 4 0.000971 756.708622
Proof files:
[x] proof/plan_iteration.txt (1183547 bytes)
[x] proof/plan_before.txt (33583 bytes)
[x] proof/plan_after.txt (44730 bytes)
[ ] proof/spark_ui_shuffle_before.png — capture manually
[ ] proof/spark_ui_shuffle_after.png — capture manually
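The shuffle columns in the metrics log are left at 0 and filled in by hand from the Spark UI. As a possible alternative, the sketch below totals shuffle bytes from stage records returned by Spark's monitoring REST API. It assumes the stage records expose `shuffleReadBytes` and `shuffleWriteBytes` fields, which should be verified against the Spark version in use. The function names and the example records are hypothetical.

```python
import json
from urllib.request import urlopen

def sum_shuffle_bytes(stages):
    """Total shuffle read/write bytes over a list of stage records."""
    read = sum(stage.get("shuffleReadBytes", 0) for stage in stages)
    write = sum(stage.get("shuffleWriteBytes", 0) for stage in stages)
    return read, write

def fetch_stages(ui_url, app_id):
    """Fetch stage records from the monitoring REST API (needs a running application)."""
    with urlopen(f"{ui_url}/api/v1/applications/{app_id}/stages") as response:
        return json.load(response)

# Hypothetical stage records, shaped like the REST API's stage entries
example_stages = [
    {"stageId": 12, "shuffleReadBytes": 1_000, "shuffleWriteBytes": 4_000},
    {"stageId": 13, "shuffleReadBytes": 4_000, "shuffleWriteBytes": 0},
]
print(sum_shuffle_bytes(example_stages))
```

With a live session, `fetch_stages(spark.sparkContext.uiWebUrl, spark.sparkContext.applicationId)` would supply the records. Taking the totals before and after each iteration and subtracting them would give per-iteration values for the log.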
Cleanup¶
spark.stop()
print("Lab 3 complete.")
Lab 3 complete.