""" Migration Monitor - Post-Migration Traffic & Indexation Monitoring ================================================================== Purpose: Post-migration traffic comparison, redirect health checks, indexation tracking, ranking change monitoring, and alert generation. Python: 3.10+ Usage: python migration_monitor.py --domain https://new-example.com --migration-date 2025-01-15 --baseline baseline.json --json python migration_monitor.py --domain https://new-example.com --migration-date 2025-01-15 --json """ import argparse import asyncio import json import logging import sys from dataclasses import dataclass, field, asdict from datetime import datetime, timedelta from typing import Any from urllib.parse import urlparse from base_client import BaseAsyncClient, config logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Data classes # --------------------------------------------------------------------------- @dataclass class TrafficComparison: """Traffic comparison between pre- and post-migration periods.""" page_group: str = "" pre_traffic: int = 0 post_traffic: int = 0 change_pct: float = 0.0 change_absolute: int = 0 status: str = "stable" # improved / stable / declined / critical @dataclass class RedirectHealth: """Health status of a single redirect.""" source: str = "" target: str = "" status_code: int = 0 chain_length: int = 0 is_broken: bool = False final_url: str = "" error: str = "" @dataclass class IndexationStatus: """Indexation comparison before and after migration.""" pre_count: int = 0 post_count: int = 0 change_pct: float = 0.0 missing_pages: list[str] = field(default_factory=list) new_pages: list[str] = field(default_factory=list) deindexed_count: int = 0 @dataclass class RankingChange: """Ranking change for a keyword.""" keyword: str = "" pre_position: int = 0 post_position: int = 0 change: int = 0 url: str = "" search_volume: int = 0 @dataclass class MigrationAlert: """Alert for 
significant post-migration issues.""" alert_type: str = "" # traffic_drop, redirect_broken, indexation_drop, ranking_loss severity: str = "info" # info / warning / critical message: str = "" metric_value: float = 0.0 threshold: float = 0.0 affected_urls: list[str] = field(default_factory=list) @dataclass class MigrationReport: """Complete post-migration monitoring report.""" domain: str = "" migration_date: str = "" days_since_migration: int = 0 traffic_comparison: list[TrafficComparison] = field(default_factory=list) redirect_health: list[RedirectHealth] = field(default_factory=list) indexation: IndexationStatus | None = None ranking_changes: list[RankingChange] = field(default_factory=list) recovery_estimate: dict[str, Any] = field(default_factory=dict) alerts: list[MigrationAlert] = field(default_factory=list) timestamp: str = "" errors: list[str] = field(default_factory=list) # --------------------------------------------------------------------------- # Monitor # --------------------------------------------------------------------------- class MigrationMonitor(BaseAsyncClient): """Monitors post-migration SEO health using Ahrefs and Firecrawl MCP tools.""" # Alert thresholds TRAFFIC_DROP_WARNING = 0.20 # 20% drop TRAFFIC_DROP_CRITICAL = 0.40 # 40% drop RANKING_DROP_THRESHOLD = 5 # 5+ position drop INDEXATION_DROP_WARNING = 0.10 # 10% indexation loss def __init__(self): super().__init__(max_concurrent=5, requests_per_second=2.0) @staticmethod def _extract_domain(url: str) -> str: """Extract bare domain from URL or return as-is if already bare.""" if "://" in url: parsed = urlparse(url) return parsed.netloc.lower().replace("www.", "") return url.lower().replace("www.", "") async def _call_ahrefs(self, tool: str, params: dict[str, Any]) -> dict: """Simulate Ahrefs MCP call. 
In production, routed via MCP bridge.""" self.logger.info(f"Ahrefs MCP call: {tool} | params={params}") return {"tool": tool, "params": params, "data": {}} async def _call_firecrawl(self, tool: str, params: dict[str, Any]) -> dict: """Simulate Firecrawl MCP call. In production, routed via MCP bridge.""" self.logger.info(f"Firecrawl MCP call: {tool} | params={params}") return {"tool": tool, "params": params, "data": {}} # ------------------------------------------------------------------ # Traffic Comparison # ------------------------------------------------------------------ async def compare_traffic( self, domain: str, migration_date: str ) -> list[TrafficComparison]: """Compare traffic before and after migration date.""" domain = self._extract_domain(domain) mig_date = datetime.strptime(migration_date, "%Y-%m-%d") days_since = (datetime.now() - mig_date).days # Pre-migration period: same duration before migration pre_start = (mig_date - timedelta(days=max(days_since, 30))).strftime("%Y-%m-%d") pre_end = (mig_date - timedelta(days=1)).strftime("%Y-%m-%d") post_start = migration_date post_end = datetime.now().strftime("%Y-%m-%d") self.logger.info( f"Comparing traffic for {domain}: " f"pre={pre_start}..{pre_end} vs post={post_start}..{post_end}" ) # Fetch pre-migration metrics history pre_resp = await self._call_ahrefs( "site-explorer-metrics-history", {"target": domain, "date_from": pre_start, "date_to": pre_end}, ) pre_data = pre_resp.get("data", {}).get("data_points", []) # Fetch post-migration metrics history post_resp = await self._call_ahrefs( "site-explorer-metrics-history", {"target": domain, "date_from": post_start, "date_to": post_end}, ) post_data = post_resp.get("data", {}).get("data_points", []) # Calculate averages pre_avg_traffic = 0 if pre_data: pre_avg_traffic = int( sum(int(p.get("organic_traffic", 0)) for p in pre_data) / len(pre_data) ) post_avg_traffic = 0 if post_data: post_avg_traffic = int( sum(int(p.get("organic_traffic", 0)) for p in 
post_data) / len(post_data) ) # Overall comparison change_pct = 0.0 if pre_avg_traffic > 0: change_pct = ((post_avg_traffic - pre_avg_traffic) / pre_avg_traffic) * 100 status = "stable" if change_pct > 5: status = "improved" elif change_pct < -40: status = "critical" elif change_pct < -20: status = "declined" comparisons = [ TrafficComparison( page_group="Overall", pre_traffic=pre_avg_traffic, post_traffic=post_avg_traffic, change_pct=round(change_pct, 2), change_absolute=post_avg_traffic - pre_avg_traffic, status=status, ) ] # Fetch top pages comparison pre_pages_resp = await self._call_ahrefs( "site-explorer-pages-by-traffic", {"target": domain, "limit": 50}, ) top_pages = pre_pages_resp.get("data", {}).get("pages", []) for page in top_pages[:20]: page_url = page.get("url", "") page_traffic = int(page.get("traffic", 0)) # In production, would compare with baseline data comparisons.append( TrafficComparison( page_group=page_url, pre_traffic=0, # Would be populated from baseline post_traffic=page_traffic, change_pct=0.0, change_absolute=0, status="stable", ) ) self.logger.info( f"Traffic comparison for {domain}: " f"pre={pre_avg_traffic:,} -> post={post_avg_traffic:,} " f"({change_pct:+.1f}%)" ) return comparisons # ------------------------------------------------------------------ # Redirect Health Check # ------------------------------------------------------------------ async def check_redirects( self, redirect_map: list[dict[str, str]] ) -> list[RedirectHealth]: """Verify redirect health: check for broken redirects, chains, and loops.""" health_results: list[RedirectHealth] = [] self.logger.info(f"Checking {len(redirect_map)} redirects for health...") for entry in redirect_map: source = entry.get("source", "") expected_target = entry.get("target", "") if not source: continue # Use Firecrawl to check the redirect resp = await self._call_firecrawl( "firecrawl_scrape", {"url": source, "formats": ["links"]}, ) result_data = resp.get("data", {}) final_url = 
result_data.get("final_url", "") status_code = int(result_data.get("status_code", 0)) redirect_chain = result_data.get("redirect_chain", []) chain_length = len(redirect_chain) is_broken = ( status_code >= 400 or status_code == 0 or (final_url and final_url != expected_target and status_code != 301) ) health = RedirectHealth( source=source, target=expected_target, status_code=status_code, chain_length=chain_length, is_broken=is_broken, final_url=final_url, error="" if not is_broken else f"Expected {expected_target}, got {final_url} ({status_code})", ) health_results.append(health) broken_count = sum(1 for h in health_results if h.is_broken) chain_count = sum(1 for h in health_results if h.chain_length > 1) self.logger.info( f"Redirect health check complete: " f"{broken_count} broken, {chain_count} chains detected " f"out of {len(health_results)} redirects" ) return health_results # ------------------------------------------------------------------ # Indexation Tracking # ------------------------------------------------------------------ async def track_indexation( self, domain: str, pre_baseline: dict[str, Any] | None = None ) -> IndexationStatus: """Compare indexed pages before and after migration.""" domain = self._extract_domain(domain) self.logger.info(f"Tracking indexation for {domain}") # Fetch current metrics metrics_resp = await self._call_ahrefs( "site-explorer-metrics", {"target": domain} ) current_pages = int(metrics_resp.get("data", {}).get("pages", 0)) # Get pre-migration count from baseline pre_count = 0 if pre_baseline: pre_count = int(pre_baseline.get("total_urls", 0)) change_pct = 0.0 if pre_count > 0: change_pct = ((current_pages - pre_count) / pre_count) * 100 # Fetch current top pages to detect missing ones pages_resp = await self._call_ahrefs( "site-explorer-top-pages", {"target": domain, "limit": 500} ) current_page_urls = set() for page in pages_resp.get("data", {}).get("pages", []): url = page.get("url", "") if url: current_page_urls.add(url) 
# Compare with baseline URL inventory missing_pages: list[str] = [] if pre_baseline: baseline_urls = pre_baseline.get("url_inventory", []) for url_entry in baseline_urls: url = url_entry if isinstance(url_entry, str) else url_entry.get("url", "") if url and url not in current_page_urls: missing_pages.append(url) status = IndexationStatus( pre_count=pre_count, post_count=current_pages, change_pct=round(change_pct, 2), missing_pages=missing_pages[:100], # Cap at 100 for readability deindexed_count=len(missing_pages), ) self.logger.info( f"Indexation for {domain}: " f"pre={pre_count:,} -> post={current_pages:,} " f"({change_pct:+.1f}%), {len(missing_pages)} missing" ) return status # ------------------------------------------------------------------ # Ranking Tracking # ------------------------------------------------------------------ async def track_rankings( self, domain: str, priority_keywords: list[str] | None = None ) -> list[RankingChange]: """Track ranking changes for priority keywords.""" domain = self._extract_domain(domain) self.logger.info(f"Tracking rankings for {domain}") # Fetch current keyword rankings kw_resp = await self._call_ahrefs( "site-explorer-organic-keywords", {"target": domain, "limit": 200}, ) current_keywords = kw_resp.get("data", {}).get("keywords", []) ranking_changes: list[RankingChange] = [] for kw_data in current_keywords: keyword = kw_data.get("keyword", "") # If priority keywords specified, filter if priority_keywords and keyword.lower() not in [k.lower() for k in priority_keywords]: continue current_pos = int(kw_data.get("position", 0)) previous_pos = int(kw_data.get("previous_position", current_pos)) volume = int(kw_data.get("search_volume", 0)) url = kw_data.get("url", "") change = previous_pos - current_pos # Positive = improved ranking_changes.append( RankingChange( keyword=keyword, pre_position=previous_pos, post_position=current_pos, change=change, url=url, search_volume=volume, ) ) # Sort by absolute change (biggest drops 
first) ranking_changes.sort(key=lambda r: r.change) self.logger.info( f"Tracked {len(ranking_changes)} keyword rankings for {domain}" ) return ranking_changes # ------------------------------------------------------------------ # Recovery Estimation # ------------------------------------------------------------------ def estimate_recovery( self, traffic_data: list[TrafficComparison], migration_type: str = "domain-move" ) -> dict[str, Any]: """Estimate recovery timeline based on traffic comparison data.""" overall = next( (t for t in traffic_data if t.page_group == "Overall"), None ) if not overall: return { "estimated_weeks": "unknown", "confidence": "low", "message": "트래픽 데이터 부족으로 회복 기간 추정 불가", } change_pct = overall.change_pct # Base recovery timelines by migration type (weeks) base_timelines = { "domain-move": 16, # 4 months "platform": 8, # 2 months "url-restructure": 12, # 3 months "https": 4, # 1 month "subdomain": 10, # 2.5 months } base_weeks = base_timelines.get(migration_type, 12) if change_pct >= 0: # No traffic drop — recovery already achieved or in progress return { "estimated_weeks": 0, "confidence": "high", "current_recovery_pct": 100.0, "message": "트래픽 손실 없음 — 이전 성공적으로 진행 중", } elif change_pct > -20: # Minor drop — quick recovery expected estimated_weeks = max(int(base_weeks * 0.5), 2) confidence = "high" recovery_pct = round(100 + change_pct, 1) elif change_pct > -40: # Moderate drop — standard recovery timeline estimated_weeks = base_weeks confidence = "medium" recovery_pct = round(100 + change_pct, 1) else: # Severe drop — extended recovery estimated_weeks = int(base_weeks * 1.5) confidence = "low" recovery_pct = round(100 + change_pct, 1) return { "estimated_weeks": estimated_weeks, "confidence": confidence, "current_recovery_pct": recovery_pct, "traffic_change_pct": change_pct, "migration_type": migration_type, "message": ( f"현재 트래픽 {change_pct:+.1f}% 변동. " f"예상 회복 기간: {estimated_weeks}주 (신뢰도: {confidence}). 
" f"현재 회복률: {recovery_pct:.1f}%" ), } # ------------------------------------------------------------------ # Alert Generation # ------------------------------------------------------------------ def generate_alerts(self, report: MigrationReport) -> list[MigrationAlert]: """Generate alerts for significant post-migration issues.""" alerts: list[MigrationAlert] = [] # Traffic drop alerts for tc in report.traffic_comparison: if tc.page_group == "Overall": abs_change = abs(tc.change_pct) / 100.0 if tc.change_pct < 0 and abs_change >= self.TRAFFIC_DROP_CRITICAL: alerts.append(MigrationAlert( alert_type="traffic_drop", severity="critical", message=( f"심각한 트래픽 하락: {tc.change_pct:+.1f}% " f"(이전 전 {tc.pre_traffic:,} -> 이전 후 {tc.post_traffic:,})" ), metric_value=tc.change_pct, threshold=-self.TRAFFIC_DROP_CRITICAL * 100, )) elif tc.change_pct < 0 and abs_change >= self.TRAFFIC_DROP_WARNING: alerts.append(MigrationAlert( alert_type="traffic_drop", severity="warning", message=( f"트래픽 하락 감지: {tc.change_pct:+.1f}% " f"(이전 전 {tc.pre_traffic:,} -> 이전 후 {tc.post_traffic:,})" ), metric_value=tc.change_pct, threshold=-self.TRAFFIC_DROP_WARNING * 100, )) # Broken redirect alerts broken_redirects = [r for r in report.redirect_health if r.is_broken] if broken_redirects: severity = "critical" if len(broken_redirects) > 10 else "warning" alerts.append(MigrationAlert( alert_type="redirect_broken", severity=severity, message=( f"깨진 리디렉트 {len(broken_redirects)}건 감지. " f"고가치 페이지의 링크 에퀴티 손실 위험." ), metric_value=float(len(broken_redirects)), threshold=1.0, affected_urls=[r.source for r in broken_redirects[:20]], )) # Redirect chain alerts chain_redirects = [r for r in report.redirect_health if r.chain_length > 1] if chain_redirects: alerts.append(MigrationAlert( alert_type="redirect_chain", severity="warning", message=( f"리디렉트 체인 {len(chain_redirects)}건 감지. " f"크롤 효율성 및 링크 에퀴티에 영향." 
), metric_value=float(len(chain_redirects)), threshold=1.0, affected_urls=[r.source for r in chain_redirects[:20]], )) # Indexation drop alerts if report.indexation: idx = report.indexation if idx.pre_count > 0: idx_drop = abs(idx.change_pct) / 100.0 if idx.change_pct < 0 and idx_drop >= self.INDEXATION_DROP_WARNING: alerts.append(MigrationAlert( alert_type="indexation_drop", severity="warning" if idx_drop < 0.30 else "critical", message=( f"인덱싱 감소: {idx.change_pct:+.1f}% " f"(이전 전 {idx.pre_count:,} -> 이전 후 {idx.post_count:,}페이지). " f"디인덱싱된 페이지: {idx.deindexed_count}건" ), metric_value=idx.change_pct, threshold=-self.INDEXATION_DROP_WARNING * 100, affected_urls=idx.missing_pages[:20], )) # Ranking loss alerts significant_drops = [ r for r in report.ranking_changes if r.change < -self.RANKING_DROP_THRESHOLD and r.search_volume > 100 ] if significant_drops: alerts.append(MigrationAlert( alert_type="ranking_loss", severity="warning" if len(significant_drops) < 20 else "critical", message=( f"주요 키워드 {len(significant_drops)}개의 순위 하락 감지 " f"(5포지션 이상 하락, 검색량 100+)" ), metric_value=float(len(significant_drops)), threshold=float(self.RANKING_DROP_THRESHOLD), affected_urls=[r.url for r in significant_drops[:20]], )) # Sort alerts by severity severity_order = {"critical": 0, "warning": 1, "info": 2} alerts.sort(key=lambda a: severity_order.get(a.severity, 3)) self.logger.info(f"Generated {len(alerts)} migration alerts") return alerts # ------------------------------------------------------------------ # Orchestrator # ------------------------------------------------------------------ async def run( self, domain: str, migration_date: str, baseline_file: str | None = None, migration_type: str = "domain-move", ) -> MigrationReport: """Orchestrate full post-migration monitoring pipeline.""" timestamp = datetime.now().isoformat() mig_date = datetime.strptime(migration_date, "%Y-%m-%d") days_since = (datetime.now() - mig_date).days report = MigrationReport( 
domain=self._extract_domain(domain), migration_date=migration_date, days_since_migration=days_since, timestamp=timestamp, ) # Load baseline if provided baseline: dict[str, Any] | None = None redirect_map_data: list[dict[str, str]] = [] if baseline_file: try: with open(baseline_file, "r", encoding="utf-8") as f: baseline_raw = json.load(f) baseline = baseline_raw.get("baseline", baseline_raw) redirect_map_data = [ {"source": r.get("source", ""), "target": r.get("target", "")} for r in baseline_raw.get("redirect_map", []) ] self.logger.info(f"Loaded baseline from {baseline_file}") except Exception as e: msg = f"Failed to load baseline file: {e}" self.logger.error(msg) report.errors.append(msg) try: # Step 1: Traffic comparison self.logger.info("Step 1/5: Comparing pre/post traffic...") report.traffic_comparison = await self.compare_traffic( domain, migration_date ) # Step 2: Redirect health check if redirect_map_data: self.logger.info("Step 2/5: Checking redirect health...") report.redirect_health = await self.check_redirects(redirect_map_data) else: self.logger.info( "Step 2/5: Skipping redirect check (no baseline redirect map)" ) # Step 3: Indexation tracking self.logger.info("Step 3/5: Tracking indexation changes...") report.indexation = await self.track_indexation(domain, baseline) # Step 4: Ranking tracking self.logger.info("Step 4/5: Tracking keyword rankings...") report.ranking_changes = await self.track_rankings(domain) # Step 5: Recovery estimation self.logger.info("Step 5/5: Estimating recovery timeline...") report.recovery_estimate = self.estimate_recovery( report.traffic_comparison, migration_type ) # Generate alerts report.alerts = self.generate_alerts(report) self.logger.info( f"Migration monitoring complete: " f"{days_since} days since migration, " f"{len(report.alerts)} alerts generated" ) except Exception as e: msg = f"Migration monitoring pipeline error: {e}" self.logger.error(msg) report.errors.append(msg) return report # 
# ---------------------------------------------------------------------------
# Output helpers
# ---------------------------------------------------------------------------


def _format_text_report(report: MigrationReport) -> str:
    """Render the monitoring report as a human-readable text block."""
    out: list[str] = []
    bar = "=" * 70

    out.append(bar)
    out.append(" SEO MIGRATION MONITORING REPORT")
    out.append(f" Domain: {report.domain}")
    out.append(f" Migration Date: {report.migration_date}")
    out.append(f" Days Since Migration: {report.days_since_migration}")
    out.append(f" Generated: {report.timestamp}")
    out.append(bar)

    # Alerts section
    severity_icons = {"critical": "[!]", "warning": "[*]", "info": "[-]"}
    if report.alerts:
        out.append("")
        out.append("--- ALERTS ---")
        for alert in report.alerts:
            icon = severity_icons.get(alert.severity, "[-]")
            out.append(f" {icon} [{alert.severity.upper()}] {alert.message}")
            if alert.affected_urls:
                out.extend(f" - {url}" for url in alert.affected_urls[:5])
                overflow = len(alert.affected_urls) - 5
                if overflow > 0:
                    out.append(f" ... and {overflow} more")

    # Traffic comparison section
    if report.traffic_comparison:
        out.append("")
        out.append("--- TRAFFIC COMPARISON ---")
        out.append(
            f" {'Page Group':<40} {'Pre':>10} {'Post':>10} {'Change':>10} {'Status':>10}"
        )
        out.append(" " + "-" * 83)
        for tc in report.traffic_comparison:
            group = tc.page_group[:38]
            out.append(
                f" {group:<40} {tc.pre_traffic:>10,} {tc.post_traffic:>10,} "
                f"{tc.change_pct:>+9.1f}% {tc.status:>10}"
            )

    # Redirect health section
    if report.redirect_health:
        broken = [r for r in report.redirect_health if r.is_broken]
        chains = [r for r in report.redirect_health if r.chain_length > 1]
        healthy = [r for r in report.redirect_health if not r.is_broken and r.chain_length <= 1]
        out.append("")
        out.append("--- REDIRECT HEALTH ---")
        out.append(f" Total Redirects: {len(report.redirect_health):,}")
        out.append(f" Healthy: {len(healthy):,}")
        out.append(f" Broken: {len(broken):,}")
        out.append(f" Chains (>1 hop): {len(chains):,}")
        if broken:
            out.append("")
            out.append(" Broken Redirects:")
            for r in broken[:10]:
                out.append(f" [{r.status_code}] {r.source} -> {r.target}")
                if r.error:
                    out.append(f" Error: {r.error}")

    # Indexation section
    idx = report.indexation
    if idx:
        out.append("")
        out.append("--- INDEXATION STATUS ---")
        out.append(f" Pre-Migration Pages: {idx.pre_count:,}")
        out.append(f" Post-Migration Pages: {idx.post_count:,}")
        out.append(f" Change: {idx.change_pct:+.1f}%")
        out.append(f" De-indexed Pages: {idx.deindexed_count:,}")
        if idx.missing_pages:
            out.append("")
            out.append(" Missing Pages (top 10):")
            out.extend(f" - {page}" for page in idx.missing_pages[:10])

    # Ranking changes section
    if report.ranking_changes:
        out.append("")
        out.append("--- RANKING CHANGES ---")
        drops = [r for r in report.ranking_changes if r.change < 0]
        gains = [r for r in report.ranking_changes if r.change > 0]
        out.append(f" Total Tracked: {len(report.ranking_changes)}")
        out.append(f" Improved: {len(gains)}")
        out.append(f" Declined: {len(drops)}")
        if drops:
            out.append("")
            out.append(" Biggest Drops:")
            out.append(
                f" {'Keyword':<30} {'Pre':>6} {'Post':>6} {'Change':>8} {'Volume':>8}"
            )
            out.append(" " + "-" * 61)
            for r in drops[:15]:
                kw = r.keyword[:28]
                out.append(
                    f" {kw:<30} {r.pre_position:>6} {r.post_position:>6} "
                    f"{r.change:>+7} {r.search_volume:>8,}"
                )

    # Recovery estimate section
    if report.recovery_estimate:
        est = report.recovery_estimate
        out.append("")
        out.append("--- RECOVERY ESTIMATE ---")
        out.append(f" {est.get('message', 'N/A')}")
        weeks = est.get("estimated_weeks", "unknown")
        confidence = est.get("confidence", "unknown")
        out.append(f" Estimated Weeks: {weeks}")
        out.append(f" Confidence: {confidence}")

    # Errors section
    if report.errors:
        out.append("")
        out.append("--- ERRORS ---")
        out.extend(f" - {err}" for err in report.errors)

    out.append("")
    out.append(bar)
    return "\n".join(out)


def _serialize_report(report: MigrationReport) -> dict:
    """Convert report to JSON-serializable dict."""
    payload: dict[str, Any] = {
        "domain": report.domain,
        "migration_date": report.migration_date,
        "days_since_migration": report.days_since_migration,
        "traffic_comparison": [asdict(t) for t in report.traffic_comparison],
        "redirect_health": [asdict(r) for r in report.redirect_health],
        "indexation": asdict(report.indexation) if report.indexation else None,
        "ranking_changes": [asdict(r) for r in report.ranking_changes],
        "recovery_estimate": report.recovery_estimate,
        "alerts": [asdict(a) for a in report.alerts],
        "timestamp": report.timestamp,
    }
    # Errors are optional in the serialized form — only emit when present.
    if report.errors:
        payload["errors"] = report.errors
    return payload


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Build the CLI parser and parse argv (or sys.argv when None)."""
    parser = argparse.ArgumentParser(
        description="Migration Monitor - Post-migration SEO monitoring and alerting",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""\
Examples:
  python migration_monitor.py --domain https://new-example.com --migration-date 2025-01-15 --baseline baseline.json --json
  python migration_monitor.py --domain https://new-example.com --migration-date 2025-01-15 --json
""",
    )
    parser.add_argument(
        "--domain",
        required=True,
        help="Domain to monitor (post-migration URL)",
    )
    parser.add_argument(
        "--migration-date",
        required=True,
        help="Migration date in YYYY-MM-DD format",
    )
    parser.add_argument(
        "--baseline",
        type=str,
        default=None,
        help="Path to baseline JSON file from migration_planner.py",
    )
    parser.add_argument(
        "--type",
        choices=["domain-move", "platform", "url-restructure", "https", "subdomain"],
        default="domain-move",
        help="Migration type for recovery estimation (default: domain-move)",
    )
    parser.add_argument(
        "--json",
        action="store_true",
        default=False,
        help="Output in JSON format",
    )
    parser.add_argument(
        "--output",
        type=str,
        default=None,
        help="Save output to file path",
    )
    return parser.parse_args(argv)


async def async_main(args: argparse.Namespace) -> None:
    """Run the monitor and emit the report to stdout or a file."""
    monitor = MigrationMonitor()
    report = await monitor.run(
        domain=args.domain,
        migration_date=args.migration_date,
        baseline_file=args.baseline,
        migration_type=args.type,
    )

    if args.json:
        rendered = json.dumps(_serialize_report(report), indent=2, ensure_ascii=False)
    else:
        rendered = _format_text_report(report)

    if args.output:
        with open(args.output, "w", encoding="utf-8") as f:
            f.write(rendered)
        logger.info(f"Migration report saved to {args.output}")
    else:
        print(rendered)

    monitor.print_stats()


def main() -> None:
    """CLI entry point: parse args and drive the async pipeline."""
    cli_args = parse_args()
    asyncio.run(async_main(cli_args))


if __name__ == "__main__":
    main()