diff --git a/backend/authentication/__init__.py b/backend/authentication/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/authentication/admin.py b/backend/authentication/admin.py new file mode 100644 index 0000000..557cf94 --- /dev/null +++ b/backend/authentication/admin.py @@ -0,0 +1,29 @@ +from django.contrib import admin + +from authentication.models import ToolshedUser, KnownIdentity, FriendRequestOutgoing, FriendRequestIncoming + + +class ToolshedUserAdmin(admin.ModelAdmin): + list_display = ('username', 'email', 'first_name', 'last_name', 'is_staff', 'is_active', 'date_joined', 'domain') + search_fields = ('username', 'email', 'first_name', 'last_name', 'is_staff', 'is_active', 'date_joined', 'domain') + + +class KnownIdentityAdmin(admin.ModelAdmin): + list_display = ('username', 'domain', 'public_key') + search_fields = ('username', 'domain', 'public_key') + + +class FriendRequestOutgoingAdmin(admin.ModelAdmin): + list_display = ('secret', 'befriender_user', 'befriendee_username', 'befriendee_domain') + search_fields = ('secret', 'befriender_user', 'befriendee_username', 'befriendee_domain') + + +class FriendRequestIncomingAdmin(admin.ModelAdmin): + list_display = ('secret', 'befriender_username', 'befriender_domain', 'befriendee_user', 'befriender_public_key') + search_fields = ('secret', 'befriender_username', 'befriender_domain', 'befriendee_user', 'befriender_public_key') + + +admin.site.register(ToolshedUser, ToolshedUserAdmin) +admin.site.register(KnownIdentity, KnownIdentityAdmin) +admin.site.register(FriendRequestOutgoing, FriendRequestOutgoingAdmin) +admin.site.register(FriendRequestIncoming, FriendRequestIncomingAdmin) diff --git a/backend/authentication/api.py b/backend/authentication/api.py new file mode 100644 index 0000000..a96f229 --- /dev/null +++ b/backend/authentication/api.py @@ -0,0 +1,108 @@ +from django.contrib import auth +from django.db import IntegrityError +from django.urls import path, include +from rest_framework import routers, serializers, viewsets +from rest_framework.authentication import TokenAuthentication +from rest_framework.decorators import api_view, permission_classes, authentication_classes +from rest_framework.permissions import IsAuthenticated, IsAdminUser +from rest_framework.authtoken.models import Token +from rest_framework.authtoken.views import ObtainAuthToken +from rest_framework.response import Response + +from authentication.models import ToolshedUser +from authentication.signature_auth import SignatureAuthenticationLocal + +router = routers.SimpleRouter() + + +class UserAuthToken(ObtainAuthToken): + + def post(self, request, *args, **kwargs): + try: + fullname = request.data.get('username') + username = fullname.split('@')[0] + domain = fullname.split('@')[1] + password = request.data.get('password') + user = auth.authenticate(username=username, password=password, domain=domain) + token, created = Token.objects.get_or_create(user=user) + return Response({ + 'token': token.key, + 'key': user.private_key + }) + except IndexError: + return Response({ + 'error': 'Invalid Credentials' + }, status=400) + except IntegrityError: + return Response({ + 'error': 'Invalid Credentials' + }, status=400) + + +class UserSerializer(serializers.HyperlinkedModelSerializer): + class Meta: + model = ToolshedUser + fields = ['username', 'email', 'first_name', 'last_name', 'is_staff', 'is_active', 'date_joined'] + + +class UserViewSet(viewsets.ModelViewSet): + queryset = ToolshedUser.objects.all() + serializer_class = UserSerializer + authentication_classes = [TokenAuthentication] + permission_classes = [IsAuthenticated, IsAdminUser] + + +@api_view(['GET']) +@permission_classes([IsAuthenticated]) +@authentication_classes([SignatureAuthenticationLocal]) +def getUserInfo(request): + user = request.user + return Response({ + 'username': user.username, + 'domain': user.domain, + 'email': user.email + }) + + +@api_view(['POST']) +@permission_classes([]) +@authentication_classes([]) +def registerUser(request): + username = request.data.get('username') + domain = request.data.get('domain') + password = request.data.get('password') + email = request.data.get('email') + + errors = {} + if not username: + errors['username'] = 'Username is required' + if not domain: + errors['domain'] = 'Domain is required' + if not password: + errors['password'] = 'Password is required' + if not email: + errors['email'] = 'Email is required' + if ToolshedUser.objects.filter(email=email).exists(): + errors['email'] = 'Email already exists' + if ToolshedUser.objects.filter(username=username, domain=domain).exists(): + errors['username'] = 'Username already exists' + if errors: + return Response({'errors': errors}, status=400) + + if domain in ['localhost']: + user = ToolshedUser.objects.create_user(username, email, '', domain=domain) + user.set_password(password) + user.save() + return Response({'username': user.username, 'domain': user.domain}) + else: + return Response({'errors': {'domain': 'Domain does not exist or is not open for registration'}}, status=400) + + +router.register(r'users', UserViewSet) + +urlpatterns = [ + path('', include(router.urls)), + path('user/', getUserInfo), + path('register/', registerUser), + path('token/', UserAuthToken.as_view()), +] diff --git a/backend/authentication/migrations/0001_initial.py b/backend/authentication/migrations/0001_initial.py new file mode 100644 index 0000000..53ed6dc --- /dev/null +++ b/backend/authentication/migrations/0001_initial.py @@ -0,0 +1,96 @@ +# Generated by Django 4.2.2 on 2023-06-14 23:13 + +from django.conf import settings +import django.contrib.auth.validators +from django.db import migrations, models +import django.db.models.deletion +import django.utils.timezone + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [ + ('auth', '0012_alter_user_first_name_max_length'), + ] + + operations = [ + migrations.CreateModel( + name='ToolshedUser', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('password', models.CharField(max_length=128, verbose_name='password')), + ('last_login', models.DateTimeField(blank=True, null=True, verbose_name='last login')), + ('is_superuser', models.BooleanField(default=False, help_text='Designates that this user has all permissions without explicitly assigning them.', verbose_name='superuser status')), + ('username', models.CharField(error_messages={'unique': 'A user with that username already exists.'}, help_text='Required. 150 characters or fewer. Letters, digits and @/./+/-/_ only.', max_length=150, unique=True, validators=[django.contrib.auth.validators.UnicodeUsernameValidator()], verbose_name='username')), + ('first_name', models.CharField(blank=True, max_length=150, verbose_name='first name')), + ('last_name', models.CharField(blank=True, max_length=150, verbose_name='last name')), + ('is_staff', models.BooleanField(default=False, help_text='Designates whether the user can log into this admin site.', verbose_name='staff status')), + ('is_active', models.BooleanField(default=True, help_text='Designates whether this user should be treated as active. Unselect this instead of deleting accounts.', verbose_name='active')), + ('date_joined', models.DateTimeField(default=django.utils.timezone.now, verbose_name='date joined')), + ('email', models.EmailField(max_length=254, unique=True)), + ('domain', models.CharField(default='localhost', max_length=255)), + ('private_key', models.CharField(max_length=255)), + ('groups', models.ManyToManyField(blank=True, help_text='The groups this user belongs to. A user will get all permissions granted to each of their groups.', related_name='user_set', related_query_name='user', to='auth.group', verbose_name='groups')), + ], + ), + migrations.CreateModel( + name='KnownIdentity', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('username', models.CharField(max_length=255)), + ('domain', models.CharField(max_length=255)), + ('public_key', models.CharField(max_length=255)), + ('friends', models.ManyToManyField(to='authentication.knownidentity')), + ], + ), + migrations.CreateModel( + name='FriendRequestOutgoing', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('secret', models.CharField(max_length=255)), + ('befriendee_username', models.CharField(max_length=255)), + ('befriendee_domain', models.CharField(max_length=255)), + ('befriender_user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='friend_requests_outgoing', to=settings.AUTH_USER_MODEL)), + ], + ), + migrations.CreateModel( + name='FriendRequestIncoming', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('secret', models.CharField(max_length=255)), + ('befriender_username', models.CharField(max_length=255)), + ('befriender_domain', models.CharField(max_length=255)), + ('befriender_public_key', models.CharField(max_length=255)), + ('created_at', models.DateTimeField(auto_now_add=True)), + ('befriendee_user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='friend_requests_incoming', to=settings.AUTH_USER_MODEL)), + ], + ), + migrations.AddField( + model_name='toolsheduser', + name='public_identity', + field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='user', to='authentication.knownidentity'), + ), + migrations.AddField( + model_name='toolsheduser', + name='user_permissions', + field=models.ManyToManyField(blank=True, help_text='Specific permissions for this user.', related_name='user_set', related_query_name='user', to='auth.permission', verbose_name='user permissions'), + ), + migrations.AddIndex( + model_name='knownidentity', + index=models.Index(fields=['username', 'domain'], name='identity_idx'), + ), + migrations.AlterUniqueTogether( + name='knownidentity', + unique_together={('username', 'domain')}, + ), + migrations.AddIndex( + model_name='toolsheduser', + index=models.Index(fields=['username', 'domain'], name='user_idx'), + ), + migrations.AlterUniqueTogether( + name='toolsheduser', + unique_together={('username', 'domain'), ('email',)}, + ), + ] diff --git a/backend/authentication/migrations/__init__.py b/backend/authentication/migrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/authentication/models.py b/backend/authentication/models.py new file mode 100644 index 0000000..7ba8b20 --- /dev/null +++ b/backend/authentication/models.py @@ -0,0 +1,114 @@ +from django.contrib import auth +from django.contrib.auth.models import AbstractUser +from django.db import models, transaction +from django.db.utils import IntegrityError +from nacl.encoding import HexEncoder +from nacl.exceptions import BadSignatureError +from nacl.signing import SigningKey, VerifyKey + + +class KnownIdentity(models.Model): + username = models.CharField(max_length=255) + domain = models.CharField(max_length=255) + public_key = models.CharField(max_length=255) + friends = models.ManyToManyField('self', symmetrical=True) + + class Meta: + unique_together = ('username', 'domain') + indexes = [ + models.Index(fields=['username', 'domain'], name='identity_idx'), + ] + + def __str__(self): + return f"{self.username}@{self.domain}" + + def is_authenticated(self): + return True + + def verify(self, message, signature): + if len(signature) != 128 or type(signature) != str: + raise TypeError('Signature must be 128 characters long and a string') + if type(message) != str: + raise TypeError('Message must be a string') + try: + VerifyKey(bytes.fromhex(self.public_key)).verify(message.encode('utf-8'), bytes.fromhex(signature)) + return True + except BadSignatureError: + return False + + +class ToolshedUserManager(auth.models.BaseUserManager): + def create_user(self, username, email, password, **extra_fields): + domain = extra_fields.pop('domain', 'localhost') + private_key_hex = extra_fields.pop('private_key', None) + if private_key_hex and type(private_key_hex) != str: + raise TypeError('Private key must be a string or no private key must be provided') + if private_key_hex and len(private_key_hex) != 64: + raise ValueError('Private key must be 64 characters long or no private key must be provided') + if private_key_hex and not all(c in '0123456789abcdef' for c in private_key_hex): + raise ValueError('Private key must be a hexadecimal string or no private key must be provided') + private_key = SigningKey(bytes.fromhex(private_key_hex)) if private_key_hex else SigningKey.generate() + public_key = SigningKey(private_key.encode()).verify_key + extra_fields['private_key'] = private_key.encode(encoder=HexEncoder).decode('utf-8') + try: + with transaction.atomic(): + extra_fields['public_identity'] = identity = KnownIdentity.objects.create( + username=username, domain=domain, public_key=public_key.encode(encoder=HexEncoder).decode('utf-8')) + except IntegrityError: + raise ValueError('Username already exists') + else: + try: + with transaction.atomic(): + user = super().create(username=username, email=email, password=password, domain=domain, + **extra_fields) + user.save() + except IntegrityError: + identity.delete() + raise ValueError('Username or email already exists') + else: + return user + + def create_superuser(self, username, email, password, **extra_fields): + user = self.create_user(username=username, email=email, password=password, **extra_fields) + user.is_staff = True + user.is_superuser = True + return user + + +class ToolshedUser(AbstractUser): + email = models.EmailField(unique=True) + domain = models.CharField(max_length=255, default='localhost') + private_key = models.CharField(max_length=255) + public_identity = models.ForeignKey(KnownIdentity, on_delete=models.CASCADE, related_name='user') + objects = ToolshedUserManager() + + class Meta: + unique_together = [('username', 'domain'), ['email']] + indexes = [ + models.Index(fields=['username', 'domain'], name='user_idx'), + ] + + def __str__(self): + return f"{self.username}@{self.domain}" + + def sign(self, message): + if type(message) != str: + raise TypeError('Message must be a string') + private_key = SigningKey(self.private_key.encode(), encoder=HexEncoder) + return private_key.sign(message.encode('utf-8'), encoder=HexEncoder).signature.decode('utf-8') + + +class FriendRequestOutgoing(models.Model): + secret = models.CharField(max_length=255) + befriender_user = models.ForeignKey(ToolshedUser, on_delete=models.CASCADE, related_name='friend_requests_outgoing') + befriendee_username = models.CharField(max_length=255) + befriendee_domain = models.CharField(max_length=255) + + +class FriendRequestIncoming(models.Model): + secret = models.CharField(max_length=255) + befriender_username = models.CharField(max_length=255) + befriender_domain = models.CharField(max_length=255) + befriender_public_key = models.CharField(max_length=255) + befriendee_user = models.ForeignKey(ToolshedUser, on_delete=models.CASCADE, related_name='friend_requests_incoming') + created_at = models.DateTimeField(auto_now_add=True) diff --git a/backend/authentication/signature_auth.py b/backend/authentication/signature_auth.py new file mode 100644 index 0000000..b46fc11 --- /dev/null +++ b/backend/authentication/signature_auth.py @@ -0,0 +1,68 @@ +from rest_framework import authentication +from authentication.models import ToolshedUser + + +def split_userhandle_or_throw(userhandle): + if '@' not in userhandle: + raise ValueError('Userhandle must be in the format username@domain') + username, domain = userhandle.split('@') + if not username: + raise ValueError('Username cannot be empty') + if not domain: + raise ValueError('Domain cannot be empty') + return username, domain + + +def verify_request(request, raw_request_body): + authentication_header = request.META.get('HTTP_AUTHORIZATION') + + if not authentication_header: + raise ValueError('No authentication header provided') + + if not authentication_header.startswith('Signature '): + raise ValueError('Authorization header must be in the format "Signature author@domain:signature_hex[128]"') + + signature = authentication_header.split('Signature ')[1] + + if ':' not in signature: + raise ValueError('Authorization header must be in the format "Signature author@domain:signature_hex[128]"') + + author = signature.split(':')[0] + signature_bytes_hex = signature.split(':')[1] + + if not author or not signature_bytes_hex or len(signature_bytes_hex) != 128: + raise ValueError('Authorization header must be in the format "Signature author@domain:signature_hex[128]"') + + username, domain = split_userhandle_or_throw(author) + + signed_data = request.build_absolute_uri() + + if request.method == 'POST': + signed_data += raw_request_body + elif request.method == 'PUT': + signed_data += raw_request_body + elif request.method == 'PATCH': + signed_data += raw_request_body + + return username, domain, signed_data, signature_bytes_hex + + +def authenticate_request_against_local_users(request, raw_request_body): + try: + username, domain, signed_data, signature_bytes_hex = verify_request(request, raw_request_body) + except ValueError: + return None + try: + author_user = ToolshedUser.objects.get(username=username, domain=domain) + except ToolshedUser.DoesNotExist: + return None + if author_user.public_identity.verify(signed_data, signature_bytes_hex): + return author_user + else: + return None + + +class SignatureAuthenticationLocal(authentication.BaseAuthentication): + def authenticate(self, request): + return authenticate_request_against_local_users( + request, request.body.decode('utf-8')), None diff --git a/backend/authentication/tests/__init__.py b/backend/authentication/tests/__init__.py new file mode 100644 index 0000000..565dc95 --- /dev/null +++ b/backend/authentication/tests/__init__.py @@ -0,0 +1 @@ +from .helpers import * \ No newline at end of file diff --git a/backend/authentication/tests/helpers.py b/backend/authentication/tests/helpers.py new file mode 100644 index 0000000..61b1c74 --- /dev/null +++ b/backend/authentication/tests/helpers.py @@ -0,0 +1,84 @@ +import json + +from django.test import TestCase, Client +from nacl.encoding import HexEncoder + +from authentication.models import ToolshedUser, KnownIdentity +from nacl.signing import SigningKey + + +class DummyExternalUser: + def __init__(self, username, domain, known=True): + self.username = username + self.domain = domain + self.__signing_key = SigningKey.generate() + self.public_identity, _ = KnownIdentity.objects.get_or_create( + username=username, + domain=domain, + public_key=self.public_key()) if known else None + + def __str__(self): + return self.username + '@' + self.domain + + def sign(self, message): + return self.__signing_key.sign(message.encode('utf-8'), encoder=HexEncoder).signature.decode('utf-8') + + def public_key(self): + return self.__signing_key.verify_key.encode(encoder=HexEncoder).decode('utf-8') + + @property + def friends(self): + return self.public_identity.friends + + +class SignatureAuthClient: + base = Client(SERVER_NAME='testserver') + + def get(self, target, user, **kwargs): + signature = user.sign("http://testserver" + target) + header = {'HTTP_AUTHORIZATION': 'Signature ' + str(user) + ':' + signature} + return self.base.get(target, **header, **kwargs) + + def post(self, target, user, data, **kwargs): + json_data = json.dumps(data, separators=(',', ':')) + signature = user.sign("http://testserver" + target + json_data) + header = {'HTTP_AUTHORIZATION': 'Signature ' + str(user) + ':' + signature} + return self.base.post(target, json_data, **header, content_type='application/json', **kwargs) + + def put(self, target, user, data, **kwargs): + json_data = json.dumps(data, separators=(',', ':')) + signature = user.sign("http://testserver" + target + json_data) + header = {'HTTP_AUTHORIZATION': 'Signature ' + str(user) + ':' + signature} + return self.base.put(target, json_data, **header, content_type='application/json', **kwargs) + + def patch(self, target, user, data, **kwargs): + json_data = json.dumps(data, separators=(',', ':')) + signature = user.sign("http://testserver" + target + json_data) + header = {'HTTP_AUTHORIZATION': 'Signature ' + str(user) + ':' + signature} + return self.base.patch(target, json_data, **header, content_type='application/json', **kwargs) + + def delete(self, target, user, **kwargs): + signature = user.sign("http://testserver" + target) + header = {'HTTP_AUTHORIZATION': 'Signature ' + str(user) + ':' + signature} + return self.base.delete(target, **header, **kwargs) + + +class UserTestCase(TestCase): + ext_user1 = None + ext_user2 = None + local_user1 = None + local_user2 = None + + def setUp(self): + admin = ToolshedUser.objects.create_superuser('admin', 'admin@localhost', '') + admin.set_password('testpassword') + admin.save() + example_com = type('obj', (object,), {'name': 'example.com'}) + self.local_user1 = ToolshedUser.objects.create_user('testuser', 'test@abc.de', '', domain=example_com.name) + self.local_user1.set_password('testpassword2') + self.local_user1.save() + self.local_user2 = ToolshedUser.objects.create_user('testuser2', 'test2@abc.de', '', domain=example_com.name) + self.local_user2.set_password('testpassword3') + self.local_user2.save() + self.ext_user1 = DummyExternalUser('extuser1', 'external.org') + self.ext_user2 = DummyExternalUser('extuser2', 'external.org') diff --git a/backend/authentication/tests/test_auth.py b/backend/authentication/tests/test_auth.py new file mode 100644 index 0000000..bfca93b --- /dev/null +++ b/backend/authentication/tests/test_auth.py @@ -0,0 +1,379 @@ +from django.test import TestCase, Client +from nacl.encoding import HexEncoder +from nacl.signing import SigningKey + +from authentication.models import ToolshedUser, KnownIdentity +from authentication.tests import UserTestCase, SignatureAuthClient + + +class KnownIdentityTestCase(TestCase): + key = None + + def setUp(self): + self.key = SigningKey.generate() + KnownIdentity.objects.create(username="testuser", domain='external.com', + public_key=self.key.verify_key.encode(encoder=HexEncoder).decode('utf-8')) + + def test_known_identity(self): + identity = KnownIdentity.objects.get(username="testuser", domain='external.com') + self.assertEqual(identity.username, "testuser") + self.assertEqual(identity.domain, "external.com") + self.assertEqual(identity.public_key, self.key.verify_key.encode(encoder=HexEncoder).decode('utf-8')) + self.assertEqual(str(identity), "testuser@external.com") + self.assertTrue(identity.is_authenticated()) + + def test_known_identity_verify(self): + identity = KnownIdentity.objects.get(username="testuser", domain='external.com') + message = "Hello world, this is a test message." + signed = self.key.sign(message.encode('utf-8'), encoder=HexEncoder).signature + self.assertTrue(identity.verify(message, signed.decode('utf-8'))) + + def test_known_identity_verify_fail(self): + identity = KnownIdentity.objects.get(username="testuser", domain='external.com') + message = "Hello world, this is a test message." + signed = self.key.sign(message.encode('utf-8'), encoder=HexEncoder).signature + self.assertFalse(identity.verify(message + "x", signed.decode('utf-8'))) + + def test_known_identity_verify_fail2(self): + identity = KnownIdentity.objects.get(username="testuser", domain='external.com') + message = "Hello world, this is a test message." + signed = self.key.sign(message.encode('utf-8'), encoder=HexEncoder).signature + with self.assertRaises(TypeError): + identity.verify(message.encode('utf-8'), signed.decode('utf-8')) + + def test_known_identity_verify_fail3(self): + identity = KnownIdentity.objects.get(username="testuser", domain='external.com') + message = "Hello world, this is a test message." + signed = self.key.sign(message.encode('utf-8'), encoder=HexEncoder).signature + with self.assertRaises(TypeError): + identity.verify(message, signed) + + def test_known_identity_verify_fail4(self): + identity = KnownIdentity.objects.get(username="testuser", domain='external.com') + message = "Hello world, this is a test message." + signed = self.key.sign(message.encode('utf-8'), encoder=HexEncoder).signature + with self.assertRaises(TypeError): + identity.verify(message, bytes.fromhex(signed.decode('utf-8'))) + + +class UserModelTestCase(UserTestCase): + def setUp(self): + super().setUp() + + def test_admin(self): + user = ToolshedUser.objects.get(username='admin') + self.assertTrue(user.is_superuser) + self.assertTrue(user.is_staff) + self.assertTrue(user.is_active) + self.assertEqual(user.domain, 'localhost') + self.assertEqual(user.email, 'admin@localhost') + self.assertEqual(user.username, 'admin') + + def test_user(self): + user = ToolshedUser.objects.get(username='testuser') + self.assertFalse(user.is_superuser) + self.assertFalse(user.is_staff) + self.assertTrue(user.is_active) + self.assertEqual(user.domain, 'example.com') + self.assertEqual(user.email, 'test@abc.de') + self.assertEqual(user.username, 'testuser') + self.assertEqual(len(user.private_key), 64) + self.assertEqual(type(user.public_identity), KnownIdentity) + self.assertEqual(user.public_identity.domain, 'example.com') + self.assertEqual(user.public_identity.username, 'testuser') + self.assertEqual(len(user.public_identity.public_key), 64) + + def test_create_existing_user(self): + with self.assertRaises(ValueError): + ToolshedUser.objects.create_user('testuser', 'test3@abc.de', '', domain='localhost') + + def test_create_existing_user2(self): + key = SigningKey.generate() + KnownIdentity.objects.create(username="testuser3", domain='localhost', + public_key=key.verify_key.encode(encoder=HexEncoder).decode('utf-8')) + with self.assertRaises(ValueError): + ToolshedUser.objects.create_user('testuser3', 'test3@abc.de', '', domain='localhost') + + def test_create_reuse_email(self): + with self.assertRaises(ValueError): + ToolshedUser.objects.create_user('testuser3', 'test@abc.de', '', domain='localhost') + + def test_create_user_invalid_private_key(self): + with self.assertRaises(TypeError): + ToolshedUser.objects.create_user('testuser3', 'test3@abc.de', '', domain='localhost', + private_key=b'0123456789abcdef0123456789abcdef') + with self.assertRaises(ValueError): + ToolshedUser.objects.create_user('testuser3', 'test3@abc.de', '', domain='localhost', + private_key='7005c4097') + with self.assertRaises(ValueError): + ToolshedUser.objects.create_user('testuser3', 'test3@abc.de', '', domain='localhost', + private_key='0123456789abcdef0123456789abcdef' + 'Z123456789abcdef0123456789abcdef') + + def test_signature(self): + user = self.local_user1 + message = 'some message' + signature = user.sign(message) + self.assertEqual(len(signature), 128) + self.assertTrue(user.public_identity.verify(message, signature)) + + def test_signature_fail(self): + user = self.local_user1 + message = 'some message' + signature = user.sign(message) + self.assertFalse(user.public_identity.verify(message + 'x', signature)) + + def test_signature_fail2(self): + user = self.local_user1 + message = 'some message' + signature = user.sign(message) + signature = signature[:-2] + 'ee' + self.assertFalse(user.public_identity.verify(message, signature)) + + def test_signature_fail3(self): + user1 = self.local_user1 + user2 = self.local_user2 + message = 'some message' + signature = user1.sign(message) + self.assertFalse(user2.public_identity.verify(message, signature)) + + def test_signature_fail4(self): + user = self.local_user1 + message = 'some message' + with self.assertRaises(TypeError): + user.sign(message.encode('utf-8')) + + +class UserApiTestCase(UserTestCase): + + def setUp(self): + super().setUp() + self.anonymous_client = Client(SERVER_NAME='testserver') + self.client = SignatureAuthClient() + + def test_user_info(self): + reply = self.client.get('/auth/user/', self.local_user1) + self.assertEqual(reply.status_code, 200) + self.assertEqual(reply.json()['username'], 'testuser') + self.assertEqual(reply.json()['domain'], 'example.com') + self.assertEqual(reply.json()['email'], 'test@abc.de') + + def test_user_info2(self): + target = "/auth/user/" + signature = self.local_user1.sign("http://testserver" + target) + header = {'HTTP_AUTHORIZATION': 'Signature ' + str(self.local_user1) + ':' + signature} + reply = self.anonymous_client.get(target, **header) + self.assertEqual(reply.status_code, 200) + self.assertEqual(reply.json()['username'], 'testuser') + self.assertEqual(reply.json()['domain'], 'example.com') + + def test_user_info_fail(self): + reply = self.anonymous_client.get('/auth/user/') + self.assertEqual(reply.status_code, 403) + + def test_user_info_fail2(self): + reply = self.client.get('/auth/user/', self.ext_user1) + self.assertEqual(reply.status_code, 403) + + def test_user_info_fail3(self): + target = "/auth/user/" + signature = self.local_user1.sign("http://testserver2" + target) + header = {'HTTP_AUTHORIZATION': 'Signature ' + str(self.local_user1) + ':' + signature} + reply = self.anonymous_client.get(target, **header) + self.assertEqual(reply.status_code, 403) + + def test_user_info_fail4(self): + target = "/auth/user/" + signature = self.local_user1.sign("http://testserver" + target) + header = {'HTTP_AUTHORIZATION': 'Auth ' + str(self.local_user1) + ':' + signature} + reply = self.anonymous_client.get(target, **header) + self.assertEqual(reply.status_code, 403) + + def test_user_info_fail5(self): + target = "/auth/user/" + signature = self.local_user1.sign("http://testserver" + target) + header = {'HTTP_AUTHORIZATION': 'Signature ' + str(self.local_user1)} + reply = self.anonymous_client.get(target, **header) + self.assertEqual(reply.status_code, 403) + + def test_user_info_fail6(self): + target = "/auth/user/" + signature = self.local_user1.sign("http://testserver" + target) + header = {'HTTP_AUTHORIZATION': 'Signature ' + str(self.local_user1) + ':' + signature + 'f'} + reply = self.anonymous_client.get(target, **header) + self.assertEqual(reply.status_code, 403) + + def test_user_info_fail7(self): + target = "/auth/user/" + signature = self.local_user1.sign("http://testserver" + target) + header = {'HTTP_AUTHORIZATION': 'Signature ' + self.local_user1.username + ':' + signature} + reply = self.anonymous_client.get(target, **header) + self.assertEqual(reply.status_code, 403) + + def test_user_info_fail8(self): + target = "/auth/user/" + signature = self.local_user1.sign("http://testserver" + target) + header = {'HTTP_AUTHORIZATION': 'Signature ' + self.local_user1.username + '@:' + signature} + reply = self.anonymous_client.get(target, **header) + self.assertEqual(reply.status_code, 403) + + def test_user_info_fail9(self): + target = "/auth/user/" + signature = self.local_user1.sign("http://testserver" + target) + header = {'HTTP_AUTHORIZATION': 'Signature @' + self.local_user1.domain + ':' + signature} + reply = self.anonymous_client.get(target, **header) + self.assertEqual(reply.status_code, 403) + + +class LoginApiTestCase(UserTestCase): + user = None + client = Client(SERVER_NAME='testserver') + + def setUp(self): + super().setUp() + self.user = self.local_user1 + + def test_login(self): + reply = self.client.post('/auth/token/', + {'username': self.user.username + '@' + self.user.domain, 'password': 'testpassword2'}) + self.assertEqual(reply.status_code, 200) + self.assertTrue('token' in reply.json()) + self.assertTrue(len(reply.json()['token']) == 40) + self.assertTrue('key' in reply.json()) + self.assertEqual(len(reply.json()['key']), 64) + + def test_login_fail(self): + reply = self.client.post('/auth/token/', + {'username': self.user.username + '@' + self.user.domain, 'password': 'testpassword3'}) + self.assertEqual(reply.status_code, 400) + self.assertTrue('token' not in reply.json()) + self.assertTrue('error' in reply.json()) + + def test_login_fail2(self): + reply = self.client.post('/auth/token/', + {'username': self.user.username, 'password': 'testpassword2'}) + self.assertEqual(reply.status_code, 400) + self.assertTrue('token' not in reply.json()) + self.assertTrue('error' in reply.json()) + + +class RegistrationApiTestCase(TestCase): + client = Client(SERVER_NAME='testserver') + + def setUp(self): + admin = ToolshedUser.objects.create_superuser('admin', 'admin@localhost', '') + admin.set_password('testpassword') + admin.save() + example_com = type('obj', (object,), {'name': 'localhost'}) + user2 = ToolshedUser.objects.create_user('testuser2', 'test2@abc.de', '', domain=example_com.name) + user2.set_password('testpassword3') + user2.save() + + def test_registration(self): + self.assertEqual(ToolshedUser.objects.all().count(), 2) + reply = self.client.post('/auth/register/', + {'username': 'testuser', 'password': 'testpassword2', 'domain': 'localhost', + 'email': 'test@abc.de'}) + self.assertEqual(reply.status_code, 200) + self.assertTrue('username' in reply.json()) + self.assertTrue('domain' in reply.json()) + user = ToolshedUser.objects.get(username='testuser') + self.assertEqual(user.email, 'test@abc.de') + self.assertEqual(user.domain, 'localhost') + self.assertTrue(user.check_password('testpassword2')) + self.assertEqual(ToolshedUser.objects.all().count(), 3) + + def test_registration_fail(self): + reply = self.client.post('/auth/register/', + {'username': '', 'password': 'testpassword2', 'domain': 'localhost', + 'email': 'test@abc.de'}) + self.assertEqual(reply.status_code, 400) + self.assertTrue('errors' in reply.json()) + self.assertTrue('username' in reply.json()['errors']) + self.assertEqual(ToolshedUser.objects.all().count(), 2) + + def test_registration_fail2(self): + reply = self.client.post('/auth/register/', + {'password': 'testpassword2', 'domain': 'localhost', 'email': 'test@abc.de'}) + self.assertEqual(reply.status_code, 400) + self.assertTrue('errors' in reply.json()) + self.assertTrue('username' in reply.json()['errors']) + self.assertEqual(ToolshedUser.objects.all().count(), 2) + + def test_registration_fail3(self): + reply = self.client.post('/auth/register/', + {'username': 'testuser', 'password': '', 'domain': 'localhost', + 'email': 'test@abc.de'}) + self.assertEqual(reply.status_code, 400) + self.assertTrue('errors' in reply.json()) + self.assertTrue('password' in reply.json()['errors']) + self.assertEqual(ToolshedUser.objects.all().count(), 2) + + def test_registration_fail4(self): + reply = self.client.post('/auth/register/', + {'username': 'testuser', 'domain': 'localhost', 'email': 'test@abc.de'}) + self.assertEqual(reply.status_code, 400) + self.assertTrue('errors' in reply.json()) + self.assertTrue('password' in reply.json()['errors']) + self.assertEqual(ToolshedUser.objects.all().count(), 2) + + def test_registration_fail5(self): + reply = self.client.post('/auth/register/', + {'username': 'testuser', 'password': 'testpassword2', 'domain': '', + 'email': 'test@abc.de'}) + self.assertEqual(reply.status_code, 400) + self.assertTrue('errors' in reply.json()) + self.assertTrue('domain' in reply.json()['errors']) + self.assertEqual(ToolshedUser.objects.all().count(), 2) + + def test_registration_fail6(self): + reply = self.client.post('/auth/register/', + {'username': 'testuser', 'password': 'testpassword2', 'email': 'test@abc.de'}) + self.assertEqual(reply.status_code, 400) + self.assertTrue('errors' in reply.json()) + self.assertTrue('domain' in reply.json()['errors']) + self.assertEqual(ToolshedUser.objects.all().count(), 2) + + def test_registration_fail7(self): + reply = self.client.post('/auth/register/', + {'username': 'testuser', 'password': 'testpassword2', 'domain': 'localhost', + 'email': ''}) + self.assertEqual(reply.status_code, 400) + self.assertTrue('errors' in reply.json()) + self.assertTrue('email' in reply.json()['errors']) + self.assertEqual(ToolshedUser.objects.all().count(), 2) + + def test_registration_fail8(self): + reply = self.client.post('/auth/register/', + {'username': 'testuser', 'password': 'testpassword2', 'domain': 'localhost'}) + self.assertEqual(reply.status_code, 400) + self.assertTrue('errors' in reply.json()) + self.assertTrue('email' in reply.json()['errors']) + self.assertEqual(ToolshedUser.objects.all().count(), 2) + + def test_registration_existing_user(self): + reply = self.client.post('/auth/register/', + {'username': 'testuser2', 'password': 'testpassword2', 'domain': 'localhost', + 'email': 'test3@abc.de'}) + self.assertEqual(reply.status_code, 400) + self.assertTrue('errors' in reply.json()) + # TODO: check for sensible error message + self.assertEqual(ToolshedUser.objects.all().count(), 2) + + def test_registration_foreign_domain(self): + reply = self.client.post('/auth/register/', + {'username': 'testuser', 'password': 'testpassword2', 'domain': 'example.org', + 'email': 'test@abc.de'}) + self.assertEqual(reply.status_code, 400) + self.assertTrue('errors' in reply.json()) + self.assertTrue('domain' in reply.json()['errors']) + self.assertEqual(ToolshedUser.objects.all().count(), 2) + + def test_registration_reuse_email(self): + reply = self.client.post('/auth/register/', + {'username': 'testuser', 'password': 'testpassword2', 'domain': 'localhost', + 'email': 'test2@abc.de'}) + self.assertEqual(reply.status_code, 400) + self.assertTrue('errors' in reply.json()) + self.assertTrue('email' in reply.json()['errors']) + self.assertEqual(ToolshedUser.objects.all().count(), 2) diff --git a/backend/backend/settings.py b/backend/backend/settings.py index 308fa77..fd888c2 100644 --- a/backend/backend/settings.py +++ b/backend/backend/settings.py @@ -46,6 +46,7 @@ INSTALLED_APPS = [ 'rest_framework.authtoken', 'corsheaders', 'drf_yasg', + 'authentication', ] REST_FRAMEWORK = { @@ -109,6 +110,7 @@ DATABASES = { } } +AUTH_USER_MODEL = 'authentication.ToolshedUser' # Password validation # https://docs.djangoproject.com/en/4.1/ref/settings/#auth-password-validators diff --git a/backend/backend/urls.py b/backend/backend/urls.py index fbcab3d..649a816 100644 --- a/backend/backend/urls.py +++ b/backend/backend/urls.py @@ -30,5 +30,6 @@ schema_view = get_schema_view( urlpatterns = [ path('djangoadmin/', admin.site.urls), + path('auth/', include('authentication.api')), path('docs/', schema_view.with_ui('swagger', cache_timeout=0), name='api-docs'), ]