Typo Correction and Mapping Performance
The Challenge with Typo Mappings
Typo correction primitives like fix_common_typos can be expensive operations:
from pyspark.sql import Column, functions as F

@emails.register()
def fix_common_typos(col: Column) -> Column:
    """Fix common email domain typos - potentially expensive operation."""
    typo_mappings = {
        'gmial.com': 'gmail.com',
        'gmai.com': 'gmail.com',
        'gmil.com': 'gmail.com',
        'yahooo.com': 'yahoo.com',
        'yaho.com': 'yahoo.com',
        # ... hundreds more mappings
    }
    # This builds a chain of when().otherwise() calls, one level per mapping
    result = col
    for typo, correct in typo_mappings.items():
        result = F.when(
            F.lower(F.split(col, '@')[1]) == typo,
            F.concat(F.split(col, '@')[0], F.lit('@'), F.lit(correct))
        ).otherwise(result)
    return result
Performance Impact
- Chain of Conditions - Each typo creates a new when().otherwise() check
- String Operations - Multiple splits and concatenations per check
- Catalyst Optimization Limits - Deep nested conditions can hit optimizer limits
- Memory Overhead - Large mapping dictionaries consume driver memory
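One way to make this cost visible is to build a short chain by hand and inspect the physical plan. A minimal, self-contained sketch:
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('user@gmial.com',)], ['email'])

# Chain just two corrections by hand and look at the generated expression
expr = F.col('email')
for typo, correct in [('gmial.com', 'gmail.com'), ('gmai.com', 'gmail.com')]:
    expr = F.when(
        F.lower(F.split(F.col('email'), '@')[1]) == typo,
        F.concat(F.split(F.col('email'), '@')[0], F.lit('@'), F.lit(correct))
    ).otherwise(expr)

df.withColumn('email_fixed', expr).explain()
# The plan contains one nested CASE WHEN (plus extra split calls) per mapping entry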
Optimization Strategies
1. Use Broadcast Joins for Large Mappings
from pyspark.sql import DataFrame, SparkSession, functions as F

def fix_typos_optimized(df: DataFrame, email_col: str) -> DataFrame:
    """Use a broadcast join for large typo mappings."""
    spark = SparkSession.builder.getOrCreate()
    # Create the mapping DataFrame
    typo_data = [
        ('gmial.com', 'gmail.com'),
        ('gmai.com', 'gmail.com'),
        # ... more mappings
    ]
    typo_df = spark.createDataFrame(typo_data, ['typo', 'correct'])
    # Extract the domain and join against the mapping table
    df_with_domain = df.withColumn('domain', F.split(F.col(email_col), '@')[1])
    # Broadcast the small mapping table so the large DataFrame is not shuffled
    corrected = df_with_domain.join(
        F.broadcast(typo_df),
        df_with_domain.domain == typo_df.typo,
        'left'
    )
    # Reconstruct the email with the corrected domain where a match was found
    corrected = corrected.withColumn(
        email_col,
        F.when(
            F.col('correct').isNotNull(),
            F.concat(F.split(F.col(email_col), '@')[0], F.lit('@'), F.col('correct'))
        ).otherwise(F.col(email_col))
    ).drop('domain', 'typo', 'correct')
    return corrected
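Usage is then a single call (assuming the email column is named 'email'); because the mapping table is broadcast to every executor, the main DataFrame is not shuffled:
df_fixed = fix_typos_optimized(df, 'email')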
2. Limit Typo Checks to Common Domains
@emails.register()
def fix_common_typos_selective(col: Column) -> Column:
    """Only check typos for common domains - much faster."""
    # Only run the expensive checks when the domain looks like a common provider
    return F.when(
        F.lower(F.split(col, '@')[1]).rlike(r'^gm[ai].*\.com$|^yaho.*\.com$'),
        fix_typos_comprehensive(col)  # Apply the expensive typo corrections
    ).otherwise(col)  # Skip typo checking for other domains
3. Use UDFs for Complex Mappings
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define the mapping as a plain Python dict (shipped with the UDF and reused on executors)
TYPO_MAP = {
    'gmial.com': 'gmail.com',
    # ... hundreds more mappings
}

@udf(returnType=StringType())
def fix_typos_udf(email):
    if not email or '@' not in email:
        return email
    local, domain = email.rsplit('@', 1)
    domain_lower = domain.lower()
    # Direct dictionary lookup - O(1) per row instead of a chain of conditions
    corrected = TYPO_MAP.get(domain_lower, domain)
    return f"{local}@{corrected}"

# Use the UDF
df.withColumn('email_fixed', fix_typos_udf(F.col('email')))
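Plain Python UDFs call into Python once per row. If profiling shows the UDF itself is the bottleneck, a vectorized pandas UDF is one possible variant of the same lookup (a sketch, assuming Spark 3.x with PyArrow available):
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

@pandas_udf(StringType())
def fix_typos_pandas_udf(emails: pd.Series) -> pd.Series:
    def fix(email):
        # Pass through nulls and malformed values unchanged
        if not isinstance(email, str) or '@' not in email:
            return email
        local, domain = email.rsplit('@', 1)
        return f"{local}@{TYPO_MAP.get(domain.lower(), domain)}"
    # Operates on a whole batch of rows at a time instead of one row per call
    return emails.map(fix)

df.withColumn('email_fixed', fix_typos_pandas_udf(F.col('email')))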
General Performance Best Practices
1. Column Pruning
Only select columns you need before applying transformations:
# Good - only the needed columns are carried through the pipeline
df.select('email', 'phone').transform(clean_contact_info)

# Bad - every column is processed, then most are discarded
df.transform(clean_contact_info).select('email', 'phone')
2. Predicate Pushdown
Filter early to reduce data volume:
# Good - filter before transformation
(df.filter(F.col('email').isNotNull())
   .transform(email_pipeline))

# Bad - transform everything, then filter
(df.transform(email_pipeline)
   .filter(F.col('email_valid') == True))
3. Cache Intermediate Results
For pipelines used multiple times:
# Cache cleaned data if reused
cleaned_df = df.transform(comprehensive_clean).cache()
# Now multiple operations use cached data
valid_emails = cleaned_df.filter(email.is_valid_email(F.col('email')))
invalid_emails = cleaned_df.filter(~email.is_valid_email(F.col('email')))
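Release the cache once both derived DataFrames have been materialized, so executors can reclaim the memory:
cleaned_df.unpersist()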
4. Avoid Redundant Operations
Compose primitives to minimize repeated work:
# Bad - extracts the domain twice
(df.withColumn('domain', email.extract_domain(F.col('email')))
   .withColumn('is_corporate', email.is_corporate_email(F.col('email'))))

# Good - extract once, derive from the result
(df.withColumn('domain', email.extract_domain(F.col('email')))
   .withColumn('is_corporate', ~F.col('domain').isin(FREE_PROVIDERS)))
5. Partition Wisely
Repartition on a relevant key before heavy transformations so work is spread evenly across executors:
# Repartition by domain before a heavy, domain-oriented pipeline
(df.repartition(200, 'email_domain')
   .transform(heavy_email_pipeline))
Monitoring Performance
Enable Spark UI Metrics
@text.compose()
def monitored_pipeline():
    # Each step will be visible in the Spark UI
    text.clean()
    text.validate()
    text.transform()
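Labeling the actions that follow makes these jobs easier to find among others in the UI; SparkContext.setJobDescription is a standard call for this. A sketch, assuming the composed pipeline is applied with df.transform as in the other examples in this guide:
# Assumes the composed pipeline can be applied via df.transform (library-specific)
spark.sparkContext.setJobDescription('monitored_pipeline run')
result = df.transform(monitored_pipeline)
result.count()  # This job appears in the Spark UI under the description above
spark.sparkContext.setJobDescription(None)  # Clear the label for later jobs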
Measure Transformation Time
import time
start = time.time()
result = df.transform(complex_pipeline)
result.count() # Force evaluation
print(f"Pipeline took {time.time() - start:.2f} seconds")
Profile Memory Usage
# Check partition and row counts before/after transformations
df.cache()
print(f"Input partitions: {df.rdd.getNumPartitions()}")
print(f"Input size: {df.count()} rows")
result = df.transform(memory_intensive_pipeline)
print(f"Output partitions: {result.rdd.getNumPartitions()}")
print(f"Output size: {result.count()} rows")
Primitive-Specific Performance Tips
Email Primitives
- Use filter_valid_emails early to reduce data volume
- Cache domain extractions if used multiple times
- Consider broadcast joins for provider lookups
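A sketch combining these tips, reusing the email primitives and FREE_PROVIDERS set that appear elsewhere in this guide (exact primitive names are library-specific, so treat them as placeholders):
emails_df = (
    df.filter(email.is_valid_email(F.col('email')))                # filter early
      .withColumn('domain', email.extract_domain(F.col('email')))  # extract the domain once
      .cache()                                                      # reuse for both outputs below
)
corporate = emails_df.filter(~F.col('domain').isin(FREE_PROVIDERS))
free_mail = emails_df.filter(F.col('domain').isin(FREE_PROVIDERS))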
Phone Primitives
- Extract digits once and reuse for multiple validations
- Use region-specific validation to avoid unnecessary checks
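A sketch of the extract-once pattern using plain PySpark column functions in place of the library's phone primitives (the 'phone' column name and validation rules are illustrative only):
# Strip non-digits once, then derive every validation from the same column
digits_only = F.regexp_replace(F.col('phone'), r'[^0-9]', '')
phones_df = (
    df.withColumn('phone_digits', digits_only)
      .withColumn('has_valid_length', F.length('phone_digits').between(10, 15))
      .withColumn('looks_us', F.col('phone_digits').rlike(r'^1?[2-9][0-9]{9}$'))
)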
Address Primitives
- Parse addresses once into structured format
- Cache parsed components for multiple extractions
- Use postal code validation to short-circuit full address validation
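A sketch of the parse-once, short-circuit pattern; address.parse, address.is_valid_postal_code, and address.validate_full are hypothetical primitive names used only to illustrate the shape of the pipeline:
# 'address.parse' and friends are hypothetical primitives for illustration
# Parse once into a struct column, cache, and derive everything from it
parsed = df.withColumn('addr', address.parse(F.col('raw_address'))).cache()

parsed = parsed.withColumn(
    'is_valid_address',
    F.when(
        address.is_valid_postal_code(F.col('addr.postal_code')),  # cheap check first
        address.validate_full(F.col('addr'))                      # expensive path only when needed
    ).otherwise(F.lit(False))
)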
When to Optimize
Not all pipelines need optimization. Consider optimization when:
- Data Volume - more than 10 million rows
- Typo Mappings - more than 100 entries
- Pipeline Reuse - the same transformation applied multiple times
- Memory Pressure - executors running out of memory
- Long Runtime - the pipeline takes more than 5 minutes
Related Documentation
- Getting Started - Understanding DataCompose basics
- Composition - Building efficient pipelines
- Error Handling - Performance-safe error handling