""" Sitemap Validator - Validate XML sitemaps ========================================== Purpose: Parse and validate XML sitemaps for SEO compliance Python: 3.10+ Usage: python sitemap_validator.py --url https://example.com/sitemap.xml """ import argparse import asyncio import gzip import json import logging import re from dataclasses import dataclass, field from datetime import datetime from io import BytesIO from typing import Any from urllib.parse import urljoin, urlparse import aiohttp import requests from lxml import etree logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", ) logger = logging.getLogger(__name__) @dataclass class SitemapIssue: """Represents a sitemap validation issue.""" severity: str # "error", "warning", "info" message: str url: str | None = None suggestion: str | None = None @dataclass class SitemapEntry: """Represents a single URL entry in sitemap.""" loc: str lastmod: str | None = None changefreq: str | None = None priority: float | None = None status_code: int | None = None @dataclass class SitemapResult: """Complete sitemap validation result.""" url: str sitemap_type: str # "urlset" or "sitemapindex" entries: list[SitemapEntry] = field(default_factory=list) child_sitemaps: list[str] = field(default_factory=list) issues: list[SitemapIssue] = field(default_factory=list) valid: bool = True stats: dict = field(default_factory=dict) timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) def to_dict(self) -> dict: """Convert to dictionary for JSON output.""" return { "url": self.url, "sitemap_type": self.sitemap_type, "valid": self.valid, "stats": self.stats, "issues": [ { "severity": i.severity, "message": i.message, "url": i.url, "suggestion": i.suggestion, } for i in self.issues ], "entries_count": len(self.entries), "child_sitemaps": self.child_sitemaps, "timestamp": self.timestamp, } class SitemapValidator: """Validate XML sitemaps.""" SITEMAP_NS = "http://www.sitemaps.org/schemas/sitemap/0.9" MAX_URLS = 50000 MAX_SIZE_BYTES = 50 * 1024 * 1024 # 50MB VALID_CHANGEFREQ = { "always", "hourly", "daily", "weekly", "monthly", "yearly", "never" } def __init__(self, check_urls: bool = False, max_concurrent: int = 10): self.check_urls = check_urls self.max_concurrent = max_concurrent self.session = requests.Session() self.session.headers.update({ "User-Agent": "Mozilla/5.0 (compatible; SEOAuditBot/1.0)" }) def fetch_sitemap(self, url: str) -> tuple[bytes, bool]: """Fetch sitemap content, handling gzip compression.""" try: response = self.session.get(url, timeout=30) response.raise_for_status() content = response.content is_gzipped = False # Check if gzipped if url.endswith(".gz") or response.headers.get( "Content-Encoding" ) == "gzip": try: content = gzip.decompress(content) is_gzipped = True except gzip.BadGzipFile: pass return content, is_gzipped except requests.RequestException as e: raise RuntimeError(f"Failed to fetch sitemap: {e}") def parse_sitemap(self, content: bytes) -> tuple[str, list[dict]]: """Parse sitemap XML content.""" try: root = etree.fromstring(content) except etree.XMLSyntaxError as e: raise ValueError(f"Invalid XML: {e}") # Remove namespace for easier parsing nsmap = {"sm": self.SITEMAP_NS} # Check if it's a sitemap index or urlset if root.tag == f"{{{self.SITEMAP_NS}}}sitemapindex": sitemap_type = "sitemapindex" entries = [] for sitemap in root.findall("sm:sitemap", nsmap): entry = {} loc = sitemap.find("sm:loc", nsmap) if loc is not None and loc.text: entry["loc"] = loc.text.strip() 
                lastmod = sitemap.find("sm:lastmod", nsmap)
                if lastmod is not None and lastmod.text:
                    entry["lastmod"] = lastmod.text.strip()
                if entry.get("loc"):
                    entries.append(entry)
        elif root.tag == f"{{{self.SITEMAP_NS}}}urlset":
            sitemap_type = "urlset"
            entries = []
            for url in root.findall("sm:url", nsmap):
                entry = {}
                loc = url.find("sm:loc", nsmap)
                if loc is not None and loc.text:
                    entry["loc"] = loc.text.strip()
                lastmod = url.find("sm:lastmod", nsmap)
                if lastmod is not None and lastmod.text:
                    entry["lastmod"] = lastmod.text.strip()
                changefreq = url.find("sm:changefreq", nsmap)
                if changefreq is not None and changefreq.text:
                    entry["changefreq"] = changefreq.text.strip().lower()
                priority = url.find("sm:priority", nsmap)
                if priority is not None and priority.text:
                    try:
                        entry["priority"] = float(priority.text.strip())
                    except ValueError:
                        entry["priority"] = None
                if entry.get("loc"):
                    entries.append(entry)
        else:
            raise ValueError(f"Unknown sitemap type: {root.tag}")

        return sitemap_type, entries

    def validate(self, url: str) -> SitemapResult:
        """Validate a sitemap URL."""
        result = SitemapResult(url=url, sitemap_type="unknown")

        # Fetch sitemap
        try:
            content, is_gzipped = self.fetch_sitemap(url)
        except RuntimeError as e:
            result.issues.append(SitemapIssue(
                severity="error",
                message=str(e),
                url=url,
            ))
            result.valid = False
            return result

        # Check size (against the uncompressed content)
        if len(content) > self.MAX_SIZE_BYTES:
            result.issues.append(SitemapIssue(
                severity="error",
                message=f"Sitemap exceeds 50MB limit ({len(content) / 1024 / 1024:.2f}MB)",
                url=url,
                suggestion="Split sitemap into smaller files using a sitemap index",
            ))

        # Parse XML
        try:
            sitemap_type, entries = self.parse_sitemap(content)
        except ValueError as e:
            result.issues.append(SitemapIssue(
                severity="error",
                message=str(e),
                url=url,
            ))
            result.valid = False
            return result

        result.sitemap_type = sitemap_type

        # Process entries
        if sitemap_type == "sitemapindex":
            result.child_sitemaps = [e["loc"] for e in entries]
            result.stats = {
                "child_sitemaps_count": len(entries),
            }
        else:
            # Validate URL entries
            url_count = len(entries)
            result.stats["url_count"] = url_count

            if url_count > self.MAX_URLS:
                result.issues.append(SitemapIssue(
                    severity="error",
                    message=f"Sitemap exceeds 50,000 URL limit ({url_count} URLs)",
                    url=url,
                    suggestion="Split into multiple sitemaps with a sitemap index",
                ))

            if url_count == 0:
                result.issues.append(SitemapIssue(
                    severity="warning",
                    message="Sitemap is empty (no URLs)",
                    url=url,
                ))

            # Validate individual entries
            seen_urls = set()
            invalid_lastmod = 0
            invalid_changefreq = 0
            invalid_priority = 0

            for entry in entries:
                loc = entry.get("loc", "")

                # Check for duplicates
                if loc in seen_urls:
                    result.issues.append(SitemapIssue(
                        severity="warning",
                        message="Duplicate URL in sitemap",
                        url=loc,
                    ))
                seen_urls.add(loc)

                # Validate lastmod format
                lastmod = entry.get("lastmod")
                if lastmod:
                    if not self._validate_date(lastmod):
                        invalid_lastmod += 1

                # Validate changefreq
                changefreq = entry.get("changefreq")
                if changefreq and changefreq not in self.VALID_CHANGEFREQ:
                    invalid_changefreq += 1

                # Validate priority
                priority = entry.get("priority")
                if priority is not None:
                    if not (0.0 <= priority <= 1.0):
                        invalid_priority += 1

                # Create entry object
                result.entries.append(SitemapEntry(
                    loc=loc,
                    lastmod=lastmod,
                    changefreq=changefreq,
                    priority=priority,
                ))

            # Add summary issues
            if invalid_lastmod > 0:
                result.issues.append(SitemapIssue(
                    severity="warning",
                    message=f"{invalid_lastmod} URLs with invalid lastmod format",
                    suggestion="Use ISO 8601 format (YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS+TZ)",
                ))
            if invalid_changefreq > 0:
                result.issues.append(SitemapIssue(
                    severity="info",
                    message=f"{invalid_changefreq} URLs with invalid changefreq",
                    suggestion="Use: always, hourly, daily, weekly, monthly, yearly, never",
                ))
            if invalid_priority > 0:
                result.issues.append(SitemapIssue(
                    severity="warning",
                    message=f"{invalid_priority} URLs with invalid priority (must be 0.0-1.0)",
                ))

            result.stats.update({
                "invalid_lastmod": invalid_lastmod,
                "invalid_changefreq": invalid_changefreq,
                "invalid_priority": invalid_priority,
                "has_lastmod": sum(1 for e in result.entries if e.lastmod),
                "has_changefreq": sum(1 for e in result.entries if e.changefreq),
                "has_priority": sum(1 for e in result.entries if e.priority is not None),
            })

        # Check URLs if requested
        if self.check_urls and result.entries:
            asyncio.run(self._check_url_status(result))

        # Determine validity
        result.valid = not any(i.severity == "error" for i in result.issues)

        return result

    def _validate_date(self, date_str: str) -> bool:
        """Loosely validate ISO 8601 (W3C Datetime) date format."""
        patterns = [
            r"^\d{4}-\d{2}-\d{2}$",
            r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}",
        ]
        return any(re.match(p, date_str) for p in patterns)

    async def _check_url_status(self, result: SitemapResult) -> None:
        """Check HTTP status of URLs in the sitemap (first 100 entries)."""
        semaphore = asyncio.Semaphore(self.max_concurrent)

        # Share one session (and its connection pool) across all requests
        # instead of opening a new one per URL.
        async with aiohttp.ClientSession() as session:

            async def check_url(entry: SitemapEntry) -> None:
                async with semaphore:
                    try:
                        async with session.head(
                            entry.loc,
                            timeout=aiohttp.ClientTimeout(total=10),
                            allow_redirects=True,
                        ) as response:
                            entry.status_code = response.status
                    except Exception:
                        entry.status_code = 0  # 0 marks an unreachable URL

            # Sample at most 100 URLs to keep runtime bounded
            await asyncio.gather(*[check_url(e) for e in result.entries[:100]])

        # Count status codes
        status_counts = {}
        for entry in result.entries:
            if entry.status_code:
                status_counts[entry.status_code] = (
                    status_counts.get(entry.status_code, 0) + 1
                )
        result.stats["url_status_codes"] = status_counts

        # Add issues for non-200 URLs
        error_count = sum(
            1 for e in result.entries
            if e.status_code and e.status_code >= 400
        )
        if error_count > 0:
            result.issues.append(SitemapIssue(
                severity="warning",
                message=f"{error_count} URLs returning error status codes (4xx/5xx)",
                suggestion="Remove or fix broken URLs in sitemap",
            ))

    def generate_report(self, result: SitemapResult) -> str:
        """Generate a human-readable validation report."""
        lines = [
            "=" * 60,
            "Sitemap Validation Report",
            "=" * 60,
            f"URL: {result.url}",
            f"Type: {result.sitemap_type}",
            f"Valid: {'Yes' if result.valid else 'No'}",
            f"Timestamp: {result.timestamp}",
            "",
        ]

        lines.append("Statistics:")
        for key, value in result.stats.items():
            lines.append(f"  {key}: {value}")
        lines.append("")

        if result.child_sitemaps:
            lines.append(f"Child Sitemaps ({len(result.child_sitemaps)}):")
            for sitemap in result.child_sitemaps[:10]:
                lines.append(f"  - {sitemap}")
            if len(result.child_sitemaps) > 10:
                lines.append(f"  ... and {len(result.child_sitemaps) - 10} more")
            lines.append("")

        if result.issues:
            lines.append("Issues Found:")
            errors = [i for i in result.issues if i.severity == "error"]
            warnings = [i for i in result.issues if i.severity == "warning"]
            infos = [i for i in result.issues if i.severity == "info"]

            if errors:
                lines.append(f"\n  ERRORS ({len(errors)}):")
                for issue in errors:
                    lines.append(f"  - {issue.message}")
                    if issue.url:
                        lines.append(f"    URL: {issue.url}")
                    if issue.suggestion:
                        lines.append(f"    Suggestion: {issue.suggestion}")

            if warnings:
                lines.append(f"\n  WARNINGS ({len(warnings)}):")
                for issue in warnings:
                    lines.append(f"  - {issue.message}")
                    if issue.suggestion:
                        lines.append(f"    Suggestion: {issue.suggestion}")

            if infos:
                lines.append(f"\n  INFO ({len(infos)}):")
                for issue in infos:
                    lines.append(f"  - {issue.message}")

        lines.append("")
        lines.append("=" * 60)
        return "\n".join(lines)


def main():
    """Main entry point for CLI usage."""
    parser = argparse.ArgumentParser(
        description="Validate XML sitemaps",
    )
    parser.add_argument("--url", "-u", required=True,
                        help="Sitemap URL to validate")
    parser.add_argument("--check-urls", action="store_true",
                        help="Check HTTP status of URLs (slower)")
    parser.add_argument("--output", "-o",
                        help="Output file for JSON report")
    parser.add_argument("--json", action="store_true",
                        help="Output as JSON")
    args = parser.parse_args()

    validator = SitemapValidator(check_urls=args.check_urls)
    result = validator.validate(args.url)

    if args.json or args.output:
        output = json.dumps(result.to_dict(), ensure_ascii=False, indent=2)
        if args.output:
            with open(args.output, "w", encoding="utf-8") as f:
                f.write(output)
            logger.info(f"Report written to {args.output}")
        else:
            print(output)
    else:
        print(validator.generate_report(result))


if __name__ == "__main__":
    main()
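

# ---------------------------------------------------------------------------
# Example: programmatic use (illustrative sketch, kept as comments so that
# running the script still only invokes main()). Assumes this file is
# importable as `sitemap_validator`; the URL below is a placeholder.
#
#   from sitemap_validator import SitemapValidator
#
#   validator = SitemapValidator(check_urls=True, max_concurrent=5)
#   result = validator.validate("https://example.com/sitemap.xml")
#   print(validator.generate_report(result))
#   if not result.valid:
#       raise SystemExit(1)
# ---------------------------------------------------------------------------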