# 12 new skills: Keyword Strategy, SERP Analysis, Position Tracking, Link Building,
# Content Strategy, E-Commerce SEO, KPI Framework, International SEO, AI Visibility,
# Knowledge Graph, Competitor Intel, and Crawl Budget. ~20K lines of Python across
# 25 domain scripts. Updated skill 11 pipeline table and repo CLAUDE.md. Enhanced
# skill 18 local SEO workflow from jamie.clinic audit.
# Note: Skill 26 hreflang_validator.py pending (content filter block).
# Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
"""
|
|
Ranking Reporter - Ranking Performance Reports with Trends
|
|
==========================================================
|
|
Purpose: Generate ranking reports with trend analysis, top movers, and competitor comparison
|
|
Python: 3.10+
|
|
|
|
Usage:
|
|
python ranking_reporter.py --target https://example.com --period 30 --json
|
|
python ranking_reporter.py --target https://example.com --period 90 --json
|
|
python ranking_reporter.py --target https://example.com --competitor https://comp1.com --period 30 --json
|
|
"""
|
|
|
|
import argparse
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import sys
|
|
from dataclasses import dataclass, field, asdict
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional
|
|
from urllib.parse import urlparse
|
|
|
|
from base_client import BaseAsyncClient, config
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# CTR weights for impact scoring (same as position_tracker)
|
|
CTR_WEIGHTS: dict[int, float] = {
    pos: ctr
    for pos, ctr in zip(
        range(1, 11),
        (0.300, 0.150, 0.100, 0.070, 0.050, 0.038, 0.030, 0.025, 0.020, 0.018),
    )
}
# Positions 11-20: linear decay from 0.015 in 0.001 steps.
CTR_WEIGHTS.update({p: round(0.015 - (p - 11) * 0.001, 4) for p in range(11, 21)})
# Positions 21-50: slow decay from 0.005, floored at 0.001.
CTR_WEIGHTS.update(
    {p: round(max(0.005 - (p - 21) * 0.0001, 0.001), 4) for p in range(21, 51)}
)
# Positions 51-100: flat long-tail CTR.
CTR_WEIGHTS.update(dict.fromkeys(range(51, 101), 0.0005))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Data classes
|
|
# ---------------------------------------------------------------------------
|
|
@dataclass
class PositionSnapshot:
    """A single position measurement at a point in time."""

    date: str        # measurement date as a sortable ISO "YYYY-MM-DD" string
    position: int    # SERP position; values <= 0 are treated as "not ranking"
    volume: int = 0  # monthly search volume reported alongside the snapshot
    url: str = ""    # URL that ranked for the keyword on that date, if known
|
|
|
|
|
|
@dataclass
class RankingTrend:
    """Keyword ranking trend over time."""

    keyword: str
    positions_over_time: list[PositionSnapshot] = field(default_factory=list)
    trend_direction: str = "stable"  # improved, declined, stable, new, lost
    avg_position: float = 0.0
    current_position: int = 0
    start_position: int = 0
    total_change: int = 0
    volume: int = 0
    intent: str = "informational"
    is_brand: bool = False

    def compute_trend(self):
        """Compute trend direction and average from position history."""
        # No history at all: nothing to conclude.
        if not self.positions_over_time:
            self.trend_direction = "stable"
            return

        # Only positive positions count as "ranking"; an all-zero history
        # means the keyword dropped out entirely.
        ranked = [snap.position for snap in self.positions_over_time if snap.position > 0]
        if not ranked:
            self.trend_direction = "lost"
            return

        count = len(ranked)
        self.avg_position = sum(ranked) / count
        self.start_position, self.current_position = ranked[0], ranked[-1]
        # Positive total_change = position number dropped = rank improved.
        self.total_change = self.start_position - self.current_position

        # Least-squares slope over the ranked series decides the direction;
        # a single sample (or degenerate variance) reads as stable.
        direction = "stable"
        if count >= 2:
            mid_x = (count - 1) / 2.0
            mean_y = self.avg_position
            cov = sum((idx - mid_x) * (pos - mean_y) for idx, pos in enumerate(ranked))
            var = sum((idx - mid_x) ** 2 for idx in range(count))
            if var > 0:
                slope = cov / var
                # Negative slope means position number decreasing = improving.
                if slope < -0.5:
                    direction = "improved"
                elif slope > 0.5:
                    direction = "declined"
        self.trend_direction = direction

        # Backfill volume from the latest snapshot if the caller left it unset.
        if self.volume == 0 and self.positions_over_time:
            self.volume = self.positions_over_time[-1].volume
|
|
|
|
|
|
@dataclass
class TopMover:
    """Keyword with significant position change."""

    keyword: str
    position_change: int
    current_position: int = 0
    previous_position: int = 0
    volume: int = 0
    impact_score: float = 0.0
    direction: str = "improved"

    def calculate_impact(self):
        """Calculate impact score: volume * CTR delta."""

        def ctr_for(position: int) -> float:
            # Unranked (<= 0) contributes zero CTR; positions beyond the
            # table fall back to the long-tail weight.
            if position > 0:
                return CTR_WEIGHTS.get(position, 0.0005)
            return 0.0

        shift = abs(ctr_for(self.current_position) - ctr_for(self.previous_position))
        self.impact_score = round(self.volume * shift, 2)
        self.direction = "improved" if self.position_change > 0 else "declined"
|
|
|
|
|
|
@dataclass
class SegmentReport:
    """Performance breakdown for a keyword segment."""

    segment_name: str                 # e.g. "brand", "non_brand", "intent_<type>"
    total_keywords: int = 0           # keywords assigned to this segment
    avg_position: float = 0.0         # mean current position of ranking keywords
    avg_position_change: float = 0.0  # mean total_change across the segment
    visibility_score: float = 0.0     # CTR-weighted visibility, 0-100 scale
    improved_count: int = 0
    declined_count: int = 0
    stable_count: int = 0
    top_gainers: list[TopMover] = field(default_factory=list)
    top_losers: list[TopMover] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
class CompetitorReport:
    """Competitor comparison for a reporting period."""

    competitor: str               # competitor URL being compared against
    our_visibility: float = 0.0
    their_visibility: float = 0.0
    overlap_keywords: int = 0     # keywords both sites rank for
    keywords_we_lead: int = 0
    keywords_they_lead: int = 0
    # Each gap dict has keys: keyword, our_position, their_position, volume.
    notable_gaps: list[dict] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
class RankingReport:
    """Complete ranking performance report."""

    target: str
    period_days: int = 30
    period_start: str = ""
    period_end: str = ""
    total_keywords: int = 0
    current_visibility: float = 0.0
    previous_visibility: float = 0.0
    visibility_change: float = 0.0
    trend_summary: dict = field(default_factory=lambda: {
        "improved": 0, "declined": 0, "stable": 0, "new": 0, "lost": 0,
    })
    top_gainers: list[TopMover] = field(default_factory=list)
    top_losers: list[TopMover] = field(default_factory=list)
    segments: list[SegmentReport] = field(default_factory=list)
    competitors: list[CompetitorReport] = field(default_factory=list)
    keyword_trends: list[RankingTrend] = field(default_factory=list)
    timestamp: str = ""

    def __post_init__(self):
        # Backfill the timestamp and period bounds the caller left blank.
        now = datetime.now()
        self.timestamp = self.timestamp or now.isoformat()
        self.period_end = self.period_end or now.strftime("%Y-%m-%d")
        if not self.period_start:
            self.period_start = (now - timedelta(days=self.period_days)).strftime("%Y-%m-%d")

    def to_dict(self) -> dict:
        """Convert to JSON-serializable dictionary."""
        # Keyword trends are flattened to plain dicts (snapshots omitted).
        trend_rows = [
            {
                "keyword": t.keyword,
                "trend_direction": t.trend_direction,
                "avg_position": round(t.avg_position, 1),
                "current_position": t.current_position,
                "start_position": t.start_position,
                "total_change": t.total_change,
                "volume": t.volume,
            }
            for t in self.keyword_trends
        ]
        return {
            "target": self.target,
            "period": {
                "days": self.period_days,
                "start": self.period_start,
                "end": self.period_end,
            },
            "total_keywords": self.total_keywords,
            "visibility": {
                "current": round(self.current_visibility, 2),
                "previous": round(self.previous_visibility, 2),
                "change": round(self.visibility_change, 2),
            },
            "trend_summary": self.trend_summary,
            "top_gainers": [asdict(m) for m in self.top_gainers],
            "top_losers": [asdict(m) for m in self.top_losers],
            "segments": [asdict(s) for s in self.segments],
            "competitors": [asdict(c) for c in self.competitors],
            "keyword_trends": trend_rows,
            "timestamp": self.timestamp,
        }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Ranking Reporter
|
|
# ---------------------------------------------------------------------------
|
|
class RankingReporter(BaseAsyncClient):
    """Generate ranking performance reports with trend analysis.

    Pulls historical position data from Ahrefs MCP tools (invoked through
    the ``mcp-cli`` subprocess), then derives trend summaries, top movers,
    segment breakdowns, and optional competitor comparisons.
    """

    def __init__(self):
        # Low concurrency and request rate; presumably enforced by
        # BaseAsyncClient (defined elsewhere) -- not visible here.
        super().__init__(
            max_concurrent=5,
            requests_per_second=2.0,
            logger=logger,
        )

    def _extract_domain_brand(self, target: str) -> list[str]:
        """Extract brand terms from the target domain name."""
        parsed = urlparse(target)
        hostname = parsed.hostname or target
        parts = hostname.replace("www.", "").split(".")
        brand_parts = []
        for part in parts:
            # Skip common TLD/SLD labels so only brand labels remain.
            if part not in ("com", "co", "kr", "net", "org", "io", "ai", "www"):
                brand_parts.append(part.lower())
                # Hyphenated brands contribute both the full label and each piece.
                if "-" in part:
                    brand_parts.extend(part.lower().split("-"))
        return list(set(brand_parts))

    async def get_historical_positions(
        self,
        target: str,
        period_days: int = 30,
    ) -> list[RankingTrend]:
        """
        Fetch historical position data from Ahrefs rank-tracker-overview
        with date range parameters.

        Args:
            target: Target website URL.
            period_days: How far back the date range extends from today.

        Returns list of RankingTrend objects with position snapshots over time.
        """
        logger.info(f"Fetching historical positions for {target} ({period_days} days)")
        brand_terms = self._extract_domain_brand(target)

        end_date = datetime.now().strftime("%Y-%m-%d")
        start_date = (datetime.now() - timedelta(days=period_days)).strftime("%Y-%m-%d")

        raw_data = await self._call_rank_tracker_historical(target, start_date, end_date)

        # Group one row per (keyword, date) into per-keyword trends.
        trends: dict[str, RankingTrend] = {}
        for item in raw_data:
            keyword = item.get("keyword", "")
            if keyword not in trends:
                # Brand flag: any extracted brand term appearing as a substring.
                is_brand = any(term in keyword.lower() for term in brand_terms)
                trends[keyword] = RankingTrend(
                    keyword=keyword,
                    volume=item.get("volume", 0),
                    intent=item.get("intent", "informational"),
                    is_brand=is_brand,
                )

            snapshot = PositionSnapshot(
                date=item.get("date", end_date),
                position=item.get("position", 0),
                volume=item.get("volume", 0),
                url=item.get("url", ""),
            )
            trends[keyword].positions_over_time.append(snapshot)

        # Sort snapshots by date and compute trends
        for trend in trends.values():
            trend.positions_over_time.sort(key=lambda s: s.date)
            trend.compute_trend()

        logger.info(f"Retrieved trends for {len(trends)} keywords")
        return list(trends.values())

    async def _call_rank_tracker_historical(
        self, target: str, start_date: str, end_date: str,
    ) -> list[dict]:
        """Call Ahrefs rank-tracker-overview with date range.

        Best-effort: any CLI/timeout/parse failure yields an empty list
        rather than raising, so the report degrades gracefully.
        """
        logger.info(f"Calling Ahrefs rank-tracker-overview ({start_date} to {end_date})...")
        try:
            import subprocess
            result = subprocess.run(
                ["mcp-cli", "call", "ahrefs/rank-tracker-overview",
                 json.dumps({
                     "target": target,
                     "date_from": start_date,
                     "date_to": end_date,
                 })],
                capture_output=True, text=True, timeout=60,
            )
            if result.returncode == 0:
                data = json.loads(result.stdout)
                # Tool output schema varies: accept "keywords" or "results".
                return data.get("keywords", data.get("results", []))
        except (FileNotFoundError, subprocess.TimeoutExpired, json.JSONDecodeError):
            pass
        return []

    def calculate_trends(self, trends: list[RankingTrend]) -> dict:
        """
        Compute overall trend summary from keyword trends.

        Returns dict with improved/declined/stable/new/lost counts.
        """
        summary = {
            "improved": 0,
            "declined": 0,
            "stable": 0,
            "new": 0,
            "lost": 0,
        }
        for trend in trends:
            direction = trend.trend_direction
            if direction in summary:
                summary[direction] += 1
            else:
                # Unknown labels count as stable so totals match len(trends).
                summary["stable"] += 1

        logger.info(
            f"Trend summary: improved={summary['improved']}, "
            f"declined={summary['declined']}, stable={summary['stable']}"
        )
        return summary

    def find_top_movers(
        self,
        trends: list[RankingTrend],
        limit: int = 10,
    ) -> tuple[list[TopMover], list[TopMover]]:
        """
        Find keywords with biggest position gains and losses.

        Args:
            trends: Keyword trends (compute_trend() must have run already).
            limit: Max entries returned per list.

        Returns tuple of (top_gainers, top_losers) sorted by impact score.
        """
        gainers: list[TopMover] = []
        losers: list[TopMover] = []

        for trend in trends:
            # Need at least two snapshots to compare start vs current.
            if not trend.positions_over_time or len(trend.positions_over_time) < 2:
                continue

            first_pos = trend.start_position
            last_pos = trend.current_position

            # Skip keywords that were unranked at either endpoint.
            if first_pos <= 0 or last_pos <= 0:
                continue

            change = first_pos - last_pos  # positive = improved

            mover = TopMover(
                keyword=trend.keyword,
                position_change=change,
                current_position=last_pos,
                previous_position=first_pos,
                volume=trend.volume,
            )
            mover.calculate_impact()

            if change > 0:
                gainers.append(mover)
            elif change < 0:
                losers.append(mover)

        # Sort by impact score descending
        gainers.sort(key=lambda m: m.impact_score, reverse=True)
        losers.sort(key=lambda m: m.impact_score, reverse=True)

        logger.info(f"Top movers: {len(gainers)} gainers, {len(losers)} losers")
        return gainers[:limit], losers[:limit]

    def _calculate_visibility_score(self, trends: list[RankingTrend], use_start: bool = False) -> float:
        """Calculate visibility score from trends (current or start positions).

        Score is volume-weighted CTR normalized against the ideal case of
        every keyword ranking #1, expressed on a 0-100 scale.
        """
        total_weighted = 0.0
        total_volume = 0

        for trend in trends:
            pos = trend.start_position if use_start else trend.current_position
            # Only positions 1-100 contribute.
            if pos <= 0 or pos > 100:
                continue
            # Floor volume at 1 so zero-volume keywords still register.
            volume = max(trend.volume, 1)
            total_volume += volume
            ctr = CTR_WEIGHTS.get(pos, 0.0005)
            total_weighted += volume * ctr

        if total_volume > 0:
            max_possible = total_volume * CTR_WEIGHTS[1]
            return (total_weighted / max_possible) * 100.0
        return 0.0

    def generate_segment_report(self, trends: list[RankingTrend]) -> list[SegmentReport]:
        """
        Generate performance breakdown by keyword segment.

        Segments include: brand, non_brand, and by intent type. Each trend
        belongs to exactly one brand segment AND one intent segment.
        """
        segment_map: dict[str, list[RankingTrend]] = {}

        for trend in trends:
            # Brand segment
            brand_key = "brand" if trend.is_brand else "non_brand"
            if brand_key not in segment_map:
                segment_map[brand_key] = []
            segment_map[brand_key].append(trend)

            # Intent segment
            intent_key = f"intent_{trend.intent.lower()}" if trend.intent else "intent_informational"
            if intent_key not in segment_map:
                segment_map[intent_key] = []
            segment_map[intent_key].append(trend)

        reports: list[SegmentReport] = []
        for seg_name, seg_trends in sorted(segment_map.items()):
            if not seg_trends:
                continue

            # avg position uses only currently ranking keywords; avg change
            # spans the whole segment (lost keywords contribute 0).
            active = [t for t in seg_trends if t.current_position > 0]
            avg_pos = sum(t.current_position for t in active) / len(active) if active else 0.0
            avg_change = sum(t.total_change for t in seg_trends) / len(seg_trends) if seg_trends else 0.0

            vis = self._calculate_visibility_score(seg_trends, use_start=False)

            improved = sum(1 for t in seg_trends if t.trend_direction == "improved")
            declined = sum(1 for t in seg_trends if t.trend_direction == "declined")
            stable = sum(1 for t in seg_trends if t.trend_direction == "stable")

            # Get top movers within segment
            seg_gainers, seg_losers = self.find_top_movers(seg_trends, limit=5)

            report = SegmentReport(
                segment_name=seg_name,
                total_keywords=len(seg_trends),
                avg_position=round(avg_pos, 1),
                avg_position_change=round(avg_change, 1),
                visibility_score=round(vis, 2),
                improved_count=improved,
                declined_count=declined,
                stable_count=stable,
                top_gainers=seg_gainers,
                top_losers=seg_losers,
            )
            reports.append(report)

        return reports

    async def compare_with_competitor(
        self,
        target: str,
        competitor: str,
        period_days: int = 30,
    ) -> CompetitorReport:
        """
        Period-over-period comparison with a competitor.

        Uses Ahrefs rank-tracker-competitors-stats for detailed comparison.
        Note: period_days is informational (logged only); the underlying
        stats call is not date-scoped.
        """
        logger.info(f"Comparing {target} vs {competitor} over {period_days} days")

        comp_data = await self._call_competitors_stats(target, competitor)

        report = CompetitorReport(competitor=competitor)

        # Empty comp_data (tool unavailable/failed) leaves the report zeroed.
        if comp_data:
            report.our_visibility = comp_data.get("target_visibility", 0.0)
            report.their_visibility = comp_data.get("competitor_visibility", 0.0)
            report.overlap_keywords = comp_data.get("overlap_keywords", 0)
            report.keywords_we_lead = comp_data.get("target_better", 0)
            report.keywords_they_lead = comp_data.get("competitor_better", 0)

            # Extract notable gaps
            gaps = comp_data.get("keyword_gaps", [])
            report.notable_gaps = [
                {
                    "keyword": g.get("keyword", ""),
                    "our_position": g.get("target_position", 0),
                    "their_position": g.get("competitor_position", 0),
                    "volume": g.get("volume", 0),
                }
                for g in gaps[:15]
            ]

        return report

    async def _call_competitors_stats(self, target: str, competitor: str) -> dict:
        """Call Ahrefs rank-tracker-competitors-stats MCP tool.

        Best-effort: returns {} on any CLI/timeout/parse failure.
        """
        logger.info("Calling Ahrefs rank-tracker-competitors-stats...")
        try:
            import subprocess
            result = subprocess.run(
                ["mcp-cli", "call", "ahrefs/rank-tracker-competitors-stats",
                 json.dumps({"target": target, "competitor": competitor})],
                capture_output=True, text=True, timeout=60,
            )
            if result.returncode == 0:
                return json.loads(result.stdout)
        except (FileNotFoundError, subprocess.TimeoutExpired, json.JSONDecodeError):
            pass
        return {}

    async def generate_report(
        self,
        target: str,
        period_days: int = 30,
        competitors: Optional[list[str]] = None,
    ) -> RankingReport:
        """
        Orchestrate full ranking performance report generation.

        Args:
            target: Target website URL
            period_days: Reporting period in days
            competitors: List of competitor URLs to compare

        Returns:
            Complete RankingReport with trends, movers, segments, and comparisons
        """
        logger.info(f"Generating ranking report for: {target} ({period_days} days)")

        report = RankingReport(target=target, period_days=period_days)

        # Step 1: Fetch historical position data
        trends = await self.get_historical_positions(target, period_days)

        # No data: return the (mostly empty) report rather than raising.
        if not trends:
            logger.warning("No historical data retrieved. Check Ahrefs project configuration.")
            return report

        report.keyword_trends = trends
        report.total_keywords = len(trends)

        # Step 2: Calculate trend summary
        report.trend_summary = self.calculate_trends(trends)

        # Step 3: Calculate visibility scores (current vs period start)
        report.current_visibility = self._calculate_visibility_score(trends, use_start=False)
        report.previous_visibility = self._calculate_visibility_score(trends, use_start=True)
        report.visibility_change = report.current_visibility - report.previous_visibility

        # Step 4: Find top movers
        gainers, losers = self.find_top_movers(trends, limit=10)
        report.top_gainers = gainers
        report.top_losers = losers

        # Step 5: Generate segment reports
        report.segments = self.generate_segment_report(trends)

        # Step 6: Compare with competitors
        if competitors:
            for competitor in competitors:
                comp_report = await self.compare_with_competitor(
                    target, competitor, period_days,
                )
                report.competitors.append(comp_report)

        logger.info(f"Report complete. Keywords: {report.total_keywords}")
        logger.info(
            f"Visibility: {report.previous_visibility:.2f} -> "
            f"{report.current_visibility:.2f} ({report.visibility_change:+.2f})"
        )

        return report
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Output formatters
|
|
# ---------------------------------------------------------------------------
|
|
def format_text_report(report: RankingReport) -> str:
    """Render a RankingReport as a human-readable plain-text summary."""
    bar = "=" * 60
    rule = "-" * 60
    out: list[str] = [
        bar,
        f"Ranking Performance Report: {report.target}",
        f"Period: {report.period_start} ~ {report.period_end} ({report.period_days} days)",
        f"Generated: {report.timestamp}",
        bar,
    ]

    # Visibility trend (non-negative deltas get an explicit "+")
    sign = "+" if report.visibility_change >= 0 else ""
    out.extend([
        "\nVisibility Score:",
        f"  Current: {report.current_visibility:.2f}",
        f"  Previous: {report.previous_visibility:.2f}",
        f"  Change: {sign}{report.visibility_change:.2f}",
    ])

    # Trend summary counts
    ts = report.trend_summary
    out.append(f"\nKeyword Trends ({report.total_keywords} total):")
    for label, key in (
        ("Improved", "improved"),
        ("Declined", "declined"),
        ("Stable", "stable"),
        ("New", "new"),
        ("Lost", "lost"),
    ):
        out.append(f"  {label}: {ts.get(key, 0)}")

    # Top gainers (change shown with a leading "+")
    if report.top_gainers:
        out.append("\nTop Gainers:")
        out.append(rule)
        for mover in report.top_gainers:
            out.append(
                f"  {mover.keyword}: {mover.previous_position} -> {mover.current_position} "
                f"(+{mover.position_change}) | Vol: {mover.volume} | Impact: {mover.impact_score}"
            )

    # Top losers (change already carries its minus sign)
    if report.top_losers:
        out.append("\nTop Losers:")
        out.append(rule)
        for mover in report.top_losers:
            out.append(
                f"  {mover.keyword}: {mover.previous_position} -> {mover.current_position} "
                f"({mover.position_change}) | Vol: {mover.volume} | Impact: {mover.impact_score}"
            )

    # Segment breakdown
    if report.segments:
        out.append("\nSegment Breakdown:")
        out.append(rule)
        for seg in report.segments:
            out.append(
                f"  {seg.segment_name}: {seg.total_keywords} kw, "
                f"avg pos {seg.avg_position}, vis {seg.visibility_score}, "
                f"improved {seg.improved_count} / declined {seg.declined_count}"
            )

    # Competitor comparison
    if report.competitors:
        out.append("\nCompetitor Comparison:")
        out.append(rule)
        for comp in report.competitors:
            out.extend([
                f"  vs {comp.competitor}:",
                f"    Our visibility: {comp.our_visibility:.2f}",
                f"    Their visibility: {comp.their_visibility:.2f}",
                f"    Overlap: {comp.overlap_keywords} keywords",
                f"    We lead: {comp.keywords_we_lead}",
                f"    They lead: {comp.keywords_they_lead}",
            ])
            if comp.notable_gaps:
                out.append("    Notable gaps:")
                for gap in comp.notable_gaps[:5]:
                    out.append(
                        f"      {gap['keyword']}: us #{gap['our_position']} "
                        f"vs them #{gap['their_position']} (vol: {gap['volume']})"
                    )

    out.append("\n" + bar)
    return "\n".join(out)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
def parse_args() -> argparse.Namespace:
    """Build the CLI parser and parse sys.argv.

    Flags: --target (required), --period, --competitor (repeatable),
    --json, --output.
    """
    parser = argparse.ArgumentParser(
        description="Ranking Reporter - Generate ranking performance reports with trends",
    )
    parser.add_argument("--target", required=True,
                        help="Target website URL (e.g., https://example.com)")
    parser.add_argument("--period", type=int, default=30,
                        help="Reporting period in days (default: 30)")
    # Repeatable flag: each occurrence appends to args.competitors.
    parser.add_argument("--competitor", action="append", dest="competitors",
                        default=[],
                        help="Competitor URL to compare (repeatable)")
    parser.add_argument("--json", action="store_true", dest="json_output",
                        help="Output in JSON format")
    parser.add_argument("--output", type=str, default=None,
                        help="Save output to file path")
    return parser.parse_args()
|
|
|
|
|
|
async def main():
    """CLI entry point: build the ranking report and emit it."""
    args = parse_args()

    reporter = RankingReporter()

    report = await reporter.generate_report(
        target=args.target,
        period_days=args.period,
        competitors=args.competitors,
    )

    # Render in the requested format.
    rendered = (
        json.dumps(report.to_dict(), ensure_ascii=False, indent=2)
        if args.json_output
        else format_text_report(report)
    )

    # Write to the requested file, or to stdout by default.
    if args.output:
        with open(args.output, "w", encoding="utf-8") as sink:
            sink.write(rendered)
        logger.info(f"Output saved to: {args.output}")
    else:
        print(rendered)

    reporter.print_stats()
|
|
|
|
|
|
# Script entry point: run the async CLI driver.
if __name__ == "__main__":
    asyncio.run(main())
|