Harness Engineering for AI Agents · Production Systems

Orchestration

12 min read
By the end of this reading you will be able to:
  • Describe how the orchestrator decomposes a compound task into subtasks, including what information each subtask prompt receives
  • Explain the role of ThreadPoolExecutor in parallel subtask execution and identify the constraints that limit parallelism
  • Trace how subtask outputs are assembled into a final document and verified by a final Wiggum pass

When to Use the Orchestrator

The orchestrator is not the default — use it only when a task genuinely requires parallel, independent research across multiple non-overlapping subtopics.

Good orchestrator tasks:

  • "Research agent failure modes and context engineering, synthesize into a unified guide"
  • "Write a comprehensive overview of PPO, SAC, and DDPG for a practitioner audience"
  • "Survey the literature on LoRA fine-tuning from three angles: training efficiency, task transfer, and deployment"

Bad orchestrator tasks:

  • "Find the top 5 context engineering techniques" — single focus, agent.py handles it better
  • "Explain attention mechanisms" — single topic, no benefit from decomposition
  • "Annotate this paper" — single-document task, skill handles it better

The overhead of decomposition, parallel execution, and assembly adds latency and token cost. For single-focus tasks, the single-agent pipeline is faster and produces tighter output.

Decomposition

Before any research begins, the orchestrator sends a decomposition prompt to the model:

import json

import ollama

DECOMPOSE_PROMPT = """\
Compound task: {task}

Decompose this into 2-4 independent subtasks. Each subtask should:
- Cover a distinct, non-overlapping scope
- Be researchable independently without knowledge of other subtasks
- Produce output that will be assembled into a unified document

Respond as JSON:
[{{"id": 1, "title": "...", "scope": "...", "out_of_scope": [...]}}, ...]"""

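def decompose(task: str, model: str) -> list[dict]:
    # Send the filled-in prompt as a single user message.
    response = ollama.chat(model=model,
                           messages=[{"role": "user", "content":
                               DECOMPOSE_PROMPT.format(task=task)}])
    # json.loads raises json.JSONDecodeError if the reply is not valid JSON.
    return json.loads(response["message"]["content"])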

Each subtask dict carries both a positive scope (the scope field: what this subtask covers) and a negative scope (the out_of_scope field: what other subtasks cover, explicitly excluded here). The negative scope is what prevents role ambiguity: telling each subtask what not to cover constrains it more tightly than positive scope alone.
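
The decompose function above trusts the model to return well-formed JSON in the requested shape. A minimal sketch of a validation step follows, assuming you want to reject malformed replies before any subtask runs. The helper name parse_subtasks and the REQUIRED_KEYS constant are hypothetical and not part of the orchestrator shown in this reading; the required keys and the 2-4 range are taken from DECOMPOSE_PROMPT.

```python
import json

# Keys requested by DECOMPOSE_PROMPT; the constant itself is an assumption.
REQUIRED_KEYS = {"id", "title", "scope", "out_of_scope"}


def parse_subtasks(raw: str, min_n: int = 2, max_n: int = 4) -> list[dict]:
    """Parse and validate a decomposition reply (hypothetical helper)."""
    subtasks = json.loads(raw)  # JSONDecodeError (a ValueError) on non-JSON text
    if not isinstance(subtasks, list):
        raise ValueError("expected a JSON array of subtasks")
    if not min_n <= len(subtasks) <= max_n:
        raise ValueError(f"expected {min_n}-{max_n} subtasks, got {len(subtasks)}")
    for st in subtasks:
        if not isinstance(st, dict):
            raise ValueError("each subtask must be a JSON object")
        missing = REQUIRED_KEYS - set(st)
        if missing:
            raise ValueError(f"subtask is missing keys: {sorted(missing)}")
    # Ids key the results dict and the temp file names, so they must not repeat.
    ids = [st["id"] for st in subtasks]
    if len(set(ids)) != len(ids):
        raise ValueError("subtask ids must be unique")
    return subtasks


example_reply = json.dumps([
    {"id": 1, "title": "PPO", "scope": "on-policy updates",
     "out_of_scope": ["SAC", "DDPG"]},
    {"id": 2, "title": "SAC", "scope": "entropy-regularized off-policy learning",
     "out_of_scope": ["PPO", "DDPG"]},
])
subtasks = parse_subtasks(example_reply)
```

Checking for unique ids matters because the parallel stage uses the id both as a dictionary key and in the temp file name, so a duplicate would silently overwrite another subtask's output.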

Parallel Execution

from concurrent.futures import ThreadPoolExecutor, as_completed

MAX_WORKERS = 4

def run_parallel_subtasks(subtasks: list[dict], base_task: str,
                           producer_model: str) -> list[str]:
    results = {}

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = {}
        for st in subtasks:
            subtask_prompt = build_subtask_prompt(st, base_task, len(subtasks))
            future = executor.submit(
                run_single_agent,
                subtask_prompt,
                producer_model,
                temp_path=f"_sub_{st['id']}.md"  # temp output file
            )
            futures[future] = st["id"]

        for future in as_completed(futures):
            st_id = futures[future]
            try:
                output = future.result()
                results[st_id] = output
                log(f"[orchestrator] subtask {st_id} complete")
            except Exception as e:
                log(f"[orchestrator] subtask {st_id} failed: {e}")
                results[st_id] = f"[Subtask {st_id} failed: {e}]"

    # Return in original order
    return [results[st["id"]] for st in subtasks]
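
The listing calls build_subtask_prompt(st, base_task, len(subtasks)) without showing its body. The sketch below is one plausible shape, not the actual template: it assumes the prompt carries the compound task, the subtask's own title and scope, and the out_of_scope list, which are the only inputs visible at the call site. The wording of the prompt is entirely hypothetical.

```python
def build_subtask_prompt(st: dict, base_task: str, n_subtasks: int) -> str:
    """Hypothetical prompt builder; the real template is not shown in this reading."""
    # Render the negative scope as a bullet list so exclusions are explicit.
    excluded = "\n".join(f"- {item}" for item in st["out_of_scope"]) or "- (none listed)"
    return (
        f"You are researching one of {n_subtasks} subtasks of this compound task:\n"
        f"{base_task}\n\n"
        f"Subtask id: {st['id']}\n"
        f"Title: {st['title']}\n"
        f"In scope: {st['scope']}\n"
        f"Out of scope (covered by other subtasks):\n{excluded}\n"
    )


example_subtask = {
    "id": 2,
    "title": "Task transfer",
    "scope": "how LoRA adapters transfer across tasks",
    "out_of_scope": ["training efficiency", "deployment"],
}
prompt = build_subtask_prompt(example_subtask, "Survey LoRA fine-tuning", 3)
```

Passing the subtask count and the exclusions gives each agent enough context to stay inside its slice without seeing the other subtasks' prompts or outputs.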

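Parallelism is capped at 4 workers for three reasons:

  1. VRAM: Each agent.py instance loads a model. With keep-alive the model stays warm, but multiple concurrent calls still compete for VRAM
  2. Search rate limits: Multiple concurrent search calls to the same DDGS backend can trigger rate limiting
  3. Disk I/O: Each subtask writes a temp file; more than 4 concurrent writes on a slow disk create contention

Because a well-formed decomposition yields 2-4 subtasks, a cap of 4 lets every subtask in a run start immediately. The cap starts to bite only if the decomposition returns more subtasks than requested or the limit is raised.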
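
To see that max_workers really is the ceiling on concurrent subtasks, the sketch below submits more jobs than workers and records the highest number running at once. The function measure_peak_concurrency and the sleeping fake_subtask are illustrative stand-ins, not part of the orchestrator; the sleep substitutes for model and search latency.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 4


def measure_peak_concurrency(n_jobs: int, max_workers: int = MAX_WORKERS) -> int:
    """Run n_jobs dummy subtasks and return the peak number running together."""
    lock = threading.Lock()
    active = 0
    peak = 0

    def fake_subtask(_: int) -> None:
        nonlocal active, peak
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # stand-in for model and search latency
        with lock:
            active -= 1

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # list() drains the iterator so any worker exception would surface here.
        list(executor.map(fake_subtask, range(n_jobs)))
    return peak


peak = measure_peak_concurrency(n_jobs=10)
print(f"peak concurrent subtasks: {peak} (cap {MAX_WORKERS})")
```

The measured peak never exceeds the cap, because the pool queues any job beyond max_workers until a worker thread becomes free.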

The Temp File Pattern

Each subtask writes to a temp file (_sub_1.md, _sub_2.md, ...) rather than the final output path. This serves two purposes:

  1. If a subtask fails, the partial output is preserved for debugging
  2. The assembly stage reads from these files rather than from in-memory strings, keeping memory overhead low for very large outputs

After successful assembly, the orchestrator deletes the temp files:

import os

# Remove each subtask's temp file; skip any that were never written.
for st in subtasks:
    temp = f"_sub_{st['id']}.md"
    if os.path.exists(temp):
        os.remove(temp)
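
The temp file lifecycle described above (write, read back for assembly, delete on success) can be sketched with pathlib. The workdir parameter and the helper names temp_path_for, read_subtask_outputs, and cleanup are assumptions made for this example; the listings in this reading write to the current directory instead. A per-run working directory is one way to keep two orchestrator runs from colliding on _sub_1.md.

```python
import tempfile
from pathlib import Path


def temp_path_for(subtask_id: int, workdir: Path) -> Path:
    """Build the temp file path using the same naming scheme as the orchestrator."""
    return workdir / f"_sub_{subtask_id}.md"


def read_subtask_outputs(subtasks: list[dict], workdir: Path) -> list[str]:
    """Read outputs back from disk in the original subtask order."""
    return [temp_path_for(st["id"], workdir).read_text(encoding="utf-8")
            for st in subtasks]


def cleanup(subtasks: list[dict], workdir: Path) -> int:
    """Delete temp files that exist and return how many were removed."""
    removed = 0
    for st in subtasks:
        path = temp_path_for(st["id"], workdir)
        if path.exists():
            path.unlink()
            removed += 1
    return removed


# Demo in a throwaway directory so nothing is left behind.
with tempfile.TemporaryDirectory() as d:
    workdir = Path(d)
    demo_subtasks = [{"id": 1}, {"id": 2}]
    for st in demo_subtasks:
        temp_path_for(st["id"], workdir).write_text(f"output {st['id']}",
                                                    encoding="utf-8")
    outputs = read_subtask_outputs(demo_subtasks, workdir)
    removed = cleanup(demo_subtasks, workdir)
    leftover = list(workdir.iterdir())
```

Calling cleanup only after assembly succeeds preserves the debugging benefit: if assembly raises, the files are still on disk for inspection.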

Assembly

ASSEMBLE_PROMPT = """\
You have completed research on the following compound task:
{task}

You have {n} research outputs covering different aspects:
{subtask_summaries}

Assemble these into a unified, coherent document that:
- Synthesizes themes that appear across multiple subtasks
- Explicitly notes tensions or contradictions between subtask findings
- Avoids simple concatenation — integrate, don't just join
- Begins with a summary of the whole document

Subtask outputs follow."""

def assemble(task, subtask_outputs, subtasks, producer_model):
    summaries = "\n".join(
        f"Subtask {i+1}: {st['title']} — {st['scope']}"
        for i, st in enumerate(subtasks)
    )
    combined_context = "\n\n---\n\n".join(
        f"=== Subtask {i+1}: {subtasks[i]['title']} ===\n{output}"
        for i, output in enumerate(subtask_outputs)
    )
    prompt = ASSEMBLE_PROMPT.format(task=task, n=len(subtasks),
                                     subtask_summaries=summaries)
    return call_producer(prompt + "\n\n" + combined_context, producer_model)

The assembly call is where the orchestrator earns its cost. A naive approach would concatenate subtask outputs and call it done. The assembly prompt asks the model to identify cross-cutting themes, note contradictions, and produce a document that reads as a unified piece rather than a collage.
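
One caveat follows from the parallel stage: when a subtask raises, run_parallel_subtasks substitutes the placeholder string "[Subtask N failed: ...]", and assemble would pass that string to the model as if it were research output. A minimal sketch of a pre-assembly check is shown below, assuming you want to separate failed subtasks before building the prompt. The helper split_failed and the regular expression are hypothetical; only the placeholder format comes from the listing above.

```python
import re

# Matches the placeholder written by run_parallel_subtasks on failure.
FAILED_RE = re.compile(r"^\[Subtask (\S+) failed: ")


def split_failed(subtasks: list[dict], outputs: list[str]):
    """Partition (subtask, output) pairs into successful and failed lists."""
    ok, failed = [], []
    for st, out in zip(subtasks, outputs):
        (failed if FAILED_RE.match(out) else ok).append((st, out))
    return ok, failed


demo_subtasks = [{"id": 1, "title": "PPO"}, {"id": 2, "title": "SAC"}]
demo_outputs = ["PPO clips the policy ratio ...", "[Subtask 2 failed: timeout]"]
ok, failed = split_failed(demo_subtasks, demo_outputs)
print(f"{len(ok)} usable, {len(failed)} failed")
```

What to do with the failed list (retry, abort, or assemble from the remainder and note the gap) is a policy decision this reading does not specify.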

Final Wiggum Pass

After assembly, the document passes through a final Wiggum evaluation, the same loop used for single-agent runs:

assembled = assemble(task, subtask_outputs, subtasks, producer_model)
final_output, scores, verdict = wiggum_loop(
    task, assembled, "research",  # compound tasks are always task_type="research"
    producer_model, evaluator_model
)
write_file(output_path, final_output)

The final Wiggum pass checks the assembled document as a whole, not the individual subtasks. A document whose subtasks all pass their local Wiggum checks can still fail this final check if the integration is poor.
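
Putting the stages together, the control flow is decompose, run in parallel, assemble, then evaluate. The sketch below wires those four steps with the stage functions passed in as parameters so it can run with stubs. The orchestrate function, its parameter names, and the stub return values (including the score dict and the verdict string) are all hypothetical; only the order of the stages and the fixed "research" task type come from this reading.

```python
from typing import Callable


def orchestrate(
    task: str,
    decompose_fn: Callable[[str], list[dict]],
    run_fn: Callable[[list[dict], str], list[str]],
    assemble_fn: Callable[[str, list[str], list[dict]], str],
    wiggum_fn: Callable[[str, str, str], tuple],
) -> tuple:
    """Chain the four orchestrator stages (illustrative wiring only)."""
    subtasks = decompose_fn(task)                     # 1. split the compound task
    outputs = run_fn(subtasks, task)                  # 2. research subtasks in parallel
    assembled = assemble_fn(task, outputs, subtasks)  # 3. integrate into one document
    # 4. Evaluate the whole document; compound tasks always use "research".
    return wiggum_fn(task, assembled, "research")


# Stub stages so the wiring can be exercised without a model.
result = orchestrate(
    "Compare A and B",
    decompose_fn=lambda task: [{"id": 1, "title": "A"}, {"id": 2, "title": "B"}],
    run_fn=lambda subtasks, task: [f"notes on {st['title']}" for st in subtasks],
    assemble_fn=lambda task, outputs, subtasks: "\n\n".join(outputs),
    wiggum_fn=lambda task, doc, task_type: (doc, {"stub_score": 1.0}, "stub-verdict"),
)
final_output, scores, verdict = result
```

Injecting the stages as callables is a testing convenience for this example; the real orchestrator may simply call the module-level functions directly.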