Macha is now a standalone NixOS flake that can be imported into other systems.

This provides:
- Independent versioning
- Easier reusability
- Cleaner separation of concerns
- Better development workflow

Includes:
- Complete autonomous system code
- NixOS module with full configuration options
- Queue-based architecture with priority system
- Chunked map-reduce for large outputs
- ChromaDB knowledge base
- Tool calling system
- Multi-host SSH management
- Gotify notification integration

All capabilities from DESIGN.md are preserved.
220 lines
7.2 KiB
Python
220 lines
7.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Issue Tracker - Internal ticketing system for tracking problems and their resolution
|
|
"""
|
|
|
|
import json
|
|
import uuid
|
|
from datetime import datetime
|
|
from typing import Dict, List, Any, Optional
|
|
from pathlib import Path
|
|
|
|
|
|
class IssueTracker:
    """Manages issue lifecycle: detection -> investigation -> resolution.

    Active issues are stored through ``context_db``, which must provide
    ``store_issue(issue)``, ``get_issue(issue_id)``, ``update_issue(issue)``,
    ``delete_issue(issue_id)`` and ``list_issues(hostname=, status=,
    severity=)``.  Closed issues are appended, one JSON object per line, to
    ``closed_issues.jsonl`` inside ``log_dir``.
    """

    def __init__(self, context_db, log_dir: str = "/var/lib/macha/logs"):
        """Bind the tracker to a storage backend and an archive directory.

        Args:
            context_db: Backend used for every read/write of active issues.
            log_dir: Directory for the closed-issue archive; created
                (including parents) if it does not exist.
        """
        self.context_db = context_db
        self.log_dir = Path(log_dir)
        self.log_dir.mkdir(parents=True, exist_ok=True)
        self.closed_log = self.log_dir / "closed_issues.jsonl"

    @staticmethod
    def _now_iso() -> str:
        """Return the current UTC time as a naive ISO-8601 string."""
        # Deliberately naive (no "+00:00" suffix) so every timestamp this
        # class writes keeps one format and stays comparable in
        # _calculate_age.
        return datetime.utcnow().isoformat()

    def create_issue(
        self,
        hostname: str,
        title: str,
        description: str,
        severity: str = "medium",
        source: str = "auto-detected"
    ) -> str:
        """Create a new open issue, store it, and return its ID.

        Args:
            hostname: Host the issue belongs to.
            title: Short summary; also used for similarity matching.
            description: Longer explanation of the problem.
            severity: Severity label stored on the record.
            source: Label describing where the issue came from.

        Returns:
            The generated UUID4 string identifying the issue.
        """
        issue_id = str(uuid.uuid4())
        now = self._now_iso()

        issue = {
            "issue_id": issue_id,
            "hostname": hostname,
            "title": title,
            "description": description,
            "status": "open",
            "severity": severity,
            "created_at": now,
            "updated_at": now,
            "source": source,
            "investigations": [],
            "actions": [],
            "resolution": None,
        }

        self.context_db.store_issue(issue)
        return issue_id

    def get_issue(self, issue_id: str) -> Optional[Dict[str, Any]]:
        """Retrieve an issue by ID (whatever ``context_db.get_issue`` returns)."""
        return self.context_db.get_issue(issue_id)

    def update_issue(
        self,
        issue_id: str,
        status: Optional[str] = None,
        investigation: Optional[Dict[str, Any]] = None,
        action: Optional[Dict[str, Any]] = None
    ) -> bool:
        """Update an issue with a new status, investigation and/or action.

        Falsy arguments (``None``, ``""``, ``{}``) are ignored.  Investigation
        and action entries are timestamped on a copy, so the dicts passed in
        by the caller are left untouched.

        Returns:
            ``True`` if the issue exists and was written back, ``False`` if
            no issue with ``issue_id`` was found.
        """
        issue = self.get_issue(issue_id)
        if not issue:
            return False

        if status:
            issue["status"] = status

        if investigation:
            entry = dict(investigation)
            entry["timestamp"] = self._now_iso()
            # setdefault tolerates stored records that lack the list.
            issue.setdefault("investigations", []).append(entry)

        if action:
            entry = dict(action)
            entry["timestamp"] = self._now_iso()
            issue.setdefault("actions", []).append(entry)

        issue["updated_at"] = self._now_iso()

        self.context_db.update_issue(issue)
        return True

    def find_similar_issue(
        self,
        hostname: str,
        title: str,
        description: Optional[str] = None
    ) -> Optional[Dict[str, Any]]:
        """Find an existing open issue on ``hostname`` with a similar title.

        Similarity is a case-insensitive word-overlap check: an issue matches
        when more than 50% of the words in ``title`` also occur in the
        issue's title.  ``description`` is accepted for interface
        compatibility but is not used by the comparison.

        Returns:
            The first matching issue, or ``None`` if nothing matches.
        """
        # Loop-invariant: computed once rather than per candidate issue.
        title_words = set(title.lower().split())
        word_count = max(len(title_words), 1)  # avoid division by zero

        for issue in self.list_issues(hostname=hostname, status="open"):
            issue_words = set((issue.get("title") or "").lower().split())
            if len(title_words & issue_words) / word_count > 0.5:
                return issue

        return None

    def list_issues(
        self,
        hostname: Optional[str] = None,
        status: Optional[str] = None,
        severity: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """List issues, forwarding the optional filters to ``context_db``."""
        return self.context_db.list_issues(
            hostname=hostname,
            status=status,
            severity=severity
        )

    def resolve_issue(self, issue_id: str, resolution: str) -> bool:
        """Mark an issue as resolved with a resolution note.

        Returns:
            ``True`` if the issue was found and updated, ``False`` otherwise.
        """
        issue = self.get_issue(issue_id)
        if not issue:
            return False

        issue["status"] = "resolved"
        issue["resolution"] = resolution
        issue["updated_at"] = self._now_iso()

        self.context_db.update_issue(issue)
        return True

    def close_issue(self, issue_id: str) -> bool:
        """Archive a resolved issue to the closed log and drop it from the DB.

        Only issues whose status is ``"resolved"`` can be closed.  The issue
        is removed from the active database only after the archive write
        succeeded, so a failed write never loses the record.

        Returns:
            ``True`` if the issue was archived and deleted; ``False`` if it
            does not exist, is not resolved, or could not be archived.
        """
        issue = self.get_issue(issue_id)
        if not issue:
            return False

        # Can only close resolved issues
        if issue.get("status") != "resolved":
            return False

        # Work on a copy so a failed archive leaves the active record as-is.
        archived = dict(issue)
        archived["status"] = "closed"
        archived["closed_at"] = self._now_iso()

        if not self._archive_issue(archived):
            return False

        # Remove from active database
        self.context_db.delete_issue(issue_id)
        return True

    def get_issue_history(self, issue_id: str) -> Dict[str, Any]:
        """Get the full history for an issue (investigations + actions).

        Returns:
            A dict with the issue itself, the number of investigations and
            actions, its age in hours and the last-update timestamp; an empty
            dict if the issue does not exist.
        """
        issue = self.get_issue(issue_id)
        if not issue:
            return {}

        return {
            "issue": issue,
            "investigation_count": len(issue.get("investigations", [])),
            "action_count": len(issue.get("actions", [])),
            "age_hours": self._calculate_age(issue.get("created_at")),
            "last_activity": issue.get("updated_at"),
        }

    def auto_resolve_if_fixed(self, hostname: str, detected_problems: List[str]) -> int:
        """Auto-resolve open issues whose problems are no longer detected.

        An issue counts as still present when any whitespace-separated word
        of its title or description occurs (case-insensitively, as a
        substring) in any entry of ``detected_problems``.  Every other open
        issue on ``hostname`` is resolved with an explanatory note.

        Returns:
            Count of issues that were actually resolved.
        """
        # Convert detected problems to lowercase for comparison
        detected_lower = [p.lower() for p in detected_problems]
        resolved_count = 0

        for issue in self.list_issues(hostname=hostname, status="open"):
            words = (issue.get("title") or "").lower().split()
            words += (issue.get("description") or "").lower().split()

            still_present = any(
                word in detected
                for detected in detected_lower
                for word in words
            )
            if still_present:
                continue

            # Count only when the resolution was really recorded.
            if self.resolve_issue(
                issue["issue_id"],
                "Auto-resolved: Problem no longer detected in system monitoring"
            ):
                resolved_count += 1

        return resolved_count

    def _archive_issue(self, issue: Dict[str, Any]) -> bool:
        """Append a closed issue to the archive log as one JSON line.

        Returns:
            ``True`` if the line was written, ``False`` if serialization or
            the file write failed (the failure is printed, not raised).
        """
        try:
            line = json.dumps(issue) + "\n"
            with open(self.closed_log, "a", encoding="utf-8") as f:
                f.write(line)
        except (OSError, TypeError, ValueError) as e:
            # Best-effort: report and let the caller decide what to do.
            print(f"Failed to archive issue {issue.get('issue_id')}: {e}")
            return False
        return True

    def _calculate_age(self, created_at: str) -> float:
        """Calculate the age of an issue in hours.

        Args:
            created_at: ISO-8601 timestamp.  Naive values are taken to be
                UTC; values carrying an offset are converted to UTC first.

        Returns:
            Hours elapsed since ``created_at``, or ``0.0`` when the value is
            missing or cannot be parsed.
        """
        try:
            created = datetime.fromisoformat(created_at)
        except (TypeError, ValueError):
            return 0.0

        offset = created.utcoffset()
        if offset is not None:
            # Normalize aware timestamps to naive UTC so the subtraction
            # below never mixes naive and aware datetimes.
            created = (created - offset).replace(tzinfo=None)

        return (datetime.utcnow() - created).total_seconds() / 3600
|
|
|