ADD base domain information on Index page and API endpoint
This commit is contained in:
parent
fc72f6f51c
commit
7db919bcb7
3 changed files with 270 additions and 27 deletions
142
main.py
142
main.py
|
@ -139,23 +139,42 @@ async def process_csv_upload(file_content, upload_id, description=None):
|
|||
print(traceback.format_exc())
|
||||
return 0, 0
|
||||
|
||||
# Load domains from database - deduplicated by full domain name
|
||||
def load_domains(specific_upload_id: str = None) -> List[Dict]:
|
||||
# Load domains from database - deduplicated by full domain name, with optional base domain filtering
|
||||
def load_domains(specific_upload_id: str = None, base_domains_only: bool = False) -> List[Dict]:
|
||||
try:
|
||||
domains = domains_table.all()
|
||||
|
||||
# If a specific upload ID is provided, only show domains from that upload
|
||||
if specific_upload_id:
|
||||
domains = [d for d in domains if d.get('upload_id') == specific_upload_id]
|
||||
return domains
|
||||
if not base_domains_only:
|
||||
return domains
|
||||
|
||||
# Add the base_domain field to each domain
|
||||
for domain in domains:
|
||||
domain['base_domain'] = extract_base_domain(domain.get('full_domain', ''))
|
||||
|
||||
# Sort by timestamp in descending order (newest first)
|
||||
domains.sort(key=lambda x: x.get('timestamp', ''), reverse=True)
|
||||
|
||||
# Create a dictionary to track unique domains by full domain name
|
||||
# Create a dictionary to track unique domains
|
||||
unique_domains = {}
|
||||
base_domains_set = set()
|
||||
|
||||
# First pass: collect all base domains
|
||||
if base_domains_only:
|
||||
for domain in domains:
|
||||
base_domains_set.add(domain.get('base_domain', ''))
|
||||
|
||||
for domain in domains:
|
||||
# If base_domains_only is True, only keep domains that are base domains themselves
|
||||
if base_domains_only:
|
||||
full_domain = domain.get('full_domain', '')
|
||||
base_domain = domain.get('base_domain', '')
|
||||
|
||||
if full_domain != base_domain:
|
||||
continue
|
||||
|
||||
# Create a unique key based on the full domain name
|
||||
unique_key = domain.get('full_domain', '')
|
||||
|
||||
|
@ -206,6 +225,90 @@ def load_dns_entries(specific_upload_id: str = None, deduplicate: bool = False)
|
|||
print(f"Error loading DNS records from database: {e}")
|
||||
return []
|
||||
|
||||
# Known multi-part public suffixes (e.g. "co.uk"). This is a hand-maintained
# subset, not the full Public Suffix List; suffixes that are not listed fall
# back to the plain "last two labels" rule in extract_base_domain().
MULTI_PART_TLDS = [
    'co.uk', 'org.uk', 'me.uk', 'ac.uk', 'gov.uk', 'net.uk', 'sch.uk',
    'com.au', 'net.au', 'org.au', 'edu.au', 'gov.au', 'asn.au', 'id.au',
    'co.nz', 'net.nz', 'org.nz', 'govt.nz', 'ac.nz', 'school.nz', 'geek.nz',
    'com.sg', 'edu.sg', 'gov.sg', 'net.sg', 'org.sg', 'per.sg',
    'co.za', 'org.za', 'web.za', 'net.za', 'gov.za', 'ac.za',
    'com.br', 'net.br', 'org.br', 'gov.br', 'edu.br',
    'co.jp', 'ac.jp', 'go.jp', 'or.jp', 'ne.jp', 'gr.jp',
    'co.in', 'firm.in', 'net.in', 'org.in', 'gen.in', 'ind.in',
    'edu.cn', 'gov.cn', 'net.cn', 'org.cn', 'com.cn', 'ac.cn',
    'com.mx', 'net.mx', 'org.mx', 'edu.mx', 'gob.mx'
]

# Lower-cased set for O(1), case-insensitive suffix lookups.
_MULTI_PART_TLD_SET = frozenset(tld.lower() for tld in MULTI_PART_TLDS)
# Largest number of labels in any listed suffix (2 for the list above).
_MAX_TLD_LABELS = max((tld.count('.') + 1 for tld in MULTI_PART_TLDS), default=0)


# Extract the base domain (SLD+TLD) from a full domain name
def extract_base_domain(domain: str) -> str:
    """Return the registrable part (SLD + TLD) of a domain name.

    A single trailing dot is dropped before splitting. If the name ends in a
    suffix listed in MULTI_PART_TLDS (compared case-insensitively) and has at
    least one label in front of it, that label plus the suffix is returned.
    Otherwise the last two labels are returned. The labels keep the
    capitalisation of the input.

    Args:
        domain: Domain name such as ``"www.example.co.uk"``.

    Returns:
        The base domain, e.g. ``"example.co.uk"``. Falsy input is returned
        unchanged, and a name with a single label is returned as-is (minus
        the trailing dot).
    """
    if not domain:
        return domain

    # Remove trailing dot if present
    if domain.endswith('.'):
        domain = domain[:-1]

    parts = domain.split('.')

    # A single label has no TLD to separate out.
    if len(parts) <= 1:
        return domain

    # Try the longest candidate suffix first. A suffix only counts when at
    # least one label precedes it, so a bare "co.uk" is left to the default.
    longest = min(_MAX_TLD_LABELS, len(parts) - 1)
    for label_count in range(longest, 1, -1):
        suffix = '.'.join(parts[-label_count:]).lower()
        if suffix in _MULTI_PART_TLD_SET:
            return '.'.join(parts[-(label_count + 1):])

    # Default case: the last two labels.
    return '.'.join(parts[-2:])
|
||||
|
||||
# Get all unique base domains from the database
def get_unique_base_domains(specific_upload_id: str = None) -> List[Dict]:
    """Return one entry per unique base domain found in ``domains_table``.

    Args:
        specific_upload_id: When truthy, only records whose ``upload_id``
            equals this value are considered.

    Returns:
        A list of ``{'domain': <base domain>, 'timestamp': <timestamp>}``
        dicts. Records are visited newest first, so each entry carries the
        timestamp of the first (newest) record seen for that base domain.
        Any exception is printed and an empty list is returned instead.
    """
    try:
        domains = domains_table.all()

        # If a specific upload ID is provided, only use domains from that upload
        if specific_upload_id:
            domains = [d for d in domains if d.get('upload_id') == specific_upload_id]

        # Newest first. `or ''` keeps records whose timestamp is present but
        # None sortable; otherwise comparing None with a string raises and
        # the whole call would fall through to the empty-list fallback.
        # NOTE(review): assumes timestamps are strings that sort
        # chronologically - confirm against the writer.
        newest_first = sorted(
            domains,
            key=lambda record: record.get('timestamp') or '',
            reverse=True,
        )

        unique_base_domains = {}
        for record in newest_first:
            # Computed locally so the fetched records are not modified.
            base_domain = extract_base_domain(record.get('full_domain', ''))

            # Skip if no base domain
            if not base_domain:
                continue

            # First hit wins, i.e. the newest record for this base domain.
            unique_base_domains.setdefault(
                base_domain,
                {
                    'domain': base_domain,
                    'timestamp': record.get('timestamp'),
                },
            )

        return list(unique_base_domains.values())
    except Exception as e:
        print(f"Error getting unique base domains: {e}")
        return []
|
||||
|
||||
# Get unique values for filter dropdowns
|
||||
def get_unique_values(entries: List[Dict]) -> Dict[str, Set]:
|
||||
unique_values = {
|
||||
|
@ -249,16 +352,21 @@ def delete_upload(upload_id):
|
|||
|
||||
# Routes
|
||||
@app.get("/", response_class=HTMLResponse)
async def home(
    request: Request,
    upload_id: Optional[str] = None,
    base_domains_only: Optional[bool] = False
):
    """Home page with upload form and domain listing.

    Args:
        request: Incoming request, passed through to the template context.
        upload_id: Optional upload ID forwarded to ``load_domains``.
        base_domains_only: Forwarded to ``load_domains`` and echoed back to
            the template under the same name.

    Returns:
        The rendered ``index.html`` template with ``domains``, ``uploads``
        and ``base_domains_only`` in its context.
    """
    domains = load_domains(upload_id, base_domains_only)
    uploads = get_uploads()
    return templates.TemplateResponse(
        "index.html",
        {
            "request": request,
            "domains": domains,
            "uploads": uploads,
            "base_domains_only": base_domains_only
        }
    )
|
||||
|
||||
|
@ -370,12 +478,22 @@ async def get_all_uploads():
|
|||
return get_uploads()
|
||||
|
||||
@app.get("/api/domains", response_model=List[Dict])
async def get_domains(
    upload_id: Optional[str] = None,
    base_domains_only: Optional[bool] = False
):
    """API endpoint that returns all domains with optional filtering.

    Args:
        upload_id: Optional upload ID forwarded to ``load_domains``.
        base_domains_only: Forwarded to ``load_domains`` as its second
            argument.

    Returns:
        Whatever ``load_domains`` returns for the given arguments.
    """
    # The load_domains function handles deduplication and filtering
    domains = load_domains(upload_id, base_domains_only)
    return domains
|
||||
|
||||
@app.get("/api/base-domains", response_model=List[Dict])
async def get_base_domains(upload_id: Optional[str] = None):
    """API endpoint that returns only unique base domains.

    ``upload_id`` is handed straight to ``get_unique_base_domains`` and its
    result is returned unchanged.
    """
    return get_unique_base_domains(upload_id)
|
||||
|
||||
@app.get("/api/domains/{domain}", response_model=List[Dict])
|
||||
async def get_domains_by_name(domain: str, upload_id: Optional[str] = None):
|
||||
"""API endpoint that returns domains matching a specific domain name with optional filter by upload_id"""
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue