Performance Considerations

Understanding how DataCompose primitives impact Spark performance is crucial for building efficient data pipelines at scale.

Typo Correction and Mapping Performance

The Challenge with Typo Mappings

Typo correction primitives like fix_common_typos can be expensive operations:

@emails.register()
def fix_common_typos(col: Column) -> Column:
    """Fix common email domain typos - potentially expensive operation."""
    typo_mappings = {
        'gmial.com': 'gmail.com',
        'gmai.com': 'gmail.com',
        'gmil.com': 'gmail.com',
        'yahooo.com': 'yahoo.com',
        'yaho.com': 'yahoo.com',
        # ... hundreds more mappings
    }

    # This creates a chain of when().otherwise() calls
    result = col
    for typo, correct in typo_mappings.items():
        result = F.when(
            F.lower(F.split(col, '@')[1]) == typo,
            F.concat(F.split(col, '@')[0], F.lit('@'), F.lit(correct))
        ).otherwise(result)

    return result

Performance Impact

  1. Chain of Conditions - Each typo creates a new when().otherwise() check
  2. String Operations - Multiple splits and concatenations per check
  3. Catalyst Optimization Limits - Deeply nested conditions can hit optimizer limits (visible in the query plan, as shown below)
  4. Memory Overhead - Large mapping dictionaries consume driver memory
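
The nesting is visible directly in the query plan. A quick check, assuming the registered primitive is still callable as a plain column function and a DataFrame df with an email column:

fixed = df.withColumn('email_fixed', fix_common_typos(F.col('email')))
fixed.explain(True)  # the plan shows one nested CASE WHEN branch per mapping entry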

Optimization Strategies

1. Use Broadcast Joins for Large Mappings

def fix_typos_optimized(df: DataFrame, email_col: str) -> DataFrame:
    """Use broadcast join for large typo mappings."""
    # Create mapping DataFrame
    typo_data = [
        ('gmial.com', 'gmail.com'),
        ('gmai.com', 'gmail.com'),
        # ... more mappings
    ]
    typo_df = spark.createDataFrame(typo_data, ['typo', 'correct'])

    # Extract domain and join
    df_with_domain = df.withColumn('domain', F.lower(F.split(F.col(email_col), '@')[1]))  # lower-case to match the mapping keys

    # Broadcast small mapping table
    corrected = df_with_domain.join(
        F.broadcast(typo_df),
        df_with_domain.domain == typo_df.typo,
        'left'
    )

    # Reconstruct email with corrected domain
    corrected = corrected.withColumn(
        email_col,
        F.when(
            F.col('correct').isNotNull(),
            F.concat(F.split(F.col(email_col), '@')[0], F.lit('@'), F.col('correct'))
        ).otherwise(F.col(email_col))
    ).drop('domain', 'typo', 'correct')

    return corrected
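
Example usage (assumes an active SparkSession and a DataFrame df with an email column):

df_fixed = fix_typos_optimized(df, 'email')
df_fixed.select('email').show(5, truncate=False)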

2. Limit Typo Checks to Common Domains

@emails.register()
def fix_common_typos_selective(col: Column) -> Column:
    """Only check typos for common domains - much faster."""
    # Only check if domain looks like common providers
    return F.when(
        F.lower(F.split(col, '@')[1]).rlike(r'^gm[ai].*\.com$|^yaho.*\.com$'),
        fix_typos_comprehensive(col)  # Apply expensive checks
    ).otherwise(col)  # Skip typo checking for other domains

3. Use UDFs for Complex Mappings

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define mapping as a plain Python dict (serialized with the UDF and held on each executor)
TYPO_MAP = {'gmial.com': 'gmail.com', 'gmai.com': 'gmail.com'}  # ... hundreds more mappings

@udf(returnType=StringType())
def fix_typos_udf(email):
    if not email or '@' not in email:
        return email

    local, domain = email.rsplit('@', 1)
    domain_lower = domain.lower()

    # Direct dictionary lookup - O(1)
    corrected = TYPO_MAP.get(domain_lower, domain)
    return f"{local}@{corrected}"

# Use the UDF
df.withColumn('email_fixed', fix_typos_udf(F.col('email')))
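
On Spark 3.x with PyArrow available, a vectorized pandas UDF usually beats the row-at-a-time UDF above. A minimal sketch reusing the same TYPO_MAP and StringType import:

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf(StringType())
def fix_typos_pandas_udf(emails: pd.Series) -> pd.Series:
    parts = emails.str.rsplit('@', n=1, expand=True)              # [local part, domain]
    if parts.shape[1] < 2:                                        # batch had no '@' at all
        return emails
    domains = parts[1].str.lower().map(TYPO_MAP).fillna(parts[1])
    fixed = parts[0] + '@' + domains
    return fixed.fillna(emails)                                   # keep null/malformed emails unchanged

df.withColumn('email_fixed', fix_typos_pandas_udf(F.col('email')))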

General Performance Best Practices

1. Column Pruning

Only select columns you need before applying transformations:

# Good - only the needed columns are scanned and carried through the pipeline
df.select('email', 'phone').transform(clean_contact_info)

# Bad - processes unnecessary columns
df.transform(clean_contact_info).select('email', 'phone')

2. Predicate Pushdown

Filter early to reduce data volume:

# Good - filter before transformation
(df.filter(F.col('email').isNotNull())
   .transform(email_pipeline))

# Bad - transform everything, then filter
(df.transform(email_pipeline)
   .filter(F.col('email_valid')))

3. Cache Intermediate Results

For pipelines used multiple times:

# Cache cleaned data if reused
cleaned_df = df.transform(comprehensive_clean).cache()

# Now multiple operations use cached data
valid_emails = cleaned_df.filter(email.is_valid_email(F.col('email')))
invalid_emails = cleaned_df.filter(~email.is_valid_email(F.col('email')))
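
Release the cache once the derived DataFrames have been materialized, so executors can reclaim the memory:

cleaned_df.unpersist()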

4. Avoid Redundant Operations

Compose primitives to minimize repeated work:

# Bad - is_corporate_email extracts the domain a second time internally
(df.withColumn('domain', email.extract_domain(F.col('email')))
   .withColumn('is_corporate', email.is_corporate_email(F.col('email'))))

# Good - extract once, derive from the result
# (FREE_PROVIDERS: your list of free-mail domains, e.g. gmail.com, yahoo.com)
(df.withColumn('domain', email.extract_domain(F.col('email')))
   .withColumn('is_corporate', ~F.col('domain').isin(FREE_PROVIDERS)))

5. Partition Wisely

Ensure proper partitioning for operations:

# Repartition before heavy transformations so work is spread evenly across executors
result = (
    df.repartition(200, 'email_domain')
      .transform(heavy_email_pipeline)
)
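
If the pipeline filters out a large fraction of rows, coalescing afterwards keeps later stages and writes from dragging along hundreds of near-empty partitions:

result = result.coalesce(50)  # fewer, fuller partitions downstream; tune to your data volume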

Monitoring Performance

Enable Spark UI Metrics

@text.compose()
def monitored_pipeline():
    # Each step will be visible in Spark UI
    text.clean()
    text.validate()
    text.transform()
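
Independently of how the pipeline is composed, tagging the run with a job group makes its stages easy to locate in the Spark UI (assumes an active SparkSession named spark and that the composed pipeline is applied with df.transform() as in the earlier examples):

spark.sparkContext.setJobGroup('email-cleaning', 'comprehensive email cleaning pipeline')
result = df.transform(monitored_pipeline)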

Measure Transformation Time

import time

start = time.time()
result = df.transform(complex_pipeline)
result.count()  # Force evaluation
print(f"Pipeline took {time.time() - start:.2f} seconds")

Profile Memory Usage

# Compare partition and row counts before/after (the Spark UI Storage tab shows the cached size)
df.cache()
print(f"Input partitions: {df.rdd.getNumPartitions()}")
print(f"Input size: {df.count()} rows")

result = df.transform(memory_intensive_pipeline)
print(f"Output partitions: {result.rdd.getNumPartitions()}")
print(f"Output size: {result.count()} rows")

Primitive-Specific Performance Tips

Email Primitives

  • Use filter_valid_emails early to reduce data volume
  • Cache domain extractions if they are reused (see the sketch below)
  • Consider broadcast joins for provider lookups
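
A sketch combining the first two tips, using the email primitives referenced elsewhere on this page:

emails_df = (
    df.filter(email.is_valid_email(F.col('email')))                # shrink the data early
      .withColumn('domain', email.extract_domain(F.col('email')))
      .cache()                                                     # reuse the extracted domain
)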

Phone Primitives

  • Extract digits once and reuse them for multiple validations (see the sketch below)
  • Use region-specific validation to avoid unnecessary checks
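
Illustrated with plain PySpark (no phone-specific primitives assumed): extract the digits into a column once, then run every subsequent check against that column:

phones_df = (
    df.withColumn('phone_digits', F.regexp_replace(F.col('phone'), r'\D', ''))
      .withColumn('is_us_length', F.length('phone_digits').isin(10, 11))   # cheap check reused downstream
)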

Address Primitives

  • Parse addresses once into structured format
  • Cache parsed components for multiple extractions
  • Use postal code validation to short-circuit full address validation (see the sketch below)
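
A plain-PySpark sketch of the short-circuit idea; validate_full_address stands in for your heavier validation logic and is hypothetical:

zip_ok = F.col('postal_code').rlike(r'^\d{5}(-\d{4})?$')         # cheap US ZIP sanity check
addresses_df = df.withColumn(
    'address_valid',
    F.when(~zip_ok, F.lit(False))                                # fail fast on a bad postal code
     .otherwise(validate_full_address(F.col('address')))         # hypothetical expensive check
)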

When to Optimize

Not all pipelines need optimization. Consider optimization when:

  1. Data Volume > 10 million rows
  2. Typo Mappings > 100 entries
  3. Pipeline Reuse - Same transformation applied multiple times
  4. Memory Pressure - Executors running out of memory
  5. Long Runtime - Pipeline takes > 5 minutes