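"""VPN Session Viewer.

FastAPI application that lists SSL-VPN session log files from the local
logs/ directory and exposes them through HTML views and JSON endpoints
with gateway, date-range, and free-text filtering."""
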
from fastapi import FastAPI, Request, Query, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from typing import List, Optional
import os
import re
from datetime import datetime, timedelta, timezone
from pydantic import BaseModel

app = FastAPI(title="VPN Session Viewer")

templates = Jinja2Templates(directory="templates")

# Model for log entries
class LogEntry(BaseModel):
    gateway: str
    timestamp: datetime
    filename: str

@app.get("/", response_class=HTMLResponse)
|
|
async def home(request: Request, gateway: Optional[str] = None):
|
|
logs = get_all_logs()
|
|
|
|
if gateway:
|
|
logs = [log for log in logs if log.gateway == gateway]
|
|
|
|
gateways = sorted(set(log.gateway for log in get_all_logs()))
|
|
|
|
return templates.TemplateResponse("index.html", {
|
|
"request": request,
|
|
"logs": logs,
|
|
"gateways": gateways,
|
|
"selected_gateway": gateway
|
|
})
|
|
|
|
@app.get("/api/logs", response_model=List[LogEntry])
|
|
async def api_logs(gateway: Optional[str] = None):
|
|
"""Get all logs or filter by gateway name"""
|
|
logs = get_all_logs()
|
|
|
|
if gateway:
|
|
logs = [log for log in logs if log.gateway == gateway]
|
|
|
|
return logs
|
|
|
|
@app.get("/api/gateways", response_model=List[str])
|
|
async def api_gateways():
|
|
"""Get list of unique gateway names"""
|
|
logs = get_all_logs()
|
|
gateways = set(log.gateway for log in logs)
|
|
return sorted(list(gateways))
|
|
|
|
@app.get("/api/log-content/{filename}", response_model=List[dict])
|
|
async def api_log_content(filename: str):
|
|
"""Get parsed log content for a specific file"""
|
|
log_path = os.path.join(os.getcwd(), "logs", filename)
|
|
|
|
try:
|
|
_, parsed_rows = parse_log_file(log_path)
|
|
except FileNotFoundError:
|
|
raise HTTPException(status_code=404, detail=f"Log file {filename} not found")
|
|
|
|
return parsed_rows
|
|
|
|
@app.get("/combined", response_class=HTMLResponse)
|
|
async def combined_view(
|
|
request: Request,
|
|
gateway: Optional[str] = None,
|
|
search: Optional[str] = None,
|
|
start_date: Optional[str] = None,
|
|
end_date: Optional[str] = None,
|
|
use_default_time: Optional[str] = None
|
|
):
|
|
"""Combined view of all logs with filtering and search"""
|
|
# Convert string form parameter to boolean
|
|
use_default_time_bool = use_default_time is not None
|
|
logs_dir = os.path.join(os.getcwd(), "logs")
|
|
all_rows = []
|
|
common_columns = set()
|
|
|
|
    # Parse date strings into datetime objects if provided
    # or set defaults if not provided and use_default_time is True
    start_datetime = None
    end_datetime = None

    # If no dates are provided and use_default_time is True, set defaults
    if not start_date and not end_date and use_default_time_bool:
        # Set end_datetime to the current time - naive UTC, matching the parsed log timestamps
        end_datetime = datetime.now(timezone.utc).replace(tzinfo=None)
        # Set start_datetime to 30 minutes ago
        start_datetime = end_datetime - timedelta(minutes=30)
    else:
        # Process provided dates
        if start_date:
            try:
                # Handle both ISO format and datetime-local input format
                if 'T' in start_date:
                    # Make sure we have seconds if not provided
                    if len(start_date.split('T')[1].split(':')) == 2:
                        start_date = f"{start_date}:00"
                    # Add timezone if missing
                    if not start_date.endswith('Z') and '+' not in start_date:
                        start_date = f"{start_date}Z"
                else:
                    # If only date without time, set time to start of day
                    start_date = f"{start_date}T00:00:00Z"

                # Parse and remove timezone for consistent comparisons
                start_datetime = datetime.fromisoformat(start_date.replace('Z', '+00:00')).replace(tzinfo=None)
            except ValueError as e:
                print(f"Error parsing start_date: {e}")

        if end_date:
            try:
                # Handle both ISO format and datetime-local input format
                if 'T' in end_date:
                    # Make sure we have seconds if not provided
                    if len(end_date.split('T')[1].split(':')) == 2:
                        end_date = f"{end_date}:00"
                    # Add timezone if missing
                    if not end_date.endswith('Z') and '+' not in end_date:
                        end_date = f"{end_date}Z"
                else:
                    # If only date without time, set time to end of day
                    end_date = f"{end_date}T23:59:59Z"

                # Parse and remove timezone for consistent comparisons
                end_datetime = datetime.fromisoformat(end_date.replace('Z', '+00:00')).replace(tzinfo=None)
            except ValueError as e:
                print(f"Error parsing end_date: {e}")

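    # Accepted date inputs (illustrative values; behavior follows the normalization above):
    #   "2025-01-15T10:30"  -> "2025-01-15T10:30:00Z"  (datetime-local; seconds and "Z" appended)
    #   "2025-01-15"        -> "2025-01-15T00:00:00Z" as a start date, "2025-01-15T23:59:59Z" as an end date
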
    # Get all log files
    log_files = [f for f in os.listdir(logs_dir) if f.endswith(".logs")]

    # Parse all log files and collect all rows
    for filename in log_files:
        log_path = os.path.join(logs_dir, filename)
        try:
            columns, rows = parse_log_file(log_path)

            if columns:
                common_columns.update(columns)

            all_rows.extend(rows)
        except Exception as e:
            print(f"Error processing file {filename} in combined view: {e}")

    # Apply gateway filter if specified
    if gateway:
        all_rows = [row for row in all_rows if row.get("_gateway") == gateway]

    # Apply date range filter if specified
    if start_datetime or end_datetime:
        filtered_rows = []
        for row in all_rows:
            timestamp = row.get("_timestamp")
            if timestamp:
                if start_datetime and timestamp < start_datetime:
                    continue
                if end_datetime and timestamp > end_datetime:
                    continue
            filtered_rows.append(row)
        all_rows = filtered_rows

    # Apply search filter if specified
    if search and search.strip():
        search_term = search.lower()
        filtered_rows = []

        for row in all_rows:
            for key, value in row.items():
                if isinstance(value, str) and search_term in value.lower():
                    filtered_rows.append(row)
                    break

        all_rows = filtered_rows

    # Sort by timestamp descending (newest first); rows without a timestamp sort as datetime.min
    all_rows.sort(key=lambda x: x.get("_timestamp") or datetime.min, reverse=True)

    # Get unique gateway names for filter dropdown
    gateways = sorted(set(row.get("_gateway") for row in all_rows if row.get("_gateway")))

    # Prepare final columns list while preserving original order
    # We'll use a reference order from the first log file that has columns
    reference_columns = []
    for filename in log_files:
        log_path = os.path.join(logs_dir, filename)
        first_columns, _ = parse_log_file(log_path)
        if first_columns:
            reference_columns = first_columns
            break

    # Ensure all common columns are included while preserving original order where possible
    display_columns = []
    # First add columns in the reference order
    for col in reference_columns:
        if col in common_columns:
            display_columns.append(col)
            common_columns.remove(col)

    # Add any remaining columns
    display_columns.extend(sorted(list(common_columns)))

    # Add metadata columns last
    meta_columns = ["_gateway", "_timestamp", "_source_file"]
    final_columns = display_columns + meta_columns

    # Format dates for display in datetime-local form fields
    formatted_start_date = start_datetime.strftime('%Y-%m-%dT%H:%M') if start_datetime else ""
    formatted_end_date = end_datetime.strftime('%Y-%m-%dT%H:%M') if end_datetime else ""

return templates.TemplateResponse("combined.html", {
|
|
"request": request,
|
|
"rows": all_rows,
|
|
"columns": final_columns,
|
|
"gateways": gateways,
|
|
"selected_gateway": gateway,
|
|
"search_term": search,
|
|
"start_date": formatted_start_date,
|
|
"end_date": formatted_end_date
|
|
})
|
|
|
|
@app.get("/api/all-entries", response_model=List[dict])
|
|
async def api_all_entries(
|
|
gateway: Optional[str] = None,
|
|
search: Optional[str] = None,
|
|
start_date: Optional[str] = None,
|
|
end_date: Optional[str] = None,
|
|
use_default_time: Optional[str] = None
|
|
):
|
|
"""Get all log entries from all files with optional filtering"""
|
|
# Convert string parameter to boolean
|
|
use_default_time_bool = use_default_time is not None
|
|
logs_dir = os.path.join(os.getcwd(), "logs")
|
|
all_rows = []
|
|
|
|
# Parse date strings into datetime objects if provided
|
|
# or set defaults if not provided and use_default_time is True
|
|
start_datetime = None
|
|
end_datetime = None
|
|
|
|
    # If no dates are provided and use_default_time is True, set defaults
    if not start_date and not end_date and use_default_time_bool:
        # Set end_datetime to the current time - naive UTC, matching the parsed log timestamps
        end_datetime = datetime.now(timezone.utc).replace(tzinfo=None)
        # Set start_datetime to 30 minutes ago
        start_datetime = end_datetime - timedelta(minutes=30)
    else:
        if start_date:
            try:
                # Handle both ISO format and datetime-local input format
                if 'T' in start_date:
                    # Make sure we have seconds if not provided
                    if len(start_date.split('T')[1].split(':')) == 2:
                        start_date = f"{start_date}:00"
                    # Add timezone if missing
                    if not start_date.endswith('Z') and '+' not in start_date:
                        start_date = f"{start_date}Z"
                else:
                    # If only date without time, set time to start of day
                    start_date = f"{start_date}T00:00:00Z"

                # Parse and remove timezone for consistent comparisons
                start_datetime = datetime.fromisoformat(start_date.replace('Z', '+00:00')).replace(tzinfo=None)
            except ValueError as e:
                print(f"Error parsing start_date: {e}")

        if end_date:
            try:
                # Handle both ISO format and datetime-local input format
                if 'T' in end_date:
                    # Make sure we have seconds if not provided
                    if len(end_date.split('T')[1].split(':')) == 2:
                        end_date = f"{end_date}:00"
                    # Add timezone if missing
                    if not end_date.endswith('Z') and '+' not in end_date:
                        end_date = f"{end_date}Z"
                else:
                    # If only date without time, set time to end of day
                    end_date = f"{end_date}T23:59:59Z"

                # Parse and remove timezone for consistent comparisons
                end_datetime = datetime.fromisoformat(end_date.replace('Z', '+00:00')).replace(tzinfo=None)
            except ValueError as e:
                print(f"Error parsing end_date: {e}")

    # Get all log files
    log_files = [f for f in os.listdir(logs_dir) if f.endswith(".logs")]

    # Parse all log files and collect all rows and track column order
    reference_columns = []
    for filename in log_files:
        log_path = os.path.join(logs_dir, filename)
        try:
            columns, rows = parse_log_file(log_path)
            if columns and not reference_columns:
                # Save column order from first file with columns
                reference_columns = columns
            all_rows.extend(rows)
        except Exception as e:
            print(f"Error processing file {filename} in api_all_entries: {e}")

    # Apply gateway filter if specified
    if gateway:
        all_rows = [row for row in all_rows if row.get("_gateway") == gateway]

    # Apply date range filter if specified
    if start_datetime or end_datetime:
        filtered_rows = []
        for row in all_rows:
            timestamp = row.get("_timestamp")
            if timestamp:
                if start_datetime and timestamp < start_datetime:
                    continue
                if end_datetime and timestamp > end_datetime:
                    continue
            filtered_rows.append(row)
        all_rows = filtered_rows

    # Apply search filter if specified
    if search and search.strip():
        search_term = search.lower()
        filtered_rows = []

        for row in all_rows:
            for key, value in row.items():
                if isinstance(value, str) and search_term in value.lower():
                    filtered_rows.append(row)
                    break

        all_rows = filtered_rows

    # Sort by timestamp descending (newest first); rows without a timestamp sort as datetime.min
    all_rows.sort(key=lambda x: x.get("_timestamp") or datetime.min, reverse=True)

    return all_rows

class LogRow(BaseModel):
    """Model for a parsed log row"""
    index: Optional[int] = None
    user: Optional[str] = None
    group: Optional[str] = None
    # Fields for Login Users section
    auth_type: Optional[str] = None
    timeout: Optional[str] = None
    auth_timeout: Optional[str] = None
    from_ip: Optional[str] = None
    http: Optional[str] = None
    https: Optional[str] = None
    two_factor: Optional[str] = None
    # Fields for Sessions section
    source_ip: Optional[str] = None
    duration: Optional[str] = None
    io_bytes: Optional[str] = None
    tunnel_dest_ip: Optional[str] = None
    # Generic field for raw line
    raw_line: str

@app.get("/view/{filename}", response_class=HTMLResponse)
|
|
async def view_log(request: Request, filename: str):
|
|
log_path = os.path.join(os.getcwd(), "logs", filename)
|
|
raw_content = None
|
|
parsed_rows = []
|
|
header_columns = []
|
|
|
|
    try:
        # Read the file in binary mode first to check for encodings
        with open(log_path, "rb") as file:
            binary_content = file.read()

        # Check for BOM (Byte Order Mark) at the beginning of the file
        raw_content = None

        # Check for UTF-16 LE BOM
        if binary_content.startswith(b'\xff\xfe'):
            try:
                raw_content = binary_content.decode('utf-16-le')
            except UnicodeDecodeError:
                pass

        # Check for UTF-16 BE BOM
        if raw_content is None and binary_content.startswith(b'\xfe\xff'):
            try:
                raw_content = binary_content.decode('utf-16-be')
            except UnicodeDecodeError:
                pass

        # Try UTF-8
        if raw_content is None:
            try:
                raw_content = binary_content.decode('utf-8')
            except UnicodeDecodeError:
                pass

        # Try common encodings if we still don't have content
        if raw_content is None:
            for encoding in ['utf-16', 'latin1', 'cp1252', 'iso-8859-1']:
                try:
                    raw_content = binary_content.decode(encoding)
                    break
                except UnicodeDecodeError:
                    continue

        # If all decodings fail, use latin1 as a fallback with replacement
        if raw_content is None:
            raw_content = binary_content.decode('latin1', errors='replace')

        header_columns, parsed_dict_rows = parse_log_file(log_path)

        # Convert dictionary rows to LogRow objects for backward compatibility with the template
        for row_dict in parsed_dict_rows:
            row = LogRow(raw_line="")

            # Common fields
            if "Index" in row_dict and row_dict["Index"].isdigit():
                row.index = int(row_dict["Index"])
            if "User" in row_dict:
                row.user = row_dict["User"]
            if "Group" in row_dict:
                row.group = row_dict["Group"]

            # Login Users fields
            if "Auth Type" in row_dict:
                row.auth_type = row_dict["Auth Type"]
            if "Timeout" in row_dict:
                row.timeout = row_dict["Timeout"]
            if "Auth-Timeout" in row_dict:
                row.auth_timeout = row_dict["Auth-Timeout"]
            if "From" in row_dict:
                row.from_ip = row_dict["From"]
            if "HTTP in/out" in row_dict:
                row.http = row_dict["HTTP in/out"]
            if "HTTPS in/out" in row_dict:
                row.https = row_dict["HTTPS in/out"]
            if "Two-factor Auth" in row_dict:
                row.two_factor = row_dict["Two-factor Auth"]

            # VPN Sessions fields
            if "Source IP" in row_dict:
                row.source_ip = row_dict["Source IP"]
            if "Duration" in row_dict:
                row.duration = row_dict["Duration"]
            if "I/O Bytes" in row_dict:
                row.io_bytes = row_dict["I/O Bytes"]
            if "Tunnel/Dest IP" in row_dict:
                row.tunnel_dest_ip = row_dict["Tunnel/Dest IP"]

            parsed_rows.append(row)

    except FileNotFoundError:
        raw_content = f"Log file {filename} not found"

    gateway, timestamp = parse_filename(filename)

    return templates.TemplateResponse("view.html", {
        "request": request,
        "filename": filename,
        "gateway": gateway,
        "timestamp": timestamp,
        "raw_content": raw_content,
        "parsed_rows": parsed_rows,
        "columns": header_columns
    })

def get_all_logs() -> List[LogEntry]:
    """Get all log files in the logs directory"""
    logs_dir = os.path.join(os.getcwd(), "logs")
    log_files = [f for f in os.listdir(logs_dir) if f.endswith(".logs")]
    result = []

    for filename in log_files:
        try:
            gateway, timestamp = parse_filename(filename)
            if gateway and timestamp:
                result.append(LogEntry(
                    gateway=gateway,
                    timestamp=timestamp,
                    filename=filename
                ))
            else:
                print(f"Could not parse filename: {filename}")
        except Exception as e:
            print(f"Error processing log file {filename}: {e}")

    # Sort by timestamp descending (newest first)
    result.sort(key=lambda x: x.timestamp, reverse=True)
    return result

def parse_filename(filename: str):
    """Parse gateway name and timestamp from filename"""
    pattern = r"^(.+)_(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z)\.logs$"
    match = re.match(pattern, filename)

    if match:
        gateway = match.group(1)
        timestamp_str = match.group(2)
        # Parse timestamp but remove timezone info for consistent comparisons
        timestamp = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')).replace(tzinfo=None)
        return gateway, timestamp

    return None, None

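# Example (hypothetical filename, matching the pattern in parse_filename above):
#   parse_filename("gw-east_2025-01-15T10:30:00Z.logs")
#   -> ("gw-east", datetime(2025, 1, 15, 10, 30))
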
def parse_log_file(log_path):
    """Parse a log file and return header columns and rows"""
    parsed_rows = []
    header_columns = []

    try:
        # Read the file in binary mode first to check for encodings
        with open(log_path, "rb") as file:
            binary_content = file.read()

        # Check for BOM (Byte Order Mark) at the beginning of the file
        content = None

        # Check for UTF-16 LE BOM
        if binary_content.startswith(b'\xff\xfe'):
            try:
                content = binary_content.decode('utf-16-le')
            except UnicodeDecodeError:
                pass

        # Check for UTF-16 BE BOM
        if content is None and binary_content.startswith(b'\xfe\xff'):
            try:
                content = binary_content.decode('utf-16-be')
            except UnicodeDecodeError:
                pass

        # Try UTF-8
        if content is None:
            try:
                content = binary_content.decode('utf-8')
            except UnicodeDecodeError:
                pass

        # Try common encodings if we still don't have content
        if content is None:
            for encoding in ['utf-16', 'latin1', 'cp1252', 'iso-8859-1']:
                try:
                    content = binary_content.decode(encoding)
                    break
                except UnicodeDecodeError:
                    continue

        # If all decodings fail, use latin1 as a fallback with replacement
        if content is None:
            content = binary_content.decode('latin1', errors='replace')

        lines = content.splitlines()

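        # Expected layout (assumed from the parsing below): a section title such as
        # "SSL-VPN sessions:" or "SSL-VPN Login Users:", then a tab-separated header
        # line ("Index", "User", "Group", ...), then one tab-separated row per entry,
        # terminated by a blank line or a CLI prompt line ending in "#".
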
# Find the "SSL-VPN sessions:" section
|
|
session_section_start = None
|
|
for i, line in enumerate(lines):
|
|
if "SSL-VPN sessions:" in line:
|
|
session_section_start = i
|
|
break
|
|
|
|
if session_section_start is None:
|
|
# If SSL-VPN sessions section not found, fall back to the login users section
|
|
for i, line in enumerate(lines):
|
|
if "SSL-VPN Login Users:" in line:
|
|
session_section_start = i
|
|
break
|
|
|
|
if session_section_start is None:
|
|
# No recognized sections found
|
|
return header_columns, parsed_rows
|
|
|
|
# Find header line with column names (it should be right after the section title)
|
|
header_line_idx = session_section_start + 1
|
|
if header_line_idx < len(lines):
|
|
header_line = lines[header_line_idx]
|
|
if "Index" in header_line and "User" in header_line and "Group" in header_line:
|
|
# Preserve exact order of columns from file
|
|
header_columns = [col.strip() for col in header_line.split("\t") if col.strip()]
|
|
|
|
        # Parse data rows
        for line in lines[header_line_idx + 1:]:
            # Stop parsing when we hit an empty line or a new section
            if not line.strip() or line.strip().endswith("#"):
                break

            if line.strip() and not line.startswith("FBI-HQ-SSLVPN #"):
                columns = [col.strip() for col in line.split("\t") if col]
                row_data = {}

                # Map columns to dictionary in original order with extra whitespace handling
                for i, col in enumerate(columns):
                    if i < len(header_columns):
                        column_name = header_columns[i]
                        # Strip any residual whitespace from the value
                        clean_value = col.strip() if col else ""
                        # Special handling for Tunnel/Dest IP which may have extra spaces
                        if column_name == "Tunnel/Dest IP":
                            clean_value = clean_value.strip()
                        row_data[column_name] = clean_value

                # Add source filename metadata
                filename = os.path.basename(log_path)
                gateway, timestamp = parse_filename(filename)
                row_data["_source_file"] = filename
                row_data["_gateway"] = gateway
                row_data["_timestamp"] = timestamp

                parsed_rows.append(row_data)
    except Exception as e:
        print(f"Error parsing log file {log_path}: {e}")

    return header_columns, parsed_rows

if __name__ == "__main__":
|
|
import uvicorn
|
|
uvicorn.run(app, host="0.0.0.0", port=8000)
|
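
# Example usage (the module name "main" and the gateway value are assumed for illustration):
#   uvicorn main:app --reload          # or simply: python main.py
#   curl "http://localhost:8000/api/gateways"
#   curl "http://localhost:8000/api/logs?gateway=gw-east"
#   curl "http://localhost:8000/api/all-entries?search=admin&use_default_time=on"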