Compare commits
4 commits
6ce10f673e
...
616bb8d014
Author | SHA1 | Date | |
---|---|---|---|
616bb8d014 | |||
a7cc072777 | |||
7db919bcb7 | |||
fc72f6f51c |
4 changed files with 531 additions and 169 deletions
57
README.md
57
README.md
|
@ -85,11 +85,39 @@ Where columns are:
|
|||
4. Record Type (A, AAAA, MX, CNAME, TXT, etc.)
|
||||
5. Record Data (IP address, hostname, or other data depending on record type)
|
||||
|
||||
## Domain Base Name Detection
|
||||
|
||||
The application includes functionality to identify base domains from fully qualified domain names, including handling of multi-part TLDs like ".co.uk" or ".com.au".
|
||||
|
||||
### Multi-Part TLD List
|
||||
|
||||
The application uses a hardcoded list of common multi-part TLDs to correctly extract base domains (e.g., "example.co.uk" from "mail.example.co.uk").
|
||||
|
||||
This list can be found in `main.py` as `MULTI_PART_TLDS`.
|
||||
|
||||
### Updating the TLD List
|
||||
|
||||
To ensure accurate domain parsing, you should periodically update the multi-part TLD list. The best sources for this information are:
|
||||
|
||||
1. **Public Suffix List (PSL)**: The most comprehensive and authoritative source
|
||||
- Website: https://publicsuffix.org/list/
|
||||
- GitHub: https://github.com/publicsuffix/list
|
||||
- This list is maintained by Mozilla and used by browsers and DNS applications
|
||||
|
||||
2. **IANA's TLD Database**: The official registry of top-level domains
|
||||
- Website: https://www.iana.org/domains/root/db
|
||||
|
||||
3. **Commercial Domain Registrars**: Often provide lists of available TLDs
|
||||
- Examples: GoDaddy, Namecheap, etc.
|
||||
|
||||
For the most accurate and comprehensive implementation, consider implementing a parser for the Public Suffix List or using a library that maintains this list (e.g., `publicsuffix2` for Python).
|
||||
|
||||
## API Endpoints
|
||||
|
||||
- `/api/uploads` - Get all uploads
|
||||
- `/api/slds` - Get all SLDs (Second Level Domains)
|
||||
- `/api/slds/{sld}` - Get domains by SLD
|
||||
- `/api/domains` - Get all domains
|
||||
- `/api/base-domains` - Get only unique base domains (e.g., example.com, example.co.uk) with simplified response format
|
||||
- `/api/domains/{domain}` - Get domains by name
|
||||
- `/api/dns` - Get all DNS records
|
||||
- `/api/dns/types` - Get unique values for filters
|
||||
|
||||
|
@ -100,8 +128,27 @@ You can filter the API results using the following query parameters:
|
|||
- `upload_id` - Filter by specific upload
|
||||
- `record_type` - Filter by DNS record type
|
||||
- `record_class` - Filter by DNS record class
|
||||
- `tld` - Filter by Top Level Domain
|
||||
- `sld` - Filter by Second Level Domain
|
||||
- `domain` - Search by domain name
|
||||
- `base_domains_only` - Only show base domains (e.g., example.com not mail.example.com)
|
||||
- `deduplicate` - For DNS records, control whether to show all records or deduplicate
|
||||
|
||||
Example: `/api/dns?record_type=A&tld=com&upload_id=upload_20250408120000`
|
||||
Examples:
|
||||
- `/api/domains?base_domains_only=true` - Show only base domains
|
||||
- `/api/base-domains` - Get a simplified list of unique base domains
|
||||
- `/api/dns?record_type=A&domain=example.com&deduplicate=false` - Show all A records for example.com without deduplication
|
||||
|
||||
### Response Format Examples
|
||||
|
||||
1. Base Domains Endpoint (`/api/base-domains`):
|
||||
```json
|
||||
[
|
||||
{
|
||||
"domain": "example.com",
|
||||
"timestamp": "2025-04-08T12:00:00"
|
||||
},
|
||||
{
|
||||
"domain": "example.co.uk",
|
||||
"timestamp": "2025-04-08T12:00:00"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
|
385
main.py
385
main.py
|
@ -3,7 +3,7 @@ import re
|
|||
import io
|
||||
import datetime
|
||||
from fastapi import FastAPI, Request, HTTPException, Query, UploadFile, File, Form
|
||||
from fastapi.responses import HTMLResponse, RedirectResponse
|
||||
from fastapi.responses import HTMLResponse, RedirectResponse, Response
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from fastapi.templating import Jinja2Templates
|
||||
import uvicorn
|
||||
|
@ -32,29 +32,11 @@ def process_domain_entry(domain_entry):
|
|||
if domain_entry.endswith('.'):
|
||||
domain_entry = domain_entry[:-1]
|
||||
|
||||
# Parse domain components
|
||||
parts = domain_entry.split('.')
|
||||
if len(parts) > 1:
|
||||
# For domain.tld format
|
||||
if len(parts) == 2:
|
||||
sld = parts[0] # Second Level Domain
|
||||
tld = parts[1] # Top Level Domain
|
||||
domain_info = {
|
||||
"sld": sld,
|
||||
"tld": tld,
|
||||
"full_domain": domain_entry
|
||||
}
|
||||
# For subdomain.domain.tld format
|
||||
else:
|
||||
sld = parts[-2] # Second Level Domain
|
||||
tld = parts[-1] # Top Level Domain
|
||||
subdomain = '.'.join(parts[:-2]) # Subdomains
|
||||
domain_info = {
|
||||
"sld": sld,
|
||||
"tld": tld,
|
||||
"full_domain": domain_entry,
|
||||
"subdomain": subdomain
|
||||
}
|
||||
if domain_entry:
|
||||
# Store only the full domain name without splitting
|
||||
domain_info = {
|
||||
"full_domain": domain_entry
|
||||
}
|
||||
return domain_info
|
||||
return None
|
||||
|
||||
|
@ -90,7 +72,7 @@ async def process_csv_upload(file_content, upload_id, description=None):
|
|||
domain_info = process_domain_entry(domain_entry)
|
||||
if domain_info:
|
||||
# Create a unique key to avoid duplicates within this upload
|
||||
unique_key = f"{domain_info['sld']}.{domain_info['tld']}"
|
||||
unique_key = domain_info['full_domain']
|
||||
|
||||
if unique_key not in unique_domains:
|
||||
unique_domains.add(unique_key)
|
||||
|
@ -127,22 +109,9 @@ async def process_csv_upload(file_content, upload_id, description=None):
|
|||
"timestamp": timestamp
|
||||
}
|
||||
|
||||
# Add domain components
|
||||
if len(domain_parts) > 1:
|
||||
if domain_parts[0].startswith('_'): # Service records like _dmarc
|
||||
entry["service"] = domain_parts[0]
|
||||
# Adjust domain parts
|
||||
domain_parts = domain_parts[1:]
|
||||
|
||||
# For domain.tld format
|
||||
if len(domain_parts) == 2:
|
||||
entry["sld"] = domain_parts[0] # Second Level Domain
|
||||
entry["tld"] = domain_parts[1] # Top Level Domain
|
||||
# For subdomain.domain.tld format
|
||||
elif len(domain_parts) > 2:
|
||||
entry["sld"] = domain_parts[-2] # Second Level Domain
|
||||
entry["tld"] = domain_parts[-1] # Top Level Domain
|
||||
entry["subdomain"] = '.'.join(domain_parts[:-2]) # Subdomains
|
||||
# Add special handling for service records
|
||||
if len(domain_parts) > 0 and domain_parts[0].startswith('_'): # Service records like _dmarc
|
||||
entry["service"] = domain_parts[0]
|
||||
|
||||
dns_records_to_insert.append(entry)
|
||||
|
||||
|
@ -170,23 +139,42 @@ async def process_csv_upload(file_content, upload_id, description=None):
|
|||
print(traceback.format_exc())
|
||||
return 0, 0
|
||||
|
||||
# Load domains from database - deduplicated by full domain name
|
||||
def load_domains(specific_upload_id: str = None) -> List[Dict]:
|
||||
# Load domains from database - deduplicated by full domain name, with optional base domain filtering
|
||||
def load_domains(specific_upload_id: str = None, base_domains_only: bool = False) -> List[Dict]:
|
||||
try:
|
||||
domains = domains_table.all()
|
||||
|
||||
# If a specific upload ID is provided, only show domains from that upload
|
||||
if specific_upload_id:
|
||||
domains = [d for d in domains if d.get('upload_id') == specific_upload_id]
|
||||
return domains
|
||||
if not base_domains_only:
|
||||
return domains
|
||||
|
||||
# Add the base_domain field to each domain
|
||||
for domain in domains:
|
||||
domain['base_domain'] = extract_base_domain(domain.get('full_domain', ''))
|
||||
|
||||
# Sort by timestamp in descending order (newest first)
|
||||
domains.sort(key=lambda x: x.get('timestamp', ''), reverse=True)
|
||||
|
||||
# Create a dictionary to track unique domains by full domain name
|
||||
# Create a dictionary to track unique domains
|
||||
unique_domains = {}
|
||||
base_domains_set = set()
|
||||
|
||||
# First pass: collect all base domains
|
||||
if base_domains_only:
|
||||
for domain in domains:
|
||||
base_domains_set.add(domain.get('base_domain', ''))
|
||||
|
||||
for domain in domains:
|
||||
# If base_domains_only is True, only keep domains that are base domains themselves
|
||||
if base_domains_only:
|
||||
full_domain = domain.get('full_domain', '')
|
||||
base_domain = domain.get('base_domain', '')
|
||||
|
||||
if full_domain != base_domain:
|
||||
continue
|
||||
|
||||
# Create a unique key based on the full domain name
|
||||
unique_key = domain.get('full_domain', '')
|
||||
|
||||
|
@ -201,45 +189,131 @@ def load_domains(specific_upload_id: str = None) -> List[Dict]:
|
|||
print(f"Error loading domains from database: {e}")
|
||||
return []
|
||||
|
||||
# Load DNS entries from database - deduplicated by domain, class, and type (no history)
|
||||
def load_dns_entries(specific_upload_id: str = None) -> List[Dict]:
|
||||
# Load DNS entries from database - with optional deduplication
|
||||
def load_dns_entries(specific_upload_id: str = None, deduplicate: bool = False) -> List[Dict]:
|
||||
try:
|
||||
entries = dns_records_table.all()
|
||||
|
||||
# If a specific upload ID is provided, only show records from that upload
|
||||
if specific_upload_id:
|
||||
entries = [e for e in entries if e.get('upload_id') == specific_upload_id]
|
||||
return entries
|
||||
|
||||
# Sort by timestamp in descending order (newest first)
|
||||
entries.sort(key=lambda x: x.get('timestamp', ''), reverse=True)
|
||||
|
||||
# Create a dictionary to track unique entries (most recent only)
|
||||
unique_entries = {}
|
||||
|
||||
for entry in entries:
|
||||
# Create a unique key based on domain, class, and type
|
||||
unique_key = f"{entry.get('domain')}:{entry.get('record_class')}:{entry.get('record_type')}"
|
||||
# If deduplication is requested, only keep the most recent entry for each unique combination
|
||||
if deduplicate:
|
||||
# Create a dictionary to track unique entries (most recent only)
|
||||
unique_entries = {}
|
||||
|
||||
# Only keep the most recent entry for each unique combination
|
||||
if unique_key not in unique_entries:
|
||||
# Mark as most recent entry
|
||||
entry['is_latest'] = True
|
||||
unique_entries[unique_key] = entry
|
||||
|
||||
# Return the deduplicated list with only the most recent entries
|
||||
return list(unique_entries.values())
|
||||
for entry in entries:
|
||||
# Create a unique key based on domain, class, type, TTL, and data
|
||||
unique_key = f"{entry.get('domain')}:{entry.get('record_class')}:{entry.get('record_type')}:{entry.get('ttl')}:{entry.get('record_data')}"
|
||||
|
||||
# Only keep the most recent entry for each unique combination
|
||||
if unique_key not in unique_entries:
|
||||
# Mark as most recent entry
|
||||
entry['is_latest'] = True
|
||||
unique_entries[unique_key] = entry
|
||||
|
||||
# Return the deduplicated list with only the most recent entries
|
||||
return list(unique_entries.values())
|
||||
else:
|
||||
# No deduplication - return all entries
|
||||
return entries
|
||||
except Exception as e:
|
||||
print(f"Error loading DNS records from database: {e}")
|
||||
return []
|
||||
|
||||
# List of known multi-part TLDs
|
||||
MULTI_PART_TLDS = [
|
||||
'co.uk', 'org.uk', 'me.uk', 'ac.uk', 'gov.uk', 'net.uk', 'sch.uk',
|
||||
'com.au', 'net.au', 'org.au', 'edu.au', 'gov.au', 'asn.au', 'id.au',
|
||||
'co.nz', 'net.nz', 'org.nz', 'govt.nz', 'ac.nz', 'school.nz', 'geek.nz',
|
||||
'com.sg', 'edu.sg', 'gov.sg', 'net.sg', 'org.sg', 'per.sg',
|
||||
'co.za', 'org.za', 'web.za', 'net.za', 'gov.za', 'ac.za',
|
||||
'com.br', 'net.br', 'org.br', 'gov.br', 'edu.br',
|
||||
'co.jp', 'ac.jp', 'go.jp', 'or.jp', 'ne.jp', 'gr.jp',
|
||||
'co.in', 'firm.in', 'net.in', 'org.in', 'gen.in', 'ind.in',
|
||||
'edu.cn', 'gov.cn', 'net.cn', 'org.cn', 'com.cn', 'ac.cn',
|
||||
'com.mx', 'net.mx', 'org.mx', 'edu.mx', 'gob.mx'
|
||||
]
|
||||
|
||||
# Extract the base domain (SLD+TLD) from a full domain name
|
||||
def extract_base_domain(domain: str) -> str:
|
||||
if not domain:
|
||||
return domain
|
||||
|
||||
# Remove trailing dot if present
|
||||
if domain.endswith('.'):
|
||||
domain = domain[:-1]
|
||||
|
||||
parts = domain.split('.')
|
||||
|
||||
# Check if the domain has enough parts
|
||||
if len(parts) <= 1:
|
||||
return domain
|
||||
|
||||
# Check for known multi-part TLDs first
|
||||
for tld in MULTI_PART_TLDS:
|
||||
tld_parts = tld.split('.')
|
||||
if len(parts) > len(tld_parts) and '.'.join(parts[-len(tld_parts):]) == tld:
|
||||
# The domain has a multi-part TLD, extract SLD + multi-part TLD
|
||||
return parts[-len(tld_parts)-1] + '.' + tld
|
||||
|
||||
# Default case: extract last two parts
|
||||
if len(parts) > 1:
|
||||
return '.'.join(parts[-2:])
|
||||
|
||||
return domain
|
||||
|
||||
# Get all unique base domains from the database
|
||||
def get_unique_base_domains(specific_upload_id: str = None) -> List[Dict]:
|
||||
try:
|
||||
domains = domains_table.all()
|
||||
|
||||
# If a specific upload ID is provided, only show domains from that upload
|
||||
if specific_upload_id:
|
||||
domains = [d for d in domains if d.get('upload_id') == specific_upload_id]
|
||||
|
||||
# Add the base_domain field to each domain
|
||||
for domain in domains:
|
||||
domain['base_domain'] = extract_base_domain(domain.get('full_domain', ''))
|
||||
|
||||
# Sort by timestamp in descending order (newest first)
|
||||
domains.sort(key=lambda x: x.get('timestamp', ''), reverse=True)
|
||||
|
||||
# Create dictionaries to track unique base domains
|
||||
unique_base_domains = {}
|
||||
|
||||
# Process each domain and keep only unique base domains
|
||||
for domain in domains:
|
||||
base_domain = domain.get('base_domain', '')
|
||||
|
||||
# Skip if no base domain
|
||||
if not base_domain:
|
||||
continue
|
||||
|
||||
# Check if this base domain has been seen before
|
||||
if base_domain not in unique_base_domains:
|
||||
# Create a new entry for this base domain - with simplified fields
|
||||
base_domain_entry = {
|
||||
'domain': base_domain,
|
||||
'timestamp': domain.get('timestamp')
|
||||
}
|
||||
unique_base_domains[base_domain] = base_domain_entry
|
||||
|
||||
# Return the list of unique base domains
|
||||
return list(unique_base_domains.values())
|
||||
except Exception as e:
|
||||
print(f"Error getting unique base domains: {e}")
|
||||
return []
|
||||
|
||||
# Get unique values for filter dropdowns
|
||||
def get_unique_values(entries: List[Dict]) -> Dict[str, Set]:
|
||||
unique_values = {
|
||||
"record_type": set(),
|
||||
"record_class": set(),
|
||||
"tld": set(),
|
||||
"sld": set()
|
||||
"record_class": set()
|
||||
}
|
||||
|
||||
for entry in entries:
|
||||
|
@ -276,18 +350,77 @@ def delete_upload(upload_id):
|
|||
print(f"Error deleting upload {upload_id}: {e}")
|
||||
return False
|
||||
|
||||
# CSV Export Functions
|
||||
def domains_to_csv(domains: List[Dict]) -> str:
|
||||
"""Convert domains data to CSV format"""
|
||||
csv_output = io.StringIO()
|
||||
|
||||
if not domains:
|
||||
return ""
|
||||
|
||||
# Determine fields based on data
|
||||
# Always include the full_domain field
|
||||
fields = ["full_domain", "timestamp"]
|
||||
if "base_domain" in domains[0]:
|
||||
fields.insert(1, "base_domain")
|
||||
|
||||
# Add headers
|
||||
writer = csv.DictWriter(csv_output, fieldnames=fields, extrasaction='ignore')
|
||||
writer.writeheader()
|
||||
|
||||
# Add data
|
||||
for domain in domains:
|
||||
# Create a row dict with formatted timestamp
|
||||
row = {k: domain.get(k) for k in fields}
|
||||
if "timestamp" in row and row["timestamp"]:
|
||||
# Format timestamp nicely for CSV
|
||||
row["timestamp"] = row["timestamp"].replace('T', ' ').split('.')[0]
|
||||
writer.writerow(row)
|
||||
|
||||
return csv_output.getvalue()
|
||||
|
||||
def dns_records_to_csv(records: List[Dict]) -> str:
|
||||
"""Convert DNS records data to CSV format"""
|
||||
csv_output = io.StringIO()
|
||||
|
||||
if not records:
|
||||
return ""
|
||||
|
||||
# Define the fields to include in the CSV
|
||||
fields = ["domain", "ttl", "record_class", "record_type", "record_data", "timestamp"]
|
||||
|
||||
# Add headers
|
||||
writer = csv.DictWriter(csv_output, fieldnames=fields, extrasaction='ignore')
|
||||
writer.writeheader()
|
||||
|
||||
# Add data
|
||||
for record in records:
|
||||
# Create a row dict with formatted timestamp
|
||||
row = {k: record.get(k) for k in fields}
|
||||
if "timestamp" in row and row["timestamp"]:
|
||||
# Format timestamp nicely for CSV
|
||||
row["timestamp"] = row["timestamp"].replace('T', ' ').split('.')[0]
|
||||
writer.writerow(row)
|
||||
|
||||
return csv_output.getvalue()
|
||||
|
||||
# Routes
|
||||
@app.get("/", response_class=HTMLResponse)
|
||||
async def home(request: Request, upload_id: Optional[str] = None):
|
||||
"""Home page with upload form and SLD listing"""
|
||||
domains = load_domains(upload_id)
|
||||
async def home(
|
||||
request: Request,
|
||||
upload_id: Optional[str] = None,
|
||||
base_domains_only: Optional[bool] = False
|
||||
):
|
||||
"""Home page with upload form and domain listing"""
|
||||
domains = load_domains(upload_id, base_domains_only)
|
||||
uploads = get_uploads()
|
||||
return templates.TemplateResponse(
|
||||
"index.html",
|
||||
{
|
||||
"request": request,
|
||||
"domains": domains,
|
||||
"uploads": uploads
|
||||
"uploads": uploads,
|
||||
"base_domains_only": base_domains_only
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -361,28 +494,23 @@ async def dns_records(
|
|||
upload_id: Optional[str] = None,
|
||||
record_type: Optional[str] = None,
|
||||
record_class: Optional[str] = None,
|
||||
tld: Optional[str] = None,
|
||||
sld: Optional[str] = None,
|
||||
domain: Optional[str] = None
|
||||
domain: Optional[str] = None,
|
||||
deduplicate: Optional[bool] = True # Default to showing only unique latest entries
|
||||
):
|
||||
"""DNS Records page with filtering"""
|
||||
# Get all entries first, based on upload_id if provided
|
||||
entries = load_dns_entries(upload_id)
|
||||
# Get all entries first, based on upload_id if provided, with deduplication option
|
||||
entries = load_dns_entries(upload_id, deduplicate)
|
||||
|
||||
# Apply additional filters if provided
|
||||
if record_type:
|
||||
entries = [e for e in entries if e.get("record_type") == record_type]
|
||||
if record_class:
|
||||
entries = [e for e in entries if e.get("record_class") == record_class]
|
||||
if tld:
|
||||
entries = [e for e in entries if e.get("tld") == tld]
|
||||
if sld:
|
||||
entries = [e for e in entries if e.get("sld") == sld]
|
||||
if domain:
|
||||
entries = [e for e in entries if domain.lower() in e.get("domain", "").lower()]
|
||||
|
||||
# Get unique values for filter dropdowns from all entries (not filtered)
|
||||
all_entries = load_dns_entries(upload_id)
|
||||
all_entries = load_dns_entries(upload_id, deduplicate=False)
|
||||
unique_values = get_unique_values(all_entries)
|
||||
uploads = get_uploads()
|
||||
|
||||
|
@ -392,34 +520,96 @@ async def dns_records(
|
|||
"request": request,
|
||||
"entries": entries,
|
||||
"unique_values": unique_values,
|
||||
"uploads": uploads
|
||||
"uploads": uploads,
|
||||
"deduplicate": deduplicate
|
||||
}
|
||||
)
|
||||
|
||||
@app.get("/export-domains-csv")
|
||||
async def export_domains_csv(
|
||||
upload_id: Optional[str] = None,
|
||||
base_domains_only: Optional[bool] = False
|
||||
):
|
||||
"""Export domains as CSV"""
|
||||
domains = load_domains(upload_id, base_domains_only)
|
||||
csv_content = domains_to_csv(domains)
|
||||
|
||||
# Generate a filename with timestamp
|
||||
filename = f"domains_export_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
|
||||
|
||||
# Return the CSV as a downloadable file
|
||||
return Response(
|
||||
content=csv_content,
|
||||
media_type="text/csv",
|
||||
headers={"Content-Disposition": f"attachment; filename={filename}"}
|
||||
)
|
||||
|
||||
@app.get("/export-dns-csv")
|
||||
async def export_dns_csv(
|
||||
upload_id: Optional[str] = None,
|
||||
record_type: Optional[str] = None,
|
||||
record_class: Optional[str] = None,
|
||||
domain: Optional[str] = None,
|
||||
deduplicate: Optional[bool] = True
|
||||
):
|
||||
"""Export DNS records as CSV"""
|
||||
# Get entries with applied filters
|
||||
entries = load_dns_entries(upload_id, deduplicate)
|
||||
|
||||
# Apply additional filters if provided
|
||||
if record_type:
|
||||
entries = [e for e in entries if e.get("record_type") == record_type]
|
||||
if record_class:
|
||||
entries = [e for e in entries if e.get("record_class") == record_class]
|
||||
if domain:
|
||||
entries = [e for e in entries if domain.lower() in e.get("domain", "").lower()]
|
||||
|
||||
csv_content = dns_records_to_csv(entries)
|
||||
|
||||
# Generate a filename with timestamp
|
||||
filename = f"dns_records_export_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
|
||||
|
||||
# Return the CSV as a downloadable file
|
||||
return Response(
|
||||
content=csv_content,
|
||||
media_type="text/csv",
|
||||
headers={"Content-Disposition": f"attachment; filename={filename}"}
|
||||
)
|
||||
|
||||
# API Routes
|
||||
@app.get("/api/uploads", response_model=List[Dict])
|
||||
async def get_all_uploads():
|
||||
"""API endpoint that returns all uploads"""
|
||||
return get_uploads()
|
||||
|
||||
@app.get("/api/slds", response_model=List[Dict])
|
||||
async def get_slds(upload_id: Optional[str] = None):
|
||||
"""API endpoint that returns all SLDs with optional filter by upload_id"""
|
||||
# The load_domains function now handles deduplication and upload_id filtering
|
||||
domains = load_domains(upload_id)
|
||||
@app.get("/api/domains", response_model=List[Dict])
|
||||
async def get_domains(
|
||||
upload_id: Optional[str] = None,
|
||||
base_domains_only: Optional[bool] = False
|
||||
):
|
||||
"""API endpoint that returns all domains with optional filtering"""
|
||||
# The load_domains function handles deduplication and filtering
|
||||
domains = load_domains(upload_id, base_domains_only)
|
||||
return domains
|
||||
|
||||
@app.get("/api/slds/{sld}", response_model=List[Dict])
|
||||
async def get_domains_by_sld(sld: str, upload_id: Optional[str] = None):
|
||||
"""API endpoint that returns domains for a specific SLD with optional filter by upload_id"""
|
||||
@app.get("/api/base-domains", response_model=List[Dict])
|
||||
async def get_base_domains(upload_id: Optional[str] = None):
|
||||
"""API endpoint that returns only unique base domains"""
|
||||
# Get only the unique base domains
|
||||
base_domains = get_unique_base_domains(upload_id)
|
||||
return base_domains
|
||||
|
||||
@app.get("/api/domains/{domain}", response_model=List[Dict])
|
||||
async def get_domains_by_name(domain: str, upload_id: Optional[str] = None):
|
||||
"""API endpoint that returns domains matching a specific domain name with optional filter by upload_id"""
|
||||
# Get domains, already deduplicated and optionally filtered by upload_id
|
||||
all_domains = load_domains(upload_id)
|
||||
|
||||
# Filter by SLD
|
||||
filtered = [item for item in all_domains if item["sld"].lower() == sld.lower()]
|
||||
# Filter by domain name
|
||||
filtered = [item for item in all_domains if domain.lower() in item["full_domain"].lower()]
|
||||
|
||||
if not filtered:
|
||||
raise HTTPException(status_code=404, detail=f"No domains found with SLD: {sld}")
|
||||
raise HTTPException(status_code=404, detail=f"No domains found matching: {domain}")
|
||||
|
||||
return filtered
|
||||
|
||||
|
@ -427,24 +617,19 @@ async def get_domains_by_sld(sld: str, upload_id: Optional[str] = None):
|
|||
async def get_dns_entries(
|
||||
record_type: Optional[str] = None,
|
||||
record_class: Optional[str] = None,
|
||||
tld: Optional[str] = None,
|
||||
sld: Optional[str] = None,
|
||||
domain: Optional[str] = None,
|
||||
upload_id: Optional[str] = None
|
||||
upload_id: Optional[str] = None,
|
||||
deduplicate: Optional[bool] = True
|
||||
):
|
||||
"""API endpoint that returns filtered DNS entries"""
|
||||
"""API endpoint that returns filtered DNS entries with optional deduplication"""
|
||||
# Get entries - if upload_id is specified, only those entries are returned
|
||||
entries = load_dns_entries(upload_id)
|
||||
entries = load_dns_entries(upload_id, deduplicate)
|
||||
|
||||
# Apply additional filters if provided
|
||||
if record_type:
|
||||
entries = [e for e in entries if e.get("record_type") == record_type]
|
||||
if record_class:
|
||||
entries = [e for e in entries if e.get("record_class") == record_class]
|
||||
if tld:
|
||||
entries = [e for e in entries if e.get("tld") == tld]
|
||||
if sld:
|
||||
entries = [e for e in entries if e.get("sld") == sld]
|
||||
if domain:
|
||||
entries = [e for e in entries if domain.lower() in e.get("domain", "").lower()]
|
||||
|
||||
|
|
|
@ -115,14 +115,10 @@
|
|||
background-color: #e0e0e0;
|
||||
color: #333;
|
||||
}
|
||||
.sld-badge {
|
||||
.domain-badge {
|
||||
background-color: #d1e7dd;
|
||||
color: #0f5132;
|
||||
}
|
||||
.tld-badge {
|
||||
background-color: #cfe2ff;
|
||||
color: #0a58ca;
|
||||
}
|
||||
.service-badge {
|
||||
background-color: #fff3cd;
|
||||
color: #664d03;
|
||||
|
@ -149,6 +145,13 @@
|
|||
margin-left: 10px;
|
||||
font-weight: normal;
|
||||
}
|
||||
.dedup-note {
|
||||
display: block;
|
||||
font-size: 0.7em;
|
||||
color: #666;
|
||||
font-weight: normal;
|
||||
margin-top: 5px;
|
||||
}
|
||||
.tooltip {
|
||||
position: relative;
|
||||
display: inline-block;
|
||||
|
@ -181,14 +184,43 @@
|
|||
width: 100%;
|
||||
}
|
||||
}
|
||||
.table-header {
|
||||
display: flex;
|
||||
justify-content: space-between;
|
||||
align-items: flex-start;
|
||||
margin-bottom: 15px;
|
||||
}
|
||||
.table-actions {
|
||||
display: flex;
|
||||
gap: 10px;
|
||||
margin-top: 10px;
|
||||
}
|
||||
.btn-export {
|
||||
display: inline-flex;
|
||||
align-items: center;
|
||||
gap: 5px;
|
||||
padding: 8px 15px;
|
||||
background-color: #4CAF50;
|
||||
color: white;
|
||||
border-radius: 4px;
|
||||
text-decoration: none;
|
||||
font-weight: bold;
|
||||
font-size: 0.9em;
|
||||
}
|
||||
.btn-export:hover {
|
||||
background-color: #45a049;
|
||||
}
|
||||
.icon {
|
||||
font-size: 1.2em;
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<div class="container">
|
||||
<h1>DNS Entry Viewer</h1>
|
||||
<h1>DNS Records Viewer {% if deduplicate %}(Deduplicated){% else %}(All Records){% endif %}</h1>
|
||||
|
||||
<div class="nav">
|
||||
<a href="/" class="nav-link">SLD View</a>
|
||||
<a href="/" class="nav-link">Domains</a>
|
||||
<a href="/dns-records" class="nav-link">DNS Records</a>
|
||||
</div>
|
||||
|
||||
|
@ -222,35 +254,46 @@
|
|||
{% endfor %}
|
||||
</select>
|
||||
</div>
|
||||
<div class="filter-group">
|
||||
<label for="tld">TLD:</label>
|
||||
<select id="tld" name="tld">
|
||||
<option value="">All TLDs</option>
|
||||
{% for tld in unique_values.tld %}
|
||||
<option value="{{ tld }}" {% if request.query_params.get('tld') == tld %}selected{% endif %}>{{ tld }}</option>
|
||||
{% endfor %}
|
||||
</select>
|
||||
</div>
|
||||
<div class="filter-group">
|
||||
<label for="sld">SLD:</label>
|
||||
<select id="sld" name="sld">
|
||||
<option value="">All SLDs</option>
|
||||
{% for sld in unique_values.sld %}
|
||||
<option value="{{ sld }}" {% if request.query_params.get('sld') == sld %}selected{% endif %}>{{ sld }}</option>
|
||||
{% endfor %}
|
||||
</select>
|
||||
</div>
|
||||
<div class="filter-group">
|
||||
<label for="domain-search">Domain Search:</label>
|
||||
<input type="text" id="domain-search" name="domain" placeholder="Enter domain name..." value="{{ request.query_params.get('domain', '') }}">
|
||||
</div>
|
||||
<div class="filter-group">
|
||||
<label for="deduplicate">Deduplicate Records:</label>
|
||||
<select id="deduplicate" name="deduplicate" class="filter-select">
|
||||
<option value="true" {% if request.query_params.get('deduplicate', 'true') == 'true' %}selected{% endif %}>Yes (Deduplicate Identical Records)</option>
|
||||
<option value="false" {% if request.query_params.get('deduplicate') == 'false' %}selected{% endif %}>No (Show All)</option>
|
||||
</select>
|
||||
</div>
|
||||
<div class="filter-buttons">
|
||||
<button type="submit">Apply Filters</button>
|
||||
<a href="/dns-records" class="reset-button" style="padding: 8px 16px; background-color: #f44336; color: white; text-decoration: none; border-radius: 4px; font-weight: bold; display: inline-block;">Reset</a>
|
||||
</div>
|
||||
</form>
|
||||
|
||||
<div class="api-section">
|
||||
<h3>API Endpoints</h3>
|
||||
<p>Get all DNS entries: <code>/api/dns</code></p>
|
||||
<p>Get filtered DNS entries: <code>/api/dns?record_type=A&domain=example.com</code></p>
|
||||
<p>Filter by upload: <code>/api/dns?upload_id={upload_id}</code></p>
|
||||
<p>Show all records (no deduplication): <code>/api/dns?deduplicate=false</code></p>
|
||||
<p>Get unique filter values: <code>/api/dns/types</code></p>
|
||||
</div>
|
||||
|
||||
<h2>DNS Records <span class="count-badge">{{ entries|length }}</span></h2>
|
||||
<div class="table-header">
|
||||
<h2>DNS Records <span class="count-badge">{{ entries|length }}</span>
|
||||
{% if deduplicate %}
|
||||
<small class="dedup-note">(Showing most recent entries for each Domain+Class+Type+TTL+Data combination)</small>
|
||||
{% endif %}
|
||||
</h2>
|
||||
{% if entries %}
|
||||
<div class="table-actions">
|
||||
<a href="/export-dns-csv?{% if request.query_params.get('upload_id') %}upload_id={{ request.query_params.get('upload_id') }}{% endif %}{% if request.query_params.get('record_type') %}&record_type={{ request.query_params.get('record_type') }}{% endif %}{% if request.query_params.get('record_class') %}&record_class={{ request.query_params.get('record_class') }}{% endif %}{% if request.query_params.get('domain') %}&domain={{ request.query_params.get('domain') }}{% endif %}{% if request.query_params.get('deduplicate') == 'false' %}&deduplicate=false{% endif %}" class="btn-export" title="Export to CSV">
|
||||
<span class="icon">📥</span> Export CSV
|
||||
</a>
|
||||
</div>
|
||||
{% endif %}
|
||||
</div>
|
||||
|
||||
{% if entries %}
|
||||
<table id="dns-table">
|
||||
|
@ -271,11 +314,7 @@
|
|||
{% if entry.get('service') %}
|
||||
<span class="badge service-badge">{{ entry.service }}</span>
|
||||
{% endif %}
|
||||
{% if entry.get('subdomain') %}
|
||||
{{ entry.subdomain }}.
|
||||
{% endif %}
|
||||
<span class="badge sld-badge">{{ entry.sld }}</span>.
|
||||
<span class="badge tld-badge">{{ entry.tld }}</span>
|
||||
<span class="badge domain-badge">{{ entry.domain }}</span>
|
||||
</td>
|
||||
<td>{{ entry.ttl }}</td>
|
||||
<td>{{ entry.record_class }}</td>
|
||||
|
@ -298,14 +337,6 @@
|
|||
{% else %}
|
||||
<p>No DNS entries found. Please upload a CSV file to get started.</p>
|
||||
{% endif %}
|
||||
|
||||
<div class="api-section">
|
||||
<h2>API Endpoints</h2>
|
||||
<p>Get all DNS entries: <code>/api/dns</code></p>
|
||||
<p>Get filtered DNS entries: <code>/api/dns?record_type=A&tld=de</code></p>
|
||||
<p>Filter by upload: <code>/api/dns?upload_id={upload_id}</code></p>
|
||||
<p>Get unique filter values: <code>/api/dns/types</code></p>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<!-- All JavaScript removed, using server-side FastAPI for filtering -->
|
||||
|
|
|
@ -53,7 +53,7 @@
|
|||
tr:hover {
|
||||
background-color: #f5f5f5;
|
||||
}
|
||||
.sld-badge {
|
||||
.domain-badge {
|
||||
display: inline-block;
|
||||
padding: 3px 7px;
|
||||
background-color: #d1e7dd;
|
||||
|
@ -61,7 +61,7 @@
|
|||
font-size: 0.9em;
|
||||
color: #0f5132;
|
||||
}
|
||||
.tld-badge {
|
||||
.base-domain-badge {
|
||||
display: inline-block;
|
||||
padding: 3px 7px;
|
||||
background-color: #cfe2ff;
|
||||
|
@ -69,6 +69,15 @@
|
|||
font-size: 0.9em;
|
||||
color: #0a58ca;
|
||||
}
|
||||
.same-domain-badge {
|
||||
display: inline-block;
|
||||
padding: 3px 7px;
|
||||
background-color: #e9ecef;
|
||||
border-radius: 4px;
|
||||
font-size: 0.9em;
|
||||
color: #6c757d;
|
||||
font-style: italic;
|
||||
}
|
||||
.api-section {
|
||||
margin-top: 30px;
|
||||
padding: 15px;
|
||||
|
@ -135,12 +144,76 @@
|
|||
}
|
||||
.filter-form {
|
||||
margin-bottom: 20px;
|
||||
background-color: #f9f9f9;
|
||||
padding: 15px;
|
||||
border-radius: 5px;
|
||||
}
|
||||
.filter-row {
|
||||
display: flex;
|
||||
flex-wrap: wrap;
|
||||
gap: 15px;
|
||||
align-items: flex-end;
|
||||
}
|
||||
.filter-group {
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
}
|
||||
.filter-group label {
|
||||
font-weight: bold;
|
||||
margin-bottom: 5px;
|
||||
font-size: 0.9em;
|
||||
}
|
||||
.filter-select {
|
||||
padding: 8px 12px;
|
||||
border: 1px solid #ddd;
|
||||
border-radius: 4px;
|
||||
margin-right: 10px;
|
||||
min-width: 150px;
|
||||
}
|
||||
.btn-sm {
|
||||
padding: 8px 16px;
|
||||
font-size: 0.9em;
|
||||
}
|
||||
.reset-button {
|
||||
display: inline-block;
|
||||
padding: 8px 16px;
|
||||
background-color: #f44336;
|
||||
color: white;
|
||||
text-decoration: none;
|
||||
border-radius: 4px;
|
||||
font-weight: bold;
|
||||
font-size: 0.9em;
|
||||
}
|
||||
.reset-button:hover {
|
||||
background-color: #e53935;
|
||||
color: white;
|
||||
}
|
||||
.table-header {
|
||||
display: flex;
|
||||
justify-content: space-between;
|
||||
align-items: center;
|
||||
margin-bottom: 15px;
|
||||
}
|
||||
.table-actions {
|
||||
display: flex;
|
||||
gap: 10px;
|
||||
}
|
||||
.btn-export {
|
||||
display: inline-flex;
|
||||
align-items: center;
|
||||
gap: 5px;
|
||||
padding: 8px 15px;
|
||||
background-color: #4CAF50;
|
||||
color: white;
|
||||
border-radius: 4px;
|
||||
text-decoration: none;
|
||||
font-weight: bold;
|
||||
font-size: 0.9em;
|
||||
}
|
||||
.btn-export:hover {
|
||||
background-color: #45a049;
|
||||
}
|
||||
.icon {
|
||||
font-size: 1.2em;
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
|
@ -149,7 +222,7 @@
|
|||
<h1>Domain Management System</h1>
|
||||
|
||||
<div class="nav">
|
||||
<a href="/" class="nav-link">SLD View</a>
|
||||
<a href="/" class="nav-link">Domains</a>
|
||||
<a href="/dns-records" class="nav-link">DNS Records</a>
|
||||
</div>
|
||||
|
||||
|
@ -207,45 +280,71 @@
|
|||
<div class="filter-form">
|
||||
<h2>Domain List</h2>
|
||||
<form id="filterForm" method="get">
|
||||
<label for="upload_filter">Filter by upload:</label>
|
||||
<select id="upload_filter" name="upload_id" class="filter-select" onchange="this.form.submit()">
|
||||
<option value="">All uploads</option>
|
||||
{% for upload in uploads %}
|
||||
<option value="{{ upload.id }}" {% if request.query_params.get('upload_id') == upload.id %}selected{% endif %}>
|
||||
{{ upload.filename }} - {{ upload.timestamp.replace('T', ' ').split('.')[0] }}
|
||||
</option>
|
||||
{% endfor %}
|
||||
</select>
|
||||
<div class="filter-row">
|
||||
<div class="filter-group">
|
||||
<label for="upload_filter">Filter by upload:</label>
|
||||
<select id="upload_filter" name="upload_id" class="filter-select">
|
||||
<option value="">All uploads</option>
|
||||
{% for upload in uploads %}
|
||||
<option value="{{ upload.id }}" {% if request.query_params.get('upload_id') == upload.id %}selected{% endif %}>
|
||||
{{ upload.filename }} - {{ upload.timestamp.replace('T', ' ').split('.')[0] }}
|
||||
</option>
|
||||
{% endfor %}
|
||||
</select>
|
||||
</div>
|
||||
|
||||
<div class="filter-group">
|
||||
<label for="base_domains_only">Show base domains only:</label>
|
||||
<select id="base_domains_only" name="base_domains_only" class="filter-select">
|
||||
<option value="false" {% if request.query_params.get('base_domains_only', 'false') == 'false' %}selected{% endif %}>No (Show All)</option>
|
||||
<option value="true" {% if request.query_params.get('base_domains_only') == 'true' %}selected{% endif %}>Yes (example.com only)</option>
|
||||
</select>
|
||||
</div>
|
||||
|
||||
<div class="filter-buttons">
|
||||
<button type="submit" class="btn btn-sm">Apply Filters</button>
|
||||
<a href="/" class="reset-button">Reset</a>
|
||||
</div>
|
||||
</div>
|
||||
</form>
|
||||
</div>
|
||||
|
||||
<div class="api-section">
|
||||
<h3>API Endpoints</h3>
|
||||
<p>Get all uploads: <code>/api/uploads</code></p>
|
||||
<p>Get all domains: <code>/api/slds</code></p>
|
||||
<p>Get domains by SLD: <code>/api/slds/{sld}</code></p>
|
||||
<p>Filter by upload: <code>/api/slds?upload_id={upload_id}</code></p>
|
||||
<p>Get all domains: <code>/api/domains</code></p>
|
||||
<p>Get only base domains: <code>/api/base-domains</code> (simplified format: <code>{"domain": "example.com", "timestamp": "..."}</code>)</p>
|
||||
<p>Get domains by name: <code>/api/domains/{domain}</code></p>
|
||||
<p>Filter by upload: <code>/api/domains?upload_id={upload_id}</code></p>
|
||||
<p>Show base domains only: <code>/api/domains?base_domains_only=true</code></p>
|
||||
</div>
|
||||
|
||||
{% if domains %}
|
||||
<p>Found {{ domains|length }} domains{% if request.query_params.get('upload_id') %} in this upload{% endif %}.</p>
|
||||
<div class="table-header">
|
||||
<p>Found {{ domains|length }} domains{% if request.query_params.get('upload_id') %} in this upload{% endif %}.</p>
|
||||
<div class="table-actions">
|
||||
<a href="/export-domains-csv?{% if request.query_params.get('upload_id') %}upload_id={{ request.query_params.get('upload_id') }}{% endif %}{% if request.query_params.get('base_domains_only') == 'true' %}&base_domains_only=true{% endif %}" class="btn-export" title="Export to CSV">
|
||||
<span class="icon">📥</span> Export CSV
|
||||
</a>
|
||||
</div>
|
||||
</div>
|
||||
<table>
|
||||
<thead>
|
||||
<tr>
|
||||
<th>SLD</th>
|
||||
<th>TLD</th>
|
||||
<th>Subdomain</th>
|
||||
<th>Full Domain</th>
|
||||
<th>Domain</th>
|
||||
{% if not base_domains_only %}
|
||||
<th>Base Domain</th>
|
||||
{% endif %}
|
||||
<th>Upload Date</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
{% for item in domains %}
|
||||
<tr>
|
||||
<td><span class="sld-badge">{{ item.sld }}</span></td>
|
||||
<td><span class="tld-badge">{{ item.tld }}</span></td>
|
||||
<td>{{ item.get('subdomain', 'N/A') }}</td>
|
||||
<td>{{ item.full_domain }}</td>
|
||||
<td><span class="domain-badge">{{ item.full_domain }}</span></td>
|
||||
{% if not base_domains_only %}
|
||||
<td>{% if item.base_domain != item.full_domain %}<span class="base-domain-badge">{{ item.base_domain }}</span>{% else %}<span class="same-domain-badge">Same as domain</span>{% endif %}</td>
|
||||
{% endif %}
|
||||
<td>{{ item.timestamp.replace('T', ' ').split('.')[0] if item.get('timestamp') else 'N/A' }}</td>
|
||||
</tr>
|
||||
{% endfor %}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue