Lab 06: Circuit Breakers

Prevent infinite loops and runaway costs with safety limits.

Objectives

By the end of this lab, you will:

  • Understand why circuit breakers are essential
  • Implement iteration and time limits
  • Detect repeated failures and stuck loops
  • Track costs and enforce budgets

Prerequisites

  • Lab 05 completed (tool calling)
  • Understanding of the loop pattern

The Problem: Infinite Loops

Without safety limits, loops can run forever:

# Dangerous: No exit condition for edge cases
while has_pending_tasks():
    task = get_next()
    result = execute(task)

    if not verify(result):
        retry(task)  # What if it NEVER passes?

Real failure modes:

  • Task is impossible (AI keeps trying forever)
  • Task is ambiguous (different wrong answers each time)
  • Verification is too strict (nothing ever passes)
  • API is failing (same error repeated)
  • Output is looping (same response over and over)

Circuit breakers detect these patterns and stop the loop.


The Circuit Breaker Pattern

class CircuitBreaker:
    def check(self) -> Tuple[bool, str]:
        """
        Check if the loop should continue.

        Returns:
            (True, "OK") if safe to continue
            (False, "reason") if loop should stop
        """
        if self.iterations > self.max_iterations:
            return False, "Max iterations reached"

        if self.consecutive_failures > self.max_consecutive_failures:
            return False, "Too many consecutive failures"

        if self.is_output_repeating():
            return False, "Output repetition detected"

        if self.estimated_cost > self.max_cost:
            return False, "Cost limit exceeded"

        return True, "OK"

Step 1: Create the Circuit Breaker

Create circuit_breaker.py:

"""
Circuit Breaker - Lab 06

Safety mechanisms to prevent infinite loops and runaway costs.
"""

import time
import hashlib
from typing import Tuple, List, Optional
from dataclasses import dataclass, field
from collections import deque


@dataclass
class CircuitBreakerConfig:
    """Configuration for circuit breaker limits."""
    max_iterations: int = 100
    max_consecutive_failures: int = 5
    max_time_seconds: int = 300  # 5 minutes
    max_cost_dollars: float = 1.0
    repetition_window: int = 5
    repetition_threshold: int = 3  # Same output 3 times = stuck


@dataclass
class CircuitBreakerState:
    """Current state of the circuit breaker."""
    iterations: int = 0
    consecutive_failures: int = 0
    total_failures: int = 0
    start_time: float = field(default_factory=time.time)
    estimated_cost: float = 0.0
    recent_outputs: deque = field(default_factory=lambda: deque(maxlen=10))
    recent_errors: deque = field(default_factory=lambda: deque(maxlen=10))
    trip_reason: Optional[str] = None
    is_tripped: bool = False


class CircuitBreaker:
    """
    Safety mechanism to prevent infinite loops.

    Usage:
        breaker = CircuitBreaker()

        while breaker.allow_continue():
            result = process_task()

            if success:
                breaker.record_success(result, tokens_used=500)
            else:
                breaker.record_failure(error_message)

        if breaker.is_tripped:
            print(f"Circuit breaker tripped: {breaker.trip_reason}")
    """

    # Approximate cost per 1K tokens (adjust based on model)
    COST_PER_1K_INPUT = 0.003   # Claude Sonnet input
    COST_PER_1K_OUTPUT = 0.015  # Claude Sonnet output

    def __init__(self, config: Optional[CircuitBreakerConfig] = None):
        self.config = config or CircuitBreakerConfig()
        self.state = CircuitBreakerState()

    def allow_continue(self) -> bool:
        """Check if the loop should continue."""
        ok, reason = self.check()
        if not ok:
            self.state.is_tripped = True
            self.state.trip_reason = reason
        return ok

    def check(self) -> Tuple[bool, str]:
        """
        Run all safety checks.

        Returns:
            (True, "OK") if safe to continue
            (False, "reason") if should stop
        """
        # Check iteration limit
        if self.state.iterations >= self.config.max_iterations:
            return False, f"Max iterations ({self.config.max_iterations}) reached"

        # Check consecutive failures
        if self.state.consecutive_failures >= self.config.max_consecutive_failures:
            return False, f"Too many consecutive failures ({self.state.consecutive_failures})"

        # Check time limit
        elapsed = time.time() - self.state.start_time
        if elapsed >= self.config.max_time_seconds:
            return False, f"Time limit ({self.config.max_time_seconds}s) exceeded"

        # Check cost limit
        if self.state.estimated_cost >= self.config.max_cost_dollars:
            return False, f"Cost limit (${self.config.max_cost_dollars:.2f}) exceeded"

        # Check for output repetition (stuck loop)
        if self._is_output_repeating():
            return False, "Output repetition detected (loop appears stuck)"

        # Check for error repetition (same error repeatedly)
        if self._is_error_repeating():
            return False, "Same error repeating (underlying issue not resolving)"

        return True, "OK"

    def record_success(self, output: str, input_tokens: int = 0, output_tokens: int = 0):
        """Record a successful iteration."""
        self.state.iterations += 1
        self.state.consecutive_failures = 0

        # Track output for repetition detection
        output_hash = self._hash_output(output)
        self.state.recent_outputs.append(output_hash)

        # Track cost
        self._add_cost(input_tokens, output_tokens)

    def record_failure(self, error: str, input_tokens: int = 0, output_tokens: int = 0):
        """Record a failed iteration."""
        self.state.iterations += 1
        self.state.consecutive_failures += 1
        self.state.total_failures += 1

        # Track error for repetition detection
        error_hash = self._hash_output(error)
        self.state.recent_errors.append(error_hash)

        # Track cost (failures still cost money!)
        self._add_cost(input_tokens, output_tokens)

    def record_skip(self):
        """Record a skipped iteration (no API call)."""
        self.state.iterations += 1

    def reset(self):
        """Reset the circuit breaker for a new run."""
        self.state = CircuitBreakerState()

    def _hash_output(self, text: str) -> str:
        """Create a hash of output for comparison."""
        # Normalize whitespace and case for comparison
        normalized = " ".join(text.lower().split())
        return hashlib.md5(normalized.encode()).hexdigest()[:16]

    def _is_output_repeating(self) -> bool:
        """Check if recent outputs are repeating."""
        if len(self.state.recent_outputs) < self.config.repetition_threshold:
            return False

        recent = list(self.state.recent_outputs)[-self.config.repetition_window:]

        # Check if any hash appears too many times
        for output_hash in set(recent):
            if recent.count(output_hash) >= self.config.repetition_threshold:
                return True

        return False

    def _is_error_repeating(self) -> bool:
        """Check if same error is repeating."""
        if len(self.state.recent_errors) < self.config.repetition_threshold:
            return False

        recent = list(self.state.recent_errors)[-self.config.repetition_window:]

        for error_hash in set(recent):
            if recent.count(error_hash) >= self.config.repetition_threshold:
                return True

        return False

    def _add_cost(self, input_tokens: int, output_tokens: int):
        """Add to estimated cost."""
        input_cost = (input_tokens / 1000) * self.COST_PER_1K_INPUT
        output_cost = (output_tokens / 1000) * self.COST_PER_1K_OUTPUT
        self.state.estimated_cost += input_cost + output_cost

    # ==================== Properties ====================

    @property
    def is_tripped(self) -> bool:
        """Check if breaker has tripped."""
        return self.state.is_tripped

    @property
    def trip_reason(self) -> Optional[str]:
        """Get the reason the breaker tripped."""
        return self.state.trip_reason

    @property
    def iterations(self) -> int:
        """Get current iteration count."""
        return self.state.iterations

    @property
    def elapsed_seconds(self) -> float:
        """Get elapsed time in seconds."""
        return time.time() - self.state.start_time

    @property
    def estimated_cost(self) -> float:
        """Get estimated cost so far."""
        return self.state.estimated_cost

    def get_stats(self) -> dict:
        """Get current statistics."""
        return {
            "iterations": self.state.iterations,
            "consecutive_failures": self.state.consecutive_failures,
            "total_failures": self.state.total_failures,
            "elapsed_seconds": round(self.elapsed_seconds, 1),
            "estimated_cost": round(self.state.estimated_cost, 4),
            "is_tripped": self.state.is_tripped,
            "trip_reason": self.state.trip_reason
        }

Step 2: Create Specialized Breakers

Add these to circuit_breaker.py:

class TaskCircuitBreaker(CircuitBreaker):
    """
    Circuit breaker specialized for per-task limits.

    Tracks limits per task, not just globally.
    """

    def __init__(
        self,
        max_attempts_per_task: int = 3,
        config: Optional[CircuitBreakerConfig] = None
    ):
        super().__init__(config)
        self.max_attempts_per_task = max_attempts_per_task
        self.task_attempts: dict = {}

    def check_task(self, task_id: str) -> Tuple[bool, str]:
        """Check if a specific task should be retried."""
        attempts = self.task_attempts.get(task_id, 0)

        if attempts >= self.max_attempts_per_task:
            return False, f"Task {task_id} exceeded max attempts ({self.max_attempts_per_task})"

        return True, "OK"

    def record_task_attempt(self, task_id: str):
        """Record an attempt for a specific task."""
        self.task_attempts[task_id] = self.task_attempts.get(task_id, 0) + 1

    def get_task_attempts(self, task_id: str) -> int:
        """Get attempt count for a task."""
        return self.task_attempts.get(task_id, 0)


class CostAwareCircuitBreaker(CircuitBreaker):
    """
    Circuit breaker with detailed cost tracking and alerts.
    """

    def __init__(
        self,
        budget_dollars: float = 1.0,
        alert_threshold: float = 0.8,  # Alert at 80% of budget
        config: Optional[CircuitBreakerConfig] = None
    ):
        config = config or CircuitBreakerConfig()
        config.max_cost_dollars = budget_dollars
        super().__init__(config)

        self.alert_threshold = alert_threshold
        self.alert_triggered = False

    def check_budget_alert(self) -> Optional[str]:
        """Check if we're approaching budget limit."""
        if self.state.estimated_cost >= self.config.max_cost_dollars * self.alert_threshold:
            if not self.alert_triggered:
                self.alert_triggered = True
                percent = (self.state.estimated_cost / self.config.max_cost_dollars) * 100
                return f"WARNING: {percent:.0f}% of budget used (${self.state.estimated_cost:.4f})"
        return None

    def get_remaining_budget(self) -> float:
        """Get remaining budget in dollars."""
        return max(0, self.config.max_cost_dollars - self.state.estimated_cost)

Step 3: Integrate with the Loop

Create loop_with_breaker.py:

"""
Loop with Circuit Breaker - Lab 06

Demonstrates safe loop execution with circuit breakers.
"""

from task_manager import TaskManager
from executor import execute_task, verify_result
from circuit_breaker import (
    CircuitBreaker,
    CircuitBreakerConfig,
    TaskCircuitBreaker
)


def main():
    manager = TaskManager("tasks.json")

    # Configure circuit breaker
    config = CircuitBreakerConfig(
        max_iterations=50,
        max_consecutive_failures=3,
        max_time_seconds=120,  # 2 minutes
        max_cost_dollars=0.50,
        repetition_threshold=3
    )

    breaker = TaskCircuitBreaker(
        max_attempts_per_task=3,
        config=config
    )

    # Create sample tasks if needed
    if not manager.tasks:
        manager.create("Write a haiku about safety")
        manager.create("Explain circuit breakers in one sentence")
        manager.create("This task is impossible: divide by zero mentally")

    print(f"Starting loop with circuit breaker...")
    print(f"Limits: {config.max_iterations} iterations, "
          f"{config.max_time_seconds}s, ${config.max_cost_dollars}\n")

    # Main loop with circuit breaker
    while manager.has_pending() and breaker.allow_continue():
        task = manager.get_next()

        # Check per-task limit
        task_ok, task_reason = breaker.check_task(task.id)
        if not task_ok:
            print(f"[{task.id}] {task_reason}")
            manager.fail(task.id, task_reason)
            continue

        breaker.record_task_attempt(task.id)

        print(f"[{task.id}] Attempt {breaker.get_task_attempts(task.id)}")
        print(f"  Task: {task.description[:40]}...")

        manager.start(task.id)

        # Execute
        result = execute_task(task.to_dict())

        # Estimate tokens (simplified)
        input_tokens = len(task.description.split()) * 2
        output_tokens = len(result.get("result", "").split()) * 2

        if result["status"] == "completed":
            breaker.record_success(
                result["result"],
                input_tokens=input_tokens,
                output_tokens=output_tokens
            )

            print(f"  ✓ Completed (confidence: {result['confidence']:.0%})")
            manager.complete(task.id, result["result"])

        elif result["status"] == "failed":
            breaker.record_failure(
                result["reason"],
                input_tokens=input_tokens,
                output_tokens=output_tokens
            )

            print(f"  ✗ Failed: {result['reason']}")

            # Check if we should retry or give up on this task
            if breaker.get_task_attempts(task.id) < 3:
                manager.retry(task.id, result["reason"])
            else:
                manager.fail(task.id, result["reason"])

        else:
            breaker.record_failure("Unknown status")
            manager.fail(task.id, "Unknown execution status")

        # Progress report
        stats = breaker.get_stats()
        print(f"  [Breaker: {stats['iterations']} iter, "
              f"${stats['estimated_cost']:.4f}, "
              f"{stats['elapsed_seconds']}s]\n")

    # Final report
    print("=" * 60)

    if breaker.is_tripped:
        print(f"⚠️  CIRCUIT BREAKER TRIPPED: {breaker.trip_reason}")
    else:
        print("✓ Loop completed normally")

    print("\nCircuit Breaker Stats:")
    for key, value in breaker.get_stats().items():
        print(f"  {key}: {value}")

    print("\nTask Stats:")
    task_stats = manager.get_stats()
    print(f"  Completed: {task_stats['completed']}")
    print(f"  Failed: {task_stats['failed']}")
    print(f"  Pending: {task_stats['pending']}")


if __name__ == "__main__":
    main()

Step 4: Test Circuit Breaker Scenarios

Create test_breaker.py:

"""
Test Circuit Breaker Scenarios - Lab 06
"""

from circuit_breaker import CircuitBreaker, CircuitBreakerConfig


def test_iteration_limit():
    """Test that iteration limit trips the breaker."""
    config = CircuitBreakerConfig(max_iterations=5)
    breaker = CircuitBreaker(config)

    for i in range(10):
        if not breaker.allow_continue():
            print(f"Tripped at iteration {i}: {breaker.trip_reason}")
            break
        breaker.record_success(f"output {i}")

    assert breaker.is_tripped
    assert "iterations" in breaker.trip_reason.lower()
    print("✓ Iteration limit test passed")


def test_consecutive_failures():
    """Test that consecutive failures trip the breaker."""
    config = CircuitBreakerConfig(max_consecutive_failures=3, max_iterations=100)
    breaker = CircuitBreaker(config)

    for i in range(10):
        if not breaker.allow_continue():
            print(f"Tripped at iteration {i}: {breaker.trip_reason}")
            break
        breaker.record_failure("same error")

    assert breaker.is_tripped
    assert "consecutive" in breaker.trip_reason.lower()
    print("✓ Consecutive failures test passed")


def test_output_repetition():
    """Test that repeated outputs trip the breaker."""
    config = CircuitBreakerConfig(
        repetition_threshold=3,
        max_iterations=100
    )
    breaker = CircuitBreaker(config)

    for i in range(10):
        if not breaker.allow_continue():
            print(f"Tripped at iteration {i}: {breaker.trip_reason}")
            break
        breaker.record_success("same output every time")

    assert breaker.is_tripped
    assert "repetition" in breaker.trip_reason.lower()
    print("✓ Output repetition test passed")


def test_cost_limit():
    """Test that cost limit trips the breaker."""
    config = CircuitBreakerConfig(max_cost_dollars=0.01, max_iterations=1000)
    breaker = CircuitBreaker(config)

    for i in range(100):
        if not breaker.allow_continue():
            print(f"Tripped at iteration {i}: {breaker.trip_reason}")
            print(f"Estimated cost: ${breaker.estimated_cost:.4f}")
            break
        # Simulate expensive calls
        breaker.record_success(f"output {i}", input_tokens=500, output_tokens=500)

    assert breaker.is_tripped
    assert "cost" in breaker.trip_reason.lower()
    print("✓ Cost limit test passed")


def test_success_resets_consecutive():
    """Test that success resets consecutive failure count."""
    config = CircuitBreakerConfig(max_consecutive_failures=3, max_iterations=100)
    breaker = CircuitBreaker(config)

    breaker.record_failure("error 1")
    breaker.record_failure("error 2")
    assert breaker.state.consecutive_failures == 2

    breaker.record_success("success!")
    assert breaker.state.consecutive_failures == 0

    print("✓ Consecutive reset test passed")


if __name__ == "__main__":
    test_iteration_limit()
    test_consecutive_failures()
    test_output_repetition()
    test_cost_limit()
    test_success_resets_consecutive()
    print("\n✓ All tests passed!")

Understanding Circuit Breakers

Why Multiple Checks?

Check Prevents
Iteration limit Loops that run forever
Consecutive failures Stuck on impossible task
Time limit Long-running stuck loops
Cost limit Runaway API spending
Output repetition Loops producing same wrong answer
Error repetition Same underlying error not resolving

The Trip Pattern

Normal Operation          Approaching Limit         Tripped
      ↓                         ↓                     ↓
┌─────────┐              ┌─────────┐            ┌─────────┐
│ ✓ ✓ ✓ ✓ │   ───────▶   │ ✓ ✓ ✗ ✗ │  ───────▶  │ ⛔ STOP │
│ ✓ ✓ ✓ ✓ │              │ ✗ ✗ ✗   │            │         │
└─────────┘              └─────────┘            └─────────┘

Cost Estimation

# Approximate costs (Claude Sonnet, January 2026)
INPUT_COST = $0.003 / 1K tokens
OUTPUT_COST = $0.015 / 1K tokens

# Example: 500 input + 500 output tokens
cost = (500/1000 * 0.003) + (500/1000 * 0.015)
cost = $0.009 per iteration

# 100 iterations = $0.90

Best Practices

1. Start Conservative

# Start with tight limits, loosen as needed
config = CircuitBreakerConfig(
    max_iterations=20,      # Start low
    max_cost_dollars=0.10,  # Start cheap
)

2. Log Trips

if breaker.is_tripped:
    logging.warning(f"Circuit breaker tripped: {breaker.trip_reason}")
    logging.info(f"Stats: {breaker.get_stats()}")

3. Different Limits for Different Tasks

# Simple tasks: tight limits
simple_config = CircuitBreakerConfig(max_iterations=10)

# Complex tasks: looser limits
complex_config = CircuitBreakerConfig(max_iterations=50, max_cost_dollars=1.0)

4. Alert Before Trip

if breaker.state.iterations > breaker.config.max_iterations * 0.8:
    print("Warning: Approaching iteration limit")

Exercises

Exercise 1: Add Rate Limiting

Add requests-per-minute limiting:

def check_rate_limit(self) -> Tuple[bool, str]:
    """Ensure we don't exceed API rate limits."""
    # Track requests per minute
    # Return False if too many recent requests
    pass

Exercise 2: Gradual Backoff

Implement exponential backoff on failures:

def get_backoff_seconds(self) -> float:
    """Calculate backoff based on consecutive failures."""
    return min(2 ** self.state.consecutive_failures, 60)

Exercise 3: Circuit Breaker Dashboard

Create a simple dashboard showing real-time breaker stats:

def print_dashboard(breaker: CircuitBreaker):
    """Print a visual dashboard of breaker status."""
    pass

Checkpoint

Before moving on, verify:

  • Circuit breaker trips on iteration limit
  • Circuit breaker trips on consecutive failures
  • Output repetition is detected
  • Cost tracking works correctly
  • You understand when each check is useful

Key Takeaway

Safety limits prevent runaway costs and infinite loops.

Circuit breakers are non-negotiable in production:

  • Protect your budget from runaway API costs
  • Prevent infinite loops from stuck tasks
  • Detect stuck states via repetition detection
  • Provide visibility into loop health
  • Enable debugging with detailed stats

Get the Code

Full implementation: 8me/src/tier1-ralph-loop/circuit_breaker.py



Back to top

8me Showcase - AI Agent Orchestration Learning Platform

This site uses Just the Docs, a documentation theme for Jekyll.