# Source code for limitry.client.resources.events
"""Auto-generated operation classes.
This file is auto-generated by the operators generator.
Do not edit this file manually.
"""
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

from limitry.client.types import (
    Event,
    EventsBatchPostRequest,
    EventsBatchPostResponse,
    EventsPostRequest,
    EventsPostResponse
)
from limitry.client.utils.pagination import PaginatedResponse

if TYPE_CHECKING:
    from limitry.client.client import Client
[docs]
class Events:
    """Operations for events.

    Each operation is a coroutine that delegates the HTTP call to the
    client instance supplied at construction time.
    """

    def __init__(self, client: "Client") -> None:
        """Initialize Events operations.

        Args:
            client: The Limitry client instance used to send requests.
        """
        self._client = client

    async def list(
        self,
        customer_id: Optional[str] = None,
        event_type: Optional[str] = None,
        model: Optional[str] = None,
        limit: Optional[Union[int, str]] = None,
        cursor: Optional[str] = None,
    ) -> PaginatedResponse[Event]:
        """List usage events.

        Retrieve a paginated list of usage events with optional filtering.
        Use cursor-based pagination by passing the `nextCursor` from the
        previous response. You can filter events by customer, event type,
        or model.

        **Pagination:**

        - Use the `cursor` parameter from the previous response's
          `nextCursor` field
        - The `hasMore` field indicates if there are more results
        - Maximum limit is 100 events per request.

        Args:
            customer_id: Filter events by customer ID
            event_type: Filter events by type (e.g., "model_call",
                "embedding")
            model: Filter events by model name (e.g., "gpt-4", "claude-3")
            limit: Maximum number of events to return (1-100). May be given
                as an int or as a numeric string; it is always sent as a
                string. When omitted, the parameter is not sent and the
                API default (50) applies.
            cursor: Pagination cursor from the previous response

        Returns:
            PaginatedResponse[Event]: List of events

        Example::

            items = await client.events.list(limit=25)
            for item in items.data:
                print(item)

        Raises:
            APIError: Invalid query parameters
            AuthenticationError: Unauthorized - Invalid or missing API key
            NetworkError: If a network error occurs
            KeyError: If the decoded response has no ``data`` entry
        """
        candidates = {
            "customerId": customer_id,
            "eventType": event_type,
            "model": model,
            # The limit is documented as a number, so callers naturally pass
            # an int. Normalise to str so the value on the wire is the same
            # whichever form was supplied.
            "limit": None if limit is None else str(limit),
            "cursor": cursor,
        }
        # Only send the filters the caller actually set.
        params = {
            key: value for key, value in candidates.items() if value is not None
        }
        response = await self._client.request("GET", "/events", params=params)
        return PaginatedResponse(
            data=[Event(**item) for item in response["data"]],
            next_cursor=response.get("nextCursor"),
            has_more=response.get("hasMore", False),
        )

    async def ingest(
        self,
        request: EventsPostRequest
    ) -> EventsPostResponse:
        """Ingest a usage event.

        Record a single usage event for metering and analytics.
        This endpoint only records the event without checking quotas or
        rate limits. For quota enforcement and rate limiting, use
        `POST /v1/track` instead.

        **When to use this endpoint:**

        - Recording events for analytics only
        - Post-event tracking after successful operations
        - Bulk ingestion without enforcement

        **When to use /v1/track instead:**

        - Need quota enforcement before processing
        - Need rate limit checking
        - Want all-in-one tracking with enforcement.

        Args:
            request: EventsPostRequest object

        Returns:
            EventsPostResponse: Event ingested successfully

        Example::

            request = EventsPostRequest(
                customerId="customerid_123",
                eventType="example",
                model="example",
                provider="provider_123"
                # ... other properties
            )
            result = await client.events.ingest(
                request
            )
            print(result)

        Raises:
            APIError: Invalid request body
            AuthenticationError: Unauthorized - Invalid or missing API key
            NetworkError: If a network error occurs
        """
        # Fields left as None are dropped from the JSON body.
        payload = request.model_dump(mode="json", exclude_none=True)
        response = await self._client.request("POST", "/events", json=payload)
        return EventsPostResponse(**response)

    async def ingest_batch(
        self,
        request: EventsBatchPostRequest
    ) -> EventsBatchPostResponse:
        """Ingest multiple usage events.

        Record multiple usage events in a single request for efficient
        bulk ingestion.

        **Limits:**

        - Maximum 1000 events per batch
        - Events are processed asynchronously
        - All events must be valid or the entire batch will be rejected

        **Use cases:**

        - Bulk import of historical data
        - Batch processing of events
        - High-throughput event ingestion

        For quota enforcement and rate limiting, use `POST /v1/track`
        instead.

        Args:
            request: EventsBatchPostRequest object

        Returns:
            EventsBatchPostResponse: Events ingested successfully

        Example::

            request = EventsBatchPostRequest(
                events=[]
            )
            result = await client.events.ingest_batch(
                request
            )
            print(result)

        Raises:
            APIError: Invalid request body
            AuthenticationError: Unauthorized - Invalid or missing API key
            NetworkError: If a network error occurs
        """
        # Fields left as None are dropped from the JSON body.
        payload = request.model_dump(mode="json", exclude_none=True)
        response = await self._client.request(
            "POST", "/events/batch", json=payload
        )
        return EventsBatchPostResponse(**response)