our-claude-skills/ourdigital-custom-skills/12-ourdigital-seo-audit/scripts/sitemap_validator.py
Andrew Yim 9426787ba6 feat(seo-audit): Add comprehensive SEO audit skill
Add ourdigital-seo-audit skill with:
- Full site audit orchestrator (full_audit.py)
- Google Search Console and PageSpeed API clients
- Schema.org JSON-LD validation and generation
- XML sitemap and robots.txt validation
- Notion database integration for findings export
- Core Web Vitals measurement and analysis
- 7 schema templates (article, faq, product, etc.)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-14 02:30:02 +09:00

"""
Sitemap Validator - Validate XML sitemaps
==========================================
Purpose: Parse and validate XML sitemaps for SEO compliance
Python: 3.10+
Usage:
python sitemap_validator.py --url https://example.com/sitemap.xml
"""
import argparse
import asyncio
import gzip
import json
import logging
import re
from dataclasses import dataclass, field
from datetime import datetime

import aiohttp
import requests
from lxml import etree

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


@dataclass
class SitemapIssue:
    """Represents a sitemap validation issue."""

    severity: str  # "error", "warning", "info"
    message: str
    url: str | None = None
    suggestion: str | None = None


@dataclass
class SitemapEntry:
    """Represents a single URL entry in a sitemap."""

    loc: str
    lastmod: str | None = None
    changefreq: str | None = None
    priority: float | None = None
    status_code: int | None = None


@dataclass
class SitemapResult:
    """Complete sitemap validation result."""

    url: str
    sitemap_type: str  # "urlset" or "sitemapindex"
    entries: list[SitemapEntry] = field(default_factory=list)
    child_sitemaps: list[str] = field(default_factory=list)
    issues: list[SitemapIssue] = field(default_factory=list)
    valid: bool = True
    stats: dict = field(default_factory=dict)
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())

    def to_dict(self) -> dict:
        """Convert to dictionary for JSON output."""
        return {
            "url": self.url,
            "sitemap_type": self.sitemap_type,
            "valid": self.valid,
            "stats": self.stats,
            "issues": [
                {
                    "severity": i.severity,
                    "message": i.message,
                    "url": i.url,
                    "suggestion": i.suggestion,
                }
                for i in self.issues
            ],
            "entries_count": len(self.entries),
            "child_sitemaps": self.child_sitemaps,
            "timestamp": self.timestamp,
        }
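    # Illustrative shape of the to_dict() payload (values are examples only):
    #   {"url": "https://example.com/sitemap.xml", "sitemap_type": "urlset",
    #    "valid": true, "stats": {"url_count": 2}, "issues": [],
    #    "entries_count": 2, "child_sitemaps": [], "timestamp": "2025-..."}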


class SitemapValidator:
    """Validate XML sitemaps."""

    SITEMAP_NS = "http://www.sitemaps.org/schemas/sitemap/0.9"
    # Limits defined by the sitemaps.org protocol
    MAX_URLS = 50000
    MAX_SIZE_BYTES = 50 * 1024 * 1024  # 50MB
    VALID_CHANGEFREQ = {
        "always", "hourly", "daily", "weekly",
        "monthly", "yearly", "never",
    }

    def __init__(self, check_urls: bool = False, max_concurrent: int = 10):
        self.check_urls = check_urls
        self.max_concurrent = max_concurrent
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (compatible; SEOAuditBot/1.0)"
        })

    def fetch_sitemap(self, url: str) -> tuple[bytes, bool]:
        """Fetch sitemap content, handling gzip compression."""
        try:
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            content = response.content
            is_gzipped = False
            # Decompress .gz sitemaps. requests already decodes
            # Content-Encoding: gzip transparently, so a BadGzipFile here
            # just means the body was not actually gzip-compressed.
            if url.endswith(".gz") or response.headers.get(
                "Content-Encoding"
            ) == "gzip":
                try:
                    content = gzip.decompress(content)
                    is_gzipped = True
                except gzip.BadGzipFile:
                    pass
            return content, is_gzipped
        except requests.RequestException as e:
            raise RuntimeError(f"Failed to fetch sitemap: {e}") from e

    def parse_sitemap(self, content: bytes) -> tuple[str, list[dict]]:
        """Parse sitemap XML content."""
        try:
            root = etree.fromstring(content)
        except etree.XMLSyntaxError as e:
            raise ValueError(f"Invalid XML: {e}") from e
        # Namespace map for element lookups
        nsmap = {"sm": self.SITEMAP_NS}
        # A sitemap is either an index of child sitemaps or a URL set
        if root.tag == f"{{{self.SITEMAP_NS}}}sitemapindex":
            sitemap_type = "sitemapindex"
            entries = []
            for sitemap in root.findall("sm:sitemap", nsmap):
                entry = {}
                loc = sitemap.find("sm:loc", nsmap)
                if loc is not None and loc.text:
                    entry["loc"] = loc.text.strip()
                lastmod = sitemap.find("sm:lastmod", nsmap)
                if lastmod is not None and lastmod.text:
                    entry["lastmod"] = lastmod.text.strip()
                if entry.get("loc"):
                    entries.append(entry)
        elif root.tag == f"{{{self.SITEMAP_NS}}}urlset":
            sitemap_type = "urlset"
            entries = []
            for url in root.findall("sm:url", nsmap):
                entry = {}
                loc = url.find("sm:loc", nsmap)
                if loc is not None and loc.text:
                    entry["loc"] = loc.text.strip()
                lastmod = url.find("sm:lastmod", nsmap)
                if lastmod is not None and lastmod.text:
                    entry["lastmod"] = lastmod.text.strip()
                changefreq = url.find("sm:changefreq", nsmap)
                if changefreq is not None and changefreq.text:
                    entry["changefreq"] = changefreq.text.strip().lower()
                priority = url.find("sm:priority", nsmap)
                if priority is not None and priority.text:
                    try:
                        entry["priority"] = float(priority.text.strip())
                    except ValueError:
                        entry["priority"] = None
                if entry.get("loc"):
                    entries.append(entry)
        else:
            raise ValueError(f"Unknown sitemap type: {root.tag}")
        return sitemap_type, entries
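    # For reference, a minimal <urlset> document this parser accepts, per the
    # sitemaps.org 0.9 protocol (namespace must match SITEMAP_NS above):
    #
    #   <?xml version="1.0" encoding="UTF-8"?>
    #   <urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
    #     <url>
    #       <loc>https://example.com/</loc>
    #       <lastmod>2024-01-01</lastmod>
    #       <changefreq>weekly</changefreq>
    #       <priority>0.8</priority>
    #     </url>
    #   </urlset>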

    def validate(self, url: str) -> SitemapResult:
        """Validate a sitemap URL."""
        result = SitemapResult(url=url, sitemap_type="unknown")
        # Fetch sitemap
        try:
            content, is_gzipped = self.fetch_sitemap(url)
        except RuntimeError as e:
            result.issues.append(SitemapIssue(
                severity="error",
                message=str(e),
                url=url,
            ))
            result.valid = False
            return result
        # Check size
        if len(content) > self.MAX_SIZE_BYTES:
            result.issues.append(SitemapIssue(
                severity="error",
                message=f"Sitemap exceeds 50MB limit ({len(content) / 1024 / 1024:.2f}MB)",
                url=url,
                suggestion="Split sitemap into smaller files using sitemap index",
            ))
        # Parse XML
        try:
            sitemap_type, entries = self.parse_sitemap(content)
        except ValueError as e:
            result.issues.append(SitemapIssue(
                severity="error",
                message=str(e),
                url=url,
            ))
            result.valid = False
            return result
        result.sitemap_type = sitemap_type
        # Process entries
        if sitemap_type == "sitemapindex":
            result.child_sitemaps = [e["loc"] for e in entries]
            result.stats = {
                "child_sitemaps_count": len(entries),
            }
        else:
            # Validate URL entries
            url_count = len(entries)
            result.stats["url_count"] = url_count
            if url_count > self.MAX_URLS:
                result.issues.append(SitemapIssue(
                    severity="error",
                    message=f"Sitemap exceeds 50,000 URL limit ({url_count} URLs)",
                    url=url,
                    suggestion="Split into multiple sitemaps with sitemap index",
                ))
            if url_count == 0:
                result.issues.append(SitemapIssue(
                    severity="warning",
                    message="Sitemap is empty (no URLs)",
                    url=url,
                ))
            # Validate individual entries
            seen_urls = set()
            invalid_lastmod = 0
            invalid_changefreq = 0
            invalid_priority = 0
            for entry in entries:
                loc = entry.get("loc", "")
                # Check for duplicates
                if loc in seen_urls:
                    result.issues.append(SitemapIssue(
                        severity="warning",
                        message="Duplicate URL in sitemap",
                        url=loc,
                    ))
                seen_urls.add(loc)
                # Validate lastmod format
                lastmod = entry.get("lastmod")
                if lastmod and not self._validate_date(lastmod):
                    invalid_lastmod += 1
                # Validate changefreq
                changefreq = entry.get("changefreq")
                if changefreq and changefreq not in self.VALID_CHANGEFREQ:
                    invalid_changefreq += 1
                # Validate priority
                priority = entry.get("priority")
                if priority is not None and not (0.0 <= priority <= 1.0):
                    invalid_priority += 1
                # Create entry object
                result.entries.append(SitemapEntry(
                    loc=loc,
                    lastmod=lastmod,
                    changefreq=changefreq,
                    priority=priority,
                ))
            # Add summary issues
            if invalid_lastmod > 0:
                result.issues.append(SitemapIssue(
                    severity="warning",
                    message=f"{invalid_lastmod} URLs with invalid lastmod format",
                    suggestion="Use ISO 8601 format (YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS+TZ)",
                ))
            if invalid_changefreq > 0:
                result.issues.append(SitemapIssue(
                    severity="info",
                    message=f"{invalid_changefreq} URLs with invalid changefreq",
                    suggestion="Use: always, hourly, daily, weekly, monthly, yearly, never",
                ))
            if invalid_priority > 0:
                result.issues.append(SitemapIssue(
                    severity="warning",
                    message=f"{invalid_priority} URLs with invalid priority (must be 0.0-1.0)",
                ))
            result.stats.update({
                "invalid_lastmod": invalid_lastmod,
                "invalid_changefreq": invalid_changefreq,
                "invalid_priority": invalid_priority,
                "has_lastmod": sum(1 for e in result.entries if e.lastmod),
                "has_changefreq": sum(1 for e in result.entries if e.changefreq),
                "has_priority": sum(1 for e in result.entries if e.priority is not None),
            })
        # Check URLs if requested
        if self.check_urls and result.entries:
            asyncio.run(self._check_url_status(result))
        # Determine validity: valid only if no error-severity issues
        result.valid = not any(i.severity == "error" for i in result.issues)
        return result

    def _validate_date(self, date_str: str) -> bool:
        """Validate ISO 8601 date format."""
        patterns = [
            r"^\d{4}-\d{2}-\d{2}$",
            r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}",
        ]
        return any(re.match(p, date_str) for p in patterns)
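    # Examples of what the patterns above accept and reject:
    #   accepted: "2024-01-15", "2024-01-15T10:30:00", "2024-01-15T10:30:00+09:00"
    #   rejected: "15/01/2024", "Jan 15, 2024", "2024-1-5"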

    async def _check_url_status(self, result: SitemapResult) -> None:
        """Check HTTP status of the first 100 URLs in the sitemap."""
        semaphore = asyncio.Semaphore(self.max_concurrent)

        async def check_url(
            session: aiohttp.ClientSession, entry: SitemapEntry
        ) -> None:
            async with semaphore:
                try:
                    async with session.head(
                        entry.loc,
                        timeout=aiohttp.ClientTimeout(total=10),
                        allow_redirects=True,
                    ) as response:
                        entry.status_code = response.status
                except Exception:
                    entry.status_code = 0

        # Reuse a single session for all requests instead of opening one per URL
        async with aiohttp.ClientSession() as session:
            await asyncio.gather(
                *[check_url(session, e) for e in result.entries[:100]]
            )
        # Count status codes
        status_counts: dict[int, int] = {}
        for entry in result.entries:
            if entry.status_code:
                status_counts[entry.status_code] = (
                    status_counts.get(entry.status_code, 0) + 1
                )
        result.stats["url_status_codes"] = status_counts
        # Add issues for non-200 URLs
        error_count = sum(
            1 for e in result.entries
            if e.status_code and e.status_code >= 400
        )
        if error_count > 0:
            result.issues.append(SitemapIssue(
                severity="warning",
                message=f"{error_count} URLs returning error status codes (4xx/5xx)",
                suggestion="Remove or fix broken URLs in sitemap",
            ))

    def generate_report(self, result: SitemapResult) -> str:
        """Generate human-readable validation report."""
        lines = [
            "=" * 60,
            "Sitemap Validation Report",
            "=" * 60,
            f"URL: {result.url}",
            f"Type: {result.sitemap_type}",
            f"Valid: {'Yes' if result.valid else 'No'}",
            f"Timestamp: {result.timestamp}",
            "",
        ]
        lines.append("Statistics:")
        for key, value in result.stats.items():
            lines.append(f"  {key}: {value}")
        lines.append("")
        if result.child_sitemaps:
            lines.append(f"Child Sitemaps ({len(result.child_sitemaps)}):")
            for sitemap in result.child_sitemaps[:10]:
                lines.append(f"  - {sitemap}")
            if len(result.child_sitemaps) > 10:
                lines.append(f"  ... and {len(result.child_sitemaps) - 10} more")
            lines.append("")
        if result.issues:
            lines.append("Issues Found:")
            errors = [i for i in result.issues if i.severity == "error"]
            warnings = [i for i in result.issues if i.severity == "warning"]
            infos = [i for i in result.issues if i.severity == "info"]
            if errors:
                lines.append(f"\n  ERRORS ({len(errors)}):")
                for issue in errors:
                    lines.append(f"  - {issue.message}")
                    if issue.url:
                        lines.append(f"    URL: {issue.url}")
                    if issue.suggestion:
                        lines.append(f"    Suggestion: {issue.suggestion}")
            if warnings:
                lines.append(f"\n  WARNINGS ({len(warnings)}):")
                for issue in warnings:
                    lines.append(f"  - {issue.message}")
                    if issue.suggestion:
                        lines.append(f"    Suggestion: {issue.suggestion}")
            if infos:
                lines.append(f"\n  INFO ({len(infos)}):")
                for issue in infos:
                    lines.append(f"  - {issue.message}")
        lines.append("")
        lines.append("=" * 60)
        return "\n".join(lines)


def main():
    """Main entry point for CLI usage."""
    parser = argparse.ArgumentParser(
        description="Validate XML sitemaps",
    )
    parser.add_argument("--url", "-u", required=True, help="Sitemap URL to validate")
    parser.add_argument("--check-urls", action="store_true",
                        help="Check HTTP status of URLs (slower)")
    parser.add_argument("--output", "-o", help="Output file for JSON report")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    args = parser.parse_args()

    validator = SitemapValidator(check_urls=args.check_urls)
    result = validator.validate(args.url)

    if args.json or args.output:
        output = json.dumps(result.to_dict(), ensure_ascii=False, indent=2)
        if args.output:
            with open(args.output, "w", encoding="utf-8") as f:
                f.write(output)
            logger.info(f"Report written to {args.output}")
        else:
            print(output)
    else:
        print(validator.generate_report(result))


if __name__ == "__main__":
    main()
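
# Example invocations (flags as defined in main() above):
#   python sitemap_validator.py --url https://example.com/sitemap.xml
#   python sitemap_validator.py --url https://example.com/sitemap.xml --check-urls
#   python sitemap_validator.py -u https://example.com/sitemap.xml --json -o report.json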