Andrew Yim d1cd1298a8 feat(reference-curator): Add pipeline orchestrator and refactor skill format
Pipeline Orchestrator:
- Add 07-pipeline-orchestrator skill with code/CLAUDE.md and desktop/SKILL.md
- Add /reference-curator-pipeline slash command for full workflow automation
- Add pipeline_runs and pipeline_iteration_tracker tables to schema.sql
- Add v_pipeline_status and v_pipeline_iterations views
- Add pipeline_config.yaml configuration template
- Update AGENTS.md with Reference Curator Skills section
- Update claude-project files with pipeline documentation

Skill Format Refactoring:
- Extract YAML frontmatter from SKILL.md files to separate skill.yaml
- Add tools/ directories with MCP tool documentation
- Update SKILL-FORMAT-REQUIREMENTS.md with new structure
- Add migrate-skill-structure.py script for format conversion

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 01:01:02 +07:00

Pipeline Orchestrator

Coordinates the full 6-skill reference curation workflow with QA loop handling.

Trigger Keywords

"curate references", "full pipeline", "run curation", "reference-curator-pipeline"

Architecture

[Input] → discovery → crawler → repository → distiller ◄──┐
                                                    │     │
                                              reviewer     │
                                                    │     │
                    ┌───────────────────────────────┼─────┤
                    ▼           ▼                   ▼     │
                 APPROVE     REJECT          REFACTOR ────┤
                    │           │                         │
                    ▼           ▼                DEEP_RESEARCH
                 export      archive                 │
                                                     ▼
                                                  crawler ─┘

Input Detection

Parse input to determine mode:

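import os

def detect_input_mode(input_value):
    """Classify the input as a manifest file, a URL, or a free-text topic."""
    # An existing .json path is treated as a discovery manifest
    if input_value.endswith('.json') and os.path.exists(input_value):
        return 'manifest'
    # str.startswith accepts a tuple of prefixes
    elif input_value.startswith(('http://', 'https://')):
        return 'urls'
    else:
        return 'topic'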

Pipeline Execution

Stage 1: Reference Discovery (Topic Mode Only)

# Skip if input mode is 'urls' or 'manifest'
if mode == 'topic':
    /reference-discovery "$TOPIC" --max-sources $MAX_SOURCES
    # Output: manifest.json

Stage 2: Web Crawler

# From manifest or URLs
/web-crawler $INPUT --max-pages $MAX_PAGES
# Output: crawled files in ~/reference-library/raw/

Stage 3: Content Repository

/content-repository store
# Output: documents stored in MySQL or file-based storage

Stage 4: Content Distiller

/content-distiller all-pending
# Output: distilled content records

Stage 5: Quality Reviewer

if auto_approve:
    /quality-reviewer all-pending --auto-approve --threshold $THRESHOLD
else:
    /quality-reviewer all-pending

Handle QA decisions:

  • APPROVE: Add to export queue
  • REFACTOR: Re-run distiller with feedback (track iteration count)
  • DEEP_RESEARCH: Run crawler for additional sources, then distill
  • REJECT: Archive with reason
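The four decisions above can be read as a routing table. The following is a minimal sketch, not the orchestrator's actual dispatch code: the `Decision` enum, the `NEXT_STEPS` mapping, and the `route` helper are hypothetical names, and the assumption that a reworked document returns to the reviewer follows the architecture diagram.

```python
from enum import Enum


class Decision(Enum):
    APPROVE = "approve"
    REFACTOR = "refactor"
    DEEP_RESEARCH = "deep_research"
    REJECT = "reject"


# Hypothetical routing table: decision -> ordered follow-up steps.
NEXT_STEPS = {
    Decision.APPROVE: ["export"],
    Decision.REFACTOR: ["distiller", "reviewer"],
    Decision.DEEP_RESEARCH: ["crawler", "distiller", "reviewer"],
    Decision.REJECT: ["archive"],
}


def route(decision_label):
    """Map a reviewer decision label (any case) to its follow-up steps."""
    # Enum lookup by value raises ValueError for an unknown label.
    decision = Decision(decision_label.strip().lower())
    return NEXT_STEPS[decision]
```

Keeping the routing in data rather than in branching code makes it easy to compare against the diagram when a new decision type is introduced.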

Stage 6: Markdown Exporter

/markdown-exporter $EXPORT_FORMAT
# Output: files in ~/reference-library/exports/
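As a rough illustration of how the six stages chain together, the sketch below assembles the slash commands shown above into an ordered list. The `build_commands` helper and its defaults are assumptions for illustration (the defaults mirror the configuration template), and passing `manifest.json` to the crawler in topic mode is inferred from the Stage 1 output note.

```python
def build_commands(mode, input_value, max_sources=10, max_pages=50,
                   auto_approve=False, threshold=0.85,
                   export_format="project_files"):
    """Return the slash commands for one pipeline run, in execution order."""
    commands = []
    if mode == "topic":
        # Discovery runs only in topic mode and produces manifest.json.
        commands.append(
            f'/reference-discovery "{input_value}" --max-sources {max_sources}'
        )
        crawl_input = "manifest.json"  # assumption: crawler reads the manifest
    else:
        crawl_input = input_value  # a manifest path or a URL
    commands.append(f"/web-crawler {crawl_input} --max-pages {max_pages}")
    commands.append("/content-repository store")
    commands.append("/content-distiller all-pending")
    review = "/quality-reviewer all-pending"
    if auto_approve:
        review += f" --auto-approve --threshold {threshold}"
    commands.append(review)
    commands.append(f"/markdown-exporter {export_format}")
    return commands
```

The list covers a single pass only; documents sent back by the QA loop would re-enter at the distiller or crawler.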

State Management

Initialize Pipeline State

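from datetime import datetime

# save_state is the persistence helper for the configured backend (MySQL or file)
def init_pipeline_state(run_id, input_value, options):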
    state = {
        "run_id": run_id,
        "run_type": detect_input_mode(input_value),
        "input_value": input_value,
        "status": "running",
        "current_stage": "discovery",
        "options": options,
        "stats": {
            "sources_discovered": 0,
            "pages_crawled": 0,
            "documents_stored": 0,
            "documents_distilled": 0,
            "approved": 0,
            "refactored": 0,
            "deep_researched": 0,
            "rejected": 0,
            "needs_manual_review": 0
        },
        "started_at": datetime.now().isoformat()
    }
    save_state(run_id, state)
    return state

MySQL State (Preferred)

INSERT INTO pipeline_runs (run_type, input_value, options)
VALUES ('topic', 'Claude system prompts', '{"max_sources": 10}');

File-Based Fallback

~/reference-library/pipeline_state/run_XXX/
├── state.json           # Current stage and stats
├── manifest.json        # Discovered sources
├── crawl_results.json   # Crawled document paths
├── review_log.json      # QA decisions per document
└── errors.log           # Any errors encountered
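One possible shape for the file-based `save_state` and `load_state` helpers is sketched below. It is an assumption, not the project's implementation: the `run_{run_id}` directory naming, the `root` parameter, and the write-then-rename step are all illustrative choices.

```python
import json
from pathlib import Path

# Assumption: one directory per run under this root, as in the tree above.
STATE_ROOT = Path.home() / "reference-library" / "pipeline_state"


def state_path(run_id, root=STATE_ROOT):
    """Location of state.json for a run (directory naming is hypothetical)."""
    return Path(root) / f"run_{run_id}" / "state.json"


def save_state(run_id, state, root=STATE_ROOT):
    """Write state.json via a temporary file so a crash cannot truncate it."""
    path = state_path(run_id, root)
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_name(path.name + ".tmp")
    tmp.write_text(json.dumps(state, indent=2), encoding="utf-8")
    tmp.replace(path)  # rename over the old file


def load_state(run_id, root=STATE_ROOT):
    """Read state.json back into a dict."""
    return json.loads(state_path(run_id, root).read_text(encoding="utf-8"))
```

Writing to a temporary file first means a paused or interrupted run always finds either the previous state or the new one, never a half-written file.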

QA Loop Logic

MAX_REFACTOR_ITERATIONS = 3
MAX_DEEP_RESEARCH_ITERATIONS = 2
MAX_TOTAL_ITERATIONS = 5

def handle_qa_decision(doc_id, decision, iteration_counts):
    """Map a reviewer decision to the next pipeline action for one document."""
    # Terminal decisions pass through and never count against the loop limits
    if decision in ('approve', 'reject'):
        return decision

    refactor_count = iteration_counts.get('refactor', 0)
    research_count = iteration_counts.get('deep_research', 0)
    total = refactor_count + research_count

    # Escalate once the combined retry budget is spent
    if total >= MAX_TOTAL_ITERATIONS:
        return 'needs_manual_review'

    if decision == 'refactor':
        if refactor_count >= MAX_REFACTOR_ITERATIONS:
            return 'needs_manual_review'
        iteration_counts['refactor'] = refactor_count + 1
        return 're_distill'

    if decision == 'deep_research':
        if research_count >= MAX_DEEP_RESEARCH_ITERATIONS:
            return 'needs_manual_review'
        iteration_counts['deep_research'] = research_count + 1
        return 're_crawl_and_distill'

    return decision  # unrecognized labels pass through unchanged
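To see how the limits play out for a single document, the sketch below replays a sequence of reviewer decisions and records the resulting actions. `trace_qa_loop` is a hypothetical helper that restates the same limits as parameters; it assumes `approve` and `reject` end the loop regardless of the iteration counts.

```python
def trace_qa_loop(decisions, max_refactor=3, max_research=2, max_total=5):
    """Replay reviewer decisions for one document and return the outcomes."""
    counts = {"refactor": 0, "deep_research": 0}
    limits = {"refactor": max_refactor, "deep_research": max_research}
    actions = {"refactor": "re_distill", "deep_research": "re_crawl_and_distill"}
    outcomes = []
    for decision in decisions:
        if decision in ("approve", "reject"):
            outcomes.append(decision)
            break  # terminal decision ends the loop
        over_total = sum(counts.values()) >= max_total
        if over_total or counts[decision] >= limits[decision]:
            outcomes.append("needs_manual_review")
            break  # retry budget spent, hand off to a human
        counts[decision] += 1
        outcomes.append(actions[decision])
    return outcomes
```

For example, four consecutive `refactor` decisions yield three `re_distill` actions followed by `needs_manual_review`.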

Checkpoint Strategy

Save checkpoint after each stage completes:

| Stage     | Checkpoint                | Resume Point       |
|-----------|---------------------------|--------------------|
| discovery | manifest.json created     | → crawler          |
| crawl     | crawl_results.json        | → repository       |
| store     | DB records or file list   | → distiller        |
| distill   | distilled_content records | → reviewer         |
| review    | review_logs records       | → exporter or loop |
| export    | final export complete     | Done               |
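The checkpoint table can also be kept as data so that a resume routine can look up where to continue. This is a small sketch under that assumption; `CHECKPOINTS` and `resume_point` are hypothetical names, and the entries are transcribed from the table.

```python
# stage -> (checkpoint artifact, resume point); None means the run is done.
CHECKPOINTS = {
    "discovery": ("manifest.json created", "crawler"),
    "crawl": ("crawl_results.json", "repository"),
    "store": ("DB records or file list", "distiller"),
    "distill": ("distilled_content records", "reviewer"),
    # After review, documents go to the exporter or back into the QA loop.
    "review": ("review_logs records", "exporter or loop"),
    "export": ("final export complete", None),
}


def resume_point(last_completed_stage):
    """Return where to continue after the given stage has checkpointed."""
    _checkpoint, resume_at = CHECKPOINTS[last_completed_stage]
    return resume_at
```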

Progress Reporting

Report progress to user at key checkpoints:

[Pipeline] Stage 1/6: Discovery - Found 8 sources
[Pipeline] Stage 2/6: Crawling - 45/50 pages complete
[Pipeline] Stage 3/6: Storing - 45 documents saved
[Pipeline] Stage 4/6: Distilling - 45 documents processed
[Pipeline] Stage 5/6: Reviewing - 40 approved, 3 refactored, 2 rejected
[Pipeline] Stage 6/6: Exporting - 40 documents exported
[Pipeline] Complete! See ~/reference-library/exports/
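A small formatter keeps these progress lines consistent across stages. The sketch below is illustrative: `STAGE_LABELS` and `progress_line` are assumed names, and the labels are taken from the sample output above.

```python
STAGE_LABELS = [
    "Discovery", "Crawling", "Storing",
    "Distilling", "Reviewing", "Exporting",
]


def progress_line(stage_number, detail):
    """Format one progress line; stage_number is 1-based."""
    if not 1 <= stage_number <= len(STAGE_LABELS):
        raise ValueError(f"stage_number must be 1..{len(STAGE_LABELS)}")
    label = STAGE_LABELS[stage_number - 1]
    return f"[Pipeline] Stage {stage_number}/{len(STAGE_LABELS)}: {label} - {detail}"
```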

Error Handling

def handle_stage_error(stage, error, state):
    state['status'] = 'paused'
    state['error_message'] = str(error)
    state['error_stage'] = stage
    save_state(state['run_id'], state)

    # Log to errors.log
    log_error(state['run_id'], stage, error)

    # Report to user
    return f"Pipeline paused at {stage}: {error}. Resume with run_id {state['run_id']}"

Resume Pipeline

def resume_pipeline(run_id):
    state = load_state(run_id)

    if state['status'] != 'paused':
        return f"Pipeline {run_id} is {state['status']}, cannot resume"

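    # Prefer the stage recorded by handle_stage_error; fall back to current_stage
    stage = state.get('error_stage') or state['current_stage']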
    state['status'] = 'running'
    state['error_message'] = None
    save_state(run_id, state)

    # Resume from failed stage
    return execute_from_stage(stage, state)
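`execute_from_stage` is not defined in this document. One way it could work is sketched below, assuming a fixed stage order and a `handlers` mapping from stage name to a callable. Both are illustrative, and the extra `handlers` parameter is not part of the signature used above.

```python
STAGE_ORDER = ["discovery", "crawl", "store", "distill", "review", "export"]


def execute_from_stage(stage, state, handlers):
    """Run `stage` and every later stage, recording progress in `state`."""
    start = STAGE_ORDER.index(stage)
    for name in STAGE_ORDER[start:]:
        state["current_stage"] = name
        try:
            handlers[name](state)  # hypothetical per-stage callable
        except Exception as exc:
            # Mirror the pause behaviour described under Error Handling.
            state["status"] = "paused"
            state["error_stage"] = name
            state["error_message"] = str(exc)
            return state
    state["status"] = "completed"
    return state
```

Updating `current_stage` before each handler runs means a paused run always records the stage that failed.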

Output Summary

On completion, generate summary:

{
  "run_id": 123,
  "status": "completed",
  "duration_minutes": 15,
  "stats": {
    "sources_discovered": 5,
    "pages_crawled": 45,
    "documents_stored": 45,
    "documents_distilled": 45,
    "approved": 40,
    "refactored": 8,
    "deep_researched": 2,
    "rejected": 3,
    "needs_manual_review": 2
  },
  "exports": {
    "format": "project_files",
    "path": "~/reference-library/exports/",
    "document_count": 40
  },
  "errors": []
}
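The summary can be derived from the saved pipeline state. The sketch below is one way to do that, with assumptions: `build_summary` is a hypothetical helper, `started_at` is the ISO timestamp written at initialization, and `document_count` is taken to equal the number of approved documents, as in the example.

```python
from datetime import datetime


def build_summary(state, finished_at, export_format, export_path):
    """Assemble the completion summary from a finished run's state dict."""
    started = datetime.fromisoformat(state["started_at"])
    minutes = round((finished_at - started).total_seconds() / 60)
    return {
        "run_id": state["run_id"],
        "status": "completed",
        "duration_minutes": minutes,
        "stats": dict(state["stats"]),  # copy so the summary is independent
        "exports": {
            "format": export_format,
            "path": export_path,
            # Assumption: only approved documents are exported.
            "document_count": state["stats"]["approved"],
        },
        "errors": list(state.get("errors", [])),
    }
```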

Integration Points

| Skill               | Called By             | Provides          |
|---------------------|-----------------------|-------------------|
| reference-discovery | Orchestrator          | manifest.json     |
| web-crawler         | Orchestrator          | Raw crawled files |
| content-repository  | Orchestrator          | Stored documents  |
| content-distiller   | Orchestrator, QA loop | Distilled content |
| quality-reviewer    | Orchestrator          | QA decisions      |
| markdown-exporter   | Orchestrator          | Final exports     |

Configuration

Read from ~/.config/reference-curator/pipeline_config.yaml:

pipeline:
  max_sources: 10
  max_pages: 50
  auto_approve: false
  approval_threshold: 0.85

qa_loop:
  max_refactor_iterations: 3
  max_deep_research_iterations: 2
  max_total_iterations: 5

export:
  default_format: project_files
  include_rejected: false

state:
  backend: mysql  # or 'file'
  state_directory: ~/reference-library/pipeline_state/
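When the configuration file is partial or missing, the values above can serve as defaults. The sketch below merges an already-parsed config dict over those defaults; it assumes YAML parsing happens elsewhere, and `DEFAULTS` and `merge_config` are hypothetical names.

```python
import copy

# Defaults transcribed from the template above.
DEFAULTS = {
    "pipeline": {"max_sources": 10, "max_pages": 50,
                 "auto_approve": False, "approval_threshold": 0.85},
    "qa_loop": {"max_refactor_iterations": 3,
                "max_deep_research_iterations": 2,
                "max_total_iterations": 5},
    "export": {"default_format": "project_files", "include_rejected": False},
    "state": {"backend": "mysql",
              "state_directory": "~/reference-library/pipeline_state/"},
}


def merge_config(overrides, defaults=DEFAULTS):
    """Return defaults with user overrides applied section by section."""
    merged = copy.deepcopy(defaults)  # never mutate the shared defaults
    for section, values in (overrides or {}).items():
        if isinstance(values, dict) and isinstance(merged.get(section), dict):
            merged[section].update(values)
        else:
            merged[section] = values
    return merged
```

Merging per section lets a user override a single key, such as `max_pages`, without restating the rest of that section.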