Building a DPO Dataset from Run Logs
Setup
conda activate ollama-pi
pip install datasets huggingface_hub # optional, for HF Hub export
This lab uses a synthetic run log, synthetic_runs.jsonl, generated in the notebook. No real harness runs are required, but if you have a runs.jsonl from your own harness, you can use it instead, provided its records have the same fields.
Exercise 1: Parse runs.jsonl
Generate the synthetic run log, write it to synthetic_runs.jsonl, and get familiar with the record schema used by the later exercises.
import json
import random
from datetime import datetime, timedelta
# Generate synthetic run log
# Seed the RNG so the random scores, task picks and run IDs below are
# reproducible. Timestamps are still derived from datetime.now(), so the
# generated records are not byte-identical from one execution to the next.
random.seed(42)
def generate_output(task: str, quality: str) -> str:
    """Generate synthetic markdown output at a given quality level.

    Args:
        task: Task prompt. The text before the first standalone " for " is
            used as the document title.
        quality: "high" yields concrete, tool-specific sections with code
            snippets; any other value yields the vague placeholder text.

    Returns:
        A markdown string with a title and two "Technique" sections.
    """
    # Split on " for " (with spaces) rather than the bare substring "for",
    # which would cut titles inside words such as "performance".
    title = task.split(" for ")[0].strip()
    if quality == "high":
        # Each snippet sits in its own balanced code fence so that the prose
        # and the second heading are not swallowed by an unclosed block.
        return f"""# {title}
## 1. Technique One
Implement using ChromaDB 0.4.x with all-MiniLM-L6-v2 embeddings:
```python
client = chromadb.Client()
collection = client.create_collection('context')
```
Error handling: wrap in try/except chromadb.errors.InvalidDimensionException.
## 2. Technique Two
Use Redis with 24h TTL for search result caching:
```python
cache = redis.Redis(); cache.setex(key, 86400, value)
```"""
    return f"""# {title}
## 1. Technique One
Here you would implement your preferred retrieval approach depending on your setup.
## 2. Technique Two
Here you would apply context management using your specific tool of choice."""
# Task prompts sampled at random for the synthetic runs. make_run() labels a
# task "enumerated" when its text contains "top 5" or "3 most", and
# "best_practices" otherwise, so the wording here determines task_type.
TASKS = [
    "Research top 5 context engineering techniques for production LLM agents save to out.md",
    "Find best practices for cost management in AI agents save to out.md",
    "Explain the 3 most common failure modes in multi-agent systems save to out.md",
]
def make_run(task, quality, session, round_num=1):
    """Build one synthetic run record for the run log.

    Args:
        task: Task prompt; also used to derive ``task_type``.
        quality: ``'low'`` draws the round-1 score from 6.5-8.0, anything else
            from 7.8-9.5. ``'high'`` additionally selects the larger
            output size fields and a guaranteed criteria PASS.
        session: Session identifier; stored as ``session_id`` and used as the
            ``run_id`` prefix.
        round_num: ``2`` simulates a revision round (adds ``r2`` scores and
            ``output_r2``); any other value produces a single-round run.

    Returns:
        A JSON-serializable dict describing the run. ``final`` is "PASS"
        exactly when the stored (rounded) ``final_score`` is >= 8.0.
    """
    # NOTE: the random.* calls below are kept in a fixed order so that a given
    # seed keeps producing the same sequence of runs.
    r1_score = random.uniform(6.5, 8.0) if quality == 'low' else random.uniform(7.8, 9.5)
    r2_score = min(10, r1_score + random.uniform(0.3, 1.5)) if round_num == 2 else None
    revised = r2_score is not None

    # Round once and derive the verdict from the rounded value, so a record
    # can never show final_score == 8.0 together with final == "FAIL".
    final_score = round(r2_score if revised else r1_score, 2)

    wiggum_scores = {
        "r1": {"relevance": 9, "completeness": 8, "depth": round(r1_score - 0.5), "specificity": round(r1_score - 1), "structure": 9},
    }
    if revised:
        wiggum_scores["r2"] = {"relevance": 9, "completeness": 9, "depth": round(r2_score), "specificity": round(r2_score - 0.5), "structure": 9}

    is_high = quality == 'high'
    return {
        "run_id": f"{session}_{random.randint(1000,9999)}",
        "session_id": session,
        # Spread runs over the last 72 hours relative to the current time.
        "timestamp": (datetime.now() - timedelta(hours=random.randint(0, 72))).isoformat(),
        "task": task,
        "task_type": "enumerated" if "top 5" in task or "3 most" in task else "best_practices",
        "producer_model": "pi-qwen-32b",
        "evaluator_model": "Qwen3-Coder:30b",
        "wiggum_scores": wiggum_scores,
        "wiggum_rounds": 2 if revised else 1,
        "wiggum_r1_score": round(r1_score, 2),
        "final_score": final_score,
        "final": "PASS" if final_score >= 8.0 else "FAIL",
        "output_r1": generate_output(task, quality),
        # The revised output is always generated at "high" quality.
        "output_r2": generate_output(task, "high") if revised else None,
        "output_bytes": 4200 if is_high else 1800,
        "output_lines": 120 if is_high else 45,
        "criteria_check": "PASS" if is_high else random.choice(["PASS", "FAIL"]),
    }
# Build 60 synthetic runs, 15 per session
runs = []
for idx in range(60):
    picked_task = random.choice(TASKS)
    level = random.choice(['high', 'high', 'low'])  # 'high' listed twice -> 2:1 high:low odds
    rounds = 2 if random.random() > 0.6 else 1  # a revision round about 40% of the time
    runs.append(make_run(picked_task, level, f"session_{idx // 15 + 1}", round_num=rounds))

# Persist as JSON Lines: one record per line
with open('synthetic_runs.jsonl', 'w') as out:
    out.writelines(json.dumps(record) + '\n' for record in runs)

n_pass = sum(r['final'] == 'PASS' for r in runs)
n_revised = sum(r['wiggum_rounds'] == 2 for r in runs)
print(f"Generated {len(runs)} synthetic runs")
print(f"PASS: {n_pass}")
print(f"With revision: {n_revised}")
Exercise 2: Build the SFT Dataset
def build_sft_dataset(runs: list[dict], min_score: float = 8.0) -> list[dict]:
    """High-quality PASS runs as instruction-following examples.

    Exercise scaffold: the loop body is a placeholder, so the function
    returns an empty list until the steps below are filled in.

    Args:
        runs: Run records (one dict per run) as produced by make_run().
        min_score: Minimum final_score a run needs to be included.

    Returns:
        A list of {"prompt": ..., "completion": ...} dicts; the inspection
        code after this function reads exactly those two keys.
    """
    sft_examples = []
    for run in runs:
        # YOUR CODE: filter to PASS runs with final_score >= min_score
        #   (run['final'] holds "PASS"/"FAIL"; run['final_score'] is the score)
        # YOUR CODE: format as {"prompt": task, "completion": best_output}
        # Use output_r2 if available (revised = higher quality), else output_r1
        #   (output_r2 is None for single-round runs)
        # Append each example to sft_examples.
        pass
    return sft_examples
sft = build_sft_dataset(runs, min_score=8.0)
n_sft = len(sft)
n_total = len(runs)
print(f"SFT dataset: {n_sft} examples (of {n_total} total runs)")
print(f"Coverage: {n_sft / n_total:.1%} of runs qualify")
# Peek at the first example, if there is one:
if n_sft:
    sample = sft[0]
    preview = sample['prompt'][:80]
    print(f"\nExample prompt (truncated): {preview}...")
    print(f"Completion length: {len(sample['completion'])} chars")
Exercise 3: Build Cross-Run DPO Pairs
from itertools import combinations
def build_crossrun_dpo_pairs(runs: list[dict], min_delta: float = 0.5) -> list[dict]:
    """
    Find pairs of runs on the same task where score_delta >= min_delta.
    Higher score run = chosen, lower score run = rejected.

    Exercise scaffold: the grouping step is provided; the pairing step is a
    placeholder, so the function returns an empty list until it is filled in.

    Args:
        runs: Run records (dicts) with at least 'task', 'final_score',
            'output_r1' and 'output_r2' keys.
        min_delta: Smallest absolute final_score gap for a pair to be kept.

    Returns:
        A list of DPO pair dicts. The summary code after this function reads
        p['score_delta'] from each pair, so every pair must carry that key.
        Suggested shape (only 'score_delta' is fixed by the surrounding code):
        {"prompt", "chosen", "rejected", "score_delta", "source", "task_type"}.
    """
    pairs = []
    # Group runs by task (exact match or semantic — use exact for simplicity)
    by_task = {}
    for run in runs:
        task_key = run['task']
        by_task.setdefault(task_key, []).append(run)
    for task, task_runs in by_task.items():
        # A single run has nothing to be compared against.
        if len(task_runs) < 2:
            continue
        # YOUR CODE: for each pair of runs on this task:
        #   (itertools.combinations(task_runs, 2) yields each unordered pair once)
        # - compute delta = abs(score_a - score_b)
        # - if delta >= min_delta, create a DPO pair
        # - chosen = run with higher final_score
        # - rejected = run with lower final_score
        # - use output_r2 if available, else output_r1
        # NOTE: generate_output() returns the same text for the same
        # (task, quality), so two runs can have identical outputs; consider
        # skipping pairs whose chosen and rejected texts are equal.
        pass
    return pairs
crossrun_pairs = build_crossrun_dpo_pairs(runs, min_delta=0.5)
print(f"Cross-run DPO pairs: {len(crossrun_pairs)}")
# Summarize the score gaps (skipped when no pairs were produced):
if crossrun_pairs:
    deltas = [pair['score_delta'] for pair in crossrun_pairs]
    smallest, largest = min(deltas), max(deltas)
    print(f"Delta range: {smallest:.2f} – {largest:.2f}")
    mean_delta = sum(deltas) / len(deltas)
    print(f"Mean delta: {mean_delta:.2f}")
Exercise 4: Build Revision DPO Pairs
def build_revision_dpo_pairs(runs: list[dict], min_delta: float = 0.5) -> list[dict]:
    """
    For multi-round Wiggum runs: round 1 output = rejected, round 2 = chosen.

    Exercise scaffold: the loop body is a placeholder, so the function
    returns an empty list until the steps below are filled in.

    Args:
        runs: Run records (dicts) with 'wiggum_rounds', 'wiggum_r1_score',
            'final_score', 'output_r1' and 'output_r2' keys.
        min_delta: Smallest round-1 -> final score improvement to keep a pair.

    Returns:
        A list of DPO pair dicts. To combine cleanly with the cross-run pairs,
        use the same keys as in that exercise (presumably including
        'score_delta' and a 'source' marker — match your own earlier solution).
    """
    pairs = []
    for run in runs:
        # YOUR CODE: filter to runs with wiggum_rounds >= 2 and output_r2 not None
        # YOUR CODE: compute delta = final_score - wiggum_r1_score
        #   (in the synthetic data the revision adds between 0.3 and 1.5
        #   points, capped at a score of 10)
        # YOUR CODE: if delta >= min_delta, create a DPO pair
        # NOTE: make_run() always generates output_r2 at "high" quality, so for
        # runs whose round-1 quality was already "high" the two texts are
        # identical; consider skipping pairs where chosen == rejected.
        pass
    return pairs
revision_pairs = build_revision_dpo_pairs(runs, min_delta=0.5)
print(f"Revision DPO pairs: {len(revision_pairs)}")
# Merge both pair sources into one list (cross-run first, then revision):
all_dpo = [*crossrun_pairs, *revision_pairs]
print(f"Total DPO pairs: {len(all_dpo)}")
Exercise 5: Export to JSONL
import os
# Create the export directory up front; exist_ok=True keeps this from failing
# when the directory already exists (e.g. when the cell is re-run).
os.makedirs('hf_datasets', exist_ok=True)
def write_jsonl(data: list[dict], path: str):
    """Write each item of *data* to *path* as one JSON document per line.

    The file is opened in write mode, so any existing content is replaced.
    A short confirmation with the record count is printed afterwards.
    """
    serialized = (json.dumps(record) + '\n' for record in data)
    with open(path, 'w') as handle:
        handle.writelines(serialized)
    count = len(data)
    print(f"Wrote {count} records to {path}")
# SFT and DPO exports
write_jsonl(sft, 'hf_datasets/sft.jsonl')
write_jsonl(all_dpo, 'hf_datasets/dpo.jsonl')
# Reward model: every run with its scalar score; the revised output is used
# when one exists, otherwise the round-1 output
reward_data = []
for r in runs:
    reward_data.append({
        "prompt": r['task'],
        "completion": r.get('output_r2') or r['output_r1'],
        "score": r['final_score'],
    })
write_jsonl(reward_data, 'hf_datasets/reward.jsonl')
# Trajectory: multi-round runs as conversation sequences
def build_trajectory(run: dict) -> dict | None:
if run['wiggum_rounds'] < 2 or not run.get('output_r2'):
return None
return {
"task": run['task'],
"turns": [
{"role": "assistant", "content": run['output_r1']},
{"role": "user", "content": "[Evaluator feedback: improve depth and specificity]"},
{"role": "assistant", "content": run['output_r2']},
],
"final_score": run['final_score']
}
# Keep only the runs that actually produced a trajectory (None = no revision)
trajectory_data = []
for r in runs:
    t = build_trajectory(r)
    if t is not None:
        trajectory_data.append(t)
write_jsonl(trajectory_data, 'hf_datasets/trajectory.jsonl')
Exercise 6: Dataset Analysis
import json
from collections import Counter
# Load and analyze the DPO dataset.
# The context manager closes the file once all records are parsed (a bare
# open() inside the comprehension would leave the handle open); blank lines
# are skipped so a stray empty line does not raise a JSON decode error.
with open('hf_datasets/dpo.jsonl') as dpo_file:
    dpo_records = [json.loads(line) for line in dpo_file if line.strip()]
# YOUR CODE: compute and print:
# 1. Total pairs by source (crossrun vs. revision) — requires a "source" field on each pair
# 2. Score delta distribution (min, max, mean, median)
# 3. Task type coverage (enumerated vs. best_practices) — requires a "task_type" field on each pair
# 4. Chosen vs. rejected output length comparison
# Expected output:
# Source breakdown:
# crossrun: N pairs
# revision: N pairs
#
# Score delta distribution:
# min: 0.50 max: 2.10 mean: 0.98 median: 0.87
#
# Task type coverage:
# enumerated: N pairs (XX%)
# best_practices: N pairs (XX%)
#
# Output length:
# chosen: mean 3,200 chars
# rejected: mean 1,900 chars
Discussion questions:
- Why is chosen output consistently longer than rejected? Does length correlate with quality in your synthetic data?
- What min_delta value would you choose for real runs, and why?
- Which source (crossrun vs. revision) produces stronger preference signals (larger deltas)?