toolshed/backend/authentication/models.py
jedi bd59c40ac6
Some checks failed
continuous-integration/drone/push Build is failing
stash
2024-03-01 00:50:26 +01:00

131 lines
5.5 KiB
Python

from django.contrib import auth
from django.contrib.auth.models import AbstractUser
from django.db import models, transaction
from django.db.utils import IntegrityError
from nacl.encoding import HexEncoder
from nacl.exceptions import BadSignatureError
from nacl.signing import SigningKey, VerifyKey
class KnownIdentity(models.Model):
username = models.CharField(max_length=255)
domain = models.CharField(max_length=255)
public_key = models.CharField(max_length=255)
friends = models.ManyToManyField('self', symmetrical=True)
class Meta:
unique_together = ('username', 'domain')
indexes = [
models.Index(fields=['username', 'domain'], name='identity_idx'),
]
def __str__(self):
return f"{self.username}@{self.domain}"
@staticmethod
def is_authenticated():
return True
def friends_or_self(self):
return ToolshedUser.objects.filter(public_identity__friends=self) | ToolshedUser.objects.filter(
public_identity=self)
def verify(self, message, signature):
if len(signature) != 128 or not isinstance(signature, str):
raise TypeError('Signature must be 128 characters long and a string')
if not isinstance(message, str):
raise TypeError('Message must be a string')
try:
VerifyKey(bytes.fromhex(self.public_key)).verify(message.encode('utf-8'), bytes.fromhex(signature))
return True
except BadSignatureError:
return False
class ToolshedUserManager(auth.models.BaseUserManager):
def create_user(self, username, email, password, **extra_fields):
domain = extra_fields.pop('domain', 'localhost')
private_key_hex: str | None = extra_fields.pop('private_key', None)
if private_key_hex is not None:
if not isinstance(private_key_hex, str):
raise TypeError('Private key must be a string or no private key must be provided')
if len(private_key_hex) != 64:
raise ValueError('Private key must be 64 characters long or no private key must be provided')
if not all(c in '0123456789abcdef' for c in private_key_hex):
raise ValueError('Private key must be a hexadecimal string or no private key must be provided')
private_key = SigningKey(bytes.fromhex(private_key_hex))
else:
private_key = SigningKey.generate()
public_key = SigningKey(private_key.encode()).verify_key
extra_fields['private_key'] = private_key.encode(encoder=HexEncoder).decode('utf-8')
try:
with transaction.atomic():
extra_fields['public_identity'] = identity = KnownIdentity.objects.get_or_create(
username=username, domain=domain,
public_key=public_key.encode(encoder=HexEncoder).decode('utf-8'))[0]
try:
with transaction.atomic():
user = super().create(username=username, email=email, password=password, domain=domain,
**extra_fields)
user.set_password(password)
user.save()
except IntegrityError:
identity.delete()
raise ValueError('Username or email already exists')
else:
return user
except IntegrityError:
raise ValueError('Username already exists')
def create_superuser(self, username, email, password, **extra_fields):
user = self.create_user(username, email, password, **extra_fields)
user.is_staff = True
user.is_superuser = True
user.save()
return user
class ToolshedUser(AbstractUser):
email = models.EmailField(unique=True)
domain = models.CharField(max_length=255, default='localhost')
private_key = models.CharField(max_length=255)
public_identity = models.ForeignKey(KnownIdentity, on_delete=models.CASCADE, related_name='user')
objects = ToolshedUserManager()
class Meta:
unique_together = [('username', 'domain'), ['email']]
indexes = [
models.Index(fields=['username', 'domain'], name='user_idx'),
]
def __str__(self):
return f"{self.username}@{self.domain}"
@property
def friends(self):
return self.public_identity.friends
def sign(self, message):
if type(message) != str:
raise TypeError('Message must be a string')
private_key = SigningKey(self.private_key.encode(), encoder=HexEncoder)
return private_key.sign(message.encode('utf-8'), encoder=HexEncoder).signature.decode('utf-8')
def public_key(self):
return self.public_identity.public_key
class FriendRequestOutgoing(models.Model):
secret = models.CharField(max_length=255)
befriender_user = models.ForeignKey(ToolshedUser, on_delete=models.CASCADE, related_name='friend_requests_outgoing')
befriendee_username = models.CharField(max_length=255)
befriendee_domain = models.CharField(max_length=255)
class FriendRequestIncoming(models.Model):
secret = models.CharField(max_length=255)
befriender_username = models.CharField(max_length=255)
befriender_domain = models.CharField(max_length=255)
befriender_public_key = models.CharField(max_length=255)
befriendee_user = models.ForeignKey(ToolshedUser, on_delete=models.CASCADE, related_name='friend_requests_incoming')
created_at = models.DateTimeField(auto_now_add=True)