
Parallel Task API: how do I run an async deep research/enrichment job and fetch the final JSON output?
In Parallel, “async deep research job” is just another way of saying “Task API run.” You send a Task request, Parallel’s processors do the web research/enrichment asynchronously, and you later fetch the final JSON output with citations, reasoning, and confidence attached to every field via Basis.
Below is a practical, code-first walkthrough of how to:
- Create an async deep research or enrichment task
- Poll for completion safely within typical latency bands
- Retrieve and inspect the final JSON output (with evidence)
- Design your schema so agents can consume the result reliably
All examples assume you’ve already created an API key and installed the Python client.
How the Task API works (mental model)
Parallel’s Task API is built for “program, run, repeat” workflows:
- Input:
- A natural-language instruction (e.g., “Create a market research report…”)
- Optionally, existing structured records you want enriched (e.g., CRM accounts)
- Processing:
- Runs on Parallel’s AI-native web index + live crawling
- Uses a chosen Processor tier (Lite → Ultra8x) to trade off latency vs depth
- Returns evidence-based outputs with citations, rationale, and confidence via Basis
- Output:
- Deep research reports or
- Structured enrichment JSON that fits a schema you define
Tasks are asynchronous: you create a run, get a `run_id`, and later fetch the result. Operationally:
- Latency: 5s–30 minutes, depending on the processor and task complexity
- Asynchronous by design: ideal for background research and enrichment pipelines
- Rate limits: up to 2,000 requests/min
You pay per request, not per token, which means you know the cost of a batch before you start it (CPM-style predictability rather than “we’ll see” token bills).
Quick-start: Async deep research, then fetch JSON output
This is the minimal pattern you’ll use in production.
Step 1 – Install and import the Parallel client
```shell
pip install parallel-ai
```

```python
from parallel import Parallel

client = Parallel(api_key="PARALLEL_API_KEY")
```
Step 2 – Create an async Task run
For deep research (single prompt), you can start as simple as:
```python
task_run = client.task_run.create(
    input=(
        "Create a comprehensive market research report on the HVAC industry in the USA, "
        "including market size, key segments, recent M&A activity, major competitors, "
        "and a short outlook. Return the result as structured JSON."
    ),
    processor="ultra",  # or "core", "base", etc., depending on depth/latency budget
)

print(f"Run ID: {task_run.run_id}")
```
What’s happening here:
- `processor="ultra"` chooses a higher-compute processor for deeper research (expect longer latency but better coverage and reasoning).
- The API returns a `task_run` object with a `run_id`: this is your handle to the async job.
- No summarization/browsing token meter: the price is per run (within the $0.005–$2.40 range depending on processor), so you can budget runs ahead of time.
Step 3 – Poll for completion and fetch final JSON
You then poll for the result. The library wraps this in a simple `.result` helper:
```python
run_result = client.task_run.result(
    task_run.run_id,
    api_timeout=3600  # seconds; enough for Ultra-level deep research
)

# `run_result.output` is the final JSON payload
print(run_result.output)
```
Key behavior:
- `.result(run_id, api_timeout=...)` keeps polling until the run succeeds and returns final JSON, or the timeout is exceeded and an error is raised.
- For most production workflows, set `api_timeout` in the 5s–30min window that matches the processor and complexity. For Ultra-level long-tail research, 30–60 minutes is a safe upper bound.
- `run_result.output` is already JSON-serializable (dict-like in Python). You can persist it directly to your database or pass it straight into downstream agents.
Enrichment: Async Task for structured CRM-style data
For enrichment jobs, you’ll typically:
- Pass a list/array of existing records (e.g., companies, leads, products).
- Define the output schema you expect (e.g., `industry`, `headcount`, `tech_stack`).
- Let Parallel run deep web research for each entity and fill the schema, with citations and confidence per field.
Here’s a concrete example.
Step 1 – Define your enrichment schema
Design a JSON structure that your system expects. For a sales use case:
```python
enrichment_schema = {
    "type": "object",
    "properties": {
        "company_name": {"type": "string"},
        "company_website": {"type": "string"},
        "industry": {"type": "string"},
        "employee_count": {"type": "integer"},
        "hq_location": {"type": "string"},
        "funding_stage": {"type": "string"},
        "key_products": {
            "type": "array",
            "items": {"type": "string"}
        }
    },
    "required": ["company_name", "company_website"]
}
```
You don’t have to use JSON Schema syntax strictly, but it’s a good way to keep validation strict and predictable for agents.
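If you do want light client-side validation, a minimal check against this schema dict can be sketched in a few lines. This is an illustrative helper (not part of the Parallel SDK) that only verifies required fields and primitive types; a real pipeline would use a full JSON Schema validator.

```python
# Minimal client-side check against a schema dict like the one above:
# verifies required fields are present and primitive types match.
# Illustrative sketch only, not a full JSON Schema implementation.
TYPE_MAP = {"string": str, "integer": int, "array": list, "object": dict}

def check_record(record: dict, schema: dict) -> list[str]:
    errors = []
    for field in schema.get("required", []):
        if record.get(field) is None:
            errors.append(f"missing required field: {field}")
    for field, spec in schema.get("properties", {}).items():
        value = record.get(field)
        expected = TYPE_MAP.get(spec.get("type"))
        if value is not None and expected and not isinstance(value, expected):
            errors.append(
                f"{field}: expected {spec['type']}, got {type(value).__name__}"
            )
    return errors
```

An empty list means the record passes; otherwise you get one message per violation, which is easy to log or attach to a review queue.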
Step 2 – Provide input records for enrichment
Let’s say you have a list of prospects:
```python
prospects = [
    {"company_name": "Acme HVAC", "company_website": "https://acmehvac.com"},
    {"company_name": "Northwind Cooling", "company_website": "https://northwindcooling.com"},
]
```
Step 3 – Create an async enrichment Task
You combine the schema, records, and task instructions:
```python
task_run = client.task_run.create(
    input={
        "instruction": (
            "For each company, research the web and enrich the record with "
            "industry, employee_count, HQ location, funding_stage, and key_products. "
            "Return an array where each element aligns with the input record index. "
            "For any field you cannot verify, leave it null and explain why in Basis."
        ),
        "records": prospects,
        "output_schema": enrichment_schema,
    },
    processor="core"  # often enough for enrichment; adjust per coverage/latency needs
)

print(f"Enrichment Run ID: {task_run.run_id}")
```
Notes:
- The `input` here is structured: an instruction, your records, and an explicit schema.
- Processor choice:
  - `lite`/`base`: faster, cheaper, suitable for simple lookups.
  - `core`: good balance across coverage, latency, and cost for most enrichment pipelines.
  - `pro`/`ultra`: for complex cases, ambiguous entities, or high-stakes compliance/regulatory data.
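The tier guidance above can be encoded as a small lookup so callers don't hardcode processor names throughout a pipeline. The tier names come from this guide; the complexity labels and the helper itself are our own convention, not part of the SDK.

```python
# Hypothetical helper mapping a task-complexity label to a processor
# tier, following the guidance above. Labels are our own convention.
PROCESSOR_BY_COMPLEXITY = {
    "simple_lookup": "lite",
    "standard_enrichment": "core",
    "ambiguous_entity": "pro",
    "deep_research": "ultra",
}

def pick_processor(complexity: str) -> str:
    # Fall back to the balanced "core" tier for unknown labels.
    return PROCESSOR_BY_COMPLEXITY.get(complexity, "core")
```

You would then pass `processor=pick_processor("standard_enrichment")` when creating the run.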
Step 4 – Fetch the enrichment results
```python
run_result = client.task_run.result(
    task_run.run_id,
    api_timeout=1800  # 30 minutes; typically ample for enrichment batches
)

output = run_result.output
```
You’ll usually see something like:
```json
[
  {
    "company_name": "Acme HVAC",
    "company_website": "https://acmehvac.com",
    "industry": "Heating, Ventilation, and Air Conditioning (HVAC) Services",
    "employee_count": 230,
    "hq_location": "Denver, Colorado, United States",
    "funding_stage": "Private, Bootstrapped",
    "key_products": [
      "Commercial HVAC installation",
      "Industrial cooling systems",
      "Preventive maintenance contracts"
    ],
    "_basis": {
      "industry": {
        "confidence": 0.93,
        "citations": [
          {
            "url": "https://acmehvac.com/about",
            "excerpt": "Acme HVAC is a regional provider of commercial and industrial heating and cooling services..."
          }
        ],
        "rationale": "Company website and regional business directory listings consistently categorize Acme as an HVAC services provider."
      },
      "employee_count": { "...": "..." }
    }
  },
  { "...": "..." }
]
```
The exact field naming may vary, but the pattern is:
- Top-level fields: the enriched attributes you requested.
- `_basis` (or equivalent Basis structure):
  - `citations`: URLs and compressed excerpts that justify each field
  - `confidence`: calibrated probability-like score per field
  - `rationale`: the model's reasoning, useful for review or programmatic filtering
This is the central design: facts are always shipped with provenance, so you can:
- Reject low-confidence fields automatically.
- Drive UI explanations (“We believe this is their HQ because…”).
- Audit each enrichment run for compliance scenarios.
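The first of those, automatic rejection of low-confidence fields, can be sketched with a small filter. It assumes the `_basis` shape shown in the sample output above; adapt the key names to whatever your actual payload uses.

```python
# Sketch of automatic acceptance: drop enriched fields whose Basis
# confidence falls below a threshold. Assumes the "_basis" structure
# from the sample output; key names may differ in real payloads.
def filter_by_confidence(record: dict, min_confidence: float = 0.8):
    basis = record.get("_basis", {})
    kept, rejected = {}, []
    for field, value in record.items():
        if field == "_basis":
            continue
        conf = basis.get(field, {}).get("confidence")
        if conf is not None and conf < min_confidence:
            rejected.append(field)
        else:
            kept[field] = value
    return kept, rejected
```

Fields with no Basis entry are kept by default here; you could just as easily reject them, depending on how strict your pipeline needs to be.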
Handling async status explicitly (optional but robust)
If you want more control than `.result(...)`, you can query status manually. This is useful if you're building your own worker loop or want to inspect intermediate states.
Step 1 – Create a run
Same as before:
```python
task_run = client.task_run.create(
    input="Create a comprehensive market research report on the HVAC industry in the USA...",
    processor="ultra"
)

run_id = task_run.run_id
```
Step 2 – Poll status in a loop
```python
import time

poll_interval = 10  # seconds

while True:
    current = client.task_run.get(run_id)  # or equivalent 'retrieve' call
    status = current.status
    if status in ("succeeded", "failed", "cancelled"):
        break
    print(f"Run {run_id} still {status}... waiting {poll_interval}s")
    time.sleep(poll_interval)
```
Step 3 – Handle terminal states and extract output
```python
if status == "succeeded":
    output = current.output
    # process/store output here
elif status == "failed":
    print(f"Task failed with error: {current.error}")
else:
    print(f"Task ended in state: {status}")
```
This approach lets you:
- Implement job dashboards showing in-progress research.
- Align polling with your queue/worker infrastructure instead of blocking.
- Set per-processor polling intervals (e.g., shorter for `lite`, longer for `ultra`).
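That last point can be sketched as a per-processor backoff schedule: start with a tier-appropriate interval and double it on each poll, up to a ceiling. The specific interval values here are illustrative choices, not official guidance.

```python
# Illustrative per-processor polling intervals with exponential backoff,
# capped at a ceiling. Base values are our own assumptions, not official.
BASE_INTERVAL = {"lite": 2, "base": 3, "core": 5, "pro": 15, "ultra": 30}

def poll_interval(processor: str, attempt: int, cap: int = 120) -> int:
    """Seconds to wait before poll number `attempt` (0-indexed)."""
    base = BASE_INTERVAL.get(processor, 5)
    return min(base * (2 ** attempt), cap)
```

In the loop above you would replace the fixed `poll_interval = 10` with `time.sleep(poll_interval("ultra", attempt))` and increment `attempt` each iteration.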
Processor selection: depth vs latency vs budget
The Task API’s Processor architecture is built for explicit tradeoffs:
- `Lite`/`Base`:
  - Latency: closer to the lower end of 5–30s for simple tasks
  - Best for: shallow enrichment, simple factual lookups, high-volume queues
  - Cost: lower CPM; ideal when you care about coverage across millions of items
- `Core`:
  - Balanced latency (~10–120s typical, but still within the 5s–30min envelope)
  - Best for: most production enrichment, mid-depth market snapshots, product catalog research
- `Pro`/`Ultra` (including scaling multipliers like Ultra2x, Ultra4x, Ultra8x):
  - Latency: can extend towards the upper end of the 5s–30min band, especially for complex multi-entity research
  - Best for: deep market/technical research, regulated domains (legal, medical, finance) where correctness and coverage matter more than latency
  - Cost: higher per request, but often replaces hours of human research or multiple LLM browsing passes

Because pricing is per request, you can:
- Choose processors based on task complexity rather than guessing token usage.
- Model your budget simply: `CPM(processor) × number_of_runs`.
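That budget model is simple enough to sketch directly. The per-run prices below are placeholder values within the $0.005–$2.40 range mentioned earlier, not actual pricing; check the current price list before relying on them.

```python
# Budget model sketch: cost of a batch = price_per_run * number_of_runs.
# Prices are hypothetical placeholders, NOT actual Parallel pricing.
PRICE_PER_RUN = {"lite": 0.005, "core": 0.05, "ultra": 2.40}  # USD, assumed

def batch_cost(processor: str, n_runs: int) -> float:
    """Predicted spend for a batch, known before the first request is sent."""
    return round(PRICE_PER_RUN[processor] * n_runs, 2)
```

Because pricing is per request rather than per token, this number is exact up front, which is the point of the CPM-style model.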
Designing outputs agents can actually use
If you’re building agents that call the Task API via tools (MCP or custom), the winning pattern is:
- Define a strict schema:
  - Types, required fields, and allowed values (e.g., enums for `industry`, `stage`).
- Include Basis in the schema:
  - Either as a dedicated `_basis` object or per-field attributes (e.g., `industry_confidence`, `industry_citations`).
- Programmatic acceptance criteria:
  - E.g., "accept a field if confidence ≥ 0.8 and at least one citation from a domain on the trusted list."
- Chain-of-record:
  - Persist `run_id`, output, and Basis so you can re-audit any decision made downstream.
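The acceptance-criteria bullet translates into a small predicate: clear the confidence threshold and cite at least one trusted domain. This sketch assumes the `_basis` entry shape from the sample output earlier; the trusted-domain set is yours to define.

```python
from urllib.parse import urlparse

# Acceptance sketch: a field passes only if its Basis confidence clears
# the threshold AND at least one citation URL is on a trusted domain.
# Assumes the "_basis" entry shape from the sample output above.
def accept_field(basis_entry: dict, trusted: set[str], min_conf: float = 0.8) -> bool:
    if basis_entry.get("confidence", 0.0) < min_conf:
        return False
    return any(
        urlparse(c.get("url", "")).netloc in trusted
        for c in basis_entry.get("citations", [])
    )
```

Run this over each field's Basis entry before persisting, and store the `run_id` alongside the accepted values so the decision can be re-audited later.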
Example of a schema-aware instruction:
```python
instruction = """
You are an enrichment engine. For each input company, research the web and fill the output_schema.
Return JSON that strictly adheres to the output_schema types and structure.
Include Basis metadata with citations, confidence, and rationale for each non-null field.
If a field cannot be confidently determined from the web, leave it null and explain why in Basis.
"""
```
Then pass this instruction alongside records and output_schema in your Task input.
Reliability and benchmarking context
Parallel’s Task API is optimized for evidence-based deep research, not just “browsing + summarization”:
- Built on an AI-native web index + live crawling, not a generic SERP wrapper.
- Output is oriented around token-dense compressed excerpts and structured fields, not human-facing snippets.
- For verifiability, Task outputs leverage the Basis framework:
- Citations (URLs + excerpts)
- Reasoning/rationale
- Calibrated confidence
On internal and public benchmarks (e.g., DeepResearch Bench, RACER, WISER-Atomic), Parallel consistently targets the Pareto frontier of accuracy vs CPM vs latency, with clear methodology sections describing:
- Judge models used
- Tool constraints (e.g., search-only)
- Testing windows (to control for web drift)
That’s the backdrop for why teams use Task: to collapse multi-step “search → scrape → parse → summarize” stacks into a single async call with predictable economics.
Putting it together: end-to-end pattern
Here’s a compact blueprint you can adapt:
```python
from parallel import Parallel
import json

client = Parallel(api_key="PARALLEL_API_KEY")

def run_deep_research():
    task_run = client.task_run.create(
        input=(
            "Create a comprehensive market research report on the HVAC industry "
            "in the USA including an analysis of recent M&A activity and other "
            "relevant details. Return as structured JSON with Basis metadata."
        ),
        processor="ultra",
    )
    print(f"Run ID: {task_run.run_id}")

    result = client.task_run.result(task_run.run_id, api_timeout=3600)
    output = result.output

    # Persist or pass to downstream agents
    with open("hvac_market_research.json", "w") as f:
        json.dump(output, f, indent=2)
    return output

if __name__ == "__main__":
    report = run_deep_research()
    print(json.dumps(report, indent=2))
```
For enrichment, just swap the input to structured records + schema, choosing `core` or `pro` processors depending on how deep you need to go.
Final takeaway
To run an async deep research or enrichment job with the Parallel Task API and fetch the final JSON output, you only need three core steps:
- Create a Task run with `task_run.create(...)`, specifying your instructions, optional records/schema, and the processor tier that matches your latency and depth needs.
- Wait asynchronously, either using `task_run.result(run_id, api_timeout=...)` or your own polling loop to handle statuses.
- Use the JSON output + Basis: `run_result.output` gives you structured data with citations, reasoning, and confidence so agents and humans can trust (or reject) every atomic fact.
When you're ready to wire this into your own agents or data workflows, you can get started with just a few lines of code.