Files
Andrew Yim d2d0a2d460 Add SEO skills 33-34 and fix bugs in skills 19-34
New skills:
- Skill 33: Site migration planner with redirect mapping and monitoring
- Skill 34: Reporting dashboard with HTML charts and Korean executive reports

Bug fixes (Skill 34 - report_aggregator.py):
- Add audit_type fallback for skill identification (was only using audit_id prefix)
- Extract health scores from nested data dict (technical_score, onpage_score, etc.)
- Support subdomain matching in domain filter (blog.ourdigital.org matches ourdigital.org)
- Skip self-referencing DASH- aggregated reports

Bug fixes (Skill 20 - naver_serp_analyzer.py):
- Remove VIEW tab selectors (removed by Naver in 2026)
- Add new section detectors: books (도서), shortform (숏폼), influencer (인플루언서)

Improvements (Skill 34 - dashboard/executive report):
- Add Korean category labels for Chart.js charts (기술 SEO, 온페이지, etc.)
- Add Korean trend labels (개선 중 ↑, 안정 →, 하락 중 ↓)
- Add English→Korean issue description translation layer (20 common patterns)

Documentation improvements:
- Add Korean triggers to 4 skill descriptions (19, 25, 28, 31)
- Expand Skill 32 SKILL.md from 40→143 lines (was 6/10, added workflow, output format, limitations)
- Add output format examples to Skills 27 and 28 SKILL.md
- Add limitations sections to Skills 27 and 28
- Update README.md, CLAUDE.md, AGENTS.md for skills 33-34

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 00:01:00 +09:00

910 lines
34 KiB
Python

"""
Migration Monitor - Post-Migration Traffic & Indexation Monitoring
==================================================================
Purpose: Post-migration traffic comparison, redirect health checks,
indexation tracking, ranking change monitoring, and alert generation.
Python: 3.10+
Usage:
python migration_monitor.py --domain https://new-example.com --migration-date 2025-01-15 --baseline baseline.json --json
python migration_monitor.py --domain https://new-example.com --migration-date 2025-01-15 --json
"""
import argparse
import asyncio
import json
import logging
import sys
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from typing import Any
from urllib.parse import urlparse
from base_client import BaseAsyncClient, config
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
@dataclass
class TrafficComparison:
    """Traffic comparison between pre- and post-migration periods."""
    page_group: str = ""  # "Overall" for the site-wide row, otherwise a page URL
    pre_traffic: int = 0  # average organic traffic over the pre-migration window
    post_traffic: int = 0  # average organic traffic since the migration date
    change_pct: float = 0.0  # relative change in percent
    change_absolute: int = 0  # post_traffic - pre_traffic
    status: str = "stable"  # improved / stable / declined / critical
@dataclass
class RedirectHealth:
    """Health status of a single redirect."""
    source: str = ""  # URL that should redirect
    target: str = ""  # expected redirect destination
    status_code: int = 0  # HTTP status observed when fetching source (0 = unknown)
    chain_length: int = 0  # number of hops in the redirect chain (>1 = chained)
    is_broken: bool = False  # 4xx/5xx, no response, or wrong final destination
    final_url: str = ""  # URL actually reached after following redirects
    error: str = ""  # human-readable mismatch description when broken
@dataclass
class IndexationStatus:
    """Indexation comparison before and after migration."""
    pre_count: int = 0  # indexed-page count from the baseline snapshot
    post_count: int = 0  # current indexed-page count
    change_pct: float = 0.0  # relative change in percent
    missing_pages: list[str] = field(default_factory=list)  # baseline URLs no longer seen (capped)
    new_pages: list[str] = field(default_factory=list)  # current URLs absent from the baseline
    deindexed_count: int = 0  # full (uncapped) count of missing baseline URLs
@dataclass
class RankingChange:
    """Ranking change for a keyword."""
    keyword: str = ""  # tracked search keyword
    pre_position: int = 0  # SERP position before migration
    post_position: int = 0  # current SERP position
    change: int = 0  # pre_position - post_position; positive = improved
    url: str = ""  # ranking URL
    search_volume: int = 0  # monthly search volume for the keyword
@dataclass
class MigrationAlert:
    """Alert for significant post-migration issues."""
    alert_type: str = ""  # traffic_drop, redirect_broken, indexation_drop, ranking_loss
    severity: str = "info"  # info / warning / critical
    message: str = ""  # Korean, human-readable alert text
    metric_value: float = 0.0  # observed value that triggered the alert
    threshold: float = 0.0  # threshold that was crossed
    affected_urls: list[str] = field(default_factory=list)  # sample of impacted URLs (capped)
@dataclass
class MigrationReport:
    """Complete post-migration monitoring report."""
    domain: str = ""  # bare domain under monitoring
    migration_date: str = ""  # migration date in YYYY-MM-DD
    days_since_migration: int = 0  # whole days elapsed since migration date
    traffic_comparison: list[TrafficComparison] = field(default_factory=list)
    redirect_health: list[RedirectHealth] = field(default_factory=list)
    indexation: IndexationStatus | None = None  # None when indexation step was skipped
    ranking_changes: list[RankingChange] = field(default_factory=list)
    recovery_estimate: dict[str, Any] = field(default_factory=dict)
    alerts: list[MigrationAlert] = field(default_factory=list)  # sorted critical-first
    timestamp: str = ""  # ISO timestamp of report generation
    errors: list[str] = field(default_factory=list)  # non-fatal pipeline errors
# ---------------------------------------------------------------------------
# Monitor
# ---------------------------------------------------------------------------
class MigrationMonitor(BaseAsyncClient):
    """Monitors post-migration SEO health using Ahrefs and Firecrawl MCP tools.

    Pipeline (see run()): traffic comparison, redirect health check,
    indexation tracking, ranking tracking, recovery estimation, and
    alert generation.
    """

    # Alert thresholds
    TRAFFIC_DROP_WARNING = 0.20  # 20% drop
    TRAFFIC_DROP_CRITICAL = 0.40  # 40% drop
    RANKING_DROP_THRESHOLD = 5  # 5+ position drop
    INDEXATION_DROP_WARNING = 0.10  # 10% indexation loss

    def __init__(self):
        # Conservative concurrency; upstream MCP bridges are rate-limited.
        super().__init__(max_concurrent=5, requests_per_second=2.0)

    @staticmethod
    def _extract_domain(url: str) -> str:
        """Extract bare domain from URL or return as-is if already bare.

        Fix: strip only a *leading* "www." prefix. The previous
        str.replace("www.", "") also mangled domains containing "www."
        elsewhere (e.g. "shopwww.example.com" -> "shopexample.com").
        """
        if "://" in url:
            parsed = urlparse(url)
            return parsed.netloc.lower().removeprefix("www.")
        return url.lower().removeprefix("www.")

    async def _call_ahrefs(self, tool: str, params: dict[str, Any]) -> dict:
        """Simulate Ahrefs MCP call. In production, routed via MCP bridge."""
        self.logger.info(f"Ahrefs MCP call: {tool} | params={params}")
        return {"tool": tool, "params": params, "data": {}}

    async def _call_firecrawl(self, tool: str, params: dict[str, Any]) -> dict:
        """Simulate Firecrawl MCP call. In production, routed via MCP bridge."""
        self.logger.info(f"Firecrawl MCP call: {tool} | params={params}")
        return {"tool": tool, "params": params, "data": {}}

    # ------------------------------------------------------------------
    # Traffic Comparison
    # ------------------------------------------------------------------
    async def compare_traffic(
        self, domain: str, migration_date: str
    ) -> list[TrafficComparison]:
        """Compare traffic before and after migration date.

        Returns an "Overall" row followed by up to 20 top-page rows.
        Per-page pre-migration traffic requires baseline data and stays 0
        here (populated from the baseline snapshot in production).
        """
        domain = self._extract_domain(domain)
        mig_date = datetime.strptime(migration_date, "%Y-%m-%d")
        days_since = (datetime.now() - mig_date).days
        # Pre-migration period: same duration before migration, floored at
        # 30 days so very recent migrations still get a usable baseline.
        pre_start = (mig_date - timedelta(days=max(days_since, 30))).strftime("%Y-%m-%d")
        pre_end = (mig_date - timedelta(days=1)).strftime("%Y-%m-%d")
        post_start = migration_date
        post_end = datetime.now().strftime("%Y-%m-%d")
        self.logger.info(
            f"Comparing traffic for {domain}: "
            f"pre={pre_start}..{pre_end} vs post={post_start}..{post_end}"
        )
        # Fetch pre-migration metrics history
        pre_resp = await self._call_ahrefs(
            "site-explorer-metrics-history",
            {"target": domain, "date_from": pre_start, "date_to": pre_end},
        )
        pre_data = pre_resp.get("data", {}).get("data_points", [])
        # Fetch post-migration metrics history
        post_resp = await self._call_ahrefs(
            "site-explorer-metrics-history",
            {"target": domain, "date_from": post_start, "date_to": post_end},
        )
        post_data = post_resp.get("data", {}).get("data_points", [])
        # Average data points per window (0 when the API returns none).
        pre_avg_traffic = 0
        if pre_data:
            pre_avg_traffic = int(
                sum(int(p.get("organic_traffic", 0)) for p in pre_data) / len(pre_data)
            )
        post_avg_traffic = 0
        if post_data:
            post_avg_traffic = int(
                sum(int(p.get("organic_traffic", 0)) for p in post_data) / len(post_data)
            )
        # Overall comparison; guard against a zero/empty baseline.
        change_pct = 0.0
        if pre_avg_traffic > 0:
            change_pct = ((post_avg_traffic - pre_avg_traffic) / pre_avg_traffic) * 100
        # Classify using the class-level thresholds instead of repeating the
        # -40 / -20 magic numbers inline (consistency fix).
        status = "stable"
        if change_pct > 5:
            status = "improved"
        elif change_pct < -self.TRAFFIC_DROP_CRITICAL * 100:
            status = "critical"
        elif change_pct < -self.TRAFFIC_DROP_WARNING * 100:
            status = "declined"
        comparisons = [
            TrafficComparison(
                page_group="Overall",
                pre_traffic=pre_avg_traffic,
                post_traffic=post_avg_traffic,
                change_pct=round(change_pct, 2),
                change_absolute=post_avg_traffic - pre_avg_traffic,
                status=status,
            )
        ]
        # Fetch top pages comparison
        pre_pages_resp = await self._call_ahrefs(
            "site-explorer-pages-by-traffic",
            {"target": domain, "limit": 50},
        )
        top_pages = pre_pages_resp.get("data", {}).get("pages", [])
        for page in top_pages[:20]:
            page_url = page.get("url", "")
            page_traffic = int(page.get("traffic", 0))
            # In production, would compare with baseline data
            comparisons.append(
                TrafficComparison(
                    page_group=page_url,
                    pre_traffic=0,  # Would be populated from baseline
                    post_traffic=page_traffic,
                    change_pct=0.0,
                    change_absolute=0,
                    status="stable",
                )
            )
        self.logger.info(
            f"Traffic comparison for {domain}: "
            f"pre={pre_avg_traffic:,} -> post={post_avg_traffic:,} "
            f"({change_pct:+.1f}%)"
        )
        return comparisons

    # ------------------------------------------------------------------
    # Redirect Health Check
    # ------------------------------------------------------------------
    async def check_redirects(
        self, redirect_map: list[dict[str, str]]
    ) -> list[RedirectHealth]:
        """Verify redirect health: check for broken redirects, chains, and loops.

        Each ``redirect_map`` entry needs "source" and "target" keys;
        entries without a source are skipped.
        """
        health_results: list[RedirectHealth] = []
        self.logger.info(f"Checking {len(redirect_map)} redirects for health...")
        for entry in redirect_map:
            source = entry.get("source", "")
            expected_target = entry.get("target", "")
            if not source:
                continue
            # Use Firecrawl to check the redirect
            resp = await self._call_firecrawl(
                "firecrawl_scrape",
                {"url": source, "formats": ["links"]},
            )
            result_data = resp.get("data", {})
            final_url = result_data.get("final_url", "")
            status_code = int(result_data.get("status_code", 0))
            redirect_chain = result_data.get("redirect_chain", [])
            chain_length = len(redirect_chain)
            # NOTE(review): a 301 reaching an unexpected final URL is not
            # flagged here — confirm this exemption is intentional.
            is_broken = (
                status_code >= 400
                or status_code == 0
                or (final_url and final_url != expected_target and status_code != 301)
            )
            health = RedirectHealth(
                source=source,
                target=expected_target,
                status_code=status_code,
                chain_length=chain_length,
                is_broken=is_broken,
                final_url=final_url,
                error="" if not is_broken else f"Expected {expected_target}, got {final_url} ({status_code})",
            )
            health_results.append(health)
        broken_count = sum(1 for h in health_results if h.is_broken)
        chain_count = sum(1 for h in health_results if h.chain_length > 1)
        self.logger.info(
            f"Redirect health check complete: "
            f"{broken_count} broken, {chain_count} chains detected "
            f"out of {len(health_results)} redirects"
        )
        return health_results

    # ------------------------------------------------------------------
    # Indexation Tracking
    # ------------------------------------------------------------------
    async def track_indexation(
        self, domain: str, pre_baseline: dict[str, Any] | None = None
    ) -> IndexationStatus:
        """Compare indexed pages before and after migration.

        Fix: also populate ``new_pages`` (current URLs absent from the
        baseline inventory) — the field existed but was never filled.
        """
        domain = self._extract_domain(domain)
        self.logger.info(f"Tracking indexation for {domain}")
        # Fetch current metrics
        metrics_resp = await self._call_ahrefs(
            "site-explorer-metrics", {"target": domain}
        )
        current_pages = int(metrics_resp.get("data", {}).get("pages", 0))
        # Get pre-migration count from baseline
        pre_count = 0
        if pre_baseline:
            pre_count = int(pre_baseline.get("total_urls", 0))
        change_pct = 0.0
        if pre_count > 0:
            change_pct = ((current_pages - pre_count) / pre_count) * 100
        # Fetch current top pages to detect missing ones
        pages_resp = await self._call_ahrefs(
            "site-explorer-top-pages", {"target": domain, "limit": 500}
        )
        current_page_urls = set()
        for page in pages_resp.get("data", {}).get("pages", []):
            url = page.get("url", "")
            if url:
                current_page_urls.add(url)
        # Compare with baseline URL inventory in both directions.
        missing_pages: list[str] = []
        new_pages: list[str] = []
        if pre_baseline:
            baseline_urls = pre_baseline.get("url_inventory", [])
            baseline_set: set[str] = set()
            for url_entry in baseline_urls:
                # Inventory entries may be bare strings or {"url": ...} dicts.
                url = url_entry if isinstance(url_entry, str) else url_entry.get("url", "")
                if url:
                    baseline_set.add(url)
                    if url not in current_page_urls:
                        missing_pages.append(url)
            new_pages = sorted(current_page_urls - baseline_set)
        status = IndexationStatus(
            pre_count=pre_count,
            post_count=current_pages,
            change_pct=round(change_pct, 2),
            missing_pages=missing_pages[:100],  # Cap at 100 for readability
            new_pages=new_pages[:100],  # Same cap as missing_pages
            deindexed_count=len(missing_pages),
        )
        self.logger.info(
            f"Indexation for {domain}: "
            f"pre={pre_count:,} -> post={current_pages:,} "
            f"({change_pct:+.1f}%), {len(missing_pages)} missing"
        )
        return status

    # ------------------------------------------------------------------
    # Ranking Tracking
    # ------------------------------------------------------------------
    async def track_rankings(
        self, domain: str, priority_keywords: list[str] | None = None
    ) -> list[RankingChange]:
        """Track ranking changes for priority keywords.

        Results are sorted biggest drops first. When ``priority_keywords``
        is given, only those keywords (case-insensitive) are tracked.
        """
        domain = self._extract_domain(domain)
        self.logger.info(f"Tracking rankings for {domain}")
        # Fetch current keyword rankings
        kw_resp = await self._call_ahrefs(
            "site-explorer-organic-keywords",
            {"target": domain, "limit": 200},
        )
        current_keywords = kw_resp.get("data", {}).get("keywords", [])
        # Hoist the lowercase filter set out of the loop (was rebuilt per
        # keyword). Empty/None list means "no filter", as before.
        priority_set = (
            {k.lower() for k in priority_keywords} if priority_keywords else None
        )
        ranking_changes: list[RankingChange] = []
        for kw_data in current_keywords:
            keyword = kw_data.get("keyword", "")
            # If priority keywords specified, filter
            if priority_set is not None and keyword.lower() not in priority_set:
                continue
            current_pos = int(kw_data.get("position", 0))
            previous_pos = int(kw_data.get("previous_position", current_pos))
            volume = int(kw_data.get("search_volume", 0))
            url = kw_data.get("url", "")
            change = previous_pos - current_pos  # Positive = improved
            ranking_changes.append(
                RankingChange(
                    keyword=keyword,
                    pre_position=previous_pos,
                    post_position=current_pos,
                    change=change,
                    url=url,
                    search_volume=volume,
                )
            )
        # Sort by absolute change (biggest drops first)
        ranking_changes.sort(key=lambda r: r.change)
        self.logger.info(
            f"Tracked {len(ranking_changes)} keyword rankings for {domain}"
        )
        return ranking_changes

    # ------------------------------------------------------------------
    # Recovery Estimation
    # ------------------------------------------------------------------
    def estimate_recovery(
        self, traffic_data: list[TrafficComparison], migration_type: str = "domain-move"
    ) -> dict[str, Any]:
        """Estimate recovery timeline based on traffic comparison data.

        Uses the "Overall" traffic row; returns a dict with estimated
        weeks, confidence, recovery percentage, and a Korean summary.
        """
        overall = next(
            (t for t in traffic_data if t.page_group == "Overall"), None
        )
        if not overall:
            return {
                "estimated_weeks": "unknown",
                "confidence": "low",
                "message": "트래픽 데이터 부족으로 회복 기간 추정 불가",
            }
        change_pct = overall.change_pct
        # Base recovery timelines by migration type (weeks)
        base_timelines = {
            "domain-move": 16,  # 4 months
            "platform": 8,  # 2 months
            "url-restructure": 12,  # 3 months
            "https": 4,  # 1 month
            "subdomain": 10,  # 2.5 months
        }
        base_weeks = base_timelines.get(migration_type, 12)
        if change_pct >= 0:
            # No traffic drop — recovery already achieved or in progress
            return {
                "estimated_weeks": 0,
                "confidence": "high",
                "current_recovery_pct": 100.0,
                "message": "트래픽 손실 없음 — 이전 성공적으로 진행 중",
            }
        elif change_pct > -20:
            # Minor drop — quick recovery expected
            estimated_weeks = max(int(base_weeks * 0.5), 2)
            confidence = "high"
            recovery_pct = round(100 + change_pct, 1)
        elif change_pct > -40:
            # Moderate drop — standard recovery timeline
            estimated_weeks = base_weeks
            confidence = "medium"
            recovery_pct = round(100 + change_pct, 1)
        else:
            # Severe drop — extended recovery
            estimated_weeks = int(base_weeks * 1.5)
            confidence = "low"
            recovery_pct = round(100 + change_pct, 1)
        return {
            "estimated_weeks": estimated_weeks,
            "confidence": confidence,
            "current_recovery_pct": recovery_pct,
            "traffic_change_pct": change_pct,
            "migration_type": migration_type,
            "message": (
                f"현재 트래픽 {change_pct:+.1f}% 변동. "
                f"예상 회복 기간: {estimated_weeks}주 (신뢰도: {confidence}). "
                f"현재 회복률: {recovery_pct:.1f}%"
            ),
        }

    # ------------------------------------------------------------------
    # Alert Generation
    # ------------------------------------------------------------------
    def generate_alerts(self, report: MigrationReport) -> list[MigrationAlert]:
        """Generate alerts for significant post-migration issues.

        Checks traffic drops, broken redirects, redirect chains,
        indexation loss, and ranking losses; returns alerts sorted
        critical-first.
        """
        alerts: list[MigrationAlert] = []
        # Traffic drop alerts (only the site-wide "Overall" row is alerted).
        for tc in report.traffic_comparison:
            if tc.page_group == "Overall":
                abs_change = abs(tc.change_pct) / 100.0
                if tc.change_pct < 0 and abs_change >= self.TRAFFIC_DROP_CRITICAL:
                    alerts.append(MigrationAlert(
                        alert_type="traffic_drop",
                        severity="critical",
                        message=(
                            f"심각한 트래픽 하락: {tc.change_pct:+.1f}% "
                            f"(이전 전 {tc.pre_traffic:,} -> 이전 후 {tc.post_traffic:,})"
                        ),
                        metric_value=tc.change_pct,
                        threshold=-self.TRAFFIC_DROP_CRITICAL * 100,
                    ))
                elif tc.change_pct < 0 and abs_change >= self.TRAFFIC_DROP_WARNING:
                    alerts.append(MigrationAlert(
                        alert_type="traffic_drop",
                        severity="warning",
                        message=(
                            f"트래픽 하락 감지: {tc.change_pct:+.1f}% "
                            f"(이전 전 {tc.pre_traffic:,} -> 이전 후 {tc.post_traffic:,})"
                        ),
                        metric_value=tc.change_pct,
                        threshold=-self.TRAFFIC_DROP_WARNING * 100,
                    ))
        # Broken redirect alerts
        broken_redirects = [r for r in report.redirect_health if r.is_broken]
        if broken_redirects:
            severity = "critical" if len(broken_redirects) > 10 else "warning"
            alerts.append(MigrationAlert(
                alert_type="redirect_broken",
                severity=severity,
                message=(
                    f"깨진 리디렉트 {len(broken_redirects)}건 감지. "
                    f"고가치 페이지의 링크 에퀴티 손실 위험."
                ),
                metric_value=float(len(broken_redirects)),
                threshold=1.0,
                affected_urls=[r.source for r in broken_redirects[:20]],
            ))
        # Redirect chain alerts
        chain_redirects = [r for r in report.redirect_health if r.chain_length > 1]
        if chain_redirects:
            alerts.append(MigrationAlert(
                alert_type="redirect_chain",
                severity="warning",
                message=(
                    f"리디렉트 체인 {len(chain_redirects)}건 감지. "
                    f"크롤 효율성 및 링크 에퀴티에 영향."
                ),
                metric_value=float(len(chain_redirects)),
                threshold=1.0,
                affected_urls=[r.source for r in chain_redirects[:20]],
            ))
        # Indexation drop alerts
        if report.indexation:
            idx = report.indexation
            if idx.pre_count > 0:
                idx_drop = abs(idx.change_pct) / 100.0
                if idx.change_pct < 0 and idx_drop >= self.INDEXATION_DROP_WARNING:
                    alerts.append(MigrationAlert(
                        alert_type="indexation_drop",
                        # 30%+ loss escalates to critical
                        severity="warning" if idx_drop < 0.30 else "critical",
                        message=(
                            f"인덱싱 감소: {idx.change_pct:+.1f}% "
                            f"(이전 전 {idx.pre_count:,} -> 이전 후 {idx.post_count:,}페이지). "
                            f"디인덱싱된 페이지: {idx.deindexed_count}"
                        ),
                        metric_value=idx.change_pct,
                        threshold=-self.INDEXATION_DROP_WARNING * 100,
                        affected_urls=idx.missing_pages[:20],
                    ))
        # Ranking loss alerts: 5+ position drops on keywords with volume > 100
        significant_drops = [
            r for r in report.ranking_changes
            if r.change < -self.RANKING_DROP_THRESHOLD and r.search_volume > 100
        ]
        if significant_drops:
            alerts.append(MigrationAlert(
                alert_type="ranking_loss",
                severity="warning" if len(significant_drops) < 20 else "critical",
                message=(
                    f"주요 키워드 {len(significant_drops)}개의 순위 하락 감지 "
                    f"(5포지션 이상 하락, 검색량 100+)"
                ),
                metric_value=float(len(significant_drops)),
                threshold=float(self.RANKING_DROP_THRESHOLD),
                affected_urls=[r.url for r in significant_drops[:20]],
            ))
        # Sort alerts by severity
        severity_order = {"critical": 0, "warning": 1, "info": 2}
        alerts.sort(key=lambda a: severity_order.get(a.severity, 3))
        self.logger.info(f"Generated {len(alerts)} migration alerts")
        return alerts

    # ------------------------------------------------------------------
    # Orchestrator
    # ------------------------------------------------------------------
    async def run(
        self,
        domain: str,
        migration_date: str,
        baseline_file: str | None = None,
        migration_type: str = "domain-move",
    ) -> MigrationReport:
        """Orchestrate full post-migration monitoring pipeline.

        Baseline-file and pipeline failures are recorded in
        ``report.errors`` rather than raised, so a partial report is
        always returned.
        """
        timestamp = datetime.now().isoformat()
        mig_date = datetime.strptime(migration_date, "%Y-%m-%d")
        days_since = (datetime.now() - mig_date).days
        report = MigrationReport(
            domain=self._extract_domain(domain),
            migration_date=migration_date,
            days_since_migration=days_since,
            timestamp=timestamp,
        )
        # Load baseline if provided
        baseline: dict[str, Any] | None = None
        redirect_map_data: list[dict[str, str]] = []
        if baseline_file:
            try:
                with open(baseline_file, "r", encoding="utf-8") as f:
                    baseline_raw = json.load(f)
                # Accept either a wrapped {"baseline": {...}} file or a bare dict.
                baseline = baseline_raw.get("baseline", baseline_raw)
                redirect_map_data = [
                    {"source": r.get("source", ""), "target": r.get("target", "")}
                    for r in baseline_raw.get("redirect_map", [])
                ]
                self.logger.info(f"Loaded baseline from {baseline_file}")
            except Exception as e:
                # Best-effort: continue without a baseline, but record the failure.
                msg = f"Failed to load baseline file: {e}"
                self.logger.error(msg)
                report.errors.append(msg)
        try:
            # Step 1: Traffic comparison
            self.logger.info("Step 1/5: Comparing pre/post traffic...")
            report.traffic_comparison = await self.compare_traffic(
                domain, migration_date
            )
            # Step 2: Redirect health check
            if redirect_map_data:
                self.logger.info("Step 2/5: Checking redirect health...")
                report.redirect_health = await self.check_redirects(redirect_map_data)
            else:
                self.logger.info(
                    "Step 2/5: Skipping redirect check (no baseline redirect map)"
                )
            # Step 3: Indexation tracking
            self.logger.info("Step 3/5: Tracking indexation changes...")
            report.indexation = await self.track_indexation(domain, baseline)
            # Step 4: Ranking tracking
            self.logger.info("Step 4/5: Tracking keyword rankings...")
            report.ranking_changes = await self.track_rankings(domain)
            # Step 5: Recovery estimation
            self.logger.info("Step 5/5: Estimating recovery timeline...")
            report.recovery_estimate = self.estimate_recovery(
                report.traffic_comparison, migration_type
            )
            # Generate alerts
            report.alerts = self.generate_alerts(report)
            self.logger.info(
                f"Migration monitoring complete: "
                f"{days_since} days since migration, "
                f"{len(report.alerts)} alerts generated"
            )
        except Exception as e:
            # Pipeline boundary: capture the error in the report instead of raising.
            msg = f"Migration monitoring pipeline error: {e}"
            self.logger.error(msg)
            report.errors.append(msg)
        return report
# ---------------------------------------------------------------------------
# Output helpers
# ---------------------------------------------------------------------------
def _format_text_report(report: MigrationReport) -> str:
"""Format monitoring report as human-readable text."""
lines: list[str] = []
lines.append("=" * 70)
lines.append(" SEO MIGRATION MONITORING REPORT")
lines.append(f" Domain: {report.domain}")
lines.append(f" Migration Date: {report.migration_date}")
lines.append(f" Days Since Migration: {report.days_since_migration}")
lines.append(f" Generated: {report.timestamp}")
lines.append("=" * 70)
# Alerts
if report.alerts:
lines.append("")
lines.append("--- ALERTS ---")
for alert in report.alerts:
icon = {"critical": "[!]", "warning": "[*]", "info": "[-]"}.get(
alert.severity, "[-]"
)
lines.append(f" {icon} [{alert.severity.upper()}] {alert.message}")
if alert.affected_urls:
for url in alert.affected_urls[:5]:
lines.append(f" - {url}")
if len(alert.affected_urls) > 5:
lines.append(f" ... and {len(alert.affected_urls) - 5} more")
# Traffic comparison
if report.traffic_comparison:
lines.append("")
lines.append("--- TRAFFIC COMPARISON ---")
lines.append(
f" {'Page Group':<40} {'Pre':>10} {'Post':>10} {'Change':>10} {'Status':>10}"
)
lines.append(" " + "-" * 83)
for tc in report.traffic_comparison:
group = tc.page_group[:38]
lines.append(
f" {group:<40} {tc.pre_traffic:>10,} {tc.post_traffic:>10,} "
f"{tc.change_pct:>+9.1f}% {tc.status:>10}"
)
# Redirect health
if report.redirect_health:
broken = [r for r in report.redirect_health if r.is_broken]
chains = [r for r in report.redirect_health if r.chain_length > 1]
healthy = [r for r in report.redirect_health if not r.is_broken and r.chain_length <= 1]
lines.append("")
lines.append("--- REDIRECT HEALTH ---")
lines.append(f" Total Redirects: {len(report.redirect_health):,}")
lines.append(f" Healthy: {len(healthy):,}")
lines.append(f" Broken: {len(broken):,}")
lines.append(f" Chains (>1 hop): {len(chains):,}")
if broken:
lines.append("")
lines.append(" Broken Redirects:")
for r in broken[:10]:
lines.append(f" [{r.status_code}] {r.source} -> {r.target}")
if r.error:
lines.append(f" Error: {r.error}")
# Indexation
if report.indexation:
idx = report.indexation
lines.append("")
lines.append("--- INDEXATION STATUS ---")
lines.append(f" Pre-Migration Pages: {idx.pre_count:,}")
lines.append(f" Post-Migration Pages: {idx.post_count:,}")
lines.append(f" Change: {idx.change_pct:+.1f}%")
lines.append(f" De-indexed Pages: {idx.deindexed_count:,}")
if idx.missing_pages:
lines.append("")
lines.append(" Missing Pages (top 10):")
for page in idx.missing_pages[:10]:
lines.append(f" - {page}")
# Ranking changes
if report.ranking_changes:
lines.append("")
lines.append("--- RANKING CHANGES ---")
drops = [r for r in report.ranking_changes if r.change < 0]
gains = [r for r in report.ranking_changes if r.change > 0]
lines.append(f" Total Tracked: {len(report.ranking_changes)}")
lines.append(f" Improved: {len(gains)}")
lines.append(f" Declined: {len(drops)}")
if drops:
lines.append("")
lines.append(" Biggest Drops:")
lines.append(
f" {'Keyword':<30} {'Pre':>6} {'Post':>6} {'Change':>8} {'Volume':>8}"
)
lines.append(" " + "-" * 61)
for r in drops[:15]:
kw = r.keyword[:28]
lines.append(
f" {kw:<30} {r.pre_position:>6} {r.post_position:>6} "
f"{r.change:>+7} {r.search_volume:>8,}"
)
# Recovery estimate
if report.recovery_estimate:
est = report.recovery_estimate
lines.append("")
lines.append("--- RECOVERY ESTIMATE ---")
lines.append(f" {est.get('message', 'N/A')}")
weeks = est.get("estimated_weeks", "unknown")
confidence = est.get("confidence", "unknown")
lines.append(f" Estimated Weeks: {weeks}")
lines.append(f" Confidence: {confidence}")
if report.errors:
lines.append("")
lines.append("--- ERRORS ---")
for err in report.errors:
lines.append(f" - {err}")
lines.append("")
lines.append("=" * 70)
return "\n".join(lines)
def _serialize_report(report: MigrationReport) -> dict:
"""Convert report to JSON-serializable dict."""
output: dict[str, Any] = {
"domain": report.domain,
"migration_date": report.migration_date,
"days_since_migration": report.days_since_migration,
"traffic_comparison": [asdict(t) for t in report.traffic_comparison],
"redirect_health": [asdict(r) for r in report.redirect_health],
"indexation": asdict(report.indexation) if report.indexation else None,
"ranking_changes": [asdict(r) for r in report.ranking_changes],
"recovery_estimate": report.recovery_estimate,
"alerts": [asdict(a) for a in report.alerts],
"timestamp": report.timestamp,
}
if report.errors:
output["errors"] = report.errors
return output
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Migration Monitor - Post-migration SEO monitoring and alerting",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""\
Examples:
python migration_monitor.py --domain https://new-example.com --migration-date 2025-01-15 --baseline baseline.json --json
python migration_monitor.py --domain https://new-example.com --migration-date 2025-01-15 --json
""",
)
parser.add_argument(
"--domain",
required=True,
help="Domain to monitor (post-migration URL)",
)
parser.add_argument(
"--migration-date",
required=True,
help="Migration date in YYYY-MM-DD format",
)
parser.add_argument(
"--baseline",
type=str,
default=None,
help="Path to baseline JSON file from migration_planner.py",
)
parser.add_argument(
"--type",
choices=["domain-move", "platform", "url-restructure", "https", "subdomain"],
default="domain-move",
help="Migration type for recovery estimation (default: domain-move)",
)
parser.add_argument(
"--json",
action="store_true",
default=False,
help="Output in JSON format",
)
parser.add_argument(
"--output",
type=str,
default=None,
help="Save output to file path",
)
return parser.parse_args(argv)
async def async_main(args: argparse.Namespace) -> None:
    """Run the monitoring pipeline and emit the report as text or JSON."""
    monitor = MigrationMonitor()
    report = await monitor.run(
        domain=args.domain,
        migration_date=args.migration_date,
        baseline_file=args.baseline,
        migration_type=args.type,
    )
    # Render once, then route to file or stdout.
    rendered = (
        json.dumps(_serialize_report(report), indent=2, ensure_ascii=False)
        if args.json
        else _format_text_report(report)
    )
    if args.output:
        with open(args.output, "w", encoding="utf-8") as fh:
            fh.write(rendered)
        logger.info(f"Migration report saved to {args.output}")
    else:
        print(rendered)
    monitor.print_stats()
def main() -> None:
    """Synchronous CLI entry point: parse args, then run the async pipeline."""
    asyncio.run(async_main(parse_args()))


if __name__ == "__main__":
    main()