directory changes and restructuring

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-22 02:01:41 +09:00
parent eea49f9f8c
commit 236be6c580
598 changed files with 0 additions and 0 deletions


@@ -0,0 +1,127 @@
# CLAUDE.md
## Overview
Technical SEO auditor for crawlability fundamentals: robots.txt validation, XML sitemap analysis, and URL accessibility checking.
## Quick Start
```bash
# Install dependencies
pip install -r scripts/requirements.txt
# Robots.txt analysis
python scripts/robots_checker.py --url https://example.com
# Sitemap validation
python scripts/sitemap_validator.py --url https://example.com/sitemap.xml
# Async URL crawl (check accessibility of sitemap URLs)
python scripts/sitemap_crawler.py --sitemap https://example.com/sitemap.xml
```
## Scripts
| Script | Purpose | Key Output |
|--------|---------|------------|
| `robots_checker.py` | Parse and validate robots.txt | User-agent rules, disallow patterns, sitemap declarations |
| `sitemap_validator.py` | Validate XML sitemap structure | URL count, lastmod dates, size limits, syntax errors |
| `sitemap_crawler.py` | Check URL accessibility (async) | HTTP status codes, response times, broken links |
| `base_client.py` | Shared utilities | RateLimiter, ConfigManager, BaseAsyncClient |
## Robots.txt Checker
```bash
# Basic analysis
python scripts/robots_checker.py --url https://example.com
# Test specific URL against rules
python scripts/robots_checker.py --url https://example.com --test-url /admin/
# Output JSON
python scripts/robots_checker.py --url https://example.com --json
```
**Checks performed**:
- Syntax validation
- User-agent rule parsing
- Disallow/Allow pattern analysis
- Sitemap declarations
- Critical resource access (CSS/JS/images)
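For a quick sanity check outside the script, Python's standard-library `urllib.robotparser` answers the same can-fetch questions that `--test-url` implies. A minimal sketch (not the checker's actual implementation; note the stdlib parser matches rules in file order, not by longest match as Google does):

```python
from urllib.robotparser import RobotFileParser

# Parse rules from text directly; no network fetch required
rp = RobotFileParser()
rp.parse([
    "User-agent: *",
    "Allow: /admin/public/",   # listed first: stdlib uses first match, not longest
    "Disallow: /admin/",
])

# can_fetch() answers whether a given agent may crawl a URL
print(rp.can_fetch("*", "https://example.com/admin/"))         # False
print(rp.can_fetch("*", "https://example.com/admin/public/"))  # True
print(rp.can_fetch("*", "https://example.com/blog/"))          # True
```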
## Sitemap Validator
```bash
# Validate sitemap
python scripts/sitemap_validator.py --url https://example.com/sitemap.xml
# Include sitemap index parsing
python scripts/sitemap_validator.py --url https://example.com/sitemap_index.xml --follow-index
```
**Validation rules**:
- XML syntax correctness
- URL count limit (50,000 max per sitemap)
- File size limit (50MB max uncompressed)
- Lastmod date format validation
- Sitemap index structure
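The core of these rules can be sketched with the standard library alone. This is illustrative only (the `validate_sitemap` helper and the date-prefix check are simplifications; the real validator uses lxml and also enforces the 50MB size limit):

```python
import re
import xml.etree.ElementTree as ET

NS = "{http://www.sitemaps.org/schemas/sitemap/0.9}"
# Simplified lastmod check: require a W3C Datetime date prefix (YYYY-MM-DD)
LASTMOD_RE = re.compile(r"^\d{4}-\d{2}-\d{2}")

def validate_sitemap(xml_text: str, max_urls: int = 50_000) -> list[str]:
    """Return a list of human-readable issues found in a sitemap document."""
    try:
        root = ET.fromstring(xml_text)
    except ET.ParseError as e:
        return [f"XML syntax error: {e}"]
    issues = []
    urls = root.findall(f"{NS}url")
    if len(urls) > max_urls:
        issues.append(f"URL count {len(urls)} exceeds {max_urls} limit")
    for url in urls:
        lastmod = url.find(f"{NS}lastmod")
        if lastmod is not None and not LASTMOD_RE.match(lastmod.text or ""):
            issues.append(f"Invalid lastmod: {lastmod.text}")
    return issues

sample = """<?xml version="1.0" encoding="UTF-8"?>
<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
  <url><loc>https://example.com/</loc><lastmod>2025-01-15</lastmod></url>
  <url><loc>https://example.com/about</loc><lastmod>yesterday</lastmod></url>
</urlset>"""
print(validate_sitemap(sample))  # ['Invalid lastmod: yesterday']
```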
## Sitemap Crawler
```bash
# Crawl all URLs in sitemap
python scripts/sitemap_crawler.py --sitemap https://example.com/sitemap.xml
# Limit concurrent requests
python scripts/sitemap_crawler.py --sitemap https://example.com/sitemap.xml --concurrency 10
# Sample mode (check subset)
python scripts/sitemap_crawler.py --sitemap https://example.com/sitemap.xml --sample 100
```
**Output includes**:
- HTTP status codes per URL
- Response times
- Redirect chains
- Broken links (4xx, 5xx)
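The concurrent status-check pattern behind the crawler can be sketched with aiohttp (already in `requirements.txt`). A minimal illustration, assuming the `check_url`/`crawl` names; the actual script layers on rate limiting and retries via `base_client`:

```python
import asyncio
import aiohttp

async def check_url(session, sem, url):
    # Semaphore caps concurrent in-flight requests (the --concurrency flag)
    async with sem:
        try:
            async with session.get(url, allow_redirects=True) as resp:
                return {"url": url, "status": resp.status,
                        "redirects": len(resp.history)}
        except aiohttp.ClientError as e:
            return {"url": url, "status": None, "error": str(e)}

async def crawl(urls, concurrency=10):
    sem = asyncio.Semaphore(concurrency)
    timeout = aiohttp.ClientTimeout(total=30)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        return await asyncio.gather(*(check_url(session, sem, u) for u in urls))

# results = asyncio.run(crawl(["https://example.com/", "https://example.com/about"]))
```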
## Output Format
All scripts support `--json` flag for structured output:
```json
{
"url": "https://example.com",
"status": "valid|invalid|warning",
"issues": [
{
"type": "error|warning|info",
"message": "Description",
"location": "Line or URL"
}
],
"summary": {}
}
```
## Common Issues Detected
| Category | Issue | Severity |
|----------|-------|----------|
| Robots.txt | Missing sitemap declaration | Medium |
| Robots.txt | Blocking CSS/JS resources | High |
| Robots.txt | Overly broad disallow rules | Medium |
| Sitemap | URLs returning 404 | High |
| Sitemap | Missing lastmod dates | Low |
| Sitemap | Exceeds 50,000 URL limit | High |
| Sitemap | Non-canonical URLs included | Medium |
## Configuration
Environment variables (optional):
```bash
# Rate limiting
CRAWL_DELAY=1.0 # Seconds between requests
MAX_CONCURRENT=20 # Async concurrency limit
REQUEST_TIMEOUT=30 # Request timeout seconds
```
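Read with plain `os.getenv` plus defaults, these variables might be consumed like this (a hypothetical helper for illustration; the scripts' actual internals may differ):

```python
import os

def crawl_settings() -> dict:
    """Read crawl tuning knobs from the environment, with safe defaults."""
    return {
        "crawl_delay": float(os.getenv("CRAWL_DELAY", "1.0")),
        "max_concurrent": int(os.getenv("MAX_CONCURRENT", "20")),
        "request_timeout": int(os.getenv("REQUEST_TIMEOUT", "30")),
    }

os.environ["MAX_CONCURRENT"] = "5"
os.environ.pop("CRAWL_DELAY", None)      # unset: falls back to default
os.environ.pop("REQUEST_TIMEOUT", None)  # unset: falls back to default
print(crawl_settings())
# {'crawl_delay': 1.0, 'max_concurrent': 5, 'request_timeout': 30}
```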


@@ -0,0 +1,207 @@
"""
Base Client - Shared async client utilities
===========================================
Purpose: Rate-limited async operations for API clients
Python: 3.10+
"""
import asyncio
import logging
import os
from asyncio import Semaphore
from datetime import datetime
from typing import Any, Callable, TypeVar
from dotenv import load_dotenv
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
)
# Load environment variables
load_dotenv()
# Logging setup
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
T = TypeVar("T")
class RateLimiter:
"""Rate limiter using token bucket algorithm."""
def __init__(self, rate: float, per: float = 1.0):
"""
Initialize rate limiter.
Args:
rate: Number of requests allowed
per: Time period in seconds (default: 1 second)
"""
self.rate = rate
self.per = per
self.tokens = rate
self.last_update = datetime.now()
self._lock = asyncio.Lock()
async def acquire(self) -> None:
"""Acquire a token, waiting if necessary."""
async with self._lock:
now = datetime.now()
elapsed = (now - self.last_update).total_seconds()
self.tokens = min(self.rate, self.tokens + elapsed * (self.rate / self.per))
self.last_update = now
if self.tokens < 1:
wait_time = (1 - self.tokens) * (self.per / self.rate)
await asyncio.sleep(wait_time)
self.tokens = 0
else:
self.tokens -= 1
class BaseAsyncClient:
"""Base class for async API clients with rate limiting."""
def __init__(
self,
max_concurrent: int = 5,
requests_per_second: float = 3.0,
logger: logging.Logger | None = None,
):
"""
Initialize base client.
Args:
max_concurrent: Maximum concurrent requests
requests_per_second: Rate limit
logger: Logger instance
"""
self.semaphore = Semaphore(max_concurrent)
self.rate_limiter = RateLimiter(requests_per_second)
self.logger = logger or logging.getLogger(self.__class__.__name__)
self.stats = {
"requests": 0,
"success": 0,
"errors": 0,
"retries": 0,
}
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10),
retry=retry_if_exception_type(Exception),
)
async def _rate_limited_request(
self,
coro: Callable[[], Any],
) -> Any:
"""Execute a request with rate limiting and retry."""
async with self.semaphore:
await self.rate_limiter.acquire()
self.stats["requests"] += 1
try:
result = await coro()
self.stats["success"] += 1
return result
except Exception as e:
self.stats["errors"] += 1
self.logger.error(f"Request failed: {e}")
raise
async def batch_requests(
self,
requests: list[Callable[[], Any]],
desc: str = "Processing",
) -> list[Any]:
"""Execute multiple requests concurrently."""
try:
from tqdm.asyncio import tqdm
has_tqdm = True
except ImportError:
has_tqdm = False
async def execute(req: Callable) -> Any:
try:
return await self._rate_limited_request(req)
except Exception as e:
return {"error": str(e)}
tasks = [execute(req) for req in requests]
        if has_tqdm:
            results = []
            # Note: as_completed yields results in completion order, not input order
            for coro in tqdm.as_completed(tasks, total=len(tasks), desc=desc):
result = await coro
results.append(result)
return results
else:
return await asyncio.gather(*tasks, return_exceptions=True)
def print_stats(self) -> None:
"""Print request statistics."""
self.logger.info("=" * 40)
self.logger.info("Request Statistics:")
self.logger.info(f" Total Requests: {self.stats['requests']}")
self.logger.info(f" Successful: {self.stats['success']}")
self.logger.info(f" Errors: {self.stats['errors']}")
self.logger.info("=" * 40)
class ConfigManager:
"""Manage API configuration and credentials."""
def __init__(self):
load_dotenv()
@property
def google_credentials_path(self) -> str | None:
"""Get Google service account credentials path."""
# Prefer SEO-specific credentials, fallback to general credentials
seo_creds = os.path.expanduser("~/.credential/ourdigital-seo-agent.json")
if os.path.exists(seo_creds):
return seo_creds
return os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
@property
def pagespeed_api_key(self) -> str | None:
"""Get PageSpeed Insights API key."""
return os.getenv("PAGESPEED_API_KEY")
@property
def custom_search_api_key(self) -> str | None:
"""Get Custom Search API key."""
return os.getenv("CUSTOM_SEARCH_API_KEY")
@property
def custom_search_engine_id(self) -> str | None:
"""Get Custom Search Engine ID."""
return os.getenv("CUSTOM_SEARCH_ENGINE_ID")
@property
def notion_token(self) -> str | None:
"""Get Notion API token."""
return os.getenv("NOTION_TOKEN") or os.getenv("NOTION_API_KEY")
def validate_google_credentials(self) -> bool:
"""Validate Google credentials are configured."""
creds_path = self.google_credentials_path
if not creds_path:
return False
return os.path.exists(creds_path)
def get_required(self, key: str) -> str:
"""Get required environment variable or raise error."""
value = os.getenv(key)
if not value:
raise ValueError(f"Missing required environment variable: {key}")
return value
# Singleton config instance
config = ConfigManager()


@@ -0,0 +1,569 @@
"""
Page Analyzer - Extract SEO metadata from web pages
===================================================
Purpose: Comprehensive page-level SEO data extraction
Python: 3.10+
Usage:
from page_analyzer import PageAnalyzer, PageMetadata
analyzer = PageAnalyzer()
metadata = analyzer.analyze_url("https://example.com/page")
"""
import json
import logging
import re
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
from urllib.parse import urljoin, urlparse
import requests
from bs4 import BeautifulSoup
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
@dataclass
class LinkData:
"""Represents a link found on a page."""
url: str
anchor_text: str
is_internal: bool
is_nofollow: bool = False
link_type: str = "body" # body, nav, footer, etc.
@dataclass
class HeadingData:
"""Represents a heading found on a page."""
level: int # 1-6
text: str
@dataclass
class SchemaData:
"""Represents schema.org structured data."""
schema_type: str
properties: dict
format: str = "json-ld" # json-ld, microdata, rdfa
@dataclass
class OpenGraphData:
"""Represents Open Graph metadata."""
og_title: str | None = None
og_description: str | None = None
og_image: str | None = None
og_url: str | None = None
og_type: str | None = None
og_site_name: str | None = None
og_locale: str | None = None
twitter_card: str | None = None
twitter_title: str | None = None
twitter_description: str | None = None
twitter_image: str | None = None
@dataclass
class PageMetadata:
"""Complete SEO metadata for a page."""
# Basic info
url: str
status_code: int = 0
content_type: str = ""
response_time_ms: float = 0
analyzed_at: datetime = field(default_factory=datetime.now)
# Meta tags
title: str | None = None
title_length: int = 0
meta_description: str | None = None
meta_description_length: int = 0
canonical_url: str | None = None
robots_meta: str | None = None
# Language
html_lang: str | None = None
hreflang_tags: list[dict] = field(default_factory=list) # [{"lang": "en", "url": "..."}]
# Headings
headings: list[HeadingData] = field(default_factory=list)
h1_count: int = 0
h1_text: str | None = None
# Open Graph & Social
open_graph: OpenGraphData = field(default_factory=OpenGraphData)
# Schema/Structured Data
schema_data: list[SchemaData] = field(default_factory=list)
schema_types_found: list[str] = field(default_factory=list)
# Links
internal_links: list[LinkData] = field(default_factory=list)
external_links: list[LinkData] = field(default_factory=list)
internal_link_count: int = 0
external_link_count: int = 0
# Images
images_total: int = 0
images_without_alt: int = 0
images_with_alt: int = 0
# Content metrics
word_count: int = 0
# Issues found
issues: list[str] = field(default_factory=list)
warnings: list[str] = field(default_factory=list)
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
"url": self.url,
"status_code": self.status_code,
"content_type": self.content_type,
"response_time_ms": self.response_time_ms,
"analyzed_at": self.analyzed_at.isoformat(),
"title": self.title,
"title_length": self.title_length,
"meta_description": self.meta_description,
"meta_description_length": self.meta_description_length,
"canonical_url": self.canonical_url,
"robots_meta": self.robots_meta,
"html_lang": self.html_lang,
"hreflang_tags": self.hreflang_tags,
"h1_count": self.h1_count,
"h1_text": self.h1_text,
"headings_count": len(self.headings),
"schema_types_found": self.schema_types_found,
"internal_link_count": self.internal_link_count,
"external_link_count": self.external_link_count,
"images_total": self.images_total,
"images_without_alt": self.images_without_alt,
"word_count": self.word_count,
"issues": self.issues,
"warnings": self.warnings,
"open_graph": {
"og_title": self.open_graph.og_title,
"og_description": self.open_graph.og_description,
"og_image": self.open_graph.og_image,
"og_url": self.open_graph.og_url,
"og_type": self.open_graph.og_type,
},
}
def get_summary(self) -> str:
"""Get a brief summary of the page analysis."""
lines = [
f"URL: {self.url}",
f"Status: {self.status_code}",
f"Title: {self.title[:50] + '...' if self.title and len(self.title) > 50 else self.title}",
            f"Description: {'✓' if self.meta_description else '✗ Missing'}",
            f"Canonical: {'✓' if self.canonical_url else '✗ Missing'}",
f"H1: {self.h1_count} found",
f"Schema: {', '.join(self.schema_types_found) if self.schema_types_found else 'None'}",
f"Links: {self.internal_link_count} internal, {self.external_link_count} external",
f"Images: {self.images_total} total, {self.images_without_alt} without alt",
]
if self.issues:
lines.append(f"Issues: {len(self.issues)}")
return "\n".join(lines)
class PageAnalyzer:
"""Analyze web pages for SEO metadata."""
DEFAULT_USER_AGENT = "Mozilla/5.0 (compatible; OurDigitalSEOBot/1.0; +https://ourdigital.org)"
def __init__(
self,
user_agent: str | None = None,
timeout: int = 30,
):
"""
Initialize page analyzer.
Args:
user_agent: Custom user agent string
timeout: Request timeout in seconds
"""
self.user_agent = user_agent or self.DEFAULT_USER_AGENT
self.timeout = timeout
self.session = requests.Session()
self.session.headers.update({
"User-Agent": self.user_agent,
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9,ko;q=0.8",
})
def analyze_url(self, url: str) -> PageMetadata:
"""
Analyze a URL and extract SEO metadata.
Args:
url: URL to analyze
Returns:
PageMetadata object with all extracted data
"""
metadata = PageMetadata(url=url)
try:
# Fetch page
start_time = datetime.now()
response = self.session.get(url, timeout=self.timeout, allow_redirects=True)
metadata.response_time_ms = (datetime.now() - start_time).total_seconds() * 1000
metadata.status_code = response.status_code
metadata.content_type = response.headers.get("Content-Type", "")
if response.status_code != 200:
metadata.issues.append(f"HTTP {response.status_code} status")
if response.status_code >= 400:
return metadata
# Parse HTML
soup = BeautifulSoup(response.text, "html.parser")
base_url = url
# Extract all metadata
self._extract_basic_meta(soup, metadata)
self._extract_canonical(soup, metadata, base_url)
self._extract_robots_meta(soup, metadata)
self._extract_hreflang(soup, metadata)
self._extract_headings(soup, metadata)
self._extract_open_graph(soup, metadata)
self._extract_schema(soup, metadata)
self._extract_links(soup, metadata, base_url)
self._extract_images(soup, metadata)
self._extract_content_metrics(soup, metadata)
# Run SEO checks
self._run_seo_checks(metadata)
except requests.RequestException as e:
metadata.issues.append(f"Request failed: {str(e)}")
logger.error(f"Failed to analyze {url}: {e}")
except Exception as e:
metadata.issues.append(f"Analysis error: {str(e)}")
logger.error(f"Error analyzing {url}: {e}")
return metadata
def _extract_basic_meta(self, soup: BeautifulSoup, metadata: PageMetadata) -> None:
"""Extract title and meta description."""
# Title
title_tag = soup.find("title")
if title_tag and title_tag.string:
metadata.title = title_tag.string.strip()
metadata.title_length = len(metadata.title)
# Meta description
desc_tag = soup.find("meta", attrs={"name": re.compile(r"^description$", re.I)})
if desc_tag and desc_tag.get("content"):
metadata.meta_description = desc_tag["content"].strip()
metadata.meta_description_length = len(metadata.meta_description)
# HTML lang
html_tag = soup.find("html")
if html_tag and html_tag.get("lang"):
metadata.html_lang = html_tag["lang"]
def _extract_canonical(self, soup: BeautifulSoup, metadata: PageMetadata, base_url: str) -> None:
"""Extract canonical URL."""
canonical = soup.find("link", rel="canonical")
if canonical and canonical.get("href"):
metadata.canonical_url = urljoin(base_url, canonical["href"])
def _extract_robots_meta(self, soup: BeautifulSoup, metadata: PageMetadata) -> None:
"""Extract robots meta tag."""
robots = soup.find("meta", attrs={"name": re.compile(r"^robots$", re.I)})
if robots and robots.get("content"):
metadata.robots_meta = robots["content"]
# Also check for googlebot-specific
googlebot = soup.find("meta", attrs={"name": re.compile(r"^googlebot$", re.I)})
if googlebot and googlebot.get("content"):
if metadata.robots_meta:
metadata.robots_meta += f" | googlebot: {googlebot['content']}"
else:
metadata.robots_meta = f"googlebot: {googlebot['content']}"
def _extract_hreflang(self, soup: BeautifulSoup, metadata: PageMetadata) -> None:
"""Extract hreflang tags."""
hreflang_tags = soup.find_all("link", rel="alternate", hreflang=True)
for tag in hreflang_tags:
if tag.get("href") and tag.get("hreflang"):
metadata.hreflang_tags.append({
"lang": tag["hreflang"],
"url": tag["href"]
})
def _extract_headings(self, soup: BeautifulSoup, metadata: PageMetadata) -> None:
"""Extract all headings."""
for level in range(1, 7):
for heading in soup.find_all(f"h{level}"):
text = heading.get_text(strip=True)
if text:
metadata.headings.append(HeadingData(level=level, text=text))
# Count H1s specifically
h1_tags = soup.find_all("h1")
metadata.h1_count = len(h1_tags)
if h1_tags:
metadata.h1_text = h1_tags[0].get_text(strip=True)
def _extract_open_graph(self, soup: BeautifulSoup, metadata: PageMetadata) -> None:
"""Extract Open Graph and Twitter Card data."""
og = metadata.open_graph
# Open Graph tags
og_mappings = {
"og:title": "og_title",
"og:description": "og_description",
"og:image": "og_image",
"og:url": "og_url",
"og:type": "og_type",
"og:site_name": "og_site_name",
"og:locale": "og_locale",
}
for og_prop, attr_name in og_mappings.items():
tag = soup.find("meta", property=og_prop)
if tag and tag.get("content"):
setattr(og, attr_name, tag["content"])
# Twitter Card tags
twitter_mappings = {
"twitter:card": "twitter_card",
"twitter:title": "twitter_title",
"twitter:description": "twitter_description",
"twitter:image": "twitter_image",
}
for tw_name, attr_name in twitter_mappings.items():
tag = soup.find("meta", attrs={"name": tw_name})
if tag and tag.get("content"):
setattr(og, attr_name, tag["content"])
def _extract_schema(self, soup: BeautifulSoup, metadata: PageMetadata) -> None:
"""Extract schema.org structured data."""
# JSON-LD
for script in soup.find_all("script", type="application/ld+json"):
try:
data = json.loads(script.string)
if isinstance(data, list):
for item in data:
self._process_schema_item(item, metadata, "json-ld")
else:
self._process_schema_item(data, metadata, "json-ld")
except (json.JSONDecodeError, TypeError):
continue
# Microdata (basic detection)
for item in soup.find_all(itemscope=True):
itemtype = item.get("itemtype", "")
if itemtype:
schema_type = itemtype.split("/")[-1]
if schema_type not in metadata.schema_types_found:
metadata.schema_types_found.append(schema_type)
metadata.schema_data.append(SchemaData(
schema_type=schema_type,
properties={},
format="microdata"
))
def _process_schema_item(self, data: dict, metadata: PageMetadata, format_type: str) -> None:
"""Process a single schema.org item."""
if not isinstance(data, dict):
return
schema_type = data.get("@type", "Unknown")
if isinstance(schema_type, list):
schema_type = schema_type[0] if schema_type else "Unknown"
if schema_type not in metadata.schema_types_found:
metadata.schema_types_found.append(schema_type)
metadata.schema_data.append(SchemaData(
schema_type=schema_type,
properties=data,
format=format_type
))
# Process nested @graph items
if "@graph" in data:
for item in data["@graph"]:
self._process_schema_item(item, metadata, format_type)
def _extract_links(self, soup: BeautifulSoup, metadata: PageMetadata, base_url: str) -> None:
"""Extract internal and external links."""
parsed_base = urlparse(base_url)
base_domain = parsed_base.netloc.lower()
for a_tag in soup.find_all("a", href=True):
href = a_tag["href"]
# Skip non-http links
if href.startswith(("#", "javascript:", "mailto:", "tel:")):
continue
# Resolve relative URLs
full_url = urljoin(base_url, href)
parsed_url = urlparse(full_url)
# Get anchor text
anchor_text = a_tag.get_text(strip=True)[:100] # Limit length
# Check if nofollow
rel = a_tag.get("rel", [])
if isinstance(rel, str):
rel = rel.split()
is_nofollow = "nofollow" in rel
# Determine if internal or external
link_domain = parsed_url.netloc.lower()
is_internal = (
link_domain == base_domain or
link_domain.endswith(f".{base_domain}") or
base_domain.endswith(f".{link_domain}")
)
link_data = LinkData(
url=full_url,
anchor_text=anchor_text,
is_internal=is_internal,
is_nofollow=is_nofollow,
)
if is_internal:
metadata.internal_links.append(link_data)
else:
metadata.external_links.append(link_data)
metadata.internal_link_count = len(metadata.internal_links)
metadata.external_link_count = len(metadata.external_links)
def _extract_images(self, soup: BeautifulSoup, metadata: PageMetadata) -> None:
"""Extract image information."""
images = soup.find_all("img")
metadata.images_total = len(images)
for img in images:
alt = img.get("alt", "").strip()
if alt:
metadata.images_with_alt += 1
else:
metadata.images_without_alt += 1
    def _extract_content_metrics(self, soup: BeautifulSoup, metadata: PageMetadata) -> None:
        """Extract content metrics such as word count.

        Note: this decomposes script/style elements, mutating the soup,
        so it must run after the other extractors.
        """
# Remove script and style elements
for element in soup(["script", "style", "noscript"]):
element.decompose()
# Get text content
text = soup.get_text(separator=" ", strip=True)
words = text.split()
metadata.word_count = len(words)
def _run_seo_checks(self, metadata: PageMetadata) -> None:
"""Run SEO checks and add issues/warnings."""
# Title checks
if not metadata.title:
metadata.issues.append("Missing title tag")
elif metadata.title_length < 30:
metadata.warnings.append(f"Title too short ({metadata.title_length} chars, recommend 50-60)")
elif metadata.title_length > 60:
metadata.warnings.append(f"Title too long ({metadata.title_length} chars, recommend 50-60)")
# Meta description checks
if not metadata.meta_description:
metadata.issues.append("Missing meta description")
elif metadata.meta_description_length < 120:
metadata.warnings.append(f"Meta description too short ({metadata.meta_description_length} chars)")
elif metadata.meta_description_length > 160:
metadata.warnings.append(f"Meta description too long ({metadata.meta_description_length} chars)")
# Canonical check
if not metadata.canonical_url:
metadata.warnings.append("Missing canonical tag")
elif metadata.canonical_url != metadata.url:
metadata.warnings.append(f"Canonical points to different URL: {metadata.canonical_url}")
# H1 checks
if metadata.h1_count == 0:
metadata.issues.append("Missing H1 tag")
elif metadata.h1_count > 1:
metadata.warnings.append(f"Multiple H1 tags ({metadata.h1_count})")
# Image alt check
if metadata.images_without_alt > 0:
metadata.warnings.append(f"{metadata.images_without_alt} images missing alt text")
# Schema check
if not metadata.schema_types_found:
metadata.warnings.append("No structured data found")
# Open Graph check
if not metadata.open_graph.og_title:
metadata.warnings.append("Missing Open Graph tags")
# Robots meta check
if metadata.robots_meta:
robots_lower = metadata.robots_meta.lower()
if "noindex" in robots_lower:
metadata.issues.append("Page is set to noindex")
if "nofollow" in robots_lower:
metadata.warnings.append("Page is set to nofollow")
def main():
"""CLI entry point for testing."""
import argparse
parser = argparse.ArgumentParser(description="Page SEO Analyzer")
parser.add_argument("url", help="URL to analyze")
parser.add_argument("--json", "-j", action="store_true", help="Output as JSON")
args = parser.parse_args()
analyzer = PageAnalyzer()
metadata = analyzer.analyze_url(args.url)
if args.json:
print(json.dumps(metadata.to_dict(), indent=2, ensure_ascii=False))
else:
print("=" * 60)
print("PAGE ANALYSIS REPORT")
print("=" * 60)
print(metadata.get_summary())
print()
if metadata.issues:
print("ISSUES:")
for issue in metadata.issues:
            print(f"  - {issue}")
if metadata.warnings:
print("\nWARNINGS:")
for warning in metadata.warnings:
            print(f"  - {warning}")
if metadata.hreflang_tags:
print(f"\nHREFLANG TAGS ({len(metadata.hreflang_tags)}):")
for tag in metadata.hreflang_tags[:5]:
print(f" {tag['lang']}: {tag['url']}")
if metadata.schema_types_found:
        print("\nSCHEMA TYPES:")
for schema_type in metadata.schema_types_found:
print(f" - {schema_type}")
if __name__ == "__main__":
main()


@@ -0,0 +1,17 @@
# 10-seo-technical-audit dependencies
# Install: pip install -r requirements.txt
# Web Scraping & Parsing
lxml>=5.1.0
beautifulsoup4>=4.12.0
requests>=2.31.0
aiohttp>=3.9.0
# Async & Retry
tenacity>=8.2.0
tqdm>=4.66.0
# Environment & CLI
python-dotenv>=1.0.0
rich>=13.7.0
typer>=0.9.0


@@ -0,0 +1,540 @@
"""
Robots.txt Checker - Analyze robots.txt configuration
=====================================================
Purpose: Parse and analyze robots.txt for SEO compliance
Python: 3.10+
Usage:
python robots_checker.py --url https://example.com/robots.txt
python robots_checker.py --url https://example.com --test-url /admin/
"""
import argparse
import json
import logging
import re
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
from urllib.parse import urljoin, urlparse
from urllib.robotparser import RobotFileParser
import requests
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
@dataclass
class RobotsIssue:
"""Represents a robots.txt issue."""
severity: str # "error", "warning", "info"
message: str
line_number: int | None = None
directive: str | None = None
suggestion: str | None = None
@dataclass
class UserAgentRules:
"""Rules for a specific user-agent."""
user_agent: str
disallow: list[str] = field(default_factory=list)
allow: list[str] = field(default_factory=list)
crawl_delay: float | None = None
@dataclass
class RobotsResult:
"""Complete robots.txt analysis result."""
url: str
accessible: bool = True
content: str = ""
rules: list[UserAgentRules] = field(default_factory=list)
sitemaps: list[str] = field(default_factory=list)
issues: list[RobotsIssue] = field(default_factory=list)
stats: dict = field(default_factory=dict)
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
def to_dict(self) -> dict:
"""Convert to dictionary for JSON output."""
return {
"url": self.url,
"accessible": self.accessible,
"sitemaps": self.sitemaps,
"rules": [
{
"user_agent": r.user_agent,
"disallow": r.disallow,
"allow": r.allow,
"crawl_delay": r.crawl_delay,
}
for r in self.rules
],
"issues": [
{
"severity": i.severity,
"message": i.message,
"line_number": i.line_number,
"directive": i.directive,
"suggestion": i.suggestion,
}
for i in self.issues
],
"stats": self.stats,
"timestamp": self.timestamp,
}
class RobotsChecker:
"""Analyze robots.txt configuration."""
# Common user agents
USER_AGENTS = {
"*": "All bots",
"Googlebot": "Google crawler",
"Googlebot-Image": "Google Image crawler",
"Googlebot-News": "Google News crawler",
"Googlebot-Video": "Google Video crawler",
"Bingbot": "Bing crawler",
"Slurp": "Yahoo crawler",
"DuckDuckBot": "DuckDuckGo crawler",
"Baiduspider": "Baidu crawler",
"Yandex": "Yandex crawler",
"facebot": "Facebook crawler",
"Twitterbot": "Twitter crawler",
"LinkedInBot": "LinkedIn crawler",
}
# Paths that should generally not be blocked
IMPORTANT_PATHS = [
"/",
"/*.css",
"/*.js",
"/*.jpg",
"/*.jpeg",
"/*.png",
"/*.gif",
"/*.svg",
"/*.webp",
]
# Paths commonly blocked
COMMON_BLOCKED = [
"/admin",
"/wp-admin",
"/login",
"/private",
"/api",
"/cgi-bin",
"/tmp",
"/search",
]
def __init__(self):
self.session = requests.Session()
self.session.headers.update({
"User-Agent": "Mozilla/5.0 (compatible; SEOAuditBot/1.0)"
})
def fetch_robots(self, url: str) -> str | None:
"""Fetch robots.txt content."""
# Ensure we're fetching robots.txt
parsed = urlparse(url)
if not parsed.path.endswith("robots.txt"):
robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
else:
robots_url = url
try:
response = self.session.get(robots_url, timeout=10)
if response.status_code == 200:
return response.text
elif response.status_code == 404:
return None
else:
raise RuntimeError(f"HTTP {response.status_code}")
        except requests.RequestException as e:
            raise RuntimeError(f"Failed to fetch robots.txt: {e}") from e
def parse_robots(self, content: str) -> tuple[list[UserAgentRules], list[str]]:
"""Parse robots.txt content."""
rules = []
sitemaps = []
        current_rules = None
for line_num, line in enumerate(content.split("\n"), 1):
line = line.strip()
# Skip empty lines and comments
if not line or line.startswith("#"):
continue
# Parse directive
if ":" not in line:
continue
directive, value = line.split(":", 1)
directive = directive.strip().lower()
value = value.strip()
            if directive == "user-agent":
                # Start a new rule block. Simplification: consecutive
                # User-agent lines that share one group per the robots.txt
                # spec are treated as separate blocks here.
                if current_rules:
                    rules.append(current_rules)
                current_rules = UserAgentRules(user_agent=value)
elif directive == "disallow" and current_rules:
if value: # Empty disallow means allow all
current_rules.disallow.append(value)
elif directive == "allow" and current_rules:
if value:
current_rules.allow.append(value)
elif directive == "crawl-delay" and current_rules:
try:
current_rules.crawl_delay = float(value)
except ValueError:
pass
elif directive == "sitemap":
if value:
sitemaps.append(value)
# Don't forget last user-agent
if current_rules:
rules.append(current_rules)
return rules, sitemaps
def analyze(self, url: str) -> RobotsResult:
"""Analyze robots.txt."""
result = RobotsResult(url=url)
# Fetch robots.txt
try:
content = self.fetch_robots(url)
if content is None:
result.accessible = False
result.issues.append(RobotsIssue(
severity="info",
message="No robots.txt found (returns 404)",
suggestion="Consider creating a robots.txt file",
))
return result
except RuntimeError as e:
result.accessible = False
result.issues.append(RobotsIssue(
severity="error",
message=str(e),
))
return result
result.content = content
result.rules, result.sitemaps = self.parse_robots(content)
# Analyze content
self._analyze_syntax(result)
self._analyze_rules(result)
self._analyze_sitemaps(result)
# Calculate stats
result.stats = {
"user_agents_count": len(result.rules),
"user_agents": [r.user_agent for r in result.rules],
"total_disallow_rules": sum(len(r.disallow) for r in result.rules),
"total_allow_rules": sum(len(r.allow) for r in result.rules),
"sitemaps_count": len(result.sitemaps),
"has_crawl_delay": any(r.crawl_delay for r in result.rules),
"content_length": len(content),
}
return result
def _analyze_syntax(self, result: RobotsResult) -> None:
"""Check for syntax issues."""
lines = result.content.split("\n")
for line_num, line in enumerate(lines, 1):
line = line.strip()
# Skip empty lines and comments
if not line or line.startswith("#"):
continue
# Check for valid directive
if ":" not in line:
result.issues.append(RobotsIssue(
severity="warning",
message=f"Invalid line (missing colon): {line[:50]}",
line_number=line_num,
))
continue
directive, value = line.split(":", 1)
directive = directive.strip().lower()
valid_directives = {
"user-agent", "disallow", "allow",
"crawl-delay", "sitemap", "host",
}
if directive not in valid_directives:
result.issues.append(RobotsIssue(
severity="info",
message=f"Unknown directive: {directive}",
line_number=line_num,
directive=directive,
))
def _analyze_rules(self, result: RobotsResult) -> None:
"""Analyze blocking rules."""
# Check if there are any rules
if not result.rules:
result.issues.append(RobotsIssue(
severity="info",
message="No user-agent rules defined",
suggestion="Add User-agent: * rules to control crawling",
))
return
# Check for wildcard rule
has_wildcard = any(r.user_agent == "*" for r in result.rules)
if not has_wildcard:
result.issues.append(RobotsIssue(
severity="info",
message="No wildcard (*) user-agent defined",
suggestion="Consider adding User-agent: * as fallback",
))
# Check for blocking important resources
for rules in result.rules:
for disallow in rules.disallow:
# Check if blocking root
if disallow == "/":
result.issues.append(RobotsIssue(
severity="error",
message=f"Blocking entire site for {rules.user_agent}",
directive=f"Disallow: {disallow}",
suggestion="This will prevent indexing. Is this intentional?",
))
                # Check if blocking CSS/JS (substring heuristic; note that
                # ".js" also matches ".json" paths)
                if any(ext in disallow.lower() for ext in [".css", ".js"]):
result.issues.append(RobotsIssue(
severity="warning",
message=f"Blocking CSS/JS files for {rules.user_agent}",
directive=f"Disallow: {disallow}",
suggestion="May affect rendering and SEO",
))
# Check for blocking images
if any(ext in disallow.lower() for ext in [".jpg", ".png", ".gif", ".webp"]):
result.issues.append(RobotsIssue(
severity="info",
message=f"Blocking image files for {rules.user_agent}",
directive=f"Disallow: {disallow}",
))
# Check crawl delay
if rules.crawl_delay:
if rules.crawl_delay > 10:
result.issues.append(RobotsIssue(
severity="warning",
message=f"High crawl-delay ({rules.crawl_delay}s) for {rules.user_agent}",
directive=f"Crawl-delay: {rules.crawl_delay}",
suggestion="May significantly slow indexing",
))
elif rules.crawl_delay > 0:
result.issues.append(RobotsIssue(
severity="info",
message=f"Crawl-delay set to {rules.crawl_delay}s for {rules.user_agent}",
))
def _analyze_sitemaps(self, result: RobotsResult) -> None:
"""Analyze sitemap declarations."""
if not result.sitemaps:
result.issues.append(RobotsIssue(
severity="warning",
message="No sitemap declared in robots.txt",
suggestion="Add Sitemap: directive to help crawlers find your sitemap",
))
else:
for sitemap in result.sitemaps:
if not sitemap.startswith("http"):
result.issues.append(RobotsIssue(
severity="warning",
message=f"Sitemap URL should be absolute: {sitemap}",
directive=f"Sitemap: {sitemap}",
))
def test_url(self, robots_url: str, test_path: str,
user_agent: str = "Googlebot") -> dict:
"""Test if a specific URL is allowed."""
# Use Python's built-in parser
rp = RobotFileParser()
# Ensure robots.txt URL
parsed = urlparse(robots_url)
if not parsed.path.endswith("robots.txt"):
robots_txt_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
else:
robots_txt_url = robots_url
rp.set_url(robots_txt_url)
try:
rp.read()
except Exception as e:
return {
"path": test_path,
"user_agent": user_agent,
"allowed": None,
"error": str(e),
}
# Build full URL for testing
base_url = f"{parsed.scheme}://{parsed.netloc}"
full_url = urljoin(base_url, test_path)
allowed = rp.can_fetch(user_agent, full_url)
return {
"path": test_path,
"user_agent": user_agent,
"allowed": allowed,
"full_url": full_url,
}
def generate_report(self, result: RobotsResult) -> str:
"""Generate human-readable analysis report."""
lines = [
"=" * 60,
"Robots.txt Analysis Report",
"=" * 60,
f"URL: {result.url}",
f"Accessible: {'Yes' if result.accessible else 'No'}",
f"Timestamp: {result.timestamp}",
"",
]
if result.accessible:
lines.append("Statistics:")
for key, value in result.stats.items():
if key == "user_agents":
lines.append(f" {key}: {', '.join(value) if value else 'None'}")
else:
lines.append(f" {key}: {value}")
lines.append("")
if result.sitemaps:
lines.append(f"Sitemaps ({len(result.sitemaps)}):")
for sitemap in result.sitemaps:
lines.append(f" - {sitemap}")
lines.append("")
if result.rules:
lines.append("Rules Summary:")
for rules in result.rules:
lines.append(f"\n User-agent: {rules.user_agent}")
if rules.disallow:
lines.append(f" Disallow: {len(rules.disallow)} rules")
for d in rules.disallow[:5]:
lines.append(f" - {d}")
if len(rules.disallow) > 5:
lines.append(f" ... and {len(rules.disallow) - 5} more")
if rules.allow:
lines.append(f" Allow: {len(rules.allow)} rules")
for a in rules.allow[:3]:
lines.append(f" - {a}")
if rules.crawl_delay:
lines.append(f" Crawl-delay: {rules.crawl_delay}s")
lines.append("")
if result.issues:
lines.append("Issues Found:")
errors = [i for i in result.issues if i.severity == "error"]
warnings = [i for i in result.issues if i.severity == "warning"]
infos = [i for i in result.issues if i.severity == "info"]
if errors:
lines.append(f"\n ERRORS ({len(errors)}):")
for issue in errors:
lines.append(f" - {issue.message}")
if issue.directive:
lines.append(f" Directive: {issue.directive}")
if issue.suggestion:
lines.append(f" Suggestion: {issue.suggestion}")
if warnings:
lines.append(f"\n WARNINGS ({len(warnings)}):")
for issue in warnings:
lines.append(f" - {issue.message}")
if issue.suggestion:
lines.append(f" Suggestion: {issue.suggestion}")
if infos:
lines.append(f"\n INFO ({len(infos)}):")
for issue in infos:
lines.append(f" - {issue.message}")
lines.append("")
lines.append("=" * 60)
return "\n".join(lines)
def main():
"""Main entry point for CLI usage."""
parser = argparse.ArgumentParser(
description="Analyze robots.txt configuration",
)
parser.add_argument("--url", "-u", required=True,
help="URL to robots.txt or domain")
parser.add_argument("--test-url", "-t",
help="Test if specific URL path is allowed")
parser.add_argument("--user-agent", "-a", default="Googlebot",
help="User agent for testing (default: Googlebot)")
parser.add_argument("--output", "-o", help="Output file for JSON report")
parser.add_argument("--json", action="store_true", help="Output as JSON")
args = parser.parse_args()
checker = RobotsChecker()
if args.test_url:
# Test specific URL
test_result = checker.test_url(args.url, args.test_url, args.user_agent)
        if args.json:
            print(json.dumps(test_result, indent=2))
        elif test_result.get("error"):
            # allowed is None when robots.txt could not be read
            print(f"Error reading robots.txt: {test_result['error']}")
        else:
            status = "ALLOWED" if test_result["allowed"] else "BLOCKED"
            print(f"URL: {test_result['path']}")
            print(f"User-Agent: {test_result['user_agent']}")
            print(f"Status: {status}")
else:
# Full analysis
result = checker.analyze(args.url)
if args.json or args.output:
output = json.dumps(result.to_dict(), ensure_ascii=False, indent=2)
if args.output:
with open(args.output, "w", encoding="utf-8") as f:
f.write(output)
logger.info(f"Report written to {args.output}")
else:
print(output)
else:
print(checker.generate_report(result))
if __name__ == "__main__":
main()
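For quick offline permission checks, the same stdlib `RobotFileParser` that backs `test_url()` can be fed a robots.txt string directly. A minimal sketch (the rules and URLs are hypothetical); note that urllib's parser applies rules in file order (first match wins), unlike Google's longest-match semantics, so the `Allow` line is listed before the broader `Disallow`:

```python
from urllib.robotparser import RobotFileParser

# Hypothetical robots.txt content, parsed offline instead of fetched.
ROBOTS_TXT = """\
User-agent: *
Allow: /admin/help
Disallow: /admin/
"""

rp = RobotFileParser()
rp.parse(ROBOTS_TXT.splitlines())

print(rp.can_fetch("Googlebot", "https://example.com/admin/help"))  # True
print(rp.can_fetch("Googlebot", "https://example.com/admin/x"))     # False
print(rp.can_fetch("Googlebot", "https://example.com/blog/"))       # True (no matching rule)
```

Because of the first-match behavior, results can differ from what Google Search Console reports for the same rules; `test_url()` inherits this caveat.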

"""
Sitemap Crawler - Sequential page analysis from sitemap
=======================================================
Purpose: Crawl sitemap URLs one by one, analyze each page, save to Notion
Python: 3.10+
Usage:
from sitemap_crawler import SitemapCrawler
crawler = SitemapCrawler()
crawler.crawl_sitemap("https://example.com/sitemap.xml", delay=2.0)
"""
import json
import logging
import time
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Callable, Generator
from urllib.parse import urlparse
import requests
from notion_client import Client
from base_client import config
from page_analyzer import PageAnalyzer, PageMetadata
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
# Default database for page analysis data
DEFAULT_PAGES_DATABASE_ID = "2c8581e5-8a1e-8035-880b-e38cefc2f3ef"
# Default limits to prevent excessive resource usage
DEFAULT_MAX_PAGES = 500
DEFAULT_DELAY_SECONDS = 2.0
# Progress tracking directory
PROGRESS_DIR = Path.home() / ".claude" / "seo-audit-progress"
PROGRESS_DIR.mkdir(parents=True, exist_ok=True)
@dataclass
class CrawlProgress:
"""Track crawl progress."""
total_urls: int = 0
processed_urls: int = 0
successful_urls: int = 0
failed_urls: int = 0
skipped_urls: int = 0
start_time: datetime = field(default_factory=datetime.now)
current_url: str = ""
audit_id: str = ""
site: str = ""
status: str = "running" # running, completed, failed
error_message: str = ""
summary_page_id: str = ""
def get_progress_percent(self) -> float:
if self.total_urls == 0:
return 0.0
return (self.processed_urls / self.total_urls) * 100
def get_elapsed_time(self) -> str:
elapsed = datetime.now() - self.start_time
minutes = int(elapsed.total_seconds() // 60)
seconds = int(elapsed.total_seconds() % 60)
return f"{minutes}m {seconds}s"
def get_eta(self) -> str:
if self.processed_urls == 0:
return "calculating..."
elapsed = (datetime.now() - self.start_time).total_seconds()
avg_time_per_url = elapsed / self.processed_urls
remaining_urls = self.total_urls - self.processed_urls
eta_seconds = remaining_urls * avg_time_per_url
minutes = int(eta_seconds // 60)
seconds = int(eta_seconds % 60)
return f"{minutes}m {seconds}s"
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
"audit_id": self.audit_id,
"site": self.site,
"status": self.status,
"total_urls": self.total_urls,
"processed_urls": self.processed_urls,
"successful_urls": self.successful_urls,
"failed_urls": self.failed_urls,
"progress_percent": round(self.get_progress_percent(), 1),
"elapsed_time": self.get_elapsed_time(),
"eta": self.get_eta(),
"current_url": self.current_url,
"start_time": self.start_time.isoformat(),
"error_message": self.error_message,
"summary_page_id": self.summary_page_id,
"updated_at": datetime.now().isoformat(),
}
def save_to_file(self, filepath: Path | None = None) -> Path:
"""Save progress to JSON file."""
if filepath is None:
filepath = PROGRESS_DIR / f"{self.audit_id}.json"
with open(filepath, "w") as f:
json.dump(self.to_dict(), f, indent=2)
return filepath
@classmethod
def load_from_file(cls, filepath: Path) -> "CrawlProgress":
"""Load progress from JSON file."""
with open(filepath, "r") as f:
data = json.load(f)
progress = cls()
progress.audit_id = data.get("audit_id", "")
progress.site = data.get("site", "")
progress.status = data.get("status", "unknown")
progress.total_urls = data.get("total_urls", 0)
progress.processed_urls = data.get("processed_urls", 0)
progress.successful_urls = data.get("successful_urls", 0)
progress.failed_urls = data.get("failed_urls", 0)
progress.current_url = data.get("current_url", "")
progress.error_message = data.get("error_message", "")
progress.summary_page_id = data.get("summary_page_id", "")
if data.get("start_time"):
progress.start_time = datetime.fromisoformat(data["start_time"])
return progress
def get_active_crawls() -> list[CrawlProgress]:
"""Get all active (running) crawl jobs."""
active = []
for filepath in PROGRESS_DIR.glob("*.json"):
try:
progress = CrawlProgress.load_from_file(filepath)
if progress.status == "running":
active.append(progress)
except Exception:
continue
return active
def get_all_crawls() -> list[CrawlProgress]:
"""Get all crawl jobs (active and completed)."""
crawls = []
for filepath in sorted(PROGRESS_DIR.glob("*.json"), reverse=True):
try:
progress = CrawlProgress.load_from_file(filepath)
crawls.append(progress)
except Exception:
continue
return crawls
def get_crawl_status(audit_id: str) -> CrawlProgress | None:
"""Get status of a specific crawl by audit ID."""
filepath = PROGRESS_DIR / f"{audit_id}.json"
if filepath.exists():
return CrawlProgress.load_from_file(filepath)
return None
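The progress files written by `CrawlProgress.save_to_file()` are plain JSON, so external tools can poll them directly. A minimal round-trip sketch, using a hypothetical subset of the fields and a temp directory in place of `~/.claude/seo-audit-progress`:

```python
import json
import tempfile
from pathlib import Path

# Hypothetical progress snapshot (the real dataclass carries more fields).
state = {
    "audit_id": "example-pages-20250101-000000",
    "processed_urls": 42,
    "total_urls": 100,
    "status": "running",
}

# Write <audit_id>.json, as save_to_file() does
path = Path(tempfile.mkdtemp()) / f"{state['audit_id']}.json"
path.write_text(json.dumps(state, indent=2))

# Any process can read it back to report progress
loaded = json.loads(path.read_text())
print(loaded["status"], f"{loaded['processed_urls']}/{loaded['total_urls']}")
# running 42/100
```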
@dataclass
class CrawlResult:
"""Result of a complete sitemap crawl."""
site: str
sitemap_url: str
audit_id: str
total_pages: int
successful_pages: int
failed_pages: int
start_time: datetime
end_time: datetime
pages_analyzed: list[PageMetadata] = field(default_factory=list)
notion_page_ids: list[str] = field(default_factory=list)
summary_page_id: str | None = None
def get_duration(self) -> str:
duration = self.end_time - self.start_time
minutes = int(duration.total_seconds() // 60)
seconds = int(duration.total_seconds() % 60)
return f"{minutes}m {seconds}s"
class SitemapCrawler:
"""Crawl sitemap URLs and analyze each page."""
def __init__(
self,
notion_token: str | None = None,
database_id: str | None = None,
):
"""
Initialize sitemap crawler.
Args:
notion_token: Notion API token
database_id: Notion database ID for storing results
"""
self.notion_token = notion_token or config.notion_token
self.database_id = database_id or DEFAULT_PAGES_DATABASE_ID
self.analyzer = PageAnalyzer()
if self.notion_token:
self.notion = Client(auth=self.notion_token)
else:
self.notion = None
logger.warning("Notion token not configured, results will not be saved")
def fetch_sitemap_urls(self, sitemap_url: str) -> list[str]:
"""
Fetch and parse URLs from a sitemap.
Args:
sitemap_url: URL of the sitemap
Returns:
List of URLs found in the sitemap
"""
try:
response = requests.get(sitemap_url, timeout=30)
response.raise_for_status()
# Parse XML
root = ET.fromstring(response.content)
# Handle namespace
namespaces = {
"sm": "http://www.sitemaps.org/schemas/sitemap/0.9"
}
urls = []
# Check if this is a sitemap index
sitemap_tags = root.findall(".//sm:sitemap/sm:loc", namespaces)
if sitemap_tags:
# This is a sitemap index, recursively fetch child sitemaps
logger.info(f"Found sitemap index with {len(sitemap_tags)} child sitemaps")
for loc in sitemap_tags:
if loc.text:
child_urls = self.fetch_sitemap_urls(loc.text)
urls.extend(child_urls)
else:
# Regular sitemap, extract URLs
url_tags = root.findall(".//sm:url/sm:loc", namespaces)
if not url_tags:
# Try without namespace
url_tags = root.findall(".//url/loc")
for loc in url_tags:
if loc.text:
urls.append(loc.text)
# Remove duplicates while preserving order
seen = set()
unique_urls = []
for url in urls:
if url not in seen:
seen.add(url)
unique_urls.append(url)
logger.info(f"Found {len(unique_urls)} unique URLs in sitemap")
return unique_urls
except Exception as e:
logger.error(f"Failed to fetch sitemap: {e}")
raise
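The namespace-aware extraction and order-preserving dedup in `fetch_sitemap_urls()` can be exercised offline against a literal sitemap (the example.com URLs are placeholders):

```python
import xml.etree.ElementTree as ET

SITEMAP_XML = """<?xml version="1.0" encoding="UTF-8"?>
<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
  <url><loc>https://example.com/</loc></url>
  <url><loc>https://example.com/about</loc></url>
  <url><loc>https://example.com/</loc></url>
</urlset>"""

root = ET.fromstring(SITEMAP_XML)
# The sitemaps.org namespace must be mapped explicitly for findall()
ns = {"sm": "http://www.sitemaps.org/schemas/sitemap/0.9"}
urls = [loc.text for loc in root.findall(".//sm:url/sm:loc", ns) if loc.text]

# Order-preserving dedup, equivalent to the seen-set loop in the crawler
unique = list(dict.fromkeys(urls))
print(unique)  # ['https://example.com/', 'https://example.com/about']
```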
def crawl_sitemap(
self,
sitemap_url: str,
delay: float = DEFAULT_DELAY_SECONDS,
max_pages: int = DEFAULT_MAX_PAGES,
progress_callback: Callable[[CrawlProgress], None] | None = None,
save_to_notion: bool = True,
url_filter: Callable[[str], bool] | None = None,
) -> CrawlResult:
"""
Crawl all URLs in a sitemap sequentially.
Args:
sitemap_url: URL of the sitemap
delay: Seconds to wait between requests (default: 2.0s)
max_pages: Maximum number of pages to process (default: 500)
progress_callback: Function called with progress updates
save_to_notion: Whether to save results to Notion
url_filter: Optional function to filter URLs (return True to include)
Returns:
CrawlResult with all analyzed pages
"""
# Parse site info
parsed_sitemap = urlparse(sitemap_url)
site = f"{parsed_sitemap.scheme}://{parsed_sitemap.netloc}"
site_domain = parsed_sitemap.netloc
# Generate audit ID
audit_id = f"{site_domain}-pages-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
logger.info(f"Starting sitemap crawl: {sitemap_url}")
logger.info(f"Audit ID: {audit_id}")
logger.info(f"Delay between requests: {delay}s")
# Initialize progress tracking
progress = CrawlProgress(
audit_id=audit_id,
site=site,
status="running",
)
# Fetch URLs
urls = self.fetch_sitemap_urls(sitemap_url)
# Apply URL filter if provided
if url_filter:
urls = [url for url in urls if url_filter(url)]
logger.info(f"After filtering: {len(urls)} URLs")
# Apply max pages limit (default: 500 to prevent excessive resource usage)
if len(urls) > max_pages:
logger.warning(f"Sitemap has {len(urls)} URLs, limiting to {max_pages} pages")
            logger.warning("Use the max_pages parameter to adjust this limit")
urls = urls[:max_pages]
logger.info(f"Processing {len(urls)} pages (max: {max_pages})")
# Update progress with total URLs
progress.total_urls = len(urls)
progress.save_to_file()
# Initialize result
result = CrawlResult(
site=site,
sitemap_url=sitemap_url,
audit_id=audit_id,
total_pages=len(urls),
successful_pages=0,
failed_pages=0,
start_time=datetime.now(),
end_time=datetime.now(),
)
# Process each URL
try:
for i, url in enumerate(urls):
progress.current_url = url
progress.processed_urls = i
progress.save_to_file() # Save progress to file
if progress_callback:
progress_callback(progress)
logger.info(f"[{i+1}/{len(urls)}] Analyzing: {url}")
try:
# Analyze page
metadata = self.analyzer.analyze_url(url)
result.pages_analyzed.append(metadata)
if metadata.status_code == 200:
progress.successful_urls += 1
result.successful_pages += 1
# Save to Notion
if save_to_notion and self.notion:
page_id = self._save_page_to_notion(metadata, audit_id, site)
if page_id:
result.notion_page_ids.append(page_id)
else:
progress.failed_urls += 1
result.failed_pages += 1
except Exception as e:
logger.error(f"Failed to analyze {url}: {e}")
progress.failed_urls += 1
result.failed_pages += 1
# Wait before next request
if i < len(urls) - 1: # Don't wait after last URL
time.sleep(delay)
# Final progress update
progress.processed_urls = len(urls)
progress.status = "completed"
if progress_callback:
progress_callback(progress)
except Exception as e:
progress.status = "failed"
progress.error_message = str(e)
progress.save_to_file()
raise
# Update result
result.end_time = datetime.now()
# Create summary page
if save_to_notion and self.notion:
summary_id = self._create_crawl_summary_page(result)
result.summary_page_id = summary_id
progress.summary_page_id = summary_id
# Save final progress
progress.save_to_file()
logger.info(f"Crawl complete: {result.successful_pages}/{result.total_pages} pages analyzed")
logger.info(f"Duration: {result.get_duration()}")
return result
def _save_page_to_notion(
self,
metadata: PageMetadata,
audit_id: str,
site: str,
) -> str | None:
"""Save page metadata to Notion database."""
try:
# Build properties
properties = {
"Issue": {"title": [{"text": {"content": f"📄 {metadata.url}"}}]},
"Category": {"select": {"name": "On-page SEO"}},
"Priority": {"select": {"name": self._determine_priority(metadata)}},
"Site": {"url": site},
"URL": {"url": metadata.url},
"Audit ID": {"rich_text": [{"text": {"content": audit_id}}]},
"Found Date": {"date": {"start": datetime.now().strftime("%Y-%m-%d")}},
}
# Build page content
children = self._build_page_content(metadata)
response = self.notion.pages.create(
parent={"database_id": self.database_id},
properties=properties,
children=children,
)
return response["id"]
except Exception as e:
logger.error(f"Failed to save to Notion: {e}")
return None
def _determine_priority(self, metadata: PageMetadata) -> str:
"""Determine priority based on issues found."""
if len(metadata.issues) >= 3:
return "High"
elif len(metadata.issues) >= 1:
return "Medium"
elif len(metadata.warnings) >= 3:
return "Medium"
else:
return "Low"
def _build_page_content(self, metadata: PageMetadata) -> list[dict]:
"""Build Notion page content blocks from metadata."""
children = []
# Status summary callout
        status_emoji = "✅" if not metadata.issues else "⚠️" if len(metadata.issues) < 3 else "❌"
children.append({
"object": "block",
"type": "callout",
"callout": {
"rich_text": [
{"type": "text", "text": {"content": f"Status: {metadata.status_code} | "}},
{"type": "text", "text": {"content": f"Response: {metadata.response_time_ms:.0f}ms | "}},
{"type": "text", "text": {"content": f"Issues: {len(metadata.issues)} | "}},
{"type": "text", "text": {"content": f"Warnings: {len(metadata.warnings)}"}},
],
"icon": {"type": "emoji", "emoji": status_emoji},
"color": "gray_background" if not metadata.issues else "yellow_background" if len(metadata.issues) < 3 else "red_background",
}
})
# Meta Tags Section
children.append({
"object": "block",
"type": "heading_2",
"heading_2": {"rich_text": [{"type": "text", "text": {"content": "Meta Tags"}}]}
})
# Meta tags table
meta_rows = [
{"type": "table_row", "table_row": {"cells": [
[{"type": "text", "text": {"content": "Tag"}, "annotations": {"bold": True}}],
[{"type": "text", "text": {"content": "Value"}, "annotations": {"bold": True}}],
[{"type": "text", "text": {"content": "Status"}, "annotations": {"bold": True}}],
]}},
{"type": "table_row", "table_row": {"cells": [
[{"type": "text", "text": {"content": "Title"}}],
[{"type": "text", "text": {"content": (metadata.title or "")[:50]}}],
[{"type": "text", "text": {"content": f"{metadata.title_length} chars" if metadata.title else "✗ Missing"}}],
]}},
{"type": "table_row", "table_row": {"cells": [
[{"type": "text", "text": {"content": "Description"}}],
[{"type": "text", "text": {"content": (metadata.meta_description or "")[:50]}}],
[{"type": "text", "text": {"content": f"{metadata.meta_description_length} chars" if metadata.meta_description else "✗ Missing"}}],
]}},
{"type": "table_row", "table_row": {"cells": [
[{"type": "text", "text": {"content": "Canonical"}}],
[{"type": "text", "text": {"content": (metadata.canonical_url or "")[:50]}}],
                [{"type": "text", "text": {"content": "✓" if metadata.canonical_url else "✗ Missing"}}],
]}},
{"type": "table_row", "table_row": {"cells": [
[{"type": "text", "text": {"content": "Robots"}}],
[{"type": "text", "text": {"content": metadata.robots_meta or ""}}],
                [{"type": "text", "text": {"content": "✓" if metadata.robots_meta else "—"}}],
]}},
{"type": "table_row", "table_row": {"cells": [
[{"type": "text", "text": {"content": "Lang"}}],
[{"type": "text", "text": {"content": metadata.html_lang or ""}}],
                [{"type": "text", "text": {"content": "✓" if metadata.html_lang else "—"}}],
]}},
]
children.append({
"object": "block",
"type": "table",
"table": {
"table_width": 3,
"has_column_header": True,
"has_row_header": False,
"children": meta_rows
}
})
# Headings Section
children.append({
"object": "block",
"type": "heading_2",
"heading_2": {"rich_text": [{"type": "text", "text": {"content": "Headings"}}]}
})
children.append({
"object": "block",
"type": "paragraph",
"paragraph": {"rich_text": [
{"type": "text", "text": {"content": f"H1: {metadata.h1_count} | "}},
{"type": "text", "text": {"content": f"Total headings: {len(metadata.headings)}"}},
]}
})
if metadata.h1_text:
children.append({
"object": "block",
"type": "quote",
"quote": {"rich_text": [{"type": "text", "text": {"content": metadata.h1_text[:200]}}]}
})
# Schema Data Section
children.append({
"object": "block",
"type": "heading_2",
"heading_2": {"rich_text": [{"type": "text", "text": {"content": "Structured Data"}}]}
})
if metadata.schema_types_found:
children.append({
"object": "block",
"type": "paragraph",
"paragraph": {"rich_text": [
{"type": "text", "text": {"content": "Schema types found: "}},
{"type": "text", "text": {"content": ", ".join(metadata.schema_types_found)}, "annotations": {"code": True}},
]}
})
else:
children.append({
"object": "block",
"type": "callout",
"callout": {
"rich_text": [{"type": "text", "text": {"content": "No structured data found on this page"}}],
"icon": {"type": "emoji", "emoji": "⚠️"},
"color": "yellow_background",
}
})
# Open Graph Section
children.append({
"object": "block",
"type": "heading_2",
"heading_2": {"rich_text": [{"type": "text", "text": {"content": "Open Graph"}}]}
})
og = metadata.open_graph
og_status = "✓ Configured" if og.og_title else "✗ Missing"
children.append({
"object": "block",
"type": "paragraph",
"paragraph": {"rich_text": [
{"type": "text", "text": {"content": f"Status: {og_status}\n"}},
{"type": "text", "text": {"content": f"og:title: {og.og_title or ''}\n"}},
{"type": "text", "text": {"content": f"og:type: {og.og_type or ''}"}},
]}
})
# Links Section
children.append({
"object": "block",
"type": "heading_2",
"heading_2": {"rich_text": [{"type": "text", "text": {"content": "Links"}}]}
})
children.append({
"object": "block",
"type": "paragraph",
"paragraph": {"rich_text": [
{"type": "text", "text": {"content": f"Internal links: {metadata.internal_link_count}\n"}},
{"type": "text", "text": {"content": f"External links: {metadata.external_link_count}"}},
]}
})
# Images Section
children.append({
"object": "block",
"type": "heading_2",
"heading_2": {"rich_text": [{"type": "text", "text": {"content": "Images"}}]}
})
children.append({
"object": "block",
"type": "paragraph",
"paragraph": {"rich_text": [
{"type": "text", "text": {"content": f"Total: {metadata.images_total} | "}},
{"type": "text", "text": {"content": f"With alt: {metadata.images_with_alt} | "}},
{"type": "text", "text": {"content": f"Without alt: {metadata.images_without_alt}"}},
]}
})
# Hreflang Section (if present)
if metadata.hreflang_tags:
children.append({
"object": "block",
"type": "heading_2",
"heading_2": {"rich_text": [{"type": "text", "text": {"content": "Hreflang Tags"}}]}
})
for tag in metadata.hreflang_tags[:10]:
children.append({
"object": "block",
"type": "bulleted_list_item",
"bulleted_list_item": {"rich_text": [
{"type": "text", "text": {"content": f"{tag['lang']}: "}},
{"type": "text", "text": {"content": tag['url'], "link": {"url": tag['url']}}},
]}
})
# Issues & Warnings Section
if metadata.issues or metadata.warnings:
children.append({
"object": "block",
"type": "heading_2",
"heading_2": {"rich_text": [{"type": "text", "text": {"content": "Issues & Warnings"}}]}
})
for issue in metadata.issues:
children.append({
"object": "block",
"type": "to_do",
"to_do": {
"rich_text": [
                        {"type": "text", "text": {"content": "❌ "}, "annotations": {"bold": True}},
{"type": "text", "text": {"content": issue}},
],
"checked": False,
}
})
for warning in metadata.warnings:
children.append({
"object": "block",
"type": "to_do",
"to_do": {
"rich_text": [
{"type": "text", "text": {"content": "⚠️ "}, "annotations": {"bold": True}},
{"type": "text", "text": {"content": warning}},
],
"checked": False,
}
})
return children
def _create_crawl_summary_page(self, result: CrawlResult) -> str | None:
"""Create a summary page for the crawl."""
try:
site_domain = urlparse(result.site).netloc
# Calculate statistics
total_issues = sum(len(p.issues) for p in result.pages_analyzed)
total_warnings = sum(len(p.warnings) for p in result.pages_analyzed)
pages_with_issues = sum(1 for p in result.pages_analyzed if p.issues)
pages_without_schema = sum(1 for p in result.pages_analyzed if not p.schema_types_found)
pages_without_description = sum(1 for p in result.pages_analyzed if not p.meta_description)
children = []
# Header callout
children.append({
"object": "block",
"type": "callout",
"callout": {
"rich_text": [
{"type": "text", "text": {"content": f"Sitemap Crawl Complete\n\n"}},
{"type": "text", "text": {"content": f"Audit ID: {result.audit_id}\n"}},
{"type": "text", "text": {"content": f"Duration: {result.get_duration()}\n"}},
{"type": "text", "text": {"content": f"Pages: {result.successful_pages}/{result.total_pages}"}},
],
"icon": {"type": "emoji", "emoji": "📊"},
"color": "blue_background",
}
})
# Statistics table
children.append({
"object": "block",
"type": "heading_2",
"heading_2": {"rich_text": [{"type": "text", "text": {"content": "Statistics"}}]}
})
stats_rows = [
{"type": "table_row", "table_row": {"cells": [
[{"type": "text", "text": {"content": "Metric"}, "annotations": {"bold": True}}],
[{"type": "text", "text": {"content": "Count"}, "annotations": {"bold": True}}],
]}},
{"type": "table_row", "table_row": {"cells": [
[{"type": "text", "text": {"content": "Total Pages"}}],
[{"type": "text", "text": {"content": str(result.total_pages)}}],
]}},
{"type": "table_row", "table_row": {"cells": [
[{"type": "text", "text": {"content": "Successfully Analyzed"}}],
[{"type": "text", "text": {"content": str(result.successful_pages)}}],
]}},
{"type": "table_row", "table_row": {"cells": [
[{"type": "text", "text": {"content": "Pages with Issues"}}],
[{"type": "text", "text": {"content": str(pages_with_issues)}}],
]}},
{"type": "table_row", "table_row": {"cells": [
[{"type": "text", "text": {"content": "Total Issues"}}],
[{"type": "text", "text": {"content": str(total_issues)}}],
]}},
{"type": "table_row", "table_row": {"cells": [
[{"type": "text", "text": {"content": "Total Warnings"}}],
[{"type": "text", "text": {"content": str(total_warnings)}}],
]}},
{"type": "table_row", "table_row": {"cells": [
[{"type": "text", "text": {"content": "Pages without Schema"}}],
[{"type": "text", "text": {"content": str(pages_without_schema)}}],
]}},
{"type": "table_row", "table_row": {"cells": [
[{"type": "text", "text": {"content": "Pages without Description"}}],
[{"type": "text", "text": {"content": str(pages_without_description)}}],
]}},
]
children.append({
"object": "block",
"type": "table",
"table": {
"table_width": 2,
"has_column_header": True,
"has_row_header": False,
"children": stats_rows
}
})
# Pages list
children.append({
"object": "block",
"type": "heading_2",
"heading_2": {"rich_text": [{"type": "text", "text": {"content": "Analyzed Pages"}}]}
})
children.append({
"object": "block",
"type": "paragraph",
"paragraph": {"rich_text": [
{"type": "text", "text": {"content": f"Filter by Audit ID in the database to see all {result.successful_pages} page entries."}}
]}
})
# Create the summary page
response = self.notion.pages.create(
parent={"database_id": self.database_id},
properties={
"Issue": {"title": [{"text": {"content": f"📊 Sitemap Crawl: {site_domain}"}}]},
"Category": {"select": {"name": "Technical SEO"}},
"Priority": {"select": {"name": "High"}},
"Site": {"url": result.site},
"Audit ID": {"rich_text": [{"text": {"content": result.audit_id}}]},
"Found Date": {"date": {"start": datetime.now().strftime("%Y-%m-%d")}},
},
children=children,
)
logger.info(f"Created crawl summary page: {response['id']}")
return response["id"]
except Exception as e:
logger.error(f"Failed to create summary page: {e}")
return None
def print_progress_status(progress: CrawlProgress) -> None:
"""Print formatted progress status."""
status_emoji = {
"running": "🔄",
        "completed": "✅",
        "failed": "❌",
}.get(progress.status, "")
print(f"""
{'=' * 60}
{status_emoji} SEO Page Analysis - {progress.status.upper()}
{'=' * 60}
Audit ID: {progress.audit_id}
Site: {progress.site}
Status: {progress.status}
Progress: {progress.processed_urls}/{progress.total_urls} pages ({progress.get_progress_percent():.1f}%)
Successful: {progress.successful_urls}
Failed: {progress.failed_urls}
Elapsed: {progress.get_elapsed_time()}
ETA: {progress.get_eta() if progress.status == 'running' else 'N/A'}
Current URL: {progress.current_url[:60] + '...' if len(progress.current_url) > 60 else progress.current_url}
""")
if progress.summary_page_id:
print(f"Summary: https://www.notion.so/{progress.summary_page_id.replace('-', '')}")
if progress.error_message:
print(f"Error: {progress.error_message}")
print("=" * 60)
def main():
"""CLI entry point."""
import argparse
parser = argparse.ArgumentParser(description="Sitemap Crawler with Background Support")
subparsers = parser.add_subparsers(dest="command", help="Commands")
# Crawl command
crawl_parser = subparsers.add_parser("crawl", help="Start crawling a sitemap")
crawl_parser.add_argument("sitemap_url", help="URL of the sitemap to crawl")
crawl_parser.add_argument("--delay", "-d", type=float, default=DEFAULT_DELAY_SECONDS,
help=f"Delay between requests in seconds (default: {DEFAULT_DELAY_SECONDS})")
crawl_parser.add_argument("--max-pages", "-m", type=int, default=DEFAULT_MAX_PAGES,
help=f"Maximum pages to process (default: {DEFAULT_MAX_PAGES})")
crawl_parser.add_argument("--no-notion", action="store_true",
help="Don't save to Notion")
crawl_parser.add_argument("--no-limit", action="store_true",
help="Remove page limit (use with caution)")
# Status command
status_parser = subparsers.add_parser("status", help="Check crawl progress")
status_parser.add_argument("audit_id", nargs="?", help="Specific audit ID to check (optional)")
status_parser.add_argument("--all", "-a", action="store_true", help="Show all crawls (not just active)")
# List command
list_parser = subparsers.add_parser("list", help="List all crawl jobs")
    # Allow "sitemap_crawler.py <sitemap-url>" shorthand: if the first
    # argument looks like a URL, route it to the crawl subcommand. This must
    # happen before parse_args(), which would otherwise reject the bare URL
    # as an invalid subcommand.
    import sys
    argv = sys.argv[1:]
    if argv and (argv[0].startswith("http") or argv[0].endswith(".xml")):
        argv = ["crawl"] + argv
    args = parser.parse_args(argv)
    if args.command is None:
        parser.print_help()
        return
if args.command == "status":
if args.audit_id:
# Show specific crawl status
progress = get_crawl_status(args.audit_id)
if progress:
print_progress_status(progress)
else:
print(f"No crawl found with audit ID: {args.audit_id}")
else:
# Show active crawls
if args.all:
crawls = get_all_crawls()
label = "All"
else:
crawls = get_active_crawls()
label = "Active"
if crawls:
print(f"\n{label} Crawl Jobs ({len(crawls)}):")
print("-" * 60)
for p in crawls:
                    status_emoji = {"running": "🔄", "completed": "✅", "failed": "❌"}.get(p.status, "")
print(f"{status_emoji} {p.audit_id}")
print(f" Site: {p.site}")
print(f" Progress: {p.processed_urls}/{p.total_urls} ({p.get_progress_percent():.1f}%)")
print()
else:
print(f"No {label.lower()} crawl jobs found.")
return
if args.command == "list":
crawls = get_all_crawls()
if crawls:
print(f"\nAll Crawl Jobs ({len(crawls)}):")
print("-" * 80)
print(f"{'Status':<10} {'Audit ID':<45} {'Progress':<15}")
print("-" * 80)
            for p in crawls[:20]:  # Show the first 20
                status_emoji = {"running": "🔄", "completed": "✅", "failed": "❌"}.get(p.status, "")
                progress_str = f"{p.processed_urls}/{p.total_urls}"
                print(f"{status_emoji} {p.status:<7} {p.audit_id:<45} {progress_str:<15}")
if len(crawls) > 20:
print(f"... and {len(crawls) - 20} more")
else:
print("No crawl jobs found.")
return
if args.command == "crawl":
# Handle --no-limit option
max_pages = args.max_pages
if args.no_limit:
max_pages = 999999 # Effectively unlimited
print("⚠️ WARNING: Page limit disabled. This may take a very long time!")
def progress_callback(progress: CrawlProgress):
pct = progress.get_progress_percent()
print(f"\r[{pct:5.1f}%] {progress.processed_urls}/{progress.total_urls} pages | "
f"Success: {progress.successful_urls} | Failed: {progress.failed_urls} | "
f"ETA: {progress.get_eta()}", end="", flush=True)
crawler = SitemapCrawler()
result = crawler.crawl_sitemap(
args.sitemap_url,
delay=args.delay,
max_pages=max_pages,
progress_callback=progress_callback,
save_to_notion=not args.no_notion,
)
print() # New line after progress
print()
print("=" * 60)
print("CRAWL COMPLETE")
print("=" * 60)
print(f"Audit ID: {result.audit_id}")
print(f"Total Pages: {result.total_pages}")
print(f"Successful: {result.successful_pages}")
print(f"Failed: {result.failed_pages}")
print(f"Duration: {result.get_duration()}")
if result.summary_page_id:
print(f"Summary Page: https://www.notion.so/{result.summary_page_id.replace('-', '')}")
if __name__ == "__main__":
main()

"""
Sitemap Validator - Validate XML sitemaps
==========================================
Purpose: Parse and validate XML sitemaps for SEO compliance
Python: 3.10+
Usage:
python sitemap_validator.py --url https://example.com/sitemap.xml
"""
import argparse
import asyncio
import gzip
import json
import logging
import re
from dataclasses import dataclass, field
from datetime import datetime
import aiohttp
import requests
from lxml import etree
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
@dataclass
class SitemapIssue:
"""Represents a sitemap validation issue."""
severity: str # "error", "warning", "info"
message: str
url: str | None = None
suggestion: str | None = None
@dataclass
class SitemapEntry:
"""Represents a single URL entry in sitemap."""
loc: str
lastmod: str | None = None
changefreq: str | None = None
priority: float | None = None
status_code: int | None = None
@dataclass
class SitemapResult:
"""Complete sitemap validation result."""
url: str
sitemap_type: str # "urlset" or "sitemapindex"
entries: list[SitemapEntry] = field(default_factory=list)
child_sitemaps: list[str] = field(default_factory=list)
issues: list[SitemapIssue] = field(default_factory=list)
valid: bool = True
stats: dict = field(default_factory=dict)
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
def to_dict(self) -> dict:
"""Convert to dictionary for JSON output."""
return {
"url": self.url,
"sitemap_type": self.sitemap_type,
"valid": self.valid,
"stats": self.stats,
"issues": [
{
"severity": i.severity,
"message": i.message,
"url": i.url,
"suggestion": i.suggestion,
}
for i in self.issues
],
"entries_count": len(self.entries),
"child_sitemaps": self.child_sitemaps,
"timestamp": self.timestamp,
}
class SitemapValidator:
"""Validate XML sitemaps."""
SITEMAP_NS = "http://www.sitemaps.org/schemas/sitemap/0.9"
MAX_URLS = 50000
MAX_SIZE_BYTES = 50 * 1024 * 1024 # 50MB
VALID_CHANGEFREQ = {
"always", "hourly", "daily", "weekly",
"monthly", "yearly", "never"
}
def __init__(self, check_urls: bool = False, max_concurrent: int = 10):
self.check_urls = check_urls
self.max_concurrent = max_concurrent
self.session = requests.Session()
self.session.headers.update({
"User-Agent": "Mozilla/5.0 (compatible; SEOAuditBot/1.0)"
})
def fetch_sitemap(self, url: str) -> tuple[bytes, bool]:
"""Fetch sitemap content, handling gzip compression."""
try:
response = self.session.get(url, timeout=30)
response.raise_for_status()
content = response.content
is_gzipped = False
# Check if gzipped
if url.endswith(".gz") or response.headers.get(
"Content-Encoding"
) == "gzip":
try:
content = gzip.decompress(content)
is_gzipped = True
                except gzip.BadGzipFile:
                    # Mislabeled as gzip; fall through with the raw bytes
                    pass
return content, is_gzipped
        except requests.RequestException as e:
            raise RuntimeError(f"Failed to fetch sitemap: {e}") from e
def parse_sitemap(self, content: bytes) -> tuple[str, list[dict]]:
"""Parse sitemap XML content."""
try:
root = etree.fromstring(content)
        except etree.XMLSyntaxError as e:
            raise ValueError(f"Invalid XML: {e}") from e
        # Namespace prefix map for element lookups (sitemap 0.9 schema)
        nsmap = {"sm": self.SITEMAP_NS}
# Check if it's a sitemap index or urlset
if root.tag == f"{{{self.SITEMAP_NS}}}sitemapindex":
sitemap_type = "sitemapindex"
entries = []
for sitemap in root.findall("sm:sitemap", nsmap):
entry = {}
loc = sitemap.find("sm:loc", nsmap)
if loc is not None and loc.text:
entry["loc"] = loc.text.strip()
lastmod = sitemap.find("sm:lastmod", nsmap)
if lastmod is not None and lastmod.text:
entry["lastmod"] = lastmod.text.strip()
if entry.get("loc"):
entries.append(entry)
elif root.tag == f"{{{self.SITEMAP_NS}}}urlset":
sitemap_type = "urlset"
entries = []
for url in root.findall("sm:url", nsmap):
entry = {}
loc = url.find("sm:loc", nsmap)
if loc is not None and loc.text:
entry["loc"] = loc.text.strip()
lastmod = url.find("sm:lastmod", nsmap)
if lastmod is not None and lastmod.text:
entry["lastmod"] = lastmod.text.strip()
changefreq = url.find("sm:changefreq", nsmap)
if changefreq is not None and changefreq.text:
entry["changefreq"] = changefreq.text.strip().lower()
priority = url.find("sm:priority", nsmap)
if priority is not None and priority.text:
try:
entry["priority"] = float(priority.text.strip())
except ValueError:
entry["priority"] = None
if entry.get("loc"):
entries.append(entry)
else:
raise ValueError(f"Unknown sitemap type: {root.tag}")
return sitemap_type, entries
def validate(self, url: str) -> SitemapResult:
"""Validate a sitemap URL."""
result = SitemapResult(url=url, sitemap_type="unknown")
# Fetch sitemap
try:
content, is_gzipped = self.fetch_sitemap(url)
except RuntimeError as e:
result.issues.append(SitemapIssue(
severity="error",
message=str(e),
url=url,
))
result.valid = False
return result
# Check size
if len(content) > self.MAX_SIZE_BYTES:
result.issues.append(SitemapIssue(
severity="error",
message=f"Sitemap exceeds 50MB limit ({len(content) / 1024 / 1024:.2f}MB)",
url=url,
suggestion="Split sitemap into smaller files using sitemap index",
))
# Parse XML
try:
sitemap_type, entries = self.parse_sitemap(content)
except ValueError as e:
result.issues.append(SitemapIssue(
severity="error",
message=str(e),
url=url,
))
result.valid = False
return result
result.sitemap_type = sitemap_type
# Process entries
if sitemap_type == "sitemapindex":
result.child_sitemaps = [e["loc"] for e in entries]
result.stats = {
"child_sitemaps_count": len(entries),
}
else:
# Validate URL entries
url_count = len(entries)
result.stats["url_count"] = url_count
if url_count > self.MAX_URLS:
result.issues.append(SitemapIssue(
severity="error",
message=f"Sitemap exceeds 50,000 URL limit ({url_count} URLs)",
url=url,
suggestion="Split into multiple sitemaps with sitemap index",
))
if url_count == 0:
result.issues.append(SitemapIssue(
severity="warning",
message="Sitemap is empty (no URLs)",
url=url,
))
# Validate individual entries
seen_urls = set()
invalid_lastmod = 0
invalid_changefreq = 0
invalid_priority = 0
for entry in entries:
loc = entry.get("loc", "")
# Check for duplicates
if loc in seen_urls:
result.issues.append(SitemapIssue(
severity="warning",
message="Duplicate URL in sitemap",
url=loc,
))
seen_urls.add(loc)
# Validate lastmod format
lastmod = entry.get("lastmod")
if lastmod:
if not self._validate_date(lastmod):
invalid_lastmod += 1
# Validate changefreq
changefreq = entry.get("changefreq")
if changefreq and changefreq not in self.VALID_CHANGEFREQ:
invalid_changefreq += 1
# Validate priority
priority = entry.get("priority")
if priority is not None:
if not (0.0 <= priority <= 1.0):
invalid_priority += 1
# Create entry object
result.entries.append(SitemapEntry(
loc=loc,
lastmod=lastmod,
changefreq=changefreq,
priority=priority,
))
# Add summary issues
if invalid_lastmod > 0:
result.issues.append(SitemapIssue(
severity="warning",
message=f"{invalid_lastmod} URLs with invalid lastmod format",
suggestion="Use ISO 8601 format (YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS+TZ)",
))
if invalid_changefreq > 0:
result.issues.append(SitemapIssue(
severity="info",
message=f"{invalid_changefreq} URLs with invalid changefreq",
suggestion="Use: always, hourly, daily, weekly, monthly, yearly, never",
))
if invalid_priority > 0:
result.issues.append(SitemapIssue(
severity="warning",
message=f"{invalid_priority} URLs with invalid priority (must be 0.0-1.0)",
))
result.stats.update({
"invalid_lastmod": invalid_lastmod,
"invalid_changefreq": invalid_changefreq,
"invalid_priority": invalid_priority,
"has_lastmod": sum(1 for e in result.entries if e.lastmod),
"has_changefreq": sum(1 for e in result.entries if e.changefreq),
"has_priority": sum(1 for e in result.entries if e.priority is not None),
})
# Check URLs if requested
if self.check_urls and result.entries:
asyncio.run(self._check_url_status(result))
# Determine validity
result.valid = not any(i.severity == "error" for i in result.issues)
return result
def _validate_date(self, date_str: str) -> bool:
"""Validate ISO 8601 date format."""
patterns = [
r"^\d{4}-\d{2}-\d{2}$",
r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}",
]
return any(re.match(p, date_str) for p in patterns)
    async def _check_url_status(self, result: SitemapResult) -> None:
        """Check HTTP status of URLs in sitemap via HEAD requests."""
        semaphore = asyncio.Semaphore(self.max_concurrent)

        async def check_url(
            session: aiohttp.ClientSession, entry: SitemapEntry
        ) -> None:
            async with semaphore:
                try:
                    async with session.head(
                        entry.loc,
                        timeout=aiohttp.ClientTimeout(total=10),
                        allow_redirects=True,
                    ) as response:
                        entry.status_code = response.status
                except Exception:
                    entry.status_code = 0  # 0 marks an unreachable URL

        # Reuse a single session for all requests; sample the first 100
        # entries to keep runtime bounded on large sitemaps.
        async with aiohttp.ClientSession() as session:
            await asyncio.gather(
                *[check_url(session, e) for e in result.entries[:100]]
            )
# Count status codes
status_counts = {}
for entry in result.entries:
if entry.status_code:
status_counts[entry.status_code] = (
status_counts.get(entry.status_code, 0) + 1
)
result.stats["url_status_codes"] = status_counts
# Add issues for non-200 URLs
error_count = sum(
1 for e in result.entries
if e.status_code and e.status_code >= 400
)
if error_count > 0:
result.issues.append(SitemapIssue(
severity="warning",
message=f"{error_count} URLs returning error status codes (4xx/5xx)",
suggestion="Remove or fix broken URLs in sitemap",
))
def generate_report(self, result: SitemapResult) -> str:
"""Generate human-readable validation report."""
lines = [
"=" * 60,
"Sitemap Validation Report",
"=" * 60,
f"URL: {result.url}",
f"Type: {result.sitemap_type}",
f"Valid: {'Yes' if result.valid else 'No'}",
f"Timestamp: {result.timestamp}",
"",
]
lines.append("Statistics:")
for key, value in result.stats.items():
lines.append(f" {key}: {value}")
lines.append("")
if result.child_sitemaps:
lines.append(f"Child Sitemaps ({len(result.child_sitemaps)}):")
for sitemap in result.child_sitemaps[:10]:
lines.append(f" - {sitemap}")
if len(result.child_sitemaps) > 10:
lines.append(f" ... and {len(result.child_sitemaps) - 10} more")
lines.append("")
if result.issues:
lines.append("Issues Found:")
errors = [i for i in result.issues if i.severity == "error"]
warnings = [i for i in result.issues if i.severity == "warning"]
infos = [i for i in result.issues if i.severity == "info"]
if errors:
lines.append(f"\n ERRORS ({len(errors)}):")
for issue in errors:
lines.append(f" - {issue.message}")
if issue.url:
lines.append(f" URL: {issue.url}")
if issue.suggestion:
lines.append(f" Suggestion: {issue.suggestion}")
if warnings:
lines.append(f"\n WARNINGS ({len(warnings)}):")
for issue in warnings:
lines.append(f" - {issue.message}")
if issue.suggestion:
lines.append(f" Suggestion: {issue.suggestion}")
if infos:
lines.append(f"\n INFO ({len(infos)}):")
for issue in infos:
lines.append(f" - {issue.message}")
lines.append("")
lines.append("=" * 60)
return "\n".join(lines)
def main():
"""Main entry point for CLI usage."""
parser = argparse.ArgumentParser(
description="Validate XML sitemaps",
)
parser.add_argument("--url", "-u", required=True, help="Sitemap URL to validate")
parser.add_argument("--check-urls", action="store_true",
help="Check HTTP status of URLs (slower)")
parser.add_argument("--output", "-o", help="Output file for JSON report")
parser.add_argument("--json", action="store_true", help="Output as JSON")
args = parser.parse_args()
validator = SitemapValidator(check_urls=args.check_urls)
result = validator.validate(args.url)
if args.json or args.output:
output = json.dumps(result.to_dict(), ensure_ascii=False, indent=2)
if args.output:
with open(args.output, "w", encoding="utf-8") as f:
f.write(output)
logger.info(f"Report written to {args.output}")
else:
print(output)
else:
print(validator.generate_report(result))
if __name__ == "__main__":
main()