Source code for limitry.client.resources.events

"""Auto-generated operation classes.

This file is auto-generated by the operators generator.
Do not edit this file manually.
"""

from typing import TYPE_CHECKING, Any, Dict, List, Optional

from limitry.client.types import (
    Event,
    EventsBatchPostRequest,
    EventsBatchPostResponse,
    EventsPostRequest,
    EventsPostResponse
)
from limitry.client.utils.pagination import PaginatedResponse

if TYPE_CHECKING:
    from limitry.client.client import Client


[docs] class Events: """Operations for events."""
[docs] def __init__(self, client: "Client") -> None: """Initialize Events operations. Args: client: The Limitry client instance """ self._client = client
[docs] async def list( self, customer_id: Optional[str] = None, event_type: Optional[str] = None, model: Optional[str] = None, limit: Optional[str] = None, cursor: Optional[str] = None ) -> PaginatedResponse[Event]: """List usage events. Retrieve a paginated list of usage events with optional filtering. Use cursor-based pagination by passing the `nextCursor` from the previous response. You can filter events by customer, event type, or model. **Pagination:** - Use the `cursor` parameter from the previous response's `nextCursor` field - The `hasMore` field indicates if there are more results - Maximum limit is 100 events per request. Args: customer_id: Filter events by customer ID event_type: Filter events by type (e.g., "model_call", "embedding") model: Filter events by model name (e.g., "gpt-4", "claude-3") limit: Maximum number of events to return (1-100, default: 50) (default: "50") cursor: Pagination cursor from the previous response Returns: PaginatedResponse[Event]: List of events Example:: items = await client.events.list() for item in items.data: print(item) Raises: APIError: Invalid query parameters AuthenticationError: Unauthorized - Invalid or missing API key NetworkError: If a network error occurs""" params = { "customerId": customer_id, "eventType": event_type, "model": model, "limit": limit, "cursor": cursor, } params = {k: v for k, v in params.items() if v is not None} response = await self._client.request("GET", "/events", params=params) return PaginatedResponse( data=[Event(**item) for item in response['data']], next_cursor=response.get('nextCursor'), has_more=response.get('hasMore', False) )
[docs] async def ingest( self, request: EventsPostRequest ) -> EventsPostResponse: """Ingest a usage event. Record a single usage event for metering and analytics. This endpoint only records the event without checking quotas or rate limits. For quota enforcement and rate limiting, use `POST /v1/track` instead. **When to use this endpoint:** - Recording events for analytics only - Post-event tracking after successful operations - Bulk ingestion without enforcement **When to use /v1/track instead:** - Need quota enforcement before processing - Need rate limit checking - Want all-in-one tracking with enforcement. Args: request: EventsPostRequest object Returns: EventsPostResponse: Event ingested successfully Example:: request = EventsPostRequest( customerId="customerid_123", eventType="example", model="example", provider="provider_123" # ... other properties ) result = await client.events.ingest( request ) print(result) Raises: APIError: Invalid request body AuthenticationError: Unauthorized - Invalid or missing API key NetworkError: If a network error occurs""" response = await self._client.request("POST", "/events", json=request.model_dump(mode='json', exclude_none=True)) return EventsPostResponse(**response)
[docs] async def ingest_batch( self, request: EventsBatchPostRequest ) -> EventsBatchPostResponse: """Ingest multiple usage events. Record multiple usage events in a single request for efficient bulk ingestion. **Limits:** - Maximum 1000 events per batch - Events are processed asynchronously - All events must be valid or the entire batch will be rejected **Use cases:** - Bulk import of historical data - Batch processing of events - High-throughput event ingestion For quota enforcement and rate limiting, use `POST /v1/track` instead. Args: request: EventsBatchPostRequest object Returns: EventsBatchPostResponse: Events ingested successfully Example:: request = EventsBatchPostRequest( events=[] ) result = await client.events.ingest_batch( request ) print(result) Raises: APIError: Invalid request body AuthenticationError: Unauthorized - Invalid or missing API key NetworkError: If a network error occurs""" response = await self._client.request("POST", "/events/batch", json=request.model_dump(mode='json', exclude_none=True)) return EventsBatchPostResponse(**response)