# Commit note (pasted VCS metadata, kept for reference):
#   Skill numbering: 01-03 OurDigital core (was 30-32); 31-32 Notion tools
#   (was 01-02); 99_archive renamed from _archive for sorting.
#   New files: AGENTS.md (Claude Code agent routing guide); requirements.txt
#   for 00-claude-code-setting, 32-notion-writer, 43-jamie-youtube-manager.
#   Docs: CLAUDE.md skill inventory updated (23 skills); AUDIT_REPORT.md at
#   91% completion; REFACTORING_PLAN.md archived (most tasks complete).
#   Removed: ga-agent-skills/ (moved to ~/Project/dintel-ga4-agent).
"""
|
|
Notion Async Organizer - Base Template
|
|
======================================
|
|
Purpose: Rate-limited async operations for Notion API
|
|
Python: 3.10+
|
|
Packages: notion-client, tenacity, tqdm, python-dotenv
|
|
|
|
Usage:
|
|
python async_organizer.py --database-id <id> [--dry-run]
|
|
"""
|
|
|
|
import asyncio
|
|
import argparse
|
|
import logging
|
|
import os
|
|
from asyncio import Semaphore
|
|
from datetime import datetime
|
|
from typing import Any
|
|
|
|
from dotenv import load_dotenv
|
|
from notion_client import AsyncClient
|
|
from tenacity import (
|
|
retry,
|
|
stop_after_attempt,
|
|
wait_exponential,
|
|
retry_if_exception_type,
|
|
)
|
|
from tqdm.asyncio import tqdm
|
|
|
|
# Load environment variables
|
|
load_dotenv()
|
|
|
|
# Configuration
|
|
NOTION_API_KEY = os.getenv("NOTION_TOKEN") or os.getenv("NOTION_API_KEY")
|
|
MAX_CONCURRENT_REQUESTS = 3
|
|
REQUEST_DELAY = 0.35 # ~3 requests/second
|
|
|
|
# Logging setup
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s - %(levelname)s - %(message)s",
|
|
handlers=[
|
|
logging.StreamHandler(),
|
|
logging.FileHandler(f"notion_organizer_{datetime.now():%Y%m%d_%H%M%S}.log"),
|
|
],
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class NotionAsyncOrganizer:
|
|
"""Async Notion operations with rate limiting and retry logic."""
|
|
|
|
def __init__(self, api_key: str, dry_run: bool = False):
|
|
self.client = AsyncClient(auth=api_key)
|
|
self.semaphore = Semaphore(MAX_CONCURRENT_REQUESTS)
|
|
self.dry_run = dry_run
|
|
self.stats = {"fetched": 0, "updated": 0, "created": 0, "errors": 0}
|
|
|
|
@retry(
|
|
stop=stop_after_attempt(3),
|
|
wait=wait_exponential(multiplier=1, min=2, max=10),
|
|
retry=retry_if_exception_type(Exception),
|
|
)
|
|
async def _rate_limited_request(self, coro):
|
|
"""Execute request with rate limiting and retry."""
|
|
async with self.semaphore:
|
|
await asyncio.sleep(REQUEST_DELAY)
|
|
return await coro
|
|
|
|
async def fetch_database_schema(self, database_id: str) -> dict:
|
|
"""Fetch database schema/properties."""
|
|
logger.info(f"Fetching database schema: {database_id}")
|
|
response = await self._rate_limited_request(
|
|
self.client.databases.retrieve(database_id=database_id)
|
|
)
|
|
self.stats["fetched"] += 1
|
|
return response
|
|
|
|
async def fetch_all_pages(
|
|
self,
|
|
database_id: str,
|
|
filter_obj: dict | None = None,
|
|
sorts: list | None = None,
|
|
) -> list[dict]:
|
|
"""Fetch all pages from a database with pagination."""
|
|
all_pages = []
|
|
has_more = True
|
|
start_cursor = None
|
|
|
|
logger.info(f"Fetching pages from database: {database_id}")
|
|
|
|
while has_more:
|
|
query_params = {
|
|
"database_id": database_id,
|
|
"page_size": 100,
|
|
}
|
|
if start_cursor:
|
|
query_params["start_cursor"] = start_cursor
|
|
if filter_obj:
|
|
query_params["filter"] = filter_obj
|
|
if sorts:
|
|
query_params["sorts"] = sorts
|
|
|
|
response = await self._rate_limited_request(
|
|
self.client.databases.query(**query_params)
|
|
)
|
|
|
|
all_pages.extend(response["results"])
|
|
has_more = response.get("has_more", False)
|
|
start_cursor = response.get("next_cursor")
|
|
self.stats["fetched"] += len(response["results"])
|
|
|
|
logger.info(f"Fetched {len(all_pages)} pages so far...")
|
|
|
|
return all_pages
|
|
|
|
async def update_page(self, page_id: str, properties: dict) -> dict | None:
|
|
"""Update a single page's properties."""
|
|
if self.dry_run:
|
|
logger.info(f"[DRY-RUN] Would update page {page_id}: {properties}")
|
|
return None
|
|
|
|
try:
|
|
result = await self._rate_limited_request(
|
|
self.client.pages.update(page_id=page_id, properties=properties)
|
|
)
|
|
self.stats["updated"] += 1
|
|
return result
|
|
except Exception as e:
|
|
self.stats["errors"] += 1
|
|
logger.error(f"Failed to update page {page_id}: {e}")
|
|
raise
|
|
|
|
async def batch_update_pages(
|
|
self, updates: list[dict], desc: str = "Updating pages"
|
|
) -> list[dict]:
|
|
"""Update multiple pages concurrently with progress bar."""
|
|
results = []
|
|
|
|
async def update_single(update: dict) -> dict:
|
|
try:
|
|
result = await self.update_page(
|
|
update["page_id"], update["properties"]
|
|
)
|
|
return {"page_id": update["page_id"], "success": True, "result": result}
|
|
except Exception as e:
|
|
return {"page_id": update["page_id"], "success": False, "error": str(e)}
|
|
|
|
tasks = [update_single(u) for u in updates]
|
|
|
|
for coro in tqdm.as_completed(tasks, total=len(tasks), desc=desc):
|
|
result = await coro
|
|
results.append(result)
|
|
|
|
success_count = sum(1 for r in results if r["success"])
|
|
logger.info(f"Batch update complete: {success_count}/{len(updates)} succeeded")
|
|
|
|
return results
|
|
|
|
async def create_page(
|
|
self, parent: dict, properties: dict, children: list | None = None
|
|
) -> dict | None:
|
|
"""Create a new page."""
|
|
if self.dry_run:
|
|
logger.info(f"[DRY-RUN] Would create page: {properties}")
|
|
return None
|
|
|
|
try:
|
|
create_params = {"parent": parent, "properties": properties}
|
|
if children:
|
|
create_params["children"] = children
|
|
|
|
result = await self._rate_limited_request(
|
|
self.client.pages.create(**create_params)
|
|
)
|
|
self.stats["created"] += 1
|
|
return result
|
|
except Exception as e:
|
|
self.stats["errors"] += 1
|
|
logger.error(f"Failed to create page: {e}")
|
|
raise
|
|
|
|
async def search(
|
|
self, query: str, filter_type: str | None = None
|
|
) -> list[dict]:
|
|
"""Search Notion workspace."""
|
|
all_results = []
|
|
has_more = True
|
|
start_cursor = None
|
|
|
|
while has_more:
|
|
search_params = {"query": query, "page_size": 100}
|
|
if start_cursor:
|
|
search_params["start_cursor"] = start_cursor
|
|
if filter_type:
|
|
search_params["filter"] = {"property": "object", "value": filter_type}
|
|
|
|
response = await self._rate_limited_request(
|
|
self.client.search(**search_params)
|
|
)
|
|
|
|
all_results.extend(response["results"])
|
|
has_more = response.get("has_more", False)
|
|
start_cursor = response.get("next_cursor")
|
|
|
|
return all_results
|
|
|
|
def print_stats(self):
|
|
"""Print operation statistics."""
|
|
logger.info("=" * 50)
|
|
logger.info("Operation Statistics:")
|
|
logger.info(f" Fetched: {self.stats['fetched']}")
|
|
logger.info(f" Updated: {self.stats['updated']}")
|
|
logger.info(f" Created: {self.stats['created']}")
|
|
logger.info(f" Errors: {self.stats['errors']}")
|
|
logger.info("=" * 50)
|
|
|
|
async def close(self):
|
|
"""Close the client connection."""
|
|
await self.client.aclose()
|
|
|
|
|
|
# ============================================================
# Example Operations - Customize these for your specific task
# ============================================================

async def example_audit_database(organizer: NotionAsyncOrganizer, database_id: str):
|
|
"""Example: Audit a database and report on its structure."""
|
|
schema = await organizer.fetch_database_schema(database_id)
|
|
|
|
print("\n" + "=" * 60)
|
|
print(f"Database: {schema.get('title', [{}])[0].get('plain_text', 'Untitled')}")
|
|
print("=" * 60)
|
|
|
|
properties = schema.get("properties", {})
|
|
print(f"\nTotal Properties: {len(properties)}\n")
|
|
|
|
for name, prop in properties.items():
|
|
prop_type = prop.get("type", "unknown")
|
|
print(f" - {name}: {prop_type}")
|
|
|
|
# Show select/multi_select options
|
|
if prop_type in ("select", "multi_select"):
|
|
options = prop.get(prop_type, {}).get("options", [])
|
|
if options:
|
|
option_names = [o["name"] for o in options[:5]]
|
|
suffix = f" (+{len(options) - 5} more)" if len(options) > 5 else ""
|
|
print(f" Options: {', '.join(option_names)}{suffix}")
|
|
|
|
return schema
|
|
|
|
|
|
async def example_bulk_status_update(
|
|
organizer: NotionAsyncOrganizer,
|
|
database_id: str,
|
|
old_status: str,
|
|
new_status: str,
|
|
):
|
|
"""Example: Update status for all pages matching a filter."""
|
|
# Fetch pages with old status
|
|
filter_obj = {"property": "Status", "select": {"equals": old_status}}
|
|
pages = await organizer.fetch_all_pages(database_id, filter_obj=filter_obj)
|
|
|
|
print(f"\nFound {len(pages)} pages with status '{old_status}'")
|
|
|
|
if not pages:
|
|
return
|
|
|
|
# Prepare updates
|
|
updates = [
|
|
{
|
|
"page_id": page["id"],
|
|
"properties": {"Status": {"select": {"name": new_status}}},
|
|
}
|
|
for page in pages
|
|
]
|
|
|
|
# Execute batch update
|
|
results = await organizer.batch_update_pages(
|
|
updates, desc=f"Updating status to '{new_status}'"
|
|
)
|
|
|
|
return results
|
|
|
|
|
|
async def main():
|
|
"""Main entry point."""
|
|
parser = argparse.ArgumentParser(description="Notion Async Organizer")
|
|
parser.add_argument("--database-id", "-d", required=True, help="Database ID")
|
|
parser.add_argument(
|
|
"--dry-run", action="store_true", help="Preview changes without executing"
|
|
)
|
|
parser.add_argument(
|
|
"--operation",
|
|
"-o",
|
|
choices=["audit", "status-update"],
|
|
default="audit",
|
|
help="Operation to perform",
|
|
)
|
|
parser.add_argument("--old-status", help="Old status value (for status-update)")
|
|
parser.add_argument("--new-status", help="New status value (for status-update)")
|
|
|
|
args = parser.parse_args()
|
|
|
|
if not NOTION_API_KEY:
|
|
logger.error("NOTION_TOKEN or NOTION_API_KEY environment variable not set")
|
|
return
|
|
|
|
organizer = NotionAsyncOrganizer(NOTION_API_KEY, dry_run=args.dry_run)
|
|
|
|
try:
|
|
if args.operation == "audit":
|
|
await example_audit_database(organizer, args.database_id)
|
|
|
|
elif args.operation == "status-update":
|
|
if not args.old_status or not args.new_status:
|
|
logger.error("--old-status and --new-status required for status-update")
|
|
return
|
|
await example_bulk_status_update(
|
|
organizer, args.database_id, args.old_status, args.new_status
|
|
)
|
|
|
|
organizer.print_stats()
|
|
|
|
finally:
|
|
await organizer.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|