Files
our-claude-skills/custom-skills/21-seo-position-tracking/code/scripts/position_tracker.py
Andrew Yim a3ff965b87 Add SEO skills 19-28, 31-32 with full Python implementations
12 new skills: Keyword Strategy, SERP Analysis, Position Tracking,
Link Building, Content Strategy, E-Commerce SEO, KPI Framework,
International SEO, AI Visibility, Knowledge Graph, Competitor Intel,
and Crawl Budget. ~20K lines of Python across 25 domain scripts.
Updated skill 11 pipeline table and repo CLAUDE.md.
Enhanced skill 18 local SEO workflow from jamie.clinic audit.

Note: Skill 26 hreflang_validator.py pending (content filter block).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-13 12:05:59 +09:00

787 lines
26 KiB
Python

"""
Position Tracker - Keyword Ranking Monitor via Ahrefs Rank Tracker
==================================================================
Purpose: Monitor keyword positions, detect changes, calculate visibility scores
Python: 3.10+
Usage:
python position_tracker.py --target https://example.com --json
python position_tracker.py --target https://example.com --threshold 5 --json
python position_tracker.py --target https://example.com --segment brand --json
python position_tracker.py --target https://example.com --competitor https://comp1.com --json
"""
import argparse
import asyncio
import json
import logging
import math
import sys
from dataclasses import dataclass, field, asdict
from datetime import datetime
from typing import Optional
from urllib.parse import urlparse
from base_client import BaseAsyncClient, config
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# CTR curve weights for visibility score (position 1-100)
# Based on industry-standard organic CTR curves
# ---------------------------------------------------------------------------
# Expected organic click-through rate per SERP position. Head positions use
# fixed empirical figures; the tail decays linearly and bottoms out at a small
# floor so deep rankings still contribute a little to visibility.
CTR_WEIGHTS: dict[int, float] = {
    1: 0.300,
    2: 0.150,
    3: 0.100,
    4: 0.070,
    5: 0.050,
    6: 0.038,
    7: 0.030,
    8: 0.025,
    9: 0.020,
    10: 0.018,
}
# Positions 11-20 get diminishing CTR (linear decay from 1.5% to 0.6%)
CTR_WEIGHTS.update({p: round(0.015 - (p - 11) * 0.001, 4) for p in range(11, 21)})
# Positions 21-50 get minimal CTR (linear decay with a 0.1% floor)
CTR_WEIGHTS.update(
    {p: round(max(0.005 - (p - 21) * 0.0001, 0.001), 4) for p in range(21, 51)}
)
# Positions 51-100 get near-zero CTR
CTR_WEIGHTS.update({p: 0.0005 for p in range(51, 101)})
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
@dataclass
class KeywordPosition:
    """Single keyword ranking position.

    ``change`` is derived in ``__post_init__`` when previous data exists:
    a positive value means the keyword moved up the rankings (its previous
    position number was larger than the current one).
    """

    keyword: str
    position: int
    previous_position: Optional[int] = None
    change: int = 0
    volume: int = 0
    url: str = ""
    intent: str = "informational"
    is_brand: bool = False

    def __post_init__(self):
        prev = self.previous_position
        if prev is not None:
            self.change = prev - self.position
@dataclass
class VisibilityScore:
    """Weighted visibility score based on CTR curve."""

    score: float = 0.0
    top3: int = 0
    top10: int = 0
    top20: int = 0
    top50: int = 0
    top100: int = 0
    total_keywords: int = 0

    @property
    def distribution(self) -> dict:
        """Cumulative position-bucket counts keyed by bucket name."""
        buckets = ("top3", "top10", "top20", "top50", "top100")
        return {name: getattr(self, name) for name in buckets}
@dataclass
class PositionAlert:
    """Alert for significant position change."""

    keyword: str
    old_position: int
    new_position: int
    change: int
    volume: int = 0
    severity: str = "medium"

    def __post_init__(self):
        # Bucket severity by the magnitude of the move; largest floor wins.
        magnitude = abs(self.change)
        for floor, label in ((20, "critical"), (10, "high"), (5, "medium")):
            if magnitude >= floor:
                self.severity = label
                break
        else:
            self.severity = "low"
@dataclass
class CompetitorComparison:
    """Competitor ranking comparison result.

    Populated from the Ahrefs rank-tracker-competitors-overview payload by
    ``PositionTracker.compare_competitors``; all counts default to zero when
    the MCP call returns no data.
    """

    competitor: str  # competitor URL as supplied on the command line
    overlap_keywords: int = 0  # "overlap_keywords" from the Ahrefs payload
    competitor_better: int = 0  # keywords where the competitor outranks the target
    target_better: int = 0  # keywords where the target outranks the competitor
    avg_position_gap: float = 0.0  # mean position gap reported by Ahrefs
    top_gaps: list = field(default_factory=list)  # worst gaps; capped at 10 entries by the caller
@dataclass
class SegmentData:
    """Keyword segment aggregation.

    Produced by ``PositionTracker.segment_keywords`` for brand/non-brand and
    per-intent segments.
    """

    name: str  # segment key: "brand", "non_brand", or "intent_<type>"
    keywords: int = 0  # number of keywords in the segment
    avg_position: float = 0.0  # mean position over keywords with position > 0
    visibility: float = 0.0  # CTR-weighted visibility score (0-100) for the segment
    improved: int = 0  # keywords with a positive position change
    declined: int = 0  # keywords with a negative position change
    stable: int = 0  # unchanged keywords that have previous-position data
@dataclass
class TrackingResult:
    """Complete position tracking result."""

    target: str
    total_keywords: int = 0
    visibility_score: float = 0.0
    visibility: Optional[VisibilityScore] = None
    positions: list[KeywordPosition] = field(default_factory=list)
    changes: dict = field(default_factory=lambda: {
        "improved": 0, "declined": 0, "stable": 0, "new": 0, "lost": 0,
    })
    alerts: list[PositionAlert] = field(default_factory=list)
    segments: dict[str, SegmentData] = field(default_factory=dict)
    competitors: list[CompetitorComparison] = field(default_factory=list)
    timestamp: str = ""

    def __post_init__(self):
        # Stamp the result at construction time unless a timestamp was given.
        self.timestamp = self.timestamp or datetime.now().isoformat()

    def to_dict(self) -> dict:
        """Convert to JSON-serializable dictionary."""
        distribution = {} if self.visibility is None else self.visibility.distribution
        return {
            "target": self.target,
            "total_keywords": self.total_keywords,
            "visibility_score": round(self.visibility_score, 2),
            "positions": distribution,
            "changes": self.changes,
            "alerts": [asdict(alert) for alert in self.alerts],
            "segments": {name: asdict(seg) for name, seg in self.segments.items()},
            "competitors": [asdict(comp) for comp in self.competitors],
            "keyword_details": [asdict(kp) for kp in self.positions],
            "timestamp": self.timestamp,
        }
# ---------------------------------------------------------------------------
# Position Tracker
# ---------------------------------------------------------------------------
class PositionTracker(BaseAsyncClient):
    """Track keyword ranking positions via Ahrefs Rank Tracker.

    All Ahrefs data is fetched through MCP tools invoked via the ``mcp-cli``
    binary (see ``_run_mcp_tool``). When the CLI is unavailable, the fetch
    helpers return empty results and callers degrade gracefully.
    """

    # Hostname labels that are TLD/infrastructure parts, never brand terms.
    _NON_BRAND_PARTS = ("com", "co", "kr", "net", "org", "io", "ai", "www")

    def __init__(self):
        super().__init__(
            max_concurrent=5,
            requests_per_second=2.0,
            logger=logger,
        )
        # Lowercased brand terms derived from the target domain; populated by
        # get_current_positions() before keywords are classified.
        self.brand_terms: list[str] = []

    def _extract_domain_brand(self, target: str) -> list[str]:
        """Extract brand terms from the target domain name.

        Each dot-separated hostname label that is not a known TLD part is a
        candidate brand term; hyphenated labels also contribute their
        individual words. Returns a de-duplicated, lowercased list.
        """
        parsed = urlparse(target)
        hostname = parsed.hostname or target
        # Remove TLD parts and www prefix
        parts = hostname.replace("www.", "").split(".")
        brand_parts: list[str] = []
        for part in parts:
            if part in self._NON_BRAND_PARTS:
                continue
            brand_parts.append(part.lower())
            # Hyphenated names also contribute their individual words.
            if "-" in part:
                brand_parts.extend(part.lower().split("-"))
        return list(set(brand_parts))

    async def _run_mcp_tool(
        self,
        tool: str,
        params: dict,
        timeout: int = 30,
    ) -> Optional[dict]:
        """Invoke an Ahrefs MCP tool through the ``mcp-cli`` binary.

        The blocking ``subprocess.run`` call is moved to a worker thread via
        ``asyncio.to_thread`` so it does not stall the event loop.

        Returns:
            Parsed JSON payload on success, or None when the CLI is missing,
            the call times out, or the output is not valid JSON. Callers
            treat None as "MCP not available" and degrade gracefully.
        """
        import subprocess
        try:
            result = await asyncio.to_thread(
                subprocess.run,
                ["mcp-cli", "call", tool, json.dumps(params)],
                capture_output=True, text=True, timeout=timeout,
            )
            if result.returncode == 0:
                return json.loads(result.stdout)
        except (FileNotFoundError, subprocess.TimeoutExpired, json.JSONDecodeError):
            pass
        return None

    async def get_project_keywords(self, target: str) -> list[dict]:
        """
        Fetch tracked keywords from Ahrefs management-project-keywords.
        Uses Ahrefs MCP tool: management-project-keywords
        Returns list of keyword dicts with keyword, volume, intent info.
        """
        logger.info(f"Fetching project keywords for: {target}")
        # Step 1: Get project list to find matching project
        projects = await self._call_ahrefs_projects(target)
        if not projects:
            logger.warning(f"No Ahrefs project found for {target}. Using rank-tracker-overview directly.")
            return []
        project_id = projects[0].get("id", "")
        # Step 2: Fetch keywords for the project
        return await self._call_ahrefs_project_keywords(project_id)

    async def _call_ahrefs_projects(self, target: str) -> list[dict]:
        """Call the Ahrefs management-projects MCP tool.

        Returns the project list, or [] when MCP is not available.
        """
        logger.info("Calling Ahrefs management-projects...")
        data = await self._run_mcp_tool("ahrefs/management-projects", {})
        return data.get("projects", []) if data else []

    async def _call_ahrefs_project_keywords(self, project_id: str) -> list[dict]:
        """Call the Ahrefs management-project-keywords MCP tool."""
        logger.info(f"Calling Ahrefs management-project-keywords for project: {project_id}")
        data = await self._run_mcp_tool(
            "ahrefs/management-project-keywords",
            {"project_id": project_id},
        )
        return data.get("keywords", []) if data else []

    async def get_current_positions(self, target: str) -> list[KeywordPosition]:
        """
        Fetch current keyword positions via Ahrefs rank-tracker-overview.
        Returns list of KeywordPosition objects with current and previous positions.
        """
        logger.info(f"Fetching current positions for: {target}")
        # Brand terms must be known before keywords are classified below.
        self.brand_terms = self._extract_domain_brand(target)
        raw_data = await self._call_rank_tracker_overview(target)
        positions: list[KeywordPosition] = []
        for item in raw_data:
            keyword = item.get("keyword", "")
            positions.append(KeywordPosition(
                keyword=keyword,
                position=item.get("position", 0),
                previous_position=item.get("previous_position"),
                volume=item.get("volume", 0),
                url=item.get("url", ""),
                intent=item.get("intent", "informational"),
                is_brand=self._is_brand_keyword(keyword),
            ))
        logger.info(f"Retrieved {len(positions)} keyword positions")
        return positions

    async def _call_rank_tracker_overview(self, target: str) -> list[dict]:
        """Call the Ahrefs rank-tracker-overview MCP tool."""
        logger.info(f"Calling Ahrefs rank-tracker-overview for: {target}")
        data = await self._run_mcp_tool(
            "ahrefs/rank-tracker-overview",
            {"target": target},
            timeout=60,
        )
        if data is None:
            return []
        # Payload key varies between tool versions; accept either.
        return data.get("keywords", data.get("results", []))

    def _is_brand_keyword(self, keyword: str) -> bool:
        """Check if a keyword is brand-related based on domain name."""
        keyword_lower = keyword.lower()
        return any(term in keyword_lower for term in self.brand_terms)

    def detect_changes(
        self,
        positions: list[KeywordPosition],
        threshold: int = 3,
    ) -> tuple[dict, list[PositionAlert]]:
        """
        Detect significant position changes and generate alerts.
        Args:
            positions: List of current keyword positions with previous data
            threshold: Minimum position change to trigger an alert
        Returns:
            Tuple of (change_summary_dict, list_of_alerts)
        """
        changes = {
            "improved": 0,
            "declined": 0,
            "stable": 0,
            "new": 0,
            "lost": 0,
        }
        alerts: list[PositionAlert] = []
        for kp in positions:
            # No previous data -> keyword is newly tracked; nothing to compare.
            if kp.previous_position is None:
                changes["new"] += 1
                continue
            # Position 0 means the keyword dropped out of the tracked range.
            if kp.position == 0 and kp.previous_position > 0:
                changes["lost"] += 1
                alerts.append(PositionAlert(
                    keyword=kp.keyword,
                    old_position=kp.previous_position,
                    new_position=0,
                    change=-kp.previous_position,
                    volume=kp.volume,
                ))
                continue
            change = kp.change  # positive = improved, negative = declined
            if change > 0:
                changes["improved"] += 1
            elif change < 0:
                changes["declined"] += 1
            else:
                changes["stable"] += 1
            # Generate alert if change exceeds threshold
            if abs(change) >= threshold:
                alerts.append(PositionAlert(
                    keyword=kp.keyword,
                    old_position=kp.previous_position,
                    new_position=kp.position,
                    change=change,
                    volume=kp.volume,
                ))
        # Sort alerts by severity (critical first) then by volume (high first)
        severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3}
        alerts.sort(key=lambda a: (severity_order.get(a.severity, 4), -a.volume))
        logger.info(
            f"Changes detected - improved: {changes['improved']}, "
            f"declined: {changes['declined']}, stable: {changes['stable']}, "
            f"new: {changes['new']}, lost: {changes['lost']}"
        )
        logger.info(f"Alerts generated: {len(alerts)} (threshold: {threshold})")
        return changes, alerts

    def calculate_visibility(self, positions: list[KeywordPosition]) -> VisibilityScore:
        """
        Calculate weighted visibility score based on CTR curve.
        Visibility = sum(keyword_volume * ctr_weight_for_position) / sum(keyword_volume)
        Score normalized to 0-100 scale.
        """
        vis = VisibilityScore()
        total_weighted = 0.0
        total_volume = 0
        for kp in positions:
            # Only positions 1-100 contribute; 0 means not ranking.
            if kp.position <= 0 or kp.position > 100:
                continue
            vis.total_keywords += 1
            volume = max(kp.volume, 1)  # Avoid zero volume
            total_volume += volume
            # Position bucket counting (cumulative: top10 includes top3, etc.)
            if kp.position <= 3:
                vis.top3 += 1
            if kp.position <= 10:
                vis.top10 += 1
            if kp.position <= 20:
                vis.top20 += 1
            if kp.position <= 50:
                vis.top50 += 1
            if kp.position <= 100:
                vis.top100 += 1
            # Weighted visibility
            ctr = CTR_WEIGHTS.get(kp.position, 0.0005)
            total_weighted += volume * ctr
        if total_volume > 0:
            # Normalize: max possible is if all keywords were position 1
            max_possible = total_volume * CTR_WEIGHTS[1]
            vis.score = (total_weighted / max_possible) * 100.0
        else:
            vis.score = 0.0
        logger.info(
            f"Visibility score: {vis.score:.2f} | "
            f"Top3: {vis.top3}, Top10: {vis.top10}, Top20: {vis.top20}"
        )
        return vis

    def segment_keywords(
        self,
        positions: list[KeywordPosition],
        filter_segment: Optional[str] = None,
    ) -> dict[str, SegmentData]:
        """
        Segment keywords into brand/non-brand and by intent type.
        Args:
            positions: List of keyword positions
            filter_segment: Optional filter - 'brand', 'non_brand', or intent type
        Returns:
            Dictionary of segment name to SegmentData
        """
        segments: dict[str, list[KeywordPosition]] = {
            "brand": [],
            "non_brand": [],
        }
        intent_segments: dict[str, list[KeywordPosition]] = {}
        for kp in positions:
            # Brand segmentation
            if kp.is_brand:
                segments["brand"].append(kp)
            else:
                segments["non_brand"].append(kp)
            # Intent segmentation (keys become "intent_<type>" below)
            intent_key = kp.intent.lower() if kp.intent else "informational"
            intent_segments.setdefault(intent_key, []).append(kp)
        # Merge intent segments into main segments
        for intent_key, kps in intent_segments.items():
            segments[f"intent_{intent_key}"] = kps
        # Calculate segment stats
        result: dict[str, SegmentData] = {}
        for seg_name, kps in segments.items():
            if filter_segment and seg_name != filter_segment:
                continue
            if not kps:
                continue
            # Average position considers only keywords that currently rank.
            active_positions = [kp for kp in kps if kp.position > 0]
            avg_pos = (
                sum(kp.position for kp in active_positions) / len(active_positions)
                if active_positions else 0.0
            )
            vis = self.calculate_visibility(kps)
            improved = sum(1 for kp in kps if kp.change > 0)
            declined = sum(1 for kp in kps if kp.change < 0)
            stable = sum(1 for kp in kps if kp.change == 0 and kp.previous_position is not None)
            result[seg_name] = SegmentData(
                name=seg_name,
                keywords=len(kps),
                avg_position=round(avg_pos, 1),
                visibility=round(vis.score, 2),
                improved=improved,
                declined=declined,
                stable=stable,
            )
        return result

    async def compare_competitors(
        self,
        target: str,
        competitors: list[str],
    ) -> list[CompetitorComparison]:
        """
        Compare ranking positions against competitors.
        Uses Ahrefs rank-tracker-competitors-overview MCP tool.
        """
        comparisons: list[CompetitorComparison] = []
        for competitor in competitors:
            logger.info(f"Comparing with competitor: {competitor}")
            comp_data = await self._call_competitors_overview(target, competitor)
            comparison = CompetitorComparison(competitor=competitor)
            if comp_data:
                comparison.overlap_keywords = comp_data.get("overlap_keywords", 0)
                comparison.competitor_better = comp_data.get("competitor_better", 0)
                comparison.target_better = comp_data.get("target_better", 0)
                comparison.avg_position_gap = comp_data.get("avg_position_gap", 0.0)
                # Extract top gaps (keywords where competitor outranks us most)
                top_gaps = comp_data.get("top_gaps", [])
                comparison.top_gaps = top_gaps[:10]
            comparisons.append(comparison)
        return comparisons

    async def _call_competitors_overview(self, target: str, competitor: str) -> dict:
        """Call the Ahrefs rank-tracker-competitors-overview MCP tool."""
        logger.info("Calling Ahrefs rank-tracker-competitors-overview...")
        data = await self._run_mcp_tool(
            "ahrefs/rank-tracker-competitors-overview",
            {"target": target, "competitor": competitor},
            timeout=60,
        )
        return data or {}

    async def analyze(
        self,
        target: str,
        threshold: int = 3,
        competitors: Optional[list[str]] = None,
        segment_filter: Optional[str] = None,
    ) -> TrackingResult:
        """
        Orchestrate full position tracking analysis.
        Args:
            target: Target website URL
            threshold: Position change threshold for alerts
            competitors: List of competitor URLs to compare
            segment_filter: Optional segment filter (brand, non_brand, intent_*)
        Returns:
            Complete TrackingResult with all analysis data
        """
        logger.info(f"Starting position tracking analysis for: {target}")
        logger.info(f"Threshold: {threshold}, Competitors: {competitors or 'none'}")
        result = TrackingResult(target=target)
        # Step 1: Fetch current positions
        positions = await self.get_current_positions(target)
        if not positions:
            # Return the empty shell so callers still get a valid structure.
            logger.warning("No position data retrieved. Check Ahrefs project configuration.")
            return result
        result.positions = positions
        result.total_keywords = len(positions)
        # Step 2: Detect changes and generate alerts
        changes, alerts = self.detect_changes(positions, threshold)
        result.changes = changes
        result.alerts = alerts
        # Step 3: Calculate visibility score
        visibility = self.calculate_visibility(positions)
        result.visibility = visibility
        result.visibility_score = visibility.score
        # Step 4: Segment keywords
        result.segments = self.segment_keywords(positions, segment_filter)
        # Step 5: Compare with competitors (if provided)
        if competitors:
            result.competitors = await self.compare_competitors(target, competitors)
        logger.info(f"Analysis complete. Total keywords: {result.total_keywords}")
        logger.info(f"Visibility score: {result.visibility_score:.2f}")
        return result
# ---------------------------------------------------------------------------
# Output formatters
# ---------------------------------------------------------------------------
def format_text_report(result: TrackingResult) -> str:
    """Render a tracking result as a human-readable plain-text report."""
    bar = "=" * 60
    rule = "-" * 60
    out: list[str] = [
        bar,
        f"Position Tracking Report: {result.target}",
        f"Timestamp: {result.timestamp}",
        bar,
        # Visibility overview
        f"\nVisibility Score: {result.visibility_score:.2f}/100",
        f"Total Keywords Tracked: {result.total_keywords}",
    ]
    vis = result.visibility
    if vis:
        out.append("\nPosition Distribution:")
        for label, count in (("Top 3", vis.top3), ("Top 10", vis.top10),
                             ("Top 20", vis.top20), ("Top 50", vis.top50),
                             ("Top 100", vis.top100)):
            out.append(f" {label}: {count}")
    # Changes summary
    ch = result.changes
    out.append("\nPosition Changes:")
    for label, key in (("Improved", "improved"), ("Declined", "declined"),
                       ("Stable", "stable"), ("New", "new"), ("Lost", "lost")):
        out.append(f" {label}: {ch.get(key, 0)}")
    # Alerts (top 20 only; list is pre-sorted by severity then volume)
    if result.alerts:
        out.append(f"\nAlerts ({len(result.alerts)}):")
        out.append(rule)
        for alert in result.alerts[:20]:
            trend = "UP" if alert.change > 0 else "DOWN"
            out.append(
                f" [{alert.severity.upper()}] {alert.keyword}: "
                f"{alert.old_position} -> {alert.new_position} "
                f"({trend} {abs(alert.change)}) | Vol: {alert.volume}"
            )
    # Segments
    if result.segments:
        out.append("\nSegments:")
        out.append(rule)
        for name, seg in result.segments.items():
            out.append(
                f" {name}: {seg.keywords} keywords, "
                f"avg pos {seg.avg_position}, "
                f"vis {seg.visibility}"
            )
    # Competitors
    if result.competitors:
        out.append("\nCompetitor Comparison:")
        out.append(rule)
        for comp in result.competitors:
            out.append(f" vs {comp.competitor}:")
            out.append(f" Overlap: {comp.overlap_keywords} keywords")
            out.append(f" We win: {comp.target_better}")
            out.append(f" They win: {comp.competitor_better}")
            out.append(f" Avg gap: {comp.avg_position_gap:.1f}")
    out.append("\n" + bar)
    return "\n".join(out)
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def parse_args() -> argparse.Namespace:
    """Build the CLI argument parser and parse ``sys.argv``."""
    parser = argparse.ArgumentParser(
        description="Position Tracker - Monitor keyword rankings via Ahrefs Rank Tracker",
    )
    add = parser.add_argument
    add("--target",
        required=True,
        help="Target website URL (e.g., https://example.com)")
    add("--threshold",
        type=int,
        default=3,
        help="Position change threshold for alerts (default: 3)")
    add("--segment",
        choices=["brand", "non_brand", "intent_informational",
                 "intent_commercial", "intent_transactional", "intent_navigational"],
        default=None,
        help="Filter results by keyword segment")
    add("--competitor",
        action="append",
        dest="competitors",
        default=[],
        help="Competitor URL to compare (repeatable)")
    add("--json",
        action="store_true",
        dest="json_output",
        help="Output in JSON format")
    add("--output",
        type=str,
        default=None,
        help="Save output to file path")
    return parser.parse_args()
async def main():
    """CLI entry point: run the analysis, then emit or save the report."""
    args = parse_args()
    tracker = PositionTracker()
    result = await tracker.analyze(
        target=args.target,
        threshold=args.threshold,
        competitors=args.competitors,
        segment_filter=args.segment,
    )
    # Render either machine-readable JSON or the plain-text report.
    report = (
        json.dumps(result.to_dict(), ensure_ascii=False, indent=2)
        if args.json_output
        else format_text_report(result)
    )
    if args.output:
        with open(args.output, "w", encoding="utf-8") as fh:
            fh.write(report)
        logger.info(f"Output saved to: {args.output}")
    else:
        print(report)
    tracker.print_stats()


if __name__ == "__main__":
    asyncio.run(main())