Files
Andrew Yim a3ff965b87 Add SEO skills 19-28, 31-32 with full Python implementations
12 new skills: Keyword Strategy, SERP Analysis, Position Tracking,
Link Building, Content Strategy, E-Commerce SEO, KPI Framework,
International SEO, AI Visibility, Knowledge Graph, Competitor Intel,
and Crawl Budget. ~20K lines of Python across 25 domain scripts.
Updated skill 11 pipeline table and repo CLAUDE.md.
Enhanced skill 18 local SEO workflow from jamie.clinic audit.

Note: Skill 26 hreflang_validator.py pending (content filter block).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-13 12:05:59 +09:00

720 lines
27 KiB
Python

"""
Competitor Profiler - SEO Competitive Intelligence
===================================================
Purpose: Auto-discover competitors, build profile cards, comparison matrices,
keyword overlap analysis, and competitive threat scoring.
Python: 3.10+
Usage:
python competitor_profiler.py --target https://example.com --json
python competitor_profiler.py --target https://example.com --competitor https://comp1.com --json
python competitor_profiler.py --target https://example.com --max-competitors 10 --korean-market --json
"""
import argparse
import asyncio
import json
import logging
import sys
from dataclasses import dataclass, field, asdict
from datetime import datetime
from typing import Any
from urllib.parse import urlparse
from base_client import BaseAsyncClient, config
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
@dataclass
class CompetitorProfile:
    """Full profile card for a single domain.

    All fields default to zero/False so a profile can be filled in
    incrementally as each Ahrefs response arrives; an empty response
    yields an all-zero profile rather than an error.
    """

    domain: str  # bare lowercase domain, e.g. "example.com"
    domain_rating: float = 0.0  # Ahrefs Domain Rating (0-100 scale)
    organic_traffic: int = 0  # estimated organic traffic (Ahrefs metric)
    organic_keywords: int = 0  # number of ranking organic keywords
    referring_domains: int = 0  # unique referring domains (backlink stat)
    top_pages_count: int = 0  # pages returned by the top-pages report (capped by request limit)
    traffic_value_usd: float = 0.0  # estimated traffic value in USD
    content_volume: int = 0  # indexed page count from the latest pages-history point
    naver_blog_presence: bool = False  # Korean-market heuristic (TLD or Korean keywords)
    naver_cafe_presence: bool = False  # Korean-market heuristic (TLD only)
@dataclass
class KeywordOverlap:
    """Keyword overlap analysis between target and a competitor."""

    shared: int = 0  # keywords both domains rank for
    unique_target: int = 0  # keywords only the target ranks for
    unique_competitor: int = 0  # keywords only the competitor ranks for
    gap_keywords: int = 0  # competitor-only keywords (same value as unique_competitor)
    overlap_percentage: float = 0.0  # shared / union of both keyword sets, in percent
@dataclass
class ThreatAssessment:
    """Competitive threat score and breakdown for one competitor."""

    domain: str = ""  # competitor domain being assessed
    threat_score: float = 0.0  # weighted 0-100 score (see CompetitorProfiler.score_threat)
    growth_rate: float = 0.0  # NOTE(review): never populated anywhere in this module
    dr_gap: float = 0.0  # competitor DR minus target DR (positive = competitor stronger)
    keyword_overlap_pct: float = 0.0  # copied from KeywordOverlap.overlap_percentage
    traffic_ratio: float = 0.0  # competitor traffic / target traffic
    strengths: list[str] = field(default_factory=list)  # human-readable advantages over target
    weaknesses: list[str] = field(default_factory=list)  # human-readable disadvantages
@dataclass
class ComparisonMatrix:
    """Multi-dimensional comparison matrix across SEO dimensions."""

    dimensions: list[str] = field(default_factory=list)  # dimension names (CompetitorProfiler.DIMENSIONS)
    target_scores: dict[str, float] = field(default_factory=dict)  # dimension -> normalized 0-100 score
    competitor_scores: dict[str, dict[str, float]] = field(default_factory=dict)  # domain -> {dimension -> score}
@dataclass
class CompetitorProfilingResult:
    """Full profiling result with all competitor data."""

    target: str = ""  # bare target domain
    target_profile: CompetitorProfile | None = None  # None if target profiling failed
    competitors: list[dict[str, Any]] = field(default_factory=list)  # entries sorted by threat_score desc
    comparison_matrix: ComparisonMatrix | None = None
    market_position: str = "unknown"  # leader / challenger / follower / niche / unknown
    timestamp: str = ""  # ISO-8601 string from datetime.now() (naive local time)
    errors: list[str] = field(default_factory=list)  # non-fatal errors collected during the run
# ---------------------------------------------------------------------------
# Profiler
# ---------------------------------------------------------------------------
class CompetitorProfiler(BaseAsyncClient):
    """Builds competitor profiles using Ahrefs MCP tools."""

    # SEO dimensions reported in the comparison matrix (order fixed for output).
    DIMENSIONS = ["traffic", "domain_rating", "keywords", "backlinks", "content"]

    def __init__(self, korean_market: bool = False):
        """Initialize with modest concurrency/rate limits for MCP calls.

        Args:
            korean_market: when True, the profiling pipeline also checks
                Naver Blog/Cafe presence for each competitor.
        """
        super().__init__(max_concurrent=5, requests_per_second=2.0)
        self.korean_market = korean_market
@staticmethod
def _extract_domain(url: str) -> str:
"""Extract bare domain from URL or return as-is if already bare."""
if "://" in url:
parsed = urlparse(url)
return parsed.netloc.lower().replace("www.", "")
return url.lower().replace("www.", "")
    # ------------------------------------------------------------------
    # Ahrefs MCP wrappers (return dicts; Claude MCP bridge fills these)
    # ------------------------------------------------------------------
    async def _call_ahrefs(self, tool: str, params: dict[str, Any]) -> dict:
        """Simulate Ahrefs MCP call. In production, routed via MCP bridge.

        Returns a stub payload {"tool", "params", "data"}. "data" is
        always an empty dict here, so every downstream metric defaults
        to zero until the MCP bridge substitutes real responses.
        """
        self.logger.info(f"Ahrefs MCP call: {tool} | params={params}")
        return {"tool": tool, "params": params, "data": {}}
async def discover_competitors(
self, target: str, limit: int = 20
) -> list[str]:
"""Discover organic competitors via site-explorer-organic-competitors."""
domain = self._extract_domain(target)
self.logger.info(f"Discovering competitors for {domain} (limit={limit})")
resp = await self._call_ahrefs(
"site-explorer-organic-competitors",
{"target": domain, "limit": limit, "country": "kr"},
)
competitors_raw: list[dict] = resp.get("data", {}).get("competitors", [])
discovered = []
for entry in competitors_raw:
comp_domain = entry.get("domain", "")
if comp_domain and comp_domain != domain:
discovered.append(comp_domain)
if not discovered:
self.logger.warning(
"No competitors returned from Ahrefs; "
"check that the target domain has organic traffic."
)
else:
self.logger.info(f"Discovered {len(discovered)} competitors")
return discovered[:limit]
    async def build_profile(self, domain: str) -> CompetitorProfile:
        """Build a complete profile card for a single domain.

        Issues five Ahrefs MCP calls (metrics, domain rating, backlink
        stats, top pages, pages history) and folds the responses into a
        CompetitorProfile. Missing response keys default to zero, so an
        empty MCP payload produces an all-zero profile, not an error.
        """
        domain = self._extract_domain(domain)
        profile = CompetitorProfile(domain=domain)
        # --- Metrics: organic traffic, keyword count, traffic value ---
        metrics_resp = await self._call_ahrefs(
            "site-explorer-metrics", {"target": domain}
        )
        metrics = metrics_resp.get("data", {})
        profile.organic_traffic = int(metrics.get("organic_traffic", 0))
        profile.organic_keywords = int(metrics.get("organic_keywords", 0))
        profile.traffic_value_usd = float(metrics.get("traffic_value", 0.0))
        # --- Domain Rating (already on a 0-100 scale) ---
        dr_resp = await self._call_ahrefs(
            "site-explorer-domain-rating", {"target": domain}
        )
        dr_data = dr_resp.get("data", {})
        profile.domain_rating = float(dr_data.get("domain_rating", 0.0))
        # --- Referring Domains ---
        bl_resp = await self._call_ahrefs(
            "site-explorer-backlinks-stats", {"target": domain}
        )
        bl_data = bl_resp.get("data", {})
        profile.referring_domains = int(bl_data.get("referring_domains", 0))
        # --- Top Pages (count capped by the limit=1000 request) ---
        pages_resp = await self._call_ahrefs(
            "site-explorer-top-pages", {"target": domain, "limit": 1000}
        )
        pages_data = pages_resp.get("data", {})
        profile.top_pages_count = len(pages_data.get("pages", []))
        # --- Content Volume (pages indexed) ---
        history_resp = await self._call_ahrefs(
            "site-explorer-pages-history", {"target": domain}
        )
        history_data = history_resp.get("data", {})
        data_points = history_data.get("data_points", [])
        if data_points:
            # Assumes history is chronological; takes the most recent sample.
            latest = data_points[-1]
            profile.content_volume = int(latest.get("pages", 0))
        self.logger.info(
            f"Profile built for {domain}: DR={profile.domain_rating}, "
            f"traffic={profile.organic_traffic}, keywords={profile.organic_keywords}"
        )
        return profile
async def analyze_keyword_overlap(
self, target: str, competitor: str, limit: int = 1000
) -> KeywordOverlap:
"""Analyze keyword overlap between target and a single competitor."""
target_domain = self._extract_domain(target)
comp_domain = self._extract_domain(competitor)
# Fetch keyword sets for both domains
target_resp = await self._call_ahrefs(
"site-explorer-organic-keywords",
{"target": target_domain, "limit": limit},
)
comp_resp = await self._call_ahrefs(
"site-explorer-organic-keywords",
{"target": comp_domain, "limit": limit},
)
target_kws: set[str] = set()
for kw in target_resp.get("data", {}).get("keywords", []):
keyword = kw.get("keyword", "")
if keyword:
target_kws.add(keyword.lower())
comp_kws: set[str] = set()
for kw in comp_resp.get("data", {}).get("keywords", []):
keyword = kw.get("keyword", "")
if keyword:
comp_kws.add(keyword.lower())
shared = target_kws & comp_kws
unique_target = target_kws - comp_kws
unique_comp = comp_kws - target_kws
gap = unique_comp # keywords the competitor ranks for but target does not
total_union = len(target_kws | comp_kws) or 1
overlap_pct = (len(shared) / total_union) * 100.0
overlap = KeywordOverlap(
shared=len(shared),
unique_target=len(unique_target),
unique_competitor=len(unique_comp),
gap_keywords=len(gap),
overlap_percentage=round(overlap_pct, 2),
)
self.logger.info(
f"Keyword overlap {target_domain} vs {comp_domain}: "
f"shared={overlap.shared}, gap={overlap.gap_keywords}"
)
return overlap
def build_comparison_matrix(
self,
target_profile: CompetitorProfile,
competitor_profiles: list[CompetitorProfile],
) -> ComparisonMatrix:
"""Create a multi-dimensional comparison matrix."""
matrix = ComparisonMatrix(dimensions=list(self.DIMENSIONS))
# Normalize scores to 0-100 scale relative to max in competitive set
all_profiles = [target_profile] + competitor_profiles
def _max_val(attr: str) -> float:
return max(getattr(p, attr, 0) for p in all_profiles) or 1
max_traffic = _max_val("organic_traffic")
max_dr = 100.0 # DR is already 0-100
max_kw = _max_val("organic_keywords")
max_rd = _max_val("referring_domains")
max_content = _max_val("content_volume")
def _norm(profile: CompetitorProfile) -> dict[str, float]:
return {
"traffic": round((profile.organic_traffic / max_traffic) * 100, 1),
"domain_rating": round(profile.domain_rating, 1),
"keywords": round((profile.organic_keywords / max_kw) * 100, 1),
"backlinks": round((profile.referring_domains / max_rd) * 100, 1),
"content": round((profile.content_volume / max_content) * 100, 1)
if max_content > 0
else 0.0,
}
matrix.target_scores = _norm(target_profile)
for cp in competitor_profiles:
matrix.competitor_scores[cp.domain] = _norm(cp)
return matrix
    def score_threat(
        self,
        target_profile: CompetitorProfile,
        competitor_profile: CompetitorProfile,
        overlap: KeywordOverlap,
    ) -> ThreatAssessment:
        """Score competitive threat 0-100 based on multiple factors.

        Weighted blend: DR gap 20%, traffic ratio 30%, keyword overlap
        25%, gap keywords 25%; each component is clamped to 0-100 before
        weighting. Also records human-readable strength/weakness notes
        relative to the target.
        """
        assessment = ThreatAssessment(domain=competitor_profile.domain)
        # --- DR gap (positive = competitor stronger) ---
        dr_gap = competitor_profile.domain_rating - target_profile.domain_rating
        assessment.dr_gap = round(dr_gap, 1)
        dr_score = min(max((dr_gap + 30) / 60 * 100, 0), 100)  # scale -30..+30 -> 0-100
        # --- Traffic ratio (max(..., 1) guards against a zero-traffic target) ---
        target_traffic = max(target_profile.organic_traffic, 1)
        traffic_ratio = competitor_profile.organic_traffic / target_traffic
        assessment.traffic_ratio = round(traffic_ratio, 2)
        traffic_score = min(traffic_ratio * 50, 100)  # 2x traffic = 100
        # --- Keyword overlap percentage ---
        assessment.keyword_overlap_pct = overlap.overlap_percentage
        overlap_score = min(overlap.overlap_percentage * 2, 100)  # 50% overlap = 100
        # --- Gap keywords (competitor ranks, target doesn't) ---
        # Ratio is relative to the target's own keyword count, clamped at 100.
        total_target_kw = max(overlap.shared + overlap.unique_target, 1)
        gap_ratio = overlap.gap_keywords / total_target_kw
        gap_score = min(gap_ratio * 100, 100)
        # --- Weighted threat score (weights sum to 1.0) ---
        threat = (
            dr_score * 0.20
            + traffic_score * 0.30
            + overlap_score * 0.25
            + gap_score * 0.25
        )
        assessment.threat_score = round(min(max(threat, 0), 100), 1)
        # --- Identify strengths & weaknesses (threshold-based notes) ---
        if dr_gap > 5:
            assessment.strengths.append(f"Higher DR by {dr_gap:.0f} points")
        elif dr_gap < -5:
            assessment.weaknesses.append(f"Lower DR by {abs(dr_gap):.0f} points")
        if traffic_ratio > 1.5:
            assessment.strengths.append(
                f"Traffic {traffic_ratio:.1f}x higher than target"
            )
        elif traffic_ratio < 0.5:
            assessment.weaknesses.append(
                f"Traffic only {traffic_ratio:.1f}x of target"
            )
        if overlap.gap_keywords > overlap.shared:
            assessment.strengths.append(
                f"{overlap.gap_keywords} keywords target is missing"
            )
        # Referring-domain and content-volume notes use a 1.5x / 0.5x band.
        if competitor_profile.referring_domains > target_profile.referring_domains * 1.5:
            assessment.strengths.append("Significantly more referring domains")
        elif competitor_profile.referring_domains < target_profile.referring_domains * 0.5:
            assessment.weaknesses.append("Fewer referring domains")
        if competitor_profile.content_volume > target_profile.content_volume * 1.5:
            assessment.strengths.append("Larger content volume")
        elif competitor_profile.content_volume < target_profile.content_volume * 0.5:
            assessment.weaknesses.append("Smaller content library")
        self.logger.info(
            f"Threat score for {competitor_profile.domain}: "
            f"{assessment.threat_score}/100"
        )
        return assessment
async def detect_korean_presence(self, domain: str) -> dict[str, bool]:
"""Check Naver Blog/Cafe presence for a domain (heuristic)."""
domain = self._extract_domain(domain)
self.logger.info(f"Checking Korean market presence for {domain}")
# In production, this would use WebSearch MCP to query Naver
# Heuristic: check if domain has .co.kr or .kr TLD,
# or has Korean-language top pages
is_korean_tld = domain.endswith(".kr") or domain.endswith(".co.kr")
# Check top pages for Korean content signals
pages_resp = await self._call_ahrefs(
"site-explorer-organic-keywords",
{"target": domain, "limit": 50, "country": "kr"},
)
kr_keywords = pages_resp.get("data", {}).get("keywords", [])
has_kr_keywords = len(kr_keywords) > 0
return {
"naver_blog_presence": is_korean_tld or has_kr_keywords,
"naver_cafe_presence": is_korean_tld,
"korean_tld": is_korean_tld,
"korean_keyword_count": len(kr_keywords),
}
    def determine_market_position(
        self,
        target_profile: CompetitorProfile,
        competitor_profiles: list[CompetitorProfile],
    ) -> str:
        """Classify target as leader / challenger / follower / niche.

        Combines the target's traffic-rank percentile within the set,
        its DR advantage over the competitor average, and its share of
        the set's maximum traffic. Returns "unknown" when there are no
        competitors to compare against.
        """
        if not competitor_profiles:
            return "unknown"
        all_profiles = [target_profile] + competitor_profiles
        # Sorting the freshly-built combined list; competitor_profiles is untouched.
        all_profiles.sort(key=lambda p: p.organic_traffic, reverse=True)
        # Rank of the target by traffic (0 = highest), matched by domain name.
        target_rank = next(
            (i for i, p in enumerate(all_profiles) if p.domain == target_profile.domain),
            len(all_profiles),
        )
        total = len(all_profiles)
        percentile = target_rank / total  # 0.0 = top of the set
        # DR comparison against the competitor average
        avg_competitor_dr = (
            sum(p.domain_rating for p in competitor_profiles) / len(competitor_profiles)
            if competitor_profiles
            else 0
        )
        dr_advantage = target_profile.domain_rating - avg_competitor_dr
        # Traffic leader check (`or 1` avoids division by zero on an all-zero set)
        max_traffic = max(p.organic_traffic for p in all_profiles) or 1
        traffic_share = target_profile.organic_traffic / max_traffic
        if percentile <= 0.1 and traffic_share >= 0.8:
            return "leader"
        elif percentile <= 0.33 or (dr_advantage > 10 and traffic_share > 0.5):
            return "challenger"
        elif percentile <= 0.66:
            return "follower"
        else:
            # Check if niche player (high DR but low traffic = niche authority)
            if target_profile.domain_rating > avg_competitor_dr:
                return "niche"
            return "follower"
    async def profile(
        self,
        target: str,
        competitors: list[str] | None = None,
        max_competitors: int = 10,
    ) -> CompetitorProfilingResult:
        """Orchestrate the full competitor profiling pipeline.

        Steps: build target profile, discover/validate competitors,
        profile each competitor (optionally with Korean-market checks),
        analyze keyword overlaps, build the comparison matrix, score
        threats, and classify market position. Per-competitor failures
        are recorded in ``result.errors`` and do not abort the run.

        Args:
            target: target URL or bare domain.
            competitors: explicit competitor list; when falsy, competitors
                are auto-discovered via Ahrefs.
            max_competitors: cap on the number of competitors profiled.
        """
        timestamp = datetime.now().isoformat()
        result = CompetitorProfilingResult(
            target=self._extract_domain(target),
            timestamp=timestamp,
        )
        try:
            # Step 1: Build target profile
            self.logger.info("Step 1/6: Building target profile...")
            target_profile = await self.build_profile(target)
            result.target_profile = target_profile
            # Step 2: Discover or validate competitors
            self.logger.info("Step 2/6: Discovering competitors...")
            if competitors:
                comp_domains = [self._extract_domain(c) for c in competitors]
            else:
                comp_domains = await self.discover_competitors(
                    target, limit=max_competitors
                )
            if not comp_domains:
                # Nothing to compare against; return the partial result early.
                result.errors.append("No competitors found or provided.")
                return result
            comp_domains = comp_domains[:max_competitors]
            # Step 3: Build competitor profiles
            self.logger.info(
                f"Step 3/6: Profiling {len(comp_domains)} competitors..."
            )
            competitor_profiles: list[CompetitorProfile] = []
            for domain in comp_domains:
                try:
                    cp = await self.build_profile(domain)
                    if self.korean_market:
                        kr_presence = await self.detect_korean_presence(domain)
                        cp.naver_blog_presence = kr_presence.get(
                            "naver_blog_presence", False
                        )
                        cp.naver_cafe_presence = kr_presence.get(
                            "naver_cafe_presence", False
                        )
                    competitor_profiles.append(cp)
                except Exception as e:
                    # Best-effort: skip this competitor but keep the run alive.
                    msg = f"Failed to profile {domain}: {e}"
                    self.logger.error(msg)
                    result.errors.append(msg)
            # Step 4: Keyword overlap analysis
            self.logger.info("Step 4/6: Analyzing keyword overlaps...")
            overlaps: dict[str, KeywordOverlap] = {}
            for cp in competitor_profiles:
                try:
                    overlap = await self.analyze_keyword_overlap(target, cp.domain)
                    overlaps[cp.domain] = overlap
                except Exception as e:
                    msg = f"Keyword overlap failed for {cp.domain}: {e}"
                    self.logger.error(msg)
                    result.errors.append(msg)
                    # Fall back to an empty overlap so threat scoring still runs.
                    overlaps[cp.domain] = KeywordOverlap()
            # Step 5: Build comparison matrix
            self.logger.info("Step 5/6: Building comparison matrix...")
            matrix = self.build_comparison_matrix(target_profile, competitor_profiles)
            result.comparison_matrix = matrix
            # Step 6: Score threats and assemble output
            self.logger.info("Step 6/6: Scoring competitive threats...")
            for cp in competitor_profiles:
                overlap = overlaps.get(cp.domain, KeywordOverlap())
                threat = self.score_threat(target_profile, cp, overlap)
                competitor_entry = {
                    "domain": cp.domain,
                    "profile": asdict(cp),
                    "threat_score": threat.threat_score,
                    "threat_detail": asdict(threat),
                    "keyword_overlap": asdict(overlap),
                }
                result.competitors.append(competitor_entry)
            # Sort by threat score descending
            result.competitors.sort(
                key=lambda c: c.get("threat_score", 0), reverse=True
            )
            # Determine market position
            result.market_position = self.determine_market_position(
                target_profile, competitor_profiles
            )
            self.logger.info(
                f"Profiling complete: {len(result.competitors)} competitors analyzed. "
                f"Market position: {result.market_position}"
            )
        except Exception as e:
            # Top-level boundary: record the failure and return a partial result.
            msg = f"Profiling pipeline error: {e}"
            self.logger.error(msg)
            result.errors.append(msg)
        return result
# ---------------------------------------------------------------------------
# Output helpers
# ---------------------------------------------------------------------------
def _format_text_report(result: CompetitorProfilingResult) -> str:
    """Format profiling result as a human-readable text report.

    Sections: header, target profile, competitors (in the order the
    pipeline produced, i.e. sorted by threat score), comparison matrix,
    and any errors. Output is fixed-width text with aligned columns.
    """
    lines: list[str] = []
    lines.append("=" * 70)
    lines.append(f" COMPETITOR INTELLIGENCE REPORT")
    lines.append(f" Target: {result.target}")
    lines.append(f" Generated: {result.timestamp}")
    lines.append(f" Market Position: {result.market_position.upper()}")
    lines.append("=" * 70)
    if result.target_profile:
        tp = result.target_profile
        lines.append("")
        lines.append("--- TARGET PROFILE ---")
        lines.append(f" Domain Rating: {tp.domain_rating}")
        lines.append(f" Organic Traffic: {tp.organic_traffic:,}")
        lines.append(f" Organic Keywords: {tp.organic_keywords:,}")
        lines.append(f" Referring Domains: {tp.referring_domains:,}")
        lines.append(f" Top Pages: {tp.top_pages_count:,}")
        lines.append(f" Content Volume: {tp.content_volume:,}")
        lines.append(f" Traffic Value: ${tp.traffic_value_usd:,.2f}")
    if result.competitors:
        lines.append("")
        lines.append("--- COMPETITORS (sorted by threat score) ---")
        for i, comp in enumerate(result.competitors, 1):
            # Each entry was assembled in profile(): profile/threat/overlap dicts.
            p = comp["profile"]
            t = comp["threat_detail"]
            o = comp["keyword_overlap"]
            lines.append("")
            lines.append(f" #{i} {comp['domain']}")
            lines.append(f" Threat Score: {comp['threat_score']}/100")
            lines.append(f" Domain Rating: {p['domain_rating']}")
            lines.append(f" Organic Traffic: {p['organic_traffic']:,}")
            lines.append(f" Keywords: {p['organic_keywords']:,}")
            lines.append(f" Referring Doms: {p['referring_domains']:,}")
            lines.append(f" Keyword Overlap: {o['shared']} shared, {o['gap_keywords']} gap")
            if t.get("strengths"):
                lines.append(f" Strengths: {'; '.join(t['strengths'])}")
            if t.get("weaknesses"):
                lines.append(f" Weaknesses: {'; '.join(t['weaknesses'])}")
    if result.comparison_matrix:
        m = result.comparison_matrix
        lines.append("")
        lines.append("--- COMPARISON MATRIX ---")
        header = f" {'Dimension':<20} {'Target':>10}"
        for domain in m.competitor_scores:
            # Domains truncated to 15 chars to keep columns aligned.
            short = domain[:15]
            header += f" {short:>15}"
        lines.append(header)
        lines.append(" " + "-" * (len(header) - 2))
        for dim in m.dimensions:
            row = f" {dim:<20} {m.target_scores.get(dim, 0):>10.1f}"
            for domain, scores in m.competitor_scores.items():
                row += f" {scores.get(dim, 0):>15.1f}"
            lines.append(row)
    if result.errors:
        lines.append("")
        lines.append("--- ERRORS ---")
        for err in result.errors:
            lines.append(f" - {err}")
    lines.append("")
    lines.append("=" * 70)
    return "\n".join(lines)
def _serialize_result(result: CompetitorProfilingResult) -> dict:
    """Convert result to a JSON-serializable dict.

    Dataclass members are expanded via asdict; the "errors" key is only
    included when at least one error was recorded.
    """
    payload: dict = {
        "target": result.target,
        "target_profile": None,
        "competitors": result.competitors,
        "comparison_matrix": None,
        "market_position": result.market_position,
        "timestamp": result.timestamp,
    }
    if result.target_profile is not None:
        payload["target_profile"] = asdict(result.target_profile)
    if result.comparison_matrix is not None:
        payload["comparison_matrix"] = asdict(result.comparison_matrix)
    if result.errors:
        payload["errors"] = result.errors
    return payload
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="SEO Competitor Profiler - Build competitive intelligence reports",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""\
Examples:
python competitor_profiler.py --target https://example.com --json
python competitor_profiler.py --target https://example.com --competitor https://comp1.com --json
python competitor_profiler.py --target https://example.com --max-competitors 10 --korean-market --json
""",
)
parser.add_argument(
"--target",
required=True,
help="Target website URL or domain to analyze",
)
parser.add_argument(
"--competitor",
action="append",
dest="competitors",
default=[],
help="Competitor URL/domain (repeatable; omit for auto-discovery)",
)
parser.add_argument(
"--max-competitors",
type=int,
default=10,
help="Maximum competitors to profile (default: 10)",
)
parser.add_argument(
"--korean-market",
action="store_true",
default=False,
help="Include Korean market analysis (Naver Blog/Cafe presence)",
)
parser.add_argument(
"--json",
action="store_true",
default=False,
help="Output in JSON format",
)
parser.add_argument(
"--output",
type=str,
default=None,
help="Save output to file path",
)
return parser.parse_args(argv)
async def async_main(args: argparse.Namespace) -> None:
    """Run the profiler with parsed CLI args and emit the report.

    Writes the report to args.output when given, otherwise prints it,
    then dumps client statistics.
    """
    profiler = CompetitorProfiler(korean_market=args.korean_market)
    result = await profiler.profile(
        target=args.target,
        competitors=args.competitors or None,
        max_competitors=args.max_competitors,
    )
    report = (
        json.dumps(_serialize_result(result), indent=2, ensure_ascii=False)
        if args.json
        else _format_text_report(result)
    )
    if args.output:
        with open(args.output, "w", encoding="utf-8") as fh:
            fh.write(report)
        logger.info(f"Report saved to {args.output}")
    else:
        print(report)
    profiler.print_stats()
def main() -> None:
    """CLI entry point: parse arguments and run the async pipeline."""
    asyncio.run(async_main(parse_args()))


if __name__ == "__main__":
    main()