""" Position Tracker - Keyword Ranking Monitor via Ahrefs Rank Tracker ================================================================== Purpose: Monitor keyword positions, detect changes, calculate visibility scores Python: 3.10+ Usage: python position_tracker.py --target https://example.com --json python position_tracker.py --target https://example.com --threshold 5 --json python position_tracker.py --target https://example.com --segment brand --json python position_tracker.py --target https://example.com --competitor https://comp1.com --json """ import argparse import asyncio import json import logging import math import sys from dataclasses import dataclass, field, asdict from datetime import datetime from typing import Optional from urllib.parse import urlparse from base_client import BaseAsyncClient, config logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # CTR curve weights for visibility score (position 1-100) # Based on industry-standard organic CTR curves # --------------------------------------------------------------------------- CTR_WEIGHTS: dict[int, float] = { 1: 0.300, 2: 0.150, 3: 0.100, 4: 0.070, 5: 0.050, 6: 0.038, 7: 0.030, 8: 0.025, 9: 0.020, 10: 0.018, } # Positions 11-20 get diminishing CTR for _p in range(11, 21): CTR_WEIGHTS[_p] = round(0.015 - (_p - 11) * 0.001, 4) # Positions 21-50 get minimal CTR for _p in range(21, 51): CTR_WEIGHTS[_p] = round(max(0.005 - (_p - 21) * 0.0001, 0.001), 4) # Positions 51-100 get near-zero CTR for _p in range(51, 101): CTR_WEIGHTS[_p] = 0.0005 # --------------------------------------------------------------------------- # Data classes # --------------------------------------------------------------------------- @dataclass class KeywordPosition: """Single keyword ranking position.""" keyword: str position: int previous_position: Optional[int] = None change: int = 0 volume: int = 0 url: str = "" intent: str = "informational" is_brand: bool = False def __post_init__(self): if self.previous_position is not None: self.change = self.previous_position - self.position @dataclass class VisibilityScore: """Weighted visibility score based on CTR curve.""" score: float = 0.0 top3: int = 0 top10: int = 0 top20: int = 0 top50: int = 0 top100: int = 0 total_keywords: int = 0 @property def distribution(self) -> dict: return { "top3": self.top3, "top10": self.top10, "top20": self.top20, "top50": self.top50, "top100": self.top100, } @dataclass class PositionAlert: """Alert for significant position change.""" keyword: str old_position: int new_position: int change: int volume: int = 0 severity: str = "medium" def __post_init__(self): abs_change = abs(self.change) if abs_change >= 20: self.severity = "critical" elif abs_change >= 10: self.severity = "high" elif abs_change >= 5: self.severity = "medium" else: self.severity = "low" @dataclass class CompetitorComparison: """Competitor ranking comparison result.""" competitor: str overlap_keywords: int = 0 competitor_better: int = 0 target_better: int = 0 avg_position_gap: float = 0.0 top_gaps: list = field(default_factory=list) @dataclass class SegmentData: """Keyword segment aggregation.""" name: str keywords: int = 0 avg_position: float = 0.0 visibility: float = 0.0 improved: int = 0 declined: int = 0 stable: int = 0 @dataclass class TrackingResult: """Complete position tracking result.""" target: str total_keywords: int = 0 visibility_score: float = 0.0 visibility: Optional[VisibilityScore] = None positions: list[KeywordPosition] = field(default_factory=list) changes: dict = field(default_factory=lambda: { "improved": 0, "declined": 0, "stable": 0, "new": 0, "lost": 0, }) alerts: list[PositionAlert] = field(default_factory=list) segments: dict[str, SegmentData] = field(default_factory=dict) competitors: list[CompetitorComparison] = field(default_factory=list) timestamp: str = "" def __post_init__(self): if not self.timestamp: self.timestamp = datetime.now().isoformat() def to_dict(self) -> dict: """Convert to JSON-serializable dictionary.""" result = { "target": self.target, "total_keywords": self.total_keywords, "visibility_score": round(self.visibility_score, 2), "positions": self.visibility.distribution if self.visibility else {}, "changes": self.changes, "alerts": [asdict(a) for a in self.alerts], "segments": { k: asdict(v) for k, v in self.segments.items() }, "competitors": [asdict(c) for c in self.competitors], "keyword_details": [asdict(p) for p in self.positions], "timestamp": self.timestamp, } return result # --------------------------------------------------------------------------- # Position Tracker # --------------------------------------------------------------------------- class PositionTracker(BaseAsyncClient): """Track keyword ranking positions via Ahrefs Rank Tracker.""" def __init__(self): super().__init__( max_concurrent=5, requests_per_second=2.0, logger=logger, ) self.brand_terms: list[str] = [] def _extract_domain_brand(self, target: str) -> list[str]: """Extract brand terms from the target domain name.""" parsed = urlparse(target) hostname = parsed.hostname or target # Remove TLD and www prefix parts = hostname.replace("www.", "").split(".") brand_parts = [] for part in parts: if part not in ("com", "co", "kr", "net", "org", "io", "ai", "www"): brand_parts.append(part.lower()) # Also split camelCase and hyphenated forms if "-" in part: brand_parts.extend(part.lower().split("-")) return list(set(brand_parts)) async def get_project_keywords(self, target: str) -> list[dict]: """ Fetch tracked keywords from Ahrefs management-project-keywords. Uses Ahrefs MCP tool: management-project-keywords Returns list of keyword dicts with keyword, volume, intent info. """ logger.info(f"Fetching project keywords for: {target}") # Step 1: Get project list to find matching project projects = await self._call_ahrefs_projects(target) if not projects: logger.warning(f"No Ahrefs project found for {target}. Using rank-tracker-overview directly.") return [] project_id = projects[0].get("id", "") # Step 2: Fetch keywords for the project keywords_data = await self._call_ahrefs_project_keywords(project_id) return keywords_data async def _call_ahrefs_projects(self, target: str) -> list[dict]: """ Call Ahrefs management-projects MCP tool. In production, this calls the MCP tool. For standalone, reads from config/cache. """ # Simulated MCP call structure - in production this calls: # mcp__ahrefs__management-projects logger.info("Calling Ahrefs management-projects...") try: import subprocess result = subprocess.run( ["mcp-cli", "call", "ahrefs/management-projects", json.dumps({})], capture_output=True, text=True, timeout=30, ) if result.returncode == 0: return json.loads(result.stdout).get("projects", []) except (FileNotFoundError, subprocess.TimeoutExpired, json.JSONDecodeError): pass # Return empty if MCP not available - caller handles gracefully return [] async def _call_ahrefs_project_keywords(self, project_id: str) -> list[dict]: """ Call Ahrefs management-project-keywords MCP tool. """ logger.info(f"Calling Ahrefs management-project-keywords for project: {project_id}") try: import subprocess result = subprocess.run( ["mcp-cli", "call", "ahrefs/management-project-keywords", json.dumps({"project_id": project_id})], capture_output=True, text=True, timeout=30, ) if result.returncode == 0: return json.loads(result.stdout).get("keywords", []) except (FileNotFoundError, subprocess.TimeoutExpired, json.JSONDecodeError): pass return [] async def get_current_positions(self, target: str) -> list[KeywordPosition]: """ Fetch current keyword positions via Ahrefs rank-tracker-overview. Returns list of KeywordPosition objects with current and previous positions. """ logger.info(f"Fetching current positions for: {target}") self.brand_terms = self._extract_domain_brand(target) raw_data = await self._call_rank_tracker_overview(target) positions: list[KeywordPosition] = [] for item in raw_data: keyword = item.get("keyword", "") current_pos = item.get("position", 0) prev_pos = item.get("previous_position") volume = item.get("volume", 0) url = item.get("url", "") intent = item.get("intent", "informational") # Determine if brand keyword is_brand = self._is_brand_keyword(keyword) kp = KeywordPosition( keyword=keyword, position=current_pos, previous_position=prev_pos, volume=volume, url=url, intent=intent, is_brand=is_brand, ) positions.append(kp) logger.info(f"Retrieved {len(positions)} keyword positions") return positions async def _call_rank_tracker_overview(self, target: str) -> list[dict]: """ Call Ahrefs rank-tracker-overview MCP tool. """ logger.info(f"Calling Ahrefs rank-tracker-overview for: {target}") try: import subprocess result = subprocess.run( ["mcp-cli", "call", "ahrefs/rank-tracker-overview", json.dumps({"target": target})], capture_output=True, text=True, timeout=60, ) if result.returncode == 0: data = json.loads(result.stdout) return data.get("keywords", data.get("results", [])) except (FileNotFoundError, subprocess.TimeoutExpired, json.JSONDecodeError): pass return [] def _is_brand_keyword(self, keyword: str) -> bool: """Check if a keyword is brand-related based on domain name.""" keyword_lower = keyword.lower() for term in self.brand_terms: if term in keyword_lower: return True return False def detect_changes( self, positions: list[KeywordPosition], threshold: int = 3, ) -> tuple[dict, list[PositionAlert]]: """ Detect significant position changes and generate alerts. Args: positions: List of current keyword positions with previous data threshold: Minimum position change to trigger an alert Returns: Tuple of (change_summary_dict, list_of_alerts) """ changes = { "improved": 0, "declined": 0, "stable": 0, "new": 0, "lost": 0, } alerts: list[PositionAlert] = [] for kp in positions: if kp.previous_position is None: changes["new"] += 1 continue if kp.position == 0 and kp.previous_position > 0: changes["lost"] += 1 alert = PositionAlert( keyword=kp.keyword, old_position=kp.previous_position, new_position=0, change=-kp.previous_position, volume=kp.volume, ) alerts.append(alert) continue change = kp.change # positive = improved, negative = declined if change > 0: changes["improved"] += 1 elif change < 0: changes["declined"] += 1 else: changes["stable"] += 1 # Generate alert if change exceeds threshold if abs(change) >= threshold: alert = PositionAlert( keyword=kp.keyword, old_position=kp.previous_position, new_position=kp.position, change=change, volume=kp.volume, ) alerts.append(alert) # Sort alerts by severity (critical first) then by volume (high first) severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3} alerts.sort(key=lambda a: (severity_order.get(a.severity, 4), -a.volume)) logger.info( f"Changes detected - improved: {changes['improved']}, " f"declined: {changes['declined']}, stable: {changes['stable']}, " f"new: {changes['new']}, lost: {changes['lost']}" ) logger.info(f"Alerts generated: {len(alerts)} (threshold: {threshold})") return changes, alerts def calculate_visibility(self, positions: list[KeywordPosition]) -> VisibilityScore: """ Calculate weighted visibility score based on CTR curve. Visibility = sum(keyword_volume * ctr_weight_for_position) / sum(keyword_volume) Score normalized to 0-100 scale. """ vis = VisibilityScore() total_weighted = 0.0 total_volume = 0 for kp in positions: if kp.position <= 0 or kp.position > 100: continue vis.total_keywords += 1 volume = max(kp.volume, 1) # Avoid zero volume total_volume += volume # Position bucket counting if kp.position <= 3: vis.top3 += 1 if kp.position <= 10: vis.top10 += 1 if kp.position <= 20: vis.top20 += 1 if kp.position <= 50: vis.top50 += 1 if kp.position <= 100: vis.top100 += 1 # Weighted visibility ctr = CTR_WEIGHTS.get(kp.position, 0.0005) total_weighted += volume * ctr if total_volume > 0: # Normalize: max possible is if all keywords were position 1 max_possible = total_volume * CTR_WEIGHTS[1] vis.score = (total_weighted / max_possible) * 100.0 else: vis.score = 0.0 logger.info( f"Visibility score: {vis.score:.2f} | " f"Top3: {vis.top3}, Top10: {vis.top10}, Top20: {vis.top20}" ) return vis def segment_keywords( self, positions: list[KeywordPosition], filter_segment: Optional[str] = None, ) -> dict[str, SegmentData]: """ Segment keywords into brand/non-brand and by intent type. Args: positions: List of keyword positions filter_segment: Optional filter - 'brand', 'non_brand', or intent type Returns: Dictionary of segment name to SegmentData """ segments: dict[str, list[KeywordPosition]] = { "brand": [], "non_brand": [], } intent_segments: dict[str, list[KeywordPosition]] = {} for kp in positions: # Brand segmentation if kp.is_brand: segments["brand"].append(kp) else: segments["non_brand"].append(kp) # Intent segmentation intent_key = kp.intent.lower() if kp.intent else "informational" if intent_key not in intent_segments: intent_segments[intent_key] = [] intent_segments[intent_key].append(kp) # Merge intent segments into main segments for intent_key, kps in intent_segments.items(): segments[f"intent_{intent_key}"] = kps # Calculate segment stats result: dict[str, SegmentData] = {} for seg_name, kps in segments.items(): if filter_segment and seg_name != filter_segment: continue if not kps: continue active_positions = [kp for kp in kps if kp.position > 0] avg_pos = ( sum(kp.position for kp in active_positions) / len(active_positions) if active_positions else 0.0 ) vis = self.calculate_visibility(kps) improved = sum(1 for kp in kps if kp.change > 0) declined = sum(1 for kp in kps if kp.change < 0) stable = sum(1 for kp in kps if kp.change == 0 and kp.previous_position is not None) result[seg_name] = SegmentData( name=seg_name, keywords=len(kps), avg_position=round(avg_pos, 1), visibility=round(vis.score, 2), improved=improved, declined=declined, stable=stable, ) return result async def compare_competitors( self, target: str, competitors: list[str], ) -> list[CompetitorComparison]: """ Compare ranking positions against competitors. Uses Ahrefs rank-tracker-competitors-overview MCP tool. """ comparisons: list[CompetitorComparison] = [] for competitor in competitors: logger.info(f"Comparing with competitor: {competitor}") comp_data = await self._call_competitors_overview(target, competitor) comparison = CompetitorComparison(competitor=competitor) if comp_data: comparison.overlap_keywords = comp_data.get("overlap_keywords", 0) comparison.competitor_better = comp_data.get("competitor_better", 0) comparison.target_better = comp_data.get("target_better", 0) comparison.avg_position_gap = comp_data.get("avg_position_gap", 0.0) # Extract top gaps (keywords where competitor outranks us most) top_gaps = comp_data.get("top_gaps", []) comparison.top_gaps = top_gaps[:10] comparisons.append(comparison) return comparisons async def _call_competitors_overview(self, target: str, competitor: str) -> dict: """ Call Ahrefs rank-tracker-competitors-overview MCP tool. """ logger.info(f"Calling Ahrefs rank-tracker-competitors-overview...") try: import subprocess result = subprocess.run( ["mcp-cli", "call", "ahrefs/rank-tracker-competitors-overview", json.dumps({"target": target, "competitor": competitor})], capture_output=True, text=True, timeout=60, ) if result.returncode == 0: return json.loads(result.stdout) except (FileNotFoundError, subprocess.TimeoutExpired, json.JSONDecodeError): pass return {} async def analyze( self, target: str, threshold: int = 3, competitors: Optional[list[str]] = None, segment_filter: Optional[str] = None, ) -> TrackingResult: """ Orchestrate full position tracking analysis. Args: target: Target website URL threshold: Position change threshold for alerts competitors: List of competitor URLs to compare segment_filter: Optional segment filter (brand, non_brand, intent_*) Returns: Complete TrackingResult with all analysis data """ logger.info(f"Starting position tracking analysis for: {target}") logger.info(f"Threshold: {threshold}, Competitors: {competitors or 'none'}") result = TrackingResult(target=target) # Step 1: Fetch current positions positions = await self.get_current_positions(target) if not positions: logger.warning("No position data retrieved. Check Ahrefs project configuration.") return result result.positions = positions result.total_keywords = len(positions) # Step 2: Detect changes and generate alerts changes, alerts = self.detect_changes(positions, threshold) result.changes = changes result.alerts = alerts # Step 3: Calculate visibility score visibility = self.calculate_visibility(positions) result.visibility = visibility result.visibility_score = visibility.score # Step 4: Segment keywords segments = self.segment_keywords(positions, segment_filter) result.segments = segments # Step 5: Compare with competitors (if provided) if competitors: comp_results = await self.compare_competitors(target, competitors) result.competitors = comp_results logger.info(f"Analysis complete. Total keywords: {result.total_keywords}") logger.info(f"Visibility score: {result.visibility_score:.2f}") return result # --------------------------------------------------------------------------- # Output formatters # --------------------------------------------------------------------------- def format_text_report(result: TrackingResult) -> str: """Format tracking result as human-readable text report.""" lines = [] lines.append("=" * 60) lines.append(f"Position Tracking Report: {result.target}") lines.append(f"Timestamp: {result.timestamp}") lines.append("=" * 60) # Visibility overview lines.append(f"\nVisibility Score: {result.visibility_score:.2f}/100") lines.append(f"Total Keywords Tracked: {result.total_keywords}") if result.visibility: vis = result.visibility lines.append(f"\nPosition Distribution:") lines.append(f" Top 3: {vis.top3}") lines.append(f" Top 10: {vis.top10}") lines.append(f" Top 20: {vis.top20}") lines.append(f" Top 50: {vis.top50}") lines.append(f" Top 100: {vis.top100}") # Changes summary ch = result.changes lines.append(f"\nPosition Changes:") lines.append(f" Improved: {ch.get('improved', 0)}") lines.append(f" Declined: {ch.get('declined', 0)}") lines.append(f" Stable: {ch.get('stable', 0)}") lines.append(f" New: {ch.get('new', 0)}") lines.append(f" Lost: {ch.get('lost', 0)}") # Alerts if result.alerts: lines.append(f"\nAlerts ({len(result.alerts)}):") lines.append("-" * 60) for alert in result.alerts[:20]: direction = "UP" if alert.change > 0 else "DOWN" lines.append( f" [{alert.severity.upper()}] {alert.keyword}: " f"{alert.old_position} -> {alert.new_position} " f"({direction} {abs(alert.change)}) | Vol: {alert.volume}" ) # Segments if result.segments: lines.append(f"\nSegments:") lines.append("-" * 60) for name, seg in result.segments.items(): lines.append( f" {name}: {seg.keywords} keywords, " f"avg pos {seg.avg_position}, " f"vis {seg.visibility}" ) # Competitors if result.competitors: lines.append(f"\nCompetitor Comparison:") lines.append("-" * 60) for comp in result.competitors: lines.append(f" vs {comp.competitor}:") lines.append(f" Overlap: {comp.overlap_keywords} keywords") lines.append(f" We win: {comp.target_better}") lines.append(f" They win: {comp.competitor_better}") lines.append(f" Avg gap: {comp.avg_position_gap:.1f}") lines.append("\n" + "=" * 60) return "\n".join(lines) # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Position Tracker - Monitor keyword rankings via Ahrefs Rank Tracker", ) parser.add_argument( "--target", required=True, help="Target website URL (e.g., https://example.com)", ) parser.add_argument( "--threshold", type=int, default=3, help="Position change threshold for alerts (default: 3)", ) parser.add_argument( "--segment", choices=["brand", "non_brand", "intent_informational", "intent_commercial", "intent_transactional", "intent_navigational"], default=None, help="Filter results by keyword segment", ) parser.add_argument( "--competitor", action="append", dest="competitors", default=[], help="Competitor URL to compare (repeatable)", ) parser.add_argument( "--json", action="store_true", dest="json_output", help="Output in JSON format", ) parser.add_argument( "--output", type=str, default=None, help="Save output to file path", ) return parser.parse_args() async def main(): args = parse_args() tracker = PositionTracker() result = await tracker.analyze( target=args.target, threshold=args.threshold, competitors=args.competitors, segment_filter=args.segment, ) if args.json_output: output = json.dumps(result.to_dict(), ensure_ascii=False, indent=2) else: output = format_text_report(result) if args.output: with open(args.output, "w", encoding="utf-8") as f: f.write(output) logger.info(f"Output saved to: {args.output}") else: print(output) tracker.print_stats() if __name__ == "__main__": asyncio.run(main())