# Mirror of https://github.com/retspen/webvirtcloud
# (synced 2025-01-12 08:25:18 +00:00; 308 lines, 11 KiB, Python)
from appsettings.settings import app_settings
|
|
from computes.models import Compute
|
|
from django.conf import settings
|
|
from django.contrib.auth import get_user_model
|
|
from django.contrib.auth.models import Permission
|
|
from django.shortcuts import reverse
|
|
from django.test import Client, TestCase
|
|
from instances.models import Instance
|
|
from instances.utils import refr
|
|
from libvirt import VIR_DOMAIN_UNDEFINE_NVRAM
|
|
from vrtManager.create import wvmCreate
|
|
|
|
from accounts.forms import UserInstanceForm, UserSSHKeyForm
|
|
from accounts.models import UserInstance, UserSSHKey
|
|
from accounts.utils import validate_ssh_key
|
|
|
|
|
|
class AccountsTestCase(TestCase):
    """Integration tests for the ``accounts`` views, forms and SSH-key helper.

    The class-level fixture talks to a hypervisor through ``wvmCreate``: it
    registers a ``localhost`` compute, creates a test volume and defines a VM
    from ``conf/test-vm.xml``.  The VM and its disks are removed again in
    ``tearDownClass``.
    """

    @classmethod
    def setUpClass(cls):
        """Create the users, compute, volume and VM shared by all tests."""
        super().setUpClass()

        # Add users for testing purposes
        # NOTE(review): pk=1 is presumably the pre-existing "admin" account
        # that setUp() logs in as — confirm against the project's fixtures.
        User = get_user_model()
        cls.admin_user = User.objects.get(pk=1)
        cls.test_user = User.objects.create_user(username="test", password="test")

        # Add localhost compute
        # NOTE(review): type=4 is presumably the "local socket" connection
        # type — confirm against the Compute model's constants.
        cls.compute = Compute(
            name="test-compute",
            hostname="localhost",
            login="",
            password="",
            details="local",
            type=4,
        )
        cls.compute.save()

        cls.connection = wvmCreate(
            cls.compute.hostname,
            cls.compute.login,
            cls.compute.password,
            cls.compute.type,
        )

        # Add disks for testing
        # NOTE(review): positional arguments are presumably pool, name, size,
        # format, followed by flags/ownership — verify against create_volume.
        cls.connection.create_volume(
            "default",
            "test-volume",
            1,
            "qcow2",
            False,
            0,
            0,
        )

        # XML for testing vm (path is relative to the test runner's cwd)
        with open("conf/test-vm.xml", "r", encoding="utf-8") as f:
            cls.xml = f.read()

        # Create testing vm from XML
        cls.connection._defineXML(cls.xml)
        refr(cls.compute)
        # NOTE(review): assumes the VM defined above is the first Instance row
        # created by refr() — confirm, or look it up by name instead.
        cls.instance = Instance.objects.get(pk=1)

    @classmethod
    def tearDownClass(cls):
        """Destroy the testing VM and always run Django's class teardown."""
        try:
            # Destroy testing vm
            cls.instance.proxy.delete_all_disks()
            cls.instance.proxy.delete(VIR_DOMAIN_UNDEFINE_NVRAM)
        finally:
            # Must run even if the hypervisor cleanup fails; otherwise the
            # class-level database state set up by TestCase is never unwound.
            super().tearDownClass()

    def setUp(self):
        """Log in as admin, grant the test user ``change_password``, set keys."""
        logged_in = self.client.login(username="admin", password="admin")
        # Fail here with a clear message instead of with confusing status
        # codes in whichever view test runs first.
        self.assertTrue(logged_in, "could not log in as admin/admin")

        permission = Permission.objects.get(codename="change_password")
        self.test_user.user_permissions.add(permission)

        self.rsa_key = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6OOdbfv27QVnSC6sKxGaHb6YFc+3gxCkyVR3cTSXE/n5BEGf8aOgBpepULWa1RZfxYHY14PlKULDygdXSdrrR2kNSwoKz/Oo4d+3EE92L7ocl1+djZbptzgWgtw1OseLwbFik+iKlIdqPsH+IUQvX7yV545ZQtAP8Qj1R+uCqkw== test@test"
        self.ecdsa_key = "ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBJc5xpT3R0iFJYNZbmWgAiDlHquX/BcV1kVTsnBfiMsZgU3lGaqz2eb7IBcir/dxGnsVENTTmPQ6sNcxLxT9kkQ= realgecko@archlinux"

    def test_profile(self):
        """The profile page and another user's account page both render."""
        response = self.client.get(reverse("accounts:profile"))
        self.assertEqual(response.status_code, 200)

        response = self.client.get(
            reverse("accounts:account", args=[self.test_user.id])
        )
        self.assertEqual(response.status_code, 200)

    def test_account_with_otp(self):
        """The account page renders when ``OTP_ENABLED`` is switched on."""
        # Scoped override: the setting is restored when the block exits, so
        # it cannot leak into tests that run afterwards.
        with self.settings(OTP_ENABLED=True):
            response = self.client.get(
                reverse("accounts:account", args=[self.test_user.id])
            )
        self.assertEqual(response.status_code, 200)

    def test_login_logout(self):
        """Logging in redirects to the profile; logging out to the login page."""
        client = Client()

        response = client.post(
            reverse("accounts:login"), {"username": "test", "password": "test"}
        )
        self.assertRedirects(response, reverse("accounts:profile"))

        response = client.get(reverse("accounts:logout"))
        self.assertRedirects(response, reverse("accounts:login"))

    def test_change_password(self):
        """A wrong old password re-renders the form; a correct one changes it."""
        self.client.force_login(self.test_user)
        url = reverse("accounts:change_password")

        response = self.client.get(url)
        self.assertEqual(response.status_code, 200)

        # Wrong old password: the form is shown again (no redirect).
        response = self.client.post(
            url,
            {
                "old_password": "wrongpass",
                "new_password1": "newpw",
                "new_password2": "newpw",
            },
        )
        self.assertEqual(response.status_code, 200)

        response = self.client.post(
            url,
            {
                "old_password": "test",
                "new_password1": "newpw",
                "new_password2": "newpw",
            },
        )
        self.assertRedirects(response, reverse("accounts:profile"))

        self.client.logout()

        logged_in = self.client.login(username="test", password="newpw")
        self.assertTrue(logged_in)

    def test_user_instance_create_update_delete(self):
        """Exercise the create, update and delete views for ``UserInstance``."""
        account_url = reverse("accounts:account", args=[self.test_user.id])

        # create
        create_url = reverse("accounts:user_instance_create", args=[self.test_user.id])
        response = self.client.get(create_url)
        self.assertEqual(response.status_code, 200)

        response = self.client.post(
            create_url,
            {
                "user": self.test_user.id,
                "instance": self.instance.id,
                "is_change": False,
                "is_delete": False,
                "is_vnc": False,
            },
        )
        self.assertRedirects(response, account_url)

        # Look the row up by what was just posted rather than by pk=1, which
        # only holds on backends that reset sequences between tests.
        user_instance: UserInstance = UserInstance.objects.get(
            user=self.test_user, instance=self.instance
        )
        self.assertEqual(user_instance.user, self.test_user)
        self.assertEqual(user_instance.instance, self.instance)
        self.assertEqual(user_instance.is_change, False)
        self.assertEqual(user_instance.is_delete, False)
        self.assertEqual(user_instance.is_vnc, False)

        # update
        update_url = reverse("accounts:user_instance_update", args=[user_instance.id])
        response = self.client.get(update_url)
        self.assertEqual(response.status_code, 200)

        response = self.client.post(
            update_url,
            {
                "user": self.test_user.id,
                "instance": self.instance.id,
                "is_change": True,
                "is_delete": True,
                "is_vnc": True,
            },
        )
        self.assertRedirects(response, account_url)

        user_instance.refresh_from_db()
        self.assertEqual(user_instance.user, self.test_user)
        self.assertEqual(user_instance.instance, self.instance)
        self.assertEqual(user_instance.is_change, True)
        self.assertEqual(user_instance.is_delete, True)
        self.assertEqual(user_instance.is_vnc, True)

        # delete
        delete_url = reverse("accounts:user_instance_delete", args=[user_instance.id])
        response = self.client.get(delete_url)
        self.assertEqual(response.status_code, 200)

        response = self.client.post(delete_url)
        self.assertRedirects(response, account_url)

        # test 'next' redirect during deletion
        user_instance = UserInstance.objects.create(
            user=self.test_user, instance=self.instance
        )
        response = self.client.post(
            reverse("accounts:user_instance_delete", args=[user_instance.id])
            + "?next="
            + reverse("index")
        )
        self.assertRedirects(response, reverse("index"))

    def test_update_user_profile(self):
        """Posting the profile form stores first name, last name and email."""
        self.client.force_login(self.test_user)

        user = get_user_model().objects.get(username="test")
        self.assertEqual(user.first_name, "")
        self.assertEqual(user.last_name, "")
        self.assertEqual(user.email, "")

        response = self.client.post(
            reverse("accounts:profile"),
            {
                "first_name": "first name",
                "last_name": "last name",
                "email": "email@mail.mail",
            },
        )
        self.assertRedirects(response, reverse("accounts:profile"))

        user = get_user_model().objects.get(username="test")
        self.assertEqual(user.first_name, "first name")
        self.assertEqual(user.last_name, "last name")
        self.assertEqual(user.email, "email@mail.mail")

    def test_create_delete_ssh_key(self):
        """Create an SSH key through the view, then delete it again."""
        create_url = reverse("accounts:ssh_key_create")
        response = self.client.get(create_url)
        self.assertEqual(response.status_code, 200)

        response = self.client.post(
            create_url,
            {
                "keyname": "keyname",
                "keypublic": self.rsa_key,
            },
        )
        self.assertRedirects(response, reverse("accounts:profile"))

        # Fetch by the posted name instead of pk=1 so the test does not rely
        # on the primary-key sequence starting fresh.
        key = UserSSHKey.objects.get(keyname="keyname")
        self.assertEqual(key.keyname, "keyname")
        self.assertEqual(key.keypublic, self.rsa_key)

        delete_url = reverse("accounts:ssh_key_delete", args=[key.id])
        response = self.client.get(delete_url)
        self.assertEqual(response.status_code, 200)

        response = self.client.post(delete_url)
        self.assertRedirects(response, reverse("accounts:profile"))

    def test_validate_ssh_key(self):
        """``validate_ssh_key`` rejects malformed keys and accepts an ECDSA key."""
        self.assertFalse(validate_ssh_key(""))
        self.assertFalse(validate_ssh_key("ssh-rsa ABBA test@test"))
        self.assertFalse(validate_ssh_key("ssh-rsa AAAABwdzZGY= test@test"))
        self.assertFalse(validate_ssh_key("ssh-rsa AAA test@test"))
        # validate ecdsa key
        self.assertTrue(validate_ssh_key(self.ecdsa_key))

    def test_forms(self):
        """Trigger the validation errors of both account forms."""
        # raise available validation errors for maximum coverage
        form = UserSSHKeyForm(
            {"keyname": "keyname", "keypublic": self.rsa_key}, user=self.test_user
        )
        form.save()

        # Submitting the same key again must be rejected.
        form = UserSSHKeyForm(
            {"keyname": "keyname", "keypublic": self.rsa_key}, user=self.test_user
        )
        self.assertFalse(form.is_valid())

        form = UserSSHKeyForm(
            {"keyname": "keyname", "keypublic": "invalid key"}, user=self.test_user
        )
        self.assertFalse(form.is_valid())

        # app_settings is a process-wide object: remember the current value
        # (or its absence) and put it back once this test finishes.
        setting_name = "ALLOW_INSTANCE_MULTIPLE_OWNER"
        unset = object()
        previous = getattr(app_settings, setting_name, unset)

        def restore_setting():
            if previous is unset:
                if hasattr(app_settings, setting_name):
                    delattr(app_settings, setting_name)
            else:
                setattr(app_settings, setting_name, previous)

        self.addCleanup(restore_setting)

        app_settings.ALLOW_INSTANCE_MULTIPLE_OWNER = "False"
        form = UserInstanceForm(
            {
                "user": self.admin_user.id,
                "instance": self.instance.id,
                "is_change": False,
                "is_delete": False,
                "is_vnc": False,
            }
        )
        form.save()

        # With multiple owners disallowed, a second owner for the same
        # instance must not validate.
        form = UserInstanceForm(
            {
                "user": self.test_user.id,
                "instance": self.instance.id,
                "is_change": False,
                "is_delete": False,
                "is_vnc": False,
            }
        )
        self.assertFalse(form.is_valid())
|