Add SEO skills 19-28, 31-32 with full Python implementations

12 new skills: Keyword Strategy, SERP Analysis, Position Tracking,
Link Building, Content Strategy, E-Commerce SEO, KPI Framework,
International SEO, AI Visibility, Knowledge Graph, Competitor Intel,
and Crawl Budget. ~20K lines of Python across 25 domain scripts.
Updated skill 11 pipeline table and repo CLAUDE.md.
Enhanced skill 18 local SEO workflow from jamie.clinic audit.

Note: Skill 26 hreflang_validator.py pending (content filter block).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-13 12:05:59 +09:00
parent 159f7ec3f7
commit a3ff965b87
125 changed files with 25948 additions and 173 deletions

View File

@@ -0,0 +1,611 @@
"""
AI Citation Analyzer - Brand Radar Citation Analysis
=====================================================
Purpose: Analyze how a brand is cited in AI-generated search answers,
including cited domains, cited pages, and AI response content.
Python: 3.10+
Usage:
python ai_citation_analyzer.py --target example.com --json
python ai_citation_analyzer.py --target example.com --cited-domains --json
python ai_citation_analyzer.py --target example.com --cited-pages --json
python ai_citation_analyzer.py --target example.com --responses --json
"""
import argparse
import asyncio
import json
import logging
import subprocess
import sys
from dataclasses import dataclass, field, asdict
from datetime import datetime
from pathlib import Path
from typing import Any
# Add parent to path for base_client import
sys.path.insert(0, str(Path(__file__).parent))
from base_client import BaseAsyncClient, config
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
@dataclass
class AiResponse:
    """An AI-generated response that mentions the brand."""
    query: str = ""  # search query/prompt that produced the answer ("query" or "keyword" in the API payload)
    response_text: str = ""  # answer text ("response_text", "answer", or "text" in the payload)
    brand_mentioned: bool = False  # whether the brand appears in the answer
    sentiment: str = "neutral"  # positive, neutral, negative
    source_engine: str = ""  # AI engine that produced the answer, when reported
    date: str = ""  # date string as returned by the API (format not normalized here)
    url: str = ""  # URL associated with the response, if any
@dataclass
class CitedDomain:
    """A domain cited in AI-generated answers."""
    domain: str = ""  # cited domain name
    citation_count: int = 0  # number of citations observed for this domain
    topics: list[str] = field(default_factory=list)  # topics the citations cover
    share_pct: float = 0.0  # share of all citations, in percent (per API)
@dataclass
class CitedPage:
    """A specific page cited in AI-generated answers."""
    url: str = ""  # page URL
    title: str = ""  # page title, when available
    citation_count: int = 0  # number of citations observed for this page
    context: str = ""  # citation context/snippet ("context" or "snippet" in the payload)
    topics: list[str] = field(default_factory=list)  # topics the citations cover
@dataclass
class CitationAnalysisResult:
    """Complete citation analysis result."""
    target: str = ""
    ai_responses: list[AiResponse] = field(default_factory=list)
    cited_domains: list[CitedDomain] = field(default_factory=list)
    cited_pages: list[CitedPage] = field(default_factory=list)
    sentiment_summary: dict = field(default_factory=dict)
    citation_ranking: list[dict] = field(default_factory=list)
    competitor_citations: list[dict] = field(default_factory=list)
    recommendations: list[str] = field(default_factory=list)
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())

    def to_dict(self) -> dict:
        """Convert result to dictionary."""
        def rows(records):
            # Serialize a list of dataclass instances to plain dicts.
            return [asdict(record) for record in records]

        return {
            "target": self.target,
            "ai_responses": rows(self.ai_responses),
            "cited_domains": rows(self.cited_domains),
            "cited_pages": rows(self.cited_pages),
            "sentiment_summary": self.sentiment_summary,
            "citation_ranking": self.citation_ranking,
            "competitor_citations": self.competitor_citations,
            "recommendations": self.recommendations,
            "timestamp": self.timestamp,
        }
# ---------------------------------------------------------------------------
# MCP tool caller helper
# ---------------------------------------------------------------------------
def call_mcp_tool(tool_name: str, params: dict) -> dict:
    """
    Call an Ahrefs MCP tool and return the parsed JSON response.

    In Claude Desktop / Claude Code environments the MCP tools are invoked
    directly by the AI agent. This helper exists so that the script can also
    be executed standalone via subprocess for testing purposes.

    Args:
        tool_name: Name of the Ahrefs MCP tool (e.g. "brand-radar-ai-responses").
        params: JSON-serializable parameters forwarded to the tool.

    Returns:
        Parsed JSON payload on success, or an empty dict on any failure
        (CLI missing, spawn error, timeout, non-zero exit, empty or
        invalid output) so callers can treat the result as best-effort.
    """
    # getLogger is idempotent: this is the same logger object as the
    # module-level `logger`, but binding it here keeps the helper
    # self-contained.
    log = logging.getLogger(__name__)
    log.info(f"Calling MCP tool: {tool_name} with params: {params}")
    try:
        cmd = ["claude", "mcp", "call", "ahrefs", tool_name, json.dumps(params)]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
        if result.returncode == 0 and result.stdout.strip():
            return json.loads(result.stdout.strip())
        log.warning(f"MCP tool {tool_name} returned non-zero or empty: {result.stderr}")
        return {}
    except (subprocess.TimeoutExpired, json.JSONDecodeError, OSError) as exc:
        # OSError covers FileNotFoundError (CLI absent) plus permission and
        # other spawn failures, preserving the fail-soft contract on all of
        # them instead of only the missing-binary case.
        log.warning(f"MCP call failed ({exc}). Returning empty dict.")
        return {}
# ---------------------------------------------------------------------------
# AI Citation Analyzer
# ---------------------------------------------------------------------------
class AiCitationAnalyzer(BaseAsyncClient):
    """Analyze AI answer citations and source pages for a brand.

    Each fetch method shells out to an Ahrefs Brand Radar MCP tool via
    call_mcp_tool() (run off the event loop with asyncio.to_thread) and
    normalizes the loosely-shaped JSON payload into the dataclasses defined
    above. The analysis helpers (sentiment, ranking, competitor comparison,
    recommendations) are pure functions over that normalized data.
    Recommendation strings are user-facing Korean copy.
    """

    def __init__(self, **kwargs):
        # Forward concurrency / rate-limit settings to BaseAsyncClient.
        super().__init__(**kwargs)
        self.logger = logging.getLogger(self.__class__.__name__)

    # ---- AI Responses ----
    async def get_ai_responses(self, target: str) -> list[AiResponse]:
        """Fetch AI-generated responses mentioning the brand via brand-radar-ai-responses.

        Args:
            target: Domain whose brand mentions are analyzed.

        Returns:
            Parsed AiResponse records; empty list when the tool returns nothing.
        """
        self.logger.info(f"Fetching AI responses for {target}")
        # call_mcp_tool blocks on a subprocess, so run it in a worker thread.
        data = await asyncio.to_thread(
            call_mcp_tool,
            "brand-radar-ai-responses",
            {"target": target},
        )
        responses: list[AiResponse] = []
        if not data:
            return responses
        # Payload shape varies: a bare list, or a dict wrapping the list
        # under "responses" or "data".
        items = data if isinstance(data, list) else data.get("responses", data.get("data", []))
        for item in items:
            if isinstance(item, dict):
                # Field names differ across payload versions; fall back
                # through the known aliases for each field.
                responses.append(AiResponse(
                    query=item.get("query", item.get("keyword", "")),
                    response_text=item.get("response_text", item.get("answer", item.get("text", ""))),
                    # Defaults to True when absent — presumably items already
                    # passed a brand filter upstream; TODO confirm.
                    brand_mentioned=item.get("brand_mentioned", True),
                    sentiment=item.get("sentiment", "neutral"),
                    source_engine=item.get("source_engine", item.get("engine", "")),
                    date=item.get("date", ""),
                    url=item.get("url", ""),
                ))
        return responses

    # ---- Cited Domains ----
    async def get_cited_domains(self, target: str) -> list[CitedDomain]:
        """Fetch domains cited in AI answers via brand-radar-cited-domains.

        Returns:
            Parsed CitedDomain records; empty list on missing/empty payload.
        """
        self.logger.info(f"Fetching cited domains for {target}")
        data = await asyncio.to_thread(
            call_mcp_tool,
            "brand-radar-cited-domains",
            {"target": target},
        )
        domains: list[CitedDomain] = []
        if not data:
            return domains
        # Same variable payload shape handling as get_ai_responses.
        items = data if isinstance(data, list) else data.get("domains", data.get("data", []))
        for item in items:
            if isinstance(item, dict):
                domains.append(CitedDomain(
                    domain=item.get("domain", ""),
                    citation_count=item.get("citation_count", item.get("citations", item.get("count", 0))),
                    topics=item.get("topics", []),
                    share_pct=item.get("share_pct", item.get("share", 0.0)),
                ))
        return domains

    # ---- Cited Pages ----
    async def get_cited_pages(self, target: str) -> list[CitedPage]:
        """Fetch specific pages cited in AI answers via brand-radar-cited-pages.

        Returns:
            Parsed CitedPage records; empty list on missing/empty payload.
        """
        self.logger.info(f"Fetching cited pages for {target}")
        data = await asyncio.to_thread(
            call_mcp_tool,
            "brand-radar-cited-pages",
            {"target": target},
        )
        pages: list[CitedPage] = []
        if not data:
            return pages
        items = data if isinstance(data, list) else data.get("pages", data.get("data", []))
        for item in items:
            if isinstance(item, dict):
                pages.append(CitedPage(
                    url=item.get("url", ""),
                    title=item.get("title", ""),
                    citation_count=item.get("citation_count", item.get("citations", item.get("count", 0))),
                    context=item.get("context", item.get("snippet", "")),
                    topics=item.get("topics", []),
                ))
        return pages

    # ---- Sentiment Analysis ----
    @staticmethod
    def analyze_response_sentiment(responses: list[AiResponse]) -> dict:
        """
        Analyze the sentiment distribution of AI responses.

        Returns a summary with counts and percentages for each sentiment
        category plus an "overall_sentiment" verdict. Responses whose
        sentiment is not one of positive/neutral/negative are counted in
        "total" but in none of the three buckets.
        """
        if not responses:
            # Zeroed summary so downstream .get() calls never KeyError.
            return {
                "total": 0,
                "positive": 0,
                "neutral": 0,
                "negative": 0,
                "positive_pct": 0.0,
                "neutral_pct": 0.0,
                "negative_pct": 0.0,
                "overall_sentiment": "unknown",
            }
        total = len(responses)
        positive = sum(1 for r in responses if r.sentiment == "positive")
        neutral = sum(1 for r in responses if r.sentiment == "neutral")
        negative = sum(1 for r in responses if r.sentiment == "negative")
        positive_pct = round((positive / total) * 100, 1)
        neutral_pct = round((neutral / total) * 100, 1)
        negative_pct = round((negative / total) * 100, 1)
        # Determine overall sentiment. Thresholds (60% positive / 40%
        # negative) are heuristic cutoffs; negative dominance is checked
        # at a lower bar than positive dominance.
        if positive_pct >= 60:
            overall = "positive"
        elif negative_pct >= 40:
            overall = "negative"
        elif positive_pct > negative_pct:
            overall = "leaning_positive"
        elif negative_pct > positive_pct:
            overall = "leaning_negative"
        else:
            overall = "neutral"
        return {
            "total": total,
            "positive": positive,
            "neutral": neutral,
            "negative": negative,
            "positive_pct": positive_pct,
            "neutral_pct": neutral_pct,
            "negative_pct": negative_pct,
            "overall_sentiment": overall,
        }

    # ---- Citation Ranking ----
    @staticmethod
    def rank_citations(items: list[CitedDomain] | list[CitedPage]) -> list[dict]:
        """Rank cited domains or pages by citation frequency.

        Returns:
            One dict per item (its asdict() fields plus "rank", 1-based,
            and "share_of_citations" in percent), sorted by citation_count
            descending.
        """
        if not items:
            return []
        ranked = sorted(items, key=lambda x: x.citation_count, reverse=True)
        total_citations = sum(item.citation_count for item in ranked)
        result = []
        for rank, item in enumerate(ranked, 1):
            entry = asdict(item)
            entry["rank"] = rank
            # Guard against division by zero when every count is 0.
            entry["share_of_citations"] = (
                round((item.citation_count / total_citations) * 100, 1)
                if total_citations > 0
                else 0.0
            )
            result.append(entry)
        return result

    # ---- Competitor Citation Comparison ----
    async def compare_competitor_citations(
        self, target: str, competitors: list[str]
    ) -> list[dict]:
        """Compare citation profiles between target and competitors.

        Fetches cited domains and pages for the target and each competitor
        sequentially (one domain at a time), then sorts the aggregate rows
        by total page citations, descending.
        """
        self.logger.info(f"Comparing citations for {target} vs {competitors}")
        results = []
        all_domains = [target] + competitors
        for domain in all_domains:
            cited_domains = await self.get_cited_domains(domain)
            cited_pages = await self.get_cited_pages(domain)
            total_domain_citations = sum(d.citation_count for d in cited_domains)
            total_page_citations = sum(p.citation_count for p in cited_pages)
            unique_domains = len(cited_domains)
            unique_pages = len(cited_pages)
            results.append({
                "domain": domain,
                "is_target": domain == target,
                "total_domain_citations": total_domain_citations,
                "total_page_citations": total_page_citations,
                "unique_cited_domains": unique_domains,
                "unique_cited_pages": unique_pages,
                # NOTE(review): "top" assumes the MCP payload is already
                # sorted by citations — results are not re-sorted here.
                "top_cited_domain": cited_domains[0].domain if cited_domains else "",
                "top_cited_page": cited_pages[0].url if cited_pages else "",
            })
        # Sort by total page citations descending
        results.sort(key=lambda x: x["total_page_citations"], reverse=True)
        return results

    # ---- Recommendations ----
    @staticmethod
    def generate_recommendations(result: CitationAnalysisResult) -> list[str]:
        """Generate actionable recommendations for improving AI citations.

        Rule-based: inspects citation volume, domain presence, sentiment
        percentages, top topics, and the competitor leaderboard, and always
        appends one general E-E-A-T / structured-data best-practice item.
        Returned strings are user-facing Korean copy.
        """
        recs: list[str] = []
        # Based on citation count
        total_page_citations = sum(p.citation_count for p in result.cited_pages)
        if total_page_citations == 0:
            recs.append(
                "AI 검색 엔진에서 인용된 페이지가 없습니다. "
                "고품질 원본 콘텐츠(연구 데이터, 종합 가이드, 전문가 인사이트)를 "
                "발행하여 AI 엔진의 인용 대상이 되도록 하세요."
            )
        elif total_page_citations < 10:
            recs.append(
                f"인용된 페이지 수가 {total_page_citations}건으로 적습니다. "
                "FAQ, How-to, 비교 분석 등 AI가 참조하기 쉬운 "
                "구조화된 콘텐츠를 추가하세요."
            )
        # Based on domain diversity
        if result.cited_domains:
            # Exact string match between cited domain and target domain.
            target_domains = [d for d in result.cited_domains if d.domain == result.target]
            if not target_domains:
                recs.append(
                    "타깃 도메인이 AI 인용 도메인 목록에 포함되지 않았습니다. "
                    "도메인 권위(Domain Authority) 향상과 "
                    "Schema Markup(JSON-LD) 적용을 우선 추진하세요."
                )
        # Based on sentiment
        sentiment = result.sentiment_summary
        if sentiment.get("negative_pct", 0) > 30:
            recs.append(
                f"AI 응답 중 부정적 언급 비율이 {sentiment['negative_pct']}%입니다. "
                "브랜드 평판 관리와 긍정적 콘텐츠 확대가 필요합니다. "
                "고객 리뷰, 성공 사례, 수상 내역 등을 강화하세요."
            )
        elif sentiment.get("overall_sentiment") == "positive":
            recs.append(
                "AI 응답에서 브랜드 언급이 전반적으로 긍정적입니다. "
                "이 긍정적 이미지를 활용하여 더 많은 키워드에서 "
                "AI 인용을 확대하세요."
            )
        # Content strategy recommendations
        if result.cited_pages:
            # Pool topics from the three most-cited pages; show at most 5.
            top_pages = sorted(result.cited_pages, key=lambda p: p.citation_count, reverse=True)[:3]
            top_topics = set()
            for page in top_pages:
                top_topics.update(page.topics)
            if top_topics:
                # Set ordering is arbitrary, so the listed topics are an
                # unordered sample.
                topics_str = ", ".join(list(top_topics)[:5])
                recs.append(
                    f"가장 많이 인용되는 주제는 [{topics_str}]입니다. "
                    "이 주제들에 대한 심층 콘텐츠를 추가 제작하세요."
                )
        # E-E-A-T and structured data (always included)
        recs.append(
            "AI 인용률 향상을 위한 핵심 전략: "
            "(1) E-E-A-T 시그널 강화 - 저자 프로필, 전문가 인용, 실제 경험 콘텐츠, "
            "(2) 구조화된 데이터 적용 - FAQ, HowTo, Article Schema, "
            "(3) 콘텐츠 정확성 및 최신성 유지, "
            "(4) 원본 데이터와 독자적 연구 결과 발행."
        )
        # Competitor-based recommendations
        if result.competitor_citations:
            # competitor_citations is sorted by total_page_citations
            # descending, so index 0 is the citation leader.
            leader = result.competitor_citations[0]
            if not leader.get("is_target", False):
                recs.append(
                    f"인용 리더는 {leader['domain']}입니다 "
                    f"(페이지 인용 {leader['total_page_citations']}건). "
                    "해당 경쟁사의 인용된 페이지를 분석하여 "
                    "콘텐츠 갭을 파악하세요."
                )
        return recs

    # ---- Main Orchestrator ----
    async def analyze(
        self,
        target: str,
        competitors: list[str] | None = None,
        include_responses: bool = True,
        include_cited_domains: bool = True,
        include_cited_pages: bool = True,
    ) -> CitationAnalysisResult:
        """
        Orchestrate full citation analysis.

        Args:
            target: Domain to analyze
            competitors: Optional competitor domains
            include_responses: Fetch AI response data
            include_cited_domains: Fetch cited domains
            include_cited_pages: Fetch cited pages

        Returns:
            A populated CitationAnalysisResult; sections that were not
            requested keep their dataclass defaults.
        """
        self.logger.info(f"Starting AI citation analysis for {target}")
        result = CitationAnalysisResult(target=target)
        # AI responses
        if include_responses:
            result.ai_responses = await self.get_ai_responses(target)
            result.sentiment_summary = self.analyze_response_sentiment(result.ai_responses)
        # Cited domains
        if include_cited_domains:
            result.cited_domains = await self.get_cited_domains(target)
            if result.cited_domains:
                result.citation_ranking = self.rank_citations(result.cited_domains)
        # Cited pages
        if include_cited_pages:
            result.cited_pages = await self.get_cited_pages(target)
        # Competitor comparison
        if competitors:
            result.competitor_citations = await self.compare_competitor_citations(
                target, competitors
            )
        # Recommendations (always generated, even for empty sections)
        result.recommendations = self.generate_recommendations(result)
        # print_stats is inherited from BaseAsyncClient — presumably request
        # statistics; defined outside this file.
        self.print_stats()
        return result
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def build_parser() -> argparse.ArgumentParser:
    """Construct the command-line argument parser for this script."""
    epilog_text = """
Examples:
  %(prog)s --target example.com --json
  %(prog)s --target example.com --cited-domains --json
  %(prog)s --target example.com --cited-pages --json
  %(prog)s --target example.com --responses --competitor comp1.com --json
  %(prog)s --target example.com --output citations.json
"""
    parser = argparse.ArgumentParser(
        description="AI Citation Analyzer - Analyze AI answer citations and source pages",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=epilog_text,
    )
    parser.add_argument(
        "--target", required=True,
        help="Target domain to analyze (e.g., example.com)",
    )
    parser.add_argument(
        "--competitor", action="append", default=[],
        help="Competitor domain (repeatable). e.g., --competitor a.com --competitor b.com",
    )
    # Boolean section/output toggles share the same argparse shape, so they
    # are declared from a (flag, help) table.
    boolean_flags = [
        ("--cited-domains", "Include cited domains analysis"),
        ("--cited-pages", "Include cited pages analysis"),
        ("--responses", "Include AI response content analysis"),
        ("--json", "Output result as JSON to stdout"),
    ]
    for flag, help_text in boolean_flags:
        parser.add_argument(flag, action="store_true", help=help_text)
    parser.add_argument(
        "--output", type=str, default=None,
        help="Save JSON output to file path",
    )
    return parser
def print_summary(result: CitationAnalysisResult) -> None:
    """Render a human-readable citation-analysis report to stdout."""
    bar = "=" * 60

    def overflow(total: int, shown: int) -> None:
        # Footer line for lists that were truncated for display.
        if total > shown:
            print(f" ... and {total - shown} more")

    print("\n" + bar)
    print(f" AI Citation Analysis: {result.target}")
    print(bar)

    # AI Responses (first 5)
    answers = result.ai_responses
    if answers:
        print(f"\n AI Responses: {len(answers)}")
        for answer in answers[:5]:
            engine_tag = f" [{answer.source_engine}]" if answer.source_engine else ""
            sentiment_tag = f" ({answer.sentiment})"
            print(f" - Q: {answer.query[:60]}{engine_tag}{sentiment_tag}")
        overflow(len(answers), 5)

    # Sentiment Summary
    summary = result.sentiment_summary
    if summary:
        print(f"\n Sentiment: {summary.get('overall_sentiment', 'unknown')}")
        print(f" Positive: {summary.get('positive', 0)} ({summary.get('positive_pct', 0):.1f}%)")
        print(f" Neutral: {summary.get('neutral', 0)} ({summary.get('neutral_pct', 0):.1f}%)")
        print(f" Negative: {summary.get('negative', 0)} ({summary.get('negative_pct', 0):.1f}%)")

    # Cited Domains (first 10)
    if result.cited_domains:
        print(f"\n Cited Domains: {len(result.cited_domains)}")
        for entry in result.cited_domains[:10]:
            topics_str = ", ".join(entry.topics[:3]) if entry.topics else ""
            suffix = f" [{topics_str}]" if topics_str else ""
            print(f" {entry.domain}: {entry.citation_count} citations{suffix}")
        overflow(len(result.cited_domains), 10)

    # Cited Pages (first 10); fall back to the URL when the title is empty
    if result.cited_pages:
        print(f"\n Cited Pages: {len(result.cited_pages)}")
        for page in result.cited_pages[:10]:
            label = page.title[:50] if page.title else page.url[:50]
            print(f" {label}: {page.citation_count} citations")
        overflow(len(result.cited_pages), 10)

    # Competitor Comparison
    if result.competitor_citations:
        print("\n Competitor Citation Comparison:")
        for comp in result.competitor_citations:
            marker = " <-- target" if comp.get("is_target") else ""
            print(f" {comp['domain']}: "
                  f"domains={comp['unique_cited_domains']}, "
                  f"pages={comp['unique_cited_pages']}, "
                  f"page_citations={comp['total_page_citations']}{marker}")

    # Recommendations
    if result.recommendations:
        print("\n Recommendations:")
        for i, rec in enumerate(result.recommendations, 1):
            print(f" {i}. {rec}")

    print("\n" + bar)
    print(f" Generated: {result.timestamp}")
    print(bar + "\n")
async def main() -> None:
    """CLI entry point: parse args, run the analysis, emit the report."""
    args = build_parser().parse_args()
    # When no section flag is given, include every section; otherwise only
    # the explicitly requested ones.
    any_specific = args.cited_domains or args.cited_pages or args.responses
    include_responses = args.responses or not any_specific
    include_cited_domains = args.cited_domains or not any_specific
    include_cited_pages = args.cited_pages or not any_specific

    analyzer = AiCitationAnalyzer(
        max_concurrent=5,
        requests_per_second=2.0,
    )
    result = await analyzer.analyze(
        target=args.target,
        competitors=args.competitor if args.competitor else None,
        include_responses=include_responses,
        include_cited_domains=include_cited_domains,
        include_cited_pages=include_cited_pages,
    )

    # Output: machine-readable JSON (stdout and/or file) or a text summary.
    if args.json or args.output:
        json_str = json.dumps(result.to_dict(), ensure_ascii=False, indent=2)
        if args.json:
            print(json_str)
        if args.output:
            destination = Path(args.output)
            destination.parent.mkdir(parents=True, exist_ok=True)
            destination.write_text(json_str, encoding="utf-8")
            logger.info(f"Report saved to {args.output}")
    else:
        print_summary(result)


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -0,0 +1,594 @@
"""
AI Visibility Tracker - Brand Radar Monitoring
================================================
Purpose: Track brand visibility in AI-generated search answers
using Ahrefs Brand Radar APIs.
Python: 3.10+
Usage:
python ai_visibility_tracker.py --target example.com --json
python ai_visibility_tracker.py --target example.com --competitor comp1.com --json
python ai_visibility_tracker.py --target example.com --history --json
python ai_visibility_tracker.py --target example.com --sov --json
"""
import argparse
import asyncio
import json
import logging
import subprocess
import sys
from dataclasses import dataclass, field, asdict
from datetime import datetime
from pathlib import Path
from typing import Any
# Add parent to path for base_client import
sys.path.insert(0, str(Path(__file__).parent))
from base_client import BaseAsyncClient, config
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
@dataclass
class ImpressionMetrics:
    """AI search impression metrics for a brand."""
    total: int = 0  # total impressions for the reporting period
    trend: str = "stable"  # increasing, decreasing, stable
    change_pct: float = 0.0  # percent change as reported by the API; values beyond +/-5 drive the trend label
    period: str = ""  # reporting-period label as returned by the API
    breakdown: dict = field(default_factory=dict)  # per-dimension breakdown; shape defined by the API, not inspected here
@dataclass
class MentionMetrics:
    """AI search mention metrics for a brand."""
    total: int = 0  # total brand mentions for the reporting period
    trend: str = "stable"  # increasing, decreasing, stable
    change_pct: float = 0.0  # percent change as reported by the API; values beyond +/-5 drive the trend label
    period: str = ""  # reporting-period label as returned by the API
    breakdown: dict = field(default_factory=dict)  # per-dimension breakdown; shape defined by the API, not inspected here
@dataclass
class SovMetric:
    """Share of Voice metric for a single domain."""
    domain: str = ""  # domain the metric belongs to
    sov_pct: float = 0.0  # share of voice, in percent
    change_pct: float = 0.0  # percent change vs. the previous period (per API)
@dataclass
class HistoryPoint:
    """Single data point in a time series."""
    date: str = ""  # date or period label ("date" or "period" in the payload)
    value: float = 0.0  # metric value: impressions, mentions, or SOV depending on the series
@dataclass
class CompetitorVisibility:
    """Aggregated AI visibility metrics for a competitor domain."""
    domain: str = ""  # competitor (or target) domain
    impressions: int = 0  # total AI search impressions
    mentions: int = 0  # total AI answer mentions
    sov: float = 0.0  # share of voice, in percent
@dataclass
class AiVisibilityResult:
    """Complete AI visibility tracking result."""
    target: str = ""
    impressions: ImpressionMetrics = field(default_factory=ImpressionMetrics)
    mentions: MentionMetrics = field(default_factory=MentionMetrics)
    share_of_voice: dict = field(default_factory=dict)
    impressions_history: list[HistoryPoint] = field(default_factory=list)
    mentions_history: list[HistoryPoint] = field(default_factory=list)
    sov_history: list[HistoryPoint] = field(default_factory=list)
    competitors: list[CompetitorVisibility] = field(default_factory=list)
    recommendations: list[str] = field(default_factory=list)
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())

    def to_dict(self) -> dict:
        """Convert result to dictionary."""
        def series(records):
            # Serialize a list of dataclass records to plain dicts.
            return [asdict(record) for record in records]

        return {
            "target": self.target,
            "impressions": asdict(self.impressions),
            "mentions": asdict(self.mentions),
            "share_of_voice": self.share_of_voice,
            "impressions_history": series(self.impressions_history),
            "mentions_history": series(self.mentions_history),
            "sov_history": series(self.sov_history),
            "competitors": series(self.competitors),
            "recommendations": self.recommendations,
            "timestamp": self.timestamp,
        }
# ---------------------------------------------------------------------------
# MCP tool caller helper
# ---------------------------------------------------------------------------
def call_mcp_tool(tool_name: str, params: dict) -> dict:
    """
    Call an Ahrefs MCP tool and return the parsed JSON response.

    In Claude Desktop / Claude Code environments the MCP tools are invoked
    directly by the AI agent. This helper exists so that the script can also
    be executed standalone via subprocess for testing purposes.

    Args:
        tool_name: Name of the Ahrefs MCP tool (e.g. "brand-radar-sov-overview").
        params: JSON-serializable parameters forwarded to the tool.

    Returns:
        Parsed JSON payload on success, or an empty dict on any failure
        (CLI missing, spawn error, timeout, non-zero exit, empty or
        invalid output) so callers can treat the result as best-effort.
    """
    # getLogger is idempotent: this is the same logger object as the
    # module-level `logger`, but binding it here keeps the helper
    # self-contained.
    log = logging.getLogger(__name__)
    log.info(f"Calling MCP tool: {tool_name} with params: {params}")
    try:
        cmd = ["claude", "mcp", "call", "ahrefs", tool_name, json.dumps(params)]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
        if result.returncode == 0 and result.stdout.strip():
            return json.loads(result.stdout.strip())
        log.warning(f"MCP tool {tool_name} returned non-zero or empty: {result.stderr}")
        return {}
    except (subprocess.TimeoutExpired, json.JSONDecodeError, OSError) as exc:
        # OSError covers FileNotFoundError (CLI absent) plus permission and
        # other spawn failures, preserving the fail-soft contract on all of
        # them instead of only the missing-binary case.
        log.warning(f"MCP call failed ({exc}). Returning empty dict.")
        return {}
# ---------------------------------------------------------------------------
# AI Visibility Tracker
# ---------------------------------------------------------------------------
class AiVisibilityTracker(BaseAsyncClient):
    """Track brand visibility across AI-generated search results.

    Each fetch method shells out to an Ahrefs Brand Radar MCP tool via
    call_mcp_tool() (run off the event loop with asyncio.to_thread) and
    normalizes the loosely-shaped JSON payload into the dataclasses defined
    above. Trend classification and recommendations are rule-based; the
    recommendation strings are user-facing Korean copy.
    """

    def __init__(self, **kwargs):
        # Forward concurrency / rate-limit settings to BaseAsyncClient.
        super().__init__(**kwargs)
        self.logger = logging.getLogger(self.__class__.__name__)

    # ---- Impressions ----
    async def get_impressions_overview(self, target: str) -> ImpressionMetrics:
        """Fetch current AI impression metrics via brand-radar-impressions-overview.

        Returns:
            Populated ImpressionMetrics, or the dataclass defaults when the
            tool returned nothing.
        """
        self.logger.info(f"Fetching impressions overview for {target}")
        # call_mcp_tool blocks on a subprocess, so run it in a worker thread.
        data = await asyncio.to_thread(
            call_mcp_tool,
            "brand-radar-impressions-overview",
            {"target": target},
        )
        metrics = ImpressionMetrics()
        if not data:
            return metrics
        # Field names differ across payload versions; fall back through aliases.
        metrics.total = data.get("total_impressions", data.get("impressions", 0))
        metrics.change_pct = data.get("change_pct", data.get("change", 0.0))
        metrics.period = data.get("period", "")
        metrics.breakdown = data.get("breakdown", {})
        # Classify the trend: changes within +/-5% count as stable.
        if metrics.change_pct > 5:
            metrics.trend = "increasing"
        elif metrics.change_pct < -5:
            metrics.trend = "decreasing"
        else:
            metrics.trend = "stable"
        return metrics

    # ---- Mentions ----
    async def get_mentions_overview(self, target: str) -> MentionMetrics:
        """Fetch current AI mention metrics via brand-radar-mentions-overview.

        Returns:
            Populated MentionMetrics, or the dataclass defaults when the
            tool returned nothing.
        """
        self.logger.info(f"Fetching mentions overview for {target}")
        data = await asyncio.to_thread(
            call_mcp_tool,
            "brand-radar-mentions-overview",
            {"target": target},
        )
        metrics = MentionMetrics()
        if not data:
            return metrics
        metrics.total = data.get("total_mentions", data.get("mentions", 0))
        metrics.change_pct = data.get("change_pct", data.get("change", 0.0))
        metrics.period = data.get("period", "")
        metrics.breakdown = data.get("breakdown", {})
        # Same +/-5% stability band as impressions.
        if metrics.change_pct > 5:
            metrics.trend = "increasing"
        elif metrics.change_pct < -5:
            metrics.trend = "decreasing"
        else:
            metrics.trend = "stable"
        return metrics

    # ---- Share of Voice ----
    async def get_sov_overview(self, target: str) -> dict:
        """Fetch Share of Voice overview via brand-radar-sov-overview.

        Returns:
            {"brand_sov": float, "competitors": [SovMetric-as-dict, ...]};
            zeroed when the tool returned nothing.
        """
        self.logger.info(f"Fetching SOV overview for {target}")
        data = await asyncio.to_thread(
            call_mcp_tool,
            "brand-radar-sov-overview",
            {"target": target},
        )
        if not data:
            return {"brand_sov": 0.0, "competitors": []}
        brand_sov = data.get("sov", data.get("share_of_voice", 0.0))
        competitors_raw = data.get("competitors", [])
        competitors = []
        for comp in competitors_raw:
            competitors.append(SovMetric(
                domain=comp.get("domain", ""),
                sov_pct=comp.get("sov", comp.get("share_of_voice", 0.0)),
                change_pct=comp.get("change_pct", 0.0),
            ))
        return {
            "brand_sov": brand_sov,
            "competitors": [asdict(c) for c in competitors],
        }

    # ---- History ----
    async def get_impressions_history(self, target: str) -> list[HistoryPoint]:
        """Fetch impressions history via brand-radar-impressions-history."""
        self.logger.info(f"Fetching impressions history for {target}")
        data = await asyncio.to_thread(
            call_mcp_tool,
            "brand-radar-impressions-history",
            {"target": target},
        )
        return self._parse_history(data)

    async def get_mentions_history(self, target: str) -> list[HistoryPoint]:
        """Fetch mentions history via brand-radar-mentions-history."""
        self.logger.info(f"Fetching mentions history for {target}")
        data = await asyncio.to_thread(
            call_mcp_tool,
            "brand-radar-mentions-history",
            {"target": target},
        )
        return self._parse_history(data)

    async def get_sov_history(self, target: str) -> list[HistoryPoint]:
        """Fetch SOV history via brand-radar-sov-history."""
        self.logger.info(f"Fetching SOV history for {target}")
        data = await asyncio.to_thread(
            call_mcp_tool,
            "brand-radar-sov-history",
            {"target": target},
        )
        return self._parse_history(data)

    def _parse_history(self, data: dict | list) -> list[HistoryPoint]:
        """Parse history data from MCP response into HistoryPoint list.

        Accepts either a bare list of points or a dict wrapping the list
        under "history" or "data"; non-dict items are skipped. The value
        field is resolved through the aliases used by the three series
        (value / impressions / mentions / sov).
        """
        points: list[HistoryPoint] = []
        if not data:
            return points
        items = data if isinstance(data, list) else data.get("history", data.get("data", []))
        for item in items:
            if isinstance(item, dict):
                points.append(HistoryPoint(
                    date=item.get("date", item.get("period", "")),
                    value=item.get("value", item.get("impressions", item.get("mentions", item.get("sov", 0.0)))),
                ))
        return points

    # ---- Competitor Comparison ----
    async def compare_competitors(
        self, target: str, competitors: list[str]
    ) -> list[CompetitorVisibility]:
        """Aggregate AI visibility metrics for target and competitors.

        Fetches impressions, mentions, and SOV for each domain sequentially
        (one domain at a time), then sorts by SOV descending, so index 0 is
        the visibility leader.
        """
        self.logger.info(f"Comparing competitors: {competitors}")
        results: list[CompetitorVisibility] = []
        all_domains = [target] + competitors
        for domain in all_domains:
            imp = await self.get_impressions_overview(domain)
            men = await self.get_mentions_overview(domain)
            sov_data = await self.get_sov_overview(domain)
            results.append(CompetitorVisibility(
                domain=domain,
                impressions=imp.total,
                mentions=men.total,
                sov=sov_data.get("brand_sov", 0.0),
            ))
        # Sort by SOV descending
        results.sort(key=lambda x: x.sov, reverse=True)
        return results

    # ---- Trend Calculation ----
    @staticmethod
    def calculate_trends(history: list[HistoryPoint]) -> dict:
        """Determine trend direction and statistics from history data.

        Change is measured first-to-last point; direction buckets are
        +/-10% (strong) and +/-3% (mild), with anything inside +/-3%
        labelled stable. Fewer than two points yields "insufficient_data".
        """
        if not history or len(history) < 2:
            return {
                "direction": "insufficient_data",
                "avg_value": 0.0,
                "min_value": 0.0,
                "max_value": 0.0,
                "change_pct": 0.0,
                "data_points": len(history) if history else 0,
            }
        values = [h.value for h in history]
        first_value = values[0]
        last_value = values[-1]
        avg_value = sum(values) / len(values)
        min_value = min(values)
        max_value = max(values)
        # A zero (or negative) starting value makes a percent change
        # undefined; report 0.0 in that case.
        if first_value > 0:
            change_pct = ((last_value - first_value) / first_value) * 100
        else:
            change_pct = 0.0
        if change_pct > 10:
            direction = "strongly_increasing"
        elif change_pct > 3:
            direction = "increasing"
        elif change_pct < -10:
            direction = "strongly_decreasing"
        elif change_pct < -3:
            direction = "decreasing"
        else:
            direction = "stable"
        return {
            "direction": direction,
            "avg_value": round(avg_value, 2),
            "min_value": round(min_value, 2),
            "max_value": round(max_value, 2),
            "change_pct": round(change_pct, 2),
            "data_points": len(values),
        }

    # ---- Recommendations ----
    @staticmethod
    def generate_recommendations(result: AiVisibilityResult) -> list[str]:
        """Generate actionable recommendations for improving AI visibility.

        Rule-based: inspects impression/mention totals and trends, SOV
        bands (<10%, <25%), and the competitor leaderboard, and always
        appends one general best-practice item. Returned strings are
        user-facing Korean copy.
        """
        recs: list[str] = []
        # Impression-based recommendations
        if result.impressions.total == 0:
            recs.append(
                "AI 검색에서 브랜드 노출이 감지되지 않았습니다. "
                "E-E-A-T 시그널(경험, 전문성, 권위성, 신뢰성)을 강화하여 "
                "AI 엔진이 콘텐츠를 참조할 수 있도록 하세요."
            )
        elif result.impressions.trend == "decreasing":
            recs.append(
                "AI 검색 노출이 감소 추세입니다. 최신 콘텐츠 업데이트 및 "
                "구조화된 데이터(Schema Markup) 추가를 검토하세요."
            )
        elif result.impressions.trend == "increasing":
            recs.append(
                "AI 검색 노출이 증가 추세입니다. 현재 콘텐츠 전략을 "
                "유지하면서 추가 키워드 확장을 고려하세요."
            )
        # Mention-based recommendations
        if result.mentions.total == 0:
            recs.append(
                "AI 응답에서 브랜드 언급이 없습니다. "
                "브랜드명이 포함된 고품질 콘텐츠를 제작하고, "
                "외부 사이트에서의 브랜드 언급(Citations)을 늘리세요."
            )
        elif result.mentions.trend == "decreasing":
            recs.append(
                "AI 응답 내 브랜드 언급이 줄어들고 있습니다. "
                "콘텐츠 신선도(Freshness)와 업계 권위 신호를 점검하세요."
            )
        # SOV recommendations. Defaults to 0.0 (triggering the low-SOV
        # advice) when share_of_voice was never fetched.
        sov_value = result.share_of_voice.get("brand_sov", 0.0)
        if sov_value < 10:
            recs.append(
                f"AI 검색 Share of Voice가 {sov_value}%로 낮습니다. "
                "핵심 키워드에 대한 종합 가이드, FAQ 콘텐츠, "
                "원본 데이터/연구 자료를 발행하여 인용 가능성을 높이세요."
            )
        elif sov_value < 25:
            recs.append(
                f"AI 검색 Share of Voice가 {sov_value}%입니다. "
                "경쟁사 대비 차별화된 전문 콘텐츠와 "
                "독점 데이터 기반 인사이트를 강화하세요."
            )
        # Competitor-based recommendations. result.competitors is sorted by
        # SOV descending, so index 0 is the leader.
        if result.competitors:
            top_competitor = result.competitors[0]
            if top_competitor.domain != result.target and top_competitor.sov > sov_value:
                recs.append(
                    f"최대 경쟁사 {top_competitor.domain}의 SOV가 "
                    f"{top_competitor.sov}%로 앞서고 있습니다. "
                    "해당 경쟁사의 콘텐츠 전략과 인용 패턴을 분석하세요."
                )
        # General best practices (always included)
        recs.append(
            "AI 검색 최적화를 위해 다음 사항을 지속적으로 점검하세요: "
            "(1) 구조화된 데이터(JSON-LD) 적용, "
            "(2) FAQ 및 How-to 콘텐츠 발행, "
            "(3) 신뢰할 수 있는 외부 사이트에서의 백링크 확보, "
            "(4) 콘텐츠 정기 업데이트 및 정확성 검증."
        )
        return recs

    # ---- Main Orchestrator ----
    async def track(
        self,
        target: str,
        competitors: list[str] | None = None,
        include_history: bool = False,
        include_sov: bool = False,
    ) -> AiVisibilityResult:
        """
        Orchestrate full AI visibility tracking.

        Args:
            target: Domain to track
            competitors: Optional list of competitor domains
            include_history: Whether to fetch historical trends
            include_sov: Whether to include SOV analysis

        Returns:
            A populated AiVisibilityResult; sections that were not requested
            keep their dataclass defaults.
        """
        self.logger.info(f"Starting AI visibility tracking for {target}")
        result = AiVisibilityResult(target=target)
        # Core metrics (always fetched)
        result.impressions = await self.get_impressions_overview(target)
        result.mentions = await self.get_mentions_overview(target)
        # Share of Voice — also fetched when competitors are supplied, since
        # the competitor recommendation compares against the target's SOV.
        if include_sov or competitors:
            result.share_of_voice = await self.get_sov_overview(target)
        # History
        if include_history:
            result.impressions_history = await self.get_impressions_history(target)
            result.mentions_history = await self.get_mentions_history(target)
            # SOV history only when SOV analysis was explicitly requested.
            if include_sov:
                result.sov_history = await self.get_sov_history(target)
        # Competitor comparison
        if competitors:
            result.competitors = await self.compare_competitors(target, competitors)
        # Generate recommendations (always, even from defaults)
        result.recommendations = self.generate_recommendations(result)
        # print_stats is inherited from BaseAsyncClient — presumably request
        # statistics; defined outside this file.
        self.print_stats()
        return result
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def build_parser() -> argparse.ArgumentParser:
    """Construct the command-line interface for the AI visibility tracker."""
    parser = argparse.ArgumentParser(
        description="AI Visibility Tracker - Monitor brand visibility in AI search results",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s --target example.com --json
%(prog)s --target example.com --competitor comp1.com --competitor comp2.com --json
%(prog)s --target example.com --history --sov --json
%(prog)s --target example.com --output report.json
""",
    )
    # Declarative flag table keeps the option definitions in one place.
    flag_specs: list[tuple[str, dict]] = [
        (
            "--target",
            {"required": True,
             "help": "Target domain to track (e.g., example.com)"},
        ),
        (
            "--competitor",
            {"action": "append", "default": [],
             "help": "Competitor domain (repeatable). e.g., --competitor a.com --competitor b.com"},
        ),
        (
            "--history",
            {"action": "store_true",
             "help": "Include historical trend data (impressions, mentions, SOV over time)"},
        ),
        (
            "--sov",
            {"action": "store_true",
             "help": "Include Share of Voice analysis"},
        ),
        (
            "--json",
            {"action": "store_true",
             "help": "Output result as JSON to stdout"},
        ),
        (
            "--output",
            {"type": str, "default": None,
             "help": "Save JSON output to file path"},
        ),
    ]
    for flag, options in flag_specs:
        parser.add_argument(flag, **options)
    return parser
def print_summary(result: AiVisibilityResult) -> None:
    """Print a human-readable summary of AI visibility results."""
    divider = "=" * 60
    # Accumulate all report lines, then emit them in a single print call;
    # the joined output is character-identical to line-by-line printing.
    out: list[str] = []
    out.append("\n" + divider)
    out.append(f" AI Visibility Report: {result.target}")
    out.append(divider)
    out.append(f"\n Impressions: {result.impressions.total:,}")
    out.append(f" Trend: {result.impressions.trend} ({result.impressions.change_pct:+.1f}%)")
    out.append(f"\n Mentions: {result.mentions.total:,}")
    out.append(f" Trend: {result.mentions.trend} ({result.mentions.change_pct:+.1f}%)")
    if result.share_of_voice:
        brand_sov = result.share_of_voice.get("brand_sov", 0.0)
        out.append(f"\n Share of Voice: {brand_sov:.1f}%")
        rivals = result.share_of_voice.get("competitors", [])
        if rivals:
            out.append(" Competitors:")
            for rival in rivals:
                out.append(f" {rival.get('domain', '?')}: {rival.get('sov_pct', 0):.1f}%")
    if result.impressions_history:
        trend_info = AiVisibilityTracker.calculate_trends(result.impressions_history)
        out.append(f"\n Impressions Trend: {trend_info['direction']}")
        out.append(f" Range: {trend_info['min_value']:,.0f} - {trend_info['max_value']:,.0f}")
        out.append(f" Change: {trend_info['change_pct']:+.1f}%")
    if result.competitors:
        out.append("\n Competitor Comparison:")
        for cv in result.competitors:
            if cv.domain == result.target:
                marker = " <-- target"
            else:
                marker = ""
            out.append(f" {cv.domain}: SOV={cv.sov:.1f}%, Imp={cv.impressions:,}, Men={cv.mentions:,}{marker}")
    if result.recommendations:
        out.append("\n Recommendations:")
        for idx, rec in enumerate(result.recommendations, 1):
            out.append(f" {idx}. {rec}")
    out.append("\n" + divider)
    out.append(f" Generated: {result.timestamp}")
    out.append(divider + "\n")
    print("\n".join(out))
async def main() -> None:
    """CLI entry point: parse arguments, run tracking, emit the report."""
    args = build_parser().parse_args()
    tracker = AiVisibilityTracker(
        max_concurrent=5,
        requests_per_second=2.0,
    )
    result = await tracker.track(
        target=args.target,
        competitors=args.competitor or None,
        include_history=args.history,
        include_sov=args.sov,
    )
    # Human-readable summary unless machine output was requested.
    if not (args.json or args.output):
        print_summary(result)
        return
    payload = json.dumps(result.to_dict(), ensure_ascii=False, indent=2)
    if args.json:
        print(payload)
    if args.output:
        dest = Path(args.output)
        dest.parent.mkdir(parents=True, exist_ok=True)
        dest.write_text(payload, encoding="utf-8")
        logger.info(f"Report saved to {args.output}")
# Script entry point: run the async CLI under a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -0,0 +1,207 @@
"""
Base Client - Shared async client utilities
===========================================
Purpose: Rate-limited async operations for API clients
Python: 3.10+
"""
import asyncio
import logging
import os
from asyncio import Semaphore
from datetime import datetime
from typing import Any, Callable, TypeVar
from dotenv import load_dotenv
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
)
# Load environment variables from a .env file, if present.
load_dotenv()

# Logging setup: module-wide default handler/format at INFO level.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)

# NOTE(review): T appears unused in this module — confirm whether a typed
# helper was planned before removing it.
T = TypeVar("T")
class RateLimiter:
"""Rate limiter using token bucket algorithm."""
def __init__(self, rate: float, per: float = 1.0):
"""
Initialize rate limiter.
Args:
rate: Number of requests allowed
per: Time period in seconds (default: 1 second)
"""
self.rate = rate
self.per = per
self.tokens = rate
self.last_update = datetime.now()
self._lock = asyncio.Lock()
async def acquire(self) -> None:
"""Acquire a token, waiting if necessary."""
async with self._lock:
now = datetime.now()
elapsed = (now - self.last_update).total_seconds()
self.tokens = min(self.rate, self.tokens + elapsed * (self.rate / self.per))
self.last_update = now
if self.tokens < 1:
wait_time = (1 - self.tokens) * (self.per / self.rate)
await asyncio.sleep(wait_time)
self.tokens = 0
else:
self.tokens -= 1
class BaseAsyncClient:
    """Base class for async API clients with rate limiting.

    Combines a concurrency cap (semaphore), a token-bucket rate limiter,
    and tenacity-based retries, while keeping aggregate request counters.
    """

    def __init__(
        self,
        max_concurrent: int = 5,
        requests_per_second: float = 3.0,
        logger: logging.Logger | None = None,
    ):
        """
        Initialize base client.

        Args:
            max_concurrent: Maximum concurrent requests
            requests_per_second: Rate limit
            logger: Logger instance
        """
        # Semaphore bounds in-flight requests; the RateLimiter additionally
        # spaces request starts over time.
        self.semaphore = Semaphore(max_concurrent)
        self.rate_limiter = RateLimiter(requests_per_second)
        self.logger = logger or logging.getLogger(self.__class__.__name__)
        # Aggregate counters across the client's lifetime.
        # NOTE(review): "retries" is initialized but never incremented in
        # this class — confirm whether a tenacity before_sleep hook was
        # intended to update it.
        self.stats = {
            "requests": 0,
            "success": 0,
            "errors": 0,
            "retries": 0,
        }

    # Retries up to 3 attempts on *any* Exception with exponential backoff
    # (2s..10s). Each retry attempt re-acquires the semaphore and a rate
    # token, and bumps "requests"/"errors" again — the counters therefore
    # count attempts, not logical requests.
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type(Exception),
    )
    async def _rate_limited_request(
        self,
        coro: Callable[[], Any],
    ) -> Any:
        """Execute a request with rate limiting and retry.

        Args:
            coro: Zero-argument callable returning an awaitable.

        Raises:
            Exception: re-raised after logging once all retries are exhausted.
        """
        async with self.semaphore:
            await self.rate_limiter.acquire()
            self.stats["requests"] += 1
            try:
                result = await coro()
                self.stats["success"] += 1
                return result
            except Exception as e:
                self.stats["errors"] += 1
                self.logger.error(f"Request failed: {e}")
                raise

    async def batch_requests(
        self,
        requests: list[Callable[[], Any]],
        desc: str = "Processing",
    ) -> list[Any]:
        """Execute multiple requests concurrently.

        Failures that survive the retry policy are converted into
        ``{"error": str(e)}`` dicts rather than raised.

        NOTE(review): with tqdm installed, results are returned in
        *completion* order; the gather fallback preserves *submission*
        order — confirm callers don't rely on ordering.
        """
        # tqdm is optional; degrade gracefully to a plain gather.
        try:
            from tqdm.asyncio import tqdm
            has_tqdm = True
        except ImportError:
            has_tqdm = False

        async def execute(req: Callable) -> Any:
            try:
                return await self._rate_limited_request(req)
            except Exception as e:
                return {"error": str(e)}

        tasks = [execute(req) for req in requests]
        if has_tqdm:
            results = []
            for coro in tqdm.as_completed(tasks, total=len(tasks), desc=desc):
                result = await coro
                results.append(result)
            return results
        else:
            # return_exceptions=True is belt-and-braces: execute() already
            # converts exceptions into error dicts.
            return await asyncio.gather(*tasks, return_exceptions=True)

    def print_stats(self) -> None:
        """Log request statistics accumulated so far."""
        self.logger.info("=" * 40)
        self.logger.info("Request Statistics:")
        self.logger.info(f"  Total Requests: {self.stats['requests']}")
        self.logger.info(f"  Successful: {self.stats['success']}")
        self.logger.info(f"  Errors: {self.stats['errors']}")
        self.logger.info("=" * 40)
class ConfigManager:
"""Manage API configuration and credentials."""
def __init__(self):
load_dotenv()
@property
def google_credentials_path(self) -> str | None:
"""Get Google service account credentials path."""
# Prefer SEO-specific credentials, fallback to general credentials
seo_creds = os.path.expanduser("~/.credential/ourdigital-seo-agent.json")
if os.path.exists(seo_creds):
return seo_creds
return os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
@property
def pagespeed_api_key(self) -> str | None:
"""Get PageSpeed Insights API key."""
return os.getenv("PAGESPEED_API_KEY")
@property
def custom_search_api_key(self) -> str | None:
"""Get Custom Search API key."""
return os.getenv("CUSTOM_SEARCH_API_KEY")
@property
def custom_search_engine_id(self) -> str | None:
"""Get Custom Search Engine ID."""
return os.getenv("CUSTOM_SEARCH_ENGINE_ID")
@property
def notion_token(self) -> str | None:
"""Get Notion API token."""
return os.getenv("NOTION_TOKEN") or os.getenv("NOTION_API_KEY")
def validate_google_credentials(self) -> bool:
"""Validate Google credentials are configured."""
creds_path = self.google_credentials_path
if not creds_path:
return False
return os.path.exists(creds_path)
def get_required(self, key: str) -> str:
"""Get required environment variable or raise error."""
value = os.getenv(key)
if not value:
raise ValueError(f"Missing required environment variable: {key}")
return value
# Singleton config instance shared by every client module that imports
# base_client; avoids re-reading the environment per client.
config = ConfigManager()

View File

@@ -0,0 +1,8 @@
# 27-seo-ai-visibility dependencies
requests>=2.31.0
aiohttp>=3.9.0
pandas>=2.1.0
tenacity>=8.2.0
tqdm>=4.66.0
python-dotenv>=1.0.0
rich>=13.7.0