Lab 10: Gating and Drift Prevention
Implement quality gates and detect when outputs drift from intent.
Objectives
By the end of this lab, you will:
- Implement pre-execution gates (preconditions)
- Implement post-execution gates (validation)
- Detect semantic drift from original intent
- Handle gate failures gracefully
Prerequisites
- Lab 09 completed (multi-agent)
- Understanding of the orchestrator pattern
Why Gates?
Without gates, errors compound:
Task: "Write a sorting function"
│
▼
Iteration 1: Writes search function (wrong!)
│
▼
Iteration 2: "Improves" the search function
│
▼
Iteration 3: Adds features to the search function
│
▼
Result: Great search function, but task was SORTING
Gates catch problems early:
Task: "Write a sorting function"
│
▼
PRE-GATE: Is task well-defined? ✓
│
▼
Iteration 1: Writes search function
│
▼
POST-GATE: Does output match intent? ✗ DRIFT DETECTED
│
▼
Correction: Re-execute with explicit "must sort, not search"
The Gate Pattern
def gated_execute(task):
# Pre-gate: Check if we should proceed
if not passes_pre_gate(task):
return reject("Pre-conditions not met")
# Execute
result = execute(task)
# Post-gate: Check if result is valid
if not passes_post_gate(task, result):
return rollback_or_retry("Post-conditions failed")
return commit(result)
Step 1: Create the Gate Framework
Create gates.py:
"""
Gating Framework - Lab 10
Pre and post execution gates for quality control.
"""
from abc import ABC, abstractmethod
from typing import Optional, List, Dict, Any, Tuple
from dataclasses import dataclass
from enum import Enum
class GateResult(Enum):
"""Result of a gate check."""
PASS = "pass"
FAIL = "fail"
WARN = "warn" # Pass with warning
@dataclass
class GateCheckResult:
"""Detailed result from a gate check."""
result: GateResult
gate_name: str
message: str
details: Optional[Dict[str, Any]] = None
@property
def passed(self) -> bool:
return self.result in (GateResult.PASS, GateResult.WARN)
@dataclass
class GateReport:
"""Full report from all gate checks."""
checks: List[GateCheckResult]
@property
def passed(self) -> bool:
return all(c.passed for c in self.checks)
@property
def has_warnings(self) -> bool:
return any(c.result == GateResult.WARN for c in self.checks)
@property
def failures(self) -> List[GateCheckResult]:
return [c for c in self.checks if c.result == GateResult.FAIL]
def summary(self) -> str:
passed = sum(1 for c in self.checks if c.result == GateResult.PASS)
warned = sum(1 for c in self.checks if c.result == GateResult.WARN)
failed = sum(1 for c in self.checks if c.result == GateResult.FAIL)
return f"Gates: {passed} passed, {warned} warned, {failed} failed"
class Gate(ABC):
"""Abstract base class for gates."""
def __init__(self, name: str, required: bool = True):
self.name = name
self.required = required
@abstractmethod
def check(self, **kwargs) -> GateCheckResult:
"""Run the gate check."""
pass
class PreGate(Gate):
"""Gate that runs before execution."""
pass
class PostGate(Gate):
"""Gate that runs after execution."""
pass
class GateRunner:
"""
Runs a collection of gates.
Usage:
runner = GateRunner()
runner.add_pre_gate(TaskDefinedGate())
runner.add_post_gate(OutputLengthGate())
pre_report = runner.run_pre_gates(task=task)
if pre_report.passed:
result = execute(task)
post_report = runner.run_post_gates(task=task, result=result)
"""
def __init__(self):
self.pre_gates: List[PreGate] = []
self.post_gates: List[PostGate] = []
def add_pre_gate(self, gate: PreGate) -> "GateRunner":
self.pre_gates.append(gate)
return self
def add_post_gate(self, gate: PostGate) -> "GateRunner":
self.post_gates.append(gate)
return self
def run_pre_gates(self, **kwargs) -> GateReport:
"""Run all pre-execution gates."""
results = []
for gate in self.pre_gates:
try:
result = gate.check(**kwargs)
results.append(result)
# Stop on required gate failure
if gate.required and result.result == GateResult.FAIL:
break
except Exception as e:
results.append(GateCheckResult(
result=GateResult.FAIL,
gate_name=gate.name,
message=f"Gate error: {e}"
))
if gate.required:
break
return GateReport(checks=results)
def run_post_gates(self, **kwargs) -> GateReport:
"""Run all post-execution gates."""
results = []
for gate in self.post_gates:
try:
result = gate.check(**kwargs)
results.append(result)
except Exception as e:
results.append(GateCheckResult(
result=GateResult.FAIL,
gate_name=gate.name,
message=f"Gate error: {e}"
))
return GateReport(checks=results)
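Before adding the built-in gates, it can help to see the framework end to end with a throwaway gate. The snippet below is a minimal sketch you can run from a scratch file or a REPL; NonEmptyTaskGate is an illustrative name, not part of the lab's gate set:

from gates import GateRunner, PreGate, GateCheckResult, GateResult

class NonEmptyTaskGate(PreGate):
    """Toy pre-gate used only to exercise the runner."""
    def __init__(self):
        super().__init__("non_empty_task")

    def check(self, task: str = "", **kwargs) -> GateCheckResult:
        if task.strip():
            return GateCheckResult(GateResult.PASS, self.name, "Task provided")
        return GateCheckResult(GateResult.FAIL, self.name, "Task is empty")

runner = GateRunner().add_pre_gate(NonEmptyTaskGate())
report = runner.run_pre_gates(task="Write a sorting function")
print(report.summary())   # Gates: 1 passed, 0 warned, 0 failed
print(report.passed)      # True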
Step 2: Implement Common Gates
Add to gates.py:
# ==================== Pre-Gates ====================
class TaskDefinedGate(PreGate):
"""Ensure task is properly defined."""
def __init__(self, min_length: int = 10):
super().__init__("task_defined")
self.min_length = min_length
def check(self, task: str, **kwargs) -> GateCheckResult:
if not task or len(task.strip()) < self.min_length:
return GateCheckResult(
result=GateResult.FAIL,
gate_name=self.name,
message=f"Task too short (min {self.min_length} chars)"
)
# Check for vague tasks
vague_phrases = ["do something", "fix it", "make it work", "whatever"]
if any(phrase in task.lower() for phrase in vague_phrases):
return GateCheckResult(
result=GateResult.WARN,
gate_name=self.name,
message="Task may be too vague"
)
return GateCheckResult(
result=GateResult.PASS,
gate_name=self.name,
message="Task is well-defined"
)
class ResourceAvailableGate(PreGate):
"""Check if required resources are available."""
def __init__(self, required_resources: List[str]):
super().__init__("resource_available")
self.required = required_resources
    def check(self, resources: Optional[Dict[str, Any]] = None, **kwargs) -> GateCheckResult:
resources = resources or {}
missing = [r for r in self.required if r not in resources]
if missing:
return GateCheckResult(
result=GateResult.FAIL,
gate_name=self.name,
message=f"Missing resources: {missing}"
)
return GateCheckResult(
result=GateResult.PASS,
gate_name=self.name,
message="All resources available"
)
class BudgetGate(PreGate):
"""Check if budget allows execution."""
def __init__(self, max_cost: float):
super().__init__("budget")
self.max_cost = max_cost
def check(self, current_cost: float = 0, estimated_cost: float = 0, **kwargs) -> GateCheckResult:
if current_cost + estimated_cost > self.max_cost:
return GateCheckResult(
result=GateResult.FAIL,
gate_name=self.name,
message=f"Would exceed budget (${current_cost + estimated_cost:.2f} > ${self.max_cost:.2f})"
)
if current_cost + estimated_cost > self.max_cost * 0.8:
return GateCheckResult(
result=GateResult.WARN,
gate_name=self.name,
message=f"Approaching budget limit ({(current_cost + estimated_cost) / self.max_cost * 100:.0f}%)"
)
return GateCheckResult(
result=GateResult.PASS,
gate_name=self.name,
message="Within budget"
)
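
# Optional helper -- a hedged sketch, not part of the lab's required gates.
# BudgetGate expects an estimated_cost that callers often do not have up front.
# One rough pre-call estimate uses the common "~4 characters per token" rule of
# thumb; prices are passed in as parameters rather than assumed here.
def estimate_request_cost(
    prompt: str,
    expected_output_tokens: int,
    price_per_mtok_input: float,
    price_per_mtok_output: float,
) -> float:
    """Very rough USD estimate for a single model call."""
    input_tokens = len(prompt) / 4  # crude heuristic; real tokenizers vary
    return (
        input_tokens * price_per_mtok_input
        + expected_output_tokens * price_per_mtok_output
    ) / 1_000_000
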
# ==================== Post-Gates ====================
class OutputLengthGate(PostGate):
"""Check if output meets length requirements."""
def __init__(self, min_length: int = 1, max_length: int = 10000):
super().__init__("output_length")
self.min_length = min_length
self.max_length = max_length
def check(self, result: str, **kwargs) -> GateCheckResult:
length = len(result) if result else 0
if length < self.min_length:
return GateCheckResult(
result=GateResult.FAIL,
gate_name=self.name,
message=f"Output too short ({length} < {self.min_length})"
)
if length > self.max_length:
return GateCheckResult(
result=GateResult.FAIL,
gate_name=self.name,
message=f"Output too long ({length} > {self.max_length})"
)
return GateCheckResult(
result=GateResult.PASS,
gate_name=self.name,
message=f"Output length OK ({length} chars)"
)
class FormatGate(PostGate):
"""Check if output matches expected format."""
def __init__(self, expected_format: str):
"""
Args:
expected_format: "json", "markdown", "code", "list", etc.
"""
super().__init__("format")
self.expected_format = expected_format
def check(self, result: str, **kwargs) -> GateCheckResult:
if self.expected_format == "json":
return self._check_json(result)
elif self.expected_format == "markdown":
return self._check_markdown(result)
elif self.expected_format == "code":
return self._check_code(result)
elif self.expected_format == "list":
return self._check_list(result)
else:
return GateCheckResult(
result=GateResult.PASS,
gate_name=self.name,
message="Format check skipped (unknown format)"
)
def _check_json(self, result: str) -> GateCheckResult:
import json
try:
json.loads(result)
return GateCheckResult(
result=GateResult.PASS,
gate_name=self.name,
message="Valid JSON"
)
        except (json.JSONDecodeError, TypeError):
return GateCheckResult(
result=GateResult.FAIL,
gate_name=self.name,
message="Invalid JSON"
)
def _check_markdown(self, result: str) -> GateCheckResult:
if "#" in result or "**" in result or "-" in result:
return GateCheckResult(
result=GateResult.PASS,
gate_name=self.name,
message="Appears to be Markdown"
)
return GateCheckResult(
result=GateResult.WARN,
gate_name=self.name,
message="May not be Markdown"
)
def _check_code(self, result: str) -> GateCheckResult:
code_indicators = ["def ", "function ", "class ", "const ", "let ", "var ", "import ", "from "]
if any(ind in result for ind in code_indicators):
return GateCheckResult(
result=GateResult.PASS,
gate_name=self.name,
message="Contains code"
)
return GateCheckResult(
result=GateResult.WARN,
gate_name=self.name,
message="May not contain code"
)
def _check_list(self, result: str) -> GateCheckResult:
lines = result.strip().split("\n")
        list_lines = [l for l in lines if l.strip().startswith(("-", "*", "•")) or
                      (l.strip() and l.strip()[0].isdigit() and "." in l.strip()[:3])]
if len(list_lines) >= 2:
return GateCheckResult(
result=GateResult.PASS,
gate_name=self.name,
message=f"Contains list ({len(list_lines)} items)"
)
return GateCheckResult(
result=GateResult.FAIL,
gate_name=self.name,
message="Does not appear to be a list"
)
class ConfidenceGate(PostGate):
"""Check if confidence meets threshold."""
def __init__(self, min_confidence: float = 0.7):
super().__init__("confidence")
self.min_confidence = min_confidence
def check(self, confidence: float = 0, **kwargs) -> GateCheckResult:
if confidence >= self.min_confidence:
return GateCheckResult(
result=GateResult.PASS,
gate_name=self.name,
message=f"Confidence OK ({confidence:.0%})"
)
if confidence >= self.min_confidence * 0.8:
return GateCheckResult(
result=GateResult.WARN,
gate_name=self.name,
message=f"Confidence marginal ({confidence:.0%})"
)
return GateCheckResult(
result=GateResult.FAIL,
gate_name=self.name,
message=f"Confidence too low ({confidence:.0%})"
)
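The FormatGate code check above is only a keyword heuristic. When you know the output should be raw Python source (no surrounding prose or fences), a stricter check is to try parsing it. The gate below is an optional sketch, not part of the lab's required gate set; it can live alongside the other gates (drop the import line if you add it directly to gates.py):

import ast

from gates import PostGate, GateCheckResult, GateResult

class PythonSyntaxGate(PostGate):
    """Post-gate: fail unless the result parses as Python source."""
    def __init__(self):
        super().__init__("python_syntax")

    def check(self, result: str, **kwargs) -> GateCheckResult:
        try:
            ast.parse(result)  # raises SyntaxError on invalid Python
        except SyntaxError as e:
            return GateCheckResult(
                result=GateResult.FAIL,
                gate_name=self.name,
                message=f"Not valid Python: {e.msg} (line {e.lineno})"
            )
        return GateCheckResult(
            result=GateResult.PASS,
            gate_name=self.name,
            message="Parses as valid Python"
        )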
Step 3: Implement Drift Detection
Create drift_detection.py:
"""
Drift Detection - Lab 10
Detect when outputs drift from original intent.
"""
from typing import Optional, List, Tuple
from dataclasses import dataclass
import anthropic
from gates import PostGate, GateCheckResult, GateResult
@dataclass
class DriftAnalysis:
"""Result of drift analysis."""
is_drifting: bool
similarity_score: float # 0-1, higher = more similar
drift_description: str
suggested_correction: Optional[str] = None
class DriftDetector:
"""
Detects semantic drift from original intent.
Uses AI to compare output against original task intent.
"""
def __init__(self, model: str = "claude-sonnet-4-20250514"):
self.client = anthropic.Anthropic()
self.model = model
def analyze(self, task: str, output: str, context: str = "") -> DriftAnalysis:
"""
Analyze if output has drifted from task intent.
Args:
task: Original task description
output: The generated output
context: Optional additional context
Returns:
DriftAnalysis with similarity score and description
"""
prompt = f"""Analyze whether this output matches the original task intent.
ORIGINAL TASK:
{task}
{f"CONTEXT: {context}" if context else ""}
OUTPUT TO ANALYZE:
{output}
Evaluate:
1. Does the output address the task?
2. Has the output drifted to a different topic?
3. Are there any misunderstandings?
Respond in this format:
SIMILARITY: (0.0 to 1.0, where 1.0 = perfect match)
DRIFTING: (yes/no)
DESCRIPTION: (brief explanation)
CORRECTION: (if drifting, how to correct - otherwise "N/A")
"""
response = self.client.messages.create(
model=self.model,
max_tokens=500,
messages=[{"role": "user", "content": prompt}]
)
return self._parse_response(response.content[0].text)
def _parse_response(self, text: str) -> DriftAnalysis:
"""Parse the AI response."""
similarity = 0.5
is_drifting = False
description = ""
correction = None
for line in text.split("\n"):
line = line.strip()
if line.upper().startswith("SIMILARITY:"):
try:
similarity = float(line.split(":")[1].strip())
                except ValueError:
pass
elif line.upper().startswith("DRIFTING:"):
is_drifting = "yes" in line.lower()
elif line.upper().startswith("DESCRIPTION:"):
description = line.split(":", 1)[1].strip()
elif line.upper().startswith("CORRECTION:"):
corr = line.split(":", 1)[1].strip()
if corr.lower() != "n/a":
correction = corr
return DriftAnalysis(
is_drifting=is_drifting,
similarity_score=similarity,
drift_description=description,
suggested_correction=correction
)
class DriftGate(PostGate):
"""Post-gate that checks for semantic drift."""
def __init__(
self,
min_similarity: float = 0.7,
detector: Optional[DriftDetector] = None
):
super().__init__("drift_detection")
self.min_similarity = min_similarity
self.detector = detector or DriftDetector()
def check(self, task: str, result: str, **kwargs) -> GateCheckResult:
analysis = self.detector.analyze(task, result)
if analysis.is_drifting or analysis.similarity_score < self.min_similarity:
return GateCheckResult(
result=GateResult.FAIL,
gate_name=self.name,
message=f"Drift detected: {analysis.drift_description}",
details={
"similarity": analysis.similarity_score,
"correction": analysis.suggested_correction
}
)
if analysis.similarity_score < self.min_similarity + 0.1:
return GateCheckResult(
result=GateResult.WARN,
gate_name=self.name,
message=f"Possible drift: {analysis.drift_description}",
details={"similarity": analysis.similarity_score}
)
return GateCheckResult(
result=GateResult.PASS,
gate_name=self.name,
message=f"No drift detected (similarity: {analysis.similarity_score:.0%})"
)
class KeywordDriftGate(PostGate):
"""
Simple drift detection based on keyword overlap.
Faster and cheaper than AI-based detection.
"""
def __init__(self, min_overlap: float = 0.3):
super().__init__("keyword_drift")
self.min_overlap = min_overlap
def check(self, task: str, result: str, **kwargs) -> GateCheckResult:
# Extract keywords (simple approach)
task_words = self._extract_keywords(task)
result_words = self._extract_keywords(result)
if not task_words:
return GateCheckResult(
result=GateResult.PASS,
gate_name=self.name,
message="No keywords to compare"
)
# Calculate overlap
overlap = len(task_words & result_words) / len(task_words)
if overlap < self.min_overlap:
return GateCheckResult(
result=GateResult.FAIL,
gate_name=self.name,
message=f"Low keyword overlap ({overlap:.0%})",
details={
"task_keywords": list(task_words),
"result_keywords": list(result_words),
"overlap": overlap
}
)
return GateCheckResult(
result=GateResult.PASS,
gate_name=self.name,
message=f"Keyword overlap OK ({overlap:.0%})"
)
def _extract_keywords(self, text: str) -> set:
"""Extract significant words from text."""
# Simple keyword extraction
stop_words = {
"a", "an", "the", "is", "are", "was", "were", "be", "been",
"being", "have", "has", "had", "do", "does", "did", "will",
"would", "could", "should", "may", "might", "must", "shall",
"can", "to", "of", "in", "for", "on", "with", "at", "by",
"from", "as", "into", "through", "during", "before", "after",
"above", "below", "between", "under", "again", "further",
"then", "once", "here", "there", "when", "where", "why", "how",
"all", "each", "few", "more", "most", "other", "some", "such",
"no", "nor", "not", "only", "own", "same", "so", "than", "too",
"very", "just", "and", "but", "if", "or", "because", "until",
"while", "this", "that", "these", "those", "it", "its"
}
words = set()
for word in text.lower().split():
# Clean word
word = ''.join(c for c in word if c.isalnum())
if word and len(word) > 2 and word not in stop_words:
words.add(word)
return words
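To try the AI-based detector on its own before wiring it into a runner, a quick manual check looks like this (it assumes ANTHROPIC_API_KEY is set in your environment; the exact scores and wording depend on the model):

from drift_detection import DriftDetector

detector = DriftDetector()
analysis = detector.analyze(
    task="Write a sorting algorithm",
    output="Here's a great recipe for chocolate cake: mix flour, sugar, eggs..."
)
print(f"Drifting:   {analysis.is_drifting}")
print(f"Similarity: {analysis.similarity_score:.0%}")
if analysis.suggested_correction:
    print(f"Correction: {analysis.suggested_correction}")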
Step 4: Complete Example
Create gating_demo.py:
"""
Gating Demo - Lab 10
Demonstrates pre/post gates and drift detection.
"""
from gates import (
GateRunner, TaskDefinedGate, BudgetGate,
OutputLengthGate, FormatGate, ConfidenceGate
)
from drift_detection import DriftGate, KeywordDriftGate
def demo_gates():
"""Demonstrate gating in action."""
print("=" * 60)
print("GATING DEMO")
print("=" * 60)
# Set up gate runner
runner = GateRunner()
# Pre-gates
runner.add_pre_gate(TaskDefinedGate(min_length=10))
runner.add_pre_gate(BudgetGate(max_cost=1.0))
# Post-gates
runner.add_post_gate(OutputLengthGate(min_length=10, max_length=5000))
runner.add_post_gate(FormatGate("list"))
runner.add_post_gate(ConfidenceGate(min_confidence=0.7))
runner.add_post_gate(KeywordDriftGate(min_overlap=0.2))
# Test cases
test_cases = [
{
"name": "Good task",
"task": "List 5 benefits of using Python for data science",
"result": """Here are 5 benefits of using Python for data science:
1. Rich ecosystem of libraries (NumPy, Pandas, Scikit-learn)
2. Easy to learn and readable syntax
3. Strong community support
4. Excellent data visualization tools
5. Integration with big data platforms""",
"confidence": 0.9,
"current_cost": 0.10
},
{
"name": "Vague task",
"task": "Do something",
"result": "OK",
"confidence": 0.5,
"current_cost": 0.10
},
{
"name": "Drifted output",
"task": "Write a sorting algorithm",
"result": """Here's a great recipe for chocolate cake:
1. Preheat oven to 350°F
2. Mix flour and sugar
3. Add eggs and butter
4. Bake for 30 minutes""",
"confidence": 0.8,
"current_cost": 0.10
},
{
"name": "Over budget",
"task": "Analyze this large dataset in detail",
"result": "Analysis complete",
"confidence": 0.9,
"current_cost": 0.95,
"estimated_cost": 0.10
}
]
for case in test_cases:
print(f"\n--- Test: {case['name']} ---")
print(f"Task: {case['task'][:50]}...")
# Run pre-gates
pre_report = runner.run_pre_gates(
task=case["task"],
current_cost=case.get("current_cost", 0),
estimated_cost=case.get("estimated_cost", 0.01)
)
print(f"\nPre-gates: {pre_report.summary()}")
for check in pre_report.checks:
icon = {"pass": "✓", "warn": "⚠", "fail": "✗"}[check.result.value]
print(f" {icon} {check.gate_name}: {check.message}")
if not pre_report.passed:
print(" → Execution blocked by pre-gate")
continue
# Simulate execution (already have result)
print(f"\nExecuted. Result: {case['result'][:50]}...")
# Run post-gates
post_report = runner.run_post_gates(
task=case["task"],
result=case["result"],
confidence=case.get("confidence", 0.8)
)
print(f"\nPost-gates: {post_report.summary()}")
for check in post_report.checks:
icon = {"pass": "✓", "warn": "⚠", "fail": "✗"}[check.result.value]
print(f" {icon} {check.gate_name}: {check.message}")
if not post_report.passed:
print(" → Result rejected by post-gate")
else:
print(" → Result accepted!")
if __name__ == "__main__":
demo_gates()
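The demo checks canned results; in practice, gates wrap a live model call and a post-gate failure triggers the corrective retry sketched at the start of this lab. One possible way to wire that together, reusing the runner and the DriftGate's suggested correction (a sketch, not the only design; the retry policy and max_attempts value are choices, not requirements):

from typing import Optional

import anthropic

from gates import GateRunner, TaskDefinedGate, OutputLengthGate
from drift_detection import DriftGate

def gated_execute(task: str, max_attempts: int = 3) -> Optional[str]:
    """Pre-gate the task, call the model, post-gate the result, retry on failure."""
    client = anthropic.Anthropic()
    runner = (
        GateRunner()
        .add_pre_gate(TaskDefinedGate())
        .add_post_gate(OutputLengthGate(min_length=10))
        .add_post_gate(DriftGate(min_similarity=0.7))
    )

    # Pre-gate: reject before spending any tokens
    if not runner.run_pre_gates(task=task).passed:
        return None

    prompt = task
    for attempt in range(max_attempts):
        response = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=1000,
            messages=[{"role": "user", "content": prompt}]
        )
        result = response.content[0].text

        # Post-gate: commit only if every check passes
        report = runner.run_post_gates(task=task, result=result)
        if report.passed:
            return result

        # Retry, folding any suggested corrections back into the prompt
        corrections = [
            c.details["correction"]
            for c in report.failures
            if c.details and c.details.get("correction")
        ]
        hint = "; ".join(corrections) or "address the failed checks above"
        prompt = f"{task}\n\nIMPORTANT: {hint}"

    return None  # give up after max_attempts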
Understanding Gates
Gate Hierarchy
┌─────────────┐
│ Task │
└──────┬──────┘
│
┌──────▼──────┐
│ PRE-GATES │
│ │
│ • Defined? │
│ • Budget? │
│ • Resources?│
└──────┬──────┘
│ Pass?
┌──────▼──────┐
│ EXECUTE │
└──────┬──────┘
│
┌──────▼──────┐
│ POST-GATES │
│ │
│ • Length? │
│ • Format? │
│ • Drift? │
│ • Confidence│
└──────┬──────┘
│ Pass?
┌──────▼──────┐
│ COMMIT │
└─────────────┘
Drift Detection Strategies
| Strategy | Speed | Cost | Accuracy |
|---|---|---|---|
| Keyword overlap | Fast | Free | Low |
| Embedding similarity | Medium | Low | Medium |
| AI evaluation | Slow | Higher | High |
Choose based on your needs:
- Keyword overlap: quick, free sanity check you can run on every iteration
- Embedding similarity: a middle ground on cost and accuracy (see the sketch below)
- AI evaluation: important or complex tasks where accuracy justifies the extra call
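The lab implements the keyword and AI options; the embedding option is left out but is straightforward to add. The sketch below assumes the sentence-transformers package is installed; the model name and 0.4 threshold are starting-point assumptions, not requirements, so tune the threshold on your own examples:

from sentence_transformers import SentenceTransformer, util

from gates import PostGate, GateCheckResult, GateResult

class EmbeddingDriftGate(PostGate):
    """Post-gate: compare task and result embeddings via cosine similarity."""
    def __init__(self, min_similarity: float = 0.4, model_name: str = "all-MiniLM-L6-v2"):
        super().__init__("embedding_drift")
        self.min_similarity = min_similarity
        self.model = SentenceTransformer(model_name)  # loaded once, reused per check

    def check(self, task: str, result: str, **kwargs) -> GateCheckResult:
        task_vec = self.model.encode(task, convert_to_tensor=True)
        result_vec = self.model.encode(result, convert_to_tensor=True)
        similarity = float(util.cos_sim(task_vec, result_vec))

        if similarity < self.min_similarity:
            return GateCheckResult(
                result=GateResult.FAIL,
                gate_name=self.name,
                message=f"Embedding similarity too low ({similarity:.2f})",
                details={"similarity": similarity}
            )
        return GateCheckResult(
            result=GateResult.PASS,
            gate_name=self.name,
            message=f"Embedding similarity OK ({similarity:.2f})"
        )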
Exercises
Exercise 1: Custom Validator Gate
Create a gate that runs custom validation functions:
class CustomValidatorGate(PostGate):
def __init__(self, validators: List[Callable[[str], bool]]):
pass
def check(self, result: str, **kwargs) -> GateCheckResult:
pass
Exercise 2: Progressive Gating
Implement gates that become stricter over retries:
class ProgressiveConfidenceGate(PostGate):
def __init__(self, base_threshold: float = 0.7):
pass
def check(self, confidence: float, attempt: int, **kwargs) -> GateCheckResult:
        # Require a higher confidence threshold on each successive attempt
pass
Exercise 3: Gate Analytics
Track gate pass/fail rates over time:
class GateAnalytics:
def record(self, gate_name: str, result: GateResult):
pass
def get_pass_rate(self, gate_name: str) -> float:
pass
def report(self) -> dict:
pass
Checkpoint
Before moving on, verify:
- Pre-gates block invalid tasks
- Post-gates catch bad outputs
- Drift detection identifies off-topic outputs
- Gate reports summarize all checks
- You understand when to use each gate type
Key Takeaway
Gates catch problems before they compound.
Gating provides:
- Early rejection of bad inputs (pre-gates)
- Quality assurance on outputs (post-gates)
- Drift prevention to stay on track
- Clear failure reasons for debugging
Get the Code
Related concepts: 8me/src/tier3.5-orchestration-concepts/02-patterns.md