"""VPN Session Viewer: a FastAPI app for browsing SSL-VPN session log files.

Log files are read from the ``logs`` directory under the current working
directory.  File names are expected to look like
``<gateway>_<YYYY-MM-DDTHH:MM:SSZ>.logs``; the gateway name and the (UTC)
timestamp are taken from the file name.  All datetimes handled by this module
are naive values expressed in UTC so they can be compared with one another.
"""
from fastapi import FastAPI, Request, Query, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from typing import Any, Dict, List, Optional, Tuple
import os
import re
from datetime import datetime, timedelta, timezone
from pydantic import BaseModel

app = FastAPI(title="VPN Session Viewer")
templates = Jinja2Templates(directory="templates")

# Suffix that identifies a log file inside the logs directory.
_LOG_SUFFIX = ".logs"

# "<gateway>_<UTC timestamp>.logs"
_FILENAME_RE = re.compile(r"^(.+)_(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z)\.logs$")

# Section titles searched for in a log file, in order of preference.
_SECTION_TITLES = ("SSL-VPN sessions:", "SSL-VPN Login Users:")

# Words that must all appear in the line after a section title for it to be
# treated as the column header.
_HEADER_MARKERS = ("Index", "User", "Group")

# Lines starting with this CLI prompt are skipped while reading data rows.
_PROMPT_PREFIX = "FBI-HQ-SSLVPN #"

# Metadata keys added to every parsed row; shown last in the combined view.
_META_COLUMNS = ["_gateway", "_timestamp", "_source_file"]

# Width of the time window applied when "use_default_time" is requested.
_DEFAULT_WINDOW = timedelta(minutes=30)

# Log-file column header -> LogRow attribute ("Index" is handled separately
# because it is converted to int).
_LOG_ROW_FIELDS = {
    "User": "user",
    "Group": "group",
    # Login Users fields
    "Auth Type": "auth_type",
    "Timeout": "timeout",
    "Auth-Timeout": "auth_timeout",
    "From": "from_ip",
    "HTTP in/out": "http",
    "HTTPS in/out": "https",
    "Two-factor Auth": "two_factor",
    # VPN Sessions fields
    "Source IP": "source_ip",
    "Duration": "duration",
    "I/O Bytes": "io_bytes",
    "Tunnel/Dest IP": "tunnel_dest_ip",
}


# Model for log entries
class LogEntry(BaseModel):
    gateway: str
    timestamp: datetime
    filename: str


class LogRow(BaseModel):
    """Model for a parsed log row"""
    index: Optional[int] = None
    user: Optional[str] = None
    group: Optional[str] = None
    # Fields for Login Users section
    auth_type: Optional[str] = None
    timeout: Optional[str] = None
    auth_timeout: Optional[str] = None
    from_ip: Optional[str] = None
    http: Optional[str] = None
    https: Optional[str] = None
    two_factor: Optional[str] = None
    # Fields for Sessions section
    source_ip: Optional[str] = None
    duration: Optional[str] = None
    io_bytes: Optional[str] = None
    tunnel_dest_ip: Optional[str] = None
    # Generic field for raw line
    raw_line: str


# ---------------------------------------------------------------------------
# File-system helpers
# ---------------------------------------------------------------------------

def _logs_dir() -> str:
    """Return the logs directory, resolved against the current working directory."""
    return os.path.join(os.getcwd(), "logs")


def _list_log_files() -> List[str]:
    """Return the sorted names of all ``.logs`` files in the logs directory.

    A missing logs directory is treated as "no log files" instead of raising.
    Names are sorted so the file that supplies the reference column order is
    deterministic (``os.listdir`` order is arbitrary).
    """
    try:
        names = os.listdir(_logs_dir())
    except FileNotFoundError:
        return []
    return sorted(name for name in names if name.endswith(_LOG_SUFFIX))


def _resolve_log_path(filename: str) -> Optional[str]:
    """Map a client-supplied file name to a path inside the logs directory.

    Returns ``None`` when the name contains a directory component (which
    could escape the logs directory) or when no such regular file exists.
    """
    if not filename or filename in (".", "..") or os.path.basename(filename) != filename:
        return None
    log_path = os.path.join(_logs_dir(), filename)
    return log_path if os.path.isfile(log_path) else None


def _decode_log_bytes(data: bytes) -> str:
    """Decode raw log bytes, trying several encodings in turn.

    Order: UTF-16 when a UTF-16 byte-order mark is present, then UTF-8, then
    BOM-less UTF-16, and finally latin1.  The ``utf-16`` and ``utf-8-sig``
    codecs consume the byte-order mark, so it does not leak into the text as
    a U+FEFF character.  latin1 maps every byte to a character and therefore
    cannot fail, which makes it the last resort.
    """
    if data.startswith((b"\xff\xfe", b"\xfe\xff")):
        try:
            return data.decode("utf-16")
        except UnicodeDecodeError:
            pass
    try:
        return data.decode("utf-8-sig")
    except UnicodeDecodeError:
        pass
    try:
        return data.decode("utf-16")
    except UnicodeDecodeError:
        return data.decode("latin1")


def _read_log_text(log_path: str) -> str:
    """Read ``log_path`` as bytes and return its decoded text.

    Raises:
        OSError: if the file cannot be opened or read (for example
            ``FileNotFoundError``).
    """
    with open(log_path, "rb") as file:
        return _decode_log_bytes(file.read())


# ---------------------------------------------------------------------------
# Parsing
# ---------------------------------------------------------------------------

def parse_filename(filename: str):
    """Parse gateway name and timestamp from filename.

    Returns:
        ``(gateway, timestamp)`` where ``timestamp`` is a naive datetime in
        UTC, or ``(None, None)`` when the name does not match the expected
        pattern or the timestamp is not a valid date/time.
    """
    match = _FILENAME_RE.match(filename)
    if match is None:
        return None, None
    gateway, timestamp_str = match.groups()
    try:
        # Parse as UTC, then drop the tzinfo for consistent naive comparisons.
        timestamp = datetime.fromisoformat(
            timestamp_str.replace('Z', '+00:00')
        ).replace(tzinfo=None)
    except ValueError:
        # The regex only checks digit counts, so e.g. month 13 reaches here.
        return None, None
    return gateway, timestamp


def _parse_log_text(content: str, source_name: str) -> Tuple[List[str], List[Dict[str, Any]]]:
    """Extract the header columns and data rows from decoded log text.

    The first line containing "SSL-VPN sessions:" is used as the section
    start; if there is none, the first line containing
    "SSL-VPN Login Users:" is used instead.  The line right after the section
    title must contain "Index", "User" and "Group" to be accepted as the
    tab-separated header.  Data rows follow until a blank line or a line
    ending in "#".

    Args:
        content: Decoded text of the log file.
        source_name: Base name of the log file; stored on each row and used
            to derive the gateway and timestamp metadata.

    Returns:
        ``(header_columns, rows)``.  Each row maps column name to stripped
        value and also carries ``_source_file``, ``_gateway`` and
        ``_timestamp``.  Both lists are empty when no recognised section or
        header is found.
    """
    lines = content.splitlines()

    section_idx = None
    for title in _SECTION_TITLES:
        section_idx = next((i for i, line in enumerate(lines) if title in line), None)
        if section_idx is not None:
            break
    if section_idx is None:
        # No recognized sections found
        return [], []

    # The header is expected right after the section title.
    header_idx = section_idx + 1
    if header_idx >= len(lines):
        return [], []
    header_line = lines[header_idx]
    if not all(marker in header_line for marker in _HEADER_MARKERS):
        return [], []

    # Preserve exact order of columns from file
    header_columns = [col.strip() for col in header_line.split("\t") if col.strip()]

    # The metadata is identical for every row of the file: compute it once.
    gateway, timestamp = parse_filename(source_name)

    rows: List[Dict[str, Any]] = []
    for line in lines[header_idx + 1:]:
        stripped = line.strip()
        # Stop parsing when we hit an empty line or a new section
        if not stripped or stripped.endswith("#"):
            break
        if line.startswith(_PROMPT_PREFIX):
            continue
        values = [col.strip() for col in line.split("\t") if col]
        # zip() ignores values beyond the last header column.
        row: Dict[str, Any] = dict(zip(header_columns, values))
        row["_source_file"] = source_name
        row["_gateway"] = gateway
        row["_timestamp"] = timestamp
        rows.append(row)
    return header_columns, rows


def parse_log_file(log_path):
    """Parse a log file and return header columns and rows.

    This function never raises: any error (including a missing file) is
    printed and ``([], [])`` is returned.  Callers that need to distinguish a
    missing file must check for it before calling.
    """
    try:
        content = _read_log_text(log_path)
        return _parse_log_text(content, os.path.basename(log_path))
    except Exception as e:  # deliberate best-effort boundary
        print(f"Error parsing log file {log_path}: {e}")
        return [], []


def get_all_logs() -> List[LogEntry]:
    """Get all log files in the logs directory, newest first.

    Files whose names cannot be parsed are reported on stdout and skipped.
    """
    result = []
    for filename in _list_log_files():
        gateway, timestamp = parse_filename(filename)
        if gateway and timestamp:
            result.append(LogEntry(gateway=gateway, timestamp=timestamp, filename=filename))
        else:
            print(f"Could not parse filename: {filename}")
    # Sort by timestamp descending (newest first)
    result.sort(key=lambda entry: entry.timestamp, reverse=True)
    return result


# ---------------------------------------------------------------------------
# Shared filtering helpers for the combined view and /api/all-entries
# ---------------------------------------------------------------------------

def _parse_date_param(value: Optional[str], label: str, *, end_of_day: bool = False) -> Optional[datetime]:
    """Parse a date/datetime query parameter into a naive UTC datetime.

    Accepts ISO 8601 values and the ``datetime-local`` form-input format
    (no seconds, no timezone).  A date without a time part is expanded to the
    start of that day, or to 23:59:59 when ``end_of_day`` is true.  Values
    without a timezone are taken as UTC; values with an offset are converted
    to UTC.  Unparseable values are reported on stdout and yield ``None``.

    Args:
        value: Raw parameter value, or ``None``/empty when not supplied.
        label: Parameter name used in the error message.
        end_of_day: Whether a date-only value means the end of that day.
    """
    if not value:
        return None
    text = value.strip()
    if 'T' not in text:
        text = f"{text}T23:59:59" if end_of_day else f"{text}T00:00:00"
    if text.endswith('Z'):
        # Older datetime.fromisoformat() versions do not accept a "Z" suffix.
        text = f"{text[:-1]}+00:00"
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError as e:
        print(f"Error parsing {label}: {e}")
        return None
    if parsed.tzinfo is not None:
        # Convert to UTC before dropping tzinfo; merely stripping the offset
        # would shift the instant.
        parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
    return parsed


def _resolve_time_window(
    start_date: Optional[str],
    end_date: Optional[str],
    use_default_time: bool,
) -> Tuple[Optional[datetime], Optional[datetime]]:
    """Return the ``(start, end)`` filter bounds as naive UTC datetimes.

    When neither date is supplied and ``use_default_time`` is true, the
    window is the last 30 minutes ending at the current UTC time.  Otherwise
    each supplied date is parsed on its own; a missing or invalid date leaves
    that bound as ``None`` (unbounded).
    """
    if not start_date and not end_date and use_default_time:
        # Log timestamps are UTC, so "now" must be UTC too.
        end_datetime = datetime.now(timezone.utc).replace(tzinfo=None)
        return end_datetime - _DEFAULT_WINDOW, end_datetime
    return (
        _parse_date_param(start_date, "start_date"),
        _parse_date_param(end_date, "end_date", end_of_day=True),
    )


def _load_all_rows() -> Tuple[List[str], set, List[Dict[str, Any]]]:
    """Parse every log file once.

    Returns:
        ``(reference_columns, all_columns, rows)`` where ``reference_columns``
        is the header of the first file that has one, ``all_columns`` is the
        set of every column name seen, and ``rows`` holds the rows of all
        files.
    """
    logs_dir = _logs_dir()
    reference_columns: List[str] = []
    all_columns: set = set()
    rows: List[Dict[str, Any]] = []
    for filename in _list_log_files():
        columns, file_rows = parse_log_file(os.path.join(logs_dir, filename))
        if columns:
            all_columns.update(columns)
            if not reference_columns:
                reference_columns = columns
        rows.extend(file_rows)
    return reference_columns, all_columns, rows


def _filter_and_sort_rows(
    rows: List[Dict[str, Any]],
    gateway: Optional[str],
    start_datetime: Optional[datetime],
    end_datetime: Optional[datetime],
    search: Optional[str],
) -> List[Dict[str, Any]]:
    """Apply the gateway, date-range and search filters, then sort newest first.

    Rows without a timestamp are never excluded by the date-range filter and
    sort last.  The search is a case-insensitive substring match against
    every string value of a row (including the string metadata values).
    """
    if gateway:
        rows = [row for row in rows if row.get("_gateway") == gateway]

    if start_datetime is not None or end_datetime is not None:
        def in_window(row: Dict[str, Any]) -> bool:
            timestamp = row.get("_timestamp")
            if not timestamp:
                return True
            if start_datetime is not None and timestamp < start_datetime:
                return False
            if end_datetime is not None and timestamp > end_datetime:
                return False
            return True

        rows = [row for row in rows if in_window(row)]

    if search and search.strip():
        search_term = search.lower()
        rows = [
            row for row in rows
            if any(isinstance(value, str) and search_term in value.lower()
                   for value in row.values())
        ]

    # "_timestamp" is always present but may be None, so a .get() default
    # would not help; substitute datetime.min explicitly to keep the keys
    # comparable.
    return sorted(rows, key=lambda row: row.get("_timestamp") or datetime.min, reverse=True)


def _to_log_row(row_dict: Dict[str, Any]) -> LogRow:
    """Convert a parsed row dictionary into a LogRow for the view template.

    Only columns present in ``row_dict`` are set.  "Index" is stored as an
    int, and only when it consists solely of decimal digits.
    """
    fields: Dict[str, Any] = {
        attr: row_dict[header]
        for header, attr in _LOG_ROW_FIELDS.items()
        if header in row_dict
    }
    index_text = row_dict.get("Index")
    # isdecimal() (unlike isdigit()) only accepts characters int() can parse.
    if index_text is not None and index_text.isdecimal():
        fields["index"] = int(index_text)
    return LogRow(raw_line="", **fields)


# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------

@app.get("/", response_class=HTMLResponse)
async def home(request: Request, gateway: Optional[str] = None):
    """Render the index page listing log files, optionally for one gateway."""
    all_logs = get_all_logs()
    # The dropdown always offers every gateway, independent of the filter.
    gateways = sorted({log.gateway for log in all_logs})
    logs = [log for log in all_logs if log.gateway == gateway] if gateway else all_logs
    return templates.TemplateResponse("index.html", {
        "request": request,
        "logs": logs,
        "gateways": gateways,
        "selected_gateway": gateway
    })


@app.get("/api/logs", response_model=List[LogEntry])
async def api_logs(gateway: Optional[str] = None):
    """Get all logs or filter by gateway name"""
    logs = get_all_logs()
    if gateway:
        logs = [log for log in logs if log.gateway == gateway]
    return logs


@app.get("/api/gateways", response_model=List[str])
async def api_gateways():
    """Get list of unique gateway names"""
    return sorted({log.gateway for log in get_all_logs()})


@app.get("/api/log-content/{filename}", response_model=List[dict])
async def api_log_content(filename: str):
    """Get parsed log content for a specific file.

    Raises:
        HTTPException: 404 when ``filename`` is not a plain file name or no
            such file exists in the logs directory.
    """
    # parse_log_file() swallows errors, so existence must be checked here.
    log_path = _resolve_log_path(filename)
    if log_path is None:
        raise HTTPException(status_code=404, detail=f"Log file {filename} not found")
    _, parsed_rows = parse_log_file(log_path)
    return parsed_rows


@app.get("/combined", response_class=HTMLResponse)
async def combined_view(
    request: Request,
    gateway: Optional[str] = None,
    search: Optional[str] = None,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    use_default_time: Optional[str] = None
):
    """Combined view of all logs with filtering and search"""
    # The form parameter counts as "on" whenever it is present at all.
    start_datetime, end_datetime = _resolve_time_window(
        start_date, end_date, use_default_time is not None
    )

    reference_columns, all_columns, all_rows = _load_all_rows()

    # Build the dropdown from the unfiltered rows so that selecting one
    # gateway does not remove the others from the list.
    gateways = sorted({row["_gateway"] for row in all_rows if row.get("_gateway")})

    rows = _filter_and_sort_rows(all_rows, gateway, start_datetime, end_datetime, search)

    # Columns of the reference file first (in file order, without
    # duplicates), then any remaining columns alphabetically, then metadata.
    display_columns = list(dict.fromkeys(reference_columns))
    display_columns.extend(sorted(all_columns.difference(display_columns)))
    final_columns = display_columns + _META_COLUMNS

    # Format dates for display in datetime-local form fields
    formatted_start_date = start_datetime.strftime('%Y-%m-%dT%H:%M') if start_datetime else ""
    formatted_end_date = end_datetime.strftime('%Y-%m-%dT%H:%M') if end_datetime else ""

    return templates.TemplateResponse("combined.html", {
        "request": request,
        "rows": rows,
        "columns": final_columns,
        "gateways": gateways,
        "selected_gateway": gateway,
        "search_term": search,
        "start_date": formatted_start_date,
        "end_date": formatted_end_date
    })


@app.get("/api/all-entries", response_model=List[dict])
async def api_all_entries(
    gateway: Optional[str] = None,
    search: Optional[str] = None,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    use_default_time: Optional[str] = None
):
    """Get all log entries from all files with optional filtering"""
    # The parameter counts as "on" whenever it is present at all.
    start_datetime, end_datetime = _resolve_time_window(
        start_date, end_date, use_default_time is not None
    )
    _, _, all_rows = _load_all_rows()
    return _filter_and_sort_rows(all_rows, gateway, start_datetime, end_datetime, search)


@app.get("/view/{filename}", response_class=HTMLResponse)
async def view_log(request: Request, filename: str):
    """Render one log file: its raw text plus the parsed rows.

    When the file does not exist (or ``filename`` is not a plain file name)
    the page is still rendered, with a "not found" message as raw content
    and no parsed rows.
    """
    raw_content = f"Log file {filename} not found"
    parsed_rows = []
    header_columns = []

    log_path = _resolve_log_path(filename)
    if log_path is not None:
        try:
            text = _read_log_text(log_path)
        except FileNotFoundError:
            # The file vanished between the existence check and the read.
            pass
        else:
            raw_content = text
            header_columns, parsed_dict_rows = _parse_log_text(text, filename)
            # Convert dictionary rows to LogRow objects for backward
            # compatibility with the template
            parsed_rows = [_to_log_row(row_dict) for row_dict in parsed_dict_rows]

    gateway, timestamp = parse_filename(filename)
    return templates.TemplateResponse("view.html", {
        "request": request,
        "filename": filename,
        "gateway": gateway,
        "timestamp": timestamp,
        "raw_content": raw_content,
        "parsed_rows": parsed_rows,
        "columns": header_columns
    })


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)