mirror of
https://github.com/retspen/webvirtcloud
synced 2025-07-31 12:41:08 +00:00
Accounts app improvements and tests
This commit is contained in:
parent
8afef36656
commit
5172a9f619
20 changed files with 622 additions and 227 deletions
|
|
@ -1,45 +1,111 @@
|
|||
from django.contrib.auth.models import Permission, User
|
||||
from appsettings.settings import app_settings
|
||||
from computes.models import Compute
|
||||
from django.conf import settings
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.contrib.auth.models import Permission
|
||||
from django.shortcuts import reverse
|
||||
from django.test import Client, TestCase
|
||||
from instances.models import Instance
|
||||
from instances.utils import refr
|
||||
from libvirt import VIR_DOMAIN_UNDEFINE_NVRAM
|
||||
from vrtManager.create import wvmCreate
|
||||
|
||||
from accounts.forms import UserInstanceForm, UserSSHKeyForm
|
||||
from accounts.models import UserInstance, UserSSHKey
|
||||
from accounts.utils import validate_ssh_key
|
||||
|
||||
|
||||
class AccountsTestCase(TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super().setUpClass()
|
||||
|
||||
# Add users for testing purposes
|
||||
User = get_user_model()
|
||||
cls.admin_user = User.objects.get(pk=1)
|
||||
cls.test_user = User.objects.create_user(username='test', password='test')
|
||||
|
||||
# Add localhost compute
|
||||
cls.compute = Compute(
|
||||
name='test-compute',
|
||||
hostname='localhost',
|
||||
login='',
|
||||
password='',
|
||||
details='local',
|
||||
type=4,
|
||||
)
|
||||
cls.compute.save()
|
||||
|
||||
cls.connection = wvmCreate(
|
||||
cls.compute.hostname,
|
||||
cls.compute.login,
|
||||
cls.compute.password,
|
||||
cls.compute.type,
|
||||
)
|
||||
|
||||
# Add disks for testing
|
||||
cls.connection.create_volume(
|
||||
'default',
|
||||
'test-volume',
|
||||
1,
|
||||
'qcow2',
|
||||
False,
|
||||
0,
|
||||
0,
|
||||
)
|
||||
|
||||
# XML for testing vm
|
||||
with open('conf/test-vm.xml', 'r') as f:
|
||||
cls.xml = f.read()
|
||||
|
||||
# Create testing vm from XML
|
||||
cls.connection._defineXML(cls.xml)
|
||||
refr(cls.compute)
|
||||
cls.instance = Instance.objects.get(pk=1)
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
# Destroy testing vm
|
||||
cls.instance.proxy.delete_all_disks()
|
||||
cls.instance.proxy.delete(VIR_DOMAIN_UNDEFINE_NVRAM)
|
||||
super().tearDownClass()
|
||||
|
||||
def setUp(self):
|
||||
self.client.login(username='admin', password='admin')
|
||||
user = User.objects.create_user(username='test', password='test')
|
||||
permission = Permission.objects.get(codename='change_password')
|
||||
user.user_permissions.add(permission)
|
||||
self.test_user.user_permissions.add(permission)
|
||||
self.rsa_key = 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6OOdbfv27QVnSC6sKxGaHb6YFc+3gxCkyVR3cTSXE/n5BEGf8aOgBpepULWa1RZfxYHY14PlKULDygdXSdrrR2kNSwoKz/Oo4d+3EE92L7ocl1+djZbptzgWgtw1OseLwbFik+iKlIdqPsH+IUQvX7yV545ZQtAP8Qj1R+uCqkw== test@test'
|
||||
self.ecdsa_key = 'ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBJc5xpT3R0iFJYNZbmWgAiDlHquX/BcV1kVTsnBfiMsZgU3lGaqz2eb7IBcir/dxGnsVENTTmPQ6sNcxLxT9kkQ= realgecko@archlinux'
|
||||
|
||||
def test_profile(self):
|
||||
response = self.client.get(reverse('profile'))
|
||||
response = self.client.get(reverse('accounts:profile'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
response = self.client.get(reverse('account', args=[2]))
|
||||
response = self.client.get(reverse('accounts:account', args=[self.test_user.id]))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
def test_account_with_otp(self):
|
||||
settings.OTP_ENABLED = True
|
||||
response = self.client.get(reverse('accounts:account', args=[self.test_user.id]))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
def test_login_logout(self):
|
||||
user = User.objects.get(username='test')
|
||||
self.assertEqual(user.id, 2)
|
||||
|
||||
client = Client()
|
||||
|
||||
response = client.post(reverse("login"), {"username": "test", "password": "test"})
|
||||
self.assertRedirects(response, reverse('profile'))
|
||||
response = client.post(reverse("accounts:login"), {"username": "test", "password": "test"})
|
||||
self.assertRedirects(response, reverse('accounts:profile'))
|
||||
|
||||
response = client.get(reverse('logout'))
|
||||
self.assertRedirects(response, reverse('login'))
|
||||
response = client.get(reverse('accounts:logout'))
|
||||
self.assertRedirects(response, reverse('accounts:login'))
|
||||
|
||||
def test_password_change(self):
|
||||
client = Client()
|
||||
def test_change_password(self):
|
||||
self.client.force_login(self.test_user)
|
||||
|
||||
logged_in = client.login(username='test', password='test')
|
||||
self.assertTrue(logged_in)
|
||||
|
||||
response = client.get(reverse('change_password'))
|
||||
response = self.client.get(reverse('accounts:change_password'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
response = client.post(
|
||||
reverse('change_password'),
|
||||
response = self.client.post(
|
||||
reverse('accounts:change_password'),
|
||||
{
|
||||
'old_password': 'wrongpass',
|
||||
'new_password1': 'newpw',
|
||||
|
|
@ -48,17 +114,154 @@ class AccountsTestCase(TestCase):
|
|||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
response = client.post(
|
||||
reverse('change_password'),
|
||||
response = self.client.post(
|
||||
reverse('accounts:change_password'),
|
||||
{
|
||||
'old_password': 'test',
|
||||
'new_password1': 'newpw',
|
||||
'new_password2': 'newpw',
|
||||
},
|
||||
)
|
||||
self.assertRedirects(response, reverse('profile'))
|
||||
self.assertRedirects(response, reverse('accounts:profile'))
|
||||
|
||||
client.logout()
|
||||
self.client.logout()
|
||||
|
||||
logged_in = client.login(username='test', password='newpw')
|
||||
logged_in = self.client.login(username='test', password='newpw')
|
||||
self.assertTrue(logged_in)
|
||||
|
||||
def test_user_instance_create_update_delete(self):
|
||||
# create
|
||||
response = self.client.get(reverse('accounts:user_instance_create', args=[self.test_user.id]))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
response = self.client.post(
|
||||
reverse('accounts:user_instance_create', args=[self.test_user.id]),
|
||||
{
|
||||
'user': self.test_user.id,
|
||||
'instance': self.instance.id,
|
||||
'is_change': False,
|
||||
'is_delete': False,
|
||||
'is_vnc': False,
|
||||
},
|
||||
)
|
||||
self.assertRedirects(response, reverse('accounts:account', args=[self.test_user.id]))
|
||||
|
||||
user_instance: UserInstance = UserInstance.objects.get(pk=1)
|
||||
self.assertEqual(user_instance.user, self.test_user)
|
||||
self.assertEqual(user_instance.instance, self.instance)
|
||||
self.assertEqual(user_instance.is_change, False)
|
||||
self.assertEqual(user_instance.is_delete, False)
|
||||
self.assertEqual(user_instance.is_vnc, False)
|
||||
|
||||
# update
|
||||
response = self.client.get(reverse('accounts:user_instance_update', args=[user_instance.id]))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
response = self.client.post(
|
||||
reverse('accounts:user_instance_update', args=[user_instance.id]),
|
||||
{
|
||||
'user': self.test_user.id,
|
||||
'instance': self.instance.id,
|
||||
'is_change': True,
|
||||
'is_delete': True,
|
||||
'is_vnc': True,
|
||||
},
|
||||
)
|
||||
self.assertRedirects(response, reverse('accounts:account', args=[self.test_user.id]))
|
||||
|
||||
user_instance: UserInstance = UserInstance.objects.get(pk=1)
|
||||
self.assertEqual(user_instance.user, self.test_user)
|
||||
self.assertEqual(user_instance.instance, self.instance)
|
||||
self.assertEqual(user_instance.is_change, True)
|
||||
self.assertEqual(user_instance.is_delete, True)
|
||||
self.assertEqual(user_instance.is_vnc, True)
|
||||
|
||||
# delete
|
||||
response = self.client.get(reverse('accounts:user_instance_delete', args=[user_instance.id]))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
response = self.client.post(reverse('accounts:user_instance_delete', args=[user_instance.id]))
|
||||
self.assertRedirects(response, reverse('accounts:account', args=[self.test_user.id]))
|
||||
|
||||
# test 'next' redirect during deletion
|
||||
user_instance = UserInstance.objects.create(user=self.test_user, instance=self.instance)
|
||||
response = self.client.post(
|
||||
reverse('accounts:user_instance_delete', args=[user_instance.id]) + '?next=' + reverse('index'))
|
||||
self.assertRedirects(response, reverse('index'))
|
||||
|
||||
def test_update_user_profile(self):
|
||||
self.client.force_login(self.test_user)
|
||||
|
||||
user = get_user_model().objects.get(username='test')
|
||||
self.assertEqual(user.first_name, '')
|
||||
self.assertEqual(user.last_name, '')
|
||||
self.assertEqual(user.email, '')
|
||||
|
||||
response = self.client.post(reverse('accounts:profile'), {
|
||||
'first_name': 'first name',
|
||||
'last_name': 'last name',
|
||||
'email': 'email@mail.mail',
|
||||
})
|
||||
self.assertRedirects(response, reverse('accounts:profile'))
|
||||
|
||||
user = get_user_model().objects.get(username='test')
|
||||
self.assertEqual(user.first_name, 'first name')
|
||||
self.assertEqual(user.last_name, 'last name')
|
||||
self.assertEqual(user.email, 'email@mail.mail')
|
||||
|
||||
def test_create_delete_ssh_key(self):
|
||||
response = self.client.get(reverse('accounts:ssh_key_create'))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
response = self.client.post(reverse('accounts:ssh_key_create'), {
|
||||
'keyname': 'keyname',
|
||||
'keypublic': self.rsa_key,
|
||||
})
|
||||
self.assertRedirects(response, reverse('accounts:profile'))
|
||||
|
||||
key = UserSSHKey.objects.get(pk=1)
|
||||
self.assertEqual(key.keyname, 'keyname')
|
||||
self.assertEqual(key.keypublic, self.rsa_key)
|
||||
|
||||
response = self.client.get(reverse('accounts:ssh_key_delete', args=[1]))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
response = self.client.post(reverse('accounts:ssh_key_delete', args=[1]))
|
||||
self.assertRedirects(response, reverse('accounts:profile'))
|
||||
|
||||
def test_validate_ssh_key(self):
|
||||
self.assertFalse(validate_ssh_key(''))
|
||||
self.assertFalse(validate_ssh_key('ssh-rsa ABBA test@test'))
|
||||
self.assertFalse(validate_ssh_key('ssh-rsa AAAABwdzZGY= test@test'))
|
||||
self.assertFalse(validate_ssh_key('ssh-rsa AAA test@test'))
|
||||
# validate ecdsa key
|
||||
self.assertTrue(validate_ssh_key(self.ecdsa_key))
|
||||
|
||||
def test_forms(self):
|
||||
# raise available validation errors for maximum coverage
|
||||
form = UserSSHKeyForm({'keyname': 'keyname', 'keypublic': self.rsa_key}, user=self.test_user)
|
||||
form.save()
|
||||
|
||||
form = UserSSHKeyForm({'keyname': 'keyname', 'keypublic': self.rsa_key}, user=self.test_user)
|
||||
self.assertFalse(form.is_valid())
|
||||
|
||||
form = UserSSHKeyForm({'keyname': 'keyname', 'keypublic': 'invalid key'}, user=self.test_user)
|
||||
self.assertFalse(form.is_valid())
|
||||
|
||||
app_settings.ALLOW_INSTANCE_MULTIPLE_OWNER = 'False'
|
||||
form = UserInstanceForm({
|
||||
'user': self.admin_user.id,
|
||||
'instance': self.instance.id,
|
||||
'is_change': False,
|
||||
'is_delete': False,
|
||||
'is_vnc': False,
|
||||
})
|
||||
form.save()
|
||||
form = UserInstanceForm({
|
||||
'user': self.test_user.id,
|
||||
'instance': self.instance.id,
|
||||
'is_change': False,
|
||||
'is_delete': False,
|
||||
'is_vnc': False,
|
||||
})
|
||||
self.assertFalse(form.is_valid())
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue