Initial commit: Claude Skills Factory with 8 refined custom skills
Custom Skills (ourdigital-custom-skills/):
- 00-ourdigital-visual-storytelling: Blog featured image prompt generator
- 01-ourdigital-research-publisher: Research-to-publication workflow
- 02-notion-organizer: Notion workspace management
- 03-research-to-presentation: Notion research to PPT/Figma
- 04-seo-gateway-strategist: SEO gateway page strategy planning
- 05-gateway-page-content-builder: Gateway page content generation
- 20-jamie-brand-editor: Jamie Clinic branded content GENERATION
- 21-jamie-brand-guardian: Jamie Clinic content REVIEW & evaluation

Refinements applied:
- All skills converted to SKILL.md format with YAML frontmatter
- Added version fields to all skills
- Flattened nested folder structures
- Removed packaging artifacts (.zip, .skill files)
- Reorganized file structures (scripts/, references/, etc.)
- Differentiated Jamie skills with clear roles

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
New file: ourdigital-custom-skills/02-notion-organizer/SKILL.md (95 lines)
@@ -0,0 +1,95 @@

---
name: notion-organizer
version: 1.0.0
description: Notion workspace management agent for organizing, restructuring, consolidating, and maintaining databases and pages. Use when the user asks to search Notion, organize databases, clean up properties, migrate data, merge databases, audit schemas, or manage Notion content. Activates on keywords such as Notion, database, knowledge base, wiki, and workspace organization.
allowed-tools: mcp__notion__*, Read, Write, Edit, Bash(python:*), Bash(pip:*)
---

# Notion Organizer Skill

## Purpose

Specialized Notion workspace management capability for:

- Database schema analysis and optimization
- Property standardization and cleanup
- Content restructuring and hierarchy optimization
- Database merging and migration
- Bulk operations with rate-limit compliance

## Execution Strategy: Three-Tier Approach

Always follow this priority order:

### Tier 1: Notion MCP Tools (Primary)

Use the built-in MCP tools first. Available tools:

| Tool | Purpose |
|------|---------|
| `mcp__notion__search` | Find pages/databases by keyword |
| `mcp__notion__get-page` | Retrieve page content |
| `mcp__notion__get-database` | Retrieve database schema |
| `mcp__notion__create-page` | Create new pages |
| `mcp__notion__update-page` | Modify page properties |
| `mcp__notion__query-database` | Query a database with filters |

### Tier 2: Alternative Approaches (Fallback)

If the MCP tools are insufficient:

- Export/import via the filesystem (user action required)
- Memory tools for tracking state across sessions
- Sequential thinking for complex planning

### Tier 3: Python Scripts (Advanced)

For bulk operations (50+ items):

- Generate async Python scripts
- Include rate limiting (3 requests/second maximum)
- Provide a requirements.txt
- Always include a dry-run option

See the `scripts/` directory for templates.

## Operational Guidelines

### Before Any Modification

1. **Fetch first**: Always examine the current structure before making changes
2. **Confirm destructive actions**: Get user approval for deletions and major restructures
3. **Estimate impact**: For large operations, provide time and API-call estimates
4. **Backup reminder**: Remind the user about Notion version history

### Rate Limits (Critical)

- Maximum: 3 requests/second on average
- Use pagination (100 items maximum per request)
- Implement exponential backoff on 429 errors
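
The backoff pattern can be sketched in a few lines; `send` is a hypothetical stand-in for whatever issues the HTTP request (the bundled scripts use tenacity for this instead):

```python
import random
import time

def request_with_backoff(send, max_retries=5, base_delay=1.0):
    """Call send(); on a 429, sleep with exponential backoff and retry.

    send is any zero-argument callable returning an object with a
    status_code attribute -- a stand-in for a real Notion API call.
    """
    for attempt in range(max_retries):
        response = send()
        if response.status_code != 429:
            return response
        # base_delay, 2x, 4x, ... plus jitter so parallel workers desynchronize
        time.sleep(base_delay * 2 ** attempt + random.random() * base_delay)
    raise RuntimeError(f"Still rate limited after {max_retries} retries")
```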

### Communication

- Korean for explanations
- English for code and technical terms
- Structured before/after summaries

## Quick Commands

### Database Audit

"Analyze [database name] structure and recommend optimizations"

### Property Cleanup

"Standardize property names in [database] to [convention]"

### Bulk Move

"Move all pages tagged [X] from [source] to [target]"

### Schema Migration

"Migrate data from [source database] to [target database]"

## Workflow Patterns

See `reference.md` for detailed workflow documentation.
See `scripts/` for Python templates.

## Limitations

- Cannot access unshared databases/pages
- Cannot modify workspace settings
- Cannot recover permanently deleted content
- Large operations (1000+ pages) require Python scripts

@@ -0,0 +1,299 @@

# Notion Organizer Examples

## Quick Reference: Common Tasks

### 1. Database Audit

**Request:**
```
Audit my "Client Projects" database and recommend optimizations
```

**Expected Actions:**

1. Fetch the database schema using MCP
2. Analyze properties: types, naming, usage
3. Report findings in Korean
4. Recommend specific improvements

**Sample Output:**
```
## Database Audit: Client Projects

### 현재 구조 (Current Structure)
- Properties: 15개
- 페이지 수: 234개

### 발견된 문제점 (Issues Found)
1. **중복 속성**: "담당자"와 "Assignee" 동일한 용도
2. **미사용 속성**: "Legacy ID" - 모든 값이 비어있음
3. **명명 불일치**: "due_date" vs "Due Date" vs "마감일"

### 권장 조치 (Recommendations)
1. "담당자"와 "Assignee" 통합 → "담당자"로 유지
2. "Legacy ID" 속성 삭제
3. 명명 규칙 통일: Title Case 권장
```

---

### 2. Bulk Status Update

**Request:**
```
Change all pages with status "Pending Review" to "In Review"
in the Tasks database
```

**Tier 1 Approach (MCP):**
```
# Claude uses MCP tools:
1. mcp__notion__query-database with filter:
   {"property": "Status", "select": {"equals": "Pending Review"}}

2. For each page, mcp__notion__update-page:
   {"Status": {"select": {"name": "In Review"}}}
```

**Tier 3 Approach (Python, for 50+ pages):**
```bash
python scripts/async_organizer.py \
  --database-id abc123 \
  --operation status-update \
  --old-status "Pending Review" \
  --new-status "In Review" \
  --dry-run  # Test first!
```

---

### 3. Schema Migration

**Request:**
```
Migrate data from "Old Projects" to "New Projects" database.
Map Status→Stage, Due Date→Deadline, Tags→Categories
```

**Step 1: Generate Mapping Template**
```bash
python scripts/schema_migrator.py \
  --source-db old_projects_id \
  --target-db new_projects_id \
  --generate-mapping \
  --output my_mapping.json
```

**Step 2: Customize Mapping**
Edit `my_mapping.json`:
```json
{
  "Status": {
    "target": "Stage",
    "value_mapping": {
      "Todo": "Backlog",
      "Doing": "In Progress",
      "Done": "Complete"
    }
  }
}
```

**Step 3: Execute Migration**
```bash
# Dry run first
python scripts/schema_migrator.py \
  --source-db old_projects_id \
  --target-db new_projects_id \
  --mapping my_mapping.json \
  --dry-run

# Execute
python scripts/schema_migrator.py \
  --source-db old_projects_id \
  --target-db new_projects_id \
  --mapping my_mapping.json
```

---

### 4. Property Cleanup

**Request:**
```
Standardize all property names in "Marketing Campaigns"
to Title Case with spaces
```

**Before → After:**
```
- campaign_name → Campaign Name
- startDate → Start Date
- end-date → End Date
- STATUS → Status
- assigned_to → Assigned To
```

**MCP Approach:**
```
Use mcp__notion__update-database to rename properties:
{
  "properties": {
    "campaign_name": { "name": "Campaign Name" },
    "startDate": { "name": "Start Date" },
    "end-date": { "name": "End Date" },
    "STATUS": { "name": "Status" },
    "assigned_to": { "name": "Assigned To" }
  }
}
```

---

### 5. Duplicate Detection

**Request:**
```
Find duplicate entries in "Contacts" database based on email
```

**Python Script Approach:**
```python
# Pseudocode for duplicate detection (fetch_all_pages, get_property,
# and get_title stand in for Notion API helpers)
pages = fetch_all_pages(database_id)

# Group by email
email_groups = {}
for page in pages:
    email = get_property(page, "Email")
    if email:
        email_groups.setdefault(email, []).append(page)

# Find duplicates
duplicates = {
    email: pages
    for email, pages in email_groups.items()
    if len(pages) > 1
}

# Report
for email, dup_pages in duplicates.items():
    print(f"Duplicate: {email}")
    for p in dup_pages:
        print(f"  - {get_title(p)} (created: {p['created_time']})")
```

---

### 6. Archive Old Content

**Request:**
```
Move all tasks completed more than 90 days ago to Archive database
```

**Filter:**
```json
{
  "and": [
    {
      "property": "Status",
      "status": { "equals": "Complete" }
    },
    {
      "property": "Completed Date",
      "date": {
        "before": "2025-09-07"
      }
    }
  ]
}
```

**Process:**

1. Query with the filter
2. For each page:
   - Create a copy in the Archive database
   - Update the original with an "Archived" status, or delete it
3. Report a summary
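
The filter hard-codes the cutoff date; in practice it should be computed at run time. A small sketch, assuming the database's date property is named "Completed Date" as in the filter above:

```python
import json
from datetime import date, timedelta

def archive_filter(days: int = 90) -> dict:
    """Build the archive query filter with a cutoff computed from today."""
    cutoff = (date.today() - timedelta(days=days)).isoformat()
    return {
        "and": [
            {"property": "Status", "status": {"equals": "Complete"}},
            {"property": "Completed Date", "date": {"before": cutoff}},
        ]
    }

print(json.dumps(archive_filter(), indent=2))
```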

---

### 7. Relation Audit

**Request:**
```
Find all pages in "Tasks" that have broken relations to "Projects"
```

**Approach:**

1. Fetch all Tasks pages
2. For each task, check the Project relation
3. Verify the referenced Project page exists
4. Report broken relations

**Sample Output:**
```
## Relation Audit: Tasks → Projects

총 작업: 150개
정상 연결: 142개
끊어진 연결: 8개

### 끊어진 연결 목록:
1. "Website Redesign Phase 2" → Project not found
2. "Q3 Marketing Review" → Project deleted
...

### 권장 조치:
- 삭제된 프로젝트 복원 또는
- 해당 작업들을 다른 프로젝트에 재할당
```

---

## Environment Setup

### Quick Start
```bash
# Navigate to the scripts directory
cd ~/.claude/skills/notion-organizer/scripts

# Create a virtual environment
python -m venv venv
source venv/bin/activate  # macOS/Linux

# Install dependencies
pip install -r requirements.txt

# Set the environment variable
export NOTION_TOKEN="your_token_here"
# Or create a .env file with NOTION_TOKEN=your_token
```

### Verify Setup
```bash
# Test with an audit (read-only)
python async_organizer.py --database-id YOUR_DB_ID --operation audit
```

---

## Troubleshooting

### Rate Limit Errors (429)
- Scripts automatically retry with exponential backoff
- If the errors persist, reduce `MAX_CONCURRENT_REQUESTS` to 2

### Permission Errors (404)
- Ensure the database is shared with your integration
- Check that the integration has the correct capabilities

### Property Type Mismatch
- Use `--generate-mapping` to see the current types
- Some conversions require manual handling (e.g., people → text)

### Large Databases (1000+ pages)
- Always use the Python scripts, not MCP
- Consider running in batches with checkpoints
- Monitor API usage in Notion settings

@@ -0,0 +1,250 @@

# Notion Organizer Reference

## Notion API Fundamentals

### Base Configuration

- **Base URL**: `https://api.notion.com`
- **Current Version**: `2022-06-28`
- **Authentication**: Bearer token in the Authorization header

### Rate Limits

| Limit | Value | Strategy |
|-------|-------|----------|
| Requests/second | 3 (average) | Use throttling/semaphore |
| Burst allowed | Small bursts | Implement exponential backoff |
| Page size | 100 items max | Use pagination cursors |
| Payload size | 500KB max | Split large operations |

### Core Object Hierarchy

```
Workspace
└── Database (container)
    └── Page (row)
        └── Block (content)
```

### Property Types Reference

| Type | Use Case | Notes |
|------|----------|-------|
| `title` | Page name | Required, one per database |
| `rich_text` | Text content | Max 2,000 chars per item |
| `number` | Numeric values | Supports format options |
| `select` | Single choice | Define options array |
| `multi_select` | Multiple choices | Define options array |
| `status` | Workflow states | Groups: To-do, In progress, Complete |
| `date` | Dates/times | ISO 8601 format |
| `checkbox` | Boolean | true/false |
| `url` | Links | Max 2,000 chars |
| `email` | Email addresses | Validation applied |
| `phone_number` | Phone | String format |
| `relation` | Links to pages | Requires database_id |
| `rollup` | Aggregated data | Requires relation + function |
| `formula` | Computed values | Expression syntax |
| `files` | Attachments | External URLs or Notion hosted |
| `people` | User references | Notion user IDs |
| `created_time` | Auto timestamp | Read-only |
| `created_by` | Auto user | Read-only |
| `last_edited_time` | Auto timestamp | Read-only |
| `last_edited_by` | Auto user | Read-only |

### Size Limits

| Element | Limit |
|---------|-------|
| Rich text content | 2,000 characters |
| URL length | 2,000 characters |
| Array elements | 100 items |
| Page properties | 100 per page |
| Database properties | 100 per database |
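
Since rich text is capped at 2,000 characters per item, long text has to be split across multiple items. A minimal sketch of the chunking:

```python
def to_rich_text(text: str, limit: int = 2000) -> list[dict]:
    """Split text into Notion rich_text items of at most limit characters."""
    return [
        {"text": {"content": text[i:i + limit]}}
        for i in range(0, len(text), limit)
    ]
```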

### Error Codes

| Code | Status | Action |
|------|--------|--------|
| `rate_limited` | 429 | Wait the number of seconds given in the Retry-After header |
| `validation_error` | 400 | Check the request body format |
| `object_not_found` | 404 | Verify sharing/permissions |
| `unauthorized` | 401 | Check API token validity |
| `conflict_error` | 409 | Resource was modified; refetch |
| `internal_server_error` | 500 | Retry with backoff |
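
For 429s specifically, the Retry-After response header says exactly how long to pause. A defensive parser, falling back to one second when the header is missing or malformed:

```python
def retry_after_seconds(headers: dict, default: float = 1.0) -> float:
    """Parse a 429 response's Retry-After header into a sleep duration."""
    try:
        delay = float(headers.get("Retry-After", default))
    except (TypeError, ValueError):
        delay = default
    return max(delay, 0.0)

# Usage on a 429: time.sleep(retry_after_seconds(response.headers)), then retry.
```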

---

## Workflow Patterns

### Pattern 1: Database Audit

**Purpose**: Analyze database structure and recommend optimizations

**Steps**:

1. Fetch the database schema via MCP or the API
2. Analyze property types, naming conventions, and usage
3. Identify issues:
   - Unused properties
   - Inconsistent naming
   - Suboptimal property types
   - Missing relations
4. Present recommendations with rationale
5. Execute approved changes incrementally

**Example Query**:
```
Audit my "Projects" database:
- Check for unused properties
- Identify naming inconsistencies
- Recommend schema optimizations
```

### Pattern 2: Bulk Reorganization

**Purpose**: Move/update many pages efficiently

**Decision Tree**:

- ≤ 50 operations → Use MCP tools with staged execution
- > 50 operations → Generate a Python script

**Steps**:

1. Assess the scope (count affected pages)
2. Estimate API calls and time
3. Choose the execution method (MCP vs Python)
4. Execute with progress updates
5. Generate a summary report

**Example Query**:
```
Move all pages with status "Archived" from "Active Projects"
to "Archive" database, preserving the Project Name and Date properties
```

### Pattern 3: Schema Migration

**Purpose**: Transfer data between databases with different schemas

**Steps**:

1. Fetch the source database schema
2. Fetch the target database schema
3. Create a property mapping plan:
   - Direct mappings (same type)
   - Transformations needed (type conversion)
   - Unmappable properties (manual handling)
4. Validate compatibility
5. Execute the migration:
   - MCP for small datasets
   - Python for large datasets
6. Verify data integrity

**Property Mapping Template**:
```
Source Property → Target Property (Transformation)
─────────────────────────────────────────────────
Name (title)    → Project Name (title)  [Direct]
Status (select) → Stage (status)        [Map values]
Due Date (date) → Deadline (date)       [Direct]
Tags (multi)    → Categories (multi)    [Merge options]
Notes (text)    → Description (text)    [Direct]
Owner (text)    → Assignee (people)     [Manual]
```
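
A sketch of how a mapping plan in the shape of `my_mapping.json` (see examples.md) could be applied to one page's properties. This is illustrative, not the bundled `schema_migrator.py`; it only translates select values, and other property types pass through unchanged:

```python
def apply_mapping(source_props: dict, mapping: dict) -> dict:
    """Translate one page's properties using a mapping plan.

    mapping has the shape {source_name: {"target": target_name,
    "value_mapping": {old: new}}}; value_mapping is optional.
    Properties absent from the plan are dropped (manual handling).
    """
    target_props = {}
    for name, value in source_props.items():
        plan = mapping.get(name)
        if plan is None:
            continue
        value_map = plan.get("value_mapping")
        if value_map and isinstance(value, dict) and value.get("select"):
            old = value["select"]["name"]
            value = {"select": {"name": value_map.get(old, old)}}
        target_props[plan["target"]] = value
    return target_props
```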

### Pattern 4: Property Cleanup

**Purpose**: Standardize properties across databases

**Common Tasks**:

- Rename properties to a consistent convention (camelCase, snake_case, Title Case)
- Consolidate duplicate select/multi-select options
- Remove unused properties
- Add missing required properties

**Naming Convention Guide**:
```
Recommended: Title Case with spaces
Examples: "Project Name", "Due Date", "Status", "Assigned To"

Alternative: camelCase (for technical databases)
Examples: "projectName", "dueDate", "status", "assignedTo"
```

### Pattern 5: Duplicate Detection

**Purpose**: Find and handle duplicate or similar content

**Detection Strategies**:

1. Exact title match
2. Fuzzy title similarity (Levenshtein distance)
3. Property combination match (e.g., same name + date)
4. Content hash comparison

**Resolution Options**:

- Merge: Combine properties from the duplicates
- Archive: Move the older duplicate to an archive
- Delete: Remove with user confirmation
- Link: Create a relation between related items
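
Strategy 2 can be approximated with the standard library's difflib, avoiding an extra Levenshtein dependency (the 0.85 threshold here is an arbitrary starting point, not a recommendation from the API):

```python
from difflib import SequenceMatcher

def similar_title_pairs(titles: list[str], threshold: float = 0.85) -> list[tuple[str, str]]:
    """Return pairs of titles whose similarity ratio meets the threshold."""
    pairs = []
    for i, a in enumerate(titles):
        for b in titles[i + 1:]:
            if SequenceMatcher(None, a.lower(), b.lower()).ratio() >= threshold:
                pairs.append((a, b))
    return pairs
```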

---

## MCP Tool Usage Examples

### Search for Pages
```
Use mcp__notion__search to find:
- Query: "marketing campaign"
- Filter: database_id = "abc123"
```

### Query Database with Filters
```
Use mcp__notion__query-database:
- Database ID: "abc123"
- Filter: { "property": "Status", "select": { "equals": "Active" } }
- Sorts: [{ "property": "Created", "direction": "descending" }]
```

### Update Page Properties
```
Use mcp__notion__update-page:
- Page ID: "xyz789"
- Properties: {
    "Status": { "select": { "name": "Completed" } },
    "Completed Date": { "date": { "start": "2025-12-05" } }
  }
```

### Create New Page
```
Use mcp__notion__create-page:
- Parent: { "database_id": "abc123" }
- Properties: {
    "Name": { "title": [{ "text": { "content": "New Project" } }] },
    "Status": { "select": { "name": "Planning" } }
  }
```

---

## Best Practices

### 1. Always Fetch Before Modifying
Never assume the database structure. Always retrieve the current schema first.

### 2. Batch Operations Wisely
- Group related updates
- Use pagination for queries
- Implement checkpoints for large operations

### 3. Handle Relations Carefully
- Relations require both databases to be accessible
- Synced databases need special handling
- Rollups depend on relations, so update order matters

### 4. Preserve Data Integrity
- Back up critical data before major changes
- Use transactions where possible
- Verify changes after execution

### 5. Respect User Permissions
- Check that the integration has access to the target resources
- Request additional permissions when needed
- Document permission requirements
@@ -0,0 +1,331 @@
|
||||
"""
|
||||
Notion Async Organizer - Base Template
|
||||
======================================
|
||||
Purpose: Rate-limited async operations for Notion API
|
||||
Python: 3.10+
|
||||
Packages: notion-client, tenacity, tqdm, python-dotenv
|
||||
|
||||
Usage:
|
||||
python async_organizer.py --database-id <id> [--dry-run]
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import argparse
|
||||
import logging
|
||||
import os
|
||||
from asyncio import Semaphore
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from notion_client import AsyncClient
|
||||
from tenacity import (
|
||||
retry,
|
||||
stop_after_attempt,
|
||||
wait_exponential,
|
||||
retry_if_exception_type,
|
||||
)
|
||||
from tqdm.asyncio import tqdm
|
||||
|
||||
# Load environment variables
|
||||
load_dotenv()
|
||||
|
||||
# Configuration
|
||||
NOTION_API_KEY = os.getenv("NOTION_TOKEN") or os.getenv("NOTION_API_KEY")
|
||||
MAX_CONCURRENT_REQUESTS = 3
|
||||
REQUEST_DELAY = 0.35 # ~3 requests/second
|
||||
|
||||
# Logging setup
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s - %(levelname)s - %(message)s",
|
||||
handlers=[
|
||||
logging.StreamHandler(),
|
||||
logging.FileHandler(f"notion_organizer_{datetime.now():%Y%m%d_%H%M%S}.log"),
|
||||
],
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NotionAsyncOrganizer:
|
||||
"""Async Notion operations with rate limiting and retry logic."""
|
||||
|
||||
def __init__(self, api_key: str, dry_run: bool = False):
|
||||
self.client = AsyncClient(auth=api_key)
|
||||
self.semaphore = Semaphore(MAX_CONCURRENT_REQUESTS)
|
||||
self.dry_run = dry_run
|
||||
self.stats = {"fetched": 0, "updated": 0, "created": 0, "errors": 0}
|
||||
|
||||
@retry(
|
||||
stop=stop_after_attempt(3),
|
||||
wait=wait_exponential(multiplier=1, min=2, max=10),
|
||||
retry=retry_if_exception_type(Exception),
|
||||
)
|
||||
async def _rate_limited_request(self, coro):
|
||||
"""Execute request with rate limiting and retry."""
|
||||
async with self.semaphore:
|
||||
await asyncio.sleep(REQUEST_DELAY)
|
||||
return await coro
|
||||
|
||||
async def fetch_database_schema(self, database_id: str) -> dict:
|
||||
"""Fetch database schema/properties."""
|
||||
logger.info(f"Fetching database schema: {database_id}")
|
||||
response = await self._rate_limited_request(
|
||||
self.client.databases.retrieve(database_id=database_id)
|
||||
)
|
||||
self.stats["fetched"] += 1
|
||||
return response
|
||||
|
||||
async def fetch_all_pages(
|
||||
self,
|
||||
database_id: str,
|
||||
filter_obj: dict | None = None,
|
||||
sorts: list | None = None,
|
||||
) -> list[dict]:
|
||||
"""Fetch all pages from a database with pagination."""
|
||||
all_pages = []
|
||||
has_more = True
|
||||
start_cursor = None
|
||||
|
||||
logger.info(f"Fetching pages from database: {database_id}")
|
||||
|
||||
while has_more:
|
||||
query_params = {
|
||||
"database_id": database_id,
|
||||
"page_size": 100,
|
||||
}
|
||||
if start_cursor:
|
||||
query_params["start_cursor"] = start_cursor
|
||||
if filter_obj:
|
||||
query_params["filter"] = filter_obj
|
||||
if sorts:
|
||||
query_params["sorts"] = sorts
|
||||
|
||||
response = await self._rate_limited_request(
|
||||
self.client.databases.query(**query_params)
|
||||
)
|
||||
|
||||
all_pages.extend(response["results"])
|
||||
has_more = response.get("has_more", False)
|
||||
start_cursor = response.get("next_cursor")
|
||||
self.stats["fetched"] += len(response["results"])
|
||||
|
||||
logger.info(f"Fetched {len(all_pages)} pages so far...")
|
||||
|
||||
return all_pages
|
||||
|
||||
async def update_page(self, page_id: str, properties: dict) -> dict | None:
|
||||
"""Update a single page's properties."""
|
||||
if self.dry_run:
|
||||
logger.info(f"[DRY-RUN] Would update page {page_id}: {properties}")
|
||||
return None
|
||||
|
||||
try:
|
||||
result = await self._rate_limited_request(
|
||||
self.client.pages.update(page_id=page_id, properties=properties)
|
||||
)
|
||||
self.stats["updated"] += 1
|
||||
return result
|
||||
except Exception as e:
|
||||
self.stats["errors"] += 1
|
||||
logger.error(f"Failed to update page {page_id}: {e}")
|
||||
raise
|
||||
|
||||
async def batch_update_pages(
|
||||
self, updates: list[dict], desc: str = "Updating pages"
|
||||
) -> list[dict]:
|
||||
"""Update multiple pages concurrently with progress bar."""
|
||||
results = []
|
||||
|
||||
async def update_single(update: dict) -> dict:
|
||||
try:
|
||||
result = await self.update_page(
|
||||
update["page_id"], update["properties"]
|
||||
)
|
||||
return {"page_id": update["page_id"], "success": True, "result": result}
|
||||
except Exception as e:
|
||||
return {"page_id": update["page_id"], "success": False, "error": str(e)}
|
||||
|
||||
tasks = [update_single(u) for u in updates]
|
||||
|
||||
for coro in tqdm.as_completed(tasks, total=len(tasks), desc=desc):
|
||||
result = await coro
|
||||
results.append(result)
|
||||
|
||||
success_count = sum(1 for r in results if r["success"])
|
||||
logger.info(f"Batch update complete: {success_count}/{len(updates)} succeeded")
|
||||
|
||||
return results
|
||||
|
||||
async def create_page(
|
||||
self, parent: dict, properties: dict, children: list | None = None
|
||||
) -> dict | None:
|
||||
"""Create a new page."""
|
||||
if self.dry_run:
|
||||
logger.info(f"[DRY-RUN] Would create page: {properties}")
|
||||
return None
|
||||
|
||||
try:
|
||||
create_params = {"parent": parent, "properties": properties}
|
||||
if children:
|
||||
create_params["children"] = children
|
||||
|
||||
result = await self._rate_limited_request(
|
||||
self.client.pages.create(**create_params)
|
||||
)
|
||||
self.stats["created"] += 1
|
||||
return result
|
||||
except Exception as e:
|
||||
self.stats["errors"] += 1
|
||||
logger.error(f"Failed to create page: {e}")
|
||||
raise
|
||||
|
||||
async def search(
|
||||
self, query: str, filter_type: str | None = None
|
||||
) -> list[dict]:
|
||||
"""Search Notion workspace."""
|
||||
all_results = []
|
||||
has_more = True
|
||||
start_cursor = None
|
||||
|
||||
while has_more:
|
||||
search_params = {"query": query, "page_size": 100}
|
||||
if start_cursor:
|
||||
search_params["start_cursor"] = start_cursor
|
||||
if filter_type:
|
||||
search_params["filter"] = {"property": "object", "value": filter_type}
|
||||
|
||||
response = await self._rate_limited_request(
|
||||
self.client.search(**search_params)
|
||||
)
|
||||
|
||||
all_results.extend(response["results"])
|
||||
has_more = response.get("has_more", False)
|
||||
start_cursor = response.get("next_cursor")
|
||||
|
||||
return all_results
|
||||
|
||||
def print_stats(self):
|
||||
"""Print operation statistics."""
|
||||
logger.info("=" * 50)
|
||||
logger.info("Operation Statistics:")
|
||||
logger.info(f" Fetched: {self.stats['fetched']}")
|
||||
logger.info(f" Updated: {self.stats['updated']}")
|
||||
logger.info(f" Created: {self.stats['created']}")
|
||||
logger.info(f" Errors: {self.stats['errors']}")
|
||||
logger.info("=" * 50)
|
||||
|
||||
async def close(self):
|
||||
"""Close the client connection."""
|
||||
await self.client.aclose()
|
||||
|
||||
|
||||
# ============================================================
|
||||
# Example Operations - Customize these for your specific task
|
||||
# ============================================================
|
||||
|
||||
|
||||
async def example_audit_database(organizer: NotionAsyncOrganizer, database_id: str):
|
||||
"""Example: Audit a database and report on its structure."""
|
||||
schema = await organizer.fetch_database_schema(database_id)
|
||||
|
||||
print("\n" + "=" * 60)
|
||||
print(f"Database: {schema.get('title', [{}])[0].get('plain_text', 'Untitled')}")
|
||||
print("=" * 60)
|
||||
|
||||
properties = schema.get("properties", {})
|
||||
print(f"\nTotal Properties: {len(properties)}\n")
|
||||
|
||||
for name, prop in properties.items():
|
||||
prop_type = prop.get("type", "unknown")
|
||||
print(f" - {name}: {prop_type}")
|
||||
|
||||
# Show select/multi_select options
|
||||
if prop_type in ("select", "multi_select"):
|
||||
options = prop.get(prop_type, {}).get("options", [])
|
||||
if options:
|
||||
option_names = [o["name"] for o in options[:5]]
|
||||
suffix = f" (+{len(options) - 5} more)" if len(options) > 5 else ""
|
||||
print(f" Options: {', '.join(option_names)}{suffix}")
|
||||
|
||||
return schema
|
||||
|
||||
|
||||
async def example_bulk_status_update(
|
||||
organizer: NotionAsyncOrganizer,
|
||||
database_id: str,
|
||||
old_status: str,
|
||||
new_status: str,
|
||||
):
|
||||
"""Example: Update status for all pages matching a filter."""
|
||||
# Fetch pages with old status
|
||||
filter_obj = {"property": "Status", "select": {"equals": old_status}}
|
||||
pages = await organizer.fetch_all_pages(database_id, filter_obj=filter_obj)
|
||||
|
||||
print(f"\nFound {len(pages)} pages with status '{old_status}'")
|
||||
|
||||
if not pages:
|
||||
return
|
||||
|
||||
# Prepare updates
|
||||
updates = [
|
||||
{
|
||||
"page_id": page["id"],
|
||||
"properties": {"Status": {"select": {"name": new_status}}},
|
||||
}
|
||||
for page in pages
|
||||
]
|
||||
|
||||
# Execute batch update
|
||||
results = await organizer.batch_update_pages(
|
||||
updates, desc=f"Updating status to '{new_status}'"
|
||||
)
|
||||
|
||||
return results
|
||||
|
||||
|
||||
async def main():
|
||||
"""Main entry point."""
|
||||
parser = argparse.ArgumentParser(description="Notion Async Organizer")
|
||||
parser.add_argument("--database-id", "-d", required=True, help="Database ID")
|
||||
parser.add_argument(
|
||||
"--dry-run", action="store_true", help="Preview changes without executing"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--operation",
|
||||
"-o",
|
||||
choices=["audit", "status-update"],
|
||||
default="audit",
|
||||
help="Operation to perform",
|
||||
)
|
||||
parser.add_argument("--old-status", help="Old status value (for status-update)")
|
||||
parser.add_argument("--new-status", help="New status value (for status-update)")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if not NOTION_API_KEY:
|
||||
logger.error("NOTION_TOKEN or NOTION_API_KEY environment variable not set")
|
||||
return
|
||||
|
||||
organizer = NotionAsyncOrganizer(NOTION_API_KEY, dry_run=args.dry_run)
|
||||
|
||||
try:
|
||||
if args.operation == "audit":
|
||||
await example_audit_database(organizer, args.database_id)
|
||||
|
||||
elif args.operation == "status-update":
|
||||
if not args.old_status or not args.new_status:
|
||||
logger.error("--old-status and --new-status required for status-update")
|
||||
return
|
||||
await example_bulk_status_update(
|
||||
organizer, args.database_id, args.old_status, args.new_status
|
||||
)
|
||||
|
||||
organizer.print_stats()
|
||||
|
||||
finally:
|
||||
await organizer.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
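The status-update path above reduces to two plain dictionaries sent to the Notion API: the query filter and the per-page update payload. A minimal standalone sketch of their construction (the page IDs here are illustrative, not real Notion IDs):

```python
old_status, new_status = "In Progress", "Active"

# Query filter: match pages whose "Status" select equals the old value.
filter_obj = {"property": "Status", "select": {"equals": old_status}}

# One update payload per matching page, as consumed by batch_update_pages.
pages = [{"id": "page-1"}, {"id": "page-2"}]
updates = [
    {"page_id": p["id"], "properties": {"Status": {"select": {"name": new_status}}}}
    for p in pages
]
print(updates[0]["properties"]["Status"]["select"]["name"])  # Active
```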
@@ -0,0 +1,26 @@
# Notion Organizer Scripts - Requirements
# Python 3.10+ required

# Notion API client
notion-client==2.2.1

# Async HTTP
aiohttp==3.9.1

# Rate limiting
asyncio-throttle==1.0.2

# Environment variables
python-dotenv==1.0.0

# Retry logic
tenacity==8.2.3

# Progress bars
tqdm==4.66.1

# Optional: Data analysis
# pandas==2.1.4

# Optional: Fuzzy matching for duplicates
# rapidfuzz==3.5.2
@@ -0,0 +1,367 @@
"""
Notion Schema Migrator
======================
Purpose: Migrate data between Notion databases with schema mapping
Python: 3.10+
Packages: notion-client, tenacity, tqdm, python-dotenv

Usage:
    python schema_migrator.py \
        --source-db <source_database_id> \
        --target-db <target_database_id> \
        --mapping mapping.json \
        [--dry-run]
"""

import argparse
import asyncio
import json
import logging
import os
from asyncio import Semaphore
from datetime import datetime

from dotenv import load_dotenv
from notion_client import AsyncClient
from tenacity import retry, stop_after_attempt, wait_exponential
from tqdm import tqdm

load_dotenv()

NOTION_API_KEY = os.getenv("NOTION_TOKEN") or os.getenv("NOTION_API_KEY")
MAX_CONCURRENT_REQUESTS = 3
REQUEST_DELAY = 0.35

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


class SchemaMigrator:
    """Migrate data between Notion databases with property mapping."""

    def __init__(self, api_key: str, dry_run: bool = False):
        self.client = AsyncClient(auth=api_key)
        self.semaphore = Semaphore(MAX_CONCURRENT_REQUESTS)
        self.dry_run = dry_run
        self.stats = {
            "pages_fetched": 0,
            "pages_migrated": 0,
            "pages_skipped": 0,
            "errors": 0,
        }

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
    )
    async def _request(self, fn, *args, **kwargs):
        """Call an API method under the rate limit.

        Takes the method and its arguments (rather than a pre-built
        coroutine) so each retry attempt creates a fresh coroutine;
        an already-awaited coroutine cannot be re-awaited.
        """
        async with self.semaphore:
            await asyncio.sleep(REQUEST_DELAY)
            return await fn(*args, **kwargs)

    async def get_schema(self, database_id: str) -> dict:
        """Get database schema."""
        return await self._request(
            self.client.databases.retrieve, database_id=database_id
        )

    async def fetch_all_pages(self, database_id: str) -> list[dict]:
        """Fetch all pages from the source database (paginated)."""
        pages = []
        has_more = True
        cursor = None

        while has_more:
            params = {"database_id": database_id, "page_size": 100}
            if cursor:
                params["start_cursor"] = cursor

            response = await self._request(self.client.databases.query, **params)
            pages.extend(response["results"])
            has_more = response.get("has_more", False)
            cursor = response.get("next_cursor")
            self.stats["pages_fetched"] = len(pages)
            logger.info(f"Fetched {len(pages)} pages...")

        return pages

    def transform_property(
        self,
        value: dict,
        source_type: str,
        target_type: str,
        value_mapping: dict | None = None,
    ) -> dict | None:
        """Transform a property value from source to target type."""

        # Extract raw value based on source type
        raw_value = None

        if source_type == "title":
            raw_value = "".join(
                t.get("plain_text", "") for t in value.get("title", [])
            )
        elif source_type == "rich_text":
            raw_value = "".join(
                t.get("plain_text", "") for t in value.get("rich_text", [])
            )
        elif source_type == "number":
            raw_value = value.get("number")
        elif source_type == "select":
            select_val = value.get("select")
            raw_value = select_val.get("name") if select_val else None
        elif source_type == "multi_select":
            raw_value = [o.get("name") for o in value.get("multi_select", [])]
        elif source_type == "status":
            status_val = value.get("status")
            raw_value = status_val.get("name") if status_val else None
        elif source_type == "date":
            raw_value = value.get("date")
        elif source_type == "checkbox":
            raw_value = value.get("checkbox")
        elif source_type == "url":
            raw_value = value.get("url")
        elif source_type == "email":
            raw_value = value.get("email")
        elif source_type == "phone_number":
            raw_value = value.get("phone_number")

        if raw_value is None:
            return None

        # Apply value mapping if provided
        if value_mapping and isinstance(raw_value, str):
            raw_value = value_mapping.get(raw_value, raw_value)
        elif value_mapping and isinstance(raw_value, list):
            raw_value = [value_mapping.get(v, v) for v in raw_value]

        # Convert to target type
        if target_type == "title":
            return {"title": [{"text": {"content": str(raw_value)}}]}
        elif target_type == "rich_text":
            return {"rich_text": [{"text": {"content": str(raw_value)}}]}
        elif target_type == "number":
            try:
                return {"number": float(raw_value) if raw_value else None}
            except (ValueError, TypeError):
                return None
        elif target_type == "select":
            return {"select": {"name": str(raw_value)}} if raw_value else None
        elif target_type == "multi_select":
            if isinstance(raw_value, list):
                return {"multi_select": [{"name": v} for v in raw_value]}
            return {"multi_select": [{"name": str(raw_value)}]}
        elif target_type == "status":
            return {"status": {"name": str(raw_value)}} if raw_value else None
        elif target_type == "date":
            return {"date": raw_value} if raw_value else None
        elif target_type == "checkbox":
            return {"checkbox": bool(raw_value)}
        elif target_type == "url":
            return {"url": str(raw_value)} if raw_value else None
        elif target_type == "email":
            return {"email": str(raw_value)} if raw_value else None
        elif target_type == "phone_number":
            return {"phone_number": str(raw_value)} if raw_value else None

        return None

    def map_page_properties(
        self,
        source_page: dict,
        mapping: dict,
        source_schema: dict,
        target_schema: dict,
    ) -> dict:
        """Map source page properties to the target schema."""
        source_props = source_page.get("properties", {})
        target_props = {}

        for source_name, mapping_config in mapping.items():
            if source_name not in source_props:
                continue

            target_name = mapping_config.get("target", source_name)
            value_mapping = mapping_config.get("value_mapping")

            source_type = source_schema["properties"].get(source_name, {}).get("type")
            target_type = target_schema["properties"].get(target_name, {}).get("type")

            if not source_type or not target_type:
                logger.warning(
                    f"Skipping {source_name}: source_type={source_type}, target_type={target_type}"
                )
                continue

            transformed = self.transform_property(
                source_props[source_name],
                source_type,
                target_type,
                value_mapping,
            )

            if transformed:
                target_props[target_name] = transformed

        return target_props

    async def migrate_page(
        self,
        page: dict,
        target_database_id: str,
        mapping: dict,
        source_schema: dict,
        target_schema: dict,
    ) -> dict:
        """Migrate a single page to the target database."""
        page_id = page["id"]

        try:
            properties = self.map_page_properties(
                page, mapping, source_schema, target_schema
            )

            if not properties:
                self.stats["pages_skipped"] += 1
                return {"page_id": page_id, "success": False, "reason": "no_properties"}

            if self.dry_run:
                logger.debug(f"[DRY-RUN] Would create: {properties}")
                return {"page_id": page_id, "success": True, "dry_run": True}

            result = await self._request(
                self.client.pages.create,
                parent={"database_id": target_database_id},
                properties=properties,
            )
            self.stats["pages_migrated"] += 1
            return {"page_id": page_id, "success": True, "new_page_id": result["id"]}

        except Exception as e:
            self.stats["errors"] += 1
            logger.error(f"Failed to migrate page {page_id}: {e}")
            return {"page_id": page_id, "success": False, "error": str(e)}

    async def migrate(
        self,
        source_db: str,
        target_db: str,
        mapping: dict,
    ) -> list[dict]:
        """Execute the full migration."""
        logger.info("Fetching schemas...")
        source_schema = await self.get_schema(source_db)
        target_schema = await self.get_schema(target_db)

        logger.info(f"Source: {len(source_schema['properties'])} properties")
        logger.info(f"Target: {len(target_schema['properties'])} properties")

        logger.info("Fetching source pages...")
        pages = await self.fetch_all_pages(source_db)
        logger.info(f"Found {len(pages)} pages to migrate")

        results = []
        for page in tqdm(pages, desc="Migrating"):
            result = await self.migrate_page(
                page, target_db, mapping, source_schema, target_schema
            )
            results.append(result)

        return results

    def print_stats(self):
        logger.info("=" * 50)
        logger.info("Migration Statistics:")
        logger.info(f"  Pages Fetched:  {self.stats['pages_fetched']}")
        logger.info(f"  Pages Migrated: {self.stats['pages_migrated']}")
        logger.info(f"  Pages Skipped:  {self.stats['pages_skipped']}")
        logger.info(f"  Errors:         {self.stats['errors']}")
        logger.info("=" * 50)

    async def close(self):
        await self.client.aclose()


def generate_mapping_template(source_schema: dict, target_schema: dict) -> dict:
    """Generate a mapping template for the user to customize."""
    mapping = {}

    for prop_name, prop_config in source_schema.get("properties", {}).items():
        source_type = prop_config.get("type")

        # Try to find a matching property in the target (case-insensitive)
        target_match = None
        for t_name in target_schema.get("properties", {}):
            if t_name.lower() == prop_name.lower():
                target_match = t_name
                break

        mapping[prop_name] = {
            "target": target_match or prop_name,
            "source_type": source_type,
            "value_mapping": None,  # User can add {"old_value": "new_value"}
        }

    return mapping


async def main():
    parser = argparse.ArgumentParser(description="Notion Schema Migrator")
    parser.add_argument("--source-db", "-s", required=True, help="Source database ID")
    parser.add_argument("--target-db", "-t", required=True, help="Target database ID")
    parser.add_argument("--mapping", "-m", help="JSON mapping file path")
    parser.add_argument(
        "--generate-mapping",
        action="store_true",
        help="Generate mapping template",
    )
    parser.add_argument("--dry-run", action="store_true", help="Preview without executing")
    parser.add_argument("--output", "-o", help="Output file for generated mapping")

    args = parser.parse_args()

    if not NOTION_API_KEY:
        logger.error("NOTION_TOKEN or NOTION_API_KEY not set")
        return

    migrator = SchemaMigrator(NOTION_API_KEY, dry_run=args.dry_run)

    try:
        if args.generate_mapping:
            source_schema = await migrator.get_schema(args.source_db)
            target_schema = await migrator.get_schema(args.target_db)
            mapping = generate_mapping_template(source_schema, target_schema)

            output_file = args.output or "mapping_template.json"
            with open(output_file, "w") as f:
                json.dump(mapping, f, indent=2)
            logger.info(f"Mapping template saved to {output_file}")
            return

        if not args.mapping:
            logger.error("--mapping required for migration (or use --generate-mapping)")
            return

        with open(args.mapping) as f:
            mapping = json.load(f)

        results = await migrator.migrate(args.source_db, args.target_db, mapping)
        migrator.print_stats()

        # Save results
        output_file = f"migration_results_{datetime.now():%Y%m%d_%H%M%S}.json"
        with open(output_file, "w") as f:
            json.dump(results, f, indent=2)
        logger.info(f"Results saved to {output_file}")

    finally:
        await migrator.close()


if __name__ == "__main__":
    asyncio.run(main())
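The heart of `--generate-mapping` is a case-insensitive name match between source and target properties, with the source name kept as a fallback. A standalone sketch of that matching step (the schema dicts below are hand-written stand-ins, not real `databases.retrieve` responses):

```python
def match_target(prop_name, target_props):
    # Case-insensitive property-name match, as in generate_mapping_template.
    for t_name in target_props:
        if t_name.lower() == prop_name.lower():
            return t_name
    return None

target_props = {"name": {"type": "title"}, "Stage": {"type": "select"}}

# "Name" matches "name" despite the case difference; "Status" has no match,
# so the template would fall back to the source name as the target.
entry = {
    "target": match_target("Status", target_props) or "Status",
    "source_type": "select",
    "value_mapping": None,
}
print(match_target("Name", target_props))  # name
print(entry["target"])                     # Status
```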
@@ -0,0 +1,56 @@
{
  "_comment": "Property Mapping Template - Customize for your migration",
  "_instructions": {
    "target": "Name of property in target database",
    "source_type": "Auto-detected, for reference only",
    "value_mapping": "Optional: Map old values to new values"
  },
  "Name": {
    "target": "Project Name",
    "source_type": "title",
    "value_mapping": null
  },
  "Status": {
    "target": "Stage",
    "source_type": "select",
    "value_mapping": {
      "Not Started": "Backlog",
      "In Progress": "Active",
      "Done": "Completed",
      "On Hold": "Paused"
    }
  },
  "Priority": {
    "target": "Priority",
    "source_type": "select",
    "value_mapping": {
      "P1": "High",
      "P2": "Medium",
      "P3": "Low"
    }
  },
  "Due Date": {
    "target": "Deadline",
    "source_type": "date",
    "value_mapping": null
  },
  "Tags": {
    "target": "Categories",
    "source_type": "multi_select",
    "value_mapping": {
      "marketing": "Marketing",
      "dev": "Development",
      "design": "Design"
    }
  },
  "Description": {
    "target": "Notes",
    "source_type": "rich_text",
    "value_mapping": null
  },
  "Completed": {
    "target": "Is Done",
    "source_type": "checkbox",
    "value_mapping": null
  }
}
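Each `value_mapping` in the template is applied as a plain dict lookup with pass-through for unmapped values, mirroring the mapping step in `transform_property`. A minimal sketch:

```python
# Same shape as the "Status" value_mapping in the template above.
value_mapping = {"Not Started": "Backlog", "In Progress": "Active", "Done": "Completed"}

def map_value(raw, mapping):
    # Unmapped values fall through unchanged, so partial mappings are safe.
    return mapping.get(raw, raw)

print(map_value("Done", value_mapping))      # Completed
print(map_value("Archived", value_mapping))  # Archived
```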