Files
our-claude-skills/custom-skills/33-seo-migration-planner/code/scripts/migration_planner.py
Andrew Yim d2d0a2d460 Add SEO skills 33-34 and fix bugs in skills 19-34
New skills:
- Skill 33: Site migration planner with redirect mapping and monitoring
- Skill 34: Reporting dashboard with HTML charts and Korean executive reports

Bug fixes (Skill 34 - report_aggregator.py):
- Add audit_type fallback for skill identification (was only using audit_id prefix)
- Extract health scores from nested data dict (technical_score, onpage_score, etc.)
- Support subdomain matching in domain filter (blog.ourdigital.org matches ourdigital.org); see the sketch after this list
- Skip self-referencing DASH- aggregated reports
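
A minimal sketch of the suffix check this fix implies (helper name and signature are hypothetical, not the actual report_aggregator.py code):

    def domain_matches(host: str, domain: str) -> bool:
        # blog.ourdigital.org matches ourdigital.org; ourdigital.org.evil.com does not
        host, domain = host.lower(), domain.lower()
        return host == domain or host.endswith("." + domain)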

Bug fixes (Skill 20 - naver_serp_analyzer.py):
- Remove VIEW tab selectors (removed by Naver in 2026)
- Add new section detectors: books (도서), shortform (숏폼), influencer (인플루언서)

Improvements (Skill 34 - dashboard/executive report):
- Add Korean category labels for Chart.js charts (기술 SEO, 온페이지, etc.)
- Add Korean trend labels (개선 중 ↑, 안정 →, 하락 중 ↓); see the label-map sketch after this list
- Add English→Korean issue description translation layer (20 common patterns)
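
A minimal sketch of the label maps these changes imply (dict names and English keys are hypothetical; the Korean values are the ones listed above):

    CATEGORY_LABELS_KO = {"technical": "기술 SEO", "onpage": "온페이지"}
    TREND_LABELS_KO = {"improving": "개선 중 ↑", "stable": "안정 →", "declining": "하락 중 ↓"}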

Documentation improvements:
- Add Korean triggers to 4 skill descriptions (19, 25, 28, 31)
- Expand Skill 32 SKILL.md from 40→143 lines (previously scored 6/10; added workflow, output format, and limitations sections)
- Add output format examples to Skills 27 and 28 SKILL.md
- Add limitations sections to Skills 27 and 28
- Update README.md, CLAUDE.md, AGENTS.md for skills 33-34

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 00:01:00 +09:00

755 lines · 32 KiB · Python

"""
Migration Planner - SEO Site Migration Planning
================================================
Purpose: Pre-migration risk assessment, redirect mapping, URL inventory,
crawl baseline capture, and checklist generation for site migrations.
Python: 3.10+
Usage:
python migration_planner.py --domain https://example.com --type domain-move --new-domain https://new-example.com --json
python migration_planner.py --domain https://example.com --type platform --json
python migration_planner.py --domain https://example.com --type url-restructure --json
python migration_planner.py --domain http://example.com --type https --json
python migration_planner.py --domain https://blog.example.com --type subdomain --new-domain https://example.com/blog --json
"""
import argparse
import asyncio
import json
import logging
import sys
from dataclasses import dataclass, field, asdict
from datetime import datetime
from typing import Any
from urllib.parse import urlparse
from base_client import BaseAsyncClient
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
@dataclass
class MigrationURL:
"""A single URL in the migration inventory with associated metrics."""
url: str = ""
traffic: int = 0
keywords: int = 0
backlinks: int = 0
risk_score: float = 0.0
redirect_target: str = ""
status_code: int = 200
priority: str = "low" # critical / high / medium / low
@dataclass
class MigrationBaseline:
"""Pre-migration baseline snapshot of the site."""
domain: str = ""
total_urls: int = 0
total_traffic: int = 0
total_keywords: int = 0
total_referring_domains: int = 0
top_pages: list[dict[str, Any]] = field(default_factory=list)
url_inventory: list[MigrationURL] = field(default_factory=list)
@dataclass
class RedirectMap:
"""A single redirect mapping entry."""
source: str = ""
target: str = ""
status_code: int = 301
priority: str = "low" # critical / high / medium / low
risk_score: float = 0.0
@dataclass
class RiskAssessment:
"""Aggregated risk assessment for the migration."""
high_risk_urls: int = 0
medium_risk_urls: int = 0
low_risk_urls: int = 0
overall_risk: str = "low" # critical / high / medium / low
top_risk_urls: list[dict[str, Any]] = field(default_factory=list)
risk_factors: list[str] = field(default_factory=list)
@dataclass
class MigrationPlan:
"""Complete migration plan output."""
migration_type: str = ""
domain: str = ""
new_domain: str = ""
baseline: MigrationBaseline | None = None
redirect_map: list[RedirectMap] = field(default_factory=list)
risk_assessment: RiskAssessment | None = None
pre_migration_checklist: list[dict[str, Any]] = field(default_factory=list)
timestamp: str = ""
errors: list[str] = field(default_factory=list)
# ---------------------------------------------------------------------------
# Migration types
# ---------------------------------------------------------------------------
MIGRATION_TYPES = {
"domain-move": "Domain Move (old domain -> new domain)",
"platform": "Platform Change (CMS/framework migration)",
"url-restructure": "URL Restructuring (path/slug changes)",
"https": "HTTPS Migration (HTTP -> HTTPS)",
"subdomain": "Subdomain Consolidation (subdomain -> subfolder)",
}
# ---------------------------------------------------------------------------
# Planner
# ---------------------------------------------------------------------------
class MigrationPlanner(BaseAsyncClient):
"""Plans site migrations using Firecrawl for crawling and Ahrefs for SEO data."""
def __init__(self):
super().__init__(max_concurrent=5, requests_per_second=2.0)
    @staticmethod
    def _extract_domain(url: str) -> str:
        """Extract the bare domain from a URL, or return it as-is if already bare."""
        if "://" in url:
            url = urlparse(url).netloc
        return url.lower().removeprefix("www.")
@staticmethod
def _normalize_url(url: str) -> str:
"""Ensure URL has a scheme."""
if not url.startswith(("http://", "https://")):
return f"https://{url}"
return url
# ------------------------------------------------------------------
# MCP wrappers (return dicts; Claude MCP bridge fills these)
# ------------------------------------------------------------------
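    # Expected bridge response shapes (inferred from the parsing in this file;
    # until the bridge is wired up, the stubs below return an empty "data" dict):
    #   firecrawl_crawl         -> {"data": {"pages": [{"url": ..., "status_code": 200}, ...]}}
    #   site-explorer-top-pages -> {"data": {"pages": [{"url": ..., "traffic": 1200,
    #                              "keywords": 45, "top_keyword": ..., "position": 3}, ...]}}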
async def _call_ahrefs(self, tool: str, params: dict[str, Any]) -> dict:
"""Simulate Ahrefs MCP call. In production, routed via MCP bridge."""
self.logger.info(f"Ahrefs MCP call: {tool} | params={params}")
return {"tool": tool, "params": params, "data": {}}
async def _call_firecrawl(self, tool: str, params: dict[str, Any]) -> dict:
"""Simulate Firecrawl MCP call. In production, routed via MCP bridge."""
self.logger.info(f"Firecrawl MCP call: {tool} | params={params}")
return {"tool": tool, "params": params, "data": {}}
# ------------------------------------------------------------------
# URL Inventory
# ------------------------------------------------------------------
async def crawl_url_inventory(self, domain: str) -> list[MigrationURL]:
"""Crawl the site via Firecrawl to capture all URLs and status codes."""
url = self._normalize_url(domain)
self.logger.info(f"Crawling URL inventory for {url}")
resp = await self._call_firecrawl(
"firecrawl_crawl",
{"url": url, "limit": 5000, "scrapeOptions": {"formats": ["links"]}},
)
crawl_data = resp.get("data", {})
pages = crawl_data.get("pages", [])
inventory: list[MigrationURL] = []
for page in pages:
            migration_url = MigrationURL(
                url=page.get("url", ""),
                status_code=int(page.get("status_code") or 200),
            )
inventory.append(migration_url)
if not inventory:
# Fallback: create a single entry for the domain
inventory.append(MigrationURL(url=url, status_code=200))
self.logger.warning(
"Firecrawl returned no pages; created placeholder entry. "
"Verify Firecrawl MCP is configured."
)
else:
self.logger.info(f"Crawled {len(inventory)} URLs from {domain}")
return inventory
# ------------------------------------------------------------------
# Ahrefs Baseline
# ------------------------------------------------------------------
async def fetch_top_pages_baseline(
self, domain: str, limit: int = 500
) -> list[dict[str, Any]]:
"""Fetch top pages with traffic and keyword data from Ahrefs."""
domain = self._extract_domain(domain)
self.logger.info(f"Fetching top pages baseline for {domain}")
resp = await self._call_ahrefs(
"site-explorer-top-pages",
{"target": domain, "limit": limit},
)
pages_raw = resp.get("data", {}).get("pages", [])
top_pages: list[dict[str, Any]] = []
for page in pages_raw:
top_pages.append({
"url": page.get("url", ""),
"traffic": int(page.get("traffic", 0)),
"keywords": int(page.get("keywords", 0)),
"top_keyword": page.get("top_keyword", ""),
"position": int(page.get("position", 0)),
})
self.logger.info(f"Fetched {len(top_pages)} top pages for {domain}")
return top_pages
async def fetch_site_metrics(self, domain: str) -> dict[str, Any]:
"""Fetch overall site metrics from Ahrefs."""
domain = self._extract_domain(domain)
metrics_resp = await self._call_ahrefs(
"site-explorer-metrics", {"target": domain}
)
metrics = metrics_resp.get("data", {})
backlinks_resp = await self._call_ahrefs(
"site-explorer-backlinks-stats", {"target": domain}
)
backlinks = backlinks_resp.get("data", {})
return {
"organic_traffic": int(metrics.get("organic_traffic", 0)),
"organic_keywords": int(metrics.get("organic_keywords", 0)),
"referring_domains": int(backlinks.get("referring_domains", 0)),
}
async def fetch_page_backlinks(self, url: str) -> int:
"""Fetch backlink count for a specific URL."""
resp = await self._call_ahrefs(
"site-explorer-backlinks-stats", {"target": url}
)
return int(resp.get("data", {}).get("referring_domains", 0))
async def fetch_page_keywords(self, url: str) -> list[dict[str, Any]]:
"""Fetch keyword rankings for a specific URL."""
resp = await self._call_ahrefs(
"site-explorer-organic-keywords",
{"target": url, "limit": 100},
)
return resp.get("data", {}).get("keywords", [])
# ------------------------------------------------------------------
# Risk Assessment
# ------------------------------------------------------------------
def assess_url_risk(self, url_data: MigrationURL) -> float:
"""Score risk for a single URL based on traffic, backlinks, and keywords.
Risk score 0-100:
- Traffic weight: 40% (high traffic = high risk if migration fails)
- Backlinks weight: 30% (external links break if redirect fails)
- Keywords weight: 30% (ranking loss risk)
"""
        # Normalize each factor to 0-100 (negative inputs clamp to 0):
        #   traffic:   1,000+ monthly visits -> max traffic score
        #   backlinks: 50+ referring domains -> max backlink score
        #   keywords:  20+ keyword rankings  -> max keyword score
        traffic_score = min(max(url_data.traffic, 0) / 1000 * 100, 100)
        backlinks_score = min(max(url_data.backlinks, 0) / 50 * 100, 100)
        keywords_score = min(max(url_data.keywords, 0) / 20 * 100, 100)
risk = (
traffic_score * 0.40
+ backlinks_score * 0.30
+ keywords_score * 0.30
)
return round(min(max(risk, 0), 100), 1)
def classify_priority(self, risk_score: float) -> str:
"""Classify URL priority based on risk score."""
if risk_score >= 75:
return "critical"
elif risk_score >= 50:
return "high"
elif risk_score >= 25:
return "medium"
else:
return "low"
# ------------------------------------------------------------------
# Redirect Map
# ------------------------------------------------------------------
def generate_redirect_map(
self,
url_inventory: list[MigrationURL],
migration_type: str,
new_domain: str | None = None,
) -> list[RedirectMap]:
"""Generate redirect mappings based on migration type."""
redirect_map: list[RedirectMap] = []
for url_entry in url_inventory:
source = url_entry.url
if not source:
continue
parsed = urlparse(source)
path = parsed.path
# Determine target URL based on migration type
if migration_type == "domain-move" and new_domain:
new_parsed = urlparse(self._normalize_url(new_domain))
target = f"{new_parsed.scheme}://{new_parsed.netloc}{path}"
            elif migration_type == "https":
                # Only swap the scheme, not any later occurrence in the URL
                target = source.replace("http://", "https://", 1)
elif migration_type == "subdomain" and new_domain:
# e.g., blog.example.com/page -> example.com/blog/page
new_parsed = urlparse(self._normalize_url(new_domain))
target = f"{new_parsed.scheme}://{new_parsed.netloc}{new_parsed.path.rstrip('/')}{path}"
elif migration_type == "url-restructure":
# Placeholder: URL restructuring requires custom mapping rules
# In practice, user provides a mapping CSV or pattern
target = source # Will need manual mapping
elif migration_type == "platform":
# Platform change: URLs may stay the same or change
target = source # Will need verification post-migration
else:
target = source
redirect_entry = RedirectMap(
source=source,
target=target,
status_code=301,
priority=url_entry.priority,
risk_score=url_entry.risk_score,
)
redirect_map.append(redirect_entry)
# Sort by risk score descending (highest risk first)
redirect_map.sort(key=lambda r: r.risk_score, reverse=True)
self.logger.info(
f"Generated {len(redirect_map)} redirect mappings "
f"for {migration_type} migration"
)
return redirect_map
# ------------------------------------------------------------------
# Checklist
# ------------------------------------------------------------------
def generate_checklist(self, migration_type: str) -> list[dict[str, Any]]:
"""Generate pre-migration checklist based on migration type."""
# Common checklist items for all migration types
common_items = [
{"step": 1, "category": "Baseline", "task": "URL 인벤토리 크롤링 완료", "description": "Firecrawl로 전체 URL 목록 및 상태 코드 캡처", "status": "pending"},
{"step": 2, "category": "Baseline", "task": "트래픽 베이스라인 캡처", "description": "Ahrefs에서 페이지별 트래픽, 키워드, 백링크 데이터 수집", "status": "pending"},
{"step": 3, "category": "Baseline", "task": "Google Search Console 데이터 내보내기", "description": "현재 인덱싱 상태, 사이트맵 현황, 크롤 통계 기록", "status": "pending"},
{"step": 4, "category": "Baseline", "task": "Google Analytics 벤치마크 저장", "description": "이전 전 30일/90일 트래픽 데이터 스냅샷 저장", "status": "pending"},
{"step": 5, "category": "Redirects", "task": "Redirect 맵 생성", "description": "모든 URL에 대한 301 리디렉트 매핑 완료", "status": "pending"},
{"step": 6, "category": "Redirects", "task": "고위험 URL 우선 검증", "description": "트래픽/백링크 기준 상위 URL 리디렉트 수동 확인", "status": "pending"},
{"step": 7, "category": "Technical", "task": "robots.txt 업데이트 준비", "description": "새 도메인/구조에 맞는 robots.txt 작성", "status": "pending"},
{"step": 8, "category": "Technical", "task": "XML 사이트맵 업데이트 준비", "description": "새 URL 구조 반영한 사이트맵 생성", "status": "pending"},
{"step": 9, "category": "Technical", "task": "Canonical 태그 업데이트 계획", "description": "모든 페이지의 canonical URL이 새 주소를 가리키도록 변경", "status": "pending"},
{"step": 10, "category": "Technical", "task": "Internal link 업데이트 계획", "description": "사이트 내부 링크가 새 URL을 직접 가리키도록 변경", "status": "pending"},
{"step": 11, "category": "Monitoring", "task": "모니터링 대시보드 설정", "description": "이전 후 트래픽, 인덱싱, 리디렉트 상태 모니터링 준비", "status": "pending"},
{"step": 12, "category": "Monitoring", "task": "알림 임계값 설정", "description": "트래픽 20% 이상 하락 시 알림 설정", "status": "pending"},
]
# Type-specific items
type_specific: dict[str, list[dict[str, Any]]] = {
"domain-move": [
{"step": 13, "category": "Domain", "task": "새 도메인 DNS 설정", "description": "DNS A/CNAME 레코드 설정 및 전파 확인", "status": "pending"},
{"step": 14, "category": "Domain", "task": "Google Search Console에 새 도메인 등록", "description": "새 도메인 속성 추가 및 소유권 확인", "status": "pending"},
{"step": 15, "category": "Domain", "task": "도메인 변경 알림 (GSC Change of Address)", "description": "Search Console에서 주소 변경 도구 실행", "status": "pending"},
{"step": 16, "category": "Domain", "task": "SSL 인증서 설치", "description": "새 도메인에 유효한 SSL 인증서 설치", "status": "pending"},
],
"platform": [
{"step": 13, "category": "Platform", "task": "URL 구조 매핑 확인", "description": "새 플랫폼에서 동일한 URL 구조 유지 여부 확인", "status": "pending"},
{"step": 14, "category": "Platform", "task": "메타 태그 이전 확인", "description": "Title, Description, Open Graph 태그 동일 여부 확인", "status": "pending"},
{"step": 15, "category": "Platform", "task": "구조화된 데이터 이전", "description": "JSON-LD Schema Markup 동일 여부 확인", "status": "pending"},
{"step": 16, "category": "Platform", "task": "스테이징 환경 테스트", "description": "스테이징에서 전체 크롤링 및 리디렉트 테스트 실행", "status": "pending"},
],
"url-restructure": [
{"step": 13, "category": "URL", "task": "URL 패턴 매핑 문서화", "description": "기존 → 신규 URL 패턴 규칙 문서화", "status": "pending"},
{"step": 14, "category": "URL", "task": "정규식 리디렉트 규칙 작성", "description": "서버 레벨 리디렉트 규칙 (nginx/Apache) 작성", "status": "pending"},
{"step": 15, "category": "URL", "task": "Breadcrumb 업데이트", "description": "새 URL 구조에 맞게 Breadcrumb 네비게이션 수정", "status": "pending"},
],
"https": [
{"step": 13, "category": "HTTPS", "task": "SSL 인증서 설치 및 확인", "description": "유효한 SSL 인증서 설치 (Let's Encrypt 또는 상용 인증서)", "status": "pending"},
{"step": 14, "category": "HTTPS", "task": "Mixed Content 점검", "description": "HTTP로 로드되는 리소스 (이미지, CSS, JS) 식별 및 수정", "status": "pending"},
{"step": 15, "category": "HTTPS", "task": "HSTS 헤더 설정", "description": "Strict-Transport-Security 헤더 활성화", "status": "pending"},
],
"subdomain": [
{"step": 13, "category": "Subdomain", "task": "서브도메인 → 서브폴더 매핑", "description": "서브도메인 경로를 서브폴더 경로로 매핑", "status": "pending"},
{"step": 14, "category": "Subdomain", "task": "서버 리디렉트 규칙 설정", "description": "서브도메인에서 메인 도메인으로의 301 리디렉트 규칙", "status": "pending"},
{"step": 15, "category": "Subdomain", "task": "DNS 설정 업데이트", "description": "서브도메인 DNS 레코드 유지 (리디렉트용)", "status": "pending"},
],
}
checklist = common_items.copy()
if migration_type in type_specific:
checklist.extend(type_specific[migration_type])
self.logger.info(
f"Generated {len(checklist)} checklist items for {migration_type} migration"
)
return checklist
# ------------------------------------------------------------------
# Orchestrator
# ------------------------------------------------------------------
async def run(
self,
domain: str,
migration_type: str,
new_domain: str | None = None,
) -> MigrationPlan:
"""Orchestrate full migration planning pipeline."""
timestamp = datetime.now().isoformat()
plan = MigrationPlan(
migration_type=migration_type,
domain=self._extract_domain(domain),
new_domain=self._extract_domain(new_domain) if new_domain else "",
timestamp=timestamp,
)
try:
# Step 1: Crawl URL inventory
self.logger.info("Step 1/6: Crawling URL inventory via Firecrawl...")
url_inventory = await self.crawl_url_inventory(domain)
# Step 2: Fetch Ahrefs baseline
self.logger.info("Step 2/6: Fetching Ahrefs top pages baseline...")
top_pages = await self.fetch_top_pages_baseline(domain)
site_metrics = await self.fetch_site_metrics(domain)
# Step 3: Enrich URL inventory with Ahrefs data
self.logger.info("Step 3/6: Enriching URLs with traffic/backlink data...")
top_pages_map: dict[str, dict] = {}
for page in top_pages:
page_url = page.get("url", "")
if page_url:
top_pages_map[page_url] = page
for url_entry in url_inventory:
page_data = top_pages_map.get(url_entry.url, {})
url_entry.traffic = int(page_data.get("traffic", 0))
url_entry.keywords = int(page_data.get("keywords", 0))
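            # Note: MigrationURL.backlinks is left at 0 here, which zeroes the
            # 30% backlink weight in assess_url_risk(); fetch_page_backlinks()
            # can enrich high-priority URLs if per-URL Ahrefs calls are acceptable.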
# Step 4: Risk assessment per URL
self.logger.info("Step 4/6: Scoring risk per URL...")
for url_entry in url_inventory:
url_entry.risk_score = self.assess_url_risk(url_entry)
url_entry.priority = self.classify_priority(url_entry.risk_score)
# Build baseline
baseline = MigrationBaseline(
domain=self._extract_domain(domain),
total_urls=len(url_inventory),
total_traffic=site_metrics.get("organic_traffic", 0),
total_keywords=site_metrics.get("organic_keywords", 0),
total_referring_domains=site_metrics.get("referring_domains", 0),
top_pages=top_pages[:50], # Store top 50 for reference
url_inventory=url_inventory,
)
plan.baseline = baseline
# Step 5: Generate redirect map
self.logger.info("Step 5/6: Generating redirect map...")
plan.redirect_map = self.generate_redirect_map(
url_inventory, migration_type, new_domain
)
            # Build risk assessment summary; the "high" bucket uses the 75+
            # ("critical" priority) threshold, medium is 25-74, low is <25
            high_risk = sum(1 for u in url_inventory if u.risk_score >= 75)
            medium_risk = sum(1 for u in url_inventory if 25 <= u.risk_score < 75)
            low_risk = sum(1 for u in url_inventory if u.risk_score < 25)
# Determine overall risk level
if high_risk > len(url_inventory) * 0.2:
overall_risk = "critical"
elif high_risk > len(url_inventory) * 0.1:
overall_risk = "high"
elif medium_risk > len(url_inventory) * 0.3:
overall_risk = "medium"
else:
overall_risk = "low"
# Top risk URLs
sorted_urls = sorted(url_inventory, key=lambda u: u.risk_score, reverse=True)
top_risk = [
{
"url": u.url,
"risk_score": u.risk_score,
"traffic": u.traffic,
"keywords": u.keywords,
"backlinks": u.backlinks,
}
for u in sorted_urls[:20]
]
# Risk factors
risk_factors: list[str] = []
if high_risk > 0:
risk_factors.append(
f"{high_risk}개 고위험 URL (트래픽/백링크 손실 위험)"
)
if baseline.total_traffic > 10000:
risk_factors.append(
f"월간 오가닉 트래픽 {baseline.total_traffic:,}회 — 이전 실패 시 큰 영향"
)
if baseline.total_referring_domains > 500:
risk_factors.append(
f"참조 도메인 {baseline.total_referring_domains:,}개 — 리디렉트 누락 시 링크 에퀴티 손실"
)
if migration_type == "domain-move":
risk_factors.append(
"도메인 변경은 가장 위험한 이전 유형 — 최소 3-6개월 회복 예상"
)
elif migration_type == "url-restructure":
risk_factors.append(
"URL 구조 변경 시 모든 내부/외부 링크 영향 — 정규식 리디렉트 필수"
)
plan.risk_assessment = RiskAssessment(
high_risk_urls=high_risk,
medium_risk_urls=medium_risk,
low_risk_urls=low_risk,
overall_risk=overall_risk,
top_risk_urls=top_risk,
risk_factors=risk_factors,
)
# Step 6: Generate checklist
self.logger.info("Step 6/6: Generating pre-migration checklist...")
plan.pre_migration_checklist = self.generate_checklist(migration_type)
self.logger.info(
f"Migration plan complete: {len(url_inventory)} URLs inventoried, "
f"{len(plan.redirect_map)} redirects mapped, "
f"overall risk: {overall_risk}"
)
except Exception as e:
msg = f"Migration planning pipeline error: {e}"
self.logger.error(msg)
plan.errors.append(msg)
return plan
# ---------------------------------------------------------------------------
# Output helpers
# ---------------------------------------------------------------------------
def _format_text_report(plan: MigrationPlan) -> str:
"""Format migration plan as human-readable text report."""
lines: list[str] = []
lines.append("=" * 70)
lines.append(" SEO MIGRATION PLAN")
lines.append(f" Domain: {plan.domain}")
if plan.new_domain:
lines.append(f" New Domain: {plan.new_domain}")
lines.append(f" Migration Type: {MIGRATION_TYPES.get(plan.migration_type, plan.migration_type)}")
lines.append(f" Generated: {plan.timestamp}")
lines.append("=" * 70)
if plan.baseline:
b = plan.baseline
lines.append("")
lines.append("--- BASELINE ---")
lines.append(f" Total URLs: {b.total_urls:,}")
lines.append(f" Organic Traffic: {b.total_traffic:,}")
lines.append(f" Organic Keywords: {b.total_keywords:,}")
lines.append(f" Referring Domains: {b.total_referring_domains:,}")
if plan.risk_assessment:
r = plan.risk_assessment
lines.append("")
lines.append("--- RISK ASSESSMENT ---")
lines.append(f" Overall Risk: {r.overall_risk.upper()}")
lines.append(f" High Risk URLs: {r.high_risk_urls:,}")
lines.append(f" Medium Risk: {r.medium_risk_urls:,}")
lines.append(f" Low Risk: {r.low_risk_urls:,}")
if r.risk_factors:
lines.append("")
lines.append(" Risk Factors:")
for factor in r.risk_factors:
lines.append(f" - {factor}")
if r.top_risk_urls:
lines.append("")
lines.append(" Top Risk URLs:")
for url_info in r.top_risk_urls[:10]:
lines.append(
f" [{url_info['risk_score']:.0f}] {url_info['url']} "
f"(traffic={url_info['traffic']:,}, kw={url_info['keywords']})"
)
if plan.redirect_map:
lines.append("")
lines.append(f"--- REDIRECT MAP ({len(plan.redirect_map)} entries) ---")
# Show top 20 by risk
for i, rmap in enumerate(plan.redirect_map[:20], 1):
lines.append(
f" {i:>3}. [{rmap.priority.upper():>8}] "
f"{rmap.source} -> {rmap.target}"
)
if len(plan.redirect_map) > 20:
lines.append(f" ... and {len(plan.redirect_map) - 20} more entries")
if plan.pre_migration_checklist:
lines.append("")
lines.append("--- PRE-MIGRATION CHECKLIST ---")
for item in plan.pre_migration_checklist:
status_marker = "[ ]" if item["status"] == "pending" else "[x]"
lines.append(
f" {status_marker} Step {item['step']}: {item['task']}"
)
lines.append(f" {item['description']}")
if plan.errors:
lines.append("")
lines.append("--- ERRORS ---")
for err in plan.errors:
lines.append(f" - {err}")
lines.append("")
lines.append("=" * 70)
return "\n".join(lines)
def _serialize_plan(plan: MigrationPlan) -> dict:
"""Convert plan to JSON-serializable dict."""
output: dict[str, Any] = {
"domain": plan.domain,
"new_domain": plan.new_domain,
"migration_type": plan.migration_type,
"baseline": None,
"redirect_map": [asdict(r) for r in plan.redirect_map],
"risk_assessment": asdict(plan.risk_assessment) if plan.risk_assessment else None,
"pre_migration_checklist": plan.pre_migration_checklist,
"timestamp": plan.timestamp,
}
if plan.baseline:
output["baseline"] = {
"domain": plan.baseline.domain,
"total_urls": plan.baseline.total_urls,
"total_traffic": plan.baseline.total_traffic,
"total_keywords": plan.baseline.total_keywords,
"total_referring_domains": plan.baseline.total_referring_domains,
"top_pages": plan.baseline.top_pages,
"url_inventory": [asdict(u) for u in plan.baseline.url_inventory],
}
if plan.errors:
output["errors"] = plan.errors
return output
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="SEO Migration Planner - Pre-migration risk assessment and redirect mapping",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""\
Examples:
python migration_planner.py --domain https://example.com --type domain-move --new-domain https://new-example.com --json
python migration_planner.py --domain https://example.com --type platform --json
python migration_planner.py --domain https://example.com --type url-restructure --json
python migration_planner.py --domain http://example.com --type https --json
python migration_planner.py --domain https://blog.example.com --type subdomain --new-domain https://example.com/blog --json
""",
)
parser.add_argument(
"--domain",
required=True,
help="Target website URL or domain to plan migration for",
)
parser.add_argument(
"--type",
required=True,
choices=["domain-move", "platform", "url-restructure", "https", "subdomain"],
help="Migration type",
)
parser.add_argument(
"--new-domain",
type=str,
default=None,
help="New domain/URL (required for domain-move and subdomain types)",
)
parser.add_argument(
"--json",
action="store_true",
default=False,
help="Output in JSON format",
)
parser.add_argument(
"--output",
type=str,
default=None,
help="Save output to file path",
)
return parser.parse_args(argv)
async def async_main(args: argparse.Namespace) -> None:
# Validate required arguments for specific types
if args.type in ("domain-move", "subdomain") and not args.new_domain:
logger.error(f"--new-domain is required for {args.type} migration type")
sys.exit(1)
planner = MigrationPlanner()
plan = await planner.run(
domain=args.domain,
migration_type=args.type,
new_domain=args.new_domain,
)
if args.json:
output_str = json.dumps(_serialize_plan(plan), indent=2, ensure_ascii=False)
else:
output_str = _format_text_report(plan)
if args.output:
with open(args.output, "w", encoding="utf-8") as f:
f.write(output_str)
logger.info(f"Migration plan saved to {args.output}")
else:
print(output_str)
planner.print_stats()
def main() -> None:
    # Give the module-level logger a handler when run as a script
    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(name)s: %(message)s")
    args = parse_args()
    asyncio.run(async_main(args))
if __name__ == "__main__":
main()