In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
In [ ]:
import os
os.getcwd()
In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("Iceberg Demo") \
.master("local[*]") \
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.0") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.spark_catalog.type", "hadoop") \
.config("spark.sql.catalog.spark_catalog.warehouse", "/home/jovyan/test") \
.config("spark.sql.defaultCatalog", "spark_catalog") \
.config("spark.sql.warehouse.dir", "/home/jovyan/test") \
.getOrCreate()
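The session above re-points the built-in spark_catalog at an Iceberg Hadoop catalog, so every table in this notebook is Iceberg-managed. A common alternative, if Iceberg tables should live alongside regular Spark tables, is to register Iceberg under a separate catalog name instead; a minimal sketch, assuming the same warehouse path (the catalog name "local" is an arbitrary choice, not something the rest of the notebook uses):
In [ ]:
# Sketch: keep spark_catalog untouched and add a separate named Iceberg catalog.
spark_alt = SparkSession.builder \
    .appName("Iceberg Demo (named catalog)") \
    .master("local[*]") \
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.0") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "/home/jovyan/test") \
    .getOrCreate()
# Tables would then be addressed as local.iceberg_db.<table>.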
Verify catalog setup
In [3]:
# Verify catalog setup
print("Available catalogs:")
spark.sql("SHOW CATALOGS").show()
Available catalogs:
+-------------+
|      catalog|
+-------------+
|spark_catalog|
+-------------+
Create a database namespace
In [4]:
spark.sql("CREATE DATABASE IF NOT EXISTS iceberg_db")
print("Available databases:")
spark.sql("SHOW DATABASES").show()
Available databases:
+----------+
| namespace|
+----------+
|iceberg_db|
+----------+
In [5]:
print("Tables in iceberg_db:")
spark.sql("SHOW TABLES IN iceberg_db").show()
Tables in iceberg_db:
+----------+--------------------+-----------+
| namespace|           tableName|isTemporary|
+----------+--------------------+-----------+
|iceberg_db|   advanced_products|      false|
|iceberg_db|      basic_customer|      false|
|iceberg_db|    legacy_inventory|      false|
|iceberg_db| converted_inventory|      false|
|iceberg_db|  partitioned_orders|      false|
|iceberg_db|evolving_customer...|      false|
|iceberg_db|       test_customer|      false|
+----------+--------------------+-----------+
In [6]:
# 2.1 Create a basic table
print("Creating basic_customer table...")
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg_db.basic_customer (
customer_id INT,
name STRING,
email STRING,
registration_date TIMESTAMP,
active BOOLEAN
) USING iceberg
""")
Creating basic_customer table...
Out[6]:
DataFrame[]
In [7]:
# 2.2 Create a partitioned table
from pyspark.sql.types import *
from pyspark.sql.functions import *
print("Creating partitioned_orders table...")
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg_db.partitioned_orders (
order_id INT,
customer_id INT,
product_id INT,
quantity INT,
price DOUBLE,
order_date TIMESTAMP,
status STRING
) USING iceberg
PARTITIONED BY (year(order_date), month(order_date), status)
""")
Creating partitioned_orders table...
---------------------------------------------------------------------------
UnsupportedOperationException             Traceback (most recent call last)
Cell In[7], line 5
      3 from pyspark.sql.functions import *
      4 print("Creating partitioned_orders table...")
----> 5 spark.sql("""
      6 CREATE TABLE IF NOT EXISTS iceberg_db.partitioned_orders (
      7     order_id INT,
      8     customer_id INT,
      9     product_id INT,
     10     quantity INT,
     11     price DOUBLE,
     12     order_date TIMESTAMP,
     13     status STRING
     14 ) USING iceberg
     15 PARTITIONED BY (year(order_date), month(order_date), status)
     16 """)

File /usr/local/spark/python/pyspark/sql/session.py:1631, in SparkSession.sql(self, sqlQuery, args, **kwargs)
   1627 assert self._jvm is not None
   1628 litArgs = self._jvm.PythonUtils.toArray(
   1629     [_to_java_column(lit(v)) for v in (args or [])]
   1630 )
-> 1631 return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
   1632 finally:
   1633     if len(kwargs) > 0:

File /usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File /usr/local/spark/python/pyspark/errors/exceptions/captured.py:185, in capture_sql_exception.<locals>.deco(*a, **kw)
    181 converted = convert_exception(e.java_exception)
    182 if not isinstance(converted, UnknownException):
    183     # Hide where the exception came from that shows a non-Pythonic
    184     # JVM exception message.
--> 185     raise converted from None
    186 else:
    187     raise

UnsupportedOperationException: Unsupported partition transform: year(order_date).
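The CREATE fails on the year(order_date) transform in this environment; the table is recreated later (under section 3.2) with only month(order_date) and status, which succeeds, and the month transform already encodes the year. If finer or additional partitioning is needed afterwards, Iceberg supports partition spec evolution; a minimal sketch, assuming the month-only table from section 3.2 already exists (the bucket(16, customer_id) field is only an illustration, not part of the original design):
In [ ]:
# Sketch: evolve the partition spec after creation; new fields only apply to data
# written from this point on.
spark.sql("""
    ALTER TABLE iceberg_db.partitioned_orders
    ADD PARTITION FIELD bucket(16, customer_id)
""")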
In [ ]:
### 2.3 Create a table with advanced properties
In [ ]:
print("Creating advanced_products table...")
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg_db.advanced_products (
product_id INT,
name STRING,
category STRING,
price DOUBLE,
inventory INT,
last_updated TIMESTAMP
) USING iceberg
PARTITIONED BY (category)
TBLPROPERTIES (
'format-version' = '2',
'write.delete.mode' = 'merge-on-read',
'write.update.mode' = 'merge-on-read',
'write.merge.mode' = 'merge-on-read',
'write.metadata.compression-codec' = 'gzip',
'write.parquet.compression-codec' = 'zstd',
'write.distribution-mode' = 'hash'
)
""")
In [ ]:
### 2.4 Create table with schema evolution enabled
In [5]:
# Create a table
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg_db.basic_customer (
customer_id INT,
name STRING,
email STRING,
registration_date TIMESTAMP,
active BOOLEAN
) USING iceberg
""")
# Query the table - default catalog is implicit
# spark.sql("SELECT * FROM iceberg_db.basic_customer").show()
# Or with explicit catalog reference
spark.sql("SELECT * FROM spark_catalog.iceberg_db.basic_customer").show()
+-----------+----+-----+-----------------+------+
|customer_id|name|email|registration_date|active|
+-----------+----+-----+-----------------+------+
+-----------+----+-----+-----------------+------+
In [8]:
# Create database and table
print("\nCreating database and table...")
spark.sql("CREATE DATABASE IF NOT EXISTS iceberg_db")
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg_db.test_customer (
customer_id INT,
name STRING,
email STRING,
registration_date TIMESTAMP,
active BOOLEAN
) USING iceberg
""")
Creating database and table...
Out[8]:
DataFrame[]
In [12]:
# Write sample data
import datetime
print("\nWriting sample data...")
sample_data = [
(1, "John Doe", "john@example.com", datetime.datetime.now(), True),
(2, "Jane Smith", "jane@example.com", datetime.datetime.now(), True),
(3, "Bob Johnson", "bob@example.com", datetime.datetime.now(), False)
]
schema = ["customer_id", "name", "email", "registration_date", "active"]
sample_df = spark.createDataFrame(sample_data, schema=schema)
sample_df.writeTo("spark_catalog.iceberg_db.test_customer").append()
Writing sample data...
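writeTo() is Spark's DataFrameWriterV2 API. Besides append(), it offers create()/createOrReplace() and overwritePartitions(), which this notebook relies on later for the schema-evolution rewrite; a minimal sketch against a hypothetical copy of the table:
In [ ]:
# Sketch of the other DataFrameWriterV2 verbs (the _copy table name is hypothetical).
sample_df.writeTo("spark_catalog.iceberg_db.test_customer_copy").createOrReplace()      # create (or replace) the table from the DataFrame
sample_df.writeTo("spark_catalog.iceberg_db.test_customer_copy").append()               # add rows to an existing table
sample_df.writeTo("spark_catalog.iceberg_db.test_customer_copy").overwritePartitions()  # dynamic overwrite of matching partitions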
In [6]:
spark.sql("SELECT * FROM spark_catalog.iceberg_db.test_customer").show()
+-----------+-----------+----------------+--------------------+------+
|customer_id|       name|           email|   registration_date|active|
+-----------+-----------+----------------+--------------------+------+
|          1|   John Doe|john@example.com|2025-02-20 10:07:...|  true|
|          2| Jane Smith|jane@example.com|2025-02-20 10:07:...|  true|
|          3|Bob Johnson| bob@example.com|2025-02-20 10:07:...| false|
+-----------+-----------+----------------+--------------------+------+
In [15]:
# COMMAND -------------------------
# 3. Writing Data to Iceberg Tables
# ----------------------------------
In [27]:
import random
import datetime
# 3.1 Generate and write sample customer data
print("Generating sample customer data...")
num_customers = 1000
customers = []
# Generate random customer data
customer_ids = list(range(1, num_customers + 1))
first_names = ["James", "Mary", "John", "Patricia", "Robert", "Jennifer", "Michael", "Linda", "William", "Elizabeth"]
last_names = ["Smith", "Johnson", "Williams", "Jones", "Brown", "Davis", "Miller", "Wilson", "Moore", "Taylor"]
domains = ["gmail.com", "yahoo.com", "hotmail.com", "outlook.com", "icloud.com"]
start_date = datetime.datetime(2020, 1, 1)
end_date = datetime.datetime(2023, 12, 31)
diff = (end_date - start_date).days
Generating sample customer data...
In [17]:
for i in range(num_customers):
    first_name = random.choice(first_names)
    last_name = random.choice(last_names)
    name = f"{first_name} {last_name}"
    email = f"{first_name.lower()}.{last_name.lower()}{random.randint(1, 999)}@{random.choice(domains)}"
    registration_date = start_date + datetime.timedelta(days=random.randint(0, diff))
    active = random.random() > 0.2  # 80% chance of being active
    customers.append((i+1, name, email, registration_date, active))
In [18]:
# Create DataFrame and write to Iceberg table
customer_df = spark.createDataFrame(customers, ["customer_id", "name", "email", "registration_date", "active"])
customer_df.writeTo("spark_catalog.iceberg_db.basic_customer").append()
print(f"Wrote {customer_df.count()} records to basic_customer table")
Wrote 1000 records to basic_customer table
In [15]:
spark.sql("SELECT * FROM spark_catalog.iceberg_db.basic_customer").show()
+-----------+----------------+--------------------+-------------------+------+ |customer_id| name| email| registration_date|active| +-----------+----------------+--------------------+-------------------+------+ | 1| Robert Miller|robert.miller383@...|2022-01-01 00:00:00| true| | 2| Elizabeth Brown|elizabeth.brown13...|2023-12-13 00:00:00| true| | 3| Robert Wilson|robert.wilson137@...|2020-04-05 00:00:00| true| | 4| Elizabeth Smith|elizabeth.smith92...|2021-11-27 00:00:00| true| | 5| Mary Moore|mary.moore309@yah...|2020-10-05 00:00:00| true| | 6| Robert Davis|robert.davis777@g...|2023-09-24 00:00:00| false| | 7| Patricia Wilson|patricia.wilson20...|2022-01-06 00:00:00| true| | 8|Elizabeth Taylor|elizabeth.taylor5...|2021-08-05 00:00:00| true| | 9| Jennifer Smith|jennifer.smith274...|2022-08-09 00:00:00| true| | 10| Jennifer Miller|jennifer.miller81...|2021-04-09 00:00:00| false| | 11| Patricia Taylor|patricia.taylor94...|2021-08-19 00:00:00| true| | 12| Robert Taylor|robert.taylor612@...|2020-10-29 00:00:00| true| | 13| Jennifer Wilson|jennifer.wilson17...|2021-09-11 00:00:00| true| | 14| Jennifer Brown|jennifer.brown163...|2022-05-31 00:00:00| true| | 15| Michael Davis|michael.davis693@...|2020-05-24 00:00:00| true| | 16| Mary Davis|mary.davis741@hot...|2021-05-26 00:00:00| true| | 17|Jennifer Johnson|jennifer.johnson3...|2023-05-17 00:00:00| true| | 18| Michael Jones|michael.jones66@y...|2020-10-02 00:00:00| true| | 19| Robert Williams|robert.williams29...|2022-05-09 00:00:00| true| | 20| Elizabeth Smith|elizabeth.smith53...|2023-01-21 00:00:00| true| +-----------+----------------+--------------------+-------------------+------+ only showing top 20 rows
In [20]:
# 3.2 Generate and write sample orders data
In [24]:
# 2.2 Create a partitioned table (retry without the year() transform; month(order_date) and status only)
print("Creating partitioned_orders table...")
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg_db.partitioned_orders (
order_id INT,
customer_id INT,
product_id INT,
quantity INT,
price DOUBLE,
order_date TIMESTAMP,
status STRING
) USING iceberg
PARTITIONED BY (month(order_date), status)
""")
Creating partitioned_orders table...
Out[24]:
DataFrame[]
In [9]:
spark.sql("SELECT * FROM spark_catalog.iceberg_db.partitioned_orders").show()
+--------+-----------+----------+--------+-----+----------+------+
|order_id|customer_id|product_id|quantity|price|order_date|status|
+--------+-----------+----------+--------+-----+----------+------+
+--------+-----------+----------+--------+-----+----------+------+
Expensive operation: watch driver memory
In [8]:
import random
import datetime
start_date = datetime.datetime(2020, 1, 1)
end_date = datetime.datetime(2023, 12, 31)
diff = (end_date - start_date).days
num_customers = 1000
num_orders = 2000
orders = []
products = list(range(1, 101)) # 100 products
statuses = ["pending", "processing", "shipped", "delivered", "cancelled"]
for i in range(num_orders):
    order_id = i + 1
    customer_id = random.randint(1, num_customers)
    product_id = random.choice(products)
    quantity = random.randint(1, 10)
    price = random.uniform(10, 1000)
    order_date = start_date + datetime.timedelta(days=random.randint(0, diff))
    status = random.choice(statuses)
    orders.append((order_id, customer_id, product_id, quantity, price, order_date, status))
print(f"Length of orders: {len(orders)}")
Length of orders: 2000
In [ ]:
# Create DataFrame and write to Iceberg table
orders_df = spark.createDataFrame(orders, ["order_id", "customer_id", "product_id", "quantity", "price", "order_date", "status"])
orders_df.writeTo("spark_catalog.iceberg_db.partitioned_orders").append()
print(f"Wrote {orders_df.count()} records to partitioned_orders table")
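Both data generators in this notebook build Python lists on the driver before calling createDataFrame, which is what the "Expensive operation" warning above refers to. A hedged alternative is to generate the rows inside Spark with spark.range() and SQL expressions so nothing large is held on the driver; a sketch with simplified column logic relative to the notebook's Python generator (the write is left commented out):
In [ ]:
# Sketch: generate synthetic orders inside Spark instead of on the driver.
from pyspark.sql import functions as F

orders_gen = spark.range(1, 2001).select(
    F.col("id").cast("int").alias("order_id"),
    (F.rand() * 1000 + 1).cast("int").alias("customer_id"),
    (F.rand() * 100 + 1).cast("int").alias("product_id"),
    (F.rand() * 10 + 1).cast("int").alias("quantity"),
    F.round(F.rand() * 990 + 10, 2).alias("price"),
    F.expr("timestamp'2020-01-01' + make_interval(0, 0, 0, cast(rand() * 1460 as int))").alias("order_date"),
    F.expr("element_at(array('pending','processing','shipped','delivered','cancelled'), cast(rand() * 5 as int) + 1)").alias("status"),
)
# orders_gen.writeTo("spark_catalog.iceberg_db.partitioned_orders").append()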
In [ ]:
Advanced
In [18]:
print("Creating advanced_products table...")
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg_db.advanced_products (
product_id INT,
name STRING,
category STRING,
price DOUBLE,
inventory INT,
last_updated TIMESTAMP
) USING iceberg
PARTITIONED BY (category)
TBLPROPERTIES (
'format-version' = '2',
'write.delete.mode' = 'merge-on-read',
'write.update.mode' = 'merge-on-read',
'write.merge.mode' = 'merge-on-read',
'write.metadata.compression-codec' = 'gzip',
'write.parquet.compression-codec' = 'zstd',
'write.distribution-mode' = 'hash'
)
""")
Creating advanced_products table...
Out[18]:
DataFrame[]
In [22]:
spark.sql("SELECT * FROM spark_catalog.iceberg_db.advanced_products").show()
+----------+--------------+--------+------+---------+--------------------+ |product_id| name|category| price|inventory| last_updated| +----------+--------------+--------+------+---------+--------------------+ | 4| Mystery J322| Books|225.59| 999|2025-01-23 10:41:...| | 13|Self-help H496| Books|222.04| 134|2025-02-03 10:41:...| | 16| Travel V518| Books|325.51| 249|2025-01-22 10:41:...| | 29| Science N290| Books|740.27| 34|2025-01-30 10:41:...| | 35|Biography N113| Books| 469.8| 37|2025-01-22 10:41:...| | 44|Self-help V649| Books|498.29| 784|2025-02-06 10:41:...| | 53|Self-help Q218| Books|641.38| 201|2025-02-19 10:41:...| | 54| Romance S684| Books|359.27| 620|2025-01-27 10:41:...| | 62| Mystery R367| Books|958.87| 126|2025-02-15 10:41:...| | 76| Cookbook D249| Books| 132.5| 281|2025-02-20 10:41:...| | 77| Novel H273| Books|758.36| 718|2025-02-20 10:41:...| | 85| Cookbook W452| Books|733.11| 759|2025-01-31 10:41:...| | 90| Science N291| Books|889.99| 425|2025-02-01 10:41:...| | 99|Biography W486| Books|473.44| 242|2025-02-05 10:41:...| | 15| Sweater E645|Clothing|488.02| 850|2025-02-03 10:41:...| | 20| Hat Y502|Clothing|245.69| 604|2025-02-03 10:41:...| | 39| T-shirt E877|Clothing|170.78| 718|2025-02-04 10:41:...| | 42| Hat R521|Clothing| 137.3| 677|2025-02-03 10:41:...| | 45| Suit L555|Clothing| 368.8| 887|2025-01-28 10:41:...| | 59| Socks U570|Clothing| 296.4| 467|2025-01-28 10:41:...| +----------+--------------+--------+------+---------+--------------------+ only showing top 20 rows
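Because advanced_products was created with format-version 2 and merge-on-read write modes, its metadata tables are a convenient way to see what each write produced. Iceberg exposes them as suffixes on the table name (history, snapshots, files, partitions, manifests); a short sketch, selecting only a few of the available columns:
In [ ]:
# Sketch: inspect Iceberg metadata tables for advanced_products (read-only queries).
spark.sql("SELECT committed_at, snapshot_id, operation FROM iceberg_db.advanced_products.snapshots").show(truncate=False)
spark.sql("SELECT partition, record_count, file_count FROM iceberg_db.advanced_products.partitions").show(truncate=False)
spark.sql("SELECT file_path, record_count FROM iceberg_db.advanced_products.files").show(truncate=False)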
In [20]:
# 3.3 Generate and write sample products data
print("Generating sample products data...")
categories = ["Electronics", "Clothing", "Home", "Sports", "Books", "Beauty", "Toys", "Food", "Jewelry", "Garden"]
product_names = {
"Electronics": ["Smartphone", "Laptop", "Tablet", "Headphones", "TV", "Camera", "Speaker", "Watch", "Monitor", "Keyboard"],
"Clothing": ["T-shirt", "Jeans", "Dress", "Jacket", "Sweater", "Socks", "Hat", "Skirt", "Suit", "Shoes"],
"Home": ["Sofa", "Table", "Chair", "Lamp", "Bed", "Rug", "Mirror", "Curtains", "Pillow", "Blanket"],
"Sports": ["Bicycle", "Treadmill", "Weights", "Yoga Mat", "Basketball", "Tennis Racket", "Golf Clubs", "Soccer Ball", "Helmet", "Skates"],
"Books": ["Novel", "Biography", "Cookbook", "Self-help", "History", "Science", "Fantasy", "Mystery", "Romance", "Travel"],
"Beauty": ["Shampoo", "Perfume", "Lipstick", "Face Cream", "Mascara", "Nail Polish", "Lotion", "Sunscreen", "Soap", "Hair Dryer"],
"Toys": ["Action Figure", "Doll", "Puzzle", "Board Game", "Remote Car", "Building Blocks", "Stuffed Animal", "Art Set", "Science Kit", "Drone"],
"Food": ["Chocolate", "Coffee", "Tea", "Pasta", "Cookies", "Spices", "Olive Oil", "Honey", "Cereal", "Snacks"],
"Jewelry": ["Necklace", "Ring", "Earrings", "Bracelet", "Watch", "Pendant", "Brooch", "Anklet", "Cufflinks", "Tiara"],
"Garden": ["Lawn Mower", "Plant Pot", "Seeds", "Garden Hose", "Shovel", "Rake", "Fertilizer", "Gloves", "Pruning Shears", "Watering Can"]
}
Generating sample products data...
In [21]:
products = []
for i in range(1, 101):
    category = random.choice(categories)
    name = f"{random.choice(product_names[category])} {chr(65 + random.randint(0, 25))}{random.randint(100, 999)}"
    price = round(random.uniform(10, 1000), 2)
    inventory = random.randint(0, 1000)
    last_updated = datetime.datetime.now() - datetime.timedelta(days=random.randint(0, 30))
    products.append((i, name, category, price, inventory, last_updated))
# Create DataFrame and write to Iceberg table
products_df = spark.createDataFrame(products, ["product_id", "name", "category", "price", "inventory", "last_updated"])
products_df.writeTo("iceberg_db.advanced_products").append()
print(f"Wrote {products_df.count()} records to advanced_products table")
Wrote 100 records to advanced_products table
In [23]:
# 2.4 Create table with schema evolution enabled
In [24]:
print("Creating evolving_customer_profile table...")
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg_db.evolving_customer_profile (
customer_id INT,
profile STRUCT<
name: STRING,
age: INT,
address: STRING,
preferences: MAP<STRING, STRING>
>,
last_updated TIMESTAMP
) USING iceberg
TBLPROPERTIES (
'format-version' = '2',
'write.update.mode' = 'merge-on-read'
)
""")
# List created tables
print("Tables in iceberg_db:")
spark.sql("SHOW TABLES IN iceberg_db").show()
Creating evolving_customer_profile table...
Tables in iceberg_db:
+----------+--------------------+-----------+
| namespace|           tableName|isTemporary|
+----------+--------------------+-----------+
|iceberg_db|   advanced_products|      false|
|iceberg_db|      basic_customer|      false|
|iceberg_db|  partitioned_orders|      false|
|iceberg_db|evolving_customer...|      false|
|iceberg_db|       test_customer|      false|
+----------+--------------------+-----------+
3.4 Generate and write customer profile data with nested structures
In [28]:
print("Generating customer profile data with nested structures...")
profile_data = []
preferences_options = {
"communication": ["email", "sms", "phone", "post"],
"payment": ["credit", "debit", "paypal", "crypto", "bank_transfer"],
"shipping": ["standard", "express", "same_day", "pickup"],
"marketing": ["opt_in", "opt_out"],
"theme": ["light", "dark", "system"]
}
for i in range(1, num_customers + 1):
    customer_id = i
    name = next((c[1] for c in customers if c[0] == i), "Unknown")
    age = random.randint(18, 80)
    address = f"{random.randint(1, 9999)} {random.choice(['Main', 'Oak', 'Pine', 'Maple', 'Cedar'])} {random.choice(['St', 'Ave', 'Blvd', 'Rd', 'Dr'])}"
    # Create random preferences
    preferences = {}
    for pref_key, options in preferences_options.items():
        if random.random() > 0.3:  # 70% chance to have this preference
            preferences[pref_key] = random.choice(options)
    # Create profile struct
    profile = {"name": name, "age": age, "address": address, "preferences": preferences}
    last_updated = datetime.datetime.now() - datetime.timedelta(days=random.randint(0, 60))
    profile_data.append((customer_id, profile, last_updated))
Generating customer profile data with nested structures...
In [30]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Create schema for the nested data
profile_schema = StructType([
StructField("customer_id", IntegerType(), False),
StructField("profile", StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
StructField("address", StringType(), True),
StructField("preferences", MapType(StringType(), StringType()), True)
]), True),
StructField("last_updated", TimestampType(), True)
])
# Create DataFrame and write to Iceberg table
profile_df = spark.createDataFrame(profile_data, profile_schema)
profile_df.writeTo("iceberg_db.evolving_customer_profile").append()
print(f"Wrote {profile_df.count()} records to evolving_customer_profile table")
Wrote 1000 records to evolving_customer_profile table
In [31]:
spark.sql("SELECT * FROM spark_catalog.iceberg_db.evolving_customer_profile").show()
+-----------+--------------------+--------------------+
|customer_id| profile| last_updated|
+-----------+--------------------+--------------------+
| 1|{Unknown, 26, 594...|2025-01-04 10:49:...|
| 2|{Unknown, 19, 693...|2025-01-07 10:49:...|
| 3|{Unknown, 61, 551...|2025-02-06 10:49:...|
| 4|{Unknown, 50, 835...|2025-01-01 10:49:...|
| 5|{Unknown, 52, 840...|2024-12-22 10:49:...|
| 6|{Unknown, 80, 370...|2025-01-02 10:49:...|
| 7|{Unknown, 75, 535...|2024-12-31 10:49:...|
| 8|{Unknown, 34, 816...|2025-01-04 10:49:...|
| 9|{Unknown, 21, 334...|2025-01-12 10:49:...|
| 10|{Unknown, 76, 345...|2024-12-26 10:49:...|
| 11|{Unknown, 39, 694...|2025-01-10 10:49:...|
| 12|{Unknown, 29, 652...|2024-12-31 10:49:...|
| 13|{Unknown, 69, 602...|2025-01-27 10:49:...|
| 14|{Unknown, 53, 272...|2025-02-12 10:49:...|
| 15|{Unknown, 78, 655...|2025-01-07 10:49:...|
| 16|{Unknown, 73, 241...|2024-12-29 10:49:...|
| 17|{Unknown, 62, 422...|2025-02-12 10:49:...|
| 18|{Unknown, 58, 385...|2025-02-05 10:49:...|
| 19|{Unknown, 71, 313...|2025-01-03 10:49:...|
| 20|{Unknown, 28, 455...|2024-12-31 10:49:...|
+-----------+--------------------+--------------------+
only showing top 20 rows
In [33]:
print("Creating a non-Iceberg parquet table...")
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg_db.legacy_inventory (
inventory_id INT,
warehouse_id INT,
product_id INT,
quantity INT,
location STRING,
last_checked TIMESTAMP
) USING parquet
""")
# Generate sample inventory data
num_inventory = 500
warehouses = list(range(1, 6)) # 5 warehouses
locations = ["A1", "A2", "B1", "B2", "C1", "C2", "D1", "D2", "E1", "E2"]
inventory = []
for i in range(num_inventory):
    inventory_id = i + 1
    warehouse_id = random.choice(warehouses)
    product_id = random.randint(1, 100)
    quantity = random.randint(0, 200)
    location = random.choice(locations)
    last_checked = datetime.datetime.now() - datetime.timedelta(days=random.randint(0, 90))
    inventory.append((inventory_id, warehouse_id, product_id, quantity, location, last_checked))
# Write to parquet table
# saveAsTable with mode("overwrite") replaces the empty parquet table created above
inventory_df = spark.createDataFrame(inventory, ["inventory_id", "warehouse_id", "product_id", "quantity", "location", "last_checked"])
inventory_df.write.mode("overwrite").saveAsTable("iceberg_db.legacy_inventory")
print(f"Wrote {inventory_df.count()} records to legacy parquet table")
Creating a non-Iceberg parquet table...
Wrote 500 records to legacy parquet table
In [34]:
spark.sql("SELECT * FROM spark_catalog.iceberg_db.legacy_inventory").show()
+------------+------------+----------+--------+--------+--------------------+ |inventory_id|warehouse_id|product_id|quantity|location| last_checked| +------------+------------+----------+--------+--------+--------------------+ | 1| 2| 57| 142| E1|2024-12-18 11:06:...| | 2| 4| 58| 158| A1|2025-01-15 11:06:...| | 3| 4| 48| 53| C1|2025-01-05 11:06:...| | 4| 3| 75| 124| A2|2025-02-13 11:06:...| | 5| 4| 31| 125| E1|2024-12-03 11:06:...| | 6| 2| 65| 36| D1|2024-12-20 11:06:...| | 7| 2| 72| 153| A2|2024-12-02 11:06:...| | 8| 1| 54| 192| C2|2024-12-24 11:06:...| | 9| 3| 77| 51| A2|2025-01-05 11:06:...| | 10| 2| 86| 193| B1|2024-12-18 11:06:...| | 11| 5| 54| 183| E2|2025-01-11 11:06:...| | 12| 2| 12| 125| B2|2025-01-31 11:06:...| | 13| 5| 46| 146| D2|2024-12-25 11:06:...| | 14| 1| 22| 71| C2|2025-02-12 11:06:...| | 15| 5| 6| 38| C2|2025-02-11 11:06:...| | 16| 3| 8| 147| A1|2024-11-29 11:06:...| | 17| 5| 75| 179| C1|2024-12-29 11:06:...| | 18| 3| 22| 26| B1|2025-01-20 11:06:...| | 19| 1| 6| 12| B1|2025-01-02 11:06:...| | 20| 4| 30| 55| A2|2024-12-19 11:06:...| +------------+------------+----------+--------+--------+--------------------+ only showing top 20 rows
In [35]:
# 4.2 Convert the legacy table to Iceberg format
print("Converting legacy_inventory to Iceberg format...")
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg_db.converted_inventory
USING iceberg
PARTITIONED BY (warehouse_id)
AS SELECT * FROM iceberg_db.legacy_inventory
""")
print("Conversion complete!")
# Verify the table conversion
print("Verifying converted table...")
spark.sql("DESCRIBE EXTENDED iceberg_db.converted_inventory").show(truncate=False)
Converting legacy_inventory to Iceberg format... Conversion complete! Verifying converted table... +----------------------------+--------------------------------------------+-------+ |col_name |data_type |comment| +----------------------------+--------------------------------------------+-------+ |inventory_id |bigint |NULL | |warehouse_id |bigint |NULL | |product_id |bigint |NULL | |quantity |bigint |NULL | |location |string |NULL | |last_checked |timestamp |NULL | |# Partition Information | | | |# col_name |data_type |comment| |warehouse_id |bigint |NULL | | | | | |# Metadata Columns | | | |_spec_id |int | | |_partition |struct<warehouse_id:bigint> | | |_file |string | | |_pos |bigint | | |_deleted |boolean | | | | | | |# Detailed Table Information| | | |Name |spark_catalog.iceberg_db.converted_inventory| | |Type |MANAGED | | +----------------------------+--------------------------------------------+-------+ only showing top 20 rows
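The CTAS above copies the data into a new Iceberg table. Iceberg also ships Spark procedures for converting existing tables: snapshot builds an Iceberg table over the source's files for a non-destructive trial, and migrate converts the source table in place. A hedged sketch (the target table name is hypothetical, and migrate generally expects the session-catalog setup described in the Iceberg docs, so it may not work with this notebook's Hadoop catalog):
In [ ]:
# Sketch: conversion via Iceberg's Spark procedures instead of CTAS.
# snapshot: builds an Iceberg table that reads the source's existing files (non-destructive).
spark.sql("""
    CALL spark_catalog.system.snapshot(
        source_table => 'iceberg_db.legacy_inventory',
        table => 'iceberg_db.legacy_inventory_iceberg_snapshot'
    )
""")
# migrate: converts the source table in place, keeping its data files.
# spark.sql("CALL spark_catalog.system.migrate('iceberg_db.legacy_inventory')")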
In [36]:
# COMMAND ----------
# 5. Reading and Querying Iceberg Tables
# -------------------------------------
In [37]:
print("Performing basic queries...")
# Count records in each table
tables = ["basic_customer", "partitioned_orders", "advanced_products", "evolving_customer_profile", "converted_inventory"]
for table in tables:
    count = spark.sql(f"SELECT COUNT(*) as count FROM iceberg_db.{table}").collect()[0]["count"]
    print(f"Table {table} has {count} records")
Performing basic queries...
Table basic_customer has 1000 records
Table partitioned_orders has 0 records
Table advanced_products has 100 records
Table evolving_customer_profile has 1000 records
Table converted_inventory has 500 records
In [38]:
# 5.2 Query with filters on partitioned columns
print("\nQuerying orders with partition filters...")
shipped_orders = spark.sql("""
SELECT order_id, customer_id, order_date, status
FROM iceberg_db.partitioned_orders
WHERE status = 'shipped' AND order_date > '2022-01-01'
""")
print(f"Found {shipped_orders.count()} shipped orders after 2022-01-01")
Querying orders with partition filters...
Found 0 shipped orders after 2022-01-01
In [39]:
# 5.3 Join operations
print("\nPerforming join operation between orders and customers...")
customer_orders = spark.sql("""
SELECT c.name, c.email, o.order_id, o.price, o.order_date, o.status
FROM iceberg_db.basic_customer c
JOIN iceberg_db.partitioned_orders o
ON c.customer_id = o.customer_id
WHERE o.price > 500
ORDER BY o.price DESC
LIMIT 10
""")
print("Top 10 high-value orders:")
customer_orders.show(truncate=False)
Performing join operation between orders and customers...
Top 10 high-value orders:
+----+-----+--------+-----+----------+------+
|name|email|order_id|price|order_date|status|
+----+-----+--------+-----+----------+------+
+----+-----+--------+-----+----------+------+
In [41]:
spark.sql("SELECT * FROM spark_catalog.iceberg_db.partitioned_orders").show()
+--------+-----------+----------+--------+-----+----------+------+
|order_id|customer_id|product_id|quantity|price|order_date|status|
+--------+-----------+----------+--------+-----+----------+------+
+--------+-----------+----------+--------+-----+----------+------+
5.4 Aggregation queries
In [6]:
print("\nPerforming aggregation queries...")
category_stats = spark.sql("""
SELECT
category,
COUNT(*) as product_count,
ROUND(AVG(price), 2) as avg_price,
ROUND(SUM(inventory), 2) as total_inventory,
ROUND(MIN(price), 2) as min_price,
ROUND(MAX(price), 2) as max_price
FROM iceberg_db.advanced_products
GROUP BY category
ORDER BY product_count DESC
""")
print("Product statistics by category:")
category_stats.show(truncate=False)
Performing aggregation queries...
Product statistics by category:
+-----------+-------------+---------+---------------+---------+---------+
|category   |product_count|avg_price|total_inventory|min_price|max_price|
+-----------+-------------+---------+---------------+---------+---------+
|Books      |14           |530.6    |5609           |132.5    |958.87   |
|Garden     |11           |419.31   |5733           |22.66    |931.67   |
|Food       |11           |426.05   |5054           |32.73    |929.76   |
|Beauty     |11           |638.41   |6223           |103.37   |985.85   |
|Electronics|10           |376.17   |4576           |97.48    |824.89   |
|Clothing   |9            |391.99   |6005           |137.3    |973.95   |
|Home       |9            |595.74   |4821           |307.41   |969.9    |
|Toys       |9            |455.92   |4412           |83.36    |882.47   |
|Sports     |8            |526.45   |2730           |23.71    |997.02   |
|Jewelry    |8            |490.74   |3704           |118.78   |977.92   |
+-----------+-------------+---------+---------------+---------+---------+
5.5 Working with nested data
In [7]:
print("\nQuerying nested data structures...")
age_groups = spark.sql("""
SELECT
CASE
WHEN profile.age < 30 THEN '18-29'
WHEN profile.age BETWEEN 30 AND 45 THEN '30-45'
WHEN profile.age BETWEEN 46 AND 60 THEN '46-60'
ELSE '60+'
END as age_group,
COUNT(*) as customer_count,
COLLECT_LIST(profile.preferences['marketing']) as marketing_preferences
FROM iceberg_db.evolving_customer_profile
GROUP BY
CASE
WHEN profile.age < 30 THEN '18-29'
WHEN profile.age BETWEEN 30 AND 45 THEN '30-45'
WHEN profile.age BETWEEN 46 AND 60 THEN '46-60'
ELSE '60+'
END
ORDER BY age_group
""")
print("Customer counts by age group with marketing preferences:")
age_groups.show(truncate=False)
Querying nested data structures... Customer counts by age group with marketing preferences: +---------+--------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ |age_group|customer_count|marketing_preferences | 
+---------+--------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ |18-29 |159 |[opt_in, opt_out, opt_out, opt_out, opt_out, opt_in, opt_in, opt_in, opt_in, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_in, opt_out, opt_out, opt_out, opt_out, opt_out, opt_out, opt_out, opt_in, opt_out, opt_out, opt_in, opt_in, opt_out, opt_in, opt_out, opt_out, opt_in, opt_in, opt_out, opt_out, opt_in, opt_out, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_out, opt_in, opt_out, opt_out, opt_in, opt_out, opt_out, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_out, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_in, opt_in, opt_out, opt_in, opt_out, opt_out, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_out, opt_out, opt_out, opt_in, opt_in, opt_out, opt_in, opt_out, opt_out, opt_in, opt_in, opt_out, opt_in, opt_in, opt_in, opt_out, opt_in, opt_in, opt_in, opt_in, opt_out, opt_in, opt_in, opt_in, opt_out, opt_in, opt_in, opt_in] | |30-45 |250 |[opt_in, opt_out, opt_out, opt_out, opt_out, opt_out, opt_out, opt_in, opt_in, opt_out, opt_in, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_out, opt_in, opt_out, opt_out, opt_out, opt_in, opt_out, opt_in, opt_in, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_out, opt_in, opt_out, opt_in, opt_in, opt_out, opt_in, opt_in, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_out, opt_in, opt_out, opt_in, opt_in, opt_out, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_out, opt_out, opt_out, opt_out, opt_out, opt_out, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_out, opt_in, 
opt_in, opt_out, opt_in, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_in, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_out, opt_out, opt_out, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_out, opt_out, opt_in, opt_in, opt_in, opt_out, opt_out, opt_in, opt_in, opt_in, opt_out, opt_in, opt_out, opt_out, opt_out, opt_out, opt_in, opt_in, opt_out, opt_out, opt_in, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_in, opt_out, opt_out, opt_in, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_in, opt_out, opt_out, opt_in] | |46-60 |250 |[opt_out, opt_out, opt_in, opt_in, opt_in, opt_out, opt_in, opt_out, opt_out, opt_out, opt_out, opt_in, opt_out, opt_out, opt_in, opt_in, opt_in, opt_out, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_out, opt_out, opt_in, opt_in, opt_in, opt_in, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_out, opt_out, opt_in, opt_in, opt_out, opt_out, opt_out, opt_in, opt_out, opt_in, opt_out, opt_in, opt_in, opt_in, opt_out, opt_out, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_out, opt_in, opt_in, opt_out, opt_in, opt_in, opt_in, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_out, opt_out, opt_out, opt_in, opt_in, opt_in, opt_out, opt_in, opt_out, opt_out, opt_in, opt_out, opt_in, opt_in, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_out, opt_out, opt_in, opt_in, opt_in, opt_out, opt_in, opt_in, opt_in, opt_out, opt_out, opt_in, opt_in, opt_out, opt_out, opt_in, opt_in, opt_out, opt_in, opt_out, opt_out, opt_out, opt_in, opt_out, opt_out, opt_out, opt_out, opt_in, opt_out, opt_in, opt_out, opt_in, opt_in, opt_out, opt_in, opt_out, opt_out, opt_out, opt_in, opt_in, opt_out, opt_out, opt_out, opt_in, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_in, opt_out, opt_out, opt_in, opt_out, opt_in, opt_in, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_in, opt_in, opt_in, opt_out, opt_out, opt_out] | |60+ |341 |[opt_in, opt_in, opt_out, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_in, opt_out, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_out, opt_in, opt_out, opt_out, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_in, opt_in, opt_out, opt_in, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_in, opt_out, opt_out, opt_in, opt_out, opt_out, opt_out, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_in, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_out, opt_in, opt_in, opt_in, opt_out, opt_in, opt_out, opt_in, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_in, opt_out, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_out, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_in, opt_out, opt_out, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_out, opt_in, opt_in, opt_in, opt_out, opt_out, opt_out, opt_out, opt_out, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_in, opt_out, opt_out, opt_out, opt_in, opt_in, opt_out, opt_in, opt_out, opt_out, opt_out, opt_in, opt_in, opt_in, opt_out, 
opt_in, opt_out, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_out, opt_in, opt_out, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_out, opt_in, opt_in, opt_out, opt_out, opt_in, opt_out, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_in, opt_in, opt_out, opt_in, opt_in, opt_in, opt_out, opt_out, opt_out, opt_in, opt_out, opt_in, opt_in, opt_in, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_in, opt_in, opt_out, opt_in, opt_out, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_in, opt_out, opt_out, opt_in, opt_out, opt_in, opt_in]| +---------+--------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
In [9]:
# 6.1 Adding new columns
print("Demonstrating schema evolution - adding columns...")
spark.sql("""
ALTER TABLE iceberg_db.evolving_customer_profile
ADD COLUMNS (
loyalty_tier STRING,
account_manager INT
)
""")
Demonstrating schema evolution - adding columns...
Out[9]:
DataFrame[]
In [10]:
# Verify new schema
print("Updated schema:")
spark.sql("DESCRIBE iceberg_db.evolving_customer_profile").show()
Updated schema:
+---------------+--------------------+-------+
|       col_name|           data_type|comment|
+---------------+--------------------+-------+
|    customer_id|                 int|   NULL|
|        profile|struct<name:strin...|   NULL|
|   last_updated|           timestamp|   NULL|
|   loyalty_tier|              string|   NULL|
|account_manager|                 int|   NULL|
+---------------+--------------------+-------+
In [11]:
# 6.2 Generate and write data with the new schema
print("Writing data with the new schema...")
# Update some existing profiles with new columns
updated_profiles = spark.sql("""
SELECT
customer_id,
profile,
last_updated,
CASE
WHEN customer_id % 10 = 0 THEN 'Platinum'
WHEN customer_id % 10 <= 3 THEN 'Gold'
WHEN customer_id % 10 <= 7 THEN 'Silver'
ELSE 'Bronze'
END as loyalty_tier,
CASE
WHEN customer_id % 10 = 0 THEN 101
WHEN customer_id % 10 <= 3 THEN 102
ELSE NULL
END as account_manager
FROM iceberg_db.evolving_customer_profile
WHERE customer_id <= 100
""")
Writing data with the new schema...
In [12]:
# Overwrite the selected records
# Note: this table is unpartitioned, so overwritePartitions() replaces the whole
# table; only the 100 rewritten rows remain afterwards.
updated_profiles.writeTo("iceberg_db.evolving_customer_profile").overwritePartitions()
print(f"Updated {updated_profiles.count()} customer profiles with new schema")
Updated 100 customer profiles with new schema
In [13]:
# 6.3 Query data with the evolved schema
print("Querying data with evolved schema...")
loyalty_distribution = spark.sql("""
SELECT
loyalty_tier,
COUNT(*) as customer_count,
ROUND(AVG(profile.age), 1) as avg_age,
COUNT(account_manager) as managed_accounts
FROM iceberg_db.evolving_customer_profile
WHERE loyalty_tier IS NOT NULL
GROUP BY loyalty_tier
ORDER BY customer_count DESC
""")
print("Customer distribution by loyalty tier:")
loyalty_distribution.show()
Querying data with evolved schema...
Customer distribution by loyalty tier:
+------------+--------------+-------+----------------+
|loyalty_tier|customer_count|avg_age|managed_accounts|
+------------+--------------+-------+----------------+
|      Silver|            40|   51.3|               0|
|        Gold|            30|   50.6|              30|
|      Bronze|            20|   49.6|               0|
|    Platinum|            10|   49.5|              10|
+------------+--------------+-------+----------------+
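Adding columns (section 6.1) is only one of the schema changes Iceberg supports without rewriting data; columns can also be renamed, widened, commented, or dropped. A short sketch of the other ALTER TABLE forms, shown against the columns just added; these statements are illustrative only, since renaming or dropping account_manager would break re-runs of the cells above:
In [ ]:
# Sketch: other schema evolution operations supported by the Iceberg SQL extensions.
spark.sql("ALTER TABLE iceberg_db.evolving_customer_profile RENAME COLUMN account_manager TO account_manager_id")
spark.sql("ALTER TABLE iceberg_db.evolving_customer_profile ALTER COLUMN account_manager_id TYPE BIGINT")   # int -> bigint is an allowed widening
spark.sql("ALTER TABLE iceberg_db.evolving_customer_profile ALTER COLUMN loyalty_tier COMMENT 'Derived from customer_id in section 6.2'")
spark.sql("ALTER TABLE iceberg_db.evolving_customer_profile DROP COLUMN account_manager_id")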
In [14]:
### COMMAND -------------------------
### 7. Demonstrating ACID Transactions
### ---------------------------------
In [16]:
# 7.1 Atomic updates
import time
print("Demonstrating atomic updates in transactions...")
# Update inventory for multiple products in a single transaction
start_time = time.time()
spark.sql("""
UPDATE iceberg_db.advanced_products
SET inventory = inventory - 10, last_updated = current_timestamp()
WHERE product_id IN (1, 2, 3, 4, 5) AND inventory >= 10
""")
end_time = time.time()
print(f"Atomic update completed in {end_time - start_time:.2f} seconds")
Demonstrating atomic updates in transactions...
Atomic update completed in 2.95 seconds
In [17]:
# Check updated records
updated_products = spark.sql("""
SELECT product_id, name, inventory, last_updated
FROM iceberg_db.advanced_products
WHERE product_id IN (1, 2, 3, 4, 5)
ORDER BY product_id
""")
print("Updated product inventory:")
updated_products.show(truncate=False)
Updated product inventory:
+----------+-------------+---------+--------------------------+
|product_id|name         |inventory|last_updated              |
+----------+-------------+---------+--------------------------+
|1         |Weights Z516 |6        |2025-02-18 10:41:16.743027|
|2         |Lamp W258    |607      |2025-02-20 11:55:39.464659|
|3         |Soap K303    |6        |2025-02-19 10:41:16.743071|
|4         |Mystery J322 |989      |2025-02-20 11:55:39.464659|
|5         |Keyboard G880|860      |2025-02-20 11:55:39.464659|
+----------+-------------+---------+--------------------------+
In [18]:
# 7.2 Conditional updates (demonstrating Consistency)
print("\nDemonstrating consistency with conditional updates...")
# Only update if conditions are met
conditionally_updated = spark.sql("""
UPDATE iceberg_db.advanced_products
SET
price = price * 1.1,
last_updated = current_timestamp()
WHERE
category = 'Electronics' AND
price < 500 AND
inventory > 50
""")
print("Conditionally updated product prices. Checking results:")
Demonstrating consistency with conditional updates...
Conditionally updated product prices. Checking results:
In [19]:
price_updates = spark.sql("""
SELECT
product_id,
name,
category,
price,
inventory,
last_updated
FROM iceberg_db.advanced_products
WHERE
category = 'Electronics'
ORDER BY price DESC
""")
price_updates.show(truncate=False)
+----------+---------------+-----------+------------------+---------+--------------------------+
|product_id|name           |category   |price             |inventory|last_updated              |
+----------+---------------+-----------+------------------+---------+--------------------------+
|9         |Watch U477     |Electronics|824.89            |994      |2025-01-30 10:41:16.743177|
|21        |Camera M950    |Electronics|564.72            |90       |2025-02-04 10:41:16.743293|
|49        |Smartphone D281|Electronics|502.3260000000001 |885      |2025-02-20 11:56:10.560806|
|19        |Watch W515     |Electronics|456.093           |67       |2025-02-20 11:56:10.560806|
|30        |Monitor Q532   |Electronics|417.494           |160      |2025-02-20 11:56:10.560806|
|69        |Keyboard R167  |Electronics|383.845           |71       |2025-02-20 11:56:10.560806|
|74        |Laptop E772    |Electronics|314.45700000000005|319      |2025-02-20 11:56:10.560806|
|82        |TV U710        |Electronics|270.952           |489      |2025-02-20 11:56:10.560806|
|5         |Keyboard G880  |Electronics|156.87100000000004|860      |2025-02-20 11:56:10.560806|
|96        |Monitor I604   |Electronics|107.22800000000001|631      |2025-02-20 11:56:10.560806|
+----------+---------------+-----------+------------------+---------+--------------------------+
In [21]:
import datetime
# 7.3 Merge operations (upserts)
print("\nDemonstrating merge operations (upserts)...")
# Create temporary data for merge
merge_data = [
(101, "New Smart Watch X1", "Electronics", 299.99, 50, datetime.datetime.now()),
(5, "Updated Product Name", "Electronics", 599.99, 75, datetime.datetime.now())
]
merge_df = spark.createDataFrame(merge_data, ["product_id", "name", "category", "price", "inventory", "last_updated"])
merge_df.createOrReplaceTempView("products_updates")
Demonstrating merge operations (upserts)...
In [23]:
# Perform merge operation
spark.sql("""
MERGE INTO iceberg_db.advanced_products t
USING products_updates s
ON t.product_id = s.product_id
WHEN MATCHED THEN
UPDATE SET
t.name = s.name,
t.price = s.price,
t.inventory = s.inventory,
t.last_updated = s.last_updated
WHEN NOT MATCHED THEN
INSERT (product_id, name, category, price, inventory, last_updated)
VALUES (s.product_id, s.name, s.category, s.price, s.inventory, s.last_updated)
""")
Out[23]:
DataFrame[]
In [25]:
# Check results of merge
merged_products = spark.sql("""
SELECT product_id, name, category, price, inventory, last_updated
FROM iceberg_db.advanced_products
WHERE product_id IN (5, 101)
""")
print("Results of merge operation:")
merged_products.show(truncate=False)
Results of merge operation:
+----------+--------------------+-----------+------+---------+--------------------------+
|product_id|name                |category   |price |inventory|last_updated              |
+----------+--------------------+-----------+------+---------+--------------------------+
|5         |Updated Product Name|Electronics|599.99|75       |2025-02-20 11:56:38.123071|
|101       |New Smart Watch X1  |Electronics|299.99|50       |2025-02-20 11:56:38.123064|
+----------+--------------------+-----------+------+---------+--------------------------+
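MERGE branches can also carry extra conditions and a DELETE action, which is handy when an upsert feed marks rows for removal. A sketch reusing the products_updates view from 7.3 (the s.inventory = 0 condition is hypothetical, purely to show the syntax):
In [ ]:
# Sketch: conditional MERGE clauses, including a delete branch.
spark.sql("""
    MERGE INTO iceberg_db.advanced_products t
    USING products_updates s
    ON t.product_id = s.product_id
    WHEN MATCHED AND s.inventory = 0 THEN DELETE
    WHEN MATCHED THEN UPDATE SET
        t.price = s.price,
        t.inventory = s.inventory,
        t.last_updated = s.last_updated
    WHEN NOT MATCHED THEN INSERT *
""")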
In [26]:
# 7.4 Time travel queries (demonstrating Isolation and Durability)
In [28]:
print("\nDemonstrating time travel capabilities...")
# First, check table history
history = spark.sql("SELECT * FROM iceberg_db.advanced_products.history")
print("Table history:")
history.show(truncate=False)
Demonstrating time travel capabilities...
Table history:
+-----------------------+-------------------+-------------------+-------------------+
|made_current_at        |snapshot_id        |parent_id          |is_current_ancestor|
+-----------------------+-------------------+-------------------+-------------------+
|2025-02-20 10:41:24.55 |6465532312590292151|NULL               |true               |
|2025-02-20 11:55:41.873|3954393121797699845|6465532312590292151|true               |
|2025-02-20 11:56:12.154|6398027269306179278|3954393121797699845|true               |
|2025-02-20 11:57:14.76 |4142118315220597146|6398027269306179278|true               |
+-----------------------+-------------------+-------------------+-------------------+
In [29]:
# Get the snapshot ID from before our updates
if history.count() >= 2:
    older_snapshot = history.orderBy("made_current_at").collect()[0]["snapshot_id"]
    current_snapshot = history.orderBy("made_current_at", ascending=False).collect()[0]["snapshot_id"]
    # Query at an earlier snapshot
    print(f"\nQuerying data at earlier snapshot: {older_snapshot}")
    old_data = spark.read.option("snapshot-id", older_snapshot).table("iceberg_db.advanced_products")
    print(f"Number of products in older snapshot: {old_data.count()}")
    # Query using timestamp
    # Note: Iceberg's "as-of-timestamp" option expects milliseconds since the Unix
    # epoch, not a formatted string, which is why the query below fails.
    one_hour_ago = datetime.datetime.now() - datetime.timedelta(hours=1)
    timestamp_str = one_hour_ago.strftime("%Y-%m-%d %H:%M:%S")
    print(f"\nQuerying data as of {timestamp_str}")
    try:
        time_travel_data = spark.read.option("as-of-timestamp", timestamp_str).table("iceberg_db.advanced_products")
        print(f"Number of products as of {timestamp_str}: {time_travel_data.count()}")
    except Exception as e:
        print(f"Time travel query failed: {str(e)}")
Querying data at earlier snapshot: 6465532312590292151
Number of products in older snapshot: 100

Querying data as of 2025-02-20 10:58:25
Time travel query failed: For input string: "2025-02-20 10:58:25"
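The timestamp-based read fails because as-of-timestamp expects milliseconds since the Unix epoch rather than a formatted string; Spark 3.3+ also accepts SQL time travel clauses. A sketch of both forms, reusing one_hour_ago and older_snapshot from the cell above (the literal timestamp in the SQL example is just a value after the first snapshot in the history output):
In [ ]:
# Sketch: time travel with the formats Iceberg expects.
ts_millis = int(one_hour_ago.timestamp() * 1000)  # epoch milliseconds
time_travel_df = spark.read.option("as-of-timestamp", str(ts_millis)).table("iceberg_db.advanced_products")
print(f"Products as of {one_hour_ago}: {time_travel_df.count()}")

# SQL time travel clauses (Spark 3.3+)
spark.sql(f"SELECT COUNT(*) FROM iceberg_db.advanced_products VERSION AS OF {older_snapshot}").show()
spark.sql("SELECT COUNT(*) FROM iceberg_db.advanced_products TIMESTAMP AS OF '2025-02-20 11:00:00'").show()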
In [30]:
# COMMAND ----------
# 8. Advanced Iceberg Workflows
# ----------------------------
In [32]:
# 8.1 Data Retention and Expiration
print("\nDemonstrating data retention policies...")
# Set retention policy for snapshots
spark.sql("""
ALTER TABLE iceberg_db.partitioned_orders
SET TBLPROPERTIES (
'history.expire.max-snapshot-age-ms' = '604800000', -- 7 days
'history.expire.min-snapshots-to-keep' = '3'
)
""")
Demonstrating data retention policies...
Out[32]:
DataFrame[]
In [37]:
# Check current properties
props = spark.sql("SHOW TBLPROPERTIES iceberg_db.partitioned_orders")
print("Table properties for retention:")
props.filter(col("key").like("%expire%")).show(truncate=False)
# Manually expire snapshots (would normally be run as a scheduled job)
# Note: the procedure must be addressed through the configured catalog name
# (spark_catalog here); "iceberg_catalog" is not defined, so this call fails.
try:
    spark.sql("CALL iceberg_catalog.system.expire_snapshots(table => 'iceberg_db.partitioned_orders', older_than => TIMESTAMP '2023-01-01 00:00:00', retain_last => 3)")
    print("Expired old snapshots successfully")
except Exception as e:
    print(f"Note: Snapshot expiration not supported in this Spark version: {str(e)}")
Table properties for retention:
+------------------------------------+---------+
|key                                 |value    |
+------------------------------------+---------+
|history.expire.max-snapshot-age-ms  |604800000|
|history.expire.min-snapshots-to-keep|3        |
+------------------------------------+---------+
Note: Snapshot expiration not supported in this Spark version: Procedure iceberg_catalog.system.expire_snapshots not found
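The CALL above fails only because it is addressed to a catalog named iceberg_catalog, which this session never configured; Iceberg's procedures live under whatever catalog name the session defines (spark_catalog here). A sketch of the corrected call, with related maintenance procedures noted as comments:
In [ ]:
# Sketch: snapshot expiration addressed through the configured catalog name.
spark.sql("""
    CALL spark_catalog.system.expire_snapshots(
        table => 'iceberg_db.partitioned_orders',
        older_than => TIMESTAMP '2023-01-01 00:00:00',
        retain_last => 3
    )
""").show(truncate=False)

# Related housekeeping procedures (names per the Iceberg Spark procedures docs):
# spark.sql("CALL spark_catalog.system.remove_orphan_files(table => 'iceberg_db.partitioned_orders')")
# spark.sql("CALL spark_catalog.system.rewrite_data_files(table => 'iceberg_db.partitioned_orders')")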
In [38]:
# 8.2 Table Migration and Repartitioning
print("\nDemonstrating table repartitioning...")
# First, check current partition distribution
partition_info = spark.sql("""
SELECT status, COUNT(*) as partition_count
FROM iceberg_db.partitioned_orders
GROUP BY status
""")
print("Current partition distribution:")
partition_info.show()
Demonstrating table repartitioning...
Current partition distribution:
+------+---------------+
|status|partition_count|
+------+---------------+
+------+---------------+
In [ ]: