68 lines
2.4 KiB
Python
68 lines
2.4 KiB
Python
from rest_framework import authentication
|
|
from authentication.models import ToolshedUser
|
|
|
|
|
|
def split_userhandle_or_throw(userhandle):
    """Split a ``username@domain`` handle into its two parts.

    Args:
        userhandle: Handle string of the form ``username@domain``.

    Returns:
        A ``(username, domain)`` tuple of non-empty strings.

    Raises:
        ValueError: If the handle does not contain exactly one ``@`` or if
            the part before or after it is empty.
    """
    if '@' not in userhandle:
        raise ValueError('Userhandle must be in the format username@domain')
    username, _, domain = userhandle.partition('@')
    # A second '@' (e.g. 'a@b@c') used to surface as a tuple-unpacking error
    # with an unrelated message; reject it explicitly with the format message.
    if '@' in domain:
        raise ValueError('Userhandle must be in the format username@domain')
    if not username:
        raise ValueError('Username cannot be empty')
    if not domain:
        raise ValueError('Domain cannot be empty')
    return username, domain
|
|
|
|
|
|
def verify_request(request, raw_request_body):
    """Parse the signature ``Authorization`` header and build the signed data.

    The expected header value is
    ``Signature author@domain:signature_hex[128]``. This function only
    parses and validates the header format; it performs no cryptographic
    check itself.

    Args:
        request: Request object providing ``META``, ``method`` and
            ``build_absolute_uri()``.
        raw_request_body: Request body as text. It is appended to the signed
            data for POST, PUT and PATCH requests.

    Returns:
        A ``(username, domain, signed_data, signature_bytes_hex)`` tuple,
        where ``signed_data`` is the absolute request URI, followed by the
        body for POST, PUT and PATCH requests.

    Raises:
        ValueError: If the header is missing, does not match the expected
            format, or the author handle is not a valid ``username@domain``.
    """
    scheme_prefix = 'Signature '
    hex_digits = '0123456789abcdefABCDEF'
    format_error = (
        'Authorization header must be in the format '
        '"Signature author@domain:signature_hex[128]"'
    )

    authentication_header = request.META.get('HTTP_AUTHORIZATION')
    if not authentication_header:
        raise ValueError('No authentication header provided')
    if not authentication_header.startswith(scheme_prefix):
        raise ValueError(format_error)

    # Strip only the leading scheme; splitting on the scheme text would cut
    # the credentials short if that text occurred again later in the header.
    signature = authentication_header[len(scheme_prefix):]
    if ':' not in signature:
        raise ValueError(format_error)

    # The hex signature cannot contain ':', so the last colon is the
    # separator. Splitting there keeps trailing data from being silently
    # ignored and leaves any earlier colon as part of the author handle.
    author, _, signature_bytes_hex = signature.rpartition(':')
    if (
        not author
        or len(signature_bytes_hex) != 128
        or not all(char in hex_digits for char in signature_bytes_hex)
    ):
        raise ValueError(format_error)

    username, domain = split_userhandle_or_throw(author)

    signed_data = request.build_absolute_uri()
    # Only methods that carry a payload include the body in the signed data.
    if request.method in ('POST', 'PUT', 'PATCH'):
        signed_data += raw_request_body

    return username, domain, signed_data, signature_bytes_hex
|
|
|
|
|
|
def authenticate_request_against_local_users(request, raw_request_body):
    """Authenticate a signed request against the local ``ToolshedUser`` table.

    Args:
        request: Incoming request, forwarded to ``verify_request``.
        raw_request_body: Request body as text, forwarded to
            ``verify_request``.

    Returns:
        The matching ``ToolshedUser`` when the header parses, the user exists
        and ``public_identity.verify`` accepts the signature; otherwise
        ``None``.
    """
    # A header that is missing or malformed means "not authenticated".
    try:
        name, host, payload, signature_hex = verify_request(
            request, raw_request_body)
    except ValueError:
        return None

    # Unknown users are treated the same way as bad signatures.
    try:
        candidate = ToolshedUser.objects.get(username=name, domain=host)
    except ToolshedUser.DoesNotExist:
        return None

    signature_ok = candidate.public_identity.verify(payload, signature_hex)
    return candidate if signature_ok else None
|
|
|
|
|
|
class SignatureAuthenticationLocal(authentication.BaseAuthentication):
    """DRF authentication class for signed requests from local users."""

    def authenticate(self, request):
        """Authenticate ``request`` via its signature ``Authorization`` header.

        Args:
            request: The incoming request; its raw body is decoded as UTF-8
                and included in the signature check.

        Returns:
            ``(user, None)`` when the signature matches a local user, or
            ``None`` when authentication did not succeed. Django REST
            Framework treats any non-``None`` return value as a successful
            authentication, so a ``(None, None)`` tuple must not be returned
            on failure: it would set ``request.user`` to ``None`` and skip
            any remaining authentication classes.
        """
        try:
            raw_request_body = request.body.decode('utf-8')
        except UnicodeDecodeError:
            # A body that is not valid UTF-8 cannot be checked as text;
            # treat the request as unauthenticated instead of crashing.
            return None

        user = authenticate_request_against_local_users(
            request, raw_request_body)
        if user is None:
            return None
        return user, None
|