#!/usr/bin/env python3
"""
Multi-Agent Ownership Verification Script

This script enforces file ownership rules and lock verification
for the multi-agent workflow defined in GUARDRAILS.md.

Usage:
    # Verify files before commit
    python tools/check-ownership.py path/to/file.py

    # Check commit message format
    python tools/check-ownership.py --check-commit-msg

    # Verify setup
    python tools/check-ownership.py --verify-setup

    # List expired locks/tasks
    python tools/check-ownership.py --list-expired

    # Validate state files
    python tools/check-ownership.py --validate-state

Environment:
    SEO_AGENT_AUTHOR: Agent identity (claude|gemini|codex|human)
"""

import argparse
import os
import re
import sys
from datetime import date, datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple

try:
    import yaml
except ImportError:
    print("Warning: PyYAML not installed. Install with: pip install pyyaml")
    yaml = None

# =============================================================================
# CUSTOMIZE: Ownership Matrix
# =============================================================================
# Modify these patterns to match your project structure.
# All patterns are regular expressions applied with re.match() to the
# normalized, forward-slash, repository-relative path.

# Primary ownership rules: pattern -> owner
OWNERSHIP_MATRIX: Dict[str, str] = {
    # Claude ownership (customize for your project)
    r"^src/.*/cli\.py$": "claude",
    r"^src/.*/core/": "claude",
    r"^src/.*/main\.py$": "claude",
    r"^CLAUDE\.md$": "claude",
    r"^AGENTS\.md$": "claude",
    r"^GUARDRAILS\.md$": "claude",
    # Gemini ownership
    r"^src/.*/integrations/google/": "gemini",
    r"^src/.*/integrations/gsc\.py$": "gemini",
    r"^src/.*/integrations/ga.*\.py$": "gemini",
    r"^docs/.*\.md$": "gemini",
    r"^GEMINI\.md$": "gemini",
    # Codex ownership
    r"^src/.*/models\.py$": "codex",
    r"^src/.*/utils/": "codex",
    r"^tests/": "codex",
    r"^CODEX\.md$": "codex",
}

# Shared files (require coordination, Claude approves)
SHARED_FILES: Set[str] = {
    r"^pyproject\.toml$",
    r"^package\.json$",
    r"^config/.*\.yaml$",
    r"^config/.*\.json$",
    # NOTE: inside a raw string a literal dot is written "\." (one backslash).
    r"^\.github/workflows/",
    r"^PROJECT_PLAN\.md$",
}

# Files that any agent can modify without ownership check
UNRESTRICTED_FILES: Set[str] = {
    r"^\.agent-state/",
    r"^\.pre-commit-config\.yaml$",
    r"^\.gitignore$",
    r"^README\.md$",
}

# Valid agents (add custom agents here)
VALID_AGENTS = {"claude", "gemini", "codex", "human"}

# Matches any run of leading "./" and "/" segments. Deliberately does NOT
# match a bare leading "." so dotfiles such as ".gitignore" keep their name.
_LEADING_CURDIR_RE = re.compile(r"^(?:\.?/)+")


# =============================================================================
# Helper Functions
# =============================================================================


def _normalize_path(filepath: str) -> str:
    """Return *filepath* with forward slashes and no leading "./" or "/".

    str.lstrip("./") must not be used here: it strips a *set of characters*,
    which turns ".gitignore" into "gitignore" and "../x" into "x".
    """
    return _LEADING_CURDIR_RE.sub("", filepath.replace("\\", "/"))


def _parse_expiry(value: object) -> Optional[datetime]:
    """Convert an ``expires_at`` value into a timezone-aware datetime.

    Accepts ISO-8601 strings (a "Z" suffix is treated as UTC) as well as the
    ``datetime`` / ``date`` objects that PyYAML produces for unquoted
    timestamps. Naive values are assumed to be UTC so they can be compared
    with an aware "now". Returns None when the value is missing or cannot be
    parsed.
    """
    if isinstance(value, datetime):
        expiry = value
    elif isinstance(value, date):
        expiry = datetime(value.year, value.month, value.day)
    elif isinstance(value, str):
        try:
            expiry = datetime.fromisoformat(value.strip().replace("Z", "+00:00"))
        except ValueError:
            return None
    else:
        return None

    if expiry.tzinfo is None:
        expiry = expiry.replace(tzinfo=timezone.utc)
    return expiry


def _is_expired(expires_at: object, now: datetime) -> bool:
    """Return True only when *expires_at* parses and lies strictly before *now*."""
    expiry = _parse_expiry(expires_at)
    return expiry is not None and now > expiry


def _entries(data: Optional[dict], key: str) -> List[dict]:
    """Return the dict entries stored under *key*, tolerating null/garbage.

    A key that is present but empty in YAML (``locks:``) loads as None, and
    hand-edited files may contain non-mapping items; both are skipped.
    """
    if not data:
        return []
    items = data.get(key) or []
    if not isinstance(items, list):
        return []
    return [item for item in items if isinstance(item, dict) and item]


def get_project_root() -> Path:
    """Find the project root directory.

    Walks upward from this script's directory and returns the first directory
    containing ``pyproject.toml``, ``package.json`` or ``.git``. Falls back to
    the current working directory when the filesystem root is reached.
    """
    markers = ("pyproject.toml", "package.json", ".git")
    current = Path(__file__).resolve().parent
    while current != current.parent:
        if any((current / marker).exists() for marker in markers):
            return current
        current = current.parent
    return Path.cwd()


def get_agent_author() -> Optional[str]:
    """Get the current agent from the SEO_AGENT_AUTHOR environment variable.

    Returns the lower-cased agent name, or None when the variable is unset or
    is not one of VALID_AGENTS.
    """
    agent = os.environ.get("SEO_AGENT_AUTHOR", "").strip().lower()
    return agent if agent in VALID_AGENTS else None


def load_yaml_file(filepath: Path) -> Optional[dict]:
    """Load a YAML file safely.

    Returns the parsed mapping, or None when PyYAML is unavailable, the file
    does not exist, cannot be read/parsed, is empty, or its top-level value
    is not a mapping (callers rely on ``dict`` methods).
    """
    if yaml is None:
        return None
    if not filepath.exists():
        return None

    try:
        with open(filepath, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f)
    except (OSError, UnicodeDecodeError, yaml.YAMLError) as e:
        print(f"Warning: Could not load {filepath}: {e}")
        return None

    if data is not None and not isinstance(data, dict):
        print(f"Warning: Could not load {filepath}: top-level value is not a mapping")
        return None
    return data


def get_file_owner(filepath: str) -> Tuple[Optional[str], str]:
    """
    Determine the owner of a file.

    Returns:
        Tuple of (owner, ownership_type) where ownership_type is
        'primary', 'shared', 'unrestricted', or 'unknown'
    """
    filepath = _normalize_path(filepath)

    # Check unrestricted first so it wins over any broader ownership rule.
    if any(re.match(pattern, filepath) for pattern in UNRESTRICTED_FILES):
        return None, "unrestricted"

    # Check shared files
    if any(re.match(pattern, filepath) for pattern in SHARED_FILES):
        return "shared", "shared"

    # Check primary ownership (first matching pattern in definition order)
    for pattern, owner in OWNERSHIP_MATRIX.items():
        if re.match(pattern, filepath):
            return owner, "primary"

    return None, "unknown"


def is_file_locked(filepath: str, root: Path) -> Tuple[bool, Optional[dict]]:
    """
    Check if a file is locked according to ``.agent-state/locks.yaml``.

    A lock applies when its path equals the file path or is a directory
    prefix of it. Expired locks are ignored; a lock whose ``expires_at``
    cannot be parsed is treated as still active (fail closed).

    Returns:
        Tuple of (is_locked, lock_info)
    """
    locks_file = root / ".agent-state" / "locks.yaml"
    locks_data = load_yaml_file(locks_file)

    if not locks_data or "locks" not in locks_data:
        return False, None

    filepath = _normalize_path(filepath)
    current_time = datetime.now(timezone.utc)

    for lock in _entries(locks_data, "locks"):
        # Trailing slash is dropped so "src/pkg/" locks files under src/pkg.
        lock_path = _normalize_path(str(lock.get("path") or "")).rstrip("/")
        if not lock_path:
            continue

        # Check if path matches (exact or prefix for directories)
        if filepath != lock_path and not filepath.startswith(lock_path + "/"):
            continue

        if _is_expired(lock.get("expires_at"), current_time):
            continue  # Lock expired

        return True, lock

    return False, None


def check_commit_message(msg_file: str) -> bool:
    """
    Verify commit message follows the [Agent] type(scope): description format.

    Only the first line of *msg_file* is checked. Merge and fixup commits are
    always accepted. Returns False (after printing the reason) when the file
    cannot be read or the first line does not match.
    """
    try:
        with open(msg_file, "r", encoding="utf-8") as f:
            first_line = f.readline().strip()
    except (OSError, UnicodeDecodeError) as e:
        print(f"Error reading commit message: {e}")
        return False

    # Skip merge commits and fixup commits
    if first_line.startswith(("Merge ", "fixup!")):
        return True

    # Pattern: [Agent] type(scope): description
    pattern = r"^\[(Claude|Gemini|Codex|Human|CI)\]\s+\w+(\([^)]+\))?:\s+.+"
    if not re.match(pattern, first_line, re.IGNORECASE):
        print("Error: Commit message must follow format: [Agent] type(scope): description")
        print(f" Got: {first_line}")
        print(" Example: [Claude] feat(core): add new feature")
        print(" Valid agents: Claude, Gemini, Codex, Human, CI")
        return False

    return True


def verify_ownership(files: List[str], agent: str, root: Path) -> Tuple[List[str], List[str]]:
    """
    Verify ownership for a list of files.

    Unrestricted files are skipped, unknown and shared files only produce
    warnings, and primary-owned files are checked against *agent* and against
    active locks. The agent "claude" may override ownership and locks (except
    locks of type "override"), which is reported as a warning.

    Returns:
        Tuple of (violations, warnings)
    """
    violations: List[str] = []
    warnings: List[str] = []

    for filepath in files:
        owner, ownership_type = get_file_owner(filepath)

        # Unrestricted files - anyone can modify
        if ownership_type == "unrestricted":
            continue

        # Unknown files - warn but don't block
        if ownership_type == "unknown":
            warnings.append(f"Unknown ownership for: {filepath}")
            continue

        # Shared files - warn (Claude should approve)
        if ownership_type == "shared":
            if agent != "claude":
                warnings.append(f"Shared file modified (requires Claude approval): {filepath}")
            continue

        # Primary ownership check
        if owner and owner != agent:
            # Claude can override any ownership
            if agent == "claude":
                warnings.append(f"Claude overriding {owner}'s ownership: {filepath}")
            else:
                violations.append(f"Ownership violation: {filepath} is owned by {owner}")

        # Lock check
        is_locked, lock_info = is_file_locked(filepath, root)
        if not (is_locked and lock_info):
            continue

        lock_agent = lock_info.get("agent", "unknown")
        if lock_agent == agent:
            continue

        # Claude can override locks, unless the lock itself is an override lock
        if agent == "claude" and lock_info.get("lock_type") != "override":
            warnings.append(f"Claude overriding {lock_agent}'s lock: {filepath}")
        else:
            task_id = lock_info.get("task_id", "unknown")
            violations.append(f"File locked by {lock_agent}: {filepath} (task: {task_id})")

    return violations, warnings


def verify_setup() -> bool:
    """Verify that the agent environment is properly configured.

    Prints a report covering the SEO_AGENT_AUTHOR variable, the state files
    and GUARDRAILS.md. Returns True only when a valid agent is configured;
    missing files are reported but do not affect the result.
    """
    agent = get_agent_author()
    root = get_project_root()

    print("=== Agent Environment Verification ===\n")

    # Check environment variable
    if agent:
        print(f"[OK] SEO_AGENT_AUTHOR is set to: {agent}")
    else:
        env_val = os.environ.get("SEO_AGENT_AUTHOR", "")
        if env_val:
            print(f"[ERROR] Invalid SEO_AGENT_AUTHOR value: {env_val}")
            print(f" Valid values: {', '.join(sorted(VALID_AGENTS))}")
        else:
            print("[WARNING] SEO_AGENT_AUTHOR is not set")
            print(" Set with: export SEO_AGENT_AUTHOR=")

    # Check state files
    tasks_file = root / ".agent-state" / "tasks.yaml"
    locks_file = root / ".agent-state" / "locks.yaml"

    if tasks_file.exists():
        print(f"[OK] Tasks file exists: {tasks_file}")
    else:
        print(f"[WARNING] Tasks file not found: {tasks_file}")

    if locks_file.exists():
        print(f"[OK] Locks file exists: {locks_file}")
    else:
        print(f"[WARNING] Locks file not found: {locks_file}")

    # Check GUARDRAILS.md
    guardrails = root / "GUARDRAILS.md"
    if guardrails.exists():
        print("[OK] GUARDRAILS.md exists")
    else:
        print("[ERROR] GUARDRAILS.md not found")

    print("\n=== Setup Complete ===")
    return agent is not None


def list_expired(root: Path) -> None:
    """List expired tasks and locks.

    Tasks whose status is "completed" or "abandoned" are ignored. Entries
    without a parseable ``expires_at`` are never reported as expired.
    """
    current_time = datetime.now(timezone.utc)

    # Check tasks
    tasks_file = root / ".agent-state" / "tasks.yaml"
    tasks_data = load_yaml_file(tasks_file)

    print("=== Expired Tasks ===\n")

    if tasks_data and "tasks" in tasks_data:
        found_expired = False
        for task in _entries(tasks_data, "tasks"):
            if task.get("status") in ("completed", "abandoned"):
                continue

            expires_at = task.get("expires_at")
            if _is_expired(expires_at, current_time):
                found_expired = True
                print(f" - {task.get('id')}: {task.get('description')}")
                print(f" Agent: {task.get('agent')}, Expired: {expires_at}")

        if not found_expired:
            print(" No expired tasks found.")
    else:
        print(" No tasks file or empty.")

    # Check locks
    locks_file = root / ".agent-state" / "locks.yaml"
    locks_data = load_yaml_file(locks_file)

    print("\n=== Expired Locks ===\n")

    if locks_data and "locks" in locks_data:
        found_expired = False
        for lock in _entries(locks_data, "locks"):
            if _is_expired(lock.get("expires_at"), current_time):
                found_expired = True
                print(f" - {lock.get('path')}")
                print(f" Agent: {lock.get('agent')}, Task: {lock.get('task_id')}")

        if not found_expired:
            print(" No expired locks found.")
    else:
        print(" No locks file or empty.")


def _report_state_file(state_file: Path, data: Optional[dict], list_key: str) -> bool:
    """Print the validation report for one state file; return False on error.

    A file that exists but yielded no data is an error; a missing file and
    missing top-level fields are only warnings.
    """
    name = state_file.name
    if data:
        print(f"[OK] {name} is valid YAML")
        for field in ("version", list_key):
            if field not in data:
                print(f"[WARNING] {name} missing '{field}' field")
        return True

    if state_file.exists():
        print(f"[ERROR] {name} is not valid YAML")
        return False

    print(f"[WARNING] {name} does not exist")
    return True


def validate_state(root: Path) -> bool:
    """Validate the structure of state files.

    Returns False when ``tasks.yaml`` or ``locks.yaml`` exists but could not
    be loaded; warnings do not affect the result.
    """
    state_dir = root / ".agent-state"

    # Validate tasks.yaml
    tasks_file = state_dir / "tasks.yaml"
    tasks_data = load_yaml_file(tasks_file)

    print("=== Validating State Files ===\n")

    tasks_ok = _report_state_file(tasks_file, tasks_data, "tasks")

    # Validate locks.yaml
    locks_file = state_dir / "locks.yaml"
    locks_data = load_yaml_file(locks_file)
    locks_ok = _report_state_file(locks_file, locks_data, "locks")

    return tasks_ok and locks_ok


# =============================================================================
# Main Entry Point
# =============================================================================


def main() -> int:
    """Parse command-line arguments, run the requested check, return exit code.

    Returns 0 on success (including when the ownership check is skipped
    because no files or no valid agent were given) and 1 on violations,
    failed sub-commands, or warnings under ``--strict``.
    """
    parser = argparse.ArgumentParser(
        description="Multi-Agent Ownership Verification",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument("files", nargs="*", help="Files to check ownership for")
    parser.add_argument(
        "--check-commit-msg",
        metavar="FILE",
        nargs="?",
        const=".git/COMMIT_EDITMSG",
        help="Check commit message format",
    )
    parser.add_argument("--verify-setup", action="store_true", help="Verify agent environment")
    parser.add_argument("--list-expired", action="store_true", help="List expired tasks/locks")
    parser.add_argument(
        "--validate-state", action="store_true", help="Validate state file structure"
    )
    parser.add_argument("--strict", action="store_true", help="Treat warnings as errors")

    args = parser.parse_args()
    root = get_project_root()

    # Handle special commands
    if args.verify_setup:
        return 0 if verify_setup() else 1

    if args.list_expired:
        list_expired(root)
        return 0

    if args.validate_state:
        return 0 if validate_state(root) else 1

    if args.check_commit_msg:
        return 0 if check_commit_message(args.check_commit_msg) else 1

    # Default: check file ownership
    if not args.files:
        print("No files specified. Use --help for usage.")
        return 0

    agent = get_agent_author()
    if not agent:
        print("Warning: SEO_AGENT_AUTHOR not set or invalid.")
        print(" Set with: export SEO_AGENT_AUTHOR=")
        print(" Ownership check skipped.")
        return 0

    violations, warnings = verify_ownership(args.files, agent, root)

    # Print results
    if warnings:
        print("\n=== Warnings ===")
        for warning in warnings:
            print(f" [WARN] {warning}")

    if violations:
        print("\n=== Ownership Violations ===")
        for violation in violations:
            print(f" [ERROR] {violation}")
        print(f"\nAgent '{agent}' cannot modify these files.")
        print("See GUARDRAILS.md for ownership rules.")
        return 1

    if args.strict and warnings:
        print("\n[STRICT MODE] Treating warnings as errors.")
        return 1

    return 0


if __name__ == "__main__":
    sys.exit(main())