add authentication app

This commit is contained in:
j3d1 2023-06-15 01:16:52 +02:00
parent 5f68e12f82
commit 12191369b7
12 changed files with 882 additions and 0 deletions

View file

View file

@ -0,0 +1,29 @@
from django.contrib import admin
from authentication.models import ToolshedUser, KnownIdentity, FriendRequestOutgoing, FriendRequestIncoming
class ToolshedUserAdmin(admin.ModelAdmin):
list_display = ('username', 'email', 'first_name', 'last_name', 'is_staff', 'is_active', 'date_joined', 'domain')
search_fields = ('username', 'email', 'first_name', 'last_name', 'is_staff', 'is_active', 'date_joined', 'domain')
class KnownIdentityAdmin(admin.ModelAdmin):
list_display = ('username', 'domain', 'public_key')
search_fields = ('username', 'domain', 'public_key')
class FriendRequestOutgoingAdmin(admin.ModelAdmin):
list_display = ('secret', 'befriender_user', 'befriendee_username', 'befriendee_domain')
search_fields = ('secret', 'befriender_user', 'befriendee_username', 'befriendee_domain')
class FriendRequestIncomingAdmin(admin.ModelAdmin):
list_display = ('secret', 'befriender_username', 'befriender_domain', 'befriendee_user', 'befriender_public_key')
search_fields = ('secret', 'befriender_username', 'befriender_domain', 'befriendee_user', 'befriender_public_key')
admin.site.register(ToolshedUser, ToolshedUserAdmin)
admin.site.register(KnownIdentity, KnownIdentityAdmin)
admin.site.register(FriendRequestOutgoing, FriendRequestOutgoingAdmin)
admin.site.register(FriendRequestIncoming, FriendRequestIncomingAdmin)

View file

@ -0,0 +1,108 @@
from django.contrib import auth
from django.db import IntegrityError
from django.urls import path, include
from rest_framework import routers, serializers, viewsets
from rest_framework.authentication import TokenAuthentication
from rest_framework.decorators import api_view, permission_classes, authentication_classes
from rest_framework.permissions import IsAuthenticated, IsAdminUser
from rest_framework.authtoken.models import Token
from rest_framework.authtoken.views import ObtainAuthToken
from rest_framework.response import Response
from authentication.models import ToolshedUser
from authentication.signature_auth import SignatureAuthenticationLocal
router = routers.SimpleRouter()
class UserAuthToken(ObtainAuthToken):
def post(self, request, *args, **kwargs):
try:
fullname = request.data.get('username')
username = fullname.split('@')[0]
domain = fullname.split('@')[1]
password = request.data.get('password')
user = auth.authenticate(username=username, password=password, domain=domain)
token, created = Token.objects.get_or_create(user=user)
return Response({
'token': token.key,
'key': user.private_key
})
except IndexError:
return Response({
'error': 'Invalid Credentials'
}, status=400)
except IntegrityError:
return Response({
'error': 'Invalid Credentials'
}, status=400)
class UserSerializer(serializers.HyperlinkedModelSerializer):
class Meta:
model = ToolshedUser
fields = ['username', 'email', 'first_name', 'last_name', 'is_staff', 'is_active', 'date_joined']
class UserViewSet(viewsets.ModelViewSet):
queryset = ToolshedUser.objects.all()
serializer_class = UserSerializer
authentication_classes = [TokenAuthentication]
permission_classes = [IsAuthenticated, IsAdminUser]
@api_view(['GET'])
@permission_classes([IsAuthenticated])
@authentication_classes([SignatureAuthenticationLocal])
def getUserInfo(request):
user = request.user
return Response({
'username': user.username,
'domain': user.domain,
'email': user.email
})
@api_view(['POST'])
@permission_classes([])
@authentication_classes([])
def registerUser(request):
username = request.data.get('username')
domain = request.data.get('domain')
password = request.data.get('password')
email = request.data.get('email')
errors = {}
if not username:
errors['username'] = 'Username is required'
if not domain:
errors['domain'] = 'Domain is required'
if not password:
errors['password'] = 'Password is required'
if not email:
errors['email'] = 'Email is required'
if ToolshedUser.objects.filter(email=email).exists():
errors['email'] = 'Email already exists'
if ToolshedUser.objects.filter(username=username, domain=domain).exists():
errors['username'] = 'Username already exists'
if errors:
return Response({'errors': errors}, status=400)
if domain in ['localhost']:
user = ToolshedUser.objects.create_user(username, email, '', domain=domain)
user.set_password(password)
user.save()
return Response({'username': user.username, 'domain': user.domain})
else:
return Response({'errors': {'domain': 'Domain does not exist or is not open for registration'}}, status=400)
router.register(r'users', UserViewSet)
urlpatterns = [
path('', include(router.urls)),
path('user/', getUserInfo),
path('register/', registerUser),
path('token/', UserAuthToken.as_view()),
]

View file

@ -0,0 +1,96 @@
# Generated by Django 4.2.2 on 2023-06-14 23:13
from django.conf import settings
import django.contrib.auth.validators
from django.db import migrations, models
import django.db.models.deletion
import django.utils.timezone
class Migration(migrations.Migration):
initial = True
dependencies = [
('auth', '0012_alter_user_first_name_max_length'),
]
operations = [
migrations.CreateModel(
name='ToolshedUser',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('password', models.CharField(max_length=128, verbose_name='password')),
('last_login', models.DateTimeField(blank=True, null=True, verbose_name='last login')),
('is_superuser', models.BooleanField(default=False, help_text='Designates that this user has all permissions without explicitly assigning them.', verbose_name='superuser status')),
('username', models.CharField(error_messages={'unique': 'A user with that username already exists.'}, help_text='Required. 150 characters or fewer. Letters, digits and @/./+/-/_ only.', max_length=150, unique=True, validators=[django.contrib.auth.validators.UnicodeUsernameValidator()], verbose_name='username')),
('first_name', models.CharField(blank=True, max_length=150, verbose_name='first name')),
('last_name', models.CharField(blank=True, max_length=150, verbose_name='last name')),
('is_staff', models.BooleanField(default=False, help_text='Designates whether the user can log into this admin site.', verbose_name='staff status')),
('is_active', models.BooleanField(default=True, help_text='Designates whether this user should be treated as active. Unselect this instead of deleting accounts.', verbose_name='active')),
('date_joined', models.DateTimeField(default=django.utils.timezone.now, verbose_name='date joined')),
('email', models.EmailField(max_length=254, unique=True)),
('domain', models.CharField(default='localhost', max_length=255)),
('private_key', models.CharField(max_length=255)),
('groups', models.ManyToManyField(blank=True, help_text='The groups this user belongs to. A user will get all permissions granted to each of their groups.', related_name='user_set', related_query_name='user', to='auth.group', verbose_name='groups')),
],
),
migrations.CreateModel(
name='KnownIdentity',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('username', models.CharField(max_length=255)),
('domain', models.CharField(max_length=255)),
('public_key', models.CharField(max_length=255)),
('friends', models.ManyToManyField(to='authentication.knownidentity')),
],
),
migrations.CreateModel(
name='FriendRequestOutgoing',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('secret', models.CharField(max_length=255)),
('befriendee_username', models.CharField(max_length=255)),
('befriendee_domain', models.CharField(max_length=255)),
('befriender_user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='friend_requests_outgoing', to=settings.AUTH_USER_MODEL)),
],
),
migrations.CreateModel(
name='FriendRequestIncoming',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('secret', models.CharField(max_length=255)),
('befriender_username', models.CharField(max_length=255)),
('befriender_domain', models.CharField(max_length=255)),
('befriender_public_key', models.CharField(max_length=255)),
('created_at', models.DateTimeField(auto_now_add=True)),
('befriendee_user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='friend_requests_incoming', to=settings.AUTH_USER_MODEL)),
],
),
migrations.AddField(
model_name='toolsheduser',
name='public_identity',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='user', to='authentication.knownidentity'),
),
migrations.AddField(
model_name='toolsheduser',
name='user_permissions',
field=models.ManyToManyField(blank=True, help_text='Specific permissions for this user.', related_name='user_set', related_query_name='user', to='auth.permission', verbose_name='user permissions'),
),
migrations.AddIndex(
model_name='knownidentity',
index=models.Index(fields=['username', 'domain'], name='identity_idx'),
),
migrations.AlterUniqueTogether(
name='knownidentity',
unique_together={('username', 'domain')},
),
migrations.AddIndex(
model_name='toolsheduser',
index=models.Index(fields=['username', 'domain'], name='user_idx'),
),
migrations.AlterUniqueTogether(
name='toolsheduser',
unique_together={('username', 'domain'), ('email',)},
),
]

View file

@ -0,0 +1,114 @@
from django.contrib import auth
from django.contrib.auth.models import AbstractUser
from django.db import models, transaction
from django.db.utils import IntegrityError
from nacl.encoding import HexEncoder
from nacl.exceptions import BadSignatureError
from nacl.signing import SigningKey, VerifyKey
class KnownIdentity(models.Model):
username = models.CharField(max_length=255)
domain = models.CharField(max_length=255)
public_key = models.CharField(max_length=255)
friends = models.ManyToManyField('self', symmetrical=True)
class Meta:
unique_together = ('username', 'domain')
indexes = [
models.Index(fields=['username', 'domain'], name='identity_idx'),
]
def __str__(self):
return f"{self.username}@{self.domain}"
def is_authenticated(self):
return True
def verify(self, message, signature):
if len(signature) != 128 or type(signature) != str:
raise TypeError('Signature must be 128 characters long and a string')
if type(message) != str:
raise TypeError('Message must be a string')
try:
VerifyKey(bytes.fromhex(self.public_key)).verify(message.encode('utf-8'), bytes.fromhex(signature))
return True
except BadSignatureError:
return False
class ToolshedUserManager(auth.models.BaseUserManager):
def create_user(self, username, email, password, **extra_fields):
domain = extra_fields.pop('domain', 'localhost')
private_key_hex = extra_fields.pop('private_key', None)
if private_key_hex and type(private_key_hex) != str:
raise TypeError('Private key must be a string or no private key must be provided')
if private_key_hex and len(private_key_hex) != 64:
raise ValueError('Private key must be 64 characters long or no private key must be provided')
if private_key_hex and not all(c in '0123456789abcdef' for c in private_key_hex):
raise ValueError('Private key must be a hexadecimal string or no private key must be provided')
private_key = SigningKey(bytes.fromhex(private_key_hex)) if private_key_hex else SigningKey.generate()
public_key = SigningKey(private_key.encode()).verify_key
extra_fields['private_key'] = private_key.encode(encoder=HexEncoder).decode('utf-8')
try:
with transaction.atomic():
extra_fields['public_identity'] = identity = KnownIdentity.objects.create(
username=username, domain=domain, public_key=public_key.encode(encoder=HexEncoder).decode('utf-8'))
except IntegrityError:
raise ValueError('Username already exists')
else:
try:
with transaction.atomic():
user = super().create(username=username, email=email, password=password, domain=domain,
**extra_fields)
user.save()
except IntegrityError:
identity.delete()
raise ValueError('Username or email already exists')
else:
return user
def create_superuser(self, username, email, password, **extra_fields):
user = self.create_user(username=username, email=email, password=password, **extra_fields)
user.is_staff = True
user.is_superuser = True
return user
class ToolshedUser(AbstractUser):
email = models.EmailField(unique=True)
domain = models.CharField(max_length=255, default='localhost')
private_key = models.CharField(max_length=255)
public_identity = models.ForeignKey(KnownIdentity, on_delete=models.CASCADE, related_name='user')
objects = ToolshedUserManager()
class Meta:
unique_together = [('username', 'domain'), ['email']]
indexes = [
models.Index(fields=['username', 'domain'], name='user_idx'),
]
def __str__(self):
return f"{self.username}@{self.domain}"
def sign(self, message):
if type(message) != str:
raise TypeError('Message must be a string')
private_key = SigningKey(self.private_key.encode(), encoder=HexEncoder)
return private_key.sign(message.encode('utf-8'), encoder=HexEncoder).signature.decode('utf-8')
class FriendRequestOutgoing(models.Model):
secret = models.CharField(max_length=255)
befriender_user = models.ForeignKey(ToolshedUser, on_delete=models.CASCADE, related_name='friend_requests_outgoing')
befriendee_username = models.CharField(max_length=255)
befriendee_domain = models.CharField(max_length=255)
class FriendRequestIncoming(models.Model):
secret = models.CharField(max_length=255)
befriender_username = models.CharField(max_length=255)
befriender_domain = models.CharField(max_length=255)
befriender_public_key = models.CharField(max_length=255)
befriendee_user = models.ForeignKey(ToolshedUser, on_delete=models.CASCADE, related_name='friend_requests_incoming')
created_at = models.DateTimeField(auto_now_add=True)

View file

@ -0,0 +1,68 @@
from rest_framework import authentication
from authentication.models import ToolshedUser
def split_userhandle_or_throw(userhandle):
if '@' not in userhandle:
raise ValueError('Userhandle must be in the format username@domain')
username, domain = userhandle.split('@')
if not username:
raise ValueError('Username cannot be empty')
if not domain:
raise ValueError('Domain cannot be empty')
return username, domain
def verify_request(request, raw_request_body):
authentication_header = request.META.get('HTTP_AUTHORIZATION')
if not authentication_header:
raise ValueError('No authentication header provided')
if not authentication_header.startswith('Signature '):
raise ValueError('Authorization header must be in the format "Signature author@domain:signature_hex[128]"')
signature = authentication_header.split('Signature ')[1]
if ':' not in signature:
raise ValueError('Authorization header must be in the format "Signature author@domain:signature_hex[128]"')
author = signature.split(':')[0]
signature_bytes_hex = signature.split(':')[1]
if not author or not signature_bytes_hex or len(signature_bytes_hex) != 128:
raise ValueError('Authorization header must be in the format "Signature author@domain:signature_hex[128]"')
username, domain = split_userhandle_or_throw(author)
signed_data = request.build_absolute_uri()
if request.method == 'POST':
signed_data += raw_request_body
elif request.method == 'PUT':
signed_data += raw_request_body
elif request.method == 'PATCH':
signed_data += raw_request_body
return username, domain, signed_data, signature_bytes_hex
def authenticate_request_against_local_users(request, raw_request_body):
try:
username, domain, signed_data, signature_bytes_hex = verify_request(request, raw_request_body)
except ValueError:
return None
try:
author_user = ToolshedUser.objects.get(username=username, domain=domain)
except ToolshedUser.DoesNotExist:
return None
if author_user.public_identity.verify(signed_data, signature_bytes_hex):
return author_user
else:
return None
class SignatureAuthenticationLocal(authentication.BaseAuthentication):
def authenticate(self, request):
return authenticate_request_against_local_users(
request, request.body.decode('utf-8')), None

View file

@ -0,0 +1 @@
from .helpers import *

View file

@ -0,0 +1,84 @@
import json
from django.test import TestCase, Client
from nacl.encoding import HexEncoder
from authentication.models import ToolshedUser, KnownIdentity
from nacl.signing import SigningKey
class DummyExternalUser:
def __init__(self, username, domain, known=True):
self.username = username
self.domain = domain
self.__signing_key = SigningKey.generate()
self.public_identity, _ = KnownIdentity.objects.get_or_create(
username=username,
domain=domain,
public_key=self.public_key()) if known else None
def __str__(self):
return self.username + '@' + self.domain
def sign(self, message):
return self.__signing_key.sign(message.encode('utf-8'), encoder=HexEncoder).signature.decode('utf-8')
def public_key(self):
return self.__signing_key.verify_key.encode(encoder=HexEncoder).decode('utf-8')
@property
def friends(self):
return self.public_identity.friends
class SignatureAuthClient:
base = Client(SERVER_NAME='testserver')
def get(self, target, user, **kwargs):
signature = user.sign("http://testserver" + target)
header = {'HTTP_AUTHORIZATION': 'Signature ' + str(user) + ':' + signature}
return self.base.get(target, **header, **kwargs)
def post(self, target, user, data, **kwargs):
json_data = json.dumps(data, separators=(',', ':'))
signature = user.sign("http://testserver" + target + json_data)
header = {'HTTP_AUTHORIZATION': 'Signature ' + str(user) + ':' + signature}
return self.base.post(target, json_data, **header, content_type='application/json', **kwargs)
def put(self, target, user, data, **kwargs):
json_data = json.dumps(data, separators=(',', ':'))
signature = user.sign("http://testserver" + target + json_data)
header = {'HTTP_AUTHORIZATION': 'Signature ' + str(user) + ':' + signature}
return self.base.put(target, json_data, **header, content_type='application/json', **kwargs)
def patch(self, target, user, data, **kwargs):
json_data = json.dumps(data, separators=(',', ':'))
signature = user.sign("http://testserver" + target + json_data)
header = {'HTTP_AUTHORIZATION': 'Signature ' + str(user) + ':' + signature}
return self.base.patch(target, json_data, **header, content_type='application/json', **kwargs)
def delete(self, target, user, **kwargs):
signature = user.sign("http://testserver" + target)
header = {'HTTP_AUTHORIZATION': 'Signature ' + str(user) + ':' + signature}
return self.base.delete(target, **header, **kwargs)
class UserTestCase(TestCase):
ext_user1 = None
ext_user2 = None
local_user1 = None
local_user2 = None
def setUp(self):
admin = ToolshedUser.objects.create_superuser('admin', 'admin@localhost', '')
admin.set_password('testpassword')
admin.save()
example_com = type('obj', (object,), {'name': 'example.com'})
self.local_user1 = ToolshedUser.objects.create_user('testuser', 'test@abc.de', '', domain=example_com.name)
self.local_user1.set_password('testpassword2')
self.local_user1.save()
self.local_user2 = ToolshedUser.objects.create_user('testuser2', 'test2@abc.de', '', domain=example_com.name)
self.local_user2.set_password('testpassword3')
self.local_user2.save()
self.ext_user1 = DummyExternalUser('extuser1', 'external.org')
self.ext_user2 = DummyExternalUser('extuser2', 'external.org')

View file

@ -0,0 +1,379 @@
from django.test import TestCase, Client
from nacl.encoding import HexEncoder
from nacl.signing import SigningKey
from authentication.models import ToolshedUser, KnownIdentity
from authentication.tests import UserTestCase, SignatureAuthClient
class KnownIdentityTestCase(TestCase):
key = None
def setUp(self):
self.key = SigningKey.generate()
KnownIdentity.objects.create(username="testuser", domain='external.com',
public_key=self.key.verify_key.encode(encoder=HexEncoder).decode('utf-8'))
def test_known_identity(self):
identity = KnownIdentity.objects.get(username="testuser", domain='external.com')
self.assertEqual(identity.username, "testuser")
self.assertEqual(identity.domain, "external.com")
self.assertEqual(identity.public_key, self.key.verify_key.encode(encoder=HexEncoder).decode('utf-8'))
self.assertEqual(str(identity), "testuser@external.com")
self.assertTrue(identity.is_authenticated())
def test_known_identity_verify(self):
identity = KnownIdentity.objects.get(username="testuser", domain='external.com')
message = "Hello world, this is a test message."
signed = self.key.sign(message.encode('utf-8'), encoder=HexEncoder).signature
self.assertTrue(identity.verify(message, signed.decode('utf-8')))
def test_known_identity_verify_fail(self):
identity = KnownIdentity.objects.get(username="testuser", domain='external.com')
message = "Hello world, this is a test message."
signed = self.key.sign(message.encode('utf-8'), encoder=HexEncoder).signature
self.assertFalse(identity.verify(message + "x", signed.decode('utf-8')))
def test_known_identity_verify_fail2(self):
identity = KnownIdentity.objects.get(username="testuser", domain='external.com')
message = "Hello world, this is a test message."
signed = self.key.sign(message.encode('utf-8'), encoder=HexEncoder).signature
with self.assertRaises(TypeError):
identity.verify(message.encode('utf-8'), signed.decode('utf-8'))
def test_known_identity_verify_fail3(self):
identity = KnownIdentity.objects.get(username="testuser", domain='external.com')
message = "Hello world, this is a test message."
signed = self.key.sign(message.encode('utf-8'), encoder=HexEncoder).signature
with self.assertRaises(TypeError):
identity.verify(message, signed)
def test_known_identity_verify_fail4(self):
identity = KnownIdentity.objects.get(username="testuser", domain='external.com')
message = "Hello world, this is a test message."
signed = self.key.sign(message.encode('utf-8'), encoder=HexEncoder).signature
with self.assertRaises(TypeError):
identity.verify(message, bytes.fromhex(signed.decode('utf-8')))
class UserModelTestCase(UserTestCase):
def setUp(self):
super().setUp()
def test_admin(self):
user = ToolshedUser.objects.get(username='admin')
self.assertTrue(user.is_superuser)
self.assertTrue(user.is_staff)
self.assertTrue(user.is_active)
self.assertEqual(user.domain, 'localhost')
self.assertEqual(user.email, 'admin@localhost')
self.assertEqual(user.username, 'admin')
def test_user(self):
user = ToolshedUser.objects.get(username='testuser')
self.assertFalse(user.is_superuser)
self.assertFalse(user.is_staff)
self.assertTrue(user.is_active)
self.assertEqual(user.domain, 'example.com')
self.assertEqual(user.email, 'test@abc.de')
self.assertEqual(user.username, 'testuser')
self.assertEqual(len(user.private_key), 64)
self.assertEqual(type(user.public_identity), KnownIdentity)
self.assertEqual(user.public_identity.domain, 'example.com')
self.assertEqual(user.public_identity.username, 'testuser')
self.assertEqual(len(user.public_identity.public_key), 64)
def test_create_existing_user(self):
with self.assertRaises(ValueError):
ToolshedUser.objects.create_user('testuser', 'test3@abc.de', '', domain='localhost')
def test_create_existing_user2(self):
key = SigningKey.generate()
KnownIdentity.objects.create(username="testuser3", domain='localhost',
public_key=key.verify_key.encode(encoder=HexEncoder).decode('utf-8'))
with self.assertRaises(ValueError):
ToolshedUser.objects.create_user('testuser3', 'test3@abc.de', '', domain='localhost')
def test_create_reuse_email(self):
with self.assertRaises(ValueError):
ToolshedUser.objects.create_user('testuser3', 'test@abc.de', '', domain='localhost')
def test_create_user_invalid_private_key(self):
with self.assertRaises(TypeError):
ToolshedUser.objects.create_user('testuser3', 'test3@abc.de', '', domain='localhost',
private_key=b'0123456789abcdef0123456789abcdef')
with self.assertRaises(ValueError):
ToolshedUser.objects.create_user('testuser3', 'test3@abc.de', '', domain='localhost',
private_key='7005c4097')
with self.assertRaises(ValueError):
ToolshedUser.objects.create_user('testuser3', 'test3@abc.de', '', domain='localhost',
private_key='0123456789abcdef0123456789abcdef'
'Z123456789abcdef0123456789abcdef')
def test_signature(self):
user = self.local_user1
message = 'some message'
signature = user.sign(message)
self.assertEqual(len(signature), 128)
self.assertTrue(user.public_identity.verify(message, signature))
def test_signature_fail(self):
user = self.local_user1
message = 'some message'
signature = user.sign(message)
self.assertFalse(user.public_identity.verify(message + 'x', signature))
def test_signature_fail2(self):
user = self.local_user1
message = 'some message'
signature = user.sign(message)
signature = signature[:-2] + 'ee'
self.assertFalse(user.public_identity.verify(message, signature))
def test_signature_fail3(self):
user1 = self.local_user1
user2 = self.local_user2
message = 'some message'
signature = user1.sign(message)
self.assertFalse(user2.public_identity.verify(message, signature))
def test_signature_fail4(self):
user = self.local_user1
message = 'some message'
with self.assertRaises(TypeError):
user.sign(message.encode('utf-8'))
class UserApiTestCase(UserTestCase):
def setUp(self):
super().setUp()
self.anonymous_client = Client(SERVER_NAME='testserver')
self.client = SignatureAuthClient()
def test_user_info(self):
reply = self.client.get('/auth/user/', self.local_user1)
self.assertEqual(reply.status_code, 200)
self.assertEqual(reply.json()['username'], 'testuser')
self.assertEqual(reply.json()['domain'], 'example.com')
self.assertEqual(reply.json()['email'], 'test@abc.de')
def test_user_info2(self):
target = "/auth/user/"
signature = self.local_user1.sign("http://testserver" + target)
header = {'HTTP_AUTHORIZATION': 'Signature ' + str(self.local_user1) + ':' + signature}
reply = self.anonymous_client.get(target, **header)
self.assertEqual(reply.status_code, 200)
self.assertEqual(reply.json()['username'], 'testuser')
self.assertEqual(reply.json()['domain'], 'example.com')
def test_user_info_fail(self):
reply = self.anonymous_client.get('/auth/user/')
self.assertEqual(reply.status_code, 403)
def test_user_info_fail2(self):
reply = self.client.get('/auth/user/', self.ext_user1)
self.assertEqual(reply.status_code, 403)
def test_user_info_fail3(self):
target = "/auth/user/"
signature = self.local_user1.sign("http://testserver2" + target)
header = {'HTTP_AUTHORIZATION': 'Signature ' + str(self.local_user1) + ':' + signature}
reply = self.anonymous_client.get(target, **header)
self.assertEqual(reply.status_code, 403)
def test_user_info_fail4(self):
target = "/auth/user/"
signature = self.local_user1.sign("http://testserver" + target)
header = {'HTTP_AUTHORIZATION': 'Auth ' + str(self.local_user1) + ':' + signature}
reply = self.anonymous_client.get(target, **header)
self.assertEqual(reply.status_code, 403)
def test_user_info_fail5(self):
target = "/auth/user/"
signature = self.local_user1.sign("http://testserver" + target)
header = {'HTTP_AUTHORIZATION': 'Signature ' + str(self.local_user1)}
reply = self.anonymous_client.get(target, **header)
self.assertEqual(reply.status_code, 403)
def test_user_info_fail6(self):
target = "/auth/user/"
signature = self.local_user1.sign("http://testserver" + target)
header = {'HTTP_AUTHORIZATION': 'Signature ' + str(self.local_user1) + ':' + signature + 'f'}
reply = self.anonymous_client.get(target, **header)
self.assertEqual(reply.status_code, 403)
def test_user_info_fail7(self):
target = "/auth/user/"
signature = self.local_user1.sign("http://testserver" + target)
header = {'HTTP_AUTHORIZATION': 'Signature ' + self.local_user1.username + ':' + signature}
reply = self.anonymous_client.get(target, **header)
self.assertEqual(reply.status_code, 403)
def test_user_info_fail8(self):
target = "/auth/user/"
signature = self.local_user1.sign("http://testserver" + target)
header = {'HTTP_AUTHORIZATION': 'Signature ' + self.local_user1.username + '@:' + signature}
reply = self.anonymous_client.get(target, **header)
self.assertEqual(reply.status_code, 403)
def test_user_info_fail9(self):
target = "/auth/user/"
signature = self.local_user1.sign("http://testserver" + target)
header = {'HTTP_AUTHORIZATION': 'Signature @' + self.local_user1.domain + ':' + signature}
reply = self.anonymous_client.get(target, **header)
self.assertEqual(reply.status_code, 403)
class LoginApiTestCase(UserTestCase):
user = None
client = Client(SERVER_NAME='testserver')
def setUp(self):
super().setUp()
self.user = self.local_user1
def test_login(self):
reply = self.client.post('/auth/token/',
{'username': self.user.username + '@' + self.user.domain, 'password': 'testpassword2'})
self.assertEqual(reply.status_code, 200)
self.assertTrue('token' in reply.json())
self.assertTrue(len(reply.json()['token']) == 40)
self.assertTrue('key' in reply.json())
self.assertEqual(len(reply.json()['key']), 64)
def test_login_fail(self):
reply = self.client.post('/auth/token/',
{'username': self.user.username + '@' + self.user.domain, 'password': 'testpassword3'})
self.assertEqual(reply.status_code, 400)
self.assertTrue('token' not in reply.json())
self.assertTrue('error' in reply.json())
def test_login_fail2(self):
reply = self.client.post('/auth/token/',
{'username': self.user.username, 'password': 'testpassword2'})
self.assertEqual(reply.status_code, 400)
self.assertTrue('token' not in reply.json())
self.assertTrue('error' in reply.json())
class RegistrationApiTestCase(TestCase):
client = Client(SERVER_NAME='testserver')
def setUp(self):
admin = ToolshedUser.objects.create_superuser('admin', 'admin@localhost', '')
admin.set_password('testpassword')
admin.save()
example_com = type('obj', (object,), {'name': 'localhost'})
user2 = ToolshedUser.objects.create_user('testuser2', 'test2@abc.de', '', domain=example_com.name)
user2.set_password('testpassword3')
user2.save()
def test_registration(self):
self.assertEqual(ToolshedUser.objects.all().count(), 2)
reply = self.client.post('/auth/register/',
{'username': 'testuser', 'password': 'testpassword2', 'domain': 'localhost',
'email': 'test@abc.de'})
self.assertEqual(reply.status_code, 200)
self.assertTrue('username' in reply.json())
self.assertTrue('domain' in reply.json())
user = ToolshedUser.objects.get(username='testuser')
self.assertEqual(user.email, 'test@abc.de')
self.assertEqual(user.domain, 'localhost')
self.assertTrue(user.check_password('testpassword2'))
self.assertEqual(ToolshedUser.objects.all().count(), 3)
def test_registration_fail(self):
reply = self.client.post('/auth/register/',
{'username': '', 'password': 'testpassword2', 'domain': 'localhost',
'email': 'test@abc.de'})
self.assertEqual(reply.status_code, 400)
self.assertTrue('errors' in reply.json())
self.assertTrue('username' in reply.json()['errors'])
self.assertEqual(ToolshedUser.objects.all().count(), 2)
def test_registration_fail2(self):
reply = self.client.post('/auth/register/',
{'password': 'testpassword2', 'domain': 'localhost', 'email': 'test@abc.de'})
self.assertEqual(reply.status_code, 400)
self.assertTrue('errors' in reply.json())
self.assertTrue('username' in reply.json()['errors'])
self.assertEqual(ToolshedUser.objects.all().count(), 2)
def test_registration_fail3(self):
reply = self.client.post('/auth/register/',
{'username': 'testuser', 'password': '', 'domain': 'localhost',
'email': 'test@abc.de'})
self.assertEqual(reply.status_code, 400)
self.assertTrue('errors' in reply.json())
self.assertTrue('password' in reply.json()['errors'])
self.assertEqual(ToolshedUser.objects.all().count(), 2)
def test_registration_fail4(self):
reply = self.client.post('/auth/register/',
{'username': 'testuser', 'domain': 'localhost', 'email': 'test@abc.de'})
self.assertEqual(reply.status_code, 400)
self.assertTrue('errors' in reply.json())
self.assertTrue('password' in reply.json()['errors'])
self.assertEqual(ToolshedUser.objects.all().count(), 2)
def test_registration_fail5(self):
reply = self.client.post('/auth/register/',
{'username': 'testuser', 'password': 'testpassword2', 'domain': '',
'email': 'test@abc.de'})
self.assertEqual(reply.status_code, 400)
self.assertTrue('errors' in reply.json())
self.assertTrue('domain' in reply.json()['errors'])
self.assertEqual(ToolshedUser.objects.all().count(), 2)
def test_registration_fail6(self):
reply = self.client.post('/auth/register/',
{'username': 'testuser', 'password': 'testpassword2', 'email': 'test@abc.de'})
self.assertEqual(reply.status_code, 400)
self.assertTrue('errors' in reply.json())
self.assertTrue('domain' in reply.json()['errors'])
self.assertEqual(ToolshedUser.objects.all().count(), 2)
def test_registration_fail7(self):
reply = self.client.post('/auth/register/',
{'username': 'testuser', 'password': 'testpassword2', 'domain': 'localhost',
'email': ''})
self.assertEqual(reply.status_code, 400)
self.assertTrue('errors' in reply.json())
self.assertTrue('email' in reply.json()['errors'])
self.assertEqual(ToolshedUser.objects.all().count(), 2)
def test_registration_fail8(self):
reply = self.client.post('/auth/register/',
{'username': 'testuser', 'password': 'testpassword2', 'domain': 'localhost'})
self.assertEqual(reply.status_code, 400)
self.assertTrue('errors' in reply.json())
self.assertTrue('email' in reply.json()['errors'])
self.assertEqual(ToolshedUser.objects.all().count(), 2)
def test_registration_existing_user(self):
reply = self.client.post('/auth/register/',
{'username': 'testuser2', 'password': 'testpassword2', 'domain': 'localhost',
'email': 'test3@abc.de'})
self.assertEqual(reply.status_code, 400)
self.assertTrue('errors' in reply.json())
# TODO: check for sensible error message
self.assertEqual(ToolshedUser.objects.all().count(), 2)
def test_registration_foreign_domain(self):
reply = self.client.post('/auth/register/',
{'username': 'testuser', 'password': 'testpassword2', 'domain': 'example.org',
'email': 'test@abc.de'})
self.assertEqual(reply.status_code, 400)
self.assertTrue('errors' in reply.json())
self.assertTrue('domain' in reply.json()['errors'])
self.assertEqual(ToolshedUser.objects.all().count(), 2)
def test_registration_reuse_email(self):
reply = self.client.post('/auth/register/',
{'username': 'testuser', 'password': 'testpassword2', 'domain': 'localhost',
'email': 'test2@abc.de'})
self.assertEqual(reply.status_code, 400)
self.assertTrue('errors' in reply.json())
self.assertTrue('email' in reply.json()['errors'])
self.assertEqual(ToolshedUser.objects.all().count(), 2)

View file

@ -46,6 +46,7 @@ INSTALLED_APPS = [
'rest_framework.authtoken', 'rest_framework.authtoken',
'corsheaders', 'corsheaders',
'drf_yasg', 'drf_yasg',
'authentication',
] ]
REST_FRAMEWORK = { REST_FRAMEWORK = {
@ -109,6 +110,7 @@ DATABASES = {
} }
} }
AUTH_USER_MODEL = 'authentication.ToolshedUser'
# Password validation # Password validation
# https://docs.djangoproject.com/en/4.1/ref/settings/#auth-password-validators # https://docs.djangoproject.com/en/4.1/ref/settings/#auth-password-validators

View file

@ -30,5 +30,6 @@ schema_view = get_schema_view(
urlpatterns = [ urlpatterns = [
path('djangoadmin/', admin.site.urls), path('djangoadmin/', admin.site.urls),
path('auth/', include('authentication.api')),
path('docs/', schema_view.with_ui('swagger', cache_timeout=0), name='api-docs'), path('docs/', schema_view.with_ui('swagger', cache_timeout=0), name='api-docs'),
] ]