refactor user model and tests in authentication module
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
j3d1 2023-07-07 19:10:07 +02:00
parent 24d8edec19
commit 3afeed81b3
4 changed files with 48 additions and 60 deletions

View file

@ -93,9 +93,7 @@ def registerUser(request):
Domain.objects.get(name=domain, open_registration=True) Domain.objects.get(name=domain, open_registration=True)
user = ToolshedUser.objects.create_user(username, email, '', domain=domain) user = ToolshedUser.objects.create_user(username, email, password, domain=domain)
user.set_password(password)
user.save()
return Response({'username': user.username, 'domain': user.domain}) return Response({'username': user.username, 'domain': user.domain})
except Domain.DoesNotExist: except Domain.DoesNotExist:
return Response({'errors': {'domain': 'Domain does not exist or is not open for registration'}}, status=400) return Response({'errors': {'domain': 'Domain does not exist or is not open for registration'}}, status=400)

View file

@ -52,26 +52,27 @@ class ToolshedUserManager(auth.models.BaseUserManager):
extra_fields['private_key'] = private_key.encode(encoder=HexEncoder).decode('utf-8') extra_fields['private_key'] = private_key.encode(encoder=HexEncoder).decode('utf-8')
try: try:
with transaction.atomic(): with transaction.atomic():
extra_fields['public_identity'] = identity = KnownIdentity.objects.create( extra_fields['public_identity'] = identity = KnownIdentity.objects.get_or_create(
username=username, domain=domain, public_key=public_key.encode(encoder=HexEncoder).decode('utf-8')) username=username, domain=domain, public_key=public_key.encode(encoder=HexEncoder).decode('utf-8'))[0]
try:
with transaction.atomic():
user = super().create(username=username, email=email, password=password, domain=domain,
**extra_fields)
user.set_password(password)
user.save()
except IntegrityError:
identity.delete()
raise ValueError('Username or email already exists')
else:
return user
except IntegrityError: except IntegrityError:
raise ValueError('Username already exists') raise ValueError('Username already exists')
else:
try:
with transaction.atomic():
user = super().create(username=username, email=email, password=password, domain=domain,
**extra_fields)
user.save()
except IntegrityError:
identity.delete()
raise ValueError('Username or email already exists')
else:
return user
def create_superuser(self, username, email, password, **extra_fields): def create_superuser(self, username, email, password, **extra_fields):
user = self.create_user(username=username, email=email, password=password, **extra_fields) user = self.create_user(username, email, password, **extra_fields)
user.is_staff = True user.is_staff = True
user.is_superuser = True user.is_superuser = True
user.save()
return user return user

View file

@ -35,42 +35,39 @@ class DummyExternalUser:
class SignatureAuthClient: class SignatureAuthClient:
base = Client(SERVER_NAME='testserver') base = Client(SERVER_NAME='testserver')
def get(self, target, user, **kwargs): def __init__(self, **kwargs):
self.user = kwargs.get('user', None)
self.header_prefix = kwargs.get('header_prefix', 'Signature ')
self.bad_signature = kwargs.get('bad_signature', False)
def build_header(self, method, target, user, payload=None):
user = user if user is not None else self.user
if user is None: if user is None:
raise ValueError("User must not be None") raise ValueError("User must not be None")
signature = user.sign("http://testserver" + target) payload_json = json.dumps(payload, separators=(',', ':')) if payload is not None else ''
header = {'HTTP_AUTHORIZATION': 'Signature ' + str(user) + ':' + signature} signature = user.sign(
"http://testserver" + target + (payload_json if not self.bad_signature else payload_json[:-2] + "ff"))
return {'HTTP_AUTHORIZATION': self.header_prefix + str(user) + ':' + signature,
'content_type': 'application/json'}, payload_json
def get(self, target, user=None, **kwargs):
header, payload = self.build_header('GET', target, user)
return self.base.get(target, **header, **kwargs) return self.base.get(target, **header, **kwargs)
def post(self, target, user, data, **kwargs): def post(self, target, user=None, data=None, **kwargs):
if user is None: header, payload = self.build_header('POST', target, user, data)
raise ValueError("User must not be None") return self.base.post(target, payload, **header, **kwargs)
json_data = json.dumps(data, separators=(',', ':'))
signature = user.sign("http://testserver" + target + json_data)
header = {'HTTP_AUTHORIZATION': 'Signature ' + str(user) + ':' + signature}
return self.base.post(target, json_data, **header, content_type='application/json', **kwargs)
def put(self, target, user, data, **kwargs): def put(self, target, user=None, data=None, **kwargs):
if user is None: header, payload = self.build_header('PUT', target, user, data)
raise ValueError("User must not be None") return self.base.put(target, payload, **header, **kwargs)
json_data = json.dumps(data, separators=(',', ':'))
signature = user.sign("http://testserver" + target + json_data)
header = {'HTTP_AUTHORIZATION': 'Signature ' + str(user) + ':' + signature}
return self.base.put(target, json_data, **header, content_type='application/json', **kwargs)
def patch(self, target, user, data, **kwargs): def patch(self, target, user=None, data=None, **kwargs):
if user is None: header, payload = self.build_header('PATCH', target, user, data)
raise ValueError("User must not be None") return self.base.patch(target, payload, **header, **kwargs)
json_data = json.dumps(data, separators=(',', ':'))
signature = user.sign("http://testserver" + target + json_data)
header = {'HTTP_AUTHORIZATION': 'Signature ' + str(user) + ':' + signature}
return self.base.patch(target, json_data, **header, content_type='application/json', **kwargs)
def delete(self, target, user, **kwargs): def delete(self, target, user=None, **kwargs):
if user is None: header, payload = self.build_header('DELETE', target, user)
raise ValueError("User must not be None")
signature = user.sign("http://testserver" + target)
header = {'HTTP_AUTHORIZATION': 'Signature ' + str(user) + ':' + signature}
return self.base.delete(target, **header, **kwargs) return self.base.delete(target, **header, **kwargs)
@ -81,18 +78,11 @@ class ToolshedTestCase(TestCase):
class UserTestMixin: class UserTestMixin:
def prepare_users(self): def prepare_users(self):
self.f['admin'] = ToolshedUser.objects.create_superuser('testadmin', 'testadmin@localhost', '') self.f['admin'] = ToolshedUser.objects.create_superuser('testadmin', 'testadmin@localhost', 'testpassword')
self.f['admin'].set_password('testpassword')
self.f['admin'].save()
self.f['example_com'] = Domain.objects.create(name='example.com', owner=self.f['admin'], open_registration=True) self.f['example_com'] = Domain.objects.create(name='example.com', owner=self.f['admin'], open_registration=True)
self.f['example_com'].save() self.f['local_user1'] = ToolshedUser.objects.create_user('testuser1', 'test1@abc.de', 'testpassword2',
self.f['local_user1'] = ToolshedUser.objects.create_user('testuser1', 'test1@abc.de', '',
domain=self.f['example_com'].name) domain=self.f['example_com'].name)
self.f['local_user1'].set_password('testpassword2') self.f['local_user2'] = ToolshedUser.objects.create_user('testuser2', 'test2@abc.de', 'testpassword3',
self.f['local_user1'].save()
self.f['local_user2'] = ToolshedUser.objects.create_user('testuser2', 'test2@abc.de', '',
domain=self.f['example_com'].name) domain=self.f['example_com'].name)
self.f['local_user2'].set_password('testpassword3')
self.f['local_user2'].save()
self.f['ext_user1'] = DummyExternalUser('extuser1', 'external.org') self.f['ext_user1'] = DummyExternalUser('extuser1', 'external.org')
self.f['ext_user2'] = DummyExternalUser('extuser2', 'external.org') self.f['ext_user2'] = DummyExternalUser('extuser2', 'external.org')

View file

@ -6,7 +6,6 @@ from nacl.signing import SigningKey
from authentication.models import ToolshedUser, KnownIdentity from authentication.models import ToolshedUser, KnownIdentity
from authentication.tests import UserTestMixin, SignatureAuthClient, DummyExternalUser, ToolshedTestCase from authentication.tests import UserTestMixin, SignatureAuthClient, DummyExternalUser, ToolshedTestCase
from hostadmin.models import Domain
class AuthorizationTestCase(ToolshedTestCase): class AuthorizationTestCase(ToolshedTestCase):
@ -168,18 +167,18 @@ class UserModelTestCase(UserTestMixin, ToolshedTestCase):
def test_create_existing_user(self): def test_create_existing_user(self):
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
ToolshedUser.objects.create_user('testuser1', 'test3@abc.de', '', domain='example.com') ToolshedUser.objects.create_user('testuser1', 'test3@abc.de', 'testpassword', domain='example.com')
def test_create_existing_user2(self): def test_create_existing_user2(self):
key = SigningKey.generate() key = SigningKey.generate()
KnownIdentity.objects.create(username="testuser3", domain='localhost', KnownIdentity.objects.create(username="testuser3", domain='localhost',
public_key=key.verify_key.encode(encoder=HexEncoder).decode('utf-8')) public_key=key.verify_key.encode(encoder=HexEncoder).decode('utf-8'))
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
ToolshedUser.objects.create_user('testuser3', 'test3@abc.de', '', domain='localhost') ToolshedUser.objects.create_user('testuser3', 'test3@abc.de', 'testpassword', domain='localhost')
def test_create_reuse_email(self): def test_create_reuse_email(self):
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
ToolshedUser.objects.create_user('testuser3', 'test1@abc.de', '', domain='example.com') ToolshedUser.objects.create_user('testuser3', 'test1@abc.de', 'testpassword', domain='example.com')
def test_create_user_invalid_private_key(self): def test_create_user_invalid_private_key(self):
with self.assertRaises(TypeError): with self.assertRaises(TypeError):