The Lightdash Python SDK lets you query your semantic layer directly from Python. Use it in Jupyter notebooks, scripts, or any other Python environment so that everyone pulls from a single source of truth.

See it in action

Try the getting started Jupyter notebook for a hands-on walkthrough.

Installation

pip install lightdash

Quick start

from lightdash import Client

client = Client(
    instance_url="https://app.lightdash.cloud",
    access_token="your-token",
    project_uuid="your-uuid",
)
model = client.get_model("orders")

# Build and execute a query
result = (
    model.query()
    .metrics(model.metrics.revenue, model.metrics.profit)
    .dimensions(model.dimensions.country)
    .filter(model.dimensions.status == "active")
    .sort(model.metrics.revenue.desc())
    .limit(100)
    .execute()
)

# Get results as a DataFrame
df = result.to_df()

# Or as a list of dictionaries
records = result.to_records()
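
The quick start hard-codes the token for brevity. A minimal sketch of reading the same three Client arguments from environment variables instead is shown here; the variable names (LIGHTDASH_URL, LIGHTDASH_API_KEY, LIGHTDASH_PROJECT) and the helper client_kwargs_from_env are assumptions made for illustration, not names the SDK is known to read by itself.

```python
import os

# Hypothetical variable names; use whatever your deployment defines.
ENV_TO_ARG = {
    "LIGHTDASH_URL": "instance_url",
    "LIGHTDASH_API_KEY": "access_token",
    "LIGHTDASH_PROJECT": "project_uuid",
}


def client_kwargs_from_env(env=None):
    """Collect the Client keyword arguments from environment variables."""
    env = os.environ if env is None else env
    missing = [name for name in ENV_TO_ARG if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {arg: env[name] for name, arg in ENV_TO_ARG.items()}


# Usage, with the SDK installed and the variables set:
#   from lightdash import Client
#   client = Client(**client_kwargs_from_env())
```

Failing early with the list of missing variables tends to be easier to debug than an authentication error from the first query.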

Query builder

The SDK provides two patterns for building queries: single-call and chainable builder.

Single-call pattern

Pass all parameters at once for simple queries:
query = model.query(
    metrics=[model.metrics.revenue, model.metrics.profit],
    dimensions=[model.dimensions.country],
    filters=model.dimensions.status == "active",
    sort=model.metrics.revenue.desc(),
    limit=100
)
result = query.execute()

Chainable builder pattern

Build queries incrementally with method chaining. Each method returns a new immutable Query object:
query = (
    model.query()
    .metrics(model.metrics.revenue)
    .dimensions(model.dimensions.country, model.dimensions.date)
    .filter(model.dimensions.status == "active")
    .sort(model.metrics.revenue.desc())
    .limit(100)
)
Key characteristics:
  • Immutable — each method returns a new Query object, safe for reuse
  • Lazy evaluation — API calls only happen when .execute() is called
  • Order-independent — methods can be called in any order
  • Composable — create base queries and extend them
# Create a reusable base query
base = model.query().metrics(model.metrics.revenue).dimensions(model.dimensions.country)

# Extend it for different use cases
by_active = base.filter(model.dimensions.status == "active")
by_inactive = base.filter(model.dimensions.status == "inactive")
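
To see why extending a base query is safe, here is a rough sketch of how an immutable, chainable builder can be written. SketchQuery and its field names are hypothetical stand-ins for illustration only; this is not the SDK's Query class or its internals.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class SketchQuery:
    """Hypothetical stand-in for an immutable query; not the SDK's class."""

    selected_metrics: tuple = ()
    selected_dimensions: tuple = ()
    conditions: tuple = ()
    row_limit: Optional[int] = None

    # Every method returns a modified copy and leaves the original untouched.
    def metrics(self, *names):
        return replace(self, selected_metrics=self.selected_metrics + names)

    def dimensions(self, *names):
        return replace(self, selected_dimensions=self.selected_dimensions + names)

    def filter(self, condition):
        return replace(self, conditions=self.conditions + (condition,))

    def limit(self, n):
        return replace(self, row_limit=n)


base = SketchQuery().metrics("revenue").dimensions("country")
by_active = base.filter("status == active")
by_inactive = base.filter("status == inactive")

# The base query still has no filters, so both extensions are independent.
print(base.conditions)
print(by_active.conditions, by_inactive.conditions)
```

Because nothing is mutated in place, the order of the calls does not affect the final query, and no work happens until something executes it.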

Dimensions and metrics

Access dimensions and metrics as attributes on the model:
# Access via attribute
country = model.dimensions.country
revenue = model.metrics.revenue

# List all available
all_dimensions = model.dimensions.list()
all_metrics = model.metrics.list()
Features:
  • Lazy loading — fetched from API on first access, then cached
  • Fuzzy matching — typos suggest closest matches
  • Tab completion — works in Jupyter/IPython for discovery
  • Rich display — HTML rendering in notebooks

Filters

Use Python comparison operators on dimensions to create filters:
# Equality and inequality
model.dimensions.country == "USA"
model.dimensions.country != "USA"

# Numeric comparisons
model.dimensions.amount > 1000
model.dimensions.amount >= 1000
model.dimensions.amount < 500
model.dimensions.amount <= 500

# String operations
model.dimensions.name.starts_with("John")
model.dimensions.name.ends_with("son")
model.dimensions.name.includes("Smith")

# List membership
model.dimensions.country.in_(["USA", "UK", "Canada"])

# Null checks
model.dimensions.email.is_null()
model.dimensions.email.is_not_null()

# Range checks
model.dimensions.order_date.between("2024-01-01", "2024-12-31")
model.dimensions.amount.not_between(100, 500)

Supported operators by data type

Operator          Numeric  String  Boolean  Date
is null           Yes      Yes     Yes      Yes
is not null       Yes      Yes     Yes      Yes
equals / is       Yes      Yes     Yes      Yes
is not            Yes      Yes     -        Yes
is less than      Yes      -       -        -
is greater than   Yes      -       -        -
starts with       -        Yes     -        -
ends with         -        Yes     -        -
includes          -        Yes     -        -
in the last       -        -       -        Yes
in the next       -        -       -        Yes
in the current    -        -       -        Yes
is before         -        -       -        Yes
is after          -        -       -        Yes
is between        Yes      -       -        Yes
is not between    Yes      -       -        Yes
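
If you build filters programmatically, it can help to check an operator against a field's data type before sending the query. The lookup below transcribes the table above into a dictionary; SUPPORTED_OPERATORS and supports are hypothetical helper names for this sketch and are not part of the SDK.

```python
# Data types each operator accepts, transcribed from the table above.
SUPPORTED_OPERATORS = {
    "is null": {"numeric", "string", "boolean", "date"},
    "is not null": {"numeric", "string", "boolean", "date"},
    "equals / is": {"numeric", "string", "boolean", "date"},
    "is not": {"numeric", "string", "date"},
    "is less than": {"numeric"},
    "is greater than": {"numeric"},
    "starts with": {"string"},
    "ends with": {"string"},
    "includes": {"string"},
    "in the last": {"date"},
    "in the next": {"date"},
    "in the current": {"date"},
    "is before": {"date"},
    "is after": {"date"},
    "is between": {"numeric", "date"},
    "is not between": {"numeric", "date"},
}


def supports(operator, data_type):
    """Return True if the table lists the operator for the given data type."""
    return data_type in SUPPORTED_OPERATORS.get(operator, set())


print(supports("is between", "date"))
print(supports("starts with", "numeric"))
```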

Combining filters

Use & (AND) and | (OR) to combine filters:
# AND: Both conditions must be true
f = (model.dimensions.country == "USA") & (model.dimensions.amount > 1000)

# OR: Either condition must be true
f = (model.dimensions.status == "active") | (model.dimensions.status == "pending")

# Complex combinations
f = (
    (model.dimensions.country == "USA") &
    ((model.dimensions.amount > 1000) | (model.dimensions.priority == "high"))
)
Multiple .filter() calls on a query are combined with AND logic:
query = (
    model.query()
    .filter(model.dimensions.country == "USA")
    .filter(model.dimensions.amount > 1000)  # AND-ed with above
)
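
The comparison and combination syntax above relies on Python operator overloading. The following sketch shows the general technique with hypothetical classes (Field, Condition, Group); it illustrates the idea only and makes no claim about how the SDK represents filters internally.

```python
from dataclasses import dataclass


class Combinable:
    """Mixin that gives filter objects the & and | operators."""

    def __and__(self, other):
        return Group("and", (self, other))

    def __or__(self, other):
        return Group("or", (self, other))


@dataclass(frozen=True)
class Condition(Combinable):
    field: str
    operator: str
    value: object


@dataclass(frozen=True)
class Group(Combinable):
    kind: str
    parts: tuple


class Field:
    """Hypothetical dimension whose comparisons build Condition objects."""

    def __init__(self, name):
        self.name = name

    def __eq__(self, value):
        return Condition(self.name, "equals", value)

    def __gt__(self, value):
        return Condition(self.name, "greater than", value)


country = Field("country")
amount = Field("amount")

# Comparisons return Condition objects instead of booleans.
combined = (country == "USA") & (amount > 1000)
print(combined.kind, len(combined.parts))
```

In Python, & and | bind more tightly than == and >, which is why each comparison needs its own parentheses. Without them, country == "USA" & amount > 1000 is parsed as a chained comparison around "USA" & amount and typically fails with a TypeError.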

Sorting

Sort results using the .sort() method:
from lightdash import Sort

# Using metric/dimension methods (recommended)
query = model.query().sort(model.metrics.revenue.desc())
query = model.query().sort(model.dimensions.country.asc())

# Multiple sorts
query = model.query().sort(
    model.metrics.revenue.desc(),
    model.dimensions.country.asc()
)

# Control null positioning
query = model.query().sort(model.dimensions.name.asc(nulls_first=True))

# Using Sort class directly
query = model.query().sort(Sort("orders_revenue", descending=True))
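
To make the intended ordering concrete, here is a plain-Python sketch of what a multi-key sort and nulls-first ordering produce on sample data. It assumes sort keys apply in the order given, as in a SQL ORDER BY; the rows and names are made up for illustration and nothing here calls the SDK.

```python
rows = [
    {"country": "UK", "revenue": 300},
    {"country": "USA", "revenue": 500},
    {"country": "Canada", "revenue": 300},
]

# Revenue descending first, then country ascending to break ties.
ordered = sorted(rows, key=lambda r: (-r["revenue"], r["country"]))
print([r["country"] for r in ordered])

# Ascending with nulls first: None sorts ahead of every real value.
names = ["Zoe", None, "Adam"]
nulls_first = sorted(names, key=lambda n: (n is not None, n or ""))
print(nulls_first)
```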

Results

Query results implement a unified ResultSet interface.

Converting results

result = query.execute()

# To pandas DataFrame
df = result.to_df()  # or result.to_df(backend="pandas")

# To polars DataFrame
df = result.to_df(backend="polars")

# To list of dictionaries
records = result.to_records()

# To JSON string
json_str = result.to_json_str()
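
Since to_records() returns a list of dictionaries, the standard library is enough for simple exports. The sketch below writes such a list to CSV text; records_to_csv is a hypothetical helper, and it assumes every record has the same keys as the first one.

```python
import csv
import io


def records_to_csv(records):
    """Render a list of dictionaries as CSV text, using the first row's keys."""
    if not records:
        return ""
    buffer = io.StringIO()
    writer = csv.DictWriter(
        buffer, fieldnames=list(records[0].keys()), lineterminator="\n"
    )
    writer.writeheader()
    writer.writerows(records)
    return buffer.getvalue()


# Sample data standing in for result.to_records()
sample = [
    {"country": "USA", "revenue": 500},
    {"country": "UK", "revenue": 300},
]
print(records_to_csv(sample))
```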

Iterating over results

# Iterate over rows
for row in result:
    print(row)

# Get total count
total = len(result)
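
Because a result can be iterated row by row, itertools.islice offers a simple way to peek at the first few rows. The helper name head is an assumption for this sketch, and whether iteration fetches later pages lazily is not something this page states.

```python
from itertools import islice


def head(result, n=5):
    """Return at most the first n rows of any iterable result."""
    return list(islice(result, n))


# A plain list stands in for a query result here.
sample_rows = [{"id": i} for i in range(10)]
print(head(sample_rows, 3))
```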

Pagination

Large result sets are paginated automatically:
result = query.execute()

# Access specific page
page_2 = result.page(2)

# Iterate through all pages
for page in result.iter_pages():
    process(page)  # process() stands in for your own handling code

# Lazy DataFrame loading (polars only)
lazy_df = result.to_df_lazy()
Available properties on the result:
  • result.query_uuid — unique identifier for the query
  • result.total_results — total number of rows
  • result.total_pages — number of pages
  • result.fields — field metadata

SQL runner

Execute raw SQL queries directly against your data warehouse:
# Execute SQL
result = client.sql("SELECT * FROM orders WHERE status = 'active' LIMIT 100")
df = result.to_df()

# With custom limit
result = client.sql("SELECT * FROM orders", limit=1000)

# Introspection
tables = client.sql_runner.tables()
fields = client.sql_runner.fields("orders")
fields = client.sql_runner.fields("orders", schema="public")

Exception handling

The SDK provides specific exceptions for different error conditions:
from lightdash import LightdashError, QueryError, QueryTimeout, QueryCancelled

try:
    result = query.execute()
except QueryTimeout as e:
    print(f"Query timed out: {e.query_uuid}")
except QueryCancelled as e:
    print(f"Query was cancelled: {e.query_uuid}")
except QueryError as e:
    print(f"Query failed: {e.message}")
except LightdashError as e:
    print(f"Lightdash error: {e.message} (status: {e.status_code})")
Exception hierarchy:
  • LightdashError — base exception for all SDK errors
    • QueryError — query execution failed (HTTP 400)
    • QueryTimeout — query exceeded timeout (HTTP 408)
    • QueryCancelled — query was cancelled (HTTP 499)
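
Timeouts are often transient, so one option is to retry with exponential backoff. The helper below is a sketch under assumptions: execute_with_retry is a hypothetical name, it works with any object that has an execute() method, and the caller decides which exception types are worth retrying.

```python
import time


def execute_with_retry(query, retry_on, attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call query.execute(), retrying on the given exception types.

    The delay doubles after each failed attempt: base_delay, 2x, 4x, ...
    The last failure is re-raised so the caller still sees the error.
    """
    for attempt in range(1, attempts + 1):
        try:
            return query.execute()
        except retry_on:
            if attempt == attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))


# Usage, with the SDK installed:
#   from lightdash import QueryTimeout
#   result = execute_with_retry(query, retry_on=(QueryTimeout,))
```

Retrying only QueryTimeout, rather than the base LightdashError, avoids repeating queries that failed for reasons a retry cannot fix.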

Complete example

from lightdash import Client, QueryError, QueryTimeout

# Initialize client
client = Client(
    instance_url="https://app.lightdash.cloud",
    access_token="your-api-token",
    project_uuid="your-project-uuid",
)

# Get a model
model = client.get_model("orders")

# Build a query with filters
query = (
    model.query()
    .metrics(model.metrics.total_revenue, model.metrics.order_count)
    .dimensions(model.dimensions.country, model.dimensions.order_date)
    .filter(
        (model.dimensions.status == "completed") &
        (model.dimensions.order_date >= "2024-01-01")
    )
    .sort(model.metrics.total_revenue.desc())
    .limit(50)
)

try:
    result = query.execute()
    df = result.to_df()
    print(f"Fetched {len(result)} rows")
    print(df.head())
except QueryTimeout:
    print("Query took too long - try adding more filters")
except QueryError as e:
    print(f"Query failed: {e.message}")

For the full SDK source and more examples, see the Python SDK on GitHub.