mirror of
https://github.com/retspen/webvirtcloud
synced 2025-07-31 12:41:08 +00:00
format python code with black
This commit is contained in:
parent
ea409ca863
commit
217e106c8b
55 changed files with 2510 additions and 1454 deletions
|
@ -3,48 +3,59 @@ from django.db.models.signals import post_migrate
|
|||
|
||||
|
||||
def apply_change_password(sender, **kwargs):
|
||||
'''
|
||||
"""
|
||||
Apply new change_password permission for all users
|
||||
Depending on settings SHOW_PROFILE_EDIT_PASSWORD
|
||||
'''
|
||||
"""
|
||||
from django.conf import settings
|
||||
from django.contrib.auth.models import Permission, User
|
||||
if hasattr(settings, 'SHOW_PROFILE_EDIT_PASSWORD'):
|
||||
print('\033[1m! \033[92mSHOW_PROFILE_EDIT_PASSWORD is found inside settings.py\033[0m')
|
||||
print('\033[1m* \033[92mApplying permission can_change_password for all users\033[0m')
|
||||
|
||||
if hasattr(settings, "SHOW_PROFILE_EDIT_PASSWORD"):
|
||||
print("\033[1m! \033[92mSHOW_PROFILE_EDIT_PASSWORD is found inside settings.py\033[0m")
|
||||
print("\033[1m* \033[92mApplying permission can_change_password for all users\033[0m")
|
||||
users = User.objects.all()
|
||||
permission = Permission.objects.get(codename='change_password')
|
||||
permission = Permission.objects.get(codename="change_password")
|
||||
if settings.SHOW_PROFILE_EDIT_PASSWORD:
|
||||
print('\033[1m! \033[91mWarning!!! Setting to True for all users\033[0m')
|
||||
print("\033[1m! \033[91mWarning!!! Setting to True for all users\033[0m")
|
||||
for user in users:
|
||||
user.user_permissions.add(permission)
|
||||
else:
|
||||
print('\033[1m* \033[91mWarning!!! Setting to False for all users\033[0m')
|
||||
print("\033[1m* \033[91mWarning!!! Setting to False for all users\033[0m")
|
||||
for user in users:
|
||||
user.user_permissions.remove(permission)
|
||||
print('\033[1m! Don`t forget to remove the option from settings.py\033[0m')
|
||||
print("\033[1m! Don`t forget to remove the option from settings.py\033[0m")
|
||||
|
||||
|
||||
def create_admin(sender, **kwargs):
|
||||
'''
|
||||
"""
|
||||
Create initial admin user
|
||||
'''
|
||||
"""
|
||||
from accounts.models import UserAttributes
|
||||
from django.contrib.auth.models import User
|
||||
|
||||
plan = kwargs.get('plan', [])
|
||||
plan = kwargs.get("plan", [])
|
||||
for migration, rolled_back in plan:
|
||||
if migration.app_label == 'accounts' and migration.name == '0001_initial' and not rolled_back:
|
||||
if (
|
||||
migration.app_label == "accounts"
|
||||
and migration.name == "0001_initial"
|
||||
and not rolled_back
|
||||
):
|
||||
if User.objects.count() == 0:
|
||||
print('\033[1m* \033[92mCreating default admin user\033[0m')
|
||||
admin = User.objects.create_superuser('admin', None, 'admin')
|
||||
UserAttributes(user=admin, max_instances=-1, max_cpus=-1, max_memory=-1, max_disk_size=-1).save()
|
||||
print("\033[1m* \033[92mCreating default admin user\033[0m")
|
||||
admin = User.objects.create_superuser("admin", None, "admin")
|
||||
UserAttributes(
|
||||
user=admin,
|
||||
max_instances=-1,
|
||||
max_cpus=-1,
|
||||
max_memory=-1,
|
||||
max_disk_size=-1,
|
||||
).save()
|
||||
break
|
||||
|
||||
|
||||
class AccountsConfig(AppConfig):
|
||||
name = 'accounts'
|
||||
verbose_name = 'Accounts'
|
||||
name = "accounts"
|
||||
verbose_name = "Accounts"
|
||||
|
||||
def ready(self):
|
||||
post_migrate.connect(create_admin, sender=self)
|
||||
|
|
|
@ -12,52 +12,52 @@ class UserInstanceForm(ModelForm):
|
|||
super(UserInstanceForm, self).__init__(*args, **kwargs)
|
||||
|
||||
# Make user and instance fields not editable after creation
|
||||
instance = getattr(self, 'instance', None)
|
||||
instance = getattr(self, "instance", None)
|
||||
if instance and instance.id is not None:
|
||||
self.fields['user'].disabled = True
|
||||
self.fields['instance'].disabled = True
|
||||
self.fields["user"].disabled = True
|
||||
self.fields["instance"].disabled = True
|
||||
|
||||
def clean_instance(self):
|
||||
instance = self.cleaned_data['instance']
|
||||
if app_settings.ALLOW_INSTANCE_MULTIPLE_OWNER == 'False':
|
||||
instance = self.cleaned_data["instance"]
|
||||
if app_settings.ALLOW_INSTANCE_MULTIPLE_OWNER == "False":
|
||||
exists = UserInstance.objects.filter(instance=instance)
|
||||
if exists:
|
||||
raise ValidationError(_('Instance owned by another user'))
|
||||
raise ValidationError(_("Instance owned by another user"))
|
||||
|
||||
return instance
|
||||
|
||||
class Meta:
|
||||
model = UserInstance
|
||||
fields = '__all__'
|
||||
fields = "__all__"
|
||||
|
||||
|
||||
class ProfileForm(ModelForm):
|
||||
class Meta:
|
||||
model = get_user_model()
|
||||
fields = ('first_name', 'last_name', 'email')
|
||||
fields = ("first_name", "last_name", "email")
|
||||
|
||||
|
||||
class UserSSHKeyForm(ModelForm):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.user = kwargs.pop('user', None)
|
||||
self.user = kwargs.pop("user", None)
|
||||
self.publickeys = UserSSHKey.objects.filter(user=self.user)
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def clean_keyname(self):
|
||||
for key in self.publickeys:
|
||||
if self.cleaned_data['keyname'] == key.keyname:
|
||||
if self.cleaned_data["keyname"] == key.keyname:
|
||||
raise ValidationError(_("Key name already exist"))
|
||||
|
||||
return self.cleaned_data['keyname']
|
||||
return self.cleaned_data["keyname"]
|
||||
|
||||
def clean_keypublic(self):
|
||||
for key in self.publickeys:
|
||||
if self.cleaned_data['keypublic'] == key.keypublic:
|
||||
if self.cleaned_data["keypublic"] == key.keypublic:
|
||||
raise ValidationError(_("Public key already exist"))
|
||||
|
||||
if not validate_ssh_key(self.cleaned_data['keypublic']):
|
||||
raise ValidationError(_('Invalid key'))
|
||||
return self.cleaned_data['keypublic']
|
||||
if not validate_ssh_key(self.cleaned_data["keypublic"]):
|
||||
raise ValidationError(_("Invalid key"))
|
||||
return self.cleaned_data["keypublic"]
|
||||
|
||||
def save(self, commit=True):
|
||||
ssh_key = super().save(commit=False)
|
||||
|
@ -68,8 +68,8 @@ class UserSSHKeyForm(ModelForm):
|
|||
|
||||
class Meta:
|
||||
model = UserSSHKey
|
||||
fields = ('keyname', 'keypublic')
|
||||
fields = ("keyname", "keypublic")
|
||||
|
||||
|
||||
class EmailOTPForm(Form):
|
||||
email = EmailField(label=_('Email'))
|
||||
email = EmailField(label=_("Email"))
|
||||
|
|
|
@ -7,7 +7,7 @@ from instances.models import Instance
|
|||
|
||||
class UserInstanceManager(models.Manager):
|
||||
def get_queryset(self):
|
||||
return super().get_queryset().select_related('instance', 'user')
|
||||
return super().get_queryset().select_related("instance", "user")
|
||||
|
||||
|
||||
class UserInstance(models.Model):
|
||||
|
@ -20,16 +20,19 @@ class UserInstance(models.Model):
|
|||
objects = UserInstanceManager()
|
||||
|
||||
def __str__(self):
|
||||
return _('Instance "%(inst)s" of user %(user)s') % {"inst": self.instance, "user": self.user}
|
||||
return _('Instance "%(inst)s" of user %(user)s') % {
|
||||
"inst": self.instance,
|
||||
"user": self.user,
|
||||
}
|
||||
|
||||
class Meta:
|
||||
unique_together = ['user', 'instance']
|
||||
unique_together = ["user", "instance"]
|
||||
|
||||
|
||||
class UserSSHKey(models.Model):
|
||||
user = models.ForeignKey(User, on_delete=models.DO_NOTHING)
|
||||
keyname = models.CharField(_('key name'), max_length=25)
|
||||
keypublic = models.CharField(_('public key'), max_length=500)
|
||||
keyname = models.CharField(_("key name"), max_length=25)
|
||||
keypublic = models.CharField(_("public key"), max_length=500)
|
||||
|
||||
def __str__(self):
|
||||
return self.keyname
|
||||
|
@ -38,26 +41,26 @@ class UserSSHKey(models.Model):
|
|||
class UserAttributes(models.Model):
|
||||
user = models.OneToOneField(User, on_delete=models.CASCADE)
|
||||
can_clone_instances = models.BooleanField(default=True)
|
||||
max_instances = models.IntegerField(_('max instances'),
|
||||
default=2,
|
||||
help_text=_("-1 for unlimited. Any integer value"),
|
||||
validators=[
|
||||
MinValueValidator(-1),
|
||||
])
|
||||
max_instances = models.IntegerField(
|
||||
_("max instances"),
|
||||
default=2,
|
||||
help_text=_("-1 for unlimited. Any integer value"),
|
||||
validators=[MinValueValidator(-1)],
|
||||
)
|
||||
max_cpus = models.IntegerField(
|
||||
_('max CPUs'),
|
||||
_("max CPUs"),
|
||||
default=2,
|
||||
help_text=_("-1 for unlimited. Any integer value"),
|
||||
validators=[MinValueValidator(-1)],
|
||||
)
|
||||
max_memory = models.IntegerField(
|
||||
_('max memory'),
|
||||
_("max memory"),
|
||||
default=2048,
|
||||
help_text=_("-1 for unlimited. Any integer value"),
|
||||
validators=[MinValueValidator(-1)],
|
||||
)
|
||||
max_disk_size = models.IntegerField(
|
||||
_('max disk size'),
|
||||
_("max disk size"),
|
||||
default=20,
|
||||
help_text=_("-1 for unlimited. Any integer value"),
|
||||
validators=[MinValueValidator(-1)],
|
||||
|
@ -71,8 +74,9 @@ class PermissionSet(models.Model):
|
|||
"""
|
||||
Dummy model for holding set of permissions we need to be automatically added by Django
|
||||
"""
|
||||
|
||||
class Meta:
|
||||
default_permissions = ()
|
||||
permissions = (('change_password', _('Can change password')), )
|
||||
permissions = (("change_password", _("Can change password")),)
|
||||
|
||||
managed = False
|
||||
|
|
|
@ -23,15 +23,15 @@ class AccountsTestCase(TestCase):
|
|||
# Add users for testing purposes
|
||||
User = get_user_model()
|
||||
cls.admin_user = User.objects.get(pk=1)
|
||||
cls.test_user = User.objects.create_user(username='test', password='test')
|
||||
cls.test_user = User.objects.create_user(username="test", password="test")
|
||||
|
||||
# Add localhost compute
|
||||
cls.compute = Compute(
|
||||
name='test-compute',
|
||||
hostname='localhost',
|
||||
login='',
|
||||
password='',
|
||||
details='local',
|
||||
name="test-compute",
|
||||
hostname="localhost",
|
||||
login="",
|
||||
password="",
|
||||
details="local",
|
||||
type=4,
|
||||
)
|
||||
cls.compute.save()
|
||||
|
@ -45,17 +45,17 @@ class AccountsTestCase(TestCase):
|
|||
|
||||
# Add disks for testing
|
||||
cls.connection.create_volume(
|
||||
'default',
|
||||
'test-volume',
|
||||
"default",
|
||||
"test-volume",
|
||||
1,
|
||||
'qcow2',
|
||||
"qcow2",
|
||||
False,
|
||||
0,
|
||||
0,
|
||||
)
|
||||
|
||||
# XML for testing vm
|
||||
with open('conf/test-vm.xml', 'r') as f:
|
||||
with open("conf/test-vm.xml", "r") as f:
|
||||
cls.xml = f.read()
|
||||
|
||||
# Create testing vm from XML
|
||||
|
@ -71,80 +71,90 @@ class AccountsTestCase(TestCase):
|
|||
super().tearDownClass()
|
||||
|
||||
def setUp(self):
|
||||
self.client.login(username='admin', password='admin')
|
||||
permission = Permission.objects.get(codename='change_password')
|
||||
self.client.login(username="admin", password="admin")
|
||||
permission = Permission.objects.get(codename="change_password")
|
||||
self.test_user.user_permissions.add(permission)
|
||||
self.rsa_key = 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6OOdbfv27QVnSC6sKxGaHb6YFc+3gxCkyVR3cTSXE/n5BEGf8aOgBpepULWa1RZfxYHY14PlKULDygdXSdrrR2kNSwoKz/Oo4d+3EE92L7ocl1+djZbptzgWgtw1OseLwbFik+iKlIdqPsH+IUQvX7yV545ZQtAP8Qj1R+uCqkw== test@test'
|
||||
self.ecdsa_key = 'ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBJc5xpT3R0iFJYNZbmWgAiDlHquX/BcV1kVTsnBfiMsZgU3lGaqz2eb7IBcir/dxGnsVENTTmPQ6sNcxLxT9kkQ= realgecko@archlinux'
|
||||
self.rsa_key = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6OOdbfv27QVnSC6sKxGaHb6YFc+3gxCkyVR3cTSXE/n5BEGf8aOgBpepULWa1RZfxYHY14PlKULDygdXSdrrR2kNSwoKz/Oo4d+3EE92L7ocl1+djZbptzgWgtw1OseLwbFik+iKlIdqPsH+IUQvX7yV545ZQtAP8Qj1R+uCqkw== test@test"
|
||||
self.ecdsa_key = "ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBJc5xpT3R0iFJYNZbmWgAiDlHquX/BcV1kVTsnBfiMsZgU3lGaqz2eb7IBcir/dxGnsVENTTmPQ6sNcxLxT9kkQ= realgecko@archlinux"
|
||||
|
||||
def test_profile(self):
|
||||
response = self.client.get(reverse('accounts:profile'))
|
||||
response = self.client.get(reverse("accounts:profile"))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
response = self.client.get(reverse('accounts:account', args=[self.test_user.id]))
|
||||
response = self.client.get(
|
||||
reverse("accounts:account", args=[self.test_user.id])
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
def test_account_with_otp(self):
|
||||
settings.OTP_ENABLED = True
|
||||
response = self.client.get(reverse('accounts:account', args=[self.test_user.id]))
|
||||
response = self.client.get(
|
||||
reverse("accounts:account", args=[self.test_user.id])
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
def test_login_logout(self):
|
||||
client = Client()
|
||||
|
||||
response = client.post(reverse("accounts:login"), {"username": "test", "password": "test"})
|
||||
self.assertRedirects(response, reverse('accounts:profile'))
|
||||
response = client.post(
|
||||
reverse("accounts:login"), {"username": "test", "password": "test"}
|
||||
)
|
||||
self.assertRedirects(response, reverse("accounts:profile"))
|
||||
|
||||
response = client.get(reverse('accounts:logout'))
|
||||
self.assertRedirects(response, reverse('accounts:login'))
|
||||
response = client.get(reverse("accounts:logout"))
|
||||
self.assertRedirects(response, reverse("accounts:login"))
|
||||
|
||||
def test_change_password(self):
|
||||
self.client.force_login(self.test_user)
|
||||
|
||||
response = self.client.get(reverse('accounts:change_password'))
|
||||
response = self.client.get(reverse("accounts:change_password"))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
response = self.client.post(
|
||||
reverse('accounts:change_password'),
|
||||
reverse("accounts:change_password"),
|
||||
{
|
||||
'old_password': 'wrongpass',
|
||||
'new_password1': 'newpw',
|
||||
'new_password2': 'newpw',
|
||||
"old_password": "wrongpass",
|
||||
"new_password1": "newpw",
|
||||
"new_password2": "newpw",
|
||||
},
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
response = self.client.post(
|
||||
reverse('accounts:change_password'),
|
||||
reverse("accounts:change_password"),
|
||||
{
|
||||
'old_password': 'test',
|
||||
'new_password1': 'newpw',
|
||||
'new_password2': 'newpw',
|
||||
"old_password": "test",
|
||||
"new_password1": "newpw",
|
||||
"new_password2": "newpw",
|
||||
},
|
||||
)
|
||||
self.assertRedirects(response, reverse('accounts:profile'))
|
||||
self.assertRedirects(response, reverse("accounts:profile"))
|
||||
|
||||
self.client.logout()
|
||||
|
||||
logged_in = self.client.login(username='test', password='newpw')
|
||||
logged_in = self.client.login(username="test", password="newpw")
|
||||
self.assertTrue(logged_in)
|
||||
|
||||
def test_user_instance_create_update_delete(self):
|
||||
# create
|
||||
response = self.client.get(reverse('accounts:user_instance_create', args=[self.test_user.id]))
|
||||
response = self.client.get(
|
||||
reverse("accounts:user_instance_create", args=[self.test_user.id])
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
response = self.client.post(
|
||||
reverse('accounts:user_instance_create', args=[self.test_user.id]),
|
||||
reverse("accounts:user_instance_create", args=[self.test_user.id]),
|
||||
{
|
||||
'user': self.test_user.id,
|
||||
'instance': self.instance.id,
|
||||
'is_change': False,
|
||||
'is_delete': False,
|
||||
'is_vnc': False,
|
||||
"user": self.test_user.id,
|
||||
"instance": self.instance.id,
|
||||
"is_change": False,
|
||||
"is_delete": False,
|
||||
"is_vnc": False,
|
||||
},
|
||||
)
|
||||
self.assertRedirects(response, reverse('accounts:account', args=[self.test_user.id]))
|
||||
self.assertRedirects(
|
||||
response, reverse("accounts:account", args=[self.test_user.id])
|
||||
)
|
||||
|
||||
user_instance: UserInstance = UserInstance.objects.get(pk=1)
|
||||
self.assertEqual(user_instance.user, self.test_user)
|
||||
|
@ -154,20 +164,24 @@ class AccountsTestCase(TestCase):
|
|||
self.assertEqual(user_instance.is_vnc, False)
|
||||
|
||||
# update
|
||||
response = self.client.get(reverse('accounts:user_instance_update', args=[user_instance.id]))
|
||||
response = self.client.get(
|
||||
reverse("accounts:user_instance_update", args=[user_instance.id])
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
response = self.client.post(
|
||||
reverse('accounts:user_instance_update', args=[user_instance.id]),
|
||||
reverse("accounts:user_instance_update", args=[user_instance.id]),
|
||||
{
|
||||
'user': self.test_user.id,
|
||||
'instance': self.instance.id,
|
||||
'is_change': True,
|
||||
'is_delete': True,
|
||||
'is_vnc': True,
|
||||
"user": self.test_user.id,
|
||||
"instance": self.instance.id,
|
||||
"is_change": True,
|
||||
"is_delete": True,
|
||||
"is_vnc": True,
|
||||
},
|
||||
)
|
||||
self.assertRedirects(response, reverse('accounts:account', args=[self.test_user.id]))
|
||||
self.assertRedirects(
|
||||
response, reverse("accounts:account", args=[self.test_user.id])
|
||||
)
|
||||
|
||||
user_instance: UserInstance = UserInstance.objects.get(pk=1)
|
||||
self.assertEqual(user_instance.user, self.test_user)
|
||||
|
@ -177,91 +191,118 @@ class AccountsTestCase(TestCase):
|
|||
self.assertEqual(user_instance.is_vnc, True)
|
||||
|
||||
# delete
|
||||
response = self.client.get(reverse('accounts:user_instance_delete', args=[user_instance.id]))
|
||||
response = self.client.get(
|
||||
reverse("accounts:user_instance_delete", args=[user_instance.id])
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
response = self.client.post(reverse('accounts:user_instance_delete', args=[user_instance.id]))
|
||||
self.assertRedirects(response, reverse('accounts:account', args=[self.test_user.id]))
|
||||
response = self.client.post(
|
||||
reverse("accounts:user_instance_delete", args=[user_instance.id])
|
||||
)
|
||||
self.assertRedirects(
|
||||
response, reverse("accounts:account", args=[self.test_user.id])
|
||||
)
|
||||
|
||||
# test 'next' redirect during deletion
|
||||
user_instance = UserInstance.objects.create(user=self.test_user, instance=self.instance)
|
||||
user_instance = UserInstance.objects.create(
|
||||
user=self.test_user, instance=self.instance
|
||||
)
|
||||
response = self.client.post(
|
||||
reverse('accounts:user_instance_delete', args=[user_instance.id]) + '?next=' + reverse('index'))
|
||||
self.assertRedirects(response, reverse('index'))
|
||||
reverse("accounts:user_instance_delete", args=[user_instance.id])
|
||||
+ "?next="
|
||||
+ reverse("index")
|
||||
)
|
||||
self.assertRedirects(response, reverse("index"))
|
||||
|
||||
def test_update_user_profile(self):
|
||||
self.client.force_login(self.test_user)
|
||||
|
||||
user = get_user_model().objects.get(username='test')
|
||||
self.assertEqual(user.first_name, '')
|
||||
self.assertEqual(user.last_name, '')
|
||||
self.assertEqual(user.email, '')
|
||||
user = get_user_model().objects.get(username="test")
|
||||
self.assertEqual(user.first_name, "")
|
||||
self.assertEqual(user.last_name, "")
|
||||
self.assertEqual(user.email, "")
|
||||
|
||||
response = self.client.post(reverse('accounts:profile'), {
|
||||
'first_name': 'first name',
|
||||
'last_name': 'last name',
|
||||
'email': 'email@mail.mail',
|
||||
})
|
||||
self.assertRedirects(response, reverse('accounts:profile'))
|
||||
response = self.client.post(
|
||||
reverse("accounts:profile"),
|
||||
{
|
||||
"first_name": "first name",
|
||||
"last_name": "last name",
|
||||
"email": "email@mail.mail",
|
||||
},
|
||||
)
|
||||
self.assertRedirects(response, reverse("accounts:profile"))
|
||||
|
||||
user = get_user_model().objects.get(username='test')
|
||||
self.assertEqual(user.first_name, 'first name')
|
||||
self.assertEqual(user.last_name, 'last name')
|
||||
self.assertEqual(user.email, 'email@mail.mail')
|
||||
user = get_user_model().objects.get(username="test")
|
||||
self.assertEqual(user.first_name, "first name")
|
||||
self.assertEqual(user.last_name, "last name")
|
||||
self.assertEqual(user.email, "email@mail.mail")
|
||||
|
||||
def test_create_delete_ssh_key(self):
|
||||
response = self.client.get(reverse('accounts:ssh_key_create'))
|
||||
response = self.client.get(reverse("accounts:ssh_key_create"))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
response = self.client.post(reverse('accounts:ssh_key_create'), {
|
||||
'keyname': 'keyname',
|
||||
'keypublic': self.rsa_key,
|
||||
})
|
||||
self.assertRedirects(response, reverse('accounts:profile'))
|
||||
response = self.client.post(
|
||||
reverse("accounts:ssh_key_create"),
|
||||
{
|
||||
"keyname": "keyname",
|
||||
"keypublic": self.rsa_key,
|
||||
},
|
||||
)
|
||||
self.assertRedirects(response, reverse("accounts:profile"))
|
||||
|
||||
key = UserSSHKey.objects.get(pk=1)
|
||||
self.assertEqual(key.keyname, 'keyname')
|
||||
self.assertEqual(key.keyname, "keyname")
|
||||
self.assertEqual(key.keypublic, self.rsa_key)
|
||||
|
||||
response = self.client.get(reverse('accounts:ssh_key_delete', args=[1]))
|
||||
response = self.client.get(reverse("accounts:ssh_key_delete", args=[1]))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
response = self.client.post(reverse('accounts:ssh_key_delete', args=[1]))
|
||||
self.assertRedirects(response, reverse('accounts:profile'))
|
||||
response = self.client.post(reverse("accounts:ssh_key_delete", args=[1]))
|
||||
self.assertRedirects(response, reverse("accounts:profile"))
|
||||
|
||||
def test_validate_ssh_key(self):
|
||||
self.assertFalse(validate_ssh_key(''))
|
||||
self.assertFalse(validate_ssh_key('ssh-rsa ABBA test@test'))
|
||||
self.assertFalse(validate_ssh_key('ssh-rsa AAAABwdzZGY= test@test'))
|
||||
self.assertFalse(validate_ssh_key('ssh-rsa AAA test@test'))
|
||||
self.assertFalse(validate_ssh_key(""))
|
||||
self.assertFalse(validate_ssh_key("ssh-rsa ABBA test@test"))
|
||||
self.assertFalse(validate_ssh_key("ssh-rsa AAAABwdzZGY= test@test"))
|
||||
self.assertFalse(validate_ssh_key("ssh-rsa AAA test@test"))
|
||||
# validate ecdsa key
|
||||
self.assertTrue(validate_ssh_key(self.ecdsa_key))
|
||||
|
||||
def test_forms(self):
|
||||
# raise available validation errors for maximum coverage
|
||||
form = UserSSHKeyForm({'keyname': 'keyname', 'keypublic': self.rsa_key}, user=self.test_user)
|
||||
form = UserSSHKeyForm(
|
||||
{"keyname": "keyname", "keypublic": self.rsa_key}, user=self.test_user
|
||||
)
|
||||
form.save()
|
||||
|
||||
form = UserSSHKeyForm({'keyname': 'keyname', 'keypublic': self.rsa_key}, user=self.test_user)
|
||||
form = UserSSHKeyForm(
|
||||
{"keyname": "keyname", "keypublic": self.rsa_key}, user=self.test_user
|
||||
)
|
||||
self.assertFalse(form.is_valid())
|
||||
|
||||
form = UserSSHKeyForm({'keyname': 'keyname', 'keypublic': 'invalid key'}, user=self.test_user)
|
||||
form = UserSSHKeyForm(
|
||||
{"keyname": "keyname", "keypublic": "invalid key"}, user=self.test_user
|
||||
)
|
||||
self.assertFalse(form.is_valid())
|
||||
|
||||
app_settings.ALLOW_INSTANCE_MULTIPLE_OWNER = 'False'
|
||||
form = UserInstanceForm({
|
||||
'user': self.admin_user.id,
|
||||
'instance': self.instance.id,
|
||||
'is_change': False,
|
||||
'is_delete': False,
|
||||
'is_vnc': False,
|
||||
})
|
||||
app_settings.ALLOW_INSTANCE_MULTIPLE_OWNER = "False"
|
||||
form = UserInstanceForm(
|
||||
{
|
||||
"user": self.admin_user.id,
|
||||
"instance": self.instance.id,
|
||||
"is_change": False,
|
||||
"is_delete": False,
|
||||
"is_vnc": False,
|
||||
}
|
||||
)
|
||||
form.save()
|
||||
form = UserInstanceForm({
|
||||
'user': self.test_user.id,
|
||||
'instance': self.instance.id,
|
||||
'is_change': False,
|
||||
'is_delete': False,
|
||||
'is_vnc': False,
|
||||
})
|
||||
form = UserInstanceForm(
|
||||
{
|
||||
"user": self.test_user.id,
|
||||
"instance": self.instance.id,
|
||||
"is_change": False,
|
||||
"is_delete": False,
|
||||
"is_vnc": False,
|
||||
}
|
||||
)
|
||||
self.assertFalse(form.is_valid())
|
||||
|
|
|
@ -5,29 +5,50 @@ from django_otp.forms import OTPAuthenticationForm
|
|||
|
||||
from . import views
|
||||
|
||||
app_name = 'accounts'
|
||||
app_name = "accounts"
|
||||
|
||||
urlpatterns = [
|
||||
path('logout/', LogoutView.as_view(template_name='logout.html'), name='logout'),
|
||||
path('profile/', views.profile, name='profile'),
|
||||
path('profile/<int:user_id>/', views.account, name='account'),
|
||||
path('change_password/', views.change_password, name='change_password'),
|
||||
path('user_instance/create/<int:user_id>/', views.user_instance_create, name='user_instance_create'),
|
||||
path('user_instance/<int:pk>/update/', views.user_instance_update, name='user_instance_update'),
|
||||
path('user_instance/<int:pk>/delete/', views.user_instance_delete, name='user_instance_delete'),
|
||||
path('ssh_key/create/', views.ssh_key_create, name='ssh_key_create'),
|
||||
path('ssh_key/<int:pk>/delete/', views.ssh_key_delete, name='ssh_key_delete'),
|
||||
path("logout/", LogoutView.as_view(template_name="logout.html"), name="logout"),
|
||||
path("profile/", views.profile, name="profile"),
|
||||
path("profile/<int:user_id>/", views.account, name="account"),
|
||||
path("change_password/", views.change_password, name="change_password"),
|
||||
path(
|
||||
"user_instance/create/<int:user_id>/",
|
||||
views.user_instance_create,
|
||||
name="user_instance_create",
|
||||
),
|
||||
path(
|
||||
"user_instance/<int:pk>/update/",
|
||||
views.user_instance_update,
|
||||
name="user_instance_update",
|
||||
),
|
||||
path(
|
||||
"user_instance/<int:pk>/delete/",
|
||||
views.user_instance_delete,
|
||||
name="user_instance_delete",
|
||||
),
|
||||
path("ssh_key/create/", views.ssh_key_create, name="ssh_key_create"),
|
||||
path("ssh_key/<int:pk>/delete/", views.ssh_key_delete, name="ssh_key_delete"),
|
||||
]
|
||||
|
||||
if settings.OTP_ENABLED:
|
||||
urlpatterns += [
|
||||
path(
|
||||
'login/',
|
||||
LoginView.as_view(template_name='accounts/otp_login.html', authentication_form=OTPAuthenticationForm),
|
||||
name='login',
|
||||
"login/",
|
||||
LoginView.as_view(
|
||||
template_name="accounts/otp_login.html",
|
||||
authentication_form=OTPAuthenticationForm,
|
||||
),
|
||||
name="login",
|
||||
),
|
||||
path("email_otp/", views.email_otp, name="email_otp"),
|
||||
path(
|
||||
"admin_email_otp/<int:user_id>/",
|
||||
views.admin_email_otp,
|
||||
name="admin_email_otp",
|
||||
),
|
||||
path('email_otp/', views.email_otp, name='email_otp'),
|
||||
path('admin_email_otp/<int:user_id>/', views.admin_email_otp, name='admin_email_otp'),
|
||||
]
|
||||
else:
|
||||
urlpatterns += path('login/', LoginView.as_view(template_name='login.html'), name='login'),
|
||||
urlpatterns += (
|
||||
path("login/", LoginView.as_view(template_name="login.html"), name="login"),
|
||||
)
|
||||
|
|
|
@ -172,7 +172,10 @@ def email_otp(request):
|
|||
device = get_user_totp_device(user)
|
||||
send_email_with_otp(user, device)
|
||||
|
||||
messages.success(request, _("OTP Sent to %(email)s") % {"email": form.cleaned_data["email"]})
|
||||
messages.success(
|
||||
request,
|
||||
_("OTP Sent to %(email)s") % {"email": form.cleaned_data["email"]}
|
||||
)
|
||||
return redirect("accounts:login")
|
||||
|
||||
return render(
|
||||
|
@ -191,7 +194,13 @@ def admin_email_otp(request, user_id):
|
|||
device = get_user_totp_device(user)
|
||||
if user.email != "":
|
||||
send_email_with_otp(user, device)
|
||||
messages.success(request, _("OTP QR code was emailed to user %(user)s") % {"user": user})
|
||||
messages.success(
|
||||
request,
|
||||
_("OTP QR code was emailed to user %(user)s") % {"user": user}
|
||||
)
|
||||
else:
|
||||
messages.error(request, _("User email not set, failed to send QR code"))
|
||||
messages.error(
|
||||
request,
|
||||
_("User email not set, failed to send QR code")
|
||||
)
|
||||
return redirect("accounts:account", user.id)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue