"""
|
|
Sitemap Validator - Validate XML sitemaps
|
|
==========================================
|
|
Purpose: Parse and validate XML sitemaps for SEO compliance
|
|
Python: 3.10+
|
|
Usage:
|
|
python sitemap_validator.py --url https://example.com/sitemap.xml
|
|
"""
|
|
|
|
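# Example invocations (for reference; all flags are defined in main() below):
#
#   python sitemap_validator.py --url https://example.com/sitemap.xml
#   python sitemap_validator.py --url https://example.com/sitemap.xml --check-urls
#   python sitemap_validator.py --url https://example.com/sitemap.xml --json -o report.json
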
import argparse
import asyncio
import gzip
import json
import logging
import re
from dataclasses import dataclass, field
from datetime import datetime

import aiohttp
import requests
from lxml import etree

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


@dataclass
class SitemapIssue:
    """Represents a sitemap validation issue."""

    severity: str  # "error", "warning", "info"
    message: str
    url: str | None = None
    suggestion: str | None = None


@dataclass
class SitemapEntry:
    """Represents a single URL entry in a sitemap."""

    loc: str
    lastmod: str | None = None
    changefreq: str | None = None
    priority: float | None = None
    status_code: int | None = None


@dataclass
class SitemapResult:
    """Complete sitemap validation result."""

    url: str
    sitemap_type: str  # "urlset" or "sitemapindex"
    entries: list[SitemapEntry] = field(default_factory=list)
    child_sitemaps: list[str] = field(default_factory=list)
    issues: list[SitemapIssue] = field(default_factory=list)
    valid: bool = True
    stats: dict = field(default_factory=dict)
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())

    def to_dict(self) -> dict:
        """Convert to dictionary for JSON output."""
        return {
            "url": self.url,
            "sitemap_type": self.sitemap_type,
            "valid": self.valid,
            "stats": self.stats,
            "issues": [
                {
                    "severity": i.severity,
                    "message": i.message,
                    "url": i.url,
                    "suggestion": i.suggestion,
                }
                for i in self.issues
            ],
            "entries_count": len(self.entries),
            "child_sitemaps": self.child_sitemaps,
            "timestamp": self.timestamp,
        }


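# For reference, SitemapResult.to_dict() produces a JSON payload shaped like:
#
#   {
#     "url": "...", "sitemap_type": "urlset", "valid": true,
#     "stats": {...},
#     "issues": [{"severity": "...", "message": "...", "url": null, "suggestion": null}],
#     "entries_count": 0, "child_sitemaps": [], "timestamp": "..."
#   }

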
class SitemapValidator:
    """Validate XML sitemaps."""

    SITEMAP_NS = "http://www.sitemaps.org/schemas/sitemap/0.9"
    # Per-file limits from the sitemaps.org protocol: 50,000 URLs and 50 MB uncompressed.
    MAX_URLS = 50000
    MAX_SIZE_BYTES = 50 * 1024 * 1024  # 50MB

    VALID_CHANGEFREQ = {
        "always", "hourly", "daily", "weekly",
        "monthly", "yearly", "never",
    }

    def __init__(self, check_urls: bool = False, max_concurrent: int = 10):
        self.check_urls = check_urls
        self.max_concurrent = max_concurrent
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (compatible; SEOAuditBot/1.0)"
        })

    def fetch_sitemap(self, url: str) -> tuple[bytes, bool]:
        """Fetch sitemap content, handling gzip compression."""
        try:
            response = self.session.get(url, timeout=30)
            response.raise_for_status()

            content = response.content
            is_gzipped = False

            # Decompress if gzipped (.gz extension or Content-Encoding header)
            if url.endswith(".gz") or response.headers.get("Content-Encoding") == "gzip":
                try:
                    content = gzip.decompress(content)
                    is_gzipped = True
                except gzip.BadGzipFile:
                    pass

            return content, is_gzipped
        except requests.RequestException as e:
            raise RuntimeError(f"Failed to fetch sitemap: {e}") from e

    def parse_sitemap(self, content: bytes) -> tuple[str, list[dict]]:
        """Parse sitemap XML content."""
        try:
            root = etree.fromstring(content)
        except etree.XMLSyntaxError as e:
            raise ValueError(f"Invalid XML: {e}")

        # Namespace map for element lookups
        nsmap = {"sm": self.SITEMAP_NS}

        # Determine whether this is a sitemap index or a urlset
        if root.tag == f"{{{self.SITEMAP_NS}}}sitemapindex":
            sitemap_type = "sitemapindex"
            entries = []
            for sitemap in root.findall("sm:sitemap", nsmap):
                entry = {}
                loc = sitemap.find("sm:loc", nsmap)
                if loc is not None and loc.text:
                    entry["loc"] = loc.text.strip()
                lastmod = sitemap.find("sm:lastmod", nsmap)
                if lastmod is not None and lastmod.text:
                    entry["lastmod"] = lastmod.text.strip()
                if entry.get("loc"):
                    entries.append(entry)
        elif root.tag == f"{{{self.SITEMAP_NS}}}urlset":
            sitemap_type = "urlset"
            entries = []
            for url in root.findall("sm:url", nsmap):
                entry = {}
                loc = url.find("sm:loc", nsmap)
                if loc is not None and loc.text:
                    entry["loc"] = loc.text.strip()
                lastmod = url.find("sm:lastmod", nsmap)
                if lastmod is not None and lastmod.text:
                    entry["lastmod"] = lastmod.text.strip()
                changefreq = url.find("sm:changefreq", nsmap)
                if changefreq is not None and changefreq.text:
                    entry["changefreq"] = changefreq.text.strip().lower()
                priority = url.find("sm:priority", nsmap)
                if priority is not None and priority.text:
                    try:
                        entry["priority"] = float(priority.text.strip())
                    except ValueError:
                        entry["priority"] = None
                if entry.get("loc"):
                    entries.append(entry)
        else:
            raise ValueError(f"Unknown sitemap type: {root.tag}")

        return sitemap_type, entries

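    # For reference, the two document shapes handled above (illustrative URLs,
    # sitemaps.org 0.9 namespace):
    #
    #   <urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
    #     <url><loc>https://example.com/</loc><lastmod>2024-01-01</lastmod></url>
    #   </urlset>
    #
    #   <sitemapindex xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
    #     <sitemap><loc>https://example.com/sitemap-part1.xml</loc></sitemap>
    #   </sitemapindex>
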
    def validate(self, url: str) -> SitemapResult:
        """Validate a sitemap URL."""
        result = SitemapResult(url=url, sitemap_type="unknown")

        # Fetch sitemap
        try:
            content, is_gzipped = self.fetch_sitemap(url)
        except RuntimeError as e:
            result.issues.append(SitemapIssue(
                severity="error",
                message=str(e),
                url=url,
            ))
            result.valid = False
            return result

        # Check size
        if len(content) > self.MAX_SIZE_BYTES:
            result.issues.append(SitemapIssue(
                severity="error",
                message=f"Sitemap exceeds 50MB limit ({len(content) / 1024 / 1024:.2f}MB)",
                url=url,
                suggestion="Split sitemap into smaller files using a sitemap index",
            ))

        # Parse XML
        try:
            sitemap_type, entries = self.parse_sitemap(content)
        except ValueError as e:
            result.issues.append(SitemapIssue(
                severity="error",
                message=str(e),
                url=url,
            ))
            result.valid = False
            return result

        result.sitemap_type = sitemap_type

        # Process entries
        if sitemap_type == "sitemapindex":
            result.child_sitemaps = [e["loc"] for e in entries]
            result.stats = {
                "child_sitemaps_count": len(entries),
            }
        else:
            # Validate URL entries
            url_count = len(entries)
            result.stats["url_count"] = url_count

            if url_count > self.MAX_URLS:
                result.issues.append(SitemapIssue(
                    severity="error",
                    message=f"Sitemap exceeds 50,000 URL limit ({url_count} URLs)",
                    url=url,
                    suggestion="Split into multiple sitemaps with a sitemap index",
                ))

            if url_count == 0:
                result.issues.append(SitemapIssue(
                    severity="warning",
                    message="Sitemap is empty (no URLs)",
                    url=url,
                ))

            # Validate individual entries
            seen_urls = set()
            invalid_lastmod = 0
            invalid_changefreq = 0
            invalid_priority = 0

            for entry in entries:
                loc = entry.get("loc", "")

                # Check for duplicates
                if loc in seen_urls:
                    result.issues.append(SitemapIssue(
                        severity="warning",
                        message="Duplicate URL in sitemap",
                        url=loc,
                    ))
                seen_urls.add(loc)

                # Validate lastmod format
                lastmod = entry.get("lastmod")
                if lastmod:
                    if not self._validate_date(lastmod):
                        invalid_lastmod += 1

                # Validate changefreq
                changefreq = entry.get("changefreq")
                if changefreq and changefreq not in self.VALID_CHANGEFREQ:
                    invalid_changefreq += 1

                # Validate priority
                priority = entry.get("priority")
                if priority is not None:
                    if not (0.0 <= priority <= 1.0):
                        invalid_priority += 1

                # Create entry object
                result.entries.append(SitemapEntry(
                    loc=loc,
                    lastmod=lastmod,
                    changefreq=changefreq,
                    priority=priority,
                ))

            # Add summary issues
            if invalid_lastmod > 0:
                result.issues.append(SitemapIssue(
                    severity="warning",
                    message=f"{invalid_lastmod} URLs with invalid lastmod format",
                    suggestion="Use ISO 8601 format (YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS+TZ)",
                ))

            if invalid_changefreq > 0:
                result.issues.append(SitemapIssue(
                    severity="info",
                    message=f"{invalid_changefreq} URLs with invalid changefreq",
                    suggestion="Use: always, hourly, daily, weekly, monthly, yearly, never",
                ))

            if invalid_priority > 0:
                result.issues.append(SitemapIssue(
                    severity="warning",
                    message=f"{invalid_priority} URLs with invalid priority (must be 0.0-1.0)",
                ))

            result.stats.update({
                "invalid_lastmod": invalid_lastmod,
                "invalid_changefreq": invalid_changefreq,
                "invalid_priority": invalid_priority,
                "has_lastmod": sum(1 for e in result.entries if e.lastmod),
                "has_changefreq": sum(1 for e in result.entries if e.changefreq),
                "has_priority": sum(1 for e in result.entries if e.priority is not None),
            })

        # Check URLs if requested
        if self.check_urls and result.entries:
            asyncio.run(self._check_url_status(result))

        # Determine validity
        result.valid = not any(i.severity == "error" for i in result.issues)

        return result

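    # validate() flags, in order: fetch failures, >50MB payloads, XML syntax
    # errors, >50,000 URLs, empty sitemaps, duplicate <loc> values, malformed
    # lastmod/changefreq/priority fields, and (with check_urls) 4xx/5xx URLs.
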
    def _validate_date(self, date_str: str) -> bool:
        """Validate ISO 8601 date format."""
        patterns = [
            r"^\d{4}-\d{2}-\d{2}$",
            r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}",
        ]
        return any(re.match(p, date_str) for p in patterns)

    async def _check_url_status(self, result: SitemapResult) -> None:
        """Check HTTP status of URLs in sitemap."""
        semaphore = asyncio.Semaphore(self.max_concurrent)

        async def check_url(entry: SitemapEntry) -> None:
            async with semaphore:
                try:
                    async with aiohttp.ClientSession() as session:
                        async with session.head(
                            entry.loc,
                            timeout=aiohttp.ClientTimeout(total=10),
                            allow_redirects=True,
                        ) as response:
                            entry.status_code = response.status
                except Exception:
                    entry.status_code = 0

        await asyncio.gather(*[check_url(e) for e in result.entries[:100]])

        # Count status codes
        status_counts = {}
        for entry in result.entries:
            if entry.status_code:
                status_counts[entry.status_code] = (
                    status_counts.get(entry.status_code, 0) + 1
                )

        result.stats["url_status_codes"] = status_counts

        # Add issues for non-200 URLs
        error_count = sum(
            1 for e in result.entries
            if e.status_code and e.status_code >= 400
        )
        if error_count > 0:
            result.issues.append(SitemapIssue(
                severity="warning",
                message=f"{error_count} URLs returning error status codes (4xx/5xx)",
                suggestion="Remove or fix broken URLs in sitemap",
            ))

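    # Note: only the first 100 entries are sampled, each with a HEAD request
    # capped at 10 seconds; unreachable URLs are recorded with status_code 0.
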
    def generate_report(self, result: SitemapResult) -> str:
        """Generate human-readable validation report."""
        lines = [
            "=" * 60,
            "Sitemap Validation Report",
            "=" * 60,
            f"URL: {result.url}",
            f"Type: {result.sitemap_type}",
            f"Valid: {'Yes' if result.valid else 'No'}",
            f"Timestamp: {result.timestamp}",
            "",
        ]

        lines.append("Statistics:")
        for key, value in result.stats.items():
            lines.append(f"  {key}: {value}")
        lines.append("")

        if result.child_sitemaps:
            lines.append(f"Child Sitemaps ({len(result.child_sitemaps)}):")
            for sitemap in result.child_sitemaps[:10]:
                lines.append(f"  - {sitemap}")
            if len(result.child_sitemaps) > 10:
                lines.append(f"  ... and {len(result.child_sitemaps) - 10} more")
            lines.append("")

        if result.issues:
            lines.append("Issues Found:")
            errors = [i for i in result.issues if i.severity == "error"]
            warnings = [i for i in result.issues if i.severity == "warning"]
            infos = [i for i in result.issues if i.severity == "info"]

            if errors:
                lines.append(f"\n  ERRORS ({len(errors)}):")
                for issue in errors:
                    lines.append(f"  - {issue.message}")
                    if issue.url:
                        lines.append(f"    URL: {issue.url}")
                    if issue.suggestion:
                        lines.append(f"    Suggestion: {issue.suggestion}")

            if warnings:
                lines.append(f"\n  WARNINGS ({len(warnings)}):")
                for issue in warnings:
                    lines.append(f"  - {issue.message}")
                    if issue.suggestion:
                        lines.append(f"    Suggestion: {issue.suggestion}")

            if infos:
                lines.append(f"\n  INFO ({len(infos)}):")
                for issue in infos:
                    lines.append(f"  - {issue.message}")

        lines.append("")
        lines.append("=" * 60)

        return "\n".join(lines)


def main():
    """Main entry point for CLI usage."""
    parser = argparse.ArgumentParser(
        description="Validate XML sitemaps",
    )
    parser.add_argument("--url", "-u", required=True, help="Sitemap URL to validate")
    parser.add_argument("--check-urls", action="store_true",
                        help="Check HTTP status of URLs (slower)")
    parser.add_argument("--output", "-o", help="Output file for JSON report")
    parser.add_argument("--json", action="store_true", help="Output as JSON")

    args = parser.parse_args()

    validator = SitemapValidator(check_urls=args.check_urls)
    result = validator.validate(args.url)

    if args.json or args.output:
        output = json.dumps(result.to_dict(), ensure_ascii=False, indent=2)
        if args.output:
            with open(args.output, "w", encoding="utf-8") as f:
                f.write(output)
            logger.info(f"Report written to {args.output}")
        else:
            print(output)
    else:
        print(validator.generate_report(result))

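# Programmatic use (a minimal sketch relying only on the classes defined above):
#
#   validator = SitemapValidator(check_urls=False)
#   result = validator.validate("https://example.com/sitemap.xml")
#   for issue in result.issues:
#       print(issue.severity, issue.message)
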
if __name__ == "__main__":
|
|
main()
|