#!/usr/bin/env python3
import subprocess
import json
import time
import os
import base64
import hashlib
import mimetypes
import platform
import re
import jwt
import secrets
from datetime import datetime, timedelta, timezone
import threading
import socket
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse, unquote

# Optional: Load environment variables from .env file if python-dotenv is available
try:
    from dotenv import load_dotenv
    load_dotenv()  # Load environment variables from .env file if it exists
    print("Successfully loaded .env file")
except ImportError:
    print("python-dotenv not installed. Using environment variables directly.")

# Initialize mime types
mimetypes.init()

# --- Configuration ---
# Load JWT secret from environment variable or use a secure fallback.
# Generate one using: python -c "import secrets; print(secrets.token_hex(32))"
JWT_SECRET_KEY = os.environ.get("JWT_SECRET", secrets.token_hex(32))
JWT_ALGORITHM = "HS256"
JWT_EXPIRATION_HOURS = int(os.environ.get("JWT_EXPIRATION_HOURS", "8"))
SERVER_PORT = int(os.environ.get("SERVER_PORT", "8000"))
SNMP_COLLECTION_INTERVAL = int(os.environ.get("SNMP_COLLECTION_INTERVAL", "10"))

# Authentication Configuration
default_password = secrets.token_urlsafe(16)  # Generate a secure random password as fallback
AUTH_CONFIG = {
    'password': os.environ.get('AUTH_PASSWORD', default_password),
    'password_hash': None  # Will be set on startup
}

# SNMP Configuration
SNMP_HOST = os.environ.get("SNMP_HOST", "127.0.0.1")
SNMP_COMMUNITY = os.environ.get("SNMP_COMMUNITY", "public")
SNMP_VERSION = os.environ.get("SNMP_VERSION", "2c")

# Data storage - in memory for simplicity
metrics_data = {}
DATA_POINTS_TO_KEEP = 60
data_lock = threading.Lock()

# --- New Data Structures for App Performance ---
request_log = []  # Stores tuples of (timestamp, duration_ms, status_code)
request_log_lock = threading.Lock()
app_perf_data = {}  # Stores calculated app performance metrics
APP_PERF_POINTS_TO_KEEP = 60
# --- End New Data Structures ---

# Define metrics to collect
METRICS = [
    {"name": "network_in", "oid": "ifInOctets.1", "label": "Network In (bytes)"},
    {"name": "network_out", "oid": "ifOutOctets.1", "label": "Network Out (bytes)"},
    # {"name": "system_uptime", "oid": "sysUpTime.0", "label": "System Uptime"},
    {"name": "system_processes", "oid": "hrSystemProcesses.0", "label": "System Processes"},
    # --- Placeholders for New Metrics ---
    # App Performance - calculated, not SNMP
    {"name": "app_response_time_avg", "label": "Avg Response Time (ms)", "type": "calculated"},
    {"name": "app_error_rate", "label": "Error Rate (%)", "type": "calculated"},
    {"name": "app_request_count", "label": "Request Count", "type": "calculated"},
    # Service Availability - calculated, not SNMP
    {"name": "service_http_status", "label": "HTTP Service Status", "type": "calculated"},
    {"name": "service_ssh_status", "label": "SSH Service Status", "type": "calculated"},
    # Add other services here if needed
    # --- End Placeholders ---
]

# Map of alternate OIDs
ALTERNATE_OIDS = {
    "cpu_load": [
        "laLoad.1",
        "ssCpuRawUser.0",
        "hrProcessorLoad.0",
        "systemStats.ssCpuUser.0"
    ],
    "memory_total": [
        "hrMemorySize.0",
        "memTotalReal.0",
        "hrStorageSize.1"
    ],
    "memory_used": [
        "memAvailReal.0",
        "hrStorageUsed.1",
        "hrSWRunPerfMem.1"
    ]
}
# --- End Configuration ---


# --- Utility Functions ---
def run_snmp_command(oid):
    """Run SNMP command and return the value."""
    try:
        # Format command according to OpenBSD snmp manual:
        # snmp get [options] agent oid ...
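        # Illustrative note: the parsing below assumes output roughly of the form
        # "<name> = <Type>: <value>", e.g. "ifInOctets.1 = Counter32: 1234567"
        # (the exact OID prefix depends on the snmp tool and loaded MIBs).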
        cmd = [
            "snmp", "get",
            "-v", SNMP_VERSION,
            "-c", SNMP_COMMUNITY,
            SNMP_HOST,
            oid
        ]

        # Print the command being executed for debugging
        print(f"Executing: {' '.join(cmd)}")

        # Don't use check=True so we can capture error output
        result = subprocess.run(cmd, capture_output=True, text=True)

        # Check for errors
        if result.returncode != 0:
            print(f"SNMP Error: {result.stderr.strip()}")
            # Try with doas if permissions might be an issue
            try:
                cmd_with_doas = ["doas"] + cmd
                print(f"Retrying with doas: {' '.join(cmd_with_doas)}")
                result = subprocess.run(cmd_with_doas, capture_output=True, text=True)
                if result.returncode != 0:
                    print(f"SNMP Error with doas: {result.stderr.strip()}")
                    return None
            except Exception as e:
                print(f"Failed to run with doas: {e}")
                return None

        # Parse the output to extract just the value
        output = result.stdout.strip()

        # Show output for debugging
        print(f"SNMP Output: {output}")

        # Check for "No Such Instance" or similar errors
        if "No Such Instance" in output or "No Such Object" in output:
            print(f"OID not available: {output}")
            return None

        # Proper parsing based on OpenBSD snmp output format
        if "=" in output:
            value_part = output.split("=", 1)[1].strip()

            # Special handling for Timeticks
            if "Timeticks:" in value_part:
                # Extract the numeric part from something like "Timeticks: (12345) 1:2:3.45"
                ticks_part = value_part.split("(", 1)[1].split(")", 1)[0]
                return float(ticks_part)
            elif ":" in value_part:
                value_part = value_part.split(":", 1)[1].strip()

            # Convert to a number if possible
            try:
                return float(value_part)
            except ValueError:
                # Only return string values that don't look like errors
                if "error" not in value_part.lower() and "no such" not in value_part.lower():
                    return value_part
                print(f"Error in SNMP value: {value_part}")
                return None
        else:
            print(f"Unexpected SNMP output format: {output}")
            return None

    except subprocess.SubprocessError as e:
        print(f"Error running SNMP command: {e}")
        if hasattr(e, 'stderr') and e.stderr:
            print(f"STDERR: {e.stderr}")
        return None
    except Exception as e:
        print(f"Unexpected error in run_snmp_command: {e}")
        return None


def discover_supported_oids():
    """Attempt to discover which OIDs are supported by the system."""
    print("\n----- Discovering supported OIDs (excluding memory total) -----")
    discovered_metrics = []

    # Try to find a working CPU metric
    print("Looking for CPU metrics...")
    for oid in ALTERNATE_OIDS["cpu_load"]:
        value = run_snmp_command(oid)
        if value is not None and isinstance(value, (int, float)) and value >= 0:
            print(f"Found working CPU OID: {oid}")
            discovered_metrics.append({
                "name": "cpu_load",
                "oid": oid,
                "label": "CPU Load/Usage"
            })
            break

    # Try to find working memory metrics (excluding total memory)
    print("Looking for memory metrics (excluding total)...")

    # First try to find total memory (but don't add it to the list)
    total_memory_oid_found = None
    for oid in ALTERNATE_OIDS["memory_total"]:
        value = run_snmp_command(oid)
        if value is not None and isinstance(value, (int, float)) and value > 0:
            print(f" (Found working memory total OID: {oid} - skipping addition)")
            # discovered_metrics.append({
            #     "name": "memory_total",
            #     "oid": oid,
            #     "label": "Total Memory"
            # })
            total_memory_oid_found = oid  # Still note if found for potential future use
            break

    # Then try to find used memory
    for oid in ALTERNATE_OIDS["memory_used"]:
        value = run_snmp_command(oid)
        if value is not None and isinstance(value, (int, float)) and value > 0:
            print(f"Found working memory used OID: {oid}")
            discovered_metrics.append({
                "name": "memory_used",
                "oid": oid,
                "label": "Memory Usage"
            })
            break
    # If we found total memory but not used memory, we could use the total as a placeholder.
    # We keep discovery, but won't add memory_size to METRICS by default anymore.
    # Also removing the fallback addition here as total memory is not desired.
    # if total_memory_oid_found and not any(m["name"] == "memory_used" for m in discovered_metrics):
    #     print(" (Skipping memory total as placeholder for memory usage)")
    #     # discovered_metrics.append({
    #     #     "name": "memory_size",
    #     #     "oid": total_memory_oid_found,
    #     #     "label": "Memory Size"
    #     # })

    return discovered_metrics


def collect_metrics():
    """Collect all configured metrics."""
    with data_lock:
        current_time = int(time.time() * 1000)  # JavaScript timestamp (milliseconds)

        for metric in METRICS:
            metric_name = metric["name"]
            oid = metric.get("oid")  # Use .get() as calculated metrics won't have an OID

            # Initialize the metric data if it doesn't exist
            if metric_name not in metrics_data:
                metrics_data[metric_name] = []

            # Skip SNMP collection for calculated metrics
            if metric.get("type") == "calculated":
                continue

            value = run_snmp_command(oid)

            if value is not None and (isinstance(value, (int, float)) or (isinstance(value, str) and value.strip() != "")):
                print(f"Successfully collected {metric_name}: {value}")

                # Add the new data point
                metrics_data[metric_name].append({
                    "timestamp": current_time,
                    "value": value
                })

                # Keep only the most recent data points
                if len(metrics_data[metric_name]) > DATA_POINTS_TO_KEEP:
                    metrics_data[metric_name] = metrics_data[metric_name][-DATA_POINTS_TO_KEEP:]
            else:
                print(f"Failed to collect {metric_name}")


# --- New Collection Functions (moved before metrics_collector) ---
def collect_application_performance():
    """Calculates app performance metrics from the request log."""
    current_time = int(time.time() * 1000)

    with request_log_lock:
        # Make a copy to avoid holding the lock during calculations
        log_copy = list(request_log)
        # Clear the original log for the next interval
        request_log.clear()

    if not log_copy:
        # No requests logged in this interval
        avg_response_time = 0
        error_rate = 0
        request_count = 0
    else:
        total_duration = sum(log[1] for log in log_copy)
        total_requests = len(log_copy)
        error_count = sum(1 for log in log_copy if log[2] >= 400)

        avg_response_time = total_duration / total_requests if total_requests > 0 else 0
        error_rate = (error_count / total_requests) * 100 if total_requests > 0 else 0
        request_count = total_requests

    print(f"Collected App Perf: Avg Response={avg_response_time:.2f}ms, Error Rate={error_rate:.2f}%, Count={request_count}")

    # Store the calculated metrics
    with data_lock:
        for name, value in [
            ("app_response_time_avg", avg_response_time),
            ("app_error_rate", error_rate),
            ("app_request_count", request_count)
        ]:
            if name not in app_perf_data:
                # Use app_perf_data for calculated app metrics
                app_perf_data[name] = []
            app_perf_data[name].append({"timestamp": current_time, "value": value})
            if len(app_perf_data[name]) > APP_PERF_POINTS_TO_KEEP:
                app_perf_data[name] = app_perf_data[name][-APP_PERF_POINTS_TO_KEEP:]


def check_service_status(host, port, timeout=1):
    """Checks if a TCP service is available on a host and port."""
    try:
        # Use socket.create_connection for simplicity and IPv6 compatibility
        with socket.create_connection((host, port), timeout=timeout):
            return 1  # Service is up
    except socket.timeout:
        print(f"Service check timeout for {host}:{port}")
        return 0  # Timeout means service is not readily available
    except ConnectionRefusedError:
        # Explicitly handle connection refused
        return 0  # Service is actively refusing connection
    except OSError as e:
        # Catch other potential OS errors like network unreachable
        print(f"Service check OS error for {host}:{port}: {e}")
        return 0
    except Exception as e:
        # Catch any other unexpected errors
        print(f"Unexpected error checking service {host}:{port}: {e}")
        return 0  # Treat other errors as down


def collect_service_availability():
    """Collects availability status for configured services."""
    current_time = int(time.time() * 1000)

    services_to_check = [
        # Check the web server itself
        {"name": "service_http_status", "host": "127.0.0.1", "port": SERVER_PORT},
        # Check SSH
        {"name": "service_ssh_status", "host": "127.0.0.1", "port": 22},
        # Add other services like database here:
        # {"name": "service_db_status", "host": "db_host", "port": db_port},
    ]

    print("Collecting Service Availability...")
    with data_lock:  # Assuming the lock is sufficient for metrics_data writes
        for service in services_to_check:
            metric_name = service["name"]
            host = service["host"]
            port = service["port"]

            status = check_service_status(host, port)
            print(f"  {metric_name} ({host}:{port}): {'Up' if status == 1 else 'Down'}")

            if metric_name not in metrics_data:
                # Store directly in metrics_data
                metrics_data[metric_name] = []

            # Ensure data structure matches other metrics
            metrics_data[metric_name].append({"timestamp": current_time, "value": status})

            # Keep only the most recent data points
            if len(metrics_data[metric_name]) > DATA_POINTS_TO_KEEP:
                metrics_data[metric_name] = metrics_data[metric_name][-DATA_POINTS_TO_KEEP:]
# --- End New Collection Functions ---


def metrics_collector():
    """Background thread to collect metrics periodically."""
    while True:
        try:
            print("\n--- Collecting metrics ---")
            # Collect SNMP metrics
            collect_metrics()
            # Collect Service Availability
            collect_service_availability()
            # Collect Application Performance
            collect_application_performance()  # Process logs collected by web server
        except Exception as e:
            print(f"Error in metrics collector loop: {e}")

        time.sleep(SNMP_COLLECTION_INTERVAL)


def verify_password(password):
    """Verify the password against the stored hash (no username needed)."""
    hashed_password = hashlib.sha256(password.encode()).hexdigest()
    # Compare the provided password hash with the configured hash
    return secrets.compare_digest(hashed_password, AUTH_CONFIG['password_hash'])


def get_server_stats():
    """Generate server stats, including basic service checks."""
    stats = {}
    try:
        # --- Get Uptime ---
        if platform.system() == "Windows":
            # Windows uptime via PowerShell (in seconds)
            uptime_cmd = ["powershell", "-Command",
                          "Get-CimInstance Win32_OperatingSystem | Select-Object LastBootUpTime,LocalDateTime | ForEach-Object {(New-TimeSpan -Start $_.LastBootUpTime -End $_.LocalDateTime).TotalSeconds}"]
            uptime_sec = int(float(subprocess.check_output(uptime_cmd).decode().strip()))
            days = uptime_sec // 86400
            hours = (uptime_sec % 86400) // 3600
            minutes = (uptime_sec % 3600) // 60
            uptime_text = f"{days} days, {hours} hours, {minutes} minutes"
        else:
            # Linux/Unix uptime
            uptime_output = subprocess.check_output(["uptime"]).decode()

            # Extract the uptime text
            uptime_match = re.search(r'up\s+(.*?),\s+\d+\s+user', uptime_output)
            uptime_text = uptime_match.group(1) if uptime_match else "Unknown"

            # Extract load averages
            load_match = re.search(r'load average[s]?:\s+([0-9.]+),\s+([0-9.]+),\s+([0-9.]+)', uptime_output)
            if load_match:
                stats['load_1min'] = load_match.group(1)
                stats['load_5min'] = load_match.group(2)
                stats['load_15min'] = load_match.group(3)
            else:
                stats['load_1min'] = "0"
                stats['load_5min'] = "0"
                stats['load_15min'] = "0"

        # Set uptime
        stats['uptime'] = uptime_text
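        # Illustrative note: the regexes above assume typical BSD/Linux `uptime` output, e.g.
        #   "10:15AM  up 3 days, 4:32, 2 users, load averages: 0.15, 0.20, 0.18";
        # other formats fall back to "Unknown" uptime and zeroed load averages.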
        # Get CPU cores
        if platform.system() == "Windows":
            # Windows core count
            cores_cmd = ["powershell", "-Command",
                         "Get-CimInstance Win32_ComputerSystem | Select-Object NumberOfLogicalProcessors"]
            cores = int(subprocess.check_output(cores_cmd).decode().strip().split("\r\n")[-1])
        else:
            # Linux/Unix core count
            try:
                cores = int(subprocess.check_output(["grep", "-c", "processor", "/proc/cpuinfo"]).decode().strip())
            except Exception:
                # Fallback for systems without /proc/cpuinfo
                try:
                    cores = int(subprocess.check_output(["sysctl", "-n", "hw.ncpu"]).decode().strip())
                except Exception:
                    cores = 1  # Default if we can't determine

        stats['cores'] = str(cores)

        # --- Check Service Availability ---
        print("Checking essential services for server stats...")

        # Check HTTP service (self)
        stats['service_http_status'] = check_service_status("127.0.0.1", SERVER_PORT, timeout=0.5)
        print(f"  HTTP check (port {SERVER_PORT}): {'Up' if stats['service_http_status'] == 1 else 'Down'}")

        # Check SSH service
        stats['service_ssh_status'] = check_service_status("127.0.0.1", 22, timeout=0.5)
        print(f"  SSH check (port 22): {'Up' if stats['service_ssh_status'] == 1 else 'Down'}")
        # --- End Service Availability Check ---

        stats['timestamp'] = int(time.time())
        return stats

    except Exception as e:
        print(f"Error generating server stats: {e}")
        # Return default values on error, including default service status
        return {
            'uptime': 'Unknown',
            'load_1min': '0',
            'load_5min': '0',
            'load_15min': '0',
            'cores': '1',
            'service_http_status': 0,  # Default to down on error
            'service_ssh_status': 0,   # Default to down on error
            'timestamp': int(time.time())
        }


# --- HTTP Request Handler with JWT ---
class WebServer(BaseHTTPRequestHandler):
    """HTTP request handler for the combined web server with JWT Auth."""

    # Base directory for serving files
    base_path = os.path.dirname(os.path.abspath(__file__))
    static_path = os.path.join(base_path, "static")
    templates_path = os.path.join(base_path, "templates")

    def log_message(self, format, *args):
        client_ip = self.client_address[0]
        print(f"[{client_ip}] {format % args}")

    def send_response_with_headers(self, status_code, content_type, extra_headers=None):
        # Calls send_response (overridden below) so the status code is captured for request logging
        self.send_response(status_code)
        self.send_header("Content-Type", content_type)
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Cache-Control", "no-cache, no-store, must-revalidate")
        self.send_header("Pragma", "no-cache")
        self.send_header("Expires", "0")
        if extra_headers:
            for header, value in extra_headers.items():
                self.send_header(header, value)
        self.end_headers()

    def send_error_json(self, status_code, message):
        self.log_message(f"Sending error {status_code}: {message}")
        self.send_response(status_code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Cache-Control", "no-cache, no-store, must-revalidate")
        self.send_header("Pragma", "no-cache")
        self.send_header("Expires", "0")
        self.end_headers()
        self.wfile.write(json.dumps({"error": message}).encode())

    def serve_static_file(self, requested_path):
        """Serve a static file from the static directory."""
        try:
            # Prevent directory traversal; ensure the path starts with /static/
            if not requested_path.startswith('/static/') or '..' in requested_path:
                self.send_error_json(403, "Access denied to static resource")
                return

            # Construct absolute path relative to the base directory
            relative_path = requested_path.lstrip('/')  # Remove leading slash
            file_path = os.path.abspath(os.path.join(self.base_path, relative_path))

            # Security check: ensure the final path is within the static directory
            if not file_path.startswith(self.static_path):
                self.log_message(f"Attempted access outside static dir: {file_path}")
                self.send_error_json(403, "Access denied")
                return

            if not os.path.exists(file_path) or not os.path.isfile(file_path):
                self.send_error_json(404, f"Static file not found: {requested_path}")
                return

            # Guess MIME type
            content_type, _ = mimetypes.guess_type(file_path)
            if not content_type:
                content_type = 'application/octet-stream'  # Default binary type

            self.log_message(f"Serving static file: {file_path} ({content_type})")

            with open(file_path, 'rb') as file:
                fs = os.fstat(file.fileno())
                self.send_response_with_headers(200, content_type, {"Content-Length": str(fs.st_size)})
                self.wfile.write(file.read())

        except Exception as e:
            self.log_message(f"Error serving static file {requested_path}: {e}")
            self.send_error_json(500, "Internal server error serving file")

    def serve_template(self, template_name):
        """Serve an HTML template from the templates directory."""
        try:
            file_path = os.path.abspath(os.path.join(self.templates_path, template_name))

            # Security check
            if not file_path.startswith(self.templates_path) or '..' in template_name:
                self.send_error_json(403, "Access denied to template")
                return

            if not os.path.exists(file_path) or not os.path.isfile(file_path):
                self.send_error_json(404, f"Template not found: {template_name}")
                return

            self.log_message(f"Serving template: {file_path}")

            with open(file_path, 'rb') as file:
                fs = os.fstat(file.fileno())
                self.send_response_with_headers(200, "text/html", {"Content-Length": str(fs.st_size)})
                self.wfile.write(file.read())

        except Exception as e:
            self.log_message(f"Error serving template {template_name}: {e}")
            self.send_error_json(500, "Internal server error serving template")

    def verify_jwt(self):
        """Verify JWT from Authorization header. Returns payload or None."""
        auth_header = self.headers.get('Authorization')
        if not auth_header or not auth_header.startswith('Bearer '):
            return None

        token = auth_header.split(' ')[1]
        try:
            payload = jwt.decode(token, JWT_SECRET_KEY, algorithms=[JWT_ALGORITHM])
            # Optional: check 'sub' or other claims if needed
            self.log_message(f"JWT verified for sub: {payload.get('sub')}")
            return payload
        except jwt.ExpiredSignatureError:
            self.log_message("JWT verification failed: ExpiredSignatureError")
            return None  # Token structure was valid but it has expired
        except jwt.InvalidTokenError as e:
            self.log_message(f"JWT verification failed: InvalidTokenError ({e})")
            return None  # Invalid token
        except Exception as e:
            self.log_message(f"JWT verification failed: Unexpected error ({e})")
            return None

    def require_auth(self):
        """Decorator-like method to check auth before proceeding.
        Returns True if authorized."""
        payload = self.verify_jwt()
        if payload:
            return True
        else:
            self.send_error_json(401, "Authentication required or token expired")
            return False

    def do_OPTIONS(self):
        self.send_response(204)  # No Content for OPTIONS
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type, Authorization")
        self.end_headers()

    def do_POST(self):
        path = urlparse(self.path).path

        if path == "/api/login":
            content_length = int(self.headers.get('Content-Length', 0))
            post_data = self.rfile.read(content_length).decode('utf-8')

            try:
                login_data = json.loads(post_data)
                password = login_data.get('password', '')  # Only the password is needed

                if verify_password(password):
                    # Create JWT payload (no 'sub' needed, or set to a generic value)
                    expiration = datetime.now(timezone.utc) + timedelta(hours=JWT_EXPIRATION_HOURS)
                    payload = {
                        # 'sub': 'authenticated_user',  # Optional generic subject
                        'iat': datetime.now(timezone.utc),
                        'exp': expiration
                    }
                    token = jwt.encode(payload, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM)

                    self.log_message("Login successful, token issued.")
                    self.send_response_with_headers(200, "application/json")
                    self.wfile.write(json.dumps({"token": token}).encode())
                else:
                    self.log_message("Login failed: Invalid password")
                    self.send_error_json(401, "Invalid credentials")

            except json.JSONDecodeError:
                self.send_error_json(400, "Invalid JSON")
            except Exception as e:
                self.log_message(f"Error during login: {e}")
                self.send_error_json(500, "Server error during login")
        else:
            self.send_error_json(404, "Not found")

    def do_GET(self):
        url_parts = urlparse(self.path)
        path = unquote(url_parts.path)

        # --- API Endpoints ---
        if path == "/api/metrics":
            if self.require_auth():
                self.send_response_with_headers(200, "application/json")
                with data_lock:
                    # Combine SNMP metrics and calculated app performance metrics
                    combined_metrics = metrics_data.copy()
                    combined_metrics.update(app_perf_data)  # Merge app perf data
                    response = {
                        "metrics": combined_metrics,
                        # Definitions include the calculated metrics' labels
                        "definitions": {
                            m["name"]: {"label": m["label"]} for m in METRICS  # Use the main METRICS list for definitions
                        }
                    }
                self.wfile.write(json.dumps(response).encode())

        elif path.startswith("/api/metric/"):
            if self.require_auth():
                metric_name = path.split("/")[-1]

                # Check both metrics_data and app_perf_data
                data_to_send = None
                with data_lock:  # data_lock covers app_perf_data updates in the collector as well
                    if metric_name in metrics_data:
                        data_to_send = metrics_data[metric_name]
                    elif metric_name in app_perf_data:
                        data_to_send = app_perf_data[metric_name]

                if data_to_send:
                    self.send_response_with_headers(200, "application/json")
                    self.wfile.write(json.dumps(data_to_send).encode())
                else:
                    self.send_error_json(404, f"Metric '{metric_name}' not found")  # Use send_error_json for consistency

        elif path == "/stat/.server-stats.json":
            # Server stats endpoint does not require authentication
            stats = get_server_stats()
            self.send_response_with_headers(200, "application/json")
            self.wfile.write(json.dumps(stats, indent=4).encode())

        # --- Static Files & Templates ---
        elif path == "/":
            self.serve_template("index.html")
        elif path.startswith("/static/"):
            self.serve_static_file(path)
        elif path == "/favicon.ico":
            # Favicon served from the static dir
            self.serve_static_file("/static/favicon.ico")
        else:
            # Serve index.html for any other path (for single-page app behavior),
            # or send a 404 here if you prefer strict path matching.
            self.serve_template("index.html")
            # Alternatively: self.send_error_json(404, "Not found")

    # --- Request logging for application performance metrics ---
    def handle_one_request(self):
        """Handle a single HTTP request and log performance."""
        start_time = time.monotonic()
        # Store the status code on the instance, as super().handle_one_request() doesn't return it easily
        self._current_status_code = 200  # Default to 200 in case no response is sent
        try:
            # Call the original handler
            super().handle_one_request()
        except Exception as e:
            # Log exceptions and set status to 500
            self.log_message(f"Error handling request: {e}")
            self._current_status_code = 500
            # Don't re-raise so we can still capture metrics
        finally:
            # Calculate duration *after* the request is handled
            end_time = time.monotonic()
            duration_ms = (end_time - start_time) * 1000

            # Use the captured status code, or 500 if something went wrong before send_response was called
            status_code = getattr(self, '_current_status_code', 500)

            # Log the request details (timestamp, duration, status)
            with request_log_lock:
                request_log.append((int(time.time() * 1000), duration_ms, status_code))
                # Limit request_log size if it grows too large between collections.
                # Trim in place so the module-level list object is preserved.
                MAX_LOG_ENTRIES = 1000
                if len(request_log) > MAX_LOG_ENTRIES:
                    del request_log[:-MAX_LOG_ENTRIES]

            # Log the request completion for debugging.
            # Ensure requestline is available; use a default if not.
            requestline = getattr(self, 'requestline', 'Unknown Request')
            self.log_message(f'"{requestline}" {status_code} - {duration_ms:.2f}ms')

    def send_response(self, code, message=None):
        """Override send_response to capture the status code."""
        self.log_message(f"Setting status code: {code}")  # Debug log
        self._current_status_code = code  # Store the status code
        super().send_response(code, message)
    # --- End request logging methods ---


# --- Server Startup ---
def get_ip_address():
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.connect(('10.255.255.255', 1))
        ip = s.getsockname()[0]
    except Exception:
        ip = '127.0.0.1'
    finally:
        s.close()
    return ip


def start_server(port=SERVER_PORT):
    try:
        # Update the password hash at startup based on the configured password
        AUTH_CONFIG['password_hash'] = hashlib.sha256(AUTH_CONFIG['password'].encode()).hexdigest()

        # Print startup information (mask sensitive info)
        print("\nStarting server with the following configuration:")
        print(f"Server Port: {port}")
        if AUTH_CONFIG['password'] == default_password:
            print(f"WARNING: Using auto-generated password: {AUTH_CONFIG['password']}")
            print("You should set AUTH_PASSWORD in the environment or .env file")
        else:
            print("Using configured password (from environment)")

        # Print a hint of the JWT secret but never the full value
        jwt_hint = JWT_SECRET_KEY[:4] + "..." + JWT_SECRET_KEY[-4:] if len(JWT_SECRET_KEY) > 8 else "****"
        print(f"JWT Secret configured: {jwt_hint}")

        server = HTTPServer(("0.0.0.0", port), WebServer)
        ip = get_ip_address()
        print(f"\nServer is running at: http://{ip}:{port}/")
        print("Press Ctrl+C to stop the server")
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nShutting down server...")
        server.socket.close()
    except ImportError:
        print("\nERROR: PyJWT library not found.")
        print("Please install it using: pip install PyJWT cryptography")
    except OSError as e:
        if e.errno == 98:  # Address already in use
            print(f"\nERROR: Port {port} is already in use.")
            print("Please stop the other process or choose a different port.")
        else:
            print(f"\nError starting server: {e}")
    except Exception as e:
        print(f"\nError starting server: {e}")


if __name__ == "__main__":
    print("\n=== Comprehensive Web and Metrics Server with JWT ===")
    print("Starting with the following configuration:")
    print(f"SNMP Host: {SNMP_HOST}")
    print(f"SNMP Community: {SNMP_COMMUNITY}")
    print(f"SNMP Version: {SNMP_VERSION}")
    # Display only the metrics configured for collection/calculation initially
    print(f"Configured Metrics: {[m['name'] + (' (SNMP)' if 'oid' in m and m.get('type') != 'calculated' else ' (calculated)') for m in METRICS]}")

    print("\nAttempting to discover additional SNMP metrics (won't be added to periodic collection)...")
    # Store discovered metrics separately; don't add them back to METRICS
    discovered_metrics_info = discover_supported_oids()
    if discovered_metrics_info:
        print(f"Discovered {len(discovered_metrics_info)} additional potentially available SNMP metrics:")
        for m in discovered_metrics_info:
            print(f"  - {m['name']} ({m['oid']})")
        # METRICS.extend(discovered_metrics_info)  # DO NOT add discovered metrics back to the main polling list
    else:
        print("No additional SNMP metrics discovered")

    collector_thread = threading.Thread(target=metrics_collector, daemon=True)
    collector_thread.start()

    start_server()
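# --- Example usage (illustrative; assumes the server runs on the default port 8000) ---
# Obtain a token:
#   curl -s -X POST http://127.0.0.1:8000/api/login \
#        -H "Content-Type: application/json" -d '{"password": "<AUTH_PASSWORD>"}'
# Query metrics with the returned token:
#   curl -s http://127.0.0.1:8000/api/metrics -H "Authorization: Bearer <token>"
# The /stat/.server-stats.json endpoint is readable without authentication.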