""" Link Gap Finder - Competitor link gap analysis =============================================== Purpose: Identify link building opportunities by finding domains that link to competitors but not to the target site via Ahrefs MCP. Python: 3.10+ Usage: python link_gap_finder.py --target https://example.com --competitor https://comp1.com --json python link_gap_finder.py --target https://example.com --competitor https://comp1.com --competitor https://comp2.com --min-dr 30 --json """ from __future__ import annotations import argparse import asyncio import json import logging import re import sys from dataclasses import dataclass, field, asdict from datetime import datetime from typing import Any from urllib.parse import urlparse import aiohttp import pandas as pd from rich.console import Console from rich.table import Table from base_client import BaseAsyncClient, config # --------------------------------------------------------------------------- # Logging # --------------------------------------------------------------------------- logger = logging.getLogger("link_gap_finder") console = Console() # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- AHREFS_BASE = "https://api.ahrefs.com/v3" # Source category detection patterns SOURCE_CATEGORY_PATTERNS: dict[str, list[str]] = { "news": [ "news", "press", "media", "journal", "herald", "times", "post", "gazette", "tribune", "daily", "chosun", "donga", "joongang", "hani", "khan", "yna", "yonhap", "reuters", "bloomberg", "techcrunch", "verge", "wired", "arstechnica", "bbc", "cnn", ], "blog": [ "blog", "wordpress", "medium.com", "tistory.com", "brunch.co.kr", "blog.naver.com", "tumblr", "blogger", "substack", "ghost.io", "velog.io", "dev.to", ], "forum": [ "forum", "community", "discuss", "reddit.com", "quora.com", "stackexchange", "stackoverflow", "cafe.naver.com", "dcinside", "fmkorea", "clien", "ppomppu", "theqoo", "ruliweb", ], "directory": [ "directory", "listing", "yellowpages", "yelp", "bbb.org", "clutch.co", "g2.com", "capterra", "trustpilot", "glassdoor", "dmoz", "aboutus", "hotfrog", "manta", "superpages", ], "edu_gov": [ ".edu", ".gov", ".ac.kr", ".go.kr", ".or.kr", ], "social": [ "facebook.com", "twitter.com", "x.com", "linkedin.com", "instagram.com", "youtube.com", "pinterest.com", "tiktok.com", ], "korean_platform": [ "naver.com", "daum.net", "kakao.com", "tistory.com", "brunch.co.kr", "zum.com", "nate.com", ], } # --------------------------------------------------------------------------- # Dataclasses # --------------------------------------------------------------------------- @dataclass class LinkOpportunity: """A single link building opportunity from gap analysis.""" domain: str dr: float = 0.0 traffic: int = 0 linked_competitors: list[str] = field(default_factory=list) competitor_count: int = 0 not_linked_target: bool = True category: str = "other" feasibility_score: float = 0.0 impact_score: float = 0.0 overall_score: float = 0.0 backlinks_to_competitors: int = 0 country: str = "" top_anchor: str = "" @dataclass class GapSummary: """Summary statistics for the gap analysis.""" total_opportunities: int = 0 avg_dr: float = 0.0 high_dr_count: int = 0 category_breakdown: dict[str, int] = field(default_factory=dict) top_countries: list[dict[str, Any]] = field(default_factory=list) total_competitor_refdomains: dict[str, int] = field(default_factory=dict) target_refdomains_count: int = 0 @dataclass class LinkGapResult: """Complete link gap analysis result.""" target_url: str target_domain: str = "" competitor_urls: list[str] = field(default_factory=list) competitor_domains: list[str] = field(default_factory=list) target_dr: float = 0.0 opportunities: list[LinkOpportunity] = field(default_factory=list) summary: GapSummary | None = None top_opportunities: list[LinkOpportunity] = field(default_factory=list) issues: list[dict[str, str]] = field(default_factory=list) recommendations: list[str] = field(default_factory=list) timestamp: str = "" # --------------------------------------------------------------------------- # LinkGapFinder # --------------------------------------------------------------------------- class LinkGapFinder(BaseAsyncClient): """Find link building opportunities by analyzing competitor backlink gaps.""" def __init__(self, **kwargs): super().__init__(max_concurrent=5, requests_per_second=2.0, **kwargs) self.session: aiohttp.ClientSession | None = None # -- Ahrefs MCP helper --------------------------------------------------- async def _call_ahrefs( self, endpoint: str, params: dict[str, Any] ) -> dict[str, Any]: """ Call Ahrefs API endpoint. In MCP context this calls mcp__ahrefs__. For standalone use, falls back to REST API with token. """ api_token = config.get_required("AHREFS_API_TOKEN") if not self.session else None if self.session and api_token: url = f"{AHREFS_BASE}/{endpoint}" headers = {"Authorization": f"Bearer {api_token}"} async with self.session.get(url, headers=headers, params=params) as resp: resp.raise_for_status() return await resp.json() logger.warning( f"Ahrefs call to '{endpoint}' - use MCP tool " f"mcp__ahrefs__{endpoint.replace('-', '_')} in Claude Desktop" ) return {"endpoint": endpoint, "params": params, "data": [], "note": "mcp_stub"} # -- Core methods -------------------------------------------------------- async def get_referring_domains( self, url: str, limit: int = 1000 ) -> list[dict[str, Any]]: """Fetch referring domains for a given URL/domain.""" target = urlparse(url).netloc or url result = await self._call_ahrefs( "site-explorer-referring-domains", {"target": target, "mode": "domain", "limit": limit, "order_by": "domain_rating:desc"}, ) domains = result.get("data", result.get("refdomains", [])) if isinstance(domains, dict): domains = domains.get("refdomains", []) return domains if isinstance(domains, list) else [] async def get_domain_rating(self, url: str) -> float: """Fetch Domain Rating for a URL.""" target = urlparse(url).netloc or url result = await self._call_ahrefs( "site-explorer-domain-rating", {"target": target}, ) data = result.get("data", result) if isinstance(result, dict) else {} return data.get("domain_rating", 0.0) async def get_domain_metrics(self, url: str) -> dict[str, Any]: """Fetch comprehensive domain metrics.""" target = urlparse(url).netloc or url result = await self._call_ahrefs( "site-explorer-backlinks-stats", {"target": target, "mode": "domain"}, ) data = result.get("data", result) if isinstance(result, dict) else {} return { "total_backlinks": data.get("live", 0), "referring_domains": data.get("live_refdomains", 0), "dofollow": data.get("live_dofollow", 0), } def find_gaps( self, target_domains: set[str], competitor_domain_maps: dict[str, set[str]], ) -> list[dict[str, Any]]: """ Find domains linking to competitors but not to the target. Returns a list of gap domains with metadata about which competitors they link to. """ # Collect all competitor referring domains all_competitor_domains: dict[str, list[str]] = {} for comp_name, comp_domains in competitor_domain_maps.items(): for domain in comp_domains: domain_lower = domain.lower() if domain_lower not in all_competitor_domains: all_competitor_domains[domain_lower] = [] all_competitor_domains[domain_lower].append(comp_name) # Find gaps: in competitor set but not in target set target_set_lower = {d.lower() for d in target_domains} gaps = [] for domain, linked_comps in all_competitor_domains.items(): if domain not in target_set_lower: gaps.append({ "domain": domain, "linked_competitors": linked_comps, "competitor_count": len(set(linked_comps)), }) # Sort by number of competitors linking (more = higher priority) gaps.sort(key=lambda g: g["competitor_count"], reverse=True) return gaps def score_opportunities( self, gaps: list[dict[str, Any]], refdomains_data: dict[str, list[dict[str, Any]]], total_competitors: int, ) -> list[LinkOpportunity]: """ Score gap opportunities by DR, traffic, relevance, and feasibility. Scoring factors: - DR weight: Higher DR = more impactful link - Competitor overlap: More competitors linking = easier to acquire - Category bonus: Editorial/news links valued higher - Traffic bonus: Higher traffic domains valued more """ # Build a lookup of domain metadata from competitor refdomains domain_metadata: dict[str, dict[str, Any]] = {} for comp_url, domains in refdomains_data.items(): for rd in domains: d = rd.get("domain", rd.get("domain_from", "")).lower() if d and d not in domain_metadata: domain_metadata[d] = { "dr": rd.get("domain_rating", rd.get("dr", 0)), "traffic": rd.get("organic_traffic", rd.get("traffic", 0)), "backlinks": rd.get("backlinks", 0), "country": rd.get("country", ""), } opportunities = [] for gap in gaps: domain = gap["domain"] meta = domain_metadata.get(domain, {}) dr = meta.get("dr", 0) traffic = meta.get("traffic", 0) comp_count = gap["competitor_count"] # Category detection category = self._detect_category(domain) # Feasibility score (0-100) # Higher if: more competitors link (social proof), blog/forum (easier outreach) feasibility = min(100, ( (comp_count / max(total_competitors, 1)) * 40 # Competitor overlap + (30 if category in ("blog", "forum", "directory") else 10) # Category ease + (20 if dr < 60 else 5) # Lower DR = easier to get link from + (10 if traffic > 0 else 0) # Active site bonus )) # Impact score (0-100) # Higher if: high DR, high traffic, editorial/news impact = min(100, ( min(dr, 100) * 0.4 # DR weight (40%) + min(traffic / 1000, 30) # Traffic weight (up to 30) + (20 if category in ("news", "edu_gov") else 5) # Authority bonus + (comp_count / max(total_competitors, 1)) * 10 # Validation )) # Overall score = weighted average overall = round(feasibility * 0.4 + impact * 0.6, 1) opp = LinkOpportunity( domain=domain, dr=dr, traffic=traffic, linked_competitors=gap["linked_competitors"], competitor_count=comp_count, not_linked_target=True, category=category, feasibility_score=round(feasibility, 1), impact_score=round(impact, 1), overall_score=overall, backlinks_to_competitors=meta.get("backlinks", 0), country=meta.get("country", ""), ) opportunities.append(opp) # Sort by overall score descending opportunities.sort(key=lambda o: o.overall_score, reverse=True) return opportunities def categorize_sources( self, opportunities: list[LinkOpportunity] ) -> dict[str, list[LinkOpportunity]]: """Group opportunities by source category.""" categorized: dict[str, list[LinkOpportunity]] = {} for opp in opportunities: cat = opp.category if cat not in categorized: categorized[cat] = [] categorized[cat].append(opp) return categorized # -- Orchestration ------------------------------------------------------- async def analyze( self, target_url: str, competitor_urls: list[str], min_dr: float = 0, country_filter: str = "", limit: int = 1000, ) -> LinkGapResult: """Orchestrate full link gap analysis.""" target_domain = urlparse(target_url).netloc or target_url comp_domains = [urlparse(c).netloc or c for c in competitor_urls] logger.info(f"Starting link gap analysis: {target_domain} vs {comp_domains}") result = LinkGapResult( target_url=target_url, target_domain=target_domain, competitor_urls=competitor_urls, competitor_domains=comp_domains, timestamp=datetime.now().isoformat(), ) # Phase 1: Fetch target DR and referring domains logger.info("Phase 1: Fetching target data...") target_dr_task = self.get_domain_rating(target_url) target_rd_task = self.get_referring_domains(target_url, limit=limit) target_dr, target_refdomains = await asyncio.gather( target_dr_task, target_rd_task, return_exceptions=True, ) result.target_dr = target_dr if isinstance(target_dr, (int, float)) else 0 target_rd_list = target_refdomains if isinstance(target_refdomains, list) else [] target_domain_set = { rd.get("domain", rd.get("domain_from", "")).lower() for rd in target_rd_list if rd.get("domain", rd.get("domain_from", "")) } # Phase 2: Fetch competitor referring domains (parallel) logger.info("Phase 2: Fetching competitor data...") comp_rd_tasks = { comp_url: self.get_referring_domains(comp_url, limit=limit) for comp_url in competitor_urls } comp_results = {} for comp_url, task in comp_rd_tasks.items(): try: comp_rd = await task comp_results[comp_url] = comp_rd if isinstance(comp_rd, list) else [] except Exception as e: logger.error(f"Failed to fetch refdomains for {comp_url}: {e}") comp_results[comp_url] = [] # Build competitor domain maps competitor_domain_maps: dict[str, set[str]] = {} for comp_url, rd_list in comp_results.items(): comp_domain = urlparse(comp_url).netloc or comp_url competitor_domain_maps[comp_domain] = { rd.get("domain", rd.get("domain_from", "")).lower() for rd in rd_list if rd.get("domain", rd.get("domain_from", "")) } # Phase 3: Find gaps logger.info("Phase 3: Finding link gaps...") raw_gaps = self.find_gaps(target_domain_set, competitor_domain_maps) logger.info(f"Found {len(raw_gaps)} gap domains") # Phase 4: Score opportunities logger.info("Phase 4: Scoring opportunities...") opportunities = self.score_opportunities( raw_gaps, comp_results, len(competitor_urls) ) # Apply filters if min_dr > 0: opportunities = [o for o in opportunities if o.dr >= min_dr] if country_filter: country_lower = country_filter.lower() opportunities = [ o for o in opportunities if o.country.lower() == country_lower or not o.country ] result.opportunities = opportunities result.top_opportunities = opportunities[:50] # Phase 5: Build summary logger.info("Phase 5: Building summary...") result.summary = self._build_summary( opportunities, comp_results, len(target_rd_list) ) # Phase 6: Generate recommendations self._generate_issues(result) self._generate_recommendations(result) logger.info(f"Link gap analysis complete: {len(opportunities)} opportunities found") return result # -- Helpers ------------------------------------------------------------- @staticmethod def _detect_category(domain: str) -> str: """Detect the category of a domain based on patterns.""" domain_lower = domain.lower() for category, patterns in SOURCE_CATEGORY_PATTERNS.items(): for pattern in patterns: if pattern in domain_lower: return category # Fallback heuristics if domain_lower.endswith((".edu", ".ac.kr", ".gov", ".go.kr")): return "edu_gov" return "other" def _build_summary( self, opportunities: list[LinkOpportunity], comp_results: dict[str, list], target_rd_count: int, ) -> GapSummary: """Build summary statistics from opportunities.""" summary = GapSummary() summary.total_opportunities = len(opportunities) summary.target_refdomains_count = target_rd_count if opportunities: dr_values = [o.dr for o in opportunities if o.dr > 0] summary.avg_dr = round(sum(dr_values) / max(len(dr_values), 1), 1) summary.high_dr_count = sum(1 for o in opportunities if o.dr >= 50) # Category breakdown cat_counts: dict[str, int] = {} country_counts: dict[str, int] = {} for opp in opportunities: cat_counts[opp.category] = cat_counts.get(opp.category, 0) + 1 if opp.country: country_counts[opp.country] = country_counts.get(opp.country, 0) + 1 summary.category_breakdown = dict( sorted(cat_counts.items(), key=lambda x: x[1], reverse=True) ) summary.top_countries = sorted( [{"country": k, "count": v} for k, v in country_counts.items()], key=lambda x: x["count"], reverse=True, )[:10] # Competitor refdomains counts for comp_url, rd_list in comp_results.items(): comp_domain = urlparse(comp_url).netloc or comp_url summary.total_competitor_refdomains[comp_domain] = len(rd_list) return summary def _generate_issues(self, result: LinkGapResult) -> None: """Generate issues based on gap analysis.""" issues = [] if result.summary: # Large gap warning if result.summary.total_opportunities > 500: issues.append({ "type": "warning", "category": "link_gap", "message": ( f"Large link gap: {result.summary.total_opportunities} domains " "link to competitors but not to you" ), }) # High-DR gap if result.summary.high_dr_count > 50: issues.append({ "type": "error", "category": "authority_gap", "message": ( f"{result.summary.high_dr_count} high-authority domains (DR 50+) " "link to competitors but not to you" ), }) # Category-specific gaps news_gap = result.summary.category_breakdown.get("news", 0) if news_gap > 20: issues.append({ "type": "warning", "category": "pr_gap", "message": f"{news_gap} news/media domains link to competitors - consider digital PR", }) edu_gap = result.summary.category_breakdown.get("edu_gov", 0) if edu_gap > 5: issues.append({ "type": "info", "category": "edu_gov_gap", "message": f"{edu_gap} .edu/.gov domains link to competitors - high-authority opportunity", }) result.issues = issues def _generate_recommendations(self, result: LinkGapResult) -> None: """Generate actionable recommendations.""" recs = [] if not result.opportunities: recs.append("No significant link gaps found. Consider expanding competitor list.") result.recommendations = recs return # Top opportunities by category categorized = self.categorize_sources(result.top_opportunities[:100]) if "news" in categorized: news_count = len(categorized["news"]) top_news = [o.domain for o in categorized["news"][:3]] recs.append( f"Pursue {news_count} news/media link opportunities. " f"Top targets: {', '.join(top_news)}. " "Strategy: create newsworthy content, press releases, expert commentary." ) if "blog" in categorized: blog_count = len(categorized["blog"]) recs.append( f"Target {blog_count} blog/content site opportunities via guest posting, " "collaborative content, and expert interviews." ) if "directory" in categorized: dir_count = len(categorized["directory"]) recs.append( f"Submit to {dir_count} relevant directories and listing sites. " "Low effort, moderate impact for local SEO signals." ) if "forum" in categorized: forum_count = len(categorized["forum"]) recs.append( f"Engage in {forum_count} forum/community sites with helpful answers " "and resource sharing. Build presence before linking." ) if "korean_platform" in categorized: kr_count = len(categorized["korean_platform"]) recs.append( f"Build presence on {kr_count} Korean platforms (Naver, Tistory, Brunch). " "Critical for Korean SERP visibility." ) if "edu_gov" in categorized: eg_count = len(categorized["edu_gov"]) recs.append( f"Target {eg_count} .edu/.gov link opportunities through scholarship " "programs, research partnerships, or government resource contributions." ) # Multi-competitor overlap multi_comp = [o for o in result.top_opportunities if o.competitor_count >= 2] if multi_comp: recs.append( f"{len(multi_comp)} domains link to multiple competitors but not to you. " "These are high-priority targets as they validate industry relevance." ) # Quick wins: high feasibility, moderate impact quick_wins = [ o for o in result.opportunities[:100] if o.feasibility_score >= 60 and o.impact_score >= 30 ] if quick_wins: recs.append( f"Prioritize {len(quick_wins)} quick-win opportunities with high " "feasibility and moderate impact for fastest link acquisition." ) result.recommendations = recs # --------------------------------------------------------------------------- # Output Formatting # --------------------------------------------------------------------------- def format_rich_output(result: LinkGapResult) -> None: """Display gap analysis results using Rich tables.""" console.print(f"\n[bold cyan]Link Gap Analysis: {result.target_domain}[/bold cyan]") console.print(f"[dim]vs {', '.join(result.competitor_domains)}[/dim]") console.print(f"[dim]Timestamp: {result.timestamp}[/dim]\n") # Summary if result.summary: summary_table = Table(title="Summary", show_header=True, header_style="bold magenta") summary_table.add_column("Metric", style="cyan") summary_table.add_column("Value", style="green") summary_table.add_row("Target DR", str(result.target_dr)) summary_table.add_row("Target Referring Domains", str(result.summary.target_refdomains_count)) summary_table.add_row("Total Gap Opportunities", str(result.summary.total_opportunities)) summary_table.add_row("Avg Opportunity DR", str(result.summary.avg_dr)) summary_table.add_row("High-DR Opportunities (50+)", str(result.summary.high_dr_count)) for comp, count in result.summary.total_competitor_refdomains.items(): summary_table.add_row(f" {comp} Refdomains", str(count)) console.print(summary_table) # Category breakdown if result.summary and result.summary.category_breakdown: cat_table = Table(title="\nCategory Breakdown", show_header=True, header_style="bold magenta") cat_table.add_column("Category", style="cyan") cat_table.add_column("Count", style="green") for cat, count in result.summary.category_breakdown.items(): cat_table.add_row(cat, str(count)) console.print(cat_table) # Top opportunities if result.top_opportunities: opp_table = Table( title=f"\nTop Opportunities (showing {min(25, len(result.top_opportunities))})", show_header=True, header_style="bold magenta", ) opp_table.add_column("Domain", style="cyan", max_width=35) opp_table.add_column("DR", style="green", justify="right") opp_table.add_column("Category", style="yellow") opp_table.add_column("Comps", justify="right") opp_table.add_column("Score", style="bold green", justify="right") opp_table.add_column("Feasibility", justify="right") opp_table.add_column("Impact", justify="right") for opp in result.top_opportunities[:25]: opp_table.add_row( opp.domain[:35], str(int(opp.dr)), opp.category, str(opp.competitor_count), f"{opp.overall_score:.1f}", f"{opp.feasibility_score:.0f}", f"{opp.impact_score:.0f}", ) console.print(opp_table) # Issues if result.issues: console.print("\n[bold red]Issues:[/bold red]") for issue in result.issues: icon_map = {"error": "[red]ERROR[/red]", "warning": "[yellow]WARN[/yellow]", "info": "[blue]INFO[/blue]"} icon = icon_map.get(issue["type"], "[dim]INFO[/dim]") console.print(f" {icon} [{issue['category']}] {issue['message']}") # Recommendations if result.recommendations: console.print("\n[bold green]Recommendations:[/bold green]") for i, rec in enumerate(result.recommendations, 1): console.print(f" {i}. {rec}") console.print() def result_to_dict(result: LinkGapResult) -> dict[str, Any]: """Convert gap result to JSON-serializable dict.""" return { "target_url": result.target_url, "target_domain": result.target_domain, "target_dr": result.target_dr, "competitor_urls": result.competitor_urls, "competitor_domains": result.competitor_domains, "summary": asdict(result.summary) if result.summary else None, "opportunities": [asdict(o) for o in result.opportunities], "top_opportunities": [asdict(o) for o in result.top_opportunities], "issues": result.issues, "recommendations": result.recommendations, "timestamp": result.timestamp, } # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- def parse_args() -> argparse.Namespace: """Parse command-line arguments.""" parser = argparse.ArgumentParser( description="Link Gap Finder - Identify link building opportunities vs competitors", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python link_gap_finder.py --target https://example.com --competitor https://comp1.com --json python link_gap_finder.py --target https://example.com --competitor https://comp1.com --competitor https://comp2.com --min-dr 30 --json python link_gap_finder.py --target https://example.com --competitor https://comp1.com --country kr --output gap_report.json """, ) parser.add_argument("--target", required=True, help="Target URL or domain") parser.add_argument( "--competitor", action="append", required=True, help="Competitor URL or domain (can be repeated)", ) parser.add_argument( "--min-dr", type=float, default=0, help="Minimum DR filter for opportunities (default: 0)", ) parser.add_argument( "--country", default="", help="Filter by country code (e.g., kr, us, jp)", ) parser.add_argument( "--limit", type=int, default=1000, help="Max referring domains to fetch per site (default: 1000)", ) parser.add_argument("--json", action="store_true", help="Output as JSON") parser.add_argument("--output", "-o", help="Save output to file") return parser.parse_args() async def main() -> None: """Main entry point.""" args = parse_args() finder = LinkGapFinder() try: result = await finder.analyze( target_url=args.target, competitor_urls=args.competitor, min_dr=args.min_dr, country_filter=args.country, limit=args.limit, ) if args.json or args.output: output_data = result_to_dict(result) json_str = json.dumps(output_data, indent=2, ensure_ascii=False) if args.output: with open(args.output, "w", encoding="utf-8") as f: f.write(json_str) logger.info(f"Report saved to {args.output}") if args.json: print(json_str) else: format_rich_output(result) finder.print_stats() except KeyboardInterrupt: logger.warning("Analysis interrupted by user") sys.exit(1) except Exception as e: logger.error(f"Analysis failed: {e}") if args.json: print(json.dumps({"error": str(e)}, indent=2)) sys.exit(1) if __name__ == "__main__": asyncio.run(main())