Files
our-claude-skills/custom-skills/21-gtm-manager/code/scripts/gtm_manager.py
Andrew Yim 236be6c580 directory changes and restructuring
🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-22 02:01:41 +09:00

2074 lines
79 KiB
Python

#!/usr/bin/env python3
"""
GTM Manager - Comprehensive Google Tag Manager management toolkit.
Features:
- Audit: GTM container validation, dataLayer analysis, form/checkout tracking
- Inject: Generate custom HTML tags for dataLayer pushes when direct code access is unavailable
- Export: Send audit results to Notion database
Usage:
# Audit mode
python gtm_manager.py audit --url "https://example.com" --journey full
# Inject mode - generate dataLayer push tags
python gtm_manager.py inject --event purchase --output tags/
# Generate tags from audit report
python gtm_manager.py inject --from-audit gtm_audit_report.json
Options:
audit Run GTM audit on a URL
inject Generate custom HTML tags for dataLayer injection
--url Target URL to audit (required for audit)
--container Expected GTM container ID (e.g., GTM-XXXXXX)
--journey Journey type: pageview, scroll, click, form, checkout, datalayer, full
--output Output file/directory path
--notion Export results to Notion database
"""
import argparse
import json
import re
import sys
from datetime import datetime
from urllib.parse import urlparse, parse_qs
from playwright.sync_api import sync_playwright
# Tag destination patterns
# Maps a human-readable destination label to the regular expressions that
# identify its endpoints. GTMAuditor._setup_network_monitoring runs
# re.search() with each pattern against every outgoing request URL, so the
# patterns are unanchored and literal dots are escaped.
TAG_DESTINATIONS = {
    "GA4": [
        r"google-analytics\.com/g/collect",
        r"analytics\.google\.com/g/collect",
    ],
    "Universal Analytics": [
        r"google-analytics\.com/collect",
        r"google-analytics\.com/r/collect",
    ],
    "Google Ads": [
        r"googleads\.g\.doubleclick\.net",
        r"google\.com/pagead",
        r"googleadservices\.com/pagead",
    ],
    "Meta Pixel": [
        r"facebook\.com/tr",
        r"connect\.facebook\.net",
    ],
    "LinkedIn": [
        r"px\.ads\.linkedin\.com",
        r"snap\.licdn\.com",
    ],
    "TikTok": [
        r"analytics\.tiktok\.com",
    ],
    "Twitter/X": [
        r"ads-twitter\.com",
        r"t\.co/i/adsct",
    ],
    "Kakao": [
        r"pixel\.kakao\.com",
    ],
    "Naver": [
        r"wcs\.naver\.com",
    ],
}
# GA4 Required Parameters by Event
# Keyed by dataLayer event name. Each entry may define:
#   "required"       - fields DataLayerValidator.validate_event expects either
#                      inside the event's `ecommerce` object or on the event
#                      itself; a missing one is reported as "missing_required".
#   "items_required" - fields expected on every element of `ecommerce.items`;
#                      when this key is present the items array must be
#                      non-empty as well.
#   "recommended"    - informational only; the validator in this file does not
#                      read this key.
GA4_EVENT_REQUIREMENTS = {
    "purchase": {
        "required": ["transaction_id", "value", "currency"],
        "items_required": ["item_id", "item_name"],
    },
    "add_to_cart": {
        "required": ["currency", "value"],
        "items_required": ["item_id", "item_name"],
    },
    "begin_checkout": {
        "required": ["currency", "value"],
        "items_required": ["item_id", "item_name"],
    },
    "add_shipping_info": {
        "required": ["currency", "value"],
        "recommended": ["shipping_tier"],
    },
    "add_payment_info": {
        "required": ["currency", "value"],
        "recommended": ["payment_type"],
    },
    "view_item": {
        "required": ["currency", "value"],
        "items_required": ["item_id", "item_name"],
    },
    "view_cart": {
        "required": ["currency", "value"],
        "items_required": ["item_id", "item_name"],
    },
    "generate_lead": {
        "recommended": ["currency", "value"],
    },
    "form_submit": {
        "recommended": ["form_id", "form_name"],
    },
}
# Checkout flow sequence
# Expected firing order of the checkout funnel events. The list position of an
# event is what the sequence checks in DataLayerValidator.validate_sequence and
# CheckoutFlowAnalyzer.validate_checkout_events compare against.
CHECKOUT_SEQUENCE = [
    "view_cart",
    "begin_checkout",
    "add_shipping_info",
    "add_payment_info",
    "purchase",
]
class DataLayerValidator:
    """Advanced dataLayer validation and monitoring.

    Every ``validate_*`` / ``check_*`` method returns a new list of issue
    dicts and leaves the instance untouched.  Entries that are not dicts
    (``null`` placeholders, strings, arrays) are ignored instead of raising,
    because a captured dataLayer is not guaranteed to contain only objects.
    """

    def __init__(self):
        # Initialised empty; the methods below neither read nor write them.
        self.events = []
        self.issues = []
        self.snapshots = []

    def validate_event(self, event_data):
        """Validate a single dataLayer event against GA4 specs.

        Args:
            event_data: One dataLayer entry.  A non-dict entry, an entry
                without an ``event`` name, or an event that has no entry in
                ``GA4_EVENT_REQUIREMENTS`` produces no issues.

        Returns:
            A list of issue dicts.  Each has ``type``, ``event`` and
            ``message``; ``field`` and ``item_index`` are added where they
            apply.
        """
        issues = []
        if not isinstance(event_data, dict):
            return issues
        event_name = event_data.get("event")
        if not event_name:
            return issues

        # Non-string names (unhashable objects included) cannot be keys of the
        # requirements table, so they are simply not validated.
        reqs = (
            GA4_EVENT_REQUIREMENTS.get(event_name)
            if isinstance(event_name, str)
            else None
        )
        if reqs is None:
            return issues

        # A push such as {event: ..., ecommerce: null} arrives here as None;
        # treat every non-object payload as "no ecommerce data".
        ecommerce = event_data.get("ecommerce")
        if not isinstance(ecommerce, dict):
            ecommerce = {}

        # Required fields may live in the ecommerce object or on the event.
        for field in reqs.get("required", []):
            if field not in ecommerce and field not in event_data:
                issues.append({
                    "type": "missing_required",
                    "event": event_name,
                    "field": field,
                    "message": f"Missing required field: {field}",
                })

        # Items array: must exist (when required) and be a real list.
        items = ecommerce.get("items", [])
        if not isinstance(items, list):
            items = []
        item_fields = reqs.get("items_required", [])
        if item_fields and not items:
            issues.append({
                "type": "missing_items",
                "event": event_name,
                "message": "E-commerce event missing 'items' array",
            })

        for i, item in enumerate(items):
            # A malformed (non-object) item is reported as missing every field.
            present = item if isinstance(item, dict) else {}
            for field in item_fields:
                if field not in present:
                    issues.append({
                        "type": "item_missing_field",
                        "event": event_name,
                        "item_index": i,
                        "field": field,
                        "message": f"Item {i} missing required field: {field}",
                    })

        # Data types: bool is a subclass of int in Python, but a JSON
        # true/false is not a valid monetary value.
        if "value" in ecommerce:
            value = ecommerce["value"]
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                issues.append({
                    "type": "wrong_type",
                    "event": event_name,
                    "field": "value",
                    "message": f"'value' should be number, got {type(value).__name__}",
                })

        # transaction_id sanity check (empty, falsy or the literal "undefined"
        # string that leaks from unset JavaScript variables).
        if event_name == "purchase" and "transaction_id" in ecommerce:
            tid = ecommerce["transaction_id"]
            if not tid or tid == "undefined":
                issues.append({
                    "type": "invalid_transaction_id",
                    "event": event_name,
                    "message": "transaction_id is empty or invalid",
                })
        return issues

    def validate_sequence(self, events):
        """Validate checkout event sequence.

        Args:
            events: dataLayer entries in firing order; non-dict entries and
                entries without an ``event`` name are skipped.

        Returns:
            One ``sequence_error`` issue for every checkout event that fires
            after an event that comes later in ``CHECKOUT_SEQUENCE``.
        """
        issues = []
        event_names = [
            e.get("event")
            for e in events
            if isinstance(e, dict) and e.get("event")
        ]
        checkout_events = [name for name in event_names if name in CHECKOUT_SEQUENCE]

        last_idx = -1
        for event in checkout_events:
            idx = CHECKOUT_SEQUENCE.index(event)
            if idx < last_idx:
                issues.append({
                    "type": "sequence_error",
                    "message": f"Event '{event}' fired out of order",
                })
            last_idx = idx
        return issues

    def check_ecommerce_clear(self, events):
        """Check if ecommerce object is cleared before new pushes.

        Only a push that explicitly carries ``ecommerce: null`` counts as a
        clear.  Pushes without an ``ecommerce`` key (page views, scroll
        events, ...) neither set nor clear the object, so they do not hide a
        missing clear between two ecommerce pushes.

        Args:
            events: dataLayer entries in firing order; non-dict entries are
                skipped.

        Returns:
            One ``missing_ecommerce_clear`` issue per ecommerce push that
            follows another ecommerce push without a clear in between.
            ``index`` is the position of the offending entry in ``events``.
        """
        issues = []
        pending_ecommerce = False
        for i, event in enumerate(events):
            if not isinstance(event, dict) or "ecommerce" not in event:
                continue
            if event["ecommerce"] is None:
                pending_ecommerce = False
                continue
            if pending_ecommerce:
                issues.append({
                    "type": "missing_ecommerce_clear",
                    "index": i,
                    "event": event.get("event"),
                    "message": "E-commerce data should be cleared before new push",
                })
            pending_ecommerce = True
        return issues
class FormAnalyzer:
    """Form discovery, analysis, and interaction tracking.

    Wraps a Playwright ``page``.  ``discover_forms`` must run first; the other
    methods work on the form descriptions it stores in ``self.forms``.
    """

    def __init__(self, page):
        self.page = page
        self.forms = []
        self.interactions = []
        self.issues = []

    @staticmethod
    def _attr_selector(attribute, value):
        """Build a CSS attribute selector with the value safely quoted.

        ``#id`` and ``[name='...']`` break on values containing quotes, dots,
        colons or a leading digit; a double-quoted attribute selector with
        backslash and quote escaped handles all of them.
        """
        escaped = str(value).replace("\\", "\\\\").replace('"', '\\"')
        return f'[{attribute}="{escaped}"]'

    def discover_forms(self):
        """Find and analyze all forms on the page.

        Returns:
            A list with one dict per ``<form>`` in document order (``index``,
            ``id``, ``name``, ``action``, ``method``, ``className``,
            ``fieldCount``, ``fields``, ``hasSubmitButton``).  The same list
            is stored in ``self.forms``.
        """
        forms_data = self.page.evaluate("""
            () => {
                const forms = document.querySelectorAll('form');
                return Array.from(forms).map((form, idx) => {
                    const fields = Array.from(form.querySelectorAll('input, select, textarea'));
                    return {
                        index: idx,
                        id: form.id || null,
                        name: form.name || null,
                        action: form.action || null,
                        method: form.method || 'get',
                        className: form.className || null,
                        fieldCount: fields.length,
                        fields: fields.map(field => ({
                            type: field.type || field.tagName.toLowerCase(),
                            name: field.name || null,
                            id: field.id || null,
                            required: field.required || false,
                            placeholder: field.placeholder || null,
                            validation: field.pattern || null,
                            maxLength: field.maxLength > 0 ? field.maxLength : null,
                        })),
                        hasSubmitButton: form.querySelector('button[type="submit"], input[type="submit"]') !== null,
                    };
                });
            }
        """)
        self.forms = forms_data
        return forms_data

    def analyze_form_tracking_readiness(self):
        """Check if forms are ready for GTM tracking.

        Returns:
            A list of issue dicts (also stored in ``self.issues``) for forms
            without id/name, text-like inputs without id/name, and forms
            without a submit button.
        """
        issues = []
        for form in self.forms:
            # Forms need an identifier to be targeted by triggers.
            if not form["id"] and not form["name"]:
                issues.append({
                    "type": "form_no_identifier",
                    "form_index": form["index"],
                    "message": f"Form {form['index']} has no id or name attribute",
                    "recommendation": "Add id or name attribute for reliable form tracking",
                })
            # Text-like inputs need an identifier as well.
            for field in form["fields"]:
                if field["type"] in ["text", "email", "tel"] and not field["name"] and not field["id"]:
                    issues.append({
                        "type": "field_no_identifier",
                        "form_index": form["index"],
                        "field_type": field["type"],
                        "message": "Input field missing name/id for tracking",
                    })
            # No submit button usually means a script-driven submission.
            if not form["hasSubmitButton"]:
                issues.append({
                    "type": "form_no_submit",
                    "form_index": form["index"],
                    "message": "Form has no submit button - may use JS submission",
                    "recommendation": "Verify form submission triggers dataLayer push",
                })
        self.issues = issues
        return issues

    def simulate_form_interaction(self, form_index=0):
        """Simulate user interaction with a form.

        Focuses, optionally fills, and blurs every identifiable field of the
        selected form.  Fields are looked up inside that form only, so an
        identically named field in another form is never touched.

        Args:
            form_index: Position of the form in ``self.forms``.

        Returns:
            The list of interaction records (also stored in
            ``self.interactions``), or ``{"error": ...}`` when the index is
            out of range or the form could not be processed.
        """
        if not 0 <= form_index < len(self.forms):
            return {"error": "Form index out of range"}
        form = self.forms[form_index]
        interactions = []

        # Sample values per input type; other types are only focused/blurred.
        test_values = {
            "text": "Test User",
            "email": "test@example.com",
            "tel": "010-1234-5678",
            "number": "100",
            "password": "TestPass123!",
        }

        try:
            # Prefer a stable identifier.  The positional fallback uses
            # Locator.nth(), which counts forms in document order like
            # discover_forms() does; ``form:nth-of-type(n)`` would count
            # among siblings instead.
            if form["id"]:
                form_element = self.page.locator(
                    "form" + self._attr_selector("id", form["id"])
                ).first
            elif form["name"]:
                form_element = self.page.locator(
                    "form" + self._attr_selector("name", form["name"])
                ).first
            else:
                form_element = self.page.locator("form").nth(form_index)

            for field in form["fields"]:
                if field["id"]:
                    field_selector = self._attr_selector("id", field["id"])
                elif field["name"]:
                    field_selector = self._attr_selector("name", field["name"])
                else:
                    continue  # nothing to address the field by

                label = field["name"] or field["id"]
                try:
                    field_element = form_element.locator(field_selector).first

                    field_element.focus()
                    interactions.append({
                        "action": "focus",
                        "field": label,
                        "timestamp": datetime.now().isoformat(),
                    })
                    self.page.wait_for_timeout(200)

                    if field["type"] in test_values:
                        field_element.fill(test_values[field["type"]])
                        interactions.append({
                            "action": "input",
                            "field": label,
                            "type": field["type"],
                            "timestamp": datetime.now().isoformat(),
                        })
                        self.page.wait_for_timeout(200)

                    field_element.blur()
                    interactions.append({
                        "action": "blur",
                        "field": label,
                        "timestamp": datetime.now().isoformat(),
                    })
                except Exception as e:
                    # One unusable field (hidden, detached, disabled) must not
                    # abort the rest of the form; record it and move on.
                    interactions.append({
                        "action": "error",
                        "field": label,
                        "error": str(e),
                    })
            self.interactions = interactions
            return interactions
        except Exception as e:
            return {"error": str(e)}

    def check_form_events(self, datalayer_events):
        """Check if expected form events are in dataLayer.

        Args:
            datalayer_events: Captured dataLayer entries.  Non-dict entries
                are ignored; ``None`` is treated as an empty dataLayer.

        Returns:
            A dict with the ``found`` and ``missing`` event names and a
            ``recommendation`` string (``None`` when nothing is missing).
        """
        expected_events = ["form_start", "form_submit", "generate_lead"]
        event_names = [
            e.get("event")
            for e in (datalayer_events or [])
            if isinstance(e, dict)
        ]
        found_events = [name for name in expected_events if name in event_names]
        missing_events = [name for name in expected_events if name not in event_names]
        return {
            "found": found_events,
            "missing": missing_events,
            "recommendation": (
                "Consider implementing: " + ", ".join(missing_events)
                if missing_events
                else None
            ),
        }
class CheckoutFlowAnalyzer:
    """E-commerce checkout flow simulation and validation.

    Wraps a Playwright ``page``.  Completed simulation steps are appended to
    ``steps_completed`` and unexpected failures to ``issues``.
    """

    def __init__(self, page):
        self.page = page
        self.steps_completed = []
        self.events_captured = []
        self.issues = []

    def detect_checkout_elements(self):
        """Find checkout-related elements on page.

        Runs inside the page with ``document.querySelectorAll``.  The
        ``tag:has-text("...")`` entries are a Playwright selector extension
        that the browser's own CSS engine rejects, so they are resolved by
        hand: all ``tag`` elements whose text contains the phrase
        (case-insensitive).  This approximates Playwright's matching.

        Returns:
            A dict keyed by element category (``cart``, ``checkout``,
            ``addToCart``, ``quantity``, ``removeItem``, ``promoCode``); each
            value is a list of ``{selector, tag, text, visible}`` records.
        """
        elements = self.page.evaluate("""
            () => {
                const selectors = {
                    cart: [
                        '[class*="cart"]', '[id*="cart"]',
                        '[class*="basket"]', '[id*="basket"]',
                    ],
                    checkout: [
                        '[class*="checkout"]', '[id*="checkout"]',
                        'button:has-text("Checkout")', 'a:has-text("Checkout")',
                        'button:has-text("결제")', 'a:has-text("결제")',
                    ],
                    addToCart: [
                        'button:has-text("Add to Cart")', 'button:has-text("Add to Bag")',
                        'button:has-text("장바구니")', 'button:has-text("담기")',
                        '[class*="add-to-cart"]', '[id*="add-to-cart"]',
                    ],
                    quantity: [
                        '[class*="quantity"]', '[name*="quantity"]',
                        '[class*="qty"]', '[name*="qty"]',
                    ],
                    removeItem: [
                        '[class*="remove"]', 'button:has-text("Remove")',
                        'button:has-text("삭제")', '[class*="delete"]',
                    ],
                    promoCode: [
                        '[name*="promo"]', '[name*="coupon"]', '[id*="coupon"]',
                        '[placeholder*="promo"]', '[placeholder*="coupon"]',
                    ],
                };
                const marker = ':has-text("';
                const resolve = (sel) => {
                    const at = sel.indexOf(marker);
                    if (at === -1) {
                        return Array.from(document.querySelectorAll(sel));
                    }
                    const tag = sel.slice(0, at);
                    const needle = sel
                        .slice(at + marker.length, sel.lastIndexOf('"'))
                        .toLowerCase();
                    return Array.from(document.querySelectorAll(tag)).filter(
                        el => (el.textContent || '').toLowerCase().includes(needle)
                    );
                };
                const found = {};
                for (const [type, selectorList] of Object.entries(selectors)) {
                    found[type] = [];
                    for (const sel of selectorList) {
                        try {
                            resolve(sel).forEach(el => {
                                found[type].push({
                                    selector: sel,
                                    tag: el.tagName.toLowerCase(),
                                    text: el.textContent?.slice(0, 50) || null,
                                    visible: el.offsetParent !== null,
                                });
                            });
                        } catch(e) {}
                    }
                }
                return found;
            }
        """)
        return elements

    def _click_first_visible(self, step, selectors, settle_ms):
        """Click the first visible element matching any selector; record the step."""
        for selector in selectors:
            try:
                btn = self.page.locator(selector).first
                if not btn.is_visible():
                    continue
                btn.click()
            except Exception:
                # Best effort: a selector that cannot be resolved or clicked
                # only means the next candidate should be tried.
                continue
            self.steps_completed.append({
                "step": step,
                "selector": selector,
                "timestamp": datetime.now().isoformat(),
            })
            # Give tags triggered by the click time to fire.
            self.page.wait_for_timeout(settle_ms)
            return True
        return False

    def simulate_add_to_cart(self):
        """Attempt to simulate add-to-cart action.

        Returns:
            ``True`` if a button was clicked, ``False`` otherwise.
        """
        selectors = [
            'button:has-text("Add to Cart")',
            'button:has-text("Add to Bag")',
            'button:has-text("장바구니")',
            '[class*="add-to-cart"]:visible',
            '[id*="add-to-cart"]:visible',
        ]
        try:
            return self._click_first_visible("add_to_cart", selectors, 1500)
        except Exception as e:
            self.issues.append({"step": "add_to_cart", "error": str(e)})
            return False

    def simulate_begin_checkout(self):
        """Attempt to click checkout button.

        Returns:
            ``True`` if a button was clicked, ``False`` otherwise.
        """
        selectors = [
            'button:has-text("Checkout")',
            'a:has-text("Checkout")',
            'button:has-text("결제하기")',
            'button:has-text("주문하기")',
            '[class*="checkout-btn"]:visible',
        ]
        try:
            return self._click_first_visible("begin_checkout", selectors, 2000)
        except Exception as e:
            self.issues.append({"step": "begin_checkout", "error": str(e)})
            return False

    def validate_checkout_events(self, datalayer_events):
        """Validate checkout-related events in dataLayer.

        Args:
            datalayer_events: Captured dataLayer entries in firing order;
                non-dict entries are ignored.

        Returns:
            A dict with ``events_found``, ``events_missing``,
            ``sequence_valid`` and ``issues``.  The sequence is valid when the
            checkout events never step backwards in ``CHECKOUT_SEQUENCE``; a
            step firing more than once in a row is not an ordering error.
        """
        results = {
            "events_found": [],
            "events_missing": [],
            "sequence_valid": True,
            "issues": [],
        }
        dict_events = [e for e in (datalayer_events or []) if isinstance(e, dict)]
        event_names = [e.get("event") for e in dict_events]

        validator = DataLayerValidator()
        for step in CHECKOUT_SEQUENCE:
            matching = [e for e in dict_events if e.get("event") == step]
            if not matching:
                results["events_missing"].append(step)
                continue
            results["events_found"].append(step)
            for event in matching:
                results["issues"].extend(validator.validate_event(event))

        found_sequence = [name for name in event_names if name in CHECKOUT_SEQUENCE]
        positions = [CHECKOUT_SEQUENCE.index(name) for name in found_sequence]
        in_order = all(prev <= cur for prev, cur in zip(positions, positions[1:]))
        if not in_order:
            expected_order = [e for e in CHECKOUT_SEQUENCE if e in found_sequence]
            results["sequence_valid"] = False
            results["issues"].append({
                "type": "sequence_error",
                "message": f"Events out of order. Found: {found_sequence}, Expected: {expected_order}",
            })
        return results
class GTMAuditor:
    """Main GTM audit orchestrator.

    Drives a headless Chromium session through Playwright, collects container
    status, dataLayer contents, form/checkout information and tag network
    requests, and assembles everything into ``self.report``.
    """

    def __init__(self, url, container_id=None, timeout=30000, headless=True):
        """Prepare an audit.

        Args:
            url: Page to audit.
            container_id: Expected GTM container ID, or ``None`` to skip the
                mismatch check.
            timeout: Value passed as ``timeout`` to ``page.goto``.
            headless: Passed to ``chromium.launch``.
        """
        self.url = url
        self.expected_container = container_id
        self.timeout = timeout
        self.headless = headless
        self.report = {
            "audit_metadata": {
                "url": url,
                "timestamp": datetime.now().isoformat(),
                "expected_container": container_id,
            },
            "container_status": {},
            "datalayer_analysis": {
                "events": [],
                "validation_issues": [],
                "sequence_issues": [],
            },
            "form_analysis": {
                "forms_found": [],
                "tracking_issues": [],
                "events_status": {},
            },
            "checkout_analysis": {
                "elements_found": {},
                "events_status": {},
                "flow_issues": [],
            },
            "network_requests": [],
            "tags_fired": [],
            "issues": [],
            "recommendations": [],
            "checklist": {},
        }
        self.network_requests = []
        self.datalayer_history = []
        self.page = None

    @staticmethod
    def _classify_destination(url):
        """Return the first TAG_DESTINATIONS label whose pattern matches url, else None."""
        for destination, patterns in TAG_DESTINATIONS.items():
            if any(re.search(pattern, url) for pattern in patterns):
                return destination
        return None

    def _setup_network_monitoring(self, page):
        """Intercept and log network requests to tag destinations.

        Each request is recorded at most once, under the first destination
        whose pattern matches its URL.
        """
        def handle_request(request):
            url = request.url
            destination = self._classify_destination(url)
            if destination is None:
                return
            params = parse_qs(urlparse(url).query)
            self.network_requests.append({
                "destination": destination,
                "url": url[:200],
                "method": request.method,
                # Single-valued parameters are unwrapped from their list.
                "params": {k: v[0] if len(v) == 1 else v for k, v in params.items()},
                "timestamp": datetime.now().isoformat(),
            })

        page.on("request", handle_request)

    def _setup_datalayer_monitoring(self, page):
        """Inject dataLayer monitoring script.

        Wraps ``window.dataLayer.push`` so later pushes are mirrored into
        ``window.__gtmAuditEvents``.  Does nothing when the page has no
        dataLayer array (creating one here would falsify the container
        check) or when the hook is already installed.  Serialization errors
        are caught inside the hook so they never surface in the site's own
        code.
        """
        page.evaluate("""
            () => {
                if (!Array.isArray(window.dataLayer) || window.__gtmAuditEvents) {
                    return;
                }
                window.__gtmAuditEvents = [];
                const originalPush = window.dataLayer.push;
                window.dataLayer.push = function() {
                    const result = originalPush.apply(this, arguments);
                    for (let i = 0; i < arguments.length; i++) {
                        let data;
                        try {
                            data = JSON.parse(JSON.stringify(arguments[i]));
                        } catch (e) {
                            data = {__unserializable: String(e)};
                        }
                        window.__gtmAuditEvents.push({
                            data: data,
                            timestamp: new Date().toISOString()
                        });
                    }
                    return result;
                };
            }
        """)

    def _capture_datalayer(self, page):
        """Capture current dataLayer state.

        Returns:
            A JSON-safe copy of ``window.dataLayer``, ``None`` when the page
            has none, or ``{"error": message}`` when the capture failed.
        """
        try:
            datalayer = page.evaluate("""
                () => {
                    if (typeof window.dataLayer !== 'undefined') {
                        return JSON.parse(JSON.stringify(window.dataLayer));
                    }
                    return null;
                }
            """)
            return datalayer
        except Exception as e:
            return {"error": str(e)}

    def _capture_monitored_events(self, page):
        """Capture events logged by our monitoring; empty list on any failure."""
        try:
            events = page.evaluate("""
                () => window.__gtmAuditEvents || []
            """)
            return events
        except Exception:
            return []

    def _check_gtm_container(self, page):
        """Verify GTM container installation.

        Inspects the page's ``<script>`` and ``<noscript>`` elements, stores
        the result in ``report["container_status"]`` and appends any
        container problems to ``report["issues"]``.

        Returns:
            The container status dict.
        """
        result = page.evaluate("""
            () => {
                const scripts = document.querySelectorAll('script');
                const gtmInfo = {
                    installed: false,
                    containers: [],
                    position: null,
                    noscript: false,
                    dataLayerInit: false,
                    dataLayerInitBeforeGTM: false,
                };
                gtmInfo.dataLayerInit = typeof window.dataLayer !== 'undefined' &&
                    Array.isArray(window.dataLayer);
                let gtmScriptIndex = -1;
                let dataLayerInitIndex = -1;
                scripts.forEach((script, index) => {
                    const src = script.src || '';
                    const innerHTML = script.innerHTML || '';
                    // Check for dataLayer init
                    if (innerHTML.includes('dataLayer') && innerHTML.includes('[]')) {
                        dataLayerInitIndex = index;
                    }
                    const gtmMatch = src.match(/gtm\\.js\\?id=(GTM-[A-Z0-9]+)/);
                    if (gtmMatch) {
                        gtmInfo.installed = true;
                        gtmInfo.containers.push(gtmMatch[1]);
                        gtmInfo.position = script.closest('head') ? 'head' : 'body';
                        gtmScriptIndex = index;
                    }
                    const inlineMatch = innerHTML.match(/GTM-[A-Z0-9]+/g);
                    if (inlineMatch) {
                        gtmInfo.installed = true;
                        inlineMatch.forEach(id => {
                            if (!gtmInfo.containers.includes(id)) {
                                gtmInfo.containers.push(id);
                            }
                        });
                    }
                });
                gtmInfo.dataLayerInitBeforeGTM = dataLayerInitIndex < gtmScriptIndex && dataLayerInitIndex !== -1;
                const noscripts = document.querySelectorAll('noscript');
                noscripts.forEach(ns => {
                    if (ns.innerHTML.includes('googletagmanager.com/ns.html')) {
                        gtmInfo.noscript = true;
                    }
                });
                return gtmInfo;
            }
        """)
        status = {
            "installed": result["installed"],
            "containers": result["containers"],
            "position": result["position"],
            "noscript_present": result["noscript"],
            "datalayer_initialized": result["dataLayerInit"],
            "datalayer_init_before_gtm": result["dataLayerInitBeforeGTM"],
            "issues": [],
        }
        report_issues = self.report["issues"]

        if not result["installed"]:
            status["issues"].append("GTM container not detected")
            report_issues.append({
                "severity": "critical",
                "type": "container_missing",
                "message": "GTM container script not found on page",
            })
        if len(result["containers"]) > 1:
            status["issues"].append(f"Multiple containers: {result['containers']}")
            report_issues.append({
                "severity": "warning",
                "type": "multiple_containers",
                "message": f"Multiple GTM containers found: {', '.join(result['containers'])}",
            })
        if self.expected_container and self.expected_container not in result["containers"]:
            report_issues.append({
                "severity": "error",
                "type": "container_mismatch",
                "message": f"Expected {self.expected_container}, found {result['containers']}",
            })
        if result["position"] == "body":
            report_issues.append({
                "severity": "warning",
                "type": "script_position",
                "message": "GTM script in body - may delay tag firing",
            })
        if not result["dataLayerInitBeforeGTM"]:
            report_issues.append({
                "severity": "warning",
                "type": "datalayer_order",
                "message": "dataLayer should be initialized before GTM script",
            })
        self.report["container_status"] = status
        return status

    def _simulate_scroll(self, page):
        """Simulate scroll to trigger scroll-depth tags.

        Schedules scrolls to 25/50/75/90/100 % of the document height, 500 ms
        apart, then waits 3 s so all of them have run.
        """
        page.evaluate("""
            () => {
                const heights = [0.25, 0.5, 0.75, 0.9, 1.0];
                const docHeight = document.documentElement.scrollHeight;
                heights.forEach((pct, i) => {
                    setTimeout(() => {
                        window.scrollTo(0, docHeight * pct);
                    }, i * 500);
                });
            }
        """)
        page.wait_for_timeout(3000)

    def _run_form_audit(self, page):
        """Execute form analysis and store the results under ``form_analysis``."""
        print("📝 Analyzing forms...")
        form_analyzer = FormAnalyzer(page)
        forms = form_analyzer.discover_forms()
        tracking_issues = form_analyzer.analyze_form_tracking_readiness()
        self.report["form_analysis"]["forms_found"] = forms
        self.report["form_analysis"]["tracking_issues"] = tracking_issues
        if not forms:
            print(" No forms found on page")
            return

        print(f" Found {len(forms)} form(s)")
        # Only the first form is exercised.
        interactions = form_analyzer.simulate_form_interaction(0)
        self.report["form_analysis"]["interactions"] = interactions
        # Allow time for events
        page.wait_for_timeout(2000)
        datalayer = self._capture_datalayer(page)
        # A failed capture comes back as an {"error": ...} dict; only a real,
        # non-empty list holds events, and only its dict entries can name one.
        if isinstance(datalayer, list) and datalayer:
            pushes = [entry for entry in datalayer if isinstance(entry, dict)]
            events_status = form_analyzer.check_form_events(pushes)
            self.report["form_analysis"]["events_status"] = events_status

    def _run_checkout_audit(self, page):
        """Execute e-commerce checkout flow analysis (element detection only)."""
        print("🛒 Analyzing checkout flow...")
        checkout_analyzer = CheckoutFlowAnalyzer(page)
        elements = checkout_analyzer.detect_checkout_elements()
        self.report["checkout_analysis"]["elements_found"] = elements
        # Log what we found
        for element_type, found in elements.items():
            if found:
                print(f" Found {len(found)} {element_type} element(s)")

    def _run_datalayer_audit(self, page):
        """Execute deep dataLayer analysis.

        The results replace whatever a previous call stored, so running the
        audit more than once never duplicates issues.
        """
        print("📊 Analyzing dataLayer...")
        analysis = self.report["datalayer_analysis"]
        datalayer = self._capture_datalayer(page)
        analysis["monitored_events"] = self._capture_monitored_events(page)

        if isinstance(datalayer, dict) and "error" in datalayer:
            analysis["issues"] = [f"dataLayer capture failed: {datalayer['error']}"]
            return
        if not datalayer or not isinstance(datalayer, list):
            analysis["issues"] = ["dataLayer not found"]
            return

        # Non-object entries are swapped for empty dicts: the validators can
        # then call .get() safely and list positions stay unchanged.
        pushes = [entry if isinstance(entry, dict) else {} for entry in datalayer]

        validator = DataLayerValidator()
        validation_issues = []
        for event in pushes:
            validation_issues.extend(validator.validate_event(event))
        validation_issues.extend(validator.check_ecommerce_clear(pushes))

        analysis["validation_issues"] = validation_issues
        analysis["sequence_issues"] = validator.validate_sequence(pushes)

        events = [
            {
                "index": i,
                "event": item.get("event"),
                "has_ecommerce": "ecommerce" in item,
                "params": list(item.keys()),
            }
            for i, item in enumerate(pushes)
            if item.get("event")
        ]
        analysis["events"] = events
        print(f" Found {len(events)} events in dataLayer")

    def _generate_recommendations(self):
        """Generate recommendations based on findings."""
        recs = []
        for issue in self.report["issues"]:
            if issue["type"] == "container_missing":
                recs.append({
                    "priority": "high",
                    "action": "Install GTM container",
                    "details": "Add GTM snippet to <head> section",
                })
            elif issue["type"] == "datalayer_order":
                recs.append({
                    "priority": "medium",
                    "action": "Initialize dataLayer before GTM",
                    "details": "Add 'window.dataLayer = window.dataLayer || [];' before GTM",
                })

        # Form recommendations (only when forms exist)
        form_analysis = self.report["form_analysis"]
        if form_analysis["forms_found"]:
            missing = form_analysis.get("events_status", {}).get("missing")
            if missing:
                recs.append({
                    "priority": "medium",
                    "action": "Implement form tracking events",
                    "details": f"Missing events: {', '.join(missing)}",
                })

        # DataLayer recommendations
        validation_issues = self.report["datalayer_analysis"].get("validation_issues", [])
        if validation_issues:
            recs.append({
                "priority": "high",
                "action": "Fix dataLayer validation issues",
                "details": f"{len(validation_issues)} issue(s) found in event structure",
            })

        # Tag coverage
        destinations = {r["destination"] for r in self.network_requests}
        if "GA4" not in destinations:
            recs.append({
                "priority": "high",
                "action": "Verify GA4 implementation",
                "details": "No GA4 requests detected",
            })
        self.report["recommendations"] = recs

    def _generate_checklist(self):
        """Generate audit checklist (pass/fail booleans grouped by area)."""
        container = self.report["container_status"]
        datalayer = self.report["datalayer_analysis"]
        form_analysis = self.report["form_analysis"]
        containers = container.get("containers", [])
        forms = form_analysis.get("forms_found", [])

        self.report["checklist"] = {
            "container_health": {
                "gtm_installed": container.get("installed", False),
                "correct_container": (
                    self.expected_container in containers
                    if self.expected_container
                    else True
                ),
                "no_duplicates": len(containers) <= 1,
                "correct_position": container.get("position") == "head",
                "datalayer_init_order": container.get("datalayer_init_before_gtm", False),
            },
            "datalayer_quality": {
                "initialized": container.get("datalayer_initialized", False),
                "events_present": len(datalayer.get("events", [])) > 0,
                "no_validation_errors": len(datalayer.get("validation_issues", [])) == 0,
                "correct_sequence": len(datalayer.get("sequence_issues", [])) == 0,
            },
            "form_tracking": {
                # Both checks pass trivially when the page has no forms.
                "forms_identifiable": (
                    all(f.get("id") or f.get("name") for f in forms)
                    if forms
                    else True
                ),
                "form_events_present": (
                    len(form_analysis.get("events_status", {}).get("found", [])) > 0
                    if forms
                    else True
                ),
            },
            "tag_firing": {
                "ga4_active": any(r["destination"] == "GA4" for r in self.network_requests),
                "requests_captured": len(self.network_requests) > 0,
            },
        }

    def run_audit(self, journey="pageview"):
        """Execute the full audit workflow.

        Args:
            journey: One of ``pageview``, ``scroll``, ``click``, ``form``,
                ``checkout``, ``datalayer``, ``full``.  The dataLayer
                analysis runs once for every journey.

        Returns:
            The completed report dict.  Errors raised while auditing are
            recorded as a critical ``audit_error`` issue instead of
            propagating.
        """
        print(f"🔍 Starting GTM audit for: {self.url}")
        print(f" Journey type: {journey}")
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=self.headless)
            try:
                context = browser.new_context(
                    viewport={"width": 1920, "height": 1080},
                    user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) GTMAudit/1.0"
                )
                page = context.new_page()
                self.page = page
                self._setup_network_monitoring(page)

                print("📄 Loading page...")
                page.goto(self.url, timeout=self.timeout, wait_until="networkidle")
                page.wait_for_timeout(2000)

                # The hook is installed after load, so it only sees pushes
                # made from here on.  It is optional: a failure must not
                # abort the audit.
                try:
                    self._setup_datalayer_monitoring(page)
                except Exception as exc:
                    print(f" dataLayer monitoring unavailable: {exc}")

                print("🏷️ Checking GTM container...")
                self._check_gtm_container(page)

                # Run journey-specific audits
                if journey in ["scroll", "pageview", "full"]:
                    print("📜 Simulating scroll...")
                    self._simulate_scroll(page)
                if journey in ["form", "full"]:
                    self._run_form_audit(page)
                if journey in ["checkout", "full"]:
                    self._run_checkout_audit(page)

                # Always analyse the dataLayer, once, after late events have
                # had time to arrive.
                page.wait_for_timeout(2000)
                self._run_datalayer_audit(page)
            except Exception as e:
                self.report["issues"].append({
                    "severity": "critical",
                    "type": "audit_error",
                    "message": str(e),
                })
            finally:
                browser.close()

        # Stored outside the try block so that requests captured before a
        # failure still reach the report; sorted for a stable order.
        self.report["network_requests"] = self.network_requests
        self.report["tags_fired"] = sorted({r["destination"] for r in self.network_requests})

        self._generate_recommendations()
        self._generate_checklist()
        print("✅ Audit complete!")
        return self.report

    def save_report(self, filepath):
        """Save report to JSON file (UTF-8, non-ASCII characters kept as-is)."""
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(self.report, f, indent=2, ensure_ascii=False)
        print(f"📝 Report saved to: {filepath}")

    def print_summary(self):
        """Print audit summary to console."""
        divider = "=" * 60
        print("\n" + divider)
        print("📋 GTM AUDIT SUMMARY")
        print(divider)

        # Container
        cs = self.report["container_status"]
        print(f"\n🏷️ Container: {'✅ Installed' if cs.get('installed') else '❌ Not Found'}")
        if cs.get("containers"):
            print(f" IDs: {', '.join(cs['containers'])}")

        # DataLayer
        dl = self.report["datalayer_analysis"]
        print("\n📊 DataLayer:")
        print(f" Events found: {len(dl.get('events', []))}")
        print(f" Validation issues: {len(dl.get('validation_issues', []))}")

        # Forms
        fa = self.report["form_analysis"]
        if fa.get("forms_found"):
            print("\n📝 Forms:")
            print(f" Forms found: {len(fa['forms_found'])}")
            print(f" Tracking issues: {len(fa.get('tracking_issues', []))}")

        # Tags
        tags = self.report["tags_fired"]
        print(f"\n🔥 Tags Fired: {', '.join(tags) if tags else 'None detected'}")

        # Issues (first five only)
        print(f"\n⚠️ Total Issues: {len(self.report['issues'])}")
        for issue in self.report["issues"][:5]:
            print(f" - [{issue['severity'].upper()}] {issue['message']}")

        # Recommendations (first three only)
        print(f"\n💡 Recommendations: {len(self.report['recommendations'])}")
        for rec in self.report["recommendations"][:3]:
            print(f" - [{rec['priority'].upper()}] {rec['action']}")
        print("\n" + divider)
class NotionExporter:
"""Export GTM audit reports to Notion database."""
# Default database ID - OurDigital GTM Audit Log
DEFAULT_DATABASE_ID = "2cf581e5-8a1e-8163-997f-ccb387156a20"
def __init__(self, token=None, database_id=None):
import os
self.token = token or os.environ.get("NOTION_TOKEN") or os.environ.get("NOTION_API_KEY")
self.database_id = database_id or self.DEFAULT_DATABASE_ID
self.base_url = "https://api.notion.com/v1"
if not self.token:
raise ValueError("Notion token not found. Set NOTION_TOKEN or NOTION_API_KEY environment variable.")
def _headers(self):
return {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json",
"Notion-Version": "2022-06-28",
}
def _make_request(self, method, endpoint, data=None, timeout=30):
    """Send one JSON request to the Notion API and return the decoded reply.

    Args:
        method: HTTP method, e.g. ``"POST"``.
        endpoint: Path appended to ``self.base_url`` (leading slash included).
        data: JSON-serializable body.  Anything other than ``None`` is sent,
            including an empty dict.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot hang the export indefinitely.

    Returns:
        The parsed JSON response.

    Raises:
        RuntimeError: When the API answers with an HTTP error status; the
            original ``HTTPError`` is attached as ``__cause__``.
        urllib.error.URLError: For connection-level failures (not wrapped).
    """
    import urllib.request
    import urllib.error

    url = f"{self.base_url}{endpoint}"
    # ``is not None`` rather than truthiness: an empty object is still a body.
    payload = json.dumps(data).encode("utf-8") if data is not None else None
    req = urllib.request.Request(
        url, data=payload, method=method, headers=self._headers()
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as response:
            return json.loads(response.read().decode("utf-8"))
    except urllib.error.HTTPError as e:
        # The error body is diagnostic text only; never fail on its encoding.
        error_body = e.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"Notion API error: {e.code} - {error_body}") from e
def _generate_audit_id(self, url, timestamp):
"""Generate unique audit ID."""
from urllib.parse import urlparse
import hashlib
domain = urlparse(url).netloc.replace("www.", "")
date_str = timestamp[:10].replace("-", "")
hash_suffix = hashlib.md5(f"{url}{timestamp}".encode()).hexdigest()[:6]
return f"GTM-{domain[:20]}-{date_str}-{hash_suffix}"
def _determine_audit_status(self, report):
"""Determine overall audit status based on issues."""
issues = report.get("issues", [])
critical_count = sum(1 for i in issues if i.get("severity") == "critical")
high_count = sum(1 for i in issues if i.get("severity") in ["critical", "error"])
if critical_count > 0:
return "Fail"
elif high_count > 0 or len(issues) > 5:
return "Warning"
return "Pass"
def _determine_gtm_status(self, container_status):
"""Determine GTM installation status."""
if not container_status.get("installed"):
return "Not Found"
if len(container_status.get("containers", [])) > 1:
return "Multiple Containers"
return "Installed"
def _generate_summary(self, report):
"""Generate text summary of audit findings."""
parts = []
# Container status
cs = report.get("container_status", {})
if cs.get("installed"):
containers = cs.get("containers", [])
parts.append(f"GTM: {', '.join(containers)}")
else:
parts.append("GTM: Not installed")
# DataLayer
dl = report.get("datalayer_analysis", {})
event_count = len(dl.get("events", []))
validation_issues = len(dl.get("validation_issues", []))
parts.append(f"Events: {event_count}")
if validation_issues:
parts.append(f"Validation issues: {validation_issues}")
# Forms
fa = report.get("form_analysis", {})
form_count = len(fa.get("forms_found", []))
if form_count:
parts.append(f"Forms: {form_count}")
# Tags
tags = report.get("tags_fired", [])
if tags:
parts.append(f"Tags: {', '.join(tags)}")
# Issues
issues = report.get("issues", [])
if issues:
parts.append(f"Total issues: {len(issues)}")
return " | ".join(parts)
def export_report(self, report, journey_type="full"):
    """Create a page for this audit in the configured Notion database.

    Args:
        report: Audit report dict as produced by ``GTMAuditor``.
        journey_type: Label written to the "Journey Type" select property.

    Returns:
        ``{"success": True, "page_id", "page_url", "audit_id"}``.
    """
    from urllib.parse import urlparse

    def rich_text(content):
        # Notion rich_text property holding a single text run.
        return {"rich_text": [{"text": {"content": content}}]}

    meta = report.get("audit_metadata", {})
    url = meta.get("url", "")
    timestamp = meta.get("timestamp", datetime.now().isoformat())
    containers = report.get("container_status", {})
    all_issues = report.get("issues", [])
    fired = report.get("tags_fired", [])

    audit_id = self._generate_audit_id(url, timestamp)
    # "Critical Issues" counts both critical and error severities.
    severe = [i for i in all_issues if i.get("severity") in ["critical", "error"]]
    # Only these destinations are exported as multi-select options.
    exportable_tags = ["GA4", "Google Ads", "Meta Pixel", "LinkedIn", "TikTok", "Kakao", "Naver"]

    properties = {
        "Site": {
            "title": [{"text": {"content": urlparse(url).netloc if url else "Unknown"}}]
        },
        "Audit ID": rich_text(audit_id),
        "URL": {"url": url or None},
        "Audit Date": {"date": {"start": timestamp[:10]} if timestamp else None},
        "Journey Type": {"select": {"name": journey_type}},
        "GTM Status": {"select": {"name": self._determine_gtm_status(containers)}},
        "Container IDs": rich_text(", ".join(containers.get("containers", []))),
        "Tags Fired": {
            "multi_select": [{"name": tag} for tag in fired if tag in exportable_tags]
        },
        "Issues Count": {"number": len(all_issues)},
        "Critical Issues": {"number": len(severe)},
        "Audit Status": {"select": {"name": self._determine_audit_status(report)}},
        "Summary": rich_text(self._generate_summary(report)[:2000]),
    }

    # Create page in database
    created = self._make_request(
        "POST",
        "/pages",
        {"parent": {"database_id": self.database_id}, "properties": properties},
    )
    page_url = created.get("url", "")
    print(f"📤 Exported to Notion: {page_url}")
    return {
        "success": True,
        "page_id": created.get("id"),
        "page_url": page_url,
        "audit_id": audit_id,
    }
def add_detailed_content(self, page_id, report):
    """Append issue, recommendation and checklist blocks to a Notion page.

    Sections are only written when the report has data for them. At most the
    first 10 issues are listed. Checklist entries are prefixed with a pass
    or fail marker so the two outcomes can be told apart.

    Blocks are sent in batches because Notion accepts at most 100 children
    per append request, and each text run is truncated to Notion's
    2000-character limit.

    Args:
        page_id: ID of the Notion page (block) to append to.
        report: Audit report mapping with optional ``issues``,
            ``recommendations`` and ``checklist`` entries.
    """
    max_text_len = 2000  # Notion limit for one rich-text content string
    max_children = 100   # Notion limit for blocks appended per request

    def _text_block(block_type, content):
        # One Notion block of the given type holding a single plain-text run
        return {
            "type": block_type,
            block_type: {
                "rich_text": [
                    {"type": "text", "text": {"content": content[:max_text_len]}}
                ]
            },
        }

    blocks = []

    # Issues section
    issues = report.get("issues", [])
    if issues:
        blocks.append(_text_block("heading_2", "Issues Found"))
        for issue in issues[:10]:  # Limit to 10 issues
            severity = issue.get("severity", "info").upper()
            message = issue.get("message", "")
            blocks.append(_text_block("bulleted_list_item", f"[{severity}] {message}"))

    # Recommendations section
    recommendations = report.get("recommendations", [])
    if recommendations:
        blocks.append(_text_block("heading_2", "Recommendations"))
        for rec in recommendations:
            priority = rec.get("priority", "").upper()
            action = rec.get("action", "")
            details = rec.get("details", "")
            blocks.append(
                _text_block("bulleted_list_item", f"[{priority}] {action}: {details}")
            )

    # Checklist section
    checklist = report.get("checklist", {})
    if checklist:
        blocks.append(_text_block("heading_2", "Checklist"))
        for category, items in checklist.items():
            blocks.append(_text_block("heading_3", category.replace("_", " ").title()))
            for item, passed in items.items():
                # Distinct markers: previously both outcomes rendered the same
                status = "✅" if passed else "❌"
                item_name = item.replace("_", " ").title()
                blocks.append(_text_block("bulleted_list_item", f"{status} {item_name}"))

    # No request is made when there is nothing to append
    for start in range(0, len(blocks), max_children):
        self._make_request(
            "PATCH",
            f"/blocks/{page_id}/children",
            {"children": blocks[start:start + max_children]},
        )
class DataLayerInjector:
    """Generate GTM custom HTML tags for dataLayer injection.

    Use when direct code access is unavailable and you need to push
    dataLayer events through GTM custom HTML tags.

    Every tag built by ``generate_tag`` is also appended to
    ``self.generated_tags`` so ``save_tags`` can write them out later.
    """

    # Event templates with required/recommended parameters.
    # "params" values are emitted verbatim as JavaScript: real expressions
    # (e.g. document.title) work as-is, while "/* ... */" placeholders must be
    # replaced by hand. "currency" is always overridden by self.currency.
    EVENT_TEMPLATES = {
        "page_view": {
            "description": "Track page views with custom dimensions",
            "params": {
                "page_title": "document.title",
                "page_location": "window.location.href",
                "page_path": "window.location.pathname",
            },
            "custom_params": ["content_group", "author", "publish_date"],
            "trigger": "All Pages",
        },
        "view_item": {
            "description": "Track product detail page views",
            "params": {
                "currency": "'KRW'",
                "value": "/* product price */",
            },
            "items_params": ["item_id", "item_name", "item_brand", "item_category", "price", "quantity"],
            "trigger": "Product Detail Page",
            "scrape_selectors": {
                "item_name": [".product-title", ".product-name", "h1.title", "[itemprop='name']"],
                "price": [".product-price", ".price", "[itemprop='price']", ".sale-price"],
                "item_id": ["[data-product-id]", "[data-sku]", ".product-sku"],
                "item_brand": [".brand", "[itemprop='brand']", ".product-brand"],
                "item_category": [".category", ".breadcrumb", "[itemprop='category']"],
            },
        },
        "add_to_cart": {
            "description": "Track add to cart actions",
            "params": {
                "currency": "'KRW'",
                "value": "/* cart value */",
            },
            "items_params": ["item_id", "item_name", "item_brand", "item_category", "price", "quantity"],
            "trigger": "Add to Cart Button Click",
            "trigger_selector": "button.add-to-cart, .btn-cart, [data-action='add-to-cart']",
        },
        "remove_from_cart": {
            "description": "Track remove from cart actions",
            "params": {
                "currency": "'KRW'",
                "value": "/* removed item value */",
            },
            "items_params": ["item_id", "item_name", "price", "quantity"],
            "trigger": "Remove from Cart Click",
            "trigger_selector": "button.remove, .btn-remove, [data-action='remove']",
        },
        "view_cart": {
            "description": "Track cart page views",
            "params": {
                "currency": "'KRW'",
                "value": "/* total cart value */",
            },
            "items_params": ["item_id", "item_name", "price", "quantity"],
            "trigger": "Cart Page",
        },
        "begin_checkout": {
            "description": "Track checkout initiation",
            "params": {
                "currency": "'KRW'",
                "value": "/* checkout value */",
                "coupon": "/* coupon code if any */",
            },
            "items_params": ["item_id", "item_name", "price", "quantity"],
            "trigger": "Checkout Page",
        },
        "add_shipping_info": {
            "description": "Track shipping info submission",
            "params": {
                "currency": "'KRW'",
                "value": "/* order value */",
                "shipping_tier": "/* shipping method */",
            },
            "items_params": ["item_id", "item_name", "price", "quantity"],
            "trigger": "Shipping Form Submit",
        },
        "add_payment_info": {
            "description": "Track payment info submission",
            "params": {
                "currency": "'KRW'",
                "value": "/* order value */",
                "payment_type": "/* payment method */",
            },
            "items_params": ["item_id", "item_name", "price", "quantity"],
            "trigger": "Payment Form Submit",
            "payment_types_kr": ["신용카드", "카카오페이", "네이버페이", "토스", "무통장입금", "휴대폰결제"],
        },
        "purchase": {
            "description": "Track completed purchases",
            "params": {
                "transaction_id": "/* unique order ID */",
                "currency": "'KRW'",
                "value": "/* total order value */",
                "tax": "/* tax amount */",
                "shipping": "/* shipping cost */",
                "coupon": "/* coupon code if any */",
            },
            "items_params": ["item_id", "item_name", "item_brand", "item_category", "price", "quantity"],
            "trigger": "Order Confirmation Page",
            "scrape_selectors": {
                "transaction_id": [".order-number", ".order-id", "[data-order-id]"],
                "value": [".order-total", ".total-price", ".grand-total"],
            },
        },
        "generate_lead": {
            "description": "Track lead form submissions",
            "params": {
                "currency": "'KRW'",
                "value": "/* lead value if applicable */",
            },
            "custom_params": ["form_id", "form_name", "lead_source"],
            "trigger": "Lead Form Submit",
        },
        "form_submit": {
            "description": "Track general form submissions",
            "params": {},
            "custom_params": ["form_id", "form_name", "form_destination"],
            "trigger": "Form Submission",
        },
        "form_start": {
            "description": "Track form interaction start",
            "params": {},
            "custom_params": ["form_id", "form_name"],
            "trigger": "Form Field Focus",
        },
        "scroll": {
            "description": "Track scroll depth",
            "params": {
                "percent_scrolled": "/* 25, 50, 75, 90, 100 */",
            },
            "trigger": "Scroll Depth",
        },
        "file_download": {
            "description": "Track file downloads",
            "params": {
                "file_name": "/* filename */",
                "file_extension": "/* pdf, xlsx, etc */",
                "link_url": "/* download URL */",
            },
            "trigger": "File Download Click",
            "trigger_selector": "a[href$='.pdf'], a[href$='.xlsx'], a[href$='.docx'], a[download]",
        },
        "video_start": {
            "description": "Track video play start",
            "params": {
                "video_title": "/* video title */",
                "video_provider": "/* youtube, vimeo, etc */",
                "video_url": "/* video URL */",
            },
            "trigger": "Video Start",
        },
        "video_progress": {
            "description": "Track video progress milestones",
            "params": {
                "video_title": "/* video title */",
                "video_percent": "/* 25, 50, 75, 100 */",
                "video_current_time": "/* seconds watched */",
            },
            "trigger": "Video Progress",
        },
        "video_complete": {
            "description": "Track video completion",
            "params": {
                "video_title": "/* video title */",
                "video_duration": "/* total duration */",
            },
            "trigger": "Video Complete",
        },
        "search": {
            "description": "Track site search",
            "params": {
                "search_term": "/* search query */",
            },
            "custom_params": ["search_results_count"],
            "trigger": "Search Results Page",
            "scrape_selectors": {
                "search_term": ["input[name='q']", "input[name='search']", ".search-input"],
            },
        },
        "login": {
            "description": "Track user login",
            "params": {
                "method": "/* login method */",
            },
            "trigger": "Login Success",
        },
        "sign_up": {
            "description": "Track user registration",
            "params": {
                "method": "/* signup method */",
            },
            "trigger": "Signup Success",
        },
        "share": {
            "description": "Track social sharing",
            "params": {
                "method": "/* share platform */",
                "content_type": "/* article, product, etc */",
                "item_id": "/* content ID */",
            },
            "trigger": "Share Button Click",
        },
        "outbound_click": {
            "description": "Track outbound link clicks",
            "params": {
                "link_url": "/* destination URL */",
                "link_domain": "/* destination domain */",
                "outbound": "true",
            },
            "trigger": "Outbound Link Click",
        },
    }

    def __init__(self, currency="KRW"):
        """Store the currency code used in generated tags and start with no tags."""
        self.currency = currency
        self.generated_tags = []

    def generate_tag(self, event_name, options=None):
        """Generate a custom HTML tag for a specific event.

        Args:
            event_name: Key of ``EVENT_TEMPLATES``.
            options: Optional dict; ``use_scraping`` (default False) and
                ``include_comments`` (default True) are honoured.

        Returns:
            The tag dict (also appended to ``self.generated_tags``), or
            ``{"error": ...}`` for an unknown event, in which case nothing is
            recorded.
        """
        options = options or {}
        if event_name not in self.EVENT_TEMPLATES:
            return {"error": f"Unknown event: {event_name}. Available: {list(self.EVENT_TEMPLATES.keys())}"}

        template = self.EVENT_TEMPLATES[event_name]

        # Build the tag
        tag = {
            "event": event_name,
            "name": f"cHTML - dataLayer - {event_name}",
            "description": template["description"],
            "trigger_recommendation": template.get("trigger", "Custom"),
            "trigger_selector": template.get("trigger_selector"),
            "html": self._generate_html(event_name, template, options),
            "scrape_selectors": template.get("scrape_selectors", {}),
        }
        self.generated_tags.append(tag)
        return tag

    def _generate_html(self, event_name, template, options):
        """Generate the custom HTML tag code (a complete ``<script>`` element)."""
        use_scraping = options.get("use_scraping", False)
        include_comments = options.get("include_comments", True)

        lines = ["<script>"]
        if include_comments:
            lines.append(f"// GTM Custom HTML Tag: {event_name}")
            lines.append(f"// {template['description']}")
            lines.append("// Generated by OurDigital GTM Manager")
            lines.append("")

        # Check if this is an ecommerce event with items
        has_items = "items_params" in template
        if has_items:
            lines.append("(function() {")
            lines.append(" // Clear previous ecommerce data")
            lines.append(" window.dataLayer.push({ ecommerce: null });")
            lines.append("")
            # Scraping is only possible when the template ships selectors
            if use_scraping and template.get("scrape_selectors"):
                lines.extend(self._generate_scraping_code(template, event_name))
            else:
                lines.extend(self._generate_manual_ecommerce_code(event_name, template, options))
            lines.append("})();")
        else:
            # Simple event without items
            lines.append("window.dataLayer.push({")
            lines.append(f" 'event': '{event_name}',")
            for param, default_value in template.get("params", {}).items():
                if param == "currency":
                    lines.append(f" '{param}': '{self.currency}',")
                else:
                    lines.append(f" '{param}': {default_value},")
            # Add custom params as placeholders
            for param in template.get("custom_params", []):
                lines.append(f" '{param}': /* TODO: set {param} */,")
            lines.append("});")

        lines.append("</script>")
        return "\n".join(lines)

    def _generate_scraping_code(self, template, event_name):
        """Generate code that scrapes values from DOM.

        Only variables that are actually declared from ``scrape_selectors``
        are referenced in the pushed object, so the generated script never
        reads an undeclared ``value`` or ``price``.
        """
        selectors = template.get("scrape_selectors", {})
        has_value = "value" in selectors
        has_price = "price" in selectors

        lines = [
            " // Scrape product/order data from page",
            " function getText(selectors) {",
            " for (var i = 0; i < selectors.length; i++) {",
            " var el = document.querySelector(selectors[i]);",
            " if (el) return el.textContent.trim();",
            " }",
            " return '';",
            " }",
            "",
            " function getPrice(selectors) {",
            " var text = getText(selectors);",
            " var num = text.replace(/[^0-9]/g, '');",
            " return num ? parseInt(num, 10) : 0;",
            " }",
            "",
        ]

        # Generate variable assignments
        for field, sels in selectors.items():
            sel_str = json.dumps(sels)
            if field in ["price", "value"]:
                lines.append(f" var {field} = getPrice({sel_str});")
            else:
                lines.append(f" var {field} = getText({sel_str});")

        lines.append("")
        lines.append(" window.dataLayer.push({")
        lines.append(f" 'event': '{event_name}',")
        lines.append(" 'ecommerce': {")
        lines.append(f" 'currency': '{self.currency}',")

        # Pick the value expression from whichever variables exist
        if has_value and has_price:
            lines.append(" 'value': value || price,")
        elif has_value:
            lines.append(" 'value': value,")
        elif has_price:
            lines.append(" 'value': price,")
        if "transaction_id" in selectors:
            lines.append(" 'transaction_id': transaction_id,")

        lines.append(" 'items': [{")
        for param in template.get("items_params", []):
            if param in selectors:
                lines.append(f" '{param}': {param},")
            elif param == "quantity":
                lines.append(f" '{param}': 1,")
            elif param == "price" and has_value:
                # No per-item price selector: fall back to the scraped total
                # (the generated push describes a single item anyway).
                lines.append(f" '{param}': value,")
        lines.append(" }]")
        lines.append(" }")
        lines.append(" });")
        return lines

    def _generate_manual_ecommerce_code(self, event_name, template, options):
        """Generate ecommerce code with manual value placeholders."""
        lines = []
        lines.append(" window.dataLayer.push({")
        lines.append(f" 'event': '{event_name}',")
        lines.append(" 'ecommerce': {")
        for param, default_value in template.get("params", {}).items():
            if param == "currency":
                lines.append(f" '{param}': '{self.currency}',")
            else:
                lines.append(f" '{param}': {default_value},")
        lines.append(" 'items': [{")
        for param in template.get("items_params", []):
            lines.append(f" '{param}': /* TODO: set {param} */,")
        lines.append(" }]")
        lines.append(" }")
        lines.append(" });")
        return lines

    def generate_from_audit(self, audit_report):
        """Generate tags based on audit findings.

        Recommends (and generates) ``form_submit``, ``add_to_cart`` and
        ``begin_checkout`` when the audit found matching page elements but no
        such dataLayer event, then every ``CHECKOUT_SEQUENCE`` event that is
        still missing.

        Returns:
            Dict with ``recommendations`` (list of ``{"event", "reason"}``)
            and ``generated_tags`` (all tags generated by this instance).
        """
        recommendations = []
        recommended = set()  # event names already recommended

        # Check for missing events
        datalayer = audit_report.get("datalayer_analysis", {})
        existing_events = [e.get("event") for e in datalayer.get("events", [])]

        def _recommend(event, reason):
            if event in existing_events or event in recommended:
                return
            recommended.add(event)
            recommendations.append({"event": event, "reason": reason})
            self.generate_tag(event)

        # Form analysis
        form_analysis = audit_report.get("form_analysis", {})
        if form_analysis.get("forms_found"):
            _recommend(
                "form_submit",
                f"Found {len(form_analysis['forms_found'])} form(s) but no form_submit event",
            )

        # Checkout analysis
        checkout = audit_report.get("checkout_analysis", {})
        elements = checkout.get("elements_found", {})
        if elements.get("addToCart"):
            _recommend("add_to_cart", "Add to cart elements found but no add_to_cart event")
        if elements.get("checkout"):
            _recommend("begin_checkout", "Checkout elements found but no begin_checkout event")

        # Check for missing ecommerce sequence
        for event in CHECKOUT_SEQUENCE:
            _recommend(event, "Missing from checkout sequence")

        return {
            "recommendations": recommendations,
            "generated_tags": self.generated_tags,
        }

    def generate_all_ecommerce(self):
        """Generate all ecommerce-related tags and return ``self.generated_tags``."""
        ecommerce_events = [
            "view_item", "add_to_cart", "remove_from_cart", "view_cart",
            "begin_checkout", "add_shipping_info", "add_payment_info", "purchase"
        ]
        for event in ecommerce_events:
            self.generate_tag(event)
        return self.generated_tags

    def generate_engagement_tags(self):
        """Generate engagement tracking tags and return ``self.generated_tags``."""
        engagement_events = [
            "form_submit", "form_start", "scroll", "file_download",
            "video_start", "video_progress", "video_complete",
            "search", "outbound_click", "share"
        ]
        for event in engagement_events:
            self.generate_tag(event)
        return self.generated_tags

    def save_tags(self, output_path, format="html"):
        """Save generated tags to files.

        Writes one ``<event>_tag.<format>`` file per tag plus a combined
        ``all_tags.<format>`` file into ``output_path`` (created if needed).

        Args:
            output_path: Target directory.
            format: "html" or "json".

        Returns:
            ``{"saved_files": [...], "count": n}``, or ``{"error": ...}`` when
            no tags have been generated yet.
        """
        import os

        if not self.generated_tags:
            return {"error": "No tags generated yet"}

        os.makedirs(output_path, exist_ok=True)
        saved_files = []

        for tag in self.generated_tags:
            event = tag["event"]
            if format == "html":
                filename = f"{event}_tag.html"
                filepath = os.path.join(output_path, filename)
                with open(filepath, "w", encoding="utf-8") as f:
                    f.write(f"<!-- {tag['name']} -->\n")
                    f.write(f"<!-- {tag['description']} -->\n")
                    f.write(f"<!-- Trigger: {tag['trigger_recommendation']} -->\n")
                    if tag.get("trigger_selector"):
                        f.write(f"<!-- Trigger Selector: {tag['trigger_selector']} -->\n")
                    f.write("\n")
                    f.write(tag["html"])
                saved_files.append(filepath)
            elif format == "json":
                filename = f"{event}_tag.json"
                filepath = os.path.join(output_path, filename)
                with open(filepath, "w", encoding="utf-8") as f:
                    json.dump(tag, f, indent=2, ensure_ascii=False)
                saved_files.append(filepath)

        # Also save a combined file
        combined_path = os.path.join(output_path, f"all_tags.{format}")
        if format == "html":
            with open(combined_path, "w", encoding="utf-8") as f:
                f.write("<!-- All Generated GTM Custom HTML Tags -->\n")
                f.write("<!-- Generated by OurDigital GTM Manager -->\n\n")
                for tag in self.generated_tags:
                    f.write(f"\n<!-- ========== {tag['event']} ========== -->\n")
                    f.write(f"<!-- {tag['description']} -->\n")
                    f.write(f"<!-- Trigger: {tag['trigger_recommendation']} -->\n\n")
                    f.write(tag["html"])
                    f.write("\n\n")
        else:
            with open(combined_path, "w", encoding="utf-8") as f:
                json.dump(self.generated_tags, f, indent=2, ensure_ascii=False)
        saved_files.append(combined_path)

        return {"saved_files": saved_files, "count": len(self.generated_tags)}

    def print_tag(self, tag):
        """Print a tag to console."""
        print(f"\n{'='*60}")
        print(f"📦 {tag['name']}")
        print(f" {tag['description']}")
        print(f" Trigger: {tag['trigger_recommendation']}")
        if tag.get("trigger_selector"):
            print(f" Selector: {tag['trigger_selector']}")
        print(f"{'='*60}")
        print(tag["html"])
        print()
def main():
    """Parse command-line arguments and dispatch to the chosen subcommand.

    ``audit`` is handled by ``run_audit`` and ``inject`` by ``run_inject``;
    with no subcommand the help text is printed.

    The audit browser runs headless by default. ``--no-headless`` turns that
    off; ``--headless`` alone could never do so because it is a store-true
    flag whose default is already True.
    """
    parser = argparse.ArgumentParser(
        description="GTM Manager - Audit and manage Google Tag Manager implementations",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Audit a website
python gtm_manager.py audit --url https://example.com
# Generate ecommerce dataLayer tags
python gtm_manager.py inject --preset ecommerce --output ./tags
# Generate tags from audit report
python gtm_manager.py inject --from-audit gtm_audit_report.json
# Generate specific event tag
python gtm_manager.py inject --event purchase --event add_to_cart
"""
    )
    subparsers = parser.add_subparsers(dest="command", help="Command to run")

    # Audit subcommand
    audit_parser = subparsers.add_parser("audit", help="Audit GTM implementation on a URL")
    audit_parser.add_argument("--url", required=True, help="Target URL to audit")
    audit_parser.add_argument("--container", help="Expected GTM container ID (e.g., GTM-XXXXXX)")
    audit_parser.add_argument("--journey", default="full",
                              choices=["pageview", "scroll", "click", "form", "checkout", "datalayer", "full"],
                              help="Journey type to simulate")
    audit_parser.add_argument("--output", default="gtm_audit_report.json", help="Output file path")
    audit_parser.add_argument("--timeout", type=int, default=30000, help="Page load timeout (ms)")
    audit_parser.add_argument("--headless", action="store_true", default=True, help="Run headless")
    # Shares dest with --headless so args.headless is the single source of truth
    audit_parser.add_argument("--no-headless", dest="headless", action="store_false",
                              help="Run with a visible browser window")
    audit_parser.add_argument("--notion", action="store_true", help="Export results to Notion database")
    audit_parser.add_argument("--notion-database", help="Notion database ID")
    audit_parser.add_argument("--notion-detailed", action="store_true", help="Add detailed content to Notion page")
    audit_parser.add_argument("--generate-tags", action="store_true", help="Generate missing dataLayer tags after audit")

    # Inject subcommand
    inject_parser = subparsers.add_parser("inject", help="Generate dataLayer injection tags")
    inject_parser.add_argument("--event", action="append", help="Event type(s) to generate (can be repeated)")
    inject_parser.add_argument("--preset", choices=["ecommerce", "engagement", "all"],
                               help="Generate preset group of tags")
    inject_parser.add_argument("--from-audit", help="Generate tags based on audit report JSON file")
    inject_parser.add_argument("--output", default="./gtm_tags", help="Output directory for generated tags")
    inject_parser.add_argument("--format", choices=["html", "json"], default="html", help="Output format")
    inject_parser.add_argument("--currency", default="KRW", help="Currency code for ecommerce events")
    inject_parser.add_argument("--scrape", action="store_true", help="Generate DOM scraping code for values")
    inject_parser.add_argument("--list-events", action="store_true", help="List available event types")

    args = parser.parse_args()

    if args.command == "audit":
        run_audit(args)
    elif args.command == "inject":
        run_inject(args)
    else:
        parser.print_help()
def run_audit(args):
    """Run GTM audit.

    Runs the audit for ``args.url``, saves the report to ``args.output`` and
    prints a summary. With ``args.generate_tags`` the missing dataLayer tags
    are generated and written next to the report; with ``args.notion`` the
    report is exported to Notion (failures there are reported, not raised).

    Args:
        args: Parsed arguments of the ``audit`` subcommand.
    """
    import os  # local import: this function is the only user here

    auditor = GTMAuditor(
        url=args.url,
        container_id=args.container,
        timeout=args.timeout,
        headless=args.headless,
    )
    report = auditor.run_audit(journey=args.journey)
    auditor.save_report(args.output)
    auditor.print_summary()

    # Generate missing tags if requested
    if args.generate_tags:
        print("\n🏷️ Generating missing dataLayer tags...")
        injector = DataLayerInjector()
        result = injector.generate_from_audit(report)
        if result["recommendations"]:
            print(f" Found {len(result['recommendations'])} missing event(s)")
            for rec in result["recommendations"]:
                print(f" - {rec['event']}: {rec['reason']}")
            # Strip only the final extension so the directory never equals the
            # report file itself and ".json" elsewhere in the path is untouched.
            output_dir = os.path.splitext(args.output)[0] + "_tags"
            save_result = injector.save_tags(output_dir)
            if "error" in save_result:
                # save_tags returns an error dict when no tag could be generated
                print(f" {save_result['error']}")
            else:
                print(f" Saved {save_result['count']} tags to {output_dir}/")
        else:
            print(" No missing events detected")

    # Export to Notion if requested
    if args.notion:
        try:
            exporter = NotionExporter(database_id=args.notion_database)
            result = exporter.export_report(report, journey_type=args.journey)
            if args.notion_detailed and result.get("page_id"):
                exporter.add_detailed_content(result["page_id"], report)
                print(" Added detailed content to Notion page")
        except Exception as e:
            # Top-level boundary: a failed export must not discard the audit
            print(f"❌ Notion export failed: {e}")
def run_inject(args):
    """Run dataLayer injection tag generation.

    Exactly one mode is executed, in this order of precedence:
    ``--list-events`` (print the catalogue and stop), ``--from-audit``,
    ``--preset``, ``--event``. Tags produced by the last three modes are
    saved to ``args.output`` in ``args.format``.

    Args:
        args: Parsed arguments of the ``inject`` subcommand.
    """
    injector = DataLayerInjector(currency=args.currency)

    # List available events
    if args.list_events:
        print("\n📋 Available Event Types:")
        print("=" * 60)
        catalogue = (
            ("Ecommerce", ["view_item", "add_to_cart", "remove_from_cart", "view_cart",
                           "begin_checkout", "add_shipping_info", "add_payment_info", "purchase"]),
            ("Forms & Leads", ["form_submit", "form_start", "generate_lead"]),
            ("Engagement", ["scroll", "file_download", "search", "outbound_click", "share"]),
            ("Video", ["video_start", "video_progress", "video_complete"]),
            ("User", ["login", "sign_up"]),
            ("Page", ["page_view"]),
        )
        for heading, names in catalogue:
            print(f"\n{heading}:")
            for name in names:
                summary = DataLayerInjector.EVENT_TEMPLATES.get(name, {}).get("description", "")
                print(f" - {name}: {summary}")
        return

    if args.from_audit:
        # Generate from audit report
        try:
            with open(args.from_audit, "r", encoding="utf-8") as handle:
                report = json.load(handle)
            print(f"📊 Analyzing audit report: {args.from_audit}")
            missing = injector.generate_from_audit(report)["recommendations"]
            if missing:
                print(f"\n🔍 Found {len(missing)} missing event(s):")
                for entry in missing:
                    print(f" - {entry['event']}: {entry['reason']}")
            else:
                print(" No missing events detected in audit report")
        except FileNotFoundError:
            print(f"❌ Audit report not found: {args.from_audit}")
            return
        except json.JSONDecodeError:
            print(f"❌ Invalid JSON in audit report: {args.from_audit}")
            return
    elif args.preset:
        # Generate preset groups; "all" is both groups plus the page/user events
        print(f"📦 Generating {args.preset} preset tags...")
        if args.preset in ("ecommerce", "all"):
            injector.generate_all_ecommerce()
        if args.preset in ("engagement", "all"):
            injector.generate_engagement_tags()
        if args.preset == "all":
            for name in ("page_view", "login", "sign_up"):
                injector.generate_tag(name)
    elif args.event:
        # Generate specific events
        print(f"📦 Generating tags for: {', '.join(args.event)}")
        tag_options = {"use_scraping": args.scrape}
        for name in args.event:
            outcome = injector.generate_tag(name, tag_options)
            if "error" in outcome:
                print(f"{outcome['error']}")
            else:
                injector.print_tag(outcome)
    else:
        print("❌ Please specify --event, --preset, or --from-audit")
        print(" Use --list-events to see available event types")
        return

    # Save generated tags
    if not injector.generated_tags:
        return
    save_result = injector.save_tags(args.output, format=args.format)
    print(f"\n✅ Saved {save_result['count']} tag(s) to {args.output}/")
    written = save_result["saved_files"]
    # Show at most five paths, then a count of the rest
    for path in written[:5]:
        print(f" - {path}")
    if len(written) > 5:
        print(f" ... and {len(written) - 5} more")
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()