Composition (Experimental)

DataCompose's composition system enables you to build complex data transformation pipelines from simple primitives. The @compose decorator provides a declarative syntax that improves code readability while maintaining performance.

Why Composition Matters

Building data pipelines traditionally requires nested function calls or intermediate variables that clutter code and reduce maintainability. Composition solves this by providing a linear, readable syntax for transformation sequences.

The @compose Decorator

The @compose decorator transforms declarative function bodies into efficient transformation pipelines:

# Using @compose decorator
@text.compose()
def clean_text():
    text.trim()
    text.lowercase()
    text.remove_special_chars()

# Apply the composed pipeline
df.select(clean_text(F.col("input")))

# This is exactly equivalent to:
df.select(
    text.remove_special_chars(
        text.lowercase(
            text.trim(F.col("input"))
        )
    )
)

# Or using sequential application:
col = F.col("input")
col = text.trim(col)
col = text.lowercase(col)
col = text.remove_special_chars(col)
df.select(col)

The decorator is a convenience wrapper—it produces the same execution as manual chaining but with better readability.

Choosing Your Style: Decorator vs Traditional

Both approaches are equally valid - it’s entirely your choice! DataCompose supports both decorator-based composition and traditional function style:

# Option 1: Using @compose decorator (declarative style)
@text.compose()
def clean():
    text.trim()
    text.lowercase()
    text.remove_special_chars()

# Option 2: Traditional function style
def clean(col):
    col = text.trim(col)
    col = text.lowercase(col)
    col = text.remove_special_chars(col)
    return col

# Both work exactly the same way:
df.select(clean(F.col("input")))  # Same result with either approach

When to Use Each Style

Use the @compose decorator when:

  • You prefer declarative, readable syntax
  • You are building reusable transformation pipelines
  • You are working with conditional logic (if/else statements)
  • You are composing multiple namespaces together

Use traditional functions when:

  • You’re more comfortable with explicit function calls
  • You’re integrating with an existing codebase
  • You need full control over the transformation flow
  • You’re debugging specific transformation steps

More Examples: Both Styles

# Email validation - Decorator style
@email.compose()
def validate_email():
    email.lowercase()
    email.trim()
    if email.is_valid():
        email.extract_domain()
    else:
        email.mark_invalid()

# Email validation - Traditional style
def validate_email(col):
    col = email.lowercase(col)
    col = email.trim(col)
    col = F.when(
        email.is_valid(col),
        email.extract_domain(col)
    ).otherwise(
        email.mark_invalid(col)
    )
    return col

# Phone formatting - Decorator style
@phone.compose()
def format_phone():
    phone.remove_non_digits()
    phone.validate_length()
    phone.format_e164()

# Phone formatting - Traditional style
def format_phone(col):
    col = phone.remove_non_digits(col)
    col = phone.validate_length(col)
    col = phone.format_e164(col)
    return col

Remember: There’s no performance difference between the two approaches. Choose the style that makes your code most maintainable for your team.

Under the Hood

Pipeline Compilation Process

DataCompose uses AST (Abstract Syntax Tree) analysis to compile each pipeline once, at definition time:

  1. Static Analysis - Parses the function’s AST to extract transformation sequences
  2. Step Compilation - Converts each primitive call into a CompiledStep object
  3. Runtime Execution - StablePipeline executor applies transformations efficiently

Performance note: Compilation happens once when the pipeline is defined, not during execution.
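
For intuition, here is a minimal sketch of how such a compiler could be structured. It is illustrative only: the CompiledStep and compile_pipeline shown here are simplified stand-ins, not DataCompose's actual internals.

# Illustrative sketch of AST-based compilation (not DataCompose's real implementation)
import ast
import inspect
from dataclasses import dataclass

@dataclass
class CompiledStep:
    """Simplified stand-in: one primitive call extracted from the function body."""
    name: str

def compile_pipeline(fn):
    """Parse fn's source once and collect its primitive calls in order."""
    tree = ast.parse(inspect.getsource(fn))
    func_def = tree.body[0]  # the decorated FunctionDef
    steps = []
    for stmt in func_def.body:
        # Bare calls like text.trim() appear as expression statements
        if isinstance(stmt, ast.Expr) and isinstance(stmt.value, ast.Call):
            call = stmt.value
            if isinstance(call.func, ast.Attribute):
                steps.append(CompiledStep(call.func.attr))  # e.g. "trim"
    return steps  # an executor later applies each step to the input column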

Execution Order Matters

Transformations execute sequentially in the order they appear:

@email.compose()
def process_email():
    email.lowercase()        # Step 1: Normalize case
    email.trim()             # Step 2: Remove whitespace
    email.validate()         # Step 3: Verify format
    email.extract_domain()   # Step 4: Extract domain part

Best practice: Order transformations from general to specific—clean before validate, validate before extract.

Advanced Features

Conditional Branching (Experimental)

Composition supports conditional logic within pipelines using if/else statements:

@validation.compose()
def process_email():
    if validation.is_valid_email():
        email.normalize()
        email.extract_domain()
    else:
        email.mark_invalid()

Implementation detail: Conditions compile to Spark’s when/otherwise expressions, maintaining lazy evaluation.
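
For intuition, the conditional pipeline above corresponds roughly to the following hand-written expression (a sketch; the expression DataCompose actually generates may differ in detail):

# Rough hand-written equivalent of the conditional branch above
col = F.col("email")
result = F.when(
    validation.is_valid_email(col),
    email.extract_domain(email.normalize(col))
).otherwise(
    email.mark_invalid(col)
)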

Parameterized Pipelines

Configure transformations with specific parameters:

@text.compose()
def clean_with_options():
    text.trim(chars=' \t\n')
    text.truncate(max_length=100)
    text.lowercase()

# Parameters are baked into the pipeline
df.select(clean_with_options(F.col("description")))

Use case: Create specialized pipelines for different data sources without code duplication.
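
For example, the same configured pipeline can be reused across columns (the column names below are illustrative):

# Reuse one parameterized pipeline on several columns
df.select(
    clean_with_options(F.col("description")).alias("description_clean"),
    clean_with_options(F.col("notes")).alias("notes_clean"),
)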

Nested Composition

Build complex pipelines from simpler ones:

@text.compose()
def basic_clean():
    text.trim()
    text.lowercase()

@text.compose()
def full_clean():
    basic_clean()  # Reuse existing pipeline
    text.remove_special_chars()
    text.normalize_unicode()

Design principle: Create small, focused pipelines and compose them for complex workflows.
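
The outer pipeline is then applied like any other (the column name below is illustrative):

df.select(full_clean(F.col("raw_text")).alias("raw_text_clean"))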

Working with Multiple Namespaces

Composition can work across multiple primitive namespaces:

from transformers.pyspark.emails import emails
from transformers.pyspark.phone_numbers import phones
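# The customer namespace is assumed to be imported the same way from your generated transformers code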

@customer.compose()
def clean_contact_info():
    emails.validate()
    emails.normalize()
    phones.format_e164()
    customer.standardize_company_name()

Composition Patterns

Sequential Processing

The most common pattern - apply transformations one after another:

@text.compose()
def standard_text_pipeline():
    text.remove_html_tags()
    text.normalize_whitespace()
    text.trim()
    text.lowercase()

Validation and Cleaning

Combine validation with conditional cleaning:

@email.compose()
def smart_email_clean():
    if validation.has_common_typo():
        email.fix_typos()
    email.normalize()
    email.validate()

Multi-field Processing

Process related fields together:

@customer.compose()
def process_customer_record():
    customer.standardize_first_name()
    customer.standardize_last_name()
    customer.format_full_name()
    customer.create_customer_id()

Inline Lambda Functions

You can include a lambda function directly in the composed function body for one-off transformations:

@text.compose()
def custom_processing():
    text.trim()
    text.lowercase()
    lambda col: F.regexp_replace(col, r'[^\w\s]', '')  # Inline custom transformation
    text.validate_length()

This allows you to include simple, pipeline-specific transformations without creating a full primitive.

Customization Hierarchy

When you need custom transformation logic, follow this recommended approach:

1. Use Primitive Parameters (Preferred)

First, try using the built-in parameters of existing primitives:

@text.compose()
def custom_clean():
    text.trim(chars=' \t\n\r')  # Use parameters for customization
    text.truncate(max_length=50)
    text.replace_pattern(pattern=r'\s+', replacement=' ')

2. Use Inline Lambdas (For Simple Logic)

If parameters aren’t sufficient, add inline lambda functions for one-off transformations:

@text.compose()
def specialized_clean():
    text.trim()
    lambda col: F.regexp_replace(col, r'[^\w\s-]', '')  # Custom regex
    lambda col: F.when(F.length(col) < 3, None).otherwise(col)  # Custom validation
    text.lowercase()

3. Modify Imported Code (For Complex Changes)

Only if the above approaches don’t work, modify the DataCompose code directly:

  • The code is in your transformers directory after running datacompose add
  • You own this code - modify it to fit your exact needs
  • Changes persist since it’s part of your codebase

# In transformers/pyspark/utils.py - modify existing primitives directly
@text.register()
def custom_normalize(col):
    """Your completely custom implementation"""
    return your_custom_logic(col)
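
Once registered, the custom primitive can be used inside compositions like any built-in (a sketch, assuming the registration shown above):

@text.compose()
def clean_with_custom():
    text.trim()
    text.custom_normalize()  # the primitive registered above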

Best Practices for Composition

  1. Keep pipelines focused - Each pipeline should have a clear, single purpose
  2. Use descriptive names - Pipeline names should indicate what transformation they perform
  3. Order matters - Consider the sequence of transformations carefully
  4. Test pipelines - Composed pipelines should be tested as complete units
  5. Document complex logic - Add docstrings explaining the pipeline’s purpose and logic
  6. Avoid deep nesting - Break complex pipelines into smaller, reusable components
  7. Follow the customization hierarchy - Use parameters → lambdas → code modification (in that order)

Performance Considerations

  • Compilation is one-time - The AST analysis happens only when the pipeline is defined
  • No runtime overhead - Composed pipelines execute as efficiently as manual function chaining
  • Lazy evaluation - Spark’s lazy evaluation means transformations are optimized before execution
  • Catalyst optimizer - Spark’s Catalyst optimizer can further optimize the composed transformations

Debugging Composed Pipelines

Enable debug mode to see each transformation as it’s applied:

@text.compose()
def debug_pipeline():
    text.trim()
    text.lowercase()
    text.validate_length()

# Debug output will show each step during execution
df.select(debug_pipeline(F.col("input")))