Skip to main content
Limitry handles usage metering, limit enforcement, and billing integration. Here’s how different businesses use it.

AI/LLM Platform

Track token usage by model, enforce limits, alert before cutoff, and sync to Stripe. The setup:
  • Meter tokens per model (GPT-4, Claude, etc.)
  • Monthly limits that reset on billing cycle
  • Alerts at 80% and 100%
  • Cost tracking for invoicing
from limitry import Limitry
from openai import OpenAI

limitry = Limitry()
openai = OpenAI()

# 1. Create customer with Stripe link
limitry.customers.create(
    external_id="cust_123",
    name="Acme Corp",
    metadata={
        "stripe_customer_id": "cus_xxx",
        "plan": "pro"
    },
    billing_cycle_start="2024-01-15",
    timezone="America/New_York"
)

# 2. Create meters for each model
limitry.meters.create(
    name="GPT-4 Tokens",
    aggregation="sum",
    field="values.tokens",
    event_filter={
        "event_type": "llm.completion",
        "dimensions": {"model": "gpt-4"}
    }
)

limitry.meters.create(
    name="Claude Tokens",
    aggregation="sum",
    field="values.tokens",
    event_filter={
        "event_type": "llm.completion",
        "dimensions": {"model": "claude-3"}
    }
)

# 3. Create limits with alerts
limitry.limits.create(
    customer_id="cust_123",
    meter_id="mtr_gpt4",
    limit_value=1_000_000,  # 1M tokens/month
    period="month",
    alert_thresholds=[80, 100]
)

# 4. In your API: check → call → record
async def completion(customer_id: str, prompt: str, model: str = "gpt-4"):
    # Check limits first
    check = limitry.limits.check(customer_id=customer_id)
    if not check.allowed:
        raise Exception(f"Limit exceeded: {check.limits[0].name}")

    # Make the LLM call
    response = openai.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}]
    )

    # Record usage with cost
    tokens = response.usage.total_tokens
    cost_cents = calculate_cost(model, tokens)

    limitry.events.record(
        customer_id=customer_id,
        event_type="llm.completion",
        values={"tokens": tokens, "cost_cents": cost_cents},
        dimensions={"model": model}
    )

    return response.choices[0].message.content
Relevant docs: Events, Meters, Limits, Webhooks

API-as-a-Service

Request quotas by tier with overage tracking. The setup:
  • Count API requests per endpoint
  • Different limits per plan (Free: 1K/day, Pro: 100K/day)
  • Project-wide defaults with customer overrides
  • Export usage for overage billing
from limitry import Limitry

limitry = Limitry()

# 1. Create meter for API requests
limitry.meters.create(
    name="API Requests",
    aggregation="count",
    event_filter={"event_type": "api.request"}
)

# 2. Project-wide default (Free tier)
limitry.limits.create(
    meter_id="mtr_api_requests",
    limit_value=1000,
    period="day",
    alert_thresholds=[80, 100]
)

# 3. Pro tier override for specific customer
limitry.limits.create(
    customer_id="cust_123",
    meter_id="mtr_api_requests",
    limit_value=100_000,
    period="day"
)

# 4. Enterprise with no daily limit (just tracking)
limitry.limits.create(
    customer_id="cust_enterprise",
    meter_id="mtr_api_requests",
    limit_value=999_999_999,  # Effectively unlimited
    period="day"
)

# 5. Middleware: check and record
def api_middleware(request, customer_id: str):
    # Check before processing
    check = limitry.limits.check(customer_id=customer_id)
    if not check.allowed:
        return {"error": "Rate limit exceeded"}, 429

    # Process request...
    response = handle_request(request)

    # Record after success
    limitry.events.record(
        customer_id=customer_id,
        event_type="api.request",
        dimensions={
            "endpoint": request.path,
            "method": request.method
        }
    )

    return response
Relevant docs: Check-Record Pattern, Dimensions

Multi-Tenant SaaS

Per-workspace limits with team-level tracking. The setup:
  • Customer = organization
  • Dimensions for workspace and team
  • Limits scoped by dimension filters
  • Usage breakdown by team
from limitry import Limitry

limitry = Limitry()

# 1. Meter all feature usage
limitry.meters.create(
    name="Feature Usage",
    aggregation="count",
    event_filter={"event_type": "feature.used"}
)

# 2. Organization-wide limit
limitry.limits.create(
    customer_id="org_acme",
    meter_id="mtr_feature_usage",
    limit_value=10_000,
    period="month"
)

# 3. Per-workspace limit (using dimension filter)
limitry.limits.create(
    customer_id="org_acme",
    meter_id="mtr_feature_usage",
    limit_value=2_000,
    period="month",
    dimension_filters={"workspace_id": "ws_engineering"}
)

# 4. Record with full dimensional context
def track_feature(org_id: str, workspace_id: str, team_id: str, feature: str):
    limitry.events.record(
        customer_id=org_id,
        event_type="feature.used",
        dimensions={
            "workspace_id": workspace_id,
            "team_id": team_id,
            "feature": feature
        }
    )

# 5. Query usage breakdown by team
breakdown = limitry.events.get_breakdown(
    group_by="team_id",
    start_date="2024-01-01T00:00:00Z",
    end_date="2024-01-31T23:59:59Z"
)

for item in breakdown.data:
    print(f"Team {item.dimension}: {item.total_events} events")
Relevant docs: Dimensions, Customers

Prepaid Credits

Pay-as-you-go with credit balances. The setup:
  • Credit balance (wallet) per customer
  • Check sufficiency before operations
  • Debit after successful operations
  • Top up on Stripe payment success
from limitry import Limitry

limitry = Limitry()

# 1. Create balance when customer signs up
limitry.balances.create(
    customer_id="cust_123",
    name="API Credits",
    unit="cents",
    initial_balance=5000  # $50.00
)

# 2. Check and debit for each operation
async def process_request(customer_id: str, operation_cost: int):
    # Check if they have enough
    check = limitry.balances.check_sufficiency(
        customer_id=customer_id,
        name="API Credits",
        amount=operation_cost
    )

    if not check.sufficient:
        raise Exception(f"Insufficient credits. Balance: {check.current_balance}")

    # Do the work...
    result = await do_expensive_operation()

    # Debit after success
    limitry.balances.debit(
        customer_id=customer_id,
        name="API Credits",
        amount=operation_cost,
        description=f"API call: {result.operation_id}"
    )

    return result

# 3. Top up on Stripe webhook
def handle_stripe_payment(event):
    if event.type == "payment_intent.succeeded":
        customer_id = event.data.object.metadata.get("limitry_customer_id")
        amount_cents = event.data.object.amount

        limitry.balances.credit(
            customer_id=customer_id,
            name="API Credits",
            amount=amount_cents,
            description=f"Stripe payment: {event.data.object.id}"
        )
Relevant docs: Balances

Developer Tools (Build Minutes)

Track compute resources with monthly resets. The setup:
  • Meter build duration in seconds
  • Monthly limits by plan
  • Alert before hitting limit
  • max aggregation for concurrent builds
from limitry import Limitry
import time

limitry = Limitry()

# 1. Meter for build minutes (sum of duration)
limitry.meters.create(
    name="Build Minutes",
    aggregation="sum",
    field="values.duration_seconds",
    event_filter={"event_type": "build.completed"}
)

# 2. Meter for concurrent builds (max at any point)
limitry.meters.create(
    name="Concurrent Builds",
    aggregation="max",
    field="values.concurrent",
    event_filter={"event_type": "build.started"}
)

# 3. Monthly build minutes limit
limitry.limits.create(
    customer_id="cust_123",
    meter_id="mtr_build_minutes",
    limit_value=6000,  # 100 hours
    period="month",
    alert_thresholds=[50, 80, 100]
)

# 4. Track builds
def start_build(customer_id: str, build_id: str, concurrent_count: int):
    # Check limit before starting
    check = limitry.limits.check(customer_id=customer_id)
    if not check.allowed:
        raise Exception("Build minutes exhausted")

    # Record concurrent count
    limitry.events.record(
        customer_id=customer_id,
        event_type="build.started",
        values={"concurrent": concurrent_count},
        dimensions={"build_id": build_id}
    )

def complete_build(customer_id: str, build_id: str, duration_seconds: int):
    limitry.events.record(
        customer_id=customer_id,
        event_type="build.completed",
        values={"duration_seconds": duration_seconds},
        dimensions={"build_id": build_id}
    )
Relevant docs: Meters (aggregation types)

Stripe Integration

Sync usage to Stripe for metered billing. The setup:
  • Limitry tracks usage in real-time
  • Webhook at threshold triggers Stripe meter update
  • End-of-period job reports final usage
from limitry import Limitry
import stripe

limitry = Limitry()

# 1. Customer with Stripe IDs in metadata
limitry.customers.create(
    external_id="cust_123",
    metadata={
        "stripe_customer_id": "cus_xxx",
        "stripe_subscription_id": "sub_xxx",
        "stripe_subscription_item_id": "si_xxx"  # For metered billing
    }
)

# 2. Handle Limitry webhook (threshold reached)
def handle_limitry_webhook(payload):
    if payload["type"] == "limit.threshold_reached":
        customer_id = payload["data"]["customer_id"]
        threshold = payload["data"]["threshold"]
        used = payload["data"]["used"]

        # Get Stripe IDs from customer metadata
        customer = limitry.customers.retrieve(customer_id)
        stripe_si_id = customer.metadata["stripe_subscription_item_id"]

        # Report to Stripe
        stripe.billing.MeterEvent.create(
            event_name="api_requests",
            payload={
                "value": used,
                "stripe_customer_id": customer.metadata["stripe_customer_id"]
            }
        )

# 3. End-of-period sync (run via cron at billing cycle end)
def sync_usage_to_stripe(customer_id: str):
    customer = limitry.customers.retrieve(customer_id)

    # Get usage from Limitry
    limits = limitry.limits.check(customer_id=customer_id)

    for limit in limits.limits:
        stripe.billing.MeterEvent.create(
            event_name=limit.meter_id,
            payload={
                "value": limit.used,
                "stripe_customer_id": customer.metadata["stripe_customer_id"]
            }
        )
Relevant docs: Webhooks, Customers

Summary

Use CaseKey Features
AI/LLM PlatformToken meters, model dimensions, cost tracking, alerts
API-as-a-ServiceRequest counts, tiered limits, project defaults
Multi-Tenant SaaSDimension filters, workspace/team scoping, breakdowns
Prepaid CreditsBalances, check/debit pattern, Stripe top-up
Developer ToolsDuration tracking, sum/max aggregations
Stripe IntegrationWebhooks, metadata, metered billing sync

What Limitry Handles

You DoLimitry Does
Record events after operationsAggregate into meters
Check limits before operationsEnforce and return status
Store Stripe IDs in metadataFire webhooks at thresholds
Handle webhook in your backendTrack usage in real-time
You focus on your business logic. Limitry handles metering, limits, and billing integration.