feat: Add multi-agent-guide skill and update reference-curator configs
- Add 91-multi-agent-guide skill for setting up multi-agent collaboration with templates for AGENTS.md, Claude, Gemini, Codex configs, and CI/CD - Add USER-GUIDE.md for reference-curator documentation - Update default paths in reference-curator configs to use ~/Documents/05_AI Agent/10_Reference Library/ - Update settings-audit-report.md Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
496
custom-skills/91-multi-agent-guide/scripts/check-ownership.py
Normal file
496
custom-skills/91-multi-agent-guide/scripts/check-ownership.py
Normal file
@@ -0,0 +1,496 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Multi-Agent Ownership Verification Script
|
||||
|
||||
This script enforces file ownership rules and lock verification for the
|
||||
multi-agent workflow defined in GUARDRAILS.md.
|
||||
|
||||
Usage:
|
||||
# Verify files before commit
|
||||
python tools/check-ownership.py path/to/file.py
|
||||
|
||||
# Check commit message format
|
||||
python tools/check-ownership.py --check-commit-msg
|
||||
|
||||
# Verify setup
|
||||
python tools/check-ownership.py --verify-setup
|
||||
|
||||
# List expired locks/tasks
|
||||
python tools/check-ownership.py --list-expired
|
||||
|
||||
# Validate state files
|
||||
python tools/check-ownership.py --validate-state
|
||||
|
||||
Environment:
|
||||
SEO_AGENT_AUTHOR: Agent identity (claude|gemini|codex|human)
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Set, Tuple
|
||||
|
||||
try:
|
||||
import yaml
|
||||
except ImportError:
|
||||
print("Warning: PyYAML not installed. Install with: pip install pyyaml")
|
||||
yaml = None
|
||||
|
||||
# =============================================================================
|
||||
# CUSTOMIZE: Ownership Matrix
|
||||
# =============================================================================
|
||||
# Modify these patterns to match your project structure
|
||||
|
||||
# Primary ownership rules: pattern -> owner
|
||||
OWNERSHIP_MATRIX: Dict[str, str] = {
|
||||
# Claude ownership (customize for your project)
|
||||
r"^src/.*/cli\.py$": "claude",
|
||||
r"^src/.*/core/": "claude",
|
||||
r"^src/.*/main\.py$": "claude",
|
||||
r"^CLAUDE\.md$": "claude",
|
||||
r"^AGENTS\.md$": "claude",
|
||||
r"^GUARDRAILS\.md$": "claude",
|
||||
# Gemini ownership
|
||||
r"^src/.*/integrations/google/": "gemini",
|
||||
r"^src/.*/integrations/gsc\.py$": "gemini",
|
||||
r"^src/.*/integrations/ga.*\.py$": "gemini",
|
||||
r"^docs/.*\.md$": "gemini",
|
||||
r"^GEMINI\.md$": "gemini",
|
||||
# Codex ownership
|
||||
r"^src/.*/models\.py$": "codex",
|
||||
r"^src/.*/utils/": "codex",
|
||||
r"^tests/": "codex",
|
||||
r"^CODEX\.md$": "codex",
|
||||
}
|
||||
|
||||
# Shared files (require coordination, Claude approves)
|
||||
SHARED_FILES: Set[str] = {
|
||||
r"^pyproject\.toml$",
|
||||
r"^package\.json$",
|
||||
r"^config/.*\.yaml$",
|
||||
r"^config/.*\.json$",
|
||||
r"^\\.github/workflows/",
|
||||
r"^PROJECT_PLAN\.md$",
|
||||
}
|
||||
|
||||
# Files that any agent can modify without ownership check
|
||||
UNRESTRICTED_FILES: Set[str] = {
|
||||
r"^\\.agent-state/",
|
||||
r"^\\.pre-commit-config\\.yaml$",
|
||||
r"^\\.gitignore$",
|
||||
r"^README\\.md$",
|
||||
}
|
||||
|
||||
# Valid agents (add custom agents here)
|
||||
VALID_AGENTS = {"claude", "gemini", "codex", "human"}
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Helper Functions
|
||||
# =============================================================================
|
||||
|
||||
|
||||
def get_project_root() -> Path:
|
||||
"""Find the project root directory."""
|
||||
current = Path(__file__).resolve().parent
|
||||
while current != current.parent:
|
||||
if (current / "pyproject.toml").exists():
|
||||
return current
|
||||
if (current / "package.json").exists():
|
||||
return current
|
||||
if (current / ".git").exists():
|
||||
return current
|
||||
current = current.parent
|
||||
return Path.cwd()
|
||||
|
||||
|
||||
def get_agent_author() -> Optional[str]:
|
||||
"""Get the current agent from environment variable."""
|
||||
agent = os.environ.get("SEO_AGENT_AUTHOR", "").lower()
|
||||
return agent if agent in VALID_AGENTS else None
|
||||
|
||||
|
||||
def load_yaml_file(filepath: Path) -> Optional[dict]:
|
||||
"""Load a YAML file safely."""
|
||||
if yaml is None:
|
||||
return None
|
||||
if not filepath.exists():
|
||||
return None
|
||||
try:
|
||||
with open(filepath, "r") as f:
|
||||
return yaml.safe_load(f)
|
||||
except Exception as e:
|
||||
print(f"Warning: Could not load {filepath}: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def get_file_owner(filepath: str) -> Tuple[Optional[str], str]:
|
||||
"""
|
||||
Determine the owner of a file.
|
||||
|
||||
Returns:
|
||||
Tuple of (owner, ownership_type) where ownership_type is
|
||||
'primary', 'shared', 'unrestricted', or 'unknown'
|
||||
"""
|
||||
# Normalize path
|
||||
filepath = filepath.replace("\\", "/").lstrip("./")
|
||||
|
||||
# Check unrestricted first
|
||||
for pattern in UNRESTRICTED_FILES:
|
||||
if re.match(pattern, filepath):
|
||||
return None, "unrestricted"
|
||||
|
||||
# Check shared files
|
||||
for pattern in SHARED_FILES:
|
||||
if re.match(pattern, filepath):
|
||||
return "shared", "shared"
|
||||
|
||||
# Check primary ownership
|
||||
for pattern, owner in OWNERSHIP_MATRIX.items():
|
||||
if re.match(pattern, filepath):
|
||||
return owner, "primary"
|
||||
|
||||
return None, "unknown"
|
||||
|
||||
|
||||
def is_file_locked(filepath: str, root: Path) -> Tuple[bool, Optional[dict]]:
|
||||
"""
|
||||
Check if a file is locked by another agent.
|
||||
|
||||
Returns:
|
||||
Tuple of (is_locked, lock_info)
|
||||
"""
|
||||
locks_file = root / ".agent-state" / "locks.yaml"
|
||||
locks_data = load_yaml_file(locks_file)
|
||||
|
||||
if not locks_data or "locks" not in locks_data:
|
||||
return False, None
|
||||
|
||||
filepath = filepath.replace("\\", "/").lstrip("./")
|
||||
current_time = datetime.now(timezone.utc)
|
||||
|
||||
for lock in locks_data.get("locks", []):
|
||||
if not lock:
|
||||
continue
|
||||
lock_path = lock.get("path", "").replace("\\", "/").lstrip("./")
|
||||
|
||||
# Check if path matches (exact or prefix for directories)
|
||||
if filepath == lock_path or filepath.startswith(lock_path + "/"):
|
||||
# Check if lock is expired
|
||||
expires_at = lock.get("expires_at")
|
||||
if expires_at:
|
||||
try:
|
||||
expiry = datetime.fromisoformat(expires_at.replace("Z", "+00:00"))
|
||||
if current_time > expiry:
|
||||
continue # Lock expired
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
return True, lock
|
||||
|
||||
return False, None
|
||||
|
||||
|
||||
def check_commit_message(msg_file: str) -> bool:
|
||||
"""
|
||||
Verify commit message follows the [Agent] type(scope): description format.
|
||||
"""
|
||||
try:
|
||||
with open(msg_file, "r") as f:
|
||||
first_line = f.readline().strip()
|
||||
except Exception as e:
|
||||
print(f"Error reading commit message: {e}")
|
||||
return False
|
||||
|
||||
# Skip merge commits and fixup commits
|
||||
if first_line.startswith("Merge ") or first_line.startswith("fixup!"):
|
||||
return True
|
||||
|
||||
# Pattern: [Agent] type(scope): description
|
||||
pattern = r"^\[(Claude|Gemini|Codex|Human|CI)\]\s+\w+(\([^)]+\))?:\s+.+"
|
||||
if not re.match(pattern, first_line, re.IGNORECASE):
|
||||
print("Error: Commit message must follow format: [Agent] type(scope): description")
|
||||
print(f" Got: {first_line}")
|
||||
print(" Example: [Claude] feat(core): add new feature")
|
||||
print(" Valid agents: Claude, Gemini, Codex, Human, CI")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def verify_ownership(files: List[str], agent: str, root: Path) -> Tuple[List[str], List[str]]:
|
||||
"""
|
||||
Verify ownership for a list of files.
|
||||
|
||||
Returns:
|
||||
Tuple of (violations, warnings)
|
||||
"""
|
||||
violations = []
|
||||
warnings = []
|
||||
|
||||
for filepath in files:
|
||||
owner, ownership_type = get_file_owner(filepath)
|
||||
|
||||
# Unrestricted files - anyone can modify
|
||||
if ownership_type == "unrestricted":
|
||||
continue
|
||||
|
||||
# Unknown files - warn but don't block
|
||||
if ownership_type == "unknown":
|
||||
warnings.append(f"Unknown ownership for: {filepath}")
|
||||
continue
|
||||
|
||||
# Shared files - warn (Claude should approve)
|
||||
if ownership_type == "shared":
|
||||
if agent != "claude":
|
||||
warnings.append(f"Shared file modified (requires Claude approval): {filepath}")
|
||||
continue
|
||||
|
||||
# Primary ownership check
|
||||
if owner and owner != agent:
|
||||
# Claude can override any ownership
|
||||
if agent == "claude":
|
||||
warnings.append(f"Claude overriding {owner}'s ownership: {filepath}")
|
||||
else:
|
||||
violations.append(f"Ownership violation: {filepath} is owned by {owner}")
|
||||
|
||||
# Lock check
|
||||
is_locked, lock_info = is_file_locked(filepath, root)
|
||||
if is_locked and lock_info:
|
||||
lock_agent = lock_info.get("agent", "unknown")
|
||||
if lock_agent != agent:
|
||||
# Claude can override locks
|
||||
if agent == "claude" and lock_info.get("lock_type") != "override":
|
||||
warnings.append(f"Claude overriding {lock_agent}'s lock: {filepath}")
|
||||
else:
|
||||
task_id = lock_info.get("task_id", "unknown")
|
||||
violations.append(f"File locked by {lock_agent}: {filepath} (task: {task_id})")
|
||||
|
||||
return violations, warnings
|
||||
|
||||
|
||||
def verify_setup() -> bool:
|
||||
"""Verify that the agent environment is properly configured."""
|
||||
agent = get_agent_author()
|
||||
root = get_project_root()
|
||||
|
||||
print("=== Agent Environment Verification ===\n")
|
||||
|
||||
# Check environment variable
|
||||
if agent:
|
||||
print(f"[OK] SEO_AGENT_AUTHOR is set to: {agent}")
|
||||
else:
|
||||
env_val = os.environ.get("SEO_AGENT_AUTHOR", "")
|
||||
if env_val:
|
||||
print(f"[ERROR] Invalid SEO_AGENT_AUTHOR value: {env_val}")
|
||||
print(f" Valid values: {', '.join(sorted(VALID_AGENTS))}")
|
||||
else:
|
||||
print("[WARNING] SEO_AGENT_AUTHOR is not set")
|
||||
print(" Set with: export SEO_AGENT_AUTHOR=<agent>")
|
||||
|
||||
# Check state files
|
||||
tasks_file = root / ".agent-state" / "tasks.yaml"
|
||||
locks_file = root / ".agent-state" / "locks.yaml"
|
||||
|
||||
if tasks_file.exists():
|
||||
print(f"[OK] Tasks file exists: {tasks_file}")
|
||||
else:
|
||||
print(f"[WARNING] Tasks file not found: {tasks_file}")
|
||||
|
||||
if locks_file.exists():
|
||||
print(f"[OK] Locks file exists: {locks_file}")
|
||||
else:
|
||||
print(f"[WARNING] Locks file not found: {locks_file}")
|
||||
|
||||
# Check GUARDRAILS.md
|
||||
guardrails = root / "GUARDRAILS.md"
|
||||
if guardrails.exists():
|
||||
print("[OK] GUARDRAILS.md exists")
|
||||
else:
|
||||
print("[ERROR] GUARDRAILS.md not found")
|
||||
|
||||
print("\n=== Setup Complete ===")
|
||||
return agent is not None
|
||||
|
||||
|
||||
def list_expired(root: Path) -> None:
|
||||
"""List expired tasks and locks."""
|
||||
current_time = datetime.now(timezone.utc)
|
||||
|
||||
# Check tasks
|
||||
tasks_file = root / ".agent-state" / "tasks.yaml"
|
||||
tasks_data = load_yaml_file(tasks_file)
|
||||
|
||||
print("=== Expired Tasks ===\n")
|
||||
if tasks_data and "tasks" in tasks_data:
|
||||
found_expired = False
|
||||
for task in tasks_data.get("tasks", []):
|
||||
if not task or task.get("status") in ("completed", "abandoned"):
|
||||
continue
|
||||
expires_at = task.get("expires_at")
|
||||
if expires_at:
|
||||
try:
|
||||
expiry = datetime.fromisoformat(expires_at.replace("Z", "+00:00"))
|
||||
if current_time > expiry:
|
||||
found_expired = True
|
||||
print(f" - {task.get('id')}: {task.get('description')}")
|
||||
print(f" Agent: {task.get('agent')}, Expired: {expires_at}")
|
||||
except ValueError:
|
||||
pass
|
||||
if not found_expired:
|
||||
print(" No expired tasks found.")
|
||||
else:
|
||||
print(" No tasks file or empty.")
|
||||
|
||||
# Check locks
|
||||
locks_file = root / ".agent-state" / "locks.yaml"
|
||||
locks_data = load_yaml_file(locks_file)
|
||||
|
||||
print("\n=== Expired Locks ===\n")
|
||||
if locks_data and "locks" in locks_data:
|
||||
found_expired = False
|
||||
for lock in locks_data.get("locks", []):
|
||||
if not lock:
|
||||
continue
|
||||
expires_at = lock.get("expires_at")
|
||||
if expires_at:
|
||||
try:
|
||||
expiry = datetime.fromisoformat(expires_at.replace("Z", "+00:00"))
|
||||
if current_time > expiry:
|
||||
found_expired = True
|
||||
print(f" - {lock.get('path')}")
|
||||
print(f" Agent: {lock.get('agent')}, Task: {lock.get('task_id')}")
|
||||
except ValueError:
|
||||
pass
|
||||
if not found_expired:
|
||||
print(" No expired locks found.")
|
||||
else:
|
||||
print(" No locks file or empty.")
|
||||
|
||||
|
||||
def validate_state(root: Path) -> bool:
|
||||
"""Validate the structure of state files."""
|
||||
valid = True
|
||||
|
||||
# Validate tasks.yaml
|
||||
tasks_file = root / ".agent-state" / "tasks.yaml"
|
||||
tasks_data = load_yaml_file(tasks_file)
|
||||
|
||||
print("=== Validating State Files ===\n")
|
||||
|
||||
if tasks_data:
|
||||
print("[OK] tasks.yaml is valid YAML")
|
||||
if "version" not in tasks_data:
|
||||
print("[WARNING] tasks.yaml missing 'version' field")
|
||||
if "tasks" not in tasks_data:
|
||||
print("[WARNING] tasks.yaml missing 'tasks' field")
|
||||
else:
|
||||
if tasks_file.exists():
|
||||
print("[ERROR] tasks.yaml is not valid YAML")
|
||||
valid = False
|
||||
else:
|
||||
print("[WARNING] tasks.yaml does not exist")
|
||||
|
||||
# Validate locks.yaml
|
||||
locks_file = root / ".agent-state" / "locks.yaml"
|
||||
locks_data = load_yaml_file(locks_file)
|
||||
|
||||
if locks_data:
|
||||
print("[OK] locks.yaml is valid YAML")
|
||||
if "version" not in locks_data:
|
||||
print("[WARNING] locks.yaml missing 'version' field")
|
||||
if "locks" not in locks_data:
|
||||
print("[WARNING] locks.yaml missing 'locks' field")
|
||||
else:
|
||||
if locks_file.exists():
|
||||
print("[ERROR] locks.yaml is not valid YAML")
|
||||
valid = False
|
||||
else:
|
||||
print("[WARNING] locks.yaml does not exist")
|
||||
|
||||
return valid
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Main Entry Point
|
||||
# =============================================================================
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Multi-Agent Ownership Verification",
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
epilog=__doc__,
|
||||
)
|
||||
parser.add_argument("files", nargs="*", help="Files to check ownership for")
|
||||
parser.add_argument(
|
||||
"--check-commit-msg",
|
||||
metavar="FILE",
|
||||
nargs="?",
|
||||
const=".git/COMMIT_EDITMSG",
|
||||
help="Check commit message format",
|
||||
)
|
||||
parser.add_argument("--verify-setup", action="store_true", help="Verify agent environment")
|
||||
parser.add_argument("--list-expired", action="store_true", help="List expired tasks/locks")
|
||||
parser.add_argument(
|
||||
"--validate-state", action="store_true", help="Validate state file structure"
|
||||
)
|
||||
parser.add_argument("--strict", action="store_true", help="Treat warnings as errors")
|
||||
|
||||
args = parser.parse_args()
|
||||
root = get_project_root()
|
||||
|
||||
# Handle special commands
|
||||
if args.verify_setup:
|
||||
return 0 if verify_setup() else 1
|
||||
|
||||
if args.list_expired:
|
||||
list_expired(root)
|
||||
return 0
|
||||
|
||||
if args.validate_state:
|
||||
return 0 if validate_state(root) else 1
|
||||
|
||||
if args.check_commit_msg:
|
||||
return 0 if check_commit_message(args.check_commit_msg) else 1
|
||||
|
||||
# Default: check file ownership
|
||||
if not args.files:
|
||||
print("No files specified. Use --help for usage.")
|
||||
return 0
|
||||
|
||||
agent = get_agent_author()
|
||||
if not agent:
|
||||
print("Warning: SEO_AGENT_AUTHOR not set or invalid.")
|
||||
print(" Set with: export SEO_AGENT_AUTHOR=<agent>")
|
||||
print(" Ownership check skipped.")
|
||||
return 0
|
||||
|
||||
violations, warnings = verify_ownership(args.files, agent, root)
|
||||
|
||||
# Print results
|
||||
if warnings:
|
||||
print("\n=== Warnings ===")
|
||||
for warning in warnings:
|
||||
print(f" [WARN] {warning}")
|
||||
|
||||
if violations:
|
||||
print("\n=== Ownership Violations ===")
|
||||
for violation in violations:
|
||||
print(f" [ERROR] {violation}")
|
||||
print(f"\nAgent '{agent}' cannot modify these files.")
|
||||
print("See GUARDRAILS.md for ownership rules.")
|
||||
return 1
|
||||
|
||||
if args.strict and warnings:
|
||||
print("\n[STRICT MODE] Treating warnings as errors.")
|
||||
return 1
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Reference in New Issue
Block a user