Compare commits

..

No commits in common. "616bb8d014b86dfabe14c4724e9a38c143caaf21" and "6ce10f673e3a4559004fbdacb004b86d5ea81861" have entirely different histories.

4 changed files with 168 additions and 530 deletions

View file

@ -85,39 +85,11 @@ Where columns are:
4. Record Type (A, AAAA, MX, CNAME, TXT, etc.) 4. Record Type (A, AAAA, MX, CNAME, TXT, etc.)
5. Record Data (IP address, hostname, or other data depending on record type) 5. Record Data (IP address, hostname, or other data depending on record type)
## Domain Base Name Detection
The application includes functionality to identify base domains from fully qualified domain names, including handling of multi-part TLDs like ".co.uk" or ".com.au".
### Multi-Part TLD List
The application uses a hardcoded list of common multi-part TLDs to correctly extract base domains (e.g., "example.co.uk" from "mail.example.co.uk").
This list can be found in `main.py` as `MULTI_PART_TLDS`.
### Updating the TLD List
To ensure accurate domain parsing, you should periodically update the multi-part TLD list. The best sources for this information are:
1. **Public Suffix List (PSL)**: The most comprehensive and authoritative source
- Website: https://publicsuffix.org/list/
- GitHub: https://github.com/publicsuffix/list
- This list is maintained by Mozilla and used by browsers and DNS applications
2. **IANA's TLD Database**: The official registry of top-level domains
- Website: https://www.iana.org/domains/root/db
3. **Commercial Domain Registrars**: Often provide lists of available TLDs
- Examples: GoDaddy, Namecheap, etc.
For the most accurate and comprehensive implementation, consider implementing a parser for the Public Suffix List or using a library that maintains this list (e.g., `publicsuffix2` for Python).
## API Endpoints ## API Endpoints
- `/api/uploads` - Get all uploads - `/api/uploads` - Get all uploads
- `/api/domains` - Get all domains - `/api/slds` - Get all SLDs (Second Level Domains)
- `/api/base-domains` - Get only unique base domains (e.g., example.com, example.co.uk) with simplified response format - `/api/slds/{sld}` - Get domains by SLD
- `/api/domains/{domain}` - Get domains by name
- `/api/dns` - Get all DNS records - `/api/dns` - Get all DNS records
- `/api/dns/types` - Get unique values for filters - `/api/dns/types` - Get unique values for filters
@ -128,27 +100,8 @@ You can filter the API results using the following query parameters:
- `upload_id` - Filter by specific upload - `upload_id` - Filter by specific upload
- `record_type` - Filter by DNS record type - `record_type` - Filter by DNS record type
- `record_class` - Filter by DNS record class - `record_class` - Filter by DNS record class
- `tld` - Filter by Top Level Domain
- `sld` - Filter by Second Level Domain
- `domain` - Search by domain name - `domain` - Search by domain name
- `base_domains_only` - Only show base domains (e.g., example.com not mail.example.com)
- `deduplicate` - For DNS records, control whether to show all records or deduplicate
Examples: Example: `/api/dns?record_type=A&tld=com&upload_id=upload_20250408120000`
- `/api/domains?base_domains_only=true` - Show only base domains
- `/api/base-domains` - Get a simplified list of unique base domains
- `/api/dns?record_type=A&domain=example.com&deduplicate=false` - Show all A records for example.com without deduplication
### Response Format Examples
1. Base Domains Endpoint (`/api/base-domains`):
```json
[
{
"domain": "example.com",
"timestamp": "2025-04-08T12:00:00"
},
{
"domain": "example.co.uk",
"timestamp": "2025-04-08T12:00:00"
}
]
```

351
main.py
View file

@ -3,7 +3,7 @@ import re
import io import io
import datetime import datetime
from fastapi import FastAPI, Request, HTTPException, Query, UploadFile, File, Form from fastapi import FastAPI, Request, HTTPException, Query, UploadFile, File, Form
from fastapi.responses import HTMLResponse, RedirectResponse, Response from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates from fastapi.templating import Jinja2Templates
import uvicorn import uvicorn
@ -32,11 +32,29 @@ def process_domain_entry(domain_entry):
if domain_entry.endswith('.'): if domain_entry.endswith('.'):
domain_entry = domain_entry[:-1] domain_entry = domain_entry[:-1]
if domain_entry: # Parse domain components
# Store only the full domain name without splitting parts = domain_entry.split('.')
if len(parts) > 1:
# For domain.tld format
if len(parts) == 2:
sld = parts[0] # Second Level Domain
tld = parts[1] # Top Level Domain
domain_info = { domain_info = {
"sld": sld,
"tld": tld,
"full_domain": domain_entry "full_domain": domain_entry
} }
# For subdomain.domain.tld format
else:
sld = parts[-2] # Second Level Domain
tld = parts[-1] # Top Level Domain
subdomain = '.'.join(parts[:-2]) # Subdomains
domain_info = {
"sld": sld,
"tld": tld,
"full_domain": domain_entry,
"subdomain": subdomain
}
return domain_info return domain_info
return None return None
@ -72,7 +90,7 @@ async def process_csv_upload(file_content, upload_id, description=None):
domain_info = process_domain_entry(domain_entry) domain_info = process_domain_entry(domain_entry)
if domain_info: if domain_info:
# Create a unique key to avoid duplicates within this upload # Create a unique key to avoid duplicates within this upload
unique_key = domain_info['full_domain'] unique_key = f"{domain_info['sld']}.{domain_info['tld']}"
if unique_key not in unique_domains: if unique_key not in unique_domains:
unique_domains.add(unique_key) unique_domains.add(unique_key)
@ -109,9 +127,22 @@ async def process_csv_upload(file_content, upload_id, description=None):
"timestamp": timestamp "timestamp": timestamp
} }
# Add special handling for service records # Add domain components
if len(domain_parts) > 0 and domain_parts[0].startswith('_'): # Service records like _dmarc if len(domain_parts) > 1:
if domain_parts[0].startswith('_'): # Service records like _dmarc
entry["service"] = domain_parts[0] entry["service"] = domain_parts[0]
# Adjust domain parts
domain_parts = domain_parts[1:]
# For domain.tld format
if len(domain_parts) == 2:
entry["sld"] = domain_parts[0] # Second Level Domain
entry["tld"] = domain_parts[1] # Top Level Domain
# For subdomain.domain.tld format
elif len(domain_parts) > 2:
entry["sld"] = domain_parts[-2] # Second Level Domain
entry["tld"] = domain_parts[-1] # Top Level Domain
entry["subdomain"] = '.'.join(domain_parts[:-2]) # Subdomains
dns_records_to_insert.append(entry) dns_records_to_insert.append(entry)
@ -139,42 +170,23 @@ async def process_csv_upload(file_content, upload_id, description=None):
print(traceback.format_exc()) print(traceback.format_exc())
return 0, 0 return 0, 0
# Load domains from database - deduplicated by full domain name, with optional base domain filtering # Load domains from database - deduplicated by full domain name
def load_domains(specific_upload_id: str = None, base_domains_only: bool = False) -> List[Dict]: def load_domains(specific_upload_id: str = None) -> List[Dict]:
try: try:
domains = domains_table.all() domains = domains_table.all()
# If a specific upload ID is provided, only show domains from that upload # If a specific upload ID is provided, only show domains from that upload
if specific_upload_id: if specific_upload_id:
domains = [d for d in domains if d.get('upload_id') == specific_upload_id] domains = [d for d in domains if d.get('upload_id') == specific_upload_id]
if not base_domains_only:
return domains return domains
# Add the base_domain field to each domain
for domain in domains:
domain['base_domain'] = extract_base_domain(domain.get('full_domain', ''))
# Sort by timestamp in descending order (newest first) # Sort by timestamp in descending order (newest first)
domains.sort(key=lambda x: x.get('timestamp', ''), reverse=True) domains.sort(key=lambda x: x.get('timestamp', ''), reverse=True)
# Create a dictionary to track unique domains # Create a dictionary to track unique domains by full domain name
unique_domains = {} unique_domains = {}
base_domains_set = set()
# First pass: collect all base domains
if base_domains_only:
for domain in domains:
base_domains_set.add(domain.get('base_domain', ''))
for domain in domains: for domain in domains:
# If base_domains_only is True, only keep domains that are base domains themselves
if base_domains_only:
full_domain = domain.get('full_domain', '')
base_domain = domain.get('base_domain', '')
if full_domain != base_domain:
continue
# Create a unique key based on the full domain name # Create a unique key based on the full domain name
unique_key = domain.get('full_domain', '') unique_key = domain.get('full_domain', '')
@ -189,26 +201,25 @@ def load_domains(specific_upload_id: str = None, base_domains_only: bool = False
print(f"Error loading domains from database: {e}") print(f"Error loading domains from database: {e}")
return [] return []
# Load DNS entries from database - with optional deduplication # Load DNS entries from database - deduplicated by domain, class, and type (no history)
def load_dns_entries(specific_upload_id: str = None, deduplicate: bool = False) -> List[Dict]: def load_dns_entries(specific_upload_id: str = None) -> List[Dict]:
try: try:
entries = dns_records_table.all() entries = dns_records_table.all()
# If a specific upload ID is provided, only show records from that upload # If a specific upload ID is provided, only show records from that upload
if specific_upload_id: if specific_upload_id:
entries = [e for e in entries if e.get('upload_id') == specific_upload_id] entries = [e for e in entries if e.get('upload_id') == specific_upload_id]
return entries
# Sort by timestamp in descending order (newest first) # Sort by timestamp in descending order (newest first)
entries.sort(key=lambda x: x.get('timestamp', ''), reverse=True) entries.sort(key=lambda x: x.get('timestamp', ''), reverse=True)
# If deduplication is requested, only keep the most recent entry for each unique combination
if deduplicate:
# Create a dictionary to track unique entries (most recent only) # Create a dictionary to track unique entries (most recent only)
unique_entries = {} unique_entries = {}
for entry in entries: for entry in entries:
# Create a unique key based on domain, class, type, TTL, and data # Create a unique key based on domain, class, and type
unique_key = f"{entry.get('domain')}:{entry.get('record_class')}:{entry.get('record_type')}:{entry.get('ttl')}:{entry.get('record_data')}" unique_key = f"{entry.get('domain')}:{entry.get('record_class')}:{entry.get('record_type')}"
# Only keep the most recent entry for each unique combination # Only keep the most recent entry for each unique combination
if unique_key not in unique_entries: if unique_key not in unique_entries:
@ -218,102 +229,17 @@ def load_dns_entries(specific_upload_id: str = None, deduplicate: bool = False)
# Return the deduplicated list with only the most recent entries # Return the deduplicated list with only the most recent entries
return list(unique_entries.values()) return list(unique_entries.values())
else:
# No deduplication - return all entries
return entries
except Exception as e: except Exception as e:
print(f"Error loading DNS records from database: {e}") print(f"Error loading DNS records from database: {e}")
return [] return []
# List of known multi-part TLDs
MULTI_PART_TLDS = [
'co.uk', 'org.uk', 'me.uk', 'ac.uk', 'gov.uk', 'net.uk', 'sch.uk',
'com.au', 'net.au', 'org.au', 'edu.au', 'gov.au', 'asn.au', 'id.au',
'co.nz', 'net.nz', 'org.nz', 'govt.nz', 'ac.nz', 'school.nz', 'geek.nz',
'com.sg', 'edu.sg', 'gov.sg', 'net.sg', 'org.sg', 'per.sg',
'co.za', 'org.za', 'web.za', 'net.za', 'gov.za', 'ac.za',
'com.br', 'net.br', 'org.br', 'gov.br', 'edu.br',
'co.jp', 'ac.jp', 'go.jp', 'or.jp', 'ne.jp', 'gr.jp',
'co.in', 'firm.in', 'net.in', 'org.in', 'gen.in', 'ind.in',
'edu.cn', 'gov.cn', 'net.cn', 'org.cn', 'com.cn', 'ac.cn',
'com.mx', 'net.mx', 'org.mx', 'edu.mx', 'gob.mx'
]
# Extract the base domain (SLD+TLD) from a full domain name
def extract_base_domain(domain: str) -> str:
if not domain:
return domain
# Remove trailing dot if present
if domain.endswith('.'):
domain = domain[:-1]
parts = domain.split('.')
# Check if the domain has enough parts
if len(parts) <= 1:
return domain
# Check for known multi-part TLDs first
for tld in MULTI_PART_TLDS:
tld_parts = tld.split('.')
if len(parts) > len(tld_parts) and '.'.join(parts[-len(tld_parts):]) == tld:
# The domain has a multi-part TLD, extract SLD + multi-part TLD
return parts[-len(tld_parts)-1] + '.' + tld
# Default case: extract last two parts
if len(parts) > 1:
return '.'.join(parts[-2:])
return domain
# Get all unique base domains from the database
def get_unique_base_domains(specific_upload_id: str = None) -> List[Dict]:
try:
domains = domains_table.all()
# If a specific upload ID is provided, only show domains from that upload
if specific_upload_id:
domains = [d for d in domains if d.get('upload_id') == specific_upload_id]
# Add the base_domain field to each domain
for domain in domains:
domain['base_domain'] = extract_base_domain(domain.get('full_domain', ''))
# Sort by timestamp in descending order (newest first)
domains.sort(key=lambda x: x.get('timestamp', ''), reverse=True)
# Create dictionaries to track unique base domains
unique_base_domains = {}
# Process each domain and keep only unique base domains
for domain in domains:
base_domain = domain.get('base_domain', '')
# Skip if no base domain
if not base_domain:
continue
# Check if this base domain has been seen before
if base_domain not in unique_base_domains:
# Create a new entry for this base domain - with simplified fields
base_domain_entry = {
'domain': base_domain,
'timestamp': domain.get('timestamp')
}
unique_base_domains[base_domain] = base_domain_entry
# Return the list of unique base domains
return list(unique_base_domains.values())
except Exception as e:
print(f"Error getting unique base domains: {e}")
return []
# Get unique values for filter dropdowns # Get unique values for filter dropdowns
def get_unique_values(entries: List[Dict]) -> Dict[str, Set]: def get_unique_values(entries: List[Dict]) -> Dict[str, Set]:
unique_values = { unique_values = {
"record_type": set(), "record_type": set(),
"record_class": set() "record_class": set(),
"tld": set(),
"sld": set()
} }
for entry in entries: for entry in entries:
@ -350,77 +276,18 @@ def delete_upload(upload_id):
print(f"Error deleting upload {upload_id}: {e}") print(f"Error deleting upload {upload_id}: {e}")
return False return False
# CSV Export Functions
def domains_to_csv(domains: List[Dict]) -> str:
"""Convert domains data to CSV format"""
csv_output = io.StringIO()
if not domains:
return ""
# Determine fields based on data
# Always include the full_domain field
fields = ["full_domain", "timestamp"]
if "base_domain" in domains[0]:
fields.insert(1, "base_domain")
# Add headers
writer = csv.DictWriter(csv_output, fieldnames=fields, extrasaction='ignore')
writer.writeheader()
# Add data
for domain in domains:
# Create a row dict with formatted timestamp
row = {k: domain.get(k) for k in fields}
if "timestamp" in row and row["timestamp"]:
# Format timestamp nicely for CSV
row["timestamp"] = row["timestamp"].replace('T', ' ').split('.')[0]
writer.writerow(row)
return csv_output.getvalue()
def dns_records_to_csv(records: List[Dict]) -> str:
"""Convert DNS records data to CSV format"""
csv_output = io.StringIO()
if not records:
return ""
# Define the fields to include in the CSV
fields = ["domain", "ttl", "record_class", "record_type", "record_data", "timestamp"]
# Add headers
writer = csv.DictWriter(csv_output, fieldnames=fields, extrasaction='ignore')
writer.writeheader()
# Add data
for record in records:
# Create a row dict with formatted timestamp
row = {k: record.get(k) for k in fields}
if "timestamp" in row and row["timestamp"]:
# Format timestamp nicely for CSV
row["timestamp"] = row["timestamp"].replace('T', ' ').split('.')[0]
writer.writerow(row)
return csv_output.getvalue()
# Routes # Routes
@app.get("/", response_class=HTMLResponse) @app.get("/", response_class=HTMLResponse)
async def home( async def home(request: Request, upload_id: Optional[str] = None):
request: Request, """Home page with upload form and SLD listing"""
upload_id: Optional[str] = None, domains = load_domains(upload_id)
base_domains_only: Optional[bool] = False
):
"""Home page with upload form and domain listing"""
domains = load_domains(upload_id, base_domains_only)
uploads = get_uploads() uploads = get_uploads()
return templates.TemplateResponse( return templates.TemplateResponse(
"index.html", "index.html",
{ {
"request": request, "request": request,
"domains": domains, "domains": domains,
"uploads": uploads, "uploads": uploads
"base_domains_only": base_domains_only
} }
) )
@ -494,23 +361,28 @@ async def dns_records(
upload_id: Optional[str] = None, upload_id: Optional[str] = None,
record_type: Optional[str] = None, record_type: Optional[str] = None,
record_class: Optional[str] = None, record_class: Optional[str] = None,
domain: Optional[str] = None, tld: Optional[str] = None,
deduplicate: Optional[bool] = True # Default to showing only unique latest entries sld: Optional[str] = None,
domain: Optional[str] = None
): ):
"""DNS Records page with filtering""" """DNS Records page with filtering"""
# Get all entries first, based on upload_id if provided, with deduplication option # Get all entries first, based on upload_id if provided
entries = load_dns_entries(upload_id, deduplicate) entries = load_dns_entries(upload_id)
# Apply additional filters if provided # Apply additional filters if provided
if record_type: if record_type:
entries = [e for e in entries if e.get("record_type") == record_type] entries = [e for e in entries if e.get("record_type") == record_type]
if record_class: if record_class:
entries = [e for e in entries if e.get("record_class") == record_class] entries = [e for e in entries if e.get("record_class") == record_class]
if tld:
entries = [e for e in entries if e.get("tld") == tld]
if sld:
entries = [e for e in entries if e.get("sld") == sld]
if domain: if domain:
entries = [e for e in entries if domain.lower() in e.get("domain", "").lower()] entries = [e for e in entries if domain.lower() in e.get("domain", "").lower()]
# Get unique values for filter dropdowns from all entries (not filtered) # Get unique values for filter dropdowns from all entries (not filtered)
all_entries = load_dns_entries(upload_id, deduplicate=False) all_entries = load_dns_entries(upload_id)
unique_values = get_unique_values(all_entries) unique_values = get_unique_values(all_entries)
uploads = get_uploads() uploads = get_uploads()
@ -520,96 +392,34 @@ async def dns_records(
"request": request, "request": request,
"entries": entries, "entries": entries,
"unique_values": unique_values, "unique_values": unique_values,
"uploads": uploads, "uploads": uploads
"deduplicate": deduplicate
} }
) )
@app.get("/export-domains-csv")
async def export_domains_csv(
upload_id: Optional[str] = None,
base_domains_only: Optional[bool] = False
):
"""Export domains as CSV"""
domains = load_domains(upload_id, base_domains_only)
csv_content = domains_to_csv(domains)
# Generate a filename with timestamp
filename = f"domains_export_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
# Return the CSV as a downloadable file
return Response(
content=csv_content,
media_type="text/csv",
headers={"Content-Disposition": f"attachment; filename={filename}"}
)
@app.get("/export-dns-csv")
async def export_dns_csv(
upload_id: Optional[str] = None,
record_type: Optional[str] = None,
record_class: Optional[str] = None,
domain: Optional[str] = None,
deduplicate: Optional[bool] = True
):
"""Export DNS records as CSV"""
# Get entries with applied filters
entries = load_dns_entries(upload_id, deduplicate)
# Apply additional filters if provided
if record_type:
entries = [e for e in entries if e.get("record_type") == record_type]
if record_class:
entries = [e for e in entries if e.get("record_class") == record_class]
if domain:
entries = [e for e in entries if domain.lower() in e.get("domain", "").lower()]
csv_content = dns_records_to_csv(entries)
# Generate a filename with timestamp
filename = f"dns_records_export_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
# Return the CSV as a downloadable file
return Response(
content=csv_content,
media_type="text/csv",
headers={"Content-Disposition": f"attachment; filename={filename}"}
)
# API Routes # API Routes
@app.get("/api/uploads", response_model=List[Dict]) @app.get("/api/uploads", response_model=List[Dict])
async def get_all_uploads(): async def get_all_uploads():
"""API endpoint that returns all uploads""" """API endpoint that returns all uploads"""
return get_uploads() return get_uploads()
@app.get("/api/domains", response_model=List[Dict]) @app.get("/api/slds", response_model=List[Dict])
async def get_domains( async def get_slds(upload_id: Optional[str] = None):
upload_id: Optional[str] = None, """API endpoint that returns all SLDs with optional filter by upload_id"""
base_domains_only: Optional[bool] = False # The load_domains function now handles deduplication and upload_id filtering
): domains = load_domains(upload_id)
"""API endpoint that returns all domains with optional filtering"""
# The load_domains function handles deduplication and filtering
domains = load_domains(upload_id, base_domains_only)
return domains return domains
@app.get("/api/base-domains", response_model=List[Dict]) @app.get("/api/slds/{sld}", response_model=List[Dict])
async def get_base_domains(upload_id: Optional[str] = None): async def get_domains_by_sld(sld: str, upload_id: Optional[str] = None):
"""API endpoint that returns only unique base domains""" """API endpoint that returns domains for a specific SLD with optional filter by upload_id"""
# Get only the unique base domains
base_domains = get_unique_base_domains(upload_id)
return base_domains
@app.get("/api/domains/{domain}", response_model=List[Dict])
async def get_domains_by_name(domain: str, upload_id: Optional[str] = None):
"""API endpoint that returns domains matching a specific domain name with optional filter by upload_id"""
# Get domains, already deduplicated and optionally filtered by upload_id # Get domains, already deduplicated and optionally filtered by upload_id
all_domains = load_domains(upload_id) all_domains = load_domains(upload_id)
# Filter by domain name # Filter by SLD
filtered = [item for item in all_domains if domain.lower() in item["full_domain"].lower()] filtered = [item for item in all_domains if item["sld"].lower() == sld.lower()]
if not filtered: if not filtered:
raise HTTPException(status_code=404, detail=f"No domains found matching: {domain}") raise HTTPException(status_code=404, detail=f"No domains found with SLD: {sld}")
return filtered return filtered
@ -617,19 +427,24 @@ async def get_domains_by_name(domain: str, upload_id: Optional[str] = None):
async def get_dns_entries( async def get_dns_entries(
record_type: Optional[str] = None, record_type: Optional[str] = None,
record_class: Optional[str] = None, record_class: Optional[str] = None,
tld: Optional[str] = None,
sld: Optional[str] = None,
domain: Optional[str] = None, domain: Optional[str] = None,
upload_id: Optional[str] = None, upload_id: Optional[str] = None
deduplicate: Optional[bool] = True
): ):
"""API endpoint that returns filtered DNS entries with optional deduplication""" """API endpoint that returns filtered DNS entries"""
# Get entries - if upload_id is specified, only those entries are returned # Get entries - if upload_id is specified, only those entries are returned
entries = load_dns_entries(upload_id, deduplicate) entries = load_dns_entries(upload_id)
# Apply additional filters if provided # Apply additional filters if provided
if record_type: if record_type:
entries = [e for e in entries if e.get("record_type") == record_type] entries = [e for e in entries if e.get("record_type") == record_type]
if record_class: if record_class:
entries = [e for e in entries if e.get("record_class") == record_class] entries = [e for e in entries if e.get("record_class") == record_class]
if tld:
entries = [e for e in entries if e.get("tld") == tld]
if sld:
entries = [e for e in entries if e.get("sld") == sld]
if domain: if domain:
entries = [e for e in entries if domain.lower() in e.get("domain", "").lower()] entries = [e for e in entries if domain.lower() in e.get("domain", "").lower()]

View file

@ -115,10 +115,14 @@
background-color: #e0e0e0; background-color: #e0e0e0;
color: #333; color: #333;
} }
.domain-badge { .sld-badge {
background-color: #d1e7dd; background-color: #d1e7dd;
color: #0f5132; color: #0f5132;
} }
.tld-badge {
background-color: #cfe2ff;
color: #0a58ca;
}
.service-badge { .service-badge {
background-color: #fff3cd; background-color: #fff3cd;
color: #664d03; color: #664d03;
@ -145,13 +149,6 @@
margin-left: 10px; margin-left: 10px;
font-weight: normal; font-weight: normal;
} }
.dedup-note {
display: block;
font-size: 0.7em;
color: #666;
font-weight: normal;
margin-top: 5px;
}
.tooltip { .tooltip {
position: relative; position: relative;
display: inline-block; display: inline-block;
@ -184,43 +181,14 @@
width: 100%; width: 100%;
} }
} }
.table-header {
display: flex;
justify-content: space-between;
align-items: flex-start;
margin-bottom: 15px;
}
.table-actions {
display: flex;
gap: 10px;
margin-top: 10px;
}
.btn-export {
display: inline-flex;
align-items: center;
gap: 5px;
padding: 8px 15px;
background-color: #4CAF50;
color: white;
border-radius: 4px;
text-decoration: none;
font-weight: bold;
font-size: 0.9em;
}
.btn-export:hover {
background-color: #45a049;
}
.icon {
font-size: 1.2em;
}
</style> </style>
</head> </head>
<body> <body>
<div class="container"> <div class="container">
<h1>DNS Records Viewer {% if deduplicate %}(Deduplicated){% else %}(All Records){% endif %}</h1> <h1>DNS Entry Viewer</h1>
<div class="nav"> <div class="nav">
<a href="/" class="nav-link">Domains</a> <a href="/" class="nav-link">SLD View</a>
<a href="/dns-records" class="nav-link">DNS Records</a> <a href="/dns-records" class="nav-link">DNS Records</a>
</div> </div>
@ -255,45 +223,34 @@
</select> </select>
</div> </div>
<div class="filter-group"> <div class="filter-group">
<label for="domain-search">Domain Search:</label> <label for="tld">TLD:</label>
<input type="text" id="domain-search" name="domain" placeholder="Enter domain name..." value="{{ request.query_params.get('domain', '') }}"> <select id="tld" name="tld">
<option value="">All TLDs</option>
{% for tld in unique_values.tld %}
<option value="{{ tld }}" {% if request.query_params.get('tld') == tld %}selected{% endif %}>{{ tld }}</option>
{% endfor %}
</select>
</div> </div>
<div class="filter-group"> <div class="filter-group">
<label for="deduplicate">Deduplicate Records:</label> <label for="sld">SLD:</label>
<select id="deduplicate" name="deduplicate" class="filter-select"> <select id="sld" name="sld">
<option value="true" {% if request.query_params.get('deduplicate', 'true') == 'true' %}selected{% endif %}>Yes (Deduplicate Identical Records)</option> <option value="">All SLDs</option>
<option value="false" {% if request.query_params.get('deduplicate') == 'false' %}selected{% endif %}>No (Show All)</option> {% for sld in unique_values.sld %}
<option value="{{ sld }}" {% if request.query_params.get('sld') == sld %}selected{% endif %}>{{ sld }}</option>
{% endfor %}
</select> </select>
</div> </div>
<div class="filter-group">
<label for="domain-search">Domain Search:</label>
<input type="text" id="domain-search" name="domain" placeholder="Enter domain name..." value="{{ request.query_params.get('domain', '') }}">
</div>
<div class="filter-buttons"> <div class="filter-buttons">
<button type="submit">Apply Filters</button> <button type="submit">Apply Filters</button>
<a href="/dns-records" class="reset-button" style="padding: 8px 16px; background-color: #f44336; color: white; text-decoration: none; border-radius: 4px; font-weight: bold; display: inline-block;">Reset</a> <a href="/dns-records" class="reset-button" style="padding: 8px 16px; background-color: #f44336; color: white; text-decoration: none; border-radius: 4px; font-weight: bold; display: inline-block;">Reset</a>
</div> </div>
</form> </form>
<div class="api-section"> <h2>DNS Records <span class="count-badge">{{ entries|length }}</span></h2>
<h3>API Endpoints</h3>
<p>Get all DNS entries: <code>/api/dns</code></p>
<p>Get filtered DNS entries: <code>/api/dns?record_type=A&domain=example.com</code></p>
<p>Filter by upload: <code>/api/dns?upload_id={upload_id}</code></p>
<p>Show all records (no deduplication): <code>/api/dns?deduplicate=false</code></p>
<p>Get unique filter values: <code>/api/dns/types</code></p>
</div>
<div class="table-header">
<h2>DNS Records <span class="count-badge">{{ entries|length }}</span>
{% if deduplicate %}
<small class="dedup-note">(Showing most recent entries for each Domain+Class+Type+TTL+Data combination)</small>
{% endif %}
</h2>
{% if entries %}
<div class="table-actions">
<a href="/export-dns-csv?{% if request.query_params.get('upload_id') %}upload_id={{ request.query_params.get('upload_id') }}{% endif %}{% if request.query_params.get('record_type') %}&record_type={{ request.query_params.get('record_type') }}{% endif %}{% if request.query_params.get('record_class') %}&record_class={{ request.query_params.get('record_class') }}{% endif %}{% if request.query_params.get('domain') %}&domain={{ request.query_params.get('domain') }}{% endif %}{% if request.query_params.get('deduplicate') == 'false' %}&deduplicate=false{% endif %}" class="btn-export" title="Export to CSV">
<span class="icon">📥</span> Export CSV
</a>
</div>
{% endif %}
</div>
{% if entries %} {% if entries %}
<table id="dns-table"> <table id="dns-table">
@ -314,7 +271,11 @@
{% if entry.get('service') %} {% if entry.get('service') %}
<span class="badge service-badge">{{ entry.service }}</span> <span class="badge service-badge">{{ entry.service }}</span>
{% endif %} {% endif %}
<span class="badge domain-badge">{{ entry.domain }}</span> {% if entry.get('subdomain') %}
{{ entry.subdomain }}.
{% endif %}
<span class="badge sld-badge">{{ entry.sld }}</span>.
<span class="badge tld-badge">{{ entry.tld }}</span>
</td> </td>
<td>{{ entry.ttl }}</td> <td>{{ entry.ttl }}</td>
<td>{{ entry.record_class }}</td> <td>{{ entry.record_class }}</td>
@ -337,6 +298,14 @@
{% else %} {% else %}
<p>No DNS entries found. Please upload a CSV file to get started.</p> <p>No DNS entries found. Please upload a CSV file to get started.</p>
{% endif %} {% endif %}
<div class="api-section">
<h2>API Endpoints</h2>
<p>Get all DNS entries: <code>/api/dns</code></p>
<p>Get filtered DNS entries: <code>/api/dns?record_type=A&tld=de</code></p>
<p>Filter by upload: <code>/api/dns?upload_id={upload_id}</code></p>
<p>Get unique filter values: <code>/api/dns/types</code></p>
</div>
</div> </div>
<!-- All JavaScript removed, using server-side FastAPI for filtering --> <!-- All JavaScript removed, using server-side FastAPI for filtering -->

View file

@ -53,7 +53,7 @@
tr:hover { tr:hover {
background-color: #f5f5f5; background-color: #f5f5f5;
} }
.domain-badge { .sld-badge {
display: inline-block; display: inline-block;
padding: 3px 7px; padding: 3px 7px;
background-color: #d1e7dd; background-color: #d1e7dd;
@ -61,7 +61,7 @@
font-size: 0.9em; font-size: 0.9em;
color: #0f5132; color: #0f5132;
} }
.base-domain-badge { .tld-badge {
display: inline-block; display: inline-block;
padding: 3px 7px; padding: 3px 7px;
background-color: #cfe2ff; background-color: #cfe2ff;
@ -69,15 +69,6 @@
font-size: 0.9em; font-size: 0.9em;
color: #0a58ca; color: #0a58ca;
} }
.same-domain-badge {
display: inline-block;
padding: 3px 7px;
background-color: #e9ecef;
border-radius: 4px;
font-size: 0.9em;
color: #6c757d;
font-style: italic;
}
.api-section { .api-section {
margin-top: 30px; margin-top: 30px;
padding: 15px; padding: 15px;
@ -144,76 +135,12 @@
} }
.filter-form { .filter-form {
margin-bottom: 20px; margin-bottom: 20px;
background-color: #f9f9f9;
padding: 15px;
border-radius: 5px;
}
.filter-row {
display: flex;
flex-wrap: wrap;
gap: 15px;
align-items: flex-end;
}
.filter-group {
display: flex;
flex-direction: column;
}
.filter-group label {
font-weight: bold;
margin-bottom: 5px;
font-size: 0.9em;
} }
.filter-select { .filter-select {
padding: 8px 12px; padding: 8px 12px;
border: 1px solid #ddd; border: 1px solid #ddd;
border-radius: 4px; border-radius: 4px;
min-width: 150px; margin-right: 10px;
}
.btn-sm {
padding: 8px 16px;
font-size: 0.9em;
}
.reset-button {
display: inline-block;
padding: 8px 16px;
background-color: #f44336;
color: white;
text-decoration: none;
border-radius: 4px;
font-weight: bold;
font-size: 0.9em;
}
.reset-button:hover {
background-color: #e53935;
color: white;
}
.table-header {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 15px;
}
.table-actions {
display: flex;
gap: 10px;
}
.btn-export {
display: inline-flex;
align-items: center;
gap: 5px;
padding: 8px 15px;
background-color: #4CAF50;
color: white;
border-radius: 4px;
text-decoration: none;
font-weight: bold;
font-size: 0.9em;
}
.btn-export:hover {
background-color: #45a049;
}
.icon {
font-size: 1.2em;
} }
</style> </style>
</head> </head>
@ -222,7 +149,7 @@
<h1>Domain Management System</h1> <h1>Domain Management System</h1>
<div class="nav"> <div class="nav">
<a href="/" class="nav-link">Domains</a> <a href="/" class="nav-link">SLD View</a>
<a href="/dns-records" class="nav-link">DNS Records</a> <a href="/dns-records" class="nav-link">DNS Records</a>
</div> </div>
@ -280,10 +207,8 @@
<div class="filter-form"> <div class="filter-form">
<h2>Domain List</h2> <h2>Domain List</h2>
<form id="filterForm" method="get"> <form id="filterForm" method="get">
<div class="filter-row">
<div class="filter-group">
<label for="upload_filter">Filter by upload:</label> <label for="upload_filter">Filter by upload:</label>
<select id="upload_filter" name="upload_id" class="filter-select"> <select id="upload_filter" name="upload_id" class="filter-select" onchange="this.form.submit()">
<option value="">All uploads</option> <option value="">All uploads</option>
{% for upload in uploads %} {% for upload in uploads %}
<option value="{{ upload.id }}" {% if request.query_params.get('upload_id') == upload.id %}selected{% endif %}> <option value="{{ upload.id }}" {% if request.query_params.get('upload_id') == upload.id %}selected{% endif %}>
@ -291,60 +216,36 @@
</option> </option>
{% endfor %} {% endfor %}
</select> </select>
</div>
<div class="filter-group">
<label for="base_domains_only">Show base domains only:</label>
<select id="base_domains_only" name="base_domains_only" class="filter-select">
<option value="false" {% if request.query_params.get('base_domains_only', 'false') == 'false' %}selected{% endif %}>No (Show All)</option>
<option value="true" {% if request.query_params.get('base_domains_only') == 'true' %}selected{% endif %}>Yes (example.com only)</option>
</select>
</div>
<div class="filter-buttons">
<button type="submit" class="btn btn-sm">Apply Filters</button>
<a href="/" class="reset-button">Reset</a>
</div>
</div>
</form> </form>
</div> </div>
<div class="api-section"> <div class="api-section">
<h3>API Endpoints</h3> <h3>API Endpoints</h3>
<p>Get all uploads: <code>/api/uploads</code></p> <p>Get all uploads: <code>/api/uploads</code></p>
<p>Get all domains: <code>/api/domains</code></p> <p>Get all domains: <code>/api/slds</code></p>
<p>Get only base domains: <code>/api/base-domains</code> (simplified format: <code>{"domain": "example.com", "timestamp": "..."}</code>)</p> <p>Get domains by SLD: <code>/api/slds/{sld}</code></p>
<p>Get domains by name: <code>/api/domains/{domain}</code></p> <p>Filter by upload: <code>/api/slds?upload_id={upload_id}</code></p>
<p>Filter by upload: <code>/api/domains?upload_id={upload_id}</code></p>
<p>Show base domains only: <code>/api/domains?base_domains_only=true</code></p>
</div> </div>
{% if domains %} {% if domains %}
<div class="table-header">
<p>Found {{ domains|length }} domains{% if request.query_params.get('upload_id') %} in this upload{% endif %}.</p> <p>Found {{ domains|length }} domains{% if request.query_params.get('upload_id') %} in this upload{% endif %}.</p>
<div class="table-actions">
<a href="/export-domains-csv?{% if request.query_params.get('upload_id') %}upload_id={{ request.query_params.get('upload_id') }}{% endif %}{% if request.query_params.get('base_domains_only') == 'true' %}&base_domains_only=true{% endif %}" class="btn-export" title="Export to CSV">
<span class="icon">📥</span> Export CSV
</a>
</div>
</div>
<table> <table>
<thead> <thead>
<tr> <tr>
<th>Domain</th> <th>SLD</th>
{% if not base_domains_only %} <th>TLD</th>
<th>Base Domain</th> <th>Subdomain</th>
{% endif %} <th>Full Domain</th>
<th>Upload Date</th> <th>Upload Date</th>
</tr> </tr>
</thead> </thead>
<tbody> <tbody>
{% for item in domains %} {% for item in domains %}
<tr> <tr>
<td><span class="domain-badge">{{ item.full_domain }}</span></td> <td><span class="sld-badge">{{ item.sld }}</span></td>
{% if not base_domains_only %} <td><span class="tld-badge">{{ item.tld }}</span></td>
<td>{% if item.base_domain != item.full_domain %}<span class="base-domain-badge">{{ item.base_domain }}</span>{% else %}<span class="same-domain-badge">Same as domain</span>{% endif %}</td> <td>{{ item.get('subdomain', 'N/A') }}</td>
{% endif %} <td>{{ item.full_domain }}</td>
<td>{{ item.timestamp.replace('T', ' ').split('.')[0] if item.get('timestamp') else 'N/A' }}</td> <td>{{ item.timestamp.replace('T', ' ').split('.')[0] if item.get('timestamp') else 'N/A' }}</td>
</tr> </tr>
{% endfor %} {% endfor %}