Parallel Task API: how do I run an async deep research/enrichment job and fetch the final JSON output?

In Parallel, “async deep research job” is just another way of saying “Task API run.” You send a Task request, Parallel’s processors do the web research/enrichment asynchronously, and you later fetch the final JSON output with citations, reasoning, and confidence attached to every field via Basis.

Below is a practical, code-first walkthrough of how to:

  • Create an async deep research or enrichment task
  • Poll for completion safely within typical latency bands
  • Retrieve and inspect the final JSON output (with evidence)
  • Design your schema so agents can consume the result reliably

All examples assume you’ve already created an API key and installed the Python client.


How the Task API works (mental model)

Parallel’s Task API is built for “program, run, repeat” workflows:

  • Input:
    • A natural-language instruction (e.g., “Create a market research report…”)
    • Optionally, existing structured records you want enriched (e.g., CRM accounts)
  • Processing:
    • Runs on Parallel’s AI-native web index + live crawling
    • Uses a chosen Processor tier (Lite → Ultra8x) to trade off latency vs depth
    • Returns evidence-based outputs with citations, rationale, and confidence via Basis
  • Output:
    • Deep research reports or
    • Structured enrichment JSON that fits a schema you define

Tasks are asynchronous: you create a run, get a run_id, and later fetch the result. Operationally:

  • Latency: 5s–30 minutes, depending on the processor and task complexity
  • Asynchronous by design — ideal for background research and enrichment pipelines
  • Rate limits: up to 2,000 requests/min

You pay per request, not per token, which means you know the cost of a batch before you start it (CPM-style predictability rather than “we’ll see” token bills).


Quick-start: Async deep research, then fetch JSON output

This is the minimal pattern you’ll use in production.

Step 1 – Install and import the Parallel client

pip install parallel-ai

from parallel import Parallel

client = Parallel(api_key="PARALLEL_API_KEY")

Step 2 – Create an async Task run

For deep research (single prompt), you can start as simple as:

task_run = client.task_run.create(
    input=(
        "Create a comprehensive market research report on the HVAC industry in the USA, "
        "including market size, key segments, recent M&A activity, major competitors, "
        "and a short outlook. Return the result as structured JSON."
    ),
    processor="ultra",  # or "core", "base", etc., depending on depth/latency budget
)
print(f"Run ID: {task_run.run_id}")

What’s happening here:

  • processor="ultra" chooses a higher-compute processor for deeper research (expect longer latency but better coverage and reasoning).
  • The API returns a task_run object with a run_id — this is your handle to the async job.
  • No summarization/browsing token meter: the price is per run (within the $0.005–$2.4 range depending on processor), so you can budget runs ahead of time.

Step 3 – Poll for completion and fetch final JSON

You then poll for the result. The library wraps this in a simple .result helper:

run_result = client.task_run.result(
    task_run.run_id,
    api_timeout=3600  # seconds; enough for Ultra-level deep research
)

# `run_result.output` is the final JSON payload
print(run_result.output)

Key behavior:

  • .result(run_id, api_timeout=…) will keep polling until:
    • The run succeeds and returns final JSON, or
    • The timeout is exceeded and raises an error.
  • For most production workflows, set api_timeout in the 5s–30min window that matches the processor and complexity. For Ultra-level long-tail research, 30–60 minutes is a safe upper bound.
  • run_result.output is already JSON-serializable (dict-like in Python). You can persist it directly to your database or pass it straight into downstream agents.

Enrichment: Async Task for structured CRM-style data

For enrichment jobs, you’ll typically:

  1. Pass a list/array of existing records (e.g., companies, leads, products).
  2. Define the output schema you expect (e.g., industry, headcount, tech_stack).
  3. Let Parallel run deep web research for each entity and fill the schema, with citations and confidence per field.

Here’s a concrete example.

Step 1 – Define your enrichment schema

Design a JSON structure that your system expects. For a sales use case:

enrichment_schema = {
    "type": "object",
    "properties": {
        "company_name": {"type": "string"},
        "company_website": {"type": "string"},
        "industry": {"type": "string"},
        "employee_count": {"type": "integer"},
        "hq_location": {"type": "string"},
        "funding_stage": {"type": "string"},
        "key_products": {
            "type": "array",
            "items": {"type": "string"}
        }
    },
    "required": ["company_name", "company_website"]
}

You don’t have to use formal JSON Schema syntax, but adopting it keeps validation strict and predictable for agents.
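Before handing enriched output to downstream systems, it helps to check records against the schema you defined. Here's a minimal, hand-rolled sketch that checks required fields and basic types — the `validate_record` helper and `TYPE_MAP` are illustrative names, not part of the Parallel client; for full JSON Schema semantics you'd typically reach for a library like jsonschema.

```python
# Minimal required-field and type check for enriched records.
# A sketch only: covers required fields and top-level types, not
# nested schemas, formats, or enums.

TYPE_MAP = {"string": str, "integer": int, "array": list, "object": dict}

def validate_record(record: dict, schema: dict) -> list:
    """Return a list of problems; an empty list means the record passes."""
    problems = []
    for field in schema.get("required", []):
        if record.get(field) in (None, ""):
            problems.append(f"missing required field: {field}")
    for field, spec in schema.get("properties", {}).items():
        value = record.get(field)
        expected = TYPE_MAP.get(spec.get("type"))
        if value is not None and expected and not isinstance(value, expected):
            problems.append(
                f"{field}: expected {spec['type']}, got {type(value).__name__}"
            )
    return problems
```

Run it over each element of the enrichment output and route failing records to a review queue rather than your database.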

Step 2 – Provide input records for enrichment

Let’s say you have a list of prospects:

prospects = [
    {"company_name": "Acme HVAC", "company_website": "https://acmehvac.com"},
    {"company_name": "Northwind Cooling", "company_website": "https://northwindcooling.com"},
]

Step 3 – Create an async enrichment Task

You combine the schema, records, and task instructions:

task_run = client.task_run.create(
    input={
        "instruction": (
            "For each company, research the web and enrich the record with "
            "industry, employee_count, HQ location, funding_stage, and key_products. "
            "Return an array where each element aligns with the input record index. "
            "For any field you cannot verify, leave it null and explain why in Basis."
        ),
        "records": prospects,
        "output_schema": enrichment_schema,
    },
    processor="core"  # often enough for enrichment; adjust per coverage/latency needs
)

print(f"Enrichment Run ID: {task_run.run_id}")

Notes:

  • The input here is structured: an instruction, your records, and an explicit schema.
  • Processor choice:
    • lite/base: faster, cheaper, suitable for simple lookups.
    • core: good balance across coverage, latency, and cost for most enrichment pipelines.
    • pro/ultra: for complex cases, ambiguous entities, or high-stakes compliance/regulatory data.

Step 4 – Fetch the enrichment results

run_result = client.task_run.result(
    task_run.run_id,
    api_timeout=1800  # 30 minutes; typically ample for enrichment batches
)

output = run_result.output

You’ll usually see something like:

[
  {
    "company_name": "Acme HVAC",
    "company_website": "https://acmehvac.com",
    "industry": "Heating, Ventilation, and Air Conditioning (HVAC) Services",
    "employee_count": 230,
    "hq_location": "Denver, Colorado, United States",
    "funding_stage": "Private, Bootstrapped",
    "key_products": [
      "Commercial HVAC installation",
      "Industrial cooling systems",
      "Preventive maintenance contracts"
    ],
    "_basis": {
      "industry": {
        "confidence": 0.93,
        "citations": [
          {
            "url": "https://acmehvac.com/about",
            "excerpt": "Acme HVAC is a regional provider of commercial and industrial heating and cooling services..."
          }
        ],
        "rationale": "Company website and regional business directory listings consistently categorize Acme as an HVAC services provider."
      },
      "employee_count": { "...": "..." }
    }
  },
  { "...": "..." }
]

The exact field naming may vary, but the pattern is:

  • Top-level fields: the enriched attributes you requested.
  • "_basis" (or equivalent Basis structure):
    • citations: URLs and compressed excerpts that justify each field
    • confidence: calibrated probability-like score per field
    • rationale: model’s reasoning, useful for review or programmatic filtering

This is the central design: facts are always shipped with provenance, so you can:

  • Reject low-confidence fields automatically.
  • Drive UI explanations (“We believe this is their HQ because…”).
  • Audit each enrichment run for compliance scenarios.
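The first of those — rejecting low-confidence fields — is easy to automate. Here's a sketch that assumes the `_basis` shape shown above (confidence + citations per field); the helper names, the trusted-domain list, and the 0.8 threshold are all illustrative choices, and the exact Basis keys may differ in the real response.

```python
# Sketch: accept a field only if its Basis entry clears a confidence
# threshold and cites at least one trusted domain. Assumes the `_basis`
# shape shown above; adjust key names to the actual response format.
from urllib.parse import urlparse

TRUSTED_DOMAINS = {"acmehvac.com", "sec.gov", "crunchbase.com"}  # example list

def accept_field(basis_entry: dict, min_confidence: float = 0.8) -> bool:
    """True if the field is confident enough and backed by a trusted source."""
    if basis_entry.get("confidence", 0.0) < min_confidence:
        return False
    for citation in basis_entry.get("citations", []):
        domain = urlparse(citation.get("url", "")).netloc.removeprefix("www.")
        if domain in TRUSTED_DOMAINS:
            return True
    return False

def filter_enriched(record: dict, min_confidence: float = 0.8) -> dict:
    """Null out fields whose Basis entry fails the acceptance criteria."""
    basis = record.get("_basis", {})
    cleaned = dict(record)
    for field, entry in basis.items():
        if not accept_field(entry, min_confidence):
            cleaned[field] = None
    return cleaned
```

Fields that fail the check come back as null, matching the "leave it null" convention used in the enrichment instruction earlier.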

Handling async status explicitly (optional but robust)

If you want more control than .result(...), you can query status manually. This is useful if you’re building your own worker loop or want to inspect intermediate states.

Step 1 – Create a run

Same as before:

task_run = client.task_run.create(
    input="Create a comprehensive market research report on the HVAC industry in the USA...",
    processor="ultra"
)
run_id = task_run.run_id

Step 2 – Poll status in a loop

import time

poll_interval = 10  # seconds

while True:
    current = client.task_run.get(run_id)  # or equivalent 'retrieve' call
    status = current.status

    if status in ("succeeded", "failed", "cancelled"):
        break

    print(f"Run {run_id} still {status}... waiting {poll_interval}s")
    time.sleep(poll_interval)

Step 3 – Handle terminal states and extract output

if status == "succeeded":
    output = current.output
    # process/store output here
elif status == "failed":
    print(f"Task failed with error: {current.error}")
else:
    print(f"Task ended in state: {status}")

This approach lets you:

  • Implement job dashboards showing in-progress research.
  • Align polling with your queue/worker infrastructure instead of blocking.
  • Set per-processor polling intervals (e.g., shorter for lite, longer for ultra).
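The per-processor interval idea can be sketched as a small lookup plus capped exponential backoff. The interval values below are illustrative, not official guidance — tune them to the latencies you actually observe per tier.

```python
# Sketch: starting poll interval per processor tier, with capped
# exponential backoff. Values are illustrative placeholders.

BASE_POLL_INTERVAL = {"lite": 2, "base": 5, "core": 10, "pro": 20, "ultra": 30}

def next_interval(processor: str, attempt: int, cap: int = 120) -> int:
    """Backoff schedule: base * 2^attempt, capped at `cap` seconds."""
    base = BASE_POLL_INTERVAL.get(processor, 10)  # default for unknown tiers
    return min(base * (2 ** attempt), cap)
```

In the polling loop above, you'd replace the fixed `poll_interval` with `next_interval(processor, attempt)` and increment `attempt` each iteration, so lite runs are checked eagerly while ultra runs don't hammer the API.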

Processor selection: depth vs latency vs budget

The Task API’s Processor architecture is built for explicit tradeoffs:

  • Lite/Base:
    • Latency: closer to the lower end of 5–30s for simple tasks
    • Best for: shallow enrichment, simple factual lookups, high-volume queues
    • Cost: lower CPM; ideal when you care about coverage across millions of items
  • Core:
    • Balanced latency (~10–120s typical, but still within the 5s–30min envelope)
    • Best for: most production enrichment, mid-depth market snapshots, product catalog research
  • Pro/Ultra (including scaling multipliers like Ultra2x, Ultra4x, Ultra8x):
    • Latency: can extend towards the upper end of the 5s–30min band, especially for complex multi-entity research
    • Best for: deep market/technical research, regulated domains (legal, medical, finance) where correctness and coverage matter more than latency
    • Cost: higher per request, but often replaces hours of human research or multiple LLM browsing passes

Because pricing is per request, you can:

  • Choose processors based on task complexity rather than guessing token usage.
  • Model your budget simply: CPM(processor) × number_of_runs.
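That CPM-style model fits in a few lines. The per-run prices below are placeholder values chosen within the $0.005–$2.4 range mentioned earlier — substitute the currently published price for each processor before relying on the numbers.

```python
# Sketch: per-request batch budgeting. Prices are placeholders within
# the $0.005-$2.4 per-run range; use the published pricing in practice.

PRICE_PER_RUN = {"lite": 0.005, "base": 0.02, "core": 0.05, "pro": 0.30, "ultra": 1.00}

def batch_cost(processor: str, n_runs: int) -> float:
    """Total cost of a batch: price(processor) * number_of_runs."""
    return PRICE_PER_RUN[processor] * n_runs

# e.g., enriching 10,000 records on "core" costs batch_cost("core", 10_000)
```

Because the cost is a pure function of processor and run count, you can compute it before submitting anything — no token estimation required.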

Designing outputs agents can actually use

If you’re building agents that call the Task API via tools (MCP or custom), the winning pattern is:

  1. Define a strict schema:
    • Types, required fields, and allowed values (e.g., enums for industry, stage).
  2. Include Basis in the schema:
    • Either as a dedicated _basis object or per-field attributes (e.g., industry_confidence, industry_citations).
  3. Programmatic acceptance criteria:
    • E.g., “accept a field if confidence ≥ 0.8 and at least one citation from a domain on the trusted list.”
  4. Chain-of-record:
    • Persist run_id, output, and Basis so you can re-audit any decision made downstream.

Example of a schema-aware instruction:

instruction = """
You are an enrichment engine. For each input company, research the web and fill the output_schema.
Return JSON that strictly adheres to the output_schema types and structure.
Include Basis metadata with citations, confidence, and rationale for each non-null field.
If a field cannot be confidently determined from the web, leave it null and explain why in Basis.
"""

Then pass this instruction alongside records and output_schema in your Task input.


Reliability and benchmarking context

Parallel’s Task API is optimized for evidence-based deep research, not just “browsing + summarization”:

  • Built on an AI-native web index + live crawling, not a generic SERP wrapper.
  • Output is oriented around token-dense compressed excerpts and structured fields, not human-facing snippets.
  • For verifiability, Task outputs leverage the Basis framework:
    • Citations (URLs + excerpts)
    • Reasoning/rationale
    • Calibrated confidence

On internal and public benchmarks (e.g., DeepResearch Bench, RACER, WISER-Atomic), Parallel consistently targets the Pareto frontier of accuracy vs CPM vs latency, with clear methodology sections describing:

  • Judge models used
  • Tool constraints (e.g., search-only)
  • Testing windows (to control for web drift)

That’s the backdrop for why teams use Task: to collapse multi-step “search → scrape → parse → summarize” stacks into a single async call with predictable economics.


Putting it together: end-to-end pattern

Here’s a compact blueprint you can adapt:

from parallel import Parallel
import json

client = Parallel(api_key="PARALLEL_API_KEY")

def run_deep_research():
    task_run = client.task_run.create(
        input="Create a comprehensive market research report on the HVAC industry in the USA including an analysis of recent M&A activity and other relevant details. Return as structured JSON with Basis metadata.",
        processor="ultra",
    )
    print(f"Run ID: {task_run.run_id}")

    result = client.task_run.result(task_run.run_id, api_timeout=3600)
    output = result.output

    # Persist or pass to downstream agents
    with open("hvac_market_research.json", "w") as f:
        json.dump(output, f, indent=2)

    return output

if __name__ == "__main__":
    report = run_deep_research()
    print(json.dumps(report, indent=2))

For enrichment, just swap the input to structured records + schema, choosing core or pro processors depending on how deep you need to go.


Final takeaway

To run an async deep research or enrichment job with the Parallel Task API and fetch the final JSON output, you only need three core steps:

  1. Create a Task run with task_run.create(...) — specifying your instructions, optional records/schema, and the processor tier that matches your latency and depth needs.
  2. Wait asynchronously — either using task_run.result(run_id, api_timeout=...) or your own polling loop to handle statuses.
  3. Use the JSON output + Basis — run_result.output gives you structured data with citations, reasoning, and confidence so agents and humans can trust (or reject) every atomic fact.

When you’re ready to wire this into your own agents or data workflows, you can get started with just a few lines of code.