#!/usr/bin/env python3
"""
Orchestrator - Main control loop for Macha's autonomous system
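
Typical invocations (mirroring the argparse flags defined in main() below):
    orchestrator.py --mode once                       # single maintenance cycle
    orchestrator.py --mode continuous --interval 300  # loop every 5 minutes
    orchestrator.py --mode once --autonomy auto-safe  # allow safe auto-fixes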
"""
import json
import time
import signal
import sys
from pathlib import Path
from datetime import datetime
from typing import Any, Dict, List, Optional
from monitor import SystemMonitor
from agent import MachaAgent
from executor import SafeExecutor
from notifier import GotifyNotifier
from context_db import ContextDatabase
from remote_monitor import RemoteMonitor
from config_parser import ConfigParser
from system_discovery import SystemDiscovery
from issue_tracker import IssueTracker
from git_context import GitContext
class MachaOrchestrator:
"""Main orchestrator for autonomous system maintenance"""
    def __init__(
        self,
        check_interval: int = 300,  # 5 minutes
        autonomy_level: str = "suggest",
        state_dir: Path = Path("/var/lib/macha"),
        config_file: Path = Path("/etc/macha-autonomous/config.json"),
        remote_systems: Optional[List[str]] = None
    ):
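        # Autonomy levels mirror the CLI choices in main(): "observe", "suggest",
        # "auto-safe", "auto-full". The level is passed to SafeExecutor below,
        # which decides whether a proposed action runs immediately or is queued
        # for approval (the 'queued_for_approval' status handled in run_once()).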
self.check_interval = check_interval
self.autonomy_level = autonomy_level
self.state_dir = state_dir
self.config_file = config_file
self.running = False
self.remote_systems = remote_systems or []
        # Ensure the state directory exists and set the log file early so _log() works
        self.state_dir.mkdir(parents=True, exist_ok=True)
        self.log_file = self.state_dir / "orchestrator.log"
# Load config if exists
self._load_config()
# Initialize context database first
try:
self.context_db = ContextDatabase()
except Exception as e:
self._log(f"Warning: Could not connect to ChromaDB: {e}")
self._log("Continuing without context database")
self.context_db = None
# Initialize config parser
self.config_parser = None
if self.context_db and self.config_repo:
try:
self.config_parser = ConfigParser(self.config_repo)
except Exception as e:
self._log(f"Warning: Could not initialize config parser: {e}")
# Initialize git context
self.git_context = None
if self.config_parser:
try:
# Use the same local repo path as config_parser
local_repo_path = Path("/var/lib/macha/config-repo")
if local_repo_path.exists():
self.git_context = GitContext(repo_path=str(local_repo_path))
self._log(f"Git context initialized for {local_repo_path}")
else:
self._log(f"Warning: Config repo not found at {local_repo_path}")
except Exception as e:
self._log(f"Warning: Could not initialize git context: {e}")
# Initialize components
self.monitor = SystemMonitor(state_dir)
self.agent = MachaAgent(
ollama_host=self.ollama_host,
model=self.model,
state_dir=state_dir,
context_db=self.context_db,
config_repo=self.config_repo,
config_branch=self.config_branch,
use_queue=True,
priority="AUTONOMOUS"
)
self.executor = SafeExecutor(
state_dir=state_dir,
autonomy_level=self.autonomy_level,
agent=self.agent
)
self.notifier = GotifyNotifier()
self.discovery = SystemDiscovery(domain="coven.systems")
self.issue_tracker = IssueTracker(
context_db=self.context_db,
log_dir=str(state_dir / "logs")
) if self.context_db else None
# Initialize system registry
if self.context_db:
try:
self._initialize_system_registry()
except Exception as e:
self._log(f"Warning: Could not initialize system registry: {e}")
# Setup signal handlers
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def _load_config(self):
"""Load configuration from file"""
import os
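        # The config file is plain JSON; every key is optional and falls back to
        # the defaults below. An illustrative example:
        #   {
        #     "check_interval": 300,
        #     "autonomy_level": "suggest",
        #     "ollama_host": "http://localhost:11434",
        #     "model": "gpt-oss:latest",
        #     "config_repo": "git+https://git.coven.systems/lily/nixos-servers",
        #     "config_branch": "main"
        #   }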
self.ollama_host = "http://localhost:11434" # Default
self.model = "gpt-oss:latest" # Default
# Try to get flake URL from NH_FLAKE environment variable (set by nh tool)
nh_flake = os.environ.get("NH_FLAKE", "")
if nh_flake:
self.config_repo = nh_flake
self.config_branch = "main" # NH doesn't specify branch
else:
self.config_repo = "git+https://git.coven.systems/lily/nixos-servers"
self.config_branch = "main"
if self.config_file.exists():
try:
with open(self.config_file, 'r') as f:
config = json.load(f)
self.check_interval = config.get("check_interval", self.check_interval)
self.autonomy_level = config.get("autonomy_level", self.autonomy_level)
self.ollama_host = config.get("ollama_host", self.ollama_host)
self.model = config.get("model", self.model)
# Config file can override NH_FLAKE
self.config_repo = config.get("config_repo", self.config_repo)
self.config_branch = config.get("config_branch", self.config_branch)
self._log(f"Loaded config: model={self.model}, ollama_host={self.ollama_host}, repo={self.config_repo}")
except Exception as e:
self._log(f"Failed to load config: {e}")
def _signal_handler(self, signum, frame):
"""Handle shutdown signals"""
self._log(f"Received signal {signum}, shutting down gracefully...")
self.running = False
def _log(self, message: str):
"""Log a message"""
timestamp = datetime.now().isoformat()
log_line = f"[{timestamp}] {message}"
print(log_line)
with open(self.log_file, 'a') as f:
f.write(log_line + '\n')
def _initialize_system_registry(self):
"""Initialize the system registry in ChromaDB"""
if not self.context_db:
return
import socket
hostname = socket.gethostname()
# Add FQDN
fqdn = f"{hostname}.coven.systems"
# Register self (Macha) - discover local services
local_services = self._discover_local_services()
self._log(f"Registering {fqdn} with repo={self.config_repo}, branch={self.config_branch}")
self.context_db.register_system(
hostname=fqdn,
system_type="workstation",
services=local_services,
capabilities=["ai-inference", "system-orchestration", "log-aggregation"],
metadata={"role": "controller", "local": True},
config_repo=self.config_repo,
config_branch=self.config_branch,
os_type="nixos"
)
# Register remote systems and discover their services
for remote in self.remote_systems:
remote_services = self._discover_remote_services(remote)
self.context_db.register_system(
hostname=remote,
system_type="server",
services=remote_services,
capabilities=[],
config_repo=self.config_repo,
config_branch=self.config_branch,
os_type="nixos" # Assume NixOS for now, will be detected during auto-discovery
)
self._log("System registry initialized")
# Parse and store configuration files
self._parse_and_store_configs()
def _discover_local_services(self) -> List[str]:
"""Discover services running on local system"""
import subprocess
services = set()
try:
# Get all active services
result = subprocess.run(
["systemctl", "list-units", "--type=service", "--state=running", "--no-pager", "--no-legend"],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
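                # --no-legend output has one unit per line in the form
                # "UNIT LOAD ACTIVE SUB DESCRIPTION", e.g. (description illustrative):
                #   ollama.service loaded active running Ollama server
                # so the first whitespace-separated column is the unit name.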
for line in result.stdout.strip().split('\n'):
if line.strip():
# Extract service name (first column)
service_name = line.split()[0].replace('.service', '')
# Filter to interesting application services
if any(keyword in service_name.lower() for keyword in [
'ollama', 'chroma', 'autonomous', 'gotify', 'nextcloud',
'prowlarr', 'radarr', 'sonarr', 'whisparr', 'lidarr', 'readarr',
'sabnzbd', 'transmission', 'calibre', 'gpclient'
]):
services.add(service_name)
except Exception as e:
self._log(f"Warning: Could not discover local services: {e}")
return sorted(services)
def _discover_remote_services(self, hostname: str) -> List[str]:
"""Discover services running on remote system via journal"""
if not hasattr(self, 'journal_monitor'):
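            # Imported lazily: JournalMonitor is only needed once a remote
            # system actually has to be profiled from its journal entries.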
from journal_monitor import JournalMonitor
self.journal_monitor = JournalMonitor()
try:
services = self.journal_monitor.get_active_services(hostname)
self._log(f"Discovered {len(services)} services on {hostname}: {', '.join(services[:5])}")
return services
except Exception as e:
self._log(f"Warning: Could not discover services on {hostname}: {e}")
return []
def _update_service_registry(self):
"""Periodically update the service registry with current running services"""
if not self.context_db:
return
import socket
hostname = socket.gethostname()
fqdn = f"{hostname}.coven.systems"
# Update local services
local_services = self._discover_local_services()
self.context_db.register_system(
hostname=fqdn,
system_type="workstation",
services=local_services,
capabilities=["ai-inference", "system-orchestration", "log-aggregation"],
metadata={"role": "controller", "local": True},
config_repo=self.config_repo,
config_branch=self.config_branch,
os_type="nixos"
)
# Update remote systems
for remote in self.remote_systems:
remote_services = self._discover_remote_services(remote)
if remote_services:
self.context_db.register_system(
hostname=remote,
system_type="server",
services=remote_services,
capabilities=[],
config_repo=self.config_repo,
config_branch=self.config_branch,
os_type="nixos" # Will be updated by auto-discovery
)
def _discover_new_systems(self):
"""Discover new systems from journal logs and register them"""
if not self.context_db or not self.discovery:
return
try:
# Get known systems from database
known_hostnames = self.context_db.get_known_hostnames()
# Discover systems from journal (last 10 minutes)
discovered = self.discovery.discover_from_journal(since_minutes=10)
# Filter to new systems only
new_systems = [h for h in discovered if h not in known_hostnames]
if not new_systems:
return
self._log(f"🔍 Discovered {len(new_systems)} new system(s): {', '.join(new_systems)}")
# Get systems defined in flake for comparison
flake_systems = []
if self.config_parser:
try:
flake_systems = self.config_parser.get_systems_from_flake()
self._log(f"Flake defines {len(flake_systems)} systems: {', '.join(flake_systems)}")
except Exception as e:
self._log(f"Could not get flake systems: {e}")
# Process each new system separately
for hostname in new_systems:
try:
self._log(f"📡 Analyzing new system: {hostname}")
# Check if system is defined in flake
short_hostname = hostname.split('.')[0] # Get 'rhiannon' from 'rhiannon.coven.systems'
in_flake = short_hostname in flake_systems
if in_flake:
self._log(f" ✓ System IS defined in flake as '{short_hostname}'")
else:
self._log(f" ⚠ System NOT found in flake (unmanaged)")
# Detect OS type
os_type = self.discovery.detect_os_type(hostname)
self._log(f" OS detected: {os_type.upper()}")
# Profile the system
profile = self.discovery.profile_system(hostname, os_type)
# Determine role
role = self.discovery.get_system_role(profile)
self._log(f" Role: {role}")
self._log(f" Services: {len(profile['services'])} discovered")
# Register in database
self.context_db.register_system(
hostname=hostname,
system_type=role,
services=profile['services'],
capabilities=profile['capabilities'],
metadata={
'discovered_at': profile['discovered_at'],
'hardware': profile.get('hardware', {}),
'auto_discovered': True,
'in_flake': in_flake,
'flake_name': short_hostname if in_flake else None
},
config_repo=self.config_repo if (os_type == 'nixos' and in_flake) else "",
config_branch=self.config_branch if (os_type == 'nixos' and in_flake) else "",
os_type=os_type
)
# Send notification (with flake info)
if self.notifier:
message = (
f"🔍 New System Auto-Discovered\n\n"
f"Hostname: {hostname}\n"
f"OS: {os_type.upper()}\n"
f"Role: {role}\n"
f"Services: {len(profile['services'])} detected\n"
f"In Flake: {'✓ Yes' if in_flake else '✗ No (unmanaged)'}\n\n"
f"System has been registered and analyzed.\n"
f"Use 'macha-systems' to view all registered systems."
)
self.notifier.send(
title="🌐 Macha: New System Discovered",
message=message,
priority=self.notifier.PRIORITY_MEDIUM
)
# Run separate analysis for this system (include flake status)
profile['in_flake'] = in_flake
profile['flake_name'] = short_hostname if in_flake else None
self._analyze_new_system(hostname, profile)
except Exception as e:
self._log(f"❌ Error processing {hostname}: {e}")
except Exception as e:
self._log(f"Error during system discovery: {e}")
def _analyze_new_system(self, hostname: str, profile: Dict[str, Any]):
"""Run a focused analysis on a newly discovered system"""
try:
self._log(f"🧠 Running AI analysis of {hostname}...")
# Gather system context from ChromaDB
system_context = self.context_db.get_system_context(hostname)
# Create analysis prompt focused on this specific system
in_flake = profile.get('in_flake', False)
flake_name = profile.get('flake_name', '')
            flake_status = ""
            if in_flake:
                flake_status = (
                    f"\n✓ This system IS defined in the flake as '{flake_name}'"
                    f"\n  You can review its intended configuration at: systems/{flake_name}.nix"
                    "\n  Compare actual vs expected to identify drift."
                )
            else:
                flake_status = (
                    "\n⚠ This system is NOT in the flake (unmanaged system)"
                    "\n  You cannot manage its NixOS configuration directly."
                )
analysis = self.agent._create_analysis_prompt({
'hostname': hostname,
'os_type': profile['os_type'],
'services': profile['services'],
'capabilities': profile['capabilities'],
'hardware': profile.get('hardware', {}),
'discovered_at': profile['discovered_at'],
'in_flake': in_flake,
'flake_name': flake_name
}, system_context)
# Get AI analysis
response = self.agent._query_ollama(
f"You have discovered a new system in your infrastructure. "
f"Review its profile and provide initial observations.\n\n"
f"{flake_status}\n\n{analysis}",
model=self.agent.model
)
if response:
self._log(f"📝 AI Analysis for {hostname}:")
self._log(response[:500]) # Log first 500 chars
# Store this as a decision/observation
self.context_db.record_decision({
'type': 'system_discovery',
'hostname': hostname,
'analysis': response,
'profile': profile
})
except Exception as e:
self._log(f"Warning: Could not analyze {hostname}: {e}")
def _parse_and_store_configs(self):
"""Parse repository and store config files in ChromaDB"""
if not self.config_parser or not self.context_db:
return
try:
self._log("Parsing configuration repository...")
# Ensure repository is up to date
if not self.config_parser.ensure_repo():
self._log("Warning: Could not update config repository")
return
# Get systems from flake
systems = self.config_parser.get_systems_from_flake()
self._log(f"Found {len(systems)} systems in flake: {', '.join(systems)}")
# For each system, get its config files
for system_name in systems:
fqdn = f"{system_name}.coven.systems"
config = self.config_parser.get_system_config(system_name)
if not config['main_file']:
continue
# Update system with list of config files
self.context_db.update_system_config_files(fqdn, config['all_files'])
# Store each config file in ChromaDB
for file_path in config['all_files']:
content = self.config_parser.read_file_content(file_path)
if content:
# Determine category from path
category = "unknown"
if file_path.startswith("apps/"):
category = "apps"
elif file_path.startswith("systems/"):
category = "systems"
elif file_path.startswith("osconfigs/"):
category = "osconfigs"
elif file_path.startswith("users/"):
category = "users"
self.context_db.store_config_file(
file_path=file_path,
content=content,
category=category,
systems_using=[fqdn]
)
            self._log("Configuration parsing complete")
except Exception as e:
self._log(f"Error parsing configs: {e}")
import traceback
self._log(traceback.format_exc())
def _log_metrics(self, data: Dict[str, Any]):
"""Log key metrics in a structured format for easy parsing"""
res = data.get("resources", {})
systemd = data.get("systemd", {})
logs = data.get("logs", {})
disk = data.get("disk", {})
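        # Emits a grep-friendly block like the following (values illustrative):
        #   KEY METRICS:
        #     CPU Usage: 12.3%
        #     Memory Usage: 41.0%
        #     Load Average: 0.52
        #     Failed Services: 0
        #     Errors (1h): 3
        #     Disk /: 62.4% used
        #     Internet: ✅ Connected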
self._log("KEY METRICS:")
self._log(f" CPU Usage: {res.get('cpu_percent', 0):.1f}%")
self._log(f" Memory Usage: {res.get('memory_percent', 0):.1f}%")
self._log(f" Load Average: {res.get('load_average', {}).get('1min', 0):.2f}")
self._log(f" Failed Services: {systemd.get('failed_count', 0)}")
self._log(f" Errors (1h): {logs.get('error_count_1h', 0)}")
# Disk usage for critical partitions
for part in disk.get("partitions", []):
if part.get("mountpoint") in ["/", "/home", "/var"]:
self._log(f" Disk {part['mountpoint']}: {part.get('percent_used', 0):.1f}% used")
# Network status
net = data.get("network", {})
internet_status = "✅ Connected" if net.get("internet_reachable") else "❌ Offline"
self._log(f" Internet: {internet_status}")
def _review_open_issues(self, system_hostname: str):
"""Review all open issues for this system and log status"""
if not self.issue_tracker:
return
open_issues = self.issue_tracker.list_issues(
hostname=system_hostname,
status="open"
)
if not open_issues:
self._log("No open issues in tracker")
return
self._log(f"\n{'='*60}")
self._log(f"OPEN ISSUES REVIEW ({len(open_issues)} active)")
self._log(f"{'='*60}")
for issue in open_issues:
issue_id = issue['issue_id'][:8] # Short ID
age_hours = self._calculate_issue_age(issue['created_at'])
inv_count = len(issue.get('investigations', []))
action_count = len(issue.get('actions', []))
self._log(f"\n Issue {issue_id}: {issue['title']}")
self._log(f" Severity: {issue['severity'].upper()}")
self._log(f" Status: {issue['status']}")
self._log(f" Age: {age_hours:.1f} hours")
self._log(f" Activity: {inv_count} investigations, {action_count} actions")
self._log(f" Description: {issue['description'][:100]}...")
self._log(f"{'='*60}\n")
    def _track_or_update_issue(
        self,
        system_hostname: str,
        issue_description: str,
        severity: str = "medium"
    ) -> Optional[str]:
"""
Find or create an issue for this problem.
Returns the issue_id.
"""
if not self.issue_tracker:
return None
# Try to find existing issue
title = issue_description[:100] # Use first 100 chars as title
existing = self.issue_tracker.find_similar_issue(
hostname=system_hostname,
title=title,
description=issue_description
)
if existing:
issue_id = existing['issue_id']
self._log(f"Linked to existing issue: {issue_id[:8]}")
return issue_id
# Create new issue
issue_id = self.issue_tracker.create_issue(
hostname=system_hostname,
title=title,
description=issue_description,
severity=severity,
source="auto-detected"
)
self._log(f"Created new issue: {issue_id[:8]}")
self.notifier.notify_issue_created(
issue_id[:8],
title,
severity
)
return issue_id
def _link_action_to_issue(
self,
issue_id: str,
fix_proposal: Dict[str, Any],
execution_result: Dict[str, Any]
):
"""Link an investigation or fix action to an issue"""
if not self.issue_tracker or not issue_id:
return
action_type = fix_proposal.get('action_type', 'unknown')
if action_type == 'investigation':
self.issue_tracker.update_issue(
issue_id,
status="investigating",
investigation={
"commands": fix_proposal.get('commands', []),
"output": execution_result.get('output', ''),
"success": execution_result.get('success', False),
"diagnosis": fix_proposal.get('diagnosis', '')
}
)
else:
self.issue_tracker.update_issue(
issue_id,
status="fixing",
action={
"proposed_action": fix_proposal.get('proposed_action', ''),
"commands": fix_proposal.get('commands', []),
"output": execution_result.get('output', ''),
"success": execution_result.get('success', False),
"risk_level": fix_proposal.get('risk_level', 'unknown')
}
)
def _auto_resolve_fixed_issues(self, system_hostname: str, detected_problems: List[str]):
"""Auto-resolve issues that are no longer detected"""
if not self.issue_tracker:
return
resolved_count = self.issue_tracker.auto_resolve_if_fixed(
system_hostname,
detected_problems
)
if resolved_count > 0:
self._log(f"\n✅ Auto-resolved {resolved_count} issue(s) (problems no longer detected)")
    def _calculate_issue_age(self, created_at: str) -> float:
        """Calculate the age of an issue in hours"""
        try:
            created = datetime.fromisoformat(created_at)
            delta = datetime.utcnow() - created
            return delta.total_seconds() / 3600
        except (ValueError, TypeError):
            return 0.0
def run_once(self) -> Dict[str, Any]:
"""Run one maintenance cycle"""
self._log("=== Starting maintenance cycle ===")
# Get system hostname
import socket
hostname = socket.gethostname()
system_hostname = f"{hostname}.coven.systems"
# Review open issues before starting new checks
self._review_open_issues(system_hostname)
# Discover new systems from journal logs
self._discover_new_systems()
# Update service registry periodically (every 10th cycle to avoid overhead)
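        # (at the default 300-second check_interval that is roughly every 50 minutes)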
if not hasattr(self, '_cycle_count'):
self._cycle_count = 0
self._cycle_count += 1
if self._cycle_count % 10 == 1: # First cycle and every 10th
self._update_service_registry()
# Refresh configuration repository every 3 cycles (~15 min) to keep git context current
# This ensures git_context has up-to-date information about recent config changes
if self._cycle_count % 3 == 1 and self.config_parser:
try:
self._log("Refreshing configuration repository...")
if self.config_parser.ensure_repo():
self._log("✓ Configuration repository updated")
# Reinitialize git_context if it exists to pick up fresh data
if self.git_context:
local_repo_path = Path("/var/lib/macha/config-repo")
self.git_context = GitContext(repo_path=str(local_repo_path))
else:
self._log("⚠ Could not refresh configuration repository")
except Exception as e:
self._log(f"⚠ Error refreshing config repo: {e}")
# Step 1: Monitor system
self._log("Collecting system health data...")
monitoring_data = self.monitor.collect_all()
self.monitor.save_snapshot(monitoring_data)
# Print detailed summary
summary = self.monitor.get_summary(monitoring_data)
self._log(f"\n{'='*60}")
self._log("SYSTEM HEALTH SUMMARY")
self._log(f"{'='*60}")
self._log(summary)
self._log(f"{'='*60}\n")
# Log key metrics for easy grepping
self._log_metrics(monitoring_data)
        # Step 2: Analyze with AI (with system context including git)
        self._log("\nAnalyzing system state with AI...")
        analysis = self.agent.analyze_system_state(
            monitoring_data,
            system_hostname=system_hostname,
            git_context=self.git_context
)
        # Check if analysis was skipped due to the queue already being busy;
        # return a minimal result dict so callers (e.g. --mode once) get valid JSON
        if isinstance(analysis, str) and "already in progress" in analysis.lower():
            self._log("⏭️ Analysis skipped - autonomous check already in queue")
            return {
                "timestamp": datetime.now().isoformat(),
                "skipped": True,
                "reason": "autonomous check already in progress"
            }
self._log(f"\n{'='*60}")
self._log("AI ANALYSIS RESULTS")
self._log(f"{'='*60}")
self._log(f"Overall Status: {analysis.get('status', 'unknown').upper()}")
self._log(f"Assessment: {analysis.get('overall_assessment', 'No assessment')}")
# Log detected issues
issues = analysis.get('issues', [])
if issues:
self._log(f"\nDetected {len(issues)} issue(s):")
for i, issue in enumerate(issues, 1):
severity = issue.get('severity', 'unknown')
category = issue.get('category', 'unknown')
description = issue.get('description', 'No description')
requires_action = issue.get('requires_action', False)
action_flag = "⚠️ ACTION REQUIRED" if requires_action else " Informational"
self._log(f"\n Issue #{i}:")
self._log(f" Severity: {severity.upper()}")
self._log(f" Category: {category}")
self._log(f" Description: {description}")
self._log(f" {action_flag}")
else:
self._log("\n✅ No issues detected")
# Log recommended actions
recommended_actions = analysis.get('recommended_actions', [])
if recommended_actions:
self._log(f"\nRecommended Actions ({len(recommended_actions)}):")
for action in recommended_actions:
self._log(f" - {action}")
self._log(f"{'='*60}\n")
# Send health summary notification for critical states
status = analysis.get('status', 'unknown')
if status == 'intervention_required':
self.notifier.notify_health_summary(
analysis.get('overall_assessment', 'System requires intervention'),
status
)
# Step 3: Handle issues
results = []
issues_requiring_action = [
issue for issue in analysis.get("issues", [])
if issue.get("requires_action", False)
]
if issues_requiring_action:
self._log(f"Found {len(issues_requiring_action)} issues requiring action")
for issue in issues_requiring_action:
                self._log(f"\n{'-'*60}")
self._log(f"Addressing issue: {issue['description']}")
# Track or update issue in tracker
issue_id = self._track_or_update_issue(
system_hostname,
issue['description'],
severity=issue.get('severity', 'medium')
)
# Notify about critical issues
if issue.get('severity') == 'critical':
self.notifier.notify_critical_issue(
issue['description'],
f"Category: {issue.get('category', 'unknown')}"
)
# Check for recent investigations of this issue
previous_investigations = []
if self.context_db:
previous_investigations = self.context_db.get_recent_investigations(
issue["description"],
system_hostname,
hours=24
)
# Get fix proposal from AI
if previous_investigations:
self._log(f"Found {len(previous_investigations)} previous investigation(s) for this issue")
self._log("Requesting AI fix proposal with investigation history...")
else:
self._log("Requesting AI fix proposal...")
fix_proposal = self.agent.propose_fix(
issue["description"],
{
"monitoring_data": monitoring_data,
"issue": issue,
"previous_investigations": previous_investigations
}
)
# Log detailed fix proposal
self._log(f"\nAI FIX PROPOSAL:")
self._log(f" Diagnosis: {fix_proposal.get('diagnosis', 'No diagnosis')}")
self._log(f" Proposed Action: {fix_proposal.get('proposed_action', 'No proposal')}")
self._log(f" Action Type: {fix_proposal.get('action_type', 'unknown')}")
self._log(f" Risk Level: {fix_proposal.get('risk_level', 'unknown').upper()}")
if fix_proposal.get('commands'):
self._log(f" Commands to execute:")
for cmd in fix_proposal.get('commands', []):
self._log(f" - {cmd}")
if fix_proposal.get('reasoning'):
self._log(f" Reasoning: {fix_proposal.get('reasoning')}")
if fix_proposal.get('rollback_plan'):
self._log(f" Rollback Plan: {fix_proposal.get('rollback_plan')}")
# Execute or queue the fix
self._log("\nExecuting action...")
execution_result = self.executor.execute_action(
fix_proposal,
monitoring_data
)
# Log execution result
self._log(f"\nEXECUTION RESULT:")
self._log(f" Status: {execution_result.get('status', 'unknown').upper()}")
self._log(f" Executed: {'Yes' if execution_result.get('executed') else 'No'}")
if execution_result.get('reason'):
self._log(f" Reason: {execution_result.get('reason')}")
if execution_result.get('success') is not None:
                    success_icon = "✅" if execution_result.get('success') else "❌"
self._log(f" Success: {success_icon} {execution_result.get('success')}")
if execution_result.get("output"):
self._log(f" Output: {execution_result['output']}")
if execution_result.get("error"):
self._log(f" Error: {execution_result['error']}")
# Link action to issue
self._link_action_to_issue(issue_id, fix_proposal, execution_result)
# Store investigation results in ChromaDB
if (fix_proposal.get('action_type') == 'investigation' and
execution_result.get('executed') and
execution_result.get('output') and
self.context_db):
try:
self.context_db.store_investigation(
system=system_hostname,
issue_description=issue["description"],
commands=fix_proposal.get('commands', []),
output=execution_result['output']
)
self._log("Investigation results stored in database")
except Exception as e:
self._log(f"Warning: Could not store investigation: {e}")
# If this was an investigation that succeeded, analyze the results and propose actual fix
if (fix_proposal.get('action_type') == 'investigation' and
execution_result.get('executed') and
execution_result.get('success') and
execution_result.get('output')):
self._log("\n" + "="*60)
self._log("INVESTIGATION COMPLETE - Analyzing results...")
self._log("="*60)
# Build context with investigation results
investigation_context = {
"original_issue": issue["description"],
"investigation_output": execution_result['output'],
"monitoring_data": monitoring_data,
"issue": issue
}
# Ask AI to propose actual fix based on investigation
self._log("Requesting AI to propose fix based on investigation findings...")
actual_fix_proposal = self.agent.propose_fix(
f"Based on investigation of: {issue['description']}\n\nInvestigation output:\n{execution_result['output'][:1000]}",
investigation_context
)
# Log the new fix proposal
self._log(f"\nFIX PROPOSAL BASED ON INVESTIGATION:")
self._log(f" Diagnosis: {actual_fix_proposal.get('diagnosis', 'No diagnosis')}")
self._log(f" Proposed Action: {actual_fix_proposal.get('proposed_action', 'No proposal')}")
self._log(f" Action Type: {actual_fix_proposal.get('action_type', 'unknown')}")
self._log(f" Risk Level: {actual_fix_proposal.get('risk_level', 'unknown').upper()}")
if actual_fix_proposal.get('commands'):
self._log(f" Commands to execute:")
for cmd in actual_fix_proposal.get('commands', []):
self._log(f" - {cmd}")
# Only proceed with non-investigation actions
if actual_fix_proposal.get('action_type') != 'investigation':
self._log("\nExecuting follow-up action...")
followup_result = self.executor.execute_action(
actual_fix_proposal,
monitoring_data
)
self._log(f"\nFOLLOW-UP EXECUTION RESULT:")
self._log(f" Status: {followup_result.get('status', 'unknown').upper()}")
self._log(f" Executed: {'Yes' if followup_result.get('executed') else 'No'}")
if followup_result.get('status') == 'queued_for_approval':
self.notifier.notify_action_queued(
actual_fix_proposal.get('proposed_action', 'Unknown action'),
actual_fix_proposal.get('risk_level', 'unknown')
)
elif followup_result.get('executed'):
self.notifier.notify_action_executed(
actual_fix_proposal.get('proposed_action', 'Unknown action'),
followup_result.get('success', False)
)
# Store the follow-up result instead
execution_result = followup_result
else:
self._log("\nAI still recommends investigation - no further action taken.")
# Send notification based on execution result
if execution_result.get('status') == 'queued_for_approval':
self.notifier.notify_action_queued(
fix_proposal.get('proposed_action', 'Unknown action'),
fix_proposal.get('risk_level', 'unknown')
)
elif execution_result.get('executed'):
self.notifier.notify_action_executed(
fix_proposal.get('proposed_action', 'Unknown action'),
execution_result.get('success', False),
execution_result.get('output', '')
)
results.append({
"issue": issue,
"proposal": fix_proposal,
"execution": execution_result
})
else:
self._log("No issues requiring immediate action")
# Final summary
self._log(f"\n{'='*60}")
self._log("MAINTENANCE CYCLE COMPLETE")
self._log(f"{'='*60}")
self._log(f"Status: {analysis.get('status', 'unknown').upper()}")
self._log(f"Issues Found: {len(issues)}")
self._log(f"Actions Taken: {len(results)}")
if results:
executed = sum(1 for r in results if r.get('execution', {}).get('executed'))
queued = sum(1 for r in results if r.get('execution', {}).get('status') == 'queued_for_approval')
self._log(f" - Executed: {executed}")
self._log(f" - Queued for approval: {queued}")
# Auto-resolve issues that are no longer detected
detected_problems = [issue['description'] for issue in analysis.get('issues', [])]
self._auto_resolve_fixed_issues(system_hostname, detected_problems)
self._log(f"Next check in: {self.check_interval} seconds")
self._log(f"{'='*60}\n")
return {
"timestamp": datetime.now().isoformat(),
"monitoring": monitoring_data,
"analysis": analysis,
"actions": results
}
def run_continuous(self):
"""Run continuous maintenance loop"""
self._log(f"Starting Macha Autonomous System Maintenance")
self._log(f"Autonomy level: {self.autonomy_level}")
self._log(f"Check interval: {self.check_interval} seconds")
self._log(f"State directory: {self.state_dir}")
self.running = True
while self.running:
try:
                self.run_once()
# Wait for next cycle
if self.running:
self._log(f"Waiting {self.check_interval} seconds until next check...")
time.sleep(self.check_interval)
except KeyboardInterrupt:
break
except Exception as e:
self._log(f"ERROR in maintenance cycle: {e}")
import traceback
self._log(traceback.format_exc())
# Wait a bit before retrying after error
if self.running:
time.sleep(60)
self._log("Macha Autonomous System Maintenance stopped")
def run_daemon(self):
"""Run as a background daemon"""
# TODO: Proper daemonization
self.run_continuous()
def main():
"""Main entry point"""
import argparse
parser = argparse.ArgumentParser(description="Macha Autonomous System Maintenance")
parser.add_argument(
"--mode",
choices=["once", "continuous", "daemon"],
default="once",
help="Run mode"
)
parser.add_argument(
"--autonomy",
choices=["observe", "suggest", "auto-safe", "auto-full"],
default="suggest",
help="Autonomy level"
)
parser.add_argument(
"--interval",
type=int,
default=300,
help="Check interval in seconds (for continuous mode)"
)
parser.add_argument(
"--config",
type=Path,
default=Path("/etc/macha-autonomous/config.json"),
help="Config file path"
)
args = parser.parse_args()
orchestrator = MachaOrchestrator(
check_interval=args.interval,
autonomy_level=args.autonomy,
config_file=args.config
)
if args.mode == "once":
        result = orchestrator.run_once()
        # default=str keeps the dump robust if monitoring data contains non-JSON types
        print(json.dumps(result, indent=2, default=str))
elif args.mode == "continuous":
orchestrator.run_continuous()
elif args.mode == "daemon":
orchestrator.run_daemon()
if __name__ == "__main__":
main()