In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
In [ ]:
import os
os.getcwd()
In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Iceberg Demo") \
    .master("local[*]") \
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.0") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hadoop") \
    .config("spark.sql.catalog.spark_catalog.warehouse", "/home/jovyan/test") \
    .config("spark.sql.defaultCatalog", "spark_catalog") \
    .config("spark.sql.warehouse.dir", "/home/jovyan/test") \
    .getOrCreate()

Verify catalog setup

In [3]:
# Verify catalog setup
print("Available catalogs:")
spark.sql("SHOW CATALOGS").show()
Available catalogs:
+-------------+
|      catalog|
+-------------+
|spark_catalog|
+-------------+

Create a database namespace

In [4]:
spark.sql("CREATE DATABASE IF NOT EXISTS iceberg_db")
print("Available databases:")
spark.sql("SHOW DATABASES").show()
Available databases:
+----------+
| namespace|
+----------+
|iceberg_db|
+----------+

In [5]:
print("Tables in iceberg_db:")
spark.sql("SHOW TABLES IN iceberg_db").show()
Tables in iceberg_db:
+----------+--------------------+-----------+
| namespace|           tableName|isTemporary|
+----------+--------------------+-----------+
|iceberg_db|   advanced_products|      false|
|iceberg_db|      basic_customer|      false|
|iceberg_db|    legacy_inventory|      false|
|iceberg_db| converted_inventory|      false|
|iceberg_db|  partitioned_orders|      false|
|iceberg_db|evolving_customer...|      false|
|iceberg_db|       test_customer|      false|
+----------+--------------------+-----------+

2. Creating Iceberg Tables

In [6]:
# 2.1 Create a basic table
print("Creating basic_customer table...")
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg_db.basic_customer (
    customer_id INT,
    name STRING,
    email STRING,
    registration_date TIMESTAMP,
    active BOOLEAN
) USING iceberg
""")
Creating basic_customer table...
Out[6]:
DataFrame[]
In [7]:
# 2.2 Create a partitioned table
from pyspark.sql.types import *
from pyspark.sql.functions import *
print("Creating partitioned_orders table...")
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg_db.partitioned_orders (
    order_id INT,
    customer_id INT,
    product_id INT,
    quantity INT,
    price DOUBLE,
    order_date TIMESTAMP,
    status STRING
) USING iceberg
PARTITIONED BY (year(order_date), month(order_date), status)
""")
Creating partitioned_orders table...
---------------------------------------------------------------------------
UnsupportedOperationException             Traceback (most recent call last)
Cell In[7], line 5
      3 from pyspark.sql.functions import *
      4 print("Creating partitioned_orders table...")
----> 5 spark.sql("""
      6 CREATE TABLE IF NOT EXISTS iceberg_db.partitioned_orders (
      7     order_id INT,
      8     customer_id INT,
      9     product_id INT,
     10     quantity INT,
     11     price DOUBLE,
     12     order_date TIMESTAMP,
     13     status STRING
     14 ) USING iceberg
     15 PARTITIONED BY (year(order_date), month(order_date), status)
     16 """)

File /usr/local/spark/python/pyspark/sql/session.py:1631, in SparkSession.sql(self, sqlQuery, args, **kwargs)
   1627         assert self._jvm is not None
   1628         litArgs = self._jvm.PythonUtils.toArray(
   1629             [_to_java_column(lit(v)) for v in (args or [])]
   1630         )
-> 1631     return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
   1632 finally:
   1633     if len(kwargs) > 0:

File /usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File /usr/local/spark/python/pyspark/errors/exceptions/captured.py:185, in capture_sql_exception.<locals>.deco(*a, **kw)
    181 converted = convert_exception(e.java_exception)
    182 if not isinstance(converted, UnknownException):
    183     # Hide where the exception came from that shows a non-Pythonic
    184     # JVM exception message.
--> 185     raise converted from None
    186 else:
    187     raise

UnsupportedOperationException: Unsupported partition transform: year(order_date).
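
Iceberg does not allow redundant time-based transforms (year, month, day, hour) on the same source column, so the combined year() + month() spec above is rejected; month() already encodes the year, so a single month(order_date) transform (used when the table is successfully created in section 3.2 below) is enough. A sketch of another valid spec, mixing a day transform with a hash bucket on a second column (not executed here; the table name is illustrative):

In [ ]:
# Sketch only: day-granularity time partitioning plus a 16-way hash bucket
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg_db.orders_by_day (
    order_id INT,
    customer_id INT,
    order_date TIMESTAMP,
    status STRING
) USING iceberg
PARTITIONED BY (days(order_date), bucket(16, customer_id))
""")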
In [ ]:
### 2.3 Create a table with advanced properties
In [ ]:
print("Creating advanced_products table...")
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg_db.advanced_products (
    product_id INT,
    name STRING,
    category STRING,
    price DOUBLE,
    inventory INT,
    last_updated TIMESTAMP
) USING iceberg
PARTITIONED BY (category)
TBLPROPERTIES (
    'format-version' = '2',
    'write.delete.mode' = 'merge-on-read',
    'write.update.mode' = 'merge-on-read',
    'write.merge.mode' = 'merge-on-read',
    'write.metadata.compression-codec' = 'gzip',
    'write.parquet.compression-codec' = 'zstd',
    'write.distribution-mode' = 'hash'
)
""")
In [5]:
# Re-create the basic table (IF NOT EXISTS makes this a no-op) and query it
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg_db.basic_customer (
    customer_id INT,
    name STRING,
    email STRING,
    registration_date TIMESTAMP,
    active BOOLEAN
) USING iceberg
""")

# Query the table - default catalog is implicit
# spark.sql("SELECT * FROM iceberg_db.basic_customer").show()

# Or with explicit catalog reference
spark.sql("SELECT * FROM spark_catalog.iceberg_db.basic_customer").show()
+-----------+----+-----+-----------------+------+
|customer_id|name|email|registration_date|active|
+-----------+----+-----+-----------------+------+
+-----------+----+-----+-----------------+------+

In [8]:
# Create database and table
print("\nCreating database and table...")
spark.sql("CREATE DATABASE IF NOT EXISTS iceberg_db")
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg_db.test_customer (
    customer_id INT,
    name STRING,
    email STRING,
    registration_date TIMESTAMP,
    active BOOLEAN
) USING iceberg
""")
Creating database and table...
Out[8]:
DataFrame[]
In [12]:
# Write sample data
import datetime
print("\nWriting sample data...")
sample_data = [
    (1, "John Doe", "john@example.com", datetime.datetime.now(), True),
    (2, "Jane Smith", "jane@example.com", datetime.datetime.now(), True),
    (3, "Bob Johnson", "bob@example.com", datetime.datetime.now(), False)
]
schema = ["customer_id", "name", "email", "registration_date", "active"]
sample_df = spark.createDataFrame(sample_data, schema=schema)
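# writeTo() returns a DataFrameWriterV2; append() requires the target table to
# already exist (use create() or createOrReplace() to build one from the DataFrame)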
sample_df.writeTo("spark_catalog.iceberg_db.test_customer").append()
Writing sample data...
In [6]:
spark.sql("SELECT * FROM spark_catalog.iceberg_db.test_customer").show()
+-----------+-----------+----------------+--------------------+------+
|customer_id|       name|           email|   registration_date|active|
+-----------+-----------+----------------+--------------------+------+
|          1|   John Doe|john@example.com|2025-02-20 10:07:...|  true|
|          2| Jane Smith|jane@example.com|2025-02-20 10:07:...|  true|
|          3|Bob Johnson| bob@example.com|2025-02-20 10:07:...| false|
+-----------+-----------+----------------+--------------------+------+

In [15]:
# COMMAND -------------------------
# 3. Writing Data to Iceberg Tables
# ----------------------------------
In [27]:
import random
import datetime
# 3.1 Generate and write sample customer data
print("Generating sample customer data...")
num_customers = 1000
customers = []

# Generate random customer data
customer_ids = list(range(1, num_customers + 1))
first_names = ["James", "Mary", "John", "Patricia", "Robert", "Jennifer", "Michael", "Linda", "William", "Elizabeth"]
last_names = ["Smith", "Johnson", "Williams", "Jones", "Brown", "Davis", "Miller", "Wilson", "Moore", "Taylor"]
domains = ["gmail.com", "yahoo.com", "hotmail.com", "outlook.com", "icloud.com"]

start_date = datetime.datetime(2020, 1, 1)
end_date = datetime.datetime(2023, 12, 31)
diff = (end_date - start_date).days
Generating sample customer data...
In [17]:
for i in range(num_customers):
    first_name = random.choice(first_names)
    last_name = random.choice(last_names)
    name = f"{first_name} {last_name}"
    email = f"{first_name.lower()}.{last_name.lower()}{random.randint(1, 999)}@{random.choice(domains)}"
    registration_date = start_date + datetime.timedelta(days=random.randint(0, diff))
    active = random.random() > 0.2  # 80% chance of being active
    customers.append((i+1, name, email, registration_date, active))
In [18]:
# Create DataFrame and write to Iceberg table
customer_df = spark.createDataFrame(customers, ["customer_id", "name", "email", "registration_date", "active"])
customer_df.writeTo("spark_catalog.iceberg_db.basic_customer").append()
print(f"Wrote {customer_df.count()} records to basic_customer table")
Wrote 1000 records to basic_customer table
In [15]:
spark.sql("SELECT * FROM spark_catalog.iceberg_db.basic_customer").show()
+-----------+----------------+--------------------+-------------------+------+
|customer_id|            name|               email|  registration_date|active|
+-----------+----------------+--------------------+-------------------+------+
|          1|   Robert Miller|robert.miller383@...|2022-01-01 00:00:00|  true|
|          2| Elizabeth Brown|elizabeth.brown13...|2023-12-13 00:00:00|  true|
|          3|   Robert Wilson|robert.wilson137@...|2020-04-05 00:00:00|  true|
|          4| Elizabeth Smith|elizabeth.smith92...|2021-11-27 00:00:00|  true|
|          5|      Mary Moore|mary.moore309@yah...|2020-10-05 00:00:00|  true|
|          6|    Robert Davis|robert.davis777@g...|2023-09-24 00:00:00| false|
|          7| Patricia Wilson|patricia.wilson20...|2022-01-06 00:00:00|  true|
|          8|Elizabeth Taylor|elizabeth.taylor5...|2021-08-05 00:00:00|  true|
|          9|  Jennifer Smith|jennifer.smith274...|2022-08-09 00:00:00|  true|
|         10| Jennifer Miller|jennifer.miller81...|2021-04-09 00:00:00| false|
|         11| Patricia Taylor|patricia.taylor94...|2021-08-19 00:00:00|  true|
|         12|   Robert Taylor|robert.taylor612@...|2020-10-29 00:00:00|  true|
|         13| Jennifer Wilson|jennifer.wilson17...|2021-09-11 00:00:00|  true|
|         14|  Jennifer Brown|jennifer.brown163...|2022-05-31 00:00:00|  true|
|         15|   Michael Davis|michael.davis693@...|2020-05-24 00:00:00|  true|
|         16|      Mary Davis|mary.davis741@hot...|2021-05-26 00:00:00|  true|
|         17|Jennifer Johnson|jennifer.johnson3...|2023-05-17 00:00:00|  true|
|         18|   Michael Jones|michael.jones66@y...|2020-10-02 00:00:00|  true|
|         19| Robert Williams|robert.williams29...|2022-05-09 00:00:00|  true|
|         20| Elizabeth Smith|elizabeth.smith53...|2023-01-21 00:00:00|  true|
+-----------+----------------+--------------------+-------------------+------+
only showing top 20 rows

In [20]:
# 3.2 Generate and write sample orders data
In [24]:
# 2.2 (retry) Create the partitioned table with a valid spec: month(order_date) only
print("Creating partitioned_orders table...")
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg_db.partitioned_orders (
    order_id INT,
    customer_id INT,
    product_id INT,
    quantity INT,
    price DOUBLE,
    order_date TIMESTAMP,
    status STRING
) USING iceberg
PARTITIONED BY (month(order_date), status)
""")
Creating partitioned_orders table...
Out[24]:
DataFrame[]
In [9]:
spark.sql("SELECT * FROM spark_catalog.iceberg_db.partitioned_orders").show()
+--------+-----------+----------+--------+-----+----------+------+
|order_id|customer_id|product_id|quantity|price|order_date|status|
+--------+-----------+----------+--------+-----+----------+------+
+--------+-----------+----------+--------+-----+----------+------+

Expensive operation: watch memory usage

In [8]:
import random
import datetime

start_date = datetime.datetime(2020, 1, 1)
end_date = datetime.datetime(2023, 12, 31)
diff = (end_date - start_date).days

num_customers = 1000
num_orders = 2000
orders = []
products = list(range(1, 101))  # 100 products
statuses = ["pending", "processing", "shipped", "delivered", "cancelled"]

for i in range(num_orders):
    order_id = i + 1
    customer_id = random.randint(1, num_customers)
    product_id = random.choice(products)
    quantity = random.randint(1, 10)
    price = random.uniform(10, 1000)
    order_date = start_date + datetime.timedelta(days=random.randint(0, diff))
    status = random.choice(statuses)
    orders.append((order_id, customer_id, product_id, quantity, price, order_date, status))

print(f"Length of orders: {len(orders)}")
Length of orders: 2000
In [ ]:
# Create DataFrame and write to Iceberg table
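# NOTE: this cell was left unexecuted (In [ ] above), which is why
# partitioned_orders is still empty in the queries later in this notebook.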
orders_df = spark.createDataFrame(orders, ["order_id", "customer_id", "product_id", "quantity", "price", "order_date", "status"])
orders_df.writeTo("spark_catalog.iceberg_db.partitioned_orders").append()
print(f"Wrote {orders_df.count()} records to partitioned_orders table")

Advanced table properties

In [18]:
print("Creating advanced_products table...")
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg_db.advanced_products (
    product_id INT,
    name STRING,
    category STRING,
    price DOUBLE,
    inventory INT,
    last_updated TIMESTAMP
) USING iceberg
PARTITIONED BY (category)
TBLPROPERTIES (
    'format-version' = '2',
    'write.delete.mode' = 'merge-on-read',
    'write.update.mode' = 'merge-on-read',
    'write.merge.mode' = 'merge-on-read',
    'write.metadata.compression-codec' = 'gzip',
    'write.parquet.compression-codec' = 'zstd',
    'write.distribution-mode' = 'hash'
)
""")
Creating advanced_products table...
Out[18]:
DataFrame[]
In [22]:
spark.sql("SELECT * FROM spark_catalog.iceberg_db.advanced_products").show()
+----------+--------------+--------+------+---------+--------------------+
|product_id|          name|category| price|inventory|        last_updated|
+----------+--------------+--------+------+---------+--------------------+
|         4|  Mystery J322|   Books|225.59|      999|2025-01-23 10:41:...|
|        13|Self-help H496|   Books|222.04|      134|2025-02-03 10:41:...|
|        16|   Travel V518|   Books|325.51|      249|2025-01-22 10:41:...|
|        29|  Science N290|   Books|740.27|       34|2025-01-30 10:41:...|
|        35|Biography N113|   Books| 469.8|       37|2025-01-22 10:41:...|
|        44|Self-help V649|   Books|498.29|      784|2025-02-06 10:41:...|
|        53|Self-help Q218|   Books|641.38|      201|2025-02-19 10:41:...|
|        54|  Romance S684|   Books|359.27|      620|2025-01-27 10:41:...|
|        62|  Mystery R367|   Books|958.87|      126|2025-02-15 10:41:...|
|        76| Cookbook D249|   Books| 132.5|      281|2025-02-20 10:41:...|
|        77|    Novel H273|   Books|758.36|      718|2025-02-20 10:41:...|
|        85| Cookbook W452|   Books|733.11|      759|2025-01-31 10:41:...|
|        90|  Science N291|   Books|889.99|      425|2025-02-01 10:41:...|
|        99|Biography W486|   Books|473.44|      242|2025-02-05 10:41:...|
|        15|  Sweater E645|Clothing|488.02|      850|2025-02-03 10:41:...|
|        20|      Hat Y502|Clothing|245.69|      604|2025-02-03 10:41:...|
|        39|  T-shirt E877|Clothing|170.78|      718|2025-02-04 10:41:...|
|        42|      Hat R521|Clothing| 137.3|      677|2025-02-03 10:41:...|
|        45|     Suit L555|Clothing| 368.8|      887|2025-01-28 10:41:...|
|        59|    Socks U570|Clothing| 296.4|      467|2025-01-28 10:41:...|
+----------+--------------+--------+------+---------+--------------------+
only showing top 20 rows

In [20]:
# 3.3 Generate and write sample products data
print("Generating sample products data...")
categories = ["Electronics", "Clothing", "Home", "Sports", "Books", "Beauty", "Toys", "Food", "Jewelry", "Garden"]
product_names = {
    "Electronics": ["Smartphone", "Laptop", "Tablet", "Headphones", "TV", "Camera", "Speaker", "Watch", "Monitor", "Keyboard"],
    "Clothing": ["T-shirt", "Jeans", "Dress", "Jacket", "Sweater", "Socks", "Hat", "Skirt", "Suit", "Shoes"],
    "Home": ["Sofa", "Table", "Chair", "Lamp", "Bed", "Rug", "Mirror", "Curtains", "Pillow", "Blanket"],
    "Sports": ["Bicycle", "Treadmill", "Weights", "Yoga Mat", "Basketball", "Tennis Racket", "Golf Clubs", "Soccer Ball", "Helmet", "Skates"],
    "Books": ["Novel", "Biography", "Cookbook", "Self-help", "History", "Science", "Fantasy", "Mystery", "Romance", "Travel"],
    "Beauty": ["Shampoo", "Perfume", "Lipstick", "Face Cream", "Mascara", "Nail Polish", "Lotion", "Sunscreen", "Soap", "Hair Dryer"],
    "Toys": ["Action Figure", "Doll", "Puzzle", "Board Game", "Remote Car", "Building Blocks", "Stuffed Animal", "Art Set", "Science Kit", "Drone"],
    "Food": ["Chocolate", "Coffee", "Tea", "Pasta", "Cookies", "Spices", "Olive Oil", "Honey", "Cereal", "Snacks"],
    "Jewelry": ["Necklace", "Ring", "Earrings", "Bracelet", "Watch", "Pendant", "Brooch", "Anklet", "Cufflinks", "Tiara"],
    "Garden": ["Lawn Mower", "Plant Pot", "Seeds", "Garden Hose", "Shovel", "Rake", "Fertilizer", "Gloves", "Pruning Shears", "Watering Can"]
}
Generating sample products data...
In [21]:
products = []
for i in range(1, 101):
    category = random.choice(categories)
    name = f"{random.choice(product_names[category])} {chr(65 + random.randint(0, 25))}{random.randint(100, 999)}"
    price = round(random.uniform(10, 1000), 2)
    inventory = random.randint(0, 1000)
    last_updated = datetime.datetime.now() - datetime.timedelta(days=random.randint(0, 30))
    products.append((i, name, category, price, inventory, last_updated))

# Create DataFrame and write to Iceberg table
products_df = spark.createDataFrame(products, ["product_id", "name", "category", "price", "inventory", "last_updated"])
products_df.writeTo("iceberg_db.advanced_products").append()
print(f"Wrote {products_df.count()} records to advanced_products table")
Wrote 100 records to advanced_products table
In [23]:
# 2.4 Create table with schema evolution enabled
In [24]:
print("Creating evolving_customer_profile table...")
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg_db.evolving_customer_profile (
    customer_id INT,
    profile STRUCT<
        name: STRING,
        age: INT,
        address: STRING,
        preferences: MAP<STRING, STRING>
    >,
    last_updated TIMESTAMP
) USING iceberg
TBLPROPERTIES (
    'format-version' = '2',
    'write.update.mode' = 'merge-on-read'
)
""")

# List created tables
print("Tables in iceberg_db:")
spark.sql("SHOW TABLES IN iceberg_db").show()
Creating evolving_customer_profile table...
Tables in iceberg_db:
+----------+--------------------+-----------+
| namespace|           tableName|isTemporary|
+----------+--------------------+-----------+
|iceberg_db|   advanced_products|      false|
|iceberg_db|      basic_customer|      false|
|iceberg_db|  partitioned_orders|      false|
|iceberg_db|evolving_customer...|      false|
|iceberg_db|       test_customer|      false|
+----------+--------------------+-----------+

3.4 Generate and write customer profile data with nested structures

In [28]:
print("Generating customer profile data with nested structures...")
profile_data = []
preferences_options = {
    "communication": ["email", "sms", "phone", "post"],
    "payment": ["credit", "debit", "paypal", "crypto", "bank_transfer"],
    "shipping": ["standard", "express", "same_day", "pickup"],
    "marketing": ["opt_in", "opt_out"],
    "theme": ["light", "dark", "system"]
}

for i in range(1, num_customers + 1):
    customer_id = i
    name = next((c[1] for c in customers if c[0] == i), "Unknown")
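    # NOTE: every profile ends up with name "Unknown" because the customers list
    # was reset to [] when the setup cell (In [27]) was re-run after the loop
    # that populated it.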
    age = random.randint(18, 80)
    address = f"{random.randint(1, 9999)} {random.choice(['Main', 'Oak', 'Pine', 'Maple', 'Cedar'])} {random.choice(['St', 'Ave', 'Blvd', 'Rd', 'Dr'])}"
    
    # Create random preferences
    preferences = {}
    for pref_key, options in preferences_options.items():
        if random.random() > 0.3:  # 70% chance to have this preference
            preferences[pref_key] = random.choice(options)
    
    # Create profile struct
    profile = {"name": name, "age": age, "address": address, "preferences": preferences}
    last_updated = datetime.datetime.now() - datetime.timedelta(days=random.randint(0, 60))
    
    profile_data.append((customer_id, profile, last_updated))
Generating customer profile data with nested structures...
In [30]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create schema for the nested data
profile_schema = StructType([
    StructField("customer_id", IntegerType(), False),
    StructField("profile", StructType([
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True),
        StructField("address", StringType(), True),
        StructField("preferences", MapType(StringType(), StringType()), True)
    ]), True),
    StructField("last_updated", TimestampType(), True)
])

# Create DataFrame and write to Iceberg table
profile_df = spark.createDataFrame(profile_data, profile_schema)
profile_df.writeTo("iceberg_db.evolving_customer_profile").append()
print(f"Wrote {profile_df.count()} records to evolving_customer_profile table")
Wrote 1000 records to evolving_customer_profile table
In [31]:
spark.sql("SELECT * FROM spark_catalog.iceberg_db.evolving_customer_profile").show()
+-----------+--------------------+--------------------+
|customer_id|             profile|        last_updated|
+-----------+--------------------+--------------------+
|          1|{Unknown, 26, 594...|2025-01-04 10:49:...|
|          2|{Unknown, 19, 693...|2025-01-07 10:49:...|
|          3|{Unknown, 61, 551...|2025-02-06 10:49:...|
|          4|{Unknown, 50, 835...|2025-01-01 10:49:...|
|          5|{Unknown, 52, 840...|2024-12-22 10:49:...|
|          6|{Unknown, 80, 370...|2025-01-02 10:49:...|
|          7|{Unknown, 75, 535...|2024-12-31 10:49:...|
|          8|{Unknown, 34, 816...|2025-01-04 10:49:...|
|          9|{Unknown, 21, 334...|2025-01-12 10:49:...|
|         10|{Unknown, 76, 345...|2024-12-26 10:49:...|
|         11|{Unknown, 39, 694...|2025-01-10 10:49:...|
|         12|{Unknown, 29, 652...|2024-12-31 10:49:...|
|         13|{Unknown, 69, 602...|2025-01-27 10:49:...|
|         14|{Unknown, 53, 272...|2025-02-12 10:49:...|
|         15|{Unknown, 78, 655...|2025-01-07 10:49:...|
|         16|{Unknown, 73, 241...|2024-12-29 10:49:...|
|         17|{Unknown, 62, 422...|2025-02-12 10:49:...|
|         18|{Unknown, 58, 385...|2025-02-05 10:49:...|
|         19|{Unknown, 71, 313...|2025-01-03 10:49:...|
|         20|{Unknown, 28, 455...|2024-12-31 10:49:...|
+-----------+--------------------+--------------------+
only showing top 20 rows

4. Converting Existing Data to Iceberg

In [33]:
print("Creating a non-Iceberg parquet table...")
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg_db.legacy_inventory (
    inventory_id INT,
    warehouse_id INT,
    product_id INT,
    quantity INT,
    location STRING,
    last_checked TIMESTAMP
) USING parquet
""")

# Generate sample inventory data
num_inventory = 500
warehouses = list(range(1, 6))  # 5 warehouses
locations = ["A1", "A2", "B1", "B2", "C1", "C2", "D1", "D2", "E1", "E2"]

inventory = []
for i in range(num_inventory):
    inventory_id = i + 1
    warehouse_id = random.choice(warehouses)
    product_id = random.randint(1, 100)
    quantity = random.randint(0, 200)
    location = random.choice(locations)
    last_checked = datetime.datetime.now() - datetime.timedelta(days=random.randint(0, 90))
    inventory.append((inventory_id, warehouse_id, product_id, quantity, location, last_checked))

# Write to parquet table
inventory_df = spark.createDataFrame(inventory, ["inventory_id", "warehouse_id", "product_id", "quantity", "location", "last_checked"])
inventory_df.write.mode("overwrite").saveAsTable("iceberg_db.legacy_inventory")
print(f"Wrote {inventory_df.count()} records to legacy parquet table")
Creating a non-Iceberg parquet table...
Wrote 500 records to legacy parquet table
In [34]:
spark.sql("SELECT * FROM spark_catalog.iceberg_db.legacy_inventory").show()
+------------+------------+----------+--------+--------+--------------------+
|inventory_id|warehouse_id|product_id|quantity|location|        last_checked|
+------------+------------+----------+--------+--------+--------------------+
|           1|           2|        57|     142|      E1|2024-12-18 11:06:...|
|           2|           4|        58|     158|      A1|2025-01-15 11:06:...|
|           3|           4|        48|      53|      C1|2025-01-05 11:06:...|
|           4|           3|        75|     124|      A2|2025-02-13 11:06:...|
|           5|           4|        31|     125|      E1|2024-12-03 11:06:...|
|           6|           2|        65|      36|      D1|2024-12-20 11:06:...|
|           7|           2|        72|     153|      A2|2024-12-02 11:06:...|
|           8|           1|        54|     192|      C2|2024-12-24 11:06:...|
|           9|           3|        77|      51|      A2|2025-01-05 11:06:...|
|          10|           2|        86|     193|      B1|2024-12-18 11:06:...|
|          11|           5|        54|     183|      E2|2025-01-11 11:06:...|
|          12|           2|        12|     125|      B2|2025-01-31 11:06:...|
|          13|           5|        46|     146|      D2|2024-12-25 11:06:...|
|          14|           1|        22|      71|      C2|2025-02-12 11:06:...|
|          15|           5|         6|      38|      C2|2025-02-11 11:06:...|
|          16|           3|         8|     147|      A1|2024-11-29 11:06:...|
|          17|           5|        75|     179|      C1|2024-12-29 11:06:...|
|          18|           3|        22|      26|      B1|2025-01-20 11:06:...|
|          19|           1|         6|      12|      B1|2025-01-02 11:06:...|
|          20|           4|        30|      55|      A2|2024-12-19 11:06:...|
+------------+------------+----------+--------+--------+--------------------+
only showing top 20 rows

In [35]:
# 4.2 Convert the legacy table to Iceberg format
print("Converting legacy_inventory to Iceberg format...")
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg_db.converted_inventory
USING iceberg
PARTITIONED BY (warehouse_id)
AS SELECT * FROM iceberg_db.legacy_inventory
""")
print("Conversion complete!")

# Verify the table conversion
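# NOTE: the columns come out as BIGINT rather than the INT declared in the DDL:
# saveAsTable() above overwrote legacy_inventory with the DataFrame's inferred
# LongType schema, and the CTAS carries those types over.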
print("Verifying converted table...")
spark.sql("DESCRIBE EXTENDED iceberg_db.converted_inventory").show(truncate=False)
Converting legacy_inventory to Iceberg format...
Conversion complete!
Verifying converted table...
+----------------------------+--------------------------------------------+-------+
|col_name                    |data_type                                   |comment|
+----------------------------+--------------------------------------------+-------+
|inventory_id                |bigint                                      |NULL   |
|warehouse_id                |bigint                                      |NULL   |
|product_id                  |bigint                                      |NULL   |
|quantity                    |bigint                                      |NULL   |
|location                    |string                                      |NULL   |
|last_checked                |timestamp                                   |NULL   |
|# Partition Information     |                                            |       |
|# col_name                  |data_type                                   |comment|
|warehouse_id                |bigint                                      |NULL   |
|                            |                                            |       |
|# Metadata Columns          |                                            |       |
|_spec_id                    |int                                         |       |
|_partition                  |struct<warehouse_id:bigint>                 |       |
|_file                       |string                                      |       |
|_pos                        |bigint                                      |       |
|_deleted                    |boolean                                     |       |
|                            |                                            |       |
|# Detailed Table Information|                                            |       |
|Name                        |spark_catalog.iceberg_db.converted_inventory|       |
|Type                        |MANAGED                                     |       |
+----------------------------+--------------------------------------------+-------+
only showing top 20 rows

In [36]:
# COMMAND ----------
# 5. Reading and Querying Iceberg Tables
# -------------------------------------
In [37]:
print("Performing basic queries...")

# Count records in each table
tables = ["basic_customer", "partitioned_orders", "advanced_products", "evolving_customer_profile", "converted_inventory"]
for table in tables:
    count = spark.sql(f"SELECT COUNT(*) as count FROM iceberg_db.{table}").collect()[0]["count"]
    print(f"Table {table} has {count} records")
Performing basic queries...
Table basic_customer has 1000 records
Table partitioned_orders has 0 records
Table advanced_products has 100 records
Table evolving_customer_profile has 1000 records
Table converted_inventory has 500 records
In [38]:
# 5.2 Query with filters on partitioned columns
print("\nQuerying orders with partition filters...")
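# NOTE: this returns no rows because the cell that appends data into
# partitioned_orders was never executed (see section 3.2).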
shipped_orders = spark.sql("""
SELECT order_id, customer_id, order_date, status
FROM iceberg_db.partitioned_orders
WHERE status = 'shipped' AND order_date > '2022-01-01'
""")
print(f"Found {shipped_orders.count()} shipped orders after 2022-01-01")
Querying orders with partition filters...
Found 0 shipped orders after 2022-01-01
In [39]:
# 5.3 Join operations
print("\nPerforming join operation between orders and customers...")
customer_orders = spark.sql("""
SELECT c.name, c.email, o.order_id, o.price, o.order_date, o.status
FROM iceberg_db.basic_customer c
JOIN iceberg_db.partitioned_orders o
  ON c.customer_id = o.customer_id
WHERE o.price > 500
ORDER BY o.price DESC
LIMIT 10
""")
print("Top 10 high-value orders:")
customer_orders.show(truncate=False)
Performing join operation between orders and customers...
Top 10 high-value orders:
+----+-----+--------+-----+----------+------+
|name|email|order_id|price|order_date|status|
+----+-----+--------+-----+----------+------+
+----+-----+--------+-----+----------+------+

In [41]:
spark.sql("SELECT * FROM spark_catalog.iceberg_db.partitioned_orders").show()
+--------+-----------+----------+--------+-----+----------+------+
|order_id|customer_id|product_id|quantity|price|order_date|status|
+--------+-----------+----------+--------+-----+----------+------+
+--------+-----------+----------+--------+-----+----------+------+

5.4 Aggregation queries

In [6]:
print("\nPerforming aggregation queries...")
category_stats = spark.sql("""
SELECT 
    category,
    COUNT(*) as product_count,
    ROUND(AVG(price), 2) as avg_price,
    ROUND(SUM(inventory), 2) as total_inventory,
    ROUND(MIN(price), 2) as min_price,
    ROUND(MAX(price), 2) as max_price
FROM iceberg_db.advanced_products
GROUP BY category
ORDER BY product_count DESC
""")
print("Product statistics by category:")
category_stats.show(truncate=False)
Performing aggregation queries...
Product statistics by category:
+-----------+-------------+---------+---------------+---------+---------+
|category   |product_count|avg_price|total_inventory|min_price|max_price|
+-----------+-------------+---------+---------------+---------+---------+
|Books      |14           |530.6    |5609           |132.5    |958.87   |
|Garden     |11           |419.31   |5733           |22.66    |931.67   |
|Food       |11           |426.05   |5054           |32.73    |929.76   |
|Beauty     |11           |638.41   |6223           |103.37   |985.85   |
|Electronics|10           |376.17   |4576           |97.48    |824.89   |
|Clothing   |9            |391.99   |6005           |137.3    |973.95   |
|Home       |9            |595.74   |4821           |307.41   |969.9    |
|Toys       |9            |455.92   |4412           |83.36    |882.47   |
|Sports     |8            |526.45   |2730           |23.71    |997.02   |
|Jewelry    |8            |490.74   |3704           |118.78   |977.92   |
+-----------+-------------+---------+---------------+---------+---------+

5.5 Working with nested data

In [7]:
print("\nQuerying nested data structures...")
age_groups = spark.sql("""
SELECT 
    CASE 
        WHEN profile.age < 30 THEN '18-29'
        WHEN profile.age BETWEEN 30 AND 45 THEN '30-45'
        WHEN profile.age BETWEEN 46 AND 60 THEN '46-60'
        ELSE '60+'
    END as age_group,
    COUNT(*) as customer_count,
    COLLECT_LIST(profile.preferences['marketing']) as marketing_preferences
FROM iceberg_db.evolving_customer_profile
GROUP BY 
    CASE 
        WHEN profile.age < 30 THEN '18-29'
        WHEN profile.age BETWEEN 30 AND 45 THEN '30-45'
        WHEN profile.age BETWEEN 46 AND 60 THEN '46-60'
        ELSE '60+'
    END
ORDER BY age_group
""")
print("Customer counts by age group with marketing preferences:")
age_groups.show(truncate=False)
Querying nested data structures...
Customer counts by age group with marketing preferences:
+---------+--------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|age_group|customer_count|marketing_preferences                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
+---------+--------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|18-29    |159           |[opt_in, opt_out, opt_out, opt_out, opt_out, opt_in, opt_in, opt_in, opt_in, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_in, opt_out, opt_out, opt_out, opt_out, opt_out, opt_out, opt_out, opt_in, opt_out, opt_out, opt_in, opt_in, opt_out, opt_in, opt_out, opt_out, opt_in, opt_in, opt_out, opt_out, opt_in, opt_out, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_out, opt_in, opt_out, opt_out, opt_in, opt_out, opt_out, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_out, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_in, opt_in, opt_out, opt_in, opt_out, opt_out, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_out, opt_out, opt_out, opt_in, opt_in, opt_out, opt_in, opt_out, opt_out, opt_in, opt_in, opt_out, opt_in, opt_in, opt_in, opt_out, opt_in, opt_in, opt_in, opt_in, opt_out, opt_in, opt_in, opt_in, opt_out, opt_in, opt_in, opt_in]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|30-45    |250           |[opt_in, opt_out, opt_out, opt_out, opt_out, opt_out, opt_out, opt_in, opt_in, opt_out, opt_in, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_out, opt_in, opt_out, opt_out, opt_out, opt_in, opt_out, opt_in, opt_in, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_out, opt_in, opt_out, opt_in, opt_in, opt_out, opt_in, opt_in, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_out, opt_in, opt_out, opt_in, opt_in, opt_out, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_out, opt_out, opt_out, opt_out, opt_out, opt_out, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_out, opt_in, opt_in, opt_out, opt_in, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_in, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_out, opt_out, opt_out, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_out, opt_out, opt_in, opt_in, opt_in, opt_out, opt_out, opt_in, opt_in, opt_in, opt_out, opt_in, opt_out, opt_out, opt_out, opt_out, opt_in, opt_in, opt_out, opt_out, opt_in, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_in, opt_out, opt_out, opt_in, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_in, opt_out, opt_out, opt_in]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
|46-60    |250           |[opt_out, opt_out, opt_in, opt_in, opt_in, opt_out, opt_in, opt_out, opt_out, opt_out, opt_out, opt_in, opt_out, opt_out, opt_in, opt_in, opt_in, opt_out, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_out, opt_out, opt_in, opt_in, opt_in, opt_in, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_out, opt_out, opt_in, opt_in, opt_out, opt_out, opt_out, opt_in, opt_out, opt_in, opt_out, opt_in, opt_in, opt_in, opt_out, opt_out, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_out, opt_in, opt_in, opt_out, opt_in, opt_in, opt_in, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_out, opt_out, opt_out, opt_in, opt_in, opt_in, opt_out, opt_in, opt_out, opt_out, opt_in, opt_out, opt_in, opt_in, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_out, opt_out, opt_in, opt_in, opt_in, opt_out, opt_in, opt_in, opt_in, opt_out, opt_out, opt_in, opt_in, opt_out, opt_out, opt_in, opt_in, opt_out, opt_in, opt_out, opt_out, opt_out, opt_in, opt_out, opt_out, opt_out, opt_out, opt_in, opt_out, opt_in, opt_out, opt_in, opt_in, opt_out, opt_in, opt_out, opt_out, opt_out, opt_in, opt_in, opt_out, opt_out, opt_out, opt_in, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_in, opt_out, opt_out, opt_in, opt_out, opt_in, opt_in, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_in, opt_in, opt_in, opt_out, opt_out, opt_out]                                                                                                                                                                                                                                                                                                                                                                                                                                   |
|60+      |341           |[opt_in, opt_in, opt_out, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_in, opt_out, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_out, opt_in, opt_out, opt_out, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_in, opt_in, opt_out, opt_in, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_in, opt_out, opt_out, opt_in, opt_out, opt_out, opt_out, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_in, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_out, opt_in, opt_in, opt_in, opt_out, opt_in, opt_out, opt_in, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_in, opt_out, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_out, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_in, opt_out, opt_out, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_out, opt_in, opt_in, opt_in, opt_out, opt_out, opt_out, opt_out, opt_out, opt_in, opt_in, opt_out, opt_in, opt_in, opt_out, opt_in, opt_out, opt_out, opt_out, opt_in, opt_in, opt_out, opt_in, opt_out, opt_out, opt_out, opt_in, opt_in, opt_in, opt_out, opt_in, opt_out, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_out, opt_in, opt_out, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_in, opt_out, opt_in, opt_out, opt_out, opt_out, opt_in, opt_in, opt_out, opt_out, opt_in, opt_out, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_in, opt_in, opt_out, opt_in, opt_in, opt_in, opt_out, opt_out, opt_out, opt_in, opt_out, opt_in, opt_in, opt_in, opt_out, opt_in, opt_in, opt_in, opt_in, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_in, opt_in, opt_out, opt_in, opt_out, opt_in, opt_in, opt_in, opt_in, opt_out, opt_out, opt_in, opt_out, opt_out, opt_in, opt_out, opt_in, opt_in]|
+---------+--------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

6. Schema Evolution

In [9]:
# 6.1 Adding new columns
print("Demonstrating schema evolution - adding columns...")
spark.sql("""
ALTER TABLE iceberg_db.evolving_customer_profile
ADD COLUMNS (
    loyalty_tier STRING,
    account_manager INT
)
""")
Demonstrating schema evolution - adding columns...
Out[9]:
DataFrame[]
In [10]:
# Verify new schema
print("Updated schema:")
spark.sql("DESCRIBE iceberg_db.evolving_customer_profile").show()
Updated schema:
+---------------+--------------------+-------+
|       col_name|           data_type|comment|
+---------------+--------------------+-------+
|    customer_id|                 int|   NULL|
|        profile|struct<name:strin...|   NULL|
|   last_updated|           timestamp|   NULL|
|   loyalty_tier|              string|   NULL|
|account_manager|                 int|   NULL|
+---------------+--------------------+-------+
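
Iceberg supports further in-place, metadata-only schema changes beyond ADD COLUMNS. A sketch of a few of them, reusing the column added above (not executed here):

In [ ]:
# Sketch only: metadata-only changes, no data files are rewritten
spark.sql("ALTER TABLE iceberg_db.evolving_customer_profile RENAME COLUMN account_manager TO account_manager_id")
spark.sql("ALTER TABLE iceberg_db.evolving_customer_profile ALTER COLUMN account_manager_id TYPE BIGINT")  # widening INT -> BIGINT is allowed
spark.sql("ALTER TABLE iceberg_db.evolving_customer_profile DROP COLUMN account_manager_id")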

In [11]:
# 6.2 Generate and write data with the new schema
print("Writing data with the new schema...")
# Update some existing profiles with new columns
updated_profiles = spark.sql("""
SELECT
    customer_id,
    profile,
    last_updated,
    CASE
        WHEN customer_id % 10 = 0 THEN 'Platinum'
        WHEN customer_id % 10 <= 3 THEN 'Gold'
        WHEN customer_id % 10 <= 7 THEN 'Silver'
        ELSE 'Bronze'
    END as loyalty_tier,
    CASE
        WHEN customer_id % 10 = 0 THEN 101
        WHEN customer_id % 10 <= 3 THEN 102
        ELSE NULL
    END as account_manager
FROM iceberg_db.evolving_customer_profile
WHERE customer_id <= 100
""")
Writing data with the new schema...
In [12]:
# Overwrite the selected records
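# CAUTION: evolving_customer_profile is unpartitioned, so overwritePartitions()
# dynamically overwrites its single implicit partition, i.e. the whole table;
# only these 100 rows remain afterwards. MERGE INTO (section 7.3) is the safer upsert.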
updated_profiles.writeTo("iceberg_db.evolving_customer_profile").overwritePartitions()
print(f"Updated {updated_profiles.count()} customer profiles with new schema")
Updated 100 customer profiles with new schema
In [13]:
# 6.3 Query data with the evolved schema
print("Querying data with evolved schema...")
loyalty_distribution = spark.sql("""
SELECT
    loyalty_tier,
    COUNT(*) as customer_count,
    ROUND(AVG(profile.age), 1) as avg_age,
    COUNT(account_manager) as managed_accounts
FROM iceberg_db.evolving_customer_profile
WHERE loyalty_tier IS NOT NULL
GROUP BY loyalty_tier
ORDER BY customer_count DESC
""")
print("Customer distribution by loyalty tier:")
loyalty_distribution.show()
Querying data with evolved schema...
Customer distribution by loyalty tier:
+------------+--------------+-------+----------------+
|loyalty_tier|customer_count|avg_age|managed_accounts|
+------------+--------------+-------+----------------+
|      Silver|            40|   51.3|               0|
|        Gold|            30|   50.6|              30|
|      Bronze|            20|   49.6|               0|
|    Platinum|            10|   49.5|              10|
+------------+--------------+-------+----------------+

In [14]:
# COMMAND -------------------------
# 7. Demonstrating ACID Transactions
# ---------------------------------
In [16]:
# 7.1 Atomic updates
import time
print("Demonstrating atomic updates in transactions...")
# Update inventory for multiple products in a single transaction
start_time = time.time()
spark.sql("""
UPDATE iceberg_db.advanced_products
SET inventory = inventory - 10, last_updated = current_timestamp()
WHERE product_id IN (1, 2, 3, 4, 5) AND inventory >= 10
""")
end_time = time.time()
print(f"Atomic update completed in {end_time - start_time:.2f} seconds")
Demonstrating atomic updates in transactions...
Atomic update completed in 2.95 seconds
In [17]:
# Check updated records
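# NOTE: products 1 and 3 keep their older last_updated values below: their
# inventory was already under 10, so the conditional UPDATE skipped them.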
updated_products = spark.sql("""
SELECT product_id, name, inventory, last_updated
FROM iceberg_db.advanced_products
WHERE product_id IN (1, 2, 3, 4, 5)
ORDER BY product_id
""")
print("Updated product inventory:")
updated_products.show(truncate=False)
Updated product inventory:
+----------+-------------+---------+--------------------------+
|product_id|name         |inventory|last_updated              |
+----------+-------------+---------+--------------------------+
|1         |Weights Z516 |6        |2025-02-18 10:41:16.743027|
|2         |Lamp W258    |607      |2025-02-20 11:55:39.464659|
|3         |Soap K303    |6        |2025-02-19 10:41:16.743071|
|4         |Mystery J322 |989      |2025-02-20 11:55:39.464659|
|5         |Keyboard G880|860      |2025-02-20 11:55:39.464659|
+----------+-------------+---------+--------------------------+

In [18]:
# 7.2 Conditional updates (demonstrating Consistency)
print("\nDemonstrating consistency with conditional updates...")
# Only update if conditions are met
conditionally_updated = spark.sql("""
UPDATE iceberg_db.advanced_products
SET 
    price = price * 1.1,
    last_updated = current_timestamp()
WHERE 
    category = 'Electronics' AND 
    price < 500 AND
    inventory > 50
""")
print("Conditionally updated product prices. Checking results:")
Demonstrating consistency with conditional updates...
Conditionally updated product prices. Checking results:
In [19]:
price_updates = spark.sql("""
SELECT
    product_id,
    name,
    category,
    price,
    inventory,
    last_updated
FROM iceberg_db.advanced_products
WHERE 
    category = 'Electronics'
ORDER BY price DESC
""")
price_updates.show(truncate=False)
+----------+---------------+-----------+------------------+---------+--------------------------+
|product_id|name           |category   |price             |inventory|last_updated              |
+----------+---------------+-----------+------------------+---------+--------------------------+
|9         |Watch U477     |Electronics|824.89            |994      |2025-01-30 10:41:16.743177|
|21        |Camera M950    |Electronics|564.72            |90       |2025-02-04 10:41:16.743293|
|49        |Smartphone D281|Electronics|502.3260000000001 |885      |2025-02-20 11:56:10.560806|
|19        |Watch W515     |Electronics|456.093           |67       |2025-02-20 11:56:10.560806|
|30        |Monitor Q532   |Electronics|417.494           |160      |2025-02-20 11:56:10.560806|
|69        |Keyboard R167  |Electronics|383.845           |71       |2025-02-20 11:56:10.560806|
|74        |Laptop E772    |Electronics|314.45700000000005|319      |2025-02-20 11:56:10.560806|
|82        |TV U710        |Electronics|270.952           |489      |2025-02-20 11:56:10.560806|
|5         |Keyboard G880  |Electronics|156.87100000000004|860      |2025-02-20 11:56:10.560806|
|96        |Monitor I604   |Electronics|107.22800000000001|631      |2025-02-20 11:56:10.560806|
+----------+---------------+-----------+------------------+---------+--------------------------+

In [21]:
import datetime
# 7.3 Merge operations (upserts)
print("\nDemonstrating merge operations (upserts)...")
# Create temporary data for merge
merge_data = [
    (101, "New Smart Watch X1", "Electronics", 299.99, 50, datetime.datetime.now()),
    (5, "Updated Product Name", "Electronics", 599.99, 75, datetime.datetime.now())
]
merge_df = spark.createDataFrame(merge_data, ["product_id", "name", "category", "price", "inventory", "last_updated"])
merge_df.createOrReplaceTempView("products_updates")
Demonstrating merge operations (upserts)...
In [23]:
# Perform merge operation
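# Row-level SQL commands (MERGE INTO, UPDATE, DELETE) on Iceberg tables rely on
# the IcebergSparkSessionExtensions configured when the session was created.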
spark.sql("""
MERGE INTO iceberg_db.advanced_products t
USING products_updates s
ON t.product_id = s.product_id
WHEN MATCHED THEN
  UPDATE SET
    t.name = s.name,
    t.price = s.price,
    t.inventory = s.inventory,
    t.last_updated = s.last_updated
WHEN NOT MATCHED THEN
  INSERT (product_id, name, category, price, inventory, last_updated)
  VALUES (s.product_id, s.name, s.category, s.price, s.inventory, s.last_updated)
""")
Out[23]:
DataFrame[]
In [25]:
# Check results of merge
merged_products = spark.sql("""
SELECT product_id, name, category, price, inventory, last_updated
FROM iceberg_db.advanced_products
WHERE product_id IN (5, 101)
""")
print("Results of merge operation:")
merged_products.show(truncate=False)
Results of merge operation:
+----------+--------------------+-----------+------+---------+--------------------------+
|product_id|name                |category   |price |inventory|last_updated              |
+----------+--------------------+-----------+------+---------+--------------------------+
|5         |Updated Product Name|Electronics|599.99|75       |2025-02-20 11:56:38.123071|
|101       |New Smart Watch X1  |Electronics|299.99|50       |2025-02-20 11:56:38.123064|
+----------+--------------------+-----------+------+---------+--------------------------+

In [26]:
# 7.4 Time travel queries (demonstrating Isolation and Durability)
In [28]:
print("\nDemonstrating time travel capabilities...")
# First, check table history
history = spark.sql("SELECT * FROM iceberg_db.advanced_products.history")
print("Table history:")
history.show(truncate=False)
Demonstrating time travel capabilities...
Table history:
+-----------------------+-------------------+-------------------+-------------------+
|made_current_at        |snapshot_id        |parent_id          |is_current_ancestor|
+-----------------------+-------------------+-------------------+-------------------+
|2025-02-20 10:41:24.55 |6465532312590292151|NULL               |true               |
|2025-02-20 11:55:41.873|3954393121797699845|6465532312590292151|true               |
|2025-02-20 11:56:12.154|6398027269306179278|3954393121797699845|true               |
|2025-02-20 11:57:14.76 |4142118315220597146|6398027269306179278|true               |
+-----------------------+-------------------+-------------------+-------------------+

In [29]:
# Get the snapshot ID from before our updates
if history.count() >= 2:
    older_snapshot = history.orderBy("made_current_at").collect()[0]["snapshot_id"]
    current_snapshot = history.orderBy("made_current_at", ascending=False).collect()[0]["snapshot_id"]
    
    # Query at an earlier snapshot
    print(f"\nQuerying data at earlier snapshot: {older_snapshot}")
    old_data = spark.read.option("snapshot-id", older_snapshot).table("iceberg_db.advanced_products")
    print(f"Number of products in older snapshot: {old_data.count()}")
    
    # Query using timestamp
    one_hour_ago = datetime.datetime.now() - datetime.timedelta(hours=1)
    timestamp_str = one_hour_ago.strftime("%Y-%m-%d %H:%M:%S")
    print(f"\nQuerying data as of {timestamp_str}")
    try:
        time_travel_data = spark.read.option("as-of-timestamp", timestamp_str).table("iceberg_db.advanced_products")
        print(f"Number of products as of {timestamp_str}: {time_travel_data.count()}")
    except Exception as e:
        print(f"Time travel query failed: {str(e)}")
Querying data at earlier snapshot: 6465532312590292151
Number of products in older snapshot: 100

Querying data as of 2025-02-20 10:58:25
Time travel query failed: For input string: "2025-02-20 10:58:25"
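
The timestamp-based query fails because Iceberg's as-of-timestamp read option expects milliseconds since the Unix epoch, not a formatted string. A corrected sketch reusing the variables from the cell above (the SQL TIMESTAMP AS OF / VERSION AS OF forms assume Spark 3.3+):

In [ ]:
# as-of-timestamp takes epoch milliseconds
millis = int(one_hour_ago.timestamp() * 1000)
time_travel_data = spark.read.option("as-of-timestamp", str(millis)).table("iceberg_db.advanced_products")
print(f"Number of products as of {timestamp_str}: {time_travel_data.count()}")

# Equivalent SQL time travel
spark.sql(f"SELECT COUNT(*) FROM iceberg_db.advanced_products TIMESTAMP AS OF '{timestamp_str}'").show()
spark.sql(f"SELECT COUNT(*) FROM iceberg_db.advanced_products VERSION AS OF {older_snapshot}").show()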
In [30]:
# COMMAND ----------
# 8. Advanced Iceberg Workflows
# ----------------------------
In [32]:
# 8.1 Data Retention and Expiration
print("\nDemonstrating data retention policies...")
# Set retention policy for snapshots
spark.sql("""
ALTER TABLE iceberg_db.partitioned_orders
SET TBLPROPERTIES (
  'history.expire.max-snapshot-age-ms' = '604800000', -- 7 days
  'history.expire.min-snapshots-to-keep' = '3'
)
""")
Demonstrating data retention policies...
Out[32]:
DataFrame[]
In [37]:
# Check current properties
props = spark.sql("SHOW TBLPROPERTIES iceberg_db.partitioned_orders")
print("Table properties for retention:")
props.filter(col("key").like("%expire%")).show(truncate=False)

# Manually expire snapshots (would normally be run as a scheduled job)
try:
    spark.sql("CALL iceberg_catalog.system.expire_snapshots(table => 'iceberg_db.partitioned_orders', older_than => TIMESTAMP '2023-01-01 00:00:00', retain_last => 3)")
    print("Expired old snapshots successfully")
except Exception as e:
    print(f"Note: Snapshot expiration not supported in this Spark version: {str(e)}")
Table properties for retention:
+------------------------------------+---------+
|key                                 |value    |
+------------------------------------+---------+
|history.expire.max-snapshot-age-ms  |604800000|
|history.expire.min-snapshots-to-keep|3        |
+------------------------------------+---------+

Note: Snapshot expiration not supported in this Spark version: Procedure iceberg_catalog.system.expire_snapshots not found
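
The failure above is not a Spark version limitation: the procedure was invoked through a catalog named iceberg_catalog, but this session registered its Iceberg catalog as spark_catalog. A corrected sketch (not executed here):

In [ ]:
# Iceberg stored procedures are called through the configured catalog name
spark.sql("""
CALL spark_catalog.system.expire_snapshots(
  table => 'iceberg_db.partitioned_orders',
  older_than => TIMESTAMP '2023-01-01 00:00:00',
  retain_last => 3
)
""")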
In [38]:
# 8.2 Table Migration and Repartitioning
print("\nDemonstrating table repartitioning...")
# First, check current partition distribution
partition_info = spark.sql("""
SELECT status, COUNT(*) as partition_count
FROM iceberg_db.partitioned_orders
GROUP BY status
""")
print("Current partition distribution:")
partition_info.show()
Demonstrating table repartitioning...
Current partition distribution:
+------+---------------+
|status|partition_count|
+------+---------------+
+------+---------------+
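
The GROUP BY above scans the table data; Iceberg also exposes partition-level statistics directly through its metadata tables, without touching the data files. A sketch using the partitions metadata table (it returns rows only once the table holds data):

In [ ]:
# Partition-level stats straight from Iceberg metadata
spark.sql("""
SELECT partition, record_count, file_count
FROM iceberg_db.partitioned_orders.partitions
""").show(truncate=False)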

In [ ]: