New skills: - Skill 33: Site migration planner with redirect mapping and monitoring - Skill 34: Reporting dashboard with HTML charts and Korean executive reports Bug fixes (Skill 34 - report_aggregator.py): - Add audit_type fallback for skill identification (was only using audit_id prefix) - Extract health scores from nested data dict (technical_score, onpage_score, etc.) - Support subdomain matching in domain filter (blog.ourdigital.org matches ourdigital.org) - Skip self-referencing DASH- aggregated reports Bug fixes (Skill 20 - naver_serp_analyzer.py): - Remove VIEW tab selectors (removed by Naver in 2026) - Add new section detectors: books (도서), shortform (숏폼), influencer (인플루언서) Improvements (Skill 34 - dashboard/executive report): - Add Korean category labels for Chart.js charts (기술 SEO, 온페이지, etc.) - Add Korean trend labels (개선 중 ↑, 안정 →, 하락 중 ↓) - Add English→Korean issue description translation layer (20 common patterns) Documentation improvements: - Add Korean triggers to 4 skill descriptions (19, 25, 28, 31) - Expand Skill 32 SKILL.md from 40→143 lines (was 6/10, added workflow, output format, limitations) - Add output format examples to Skills 27 and 28 SKILL.md - Add limitations sections to Skills 27 and 28 - Update README.md, CLAUDE.md, AGENTS.md for skills 33-34 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
755 lines
32 KiB
Python
755 lines
32 KiB
Python
"""
|
|
Migration Planner - SEO Site Migration Planning
|
|
================================================
|
|
Purpose: Pre-migration risk assessment, redirect mapping, URL inventory,
|
|
crawl baseline capture, and checklist generation for site migrations.
|
|
Python: 3.10+
|
|
|
|
Usage:
|
|
python migration_planner.py --domain https://example.com --type domain-move --new-domain https://new-example.com --json
|
|
python migration_planner.py --domain https://example.com --type platform --json
|
|
python migration_planner.py --domain https://example.com --type url-restructure --json
|
|
python migration_planner.py --domain http://example.com --type https --json
|
|
python migration_planner.py --domain https://blog.example.com --type subdomain --new-domain https://example.com/blog --json
|
|
"""
|
|
|
|
import argparse
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import sys
|
|
from dataclasses import dataclass, field, asdict
|
|
from datetime import datetime
|
|
from typing import Any
|
|
from urllib.parse import urlparse
|
|
|
|
from base_client import BaseAsyncClient, config
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Data classes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@dataclass
class MigrationURL:
    """A single URL in the migration inventory with associated metrics."""

    # Absolute URL as discovered by the baseline crawl.
    url: str = ""
    # Estimated monthly organic traffic (filled from Ahrefs top-pages data).
    traffic: int = 0
    # Number of organic keyword rankings for this URL (from Ahrefs).
    keywords: int = 0
    # Referring-domain count for this URL.
    backlinks: int = 0
    # Composite 0-100 score computed by MigrationPlanner.assess_url_risk.
    risk_score: float = 0.0
    # Destination this page should 301 to after migration (empty until mapped).
    redirect_target: str = ""
    # HTTP status observed during the crawl.
    status_code: int = 200
    priority: str = "low"  # critical / high / medium / low
|
|
|
|
|
|
@dataclass
class MigrationBaseline:
    """Pre-migration baseline snapshot of the site."""

    # Bare domain the baseline was captured for.
    domain: str = ""
    # Number of URLs discovered in the crawl inventory.
    total_urls: int = 0
    # Site-wide organic traffic total (Ahrefs).
    total_traffic: int = 0
    # Site-wide organic keyword count (Ahrefs).
    total_keywords: int = 0
    # Site-wide referring-domain count (Ahrefs).
    total_referring_domains: int = 0
    # Top pages by traffic; the orchestrator stores the first 50 entries.
    top_pages: list[dict[str, Any]] = field(default_factory=list)
    # Full per-URL inventory with risk scores attached.
    url_inventory: list[MigrationURL] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
class RedirectMap:
    """A single redirect mapping entry."""

    # Old URL to redirect from.
    source: str = ""
    # New URL to redirect to.
    target: str = ""
    # HTTP redirect status; the planner always emits permanent (301) redirects.
    status_code: int = 301
    priority: str = "low"  # critical / high / medium / low
    # Risk score copied from the source URL's inventory entry.
    risk_score: float = 0.0
|
|
|
|
|
|
@dataclass
class RiskAssessment:
    """Aggregated risk assessment for the migration."""

    # Count of URLs with risk_score >= 75.
    high_risk_urls: int = 0
    # Count of URLs with 25 <= risk_score < 75.
    medium_risk_urls: int = 0
    # Count of URLs with risk_score < 25.
    low_risk_urls: int = 0
    overall_risk: str = "low"  # critical / high / medium / low
    # Up to 20 highest-risk URLs, each as a summary dict.
    top_risk_urls: list[dict[str, Any]] = field(default_factory=list)
    # Human-readable (Korean) explanations of why the migration is risky.
    risk_factors: list[str] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
class MigrationPlan:
    """Complete migration plan output."""

    # One of the MIGRATION_TYPES keys.
    migration_type: str = ""
    # Current (source) domain.
    domain: str = ""
    # Destination domain; empty for types that do not change the domain.
    new_domain: str = ""
    # Baseline snapshot; None if the pipeline failed before capturing it.
    baseline: MigrationBaseline | None = None
    # Redirect mappings sorted by risk score, highest first.
    redirect_map: list[RedirectMap] = field(default_factory=list)
    # Aggregated risk summary; None if the pipeline failed early.
    risk_assessment: RiskAssessment | None = None
    # Checklist item dicts (step / category / task / description / status).
    pre_migration_checklist: list[dict[str, Any]] = field(default_factory=list)
    # ISO-8601 generation timestamp.
    timestamp: str = ""
    # Pipeline error messages (empty on success).
    errors: list[str] = field(default_factory=list)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Migration types
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Supported migration types (keys double as the CLI --type choices),
# mapped to human-readable labels used in the text report.
MIGRATION_TYPES = {
    "domain-move": "Domain Move (old domain -> new domain)",
    "platform": "Platform Change (CMS/framework migration)",
    "url-restructure": "URL Restructuring (path/slug changes)",
    "https": "HTTPS Migration (HTTP -> HTTPS)",
    "subdomain": "Subdomain Consolidation (subdomain -> subfolder)",
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Planner
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class MigrationPlanner(BaseAsyncClient):
    """Plans site migrations using Firecrawl for crawling and Ahrefs for SEO data."""

    def __init__(self):
        # Concurrency and rate limiting are delegated to BaseAsyncClient;
        # the planner issues a small number of MCP calls, so the limits
        # here are deliberately modest.
        super().__init__(max_concurrent=5, requests_per_second=2.0)
|
|
|
|
@staticmethod
|
|
def _extract_domain(url: str) -> str:
|
|
"""Extract bare domain from URL or return as-is if already bare."""
|
|
if "://" in url:
|
|
parsed = urlparse(url)
|
|
return parsed.netloc.lower().replace("www.", "")
|
|
return url.lower().replace("www.", "")
|
|
|
|
@staticmethod
|
|
def _normalize_url(url: str) -> str:
|
|
"""Ensure URL has a scheme."""
|
|
if not url.startswith(("http://", "https://")):
|
|
return f"https://{url}"
|
|
return url
|
|
|
|
# ------------------------------------------------------------------
|
|
# MCP wrappers (return dicts; Claude MCP bridge fills these)
|
|
# ------------------------------------------------------------------
|
|
|
|
async def _call_ahrefs(self, tool: str, params: dict[str, Any]) -> dict:
|
|
"""Simulate Ahrefs MCP call. In production, routed via MCP bridge."""
|
|
self.logger.info(f"Ahrefs MCP call: {tool} | params={params}")
|
|
return {"tool": tool, "params": params, "data": {}}
|
|
|
|
async def _call_firecrawl(self, tool: str, params: dict[str, Any]) -> dict:
|
|
"""Simulate Firecrawl MCP call. In production, routed via MCP bridge."""
|
|
self.logger.info(f"Firecrawl MCP call: {tool} | params={params}")
|
|
return {"tool": tool, "params": params, "data": {}}
|
|
|
|
# ------------------------------------------------------------------
|
|
# URL Inventory
|
|
# ------------------------------------------------------------------
|
|
|
|
async def crawl_url_inventory(self, domain: str) -> list[MigrationURL]:
|
|
"""Crawl the site via Firecrawl to capture all URLs and status codes."""
|
|
url = self._normalize_url(domain)
|
|
self.logger.info(f"Crawling URL inventory for {url}")
|
|
|
|
resp = await self._call_firecrawl(
|
|
"firecrawl_crawl",
|
|
{"url": url, "limit": 5000, "scrapeOptions": {"formats": ["links"]}},
|
|
)
|
|
|
|
crawl_data = resp.get("data", {})
|
|
pages = crawl_data.get("pages", [])
|
|
|
|
inventory: list[MigrationURL] = []
|
|
for page in pages:
|
|
migration_url = MigrationURL(
|
|
url=page.get("url", ""),
|
|
status_code=int(page.get("status_code", 200)),
|
|
)
|
|
inventory.append(migration_url)
|
|
|
|
if not inventory:
|
|
# Fallback: create a single entry for the domain
|
|
inventory.append(MigrationURL(url=url, status_code=200))
|
|
self.logger.warning(
|
|
"Firecrawl returned no pages; created placeholder entry. "
|
|
"Verify Firecrawl MCP is configured."
|
|
)
|
|
else:
|
|
self.logger.info(f"Crawled {len(inventory)} URLs from {domain}")
|
|
|
|
return inventory
|
|
|
|
# ------------------------------------------------------------------
|
|
# Ahrefs Baseline
|
|
# ------------------------------------------------------------------
|
|
|
|
async def fetch_top_pages_baseline(
|
|
self, domain: str, limit: int = 500
|
|
) -> list[dict[str, Any]]:
|
|
"""Fetch top pages with traffic and keyword data from Ahrefs."""
|
|
domain = self._extract_domain(domain)
|
|
self.logger.info(f"Fetching top pages baseline for {domain}")
|
|
|
|
resp = await self._call_ahrefs(
|
|
"site-explorer-top-pages",
|
|
{"target": domain, "limit": limit},
|
|
)
|
|
|
|
pages_raw = resp.get("data", {}).get("pages", [])
|
|
top_pages: list[dict[str, Any]] = []
|
|
for page in pages_raw:
|
|
top_pages.append({
|
|
"url": page.get("url", ""),
|
|
"traffic": int(page.get("traffic", 0)),
|
|
"keywords": int(page.get("keywords", 0)),
|
|
"top_keyword": page.get("top_keyword", ""),
|
|
"position": int(page.get("position", 0)),
|
|
})
|
|
|
|
self.logger.info(f"Fetched {len(top_pages)} top pages for {domain}")
|
|
return top_pages
|
|
|
|
async def fetch_site_metrics(self, domain: str) -> dict[str, Any]:
|
|
"""Fetch overall site metrics from Ahrefs."""
|
|
domain = self._extract_domain(domain)
|
|
|
|
metrics_resp = await self._call_ahrefs(
|
|
"site-explorer-metrics", {"target": domain}
|
|
)
|
|
metrics = metrics_resp.get("data", {})
|
|
|
|
backlinks_resp = await self._call_ahrefs(
|
|
"site-explorer-backlinks-stats", {"target": domain}
|
|
)
|
|
backlinks = backlinks_resp.get("data", {})
|
|
|
|
return {
|
|
"organic_traffic": int(metrics.get("organic_traffic", 0)),
|
|
"organic_keywords": int(metrics.get("organic_keywords", 0)),
|
|
"referring_domains": int(backlinks.get("referring_domains", 0)),
|
|
}
|
|
|
|
async def fetch_page_backlinks(self, url: str) -> int:
|
|
"""Fetch backlink count for a specific URL."""
|
|
resp = await self._call_ahrefs(
|
|
"site-explorer-backlinks-stats", {"target": url}
|
|
)
|
|
return int(resp.get("data", {}).get("referring_domains", 0))
|
|
|
|
async def fetch_page_keywords(self, url: str) -> list[dict[str, Any]]:
|
|
"""Fetch keyword rankings for a specific URL."""
|
|
resp = await self._call_ahrefs(
|
|
"site-explorer-organic-keywords",
|
|
{"target": url, "limit": 100},
|
|
)
|
|
return resp.get("data", {}).get("keywords", [])
|
|
|
|
# ------------------------------------------------------------------
|
|
# Risk Assessment
|
|
# ------------------------------------------------------------------
|
|
|
|
def assess_url_risk(self, url_data: MigrationURL) -> float:
|
|
"""Score risk for a single URL based on traffic, backlinks, and keywords.
|
|
|
|
Risk score 0-100:
|
|
- Traffic weight: 40% (high traffic = high risk if migration fails)
|
|
- Backlinks weight: 30% (external links break if redirect fails)
|
|
- Keywords weight: 30% (ranking loss risk)
|
|
"""
|
|
# Normalize each factor to 0-100
|
|
# Traffic: 1000+ monthly visits = high risk
|
|
traffic_score = min((url_data.traffic / 1000) * 100, 100) if url_data.traffic > 0 else 0
|
|
|
|
# Backlinks: 50+ referring domains = high risk
|
|
backlinks_score = min((url_data.backlinks / 50) * 100, 100) if url_data.backlinks > 0 else 0
|
|
|
|
# Keywords: 20+ rankings = high risk
|
|
keywords_score = min((url_data.keywords / 20) * 100, 100) if url_data.keywords > 0 else 0
|
|
|
|
risk = (
|
|
traffic_score * 0.40
|
|
+ backlinks_score * 0.30
|
|
+ keywords_score * 0.30
|
|
)
|
|
|
|
return round(min(max(risk, 0), 100), 1)
|
|
|
|
def classify_priority(self, risk_score: float) -> str:
|
|
"""Classify URL priority based on risk score."""
|
|
if risk_score >= 75:
|
|
return "critical"
|
|
elif risk_score >= 50:
|
|
return "high"
|
|
elif risk_score >= 25:
|
|
return "medium"
|
|
else:
|
|
return "low"
|
|
|
|
# ------------------------------------------------------------------
|
|
# Redirect Map
|
|
# ------------------------------------------------------------------
|
|
|
|
def generate_redirect_map(
|
|
self,
|
|
url_inventory: list[MigrationURL],
|
|
migration_type: str,
|
|
new_domain: str | None = None,
|
|
) -> list[RedirectMap]:
|
|
"""Generate redirect mappings based on migration type."""
|
|
redirect_map: list[RedirectMap] = []
|
|
|
|
for url_entry in url_inventory:
|
|
source = url_entry.url
|
|
if not source:
|
|
continue
|
|
|
|
parsed = urlparse(source)
|
|
path = parsed.path
|
|
|
|
# Determine target URL based on migration type
|
|
if migration_type == "domain-move" and new_domain:
|
|
new_parsed = urlparse(self._normalize_url(new_domain))
|
|
target = f"{new_parsed.scheme}://{new_parsed.netloc}{path}"
|
|
|
|
elif migration_type == "https":
|
|
target = source.replace("http://", "https://")
|
|
|
|
elif migration_type == "subdomain" and new_domain:
|
|
# e.g., blog.example.com/page -> example.com/blog/page
|
|
new_parsed = urlparse(self._normalize_url(new_domain))
|
|
target = f"{new_parsed.scheme}://{new_parsed.netloc}{new_parsed.path.rstrip('/')}{path}"
|
|
|
|
elif migration_type == "url-restructure":
|
|
# Placeholder: URL restructuring requires custom mapping rules
|
|
# In practice, user provides a mapping CSV or pattern
|
|
target = source # Will need manual mapping
|
|
|
|
elif migration_type == "platform":
|
|
# Platform change: URLs may stay the same or change
|
|
target = source # Will need verification post-migration
|
|
|
|
else:
|
|
target = source
|
|
|
|
redirect_entry = RedirectMap(
|
|
source=source,
|
|
target=target,
|
|
status_code=301,
|
|
priority=url_entry.priority,
|
|
risk_score=url_entry.risk_score,
|
|
)
|
|
redirect_map.append(redirect_entry)
|
|
|
|
# Sort by risk score descending (highest risk first)
|
|
redirect_map.sort(key=lambda r: r.risk_score, reverse=True)
|
|
|
|
self.logger.info(
|
|
f"Generated {len(redirect_map)} redirect mappings "
|
|
f"for {migration_type} migration"
|
|
)
|
|
return redirect_map
|
|
|
|
# ------------------------------------------------------------------
|
|
# Checklist
|
|
# ------------------------------------------------------------------
|
|
|
|
    def generate_checklist(self, migration_type: str) -> list[dict[str, Any]]:
        """Generate pre-migration checklist based on migration type.

        Returns a list of checklist-item dicts with step / category / task /
        description / status keys. Twelve common items (baseline capture,
        redirect mapping, technical prep, monitoring) are always included;
        type-specific items (steps 13+) are appended when the migration type
        defines any. Task and description text is in Korean by design (the
        report audience); all items start with status "pending".
        """
        # Common checklist items for all migration types
        common_items = [
            {"step": 1, "category": "Baseline", "task": "URL 인벤토리 크롤링 완료", "description": "Firecrawl로 전체 URL 목록 및 상태 코드 캡처", "status": "pending"},
            {"step": 2, "category": "Baseline", "task": "트래픽 베이스라인 캡처", "description": "Ahrefs에서 페이지별 트래픽, 키워드, 백링크 데이터 수집", "status": "pending"},
            {"step": 3, "category": "Baseline", "task": "Google Search Console 데이터 내보내기", "description": "현재 인덱싱 상태, 사이트맵 현황, 크롤 통계 기록", "status": "pending"},
            {"step": 4, "category": "Baseline", "task": "Google Analytics 벤치마크 저장", "description": "이전 전 30일/90일 트래픽 데이터 스냅샷 저장", "status": "pending"},
            {"step": 5, "category": "Redirects", "task": "Redirect 맵 생성", "description": "모든 URL에 대한 301 리디렉트 매핑 완료", "status": "pending"},
            {"step": 6, "category": "Redirects", "task": "고위험 URL 우선 검증", "description": "트래픽/백링크 기준 상위 URL 리디렉트 수동 확인", "status": "pending"},
            {"step": 7, "category": "Technical", "task": "robots.txt 업데이트 준비", "description": "새 도메인/구조에 맞는 robots.txt 작성", "status": "pending"},
            {"step": 8, "category": "Technical", "task": "XML 사이트맵 업데이트 준비", "description": "새 URL 구조 반영한 사이트맵 생성", "status": "pending"},
            {"step": 9, "category": "Technical", "task": "Canonical 태그 업데이트 계획", "description": "모든 페이지의 canonical URL이 새 주소를 가리키도록 변경", "status": "pending"},
            {"step": 10, "category": "Technical", "task": "Internal link 업데이트 계획", "description": "사이트 내부 링크가 새 URL을 직접 가리키도록 변경", "status": "pending"},
            {"step": 11, "category": "Monitoring", "task": "모니터링 대시보드 설정", "description": "이전 후 트래픽, 인덱싱, 리디렉트 상태 모니터링 준비", "status": "pending"},
            {"step": 12, "category": "Monitoring", "task": "알림 임계값 설정", "description": "트래픽 20% 이상 하락 시 알림 설정", "status": "pending"},
        ]

        # Type-specific items; step numbers continue from the common list (13+).
        type_specific: dict[str, list[dict[str, Any]]] = {
            "domain-move": [
                {"step": 13, "category": "Domain", "task": "새 도메인 DNS 설정", "description": "DNS A/CNAME 레코드 설정 및 전파 확인", "status": "pending"},
                {"step": 14, "category": "Domain", "task": "Google Search Console에 새 도메인 등록", "description": "새 도메인 속성 추가 및 소유권 확인", "status": "pending"},
                {"step": 15, "category": "Domain", "task": "도메인 변경 알림 (GSC Change of Address)", "description": "Search Console에서 주소 변경 도구 실행", "status": "pending"},
                {"step": 16, "category": "Domain", "task": "SSL 인증서 설치", "description": "새 도메인에 유효한 SSL 인증서 설치", "status": "pending"},
            ],
            "platform": [
                {"step": 13, "category": "Platform", "task": "URL 구조 매핑 확인", "description": "새 플랫폼에서 동일한 URL 구조 유지 여부 확인", "status": "pending"},
                {"step": 14, "category": "Platform", "task": "메타 태그 이전 확인", "description": "Title, Description, Open Graph 태그 동일 여부 확인", "status": "pending"},
                {"step": 15, "category": "Platform", "task": "구조화된 데이터 이전", "description": "JSON-LD Schema Markup 동일 여부 확인", "status": "pending"},
                {"step": 16, "category": "Platform", "task": "스테이징 환경 테스트", "description": "스테이징에서 전체 크롤링 및 리디렉트 테스트 실행", "status": "pending"},
            ],
            "url-restructure": [
                {"step": 13, "category": "URL", "task": "URL 패턴 매핑 문서화", "description": "기존 → 신규 URL 패턴 규칙 문서화", "status": "pending"},
                {"step": 14, "category": "URL", "task": "정규식 리디렉트 규칙 작성", "description": "서버 레벨 리디렉트 규칙 (nginx/Apache) 작성", "status": "pending"},
                {"step": 15, "category": "URL", "task": "Breadcrumb 업데이트", "description": "새 URL 구조에 맞게 Breadcrumb 네비게이션 수정", "status": "pending"},
            ],
            "https": [
                {"step": 13, "category": "HTTPS", "task": "SSL 인증서 설치 및 확인", "description": "유효한 SSL 인증서 설치 (Let's Encrypt 또는 상용 인증서)", "status": "pending"},
                {"step": 14, "category": "HTTPS", "task": "Mixed Content 점검", "description": "HTTP로 로드되는 리소스 (이미지, CSS, JS) 식별 및 수정", "status": "pending"},
                {"step": 15, "category": "HTTPS", "task": "HSTS 헤더 설정", "description": "Strict-Transport-Security 헤더 활성화", "status": "pending"},
            ],
            "subdomain": [
                {"step": 13, "category": "Subdomain", "task": "서브도메인 → 서브폴더 매핑", "description": "서브도메인 경로를 서브폴더 경로로 매핑", "status": "pending"},
                {"step": 14, "category": "Subdomain", "task": "서버 리디렉트 규칙 설정", "description": "서브도메인에서 메인 도메인으로의 301 리디렉트 규칙", "status": "pending"},
                {"step": 15, "category": "Subdomain", "task": "DNS 설정 업데이트", "description": "서브도메인 DNS 레코드 유지 (리디렉트용)", "status": "pending"},
            ],
        }

        # Start from the common steps and append any type-specific ones.
        # An unknown migration_type silently yields the common items only.
        checklist = common_items.copy()
        if migration_type in type_specific:
            checklist.extend(type_specific[migration_type])

        self.logger.info(
            f"Generated {len(checklist)} checklist items for {migration_type} migration"
        )
        return checklist
|
|
|
|
# ------------------------------------------------------------------
|
|
# Orchestrator
|
|
# ------------------------------------------------------------------
|
|
|
|
    async def run(
        self,
        domain: str,
        migration_type: str,
        new_domain: str | None = None,
    ) -> MigrationPlan:
        """Orchestrate full migration planning pipeline.

        Steps: (1) crawl URL inventory, (2) fetch Ahrefs baseline metrics,
        (3) enrich inventory URLs with traffic/keyword data, (4) score risk
        per URL, (5) build the redirect map and risk summary, (6) generate
        the pre-migration checklist.

        Any exception is caught and appended to plan.errors, so callers
        always receive a MigrationPlan (possibly partially populated).
        """
        timestamp = datetime.now().isoformat()
        plan = MigrationPlan(
            migration_type=migration_type,
            domain=self._extract_domain(domain),
            new_domain=self._extract_domain(new_domain) if new_domain else "",
            timestamp=timestamp,
        )

        try:
            # Step 1: Crawl URL inventory
            self.logger.info("Step 1/6: Crawling URL inventory via Firecrawl...")
            url_inventory = await self.crawl_url_inventory(domain)

            # Step 2: Fetch Ahrefs baseline
            self.logger.info("Step 2/6: Fetching Ahrefs top pages baseline...")
            top_pages = await self.fetch_top_pages_baseline(domain)
            site_metrics = await self.fetch_site_metrics(domain)

            # Step 3: Enrich URL inventory with Ahrefs data
            # NOTE(review): matching is by exact URL string; trailing-slash or
            # scheme differences between Firecrawl and Ahrefs URLs will miss —
            # confirm both sources normalize URLs the same way.
            self.logger.info("Step 3/6: Enriching URLs with traffic/backlink data...")
            top_pages_map: dict[str, dict] = {}
            for page in top_pages:
                page_url = page.get("url", "")
                if page_url:
                    top_pages_map[page_url] = page

            for url_entry in url_inventory:
                page_data = top_pages_map.get(url_entry.url, {})
                url_entry.traffic = int(page_data.get("traffic", 0))
                url_entry.keywords = int(page_data.get("keywords", 0))

            # Step 4: Risk assessment per URL
            self.logger.info("Step 4/6: Scoring risk per URL...")
            for url_entry in url_inventory:
                url_entry.risk_score = self.assess_url_risk(url_entry)
                url_entry.priority = self.classify_priority(url_entry.risk_score)

            # Build baseline
            baseline = MigrationBaseline(
                domain=self._extract_domain(domain),
                total_urls=len(url_inventory),
                total_traffic=site_metrics.get("organic_traffic", 0),
                total_keywords=site_metrics.get("organic_keywords", 0),
                total_referring_domains=site_metrics.get("referring_domains", 0),
                top_pages=top_pages[:50],  # Store top 50 for reference
                url_inventory=url_inventory,
            )
            plan.baseline = baseline

            # Step 5: Generate redirect map
            self.logger.info("Step 5/6: Generating redirect map...")
            plan.redirect_map = self.generate_redirect_map(
                url_inventory, migration_type, new_domain
            )

            # Build risk assessment summary.
            # NOTE(review): >= 75 is counted as "high risk" here although
            # classify_priority labels the same band "critical" — the tier
            # names are intentionally coarser at the summary level.
            high_risk = sum(1 for u in url_inventory if u.risk_score >= 75)
            medium_risk = sum(1 for u in url_inventory if 25 <= u.risk_score < 75)
            low_risk = sum(1 for u in url_inventory if u.risk_score < 25)

            # Determine overall risk level (share of high/medium-risk URLs;
            # url_inventory is never empty — crawl_url_inventory guarantees
            # at least a placeholder entry, so the ratios are safe).
            if high_risk > len(url_inventory) * 0.2:
                overall_risk = "critical"
            elif high_risk > len(url_inventory) * 0.1:
                overall_risk = "high"
            elif medium_risk > len(url_inventory) * 0.3:
                overall_risk = "medium"
            else:
                overall_risk = "low"

            # Top risk URLs
            sorted_urls = sorted(url_inventory, key=lambda u: u.risk_score, reverse=True)
            top_risk = [
                {
                    "url": u.url,
                    "risk_score": u.risk_score,
                    "traffic": u.traffic,
                    "keywords": u.keywords,
                    "backlinks": u.backlinks,
                }
                for u in sorted_urls[:20]
            ]

            # Risk factors (Korean, shown verbatim in reports)
            risk_factors: list[str] = []
            if high_risk > 0:
                risk_factors.append(
                    f"{high_risk}개 고위험 URL (트래픽/백링크 손실 위험)"
                )
            if baseline.total_traffic > 10000:
                risk_factors.append(
                    f"월간 오가닉 트래픽 {baseline.total_traffic:,}회 — 이전 실패 시 큰 영향"
                )
            if baseline.total_referring_domains > 500:
                risk_factors.append(
                    f"참조 도메인 {baseline.total_referring_domains:,}개 — 리디렉트 누락 시 링크 에퀴티 손실"
                )
            if migration_type == "domain-move":
                risk_factors.append(
                    "도메인 변경은 가장 위험한 이전 유형 — 최소 3-6개월 회복 예상"
                )
            elif migration_type == "url-restructure":
                risk_factors.append(
                    "URL 구조 변경 시 모든 내부/외부 링크 영향 — 정규식 리디렉트 필수"
                )

            plan.risk_assessment = RiskAssessment(
                high_risk_urls=high_risk,
                medium_risk_urls=medium_risk,
                low_risk_urls=low_risk,
                overall_risk=overall_risk,
                top_risk_urls=top_risk,
                risk_factors=risk_factors,
            )

            # Step 6: Generate checklist
            self.logger.info("Step 6/6: Generating pre-migration checklist...")
            plan.pre_migration_checklist = self.generate_checklist(migration_type)

            self.logger.info(
                f"Migration plan complete: {len(url_inventory)} URLs inventoried, "
                f"{len(plan.redirect_map)} redirects mapped, "
                f"overall risk: {overall_risk}"
            )

        except Exception as e:
            # Broad catch is deliberate: the CLI should always emit a plan
            # object (with errors recorded) rather than a raw traceback.
            msg = f"Migration planning pipeline error: {e}"
            self.logger.error(msg)
            plan.errors.append(msg)

        return plan
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Output helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _format_text_report(plan: MigrationPlan) -> str:
    """Format migration plan as human-readable text report.

    Renders header, baseline, risk assessment (top 10 risk URLs), redirect
    map (top 20 entries by risk), checklist, and any pipeline errors into a
    single newline-joined string. Sections are skipped when the plan has no
    data for them.
    """
    lines: list[str] = []
    lines.append("=" * 70)
    lines.append(" SEO MIGRATION PLAN")
    lines.append(f" Domain: {plan.domain}")
    if plan.new_domain:
        lines.append(f" New Domain: {plan.new_domain}")
    # Fall back to the raw type string if it is not a known MIGRATION_TYPES key.
    lines.append(f" Migration Type: {MIGRATION_TYPES.get(plan.migration_type, plan.migration_type)}")
    lines.append(f" Generated: {plan.timestamp}")
    lines.append("=" * 70)

    if plan.baseline:
        b = plan.baseline
        lines.append("")
        lines.append("--- BASELINE ---")
        lines.append(f" Total URLs: {b.total_urls:,}")
        lines.append(f" Organic Traffic: {b.total_traffic:,}")
        lines.append(f" Organic Keywords: {b.total_keywords:,}")
        lines.append(f" Referring Domains: {b.total_referring_domains:,}")

    if plan.risk_assessment:
        r = plan.risk_assessment
        lines.append("")
        lines.append("--- RISK ASSESSMENT ---")
        lines.append(f" Overall Risk: {r.overall_risk.upper()}")
        lines.append(f" High Risk URLs: {r.high_risk_urls:,}")
        lines.append(f" Medium Risk: {r.medium_risk_urls:,}")
        lines.append(f" Low Risk: {r.low_risk_urls:,}")
        if r.risk_factors:
            lines.append("")
            lines.append(" Risk Factors:")
            for factor in r.risk_factors:
                lines.append(f" - {factor}")
        if r.top_risk_urls:
            lines.append("")
            lines.append(" Top Risk URLs:")
            for url_info in r.top_risk_urls[:10]:
                lines.append(
                    f" [{url_info['risk_score']:.0f}] {url_info['url']} "
                    f"(traffic={url_info['traffic']:,}, kw={url_info['keywords']})"
                )

    if plan.redirect_map:
        lines.append("")
        lines.append(f"--- REDIRECT MAP ({len(plan.redirect_map)} entries) ---")
        # Show top 20 by risk (redirect_map is pre-sorted highest-risk first)
        for i, rmap in enumerate(plan.redirect_map[:20], 1):
            lines.append(
                f" {i:>3}. [{rmap.priority.upper():>8}] "
                f"{rmap.source} -> {rmap.target}"
            )
        if len(plan.redirect_map) > 20:
            lines.append(f" ... and {len(plan.redirect_map) - 20} more entries")

    if plan.pre_migration_checklist:
        lines.append("")
        lines.append("--- PRE-MIGRATION CHECKLIST ---")
        for item in plan.pre_migration_checklist:
            status_marker = "[ ]" if item["status"] == "pending" else "[x]"
            lines.append(
                f" {status_marker} Step {item['step']}: {item['task']}"
            )
            lines.append(f" {item['description']}")

    if plan.errors:
        lines.append("")
        lines.append("--- ERRORS ---")
        for err in plan.errors:
            lines.append(f" - {err}")

    lines.append("")
    lines.append("=" * 70)
    return "\n".join(lines)
|
|
|
|
|
|
def _serialize_plan(plan: MigrationPlan) -> dict:
    """Convert plan to a JSON-serializable dict.

    Nested dataclasses are expanded with asdict(); "baseline" is None when
    the pipeline never captured one, and "errors" is included only when
    non-empty.
    """
    baseline_payload = None
    if plan.baseline:
        b = plan.baseline
        baseline_payload = {
            "domain": b.domain,
            "total_urls": b.total_urls,
            "total_traffic": b.total_traffic,
            "total_keywords": b.total_keywords,
            "total_referring_domains": b.total_referring_domains,
            "top_pages": b.top_pages,
            "url_inventory": [asdict(u) for u in b.url_inventory],
        }

    risk_payload = asdict(plan.risk_assessment) if plan.risk_assessment else None

    output: dict[str, Any] = {
        "domain": plan.domain,
        "new_domain": plan.new_domain,
        "migration_type": plan.migration_type,
        "baseline": baseline_payload,
        "redirect_map": [asdict(r) for r in plan.redirect_map],
        "risk_assessment": risk_payload,
        "pre_migration_checklist": plan.pre_migration_checklist,
        "timestamp": plan.timestamp,
    }

    if plan.errors:
        output["errors"] = plan.errors

    return output
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(
|
|
description="SEO Migration Planner - Pre-migration risk assessment and redirect mapping",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""\
|
|
Examples:
|
|
python migration_planner.py --domain https://example.com --type domain-move --new-domain https://new-example.com --json
|
|
python migration_planner.py --domain https://example.com --type platform --json
|
|
python migration_planner.py --domain https://example.com --type url-restructure --json
|
|
python migration_planner.py --domain http://example.com --type https --json
|
|
python migration_planner.py --domain https://blog.example.com --type subdomain --new-domain https://example.com/blog --json
|
|
""",
|
|
)
|
|
parser.add_argument(
|
|
"--domain",
|
|
required=True,
|
|
help="Target website URL or domain to plan migration for",
|
|
)
|
|
parser.add_argument(
|
|
"--type",
|
|
required=True,
|
|
choices=["domain-move", "platform", "url-restructure", "https", "subdomain"],
|
|
help="Migration type",
|
|
)
|
|
parser.add_argument(
|
|
"--new-domain",
|
|
type=str,
|
|
default=None,
|
|
help="New domain/URL (required for domain-move and subdomain types)",
|
|
)
|
|
parser.add_argument(
|
|
"--json",
|
|
action="store_true",
|
|
default=False,
|
|
help="Output in JSON format",
|
|
)
|
|
parser.add_argument(
|
|
"--output",
|
|
type=str,
|
|
default=None,
|
|
help="Save output to file path",
|
|
)
|
|
return parser.parse_args(argv)
|
|
|
|
|
|
async def async_main(args: argparse.Namespace) -> None:
    """Run the planner for the parsed CLI args and emit the result.

    Exits with status 1 when a destination is missing for a migration type
    that requires one. Output goes to --output (UTF-8 file) when given,
    otherwise to stdout.
    """
    # domain-move and subdomain migrations cannot be planned without a target.
    if args.type in ("domain-move", "subdomain") and not args.new_domain:
        logger.error(f"--new-domain is required for {args.type} migration type")
        sys.exit(1)

    planner = MigrationPlanner()
    plan = await planner.run(
        domain=args.domain,
        migration_type=args.type,
        new_domain=args.new_domain,
    )

    rendered = (
        json.dumps(_serialize_plan(plan), indent=2, ensure_ascii=False)
        if args.json
        else _format_text_report(plan)
    )

    if args.output:
        with open(args.output, "w", encoding="utf-8") as fh:
            fh.write(rendered)
        logger.info(f"Migration plan saved to {args.output}")
    else:
        print(rendered)

    # Request/latency statistics collected by BaseAsyncClient.
    planner.print_stats()
|
|
|
|
|
|
def main() -> None:
    """Synchronous CLI entry point: parse arguments, then run the async pipeline."""
    asyncio.run(async_main(parse_args()))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|