pre api key
This commit is contained in:
commit
9569f90aba
8 changed files with 900 additions and 0 deletions
245
main.py
Normal file
245
main.py
Normal file
|
@ -0,0 +1,245 @@
|
|||
# Simple FastAPI Data Collector
|
||||
# Purpose: Accept simple comma-separated input and save to text files
|
||||
# Usage: curl -X POST http://localhost:8000/api/run1/ -d "host_a,is ok"
|
||||
|
||||
from fastapi import FastAPI, HTTPException, Request, Depends
|
||||
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
import re
|
||||
import bleach
|
||||
|
||||
# Initialize FastAPI application
|
||||
app = FastAPI(
|
||||
title="Simple Data Collector",
|
||||
version="1.0.0",
|
||||
description="Simple API for collecting comma-separated data"
|
||||
)
|
||||
|
||||
# Define the base directory for input files
|
||||
INPUT_DIR = Path("input")
|
||||
INPUT_DIR.mkdir(exist_ok=True) # Create directory if it doesn't exist
|
||||
|
||||
# Authentication tokens
|
||||
INPUT_TOKEN = "input_token_123" # Token for POST endpoints
|
||||
READ_TOKEN = "read_token_456" # Token for GET endpoints
|
||||
|
||||
# Security schemes
|
||||
input_security = HTTPBearer()
|
||||
read_security = HTTPBearer()
|
||||
|
||||
def verify_input_token(credentials: HTTPAuthorizationCredentials = Depends(input_security)):
|
||||
"""Verify bearer token for input operations"""
|
||||
if credentials.credentials != INPUT_TOKEN:
|
||||
raise HTTPException(
|
||||
status_code=401,
|
||||
detail="Invalid authentication token for input operations"
|
||||
)
|
||||
return credentials
|
||||
|
||||
def verify_read_token(credentials: HTTPAuthorizationCredentials = Depends(read_security)):
|
||||
"""Verify bearer token for read operations"""
|
||||
if credentials.credentials != READ_TOKEN:
|
||||
raise HTTPException(
|
||||
status_code=401,
|
||||
detail="Invalid authentication token for read operations"
|
||||
)
|
||||
return credentials
|
||||
|
||||
def sanitize_input(data: str) -> str:
|
||||
"""
|
||||
Sanitize input data using bleach to prevent malicious content.
|
||||
|
||||
Args:
|
||||
data (str): Raw input data
|
||||
|
||||
Returns:
|
||||
str: Sanitized data safe for storage
|
||||
"""
|
||||
if not data:
|
||||
return ""
|
||||
|
||||
# Clean with bleach - strip all HTML tags and dangerous content
|
||||
sanitized = bleach.clean(data, tags=[], attributes={}, strip=True)
|
||||
|
||||
# Remove control characters except newlines and tabs
|
||||
sanitized = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', sanitized)
|
||||
|
||||
# Normalize whitespace
|
||||
sanitized = re.sub(r'\s+', ' ', sanitized).strip()
|
||||
|
||||
# Limit length to prevent DoS
|
||||
max_length = 1000
|
||||
if len(sanitized) > max_length:
|
||||
sanitized = sanitized[:max_length] + "... [truncated]"
|
||||
|
||||
return sanitized
|
||||
|
||||
def sanitize_run_name(run_name: str) -> str:
|
||||
"""
|
||||
Sanitize run name to ensure it's safe for filesystem operations.
|
||||
|
||||
Args:
|
||||
run_name (str): Raw run name
|
||||
|
||||
Returns:
|
||||
str: Sanitized run name safe for filesystem
|
||||
"""
|
||||
if not run_name:
|
||||
raise HTTPException(status_code=400, detail="Run name cannot be empty")
|
||||
|
||||
# Use bleach to clean any potential HTML/script content
|
||||
sanitized = bleach.clean(run_name, tags=[], attributes={}, strip=True)
|
||||
|
||||
# Remove dangerous filesystem characters
|
||||
sanitized = re.sub(r'[^\w\-.]', '_', sanitized)
|
||||
|
||||
# Remove leading/trailing dots and dashes
|
||||
sanitized = sanitized.strip('.-')
|
||||
|
||||
# Prevent reserved names
|
||||
reserved_names = ['con', 'prn', 'aux', 'nul', 'com1', 'com2', 'com3', 'com4',
|
||||
'com5', 'com6', 'com7', 'com8', 'com9', 'lpt1', 'lpt2',
|
||||
'lpt3', 'lpt4', 'lpt5', 'lpt6', 'lpt7', 'lpt8', 'lpt9']
|
||||
|
||||
if sanitized.lower() in reserved_names:
|
||||
sanitized = f"safe_{sanitized}"
|
||||
|
||||
# Limit length
|
||||
if len(sanitized) > 50:
|
||||
sanitized = sanitized[:50]
|
||||
|
||||
if not sanitized:
|
||||
raise HTTPException(status_code=400, detail="Invalid run name after sanitization")
|
||||
|
||||
return sanitized
|
||||
|
||||
def save_to_file(run_name: str, data: str):
|
||||
"""
|
||||
Save data as a single line to input/{run_name}/results.txt
|
||||
|
||||
Args:
|
||||
run_name (str): Name of the run (creates subdirectory)
|
||||
data (str): The data to save as a single line
|
||||
"""
|
||||
# Create run-specific directory
|
||||
run_dir = INPUT_DIR / run_name
|
||||
run_dir.mkdir(exist_ok=True)
|
||||
|
||||
# Define the results file path
|
||||
results_file = run_dir / "results.txt"
|
||||
|
||||
# Get current timestamp
|
||||
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
|
||||
# Append the data with timestamp to the file
|
||||
with open(results_file, "a", encoding="utf-8") as f:
|
||||
f.write(f"{timestamp} - {data.strip()}\n")
|
||||
|
||||
@app.get("/")
|
||||
async def root():
|
||||
"""Root endpoint providing API information"""
|
||||
return {
|
||||
"message": "Simple Data Collector API",
|
||||
"version": "1.0.0",
|
||||
"usage": "POST /api/{run_name}/ with data in body",
|
||||
"example": "curl -X POST http://localhost:8000/api/run1/ -d 'host_a,is ok'",
|
||||
"output": "Data saved to input/{run_name}/results.txt"
|
||||
}
|
||||
|
||||
@app.post("/api/{run_name}/")
|
||||
async def collect_data(run_name: str, request: Request, token: HTTPAuthorizationCredentials = Depends(verify_input_token)):
|
||||
"""
|
||||
Collect simple data and save to text file.
|
||||
|
||||
Args:
|
||||
run_name (str): Name of the run (creates input/{run_name}/ directory)
|
||||
request: Raw request body containing the data
|
||||
|
||||
Returns:
|
||||
dict: Confirmation message
|
||||
|
||||
Example:
|
||||
curl -X POST http://localhost:8000/api/run1/ -d "host_a,is ok"
|
||||
-> Saves to input/run1/results.txt
|
||||
"""
|
||||
# Read the raw body data
|
||||
body = await request.body()
|
||||
data = body.decode("utf-8")
|
||||
|
||||
# Validate that we have some data
|
||||
if not data.strip():
|
||||
raise HTTPException(status_code=400, detail="No data provided")
|
||||
|
||||
# Sanitize input data and run name
|
||||
sanitized_data = sanitize_input(data)
|
||||
sanitized_run_name = sanitize_run_name(run_name)
|
||||
|
||||
# Save to file
|
||||
save_to_file(sanitized_run_name, sanitized_data)
|
||||
|
||||
# Return confirmation
|
||||
return {
|
||||
"message": "Data saved successfully",
|
||||
"run_name": sanitized_run_name,
|
||||
"data": sanitized_data,
|
||||
"saved_to": f"input/{sanitized_run_name}/results.txt",
|
||||
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
}
|
||||
|
||||
@app.get("/results/{run_name}/")
|
||||
async def get_results(run_name: str, token: HTTPAuthorizationCredentials = Depends(verify_read_token)):
|
||||
"""
|
||||
Retrieve all results for a specific run.
|
||||
|
||||
Args:
|
||||
run_name (str): Name of the run to get results for
|
||||
|
||||
Returns:
|
||||
dict: All lines from the results file
|
||||
"""
|
||||
# Sanitize run name for safe filesystem access
|
||||
sanitized_run_name = sanitize_run_name(run_name)
|
||||
results_file = INPUT_DIR / sanitized_run_name / "results.txt"
|
||||
|
||||
if not results_file.exists():
|
||||
raise HTTPException(
|
||||
status_code=404,
|
||||
detail=f"No results found for run '{sanitized_run_name}'"
|
||||
)
|
||||
|
||||
# Read all lines from the file
|
||||
with open(results_file, "r", encoding="utf-8") as f:
|
||||
lines = f.readlines()
|
||||
|
||||
return {
|
||||
"run_name": sanitized_run_name,
|
||||
"total_entries": len(lines),
|
||||
"results": [line.strip() for line in lines]
|
||||
}
|
||||
|
||||
@app.get("/health")
|
||||
async def health_check():
|
||||
"""Health check endpoint"""
|
||||
return {
|
||||
"status": "healthy",
|
||||
"service": "Simple Data Collector",
|
||||
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
}
|
||||
|
||||
# Application entry point
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
|
||||
print("Starting Simple Data Collector API...")
|
||||
print("Server will be available at: http://localhost:8000")
|
||||
print("API documentation at: http://localhost:8000/docs")
|
||||
print("Example: curl -X POST http://localhost:8000/api/run1/ -d 'host_a,is ok'")
|
||||
print("Results saved to: ./input/{run_name}/results.txt")
|
||||
|
||||
uvicorn.run(
|
||||
app,
|
||||
host="0.0.0.0",
|
||||
port=8000,
|
||||
log_level="info"
|
||||
)
|
Loading…
Add table
Add a link
Reference in a new issue