VPN-Session-History-FastAPI/main.py
from fastapi import FastAPI, Request, Query, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from typing import List, Optional
import os
import re
from datetime import datetime, timedelta, timezone
from pydantic import BaseModel
app = FastAPI(title="VPN Session Viewer")

templates = Jinja2Templates(directory="templates")


# Model for log entries
class LogEntry(BaseModel):
    gateway: str
    timestamp: datetime
    filename: str
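
# Log files are read from the "logs" directory relative to the working directory.
# Each filename is expected to encode the gateway name and a UTC timestamp, e.g. a
# hypothetical "gateway-a_2025-04-10T19:30:00Z.logs"; see parse_filename() below.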
@app.get("/", response_class=HTMLResponse)
async def home(request: Request, gateway: Optional[str] = None):
logs = get_all_logs()
if gateway:
logs = [log for log in logs if log.gateway == gateway]
gateways = sorted(set(log.gateway for log in get_all_logs()))
return templates.TemplateResponse("index.html", {
"request": request,
"logs": logs,
"gateways": gateways,
"selected_gateway": gateway
})
@app.get("/api/logs", response_model=List[LogEntry])
async def api_logs(gateway: Optional[str] = None):
"""Get all logs or filter by gateway name"""
logs = get_all_logs()
if gateway:
logs = [log for log in logs if log.gateway == gateway]
return logs
@app.get("/api/gateways", response_model=List[str])
async def api_gateways():
"""Get list of unique gateway names"""
logs = get_all_logs()
gateways = set(log.gateway for log in logs)
return sorted(list(gateways))
@app.get("/api/log-content/{filename}", response_model=List[dict])
async def api_log_content(filename: str):
"""Get parsed log content for a specific file"""
log_path = os.path.join(os.getcwd(), "logs", filename)
try:
_, parsed_rows = parse_log_file(log_path)
except FileNotFoundError:
raise HTTPException(status_code=404, detail=f"Log file {filename} not found")
return parsed_rows
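
# Example request (assuming the app is served locally on port 8000; the filename
# below is hypothetical):
#   GET http://localhost:8000/api/log-content/gateway-a_2025-04-10T19:30:00Z.logs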
@app.get("/combined", response_class=HTMLResponse)
async def combined_view(
request: Request,
gateway: Optional[str] = None,
search: Optional[str] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
use_default_time: Optional[str] = None
):
"""Combined view of all logs with filtering and search"""
# Convert string form parameter to boolean
use_default_time_bool = use_default_time is not None
logs_dir = os.path.join(os.getcwd(), "logs")
all_rows = []
common_columns = set()
# Parse date strings into datetime objects if provided
# or set defaults if not provided and use_default_time is True
start_datetime = None
end_datetime = None
# If no dates are provided and use_default_time is True, set defaults
if not start_date and not end_date and use_default_time_bool:
# Set end_datetime to current time - use UTC timezone for consistency
end_datetime = datetime.now().replace(tzinfo=None)
# Set start_datetime to 30 minutes ago
start_datetime = end_datetime - timedelta(minutes=30)
    else:
        # Process provided dates
        if start_date:
            try:
                # Handle both ISO format and datetime-local input format
                if 'T' in start_date:
                    # Make sure we have seconds if not provided
                    if len(start_date.split('T')[1].split(':')) == 2:
                        start_date = f"{start_date}:00"
                    # Add timezone if missing
                    if not start_date.endswith('Z') and '+' not in start_date:
                        start_date = f"{start_date}Z"
                else:
                    # If only a date without time, set time to start of day
                    start_date = f"{start_date}T00:00:00Z"

                # Parse and remove timezone for consistent comparisons
                start_datetime = datetime.fromisoformat(start_date.replace('Z', '+00:00')).replace(tzinfo=None)
            except ValueError as e:
                print(f"Error parsing start_date: {e}")

        if end_date:
            try:
                # Handle both ISO format and datetime-local input format
                if 'T' in end_date:
                    # Make sure we have seconds if not provided
                    if len(end_date.split('T')[1].split(':')) == 2:
                        end_date = f"{end_date}:00"
                    # Add timezone if missing
                    if not end_date.endswith('Z') and '+' not in end_date:
                        end_date = f"{end_date}Z"
                else:
                    # If only a date without time, set time to end of day
                    end_date = f"{end_date}T23:59:59Z"

                # Parse and remove timezone for consistent comparisons
                end_datetime = datetime.fromisoformat(end_date.replace('Z', '+00:00')).replace(tzinfo=None)
            except ValueError as e:
                print(f"Error parsing end_date: {e}")

    # Get all log files
    log_files = [f for f in os.listdir(logs_dir) if f.endswith(".logs")]

    # Parse all log files and collect all rows
    for filename in log_files:
        log_path = os.path.join(logs_dir, filename)
        columns, rows = parse_log_file(log_path)
        if columns:
            common_columns.update(columns)
        all_rows.extend(rows)

    # Apply gateway filter if specified
    if gateway:
        all_rows = [row for row in all_rows if row.get("_gateway") == gateway]

    # Apply date range filter if specified
    if start_datetime or end_datetime:
        filtered_rows = []
        for row in all_rows:
            timestamp = row.get("_timestamp")
            if timestamp:
                if start_datetime and timestamp < start_datetime:
                    continue
                if end_datetime and timestamp > end_datetime:
                    continue
            filtered_rows.append(row)
        all_rows = filtered_rows

    # Apply search filter if specified
    if search and search.strip():
        search_term = search.lower()
        filtered_rows = []
        for row in all_rows:
            for key, value in row.items():
                if isinstance(value, str) and search_term in value.lower():
                    filtered_rows.append(row)
                    break
        all_rows = filtered_rows

    # Sort by timestamp descending (newest first); rows without a timestamp sort last
    all_rows.sort(key=lambda x: x.get("_timestamp") or datetime.min, reverse=True)

    # Get unique gateway names for the filter dropdown
    gateways = sorted(set(row.get("_gateway") for row in all_rows if row.get("_gateway")))

    # Prepare the final columns list while preserving original order.
    # Use a reference order from the first log file that has columns.
    reference_columns = []
    for filename in log_files:
        log_path = os.path.join(logs_dir, filename)
        first_columns, _ = parse_log_file(log_path)
        if first_columns:
            reference_columns = first_columns
            break

    # Ensure all common columns are included while preserving original order where possible
    display_columns = []

    # First add columns in the reference order
    for col in reference_columns:
        if col in common_columns:
            display_columns.append(col)
            common_columns.remove(col)

    # Add any remaining columns
    display_columns.extend(sorted(list(common_columns)))

    # Add metadata columns last
    meta_columns = ["_gateway", "_timestamp", "_source_file"]
    final_columns = display_columns + meta_columns

    # Format dates for display in datetime-local form fields
    formatted_start_date = start_datetime.strftime('%Y-%m-%dT%H:%M') if start_datetime else ""
    formatted_end_date = end_datetime.strftime('%Y-%m-%dT%H:%M') if end_datetime else ""

    return templates.TemplateResponse("combined.html", {
        "request": request,
        "rows": all_rows,
        "columns": final_columns,
        "gateways": gateways,
        "selected_gateway": gateway,
        "search_term": search,
        "start_date": formatted_start_date,
        "end_date": formatted_end_date
    })
@app.get("/api/all-entries", response_model=List[dict])
async def api_all_entries(
gateway: Optional[str] = None,
search: Optional[str] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
use_default_time: Optional[str] = None
):
"""Get all log entries from all files with optional filtering"""
# Convert string parameter to boolean
use_default_time_bool = use_default_time is not None
logs_dir = os.path.join(os.getcwd(), "logs")
all_rows = []
# Parse date strings into datetime objects if provided
# or set defaults if not provided and use_default_time is True
start_datetime = None
end_datetime = None
# If no dates are provided and use_default_time is True, set defaults
if not start_date and not end_date and use_default_time_bool:
# Set end_datetime to current time - use UTC timezone for consistency
end_datetime = datetime.now().replace(tzinfo=None)
# Set start_datetime to 30 minutes ago
start_datetime = end_datetime - timedelta(minutes=30)
else:
if start_date:
try:
# Handle both ISO format and datetime-local input format
if 'T' in start_date:
# Make sure we have seconds if not provided
if len(start_date.split('T')[1].split(':')) == 2:
start_date = f"{start_date}:00"
# Add timezone if missing
if not start_date.endswith('Z') and '+' not in start_date:
start_date = f"{start_date}Z"
else:
# If only date without time, set time to start of day
start_date = f"{start_date}T00:00:00Z"
# Parse and remove timezone for consistent comparisons
start_datetime = datetime.fromisoformat(start_date.replace('Z', '+00:00')).replace(tzinfo=None)
except ValueError as e:
print(f"Error parsing start_date: {e}")
if end_date:
try:
# Handle both ISO format and datetime-local input format
if 'T' in end_date:
# Make sure we have seconds if not provided
if len(end_date.split('T')[1].split(':')) == 2:
end_date = f"{end_date}:00"
# Add timezone if missing
if not end_date.endswith('Z') and '+' not in end_date:
end_date = f"{end_date}Z"
else:
# If only date without time, set time to end of day
end_date = f"{end_date}T23:59:59Z"
# Parse and remove timezone for consistent comparisons
end_datetime = datetime.fromisoformat(end_date.replace('Z', '+00:00')).replace(tzinfo=None)
except ValueError as e:
print(f"Error parsing end_date: {e}")

    # Get all log files
    log_files = [f for f in os.listdir(logs_dir) if f.endswith(".logs")]

    # Parse all log files, collect all rows, and track column order
    reference_columns = []
    for filename in log_files:
        log_path = os.path.join(logs_dir, filename)
        columns, rows = parse_log_file(log_path)
        if columns and not reference_columns:
            # Save column order from the first file with columns
            reference_columns = columns
        all_rows.extend(rows)

    # Apply gateway filter if specified
    if gateway:
        all_rows = [row for row in all_rows if row.get("_gateway") == gateway]

    # Apply date range filter if specified
    if start_datetime or end_datetime:
        filtered_rows = []
        for row in all_rows:
            timestamp = row.get("_timestamp")
            if timestamp:
                if start_datetime and timestamp < start_datetime:
                    continue
                if end_datetime and timestamp > end_datetime:
                    continue
            filtered_rows.append(row)
        all_rows = filtered_rows

    # Apply search filter if specified
    if search and search.strip():
        search_term = search.lower()
        filtered_rows = []
        for row in all_rows:
            for key, value in row.items():
                if isinstance(value, str) and search_term in value.lower():
                    filtered_rows.append(row)
                    break
        all_rows = filtered_rows

    # Sort by timestamp descending (newest first); rows without a timestamp sort last
    all_rows.sort(key=lambda x: x.get("_timestamp") or datetime.min, reverse=True)

    return all_rows
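
# Example request (hypothetical values; assumes the app runs locally on port 8000):
#   GET http://localhost:8000/api/all-entries?gateway=gateway-a&search=alice&start_date=2025-04-10T19:00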


class LogRow(BaseModel):
    """Model for a parsed log row"""
    index: Optional[int] = None
    user: Optional[str] = None
    group: Optional[str] = None

    # Fields for Login Users section
    auth_type: Optional[str] = None
    timeout: Optional[str] = None
    auth_timeout: Optional[str] = None
    from_ip: Optional[str] = None
    http: Optional[str] = None
    https: Optional[str] = None
    two_factor: Optional[str] = None

    # Fields for Sessions section
    source_ip: Optional[str] = None
    duration: Optional[str] = None
    io_bytes: Optional[str] = None
    tunnel_dest_ip: Optional[str] = None

    # Generic field for the raw line
    raw_line: str
@app.get("/view/{filename}", response_class=HTMLResponse)
async def view_log(request: Request, filename: str):
log_path = os.path.join(os.getcwd(), "logs", filename)
raw_content = ""
parsed_rows = []
header_columns = []
try:
with open(log_path, "r") as file:
raw_content = file.read()
header_columns, parsed_dict_rows = parse_log_file(log_path)
# Convert dictionary rows to LogRow objects for backward compatibility with the template
for row_dict in parsed_dict_rows:
row = LogRow(raw_line="")
# Common fields
if "Index" in row_dict and row_dict["Index"].isdigit():
row.index = int(row_dict["Index"])
if "User" in row_dict:
row.user = row_dict["User"]
if "Group" in row_dict:
row.group = row_dict["Group"]
# Login Users fields
if "Auth Type" in row_dict:
row.auth_type = row_dict["Auth Type"]
if "Timeout" in row_dict:
row.timeout = row_dict["Timeout"]
if "Auth-Timeout" in row_dict:
row.auth_timeout = row_dict["Auth-Timeout"]
if "From" in row_dict:
row.from_ip = row_dict["From"]
if "HTTP in/out" in row_dict:
row.http = row_dict["HTTP in/out"]
if "HTTPS in/out" in row_dict:
row.https = row_dict["HTTPS in/out"]
if "Two-factor Auth" in row_dict:
row.two_factor = row_dict["Two-factor Auth"]
# VPN Sessions fields
if "Source IP" in row_dict:
row.source_ip = row_dict["Source IP"]
if "Duration" in row_dict:
row.duration = row_dict["Duration"]
if "I/O Bytes" in row_dict:
row.io_bytes = row_dict["I/O Bytes"]
if "Tunnel/Dest IP" in row_dict:
row.tunnel_dest_ip = row_dict["Tunnel/Dest IP"]
parsed_rows.append(row)
except FileNotFoundError:
raw_content = f"Log file {filename} not found"
gateway, timestamp = parse_filename(filename)
return templates.TemplateResponse("view.html", {
"request": request,
"filename": filename,
"gateway": gateway,
"timestamp": timestamp,
"raw_content": raw_content,
"parsed_rows": parsed_rows,
"columns": header_columns
})


def get_all_logs() -> List[LogEntry]:
    """Get all log files in the logs directory"""
    logs_dir = os.path.join(os.getcwd(), "logs")
    log_files = [f for f in os.listdir(logs_dir) if f.endswith(".logs")]

    result = []
    for filename in log_files:
        gateway, timestamp = parse_filename(filename)
        if gateway and timestamp:
            result.append(LogEntry(
                gateway=gateway,
                timestamp=timestamp,
                filename=filename
            ))

    # Sort by timestamp descending (newest first)
    result.sort(key=lambda x: x.timestamp, reverse=True)

    return result


def parse_filename(filename: str):
    """Parse gateway name and timestamp from filename"""
    pattern = r"^(.+)_(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z)\.logs$"
    match = re.match(pattern, filename)

    if match:
        gateway = match.group(1)
        timestamp_str = match.group(2)
        # Parse the timestamp but remove timezone info for consistent comparisons
        timestamp = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')).replace(tzinfo=None)
        return gateway, timestamp

    return None, None
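
# For illustration, a hypothetical filename such as
# "gateway-a_2025-04-10T19:30:00Z.logs" would yield
# ("gateway-a", datetime(2025, 4, 10, 19, 30, 0)); names that do not match
# the pattern yield (None, None).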


def parse_log_file(log_path):
    """Parse a log file and return header columns and rows"""
    parsed_rows = []
    header_columns = []

    try:
        with open(log_path, "r") as file:
            content = file.read()

        lines = content.splitlines()

        # Find the "SSL-VPN sessions:" section
        session_section_start = None
        for i, line in enumerate(lines):
            if "SSL-VPN sessions:" in line:
                session_section_start = i
                break

        if session_section_start is None:
            # If the SSL-VPN sessions section is not found, fall back to the login users section
            for i, line in enumerate(lines):
                if "SSL-VPN Login Users:" in line:
                    session_section_start = i
                    break

        if session_section_start is None:
            # No recognized sections found
            return header_columns, parsed_rows

        # Find the header line with column names (it should be right after the section title)
        header_line_idx = session_section_start + 1
        if header_line_idx < len(lines):
            header_line = lines[header_line_idx]
            if "Index" in header_line and "User" in header_line and "Group" in header_line:
                # Preserve the exact order of columns from the file
                header_columns = [col.strip() for col in header_line.split("\t") if col.strip()]

        # Parse data rows
        for line in lines[header_line_idx + 1:]:
            # Stop parsing when we hit an empty line or a new section
            if not line.strip() or line.strip().endswith("#"):
                break

            if line.strip() and not line.startswith("FBI-HQ-SSLVPN #"):
                columns = [col.strip() for col in line.split("\t") if col]
                row_data = {}

                # Map columns to a dictionary in original order, trimming stray whitespace
                for i, col in enumerate(columns):
                    if i < len(header_columns):
                        row_data[header_columns[i]] = col.strip() if col else ""

                # Add source file metadata
                filename = os.path.basename(log_path)
                gateway, timestamp = parse_filename(filename)
                row_data["_source_file"] = filename
                row_data["_gateway"] = gateway
                row_data["_timestamp"] = timestamp

                parsed_rows.append(row_data)
    except FileNotFoundError:
        # Re-raise so callers (e.g. api_log_content) can turn a missing file into a 404
        raise
    except Exception as e:
        print(f"Error parsing log file {log_path}: {e}")

    return header_columns, parsed_rows
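
# The parser above expects a plain-text report with a section title such as
# "SSL-VPN Login Users:" or "SSL-VPN sessions:", followed on the next line by a
# tab-separated header (containing "Index", "User", "Group", ...) and then one
# tab-separated data row per session; the literal "FBI-HQ-SSLVPN #" prompt in the
# code suggests FortiGate SSL-VPN monitor output. A hypothetical minimal example
# (with <TAB> marking tab separators):
#
#   SSL-VPN sessions:
#   Index<TAB>User<TAB>Group<TAB>Source IP<TAB>Duration<TAB>I/O Bytes<TAB>Tunnel/Dest IP
#   0<TAB>alice<TAB>vpn-users<TAB>203.0.113.7<TAB>3600<TAB>12345/67890<TAB>10.212.134.1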


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
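
# During development the app can also be started with the uvicorn CLI, e.g.:
#   uvicorn main:app --reload --port 8000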