Files
our-claude-skills/custom-skills/27-seo-ai-visibility/code/scripts/ai_citation_analyzer.py
Andrew Yim a3ff965b87 Add SEO skills 19-28, 31-32 with full Python implementations
12 new skills: Keyword Strategy, SERP Analysis, Position Tracking,
Link Building, Content Strategy, E-Commerce SEO, KPI Framework,
International SEO, AI Visibility, Knowledge Graph, Competitor Intel,
and Crawl Budget. ~20K lines of Python across 25 domain scripts.
Updated skill 11 pipeline table and repo CLAUDE.md.
Enhanced skill 18 local SEO workflow from jamie.clinic audit.

Note: Skill 26 hreflang_validator.py pending (content filter block).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-13 12:05:59 +09:00

612 lines
23 KiB
Python

"""
AI Citation Analyzer - Brand Radar Citation Analysis
=====================================================
Purpose: Analyze how a brand is cited in AI-generated search answers,
including cited domains, cited pages, and AI response content.
Python: 3.10+
Usage:
python ai_citation_analyzer.py --target example.com --json
python ai_citation_analyzer.py --target example.com --cited-domains --json
python ai_citation_analyzer.py --target example.com --cited-pages --json
python ai_citation_analyzer.py --target example.com --responses --json
"""
import argparse
import asyncio
import json
import logging
import subprocess
import sys
from dataclasses import dataclass, field, asdict
from datetime import datetime
from pathlib import Path
from typing import Any
# Add parent to path for base_client import
sys.path.insert(0, str(Path(__file__).parent))
from base_client import BaseAsyncClient, config
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
@dataclass
class AiResponse:
    """An AI-generated response that mentions the brand."""
    # The search query / keyword that produced the AI answer.
    query: str = ""
    # Full text of the AI-generated answer.
    response_text: str = ""
    # Whether the brand was mentioned in the answer (defaults True when parsed
    # from the brand-radar feed, which only returns mentioning responses).
    brand_mentioned: bool = False
    sentiment: str = "neutral"  # one of: positive, neutral, negative
    # Engine that produced the answer (e.g. an AI search engine name).
    source_engine: str = ""
    # Date string as reported upstream — format is not normalized here.
    date: str = ""
    # URL associated with the response, if any.
    url: str = ""
@dataclass
class CitedDomain:
    """A domain cited in AI-generated answers."""
    # The cited domain name (e.g. "example.com").
    domain: str = ""
    # Number of times AI answers cited this domain.
    citation_count: int = 0
    # Topic labels associated with the citations, as reported upstream.
    topics: list[str] = field(default_factory=list)
    # Citation share as reported upstream — presumably percent; verify against API.
    share_pct: float = 0.0
@dataclass
class CitedPage:
    """A specific page cited in AI-generated answers."""
    # URL of the cited page.
    url: str = ""
    # Page title, when the API provides one.
    title: str = ""
    # Number of times AI answers cited this page.
    citation_count: int = 0
    # Snippet/context in which the page was cited.
    context: str = ""
    # Topic labels associated with the citations, as reported upstream.
    topics: list[str] = field(default_factory=list)
@dataclass
class CitationAnalysisResult:
    """Aggregated output of a full AI-citation analysis run."""
    target: str = ""
    ai_responses: list[AiResponse] = field(default_factory=list)
    cited_domains: list[CitedDomain] = field(default_factory=list)
    cited_pages: list[CitedPage] = field(default_factory=list)
    sentiment_summary: dict = field(default_factory=dict)
    citation_ranking: list[dict] = field(default_factory=list)
    competitor_citations: list[dict] = field(default_factory=list)
    recommendations: list[str] = field(default_factory=list)
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())

    def to_dict(self) -> dict:
        """Serialize this result into a plain, JSON-friendly dictionary."""
        payload: dict = {"target": self.target}
        # Nested dataclass lists are converted to dicts; plain containers
        # (summaries, rankings, recommendations) pass through unchanged.
        payload["ai_responses"] = [asdict(resp) for resp in self.ai_responses]
        payload["cited_domains"] = [asdict(dom) for dom in self.cited_domains]
        payload["cited_pages"] = [asdict(page) for page in self.cited_pages]
        payload["sentiment_summary"] = self.sentiment_summary
        payload["citation_ranking"] = self.citation_ranking
        payload["competitor_citations"] = self.competitor_citations
        payload["recommendations"] = self.recommendations
        payload["timestamp"] = self.timestamp
        return payload
# ---------------------------------------------------------------------------
# MCP tool caller helper
# ---------------------------------------------------------------------------
def call_mcp_tool(tool_name: str, params: dict) -> dict:
    """
    Invoke an Ahrefs MCP tool via the `claude` CLI and return parsed JSON.

    In Claude Desktop / Claude Code environments the MCP tools are invoked
    directly by the AI agent. This subprocess path exists so the script can
    also be executed standalone for testing. Every failure mode — missing
    CLI binary, timeout, non-zero exit, empty or invalid JSON output —
    degrades to an empty dict rather than raising.
    """
    logger.info(f"Calling MCP tool: {tool_name} with params: {params}")
    command = ["claude", "mcp", "call", "ahrefs", tool_name, json.dumps(params)]
    try:
        proc = subprocess.run(command, capture_output=True, text=True, timeout=120)
    except (subprocess.TimeoutExpired, FileNotFoundError) as exc:
        logger.warning(f"MCP call failed ({exc}). Returning empty dict.")
        return {}
    stdout = proc.stdout.strip()
    if proc.returncode != 0 or not stdout:
        logger.warning(f"MCP tool {tool_name} returned non-zero or empty: {proc.stderr}")
        return {}
    try:
        return json.loads(stdout)
    except json.JSONDecodeError as exc:
        logger.warning(f"MCP call failed ({exc}). Returning empty dict.")
        return {}
# ---------------------------------------------------------------------------
# AI Citation Analyzer
# ---------------------------------------------------------------------------
class AiCitationAnalyzer(BaseAsyncClient):
    """Analyze AI answer citations and source pages for a brand.

    Wraps the Ahrefs Brand Radar MCP tools (AI responses, cited domains,
    cited pages) and derives sentiment summaries, citation rankings,
    competitor comparisons, and actionable recommendations.
    """

    def __init__(self, **kwargs):
        # kwargs (e.g. max_concurrent, requests_per_second) are forwarded
        # unchanged to BaseAsyncClient.
        super().__init__(**kwargs)
        self.logger = logging.getLogger(self.__class__.__name__)

    # ---- AI Responses ----
    async def get_ai_responses(self, target: str) -> list[AiResponse]:
        """Fetch AI-generated responses mentioning the brand.

        Uses the `brand-radar-ai-responses` MCP tool.

        Args:
            target: Domain/brand to query.

        Returns:
            Parsed AiResponse entries; empty list if the call fails or
            returns no data.
        """
        self.logger.info("Fetching AI responses for %s", target)
        # call_mcp_tool blocks on a subprocess; run it off the event loop.
        data = await asyncio.to_thread(
            call_mcp_tool,
            "brand-radar-ai-responses",
            {"target": target},
        )
        responses: list[AiResponse] = []
        if not data:
            return responses
        # Payload may be a bare list or wrapped under "responses" / "data".
        items = data if isinstance(data, list) else data.get("responses", data.get("data", []))
        for item in items:
            if isinstance(item, dict):
                responses.append(AiResponse(
                    query=item.get("query", item.get("keyword", "")),
                    response_text=item.get("response_text", item.get("answer", item.get("text", ""))),
                    # Presence in this feed implies a brand mention by default.
                    brand_mentioned=item.get("brand_mentioned", True),
                    sentiment=item.get("sentiment", "neutral"),
                    source_engine=item.get("source_engine", item.get("engine", "")),
                    date=item.get("date", ""),
                    url=item.get("url", ""),
                ))
        return responses

    # ---- Cited Domains ----
    async def get_cited_domains(self, target: str) -> list[CitedDomain]:
        """Fetch domains cited in AI answers.

        Uses the `brand-radar-cited-domains` MCP tool.

        Args:
            target: Domain/brand to query.

        Returns:
            Parsed CitedDomain entries; empty list on failure or no data.
        """
        self.logger.info("Fetching cited domains for %s", target)
        data = await asyncio.to_thread(
            call_mcp_tool,
            "brand-radar-cited-domains",
            {"target": target},
        )
        domains: list[CitedDomain] = []
        if not data:
            return domains
        # Payload may be a bare list or wrapped under "domains" / "data".
        items = data if isinstance(data, list) else data.get("domains", data.get("data", []))
        for item in items:
            if isinstance(item, dict):
                domains.append(CitedDomain(
                    domain=item.get("domain", ""),
                    # Upstream key naming varies across API versions.
                    citation_count=item.get("citation_count", item.get("citations", item.get("count", 0))),
                    topics=item.get("topics", []),
                    share_pct=item.get("share_pct", item.get("share", 0.0)),
                ))
        return domains

    # ---- Cited Pages ----
    async def get_cited_pages(self, target: str) -> list[CitedPage]:
        """Fetch specific pages cited in AI answers.

        Uses the `brand-radar-cited-pages` MCP tool.

        Args:
            target: Domain/brand to query.

        Returns:
            Parsed CitedPage entries; empty list on failure or no data.
        """
        self.logger.info("Fetching cited pages for %s", target)
        data = await asyncio.to_thread(
            call_mcp_tool,
            "brand-radar-cited-pages",
            {"target": target},
        )
        pages: list[CitedPage] = []
        if not data:
            return pages
        # Payload may be a bare list or wrapped under "pages" / "data".
        items = data if isinstance(data, list) else data.get("pages", data.get("data", []))
        for item in items:
            if isinstance(item, dict):
                pages.append(CitedPage(
                    url=item.get("url", ""),
                    title=item.get("title", ""),
                    citation_count=item.get("citation_count", item.get("citations", item.get("count", 0))),
                    context=item.get("context", item.get("snippet", "")),
                    topics=item.get("topics", []),
                ))
        return pages

    # ---- Sentiment Analysis ----
    @staticmethod
    def analyze_response_sentiment(responses: list[AiResponse]) -> dict:
        """
        Analyze the sentiment distribution of AI responses.

        Returns a summary dict with counts, percentages (one decimal place),
        and an overall label. Thresholds: >=60% positive -> "positive"
        (checked first, so it wins over the negative rule); >=40% negative
        -> "negative"; otherwise leaning toward whichever side is larger,
        or "neutral" on a tie. Empty input yields zeroed counts and
        overall "unknown".
        """
        if not responses:
            return {
                "total": 0,
                "positive": 0,
                "neutral": 0,
                "negative": 0,
                "positive_pct": 0.0,
                "neutral_pct": 0.0,
                "negative_pct": 0.0,
                "overall_sentiment": "unknown",
            }
        total = len(responses)
        positive = sum(1 for r in responses if r.sentiment == "positive")
        neutral = sum(1 for r in responses if r.sentiment == "neutral")
        negative = sum(1 for r in responses if r.sentiment == "negative")
        positive_pct = round((positive / total) * 100, 1)
        neutral_pct = round((neutral / total) * 100, 1)
        negative_pct = round((negative / total) * 100, 1)
        # Determine overall sentiment (order matters: positive rule first).
        if positive_pct >= 60:
            overall = "positive"
        elif negative_pct >= 40:
            overall = "negative"
        elif positive_pct > negative_pct:
            overall = "leaning_positive"
        elif negative_pct > positive_pct:
            overall = "leaning_negative"
        else:
            overall = "neutral"
        return {
            "total": total,
            "positive": positive,
            "neutral": neutral,
            "negative": negative,
            "positive_pct": positive_pct,
            "neutral_pct": neutral_pct,
            "negative_pct": negative_pct,
            "overall_sentiment": overall,
        }

    # ---- Citation Ranking ----
    @staticmethod
    def rank_citations(items: list[CitedDomain] | list[CitedPage]) -> list[dict]:
        """Rank cited domains or pages by citation frequency.

        Each entry is the item's asdict() form plus "rank" (1-based, most
        cited first) and "share_of_citations" (percent of the total across
        the given items; 0.0 when the total is zero).
        """
        if not items:
            return []
        ranked = sorted(items, key=lambda x: x.citation_count, reverse=True)
        total_citations = sum(item.citation_count for item in ranked)
        result = []
        for rank, item in enumerate(ranked, 1):
            entry = asdict(item)
            entry["rank"] = rank
            entry["share_of_citations"] = (
                round((item.citation_count / total_citations) * 100, 1)
                if total_citations > 0
                else 0.0
            )
            result.append(entry)
        return result

    # ---- Competitor Citation Comparison ----
    async def compare_competitor_citations(
        self, target: str, competitors: list[str]
    ) -> list[dict]:
        """Compare citation profiles between target and competitors.

        Args:
            target: The brand's own domain.
            competitors: Competitor domains to compare against.

        Returns:
            One summary dict per domain (target included, flagged with
            "is_target"), sorted by total page citations descending.
        """
        self.logger.info("Comparing citations for %s vs %s", target, competitors)
        results = []
        all_domains = [target] + competitors
        for domain in all_domains:
            # The two fetches are independent; run them concurrently instead
            # of awaiting sequentially.
            cited_domains, cited_pages = await asyncio.gather(
                self.get_cited_domains(domain),
                self.get_cited_pages(domain),
            )
            total_domain_citations = sum(d.citation_count for d in cited_domains)
            total_page_citations = sum(p.citation_count for p in cited_pages)
            results.append({
                "domain": domain,
                "is_target": domain == target,
                "total_domain_citations": total_domain_citations,
                "total_page_citations": total_page_citations,
                "unique_cited_domains": len(cited_domains),
                "unique_cited_pages": len(cited_pages),
                # API ordering is preserved; first entry is treated as "top".
                "top_cited_domain": cited_domains[0].domain if cited_domains else "",
                "top_cited_page": cited_pages[0].url if cited_pages else "",
            })
        # Sort by total page citations descending
        results.sort(key=lambda x: x["total_page_citations"], reverse=True)
        return results

    # ---- Recommendations ----
    @staticmethod
    def generate_recommendations(result: CitationAnalysisResult) -> list[str]:
        """Generate actionable recommendations for improving AI citations.

        Rules fire on: zero/low page-citation counts, the target missing
        from the cited-domain list, negative/positive sentiment skew, the
        most-cited topics, and a competitor leading the citation ranking.
        Recommendation text is intentionally in Korean (user-facing).
        """
        recs: list[str] = []
        # Based on citation count
        total_page_citations = sum(p.citation_count for p in result.cited_pages)
        if total_page_citations == 0:
            recs.append(
                "AI 검색 엔진에서 인용된 페이지가 없습니다. "
                "고품질 원본 콘텐츠(연구 데이터, 종합 가이드, 전문가 인사이트)를 "
                "발행하여 AI 엔진의 인용 대상이 되도록 하세요."
            )
        elif total_page_citations < 10:
            recs.append(
                f"인용된 페이지 수가 {total_page_citations}건으로 적습니다. "
                "FAQ, How-to, 비교 분석 등 AI가 참조하기 쉬운 "
                "구조화된 콘텐츠를 추가하세요."
            )
        # Based on domain diversity
        if result.cited_domains:
            target_domains = [d for d in result.cited_domains if d.domain == result.target]
            if not target_domains:
                recs.append(
                    "타깃 도메인이 AI 인용 도메인 목록에 포함되지 않았습니다. "
                    "도메인 권위(Domain Authority) 향상과 "
                    "Schema Markup(JSON-LD) 적용을 우선 추진하세요."
                )
        # Based on sentiment
        sentiment = result.sentiment_summary
        if sentiment.get("negative_pct", 0) > 30:
            recs.append(
                f"AI 응답 중 부정적 언급 비율이 {sentiment['negative_pct']}%입니다. "
                "브랜드 평판 관리와 긍정적 콘텐츠 확대가 필요합니다. "
                "고객 리뷰, 성공 사례, 수상 내역 등을 강화하세요."
            )
        elif sentiment.get("overall_sentiment") == "positive":
            recs.append(
                "AI 응답에서 브랜드 언급이 전반적으로 긍정적입니다. "
                "이 긍정적 이미지를 활용하여 더 많은 키워드에서 "
                "AI 인용을 확대하세요."
            )
        # Content strategy: surface the topics of the 3 most-cited pages.
        if result.cited_pages:
            top_pages = sorted(result.cited_pages, key=lambda p: p.citation_count, reverse=True)[:3]
            top_topics = set()
            for page in top_pages:
                top_topics.update(page.topics)
            if top_topics:
                topics_str = ", ".join(list(top_topics)[:5])
                recs.append(
                    f"가장 많이 인용되는 주제는 [{topics_str}]입니다. "
                    "이 주제들에 대한 심층 콘텐츠를 추가 제작하세요."
                )
        # E-E-A-T and structured data (always emitted as general guidance).
        recs.append(
            "AI 인용률 향상을 위한 핵심 전략: "
            "(1) E-E-A-T 시그널 강화 - 저자 프로필, 전문가 인용, 실제 경험 콘텐츠, "
            "(2) 구조화된 데이터 적용 - FAQ, HowTo, Article Schema, "
            "(3) 콘텐츠 정확성 및 최신성 유지, "
            "(4) 원본 데이터와 독자적 연구 결과 발행."
        )
        # Competitor-based recommendations (list is pre-sorted, leader first).
        if result.competitor_citations:
            leader = result.competitor_citations[0]
            if not leader.get("is_target", False):
                recs.append(
                    f"인용 리더는 {leader['domain']}입니다 "
                    f"(페이지 인용 {leader['total_page_citations']}건). "
                    "해당 경쟁사의 인용된 페이지를 분석하여 "
                    "콘텐츠 갭을 파악하세요."
                )
        return recs

    # ---- Main Orchestrator ----
    async def analyze(
        self,
        target: str,
        competitors: list[str] | None = None,
        include_responses: bool = True,
        include_cited_domains: bool = True,
        include_cited_pages: bool = True,
    ) -> CitationAnalysisResult:
        """
        Orchestrate full citation analysis.

        Args:
            target: Domain to analyze
            competitors: Optional competitor domains
            include_responses: Fetch AI response data
            include_cited_domains: Fetch cited domains
            include_cited_pages: Fetch cited pages

        Returns:
            A populated CitationAnalysisResult; sections not requested stay
            at their empty defaults. Recommendations are always generated.
        """
        self.logger.info("Starting AI citation analysis for %s", target)
        result = CitationAnalysisResult(target=target)
        # AI responses (sentiment summary is derived from them).
        if include_responses:
            result.ai_responses = await self.get_ai_responses(target)
            result.sentiment_summary = self.analyze_response_sentiment(result.ai_responses)
        # Cited domains (ranking only computed when any were found).
        if include_cited_domains:
            result.cited_domains = await self.get_cited_domains(target)
            if result.cited_domains:
                result.citation_ranking = self.rank_citations(result.cited_domains)
        # Cited pages
        if include_cited_pages:
            result.cited_pages = await self.get_cited_pages(target)
        # Competitor comparison
        if competitors:
            result.competitor_citations = await self.compare_competitor_citations(
                target, competitors
            )
        # Recommendations
        result.recommendations = self.generate_recommendations(result)
        # print_stats is provided by BaseAsyncClient (request statistics).
        self.print_stats()
        return result
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def build_parser() -> argparse.ArgumentParser:
    """Construct the command-line argument parser for this script."""
    parser = argparse.ArgumentParser(
        description="AI Citation Analyzer - Analyze AI answer citations and source pages",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s --target example.com --json
  %(prog)s --target example.com --cited-domains --json
  %(prog)s --target example.com --cited-pages --json
  %(prog)s --target example.com --responses --competitor comp1.com --json
  %(prog)s --target example.com --output citations.json
""",
    )
    # Required positional-style option: the domain under analysis.
    parser.add_argument(
        "--target",
        required=True,
        help="Target domain to analyze (e.g., example.com)",
    )
    # Repeatable competitor flag; collects into a list.
    parser.add_argument(
        "--competitor",
        action="append",
        default=[],
        help="Competitor domain (repeatable). e.g., --competitor a.com --competitor b.com",
    )
    # Section-selection and output toggles (all simple boolean switches).
    boolean_flags = (
        ("--cited-domains", "Include cited domains analysis"),
        ("--cited-pages", "Include cited pages analysis"),
        ("--responses", "Include AI response content analysis"),
        ("--json", "Output result as JSON to stdout"),
    )
    for flag, help_text in boolean_flags:
        parser.add_argument(flag, action="store_true", help=help_text)
    parser.add_argument(
        "--output",
        type=str,
        default=None,
        help="Save JSON output to file path",
    )
    return parser
def print_summary(result: CitationAnalysisResult) -> None:
    """Print a human-readable summary of citation analysis.

    Sections (responses, sentiment, domains, pages, competitors,
    recommendations) are only printed when populated. Long lists are
    truncated (5 responses, 10 domains/pages) with a "... and N more" line.
    """
    print("\n" + "=" * 60)
    print(f" AI Citation Analysis: {result.target}")
    print("=" * 60)
    # AI Responses (first 5 only)
    if result.ai_responses:
        print(f"\n AI Responses: {len(result.ai_responses)}")
        for resp in result.ai_responses[:5]:
            # Tags are omitted/shown depending on field availability.
            engine_tag = f" [{resp.source_engine}]" if resp.source_engine else ""
            sentiment_tag = f" ({resp.sentiment})"
            print(f" - Q: {resp.query[:60]}{engine_tag}{sentiment_tag}")
        if len(result.ai_responses) > 5:
            print(f" ... and {len(result.ai_responses) - 5} more")
    # Sentiment Summary (counts and percentages per category)
    if result.sentiment_summary:
        s = result.sentiment_summary
        print(f"\n Sentiment: {s.get('overall_sentiment', 'unknown')}")
        print(f" Positive: {s.get('positive', 0)} ({s.get('positive_pct', 0):.1f}%)")
        print(f" Neutral: {s.get('neutral', 0)} ({s.get('neutral_pct', 0):.1f}%)")
        print(f" Negative: {s.get('negative', 0)} ({s.get('negative_pct', 0):.1f}%)")
    # Cited Domains (first 10, with up to 3 topic labels each)
    if result.cited_domains:
        print(f"\n Cited Domains: {len(result.cited_domains)}")
        for domain in result.cited_domains[:10]:
            topics_str = ", ".join(domain.topics[:3]) if domain.topics else ""
            print(f" {domain.domain}: {domain.citation_count} citations"
                  f"{f' [{topics_str}]' if topics_str else ''}")
        if len(result.cited_domains) > 10:
            print(f" ... and {len(result.cited_domains) - 10} more")
    # Cited Pages (first 10; falls back to URL when title is empty)
    if result.cited_pages:
        print(f"\n Cited Pages: {len(result.cited_pages)}")
        for page in result.cited_pages[:10]:
            title = page.title[:50] if page.title else page.url[:50]
            print(f" {title}: {page.citation_count} citations")
        if len(result.cited_pages) > 10:
            print(f" ... and {len(result.cited_pages) - 10} more")
    # Competitor Comparison (target row is marked)
    if result.competitor_citations:
        print("\n Competitor Citation Comparison:")
        for comp in result.competitor_citations:
            marker = " <-- target" if comp.get("is_target") else ""
            print(f" {comp['domain']}: "
                  f"domains={comp['unique_cited_domains']}, "
                  f"pages={comp['unique_cited_pages']}, "
                  f"page_citations={comp['total_page_citations']}{marker}")
    # Recommendations (numbered)
    if result.recommendations:
        print("\n Recommendations:")
        for i, rec in enumerate(result.recommendations, 1):
            print(f" {i}. {rec}")
    print("\n" + "=" * 60)
    print(f" Generated: {result.timestamp}")
    print("=" * 60 + "\n")
async def main() -> None:
    """CLI entry point: parse arguments, run the analysis, emit output."""
    args = build_parser().parse_args()

    # When no section flag is given, every section is included by default.
    explicit = args.cited_domains or args.cited_pages or args.responses
    include_responses = args.responses or not explicit
    include_cited_domains = args.cited_domains or not explicit
    include_cited_pages = args.cited_pages or not explicit

    analyzer = AiCitationAnalyzer(
        max_concurrent=5,
        requests_per_second=2.0,
    )
    result = await analyzer.analyze(
        target=args.target,
        competitors=args.competitor or None,
        include_responses=include_responses,
        include_cited_domains=include_cited_domains,
        include_cited_pages=include_cited_pages,
    )

    # Output: human-readable summary unless JSON/file output was requested.
    if not (args.json or args.output):
        print_summary(result)
        return
    json_str = json.dumps(result.to_dict(), ensure_ascii=False, indent=2)
    if args.json:
        print(json_str)
    if args.output:
        output_path = Path(args.output)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(json_str, encoding="utf-8")
        logger.info(f"Report saved to {args.output}")


if __name__ == "__main__":
    asyncio.run(main())