362 lines
12 KiB
Python
362 lines
12 KiB
Python
from django import forms
|
|
from django.db.models import Q
|
|
from django.utils import timezone
|
|
|
|
from .models import Maintainer, Contact, InetNum, ASBlock, ASNumber
|
|
from .validators import HandleValidatorWithSuffix, IP46CIDRValidator
|
|
from .formfields import MultiTextInput
|
|
from .helpers import orderQueryset
|
|
|
|
import ipaddress
|
|
|
|
|
|
class WhoisObjectFormMixin(object):
    """Shared behaviour for forms that edit whois database objects.

    The mixin remembers the acting user, records whether the form creates
    a new object or edits an existing one, restricts the ``admin_c``
    choices and resolves the ``AUTO`` handle placeholder while cleaning.
    """

    def __init__(self, user, *args, **kwargs):
        """Set up the form on behalf of ``user``.

        ``user`` is consumed here; every other argument is passed on
        unchanged to the next ``__init__`` in the MRO.
        """
        super(WhoisObjectFormMixin, self).__init__(*args, **kwargs)
        self._user = user

        instance = getattr(self, 'instance', None)
        # An instance that already carries a primary key is being edited,
        # anything else is treated as object creation.
        self._create = not (instance and instance.pk)

        if 'handle' in self.fields:
            handle_help = "Handle for this object in uppercase with a suffix of -%s" % instance.handleSuffix
            self.fields['handle'].help_text = handle_help

        # only show users contacts and already present contacts
        mnts = self._user.maintainer_set.all()
        if 'admin_c' in self.fields:
            admin_contacts = Contact.getMntQueryset(mnts, self.instance, "admin_c")
            self.fields['admin_c'].queryset = admin_contacts

    def clean_handle(self):
        """Run the suffix-aware handle validator and return the handle."""
        validate = HandleValidatorWithSuffix(self.instance.handleSuffix)
        validate(self.cleaned_data['handle'])
        return self.cleaned_data['handle']

    def clean(self):
        """Replace an ``AUTO`` handle and stamp the modification time.

        When the submitted handle is the literal ``AUTO`` and no errors
        were recorded, a handle is generated from the ``name`` value,
        falling back to the user's username when ``name`` is ``None``.
        """
        cleaned_data = super(WhoisObjectFormMixin, self).clean()

        wants_generated_handle = cleaned_data.get("handle") == "AUTO" and not self.errors
        if wants_generated_handle:
            name = cleaned_data.get("name")
            if name is None:
                name = self._user.username
            cleaned_data['handle'] = self._meta.model.genGenericHandle(name)

        # XXX: Find a better position to update last_changed
        self.instance.last_modified = timezone.now()

        return cleaned_data
|
|
|
|
|
|
class MntFormMixin(object):
    """Maintainer handling shared by forms for objects with ``mnt_by``.

    Relies on ``self._user`` having been set by a cooperating class
    further down the MRO.
    """

    # Names of the fields that are disabled when the form is created
    # with ``lower=True``; subclasses override this list.
    protectedFields = []

    def __init__(self, lower=False, *args, **kwargs):
        """Initialise the form and narrow the maintainer choices.

        ``lower`` is stored as ``self._editLower``; when it is truthy all
        fields named in ``protectedFields`` are disabled.
        """
        super(MntFormMixin, self).__init__(*args, **kwargs)

        self._editLower = lower
        if lower:
            for field_name in self.protectedFields:
                self.fields[field_name].disabled = True

        instance = getattr(self, "instance", None)
        if not hasattr(self, "_create"):
            self._create = not (instance and instance.pk)

        all_mnts = Maintainer.objects.all()
        owned_by_user = Q(auth=self._user)
        # Maintainers already attached to an existing object are passed as
        # a second criterion; on creation there are none.
        if self._create:
            already_attached = None
        else:
            already_attached = Q(pk__in=instance.mnt_by.all())
        mntQs = orderQueryset(all_mnts, owned_by_user, already_attached)

        self.fields["mnt_by"].queryset = mntQs
        if "mnt_lower" in self.fields:
            self.fields["mnt_lower"].queryset = mntQs

    def clean(self):
        """Validate the maintainer selection.

        Raises ``forms.ValidationError`` when a new object has no
        maintainer belonging to the user, or when more than five
        maintainers are selected for ``mnt_by`` or ``mnt_lower``.
        Nothing is checked once other errors exist.
        """
        cleaned_data = super(MntFormMixin, self).clean()

        if self.errors:
            return cleaned_data

        if self._create:
            # at least one own mnt on creation
            own_mnts = self._user.maintainer_set.all()
            if not any(mnt in own_mnts for mnt in cleaned_data['mnt_by']):
                raise forms.ValidationError("On object creation at least one maintainer needs to be under your control")

        too_many = cleaned_data['mnt_by'].count() > 5 or (
            "mnt_lower" in self.fields and cleaned_data['mnt_lower'].count() > 5
        )
        if too_many:
            raise forms.ValidationError("Currently only 5 MNTs per object allowed. If you need more ask @ irc/rrequest and state your use case")

        return cleaned_data
|
|
|
|
|
|
class MntForm(WhoisObjectFormMixin, forms.ModelForm):
    """Form for creating and editing ``Maintainer`` objects.

    ``auth`` is currently not part of ``Meta.fields``; its widget and help
    text are kept configured for when it is enabled again.
    """

    class Meta:
        model = Maintainer
        #fields = ['handle', 'description', 'admin_c', 'auth']
        fields = ['handle', 'description', 'admin_c']
        widgets = {'auth': MultiTextInput()}

        help_texts = {
            'auth': 'Enter names of users which can edit this object (space separated; '
                    'and yes, validation is somewhat broken (values disappear on error - just reload))'
        }

    def __init__(self, *args, **kwargs):
        """Initialise the form; all arguments go to the parent classes.

        The former debugging ``print`` of the constructor arguments was
        removed: it wrote submitted form data to stdout on every
        instantiation.
        """
        super(MntForm, self).__init__(*args, **kwargs)

        #if self._create:
        #    self.fields['auth'].text("noot")
|
|
|
|
|
|
class MntInitialForm(MntForm):
    """Reduced ``MntForm`` exposing only ``handle`` and ``description``.

    ``Meta`` is declared from scratch here, so the ``widgets`` and
    ``help_texts`` of ``MntForm.Meta`` are not carried over.
    """

    class Meta:
        model = Maintainer
        fields = ['handle', 'description']
|
|
|
|
|
|
class ContactForm(WhoisObjectFormMixin, forms.ModelForm):
    """Form for creating and editing ``Contact`` objects."""

    class Meta:
        model = Contact
        fields = ['handle', 'name', 'mnt_by']

    def __init__(self, *args, **kwargs):
        """Initialise the form and limit ``mnt_by`` to the user's maintainers."""
        super(ContactForm, self).__init__(*args, **kwargs)

        # Subclasses may drop mnt_by from their field list.
        if "mnt_by" not in self.fields:
            return

        user_mnts = Maintainer.objects.filter(auth=self._user).distinct()
        self.fields['mnt_by'].queryset = user_mnts
|
|
|
|
|
|
class ContactInitialForm(ContactForm):
    """Reduced ``ContactForm`` exposing only ``handle`` and ``name``.

    Without a ``mnt_by`` field the maintainer queryset restriction in
    ``ContactForm.__init__`` is skipped.
    """

    class Meta:
        model = Contact
        fields = ['handle', 'name']
|
|
|
|
|
|
class InetNumForm(MntFormMixin, WhoisObjectFormMixin, forms.ModelForm):
    """Form for creating and editing ``InetNum`` (IP range) objects.

    The network is entered through the extra ``prefix`` field in CIDR
    notation; ``clean`` copies it into ``instance.address`` and
    ``instance.netmask``.
    """

    prefix = forms.CharField()
    protectedFields = ['protocol', 'parent_range', 'mnt_by', 'prefix']

    class Meta:
        model = InetNum
        fields = ['handle', 'protocol', 'parent_range', 'prefix', 'name', 'description', 'origin_as', 'mnt_by', 'mnt_lower', 'admin_c']

    def __init__(self, *args, **kwargs):
        """Initialise the form and restrict the related-object choices.

        Disabling of ``protectedFields`` for lower maintainers is done by
        ``MntFormMixin.__init__`` and is therefore not repeated here.
        """
        super(InetNumForm, self).__init__(*args, **kwargs)

        mnts = self._user.maintainer_set.all()
        self.fields['parent_range'].queryset = InetNum.getMntQueryset(mnts, self.instance, "parent_range")
        self.fields['origin_as'].queryset = ASNumber.getMntQueryset(mnts, self.instance, "origin_as")

    def clean_prefix(self):
        """Parse the submitted prefix into an ``ipaddress`` network object.

        Raises ``forms.ValidationError`` with the parser's message when
        ``ipaddress.ip_network`` rejects the value.
        """
        # make sure this is a subnet we're getting
        net = self.cleaned_data['prefix'].lower()
        # NOTE(review): presumably a validation function called with the
        # value; if it is a validator class this only constructs it.
        IP46CIDRValidator(net)

        try:
            net = ipaddress.ip_network(net)
        except ValueError as e:
            raise forms.ValidationError(str(e))

        return net

    def clean_parent_range(self):
        """Validate the selected parent range.

        Raises ``forms.ValidationError`` when the parent is missing on
        creation or was removed from an object that had one, when the
        selection would create a parent cycle, or when the parent has an
        origin AS assigned.
        """
        parent_range = self.cleaned_data.get('parent_range', None)

        if not parent_range:
            # allow parent range to be unset for already present objects
            if self._create or self.instance.parent_range:
                raise forms.ValidationError("Parent range must be set")
            # Nothing further to check without a parent.
            return parent_range

        if not self._create:
            # make sure we don't have circular dependencies; every node of
            # the chain is inspected, including the topmost one without a
            # parent (which is where an edited root object would show up)
            obj = parent_range
            while obj is not None:
                if obj.pk == self.instance.pk:
                    raise forms.ValidationError("No circular dependencies allowed")
                obj = obj.parent_range

        if parent_range.origin_as.count() > 0:
            raise forms.ValidationError("Parent range has origin as set")

        return parent_range

    def clean(self):
        """Cross-field validation of the prefix against its neighbours.

        Skipped entirely for lower-maintainer edits and when errors were
        already recorded. Otherwise verifies that an origin AS is not
        combined with existing subranges, that the prefix lies inside the
        parent range with the same protocol and does not overlap a sibling,
        and that existing subranges still fit into the new prefix. On
        success the prefix is written to ``instance.address`` and
        ``instance.netmask``.
        """
        cleaned_data = super(InetNumForm, self).clean()

        if self._editLower or self.errors:
            return cleaned_data

        if not self._create and cleaned_data['origin_as']:
            subranges = list(self.instance.inetnum_set.all())
            if subranges:
                ranges = ", ".join(map(str, subranges))
                raise forms.ValidationError("You cannot set an origin as if there are already existing subranges (%s)" % (ranges))

        prefix = cleaned_data['prefix']
        parent = cleaned_data['parent_range']
        if parent:
            parentNet = parent.getNetwork()

            if cleaned_data['protocol'] != parent.protocol:
                raise forms.ValidationError("Protocol type for prefix must be same as parent network")

            # check if in parent block
            if prefix.network_address not in parentNet or prefix.prefixlen < parentNet.prefixlen:
                raise forms.ValidationError("Prefix must be inside parent network range")

            # check if parent block has net that overlaps with us
            for otherNet in parent.inetnum_set.all():
                if self.instance and self.instance.pk == otherNet.pk:
                    continue

                if otherNet.getNetwork().overlaps(prefix):
                    raise forms.ValidationError("The given prefix overlaps with network %s" % otherNet.handle)

        # check if subnets to this subnet are (still) in current network;
        # compare against the submitted prefix, because the instance's
        # address/netmask are only updated below
        if not self._create:
            for subnet in self.instance.inetnum_set.all():
                subnetNet = subnet.getNetwork()
                if subnetNet.network_address not in prefix or subnetNet.prefixlen < prefix.prefixlen:
                    raise forms.ValidationError("Subnet %s with %s is not in block anymore" % (subnet, subnetNet))

        self.instance.address = str(prefix.network_address)
        self.instance.netmask = prefix.prefixlen

        return cleaned_data
|
|
|
|
|
|
class ASBlockForm(MntFormMixin, WhoisObjectFormMixin, forms.ModelForm):
    """Form for creating and editing ``ASBlock`` (AS number range) objects."""

    protectedFields = ['parent_block', 'asBegin', 'asEnd', 'mnt_by']

    # FIXME: Filter blocks
    class Meta:
        model = ASBlock
        fields = ['handle', 'parent_block', 'asBegin', 'asEnd', 'name', 'description', 'mnt_by', 'mnt_lower', 'admin_c']

    def __init__(self, *args, **kwargs):
        """Initialise the form and configure the ``parent_block`` field."""
        super(ASBlockForm, self).__init__(*args, **kwargs)

        # A block that already has a parent must keep one, and the parent
        # of a saved block cannot be changed.
        # NOTE(review): saved blocks without a parent keep an optional,
        # editable parent_block here - confirm this is intended.
        if not self.instance or self.instance.parent_block:
            self.fields["parent_block"].required = True
            if self.instance and self.instance.pk:
                self.fields["parent_block"].disabled = True

        mnts = self._user.maintainer_set.all()
        self.fields['parent_block'].queryset = ASBlock.getMntQueryset(mnts, self.instance, "parent_block")

    def clean_parent_block(self):
        """Validate the selected parent block.

        Raises ``forms.ValidationError`` when the parent is missing on
        creation or was removed from a block that had one, or when the
        selection would create a parent cycle.
        """
        parent_block = self.cleaned_data.get('parent_block', None)

        if not parent_block:
            # allow parent range to be unset for already present objects
            if self._create or self.instance.parent_block:
                raise forms.ValidationError("Parent block must be set")
            return parent_block

        if not self._create:
            # make sure we don't have circular dependencies; every node of
            # the chain is inspected, including the topmost one without a
            # parent (which is where an edited root block would show up)
            obj = parent_block
            while obj is not None:
                if obj.pk == self.instance.pk:
                    raise forms.ValidationError("No circular dependencies allowed")
                obj = obj.parent_block

        return parent_block

    def clean(self):
        """Cross-field validation of the AS range.

        Skipped when errors were already recorded. Otherwise verifies that
        the range is ordered, lies strictly inside an unused parent block
        without overlapping sibling blocks, and still contains all existing
        sub-blocks and AS numbers of an edited block.
        """
        cleaned_data = super(ASBlockForm, self).clean()

        if self.errors:
            return cleaned_data

        asBegin = cleaned_data['asBegin']
        asEnd = cleaned_data['asEnd']
        parent = cleaned_data['parent_block']

        # check if in range
        if asBegin > asEnd:
            raise forms.ValidationError("AS beginning must be smaller or equal to AS end")

        if parent:
            # check if somebody is already using this block
            parentUsers = [asnumber.handle for asnumber in parent.asnumber_set.all()]
            if parentUsers:
                raise forms.ValidationError("The parent AS block is already references by following AS number objects: %s" % (", ".join(parentUsers),))

            # check if same range
            if not (asBegin >= parent.asBegin and asEnd <= parent.asEnd):
                raise forms.ValidationError("AS beginning and end must be inside the range of the parent AS block")

            if parent.asBegin == asBegin and parent.asEnd == asEnd:
                raise forms.ValidationError("The range of this block cannot be the same range AS the parent AS block")

            # check for overlap with other asblocks
            for block in parent.asblock_set.all():
                if self.instance and self.instance.pk and block.pk == self.instance.pk:
                    continue

                overlaps = (
                    block.asBegin <= asBegin <= block.asEnd
                    or block.asBegin <= asEnd <= block.asEnd
                    or asBegin <= block.asBegin <= asEnd
                    or asBegin <= block.asEnd <= asEnd
                )
                if overlaps:
                    raise forms.ValidationError("Block overlaps with block %s" % block.handle)

        if not self._create:
            # check if subblocks are still in range
            for subblock in self.instance.asblock_set.all():
                if not (asBegin <= subblock.asBegin <= asEnd and asBegin <= subblock.asEnd <= asEnd):
                    raise forms.ValidationError("Subblock %s (%s - %s) is not contained in this block anymore" % (subblock, subblock.asBegin, subblock.asEnd))

            # check if asnumbers are still in range
            for asnumber in self.instance.asnumber_set.all():
                if not (asBegin <= asnumber.number <= asEnd):
                    raise forms.ValidationError("AS %s (%s) is not contained in this block anymore" % (asnumber, asnumber.number))

        return cleaned_data
|
|
|
|
|
|
class ASNumberForm(MntFormMixin, WhoisObjectFormMixin, forms.ModelForm):
    """Form for creating and editing ``ASNumber`` objects."""

    protectedFields = ['asblock', 'number', 'mnt_by']

    class Meta:
        model = ASNumber
        fields = ['handle', 'asblock', 'number', 'volatile', 'name', 'description', 'mnt_by', 'mnt_lower', 'admin_c']

    def __init__(self, *args, **kwargs):
        """Initialise the form and configure the ``asblock`` field.

        The block is mandatory when creating an AS number and cannot be
        changed afterwards.
        """
        super(ASNumberForm, self).__init__(*args, **kwargs)

        if not (self.instance and self.instance.pk):
            self.fields["asblock"].required = True
        else:
            self.fields["asblock"].disabled = True

        mnts = self._user.maintainer_set.all()
        self.fields['asblock'].queryset = ASBlock.getMntQueryset(mnts, self.instance, "asblock")

    def clean(self):
        """Cross-field validation of the AS number.

        Skipped when errors were already recorded. Otherwise verifies that
        the number lies inside the selected block, that no other object
        represents the same number, and that the block has no sub-blocks.
        Returns ``cleaned_data`` like the other forms in this module.
        """
        cleaned_data = super(ASNumberForm, self).clean()

        if self.errors:
            return cleaned_data

        number = cleaned_data['number']
        # The field is only required on creation, so an existing object
        # may come without a block; block checks are skipped in that case.
        block = cleaned_data['asblock']

        # belongs to asblock?
        if block is not None and (number < block.asBegin or number > block.asEnd):
            raise forms.ValidationError("AS number is not inside AS block")

        # does an entry already exist?
        try:
            otherAS = ASNumber.objects.get(number=number)
        except ASNumber.DoesNotExist:
            otherAS = None

        if otherAS is not None:
            isSelf = self.instance and self.instance.pk and self.instance.pk == otherAS.pk
            if not isSelf:
                raise forms.ValidationError("This AS number is already represented by %s" % otherAS.handle)

        # has already other asblock?
        if block is not None and block.asblock_set.count() > 0:
            raise forms.ValidationError("The given AS block is already references by following sub AS blocks: %s" % (", ".join(map(lambda _x: _x.handle, block.asblock_set.all())),))

        return cleaned_data
|