Python SDK
The official NoLag SDK for Python. Full async/await support with type hints for Python 3.10+.
Installation
# pip
pip install nolag
# poetry
poetry add nolag
# uv
uv add nolag
Quick Start
import asyncio
from nolag import NoLag
async def main():
# Create client with your actor token
client = NoLag("your-actor-token")
# Connect to NoLag
await client.connect()
# Subscribe to a topic
await client.subscribe("my-topic")
# Listen for messages on that topic
def on_message(data, meta):
print(f"Received: {data}")
client.on("my-topic", on_message)
# Publish a message
await client.emit("my-topic", {"hello": "world"})
# Keep running
await asyncio.sleep(60)
# Disconnect when done (synchronous)
client.disconnect()
asyncio.run(main())
Configuration
from nolag import NoLag, NoLagOptions, QoS
options = NoLagOptions(
url="wss://broker.nolag.app/ws", # Custom broker URL
reconnect=True, # Auto-reconnect on disconnect
reconnect_interval=5.0, # Seconds between reconnect attempts
max_reconnect_attempts=10, # Max reconnect attempts (0 = infinite)
heartbeat_interval=30.0, # Heartbeat interval (0 to disable)
disconnect_on_hidden=False, # Disconnect when tab is hidden
qos=QoS.AT_LEAST_ONCE, # Default QoS level
load_balance=False, # Enable load balancing
load_balance_group=None, # Load balance group name
debug=True, # Enable debug logging
)
client = NoLag("your-actor-token", options)
Fluent API
The fluent API lets you scope operations to a specific app and room, so you don't need to repeat the full topic path every time.
# Use the fluent API to scope topics to an app and room
room = client.set_app("chat").set_room("general")
# Subscribe, listen, and emit within the room
await room.subscribe("messages")
room.on("messages", lambda data, meta: print(data))
await room.emit("messages", {"text": "Hello from general!"})
# This is equivalent to using full topic paths:
# await client.subscribe("chat/general/messages")
# client.on("chat/general/messages", handler)
# await client.emit("chat/general/messages", data)
# Room also supports filter management
await room.set_filters("messages", ["lang:en", "lang:fr"])
await room.add_filters("messages", ["lang:de"])
await room.remove_filters("messages", ["lang:fr"])
# Unsubscribe within the room
await room.unsubscribe("messages")
Subscribing to Topics
subscribe() registers interest in a topic. Use on() separately to attach a message handler.
from nolag import SubscribeOptions, QoS
# Basic subscription + handler
await client.subscribe("chat/messages")
client.on("chat/messages", lambda data, meta: print(data))
# With options (filters, load balancing)
options = SubscribeOptions(
qos=QoS.EXACTLY_ONCE,
load_balance=True,
load_balance_group="workers",
filters=["priority:high", "region:us"]
)
await client.subscribe("tasks", options)
client.on("tasks", lambda data, meta: print(f"Task: {data}"))
# Unsubscribe
await client.unsubscribe("chat/messages")
Publishing Messages
from nolag import EmitOptions, QoS
# Publish any data (dict, list, string, bytes, etc.)
await client.emit("chat/messages", {"text": "Hello!"})
# With options
options = EmitOptions(
qos=QoS.EXACTLY_ONCE,
retain=True, # Retain last message for new subscribers
echo=False, # Don't receive your own message back
filter="priority:high" # Publish with a filter value
)
await client.emit("status", {"online": True}, options)
Filters
Filters let subscribers receive only messages matching specific criteria. Publishers tag messages with a filter value via EmitOptions.filter, and subscribers set which filters they care about.
from nolag import SubscribeOptions, EmitOptions
# Subscribe with initial filters
await client.subscribe("events", SubscribeOptions(
filters=["region:us", "priority:high"]
))
client.on("events", lambda data, meta: print(data, meta.filter))
# Replace all filters for a topic
await client.set_filters("events", ["region:eu"])
# Add filters to the existing set
await client.add_filters("events", ["priority:critical"])
# Remove specific filters
await client.remove_filters("events", ["region:eu"])
# Clear all filters (switches back to wildcard / receive all)
await client.set_filters("events", [])
# Publish with a filter value
await client.emit("events", {"msg": "US high-priority"},
EmitOptions(filter="region:us"))
Connection Events
from nolag import ConnectionStatus
# Listen for connection events
client.on("connect", lambda: print("Connected!"))
client.on("disconnect", lambda reason: print(f"Disconnected: {reason}"))
client.on("reconnect", lambda: print("Reconnected!"))
client.on("error", lambda err: print(f"Error: {err}"))
# Check connection status
if client.status == ConnectionStatus.CONNECTED:
print("We're connected!")
Wildcard Handler (on_any / off_any)
Use on_any() to receive every message regardless of topic. The handler receives (topic, data, meta).
# Listen for ALL messages regardless of topic
def log_all(topic, data, meta):
print(f"[{topic}] {data} (from: {meta.sender})")
client.on_any(log_all)
# Remove a specific on_any handler
client.off_any(log_all)
# Remove ALL on_any handlers
client.off_any()
Presence
# Set your presence data
await client.set_presence({"status": "online", "typing": False})
# Get presence for a specific actor by their actor_token_id
actor = client.get_presence("some-actor-token-id")
if actor:
print(f"{actor.actor_token_id}: {actor.presence}")
# Get all presence data
for actor in client.get_all_presence():
print(f"{actor.actor_token_id} ({actor.actor_type}): {actor.presence}")
# Clear your presence
await client.clear_presence()
# Listen for presence events
client.on("presence:join", lambda actor: print(f"Joined: {actor.actor_token_id}"))
client.on("presence:leave", lambda actor: print(f"Left: {actor.actor_token_id}"))
client.on("presence:update", lambda actor: print(f"Updated: {actor.presence}"))
Error Handling
import asyncio
from nolag import NoLag
async def main():
client = NoLag("your-actor-token")
try:
await client.connect()
except Exception as e:
print(f"Connection failed: {e}")
return
# Handle errors during operation
client.on("error", lambda err: print(f"Error: {err}"))
try:
await client.emit("topic", {"data": "value"})
except Exception as e:
print(f"Emit failed: {e}")
# Disconnect is synchronous — no await needed
client.disconnect()
asyncio.run(main())
QoS Levels
| Level | Name | Description |
|---|---|---|
| 0 | AT_MOST_ONCE | Fire and forget, no acknowledgment |
| 1 | AT_LEAST_ONCE | Guaranteed delivery, may have duplicates |
| 2 | EXACTLY_ONCE | Guaranteed exactly one delivery |
REST API Client
The SDK also includes a REST API client for managing apps, rooms, and actors:
from nolag import NoLagApi, AppCreate, RoomCreate, ActorCreate
async def main():
# Create API client with project-scoped API key
async with NoLagApi("nlg_live_xxx.secret") as api:
# List all apps in your project
apps = await api.apps.list()
print(f"Found {len(apps.data)} apps")
# Create a new app
app = await api.apps.create(AppCreate(
name="my-chat-app",
description="A real-time chat application"
))
# Create a room in the app
room = await api.rooms.create(app.app_id, RoomCreate(
name="general",
slug="general"
))
# Create an actor (save the access token!)
actor = await api.actors.create(ActorCreate(
name="web-client",
actor_type="device"
))
print(f"Actor token: {actor.access_token}")
Load Balancing
Distribute messages across multiple subscribers:
from nolag import NoLag, NoLagOptions, SubscribeOptions
# Option 1: Enable load balancing globally via NoLagOptions
client = NoLag("your-actor-token", NoLagOptions(
load_balance=True,
load_balance_group="task-workers"
))
# Option 2: Enable per-subscription
await client.subscribe(
"tasks",
SubscribeOptions(
load_balance=True,
load_balance_group="task-workers"
)
)
client.on("tasks", lambda data, meta: print(f"Processing: {data}"))
Type Definitions
from nolag import (
# WebSocket Client
NoLag, # Main client class
NoLagOptions, # Connection options
SubscribeOptions, # Subscription options
EmitOptions, # Publish options
ConnectionStatus, # Connection status enum
ActorType, # Actor type enum
QoS, # QoS level enum
MessageMeta, # Message metadata
ActorPresence, # Presence info
# REST API Client
NoLagApi, # REST API client
NoLagApiError, # API error class
App, AppCreate, AppUpdate,
Room, RoomCreate, RoomUpdate,
Actor, ActorWithToken, ActorCreate, ActorUpdate,
)
Requirements
- Python 3.10+
- websockets >= 12.0
- msgpack >= 1.0.0
- aiohttp >= 3.9.0