Restructure skill numbering: SEO 11-30, GTM 60-69, reserve 19-28 for future skills
Renumber 12 existing skills to new ranges: - SEO: 11→13, 12→18, 13→16, 14→17, 15→14, 16→15, 17→29, 18→30, 19→12 - GTM: 20→60, 21→61, 22→62 Update cross-references in gateway architect/builder skills, GTM guardian README, CLAUDE.md (skill tables + directory layout), and AGENTS.md (domain routing ranges). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,498 @@
|
||||
"""
|
||||
Schema Validator - Validate JSON-LD structured data markup
|
||||
==========================================================
|
||||
Purpose: Extract and validate schema.org structured data from URLs or files
|
||||
Python: 3.10+
|
||||
Usage:
|
||||
python schema_validator.py --url https://example.com
|
||||
python schema_validator.py --file schema.json
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
try:
|
||||
import extruct
|
||||
HAS_EXTRUCT = True
|
||||
except ImportError:
|
||||
HAS_EXTRUCT = False
|
||||
|
||||
# Module-wide logging: timestamped INFO-level messages (stderr by default).
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
class ValidationIssue:
    """Represents a validation issue found in schema."""

    severity: str  # "error", "warning", "info"
    message: str  # human-readable description of the problem
    schema_type: str | None = None  # schema.org @type the issue was found on
    property_name: str | None = None  # missing/offending property, if any
    suggestion: str | None = None  # optional remediation hint for the report
|
||||
|
||||
|
||||
@dataclass
class ValidationResult:
    """Complete validation result for a schema."""

    url: str | None = None  # page the schemas came from, if any
    schemas_found: list[dict] = field(default_factory=list)  # raw schema objects
    issues: list[ValidationIssue] = field(default_factory=list)  # all findings
    valid: bool = True  # False once any error-severity issue is recorded
    rich_results_eligible: dict = field(default_factory=dict)  # per-type status
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())

    def to_dict(self) -> dict:
        """Serialize this result into a plain dict suitable for JSON output."""
        issue_entries = []
        for item in self.issues:
            issue_entries.append({
                "severity": item.severity,
                "message": item.message,
                "schema_type": item.schema_type,
                "property": item.property_name,
                "suggestion": item.suggestion,
            })
        type_names = [entry.get("@type", "Unknown") for entry in self.schemas_found]
        return {
            "url": self.url,
            "schemas_found": len(self.schemas_found),
            "schema_types": type_names,
            "valid": self.valid,
            "issues": issue_entries,
            "rich_results_eligible": self.rich_results_eligible,
            "timestamp": self.timestamp,
        }
|
||||
|
||||
|
||||
class SchemaValidator:
    """Validate schema.org structured data.

    Extracts structured data (JSON-LD always; Microdata/RDFa additionally
    when the optional ``extruct`` package is installed) from a URL or raw
    HTML, then checks every schema object against required/recommended
    schema.org properties and Google Rich Results criteria.
    """

    # Required properties for common schema types; a missing one is an error.
    REQUIRED_PROPERTIES = {
        "Organization": ["name", "url"],
        "LocalBusiness": ["name", "address"],
        "Product": ["name"],
        "Offer": ["price", "priceCurrency"],
        "Article": ["headline", "author", "datePublished", "publisher"],
        "BlogPosting": ["headline", "author", "datePublished", "publisher"],
        "NewsArticle": ["headline", "author", "datePublished", "publisher"],
        "FAQPage": ["mainEntity"],
        "Question": ["name", "acceptedAnswer"],
        "Answer": ["text"],
        "BreadcrumbList": ["itemListElement"],
        "ListItem": ["position", "name"],
        "WebSite": ["name", "url"],
        "WebPage": ["name"],
        "Person": ["name"],
        "Event": ["name", "startDate", "location"],
        "Review": ["reviewRating", "author"],
        "AggregateRating": ["ratingValue"],
        "ImageObject": ["url"],
    }

    # Recommended (but not required) properties; a missing one is info-level.
    RECOMMENDED_PROPERTIES = {
        "Organization": ["logo", "description", "contactPoint", "sameAs"],
        "LocalBusiness": ["telephone", "openingHoursSpecification", "geo", "image"],
        "Product": ["description", "image", "brand", "offers", "aggregateRating"],
        "Article": ["image", "dateModified", "description"],
        "FAQPage": [],
        "WebSite": ["potentialAction"],
        "BreadcrumbList": [],
    }

    # Google Rich Results eligible types
    RICH_RESULTS_TYPES = {
        "Article", "BlogPosting", "NewsArticle",
        "Product", "Review",
        "FAQPage", "HowTo",
        "LocalBusiness", "Restaurant",
        "Event",
        "Recipe",
        "JobPosting",
        "Course",
        "BreadcrumbList",
        "Organization",
        "WebSite",
        "VideoObject",
    }

    def __init__(self):
        # One shared session so connections are reused across fetches; an
        # explicit bot UA keeps the crawler identifiable in server logs.
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (compatible; SEOAuditBot/1.0)"
        })

    def extract_from_url(self, url: str) -> list[dict]:
        """Extract all structured data from a URL.

        Returns an empty list (after logging the error) if the page
        cannot be fetched — callers then report "no structured data".
        """
        try:
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            return self.extract_from_html(response.text, url)
        except requests.RequestException as e:
            logger.error(f"Failed to fetch URL: {e}")
            return []

    def extract_from_html(self, html: str, base_url: str | None = None) -> list[dict]:
        """Extract structured data from HTML content.

        Combines extruct extraction (JSON-LD, Microdata, RDFa) with a
        manual scan of ``<script type="application/ld+json">`` tags, then
        de-duplicates the combined results.
        """
        schemas = []

        # Method 1: Use extruct if available (handles JSON-LD, Microdata, RDFa)
        if HAS_EXTRUCT:
            try:
                data = extruct.extract(html, base_url=base_url, uniform=True)
                schemas.extend(data.get("json-ld", []))
                schemas.extend(data.get("microdata", []))
                schemas.extend(data.get("rdfa", []))
            except Exception as e:
                # Best-effort: extruct failure must not abort extraction,
                # since the manual JSON-LD pass below still runs.
                logger.warning(f"extruct extraction failed: {e}")

        # Method 2: Manual JSON-LD extraction (fallback/additional)
        soup = BeautifulSoup(html, "html.parser")
        for script in soup.find_all("script", type="application/ld+json"):
            try:
                content = script.string
                if content:
                    data = json.loads(content)
                    # A single script tag may hold one object or an array.
                    if isinstance(data, list):
                        schemas.extend(data)
                    else:
                        schemas.append(data)
            except json.JSONDecodeError as e:
                logger.warning(f"Invalid JSON-LD: {e}")

        # Deduplicate schemas: extruct and the manual pass often find the
        # same objects; canonical sorted-key JSON serves as identity key.
        seen = set()
        unique_schemas = []
        for schema in schemas:
            schema_str = json.dumps(schema, sort_keys=True)
            if schema_str not in seen:
                seen.add(schema_str)
                unique_schemas.append(schema)

        return unique_schemas

    def validate(self, url: str | None = None, html: str | None = None,
                 schema: dict | list[dict] | None = None) -> ValidationResult:
        """Validate schema from URL, HTML, or direct schema data.

        Args:
            url: Page to fetch; also recorded on the result for reporting.
            html: Raw HTML to scan; takes precedence over fetching ``url``.
            schema: A single schema dict, or a list of schema dicts (e.g.
                the value of ``json.load`` on a file containing a JSON
                array); takes precedence over ``html`` and ``url``.

        Returns:
            A ValidationResult with all extracted schemas and issues.

        Raises:
            ValueError: If none of url, html, or schema is provided.
        """
        result = ValidationResult(url=url)

        # Extract schemas
        if schema:
            # Accept both one schema object and a list of them; previously a
            # list (as produced by json.load on a JSON array) crashed later
            # with AttributeError when treated as a single dict.
            schemas = schema if isinstance(schema, list) else [schema]
        elif html:
            schemas = self.extract_from_html(html, url)
        elif url:
            schemas = self.extract_from_url(url)
        else:
            raise ValueError("Must provide url, html, or schema")

        result.schemas_found = schemas

        if not schemas:
            result.issues.append(ValidationIssue(
                severity="warning",
                message="No structured data found",
                suggestion="Add JSON-LD schema markup to improve SEO",
            ))
            result.valid = False
            return result

        # Validate each schema
        for schema in schemas:
            self._validate_schema(schema, result)

        # Check for errors (warnings don't affect validity)
        result.valid = not any(i.severity == "error" for i in result.issues)

        return result

    def _validate_schema(self, schema: dict, result: ValidationResult,
                         parent_type: str | None = None) -> None:
        """Validate a single schema object, recursing into nested objects.

        Appends findings to ``result.issues`` and records Rich Results
        eligibility per type. ``parent_type`` labels issues found on nested
        objects that lack their own @type.
        """
        schema_type = schema.get("@type")

        # Missing or empty @type (an empty type list is falsy too).
        if not schema_type:
            result.issues.append(ValidationIssue(
                severity="error",
                message="Missing @type property",
                schema_type=parent_type,
            ))
            return

        # Handle array of types: validate against the first entry.
        if isinstance(schema_type, list):
            schema_type = schema_type[0]

        # Check required properties
        required = self.REQUIRED_PROPERTIES.get(schema_type, [])
        for prop in required:
            if prop not in schema:
                result.issues.append(ValidationIssue(
                    severity="error",
                    message=f"Missing required property: {prop}",
                    schema_type=schema_type,
                    property_name=prop,
                    suggestion=f"Add '{prop}' property to {schema_type} schema",
                ))

        # Check recommended properties
        recommended = self.RECOMMENDED_PROPERTIES.get(schema_type, [])
        for prop in recommended:
            if prop not in schema:
                result.issues.append(ValidationIssue(
                    severity="info",
                    message=f"Missing recommended property: {prop}",
                    schema_type=schema_type,
                    property_name=prop,
                    suggestion=f"Consider adding '{prop}' for better rich results",
                ))

        # Check Rich Results eligibility
        if schema_type in self.RICH_RESULTS_TYPES:
            result.rich_results_eligible[schema_type] = self._check_rich_results(
                schema, schema_type
            )

        # Validate nested schemas: typed dict values, and typed dicts
        # inside list values. Keys starting with "@" are JSON-LD keywords.
        for key, value in schema.items():
            if key.startswith("@"):
                continue
            if isinstance(value, dict) and "@type" in value:
                self._validate_schema(value, result, schema_type)
            elif isinstance(value, list):
                for item in value:
                    if isinstance(item, dict) and "@type" in item:
                        self._validate_schema(item, result, schema_type)

        # Type-specific validations
        self._validate_type_specific(schema, schema_type, result)

    def _validate_type_specific(self, schema: dict, schema_type: str,
                                result: ValidationResult) -> None:
        """Type-specific validation rules (Article, Product, FAQPage, ...)."""
        if schema_type in ("Article", "BlogPosting", "NewsArticle"):
            # Check image
            if "image" not in schema:
                result.issues.append(ValidationIssue(
                    severity="warning",
                    message="Article without image may not show in rich results",
                    schema_type=schema_type,
                    property_name="image",
                    suggestion="Add at least one image to the article",
                ))

            # Check headline length; only meaningful for string headlines
            # (a non-string value previously raised TypeError on len()).
            headline = schema.get("headline", "")
            if isinstance(headline, str) and len(headline) > 110:
                result.issues.append(ValidationIssue(
                    severity="warning",
                    message=f"Headline too long ({len(headline)} chars, max 110)",
                    schema_type=schema_type,
                    property_name="headline",
                ))

        elif schema_type == "Product":
            offer = schema.get("offers", {})
            if isinstance(offer, dict):
                # Check price parses as a number (numeric strings are fine).
                price = offer.get("price")
                if price is not None:
                    try:
                        float(price)
                    except (ValueError, TypeError):
                        result.issues.append(ValidationIssue(
                            severity="error",
                            message=f"Invalid price value: {price}",
                            schema_type="Offer",
                            property_name="price",
                        ))

                # Check availability against known schema.org values;
                # substring match accepts both bare and full-URL forms.
                availability = offer.get("availability", "")
                valid_availabilities = [
                    "InStock", "OutOfStock", "PreOrder", "Discontinued",
                    "https://schema.org/InStock", "https://schema.org/OutOfStock",
                ]
                if availability and not any(
                    a in availability for a in valid_availabilities
                ):
                    result.issues.append(ValidationIssue(
                        severity="warning",
                        message=f"Unknown availability value: {availability}",
                        schema_type="Offer",
                        property_name="availability",
                    ))

        elif schema_type == "LocalBusiness":
            # Check for geo coordinates
            if "geo" not in schema:
                result.issues.append(ValidationIssue(
                    severity="info",
                    message="Missing geo coordinates",
                    schema_type=schema_type,
                    property_name="geo",
                    suggestion="Add latitude/longitude for better local search",
                ))

        elif schema_type == "FAQPage":
            main_entity = schema.get("mainEntity", [])
            # A lone Question may appear as a bare dict rather than a list;
            # normalize so len() below counts questions, not dict keys.
            if isinstance(main_entity, dict):
                main_entity = [main_entity]
            if not main_entity:
                result.issues.append(ValidationIssue(
                    severity="error",
                    message="FAQPage must have at least one question",
                    schema_type=schema_type,
                    property_name="mainEntity",
                ))
            elif len(main_entity) < 2:
                result.issues.append(ValidationIssue(
                    severity="info",
                    message="FAQPage has only one question",
                    schema_type=schema_type,
                    suggestion="Add more questions for better rich results",
                ))

    def _check_rich_results(self, schema: dict, schema_type: str) -> dict:
        """Check if schema is eligible for Google Rich Results.

        Returns a dict with an ``eligible`` flag and the list of
        properties still ``missing_for_rich_results``.
        """
        result = {
            "eligible": True,
            "missing_for_rich_results": [],
        }

        if schema_type in ("Article", "BlogPosting", "NewsArticle"):
            required_for_rich = ["headline", "image", "datePublished", "author"]
            for prop in required_for_rich:
                if prop not in schema:
                    result["eligible"] = False
                    result["missing_for_rich_results"].append(prop)

        elif schema_type == "Product":
            if "name" not in schema:
                result["eligible"] = False
                result["missing_for_rich_results"].append("name")
            offer = schema.get("offers")
            if not offer:
                result["eligible"] = False
                result["missing_for_rich_results"].append("offers")

        elif schema_type == "FAQPage":
            if not schema.get("mainEntity"):
                result["eligible"] = False
                result["missing_for_rich_results"].append("mainEntity")

        return result

    def generate_report(self, result: ValidationResult) -> str:
        """Generate human-readable validation report."""
        lines = [
            "=" * 60,
            "Schema Validation Report",
            "=" * 60,
            f"URL: {result.url or 'N/A'}",
            f"Timestamp: {result.timestamp}",
            f"Valid: {'Yes' if result.valid else 'No'}",
            f"Schemas Found: {len(result.schemas_found)}",
            "",
        ]

        if result.schemas_found:
            lines.append("Schema Types:")
            for schema in result.schemas_found:
                schema_type = schema.get("@type", "Unknown")
                lines.append(f" - {schema_type}")
            lines.append("")

        if result.rich_results_eligible:
            lines.append("Rich Results Eligibility:")
            for schema_type, status in result.rich_results_eligible.items():
                eligible = "Yes" if status["eligible"] else "No"
                lines.append(f" - {schema_type}: {eligible}")
                if status["missing_for_rich_results"]:
                    missing = ", ".join(status["missing_for_rich_results"])
                    lines.append(f" Missing: {missing}")
            lines.append("")

        if result.issues:
            lines.append("Issues Found:")
            # Group by severity so the most serious findings lead.
            errors = [i for i in result.issues if i.severity == "error"]
            warnings = [i for i in result.issues if i.severity == "warning"]
            infos = [i for i in result.issues if i.severity == "info"]

            if errors:
                lines.append(f"\n ERRORS ({len(errors)}):")
                for issue in errors:
                    lines.append(f" - [{issue.schema_type}] {issue.message}")
                    if issue.suggestion:
                        lines.append(f" Suggestion: {issue.suggestion}")

            if warnings:
                lines.append(f"\n WARNINGS ({len(warnings)}):")
                for issue in warnings:
                    lines.append(f" - [{issue.schema_type}] {issue.message}")
                    if issue.suggestion:
                        lines.append(f" Suggestion: {issue.suggestion}")

            if infos:
                lines.append(f"\n INFO ({len(infos)}):")
                for issue in infos:
                    lines.append(f" - [{issue.schema_type}] {issue.message}")
                    if issue.suggestion:
                        lines.append(f" Suggestion: {issue.suggestion}")

        lines.append("")
        lines.append("=" * 60)

        return "\n".join(lines)
|
||||
|
||||
|
||||
def main():
    """Main entry point for CLI usage."""
    parser = argparse.ArgumentParser(
        description="Validate schema.org structured data",
    )
    parser.add_argument("--url", "-u", help="URL to validate")
    parser.add_argument("--file", "-f", help="JSON-LD file to validate")
    parser.add_argument("--output", "-o", help="Output file for JSON report")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    args = parser.parse_args()

    # At least one input source is mandatory.
    if not args.url and not args.file:
        parser.error("Must provide --url or --file")

    validator = SchemaValidator()

    # A local file takes precedence over a URL when both are given.
    if args.file:
        with open(args.file, "r", encoding="utf-8") as f:
            loaded = json.load(f)
        result = validator.validate(schema=loaded)
    else:
        result = validator.validate(url=args.url)

    # Plain-text report unless JSON output was requested (explicitly via
    # --json, or implicitly by naming an output file).
    if not (args.json or args.output):
        print(validator.generate_report(result))
        return

    report = json.dumps(result.to_dict(), ensure_ascii=False, indent=2)
    if args.output:
        with open(args.output, "w", encoding="utf-8") as f:
            f.write(report)
        logger.info(f"Report written to {args.output}")
    else:
        print(report)


if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user