Files
Andrew Yim a3ff965b87 Add SEO skills 19-28, 31-32 with full Python implementations
12 new skills: Keyword Strategy, SERP Analysis, Position Tracking,
Link Building, Content Strategy, E-Commerce SEO, KPI Framework,
International SEO, AI Visibility, Knowledge Graph, Competitor Intel,
and Crawl Budget. ~20K lines of Python across 25 domain scripts.
Updated skill 11 pipeline table and repo CLAUDE.md.
Enhanced skill 18 local SEO workflow from jamie.clinic audit.

Note: Skill 26 hreflang_validator.py pending (content filter block).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-13 12:05:59 +09:00

729 lines
26 KiB
Python

"""
Ranking Reporter - Ranking Performance Reports with Trends
==========================================================
Purpose: Generate ranking reports with trend analysis, top movers, and competitor comparison
Python: 3.10+
Usage:
python ranking_reporter.py --target https://example.com --period 30 --json
python ranking_reporter.py --target https://example.com --period 90 --json
python ranking_reporter.py --target https://example.com --competitor https://comp1.com --period 30 --json
"""
import argparse
import asyncio
import json
import logging
import sys
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from typing import Optional
from urllib.parse import urlparse
from base_client import BaseAsyncClient, config
logger = logging.getLogger(__name__)
# CTR weights for impact scoring (same as position_tracker)
CTR_WEIGHTS: dict[int, float] = {
    1: 0.300, 2: 0.150, 3: 0.100, 4: 0.070, 5: 0.050,
    6: 0.038, 7: 0.030, 8: 0.025, 9: 0.020, 10: 0.018,
}
# Positions 11-20: linear decay from 1.5% down to 0.6%.
CTR_WEIGHTS.update({pos: round(0.015 - (pos - 11) * 0.001, 4) for pos in range(11, 21)})
# Positions 21-50: slower decay, floored at 0.1%.
CTR_WEIGHTS.update({pos: round(max(0.005 - (pos - 21) * 0.0001, 0.001), 4) for pos in range(21, 51)})
# Positions 51-100: flat long-tail CTR.
CTR_WEIGHTS.update(dict.fromkeys(range(51, 101), 0.0005))
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
@dataclass
class PositionSnapshot:
    """A single position measurement at a point in time.

    One row of rank-tracker history: where one keyword ranked on one date.
    """
    # Measurement date as a string (ISO "YYYY-MM-DD" expected by the
    # chronological sort in get_historical_positions).
    date: str
    # SERP position (1-based). Values <= 0 are treated as "not ranking"
    # by RankingTrend.compute_trend.
    position: int
    # Search volume reported alongside this measurement.
    volume: int = 0
    # Ranking URL, when the data source provides one.
    url: str = ""
@dataclass
class RankingTrend:
    """Keyword ranking trend over time.

    Holds the raw position history plus derived statistics filled in by
    compute_trend().
    """
    keyword: str
    positions_over_time: list[PositionSnapshot] = field(default_factory=list)
    trend_direction: str = "stable"  # improved, declined, stable, new, lost
    avg_position: float = 0.0
    current_position: int = 0
    start_position: int = 0
    total_change: int = 0
    volume: int = 0
    intent: str = "informational"
    is_brand: bool = False

    def compute_trend(self):
        """Derive direction and summary stats from the position history.

        Snapshots with position <= 0 count as "not ranking"; a history
        with no ranked snapshots is marked "lost". Direction comes from
        the sign of a least-squares slope over the ranked positions.
        """
        if not self.positions_over_time:
            self.trend_direction = "stable"
            return
        ranked = [snap.position for snap in self.positions_over_time if snap.position > 0]
        if not ranked:
            self.trend_direction = "lost"
            return
        count = len(ranked)
        self.avg_position = sum(ranked) / count
        self.start_position, self.current_position = ranked[0], ranked[-1]
        # Position numbers shrink as rankings improve, so positive = gain.
        self.total_change = self.start_position - self.current_position
        if count >= 2:
            x_mean = (count - 1) / 2.0
            y_mean = sum(ranked) / count
            numerator = sum((i - x_mean) * (p - y_mean) for i, p in enumerate(ranked))
            denominator = sum((i - x_mean) ** 2 for i in range(count))
            if denominator > 0:
                slope = numerator / denominator
                # Negative slope = position number falling = improving.
                if slope < -0.5:
                    self.trend_direction = "improved"
                elif slope > 0.5:
                    self.trend_direction = "declined"
                else:
                    self.trend_direction = "stable"
            else:
                self.trend_direction = "stable"
        if self.volume == 0 and self.positions_over_time:
            # Backfill volume from the most recent snapshot.
            self.volume = self.positions_over_time[-1].volume
@dataclass
class TopMover:
    """Keyword with significant position change."""
    keyword: str
    position_change: int  # positive = improved (position number dropped)
    current_position: int = 0
    previous_position: int = 0
    volume: int = 0
    impact_score: float = 0.0
    direction: str = "improved"

    def calculate_impact(self):
        """Score the move as volume * |CTR(new) - CTR(old)|."""
        def ctr_at(pos: int) -> float:
            # Unranked (pos <= 0) captures no clicks; unknown deep
            # positions fall back to the long-tail CTR floor.
            return CTR_WEIGHTS.get(pos, 0.0005) if pos > 0 else 0.0

        delta = abs(ctr_at(self.current_position) - ctr_at(self.previous_position))
        self.impact_score = round(self.volume * delta, 2)
        self.direction = "improved" if self.position_change > 0 else "declined"
@dataclass
class SegmentReport:
    """Performance breakdown for a keyword segment.

    Segments are produced by RankingReporter.generate_segment_report:
    "brand"/"non_brand" plus one "intent_*" bucket per intent value.
    """
    segment_name: str
    # Number of keywords assigned to this segment.
    total_keywords: int = 0
    # Mean current position over keywords that currently rank (> 0).
    avg_position: float = 0.0
    # Mean of per-keyword total_change (positive = improved on average).
    avg_position_change: float = 0.0
    # CTR-weighted visibility score (0-100) for the segment.
    visibility_score: float = 0.0
    improved_count: int = 0
    declined_count: int = 0
    stable_count: int = 0
    # Biggest winners/losers within the segment, by impact score.
    top_gainers: list[TopMover] = field(default_factory=list)
    top_losers: list[TopMover] = field(default_factory=list)
@dataclass
class CompetitorReport:
    """Competitor comparison for a reporting period.

    Populated from Ahrefs rank-tracker-competitors-stats; all fields keep
    their zero defaults when the upstream call returns no data.
    """
    competitor: str
    # Visibility scores as reported by the competitors-stats tool.
    our_visibility: float = 0.0
    their_visibility: float = 0.0
    # Count of keywords both sites rank for.
    overlap_keywords: int = 0
    keywords_we_lead: int = 0
    keywords_they_lead: int = 0
    # Up to 15 keyword-gap dicts: keyword, our_position, their_position, volume.
    notable_gaps: list[dict] = field(default_factory=list)
@dataclass
class RankingReport:
    """Complete ranking performance report.

    Aggregates trends, visibility, top movers, segment breakdowns and
    competitor comparisons for one target over one reporting period.
    """
    target: str
    period_days: int = 30
    period_start: str = ""
    period_end: str = ""
    total_keywords: int = 0
    current_visibility: float = 0.0
    previous_visibility: float = 0.0
    visibility_change: float = 0.0
    trend_summary: dict = field(default_factory=lambda: {
        "improved": 0, "declined": 0, "stable": 0, "new": 0, "lost": 0,
    })
    top_gainers: list[TopMover] = field(default_factory=list)
    top_losers: list[TopMover] = field(default_factory=list)
    segments: list[SegmentReport] = field(default_factory=list)
    competitors: list[CompetitorReport] = field(default_factory=list)
    keyword_trends: list[RankingTrend] = field(default_factory=list)
    timestamp: str = ""

    def __post_init__(self):
        # Backfill timestamp and period bounds the caller left blank.
        now = datetime.now()
        self.timestamp = self.timestamp or now.isoformat()
        self.period_end = self.period_end or now.strftime("%Y-%m-%d")
        if not self.period_start:
            self.period_start = (now - timedelta(days=self.period_days)).strftime("%Y-%m-%d")

    def to_dict(self) -> dict:
        """Convert to a JSON-serializable dictionary."""
        # Keyword trends are flattened to summary rows (snapshots omitted).
        trend_rows = [
            {
                "keyword": t.keyword,
                "trend_direction": t.trend_direction,
                "avg_position": round(t.avg_position, 1),
                "current_position": t.current_position,
                "start_position": t.start_position,
                "total_change": t.total_change,
                "volume": t.volume,
            }
            for t in self.keyword_trends
        ]
        return {
            "target": self.target,
            "period": {
                "days": self.period_days,
                "start": self.period_start,
                "end": self.period_end,
            },
            "total_keywords": self.total_keywords,
            "visibility": {
                "current": round(self.current_visibility, 2),
                "previous": round(self.previous_visibility, 2),
                "change": round(self.visibility_change, 2),
            },
            "trend_summary": self.trend_summary,
            "top_gainers": [asdict(m) for m in self.top_gainers],
            "top_losers": [asdict(m) for m in self.top_losers],
            "segments": [asdict(s) for s in self.segments],
            "competitors": [asdict(c) for c in self.competitors],
            "keyword_trends": trend_rows,
            "timestamp": self.timestamp,
        }
# ---------------------------------------------------------------------------
# Ranking Reporter
# ---------------------------------------------------------------------------
class RankingReporter(BaseAsyncClient):
    """Generate ranking performance reports with trend analysis.

    All external data comes from Ahrefs rank-tracker MCP tools invoked
    via the ``mcp-cli`` binary. Every external call degrades to empty
    results on failure so a (possibly sparse) report is still produced.
    """

    def __init__(self):
        # Conservative limits: each logical request shells out to mcp-cli
        # and the Ahrefs endpoints are rate-limited.
        super().__init__(
            max_concurrent=5,
            requests_per_second=2.0,
            logger=logger,
        )

    def _extract_domain_brand(self, target: str) -> list[str]:
        """Extract brand terms from the target domain name.

        Returns lowercase hostname labels minus common TLD parts;
        hyphenated labels also contribute each hyphen-separated piece.
        Order is not guaranteed (set-deduplicated).
        """
        parsed = urlparse(target)
        hostname = parsed.hostname or target
        parts = hostname.replace("www.", "").split(".")
        brand_parts: list[str] = []
        for part in parts:
            if part not in ("com", "co", "kr", "net", "org", "io", "ai", "www"):
                brand_parts.append(part.lower())
                if "-" in part:
                    brand_parts.extend(part.lower().split("-"))
        return list(set(brand_parts))

    async def get_historical_positions(
        self,
        target: str,
        period_days: int = 30,
    ) -> list[RankingTrend]:
        """
        Fetch historical position data from Ahrefs rank-tracker-overview
        with date range parameters.

        Args:
            target: Target website URL.
            period_days: Days of history to request, ending today.

        Returns:
            List of RankingTrend objects with date-sorted position
            snapshots and computed trend statistics.
        """
        logger.info("Fetching historical positions for %s (%s days)", target, period_days)
        brand_terms = self._extract_domain_brand(target)
        end_date = datetime.now().strftime("%Y-%m-%d")
        start_date = (datetime.now() - timedelta(days=period_days)).strftime("%Y-%m-%d")
        raw_data = await self._call_rank_tracker_historical(target, start_date, end_date)
        trends: dict[str, RankingTrend] = {}
        for item in raw_data:
            keyword = item.get("keyword", "")
            if not keyword:
                # Fix: rows missing a keyword previously collapsed into a
                # single bogus "" trend; drop them instead.
                continue
            if keyword not in trends:
                is_brand = any(term in keyword.lower() for term in brand_terms)
                trends[keyword] = RankingTrend(
                    keyword=keyword,
                    volume=item.get("volume", 0),
                    intent=item.get("intent", "informational"),
                    is_brand=is_brand,
                )
            snapshot = PositionSnapshot(
                date=item.get("date", end_date),
                position=item.get("position", 0),
                volume=item.get("volume", 0),
                url=item.get("url", ""),
            )
            trends[keyword].positions_over_time.append(snapshot)
        # Snapshots must be chronological before trend math runs.
        for trend in trends.values():
            trend.positions_over_time.sort(key=lambda s: s.date)
            trend.compute_trend()
        logger.info("Retrieved trends for %s keywords", len(trends))
        return list(trends.values())

    async def _call_rank_tracker_historical(
        self, target: str, start_date: str, end_date: str,
    ) -> list[dict]:
        """Call Ahrefs rank-tracker-overview with date range.

        Returns keyword rows, or [] on any failure (missing mcp-cli,
        timeout, non-zero exit, or undecodable JSON).
        """
        logger.info("Calling Ahrefs rank-tracker-overview (%s to %s)...", start_date, end_date)
        try:
            import subprocess
            result = subprocess.run(
                ["mcp-cli", "call", "ahrefs/rank-tracker-overview",
                 json.dumps({
                     "target": target,
                     "date_from": start_date,
                     "date_to": end_date,
                 })],
                capture_output=True, text=True, timeout=60,
            )
            if result.returncode == 0:
                data = json.loads(result.stdout)
                # Tool versions differ on the payload key; accept either.
                return data.get("keywords", data.get("results", []))
            # Fix: a non-zero exit used to be discarded silently; surface
            # it so empty reports are diagnosable.
            logger.debug("mcp-cli exited %s: %s", result.returncode, result.stderr)
        except (FileNotFoundError, subprocess.TimeoutExpired, json.JSONDecodeError) as exc:
            logger.debug("rank-tracker-overview call failed: %s", exc)
        return []

    def calculate_trends(self, trends: list[RankingTrend]) -> dict:
        """
        Compute overall trend summary from keyword trends.

        Returns dict with improved/declined/stable/new/lost counts;
        unrecognized directions are counted as stable.
        """
        summary = {
            "improved": 0,
            "declined": 0,
            "stable": 0,
            "new": 0,
            "lost": 0,
        }
        for trend in trends:
            direction = trend.trend_direction
            if direction in summary:
                summary[direction] += 1
            else:
                summary["stable"] += 1
        logger.info(
            "Trend summary: improved=%s, declined=%s, stable=%s",
            summary["improved"], summary["declined"], summary["stable"],
        )
        return summary

    def find_top_movers(
        self,
        trends: list[RankingTrend],
        limit: int = 10,
    ) -> tuple[list[TopMover], list[TopMover]]:
        """
        Find keywords with biggest position gains and losses.

        Keywords need at least two snapshots and ranked (positive)
        start/end positions to qualify.

        Returns tuple of (top_gainers, top_losers), each sorted by
        impact score descending and truncated to `limit`.
        """
        gainers: list[TopMover] = []
        losers: list[TopMover] = []
        for trend in trends:
            if not trend.positions_over_time or len(trend.positions_over_time) < 2:
                continue
            first_pos = trend.start_position
            last_pos = trend.current_position
            if first_pos <= 0 or last_pos <= 0:
                continue
            change = first_pos - last_pos  # positive = improved
            mover = TopMover(
                keyword=trend.keyword,
                position_change=change,
                current_position=last_pos,
                previous_position=first_pos,
                volume=trend.volume,
            )
            mover.calculate_impact()
            if change > 0:
                gainers.append(mover)
            elif change < 0:
                losers.append(mover)
        # Sort by impact score descending.
        gainers.sort(key=lambda m: m.impact_score, reverse=True)
        losers.sort(key=lambda m: m.impact_score, reverse=True)
        logger.info("Top movers: %s gainers, %s losers", len(gainers), len(losers))
        return gainers[:limit], losers[:limit]

    def _calculate_visibility_score(self, trends: list[RankingTrend], use_start: bool = False) -> float:
        """Calculate CTR-weighted visibility (0-100) from trends.

        Uses current positions, or period-start positions when `use_start`
        is True. 100 means every keyword's volume captured at position-1
        CTR; keywords with pos <= 0 or > 100 are excluded entirely.
        """
        total_weighted = 0.0
        total_volume = 0
        for trend in trends:
            pos = trend.start_position if use_start else trend.current_position
            if pos <= 0 or pos > 100:
                continue
            # Floor volume at 1 so zero-volume keywords still register.
            volume = max(trend.volume, 1)
            total_volume += volume
            ctr = CTR_WEIGHTS.get(pos, 0.0005)
            total_weighted += volume * ctr
        if total_volume > 0:
            max_possible = total_volume * CTR_WEIGHTS[1]
            return (total_weighted / max_possible) * 100.0
        return 0.0

    def generate_segment_report(self, trends: list[RankingTrend]) -> list[SegmentReport]:
        """
        Generate performance breakdown by keyword segment.

        Each keyword lands in exactly two segments: brand/non_brand and
        one intent_* bucket. Reports are returned sorted by segment name.
        """
        segment_map: dict[str, list[RankingTrend]] = {}
        for trend in trends:
            # Brand segment
            brand_key = "brand" if trend.is_brand else "non_brand"
            segment_map.setdefault(brand_key, []).append(trend)
            # Intent segment
            intent_key = f"intent_{trend.intent.lower()}" if trend.intent else "intent_informational"
            segment_map.setdefault(intent_key, []).append(trend)
        reports: list[SegmentReport] = []
        for seg_name, seg_trends in sorted(segment_map.items()):
            if not seg_trends:
                continue
            # Averages use only currently-ranking keywords for position,
            # but all keywords for change/visibility.
            active = [t for t in seg_trends if t.current_position > 0]
            avg_pos = sum(t.current_position for t in active) / len(active) if active else 0.0
            avg_change = sum(t.total_change for t in seg_trends) / len(seg_trends) if seg_trends else 0.0
            vis = self._calculate_visibility_score(seg_trends, use_start=False)
            improved = sum(1 for t in seg_trends if t.trend_direction == "improved")
            declined = sum(1 for t in seg_trends if t.trend_direction == "declined")
            stable = sum(1 for t in seg_trends if t.trend_direction == "stable")
            # Top movers within the segment only.
            seg_gainers, seg_losers = self.find_top_movers(seg_trends, limit=5)
            reports.append(SegmentReport(
                segment_name=seg_name,
                total_keywords=len(seg_trends),
                avg_position=round(avg_pos, 1),
                avg_position_change=round(avg_change, 1),
                visibility_score=round(vis, 2),
                improved_count=improved,
                declined_count=declined,
                stable_count=stable,
                top_gainers=seg_gainers,
                top_losers=seg_losers,
            ))
        return reports

    async def compare_with_competitor(
        self,
        target: str,
        competitor: str,
        period_days: int = 30,
    ) -> CompetitorReport:
        """
        Period-over-period comparison with a competitor.

        Uses Ahrefs rank-tracker-competitors-stats for the comparison.
        Note: period_days is currently informational (logged only); the
        underlying stats call does not take a date range.

        Returns a CompetitorReport; fields stay at zero defaults when the
        upstream call yields no data.
        """
        logger.info("Comparing %s vs %s over %s days", target, competitor, period_days)
        comp_data = await self._call_competitors_stats(target, competitor)
        report = CompetitorReport(competitor=competitor)
        if comp_data:
            report.our_visibility = comp_data.get("target_visibility", 0.0)
            report.their_visibility = comp_data.get("competitor_visibility", 0.0)
            report.overlap_keywords = comp_data.get("overlap_keywords", 0)
            report.keywords_we_lead = comp_data.get("target_better", 0)
            report.keywords_they_lead = comp_data.get("competitor_better", 0)
            # Keep only the most notable gaps (upstream order preserved).
            gaps = comp_data.get("keyword_gaps", [])
            report.notable_gaps = [
                {
                    "keyword": g.get("keyword", ""),
                    "our_position": g.get("target_position", 0),
                    "their_position": g.get("competitor_position", 0),
                    "volume": g.get("volume", 0),
                }
                for g in gaps[:15]
            ]
        return report

    async def _call_competitors_stats(self, target: str, competitor: str) -> dict:
        """Call Ahrefs rank-tracker-competitors-stats MCP tool.

        Returns the parsed JSON payload, or {} on any failure.
        """
        logger.info("Calling Ahrefs rank-tracker-competitors-stats...")
        try:
            import subprocess
            result = subprocess.run(
                ["mcp-cli", "call", "ahrefs/rank-tracker-competitors-stats",
                 json.dumps({"target": target, "competitor": competitor})],
                capture_output=True, text=True, timeout=60,
            )
            if result.returncode == 0:
                return json.loads(result.stdout)
            # Fix: log non-zero exits instead of discarding them silently.
            logger.debug("mcp-cli exited %s: %s", result.returncode, result.stderr)
        except (FileNotFoundError, subprocess.TimeoutExpired, json.JSONDecodeError) as exc:
            logger.debug("competitors-stats call failed: %s", exc)
        return {}

    async def generate_report(
        self,
        target: str,
        period_days: int = 30,
        competitors: Optional[list[str]] = None,
    ) -> RankingReport:
        """
        Orchestrate full ranking performance report generation.

        Args:
            target: Target website URL
            period_days: Reporting period in days
            competitors: List of competitor URLs to compare

        Returns:
            Complete RankingReport with trends, movers, segments, and
            comparisons. If no historical data is available the report is
            returned early with only period metadata filled in.
        """
        logger.info("Generating ranking report for: %s (%s days)", target, period_days)
        report = RankingReport(target=target, period_days=period_days)
        # Step 1: Fetch historical position data
        trends = await self.get_historical_positions(target, period_days)
        if not trends:
            logger.warning("No historical data retrieved. Check Ahrefs project configuration.")
            return report
        report.keyword_trends = trends
        report.total_keywords = len(trends)
        # Step 2: Calculate trend summary
        report.trend_summary = self.calculate_trends(trends)
        # Step 3: Calculate visibility scores (current vs period start)
        report.current_visibility = self._calculate_visibility_score(trends, use_start=False)
        report.previous_visibility = self._calculate_visibility_score(trends, use_start=True)
        report.visibility_change = report.current_visibility - report.previous_visibility
        # Step 4: Find top movers
        gainers, losers = self.find_top_movers(trends, limit=10)
        report.top_gainers = gainers
        report.top_losers = losers
        # Step 5: Generate segment reports
        report.segments = self.generate_segment_report(trends)
        # Step 6: Compare with competitors (sequential to respect rate limits)
        if competitors:
            for competitor in competitors:
                comp_report = await self.compare_with_competitor(
                    target, competitor, period_days,
                )
                report.competitors.append(comp_report)
        logger.info("Report complete. Keywords: %s", report.total_keywords)
        logger.info(
            "Visibility: %.2f -> %.2f (%+.2f)",
            report.previous_visibility, report.current_visibility, report.visibility_change,
        )
        return report
# ---------------------------------------------------------------------------
# Output formatters
# ---------------------------------------------------------------------------
def format_text_report(report: RankingReport) -> str:
    """Render a RankingReport as a human-readable plain-text summary."""
    rule = "=" * 60
    divider = "-" * 60
    out: list[str] = [
        rule,
        f"Ranking Performance Report: {report.target}",
        f"Period: {report.period_start} ~ {report.period_end} ({report.period_days} days)",
        f"Generated: {report.timestamp}",
        rule,
    ]
    # Visibility trend
    out.append("\nVisibility Score:")
    out.append(f" Current: {report.current_visibility:.2f}")
    out.append(f" Previous: {report.previous_visibility:.2f}")
    sign = "" if report.visibility_change < 0 else "+"
    out.append(f" Change: {sign}{report.visibility_change:.2f}")
    # Trend summary
    summary = report.trend_summary
    out.append(f"\nKeyword Trends ({report.total_keywords} total):")
    for label, key in (
        ("Improved", "improved"),
        ("Declined", "declined"),
        ("Stable", "stable"),
        ("New", "new"),
        ("Lost", "lost"),
    ):
        out.append(f" {label}: {summary.get(key, 0)}")
    # Top gainers (change already positive; "+" is added explicitly)
    if report.top_gainers:
        out.append("\nTop Gainers:")
        out.append(divider)
        for mv in report.top_gainers:
            out.append(
                f" {mv.keyword}: {mv.previous_position} -> {mv.current_position} "
                f"(+{mv.position_change}) | Vol: {mv.volume} | Impact: {mv.impact_score}"
            )
    # Top losers (change is negative, so it carries its own sign)
    if report.top_losers:
        out.append("\nTop Losers:")
        out.append(divider)
        for mv in report.top_losers:
            out.append(
                f" {mv.keyword}: {mv.previous_position} -> {mv.current_position} "
                f"({mv.position_change}) | Vol: {mv.volume} | Impact: {mv.impact_score}"
            )
    # Segments
    if report.segments:
        out.append("\nSegment Breakdown:")
        out.append(divider)
        for seg in report.segments:
            out.append(
                f" {seg.segment_name}: {seg.total_keywords} kw, "
                f"avg pos {seg.avg_position}, vis {seg.visibility_score}, "
                f"improved {seg.improved_count} / declined {seg.declined_count}"
            )
    # Competitors
    if report.competitors:
        out.append("\nCompetitor Comparison:")
        out.append(divider)
        for comp in report.competitors:
            out.append(f" vs {comp.competitor}:")
            out.append(f" Our visibility: {comp.our_visibility:.2f}")
            out.append(f" Their visibility: {comp.their_visibility:.2f}")
            out.append(f" Overlap: {comp.overlap_keywords} keywords")
            out.append(f" We lead: {comp.keywords_we_lead}")
            out.append(f" They lead: {comp.keywords_they_lead}")
            if comp.notable_gaps:
                out.append(" Notable gaps:")
                for gap in comp.notable_gaps[:5]:
                    out.append(
                        f" {gap['keyword']}: us #{gap['our_position']} "
                        f"vs them #{gap['their_position']} (vol: {gap['volume']})"
                    )
    out.append("\n" + rule)
    return "\n".join(out)
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def parse_args() -> argparse.Namespace:
    """Build the CLI parser and parse sys.argv."""
    cli = argparse.ArgumentParser(
        description="Ranking Reporter - Generate ranking performance reports with trends",
    )
    cli.add_argument("--target", required=True,
                     help="Target website URL (e.g., https://example.com)")
    cli.add_argument("--period", type=int, default=30,
                     help="Reporting period in days (default: 30)")
    # Repeatable flag: each occurrence appends to args.competitors.
    cli.add_argument("--competitor", action="append", dest="competitors", default=[],
                     help="Competitor URL to compare (repeatable)")
    cli.add_argument("--json", action="store_true", dest="json_output",
                     help="Output in JSON format")
    cli.add_argument("--output", type=str, default=None,
                     help="Save output to file path")
    return cli.parse_args()
async def main():
    """CLI entry point: build the report, render it, and emit it."""
    args = parse_args()
    reporter = RankingReporter()
    report = await reporter.generate_report(
        target=args.target,
        period_days=args.period,
        competitors=args.competitors,
    )
    # Pick the rendering requested on the command line.
    rendered = (
        json.dumps(report.to_dict(), ensure_ascii=False, indent=2)
        if args.json_output
        else format_text_report(report)
    )
    if args.output:
        with open(args.output, "w", encoding="utf-8") as fh:
            fh.write(rendered)
        logger.info(f"Output saved to: {args.output}")
    else:
        print(rendered)
    reporter.print_stats()


if __name__ == "__main__":
    asyncio.run(main())