AutoGen Core: how do I set up topics/subscriptions for pub-sub routing and data-dependent agent instances (per tenant/session)?

In AutoGen Core, topics and subscriptions are the mechanism that turns a pile of agents into a real pub‑sub system, especially once you need per‑tenant or per‑session isolation. Instead of hard‑coding agent IDs and manually wiring message flows, you define “who listens to what” declaratively and let the runtime create the right agent instances on demand.

Quick Answer: Use AutoGen Core’s topic and subscription model to route messages by Topic = (Topic Type, Topic Source) and subscribe agents by type with TypeSubscription. For per‑tenant or per‑session isolation, make Topic Source data‑dependent (e.g., tenant ID, conversation ID) so the runtime spins up separate agent instances per topic source while you keep application code free of hard‑coded agent IDs.

Why This Matters

Once you go beyond a single LLM call, your main problems stop being “prompting” and start being “routing”: which agent handles which request, how do you isolate tenants, and how do you keep a long‑running system maintainable. AutoGen Core’s topics/subscriptions solve that at the runtime layer: you publish to topics, subscribe by type, and let the runtime handle data‑dependent agent instances (per tenant/session) without rewriting your orchestration code.

Key Benefits:

  • Portable routing: Use type‑based subscriptions so your application doesn’t depend on specific agent IDs or fixed topologies.
  • Multi‑tenant isolation: Model tenant/session boundaries in Topic Source so each tenant gets its own agent instances and message streams.
  • Dynamic scale‑out: Let the runtime create data‑dependent agent IDs automatically as new topics appear, instead of pre‑allocating agents.

Core Concepts & Key Points

| Concept | Definition | Why it's important |
|---|---|---|
| Topic | A pair Topic = (Topic Type, Topic Source); often written as Topic_Type/Topic_Source. | Encodes “what kind of message is this?” and “who/what does it belong to?” (e.g., tenant or session). |
| Subscription | A rule that tells the runtime which agents should receive messages for which topics. | Decouples publishers and subscribers so you don’t hard‑code agent IDs. |
| Type‑Based Subscription | A subscription on Agent Type and Topic Type (e.g., TypeSubscription(topic_type="support", agent_type="triage_agent")). | Portable and data‑independent: the runtime creates data‑dependent agent IDs as needed, ideal for per‑tenant/session routing. |

How It Works (Step‑by‑Step)

At a high level you:

  1. Define topic conventions (what Topic Type and Topic Source mean in your system).
  2. Declare type‑based subscriptions for your agent types.
  3. Publish messages to topics; the runtime resolves or creates the right agent instances and delivers events.

Below I’ll walk through each step with code.


1. Install the required packages

Python 3.10 or later is required.

For a Core‑only pub‑sub example (no LLM calls needed just to show routing):

pip install -U "autogen-core"

If you plan to attach real LLM behavior, you’ll typically also install AgentChat and Extensions:

pip install -U "autogen-agentchat" "autogen-ext[openai]"

Note
Costs and rate limits come from your chosen model provider (e.g., OpenAI/Azure OpenAI), not from AutoGen itself.


2. Define your topic model

You decide what Topic Type and Topic Source represent. A pattern that works well in multi‑tenant systems:

  • Topic Type: logical channel (e.g., "support", "billing", "review", "default").
  • Topic Source: tenant/session identifier (e.g., tenant ID, conversation ID, GitHub issue URL).

Format:

  • Definition: Topic = (Topic Type, Topic Source)
  • String form: "Topic_Type/Topic_Source"

Examples:

  • Single‑tenant, single topic:
    ("default", "default"), i.e. "default/default" in string form
  • Single‑tenant, multiple topics:
    ("support", "default"), ("billing", "default")
  • Multi‑tenant, per‑session:
    ("support", tenant_id) or ("chat", session_id)

You’ll use these values when publishing messages.
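
The tuple form and the string form can be converted mechanically. The following is a minimal stdlib sketch, not an AutoGen API: `Topic`, `to_topic_string`, and `parse_topic_string` are illustrative names, and it assumes that only the first slash separates type from source, so that sources such as issue URLs keep their own slashes.

```python
from typing import NamedTuple


class Topic(NamedTuple):
    """Illustrative stand-in for a (Topic Type, Topic Source) pair."""
    topic_type: str
    source: str


def to_topic_string(topic: Topic) -> str:
    # Render the pair in the "Topic_Type/Topic_Source" string form.
    return f"{topic.topic_type}/{topic.source}"


def parse_topic_string(text: str) -> Topic:
    # Split on the first "/" only, so sources such as URLs keep their slashes.
    topic_type, sep, source = text.partition("/")
    if not sep or not topic_type or not source:
        raise ValueError(f"expected 'Topic_Type/Topic_Source', got {text!r}")
    return Topic(topic_type, source)


issue_topic = parse_topic_string("default/github.com/microsoft/autogen/issues/9")
print(issue_topic.topic_type)
print(issue_topic.source)
```

Splitting on the first slash is a design choice for this sketch; if your topic types could ever contain slashes, you would need a different delimiter or an explicit pair.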


3. Use type‑based subscriptions (preferred)

Type‑based subscription is the recommended pattern because it’s portable and data‑independent. You declare:

  • “All agents of type triage_agent should receive messages on topic type support.”
  • “All agents of type reviewer_agent should receive messages on topic type code_review.”

You do not hard‑code agent IDs in application logic; the runtime derives them per topic source and creates them if they don’t exist.

Conceptually:

from autogen_core import TypeSubscription

support_triage_sub = TypeSubscription(
    topic_type="support",
    agent_type="triage_agent",
)

When a message arrives on topic ("support", "tenant_123"), the runtime:

  1. Resolves the agent ID as ("triage_agent", "tenant_123").
  2. Creates the agent instance if it doesn’t exist.
  3. Delivers the message to that agent instance.

This is how you get data‑dependent agent IDs and per‑tenant/session isolation without hand‑coding IDs.
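
To make the three resolution steps concrete, here is a toy model of the routing rule in plain Python. It is not the AutoGen runtime: `ToyRuntime`, `ToyAgent`, and their methods are hypothetical names that only mimic the "resolve, create if missing, deliver" behavior described above.

```python
from dataclasses import dataclass, field


@dataclass
class ToyAgent:
    """Stand-in for an agent instance; the inbox is its private state."""
    agent_type: str
    key: str
    inbox: list = field(default_factory=list)


class ToyRuntime:
    """Models only the routing rule, nothing else about a real runtime."""

    def __init__(self) -> None:
        self.subscriptions: list[tuple[str, str]] = []  # (topic_type, agent_type)
        self.agents: dict[tuple[str, str], ToyAgent] = {}

    def add_type_subscription(self, topic_type: str, agent_type: str) -> None:
        self.subscriptions.append((topic_type, agent_type))

    def publish(self, topic_type: str, topic_source: str, content: str) -> list:
        delivered = []
        for sub_topic_type, agent_type in self.subscriptions:
            if sub_topic_type != topic_type:
                continue
            # Step 1: resolve the agent ID from agent type + topic source.
            agent_id = (agent_type, topic_source)
            # Step 2: create the instance on first use.
            agent = self.agents.get(agent_id)
            if agent is None:
                agent = self.agents[agent_id] = ToyAgent(agent_type, topic_source)
            # Step 3: deliver the message to that instance.
            agent.inbox.append(content)
            delivered.append(agent_id)
        return delivered


runtime = ToyRuntime()
runtime.add_type_subscription("support", "triage_agent")
runtime.publish("support", "tenant_123", "login problem")
runtime.publish("support", "tenant_456", "invoice problem")
runtime.publish("support", "tenant_123", "still locked out")
print(sorted(runtime.agents))
```

Three messages produce two instances: the second message for tenant_123 reuses the existing instance, and neither tenant sees the other's inbox.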


4. Build a minimal pub‑sub example with SingleThreadedAgentRuntime

Let’s build a small, runnable example with:

  • A runtime: SingleThreadedAgentRuntime.
  • One agent type: "triage_agent".
  • A type‑based subscription on topic type "support".
  • Tenant‑scoped topics: ("support", tenant_id).

import asyncio
from dataclasses import dataclass

from autogen_core import (
    MessageContext,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    TopicId,
    TypeSubscription,
    message_handler,
)


# 1) Define the message types (plain dataclasses) and the agent behavior
@dataclass
class SupportRequest:
    content: str


@dataclass
class TriageResult:
    content: str


class TriageAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("Triage agent")

    @message_handler
    async def handle_request(self, message: SupportRequest, ctx: MessageContext) -> None:
        # self.id is assigned by the runtime; its key equals the topic source
        print(f"[{self.id.type}:{self.id.key}] received: {message.content}")

        # Optionally publish a follow-up message on the same topic
        if ctx.topic_id is not None:
            await self.publish_message(
                TriageResult(f"Handled by triage agent for tenant {self.id.key}"),
                topic_id=ctx.topic_id,
            )


# 2) Wire it into the runtime
async def main() -> None:
    runtime = SingleThreadedAgentRuntime()

    # Register the agent type with a factory; instances are created on demand
    await TriageAgent.register(runtime, "triage_agent", lambda: TriageAgent())

    # Declare a type-based subscription:
    # all triage_agent instances subscribe to topic_type="support"
    await runtime.add_subscription(
        TypeSubscription(topic_type="support", agent_type="triage_agent")
    )

    # Start processing messages in the background
    runtime.start()

    # 3) Publish messages for two different tenants
    await runtime.publish_message(
        SupportRequest("Tenant A says: I need help with login."),
        topic_id=TopicId(type="support", source="tenant_a"),
    )
    await runtime.publish_message(
        SupportRequest("Tenant B says: My invoice looks wrong."),
        topic_id=TopicId(type="support", source="tenant_b"),
    )

    # Wait until all queued messages are processed, then stop
    await runtime.stop_when_idle()


if __name__ == "__main__":
    asyncio.run(main())

What happens:

  • The first message on ("support", "tenant_a") creates agent ID ("triage_agent", "tenant_a").
  • The second message on ("support", "tenant_b") creates agent ID ("triage_agent", "tenant_b").
  • Both share code (same agent type) but have isolated state per tenant/session.

5. Single‑tenant vs multi‑tenant patterns

The same type‑based subscription pattern applies across scenarios.

Single‑tenant, single topic

You’re just prototyping or have one environment and one logical channel:

  • Topic Type: "default"
  • Topic Source: "default"

Subscription:

await runtime.add_subscription(
    TypeSubscription(
        topic_type="default",
        agent_type="app_agent",
    )
)

Publish:

# Greeting stands in for your own dataclass message type
await runtime.publish_message(
    Greeting("Hello, world"),
    topic_id=TopicId(type="default", source="default"),
)

Single‑tenant, multiple topics

You want specialization by topic (e.g., support vs billing) but not per‑tenant separation:

  • Topic Types: "support", "billing", "review"
  • Topic Source: "default"

Subscriptions:

await runtime.add_subscription(
    TypeSubscription(topic_type="support", agent_type="triage_agent")
)
await runtime.add_subscription(
    TypeSubscription(topic_type="billing", agent_type="billing_agent")
)
await runtime.add_subscription(
    TypeSubscription(topic_type="review", agent_type="reviewer_agent")
)

Publish:

# SupportRequest and BillingRequest stand in for your own dataclass message types
await runtime.publish_message(
    SupportRequest("Help me log in"),
    topic_id=TopicId(type="support", source="default"),
)
await runtime.publish_message(
    BillingRequest("Refund request"),
    topic_id=TopicId(type="billing", source="default"),
)

Messages on these three topic types map to the following agent IDs:

  • ("triage_agent", "default")
  • ("billing_agent", "default")
  • ("reviewer_agent", "default")

If the agent with that ID does not exist, the runtime will create it.

Multi‑tenant / per‑session (data‑dependent agent IDs)

Now the topic source becomes data‑dependent. For example, one triage agent per GitHub issue:

  • Topic Type: "default"
  • Topic Source: "github.com/microsoft/autogen/issues/9"

Subscription:

await runtime.add_subscription(
    TypeSubscription(
        topic_type="default",
        agent_type="triage_agent",
    )
)

Publish for two different issues:

# TriageRequest stands in for your own dataclass message type
await runtime.publish_message(
    TriageRequest("Please triage issue #9"),
    topic_id=TopicId(type="default", source="github.com/microsoft/autogen/issues/9"),
)

await runtime.publish_message(
    TriageRequest("Please triage issue #10"),
    topic_id=TopicId(type="default", source="github.com/microsoft/autogen/issues/10"),
)

The runtime delivers to:

  • Agent ID ("triage_agent", "github.com/microsoft/autogen/issues/9")
  • Agent ID ("triage_agent", "github.com/microsoft/autogen/issues/10")

Note the agent ID is data‑dependent, and the runtime will create a new instance of the agent if it does not exist.


6. When to use type‑based vs ID‑based subscriptions

AutoGen Core allows both, but they have different trade‑offs.

  • Type‑based subscriptions (recommended default):

    • Portable and data‑independent: you write “triage agents handle support” in configuration, not “agent X handles tenant Y.”
    • Works naturally with per‑tenant or per‑session topic sources.
    • Easier to refactor: you can change the underlying ID scheme without rewriting routing logic.
  • ID‑based subscriptions:

    • Useful for very specific wiring (e.g., one orchestrator agent that must receive everything).
    • Tends to entangle application logic with ID conventions, which becomes brittle as you add tenants/sessions.

In a regulated environment with multiple tenants, I strongly prefer type‑based subscriptions and treat IDs as an internal runtime detail.
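
The trade‑off can be seen in a few lines of plain Python. This is only an illustration under assumed names: `TypeRule` and `FixedIdRule` are hypothetical classes, not AutoGen types, and they model nothing beyond how each style decides which agent ID receives a topic.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TypeRule:
    """Type-based: the agent key is derived from whatever topic source arrives."""
    topic_type: str
    agent_type: str

    def matches(self, topic_type: str, topic_source: str) -> bool:
        return topic_type == self.topic_type

    def target(self, topic_type: str, topic_source: str) -> tuple[str, str]:
        return (self.agent_type, topic_source)


@dataclass(frozen=True)
class FixedIdRule:
    """ID-based: one hard-wired topic mapped to one hard-wired agent ID."""
    topic_type: str
    topic_source: str
    agent_id: tuple[str, str]

    def matches(self, topic_type: str, topic_source: str) -> bool:
        return (topic_type, topic_source) == (self.topic_type, self.topic_source)

    def target(self, topic_type: str, topic_source: str) -> tuple[str, str]:
        return self.agent_id


type_rule = TypeRule("support", "triage_agent")
fixed_rule = FixedIdRule("support", "tenant_a", ("triage_agent", "tenant_a"))

# A tenant that did not exist when the rules were written.
new_topic = ("support", "tenant_z")
print(type_rule.matches(*new_topic), type_rule.target(*new_topic))
print(fixed_rule.matches(*new_topic))
```

The type rule covers the new tenant with no changes, while the fixed rule needs one more entry per tenant, which is where the brittleness comes from.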


7. Connecting this to AgentChat and distributed runtimes

Once your topic and subscription patterns are stable, you can use them across layers:

  • AgentChat (high‑level API):

    • Use Teams or GraphFlow for patterns like Selector Group Chat or workflow graphs.
    • Under the hood, they still rely on runtimes and topics; topic source can carry tenant/session IDs.
    • Start with AgentChat if you’re focused on conversational logic and only later drop down to Core for custom routing.
  • Core with distributed runtime:

    • Use a host servicer + workers + gateways when you need isolation and scale for heavy workloads.
    • Topics and subscriptions are still your primary control plane: you can route by tenant or region using Topic Source.
    • For example, send tenant A traffic to one worker pool (by topic filter), tenant B to another.

Migration guidance I’ve followed in practice:

  • Start with SingleThreadedAgentRuntime and type‑based subscriptions while designing topics.
  • Once the behavior is correct, swap to a distributed runtime topology without changing your agent implementations.
  • Keep topic conventions and subscriptions in configuration so you can evolve routing without redeploying agent code.
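
One way to keep subscriptions in configuration is to load them from a small JSON document at startup. The sketch below uses only the standard library; the JSON layout, `SubscriptionSpec`, and `load_subscription_specs` are assumptions made for illustration, not an AutoGen configuration format.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class SubscriptionSpec:
    """One configured routing rule: topic type -> agent type."""
    topic_type: str
    agent_type: str


def load_subscription_specs(raw_json: str) -> list[SubscriptionSpec]:
    # Parse the config and reject duplicate rules early, before any wiring.
    specs: list[SubscriptionSpec] = []
    seen: set[SubscriptionSpec] = set()
    for entry in json.loads(raw_json):
        spec = SubscriptionSpec(
            topic_type=entry["topic_type"],
            agent_type=entry["agent_type"],
        )
        if spec in seen:
            raise ValueError(f"duplicate subscription: {spec}")
        seen.add(spec)
        specs.append(spec)
    return specs


# Hypothetical config; in practice this would live in a file or config service.
CONFIG = """
[
  {"topic_type": "support", "agent_type": "triage_agent"},
  {"topic_type": "billing", "agent_type": "billing_agent"}
]
"""

specs = load_subscription_specs(CONFIG)
for spec in specs:
    print(spec.topic_type, "->", spec.agent_type)
```

At startup, each spec would then become a TypeSubscription(topic_type=spec.topic_type, agent_type=spec.agent_type) added to the runtime, so changing routing means editing the config rather than agent code.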

Common Mistakes to Avoid

  • Hard‑coding agent IDs in application logic:
    This ties your architecture to specific ID forms and breaks once you introduce tenants or new topic types. Prefer TypeSubscription(topic_type="...", agent_type="...") and let the runtime map to agent IDs.

  • Encoding tenant/session in message content instead of Topic Source:
    If you hide tenant information inside the payload, you can’t route or isolate effectively. Always model tenant/session explicitly as Topic Source so the runtime can create data‑dependent agent instances and enforce separation.


Real‑World Example

In our internal “agent platform,” we run a multi‑tenant workflow where each customer has:

  • A triage agent per tenant, plus
  • One or more specialized agents per conversation.

We model it with:

  • Topic Type: "triage" or "conversation".
  • Topic Source: tenant_id for triage, f"{tenant_id}/{conversation_id}" for conversations.

Key subscriptions:

await runtime.add_subscription(
    TypeSubscription(topic_type="triage", agent_type="triage_agent")
)
await runtime.add_subscription(
    TypeSubscription(topic_type="conversation", agent_type="conversation_agent")
)

When a new tenant starts using the system, we don’t deploy anything new:

  • The first message on ("triage", "tenant_42") creates agent ID ("triage_agent", "tenant_42").
  • The first message on ("conversation", "tenant_42/conv_1001") creates ("conversation_agent", "tenant_42/conv_1001").

The runtime owns lifecycle; we just publish messages to the right topics. This makes it straightforward to reason about isolation boundaries and to move a subset of tenants to a separate distributed runtime if needed.

Pro Tip: Treat Topic Type and Topic Source as first‑class configuration, not ad‑hoc strings. Define a small set of allowed topic types and a clear format for topic sources (e.g., tenant_id/session_id), and write helper functions to construct them—this reduces routing bugs and makes multi‑tenant audits far easier.
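
Such helpers might look like the following sketch. The allowed set mirrors the two topic types from the example above; the function names and the rule that IDs may not contain a slash are assumptions chosen for illustration.

```python
# Topic types permitted in this (hypothetical) platform.
ALLOWED_TOPIC_TYPES = frozenset({"triage", "conversation"})


def conversation_source(tenant_id: str, conversation_id: str) -> str:
    # Build "tenant_id/conversation_id"; forbid "/" inside either part so the
    # tenant can always be recovered from the source unambiguously.
    for name, value in (("tenant_id", tenant_id), ("conversation_id", conversation_id)):
        if not value or "/" in value:
            raise ValueError(f"{name} must be non-empty and contain no '/': {value!r}")
    return f"{tenant_id}/{conversation_id}"


def make_topic(topic_type: str, topic_source: str) -> tuple[str, str]:
    # Reject typos such as "traige" before they become silent routing bugs.
    if topic_type not in ALLOWED_TOPIC_TYPES:
        raise ValueError(f"unknown topic type: {topic_type!r}")
    if not topic_source:
        raise ValueError("topic source must be non-empty")
    return (topic_type, topic_source)


def tenant_of(topic_source: str) -> str:
    # The tenant is everything before the first "/", or the whole source.
    return topic_source.split("/", 1)[0]


topic = make_topic("conversation", conversation_source("tenant_42", "conv_1001"))
print(topic)
print(tenant_of(topic[1]))
```

Because every topic goes through `make_topic`, an audit only has to review one module to see which topic types exist and how tenant boundaries are encoded.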

Summary

AutoGen Core’s topics and subscriptions give you a runtime‑level way to do pub‑sub routing and per‑tenant/session isolation without hard‑coded agent IDs. By modeling Topic = (Topic Type, Topic Source) and using type‑based subscriptions, you let the runtime create data‑dependent agent IDs and agent instances on demand, whether you’re in a single‑tenant, multi‑topic setup or a large multi‑tenant system. Once your topic model is stable in SingleThreadedAgentRuntime, you can lift it into AgentChat patterns or a distributed runtime without changing agent behavior.
