002-Remote-Logging-Funnel/main.py
2025-06-08 22:13:26 +02:00

262 lines
No EOL
8.2 KiB
Python

# Simple FastAPI Data Collector
# Purpose: Accept simple comma-separated input and append it, timestamped, to per-run text files
# Usage: curl -X POST http://localhost:8000/api/run1/ -H "Authorization: Bearer <INPUT_API_KEY>" -d "host_a,is ok"
import logging
import os
import re
import secrets
from datetime import datetime
from pathlib import Path

import bleach
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
# Populate os.environ from a .env file (if one is found) before any
# os.getenv() lookups further down this module run.
load_dotenv()

# The single FastAPI application; every route decorator in this module
# registers on it.
app = FastAPI(
    description="Simple API for collecting comma-separated data",
    title="Simple Data Collector",
    version="1.0.0",
)
# Base folder for all collected data. It is a relative path, so it resolves
# against the process's current working directory. Each run writes into its
# own sub-folder beneath it.
INPUT_DIR: Path = Path("input")
INPUT_DIR.mkdir(exist_ok=True)  # no error if the folder is already there
# Module-level logger, used here to flag insecure default credentials.
logger = logging.getLogger(__name__)

# Built-in tokens used only when the matching environment variable is unset.
# They exist for backward compatibility. They are visible to anyone who can
# read this file, so they must not be relied on in production.
_DEFAULT_INPUT_API_KEY = "input_token_123"
_DEFAULT_READ_API_KEY = "read_token_456"


def load_api_keys(env_var: str, fallback: str) -> list:
    """
    Parse a comma-separated list of API keys from an environment variable.

    Args:
        env_var (str): Name of the environment variable to read.
        fallback (str): Comma-separated keys to use when the variable is unset.
    Returns:
        list: The whitespace-stripped, non-empty keys, in their original order.
            A variable that is set but empty yields [] (the fallback is not used).
    """
    raw = os.getenv(env_var)
    if raw is None:
        # Falling back to publicly known tokens is a security risk, so make
        # it visible in the logs instead of doing it silently.
        logger.warning(
            "%s is not set; falling back to the built-in default key. "
            "Set %s to a secret value in production.",
            env_var,
            env_var,
        )
        raw = fallback
    return [key.strip() for key in raw.split(",") if key.strip()]


INPUT_API_KEYS = load_api_keys("INPUT_API_KEYS", _DEFAULT_INPUT_API_KEY)
READ_API_KEYS = load_api_keys("READ_API_KEYS", _DEFAULT_READ_API_KEY)

# Validate that we have at least one key for each operation
if not INPUT_API_KEYS:
    raise ValueError("At least one INPUT_API_KEY must be configured")
if not READ_API_KEYS:
    raise ValueError("At least one READ_API_KEY must be configured")
# Security schemes: two separate HTTPBearer extractors, one for the
# input (write) dependency and one for the read dependency.
input_security, read_security = HTTPBearer(), HTTPBearer()
def _token_matches(candidate: str, valid_keys) -> bool:
    """
    Return True when *candidate* equals one of *valid_keys*.

    A plain ``in`` / ``==`` check stops at the first differing character,
    so its running time can leak how much of a guessed token is correct.
    This helper uses secrets.compare_digest and keeps checking the remaining
    keys after a match. Both sides are compared as UTF-8 bytes because
    compare_digest raises TypeError for non-ASCII str arguments.
    """
    candidate_bytes = candidate.encode("utf-8")
    matched = False
    for key in valid_keys:
        matched |= secrets.compare_digest(candidate_bytes, key.encode("utf-8"))
    return matched


def verify_input_token(credentials: HTTPAuthorizationCredentials = Depends(input_security)):
    """
    Verify bearer token for input operations (supports multiple API keys).

    Returns:
        HTTPAuthorizationCredentials: The accepted credentials.
    Raises:
        HTTPException: 401 if the token is not one of INPUT_API_KEYS.
    """
    if not _token_matches(credentials.credentials, INPUT_API_KEYS):
        raise HTTPException(
            status_code=401,
            detail="Invalid authentication token for input operations"
        )
    return credentials


def verify_read_token(credentials: HTTPAuthorizationCredentials = Depends(read_security)):
    """
    Verify bearer token for read operations (supports multiple API keys).

    Returns:
        HTTPAuthorizationCredentials: The accepted credentials.
    Raises:
        HTTPException: 401 if the token is not one of READ_API_KEYS.
    """
    if not _token_matches(credentials.credentials, READ_API_KEYS):
        raise HTTPException(
            status_code=401,
            detail="Invalid authentication token for read operations"
        )
    return credentials
# ASCII control characters to delete. Tab (0x09), LF (0x0A) and CR (0x0D)
# are intentionally absent; they are collapsed to spaces afterwards instead.
_CONTROL_CHARS_RE = re.compile(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]')


def sanitize_input(data: str) -> str:
    """
    Make request text safe to store as a single line of a results file.

    Processing steps:
    1. bleach removes every HTML tag. Note that bleach also HTML-escapes
       stray characters such as ``&``, so the stored text is entity-encoded.
    2. ASCII control characters other than tab/LF/CR are deleted.
    3. Every run of whitespace becomes one space, and the ends are trimmed.
    4. Text longer than 1000 characters is cut to 1000 and suffixed with
       "... [truncated]".

    Args:
        data (str): Raw input data
    Returns:
        str: Sanitized data safe for storage; "" when *data* is empty/falsy.
    """
    if not data:
        return ""

    limit = 1000  # per-request length cap (DoS guard)
    text = _CONTROL_CHARS_RE.sub('', bleach.clean(data, tags=[], attributes={}, strip=True))
    # str.split() with no separator splits on the same Unicode whitespace as
    # regex \s, so this collapses runs and trims both ends in one step.
    text = " ".join(text.split())
    return text if len(text) <= limit else text[:limit] + "... [truncated]"
# Device names that Windows reserves regardless of case or extension
# ("CON", "con", and "con.txt" all refer to the console device).
_RESERVED_NAMES = frozenset(
    ['con', 'prn', 'aux', 'nul']
    + [f"com{i}" for i in range(1, 10)]
    + [f"lpt{i}" for i in range(1, 10)]
)
_RUN_NAME_MAX_LENGTH = 50


def sanitize_run_name(run_name: str) -> str:
    """
    Sanitize run name to ensure it's safe for filesystem operations.

    The name is processed in this order:
    1. bleach strips any HTML.
    2. Each character that is not a word character, dot, or dash is
       replaced with "_".
    3. Leading/trailing dots and dashes are removed.
    4. The result is capped at 50 characters.
    5. A name whose part before the first dot is a Windows reserved
       device name gets a "safe_" prefix.

    Args:
        run_name (str): Raw run name
    Returns:
        str: Sanitized run name safe for filesystem (at most 50 characters)
    Raises:
        HTTPException: 400 if the name is empty, or becomes empty after
            sanitization.
    """
    if not run_name:
        raise HTTPException(status_code=400, detail="Run name cannot be empty")

    # Use bleach to clean any potential HTML/script content
    sanitized = bleach.clean(run_name, tags=[], attributes={}, strip=True)
    # Remove dangerous filesystem characters
    sanitized = re.sub(r'[^\w\-.]', '_', sanitized)
    # Strip, truncate, then strip the tail again. Without the second strip,
    # the cut can leave a trailing dot or dash (Windows does not support
    # file names ending in a dot).
    sanitized = sanitized.strip('.-')[:_RUN_NAME_MAX_LENGTH].rstrip('.-')
    if not sanitized:
        raise HTTPException(status_code=400, detail="Invalid run name after sanitization")

    # Check the stem, and do it after truncation, so that neither "nul.txt"
    # nor a long name that truncates down to "con" slips through.
    if sanitized.split('.', 1)[0].lower() in _RESERVED_NAMES:
        sanitized = f"safe_{sanitized}"[:_RUN_NAME_MAX_LENGTH].rstrip('.-')
    return sanitized
def save_to_file(run_name: str, data: str) -> str:
    """
    Append data as a single timestamped line to input/{run_name}/results.txt.

    Args:
        run_name (str): Name of the run (creates subdirectory). The caller in
            this module passes a name already cleaned by sanitize_run_name.
        data (str): The data to save as a single line
    Returns:
        str: The "%Y-%m-%d %H:%M:%S" local-time timestamp written on the
            line, so callers can report exactly what was stored.
    """
    run_dir = INPUT_DIR / run_name
    # parents=True also recreates INPUT_DIR if it was removed after startup.
    run_dir.mkdir(parents=True, exist_ok=True)
    results_file = run_dir / "results.txt"

    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open(results_file, "a", encoding="utf-8") as f:
        f.write(f"{timestamp} - {data.strip()}\n")
    return timestamp


@app.get("/")
async def root():
    """Root endpoint providing API information"""
    return {
        "message": "Simple Data Collector API",
        "version": "1.0.0",
        "usage": "POST /api/{run_name}/ with data in body",
        # The POST endpoint requires an input bearer token, so the example includes it.
        "example": "curl -X POST http://localhost:8000/api/run1/ -H 'Authorization: Bearer <INPUT_API_KEY>' -d 'host_a,is ok'",
        "output": "Data saved to input/{run_name}/results.txt"
    }


@app.post("/api/{run_name}/")
async def collect_data(run_name: str, request: Request, token: HTTPAuthorizationCredentials = Depends(verify_input_token)):
    """
    Collect simple data and save to text file.

    Args:
        run_name (str): Name of the run (creates input/{run_name}/ directory)
        request: Raw request body containing the data (expected UTF-8 text)
        token: Validated input-scope bearer credentials (injected dependency)
    Returns:
        dict: Confirmation message. "timestamp" is the same value written to
            the file.
    Raises:
        HTTPException: 400 if the body is not valid UTF-8, is blank, or is
            empty after sanitization, or if the run name is invalid.
    Example:
        curl -X POST http://localhost:8000/api/run1/ -H "Authorization: Bearer <INPUT_API_KEY>" -d "host_a,is ok"
        -> Saves to input/run1/results.txt
    """
    body = await request.body()
    try:
        data = body.decode("utf-8")
    except UnicodeDecodeError:
        # Malformed client input is a client error, not an unhandled 500.
        raise HTTPException(status_code=400, detail="Request body must be valid UTF-8") from None

    if not data.strip():
        raise HTTPException(status_code=400, detail="No data provided")

    sanitized_data = sanitize_input(data)
    sanitized_run_name = sanitize_run_name(run_name)
    # e.g. "<b></b>" or a body of only control characters passes the check
    # above but sanitizes to "". Refuse to store an empty line.
    if not sanitized_data:
        raise HTTPException(status_code=400, detail="No data left after sanitization")

    saved_at = save_to_file(sanitized_run_name, sanitized_data)

    return {
        "message": "Data saved successfully",
        "run_name": sanitized_run_name,
        "data": sanitized_data,
        "saved_to": f"input/{sanitized_run_name}/results.txt",
        "timestamp": saved_at
    }
@app.get("/results/{run_name}/")
async def get_results(run_name: str, token: HTTPAuthorizationCredentials = Depends(verify_read_token)):
    """
    Retrieve all results for a specific run.

    Args:
        run_name (str): Name of the run to get results for. It is passed
            through sanitize_run_name before the filesystem is touched.
        token: Validated read-scope bearer credentials (injected dependency).
    Returns:
        dict: run_name, total_entries, and every stored line with surrounding
            whitespace stripped.
    Raises:
        HTTPException: 404 when the run has no results file; 400 for an
            invalid run name.
    """
    sanitized_run_name = sanitize_run_name(run_name)
    results_file = INPUT_DIR / sanitized_run_name / "results.txt"

    # EAFP: open directly instead of exists() followed by open(), so a file
    # that disappears in between still yields a clean 404 rather than a 500.
    try:
        with open(results_file, "r", encoding="utf-8") as f:
            results = [line.strip() for line in f]
    except FileNotFoundError:
        raise HTTPException(
            status_code=404,
            detail=f"No results found for run '{sanitized_run_name}'"
        ) from None

    return {
        "run_name": sanitized_run_name,
        "total_entries": len(results),
        "results": results,
    }
@app.get("/health")
async def health_check():
    """Report liveness, with the service name and the current local time."""
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    return dict(status="healthy", service="Simple Data Collector", timestamp=now)
# Application entry point
if __name__ == "__main__":
    # Imported here so that importing this module (e.g. under an external
    # ASGI server) does not require uvicorn to be loaded.
    import uvicorn

    startup_banner = (
        "Starting Simple Data Collector API...",
        "Server will be available at: http://localhost:8000",
        "API documentation at: http://localhost:8000/docs",
        "Example: curl -X POST http://localhost:8000/api/run1/ -d 'host_a,is ok'",
        "Results saved to: ./input/{run_name}/results.txt",
    )
    for banner_line in startup_banner:
        print(banner_line)

    # Listen on all interfaces, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")