diff --git a/chat.py b/chat.py
index 2d3a08b..43c9974 100644
--- a/chat.py
+++ b/chat.py
@@ -1,7 +1,7 @@
 #!/usr/bin/env python3
 """
 Interactive chat interface with Macha AI agent.
-Allows conversational interaction and directive execution.
+Unified chat/conversation interface using tool-calling architecture.
 """

 import json
@@ -10,7 +10,7 @@ import subprocess
 import sys
 from datetime import datetime
 from pathlib import Path
-from typing import List, Dict, Any
+from typing import List, Dict, Any, Optional

 # Add parent directory to path for imports
 sys.path.insert(0, str(Path(__file__).parent))
@@ -19,152 +19,34 @@ from agent import MachaAgent


 class MachaChatSession:
-    """Interactive chat session with Macha"""
+    """Interactive chat session with Macha using tool-calling architecture"""

-    def __init__(self):
-        self.agent = MachaAgent(use_queue=True, priority="INTERACTIVE")
+    def __init__(
+        self,
+        ollama_host: str = "http://localhost:11434",
+        model: str = "gpt-oss:latest",
+        state_dir: Path = Path("/var/lib/macha"),
+        enable_tools: bool = True
+    ):
+        """Initialize chat session with Macha
+
+        Args:
+            ollama_host: Ollama API endpoint
+            model: Model name to use
+            state_dir: State directory for agent
+            enable_tools: Whether to enable tool calling (should always be True)
+        """
+        self.agent = MachaAgent(
+            ollama_host=ollama_host,
+            model=model,
+            state_dir=state_dir,
+            enable_tools=enable_tools,
+            use_queue=True,
+            priority="INTERACTIVE"
+        )
         self.conversation_history: List[Dict[str, str]] = []
         self.session_start = datetime.now().isoformat()

-    def _create_chat_prompt(self, user_message: str) -> str:
-        """Create a prompt for the chat session"""
-
-        # Build conversation context
-        context = ""
-        if self.conversation_history:
-            context = "\n\nCONVERSATION HISTORY:\n"
-            for entry in self.conversation_history[-10:]:  # Last 10 messages
-                role = entry['role'].upper()
-                msg = entry['message']
-                context += f"{role}: {msg}\n"
-
-        prompt = f"""{MachaAgent.SYSTEM_PROMPT}
-
-TASK: INTERACTIVE CHAT SESSION
-
-You are in an interactive chat session with the system administrator.
-You can have a natural conversation and execute commands when directed.
-
-CAPABILITIES:
-- Answer questions about system status
-- Explain configurations and issues
-- Execute commands when explicitly asked
-- Provide guidance and recommendations
-
-COMMAND EXECUTION:
-When the user asks you to run a command or perform an action that requires execution:
-1. Respond with a JSON object containing the command to execute
-2. Format: {{"action": "execute", "command": "the command", "explanation": "why you're running it"}}
-3. After seeing the output, continue the conversation naturally
-
-RESPONSE FORMAT:
-- For normal conversation: Respond naturally in plain text
-- For command execution: Respond with JSON containing action/command/explanation
-- Keep responses concise but informative
-
-RULES:
-- Only execute commands when explicitly asked or when it's clearly needed
-- Explain what you're about to do before executing
-- Never execute destructive commands without explicit confirmation
-- If unsure, ask for clarification
-{context}
-
-USER: {user_message}
-
-MACHA:"""
-
-        return prompt
-
-    def _execute_command(self, command: str) -> Dict[str, Any]:
-        """Execute a shell command and return results"""
-        try:
-            result = subprocess.run(
-                command,
-                shell=True,
-                capture_output=True,
-                text=True,
-                timeout=30
-            )
-
-            # Check if command failed due to permissions
-            needs_sudo = False
-            permission_errors = [
-                'Interactive authentication required',
-                'Permission denied',
-                'Operation not permitted',
-                'Must be root',
-                'insufficient privileges',
-                'authentication is required'
-            ]
-
-            if result.returncode != 0:
-                error_text = (result.stderr + result.stdout).lower()
-                for perm_error in permission_errors:
-                    if perm_error.lower() in error_text:
-                        needs_sudo = True
-                        break
-
-            # Retry with sudo if permission error detected
-            if needs_sudo and not command.strip().startswith('sudo'):
-                print(f"\nāš ļø Permission denied, retrying with sudo...")
-                sudo_command = f"sudo {command}"
-                result = subprocess.run(
-                    sudo_command,
-                    shell=True,
-                    capture_output=True,
-                    text=True,
-                    timeout=30
-                )
-
-                return {
-                    'success': result.returncode == 0,
-                    'exit_code': result.returncode,
-                    'stdout': result.stdout,
-                    'stderr': result.stderr,
-                    'command': sudo_command,
-                    'retried_with_sudo': True
-                }
-
-            return {
-                'success': result.returncode == 0,
-                'exit_code': result.returncode,
-                'stdout': result.stdout,
-                'stderr': result.stderr,
-                'command': command,
-                'retried_with_sudo': False
-            }
-        except subprocess.TimeoutExpired:
-            return {
-                'success': False,
-                'exit_code': -1,
-                'stdout': '',
-                'stderr': 'Command timed out after 30 seconds',
-                'command': command,
-                'retried_with_sudo': False
-            }
-        except Exception as e:
-            return {
-                'success': False,
-                'exit_code': -1,
-                'stdout': '',
-                'stderr': str(e),
-                'command': command,
-                'retried_with_sudo': False
-            }
-
-    def _parse_response(self, response: str) -> Dict[str, Any]:
-        """Parse AI response to determine if it's a command or text"""
-        try:
-            # Try to parse as JSON
-            parsed = json.loads(response.strip())
-            if isinstance(parsed, dict) and 'action' in parsed:
-                return parsed
-        except json.JSONDecodeError:
-            pass
-
-        # It's plain text conversation
-        return {'action': 'chat', 'message': response}
-
     def _auto_diagnose_ollama(self) -> str:
         """Automatically diagnose Ollama issues"""
         diagnostics = []
@@ -241,8 +123,16 @@ MACHA:"""

         return "\n".join(diagnostics)

-    def process_message(self, user_message: str) -> str:
-        """Process a user message and return Macha's response"""
+    def process_message(self, user_message: str, verbose: bool = False) -> str:
+        """Process a user message and return Macha's response
+
+        Args:
+            user_message: The user's message
+            verbose: Whether to show detailed token counts
+
+        Returns:
+            Macha's response
+        """

         # Add user message to history
         self.conversation_history.append({
@@ -258,14 +148,13 @@ MACHA:"""
         knowledge_context = self.agent._query_relevant_knowledge(user_message, limit=3)

         # Add recent conversation history (last 15 messages to stay within context limits)
-        # With tool calling, messages grow quickly, so we limit more aggressively
-        recent_history = self.conversation_history[-15:]  # Last ~7 exchanges
+        recent_history = self.conversation_history[-15:]
         for entry in recent_history:
             content = entry['message']
             # Truncate very long messages (e.g., command outputs)
             if len(content) > 3000:
                 content = content[:1500] + "\n... [message truncated] ...\n" + content[-1500:]
-            # Add knowledge context to first user message if available
+            # Add knowledge context to last user message if available
             if entry == recent_history[-1] and knowledge_context:
                 content += knowledge_context
             messages.append({
@@ -273,9 +162,22 @@
                 "content": content
             })

+        if verbose:
+            # Estimate tokens for debugging
+            total_chars = sum(len(json.dumps(m)) for m in messages)
+            estimated_tokens = total_chars // 4
+            print(f"[Context: {estimated_tokens:,} tokens, {len(messages)} messages]")
+
         try:
-            # Use tool-aware chat API
-            ai_response = self.agent._query_ollama_with_tools(messages)
+            # Use tool-aware chat API - this handles all tool calling automatically
+            response_data = self.agent._query_ollama_with_tools(
+                messages,
+                tool_definitions=self.agent.tools.get_tool_definitions() if self.agent.enable_tools else []
+            )
+
+            # Extract the final response
+            ai_response = response_data.get("content", "")
+
         except Exception as e:
             error_msg = (
                 f"āŒ CRITICAL: Failed to communicate with Ollama inference engine\n\n"
@@ -298,91 +200,16 @@
             diagnostics = self._auto_diagnose_ollama()
             return error_msg + "\n" + diagnostics

-        # Check if Ollama returned an error
-        try:
-            error_check = json.loads(ai_response)
-            if isinstance(error_check, dict) and 'error' in error_check:
-                error_msg = (
-                    f"āŒ Ollama API Error\n\n"
-                    f"Error: {error_check.get('error', 'Unknown error')}\n"
-                    f"Diagnosis: {error_check.get('diagnosis', 'No details')}\n\n"
-                )
-                # Auto-diagnose the issue
-                diagnostics = self._auto_diagnose_ollama()
-                return error_msg + "\n" + diagnostics
-        except json.JSONDecodeError:
-            # Not JSON, it's a normal response
-            pass
+        # Add response to history
+        self.conversation_history.append({
+            'role': 'assistant',
+            'message': ai_response,
+            'timestamp': datetime.now().isoformat()
+        })

-        # Parse response
-        parsed = self._parse_response(ai_response)
-
-        if parsed.get('action') == 'execute':
-            # AI wants to execute a command
-            command = parsed.get('command', '')
-            explanation = parsed.get('explanation', '')
-
-            # Show what we're about to do
-            response = f"šŸ”§ {explanation}\n\nExecuting: `{command}`\n\n"
-
-            # Execute the command
-            result = self._execute_command(command)
-
-            # Show if we retried with sudo
-            if result.get('retried_with_sudo'):
-                response += f"āš ļø Permission denied, retried as: `{result['command']}`\n\n"
-
-            if result['success']:
-                response += "āœ… Command succeeded:\n"
-                if result['stdout']:
-                    response += f"```\n{result['stdout']}\n```"
-                else:
-                    response += "(no output)"
-            else:
-                response += f"āŒ Command failed (exit code {result['exit_code']}):\n"
-                if result['stderr']:
-                    response += f"```\n{result['stderr']}\n```"
-                elif result['stdout']:
-                    response += f"```\n{result['stdout']}\n```"
-
-            # Add command execution to history
-            self.conversation_history.append({
-                'role': 'macha',
-                'message': response,
-                'timestamp': datetime.now().isoformat(),
-                'command_result': result
-            })
-
-            # Now ask AI to respond to the command output
-            followup_prompt = f"""The command completed. Here's what happened:
-
-Command: {command}
-Success: {result['success']}
-Output: {result['stdout'][:500] if result['stdout'] else '(none)'}
-Error: {result['stderr'][:500] if result['stderr'] else '(none)'}
-
-Please provide a brief analysis or next steps."""
-
-            followup_response = self.agent._query_ollama(followup_prompt)
-
-            if followup_response:
-                response += f"\n\n{followup_response}"
-
-            return response
-
-        else:
-            # Normal conversation response
-            message = parsed.get('message', ai_response)
-
-            self.conversation_history.append({
-                'role': 'macha',
-                'message': message,
-                'timestamp': datetime.now().isoformat()
-            })
-
-            return message
+        return ai_response

-    def run(self):
+    def run_interactive(self):
         """Run the interactive chat session"""
         print("=" * 70)
         print("🌐 MACHA INTERACTIVE CHAT")
@@ -425,9 +252,6 @@ Please provide a brief analysis or next steps."""
                     continue

                 elif user_input.lower() == '/debug':
-                    import os
-                    import subprocess
-
                     print("\n" + "=" * 70)
                     print("MACHA ARCHITECTURE & STATUS")
                     print("=" * 70)
@@ -453,19 +277,18 @@ Please provide a brief analysis or next steps."""
                    except:
                         print(f" Sudo Access: āŒ No")

-                    print(f" Note: Chat runs as invoking user (you), not as macha-autonomous")
+                    print(f" Note: Chat runs as invoking user (you), using macha's tools")

                     print("\n🧠 INFERENCE ENGINE:")
                     print(f" Backend: Ollama")
                     print(f" Host: {self.agent.ollama_host}")
                     print(f" Model: {self.agent.model}")
                     print(f" Service: ollama.service (systemd)")
+                    print(f" Queue Worker: ollama-queue-worker.service")

                     print("\nšŸ’¾ DATABASE:")
                     print(f" Backend: ChromaDB")
-                    print(f" Host: http://localhost:8000")
-                    print(f" Data: /var/lib/chromadb")
-                    print(f" Service: chromadb.service (systemd)")
+                    print(f" State: {self.agent.state_dir}")

                     print("\nšŸ” OLLAMA STATUS:")
                     # Try to query Ollama status
@@ -488,6 +311,12 @@ Please provide a brief analysis or next steps."""
                         print(f" Status: āŒ Cannot connect: {e}")
                         print(f" Hint: Check 'systemctl status ollama.service'")

+                    print("\nšŸ› ļø TOOLS:")
+                    print(f" Enabled: {self.agent.enable_tools}")
+                    if self.agent.enable_tools:
+                        print(f" Available tools: {len(self.agent.tools.get_tool_definitions())}")
+                        print(f" Architecture: Centralized command_patterns.py")
+
                     print("\nšŸ’” CONVERSATION:")
                     print(f" History: {len(self.conversation_history)} messages")
                     print(f" Session started: {self.session_start}")
@@ -497,7 +326,7 @@

                 # Process the message
                 print("\nšŸ¤– MACHA: ", end='', flush=True)
-                response = self.process_message(user_input)
+                response = self.process_message(user_input, verbose=False)
                 print(response)

             except KeyboardInterrupt:
@@ -508,15 +337,48 @@ Please provide a brief analysis or next steps."""
                 break
             except Exception as e:
                 print(f"\nāŒ Error: {e}")
+                import traceback
+                traceback.print_exc()
                 continue
+
+    def ask_once(self, question: str, verbose: bool = True) -> str:
+        """Ask a single question and return the response (for macha-ask command)
+
+        Args:
+            question: The question to ask
+            verbose: Whether to show detailed context information
+
+        Returns:
+            Macha's response
+        """
+        response = self.process_message(question, verbose=verbose)
+        return response


 def main():
-    """Main entry point"""
+    """Main entry point for macha-chat"""
     session = MachaChatSession()
-    session.run()
+    session.run_interactive()
+
+
+def ask_main():
+    """Entry point for macha-ask"""
+    if len(sys.argv) < 2:
+        print("Usage: macha-ask <question>", file=sys.stderr)
+        sys.exit(1)
+
+    question = " ".join(sys.argv[1:])
+    session = MachaChatSession()
+
+    response = session.ask_once(question, verbose=True)
+
+    print("\n" + "=" * 60)
+    print("MACHA:")
+    print("=" * 60)
+    print(response)
+    print("=" * 60)
+    print()


 if __name__ == "__main__":
     main()
-
diff --git a/conversation.py b/conversation.py
index a1c2be3..4f54cb0 100644
--- a/conversation.py
+++ b/conversation.py
@@ -1,328 +1,12 @@
 #!/usr/bin/env python3
 """
-Conversational Interface - Allows questioning Macha about decisions and system state
+Macha conversation interface - legacy compatibility wrapper.
+This module now uses the unified chat.py implementation.
 """

-import json
-import requests
-from typing import Dict, List, Any, Optional
-from pathlib import Path
-from datetime import datetime
-from agent import MachaAgent
-
-
-class MachaConversation:
-    """Conversational interface for Macha"""
-
-    def __init__(
-        self,
-        ollama_host: str = "http://localhost:11434",
-        model: str = "gpt-oss:latest",
-        state_dir: Path = Path("/var/lib/macha")
-    ):
-        self.ollama_host = ollama_host
-        self.model = model
-        self.state_dir = state_dir
-        self.decision_log = self.state_dir / "decisions.jsonl"
-        self.approval_queue = self.state_dir / "approval_queue.json"
-        self.orchestrator_log = self.state_dir / "orchestrator.log"
-
-        # Initialize agent with tool support and queue
-        self.agent = MachaAgent(
-            ollama_host=ollama_host,
-            model=model,
-            state_dir=state_dir,
-            enable_tools=True,
-            use_queue=True,
-            priority="INTERACTIVE"
-        )
-
-    def ask(self, question: str, include_context: bool = True) -> str:
-        """Ask Macha a question with optional system context"""
-
-        context = ""
-        if include_context:
-            context = self._gather_context()
-
-        # Build messages for tool-aware chat
-        content = self._create_conversational_prompt(question, context)
-        messages = [{"role": "user", "content": content}]
-
-        response = self.agent._query_ollama_with_tools(messages)
-
-        return response
-
-    def discuss_action(self, action_index: int) -> str:
-        """Discuss a specific queued action by its queue position (0-based index)"""
-
-        action = self._get_action_from_queue(action_index)
-        if not action:
-            return f"No action found at queue position {action_index}. Use 'macha-approve list' to see available actions."
-
-        context = self._gather_context()
-        action_context = json.dumps(action, indent=2)
-
-        content = f"""TASK: DISCUSS PROPOSED ACTION
-================================================================================
-
-A user is asking about a proposed action in your approval queue.
-
-QUEUED ACTION (Queue Position #{action_index}):
-{action_context}
-
-RECENT SYSTEM CONTEXT:
-{context}
-
-The user wants to discuss this action. Explain:
-1. Why you proposed this action
-2. What problem it solves
-3. The risks involved
-4. What could go wrong
-5. Alternative approaches if any
-
-Be conversational, helpful, and honest about uncertainties.
-"""
-
-        messages = [{"role": "user", "content": content}]
-        return self.agent._query_ollama_with_tools(messages)
-
-    def _gather_context(self) -> str:
-        """Gather relevant system context for the conversation"""
-
-        context_parts = []
-
-        # System infrastructure from ChromaDB
-        try:
-            from context_db import ContextDatabase
-            db = ContextDatabase()
-            systems = db.get_all_systems()
-
-            if systems:
-                context_parts.append("INFRASTRUCTURE:")
-                for system in systems:
-                    context_parts.append(f" - {system['hostname']} ({system.get('type', 'unknown')})")
-                    if system.get('config_repo'):
-                        context_parts.append(f" Config Repo: {system['config_repo']}")
-                        context_parts.append(f" Branch: {system.get('config_branch', 'unknown')}")
-                    if system.get('capabilities'):
-                        context_parts.append(f" Capabilities: {', '.join(system['capabilities'])}")
-        except Exception as e:
-            # ChromaDB not available, skip
-            pass
-
-        # Recent decisions
-        recent_decisions = self._get_recent_decisions(5)
-        if recent_decisions:
-            context_parts.append("\nRECENT DECISIONS:")
-            for i, dec in enumerate(recent_decisions, 1):
-                timestamp = dec.get("timestamp", "unknown")
-                analysis = dec.get("analysis", {})
-                status = analysis.get("status", "unknown")
-                context_parts.append(f"{i}. [{timestamp}] Status: {status}")
-                if "issues" in analysis:
-                    for issue in analysis.get("issues", [])[:3]:
-                        context_parts.append(f" - {issue.get('description', 'N/A')}")
-
-        # Pending approvals
-        pending = self._get_pending_approvals()
-        if pending:
-            context_parts.append(f"\nPENDING APPROVALS: {len(pending)} action(s) awaiting approval")
-
-        # Recent log excerpts (last 10 lines)
-        recent_logs = self._get_recent_logs(10)
-        if recent_logs:
-            context_parts.append("\nRECENT LOG ENTRIES:")
-            context_parts.extend(recent_logs)
-
-        return "\n".join(context_parts)
-
-    def _create_conversational_prompt(self, question: str, context: str) -> str:
-        """Create a conversational prompt"""
-
-        return f"""{MachaAgent.SYSTEM_PROMPT}
-
-TASK: ANSWER QUESTION
-================================================================================
-
-You monitor system health, analyze issues using AI, and propose fixes. Be helpful,
-honest about what you know and don't know, and reference the context provided below.
-
-SYSTEM CONTEXT:
-{context if context else "No recent activity"}
-
-USER QUESTION:
-{question}
-
-Respond conversationally and helpfully. If the question is about your recent decisions
-or actions, reference the context above. If you don't have enough information, say so.
-Keep responses concise but informative.
-"""
-
-    def _query_ollama(self, prompt: str, temperature: float = 0.7) -> str:
-        """Query Ollama API"""
-        try:
-            response = requests.post(
-                f"{self.ollama_host}/api/generate",
-                json={
-                    "model": self.model,
-                    "prompt": prompt,
-                    "stream": False,
-                    "temperature": temperature,
-                },
-                timeout=60
-            )
-            response.raise_for_status()
-            return response.json().get("response", "")
-        except requests.exceptions.HTTPError as e:
-            error_detail = ""
-            try:
-                error_detail = f" - {response.text}"
-            except:
-                pass
-            return f"Error: Ollama returned HTTP {response.status_code}{error_detail}"
-        except Exception as e:
-            return f"Error querying Ollama: {str(e)}"
-
-    def _get_recent_decisions(self, count: int = 5) -> List[Dict[str, Any]]:
-        """Get recent decisions from log"""
-        if not self.decision_log.exists():
-            return []
-
-        decisions = []
-        try:
-            with open(self.decision_log, 'r') as f:
-                for line in f:
-                    if line.strip():
-                        try:
-                            decisions.append(json.loads(line))
-                        except:
-                            pass
-        except:
-            pass
-
-        return decisions[-count:]
-
-    def _get_pending_approvals(self) -> List[Dict[str, Any]]:
-        """Get pending approvals from queue"""
-        if not self.approval_queue.exists():
-            return []
-
-        try:
-            with open(self.approval_queue, 'r') as f:
-                data = json.load(f)
-                # Queue is a JSON array, not an object with "pending" key
-                if isinstance(data, list):
-                    return data
-                return data.get("pending", [])
-        except:
-            return []
-
-    def _get_action_from_queue(self, action_index: int) -> Optional[Dict[str, Any]]:
-        """Get a specific action from the queue by index"""
-        pending = self._get_pending_approvals()
-        if 0 <= action_index < len(pending):
-            return pending[action_index]
-        return None
-
-    def _get_recent_logs(self, count: int = 10) -> List[str]:
-        """Get recent orchestrator log lines"""
-        if not self.orchestrator_log.exists():
-            return []
-
-        try:
-            with open(self.orchestrator_log, 'r') as f:
-                lines = f.readlines()
-                return [line.strip() for line in lines[-count:] if line.strip()]
-        except:
-            return []
-
+# Import the unified implementation
+from chat import ask_main

+# Entry point
 if __name__ == "__main__":
-    import sys
-    import argparse
-
-    parser = argparse.ArgumentParser(description="Ask Macha a question or discuss an action")
-    parser.add_argument("--discuss", type=int, metavar="ACTION_ID", help="Discuss a specific queued action")
-    parser.add_argument("--follow-up", type=str, metavar="QUESTION", help="Follow-up question about the action")
-    parser.add_argument("question", nargs="*", help="Your question for Macha")
-    parser.add_argument("--no-context", action="store_true", help="Don't include system context")
-
-    args = parser.parse_args()
-
-    # Load config if available
-    config_file = Path("/etc/macha-autonomous/config.json")
-    ollama_host = "http://localhost:11434"
-    model = "gpt-oss:latest"
-
-    if config_file.exists():
-        try:
-            with open(config_file, 'r') as f:
-                config = json.load(f)
-                ollama_host = config.get("ollama_host", ollama_host)
-                model = config.get("model", model)
-        except:
-            pass
-
-    conversation = MachaConversation(
-        ollama_host=ollama_host,
-        model=model
-    )
-
-    if args.discuss is not None:
-        if args.follow_up:
-            # Follow-up question about a specific action
-            action = conversation._get_action_from_queue(args.discuss)
-            if not action:
-                print(f"No action found at queue position {args.discuss}. Use 'macha-approve list' to see available actions.")
-                sys.exit(1)
-
-            # Build context with the action details
-            action_context = f"""
-QUEUED ACTION #{args.discuss}:
-Diagnosis: {action.get('proposal', {}).get('diagnosis', 'N/A')}
-Proposed Action: {action.get('proposal', {}).get('proposed_action', 'N/A')}
-Action Type: {action.get('proposal', {}).get('action_type', 'N/A')}
-Risk Level: {action.get('proposal', {}).get('risk_level', 'N/A')}
-Commands: {json.dumps(action.get('proposal', {}).get('commands', []), indent=2)}
-Reasoning: {action.get('proposal', {}).get('reasoning', 'N/A')}
-
-FOLLOW-UP QUESTION:
-{args.follow_up}
-"""
-
-            # Query the AI with the action context
-            response = conversation._query_ollama(f"""{MachaAgent.SYSTEM_PROMPT}
-
-TASK: ANSWER FOLLOW-UP QUESTION ABOUT QUEUED ACTION
-================================================================================
-
-You are answering a follow-up question about a proposed fix that is awaiting approval.
-Be helpful and answer directly. If the user is concerned about risks, explain them clearly.
-If they ask about alternatives, suggest them.
-
-{action_context}
-
-RESPOND CONCISELY AND DIRECTLY.
-""")
-
-        else:
-            # Initial discussion about the action
-            response = conversation.discuss_action(args.discuss)
-    elif args.question:
-        # Ask a general question
-        question = " ".join(args.question)
-        response = conversation.ask(question, include_context=not args.no_context)
-    else:
-        parser.print_help()
-        sys.exit(1)
-
-    # Only print formatted output for initial discussion, not for follow-ups
-    if args.follow_up:
-        print(response)
-    else:
-        print("\n" + "="*60)
-        print("MACHA:")
-        print("="*60)
-        print(response)
-        print("="*60 + "\n")
-
+    ask_main()
diff --git a/module.nix b/module.nix
index 37c019c..716c9f2 100644
--- a/module.nix
+++ b/module.nix
@@ -507,7 +507,7 @@ print('='*60)
 "
     '')

-    # Tool to ask Macha questions
+    # Tool to ask Macha questions (unified with macha-chat, uses ask_main entry point)
    (pkgs.writeScriptBin "macha-ask" ''
      #!${pkgs.bash}/bin/bash
      if [ $# -eq 0 ]; then
@@ -515,7 +515,8 @@ print('='*60)
        echo "Example: macha-ask Why did you recommend restarting that service?"
        exit 1
      fi
-      sudo -u ${cfg.user} ${pkgs.coreutils}/bin/env CHROMA_ENV_FILE="" ANONYMIZED_TELEMETRY="False" ${pythonEnv}/bin/python3 ${./.}/conversation.py "$@"
+      # Run as macha user with ask_main entry point from chat.py
+      sudo -u ${cfg.user} ${pkgs.coreutils}/bin/env PYTHONPATH=${toString ./.} CHROMA_ENV_FILE="" ANONYMIZED_TELEMETRY="False" ${pythonEnv}/bin/python3 -c "from chat import ask_main; ask_main()" "$@"
    '')

    # Issue tracking CLI