# Generated with Claude Code (https://claude.com/claude-code)
"""
Schema Validator - Validate JSON-LD structured data markup
==========================================================

Purpose: Extract and validate schema.org structured data from URLs or files
Python: 3.10+
Usage:
    python schema_validator.py --url https://example.com
    python schema_validator.py --file schema.json
"""
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import re
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from typing import Any
|
|
from urllib.parse import urlparse
|
|
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
|
|
try:
    # Optional dependency: extruct parses JSON-LD, Microdata and RDFa in
    # one pass. When absent, extraction falls back to the manual
    # BeautifulSoup JSON-LD scan in SchemaValidator.extract_from_html.
    import extruct

    HAS_EXTRUCT = True
except ImportError:
    HAS_EXTRUCT = False

# Module-wide logging: INFO level, timestamped single-line records.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class ValidationIssue:
    """Represents a validation issue found in schema."""

    # Issue level: "error" (makes the result invalid), "warning", or "info".
    severity: str
    # Human-readable description of the problem.
    message: str
    # @type of the schema object the issue was found on, if known.
    schema_type: str | None = None
    # Name of the missing/offending property, if applicable.
    property_name: str | None = None
    # Optional remediation hint, surfaced in reports.
    suggestion: str | None = None
|
|
|
|
|
|
@dataclass
class ValidationResult:
    """Complete validation result for a schema."""

    # Page the schemas were extracted from (None for file/dict input).
    url: str | None = None
    # Raw schema objects found on the page/file.
    schemas_found: list[dict] = field(default_factory=list)
    # All issues across every schema, in discovery order.
    issues: list[ValidationIssue] = field(default_factory=list)
    # False when any "error"-severity issue exists or nothing was found.
    valid: bool = True
    # Rich Results eligibility per schema @type, keyed by type name.
    rich_results_eligible: dict = field(default_factory=dict)
    # Creation time of this result (naive local time, ISO-8601).
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())

    def to_dict(self) -> dict:
        """Convert to a JSON-serializable dictionary summary."""
        return {
            "url": self.url,
            "schemas_found": len(self.schemas_found),
            # FIX: a JSON-LD <script> may legally contain non-object values
            # (arrays of scalars end up as non-dict entries after extraction),
            # so guard the .get() instead of raising AttributeError.
            "schema_types": [
                s.get("@type", "Unknown") if isinstance(s, dict) else "Unknown"
                for s in self.schemas_found
            ],
            "valid": self.valid,
            "issues": [
                {
                    "severity": i.severity,
                    "message": i.message,
                    "schema_type": i.schema_type,
                    "property": i.property_name,
                    "suggestion": i.suggestion,
                }
                for i in self.issues
            ],
            "rich_results_eligible": self.rich_results_eligible,
            "timestamp": self.timestamp,
        }
|
|
|
|
|
|
class SchemaValidator:
    """Validate schema.org structured data.

    Extracts JSON-LD (and, when ``extruct`` is installed, Microdata/RDFa)
    from a URL or raw HTML, then checks each schema object against the
    required/recommended property tables below and against a subset of
    Google Rich Results requirements.
    """

    # Required properties for common schema types (missing -> "error").
    REQUIRED_PROPERTIES = {
        "Organization": ["name", "url"],
        "LocalBusiness": ["name", "address"],
        "Product": ["name"],
        "Offer": ["price", "priceCurrency"],
        "Article": ["headline", "author", "datePublished", "publisher"],
        "BlogPosting": ["headline", "author", "datePublished", "publisher"],
        "NewsArticle": ["headline", "author", "datePublished", "publisher"],
        "FAQPage": ["mainEntity"],
        "Question": ["name", "acceptedAnswer"],
        "Answer": ["text"],
        "BreadcrumbList": ["itemListElement"],
        "ListItem": ["position", "name"],
        "WebSite": ["name", "url"],
        "WebPage": ["name"],
        "Person": ["name"],
        "Event": ["name", "startDate", "location"],
        "Review": ["reviewRating", "author"],
        "AggregateRating": ["ratingValue"],
        "ImageObject": ["url"],
    }

    # Recommended (but not required) properties (missing -> "info").
    RECOMMENDED_PROPERTIES = {
        "Organization": ["logo", "description", "contactPoint", "sameAs"],
        "LocalBusiness": ["telephone", "openingHoursSpecification", "geo", "image"],
        "Product": ["description", "image", "brand", "offers", "aggregateRating"],
        "Article": ["image", "dateModified", "description"],
        "FAQPage": [],
        "WebSite": ["potentialAction"],
        "BreadcrumbList": [],
    }

    # Google Rich Results eligible types.
    RICH_RESULTS_TYPES = {
        "Article", "BlogPosting", "NewsArticle",
        "Product", "Review",
        "FAQPage", "HowTo",
        "LocalBusiness", "Restaurant",
        "Event",
        "Recipe",
        "JobPosting",
        "Course",
        "BreadcrumbList",
        "Organization",
        "WebSite",
        "VideoObject",
    }

    def __init__(self):
        # One shared HTTP session (connection pooling) with a bot UA string.
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (compatible; SEOAuditBot/1.0)"
        })

    def extract_from_url(self, url: str) -> list[dict]:
        """Fetch *url* and return all structured data found in the page.

        Returns an empty list (after logging) on any network/HTTP error.
        """
        try:
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            return self.extract_from_html(response.text, url)
        except requests.RequestException as e:
            logger.error(f"Failed to fetch URL: {e}")
            return []

    def extract_from_html(self, html: str, base_url: str | None = None) -> list[dict]:
        """Extract structured data from HTML content, deduplicated.

        base_url is forwarded to extruct so relative IDs can be resolved.
        """
        schemas = []

        # Method 1: Use extruct if available (handles JSON-LD, Microdata, RDFa).
        if HAS_EXTRUCT:
            try:
                data = extruct.extract(html, base_url=base_url, uniform=True)
                schemas.extend(data.get("json-ld", []))
                schemas.extend(data.get("microdata", []))
                schemas.extend(data.get("rdfa", []))
            except Exception as e:
                # extruct failures are non-fatal; the manual pass still runs.
                logger.warning(f"extruct extraction failed: {e}")

        # Method 2: Manual JSON-LD extraction (fallback/additional).
        soup = BeautifulSoup(html, "html.parser")
        for script in soup.find_all("script", type="application/ld+json"):
            try:
                content = script.string
                if content:
                    data = json.loads(content)
                    if isinstance(data, list):
                        schemas.extend(data)
                    else:
                        schemas.append(data)
            except json.JSONDecodeError as e:
                logger.warning(f"Invalid JSON-LD: {e}")

        # Deduplicate: extruct and the manual pass often find the same
        # JSON-LD, so key on the canonical (sorted) JSON serialization.
        seen = set()
        unique_schemas = []
        for schema in schemas:
            schema_str = json.dumps(schema, sort_keys=True)
            if schema_str not in seen:
                seen.add(schema_str)
                unique_schemas.append(schema)

        return unique_schemas

    def validate(self, url: str | None = None, html: str | None = None,
                 schema: dict | None = None) -> ValidationResult:
        """Validate schema from URL, HTML, or direct schema dict.

        Exactly one source should be supplied; precedence is
        schema > html > url. Raises ValueError when none is given.
        """
        result = ValidationResult(url=url)

        # Extract schemas from whichever source was provided.
        if schema:
            schemas = [schema]
        elif html:
            schemas = self.extract_from_html(html, url)
        elif url:
            schemas = self.extract_from_url(url)
        else:
            raise ValueError("Must provide url, html, or schema")

        result.schemas_found = schemas

        if not schemas:
            result.issues.append(ValidationIssue(
                severity="warning",
                message="No structured data found",
                suggestion="Add JSON-LD schema markup to improve SEO",
            ))
            result.valid = False
            return result

        # Validate each schema.
        for schema in schemas:
            # FIX: a JSON-LD block can legally contain non-object values
            # (e.g. an array of strings); flag them instead of crashing on
            # schema.get() below.
            if not isinstance(schema, dict):
                result.issues.append(ValidationIssue(
                    severity="error",
                    message=f"Top-level schema is not an object: {schema!r}",
                ))
                continue
            self._validate_schema(schema, result)

        # Only "error"-severity issues affect validity.
        result.valid = not any(i.severity == "error" for i in result.issues)

        return result

    def _validate_schema(self, schema: dict, result: ValidationResult,
                         parent_type: str | None = None) -> None:
        """Validate a single schema object, recursing into nested objects.

        parent_type is the @type of the enclosing object, used to label
        issues on objects that lack their own @type.
        """
        schema_type = schema.get("@type")

        if not schema_type:
            result.issues.append(ValidationIssue(
                severity="error",
                message="Missing @type property",
                schema_type=parent_type,
            ))
            return

        # Handle array of types: validate against the first one only.
        if isinstance(schema_type, list):
            schema_type = schema_type[0]

        # Check required properties.
        required = self.REQUIRED_PROPERTIES.get(schema_type, [])
        for prop in required:
            if prop not in schema:
                result.issues.append(ValidationIssue(
                    severity="error",
                    message=f"Missing required property: {prop}",
                    schema_type=schema_type,
                    property_name=prop,
                    suggestion=f"Add '{prop}' property to {schema_type} schema",
                ))

        # Check recommended properties.
        recommended = self.RECOMMENDED_PROPERTIES.get(schema_type, [])
        for prop in recommended:
            if prop not in schema:
                result.issues.append(ValidationIssue(
                    severity="info",
                    message=f"Missing recommended property: {prop}",
                    schema_type=schema_type,
                    property_name=prop,
                    suggestion=f"Consider adding '{prop}' for better rich results",
                ))

        # Check Rich Results eligibility.
        if schema_type in self.RICH_RESULTS_TYPES:
            result.rich_results_eligible[schema_type] = self._check_rich_results(
                schema, schema_type
            )

        # Recurse into nested schema objects (dicts or lists of dicts that
        # carry their own @type); @-prefixed JSON-LD keywords are skipped.
        for key, value in schema.items():
            if key.startswith("@"):
                continue
            if isinstance(value, dict) and "@type" in value:
                self._validate_schema(value, result, schema_type)
            elif isinstance(value, list):
                for item in value:
                    if isinstance(item, dict) and "@type" in item:
                        self._validate_schema(item, result, schema_type)

        # Type-specific validations.
        self._validate_type_specific(schema, schema_type, result)

    def _validate_type_specific(self, schema: dict, schema_type: str,
                                result: ValidationResult) -> None:
        """Type-specific validation rules beyond the property tables."""
        if schema_type in ("Article", "BlogPosting", "NewsArticle"):
            # Check image.
            if "image" not in schema:
                result.issues.append(ValidationIssue(
                    severity="warning",
                    message="Article without image may not show in rich results",
                    schema_type=schema_type,
                    property_name="image",
                    suggestion="Add at least one image to the article",
                ))

            # Check headline length.
            # FIX: guard against non-string headlines (numbers, objects),
            # which would make len() raise TypeError.
            headline = schema.get("headline", "")
            if isinstance(headline, str) and len(headline) > 110:
                result.issues.append(ValidationIssue(
                    severity="warning",
                    message=f"Headline too long ({len(headline)} chars, max 110)",
                    schema_type=schema_type,
                    property_name="headline",
                ))

        elif schema_type == "Product":
            offer = schema.get("offers", {})
            if isinstance(offer, dict):
                # Check price is numeric (schema.org allows string or number).
                price = offer.get("price")
                if price is not None:
                    try:
                        float(price)
                    except (ValueError, TypeError):
                        result.issues.append(ValidationIssue(
                            severity="error",
                            message=f"Invalid price value: {price}",
                            schema_type="Offer",
                            property_name="price",
                        ))

                # Check availability against known enumeration values
                # (substring match so both bare names and full URLs pass).
                availability = offer.get("availability", "")
                valid_availabilities = [
                    "InStock", "OutOfStock", "PreOrder", "Discontinued",
                    "https://schema.org/InStock", "https://schema.org/OutOfStock",
                ]
                if availability and not any(
                    a in availability for a in valid_availabilities
                ):
                    result.issues.append(ValidationIssue(
                        severity="warning",
                        message=f"Unknown availability value: {availability}",
                        schema_type="Offer",
                        property_name="availability",
                    ))

        elif schema_type == "LocalBusiness":
            # Check for geo coordinates.
            if "geo" not in schema:
                result.issues.append(ValidationIssue(
                    severity="info",
                    message="Missing geo coordinates",
                    schema_type=schema_type,
                    property_name="geo",
                    suggestion="Add latitude/longitude for better local search",
                ))

        elif schema_type == "FAQPage":
            main_entity = schema.get("mainEntity", [])
            # FIX: mainEntity may be a single Question object rather than a
            # list; normalize so the counts below count questions, not the
            # keys of one dict.
            if isinstance(main_entity, dict):
                main_entity = [main_entity]
            if not main_entity:
                result.issues.append(ValidationIssue(
                    severity="error",
                    message="FAQPage must have at least one question",
                    schema_type=schema_type,
                    property_name="mainEntity",
                ))
            elif len(main_entity) < 2:
                result.issues.append(ValidationIssue(
                    severity="info",
                    message="FAQPage has only one question",
                    schema_type=schema_type,
                    suggestion="Add more questions for better rich results",
                ))

    def _check_rich_results(self, schema: dict, schema_type: str) -> dict:
        """Check if schema is eligible for Google Rich Results.

        Returns {"eligible": bool, "missing_for_rich_results": [props]}.
        """
        result = {
            "eligible": True,
            "missing_for_rich_results": [],
        }

        if schema_type in ("Article", "BlogPosting", "NewsArticle"):
            required_for_rich = ["headline", "image", "datePublished", "author"]
            for prop in required_for_rich:
                if prop not in schema:
                    result["eligible"] = False
                    result["missing_for_rich_results"].append(prop)

        elif schema_type == "Product":
            if "name" not in schema:
                result["eligible"] = False
                result["missing_for_rich_results"].append("name")
            offer = schema.get("offers")
            if not offer:
                result["eligible"] = False
                result["missing_for_rich_results"].append("offers")

        elif schema_type == "FAQPage":
            if not schema.get("mainEntity"):
                result["eligible"] = False
                result["missing_for_rich_results"].append("mainEntity")

        return result

    def generate_report(self, result: ValidationResult) -> str:
        """Generate a human-readable, plain-text validation report."""
        lines = [
            "=" * 60,
            "Schema Validation Report",
            "=" * 60,
            f"URL: {result.url or 'N/A'}",
            f"Timestamp: {result.timestamp}",
            f"Valid: {'Yes' if result.valid else 'No'}",
            f"Schemas Found: {len(result.schemas_found)}",
            "",
        ]

        if result.schemas_found:
            lines.append("Schema Types:")
            for schema in result.schemas_found:
                # Non-dict entries (flagged earlier in validate()) show as Unknown.
                schema_type = schema.get("@type", "Unknown") if isinstance(schema, dict) else "Unknown"
                lines.append(f"  - {schema_type}")
            lines.append("")

        if result.rich_results_eligible:
            lines.append("Rich Results Eligibility:")
            for schema_type, status in result.rich_results_eligible.items():
                eligible = "Yes" if status["eligible"] else "No"
                lines.append(f"  - {schema_type}: {eligible}")
                if status["missing_for_rich_results"]:
                    missing = ", ".join(status["missing_for_rich_results"])
                    lines.append(f"    Missing: {missing}")
            lines.append("")

        if result.issues:
            lines.append("Issues Found:")
            # Group by severity, most serious first.
            errors = [i for i in result.issues if i.severity == "error"]
            warnings = [i for i in result.issues if i.severity == "warning"]
            infos = [i for i in result.issues if i.severity == "info"]

            if errors:
                lines.append(f"\n  ERRORS ({len(errors)}):")
                for issue in errors:
                    lines.append(f"  - [{issue.schema_type}] {issue.message}")
                    if issue.suggestion:
                        lines.append(f"    Suggestion: {issue.suggestion}")

            if warnings:
                lines.append(f"\n  WARNINGS ({len(warnings)}):")
                for issue in warnings:
                    lines.append(f"  - [{issue.schema_type}] {issue.message}")
                    if issue.suggestion:
                        lines.append(f"    Suggestion: {issue.suggestion}")

            if infos:
                lines.append(f"\n  INFO ({len(infos)}):")
                for issue in infos:
                    lines.append(f"  - [{issue.schema_type}] {issue.message}")
                    if issue.suggestion:
                        lines.append(f"    Suggestion: {issue.suggestion}")

        lines.append("")
        lines.append("=" * 60)

        return "\n".join(lines)
|
|
|
|
|
|
def main():
    """Main entry point for CLI usage.

    Exit status: 0 when validation passed, 1 when it failed,
    2 on argument or input-file errors (argparse convention).
    """
    parser = argparse.ArgumentParser(
        description="Validate schema.org structured data",
    )
    parser.add_argument("--url", "-u", help="URL to validate")
    parser.add_argument("--file", "-f", help="JSON-LD file to validate")
    parser.add_argument("--output", "-o", help="Output file for JSON report")
    parser.add_argument("--json", action="store_true", help="Output as JSON")

    args = parser.parse_args()

    if not args.url and not args.file:
        parser.error("Must provide --url or --file")

    validator = SchemaValidator()

    if args.file:
        # FIX: report a missing/unreadable/corrupt file as a clean CLI
        # error (exit code 2) instead of an unhandled traceback.
        try:
            with open(args.file, "r", encoding="utf-8") as f:
                schema = json.load(f)
        except (OSError, json.JSONDecodeError) as e:
            parser.error(f"Cannot read schema file: {e}")
        # NOTE(review): a file holding a top-level JSON array is passed
        # through unchanged; validate() documents a single object —
        # confirm whether multi-schema files should be supported.
        result = validator.validate(schema=schema)
    else:
        result = validator.validate(url=args.url)

    if args.json or args.output:
        output = json.dumps(result.to_dict(), ensure_ascii=False, indent=2)
        if args.output:
            with open(args.output, "w", encoding="utf-8") as f:
                f.write(output)
            logger.info(f"Report written to {args.output}")
        else:
            print(output)
    else:
        print(validator.generate_report(result))

    # FIX: propagate the validation outcome as the process exit status so
    # the tool is usable in CI pipelines (previously always exited 0).
    raise SystemExit(0 if result.valid else 1)


if __name__ == "__main__":
    main()
|