
Readers API Reference

Data source readers for various formats.

BaseReader

BaseReader

Base class for all data source readers

Readers are responsible for:

1. Reading data from a source (file, URL, database, etc.)
2. Yielding rows as dictionaries (lazy evaluation)
3. Optionally supporting predicate pushdown
4. Optionally supporting column pruning

Source code in sqlstream/readers/base.py
class BaseReader:
    """
    Base class for all data source readers

    Readers are responsible for:
    1. Reading data from a source (file, URL, database, etc.)
    2. Yielding rows as dictionaries (lazy evaluation)
    3. Optionally supporting predicate pushdown
    4. Optionally supporting column pruning
    """

    def read_lazy(self) -> Iterator[dict[str, Any]]:
        """
        Yield rows as dictionaries

        This is the core method that all readers must implement.
        It should yield one row at a time (lazy evaluation) rather than
        loading all data into memory.

        Yields:
            Dictionary representing one row of data

        Example:
            {'name': 'Alice', 'age': 30, 'city': 'NYC'}
        """
        raise NotImplementedError("Subclasses must implement read_lazy()")

    def supports_pushdown(self) -> bool:
        """
        Does this reader support predicate pushdown?

        If True, the query optimizer can call set_filter() to push
        WHERE conditions down to the reader for more efficient execution.

        Returns:
            True if predicate pushdown is supported
        """
        return False

    def set_filter(self, conditions: list[Condition]) -> None:
        """
        Set filter conditions for predicate pushdown

        Args:
            conditions: List of WHERE conditions to apply during read

        Note:
            Only called if supports_pushdown() returns True
        """
        pass

    def supports_column_selection(self) -> bool:
        """
        Does this reader support column pruning?

        If True, the query optimizer can call set_columns() to specify
        which columns are needed, allowing the reader to skip reading
        unnecessary columns.

        Returns:
            True if column selection is supported
        """
        return False

    def set_columns(self, columns: list[str]) -> None:
        """
        Set which columns to read (column pruning)

        Args:
            columns: List of column names to read

        Note:
            Only called if supports_column_selection() returns True
        """
        pass

    def supports_limit(self) -> bool:
        """
        Does this reader support early termination with LIMIT?

        If True, the query optimizer can call set_limit() to specify
        the maximum number of rows to read, allowing early termination.

        Returns:
            True if limit pushdown is supported
        """
        return False

    def set_limit(self, limit: int) -> None:
        """
        Set maximum number of rows to read (limit pushdown)

        Args:
            limit: Maximum number of rows to yield

        Note:
            Only called if supports_limit() returns True
            Reader should stop yielding rows after 'limit' rows
        """
        pass

    def supports_partition_pruning(self) -> bool:
        """
        Does this reader support partition pruning?

        If True, the query optimizer can call set_partition_filters() to specify
        which partitions to read based on filter conditions.

        Returns:
            True if partition pruning is supported
        """
        return False

    def get_partition_columns(self) -> set:
        """
        Get partition column names for Hive-style partitioning

        Returns:
            Set of partition column names (e.g., {'year', 'month', 'day'})
            Empty set if not partitioned

        Example:
            For path: s3://bucket/data/year=2024/month=01/data.parquet
            Returns: {'year', 'month'}
        """
        return set()

    def set_partition_filters(self, conditions: list[Condition]) -> None:
        """
        Set filter conditions for partition pruning

        Args:
            conditions: List of WHERE conditions on partition columns

        Note:
            Only called if supports_partition_pruning() returns True
            Reader should skip partitions that don't match these conditions
        """
        pass

    def get_schema(self) -> Schema | None:
        """
        Get schema information (column names and types)

        Returns:
            Schema object with inferred types, or None if schema cannot be inferred

        Note:
            Optional method. Returns None by default.
            Readers should override this to provide schema inference.
        """
        return None

    def __iter__(self):
        """Allow readers to be used directly in for loops"""
        return self.read_lazy()

    def to_dataframe(self):
        """
        Convert reader content to pandas DataFrame

        Returns:
            pandas.DataFrame containing all data

        Raises:
            ImportError: If pandas is not installed

        Note:
            Default implementation iterates over read_lazy() and creates DataFrame.
            Subclasses should override this for better performance (e.g. using read_csv/read_parquet).
        """
        try:
            import pandas as pd
        except ImportError as e:
            raise ImportError("Pandas is required for to_dataframe()") from e

        # Default implementation: materialize iterator
        return pd.DataFrame(list(self.read_lazy()))
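
For illustration, a minimal subclass that streams rows from an in-memory list might look like the sketch below. It is not a reader shipped with sqlstream; the import path is taken from the source location shown above.

from collections.abc import Iterator
from typing import Any

from sqlstream.readers.base import BaseReader


class ListReader(BaseReader):
    """Illustrative reader that streams rows from an in-memory list."""

    def __init__(self, rows: list[dict[str, Any]]):
        self.rows = rows

    def read_lazy(self) -> Iterator[dict[str, Any]]:
        # Yield one row at a time instead of materializing anything up front
        yield from self.rows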

read_lazy

read_lazy() -> Iterator[dict[str, Any]]

Yield rows as dictionaries

This is the core method that all readers must implement. It should yield one row at a time (lazy evaluation) rather than loading all data into memory.

Yields:

Type Description
dict[str, Any]

Dictionary representing one row of data

Example

{'name': 'Alice', 'age': 30, 'city': 'NYC'}

Source code in sqlstream/readers/base.py
def read_lazy(self) -> Iterator[dict[str, Any]]:
    """
    Yield rows as dictionaries

    This is the core method that all readers must implement.
    It should yield one row at a time (lazy evaluation) rather than
    loading all data into memory.

    Yields:
        Dictionary representing one row of data

    Example:
        {'name': 'Alice', 'age': 30, 'city': 'NYC'}
    """
    raise NotImplementedError("Subclasses must implement read_lazy()")

supports_pushdown

supports_pushdown() -> bool

Does this reader support predicate pushdown?

If True, the query optimizer can call set_filter() to push WHERE conditions down to the reader for more efficient execution.

Returns:

Type Description
bool

True if predicate pushdown is supported

Source code in sqlstream/readers/base.py
def supports_pushdown(self) -> bool:
    """
    Does this reader support predicate pushdown?

    If True, the query optimizer can call set_filter() to push
    WHERE conditions down to the reader for more efficient execution.

    Returns:
        True if predicate pushdown is supported
    """
    return False

set_filter

set_filter(conditions: list[Condition]) -> None

Set filter conditions for predicate pushdown

Parameters:

Name Type Description Default
conditions list[Condition]

List of WHERE conditions to apply during read

required
Note

Only called if supports_pushdown() returns True

Source code in sqlstream/readers/base.py
def set_filter(self, conditions: list[Condition]) -> None:
    """
    Set filter conditions for predicate pushdown

    Args:
        conditions: List of WHERE conditions to apply during read

    Note:
        Only called if supports_pushdown() returns True
    """
    pass
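
A sketch of the pushdown handshake as a caller might perform it; the keyword construction of Condition(...) below is an assumption made for illustration.

# Hypothetical WHERE condition: age > 21
conditions = [Condition(column="age", operator=">", value=21)]

reader = CSVReader("people.csv")  # CSVReader is documented below
if reader.supports_pushdown():
    # The reader filters rows while reading
    reader.set_filter(conditions)
    rows = reader.read_lazy()
else:
    # Fallback: filter after the rows have been read
    rows = (r for r in reader.read_lazy() if (r.get("age") or 0) > 21)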

supports_column_selection

supports_column_selection() -> bool

Does this reader support column pruning?

If True, the query optimizer can call set_columns() to specify which columns are needed, allowing the reader to skip reading unnecessary columns.

Returns:

Type Description
bool

True if column selection is supported

Source code in sqlstream/readers/base.py
def supports_column_selection(self) -> bool:
    """
    Does this reader support column pruning?

    If True, the query optimizer can call set_columns() to specify
    which columns are needed, allowing the reader to skip reading
    unnecessary columns.

    Returns:
        True if column selection is supported
    """
    return False

set_columns

set_columns(columns: list[str]) -> None

Set which columns to read (column pruning)

Parameters:

Name Type Description Default
columns list[str]

List of column names to read

required
Note

Only called if supports_column_selection() returns True

Source code in sqlstream/readers/base.py
def set_columns(self, columns: list[str]) -> None:
    """
    Set which columns to read (column pruning)

    Args:
        columns: List of column names to read

    Note:
        Only called if supports_column_selection() returns True
    """
    pass
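
A short column-pruning sketch using the CSVReader documented below (people.csv is a hypothetical file):

reader = CSVReader("people.csv")
if reader.supports_column_selection():
    reader.set_columns(["name", "age"])

for row in reader.read_lazy():
    print(row)  # e.g. {'name': 'Alice', 'age': 30}; other columns are dropped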

supports_limit

supports_limit() -> bool

Does this reader support early termination with LIMIT?

If True, the query optimizer can call set_limit() to specify the maximum number of rows to read, allowing early termination.

Returns:

Type Description
bool

True if limit pushdown is supported

Source code in sqlstream/readers/base.py
def supports_limit(self) -> bool:
    """
    Does this reader support early termination with LIMIT?

    If True, the query optimizer can call set_limit() to specify
    the maximum number of rows to read, allowing early termination.

    Returns:
        True if limit pushdown is supported
    """
    return False

set_limit

set_limit(limit: int) -> None

Set maximum number of rows to read (limit pushdown)

Parameters:

Name Type Description Default
limit int

Maximum number of rows to yield

required
Note

Only called if supports_limit() returns True. The reader should stop yielding rows after 'limit' rows.

Source code in sqlstream/readers/base.py
def set_limit(self, limit: int) -> None:
    """
    Set maximum number of rows to read (limit pushdown)

    Args:
        limit: Maximum number of rows to yield

    Note:
        Only called if supports_limit() returns True
        Reader should stop yielding rows after 'limit' rows
    """
    pass
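
A sketch of limit pushdown, with itertools.islice as the fallback when the reader cannot terminate early on its own:

from itertools import islice

reader = CSVReader("people.csv")  # hypothetical file
if reader.supports_limit():
    reader.set_limit(10)  # the reader stops reading after 10 rows
    first_ten = list(reader.read_lazy())
else:
    first_ten = list(islice(reader.read_lazy(), 10))  # stop consuming instead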

supports_partition_pruning

supports_partition_pruning() -> bool

Does this reader support partition pruning?

If True, the query optimizer can call set_partition_filters() to specify which partitions to read based on filter conditions.

Returns:

Type Description
bool

True if partition pruning is supported

Source code in sqlstream/readers/base.py
def supports_partition_pruning(self) -> bool:
    """
    Does this reader support partition pruning?

    If True, the query optimizer can call set_partition_filters() to specify
    which partitions to read based on filter conditions.

    Returns:
        True if partition pruning is supported
    """
    return False

get_partition_columns

get_partition_columns() -> set

Get partition column names for Hive-style partitioning

Returns:

Type Description
set

Set of partition column names (e.g., {'year', 'month', 'day'}); empty set if not partitioned

Example

For path: s3://bucket/data/year=2024/month=01/data.parquet
Returns: {'year', 'month'}

Source code in sqlstream/readers/base.py
def get_partition_columns(self) -> set:
    """
    Get partition column names for Hive-style partitioning

    Returns:
        Set of partition column names (e.g., {'year', 'month', 'day'})
        Empty set if not partitioned

    Example:
        For path: s3://bucket/data/year=2024/month=01/data.parquet
        Returns: {'year', 'month'}
    """
    return set()
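
The Hive-style convention encodes partition values as key=value path segments. A small, illustrative helper (not part of BaseReader) shows how the column names can be recovered from a path:

def partition_columns_from_path(path: str) -> set[str]:
    """Extract Hive-style partition column names from a path."""
    return {segment.split("=", 1)[0] for segment in path.split("/") if "=" in segment}


partition_columns_from_path("s3://bucket/data/year=2024/month=01/data.parquet")
# -> {'year', 'month'}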

set_partition_filters

set_partition_filters(conditions: list[Condition]) -> None

Set filter conditions for partition pruning

Parameters:

Name Type Description Default
conditions list[Condition]

List of WHERE conditions on partition columns

required
Note

Only called if supports_partition_pruning() returns True. The reader should skip partitions that don't match these conditions.

Source code in sqlstream/readers/base.py
def set_partition_filters(self, conditions: list[Condition]) -> None:
    """
    Set filter conditions for partition pruning

    Args:
        conditions: List of WHERE conditions on partition columns

    Note:
        Only called if supports_partition_pruning() returns True
        Reader should skip partitions that don't match these conditions
    """
    pass

get_schema

get_schema() -> Schema | None

Get schema information (column names and types)

Returns:

Type Description
Schema | None

Schema object with inferred types, or None if schema cannot be inferred

Note

Optional method. Returns None by default. Readers should override this to provide schema inference.

Source code in sqlstream/readers/base.py
def get_schema(self) -> Schema | None:
    """
    Get schema information (column names and types)

    Returns:
        Schema object with inferred types, or None if schema cannot be inferred

    Note:
        Optional method. Returns None by default.
        Readers should override this to provide schema inference.
    """
    return None
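
For example, a caller can inspect the inferred schema before running a query; Schema.columns is the same mapping that CSVReader.to_dataframe (below) iterates.

schema = CSVReader("people.csv").get_schema()  # hypothetical file
if schema is not None:
    for column, dtype in schema.columns.items():
        print(column, dtype)  # e.g. "age DataType.INTEGER"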

__iter__

__iter__()

Allow readers to be used directly in for loops

Source code in sqlstream/readers/base.py
def __iter__(self):
    """Allow readers to be used directly in for loops"""
    return self.read_lazy()
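
Because __iter__ delegates to read_lazy(), any reader can be consumed directly in a for loop without materializing the file first:

reader = CSVReader("people.csv")  # hypothetical file
for row in reader:  # equivalent to iterating reader.read_lazy()
    print(row["name"])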

to_dataframe

to_dataframe()

Convert reader content to pandas DataFrame

Returns:

Type Description

pandas.DataFrame containing all data

Raises:

Type Description
ImportError

If pandas is not installed

Note

Default implementation iterates over read_lazy() and creates DataFrame. Subclasses should override this for better performance (e.g. using read_csv/read_parquet).

Source code in sqlstream/readers/base.py
def to_dataframe(self):
    """
    Convert reader content to pandas DataFrame

    Returns:
        pandas.DataFrame containing all data

    Raises:
        ImportError: If pandas is not installed

    Note:
        Default implementation iterates over read_lazy() and creates DataFrame.
        Subclasses should override this for better performance (e.g. using read_csv/read_parquet).
    """
    try:
        import pandas as pd
    except ImportError as e:
        raise ImportError("Pandas is required for to_dataframe()") from e

    # Default implementation: materialize iterator
    return pd.DataFrame(list(self.read_lazy()))

CSVReader

CSVReader

Bases: BaseReader

Lazy CSV reader with basic type inference

Features:

- Lazy iteration (doesn't load entire file into memory)
- Automatic type inference (int, float, string)
- Predicate pushdown support
- Column pruning support

Source code in sqlstream/readers/csv_reader.py
class CSVReader(BaseReader):
    """
    Lazy CSV reader with basic type inference

    Features:
    - Lazy iteration (doesn't load entire file into memory)
    - Automatic type inference (int, float, string)
    - Predicate pushdown support
    - Column pruning support
    """

    def __init__(self, path: str, encoding: str = "utf-8", delimiter: str = ","):
        """
        Initialize CSV reader

        Args:
            path: Path to CSV file (local or s3://)
            encoding: File encoding (default: utf-8)
            delimiter: CSV delimiter (default: comma)
        """
        self.path_str = path
        self.is_s3 = path.startswith("s3://")
        if not self.is_s3:
            self.path = Path(path)
        else:
            self.path = None  # type: ignore

        self.encoding = encoding
        self.delimiter = delimiter

        # For optimization (set by query optimizer)
        self.filter_conditions: list[Condition] = []
        self.required_columns: list[str] = []
        self.limit: int | None = None

        if not self.is_s3 and not self.path.exists():
            raise FileNotFoundError(f"CSV file not found: {path}")

    def supports_pushdown(self) -> bool:
        """CSV reader supports predicate pushdown"""
        return True

    def supports_column_selection(self) -> bool:
        """CSV reader supports column pruning"""
        return True

    def supports_limit(self) -> bool:
        """CSV reader supports limit pushdown"""
        return True

    def set_filter(self, conditions: list[Condition]) -> None:
        """Set filter conditions for pushdown"""
        self.filter_conditions = conditions

    def set_columns(self, columns: list[str]) -> None:
        """Set required columns for pruning"""
        self.required_columns = columns

    def set_limit(self, limit: int) -> None:
        """Set maximum rows to read for early termination"""
        self.limit = limit

    def _get_file_handle(self):
        """Get file handle for reading (local or S3)."""
        if self.is_s3:
            try:
                import s3fs

                fs = s3fs.S3FileSystem(anon=False)
                return fs.open(self.path_str, mode="r", encoding=self.encoding)
            except ImportError as e:
                raise ImportError(
                    "s3fs is required for S3 support. Install with: pip install sqlstream[s3]"
                ) from e
        else:
            return open(self.path, encoding=self.encoding, newline="")

    def read_lazy(self) -> Iterator[dict[str, Any]]:
        """
        Lazy iterator over CSV rows

        Yields rows as dictionaries with type inference applied.
        If filters are set, applies them during iteration.
        If columns are set, only yields those columns.
        If limit is set, stops after yielding that many rows.
        """
        with self._get_file_handle() as f:
            reader = csv.DictReader(f, delimiter=self.delimiter)
            rows_yielded = 0

            for row_num, raw_row in enumerate(reader, start=2):  # Start at 2 (after header)
                try:
                    # Check for extra columns (malformed row)
                    if None in raw_row:
                        raise ValueError(f"Row has extra columns: {raw_row[None]}")

                    # Apply type inference
                    row = self._infer_types(raw_row)

                    # Apply filters if set (predicate pushdown)
                    if self.filter_conditions:
                        if not self._matches_filter(row):
                            continue

                    # Apply column selection if set (column pruning)
                    if self.required_columns:
                        row = {k: v for k, v in row.items() if k in self.required_columns}

                    yield row
                    rows_yielded += 1

                    # Early termination if limit reached (limit pushdown)
                    if self.limit is not None and rows_yielded >= self.limit:
                        break

                except Exception as e:
                    # Handle malformed rows gracefully
                    warnings.warn(
                        f"Skipping malformed row {row_num} in {self.path}: {e}",
                        UserWarning,
                        stacklevel=2,
                    )
                    continue

    def _infer_types(self, row: dict[str, str]) -> dict[str, Any]:
        """
        Infer types for all values in a row

        Tries to convert strings to int, then float, otherwise keeps as string.

        Args:
            row: Dictionary with string values

        Returns:
            Dictionary with inferred types
        """
        typed_row = {}

        for key, value in row.items():
            # Handle empty strings as None
            if value == "" or value is None:
                typed_row[key] = None
                continue

            typed_row[key] = self._infer_value_type(value)

        return typed_row

    def _infer_value_type(self, value: str) -> Any:
        """
        Infer type of a single value using enhanced type system.

        Args:
            value: String value from CSV

        Returns:
            Value converted to proper Python type (int, float, Decimal, datetime, etc.)
        """
        from sqlstream.core.types import infer_type_from_string

        return infer_type_from_string(value)

    def _matches_filter(self, row: dict[str, Any]) -> bool:
        """
        Check if row matches all filter conditions

        Args:
            row: Row to check

        Returns:
            True if row matches all conditions (AND logic)
        """
        for condition in self.filter_conditions:
            if not self._evaluate_condition(row, condition):
                return False
        return True

    def _evaluate_condition(self, row: dict[str, Any], condition: Condition) -> bool:
        """
        Evaluate a single condition against a row

        Args:
            row: Row to check
            condition: Condition to evaluate

        Returns:
            True if condition is satisfied
        """
        # Get column value
        if condition.column not in row:
            return False

        value = row[condition.column]

        # Handle NULL values
        if value is None:
            return False

        # Evaluate operator
        op = condition.operator
        expected = condition.value

        try:
            if op == "=":
                return value == expected
            elif op == ">":
                return value > expected
            elif op == "<":
                return value < expected
            elif op == ">=":
                return value >= expected
            elif op == "<=":
                return value <= expected
            elif op == "!=":
                return value != expected
            else:
                # Unknown operator, skip this condition
                warnings.warn(f"Unknown operator: {op}", UserWarning, stacklevel=2)
                return True

        except TypeError:
            # Type mismatch (e.g., comparing string to int)
            # This is fine - row just doesn't match
            return False

    def get_schema(self, sample_size: int = 100) -> Schema | None:
        """
        Infer schema by sampling rows from the CSV file

        Args:
            sample_size: Number of rows to sample for type inference (default: 100)

        Returns:
            Schema object with inferred types, or None if file is empty
        """
        sample_rows = []

        with self._get_file_handle() as f:
            reader = csv.DictReader(f, delimiter=self.delimiter)

            # Read sample rows to infer types
            try:
                for i, raw_row in enumerate(reader):
                    if i >= sample_size:
                        break
                    typed_row = self._infer_types(raw_row)
                    sample_rows.append(typed_row)

            except StopIteration:
                # Empty file or fewer rows than sample_size
                pass

        if not sample_rows:
            return None

        return Schema.from_rows(sample_rows)

    def to_dataframe(self):
        """
        Convert to pandas DataFrame efficiently, respecting inferred types.
        """
        import pandas as pd

        from sqlstream.core.types import DataType

        # Get schema to guide pandas parsing
        schema = self.get_schema()

        parse_dates = []
        dtypes = {}

        if schema:
            for col, dtype in schema.columns.items():
                if dtype == DataType.DATETIME or dtype == DataType.DATE:
                    parse_dates.append(col)
                elif dtype == DataType.INTEGER:
                    dtypes[col] = "Int64"  # Nullable integer
                elif dtype == DataType.FLOAT:
                    dtypes[col] = "float64"
                elif dtype == DataType.DECIMAL:
                    # Pandas doesn't support native Decimal well in read_csv
                    # We'll let it be object or float, or handle it post-load if needed
                    pass
                elif dtype == DataType.STRING:
                    dtypes[col] = "string"
                elif dtype == DataType.BOOLEAN:
                    dtypes[col] = "boolean"

        # Use pandas read_csv for performance
        kwargs = {
            "encoding": self.encoding,
            "delimiter": self.delimiter,
            "parse_dates": parse_dates,
            "dtype": dtypes,
        }

        if self.is_s3:
            kwargs["storage_options"] = {"anon": False}
            return pd.read_csv(self.path_str, **kwargs)
        else:
            return pd.read_csv(self.path, **kwargs)

__init__

__init__(path: str, encoding: str = 'utf-8', delimiter: str = ',')

Initialize CSV reader

Parameters:

Name Type Description Default
path str

Path to CSV file (local or s3://)

required
encoding str

File encoding (default: utf-8)

'utf-8'
delimiter str

CSV delimiter (default: comma)

','
Source code in sqlstream/readers/csv_reader.py
def __init__(self, path: str, encoding: str = "utf-8", delimiter: str = ","):
    """
    Initialize CSV reader

    Args:
        path: Path to CSV file (local or s3://)
        encoding: File encoding (default: utf-8)
        delimiter: CSV delimiter (default: comma)
    """
    self.path_str = path
    self.is_s3 = path.startswith("s3://")
    if not self.is_s3:
        self.path = Path(path)
    else:
        self.path = None  # type: ignore

    self.encoding = encoding
    self.delimiter = delimiter

    # For optimization (set by query optimizer)
    self.filter_conditions: list[Condition] = []
    self.required_columns: list[str] = []
    self.limit: int | None = None

    if not self.is_s3 and not self.path.exists():
        raise FileNotFoundError(f"CSV file not found: {path}")

supports_pushdown

supports_pushdown() -> bool

CSV reader supports predicate pushdown

Source code in sqlstream/readers/csv_reader.py
def supports_pushdown(self) -> bool:
    """CSV reader supports predicate pushdown"""
    return True

supports_column_selection

supports_column_selection() -> bool

CSV reader supports column pruning

Source code in sqlstream/readers/csv_reader.py
def supports_column_selection(self) -> bool:
    """CSV reader supports column pruning"""
    return True

supports_limit

supports_limit() -> bool

CSV reader supports limit pushdown

Source code in sqlstream/readers/csv_reader.py
def supports_limit(self) -> bool:
    """CSV reader supports limit pushdown"""
    return True

set_filter

set_filter(conditions: list[Condition]) -> None

Set filter conditions for pushdown

Source code in sqlstream/readers/csv_reader.py
def set_filter(self, conditions: list[Condition]) -> None:
    """Set filter conditions for pushdown"""
    self.filter_conditions = conditions

set_columns

set_columns(columns: list[str]) -> None

Set required columns for pruning

Source code in sqlstream/readers/csv_reader.py
def set_columns(self, columns: list[str]) -> None:
    """Set required columns for pruning"""
    self.required_columns = columns

set_limit

set_limit(limit: int) -> None

Set maximum rows to read for early termination

Source code in sqlstream/readers/csv_reader.py
def set_limit(self, limit: int) -> None:
    """Set maximum rows to read for early termination"""
    self.limit = limit

read_lazy

read_lazy() -> Iterator[dict[str, Any]]

Lazy iterator over CSV rows

Yields rows as dictionaries with type inference applied. If filters are set, applies them during iteration. If columns are set, only yields those columns. If limit is set, stops after yielding that many rows.

Source code in sqlstream/readers/csv_reader.py
def read_lazy(self) -> Iterator[dict[str, Any]]:
    """
    Lazy iterator over CSV rows

    Yields rows as dictionaries with type inference applied.
    If filters are set, applies them during iteration.
    If columns are set, only yields those columns.
    If limit is set, stops after yielding that many rows.
    """
    with self._get_file_handle() as f:
        reader = csv.DictReader(f, delimiter=self.delimiter)
        rows_yielded = 0

        for row_num, raw_row in enumerate(reader, start=2):  # Start at 2 (after header)
            try:
                # Check for extra columns (malformed row)
                if None in raw_row:
                    raise ValueError(f"Row has extra columns: {raw_row[None]}")

                # Apply type inference
                row = self._infer_types(raw_row)

                # Apply filters if set (predicate pushdown)
                if self.filter_conditions:
                    if not self._matches_filter(row):
                        continue

                # Apply column selection if set (column pruning)
                if self.required_columns:
                    row = {k: v for k, v in row.items() if k in self.required_columns}

                yield row
                rows_yielded += 1

                # Early termination if limit reached (limit pushdown)
                if self.limit is not None and rows_yielded >= self.limit:
                    break

            except Exception as e:
                # Handle malformed rows gracefully
                warnings.warn(
                    f"Skipping malformed row {row_num} in {self.path}: {e}",
                    UserWarning,
                    stacklevel=2,
                )
                continue

get_schema

get_schema(sample_size: int = 100) -> Schema | None

Infer schema by sampling rows from the CSV file

Parameters:

Name Type Description Default
sample_size int

Number of rows to sample for type inference (default: 100)

100

Returns:

Type Description
Schema | None

Schema object with inferred types, or None if file is empty

Source code in sqlstream/readers/csv_reader.py
def get_schema(self, sample_size: int = 100) -> Schema | None:
    """
    Infer schema by sampling rows from the CSV file

    Args:
        sample_size: Number of rows to sample for type inference (default: 100)

    Returns:
        Schema object with inferred types, or None if file is empty
    """
    sample_rows = []

    with self._get_file_handle() as f:
        reader = csv.DictReader(f, delimiter=self.delimiter)

        # Read sample rows to infer types
        try:
            for i, raw_row in enumerate(reader):
                if i >= sample_size:
                    break
                typed_row = self._infer_types(raw_row)
                sample_rows.append(typed_row)

        except StopIteration:
            # Empty file or fewer rows than sample_size
            pass

    if not sample_rows:
        return None

    return Schema.from_rows(sample_rows)

to_dataframe

to_dataframe()

Convert to pandas DataFrame efficiently, respecting inferred types.

Source code in sqlstream/readers/csv_reader.py
def to_dataframe(self):
    """
    Convert to pandas DataFrame efficiently, respecting inferred types.
    """
    import pandas as pd

    from sqlstream.core.types import DataType

    # Get schema to guide pandas parsing
    schema = self.get_schema()

    parse_dates = []
    dtypes = {}

    if schema:
        for col, dtype in schema.columns.items():
            if dtype == DataType.DATETIME or dtype == DataType.DATE:
                parse_dates.append(col)
            elif dtype == DataType.INTEGER:
                dtypes[col] = "Int64"  # Nullable integer
            elif dtype == DataType.FLOAT:
                dtypes[col] = "float64"
            elif dtype == DataType.DECIMAL:
                # Pandas doesn't support native Decimal well in read_csv
                # We'll let it be object or float, or handle it post-load if needed
                pass
            elif dtype == DataType.STRING:
                dtypes[col] = "string"
            elif dtype == DataType.BOOLEAN:
                dtypes[col] = "boolean"

    # Use pandas read_csv for performance
    kwargs = {
        "encoding": self.encoding,
        "delimiter": self.delimiter,
        "parse_dates": parse_dates,
        "dtype": dtypes,
    }

    if self.is_s3:
        kwargs["storage_options"] = {"anon": False}
        return pd.read_csv(self.path_str, **kwargs)
    else:
        return pd.read_csv(self.path, **kwargs)

HTMLReader

HTMLReader

Bases: BaseReader

Read tables from HTML files or URLs

Extracts all tables from HTML and allows querying them. If multiple tables exist, you can select which one to query.

Example

# Query first table in HTML
reader = HTMLReader("data.html")

# Query a specific table (0-indexed); note the parameter is table, not table_index
reader = HTMLReader("data.html", table=1)

# Query table by matching text
reader = HTMLReader("data.html", match="Sales Data")

Source code in sqlstream/readers/html_reader.py
class HTMLReader(BaseReader):
    """
    Read tables from HTML files or URLs

    Extracts all tables from HTML and allows querying them.
    If multiple tables exist, you can select which one to query.

    Example:
        # Query first table in HTML
        reader = HTMLReader("data.html")

        # Query specific table (0-indexed)
        reader = HTMLReader("data.html", table_index=1)

        # Query table by matching text
        reader = HTMLReader("data.html", match="Sales Data")
    """

    def __init__(self, source: str, table: int = 0, match: str | None = None, **kwargs):
        """
        Initialize HTML reader

        Args:
            source: Path to HTML file or URL
            table: Which table to read (0-indexed, default: 0)
            match: Text to match in table (tries to find table containing this text)
            **kwargs: Additional arguments passed to pandas read_html
        """
        if not PANDAS_AVAILABLE:
            raise ImportError("HTML reader requires pandas library. Install `sqlstream[pandas]`")

        self.source = source
        self.table = table
        self.match = match
        self.kwargs = kwargs

        # Load tables from HTML
        self._load_tables()

        # Filter conditions and columns
        self.filter_conditions: list[Condition] = []
        self.required_columns: list[str] = []

    def _load_tables(self) -> None:
        """Load all tables from HTML source"""
        try:
            # read_html returns a list of DataFrames
            match_pattern = self.match if self.match else ".+"
            self.tables = pd.read_html(self.source, match=match_pattern, **self.kwargs)

            if not self.tables:
                raise ValueError(f"No tables found in HTML: {self.source}")

            # Select the table to work with
            if self.table >= len(self.tables):
                raise ValueError(
                    f"Table index {self.table} out of range. "
                    f"HTML contains {len(self.tables)} table(s)."
                )

            self.df = self.tables[self.table]

            # Clean column names (convert to strings, handle duplicates)
            self.df.columns = [str(col) for col in self.df.columns]

        except ValueError:
            # Re-raise ValueError for validation errors
            raise
        except Exception as e:
            # Only wrap actual I/O errors
            raise OSError(f"Failed to read HTML tables from {self.source}: {e}") from e

    def read_lazy(self) -> Iterator[dict[str, Any]]:
        """Read data lazily from the selected table"""
        df = self.df

        # Apply filters if any
        if self.filter_conditions:
            df = self._apply_filters(df)

        # Apply column selection if any
        if self.required_columns:
            available_cols = [c for c in self.required_columns if c in df.columns]
            if available_cols:
                df = df[available_cols]

        # Yield rows as dictionaries
        yield from df.to_dict("records")

    def _apply_filters(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply filter conditions to DataFrame"""
        mask = pd.Series([True] * len(df), index=df.index)

        for condition in self.filter_conditions:
            col = condition.column
            op = condition.operator
            value = condition.value

            if col not in df.columns:
                continue

            # Build condition mask
            if op == "=":
                mask &= df[col] == value
            elif op == ">":
                mask &= df[col] > value
            elif op == "<":
                mask &= df[col] < value
            elif op == ">=":
                mask &= df[col] >= value
            elif op == "<=":
                mask &= df[col] <= value
            elif op == "!=":
                mask &= df[col] != value

        return df[mask]

    def get_schema(self) -> Schema:
        """Get schema from the selected table"""

        schema = {}
        for col in self.df.columns:
            dtype = str(self.df[col].dtype)
            # Map pandas dtypes to SQL-like types
            if dtype.startswith("int"):
                schema[col] = DataType.INTEGER
            elif dtype.startswith("float"):
                schema[col] = DataType.FLOAT
            elif dtype == "bool":
                schema[col] = DataType.BOOLEAN
            elif dtype.startswith("datetime"):
                schema[col] = DataType.DATETIME
            elif dtype.startswith("timedelta"):
                schema[col] = DataType.TIME
            else:
                # For object/string types, try to infer from content
                # This is important for JSON or date strings that pandas didn't parse
                from sqlstream.core.types import infer_type

                # Sample non-null values
                sample_values = self.df[col].dropna().head(10)
                if not sample_values.empty:
                    # Use the first value to infer type
                    schema[col] = infer_type(sample_values.iloc[0])
                else:
                    schema[col] = DataType.STRING
        return Schema(schema)

    def supports_pushdown(self) -> bool:
        """HTML reader supports filter pushdown"""
        return True

    def supports_column_selection(self) -> bool:
        """HTML reader supports column selection"""
        return True

    def set_filter(self, conditions: list[Condition]) -> None:
        """Set filter conditions"""
        self.filter_conditions = conditions

    def set_columns(self, columns: list[str]) -> None:
        """Set required columns"""
        self.required_columns = columns

    def list_tables(self) -> list[str]:
        """
        List all tables found in the HTML

        Returns:
            List of table descriptions (first few column names)
        """
        descriptions = []
        for i, table in enumerate(self.tables):
            cols = list(table.columns)[:3]  # First 3 columns
            col_str = ", ".join(str(c) for c in cols)
            if len(table.columns) > 3:
                col_str += ", ..."
            descriptions.append(f"Table {i}: {col_str} ({len(table)} rows)")
        return descriptions

    def to_dataframe(self):
        """
        Convert to pandas DataFrame efficiently
        """
        # HTMLReader already holds data as a DataFrame
        return self.df
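
A usage sketch combining table discovery and selection; data.html is a hypothetical file.

reader = HTMLReader("data.html")
for description in reader.list_tables():
    print(description)  # e.g. "Table 0: name, age, city (120 rows)"

# Point at the second table, or match a table by contained text
second = HTMLReader("data.html", table=1)
sales = HTMLReader("data.html", match="Sales Data")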

__init__

__init__(source: str, table: int = 0, match: str | None = None, **kwargs)

Initialize HTML reader

Parameters:

Name Type Description Default
source str

Path to HTML file or URL

required
table int

Which table to read (0-indexed, default: 0)

0
match str | None

Text to match in table (tries to find table containing this text)

None
**kwargs

Additional arguments passed to pandas read_html

{}
Source code in sqlstream/readers/html_reader.py
def __init__(self, source: str, table: int = 0, match: str | None = None, **kwargs):
    """
    Initialize HTML reader

    Args:
        source: Path to HTML file or URL
        table: Which table to read (0-indexed, default: 0)
        match: Text to match in table (tries to find table containing this text)
        **kwargs: Additional arguments passed to pandas read_html
    """
    if not PANDAS_AVAILABLE:
        raise ImportError("HTML reader requires pandas library. Install `sqlstream[pandas]`")

    self.source = source
    self.table = table
    self.match = match
    self.kwargs = kwargs

    # Load tables from HTML
    self._load_tables()

    # Filter conditions and columns
    self.filter_conditions: list[Condition] = []
    self.required_columns: list[str] = []

read_lazy

read_lazy() -> Iterator[dict[str, Any]]

Read data lazily from the selected table

Source code in sqlstream/readers/html_reader.py
def read_lazy(self) -> Iterator[dict[str, Any]]:
    """Read data lazily from the selected table"""
    df = self.df

    # Apply filters if any
    if self.filter_conditions:
        df = self._apply_filters(df)

    # Apply column selection if any
    if self.required_columns:
        available_cols = [c for c in self.required_columns if c in df.columns]
        if available_cols:
            df = df[available_cols]

    # Yield rows as dictionaries
    yield from df.to_dict("records")

get_schema

get_schema() -> Schema

Get schema from the selected table

Source code in sqlstream/readers/html_reader.py
def get_schema(self) -> Schema:
    """Get schema from the selected table"""

    schema = {}
    for col in self.df.columns:
        dtype = str(self.df[col].dtype)
        # Map pandas dtypes to SQL-like types
        if dtype.startswith("int"):
            schema[col] = DataType.INTEGER
        elif dtype.startswith("float"):
            schema[col] = DataType.FLOAT
        elif dtype == "bool":
            schema[col] = DataType.BOOLEAN
        elif dtype.startswith("datetime"):
            schema[col] = DataType.DATETIME
        elif dtype.startswith("timedelta"):
            schema[col] = DataType.TIME
        else:
            # For object/string types, try to infer from content
            # This is important for JSON or date strings that pandas didn't parse
            from sqlstream.core.types import infer_type

            # Sample non-null values
            sample_values = self.df[col].dropna().head(10)
            if not sample_values.empty:
                # Use the first value to infer type
                schema[col] = infer_type(sample_values.iloc[0])
            else:
                schema[col] = DataType.STRING
    return Schema(schema)

supports_pushdown

supports_pushdown() -> bool

HTML reader supports filter pushdown

Source code in sqlstream/readers/html_reader.py
def supports_pushdown(self) -> bool:
    """HTML reader supports filter pushdown"""
    return True

supports_column_selection

supports_column_selection() -> bool

HTML reader supports column selection

Source code in sqlstream/readers/html_reader.py
def supports_column_selection(self) -> bool:
    """HTML reader supports column selection"""
    return True

set_filter

set_filter(conditions: list[Condition]) -> None

Set filter conditions

Source code in sqlstream/readers/html_reader.py
def set_filter(self, conditions: list[Condition]) -> None:
    """Set filter conditions"""
    self.filter_conditions = conditions

set_columns

set_columns(columns: list[str]) -> None

Set required columns

Source code in sqlstream/readers/html_reader.py
def set_columns(self, columns: list[str]) -> None:
    """Set required columns"""
    self.required_columns = columns

list_tables

list_tables() -> list[str]

List all tables found in the HTML

Returns:

Type Description
list[str]

List of table descriptions (first few column names)

Source code in sqlstream/readers/html_reader.py
def list_tables(self) -> list[str]:
    """
    List all tables found in the HTML

    Returns:
        List of table descriptions (first few column names)
    """
    descriptions = []
    for i, table in enumerate(self.tables):
        cols = list(table.columns)[:3]  # First 3 columns
        col_str = ", ".join(str(c) for c in cols)
        if len(table.columns) > 3:
            col_str += ", ..."
        descriptions.append(f"Table {i}: {col_str} ({len(table)} rows)")
    return descriptions

to_dataframe

to_dataframe()

Convert to pandas DataFrame efficiently

Source code in sqlstream/readers/html_reader.py
def to_dataframe(self):
    """
    Convert to pandas DataFrame efficiently
    """
    # HTMLReader already holds data as a DataFrame
    return self.df

HTTPReader

HTTPReader

Bases: BaseReader

Read data from HTTP/HTTPS URLs with intelligent caching

Automatically detects the file format (CSV, Parquet, HTML, Markdown, JSON, or JSONL) and delegates to the appropriate reader. Caches downloaded files to avoid re-downloads.

Example

reader = HTTPReader("https://example.com/data.csv")
for row in reader.read_lazy():
    print(row)

Source code in sqlstream/readers/http_reader.py
class HTTPReader(BaseReader):
    """
    Read data from HTTP/HTTPS URLs with intelligent caching

    Automatically detects file format (CSV or Parquet) and delegates
    to appropriate reader. Caches downloaded files to avoid re-downloads.

    Example:
        reader = HTTPReader("https://example.com/data.csv")
        for row in reader.read_lazy():
            print(row)
    """

    def __init__(
        self,
        url: str,
        cache_dir: str | None = None,
        force_download: bool = False,
        format: str | None = None,
        **kwargs,
    ):
        """
        Initialize HTTP reader

        Args:
            url: HTTP/HTTPS URL to data file
            cache_dir: Directory to cache downloaded files (default: system temp)
            force_download: If True, re-download even if cached
            format: Explicit format specification (csv, parquet, html, markdown).
                   If not provided, will auto-detect from URL extension or content.
            **kwargs: Additional arguments passed to the delegate reader
        """
        if not HTTPX_AVAILABLE:
            raise ImportError("HTTP reader requires httpx library. Install `sqlstream[http]`")

        self.url = url
        self.cache_dir = (
            Path(cache_dir) if cache_dir else Path(tempfile.gettempdir()) / "sqlstream_cache"
        )
        self.force_download = force_download
        self.explicit_format = format
        self.reader_kwargs = kwargs

        # Ensure cache directory exists
        self.cache_dir.mkdir(parents=True, exist_ok=True)

        # Download or get cached file
        self.local_path = self._get_or_download()

        # Detect format and create appropriate reader
        self.delegate_reader = self._create_delegate_reader()

        # Delegate filter conditions and column selection
        self.filter_conditions: list[Condition] = []
        self.required_columns: list[str] = []

    def _get_cache_path(self) -> Path:
        """Generate cache file path based on URL hash"""
        # Create hash of URL for cache key
        url_hash = hashlib.md5(self.url.encode()).hexdigest()

        # Extract filename from URL
        parsed = urlparse(self.url)
        filename = Path(parsed.path).name or "data"

        # Cache path: cache_dir/url_hash_filename
        return self.cache_dir / f"{url_hash}_{filename}"

    def _get_or_download(self) -> Path:
        """Get cached file or download if not cached"""
        cache_path = self._get_cache_path()

        # Return cached file if it exists and we're not forcing download
        if cache_path.exists() and not self.force_download:
            return cache_path

        # Download file
        return self._download_file(cache_path)

    def _download_file(self, target_path: Path) -> Path:
        """Download file from URL to target path"""
        try:
            with httpx.stream("GET", self.url, follow_redirects=True) as response:
                response.raise_for_status()

                # Write to temporary file first, then move to target
                temp_path = target_path.with_suffix(".tmp")

                with open(temp_path, "wb") as f:
                    for chunk in response.iter_bytes(chunk_size=8192):
                        f.write(chunk)

                # Move temp file to final location
                temp_path.rename(target_path)

                return target_path

        except Exception as e:
            raise OSError(f"Failed to download {self.url}: {e}") from e

    def _create_delegate_reader(self) -> BaseReader:
        """Create appropriate reader based on file format"""
        format_to_use = self.explicit_format

        # If no explicit format, try to detect from URL extension
        if not format_to_use:
            path_lower = str(self.local_path).lower()

            if path_lower.endswith(".parquet"):
                format_to_use = "parquet"
            elif path_lower.endswith(".csv"):
                format_to_use = "csv"
            elif path_lower.endswith((".html", ".htm")):
                format_to_use = "html"
            elif path_lower.endswith((".md", ".markdown")):
                format_to_use = "markdown"
            elif path_lower.endswith(".json"):
                format_to_use = "json"
            elif path_lower.endswith(".jsonl"):
                format_to_use = "jsonl"
            else:
                # Try to detect from content
                format_to_use = self._detect_format_from_content()

        # Create appropriate reader based on detected/specified format
        if format_to_use == "parquet":
            if not PARQUET_AVAILABLE:
                raise ImportError("Parquet files require pyarrow. Install `sqlstream[parquet]`")
            return ParquetReader(str(self.local_path))

        elif format_to_use == "html":
            try:
                from sqlstream.readers.html_reader import HTMLReader

                return HTMLReader(str(self.local_path), **self.reader_kwargs)
            except ImportError as e:
                raise ImportError(
                    "HTML reader requires pandas library. Install `sqlstream[pandas]`"
                ) from e

        elif format_to_use == "markdown":
            from sqlstream.readers.markdown_reader import MarkdownReader

            return MarkdownReader(str(self.local_path), **self.reader_kwargs)

        elif format_to_use == "json":
            from sqlstream.readers.json_reader import JSONReader

            # Pass table/records_key if available in kwargs
            records_key = self.reader_kwargs.get("table")
            return JSONReader(str(self.local_path), records_key=records_key)

        elif format_to_use == "jsonl":
            from sqlstream.readers.jsonl_reader import JSONLReader

            return JSONLReader(str(self.local_path))

        else:  # csv or unknown - default to CSV
            try:
                return CSVReader(str(self.local_path))
            except Exception as e:
                raise ValueError(f"Unknown file format: {format_to_use}") from e

    def _detect_format_from_content(self) -> str:
        """Try to detect format by peeking at file content"""
        try:
            with open(self.local_path, "rb") as f:
                # Read first few bytes
                header = f.read(512)

            # Check for HTML
            if (
                b"<html" in header.lower()
                or b"<!doctype html" in header.lower()
                or b"<table" in header.lower()
            ):
                return "html"

            # Check for Markdown table (simple heuristic)
            if b"|" in header and b"---" in header:
                return "markdown"

            # Check for Parquet magic number
            if header.startswith(b"PAR1"):
                return "parquet"

            # Check for JSON (starts with [ or {)
            stripped = header.strip()
            if stripped.startswith(b"[") or (
                stripped.startswith(b"{") and b'"records":' in stripped
            ):
                return "json"

            # Check for JSONL (multiple lines starting with {)
            lines = header.split(b"\n")
            if (
                len(lines) > 1
                and lines[0].strip().startswith(b"{")
                and lines[1].strip().startswith(b"{")
            ):
                return "jsonl"

            # Default to CSV
            return "csv"

        except Exception:
            # If detection fails, default to CSV
            return "csv"

    def read_lazy(self) -> Iterator[dict[str, Any]]:
        """Read data lazily, delegating to underlying reader"""
        # Apply filter conditions to delegate
        if self.filter_conditions:
            self.delegate_reader.set_filter(self.filter_conditions)

        # Apply column selection to delegate
        if self.required_columns:
            self.delegate_reader.set_columns(self.required_columns)

        # Delegate to underlying reader
        yield from self.delegate_reader.read_lazy()

    def get_schema(self) -> dict[str, str]:
        """Get schema from delegate reader"""
        return self.delegate_reader.get_schema()

    def supports_pushdown(self) -> bool:
        """HTTP reader supports pushdown via delegation"""
        return self.delegate_reader.supports_pushdown()

    def supports_column_selection(self) -> bool:
        """HTTP reader supports column selection via delegation"""
        return self.delegate_reader.supports_column_selection()

    def set_filter(self, conditions: list[Condition]) -> None:
        """Set filter conditions (will be pushed to delegate)"""
        self.filter_conditions = conditions

    def set_columns(self, columns: list[str]) -> None:
        """Set required columns (will be pushed to delegate)"""
        self.required_columns = columns

    def clear_cache(self) -> None:
        """Remove cached file for this URL"""
        cache_path = self._get_cache_path()
        if cache_path.exists():
            cache_path.unlink()

    @staticmethod
    def clear_all_cache(cache_dir: str | None = None) -> int:
        """
        Clear all cached files

        Args:
            cache_dir: Cache directory to clear (default: system temp)

        Returns:
            Number of files deleted
        """
        cache_path = (
            Path(cache_dir) if cache_dir else Path(tempfile.gettempdir()) / "sqlstream_cache"
        )

        if not cache_path.exists():
            return 0

        count = 0
        for file in cache_path.glob("*"):
            if file.is_file():
                file.unlink()
                count += 1

        return count
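
A usage sketch of the caching behavior; the URL and cache directory are illustrative.

# The first call downloads into the cache directory; later calls reuse the cached copy
reader = HTTPReader("https://example.com/data.csv", cache_dir="/tmp/sqlstream_cache")

for row in reader.read_lazy():
    print(row)

# Force a fresh download, or clean up afterwards
fresh = HTTPReader("https://example.com/data.csv", force_download=True)
reader.clear_cache()          # remove this URL's cached file
HTTPReader.clear_all_cache()  # remove every cached file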

__init__

__init__(url: str, cache_dir: str | None = None, force_download: bool = False, format: str | None = None, **kwargs)

Initialize HTTP reader

Parameters:

Name Type Description Default
url str

HTTP/HTTPS URL to data file

required
cache_dir str | None

Directory to cache downloaded files (default: system temp)

None
force_download bool

If True, re-download even if cached

False
format str | None

Explicit format specification (csv, parquet, html, markdown). If not provided, will auto-detect from URL extension or content.

None
**kwargs

Additional arguments passed to the delegate reader

{}
Source code in sqlstream/readers/http_reader.py
def __init__(
    self,
    url: str,
    cache_dir: str | None = None,
    force_download: bool = False,
    format: str | None = None,
    **kwargs,
):
    """
    Initialize HTTP reader

    Args:
        url: HTTP/HTTPS URL to data file
        cache_dir: Directory to cache downloaded files (default: system temp)
        force_download: If True, re-download even if cached
        format: Explicit format specification (csv, parquet, html, markdown).
               If not provided, will auto-detect from URL extension or content.
        **kwargs: Additional arguments passed to the delegate reader
    """
    if not HTTPX_AVAILABLE:
        raise ImportError("HTTP reader requires httpx library. Install `sqlstream[http]`")

    self.url = url
    self.cache_dir = (
        Path(cache_dir) if cache_dir else Path(tempfile.gettempdir()) / "sqlstream_cache"
    )
    self.force_download = force_download
    self.explicit_format = format
    self.reader_kwargs = kwargs

    # Ensure cache directory exists
    self.cache_dir.mkdir(parents=True, exist_ok=True)

    # Download or get cached file
    self.local_path = self._get_or_download()

    # Detect format and create appropriate reader
    self.delegate_reader = self._create_delegate_reader()

    # Delegate filter conditions and column selection
    self.filter_conditions: list[Condition] = []
    self.required_columns: list[str] = []

read_lazy

read_lazy() -> Iterator[dict[str, Any]]

Read data lazily, delegating to underlying reader

Source code in sqlstream/readers/http_reader.py
def read_lazy(self) -> Iterator[dict[str, Any]]:
    """Read data lazily, delegating to underlying reader"""
    # Apply filter conditions to delegate
    if self.filter_conditions:
        self.delegate_reader.set_filter(self.filter_conditions)

    # Apply column selection to delegate
    if self.required_columns:
        self.delegate_reader.set_columns(self.required_columns)

    # Delegate to underlying reader
    yield from self.delegate_reader.read_lazy()

get_schema

get_schema() -> dict[str, str]

Get schema from delegate reader

Source code in sqlstream/readers/http_reader.py
def get_schema(self) -> dict[str, str]:
    """Get schema from delegate reader"""
    return self.delegate_reader.get_schema()

supports_pushdown

supports_pushdown() -> bool

HTTP reader supports pushdown via delegation

Source code in sqlstream/readers/http_reader.py
def supports_pushdown(self) -> bool:
    """HTTP reader supports pushdown via delegation"""
    return self.delegate_reader.supports_pushdown()

supports_column_selection

supports_column_selection() -> bool

HTTP reader supports column selection via delegation

Source code in sqlstream/readers/http_reader.py
def supports_column_selection(self) -> bool:
    """HTTP reader supports column selection via delegation"""
    return self.delegate_reader.supports_column_selection()

set_filter

set_filter(conditions: list[Condition]) -> None

Set filter conditions (will be pushed to delegate)

Source code in sqlstream/readers/http_reader.py
def set_filter(self, conditions: list[Condition]) -> None:
    """Set filter conditions (will be pushed to delegate)"""
    self.filter_conditions = conditions

set_columns

set_columns(columns: list[str]) -> None

Set required columns (will be pushed to delegate)

Source code in sqlstream/readers/http_reader.py
def set_columns(self, columns: list[str]) -> None:
    """Set required columns (will be pushed to delegate)"""
    self.required_columns = columns

clear_cache

clear_cache() -> None

Remove cached file for this URL

Source code in sqlstream/readers/http_reader.py
def clear_cache(self) -> None:
    """Remove cached file for this URL"""
    cache_path = self._get_cache_path()
    if cache_path.exists():
        cache_path.unlink()

clear_all_cache staticmethod

clear_all_cache(cache_dir: str | None = None) -> int

Clear all cached files

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| cache_dir | str \| None | Cache directory to clear (default: system temp) | None |

Returns:

| Type | Description |
| --- | --- |
| int | Number of files deleted |

Source code in sqlstream/readers/http_reader.py
@staticmethod
def clear_all_cache(cache_dir: str | None = None) -> int:
    """
    Clear all cached files

    Args:
        cache_dir: Cache directory to clear (default: system temp)

    Returns:
        Number of files deleted
    """
    cache_path = (
        Path(cache_dir) if cache_dir else Path(tempfile.gettempdir()) / "sqlstream_cache"
    )

    if not cache_path.exists():
        return 0

    count = 0
    for file in cache_path.glob("*"):
        if file.is_file():
            file.unlink()
            count += 1

    return count
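
For quick reference, a minimal usage sketch of the HTTP reader and its cache helpers, based on the constructor and cache methods documented above. The class name HTTPReader, the URL, and the cache directory are illustrative assumptions, not taken verbatim from the source.

```python
# Assumes the class defined in sqlstream/readers/http_reader.py is exported as HTTPReader.
from sqlstream.readers.http_reader import HTTPReader

# Download (or reuse the cached copy of) a remote CSV. The format is auto-detected
# from the URL extension or content, or can be forced with format="csv".
reader = HTTPReader(
    "https://example.com/data.csv",          # placeholder URL
    cache_dir="/tmp/my_sqlstream_cache",     # placeholder cache directory
    force_download=False,
)

for row in reader.read_lazy():
    print(row)

# Drop the cached copy for this URL only...
reader.clear_cache()

# ...or wipe the whole cache directory and report how many files were removed.
deleted = HTTPReader.clear_all_cache(cache_dir="/tmp/my_sqlstream_cache")
print(f"Removed {deleted} cached file(s)")
```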

JSONReader

JSONReader

Bases: BaseReader

Reader for standard JSON files.

Supports:

- Array of objects: [{"a": 1}, {"a": 2}]
- Object with records key: {"data": [{"a": 1}, ...], "meta": ...}
- Automatic type inference
- Predicate pushdown (filtering in Python)
- Column pruning
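
A short usage sketch of the reader and its records_key path syntax (the file names and nested keys are illustrative; the dotted and [] navigation rules are those documented in _navigate_path in the source below):

```python
from sqlstream.readers.json_reader import JSONReader

# Root is a list, or an object with a common wrapper key ("data", "records", ...):
# auto-detection handles it without a records_key.
events = JSONReader("events.json")  # illustrative path

# Records nested under keys, using dotted paths and the [] flattening operator,
# e.g. {"result": {"orders": [...]}} or {"users": [{"transactions": [...]}, ...]}.
orders = JSONReader("api_dump.json", records_key="result.orders")
txns = JSONReader("api_dump.json", records_key="users[].transactions")

for row in orders.read_lazy():
    print(row)
```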

Source code in sqlstream/readers/json_reader.py
class JSONReader(BaseReader):
    """
    Reader for standard JSON files.

    Supports:
    - Array of objects: [{"a": 1}, {"a": 2}]
    - Object with records key: {"data": [{"a": 1}, ...], "meta": ...}
    - Automatic type inference
    - Predicate pushdown (filtering in Python)
    - Column pruning
    """

    def __init__(self, path: str, records_key: str | None = None, encoding: str = "utf-8"):
        """
        Initialize JSON reader

        Args:
            path: Path to JSON file
            records_key: Key containing the list of records (e.g., "data", "records").
                        If None, attempts to auto-detect or expects root to be a list.
            encoding: File encoding (default: utf-8)
        """
        self.path_str = path
        self.is_s3 = path.startswith("s3://")
        if not self.is_s3:
            self.path = Path(path)
        else:
            self.path = None

        self.records_key = records_key
        self.encoding = encoding

        # Optimization flags
        self.filter_conditions: list[Condition] = []
        self.required_columns: list[str] = []
        self.limit: int | None = None

        if not self.is_s3 and not self.path.exists():
            raise FileNotFoundError(f"JSON file not found: {path}")

    def supports_pushdown(self) -> bool:
        return True

    def supports_column_selection(self) -> bool:
        return True

    def supports_limit(self) -> bool:
        return True

    def set_filter(self, conditions: list[Condition]) -> None:
        self.filter_conditions = conditions

    def set_columns(self, columns: list[str]) -> None:
        self.required_columns = columns

    def set_limit(self, limit: int) -> None:
        self.limit = limit

    def _get_file_handle(self):
        """Get file handle for reading (local or S3)."""
        if self.is_s3:
            try:
                import s3fs

                fs = s3fs.S3FileSystem(anon=False)
                return fs.open(self.path_str, mode="r", encoding=self.encoding)
            except ImportError as e:
                raise ImportError(
                    "s3fs is required for S3 support. Install with: pip install sqlstream[s3]"
                ) from e
        else:
            return open(self.path, encoding=self.encoding)

    def read_lazy(self) -> Iterator[dict[str, Any]]:
        """
        Read JSON file and yield records.

        Note: Standard JSON parsing loads the whole file into memory.
        For large files, use JSONL format.
        """
        with self._get_file_handle() as f:
            try:
                data = json.load(f)
            except json.JSONDecodeError as e:
                raise ValueError(f"Invalid JSON file {self.path_str}: {e}") from e

        # Locate records
        records = self._locate_records(data)

        rows_yielded = 0
        for row in records:
            if not isinstance(row, dict):
                continue

            # Apply filters
            if self.filter_conditions:
                if not self._matches_filter(row):
                    continue

            # Apply column selection
            if self.required_columns:
                row = {k: row.get(k) for k in self.required_columns}

            yield row
            rows_yielded += 1

            if self.limit is not None and rows_yielded >= self.limit:
                break

    def _locate_records(self, data: Any) -> list[dict[str, Any]]:
        """
        Find the list of records in the JSON data.

        Supports JSONPath-like syntax:
        - "key" - simple key access
        - "key.nested" - nested object access
        - "key[0]" - array index access
        - "key[]" - flatten/merge arrays from all elements
        - "key[].nested" - nested access after flattening
        """
        # If no records_key specified, use auto-detection
        if not self.records_key:
            return self._auto_detect_records(data)

        # Navigate using the path
        result = self._navigate_path(data, self.records_key)

        # Ensure result is a list of dicts
        if isinstance(result, list):
            return result
        elif isinstance(result, dict):
            return [result]
        else:
            raise ValueError(f"Path '{self.records_key}' did not resolve to a list or object")

    def _auto_detect_records(self, data: Any) -> list[dict[str, Any]]:
        """Auto-detect records when no path is specified"""
        # If root is a list, that's our data
        if isinstance(data, list):
            return data

        # If root is a dict, look for common keys
        if isinstance(data, dict):
            common_keys = ["data", "records", "items", "rows", "results"]
            for key in common_keys:
                if key in data and isinstance(data[key], list):
                    return data[key]

            # Look for any list value
            for _, value in data.items():
                if isinstance(value, list) and len(value) > 0:
                    return value

            # If single object, treat as one-row table
            return [data]

        raise ValueError("JSON content must be a list or an object containing a list")

    def _navigate_path(self, data: Any, path: str) -> Any:
        """
        Navigate through JSON using a path string.

        Supports:
        - "key" - simple key
        - "key.nested.deep" - dot notation
        - "key[0]" - array indexing
        - "key[]" - flatten arrays
        - "key[].nested" - combinations

        Examples:
        - "users" → data["users"]
        - "result.orders" → data["result"]["orders"]
        - "users[0]" → data["users"][0]
        - "users[].transactions" → flatten [user["transactions"] for user in data["users"]]
        """
        import re

        # Handle flattening first if [] is in the path
        if "[]" in path:
            return self._flatten_path(data, path)

        current = data

        # Split path into segments (handling dots and brackets)
        # Pattern matches: "key", "key[0]", "key[1]" etc.
        segments = re.findall(r"([^.\[]+)(\[\d+\])?", path)

        for key, bracket in segments:
            if not key:
                continue

            # Access key in current dict
            if isinstance(current, dict):
                if key not in current:
                    raise ValueError(f"Key '{key}' not found in JSON")
                current = current[key]
            else:
                raise ValueError(f"Cannot access key '{key}' on non-dict type")

            # Handle bracket notation (numeric index only)
            if bracket:
                # Extract index [0], [1], etc.
                index = int(bracket[1:-1])  # Remove [ and ]
                if not isinstance(current, list):
                    raise ValueError(f"Cannot index non-list at '{key}'")
                if index < 0 or index >= len(current):
                    raise ValueError(f"Index {index} out of range for '{key}'")
                current = current[index]

        return current

    def _flatten_path(self, data: Any, path: str) -> list[Any]:
        """
        Handle flattening for paths with [].

        Examples:
        - "users[]" → flatten data["users"]
        - "users[].transactions" → flatten [user["transactions"] for user in data["users"]]
        """
        # Split on [] to find the flattening point
        parts = path.split("[]")

        if len(parts) != 2:
            raise ValueError("Only one '[]' operator is supported per path")

        before_flatten = parts[0]
        after_flatten = parts[1].lstrip(".")  # Remove leading dot if present

        # Navigate to the array to flatten (without [] in the path)
        if before_flatten:
            # Use simple navigation without []
            array = self._navigate_simple(data, before_flatten)
        else:
            array = data

        if not isinstance(array, list):
            raise ValueError(f"Cannot flatten non-list at '{before_flatten}'")

        # Flatten and optionally extract nested keys
        result = []
        for item in array:
            if after_flatten:
                # Navigate to nested key in each item
                try:
                    nested = self._navigate_simple(item, after_flatten)
                    if isinstance(nested, list):
                        result.extend(nested)
                    else:
                        result.append(nested)
                except (ValueError, KeyError):
                    # Skip items that don't have the nested key
                    continue
            else:
                # Just flatten the array
                if isinstance(item, dict):
                    result.append(item)

        return result

    def _navigate_simple(self, data: Any, path: str) -> Any:
        """Navigate a simple path without [] operators"""
        import re

        current = data
        segments = re.findall(r"([^.\[]+)(\[\d+\])?", path)

        for key, bracket in segments:
            if not key:
                continue

            if isinstance(current, dict):
                if key not in current:
                    raise ValueError(f"Key '{key}' not found in JSON")
                current = current[key]
            else:
                raise ValueError(f"Cannot access key '{key}' on non-dict type")

            if bracket:
                index = int(bracket[1:-1])
                if not isinstance(current, list):
                    raise ValueError(f"Cannot index non-list at '{key}'")
                if index < 0 or index >= len(current):
                    raise ValueError(f"Index {index} out of range for '{key}'")
                current = current[index]

        return current

    def _matches_filter(self, row: dict[str, Any]) -> bool:
        """Check if row matches filter conditions"""
        for condition in self.filter_conditions:
            if not self._evaluate_condition(row, condition):
                return False
        return True

    def _evaluate_condition(self, row: dict[str, Any], condition: Condition) -> bool:
        """Evaluate single condition"""
        if condition.column not in row:
            return False

        value = row[condition.column]
        if value is None:
            return False

        op = condition.operator
        expected = condition.value

        try:
            if op == "=":
                return value == expected
            elif op == ">":
                return value > expected
            elif op == "<":
                return value < expected
            elif op == ">=":
                return value >= expected
            elif op == "<=":
                return value <= expected
            elif op == "!=":
                return value != expected
            else:
                return True
        except TypeError:
            return False

    def get_schema(self) -> Schema | None:
        """Infer schema from data"""
        # We have to load the file to get schema
        try:
            # Get first few rows
            rows = []
            iterator = self.read_lazy()
            for _ in range(100):
                try:
                    rows.append(next(iterator))
                except StopIteration:
                    break

            if not rows:
                return None

            return Schema.from_rows(rows)
        except Exception:
            return None

__init__

__init__(path: str, records_key: str | None = None, encoding: str = 'utf-8')

Initialize JSON reader

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | str | Path to JSON file | required |
| records_key | str \| None | Key containing the list of records (e.g., "data", "records"). If None, attempts to auto-detect or expects root to be a list. | None |
| encoding | str | File encoding (default: utf-8) | 'utf-8' |
Source code in sqlstream/readers/json_reader.py
def __init__(self, path: str, records_key: str | None = None, encoding: str = "utf-8"):
    """
    Initialize JSON reader

    Args:
        path: Path to JSON file
        records_key: Key containing the list of records (e.g., "data", "records").
                    If None, attempts to auto-detect or expects root to be a list.
        encoding: File encoding (default: utf-8)
    """
    self.path_str = path
    self.is_s3 = path.startswith("s3://")
    if not self.is_s3:
        self.path = Path(path)
    else:
        self.path = None

    self.records_key = records_key
    self.encoding = encoding

    # Optimization flags
    self.filter_conditions: list[Condition] = []
    self.required_columns: list[str] = []
    self.limit: int | None = None

    if not self.is_s3 and not self.path.exists():
        raise FileNotFoundError(f"JSON file not found: {path}")

read_lazy

read_lazy() -> Iterator[dict[str, Any]]

Read JSON file and yield records.

Note: Standard JSON parsing loads the whole file into memory. For large files, use JSONL format.

Source code in sqlstream/readers/json_reader.py
def read_lazy(self) -> Iterator[dict[str, Any]]:
    """
    Read JSON file and yield records.

    Note: Standard JSON parsing loads the whole file into memory.
    For large files, use JSONL format.
    """
    with self._get_file_handle() as f:
        try:
            data = json.load(f)
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON file {self.path_str}: {e}") from e

    # Locate records
    records = self._locate_records(data)

    rows_yielded = 0
    for row in records:
        if not isinstance(row, dict):
            continue

        # Apply filters
        if self.filter_conditions:
            if not self._matches_filter(row):
                continue

        # Apply column selection
        if self.required_columns:
            row = {k: row.get(k) for k in self.required_columns}

        yield row
        rows_yielded += 1

        if self.limit is not None and rows_yielded >= self.limit:
            break

get_schema

get_schema() -> Schema | None

Infer schema from data

Source code in sqlstream/readers/json_reader.py
def get_schema(self) -> Schema | None:
    """Infer schema from data"""
    # We have to load the file to get schema
    try:
        # Get first few rows
        rows = []
        iterator = self.read_lazy()
        for _ in range(100):
            try:
                rows.append(next(iterator))
            except StopIteration:
                break

        if not rows:
            return None

        return Schema.from_rows(rows)
    except Exception:
        return None

JSONLReader

JSONLReader

Bases: BaseReader

Reader for JSONL (JSON Lines) files.

Format: {"id": 1, "name": "Alice"}

Features: - True lazy loading (line-by-line) - Handle malformed lines - Predicate pushdown - Column pruning
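
A minimal sketch of driving the optimizer hooks by hand; the file path is a placeholder, and the keyword-style Condition construction and its import path are assumptions (the reader only relies on the column, operator, and value attributes). Normally the query planner calls set_filter/set_columns/set_limit for you.

```python
from sqlstream.readers.jsonl_reader import JSONLReader
# Assumption: Condition can be built with keyword arguments matching the
# attributes used by the reader (column, operator, value); adjust the import
# path to wherever Condition lives in your installation.
from sqlstream.core.types import Condition

reader = JSONLReader("logs.jsonl")  # placeholder path

# Push a WHERE-style condition, a column projection, and a LIMIT into the reader.
reader.set_filter([Condition(column="status", operator="=", value="error")])
reader.set_columns(["id", "status"])
reader.set_limit(10)

for row in reader.read_lazy():
    print(row)  # at most 10 rows, each with only the 'id' and 'status' keys
```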

Source code in sqlstream/readers/jsonl_reader.py
class JSONLReader(BaseReader):
    """
    Reader for JSONL (JSON Lines) files.

    Format:
    {"id": 1, "name": "Alice"}
    {"id": 2, "name": "Bob"}

    Features:
    - True lazy loading (line-by-line)
    - Handle malformed lines
    - Predicate pushdown
    - Column pruning
    """

    def __init__(self, path: str, encoding: str = "utf-8"):
        """
        Initialize JSONL reader

        Args:
            path: Path to JSONL file
            encoding: File encoding (default: utf-8)
        """
        self.path_str = path
        self.is_s3 = path.startswith("s3://")
        if not self.is_s3:
            self.path = Path(path)
        else:
            self.path = None

        self.encoding = encoding

        # Optimization flags
        self.filter_conditions: list[Condition] = []
        self.required_columns: list[str] = []
        self.limit: int | None = None

        if not self.is_s3 and not self.path.exists():
            raise FileNotFoundError(f"JSONL file not found: {path}")

    def supports_pushdown(self) -> bool:
        return True

    def supports_column_selection(self) -> bool:
        return True

    def supports_limit(self) -> bool:
        return True

    def set_filter(self, conditions: list[Condition]) -> None:
        self.filter_conditions = conditions

    def set_columns(self, columns: list[str]) -> None:
        self.required_columns = columns

    def set_limit(self, limit: int) -> None:
        self.limit = limit

    def _get_file_handle(self):
        """Get file handle for reading (local or S3)."""
        if self.is_s3:
            try:
                import s3fs

                fs = s3fs.S3FileSystem(anon=False)
                return fs.open(self.path_str, mode="r", encoding=self.encoding)
            except ImportError as e:
                raise ImportError(
                    "s3fs is required for S3 support. Install with: pip install sqlstream[s3]"
                ) from e
        else:
            return open(self.path, encoding=self.encoding)

    def read_lazy(self) -> Iterator[dict[str, Any]]:
        """
        Yield rows from JSONL file line by line
        """
        with self._get_file_handle() as f:
            rows_yielded = 0

            for line_num, line in enumerate(f, start=1):
                line = line.strip()
                if not line:
                    continue

                try:
                    row = json.loads(line)

                    if not isinstance(row, dict):
                        warnings.warn(
                            f"Skipping non-dict row at line {line_num}", UserWarning, stacklevel=2
                        )
                        continue

                    # Apply filters
                    if self.filter_conditions:
                        if not self._matches_filter(row):
                            continue

                    # Apply column selection
                    if self.required_columns:
                        row = {k: row.get(k) for k in self.required_columns}

                    yield row
                    rows_yielded += 1

                    if self.limit is not None and rows_yielded >= self.limit:
                        break

                except json.JSONDecodeError:
                    warnings.warn(
                        f"Skipping invalid JSON at line {line_num}", UserWarning, stacklevel=2
                    )
                    continue

    def _matches_filter(self, row: dict[str, Any]) -> bool:
        """Check if row matches filter conditions"""
        for condition in self.filter_conditions:
            if not self._evaluate_condition(row, condition):
                return False
        return True

    def _evaluate_condition(self, row: dict[str, Any], condition: Condition) -> bool:
        """Evaluate single condition"""
        if condition.column not in row:
            return False

        value = row[condition.column]
        if value is None:
            return False

        op = condition.operator
        expected = condition.value

        try:
            if op == "=":
                return value == expected
            elif op == ">":
                return value > expected
            elif op == "<":
                return value < expected
            elif op == ">=":
                return value >= expected
            elif op == "<=":
                return value <= expected
            elif op == "!=":
                return value != expected
            else:
                return True
        except TypeError:
            return False

    def get_schema(self, sample_size: int = 100) -> Schema | None:
        """Infer schema by sampling first N lines"""
        sample_rows = []

        try:
            iterator = self.read_lazy()
            for _ in range(sample_size):
                try:
                    sample_rows.append(next(iterator))
                except StopIteration:
                    break
        except Exception:
            pass

        if not sample_rows:
            return None

        return Schema.from_rows(sample_rows)

__init__

__init__(path: str, encoding: str = 'utf-8')

Initialize JSONL reader

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | str | Path to JSONL file | required |
| encoding | str | File encoding (default: utf-8) | 'utf-8' |
Source code in sqlstream/readers/jsonl_reader.py
def __init__(self, path: str, encoding: str = "utf-8"):
    """
    Initialize JSONL reader

    Args:
        path: Path to JSONL file
        encoding: File encoding (default: utf-8)
    """
    self.path_str = path
    self.is_s3 = path.startswith("s3://")
    if not self.is_s3:
        self.path = Path(path)
    else:
        self.path = None

    self.encoding = encoding

    # Optimization flags
    self.filter_conditions: list[Condition] = []
    self.required_columns: list[str] = []
    self.limit: int | None = None

    if not self.is_s3 and not self.path.exists():
        raise FileNotFoundError(f"JSONL file not found: {path}")

read_lazy

read_lazy() -> Iterator[dict[str, Any]]

Yield rows from JSONL file line by line

Source code in sqlstream/readers/jsonl_reader.py
def read_lazy(self) -> Iterator[dict[str, Any]]:
    """
    Yield rows from JSONL file line by line
    """
    with self._get_file_handle() as f:
        rows_yielded = 0

        for line_num, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue

            try:
                row = json.loads(line)

                if not isinstance(row, dict):
                    warnings.warn(
                        f"Skipping non-dict row at line {line_num}", UserWarning, stacklevel=2
                    )
                    continue

                # Apply filters
                if self.filter_conditions:
                    if not self._matches_filter(row):
                        continue

                # Apply column selection
                if self.required_columns:
                    row = {k: row.get(k) for k in self.required_columns}

                yield row
                rows_yielded += 1

                if self.limit is not None and rows_yielded >= self.limit:
                    break

            except json.JSONDecodeError:
                warnings.warn(
                    f"Skipping invalid JSON at line {line_num}", UserWarning, stacklevel=2
                )
                continue

get_schema

get_schema(sample_size: int = 100) -> Schema | None

Infer schema by sampling first N lines

Source code in sqlstream/readers/jsonl_reader.py
def get_schema(self, sample_size: int = 100) -> Schema | None:
    """Infer schema by sampling first N lines"""
    sample_rows = []

    try:
        iterator = self.read_lazy()
        for _ in range(sample_size):
            try:
                sample_rows.append(next(iterator))
            except StopIteration:
                break
    except Exception:
        pass

    if not sample_rows:
        return None

    return Schema.from_rows(sample_rows)

MarkdownReader

MarkdownReader

Bases: BaseReader

Read tables from Markdown files

Parses Markdown tables (GFM format) and allows querying them. Supports files with multiple tables.

Example Markdown table:

    | Name    | Age | City          |
    |:--------|----:|--------------:|
    | Alice   | 30  | New York      |
    | Bob     | 25  | San Francisco |

Example:

    reader = MarkdownReader("data.md")
    for row in reader.read_lazy():
        print(row)

Source code in sqlstream/readers/markdown_reader.py
class MarkdownReader(BaseReader):
    """
    Read tables from Markdown files

    Parses Markdown tables (GFM format) and allows querying them.
    Supports files with multiple tables.

    Example Markdown table:
        | Name    | Age | City          |
        |:--------|----:|--------------:|
        | Alice   | 30  | New York      |
        | Bob     | 25  | San Francisco |

    Example:
        reader = MarkdownReader("data.md")
        for row in reader.read_lazy():
            print(row)
    """

    def __init__(
        self,
        source: str,
        table: int = 0,
    ):
        """
        Initialize Markdown reader

        Args:
            source: Path to Markdown file
            table: Which table to read if multiple tables exist (0-indexed)
        """
        self.source = source
        self.table = table

        # Parse tables from Markdown
        self._parse_markdown()

        # Filter conditions and columns
        self.filter_conditions: list[Condition] = []
        self.required_columns: list[str] = []

    def _parse_markdown(self) -> None:
        """Parse all tables from Markdown file"""
        # Read file content
        with open(self.source, encoding="utf-8") as f:
            content = f.read()

        # Find all tables
        self.tables = self._extract_tables(content)

        if not self.tables:
            raise ValueError(f"No tables found in Markdown file: {self.source}")

        if self.table >= len(self.tables):
            raise ValueError(
                f"Table index {self.table} out of range. "
                f"Markdown contains {len(self.tables)} table(s)."
            )

        # Select the table to work with
        self.data = self.tables[self.table]
        self.columns = self.data["columns"]
        self.rows = self.data["rows"]

    def _extract_tables(self, content: str) -> list[dict[str, Any]]:
        """
        Extract all tables from Markdown content

        Returns:
            List of table dicts with 'columns' and 'rows' keys
        """
        tables = []
        lines = content.split("\n")

        i = 0
        while i < len(lines):
            # Check if this line looks like a table header
            line = lines[i].strip()

            if line.startswith("|") and "|" in line[1:]:
                # Potential table start
                # Check if next line is separator
                if i + 1 < len(lines):
                    next_line = lines[i + 1].strip()
                    if self._is_separator_line(next_line):
                        # Found a table!
                        table = self._parse_table(lines, i)
                        if table:
                            tables.append(table)
                        # Skip past this table
                        i += table.get("line_count", 2)
                        continue

            i += 1

        return tables

    def _is_separator_line(self, line: str) -> bool:
        """Check if line is a table separator (e.g., |:---|---:|)"""
        if not line.startswith("|"):
            return False

        # Remove outer pipes and split
        parts = line.strip("|").split("|")

        # Check if all parts match separator pattern
        # Separators can be: ---, :---, ---:, :---:
        separator_pattern = re.compile(r"^:?-+:?$")

        return all(separator_pattern.match(p.strip()) for p in parts if p.strip())

    def _parse_table(self, lines: list[str], start_idx: int) -> dict[str, Any] | None:
        """Parse a single table starting at the given index"""
        # Parse header
        header_line = lines[start_idx].strip()
        columns = self._parse_row(header_line, infer_types=False)

        if not columns:
            return None

        # Skip separator line
        rows = []
        i = start_idx + 2  # Skip header and separator

        # Parse data rows
        while i < len(lines):
            line = lines[i].strip()

            # Stop if we hit an empty line or non-table content
            if not line or not line.startswith("|"):
                break

            # Skip if it's another separator (shouldn't happen in valid markdown)
            if self._is_separator_line(line):
                i += 1
                continue

            # Parse row
            row_values = self._parse_row(line)
            if row_values:
                # Ensure row has same number of columns as header
                # Pad with None if needed
                while len(row_values) < len(columns):
                    row_values.append(None)

                # Create row dict
                row_dict = {columns[j]: row_values[j] for j in range(len(columns))}
                rows.append(row_dict)

            i += 1

        return {"columns": columns, "rows": rows, "line_count": i - start_idx}

    def _parse_row(self, line: str, infer_types: bool = True) -> list[Any]:
        """Parse a single table row"""
        # Remove leading/trailing pipes and whitespace
        line = line.strip("|").strip()

        # Split by pipes, handling escaped pipes
        parts = []
        current = ""
        escaped = False

        for char in line:
            if char == "\\" and not escaped:
                escaped = True
                continue
            elif char == "|" and not escaped:
                parts.append(current.strip())
                current = ""
            else:
                if escaped:
                    current += "\\"
                    escaped = False
                current += char

        # Add last part
        if current or line.endswith("|"):
            parts.append(current.strip())

        # Clean up values
        cleaned = []
        for part in parts:
            # Convert empty strings and common null representations to None
            if not part or part.lower() in ("null", "none", "n/a", "-"):
                cleaned.append(None)
            else:
                # Try to infer types if requested
                if infer_types:
                    cleaned.append(self._infer_type(part))
                else:
                    cleaned.append(part)

        return cleaned

    def _infer_type(self, value: str) -> Any:
        """Infer and convert value to appropriate type"""
        from sqlstream.core.types import infer_type_from_string

        return infer_type_from_string(value)

    def read_lazy(self) -> Iterator[dict[str, Any]]:
        """Read data lazily from the selected table"""
        for row in self.rows:
            # Apply filters if any
            if self.filter_conditions:
                if not self._matches_filters(row):
                    continue

            # Apply column selection if any
            if self.required_columns:
                filtered_row = {k: v for k, v in row.items() if k in self.required_columns}
                yield filtered_row
            else:
                yield row

    def _matches_filters(self, row: dict[str, Any]) -> bool:
        """Check if row matches all filter conditions"""
        for condition in self.filter_conditions:
            col = condition.column
            op = condition.operator
            value = condition.value

            if col not in row:
                return False

            row_value = row[col]

            # Handle None values
            if row_value is None:
                return False

            # Apply operator
            if op == "=":
                if row_value != value:
                    return False
            elif op == ">":
                if row_value <= value:
                    return False
            elif op == "<":
                if row_value >= value:
                    return False
            elif op == ">=":
                if row_value < value:
                    return False
            elif op == "<=":
                if row_value > value:
                    return False
            elif op == "!=":
                if row_value == value:
                    return False

        return True

    def get_schema(self) -> Schema:
        """Get schema by inferring types from first few rows"""
        schema = {}

        # Sample first few rows to infer types
        sample_size = min(10, len(self.rows))

        from sqlstream.core.types import infer_type

        for col in self.columns:
            # Collect non-None values
            values = [row[col] for row in self.rows[:sample_size] if row.get(col) is not None]

            if not values:
                schema[col] = DataType.STRING
                continue

            # Infer type from values
            # Use the first non-null value to infer type, or check consistency
            # For simplicity, we'll use the first value's type, but we could be more robust
            first_val = values[0]
            schema[col] = infer_type(first_val)

        return Schema(schema)

    def supports_pushdown(self) -> bool:
        """Markdown reader supports filter pushdown"""
        return True

    def supports_column_selection(self) -> bool:
        """Markdown reader supports column selection"""
        return True

    def set_filter(self, conditions: list[Condition]) -> None:
        """Set filter conditions"""
        self.filter_conditions = conditions

    def set_columns(self, columns: list[str]) -> None:
        """Set required columns"""
        self.required_columns = columns

    def list_tables(self) -> list[str]:
        """
        List all tables found in the Markdown file

        Returns:
            List of table descriptions
        """
        descriptions = []
        for i, table in enumerate(self.tables):
            cols = table["columns"][:3]
            col_str = ", ".join(cols)
            if len(table["columns"]) > 3:
                col_str += ", ..."
            row_count = len(table["rows"])
            descriptions.append(f"Table {i}: {col_str} ({row_count} rows)")
        return descriptions

    def to_dataframe(self):
        """
        Convert to pandas DataFrame
        """
        try:
            import pandas as pd
        except ImportError as e:
            raise ImportError("Pandas is required for to_dataframe()") from e

        return pd.DataFrame(self.rows)

__init__

__init__(source: str, table: int = 0)

Initialize Markdown reader

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| source | str | Path to Markdown file | required |
| table | int | Which table to read if multiple tables exist (0-indexed) | 0 |
Source code in sqlstream/readers/markdown_reader.py
def __init__(
    self,
    source: str,
    table: int = 0,
):
    """
    Initialize Markdown reader

    Args:
        source: Path to Markdown file
        table: Which table to read if multiple tables exist (0-indexed)
    """
    self.source = source
    self.table = table

    # Parse tables from Markdown
    self._parse_markdown()

    # Filter conditions and columns
    self.filter_conditions: list[Condition] = []
    self.required_columns: list[str] = []

read_lazy

read_lazy() -> Iterator[dict[str, Any]]

Read data lazily from the selected table

Source code in sqlstream/readers/markdown_reader.py
def read_lazy(self) -> Iterator[dict[str, Any]]:
    """Read data lazily from the selected table"""
    for row in self.rows:
        # Apply filters if any
        if self.filter_conditions:
            if not self._matches_filters(row):
                continue

        # Apply column selection if any
        if self.required_columns:
            filtered_row = {k: v for k, v in row.items() if k in self.required_columns}
            yield filtered_row
        else:
            yield row

get_schema

get_schema() -> Schema

Get schema by inferring types from first few rows

Source code in sqlstream/readers/markdown_reader.py
def get_schema(self) -> Schema:
    """Get schema by inferring types from first few rows"""
    schema = {}

    # Sample first few rows to infer types
    sample_size = min(10, len(self.rows))

    from sqlstream.core.types import infer_type

    for col in self.columns:
        # Collect non-None values
        values = [row[col] for row in self.rows[:sample_size] if row.get(col) is not None]

        if not values:
            schema[col] = DataType.STRING
            continue

        # Infer type from values
        # Use the first non-null value to infer type, or check consistency
        # For simplicity, we'll use the first value's type, but we could be more robust
        first_val = values[0]
        schema[col] = infer_type(first_val)

    return Schema(schema)

supports_pushdown

supports_pushdown() -> bool

Markdown reader supports filter pushdown

Source code in sqlstream/readers/markdown_reader.py
def supports_pushdown(self) -> bool:
    """Markdown reader supports filter pushdown"""
    return True

supports_column_selection

supports_column_selection() -> bool

Markdown reader supports column selection

Source code in sqlstream/readers/markdown_reader.py
def supports_column_selection(self) -> bool:
    """Markdown reader supports column selection"""
    return True

set_filter

set_filter(conditions: list[Condition]) -> None

Set filter conditions

Source code in sqlstream/readers/markdown_reader.py
def set_filter(self, conditions: list[Condition]) -> None:
    """Set filter conditions"""
    self.filter_conditions = conditions

set_columns

set_columns(columns: list[str]) -> None

Set required columns

Source code in sqlstream/readers/markdown_reader.py
def set_columns(self, columns: list[str]) -> None:
    """Set required columns"""
    self.required_columns = columns

list_tables

list_tables() -> list[str]

List all tables found in the Markdown file

Returns:

| Type | Description |
| --- | --- |
| list[str] | List of table descriptions |

Source code in sqlstream/readers/markdown_reader.py
def list_tables(self) -> list[str]:
    """
    List all tables found in the Markdown file

    Returns:
        List of table descriptions
    """
    descriptions = []
    for i, table in enumerate(self.tables):
        cols = table["columns"][:3]
        col_str = ", ".join(cols)
        if len(table["columns"]) > 3:
            col_str += ", ..."
        row_count = len(table["rows"])
        descriptions.append(f"Table {i}: {col_str} ({row_count} rows)")
    return descriptions
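
A brief sketch of selecting one table out of a Markdown file that contains several (the file name is illustrative):

```python
from sqlstream.readers.markdown_reader import MarkdownReader

reader = MarkdownReader("report.md")  # illustrative path

# Inspect what was parsed, e.g. "Table 0: Name, Age, City (2 rows)"
for description in reader.list_tables():
    print(description)

# Re-open the file pointing at the second table (0-indexed).
second = MarkdownReader("report.md", table=1)
for row in second.read_lazy():
    print(row)
```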

to_dataframe

to_dataframe()

Convert to pandas DataFrame

Source code in sqlstream/readers/markdown_reader.py
def to_dataframe(self):
    """
    Convert to pandas DataFrame
    """
    try:
        import pandas as pd
    except ImportError as e:
        raise ImportError("Pandas is required for to_dataframe()") from e

    return pd.DataFrame(self.rows)

ParallelCSVReader

ParallelCSVReader

Parallel CSV reader using chunked reading

Note

This is a placeholder for true parallel CSV reading. Implementing this correctly requires:

- Chunk boundary detection (find newlines; see the sketch after this note)
- Header parsing and schema inference
- Correct line splitting across chunks
- Order preservation

For now, this is just a wrapper around ParallelReader.
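
To make the first bullet concrete, here is a rough standalone sketch of aligning fixed-size byte chunks to newline boundaries. It is not part of SQLStream; it is just one way the boundary detection could be done.

```python
def chunk_offsets(path: str, chunk_size: int = 1024 * 1024) -> list[tuple[int, int]]:
    """Split a file into (start, end) byte ranges that each end on a line boundary."""
    import os

    file_size = os.path.getsize(path)
    offsets: list[tuple[int, int]] = []
    start = 0
    with open(path, "rb") as f:
        while start < file_size:
            end = min(start + chunk_size, file_size)
            if end < file_size:
                # Extend the chunk to the next newline so no CSV line is split in half.
                f.seek(end)
                f.readline()
                end = f.tell()
            offsets.append((start, end))
            start = end
    return offsets
```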

Source code in sqlstream/readers/parallel_reader.py
class ParallelCSVReader:
    """
    Parallel CSV reader using chunked reading

    Note:
        This is a placeholder for true parallel CSV reading.
        Implementing this correctly requires:
        - Chunk boundary detection (find newlines)
        - Header parsing and schema inference
        - Correct line splitting across chunks
        - Order preservation

    For now, this is just a wrapper around ParallelReader.
    """

    def __init__(self, path: str, num_threads: int = 4, chunk_size: int = 1024 * 1024):
        """
        Initialize parallel CSV reader

        Args:
            path: Path to CSV file
            num_threads: Number of worker threads
            chunk_size: Chunk size in bytes
        """
        from sqlstream.readers.csv_reader import CSVReader

        self.reader = CSVReader(path)
        self.parallel_reader = ParallelReader(self.reader, num_threads=num_threads)

    def read_lazy(self) -> Iterator[dict[str, Any]]:
        """Yield rows"""
        return self.parallel_reader.read_lazy()

    def __iter__(self):
        """Allow iteration"""
        return self.read_lazy()

__init__

__init__(path: str, num_threads: int = 4, chunk_size: int = 1024 * 1024)

Initialize parallel CSV reader

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | str | Path to CSV file | required |
| num_threads | int | Number of worker threads | 4 |
| chunk_size | int | Chunk size in bytes | 1024 * 1024 |
Source code in sqlstream/readers/parallel_reader.py
def __init__(self, path: str, num_threads: int = 4, chunk_size: int = 1024 * 1024):
    """
    Initialize parallel CSV reader

    Args:
        path: Path to CSV file
        num_threads: Number of worker threads
        chunk_size: Chunk size in bytes
    """
    from sqlstream.readers.csv_reader import CSVReader

    self.reader = CSVReader(path)
    self.parallel_reader = ParallelReader(self.reader, num_threads=num_threads)

read_lazy

read_lazy() -> Iterator[dict[str, Any]]

Yield rows

Source code in sqlstream/readers/parallel_reader.py
def read_lazy(self) -> Iterator[dict[str, Any]]:
    """Yield rows"""
    return self.parallel_reader.read_lazy()

__iter__

__iter__()

Allow iteration

Source code in sqlstream/readers/parallel_reader.py
def __iter__(self):
    """Allow iteration"""
    return self.read_lazy()

ParallelParquetReader

ParallelParquetReader

Parallel Parquet reader using row group parallelism

Parquet files are naturally parallelizable because:

- Data is split into row groups
- Each row group can be read independently
- PyArrow supports parallel reading natively

Note

This is a placeholder. PyArrow already supports parallel reading via threads parameter in read_table().

For true parallel execution in SQLStream, we would:

1. Read row groups in parallel
2. Apply filters in parallel
3. Merge results in order

Source code in sqlstream/readers/parallel_reader.py
class ParallelParquetReader:
    """
    Parallel Parquet reader using row group parallelism

    Parquet files are naturally parallelizable because:
    - Data is split into row groups
    - Each row group can be read independently
    - PyArrow supports parallel reading natively

    Note:
        This is a placeholder. PyArrow already supports parallel
        reading via threads parameter in read_table().

        For true parallel execution in SQLStream, we would:
        1. Read row groups in parallel
        2. Apply filters in parallel
        3. Merge results in order
    """

    def __init__(self, path: str, num_threads: int = 4):
        """
        Initialize parallel Parquet reader

        Args:
            path: Path to Parquet file
            num_threads: Number of worker threads
        """
        from sqlstream.readers.parquet_reader import ParquetReader

        self.reader = ParquetReader(path)
        self.parallel_reader = ParallelReader(self.reader, num_threads=num_threads)

    def read_lazy(self) -> Iterator[dict[str, Any]]:
        """Yield rows"""
        return self.parallel_reader.read_lazy()

    def __iter__(self):
        """Allow iteration"""
        return self.read_lazy()

__init__

__init__(path: str, num_threads: int = 4)

Initialize parallel Parquet reader

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | str | Path to Parquet file | required |
| num_threads | int | Number of worker threads | 4 |
Source code in sqlstream/readers/parallel_reader.py
def __init__(self, path: str, num_threads: int = 4):
    """
    Initialize parallel Parquet reader

    Args:
        path: Path to Parquet file
        num_threads: Number of worker threads
    """
    from sqlstream.readers.parquet_reader import ParquetReader

    self.reader = ParquetReader(path)
    self.parallel_reader = ParallelReader(self.reader, num_threads=num_threads)

read_lazy

read_lazy() -> Iterator[dict[str, Any]]

Yield rows

Source code in sqlstream/readers/parallel_reader.py
def read_lazy(self) -> Iterator[dict[str, Any]]:
    """Yield rows"""
    return self.parallel_reader.read_lazy()

__iter__

__iter__()

Allow iteration

Source code in sqlstream/readers/parallel_reader.py
def __iter__(self):
    """Allow iteration"""
    return self.read_lazy()

ParallelReader

ParallelReader

Parallel wrapper for data readers

Wraps any BaseReader and reads data in parallel using a thread pool.

Usage
reader = CSVReader("large_file.csv")
parallel_reader = ParallelReader(reader, num_threads=4)

for row in parallel_reader:
    process(row)

How it works:

- Producer threads read chunks of data
- Consumer (main thread) yields rows in order
- Queue-based coordination
- Graceful shutdown on completion or error

Source code in sqlstream/readers/parallel_reader.py
class ParallelReader:
    """
    Parallel wrapper for data readers

    Wraps any BaseReader and reads data in parallel using a thread pool.

    Usage:
        ```python
        reader = CSVReader("large_file.csv")
        parallel_reader = ParallelReader(reader, num_threads=4)

        for row in parallel_reader:
            process(row)
        ```

    How it works:
    - Producer threads read chunks of data
    - Consumer (main thread) yields rows in order
    - Queue-based coordination
    - Graceful shutdown on completion or error
    """

    def __init__(
        self,
        reader: BaseReader,
        num_threads: int = 4,
        queue_size: int = 100,
    ):
        """
        Initialize parallel reader

        Args:
            reader: Underlying reader to wrap
            num_threads: Number of worker threads
            queue_size: Maximum items in queue (backpressure)
        """
        self.reader = reader
        self.num_threads = num_threads
        self.queue_size = queue_size

        # Queue for passing rows between threads
        self.row_queue: queue.Queue = queue.Queue(maxsize=queue_size)

        # Coordination
        self.stop_event = threading.Event()
        self.error: Exception | None = None
        self.workers: list[threading.Thread] = []

    def read_lazy(self) -> Iterator[dict[str, Any]]:
        """
        Yield rows from parallel reader

        Yields:
            Dictionary representing one row
        """
        # Start worker threads
        self._start_workers()

        try:
            # Yield rows from queue until done
            while True:
                try:
                    # Get row from queue (with timeout to check for errors)
                    row = self.row_queue.get(timeout=0.1)

                    # Sentinel value indicates completion
                    if row is None:
                        break

                    yield row

                except queue.Empty as e:
                    # Check if workers encountered error
                    if self.error:
                        raise self.error from e

                    # Check if all workers finished
                    if not any(w.is_alive() for w in self.workers):
                        # Workers done but no sentinel? Something went wrong
                        if self.row_queue.empty():
                            break

        finally:
            # Cleanup: stop workers and join threads
            self._stop_workers()

    def _start_workers(self) -> None:
        """Start worker threads"""
        # For now, use single-threaded mode
        # Multi-threading in Python is tricky due to GIL
        # and iterator protocol doesn't work well with threads

        # Simple implementation: just delegate to underlying reader
        # Future: Implement true parallelism with chunking

        worker = threading.Thread(target=self._worker_function, daemon=True)
        worker.start()
        self.workers.append(worker)

    def _worker_function(self) -> None:
        """
        Worker thread function

        Reads rows from underlying reader and puts them in queue
        """
        try:
            for row in self.reader.read_lazy():
                # Check if we should stop
                if self.stop_event.is_set():
                    break

                # Put row in queue (blocks if queue full)
                self.row_queue.put(row)

            # Signal completion with sentinel
            self.row_queue.put(None)

        except Exception as e:
            # Capture error for main thread
            self.error = e
            # Put sentinel to unblock main thread
            self.row_queue.put(None)

    def _stop_workers(self) -> None:
        """Stop worker threads and clean up"""
        # Signal workers to stop
        self.stop_event.set()

        # Wait for workers to finish (with timeout)
        for worker in self.workers:
            worker.join(timeout=1.0)

        # Clear queue
        while not self.row_queue.empty():
            try:
                self.row_queue.get_nowait()
            except queue.Empty:
                break

    def __iter__(self):
        """Allow iteration"""
        return self.read_lazy()

__init__

__init__(reader: BaseReader, num_threads: int = 4, queue_size: int = 100)

Initialize parallel reader

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| reader | BaseReader | Underlying reader to wrap | required |
| num_threads | int | Number of worker threads | 4 |
| queue_size | int | Maximum items in queue (backpressure) | 100 |
Source code in sqlstream/readers/parallel_reader.py
def __init__(
    self,
    reader: BaseReader,
    num_threads: int = 4,
    queue_size: int = 100,
):
    """
    Initialize parallel reader

    Args:
        reader: Underlying reader to wrap
        num_threads: Number of worker threads
        queue_size: Maximum items in queue (backpressure)
    """
    self.reader = reader
    self.num_threads = num_threads
    self.queue_size = queue_size

    # Queue for passing rows between threads
    self.row_queue: queue.Queue = queue.Queue(maxsize=queue_size)

    # Coordination
    self.stop_event = threading.Event()
    self.error: Exception | None = None
    self.workers: list[threading.Thread] = []

read_lazy

read_lazy() -> Iterator[dict[str, Any]]

Yield rows from parallel reader

Yields:

| Type | Description |
| --- | --- |
| dict[str, Any] | Dictionary representing one row |

Source code in sqlstream/readers/parallel_reader.py
def read_lazy(self) -> Iterator[dict[str, Any]]:
    """
    Yield rows from parallel reader

    Yields:
        Dictionary representing one row
    """
    # Start worker threads
    self._start_workers()

    try:
        # Yield rows from queue until done
        while True:
            try:
                # Get row from queue (with timeout to check for errors)
                row = self.row_queue.get(timeout=0.1)

                # Sentinel value indicates completion
                if row is None:
                    break

                yield row

            except queue.Empty as e:
                # Check if workers encountered error
                if self.error:
                    raise self.error from e

                # Check if all workers finished
                if not any(w.is_alive() for w in self.workers):
                    # Workers done but no sentinel? Something went wrong
                    if self.row_queue.empty():
                        break

    finally:
        # Cleanup: stop workers and join threads
        self._stop_workers()

__iter__

__iter__()

Allow iteration

Source code in sqlstream/readers/parallel_reader.py
def __iter__(self):
    """Allow iteration"""
    return self.read_lazy()

enable_parallel_reading

enable_parallel_reading

enable_parallel_reading(reader: BaseReader, num_threads: int = 4) -> ParallelReader

Enable parallel reading for any reader

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| reader | BaseReader | Reader to wrap | required |
| num_threads | int | Number of worker threads | 4 |

Returns:

| Type | Description |
| --- | --- |
| ParallelReader | Parallel reader wrapper |

Example
reader = CSVReader("large_file.csv")
parallel_reader = enable_parallel_reading(reader, num_threads=4)

for row in parallel_reader:
    process(row)
Source code in sqlstream/readers/parallel_reader.py
def enable_parallel_reading(reader: BaseReader, num_threads: int = 4) -> ParallelReader:
    """
    Enable parallel reading for any reader

    Args:
        reader: Reader to wrap
        num_threads: Number of worker threads

    Returns:
        Parallel reader wrapper

    Example:
        ```python
        reader = CSVReader("large_file.csv")
        parallel_reader = enable_parallel_reading(reader, num_threads=4)

        for row in parallel_reader:
            process(row)
        ```
    """
    return ParallelReader(reader, num_threads=num_threads)

ParquetReader

ParquetReader

Bases: BaseReader

Intelligent Parquet reader with statistics-based optimization

Features:

- Lazy iteration (doesn't load entire file)
- Row group statistics-based pruning (HUGE performance win)
- Column selection (only read needed columns)
- Predicate pushdown with statistics

The key insight: Parquet stores min/max for each column in each row group. We can skip entire row groups if their statistics don't match our filters!

Example:

    Row Group 1: age [18-30], city ['LA', 'NYC']
    Row Group 2: age [31-45], city ['NYC', 'SF']
    Row Group 3: age [46-90], city ['LA', 'SF']

    Query: WHERE age > 60
    → Skip RG1 (max=30), Skip RG2 (max=45), Read RG3 only!
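
The same pruning idea can be observed directly with PyArrow's metadata API; a minimal sketch, assuming a flat schema where the column order matches the Arrow schema (the file name and the age > 60 predicate are illustrative):

```python
import pyarrow.parquet as pq

pf = pq.ParquetFile("users.parquet")  # illustrative path
age_idx = pf.schema_arrow.get_field_index("age")

for rg in range(pf.metadata.num_row_groups):
    stats = pf.metadata.row_group(rg).column(age_idx).statistics
    # If the row group's max age is <= 60, no row in it can satisfy age > 60: skip it.
    if stats is not None and stats.has_min_max and stats.max <= 60:
        continue
    table = pf.read_row_group(rg, columns=["age"])  # only surviving groups are read
    print(f"row group {rg}: {table.num_rows} rows")
```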

Source code in sqlstream/readers/parquet_reader.py
class ParquetReader(BaseReader):
    """
    Intelligent Parquet reader with statistics-based optimization

    Features:
    - Lazy iteration (doesn't load entire file)
    - Row group statistics-based pruning (HUGE performance win)
    - Column selection (only read needed columns)
    - Predicate pushdown with statistics

    The key insight: Parquet stores min/max for each column in each row group.
    We can skip entire row groups if their statistics don't match our filters!

    Example:
        Row Group 1: age [18-30], city ['LA', 'NYC']
        Row Group 2: age [31-45], city ['NYC', 'SF']
        Row Group 3: age [46-90], city ['LA', 'SF']

        Query: WHERE age > 60
        → Skip RG1 (max=30), Skip RG2 (max=45), Read RG3 only!
    """

    def __init__(self, path: str):
        """
        Initialize Parquet reader

        Args:
            path: Path to Parquet file (local or s3://)
        """
        self.path_str = path
        self.is_s3 = path.startswith("s3://")

        filesystem = None
        path_to_open = path

        if self.is_s3:
            try:
                import s3fs

                filesystem = s3fs.S3FileSystem(anon=False)
                # s3fs expects path without protocol when filesystem is provided
                path_to_open = path.replace("s3://", "")
            except ImportError as e:
                raise ImportError("s3fs is required for S3 support. Install `sqlstream[s3]`") from e
        else:
            self.path = Path(path)
            path_to_open = str(self.path)
            if not self.path.exists():
                raise FileNotFoundError(f"Parquet file not found: {path}")

        self.parquet_file = pq.ParquetFile(path_to_open, filesystem=filesystem)

        # Optimization state (set by planner)
        self.filter_conditions: list[Condition] = []
        self.required_columns: list[str] = []
        self.limit: int | None = None
        self.partition_filters: list[Condition] = []

        # Parse partition information from path
        self.partition_columns: set = set()
        self.partition_values: dict[str, Any] = {}
        self._parse_partition_info()

        # Check if file should be skipped based on partition filters
        self.partition_pruned = False

        # Statistics tracking
        self.total_row_groups = self.parquet_file.num_row_groups
        self.row_groups_scanned = 0

    def supports_pushdown(self) -> bool:
        """Parquet reader supports predicate pushdown"""
        return True

    def supports_column_selection(self) -> bool:
        """Parquet reader supports column pruning"""
        return True

    def supports_limit(self) -> bool:
        """Parquet reader supports limit pushdown"""
        return True

    def set_filter(self, conditions: list[Condition]) -> None:
        """Set filter conditions for pushdown"""
        self.filter_conditions = conditions

    def set_columns(self, columns: list[str]) -> None:
        """Set required columns for pruning"""
        self.required_columns = columns

    def set_limit(self, limit: int) -> None:
        """Set maximum rows to read for early termination"""
        self.limit = limit

    def supports_partition_pruning(self) -> bool:
        """Parquet reader supports partition pruning for Hive-style partitioning"""
        return True

    def get_partition_columns(self) -> set:
        """Get partition column names detected from file path"""
        return self.partition_columns

    def set_partition_filters(self, conditions: list[Condition]) -> None:
        """
        Set partition filters and check if this file should be skipped

        Args:
            conditions: List of WHERE conditions on partition columns
        """
        self.partition_filters = conditions

        # Check if this file's partitions match the filters
        # If not, mark it as pruned so we skip reading it
        if not self._partition_matches_filters():
            self.partition_pruned = True

    def read_lazy(self) -> Iterator[dict[str, Any]]:
        """
        Lazy iterator over Parquet rows with intelligent row group pruning

        This is where the magic happens:
        1. Check partition pruning (skip entire file if needed!)
        2. Select row groups using statistics (skip irrelevant ones!)
        3. Read only selected row groups
        4. Read only required columns
        5. Yield rows as dictionaries
        6. Early termination if limit is reached
        """
        # Step 0: Partition pruning - skip entire file if partition doesn't match
        if self.partition_pruned:
            # File has been pruned based on partition filters
            # Don't read any data!
            return

        # Step 1: Intelligent row group selection
        selected_row_groups = self._select_row_groups_with_statistics()

        # Track how many we're actually reading
        self.row_groups_scanned = len(selected_row_groups)

        # Step 2: Read only selected row groups
        rows_yielded = 0
        for rg_idx in selected_row_groups:
            # Read this row group (with column selection)
            for row in self._read_row_group(rg_idx):
                # Add partition columns to the row
                # These are "virtual" columns from the directory structure
                for col, value in self.partition_values.items():
                    row[col] = value

                yield row
                rows_yielded += 1

                # Early termination if limit reached
                if self.limit is not None and rows_yielded >= self.limit:
                    return

    def _select_row_groups_with_statistics(self) -> list[int]:
        """
        Use row group statistics to select which ones to read

        This is THE key optimization for Parquet!

        Returns:
            List of row group indices to read
        """
        if not self.filter_conditions:
            # No filters, read all row groups
            return list(range(self.total_row_groups))

        selected = []
        metadata = self.parquet_file.metadata

        for rg_idx in range(self.total_row_groups):
            rg_metadata = metadata.row_group(rg_idx)

            # Check if this row group's statistics match our filters
            if self._row_group_matches_filters(rg_metadata):
                selected.append(rg_idx)

        return selected

    def _row_group_matches_filters(self, rg_metadata) -> bool:
        """
        Check if row group statistics overlap with filter conditions

        Uses min/max statistics to determine if a row group could
        possibly contain matching rows.

        Args:
            rg_metadata: Row group metadata from Parquet file

        Returns:
            True if row group might contain matching rows
            False if we can definitively skip it

        Example:
            Filter: age > 60
            Row Group: age [18-55]
            Result: False (max < 60, so no rows can match)
        """
        for condition in self.filter_conditions:
            column_name = condition.column

            # Find column index
            try:
                # Get schema to find column index
                schema = self.parquet_file.schema_arrow
                column_idx = schema.get_field_index(column_name)
            except Exception:
                # Column not found or no index, can't use statistics
                continue

            # get_field_index() returns -1 (without raising) for unknown
            # columns, so don't pass -1 on to rg_metadata.column()
            if column_idx == -1:
                continue

            # Get column metadata
            try:
                col_metadata = rg_metadata.column(column_idx)

                # Check if statistics are available
                if not col_metadata.is_stats_set:
                    continue

                stats = col_metadata.statistics

                # Get min/max values
                min_val = stats.min
                max_val = stats.max

                # Check if filter can eliminate this row group
                if not self._statistics_match_condition(min_val, max_val, condition):
                    return False  # Skip this row group!

            except Exception:
                # No statistics or error, conservatively keep row group
                continue

        return True  # Row group might contain matches

    def _statistics_match_condition(self, min_val: Any, max_val: Any, condition: Condition) -> bool:
        """
        Check if min/max statistics overlap with a condition

        Args:
            min_val: Minimum value in row group
            max_val: Maximum value in row group
            condition: Filter condition to check

        Returns:
            True if row group might contain matches
            False if we can skip it

        Logic:
            age > 60: Skip if max_val <= 60
            age < 30: Skip if min_val >= 30
            age = 25: Skip if 25 < min_val or 25 > max_val
            age >= 50: Skip if max_val < 50
            age <= 40: Skip if min_val > 40
        """
        op = condition.operator
        value = condition.value

        try:
            if op == ">":
                # Skip if max_val <= value (all rows too small)
                return max_val > value

            elif op == ">=":
                # Skip if max_val < value
                return max_val >= value

            elif op == "<":
                # Skip if min_val >= value (all rows too large)
                return min_val < value

            elif op == "<=":
                # Skip if min_val > value
                return min_val <= value

            elif op == "=":
                # Skip if value outside [min_val, max_val]
                return min_val <= value <= max_val

            elif op == "!=":
                # Can only skip if min_val == max_val == value
                # (entire row group is the excluded value)
                if min_val == max_val == value:
                    return False
                return True

            else:
                # Unknown operator, conservatively keep row group
                return True

        except (TypeError, ValueError):
            # Comparison failed (type mismatch), keep row group
            return True

    def _read_row_group(self, rg_idx: int) -> Iterator[dict[str, Any]]:
        """
        Read a specific row group

        Args:
            rg_idx: Row group index to read

        Yields:
            Rows as dictionaries
        """
        # Determine which columns to read
        # If we have filters, we need to read those columns even if not in required_columns
        columns_to_read = set()

        if self.required_columns:
            columns_to_read.update(self.required_columns)

        # Add columns needed for filtering
        if self.filter_conditions:
            for condition in self.filter_conditions:
                columns_to_read.add(condition.column)

        # Convert to list, or None to read all columns
        columns = list(columns_to_read) if columns_to_read else None

        # Read row group with column selection
        table = self.parquet_file.read_row_group(rg_idx, columns=columns)

        # Convert to row-oriented format and yield
        # PyArrow returns columnar data, we need rows
        num_rows = table.num_rows

        for i in range(num_rows):
            row = {}
            for col_name in table.column_names:
                col_data = table.column(col_name)
                # Get value at index i
                value = col_data[i].as_py()  # Convert to Python type
                row[col_name] = value

            # Apply filter conditions if set
            # Note: Row group statistics only help us skip entire groups,
            # but we still need to filter individual rows within selected groups
            if self.filter_conditions:
                if not self._matches_filter(row):
                    continue

            # Apply column selection to output
            # (we may have read extra columns for filtering)
            if self.required_columns:
                row = {k: v for k, v in row.items() if k in self.required_columns}

            yield row

    def _matches_filter(self, row: dict[str, Any]) -> bool:
        """
        Check if row matches all filter conditions

        Args:
            row: Row to check

        Returns:
            True if row matches all conditions (AND logic)
        """
        for condition in self.filter_conditions:
            if not self._evaluate_condition(row, condition):
                return False
        return True

    def _evaluate_condition(self, row: dict[str, Any], condition: Condition) -> bool:
        """
        Evaluate a single condition against a row

        Args:
            row: Row to check
            condition: Condition to evaluate

        Returns:
            True if condition is satisfied
        """
        # Get column value
        if condition.column not in row:
            return False

        value = row[condition.column]

        # Handle NULL values
        if value is None:
            return False

        # Evaluate operator
        op = condition.operator
        expected = condition.value

        try:
            if op == "=":
                return value == expected
            elif op == ">":
                return value > expected
            elif op == "<":
                return value < expected
            elif op == ">=":
                return value >= expected
            elif op == "<=":
                return value <= expected
            elif op == "!=":
                return value != expected
            else:
                # Unknown operator, conservatively keep row
                return True

        except TypeError:
            # Type mismatch (e.g., comparing string to int)
            # This is fine - row just doesn't match
            return False

    def get_schema(self) -> Schema:
        """
        Get schema from Parquet metadata

        Returns:
            Dictionary mapping column names to types
        """
        schema: dict[str, DataType] = {}
        arrow_schema = self.parquet_file.schema_arrow

        for i in range(len(arrow_schema)):
            field = arrow_schema.field(i)
            # Map Arrow types to simple type names
            schema[field.name] = self._arrow_type_to_dtype(field.type)

        return Schema(schema)

    def _arrow_type_to_string(self, arrow_type) -> str:
        """
        Convert PyArrow type to simple string

        Args:
            arrow_type: PyArrow data type

        Returns:
            Simple type name (int, float, string, etc.)
        """
        type_str = str(arrow_type)

        if "int" in type_str.lower():
            return "int"
        elif "float" in type_str.lower() or "double" in type_str.lower():
            return "float"
        elif "decimal" in type_str.lower():
            return "decimal"
        elif "string" in type_str.lower() or "utf8" in type_str.lower():
            return "string"
        elif "bool" in type_str.lower():
            return "bool"
        elif "date" in type_str.lower():
            return "date"
        elif "timestamp" in type_str.lower():
            return "datetime"
        elif "time" in type_str.lower():
            return "time"
        else:
            return type_str

    def _arrow_type_to_dtype(self, arrow_type) -> DataType:
        """
        Convert PyArrow type to SQLStream data type

        Args:
            arrow_type: PyArrow data type

        Returns:
            SQLStream data type
        """
        simple_type = self._arrow_type_to_string(arrow_type)
        if simple_type == "int":
            return DataType.INTEGER
        elif simple_type == "float":
            return DataType.FLOAT
        elif simple_type == "decimal":
            return DataType.DECIMAL
        elif simple_type == "string":
            return DataType.STRING
        elif simple_type == "bool":
            return DataType.BOOLEAN
        elif simple_type == "date":
            return DataType.DATE
        elif simple_type == "datetime":
            return DataType.DATETIME
        elif simple_type == "time":
            return DataType.TIME
        else:
            return DataType.STRING  # Default to string instead of NULL for unknown types

    def _parse_partition_info(self) -> None:
        """
        Parse partition information from Hive-style partitioned path

        Detects partition columns and values from path structure:
        - s3://bucket/data/year=2024/month=01/data.parquet
        - /path/to/data/country=USA/state=CA/data.parquet

        Populates:
        - self.partition_columns: {'year', 'month'} or {'country', 'state'}
        - self.partition_values: {'year': 2024, 'month': 1} or {'country': 'USA', 'state': 'CA'}
        """
        import re

        # Parse the path string for partition key=value patterns
        # Match pattern: name=value in directory structure
        partition_pattern = re.compile(r"([^/=]+)=([^/]+)")

        matches = partition_pattern.findall(self.path_str)

        for key, value in matches:
            self.partition_columns.add(key)

            # Try to infer type of partition value
            # Common patterns: year=2024 (int), month=01 (int), country=USA (str)
            typed_value = self._infer_partition_value_type(value)
            self.partition_values[key] = typed_value

    def _infer_partition_value_type(self, value: str) -> Any:
        """
        Infer the type of a partition value string

        Args:
            value: String value from partition path (e.g., "2024", "01", "USA")

        Returns:
            Typed value (int, float, or str)
        """
        from sqlstream.core.types import infer_type_from_string

        return infer_type_from_string(value)

    def _partition_matches_filters(self) -> bool:
        """
        Check if this file's partition values match the partition filters

        Returns:
            True if partition matches (file should be read)
            False if partition doesn't match (file should be skipped)

        Example:
            File path: s3://bucket/data/year=2024/month=01/data.parquet
            Partition values: {'year': 2024, 'month': 1}

            Filter: year > 2023 AND month = 1
            Result: True (matches)

            Filter: year = 2025
            Result: False (doesn't match, skip file!)
        """
        if not self.partition_filters:
            # No filters, all partitions match
            return True

        # Check each partition filter condition
        for condition in self.partition_filters:
            column = condition.column
            operator = condition.operator
            expected = condition.value

            # Get partition value for this column
            if column not in self.partition_values:
                # Filter references a partition column that doesn't exist in path
                # Conservatively assume match (don't skip)
                continue

            actual = self.partition_values[column]

            # Evaluate condition
            if not self._evaluate_partition_condition(actual, operator, expected):
                # Condition failed, skip this file!
                return False

        # All conditions passed
        return True

    def _evaluate_partition_condition(self, actual: Any, operator: str, expected: Any) -> bool:
        """
        Evaluate a partition filter condition

        Args:
            actual: Actual partition value from path
            operator: Comparison operator (=, >, <, etc.)
            expected: Expected value from WHERE clause

        Returns:
            True if condition is satisfied, False otherwise
        """
        try:
            if operator == "=":
                return actual == expected
            elif operator == ">":
                return actual > expected
            elif operator == "<":
                return actual < expected
            elif operator == ">=":
                return actual >= expected
            elif operator == "<=":
                return actual <= expected
            elif operator == "!=":
                return actual != expected
            else:
                # Unknown operator, conservatively match
                return True

        except (TypeError, ValueError):
            # Type mismatch, conservatively match
            return True

    def get_statistics(self) -> dict[str, Any]:
        """
        Get statistics about row group pruning

        Returns:
            Dictionary with pruning statistics
        """
        return {
            "total_row_groups": self.total_row_groups,
            "row_groups_scanned": self.row_groups_scanned,
            "row_groups_skipped": self.total_row_groups - self.row_groups_scanned,
            "pruning_ratio": (
                (self.total_row_groups - self.row_groups_scanned) / self.total_row_groups
                if self.total_row_groups > 0
                else 0
            ),
            "partition_pruned": self.partition_pruned,
            "partition_columns": list(self.partition_columns),
            "partition_values": self.partition_values,
        }

    def to_dataframe(self):
        """
        Convert to pandas DataFrame efficiently
        """
        import pandas as pd

        # Use pandas read_parquet for performance
        if self.is_s3:
            return pd.read_parquet(self.path_str, storage_options={"anon": False})
        else:
            return pd.read_parquet(self.path)
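
Putting the pushdown hooks together, a usage sketch of the class above. The query planner normally drives these calls; Condition(column=..., operator=..., value=...) is assumed here from the attribute names used in the reader (the real constructor may differ), and the file path is illustrative.

reader = ParquetReader("events.parquet")
if reader.supports_pushdown():
    # assumed constructor; see note above
    reader.set_filter([Condition(column="age", operator=">", value=60)])
if reader.supports_column_selection():
    reader.set_columns(["name", "age"])
if reader.supports_limit():
    reader.set_limit(10)

for row in reader.read_lazy():
    print(row)               # at most 10 rows, each {'name': ..., 'age': ...}

print(reader.get_statistics())   # shows how many row groups were pruned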

__init__

__init__(path: str)

Initialize Parquet reader

Parameters:

    path (str): Path to Parquet file (local or s3://). [required]
Source code in sqlstream/readers/parquet_reader.py
def __init__(self, path: str):
    """
    Initialize Parquet reader

    Args:
        path: Path to Parquet file (local or s3://)
    """
    self.path_str = path
    self.is_s3 = path.startswith("s3://")

    filesystem = None
    path_to_open = path

    if self.is_s3:
        try:
            import s3fs

            filesystem = s3fs.S3FileSystem(anon=False)
            # s3fs expects path without protocol when filesystem is provided
            path_to_open = path.replace("s3://", "")
        except ImportError as e:
            raise ImportError("s3fs is required for S3 support. Install `sqlstream[s3]`") from e
    else:
        self.path = Path(path)
        path_to_open = str(self.path)
        if not self.path.exists():
            raise FileNotFoundError(f"Parquet file not found: {path}")

    self.parquet_file = pq.ParquetFile(path_to_open, filesystem=filesystem)

    # Optimization state (set by planner)
    self.filter_conditions: list[Condition] = []
    self.required_columns: list[str] = []
    self.limit: int | None = None
    self.partition_filters: list[Condition] = []

    # Parse partition information from path
    self.partition_columns: set = set()
    self.partition_values: dict[str, Any] = {}
    self._parse_partition_info()

    # Check if file should be skipped based on partition filters
    self.partition_pruned = False

    # Statistics tracking
    self.total_row_groups = self.parquet_file.num_row_groups
    self.row_groups_scanned = 0

supports_pushdown

supports_pushdown() -> bool

Parquet reader supports predicate pushdown

Source code in sqlstream/readers/parquet_reader.py
def supports_pushdown(self) -> bool:
    """Parquet reader supports predicate pushdown"""
    return True

supports_column_selection

supports_column_selection() -> bool

Parquet reader supports column pruning

Source code in sqlstream/readers/parquet_reader.py
def supports_column_selection(self) -> bool:
    """Parquet reader supports column pruning"""
    return True

supports_limit

supports_limit() -> bool

Parquet reader supports limit pushdown

Source code in sqlstream/readers/parquet_reader.py
def supports_limit(self) -> bool:
    """Parquet reader supports limit pushdown"""
    return True

set_filter

set_filter(conditions: list[Condition]) -> None

Set filter conditions for pushdown

Source code in sqlstream/readers/parquet_reader.py
def set_filter(self, conditions: list[Condition]) -> None:
    """Set filter conditions for pushdown"""
    self.filter_conditions = conditions

set_columns

set_columns(columns: list[str]) -> None

Set required columns for pruning

Source code in sqlstream/readers/parquet_reader.py
def set_columns(self, columns: list[str]) -> None:
    """Set required columns for pruning"""
    self.required_columns = columns

set_limit

set_limit(limit: int) -> None

Set maximum rows to read for early termination

Source code in sqlstream/readers/parquet_reader.py
def set_limit(self, limit: int) -> None:
    """Set maximum rows to read for early termination"""
    self.limit = limit

supports_partition_pruning

supports_partition_pruning() -> bool

Parquet reader supports partition pruning for Hive-style partitioning

Source code in sqlstream/readers/parquet_reader.py
def supports_partition_pruning(self) -> bool:
    """Parquet reader supports partition pruning for Hive-style partitioning"""
    return True

get_partition_columns

get_partition_columns() -> set

Get partition column names detected from file path

Source code in sqlstream/readers/parquet_reader.py
def get_partition_columns(self) -> set:
    """Get partition column names detected from file path"""
    return self.partition_columns

set_partition_filters

set_partition_filters(conditions: list[Condition]) -> None

Set partition filters and check if this file should be skipped

Parameters:

    conditions (list[Condition]): List of WHERE conditions on partition columns. [required]
Source code in sqlstream/readers/parquet_reader.py
def set_partition_filters(self, conditions: list[Condition]) -> None:
    """
    Set partition filters and check if this file should be skipped

    Args:
        conditions: List of WHERE conditions on partition columns
    """
    self.partition_filters = conditions

    # Check if this file's partitions match the filters
    # If not, mark it as pruned so we skip reading it
    if not self._partition_matches_filters():
        self.partition_pruned = True
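
A sketch of the file-level effect, assuming a Hive-style path and the same illustrative Condition construction as above:

reader = ParquetReader("data/year=2024/month=01/events.parquet")
print(reader.get_partition_columns())      # {'year', 'month'} parsed from the path

reader.set_partition_filters([Condition(column="year", operator="=", value=2025)])
# year=2024 fails the filter, so the whole file is skipped:
print(list(reader.read_lazy()))                      # []
print(reader.get_statistics()["partition_pruned"])   # True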

read_lazy

read_lazy() -> Iterator[dict[str, Any]]

Lazy iterator over Parquet rows with intelligent row group pruning

This is where the magic happens:

1. Check partition pruning (skip entire file if needed!)
2. Select row groups using statistics (skip irrelevant ones!)
3. Read only selected row groups
4. Read only required columns
5. Yield rows as dictionaries
6. Early termination if limit is reached

Source code in sqlstream/readers/parquet_reader.py
def read_lazy(self) -> Iterator[dict[str, Any]]:
    """
    Lazy iterator over Parquet rows with intelligent row group pruning

    This is where the magic happens:
    1. Check partition pruning (skip entire file if needed!)
    2. Select row groups using statistics (skip irrelevant ones!)
    3. Read only selected row groups
    4. Read only required columns
    5. Yield rows as dictionaries
    6. Early termination if limit is reached
    """
    # Step 0: Partition pruning - skip entire file if partition doesn't match
    if self.partition_pruned:
        # File has been pruned based on partition filters
        # Don't read any data!
        return

    # Step 1: Intelligent row group selection
    selected_row_groups = self._select_row_groups_with_statistics()

    # Track how many we're actually reading
    self.row_groups_scanned = len(selected_row_groups)

    # Step 2: Read only selected row groups
    rows_yielded = 0
    for rg_idx in selected_row_groups:
        # Read this row group (with column selection)
        for row in self._read_row_group(rg_idx):
            # Add partition columns to the row
            # These are "virtual" columns from the directory structure
            for col, value in self.partition_values.items():
                row[col] = value

            yield row
            rows_yielded += 1

            # Early termination if limit reached
            if self.limit is not None and rows_yielded >= self.limit:
                return

get_schema

get_schema() -> Schema

Get schema from Parquet metadata

Returns:

    Schema: Dictionary mapping column names to types

Source code in sqlstream/readers/parquet_reader.py
def get_schema(self) -> Schema:
    """
    Get schema from Parquet metadata

    Returns:
        Dictionary mapping column names to types
    """
    schema: dict[str, DataType] = {}
    arrow_schema = self.parquet_file.schema_arrow

    for i in range(len(arrow_schema)):
        field = arrow_schema.field(i)
        # Map Arrow types to simple type names
        schema[field.name] = self._arrow_type_to_dtype(field.type)

    return Schema(schema)

get_statistics

get_statistics() -> dict[str, Any]

Get statistics about row group pruning

Returns:

    dict[str, Any]: Dictionary with pruning statistics

Source code in sqlstream/readers/parquet_reader.py
def get_statistics(self) -> dict[str, Any]:
    """
    Get statistics about row group pruning

    Returns:
        Dictionary with pruning statistics
    """
    return {
        "total_row_groups": self.total_row_groups,
        "row_groups_scanned": self.row_groups_scanned,
        "row_groups_skipped": self.total_row_groups - self.row_groups_scanned,
        "pruning_ratio": (
            (self.total_row_groups - self.row_groups_scanned) / self.total_row_groups
            if self.total_row_groups > 0
            else 0
        ),
        "partition_pruned": self.partition_pruned,
        "partition_columns": list(self.partition_columns),
        "partition_values": self.partition_values,
    }

to_dataframe

to_dataframe()

Convert to pandas DataFrame efficiently

Source code in sqlstream/readers/parquet_reader.py
def to_dataframe(self):
    """
    Convert to pandas DataFrame efficiently
    """
    import pandas as pd

    # Use pandas read_parquet for performance
    if self.is_s3:
        return pd.read_parquet(self.path_str, storage_options={"anon": False})
    else:
        return pd.read_parquet(self.path)

XMLReader

XMLReader

Bases: BaseReader

Read tabular data from XML files

Extracts tabular data from XML by finding repeating elements. Each repeating element becomes a row, and child elements/attributes become columns.

Example XML

<data>
    <record>
        <name>Alice</name>
        <age>30</age>
        <city>New York</city>
    </record>
    <record>
        <name>Bob</name>
        <age>25</age>
        <city>San Francisco</city>
    </record>
</data>

Example

Query all <record> elements

reader = XMLReader("data.xml", element="record")

Query with XPath-like syntax

reader = XMLReader("data.xml", element="data/record")

Source code in sqlstream/readers/xml_reader.py
class XMLReader(BaseReader):
    """
    Read tabular data from XML files

    Extracts tabular data from XML by finding repeating elements.
    Each repeating element becomes a row, and child elements/attributes become columns.

    Example XML:
        <data>
            <record>
                <name>Alice</name>
                <age>30</age>
                <city>New York</city>
            </record>
            <record>
                <name>Bob</name>
                <age>25</age>
                <city>San Francisco</city>
            </record>
        </data>

    Example:
        # Query all <record> elements
        reader = XMLReader("data.xml", element="record")

        # Query with XPath-like syntax
        reader = XMLReader("data.xml", element="data/record")
    """

    def __init__(self, source: str, element: str | None = None, **kwargs):
        """
        Initialize XML reader

        Args:
            source: Path to XML file
            element: Element tag name or path to extract (e.g., "record" or "data/record")
                    If not provided, will try to find the first repeating element
            **kwargs: Additional arguments (reserved for future use)
        """
        self.source = source
        self.element = element
        self.kwargs = kwargs

        # Parse XML and extract records
        self._parse_xml()

        # Filter conditions and columns
        self.filter_conditions: list[Condition] = []
        self.required_columns: list[str] = []

    def _parse_xml(self) -> None:
        """Parse XML file and extract tabular data"""
        try:
            tree = ET.parse(self.source)
            root = tree.getroot()

            # Find the elements to extract
            if self.element:
                # User specified element path
                elements = root.findall(f".//{self.element}")
                if not elements:
                    # Try exact path without //
                    elements = root.findall(self.element)
                if not elements:
                    raise ValueError(
                        f"No elements found matching '{self.element}' in XML: {self.source}"
                    )
            else:
                # Auto-detect: find first repeating element
                elements = self._find_repeating_elements(root)
                if not elements:
                    raise ValueError(
                        f"No repeating elements found in XML: {self.source}. "
                        "Specify element parameter explicitly."
                    )

            # Extract data from elements
            self.rows = []
            self.columns = set()

            for elem in elements:
                row = self._element_to_dict(elem)
                self.rows.append(row)
                self.columns.update(row.keys())

            # Convert columns to sorted list for consistent ordering
            self.columns = sorted(self.columns)

            if not self.rows:
                raise ValueError(f"No data rows extracted from XML: {self.source}")

        except ET.ParseError as e:
            raise OSError(f"Failed to parse XML file {self.source}: {e}") from e
        except FileNotFoundError as e:
            raise OSError(f"XML file not found: {self.source}") from e

    def _find_repeating_elements(self, root: ET.Element) -> list[ET.Element]:
        """
        Find the first type of repeating element in XML

        Returns:
            List of elements that repeat (more than one with same tag)
        """
        # Count occurrences of each tag at each level
        tag_counts = {}

        # Check direct children first
        for child in root:
            tag = child.tag
            if tag not in tag_counts:
                tag_counts[tag] = []
            tag_counts[tag].append(child)

        # Return the first tag that has multiple occurrences
        for _, elements in tag_counts.items():
            if len(elements) > 1:
                return elements

        # If no repeating elements at root level, search deeper
        for child in root:
            result = self._find_repeating_elements(child)
            if result:
                return result

        return []

    def _element_to_dict(self, elem: ET.Element) -> dict[str, Any]:
        """
        Convert an XML element to a dictionary

        Extracts:
        - Attributes as columns (prefixed with '@')
        - Child element text as columns
        - Nested elements as dot-notation columns (e.g., 'address.city')

        Args:
            elem: XML element to convert

        Returns:
            Dictionary representation of the element
        """
        row = {}

        # Add attributes (prefixed with @)
        for attr_name, attr_value in elem.attrib.items():
            row[f"@{attr_name}"] = self._infer_type(attr_value)

        # Add child elements
        for child in elem:
            tag = child.tag

            # If child has children, it's a nested structure
            if len(child) > 0:
                # Create nested dict with dot notation
                nested = self._element_to_dict(child)
                for key, value in nested.items():
                    row[f"{tag}.{key}"] = value
            else:
                # Simple text element
                text = child.text
                if text and text.strip():
                    row[tag] = self._infer_type(text.strip())
                else:
                    row[tag] = None

        # If element has text and no children, add it with special key
        if elem.text and elem.text.strip() and len(elem) == 0:
            row["_text"] = self._infer_type(elem.text.strip())

        return row

    def _infer_type(self, value: str) -> Any:
        """Infer and convert value to appropriate type"""
        from sqlstream.core.types import infer_type_from_string

        return infer_type_from_string(value)

    def read_lazy(self) -> Iterator[dict[str, Any]]:
        """Read data lazily from parsed XML"""
        for row in self.rows:
            # Apply filters if any
            if self.filter_conditions:
                if not self._matches_filters(row):
                    continue

            # Apply column selection if any
            if self.required_columns:
                # Ensure all columns exist in row (fill with None if missing)
                filtered_row = {k: row.get(k) for k in self.required_columns}
                yield filtered_row
            else:
                # Ensure all columns exist in row (fill with None if missing)
                complete_row = {col: row.get(col) for col in self.columns}
                yield complete_row

    def _matches_filters(self, row: dict[str, Any]) -> bool:
        """Check if row matches all filter conditions"""
        for condition in self.filter_conditions:
            col = condition.column
            op = condition.operator
            value = condition.value

            if col not in row:
                return False

            row_value = row[col]

            # Handle None values
            if row_value is None:
                return False

            # Apply operator
            if op == "=":
                if row_value != value:
                    return False
            elif op == ">":
                if row_value <= value:
                    return False
            elif op == "<":
                if row_value >= value:
                    return False
            elif op == ">=":
                if row_value < value:
                    return False
            elif op == "<=":
                if row_value > value:
                    return False
            elif op == "!=":
                if row_value == value:
                    return False

        return True

    def get_schema(self) -> Schema:
        """Get schema by inferring types from all rows"""
        schema = {}

        from sqlstream.core.types import infer_common_type

        for col in self.columns:
            # Collect all non-None values for this column
            values = [row[col] for row in self.rows if col in row and row[col] is not None]

            if not values:
                schema[col] = DataType.STRING
                continue

            # Infer common type from all values
            schema[col] = infer_common_type(values)

        return Schema(schema)

    def supports_pushdown(self) -> bool:
        """XML reader supports filter pushdown"""
        return True

    def supports_column_selection(self) -> bool:
        """XML reader supports column selection"""
        return True

    def set_filter(self, conditions: list[Condition]) -> None:
        """Set filter conditions"""
        self.filter_conditions = conditions

    def set_columns(self, columns: list[str]) -> None:
        """Set required columns"""
        self.required_columns = columns

    def to_dataframe(self):
        """
        Convert to pandas DataFrame
        """
        try:
            import pandas as pd
        except ImportError as e:
            raise ImportError("Pandas is required for to_dataframe()") from e

        # Ensure all rows have all columns
        complete_rows = [{col: row.get(col) for col in self.columns} for row in self.rows]

        return pd.DataFrame(complete_rows)
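
A usage sketch of the flattening rules above (attributes become '@'-prefixed columns, nested elements become dot-notation columns). The file content is illustrative:

xml = """
<data>
  <record id="1">
    <name>Alice</name>
    <address><city>NYC</city></address>
  </record>
  <record id="2">
    <name>Bob</name>
    <address><city>SF</city></address>
  </record>
</data>
"""
with open("people.xml", "w") as f:
    f.write(xml)

reader = XMLReader("people.xml", element="record")
for row in reader.read_lazy():
    print(row)
# Roughly: {'@id': 1, 'address.city': 'NYC', 'name': 'Alice'}
# (exact value typing depends on infer_type_from_string)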

__init__

__init__(source: str, element: str | None = None, **kwargs)

Initialize XML reader

Parameters:

    source (str): Path to XML file. [required]
    element (str | None): Element tag name or path to extract (e.g., "record" or "data/record"). If not provided, the reader tries to find the first repeating element. Default: None
    **kwargs: Additional arguments (reserved for future use). Default: {}
Source code in sqlstream/readers/xml_reader.py
def __init__(self, source: str, element: str | None = None, **kwargs):
    """
    Initialize XML reader

    Args:
        source: Path to XML file
        element: Element tag name or path to extract (e.g., "record" or "data/record")
                If not provided, will try to find the first repeating element
        **kwargs: Additional arguments (reserved for future use)
    """
    self.source = source
    self.element = element
    self.kwargs = kwargs

    # Parse XML and extract records
    self._parse_xml()

    # Filter conditions and columns
    self.filter_conditions: list[Condition] = []
    self.required_columns: list[str] = []

read_lazy

read_lazy() -> Iterator[dict[str, Any]]

Read data lazily from parsed XML

Source code in sqlstream/readers/xml_reader.py
def read_lazy(self) -> Iterator[dict[str, Any]]:
    """Read data lazily from parsed XML"""
    for row in self.rows:
        # Apply filters if any
        if self.filter_conditions:
            if not self._matches_filters(row):
                continue

        # Apply column selection if any
        if self.required_columns:
            # Ensure all columns exist in row (fill with None if missing)
            filtered_row = {k: row.get(k) for k in self.required_columns}
            yield filtered_row
        else:
            # Ensure all columns exist in row (fill with None if missing)
            complete_row = {col: row.get(col) for col in self.columns}
            yield complete_row

get_schema

get_schema() -> Schema

Get schema by inferring types from all rows

Source code in sqlstream/readers/xml_reader.py
def get_schema(self) -> Schema:
    """Get schema by inferring types from all rows"""
    schema = {}

    from sqlstream.core.types import infer_common_type

    for col in self.columns:
        # Collect all non-None values for this column
        values = [row[col] for row in self.rows if col in row and row[col] is not None]

        if not values:
            schema[col] = DataType.STRING
            continue

        # Infer common type from all values
        schema[col] = infer_common_type(values)

    return Schema(schema)

supports_pushdown

supports_pushdown() -> bool

XML reader supports filter pushdown

Source code in sqlstream/readers/xml_reader.py
def supports_pushdown(self) -> bool:
    """XML reader supports filter pushdown"""
    return True

supports_column_selection

supports_column_selection() -> bool

XML reader supports column selection

Source code in sqlstream/readers/xml_reader.py
def supports_column_selection(self) -> bool:
    """XML reader supports column selection"""
    return True

set_filter

set_filter(conditions: list[Condition]) -> None

Set filter conditions

Source code in sqlstream/readers/xml_reader.py
def set_filter(self, conditions: list[Condition]) -> None:
    """Set filter conditions"""
    self.filter_conditions = conditions

set_columns

set_columns(columns: list[str]) -> None

Set required columns

Source code in sqlstream/readers/xml_reader.py
def set_columns(self, columns: list[str]) -> None:
    """Set required columns"""
    self.required_columns = columns

to_dataframe

to_dataframe()

Convert to pandas DataFrame

Source code in sqlstream/readers/xml_reader.py
def to_dataframe(self):
    """
    Convert to pandas DataFrame
    """
    try:
        import pandas as pd
    except ImportError as e:
        raise ImportError("Pandas is required for to_dataframe()") from e

    # Ensure all rows have all columns
    complete_rows = [{col: row.get(col) for col in self.columns} for row in self.rows]

    return pd.DataFrame(complete_rows)