
Optimizers Reference

Query optimization strategies.

Optimizer

Optimizer

Bases: ABC

Base class for all query optimizers

Each optimizer implements a single optimization rule. Optimizers are applied in a pipeline before query execution.

Source code in sqlstream/optimizers/base.py
class Optimizer(ABC):
    """
    Base class for all query optimizers

    Each optimizer implements a single optimization rule.
    Optimizers are applied in a pipeline before query execution.
    """

    def __init__(self):
        """Initialize optimizer"""
        self.applied = False
        self.description = ""

    @abstractmethod
    def can_optimize(self, ast: SelectStatement, reader: BaseReader) -> bool:
        """
        Check if this optimization can be applied

        Args:
            ast: Parsed SQL statement
            reader: Data source reader

        Returns:
            True if optimization is applicable
        """
        pass

    @abstractmethod
    def optimize(self, ast: SelectStatement, reader: BaseReader) -> None:
        """
        Apply the optimization

        Args:
            ast: Parsed SQL statement
            reader: Data source reader

        Modifies:
            - reader: Sets optimization hints
            - self.applied: Marks optimization as applied
            - self.description: Describes what was optimized
        """
        pass

    @abstractmethod
    def get_name(self) -> str:
        """
        Get the name of this optimizer

        Returns:
            Human-readable optimizer name
        """
        pass

    def get_description(self) -> str:
        """
        Get description of what was optimized

        Returns:
            Description string if applied, empty string otherwise
        """
        return self.description if self.applied else ""

    def was_applied(self) -> bool:
        """
        Check if optimization was applied

        Returns:
            True if optimization was applied
        """
        return self.applied
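
A minimal sketch of how a concrete rule plugs into this contract. The class and its sampling policy are hypothetical, but the reader.supports_limit() / reader.set_limit() hooks and the ast.limit attribute are the same ones used by LimitPushdownOptimizer later on this page.

from sqlstream.optimizers.base import Optimizer


class HeadSamplingOptimizer(Optimizer):
    """Cap exploratory reads at a fixed number of rows (illustrative only)."""

    SAMPLE_SIZE = 10_000

    def get_name(self) -> str:
        return "Head sampling"

    def can_optimize(self, ast, reader) -> bool:
        # Applies only when the reader can stop early and the query
        # does not already carry its own LIMIT
        return reader.supports_limit() and ast.limit is None

    def optimize(self, ast, reader) -> None:
        reader.set_limit(self.SAMPLE_SIZE)
        self.applied = True
        self.description = f"sampled first {self.SAMPLE_SIZE} rows"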

__init__

__init__()

Initialize optimizer

Source code in sqlstream/optimizers/base.py
def __init__(self):
    """Initialize optimizer"""
    self.applied = False
    self.description = ""

can_optimize abstractmethod

can_optimize(ast: SelectStatement, reader: BaseReader) -> bool

Check if this optimization can be applied

Parameters:

  ast (SelectStatement, required): Parsed SQL statement
  reader (BaseReader, required): Data source reader

Returns:

  bool: True if optimization is applicable

Source code in sqlstream/optimizers/base.py
@abstractmethod
def can_optimize(self, ast: SelectStatement, reader: BaseReader) -> bool:
    """
    Check if this optimization can be applied

    Args:
        ast: Parsed SQL statement
        reader: Data source reader

    Returns:
        True if optimization is applicable
    """
    pass

optimize abstractmethod

optimize(ast: SelectStatement, reader: BaseReader) -> None

Apply the optimization

Parameters:

  ast (SelectStatement, required): Parsed SQL statement
  reader (BaseReader, required): Data source reader

Modifies
  • reader: Sets optimization hints
  • self.applied: Marks optimization as applied
  • self.description: Describes what was optimized
Source code in sqlstream/optimizers/base.py
@abstractmethod
def optimize(self, ast: SelectStatement, reader: BaseReader) -> None:
    """
    Apply the optimization

    Args:
        ast: Parsed SQL statement
        reader: Data source reader

    Modifies:
        - reader: Sets optimization hints
        - self.applied: Marks optimization as applied
        - self.description: Describes what was optimized
    """
    pass

get_name abstractmethod

get_name() -> str

Get the name of this optimizer

Returns:

  str: Human-readable optimizer name

Source code in sqlstream/optimizers/base.py
@abstractmethod
def get_name(self) -> str:
    """
    Get the name of this optimizer

    Returns:
        Human-readable optimizer name
    """
    pass

get_description

get_description() -> str

Get description of what was optimized

Returns:

  str: Description string if applied, empty string otherwise

Source code in sqlstream/optimizers/base.py
def get_description(self) -> str:
    """
    Get description of what was optimized

    Returns:
        Description string if applied, empty string otherwise
    """
    return self.description if self.applied else ""

was_applied

was_applied() -> bool

Check if optimization was applied

Returns:

  bool: True if optimization was applied

Source code in sqlstream/optimizers/base.py
def was_applied(self) -> bool:
    """
    Check if optimization was applied

    Returns:
        True if optimization was applied
    """
    return self.applied

OptimizerPipeline

OptimizerPipeline

Pipeline that applies multiple optimizers in sequence

Optimizers are applied in order, and each can build on the previous optimizations.

Source code in sqlstream/optimizers/base.py
class OptimizerPipeline:
    """
    Pipeline that applies multiple optimizers in sequence

    Optimizers are applied in order, and each can build on
    the previous optimizations.
    """

    def __init__(self, optimizers: list[Optimizer]):
        """
        Initialize pipeline

        Args:
            optimizers: List of optimizers to apply in order
        """
        self.optimizers = optimizers

    def optimize(self, ast: SelectStatement, reader: BaseReader) -> None:
        """
        Apply all optimizers in sequence

        Args:
            ast: Parsed SQL statement
            reader: Data source reader
        """
        for optimizer in self.optimizers:
            if optimizer.can_optimize(ast, reader):
                optimizer.optimize(ast, reader)

    def get_applied_optimizations(self) -> list[str]:
        """
        Get list of optimizations that were applied

        Returns:
            List of optimization descriptions
        """
        return [
            f"{opt.get_name()}: {opt.get_description()}"
            for opt in self.optimizers
            if opt.was_applied()
        ]

    def get_summary(self) -> str:
        """
        Get summary of all applied optimizations

        Returns:
            Human-readable summary
        """
        applied = self.get_applied_optimizations()

        if not applied:
            return "No optimizations applied"

        summary = "Optimizations applied:\n"
        for opt_desc in applied:
            summary += f"  - {opt_desc}\n"

        return summary.strip()
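
A short sketch of assembling and running a pipeline by hand; obtaining ast and reader from a parsed query is assumed. The module paths match the "Source code in ..." locations on this page.

from sqlstream.optimizers.base import OptimizerPipeline
from sqlstream.optimizers.column_pruning import ColumnPruningOptimizer
from sqlstream.optimizers.limit_pushdown import LimitPushdownOptimizer
from sqlstream.optimizers.partition_pruning import PartitionPruningOptimizer

pipeline = OptimizerPipeline(
    [
        PartitionPruningOptimizer(),
        ColumnPruningOptimizer(),
        LimitPushdownOptimizer(),
    ]
)

pipeline.optimize(ast, reader)  # mutates the reader with optimization hints
print(pipeline.get_summary())
# Optimizations applied:
#   - Column pruning: 2 column(s) selected
#   - Limit pushdown: limit 10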

__init__

__init__(optimizers: list[Optimizer])

Initialize pipeline

Parameters:

  optimizers (list[Optimizer], required): List of optimizers to apply in order

Source code in sqlstream/optimizers/base.py
def __init__(self, optimizers: list[Optimizer]):
    """
    Initialize pipeline

    Args:
        optimizers: List of optimizers to apply in order
    """
    self.optimizers = optimizers

optimize

optimize(ast: SelectStatement, reader: BaseReader) -> None

Apply all optimizers in sequence

Parameters:

  ast (SelectStatement, required): Parsed SQL statement
  reader (BaseReader, required): Data source reader

Source code in sqlstream/optimizers/base.py
def optimize(self, ast: SelectStatement, reader: BaseReader) -> None:
    """
    Apply all optimizers in sequence

    Args:
        ast: Parsed SQL statement
        reader: Data source reader
    """
    for optimizer in self.optimizers:
        if optimizer.can_optimize(ast, reader):
            optimizer.optimize(ast, reader)

get_applied_optimizations

get_applied_optimizations() -> list[str]

Get list of optimizations that were applied

Returns:

  list[str]: List of optimization descriptions

Source code in sqlstream/optimizers/base.py
def get_applied_optimizations(self) -> list[str]:
    """
    Get list of optimizations that were applied

    Returns:
        List of optimization descriptions
    """
    return [
        f"{opt.get_name()}: {opt.get_description()}"
        for opt in self.optimizers
        if opt.was_applied()
    ]

get_summary

get_summary() -> str

Get summary of all applied optimizations

Returns:

  str: Human-readable summary

Source code in sqlstream/optimizers/base.py
def get_summary(self) -> str:
    """
    Get summary of all applied optimizations

    Returns:
        Human-readable summary
    """
    applied = self.get_applied_optimizations()

    if not applied:
        return "No optimizations applied"

    summary = "Optimizations applied:\n"
    for opt_desc in applied:
        summary += f"  - {opt_desc}\n"

    return summary.strip()

ColumnPruningOptimizer

ColumnPruningOptimizer

Bases: Optimizer

Prune (skip reading) unused columns

Benefits:

  - Massive I/O reduction for wide tables
  - Reduces memory usage
  - Critical for columnar formats (Parquet, ORC)
  - Can read 10x faster if selecting 1 column from 10

Example

SELECT name, age FROM employees -- 100 columns total

Without pruning: Read all 100 columns → Project 2
With pruning: Read only 2 columns → Much faster

Source code in sqlstream/optimizers/column_pruning.py
class ColumnPruningOptimizer(Optimizer):
    """
    Prune (skip reading) unused columns

    Benefits:
    - Massive I/O reduction for wide tables
    - Reduces memory usage
    - Critical for columnar formats (Parquet, ORC)
    - Can read 10x faster if selecting 1 column from 10

    Example:
        SELECT name, age FROM employees  -- 100 columns total

        Without pruning: Read all 100 columns → Project 2
        With pruning: Read only 2 columns → Much faster
    """

    def get_name(self) -> str:
        return "Column pruning"

    def can_optimize(self, ast: SelectStatement, reader: BaseReader) -> bool:
        """
        Check if column pruning is applicable

        Conditions:
        1. Reader supports column selection
        2. Not SELECT * (can't prune if all columns needed)

        Args:
            ast: Parsed SQL statement
            reader: Data source reader

        Returns:
            True if optimization can be applied
        """
        # Reader must support column selection
        if not reader.supports_column_selection():
            return False

        # Can't prune with SELECT *
        if "*" in ast.columns:
            return False

        return True

    def optimize(self, ast: SelectStatement, reader: BaseReader) -> None:
        """
        Apply column pruning optimization

        Args:
            ast: Parsed SQL statement
            reader: Data source reader
        """
        # Analyze which columns are actually needed
        needed_columns = self._analyze_column_dependencies(ast)

        # Don't apply if SELECT * found during analysis
        if "*" in needed_columns:
            return

        reader.set_columns(needed_columns)
        self.applied = True
        self.description = f"{len(needed_columns)} column(s) selected"

    def _analyze_column_dependencies(self, ast: SelectStatement) -> list[str]:
        """
        Determine which columns are needed for the query

        Columns are needed if they appear in:
        - SELECT clause
        - WHERE clause
        - GROUP BY clause
        - ORDER BY clause
        - Aggregate functions
        - JOIN conditions

        Args:
            ast: Parsed SQL statement

        Returns:
            List of required column names (or ['*'] for all)
        """
        needed: set[str] = set()

        # Columns from SELECT clause
        if "*" in ast.columns:
            return ["*"]  # Can't prune if SELECT *

        needed.update(ast.columns)

        # Columns from WHERE clause
        if ast.where:
            for condition in ast.where.conditions:
                needed.add(condition.column)

        # Columns from GROUP BY clause
        if ast.group_by:
            needed.update(ast.group_by)

        # Columns from ORDER BY clause
        if ast.order_by:
            for order_col in ast.order_by:
                needed.add(order_col.column)

        # Columns from aggregate functions
        if ast.aggregates:
            for agg in ast.aggregates:
                if agg.column != "*":  # COUNT(*) doesn't need a column
                    needed.add(agg.column)

        # Columns from JOIN conditions
        if ast.join:
            # Need the left join key from the left table
            needed.add(ast.join.on_left)
            # Note: right join key is from right table, handled separately

        return list(needed)
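
An illustrative walk-through of the dependency analysis for SELECT name FROM employees WHERE age > 30 ORDER BY salary, assuming the parser produced ast.columns == ["name"], one WHERE condition on "age", and one ORDER BY key on "salary":

opt = ColumnPruningOptimizer()
if opt.can_optimize(ast, reader):
    opt.optimize(ast, reader)

print(opt.get_description())  # "3 column(s) selected"
# The reader is now instructed to read only {"name", "age", "salary"}
# instead of every column in the file.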

can_optimize

can_optimize(ast: SelectStatement, reader: BaseReader) -> bool

Check if column pruning is applicable

Conditions:

  1. Reader supports column selection
  2. Not SELECT * (can't prune if all columns are needed)

Parameters:

  ast (SelectStatement, required): Parsed SQL statement
  reader (BaseReader, required): Data source reader

Returns:

  bool: True if optimization can be applied

Source code in sqlstream/optimizers/column_pruning.py
def can_optimize(self, ast: SelectStatement, reader: BaseReader) -> bool:
    """
    Check if column pruning is applicable

    Conditions:
    1. Reader supports column selection
    2. Not SELECT * (can't prune if all columns needed)

    Args:
        ast: Parsed SQL statement
        reader: Data source reader

    Returns:
        True if optimization can be applied
    """
    # Reader must support column selection
    if not reader.supports_column_selection():
        return False

    # Can't prune with SELECT *
    if "*" in ast.columns:
        return False

    return True

optimize

optimize(ast: SelectStatement, reader: BaseReader) -> None

Apply column pruning optimization

Parameters:

  ast (SelectStatement, required): Parsed SQL statement
  reader (BaseReader, required): Data source reader

Source code in sqlstream/optimizers/column_pruning.py
def optimize(self, ast: SelectStatement, reader: BaseReader) -> None:
    """
    Apply column pruning optimization

    Args:
        ast: Parsed SQL statement
        reader: Data source reader
    """
    # Analyze which columns are actually needed
    needed_columns = self._analyze_column_dependencies(ast)

    # Don't apply if SELECT * found during analysis
    if "*" in needed_columns:
        return

    reader.set_columns(needed_columns)
    self.applied = True
    self.description = f"{len(needed_columns)} column(s) selected"

ColumnStatistics

ColumnStatistics dataclass

Statistics about a single column

Attributes:

  distinct_count (int): Number of distinct values (cardinality)
  null_count (int): Number of NULL values
  min_value (Any): Minimum value
  max_value (Any): Maximum value
  avg_length (float): Average value length (for strings)

Source code in sqlstream/optimizers/cost_based.py
@dataclass
class ColumnStatistics:
    """
    Statistics about a single column

    Attributes:
        distinct_count: Number of distinct values (cardinality)
        null_count: Number of NULL values
        min_value: Minimum value
        max_value: Maximum value
        avg_length: Average value length (for strings)
    """

    distinct_count: int = 0
    null_count: int = 0
    min_value: Any = None
    max_value: Any = None
    avg_length: float = 0.0

CostBasedOptimizer

CostBasedOptimizer

Bases: Optimizer

Cost-based optimization framework

This is a meta-optimizer that provides infrastructure for cost-based decisions in other optimizers.

Benefits:

  - Statistics-driven decisions
  - Better join ordering
  - Better index selection (future)
  - Adaptive query execution (future)

Note

This is a framework/placeholder. Real cost-based optimization requires statistics collection, which is expensive. For now, we just provide the infrastructure and simple cost models.

Source code in sqlstream/optimizers/cost_based.py
class CostBasedOptimizer(Optimizer):
    """
    Cost-based optimization framework

    This is a meta-optimizer that provides infrastructure for
    cost-based decisions in other optimizers.

    Benefits:
    - Statistics-driven decisions
    - Better join ordering
    - Better index selection (future)
    - Adaptive query execution (future)

    Note:
        This is a framework/placeholder. Real cost-based optimization
        requires statistics collection, which is expensive. For now,
        we just provide the infrastructure and simple cost models.
    """

    def __init__(self):
        super().__init__()
        self.statistics_cache: dict[str, TableStatistics] = {}

    def get_name(self) -> str:
        return "Cost-based optimization"

    def can_optimize(self, ast: SelectStatement, reader: BaseReader) -> bool:
        """
        Check if cost-based optimization is applicable

        For now, this is disabled as it requires statistics collection.

        Args:
            ast: Parsed SQL statement
            reader: Data source reader

        Returns:
            False (disabled for now)
        """
        # Cost-based optimization requires:
        # 1. Statistics collection (expensive - need to scan data)
        # 2. Cost models for all operations
        # 3. Plan enumeration and comparison
        # 4. Plan selection

        # This is complex and expensive, so we disable it for now
        return False

    def optimize(self, ast: SelectStatement, reader: BaseReader) -> None:
        """
        Apply cost-based optimizations

        Args:
            ast: Parsed SQL statement
            reader: Data source reader

        Note:
            This is a placeholder for future implementation
        """
        # Future implementation would:
        # 1. Collect or lookup table statistics
        # 2. Estimate costs for different query plans
        # 3. Choose the lowest-cost plan
        # 4. Rewrite AST to execute chosen plan

        self.applied = True
        self.description = "framework ready (not yet active)"

    def collect_statistics(self, reader: BaseReader, sample_size: int = 1000) -> TableStatistics:
        """
        Collect statistics from a data source

        Args:
            reader: Data source to collect stats from
            sample_size: Number of rows to sample (for efficiency)

        Returns:
            Table statistics

        Note:
            This is expensive - requires reading data
            In production, stats would be cached and updated periodically
        """
        stats = TableStatistics()

        # Sample rows
        rows_sampled = 0
        column_values: dict[str, set] = {}
        null_counts: dict[str, int] = {}

        for row in reader.read_lazy():
            rows_sampled += 1

            # Track distinct values and NULL counts per column
            for col, value in row.items():
                if col not in column_values:
                    column_values[col] = set()
                    null_counts[col] = 0
                column_values[col].add(value)
                if value is None:
                    null_counts[col] += 1

            if rows_sampled >= sample_size:
                break

        # Estimate total row count (extrapolate from sample)
        # This is a rough estimate - real implementation would use metadata
        stats.row_count = rows_sampled

        # Calculate column statistics
        for col, values in column_values.items():
            col_stats = ColumnStatistics(
                distinct_count=len(values),
                # Count NULLs per sampled row; counting over the distinct
                # set would report at most one NULL
                null_count=null_counts[col],
            )

            # Calculate min/max if comparable
            non_null_values = [v for v in values if v is not None]
            if non_null_values:
                try:
                    col_stats.min_value = min(non_null_values)
                    col_stats.max_value = max(non_null_values)
                except TypeError:
                    # Values not comparable
                    pass

            stats.column_stats[col] = col_stats

        return stats

can_optimize

can_optimize(ast: SelectStatement, reader: BaseReader) -> bool

Check if cost-based optimization is applicable

For now, this is disabled as it requires statistics collection.

Parameters:

  ast (SelectStatement, required): Parsed SQL statement
  reader (BaseReader, required): Data source reader

Returns:

  bool: False (disabled for now)

Source code in sqlstream/optimizers/cost_based.py
def can_optimize(self, ast: SelectStatement, reader: BaseReader) -> bool:
    """
    Check if cost-based optimization is applicable

    For now, this is disabled as it requires statistics collection.

    Args:
        ast: Parsed SQL statement
        reader: Data source reader

    Returns:
        False (disabled for now)
    """
    # Cost-based optimization requires:
    # 1. Statistics collection (expensive - need to scan data)
    # 2. Cost models for all operations
    # 3. Plan enumeration and comparison
    # 4. Plan selection

    # This is complex and expensive, so we disable it for now
    return False

optimize

optimize(ast: SelectStatement, reader: BaseReader) -> None

Apply cost-based optimizations

Parameters:

  ast (SelectStatement, required): Parsed SQL statement
  reader (BaseReader, required): Data source reader

Note

This is a placeholder for future implementation

Source code in sqlstream/optimizers/cost_based.py
def optimize(self, ast: SelectStatement, reader: BaseReader) -> None:
    """
    Apply cost-based optimizations

    Args:
        ast: Parsed SQL statement
        reader: Data source reader

    Note:
        This is a placeholder for future implementation
    """
    # Future implementation would:
    # 1. Collect or lookup table statistics
    # 2. Estimate costs for different query plans
    # 3. Choose the lowest-cost plan
    # 4. Rewrite AST to execute chosen plan

    self.applied = True
    self.description = "framework ready (not yet active)"

collect_statistics

collect_statistics(reader: BaseReader, sample_size: int = 1000) -> TableStatistics

Collect statistics from a data source

Parameters:

  reader (BaseReader, required): Data source to collect stats from
  sample_size (int, default 1000): Number of rows to sample (for efficiency)

Returns:

  TableStatistics: Table statistics

Note

This is expensive: it requires reading the data. In production, stats would be cached and updated periodically.

Source code in sqlstream/optimizers/cost_based.py
def collect_statistics(self, reader: BaseReader, sample_size: int = 1000) -> TableStatistics:
    """
    Collect statistics from a data source

    Args:
        reader: Data source to collect stats from
        sample_size: Number of rows to sample (for efficiency)

    Returns:
        Table statistics

    Note:
        This is expensive - requires reading data
        In production, stats would be cached and updated periodically
    """
    stats = TableStatistics()

    # Sample rows
    rows_sampled = 0
    column_values: dict[str, set] = {}
    null_counts: dict[str, int] = {}

    for row in reader.read_lazy():
        rows_sampled += 1

        # Track distinct values and NULL counts per column
        for col, value in row.items():
            if col not in column_values:
                column_values[col] = set()
                null_counts[col] = 0
            column_values[col].add(value)
            if value is None:
                null_counts[col] += 1

        if rows_sampled >= sample_size:
            break

    # Estimate total row count (extrapolate from sample)
    # This is a rough estimate - real implementation would use metadata
    stats.row_count = rows_sampled

    # Calculate column statistics
    for col, values in column_values.items():
        col_stats = ColumnStatistics(
            distinct_count=len(values),
            # Count NULLs per sampled row; counting over the distinct
            # set would report at most one NULL
            null_count=null_counts[col],
        )

        # Calculate min/max if comparable
        non_null_values = [v for v in values if v is not None]
        if non_null_values:
            try:
                col_stats.min_value = min(non_null_values)
                col_stats.max_value = max(non_null_values)
            except TypeError:
                # Values not comparable
                pass

        stats.column_stats[col] = col_stats

    return stats
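
A sketch of collecting and inspecting sampled statistics; any concrete BaseReader works here, and the "age" column is illustrative.

optimizer = CostBasedOptimizer()
stats = optimizer.collect_statistics(reader, sample_size=500)

print(stats.row_count)               # rows actually sampled (at most 500)
age = stats.column_stats.get("age")  # per-column ColumnStatistics, if present
if age is not None:
    print(age.distinct_count, age.null_count, age.min_value, age.max_value)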

CostModel

CostModel

Cost model for estimating query operation costs

Costs are in abstract units. Lower is better. The goal is to compare different plans, not to predict absolute runtime.

Source code in sqlstream/optimizers/cost_based.py
class CostModel:
    """
    Cost model for estimating query operation costs

    Costs are in abstract units. Lower is better.
    The goal is to compare different plans, not to predict absolute runtime.
    """

    # Cost constants (tunable)
    COST_PER_ROW_SCAN = 1.0  # Cost to read one row
    COST_PER_ROW_FILTER = 0.1  # Cost to evaluate filter on one row
    COST_PER_ROW_PROJECT = 0.05  # Cost to project one row
    COST_PER_ROW_SORT = 2.0  # Cost to sort one row (N log N)
    COST_PER_ROW_HASH = 1.5  # Cost to hash one row (for joins/groups)
    COST_PER_ROW_JOIN = 0.5  # Cost to join one row

    @classmethod
    def estimate_scan_cost(cls, row_count: int) -> float:
        """
        Estimate cost of scanning a table

        Args:
            row_count: Number of rows to scan

        Returns:
            Estimated cost
        """
        return row_count * cls.COST_PER_ROW_SCAN

    @classmethod
    def estimate_filter_cost(cls, row_count: int, selectivity: float = 0.1) -> float:
        """
        Estimate cost of filtering rows

        Args:
            row_count: Number of input rows
            selectivity: Fraction of rows that pass filter (0.0-1.0)

        Returns:
            Estimated cost
        """
        # Cost to evaluate the filter on every input row. Selectivity affects
        # the output cardinality (row_count * selectivity), not the cost of
        # evaluating the filter itself.
        return row_count * cls.COST_PER_ROW_FILTER

    @classmethod
    def estimate_join_cost(cls, left_rows: int, right_rows: int, selectivity: float = 0.1) -> float:
        """
        Estimate cost of hash join

        Args:
            left_rows: Number of rows in left table
            right_rows: Number of rows in right table
            selectivity: Fraction of cartesian product that matches

        Returns:
            Estimated cost
        """
        # Build hash table on smaller table
        build_rows = min(left_rows, right_rows)
        probe_rows = max(left_rows, right_rows)

        # Cost to build hash table
        build_cost = build_rows * cls.COST_PER_ROW_HASH

        # Cost to probe hash table
        probe_cost = probe_rows * cls.COST_PER_ROW_JOIN

        # Output cardinality would be left_rows * right_rows * selectivity;
        # it does not contribute to the cost of performing the join.

        return build_cost + probe_cost

    @classmethod
    def estimate_sort_cost(cls, row_count: int) -> float:
        """
        Estimate cost of sorting

        Args:
            row_count: Number of rows to sort

        Returns:
            Estimated cost (O(N log N))
        """
        import math

        if row_count <= 1:
            return 0.0

        return row_count * math.log2(row_count) * cls.COST_PER_ROW_SORT

    @classmethod
    def estimate_selectivity(
        cls, condition: Condition, stats: ColumnStatistics | None = None
    ) -> float:
        """
        Estimate selectivity of a filter condition

        Args:
            condition: Filter condition
            stats: Column statistics (if available)

        Returns:
            Estimated selectivity (0.0-1.0)

        Note:
            These are rough heuristics. Real databases use histograms.
        """
        op = condition.operator

        # Default selectivities (rough heuristics)
        if op == "=":
            # Equality: depends on cardinality
            if stats and stats.distinct_count > 0:
                return 1.0 / stats.distinct_count
            return 0.1  # Default guess

        elif op in (">", "<"):
            # Range: assume half the rows
            return 0.5

        elif op in (">=", "<="):
            # Range: assume half the rows plus equals
            return 0.5

        elif op == "!=":
            # Not equals: most rows
            if stats and stats.distinct_count > 0:
                return 1.0 - (1.0 / stats.distinct_count)
            return 0.9  # Default guess

        else:
            # Unknown operator
            return 0.5  # Middle ground
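
A worked comparison of two plan shapes using the constants above; the row counts and the 0.1 selectivity are illustrative.

rows_a, rows_b = 1_000_000, 10_000
selectivity = 0.1  # e.g. from CostModel.estimate_selectivity(condition)

# Plan 1: filter table A first, then join the survivors against B
plan1 = (
    CostModel.estimate_scan_cost(rows_a)
    + CostModel.estimate_filter_cost(rows_a, selectivity)
    + CostModel.estimate_join_cost(int(rows_a * selectivity), rows_b)
)

# Plan 2: join first, then filter the joined rows
plan2 = (
    CostModel.estimate_scan_cost(rows_a)
    + CostModel.estimate_join_cost(rows_a, rows_b)
    + CostModel.estimate_filter_cost(rows_a)
)

print(plan1 < plan2)  # True: filtering early shrinks the join input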

estimate_scan_cost classmethod

estimate_scan_cost(row_count: int) -> float

Estimate cost of scanning a table

Parameters:

  row_count (int, required): Number of rows to scan

Returns:

  float: Estimated cost

Source code in sqlstream/optimizers/cost_based.py
@classmethod
def estimate_scan_cost(cls, row_count: int) -> float:
    """
    Estimate cost of scanning a table

    Args:
        row_count: Number of rows to scan

    Returns:
        Estimated cost
    """
    return row_count * cls.COST_PER_ROW_SCAN

estimate_filter_cost classmethod

estimate_filter_cost(row_count: int, selectivity: float = 0.1) -> float

Estimate cost of filtering rows

Parameters:

  row_count (int, required): Number of input rows
  selectivity (float, default 0.1): Fraction of rows that pass filter (0.0-1.0)

Returns:

  float: Estimated cost

Source code in sqlstream/optimizers/cost_based.py
@classmethod
def estimate_filter_cost(cls, row_count: int, selectivity: float = 0.1) -> float:
    """
    Estimate cost of filtering rows

    Args:
        row_count: Number of input rows
        selectivity: Fraction of rows that pass filter (0.0-1.0)

    Returns:
        Estimated cost
    """
    # Cost to evaluate the filter on every input row. Selectivity affects
    # the output cardinality (row_count * selectivity), not the cost of
    # evaluating the filter itself.
    return row_count * cls.COST_PER_ROW_FILTER

estimate_join_cost classmethod

estimate_join_cost(left_rows: int, right_rows: int, selectivity: float = 0.1) -> float

Estimate cost of hash join

Parameters:

  left_rows (int, required): Number of rows in left table
  right_rows (int, required): Number of rows in right table
  selectivity (float, default 0.1): Fraction of cartesian product that matches

Returns:

  float: Estimated cost

Source code in sqlstream/optimizers/cost_based.py
@classmethod
def estimate_join_cost(cls, left_rows: int, right_rows: int, selectivity: float = 0.1) -> float:
    """
    Estimate cost of hash join

    Args:
        left_rows: Number of rows in left table
        right_rows: Number of rows in right table
        selectivity: Fraction of cartesian product that matches

    Returns:
        Estimated cost
    """
    # Build hash table on smaller table
    build_rows = min(left_rows, right_rows)
    probe_rows = max(left_rows, right_rows)

    # Cost to build hash table
    build_cost = build_rows * cls.COST_PER_ROW_HASH

    # Cost to probe hash table
    probe_cost = probe_rows * cls.COST_PER_ROW_JOIN

    # Output cardinality would be left_rows * right_rows * selectivity;
    # it does not contribute to the cost of performing the join.

    return build_cost + probe_cost

estimate_sort_cost classmethod

estimate_sort_cost(row_count: int) -> float

Estimate cost of sorting

Parameters:

  row_count (int, required): Number of rows to sort

Returns:

  float: Estimated cost (O(N log N))

Source code in sqlstream/optimizers/cost_based.py
@classmethod
def estimate_sort_cost(cls, row_count: int) -> float:
    """
    Estimate cost of sorting

    Args:
        row_count: Number of rows to sort

    Returns:
        Estimated cost (O(N log N))
    """
    import math

    if row_count <= 1:
        return 0.0

    return row_count * math.log2(row_count) * cls.COST_PER_ROW_SORT

estimate_selectivity classmethod

estimate_selectivity(condition: Condition, stats: ColumnStatistics | None = None) -> float

Estimate selectivity of a filter condition

Parameters:

  condition (Condition, required): Filter condition
  stats (ColumnStatistics | None, default None): Column statistics (if available)

Returns:

  float: Estimated selectivity (0.0-1.0)

Note

These are rough heuristics. Real databases use histograms.

Source code in sqlstream/optimizers/cost_based.py
@classmethod
def estimate_selectivity(
    cls, condition: Condition, stats: ColumnStatistics | None = None
) -> float:
    """
    Estimate selectivity of a filter condition

    Args:
        condition: Filter condition
        stats: Column statistics (if available)

    Returns:
        Estimated selectivity (0.0-1.0)

    Note:
        These are rough heuristics. Real databases use histograms.
    """
    op = condition.operator

    # Default selectivities (rough heuristics)
    if op == "=":
        # Equality: depends on cardinality
        if stats and stats.distinct_count > 0:
            return 1.0 / stats.distinct_count
        return 0.1  # Default guess

    elif op in (">", "<"):
        # Range: assume half the rows
        return 0.5

    elif op in (">=", "<="):
        # Range: assume half the rows plus equals
        return 0.5

    elif op == "!=":
        # Not equals: most rows
        if stats and stats.distinct_count > 0:
            return 1.0 - (1.0 / stats.distinct_count)
        return 0.9  # Default guess

    else:
        # Unknown operator
        return 0.5  # Middle ground

TableStatistics

TableStatistics dataclass

Statistics about a table/data source

Attributes:

  row_count (int): Total number of rows
  column_stats (dict[str, ColumnStatistics]): Per-column statistics (cardinality, min/max, nulls)
  size_bytes (int): Approximate size in bytes

Source code in sqlstream/optimizers/cost_based.py
@dataclass
class TableStatistics:
    """
    Statistics about a table/data source

    Attributes:
        row_count: Total number of rows
        column_stats: Per-column statistics (cardinality, min/max, nulls)
        size_bytes: Approximate size in bytes
    """

    row_count: int = 0
    column_stats: dict[str, "ColumnStatistics"] = None
    size_bytes: int = 0

    def __post_init__(self):
        if self.column_stats is None:
            self.column_stats = {}

JoinReorderingOptimizer

JoinReorderingOptimizer

Bases: Optimizer

Reorder joins to minimize intermediate result size

Benefits:

  - Smaller intermediate results = less memory
  - Faster execution (less data to process)
  - Better cache utilization

Strategy:

  - For now: simple heuristic (join smallest first)
  - Future: cost-based with statistics

Example

Tables: A (1M rows), B (100 rows), C (1K rows)

Bad order: A JOIN B JOIN C → (1M × 100) JOIN C = huge intermediate result

Good order: B JOIN C JOIN A → (100 × 1K) JOIN A = smaller intermediate result

Note

This is a placeholder implementation. Full join reordering requires table statistics and is complex. For now, we just track that joins could be reordered.

Source code in sqlstream/optimizers/join_reordering.py
class JoinReorderingOptimizer(Optimizer):
    """
    Reorder joins to minimize intermediate result size

    Benefits:
    - Smaller intermediate results = less memory
    - Faster execution (less data to process)
    - Better cache utilization

    Strategy:
    - For now: Simple heuristic (join smallest first)
    - Future: Cost-based with statistics

    Example:
        Tables: A (1M rows), B (100 rows), C (1K rows)

        Bad order:  A JOIN B JOIN C
        → (1M × 100) JOIN C = huge intermediate result

        Good order: B JOIN C JOIN A
        → (100 × 1K) JOIN A = smaller intermediate result

    Note:
        This is a placeholder implementation. Full join reordering
        requires table statistics and is complex. For now, we
        just track that joins could be reordered.
    """

    def get_name(self) -> str:
        return "Join reordering"

    def can_optimize(self, ast: SelectStatement, reader: BaseReader) -> bool:
        """
        Check if join reordering is applicable

        Conditions:
        1. Query has JOIN clause
        2. No circular dependencies in join conditions
        3. All joins are inner joins (outer joins have order constraints)

        Args:
            ast: Parsed SQL statement
            reader: Data source reader

        Returns:
            True if optimization can be applied
        """
        # Must have a join
        if not ast.join:
            return False

        # For now, we don't actually reorder (placeholder)
        # Full implementation would need:
        # - Table statistics (row counts)
        # - Join selectivity estimation
        # - Graph analysis to find optimal order
        # - Preservation of join semantics

        # This is a marker that join reordering could be applied
        # but we don't implement it yet to avoid breaking correctness

        return False  # Disabled for now

    def optimize(self, ast: SelectStatement, reader: BaseReader) -> None:
        """
        Apply join reordering optimization

        Args:
            ast: Parsed SQL statement
            reader: Data source reader

        Note:
            This is a placeholder. Real implementation would:
            1. Collect table statistics (row counts)
            2. Estimate join selectivity
            3. Build join graph
            4. Find optimal join order (dynamic programming or greedy)
            5. Rewrite AST with new join order
            6. Preserve join semantics (INNER vs OUTER)
        """
        # Placeholder - actual implementation would reorder joins here
        # For now, just mark as applied if we detected potential
        self.applied = True
        self.description = "placeholder (not yet implemented)"

    def _analyze_join_graph(self, ast: SelectStatement) -> list[tuple[str, str]]:
        """
        Analyze join graph to understand table relationships

        Args:
            ast: Parsed SQL statement

        Returns:
            List of (left_table, right_table) pairs

        Note:
            This is a helper for future implementation
        """
        edges = []

        if ast.join:
            # Extract join relationships
            # This would need to parse join conditions to build graph
            pass

        return edges

    def _estimate_join_cost(
        self, left_table: str, right_table: str, selectivity: float = 0.1
    ) -> float:
        """
        Estimate cost of joining two tables

        Args:
            left_table: Name of left table
            right_table: Name of right table
            selectivity: Estimated fraction of rows that match (0.0-1.0)

        Returns:
            Estimated cost (lower is better)

        Note:
            This is a helper for future implementation
            Real implementation would use actual table statistics
        """
        # Placeholder - would need real table statistics
        # Cost = (left_size * right_size) * selectivity
        return 0.0

    def _find_optimal_join_order(
        self, tables: list[str], join_graph: list[tuple[str, str]]
    ) -> list[str]:
        """
        Find optimal order to join tables

        Args:
            tables: List of table names
            join_graph: List of (table1, table2) join pairs

        Returns:
            Optimal order to join tables

        Note:
            This is a helper for future implementation
            Classic dynamic programming problem (like traveling salesman)
        """
        # Placeholder - would implement DP or greedy algorithm
        # For now, just return original order
        return tables
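
A hedged sketch of the "join smallest first" heuristic mentioned above; the helper name and the table_sizes mapping are assumptions, since a real implementation would pull row counts from table statistics.

def greedy_join_order(tables: list[str], table_sizes: dict[str, int]) -> list[str]:
    """Order tables so the smallest are joined first (illustrative only)."""
    return sorted(tables, key=lambda t: table_sizes.get(t, float("inf")))

order = greedy_join_order(["A", "B", "C"], {"A": 1_000_000, "B": 100, "C": 1_000})
print(order)  # ['B', 'C', 'A'] matches the "good order" in the example above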

can_optimize

can_optimize(ast: SelectStatement, reader: BaseReader) -> bool

Check if join reordering is applicable

Conditions:

  1. Query has JOIN clause
  2. No circular dependencies in join conditions
  3. All joins are inner joins (outer joins have order constraints)

Parameters:

Name Type Description Default
ast SelectStatement

Parsed SQL statement

required
reader BaseReader

Data source reader

required

Returns:

Type Description
bool

True if optimization can be applied

Source code in sqlstream/optimizers/join_reordering.py
def can_optimize(self, ast: SelectStatement, reader: BaseReader) -> bool:
    """
    Check if join reordering is applicable

    Conditions:
    1. Query has JOIN clause
    2. No circular dependencies in join conditions
    3. All joins are inner joins (outer joins have order constraints)

    Args:
        ast: Parsed SQL statement
        reader: Data source reader

    Returns:
        True if optimization can be applied
    """
    # Must have a join
    if not ast.join:
        return False

    # For now, we don't actually reorder (placeholder)
    # Full implementation would need:
    # - Table statistics (row counts)
    # - Join selectivity estimation
    # - Graph analysis to find optimal order
    # - Preservation of join semantics

    # This is a marker that join reordering could be applied
    # but we don't implement it yet to avoid breaking correctness

    return False  # Disabled for now

optimize

optimize(ast: SelectStatement, reader: BaseReader) -> None

Apply join reordering optimization

Parameters:

  ast (SelectStatement, required): Parsed SQL statement
  reader (BaseReader, required): Data source reader

Note

This is a placeholder. A real implementation would:

  1. Collect table statistics (row counts)
  2. Estimate join selectivity
  3. Build join graph
  4. Find optimal join order (dynamic programming or greedy)
  5. Rewrite AST with new join order
  6. Preserve join semantics (INNER vs OUTER)

Source code in sqlstream/optimizers/join_reordering.py
def optimize(self, ast: SelectStatement, reader: BaseReader) -> None:
    """
    Apply join reordering optimization

    Args:
        ast: Parsed SQL statement
        reader: Data source reader

    Note:
        This is a placeholder. Real implementation would:
        1. Collect table statistics (row counts)
        2. Estimate join selectivity
        3. Build join graph
        4. Find optimal join order (dynamic programming or greedy)
        5. Rewrite AST with new join order
        6. Preserve join semantics (INNER vs OUTER)
    """
    # Placeholder - actual implementation would reorder joins here
    # For now, just mark as applied if we detected potential
    self.applied = True
    self.description = "placeholder (not yet implemented)"

LimitPushdownOptimizer

LimitPushdownOptimizer

Bases: Optimizer

Push LIMIT to the reader for early termination

Benefits:

  - Stop reading after N rows
  - Massive speedup for large files
  - Reduces memory usage

Example

SELECT * FROM large_file.csv LIMIT 10

Without pushdown: Read entire file → Take first 10
With pushdown: Stop reading after 10 rows → Much faster

Note

Cannot push down if the query has:

  - ORDER BY (need to see all rows to sort)
  - GROUP BY (need to see all rows to group)
  - Aggregates (need all rows to aggregate)
  - JOIN (complex - may need all rows)

Source code in sqlstream/optimizers/limit_pushdown.py
class LimitPushdownOptimizer(Optimizer):
    """
    Push LIMIT to the reader for early termination

    Benefits:
    - Stop reading after N rows
    - Massive speedup for large files
    - Reduces memory usage

    Example:
        SELECT * FROM large_file.csv LIMIT 10

        Without pushdown: Read entire file → Take first 10
        With pushdown: Stop reading after 10 rows → Much faster

    Note:
        Cannot push down if query has:
        - ORDER BY (need to see all rows to sort)
        - GROUP BY (need to see all rows to group)
        - Aggregates (need all rows to aggregate)
        - JOIN (complex - may need all rows)
    """

    def get_name(self) -> str:
        return "Limit pushdown"

    def can_optimize(self, ast: SelectStatement, reader: BaseReader) -> bool:
        """
        Check if limit pushdown is applicable

        Conditions:
        1. Query has LIMIT clause
        2. Reader supports limit pushdown
        3. No ORDER BY (would need to read all rows first)
        4. No GROUP BY (would need to read all rows first)
        5. No aggregates (would need to read all rows first)
        6. No JOIN (complex - skip for now)

        Args:
            ast: Parsed SQL statement
            reader: Data source reader

        Returns:
            True if optimization can be applied
        """
        # Must have LIMIT
        if ast.limit is None:
            return False

        # Reader must support limit pushdown
        if not reader.supports_limit():
            return False

        # Cannot push down with ORDER BY
        if ast.order_by:
            return False

        # Cannot push down with GROUP BY
        if ast.group_by:
            return False

        # Cannot push down with aggregates
        if ast.aggregates:
            return False

        # Cannot push down with JOIN (for now)
        if ast.join:
            return False

        return True

    def optimize(self, ast: SelectStatement, reader: BaseReader) -> None:
        """
        Apply limit pushdown optimization

        Args:
            ast: Parsed SQL statement
            reader: Data source reader
        """
        reader.set_limit(ast.limit)
        self.applied = True
        self.description = f"limit {ast.limit}"

can_optimize

can_optimize(ast: SelectStatement, reader: BaseReader) -> bool

Check if limit pushdown is applicable

Conditions:

  1. Query has LIMIT clause
  2. Reader supports limit pushdown
  3. No ORDER BY (would need to read all rows first)
  4. No GROUP BY (would need to read all rows first)
  5. No aggregates (would need to read all rows first)
  6. No JOIN (complex - skip for now)

Parameters:

  ast (SelectStatement, required): Parsed SQL statement
  reader (BaseReader, required): Data source reader

Returns:

  bool: True if optimization can be applied

Source code in sqlstream/optimizers/limit_pushdown.py
def can_optimize(self, ast: SelectStatement, reader: BaseReader) -> bool:
    """
    Check if limit pushdown is applicable

    Conditions:
    1. Query has LIMIT clause
    2. Reader supports limit pushdown
    3. No ORDER BY (would need to read all rows first)
    4. No GROUP BY (would need to read all rows first)
    5. No aggregates (would need to read all rows first)
    6. No JOIN (complex - skip for now)

    Args:
        ast: Parsed SQL statement
        reader: Data source reader

    Returns:
        True if optimization can be applied
    """
    # Must have LIMIT
    if ast.limit is None:
        return False

    # Reader must support limit pushdown
    if not reader.supports_limit():
        return False

    # Cannot push down with ORDER BY
    if ast.order_by:
        return False

    # Cannot push down with GROUP BY
    if ast.group_by:
        return False

    # Cannot push down with aggregates
    if ast.aggregates:
        return False

    # Cannot push down with JOIN (for now)
    if ast.join:
        return False

    return True

optimize

optimize(ast: SelectStatement, reader: BaseReader) -> None

Apply limit pushdown optimization

Parameters:

  ast (SelectStatement, required): Parsed SQL statement
  reader (BaseReader, required): Data source reader

Source code in sqlstream/optimizers/limit_pushdown.py
def optimize(self, ast: SelectStatement, reader: BaseReader) -> None:
    """
    Apply limit pushdown optimization

    Args:
        ast: Parsed SQL statement
        reader: Data source reader
    """
    reader.set_limit(ast.limit)
    self.applied = True
    self.description = f"limit {ast.limit}"

PartitionPruningOptimizer

PartitionPruningOptimizer

Bases: Optimizer

Prune (skip) partitions that don't match filter conditions

Benefits:

  - Massive I/O reduction for partitioned datasets
  - Skip entire directories/files
  - Critical for data lakes and big data
  - Can reduce data read by 10x-1000x

Example

Dataset partitioned by date: year=YYYY/month=MM/day=DD/
Query: WHERE date >= '2024-01-01'
→ Skip all partitions before 2024

Source code in sqlstream/optimizers/partition_pruning.py
class PartitionPruningOptimizer(Optimizer):
    """
    Prune (skip) partitions that don't match filter conditions

    Benefits:
    - Massive I/O reduction for partitioned datasets
    - Skip entire directories/files
    - Critical for data lakes and big data
    - Can reduce data read by 10x-1000x

    Example:
        Dataset partitioned by date: year=YYYY/month=MM/day=DD/
        Query: WHERE date >= '2024-01-01'
        → Skip all partitions before 2024
    """

    def get_name(self) -> str:
        return "Partition pruning"

    def can_optimize(self, ast: SelectStatement, reader: BaseReader) -> bool:
        """
        Check if partition pruning is applicable

        Conditions:
        1. Reader supports partition pruning
        2. Query has WHERE clause
        3. WHERE clause references partition columns

        Args:
            ast: Parsed SQL statement
            reader: Data source reader

        Returns:
            True if optimization can be applied
        """
        # Reader must support partition pruning
        if not hasattr(reader, "supports_partition_pruning"):
            return False

        if not reader.supports_partition_pruning():
            return False

        # Must have WHERE clause
        if not ast.where:
            return False

        # Check if any filter conditions reference partition columns
        partition_cols = reader.get_partition_columns()
        if not partition_cols:
            return False

        filter_cols = {cond.column for cond in ast.where.conditions}
        if not filter_cols.intersection(partition_cols):
            return False

        return True

    def optimize(self, ast: SelectStatement, reader: BaseReader) -> None:
        """
        Apply partition pruning optimization

        Args:
            ast: Parsed SQL statement
            reader: Data source reader
        """
        # Extract conditions that reference partition columns
        partition_cols = reader.get_partition_columns()
        partition_filters = []
        non_partition_filters = []

        for cond in ast.where.conditions:
            if cond.column in partition_cols:
                partition_filters.append(cond)
            else:
                non_partition_filters.append(cond)

        if partition_filters:
            reader.set_partition_filters(partition_filters)

            # IMPORTANT: Remove partition filters from WHERE clause
            # Partition columns are virtual (from directory path) and don't exist in data
            # They should only be used for partition pruning, not row-level filtering
            ast.where.conditions = non_partition_filters

            self.applied = True
            self.description = f"{len(partition_filters)} partition filter(s)"

can_optimize

can_optimize(ast: SelectStatement, reader: BaseReader) -> bool

Check if partition pruning is applicable

Conditions:

  1. Reader supports partition pruning
  2. Query has WHERE clause
  3. WHERE clause references partition columns

Parameters:

  ast (SelectStatement, required): Parsed SQL statement
  reader (BaseReader, required): Data source reader

Returns:

  bool: True if optimization can be applied

Source code in sqlstream/optimizers/partition_pruning.py
def can_optimize(self, ast: SelectStatement, reader: BaseReader) -> bool:
    """
    Check if partition pruning is applicable

    Conditions:
    1. Reader supports partition pruning
    2. Query has WHERE clause
    3. WHERE clause references partition columns

    Args:
        ast: Parsed SQL statement
        reader: Data source reader

    Returns:
        True if optimization can be applied
    """
    # Reader must support partition pruning
    if not hasattr(reader, "supports_partition_pruning"):
        return False

    if not reader.supports_partition_pruning():
        return False

    # Must have WHERE clause
    if not ast.where:
        return False

    # Check if any filter conditions reference partition columns
    partition_cols = reader.get_partition_columns()
    if not partition_cols:
        return False

    filter_cols = {cond.column for cond in ast.where.conditions}
    if not filter_cols.intersection(partition_cols):
        return False

    return True

optimize

optimize(ast: SelectStatement, reader: BaseReader) -> None

Apply partition pruning optimization

Parameters:

  ast (SelectStatement, required): Parsed SQL statement
  reader (BaseReader, required): Data source reader

Source code in sqlstream/optimizers/partition_pruning.py
def optimize(self, ast: SelectStatement, reader: BaseReader) -> None:
    """
    Apply partition pruning optimization

    Args:
        ast: Parsed SQL statement
        reader: Data source reader
    """
    # Extract conditions that reference partition columns
    partition_cols = reader.get_partition_columns()
    partition_filters = []
    non_partition_filters = []

    for cond in ast.where.conditions:
        if cond.column in partition_cols:
            partition_filters.append(cond)
        else:
            non_partition_filters.append(cond)

    if partition_filters:
        reader.set_partition_filters(partition_filters)

        # IMPORTANT: Remove partition filters from WHERE clause
        # Partition columns are virtual (from directory path) and don't exist in data
        # They should only be used for partition pruning, not row-level filtering
        ast.where.conditions = non_partition_filters

        self.applied = True
        self.description = f"{len(partition_filters)} partition filter(s)"

QueryPlanner

QueryPlanner

Query planner and optimizer orchestrator

Applies a pipeline of optimizations to improve query performance:

  1. Join reordering - optimize join order for performance
  2. Partition pruning - skip entire partitions/files based on filters
  3. Predicate pushdown - push WHERE filters to readers
  4. Column pruning - tell readers which columns to read
  5. Limit pushdown - early termination for LIMIT queries
  6. Projection pushdown - push computed expressions (future)

The planner modifies the reader in-place with optimization hints.

Example

    planner = QueryPlanner()
    planner.optimize(ast, reader)
    print(planner.get_optimization_summary())
Source code in sqlstream/optimizers/planner.py
class QueryPlanner:
    """
    Query planner and optimizer orchestrator

    Applies a pipeline of optimizations to improve query performance:
    1. Join reordering - optimize join order for performance
    2. Partition pruning - skip entire partitions/files based on filters
    3. Predicate pushdown - push WHERE filters to readers
    4. Column pruning - tell readers which columns to read
    5. Limit pushdown - early termination for LIMIT queries
    6. Projection pushdown - push computed expressions (future)

    The planner modifies the reader in-place with optimization hints.

    Example:
        ```python
        planner = QueryPlanner()
        planner.optimize(ast, reader)
        print(planner.get_optimization_summary())
        ```
    """

    def __init__(self):
        """
        Initialize planner with default optimization pipeline

        The order matters:
        1. Join reordering first (affects join execution plan)
        2. Partition pruning second (can skip entire files!)
        3. Predicate pushdown third (reduces data read)
        4. Column pruning fourth (narrows columns)
        5. Limit pushdown fifth (early termination)
        6. Projection pushdown last (transform data at source)
        """
        self.pipeline = OptimizerPipeline(
            [
                JoinReorderingOptimizer(),
                PartitionPruningOptimizer(),
                PredicatePushdownOptimizer(),
                ColumnPruningOptimizer(),
                LimitPushdownOptimizer(),
                ProjectionPushdownOptimizer(),
            ]
        )
        # For backward compatibility with old API
        self.optimizations_applied: list[str] = []

    def optimize(self, ast: SelectStatement, reader: BaseReader) -> None:
        """
        Apply all applicable optimizations

        Args:
            ast: Parsed SQL statement
            reader: Data source reader

        Modifies:
            - reader: Sets optimization hints (filters, columns, limit, etc.)
            - self.optimizations_applied: List of applied optimizations
        """
        # Run the optimization pipeline
        self.pipeline.optimize(ast, reader)

        # Update the backward-compatible optimizations_applied list
        self.optimizations_applied = self.pipeline.get_applied_optimizations()

    def get_optimization_summary(self) -> str:
        """
        Get summary of optimizations applied

        Returns:
            Human-readable summary

        Example:
            "Optimizations applied:
             - Predicate pushdown: 2 condition(s)
             - Column pruning: 3 column(s) selected"
        """
        return self.pipeline.get_summary()

    def get_optimizers(self) -> list:
        """
        Get list of all optimizers in the pipeline

        Returns:
            List of optimizer instances
        """
        return self.pipeline.optimizers

    def add_optimizer(self, optimizer: Optimizer) -> None:
        """
        Add a custom optimizer to the pipeline

        Args:
            optimizer: Optimizer instance to add

        Example:
            ```python
            planner = QueryPlanner()
            planner.add_optimizer(MyCustomOptimizer())
            ```
        """
        self.pipeline.optimizers.append(optimizer)
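
A quick way to see the default pipeline order is to list the optimizers by name. This sketch uses only methods documented on this page; the import path is inferred from the "Source code" note above:

```python
from sqlstream.optimizers.planner import QueryPlanner

planner = QueryPlanner()
for optimizer in planner.get_optimizers():
    print(optimizer.get_name())  # six names, in the order listed above
```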

__init__

__init__()

Initialize planner with default optimization pipeline

The order matters:

1. Join reordering first (affects join execution plan)
2. Partition pruning second (can skip entire files!)
3. Predicate pushdown third (reduces data read)
4. Column pruning fourth (narrows columns)
5. Limit pushdown fifth (early termination)
6. Projection pushdown last (transform data at source)

Source code in sqlstream/optimizers/planner.py
def __init__(self):
    """
    Initialize planner with default optimization pipeline

    The order matters:
    1. Join reordering first (affects join execution plan)
    2. Partition pruning second (can skip entire files!)
    3. Predicate pushdown third (reduces data read)
    4. Column pruning fourth (narrows columns)
    5. Limit pushdown fifth (early termination)
    6. Projection pushdown last (transform data at source)
    """
    self.pipeline = OptimizerPipeline(
        [
            JoinReorderingOptimizer(),
            PartitionPruningOptimizer(),
            PredicatePushdownOptimizer(),
            ColumnPruningOptimizer(),
            LimitPushdownOptimizer(),
            ProjectionPushdownOptimizer(),
        ]
    )
    # For backward compatibility with old API
    self.optimizations_applied: list[str] = []

optimize

optimize(ast: SelectStatement, reader: BaseReader) -> None

Apply all applicable optimizations

Parameters:

Name    Type             Description           Default
ast     SelectStatement  Parsed SQL statement  required
reader  BaseReader       Data source reader    required
Modifies
  • reader: Sets optimization hints (filters, columns, limit, etc.)
  • self.optimizations_applied: List of applied optimizations
Source code in sqlstream/optimizers/planner.py
def optimize(self, ast: SelectStatement, reader: BaseReader) -> None:
    """
    Apply all applicable optimizations

    Args:
        ast: Parsed SQL statement
        reader: Data source reader

    Modifies:
        - reader: Sets optimization hints (filters, columns, limit, etc.)
        - self.optimizations_applied: List of applied optimizations
    """
    # Run the optimization pipeline
    self.pipeline.optimize(ast, reader)

    # Update the backward-compatible optimizations_applied list
    self.optimizations_applied = self.pipeline.get_applied_optimizations()

get_optimization_summary

get_optimization_summary() -> str

Get summary of optimizations applied

Returns:

Type  Description
str   Human-readable summary

Example

"Optimizations applied:
 - Predicate pushdown: 2 condition(s)
 - Column pruning: 3 column(s) selected"

Source code in sqlstream/optimizers/planner.py
def get_optimization_summary(self) -> str:
    """
    Get summary of optimizations applied

    Returns:
        Human-readable summary

    Example:
        "Optimizations applied:
         - Predicate pushdown: 2 condition(s)
         - Column pruning: 3 column(s) selected"
    """
    return self.pipeline.get_summary()

get_optimizers

get_optimizers() -> list

Get list of all optimizers in the pipeline

Returns:

Type  Description
list  List of optimizer instances

Source code in sqlstream/optimizers/planner.py
def get_optimizers(self) -> list:
    """
    Get list of all optimizers in the pipeline

    Returns:
        List of optimizer instances
    """
    return self.pipeline.optimizers

add_optimizer

add_optimizer(optimizer: Optimizer) -> None

Add a custom optimizer to the pipeline

Parameters:

Name       Type       Description                Default
optimizer  Optimizer  Optimizer instance to add  required
Example
planner = QueryPlanner()
planner.add_optimizer(MyCustomOptimizer())
Source code in sqlstream/optimizers/planner.py
def add_optimizer(self, optimizer: Optimizer) -> None:
    """
    Add a custom optimizer to the pipeline

    Args:
        optimizer: Optimizer instance to add

    Example:
        ```python
        planner = QueryPlanner()
        planner.add_optimizer(MyCustomOptimizer())
        ```
    """
    self.pipeline.optimizers.append(optimizer)
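
A custom optimizer plugs in by subclassing Optimizer and implementing its three abstract methods. The rule below is a hypothetical example, and `ast.limit` is an assumed attribute name; it is shown only to illustrate the shape of a pipeline extension:

```python
from sqlstream.optimizers.base import Optimizer

class RowCountHintOptimizer(Optimizer):
    """Hypothetical rule: record a hint when the query has a LIMIT."""

    def get_name(self) -> str:
        return "Row count hint"

    def can_optimize(self, ast, reader) -> bool:
        return getattr(ast, "limit", None) is not None  # attribute name assumed

    def optimize(self, ast, reader) -> None:
        self.applied = True
        self.description = f"limit={ast.limit}"

planner = QueryPlanner()
planner.add_optimizer(RowCountHintOptimizer())
```

Because add_optimizer appends to the pipeline, custom optimizers run after the six built-in rules.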

PredicatePushdownOptimizer

PredicatePushdownOptimizer

Bases: Optimizer

Push WHERE conditions to the reader

Benefits:

- Reduces I/O by filtering at the source
- Reduces memory usage
- Especially effective for columnar formats (Parquet)
- Can leverage indexes if available

Example

SELECT * FROM data WHERE age > 30

Without pushdown: Read all rows → Filter in memory
With pushdown: Filter while reading → Less data read
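
The effect is easy to model with plain generators; nothing below is sqlstream API, it just shows why filtering inside the read loop keeps non-matching rows out of memory:

```python
def read_with_filter(source, predicate):
    """Yield only matching rows, so non-matches are never buffered downstream."""
    for row in source:
        if predicate(row):
            yield row

rows = ({"age": a} for a in range(1_000_000))
matching = read_with_filter(rows, lambda r: r["age"] > 30)
print(next(matching))  # {'age': 31} - produced without materializing all rows
```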

Source code in sqlstream/optimizers/predicate_pushdown.py
class PredicatePushdownOptimizer(Optimizer):
    """
    Push WHERE conditions to the reader

    Benefits:
    - Reduces I/O by filtering at the source
    - Reduces memory usage
    - Especially effective for columnar formats (Parquet)
    - Can leverage indexes if available

    Example:
        SELECT * FROM data WHERE age > 30

        Without pushdown: Read all rows → Filter in memory
        With pushdown: Filter while reading → Less data read
    """

    def get_name(self) -> str:
        return "Predicate pushdown"

    def can_optimize(self, ast: SelectStatement, reader: BaseReader) -> bool:
        """
        Check if predicate pushdown is applicable

        Conditions:
        1. Query has WHERE clause
        2. Reader supports pushdown
        3. Not a JOIN query (complex - needs smarter analysis)

        Args:
            ast: Parsed SQL statement
            reader: Data source reader

        Returns:
            True if optimization can be applied
        """
        # Must have WHERE clause
        if not ast.where:
            return False

        # Reader must support pushdown
        if not reader.supports_pushdown():
            return False

        # Skip JOINs for now - WHERE conditions may reference either table
        # TODO: Make this smarter by analyzing which conditions apply to which table
        if ast.join:
            return False

        return True

    def optimize(self, ast: SelectStatement, reader: BaseReader) -> None:
        """
        Apply predicate pushdown optimization

        Args:
            ast: Parsed SQL statement
            reader: Data source reader
        """
        # Extract conditions that can be pushed down
        pushable = self._extract_pushable_conditions(ast.where.conditions)

        if pushable:
            reader.set_filter(pushable)
            self.applied = True
            self.description = f"{len(pushable)} condition(s)"

    def _extract_pushable_conditions(self, conditions: list[Condition]) -> list[Condition]:
        """
        Determine which conditions can be safely pushed to readers

        Pushable conditions:
        - Simple column comparisons: column op value
        - Where value is a literal (not another column or expression)

        NOT pushable (future work):
        - Complex expressions: LENGTH(name) > 5
        - Cross-column comparisons: age > salary
        - User-defined functions
        - Conditions involving aggregates

        Args:
            conditions: List of WHERE conditions

        Returns:
            List of conditions safe to push down
        """
        pushable = []

        for condition in conditions:
            # For now, all simple conditions are pushable
            # Future: Check for complex expressions, UDFs, etc.
            if self._is_simple_condition(condition):
                pushable.append(condition)

        return pushable

    def _is_simple_condition(self, condition: Condition) -> bool:
        """
        Check if condition is a simple column comparison

        Simple conditions:
        - column = value (literal)
        - column > value (literal)
        - column < value (literal)
        - etc.

        Args:
            condition: Condition to check

        Returns:
            True if condition is simple and pushable
        """
        # For now, all our conditions are simple
        # Future improvements:
        # - Check if value is a literal vs expression
        # - Detect cross-column comparisons
        # - Detect function calls
        return True
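
The intended pushable/not-pushable split can be sketched standalone (toy Condition type again; the real _is_simple_condition currently accepts everything, as noted in the source):

```python
from dataclasses import dataclass

@dataclass
class Condition:
    column: str
    op: str
    value: object

def is_simple(cond: Condition) -> bool:
    # Pushable: a bare column compared to a literal. Not pushable: function
    # calls, modeled here as a "(" in the column name; a real check would
    # also reject cross-column comparisons and aggregates.
    literal = isinstance(cond.value, (int, float, str, bool))
    bare_column = "(" not in cond.column
    return literal and bare_column

conditions = [
    Condition("age", ">", 30),          # pushable
    Condition("LENGTH(name)", ">", 5),  # not pushable: function call
]
print([c.column for c in conditions if is_simple(c)])  # ['age']
```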

can_optimize

can_optimize(ast: SelectStatement, reader: BaseReader) -> bool

Check if predicate pushdown is applicable

Conditions:

1. Query has WHERE clause
2. Reader supports pushdown
3. Not a JOIN query (complex - needs smarter analysis)

Parameters:

Name    Type             Description           Default
ast     SelectStatement  Parsed SQL statement  required
reader  BaseReader       Data source reader    required

Returns:

Type  Description
bool  True if optimization can be applied

Source code in sqlstream/optimizers/predicate_pushdown.py
def can_optimize(self, ast: SelectStatement, reader: BaseReader) -> bool:
    """
    Check if predicate pushdown is applicable

    Conditions:
    1. Query has WHERE clause
    2. Reader supports pushdown
    3. Not a JOIN query (complex - needs smarter analysis)

    Args:
        ast: Parsed SQL statement
        reader: Data source reader

    Returns:
        True if optimization can be applied
    """
    # Must have WHERE clause
    if not ast.where:
        return False

    # Reader must support pushdown
    if not reader.supports_pushdown():
        return False

    # Skip JOINs for now - WHERE conditions may reference either table
    # TODO: Make this smarter by analyzing which conditions apply to which table
    if ast.join:
        return False

    return True

optimize

optimize(ast: SelectStatement, reader: BaseReader) -> None

Apply predicate pushdown optimization

Parameters:

Name    Type             Description           Default
ast     SelectStatement  Parsed SQL statement  required
reader  BaseReader       Data source reader    required
Source code in sqlstream/optimizers/predicate_pushdown.py
def optimize(self, ast: SelectStatement, reader: BaseReader) -> None:
    """
    Apply predicate pushdown optimization

    Args:
        ast: Parsed SQL statement
        reader: Data source reader
    """
    # Extract conditions that can be pushed down
    pushable = self._extract_pushable_conditions(ast.where.conditions)

    if pushable:
        reader.set_filter(pushable)
        self.applied = True
        self.description = f"{len(pushable)} condition(s)"

ProjectionPushdownOptimizer

ProjectionPushdownOptimizer

Bases: Optimizer

Push projection (SELECT expressions) to the reader

Benefits (when implemented):

- Evaluate expressions at read time
- Reduce data movement
- Leverage database/engine native functions

Example (future)

SELECT UPPER(name), age * 2 FROM data

With pushdown: Reader evaluates UPPER() and age*2
Without: Read raw data → Apply transformations later

Status: Placeholder - not yet implemented
Reason: Requires expression evaluation framework
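
Since this optimizer is a placeholder, the following toy only illustrates the intended effect of evaluating projections during the read; it is not existing sqlstream behavior:

```python
def read_projected(source, projections):
    """Apply named row transformations while streaming."""
    for row in source:
        yield {name: fn(row) for name, fn in projections.items()}

rows = [{"name": "ada", "age": 21}]
result = read_projected(rows, {
    "upper_name": lambda r: r["name"].upper(),  # UPPER(name)
    "double_age": lambda r: r["age"] * 2,       # age * 2
})
print(list(result))  # [{'upper_name': 'ADA', 'double_age': 42}]
```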

Source code in sqlstream/optimizers/projection_pushdown.py
class ProjectionPushdownOptimizer(Optimizer):
    """
    Push projection (SELECT expressions) to the reader

    Benefits (when implemented):
    - Evaluate expressions at read time
    - Reduce data movement
    - Leverage database/engine native functions

    Example (future):
        SELECT UPPER(name), age * 2 FROM data

        With pushdown: Reader evaluates UPPER() and age*2
        Without: Read raw data → Apply transformations later

    Status: Placeholder - not yet implemented
    Reason: Requires expression evaluation framework
    """

    def get_name(self) -> str:
        return "Projection pushdown"

    def can_optimize(self, ast: SelectStatement, reader: BaseReader) -> bool:
        """
        Check if projection pushdown is applicable

        Currently always returns False as this is not yet implemented.

        Future conditions:
        1. Reader supports expression evaluation
        2. Expressions are supported by reader (native functions)
        3. Not complex nested expressions

        Args:
            ast: Parsed SQL statement
            reader: Data source reader

        Returns:
            False (not yet implemented)
        """
        # TODO: Implement when we have expression evaluation framework
        return False

    def optimize(self, ast: SelectStatement, reader: BaseReader) -> None:
        """
        Apply projection pushdown optimization

        Currently a no-op placeholder.

        Args:
            ast: Parsed SQL statement
            reader: Data source reader
        """
        # Placeholder for future implementation
        pass

can_optimize

can_optimize(ast: SelectStatement, reader: BaseReader) -> bool

Check if projection pushdown is applicable

Currently always returns False as this is not yet implemented.

Future conditions:

1. Reader supports expression evaluation
2. Expressions are supported by reader (native functions)
3. Not complex nested expressions

Parameters:

Name    Type             Description           Default
ast     SelectStatement  Parsed SQL statement  required
reader  BaseReader       Data source reader    required

Returns:

Type  Description
bool  False (not yet implemented)

Source code in sqlstream/optimizers/projection_pushdown.py
def can_optimize(self, ast: SelectStatement, reader: BaseReader) -> bool:
    """
    Check if projection pushdown is applicable

    Currently always returns False as this is not yet implemented.

    Future conditions:
    1. Reader supports expression evaluation
    2. Expressions are supported by reader (native functions)
    3. Not complex nested expressions

    Args:
        ast: Parsed SQL statement
        reader: Data source reader

    Returns:
        False (not yet implemented)
    """
    # TODO: Implement when we have expression evaluation framework
    return False

optimize

optimize(ast: SelectStatement, reader: BaseReader) -> None

Apply projection pushdown optimization

Currently a no-op placeholder.

Parameters:

Name    Type             Description           Default
ast     SelectStatement  Parsed SQL statement  required
reader  BaseReader       Data source reader    required
Source code in sqlstream/optimizers/projection_pushdown.py
def optimize(self, ast: SelectStatement, reader: BaseReader) -> None:
    """
    Apply projection pushdown optimization

    Currently a no-op placeholder.

    Args:
        ast: Parsed SQL statement
        reader: Data source reader
    """
    # Placeholder for future implementation
    pass