Python SDK

The official NoLag SDK for Python. It provides full async/await support and type hints, and requires Python 3.10+.

Installation

# pip
pip install nolag

# poetry
poetry add nolag

# uv
uv add nolag

Quick Start

import asyncio
from nolag import NoLag

async def main():
    """Connect, subscribe to a topic, publish one message, then disconnect."""
    # Create client with your actor token
    client = NoLag("your-actor-token")

    # Connect to NoLag
    await client.connect()

    try:
        # Subscribe to a topic
        await client.subscribe("my-topic")

        # Listen for messages on that topic
        def on_message(data, meta):
            print(f"Received: {data}")

        client.on("my-topic", on_message)

        # Publish a message
        await client.emit("my-topic", {"hello": "world"})

        # Keep running
        await asyncio.sleep(60)
    finally:
        # Disconnect when done (synchronous). Doing it in a finally block
        # releases the connection even if a call above raises or the task
        # is cancelled.
        client.disconnect()

asyncio.run(main())

Configuration

from nolag import NoLag, NoLagOptions, QoS

# Connection options for the client; each keyword is explained inline.
options = NoLagOptions(
    url="wss://broker.nolag.app/ws",  # Custom broker URL
    reconnect=True,                     # Auto-reconnect on disconnect
    reconnect_interval=5.0,             # Seconds between reconnect attempts
    max_reconnect_attempts=10,          # Max reconnect attempts (0 = infinite)
    heartbeat_interval=30.0,            # Heartbeat interval (0 to disable)
    # NOTE(review): "tab is hidden" is a browser concept — confirm this option
    # has any effect in the Python SDK.
    disconnect_on_hidden=False,         # Disconnect when tab is hidden
    qos=QoS.AT_LEAST_ONCE,              # Default QoS level
    load_balance=False,                 # Enable load balancing
    load_balance_group=None,            # Load balance group name
    debug=True,                         # Enable debug logging
)

client = NoLag("your-actor-token", options)

Fluent API

The fluent API lets you scope operations to a specific app and room, so you don't need to repeat the full topic path every time.

# Use the fluent API to scope topics to an app and room
room = client.set_app("chat").set_room("general")

# Subscribe, listen, and emit within the room
await room.subscribe("messages")
room.on("messages", lambda data, meta: print(data))
await room.emit("messages", {"text": "Hello from general!"})

# This is equivalent to using full topic paths:
# await client.subscribe("chat/general/messages")
# client.on("chat/general/messages", handler)
# await client.emit("chat/general/messages", data)

# Room also supports filter management
await room.set_filters("messages", ["lang:en", "lang:fr"])
await room.add_filters("messages", ["lang:de"])
await room.remove_filters("messages", ["lang:fr"])

# Unsubscribe within the room
await room.unsubscribe("messages")

Subscribing to Topics

subscribe() registers interest in a topic. Use on() separately to attach a message handler.

from nolag import SubscribeOptions, QoS

# Basic subscription + handler
await client.subscribe("chat/messages")
client.on("chat/messages", lambda data, meta: print(data))

# With options (filters, load balancing)
options = SubscribeOptions(
    qos=QoS.EXACTLY_ONCE,
    load_balance=True,
    load_balance_group="workers",
    filters=["priority:high", "region:us"]
)
await client.subscribe("tasks", options)
client.on("tasks", lambda data, meta: print(f"Task: {data}"))

# Unsubscribe
await client.unsubscribe("chat/messages")

Publishing Messages

from nolag import EmitOptions, QoS

# Publish any data (dict, list, string, bytes, etc.)
await client.emit("chat/messages", {"text": "Hello!"})

# With options
options = EmitOptions(
    qos=QoS.EXACTLY_ONCE,
    retain=True,          # Retain last message for new subscribers
    echo=False,           # Don't receive your own message back
    filter="priority:high" # Publish with a filter value
)
await client.emit("status", {"online": True}, options)

Filters

Filters let subscribers receive only messages matching specific criteria. Publishers tag messages with a filter value via EmitOptions.filter, and subscribers set which filters they care about.

from nolag import SubscribeOptions, EmitOptions

# Subscribe with initial filters
await client.subscribe("events", SubscribeOptions(
    filters=["region:us", "priority:high"]
))
client.on("events", lambda data, meta: print(data, meta.filter))

# Replace all filters for a topic
await client.set_filters("events", ["region:eu"])

# Add filters to the existing set
await client.add_filters("events", ["priority:critical"])

# Remove specific filters
await client.remove_filters("events", ["region:eu"])

# Clear all filters (switches back to wildcard / receive all)
await client.set_filters("events", [])

# Publish with a filter value
await client.emit("events", {"msg": "US high-priority"},
    EmitOptions(filter="region:us"))

Connection Events

from nolag import ConnectionStatus

# Listen for connection events
client.on("connect", lambda: print("Connected!"))
client.on("disconnect", lambda reason: print(f"Disconnected: {reason}"))
client.on("reconnect", lambda: print("Reconnected!"))
client.on("error", lambda err: print(f"Error: {err}"))

# Check connection status
if client.status == ConnectionStatus.CONNECTED:
    print("We're connected!")

Wildcard Handler (on_any / off_any)

Use on_any() to receive every message regardless of topic. The handler receives (topic, data, meta).

# Listen for ALL messages regardless of topic
def log_all(topic, data, meta):
    """Print one line per message showing its topic, payload, and sender."""
    sender = meta.sender
    line = f"[{topic}] {data} (from: {sender})"
    print(line)

client.on_any(log_all)

# Remove a specific on_any handler
client.off_any(log_all)

# Remove ALL on_any handlers
client.off_any()

Presence

# Set your presence data
await client.set_presence({"status": "online", "typing": False})

# Get presence for a specific actor by their actor_token_id
actor = client.get_presence("some-actor-token-id")
if actor:
    print(f"{actor.actor_token_id}: {actor.presence}")

# Get all presence data
for actor in client.get_all_presence():
    print(f"{actor.actor_token_id} ({actor.actor_type}): {actor.presence}")

# Clear your presence
await client.clear_presence()

# Listen for presence events
client.on("presence:join", lambda actor: print(f"Joined: {actor.actor_token_id}"))
client.on("presence:leave", lambda actor: print(f"Left: {actor.actor_token_id}"))
client.on("presence:update", lambda actor: print(f"Updated: {actor.presence}"))

Error Handling

import asyncio
from nolag import NoLag

async def main():
    """Connect, publish one message, and always disconnect, reporting failures.

    A failed connection is reported and ends the example early. A failed
    publish is reported, after which the client still disconnects.
    """
    client = NoLag("your-actor-token")

    try:
        await client.connect()
    except Exception as e:
        print(f"Connection failed: {e}")
        return

    # Handle errors during operation
    client.on("error", lambda err: print(f"Error: {err}"))

    try:
        await client.emit("topic", {"data": "value"})
    except Exception as e:
        print(f"Emit failed: {e}")
    finally:
        # Disconnect is synchronous — no await needed. The finally block also
        # covers task cancellation: asyncio.CancelledError is not an Exception
        # subclass, so the except clause above would not catch it.
        client.disconnect()

asyncio.run(main())

QoS Levels

| Level | Name | Description |
|-------|------|-------------|
| 0 | AT_MOST_ONCE | Fire and forget, no acknowledgment |
| 1 | AT_LEAST_ONCE | Guaranteed delivery, may have duplicates |
| 2 | EXACTLY_ONCE | Guaranteed exactly one delivery |

REST API Client

The SDK also includes a REST API client for managing apps, rooms, and actors:

import asyncio
from nolag import NoLagApi, AppCreate, RoomCreate, ActorCreate

async def main():
    """Create an app, a room in it, and an actor through the REST API."""
    # Create API client with project-scoped API key
    async with NoLagApi("nlg_live_xxx.secret") as api:
        # List all apps in your project
        apps = await api.apps.list()
        print(f"Found {len(apps.data)} apps")

        # Create a new app
        app = await api.apps.create(AppCreate(
            name="my-chat-app",
            description="A real-time chat application"
        ))

        # Create a room in the app
        room = await api.rooms.create(app.app_id, RoomCreate(
            name="general",
            slug="general"
        ))

        # Create an actor (save the access token!)
        actor = await api.actors.create(ActorCreate(
            name="web-client",
            actor_type="device"
        ))
        print(f"Actor token: {actor.access_token}")

# Run the example, as the other complete examples on this page do
asyncio.run(main())

Load Balancing

Distribute messages across multiple subscribers:

from nolag import NoLag, NoLagOptions, SubscribeOptions

# Option 1: Enable load balancing globally via NoLagOptions
client = NoLag("your-actor-token", NoLagOptions(
    load_balance=True,
    load_balance_group="task-workers"
))

# Option 2: Enable per-subscription
await client.subscribe(
    "tasks",
    SubscribeOptions(
        load_balance=True,
        load_balance_group="task-workers"
    )
)
client.on("tasks", lambda data, meta: print(f"Processing: {data}"))

Type Definitions

from nolag import (
    # WebSocket Client
    NoLag,              # Main client class
    NoLagOptions,       # Connection options
    SubscribeOptions,   # Subscription options
    EmitOptions,        # Publish options
    ConnectionStatus,   # Connection status enum
    ActorType,          # Actor type enum
    QoS,                # QoS level enum
    MessageMeta,        # Message metadata
    ActorPresence,      # Presence info

    # REST API Client
    NoLagApi,           # REST API client
    NoLagApiError,      # API error class
    App, AppCreate, AppUpdate,
    Room, RoomCreate, RoomUpdate,
    Actor, ActorWithToken, ActorCreate, ActorUpdate,
)

Requirements

  • Python 3.10+
  • websockets >= 12.0
  • msgpack >= 1.0.0
  • aiohttp >= 3.9.0

Next Steps