---
name: pipeline-orchestrator
description: |
  Full reference curation pipeline coordinator with QA loop and state management.
  Triggers: run pipeline, orchestrate workflow, full curation, pipeline start.
---

# Pipeline Orchestrator

Coordinates the full reference curation workflow, handling QA loops and state management.

## Pipeline Architecture

```
[Input: Topic | URLs | Manifest]
             │
             ▼
1. reference-discovery ──────────────────┐
   (skip if URLs/manifest)               │
             │                           │
             ▼                           │
2. web-crawler-orchestrator              │
             │                           │
             ▼                           │
3. content-repository                    │
             │                           │
             ▼                           │
4. content-distiller ◄───────────────────┤
             │                           │
             ▼                           │
5. quality-reviewer                      │
             │                           │
   ┌─────────┼─────┬─────────────────┐   │
   ▼         ▼     ▼                 ▼   │
APPROVE  REJECT  REFACTOR  DEEP_RESEARCH │
   │         │     │                 │   │
   │         │     └─────────────────┴───┘
   ▼         ▼
6. markdown-exporter   archive
   │
   ▼
[Complete]
```

## Input Modes

| Mode | Example Input | Pipeline Start |
|------|--------------|----------------|
| **Topic** | `"Claude system prompts"` | Stage 1 (discovery) |
| **URLs** | `["https://docs.anthropic.com/..."]` | Stage 2 (crawler) |
| **Manifest** | Path to `manifest.json` | Stage 2 (crawler) |

## Configuration Options

```yaml
pipeline:
  max_sources: 10        # Discovery limit
  max_pages: 50          # Pages per source
  auto_approve: false    # Auto-approve above threshold
  approval_threshold: 0.85

qa_loop:
  max_refactor_iterations: 3
  max_deep_research_iterations: 2
  max_total_iterations: 5

export:
  format: project_files  # or fine_tuning, jsonl
```

## Pipeline Execution

### Stage 1: Reference Discovery

For topic-based input, search and validate authoritative sources:

```python
def run_discovery(topic, max_sources=10):
    # Uses WebSearch to find sources
    # Validates credibility
    # Outputs manifest.json with source URLs
    sources = search_authoritative_sources(topic, max_sources)
    sources = validate_and_rank_sources(sources)
    manifest_path = write_manifest(sources)
    return manifest_path
```

### Stage 2: Web Crawler

Crawl URLs from manifest or direct input:

```python
def run_crawler(input_source, max_pages=50):
    # Selects optimal crawler backend
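    # Illustrative aside (hypothetical heuristic, not part of this
    # pipeline's documented API): backend choice could be as simple as
    #     backend = "browser" if needs_js(url) else "http"
    # i.e. a plain HTTP fetch for static HTML, a headless browser only
    # for JavaScript-heavy pages.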
    # Respects rate limits
    # Stores raw content
    urls = load_urls(input_source)
    crawl_results = []
    for url in urls:
        crawl_results.append(crawl_with_best_backend(url, max_pages))
    return crawl_results
```

### Stage 3: Content Repository

Store crawled content with deduplication:

```python
def run_repository(crawl_results):
    # Deduplicates by URL hash
    # Tracks versions
    # Returns stored doc IDs
    stored_doc_ids = []
    for result in crawl_results:
        stored_doc_ids.append(store_document(result))
    return stored_doc_ids
```

### Stage 4: Content Distiller

Process raw content into structured summaries:

```python
def run_distiller(doc_ids, refactor_instructions=None):
    # Extracts key concepts
    # Generates summaries
    # Creates structured markdown
    distilled_ids = []
    for doc_id in doc_ids:
        distilled_ids.append(
            distill_document(doc_id, instructions=refactor_instructions)
        )
    return distilled_ids
```

### Stage 5: Quality Reviewer

Score and route content based on quality:

```python
def run_reviewer(distilled_ids, auto_approve=False, threshold=0.85):
    decisions = {}
    for distill_id in distilled_ids:
        score, assessment = score_content(distill_id)
        if auto_approve and score >= threshold:
            decisions[distill_id] = ('approve', None)
        elif score >= 0.85:
            decisions[distill_id] = ('approve', None)
        elif score >= 0.60:
            instructions = generate_feedback(assessment)
            decisions[distill_id] = ('refactor', instructions)
        elif score >= 0.40:
            queries = generate_research_queries(assessment)
            decisions[distill_id] = ('deep_research', queries)
        else:
            decisions[distill_id] = ('reject', assessment)
    return decisions
```

### Stage 6: Markdown Exporter

Export approved content:

```python
def run_exporter(approved_ids, format='project_files'):
    # Organizes by topic
    # Generates INDEX.md
    # Creates cross-references
    export_path = export_documents(approved_ids, format=format)
    return export_path
```

## QA Loop Handling

```python
def handle_qa_loop(distill_id, decision, iteration_tracker):
    counts = iteration_tracker.get(distill_id, {'refactor': 0, 'deep_research': 0})
    if decision == 'refactor':
        if counts['refactor'] >= MAX_REFACTOR:
            return 'needs_manual_review'
        counts['refactor'] += 1
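        # Worked example (assuming MAX_REFACTOR == 3): the first three
        # 'refactor' decisions for a document return 're_distill'; the
        # fourth trips the cap above and returns 'needs_manual_review'.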
        iteration_tracker[distill_id] = counts
        return 're_distill'
    if decision == 'deep_research':
        if counts['deep_research'] >= MAX_DEEP_RESEARCH:
            return 'needs_manual_review'
        counts['deep_research'] += 1
        iteration_tracker[distill_id] = counts
        return 're_crawl'
    return decision
```

## State Management

### MySQL Backend (Preferred)

```sql
SELECT run_id, status, current_stage, stats
FROM pipeline_runs
WHERE run_id = ?;
```

### File-Based Fallback

```
~/reference-library/pipeline_state/
├── run_001/
│   ├── state.json          # Pipeline state
│   ├── manifest.json       # Discovered sources
│   ├── crawl_results.json
│   └── review_log.json     # QA decisions
```

State JSON format:

```json
{
  "run_id": "run_001",
  "run_type": "topic",
  "input_value": "Claude system prompts",
  "status": "running",
  "current_stage": "distilling",
  "stats": {
    "sources_discovered": 5,
    "pages_crawled": 45,
    "approved": 0,
    "refactored": 0
  },
  "started_at": "2026-01-29T10:00:00Z"
}
```

## Checkpointing

Checkpoint after each stage to enable resume:

| Checkpoint | Trigger | Resume From |
|------------|---------|-------------|
| `discovery_complete` | Manifest saved | → crawler |
| `crawl_complete` | All pages crawled | → repository |
| `store_complete` | Docs in database | → distiller |
| `distill_complete` | Content processed | → reviewer |
| `review_complete` | Decisions logged | → exporter |
| `export_complete` | Files generated | Done |

## Output Summary

```json
{
  "run_id": 123,
  "status": "completed",
  "duration_minutes": 15,
  "stats": {
    "sources_discovered": 5,
    "pages_crawled": 45,
    "documents_stored": 45,
    "documents_distilled": 45,
    "approved": 40,
    "refactored": 8,
    "deep_researched": 2,
    "rejected": 3,
    "needs_manual_review": 2
  },
  "exports": {
    "format": "project_files",
    "path": "~/reference-library/exports/",
    "document_count": 40
  }
}
```

## Error Handling

On stage failure:

1. Save checkpoint with error state
2. Log error details
3. Report to user with resume instructions

```python
try:
    run_stage(stage_name)
    save_checkpoint(stage_name, 'complete')
except Exception as e:
    save_checkpoint(stage_name, 'failed', error=str(e))
    report_error(f"Pipeline paused at {stage_name}: {e}")
```
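The checkpoint table above implies a simple resume map: on restart, look up the last saved checkpoint and re-enter the pipeline at the following stage. A minimal sketch (the `next_stage` helper and `CHECKPOINT_NEXT_STAGE` mapping are illustrative assumptions, not part of the documented API):

```python
# Resume map derived from the checkpoint table (illustrative sketch).
CHECKPOINT_NEXT_STAGE = {
    None: "discovery",                  # fresh run, nothing saved yet
    "discovery_complete": "crawler",
    "crawl_complete": "repository",
    "store_complete": "distiller",
    "distill_complete": "reviewer",
    "review_complete": "exporter",
    "export_complete": None,            # pipeline already finished
}

def next_stage(last_checkpoint):
    """Return the stage to (re)run after `last_checkpoint`, or None if done."""
    return CHECKPOINT_NEXT_STAGE[last_checkpoint]
```

On a failed run, the orchestrator would resume at the stage whose checkpoint is missing; for example, `next_stage("crawl_complete")` yields `"repository"`.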