DE2 — Lab 0: Environment Validation & Plan Reading
Author: Badr TAJINI - Data Engineering II (Data-Intensive Workloads) - ESIEE 2025-2026
Goal: Validate your Spark environment and refresh plan-reading skills from DE1.
Tasks:
- Create a Spark session and verify the version.
- Read the sample CSV, enforce schema, write partitioned Parquet.
- Read the Parquet back and compare plans with explain("formatted").
- Capture Spark UI metrics (Shuffle Read/Write, Input Size).
- Export this notebook as a Python script and run it via spark-submit.
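For the last task, one common route is `jupyter nbconvert --to script`, which writes the notebook's code cells to a `.py` file, followed by `spark-submit` on that file. The sketch below only assembles the two command lines and does not run them. `lab0.ipynb` is a hypothetical notebook name, and the sketch assumes nbconvert writes the script beside the notebook under the same stem. No `--master` flag is passed because the session builder already calls `.master("local[*]")`, and a master set in code takes precedence over spark-submit flags.

```python
import shlex
from pathlib import Path


def build_commands(notebook: str) -> list[list[str]]:
    """Return the export and submit commands for a notebook (not executed here).

    `notebook` is a hypothetical file name; nbconvert's script export is assumed
    to write <stem>.py in the same directory as the notebook.
    """
    nb = Path(notebook)
    script = nb.with_suffix(".py")
    export_cmd = ["jupyter", "nbconvert", "--to", "script", str(nb)]
    submit_cmd = ["spark-submit", str(script)]  # master comes from the code itself
    return [export_cmd, submit_cmd]


if __name__ == "__main__":
    for cmd in build_commands("lab0.ipynb"):
        # shlex.join quotes any argument that a POSIX shell would mangle
        print(shlex.join(cmd))
```

Paste the printed lines into a terminal from the notebook's directory; the script still ends with `spark.stop()`, so the UI disappears when the job finishes.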
In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
spark = SparkSession.builder \
    .appName("DE2-Lab0-Validation") \
    .master("local[*]") \
    .getOrCreate()
print("Spark version:", spark.version)
print("Spark UI:", spark.sparkContext.uiWebUrl)
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/04/23 09:28:56 WARN Utils: Your hostname, OrdideJustine, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
26/04/23 09:28:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/04/23 09:29:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark version: 4.0.1
Spark UI: http://10.255.255.254:4040
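"Verify the version" can be made an explicit check instead of a visual one. The sketch below parses a `major.minor.patch` string such as `spark.version` and compares it with a minimum; the minimum `(4, 0, 0)` is a hypothetical requirement chosen for illustration, not one stated by the lab.

```python
def parse_version(version: str) -> tuple[int, int, int]:
    """Parse 'major.minor.patch' (e.g. the value of spark.version) into integers.

    Anything after a '-' (such as '-SNAPSHOT') is ignored; missing parts default to 0.
    """
    parts = version.split("-", 1)[0].split(".")
    major, minor, patch = (int(p) for p in (parts + ["0", "0"])[:3])
    return major, minor, patch


def check_min_version(version: str, minimum: tuple[int, int, int]) -> bool:
    """Return True if `version` is at least `minimum` (lexicographic tuple comparison)."""
    return parse_version(version) >= minimum


if __name__ == "__main__":
    # In the notebook you would pass spark.version; the literal is the value printed above.
    print(check_min_version("4.0.1", (4, 0, 0)))
```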
1. Read CSV with explicit schema
In [4]:
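schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("category", StringType(), True),
    StructField("value", DoubleType(), True),
    StructField("text", StringType(), True),
])
# Note: file sources such as CSV expose every column as nullable, so
# printSchema() below reports id as nullable = true despite nullable=False here.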
df = spark.read.csv("data/sample.csv", header=True, schema=schema)
print(f"Rows: {df.count()}, Columns: {len(df.columns)}")
df.printSchema()
df.show(5)
Rows: 15, Columns: 4
root
 |-- id: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- value: double (nullable = true)
 |-- text: string (nullable = true)
+---+--------+-----+--------------------+
| id|category|value|                text|
+---+--------+-----+--------------------+
|  1|    tech| 42.5|distributed syste...|
|  2| science| 88.3|machine learning ...|
|  3|    tech| 15.7|spark shuffle ope...|
|  4|business| 67.2|data warehouses s...|
|  5| science| 93.1|clustering algori...|
+---+--------+-----+--------------------+
only showing top 5 rows
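With an explicit schema, Spark casts each CSV field to the declared type instead of inferring types. The pure-Python sketch below mimics, in simplified form, what the default PERMISSIVE read mode does: a field that cannot be cast becomes null (`None`) instead of failing the read. It is an illustration only, not Spark's CSV parser, and the inline CSV text is hypothetical rather than the contents of `data/sample.csv`.

```python
import csv
import io

# (column name, converter) pairs mirroring the StructType used above
SCHEMA = [("id", int), ("category", str), ("value", float), ("text", str)]


def read_with_schema(csv_text: str) -> list[dict]:
    """Parse CSV text with a header row, casting each field according to SCHEMA.

    Empty or unparseable fields become None, a simplified stand-in for
    PERMISSIVE mode setting malformed fields to null.
    """
    rows = []
    for record in csv.DictReader(io.StringIO(csv_text)):
        row = {}
        for name, convert in SCHEMA:
            raw = record.get(name)
            try:
                row[name] = convert(raw) if raw not in (None, "") else None
            except ValueError:
                row[name] = None  # malformed field -> null
        rows.append(row)
    return rows


if __name__ == "__main__":
    sample = "id,category,value,text\n1,tech,42.5,hello\n2,science,n/a,world\n"
    for r in read_with_schema(sample):
        print(r)
```

If you would rather have the read fail on bad records, Spark's CSV reader also accepts `mode="FAILFAST"` alongside `schema=schema`.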
2. Write partitioned Parquet
In [5]:
df.write.mode("overwrite") \
    .partitionBy("category") \
    .parquet("outputs/lab0/sample_parquet")
print("Parquet written to outputs/lab0/sample_parquet/")
Parquet written to outputs/lab0/sample_parquet/
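`partitionBy("category")` writes one sub-directory per distinct value, named Hive-style as `category=<value>`, and the partition column is encoded in the path rather than stored inside the Parquet files. The sketch below groups rows into those directory names; the rows are illustrative, and escaping of special characters in values and the `part-*` file names inside each directory are not modelled.

```python
from collections import defaultdict

# Directory name Spark uses when the partition value is null
HIVE_DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__"


def partition_layout(rows: list[dict], column: str) -> dict[str, list[dict]]:
    """Group rows by the directory they would land in under partitionBy(column).

    The partition column is dropped from each row, since it lives in the path.
    """
    layout = defaultdict(list)
    for row in rows:
        key = row[column]
        dirname = f"{column}={HIVE_DEFAULT_PARTITION if key is None else key}"
        layout[dirname].append({k: v for k, v in row.items() if k != column})
    return dict(layout)


if __name__ == "__main__":
    rows = [
        {"id": 1, "category": "tech", "value": 42.5},
        {"id": 2, "category": "science", "value": 88.3},
        {"id": 3, "category": None, "value": 1.0},
    ]
    for dirname, contents in partition_layout(rows, "category").items():
        print(dirname, contents)
```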
3. Read Parquet and compare plans
In [6]:
df_parquet = spark.read.parquet("outputs/lab0/sample_parquet")
print("=== CSV scan plan ===")
df.explain("formatted")
print("\n=== Parquet scan plan ===")
df_parquet.explain("formatted")
=== CSV scan plan ===
== Physical Plan ==
Scan csv (1)
(1) Scan csv
Output [4]: [id#30, category#31, value#32, text#33]
Batched: false
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/OneDrive - ESIEE Paris/Documents/ESIEE/E4/Cours/S2/Data Engineering 2/data/sample.csv]
ReadSchema: struct<id:int,category:string,value:double,text:string>
=== Parquet scan plan ===
== Physical Plan ==
* ColumnarToRow (2)
+- Scan parquet (1)
(1) Scan parquet
Output [4]: [id#65, value#66, text#67, category#68]
Batched: true
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/OneDrive - ESIEE Paris/Documents/ESIEE/E4/Cours/S2/Data Engineering 2/outputs/lab0/sample_parquet]
ReadSchema: struct<id:int,value:double,text:string>
(2) ColumnarToRow [codegen id : 1]
Input [4]: [id#65, value#66, text#67, category#68]
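Two differences between the plans follow from the storage format. The Parquet scan is `Batched: true` (columnar reads, converted back to rows by `ColumnarToRow`) while the CSV scan reads row by row. And `category` is absent from the Parquet `ReadSchema` yet appears last in `Output`, because Spark recovers it from the directory names and appends partition columns after the data columns. The same directory names enable partition pruning: with a filter such as `df_parquet.filter("category = 'tech'")`, you should see the predicate listed under `PartitionFilters` in the scan node. The sketch below shows the idea in pure Python; the file paths are simplified placeholders (real part files carry longer generated names).

```python
from pathlib import PurePosixPath
from typing import Callable


def parse_partition(path: str) -> dict[str, str]:
    """Extract Hive-style key=value pairs from the directory components of a path."""
    values = {}
    for part in PurePosixPath(path).parent.parts:
        if "=" in part:
            key, _, value = part.partition("=")
            values[key] = value
    return values


def prune(files: list[str], keep: Callable[[dict[str, str]], bool]) -> list[str]:
    """Return only files whose partition values satisfy `keep`; the rest are never opened."""
    return [f for f in files if keep(parse_partition(f))]


if __name__ == "__main__":
    files = [
        "sample_parquet/category=tech/part-00000.parquet",
        "sample_parquet/category=science/part-00000.parquet",
        "sample_parquet/category=business/part-00000.parquet",
    ]
    print(prune(files, lambda p: p.get("category") == "tech"))
```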
4. Simple aggregation — capture metrics
In [7]:
from pyspark.sql import functions as F
agg_df = df_parquet.groupBy("category").agg(
    F.count("*").alias("cnt"),
    F.avg("value").alias("avg_value")
)
agg_df.explain("formatted")
agg_df.show()
== Physical Plan ==
AdaptiveSparkPlan (5)
+- HashAggregate (4)
   +- Exchange (3)
      +- HashAggregate (2)
         +- Scan parquet (1)
(1) Scan parquet
Output [2]: [value#66, category#68]
Batched: true
Location: InMemoryFileIndex [file:/mnt/c/Users/Justine/OneDrive - ESIEE Paris/Documents/ESIEE/E4/Cours/S2/Data Engineering 2/outputs/lab0/sample_parquet]
ReadSchema: struct<value:double>
(2) HashAggregate
Input [2]: [value#66, category#68]
Keys [1]: [category#68]
Functions [2]: [partial_count(1), partial_avg(value#66)]
Aggregate Attributes [3]: [count#78L, sum#79, count#80L]
Results [4]: [category#68, count#81L, sum#82, count#83L]
(3) Exchange
Input [4]: [category#68, count#81L, sum#82, count#83L]
Arguments: hashpartitioning(category#68, 200), ENSURE_REQUIREMENTS, [plan_id=131]
(4) HashAggregate
Input [4]: [category#68, count#81L, sum#82, count#83L]
Keys [1]: [category#68]
Functions [2]: [count(1), avg(value#66)]
Aggregate Attributes [2]: [count(1)#75L, avg(value#66)#76]
Results [3]: [category#68, count(1)#75L AS cnt#69L, avg(value#66)#76 AS avg_value#70]
(5) AdaptiveSparkPlan
Output [3]: [category#68, cnt#69L, avg_value#70]
Arguments: isFinalPlan=false
+--------+---+------------------+
|category|cnt| avg_value|
+--------+---+------------------+
| tech| 6|41.733333333333334|
| science| 5| 75.55999999999999|
|business| 4| 59.675|
+--------+---+------------------+
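The plan runs the aggregation in two phases around a shuffle. `HashAggregate (2)` computes per-partition partial results (`partial_count` plus `partial_avg`, which is carried as a sum and a count, hence the `sum#`/`count#` attributes). `Exchange (3)` hash-partitions those partial rows on `category` into 200 partitions, the default of `spark.sql.shuffle.partitions`; `isFinalPlan=false` signals that adaptive execution may still change this, for example by coalescing partitions. `HashAggregate (4)` then merges the partials. The pure-Python sketch below walks through the same phases on hypothetical rows; `zlib.crc32` stands in for Spark's Murmur3-based hash, so bucket numbers will not match Spark's.

```python
import zlib
from collections import defaultdict

NUM_SHUFFLE_PARTITIONS = 200  # default value of spark.sql.shuffle.partitions


def new_state():
    # [count(*), sum(value), count(value)]: the partial state behind cnt and avg_value
    return [0, 0.0, 0]


def partial_agg(rows):
    """HashAggregate (2): per-key partial state within one input partition."""
    acc = defaultdict(new_state)
    for category, value in rows:
        state = acc[category]
        state[0] += 1  # count(*) counts every row
        if value is not None:  # avg skips nulls
            state[1] += value
            state[2] += 1
    return acc


def shuffle_partition(key, n=NUM_SHUFFLE_PARTITIONS):
    """Exchange (3): choose a reduce partition; crc32 stands in for Spark's hash."""
    return zlib.crc32(key.encode("utf-8")) % n


def final_agg(partial_rows):
    """HashAggregate (4): merge partial states routed to one reduce partition."""
    merged = defaultdict(new_state)
    for key, (cnt, total, n_vals) in partial_rows:
        m = merged[key]
        m[0] += cnt
        m[1] += total
        m[2] += n_vals
    return {k: (c, s / n if n else None) for k, (c, s, n) in merged.items()}


def group_by_category(input_partitions):
    """Run both phases over input partitions of (category, value) rows."""
    buckets = defaultdict(list)
    for rows in input_partitions:
        for key, state in partial_agg(rows).items():
            buckets[shuffle_partition(key)].append((key, state))
    result = {}
    for partial_rows in buckets.values():
        result.update(final_agg(partial_rows))
    return result


if __name__ == "__main__":
    # Hypothetical rows spread over two input partitions
    parts = [[("tech", 10.0), ("science", 4.0)], [("tech", 20.0), ("tech", None)]]
    print(group_by_category(parts))
```

Because every partial row for a given key is routed to the same bucket, each reduce partition can finish its keys independently, which is why only the small partial states, not the raw rows, cross the shuffle.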
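5. Capture evidence
- Open the Spark UI at http://localhost:4040 (or the address printed in the first cell) and take screenshots of the SQL and Jobs tabs. Do this before running the next cell: spark.stop() shuts the UI down.
- Note the Shuffle Read/Write and Input Size values (the Stages tab lists Input, Shuffle Read and Shuffle Write per stage).
- Save the screenshots in the proof/ folder.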
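Besides screenshots, the same counters are exposed by Spark's monitoring REST API under `/api/v1` on the UI port while the session is alive. The sketch below looks up the application id, fetches its stage list and sums the `inputBytes`, `shuffleReadBytes` and `shuffleWriteBytes` fields. The default base URL `http://localhost:4040` is an assumption (pass `spark.sparkContext.uiWebUrl` instead), and the field names should be checked against the monitoring documentation for your Spark version.

```python
import json
from urllib.request import urlopen

METRIC_FIELDS = ("inputBytes", "shuffleReadBytes", "shuffleWriteBytes")


def fetch_json(url: str):
    """GET a JSON document from the UI (requires a live SparkSession)."""
    with urlopen(url, timeout=10) as resp:
        return json.load(resp)


def summarize_stages(stages: list[dict]) -> dict[str, int]:
    """Sum input and shuffle byte counters over all stages; missing fields count as 0."""
    return {f: sum(int(s.get(f, 0)) for s in stages) for f in METRIC_FIELDS}


def collect_metrics(base_url: str = "http://localhost:4040") -> dict[str, int]:
    """Find the running application's id, then summarize its stages."""
    apps = fetch_json(f"{base_url}/api/v1/applications")
    app_id = apps[0]["id"]  # a local session serves a single application
    stages = fetch_json(f"{base_url}/api/v1/applications/{app_id}/stages")
    return summarize_stages(stages)
```

In the notebook, call `collect_metrics(spark.sparkContext.uiWebUrl)` after the aggregation cell and before `spark.stop()`.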
In [8]:
spark.stop()
print("Lab 0 complete. Environment validated.")
Lab 0 complete. Environment validated.