
AutoGen Core: how do I set up topics/subscriptions for pub-sub routing and data-dependent agent instances (per tenant/session)?
In AutoGen Core, topics and subscriptions are the mechanism that turns a pile of agents into a real pub‑sub system, especially once you need per‑tenant or per‑session isolation. Instead of hard‑coding agent IDs and manually wiring message flows, you define “who listens to what” declaratively and let the runtime create the right agent instances on demand.
Quick Answer: Use AutoGen Core’s topic and subscription model to route messages by Topic = (Topic Type, Topic Source) and subscribe agents by type with TypeSubscription. For per‑tenant or per‑session isolation, make Topic Source data‑dependent (e.g., tenant ID, conversation ID) so the runtime spins up separate agent instances per topic source while you keep application code free of hard‑coded agent IDs.
Why This Matters
Once you go beyond a single LLM call, your main problems stop being “prompting” and start being “routing”: which agent handles which request, how do you isolate tenants, and how do you keep a long‑running system maintainable. AutoGen Core’s topics/subscriptions solve that at the runtime layer: you publish to topics, subscribe by type, and let the runtime handle data‑dependent agent instances (per tenant/session) without rewriting your orchestration code.
Key Benefits:
- Portable routing: Use type‑based subscriptions so your application doesn’t depend on specific agent IDs or fixed topologies.
- Multi‑tenant isolation: Model tenant/session boundaries in Topic Source so each tenant gets its own agent instances and message streams.
- Dynamic scale‑out: Let the runtime create data‑dependent agent IDs automatically as new topics appear, instead of pre‑allocating agents.
Core Concepts & Key Points
| Concept | Definition | Why it's important |
|---|---|---|
| Topic | A pair Topic = (Topic Type, Topic Source); often written as Topic_Type/Topic_Source. | Encodes “what kind of message is this?” and “who/what does it belong to?” (e.g., tenant or session). |
| Subscription | A rule that tells the runtime which agents should receive messages for which topics. | Decouples publishers and subscribers so you don’t hard‑code agent IDs. |
| Type‑Based Subscription | A subscription on Agent Type and Topic Type (e.g., TypeSubscription(topic_type="support", agent_type="triage_agent")). | Portable and data‑independent: the runtime creates data‑dependent agent IDs as needed, ideal for per‑tenant/session routing. |
How It Works (Step‑by‑Step)
At a high level you:
- Define topic conventions (what Topic Type and Topic Source mean in your system).
- Declare type‑based subscriptions for your agent types.
- Publish messages to topics; the runtime resolves or creates the right agent instances and delivers events.
Below I’ll walk through each step with code.
1. Install the required packages
Python 3.10 or later is required.
For a Core‑only pub‑sub example (no LLM calls needed just to show routing):
pip install -U "autogen-core"
If you plan to attach real LLM behavior, you’ll typically also install AgentChat and Extensions:
pip install -U "autogen-agentchat" "autogen-ext[openai]"
Note
Costs and rate limits come from your chosen model provider (e.g., OpenAI/Azure OpenAI), not from AutoGen itself.
2. Define your topic model
You decide what Topic Type and Topic Source represent. A pattern that works well in multi‑tenant systems:
- Topic Type: logical channel (e.g., "support", "billing", "review", "default").
- Topic Source: tenant/session identifier (e.g., tenant ID, conversation ID, GitHub issue URL).
Format:
- Definition: Topic = (Topic Type, Topic Source)
- String form: "Topic_Type/Topic_Source"
Examples:
- Single‑tenant, single topic: ("default", "default") → "default/default"
- Single‑tenant, multiple topics: ("support", "default"), ("billing", "default")
- Multi‑tenant, per‑session: ("support", tenant_id) or ("chat", session_id)
You’ll use these values when publishing messages.
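To keep the pair form and the string form in sync, a pair of small helpers can help. The sketch below is plain Python with hypothetical names (Topic, format_topic, parse_topic); none of them come from AutoGen Core. It assumes the topic type never contains a slash while the topic source may (a GitHub issue URL, for instance), so parsing splits on the first "/" only.

```python
from typing import NamedTuple


class Topic(NamedTuple):
    """Hypothetical stand-in for a (Topic Type, Topic Source) pair."""
    topic_type: str
    topic_source: str


def format_topic(topic: Topic) -> str:
    """Render a topic in the "Topic_Type/Topic_Source" string form."""
    if "/" in topic.topic_type:
        raise ValueError("topic type must not contain '/'")
    return f"{topic.topic_type}/{topic.topic_source}"


def parse_topic(text: str) -> Topic:
    """Parse the string form back into a pair, splitting on the first '/' only."""
    topic_type, sep, topic_source = text.partition("/")
    if not sep or not topic_type or not topic_source:
        raise ValueError(f"expected 'type/source', got {text!r}")
    return Topic(topic_type, topic_source)
```

Splitting on the first separator is a design choice: it lets sources such as "github.com/microsoft/autogen/issues/9" survive a round trip unchanged.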
3. Use type‑based subscriptions (preferred)
Type‑based subscription is the recommended pattern because it’s portable and data‑independent. You declare:
- “All agents of type triage_agent should receive messages on topic type support.”
- “All agents of type reviewer_agent should receive messages on topic type code_review.”
You do not hard‑code agent IDs in application logic; the runtime derives them per topic source and creates them if they don’t exist.
Conceptually:
from autogen_core import TypeSubscription

support_triage_sub = TypeSubscription(
    topic_type="support",
    agent_type="triage_agent",
)
When a message arrives on topic ("support", "tenant_123"), the runtime:
- Resolves the agent ID as ("triage_agent", "tenant_123").
- Creates the agent instance if it doesn’t exist.
- Delivers the message to that agent instance.
This is how you get data‑dependent agent IDs and per‑tenant/session isolation without hand‑coding IDs.
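The resolution step can be pictured as a pure function from a topic to a list of agent IDs. The following is a minimal sketch of that idea in plain Python, not the runtime's actual implementation; TypeRule and resolve_agent_ids are hypothetical names introduced here for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TypeRule:
    """Hypothetical model of a type-based subscription rule."""
    topic_type: str
    agent_type: str


def resolve_agent_ids(rules, topic):
    """Return the (agent_type, key) pairs that should receive `topic`.

    The agent key is copied from the topic source, which is what makes
    the resulting agent ID data-dependent.
    """
    topic_type, topic_source = topic
    return [
        (rule.agent_type, topic_source)
        for rule in rules
        if rule.topic_type == topic_type
    ]


rules = [
    TypeRule(topic_type="support", agent_type="triage_agent"),
    TypeRule(topic_type="code_review", agent_type="reviewer_agent"),
]
print(resolve_agent_ids(rules, ("support", "tenant_123")))
# prints [('triage_agent', 'tenant_123')]
```

Note that the rules never mention a tenant: the same two rules cover any number of topic sources.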
4. Build a minimal pub‑sub example with SingleThreadedAgentRuntime
Let’s build a small, runnable example with:
- A runtime: SingleThreadedAgentRuntime.
- One agent type: "triage_agent".
- A type‑based subscription on topic type "support".
- Tenant‑scoped topics: ("support", tenant_id).
import asyncio
from dataclasses import dataclass

from autogen_core import (
    MessageContext,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    TopicId,
    TypeSubscription,
    message_handler,
)


# 1) Define an application message type and a simple agent
@dataclass
class Message:
    content: str


class TriageAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("Triage agent")

    @message_handler
    async def handle_message(self, message: Message, ctx: MessageContext) -> None:
        # self.id.key equals the topic source (here, the tenant ID)
        print(f"[{self.id.type}:{self.id.key}] received: {message.content}")
        # Optionally publish a follow-up on the same topic; the runtime
        # does not deliver a published message back to its sender.
        if ctx.topic_id is not None:
            await self.publish_message(
                Message(f"Handled by triage agent for tenant {self.id.key}"),
                topic_id=ctx.topic_id,
            )


# 2) Wire it into the runtime
async def main() -> None:
    runtime = SingleThreadedAgentRuntime()

    # Register the agent type with a factory function
    await TriageAgent.register(runtime, "triage_agent", lambda: TriageAgent())

    # Declare a type-based subscription:
    # all triage_agent instances subscribe to topic type "support"
    await runtime.add_subscription(
        TypeSubscription(topic_type="support", agent_type="triage_agent")
    )

    # 3) Start the runtime and publish messages for two different tenants
    runtime.start()
    await runtime.publish_message(
        Message("Tenant A says: I need help with login."),
        topic_id=TopicId(type="support", source="tenant_a"),
    )
    await runtime.publish_message(
        Message("Tenant B says: My invoice looks wrong."),
        topic_id=TopicId(type="support", source="tenant_b"),
    )

    # Process the queued messages, then stop the runtime
    await runtime.stop_when_idle()


if __name__ == "__main__":
    asyncio.run(main())
What happens:
- The first message on ("support", "tenant_a") creates agent ID ("triage_agent", "tenant_a").
- The second message on ("support", "tenant_b") creates agent ID ("triage_agent", "tenant_b").
- Both share code (same agent type) but have isolated state per tenant/session.
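To make the "shared code, isolated state" point concrete without installing anything, here is a toy in-memory model of lazy, per-source instantiation. It is only an illustration: ToyRuntime and CountingAgent are hypothetical classes, and the real runtime's internals may differ.

```python
from collections import defaultdict


class CountingAgent:
    """Toy agent that keeps per-instance state (a message counter)."""

    def __init__(self):
        self.seen = 0

    def handle(self, content):
        self.seen += 1
        return self.seen


class ToyRuntime:
    """Illustrative stand-in for a runtime; not AutoGen Core's implementation."""

    def __init__(self):
        self.factories = {}                     # agent_type -> factory
        self.subscriptions = defaultdict(list)  # topic_type -> [agent_type]
        self.instances = {}                     # (agent_type, key) -> instance

    def register(self, agent_type, factory):
        self.factories[agent_type] = factory

    def subscribe(self, topic_type, agent_type):
        self.subscriptions[topic_type].append(agent_type)

    def publish(self, topic, content):
        topic_type, topic_source = topic
        for agent_type in self.subscriptions.get(topic_type, []):
            agent_id = (agent_type, topic_source)
            # Create the instance on first use, then reuse it.
            if agent_id not in self.instances:
                self.instances[agent_id] = self.factories[agent_type]()
            self.instances[agent_id].handle(content)


runtime = ToyRuntime()
runtime.register("triage_agent", CountingAgent)
runtime.subscribe("support", "triage_agent")
runtime.publish(("support", "tenant_a"), "login help")
runtime.publish(("support", "tenant_a"), "still locked out")
runtime.publish(("support", "tenant_b"), "invoice looks wrong")

counts = {agent_id: agent.seen for agent_id, agent in runtime.instances.items()}
print(counts)
# prints {('triage_agent', 'tenant_a'): 2, ('triage_agent', 'tenant_b'): 1}
```

Each tenant's counter advances independently, which is the isolation property the list above describes.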
5. Single‑tenant vs multi‑tenant patterns
The same type‑based subscription pattern applies across scenarios.
Single‑tenant, single topic
You’re just prototyping or have one environment and one logical channel:
- Topic Type: "default"
- Topic Source: "default"
Subscription:
await runtime.add_subscription(
    TypeSubscription(
        topic_type="default",
        agent_type="app_agent",
    )
)
Publish:
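# Message is your application's message dataclass; TopicId comes from autogen_core
await runtime.publish_message(
    Message("Hello, world"),
    topic_id=TopicId(type="default", source="default"),
)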
Single‑tenant, multiple topics
You want specialization by topic (e.g., support vs billing) but not per‑tenant separation:
- Topic Types: "support", "billing", "review"
- Topic Source: "default"
Subscriptions:
await runtime.add_subscription(
    TypeSubscription(topic_type="support", agent_type="triage_agent")
)
await runtime.add_subscription(
    TypeSubscription(topic_type="billing", agent_type="billing_agent")
)
await runtime.add_subscription(
    TypeSubscription(topic_type="review", agent_type="reviewer_agent")
)
Publish:
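# Message is your application's message dataclass; TopicId comes from autogen_core
await runtime.publish_message(
    Message("Help me log in"),
    topic_id=TopicId(type="support", source="default"),
)
await runtime.publish_message(
    Message("Refund request"),
    topic_id=TopicId(type="billing", source="default"),
)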
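With these subscriptions, the runtime maps topics with source "default" to the following agent IDs:
- ("triage_agent", "default") for topic type "support"
- ("billing_agent", "default") for topic type "billing"
- ("reviewer_agent", "default") for topic type "review"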
If the agent with that ID does not exist, the runtime will create it.
Multi‑tenant / per‑session (data‑dependent agent IDs)
Now the topic source becomes data‑dependent. For example, one triage agent per GitHub issue:
- Topic Type: "default"
- Topic Source: "github.com/microsoft/autogen/issues/9"
Subscription:
await runtime.add_subscription(
    TypeSubscription(
        topic_type="default",
        agent_type="triage_agent",
    )
)
Publish for two different issues:
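# Message is your application's message dataclass; TopicId comes from autogen_core
await runtime.publish_message(
    Message("Please triage issue #9"),
    topic_id=TopicId(type="default", source="github.com/microsoft/autogen/issues/9"),
)
await runtime.publish_message(
    Message("Please triage issue #10"),
    topic_id=TopicId(type="default", source="github.com/microsoft/autogen/issues/10"),
)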
The runtime delivers to:
- Agent ID ("triage_agent", "github.com/microsoft/autogen/issues/9")
- Agent ID ("triage_agent", "github.com/microsoft/autogen/issues/10")
Note the agent ID is data‑dependent, and the runtime will create a new instance of the agent if it does not exist.
6. When to use type‑based vs ID‑based subscriptions
AutoGen Core allows both, but they have different trade‑offs.
- Type‑based subscriptions (recommended default):
  - Portable and data‑independent: you write “triage agents handle support” in configuration, not “agent X handles tenant Y.”
  - Work naturally with per‑tenant or per‑session topic sources.
  - Easier to refactor: you can change the underlying ID scheme without rewriting routing logic.
- ID‑based subscriptions:
  - Useful for very specific wiring (e.g., one orchestrator agent that must receive everything).
  - Tend to entangle application logic with ID conventions, which becomes brittle as you add tenants/sessions.
In a regulated environment with multiple tenants, I strongly prefer type‑based subscriptions and treat IDs as an internal runtime detail.
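The trade‑off is easy to see in a few lines of plain Python. This is a deliberately simplified contrast, not AutoGen code: ID_TABLE and both routing functions are hypothetical and only model where the tenant‑specific knowledge lives.

```python
# Hard-coded wiring: one explicit entry per tenant (hypothetical IDs).
ID_TABLE = {
    "tenant_a": ("triage_agent", "tenant_a"),
    "tenant_b": ("triage_agent", "tenant_b"),
}


def route_by_id_table(tenant_id):
    """Look up a pre-wired agent ID; unknown tenants raise KeyError."""
    return ID_TABLE[tenant_id]


def route_by_type_rule(tenant_id, agent_type="triage_agent"):
    """Derive the agent ID from the rule "support -> triage_agent"."""
    return (agent_type, tenant_id)


# A new tenant works immediately with the type rule...
print(route_by_type_rule("tenant_c"))
# ...but needs a code or config change with the ID table.
try:
    route_by_id_table("tenant_c")
except KeyError:
    print("tenant_c is not wired in the ID table")
```

The type rule carries no per‑tenant data at all, which is why onboarding a tenant does not touch routing code.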
7. Connecting this to AgentChat and distributed runtimes
Once your topic and subscription patterns are stable, you can use them across layers:
- AgentChat (high‑level API):
  - Use Teams or GraphFlow for patterns like Selector Group Chat or workflow graphs.
  - Under the hood, they still rely on runtimes and topics; topic source can carry tenant/session IDs.
  - Start with AgentChat if you’re focused on conversational logic and only later drop down to Core for custom routing.
- Core with distributed runtime:
  - Use a host servicer + workers + gateways when you need isolation and scale for heavy workloads.
  - Topics and subscriptions are still your primary control plane: you can route by tenant or region using Topic Source.
  - For example, send tenant A traffic to one worker pool (by topic filter), tenant B to another.
Migration guidance I’ve followed in practice:
- Start with SingleThreadedAgentRuntime and type‑based subscriptions while designing topics.
- Once the behavior is correct, swap to a distributed runtime topology without changing your agent implementations.
- Keep topic conventions and subscriptions in configuration so you can evolve routing without redeploying agent code.
Common Mistakes to Avoid
- Hard‑coding agent IDs in application logic: This ties your architecture to specific ID forms and breaks once you introduce tenants or new topic types. Prefer TypeSubscription(topic_type="...", agent_type="...") and let the runtime map to agent IDs.
- Encoding tenant/session in message content instead of Topic Source: If you hide tenant information inside the payload, you can’t route or isolate effectively. Always model tenant/session explicitly as Topic Source so the runtime can create data‑dependent agent instances and enforce separation.
Real‑World Example
In our internal “agent platform,” we run a multi‑tenant workflow where each customer has:
- A triage agent per tenant, plus
- One or more specialized agents per conversation.
We model it with:
- Topic Type: "triage" or "conversation".
- Topic Source: tenant_id for triage, f"{tenant_id}/{conversation_id}" for conversations.
Key subscriptions:
await runtime.add_subscription(
    TypeSubscription(topic_type="triage", agent_type="triage_agent")
)
await runtime.add_subscription(
    TypeSubscription(topic_type="conversation", agent_type="conversation_agent")
)
When a new tenant starts using the system, we don’t deploy anything new:
- The first message on ("triage", "tenant_42") creates agent ID ("triage_agent", "tenant_42").
- The first message on ("conversation", "tenant_42/conv_1001") creates ("conversation_agent", "tenant_42/conv_1001").
The runtime owns lifecycle; we just publish messages to the right topics. This makes it straightforward to reason about isolation boundaries and to move a subset of tenants to a separate distributed runtime if needed.
Pro Tip: Treat Topic Type and Topic Source as first‑class configuration, not ad‑hoc strings. Define a small set of allowed topic types and a clear format for topic sources (e.g., tenant_id/session_id), and write helper functions to construct them. This reduces routing bugs and makes multi‑tenant audits far easier.
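As one possible shape for those helpers, the sketch below fixes the allowed topic types in an enum and builds topic sources through validated constructors. All names here (TopicType, triage_topic, conversation_topic, tenant_of) are hypothetical, and the rule that IDs may not contain "/" is an assumption made so the composite source can be split unambiguously.

```python
from enum import Enum


class TopicType(str, Enum):
    """The closed set of topic types this (hypothetical) application allows."""
    TRIAGE = "triage"
    CONVERSATION = "conversation"


def _check_id(value: str, name: str) -> str:
    """Reject empty IDs and IDs containing the '/' separator."""
    if not value or "/" in value:
        raise ValueError(f"invalid {name}: {value!r}")
    return value


def triage_topic(tenant_id: str) -> tuple[str, str]:
    """Topic for tenant-level triage: source is the tenant ID."""
    return (TopicType.TRIAGE.value, _check_id(tenant_id, "tenant_id"))


def conversation_topic(tenant_id: str, conversation_id: str) -> tuple[str, str]:
    """Topic for one conversation: source is tenant_id/conversation_id."""
    tenant = _check_id(tenant_id, "tenant_id")
    conversation = _check_id(conversation_id, "conversation_id")
    return (TopicType.CONVERSATION.value, f"{tenant}/{conversation}")


def tenant_of(topic: tuple[str, str]) -> str:
    """Recover the tenant ID from either topic shape, e.g. for audits."""
    return topic[1].split("/", 1)[0]
```

For example, conversation_topic("tenant_42", "conv_1001") yields ("conversation", "tenant_42/conv_1001"), and tenant_of on that result returns "tenant_42", so an audit can group every topic by tenant without parsing payloads.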
Summary
AutoGen Core’s topics and subscriptions give you a runtime‑level way to do pub‑sub routing and per‑tenant/session isolation without hard‑coded agent IDs. By modeling Topic = (Topic Type, Topic Source) and using type‑based subscriptions, you let the runtime create data‑dependent agent IDs and agent instances on demand, whether you’re in a single‑tenant, multi‑topic setup or a large multi‑tenant system. Once your topic model is stable in SingleThreadedAgentRuntime, you can lift it into AgentChat patterns or a distributed runtime without changing agent behavior.