Files
our-claude-skills/custom-skills/22-seo-link-building/code/scripts/backlink_auditor.py
Andrew Yim a3ff965b87 Add SEO skills 19-28, 31-32 with full Python implementations
12 new skills: Keyword Strategy, SERP Analysis, Position Tracking,
Link Building, Content Strategy, E-Commerce SEO, KPI Framework,
International SEO, AI Visibility, Knowledge Graph, Competitor Intel,
and Crawl Budget. ~20K lines of Python across 25 domain scripts.
Updated skill 11 pipeline table and repo CLAUDE.md.
Enhanced skill 18 local SEO workflow from jamie.clinic audit.

Note: Skill 26 hreflang_validator.py pending (content filter block).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-13 12:05:59 +09:00

1080 lines
41 KiB
Python

"""
Backlink Auditor - Backlink profile analysis and toxic link detection
=====================================================================
Purpose: Analyze backlink profiles via Ahrefs MCP, detect toxic links,
track link velocity, and map Korean platform links.
Python: 3.10+
Usage:
python backlink_auditor.py --url https://example.com --json
python backlink_auditor.py --url https://example.com --velocity --broken --json
python backlink_auditor.py --url https://example.com --korean-platforms --json
"""
from __future__ import annotations
import argparse
import asyncio
import json
import logging
import re
import sys
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from typing import Any
from urllib.parse import urlparse
import aiohttp
import pandas as pd
from rich.console import Console
from rich.table import Table
from base_client import BaseAsyncClient, config
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
logger = logging.getLogger("backlink_auditor")
console = Console()
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Ahrefs REST API base; used by BacklinkAuditor._call_ahrefs when an HTTP
# session is attached (otherwise calls degrade to MCP stub responses).
AHREFS_BASE = "https://api.ahrefs.com/v3"
# Korean platform domain patterns.
# NOTE: keys must match the field names of KoreanPlatformStats — the mapper
# assigns counts via setattr(stats, <key>, ...).
KOREAN_PLATFORM_PATTERNS: dict[str, list[str]] = {
    "naver_blog": ["blog.naver.com"],
    "naver_cafe": ["cafe.naver.com"],
    "naver_post": ["post.naver.com"],
    "naver_kin": ["kin.naver.com"],
    "tistory": ["tistory.com"],
    "brunch": ["brunch.co.kr"],
    "daum_blog": ["blog.daum.net"],
    "korean_news": [
        "chosun.com", "donga.com", "joongang.co.kr", "hani.co.kr",
        "khan.co.kr", "mk.co.kr", "mt.co.kr", "hankyung.com",
        "sedaily.com", "edaily.co.kr", "newsis.com", "yna.co.kr",
        "yonhapnews.co.kr", "news1.kr", "newspim.com", "etnews.com",
        "zdnet.co.kr", "bloter.net", "platum.kr", "besuccess.com",
    ],
    "korean_community": [
        "dcinside.com", "theqoo.net", "fmkorea.com", "ruliweb.com",
        "ppomppu.co.kr", "clien.net", "mlbpark.donga.com",
    ],
}
# Suspicious TLD patterns for toxic link detection (matched against the last
# dot-separated label of the referring domain).
SUSPICIOUS_TLDS = {
    ".xyz", ".top", ".club", ".work", ".date", ".bid",
    ".stream", ".download", ".win", ".racing", ".review",
    ".accountant", ".loan", ".click", ".link", ".gdn",
}
# Generic anchor text patterns (exact, lowercased match in anchor analysis).
GENERIC_ANCHORS = {
    "click here", "read more", "learn more", "visit", "here",
    "website", "source", "link", "this", "more info",
    "go to", "check out", "see more", "view", "details",
}
# ---------------------------------------------------------------------------
# Dataclasses
# ---------------------------------------------------------------------------
@dataclass
class BacklinkEntry:
    """Single backlink record.

    Normalized from one Ahrefs `all-backlinks` row; see
    BacklinkAuditor.get_all_backlinks for the field mapping.
    """
    source_url: str            # linking page URL (Ahrefs `url_from`)
    source_domain: str         # linking domain (Ahrefs `domain_from`)
    dr: float = 0.0            # Domain Rating of the linking domain
    anchor: str = ""           # anchor text of the link
    link_type: str = "text"    # Ahrefs `type` field; defaults to "text"
    dofollow: bool = True      # inverted from Ahrefs `nofollow` flag
    first_seen: str = ""       # date the link was first seen (string as received)
    url_to: str = ""           # target URL on the audited site (Ahrefs `url_to`)
@dataclass
class AnchorDistribution:
    """Anchor text distribution breakdown.

    Counts are backlink counts per anchor category; the ``*_pct`` properties
    report each category as a percentage of ``total`` (zero-safe).
    """
    branded: int = 0
    exact_match: int = 0
    partial_match: int = 0
    generic: int = 0
    naked_url: int = 0
    other: int = 0
    total: int = 0
    top_anchors: list[dict[str, Any]] = field(default_factory=list)

    def _share(self, count: int) -> float:
        """Return *count* as a percentage of total, rounded to one decimal."""
        return round(count / max(self.total, 1) * 100, 1)

    @property
    def branded_pct(self) -> float:
        return self._share(self.branded)

    @property
    def exact_match_pct(self) -> float:
        return self._share(self.exact_match)

    @property
    def generic_pct(self) -> float:
        return self._share(self.generic)

    @property
    def naked_url_pct(self) -> float:
        return self._share(self.naked_url)
@dataclass
class ToxicLink:
    """Potentially toxic or spammy backlink.

    Produced by BacklinkAuditor.detect_toxic_links when a backlink's
    heuristic risk score reaches the 30-point threshold.
    """
    url: str                # linking page URL
    domain: str             # linking domain
    reason: str             # "; "-joined list of triggered heuristics
    risk_score: float = 0.0 # accumulated heuristic score, capped at 100
    dr: float = 0.0         # Domain Rating of the linking domain
    anchor: str = ""        # anchor text of the offending link
    first_seen: str = ""    # date the link was first seen
@dataclass
class KoreanPlatformStats:
    """Backlink counts from Korean platforms.

    Field names must match the keys of KOREAN_PLATFORM_PATTERNS:
    map_korean_platforms increments them via setattr using the pattern key.
    """
    naver_blog: int = 0
    naver_cafe: int = 0
    naver_post: int = 0
    naver_kin: int = 0
    tistory: int = 0
    brunch: int = 0
    daum_blog: int = 0
    korean_news: int = 0
    korean_community: int = 0
    total_korean: int = 0   # sum across all matched platforms
    # Per-domain detail records, sorted by DR descending by the mapper.
    korean_domains: list[dict[str, Any]] = field(default_factory=list)
@dataclass
class LinkVelocity:
    """Link acquisition and loss velocity.

    Derived from referring-domains history deltas in
    BacklinkAuditor.track_velocity; "lost" values are the mirror of a
    negative delta (simplified model, not Ahrefs' own lost-links data).
    """
    new_last_7d: int = 0
    new_last_30d: int = 0
    new_last_90d: int = 0
    lost_last_7d: int = 0
    lost_last_30d: int = 0
    lost_last_90d: int = 0
    velocity_trend: str = "stable"  # "growing" | "declining" | "stable"
    # Up to the last 30 {date, referring_domains} history points.
    history: list[dict[str, Any]] = field(default_factory=list)
@dataclass
class BacklinkAuditResult:
    """Complete backlink audit result.

    Aggregate container filled in phases by BacklinkAuditor.audit; optional
    sections (velocity, broken links, Korean platforms) stay None/empty
    unless the corresponding flag was passed.
    """
    url: str
    domain: str = ""                  # netloc extracted from `url`
    domain_rating: float = 0.0
    total_backlinks: int = 0
    referring_domains: int = 0
    dofollow_backlinks: int = 0
    nofollow_backlinks: int = 0
    dofollow_ratio: float = 0.0       # dofollow / (dofollow + nofollow), 0-1
    edu_gov_backlinks: int = 0        # combined .edu + .gov counts
    anchor_distribution: AnchorDistribution | None = None
    toxic_links: list[ToxicLink] = field(default_factory=list)
    toxic_link_count: int = 0
    toxic_risk_level: str = "low"     # low | medium | high | critical
    broken_backlinks: list[dict[str, Any]] = field(default_factory=list)
    korean_platforms: KoreanPlatformStats | None = None
    link_velocity: LinkVelocity | None = None
    dr_distribution: dict[str, int] = field(default_factory=dict)   # bucket -> count
    country_distribution: list[dict[str, Any]] = field(default_factory=list)
    issues: list[dict[str, str]] = field(default_factory=list)      # {type, category, message}
    recommendations: list[str] = field(default_factory=list)
    timestamp: str = ""               # ISO timestamp of audit start
# ---------------------------------------------------------------------------
# BacklinkAuditor
# ---------------------------------------------------------------------------
class BacklinkAuditor(BaseAsyncClient):
    """Analyze backlink profiles using Ahrefs MCP tools.

    All network access funnels through :meth:`_call_ahrefs`, which uses an
    aiohttp session (REST + bearer token) when one is attached and otherwise
    returns a structured stub so the tool degrades gracefully in MCP-only
    contexts. :meth:`audit` orchestrates the full pipeline.
    """

    def __init__(self, **kwargs):
        # Conservative throttling defaults for the Ahrefs API.
        super().__init__(max_concurrent=5, requests_per_second=2.0, **kwargs)
        # Optional REST session; stays None unless attached externally, in
        # which case _call_ahrefs falls back to MCP stub responses.
        self.session: aiohttp.ClientSession | None = None

    # -- Ahrefs MCP helper ---------------------------------------------------
    async def _call_ahrefs(
        self, endpoint: str, params: dict[str, Any]
    ) -> dict[str, Any]:
        """
        Call Ahrefs API endpoint.

        In MCP context this would call mcp__ahrefs__<endpoint>.
        For standalone use, falls back to REST API with token.

        Returns the decoded JSON payload on success, or a stub dict with an
        empty ``data`` list when no HTTP session is attached.
        """
        if self.session is not None:
            # BUG FIX: the token was previously fetched only when *no*
            # session existed ("if not self.session"), so the REST branch
            # below could never run. Fetch the token exactly when we have a
            # session to make the request with.
            api_token = config.get_required("AHREFS_API_TOKEN")
            url = f"{AHREFS_BASE}/{endpoint}"
            headers = {"Authorization": f"Bearer {api_token}"}
            async with self.session.get(url, headers=headers, params=params) as resp:
                resp.raise_for_status()
                return await resp.json()
        # Fallback: return structured empty result for MCP-only usage
        logger.warning(
            f"Ahrefs call to '{endpoint}' - use MCP tool "
            f"mcp__ahrefs__{endpoint.replace('-', '_')} in Claude Desktop"
        )
        return {"endpoint": endpoint, "params": params, "data": [], "note": "mcp_stub"}

    # -- Core methods --------------------------------------------------------
    async def get_backlink_stats(self, url: str) -> dict[str, Any]:
        """Fetch backlink statistics overview via site-explorer-backlinks-stats."""
        target = urlparse(url).netloc or url
        result = await self._call_ahrefs(
            "site-explorer-backlinks-stats",
            {"target": target, "mode": "domain"},
        )
        # Payload shape varies: stats may be under "data" or at the top level.
        stats = result.get("data", result) if isinstance(result, dict) else {}
        return {
            "total_backlinks": stats.get("live", 0),
            "referring_domains": stats.get("live_refdomains", 0),
            "dofollow": stats.get("live_dofollow", 0),
            "nofollow": stats.get("live_nofollow", 0),
            "edu": stats.get("edu", 0),
            "gov": stats.get("gov", 0),
            "raw": stats,
        }

    async def get_domain_rating(self, url: str) -> dict[str, Any]:
        """Fetch Domain Rating via site-explorer-domain-rating."""
        target = urlparse(url).netloc or url
        result = await self._call_ahrefs(
            "site-explorer-domain-rating",
            {"target": target},
        )
        data = result.get("data", result) if isinstance(result, dict) else {}
        return {
            "domain_rating": data.get("domain_rating", 0),
            "ahrefs_rank": data.get("ahrefs_rank", 0),
        }

    async def get_referring_domains(
        self, url: str, limit: int = 1000
    ) -> list[dict[str, Any]]:
        """List referring domains via site-explorer-referring-domains.

        Returns raw Ahrefs domain records (highest DR first); an empty list
        on unexpected payload shapes.
        """
        target = urlparse(url).netloc or url
        result = await self._call_ahrefs(
            "site-explorer-referring-domains",
            {"target": target, "mode": "domain", "limit": limit, "order_by": "domain_rating:desc"},
        )
        domains = result.get("data", result.get("refdomains", []))
        if isinstance(domains, dict):
            # Some payloads nest the list one level deeper.
            domains = domains.get("refdomains", [])
        return domains if isinstance(domains, list) else []

    async def get_all_backlinks(
        self, url: str, limit: int = 1000
    ) -> list[BacklinkEntry]:
        """Fetch all backlinks via site-explorer-all-backlinks.

        Normalizes each raw row into a BacklinkEntry (note: `dofollow` is the
        inverse of the Ahrefs `nofollow` flag).
        """
        target = urlparse(url).netloc or url
        result = await self._call_ahrefs(
            "site-explorer-all-backlinks",
            {"target": target, "mode": "domain", "limit": limit, "order_by": "domain_rating:desc"},
        )
        raw_links = result.get("data", result.get("backlinks", []))
        if isinstance(raw_links, dict):
            raw_links = raw_links.get("backlinks", [])
        backlinks = []
        for link in (raw_links if isinstance(raw_links, list) else []):
            entry = BacklinkEntry(
                source_url=link.get("url_from", ""),
                source_domain=link.get("domain_from", ""),
                dr=link.get("domain_rating", 0),
                anchor=link.get("anchor", ""),
                link_type=link.get("type", "text"),
                dofollow=not link.get("nofollow", False),
                first_seen=link.get("first_seen", ""),
                url_to=link.get("url_to", ""),
            )
            backlinks.append(entry)
        return backlinks

    async def analyze_anchors(
        self, url: str, brand_name: str = ""
    ) -> AnchorDistribution:
        """Analyze anchor text distribution via site-explorer-anchors.

        Classifies each anchor into branded / naked URL / generic /
        exact-match / partial-match buckets (first match wins, in that
        order) and records the top 20 anchors by backlink count.
        """
        target = urlparse(url).netloc or url
        if not brand_name:
            # Derive a brand guess from the bare domain label, e.g.
            # "www.example.com" -> "example".
            brand_name = target.replace("www.", "").split(".")[0]
        result = await self._call_ahrefs(
            "site-explorer-anchors",
            {"target": target, "mode": "domain", "limit": 500, "order_by": "backlinks:desc"},
        )
        raw_anchors = result.get("data", result.get("anchors", []))
        if isinstance(raw_anchors, dict):
            raw_anchors = raw_anchors.get("anchors", [])
        dist = AnchorDistribution()
        top_list = []
        for item in (raw_anchors if isinstance(raw_anchors, list) else []):
            anchor_text = item.get("anchor", "").strip().lower()
            count = item.get("backlinks", 1)
            dist.total += count
            top_list.append({
                "anchor": item.get("anchor", ""),
                "backlinks": count,
                "referring_domains": item.get("refdomains", 0),
            })
            # Classify anchor text (order matters: branded wins over naked URL, etc.)
            if self._is_branded_anchor(anchor_text, brand_name, target):
                dist.branded += count
            elif self._is_naked_url(anchor_text, target):
                dist.naked_url += count
            elif self._is_generic_anchor(anchor_text):
                dist.generic += count
            elif self._is_exact_match(anchor_text, brand_name):
                dist.exact_match += count
            else:
                dist.partial_match += count
        dist.top_anchors = sorted(top_list, key=lambda x: x["backlinks"], reverse=True)[:20]
        return dist

    async def detect_toxic_links(
        self, backlinks: list[BacklinkEntry]
    ) -> list[ToxicLink]:
        """Identify potentially toxic backlinks using heuristic scoring.

        Each check adds points; links scoring >= 30 are flagged and returned
        sorted by risk score descending (score capped at 100).
        """
        toxic_links = []
        for link in backlinks:
            risk_score = 0.0
            reasons = []
            # Check 1: Suspicious TLD
            domain_tld = self._extract_tld(link.source_domain)
            if domain_tld in SUSPICIOUS_TLDS:
                risk_score += 25.0
                reasons.append(f"Suspicious TLD: {domain_tld}")
            # Check 2: Very low DR (potential PBN or link farm)
            if link.dr < 5 and link.source_domain:
                risk_score += 20.0
                reasons.append(f"Very low DR ({link.dr})")
            elif link.dr < 10:
                risk_score += 10.0
                reasons.append(f"Low DR ({link.dr})")
            # Check 3: Spammy anchor patterns
            anchor_lower = link.anchor.lower().strip()
            spam_keywords = [
                "casino", "poker", "viagra", "cialis", "payday",
                "loan", "buy cheap", "free download", "adult",
                "gambling", "betting", "porn", "xxx", "sex",
                "weight loss", "diet pill", "crypto trading",
            ]
            for kw in spam_keywords:
                if kw in anchor_lower:
                    risk_score += 30.0
                    reasons.append(f"Spam keyword in anchor: '{kw}'")
                    break
            # Check 4: Numeric/random domain patterns (e.g., abc123xyz.com)
            domain_base = link.source_domain.split(".")[0] if link.source_domain else ""
            if domain_base and re.match(r"^[a-z0-9]{15,}$", domain_base):
                risk_score += 15.0
                reasons.append("Random/generated domain name")
            # Check 5: Excessive hyphens in domain
            if domain_base.count("-") >= 3:
                risk_score += 15.0
                reasons.append("Excessive hyphens in domain")
            # Check 6: Domain contains spam-related words
            domain_spam_words = [
                "seo", "link", "backlink", "directory", "submit",
                "free-", "cheap-", "best-", "buy-",
            ]
            for sw in domain_spam_words:
                if sw in link.source_domain.lower():
                    risk_score += 10.0
                    reasons.append(f"Spam word in domain: '{sw}'")
                    break
            # Check 7: Very long domain name
            if len(domain_base) > 30:
                risk_score += 10.0
                reasons.append("Unusually long domain name")
            # Threshold: toxic if score >= 30
            if risk_score >= 30.0:
                toxic = ToxicLink(
                    url=link.source_url,
                    domain=link.source_domain,
                    reason="; ".join(reasons),
                    risk_score=min(risk_score, 100.0),
                    dr=link.dr,
                    anchor=link.anchor,
                    first_seen=link.first_seen,
                )
                toxic_links.append(toxic)
        # Sort by risk score descending
        toxic_links.sort(key=lambda t: t.risk_score, reverse=True)
        return toxic_links

    async def find_broken_backlinks(self, url: str) -> list[dict[str, Any]]:
        """Find broken backlinks for recovery via site-explorer-broken-backlinks.

        Returns normalized records sorted by the linking domain's DR so the
        highest-value recovery targets come first.
        """
        target = urlparse(url).netloc or url
        result = await self._call_ahrefs(
            "site-explorer-broken-backlinks",
            {"target": target, "mode": "domain", "limit": 200, "order_by": "domain_rating:desc"},
        )
        raw = result.get("data", result.get("backlinks", []))
        if isinstance(raw, dict):
            raw = raw.get("backlinks", [])
        broken = []
        for item in (raw if isinstance(raw, list) else []):
            broken.append({
                "source_url": item.get("url_from", ""),
                "source_domain": item.get("domain_from", ""),
                "target_url": item.get("url_to", ""),
                "http_code": item.get("http_code", 404),
                "anchor": item.get("anchor", ""),
                "dr": item.get("domain_rating", 0),
                "first_seen": item.get("first_seen", ""),
            })
        broken.sort(key=lambda b: b.get("dr", 0), reverse=True)
        return broken

    async def track_velocity(self, url: str) -> LinkVelocity:
        """Track new/lost referring domains via refdomains-history.

        Computes 7/30/90-day deltas from the referring-domain count history.
        "Lost" is simply the mirror of a negative delta — a simplification,
        not Ahrefs' own lost-links metric.
        """
        target = urlparse(url).netloc or url
        today = datetime.now()
        result = await self._call_ahrefs(
            "site-explorer-refdomains-history",
            {
                "target": target,
                "mode": "domain",
                "date_from": (today - timedelta(days=90)).strftime("%Y-%m-%d"),
            },
        )
        raw_history = result.get("data", result.get("history", []))
        if isinstance(raw_history, dict):
            raw_history = raw_history.get("history", [])
        velocity = LinkVelocity()
        history_points = []
        if isinstance(raw_history, list) and len(raw_history) >= 2:
            for point in raw_history:
                history_points.append({
                    "date": point.get("date", ""),
                    "referring_domains": point.get("refdomains", point.get("referring_domains", 0)),
                })
            # Calculate velocity from history deltas
            sorted_history = sorted(history_points, key=lambda h: h["date"])
            if len(sorted_history) >= 2:
                latest = sorted_history[-1].get("referring_domains", 0)
                d7_ago = self._find_closest_point(sorted_history, 7)
                d30_ago = self._find_closest_point(sorted_history, 30)
                d90_ago = self._find_closest_point(sorted_history, 90)
                velocity.new_last_7d = max(0, latest - d7_ago)
                velocity.new_last_30d = max(0, latest - d30_ago)
                velocity.new_last_90d = max(0, latest - d90_ago)
                # Estimate lost (simplified: if delta is negative)
                velocity.lost_last_7d = max(0, d7_ago - latest)
                velocity.lost_last_30d = max(0, d30_ago - latest)
                velocity.lost_last_90d = max(0, d90_ago - latest)
                # Determine trend: growing/declining only when one side
                # dominates the other by more than 2x over 30 days.
                if velocity.new_last_30d > velocity.lost_last_30d * 2:
                    velocity.velocity_trend = "growing"
                elif velocity.lost_last_30d > velocity.new_last_30d * 2:
                    velocity.velocity_trend = "declining"
                else:
                    velocity.velocity_trend = "stable"
        velocity.history = history_points[-30:]  # Last 30 data points
        return velocity

    def map_korean_platforms(
        self, referring_domains: list[dict[str, Any]]
    ) -> KoreanPlatformStats:
        """Categorize referring domains by Korean platform.

        Matches each domain against KOREAN_PLATFORM_PATTERNS (first pattern
        hit wins) and increments the matching KoreanPlatformStats field —
        pattern keys and field names must stay in sync.
        """
        stats = KoreanPlatformStats()
        korean_detail = []
        for rd in referring_domains:
            domain = rd.get("domain", rd.get("domain_from", "")).lower()
            matched_platform = None
            for platform, patterns in KOREAN_PLATFORM_PATTERNS.items():
                for pattern in patterns:
                    if pattern in domain:
                        matched_platform = platform
                        break
                if matched_platform:
                    break
            if matched_platform:
                current_val = getattr(stats, matched_platform, 0)
                setattr(stats, matched_platform, current_val + 1)
                stats.total_korean += 1
                korean_detail.append({
                    "domain": domain,
                    "platform": matched_platform,
                    "dr": rd.get("domain_rating", rd.get("dr", 0)),
                    "backlinks": rd.get("backlinks", 0),
                })
        stats.korean_domains = sorted(
            korean_detail, key=lambda d: d.get("dr", 0), reverse=True
        )
        return stats

    # -- Orchestration -------------------------------------------------------
    async def audit(
        self,
        url: str,
        include_velocity: bool = False,
        include_broken: bool = False,
        include_korean: bool = False,
        brand_name: str = "",
    ) -> BacklinkAuditResult:
        """Orchestrate a full backlink audit.

        Phase 1 fetches core metrics concurrently; later phases run toxic
        detection, DR/country distributions, the optional analyses, and
        finally issue/recommendation generation.
        """
        domain = urlparse(url).netloc or url
        logger.info(f"Starting backlink audit for: {domain}")
        result = BacklinkAuditResult(
            url=url,
            domain=domain,
            timestamp=datetime.now().isoformat(),
        )
        # Phase 1: Core metrics (parallel)
        logger.info("Phase 1: Fetching core metrics...")
        stats_task = self.get_backlink_stats(url)
        dr_task = self.get_domain_rating(url)
        anchors_task = self.analyze_anchors(url, brand_name)
        backlinks_task = self.get_all_backlinks(url, limit=1000)
        refdomains_task = self.get_referring_domains(url, limit=1000)
        stats, dr_info, anchors, backlinks, refdomains = await asyncio.gather(
            stats_task, dr_task, anchors_task, backlinks_task, refdomains_task,
            return_exceptions=True,
        )
        # Surface gather() failures instead of silently discarding them; the
        # isinstance checks below still skip any failed fetch gracefully.
        for fetch_name, fetch_value in (
            ("stats", stats), ("domain_rating", dr_info), ("anchors", anchors),
            ("backlinks", backlinks), ("refdomains", refdomains),
        ):
            if isinstance(fetch_value, Exception):
                logger.warning(f"Phase 1 fetch '{fetch_name}' failed: {fetch_value}")
        # Process stats
        if isinstance(stats, dict):
            result.total_backlinks = stats.get("total_backlinks", 0)
            result.referring_domains = stats.get("referring_domains", 0)
            result.dofollow_backlinks = stats.get("dofollow", 0)
            result.nofollow_backlinks = stats.get("nofollow", 0)
            result.edu_gov_backlinks = stats.get("edu", 0) + stats.get("gov", 0)
            total = result.dofollow_backlinks + result.nofollow_backlinks
            result.dofollow_ratio = round(
                result.dofollow_backlinks / max(total, 1), 2
            )
        # Process DR
        if isinstance(dr_info, dict):
            result.domain_rating = dr_info.get("domain_rating", 0)
        # Process anchors
        if isinstance(anchors, AnchorDistribution):
            result.anchor_distribution = anchors
        # Phase 2: Toxic link detection
        if isinstance(backlinks, list):
            logger.info("Phase 2: Detecting toxic links...")
            result.toxic_links = await self.detect_toxic_links(backlinks)
            result.toxic_link_count = len(result.toxic_links)
            # Determine toxic risk level
            if result.toxic_link_count > 50:
                result.toxic_risk_level = "critical"
            elif result.toxic_link_count > 20:
                result.toxic_risk_level = "high"
            elif result.toxic_link_count > 5:
                result.toxic_risk_level = "medium"
            else:
                result.toxic_risk_level = "low"
        # Phase 3: DR distribution of referring domains
        if isinstance(refdomains, list):
            dr_dist = {"0-10": 0, "11-20": 0, "21-30": 0, "31-40": 0,
                       "41-50": 0, "51-60": 0, "61-70": 0, "71-80": 0,
                       "81-90": 0, "91-100": 0}
            country_counts: dict[str, int] = {}
            for rd in refdomains:
                dr_val = rd.get("domain_rating", rd.get("dr", 0))
                bucket = self._dr_bucket(dr_val)
                dr_dist[bucket] = dr_dist.get(bucket, 0) + 1
                country = rd.get("country", "unknown")
                country_counts[country] = country_counts.get(country, 0) + 1
            result.dr_distribution = dr_dist
            result.country_distribution = sorted(
                [{"country": k, "count": v} for k, v in country_counts.items()],
                key=lambda x: x["count"], reverse=True,
            )[:20]
        # Phase 4: Optional analyses
        if include_velocity:
            logger.info("Phase 4a: Tracking link velocity...")
            velocity = await self.track_velocity(url)
            if isinstance(velocity, LinkVelocity):
                result.link_velocity = velocity
        if include_broken:
            logger.info("Phase 4b: Finding broken backlinks...")
            broken = await self.find_broken_backlinks(url)
            if isinstance(broken, list):
                result.broken_backlinks = broken
        if include_korean and isinstance(refdomains, list):
            logger.info("Phase 4c: Mapping Korean platforms...")
            korean_stats = self.map_korean_platforms(refdomains)
            result.korean_platforms = korean_stats
        # Phase 5: Generate issues and recommendations
        self._generate_issues(result)
        self._generate_recommendations(result)
        logger.info(f"Backlink audit complete for {domain}")
        return result

    # -- Helpers -------------------------------------------------------------
    @staticmethod
    def _is_branded_anchor(anchor: str, brand: str, domain: str) -> bool:
        """Check if anchor text is a brand mention (brand or domain substring)."""
        brand_lower = brand.lower()
        domain_clean = domain.replace("www.", "").split(".")[0].lower()
        return (
            brand_lower in anchor
            or domain_clean in anchor
            or domain.lower() in anchor
        )

    @staticmethod
    def _is_naked_url(anchor: str, domain: str) -> bool:
        """Check if anchor text is a naked URL.

        Precedence note: `and` binds tighter than `or`, so this reads as
        http-prefix OR www-prefix OR (domain-in-anchor AND slash/dot present).
        """
        return (
            anchor.startswith("http")
            or anchor.startswith("www.")
            or domain.lower() in anchor
            and ("/" in anchor or "." in anchor)
        )

    @staticmethod
    def _is_generic_anchor(anchor: str) -> bool:
        """Check if anchor text is generic (exact match against GENERIC_ANCHORS)."""
        return anchor.lower().strip() in GENERIC_ANCHORS

    @staticmethod
    def _is_exact_match(anchor: str, brand: str) -> bool:
        """Check if anchor is exact-match keyword (not brand)."""
        # Exact match if it looks like a keyword phrase (2+ words, no brand)
        words = anchor.split()
        return len(words) >= 2 and brand.lower() not in anchor.lower()

    @staticmethod
    def _extract_tld(domain: str) -> str:
        """Extract TLD (last dot-separated label, with leading dot) from domain."""
        if not domain:
            return ""
        parts = domain.rsplit(".", 1)
        return f".{parts[-1]}" if len(parts) > 1 else ""

    @staticmethod
    def _dr_bucket(dr: float) -> str:
        """Map DR value to a 10-wide bucket label (upper bound inclusive)."""
        if dr <= 10:
            return "0-10"
        elif dr <= 20:
            return "11-20"
        elif dr <= 30:
            return "21-30"
        elif dr <= 40:
            return "31-40"
        elif dr <= 50:
            return "41-50"
        elif dr <= 60:
            return "51-60"
        elif dr <= 70:
            return "61-70"
        elif dr <= 80:
            return "71-80"
        elif dr <= 90:
            return "81-90"
        else:
            return "91-100"

    @staticmethod
    def _find_closest_point(history: list[dict], days_ago: int) -> int:
        """Find referring domain count closest to N days ago.

        Expects *history* sorted ascending by date; returns the count of the
        latest point on or before the target date (or the first point if all
        dates are newer).
        """
        if not history:
            return 0
        target_date = (datetime.now() - timedelta(days=days_ago)).strftime("%Y-%m-%d")
        closest = history[0]
        for point in history:
            if point.get("date", "") <= target_date:
                closest = point
        return closest.get("referring_domains", 0)

    def _generate_issues(self, result: BacklinkAuditResult) -> None:
        """Generate audit issues based on findings (mutates result.issues)."""
        issues = []
        # Toxic links
        if result.toxic_link_count > 20:
            issues.append({
                "type": "error",
                "category": "toxic_links",
                "message": f"High toxic link count: {result.toxic_link_count} toxic backlinks detected",
            })
        elif result.toxic_link_count > 5:
            issues.append({
                "type": "warning",
                "category": "toxic_links",
                "message": f"Moderate toxic links: {result.toxic_link_count} potentially harmful backlinks",
            })
        # Low DR
        if result.domain_rating < 20:
            issues.append({
                "type": "warning",
                "category": "domain_authority",
                "message": f"Low Domain Rating ({result.domain_rating}) - weak backlink profile",
            })
        # Dofollow ratio
        if result.dofollow_ratio < 0.5:
            issues.append({
                "type": "warning",
                "category": "link_quality",
                "message": f"Low dofollow ratio ({result.dofollow_ratio:.0%}) - majority are nofollow",
            })
        # Anchor distribution issues
        if result.anchor_distribution:
            if result.anchor_distribution.exact_match_pct > 30:
                issues.append({
                    "type": "error",
                    "category": "anchor_text",
                    "message": f"Over-optimized anchors: {result.anchor_distribution.exact_match_pct}% exact match (risk of penalty)",
                })
            if result.anchor_distribution.branded_pct < 10:
                issues.append({
                    "type": "warning",
                    "category": "anchor_text",
                    "message": f"Low branded anchors ({result.anchor_distribution.branded_pct}%) - unnatural profile",
                })
        # Broken backlinks
        if len(result.broken_backlinks) > 10:
            issues.append({
                "type": "warning",
                "category": "broken_links",
                "message": f"{len(result.broken_backlinks)} broken backlinks found - recovery opportunities",
            })
        # Velocity issues
        if result.link_velocity and result.link_velocity.velocity_trend == "declining":
            issues.append({
                "type": "warning",
                "category": "velocity",
                "message": "Declining link velocity - losing more links than gaining",
            })
        result.issues = issues

    def _generate_recommendations(self, result: BacklinkAuditResult) -> None:
        """Generate actionable recommendations (mutates result.recommendations)."""
        recs = []
        if result.toxic_risk_level in ("critical", "high"):
            recs.append(
                "Disavow toxic backlinks immediately using Google Disavow Tool. "
                f"Priority: {result.toxic_link_count} toxic links detected."
            )
        if result.domain_rating < 30:
            recs.append(
                "Focus on acquiring high-DR backlinks (DR 40+) from authoritative "
                "domains in your niche to improve Domain Rating."
            )
        if result.anchor_distribution and result.anchor_distribution.exact_match_pct > 25:
            recs.append(
                "Diversify anchor text profile. Reduce exact-match anchors and "
                "increase branded, generic, and naked URL anchors."
            )
        if len(result.broken_backlinks) > 5:
            high_dr_broken = [b for b in result.broken_backlinks if b.get("dr", 0) > 30]
            recs.append(
                f"Reclaim {len(result.broken_backlinks)} broken backlinks "
                f"({len(high_dr_broken)} from DR 30+ domains). Set up 301 redirects "
                "or recreate content at original URLs."
            )
        if result.korean_platforms and result.korean_platforms.total_korean < 10:
            recs.append(
                "Increase presence on Korean platforms (Naver Blog, Tistory, Brunch). "
                "Korean platform links signal local relevance for Korean SERP rankings."
            )
        if result.link_velocity and result.link_velocity.velocity_trend == "declining":
            recs.append(
                "Reverse declining link velocity with active outreach: guest posting, "
                "digital PR, and content promotion campaigns."
            )
        if result.edu_gov_backlinks == 0:
            recs.append(
                "Target .edu and .gov backlinks through scholarship programs, "
                "research collaborations, or government resource pages."
            )
        result.recommendations = recs
# ---------------------------------------------------------------------------
# Output Formatting
# ---------------------------------------------------------------------------
def format_rich_output(result: BacklinkAuditResult) -> None:
    """Render the audit result to the console as Rich tables.

    Sections with no data (anchors, toxic links, velocity, ...) are skipped.
    """
    console.print(f"\n[bold cyan]Backlink Audit: {result.domain}[/bold cyan]")
    console.print(f"[dim]Timestamp: {result.timestamp}[/dim]\n")
    # Overview table — driven by a (label, value) list rather than repeated calls.
    summary = Table(title="Overview", show_header=True, header_style="bold magenta")
    summary.add_column("Metric", style="cyan")
    summary.add_column("Value", style="green")
    overview_rows = [
        ("Domain Rating", str(result.domain_rating)),
        ("Total Backlinks", f"{result.total_backlinks:,}"),
        ("Referring Domains", f"{result.referring_domains:,}"),
        ("Dofollow Ratio", f"{result.dofollow_ratio:.0%}"),
        ("Edu/Gov Backlinks", str(result.edu_gov_backlinks)),
        ("Toxic Links", f"{result.toxic_link_count} ({result.toxic_risk_level})"),
    ]
    for label, value in overview_rows:
        summary.add_row(label, value)
    console.print(summary)
    # Anchor distribution
    dist = result.anchor_distribution
    if dist:
        anchors_tbl = Table(title="\nAnchor Distribution", show_header=True, header_style="bold magenta")
        anchors_tbl.add_column("Type", style="cyan")
        anchors_tbl.add_column("Count", style="green")
        anchors_tbl.add_column("Percentage", style="yellow")
        anchors_tbl.add_row("Branded", str(dist.branded), f"{dist.branded_pct}%")
        anchors_tbl.add_row("Exact Match", str(dist.exact_match), f"{dist.exact_match_pct}%")
        # partial_match has no property; compute its share inline (zero-safe).
        anchors_tbl.add_row("Partial Match", str(dist.partial_match), f"{(dist.partial_match / max(dist.total, 1) * 100):.1f}%")
        anchors_tbl.add_row("Generic", str(dist.generic), f"{dist.generic_pct}%")
        anchors_tbl.add_row("Naked URL", str(dist.naked_url), f"{dist.naked_url_pct}%")
        console.print(anchors_tbl)
    # DR distribution
    if result.dr_distribution:
        buckets_tbl = Table(title="\nDR Distribution (Referring Domains)", show_header=True, header_style="bold magenta")
        buckets_tbl.add_column("DR Range", style="cyan")
        buckets_tbl.add_column("Count", style="green")
        for dr_range, n_domains in result.dr_distribution.items():
            buckets_tbl.add_row(dr_range, str(n_domains))
        console.print(buckets_tbl)
    # Toxic links (top 10)
    if result.toxic_links:
        risky_tbl = Table(title=f"\nToxic Links (Top 10 of {len(result.toxic_links)})", show_header=True, header_style="bold red")
        risky_tbl.add_column("Domain", style="red")
        risky_tbl.add_column("Risk", style="yellow")
        risky_tbl.add_column("Reason", style="dim")
        for entry in result.toxic_links[:10]:
            risky_tbl.add_row(entry.domain, f"{entry.risk_score:.0f}", entry.reason[:60])
        console.print(risky_tbl)
    # Korean platforms (note: Naver Kin intentionally absent from display list,
    # matching the original output)
    kp = result.korean_platforms
    if kp and kp.total_korean > 0:
        korea_tbl = Table(title="\nKorean Platform Links", show_header=True, header_style="bold magenta")
        korea_tbl.add_column("Platform", style="cyan")
        korea_tbl.add_column("Count", style="green")
        platform_rows = [
            ("Naver Blog", kp.naver_blog), ("Naver Cafe", kp.naver_cafe),
            ("Naver Post", kp.naver_post), ("Tistory", kp.tistory),
            ("Brunch", kp.brunch), ("Daum Blog", kp.daum_blog),
            ("Korean News", kp.korean_news), ("Korean Community", kp.korean_community),
        ]
        for platform_label, platform_count in platform_rows:
            if platform_count > 0:
                korea_tbl.add_row(platform_label, str(platform_count))
        korea_tbl.add_row("[bold]Total Korean[/bold]", f"[bold]{kp.total_korean}[/bold]")
        console.print(korea_tbl)
    # Link velocity
    vel = result.link_velocity
    if vel:
        velocity_tbl = Table(title="\nLink Velocity", show_header=True, header_style="bold magenta")
        velocity_tbl.add_column("Period", style="cyan")
        velocity_tbl.add_column("New", style="green")
        velocity_tbl.add_column("Lost", style="red")
        velocity_tbl.add_row("Last 7 days", str(vel.new_last_7d), str(vel.lost_last_7d))
        velocity_tbl.add_row("Last 30 days", str(vel.new_last_30d), str(vel.lost_last_30d))
        velocity_tbl.add_row("Last 90 days", str(vel.new_last_90d), str(vel.lost_last_90d))
        velocity_tbl.add_row("Trend", vel.velocity_trend, "")
        console.print(velocity_tbl)
    # Broken backlinks
    if result.broken_backlinks:
        recovery_tbl = Table(title=f"\nBroken Backlinks (Top 10 of {len(result.broken_backlinks)})", show_header=True, header_style="bold magenta")
        recovery_tbl.add_column("Source Domain", style="cyan")
        recovery_tbl.add_column("DR", style="green")
        recovery_tbl.add_column("Target URL", style="dim")
        for rec in result.broken_backlinks[:10]:
            recovery_tbl.add_row(
                rec.get("source_domain", ""),
                str(rec.get("dr", 0)),
                rec.get("target_url", "")[:50],
            )
        console.print(recovery_tbl)
    # Issues
    if result.issues:
        console.print("\n[bold red]Issues Found:[/bold red]")
        for issue in result.issues:
            icon = "[red]ERROR[/red]" if issue["type"] == "error" else "[yellow]WARN[/yellow]"
            console.print(f" {icon} [{issue['category']}] {issue['message']}")
    # Recommendations
    if result.recommendations:
        console.print("\n[bold green]Recommendations:[/bold green]")
        for idx, advice in enumerate(result.recommendations, 1):
            console.print(f" {idx}. {advice}")
    console.print()
def result_to_dict(result: BacklinkAuditResult) -> dict[str, Any]:
    """Convert audit result to JSON-serializable dict.

    Optional dataclass sections serialize via asdict() or None when absent;
    key layout is part of the CLI's JSON contract.
    """
    stats_section = {
        "total_backlinks": result.total_backlinks,
        "referring_domains": result.referring_domains,
        "dofollow_backlinks": result.dofollow_backlinks,
        "nofollow_backlinks": result.nofollow_backlinks,
        "dofollow_ratio": result.dofollow_ratio,
        "edu_gov_backlinks": result.edu_gov_backlinks,
    }
    anchors_section = asdict(result.anchor_distribution) if result.anchor_distribution else None
    korean_section = asdict(result.korean_platforms) if result.korean_platforms else None
    velocity_section = asdict(result.link_velocity) if result.link_velocity else None
    return {
        "url": result.url,
        "domain": result.domain,
        "domain_rating": result.domain_rating,
        "backlink_stats": stats_section,
        "anchor_distribution": anchors_section,
        "dr_distribution": result.dr_distribution,
        "country_distribution": result.country_distribution,
        "toxic_links": [asdict(t) for t in result.toxic_links],
        "toxic_link_count": result.toxic_link_count,
        "toxic_risk_level": result.toxic_risk_level,
        "broken_backlinks": result.broken_backlinks,
        "korean_platforms": korean_section,
        "link_velocity": velocity_section,
        "issues": result.issues,
        "recommendations": result.recommendations,
        "timestamp": result.timestamp,
    }
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def parse_args() -> argparse.Namespace:
    """Parse command-line arguments for the backlink auditor CLI."""
    usage_examples = """
Examples:
python backlink_auditor.py --url https://example.com --json
python backlink_auditor.py --url https://example.com --velocity --broken --json
python backlink_auditor.py --url https://example.com --korean-platforms --json
python backlink_auditor.py --url https://example.com --velocity --broken --korean-platforms --output report.json
"""
    cli = argparse.ArgumentParser(
        description="Backlink Auditor - Analyze backlink profiles and detect toxic links",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )
    cli.add_argument("--url", required=True, help="Target URL or domain to audit")
    cli.add_argument("--brand", default="", help="Brand name for anchor classification")
    cli.add_argument("--velocity", action="store_true", help="Include link velocity tracking")
    cli.add_argument("--broken", action="store_true", help="Include broken backlink analysis")
    cli.add_argument("--korean-platforms", action="store_true", help="Include Korean platform link mapping")
    cli.add_argument("--json", action="store_true", help="Output as JSON")
    cli.add_argument("--output", "-o", help="Save output to file")
    return cli.parse_args()
async def main() -> None:
    """Main entry point.

    Parses CLI flags, runs the audit, and emits either JSON (stdout and/or
    a file) or a Rich-formatted console report. Exits non-zero on
    interruption or failure.
    """
    args = parse_args()
    auditor = BacklinkAuditor()
    try:
        result = await auditor.audit(
            url=args.url,
            include_velocity=args.velocity,
            include_broken=args.broken,
            include_korean=args.korean_platforms,
            brand_name=args.brand,
        )
        # --output implies JSON serialization even without --json.
        if args.json or args.output:
            output_data = result_to_dict(result)
            json_str = json.dumps(output_data, indent=2, ensure_ascii=False)
            if args.output:
                with open(args.output, "w", encoding="utf-8") as f:
                    f.write(json_str)
                logger.info(f"Report saved to {args.output}")
            if args.json:
                print(json_str)
        else:
            format_rich_output(result)
        # Request/throttling stats — presumably provided by BaseAsyncClient
        # (not visible in this file).
        auditor.print_stats()
    except KeyboardInterrupt:
        logger.warning("Audit interrupted by user")
        sys.exit(1)
    except Exception as e:
        # Top-level boundary: log, report as JSON if requested, exit non-zero.
        logger.error(f"Audit failed: {e}")
        if args.json:
            print(json.dumps({"error": str(e)}, indent=2))
        sys.exit(1)
if __name__ == "__main__":
    asyncio.run(main())