Files
Andrew Yim a3ff965b87 Add SEO skills 19-28, 31-32 with full Python implementations
12 new skills: Keyword Strategy, SERP Analysis, Position Tracking,
Link Building, Content Strategy, E-Commerce SEO, KPI Framework,
International SEO, AI Visibility, Knowledge Graph, Competitor Intel,
and Crawl Budget. ~20K lines of Python across 25 domain scripts.
Updated skill 11 pipeline table and repo CLAUDE.md.
Enhanced skill 18 local SEO workflow from jamie.clinic audit.

Note: Skill 26 hreflang_validator.py pending (content filter block).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-13 12:05:59 +09:00

802 lines
30 KiB
Python

"""
Performance Reporter - Period-over-period SEO performance reports
================================================================
Purpose: Generate executive summaries, trend analysis, tactical breakdowns,
and target-vs-actual comparison from Ahrefs historical data.
Python: 3.10+
"""
import argparse
import asyncio
import json
import logging
import sys
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
import aiohttp
from base_client import BaseAsyncClient, config
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
@dataclass
class TrendData:
"""Single trend data point for a metric."""
period: str
value: float
change_pct: float | None = None
direction: str = "stable" # up, down, stable
@dataclass
class WinConcern:
    """A noteworthy positive (win) or negative (concern) performance finding."""

    # Area the finding belongs to, e.g. a metric label.
    category: str
    # Human-readable explanation of what changed.
    description: str
    # Severity bucket: "high", "medium" or "low".
    impact: str = "medium"
    # Suggested follow-up; empty when there is no recommendation.
    action: str = ""
@dataclass
class TargetProgress:
    """Progress of a single KPI measured against its target value."""

    # Name of the KPI being tracked.
    kpi_name: str
    # Value the KPI is expected to reach.
    target: float
    # Value the KPI currently has.
    actual: float
    # ``actual`` as a percentage of ``target``; filled by compute_progress().
    progress_pct: float = 0.0

    def compute_progress(self) -> None:
        """Store ``actual / target`` as a percentage rounded to one decimal.

        A zero (or otherwise falsy) target yields 0.0 rather than dividing
        by zero.
        """
        if not self.target:
            self.progress_pct = 0.0
            return
        self.progress_pct = round((self.actual / self.target) * 100, 1)
@dataclass
class PerformanceReport:
    """Complete performance report produced by ``PerformanceReporter.report``.

    All fields have defaults so the report can be created empty and filled
    in step by step.
    """

    # Target URL or domain the report covers.
    url: str = ""
    # Reporting period label (e.g. "monthly").
    period: str = "monthly"
    # Inclusive bounds of the reporting window, formatted YYYY-MM-DD.
    date_from: str = ""
    date_to: str = ""
    # Heuristic overall score and its direction.
    health_score: float = 0.0
    health_trend: str = "stable"
    # Metric name -> list of TrendData records *serialised to plain dicts*
    # (the reporter stores ``asdict(TrendData)`` and the text formatter only
    # renders dict entries), hence the dict element type.
    trends: dict[str, list[dict[str, Any]]] = field(default_factory=dict)
    wins: list[WinConcern] = field(default_factory=list)
    concerns: list[WinConcern] = field(default_factory=list)
    executive_summary: dict[str, Any] = field(default_factory=dict)
    tactical_breakdown: dict[str, Any] = field(default_factory=dict)
    target_progress: list[TargetProgress] = field(default_factory=list)
    # Change in traffic value between the compared periods.
    traffic_value_change: float = 0.0
    # Generation time of the report.
    timestamp: str = ""
    # Non-fatal problems encountered while building the report.
    errors: list[str] = field(default_factory=list)
# ---------------------------------------------------------------------------
# Period helpers
# ---------------------------------------------------------------------------
# Length, in days, of the look-back window for each named reporting period.
PERIOD_DAYS: dict[str, int] = {
    "monthly": 30,
    "quarterly": 90,
    "yearly": 365,
}
def get_date_range(
period: str, date_from: str | None = None, date_to: str | None = None
) -> tuple[str, str]:
"""Compute date range from period or explicit dates."""
if date_from and date_to:
return date_from, date_to
end = datetime.now()
days = PERIOD_DAYS.get(period, 30)
start = end - timedelta(days=days)
return start.strftime("%Y-%m-%d"), end.strftime("%Y-%m-%d")
def get_previous_range(
    date_from: str, date_to: str
) -> tuple[str, str]:
    """Return the window of equal length that ends the day before ``date_from``.

    Both arguments and both returned values use the YYYY-MM-DD format.
    """
    fmt = "%Y-%m-%d"
    current_start = datetime.strptime(date_from, fmt)
    span = datetime.strptime(date_to, fmt) - current_start
    # The previous window stops one day before the current one begins and
    # covers the same span.
    previous_end = current_start - timedelta(days=1)
    previous_start = previous_end - span
    return previous_start.strftime(fmt), previous_end.strftime(fmt)
# ---------------------------------------------------------------------------
# Performance Reporter
# ---------------------------------------------------------------------------
class PerformanceReporter(BaseAsyncClient):
    """Generate period-over-period SEO performance reports from Ahrefs.

    The reporter fetches history and snapshot metrics, compares the current
    window with the preceding window of equal length, and derives wins,
    concerns, a heuristic health score and tactical recommendations.

    NOTE(review): the metric keys used here ("traffic", "cost", "keywords",
    "refdomains", optionally nested under "organic") are carried over from
    the existing implementation; confirm them against the Ahrefs API v3
    response schema.
    """

    AHREFS_BASE = "https://api.ahrefs.com/v3"

    # Metric key -> label used in win/concern categories and descriptions.
    _METRIC_LABELS = {
        "traffic": "Organic Traffic",
        "cost": "Traffic Value",
        "keywords": "Keyword Count",
        "refdomains": "Referring Domains",
    }
    # Recommended follow-up per concern category.
    _CONCERN_ACTIONS = {
        "Organic Traffic": "Investigate traffic sources and algorithm updates",
        "Traffic Value": "Review keyword targeting and content quality",
        "Keyword Count": "Expand content coverage and optimize existing pages",
        "Referring Domains": "Strengthen link building outreach campaigns",
    }
    # Metrics for which period-over-period trends are computed (output order).
    _TREND_METRICS = ("traffic", "keywords", "cost", "refdomains")
    # Sort rank for impact levels; unknown levels sort last.
    _IMPACT_RANK = {"high": 0, "medium": 1, "low": 2}

    def __init__(self, api_token: str | None = None):
        """Initialise the base client and build the Ahrefs auth headers.

        Args:
            api_token: Ahrefs API token. When omitted it is read from the
                required ``AHREFS_API_TOKEN`` configuration entry.
        """
        super().__init__(max_concurrent=3, requests_per_second=2.0)
        self.api_token = api_token or config.get_required("AHREFS_API_TOKEN")
        self.headers = {
            "Authorization": f"Bearer {self.api_token}",
            "Accept": "application/json",
        }

    async def _ahrefs_get(
        self, session: aiohttp.ClientSession, endpoint: str, params: dict
    ) -> dict:
        """Make an authenticated GET request to the Ahrefs API.

        Returns:
            The decoded JSON body on HTTP 200; otherwise a dict with
            ``"error"`` (status) and ``"detail"`` (response text) keys.
        """
        url = f"{self.AHREFS_BASE}/{endpoint}"
        async with session.get(url, headers=self.headers, params=params) as resp:
            if resp.status != 200:
                text = await resp.text()
                self.logger.warning(f"Ahrefs {endpoint} returned {resp.status}: {text}")
                return {"error": f"HTTP {resp.status}", "detail": text}
            return await resp.json()

    # ----- Data collectors -----

    async def get_metrics_history(
        self,
        session: aiohttp.ClientSession,
        url: str,
        date_from: str,
        date_to: str,
    ) -> list[dict]:
        """Fetch historical metrics for ``url`` between the two dates.

        Returns an empty list (and logs a warning) when the API reports an
        error.
        """
        data = await self._ahrefs_get(
            session,
            "site-explorer/metrics-history",
            {
                "target": url,
                "mode": "domain",
                "date_from": date_from,
                "date_to": date_to,
            },
        )
        if "error" in data:
            self.logger.warning(f"Metrics history error: {data}")
            return []
        return data.get("metrics", data.get("data", []))

    async def get_dr_history(
        self,
        session: aiohttp.ClientSession,
        url: str,
        date_from: str,
        date_to: str,
    ) -> list[dict]:
        """Fetch domain rating history; empty list when the API errors."""
        data = await self._ahrefs_get(
            session,
            "site-explorer/domain-rating-history",
            {
                "target": url,
                "date_from": date_from,
                "date_to": date_to,
            },
        )
        if "error" in data:
            return []
        return data.get("domain_rating_history", data.get("data", []))

    async def get_current_metrics(
        self, session: aiohttp.ClientSession, url: str
    ) -> dict:
        """Fetch the current snapshot metrics; empty dict when the API errors."""
        data = await self._ahrefs_get(
            session,
            "site-explorer/metrics",
            {"target": url, "mode": "domain"},
        )
        if "error" in data:
            return {}
        return data.get("metrics", data)

    async def get_volume_history(
        self,
        session: aiohttp.ClientSession,
        url: str,
        date_from: str,
        date_to: str,
    ) -> list[dict]:
        """Fetch total search volume history; empty list when the API errors."""
        data = await self._ahrefs_get(
            session,
            "site-explorer/total-search-volume-history",
            {
                "target": url,
                "date_from": date_from,
                "date_to": date_to,
            },
        )
        if "error" in data:
            return []
        return data.get("total_search_volume_history", data.get("data", []))

    # ----- Metric extraction helpers -----

    @staticmethod
    def _to_float(value: Any) -> float | None:
        """Convert ``value`` to float, or return None if it is not numeric."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    def _lookup_metric(self, data: Any, key: str) -> float | None:
        """Return ``key`` from ``data`` or its "organic" sub-dict, else None.

        The top-level value wins; the nested "organic" dict is consulted only
        when the top-level value is missing or not numeric.
        """
        if not isinstance(data, dict):
            return None
        value = self._to_float(data.get(key))
        if value is None:
            organic = data.get("organic")
            if isinstance(organic, dict):
                value = self._to_float(organic.get(key))
        return value

    def _collect_metric(self, entries: Any, key: str) -> list[float]:
        """Gather every available value of ``key`` from a list of entries."""
        if not isinstance(entries, list):
            return []
        found = (self._lookup_metric(entry, key) for entry in entries)
        return [value for value in found if value is not None]

    def _extract_metric(self, data: dict, key: str) -> float:
        """Extract a metric value from a (possibly nested) Ahrefs response.

        Returns 0.0 when the metric is missing, null or not numeric instead
        of raising.
        """
        value = self._lookup_metric(data, key)
        return 0.0 if value is None else value

    @staticmethod
    def _impact_level(magnitude_pct: float) -> str:
        """Bucket an absolute percentage change into high/medium/low."""
        if magnitude_pct >= 20:
            return "high"
        if magnitude_pct >= 10:
            return "medium"
        return "low"

    # ----- Analysis methods -----

    def calculate_period_comparison(
        self, current_data: list[dict], previous_data: list[dict], metric_key: str
    ) -> list[TrendData]:
        """Compare a metric's average between the current and previous period.

        Returns:
            A single-element list holding the current average, the percentage
            change versus the previous average, and a direction ("up"/"down"
            beyond +/-2%, otherwise "stable"). The change is None when either
            period has no data for the metric or the previous average is
            zero, so missing data is not reported as a -100% drop.
        """
        current_vals = self._collect_metric(current_data, metric_key)
        previous_vals = self._collect_metric(previous_data, metric_key)
        current_avg = sum(current_vals) / len(current_vals) if current_vals else 0.0
        previous_avg = sum(previous_vals) / len(previous_vals) if previous_vals else 0.0

        change_pct = None
        direction = "stable"
        if current_vals and previous_avg:
            change_pct = round(((current_avg - previous_avg) / abs(previous_avg)) * 100, 2)
            if change_pct > 2.0:
                direction = "up"
            elif change_pct < -2.0:
                direction = "down"

        return [
            TrendData(
                period=metric_key,
                value=round(current_avg, 2),
                change_pct=change_pct,
                direction=direction,
            )
        ]

    def identify_wins(
        self, current: dict, previous: dict
    ) -> list[WinConcern]:
        """Identify metrics that grew by at least 5% versus the previous period.

        Metrics absent from either side are skipped.
        """
        wins = []
        for key, label in self._METRIC_LABELS.items():
            curr_val = self._lookup_metric(current, key)
            prev_val = self._lookup_metric(previous, key)
            if curr_val is None or prev_val is None:
                continue
            if not (prev_val > 0 and curr_val > prev_val):
                continue
            change_pct = ((curr_val - prev_val) / prev_val) * 100
            if change_pct >= 5.0:
                wins.append(WinConcern(
                    category=label,
                    description=f"{label} increased by {change_pct:+.1f}% ({prev_val:,.0f} -> {curr_val:,.0f})",
                    impact=self._impact_level(change_pct),
                    action=f"Continue current {label.lower()} strategy",
                ))
        return wins

    def identify_concerns(
        self, current: dict, previous: dict
    ) -> list[WinConcern]:
        """Identify metrics that fell by at least 5% versus the previous period.

        Metrics absent from either side are skipped, so a metric the current
        snapshot simply does not contain is not flagged as a 100% loss.
        """
        concerns = []
        for key, label in self._METRIC_LABELS.items():
            curr_val = self._lookup_metric(current, key)
            prev_val = self._lookup_metric(previous, key)
            if curr_val is None or prev_val is None:
                continue
            if not (prev_val > 0 and curr_val < prev_val):
                continue
            change_pct = ((curr_val - prev_val) / prev_val) * 100
            if change_pct <= -5.0:
                drop_pct = abs(change_pct)
                concerns.append(WinConcern(
                    category=label,
                    # The magnitude is shown unsigned: "decreased by" already
                    # conveys the direction.
                    description=f"{label} decreased by {drop_pct:.1f}% ({prev_val:,.0f} -> {curr_val:,.0f})",
                    impact=self._impact_level(drop_pct),
                    action=self._CONCERN_ACTIONS.get(label, f"Review {label.lower()} strategy"),
                ))
        return concerns

    def generate_executive_summary(
        self,
        wins: list[WinConcern],
        concerns: list[WinConcern],
        health_score: float,
        health_trend: str,
        traffic_value_change: float,
    ) -> dict[str, Any]:
        """Generate the high-level executive summary.

        The summary lists up to five wins and five concerns, highest impact
        first, and an overall assessment derived from the health score
        (>= 75 strong, >= 50 moderate, otherwise needs attention).
        """
        rank = self._IMPACT_RANK

        def top_five(items: list[WinConcern]) -> list[dict[str, str]]:
            ordered = sorted(items, key=lambda item: rank.get(item.impact, 3))
            return [
                {"category": item.category, "description": item.description, "impact": item.impact}
                for item in ordered[:5]
            ]

        if health_score >= 75:
            assessment = "Strong performance - focus on maintaining momentum"
        elif health_score >= 50:
            assessment = "Moderate performance - targeted improvements needed"
        else:
            assessment = "Needs attention - prioritize fundamental improvements"

        return {
            "health_score": health_score,
            "health_trend": health_trend,
            "traffic_value_change_usd": round(traffic_value_change, 2),
            "total_wins": len(wins),
            "total_concerns": len(concerns),
            "top_wins": top_five(wins),
            "top_concerns": top_five(concerns),
            "overall_assessment": assessment,
        }

    @staticmethod
    def _dimensions_for(category: str) -> list[str]:
        """Map a win/concern category to the breakdown dimensions it affects."""
        cat_lower = category.lower()
        dimensions = []
        if "traffic" in cat_lower:
            dimensions.append("traffic")
        if "keyword" in cat_lower:
            dimensions.append("rankings")
        if "domain" in cat_lower or "link" in cat_lower:
            dimensions.append("links")
        return dimensions

    def generate_tactical_breakdown(
        self, current: dict, wins: list[WinConcern], concerns: list[WinConcern]
    ) -> dict[str, Any]:
        """Generate actionable next steps per dimension.

        Each dimension (traffic, rankings, links, content, technical) gets a
        status and a list of actions. Wins mark a dimension "improving";
        concerns are applied afterwards, mark it "declining" and put their
        recommended action first. Link concerns are handled the same way as
        traffic and keyword concerns.
        """
        breakdown: dict[str, Any] = {
            name: {"status": "needs_review", "actions": []}
            for name in ("traffic", "rankings", "links", "content", "technical")
        }
        traffic = self._extract_metric(current, "traffic")
        keywords = self._extract_metric(current, "keywords")
        refdomains = self._extract_metric(current, "refdomains")

        # Traffic actions
        traffic_actions = breakdown["traffic"]["actions"]
        if traffic > 0:
            breakdown["traffic"]["status"] = "active"
            traffic_actions.append("Monitor top landing pages for traffic drops")
            traffic_actions.append("Identify new keyword opportunities in adjacent topics")
        else:
            traffic_actions.append("Establish organic traffic baseline with content strategy")

        # Rankings actions
        ranking_actions = breakdown["rankings"]["actions"]
        if keywords > 0:
            breakdown["rankings"]["status"] = "active"
            ranking_actions.append(
                f"Optimize pages for {int(keywords)} tracked keywords"
            )
            ranking_actions.append("Target featured snippets for top-performing queries")
        else:
            ranking_actions.append("Begin keyword research and content mapping")

        # Links actions
        link_actions = breakdown["links"]["actions"]
        if refdomains > 0:
            breakdown["links"]["status"] = "active"
            link_actions.append("Analyze top referring domains for partnership opportunities")
            link_actions.append("Monitor for lost backlinks and reclaim valuable links")
        else:
            link_actions.append("Develop link acquisition strategy with digital PR")

        # Content actions
        breakdown["content"]["actions"].append("Audit content freshness and update older pages")
        breakdown["content"]["actions"].append("Identify content gaps using competitor analysis")

        # Technical actions
        breakdown["technical"]["actions"].append("Run technical SEO audit for crawl issues")
        breakdown["technical"]["actions"].append("Verify Core Web Vitals pass thresholds")

        # Enrich with win/concern context; concerns run last so they take
        # precedence over wins on the same dimension.
        for win in wins:
            for dimension in self._dimensions_for(win.category):
                breakdown[dimension]["status"] = "improving"
        for concern in concerns:
            for dimension in self._dimensions_for(concern.category):
                breakdown[dimension]["status"] = "declining"
                if concern.action:
                    breakdown[dimension]["actions"].insert(0, concern.action)
        return breakdown

    def compare_targets(
        self, current: dict, targets: dict
    ) -> list[TargetProgress]:
        """Compare current metrics against saved targets.

        For a dotted target key the last segment is used as the metric name,
        falling back to the literal key in ``current`` when that yields 0.
        Targets whose value is not numeric are skipped with a warning rather
        than aborting the whole comparison.
        """
        progress_list = []
        for key, target_val in targets.items():
            target = self._to_float(target_val)
            if target is None:
                logger.warning("Skipping non-numeric target %r: %r", key, target_val)
                continue
            parts = str(key).split(".")
            actual = self._extract_metric(current, parts[-1])
            if actual == 0.0 and len(parts) > 1:
                # Try alternate key resolution using the full dotted key.
                actual = self._to_float(current.get(key)) or 0.0
            tp = TargetProgress(kpi_name=key, target=target, actual=actual)
            tp.compute_progress()
            progress_list.append(tp)
        return progress_list

    @staticmethod
    def _load_targets(targets_path: str) -> dict:
        """Read the targets JSON file and return the target set to compare.

        The "30_day" entry is used when present, otherwise the whole document.

        Raises:
            OSError: If the file cannot be read.
            ValueError: If the content is not valid JSON or not a JSON object.
        """
        targets_data = json.loads(Path(targets_path).read_text(encoding="utf-8"))
        if not isinstance(targets_data, dict):
            raise ValueError("targets file must contain a JSON object")
        # Use 30-day targets by default
        target_set = targets_data.get("30_day", targets_data)
        if not isinstance(target_set, dict):
            raise ValueError('"30_day" targets must be a JSON object')
        return target_set

    # ----- Main orchestration -----

    async def report(
        self,
        url: str,
        period: str = "monthly",
        date_from: str | None = None,
        date_to: str | None = None,
        executive_only: bool = False,
        targets_path: str | None = None,
    ) -> PerformanceReport:
        """Orchestrate full performance report generation.

        Args:
            url: Target URL or domain.
            period: Named reporting period used when no explicit dates apply.
            date_from: Optional explicit start date (YYYY-MM-DD).
            date_to: Optional explicit end date (YYYY-MM-DD).
            executive_only: Skip the tactical breakdown and target comparison.
            targets_path: Optional path to a targets JSON file.

        Returns:
            The populated report. Fetch failures and target-loading problems
            are recorded in ``report.errors`` instead of being raised.
        """
        report = PerformanceReport(
            url=url,
            period=period,
            timestamp=datetime.now().isoformat(),
        )

        # Determine date ranges
        report.date_from, report.date_to = get_date_range(period, date_from, date_to)
        prev_from, prev_to = get_previous_range(report.date_from, report.date_to)

        async with aiohttp.ClientSession() as session:
            # Fetch current and previous period data concurrently. Search
            # volume history is not requested: nothing in the report uses it.
            tasks = [
                self.get_metrics_history(session, url, report.date_from, report.date_to),
                self.get_metrics_history(session, url, prev_from, prev_to),
                self.get_current_metrics(session, url),
                self.get_dr_history(session, url, report.date_from, report.date_to),
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)

        for i, r in enumerate(results):
            if isinstance(r, BaseException):
                report.errors.append(f"Data fetch error [{i}]: {r}")

        # Normalise every result to its expected container type so the
        # analysis below never has to re-check it.
        current_history = results[0] if isinstance(results[0], list) else []
        previous_history = results[1] if isinstance(results[1], list) else []
        current_snapshot = results[2] if isinstance(results[2], dict) else {}
        dr_history = results[3] if isinstance(results[3], list) else []

        # Calculate trends for key metrics
        if current_history or previous_history:
            for metric_key in self._TREND_METRICS:
                trend = self.calculate_period_comparison(
                    current_history, previous_history, metric_key
                )
                report.trends[metric_key] = [asdict(t) for t in trend]

        # Build previous snapshot for comparison: per-metric average over the
        # previous period, omitting metrics with no data.
        previous_snapshot: dict[str, float] = {}
        for key in self._METRIC_LABELS:
            values = self._collect_metric(previous_history, key)
            if values:
                previous_snapshot[key] = sum(values) / len(values)

        # Identify wins and concerns. Without a current snapshot there is
        # nothing to compare, and treating it as all-zero metrics would
        # report every metric as a 100% loss.
        if current_snapshot:
            report.wins = self.identify_wins(current_snapshot, previous_snapshot)
            report.concerns = self.identify_concerns(current_snapshot, previous_snapshot)
        else:
            report.errors.append(
                "Current metrics snapshot unavailable; win/concern analysis skipped"
            )

        # Calculate health score (simple heuristic)
        traffic = self._extract_metric(current_snapshot, "traffic")
        keywords = self._extract_metric(current_snapshot, "keywords")
        score_components = []
        if traffic > 0:
            score_components.append(min(100, traffic / 100))
        if keywords > 0:
            score_components.append(min(100, keywords / 50))
        if dr_history:
            # NOTE(review): assumes the history is ordered oldest-to-newest,
            # so the last entry is the latest rating - confirm.
            latest_dr = dr_history[-1]
            raw_dr = None
            if isinstance(latest_dr, dict):
                raw_dr = latest_dr.get("domain_rating", latest_dr.get("domainRating"))
            # A missing or non-numeric rating counts as 0.
            score_components.append(self._to_float(raw_dr) or 0.0)
        report.health_score = round(
            sum(score_components) / max(len(score_components), 1), 1
        )

        # Health trend
        win_count = len(report.wins)
        concern_count = len(report.concerns)
        if win_count > concern_count:
            report.health_trend = "improving"
        elif concern_count > win_count:
            report.health_trend = "declining"
        else:
            report.health_trend = "stable"

        # Traffic value change: only meaningful when both sides are known.
        # NOTE(review): the division by 100 presumably converts cents to
        # USD - confirm against the API's cost unit.
        curr_cost = self._lookup_metric(current_snapshot, "cost")
        prev_cost = previous_snapshot.get("cost")
        if curr_cost is not None and prev_cost is not None:
            report.traffic_value_change = round((curr_cost - prev_cost) / 100.0, 2)

        # Executive summary
        report.executive_summary = self.generate_executive_summary(
            report.wins, report.concerns,
            report.health_score, report.health_trend,
            report.traffic_value_change,
        )

        if not executive_only:
            # Tactical breakdown
            report.tactical_breakdown = self.generate_tactical_breakdown(
                current_snapshot, report.wins, report.concerns,
            )

            # Target comparison
            if targets_path:
                # Best effort: a broken targets file is recorded as a report
                # error and must not prevent the report from being produced.
                try:
                    target_set = self._load_targets(targets_path)
                    report.target_progress = self.compare_targets(
                        current_snapshot, target_set,
                    )
                except Exception as exc:
                    report.errors.append(f"Targets load error: {exc}")

        return report
# ---------------------------------------------------------------------------
# Output formatting
# ---------------------------------------------------------------------------
def format_text_report(report: PerformanceReport) -> str:
    """Format a performance report as human-readable text.

    Sections are emitted in this order: header, executive summary (with top
    wins and concerns), trends, tactical breakdown (at most three actions
    per dimension), target progress and errors. Sections other than the
    header and executive summary are omitted when the report has no data
    for them.

    Args:
        report: The report to render.

    Returns:
        The rendered report as a single newline-joined string.
    """
    impact_markers = {"high": "!!!", "medium": "!!", "low": "!"}
    lines = []
    lines.append("=" * 60)
    lines.append(f"SEO Performance Report: {report.url}")
    lines.append(f"Period: {report.period} ({report.date_from} to {report.date_to})")
    lines.append(f"Generated: {report.timestamp}")
    lines.append("=" * 60)

    # Executive Summary
    lines.append("\nEXECUTIVE SUMMARY")
    lines.append("-" * 40)
    es = report.executive_summary
    lines.append(f" Health Score: {es.get('health_score', 0)}/100")
    trend_arrow = {"improving": "^", "declining": "v", "stable": "="}.get(
        es.get("health_trend", "stable"), "="
    )
    lines.append(f" Trend: {trend_arrow} {es.get('health_trend', 'stable')}")
    lines.append(f" Traffic Value Change: ${es.get('traffic_value_change_usd', 0):,.2f}")
    lines.append(f" Assessment: {es.get('overall_assessment', 'N/A')}")

    # Wins
    lines.append(f"\n Top Wins ({es.get('total_wins', 0)} total):")
    for w in es.get("top_wins", []):
        impact_marker = impact_markers.get(w.get("impact", "low"), "!")
        lines.append(f" {impact_marker} [{w.get('category', '')}] {w.get('description', '')}")

    # Concerns
    lines.append(f"\n Top Concerns ({es.get('total_concerns', 0)} total):")
    for c in es.get("top_concerns", []):
        impact_marker = impact_markers.get(c.get("impact", "low"), "!")
        lines.append(f" {impact_marker} [{c.get('category', '')}] {c.get('description', '')}")

    # Trends
    if report.trends:
        lines.append("\nTRENDS")
        lines.append("-" * 40)
        for metric_name, trend_list in report.trends.items():
            for t in trend_list:
                # Only dict-shaped trend entries are rendered.
                if isinstance(t, dict):
                    dir_arrow = {"up": "^", "down": "v", "stable": "="}.get(
                        t.get("direction", "stable"), "="
                    )
                    change_str = f" ({t.get('change_pct', 0):+.1f}%)" if t.get("change_pct") is not None else ""
                    lines.append(f" {dir_arrow} {metric_name}: {t.get('value', 0):,.2f}{change_str}")

    # Tactical Breakdown
    if report.tactical_breakdown:
        lines.append("\nTACTICAL BREAKDOWN")
        lines.append("-" * 40)
        for dim_name, dim_data in report.tactical_breakdown.items():
            status = dim_data.get("status", "unknown")
            status_marker = {
                "improving": "^", "declining": "v", "active": "=", "needs_review": "?"
            }.get(status, "?")
            lines.append(f"\n [{dim_name.upper()}] Status: {status_marker} {status}")
            for action in dim_data.get("actions", [])[:3]:
                lines.append(f" > {action}")

    # Target Progress
    if report.target_progress:
        lines.append("\nTARGET PROGRESS")
        lines.append("-" * 40)
        for tp in report.target_progress:
            if isinstance(tp, TargetProgress):
                # Clamp to 0..100 so the bar is always exactly 20 characters;
                # a negative percentage would otherwise yield a negative fill
                # count and an over-long bar.
                clamped_pct = max(0.0, min(tp.progress_pct, 100))
                bar_filled = int(clamped_pct / 5)
                bar = "#" * bar_filled + "-" * (20 - bar_filled)
                lines.append(
                    f" {tp.kpi_name}: [{bar}] {tp.progress_pct:.0f}% "
                    f"(actual: {tp.actual:,.0f} / target: {tp.target:,.0f})"
                )

    # Errors
    if report.errors:
        lines.append("\nERRORS")
        lines.append("-" * 40)
        for err in report.errors:
            lines.append(f" ! {err}")

    lines.append("\n" + "=" * 60)
    return "\n".join(lines)
def serialize_report(report: PerformanceReport) -> dict:
    """Turn a PerformanceReport into a dictionary ready for ``json.dumps``.

    Dataclass items in ``wins``, ``concerns`` and ``target_progress`` are
    expanded with ``asdict``; every other field is passed through as stored.
    """
    def expand(items: list) -> list[dict]:
        # Convert a list of dataclass instances into plain dictionaries.
        return [asdict(item) for item in items]

    return {
        "url": report.url,
        "period": report.period,
        "date_from": report.date_from,
        "date_to": report.date_to,
        "health_score": report.health_score,
        "health_trend": report.health_trend,
        "trends": report.trends,
        "wins": expand(report.wins),
        "concerns": expand(report.concerns),
        "executive_summary": report.executive_summary,
        "tactical_breakdown": report.tactical_breakdown,
        "target_progress": expand(report.target_progress),
        "traffic_value_change": report.traffic_value_change,
        "timestamp": report.timestamp,
        "errors": report.errors,
    }
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def parse_args() -> argparse.Namespace:
    """Parse and validate command-line arguments.

    ``--from`` / ``--to`` must be valid YYYY-MM-DD dates, and
    ``--period custom`` requires both of them; violations end the program
    with the standard argparse usage error instead of silently falling back
    to a default window or failing later with a traceback.

    Returns:
        The parsed arguments. Dates are kept as the strings the user typed.
    """
    def iso_date(value: str) -> str:
        # Validate the format only; the original string is passed through.
        try:
            datetime.strptime(value, "%Y-%m-%d")
        except ValueError:
            raise argparse.ArgumentTypeError(
                f"invalid date {value!r}, expected YYYY-MM-DD"
            ) from None
        return value

    parser = argparse.ArgumentParser(
        description="SEO Performance Reporter - Period-over-period analysis"
    )
    parser.add_argument(
        "--url", required=True, help="Target URL or domain"
    )
    parser.add_argument(
        "--period", choices=["monthly", "quarterly", "yearly", "custom"],
        default="monthly", help="Report period (default: monthly)"
    )
    parser.add_argument(
        "--from", dest="date_from", type=iso_date, default=None,
        help="Start date (YYYY-MM-DD) for custom period"
    )
    parser.add_argument(
        "--to", dest="date_to", type=iso_date, default=None,
        help="End date (YYYY-MM-DD) for custom period"
    )
    parser.add_argument(
        "--executive", action="store_true",
        help="Generate executive summary only"
    )
    parser.add_argument(
        "--targets", type=str, default=None,
        help="Path to targets JSON file for progress comparison"
    )
    parser.add_argument(
        "--json", action="store_true",
        help="Output results as JSON"
    )
    parser.add_argument(
        "--output", type=str, default=None,
        help="Save output to file path"
    )
    args = parser.parse_args()
    if args.period == "custom" and not (args.date_from and args.date_to):
        parser.error("--period custom requires both --from and --to")
    return args
async def main() -> None:
    """Run the reporter from the command line.

    Builds the report for the parsed arguments, renders it as JSON or text,
    then either writes it to the ``--output`` path or prints it, and finally
    prints the reporter's statistics.
    """
    args = parse_args()
    reporter = PerformanceReporter()
    perf_report = await reporter.report(
        url=args.url,
        period=args.period,
        date_from=args.date_from,
        date_to=args.date_to,
        executive_only=args.executive,
        targets_path=args.targets,
    )

    rendered = (
        json.dumps(serialize_report(perf_report), indent=2, ensure_ascii=False)
        if args.json
        else format_text_report(perf_report)
    )

    if not args.output:
        print(rendered)
    else:
        Path(args.output).write_text(rendered, encoding="utf-8")
        logger.info(f"Output saved to {args.output}")

    reporter.print_stats()


if __name__ == "__main__":
    asyncio.run(main())