from appsettings.settings import app_settings from computes.models import Compute from django.conf import settings from django.contrib.auth import get_user_model from django.contrib.auth.models import Permission from django.shortcuts import reverse from django.test import Client, TestCase from instances.models import Instance from instances.utils import refr from libvirt import VIR_DOMAIN_UNDEFINE_NVRAM from vrtManager.create import wvmCreate from accounts.forms import UserInstanceForm, UserSSHKeyForm from accounts.models import UserInstance, UserSSHKey from accounts.utils import validate_ssh_key class AccountsTestCase(TestCase): @classmethod def setUpClass(cls): super().setUpClass() # Add users for testing purposes User = get_user_model() cls.admin_user = User.objects.get(pk=1) cls.test_user = User.objects.create_user(username="test", password="test") # Add localhost compute cls.compute = Compute( name="test-compute", hostname="localhost", login="", password="", details="local", type=4, ) cls.compute.save() cls.connection = wvmCreate( cls.compute.hostname, cls.compute.login, cls.compute.password, cls.compute.type, ) # Add disks for testing cls.connection.create_volume( "default", "test-volume", 1, "qcow2", False, 0, 0, ) # XML for testing vm with open("conf/test-vm.xml", "r") as f: cls.xml = f.read() # Create testing vm from XML cls.connection._defineXML(cls.xml) refr(cls.compute) cls.instance = Instance.objects.get(pk=1) @classmethod def tearDownClass(cls): # Destroy testing vm cls.instance.proxy.delete_all_disks() cls.instance.proxy.delete(VIR_DOMAIN_UNDEFINE_NVRAM) super().tearDownClass() def setUp(self): self.client.login(username="admin", password="admin") permission = Permission.objects.get(codename="change_password") self.test_user.user_permissions.add(permission) self.rsa_key = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6OOdbfv27QVnSC6sKxGaHb6YFc+3gxCkyVR3cTSXE/n5BEGf8aOgBpepULWa1RZfxYHY14PlKULDygdXSdrrR2kNSwoKz/Oo4d+3EE92L7ocl1+djZbptzgWgtw1OseLwbFik+iKlIdqPsH+IUQvX7yV545ZQtAP8Qj1R+uCqkw== test@test" self.ecdsa_key = "ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBJc5xpT3R0iFJYNZbmWgAiDlHquX/BcV1kVTsnBfiMsZgU3lGaqz2eb7IBcir/dxGnsVENTTmPQ6sNcxLxT9kkQ= realgecko@archlinux" def test_profile(self): response = self.client.get(reverse("accounts:profile")) self.assertEqual(response.status_code, 200) response = self.client.get( reverse("accounts:account", args=[self.test_user.id]) ) self.assertEqual(response.status_code, 200) def test_account_with_otp(self): settings.OTP_ENABLED = True response = self.client.get( reverse("accounts:account", args=[self.test_user.id]) ) self.assertEqual(response.status_code, 200) def test_login_logout(self): client = Client() response = client.post( reverse("accounts:login"), {"username": "test", "password": "test"} ) self.assertRedirects(response, reverse("accounts:profile")) response = client.get(reverse("accounts:logout")) self.assertRedirects(response, reverse("accounts:login")) def test_change_password(self): self.client.force_login(self.test_user) response = self.client.get(reverse("accounts:change_password")) self.assertEqual(response.status_code, 200) response = self.client.post( reverse("accounts:change_password"), { "old_password": "wrongpass", "new_password1": "newpw", "new_password2": "newpw", }, ) self.assertEqual(response.status_code, 200) response = self.client.post( reverse("accounts:change_password"), { "old_password": "test", "new_password1": "newpw", "new_password2": "newpw", }, ) self.assertRedirects(response, reverse("accounts:profile")) self.client.logout() logged_in = self.client.login(username="test", password="newpw") self.assertTrue(logged_in) def test_user_instance_create_update_delete(self): # create response = self.client.get( reverse("accounts:user_instance_create", args=[self.test_user.id]) ) self.assertEqual(response.status_code, 200) response = self.client.post( reverse("accounts:user_instance_create", args=[self.test_user.id]), { "user": self.test_user.id, "instance": self.instance.id, "is_change": False, "is_delete": False, "is_vnc": False, }, ) self.assertRedirects( response, reverse("accounts:account", args=[self.test_user.id]) ) user_instance: UserInstance = UserInstance.objects.get(pk=1) self.assertEqual(user_instance.user, self.test_user) self.assertEqual(user_instance.instance, self.instance) self.assertEqual(user_instance.is_change, False) self.assertEqual(user_instance.is_delete, False) self.assertEqual(user_instance.is_vnc, False) # update response = self.client.get( reverse("accounts:user_instance_update", args=[user_instance.id]) ) self.assertEqual(response.status_code, 200) response = self.client.post( reverse("accounts:user_instance_update", args=[user_instance.id]), { "user": self.test_user.id, "instance": self.instance.id, "is_change": True, "is_delete": True, "is_vnc": True, }, ) self.assertRedirects( response, reverse("accounts:account", args=[self.test_user.id]) ) user_instance: UserInstance = UserInstance.objects.get(pk=1) self.assertEqual(user_instance.user, self.test_user) self.assertEqual(user_instance.instance, self.instance) self.assertEqual(user_instance.is_change, True) self.assertEqual(user_instance.is_delete, True) self.assertEqual(user_instance.is_vnc, True) # delete response = self.client.get( reverse("accounts:user_instance_delete", args=[user_instance.id]) ) self.assertEqual(response.status_code, 200) response = self.client.post( reverse("accounts:user_instance_delete", args=[user_instance.id]) ) self.assertRedirects( response, reverse("accounts:account", args=[self.test_user.id]) ) # test 'next' redirect during deletion user_instance = UserInstance.objects.create( user=self.test_user, instance=self.instance ) response = self.client.post( reverse("accounts:user_instance_delete", args=[user_instance.id]) + "?next=" + reverse("index") ) self.assertRedirects(response, reverse("index")) def test_update_user_profile(self): self.client.force_login(self.test_user) user = get_user_model().objects.get(username="test") self.assertEqual(user.first_name, "") self.assertEqual(user.last_name, "") self.assertEqual(user.email, "") response = self.client.post( reverse("accounts:profile"), { "first_name": "first name", "last_name": "last name", "email": "email@mail.mail", }, ) self.assertRedirects(response, reverse("accounts:profile")) user = get_user_model().objects.get(username="test") self.assertEqual(user.first_name, "first name") self.assertEqual(user.last_name, "last name") self.assertEqual(user.email, "email@mail.mail") def test_create_delete_ssh_key(self): response = self.client.get(reverse("accounts:ssh_key_create")) self.assertEqual(response.status_code, 200) response = self.client.post( reverse("accounts:ssh_key_create"), { "keyname": "keyname", "keypublic": self.rsa_key, }, ) self.assertRedirects(response, reverse("accounts:profile")) key = UserSSHKey.objects.get(pk=1) self.assertEqual(key.keyname, "keyname") self.assertEqual(key.keypublic, self.rsa_key) response = self.client.get(reverse("accounts:ssh_key_delete", args=[1])) self.assertEqual(response.status_code, 200) response = self.client.post(reverse("accounts:ssh_key_delete", args=[1])) self.assertRedirects(response, reverse("accounts:profile")) def test_validate_ssh_key(self): self.assertFalse(validate_ssh_key("")) self.assertFalse(validate_ssh_key("ssh-rsa ABBA test@test")) self.assertFalse(validate_ssh_key("ssh-rsa AAAABwdzZGY= test@test")) self.assertFalse(validate_ssh_key("ssh-rsa AAA test@test")) # validate ecdsa key self.assertTrue(validate_ssh_key(self.ecdsa_key)) def test_forms(self): # raise available validation errors for maximum coverage form = UserSSHKeyForm( {"keyname": "keyname", "keypublic": self.rsa_key}, user=self.test_user ) form.save() form = UserSSHKeyForm( {"keyname": "keyname", "keypublic": self.rsa_key}, user=self.test_user ) self.assertFalse(form.is_valid()) form = UserSSHKeyForm( {"keyname": "keyname", "keypublic": "invalid key"}, user=self.test_user ) self.assertFalse(form.is_valid()) app_settings.ALLOW_INSTANCE_MULTIPLE_OWNER = "False" form = UserInstanceForm( { "user": self.admin_user.id, "instance": self.instance.id, "is_change": False, "is_delete": False, "is_vnc": False, } ) form.save() form = UserInstanceForm( { "user": self.test_user.id, "instance": self.instance.id, "is_change": False, "is_delete": False, "is_vnc": False, } ) self.assertFalse(form.is_valid())