diff --git a/backend/configure.py b/backend/configure.py index 9f10659..ead8fa9 100755 --- a/backend/configure.py +++ b/backend/configure.py @@ -1,9 +1,11 @@ #!/usr/bin/env python3 +import json import os import sys from argparse import ArgumentParser import dotenv +from django.db import transaction, IntegrityError def yesno(prompt, default=False): @@ -78,10 +80,68 @@ def configure(): call_command('collectstatic', '--no-input') + if yesno("Do you want to import all categories, properties and tags contained in this repository?", default=True): + from hostadmin.serializers import CategorySerializer, PropertySerializer, TagSerializer + from hostadmin.models import ImportedIdentifierSets + if not os.path.exists('shared_data'): + os.mkdir('shared_data') + files = os.listdir('shared_data') + idsets = {} + for file in files: + if file.endswith('.json'): + name = "git:" + file[:-5] + with open('shared_data/' + file, 'r') as f: + try: + idset = json.load(f) + idsets[name] = idset + except json.decoder.JSONDecodeError: + print('Error: invalid JSON in file {}'.format(file)) + imported_sets = ImportedIdentifierSets.objects.all() + for name in [name for name in idsets.keys() if imported_sets.filter(name=name).exists()]: + print('Identifier set {} already imported, skipping'.format(name)) + queue = [name for name in idsets.keys() if not imported_sets.filter(name=name).exists()] + while queue: + name = queue.pop(0) + print('Importing {}...'.format(name)) + idset = idsets[name] + if 'depends' in idset: + unmet_deps = [dep for dep in idset['depends'] if not imported_sets.filter(name=dep).exists()] + if unmet_deps: + if all([dep in idsets.keys() for dep in unmet_deps]): + print('Not all dependencies for {} are imported, postponing'.format(name)) + queue.append(name) + continue + else: + print('unknown dependencies for {}: {}'.format(name, unmet_deps)) + continue + with transaction.atomic(): + try: + if 'categories' in idset: + for category in idset['categories']: + serializer = CategorySerializer(data=category) + if serializer.is_valid(): + serializer.save(origin=name) + if 'properties' in idset: + for property in idset['properties']: + serializer = PropertySerializer(data=property) + if serializer.is_valid(): + serializer.save(origin=name) + if 'tags' in idset: + for tag in idset['tags']: + serializer = TagSerializer(data=tag) + if serializer.is_valid(): + serializer.save(origin=name) + imported_sets.create(name=name) + except IntegrityError: + print('Error: integrity error while importing {}\n\tmight be cause by name conflicts with existing' + ' categories, properties or tags'.format(name)) + continue + def reset(): os.environ.setdefault("DJANGO_SETTINGS_MODULE", "backend.settings") import django + import shutil django.setup() @@ -90,17 +150,20 @@ def reset(): except FileNotFoundError: pass + for file in os.listdir('userfiles'): + try: + shutil.rmtree('userfiles/' + file) + except FileNotFoundError: + pass + os.system("git clean -f */migrations") from django.core.management import call_command - apps = ['authentication', 'authtoken', 'sessions', 'hostadmin', 'toolshed', 'admin'] + apps = ['authentication', 'authtoken', 'sessions', 'hostadmin', 'files', 'toolshed', 'admin'] for app in apps: call_command('makemigrations', app) - for app in apps: - call_command('migrate', app) - def testdata(): os.environ.setdefault("DJANGO_SETTINGS_MODULE", "backend.settings") diff --git a/backend/hostadmin/api.py b/backend/hostadmin/api.py index 6f0319c..0bd5208 100644 --- a/backend/hostadmin/api.py +++ b/backend/hostadmin/api.py @@ -1,26 +1,59 @@ -from rest_framework import routers, serializers, viewsets +from rest_framework import routers, viewsets from rest_framework.authentication import TokenAuthentication -from rest_framework.permissions import IsAuthenticated +from rest_framework.permissions import IsAuthenticated, IsAdminUser +from authentication.signature_auth import SignatureAuthenticationLocal from hostadmin.models import Domain +from hostadmin.serializers import DomainSerializer, CategorySerializer, PropertySerializer, TagSerializer +from toolshed.models import Category, Property, Tag router = routers.SimpleRouter() -class DomainSerializer(serializers.ModelSerializer): - class Meta: - model = Domain - fields = '__all__' - - class DomainViewSet(viewsets.ModelViewSet): queryset = Domain.objects.all() serializer_class = DomainSerializer - authentication_classes = [TokenAuthentication] - permission_classes = [IsAuthenticated] + authentication_classes = [TokenAuthentication, SignatureAuthenticationLocal] + permission_classes = [IsAuthenticated, IsAdminUser] + + def perform_create(self, serializer): + serializer.save(owner=self.request.user) + + +class CategoryViewSet(viewsets.ModelViewSet): + queryset = Category.objects.all() + serializer_class = CategorySerializer + authentication_classes = [TokenAuthentication, SignatureAuthenticationLocal] + permission_classes = [IsAuthenticated, IsAdminUser] + + def perform_create(self, serializer): + serializer.save(origin='api') + + +class PropertyViewSet(viewsets.ModelViewSet): + queryset = Property.objects.all() + serializer_class = PropertySerializer + authentication_classes = [TokenAuthentication, SignatureAuthenticationLocal] + permission_classes = [IsAuthenticated, IsAdminUser] + + def perform_create(self, serializer): + serializer.save(origin='api') + + +class TagViewSet(viewsets.ModelViewSet): + queryset = Tag.objects.all() + serializer_class = TagSerializer + authentication_classes = [TokenAuthentication, SignatureAuthenticationLocal] + permission_classes = [IsAuthenticated, IsAdminUser] + + def perform_create(self, serializer): + serializer.save(origin='api') router.register(r'domains', DomainViewSet, basename='domains') +router.register(r'categories', CategoryViewSet, basename='categories') +router.register(r'properties', PropertyViewSet, basename='properties') +router.register(r'tags', TagViewSet, basename='tags') urlpatterns = [ *router.urls, diff --git a/backend/hostadmin/migrations/0002_importedidentifiersets.py b/backend/hostadmin/migrations/0002_importedidentifiersets.py new file mode 100644 index 0000000..a01abc2 --- /dev/null +++ b/backend/hostadmin/migrations/0002_importedidentifiersets.py @@ -0,0 +1,21 @@ +# Generated by Django 4.2.2 on 2023-07-04 22:30 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('hostadmin', '0001_initial'), + ] + + operations = [ + migrations.CreateModel( + name='ImportedIdentifierSets', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('name', models.CharField(max_length=255, unique=True)), + ('created_at', models.DateTimeField(auto_now_add=True)), + ], + ), + ] diff --git a/backend/hostadmin/models.py b/backend/hostadmin/models.py index 1bb2288..bac6ac1 100644 --- a/backend/hostadmin/models.py +++ b/backend/hostadmin/models.py @@ -8,3 +8,8 @@ class Domain(models.Model): def __str__(self): return self.name + + +class ImportedIdentifierSets(models.Model): + name = models.CharField(max_length=255, unique=True) + created_at = models.DateTimeField(auto_now_add=True) diff --git a/backend/hostadmin/serializers.py b/backend/hostadmin/serializers.py new file mode 100644 index 0000000..201cff7 --- /dev/null +++ b/backend/hostadmin/serializers.py @@ -0,0 +1,44 @@ +from rest_framework import serializers + +from authentication.serializers import OwnerSerializer +from hostadmin.models import Domain +from toolshed.models import Category, Property, Tag + + +class DomainSerializer(serializers.ModelSerializer): + owner = OwnerSerializer(read_only=True) + + class Meta: + model = Domain + fields = ['name', 'owner', 'open_registration'] + + def create(self, validated_data): + return super().create(validated_data) + + +class CategorySerializer(serializers.ModelSerializer): + parent = serializers.SlugRelatedField(slug_field='name', queryset=Category.objects.all(), required=False) + + class Meta: + model = Category + fields = ['name', 'description', 'parent', 'origin'] + read_only_fields = ['origin'] + + +class PropertySerializer(serializers.ModelSerializer): + category = serializers.SlugRelatedField(slug_field='name', queryset=Category.objects.all(), required=False) + + class Meta: + model = Property + fields = ['name', 'description', 'category', 'unit_symbol', 'unit_name', 'unit_name_plural', 'base2_prefix', + 'dimensions', 'origin'] + read_only_fields = ['origin'] + + +class TagSerializer(serializers.ModelSerializer): + category = serializers.SlugRelatedField(slug_field='name', queryset=Category.objects.all(), required=False) + + class Meta: + model = Tag + fields = ['name', 'description', 'category', 'origin'] + read_only_fields = ['origin'] diff --git a/backend/hostadmin/tests.py b/backend/hostadmin/tests.py index f2f84df..513d705 100644 --- a/backend/hostadmin/tests.py +++ b/backend/hostadmin/tests.py @@ -1,16 +1,18 @@ -from django.test import TestCase - from authentication.models import ToolshedUser +from authentication.tests import SignatureAuthClient, UserTestMixin, ToolshedTestCase from hostadmin.models import Domain +from django.test import Client + +from toolshed.tests import CategoryTestMixin, TagTestMixin, PropertyTestMixin + +anonymous_client = Client() +client = SignatureAuthClient() -class DomainTestCase(TestCase): +class DomainTestCase(ToolshedTestCase): def setUp(self): - admin = ToolshedUser.objects.create_superuser('admin', 'admin@localhost', '') - admin.set_password('testpassword') - admin.save() + admin = ToolshedUser.objects.create_superuser('admin', 'admin@localhost', 'testpassword') example_com = Domain.objects.create(name='example.com', owner=admin, open_registration=True) - example_com.save() def test_domain(self): example_com = Domain.objects.get(name='example.com') @@ -18,3 +20,268 @@ class DomainTestCase(TestCase): self.assertEqual(example_com.owner.username, 'admin') self.assertEqual(example_com.open_registration, True) self.assertEqual(str(example_com), 'example.com') + + +class DomainApiTestCase(UserTestMixin, ToolshedTestCase): + def setUp(self): + super().setUp() + self.prepare_users() + + def test_get_domains(self): + response = client.get('/api/domains/', self.f['local_user1']) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json(), ['example.com']) + + def test_admin_get_domains_fail(self): + response = client.get('/admin/domains/', self.f['local_user1']) + self.assertEqual(response.status_code, 403) + + def test_admin_get_domains(self): + response = client.get('/admin/domains/', self.f['admin']) + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.json()), 1) + self.assertEqual(response.json()[0]['name'], 'example.com') + self.assertEqual(response.json()[0]['owner'], str(self.f['admin'])) + self.assertEqual(response.json()[0]['open_registration'], True) + + def test_admin_create_domain(self): + response = client.post('/admin/domains/', self.f['admin'], + {'name': 'example2.com', 'owner': 'local_user1', 'open_registration': False}) + self.assertEqual(response.status_code, 201) + self.assertEqual(response.json()['name'], 'example2.com') + self.assertEqual(response.json()['owner'], str(self.f['admin'])) + self.assertEqual(response.json()['open_registration'], False) + self.assertEqual(Domain.objects.count(), 2) + self.assertEqual(Domain.objects.get(name='example2.com').owner, self.f['admin']) + self.assertEqual(Domain.objects.get(name='example2.com').open_registration, False) + + def test_admin_create_domain_fail(self): + response = client.post('/admin/domains/', self.f['local_user1'], + {'name': 'example2.com', 'owner': 'local_user1', 'open_registration': False}) + self.assertEqual(response.status_code, 403) + self.assertEqual(Domain.objects.count(), 1) + + def test_admin_update_domain(self): + response = client.put('/admin/domains/1/', self.f['admin'], + {'name': 'example.com', 'owner': 'local_user1', 'open_registration': False}) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()['name'], 'example.com') + self.assertEqual(response.json()['owner'], str(self.f['admin'])) + self.assertEqual(response.json()['open_registration'], False) + self.assertEqual(Domain.objects.count(), 1) + self.assertEqual(Domain.objects.get(name='example.com').owner, self.f['admin']) + self.assertEqual(Domain.objects.get(name='example.com').open_registration, False) + + def test_admin_update_domain_fail(self): + response = client.put('/admin/domains/1/', self.f['local_user1'], + {'name': 'example.com', 'owner': 'local_user1', 'open_registration': False}) + self.assertEqual(response.status_code, 403) + self.assertEqual(Domain.objects.count(), 1) + + def test_admin_delete_domain(self): + response = client.delete('/admin/domains/1/', self.f['admin']) + self.assertEqual(response.status_code, 204) + self.assertEqual(Domain.objects.count(), 0) + + def test_admin_delete_domain_fail(self): + response = client.delete('/admin/domains/1/', self.f['local_user1']) + self.assertEqual(response.status_code, 403) + self.assertEqual(Domain.objects.count(), 1) + + +class CategoryApiTestCase(UserTestMixin, CategoryTestMixin, ToolshedTestCase): + + def setUp(self): + super().setUp() + self.prepare_users() + self.prepare_categories() + + def test_get_categories(self): + response = client.get('/api/categories/', self.f['local_user1']) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json(), + ["cat1", "cat2", "cat3", "cat1/subcat1", "cat1/subcat2", "cat1/subcat1/subcat3"]) + + def test_admin_get_categories_fail(self): + response = client.get('/admin/categories/', self.f['local_user1']) + self.assertEqual(response.status_code, 403) + + def test_admin_get_categories(self): + response = client.get('/admin/categories/', self.f['admin']) + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.json()), 6) + self.assertEqual(response.json()[0]['name'], 'cat1') + self.assertEqual(response.json()[1]['name'], 'cat2') + self.assertEqual(response.json()[2]['name'], 'cat3') + self.assertEqual(response.json()[3]['name'], 'subcat1') + self.assertEqual(response.json()[3]['parent'], 'cat1') + self.assertEqual(response.json()[4]['name'], 'subcat2') + self.assertEqual(response.json()[4]['parent'], 'cat1') + self.assertEqual(response.json()[5]['name'], 'subcat3') + self.assertEqual(response.json()[5]['parent'], 'subcat1') + + def test_admin_create_category(self): + response = client.post('/admin/categories/', self.f['admin'], {'name': 'cat4'}) + self.assertEqual(response.status_code, 201) + self.assertEqual(response.json()['name'], 'cat4') + self.assertEqual(response.json()['description'], None) + self.assertEqual(response.json()['parent'], None) + self.assertEqual(response.json()['origin'], 'api') + + def test_admin_post_subcategory(self): + response = client.post('/admin/categories/', self.f['admin'], {'name': 'subcat4', 'parent': 'cat1'}) + self.assertEqual(response.status_code, 201) + self.assertEqual(response.json()['name'], 'subcat4') + self.assertEqual(response.json()['description'], None) + self.assertEqual(response.json()['parent'], 'cat1') + self.assertEqual(response.json()['origin'], 'api') + + def test_admin_put_category(self): + response = client.put('/admin/categories/1/', self.f['admin'], {'name': 'cat5'}) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()['name'], 'cat5') + self.assertEqual(response.json()['description'], None) + self.assertEqual(response.json()['parent'], None) + self.assertEqual(response.json()['origin'], 'test') + + def test_admin_patch_category(self): + response = client.patch('/admin/categories/1/', self.f['admin'], {'name': 'cat5'}) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()['name'], 'cat5') + self.assertEqual(response.json()['description'], None) + self.assertEqual(response.json()['parent'], None) + self.assertEqual(response.json()['origin'], 'test') + + def test_admin_delete_category(self): + response = client.delete('/admin/categories/2/', self.f['admin']) + self.assertEqual(response.status_code, 204) + + +class TagApiTestCase(UserTestMixin, CategoryTestMixin, TagTestMixin, ToolshedTestCase): + + def setUp(self): + super().setUp() + self.prepare_users() + self.prepare_categories() + self.prepare_tags() + + def test_get_tags(self): + response = client.get('/api/tags/', self.f['local_user1']) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json(), ["tag1", "tag2", "tag3"]) + + def test_admin_get_tags_fail(self): + response = client.get('/admin/tags/', self.f['local_user1']) + self.assertEqual(response.status_code, 403) + + def test_admin_get_tags(self): + response = client.get('/admin/tags/', self.f['admin']) + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.json()), 3) + self.assertEqual(response.json()[0]['name'], 'tag1') + + def test_admin_create_tag(self): + response = client.post('/admin/tags/', self.f['admin'], {'name': 'tag4'}) + self.assertEqual(response.status_code, 201) + self.assertEqual(response.json()['name'], 'tag4') + self.assertEqual(response.json()['description'], None) + self.assertEqual(response.json()['origin'], 'api') + self.assertEqual(response.json()['category'], None) + + def test_admin_put_tag(self): + response = client.put('/admin/tags/1/', self.f['admin'], {'name': 'tag5'}) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()['name'], 'tag5') + self.assertEqual(response.json()['description'], 'tag1 description') + self.assertEqual(response.json()['origin'], 'test') + self.assertEqual(response.json()['category'], 'cat1') + + def test_admin_patch_tag(self): + response = client.patch('/admin/tags/1/', self.f['admin'], {'name': 'tag5'}) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()['name'], 'tag5') + self.assertEqual(response.json()['description'], 'tag1 description') + self.assertEqual(response.json()['origin'], 'test') + self.assertEqual(response.json()['category'], 'cat1') + + def test_admin_delete_tag(self): + response = client.delete('/admin/tags/2/', self.f['admin']) + self.assertEqual(response.status_code, 204) + + +class PropertyApiTestCase(UserTestMixin, CategoryTestMixin, PropertyTestMixin, ToolshedTestCase): + + def setUp(self): + super().setUp() + self.prepare_users() + self.prepare_categories() + self.prepare_properties() + + def test_get_properties(self): + response = client.get('/api/properties/', self.f['local_user1']) + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.json()), 3) + self.assertEqual(response.json()[0]['name'], 'prop1') + self.assertEqual(response.json()[1]['name'], 'prop2') + self.assertEqual(response.json()[2]['name'], 'prop3') + + def test_admin_get_properties_fail(self): + response = client.get('/admin/properties/', self.f['local_user1']) + self.assertEqual(response.status_code, 403) + + def test_admin_get_properties(self): + response = client.get('/admin/properties/', self.f['admin']) + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.json()), 3) + self.assertEqual(response.json()[0]['name'], 'prop1') + self.assertEqual(response.json()[1]['name'], 'prop2') + self.assertEqual(response.json()[2]['name'], 'prop3') + + def test_admin_create_property(self): + response = client.post('/admin/properties/', self.f['admin'], {'name': 'prop4'}) + self.assertEqual(response.status_code, 201) + self.assertEqual(response.json()['name'], 'prop4') + self.assertEqual(response.json()['description'], None) + self.assertEqual(response.json()['origin'], 'api') + self.assertEqual(response.json()['category'], None) + self.assertEqual(response.json()['unit_symbol'], None) + self.assertEqual(response.json()['unit_name'], None) + self.assertEqual(response.json()['unit_name_plural'], None) + self.assertEqual(response.json()['base2_prefix'], False) + self.assertEqual(response.json()['dimensions'], 1) + + # self.assertEqual(response.json()['sort_lexicographically'], False) + + def test_admin_put_property(self): + response = client.put('/admin/properties/1/', self.f['admin'], {'name': 'prop5'}) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()['name'], 'prop5') + self.assertEqual(response.json()['description'], None) + self.assertEqual(response.json()['origin'], 'test') + self.assertEqual(response.json()['category'], None) + self.assertEqual(response.json()['unit_symbol'], None) + self.assertEqual(response.json()['unit_name'], None) + self.assertEqual(response.json()['unit_name_plural'], None) + self.assertEqual(response.json()['base2_prefix'], False) + self.assertEqual(response.json()['dimensions'], 1) + + # self.assertEqual(response.json()['sort_lexicographically'], False) + + def test_admin_patch_property(self): + response = client.patch('/admin/properties/1/', self.f['admin'], {'name': 'prop5'}) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()['name'], 'prop5') + self.assertEqual(response.json()['description'], None) + self.assertEqual(response.json()['origin'], 'test') + self.assertEqual(response.json()['category'], None) + self.assertEqual(response.json()['unit_symbol'], None) + self.assertEqual(response.json()['unit_name'], None) + self.assertEqual(response.json()['unit_name_plural'], None) + self.assertEqual(response.json()['base2_prefix'], False) + self.assertEqual(response.json()['dimensions'], 1) + + # self.assertEqual(response.json()['sort_lexicographically'], False) + + def test_admin_delete_property(self): + response = client.delete('/admin/properties/2/', self.f['admin']) + self.assertEqual(response.status_code, 204)