""" Ranking Reporter - Ranking Performance Reports with Trends ========================================================== Purpose: Generate ranking reports with trend analysis, top movers, and competitor comparison Python: 3.10+ Usage: python ranking_reporter.py --target https://example.com --period 30 --json python ranking_reporter.py --target https://example.com --period 90 --json python ranking_reporter.py --target https://example.com --competitor https://comp1.com --period 30 --json """ import argparse import asyncio import json import logging import sys from dataclasses import dataclass, field, asdict from datetime import datetime, timedelta from typing import Optional from urllib.parse import urlparse from base_client import BaseAsyncClient, config logger = logging.getLogger(__name__) # CTR weights for impact scoring (same as position_tracker) CTR_WEIGHTS: dict[int, float] = { 1: 0.300, 2: 0.150, 3: 0.100, 4: 0.070, 5: 0.050, 6: 0.038, 7: 0.030, 8: 0.025, 9: 0.020, 10: 0.018, } for _p in range(11, 21): CTR_WEIGHTS[_p] = round(0.015 - (_p - 11) * 0.001, 4) for _p in range(21, 51): CTR_WEIGHTS[_p] = round(max(0.005 - (_p - 21) * 0.0001, 0.001), 4) for _p in range(51, 101): CTR_WEIGHTS[_p] = 0.0005 # --------------------------------------------------------------------------- # Data classes # --------------------------------------------------------------------------- @dataclass class PositionSnapshot: """A single position measurement at a point in time.""" date: str position: int volume: int = 0 url: str = "" @dataclass class RankingTrend: """Keyword ranking trend over time.""" keyword: str positions_over_time: list[PositionSnapshot] = field(default_factory=list) trend_direction: str = "stable" # improved, declined, stable, new, lost avg_position: float = 0.0 current_position: int = 0 start_position: int = 0 total_change: int = 0 volume: int = 0 intent: str = "informational" is_brand: bool = False def compute_trend(self): """Compute trend direction and average from position history.""" if not self.positions_over_time: self.trend_direction = "stable" return positions = [s.position for s in self.positions_over_time if s.position > 0] if not positions: self.trend_direction = "lost" return self.avg_position = sum(positions) / len(positions) self.current_position = positions[-1] self.start_position = positions[0] self.total_change = self.start_position - self.current_position # Determine trend using linear regression direction if len(positions) >= 2: n = len(positions) x_mean = (n - 1) / 2.0 y_mean = sum(positions) / n numerator = sum((i - x_mean) * (p - y_mean) for i, p in enumerate(positions)) denominator = sum((i - x_mean) ** 2 for i in range(n)) if denominator > 0: slope = numerator / denominator # Negative slope means position number decreasing = improving if slope < -0.5: self.trend_direction = "improved" elif slope > 0.5: self.trend_direction = "declined" else: self.trend_direction = "stable" else: self.trend_direction = "stable" if self.volume == 0 and self.positions_over_time: self.volume = self.positions_over_time[-1].volume @dataclass class TopMover: """Keyword with significant position change.""" keyword: str position_change: int current_position: int = 0 previous_position: int = 0 volume: int = 0 impact_score: float = 0.0 direction: str = "improved" def calculate_impact(self): """Calculate impact score: volume * CTR delta.""" old_ctr = CTR_WEIGHTS.get(self.previous_position, 0.0005) if self.previous_position > 0 else 0.0 new_ctr = CTR_WEIGHTS.get(self.current_position, 0.0005) if self.current_position > 0 else 0.0 ctr_delta = abs(new_ctr - old_ctr) self.impact_score = round(self.volume * ctr_delta, 2) self.direction = "improved" if self.position_change > 0 else "declined" @dataclass class SegmentReport: """Performance breakdown for a keyword segment.""" segment_name: str total_keywords: int = 0 avg_position: float = 0.0 avg_position_change: float = 0.0 visibility_score: float = 0.0 improved_count: int = 0 declined_count: int = 0 stable_count: int = 0 top_gainers: list[TopMover] = field(default_factory=list) top_losers: list[TopMover] = field(default_factory=list) @dataclass class CompetitorReport: """Competitor comparison for a reporting period.""" competitor: str our_visibility: float = 0.0 their_visibility: float = 0.0 overlap_keywords: int = 0 keywords_we_lead: int = 0 keywords_they_lead: int = 0 notable_gaps: list[dict] = field(default_factory=list) @dataclass class RankingReport: """Complete ranking performance report.""" target: str period_days: int = 30 period_start: str = "" period_end: str = "" total_keywords: int = 0 current_visibility: float = 0.0 previous_visibility: float = 0.0 visibility_change: float = 0.0 trend_summary: dict = field(default_factory=lambda: { "improved": 0, "declined": 0, "stable": 0, "new": 0, "lost": 0, }) top_gainers: list[TopMover] = field(default_factory=list) top_losers: list[TopMover] = field(default_factory=list) segments: list[SegmentReport] = field(default_factory=list) competitors: list[CompetitorReport] = field(default_factory=list) keyword_trends: list[RankingTrend] = field(default_factory=list) timestamp: str = "" def __post_init__(self): if not self.timestamp: self.timestamp = datetime.now().isoformat() if not self.period_end: self.period_end = datetime.now().strftime("%Y-%m-%d") if not self.period_start: start = datetime.now() - timedelta(days=self.period_days) self.period_start = start.strftime("%Y-%m-%d") def to_dict(self) -> dict: """Convert to JSON-serializable dictionary.""" return { "target": self.target, "period": { "days": self.period_days, "start": self.period_start, "end": self.period_end, }, "total_keywords": self.total_keywords, "visibility": { "current": round(self.current_visibility, 2), "previous": round(self.previous_visibility, 2), "change": round(self.visibility_change, 2), }, "trend_summary": self.trend_summary, "top_gainers": [asdict(m) for m in self.top_gainers], "top_losers": [asdict(m) for m in self.top_losers], "segments": [asdict(s) for s in self.segments], "competitors": [asdict(c) for c in self.competitors], "keyword_trends": [ { "keyword": t.keyword, "trend_direction": t.trend_direction, "avg_position": round(t.avg_position, 1), "current_position": t.current_position, "start_position": t.start_position, "total_change": t.total_change, "volume": t.volume, } for t in self.keyword_trends ], "timestamp": self.timestamp, } # --------------------------------------------------------------------------- # Ranking Reporter # --------------------------------------------------------------------------- class RankingReporter(BaseAsyncClient): """Generate ranking performance reports with trend analysis.""" def __init__(self): super().__init__( max_concurrent=5, requests_per_second=2.0, logger=logger, ) def _extract_domain_brand(self, target: str) -> list[str]: """Extract brand terms from the target domain name.""" parsed = urlparse(target) hostname = parsed.hostname or target parts = hostname.replace("www.", "").split(".") brand_parts = [] for part in parts: if part not in ("com", "co", "kr", "net", "org", "io", "ai", "www"): brand_parts.append(part.lower()) if "-" in part: brand_parts.extend(part.lower().split("-")) return list(set(brand_parts)) async def get_historical_positions( self, target: str, period_days: int = 30, ) -> list[RankingTrend]: """ Fetch historical position data from Ahrefs rank-tracker-overview with date range parameters. Returns list of RankingTrend objects with position snapshots over time. """ logger.info(f"Fetching historical positions for {target} ({period_days} days)") brand_terms = self._extract_domain_brand(target) end_date = datetime.now().strftime("%Y-%m-%d") start_date = (datetime.now() - timedelta(days=period_days)).strftime("%Y-%m-%d") raw_data = await self._call_rank_tracker_historical(target, start_date, end_date) trends: dict[str, RankingTrend] = {} for item in raw_data: keyword = item.get("keyword", "") if keyword not in trends: is_brand = any(term in keyword.lower() for term in brand_terms) trends[keyword] = RankingTrend( keyword=keyword, volume=item.get("volume", 0), intent=item.get("intent", "informational"), is_brand=is_brand, ) snapshot = PositionSnapshot( date=item.get("date", end_date), position=item.get("position", 0), volume=item.get("volume", 0), url=item.get("url", ""), ) trends[keyword].positions_over_time.append(snapshot) # Sort snapshots by date and compute trends for trend in trends.values(): trend.positions_over_time.sort(key=lambda s: s.date) trend.compute_trend() logger.info(f"Retrieved trends for {len(trends)} keywords") return list(trends.values()) async def _call_rank_tracker_historical( self, target: str, start_date: str, end_date: str, ) -> list[dict]: """Call Ahrefs rank-tracker-overview with date range.""" logger.info(f"Calling Ahrefs rank-tracker-overview ({start_date} to {end_date})...") try: import subprocess result = subprocess.run( ["mcp-cli", "call", "ahrefs/rank-tracker-overview", json.dumps({ "target": target, "date_from": start_date, "date_to": end_date, })], capture_output=True, text=True, timeout=60, ) if result.returncode == 0: data = json.loads(result.stdout) return data.get("keywords", data.get("results", [])) except (FileNotFoundError, subprocess.TimeoutExpired, json.JSONDecodeError): pass return [] def calculate_trends(self, trends: list[RankingTrend]) -> dict: """ Compute overall trend summary from keyword trends. Returns dict with improved/declined/stable/new/lost counts. """ summary = { "improved": 0, "declined": 0, "stable": 0, "new": 0, "lost": 0, } for trend in trends: direction = trend.trend_direction if direction in summary: summary[direction] += 1 else: summary["stable"] += 1 logger.info( f"Trend summary: improved={summary['improved']}, " f"declined={summary['declined']}, stable={summary['stable']}" ) return summary def find_top_movers( self, trends: list[RankingTrend], limit: int = 10, ) -> tuple[list[TopMover], list[TopMover]]: """ Find keywords with biggest position gains and losses. Returns tuple of (top_gainers, top_losers) sorted by impact score. """ gainers: list[TopMover] = [] losers: list[TopMover] = [] for trend in trends: if not trend.positions_over_time or len(trend.positions_over_time) < 2: continue first_pos = trend.start_position last_pos = trend.current_position if first_pos <= 0 or last_pos <= 0: continue change = first_pos - last_pos # positive = improved mover = TopMover( keyword=trend.keyword, position_change=change, current_position=last_pos, previous_position=first_pos, volume=trend.volume, ) mover.calculate_impact() if change > 0: gainers.append(mover) elif change < 0: losers.append(mover) # Sort by impact score descending gainers.sort(key=lambda m: m.impact_score, reverse=True) losers.sort(key=lambda m: m.impact_score, reverse=True) logger.info(f"Top movers: {len(gainers)} gainers, {len(losers)} losers") return gainers[:limit], losers[:limit] def _calculate_visibility_score(self, trends: list[RankingTrend], use_start: bool = False) -> float: """Calculate visibility score from trends (current or start positions).""" total_weighted = 0.0 total_volume = 0 for trend in trends: pos = trend.start_position if use_start else trend.current_position if pos <= 0 or pos > 100: continue volume = max(trend.volume, 1) total_volume += volume ctr = CTR_WEIGHTS.get(pos, 0.0005) total_weighted += volume * ctr if total_volume > 0: max_possible = total_volume * CTR_WEIGHTS[1] return (total_weighted / max_possible) * 100.0 return 0.0 def generate_segment_report(self, trends: list[RankingTrend]) -> list[SegmentReport]: """ Generate performance breakdown by keyword segment. Segments include: brand, non_brand, and by intent type. """ segment_map: dict[str, list[RankingTrend]] = {} for trend in trends: # Brand segment brand_key = "brand" if trend.is_brand else "non_brand" if brand_key not in segment_map: segment_map[brand_key] = [] segment_map[brand_key].append(trend) # Intent segment intent_key = f"intent_{trend.intent.lower()}" if trend.intent else "intent_informational" if intent_key not in segment_map: segment_map[intent_key] = [] segment_map[intent_key].append(trend) reports: list[SegmentReport] = [] for seg_name, seg_trends in sorted(segment_map.items()): if not seg_trends: continue active = [t for t in seg_trends if t.current_position > 0] avg_pos = sum(t.current_position for t in active) / len(active) if active else 0.0 avg_change = sum(t.total_change for t in seg_trends) / len(seg_trends) if seg_trends else 0.0 vis = self._calculate_visibility_score(seg_trends, use_start=False) improved = sum(1 for t in seg_trends if t.trend_direction == "improved") declined = sum(1 for t in seg_trends if t.trend_direction == "declined") stable = sum(1 for t in seg_trends if t.trend_direction == "stable") # Get top movers within segment seg_gainers, seg_losers = self.find_top_movers(seg_trends, limit=5) report = SegmentReport( segment_name=seg_name, total_keywords=len(seg_trends), avg_position=round(avg_pos, 1), avg_position_change=round(avg_change, 1), visibility_score=round(vis, 2), improved_count=improved, declined_count=declined, stable_count=stable, top_gainers=seg_gainers, top_losers=seg_losers, ) reports.append(report) return reports async def compare_with_competitor( self, target: str, competitor: str, period_days: int = 30, ) -> CompetitorReport: """ Period-over-period comparison with a competitor. Uses Ahrefs rank-tracker-competitors-stats for detailed comparison. """ logger.info(f"Comparing {target} vs {competitor} over {period_days} days") comp_data = await self._call_competitors_stats(target, competitor) report = CompetitorReport(competitor=competitor) if comp_data: report.our_visibility = comp_data.get("target_visibility", 0.0) report.their_visibility = comp_data.get("competitor_visibility", 0.0) report.overlap_keywords = comp_data.get("overlap_keywords", 0) report.keywords_we_lead = comp_data.get("target_better", 0) report.keywords_they_lead = comp_data.get("competitor_better", 0) # Extract notable gaps gaps = comp_data.get("keyword_gaps", []) report.notable_gaps = [ { "keyword": g.get("keyword", ""), "our_position": g.get("target_position", 0), "their_position": g.get("competitor_position", 0), "volume": g.get("volume", 0), } for g in gaps[:15] ] return report async def _call_competitors_stats(self, target: str, competitor: str) -> dict: """Call Ahrefs rank-tracker-competitors-stats MCP tool.""" logger.info("Calling Ahrefs rank-tracker-competitors-stats...") try: import subprocess result = subprocess.run( ["mcp-cli", "call", "ahrefs/rank-tracker-competitors-stats", json.dumps({"target": target, "competitor": competitor})], capture_output=True, text=True, timeout=60, ) if result.returncode == 0: return json.loads(result.stdout) except (FileNotFoundError, subprocess.TimeoutExpired, json.JSONDecodeError): pass return {} async def generate_report( self, target: str, period_days: int = 30, competitors: Optional[list[str]] = None, ) -> RankingReport: """ Orchestrate full ranking performance report generation. Args: target: Target website URL period_days: Reporting period in days competitors: List of competitor URLs to compare Returns: Complete RankingReport with trends, movers, segments, and comparisons """ logger.info(f"Generating ranking report for: {target} ({period_days} days)") report = RankingReport(target=target, period_days=period_days) # Step 1: Fetch historical position data trends = await self.get_historical_positions(target, period_days) if not trends: logger.warning("No historical data retrieved. Check Ahrefs project configuration.") return report report.keyword_trends = trends report.total_keywords = len(trends) # Step 2: Calculate trend summary report.trend_summary = self.calculate_trends(trends) # Step 3: Calculate visibility scores (current vs period start) report.current_visibility = self._calculate_visibility_score(trends, use_start=False) report.previous_visibility = self._calculate_visibility_score(trends, use_start=True) report.visibility_change = report.current_visibility - report.previous_visibility # Step 4: Find top movers gainers, losers = self.find_top_movers(trends, limit=10) report.top_gainers = gainers report.top_losers = losers # Step 5: Generate segment reports report.segments = self.generate_segment_report(trends) # Step 6: Compare with competitors if competitors: for competitor in competitors: comp_report = await self.compare_with_competitor( target, competitor, period_days, ) report.competitors.append(comp_report) logger.info(f"Report complete. Keywords: {report.total_keywords}") logger.info( f"Visibility: {report.previous_visibility:.2f} -> " f"{report.current_visibility:.2f} ({report.visibility_change:+.2f})" ) return report # --------------------------------------------------------------------------- # Output formatters # --------------------------------------------------------------------------- def format_text_report(report: RankingReport) -> str: """Format ranking report as human-readable text.""" lines = [] lines.append("=" * 60) lines.append(f"Ranking Performance Report: {report.target}") lines.append(f"Period: {report.period_start} ~ {report.period_end} ({report.period_days} days)") lines.append(f"Generated: {report.timestamp}") lines.append("=" * 60) # Visibility trend lines.append(f"\nVisibility Score:") lines.append(f" Current: {report.current_visibility:.2f}") lines.append(f" Previous: {report.previous_visibility:.2f}") change_sign = "+" if report.visibility_change >= 0 else "" lines.append(f" Change: {change_sign}{report.visibility_change:.2f}") # Trend summary ts = report.trend_summary lines.append(f"\nKeyword Trends ({report.total_keywords} total):") lines.append(f" Improved: {ts.get('improved', 0)}") lines.append(f" Declined: {ts.get('declined', 0)}") lines.append(f" Stable: {ts.get('stable', 0)}") lines.append(f" New: {ts.get('new', 0)}") lines.append(f" Lost: {ts.get('lost', 0)}") # Top gainers if report.top_gainers: lines.append(f"\nTop Gainers:") lines.append("-" * 60) for m in report.top_gainers: lines.append( f" {m.keyword}: {m.previous_position} -> {m.current_position} " f"(+{m.position_change}) | Vol: {m.volume} | Impact: {m.impact_score}" ) # Top losers if report.top_losers: lines.append(f"\nTop Losers:") lines.append("-" * 60) for m in report.top_losers: lines.append( f" {m.keyword}: {m.previous_position} -> {m.current_position} " f"({m.position_change}) | Vol: {m.volume} | Impact: {m.impact_score}" ) # Segments if report.segments: lines.append(f"\nSegment Breakdown:") lines.append("-" * 60) for seg in report.segments: lines.append( f" {seg.segment_name}: {seg.total_keywords} kw, " f"avg pos {seg.avg_position}, vis {seg.visibility_score}, " f"improved {seg.improved_count} / declined {seg.declined_count}" ) # Competitors if report.competitors: lines.append(f"\nCompetitor Comparison:") lines.append("-" * 60) for comp in report.competitors: lines.append(f" vs {comp.competitor}:") lines.append(f" Our visibility: {comp.our_visibility:.2f}") lines.append(f" Their visibility: {comp.their_visibility:.2f}") lines.append(f" Overlap: {comp.overlap_keywords} keywords") lines.append(f" We lead: {comp.keywords_we_lead}") lines.append(f" They lead: {comp.keywords_they_lead}") if comp.notable_gaps: lines.append(f" Notable gaps:") for gap in comp.notable_gaps[:5]: lines.append( f" {gap['keyword']}: us #{gap['our_position']} " f"vs them #{gap['their_position']} (vol: {gap['volume']})" ) lines.append("\n" + "=" * 60) return "\n".join(lines) # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Ranking Reporter - Generate ranking performance reports with trends", ) parser.add_argument( "--target", required=True, help="Target website URL (e.g., https://example.com)", ) parser.add_argument( "--period", type=int, default=30, help="Reporting period in days (default: 30)", ) parser.add_argument( "--competitor", action="append", dest="competitors", default=[], help="Competitor URL to compare (repeatable)", ) parser.add_argument( "--json", action="store_true", dest="json_output", help="Output in JSON format", ) parser.add_argument( "--output", type=str, default=None, help="Save output to file path", ) return parser.parse_args() async def main(): args = parse_args() reporter = RankingReporter() report = await reporter.generate_report( target=args.target, period_days=args.period, competitors=args.competitors, ) if args.json_output: output = json.dumps(report.to_dict(), ensure_ascii=False, indent=2) else: output = format_text_report(report) if args.output: with open(args.output, "w", encoding="utf-8") as f: f.write(output) logger.info(f"Output saved to: {args.output}") else: print(output) reporter.print_stats() if __name__ == "__main__": asyncio.run(main())