#!/usr/bin/env python3
"""
GTM Manager - Comprehensive Google Tag Manager management toolkit.

Features:
- Audit: GTM container validation, dataLayer analysis, form/checkout tracking
- Inject: Generate custom HTML tags for dataLayer pushes when direct code access is unavailable
- Export: Send audit results to Notion database

Usage:
    # Audit mode
    python gtm_manager.py audit --url "https://example.com" --journey full

    # Inject mode - generate dataLayer push tags
    python gtm_manager.py inject --event purchase --output tags/

    # Generate tags from audit report
    python gtm_manager.py inject --from-audit gtm_audit_report.json

Options:
    audit           Run GTM audit on a URL
    inject          Generate custom HTML tags for dataLayer injection
    --url           Target URL to audit (required for audit)
    --container     Expected GTM container ID (e.g., GTM-XXXXXX)
    --journey       Journey type: pageview, scroll, click, form, checkout, datalayer, full
    --output        Output file/directory path
    --notion        Export results to Notion database
"""

import argparse
import hashlib
import json
import os
import re
import sys
import urllib.error
import urllib.request
from datetime import datetime
from urllib.parse import urlparse, parse_qs

try:
    from playwright.sync_api import sync_playwright
except ImportError:
    # Only the browser-driven audit needs Playwright; keep the rest of the
    # module importable without it. GTMAuditor.run_audit() reports the
    # missing dependency explicitly.
    sync_playwright = None


# Tag destination patterns
TAG_DESTINATIONS = {
    "GA4": [
        r"google-analytics\.com/g/collect",
        r"analytics\.google\.com/g/collect",
    ],
    "Universal Analytics": [
        r"google-analytics\.com/collect",
        r"google-analytics\.com/r/collect",
    ],
    "Google Ads": [
        r"googleads\.g\.doubleclick\.net",
        r"google\.com/pagead",
        r"googleadservices\.com/pagead",
    ],
    "Meta Pixel": [
        r"facebook\.com/tr",
        r"connect\.facebook\.net",
    ],
    "LinkedIn": [
        r"px\.ads\.linkedin\.com",
        r"snap\.licdn\.com",
    ],
    "TikTok": [
        r"analytics\.tiktok\.com",
    ],
    "Twitter/X": [
        r"ads-twitter\.com",
        r"t\.co/i/adsct",
    ],
    "Kakao": [
        r"pixel\.kakao\.com",
    ],
    "Naver": [
        r"wcs\.naver\.com",
    ],
}

# GA4 Required Parameters by Event
GA4_EVENT_REQUIREMENTS = {
    "purchase": {
        "required": ["transaction_id", "value", "currency"],
        "items_required": ["item_id", "item_name"],
    },
    "add_to_cart": {
        "required": ["currency", "value"],
        "items_required": ["item_id", "item_name"],
    },
    "begin_checkout": {
        "required": ["currency", "value"],
        "items_required": ["item_id", "item_name"],
    },
    "add_shipping_info": {
        "required": ["currency", "value"],
        "recommended": ["shipping_tier"],
    },
    "add_payment_info": {
        "required": ["currency", "value"],
        "recommended": ["payment_type"],
    },
    "view_item": {
        "required": ["currency", "value"],
        "items_required": ["item_id", "item_name"],
    },
    "view_cart": {
        "required": ["currency", "value"],
        "items_required": ["item_id", "item_name"],
    },
    "generate_lead": {
        "recommended": ["currency", "value"],
    },
    "form_submit": {
        "recommended": ["form_id", "form_name"],
    },
}

# Checkout flow sequence
CHECKOUT_SEQUENCE = [
    "view_cart",
    "begin_checkout",
    "add_shipping_info",
    "add_payment_info",
    "purchase",
]


class DataLayerValidator:
    """Advanced dataLayer validation and monitoring."""

    def __init__(self):
        self.events = []
        self.issues = []
        self.snapshots = []

    def validate_event(self, event_data):
        """Validate a single dataLayer event against GA4 specs.

        Args:
            event_data: One dataLayer entry (a dict).

        Returns:
            A list of issue dicts; empty when the entry has no ``event`` name
            or the event has no entry in ``GA4_EVENT_REQUIREMENTS``.
        """
        issues = []
        event_name = event_data.get("event")
        if not event_name or not isinstance(event_name, str):
            return issues

        reqs = GA4_EVENT_REQUIREMENTS.get(event_name)
        if reqs is None:
            return issues

        # 'ecommerce' may be present but null (a clear push) or malformed;
        # treat anything that is not a dict as "no ecommerce data".
        ecommerce = event_data.get("ecommerce")
        if not isinstance(ecommerce, dict):
            ecommerce = {}

        # Check required fields
        for field in reqs.get("required", []):
            if field not in ecommerce and field not in event_data:
                issues.append({
                    "type": "missing_required",
                    "event": event_name,
                    "field": field,
                    "message": f"Missing required field: {field}",
                })

        # Check items array
        items = ecommerce.get("items", [])
        if not isinstance(items, list):
            items = []
        item_fields = reqs.get("items_required", [])
        if item_fields and not items:
            issues.append({
                "type": "missing_items",
                "event": event_name,
                "message": "E-commerce event missing 'items' array",
            })

        # Validate items structure
        for i, item in enumerate(items):
            # A non-dict item cannot carry any field, so report them all.
            present = item if isinstance(item, dict) else {}
            for field in item_fields:
                if field not in present:
                    issues.append({
                        "type": "item_missing_field",
                        "event": event_name,
                        "item_index": i,
                        "field": field,
                        "message": f"Item {i} missing required field: {field}",
                    })

        # Check data types (bool is an int subclass but is not a number here)
        if "value" in ecommerce:
            value = ecommerce["value"]
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                issues.append({
                    "type": "wrong_type",
                    "event": event_name,
                    "field": "value",
                    "message": f"'value' should be number, got {type(value).__name__}",
                })

        # Check transaction_id uniqueness hint
        if event_name == "purchase" and "transaction_id" in ecommerce:
            tid = ecommerce["transaction_id"]
            if not tid or tid == "undefined":
                issues.append({
                    "type": "invalid_transaction_id",
                    "event": event_name,
                    "message": "transaction_id is empty or invalid",
                })

        return issues

    def validate_sequence(self, events):
        """Validate checkout event sequence.

        Entries that are not dicts are ignored. An issue is reported each time
        a checkout event appears after a later step of ``CHECKOUT_SEQUENCE``.
        """
        issues = []
        last_idx = -1
        for entry in events:
            if not isinstance(entry, dict):
                continue
            name = entry.get("event")
            if name not in CHECKOUT_SEQUENCE:
                continue
            idx = CHECKOUT_SEQUENCE.index(name)
            if idx < last_idx:
                issues.append({
                    "type": "sequence_error",
                    "message": f"Event '{name}' fired out of order",
                })
            last_idx = idx
        return issues

    def check_ecommerce_clear(self, events):
        """Check if ecommerce object is cleared before new pushes.

        Only an explicit ``{"ecommerce": None}`` push counts as a clear.
        Entries without an ``ecommerce`` key leave the pending state
        untouched, so two ecommerce pushes separated by unrelated events are
        still reported.
        """
        issues = []
        pending_ecommerce = False
        for i, event in enumerate(events):
            if not isinstance(event, dict) or "ecommerce" not in event:
                continue
            if event["ecommerce"] is None:
                pending_ecommerce = False
                continue
            if pending_ecommerce:
                issues.append({
                    "type": "missing_ecommerce_clear",
                    "index": i,
                    "event": event.get("event"),
                    "message": "E-commerce data should be cleared before new push",
                })
            pending_ecommerce = True
        return issues


class FormAnalyzer:
    """Form discovery, analysis, and interaction tracking."""

    # Sample values typed into fields, keyed by input type.
    _TEST_VALUES = {
        "text": "Test User",
        "email": "test@example.com",
        "tel": "010-1234-5678",
        "number": "100",
        "password": "TestPass123!",
    }

    def __init__(self, page):
        self.page = page
        self.forms = []
        self.interactions = []
        self.issues = []

    @staticmethod
    def _attr_selector(attr, value, tag=""):
        """Build a CSS attribute selector that is safe for any attribute value."""
        escaped = str(value).replace("\\", "\\\\").replace('"', '\\"')
        return f'{tag}[{attr}="{escaped}"]'

    def discover_forms(self):
        """Find and analyze all forms on the page.

        Returns:
            A list with one dict per ``<form>`` (also stored in ``self.forms``).
        """
        forms_data = self.page.evaluate("""
            () => {
                const forms = document.querySelectorAll('form');
                return Array.from(forms).map((form, idx) => {
                    const fields = Array.from(form.querySelectorAll('input, select, textarea'));
                    return {
                        index: idx,
                        id: form.id || null,
                        name: form.name || null,
                        action: form.action || null,
                        method: form.method || 'get',
                        className: form.className || null,
                        fieldCount: fields.length,
                        fields: fields.map(field => ({
                            type: field.type || field.tagName.toLowerCase(),
                            name: field.name || null,
                            id: field.id || null,
                            required: field.required || false,
                            placeholder: field.placeholder || null,
                            validation: field.pattern || null,
                            maxLength: field.maxLength > 0 ? field.maxLength : null,
                        })),
                        hasSubmitButton: form.querySelector('button[type="submit"], input[type="submit"]') !== null,
                    };
                });
            }
        """)
        self.forms = forms_data
        return forms_data

    def analyze_form_tracking_readiness(self):
        """Check if forms are ready for GTM tracking.

        Returns:
            A list of issue dicts (also stored in ``self.issues``).
        """
        issues = []
        for form in self.forms:
            # Check for identifiers
            if not form["id"] and not form["name"]:
                issues.append({
                    "type": "form_no_identifier",
                    "form_index": form["index"],
                    "message": f"Form {form['index']} has no id or name attribute",
                    "recommendation": "Add id or name attribute for reliable form tracking",
                })

            # Check fields for tracking
            for field in form["fields"]:
                if field["type"] in ["text", "email", "tel"] and not field["name"] and not field["id"]:
                    issues.append({
                        "type": "field_no_identifier",
                        "form_index": form["index"],
                        "field_type": field["type"],
                        "message": "Input field missing name/id for tracking",
                    })

            # Check for submit button
            if not form["hasSubmitButton"]:
                issues.append({
                    "type": "form_no_submit",
                    "form_index": form["index"],
                    "message": "Form has no submit button - may use JS submission",
                    "recommendation": "Verify form submission triggers dataLayer push",
                })

        self.issues = issues
        return issues

    def simulate_form_interaction(self, form_index=0):
        """Simulate user interaction with a form.

        Focuses, optionally fills, and blurs every field of the form that has
        an id or name. Fields are looked up inside the selected form so that a
        same-named field elsewhere on the page is not touched instead.

        Args:
            form_index: Index into ``self.forms`` (from ``discover_forms``).

        Returns:
            The list of recorded interactions, or ``{"error": ...}`` when the
            index is out of range or the form cannot be processed.
        """
        if not 0 <= form_index < len(self.forms):
            return {"error": "Form index out of range"}

        form = self.forms[form_index]
        interactions = []

        try:
            # Find form element
            if form["id"]:
                form_element = self.page.locator(
                    self._attr_selector("id", form["id"], tag="form")
                )
            elif form["name"]:
                form_element = self.page.locator(
                    self._attr_selector("name", form["name"], tag="form")
                )
            else:
                form_element = self.page.locator("form").nth(form_index)

            # Interact with each field
            for field in form["fields"]:
                if field["id"]:
                    field_selector = self._attr_selector("id", field["id"])
                elif field["name"]:
                    field_selector = self._attr_selector("name", field["name"])
                else:
                    continue

                label = field["name"] or field["id"]
                try:
                    field_element = form_element.locator(field_selector).first

                    # Focus event
                    field_element.focus()
                    interactions.append({
                        "action": "focus",
                        "field": label,
                        "timestamp": datetime.now().isoformat(),
                    })
                    self.page.wait_for_timeout(200)

                    # Fill based on type
                    if field["type"] in self._TEST_VALUES:
                        field_element.fill(self._TEST_VALUES[field["type"]])
                        interactions.append({
                            "action": "input",
                            "field": label,
                            "type": field["type"],
                            "timestamp": datetime.now().isoformat(),
                        })
                        self.page.wait_for_timeout(200)

                    # Blur event
                    field_element.blur()
                    interactions.append({
                        "action": "blur",
                        "field": label,
                        "timestamp": datetime.now().isoformat(),
                    })
                except Exception as e:
                    # One broken field must not abort the remaining fields.
                    interactions.append({
                        "action": "error",
                        "field": label,
                        "error": str(e),
                    })

            self.interactions = interactions
            return interactions
        except Exception as e:
            return {"error": str(e)}

    def check_form_events(self, datalayer_events):
        """Check if expected form events are in dataLayer.

        Args:
            datalayer_events: dataLayer entries; non-dict entries are ignored.

        Returns:
            Dict with ``found``, ``missing`` and ``recommendation`` keys.
        """
        expected_events = ["form_start", "form_submit", "generate_lead"]
        event_names = [
            e.get("event") for e in datalayer_events if isinstance(e, dict)
        ]

        found_events = [name for name in expected_events if name in event_names]
        missing_events = [name for name in expected_events if name not in event_names]

        return {
            "found": found_events,
            "missing": missing_events,
            "recommendation": (
                "Consider implementing: " + ", ".join(missing_events)
                if missing_events else None
            ),
        }


class CheckoutFlowAnalyzer:
    """E-commerce checkout flow simulation and validation."""

    def __init__(self, page):
        self.page = page
        self.steps_completed = []
        self.events_captured = []
        self.issues = []

    def detect_checkout_elements(self):
        """Find checkout-related elements on page.

        Returns:
            Dict mapping element category to a list of matches, each with
            ``selector``, ``tag``, ``text`` and ``visible`` keys.
        """
        elements = self.page.evaluate("""
            () => {
                const selectors = {
                    cart: [
                        '[class*="cart"]',
                        '[id*="cart"]',
                        '[class*="basket"]',
                        '[id*="basket"]',
                    ],
                    checkout: [
                        '[class*="checkout"]',
                        '[id*="checkout"]',
                        'button:has-text("Checkout")',
                        'a:has-text("Checkout")',
                        'button:has-text("결제")',
                        'a:has-text("결제")',
                    ],
                    addToCart: [
                        'button:has-text("Add to Cart")',
                        'button:has-text("Add to Bag")',
                        'button:has-text("장바구니")',
                        'button:has-text("담기")',
                        '[class*="add-to-cart"]',
                        '[id*="add-to-cart"]',
                    ],
                    quantity: [
                        '[class*="quantity"]',
                        '[name*="quantity"]',
                        '[class*="qty"]',
                        '[name*="qty"]',
                    ],
                    removeItem: [
                        '[class*="remove"]',
                        'button:has-text("Remove")',
                        'button:has-text("삭제")',
                        '[class*="delete"]',
                    ],
                    promoCode: [
                        '[name*="promo"]',
                        '[name*="coupon"]',
                        '[id*="coupon"]',
                        '[placeholder*="promo"]',
                        '[placeholder*="coupon"]',
                    ],
                };

                // ':has-text("...")' is Playwright-only syntax that
                // querySelectorAll rejects, so split such selectors into a
                // plain CSS part plus a case-insensitive text filter.
                const hasText = /^(.*):has-text\\("(.*)"\\)$/;
                const query = (sel) => {
                    const match = sel.match(hasText);
                    if (!match) {
                        return Array.from(document.querySelectorAll(sel));
                    }
                    const needle = match[2].toLowerCase();
                    return Array.from(document.querySelectorAll(match[1] || '*')).filter(
                        el => (el.textContent || '').toLowerCase().includes(needle)
                    );
                };

                const found = {};
                for (const [type, selectorList] of Object.entries(selectors)) {
                    found[type] = [];
                    for (const sel of selectorList) {
                        try {
                            query(sel).forEach(el => {
                                found[type].push({
                                    selector: sel,
                                    tag: el.tagName.toLowerCase(),
                                    text: el.textContent?.slice(0, 50) || null,
                                    visible: el.offsetParent !== null,
                                });
                            });
                        } catch(e) {}
                    }
                }
                return found;
            }
        """)
        return elements

    def _click_first_visible(self, step, selectors, settle_ms):
        """Click the first visible match among selectors and record the step."""
        for selector in selectors:
            try:
                btn = self.page.locator(selector).first
                if btn.is_visible():
                    btn.click()
                    self.steps_completed.append({
                        "step": step,
                        "selector": selector,
                        "timestamp": datetime.now().isoformat(),
                    })
                    self.page.wait_for_timeout(settle_ms)
                    return True
            except Exception as e:
                # Best effort: note the failure and try the next selector.
                self.issues.append({
                    "step": step,
                    "selector": selector,
                    "error": str(e),
                })
        return False

    def simulate_add_to_cart(self):
        """Attempt to simulate add-to-cart action.

        Returns:
            True if a button was clicked, False otherwise.
        """
        # Try common add-to-cart selectors
        return self._click_first_visible(
            "add_to_cart",
            [
                'button:has-text("Add to Cart")',
                'button:has-text("Add to Bag")',
                'button:has-text("장바구니")',
                '[class*="add-to-cart"]:visible',
                '[id*="add-to-cart"]:visible',
            ],
            1500,
        )

    def simulate_begin_checkout(self):
        """Attempt to click checkout button.

        Returns:
            True if a button was clicked, False otherwise.
        """
        return self._click_first_visible(
            "begin_checkout",
            [
                'button:has-text("Checkout")',
                'a:has-text("Checkout")',
                'button:has-text("결제하기")',
                'button:has-text("주문하기")',
                '[class*="checkout-btn"]:visible',
            ],
            2000,
        )

    def validate_checkout_events(self, datalayer_events):
        """Validate checkout-related events in dataLayer.

        Args:
            datalayer_events: dataLayer entries; non-dict entries are ignored.

        Returns:
            Dict with ``events_found``, ``events_missing``, ``sequence_valid``
            and ``issues``. A repeated step (e.g. two ``view_cart`` pushes) is
            not treated as an ordering error.
        """
        results = {
            "events_found": [],
            "events_missing": [],
            "sequence_valid": True,
            "issues": [],
        }

        dict_events = [e for e in datalayer_events if isinstance(e, dict)]
        event_names = [e.get("event") for e in dict_events]
        validator = DataLayerValidator()

        # Check each checkout step
        for step in CHECKOUT_SEQUENCE:
            matching = [e for e in dict_events if e.get("event") == step]
            if not matching:
                results["events_missing"].append(step)
                continue
            results["events_found"].append(step)
            # Validate event parameters
            for event in matching:
                results["issues"].extend(validator.validate_event(event))

        # Check sequence
        if validator.validate_sequence(dict_events):
            found_sequence = [e for e in event_names if e in CHECKOUT_SEQUENCE]
            expected_order = [e for e in CHECKOUT_SEQUENCE if e in found_sequence]
            results["sequence_valid"] = False
            results["issues"].append({
                "type": "sequence_error",
                "message": f"Events out of order. Found: {found_sequence}, Expected: {expected_order}",
            })

        return results


class GTMAuditor:
    """Main GTM audit orchestrator."""

    def __init__(self, url, container_id=None, timeout=30000, headless=True):
        self.url = url
        self.expected_container = container_id
        self.timeout = timeout
        self.headless = headless
        self.report = {
            "audit_metadata": {
                "url": url,
                "timestamp": datetime.now().isoformat(),
                "expected_container": container_id,
            },
            "container_status": {},
            "datalayer_analysis": {
                "events": [],
                "validation_issues": [],
                "sequence_issues": [],
            },
            "form_analysis": {
                "forms_found": [],
                "tracking_issues": [],
                "events_status": {},
            },
            "checkout_analysis": {
                "elements_found": {},
                "events_status": {},
                "flow_issues": [],
            },
            "network_requests": [],
            "tags_fired": [],
            "issues": [],
            "recommendations": [],
            "checklist": {},
        }
        self.network_requests = []
        self.datalayer_history = []
        self.page = None

    def _setup_network_monitoring(self, page):
        """Intercept and log network requests to tag destinations."""

        def handle_request(request):
            url = request.url
            for destination, patterns in TAG_DESTINATIONS.items():
                for pattern in patterns:
                    if re.search(pattern, url):
                        parsed = urlparse(url)
                        params = parse_qs(parsed.query)
                        self.network_requests.append({
                            "destination": destination,
                            "url": url[:200],
                            "method": request.method,
                            "params": {k: v[0] if len(v) == 1 else v for k, v in params.items()},
                            "timestamp": datetime.now().isoformat(),
                        })
                        # One log entry per destination is enough.
                        break

        page.on("request", handle_request)

    def _setup_datalayer_monitoring(self, page):
        """Inject dataLayer monitoring script.

        Wraps ``window.dataLayer.push`` so later pushes are recorded in
        ``window.__gtmAuditEvents``.

        Returns:
            True when the hook was installed; False when the page has no
            usable dataLayer or the hook is already in place.
        """
        return page.evaluate("""
            () => {
                const dl = window.dataLayer;
                if (!dl || typeof dl.push !== 'function' || window.__gtmAuditHooked) {
                    return false;
                }
                window.__gtmAuditHooked = true;
                window.__gtmAuditEvents = [];
                const originalPush = dl.push;
                dl.push = function() {
                    const result = originalPush.apply(this, arguments);
                    for (let i = 0; i < arguments.length; i++) {
                        try {
                            window.__gtmAuditEvents.push({
                                data: JSON.parse(JSON.stringify(arguments[i])),
                                timestamp: new Date().toISOString()
                            });
                        } catch (e) {
                            // Unserializable payload: skip it rather than
                            // break the site's own push() call.
                        }
                    }
                    return result;
                };
                return true;
            }
        """)

    def _capture_datalayer(self, page):
        """Capture current dataLayer state.

        Returns:
            A list of JSON-safe dataLayer entries, or None when the page has
            no dataLayer array or it cannot be read.
        """
        try:
            datalayer = page.evaluate("""
                () => {
                    if (typeof window.dataLayer === 'undefined') {
                        return null;
                    }
                    try {
                        return JSON.parse(JSON.stringify(window.dataLayer));
                    } catch (e) {
                        // Fall through and clone entry by entry.
                    }
                    return Array.from(window.dataLayer, (entry) => {
                        try {
                            return JSON.parse(JSON.stringify(entry));
                        } catch (e) {
                            // Keep at least the event name of entries that
                            // cannot be serialized.
                            return {event: entry && entry.event, __unserializable: true};
                        }
                    });
                }
            """)
        except Exception as e:
            print(f" Could not read dataLayer: {e}")
            return None
        # Callers iterate the result, so only hand back a real list.
        return datalayer if isinstance(datalayer, list) else None

    def _capture_monitored_events(self, page):
        """Capture events logged by our monitoring."""
        try:
            return page.evaluate("""
                () => window.__gtmAuditEvents || []
            """)
        except Exception:
            return []

    def _check_gtm_container(self, page):
        """Verify GTM container installation.

        Inspects the page's script tags, records the result in
        ``self.report["container_status"]`` and appends any findings to
        ``self.report["issues"]``.

        Returns:
            The container status dict.
        """
        # NOTE(review): the init-order flag compares positions of script tags
        # in the DOM; confirm it behaves as intended with the standard GTM
        # snippet, which inserts gtm.js before the first script on the page.
        result = page.evaluate("""
            () => {
                const scripts = document.querySelectorAll('script');
                const gtmInfo = {
                    installed: false,
                    containers: [],
                    position: null,
                    noscript: false,
                    dataLayerInit: false,
                    dataLayerInitBeforeGTM: false,
                };

                gtmInfo.dataLayerInit = typeof window.dataLayer !== 'undefined' && Array.isArray(window.dataLayer);

                let gtmScriptIndex = -1;
                let dataLayerInitIndex = -1;

                scripts.forEach((script, index) => {
                    const src = script.src || '';
                    const innerHTML = script.innerHTML || '';

                    // Check for dataLayer init
                    if (innerHTML.includes('dataLayer') && innerHTML.includes('[]')) {
                        dataLayerInitIndex = index;
                    }

                    const gtmMatch = src.match(/gtm\\.js\\?id=(GTM-[A-Z0-9]+)/);
                    if (gtmMatch) {
                        gtmInfo.installed = true;
                        // The same ID may already have been collected from
                        // the inline snippet; do not count it twice.
                        if (!gtmInfo.containers.includes(gtmMatch[1])) {
                            gtmInfo.containers.push(gtmMatch[1]);
                        }
                        gtmInfo.position = script.closest('head') ? 'head' : 'body';
                        gtmScriptIndex = index;
                    }

                    const inlineMatch = innerHTML.match(/GTM-[A-Z0-9]+/g);
                    if (inlineMatch) {
                        gtmInfo.installed = true;
                        inlineMatch.forEach(id => {
                            if (!gtmInfo.containers.includes(id)) {
                                gtmInfo.containers.push(id);
                            }
                        });
                    }
                });

                gtmInfo.dataLayerInitBeforeGTM = dataLayerInitIndex < gtmScriptIndex && dataLayerInitIndex !== -1;

                const noscripts = document.querySelectorAll('noscript');
                noscripts.forEach(ns => {
                    if (ns.innerHTML.includes('googletagmanager.com/ns.html')) {
                        gtmInfo.noscript = true;
                    }
                });

                return gtmInfo;
            }
        """)

        status = {
            "installed": result["installed"],
            "containers": result["containers"],
            "position": result["position"],
            "noscript_present": result["noscript"],
            "datalayer_initialized": result["dataLayerInit"],
            "datalayer_init_before_gtm": result["dataLayerInitBeforeGTM"],
            "issues": [],
        }

        if not result["installed"]:
            status["issues"].append("GTM container not detected")
            self.report["issues"].append({
                "severity": "critical",
                "type": "container_missing",
                "message": "GTM container script not found on page",
            })

        if len(result["containers"]) > 1:
            status["issues"].append(f"Multiple containers: {result['containers']}")
            self.report["issues"].append({
                "severity": "warning",
                "type": "multiple_containers",
                "message": f"Multiple GTM containers found: {', '.join(result['containers'])}",
            })

        if self.expected_container and self.expected_container not in result["containers"]:
            self.report["issues"].append({
                "severity": "error",
                "type": "container_mismatch",
                "message": f"Expected {self.expected_container}, found {result['containers']}",
            })

        if result["position"] == "body":
            self.report["issues"].append({
                "severity": "warning",
                "type": "script_position",
                "message": "GTM script in body - may delay tag firing",
            })

        if not result["dataLayerInitBeforeGTM"]:
            self.report["issues"].append({
                "severity": "warning",
                "type": "datalayer_order",
                "message": "dataLayer should be initialized before GTM script",
            })

        self.report["container_status"] = status
        return status

    def _simulate_scroll(self, page):
        """Simulate scroll to trigger scroll-depth tags."""
        page.evaluate("""
            () => {
                const heights = [0.25, 0.5, 0.75, 0.9, 1.0];
                const docHeight = document.documentElement.scrollHeight;
                heights.forEach((pct, i) => {
                    setTimeout(() => {
                        window.scrollTo(0, docHeight * pct);
                    }, i * 500);
                });
            }
        """)
        # Five scroll steps 500 ms apart, plus time for tags to fire.
        page.wait_for_timeout(3000)

    def _run_form_audit(self, page):
        """Execute form analysis."""
        print("📝 Analyzing forms...")
        form_analyzer = FormAnalyzer(page)
        forms = form_analyzer.discover_forms()
        tracking_issues = form_analyzer.analyze_form_tracking_readiness()

        self.report["form_analysis"]["forms_found"] = forms
        self.report["form_analysis"]["tracking_issues"] = tracking_issues

        if forms:
            print(f" Found {len(forms)} form(s)")

            # Simulate interaction with first form
            interactions = form_analyzer.simulate_form_interaction(0)
            self.report["form_analysis"]["interactions"] = interactions

            # Allow time for events
            page.wait_for_timeout(2000)

            # Check form events
            datalayer = self._capture_datalayer(page)
            if datalayer:
                events_status = form_analyzer.check_form_events(datalayer)
                self.report["form_analysis"]["events_status"] = events_status
        else:
            print(" No forms found on page")

    def _run_checkout_audit(self, page):
        """Execute e-commerce checkout flow analysis."""
        print("🛒 Analyzing checkout flow...")
        checkout_analyzer = CheckoutFlowAnalyzer(page)
        elements = checkout_analyzer.detect_checkout_elements()
        self.report["checkout_analysis"]["elements_found"] = elements

        # Log what we found
        for element_type, found in elements.items():
            if found:
                print(f" Found {len(found)} {element_type} element(s)")

    def _run_datalayer_audit(self, page):
        """Execute deep dataLayer analysis.

        Rebuilds the ``datalayer_analysis`` section of the report from the
        current page state, so calling it more than once does not duplicate
        findings.
        """
        print("📊 Analyzing dataLayer...")
        datalayer = self._capture_datalayer(page)
        self.datalayer_history = self._capture_monitored_events(page)
        analysis = self.report["datalayer_analysis"]

        if not datalayer:
            analysis["issues"] = ["dataLayer not found"]
            return
        # Drop a "not found" marker left by an earlier, unsuccessful pass.
        analysis.pop("issues", None)

        validator = DataLayerValidator()

        # Validate each event
        validation_issues = []
        for event in datalayer:
            if isinstance(event, dict):
                validation_issues.extend(validator.validate_event(event))

        # Check sequence
        analysis["sequence_issues"] = validator.validate_sequence(datalayer)

        # Check ecommerce clearing
        validation_issues.extend(validator.check_ecommerce_clear(datalayer))
        analysis["validation_issues"] = validation_issues

        # Store events
        events = [
            {
                "index": i,
                "event": item.get("event"),
                "has_ecommerce": "ecommerce" in item,
                "params": list(item.keys()),
            }
            for i, item in enumerate(datalayer)
            if isinstance(item, dict) and item.get("event")
        ]
        analysis["events"] = events
        print(f" Found {len(events)} events in dataLayer")

    def _generate_recommendations(self):
        """Generate recommendations based on findings."""
        recs = []

        for issue in self.report["issues"]:
            if issue["type"] == "container_missing":
                recs.append({
                    "priority": "high",
                    "action": "Install GTM container",
                    "details": "Add GTM snippet to <head> section",
                })
            elif issue["type"] == "datalayer_order":
                recs.append({
                    "priority": "medium",
                    "action": "Initialize dataLayer before GTM",
                    "details": "Add 'window.dataLayer = window.dataLayer || [];' before GTM",
                })

        # Form recommendations
        if not self.report["form_analysis"]["forms_found"]:
            pass  # No forms to track
        elif self.report["form_analysis"].get("events_status", {}).get("missing"):
            missing = self.report["form_analysis"]["events_status"]["missing"]
            recs.append({
                "priority": "medium",
                "action": "Implement form tracking events",
                "details": f"Missing events: {', '.join(missing)}",
            })

        # DataLayer recommendations
        validation_issues = self.report["datalayer_analysis"].get("validation_issues", [])
        if validation_issues:
            recs.append({
                "priority": "high",
                "action": "Fix dataLayer validation issues",
                "details": f"{len(validation_issues)} issue(s) found in event structure",
            })

        # Tag coverage
        destinations = {r["destination"] for r in self.network_requests}
        if "GA4" not in destinations:
            recs.append({
                "priority": "high",
                "action": "Verify GA4 implementation",
                "details": "No GA4 requests detected",
            })

        self.report["recommendations"] = recs

    def _generate_checklist(self):
        """Generate audit checklist."""
        container = self.report["container_status"]
        datalayer = self.report["datalayer_analysis"]
        form_analysis = self.report["form_analysis"]
        forms = form_analysis.get("forms_found", [])

        self.report["checklist"] = {
            "container_health": {
                "gtm_installed": container.get("installed", False),
                "correct_container": (
                    self.expected_container in container.get("containers", [])
                    if self.expected_container else True
                ),
                "no_duplicates": len(container.get("containers", [])) <= 1,
                "correct_position": container.get("position") == "head",
                "datalayer_init_order": container.get("datalayer_init_before_gtm", False),
            },
            "datalayer_quality": {
                "initialized": container.get("datalayer_initialized", False),
                "events_present": len(datalayer.get("events", [])) > 0,
                "no_validation_errors": len(datalayer.get("validation_issues", [])) == 0,
                "correct_sequence": len(datalayer.get("sequence_issues", [])) == 0,
            },
            "form_tracking": {
                # With no forms on the page there is nothing to fail.
                "forms_identifiable": (
                    all(f.get("id") or f.get("name") for f in forms)
                    if forms else True
                ),
                "form_events_present": (
                    len(form_analysis.get("events_status", {}).get("found", [])) > 0
                    if forms else True
                ),
            },
            "tag_firing": {
                "ga4_active": any(r["destination"] == "GA4" for r in self.network_requests),
                "requests_captured": len(self.network_requests) > 0,
            },
        }

    def run_audit(self, journey="pageview"):
        """Execute the full audit workflow.

        Args:
            journey: One of pageview, scroll, click, form, checkout,
                datalayer, full.

        Returns:
            The populated report dict.

        Raises:
            RuntimeError: If Playwright is not installed.
        """
        if sync_playwright is None:
            raise RuntimeError(
                "Playwright is required for audit mode. "
                "Install it with: pip install playwright && playwright install chromium"
            )

        print(f"🔍 Starting GTM audit for: {self.url}")
        print(f" Journey type: {journey}")

        with sync_playwright() as p:
            browser = p.chromium.launch(headless=self.headless)
            context = browser.new_context(
                viewport={"width": 1920, "height": 1080},
                user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) GTMAudit/1.0"
            )
            page = context.new_page()
            self.page = page
            self._setup_network_monitoring(page)

            try:
                print("📄 Loading page...")
                page.goto(self.url, timeout=self.timeout, wait_until="networkidle")
                page.wait_for_timeout(2000)

                # Setup dataLayer monitoring after page load (best effort)
                try:
                    self._setup_datalayer_monitoring(page)
                except Exception as e:
                    print(f" dataLayer monitoring unavailable: {e}")

                print("🏷️ Checking GTM container...")
                self._check_gtm_container(page)

                # Run journey-specific audits
                if journey in ["scroll", "pageview", "full"]:
                    print("📜 Simulating scroll...")
                    self._simulate_scroll(page)

                if journey in ["form", "full"]:
                    self._run_form_audit(page)

                if journey in ["checkout", "full"]:
                    self._run_checkout_audit(page)

                # Always do the dataLayer capture last, after the journey has
                # had time to push its events; this also covers the
                # "datalayer" and "full" journeys.
                page.wait_for_timeout(2000)
                self._run_datalayer_audit(page)
            except Exception as e:
                self.report["issues"].append({
                    "severity": "critical",
                    "type": "audit_error",
                    "message": str(e),
                })
            finally:
                # Store network requests even when the audit failed midway.
                self.report["network_requests"] = self.network_requests
                self.report["tags_fired"] = sorted(
                    {r["destination"] for r in self.network_requests}
                )
                browser.close()

        self._generate_recommendations()
        self._generate_checklist()

        print("✅ Audit complete!")
        return self.report

    def save_report(self, filepath):
        """Save report to JSON file."""
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(self.report, f, indent=2, ensure_ascii=False)
        print(f"📝 Report saved to: {filepath}")

    def print_summary(self):
        """Print audit summary to console."""
        print("\n" + "="*60)
        print("📋 GTM AUDIT SUMMARY")
        print("="*60)

        # Container
        cs = self.report["container_status"]
        print(f"\n🏷️ Container: {'✅ Installed' if cs.get('installed') else '❌ Not Found'}")
        if cs.get("containers"):
            print(f" IDs: {', '.join(cs['containers'])}")

        # DataLayer
        dl = self.report["datalayer_analysis"]
        print(f"\n📊 DataLayer:")
        print(f" Events found: {len(dl.get('events', []))}")
        print(f" Validation issues: {len(dl.get('validation_issues', []))}")

        # Forms
        fa = self.report["form_analysis"]
        if fa.get("forms_found"):
            print(f"\n📝 Forms:")
            print(f" Forms found: {len(fa['forms_found'])}")
            print(f" Tracking issues: {len(fa.get('tracking_issues', []))}")

        # Tags
        print(f"\n🔥 Tags Fired: {', '.join(self.report['tags_fired']) if self.report['tags_fired'] else 'None detected'}")

        # Issues
        print(f"\n⚠️ Total Issues: {len(self.report['issues'])}")
        for issue in self.report["issues"][:5]:
            print(f" - [{issue['severity'].upper()}] {issue['message']}")

        # Recommendations
        print(f"\n💡 Recommendations: {len(self.report['recommendations'])}")
        for rec in self.report["recommendations"][:3]:
            print(f" - [{rec['priority'].upper()}] {rec['action']}")

        print("\n" + "="*60)


class NotionExporter:
    """Export GTM audit reports to Notion database."""

    # Default database ID - OurDigital GTM Audit Log
    DEFAULT_DATABASE_ID = "2cf581e5-8a1e-8163-997f-ccb387156a20"

    def __init__(self, token=None, database_id=None):
        """Store credentials.

        Raises:
            ValueError: If no token is passed and neither NOTION_TOKEN nor
                NOTION_API_KEY is set.
        """
        self.token = token or os.environ.get("NOTION_TOKEN") or os.environ.get("NOTION_API_KEY")
        self.database_id = database_id or self.DEFAULT_DATABASE_ID
        self.base_url = "https://api.notion.com/v1"

        if not self.token:
            raise ValueError("Notion token not found. Set NOTION_TOKEN or NOTION_API_KEY environment variable.")

    def _headers(self):
        return {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json",
            "Notion-Version": "2022-06-28",
        }

    def _make_request(self, method, endpoint, data=None):
        """Send a JSON request to the Notion API and return the decoded body.

        Raises:
            Exception: On an HTTP error response, with status and body text.
        """
        url = f"{self.base_url}{endpoint}"
        req = urllib.request.Request(url, method=method, headers=self._headers())
        if data is not None:
            req.data = json.dumps(data).encode("utf-8")

        try:
            # A timeout keeps a stalled connection from hanging the export.
            with urllib.request.urlopen(req, timeout=30) as response:
                return json.loads(response.read().decode("utf-8"))
        except urllib.error.HTTPError as e:
            error_body = e.read().decode("utf-8")
            raise Exception(f"Notion API error: {e.code} - {error_body}") from e

    def _generate_audit_id(self, url, timestamp):
        """Generate unique audit ID."""
        domain = urlparse(url).netloc
        # Strip only a leading "www." - not a "www." inside the host name.
        if domain.startswith("www."):
            domain = domain[len("www."):]
        date_str = timestamp[:10].replace("-", "")
        # md5 is used as a short fingerprint here, not for security.
        hash_suffix = hashlib.md5(f"{url}{timestamp}".encode()).hexdigest()[:6]
        return f"GTM-{domain[:20]}-{date_str}-{hash_suffix}"

    def _determine_audit_status(self, report):
        """Determine overall audit status based on issues."""
        issues = report.get("issues", [])
        critical_count = sum(1 for i in issues if i.get("severity") == "critical")
        high_count = sum(1 for i in issues if i.get("severity") in ["critical", "error"])

        if critical_count > 0:
            return "Fail"
        elif high_count > 0 or len(issues) > 5:
            return "Warning"
        return "Pass"

    def _determine_gtm_status(self, container_status):
        """Determine GTM installation status."""
        if not container_status.get("installed"):
            return "Not Found"
        if len(container_status.get("containers", [])) > 1:
            return "Multiple Containers"
        return "Installed"

    def _generate_summary(self, report):
        """Generate text summary of audit findings."""
        parts = []

        # Container status
        cs = report.get("container_status", {})
        if cs.get("installed"):
            containers = cs.get("containers", [])
            parts.append(f"GTM: {', '.join(containers)}")
        else:
            parts.append("GTM: Not installed")

        # DataLayer
        dl = report.get("datalayer_analysis", {})
        event_count = len(dl.get("events", []))
        validation_issues = len(dl.get("validation_issues", []))
        parts.append(f"Events: {event_count}")
        if validation_issues:
            parts.append(f"Validation issues: {validation_issues}")

        # Forms
        fa = report.get("form_analysis", {})
        form_count = len(fa.get("forms_found", []))
        if form_count:
            parts.append(f"Forms: {form_count}")

        # Tags
        tags = report.get("tags_fired", [])
        if tags:
            parts.append(f"Tags: {', '.join(tags)}")

        # Issues
        issues = report.get("issues", [])
        if issues:
            parts.append(f"Total issues: {len(issues)}")

        return " | ".join(parts)

    def export_report(self, report, journey_type="full"):
        """Export audit report to Notion database.

        Returns:
            Dict with ``success``, ``page_id``, ``page_url`` and ``audit_id``.
        """
        metadata = report.get("audit_metadata", {})
        url = metadata.get("url", "")
        timestamp = metadata.get("timestamp", datetime.now().isoformat())
        audit_id = self._generate_audit_id(url, timestamp)

        container_status = report.get("container_status", {})
        issues = report.get("issues", [])
        tags_fired = report.get("tags_fired", [])

        # Count critical issues
        critical_count = sum(1 for i in issues if i.get("severity") in ["critical", "error"])

        # Build Notion page properties
        # A URL without a scheme has an empty netloc; never send an empty title.
        site_name = (urlparse(url).netloc if url else "") or "Unknown"

        properties = {
            "Site": {
                "title": [{"text": {"content": site_name}}]
            },
            "Audit ID": {
                "rich_text": [{"text": {"content": audit_id}}]
            },
            "URL": {
                "url": url if url else None
            },
            "Audit Date": {
                "date": {"start": timestamp[:10]} if timestamp else None
            },
            "Journey Type": {
                "select": {"name": journey_type}
            },
            "GTM Status": {
                "select": {"name": self._determine_gtm_status(container_status)}
            },
            "Container IDs": {
                "rich_text": [{"text": {"content": ", ".join(container_status.get("containers", []))}}]
            },
            "Tags Fired": {
                "multi_select": [
                    {"name": tag}
                    for tag in tags_fired
                    if tag in ["GA4", "Google Ads", "Meta Pixel", "LinkedIn", "TikTok", "Kakao", "Naver"]
                ]
            },
            "Issues Count": {
                "number": len(issues)
            },
            "Critical Issues": {
                "number": critical_count
            },
            "Audit Status": {
                "select": {"name": self._determine_audit_status(report)}
            },
            "Summary": {
                "rich_text": [{"text": {"content": self._generate_summary(report)[:2000]}}]
            },
        }

        # Create page in database
        page_data = {
            "parent": {"database_id": self.database_id},
            "properties": properties,
        }

        result = self._make_request("POST", "/pages", page_data)
        page_url = result.get("url", "")
        print(f"📤 Exported to Notion: {page_url}")

        return {
            "success": True,
            "page_id": result.get("id"),
            "page_url": page_url,
            "audit_id": audit_id,
        }

    def add_detailed_content(self, page_id, report):
        """Add detailed content blocks to the Notion page."""
        blocks = []

        # Issues section
        issues = report.get("issues", [])
        if issues:
            blocks.append({
                "type": "heading_2",
                "heading_2": {
                    "rich_text": [{"type": "text", "text": {"content": "Issues Found"}}]
                }
            })
            for issue in issues[:10]:  # Limit to 10 issues
                severity = issue.get("severity", "info").upper()
                message = issue.get("message", "")
                blocks.append({
                    "type": "bulleted_list_item",
                    "bulleted_list_item": {
                        "rich_text": [{"type": "text", "text": {"content": f"[{severity}] {message}"}}]
                    }
                })

        # Recommendations section
        recommendations = report.get("recommendations", [])
        if recommendations:
            blocks.append({
                "type": "heading_2",
                "heading_2": {
                    "rich_text": [{"type": "text", "text": {"content": "Recommendations"}}]
                }
            })
            for rec in recommendations:
                priority = rec.get("priority", "").upper()
                action = rec.get("action", "")
                details = rec.get("details", "")
                blocks.append({
                    "type": "bulleted_list_item",
                    "bulleted_list_item": {
                        "rich_text": [{"type": "text", "text": {"content": f"[{priority}] {action}: {details}"}}]
                    }
                })

        # Checklist section
        checklist = report.get("checklist", {})
if checklist: blocks.append({ "type": "heading_2", "heading_2": { "rich_text": [{"type": "text", "text": {"content": "Checklist"}}] } }) for category, items in checklist.items(): blocks.append({ "type": "heading_3", "heading_3": { "rich_text": [{"type": "text", "text": {"content": category.replace("_", " ").title()}}] } }) for item, passed in items.items(): status = "✅" if passed else "❌" item_name = item.replace("_", " ").title() blocks.append({ "type": "bulleted_list_item", "bulleted_list_item": { "rich_text": [{"type": "text", "text": {"content": f"{status} {item_name}"}}] } }) if blocks: self._make_request("PATCH", f"/blocks/{page_id}/children", {"children": blocks}) class DataLayerInjector: """Generate GTM custom HTML tags for dataLayer injection. Use when direct code access is unavailable and you need to push dataLayer events through GTM custom HTML tags. """ # Event templates with required/recommended parameters EVENT_TEMPLATES = { "page_view": { "description": "Track page views with custom dimensions", "params": { "page_title": "document.title", "page_location": "window.location.href", "page_path": "window.location.pathname", }, "custom_params": ["content_group", "author", "publish_date"], "trigger": "All Pages", }, "view_item": { "description": "Track product detail page views", "params": { "currency": "'KRW'", "value": "/* product price */", }, "items_params": ["item_id", "item_name", "item_brand", "item_category", "price", "quantity"], "trigger": "Product Detail Page", "scrape_selectors": { "item_name": [".product-title", ".product-name", "h1.title", "[itemprop='name']"], "price": [".product-price", ".price", "[itemprop='price']", ".sale-price"], "item_id": ["[data-product-id]", "[data-sku]", ".product-sku"], "item_brand": [".brand", "[itemprop='brand']", ".product-brand"], "item_category": [".category", ".breadcrumb", "[itemprop='category']"], }, }, "add_to_cart": { "description": "Track add to cart actions", "params": { "currency": "'KRW'", "value": 
"/* cart value */", }, "items_params": ["item_id", "item_name", "item_brand", "item_category", "price", "quantity"], "trigger": "Add to Cart Button Click", "trigger_selector": "button.add-to-cart, .btn-cart, [data-action='add-to-cart']", }, "remove_from_cart": { "description": "Track remove from cart actions", "params": { "currency": "'KRW'", "value": "/* removed item value */", }, "items_params": ["item_id", "item_name", "price", "quantity"], "trigger": "Remove from Cart Click", "trigger_selector": "button.remove, .btn-remove, [data-action='remove']", }, "view_cart": { "description": "Track cart page views", "params": { "currency": "'KRW'", "value": "/* total cart value */", }, "items_params": ["item_id", "item_name", "price", "quantity"], "trigger": "Cart Page", }, "begin_checkout": { "description": "Track checkout initiation", "params": { "currency": "'KRW'", "value": "/* checkout value */", "coupon": "/* coupon code if any */", }, "items_params": ["item_id", "item_name", "price", "quantity"], "trigger": "Checkout Page", }, "add_shipping_info": { "description": "Track shipping info submission", "params": { "currency": "'KRW'", "value": "/* order value */", "shipping_tier": "/* shipping method */", }, "items_params": ["item_id", "item_name", "price", "quantity"], "trigger": "Shipping Form Submit", }, "add_payment_info": { "description": "Track payment info submission", "params": { "currency": "'KRW'", "value": "/* order value */", "payment_type": "/* payment method */", }, "items_params": ["item_id", "item_name", "price", "quantity"], "trigger": "Payment Form Submit", "payment_types_kr": ["신용카드", "카카오페이", "네이버페이", "토스", "무통장입금", "휴대폰결제"], }, "purchase": { "description": "Track completed purchases", "params": { "transaction_id": "/* unique order ID */", "currency": "'KRW'", "value": "/* total order value */", "tax": "/* tax amount */", "shipping": "/* shipping cost */", "coupon": "/* coupon code if any */", }, "items_params": ["item_id", "item_name", "item_brand", 
"item_category", "price", "quantity"], "trigger": "Order Confirmation Page", "scrape_selectors": { "transaction_id": [".order-number", ".order-id", "[data-order-id]"], "value": [".order-total", ".total-price", ".grand-total"], }, }, "generate_lead": { "description": "Track lead form submissions", "params": { "currency": "'KRW'", "value": "/* lead value if applicable */", }, "custom_params": ["form_id", "form_name", "lead_source"], "trigger": "Lead Form Submit", }, "form_submit": { "description": "Track general form submissions", "params": {}, "custom_params": ["form_id", "form_name", "form_destination"], "trigger": "Form Submission", }, "form_start": { "description": "Track form interaction start", "params": {}, "custom_params": ["form_id", "form_name"], "trigger": "Form Field Focus", }, "scroll": { "description": "Track scroll depth", "params": { "percent_scrolled": "/* 25, 50, 75, 90, 100 */", }, "trigger": "Scroll Depth", }, "file_download": { "description": "Track file downloads", "params": { "file_name": "/* filename */", "file_extension": "/* pdf, xlsx, etc */", "link_url": "/* download URL */", }, "trigger": "File Download Click", "trigger_selector": "a[href$='.pdf'], a[href$='.xlsx'], a[href$='.docx'], a[download]", }, "video_start": { "description": "Track video play start", "params": { "video_title": "/* video title */", "video_provider": "/* youtube, vimeo, etc */", "video_url": "/* video URL */", }, "trigger": "Video Start", }, "video_progress": { "description": "Track video progress milestones", "params": { "video_title": "/* video title */", "video_percent": "/* 25, 50, 75, 100 */", "video_current_time": "/* seconds watched */", }, "trigger": "Video Progress", }, "video_complete": { "description": "Track video completion", "params": { "video_title": "/* video title */", "video_duration": "/* total duration */", }, "trigger": "Video Complete", }, "search": { "description": "Track site search", "params": { "search_term": "/* search query */", }, 
"custom_params": ["search_results_count"], "trigger": "Search Results Page", "scrape_selectors": { "search_term": ["input[name='q']", "input[name='search']", ".search-input"], }, }, "login": { "description": "Track user login", "params": { "method": "/* login method */", }, "trigger": "Login Success", }, "sign_up": { "description": "Track user registration", "params": { "method": "/* signup method */", }, "trigger": "Signup Success", }, "share": { "description": "Track social sharing", "params": { "method": "/* share platform */", "content_type": "/* article, product, etc */", "item_id": "/* content ID */", }, "trigger": "Share Button Click", }, "outbound_click": { "description": "Track outbound link clicks", "params": { "link_url": "/* destination URL */", "link_domain": "/* destination domain */", "outbound": "true", }, "trigger": "Outbound Link Click", }, } def __init__(self, currency="KRW"): self.currency = currency self.generated_tags = [] def generate_tag(self, event_name, options=None): """Generate a custom HTML tag for a specific event.""" options = options or {} if event_name not in self.EVENT_TEMPLATES: return {"error": f"Unknown event: {event_name}. 
Available: {list(self.EVENT_TEMPLATES.keys())}"} template = self.EVENT_TEMPLATES[event_name] # Build the tag tag = { "event": event_name, "name": f"cHTML - dataLayer - {event_name}", "description": template["description"], "trigger_recommendation": template.get("trigger", "Custom"), "trigger_selector": template.get("trigger_selector"), "html": self._generate_html(event_name, template, options), "scrape_selectors": template.get("scrape_selectors", {}), } self.generated_tags.append(tag) return tag def _generate_html(self, event_name, template, options): """Generate the custom HTML tag code.""" use_scraping = options.get("use_scraping", False) include_comments = options.get("include_comments", True) lines = ["") return "\n".join(lines) def _generate_scraping_code(self, template, event_name): """Generate code that scrapes values from DOM.""" lines = [] selectors = template.get("scrape_selectors", {}) lines.append(" // Scrape product/order data from page") lines.append(" function getText(selectors) {") lines.append(" for (var i = 0; i < selectors.length; i++) {") lines.append(" var el = document.querySelector(selectors[i]);") lines.append(" if (el) return el.textContent.trim();") lines.append(" }") lines.append(" return '';") lines.append(" }") lines.append("") lines.append(" function getPrice(selectors) {") lines.append(" var text = getText(selectors);") lines.append(" var num = text.replace(/[^0-9]/g, '');") lines.append(" return num ? 
parseInt(num, 10) : 0;") lines.append(" }") lines.append("") # Generate variable assignments for field, sels in selectors.items(): sel_str = json.dumps(sels) if field in ["price", "value"]: lines.append(f" var {field} = getPrice({sel_str});") else: lines.append(f" var {field} = getText({sel_str});") lines.append("") lines.append(" window.dataLayer.push({") lines.append(f" 'event': '{event_name}',") lines.append(" 'ecommerce': {") lines.append(f" 'currency': '{self.currency}',") if "value" in selectors or "price" in selectors: lines.append(" 'value': value || price,") if "transaction_id" in selectors: lines.append(" 'transaction_id': transaction_id,") lines.append(" 'items': [{") items_params = template.get("items_params", []) for param in items_params: if param in selectors: lines.append(f" '{param}': {param},") elif param == "quantity": lines.append(f" '{param}': 1,") elif param == "price": lines.append(f" '{param}': price,") lines.append(" }]") lines.append(" }") lines.append(" });") return lines def _generate_manual_ecommerce_code(self, event_name, template, options): """Generate ecommerce code with manual value placeholders.""" lines = [] lines.append(" window.dataLayer.push({") lines.append(f" 'event': '{event_name}',") lines.append(" 'ecommerce': {") for param, default_value in template.get("params", {}).items(): if param == "currency": lines.append(f" '{param}': '{self.currency}',") else: lines.append(f" '{param}': {default_value},") lines.append(" 'items': [{") for param in template.get("items_params", []): lines.append(f" '{param}': /* TODO: set {param} */,") lines.append(" }]") lines.append(" }") lines.append(" });") return lines def generate_from_audit(self, audit_report): """Generate tags based on audit findings.""" recommendations = [] # Check for missing events datalayer = audit_report.get("datalayer_analysis", {}) existing_events = [e.get("event") for e in datalayer.get("events", [])] # Form analysis form_analysis = audit_report.get("form_analysis", 
{}) if form_analysis.get("forms_found") and "form_submit" not in existing_events: recommendations.append({ "event": "form_submit", "reason": f"Found {len(form_analysis['forms_found'])} form(s) but no form_submit event", }) self.generate_tag("form_submit") # Checkout analysis checkout = audit_report.get("checkout_analysis", {}) elements = checkout.get("elements_found", {}) if elements.get("addToCart") and "add_to_cart" not in existing_events: recommendations.append({ "event": "add_to_cart", "reason": "Add to cart elements found but no add_to_cart event", }) self.generate_tag("add_to_cart") if elements.get("checkout") and "begin_checkout" not in existing_events: recommendations.append({ "event": "begin_checkout", "reason": "Checkout elements found but no begin_checkout event", }) self.generate_tag("begin_checkout") # Check for missing ecommerce sequence for event in CHECKOUT_SEQUENCE: if event not in existing_events: if event not in [r["event"] for r in recommendations]: recommendations.append({ "event": event, "reason": f"Missing from checkout sequence", }) self.generate_tag(event) return { "recommendations": recommendations, "generated_tags": self.generated_tags, } def generate_all_ecommerce(self): """Generate all ecommerce-related tags.""" ecommerce_events = [ "view_item", "add_to_cart", "remove_from_cart", "view_cart", "begin_checkout", "add_shipping_info", "add_payment_info", "purchase" ] for event in ecommerce_events: self.generate_tag(event) return self.generated_tags def generate_engagement_tags(self): """Generate engagement tracking tags.""" engagement_events = [ "form_submit", "form_start", "scroll", "file_download", "video_start", "video_progress", "video_complete", "search", "outbound_click", "share" ] for event in engagement_events: self.generate_tag(event) return self.generated_tags def save_tags(self, output_path, format="html"): """Save generated tags to files.""" import os if not self.generated_tags: return {"error": "No tags generated yet"} 
os.makedirs(output_path, exist_ok=True) saved_files = [] for tag in self.generated_tags: event = tag["event"] if format == "html": filename = f"{event}_tag.html" filepath = os.path.join(output_path, filename) with open(filepath, "w", encoding="utf-8") as f: f.write(f"\n") f.write(f"\n") f.write(f"\n") if tag.get("trigger_selector"): f.write(f"\n") f.write("\n") f.write(tag["html"]) saved_files.append(filepath) elif format == "json": filename = f"{event}_tag.json" filepath = os.path.join(output_path, filename) with open(filepath, "w", encoding="utf-8") as f: json.dump(tag, f, indent=2, ensure_ascii=False) saved_files.append(filepath) # Also save a combined file combined_path = os.path.join(output_path, f"all_tags.{format}") if format == "html": with open(combined_path, "w", encoding="utf-8") as f: f.write("\n") f.write("\n\n") for tag in self.generated_tags: f.write(f"\n\n") f.write(f"\n") f.write(f"\n\n") f.write(tag["html"]) f.write("\n\n") else: with open(combined_path, "w", encoding="utf-8") as f: json.dump(self.generated_tags, f, indent=2, ensure_ascii=False) saved_files.append(combined_path) return {"saved_files": saved_files, "count": len(self.generated_tags)} def print_tag(self, tag): """Print a tag to console.""" print(f"\n{'='*60}") print(f"📦 {tag['name']}") print(f" {tag['description']}") print(f" Trigger: {tag['trigger_recommendation']}") if tag.get("trigger_selector"): print(f" Selector: {tag['trigger_selector']}") print(f"{'='*60}") print(tag["html"]) print() def main(): parser = argparse.ArgumentParser( description="GTM Manager - Audit and manage Google Tag Manager implementations", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Audit a website python gtm_manager.py audit --url https://example.com # Generate ecommerce dataLayer tags python gtm_manager.py inject --preset ecommerce --output ./tags # Generate tags from audit report python gtm_manager.py inject --from-audit gtm_audit_report.json # Generate specific event tag 
python gtm_manager.py inject --event purchase --event add_to_cart """ ) subparsers = parser.add_subparsers(dest="command", help="Command to run") # Audit subcommand audit_parser = subparsers.add_parser("audit", help="Audit GTM implementation on a URL") audit_parser.add_argument("--url", required=True, help="Target URL to audit") audit_parser.add_argument("--container", help="Expected GTM container ID (e.g., GTM-XXXXXX)") audit_parser.add_argument("--journey", default="full", choices=["pageview", "scroll", "click", "form", "checkout", "datalayer", "full"], help="Journey type to simulate") audit_parser.add_argument("--output", default="gtm_audit_report.json", help="Output file path") audit_parser.add_argument("--timeout", type=int, default=30000, help="Page load timeout (ms)") audit_parser.add_argument("--headless", action="store_true", default=True, help="Run headless") audit_parser.add_argument("--notion", action="store_true", help="Export results to Notion database") audit_parser.add_argument("--notion-database", help="Notion database ID") audit_parser.add_argument("--notion-detailed", action="store_true", help="Add detailed content to Notion page") audit_parser.add_argument("--generate-tags", action="store_true", help="Generate missing dataLayer tags after audit") # Inject subcommand inject_parser = subparsers.add_parser("inject", help="Generate dataLayer injection tags") inject_parser.add_argument("--event", action="append", help="Event type(s) to generate (can be repeated)") inject_parser.add_argument("--preset", choices=["ecommerce", "engagement", "all"], help="Generate preset group of tags") inject_parser.add_argument("--from-audit", help="Generate tags based on audit report JSON file") inject_parser.add_argument("--output", default="./gtm_tags", help="Output directory for generated tags") inject_parser.add_argument("--format", choices=["html", "json"], default="html", help="Output format") inject_parser.add_argument("--currency", default="KRW", 
help="Currency code for ecommerce events") inject_parser.add_argument("--scrape", action="store_true", help="Generate DOM scraping code for values") inject_parser.add_argument("--list-events", action="store_true", help="List available event types") args = parser.parse_args() if args.command == "audit": run_audit(args) elif args.command == "inject": run_inject(args) else: parser.print_help() def run_audit(args): """Run GTM audit.""" auditor = GTMAuditor( url=args.url, container_id=args.container, timeout=args.timeout, headless=args.headless, ) report = auditor.run_audit(journey=args.journey) auditor.save_report(args.output) auditor.print_summary() # Generate missing tags if requested if args.generate_tags: print("\n🏷️ Generating missing dataLayer tags...") injector = DataLayerInjector() result = injector.generate_from_audit(report) if result["recommendations"]: print(f" Found {len(result['recommendations'])} missing event(s)") for rec in result["recommendations"]: print(f" - {rec['event']}: {rec['reason']}") output_dir = args.output.replace(".json", "_tags") save_result = injector.save_tags(output_dir) print(f" Saved {save_result['count']} tags to {output_dir}/") else: print(" No missing events detected") # Export to Notion if requested if args.notion: try: exporter = NotionExporter(database_id=args.notion_database) result = exporter.export_report(report, journey_type=args.journey) if args.notion_detailed and result.get("page_id"): exporter.add_detailed_content(result["page_id"], report) print(" Added detailed content to Notion page") except Exception as e: print(f"❌ Notion export failed: {e}") def run_inject(args): """Run dataLayer injection tag generation.""" injector = DataLayerInjector(currency=args.currency) # List available events if args.list_events: print("\n📋 Available Event Types:") print("=" * 60) categories = { "Ecommerce": ["view_item", "add_to_cart", "remove_from_cart", "view_cart", "begin_checkout", "add_shipping_info", "add_payment_info", 
"purchase"], "Forms & Leads": ["form_submit", "form_start", "generate_lead"], "Engagement": ["scroll", "file_download", "search", "outbound_click", "share"], "Video": ["video_start", "video_progress", "video_complete"], "User": ["login", "sign_up"], "Page": ["page_view"], } for category, events in categories.items(): print(f"\n{category}:") for event in events: template = DataLayerInjector.EVENT_TEMPLATES.get(event, {}) desc = template.get("description", "") print(f" - {event}: {desc}") return # Generate from audit report if args.from_audit: try: with open(args.from_audit, "r", encoding="utf-8") as f: report = json.load(f) print(f"📊 Analyzing audit report: {args.from_audit}") result = injector.generate_from_audit(report) if result["recommendations"]: print(f"\n🔍 Found {len(result['recommendations'])} missing event(s):") for rec in result["recommendations"]: print(f" - {rec['event']}: {rec['reason']}") else: print(" No missing events detected in audit report") except FileNotFoundError: print(f"❌ Audit report not found: {args.from_audit}") return except json.JSONDecodeError: print(f"❌ Invalid JSON in audit report: {args.from_audit}") return # Generate preset groups elif args.preset: print(f"📦 Generating {args.preset} preset tags...") if args.preset == "ecommerce": injector.generate_all_ecommerce() elif args.preset == "engagement": injector.generate_engagement_tags() elif args.preset == "all": injector.generate_all_ecommerce() injector.generate_engagement_tags() for event in ["page_view", "login", "sign_up"]: injector.generate_tag(event) # Generate specific events elif args.event: print(f"📦 Generating tags for: {', '.join(args.event)}") for event in args.event: options = {"use_scraping": args.scrape} result = injector.generate_tag(event, options) if "error" in result: print(f" ❌ {result['error']}") else: injector.print_tag(result) else: print("❌ Please specify --event, --preset, or --from-audit") print(" Use --list-events to see available event types") return # Save 
generated tags if injector.generated_tags: save_result = injector.save_tags(args.output, format=args.format) print(f"\n✅ Saved {save_result['count']} tag(s) to {args.output}/") for filepath in save_result["saved_files"][:5]: print(f" - {filepath}") if len(save_result["saved_files"]) > 5: print(f" ... and {len(save_result['saved_files']) - 5} more") if __name__ == "__main__": main()