Operators Reference

Query execution operators.

Operator

Operator

Base class for all query operators

Operators form a tree where:

- Leaf operators (e.g., Scan) read from data sources
- Internal operators (e.g., Filter, Project) transform data
- Root operator is pulled by the executor to get results

The pull-based execution model means:

- Operators are lazy (generators)
- Data flows through the tree on-demand
- Memory usage is O(pipeline depth), not O(data size)

Source code in sqlstream/operators/base.py
class Operator:
    """
    Base class for all query operators

    Operators form a tree where:
    - Leaf operators (e.g., Scan) read from data sources
    - Internal operators (e.g., Filter, Project) transform data
    - Root operator is pulled by the executor to get results

    The pull-based execution model means:
    - Operators are lazy (generators)
    - Data flows through the tree on-demand
    - Memory usage is O(pipeline depth), not O(data size)
    """

    def __init__(self, child: Optional["Operator"] = None):
        """
        Initialize operator

        Args:
            child: Child operator to pull data from (None for leaf operators)
        """
        self.child = child

    def __iter__(self) -> Iterator[dict[str, Any]]:
        """
        Execute operator and yield results

        This is the core method that defines operator behavior.
        Subclasses must implement this to define how they process data.

        Yields:
            Rows as dictionaries
        """
        raise NotImplementedError(f"{self.__class__.__name__} must implement __iter__()")

    def __repr__(self) -> str:
        """String representation for debugging"""
        return f"{self.__class__.__name__}()"

__init__

__init__(child: Optional[Operator] = None)

Initialize operator

Parameters:

Name   Type                Description                                                  Default
child  Optional[Operator]  Child operator to pull data from (None for leaf operators)  None
Source code in sqlstream/operators/base.py
def __init__(self, child: Optional["Operator"] = None):
    """
    Initialize operator

    Args:
        child: Child operator to pull data from (None for leaf operators)
    """
    self.child = child

__iter__

__iter__() -> Iterator[dict[str, Any]]

Execute operator and yield results

This is the core method that defines operator behavior. Subclasses must implement this to define how they process data.

Yields:

Type            Description
dict[str, Any]  Rows as dictionaries

Source code in sqlstream/operators/base.py
def __iter__(self) -> Iterator[dict[str, Any]]:
    """
    Execute operator and yield results

    This is the core method that defines operator behavior.
    Subclasses must implement this to define how they process data.

    Yields:
        Rows as dictionaries
    """
    raise NotImplementedError(f"{self.__class__.__name__} must implement __iter__()")

__repr__

__repr__() -> str

String representation for debugging

Source code in sqlstream/operators/base.py
def __repr__(self) -> str:
    """String representation for debugging"""
    return f"{self.__class__.__name__}()"

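A minimal sketch of writing and running a custom operator, assuming the package is importable as sqlstream (module paths taken from the "Source code in ..." headers). Upper is a hypothetical subclass; note that because operators only iterate their child, a plain iterable of dict rows can stand in for a leaf operator here:

from typing import Any, Iterator

from sqlstream.operators.base import Operator

class Upper(Operator):
    """Hypothetical operator that upper-cases one column of each row."""

    def __init__(self, child: Operator, column: str):
        super().__init__(child)
        self.column = column

    def __iter__(self) -> Iterator[dict[str, Any]]:
        for row in self.child:  # pull rows lazily from the child
            yield {**row, self.column: row[self.column].upper()}

rows = [{"name": "ada"}, {"name": "bob"}]  # plain iterable as a stand-in leaf
for row in Upper(rows, "name"):
    print(row)  # {'name': 'ADA'} then {'name': 'BOB'}
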
Filter

Filter

Bases: Operator

Filter operator - evaluates WHERE conditions

Pulls rows from child and only yields those that satisfy all conditions (AND logic).

Source code in sqlstream/operators/filter.py
class Filter(Operator):
    """
    Filter operator - evaluates WHERE conditions

    Pulls rows from child and only yields those that satisfy
    all conditions (AND logic).
    """

    def __init__(self, child: Operator, conditions: list[Condition]):
        """
        Initialize filter operator

        Args:
            child: Child operator to pull rows from
            conditions: List of conditions (AND'd together)
        """
        super().__init__(child)
        self.conditions = conditions

    def __iter__(self) -> Iterator[dict[str, Any]]:
        """
        Yield only rows that match all conditions

        For each row from child:
        1. Evaluate all conditions
        2. If all are True, yield the row
        3. Otherwise, skip it
        """
        for row in self.child:
            if self._matches(row):
                yield row

    def _matches(self, row: dict[str, Any]) -> bool:
        """
        Check if row matches all conditions

        Args:
            row: Row to check

        Returns:
            True if all conditions are satisfied (AND logic)
        """
        for condition in self.conditions:
            if not self._evaluate_condition(row, condition):
                return False
        return True

    def _evaluate_condition(self, row: dict[str, Any], condition: Condition) -> bool:
        """
        Evaluate a single condition against a row

        Args:
            row: Row to check
            condition: Condition to evaluate

        Returns:
            True if condition is satisfied
        """
        # Get column value
        if condition.column not in row:
            return False

        value = row[condition.column]

        # Handle NULL values
        if value is None:
            return False

        # Get expected value
        expected = condition.value

        # Evaluate operator
        op = condition.operator

        try:
            if op == "=":
                return value == expected
            elif op == ">":
                return value > expected
            elif op == "<":
                return value < expected
            elif op == ">=":
                return value >= expected
            elif op == "<=":
                return value <= expected
            elif op == "!=":
                return value != expected
            else:
                # Unknown operator - default to True to avoid filtering
                return True

        except TypeError:
            # Type mismatch (e.g., comparing string to int)
            return False

    def __repr__(self) -> str:
        cond_str = " AND ".join(str(c) for c in self.conditions)
        return f"Filter({cond_str})"

__init__

__init__(child: Operator, conditions: list[Condition])

Initialize filter operator

Parameters:

Name        Type             Description                          Default
child       Operator         Child operator to pull rows from     required
conditions  list[Condition]  List of conditions (AND'd together)  required
Source code in sqlstream/operators/filter.py
def __init__(self, child: Operator, conditions: list[Condition]):
    """
    Initialize filter operator

    Args:
        child: Child operator to pull rows from
        conditions: List of conditions (AND'd together)
    """
    super().__init__(child)
    self.conditions = conditions

__iter__

__iter__() -> Iterator[dict[str, Any]]

Yield only rows that match all conditions

For each row from child:

1. Evaluate all conditions
2. If all are True, yield the row
3. Otherwise, skip it

Source code in sqlstream/operators/filter.py
def __iter__(self) -> Iterator[dict[str, Any]]:
    """
    Yield only rows that match all conditions

    For each row from child:
    1. Evaluate all conditions
    2. If all are True, yield the row
    3. Otherwise, skip it
    """
    for row in self.child:
        if self._matches(row):
            yield row

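A usage sketch for Filter. The real Condition class lives elsewhere in sqlstream; the dataclass below is a hypothetical stand-in exposing the only attributes Filter reads (column, operator, value), and a plain list of rows stands in for the child operator:

from dataclasses import dataclass

from sqlstream.operators.filter import Filter

@dataclass
class Condition:  # hypothetical stand-in; only these attributes are read
    column: str
    operator: str
    value: object

rows = [{"name": "a", "age": 25}, {"name": "b", "age": 35}]
older = Filter(rows, [Condition("age", ">", 30)])
print(list(older))  # [{'name': 'b', 'age': 35}]
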
GroupByOperator

GroupByOperator

Bases: Operator

GROUP BY operator with aggregation

Uses hash-based aggregation:

1. Scan all input rows
2. Group by key columns
3. Maintain aggregators for each group
4. Yield one row per group

Note: This operator materializes all data in memory (not lazy). For large datasets, consider external sorting/grouping.

Source code in sqlstream/operators/groupby.py
class GroupByOperator(Operator):
    """
    GROUP BY operator with aggregation

    Uses hash-based aggregation:
    1. Scan all input rows
    2. Group by key columns
    3. Maintain aggregators for each group
    4. Yield one row per group

    Note: This operator materializes all data in memory (not lazy).
    For large datasets, consider external sorting/grouping.
    """

    def __init__(
        self,
        source: Operator,
        group_by_columns: list[str],
        aggregates: list[AggregateFunction],
        select_columns: list[str],
    ):
        """
        Initialize GroupBy operator

        Args:
            source: Source operator
            group_by_columns: List of columns to group by
            aggregates: List of aggregate functions to compute
            select_columns: List of columns in SELECT clause (for output order)
        """
        super().__init__(source)
        self.group_by_columns = group_by_columns
        self.aggregates = aggregates
        self.select_columns = select_columns

    def __iter__(self) -> Iterator[dict[str, Any]]:
        """
        Execute GROUP BY aggregation

        Yields:
            One row per group with group columns and aggregated values
        """
        # Hash map: group_key -> aggregators
        groups: dict[tuple, list] = {}

        # Scan all input rows and build groups
        for row in self.child:
            # Extract group key
            group_key = self._extract_group_key(row)

            # Initialize aggregators for new group
            if group_key not in groups:
                groups[group_key] = self._create_aggregators()

            # Update aggregators
            aggregators = groups[group_key]
            for i, agg_func in enumerate(self.aggregates):
                value = row.get(agg_func.column) if agg_func.column != "*" else None
                aggregators[i].update(value)

        # Yield one row per group
        for group_key, aggregators in groups.items():
            row = self._build_output_row(group_key, aggregators)
            yield row

    def _extract_group_key(self, row: dict[str, Any]) -> tuple:
        """
        Extract group key from row

        Args:
            row: Input row

        Returns:
            Tuple of group key values
        """
        key_values = []
        for col in self.group_by_columns:
            value = row.get(col)
            # Handle unhashable types (e.g., lists, dicts)
            # Convert to string representation for hashing
            if isinstance(value, (list, dict)):
                value = str(value)
            key_values.append(value)

        return tuple(key_values)

    def _create_aggregators(self) -> list:
        """
        Create fresh aggregators for a new group

        Returns:
            List of aggregator instances
        """
        aggregators = []
        for agg_func in self.aggregates:
            aggregator = create_aggregator(agg_func.function, agg_func.column)
            aggregators.append(aggregator)
        return aggregators

    def _build_output_row(self, group_key: tuple, aggregators: list) -> dict[str, Any]:
        """
        Build output row from group key and aggregated values

        Args:
            group_key: Tuple of group key values
            aggregators: List of aggregators with final values

        Returns:
            Output row dictionary
        """
        row = {}

        # Add group key columns
        for i, col_name in enumerate(self.group_by_columns):
            row[col_name] = group_key[i]

        # Add aggregated columns
        for i, agg_func in enumerate(self.aggregates):
            # Use alias if provided, otherwise generate name
            col_name = (
                agg_func.alias
                if agg_func.alias
                else f"{agg_func.function.lower()}_{agg_func.column}"
            )
            row[col_name] = aggregators[i].result()

        return row

    def explain(self, indent: int = 0) -> list[str]:
        """Generate execution plan explanation"""
        lines = [" " * indent + f"GroupBy(keys={self.group_by_columns})"]

        # Show aggregate functions
        for agg in self.aggregates:
            lines.append(" " * (indent + 2) + f"→ {agg}")

        # Add source explanation
        lines.extend(self.child.explain(indent + 2))

        return lines

__init__

__init__(source: Operator, group_by_columns: list[str], aggregates: list[AggregateFunction], select_columns: list[str])

Initialize GroupBy operator

Parameters:

Name              Type                     Description                                           Default
source            Operator                 Source operator                                       required
group_by_columns  list[str]                List of columns to group by                           required
aggregates        list[AggregateFunction]  List of aggregate functions to compute                required
select_columns    list[str]                List of columns in SELECT clause (for output order)   required
Source code in sqlstream/operators/groupby.py
def __init__(
    self,
    source: Operator,
    group_by_columns: list[str],
    aggregates: list[AggregateFunction],
    select_columns: list[str],
):
    """
    Initialize GroupBy operator

    Args:
        source: Source operator
        group_by_columns: List of columns to group by
        aggregates: List of aggregate functions to compute
        select_columns: List of columns in SELECT clause (for output order)
    """
    super().__init__(source)
    self.group_by_columns = group_by_columns
    self.aggregates = aggregates
    self.select_columns = select_columns

__iter__

__iter__() -> Iterator[dict[str, Any]]

Execute GROUP BY aggregation

Yields:

Type            Description
dict[str, Any]  One row per group with group columns and aggregated values

Source code in sqlstream/operators/groupby.py
def __iter__(self) -> Iterator[dict[str, Any]]:
    """
    Execute GROUP BY aggregation

    Yields:
        One row per group with group columns and aggregated values
    """
    # Hash map: group_key -> aggregators
    groups: dict[tuple, list] = {}

    # Scan all input rows and build groups
    for row in self.child:
        # Extract group key
        group_key = self._extract_group_key(row)

        # Initialize aggregators for new group
        if group_key not in groups:
            groups[group_key] = self._create_aggregators()

        # Update aggregators
        aggregators = groups[group_key]
        for i, agg_func in enumerate(self.aggregates):
            value = row.get(agg_func.column) if agg_func.column != "*" else None
            aggregators[i].update(value)

    # Yield one row per group
    for group_key, aggregators in groups.items():
        row = self._build_output_row(group_key, aggregators)
        yield row

explain

explain(indent: int = 0) -> list[str]

Generate execution plan explanation

Source code in sqlstream/operators/groupby.py
def explain(self, indent: int = 0) -> list[str]:
    """Generate execution plan explanation"""
    lines = [" " * indent + f"GroupBy(keys={self.group_by_columns})"]

    # Show aggregate functions
    for agg in self.aggregates:
        lines.append(" " * (indent + 2) + f"→ {agg}")

    # Add source explanation
    lines.extend(self.child.explain(indent + 2))

    return lines

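A usage sketch for GroupByOperator. The AggregateFunction dataclass below is a hypothetical stand-in exposing the attributes the operator reads (function, column, alias), and "AVG" is assumed to be a function name accepted by create_aggregator:

from dataclasses import dataclass
from typing import Optional

from sqlstream.operators.groupby import GroupByOperator

@dataclass
class AggregateFunction:  # hypothetical stand-in; only these attributes are read
    function: str
    column: str
    alias: Optional[str] = None

rows = [
    {"dept": "eng", "salary": 100},
    {"dept": "eng", "salary": 120},
    {"dept": "ops", "salary": 90},
]
op = GroupByOperator(
    source=rows,  # any iterable of dict rows works as the source here
    group_by_columns=["dept"],
    aggregates=[AggregateFunction("AVG", "salary", alias="avg_salary")],
    select_columns=["dept", "avg_salary"],
)
print(list(op))
# e.g. [{'dept': 'eng', 'avg_salary': 110.0}, {'dept': 'ops', 'avg_salary': 90.0}]
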
HashJoinOperator

HashJoinOperator

Bases: Operator

Hash Join operator for equi-joins

Supports:

- INNER JOIN: Only matching rows from both tables
- LEFT JOIN: All rows from left, matched rows from right (NULL if no match)
- RIGHT JOIN: All rows from right, matched rows from left (NULL if no match)

Algorithm:

1. Build hash table from right table (keyed by join column)
2. Probe hash table with rows from left table
3. Output joined rows based on join type

Note: This operator materializes the right table in memory. For very large right tables, consider external hash join or other algorithms.

Source code in sqlstream/operators/join.py
class HashJoinOperator(Operator):
    """
    Hash Join operator for equi-joins

    Supports:
    - INNER JOIN: Only matching rows from both tables
    - LEFT JOIN: All rows from left, matched rows from right (NULL if no match)
    - RIGHT JOIN: All rows from right, matched rows from left (NULL if no match)

    Algorithm:
    1. Build hash table from right table (keyed by join column)
    2. Probe hash table with rows from left table
    3. Output joined rows based on join type

    Note: This operator materializes the right table in memory.
    For very large right tables, consider external hash join or other algorithms.
    """

    def __init__(
        self,
        left: Operator,
        right: Operator,
        join_type: str,
        left_key: str,
        right_key: str,
    ):
        """
        Initialize Hash Join operator

        Args:
            left: Left table operator
            right: Right table operator
            join_type: 'INNER', 'LEFT', or 'RIGHT'
            left_key: Column name in left table for join condition
            right_key: Column name in right table for join condition
        """
        # Store both children (join has two inputs)
        super().__init__(left)
        self.left = left
        self.right = right
        self.join_type = join_type.upper()
        self.left_key = left_key
        self.right_key = right_key

        # Validate join type
        if self.join_type not in ("INNER", "LEFT", "RIGHT"):
            raise ValueError(f"Unsupported join type: {join_type}")

    def __iter__(self) -> Iterator[dict[str, Any]]:
        """
        Execute hash join

        Yields:
            Joined rows with columns from both tables
        """
        if self.join_type == "INNER":
            yield from self._inner_join()
        elif self.join_type == "LEFT":
            yield from self._left_join()
        elif self.join_type == "RIGHT":
            yield from self._right_join()

    def _inner_join(self) -> Iterator[dict[str, Any]]:
        """
        Execute INNER JOIN

        Returns only rows that have matching join keys in both tables.
        """
        # Build phase: Create hash table from right table
        hash_table = self._build_hash_table()

        # Probe phase: Scan left table and find matches
        for left_row in self.left:
            join_key = left_row.get(self.left_key)

            # Skip rows with NULL join key (standard SQL behavior)
            if join_key is None:
                continue

            # Normalize unhashable key types the same way as the build phase
            if isinstance(join_key, (list, dict)):
                join_key = str(join_key)

            # Probe hash table
            if join_key in hash_table:
                # Found match(es) - join with all matching right rows
                for right_row in hash_table[join_key]:
                    yield self._merge_rows(left_row, right_row)

    def _left_join(self) -> Iterator[dict[str, Any]]:
        """
        Execute LEFT JOIN

        Returns all rows from left table. If there's a match in right table,
        include right columns. If no match, right columns are NULL.
        """
        # Build phase: Create hash table from right table
        hash_table = self._build_hash_table()

        # Probe phase: Scan left table
        for left_row in self.left:
            join_key = left_row.get(self.left_key)

            # Normalize unhashable key types the same way as the build phase
            if isinstance(join_key, (list, dict)):
                join_key = str(join_key)

            # Check for match
            if join_key is not None and join_key in hash_table:
                # Found match(es) - join with all matching right rows
                for right_row in hash_table[join_key]:
                    yield self._merge_rows(left_row, right_row)
            else:
                # No match - output left row with NULL for right columns
                yield self._merge_rows(left_row, None)

    def _right_join(self) -> Iterator[dict[str, Any]]:
        """
        Execute RIGHT JOIN

        Returns all rows from right table. If there's a match in left table,
        include left columns. If no match, left columns are NULL.
        """
        # Build phase: Create hash table from right table
        # Also track which right rows were matched
        hash_table = self._build_hash_table()
        matched_right_rows = set()  # Track (join_key, row_index) tuples

        # Probe phase: Scan left table and output matches
        for left_row in self.left:
            join_key = left_row.get(self.left_key)

            # Normalize unhashable key types the same way as the build phase
            if isinstance(join_key, (list, dict)):
                join_key = str(join_key)

            if join_key is not None and join_key in hash_table:
                # Found match(es) - join with all matching right rows
                for idx, right_row in enumerate(hash_table[join_key]):
                    yield self._merge_rows(left_row, right_row)
                    # Mark this right row as matched
                    matched_right_rows.add((join_key, idx))

        # Output unmatched right rows with NULL for left columns
        for join_key, right_rows in hash_table.items():
            for idx, right_row in enumerate(right_rows):
                if (join_key, idx) not in matched_right_rows:
                    yield self._merge_rows(None, right_row)

    def _build_hash_table(self) -> dict[Any, list[dict[str, Any]]]:
        """
        Build hash table from right table

        Returns:
            Hash table mapping join key values to lists of matching rows
        """
        hash_table: dict[Any, list[dict[str, Any]]] = {}

        for row in self.right:
            join_key = row.get(self.right_key)

            # Skip rows with NULL join key (they can never match)
            if join_key is None:
                continue

            # Handle unhashable types (e.g., lists, dicts)
            if isinstance(join_key, (list, dict)):
                join_key = str(join_key)

            # Add row to hash table
            if join_key not in hash_table:
                hash_table[join_key] = []
            hash_table[join_key].append(row)

        return hash_table

    def _merge_rows(
        self, left_row: dict[str, Any] | None, right_row: dict[str, Any] | None
    ) -> dict[str, Any]:
        """
        Merge left and right rows into a single output row

        Handles column name conflicts by prefixing with table names if needed.

        Args:
            left_row: Row from left table (None for RIGHT JOIN with no match)
            right_row: Row from right table (None for LEFT JOIN with no match)

        Returns:
            Merged row dictionary
        """
        result = {}

        # Add left columns
        if left_row is not None:
            result.update(left_row)
        elif right_row is not None:
            # RIGHT JOIN with no match: ideally we'd emit NULL for every left
            # column, but the left schema is unknown here, so those columns
            # are simply omitted from this output row
            pass

        # Add right columns
        if right_row is not None:
            for key, value in right_row.items():
                # Handle column name conflicts
                if key in result and left_row is not None:
                    # Column exists in both tables - this shouldn't happen with
                    # proper column qualification in SELECT, but handle it
                    # by keeping the left value and prefixing right with "right_"
                    result[f"right_{key}"] = value
                else:
                    result[key] = value

        return result

    def explain(self, indent: int = 0) -> list[str]:
        """Generate execution plan explanation"""
        lines = [" " * indent + f"HashJoin({self.join_type}, {self.left_key} = {self.right_key})"]
        lines.append(" " * (indent + 2) + "Left:")
        lines.extend(self.left.explain(indent + 4))
        lines.append(" " * (indent + 2) + "Right:")
        lines.extend(self.right.explain(indent + 4))
        return lines

__init__

__init__(left: Operator, right: Operator, join_type: str, left_key: str, right_key: str)

Initialize Hash Join operator

Parameters:

Name       Type      Description                                     Default
left       Operator  Left table operator                             required
right      Operator  Right table operator                            required
join_type  str       'INNER', 'LEFT', or 'RIGHT'                     required
left_key   str       Column name in left table for join condition    required
right_key  str       Column name in right table for join condition   required
Source code in sqlstream/operators/join.py
def __init__(
    self,
    left: Operator,
    right: Operator,
    join_type: str,
    left_key: str,
    right_key: str,
):
    """
    Initialize Hash Join operator

    Args:
        left: Left table operator
        right: Right table operator
        join_type: 'INNER', 'LEFT', or 'RIGHT'
        left_key: Column name in left table for join condition
        right_key: Column name in right table for join condition
    """
    # Store both children (join has two inputs)
    super().__init__(left)
    self.left = left
    self.right = right
    self.join_type = join_type.upper()
    self.left_key = left_key
    self.right_key = right_key

    # Validate join type
    if self.join_type not in ("INNER", "LEFT", "RIGHT"):
        raise ValueError(f"Unsupported join type: {join_type}")

__iter__

__iter__() -> Iterator[dict[str, Any]]

Execute hash join

Yields:

Type            Description
dict[str, Any]  Joined rows with columns from both tables

Source code in sqlstream/operators/join.py
def __iter__(self) -> Iterator[dict[str, Any]]:
    """
    Execute hash join

    Yields:
        Joined rows with columns from both tables
    """
    if self.join_type == "INNER":
        yield from self._inner_join()
    elif self.join_type == "LEFT":
        yield from self._left_join()
    elif self.join_type == "RIGHT":
        yield from self._right_join()

explain

explain(indent: int = 0) -> list[str]

Generate execution plan explanation

Source code in sqlstream/operators/join.py
def explain(self, indent: int = 0) -> list[str]:
    """Generate execution plan explanation"""
    lines = [" " * indent + f"HashJoin({self.join_type}, {self.left_key} = {self.right_key})"]
    lines.append(" " * (indent + 2) + "Left:")
    lines.extend(self.left.explain(indent + 4))
    lines.append(" " * (indent + 2) + "Right:")
    lines.extend(self.right.explain(indent + 4))
    return lines

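A usage sketch for HashJoinOperator, again with plain lists standing in for the left and right operators. Note that for a LEFT JOIN row with no right-side match, _merge_rows omits the right columns rather than emitting explicit NULLs:

from sqlstream.operators.join import HashJoinOperator

users = [{"id": 1, "name": "ada"}, {"id": 2, "name": "bob"}]
orders = [{"user_id": 1, "total": 9.5}, {"user_id": 1, "total": 3.0}]

join = HashJoinOperator(
    left=users,
    right=orders,
    join_type="LEFT",
    left_key="id",
    right_key="user_id",
)
for row in join:
    print(row)
# {'id': 1, 'name': 'ada', 'user_id': 1, 'total': 9.5}
# {'id': 1, 'name': 'ada', 'user_id': 1, 'total': 3.0}
# {'id': 2, 'name': 'bob'}   <- unmatched: right columns omitted
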
Limit

Limit

Bases: Operator

Limit operator - restricts number of rows (LIMIT clause)

Pulls rows from child and yields only the first N rows. This allows for early termination - we stop pulling from child once we've yielded enough rows.

Source code in sqlstream/operators/limit.py
class Limit(Operator):
    """
    Limit operator - restricts number of rows (LIMIT clause)

    Pulls rows from child and yields only the first N rows.
    This allows for early termination - we stop pulling from
    child once we've yielded enough rows.
    """

    def __init__(self, child: Operator, limit: int):
        """
        Initialize limit operator

        Args:
            child: Child operator to pull rows from
            limit: Maximum number of rows to yield
        """
        super().__init__(child)
        self.limit = limit

    def __iter__(self) -> Iterator[dict[str, Any]]:
        """
        Yield at most limit rows

        This is efficient because it stops pulling from child
        as soon as we've yielded enough rows (early termination).
        """
        count = 0

        for row in self.child:
            if count >= self.limit:
                break

            yield row
            count += 1

    def __repr__(self) -> str:
        return f"Limit({self.limit})"

__init__

__init__(child: Operator, limit: int)

Initialize limit operator

Parameters:

Name   Type      Description                       Default
child  Operator  Child operator to pull rows from  required
limit  int       Maximum number of rows to yield   required
Source code in sqlstream/operators/limit.py
def __init__(self, child: Operator, limit: int):
    """
    Initialize limit operator

    Args:
        child: Child operator to pull rows from
        limit: Maximum number of rows to yield
    """
    super().__init__(child)
    self.limit = limit

__iter__

__iter__() -> Iterator[dict[str, Any]]

Yield at most limit rows

This is efficient because it stops pulling from child as soon as we've yielded enough rows (early termination).

Source code in sqlstream/operators/limit.py
def __iter__(self) -> Iterator[dict[str, Any]]:
    """
    Yield at most limit rows

    This is efficient because it stops pulling from child
    as soon as we've yielded enough rows (early termination).
    """
    count = 0

    for row in self.child:
        if count >= self.limit:
            break

        yield row
        count += 1

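A sketch showing early termination in action: because Limit stops pulling once it has yielded enough rows, even an infinite child generator is safe:

from sqlstream.operators.limit import Limit

def numbers():
    """An infinite row generator."""
    n = 0
    while True:
        yield {"n": n}
        n += 1

print(list(Limit(numbers(), 3)))  # [{'n': 0}, {'n': 1}, {'n': 2}]
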
OrderByOperator

OrderByOperator

Bases: Operator

ORDER BY operator

Sorts all input rows by specified columns.

Note: This operator materializes all data in memory (not lazy). For large datasets that don't fit in memory, consider external sorting.

Source code in sqlstream/operators/orderby.py
class OrderByOperator(Operator):
    """
    ORDER BY operator

    Sorts all input rows by specified columns.

    Note: This operator materializes all data in memory (not lazy).
    For large datasets that don't fit in memory, consider external sorting.
    """

    def __init__(self, source: Operator, order_by: list[OrderByColumn]):
        """
        Initialize OrderBy operator

        Args:
            source: Source operator
            order_by: List of OrderByColumn specifications
        """
        super().__init__(source)
        self.order_by = order_by

    def __iter__(self) -> Iterator[dict[str, Any]]:
        """
        Execute ORDER BY sorting

        Yields:
            Rows in sorted order
        """
        # Materialize all rows
        rows = list(self.child)

        # Sort rows using multi-key sort
        sorted_rows = sorted(rows, key=self._sort_key)

        # Yield sorted rows
        yield from sorted_rows

    def _sort_key(self, row: dict[str, Any]) -> tuple:
        """
        Generate sort key for a row

        Args:
            row: Input row

        Returns:
            Tuple of (null_flag, value) pairs, one per ORDER BY column, with
            values negated or wrapped in ReverseCompare for DESC columns
        """
        key_parts = []

        for order_col in self.order_by:
            value = row.get(order_col.column)

            # Handle NULL values - sort them last
            if value is None:
                # Use a sentinel that sorts last
                value = (1, None)  # (sort_last_flag, None)
            else:
                value = (0, value)  # (sort_first_flag, actual_value)

            # For DESC, we need to reverse the comparison
            if order_col.direction == "DESC":
                # Invert the sort order by negating numbers or using reverse wrapper
                if isinstance(value[1], (int, float)):
                    value = (value[0], -value[1] if value[1] is not None else None)
                else:
                    # For non-numeric types, we'll use a reverse wrapper
                    value = (value[0], ReverseCompare(value[1]))

            key_parts.append(value)

        return tuple(key_parts)

    def explain(self, indent: int = 0) -> list[str]:
        """Generate execution plan explanation"""
        order_spec = ", ".join(f"{col.column} {col.direction}" for col in self.order_by)
        lines = [" " * indent + f"OrderBy({order_spec})"]
        lines.extend(self.child.explain(indent + 2))
        return lines

__init__

__init__(source: Operator, order_by: list[OrderByColumn])

Initialize OrderBy operator

Parameters:

Name      Type                 Description                           Default
source    Operator             Source operator                       required
order_by  list[OrderByColumn]  List of OrderByColumn specifications  required
Source code in sqlstream/operators/orderby.py
def __init__(self, source: Operator, order_by: list[OrderByColumn]):
    """
    Initialize OrderBy operator

    Args:
        source: Source operator
        order_by: List of OrderByColumn specifications
    """
    super().__init__(source)
    self.order_by = order_by

__iter__

__iter__() -> Iterator[dict[str, Any]]

Execute ORDER BY sorting

Yields:

Type            Description
dict[str, Any]  Rows in sorted order

Source code in sqlstream/operators/orderby.py
def __iter__(self) -> Iterator[dict[str, Any]]:
    """
    Execute ORDER BY sorting

    Yields:
        Rows in sorted order
    """
    # Materialize all rows
    rows = list(self.child)

    # Sort rows using multi-key sort
    sorted_rows = sorted(rows, key=self._sort_key)

    # Yield sorted rows
    yield from sorted_rows

explain

explain(indent: int = 0) -> list[str]

Generate execution plan explanation

Source code in sqlstream/operators/orderby.py
def explain(self, indent: int = 0) -> list[str]:
    """Generate execution plan explanation"""
    order_spec = ", ".join(f"{col.column} {col.direction}" for col in self.order_by)
    lines = [" " * indent + f"OrderBy({order_spec})"]
    lines.extend(self.child.explain(indent + 2))
    return lines

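A usage sketch for OrderByOperator. The OrderByColumn dataclass below is a hypothetical stand-in exposing the two attributes the operator reads (column, direction); note how the NULL age sorts last even under DESC:

from dataclasses import dataclass

from sqlstream.operators.orderby import OrderByOperator

@dataclass
class OrderByColumn:  # hypothetical stand-in; only .column and .direction are read
    column: str
    direction: str = "ASC"

rows = [
    {"name": "b", "age": 30},
    {"name": "a", "age": None},
    {"name": "c", "age": 25},
]
op = OrderByOperator(rows, [OrderByColumn("age", "DESC")])
print([r["name"] for r in op])  # ['b', 'c', 'a'] -- NULL age sorts last
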
ReverseCompare

ReverseCompare

Wrapper class to reverse comparison order for non-numeric types

Used for DESC sorting of strings and other non-numeric types.

Source code in sqlstream/operators/orderby.py
class ReverseCompare:
    """
    Wrapper class to reverse comparison order for non-numeric types

    Used for DESC sorting of strings and other non-numeric types.
    """

    def __init__(self, value):
        self.value = value

    def __lt__(self, other):
        if isinstance(other, ReverseCompare):
            return self.value > other.value
        return self.value > other

    def __le__(self, other):
        if isinstance(other, ReverseCompare):
            return self.value >= other.value
        return self.value >= other

    def __gt__(self, other):
        if isinstance(other, ReverseCompare):
            return self.value < other.value
        return self.value < other

    def __ge__(self, other):
        if isinstance(other, ReverseCompare):
            return self.value <= other.value
        return self.value <= other

    def __eq__(self, other):
        if isinstance(other, ReverseCompare):
            return self.value == other.value
        return self.value == other

    def __ne__(self, other):
        if isinstance(other, ReverseCompare):
            return self.value != other.value
        return self.value != other

    def __repr__(self):
        return f"ReverseCompare({self.value!r})"

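A quick illustration of why the wrapper exists: sorted() has only a single global reverse flag, so per-column DESC on non-numeric values (which can't be negated) needs a comparison-inverting key. Sorting wrapped values ascending yields the originals descending:

from sqlstream.operators.orderby import ReverseCompare

words = ["pear", "apple", "fig"]
print(sorted(words, key=ReverseCompare))  # ['pear', 'fig', 'apple']
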
Project

Project

Bases: Operator

Project operator - selects columns (SELECT clause)

Pulls rows from child and yields only the requested columns.

For SELECT *, rows pass through unchanged; otherwise a new dict containing only the selected columns is built for each row.

Source code in sqlstream/operators/project.py
class Project(Operator):
    """
    Project operator - selects columns (SELECT clause)

    Pulls rows from child and yields only the requested columns.

    For SELECT *, rows pass through unchanged; otherwise a new dict
    containing only the selected columns is built for each row.
    """

    def __init__(self, child: Operator, columns: list[str]):
        """
        Initialize project operator

        Args:
            child: Child operator to pull rows from
            columns: List of column names to select (or ['*'] for all)
        """
        super().__init__(child)
        self.columns = columns

    def __iter__(self) -> Iterator[dict[str, Any]]:
        """
        Yield rows with only selected columns

        If columns is ['*'], yields all columns unchanged.
        Otherwise, creates a new dict with only the requested columns.
        """
        # SELECT *
        if self.columns == ["*"]:
            yield from self.child
            return

        # SELECT specific columns
        for row in self.child:
            # Create projected row with only selected columns
            projected = {}

            for col in self.columns:
                if col in row:
                    projected[col] = row[col]
                else:
                    # Column not found - set to None
                    projected[col] = None

            yield projected

    def __repr__(self) -> str:
        col_str = ", ".join(self.columns)
        return f"Project({col_str})"

__init__

__init__(child: Operator, columns: list[str])

Initialize project operator

Parameters:

Name     Type       Description                                         Default
child    Operator   Child operator to pull rows from                    required
columns  list[str]  List of column names to select (or ['*'] for all)   required
Source code in sqlstream/operators/project.py
def __init__(self, child: Operator, columns: list[str]):
    """
    Initialize project operator

    Args:
        child: Child operator to pull rows from
        columns: List of column names to select (or ['*'] for all)
    """
    super().__init__(child)
    self.columns = columns

__iter__

__iter__() -> Iterator[dict[str, Any]]

Yield rows with only selected columns

If columns is ['*'], yields all columns unchanged. Otherwise, creates a new dict with only the requested columns.

Source code in sqlstream/operators/project.py
def __iter__(self) -> Iterator[dict[str, Any]]:
    """
    Yield rows with only selected columns

    If columns is ['*'], yields all columns unchanged.
    Otherwise, creates a new dict with only the requested columns.
    """
    # SELECT *
    if self.columns == ["*"]:
        yield from self.child
        return

    # SELECT specific columns
    for row in self.child:
        # Create projected row with only selected columns
        projected = {}

        for col in self.columns:
            if col in row:
                projected[col] = row[col]
            else:
                # Column not found - set to None
                projected[col] = None

        yield projected

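A usage sketch for Project, with a plain list standing in for the child; columns missing from a row come back as None:

from sqlstream.operators.project import Project

rows = [{"id": 1, "name": "ada", "email": "a@example.com"}]
print(list(Project(rows, ["name", "missing"])))
# [{'name': 'ada', 'missing': None}]
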
Scan

Scan

Bases: Operator

Scan operator - wrapper around a data source reader

This is the leaf of the operator tree. It pulls data from a reader and yields it to parent operators.

Source code in sqlstream/operators/scan.py
class Scan(Operator):
    """
    Scan operator - wrapper around a data source reader

    This is the leaf of the operator tree. It pulls data from
    a reader and yields it to parent operators.
    """

    def __init__(self, reader: BaseReader):
        """
        Initialize scan operator

        Args:
            reader: Data source reader to scan
        """
        super().__init__(child=None)  # Scan has no child
        self.reader = reader

    def __iter__(self) -> Iterator[dict[str, Any]]:
        """
        Yield all rows from the reader

        This delegates directly to the reader's lazy iterator.
        """
        yield from self.reader.read_lazy()

    def __repr__(self) -> str:
        return f"Scan({self.reader.__class__.__name__})"

__init__

__init__(reader: BaseReader)

Initialize scan operator

Parameters:

Name    Type        Description                 Default
reader  BaseReader  Data source reader to scan  required
Source code in sqlstream/operators/scan.py
def __init__(self, reader: BaseReader):
    """
    Initialize scan operator

    Args:
        reader: Data source reader to scan
    """
    super().__init__(child=None)  # Scan has no child
    self.reader = reader

__iter__

__iter__() -> Iterator[dict[str, Any]]

Yield all rows from the reader

This delegates directly to the reader's lazy iterator.

Source code in sqlstream/operators/scan.py
def __iter__(self) -> Iterator[dict[str, Any]]:
    """
    Yield all rows from the reader

    This delegates directly to the reader's lazy iterator.
    """
    yield from self.reader.read_lazy()
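
A usage sketch tying it together: a hypothetical ListReader implements the one method Scan calls (read_lazy), and operators compose into a tree that is pulled lazily from the root:

from typing import Any, Iterator

from sqlstream.operators.project import Project
from sqlstream.operators.scan import Scan

class ListReader:
    """Hypothetical stand-in for a BaseReader; Scan only calls read_lazy()."""

    def __init__(self, rows: list[dict[str, Any]]):
        self.rows = rows

    def read_lazy(self) -> Iterator[dict[str, Any]]:
        yield from self.rows

plan = Project(Scan(ListReader([{"x": 1, "y": 2}, {"x": 3, "y": 4}])), ["x"])
print(list(plan))  # [{'x': 1}, {'x': 3}]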