
Ultimate Autonomous Framework

Streamline your workflow with this autonomous framework: coordinate agents and systems that operate independently. Includes structured workflows, validation checks, and reusable patterns for AI research.

Skill · Cliptics · ai research · v1.0.0 · MIT

Overview

The Ultimate Autonomous Framework provides a production-ready architecture for building AI agents that operate independently over extended time periods, coordinating multiple sub-agents, managing state across sessions, and recovering from failures without human intervention. While basic agent loops handle single-turn tool calling, a true autonomous framework addresses the hard problems: multi-agent orchestration, persistent memory, dynamic task decomposition, self-correction, and graceful degradation.

This framework matters because real-world agent tasks -- refactoring an entire codebase, managing a deployment pipeline, or conducting multi-day research -- exceed the capabilities of a single model call with a few tools. They require planning, delegation, monitoring, and adaptation over time. The Ultimate Autonomous Framework provides the structural patterns to build these systems reliably.

When to Use

  • Building agents that operate over minutes or hours, not seconds (long-running autonomous tasks)
  • Coordinating multiple specialized agents that collaborate on a shared goal
  • Implementing self-correcting agents that detect their own failures and recover
  • Creating persistent agents that maintain memory and state across sessions
  • Building deployment or CI/CD agents that monitor and respond to events autonomously
  • Designing agent hierarchies where a supervisor delegates to worker agents
  • Implementing agents that dynamically decompose complex tasks into subtasks

Quick Start

```bash
# Initialize framework project
mkdir -p autonomous-framework/{orchestrator,agents,memory,tasks,eval}
cd autonomous-framework

# Python setup
python -m venv .venv && source .venv/bin/activate
pip install anthropic openai pydantic redis celery networkx

# Or TypeScript
npm init -y
npm install @anthropic-ai/sdk openai zod ioredis bullmq
```
```python
# orchestrator/supervisor.py - Supervisor-worker pattern
import json
from dataclasses import dataclass, field
from enum import Enum
from typing import Any


class AgentRole(Enum):
    PLANNER = "planner"
    CODER = "coder"
    REVIEWER = "reviewer"
    TESTER = "tester"
    RESEARCHER = "researcher"


@dataclass
class Task:
    id: str
    description: str
    assigned_to: AgentRole | None = None
    status: str = "pending"  # pending, in_progress, completed, failed, blocked
    dependencies: list[str] = field(default_factory=list)
    result: Any = None
    retries: int = 0
    max_retries: int = 3


class SupervisorAgent:
    """
    Top-level agent that decomposes tasks, assigns them to specialized
    workers, monitors progress, and handles failures.
    """

    def __init__(self, workers: dict[AgentRole, Any], llm_client):
        self.workers = workers
        self.llm = llm_client
        self.task_queue: list[Task] = []
        self.completed: list[Task] = []

    def decompose(self, goal: str) -> list[Task]:
        """Use the LLM to break a high-level goal into ordered subtasks."""
        response = self.llm.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=4096,
            messages=[{
                "role": "user",
                "content": f"""Decompose this goal into ordered subtasks.
Each subtask should have: id, description, assigned_role, dependencies (list of task ids).

Goal: {goal}

Respond in JSON: [{{"id": "t1", "description": "...", "role": "coder", "depends_on": []}}]""",
            }],
        )
        tasks_data = json.loads(response.content[0].text)
        return [
            Task(
                id=t["id"],
                description=t["description"],
                assigned_to=AgentRole(t["role"]),
                dependencies=t.get("depends_on", []),
            )
            for t in tasks_data
        ]

    def run(self, goal: str) -> dict:
        self.task_queue = self.decompose(goal)
        while self.task_queue:
            ready = self._get_ready_tasks()
            if not ready:
                # No runnable tasks remain: either a dependency cycle exists or
                # every remaining task depends on a failed task.
                return {"status": "deadlocked", "remaining": len(self.task_queue)}
            for task in ready:
                task.status = "in_progress"
                worker = self.workers[task.assigned_to]
                try:
                    result = worker.execute(task)
                    task.result = result
                    task.status = "completed"
                    self.task_queue.remove(task)
                    self.completed.append(task)
                except Exception as e:
                    task.retries += 1
                    if task.retries >= task.max_retries:
                        task.status = "failed"
                        # Remove the task so the loop can terminate, then
                        # let the supervisor re-plan around the failure
                        self.task_queue.remove(task)
                        self._handle_failure(task, e)
                    else:
                        task.status = "pending"
        return {"status": "completed", "tasks": len(self.completed)}

    def _get_ready_tasks(self) -> list[Task]:
        completed_ids = {t.id for t in self.completed}
        return [
            t for t in self.task_queue
            if t.status == "pending"
            and all(dep in completed_ids for dep in t.dependencies)
        ]

    def _handle_failure(self, task: Task, error: Exception):
        """Re-plan or skip failed tasks (e.g. ask the LLM for an alternative approach)."""
        pass
```

Core Concepts

1. Multi-Agent Orchestration Patterns

Production autonomous systems rarely use a single agent. Instead, they compose specialized agents with clear boundaries.

```python
# orchestrator/patterns.py
import asyncio
from typing import Any


class SequentialPipeline:
    """
    Agents execute in order, each passing output to the next.
    Best for: Code generation -> Review -> Testing -> Deployment
    """

    def __init__(self, agents: list):
        self.agents = agents

    def run(self, initial_input: str) -> str:
        result = initial_input
        for agent in self.agents:
            result = agent.execute(result)
        return result


class ParallelFanOut:
    """
    Multiple agents work on independent subtasks simultaneously.
    Best for: Searching multiple codebases, running tests on multiple services
    """

    def __init__(self, agents: list):
        self.agents = agents

    async def run(self, tasks: list[str]) -> list:
        coroutines = [
            agent.execute_async(task)
            for agent, task in zip(self.agents, tasks)
        ]
        return await asyncio.gather(*coroutines)


class HierarchicalDelegation:
    """
    Supervisor delegates to specialists, aggregates results.
    Best for: Complex multi-domain tasks (frontend + backend + infra)
    """

    def __init__(self, supervisor, specialists: dict[str, Any]):
        self.supervisor = supervisor
        self.specialists = specialists

    def run(self, goal: str):
        plan = self.supervisor.plan(goal)
        results = {}
        for step in plan:
            specialist = self.specialists[step.domain]
            results[step.id] = specialist.execute(step.description)
        return self.supervisor.synthesize(results)


class DebateAndConsensus:
    """
    Multiple agents propose solutions, debate, and converge on the best.
    Best for: Architecture decisions, complex bug diagnosis
    """

    def __init__(self, agents: list, judge):
        self.agents = agents
        self.judge = judge

    def run(self, problem: str, max_rounds: int = 3) -> str:
        proposals = [agent.propose(problem) for agent in self.agents]
        for round_num in range(max_rounds):
            critiques = []
            for i, agent in enumerate(self.agents):
                others = [p for j, p in enumerate(proposals) if j != i]
                critiques.append(agent.critique(proposals[i], others))
            # Refine proposals based on critiques
            proposals = [
                agent.refine(proposal, critique)
                for agent, proposal, critique in zip(self.agents, proposals, critiques)
            ]
        return self.judge.select_best(proposals)
```
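To make the sequential pattern concrete, here is a minimal, self-contained sketch with stub agents standing in for LLM-backed workers. The `UppercaseAgent` and `ExclaimAgent` names are purely illustrative; real agents would call a model inside `execute`.

```python
# Minimal sketch of the sequential pipeline with stub agents.
class SequentialPipeline:
    def __init__(self, agents):
        self.agents = agents

    def run(self, initial_input):
        result = initial_input
        for agent in self.agents:
            result = agent.execute(result)  # each agent transforms the previous output
        return result


class UppercaseAgent:
    def execute(self, text):
        return text.upper()


class ExclaimAgent:
    def execute(self, text):
        return text + "!"


pipeline = SequentialPipeline([UppercaseAgent(), ExclaimAgent()])
print(pipeline.run("ship it"))  # -> SHIP IT!
```

The same stub-agent trick is useful for testing the other patterns: swap real workers for deterministic stand-ins and verify the orchestration logic in isolation.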

2. Persistent Memory Architecture

Autonomous agents that run across sessions need memory that persists beyond the context window.

```python
# memory/store.py
import json
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class MemoryEntry:
    id: str
    content: str
    memory_type: str  # "episodic", "semantic", "procedural"
    importance: float  # 0.0 to 1.0
    created_at: datetime = field(default_factory=datetime.now)
    last_accessed: datetime = field(default_factory=datetime.now)
    access_count: int = 0
    tags: list[str] = field(default_factory=list)


class PersistentMemory:
    """
    Three-tier memory system:
    - Working memory: Current task context (in-memory)
    - Episodic memory: Past experiences and outcomes (Redis/DB)
    - Semantic memory: Learned facts and patterns (Vector DB)
    """

    def __init__(self, redis_client, vector_store):
        self.working: list[MemoryEntry] = []
        self.redis = redis_client
        self.vectors = vector_store

    def remember(self, content: str, memory_type: str, importance: float,
                 tags: list[str] | None = None):
        entry = MemoryEntry(
            id=f"mem_{datetime.now().timestamp()}",
            content=content,
            memory_type=memory_type,
            importance=importance,
            tags=tags or [],
        )
        if memory_type == "working":
            self.working.append(entry)
        else:
            self._persist(entry)

    def recall(self, query: str, limit: int = 5) -> list[MemoryEntry]:
        """Retrieve memories relevant to the query."""
        results = []
        # Check working memory first
        for entry in self.working:
            if any(word in entry.content.lower() for word in query.lower().split()):
                results.append(entry)
        # Then search persistent storage
        results.extend(self.vectors.similarity_search(query, k=limit))
        # Sort by importance and recency
        results.sort(key=lambda e: (e.importance, e.last_accessed.timestamp()),
                     reverse=True)
        return results[:limit]

    def consolidate(self):
        """Move important working memories to long-term storage."""
        for entry in self.working:
            if entry.importance > 0.7 or entry.access_count > 3:
                entry.memory_type = "episodic"
                self._persist(entry)
        self.working = [
            e for e in self.working
            if e.importance <= 0.7 and e.access_count <= 3
        ]

    def _persist(self, entry: MemoryEntry):
        self.redis.setex(
            f"memory:{entry.id}",
            86400 * 30,  # 30-day TTL
            json.dumps({
                "content": entry.content,
                "type": entry.memory_type,
                "importance": entry.importance,
                "tags": entry.tags,
                "created": entry.created_at.isoformat(),
            }),
        )
        self.vectors.upsert(entry.id, entry.content,
                            metadata={"type": entry.memory_type})
```
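The consolidation rule (importance above 0.7 or more than 3 accesses moves an entry to long-term storage) can be exercised in isolation. A dependency-free sketch, with a plain list standing in for the Redis and vector tiers:

```python
# Standalone sketch of the consolidation rule: entries with importance > 0.7
# or access_count > 3 move from working memory to long-term storage.
from dataclasses import dataclass


@dataclass
class Entry:
    content: str
    importance: float
    access_count: int = 0


def consolidate(working, long_term):
    for e in working:
        if e.importance > 0.7 or e.access_count > 3:
            long_term.append(e)  # would be a Redis/vector-store write in production
    kept = [e for e in working if e.importance <= 0.7 and e.access_count <= 3]
    return kept, long_term


working = [Entry("deploy failed on step 3", importance=0.9),
           Entry("scratch note", importance=0.2)]
working, long_term = consolidate(working, [])
# working now holds only the low-importance note; the failure moved to long-term
```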

3. Self-Correction and Recovery

Production autonomous agents must detect when they are stuck, producing bad output, or heading in the wrong direction.

```python
# eval/self_correction.py
class SelfCorrector:
    """
    Monitors agent behavior for common failure patterns and triggers
    correction strategies.
    """

    def __init__(self, llm_client):
        self.llm = llm_client
        self.action_history: list[dict] = []
        self.error_patterns: dict[str, int] = {}

    def record_action(self, action: dict):
        self.action_history.append(action)

    def detect_loop(self, window: int = 5) -> bool:
        """Detect if the agent is repeating the same actions."""
        if len(self.action_history) < window:
            return False
        recent = self.action_history[-window:]
        signatures = [f"{a['tool']}:{a.get('path', '')}" for a in recent]
        return len(set(signatures)) <= 2

    def detect_regression(self, test_results: list[bool]) -> bool:
        """Detect if recent changes made things worse."""
        if len(test_results) < 2:
            return False
        recent_pass_rate = sum(test_results[-5:]) / min(len(test_results), 5)
        previous_pass_rate = sum(test_results[:-5]) / max(len(test_results) - 5, 1)
        return recent_pass_rate < previous_pass_rate - 0.1

    def generate_correction(self, issue: str, context: str) -> str:
        """Ask the LLM to analyze the problem and suggest a new approach."""
        response = self.llm.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=2048,
            messages=[{
                "role": "user",
                "content": f"""The agent encountered an issue during autonomous execution.

Issue: {issue}
Recent actions: {context}

Analyze what went wrong and provide a corrected strategy. Be specific about
which steps to retry differently and which to skip.""",
            }],
        )
        return response.content[0].text
```

4. Task Graph Execution

Complex tasks form dependency graphs, not linear sequences. The framework must execute tasks respecting dependencies while maximizing parallelism.

```typescript
// tasks/graph.ts
interface TaskNode {
  id: string;
  description: string;
  dependencies: string[];
  status: 'pending' | 'running' | 'completed' | 'failed';
  assignedAgent: string;
  result?: any;
}

class TaskGraph {
  private nodes: Map<string, TaskNode> = new Map();

  addTask(task: TaskNode): void {
    this.nodes.set(task.id, task);
  }

  getReadyTasks(): TaskNode[] {
    return Array.from(this.nodes.values()).filter(node => {
      if (node.status !== 'pending') return false;
      return node.dependencies.every(depId => {
        const dep = this.nodes.get(depId);
        return dep?.status === 'completed';
      });
    });
  }

  markCompleted(taskId: string, result: any): void {
    const node = this.nodes.get(taskId);
    if (node) {
      node.status = 'completed';
      node.result = result;
    }
  }

  markFailed(taskId: string): void {
    const node = this.nodes.get(taskId);
    if (node) {
      node.status = 'failed';
      // Cascade the failure recursively so transitive dependents also
      // reach a terminal state and isComplete() can become true
      for (const [, n] of this.nodes) {
        if (n.dependencies.includes(taskId) && n.status === 'pending') {
          this.markFailed(n.id);
        }
      }
    }
  }

  isComplete(): boolean {
    return Array.from(this.nodes.values()).every(
      n => n.status === 'completed' || n.status === 'failed'
    );
  }

  getProgress(): { completed: number; failed: number; pending: number; running: number } {
    const nodes = Array.from(this.nodes.values());
    return {
      completed: nodes.filter(n => n.status === 'completed').length,
      failed: nodes.filter(n => n.status === 'failed').length,
      pending: nodes.filter(n => n.status === 'pending').length,
      running: nodes.filter(n => n.status === 'running').length,
    };
  }
}
```

Configuration Reference

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `max_total_runtime` | int | 3600 | Maximum seconds the entire autonomous run can take |
| `max_agent_iterations` | int | 50 | Maximum iterations per individual agent |
| `memory_consolidation_interval` | int | 20 | Consolidate working memory every N actions |
| `self_correction_check_interval` | int | 10 | Check for loops and regressions every N steps |
| `task_retry_limit` | int | 3 | Maximum retries per task before marking as failed |
| `parallel_workers` | int | 3 | Maximum concurrent agent workers |
| `checkpoint_interval` | int | 5 | Save checkpoint every N task completions |
| `memory_ttl_days` | int | 30 | TTL for persistent episodic memories |
| `loop_detection_window` | int | 5 | Number of recent actions to check for repetition |
| `regression_threshold` | float | 0.1 | Pass rate drop that triggers regression correction |
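One way to carry these parameters through the framework is a single config object. A sketch (field names mirror the table above, defaults as listed; the `FrameworkConfig` name is illustrative):

```python
from dataclasses import dataclass


@dataclass
class FrameworkConfig:
    """Runtime limits and intervals for an autonomous run."""
    max_total_runtime: int = 3600
    max_agent_iterations: int = 50
    memory_consolidation_interval: int = 20
    self_correction_check_interval: int = 10
    task_retry_limit: int = 3
    parallel_workers: int = 3
    checkpoint_interval: int = 5
    memory_ttl_days: int = 30
    loop_detection_window: int = 5
    regression_threshold: float = 0.1


# Override only what differs per deployment
config = FrameworkConfig(parallel_workers=5)
```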

Best Practices

  1. Decompose before executing. Always have the supervisor agent create a full task graph before any worker starts. A clear plan prevents wasted work and enables better parallelism estimation.

  2. Assign each sub-agent a narrow, well-defined role. An agent that is a "coder" should not also review its own code. Specialization improves output quality and makes failures easier to isolate.

  3. Implement heartbeat monitoring for long-running agents. If a worker has not produced output in N seconds, the supervisor should check on it, possibly restart it, or reassign the task.

  4. Use memory consolidation aggressively. Working memory should be pruned and summarized after each major task completion. Carrying stale context degrades performance.

  5. Design for partial failure. Not every subtask will succeed. The supervisor should be able to skip non-critical failed tasks and still produce useful output.

  6. Test the orchestration layer independently of agents. Mock agent responses and verify that the task graph, dependency resolution, and failure handling work correctly before adding real LLM calls.

  7. Set cost budgets per agent, not just globally. A runaway research agent should not consume the entire budget. Allocate token budgets proportionally to expected task complexity.

  8. Log at the orchestration level, not just individual agents. Track which supervisor decision led to which agent assignment. This is essential for debugging multi-agent coordination failures.

  9. Prefer explicit handoffs over implicit agent selection. The supervisor should decide which agent handles each task based on the plan, not rely on dynamic routing that can produce inconsistent assignments.

  10. Run end-to-end integration tests with recorded sessions. Capture successful multi-agent runs as test fixtures and replay them to catch regressions in orchestration logic.
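The heartbeat monitoring described in practice 3 can be sketched with a small stdlib-only helper. This is a minimal illustration, not a prescribed implementation; the supervisor would call `beat` on every worker output and poll `stalled_workers` on its own loop:

```python
import time


class HeartbeatMonitor:
    """Tracks when each worker last produced output and flags stalled ones."""

    def __init__(self, timeout_seconds: float):
        self.timeout = timeout_seconds
        self.last_beat: dict[str, float] = {}

    def beat(self, worker_id: str) -> None:
        """Called whenever a worker produces output."""
        self.last_beat[worker_id] = time.monotonic()

    def stalled_workers(self) -> list[str]:
        """Workers silent longer than the timeout; candidates for restart."""
        now = time.monotonic()
        return [w for w, t in self.last_beat.items() if now - t > self.timeout]


monitor = HeartbeatMonitor(timeout_seconds=30.0)
monitor.beat("coder-1")
# The supervisor would periodically restart or reassign anything in
# monitor.stalled_workers()
```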

Troubleshooting

Problem: Supervisor creates too many fine-grained tasks, overwhelming workers. Solution: Add a constraint in the decomposition prompt (e.g., "Create 3-7 tasks maximum. Group related work into single tasks."). You can also implement a task merging step that combines tasks assigned to the same agent with no interdependencies.

Problem: Agents produce inconsistent output formats, breaking downstream processing. Solution: Define strict output schemas for each agent role using Pydantic or Zod. Validate agent output before passing it to the next stage. Reject and retry on schema violations.
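Pydantic or Zod are the robust options here; as a dependency-free illustration of the same validate-then-retry idea, a minimal checker that rejects malformed agent output before it reaches the next stage (the `REVIEW_SCHEMA` fields are hypothetical):

```python
# Minimal stdlib sketch of schema validation between agent stages.
REVIEW_SCHEMA = {"verdict": str, "issues": list}  # hypothetical reviewer output schema


def validate_output(output: dict, schema: dict) -> list[str]:
    """Return a list of violations; an empty list means the output passes."""
    errors = []
    for key, expected_type in schema.items():
        if key not in output:
            errors.append(f"missing field: {key}")
        elif not isinstance(output[key], expected_type):
            errors.append(f"wrong type for {key}: {type(output[key]).__name__}")
    return errors


good = {"verdict": "approve", "issues": []}
bad = {"verdict": 1}
assert validate_output(good, REVIEW_SCHEMA) == []
assert validate_output(bad, REVIEW_SCHEMA)  # non-empty: reject and retry the agent
```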

Problem: Memory retrieval returns irrelevant results. Solution: Improve tagging on memory entries so that metadata filtering narrows candidates before semantic search. Also tune the similarity threshold -- too low lets in noise, too high misses relevant memories.

Problem: Multi-agent system is slow due to sequential execution. Solution: Analyze the task graph for parallelism opportunities. Tasks without mutual dependencies should run concurrently. Use asyncio or worker pools to execute independent branches simultaneously.

Problem: Agent keeps retrying a fundamentally impossible task. Solution: Add semantic failure detection. If the same tool call fails 3 times with the same error, escalate to the supervisor with the error context rather than retrying blindly. The supervisor should re-plan, possibly decomposing the task differently.
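The "same error three times" rule above can be sketched as a small escalation counter. A hedged illustration (names like `FailureEscalator` are invented for this example; real error keys would likely normalize messages first):

```python
from collections import Counter


class FailureEscalator:
    """Counts identical (tool, error) failures and signals when to escalate."""

    def __init__(self, threshold: int = 3):
        self.threshold = threshold
        self.failures: Counter = Counter()

    def record(self, tool: str, error: str) -> bool:
        """Record a failure; return True when it is time to escalate
        to the supervisor instead of retrying."""
        key = (tool, error)
        self.failures[key] += 1
        return self.failures[key] >= self.threshold


esc = FailureEscalator()
assert not esc.record("write_file", "EACCES")
assert not esc.record("write_file", "EACCES")
assert esc.record("write_file", "EACCES")  # third identical failure: escalate
```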
