275 lines
10 KiB
Python
275 lines
10 KiB
Python
# This file is part of dnmgmt, a number resource management system
|
|
# Licensed under GNU General Public License v3 or later
|
|
# Written by Sebastian Lohff (seba@someserver.de)
|
|
|
|
from django import forms
|
|
from django.db.models import Q
|
|
|
|
from whoisdb.models import InetNum
|
|
from whoisdb.forms import MntFormMixin, WhoisObjectFormMixin
|
|
from whoisdb.validators import IP46CIDRValidator
|
|
|
|
from .models import Domain, Nameserver, ReverseZone
|
|
|
|
import re
|
|
import ipaddress
|
|
|
|
|
|
class DSRecordMixin(object):
|
|
ds_re = re.compile(r"^(?P<id>\d+)\s+(?P<crypto>\d+)\s+(?P<hashtype>\d+)\s+(?P<hash>[0-9a-fA-F]+)$")
|
|
HASH_SUPPORTED = (1, 2)
|
|
CRYPTO_SUPPORTED = (3, 5, 6, 8, 10, 13, 15)
|
|
|
|
def clean_ds_records(self):
|
|
ds_records = self.cleaned_data['ds_records'].strip()
|
|
result = []
|
|
|
|
if not ds_records:
|
|
return ''
|
|
|
|
for n, rec in enumerate(ds_records.split("\n"), 1):
|
|
rec = rec.strip()
|
|
m = self.ds_re.match(rec)
|
|
if not m:
|
|
raise forms.ValidationError("Could not parse records {} - needs to be in format "
|
|
"'<id> <crypto> <hashtype> <hash>'".format(n))
|
|
|
|
if int(m.group('hashtype')) not in self.HASH_SUPPORTED:
|
|
raise forms.ValidationError("Record {} has an invalid hashtype of {}, supported are {}"
|
|
"".format(n, m.group('hashtype'), " ".join(map(str, self.HASH_SUPPORTED))))
|
|
if int(m.group('crypto')) not in self.CRYPTO_SUPPORTED:
|
|
raise forms.ValidationError("Record {} has unsupported crypto {}, supported are {}"
|
|
"".format(n, m.group('crypto'), " ".join(map(str, self.CRYPTO_SUPPORTED))))
|
|
|
|
result.append("{id} {crypto} {hashtype} {hash}".format(**m.groupdict()))
|
|
|
|
return "\n".join(result)
|
|
|
|
|
|
class DomainForm(MntFormMixin, WhoisObjectFormMixin, DSRecordMixin, forms.ModelForm):
|
|
class Meta:
|
|
model = Domain
|
|
fields = ['name', 'nameservers', 'mnt_by', 'admin_c', 'ds_records']
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super(DomainForm, self).__init__(*args, **kwargs)
|
|
|
|
mnts = self._user.maintainer_set.all()
|
|
self.fields['nameservers'].queryset = Nameserver.objects.filter(Q(mnt_by__in=mnts))
|
|
|
|
instance = getattr(self, "instance", None)
|
|
self._create = not (instance and instance.pk)
|
|
|
|
if not self._create:
|
|
self.fields['name'].disabled = True
|
|
self.fields['nameservers'].queryset |= self.instance.nameservers.all()
|
|
|
|
self.fields['nameservers'].queryset = self.fields['nameservers'].queryset.distinct()
|
|
|
|
def clean_name(self):
|
|
name = self.cleaned_data['name'].lower()
|
|
if self._create:
|
|
if not name.endswith("."):
|
|
name += "."
|
|
|
|
if not name.endswith("dn."):
|
|
raise forms.ValidationError("Only .dn domains can be registered at this point")
|
|
|
|
if name.count(".") > 2:
|
|
raise forms.ValidationError("No subdomains can be registered")
|
|
|
|
if not re.match("^[a-z0-9.-]+$", name):
|
|
raise forms.ValidationError("Only a-z, 0-9 and - are allowed inside the domain name")
|
|
|
|
try:
|
|
Domain.objects.get(name=name)
|
|
raise forms.ValidationError("Domain already exists")
|
|
except Domain.DoesNotExist:
|
|
pass
|
|
|
|
return name
|
|
|
|
|
|
class NameserverForm(MntFormMixin, WhoisObjectFormMixin, forms.ModelForm):
|
|
class Meta:
|
|
model = Nameserver
|
|
fields = ['name', 'glueIPv4', 'glueIPv6', 'mnt_by', 'admin_c']
|
|
|
|
help_texts = {
|
|
"glueIPv4": "Note: You can only set a glue record if the base domain of this nameserver belongs to you!"
|
|
}
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super(NameserverForm, self).__init__(*args, **kwargs)
|
|
|
|
instance = getattr(self, "instance", None)
|
|
self._create = not (instance and instance.pk)
|
|
|
|
def cleanNetwork(self, glue):
|
|
ip = ipaddress.ip_address(glue)
|
|
proto = InetNum.IPv4 if ip.version == 4 else InetNum.IPv6
|
|
nets = InetNum.objects.filter(parent_range=None, protocol=proto)
|
|
|
|
if len(nets) == 0:
|
|
raise forms.ValidationError("No range has been registered for IPv%s in the whois interface" % ip.version)
|
|
|
|
for net in nets:
|
|
if ip in net.getNetwork():
|
|
break
|
|
else:
|
|
raise forms.ValidationError("Glue record address is not inside DarkNet (subnet %s)" % ", ".join(map(lambda _x: _x.prefix(), nets)))
|
|
|
|
def clean_glueIPv4(self):
|
|
glue = self.cleaned_data['glueIPv4']
|
|
|
|
if glue:
|
|
self.cleanNetwork(glue)
|
|
|
|
return glue
|
|
|
|
def clean_glueIPv6(self):
|
|
glue = self.cleaned_data['glueIPv6']
|
|
|
|
if glue:
|
|
self.cleanNetwork(glue)
|
|
|
|
return glue
|
|
|
|
def clean_name(self):
|
|
name = self.cleaned_data['name'].lower().strip()
|
|
if not name.endswith("."):
|
|
name += "."
|
|
|
|
# allow name to stay if it did not change
|
|
if not self._create and self.instance.name == name:
|
|
return name
|
|
|
|
if name.count(".") <= 2:
|
|
raise forms.ValidationError("Nameserver must be inside a domain (e.g. ns1.noot.dn.)")
|
|
|
|
mnts = self._user.maintainer_set.all()
|
|
try:
|
|
obj = Nameserver.objects.get(name=name, mnt_by__in=mnts)
|
|
if self._create or not self._create and obj.pk != self.instance.pk:
|
|
raise forms.ValidationError("You already have a nameserver with this name under your control")
|
|
except Nameserver.DoesNotExist:
|
|
pass
|
|
except Nameserver.MultipleObjectsReturned:
|
|
pass
|
|
|
|
return name
|
|
|
|
def clean(self):
|
|
cleaned_data = super(NameserverForm, self).clean()
|
|
|
|
if not self.errors:
|
|
name = cleaned_data.get("name")
|
|
mntBy = cleaned_data.get("mnt_by")
|
|
zone = ".".join(name.split(".")[-3:])
|
|
ipv4 = cleaned_data.get("glueIPv4", None)
|
|
ipv6 = cleaned_data.get("glueIPv6", None)
|
|
|
|
if not ipv4:
|
|
ipv4 = None
|
|
if not ipv6:
|
|
ipv6 = None
|
|
|
|
if self._create and (ipv4 or ipv6) or not self._create and not (self.instance.glueIPv4 == ipv4 and self.instance.glueIPv6 == ipv6):
|
|
mnts = self._user.maintainer_set.all()
|
|
domains = Domain.objects.filter(mnt_by__in=mnts)
|
|
found = False
|
|
for domain in domains:
|
|
if domain.name == zone:
|
|
found = True
|
|
break
|
|
|
|
if not found:
|
|
raise forms.ValidationError("You have glue IPs set, but this domain is not under a domain you control.")
|
|
|
|
if ipv4 or ipv6:
|
|
try:
|
|
ns = Nameserver.objects.get(Q(name=name) & (Q(glueIPv4__isnull=False) | Q(glueIPv6__isnull=False)))
|
|
if self._create or ns.pk != self.instance.pk:
|
|
nsMnts = ", ".join(n.handle for n in ns.mnt_by.all())
|
|
|
|
raise forms.ValidationError("Only one nameserver for this domain can have glue records and one already exists (maintained by %s)" % nsMnts)
|
|
except Nameserver.DoesNotExist:
|
|
pass
|
|
|
|
failedMnts = set()
|
|
for ns in Nameserver.objects.filter(name=name, mnt_by__in=mntBy):
|
|
if self._create or self.instance.pk != ns.pk:
|
|
for mnt in ns.mnt_by.all():
|
|
if mnt in mntBy:
|
|
failedMnts.add(mnt.handle)
|
|
|
|
if len(failedMnts) > 0:
|
|
raise forms.ValidationError("The following maintainer objects already have this nameservers: %s" % ", ".join(failedMnts))
|
|
|
|
|
|
return cleaned_data
|
|
|
|
|
|
class ReverseZoneForm(DSRecordMixin, forms.ModelForm):
|
|
prefix = forms.CharField(validators=[IP46CIDRValidator])
|
|
|
|
class Meta:
|
|
model = ReverseZone
|
|
fields = ['parentNet', 'nameservers']
|
|
|
|
help_texts = {
|
|
"prefix": "The prefix in CIDR form for which this object is responsible",
|
|
}
|
|
|
|
def __init__(self, user, *args, **kwargs):
|
|
self._user = user
|
|
|
|
super(ReverseZoneForm, self).__init__(*args, **kwargs)
|
|
|
|
instance = getattr(self, "instance", None)
|
|
self._create = not (instance and instance.pk)
|
|
|
|
mnts = self._user.maintainer_set.all()
|
|
self.fields['parentNet'].queryset = InetNum.objects.filter(Q(mnt_by__in=mnts) | Q(mnt_lower__in=mnts)).distinct()
|
|
|
|
self.fields['nameservers'].queryset = Nameserver.objects.filter(Q(mnt_by__in=mnts))
|
|
|
|
if not self._create:
|
|
self.fields['prefix'].disabled = True
|
|
self.fields['nameservers'].queryset |= self.instance.nameservers.all()
|
|
|
|
self.fields['nameservers'].queryset = self.fields['nameservers'].queryset.distinct()
|
|
|
|
def clean_prefix(self):
|
|
prefix = self.cleaned_data['prefix']
|
|
|
|
net = ipaddress.ip_network(prefix)
|
|
if net.version == 6 and net.prefixlen % 4 != 0:
|
|
raise forms.ValidationError("IPv6 reverse zone prefix length has to be a multiple of 4")
|
|
|
|
return prefix
|
|
|
|
def clean(self):
|
|
cleaned_data = super(ReverseZoneForm, self).clean()
|
|
|
|
if not self.errors:
|
|
if self._create:
|
|
net = ipaddress.ip_network(cleaned_data['prefix'])
|
|
parentNet = cleaned_data['parentNet'].getNetwork()
|
|
|
|
if net.network_address not in parentNet:
|
|
raise forms.ValidationError("Given prefix %s is not inside of parent netblock %s" % (net, parentNet))
|
|
|
|
# For now just check all the zones...
|
|
#zones = ReverseZone.objects.filter(parentNet=cleaned_data['parentNet'])
|
|
zones = ReverseZone.objects.all()
|
|
for zone in zones:
|
|
if net.network_address in zone.parentNet.getNetwork():
|
|
raise forms.ValidationError("Given prefix already has a reverse zone object associated to it: %s" % zone)
|
|
|
|
self.instance.address = str(net.network_address)
|
|
self.instance.netmask = net.prefixlen
|
|
|
|
|
|
return cleaned_data
|