diff --git a/accounts/apps.py b/accounts/apps.py index 94f4d27..6a2e2bd 100644 --- a/accounts/apps.py +++ b/accounts/apps.py @@ -10,19 +10,19 @@ def apply_change_password(sender, **kwargs): from django.conf import settings from django.contrib.auth.models import User, Permission if hasattr(settings, 'SHOW_PROFILE_EDIT_PASSWORD'): - print('\033[92mSHOW_PROFILE_EDIT_PASSWORD is found inside settings.py\033[0m') - print('\033[92mApplying permission can_change_password for all users\033[0m') + print('\033[1m! \033[92mSHOW_PROFILE_EDIT_PASSWORD is found inside settings.py\033[0m') + print('\033[1m* \033[92mApplying permission can_change_password for all users\033[0m') users = User.objects.all() permission = Permission.objects.get(codename='change_password') if settings.SHOW_PROFILE_EDIT_PASSWORD: - print('\033[91mWarning!!! Setting to True for all users\033[0m') + print('\033[1m! \033[91mWarning!!! Setting to True for all users\033[0m') for user in users: user.user_permissions.add(permission) else: - print('\033[91mWarning!!! Setting to False for all users\033[0m') + print('\033[1m* \033[91mWarning!!! Setting to False for all users\033[0m') for user in users: user.user_permissions.remove(permission) - print('\033[1mDon`t forget to remove the option from settings.py\033[0m') + print('\033[1m! Don`t forget to remove the option from settings.py\033[0m') def create_admin(sender, **kwargs): @@ -32,11 +32,11 @@ def create_admin(sender, **kwargs): from django.contrib.auth.models import User from accounts.models import UserAttributes - plan = kwargs['plan'] + plan = kwargs.get('plan', []) for migration, rolled_back in plan: if migration.app_label == 'accounts' and migration.name == '0001_initial' and not rolled_back: if User.objects.count() == 0: - print('\033[92mCreating default admin user\033[0m') + print('\033[1m* \033[92mCreating default admin user\033[0m') admin = User.objects.create_superuser('admin', None, 'admin') UserAttributes(user=admin, max_instances=-1, max_cpus=-1, max_memory=-1, max_disk_size=-1).save() break @@ -47,5 +47,5 @@ class AccountsConfig(AppConfig): verbose_name = 'Accounts' def ready(self): - post_migrate.connect(apply_change_password, sender=self) post_migrate.connect(create_admin, sender=self) + post_migrate.connect(apply_change_password, sender=self) diff --git a/conf/test-vm.xml b/conf/test-vm.xml index c407266..d14f25f 100644 --- a/conf/test-vm.xml +++ b/conf/test-vm.xml @@ -1,7 +1,6 @@ test-vm 1bd3c1f2-dd12-4b8d-a298-dff387cb9f93 - None 131072 131072 1 diff --git a/instances/apps.py b/instances/apps.py index 4c5563d..3b5c07d 100644 --- a/instances/apps.py +++ b/instances/apps.py @@ -6,38 +6,41 @@ def migrate_can_clone_instances(sender, **kwargs): ''' Migrate can clone instances user attribute to permission ''' - from django.conf import settings - from django.contrib.auth.models import User, Permission - from accounts.models import UserAttributes + from django.contrib.auth.models import Permission, User - plan = kwargs['plan'] + plan = kwargs.get('plan', []) for migration, rolled_back in plan: if migration.app_label == 'instances' and migration.name == '0002_permissionset' and not rolled_back: users = User.objects.all() permission = Permission.objects.get(codename='clone_instances') - print('\033[92mMigrating can_clone_instaces user attribute to permission\033[0m') + print('\033[1m* \033[92mMigrating can_clone_instaces user attribute to permission\033[0m') for user in users: if user.userattributes: if user.userattributes.can_clone_instances: user.user_permissions.add(permission) break + def apply_passwordless_console(sender, **kwargs): ''' Apply new passwordless_console permission for all users ''' - from django.conf import settings - from django.contrib.auth.models import User, Permission + from django.contrib.auth.models import Permission + from django.contrib.auth import get_user_model + User = get_user_model() + plan = kwargs.get('plan', []) + for migration, rolled_back in plan: + if migration.app_label == 'instances' and migration.name == '0009_auto_20200717_0524' and not rolled_back: + print('\033[1m* \033[92mApplying permission passwordless_console for all users\033[0m') + users = User.objects.all() + permission = Permission.objects.get(codename='passwordless_console') + for user in users: + user.user_permissions.add(permission) - print('\033[92mApplying permission passwordless_console for all users\033[0m') - users = User.objects.all() - permission = Permission.objects.get(codename='passwordless_console') - for user in users: - user.user_permissions.add(permission) class InstancesConfig(AppConfig): name = 'instances' - verbose_name = 'instances' + verbose_name = 'Instances' def ready(self): post_migrate.connect(migrate_can_clone_instances, sender=self) diff --git a/instances/forms.py b/instances/forms.py index 85b7584..77433ba 100644 --- a/instances/forms.py +++ b/instances/forms.py @@ -61,6 +61,4 @@ class NewVMForm(forms.Form): have_symbol = re.match('^[a-zA-Z0-9._-]+$', name) if not have_symbol: raise forms.ValidationError(_('The name of the virtual machine must not contain any special characters')) - elif len(name) > 64: - raise forms.ValidationError(_('The name of the virtual machine must not exceed 20 characters')) return name diff --git a/instances/tests.py b/instances/tests.py index 3dac112..5c70322 100644 --- a/instances/tests.py +++ b/instances/tests.py @@ -1,65 +1,138 @@ -import tempfile +import re +from accounts.models import UserAttributes, UserInstance, UserSSHKey +from appsettings.models import AppSettings +from computes.models import Compute +from django.contrib.auth import get_user_model +from django.contrib.auth.models import Permission +from django.http.response import Http404 from django.shortcuts import reverse from django.test import TestCase +from libvirt import VIR_DOMAIN_UNDEFINE_NVRAM +from vrtManager.create import wvmCreate +from vrtManager.util import randomUUID -from computes.models import Compute +from instances.views import instance -from .models import Instance +from .models import Flavor, Instance +from .utils import refr class InstancesTestCase(TestCase): - def setUp(self): - self.client.login(username='admin', password='admin') - Compute( - name='local', + @classmethod + def setUpClass(cls): + super().setUpClass() + + # Add users for testing purposes + User = get_user_model() + cls.admin_user = User.objects.get(pk=1) + cls.test_user = User.objects.create(username='test-user') + UserAttributes.objects.create( + user=cls.test_user, + max_instances=1, + max_cpus=1, + max_memory=128, + max_disk_size=1, + ) + permission = Permission.objects.get(codename='clone_instances') + cls.test_user.user_permissions.add(permission) + + # Add localhost compute + cls.compute = Compute( + name='test-compute', hostname='localhost', login='', password='', details='local', type=4, - ).save() + ) + cls.compute.save() + + cls.connection = wvmCreate( + cls.compute.hostname, + cls.compute.login, + cls.compute.password, + cls.compute.type, + ) + + # Add disks for testing + cls.connection.create_volume( + 'default', + 'test-volume', + 1, + 'qcow2', + False, + 0, + 0, + ) + # XML for testing vm + with open('conf/test-vm.xml', 'r') as f: + cls.xml = f.read() + + # Create testing vm from XML + cls.connection._defineXML(cls.xml) + refr(cls.compute) + cls.instance: Instance = Instance.objects.get(pk=1) + + @classmethod + def tearDownClass(cls): + # Destroy testing vm + cls.instance.proxy.delete_all_disks() + cls.instance.proxy.delete(VIR_DOMAIN_UNDEFINE_NVRAM) + super().tearDownClass() + + def setUp(self): + self.client.login(username='admin', password='admin') + self.rsa_key = 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6OOdbfv27QVnSC6sKxGaHb6YFc+3gxCkyVR3cTSXE/n5BEGf8aOgBpepULWa1RZfxYHY14PlKULDygdXSdrrR2kNSwoKz/Oo4d+3EE92L7ocl1+djZbptzgWgtw1OseLwbFik+iKlIdqPsH+IUQvX7yV545ZQtAP8Qj1R+uCqkw== test@test' def test_index(self): response = self.client.get(reverse('instances:index')) - # with open('index.html', 'wb') as f: - # f.write(response.content) + self.assertEqual(response.status_code, 200) + + self.client.force_login(self.test_user) + response = self.client.get(reverse('instances:index')) self.assertEqual(response.status_code, 200) def test_create_select_type(self): response = self.client.get(reverse('instances:create_instance_select_type', args=[1])) self.assertEqual(response.status_code, 200) - def test_instance(self): - compute = Compute.objects.get(pk=1) + def test_instance_page(self): + response = self.client.get(reverse('instances:instance', args=[self.instance.id])) + self.assertEqual(response.status_code, 200) - # create volume - response = self.client.post( - reverse('create_volume', args=[compute.id, 'default']), - { - 'name': 'test', - 'format': 'qcow2', - 'size': '1', - 'meta_prealloc': False, - }, - ) - self.assertRedirects(response, reverse('storage', args=[compute.id, 'default'])) + self.client.force_login(self.test_user) + response = self.client.get(reverse('instances:instance', args=[self.instance.id])) + self.assertRaises(Http404) - # create instance - response = self.client.get(reverse('instances:create_instance', args=[compute.id, 'x86_64', 'q35'])) + # def test_create_volume(self): + # response = self.client.post( + # reverse('create_volume', args=[self.compute.id, 'default']), + # { + # 'name': 'test', + # 'format': 'qcow2', + # 'size': '1', + # 'meta_prealloc': False, + # }, + # ) + # self.assertRedirects(response, reverse('storage', args=[self.compute.id, 'default'])) + + def test_create_destroy_instance(self): + # Create + response = self.client.get(reverse('instances:create_instance', args=[self.compute.id, 'x86_64', 'q35'])) self.assertEqual(response.status_code, 200) response = self.client.post( - reverse('instances:create_instance', args=[compute.id, 'x86_64', 'q35']), + reverse('instances:create_instance', args=[self.compute.id, 'x86_64', 'q35']), { 'name': 'test', 'firmware': 'BIOS', 'vcpu': 1, 'vcpu_mode': 'host-model', - 'memory': 512, + 'memory': 128, 'device0': 'disk', 'bus0': 'virtio', - 'images': 'test.qcow2', + 'images': 'test-volume.qcow2', 'storage-control': 'default', 'image-control': 'test.qcow2', 'networks': 'default', @@ -77,87 +150,200 @@ class InstancesTestCase(TestCase): ) self.assertEqual(response.status_code, 302) - instance: Instance = Instance.objects.get(pk=1) - self.assertEqual(instance.name, 'test') + instance_qs: Instance = Instance.objects.filter(name='test') + self.assertEqual(len(instance_qs), 1) - # get instance page - response = self.client.get(reverse('instances:instance', args=[instance.id])) + instance = instance_qs[0] + + # Destroy + response = self.client.get(reverse('instances:destroy', args=[instance.id])) self.assertEqual(response.status_code, 200) - # resize cpu - self.assertEqual(instance.vcpu, 1) - self.assertEqual(instance.cur_vcpu, 1) + response = self.client.post( + reverse('instances:destroy', args=[instance.id]), + {}, # do not destroy disk image + HTTP_REFERER=reverse('index'), + ) + self.assertRedirects(response, reverse('instances:index')) - response = self.client.post(reverse('instances:resizevm_cpu', args=[instance.id]), {'vcpu': 4, 'cur_vcpu': 2}) - self.assertRedirects(response, reverse('instances:instance', args=[instance.id]) + '#resize') + def test_create_from_xml(self): + uuid = randomUUID() + xml = self.xml.replace('test-vm', 'test-vm-xml') + xml = re.sub('\s?.*?', f'{uuid}', xml) + response = self.client.post( + reverse('instances:create_instance_select_type', args=[self.compute.id]), + { + 'create_xml': True, + 'dom_xml': xml, + }, + ) + self.assertEqual(response.status_code, 302) - # reset cached properties - del instance.vcpu - del instance.cur_vcpu - self.assertEqual(instance.vcpu, 4) - self.assertEqual(instance.cur_vcpu, 2) + xml_instance_qs: Instance = Instance.objects.filter(name='test-vm-xml') + self.assertEqual(len(xml_instance_qs), 1) - # resize memory - self.assertEqual(instance.memory, 512) - self.assertEqual(instance.cur_memory, 512) + xml_instance = xml_instance_qs[0] - response = self.client.post(reverse('instances:resize_memory', args=[instance.id]), { - 'memory': 2048, - 'cur_memory': 1024 - }) - self.assertRedirects(response, reverse('instances:instance', args=[instance.id]) + '#resize') - - del instance.memory - del instance.cur_memory - self.assertEqual(instance.memory, 2048) - self.assertEqual(instance.cur_memory, 1024) - - # resize disk - self.assertEqual(instance.disks[0]['size'], 1024**3) - - response = self.client.post(reverse('instances:resize_disk', args=[instance.id]), { - 'disk_size_vda': '2', - }) - self.assertRedirects(response, reverse('instances:instance', args=[instance.id]) + '#resize') - - del instance.disks - self.assertEqual(instance.disks[0]['size'], 2 * 1024**3) - - # add new volume - self.assertEqual(len(instance.disks), 1) + # destroy started instance to maximize coverage + xml_instance.proxy.start() response = self.client.post( - reverse('instances:add_new_vol', args=[instance.id]), + reverse('instances:destroy', args=[xml_instance.id]), + {}, # do not delete disk image + HTTP_REFERER=reverse('index'), + ) + self.assertRedirects(response, reverse('instances:index')) + + def test_resize_cpu(self): + self.assertEqual(self.instance.vcpu, 1) + self.assertEqual(self.instance.cur_vcpu, 1) + + response = self.client.post(reverse('instances:resizevm_cpu', args=[self.instance.id]), { + 'vcpu': 4, + 'cur_vcpu': 2, + }) + self.assertRedirects(response, reverse('instances:instance', args=[self.instance.id]) + '#resize') + + # reset cached properties + del self.instance.vcpu + del self.instance.cur_vcpu + + self.assertEqual(self.instance.vcpu, 4) + self.assertEqual(self.instance.cur_vcpu, 2) + + def test_resize_cpu_with_quota(self): + # test for non admin user with quotas + vcpu = self.instance.vcpu + cur_vcpu = self.instance.cur_vcpu + + UserInstance.objects.create(user=self.test_user, instance=self.instance, is_change=True) + + self.client.force_login(self.test_user) + + response = self.client.post(reverse('instances:resizevm_cpu', args=[self.instance.id]), { + 'vcpu': 4, + 'cur_vcpu': 2, + }) + self.assertRedirects(response, reverse('instances:instance', args=[self.instance.id]) + '#resize') + + del self.instance.vcpu + del self.instance.cur_vcpu + + # no changes as user reached quota + self.assertEqual(self.instance.vcpu, vcpu) + self.assertEqual(self.instance.cur_vcpu, cur_vcpu) + + def test_resize_memory(self): + self.assertEqual(self.instance.memory, 128) + self.assertEqual(self.instance.cur_memory, 128) + + response = self.client.post(reverse('instances:resize_memory', args=[self.instance.id]), { + 'memory': 512, + 'cur_memory': 256 + }) + self.assertRedirects(response, reverse('instances:instance', args=[self.instance.id]) + '#resize') + + del self.instance.memory + del self.instance.cur_memory + self.assertEqual(self.instance.memory, 512) + self.assertEqual(self.instance.cur_memory, 256) + + response = self.client.post(reverse('instances:resize_memory', args=[self.instance.id]), { + 'memory_custom': 500, + 'cur_memory_custom': 200 + }) + self.assertRedirects(response, reverse('instances:instance', args=[self.instance.id]) + '#resize') + + del self.instance.memory + del self.instance.cur_memory + + self.assertEqual(self.instance.memory, 500) + self.assertEqual(self.instance.cur_memory, 200) + + def test_resize_memory_with_quota(self): + # test for non admin user with quotas + memory = self.instance.memory + cur_memory = self.instance.cur_memory + + UserInstance.objects.create(user=self.test_user, instance=self.instance, is_change=True) + + self.client.force_login(self.test_user) + + response = self.client.post(reverse('instances:resize_memory', args=[self.instance.id]), { + 'memory': 512, + 'cur_memory': 256 + }) + self.assertRedirects(response, reverse('instances:instance', args=[self.instance.id]) + '#resize') + + del self.instance.memory + del self.instance.cur_memory + + # no changes as user reached quota + self.assertEqual(self.instance.memory, memory) + self.assertEqual(self.instance.cur_memory, cur_memory) + + def test_resize_disk(self): + self.assertEqual(self.instance.disks[0]['size'], 1024**3) + + response = self.client.post(reverse('instances:resize_disk', args=[self.instance.id]), { + 'disk_size_vda': 2, + }) + self.assertRedirects(response, reverse('instances:instance', args=[self.instance.id]) + '#resize') + + del self.instance.disks + self.assertEqual(self.instance.disks[0]['size'], 2 * 1024**3) + + def test_resize_disk_with_quota(self): + # test for non admin user with quotas + disk_size = self.instance.disks[0]['size'] + UserInstance.objects.create(user=self.test_user, instance=self.instance, is_change=True) + + self.client.force_login(self.test_user) + + response = self.client.post(reverse('instances:resize_disk', args=[self.instance.id]), { + 'disk_size_vda': 3, + }) + self.assertRedirects(response, reverse('instances:instance', args=[self.instance.id]) + '#resize') + + # no changes as user reached quota + del self.instance.disks + self.assertEqual(self.instance.disks[0]['size'], disk_size) + + def test_add_delete_new_volume(self): + self.assertEqual(len(self.instance.disks), 1) + + response = self.client.post( + reverse('instances:add_new_vol', args=[self.instance.id]), { 'storage': 'default', - 'name': 'test2', + 'name': 'test-volume-2', 'size': 1, }, HTTP_REFERER=reverse('index'), ) self.assertEqual(response.status_code, 302) - del instance.disks - self.assertEqual(len(instance.disks), 2) + del self.instance.disks + self.assertEqual(len(self.instance.disks), 2) - # delete volume from instance response = self.client.post( - reverse('instances:delete_vol', args=[instance.id]), + reverse('instances:delete_vol', args=[self.instance.id]), { 'storage': 'default', 'dev': 'vdb', - 'name': 'test2.qcow2', + 'name': 'test-volume-2.qcow2', }, HTTP_REFERER=reverse('index'), ) self.assertEqual(response.status_code, 302) - del instance.disks - self.assertEqual(len(instance.disks), 1) + del self.instance.disks + self.assertEqual(len(self.instance.disks), 1) + def test_detach_attach_volume(self): # detach volume response = self.client.post( - reverse('instances:detach_vol', args=[instance.id]), + reverse('instances:detach_vol', args=[self.instance.id]), { 'dev': 'vda', }, @@ -165,28 +351,28 @@ class InstancesTestCase(TestCase): ) self.assertEqual(response.status_code, 302) - del instance.disks - self.assertEqual(len(instance.disks), 0) + del self.instance.disks + self.assertEqual(len(self.instance.disks), 0) - # add existing volume + # reattach volume response = self.client.post( - reverse('instances:add_existing_vol', args=[instance.id]), + reverse('instances:add_existing_vol', args=[self.instance.id]), { 'selected_storage': 'default', - 'vols': 'test.qcow2', + 'vols': 'test-volume.qcow2', }, HTTP_REFERER=reverse('index'), ) self.assertEqual(response.status_code, 302) - del instance.disks - self.assertEqual(len(instance.disks), 1) + del self.instance.disks + self.assertEqual(len(self.instance.disks), 1) - # edit volume + def test_edit_volume(self): response = self.client.post( - reverse('instances:edit_volume', args=[instance.id]), + reverse('instances:edit_volume', args=[self.instance.id]), { - 'vol_path': '/var/lib/libvirt/images/test.qcow2', + 'vol_path': '/var/lib/libvirt/images/test-volume.qcow2', # 'vol_shareable': False, # 'vol_readonly': False, 'vol_bus': 'virtio', @@ -199,11 +385,11 @@ class InstancesTestCase(TestCase): ) self.assertEqual(response.status_code, 302) - # add media device - self.assertEqual(len(instance.media), 1) + def test_attach_detach_cdrom(self): + self.assertEqual(len(self.instance.media), 1) response = self.client.post( - reverse('instances:add_cdrom', args=[instance.id]), + reverse('instances:add_cdrom', args=[self.instance.id]), { 'bus': 'sata', }, @@ -211,8 +397,8 @@ class InstancesTestCase(TestCase): ) self.assertEqual(response.status_code, 302) - del instance.media - self.assertEqual(len(instance.media), 2) + del self.instance.media + self.assertEqual(len(self.instance.media), 2) # create dummy iso # with tempfile.NamedTemporaryFile() as f: @@ -226,139 +412,291 @@ class InstancesTestCase(TestCase): # }, # ) - # remove media device + # detach CD-ROM drive response = self.client.post( - reverse('instances:detach_cdrom', args=[instance.id, 'sda']), + reverse('instances:detach_cdrom', args=[self.instance.id, 'sda']), {}, HTTP_REFERER=reverse('index'), ) self.assertEqual(response.status_code, 302) - del instance.media - self.assertEqual(len(instance.media), 1) + del self.instance.media + self.assertEqual(len(self.instance.media), 1) - # snapshots - self.assertEqual(len(instance.snapshots), 0) + def test_snapshots(self): + self.assertEqual(len(self.instance.snapshots), 0) response = self.client.post( - reverse('instances:snapshot', args=[instance.id]), + reverse('instances:snapshot', args=[self.instance.id]), { - 'name': 'test', + 'name': 'test-snapshot', }, HTTP_REFERER=reverse('index'), ) self.assertEqual(response.status_code, 302) - del instance.snapshots - self.assertEqual(len(instance.snapshots), 1) + del self.instance.snapshots + self.assertEqual(len(self.instance.snapshots), 1) response = self.client.post( - reverse('instances:revert_snapshot', args=[instance.id]), + reverse('instances:revert_snapshot', args=[self.instance.id]), { - 'name': 'test', + 'name': 'test-snapshot', }, HTTP_REFERER=reverse('index'), ) self.assertEqual(response.status_code, 302) response = self.client.post( - reverse('instances:delete_snapshot', args=[instance.id]), + reverse('instances:delete_snapshot', args=[self.instance.id]), { - 'name': 'test', + 'name': 'test-snapshot', }, HTTP_REFERER=reverse('index'), ) self.assertEqual(response.status_code, 302) - del instance.snapshots - self.assertEqual(len(instance.snapshots), 0) + del self.instance.snapshots + self.assertEqual(len(self.instance.snapshots), 0) - # autostart - self.assertEqual(instance.autostart, 0) + def test_autostart(self): + self.assertEqual(self.instance.autostart, 0) response = self.client.post( - reverse('instances:set_autostart', args=[instance.id]), + reverse('instances:set_autostart', args=[self.instance.id]), {}, HTTP_REFERER=reverse('index'), ) self.assertEqual(response.status_code, 302) - del instance.autostart - self.assertEqual(instance.autostart, 1) + del self.instance.autostart + self.assertEqual(self.instance.autostart, 1) response = self.client.post( - reverse('instances:unset_autostart', args=[instance.id]), + reverse('instances:unset_autostart', args=[self.instance.id]), {}, HTTP_REFERER=reverse('index'), ) self.assertEqual(response.status_code, 302) - del instance.autostart - self.assertEqual(instance.autostart, 0) + del self.instance.autostart + self.assertEqual(self.instance.autostart, 0) - # bootmenu - self.assertEqual(instance.bootmenu, True) + def test_bootmenu(self): + self.assertEqual(self.instance.bootmenu, True) response = self.client.post( - reverse('instances:unset_bootmenu', args=[instance.id]), + reverse('instances:unset_bootmenu', args=[self.instance.id]), {}, HTTP_REFERER=reverse('index'), ) self.assertEqual(response.status_code, 302) - del instance.bootmenu - self.assertEqual(instance.bootmenu, False) + del self.instance.bootmenu + self.assertEqual(self.instance.bootmenu, False) response = self.client.post( - reverse('instances:set_bootmenu', args=[instance.id]), + reverse('instances:set_bootmenu', args=[self.instance.id]), {}, HTTP_REFERER=reverse('index'), ) self.assertEqual(response.status_code, 302) - del instance.bootmenu - self.assertEqual(instance.bootmenu, True) + del self.instance.bootmenu + self.assertEqual(self.instance.bootmenu, True) - # guest agent - - self.assertEqual(instance.guest_agent, False) + def test_guest_agent(self): + self.assertEqual(self.instance.guest_agent, False) response = self.client.post( - reverse('instances:set_guest_agent', args=[instance.id]), + reverse('instances:set_guest_agent', args=[self.instance.id]), {'guest_agent': True}, HTTP_REFERER=reverse('index'), ) self.assertEqual(response.status_code, 302) - del instance.guest_agent - self.assertEqual(instance.guest_agent, True) + del self.instance.guest_agent + self.assertEqual(self.instance.guest_agent, True) response = self.client.post( - reverse('instances:set_guest_agent', args=[instance.id]), + reverse('instances:set_guest_agent', args=[self.instance.id]), {'guest_agent': False}, HTTP_REFERER=reverse('index'), ) self.assertEqual(response.status_code, 302) - del instance.guest_agent - self.assertEqual(instance.guest_agent, False) + del self.instance.guest_agent + self.assertEqual(self.instance.guest_agent, False) - # video model - self.assertEqual(instance.video_model, 'vga') + def test_video_model(self): + self.assertEqual(self.instance.video_model, 'vga') response = self.client.post( - reverse('instances:set_video_model', args=[instance.id]), + reverse('instances:set_video_model', args=[self.instance.id]), {'video_model': 'virtio'}, HTTP_REFERER=reverse('index'), ) self.assertEqual(response.status_code, 302) - del instance.video_model - self.assertEqual(instance.video_model, 'virtio') + del self.instance.video_model + self.assertEqual(self.instance.video_model, 'virtio') - # console + def test_owner(self): + self.assertEqual(UserInstance.objects.count(), 0) response = self.client.post( - reverse('instances:update_console', args=[instance.id]), + reverse('instances:add_owner', args=[self.instance.id]), + {'user_id': self.admin_user.id}, + HTTP_REFERER=reverse('index'), + ) + self.assertEqual(response.status_code, 302) + self.assertEqual(UserInstance.objects.count(), 1) + + user_instance: UserInstance = UserInstance.objects.get(id=1) + self.assertEqual(user_instance.instance_id, self.instance.id) + self.assertEqual(user_instance.user_id, self.admin_user.id) + + # test when no multiple owners allowed + setting = AppSettings.objects.get(key='ALLOW_INSTANCE_MULTIPLE_OWNER') + setting.value = 'False' + setting.save() + + response = self.client.post( + reverse('instances:add_owner', args=[self.instance.id]), + {'user_id': self.test_user.id}, + HTTP_REFERER=reverse('index'), + ) + self.assertEqual(response.status_code, 302) + self.assertEqual(UserInstance.objects.count(), 1) + + response = self.client.post( + reverse('instances:del_owner', args=[self.instance.id]), + {'userinstance': user_instance.id}, + HTTP_REFERER=reverse('index'), + ) + self.assertEqual(response.status_code, 302) + self.assertEqual(UserInstance.objects.count(), 0) + + def test_clone(self): + instance_count = Instance.objects.count() + response = self.client.post( + reverse('instances:clone', args=[self.instance.id]), + { + 'name': 'test-vm-clone', + 'clone-net-mac-0': 'de:ad:be:ef:de:ad', + 'disk-vda': 'test-clone.img', + 'clone-title': '', + 'clone-description': '', + 'clone': '', + }, + HTTP_REFERER=reverse('index'), + ) + self.assertEqual(response.status_code, 302) + self.assertEqual(Instance.objects.count(), instance_count + 1) + + clone_qs = Instance.objects.filter(name='test-vm-clone') + self.assertEqual(len(clone_qs), 1) + clone = clone_qs[0] + + self.assertEqual(clone.proxy.get_net_devices()[0]['mac'], 'de:ad:be:ef:de:ad') + + response = self.client.post( + reverse('instances:snapshot', args=[clone.id]), + { + 'name': 'test', + }, + HTTP_REFERER=reverse('index'), + ) + self.assertEqual(response.status_code, 302) + + response = self.client.post( + reverse('instances:destroy', args=[clone.id]), + { + 'delete_disk': True, + 'delete_nvram': True + }, + HTTP_REFERER=reverse('index'), + ) + self.assertRedirects(response, reverse('instances:index')) + self.assertEqual(Instance.objects.count(), instance_count) + + def test_clone_with_quota(self): + # test for non admin user with quotas + instance_count = Instance.objects.count() + + UserInstance.objects.create(user=self.test_user, instance=self.instance) + + self.client.force_login(self.test_user) + + response = self.client.post( + reverse('instances:clone', args=[self.instance.id]), + { + 'name': 'test-vm-clone', + 'clone-net-mac-0': 'de:ad:be:ef:de:ad', + 'disk-vda': 'test-clone.img', + 'clone-title': '', + 'clone-description': '', + 'clone': '', + }, + HTTP_REFERER=reverse('index'), + ) + self.assertEqual(response.status_code, 302) + # no new instances created as user reached quota + self.assertEqual(Instance.objects.count(), instance_count) + + def test_clone_errors(self): + instance_count = Instance.objects.count() + + # duplicate name + response = self.client.post( + reverse('instances:clone', args=[self.instance.id]), + { + 'name': 'test-vm', + 'clone-net-mac-0': 'de:ad:be:ef:de:ad', + 'disk-vda': 'test.img', + 'clone-title': '', + 'clone-description': '', + 'clone': '', + }, + HTTP_REFERER=reverse('index'), + ) + self.assertEqual(response.status_code, 302) + self.assertEqual(Instance.objects.count(), instance_count) + + # wrong name + response = self.client.post( + reverse('instances:clone', args=[self.instance.id]), + { + 'name': '!@#$', + 'clone-net-mac-0': 'de:ad:be:ef:de:ad', + 'disk-vda': '!@#$.img', + 'clone-title': '', + 'clone-description': '', + 'clone': '', + }, + HTTP_REFERER=reverse('index'), + ) + self.assertEqual(response.status_code, 302) + self.assertEqual(Instance.objects.count(), instance_count) + + # wrong mac + response = self.client.post( + reverse('instances:clone', args=[self.instance.id]), + { + 'name': 'test-vm-clone', + 'clone-net-mac-0': 'gh:ad:be:ef:de:ad', + 'disk-vda': 'test-clone.img', + 'clone-title': '', + 'clone-description': '', + 'clone': '', + }, + HTTP_REFERER=reverse('index'), + ) + self.assertEqual(response.status_code, 302) + self.assertEqual(Instance.objects.count(), instance_count) + + def test_console(self): + response = self.client.post( + reverse('instances:update_console', args=[self.instance.id]), { 'type': 'spice', 'listen_on': '0.0.0.0', @@ -369,109 +707,320 @@ class InstancesTestCase(TestCase): ) self.assertEqual(response.status_code, 302) - # poweron - self.assertEqual(instance.status, 5) - - response = self.client.get(reverse('instances:poweron', args=[instance.id]), HTTP_REFERER=reverse('index')) - self.assertEqual(response.status_code, 302) - - del instance.status - - self.assertEqual(instance.status, 1) - - # status - response = self.client.get(reverse('instances:status', args=[instance.id])) + def test_status(self): + response = self.client.get(reverse('instances:status', args=[self.instance.id])) self.assertEqual(response.status_code, 200) - # stats - response = self.client.get(reverse('instances:stats', args=[instance.id])) + def test_stats(self): + response = self.client.get(reverse('instances:stats', args=[self.instance.id])) self.assertEqual(response.status_code, 200) - # guess_mac_address - response = self.client.get(reverse('instances:guess_mac_address', args=[instance.name])) + def test_guess_mac_address(self): + response = self.client.get(reverse('instances:guess_mac_address', args=[self.instance.name])) self.assertEqual(response.status_code, 200) - # random_mac_address + def test_random_mac_address(self): response = self.client.get(reverse('instances:random_mac_address')) self.assertEqual(response.status_code, 200) - # random_mac_address + def test_guess_clone_name(self): response = self.client.get(reverse('instances:guess_clone_name')) self.assertEqual(response.status_code, 200) - # guess_mac_address - response = self.client.get(reverse('instances:check_instance', args=[instance.name])) + def test_sshkeys(self): + UserSSHKey.objects.create(keyname='keyname', keypublic=self.rsa_key, user=self.test_user) + UserInstance.objects.create(user=self.test_user, instance=self.instance) + + response = self.client.get(reverse('instances:sshkeys', args=[self.instance.id])) self.assertEqual(response.status_code, 200) - # sshkeys - response = self.client.get(reverse('instances:sshkeys', args=[instance.name])) + response = self.client.get(reverse('instances:sshkeys', args=[self.instance.id]) + '?plain=true') self.assertEqual(response.status_code, 200) + def test_check_instance(self): + response = self.client.get(reverse('instances:check_instance', args=['test-vm'])) + self.assertEqual(response.status_code, 200) + self.assertJSONEqual(response.content, {'vname': 'test-vm', 'exists': True}) + + def test_start_template(self): + # starting templates must fail + self.assertEqual(self.instance.status, 5) + + self.instance.is_template = True + self.instance.save() + + response = self.client.get(reverse('instances:poweron', args=[self.instance.id]), HTTP_REFERER=reverse('index')) + self.assertEqual(response.status_code, 302) + + del self.instance.status + self.assertEqual(self.instance.status, 5) + + self.instance.is_template = False + self.instance.save() + + def test_power(self): + # poweron + self.assertEqual(self.instance.status, 5) + + response = self.client.get(reverse('instances:poweron', args=[self.instance.id]), HTTP_REFERER=reverse('index')) + self.assertEqual(response.status_code, 302) + + del self.instance.status + + self.assertEqual(self.instance.status, 1) + # suspend - response = self.client.get(reverse('instances:suspend', args=[instance.id]), HTTP_REFERER=reverse('index')) + response = self.client.get(reverse('instances:suspend', args=[self.instance.id]), HTTP_REFERER=reverse('index')) self.assertEqual(response.status_code, 302) - del instance.status - self.assertEqual(instance.status, 3) + del self.instance.status + self.assertEqual(self.instance.status, 3) # resume - response = self.client.get(reverse('instances:resume', args=[instance.id]), HTTP_REFERER=reverse('index')) + response = self.client.get(reverse('instances:resume', args=[self.instance.id]), HTTP_REFERER=reverse('index')) self.assertEqual(response.status_code, 302) - del instance.status - self.assertEqual(instance.status, 1) + del self.instance.status + self.assertEqual(self.instance.status, 1) # poweroff - response = self.client.get(reverse('instances:poweroff', args=[instance.id]), HTTP_REFERER=reverse('index')) + response = self.client.get(reverse('instances:poweroff', args=[self.instance.id]), HTTP_REFERER=reverse('index')) self.assertEqual(response.status_code, 302) # as no OS is installed ACPI won't work - del instance.status - self.assertEqual(instance.status, 1) + del self.instance.status + self.assertEqual(self.instance.status, 1) # powercycle - response = self.client.get(reverse('instances:powercycle', args=[instance.id]), HTTP_REFERER=reverse('index')) + response = self.client.get(reverse('instances:powercycle', args=[self.instance.id]), HTTP_REFERER=reverse('index')) self.assertEqual(response.status_code, 302) - self.assertEqual(instance.status, 1) + + del self.instance.status + self.assertEqual(self.instance.status, 1) # force_off - response = self.client.get(reverse('instances:force_off', args=[instance.id]), HTTP_REFERER=reverse('index')) + response = self.client.get(reverse('instances:force_off', args=[self.instance.id]), HTTP_REFERER=reverse('index')) self.assertEqual(response.status_code, 302) - del instance.status - self.assertEqual(instance.status, 5) + del self.instance.status + self.assertEqual(self.instance.status, 5) - # delete started instance with disks - instance.proxy.start() - del instance.status - self.assertEqual(instance.status, 1) + def test_vv_file(self): + response = self.client.get(reverse('instances:getvvfile', args=[self.instance.id])) + self.assertEqual(response.status_code, 200) + + def test_vcpu_hotplug(self): + response = self.client.post( + reverse('instances:set_vcpu_hotplug', args=[self.instance.id]), + {'vcpu_hotplug': 'True'}, + HTTP_REFERER=reverse('index'), + ) + self.assertEqual(response.status_code, 302) + + def test_change_network(self): + self.assertEqual(self.instance.networks[0]['mac'], '52:54:00:a2:3c:e7') + response = self.client.post( + reverse('instances:change_network', args=[self.instance.id]), + { + 'net-mac-0': '52:54:00:a2:3c:e8', + 'net-source-0': 'net:default', + 'net-nwfilter-0': '', + 'net-model-0': 'virtio', + }, + HTTP_REFERER=reverse('index'), + ) + self.assertEqual(response.status_code, 302) + + del self.instance.networks + self.assertEqual(self.instance.networks[0]['mac'], '52:54:00:a2:3c:e8') + + def test_add_delete_network(self): + self.assertEqual(len(self.instance.networks), 1) + net_mac = self.instance.networks[0]['mac'] + response = self.client.post( + reverse('instances:add_network', args=[self.instance.id]), + { + 'add-net-mac': '52:54:00:a2:3c:e9', + 'add-net-network': 'net:default', + 'add_net-nwfilter': '', + }, + HTTP_REFERER=reverse('index'), + ) + self.assertEqual(response.status_code, 302) + + del self.instance.networks + self.assertEqual(len(self.instance.networks), 2) + self.assertEqual(self.instance.networks[1]['mac'], '52:54:00:a2:3c:e9') response = self.client.post( - reverse('instances:destroy', args=[instance.id]), - {'delete_disk': True}, + reverse('instances:delete_network', args=[self.instance.id]), + { + 'delete_network': '52:54:00:a2:3c:e9', + }, + HTTP_REFERER=reverse('index'), ) - self.assertRedirects(response, reverse('instances:index')) + self.assertEqual(response.status_code, 302) - # # create volume - # response = self.client.post( - # reverse('create_volume', args=[compute.id, 'default']), - # { - # 'name': 'test3', - # 'format': 'qcow2', - # 'size': '1', - # 'meta_prealloc': False, - # }, - # ) - # self.assertRedirects(response, reverse('storage', args=[compute.id, 'default'])) + del self.instance.networks + self.assertEqual(len(self.instance.networks), 1) + self.assertEqual(self.instance.networks[0]['mac'], net_mac) - # # delete volume + def test_set_link_state(self): + self.assertEqual(self.instance.networks[0]['state'], 'up') + response = self.client.post( + reverse('instances:set_link_state', args=[self.instance.id]), + { + 'mac': self.instance.networks[0]['mac'], + 'set_link_state': 'up', + }, + HTTP_REFERER=reverse('index'), + ) + self.assertEqual(response.status_code, 302) + + del self.instance.networks + self.assertEqual(self.instance.networks[0]['state'], 'down') + + def test_set_unset_qos(self): + self.assertEqual(len(self.instance.qos.keys()), 0) + net_mac = self.instance.networks[0]['mac'] + response = self.client.post( + reverse('instances:set_qos', args=[self.instance.id]), + { + 'net-mac-0': net_mac, + 'qos_direction': 'inbound', + 'qos_average': 1, + 'qos_peak': 1, + 'qos_burst': 1, + }, + HTTP_REFERER=reverse('index'), + ) + self.assertEqual(response.status_code, 302) + + del self.instance.qos + self.assertEqual(len(self.instance.qos.keys()), 1) + + response = self.client.post( + reverse('instances:unset_qos', args=[self.instance.id]), + { + 'net-mac': net_mac, + 'qos_direction': 'inbound', + }, + HTTP_REFERER=reverse('index'), + ) + self.assertEqual(response.status_code, 302) + del self.instance.qos + self.assertEqual(len(self.instance.qos.keys()), 0) + + # test on running instance + # self.instance.proxy.start() # response = self.client.post( - # reverse('instances:delete_vol', args=[instance.id]), + # reverse('instances:set_qos', args=[self.instance.id]), # { - # 'storage': 'default', - # 'dev': 'vdb', - # 'name': 'test3.qcow2', + # 'net-mac-0': net_mac, + # 'qos_direction': 'inbound', + # 'qos_average': 1, + # 'qos_peak': 1, + # 'qos_burst': 1, # }, # HTTP_REFERER=reverse('index'), # ) # self.assertEqual(response.status_code, 302) + # self.instance.proxy.force_shutdown() + + def test_change_options(self): + self.assertEqual(self.instance.title, '') + self.assertEqual(self.instance.description, '') + + response = self.client.post( + reverse('instances:change_options', args=[self.instance.id]), + { + 'title': 'test-vm-title', + 'description': 'test-vm description', + }, + HTTP_REFERER=reverse('index'), + ) + self.assertEqual(response.status_code, 302) + + del self.instance.title + del self.instance.description + + self.assertEqual(self.instance.title, 'test-vm-title') + self.assertEqual(self.instance.description, 'test-vm description') + + def test_flavors(self): + response = self.client.get(reverse('instances:flavor_create')) + self.assertEqual(response.status_code, 200) + + response = self.client.post( + reverse('instances:flavor_create'), + { + 'label': 'test_flavor', + 'memory': 256, + 'vcpu': 1, + 'disk': 10, + }, + HTTP_REFERER=reverse('index'), + ) + self.assertEqual(response.status_code, 302) + + id = Flavor.objects.last().id + + response = self.client.get(reverse('instances:flavor_update', args=[id])) + self.assertEqual(response.status_code, 200) + + response = self.client.post( + reverse('instances:flavor_update', args=[id]), + { + 'label': 'test_flavor_', + 'memory': 256, + 'vcpu': 1, + 'disk': 10, + }, + HTTP_REFERER=reverse('index'), + ) + self.assertEqual(response.status_code, 302) + + response = self.client.get(reverse('instances:flavor_delete', args=[id])) + self.assertEqual(response.status_code, 200) + + response = self.client.post( + reverse('instances:flavor_delete', args=[id]), + {}, + HTTP_REFERER=reverse('index'), + ) + self.assertEqual(response.status_code, 302) + + # def donot_test_instance(self): + # compute: Compute = Compute.objects.get(pk=1) + # user: User = User.objects.get(pk=1) + + # # delete started instance with disks + # self.instance.proxy.start() + # del self.instance.status + # self.assertEqual(self.instance.status, 1) + + # # create volume + # response = self.client.post( + # reverse('create_volume', args=[compute.id, 'default']), + # { + # 'name': 'test3', + # 'format': 'qcow2', + # 'size': '1', + # 'meta_prealloc': False, + # }, + # ) + # self.assertRedirects(response, reverse('storage', args=[compute.id, 'default'])) + + # # delete volume + # response = self.client.post( + # reverse('instances:delete_vol', args=[instance.id]), + # { + # 'storage': 'default', + # 'dev': 'vdb', + # 'name': 'test3.qcow2', + # }, + # HTTP_REFERER=reverse('index'), + # ) + # self.assertEqual(response.status_code, 302) + + # , list(response.context['messages'])[0] diff --git a/instances/urls.py b/instances/urls.py index cd1cd2c..7c7f5a0 100644 --- a/instances/urls.py +++ b/instances/urls.py @@ -65,5 +65,5 @@ urlpatterns = [ path('guess_clone_name/', views.guess_clone_name, name='guess_clone_name'), path('random_mac_address/', views.random_mac_address, name='random_mac_address'), path('check_instance//', views.check_instance, name='check_instance'), - path('sshkeys//', views.sshkeys, name='sshkeys'), + path('/sshkeys/', views.sshkeys, name='sshkeys'), ] diff --git a/instances/utils.py b/instances/utils.py index e6be97d..c4f20b8 100644 --- a/instances/utils.py +++ b/instances/utils.py @@ -1,40 +1,17 @@ import os import random import string -from collections import OrderedDict - -from django.conf import settings -from django.contrib.auth.models import User -from django.utils.translation import gettext_lazy as _ from accounts.models import UserInstance from appsettings.settings import app_settings -from logs.views import addlogmsg +from django.conf import settings +from django.utils.translation import gettext_lazy as _ from vrtManager.connection import connection_manager -from vrtManager.hostdetails import wvmHostDetails from vrtManager.instance import wvmInstance, wvmInstances from .models import Instance -def filesizefstr(size_str): - if size_str == '': - return 0 - size_str = size_str.upper().replace("B", "") - if size_str[-1] == 'K': - return int(float(size_str[:-1])) << 10 - elif size_str[-1] == 'M': - return int(float(size_str[:-1])) << 20 - elif size_str[-1] == 'G': - return int(float(size_str[:-1])) << 30 - elif size_str[-1] == 'T': - return int(float(size_str[:-1])) << 40 - elif size_str[-1] == 'P': - return int(float(size_str[:-1])) << 50 - else: - return int(float(size_str)) - - def get_clone_free_names(size=10): prefix = app_settings.CLONE_INSTANCE_DEFAULT_PREFIX free_names = [] @@ -186,36 +163,6 @@ def migrate_instance( instance.save() -def get_hosts_status(computes): - """ - Function return all hosts all vds on host - """ - compute_data = [] - for compute in computes: - compute_data.append({ - 'id': compute.id, - 'name': compute.name, - 'hostname': compute.hostname, - 'status': connection_manager.host_is_up(compute.type, compute.hostname), - 'type': compute.type, - 'login': compute.login, - 'password': compute.password, - 'details': compute.details, - }) - return compute_data - - -def get_userinstances_info(instance): - info = {} - uis = UserInstance.objects.filter(instance=instance) - info['count'] = uis.count() - if info['count'] > 0: - info['first_user'] = uis[0] - else: - info['first_user'] = None - return info - - def refr(compute): if compute.status is True: domains = compute.proxy.wvm.listAllDomains() @@ -229,83 +176,6 @@ def refr(compute): Instance(compute=compute, name=domain.name(), uuid=domain.UUIDString()).save() -def refresh_instance_database(comp, inst_name, info, all_host_vms, user): - # Multiple Instance Name Check - instances = Instance.objects.filter(name=inst_name) - if instances.count() > 1: - for i in instances: - user_instances_count = UserInstance.objects.filter(instance=i).count() - if user_instances_count == 0: - addlogmsg(user.username, i.name, _("Deleting due to multiple(Instance Name) records.")) - i.delete() - - # Multiple UUID Check - instances = Instance.objects.filter(uuid=info['uuid']) - if instances.count() > 1: - for i in instances: - if i.name != inst_name: - addlogmsg(user.username, i.name, _("Deleting due to multiple(UUID) records.")) - i.delete() - - try: - inst_on_db = Instance.objects.get(compute_id=comp["id"], name=inst_name) - if inst_on_db.uuid != info['uuid']: - inst_on_db.save() - - all_host_vms[comp["id"], comp["name"], comp["status"], comp["cpu"], comp["mem_size"], - comp["mem_perc"]][inst_name]['is_template'] = inst_on_db.is_template - all_host_vms[comp["id"], comp["name"], comp["status"], comp["cpu"], comp["mem_size"], - comp["mem_perc"]][inst_name]['userinstances'] = get_userinstances_info(inst_on_db) - except Instance.DoesNotExist: - inst_on_db = Instance(compute_id=comp["id"], name=inst_name, uuid=info['uuid']) - inst_on_db.save() - - -def get_user_instances(user): - all_user_vms = {} - user_instances = UserInstance.objects.filter(user=user) - for usr_inst in user_instances: - if connection_manager.host_is_up(usr_inst.instance.compute.type, usr_inst.instance.compute.hostname): - conn = wvmHostDetails( - usr_inst.instance.compute.hostname, - usr_inst.instance.compute.login, - usr_inst.instance.compute.password, - usr_inst.instance.compute.type, - ) - all_user_vms[usr_inst] = conn.get_user_instances(usr_inst.instance.name) - all_user_vms[usr_inst].update({'compute_id': usr_inst.instance.compute.id}) - return all_user_vms - - -def get_host_instances(compute): - all_host_vms = OrderedDict() - - # if compute.status: - comp_node_info = compute.proxy.get_node_info() - comp_mem = compute.proxy.get_memory_usage() - comp_instances = compute.proxy.get_host_instances(True) - - # if comp_instances: - comp_info = { - "id": compute.id, - "name": compute.name, - "status": compute.status, - "cpu": comp_node_info[3], - "mem_size": comp_node_info[2], - "mem_perc": comp_mem['percent'], - } - # refr(compute) - all_host_vms[comp_info["id"], comp_info["name"], comp_info["status"], comp_info["cpu"], comp_info["mem_size"], - comp_info["mem_perc"]] = comp_instances - for vm, info in comp_instances.items(): - # TODO: Delete this function completely - refresh_instance_database(comp_info, vm, info, all_host_vms, User.objects.get(pk=1)) - - # else: - # raise libvirtError(_(f"Problem occurred with host: {compute.name} - {status}")) - return all_host_vms - - def get_dhcp_mac_address(vname): dhcp_file = settings.BASE_DIR + '/dhcpd.conf' mac = '' @@ -342,20 +212,3 @@ def get_clone_disk_name(disk, prefix, clone_name=''): else: image = f"{disk['image']}-clone" return image - - -# TODO: this function is not used -def _get_clone_disks(disks, vname=''): - clone_disks = [] - for disk in disks: - new_image = get_clone_disk_name(disk, vname) - if not new_image: - continue - new_disk = { - 'dev': disk['dev'], - 'storage': disk['storage'], - 'image': new_image, - 'format': disk['format'], - } - clone_disks.append(new_disk) - return clone_disks diff --git a/instances/views.py b/instances/views.py index 871a1cb..66a0e1b 100644 --- a/instances/views.py +++ b/instances/views.py @@ -51,7 +51,7 @@ def index(request): def instance(request, pk): - instance: Instance = get_object_or_404(Instance, pk=pk) + instance: Instance = get_instance(request.user, pk) compute: Compute = instance.compute computes = Compute.objects.all().order_by('name') computes_count = computes.count() @@ -59,7 +59,6 @@ def instance(request, pk): publickeys = UserSSHKey.objects.filter(user_id=request.user.id) keymaps = settings.QEMU_KEYMAPS console_types = AppSettings.objects.get(key="QEMU_CONSOLE_DEFAULT_TYPE").choices_as_list() - # console_form = ConsoleForm( initial={ 'type': instance.console_type, @@ -67,7 +66,6 @@ def instance(request, pk): 'password': instance.console_passwd, 'keymap': instance.console_keymap, }) - # console_listen_addresses = settings.QEMU_CONSOLE_LISTEN_ADDRESSES bottom_bar = app_settings.VIEW_INSTANCE_DETAIL_BOTTOM_BAR allow_admin_or_not_template = request.user.is_superuser or request.user.is_staff or not instance.is_template @@ -78,15 +76,6 @@ def instance(request, pk): except UserInstance.DoesNotExist: userinstance = None - if not request.user.is_superuser: - if not userinstance: - return redirect(reverse('index')) - - if len(instance.media) != 0: - media_iso = sorted(instance.proxy.get_iso_media()) - else: - media_iso = [] - memory_range = [256, 512, 768, 1024, 2048, 3072, 4096, 6144, 8192, 16384] if instance.memory not in memory_range: insort(memory_range, instance.memory) @@ -127,18 +116,11 @@ def instance(request, pk): vcpu_host = len(instance.vcpu_range) memory_host = instance.proxy.get_max_memory() bus_host = instance.proxy.get_disk_bus_types(instance.arch, instance.machine) - # videos_host = instance.proxy.get_video_models(instance.arch, instance.machine) networks_host = sorted(instance.proxy.get_networks()) nwfilters_host = instance.proxy.get_nwfilters() storages_host = sorted(instance.proxy.get_storages(True)) net_models_host = instance.proxy.get_network_models() - try: - interfaces_host = sorted(instance.proxy.get_ifaces()) - except Exception as e: - addlogmsg(request.user.username, instance.name, e) - messages.error(request, e) - return render(request, 'instance.html', locals()) @@ -210,18 +192,18 @@ def check_instance(request, vname): data = {'vname': vname, 'exists': False} if instance: data['exists'] = True - return HttpResponse(json.dumps(data)) + return JsonResponse(data) -def sshkeys(request, vname): +def sshkeys(request, pk): """ :param request: :param vname: :return: """ - + instance = get_instance(request.user, pk) instance_keys = [] - userinstances = UserInstance.objects.filter(instance__name=vname) + userinstances = UserInstance.objects.filter(instance=instance) for ui in userinstances: keys = UserSSHKey.objects.filter(user=ui.user) @@ -239,7 +221,7 @@ def get_instance(user, pk): ''' Check that instance is available for user, if not raise 404 ''' - instance = Instance.objects.get(pk=pk) + instance = get_object_or_404(Instance, pk=pk) user_instances = user.userinstance_set.all().values_list('instance', flat=True) if user.is_superuser or instance.id in user_instances: @@ -498,7 +480,6 @@ def resize_disk(request, pk): if request.user.is_superuser or request.user.is_staff or userinstance.is_change: disks_new = list() for disk in disks: - # input_disk_size = utils.filesizefstr(request.POST.get('disk_size_' + disk['dev'], '')) input_disk_size = int(request.POST.get('disk_size_' + disk['dev'], '0')) * 1073741824 if input_disk_size > disk['size'] + (64 << 20): disk['size_new'] = input_disk_size @@ -973,7 +954,7 @@ def set_qos(request, pk): messages.success( request, _("%(qos_dir)s QoS is set. Network XML is changed. \ - Stop and start network to activate new config." ) % {'qos_dir': qos_dir.capitalize()}) + Stop and start network to activate new config.") % {'qos_dir': qos_dir.capitalize()}) return redirect(request.META.get('HTTP_REFERER') + '#network') @@ -991,7 +972,7 @@ def unset_qos(request, pk): messages.success( request, _("%(qos_dir)s QoS is deleted. Network XML is changed. \ - Stop and start network to activate new config." ) % {'qos_dir': qos_dir.capitalize()}) + Stop and start network to activate new config.") % {'qos_dir': qos_dir.capitalize()}) return redirect(request.META.get('HTTP_REFERER') + '#network') @@ -1005,7 +986,7 @@ def add_owner(request, pk): if app_settings.ALLOW_INSTANCE_MULTIPLE_OWNER == 'False': check_inst = UserInstance.objects.filter(instance=instance).count() - if check_inst > 1: + if check_inst > 0: messages.error(request, _("Only one owner is allowed and the one already added")) else: add_user_inst = UserInstance(instance=instance, user_id=user_id) @@ -1027,7 +1008,7 @@ def del_owner(request, pk): return redirect(request.META.get('HTTP_REFERER') + '#users') -@permission_required('instances.clone_instances') +@permission_required('instances.clone_instances', raise_exception=True) def clone(request, pk): instance = get_instance(request.user, pk)