"""
Base Client - Shared async client utilities
===========================================
Purpose: Rate-limited async operations for API clients
Python: 3.10+
"""

import asyncio
import logging
import os
import time
from asyncio import Semaphore
from datetime import datetime
from typing import Any, Callable, TypeVar

from dotenv import load_dotenv
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
)

# Load environment variables
load_dotenv()

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)

T = TypeVar("T")


class RateLimiter:
    """Rate limiter using token bucket algorithm.

    The bucket starts full with ``rate`` tokens and refills continuously at
    ``rate / per`` tokens per second, capped at ``rate``. Each call to
    :meth:`acquire` consumes one token, sleeping first when less than one
    token is available.
    """

    def __init__(self, rate: float, per: float = 1.0):
        """
        Initialize rate limiter.

        Args:
            rate: Number of requests allowed
            per: Time period in seconds (default: 1 second)

        Raises:
            ValueError: If ``rate`` or ``per`` is not positive; such values
                would otherwise only fail later inside ``acquire``.
        """
        if rate <= 0 or per <= 0:
            raise ValueError("rate and per must be positive numbers")

        self.rate = rate
        self.per = per
        self.tokens = rate
        # Wall-clock time of the last bucket update; informational only.
        self.last_update = datetime.now()
        # Refill arithmetic uses a monotonic clock so that system clock
        # adjustments cannot produce negative or inflated elapsed times.
        self._last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        """Acquire a token, waiting if necessary.

        The internal lock is held while sleeping, so concurrent callers are
        released one at a time rather than all waking together.
        """
        async with self._lock:
            # Credit the tokens earned since the previous update.
            now = time.monotonic()
            elapsed = now - self._last_refill
            self.tokens = min(
                self.rate,
                self.tokens + elapsed * (self.rate / self.per),
            )
            self._last_refill = now
            self.last_update = datetime.now()

            if self.tokens >= 1:
                self.tokens -= 1
                return

            # Sleep exactly long enough to earn the missing fraction.
            wait_time = (1 - self.tokens) * (self.per / self.rate)
            await asyncio.sleep(wait_time)

            # The token earned during the sleep is consumed by this caller.
            # Restart the refill clock here; otherwise the slept interval
            # would be credited a second time on the next call and the
            # effective rate would exceed the configured one.
            self.tokens = 0
            self._last_refill = time.monotonic()
            self.last_update = datetime.now()


def _record_retry(retry_state: Any) -> None:
    """tenacity ``before_sleep`` hook that counts a retry on the client.

    tenacity passes the call state of the wrapped function; for a decorated
    method the first positional argument is the instance itself.
    """
    call_args = getattr(retry_state, "args", None) or ()
    stats = getattr(call_args[0], "stats", None) if call_args else None
    if isinstance(stats, dict):
        stats["retries"] = stats.get("retries", 0) + 1


class BaseAsyncClient:
    """Base class for async API clients with rate limiting."""

    def __init__(
        self,
        max_concurrent: int = 5,
        requests_per_second: float = 3.0,
        logger: logging.Logger | None = None,
    ):
        """
        Initialize base client.

        Args:
            max_concurrent: Maximum concurrent requests
            requests_per_second: Rate limit
            logger: Logger instance (defaults to a logger named after the
                concrete class)
        """
        self.semaphore = Semaphore(max_concurrent)
        self.rate_limiter = RateLimiter(requests_per_second)
        self.logger = logger or logging.getLogger(self.__class__.__name__)
        # "requests" and "errors" count individual attempts, so a call that
        # is retried contributes more than once; "retries" counts how many
        # times a failed attempt was followed by another one.
        self.stats = {
            "requests": 0,
            "success": 0,
            "errors": 0,
            "retries": 0,
        }

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type(Exception),
        before_sleep=_record_retry,
    )
    async def _rate_limited_request(
        self,
        coro: Callable[[], Any],
    ) -> Any:
        """Execute a request with rate limiting and retry.

        Args:
            coro: Zero-argument callable returning an awaitable; it is
                called afresh on every attempt.

        Returns:
            Whatever the awaited ``coro()`` returns.

        Raises:
            Exception: Each failed attempt re-raises so that the retry
                decorator can decide whether to try again (up to three
                attempts with exponential back-off). Because ``reraise`` is
                not set, the final failure is expected to surface as
                tenacity's ``RetryError``.
        """
        # The semaphore is held for a single attempt only; the back-off
        # between attempts happens outside this function body.
        async with self.semaphore:
            await self.rate_limiter.acquire()
            self.stats["requests"] += 1
            try:
                result = await coro()
            except Exception as e:
                self.stats["errors"] += 1
                self.logger.error(f"Request failed: {e}")
                raise
            self.stats["success"] += 1
            return result

    async def batch_requests(
        self,
        requests: list[Callable[[], Any]],
        desc: str = "Processing",
    ) -> list[Any]:
        """Execute multiple requests concurrently.

        Args:
            requests: Zero-argument callables, each returning an awaitable.
            desc: Progress-bar label, used only when ``tqdm`` is installed.

        Returns:
            One entry per request, in the same order as ``requests``. A
            request that ultimately fails yields ``{"error": "<message>"}``
            instead of raising.
        """
        try:
            from tqdm.asyncio import tqdm
            has_tqdm = True
        except ImportError:
            has_tqdm = False

        async def execute(req: Callable) -> Any:
            try:
                return await self._rate_limited_request(req)
            except Exception as e:
                return {"error": str(e)}

        if not has_tqdm:
            # gather() already returns results in input order.
            return await asyncio.gather(
                *(execute(req) for req in requests),
                return_exceptions=True,
            )

        async def execute_indexed(index: int, req: Callable) -> tuple[int, Any]:
            return index, await execute(req)

        # as_completed() yields in completion order; tag every result with
        # its position so the output order matches the non-tqdm path.
        tasks = [execute_indexed(i, req) for i, req in enumerate(requests)]
        results: list[Any] = [None] * len(tasks)
        for finished in tqdm.as_completed(tasks, total=len(tasks), desc=desc):
            index, result = await finished
            results[index] = result
        return results

    def print_stats(self) -> None:
        """Print request statistics."""
        self.logger.info("=" * 40)
        self.logger.info("Request Statistics:")
        self.logger.info(f" Total Requests: {self.stats['requests']}")
        self.logger.info(f" Successful: {self.stats['success']}")
        self.logger.info(f" Errors: {self.stats['errors']}")
        self.logger.info(f" Retries: {self.stats['retries']}")
        self.logger.info("=" * 40)


class ConfigManager:
    """Manage API configuration and credentials.

    Every value is looked up on access, so changes to the environment made
    after construction are picked up.
    """

    def __init__(self):
        load_dotenv()

    @property
    def google_credentials_path(self) -> str | None:
        """Get Google service account credentials path."""
        # Prefer SEO-specific credentials, fallback to general credentials
        seo_creds = os.path.expanduser("~/.credential/ourdigital-seo-agent.json")
        if os.path.exists(seo_creds):
            return seo_creds
        return os.getenv("GOOGLE_APPLICATION_CREDENTIALS")

    @property
    def pagespeed_api_key(self) -> str | None:
        """Get PageSpeed Insights API key."""
        return os.getenv("PAGESPEED_API_KEY")

    @property
    def custom_search_api_key(self) -> str | None:
        """Get Custom Search API key."""
        return os.getenv("CUSTOM_SEARCH_API_KEY")

    @property
    def custom_search_engine_id(self) -> str | None:
        """Get Custom Search Engine ID."""
        return os.getenv("CUSTOM_SEARCH_ENGINE_ID")

    @property
    def notion_token(self) -> str | None:
        """Get Notion API token (``NOTION_TOKEN``, else ``NOTION_API_KEY``)."""
        return os.getenv("NOTION_TOKEN") or os.getenv("NOTION_API_KEY")

    def validate_google_credentials(self) -> bool:
        """Validate Google credentials are configured.

        Returns:
            True only when a credentials path is configured and that path
            exists on disk.
        """
        creds_path = self.google_credentials_path
        if not creds_path:
            return False
        return os.path.exists(creds_path)

    def get_required(self, key: str) -> str:
        """Get required environment variable or raise error.

        Raises:
            ValueError: If the variable is unset or empty.
        """
        value = os.getenv(key)
        if not value:
            raise ValueError(f"Missing required environment variable: {key}")
        return value


# Singleton config instance
config = ConfigManager()