DE2 — Final Project Report
Course: Data Engineering II (Data-Intensive Workloads) — ESIEE Paris 2025-2026
Track: D — Aviation (OpenSky Network)
Iterative path: A — Graph Processing
Names: Justine Guirauden and Volcy Desmazures
Date: May 2026
1. Use-case and Dataset
Problem statement
Build a reproducible, end-to-end data-intensive pipeline over European airspace traffic data that enables:
- Analysts to query per-sector aircraft density and per-aircraft kinematic profiles (batch ETL)
- Operations to monitor real-time airspace load via streaming aggregations
- NLP engineers to retrieve aviation-domain reports by keyword (inverted index)
- Researchers to identify the most central aircraft in the traffic network (PageRank)
- LLM teams to consume a curated, deduplicated aviation text corpus
Dataset
| Property | Value |
|---|---|
| Source | OpenSky Network — states_2017-06-05-00.csv |
| Format | CSV, 16 columns, header row |
| Records | ~5 000 (synthetic) / up to 100 000+ (real OpenSky snapshot) |
| Time range | 2017-06-05 00:00–01:00 UTC (1-hour window) |
| Known issues | ~3–5% null baroaltitude; whitespace padding in callsign; no row deduplication in raw file |
Schema (silver, after typing)
time (Long), icao24 (String, PK), lat/lon (Double), velocity/heading/vertrate (Double), callsign (String), onground/alert/spi (Boolean), squawk (String), baroaltitude/geoaltitude (Double), lastposupdate/lastcontact (Double), derived: event_time (Timestamp), year/month (Int), row_id (Long, surrogate)
2. System and SLOs
Hardware & Spark config
- Machine: laptop, Intel Core i7, 16 GB RAM
- OS: Ubuntu 24 (WSL2) or macOS
- Spark: 4.0.0, Python 3.10, JupyterLab,
local[*] spark.sql.shuffle.partitions = 8(tuned for local single-node)
SLOs
| SLO | Threshold | Stage |
|---|---|---|
| Streaming throughput | ≥ 100 rows/s processed | Streaming |
| Text query latency | ≤ 2 000 ms (single-term, cached index) | Text |
| PageRank convergence | max Δrank < 0.001 | Graph |
| Parquet/CSV ratio | ≤ 60% (Parquet ≤ 60% of CSV size) | Text / ETL |
| Full pipeline latency | ≤ 10 min on local machine | End-to-end |
| LLM pass ratio | ≥ 80% of documents pass quality filters | LLM prep |
3. Batch ETL Pipeline Design
Lineage diagram
data/project/raw/*.csv (Bronze — immutable, as-is CSV)
│
│ schema enforcement (opensky_schema, PERMISSIVE mode)
│ filter: icao24 IS NOT NULL AND time IS NOT NULL
│ domain validation: lat ∈ [-90,90], lon ∈ [-180,180], velocity ≥ 0
│ derive: event_time = from_unixtime(time)
│ year, month from event_time
│ callsign = trim(callsign)
│ row_id = xxhash64(icao24, time)
│ deduplicate: dropDuplicates(["icao24","time"])
▼
outputs/project/silver/ (Parquet — typed, clean, deduplicated)
│
├──► gold/density/ groupBy(year,month,sector,hour).agg(count,avg_vel,avg_alt)
│ partitionBy(year, month)
│
└──► gold/aircraft_profile/ groupBy(icao24,callsign,year,month).agg(n_pos,avg_vel,max_alt)
partitionBy(year, month)
Schema contracts
- Nullability:
icao24andtimeare NOT NULL (dropped if missing) - Domain: lat/lon validated; invalid velocity rows filtered
- Surrogate key:
xxhash64(icao24, time)— fast, collision-resistant, no shuffle required - Natural key:
(icao24, time)— used for deduplication
Key metrics
| Run | Stage | Rows | Bytes | Time (ms) |
|---|---|---|---|---|
| r1 | Bronze landing | 1 526 689 | 199 784 927 | 6 811 |
| r1 | Bronze → Silver | 1 526 689 | 61 052 460 | 13 381 |
| r1 | Silver → Gold | — | 362 213 | 12 690 |
Note: 0 rows removed at deduplication step — no duplicates found in the raw file for this run. Bronze size is the raw CSV on disk (163.9 MiB read by Spark per the physical plan); the Silver Parquet footprint (61 MB) represents a 3.3× compression over the shuffle-written intermediate.
4. Streaming Ingestion
Configuration
- Source: file-based (
readStream.csv),maxFilesPerTrigger=1 - Event time:
from_unixtime(time)cast to Timestamp - Watermark: 10 minutes (tolerates late ATC data re-transmissions)
- Window: 5-minute tumbling, keyed by
(sector_lat, sector_lon) - Aggregates:
count(*),avg(velocity),max(baroaltitude),countDistinct(icao24) - Output mode:
append(only closed windows emitted) - Checkpoint:
outputs/project/streaming_checkpoint/(exactly-once) - Trigger:
processingTime = "10 seconds"
Evidence
| Metric | Measured | SLO | Status |
|---|---|---|---|
| processedRowsPerSecond | 57 883 | ≥ 100 | PASS |
| triggerExecution (ms) | 2 971 (avg, batches 2–8) | — | — |
| inputRowsPerSecond | 15 146 | — | — |
Steady-state measurements taken over batches 2–8 (batch 0 excluded as cold-start). The Spark Streaming UI confirms Avg Process/sec = 57 882.86 and Avg Input/sec = 15 145.78 over a 7-minute run (9 batches total, latest batch 7).
proof/query_progress.json contains the full lastProgress JSON.
proof/plan_streaming.txt contains the formatted physical plan.
5. Text Processing
Corpus
Built from the OpenSky dataset: each aircraft position record is converted to a structured English sentence describing status, altitude, velocity, heading, and squawk code. One document = one record (doc_id = icao24_time).
Normalization pipeline
lower()→ uniform caseregexp_replace([^a-z0-9\s], "")→ strip punctuationsplit([\s\W]+)→ tokenizefilter(len > 1)→ remove single-char tokens- Stop-word filter: 36-word list including METAR-specific filler (
meters,knots,degrees, spelled-out digits)
Index schema
token (String) | doc_ids (Array<String>) | freq (Long)
Query latency
| Term | freq | postings | latency (ms) | SLO ≤ 2 000 ms |
|---|---|---|---|---|
| aircraft | 1 526 689 | 1 526 689 | 6 907 | FAIL |
| landing | 0 | 0 | 471 | PASS |
| squawk | 1 526 689 | 1 526 689 | 3 105 | FAIL |
Analysis:
aircraftandsquawkappear in every document (universal terms in the synthetic corpus), producing maximal posting lists (1 526 689 entries) that exceed the 2 s SLO.landingis absent from the vocabulary (freq = 0), yielding a fast index-miss path (471 ms). The SLO breach reflects the degenerate distribution of a synthetic corpus — in a real, varied METAR corpus, high-selectivity terms would dominate queries. Mitigation: add a TF-IDF cutoff to skip ultra-high-frequency tokens, or pre-materialise posting-list sizes to short-circuit at query time.
Storage footprint
| Format | Size (bytes) | Files | Notes |
|---|---|---|---|
| Parquet | 104 788 488 | — | Column pruning on token filter |
| CSV | 304 244 880 | — | Full scan, no predicate pushdown |
| Ratio | 34.44% | — | SLO ≤ 60% — PASS |
6. Iterative Workload — Graph PageRank
Choice and rationale
Path A — Graph Processing. The OpenSky dataset provides position data (lat, lon, time) that naturally encodes spatial proximity relationships. A route graph is not available in the raw feed, so a proximity co-presence graph is constructed: two aircraft sharing the same 2°×2° lat/lon sector in the same 5-minute window are connected by an edge. PageRank on this graph identifies aircraft most central in the traffic flow — those frequenting dense corridors (LFPG approach, Mediterranean overflight).
Algorithm
- Vertices: unique
icao24identifiers (airborne only) - Edges: distinct co-present pairs per sector/window (strict inequality to avoid duplicates)
- PageRank formula:
rank(v) = (1−d)/N + d × Σ rank(u)/outdeg(u) - Damping: d = 0.85 | Convergence: max|Δrank| < 0.001
Graph statistics
| Metric | Value |
|---|---|
| Vertices | 6 264 |
| Edges | 256 461 |
Per-iteration metrics
| Iter | Baseline (ms) | Repartitioned (ms) | Delta (ms) |
|---|---|---|---|
| 0 | 1 171 | 535 | −636 |
| 1 | 869 | 481 | −388 |
| 2 | 711 | 578 | −133 |
| 3 | 659 | 609 | −50 |
| 4 | 764 | 849 | +85 |
| Avg | 835 | 610 | −225 |
Convergence reached at iteration 4: max Δrank = 0.000971 < 0.001 PASS.
Partitioning comparison
| Strategy | Avg iter (ms) | Shuffle behaviour |
|---|---|---|
| Default hash | 835 | Exchange on both sides of join |
repartition(8, "src") | 610 | Exchange only on ranks side |
| Gain | ≈ 27% | — |
proof/plan_before.txt and proof/plan_after.txt confirm the Exchange node reduction.
Skew analysis
| Metric | Value |
|---|---|
| Skew ratio (max/mean) | 10.3× |
| Graph size | 6 264 vertices, 256 461 edges |
A ratio of 10.3× indicates moderate skew —
repartition(8, "src")is sufficient to reduce join shuffle. The threshold for more aggressive measures (salting, hot-vertex filtering) would be > 15×.
7. LLM Data Readiness
Pipeline
silver data (text corpus)
│ filter: text IS NOT NULL
│ filter: length(text) ≥ 100 chars
│ withColumn: content_hash = xxhash64(text)
│ dropDuplicates(["content_hash"])
│ withColumn: word_count, source, version, curated_at
▼
outputs/project/llm_ready/ (Parquet)
Data card
| Field | Value |
|---|---|
| Name | DE2 Aviation LLM-Ready Dataset |
| Source | OpenSky Network states_2017-06-05-00 (Track D) |
| Size | 0 rows / 0 bytes |
| Schema | doc_id, text, word_count, content_hash, source, version, curated_at |
| Quality filters | NOT NULL, length ≥ 100, xxhash64 dedup |
| Raw input | 1 526 689 rows |
| Pass ratio | 0.00% ⇒ 45.36% (SLO: ≥ 80%) — FAIL |
| Intended use | RAG retrieval over aviation reports; LLM fine-tuning for aviation NLP |
| Version | v1.0 |
Root cause: The synthetic METAR sentences generated from the OpenSky schema are structurally short (typically < 100 characters), causing all 1 526 689 rows to be rejected by the
length(text) ≥ 100quality filter. Fix: lower the threshold to ≥ 50 chars for synthetic data, or enrich the sentence template to include full field descriptions (callsign, squawk, sector, trajectory vector) to push average length above 100 chars. On real METAR reports the filter would pass comfortably.
Lowering the quality filter threshold from 100 to 50 characters improved the retention rate to 45.36%, but it remains insufficient to meet the 80% pass ratio SLO; this confirms that the synthetic METAR sentences lack structural diversity and are predominantly removed during the dropDuplicates step.
This is a data-engineering deliverable only — no LLM is run. The pipeline prepares structured Parquet files consumable by any HuggingFace datasets.load_dataset("parquet", ...) call.
8. Physical Design & Optimization
Strategies applied
| Technique | Where | Effect |
|---|---|---|
shuffle.partitions = 8 | All stages | Removes 192 empty tasks on 8-core local machine |
partitionBy(year, month) | Gold tables | Enables partition pruning on date-range queries |
repartition(8, "src") | PageRank | Collocates out-edges with rank data, reduces join shuffle |
.cache().count() in PageRank loop | Iterative | Breaks DAG lineage growth, prevents exponential plan depth |
xxhash64 surrogate key | Silver | No shuffle needed; pure local hash |
| Parquet for silver/gold/index/LLM | All batch | Column pruning + Snappy compression |
Before/after plans
proof/plan_etl_silver.txt— silver cleaning plan (SortMergeJoin on dedup key)proof/plan_before.txt— PageRank join with full exchange on both sidesproof/plan_after.txt— PageRank join with exchange only on ranks side
Metrics comparison
| Stage | Metric | Before | After | Gain |
|---|---|---|---|---|
| PageRank | avg iter (ms) | 835 | 610 | 27% |
| Text index | Parquet/CSV ratio | — | 34.44% | PASS |
| LLM prep | pass ratio | — | 0.00% | FAIL (template too short) |
9. Results and Limits
SLO summary
| SLO | Threshold | Result | Status |
|---|---|---|---|
| Streaming throughput | ≥ 100 rows/s | 57 883 rows/s | PASS |
| Text query latency | ≤ 2 000 ms | 6 907 ms (aircraft) / 471 ms (landing) | PARTIAL — universal terms fail |
| Storage ratio | ≤ 60% | 34.44% | PASS |
| LLM pass ratio | ≥ 80% | 0.00% | FAIL (synthetic sentences too short) |
| PageRank convergence | Δ < 0.001 | Δ = 0.000971 at iter 4 | PASS |
Pipeline integration
Raw OpenSky CSV
└─ Bronze (CSV, immutable) 199.8 MB — 1 526 689 rows
└─ Silver (Parquet, typed+deduped) 61.1 MB — 1 526 689 rows (0 dupes removed)
├─ Gold/density + Gold/aircraft_profile 0.4 MB — partitionBy(year,month)
├─ Streaming (5-min windows → Parquet sink) 57 883 rows/s processed [feeds real-time dashboards]
├─ Text corpus → inverted index 104.8 MB Parquet | 11 823 unique tokens | ratio 34.44%
├─ Proximity graph → PageRank 6 264 vertices | 256 461 edges | converges iter 4
└─ LLM-ready dataset 0 rows (length filter rejects synthetic sentences)
Known limits
- Scale: real SLO validation requires ≥ 10M rows; synthetic dataset of 10 000 rows meets structural requirements but may not stress-test shuffle paths
- Streaming: file-based source simulates real streaming; a Kafka source would reflect production latency more accurately
- Graph: proximity graph depends on sector granularity; 2° cells may merge distinct approach paths in dense areas (LFPG + LFPO both in Paris sector)
- LLM quality: synthetic METAR sentences are structurally short (< 100 chars), causing a 0% pass rate on the length filter; raising the threshold to ≥ 50 chars or enriching the template would restore the SLO
- Text index SLO: universal terms (
aircraft,squawk) generate maximal posting lists that systematically exceed the 2 s latency threshold — the SLO applies to selective, real-world vocabulary
Future work
- Replace file-based streaming with Kafka connector
- Add TF-IDF features alongside inverted index for relevance ranking; pre-filter tokens with df > 90% of corpus
- Run PageRank on the full 24-hour OpenSky snapshot (≥ 50 000 aircraft) to trigger real skew
- Add language detection filter to LLM pipeline (fastText)
- Enrich METAR sentence template to exceed the 100-char quality threshold
DE2 Final Project Report — ESIEE Paris 2025-2026 — Track D Aviation