CHANGE SLD+TLD to FQDN, CHANGE dedublication to show all entries
This commit is contained in:
parent
6ce10f673e
commit
fc72f6f51c
3 changed files with 83 additions and 142 deletions
140
main.py
140
main.py
|
@ -32,29 +32,11 @@ def process_domain_entry(domain_entry):
|
|||
if domain_entry.endswith('.'):
|
||||
domain_entry = domain_entry[:-1]
|
||||
|
||||
# Parse domain components
|
||||
parts = domain_entry.split('.')
|
||||
if len(parts) > 1:
|
||||
# For domain.tld format
|
||||
if len(parts) == 2:
|
||||
sld = parts[0] # Second Level Domain
|
||||
tld = parts[1] # Top Level Domain
|
||||
domain_info = {
|
||||
"sld": sld,
|
||||
"tld": tld,
|
||||
"full_domain": domain_entry
|
||||
}
|
||||
# For subdomain.domain.tld format
|
||||
else:
|
||||
sld = parts[-2] # Second Level Domain
|
||||
tld = parts[-1] # Top Level Domain
|
||||
subdomain = '.'.join(parts[:-2]) # Subdomains
|
||||
domain_info = {
|
||||
"sld": sld,
|
||||
"tld": tld,
|
||||
"full_domain": domain_entry,
|
||||
"subdomain": subdomain
|
||||
}
|
||||
if domain_entry:
|
||||
# Store only the full domain name without splitting
|
||||
domain_info = {
|
||||
"full_domain": domain_entry
|
||||
}
|
||||
return domain_info
|
||||
return None
|
||||
|
||||
|
@ -90,7 +72,7 @@ async def process_csv_upload(file_content, upload_id, description=None):
|
|||
domain_info = process_domain_entry(domain_entry)
|
||||
if domain_info:
|
||||
# Create a unique key to avoid duplicates within this upload
|
||||
unique_key = f"{domain_info['sld']}.{domain_info['tld']}"
|
||||
unique_key = domain_info['full_domain']
|
||||
|
||||
if unique_key not in unique_domains:
|
||||
unique_domains.add(unique_key)
|
||||
|
@ -127,22 +109,9 @@ async def process_csv_upload(file_content, upload_id, description=None):
|
|||
"timestamp": timestamp
|
||||
}
|
||||
|
||||
# Add domain components
|
||||
if len(domain_parts) > 1:
|
||||
if domain_parts[0].startswith('_'): # Service records like _dmarc
|
||||
entry["service"] = domain_parts[0]
|
||||
# Adjust domain parts
|
||||
domain_parts = domain_parts[1:]
|
||||
|
||||
# For domain.tld format
|
||||
if len(domain_parts) == 2:
|
||||
entry["sld"] = domain_parts[0] # Second Level Domain
|
||||
entry["tld"] = domain_parts[1] # Top Level Domain
|
||||
# For subdomain.domain.tld format
|
||||
elif len(domain_parts) > 2:
|
||||
entry["sld"] = domain_parts[-2] # Second Level Domain
|
||||
entry["tld"] = domain_parts[-1] # Top Level Domain
|
||||
entry["subdomain"] = '.'.join(domain_parts[:-2]) # Subdomains
|
||||
# Add special handling for service records
|
||||
if len(domain_parts) > 0 and domain_parts[0].startswith('_'): # Service records like _dmarc
|
||||
entry["service"] = domain_parts[0]
|
||||
|
||||
dns_records_to_insert.append(entry)
|
||||
|
||||
|
@ -201,34 +170,38 @@ def load_domains(specific_upload_id: str = None) -> List[Dict]:
|
|||
print(f"Error loading domains from database: {e}")
|
||||
return []
|
||||
|
||||
# Load DNS entries from database - deduplicated by domain, class, and type (no history)
|
||||
def load_dns_entries(specific_upload_id: str = None) -> List[Dict]:
|
||||
# Load DNS entries from database - with optional deduplication
|
||||
def load_dns_entries(specific_upload_id: str = None, deduplicate: bool = False) -> List[Dict]:
|
||||
try:
|
||||
entries = dns_records_table.all()
|
||||
|
||||
# If a specific upload ID is provided, only show records from that upload
|
||||
if specific_upload_id:
|
||||
entries = [e for e in entries if e.get('upload_id') == specific_upload_id]
|
||||
return entries
|
||||
|
||||
# Sort by timestamp in descending order (newest first)
|
||||
entries.sort(key=lambda x: x.get('timestamp', ''), reverse=True)
|
||||
|
||||
# Create a dictionary to track unique entries (most recent only)
|
||||
unique_entries = {}
|
||||
|
||||
for entry in entries:
|
||||
# Create a unique key based on domain, class, and type
|
||||
unique_key = f"{entry.get('domain')}:{entry.get('record_class')}:{entry.get('record_type')}"
|
||||
# If deduplication is requested, only keep the most recent entry for each unique combination
|
||||
if deduplicate:
|
||||
# Create a dictionary to track unique entries (most recent only)
|
||||
unique_entries = {}
|
||||
|
||||
# Only keep the most recent entry for each unique combination
|
||||
if unique_key not in unique_entries:
|
||||
# Mark as most recent entry
|
||||
entry['is_latest'] = True
|
||||
unique_entries[unique_key] = entry
|
||||
|
||||
# Return the deduplicated list with only the most recent entries
|
||||
return list(unique_entries.values())
|
||||
for entry in entries:
|
||||
# Create a unique key based on domain, class, type, TTL, and data
|
||||
unique_key = f"{entry.get('domain')}:{entry.get('record_class')}:{entry.get('record_type')}:{entry.get('ttl')}:{entry.get('record_data')}"
|
||||
|
||||
# Only keep the most recent entry for each unique combination
|
||||
if unique_key not in unique_entries:
|
||||
# Mark as most recent entry
|
||||
entry['is_latest'] = True
|
||||
unique_entries[unique_key] = entry
|
||||
|
||||
# Return the deduplicated list with only the most recent entries
|
||||
return list(unique_entries.values())
|
||||
else:
|
||||
# No deduplication - return all entries
|
||||
return entries
|
||||
except Exception as e:
|
||||
print(f"Error loading DNS records from database: {e}")
|
||||
return []
|
||||
|
@ -237,9 +210,7 @@ def load_dns_entries(specific_upload_id: str = None) -> List[Dict]:
|
|||
def get_unique_values(entries: List[Dict]) -> Dict[str, Set]:
|
||||
unique_values = {
|
||||
"record_type": set(),
|
||||
"record_class": set(),
|
||||
"tld": set(),
|
||||
"sld": set()
|
||||
"record_class": set()
|
||||
}
|
||||
|
||||
for entry in entries:
|
||||
|
@ -361,28 +332,23 @@ async def dns_records(
|
|||
upload_id: Optional[str] = None,
|
||||
record_type: Optional[str] = None,
|
||||
record_class: Optional[str] = None,
|
||||
tld: Optional[str] = None,
|
||||
sld: Optional[str] = None,
|
||||
domain: Optional[str] = None
|
||||
domain: Optional[str] = None,
|
||||
deduplicate: Optional[bool] = True # Default to showing only unique latest entries
|
||||
):
|
||||
"""DNS Records page with filtering"""
|
||||
# Get all entries first, based on upload_id if provided
|
||||
entries = load_dns_entries(upload_id)
|
||||
# Get all entries first, based on upload_id if provided, with deduplication option
|
||||
entries = load_dns_entries(upload_id, deduplicate)
|
||||
|
||||
# Apply additional filters if provided
|
||||
if record_type:
|
||||
entries = [e for e in entries if e.get("record_type") == record_type]
|
||||
if record_class:
|
||||
entries = [e for e in entries if e.get("record_class") == record_class]
|
||||
if tld:
|
||||
entries = [e for e in entries if e.get("tld") == tld]
|
||||
if sld:
|
||||
entries = [e for e in entries if e.get("sld") == sld]
|
||||
if domain:
|
||||
entries = [e for e in entries if domain.lower() in e.get("domain", "").lower()]
|
||||
|
||||
# Get unique values for filter dropdowns from all entries (not filtered)
|
||||
all_entries = load_dns_entries(upload_id)
|
||||
all_entries = load_dns_entries(upload_id, deduplicate=False)
|
||||
unique_values = get_unique_values(all_entries)
|
||||
uploads = get_uploads()
|
||||
|
||||
|
@ -392,7 +358,8 @@ async def dns_records(
|
|||
"request": request,
|
||||
"entries": entries,
|
||||
"unique_values": unique_values,
|
||||
"uploads": uploads
|
||||
"uploads": uploads,
|
||||
"deduplicate": deduplicate
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -402,24 +369,24 @@ async def get_all_uploads():
|
|||
"""API endpoint that returns all uploads"""
|
||||
return get_uploads()
|
||||
|
||||
@app.get("/api/slds", response_model=List[Dict])
|
||||
async def get_slds(upload_id: Optional[str] = None):
|
||||
"""API endpoint that returns all SLDs with optional filter by upload_id"""
|
||||
@app.get("/api/domains", response_model=List[Dict])
|
||||
async def get_domains(upload_id: Optional[str] = None):
|
||||
"""API endpoint that returns all domains with optional filter by upload_id"""
|
||||
# The load_domains function now handles deduplication and upload_id filtering
|
||||
domains = load_domains(upload_id)
|
||||
return domains
|
||||
|
||||
@app.get("/api/slds/{sld}", response_model=List[Dict])
|
||||
async def get_domains_by_sld(sld: str, upload_id: Optional[str] = None):
|
||||
"""API endpoint that returns domains for a specific SLD with optional filter by upload_id"""
|
||||
@app.get("/api/domains/{domain}", response_model=List[Dict])
|
||||
async def get_domains_by_name(domain: str, upload_id: Optional[str] = None):
|
||||
"""API endpoint that returns domains matching a specific domain name with optional filter by upload_id"""
|
||||
# Get domains, already deduplicated and optionally filtered by upload_id
|
||||
all_domains = load_domains(upload_id)
|
||||
|
||||
# Filter by SLD
|
||||
filtered = [item for item in all_domains if item["sld"].lower() == sld.lower()]
|
||||
# Filter by domain name
|
||||
filtered = [item for item in all_domains if domain.lower() in item["full_domain"].lower()]
|
||||
|
||||
if not filtered:
|
||||
raise HTTPException(status_code=404, detail=f"No domains found with SLD: {sld}")
|
||||
raise HTTPException(status_code=404, detail=f"No domains found matching: {domain}")
|
||||
|
||||
return filtered
|
||||
|
||||
|
@ -427,24 +394,19 @@ async def get_domains_by_sld(sld: str, upload_id: Optional[str] = None):
|
|||
async def get_dns_entries(
|
||||
record_type: Optional[str] = None,
|
||||
record_class: Optional[str] = None,
|
||||
tld: Optional[str] = None,
|
||||
sld: Optional[str] = None,
|
||||
domain: Optional[str] = None,
|
||||
upload_id: Optional[str] = None
|
||||
upload_id: Optional[str] = None,
|
||||
deduplicate: Optional[bool] = True
|
||||
):
|
||||
"""API endpoint that returns filtered DNS entries"""
|
||||
"""API endpoint that returns filtered DNS entries with optional deduplication"""
|
||||
# Get entries - if upload_id is specified, only those entries are returned
|
||||
entries = load_dns_entries(upload_id)
|
||||
entries = load_dns_entries(upload_id, deduplicate)
|
||||
|
||||
# Apply additional filters if provided
|
||||
if record_type:
|
||||
entries = [e for e in entries if e.get("record_type") == record_type]
|
||||
if record_class:
|
||||
entries = [e for e in entries if e.get("record_class") == record_class]
|
||||
if tld:
|
||||
entries = [e for e in entries if e.get("tld") == tld]
|
||||
if sld:
|
||||
entries = [e for e in entries if e.get("sld") == sld]
|
||||
if domain:
|
||||
entries = [e for e in entries if domain.lower() in e.get("domain", "").lower()]
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue