Lab 13: MCP Server Deployment
Deploy your orchestrator as an MCP server accessible to AI tools.
Objectives
By the end of this lab, you will:
- Understand MCP (Model Context Protocol) architecture
- Deploy your orchestrator as an MCP server
- Expose resources and tools via MCP
- Test with Claude Desktop
Prerequisites
- Labs 01-12 completed
- Python 3.10+
- Understanding of APIs and servers
What is MCP?
Model Context Protocol (MCP) is a standard for AI tools to interact with external services:
┌─────────────────┐ ┌─────────────────┐
│ AI Client │ │ MCP Server │
│ (Claude Desktop)│◄───────►│ (Your Service) │
└─────────────────┘ MCP └─────────────────┘
Protocol
MCP provides:
- Resources: Read-only data (like GET endpoints)
- Tools: Actions that change state (like POST endpoints)
- Standard protocol: Works with any MCP-compatible client
MCP Architecture
┌────────────────────────────────────────────────────────────┐
│ MCP SERVER │
│ │
│ ┌──────────────────┐ ┌──────────────────────────────┐ │
│ │ Resources │ │ Tools │ │
│ │ │ │ │ │
│ │ • task://queue │ │ • add_task(description) │ │
│ │ • task://current │ │ • start_task(id) │ │
│ │ • task://stats │ │ • complete_task(id, result) │ │
│ │ │ │ • fail_task(id, reason) │ │
│ └──────────────────┘ └──────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Your Orchestrator │ │
│ │ │ │
│ │ TaskStore ◄─► Executor ◄─► Verifier ◄─► Safety │ │
│ └──────────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────┘
Step 1: Install FastMCP
pip install fastmcp
FastMCP is a lightweight framework for building MCP servers in Python.
Step 2: Create the MCP Server
Create mcp_server.py:
"""
MCP Server - Lab 13
Exposes the orchestrator via Model Context Protocol.
"""
from fastmcp import FastMCP
from typing import Optional, List
from dataclasses import asdict
import json
# Import your orchestrator components
from task_manager import TaskManager, TaskStatus
# Initialize FastMCP server
mcp = FastMCP("ralph-workflow")
# Initialize task manager (shared state)
task_manager = TaskManager("mcp_tasks.json")
# ==================== Resources ====================
# Resources are read-only views of data
@mcp.resource("task://queue")
def get_task_queue() -> str:
"""
Get all tasks in the queue.
Returns JSON array of all tasks with their status.
"""
tasks = task_manager.get_all()
return json.dumps([
{
"id": t.id,
"description": t.description,
"status": t.status.value if hasattr(t.status, 'value') else t.status,
"attempts": t.attempts,
"priority": getattr(t, 'priority', 5)
}
for t in tasks
], indent=2)
@mcp.resource("task://current")
def get_current_task() -> str:
"""
Get the current task being processed.
Returns the next pending task, or null if none.
"""
task = task_manager.get_next()
if task:
return json.dumps({
"id": task.id,
"description": task.description,
"status": task.status.value if hasattr(task.status, 'value') else task.status,
"attempts": task.attempts
}, indent=2)
return json.dumps(None)
@mcp.resource("task://stats")
def get_stats() -> str:
"""
Get task processing statistics.
Returns counts and rates for task completion.
"""
stats = task_manager.get_stats()
return json.dumps(stats, indent=2)
@mcp.resource("task://pending")
def get_pending_tasks() -> str:
"""
Get only pending tasks.
Returns tasks that are waiting to be processed.
"""
pending = task_manager.get_pending()
return json.dumps([
{
"id": t.id,
"description": t.description,
"priority": getattr(t, 'priority', 5)
}
for t in pending
], indent=2)
@mcp.resource("task://completed")
def get_completed_tasks() -> str:
"""
Get completed tasks with results.
Returns tasks that have been successfully processed.
"""
completed = task_manager.get_completed()
return json.dumps([
{
"id": t.id,
"description": t.description,
"result": t.result[:200] if t.result else None # Truncate long results
}
for t in completed
], indent=2)
# ==================== Tools ====================
# Tools are actions that modify state
@mcp.tool()
def add_task(
description: str,
priority: int = 5,
max_attempts: int = 3
) -> str:
"""
Add a new task to the queue.
Args:
description: What the task should accomplish
priority: Priority level (1=highest, 10=lowest)
max_attempts: Maximum retry attempts
Returns:
The created task ID
"""
task = task_manager.create(
description=description,
priority=priority,
max_attempts=max_attempts
)
return f"Created task: {task.id}"
@mcp.tool()
def start_task(task_id: str) -> str:
"""
Mark a task as in-progress.
Args:
task_id: The task ID to start
Returns:
Confirmation message
"""
task = task_manager.start(task_id)
if task:
return f"Started task: {task_id} (attempt {task.attempts})"
return f"Task not found: {task_id}"
@mcp.tool()
def complete_task(task_id: str, result: str) -> str:
"""
Mark a task as completed with result.
Args:
task_id: The task ID to complete
result: The result/output of the task
Returns:
Confirmation message
"""
task = task_manager.complete(task_id, result)
if task:
return f"Completed task: {task_id}"
return f"Task not found: {task_id}"
@mcp.tool()
def fail_task(task_id: str, reason: str) -> str:
"""
Mark a task as failed.
Args:
task_id: The task ID to fail
reason: Why the task failed
Returns:
Confirmation message
"""
task = task_manager.fail(task_id, reason)
if task:
return f"Failed task: {task_id} - {reason}"
return f"Task not found: {task_id}"
@mcp.tool()
def retry_task(task_id: str, feedback: str = "") -> str:
"""
Reset a task for retry.
Args:
task_id: The task ID to retry
feedback: Optional feedback for the retry
Returns:
Confirmation message or error if max attempts reached
"""
task = task_manager.retry(task_id, feedback)
if task:
if task.status == TaskStatus.FAILED:
return f"Task {task_id} exceeded max attempts"
return f"Task {task_id} queued for retry"
return f"Task not found: {task_id}"
@mcp.tool()
def get_next_task() -> str:
"""
Get the next task to process.
Returns the highest priority pending task.
"""
task = task_manager.get_next()
if task:
return json.dumps({
"id": task.id,
"description": task.description,
"attempts": task.attempts,
"max_attempts": task.max_attempts
}, indent=2)
return "No pending tasks"
@mcp.tool()
def clear_completed() -> str:
"""
Remove all completed tasks from the queue.
Returns:
Number of tasks removed
"""
completed = task_manager.get_completed()
count = len(completed)
for task in completed:
task_manager.delete(task.id)
return f"Cleared {count} completed tasks"
@mcp.tool()
def reset_all_tasks() -> str:
"""
Reset all tasks to pending status.
Use for testing or reprocessing.
"""
task_manager.reset_all()
return "All tasks reset to pending"
# ==================== Entry Point ====================
if __name__ == "__main__":
mcp.run()
Step 3: Create Package Structure
Create pyproject.toml:
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "ralph-mcp-server"
version = "1.0.0"
description = "MCP server for Ralph workflow orchestration"
requires-python = ">=3.10"
dependencies = [
"fastmcp>=0.1.0",
"anthropic>=0.18.0",
]
[project.scripts]
ralph-mcp = "mcp_server:mcp.run"
[tool.hatch.build.targets.wheel]
packages = ["."]
Step 4: Install and Test
# Install in development mode
pip install -e .
# Run the server directly
python mcp_server.py
# Or use the installed command
ralph-mcp
Step 5: Configure Claude Desktop
Add to your Claude Desktop configuration (~/.config/claude/claude_desktop_config.json on Linux/Mac or %APPDATA%\Claude\claude_desktop_config.json on Windows):
{
"mcpServers": {
"ralph-workflow": {
"command": "ralph-mcp",
"args": []
}
}
}
Or if running from source:
{
"mcpServers": {
"ralph-workflow": {
"command": "python",
"args": ["/path/to/mcp_server.py"]
}
}
}
Restart Claude Desktop to load the server.
Step 6: Test with Claude
In Claude Desktop, you can now:
User: Add a task to write a haiku about MCP
Claude: [Uses add_task tool]
Created task: task-001
User: What tasks are in the queue?
Claude: [Reads task://queue resource]
Here are the current tasks:
- task-001: Write a haiku about MCP (pending)
User: Process the next task
Claude: [Uses get_next_task, then complete_task]
Task completed! Here's the haiku:
...
Understanding MCP Concepts
Resources vs Tools
| Aspect | Resources | Tools |
|---|---|---|
| Purpose | Read data | Perform actions |
| HTTP analogy | GET | POST/PUT/DELETE |
| Side effects | None | Yes |
| Caching | Can be cached | Not cached |
| Use case | View state | Change state |
Resource URIs
@mcp.resource("task://queue") # All tasks
@mcp.resource("task://current") # Current task
@mcp.resource("task://stats") # Statistics
# Pattern: protocol://path
# - protocol: category of resource
# - path: specific resource
Tool Definitions
@mcp.tool()
def add_task(
description: str, # Required parameter
priority: int = 5, # Optional with default
max_attempts: int = 3 # Optional with default
) -> str:
"""
Add a new task to the queue. # Description shown to AI
Args:
description: What the task should accomplish # Param docs
priority: Priority level (1=highest, 10=lowest)
Returns:
The created task ID # Return description
"""
Advanced: Adding the Orchestrator
Extend the server to include full orchestration:
from orchestrator import Orchestrator, OrchestratorBuilder
from components import ClaudeExecutor, ClaudeVerifier, SimpleCircuitBreaker
# Global orchestrator instance
orchestrator = None
@mcp.tool()
def run_orchestrator(max_tasks: int = 10) -> str:
"""
Run the orchestrator to process pending tasks.
Args:
max_tasks: Maximum tasks to process in this run
Returns:
Summary of tasks processed
"""
global orchestrator
if orchestrator is None:
orchestrator = (OrchestratorBuilder()
.with_task_store(task_manager)
.with_executor(ClaudeExecutor())
.with_verifier(ClaudeVerifier())
.with_safety(SimpleCircuitBreaker(max_iterations=max_tasks))
.build())
stats = orchestrator.run()
return json.dumps({
"processed": stats.tasks_processed,
"completed": stats.tasks_completed,
"failed": stats.tasks_failed,
"elapsed_seconds": stats.elapsed_seconds
}, indent=2)
Exercises
Exercise 1: Add Batch Operations
Add tools for batch task operations:
@mcp.tool()
def add_tasks_batch(descriptions: List[str]) -> str:
"""Add multiple tasks at once."""
pass
@mcp.tool()
def complete_tasks_batch(task_ids: List[str], result: str) -> str:
"""Complete multiple tasks with the same result."""
pass
Exercise 2: Add Filtering Resource
Create a resource that accepts parameters:
@mcp.resource("task://filter/{status}")
def get_tasks_by_status(status: str) -> str:
"""Get tasks filtered by status."""
pass
Exercise 3: Add WebSocket Notifications
Implement real-time task notifications (advanced):
# When task status changes, notify connected clients
async def notify_task_change(task_id: str, new_status: str):
pass
Checkpoint
Before moving on, verify:
- MCP server starts without errors
- Resources return correct JSON
- Tools modify state correctly
- Claude Desktop can connect to your server
- You can add and complete tasks via Claude
Key Takeaway
MCP makes your orchestrator accessible to AI tools.
With MCP, your orchestrator becomes:
- Accessible: Any MCP client can use it
- Discoverable: Tools and resources are self-documenting
- Standardized: Follows the MCP protocol
- Composable: Works with other MCP servers
Get the Code
Full implementation: 8me/src/tier3-mcp-server/