Harness Engineering for AI Agents · Self-Improvement

Building a DPO Dataset from Run Logs

Colab Notebook · ~60 min
Google Colab Notebook
Building a DPO Dataset from Run Logs
Python · ~60 min
Open in Colab
Lab Objectives
1
Parse a runs.jsonl file and extract the fields needed for each dataset type (SFT, DPO, reward, trajectory)
2
Build an SFT dataset by filtering runs to final_score >= min_score and PASS status
3
Build cross-run DPO preference pairs by finding same-task run pairs with score_delta >= min_delta
4
Build revision DPO pairs from multi-round Wiggum runs by treating round 1 output as rejected and round 2+ as chosen
5
Export all dataset types to JSONL files in HuggingFace-compatible format
6
Analyze the resulting dataset for size, score delta distribution, and task type coverage

Setup

conda activate ollama-pi
pip install datasets huggingface_hub  # optional, for HF Hub export

This lab uses a synthetic run log that the notebook generates and writes to synthetic_runs.jsonl. No real harness runs are required, but if you have a runs.jsonl from your own harness, you can use it instead.


Exercise 1: Parse runs.jsonl

Generate the synthetic run log, write it to synthetic_runs.jsonl, and study the schema of each record.

import json
import random
from datetime import datetime, timedelta

# Generate synthetic run log
# Seed the module-level RNG so the random draws used to build the log are repeatable.
random.seed(42)

def generate_output(task: str, quality: str) -> str:
    """Generate synthetic output at a given quality level.

    The first line of the output is a Markdown title derived from *task*: the
    text that precedes the first standalone word "for", with surrounding
    whitespace stripped. If the task has no standalone "for", the whole task
    is used.

    Args:
        task: The task prompt the synthetic output is supposed to answer.
        quality: ``"high"`` selects the specific, code-bearing template; any
            other value selects the vague placeholder template.

    Returns:
        The synthetic output text.
    """
    # Partition on " for " (with spaces) rather than the bare substring "for",
    # so words such as "performance" or "information" do not truncate the title.
    title = task.partition(' for ')[0].strip()

    if quality == "high":
        return f"""# {title}

## 1. Technique One
Implement using ChromaDB 0.4.x with all-MiniLM-L6-v2 embeddings:
```python
client = chromadb.Client()
collection = client.create_collection('context')

Error handling: wrap in try/except chromadb.errors.InvalidDimensionException.

2. Technique Two

Use Redis with 24h TTL for search result caching:

cache = redis.Redis(); cache.setex(key, 86400, value)
```"""

    return f"""# {title}

## 1. Technique One
Here you would implement your preferred retrieval approach depending on your setup.

## 2. Technique Two
Here you would apply context management using your specific tool of choice."""

# Task prompts that the synthetic runs are sampled from.
# make_run labels a task "enumerated" when it contains "top 5" or "3 most",
# and "best_practices" otherwise.
TASKS = [
    "Research top 5 context engineering techniques for production LLM agents save to out.md",
    "Find best practices for cost management in AI agents save to out.md",
    "Explain the 3 most common failure modes in multi-agent systems save to out.md",
]

def make_run(task, quality, session, round_num=1):
    """Fabricate one synthetic run record.

    Args:
        task: Task prompt stored in the record and passed to generate_output.
        quality: 'low' draws the round-1 score from [6.5, 8.0]; any other value
            draws it from [7.8, 9.5]. 'high' also selects the larger size
            figures and a guaranteed criteria_check of "PASS".
        session: Session identifier; also used as the run_id prefix.
        round_num: When exactly 2, a second round is simulated whose score is
            the round-1 score plus a draw from [0.3, 1.5], capped at 10.

    Returns:
        A dict with the run's identifiers, per-round scores, final verdict
        ("PASS" when the final score is at least 8.0) and generated outputs.
    """
    is_high = quality == 'high'
    revised = round_num == 2

    # Scores first: one draw for round 1, one more only if a revision happened.
    lo_bound, hi_bound = (6.5, 8.0) if quality == 'low' else (7.8, 9.5)
    r1_score = random.uniform(lo_bound, hi_bound)
    r2_score = min(10, r1_score + random.uniform(0.3, 1.5)) if revised else None
    final_score = r2_score if revised else r1_score

    # Remaining random draws, kept in the order the record fields appear.
    run_id = f"{session}_{random.randint(1000,9999)}"
    age = timedelta(hours=random.randint(0, 72))
    criteria = "PASS" if is_high else random.choice(["PASS", "FAIL"])

    round_scores = {
        "r1": {"relevance": 9, "completeness": 8, "depth": round(r1_score - 0.5), "specificity": round(r1_score - 1), "structure": 9},
    }
    if revised:
        round_scores["r2"] = {"relevance": 9, "completeness": 9, "depth": round(r2_score), "specificity": round(r2_score - 0.5), "structure": 9}

    is_enumerated = any(marker in task for marker in ("top 5", "3 most"))

    return {
        "run_id": run_id,
        "session_id": session,
        "timestamp": (datetime.now() - age).isoformat(),
        "task": task,
        "task_type": "enumerated" if is_enumerated else "best_practices",
        "producer_model": "pi-qwen-32b",
        "evaluator_model": "Qwen3-Coder:30b",
        "wiggum_scores": round_scores,
        "wiggum_rounds": 2 if revised else 1,
        "wiggum_r1_score": round(r1_score, 2),
        "final_score": round(final_score, 2),
        "final": "PASS" if final_score >= 8.0 else "FAIL",
        "output_r1": generate_output(task, quality),
        "output_r2": generate_output(task, "high") if revised else None,
        "output_bytes": 4200 if is_high else 1800,
        "output_lines": 120 if is_high else 45,
        "criteria_check": criteria,
    }

# Generate 60 synthetic runs
NUM_RUNS = 60
RUNS_PER_SESSION = 15
QUALITY_POOL = ['high', 'high', 'low']  # 2:1 high:low ratio

runs = []
for idx in range(NUM_RUNS):
    # Draw order matters for reproducibility: task, quality, then revision flag.
    picked_task = random.choice(TASKS)
    picked_quality = random.choice(QUALITY_POOL)
    rounds = 2 if random.random() > 0.6 else 1
    session_name = f"session_{idx // RUNS_PER_SESSION + 1}"
    runs.append(make_run(picked_task, picked_quality, session_name, round_num=rounds))

# Write to JSONL (one JSON object per line)
with open('synthetic_runs.jsonl', 'w') as out:
    out.writelines(json.dumps(record) + '\n' for record in runs)

print(f"Generated {len(runs)} synthetic runs")
print(f"PASS: {sum(1 for r in runs if r['final'] == 'PASS')}")
print(f"With revision: {sum(1 for r in runs if r['wiggum_rounds'] == 2)}")

Exercise 2: Build the SFT Dataset

def build_sft_dataset(runs: list[dict], min_score: float = 8.0) -> list[dict]:
    """High-quality PASS runs as instruction-following examples.

    Exercise stub: the loop body is a placeholder, so as written this returns
    an empty list regardless of input.

    Intended behavior once the placeholders are filled in: keep runs that
    passed and whose ``final_score`` is at least *min_score*, and emit one
    ``{"prompt": ..., "completion": ...}`` record per kept run.

    NOTE(review): run records carry PASS/FAIL in both ``final`` and
    ``criteria_check``; presumably ``final`` is the one meant here — confirm.

    Args:
        runs: Run records (dicts) from the run log.
        min_score: Minimum ``final_score`` a run needs to qualify.

    Returns:
        The list of SFT examples.
    """
    sft_examples = []
    for run in runs:
        # YOUR CODE: filter to PASS runs with final_score >= min_score
        # YOUR CODE: format as {"prompt": task, "completion": best_output}
        # Use output_r2 if available (revised = higher quality), else output_r1
        pass
    return sft_examples

sft = build_sft_dataset(runs, min_score=8.0)
print(f"SFT dataset: {len(sft)} examples (of {len(runs)} total runs)")
# Guard the ratio: an empty run log (for example a user-supplied runs.jsonl with
# no records) would otherwise raise ZeroDivisionError here.
coverage = len(sft) / len(runs) if runs else 0.0
print(f"Coverage: {coverage:.1%} of runs qualify")

# Inspect one example (skipped when no run qualified):
if sft:
    ex = sft[0]
    print(f"\nExample prompt (truncated): {ex['prompt'][:80]}...")
    print(f"Completion length: {len(ex['completion'])} chars")

Exercise 3: Build Cross-Run DPO Pairs

from itertools import combinations

def build_crossrun_dpo_pairs(runs: list[dict], min_delta: float = 0.5) -> list[dict]:
    """
    Find pairs of runs on the same task where score_delta >= min_delta.
    Higher score run = chosen, lower score run = rejected.

    Exercise stub: runs are already grouped by task, but the pairing step is a
    placeholder, so as written this returns an empty list.

    Args:
        runs: Run records (dicts); each must have a 'task' key, which is used
            as the grouping key.
        min_delta: Minimum absolute difference in final_score for two runs to
            form a preference pair.

    Returns:
        The list of DPO pair records. The report code that follows this
        function reads a 'score_delta' key from every pair, so the completed
        implementation needs to include it.
    """
    pairs = []
    # Group runs by task (exact match or semantic — use exact for simplicity)
    by_task = {}
    for run in runs:
        task_key = run['task']
        by_task.setdefault(task_key, []).append(run)

    for task, task_runs in by_task.items():
        # A task seen only once has nothing to be compared against.
        if len(task_runs) < 2:
            continue
        # YOUR CODE: for each pair of runs on this task:
        #   - compute delta = abs(score_a - score_b)
        #   - if delta >= min_delta, create a DPO pair
        #   - chosen = run with higher final_score
        #   - rejected = run with lower final_score
        #   - use output_r2 if available, else output_r1
        pass

    return pairs

crossrun_pairs = build_crossrun_dpo_pairs(runs, min_delta=0.5)
print(f"Cross-run DPO pairs: {len(crossrun_pairs)}")

# Show delta distribution:
# Skipped when there are no pairs, so min()/max() and the mean never see an empty list.
if crossrun_pairs:
    # Every pair is expected to carry a 'score_delta' key.
    deltas = [p['score_delta'] for p in crossrun_pairs]
    print(f"Delta range: {min(deltas):.2f} – {max(deltas):.2f}")
    print(f"Mean delta: {sum(deltas)/len(deltas):.2f}")

Exercise 4: Build Revision DPO Pairs

def build_revision_dpo_pairs(runs: list[dict], min_delta: float = 0.5) -> list[dict]:
    """
    For multi-round Wiggum runs: round 1 output = rejected, round 2 = chosen.

    Exercise stub: the loop body is a placeholder, so as written this returns
    an empty list.

    Args:
        runs: Run records (dicts) from the run log.
        min_delta: Minimum improvement (final_score minus wiggum_r1_score) a
            revised run needs in order to yield a preference pair.

    Returns:
        The list of DPO pair records built from within-run revisions.
    """
    pairs = []
    for run in runs:
        # YOUR CODE: filter to runs with wiggum_rounds >= 2 and output_r2 not None
        # YOUR CODE: compute delta = final_score - wiggum_r1_score
        # YOUR CODE: if delta >= min_delta, create a DPO pair
        pass
    return pairs

revision_pairs = build_revision_dpo_pairs(runs, min_delta=0.5)
print(f"Revision DPO pairs: {len(revision_pairs)}")

# Combine all DPO pairs:
# Plain list concatenation: cross-run pairs first, then revision pairs.
all_dpo = crossrun_pairs + revision_pairs
print(f"Total DPO pairs: {len(all_dpo)}")

Exercise 5: Export to JSONL

import os

# Create the output directory; exist_ok=True means no error if it already exists.
os.makedirs('hf_datasets', exist_ok=True)

def write_jsonl(data: list[dict], path: str):
    """Write *data* to *path* as JSON Lines and print a one-line summary.

    Args:
        data: Records to serialize; each becomes one JSON object on its own line.
        path: Destination file path. An existing file is overwritten.
    """
    with open(path, 'w') as sink:
        # Serialize lazily so records are written as they are encoded.
        sink.writelines(json.dumps(record) + '\n' for record in data)
    print(f"Wrote {len(data)} records to {path}")

# SFT and DPO: already built above, so just write them out (SFT first, then DPO).
for dataset, destination in (
    (sft, 'hf_datasets/sft.jsonl'),
    (all_dpo, 'hf_datasets/dpo.jsonl'),
):
    write_jsonl(dataset, destination)

# Reward model: all runs with scalar scores
# The completion is the revised output when one exists, else the round-1 output.
reward_data = [
    {
        "prompt": record['task'],
        "completion": record.get('output_r2') or record['output_r1'],
        "score": record['final_score'],
    }
    for record in runs
]
write_jsonl(reward_data, 'hf_datasets/reward.jsonl')

# Trajectory: multi-round runs as conversation sequences
def build_trajectory(run: dict) -> dict | None:
    if run['wiggum_rounds'] < 2 or not run.get('output_r2'):
        return None
    return {
        "task": run['task'],
        "turns": [
            {"role": "assistant", "content": run['output_r1']},
            {"role": "user",      "content": "[Evaluator feedback: improve depth and specificity]"},
            {"role": "assistant", "content": run['output_r2']},
        ],
        "final_score": run['final_score']
    }

# Keep only the runs that yield a trajectory; build_trajectory returns None for the rest.
trajectory_data = [traj for traj in map(build_trajectory, runs) if traj is not None]
write_jsonl(trajectory_data, 'hf_datasets/trajectory.jsonl')

Exercise 6: Dataset Analysis

import json
from collections import Counter

# Load and analyze the DPO dataset
# Read inside a `with` block so the file handle is closed as soon as parsing
# finishes; blank lines are skipped so they cannot make json.loads fail.
with open('hf_datasets/dpo.jsonl') as f:
    dpo_records = [json.loads(line) for line in f if line.strip()]

# YOUR CODE: compute and print:
# 1. Total pairs by source (crossrun vs. revision)
# 2. Score delta distribution (min, max, mean, median)
# 3. Task type coverage (enumerated vs. best_practices)
# 4. Chosen vs. rejected output length comparison

# Expected output:
# Source breakdown:
#   crossrun: N pairs
#   revision: N pairs
#
# Score delta distribution:
#   min: 0.50  max: 2.1  mean: 0.98  median: 0.87
#
# Task type coverage:
#   enumerated: N pairs (XX%)
#   best_practices: N pairs (XX%)
#
# Output length:
#   chosen: mean 3,200 chars
#   rejected: mean 1,900 chars

Discussion questions:

  • Why is chosen output consistently longer than rejected? Does length correlate with quality in your synthetic data?
  • What min_delta value would you choose for real runs, and why?
  • Which source (crossrun vs. revision) produces stronger preference signals (larger deltas)?