# NOTE: The following is repository/commit metadata that was fused into this
# file during extraction; it is not part of the module source.
# Commit message: 12 new skills: Keyword Strategy, SERP Analysis, Position
# Tracking, Link Building, Content Strategy, E-Commerce SEO, KPI Framework,
# International SEO, AI Visibility, Knowledge Graph, Competitor Intel, and
# Crawl Budget. ~20K lines of Python across 25 domain scripts. Updated skill 11
# pipeline table and repo CLAUDE.md. Enhanced skill 18 local SEO workflow from
# jamie.clinic audit. Note: Skill 26 hreflang_validator.py pending (content
# filter block). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
# File stats: 803 lines, 30 KiB, Python.
"""
|
|
Link Gap Finder - Competitor link gap analysis
|
|
===============================================
|
|
Purpose: Identify link building opportunities by finding domains that link
|
|
to competitors but not to the target site via Ahrefs MCP.
|
|
Python: 3.10+
|
|
Usage:
|
|
python link_gap_finder.py --target https://example.com --competitor https://comp1.com --json
|
|
python link_gap_finder.py --target https://example.com --competitor https://comp1.com --competitor https://comp2.com --min-dr 30 --json
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import re
|
|
import sys
|
|
from dataclasses import dataclass, field, asdict
|
|
from datetime import datetime
|
|
from typing import Any
|
|
from urllib.parse import urlparse
|
|
|
|
import aiohttp
|
|
import pandas as pd
|
|
from rich.console import Console
|
|
from rich.table import Table
|
|
|
|
from base_client import BaseAsyncClient, config
|
|
|
|
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
# Module-level logger; level/handler configuration is left to the host app.
logger = logging.getLogger("link_gap_finder")
# Shared Rich console used by format_rich_output().
console = Console()

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Base URL of the Ahrefs REST API (v3), used by the standalone fallback path
# in LinkGapFinder._call_ahrefs().
AHREFS_BASE = "https://api.ahrefs.com/v3"
|
|
|
|
# Source category detection patterns.
# Each key is a category label; each value is a list of substrings matched
# against the lowercased referring domain by LinkGapFinder._detect_category().
# Iteration order matters: the first category with a matching substring wins.
SOURCE_CATEGORY_PATTERNS: dict[str, list[str]] = {
    # News/media outlets (includes major Korean and international publishers).
    "news": [
        "news", "press", "media", "journal", "herald", "times", "post",
        "gazette", "tribune", "daily", "chosun", "donga", "joongang",
        "hani", "khan", "yna", "yonhap", "reuters", "bloomberg",
        "techcrunch", "verge", "wired", "arstechnica", "bbc", "cnn",
    ],
    # Blogging platforms and content sites.
    "blog": [
        "blog", "wordpress", "medium.com", "tistory.com", "brunch.co.kr",
        "blog.naver.com", "tumblr", "blogger", "substack", "ghost.io",
        "velog.io", "dev.to",
    ],
    # Forums and community sites (includes popular Korean communities).
    "forum": [
        "forum", "community", "discuss", "reddit.com", "quora.com",
        "stackexchange", "stackoverflow", "cafe.naver.com", "dcinside",
        "fmkorea", "clien", "ppomppu", "theqoo", "ruliweb",
    ],
    # Business directories and review/listing sites.
    "directory": [
        "directory", "listing", "yellowpages", "yelp", "bbb.org",
        "clutch.co", "g2.com", "capterra", "trustpilot", "glassdoor",
        "dmoz", "aboutus", "hotfrog", "manta", "superpages",
    ],
    # Educational and government TLD fragments (US and Korean).
    "edu_gov": [
        ".edu", ".gov", ".ac.kr", ".go.kr", ".or.kr",
    ],
    # Social networks.
    "social": [
        "facebook.com", "twitter.com", "x.com", "linkedin.com",
        "instagram.com", "youtube.com", "pinterest.com", "tiktok.com",
    ],
    # Korean portal/platform domains.
    "korean_platform": [
        "naver.com", "daum.net", "kakao.com", "tistory.com",
        "brunch.co.kr", "zum.com", "nate.com",
    ],
}
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Dataclasses
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@dataclass
class LinkOpportunity:
    """A single link building opportunity from gap analysis."""

    domain: str  # Referring domain that links to competitors but not the target
    dr: float = 0.0  # Ahrefs Domain Rating of the referring domain
    traffic: int = 0  # Organic traffic figure pulled from the refdomain row
    linked_competitors: list[str] = field(default_factory=list)  # Competitors it links to
    competitor_count: int = 0  # Number of distinct competitors it links to
    not_linked_target: bool = True  # Always True for gap entries (no link to target)
    category: str = "other"  # Source category (see SOURCE_CATEGORY_PATTERNS)
    feasibility_score: float = 0.0  # 0-100: how realistic acquiring a link is
    impact_score: float = 0.0  # 0-100: expected SEO value of a link
    overall_score: float = 0.0  # Weighted blend: 40% feasibility + 60% impact
    backlinks_to_competitors: int = 0  # Backlink count this domain sends to competitors
    country: str = ""  # Country code from refdomain metadata, if present
    top_anchor: str = ""  # Most common anchor text (not populated by score_opportunities)
|
|
|
|
|
|
@dataclass
class GapSummary:
    """Summary statistics for the gap analysis."""

    total_opportunities: int = 0  # Count of gap domains after filtering
    avg_dr: float = 0.0  # Mean DR over opportunities with DR > 0
    high_dr_count: int = 0  # Opportunities with DR >= 50
    category_breakdown: dict[str, int] = field(default_factory=dict)  # category -> count, desc by count
    top_countries: list[dict[str, Any]] = field(default_factory=list)  # Up to 10 {"country", "count"} rows
    total_competitor_refdomains: dict[str, int] = field(default_factory=dict)  # competitor domain -> refdomain count
    target_refdomains_count: int = 0  # Referring domains the target already has
|
|
|
|
|
|
@dataclass
class LinkGapResult:
    """Complete link gap analysis result."""

    target_url: str  # Target as supplied on the command line
    target_domain: str = ""  # Hostname extracted from target_url
    competitor_urls: list[str] = field(default_factory=list)  # Competitors as supplied
    competitor_domains: list[str] = field(default_factory=list)  # Hostnames of competitors
    target_dr: float = 0.0  # Target's Domain Rating
    opportunities: list[LinkOpportunity] = field(default_factory=list)  # All scored gaps (filtered)
    summary: GapSummary | None = None  # Aggregate statistics, set by analyze()
    top_opportunities: list[LinkOpportunity] = field(default_factory=list)  # Top 50 by score
    issues: list[dict[str, str]] = field(default_factory=list)  # {"type","category","message"} entries
    recommendations: list[str] = field(default_factory=list)  # Human-readable action items
    timestamp: str = ""  # ISO-8601 time the analysis started
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# LinkGapFinder
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class LinkGapFinder(BaseAsyncClient):
    """Find link building opportunities by analyzing competitor backlink gaps."""

    def __init__(self, **kwargs):
        # Conservative throttling: at most 5 concurrent requests at 2 req/s,
        # passed through to BaseAsyncClient alongside any caller overrides.
        super().__init__(max_concurrent=5, requests_per_second=2.0, **kwargs)
        # Optional HTTP session for the standalone REST fallback; when None,
        # _call_ahrefs() returns an MCP stub payload instead of calling out.
        self.session: aiohttp.ClientSession | None = None
|
|
|
|
# -- Ahrefs MCP helper ---------------------------------------------------
|
|
|
|
async def _call_ahrefs(
    self, endpoint: str, params: dict[str, Any]
) -> dict[str, Any]:
    """
    Call an Ahrefs API endpoint.

    In MCP context this calls mcp__ahrefs__<endpoint>.
    For standalone use, falls back to the REST API with a bearer token.

    Args:
        endpoint: Ahrefs v3 endpoint name (e.g. "site-explorer-domain-rating").
        params: Query parameters for the endpoint.

    Returns:
        Parsed JSON response from the REST API, or a stub payload
        ({"note": "mcp_stub", ...}) when no HTTP session is configured.
    """
    # BUG FIX: previously the token was fetched only when self.session was
    # *not* set (`if not self.session`), so `self.session and api_token`
    # could never both be truthy and the REST branch below was unreachable.
    # Fetch the token exactly when a session exists so the standalone REST
    # path actually runs.
    api_token = config.get_required("AHREFS_API_TOKEN") if self.session else None

    if self.session and api_token:
        url = f"{AHREFS_BASE}/{endpoint}"
        headers = {"Authorization": f"Bearer {api_token}"}
        async with self.session.get(url, headers=headers, params=params) as resp:
            resp.raise_for_status()
            return await resp.json()

    # No HTTP session: log a hint for MCP usage and return a stub payload
    # so callers can proceed with empty data instead of crashing.
    logger.warning(
        f"Ahrefs call to '{endpoint}' - use MCP tool "
        f"mcp__ahrefs__{endpoint.replace('-', '_')} in Claude Desktop"
    )
    return {"endpoint": endpoint, "params": params, "data": [], "note": "mcp_stub"}
|
|
|
|
# -- Core methods --------------------------------------------------------
|
|
|
|
async def get_referring_domains(
    self, url: str, limit: int = 1000
) -> list[dict[str, Any]]:
    """Fetch referring domains for a given URL/domain."""
    # Accept either a full URL or a bare hostname.
    host = urlparse(url).netloc or url
    payload = await self._call_ahrefs(
        "site-explorer-referring-domains",
        {"target": host, "mode": "domain", "limit": limit, "order_by": "domain_rating:desc"},
    )
    # The response nests the rows under "data" or "refdomains" depending on
    # the caller context; normalize both shapes to a plain list.
    rows = payload.get("data", payload.get("refdomains", []))
    if isinstance(rows, dict):
        rows = rows.get("refdomains", [])
    if not isinstance(rows, list):
        return []
    return rows
|
|
|
|
async def get_domain_rating(self, url: str) -> float:
    """Fetch Domain Rating for a URL."""
    host = urlparse(url).netloc or url
    payload = await self._call_ahrefs("site-explorer-domain-rating", {"target": host})
    # Unwrap an optional "data" envelope; tolerate non-dict responses.
    body = payload.get("data", payload) if isinstance(payload, dict) else {}
    return body.get("domain_rating", 0.0)
|
|
|
|
async def get_domain_metrics(self, url: str) -> dict[str, Any]:
    """Fetch comprehensive domain metrics."""
    host = urlparse(url).netloc or url
    payload = await self._call_ahrefs(
        "site-explorer-backlinks-stats",
        {"target": host, "mode": "domain"},
    )
    # Unwrap an optional "data" envelope; tolerate non-dict responses.
    stats = payload.get("data", payload) if isinstance(payload, dict) else {}
    metrics = {
        "total_backlinks": stats.get("live", 0),
        "referring_domains": stats.get("live_refdomains", 0),
        "dofollow": stats.get("live_dofollow", 0),
    }
    return metrics
|
|
|
|
def find_gaps(
    self,
    target_domains: set[str],
    competitor_domain_maps: dict[str, set[str]],
) -> list[dict[str, Any]]:
    """
    Find domains linking to competitors but not to the target.

    Returns a list of gap domains with metadata about which
    competitors they link to.
    """
    # Invert the per-competitor sets into: referring domain -> [competitors].
    linkers_by_domain: dict[str, list[str]] = {}
    for comp_name, comp_domains in competitor_domain_maps.items():
        for raw in comp_domains:
            linkers_by_domain.setdefault(raw.lower(), []).append(comp_name)

    # Case-insensitive membership test against the target's own refdomains.
    known = {d.lower() for d in target_domains}

    gaps = [
        {
            "domain": dom,
            "linked_competitors": comps,
            "competitor_count": len(set(comps)),
        }
        for dom, comps in linkers_by_domain.items()
        if dom not in known
    ]

    # Domains referenced by more competitors come first (stable sort).
    return sorted(gaps, key=lambda g: g["competitor_count"], reverse=True)
|
|
|
|
def score_opportunities(
    self,
    gaps: list[dict[str, Any]],
    refdomains_data: dict[str, list[dict[str, Any]]],
    total_competitors: int,
) -> list[LinkOpportunity]:
    """
    Score gap opportunities by DR, traffic, relevance, and feasibility.

    Scoring factors:
    - DR weight: Higher DR = more impactful link
    - Competitor overlap: More competitors linking = easier to acquire
    - Category bonus: Editorial/news links valued higher
    - Traffic bonus: Higher traffic domains valued more

    Args:
        gaps: Gap dicts from find_gaps() with keys "domain",
            "linked_competitors", "competitor_count".
        refdomains_data: Raw referring-domain rows per competitor URL; used
            only to look up DR/traffic/backlinks/country per domain.
        total_competitors: Number of competitors analyzed; normalizes the
            overlap ratio used in both scores.

    Returns:
        LinkOpportunity list sorted by overall_score descending.
    """
    # Build a lookup of domain metadata from competitor refdomains.
    # First occurrence wins; rows may carry the hostname under "domain" or
    # "domain_from" and metrics under alternate key names per endpoint.
    domain_metadata: dict[str, dict[str, Any]] = {}
    for comp_url, domains in refdomains_data.items():
        for rd in domains:
            d = rd.get("domain", rd.get("domain_from", "")).lower()
            if d and d not in domain_metadata:
                domain_metadata[d] = {
                    "dr": rd.get("domain_rating", rd.get("dr", 0)),
                    "traffic": rd.get("organic_traffic", rd.get("traffic", 0)),
                    "backlinks": rd.get("backlinks", 0),
                    "country": rd.get("country", ""),
                }

    opportunities = []

    for gap in gaps:
        domain = gap["domain"]
        # Domains missing from the metadata lookup fall back to zeros/"".
        meta = domain_metadata.get(domain, {})

        dr = meta.get("dr", 0)
        traffic = meta.get("traffic", 0)
        comp_count = gap["competitor_count"]

        # Category detection via substring patterns / TLD heuristics.
        category = self._detect_category(domain)

        # Feasibility score (0-100), clamped by min().
        # Higher if: more competitors link (social proof), blog/forum
        # (easier outreach), lower DR, and the site shows any traffic.
        feasibility = min(100, (
            (comp_count / max(total_competitors, 1)) * 40  # Competitor overlap
            + (30 if category in ("blog", "forum", "directory") else 10)  # Category ease
            + (20 if dr < 60 else 5)  # Lower DR = easier to get link from
            + (10 if traffic > 0 else 0)  # Active site bonus
        ))

        # Impact score (0-100), clamped by min().
        # Higher if: high DR, high traffic, editorial/news source.
        impact = min(100, (
            min(dr, 100) * 0.4  # DR weight (40%)
            + min(traffic / 1000, 30)  # Traffic weight (up to 30)
            + (20 if category in ("news", "edu_gov") else 5)  # Authority bonus
            + (comp_count / max(total_competitors, 1)) * 10  # Validation
        ))

        # Overall score = weighted average, impact weighted slightly higher.
        overall = round(feasibility * 0.4 + impact * 0.6, 1)

        opp = LinkOpportunity(
            domain=domain,
            dr=dr,
            traffic=traffic,
            linked_competitors=gap["linked_competitors"],
            competitor_count=comp_count,
            not_linked_target=True,
            category=category,
            feasibility_score=round(feasibility, 1),
            impact_score=round(impact, 1),
            overall_score=overall,
            backlinks_to_competitors=meta.get("backlinks", 0),
            country=meta.get("country", ""),
        )
        opportunities.append(opp)

    # Sort by overall score descending.
    opportunities.sort(key=lambda o: o.overall_score, reverse=True)
    return opportunities
|
|
|
|
def categorize_sources(
    self, opportunities: list[LinkOpportunity]
) -> dict[str, list[LinkOpportunity]]:
    """Group opportunities by source category."""
    # Buckets appear in first-seen order; items keep their input order.
    grouped: dict[str, list[LinkOpportunity]] = {}
    for item in opportunities:
        grouped.setdefault(item.category, []).append(item)
    return grouped
|
|
|
|
# -- Orchestration -------------------------------------------------------
|
|
|
|
async def analyze(
    self,
    target_url: str,
    competitor_urls: list[str],
    min_dr: float = 0,
    country_filter: str = "",
    limit: int = 1000,
) -> LinkGapResult:
    """Orchestrate full link gap analysis.

    Args:
        target_url: URL or bare domain of the site being analyzed.
        competitor_urls: Competitor URLs/domains to compare against.
        min_dr: Drop opportunities with DR below this value (0 = no filter).
        country_filter: Keep only this country code; domains with no
            country metadata are kept as well.
        limit: Max referring domains fetched per site.

    Returns:
        A populated LinkGapResult with opportunities, summary, issues,
        and recommendations.
    """
    # Normalize to bare hostnames; bare domains pass through unchanged.
    target_domain = urlparse(target_url).netloc or target_url
    comp_domains = [urlparse(c).netloc or c for c in competitor_urls]

    logger.info(f"Starting link gap analysis: {target_domain} vs {comp_domains}")

    result = LinkGapResult(
        target_url=target_url,
        target_domain=target_domain,
        competitor_urls=competitor_urls,
        competitor_domains=comp_domains,
        timestamp=datetime.now().isoformat(),
    )

    # Phase 1: Fetch target DR and referring domains concurrently.
    logger.info("Phase 1: Fetching target data...")
    target_dr_task = self.get_domain_rating(target_url)
    target_rd_task = self.get_referring_domains(target_url, limit=limit)

    # return_exceptions=True: a failed call yields the exception object,
    # which the isinstance checks below coerce to safe defaults.
    target_dr, target_refdomains = await asyncio.gather(
        target_dr_task, target_rd_task, return_exceptions=True,
    )

    result.target_dr = target_dr if isinstance(target_dr, (int, float)) else 0
    target_rd_list = target_refdomains if isinstance(target_refdomains, list) else []
    # Rows may carry the hostname under "domain" or "domain_from".
    target_domain_set = {
        rd.get("domain", rd.get("domain_from", "")).lower()
        for rd in target_rd_list
        if rd.get("domain", rd.get("domain_from", ""))
    }

    # Phase 2: Fetch competitor referring domains.
    # NOTE(review): the coroutines built below are awaited one at a time in
    # the loop, so these fetches run sequentially, not in parallel —
    # consider asyncio.gather here if throughput matters.
    logger.info("Phase 2: Fetching competitor data...")
    comp_rd_tasks = {
        comp_url: self.get_referring_domains(comp_url, limit=limit)
        for comp_url in competitor_urls
    }
    comp_results = {}
    for comp_url, task in comp_rd_tasks.items():
        try:
            comp_rd = await task
            comp_results[comp_url] = comp_rd if isinstance(comp_rd, list) else []
        except Exception as e:
            # One failing competitor must not abort the whole analysis.
            logger.error(f"Failed to fetch refdomains for {comp_url}: {e}")
            comp_results[comp_url] = []

    # Build competitor domain maps: competitor hostname -> referring hosts.
    competitor_domain_maps: dict[str, set[str]] = {}
    for comp_url, rd_list in comp_results.items():
        comp_domain = urlparse(comp_url).netloc or comp_url
        competitor_domain_maps[comp_domain] = {
            rd.get("domain", rd.get("domain_from", "")).lower()
            for rd in rd_list
            if rd.get("domain", rd.get("domain_from", ""))
        }

    # Phase 3: Find gaps (domains linking to competitors but not target).
    logger.info("Phase 3: Finding link gaps...")
    raw_gaps = self.find_gaps(target_domain_set, competitor_domain_maps)
    logger.info(f"Found {len(raw_gaps)} gap domains")

    # Phase 4: Score opportunities (feasibility/impact/overall).
    logger.info("Phase 4: Scoring opportunities...")
    opportunities = self.score_opportunities(
        raw_gaps, comp_results, len(competitor_urls)
    )

    # Apply filters after scoring so scores stay comparable across runs.
    if min_dr > 0:
        opportunities = [o for o in opportunities if o.dr >= min_dr]

    if country_filter:
        # Domains with unknown country are deliberately retained.
        country_lower = country_filter.lower()
        opportunities = [
            o for o in opportunities
            if o.country.lower() == country_lower or not o.country
        ]

    result.opportunities = opportunities
    result.top_opportunities = opportunities[:50]

    # Phase 5: Build summary statistics.
    logger.info("Phase 5: Building summary...")
    result.summary = self._build_summary(
        opportunities, comp_results, len(target_rd_list)
    )

    # Phase 6: Generate issues and recommendations (mutate result in place).
    self._generate_issues(result)
    self._generate_recommendations(result)

    logger.info(f"Link gap analysis complete: {len(opportunities)} opportunities found")
    return result
|
|
|
|
# -- Helpers -------------------------------------------------------------
|
|
|
|
@staticmethod
def _detect_category(domain: str) -> str:
    """Detect the category of a domain based on patterns."""
    needle = domain.lower()

    # First matching category wins, in SOURCE_CATEGORY_PATTERNS order.
    for category, patterns in SOURCE_CATEGORY_PATTERNS.items():
        if any(fragment in needle for fragment in patterns):
            return category

    # Fallback heuristics: educational/government TLD suffixes.
    if needle.endswith((".edu", ".ac.kr", ".gov", ".go.kr")):
        return "edu_gov"

    return "other"
|
|
|
|
def _build_summary(
    self,
    opportunities: list[LinkOpportunity],
    comp_results: dict[str, list],
    target_rd_count: int,
) -> GapSummary:
    """Build summary statistics from opportunities."""
    summary = GapSummary(
        total_opportunities=len(opportunities),
        target_refdomains_count=target_rd_count,
    )

    if opportunities:
        # Average DR is computed only over opportunities with a known DR.
        positive_dr = [o.dr for o in opportunities if o.dr > 0]
        summary.avg_dr = round(sum(positive_dr) / max(len(positive_dr), 1), 1)
        summary.high_dr_count = sum(1 for o in opportunities if o.dr >= 50)

        # Tally categories and countries in a single pass.
        by_category: dict[str, int] = {}
        by_country: dict[str, int] = {}
        for o in opportunities:
            by_category[o.category] = by_category.get(o.category, 0) + 1
            if o.country:
                by_country[o.country] = by_country.get(o.country, 0) + 1

        summary.category_breakdown = dict(
            sorted(by_category.items(), key=lambda kv: kv[1], reverse=True)
        )
        summary.top_countries = sorted(
            [{"country": c, "count": n} for c, n in by_country.items()],
            key=lambda row: row["count"], reverse=True,
        )[:10]

    # One refdomain-count entry per competitor hostname.
    for comp_url, rd_list in comp_results.items():
        comp_domain = urlparse(comp_url).netloc or comp_url
        summary.total_competitor_refdomains[comp_domain] = len(rd_list)

    return summary
|
|
|
|
def _generate_issues(self, result: LinkGapResult) -> None:
    """Generate issues based on gap analysis."""
    found: list[dict[str, str]] = []
    summary = result.summary

    # Without a summary there is nothing to flag; issues stay empty.
    if summary:
        # Overall gap size warning.
        if summary.total_opportunities > 500:
            found.append({
                "type": "warning",
                "category": "link_gap",
                "message": (
                    f"Large link gap: {summary.total_opportunities} domains "
                    "link to competitors but not to you"
                ),
            })

        # High-authority (DR 50+) gap is treated as an error.
        if summary.high_dr_count > 50:
            found.append({
                "type": "error",
                "category": "authority_gap",
                "message": (
                    f"{summary.high_dr_count} high-authority domains (DR 50+) "
                    "link to competitors but not to you"
                ),
            })

        # Category-specific gaps: news/PR and edu/gov thresholds.
        news_gap = summary.category_breakdown.get("news", 0)
        if news_gap > 20:
            found.append({
                "type": "warning",
                "category": "pr_gap",
                "message": f"{news_gap} news/media domains link to competitors - consider digital PR",
            })

        edu_gap = summary.category_breakdown.get("edu_gov", 0)
        if edu_gap > 5:
            found.append({
                "type": "info",
                "category": "edu_gov_gap",
                "message": f"{edu_gap} .edu/.gov domains link to competitors - high-authority opportunity",
            })

    result.issues = found
|
|
|
|
def _generate_recommendations(self, result: LinkGapResult) -> None:
    """Generate actionable recommendations.

    Builds a prioritized list of outreach strategies from the categorized
    top opportunities and stores it on result.recommendations.
    """
    recs = []

    # No opportunities: emit a single advisory and stop.
    if not result.opportunities:
        recs.append("No significant link gaps found. Consider expanding competitor list.")
        result.recommendations = recs
        return

    # Group the best-scoring opportunities (top 100) by source category;
    # each category present yields one tailored recommendation below.
    categorized = self.categorize_sources(result.top_opportunities[:100])

    if "news" in categorized:
        news_count = len(categorized["news"])
        # Name up to three concrete outlets to make the advice actionable.
        top_news = [o.domain for o in categorized["news"][:3]]
        recs.append(
            f"Pursue {news_count} news/media link opportunities. "
            f"Top targets: {', '.join(top_news)}. "
            "Strategy: create newsworthy content, press releases, expert commentary."
        )

    if "blog" in categorized:
        blog_count = len(categorized["blog"])
        recs.append(
            f"Target {blog_count} blog/content site opportunities via guest posting, "
            "collaborative content, and expert interviews."
        )

    if "directory" in categorized:
        dir_count = len(categorized["directory"])
        recs.append(
            f"Submit to {dir_count} relevant directories and listing sites. "
            "Low effort, moderate impact for local SEO signals."
        )

    if "forum" in categorized:
        forum_count = len(categorized["forum"])
        recs.append(
            f"Engage in {forum_count} forum/community sites with helpful answers "
            "and resource sharing. Build presence before linking."
        )

    if "korean_platform" in categorized:
        kr_count = len(categorized["korean_platform"])
        recs.append(
            f"Build presence on {kr_count} Korean platforms (Naver, Tistory, Brunch). "
            "Critical for Korean SERP visibility."
        )

    if "edu_gov" in categorized:
        eg_count = len(categorized["edu_gov"])
        recs.append(
            f"Target {eg_count} .edu/.gov link opportunities through scholarship "
            "programs, research partnerships, or government resource contributions."
        )

    # Multi-competitor overlap: domains linking to 2+ competitors are the
    # strongest social proof of industry relevance.
    multi_comp = [o for o in result.top_opportunities if o.competitor_count >= 2]
    if multi_comp:
        recs.append(
            f"{len(multi_comp)} domains link to multiple competitors but not to you. "
            "These are high-priority targets as they validate industry relevance."
        )

    # Quick wins: high feasibility, moderate impact, among the top 100.
    quick_wins = [
        o for o in result.opportunities[:100]
        if o.feasibility_score >= 60 and o.impact_score >= 30
    ]
    if quick_wins:
        recs.append(
            f"Prioritize {len(quick_wins)} quick-win opportunities with high "
            "feasibility and moderate impact for fastest link acquisition."
        )

    result.recommendations = recs
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Output Formatting
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def format_rich_output(result: LinkGapResult) -> None:
    """Display gap analysis results using Rich tables.

    Prints, in order: header, summary table, category breakdown,
    top-25 opportunity table, issues, and recommendations to the
    module-level console.
    """
    console.print(f"\n[bold cyan]Link Gap Analysis: {result.target_domain}[/bold cyan]")
    console.print(f"[dim]vs {', '.join(result.competitor_domains)}[/dim]")
    console.print(f"[dim]Timestamp: {result.timestamp}[/dim]\n")

    # Summary metrics table.
    if result.summary:
        summary_table = Table(title="Summary", show_header=True, header_style="bold magenta")
        summary_table.add_column("Metric", style="cyan")
        summary_table.add_column("Value", style="green")
        summary_table.add_row("Target DR", str(result.target_dr))
        summary_table.add_row("Target Referring Domains", str(result.summary.target_refdomains_count))
        summary_table.add_row("Total Gap Opportunities", str(result.summary.total_opportunities))
        summary_table.add_row("Avg Opportunity DR", str(result.summary.avg_dr))
        summary_table.add_row("High-DR Opportunities (50+)", str(result.summary.high_dr_count))

        # One row per competitor with its referring-domain count.
        for comp, count in result.summary.total_competitor_refdomains.items():
            summary_table.add_row(f" {comp} Refdomains", str(count))

        console.print(summary_table)

    # Category breakdown table (only when categories exist).
    if result.summary and result.summary.category_breakdown:
        cat_table = Table(title="\nCategory Breakdown", show_header=True, header_style="bold magenta")
        cat_table.add_column("Category", style="cyan")
        cat_table.add_column("Count", style="green")
        for cat, count in result.summary.category_breakdown.items():
            cat_table.add_row(cat, str(count))
        console.print(cat_table)

    # Top opportunities, capped at 25 rows.
    if result.top_opportunities:
        opp_table = Table(
            title=f"\nTop Opportunities (showing {min(25, len(result.top_opportunities))})",
            show_header=True,
            header_style="bold magenta",
        )
        opp_table.add_column("Domain", style="cyan", max_width=35)
        opp_table.add_column("DR", style="green", justify="right")
        opp_table.add_column("Category", style="yellow")
        opp_table.add_column("Comps", justify="right")
        opp_table.add_column("Score", style="bold green", justify="right")
        opp_table.add_column("Feasibility", justify="right")
        opp_table.add_column("Impact", justify="right")

        for opp in result.top_opportunities[:25]:
            opp_table.add_row(
                opp.domain[:35],  # truncate to fit the column width
                str(int(opp.dr)),
                opp.category,
                str(opp.competitor_count),
                f"{opp.overall_score:.1f}",
                f"{opp.feasibility_score:.0f}",
                f"{opp.impact_score:.0f}",
            )
        console.print(opp_table)

    # Issues, tagged by severity.
    if result.issues:
        console.print("\n[bold red]Issues:[/bold red]")
        for issue in result.issues:
            icon_map = {"error": "[red]ERROR[/red]", "warning": "[yellow]WARN[/yellow]", "info": "[blue]INFO[/blue]"}
            icon = icon_map.get(issue["type"], "[dim]INFO[/dim]")
            console.print(f" {icon} [{issue['category']}] {issue['message']}")

    # Numbered recommendation list.
    if result.recommendations:
        console.print("\n[bold green]Recommendations:[/bold green]")
        for i, rec in enumerate(result.recommendations, 1):
            console.print(f" {i}. {rec}")

    console.print()
|
|
|
|
|
|
def result_to_dict(result: LinkGapResult) -> dict[str, Any]:
    """Convert gap result to JSON-serializable dict."""
    # Dataclass members (summary, opportunities) are flattened via asdict;
    # plain lists/strings pass through unchanged.
    payload: dict[str, Any] = {
        "target_url": result.target_url,
        "target_domain": result.target_domain,
        "target_dr": result.target_dr,
        "competitor_urls": result.competitor_urls,
        "competitor_domains": result.competitor_domains,
        "summary": asdict(result.summary) if result.summary else None,
        "opportunities": [asdict(item) for item in result.opportunities],
        "top_opportunities": [asdict(item) for item in result.top_opportunities],
        "issues": result.issues,
        "recommendations": result.recommendations,
        "timestamp": result.timestamp,
    }
    return payload
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line arguments.

    Args:
        argv: Optional explicit argument list. Defaults to ``sys.argv[1:]``
            (argparse's own default), which keeps CLI behavior unchanged
            while making the parser unit-testable.

    Returns:
        Parsed namespace with target, competitor list, filters, and
        output options.
    """
    parser = argparse.ArgumentParser(
        description="Link Gap Finder - Identify link building opportunities vs competitors",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python link_gap_finder.py --target https://example.com --competitor https://comp1.com --json
  python link_gap_finder.py --target https://example.com --competitor https://comp1.com --competitor https://comp2.com --min-dr 30 --json
  python link_gap_finder.py --target https://example.com --competitor https://comp1.com --country kr --output gap_report.json
""",
    )
    parser.add_argument("--target", required=True, help="Target URL or domain")
    # action="append" lets --competitor be repeated; result is a list.
    parser.add_argument(
        "--competitor", action="append", required=True,
        help="Competitor URL or domain (can be repeated)",
    )
    parser.add_argument(
        "--min-dr", type=float, default=0,
        help="Minimum DR filter for opportunities (default: 0)",
    )
    parser.add_argument(
        "--country", default="",
        help="Filter by country code (e.g., kr, us, jp)",
    )
    parser.add_argument(
        "--limit", type=int, default=1000,
        help="Max referring domains to fetch per site (default: 1000)",
    )
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    parser.add_argument("--output", "-o", help="Save output to file")
    return parser.parse_args(argv)
|
|
|
|
|
|
async def main() -> None:
    """Main entry point.

    Parses CLI args, runs the gap analysis, then emits either JSON
    (stdout and/or file) or a Rich-formatted report. Exits with status 1
    on interrupt or failure.
    """
    args = parse_args()

    finder = LinkGapFinder()

    try:
        result = await finder.analyze(
            target_url=args.target,
            competitor_urls=args.competitor,
            min_dr=args.min_dr,
            country_filter=args.country,
            limit=args.limit,
        )

        # JSON output path is taken when either --json (stdout) or
        # --output (file) is requested; both may apply at once.
        if args.json or args.output:
            output_data = result_to_dict(result)
            json_str = json.dumps(output_data, indent=2, ensure_ascii=False)

            if args.output:
                with open(args.output, "w", encoding="utf-8") as f:
                    f.write(json_str)
                logger.info(f"Report saved to {args.output}")

            if args.json:
                print(json_str)
        else:
            # Human-readable Rich report.
            format_rich_output(result)

        # Request statistics reporting provided by BaseAsyncClient.
        finder.print_stats()

    except KeyboardInterrupt:
        logger.warning("Analysis interrupted by user")
        sys.exit(1)
    except Exception as e:
        logger.error(f"Analysis failed: {e}")
        if args.json:
            # Machine-readable error payload for JSON consumers.
            print(json.dumps({"error": str(e)}, indent=2))
        sys.exit(1)
|
|
|
|
|
|
# Script entry point: run the async CLI workflow under asyncio.
if __name__ == "__main__":
    asyncio.run(main())
|