Python Module Quick Start
Get started with SQLStream as a Python library in 5 minutes!
Step 1: Install SQLStream
For minimal installation (CSV only):
Step 2: Your First Query
Create a sample CSV file or use an existing one:
# Create sample data
import csv
with open("employees.csv", "w") as f:
writer = csv.writer(f)
writer.writerow(["id", "name", "department", "salary"])
writer.writerow([1, "Alice", "Engineering", 95000])
writer.writerow([2, "Bob", "Sales", 75000])
writer.writerow([3, "Charlie", "Engineering", 105000])
Now query it with SQLStream:
from sqlstream import query
# Query with explicit source
results = query("employees.csv").sql("""
SELECT name, salary
FROM employees
WHERE department = 'Engineering'
""")
# Print results
for row in results:
print(f"{row['name']}: ${row['salary']:,}")
Output:
Alternative: Inline File Paths
You can also specify the file path directly in the SQL:
from sqlstream import query
# Query with inline source (extracted from SQL)
results = query().sql("""
SELECT name, salary
FROM 'employees.csv'
WHERE department = 'Engineering'
""")
for row in results:
print(f"{row['name']}: ${row['salary']:,}")
Step 3: Working with Results
Iterate Over Rows (Lazy Evaluation)
from sqlstream import query
results = query("employees.csv").sql("SELECT * FROM employees WHERE salary > 80000")
# Lazy iteration - rows are processed one at a time
for row in results:
print(row) # row is a dict
Convert to List
# Convert all results to a list (eager evaluation)
results_list = query("employees.csv").sql("SELECT * FROM employees").to_list()
print(f"Found {len(results_list)} employees")
Access as DataFrame (with pandas)
import pandas as pd
from sqlstream import query
results = query("employees.csv").sql("SELECT * FROM employees")
# Convert to pandas DataFrame
df = pd.DataFrame(results.to_list())
print(df)
Step 4: Advanced Queries
Aggregations
from sqlstream import query
# Count employees by department
results = query("employees.csv").sql("""
SELECT department, COUNT(*) as count, AVG(salary) as avg_salary
FROM employees
GROUP BY department
ORDER BY avg_salary DESC
""")
for row in results:
print(f"{row['department']}: {row['count']} employees, avg ${row['avg_salary']:,.0f}")
JOINs Across Files
from sqlstream import query
# Create orders data
import csv
with open("orders.csv", "w") as f:
writer = csv.writer(f)
writer.writerow(["order_id", "employee_id", "amount"])
writer.writerow([101, 1, 1500])
writer.writerow([102, 2, 2300])
writer.writerow([103, 1, 1800])
# Join employees and orders
results = query().sql("""
SELECT e.name, SUM(o.amount) as total_sales
FROM 'employees.csv' e
JOIN 'orders.csv' o ON e.id = o.employee_id
GROUP BY e.name
ORDER BY total_sales DESC
""")
for row in results:
print(f"{row['name']}: ${row['total_sales']:,}")
Step 5: Multiple File Formats
Query JSON Files
from sqlstream import query
# Query a JSON file with nested data
results = query("users.json#json:data.users").sql("""
SELECT name, email
FROM users
WHERE age > 25
""")
for row in results:
print(row)
Query Parquet Files
from sqlstream import query
# Parquet files are optimized for column-oriented queries
results = query("sales.parquet").sql("""
SELECT region, SUM(amount) as total
FROM sales
GROUP BY region
""")
for row in results:
print(f"{row['region']}: ${row['total']:,}")
Query HTML Tables
from sqlstream import query
# Extract and query tables from HTML
results = query("report.html#html:0").sql("""
SELECT * FROM report WHERE revenue > 100000
""")
for row in results:
print(row)
Step 6: Performance Optimization
Use the Pandas Backend
For large files, use the pandas backend for 10-100x speedup:
from sqlstream import query
# Specify backend explicitly
results = query("large_file.csv", backend="pandas").sql("""
SELECT * FROM large_file WHERE amount > 1000
""")
for row in results:
print(row)
Use the DuckDB Backend
For complex SQL queries, use DuckDB for 100x+ speedup:
from sqlstream import query
# DuckDB supports advanced SQL features
results = query("employees.csv", backend="duckdb").sql("""
WITH ranked AS (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) as rank
FROM employees
)
SELECT * FROM ranked WHERE rank = 1
""")
for row in results:
print(row)
Step 7: Error Handling
from sqlstream import query
try:
results = query("data.csv").sql("SELECT * FROM data WHERE invalid_column = 1")
for row in results:
print(row)
except Exception as e:
print(f"Query error: {e}")
Common Use Cases
Data Exploration
from sqlstream import query
# Count rows
count = query("data.csv").sql("SELECT COUNT(*) as count FROM data").to_list()[0]['count']
print(f"Total rows: {count}")
# Get unique values
unique_categories = query("data.csv").sql("SELECT DISTINCT category FROM data")
for row in unique_categories:
print(row['category'])
# Summary statistics
stats = query("data.csv").sql("""
SELECT
MIN(price) as min_price,
MAX(price) as max_price,
AVG(price) as avg_price
FROM data
""").to_list()[0]
print(stats)
Data Cleaning
from sqlstream import query
import csv
# Remove duplicates and filter nulls
results = query("messy_data.csv").sql("""
SELECT DISTINCT *
FROM messy_data
WHERE name IS NOT NULL
AND age > 0
ORDER BY id
""")
# Export cleaned data
with open("clean_data.csv", "w") as f:
rows = results.to_list()
if rows:
writer = csv.DictWriter(f, fieldnames=rows[0].keys())
writer.writeheader()
writer.writerows(rows)
ETL Pipeline
from sqlstream import query
# Extract and Transform
result = query("customers.csv").sql("""
SELECT
c.name,
c.email,
COUNT(o.order_id) as total_orders,
SUM(o.amount) as total_spent
FROM customers c
LEFT JOIN 'orders.csv' o ON c.id = o.customer_id
GROUP BY c.name, c.email
HAVING total_orders > 0
ORDER BY total_spent DESC
""")
# Load - send to database, API, etc.
for row in result:
# Process each customer
print(f"{row['name']}: {row['total_orders']} orders, ${row['total_spent']:,}")
# INSERT INTO database, POST to API, etc.
What's Next?
- Basic Usage - Learn core Python API concepts
- Advanced Patterns - Custom readers, streaming, error handling
- Examples - More real-world Python examples
- API Reference - Complete API documentation