"""
Notion Schema Migrator
======================

Purpose:  Migrate data between Notion databases with schema mapping
Python:   3.10+
Packages: notion-client, tenacity, tqdm, python-dotenv

Usage:
    python schema_migrator.py \
        --source-db <database_id> \
        --target-db <database_id> \
        --mapping mapping.json \
        [--dry-run]
"""

import argparse
import asyncio
import json
import logging
import os
from asyncio import Semaphore
from collections.abc import Awaitable, Callable
from datetime import datetime
from typing import Any

from dotenv import load_dotenv
from notion_client import AsyncClient
from tenacity import retry, stop_after_attempt, wait_exponential
from tqdm.asyncio import tqdm

load_dotenv()

NOTION_API_KEY = os.getenv("NOTION_TOKEN") or os.getenv("NOTION_API_KEY")
# Notion allows roughly 3 requests/second; the semaphore plus a per-request
# delay keeps the client comfortably under that limit.
MAX_CONCURRENT_REQUESTS = 3
REQUEST_DELAY = 0.35

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


class SchemaMigrator:
    """Migrate data between Notion databases with property mapping."""

    def __init__(self, api_key: str, dry_run: bool = False):
        self.client = AsyncClient(auth=api_key)
        self.semaphore = Semaphore(MAX_CONCURRENT_REQUESTS)
        self.dry_run = dry_run
        # Run counters, reported by print_stats() at the end.
        self.stats = {
            "pages_fetched": 0,
            "pages_migrated": 0,
            "pages_skipped": 0,
            "errors": 0,
        }

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
    )
    async def _request(self, factory: Callable[[], Awaitable[Any]]) -> Any:
        """Run one rate-limited API call, retrying on failure.

        ``factory`` must be a zero-argument callable that builds a *fresh*
        coroutine each time it is invoked.  Passing a coroutine object
        directly would break tenacity's retries: a coroutine can only be
        awaited once, so every attempt after the first would raise
        "cannot reuse already awaited coroutine" instead of re-issuing
        the request.
        """
        async with self.semaphore:
            await asyncio.sleep(REQUEST_DELAY)
            return await factory()

    async def get_schema(self, database_id: str) -> dict:
        """Get database schema (the full database object from the API)."""
        return await self._request(
            lambda: self.client.databases.retrieve(database_id=database_id)
        )

    async def fetch_all_pages(self, database_id: str) -> list[dict]:
        """Fetch all pages from the source database, following pagination."""
        pages: list[dict] = []
        cursor: str | None = None
        while True:
            params: dict[str, Any] = {"database_id": database_id, "page_size": 100}
            if cursor:
                params["start_cursor"] = cursor
            response = await self._request(
                lambda: self.client.databases.query(**params)
            )
            pages.extend(response["results"])
            if not response.get("has_more", False):
                break
            cursor = response.get("next_cursor")
        self.stats["pages_fetched"] = len(pages)
        logger.info(f"Fetched {len(pages)} pages...")
        return pages

    @staticmethod
    def _extract_raw_value(value: dict, source_type: str) -> Any:
        """Pull a plain Python value out of a Notion property payload.

        Returns ``None`` for unsupported source types or absent values.
        """
        if source_type == "title":
            return "".join(t.get("plain_text", "") for t in value.get("title", []))
        if source_type == "rich_text":
            return "".join(
                t.get("plain_text", "") for t in value.get("rich_text", [])
            )
        if source_type == "number":
            return value.get("number")
        if source_type == "select":
            select_val = value.get("select")
            return select_val.get("name") if select_val else None
        if source_type == "multi_select":
            return [o.get("name") for o in value.get("multi_select", [])]
        if source_type == "status":
            status_val = value.get("status")
            return status_val.get("name") if status_val else None
        if source_type == "date":
            return value.get("date")
        if source_type == "checkbox":
            return value.get("checkbox")
        if source_type == "url":
            return value.get("url")
        if source_type == "email":
            return value.get("email")
        if source_type == "phone_number":
            return value.get("phone_number")
        return None

    @staticmethod
    def _build_target_value(raw_value: Any, target_type: str) -> dict | None:
        """Wrap a plain Python value in the payload shape ``target_type`` expects.

        ``raw_value`` is never ``None`` here (callers bail out earlier), but it
        may be falsy (``0``, ``False``, ``""``).  Only genuinely empty text is
        dropped: the previous implementation tested plain truthiness and
        silently discarded legitimate zero numbers and False-derived values.
        """
        if target_type == "title":
            return {"title": [{"text": {"content": str(raw_value)}}]}
        if target_type == "rich_text":
            return {"rich_text": [{"text": {"content": str(raw_value)}}]}
        if target_type == "number":
            try:
                # float(0) is valid; only unparseable values are skipped.
                return {"number": float(raw_value)}
            except (ValueError, TypeError):
                return None
        if target_type == "multi_select":
            if isinstance(raw_value, list):
                return {"multi_select": [{"name": v} for v in raw_value]}
            return {"multi_select": [{"name": str(raw_value)}]}
        if target_type == "checkbox":
            return {"checkbox": bool(raw_value)}
        if target_type == "date":
            # raw_value is the Notion date object ({"start": ..., "end": ...}).
            return {"date": raw_value} if raw_value else None
        if target_type in ("select", "status", "url", "email", "phone_number"):
            text = str(raw_value)
            if not text:
                # Notion rejects empty option names / URLs; skip instead.
                return None
            if target_type in ("select", "status"):
                return {target_type: {"name": text}}
            return {target_type: text}
        return None

    def transform_property(
        self,
        value: dict,
        source_type: str,
        target_type: str,
        value_mapping: dict | None = None,
    ) -> dict | None:
        """Transform a property value from source to target type.

        Returns ``None`` when the value is absent or cannot be represented
        in ``target_type`` (the property is then skipped for that page).
        """
        raw_value = self._extract_raw_value(value, source_type)
        if raw_value is None:
            return None

        # Optional value remapping, e.g. {"Old Status": "New Status"}.
        if value_mapping:
            if isinstance(raw_value, str):
                raw_value = value_mapping.get(raw_value, raw_value)
            elif isinstance(raw_value, list):
                raw_value = [value_mapping.get(v, v) for v in raw_value]

        return self._build_target_value(raw_value, target_type)

    def map_page_properties(
        self,
        source_page: dict,
        mapping: dict,
        source_schema: dict,
        target_schema: dict,
    ) -> dict:
        """Map source page properties to the target schema.

        ``mapping`` keys are source property names; each value may set
        ``target`` (target property name, defaults to the source name) and
        ``value_mapping`` (old-value -> new-value renames).
        """
        source_props = source_page.get("properties", {})
        target_props: dict[str, dict] = {}

        for source_name, mapping_config in mapping.items():
            if source_name not in source_props:
                continue

            target_name = mapping_config.get("target", source_name)
            value_mapping = mapping_config.get("value_mapping")

            source_type = (
                source_schema["properties"].get(source_name, {}).get("type")
            )
            target_type = (
                target_schema["properties"].get(target_name, {}).get("type")
            )

            if not source_type or not target_type:
                logger.warning(
                    f"Skipping {source_name}: source_type={source_type}, target_type={target_type}"
                )
                continue

            transformed = self.transform_property(
                source_props[source_name],
                source_type,
                target_type,
                value_mapping,
            )
            if transformed:
                target_props[target_name] = transformed

        return target_props

    async def migrate_page(
        self,
        page: dict,
        target_database_id: str,
        mapping: dict,
        source_schema: dict,
        target_schema: dict,
    ) -> dict:
        """Migrate a single page to the target database.

        Returns a per-page result dict; failures are recorded in ``stats``
        rather than raised so one bad page does not abort the whole run.
        """
        page_id = page["id"]
        try:
            properties = self.map_page_properties(
                page, mapping, source_schema, target_schema
            )
            if not properties:
                self.stats["pages_skipped"] += 1
                return {"page_id": page_id, "success": False, "reason": "no_properties"}

            if self.dry_run:
                # Dry runs deliberately do not count toward pages_migrated.
                logger.debug(f"[DRY-RUN] Would create: {properties}")
                return {"page_id": page_id, "success": True, "dry_run": True}

            result = await self._request(
                lambda: self.client.pages.create(
                    parent={"database_id": target_database_id},
                    properties=properties,
                )
            )
            self.stats["pages_migrated"] += 1
            return {"page_id": page_id, "success": True, "new_page_id": result["id"]}
        except Exception as e:
            self.stats["errors"] += 1
            logger.error(f"Failed to migrate page {page_id}: {e}")
            return {"page_id": page_id, "success": False, "error": str(e)}

    async def migrate(
        self,
        source_db: str,
        target_db: str,
        mapping: dict,
    ) -> list[dict]:
        """Execute the full migration and return per-page results."""
        logger.info("Fetching schemas...")
        source_schema = await self.get_schema(source_db)
        target_schema = await self.get_schema(target_db)
        logger.info(f"Source: {len(source_schema['properties'])} properties")
        logger.info(f"Target: {len(target_schema['properties'])} properties")

        logger.info("Fetching source pages...")
        pages = await self.fetch_all_pages(source_db)
        logger.info(f"Found {len(pages)} pages to migrate")

        results = []
        # Pages are migrated one at a time so the progress bar and the
        # stats counters stay accurate; _request enforces the rate limit.
        for page in tqdm(pages, desc="Migrating"):
            result = await self.migrate_page(
                page, target_db, mapping, source_schema, target_schema
            )
            results.append(result)
        return results

    def print_stats(self):
        """Log a summary of the run's counters."""
        logger.info("=" * 50)
        logger.info("Migration Statistics:")
        logger.info(f" Pages Fetched: {self.stats['pages_fetched']}")
        logger.info(f" Pages Migrated: {self.stats['pages_migrated']}")
        logger.info(f" Pages Skipped: {self.stats['pages_skipped']}")
        logger.info(f" Errors: {self.stats['errors']}")
        logger.info("=" * 50)

    async def close(self):
        """Release the underlying HTTP client."""
        await self.client.aclose()


def generate_mapping_template(source_schema: dict, target_schema: dict) -> dict:
    """Generate a mapping template for the user to customize.

    Each source property is pre-matched to the first target property with the
    same case-insensitive name when one exists; otherwise the source name is
    reused as-is.
    """
    # First-match case-insensitive lookup, built once instead of rescanning
    # the target schema for every source property.
    target_by_lower: dict[str, str] = {}
    for t_name in target_schema.get("properties", {}):
        target_by_lower.setdefault(t_name.lower(), t_name)

    mapping = {}
    for prop_name, prop_config in source_schema.get("properties", {}).items():
        mapping[prop_name] = {
            "target": target_by_lower.get(prop_name.lower(), prop_name),
            "source_type": prop_config.get("type"),
            "value_mapping": None,  # User can add {"old_value": "new_value"}
        }
    return mapping


async def main():
    """CLI entry point: parse args, then generate a mapping or migrate."""
    parser = argparse.ArgumentParser(description="Notion Schema Migrator")
    parser.add_argument("--source-db", "-s", required=True, help="Source database ID")
    parser.add_argument("--target-db", "-t", required=True, help="Target database ID")
    parser.add_argument("--mapping", "-m", help="JSON mapping file path")
    parser.add_argument(
        "--generate-mapping",
        action="store_true",
        help="Generate mapping template",
    )
    parser.add_argument("--dry-run", action="store_true", help="Preview without executing")
    parser.add_argument("--output", "-o", help="Output file for generated mapping")
    args = parser.parse_args()

    if not NOTION_API_KEY:
        logger.error("NOTION_TOKEN or NOTION_API_KEY not set")
        return

    migrator = SchemaMigrator(NOTION_API_KEY, dry_run=args.dry_run)
    try:
        if args.generate_mapping:
            source_schema = await migrator.get_schema(args.source_db)
            target_schema = await migrator.get_schema(args.target_db)
            mapping = generate_mapping_template(source_schema, target_schema)
            output_file = args.output or "mapping_template.json"
            with open(output_file, "w") as f:
                json.dump(mapping, f, indent=2)
            logger.info(f"Mapping template saved to {output_file}")
            return

        if not args.mapping:
            logger.error("--mapping required for migration (or use --generate-mapping)")
            return

        with open(args.mapping) as f:
            mapping = json.load(f)

        results = await migrator.migrate(args.source_db, args.target_db, mapping)
        migrator.print_stats()

        # Persist per-page results for auditing / re-runs.
        output_file = f"migration_results_{datetime.now():%Y%m%d_%H%M%S}.json"
        with open(output_file, "w") as f:
            json.dump(results, f, indent=2)
        logger.info(f"Results saved to {output_file}")
    finally:
        await migrator.close()


if __name__ == "__main__":
    asyncio.run(main())