172 lines
4.8 KiB
Python
172 lines
4.8 KiB
Python
# This file is part of dnmgmt, a number resource management system
|
|
# Licensed under GNU General Public License v3 or later
|
|
# Written by Sebastian Lohff (seba@someserver.de)
|
|
|
|
import whoisdb.models
|
|
import domains.models
|
|
|
|
from django.db.models import F, When, Max, Case, IntegerField
|
|
|
|
import ipaddress
|
|
import re
|
|
|
|
def _addFields(fields, obj, fieldNames):
    """Append one ``(label, value)`` tuple to ``fields`` per attribute name.

    The label is the attribute name run through ``str.capitalize()`` with
    underscores replaced by spaces (``"mnt_by"`` becomes ``"Mnt by"``); the
    value is read from ``obj`` with ``getattr``. ``fields`` is modified in
    place and nothing is returned.
    """
    for attrName in fieldNames:
        label = attrName.capitalize().replace("_", " ")
        value = getattr(obj, attrName)
        fields.append((label, value))
|
|
|
|
|
|
def getWhoisObjectFields(obj, owner):
    """Build the ordered list of ``(label, value)`` tuples describing ``obj``.

    The ``handle`` attribute comes first when the object has a truthy one,
    followed by a set of attributes selected by the exact type of ``obj``
    (subclasses are not matched), and finally ``created`` and
    ``last_modified``, which are read from every object.

    ``owner``: when truthy, a Maintainer's ``auth`` attribute is included.

    Returns the list of tuples.
    """
    rows = []

    def add(*names):
        # Labels are derived from the attribute names by _addFields.
        _addFields(rows, obj, names)

    if getattr(obj, "handle", None):
        add("handle")

    model = type(obj)
    if model == whoisdb.models.Maintainer:
        add("description", "admin_c")
        if owner:
            add("auth")
    elif model == whoisdb.models.Contact:
        add("name", "mnt_by")
    elif model == whoisdb.models.ASBlock:
        add("name")
        asRange = "%s - %s" % (obj.asBegin, obj.asEnd)
        rows.append(("AS Range", asRange))
        add("description", "parent_block", "mnt_by", "mnt_lower", "admin_c")
    elif model == whoisdb.models.ASNumber:
        add("name", "number", "description", "asblock", "volatile",
            "mnt_by", "mnt_lower", "admin_c")
    elif model == whoisdb.models.InetNum:
        add("name")
        rows.append(("Address CIDR", obj.prefix()))
        add("description", "parent_range", "origin_as", "mnt_by",
            "mnt_lower", "admin_c")
    elif model == domains.models.Domain:
        add("name", "nameservers", "mnt_by", "admin_c")
    elif model == domains.models.Nameserver:
        add("name", "glueIPv4", "glueIPv6", "mnt_by", "admin_c")
    elif model == domains.models.ReverseZone:
        # The name attribute is currently not listed for reverse zones.
        rows.append(("Address CIDR", obj.prefix()))
        add("parentNet", "nameservers")

    add("created", "last_modified")

    return rows
|
|
|
|
|
|
def guessWhoisObject(self, handle):
    """Placeholder for resolving ``handle`` to a whois object.

    Not implemented yet: both arguments are ignored and ``None`` is returned.
    """
    # TODO: is it a normal handle?
    return None
|
|
|
|
def _smallNetAddressPrefix(value):
    """Return a textual prefix shared by small networks containing ``value``.

    ``value`` is an ``ipaddress`` IPv4/IPv6 address object. The result is
    meant for an ``address__startswith`` filter that narrows down candidate
    networks: a /24 or smaller for IPv4, a /56 or smaller for IPv6. It may
    match more rows than strictly necessary (callers must still check
    containment) but never excludes a network of that size holding ``value``.

    NOTE(review): assumes stored addresses use the compressed text form that
    ``str()`` of an ``ipaddress`` object yields, as the exact-match lookup in
    findInDatabase does -- confirm against the InetNum model.
    """
    if value.version == 4:
        basenet = ipaddress.ip_network("%s/24" % value, strict=False)
        return ".".join(str(basenet).split(".")[0:3]) + "."

    basenet = ipaddress.ip_network("%s/56" % value.exploded, strict=False)
    groups = [int(group, 16)
              for group in basenet.network_address.exploded.split(":")[0:4]]

    prefix = ""
    for group in groups[0:3]:
        if group == 0:
            # From the first zero group on, the text may either read "0:" or
            # be folded into "::", so nothing further is guaranteed.
            return prefix
        prefix += "%x:" % group

    # A /56 fixes only the upper byte of the fourth group. When that byte is
    # non-zero the group is always printed with two more hex digits after it,
    # so the byte's hex digits are a stable part of the prefix.
    highByte = groups[3] >> 8
    if highByte:
        prefix += "%x" % highByte
    return prefix


def findInDatabase(rawValue):
    """Search the whois and domain database for objects matching ``rawValue``.

    The value is stripped and upper-cased, then tried, in this order, as:

    * a network: InetNum with exactly this network address and prefix length
    * a single IP address: the most specific small InetNum containing it
    * an AS number (``123`` or ``AS123``): matching ASNumber objects plus the
      smallest ASBlock containing the number
    * a domain (value ends in ``DN`` or ``DN.``): Domain with that name
    * a handle of Contact, Maintainer, InetNum, ASBlock or ASNumber: exact
      match, falling back to a prefix match for values of three or more
      characters

    Returns a list of model instances in the order found. Results are not
    de-duplicated, and the list is empty when nothing matches.
    """
    rawValue = rawValue.strip().upper()
    results = []

    # exact subnet match
    try:
        network = ipaddress.ip_network(rawValue, strict=False)
    except ValueError:
        network = None

    if network is not None:
        results.extend(whoisdb.models.InetNum.objects.filter(
            address=str(network.network_address), netmask=network.prefixlen))

    # single ip: find the most specific network containing it
    try:
        address = ipaddress.ip_address(rawValue)
    except ValueError:
        address = None

    if address is not None:
        # NOTE: this is only for "small subnets", we could increase this...
        baseaddr = _smallNetAddressPrefix(address)
        nets = whoisdb.models.InetNum.objects.filter(
            address__startswith=baseaddr).order_by("-netmask")
        for net in nets:
            # Longest prefix length first, so the first hit is the most
            # specific network.
            if address in net.getNetwork():
                results.append(net)
                break

    # asnumber?
    m = re.match(r"^(?:AS)?(\d+)$", rawValue)
    if m:
        num = int(m.group(1))
        results.extend(whoisdb.models.ASNumber.objects.filter(number=num))

        # smallest block containing the number
        block = whoisdb.models.ASBlock.objects.filter(
            asBegin__lte=num, asEnd__gte=num
        ).annotate(size=F('asEnd') - F('asBegin')).order_by('size').first()
        if block is not None:
            results.append(block)

        # asblocks? smallest asblock containing the range
        # WHEN annotation foo... could also be done in asnumber match
        # also look for number - number queries?

    # domain?
    if rawValue.endswith(("DN", "DN.")):
        domainName = rawValue.lower()
        if not domainName.endswith("."):
            domainName += "."

        results.extend(domains.models.Domain.objects.filter(name=domainName))

    # contact by name?

    # handlenames for Maintainer, Contact, InetNum, ASNumber, ASBlock
    handleObjs = [
        whoisdb.models.Contact,
        whoisdb.models.Maintainer,
        whoisdb.models.InetNum,
        whoisdb.models.ASBlock,
        whoisdb.models.ASNumber,
    ]
    for handleObj in handleObjs:
        obj = handleObj.objects.filter(handle=rawValue)
        if not obj and len(rawValue) >= 3:
            # No exact hit: fall back to a prefix search, but only for
            # values of at least three characters.
            obj = handleObj.objects.filter(handle__startswith=rawValue)
        results.extend(obj)

    return results
|
|
|
|
def findHandleFromStr(rawValue):
    """Return the first object whose handle equals ``rawValue``, else None.

    The models are tried in a fixed order: Contact, Maintainer, InetNum,
    ASBlock, ASNumber. Only ``DoesNotExist`` is handled; any other exception
    raised by the lookup propagates to the caller.
    """
    candidateModels = (
        whoisdb.models.Contact,
        whoisdb.models.Maintainer,
        whoisdb.models.InetNum,
        whoisdb.models.ASBlock,
        whoisdb.models.ASNumber,
    )

    for model in candidateModels:
        try:
            match = model.objects.get(handle=rawValue)
        except model.DoesNotExist:
            continue
        return match

    return None
|
|
|
|
|
|
def orderQueryset(qs, userOwned, objOwned):
    """Annotate ``qs`` with an ownership rank ``card`` and sort by it.

    ``userOwned`` and ``objOwned`` are conditions handed to ``When``
    (presumably ``Q`` objects -- verify against callers). ``card`` is the
    maximum over: 2 where ``userOwned`` matches, 1 where ``objOwned``
    matches (only considered when ``objOwned`` is truthy), 0 otherwise.

    Returns the queryset ordered by ``card``, highest first.
    """
    cases = [When(userOwned, then=2)]
    if objOwned:
        cases += [When(objOwned, then=1)]

    ranking = Case(*cases, default=0, output_field=IntegerField())
    return qs.annotate(card=Max(ranking)).order_by("-card")
|
|
|