""" SERP Analyzer - Google SERP feature detection and competitor mapping ==================================================================== Purpose: Analyze Google SERP features, map competitor positions, classify content types, and score SERP opportunities. Python: 3.10+ Usage: python serp_analyzer.py --keyword "치과 임플란트" --country kr --json python serp_analyzer.py --keywords-file keywords.txt --country kr --json python serp_analyzer.py --keyword "dental implant" --output serp_report.json """ import argparse import json import logging import re import subprocess import sys from dataclasses import asdict, dataclass, field from datetime import datetime from pathlib import Path from typing import Any from urllib.parse import urlparse from rich.console import Console from rich.table import Table # --------------------------------------------------------------------------- # Logging # --------------------------------------------------------------------------- logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", ) logger = logging.getLogger(__name__) console = Console() # --------------------------------------------------------------------------- # Data Classes # --------------------------------------------------------------------------- @dataclass class SerpFeatures: """Tracks presence and count of Google SERP features.""" featured_snippet: bool = False people_also_ask: bool = False local_pack: bool = False knowledge_panel: bool = False video_carousel: bool = False image_pack: bool = False site_links: bool = False ads_top: int = 0 ads_bottom: int = 0 shopping: bool = False @property def feature_count(self) -> int: """Count of boolean features that are present.""" count = 0 for f in [ self.featured_snippet, self.people_also_ask, self.local_pack, self.knowledge_panel, self.video_carousel, self.image_pack, self.site_links, self.shopping, ]: if f: count += 1 return count @property def has_ads(self) -> bool: return self.ads_top > 0 or self.ads_bottom > 0 @dataclass class CompetitorPosition: """A single competitor entry in the SERP.""" position: int url: str domain: str title: str = "" content_type: str = "unknown" is_featured: bool = False has_sitelinks: bool = False estimated_traffic_share: float = 0.0 @dataclass class SerpResult: """Complete SERP analysis result for a keyword.""" keyword: str country: str = "us" search_volume: int = 0 keyword_difficulty: float = 0.0 cpc: float = 0.0 serp_features: SerpFeatures = field(default_factory=SerpFeatures) competitors: list[CompetitorPosition] = field(default_factory=list) opportunity_score: int = 0 intent_signals: str = "informational" content_type_distribution: dict[str, int] = field(default_factory=dict) volatility: str = "stable" timestamp: str = "" def __post_init__(self): if not self.timestamp: self.timestamp = datetime.now().isoformat() # --------------------------------------------------------------------------- # Content Type Classifiers # --------------------------------------------------------------------------- # URL path patterns that hint at content type URL_CONTENT_PATTERNS: dict[str, list[str]] = { "blog": [ r"/blog/", r"/post/", r"/article/", r"/news/", r"/magazine/", r"/journal/", r"/column/", r"/story/", r"\d{4}/\d{2}/", ], "product": [ r"/product/", r"/item/", r"/shop/", r"/store/", r"/buy/", r"/p/", r"/goods/", r"/catalog/", ], "service": [ r"/service", r"/solution", r"/treatment", r"/procedure", r"/pricing", r"/consultation", ], "news": [ r"/news/", r"/press/", r"/media/", r"/release/", r"news\.", r"press\.", ], "video": [ r"youtube\.com/watch", r"youtu\.be/", r"vimeo\.com/", r"/video/", r"/watch/", ], "forum": [ r"/forum/", r"/community/", r"/discuss", r"/thread/", r"/question/", r"/answers/", ], "wiki": [ r"wikipedia\.org", r"/wiki/", r"namu\.wiki", ], } # Title keywords that hint at content type TITLE_CONTENT_PATTERNS: dict[str, list[str]] = { "blog": ["블로그", "후기", "리뷰", "review", "guide", "가이드", "팁", "tips"], "product": ["구매", "가격", "buy", "price", "shop", "할인", "sale", "최저가"], "service": ["상담", "치료", "진료", "병원", "클리닉", "clinic", "treatment"], "news": ["뉴스", "속보", "보도", "news", "기사", "report"], "video": ["영상", "동영상", "video", "youtube"], "comparison": ["비교", "vs", "versus", "compare", "차이", "best"], } # CTR distribution by position (approximate click-through rates) CTR_BY_POSITION: dict[int, float] = { 1: 0.316, 2: 0.158, 3: 0.110, 4: 0.080, 5: 0.062, 6: 0.049, 7: 0.040, 8: 0.034, 9: 0.029, 10: 0.025, } # --------------------------------------------------------------------------- # SERP Analyzer # --------------------------------------------------------------------------- class SerpAnalyzer: """Analyzes Google SERP features, competitor positions, and opportunities.""" def __init__(self): self.logger = logging.getLogger(self.__class__.__name__) # ----- Data Fetching ----- def get_serp_data(self, keyword: str, country: str = "us") -> dict[str, Any]: """ Fetch SERP data via Ahrefs serp-overview MCP tool. Uses subprocess to invoke the Ahrefs MCP tool. Falls back to a structured placeholder when the MCP tool is unavailable (e.g., in standalone / CI environments). """ self.logger.info(f"Fetching SERP data for '{keyword}' (country={country})") try: # Attempt MCP tool call via subprocess cmd = [ "claude", "mcp", "call", "ahrefs", "serp-overview", json.dumps({"keyword": keyword, "country": country}), ] result = subprocess.run( cmd, capture_output=True, text=True, timeout=60, ) if result.returncode == 0 and result.stdout.strip(): data = json.loads(result.stdout) self.logger.info("Successfully fetched SERP data via MCP") return data except (subprocess.TimeoutExpired, FileNotFoundError, json.JSONDecodeError) as exc: self.logger.warning(f"MCP call unavailable ({exc}), using keyword metrics fallback") # Fallback: try Ahrefs keywords-explorer-overview try: cmd_kw = [ "claude", "mcp", "call", "ahrefs", "keywords-explorer-overview", json.dumps({"keyword": keyword, "country": country}), ] result_kw = subprocess.run( cmd_kw, capture_output=True, text=True, timeout=60, ) if result_kw.returncode == 0 and result_kw.stdout.strip(): data = json.loads(result_kw.stdout) self.logger.info("Fetched keyword overview via MCP") return data except (subprocess.TimeoutExpired, FileNotFoundError, json.JSONDecodeError) as exc: self.logger.warning(f"Keywords-explorer MCP also unavailable ({exc})") # Return empty structure when no MCP tools available self.logger.warning( "No MCP data source available. Run inside Claude Desktop " "or provide data via --input flag." ) return { "keyword": keyword, "country": country, "serp": [], "serp_features": {}, "metrics": {}, } # ----- Feature Detection ----- def detect_features(self, serp_data: dict[str, Any]) -> SerpFeatures: """ Identify SERP features from Ahrefs response data. Handles both the structured 'serp_features' dict returned by keywords-explorer-overview and the raw SERP items list from serp-overview. """ features = SerpFeatures() # -- Method 1: structured serp_features from Ahrefs -- sf = serp_data.get("serp_features", {}) if isinstance(sf, dict): features.featured_snippet = sf.get("featured_snippet", False) features.people_also_ask = sf.get("people_also_ask", False) features.local_pack = sf.get("local_pack", False) features.knowledge_panel = sf.get("knowledge_panel", False) or sf.get( "knowledge_graph", False ) features.video_carousel = sf.get("video", False) or sf.get( "video_carousel", False ) features.image_pack = sf.get("image_pack", False) or sf.get( "images", False ) features.site_links = sf.get("sitelinks", False) or sf.get( "site_links", False ) features.shopping = sf.get("shopping_results", False) or sf.get( "shopping", False ) features.ads_top = int(sf.get("ads_top", 0) or 0) features.ads_bottom = int(sf.get("ads_bottom", 0) or 0) # -- Method 2: infer from raw SERP items list -- serp_items = serp_data.get("serp", []) if isinstance(serp_items, list): for item in serp_items: item_type = str(item.get("type", "")).lower() if "featured_snippet" in item_type or item.get("is_featured"): features.featured_snippet = True if "people_also_ask" in item_type or "paa" in item_type: features.people_also_ask = True if "local" in item_type or "map" in item_type: features.local_pack = True if "knowledge" in item_type: features.knowledge_panel = True if "video" in item_type: features.video_carousel = True if "image" in item_type: features.image_pack = True if item.get("sitelinks"): features.site_links = True if "shopping" in item_type: features.shopping = True if "ad" in item_type: pos = item.get("position", 0) if pos <= 4: features.ads_top += 1 else: features.ads_bottom += 1 return features # ----- Competitor Mapping ----- def map_competitors(self, serp_data: dict[str, Any]) -> list[CompetitorPosition]: """Extract competitor positions and domains from SERP data.""" competitors: list[CompetitorPosition] = [] serp_items = serp_data.get("serp", []) if not isinstance(serp_items, list): return competitors for item in serp_items: url = item.get("url", "") if not url: continue # Skip ads for organic mapping item_type = str(item.get("type", "")).lower() if "ad" in item_type: continue parsed = urlparse(url) domain = parsed.netloc.replace("www.", "") position = int(item.get("position", len(competitors) + 1)) title = item.get("title", "") content_type = self.classify_content_type(item) traffic_share = CTR_BY_POSITION.get(position, 0.01) comp = CompetitorPosition( position=position, url=url, domain=domain, title=title, content_type=content_type, is_featured=bool(item.get("is_featured")), has_sitelinks=bool(item.get("sitelinks")), estimated_traffic_share=round(traffic_share, 4), ) competitors.append(comp) # Sort by position competitors.sort(key=lambda c: c.position) return competitors # ----- Content Type Classification ----- def classify_content_type(self, result: dict[str, Any]) -> str: """ Classify a SERP result as blog/product/service/news/video/forum/wiki based on URL patterns and title keywords. """ url = result.get("url", "").lower() title = result.get("title", "").lower() scores: dict[str, int] = {} # Score from URL patterns for ctype, patterns in URL_CONTENT_PATTERNS.items(): for pattern in patterns: if re.search(pattern, url): scores[ctype] = scores.get(ctype, 0) + 2 break # Score from title patterns for ctype, keywords in TITLE_CONTENT_PATTERNS.items(): for kw in keywords: if kw.lower() in title: scores[ctype] = scores.get(ctype, 0) + 1 if not scores: # Heuristic: if domain is a known authority site parsed = urlparse(url) domain = parsed.netloc.lower() if any(d in domain for d in ["wikipedia", "namu.wiki", "나무위키"]): return "wiki" if any(d in domain for d in ["youtube", "vimeo"]): return "video" if any(d in domain for d in ["naver.com", "tistory.com", "brunch.co.kr"]): return "blog" return "service_page" # Return highest scoring type return max(scores, key=scores.get) # type: ignore[arg-type] # ----- Opportunity Scoring ----- def calculate_opportunity_score( self, features: SerpFeatures, positions: list[CompetitorPosition], ) -> int: """ Score SERP opportunity from 0-100. Higher scores indicate better opportunity to rank or gain features. Factors (additive): - Featured snippet available but could be captured +15 - PAA present (related question opportunity) +10 - No knowledge panel (less SERP real-estate taken) +10 - Low ad count (more organic visibility) +10 - Few sitelinks in top results +5 - Content diversity (various domains in top 10) +10 - No video carousel (opportunity to add video) +5 - Top results are blogs (easier to outrank) +10 - Image pack absent (image SEO opportunity) +5 - Shopping absent for commercial keywords +5 - Top positions lacking schema/rich results +5 Penalty factors (subtractive): - Knowledge panel dominates -15 - Heavy ad presence (4+ top ads) -10 - Single domain dominates top 5 -10 """ score = 50 # Base score # -- Positive signals -- if features.featured_snippet: score += 15 if features.people_also_ask: score += 10 if not features.knowledge_panel: score += 10 if features.ads_top <= 1: score += 10 elif features.ads_top <= 2: score += 5 if not features.video_carousel: score += 5 if not features.image_pack: score += 5 if not features.shopping: score += 5 # Domain diversity in top 10 if positions: top10_domains = {p.domain for p in positions[:10]} if len(top10_domains) >= 8: score += 10 elif len(top10_domains) >= 5: score += 5 # Blog-heavy top results (easier to compete) blog_count = sum( 1 for p in positions[:5] if p.content_type == "blog" ) if blog_count >= 3: score += 10 elif blog_count >= 2: score += 5 # Sitelinks reduce available space sitelink_count = sum(1 for p in positions[:5] if p.has_sitelinks) if sitelink_count <= 1: score += 5 # Single domain dominance penalty domain_counts: dict[str, int] = {} for p in positions[:5]: domain_counts[p.domain] = domain_counts.get(p.domain, 0) + 1 if any(c >= 3 for c in domain_counts.values()): score -= 10 # -- Negative signals -- if features.knowledge_panel: score -= 15 if features.ads_top >= 4: score -= 10 elif features.ads_top >= 3: score -= 5 # Clamp to 0-100 return max(0, min(100, score)) # ----- Intent Validation ----- def validate_intent( self, features: SerpFeatures, positions: list[CompetitorPosition], ) -> str: """ Infer search intent from SERP composition. Returns one of: informational, navigational, commercial, transactional, local """ signals: dict[str, int] = { "informational": 0, "navigational": 0, "commercial": 0, "transactional": 0, "local": 0, } # Feature-based signals if features.featured_snippet: signals["informational"] += 3 if features.people_also_ask: signals["informational"] += 2 if features.knowledge_panel: signals["informational"] += 2 signals["navigational"] += 2 if features.local_pack: signals["local"] += 5 if features.shopping: signals["transactional"] += 4 if features.has_ads: signals["commercial"] += 2 signals["transactional"] += 1 if features.ads_top >= 3: signals["transactional"] += 2 if features.image_pack: signals["informational"] += 1 if features.video_carousel: signals["informational"] += 1 # Content type signals from top results for pos in positions[:10]: ct = pos.content_type if ct == "blog": signals["informational"] += 1 elif ct == "product": signals["transactional"] += 2 elif ct == "service": signals["commercial"] += 1 elif ct == "news": signals["informational"] += 1 elif ct == "video": signals["informational"] += 1 elif ct == "wiki": signals["informational"] += 2 elif ct == "forum": signals["informational"] += 1 elif ct == "comparison": signals["commercial"] += 2 # Navigational: single domain dominates top 3 if positions: top3_domains = [p.domain for p in positions[:3]] if len(set(top3_domains)) == 1: signals["navigational"] += 5 # Return highest signal return max(signals, key=signals.get) # type: ignore[arg-type] # ----- Content Type Distribution ----- def _content_type_distribution( self, positions: list[CompetitorPosition] ) -> dict[str, int]: """Count content types across top organic results.""" dist: dict[str, int] = {} for p in positions[:10]: dist[p.content_type] = dist.get(p.content_type, 0) + 1 return dict(sorted(dist.items(), key=lambda x: x[1], reverse=True)) # ----- Volatility Assessment ----- def _assess_volatility(self, serp_data: dict[str, Any]) -> str: """ Assess SERP volatility based on available signals. Returns: stable, moderate, volatile """ # Check if Ahrefs provides a volatility/movement score metrics = serp_data.get("metrics", {}) if isinstance(metrics, dict): volatility_score = metrics.get("serp_volatility", None) if volatility_score is not None: if volatility_score < 3: return "stable" elif volatility_score < 7: return "moderate" else: return "volatile" # Heuristic: if many results have recent dates, SERP is more volatile serp_items = serp_data.get("serp", []) if isinstance(serp_items, list) and serp_items: recent_count = 0 for item in serp_items[:10]: last_seen = item.get("last_seen", "") if last_seen: try: dt = datetime.fromisoformat(last_seen.replace("Z", "+00:00")) if (datetime.now(dt.tzinfo) - dt).days < 30: recent_count += 1 except (ValueError, TypeError): pass if recent_count >= 5: return "volatile" elif recent_count >= 3: return "moderate" return "stable" # ----- Main Analysis Orchestrator ----- def analyze(self, keyword: str, country: str = "us") -> SerpResult: """ Orchestrate full SERP analysis for a single keyword. Steps: 1. Fetch SERP data from Ahrefs MCP 2. Detect SERP features 3. Map competitor positions 4. Classify content types 5. Calculate opportunity score 6. Validate search intent 7. Assess volatility """ serp_data = self.get_serp_data(keyword, country) features = self.detect_features(serp_data) positions = self.map_competitors(serp_data) opportunity = self.calculate_opportunity_score(features, positions) intent = self.validate_intent(features, positions) content_dist = self._content_type_distribution(positions) volatility = self._assess_volatility(serp_data) # Extract keyword metrics if available metrics = serp_data.get("metrics", {}) search_volume = int(metrics.get("search_volume", 0) or 0) keyword_difficulty = float(metrics.get("keyword_difficulty", 0) or 0) cpc = float(metrics.get("cpc", 0) or 0) result = SerpResult( keyword=keyword, country=country, search_volume=search_volume, keyword_difficulty=keyword_difficulty, cpc=cpc, serp_features=features, competitors=positions, opportunity_score=opportunity, intent_signals=intent, content_type_distribution=content_dist, volatility=volatility, ) return result # --------------------------------------------------------------------------- # Output Helpers # --------------------------------------------------------------------------- def result_to_dict(result: SerpResult) -> dict[str, Any]: """Convert SerpResult to a JSON-serializable dictionary.""" d = asdict(result) return d def print_rich_report(result: SerpResult) -> None: """Print a human-readable report using rich.""" console.rule(f"[bold blue]SERP Analysis: {result.keyword}") console.print(f"[dim]Country: {result.country} | Timestamp: {result.timestamp}[/dim]") console.print() # Metrics if result.search_volume or result.keyword_difficulty: metrics_table = Table(title="Keyword Metrics", show_lines=True) metrics_table.add_column("Metric", style="cyan") metrics_table.add_column("Value", style="green") metrics_table.add_row("Search Volume", f"{result.search_volume:,}") metrics_table.add_row("Keyword Difficulty", f"{result.keyword_difficulty:.1f}") metrics_table.add_row("CPC", f"${result.cpc:.2f}") console.print(metrics_table) console.print() # SERP Features feat = result.serp_features feat_table = Table(title="SERP Features", show_lines=True) feat_table.add_column("Feature", style="cyan") feat_table.add_column("Present", style="green") feat_table.add_row("Featured Snippet", _bool_icon(feat.featured_snippet)) feat_table.add_row("People Also Ask", _bool_icon(feat.people_also_ask)) feat_table.add_row("Local Pack", _bool_icon(feat.local_pack)) feat_table.add_row("Knowledge Panel", _bool_icon(feat.knowledge_panel)) feat_table.add_row("Video Carousel", _bool_icon(feat.video_carousel)) feat_table.add_row("Image Pack", _bool_icon(feat.image_pack)) feat_table.add_row("Site Links", _bool_icon(feat.site_links)) feat_table.add_row("Shopping", _bool_icon(feat.shopping)) feat_table.add_row("Ads (top)", str(feat.ads_top)) feat_table.add_row("Ads (bottom)", str(feat.ads_bottom)) console.print(feat_table) console.print() # Competitors if result.competitors: comp_table = Table(title="Top Competitors", show_lines=True) comp_table.add_column("#", style="bold") comp_table.add_column("Domain", style="cyan") comp_table.add_column("Type", style="magenta") comp_table.add_column("CTR Share", style="green") comp_table.add_column("Featured", style="yellow") for c in result.competitors[:10]: comp_table.add_row( str(c.position), c.domain, c.content_type, f"{c.estimated_traffic_share:.1%}", _bool_icon(c.is_featured), ) console.print(comp_table) console.print() # Content Distribution if result.content_type_distribution: dist_table = Table(title="Content Type Distribution (Top 10)", show_lines=True) dist_table.add_column("Content Type", style="cyan") dist_table.add_column("Count", style="green") for ct, count in result.content_type_distribution.items(): dist_table.add_row(ct, str(count)) console.print(dist_table) console.print() # Summary opp_color = "green" if result.opportunity_score >= 60 else ( "yellow" if result.opportunity_score >= 40 else "red" ) console.print(f"Opportunity Score: [{opp_color}]{result.opportunity_score}/100[/{opp_color}]") console.print(f"Search Intent: [bold]{result.intent_signals}[/bold]") console.print(f"SERP Volatility: [bold]{result.volatility}[/bold]") console.rule() def _bool_icon(val: bool) -> str: """Return Yes/No string for boolean values.""" return "Yes" if val else "No" # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( description="Google SERP feature detection and competitor mapping", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python serp_analyzer.py --keyword "치과 임플란트" --country kr --json python serp_analyzer.py --keywords-file keywords.txt --country kr --output report.json """, ) group = parser.add_mutually_exclusive_group(required=True) group.add_argument( "--keyword", type=str, help="Single keyword to analyze", ) group.add_argument( "--keywords-file", type=str, help="Path to file with one keyword per line", ) parser.add_argument( "--country", type=str, default="us", help="Country code for SERP (default: us)", ) parser.add_argument( "--json", action="store_true", dest="json_output", help="Output results as JSON", ) parser.add_argument( "--output", type=str, help="Write JSON results to file", ) return parser def load_keywords(filepath: str) -> list[str]: """Load keywords from a text file, one per line.""" path = Path(filepath) if not path.exists(): logger.error(f"Keywords file not found: {filepath}") sys.exit(1) keywords = [] with open(path, "r", encoding="utf-8") as fh: for line in fh: kw = line.strip() if kw and not kw.startswith("#"): keywords.append(kw) logger.info(f"Loaded {len(keywords)} keywords from {filepath}") return keywords def main() -> None: parser = build_parser() args = parser.parse_args() analyzer = SerpAnalyzer() # Collect keywords if args.keyword: keywords = [args.keyword] else: keywords = load_keywords(args.keywords_file) if not keywords: logger.error("No keywords to analyze") sys.exit(1) results: list[dict[str, Any]] = [] for kw in keywords: console.print(f"\n[bold]Analyzing:[/bold] {kw}") result = analyzer.analyze(kw, args.country) if args.json_output or args.output: results.append(result_to_dict(result)) else: print_rich_report(result) # JSON output if args.json_output: output_data = results[0] if len(results) == 1 else results print(json.dumps(output_data, ensure_ascii=False, indent=2)) if args.output: output_data = results[0] if len(results) == 1 else results output_path = Path(args.output) with open(output_path, "w", encoding="utf-8") as fh: json.dump(output_data, fh, ensure_ascii=False, indent=2) logger.info(f"Results written to {output_path}") if __name__ == "__main__": main()