- Add 91-multi-agent-guide skill for setting up multi-agent collaboration with templates for AGENTS.md, Claude, Gemini, Codex configs, and CI/CD - Add USER-GUIDE.md for reference-curator documentation - Update default paths in reference-curator configs to use ~/Documents/05_AI Agent/10_Reference Library/ - Update settings-audit-report.md Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
497 lines
16 KiB
Python
497 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Multi-Agent Ownership Verification Script
|
|
|
|
This script enforces file ownership rules and lock verification for the
|
|
multi-agent workflow defined in GUARDRAILS.md.
|
|
|
|
Usage:
|
|
# Verify files before commit
|
|
python tools/check-ownership.py path/to/file.py
|
|
|
|
# Check commit message format
|
|
python tools/check-ownership.py --check-commit-msg
|
|
|
|
# Verify setup
|
|
python tools/check-ownership.py --verify-setup
|
|
|
|
# List expired locks/tasks
|
|
python tools/check-ownership.py --list-expired
|
|
|
|
# Validate state files
|
|
python tools/check-ownership.py --validate-state
|
|
|
|
Environment:
|
|
SEO_AGENT_AUTHOR: Agent identity (claude|gemini|codex|human)
|
|
"""
|
|
|
|
import argparse
|
|
import os
|
|
import re
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Set, Tuple
|
|
|
|
try:
|
|
import yaml
|
|
except ImportError:
|
|
print("Warning: PyYAML not installed. Install with: pip install pyyaml")
|
|
yaml = None
|
|
|
|
# =============================================================================
|
|
# CUSTOMIZE: Ownership Matrix
|
|
# =============================================================================
|
|
# Modify these patterns to match your project structure
|
|
|
|
# Primary ownership rules: pattern -> owner
|
|
OWNERSHIP_MATRIX: Dict[str, str] = {
|
|
# Claude ownership (customize for your project)
|
|
r"^src/.*/cli\.py$": "claude",
|
|
r"^src/.*/core/": "claude",
|
|
r"^src/.*/main\.py$": "claude",
|
|
r"^CLAUDE\.md$": "claude",
|
|
r"^AGENTS\.md$": "claude",
|
|
r"^GUARDRAILS\.md$": "claude",
|
|
# Gemini ownership
|
|
r"^src/.*/integrations/google/": "gemini",
|
|
r"^src/.*/integrations/gsc\.py$": "gemini",
|
|
r"^src/.*/integrations/ga.*\.py$": "gemini",
|
|
r"^docs/.*\.md$": "gemini",
|
|
r"^GEMINI\.md$": "gemini",
|
|
# Codex ownership
|
|
r"^src/.*/models\.py$": "codex",
|
|
r"^src/.*/utils/": "codex",
|
|
r"^tests/": "codex",
|
|
r"^CODEX\.md$": "codex",
|
|
}
|
|
|
|
# Shared files (require coordination, Claude approves)
|
|
SHARED_FILES: Set[str] = {
|
|
r"^pyproject\.toml$",
|
|
r"^package\.json$",
|
|
r"^config/.*\.yaml$",
|
|
r"^config/.*\.json$",
|
|
r"^\\.github/workflows/",
|
|
r"^PROJECT_PLAN\.md$",
|
|
}
|
|
|
|
# Files that any agent can modify without ownership check
|
|
UNRESTRICTED_FILES: Set[str] = {
|
|
r"^\\.agent-state/",
|
|
r"^\\.pre-commit-config\\.yaml$",
|
|
r"^\\.gitignore$",
|
|
r"^README\\.md$",
|
|
}
|
|
|
|
# Valid agents (add custom agents here)
|
|
VALID_AGENTS = {"claude", "gemini", "codex", "human"}
|
|
|
|
|
|
# =============================================================================
|
|
# Helper Functions
|
|
# =============================================================================
|
|
|
|
|
|
def get_project_root() -> Path:
|
|
"""Find the project root directory."""
|
|
current = Path(__file__).resolve().parent
|
|
while current != current.parent:
|
|
if (current / "pyproject.toml").exists():
|
|
return current
|
|
if (current / "package.json").exists():
|
|
return current
|
|
if (current / ".git").exists():
|
|
return current
|
|
current = current.parent
|
|
return Path.cwd()
|
|
|
|
|
|
def get_agent_author() -> Optional[str]:
|
|
"""Get the current agent from environment variable."""
|
|
agent = os.environ.get("SEO_AGENT_AUTHOR", "").lower()
|
|
return agent if agent in VALID_AGENTS else None
|
|
|
|
|
|
def load_yaml_file(filepath: Path) -> Optional[dict]:
|
|
"""Load a YAML file safely."""
|
|
if yaml is None:
|
|
return None
|
|
if not filepath.exists():
|
|
return None
|
|
try:
|
|
with open(filepath, "r") as f:
|
|
return yaml.safe_load(f)
|
|
except Exception as e:
|
|
print(f"Warning: Could not load {filepath}: {e}")
|
|
return None
|
|
|
|
|
|
def get_file_owner(filepath: str) -> Tuple[Optional[str], str]:
|
|
"""
|
|
Determine the owner of a file.
|
|
|
|
Returns:
|
|
Tuple of (owner, ownership_type) where ownership_type is
|
|
'primary', 'shared', 'unrestricted', or 'unknown'
|
|
"""
|
|
# Normalize path
|
|
filepath = filepath.replace("\\", "/").lstrip("./")
|
|
|
|
# Check unrestricted first
|
|
for pattern in UNRESTRICTED_FILES:
|
|
if re.match(pattern, filepath):
|
|
return None, "unrestricted"
|
|
|
|
# Check shared files
|
|
for pattern in SHARED_FILES:
|
|
if re.match(pattern, filepath):
|
|
return "shared", "shared"
|
|
|
|
# Check primary ownership
|
|
for pattern, owner in OWNERSHIP_MATRIX.items():
|
|
if re.match(pattern, filepath):
|
|
return owner, "primary"
|
|
|
|
return None, "unknown"
|
|
|
|
|
|
def is_file_locked(filepath: str, root: Path) -> Tuple[bool, Optional[dict]]:
|
|
"""
|
|
Check if a file is locked by another agent.
|
|
|
|
Returns:
|
|
Tuple of (is_locked, lock_info)
|
|
"""
|
|
locks_file = root / ".agent-state" / "locks.yaml"
|
|
locks_data = load_yaml_file(locks_file)
|
|
|
|
if not locks_data or "locks" not in locks_data:
|
|
return False, None
|
|
|
|
filepath = filepath.replace("\\", "/").lstrip("./")
|
|
current_time = datetime.now(timezone.utc)
|
|
|
|
for lock in locks_data.get("locks", []):
|
|
if not lock:
|
|
continue
|
|
lock_path = lock.get("path", "").replace("\\", "/").lstrip("./")
|
|
|
|
# Check if path matches (exact or prefix for directories)
|
|
if filepath == lock_path or filepath.startswith(lock_path + "/"):
|
|
# Check if lock is expired
|
|
expires_at = lock.get("expires_at")
|
|
if expires_at:
|
|
try:
|
|
expiry = datetime.fromisoformat(expires_at.replace("Z", "+00:00"))
|
|
if current_time > expiry:
|
|
continue # Lock expired
|
|
except ValueError:
|
|
pass
|
|
|
|
return True, lock
|
|
|
|
return False, None
|
|
|
|
|
|
def check_commit_message(msg_file: str) -> bool:
|
|
"""
|
|
Verify commit message follows the [Agent] type(scope): description format.
|
|
"""
|
|
try:
|
|
with open(msg_file, "r") as f:
|
|
first_line = f.readline().strip()
|
|
except Exception as e:
|
|
print(f"Error reading commit message: {e}")
|
|
return False
|
|
|
|
# Skip merge commits and fixup commits
|
|
if first_line.startswith("Merge ") or first_line.startswith("fixup!"):
|
|
return True
|
|
|
|
# Pattern: [Agent] type(scope): description
|
|
pattern = r"^\[(Claude|Gemini|Codex|Human|CI)\]\s+\w+(\([^)]+\))?:\s+.+"
|
|
if not re.match(pattern, first_line, re.IGNORECASE):
|
|
print("Error: Commit message must follow format: [Agent] type(scope): description")
|
|
print(f" Got: {first_line}")
|
|
print(" Example: [Claude] feat(core): add new feature")
|
|
print(" Valid agents: Claude, Gemini, Codex, Human, CI")
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
def verify_ownership(files: List[str], agent: str, root: Path) -> Tuple[List[str], List[str]]:
|
|
"""
|
|
Verify ownership for a list of files.
|
|
|
|
Returns:
|
|
Tuple of (violations, warnings)
|
|
"""
|
|
violations = []
|
|
warnings = []
|
|
|
|
for filepath in files:
|
|
owner, ownership_type = get_file_owner(filepath)
|
|
|
|
# Unrestricted files - anyone can modify
|
|
if ownership_type == "unrestricted":
|
|
continue
|
|
|
|
# Unknown files - warn but don't block
|
|
if ownership_type == "unknown":
|
|
warnings.append(f"Unknown ownership for: {filepath}")
|
|
continue
|
|
|
|
# Shared files - warn (Claude should approve)
|
|
if ownership_type == "shared":
|
|
if agent != "claude":
|
|
warnings.append(f"Shared file modified (requires Claude approval): {filepath}")
|
|
continue
|
|
|
|
# Primary ownership check
|
|
if owner and owner != agent:
|
|
# Claude can override any ownership
|
|
if agent == "claude":
|
|
warnings.append(f"Claude overriding {owner}'s ownership: {filepath}")
|
|
else:
|
|
violations.append(f"Ownership violation: {filepath} is owned by {owner}")
|
|
|
|
# Lock check
|
|
is_locked, lock_info = is_file_locked(filepath, root)
|
|
if is_locked and lock_info:
|
|
lock_agent = lock_info.get("agent", "unknown")
|
|
if lock_agent != agent:
|
|
# Claude can override locks
|
|
if agent == "claude" and lock_info.get("lock_type") != "override":
|
|
warnings.append(f"Claude overriding {lock_agent}'s lock: {filepath}")
|
|
else:
|
|
task_id = lock_info.get("task_id", "unknown")
|
|
violations.append(f"File locked by {lock_agent}: {filepath} (task: {task_id})")
|
|
|
|
return violations, warnings
|
|
|
|
|
|
def verify_setup() -> bool:
|
|
"""Verify that the agent environment is properly configured."""
|
|
agent = get_agent_author()
|
|
root = get_project_root()
|
|
|
|
print("=== Agent Environment Verification ===\n")
|
|
|
|
# Check environment variable
|
|
if agent:
|
|
print(f"[OK] SEO_AGENT_AUTHOR is set to: {agent}")
|
|
else:
|
|
env_val = os.environ.get("SEO_AGENT_AUTHOR", "")
|
|
if env_val:
|
|
print(f"[ERROR] Invalid SEO_AGENT_AUTHOR value: {env_val}")
|
|
print(f" Valid values: {', '.join(sorted(VALID_AGENTS))}")
|
|
else:
|
|
print("[WARNING] SEO_AGENT_AUTHOR is not set")
|
|
print(" Set with: export SEO_AGENT_AUTHOR=<agent>")
|
|
|
|
# Check state files
|
|
tasks_file = root / ".agent-state" / "tasks.yaml"
|
|
locks_file = root / ".agent-state" / "locks.yaml"
|
|
|
|
if tasks_file.exists():
|
|
print(f"[OK] Tasks file exists: {tasks_file}")
|
|
else:
|
|
print(f"[WARNING] Tasks file not found: {tasks_file}")
|
|
|
|
if locks_file.exists():
|
|
print(f"[OK] Locks file exists: {locks_file}")
|
|
else:
|
|
print(f"[WARNING] Locks file not found: {locks_file}")
|
|
|
|
# Check GUARDRAILS.md
|
|
guardrails = root / "GUARDRAILS.md"
|
|
if guardrails.exists():
|
|
print("[OK] GUARDRAILS.md exists")
|
|
else:
|
|
print("[ERROR] GUARDRAILS.md not found")
|
|
|
|
print("\n=== Setup Complete ===")
|
|
return agent is not None
|
|
|
|
|
|
def list_expired(root: Path) -> None:
|
|
"""List expired tasks and locks."""
|
|
current_time = datetime.now(timezone.utc)
|
|
|
|
# Check tasks
|
|
tasks_file = root / ".agent-state" / "tasks.yaml"
|
|
tasks_data = load_yaml_file(tasks_file)
|
|
|
|
print("=== Expired Tasks ===\n")
|
|
if tasks_data and "tasks" in tasks_data:
|
|
found_expired = False
|
|
for task in tasks_data.get("tasks", []):
|
|
if not task or task.get("status") in ("completed", "abandoned"):
|
|
continue
|
|
expires_at = task.get("expires_at")
|
|
if expires_at:
|
|
try:
|
|
expiry = datetime.fromisoformat(expires_at.replace("Z", "+00:00"))
|
|
if current_time > expiry:
|
|
found_expired = True
|
|
print(f" - {task.get('id')}: {task.get('description')}")
|
|
print(f" Agent: {task.get('agent')}, Expired: {expires_at}")
|
|
except ValueError:
|
|
pass
|
|
if not found_expired:
|
|
print(" No expired tasks found.")
|
|
else:
|
|
print(" No tasks file or empty.")
|
|
|
|
# Check locks
|
|
locks_file = root / ".agent-state" / "locks.yaml"
|
|
locks_data = load_yaml_file(locks_file)
|
|
|
|
print("\n=== Expired Locks ===\n")
|
|
if locks_data and "locks" in locks_data:
|
|
found_expired = False
|
|
for lock in locks_data.get("locks", []):
|
|
if not lock:
|
|
continue
|
|
expires_at = lock.get("expires_at")
|
|
if expires_at:
|
|
try:
|
|
expiry = datetime.fromisoformat(expires_at.replace("Z", "+00:00"))
|
|
if current_time > expiry:
|
|
found_expired = True
|
|
print(f" - {lock.get('path')}")
|
|
print(f" Agent: {lock.get('agent')}, Task: {lock.get('task_id')}")
|
|
except ValueError:
|
|
pass
|
|
if not found_expired:
|
|
print(" No expired locks found.")
|
|
else:
|
|
print(" No locks file or empty.")
|
|
|
|
|
|
def validate_state(root: Path) -> bool:
|
|
"""Validate the structure of state files."""
|
|
valid = True
|
|
|
|
# Validate tasks.yaml
|
|
tasks_file = root / ".agent-state" / "tasks.yaml"
|
|
tasks_data = load_yaml_file(tasks_file)
|
|
|
|
print("=== Validating State Files ===\n")
|
|
|
|
if tasks_data:
|
|
print("[OK] tasks.yaml is valid YAML")
|
|
if "version" not in tasks_data:
|
|
print("[WARNING] tasks.yaml missing 'version' field")
|
|
if "tasks" not in tasks_data:
|
|
print("[WARNING] tasks.yaml missing 'tasks' field")
|
|
else:
|
|
if tasks_file.exists():
|
|
print("[ERROR] tasks.yaml is not valid YAML")
|
|
valid = False
|
|
else:
|
|
print("[WARNING] tasks.yaml does not exist")
|
|
|
|
# Validate locks.yaml
|
|
locks_file = root / ".agent-state" / "locks.yaml"
|
|
locks_data = load_yaml_file(locks_file)
|
|
|
|
if locks_data:
|
|
print("[OK] locks.yaml is valid YAML")
|
|
if "version" not in locks_data:
|
|
print("[WARNING] locks.yaml missing 'version' field")
|
|
if "locks" not in locks_data:
|
|
print("[WARNING] locks.yaml missing 'locks' field")
|
|
else:
|
|
if locks_file.exists():
|
|
print("[ERROR] locks.yaml is not valid YAML")
|
|
valid = False
|
|
else:
|
|
print("[WARNING] locks.yaml does not exist")
|
|
|
|
return valid
|
|
|
|
|
|
# =============================================================================
|
|
# Main Entry Point
|
|
# =============================================================================
|
|
|
|
|
|
def main() -> int:
|
|
parser = argparse.ArgumentParser(
|
|
description="Multi-Agent Ownership Verification",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog=__doc__,
|
|
)
|
|
parser.add_argument("files", nargs="*", help="Files to check ownership for")
|
|
parser.add_argument(
|
|
"--check-commit-msg",
|
|
metavar="FILE",
|
|
nargs="?",
|
|
const=".git/COMMIT_EDITMSG",
|
|
help="Check commit message format",
|
|
)
|
|
parser.add_argument("--verify-setup", action="store_true", help="Verify agent environment")
|
|
parser.add_argument("--list-expired", action="store_true", help="List expired tasks/locks")
|
|
parser.add_argument(
|
|
"--validate-state", action="store_true", help="Validate state file structure"
|
|
)
|
|
parser.add_argument("--strict", action="store_true", help="Treat warnings as errors")
|
|
|
|
args = parser.parse_args()
|
|
root = get_project_root()
|
|
|
|
# Handle special commands
|
|
if args.verify_setup:
|
|
return 0 if verify_setup() else 1
|
|
|
|
if args.list_expired:
|
|
list_expired(root)
|
|
return 0
|
|
|
|
if args.validate_state:
|
|
return 0 if validate_state(root) else 1
|
|
|
|
if args.check_commit_msg:
|
|
return 0 if check_commit_message(args.check_commit_msg) else 1
|
|
|
|
# Default: check file ownership
|
|
if not args.files:
|
|
print("No files specified. Use --help for usage.")
|
|
return 0
|
|
|
|
agent = get_agent_author()
|
|
if not agent:
|
|
print("Warning: SEO_AGENT_AUTHOR not set or invalid.")
|
|
print(" Set with: export SEO_AGENT_AUTHOR=<agent>")
|
|
print(" Ownership check skipped.")
|
|
return 0
|
|
|
|
violations, warnings = verify_ownership(args.files, agent, root)
|
|
|
|
# Print results
|
|
if warnings:
|
|
print("\n=== Warnings ===")
|
|
for warning in warnings:
|
|
print(f" [WARN] {warning}")
|
|
|
|
if violations:
|
|
print("\n=== Ownership Violations ===")
|
|
for violation in violations:
|
|
print(f" [ERROR] {violation}")
|
|
print(f"\nAgent '{agent}' cannot modify these files.")
|
|
print("See GUARDRAILS.md for ownership rules.")
|
|
return 1
|
|
|
|
if args.strict and warnings:
|
|
print("\n[STRICT MODE] Treating warnings as errors.")
|
|
return 1
|
|
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|