1import logging 2import re 3from copy import deepcopy 4 5from django.contrib import messages 6from django.contrib.contenttypes.models import ContentType 7from django.core.exceptions import FieldDoesNotExist, ObjectDoesNotExist, ValidationError 8from django.db import transaction, IntegrityError 9from django.db.models import ManyToManyField, ProtectedError 10from django.forms import Form, ModelMultipleChoiceField, MultipleHiddenInput, Textarea 11from django.http import HttpResponse 12from django.shortcuts import get_object_or_404, redirect, render 13from django.utils.html import escape 14from django.utils.http import is_safe_url 15from django.utils.safestring import mark_safe 16from django.views.generic import View 17from django_tables2.export import TableExport 18 19from extras.models import ExportTemplate 20from extras.signals import clear_webhooks 21from utilities.error_handlers import handle_protectederror 22from utilities.exceptions import AbortTransaction, PermissionsViolation 23from utilities.forms import ( 24 BootstrapMixin, BulkRenameForm, ConfirmationForm, CSVDataField, CSVFileField, ImportForm, restrict_form_fields, 25) 26from utilities.permissions import get_permission_for_model 27from utilities.tables import paginate_table 28from utilities.utils import normalize_querydict, prepare_cloned_fields 29from utilities.views import GetReturnURLMixin, ObjectPermissionRequiredMixin 30 31 32class ObjectView(ObjectPermissionRequiredMixin, View): 33 """ 34 Retrieve a single object for display. 35 36 queryset: The base queryset for retrieving the object 37 template_name: Name of the template to use 38 """ 39 queryset = None 40 template_name = None 41 42 def get_required_permission(self): 43 return get_permission_for_model(self.queryset.model, 'view') 44 45 def get_template_name(self): 46 """ 47 Return self.template_name if set. Otherwise, resolve the template path by model app_label and name. 
48 """ 49 if self.template_name is not None: 50 return self.template_name 51 model_opts = self.queryset.model._meta 52 return f'{model_opts.app_label}/{model_opts.model_name}.html' 53 54 def get_extra_context(self, request, instance): 55 """ 56 Return any additional context data for the template. 57 58 request: The current request 59 instance: The object being viewed 60 """ 61 return {} 62 63 def get(self, request, *args, **kwargs): 64 """ 65 Generic GET handler for accessing an object by PK or slug 66 """ 67 instance = get_object_or_404(self.queryset, **kwargs) 68 69 return render(request, self.get_template_name(), { 70 'object': instance, 71 **self.get_extra_context(request, instance), 72 }) 73 74 75class ObjectListView(ObjectPermissionRequiredMixin, View): 76 """ 77 List a series of objects. 78 79 queryset: The queryset of objects to display. Note: Prefetching related objects is not necessary, as the 80 table will prefetch objects as needed depending on the columns being displayed. 81 filter: A django-filter FilterSet that is applied to the queryset 82 filter_form: The form used to render filter options 83 table: The django-tables2 Table used to render the objects list 84 template_name: The name of the template 85 """ 86 queryset = None 87 filterset = None 88 filterset_form = None 89 table = None 90 template_name = 'generic/object_list.html' 91 action_buttons = ('add', 'import', 'export') 92 93 def get_required_permission(self): 94 return get_permission_for_model(self.queryset.model, 'view') 95 96 def get_table(self, request, permissions): 97 table = self.table(self.queryset, user=request.user) 98 if 'pk' in table.base_columns and (permissions['change'] or permissions['delete']): 99 table.columns.show('pk') 100 101 return table 102 103 def export_yaml(self): 104 """ 105 Export the queryset of objects as concatenated YAML documents. 
106 """ 107 yaml_data = [obj.to_yaml() for obj in self.queryset] 108 109 return '---\n'.join(yaml_data) 110 111 def export_table(self, table, columns=None): 112 """ 113 Export all table data in CSV format. 114 115 :param table: The Table instance to export 116 :param columns: A list of specific columns to include. If not specified, all columns will be exported. 117 """ 118 exclude_columns = {'pk'} 119 if columns: 120 all_columns = [col_name for col_name, _ in table.selected_columns + table.available_columns] 121 exclude_columns.update({ 122 col for col in all_columns if col not in columns 123 }) 124 exporter = TableExport( 125 export_format=TableExport.CSV, 126 table=table, 127 exclude_columns=exclude_columns 128 ) 129 return exporter.response( 130 filename=f'netbox_{self.queryset.model._meta.verbose_name_plural}.csv' 131 ) 132 133 def export_template(self, template, request): 134 """ 135 Render an ExportTemplate using the current queryset. 136 137 :param template: ExportTemplate instance 138 :param request: The current request 139 """ 140 try: 141 return template.render_to_response(self.queryset) 142 except Exception as e: 143 messages.error(request, f"There was an error rendering the selected export template ({template.name}): {e}") 144 return redirect(request.path) 145 146 def get(self, request): 147 model = self.queryset.model 148 content_type = ContentType.objects.get_for_model(model) 149 150 if self.filterset: 151 self.queryset = self.filterset(request.GET, self.queryset).qs 152 153 # Compile a dictionary indicating which permissions are available to the current user for this model 154 permissions = {} 155 for action in ('add', 'change', 'delete', 'view'): 156 perm_name = get_permission_for_model(model, action) 157 permissions[action] = request.user.has_perm(perm_name) 158 159 if 'export' in request.GET: 160 161 # Export the current table view 162 if request.GET['export'] == 'table': 163 table = self.get_table(request, permissions) 164 columns = [name for 
name, _ in table.selected_columns] 165 return self.export_table(table, columns) 166 167 # Render an ExportTemplate 168 elif request.GET['export']: 169 template = get_object_or_404(ExportTemplate, content_type=content_type, name=request.GET['export']) 170 return self.export_template(template, request) 171 172 # Check for YAML export support on the model 173 elif hasattr(model, 'to_yaml'): 174 response = HttpResponse(self.export_yaml(), content_type='text/yaml') 175 filename = 'netbox_{}.yaml'.format(self.queryset.model._meta.verbose_name_plural) 176 response['Content-Disposition'] = 'attachment; filename="{}"'.format(filename) 177 return response 178 179 # Fall back to default table/YAML export 180 else: 181 table = self.get_table(request, permissions) 182 return self.export_table(table) 183 184 # Render the objects table 185 table = self.get_table(request, permissions) 186 paginate_table(table, request) 187 188 context = { 189 'content_type': content_type, 190 'table': table, 191 'permissions': permissions, 192 'action_buttons': self.action_buttons, 193 'filter_form': self.filterset_form(request.GET, label_suffix='') if self.filterset_form else None, 194 } 195 context.update(self.extra_context()) 196 197 return render(request, self.template_name, context) 198 199 def extra_context(self): 200 return {} 201 202 203class ObjectEditView(GetReturnURLMixin, ObjectPermissionRequiredMixin, View): 204 """ 205 Create or edit a single object. 206 207 queryset: The base queryset for the object being modified 208 model_form: The form used to create or edit the object 209 template_name: The name of the template 210 """ 211 queryset = None 212 model_form = None 213 template_name = 'generic/object_edit.html' 214 215 def get_required_permission(self): 216 # self._permission_action is set by dispatch() to either "add" or "change" depending on whether 217 # we are modifying an existing object or creating a new one. 
218 return get_permission_for_model(self.queryset.model, self._permission_action) 219 220 def get_object(self, kwargs): 221 # Look up an existing object by slug or PK, if provided. 222 if 'slug' in kwargs: 223 obj = get_object_or_404(self.queryset, slug=kwargs['slug']) 224 elif 'pk' in kwargs: 225 obj = get_object_or_404(self.queryset, pk=kwargs['pk']) 226 # Otherwise, return a new instance. 227 else: 228 return self.queryset.model() 229 230 # Take a snapshot of change-logged models 231 if hasattr(obj, 'snapshot'): 232 obj.snapshot() 233 234 return obj 235 236 def alter_obj(self, obj, request, url_args, url_kwargs): 237 # Allow views to add extra info to an object before it is processed. For example, a parent object can be defined 238 # given some parameter from the request URL. 239 return obj 240 241 def dispatch(self, request, *args, **kwargs): 242 # Determine required permission based on whether we are editing an existing object 243 self._permission_action = 'change' if kwargs else 'add' 244 245 return super().dispatch(request, *args, **kwargs) 246 247 def get(self, request, *args, **kwargs): 248 obj = self.alter_obj(self.get_object(kwargs), request, args, kwargs) 249 250 initial_data = normalize_querydict(request.GET) 251 form = self.model_form(instance=obj, initial=initial_data) 252 restrict_form_fields(form, request.user) 253 254 return render(request, self.template_name, { 255 'obj': obj, 256 'obj_type': self.queryset.model._meta.verbose_name, 257 'form': form, 258 'return_url': self.get_return_url(request, obj), 259 }) 260 261 def post(self, request, *args, **kwargs): 262 logger = logging.getLogger('netbox.views.ObjectEditView') 263 obj = self.alter_obj(self.get_object(kwargs), request, args, kwargs) 264 form = self.model_form( 265 data=request.POST, 266 files=request.FILES, 267 instance=obj 268 ) 269 restrict_form_fields(form, request.user) 270 271 if form.is_valid(): 272 logger.debug("Form validation was successful") 273 274 try: 275 with 
transaction.atomic(): 276 object_created = form.instance.pk is None 277 obj = form.save() 278 279 # Check that the new object conforms with any assigned object-level permissions 280 if not self.queryset.filter(pk=obj.pk).first(): 281 raise PermissionsViolation() 282 283 msg = '{} {}'.format( 284 'Created' if object_created else 'Modified', 285 self.queryset.model._meta.verbose_name 286 ) 287 logger.info(f"{msg} {obj} (PK: {obj.pk})") 288 if hasattr(obj, 'get_absolute_url'): 289 msg = '{} <a href="{}">{}</a>'.format(msg, obj.get_absolute_url(), escape(obj)) 290 else: 291 msg = '{} {}'.format(msg, escape(obj)) 292 messages.success(request, mark_safe(msg)) 293 294 if '_addanother' in request.POST: 295 redirect_url = request.path 296 297 # If the object has clone_fields, pre-populate a new instance of the form 298 if hasattr(obj, 'clone_fields'): 299 redirect_url += f"?{prepare_cloned_fields(obj)}" 300 301 return redirect(redirect_url) 302 303 return_url = self.get_return_url(request, obj) 304 305 return redirect(return_url) 306 307 except PermissionsViolation: 308 msg = "Object save failed due to object-level permissions violation" 309 logger.debug(msg) 310 form.add_error(None, msg) 311 clear_webhooks.send(sender=self) 312 313 else: 314 logger.debug("Form validation failed") 315 316 return render(request, self.template_name, { 317 'obj': obj, 318 'obj_type': self.queryset.model._meta.verbose_name, 319 'form': form, 320 'return_url': self.get_return_url(request, obj), 321 }) 322 323 324class ObjectDeleteView(GetReturnURLMixin, ObjectPermissionRequiredMixin, View): 325 """ 326 Delete a single object. 
327 328 queryset: The base queryset for the object being deleted 329 template_name: The name of the template 330 """ 331 queryset = None 332 template_name = 'generic/object_delete.html' 333 334 def get_required_permission(self): 335 return get_permission_for_model(self.queryset.model, 'delete') 336 337 def get_object(self, kwargs): 338 # Look up object by slug if one has been provided. Otherwise, use PK. 339 if 'slug' in kwargs: 340 obj = get_object_or_404(self.queryset, slug=kwargs['slug']) 341 else: 342 obj = get_object_or_404(self.queryset, pk=kwargs['pk']) 343 344 # Take a snapshot of change-logged models 345 if hasattr(obj, 'snapshot'): 346 obj.snapshot() 347 348 return obj 349 350 def get(self, request, **kwargs): 351 obj = self.get_object(kwargs) 352 form = ConfirmationForm(initial=request.GET) 353 354 return render(request, self.template_name, { 355 'obj': obj, 356 'form': form, 357 'obj_type': self.queryset.model._meta.verbose_name, 358 'return_url': self.get_return_url(request, obj), 359 }) 360 361 def post(self, request, **kwargs): 362 logger = logging.getLogger('netbox.views.ObjectDeleteView') 363 obj = self.get_object(kwargs) 364 form = ConfirmationForm(request.POST) 365 366 if form.is_valid(): 367 logger.debug("Form validation was successful") 368 369 try: 370 obj.delete() 371 except ProtectedError as e: 372 logger.info("Caught ProtectedError while attempting to delete object") 373 handle_protectederror([obj], request, e) 374 return redirect(obj.get_absolute_url()) 375 376 msg = 'Deleted {} {}'.format(self.queryset.model._meta.verbose_name, obj) 377 logger.info(msg) 378 messages.success(request, msg) 379 380 return_url = form.cleaned_data.get('return_url') 381 if return_url is not None and is_safe_url(url=return_url, allowed_hosts=request.get_host()): 382 return redirect(return_url) 383 else: 384 return redirect(self.get_return_url(request, obj)) 385 386 else: 387 logger.debug("Form validation failed") 388 389 return render(request, 
self.template_name, { 390 'obj': obj, 391 'form': form, 392 'obj_type': self.queryset.model._meta.verbose_name, 393 'return_url': self.get_return_url(request, obj), 394 }) 395 396 397class BulkCreateView(GetReturnURLMixin, ObjectPermissionRequiredMixin, View): 398 """ 399 Create new objects in bulk. 400 401 queryset: Base queryset for the objects being created 402 form: Form class which provides the `pattern` field 403 model_form: The ModelForm used to create individual objects 404 pattern_target: Name of the field to be evaluated as a pattern (if any) 405 template_name: The name of the template 406 """ 407 queryset = None 408 form = None 409 model_form = None 410 pattern_target = '' 411 template_name = None 412 413 def get_required_permission(self): 414 return get_permission_for_model(self.queryset.model, 'add') 415 416 def get(self, request): 417 # Set initial values for visible form fields from query args 418 initial = {} 419 for field in getattr(self.model_form._meta, 'fields', []): 420 if request.GET.get(field): 421 initial[field] = request.GET[field] 422 423 form = self.form() 424 model_form = self.model_form(initial=initial) 425 426 return render(request, self.template_name, { 427 'obj_type': self.model_form._meta.model._meta.verbose_name, 428 'form': form, 429 'model_form': model_form, 430 'return_url': self.get_return_url(request), 431 }) 432 433 def post(self, request): 434 logger = logging.getLogger('netbox.views.BulkCreateView') 435 model = self.queryset.model 436 form = self.form(request.POST) 437 model_form = self.model_form(request.POST) 438 439 if form.is_valid(): 440 logger.debug("Form validation was successful") 441 pattern = form.cleaned_data['pattern'] 442 new_objs = [] 443 444 try: 445 with transaction.atomic(): 446 447 # Create objects from the expanded. Abort the transaction on the first validation error. 448 for value in pattern: 449 450 # Reinstantiate the model form each time to avoid overwriting the same instance. 
Use a mutable 451 # copy of the POST QueryDict so that we can update the target field value. 452 model_form = self.model_form(request.POST.copy()) 453 model_form.data[self.pattern_target] = value 454 455 # Validate each new object independently. 456 if model_form.is_valid(): 457 obj = model_form.save() 458 logger.debug(f"Created {obj} (PK: {obj.pk})") 459 new_objs.append(obj) 460 else: 461 # Copy any errors on the pattern target field to the pattern form. 462 errors = model_form.errors.as_data() 463 if errors.get(self.pattern_target): 464 form.add_error('pattern', errors[self.pattern_target]) 465 # Raise an IntegrityError to break the for loop and abort the transaction. 466 raise IntegrityError() 467 468 # Enforce object-level permissions 469 if self.queryset.filter(pk__in=[obj.pk for obj in new_objs]).count() != len(new_objs): 470 raise PermissionsViolation 471 472 # If we make it to this point, validation has succeeded on all new objects. 473 msg = "Added {} {}".format(len(new_objs), model._meta.verbose_name_plural) 474 logger.info(msg) 475 messages.success(request, msg) 476 477 if '_addanother' in request.POST: 478 return redirect(request.path) 479 return redirect(self.get_return_url(request)) 480 481 except IntegrityError: 482 pass 483 484 except PermissionsViolation: 485 msg = "Object creation failed due to object-level permissions violation" 486 logger.debug(msg) 487 form.add_error(None, msg) 488 489 else: 490 logger.debug("Form validation failed") 491 492 return render(request, self.template_name, { 493 'form': form, 494 'model_form': model_form, 495 'obj_type': model._meta.verbose_name, 496 'return_url': self.get_return_url(request), 497 }) 498 499 500class ObjectImportView(GetReturnURLMixin, ObjectPermissionRequiredMixin, View): 501 """ 502 Import a single object (YAML or JSON format). 
503 504 queryset: Base queryset for the objects being created 505 model_form: The ModelForm used to create individual objects 506 related_object_forms: A dictionary mapping of forms to be used for the creation of related (child) objects 507 template_name: The name of the template 508 """ 509 queryset = None 510 model_form = None 511 related_object_forms = dict() 512 template_name = 'generic/object_import.html' 513 514 def get_required_permission(self): 515 return get_permission_for_model(self.queryset.model, 'add') 516 517 def get(self, request): 518 form = ImportForm() 519 520 return render(request, self.template_name, { 521 'form': form, 522 'obj_type': self.queryset.model._meta.verbose_name, 523 'return_url': self.get_return_url(request), 524 }) 525 526 def post(self, request): 527 logger = logging.getLogger('netbox.views.ObjectImportView') 528 form = ImportForm(request.POST) 529 530 if form.is_valid(): 531 logger.debug("Import form validation was successful") 532 533 # Initialize model form 534 data = form.cleaned_data['data'] 535 model_form = self.model_form(data) 536 restrict_form_fields(model_form, request.user) 537 538 # Assign default values for any fields which were not specified. We have to do this manually because passing 539 # 'initial=' to the form on initialization merely sets default values for the widgets. Since widgets are not 540 # used for YAML/JSON import, we first bind the imported data normally, then update the form's data with the 541 # applicable field defaults as needed prior to form validation. 
542 for field_name, field in model_form.fields.items(): 543 if field_name not in data and hasattr(field, 'initial'): 544 model_form.data[field_name] = field.initial 545 546 if model_form.is_valid(): 547 548 try: 549 with transaction.atomic(): 550 551 # Save the primary object 552 obj = model_form.save() 553 554 # Enforce object-level permissions 555 if not self.queryset.filter(pk=obj.pk).first(): 556 raise PermissionsViolation() 557 558 logger.debug(f"Created {obj} (PK: {obj.pk})") 559 560 # Iterate through the related object forms (if any), validating and saving each instance. 561 for field_name, related_object_form in self.related_object_forms.items(): 562 logger.debug("Processing form for related objects: {related_object_form}") 563 564 related_obj_pks = [] 565 for i, rel_obj_data in enumerate(data.get(field_name, list())): 566 567 f = related_object_form(obj, rel_obj_data) 568 569 for subfield_name, field in f.fields.items(): 570 if subfield_name not in rel_obj_data and hasattr(field, 'initial'): 571 f.data[subfield_name] = field.initial 572 573 if f.is_valid(): 574 related_obj = f.save() 575 related_obj_pks.append(related_obj.pk) 576 else: 577 # Replicate errors on the related object form to the primary form for display 578 for subfield_name, errors in f.errors.items(): 579 for err in errors: 580 err_msg = "{}[{}] {}: {}".format(field_name, i, subfield_name, err) 581 model_form.add_error(None, err_msg) 582 raise AbortTransaction() 583 584 # Enforce object-level permissions on related objects 585 model = related_object_form.Meta.model 586 if model.objects.filter(pk__in=related_obj_pks).count() != len(related_obj_pks): 587 raise ObjectDoesNotExist 588 589 except AbortTransaction: 590 clear_webhooks.send(sender=self) 591 592 except PermissionsViolation: 593 msg = "Object creation failed due to object-level permissions violation" 594 logger.debug(msg) 595 form.add_error(None, msg) 596 clear_webhooks.send(sender=self) 597 598 if not model_form.errors: 599 
logger.info(f"Import object {obj} (PK: {obj.pk})") 600 messages.success(request, mark_safe('Imported object: <a href="{}">{}</a>'.format( 601 obj.get_absolute_url(), obj 602 ))) 603 604 if '_addanother' in request.POST: 605 return redirect(request.get_full_path()) 606 607 return_url = form.cleaned_data.get('return_url') 608 if return_url is not None and is_safe_url(url=return_url, allowed_hosts=request.get_host()): 609 return redirect(return_url) 610 else: 611 return redirect(self.get_return_url(request, obj)) 612 613 else: 614 logger.debug("Model form validation failed") 615 616 # Replicate model form errors for display 617 for field, errors in model_form.errors.items(): 618 for err in errors: 619 if field == '__all__': 620 form.add_error(None, err) 621 else: 622 form.add_error(None, "{}: {}".format(field, err)) 623 624 else: 625 logger.debug("Import form validation failed") 626 627 return render(request, self.template_name, { 628 'form': form, 629 'obj_type': self.queryset.model._meta.verbose_name, 630 'return_url': self.get_return_url(request), 631 }) 632 633 634class BulkImportView(GetReturnURLMixin, ObjectPermissionRequiredMixin, View): 635 """ 636 Import objects in bulk (CSV format). 637 638 queryset: Base queryset for the model 639 model_form: The form used to create each imported object 640 table: The django-tables2 Table used to render the list of imported objects 641 template_name: The name of the template 642 widget_attrs: A dict of attributes to apply to the import widget (e.g. 
to require a session key) 643 """ 644 queryset = None 645 model_form = None 646 table = None 647 template_name = 'generic/object_bulk_import.html' 648 widget_attrs = {} 649 650 def _import_form(self, *args, **kwargs): 651 652 class ImportForm(BootstrapMixin, Form): 653 csv = CSVDataField( 654 from_form=self.model_form, 655 widget=Textarea(attrs=self.widget_attrs) 656 ) 657 csv_file = CSVFileField( 658 label="CSV file", 659 from_form=self.model_form, 660 required=False 661 ) 662 663 def clean(self): 664 csv_rows = self.cleaned_data['csv'][1] if 'csv' in self.cleaned_data else None 665 csv_file = self.files.get('csv_file') 666 667 # Check that the user has not submitted both text data and a file 668 if csv_rows and csv_file: 669 raise ValidationError( 670 "Cannot process CSV text and file attachment simultaneously. Please choose only one import " 671 "method." 672 ) 673 674 return ImportForm(*args, **kwargs) 675 676 def _save_obj(self, obj_form, request): 677 """ 678 Provide a hook to modify the object immediately before saving it (e.g. to encrypt secret data). 679 """ 680 return obj_form.save() 681 682 def get_required_permission(self): 683 return get_permission_for_model(self.queryset.model, 'add') 684 685 def get(self, request): 686 687 return render(request, self.template_name, { 688 'form': self._import_form(), 689 'fields': self.model_form().fields, 690 'obj_type': self.model_form._meta.model._meta.verbose_name, 691 'return_url': self.get_return_url(request), 692 }) 693 694 def post(self, request): 695 logger = logging.getLogger('netbox.views.BulkImportView') 696 new_objs = [] 697 form = self._import_form(request.POST, request.FILES) 698 699 if form.is_valid(): 700 logger.debug("Form validation was successful") 701 702 try: 703 # Iterate through CSV data and bind each row to a new model form instance. 
704 with transaction.atomic(): 705 if request.FILES: 706 headers, records = form.cleaned_data['csv_file'] 707 else: 708 headers, records = form.cleaned_data['csv'] 709 for row, data in enumerate(records, start=1): 710 obj_form = self.model_form(data, headers=headers) 711 restrict_form_fields(obj_form, request.user) 712 713 if obj_form.is_valid(): 714 obj = self._save_obj(obj_form, request) 715 new_objs.append(obj) 716 else: 717 for field, err in obj_form.errors.items(): 718 form.add_error('csv', "Row {} {}: {}".format(row, field, err[0])) 719 raise ValidationError("") 720 721 # Enforce object-level permissions 722 if self.queryset.filter(pk__in=[obj.pk for obj in new_objs]).count() != len(new_objs): 723 raise PermissionsViolation 724 725 # Compile a table containing the imported objects 726 obj_table = self.table(new_objs) 727 728 if new_objs: 729 msg = 'Imported {} {}'.format(len(new_objs), new_objs[0]._meta.verbose_name_plural) 730 logger.info(msg) 731 messages.success(request, msg) 732 733 return render(request, "import_success.html", { 734 'table': obj_table, 735 'return_url': self.get_return_url(request), 736 }) 737 738 except ValidationError: 739 clear_webhooks.send(sender=self) 740 741 except PermissionsViolation: 742 msg = "Object import failed due to object-level permissions violation" 743 logger.debug(msg) 744 form.add_error(None, msg) 745 clear_webhooks.send(sender=self) 746 747 else: 748 logger.debug("Form validation failed") 749 750 return render(request, self.template_name, { 751 'form': form, 752 'fields': self.model_form().fields, 753 'obj_type': self.model_form._meta.model._meta.verbose_name, 754 'return_url': self.get_return_url(request), 755 }) 756 757 758class BulkEditView(GetReturnURLMixin, ObjectPermissionRequiredMixin, View): 759 """ 760 Edit objects in bulk. 761 762 queryset: Custom queryset to use when retrieving objects (e.g. 
to select related objects) 763 filter: FilterSet to apply when deleting by QuerySet 764 table: The table used to display devices being edited 765 form: The form class used to edit objects in bulk 766 template_name: The name of the template 767 """ 768 queryset = None 769 filterset = None 770 table = None 771 form = None 772 template_name = 'generic/object_bulk_edit.html' 773 774 def get_required_permission(self): 775 return get_permission_for_model(self.queryset.model, 'change') 776 777 def get(self, request): 778 return redirect(self.get_return_url(request)) 779 780 def post(self, request, **kwargs): 781 logger = logging.getLogger('netbox.views.BulkEditView') 782 model = self.queryset.model 783 784 # If we are editing *all* objects in the queryset, replace the PK list with all matched objects. 785 if request.POST.get('_all') and self.filterset is not None: 786 pk_list = self.filterset(request.GET, self.queryset.values_list('pk', flat=True)).qs 787 else: 788 pk_list = request.POST.getlist('pk') 789 790 # Include the PK list as initial data for the form 791 initial_data = {'pk': pk_list} 792 793 # Check for other contextual data needed for the form. We avoid passing all of request.GET because the 794 # filter values will conflict with the bulk edit form fields. 
795 # TODO: Find a better way to accomplish this 796 if 'device' in request.GET: 797 initial_data['device'] = request.GET.get('device') 798 elif 'device_type' in request.GET: 799 initial_data['device_type'] = request.GET.get('device_type') 800 elif 'virtual_machine' in request.GET: 801 initial_data['virtual_machine'] = request.GET.get('virtual_machine') 802 803 if '_apply' in request.POST: 804 form = self.form(model, request.POST, initial=initial_data) 805 restrict_form_fields(form, request.user) 806 807 if form.is_valid(): 808 logger.debug("Form validation was successful") 809 custom_fields = form.custom_fields if hasattr(form, 'custom_fields') else [] 810 standard_fields = [ 811 field for field in form.fields if field not in custom_fields + ['pk'] 812 ] 813 nullified_fields = request.POST.getlist('_nullify') 814 815 try: 816 817 with transaction.atomic(): 818 819 updated_objects = [] 820 for obj in self.queryset.filter(pk__in=form.cleaned_data['pk']): 821 822 # Take a snapshot of change-logged models 823 if hasattr(obj, 'snapshot'): 824 obj.snapshot() 825 826 # Update standard fields. If a field is listed in _nullify, delete its value. 
827 for name in standard_fields: 828 829 try: 830 model_field = model._meta.get_field(name) 831 except FieldDoesNotExist: 832 # This form field is used to modify a field rather than set its value directly 833 model_field = None 834 835 # Handle nullification 836 if name in form.nullable_fields and name in nullified_fields: 837 if isinstance(model_field, ManyToManyField): 838 getattr(obj, name).set([]) 839 else: 840 setattr(obj, name, None if model_field.null else '') 841 842 # ManyToManyFields 843 elif isinstance(model_field, ManyToManyField): 844 if form.cleaned_data[name]: 845 getattr(obj, name).set(form.cleaned_data[name]) 846 # Normal fields 847 elif name in form.changed_data: 848 setattr(obj, name, form.cleaned_data[name]) 849 850 # Update custom fields 851 for name in custom_fields: 852 if name in form.nullable_fields and name in nullified_fields: 853 obj.custom_field_data[name] = None 854 elif name in form.changed_data: 855 obj.custom_field_data[name] = form.cleaned_data[name] 856 857 obj.full_clean() 858 obj.save() 859 updated_objects.append(obj) 860 logger.debug(f"Saved {obj} (PK: {obj.pk})") 861 862 # Add/remove tags 863 if form.cleaned_data.get('add_tags', None): 864 obj.tags.add(*form.cleaned_data['add_tags']) 865 if form.cleaned_data.get('remove_tags', None): 866 obj.tags.remove(*form.cleaned_data['remove_tags']) 867 868 # Enforce object-level permissions 869 if self.queryset.filter(pk__in=[obj.pk for obj in updated_objects]).count() != len(updated_objects): 870 raise PermissionsViolation 871 872 if updated_objects: 873 msg = 'Updated {} {}'.format(len(updated_objects), model._meta.verbose_name_plural) 874 logger.info(msg) 875 messages.success(self.request, msg) 876 877 return redirect(self.get_return_url(request)) 878 879 except ValidationError as e: 880 messages.error(self.request, "{} failed validation: {}".format(obj, ", ".join(e.messages))) 881 clear_webhooks.send(sender=self) 882 883 except PermissionsViolation: 884 msg = "Object update failed 
class BulkRenameView(GetReturnURLMixin, ObjectPermissionRequiredMixin, View):
    """
    Rename a set of objects in bulk by applying a find/replace (optionally a regular expression)
    to each object's name. Intended to be subclassed with a concrete queryset.

    queryset: The base queryset from which the objects to rename are selected
    template_name: The name of the template
    """
    queryset = None
    template_name = 'generic/object_bulk_rename.html'

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # The hidden "pk" field must be bound to this view's queryset, so the form class is
        # derived from BulkRenameForm per view instance.
        selectable_objects = self.queryset

        class _Form(BulkRenameForm):
            pk = ModelMultipleChoiceField(
                queryset=selectable_objects,
                widget=MultipleHiddenInput()
            )

        self.form = _Form

    def get_required_permission(self):
        """
        Renaming modifies existing objects, so the model's "change" permission is required.
        """
        return get_permission_for_model(self.queryset.model, 'change')

    def post(self, request):
        """
        Preview or apply a bulk rename.

        When the request carries "_preview" or "_apply", the form is bound to the POST data and a
        new_name attribute is computed for every selected object. With "_apply" the new names are
        also saved inside a transaction and the user is redirected to the return URL. In every
        other case (including a failed validation or permissions check) the template is rendered.
        """
        logger = logging.getLogger('netbox.views.BulkRenameView')
        is_submission = '_preview' in request.POST or '_apply' in request.POST

        initial = {'pk': request.POST.getlist('pk')}
        if is_submission:
            form = self.form(request.POST, initial=initial)
        else:
            form = self.form(initial=initial)
        selected_objects = self.queryset.filter(pk__in=form.initial['pk'])

        if is_submission and form.is_valid():
            find = form.cleaned_data['find']
            replace = form.cleaned_data['replace']
            use_regex = form.cleaned_data['use_regex']

            try:
                with transaction.atomic():
                    renamed_pks = []
                    for instance in selected_objects:

                        # Take a snapshot of change-logged models
                        if hasattr(instance, 'snapshot'):
                            instance.snapshot()

                        if use_regex:
                            try:
                                new_name = re.sub(find, replace, instance.name)
                            # Catch regex group reference errors
                            except re.error:
                                new_name = instance.name
                        else:
                            new_name = instance.name.replace(find, replace)

                        # new_name is attached to the instance so the template can show old and new names
                        instance.new_name = new_name
                        renamed_pks.append(instance.pk)

                    if '_apply' in request.POST:
                        # Iterating the same queryset again reuses the instances annotated above
                        for instance in selected_objects:
                            instance.name = instance.new_name
                            instance.save()

                        # Enforce constrained permissions
                        permitted_count = self.queryset.filter(pk__in=renamed_pks).count()
                        if permitted_count != len(selected_objects):
                            raise PermissionsViolation

                        messages.success(request, "Renamed {} {}".format(
                            len(selected_objects),
                            self.queryset.model._meta.verbose_name_plural
                        ))
                        return redirect(self.get_return_url(request))

            except PermissionsViolation:
                msg = "Object update failed due to object-level permissions violation"
                logger.debug(msg)
                form.add_error(None, msg)
                clear_webhooks.send(sender=self)

        context = {
            'form': form,
            'obj_type_plural': self.queryset.model._meta.verbose_name_plural,
            'selected_objects': selected_objects,
            'return_url': self.get_return_url(request),
        }
        return render(request, self.template_name, context)
class BulkDeleteView(GetReturnURLMixin, ObjectPermissionRequiredMixin, View):
    """
    Delete objects in bulk.

    queryset: Custom queryset to use when retrieving objects (e.g. to select related objects)
    filterset: FilterSet to apply when deleting by QuerySet
    table: The table used to display the objects being deleted
    form: The form class used to delete objects in bulk
    template_name: The name of the template
    """
    queryset = None
    filterset = None
    table = None
    form = None
    template_name = 'generic/object_bulk_delete.html'

    def get_required_permission(self):
        """
        Return the "delete" permission for the queryset's model.
        """
        return get_permission_for_model(self.queryset.model, 'delete')

    def get(self, request):
        """
        Bulk deletion is only initiated via POST; a GET simply returns the user to the return URL.
        """
        return redirect(self.get_return_url(request))

    def post(self, request, **kwargs):
        """
        Show the confirmation page for the selected objects, or delete them once confirmed.

        Without "_confirm" in the POST data, a confirmation form and a table of the affected
        objects are rendered. With "_confirm" and a valid form, the objects are deleted inside a
        single transaction: if any of them is protected by a dependent object, nothing is deleted.
        """
        logger = logging.getLogger('netbox.views.BulkDeleteView')
        model = self.queryset.model

        # Are we deleting *all* objects in the queryset or just a selected subset?
        if request.POST.get('_all'):
            qs = model.objects.all()
            if self.filterset is not None:
                qs = self.filterset(request.GET, qs).qs
            pk_list = qs.only('pk').values_list('pk', flat=True)
        else:
            pk_list = [int(pk) for pk in request.POST.getlist('pk')]

        form_cls = self.get_form()

        if '_confirm' in request.POST:
            form = form_cls(request.POST)
            if form.is_valid():
                logger.debug("Form validation was successful")

                # Delete objects
                queryset = self.queryset.filter(pk__in=pk_list)
                deleted_count = queryset.count()
                try:
                    # Objects are deleted one at a time (so that snapshots can be taken). Doing so
                    # within a transaction ensures that a ProtectedError raised part-way through
                    # does not leave only some of the selected objects deleted.
                    with transaction.atomic():
                        for obj in queryset:
                            # Take a snapshot of change-logged models
                            if hasattr(obj, 'snapshot'):
                                obj.snapshot()
                            obj.delete()
                except ProtectedError as e:
                    logger.info("Caught ProtectedError while attempting to delete objects")
                    handle_protectederror(queryset, request, e)
                    # The deletions were rolled back; discard any webhooks queued for them
                    clear_webhooks.send(sender=self)
                    return redirect(self.get_return_url(request))

                msg = f"Deleted {deleted_count} {model._meta.verbose_name_plural}"
                logger.info(msg)
                messages.success(request, msg)
                return redirect(self.get_return_url(request))

            else:
                logger.debug("Form validation failed")

        else:
            form = form_cls(initial={
                'pk': pk_list,
                'return_url': self.get_return_url(request),
            })

        # Retrieve objects being deleted
        table = self.table(self.queryset.filter(pk__in=pk_list), orderable=False)
        if not table.rows:
            messages.warning(request, "No {} were selected for deletion.".format(model._meta.verbose_name_plural))
            return redirect(self.get_return_url(request))

        return render(request, self.template_name, {
            'form': form,
            'obj_type_plural': model._meta.verbose_name_plural,
            'table': table,
            'return_url': self.get_return_url(request),
        })

    def get_form(self):
        """
        Provide a standard bulk delete form if none has been specified for the view
        """
        if self.form:
            return self.form

        # Only build the fallback form class when the view has not supplied its own
        class BulkDeleteForm(ConfirmationForm):
            pk = ModelMultipleChoiceField(queryset=self.queryset, widget=MultipleHiddenInput)

        return BulkDeleteForm
#
# Device/VirtualMachine components
#

# TODO: Replace with BulkCreateView
class ComponentCreateView(GetReturnURLMixin, ObjectPermissionRequiredMixin, View):
    """
    Add one or more components (e.g. interfaces, console ports, etc.) to a Device or VirtualMachine.

    queryset: The base queryset for the component model being created
    form: The "parent" form, which accepts name/label patterns describing the components to create
    model_form: The form used to validate and save each individual component
    template_name: The name of the template
    """
    queryset = None
    form = None
    model_form = None
    template_name = 'generic/object_edit.html'

    def get_required_permission(self):
        """
        Return the "add" permission for the queryset's model.
        """
        return get_permission_for_model(self.queryset.model, 'add')

    def get(self, request):
        """
        Render an empty creation form, pre-populated from the query string.
        """
        form = self.form(initial=request.GET)

        return render(request, self.template_name, {
            'obj': self.queryset.model(),
            'obj_type': self.queryset.model._meta.verbose_name,
            'form': form,
            'return_url': self.get_return_url(request),
        })

    def post(self, request):
        """
        Validate the submitted form and create the components. On success, redirect either back
        to this form ("_addanother") or to the return URL; otherwise re-render the form with its
        errors.
        """
        form = self.form(request.POST, initial=request.GET)
        self.validate_form(request, form)

        if form.is_valid() and not form.errors:
            if '_addanother' in request.POST:
                return redirect(request.get_full_path())
            else:
                return redirect(self.get_return_url(request))

        # Provide the same context as get() so the template renders identically after a failed submission
        return render(request, self.template_name, {
            'obj': self.queryset.model(),
            'obj_type': self.queryset.model._meta.verbose_name,
            'form': form,
            'return_url': self.get_return_url(request),
        })

    def validate_form(self, request, form):
        """
        Validate form values and set errors on the form object as they are detected. If
        no errors are found, signal success messages.

        request: The current request
        form: The bound parent form (an instance of self.form)

        Returns the list of newly created objects if all components were created, or None if the
        form was invalid, any component failed validation, or object-level permissions were violated.
        """

        logger = logging.getLogger('netbox.views.ComponentCreateView')
        if form.is_valid():
            new_components = []
            data = deepcopy(request.POST)
            names = form.cleaned_data['name_pattern']
            labels = form.cleaned_data.get('label_pattern')

            for i, name in enumerate(names):
                label = labels[i] if labels else None
                # Initialize the individual component form
                data['name'] = name
                data['label'] = label

                if hasattr(form, 'get_iterative_data'):
                    # NOTE(review): if data is a Django QueryDict, update() appends to each key's
                    # value list rather than replacing it; single-value lookups still see the most
                    # recent value. Confirm no multi-value field relies on this.
                    data.update(form.get_iterative_data(i))

                component_form = self.model_form(data)

                if component_form.is_valid():
                    new_components.append(component_form)

                else:
                    for field, errors in component_form.errors.as_data().items():
                        # Assign errors on the child form's name/label field to name_pattern/label_pattern on the parent form
                        if field == 'name':
                            field = 'name_pattern'
                        elif field == 'label':
                            field = 'label_pattern'
                        # Form.add_error() raises ValueError for a field name unknown to the form, so
                        # report any other child-only field as a non-field error instead
                        if field not in form.fields:
                            field = None
                        for e in errors:
                            form.add_error(field, '{}: {}'.format(name, ', '.join(e)))

            if not form.errors:
                try:
                    with transaction.atomic():
                        # Create the new components
                        new_objs = []
                        for component_form in new_components:
                            obj = component_form.save()
                            new_objs.append(obj)

                        # Enforce object-level permissions
                        if self.queryset.filter(pk__in=[obj.pk for obj in new_objs]).count() != len(new_objs):
                            raise PermissionsViolation

                        messages.success(request, "Added {} {}".format(
                            len(new_components), self.queryset.model._meta.verbose_name_plural
                        ))
                        # Return the newly created objects so overridden post methods can use the data as needed.
                        return new_objs

                except PermissionsViolation:
                    msg = "Component creation failed due to object-level permissions violation"
                    logger.debug(msg)
                    form.add_error(None, msg)
                    clear_webhooks.send(sender=self)

        return None
class BulkComponentCreateView(GetReturnURLMixin, ObjectPermissionRequiredMixin, View):
    """
    Add one or more components (e.g. interfaces, console ports, etc.) to a set of Devices or VirtualMachines.

    parent_model: The model of the parent objects (those receiving the new components)
    parent_field: The name of the component form field which references the parent object
    form: The form accepting the selected parents and the name/label patterns
    queryset: The base queryset for the component model being created
    model_form: The form used to validate and save each individual component
    filterset: FilterSet applied to the parent model when "_all" is requested
    table: The table used to display the selected parent objects
    template_name: The name of the template
    """
    parent_model = None
    parent_field = None
    form = None
    queryset = None
    model_form = None
    filterset = None
    table = None
    template_name = 'generic/object_bulk_add_component.html'

    def get_required_permission(self):
        """
        Return the "add" permission for the component model, using the model's own app label
        (rather than assuming the component lives in a particular app).
        """
        return get_permission_for_model(self.queryset.model, 'add')

    def post(self, request):
        """
        Show the component creation form for the selected parents or, when "_create" is present
        in the POST data, create the components on every selected parent.

        Creation is all-or-nothing: if any component fails validation, the database rejects a
        save, or object-level permissions are violated, the transaction is rolled back and the
        form is re-rendered with the relevant errors.
        """
        logger = logging.getLogger('netbox.views.BulkComponentCreateView')
        parent_model_name = self.parent_model._meta.verbose_name_plural
        model_name = self.queryset.model._meta.verbose_name_plural

        # Are we editing *all* objects in the queryset or just a selected subset?
        if request.POST.get('_all') and self.filterset is not None:
            pk_list = [obj.pk for obj in self.filterset(request.GET, self.parent_model.objects.only('pk')).qs]
        else:
            pk_list = [int(pk) for pk in request.POST.getlist('pk')]

        selected_objects = self.parent_model.objects.filter(pk__in=pk_list)
        if not selected_objects:
            messages.warning(request, "No {} were selected.".format(self.parent_model._meta.verbose_name_plural))
            return redirect(self.get_return_url(request))
        table = self.table(selected_objects)

        if '_create' in request.POST:
            form = self.form(request.POST)

            if form.is_valid():
                logger.debug("Form validation was successful")

                new_components = []
                data = deepcopy(form.cleaned_data)
                # The expanded patterns are the same for every parent object
                names = data['name_pattern']
                labels = data.get('label_pattern')

                try:
                    with transaction.atomic():

                        for obj in data['pk']:
                            for i, name in enumerate(names):
                                label = labels[i] if labels else None

                                component_data = {
                                    self.parent_field: obj.pk,
                                    'name': name,
                                    'label': label
                                }
                                component_data.update(data)
                                component_form = self.model_form(component_data)
                                if component_form.is_valid():
                                    instance = component_form.save()
                                    logger.debug(f"Created {instance} on {instance.parent_object}")
                                    new_components.append(instance)
                                else:
                                    for field, errors in component_form.errors.as_data().items():
                                        # Map the child form's name/label fields to the parent form's
                                        # pattern fields. Form.add_error() raises ValueError for a field
                                        # name unknown to the form, so report anything else as a
                                        # non-field error.
                                        if field == 'name':
                                            field = 'name_pattern'
                                        elif field == 'label':
                                            field = 'label_pattern'
                                        if field not in form.fields:
                                            field = None
                                        for e in errors:
                                            form.add_error(field, '{} {}: {}'.format(obj, name, ', '.join(e)))

                        # Do not keep the components which were saved if any other component failed validation
                        if form.errors:
                            raise AbortTransaction()

                        # Enforce object-level permissions
                        if self.queryset.filter(pk__in=[c.pk for c in new_components]).count() != len(new_components):
                            raise PermissionsViolation

                except AbortTransaction:
                    logger.debug("Component validation failed; rolling back")
                    clear_webhooks.send(sender=self)

                except IntegrityError:
                    # Record an error so that the rolled-back transaction is not reported as a success below
                    msg = "Component creation failed due to a database integrity error"
                    logger.debug(msg)
                    form.add_error(None, msg)
                    clear_webhooks.send(sender=self)

                except PermissionsViolation:
                    msg = "Component creation failed due to object-level permissions violation"
                    logger.debug(msg)
                    form.add_error(None, msg)
                    clear_webhooks.send(sender=self)

                if not form.errors:
                    msg = "Added {} {} to {} {}.".format(
                        len(new_components),
                        model_name,
                        len(form.cleaned_data['pk']),
                        parent_model_name
                    )
                    logger.info(msg)
                    messages.success(request, msg)

                    return redirect(self.get_return_url(request))

            else:
                logger.debug("Form validation failed")

        else:
            form = self.form(initial={'pk': pk_list})

        return render(request, self.template_name, {
            'form': form,
            'parent_model_name': parent_model_name,
            'model_name': model_name,
            'table': table,
            'return_url': self.get_return_url(request),
        })