12 new skills: Keyword Strategy, SERP Analysis, Position Tracking, Link Building, Content Strategy, E-Commerce SEO, KPI Framework, International SEO, AI Visibility, Knowledge Graph, Competitor Intel, and Crawl Budget. ~20K lines of Python across 25 domain scripts. Updated skill 11 pipeline table and repo CLAUDE.md. Enhanced skill 18 local SEO workflow from jamie.clinic audit. Note: Skill 26 hreflang_validator.py pending (content filter block). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
802 lines
30 KiB
Python
802 lines
30 KiB
Python
"""
|
|
Performance Reporter - Period-over-period SEO performance reports
|
|
================================================================
|
|
Purpose: Generate executive summaries, trend analysis, tactical breakdowns,
|
|
and target-vs-actual comparison from Ahrefs historical data.
|
|
Python: 3.10+
|
|
"""
|
|
|
|
import argparse
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import sys
|
|
from dataclasses import dataclass, field, asdict
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import aiohttp
|
|
|
|
from base_client import BaseAsyncClient, config
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Data classes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@dataclass
|
|
class TrendData:
|
|
"""Single trend data point for a metric."""
|
|
period: str
|
|
value: float
|
|
change_pct: float | None = None
|
|
direction: str = "stable" # up, down, stable
|
|
|
|
|
|
@dataclass
class WinConcern:
    """A noteworthy positive (win) or negative (concern) finding."""

    # Area the finding belongs to.
    category: str
    # Human-readable explanation of what changed.
    description: str
    # Severity bucket: "high", "medium" or "low".
    impact: str = "medium"
    # Suggested follow-up; empty when there is none.
    action: str = ""
|
|
|
|
|
|
@dataclass
class TargetProgress:
    """Progress of one KPI toward its target value."""

    # Name of the KPI being tracked.
    kpi_name: str
    # Value the KPI is supposed to reach.
    target: float
    # Value the KPI currently has.
    actual: float
    # actual / target as a percentage; populated by compute_progress().
    progress_pct: float = 0.0

    def compute_progress(self) -> None:
        """Store ``actual / target`` as a percentage rounded to one decimal.

        A missing or zero target yields 0.0 instead of a division error.
        """
        if not self.target:
            self.progress_pct = 0.0
            return
        ratio = self.actual / self.target
        self.progress_pct = round(ratio * 100, 1)
|
|
|
|
|
|
@dataclass
class PerformanceReport:
    """Complete performance report for one target and one reporting period.

    Every field has a default, so an empty report can be created and then
    filled in step by step.
    """

    # Target URL or domain the report is about.
    url: str = ""
    # Name of the reporting period.
    period: str = "monthly"
    # Start and end of the reporting window, as date strings.
    date_from: str = ""
    date_to: str = ""
    # Overall score and its direction ("improving", "declining", "stable").
    health_score: float = 0.0
    health_trend: str = "stable"
    # Metric name -> list of trend points stored as plain dicts (the
    # asdict() form of TrendData), which is what the text formatter and the
    # JSON serializer in this module consume.
    trends: dict[str, list[dict[str, Any]]] = field(default_factory=dict)
    wins: list[WinConcern] = field(default_factory=list)
    concerns: list[WinConcern] = field(default_factory=list)
    executive_summary: dict[str, Any] = field(default_factory=dict)
    tactical_breakdown: dict[str, Any] = field(default_factory=dict)
    target_progress: list[TargetProgress] = field(default_factory=list)
    # Change in traffic value between the compared periods.
    traffic_value_change: float = 0.0
    # When the report was generated.
    timestamp: str = ""
    # Non-fatal problems collected while building the report.
    errors: list[str] = field(default_factory=list)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Period helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Length of the look-back window, in days, for each named report period.
PERIOD_DAYS = {"monthly": 30, "quarterly": 90, "yearly": 365}
|
|
|
|
|
|
def get_date_range(
|
|
period: str, date_from: str | None = None, date_to: str | None = None
|
|
) -> tuple[str, str]:
|
|
"""Compute date range from period or explicit dates."""
|
|
if date_from and date_to:
|
|
return date_from, date_to
|
|
end = datetime.now()
|
|
days = PERIOD_DAYS.get(period, 30)
|
|
start = end - timedelta(days=days)
|
|
return start.strftime("%Y-%m-%d"), end.strftime("%Y-%m-%d")
|
|
|
|
|
|
def get_previous_range(
    date_from: str, date_to: str
) -> tuple[str, str]:
    """Return the window of equal length that ends the day before ``date_from``.

    Both arguments and both returned values are ``YYYY-MM-DD`` strings.
    """
    fmt = "%Y-%m-%d"
    window_start = datetime.strptime(date_from, fmt)
    span = datetime.strptime(date_to, fmt) - window_start

    # The previous window ends one day before the current one starts and
    # covers the same span.
    earlier_end = window_start - timedelta(days=1)
    earlier_start = earlier_end - span
    return earlier_start.strftime(fmt), earlier_end.strftime(fmt)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Performance Reporter
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class PerformanceReporter(BaseAsyncClient):
    """Generate period-over-period SEO performance reports from Ahrefs.

    The class fetches historical and current metrics from the Ahrefs v3
    API, compares the requested period against the preceding period of
    equal length, and assembles a ``PerformanceReport`` (trends, wins,
    concerns, health score, executive summary, tactical breakdown and
    optional target progress).
    """

    AHREFS_BASE = "https://api.ahrefs.com/v3"

    # Ahrefs metric key -> label used in wins and concerns.
    _METRIC_LABELS = {
        "traffic": "Organic Traffic",
        "cost": "Traffic Value",
        "keywords": "Keyword Count",
        "refdomains": "Referring Domains",
    }

    # Recommended follow-up for a decline, keyed by metric label.
    _CONCERN_ACTIONS = {
        "Organic Traffic": "Investigate traffic sources and algorithm updates",
        "Traffic Value": "Review keyword targeting and content quality",
        "Keyword Count": "Expand content coverage and optimize existing pages",
        "Referring Domains": "Strengthen link building outreach campaigns",
    }

    # Sort rank of impact levels; unknown levels sort last.
    _IMPACT_RANK = {"high": 0, "medium": 1, "low": 2}

    def __init__(self, api_token: str | None = None):
        """Set up the client and the Ahrefs authentication headers.

        Args:
            api_token: Ahrefs API token. When omitted it is read through
                ``config.get_required("AHREFS_API_TOKEN")``.
        """
        super().__init__(max_concurrent=3, requests_per_second=2.0)
        self.api_token = api_token or config.get_required("AHREFS_API_TOKEN")
        self.headers = {
            "Authorization": f"Bearer {self.api_token}",
            "Accept": "application/json",
        }

    async def _ahrefs_get(
        self, session: aiohttp.ClientSession, endpoint: str, params: dict
    ) -> dict:
        """Make an authenticated GET request to the Ahrefs API.

        Returns:
            The decoded JSON body on HTTP 200. On any other status a dict
            with ``"error"`` and ``"detail"`` keys is returned (and a
            warning is logged) instead of raising.
        """
        url = f"{self.AHREFS_BASE}/{endpoint}"
        async with session.get(url, headers=self.headers, params=params) as resp:
            if resp.status != 200:
                text = await resp.text()
                self.logger.warning(f"Ahrefs {endpoint} returned {resp.status}: {text}")
                return {"error": f"HTTP {resp.status}", "detail": text}
            return await resp.json()

    # ----- Data collectors -----

    async def get_metrics_history(
        self,
        session: aiohttp.ClientSession,
        url: str,
        date_from: str,
        date_to: str,
    ) -> list[dict]:
        """Fetch historical metrics for ``url`` between the two dates.

        Returns the ``"metrics"`` (or, failing that, ``"data"``) entry of
        the response, or an empty list when the request failed.
        """
        data = await self._ahrefs_get(
            session,
            "site-explorer/metrics-history",
            {
                "target": url,
                "mode": "domain",
                "date_from": date_from,
                "date_to": date_to,
            },
        )
        if "error" in data:
            self.logger.warning(f"Metrics history error: {data}")
            return []
        return data.get("metrics", data.get("data", []))

    async def get_dr_history(
        self,
        session: aiohttp.ClientSession,
        url: str,
        date_from: str,
        date_to: str,
    ) -> list[dict]:
        """Fetch domain rating history; empty list when the request failed."""
        data = await self._ahrefs_get(
            session,
            "site-explorer/domain-rating-history",
            {"target": url, "date_from": date_from, "date_to": date_to},
        )
        if "error" in data:
            return []
        return data.get("domain_rating_history", data.get("data", []))

    async def get_current_metrics(
        self, session: aiohttp.ClientSession, url: str
    ) -> dict:
        """Fetch the current metrics snapshot; empty dict when the request failed."""
        data = await self._ahrefs_get(
            session,
            "site-explorer/metrics",
            {"target": url, "mode": "domain"},
        )
        if "error" in data:
            return {}
        return data.get("metrics", data)

    async def get_volume_history(
        self,
        session: aiohttp.ClientSession,
        url: str,
        date_from: str,
        date_to: str,
    ) -> list[dict]:
        """Fetch total search volume history; empty list when the request failed."""
        data = await self._ahrefs_get(
            session,
            "site-explorer/total-search-volume-history",
            {"target": url, "date_from": date_from, "date_to": date_to},
        )
        if "error" in data:
            return []
        return data.get("total_search_volume_history", data.get("data", []))

    # ----- Value helpers -----

    @staticmethod
    def _coerce_float(value: Any) -> float | None:
        """Return ``value`` as a float, or None when it is missing or non-numeric."""
        if value is None:
            return None
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    @classmethod
    def _raw_metric(cls, entry: Any, key: str) -> float | None:
        """Look up ``key`` at the top level of ``entry``, then under ``"organic"``.

        Returns None when ``entry`` is not a dict or the value is absent,
        null or not numeric.
        """
        if not isinstance(entry, dict):
            return None
        value = entry.get(key)
        if value is None:
            organic = entry.get("organic")
            if isinstance(organic, dict):
                value = organic.get(key)
        return cls._coerce_float(value)

    @classmethod
    def _metric_values(cls, entries: list[dict], key: str) -> list[float]:
        """Collect every usable value of ``key`` from a list of history entries."""
        found = (cls._raw_metric(entry, key) for entry in entries)
        return [value for value in found if value is not None]

    @staticmethod
    def _impact_for(magnitude_pct: float) -> str:
        """Bucket an absolute percentage change into high / medium / low."""
        if magnitude_pct >= 20:
            return "high"
        if magnitude_pct >= 10:
            return "medium"
        return "low"

    # ----- Analysis methods -----

    def calculate_period_comparison(
        self, current_data: list[dict], previous_data: list[dict], metric_key: str
    ) -> list[TrendData]:
        """Compare the average of one metric between two periods.

        Returns:
            A single-element list with the current average (rounded to two
            decimals), the percentage change versus the previous average
            (None when the previous average is zero or unavailable) and a
            direction that is "up"/"down" only beyond a +/-2% change.
        """
        current_vals = self._metric_values(current_data, metric_key)
        previous_vals = self._metric_values(previous_data, metric_key)
        current_avg = sum(current_vals) / len(current_vals) if current_vals else 0.0
        previous_avg = sum(previous_vals) / len(previous_vals) if previous_vals else 0.0

        change_pct = None
        direction = "stable"
        if previous_avg:
            change_pct = round(((current_avg - previous_avg) / abs(previous_avg)) * 100, 2)
            if change_pct > 2.0:
                direction = "up"
            elif change_pct < -2.0:
                direction = "down"

        return [
            TrendData(
                period=metric_key,
                value=round(current_avg, 2),
                change_pct=change_pct,
                direction=direction,
            )
        ]

    def identify_wins(
        self, current: dict, previous: dict
    ) -> list[WinConcern]:
        """Identify metrics that grew by at least 5% versus the previous period."""
        wins = []
        for key, label in self._METRIC_LABELS.items():
            curr_val = self._extract_metric(current, key)
            prev_val = self._extract_metric(previous, key)
            if not (prev_val > 0 and curr_val > prev_val):
                continue
            change_pct = ((curr_val - prev_val) / prev_val) * 100
            if not change_pct >= 5.0:
                continue
            wins.append(WinConcern(
                category=label,
                description=f"{label} increased by {change_pct:+.1f}% ({prev_val:,.0f} -> {curr_val:,.0f})",
                impact=self._impact_for(change_pct),
                action=f"Continue current {label.lower()} strategy",
            ))
        return wins

    def identify_concerns(
        self, current: dict, previous: dict
    ) -> list[WinConcern]:
        """Identify metrics that dropped by at least 5% versus the previous period."""
        concerns = []
        for key, label in self._METRIC_LABELS.items():
            curr_val = self._extract_metric(current, key)
            prev_val = self._extract_metric(previous, key)
            if not (prev_val > 0 and curr_val < prev_val):
                continue
            change_pct = ((curr_val - prev_val) / prev_val) * 100
            if not change_pct <= -5.0:
                continue
            # change_pct is negative here; report the magnitude so the text
            # does not read "decreased by -12.3%".
            drop_pct = abs(change_pct)
            concerns.append(WinConcern(
                category=label,
                description=f"{label} decreased by {drop_pct:.1f}% ({prev_val:,.0f} -> {curr_val:,.0f})",
                impact=self._impact_for(drop_pct),
                action=self._CONCERN_ACTIONS.get(label, f"Review {label.lower()} strategy"),
            ))
        return concerns

    def _extract_metric(self, data: dict, key: str) -> float:
        """Extract a metric from a (possibly nested) Ahrefs response.

        Looks at ``data[key]`` first and at ``data["organic"][key]`` second.
        Missing, null or non-numeric values yield 0.0 instead of raising.
        """
        value = self._raw_metric(data, key)
        return 0.0 if value is None else value

    def generate_executive_summary(
        self,
        wins: list[WinConcern],
        concerns: list[WinConcern],
        health_score: float,
        health_trend: str,
        traffic_value_change: float,
    ) -> dict[str, Any]:
        """Generate the high-level executive summary.

        The five highest-impact wins and concerns are listed, and an overall
        assessment is chosen from the health score (>= 75 strong, >= 50
        moderate, otherwise needs attention).
        """
        def top_five(items: list[WinConcern]) -> list[dict[str, str]]:
            ranked = sorted(items, key=lambda x: self._IMPACT_RANK.get(x.impact, 3))
            return [
                {"category": i.category, "description": i.description, "impact": i.impact}
                for i in ranked[:5]
            ]

        if health_score >= 75:
            assessment = "Strong performance - focus on maintaining momentum"
        elif health_score >= 50:
            assessment = "Moderate performance - targeted improvements needed"
        else:
            assessment = "Needs attention - prioritize fundamental improvements"

        return {
            "health_score": health_score,
            "health_trend": health_trend,
            "traffic_value_change_usd": round(traffic_value_change, 2),
            "total_wins": len(wins),
            "total_concerns": len(concerns),
            "top_wins": top_five(wins),
            "top_concerns": top_five(concerns),
            "overall_assessment": assessment,
        }

    def generate_tactical_breakdown(
        self, current: dict, wins: list[WinConcern], concerns: list[WinConcern]
    ) -> dict[str, Any]:
        """Generate actionable next steps per dimension.

        Returns a dict keyed by traffic / rankings / links / content /
        technical, each holding a ``status`` and a list of ``actions``.
        Concerns are applied after wins, so a decline overrides an
        improvement in the same dimension.
        """
        breakdown: dict[str, Any] = {
            dim: {"status": "needs_review", "actions": []}
            for dim in ("traffic", "rankings", "links", "content", "technical")
        }

        traffic = self._extract_metric(current, "traffic")
        keywords = self._extract_metric(current, "keywords")
        refdomains = self._extract_metric(current, "refdomains")

        # Traffic actions
        if traffic > 0:
            breakdown["traffic"]["status"] = "active"
            breakdown["traffic"]["actions"].append("Monitor top landing pages for traffic drops")
            breakdown["traffic"]["actions"].append("Identify new keyword opportunities in adjacent topics")
        else:
            breakdown["traffic"]["actions"].append("Establish organic traffic baseline with content strategy")

        # Rankings actions
        if keywords > 0:
            breakdown["rankings"]["status"] = "active"
            breakdown["rankings"]["actions"].append(
                f"Optimize pages for {int(keywords)} tracked keywords"
            )
            breakdown["rankings"]["actions"].append("Target featured snippets for top-performing queries")
        else:
            breakdown["rankings"]["actions"].append("Begin keyword research and content mapping")

        # Links actions
        if refdomains > 0:
            breakdown["links"]["status"] = "active"
            breakdown["links"]["actions"].append("Analyze top referring domains for partnership opportunities")
            breakdown["links"]["actions"].append("Monitor for lost backlinks and reclaim valuable links")
        else:
            breakdown["links"]["actions"].append("Develop link acquisition strategy with digital PR")

        # Content actions
        breakdown["content"]["actions"].append("Audit content freshness and update older pages")
        breakdown["content"]["actions"].append("Identify content gaps using competitor analysis")

        # Technical actions
        breakdown["technical"]["actions"].append("Run technical SEO audit for crawl issues")
        breakdown["technical"]["actions"].append("Verify Core Web Vitals pass thresholds")

        # Enrich with win/concern context
        for w in wins:
            cat_lower = w.category.lower()
            if "traffic" in cat_lower:
                breakdown["traffic"]["status"] = "improving"
            if "keyword" in cat_lower:
                breakdown["rankings"]["status"] = "improving"
            if "domain" in cat_lower or "link" in cat_lower:
                breakdown["links"]["status"] = "improving"

        for c in concerns:
            cat_lower = c.category.lower()
            if "traffic" in cat_lower:
                breakdown["traffic"]["status"] = "declining"
                breakdown["traffic"]["actions"].insert(0, c.action)
            if "keyword" in cat_lower:
                breakdown["rankings"]["status"] = "declining"
                breakdown["rankings"]["actions"].insert(0, c.action)
            # Mirror the wins branch: a drop in referring domains must show
            # up in the links dimension too.
            if "domain" in cat_lower or "link" in cat_lower:
                breakdown["links"]["status"] = "declining"
                breakdown["links"]["actions"].insert(0, c.action)

        return breakdown

    def compare_targets(
        self, current: dict, targets: dict
    ) -> list[TargetProgress]:
        """Compare current metrics against saved targets.

        Args:
            current: Current metrics snapshot.
            targets: Mapping of KPI key to target value. A dotted key such
                as ``"a.b"`` is resolved by its last segment first and by
                the full key second.

        Raises:
            TypeError, ValueError: if a target value is not numeric.
        """
        progress_list = []
        for key, target_val in targets.items():
            parts = key.split(".")
            actual = self._extract_metric(current, parts[-1])
            if actual == 0.0 and len(parts) > 1:
                # Try alternate key resolution; nested dicts, nulls and
                # non-numeric values count as "no data".
                alternate = self._coerce_float(current.get(key))
                actual = 0.0 if alternate is None else alternate
            tp = TargetProgress(
                kpi_name=key,
                target=float(target_val),
                actual=float(actual),
            )
            tp.compute_progress()
            progress_list.append(tp)
        return progress_list

    # ----- Main orchestration -----

    async def report(
        self,
        url: str,
        period: str = "monthly",
        date_from: str | None = None,
        date_to: str | None = None,
        executive_only: bool = False,
        targets_path: str | None = None,
    ) -> PerformanceReport:
        """Orchestrate full performance report generation.

        Args:
            url: Target URL or domain.
            period: Named period used when no explicit dates are given.
            date_from: Optional explicit start date (``YYYY-MM-DD``).
            date_to: Optional explicit end date (``YYYY-MM-DD``).
            executive_only: Skip the tactical breakdown when True.
            targets_path: Optional path to a JSON file with KPI targets.

        Returns:
            The populated report. Fetch failures and target-file problems
            are collected in ``report.errors`` rather than raised.
        """
        report = PerformanceReport(
            url=url,
            period=period,
            timestamp=datetime.now().isoformat(),
        )

        # Determine date ranges
        report.date_from, report.date_to = get_date_range(period, date_from, date_to)
        prev_from, prev_to = get_previous_range(report.date_from, report.date_to)

        async with aiohttp.ClientSession() as session:
            # Fetch current and previous period data concurrently.
            # NOTE(review): the search-volume history (index 4) is not used
            # below; only its failure would surface in report.errors.
            results = await asyncio.gather(
                self.get_metrics_history(session, url, report.date_from, report.date_to),
                self.get_metrics_history(session, url, prev_from, prev_to),
                self.get_current_metrics(session, url),
                self.get_dr_history(session, url, report.date_from, report.date_to),
                self.get_volume_history(session, url, report.date_from, report.date_to),
                return_exceptions=True,
            )

        for i, r in enumerate(results):
            if isinstance(r, BaseException):
                report.errors.append(f"Data fetch error [{i}]: {r}")

        # Anything that is not of the expected container type (including a
        # returned exception) is treated as "no data".
        current_history = results[0] if isinstance(results[0], list) else []
        previous_history = results[1] if isinstance(results[1], list) else []
        current_snapshot = results[2] if isinstance(results[2], dict) else {}
        dr_history = results[3] if isinstance(results[3], list) else []

        # Calculate trends for key metrics
        if current_history or previous_history:
            for metric_key in ("traffic", "keywords", "cost", "refdomains"):
                trend = self.calculate_period_comparison(
                    current_history, previous_history, metric_key
                )
                report.trends[metric_key] = [asdict(t) for t in trend]

        # Build previous snapshot for comparison: the average of each metric
        # over the previous period, for metrics that have any data.
        previous_snapshot: dict[str, float] = {}
        for key in ("traffic", "cost", "keywords", "refdomains"):
            values = self._metric_values(previous_history, key)
            if values:
                previous_snapshot[key] = sum(values) / len(values)

        # Identify wins and concerns
        report.wins = self.identify_wins(current_snapshot, previous_snapshot)
        report.concerns = self.identify_concerns(current_snapshot, previous_snapshot)

        # Calculate health score (simple heuristic): mean of the available
        # components, each capped at 100.
        traffic = self._extract_metric(current_snapshot, "traffic")
        keywords = self._extract_metric(current_snapshot, "keywords")
        score_components = []
        if traffic > 0:
            score_components.append(min(100, traffic / 100))
        if keywords > 0:
            score_components.append(min(100, keywords / 50))
        if dr_history:
            latest_dr = dr_history[-1]
            dr_val = None
            if isinstance(latest_dr, dict):
                dr_val = latest_dr.get("domain_rating", latest_dr.get("domainRating"))
            # A missing or null rating counts as 0 rather than crashing.
            score_components.append(self._coerce_float(dr_val) or 0.0)
        report.health_score = round(
            sum(score_components) / max(len(score_components), 1), 1
        )

        # Health trend
        win_count = len(report.wins)
        concern_count = len(report.concerns)
        if win_count > concern_count:
            report.health_trend = "improving"
        elif concern_count > win_count:
            report.health_trend = "declining"
        else:
            report.health_trend = "stable"

        # Traffic value change.
        # NOTE(review): the division by 100 presumably converts cents to
        # USD (the summary key is "traffic_value_change_usd") - confirm
        # against the Ahrefs API.
        curr_cost = self._extract_metric(current_snapshot, "cost")
        prev_cost = previous_snapshot.get("cost", 0)
        report.traffic_value_change = round((curr_cost - prev_cost) / 100.0, 2)

        # Executive summary
        report.executive_summary = self.generate_executive_summary(
            report.wins, report.concerns,
            report.health_score, report.health_trend,
            report.traffic_value_change,
        )

        if not executive_only:
            # Tactical breakdown
            report.tactical_breakdown = self.generate_tactical_breakdown(
                current_snapshot, report.wins, report.concerns,
            )

        # Target comparison
        if targets_path:
            try:
                targets_data = json.loads(Path(targets_path).read_text())
                # Use 30-day targets by default
                target_set = targets_data.get("30_day", targets_data)
                report.target_progress = self.compare_targets(
                    current_snapshot, target_set,
                )
            except Exception as exc:
                # Best effort: a bad targets file must not lose the report.
                report.errors.append(f"Targets load error: {exc}")

        return report
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Output formatting
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def format_text_report(report: PerformanceReport) -> str:
    """Format a performance report as human-readable text.

    Sections are emitted in this order: header, executive summary (with top
    wins and concerns), trends, tactical breakdown, target progress and
    errors. The last four are skipped when empty.

    Args:
        report: The report to render.

    Returns:
        The rendered report as one newline-joined string.
    """
    rule = "=" * 60
    sub_rule = "-" * 40
    impact_markers = {"high": "!!!", "medium": "!!", "low": "!"}

    lines = [
        rule,
        f"SEO Performance Report: {report.url}",
        f"Period: {report.period} ({report.date_from} to {report.date_to})",
        f"Generated: {report.timestamp}",
        rule,
    ]

    # Executive Summary
    es = report.executive_summary
    trend_arrow = {"improving": "^", "declining": "v", "stable": "="}.get(
        es.get("health_trend", "stable"), "="
    )
    lines += [
        "\nEXECUTIVE SUMMARY",
        sub_rule,
        f" Health Score: {es.get('health_score', 0)}/100",
        f" Trend: {trend_arrow} {es.get('health_trend', 'stable')}",
        f" Traffic Value Change: ${es.get('traffic_value_change_usd', 0):,.2f}",
        f" Assessment: {es.get('overall_assessment', 'N/A')}",
    ]

    # Wins
    lines.append(f"\n Top Wins ({es.get('total_wins', 0)} total):")
    for w in es.get("top_wins", []):
        impact_marker = impact_markers.get(w.get("impact", "low"), "!")
        lines.append(f" {impact_marker} [{w.get('category', '')}] {w.get('description', '')}")

    # Concerns
    lines.append(f"\n Top Concerns ({es.get('total_concerns', 0)} total):")
    for c in es.get("top_concerns", []):
        impact_marker = impact_markers.get(c.get("impact", "low"), "!")
        lines.append(f" {impact_marker} [{c.get('category', '')}] {c.get('description', '')}")

    # Trends
    if report.trends:
        lines.append("\nTRENDS")
        lines.append(sub_rule)
        for metric_name, trend_list in report.trends.items():
            for t in trend_list:
                # Only dict-shaped trend points are rendered.
                if not isinstance(t, dict):
                    continue
                dir_arrow = {"up": "^", "down": "v", "stable": "="}.get(
                    t.get("direction", "stable"), "="
                )
                change_str = f" ({t.get('change_pct', 0):+.1f}%)" if t.get("change_pct") is not None else ""
                lines.append(f" {dir_arrow} {metric_name}: {t.get('value', 0):,.2f}{change_str}")

    # Tactical Breakdown
    if report.tactical_breakdown:
        lines.append("\nTACTICAL BREAKDOWN")
        lines.append(sub_rule)
        status_markers = {
            "improving": "^", "declining": "v", "active": "=", "needs_review": "?"
        }
        for dim_name, dim_data in report.tactical_breakdown.items():
            status = dim_data.get("status", "unknown")
            status_marker = status_markers.get(status, "?")
            lines.append(f"\n [{dim_name.upper()}] Status: {status_marker} {status}")
            # At most three actions are shown per dimension.
            for action in dim_data.get("actions", [])[:3]:
                lines.append(f" > {action}")

    # Target Progress
    if report.target_progress:
        lines.append("\nTARGET PROGRESS")
        lines.append(sub_rule)
        for tp in report.target_progress:
            if not isinstance(tp, TargetProgress):
                continue
            # Clamp to 0..100 so that negative progress cannot stretch the
            # 20-character bar.
            bar_filled = int(max(0.0, min(tp.progress_pct, 100)) / 5)
            bar = "#" * bar_filled + "-" * (20 - bar_filled)
            lines.append(
                f" {tp.kpi_name}: [{bar}] {tp.progress_pct:.0f}% "
                f"(actual: {tp.actual:,.0f} / target: {tp.target:,.0f})"
            )

    # Errors
    if report.errors:
        lines.append("\nERRORS")
        lines.append(sub_rule)
        for err in report.errors:
            lines.append(f" ! {err}")

    lines.append("\n" + rule)
    return "\n".join(lines)
|
|
|
|
|
|
def serialize_report(report: PerformanceReport) -> dict:
    """Turn a PerformanceReport into a plain dict suitable for ``json.dumps``.

    Dataclass items (wins, concerns, target progress) are converted with
    ``asdict``; every other field is passed through unchanged.
    """
    def dump_all(items):
        return [asdict(item) for item in items]

    payload = {}
    payload["url"] = report.url
    payload["period"] = report.period
    payload["date_from"] = report.date_from
    payload["date_to"] = report.date_to
    payload["health_score"] = report.health_score
    payload["health_trend"] = report.health_trend
    payload["trends"] = report.trends
    payload["wins"] = dump_all(report.wins)
    payload["concerns"] = dump_all(report.concerns)
    payload["executive_summary"] = report.executive_summary
    payload["tactical_breakdown"] = report.tactical_breakdown
    payload["target_progress"] = dump_all(report.target_progress)
    payload["traffic_value_change"] = report.traffic_value_change
    payload["timestamp"] = report.timestamp
    payload["errors"] = report.errors
    return payload
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI entry point
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse command-line arguments.

    ``--from`` and ``--to`` are validated as ``YYYY-MM-DD`` here, so a
    malformed date is rejected with a normal argparse usage error instead
    of failing later during date arithmetic. The values are still returned
    as the strings the user typed.
    """
    def iso_date(text: str) -> str:
        # Validate only; downstream code expects the string form.
        try:
            datetime.strptime(text, "%Y-%m-%d")
        except ValueError:
            raise argparse.ArgumentTypeError(
                f"invalid date {text!r}; expected YYYY-MM-DD"
            ) from None
        return text

    parser = argparse.ArgumentParser(
        description="SEO Performance Reporter - Period-over-period analysis"
    )
    parser.add_argument(
        "--url", required=True, help="Target URL or domain"
    )
    parser.add_argument(
        "--period", choices=["monthly", "quarterly", "yearly", "custom"],
        default="monthly", help="Report period (default: monthly)"
    )
    parser.add_argument(
        "--from", dest="date_from", type=iso_date, default=None,
        help="Start date (YYYY-MM-DD) for custom period"
    )
    parser.add_argument(
        "--to", dest="date_to", type=iso_date, default=None,
        help="End date (YYYY-MM-DD) for custom period"
    )
    parser.add_argument(
        "--executive", action="store_true",
        help="Generate executive summary only"
    )
    parser.add_argument(
        "--targets", type=str, default=None,
        help="Path to targets JSON file for progress comparison"
    )
    parser.add_argument(
        "--json", action="store_true",
        help="Output results as JSON"
    )
    parser.add_argument(
        "--output", type=str, default=None,
        help="Save output to file path"
    )
    return parser.parse_args()
|
|
|
|
|
|
async def main() -> None:
    """Run the reporter from the command line.

    Parses the arguments, builds the report, renders it as JSON or text,
    writes it to the requested file (or prints it) and finally prints the
    client's request statistics.
    """
    args = parse_args()
    reporter = PerformanceReporter()

    report_kwargs = {
        "url": args.url,
        "period": args.period,
        "date_from": args.date_from,
        "date_to": args.date_to,
        "executive_only": args.executive,
        "targets_path": args.targets,
    }
    result = await reporter.report(**report_kwargs)

    if not args.json:
        output = format_text_report(result)
    else:
        output = json.dumps(serialize_report(result), indent=2, ensure_ascii=False)

    if not args.output:
        print(output)
    else:
        Path(args.output).write_text(output, encoding="utf-8")
        logger.info(f"Output saved to {args.output}")

    reporter.print_stats()
|
|
|
|
|
|
# Script entry point: run the async CLI when executed directly.
if __name__ == "__main__":
    asyncio.run(main())
|