Error Handling

DataCompose primitives follow Spark's null-safe patterns for error handling, ensuring robust data pipelines that handle edge cases gracefully. The framework's compilation and execution model provides multiple layers of error resilience.

Core Principles

Null Safety

Primitives handle null values gracefully using PySpark’s when().otherwise() pattern:

from pyspark.sql import Column
from pyspark.sql import functions as F

@email.register()
def filter_valid_emails(col: Column) -> Column:
    """Return the email only if valid, otherwise return null."""
    return F.when(is_valid_email(col), col).otherwise(F.lit(None))
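
A quick usage sketch (assuming a DataFrame df with a raw email column): invalid values simply propagate as nulls, so downstream steps never see malformed input.

# Hypothetical usage: invalid or missing emails become null, nothing raises
df = df.withColumn("email_valid", email.filter_valid_emails(F.col("email")))
# "a@example.com" -> "a@example.com"
# "not-an-email"  -> null
# null            -> null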

No Exceptions

Primitives don't raise exceptions; they return nulls or default values for invalid data:

# Instead of throwing an error for invalid phone numbers
# This returns null for invalid inputs
@phone.register()
def clean_phone(col: Column) -> Column:
    """Clean and validate phone number, returning null for invalid numbers."""
    digits = extract_digits(col)
    return F.when(
        F.length(digits) == 10,  # Valid NANP
        format_nanp(digits)
    ).otherwise(F.lit(None))
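
Applied to a DataFrame (a sketch, assuming a phone_raw column), the result is either a formatted number or null:

# Hypothetical usage: no row ever fails, invalid numbers just become null
df = df.withColumn("phone_clean", phone.clean_phone(F.col("phone_raw")))
# 10-digit NANP input -> output of format_nanp
# anything else       -> null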

Validation Patterns

Pre-validation

Use validation primitives to check data before transformation:

@text.compose()
def safe_email_processing():
    if validation.is_valid_email():
        email.normalize()
        email.extract_domain()
    else:
        # Handle invalid emails differently
        lambda col: F.lit("INVALID_EMAIL")

Filter Invalid Data

Use filter primitives to separate valid from invalid data:

# Process only valid emails; invalid ones become null and are then dropped
df.select(
    email.filter_valid_emails(F.col("email_input")).alias("email_valid")
).filter(F.col("email_valid").isNotNull())

Default Values

Primitives can accept default values for handling edge cases:

@customer.register()
def extract_account_type(col, default_type='STANDARD'):
    """Extract the account type, falling back to a default."""
    return (
        F.when(F.substring(col, 1, 2) == 'PR', F.lit('PREMIUM'))
         .when(F.substring(col, 1, 2) == 'EN', F.lit('ENTERPRISE'))
         .otherwise(F.lit(default_type))  # Safe default
    )
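
A usage sketch (assuming an account_code column, and that registered primitives forward extra keyword arguments); the default can be overridden per call:

df = df.withColumn("account_type", customer.extract_account_type(F.col("account_code")))
df = df.withColumn("account_type_strict",
                   customer.extract_account_type(F.col("account_code"), default_type="UNKNOWN"))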

Handling Different Error Scenarios

Missing Data

# Handle missing required fields
@customer.compose()
def handle_missing_data():
    lambda col: F.when(col.isNull(), F.lit("MISSING")).otherwise(col)
    customer.standardize()

Invalid Format

# Mark invalid formats instead of failing
@phone.compose()
def mark_invalid_phones():
    lambda col: F.when(
        phone.validate_nanp(col),
        phone.format_e164(col)
    ).otherwise(F.concat(F.lit("INVALID:"), col))

Data Quality Issues

# Track data quality while processing
df = df.withColumn(
    "email_quality",
    F.when(email.is_valid_email(F.col("email")), "valid")
     .when(F.col("email").isNull(), "missing")
     .otherwise("invalid")
)
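
The quality column can then be aggregated to monitor the pipeline over time, for example:

# Summarize data quality across the dataset
df.groupBy("email_quality").count().show()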

Best Practices for Error Handling

  1. Return nulls, not exceptions - Let Spark handle null propagation
  2. Use filter primitives - Separate valid/invalid data cleanly
  3. Provide defaults - Always have sensible fallback values
  4. Track quality - Add columns to track data quality issues
  5. Validate early - Check data validity before complex transformations
  6. Document behavior - Clearly state how primitives handle edge cases
  7. Test edge cases - Include null, empty, and invalid data in tests (see the sketch below)
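
For point 7, a minimal edge-case test might look like this (a sketch, assuming a pytest SparkSession fixture and the clean_phone primitive shown earlier):

def test_clean_phone_edge_cases(spark):
    df = spark.createDataFrame(
        [("5558675309",), ("123",), (None,), ("",)],
        ["phone_raw"],
    )
    result = df.withColumn("phone_clean", phone.clean_phone(F.col("phone_raw")))
    # Only the valid 10-digit number should survive; the rest become null
    assert result.filter(F.col("phone_clean").isNotNull()).count() == 1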

Advanced Features

Error Tracking Pattern

Create columns to track transformation status:

# Capture the original value, processed value, and validity status together
df = df.withColumn(
    "processing_status",
    F.struct(
        F.col("email").alias("original"),
        email.normalize(F.col("email")).alias("processed"),
        F.when(email.is_valid_email(F.col("email")), "valid")
         .otherwise("invalid").alias("status")
    )
)
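
The status field can then be used to route or inspect failures:

# Inspect rows that failed processing
df.filter(F.col("processing_status.status") == "invalid") \
  .select("processing_status.original") \
  .show()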

Multi-field Validation

Validate multiple fields together:

@customer.compose()
def validate_customer_record():
    # Check if all required fields are present
    lambda col: F.when(
        F.col("email").isNotNull() & 
        F.col("phone").isNotNull() & 
        F.col("address").isNotNull(),
        F.lit("complete")
    ).otherwise(F.lit("incomplete"))

Conditional Error Recovery

Apply different recovery strategies based on error type:

@email.compose()
def smart_error_recovery():
    # Try to fix common typos first
    if email.has_common_typo():
        email.fix_typos()
    
    # Then validate and mark if still invalid
    if email.is_valid_email():
        email.normalize()
    else:
        lambda col: F.concat(F.lit("INVALID:"), col)

Compilation Error Handling

DataCompose's PipelineCompiler handles compilation errors gracefully rather than failing the pipeline:

AST Compilation Fallback

The @compose decorator has multiple fallback strategies:

@namespace.compose()
def my_pipeline():
    namespace.transform()

# If AST compilation fails:
# 1. PipelineCompiler attempts advanced compilation
# 2. Falls back to _fallback_compose for sequential extraction
# 3. Ultimate fallback returns identity function

Compilation Warnings

The compiler logs warnings without failing:

# From PipelineCompiler
if self.debug:
    logger.warning(f"Failed to compile '{func.__name__}': {e}")
    logger.debug("Compilation error details:", exc_info=True)
# Returns empty pipeline as fallback
return StablePipeline([], self.debug)

Step Validation

Each CompiledStep validates itself:

# CompiledStep validation
def validate(self):
    if self.step_type == "transform":
        if not callable(self.action):
            raise ValueError("Transform step requires callable action")
    elif self.step_type == "conditional":
        if not callable(self.condition):
            raise ValueError("Conditional step requires callable condition")

Runtime Error Handling

StablePipeline Execution

The StablePipeline executor handles errors during execution:

def _execute_steps(self, steps, col):
    result = col
    for step in steps:
        if self.debug:
            # Log each step for debugging
            self.logger.debug(f"Executing step: {step_name}")

        if step.step_type == "transform":
            if callable(step.action):
                result = step.action(result)
        # Gracefully handles non-callable actions

Fallback Behavior

When compilation completely fails:

# Ultimate fallback - identity function
def pipeline(col):
    return col  # Returns input unchanged

pipeline.__doc__ = f"Failed to compile {func.__name__}"
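
Because the fallback overwrites the docstring, a quick way to detect a silently failed compilation in tests is to check for it (a sketch based on the behavior above):

# Hypothetical check: fail fast if the pipeline fell back to the identity function
assert not (my_pipeline.__doc__ or "").startswith("Failed to compile"), \
    "Pipeline compilation failed; inspect compiler warnings"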

Debugging Failed Transformations

Enable debug mode to trace issues:

@text.compose()
def debug_pipeline():
    text.trim()
    lambda col: F.when(F.length(col) == 0, F.lit("EMPTY_AFTER_TRIM")).otherwise(col)
    text.lowercase()
    text.validate()

# Debug output shows:
# - Successful compilation message with step count
# - Each step execution with name
# - Any compilation warnings or errors

Debug Logging Levels

import logging

# Enable detailed debugging
logging.getLogger("datacompose.operators.primitives").setLevel(logging.DEBUG)

# Now compilation and execution will show:
# - AST parsing attempts
# - Fallback strategies used
# - Step-by-step execution traces
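
If no logging handler is configured, a minimal setup makes those messages visible (standard Python logging, shown as a sketch):

# Route DEBUG records to the console with timestamps
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
logging.getLogger("datacompose.operators.primitives").setLevel(logging.DEBUG)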