Pipeline Orchestrator
Coordinates the full reference curation workflow, handling QA loops and state management.
Pipeline Architecture
```
[Input: Topic | URLs | Manifest]
          │
          ▼
1. reference-discovery
   (skip if URLs/manifest)
          │
          ▼
2. web-crawler-orchestrator ◄────────────────┐
          │                                  │
          ▼                                  │
3. content-repository                        │
          │                                  │
          ▼                                  │
4. content-distiller ◄───────────────────┐   │
          │                              │   │
          ▼                              │   │
5. quality-reviewer                      │   │
          │                              │   │
   ┌──────┼──────┬───────────────┐       │   │
   ▼      ▼      ▼               ▼       │   │
APPROVE REJECT REFACTOR   DEEP_RESEARCH  │   │
   │      │      │               │       │   │
   │      │      └───────────────┼───────┘   │
   │      │    (re-distill)      └───────────┘
   ▼      ▼                       (re-crawl)
6. markdown-exporter    archive
   │
   ▼
[Complete]
```
Input Modes
| Mode | Example Input | Pipeline Start |
|---|---|---|
| Topic | "Claude system prompts" | Stage 1 (discovery) |
| URLs | ["https://docs.anthropic.com/..."] | Stage 2 (crawler) |
| Manifest | Path to manifest.json | Stage 2 (crawler) |
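The three input modes can be told apart mechanically. A minimal sketch, assuming the orchestrator sniffs the input shape rather than taking an explicit flag (the `detect_input_mode` helper and its return labels are illustrative, not part of the skill's API):

```python
def detect_input_mode(value):
    """Classify pipeline input as 'topic', 'urls', or 'manifest'.

    Illustrative helper: the real orchestrator may use explicit
    flags instead of inspecting the input shape.
    """
    if isinstance(value, list) and all(str(v).startswith("http") for v in value):
        return "urls"      # start at Stage 2 (crawler)
    if isinstance(value, str) and value.endswith(".json"):
        return "manifest"  # start at Stage 2 (crawler)
    return "topic"         # start at Stage 1 (discovery)
```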
Configuration Options
```yaml
pipeline:
  max_sources: 10          # Discovery limit
  max_pages: 50            # Pages per source
  auto_approve: false      # Auto-approve above threshold
  approval_threshold: 0.85

qa_loop:
  max_refactor_iterations: 3
  max_deep_research_iterations: 2
  max_total_iterations: 5

export:
  format: project_files    # or fine_tuning, jsonl
```
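Keys omitted from `pipeline_config.yaml` should fall back to the defaults above. A minimal deep-merge sketch in pure Python (YAML parsing not shown; `DEFAULTS` mirrors the template values, and `merge_config` is a hypothetical helper, not part of the skill's API):

```python
DEFAULTS = {
    "pipeline": {"max_sources": 10, "max_pages": 50,
                 "auto_approve": False, "approval_threshold": 0.85},
    "qa_loop": {"max_refactor_iterations": 3,
                "max_deep_research_iterations": 2,
                "max_total_iterations": 5},
    "export": {"format": "project_files"},
}

def merge_config(user, defaults=DEFAULTS):
    """Recursively overlay user-supplied settings onto the defaults."""
    merged = {}
    for key, default in defaults.items():
        value = user.get(key, default)
        if isinstance(default, dict) and isinstance(value, dict):
            merged[key] = merge_config(value, default)
        else:
            merged[key] = value
    return merged
```

Unknown user keys are silently dropped in this sketch; a stricter loader would reject them.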
Pipeline Execution
Stage 1: Reference Discovery
For topic-based input, search and validate authoritative sources:
```python
def run_discovery(topic, max_sources=10):
    # Uses WebSearch to find sources
    # Validates credibility
    # Outputs manifest.json with source URLs
    sources = search_authoritative_sources(topic, max_sources)
    validate_and_rank_sources(sources)
    manifest_path = write_manifest(sources)
    return manifest_path
```
Stage 2: Web Crawler
Crawl URLs from manifest or direct input:
```python
def run_crawler(input_source, max_pages=50):
    # Selects optimal crawler backend
    # Respects rate limits
    # Stores raw content
    urls = load_urls(input_source)
    crawl_results = []
    for url in urls:
        crawl_results.append(crawl_with_best_backend(url, max_pages))
    return crawl_results
```
Stage 3: Content Repository
Store crawled content with deduplication:
```python
def run_repository(crawl_results):
    # Deduplicates by URL hash
    # Tracks versions
    # Returns stored doc IDs
    stored_doc_ids = [store_document(result) for result in crawl_results]
    return stored_doc_ids
```
Stage 4: Content Distiller
Process raw content into structured summaries:
```python
def run_distiller(doc_ids, refactor_instructions=None):
    # Extracts key concepts
    # Generates summaries
    # Creates structured markdown
    distilled_ids = [
        distill_document(doc_id, instructions=refactor_instructions)
        for doc_id in doc_ids
    ]
    return distilled_ids
```
Stage 5: Quality Reviewer
Score and route content based on quality:
```python
def run_reviewer(distilled_ids, auto_approve=False, threshold=0.85):
    decisions = {}
    for distill_id in distilled_ids:
        score, assessment = score_content(distill_id)
        if score >= threshold:
            # auto_approve skips the manual confirmation step;
            # either way the content routes to the exporter.
            decisions[distill_id] = ('approve', None)
        elif score >= 0.60:
            instructions = generate_feedback(assessment)
            decisions[distill_id] = ('refactor', instructions)
        elif score >= 0.40:
            queries = generate_research_queries(assessment)
            decisions[distill_id] = ('deep_research', queries)
        else:
            decisions[distill_id] = ('reject', assessment)
    return decisions
```
Stage 6: Markdown Exporter
Export approved content:
```python
def run_exporter(approved_ids, format='project_files'):
    # Organizes by topic
    # Generates INDEX.md
    # Creates cross-references
    export_path = export_documents(approved_ids, format=format)
    return export_path
```
QA Loop Handling
```python
MAX_REFACTOR = 3       # qa_loop.max_refactor_iterations
MAX_DEEP_RESEARCH = 2  # qa_loop.max_deep_research_iterations

def handle_qa_loop(distill_id, decision, iteration_tracker):
    counts = iteration_tracker.get(distill_id, {'refactor': 0, 'deep_research': 0})
    if decision == 'refactor':
        if counts['refactor'] >= MAX_REFACTOR:
            return 'needs_manual_review'
        counts['refactor'] += 1
        iteration_tracker[distill_id] = counts
        return 're_distill'
    if decision == 'deep_research':
        if counts['deep_research'] >= MAX_DEEP_RESEARCH:
            return 'needs_manual_review'
        counts['deep_research'] += 1
        iteration_tracker[distill_id] = counts
        return 're_crawl'
    return decision
```
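Tying the review and iteration-capping logic together, the orchestrator's inner loop might look like the sketch below. This is illustrative, not the skill's actual driver: `reviewer`, `redistill`, and `recrawl` are injected callables standing in for the stage functions above, and the caps are inlined so the sketch is self-contained.

```python
MAX_REFACTOR, MAX_DEEP_RESEARCH = 3, 2  # qa_loop config values

def run_qa_cycle(distilled_ids, reviewer, redistill, recrawl):
    """Route each distilled doc through review until a terminal state."""
    tracker = {}
    approved, archived, manual = [], [], []
    pending = list(distilled_ids)
    while pending:
        decisions = reviewer(pending)
        pending = []
        for doc_id, (decision, payload) in decisions.items():
            counts = tracker.setdefault(doc_id, {'refactor': 0, 'deep_research': 0})
            if decision == 'approve':
                approved.append(doc_id)
            elif decision == 'reject':
                archived.append(doc_id)
            elif decision == 'refactor':
                if counts['refactor'] >= MAX_REFACTOR:
                    manual.append(doc_id)      # cap hit: escalate
                else:
                    counts['refactor'] += 1
                    pending.append(redistill(doc_id, payload))
            elif decision == 'deep_research':
                if counts['deep_research'] >= MAX_DEEP_RESEARCH:
                    manual.append(doc_id)      # cap hit: escalate
                else:
                    counts['deep_research'] += 1
                    pending.append(recrawl(doc_id, payload))
    return approved, archived, manual
```

The loop terminates because every non-terminal decision increments a capped counter.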
State Management
MySQL Backend (Preferred)
```sql
SELECT run_id, status, current_stage, stats
FROM pipeline_runs
WHERE run_id = ?;
```
File-Based Fallback
```
~/reference-library/pipeline_state/
├── run_001/
│   ├── state.json           # Pipeline state
│   ├── manifest.json        # Discovered sources
│   ├── crawl_results.json
│   └── review_log.json      # QA decisions
```
State JSON format:
```json
{
  "run_id": "run_001",
  "run_type": "topic",
  "input_value": "Claude system prompts",
  "status": "running",
  "current_stage": "distilling",
  "stats": {
    "sources_discovered": 5,
    "pages_crawled": 45,
    "approved": 0,
    "refactored": 0
  },
  "started_at": "2026-01-29T10:00:00Z"
}
```
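A minimal sketch of reading and writing the file-based fallback, assuming the layout above (standard-library `json` and `pathlib` only; the write goes through a temp file so a crash mid-write cannot corrupt `state.json`):

```python
import json
from pathlib import Path

def save_state(run_dir, state):
    """Persist pipeline state to <run_dir>/state.json via a temp file."""
    run_dir = Path(run_dir)
    run_dir.mkdir(parents=True, exist_ok=True)
    tmp = run_dir / "state.json.tmp"
    tmp.write_text(json.dumps(state, indent=2))
    tmp.replace(run_dir / "state.json")  # atomic rename on POSIX

def load_state(run_dir):
    """Return the saved state dict, or None if no run exists yet."""
    path = Path(run_dir) / "state.json"
    return json.loads(path.read_text()) if path.exists() else None
```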
Checkpointing
Checkpoint after each stage to enable resume:
| Checkpoint | Trigger | Resume From |
|---|---|---|
| discovery_complete | Manifest saved | → crawler |
| crawl_complete | All pages crawled | → repository |
| store_complete | Docs in database | → distiller |
| distill_complete | Content processed | → reviewer |
| review_complete | Decisions logged | → exporter |
| export_complete | Files generated | Done |
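Resume logic follows directly from the table; a sketch (the mapping keys are the checkpoint labels above, while `RESUME_FROM` and `next_stage` are illustrative names):

```python
# Last saved checkpoint -> stage to run next, in pipeline order.
RESUME_FROM = {
    None: "discovery",                # fresh run, no checkpoint yet
    "discovery_complete": "crawler",
    "crawl_complete": "repository",
    "store_complete": "distiller",
    "distill_complete": "reviewer",
    "review_complete": "exporter",
    "export_complete": None,          # nothing left to do
}

def next_stage(last_checkpoint):
    """Return the stage to execute after the given checkpoint."""
    if last_checkpoint not in RESUME_FROM:
        raise ValueError(f"unknown checkpoint: {last_checkpoint}")
    return RESUME_FROM[last_checkpoint]
```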
Output Summary
```json
{
  "run_id": 123,
  "status": "completed",
  "duration_minutes": 15,
  "stats": {
    "sources_discovered": 5,
    "pages_crawled": 45,
    "documents_stored": 45,
    "documents_distilled": 45,
    "approved": 40,
    "refactored": 8,
    "deep_researched": 2,
    "rejected": 3,
    "needs_manual_review": 2
  },
  "exports": {
    "format": "project_files",
    "path": "~/reference-library/exports/",
    "document_count": 40
  }
}
```
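The decision counts in the stats block can be derived from the review log. A sketch using `collections.Counter`, assuming the log is a list of `(doc_id, decision)` pairs with the decision labels used by the reviewer above (`summarize_decisions` is a hypothetical helper):

```python
from collections import Counter

def summarize_decisions(review_log):
    """Tally review decisions into the stats fields of the summary.

    review_log: list of (doc_id, decision) pairs; decision is one of
    approve / refactor / deep_research / reject / needs_manual_review.
    """
    tally = Counter(decision for _, decision in review_log)
    return {
        "approved": tally["approve"],
        "refactored": tally["refactor"],
        "deep_researched": tally["deep_research"],
        "rejected": tally["reject"],
        "needs_manual_review": tally["needs_manual_review"],
    }
```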
Error Handling
On stage failure:
- Save checkpoint with error state
- Log error details
- Report to user with resume instructions
```python
try:
    run_stage(stage_name)
    save_checkpoint(stage_name, 'complete')
except Exception as e:
    save_checkpoint(stage_name, 'failed', error=str(e))
    report_error(f"Pipeline paused at {stage_name}: {e}")
```