Skip to content

Utilities Reference

Utility functions and helpers.

Aggregator

Aggregator

Base class for aggregators

Source code in sqlstream/utils/aggregates.py
class Aggregator:
    """Common interface for incremental aggregate computations.

    Values are fed in one at a time through ``update`` and the outcome is
    read back through ``result``. Both methods here only raise
    ``NotImplementedError``; subclasses are expected to override them.
    """

    def update(self, value: Any) -> None:
        """Feed one input value into the aggregate.

        Args:
            value: The next value to take into account.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def result(self) -> Any:
        """Return the aggregate of the values seen so far.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

update

update(value: Any) -> None

Update aggregator with a new value

Source code in sqlstream/utils/aggregates.py
def update(self, value: Any) -> None:
    """Feed one input value into the aggregate.

    Args:
        value: The next value to take into account.

    Raises:
        NotImplementedError: Always; this base implementation has no logic.
    """
    raise NotImplementedError

result

result() -> Any

Get final aggregated result

Source code in sqlstream/utils/aggregates.py
def result(self) -> Any:
    """Return the aggregate of the values seen so far.

    Raises:
        NotImplementedError: Always; this base implementation has no logic.
    """
    raise NotImplementedError

AvgAggregator

AvgAggregator

Bases: Aggregator

AVG aggregator - computes average of numeric values

Source code in sqlstream/utils/aggregates.py
class AvgAggregator(Aggregator):
    """AVG aggregator: arithmetic mean of the values that could be summed."""

    def __init__(self):
        # Running total and the number of values that contributed to it.
        self.count = 0
        self.sum = 0

    def update(self, value: Any) -> None:
        """Fold ``value`` into the running total.

        ``None`` is ignored. A value whose addition raises ``TypeError`` is
        ignored as well and does not increase the count.
        """
        if value is not None:
            try:
                self.sum += value
            except TypeError:
                # Not addable to the running total: skip it.
                return
            self.count += 1

    def result(self) -> float | None:
        """Return total divided by count, or ``None`` when count is zero."""
        return self.sum / self.count if self.count != 0 else None

update

update(value: Any) -> None

Add value to average calculation

Source code in sqlstream/utils/aggregates.py
def update(self, value: Any) -> None:
    """Fold ``value`` into the running total.

    ``None`` is ignored. A value whose addition raises ``TypeError`` is
    ignored as well and does not increase the count.
    """
    if value is not None:
        try:
            self.sum += value
        except TypeError:
            # Not addable to the running total: skip it.
            return
        self.count += 1

result

result() -> float | None

Return average, or None if no valid values

Source code in sqlstream/utils/aggregates.py
def result(self) -> float | None:
    """Return average, or None if no valid values"""
    if self.count == 0:
        return None
    return self.sum / self.count

CountAggregator

CountAggregator

Bases: Aggregator

COUNT aggregator - counts non-NULL values

Source code in sqlstream/utils/aggregates.py
class CountAggregator(Aggregator):
    """COUNT aggregator: tallies rows, or only the non-NULL values."""

    def __init__(self, count_star: bool = False):
        """
        Set up an empty counter.

        Args:
            count_star: When True every update is counted (COUNT(*)).
                        When False only values that are not None are
                        counted (COUNT(column)).
        """
        self.count = 0
        self.count_star = count_star

    def update(self, value: Any) -> None:
        """Count ``value`` unless it is None in non-star mode."""
        if not self.count_star and value is None:
            return
        self.count += 1

    def result(self) -> int:
        """Return the number of updates counted so far."""
        return self.count

__init__

__init__(count_star: bool = False)

Initialize COUNT aggregator

Parameters:

Name Type Description Default
count_star bool

If True, counts all rows (COUNT(*)). If False, counts only non-NULL values (COUNT(column)).

False
Source code in sqlstream/utils/aggregates.py
def __init__(self, count_star: bool = False):
    """
    Set up an empty counter.

    Args:
        count_star: When True every update is counted (COUNT(*)).
                    When False only values that are not None are
                    counted (COUNT(column)).
    """
    self.count = 0
    self.count_star = count_star

update

update(value: Any) -> None

Update count

Source code in sqlstream/utils/aggregates.py
def update(self, value: Any) -> None:
    """Count ``value`` unless it is None in non-star mode."""
    if not self.count_star and value is None:
        return
    self.count += 1

result

result() -> int

Return total count

Source code in sqlstream/utils/aggregates.py
def result(self) -> int:
    """Return the value of the ``count`` attribute accumulated so far."""
    return self.count

MaxAggregator

MaxAggregator

Bases: Aggregator

MAX aggregator - finds maximum value

Source code in sqlstream/utils/aggregates.py
class MaxAggregator(Aggregator):
    """MAX aggregator: keeps the largest value seen."""

    def __init__(self):
        # None means "no value recorded yet".
        self.max: Any | None = None

    def update(self, value: Any) -> None:
        """Record ``value`` if it is the first or exceeds the current maximum.

        ``None`` inputs are ignored.
        """
        if value is not None and (self.max is None or value > self.max):
            self.max = value

    def result(self) -> Any | None:
        """Return the largest value recorded, or None if there was none."""
        return self.max

update

update(value: Any) -> None

Update maximum

Source code in sqlstream/utils/aggregates.py
def update(self, value: Any) -> None:
    """Record ``value`` if it is the first or exceeds the current maximum.

    ``None`` inputs are ignored.
    """
    if value is not None and (self.max is None or value > self.max):
        self.max = value

result

result() -> Any | None

Return maximum value, or None if no valid values

Source code in sqlstream/utils/aggregates.py
def result(self) -> Any | None:
    """Return the ``max`` attribute: the largest value recorded, or None if none was."""
    return self.max

MinAggregator

MinAggregator

Bases: Aggregator

MIN aggregator - finds minimum value

Source code in sqlstream/utils/aggregates.py
class MinAggregator(Aggregator):
    """MIN aggregator: keeps the smallest value seen."""

    def __init__(self):
        # None means "no value recorded yet".
        self.min: Any | None = None

    def update(self, value: Any) -> None:
        """Record ``value`` if it is the first or is below the current minimum.

        ``None`` inputs are ignored.
        """
        if value is not None and (self.min is None or value < self.min):
            self.min = value

    def result(self) -> Any | None:
        """Return the smallest value recorded, or None if there was none."""
        return self.min

update

update(value: Any) -> None

Update minimum

Source code in sqlstream/utils/aggregates.py
def update(self, value: Any) -> None:
    """Record ``value`` if it is the first or is below the current minimum.

    ``None`` inputs are ignored.
    """
    if value is not None and (self.min is None or value < self.min):
        self.min = value

result

result() -> Any | None

Return minimum value, or None if no valid values

Source code in sqlstream/utils/aggregates.py
def result(self) -> Any | None:
    """Return the ``min`` attribute: the smallest value recorded, or None if none was."""
    return self.min

SumAggregator

SumAggregator

Bases: Aggregator

SUM aggregator - sums numeric values

Source code in sqlstream/utils/aggregates.py
class SumAggregator(Aggregator):
    """SUM aggregator - sums numeric values"""

    def __init__(self):
        # None means "no value has been summed yet"; result() reports it as-is.
        self.sum: float | None = None

    def update(self, value: Any) -> None:
        """Add ``value`` to the running sum.

        ``None`` is ignored. A value that cannot be added (the addition
        raises ``TypeError``) is ignored too and leaves the sum untouched,
        so input made up only of such values still yields ``None``.

        Args:
            value: The next value to add.
        """
        if value is None:
            return

        # Start from 0 for the first value, but commit nothing until the
        # addition has succeeded; otherwise a rejected value would turn an
        # empty sum (None) into 0.
        current = 0 if self.sum is None else self.sum
        try:
            total = current + value
        except TypeError:
            # Skip non-numeric values
            return
        self.sum = total

    def result(self) -> float | None:
        """Return sum, or None if no valid values"""
        return self.sum

update

update(value: Any) -> None

Add value to sum

Source code in sqlstream/utils/aggregates.py
def update(self, value: Any) -> None:
    """Add ``value`` to the running sum.

    ``None`` is ignored. A value that cannot be added (the addition raises
    ``TypeError``) is ignored too and leaves the sum untouched, so input
    made up only of such values still leaves the sum as ``None``.

    Args:
        value: The next value to add.
    """
    if value is None:
        return

    # Start from 0 for the first value, but commit nothing until the
    # addition has succeeded; otherwise a rejected value would turn an
    # empty sum (None) into 0.
    current = 0 if self.sum is None else self.sum
    try:
        total = current + value
    except TypeError:
        # Skip non-numeric values
        return
    self.sum = total

result

result() -> float | None

Return sum, or None if no valid values

Source code in sqlstream/utils/aggregates.py
def result(self) -> float | None:
    """Return the ``sum`` attribute: the accumulated total, or None if unset."""
    return self.sum

create_aggregator

create_aggregator

create_aggregator(function: str, column: str) -> Aggregator

Factory function to create appropriate aggregator

Parameters:

Name Type Description Default
function str

Aggregate function name (COUNT, SUM, AVG, MIN, MAX)

required
column str

Column name (or '*' for COUNT(*))

required

Returns:

Type Description
Aggregator

Aggregator instance

Raises:

Type Description
ValueError

If function is not recognized

Source code in sqlstream/utils/aggregates.py
def create_aggregator(function: str, column: str) -> Aggregator:
    """
    Build the aggregator that implements a named aggregate function.

    The function name is matched case-insensitively.

    Args:
        function: Aggregate function name (COUNT, SUM, AVG, MIN, MAX)
        column: Column name; '*' selects COUNT(*) behaviour for COUNT

    Returns:
        A new aggregator instance for the requested function

    Raises:
        ValueError: If function is not recognized
    """
    function = function.upper()

    # COUNT is the only aggregate whose construction depends on the column.
    if function == "COUNT":
        return CountAggregator(count_star=(column == "*"))

    no_arg_aggregators = {
        "SUM": SumAggregator,
        "AVG": AvgAggregator,
        "MIN": MinAggregator,
        "MAX": MaxAggregator,
    }
    aggregator_cls = no_arg_aggregators.get(function)
    if aggregator_cls is None:
        raise ValueError(f"Unknown aggregate function: {function}")
    return aggregator_cls()