Lab 04: JSON Task Management

Build a robust TaskManager class with full CRUD operations and metadata tracking.

Objectives

By the end of this lab, you will:

  • Structure tasks with rich JSON schemas
  • Implement full CRUD operations
  • Track metadata (attempts, timestamps, history)
  • Query tasks by status, age, and other criteria

Prerequisites

  • Labs 01-03 completed
  • Understanding of JSON and Python classes

Why Structured Task Management?

In earlier labs, we used simple dictionaries. That works for demos, but production systems need:

Ad-hoc Approach Structured Approach
Scattered logic Centralized TaskManager
Implicit state Explicit lifecycle
No history Full audit trail
Hard to query Rich query methods
Fragile Validated schemas

The Task Schema

A production-ready task schema:

{
  "id": "task-001",
  "description": "Write a haiku about loops",
  "status": "pending",
  "priority": 1,
  "created_at": "2026-01-19T10:00:00Z",
  "started_at": null,
  "completed_at": null,
  "attempts": 0,
  "max_attempts": 3,
  "result": null,
  "criteria": {
    "type": "haiku",
    "requirements": ["3 lines", "5-7-5 syllables"]
  },
  "metadata": {
    "source": "user-input",
    "tags": ["creative", "poetry"]
  },
  "history": []
}

Field Descriptions

Field Type Purpose
id string Unique identifier
description string What to do
status enum pending/in_progress/completed/failed
priority int Lower = higher priority
created_at ISO date When task was created
started_at ISO date When work began
completed_at ISO date When task finished
attempts int Number of tries so far
max_attempts int Maximum allowed attempts
result string The final output
criteria object Verification requirements
metadata object Extensible custom data
history array Audit trail of events

Step 1: Create the TaskManager Class

Create task_manager.py:

"""
Task Manager - Lab 04

Production-ready task management with JSON persistence,
full CRUD operations, and rich querying capabilities.
"""

import json
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, List, Dict, Any
from dataclasses import dataclass, asdict
from enum import Enum


class TaskStatus(Enum):
    """Task lifecycle states."""
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class HistoryEntry:
    """A single history event."""
    timestamp: str
    event: str
    details: Optional[str] = None


@dataclass
class Task:
    """A single task with full metadata."""
    id: str
    description: str
    status: str = "pending"
    priority: int = 5
    created_at: str = ""
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
    attempts: int = 0
    max_attempts: int = 3
    result: Optional[str] = None
    criteria: Optional[Dict[str, Any]] = None
    metadata: Optional[Dict[str, Any]] = None
    history: Optional[List[Dict[str, Any]]] = None
    failure_reason: Optional[str] = None

    def __post_init__(self):
        if not self.created_at:
            self.created_at = datetime.now(timezone.utc).isoformat()
        if self.criteria is None:
            self.criteria = {}
        if self.metadata is None:
            self.metadata = {}
        if self.history is None:
            self.history = []

    def to_dict(self) -> dict:
        """Convert to dictionary for JSON serialization."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict) -> "Task":
        """Create Task from dictionary."""
        return cls(**data)


class TaskManager:
    """
    Manages tasks with JSON persistence and rich querying.

    Usage:
        manager = TaskManager("tasks.json")
        task = manager.create("Write a haiku")
        manager.start(task.id)
        manager.complete(task.id, "Result here")
    """

    def __init__(self, filepath: str = "tasks.json"):
        self.filepath = Path(filepath)
        self.tasks: Dict[str, Task] = {}
        self._load()

    # ==================== Persistence ====================

    def _load(self):
        """Load tasks from JSON file."""
        if self.filepath.exists():
            with open(self.filepath, "r") as f:
                data = json.load(f)
                for task_data in data.get("tasks", []):
                    task = Task.from_dict(task_data)
                    self.tasks[task.id] = task
            print(f"Loaded {len(self.tasks)} tasks from {self.filepath}")
        else:
            print(f"No existing tasks file, starting fresh")

    def _save(self):
        """Persist all tasks to JSON file."""
        data = {
            "tasks": [task.to_dict() for task in self.tasks.values()],
            "saved_at": datetime.now(timezone.utc).isoformat()
        }
        with open(self.filepath, "w") as f:
            json.dump(data, f, indent=2)

    # ==================== CRUD Operations ====================

    def create(
        self,
        description: str,
        priority: int = 5,
        criteria: Optional[Dict] = None,
        metadata: Optional[Dict] = None,
        max_attempts: int = 3
    ) -> Task:
        """Create a new task."""
        task = Task(
            id=f"task-{uuid.uuid4().hex[:8]}",
            description=description,
            priority=priority,
            criteria=criteria,
            metadata=metadata,
            max_attempts=max_attempts
        )

        self._add_history(task, "created", f"Priority: {priority}")
        self.tasks[task.id] = task
        self._save()

        return task

    def get(self, task_id: str) -> Optional[Task]:
        """Get a task by ID."""
        return self.tasks.get(task_id)

    def update(self, task_id: str, **kwargs) -> Optional[Task]:
        """Update task fields."""
        task = self.tasks.get(task_id)
        if not task:
            return None

        for key, value in kwargs.items():
            if hasattr(task, key):
                setattr(task, key, value)

        self._add_history(task, "updated", f"Fields: {list(kwargs.keys())}")
        self._save()
        return task

    def delete(self, task_id: str) -> bool:
        """Delete a task."""
        if task_id in self.tasks:
            del self.tasks[task_id]
            self._save()
            return True
        return False

    # ==================== Lifecycle Operations ====================

    def start(self, task_id: str) -> Optional[Task]:
        """Mark a task as in progress."""
        task = self.tasks.get(task_id)
        if not task:
            return None

        task.status = TaskStatus.IN_PROGRESS.value
        task.started_at = datetime.now(timezone.utc).isoformat()
        task.attempts += 1

        self._add_history(task, "started", f"Attempt {task.attempts}")
        self._save()
        return task

    def complete(self, task_id: str, result: str) -> Optional[Task]:
        """Mark a task as completed with result."""
        task = self.tasks.get(task_id)
        if not task:
            return None

        task.status = TaskStatus.COMPLETED.value
        task.completed_at = datetime.now(timezone.utc).isoformat()
        task.result = result

        self._add_history(task, "completed", f"Result length: {len(result)}")
        self._save()
        return task

    def fail(self, task_id: str, reason: str) -> Optional[Task]:
        """Mark a task as permanently failed."""
        task = self.tasks.get(task_id)
        if not task:
            return None

        task.status = TaskStatus.FAILED.value
        task.completed_at = datetime.now(timezone.utc).isoformat()
        task.failure_reason = reason

        self._add_history(task, "failed", reason)
        self._save()
        return task

    def retry(self, task_id: str, feedback: str = "") -> Optional[Task]:
        """Reset task to pending for retry."""
        task = self.tasks.get(task_id)
        if not task:
            return None

        if task.attempts >= task.max_attempts:
            return self.fail(task_id, f"Max attempts ({task.max_attempts}) reached")

        task.status = TaskStatus.PENDING.value
        task.started_at = None

        self._add_history(task, "retry_scheduled", feedback)
        self._save()
        return task

    # ==================== Query Methods ====================

    def get_all(self) -> List[Task]:
        """Get all tasks."""
        return list(self.tasks.values())

    def get_by_status(self, status: str) -> List[Task]:
        """Get tasks with specific status."""
        return [t for t in self.tasks.values() if t.status == status]

    def get_pending(self) -> List[Task]:
        """Get all pending tasks, sorted by priority."""
        pending = self.get_by_status(TaskStatus.PENDING.value)
        return sorted(pending, key=lambda t: t.priority)

    def get_next(self) -> Optional[Task]:
        """Get the next task to process (highest priority pending)."""
        pending = self.get_pending()
        return pending[0] if pending else None

    def has_pending(self) -> bool:
        """Check if there are pending tasks."""
        return len(self.get_pending()) > 0

    def get_failed(self) -> List[Task]:
        """Get all failed tasks."""
        return self.get_by_status(TaskStatus.FAILED.value)

    def get_completed(self) -> List[Task]:
        """Get all completed tasks."""
        return self.get_by_status(TaskStatus.COMPLETED.value)

    def get_by_tag(self, tag: str) -> List[Task]:
        """Get tasks with a specific tag."""
        return [
            t for t in self.tasks.values()
            if tag in t.metadata.get("tags", [])
        ]

    def get_stale(self, hours: int = 24) -> List[Task]:
        """Get in-progress tasks older than specified hours."""
        cutoff = datetime.now(timezone.utc).timestamp() - (hours * 3600)
        stale = []

        for task in self.get_by_status(TaskStatus.IN_PROGRESS.value):
            if task.started_at:
                started = datetime.fromisoformat(task.started_at).timestamp()
                if started < cutoff:
                    stale.append(task)

        return stale

    # ==================== Statistics ====================

    def get_stats(self) -> Dict[str, Any]:
        """Get comprehensive statistics."""
        all_tasks = list(self.tasks.values())
        total = len(all_tasks)

        if total == 0:
            return {"total": 0, "message": "No tasks"}

        completed = len(self.get_completed())
        failed = len(self.get_failed())
        pending = len(self.get_pending())
        in_progress = len(self.get_by_status(TaskStatus.IN_PROGRESS.value))

        # Calculate average attempts for completed tasks
        completed_tasks = self.get_completed()
        avg_attempts = (
            sum(t.attempts for t in completed_tasks) / len(completed_tasks)
            if completed_tasks else 0
        )

        return {
            "total": total,
            "completed": completed,
            "failed": failed,
            "pending": pending,
            "in_progress": in_progress,
            "completion_rate": completed / total * 100,
            "failure_rate": failed / total * 100 if total > 0 else 0,
            "average_attempts": round(avg_attempts, 2)
        }

    # ==================== Utility ====================

    def _add_history(self, task: Task, event: str, details: str = ""):
        """Add an entry to task history."""
        entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "event": event,
            "details": details
        }
        task.history.append(entry)

    def reset_all(self):
        """Reset all tasks to pending (for testing)."""
        for task in self.tasks.values():
            task.status = TaskStatus.PENDING.value
            task.attempts = 0
            task.result = None
            task.started_at = None
            task.completed_at = None
            self._add_history(task, "reset", "Manual reset")
        self._save()

    def import_from_text(self, filepath: str, default_priority: int = 5):
        """Import tasks from a text file (one per line)."""
        with open(filepath, "r") as f:
            for line in f:
                description = line.strip()
                if description and not description.startswith("#"):
                    self.create(description, priority=default_priority)

    def export_results(self, filepath: str):
        """Export completed task results to a file."""
        with open(filepath, "w") as f:
            for task in self.get_completed():
                f.write(f"## Task: {task.description}\n\n")
                f.write(f"{task.result}\n\n")
                f.write(f"---\n\n")

Step 2: Create a CLI Interface

Create cli.py:

"""
Task Manager CLI - Lab 04

Command-line interface for task management.
"""

import argparse
from task_manager import TaskManager


def main():
    parser = argparse.ArgumentParser(description="Task Manager CLI")
    parser.add_argument("--file", default="tasks.json", help="Tasks file")

    subparsers = parser.add_subparsers(dest="command", help="Commands")

    # Add task
    add_parser = subparsers.add_parser("add", help="Add a new task")
    add_parser.add_argument("description", help="Task description")
    add_parser.add_argument("--priority", type=int, default=5)

    # List tasks
    list_parser = subparsers.add_parser("list", help="List tasks")
    list_parser.add_argument("--status", help="Filter by status")

    # Show task
    show_parser = subparsers.add_parser("show", help="Show task details")
    show_parser.add_argument("task_id", help="Task ID")

    # Stats
    subparsers.add_parser("stats", help="Show statistics")

    # Import
    import_parser = subparsers.add_parser("import", help="Import from text file")
    import_parser.add_argument("filepath", help="Text file path")

    # Reset
    subparsers.add_parser("reset", help="Reset all tasks to pending")

    args = parser.parse_args()
    manager = TaskManager(args.file)

    if args.command == "add":
        task = manager.create(args.description, priority=args.priority)
        print(f"Created: {task.id}")

    elif args.command == "list":
        if args.status:
            tasks = manager.get_by_status(args.status)
        else:
            tasks = manager.get_all()

        for task in tasks:
            icon = {"pending": "○", "in_progress": "◐", "completed": "✓", "failed": "✗"}
            print(f"{icon.get(task.status, '?')} [{task.id}] {task.description[:50]}")

    elif args.command == "show":
        task = manager.get(args.task_id)
        if task:
            print(f"ID: {task.id}")
            print(f"Description: {task.description}")
            print(f"Status: {task.status}")
            print(f"Priority: {task.priority}")
            print(f"Attempts: {task.attempts}/{task.max_attempts}")
            print(f"Created: {task.created_at}")
            if task.history:
                print(f"\nHistory ({len(task.history)} events):")
                for entry in task.history[-5:]:  # Last 5 events
                    print(f"  {entry['timestamp'][:19]} - {entry['event']}")
        else:
            print(f"Task not found: {args.task_id}")

    elif args.command == "stats":
        stats = manager.get_stats()
        print(f"Total: {stats['total']}")
        print(f"Completed: {stats['completed']} ({stats['completion_rate']:.1f}%)")
        print(f"Failed: {stats['failed']} ({stats['failure_rate']:.1f}%)")
        print(f"Pending: {stats['pending']}")
        print(f"In Progress: {stats['in_progress']}")
        print(f"Avg Attempts: {stats['average_attempts']}")

    elif args.command == "import":
        manager.import_from_text(args.filepath)
        print(f"Imported tasks from {args.filepath}")

    elif args.command == "reset":
        manager.reset_all()
        print("All tasks reset to pending")

    else:
        parser.print_help()


if __name__ == "__main__":
    main()

Step 3: Use in Your Loop

Update your main loop to use the TaskManager:

"""
Loop with TaskManager - Lab 04
"""

import anthropic
from task_manager import TaskManager


client = anthropic.Anthropic()
manager = TaskManager("tasks.json")


def process_task(task):
    """Process a single task."""
    manager.start(task.id)

    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=500,
        messages=[{"role": "user", "content": task.description}]
    )
    result = response.content[0].text

    # Simplified verification (see Lab 03 for full version)
    manager.complete(task.id, result)
    return True


def main():
    # Import tasks if empty
    if not manager.tasks:
        manager.import_from_text("tasks.txt")

    stats = manager.get_stats()
    print(f"Tasks: {stats['pending']} pending, {stats['completed']} done\n")

    while manager.has_pending():
        task = manager.get_next()
        print(f"Processing [{task.id}] {task.description[:40]}...")

        if process_task(task):
            print(f"  ✓ Completed\n")
        else:
            print(f"  ✗ Failed\n")

    # Final stats
    stats = manager.get_stats()
    print(f"\nDone! {stats['completion_rate']:.0f}% completion rate")
    print(f"Average attempts: {stats['average_attempts']}")


if __name__ == "__main__":
    main()

Step 4: Test the CLI

# Add tasks
python cli.py add "Write a haiku about Python" --priority 1
python cli.py add "Explain recursion simply" --priority 2
python cli.py add "List 3 benefits of testing" --priority 3

# List all tasks
python cli.py list

# Show details
python cli.py show task-abc12345

# Check stats
python cli.py stats

# Import from file
python cli.py import tasks.txt

Understanding the Design

Task Lifecycle

    ┌──────────┐
    │  CREATE  │
    └────┬─────┘
         │
         ▼
    ┌──────────┐
    │ PENDING  │◄─────────────────┐
    └────┬─────┘                  │
         │ start()                │ retry()
         ▼                        │
    ┌──────────────┐              │
    │ IN_PROGRESS  │──────────────┤
    └────┬─────────┘              │
         │                        │
    ┌────┴────┐                   │
    │         │                   │
    ▼         ▼                   │
┌────────┐  ┌────────┐            │
│COMPLETED│  │ FAILED │───────────┘
└────────┘  └────────┘   (if retries left)

Why History Matters

task.history = [
    {"timestamp": "...", "event": "created", "details": "Priority: 1"},
    {"timestamp": "...", "event": "started", "details": "Attempt 1"},
    {"timestamp": "...", "event": "retry_scheduled", "details": "Wrong format"},
    {"timestamp": "...", "event": "started", "details": "Attempt 2"},
    {"timestamp": "...", "event": "completed", "details": "Result length: 47"}
]

History enables:

  • Debugging: See exactly what happened
  • Auditing: Track all state changes
  • Analytics: Measure time between events
  • Replay: Understand failure patterns

Priority Scheduling

def get_next(self) -> Optional[Task]:
    pending = self.get_pending()  # Already sorted by priority
    return pending[0] if pending else None

Lower priority number = processed first. This enables:

  • Urgent tasks (priority 1) processed before routine tasks (priority 5)
  • Consistent ordering across restarts

Exercises

Exercise 1: Add Dependencies

Allow tasks to depend on other tasks:

{
    "id": "task-002",
    "depends_on": ["task-001"],  # Must complete task-001 first
    ...
}

Update get_pending() to respect dependencies.

Exercise 2: Add Deadlines

Add deadline tracking:

{
    "deadline": "2026-01-20T18:00:00Z",
    ...
}

def get_overdue(self) -> List[Task]:
    """Get tasks past their deadline."""
    pass

Exercise 3: Task Templates

Create reusable task templates:

manager.create_from_template("code-review", variables={"file": "main.py"})

Checkpoint

Before moving on, verify:

  • Tasks persist correctly to JSON
  • History tracks all state changes
  • CLI commands work as expected
  • Priority scheduling works correctly
  • Stats accurately reflect task states

Key Takeaway

Structured task management scales better than ad-hoc.

A proper TaskManager gives you:

  • Single source of truth for all task state
  • Full audit trail via history
  • Rich querying for monitoring and debugging
  • Consistent lifecycle management
  • Easy testing via reset and import

Get the Code

Full implementation: 8me/src/tier1-ralph-loop/task_manager.py



Back to top

8me Showcase - AI Agent Orchestration Learning Platform

This site uses Just the Docs, a documentation theme for Jekyll.