add ImportedIdentifierSets
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
j3d1 2023-10-22 22:46:55 +02:00
parent b6f1da1580
commit 5faec68ebf
6 changed files with 454 additions and 21 deletions

View file

@ -1,9 +1,11 @@
#!/usr/bin/env python3
import json
import os
import sys
from argparse import ArgumentParser
import dotenv
from django.db import transaction, IntegrityError
def yesno(prompt, default=False):
@ -78,10 +80,68 @@ def configure():
call_command('collectstatic', '--no-input')
if yesno("Do you want to import all categories, properties and tags contained in this repository?", default=True):
from hostadmin.serializers import CategorySerializer, PropertySerializer, TagSerializer
from hostadmin.models import ImportedIdentifierSets
if not os.path.exists('shared_data'):
os.mkdir('shared_data')
files = os.listdir('shared_data')
idsets = {}
for file in files:
if file.endswith('.json'):
name = "git:" + file[:-5]
with open('shared_data/' + file, 'r') as f:
try:
idset = json.load(f)
idsets[name] = idset
except json.decoder.JSONDecodeError:
print('Error: invalid JSON in file {}'.format(file))
imported_sets = ImportedIdentifierSets.objects.all()
for name in [name for name in idsets.keys() if imported_sets.filter(name=name).exists()]:
print('Identifier set {} already imported, skipping'.format(name))
queue = [name for name in idsets.keys() if not imported_sets.filter(name=name).exists()]
while queue:
name = queue.pop(0)
print('Importing {}...'.format(name))
idset = idsets[name]
if 'depends' in idset:
unmet_deps = [dep for dep in idset['depends'] if not imported_sets.filter(name=dep).exists()]
if unmet_deps:
if all([dep in idsets.keys() for dep in unmet_deps]):
print('Not all dependencies for {} are imported, postponing'.format(name))
queue.append(name)
continue
else:
print('unknown dependencies for {}: {}'.format(name, unmet_deps))
continue
with transaction.atomic():
try:
if 'categories' in idset:
for category in idset['categories']:
serializer = CategorySerializer(data=category)
if serializer.is_valid():
serializer.save(origin=name)
if 'properties' in idset:
for property in idset['properties']:
serializer = PropertySerializer(data=property)
if serializer.is_valid():
serializer.save(origin=name)
if 'tags' in idset:
for tag in idset['tags']:
serializer = TagSerializer(data=tag)
if serializer.is_valid():
serializer.save(origin=name)
imported_sets.create(name=name)
except IntegrityError:
print('Error: integrity error while importing {}\n\tmight be cause by name conflicts with existing'
' categories, properties or tags'.format(name))
continue
def reset():
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "backend.settings")
import django
import shutil
django.setup()
@ -90,17 +150,20 @@ def reset():
except FileNotFoundError:
pass
for file in os.listdir('userfiles'):
try:
shutil.rmtree('userfiles/' + file)
except FileNotFoundError:
pass
os.system("git clean -f */migrations")
from django.core.management import call_command
apps = ['authentication', 'authtoken', 'sessions', 'hostadmin', 'toolshed', 'admin']
apps = ['authentication', 'authtoken', 'sessions', 'hostadmin', 'files', 'toolshed', 'admin']
for app in apps:
call_command('makemigrations', app)
for app in apps:
call_command('migrate', app)
def testdata():
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "backend.settings")

View file

@ -1,26 +1,59 @@
from rest_framework import routers, serializers, viewsets
from rest_framework import routers, viewsets
from rest_framework.authentication import TokenAuthentication
from rest_framework.permissions import IsAuthenticated
from rest_framework.permissions import IsAuthenticated, IsAdminUser
from authentication.signature_auth import SignatureAuthenticationLocal
from hostadmin.models import Domain
from hostadmin.serializers import DomainSerializer, CategorySerializer, PropertySerializer, TagSerializer
from toolshed.models import Category, Property, Tag
router = routers.SimpleRouter()
class DomainSerializer(serializers.ModelSerializer):
class Meta:
model = Domain
fields = '__all__'
class DomainViewSet(viewsets.ModelViewSet):
queryset = Domain.objects.all()
serializer_class = DomainSerializer
authentication_classes = [TokenAuthentication]
permission_classes = [IsAuthenticated]
authentication_classes = [TokenAuthentication, SignatureAuthenticationLocal]
permission_classes = [IsAuthenticated, IsAdminUser]
def perform_create(self, serializer):
serializer.save(owner=self.request.user)
class CategoryViewSet(viewsets.ModelViewSet):
queryset = Category.objects.all()
serializer_class = CategorySerializer
authentication_classes = [TokenAuthentication, SignatureAuthenticationLocal]
permission_classes = [IsAuthenticated, IsAdminUser]
def perform_create(self, serializer):
serializer.save(origin='api')
class PropertyViewSet(viewsets.ModelViewSet):
queryset = Property.objects.all()
serializer_class = PropertySerializer
authentication_classes = [TokenAuthentication, SignatureAuthenticationLocal]
permission_classes = [IsAuthenticated, IsAdminUser]
def perform_create(self, serializer):
serializer.save(origin='api')
class TagViewSet(viewsets.ModelViewSet):
queryset = Tag.objects.all()
serializer_class = TagSerializer
authentication_classes = [TokenAuthentication, SignatureAuthenticationLocal]
permission_classes = [IsAuthenticated, IsAdminUser]
def perform_create(self, serializer):
serializer.save(origin='api')
router.register(r'domains', DomainViewSet, basename='domains')
router.register(r'categories', CategoryViewSet, basename='categories')
router.register(r'properties', PropertyViewSet, basename='properties')
router.register(r'tags', TagViewSet, basename='tags')
urlpatterns = [
*router.urls,

View file

@ -0,0 +1,21 @@
# Generated by Django 4.2.2 on 2023-07-04 22:30
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('hostadmin', '0001_initial'),
]
operations = [
migrations.CreateModel(
name='ImportedIdentifierSets',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.CharField(max_length=255, unique=True)),
('created_at', models.DateTimeField(auto_now_add=True)),
],
),
]

View file

@ -8,3 +8,8 @@ class Domain(models.Model):
def __str__(self):
return self.name
class ImportedIdentifierSets(models.Model):
name = models.CharField(max_length=255, unique=True)
created_at = models.DateTimeField(auto_now_add=True)

View file

@ -0,0 +1,44 @@
from rest_framework import serializers
from authentication.serializers import OwnerSerializer
from hostadmin.models import Domain
from toolshed.models import Category, Property, Tag
class DomainSerializer(serializers.ModelSerializer):
owner = OwnerSerializer(read_only=True)
class Meta:
model = Domain
fields = ['name', 'owner', 'open_registration']
def create(self, validated_data):
return super().create(validated_data)
class CategorySerializer(serializers.ModelSerializer):
parent = serializers.SlugRelatedField(slug_field='name', queryset=Category.objects.all(), required=False)
class Meta:
model = Category
fields = ['name', 'description', 'parent', 'origin']
read_only_fields = ['origin']
class PropertySerializer(serializers.ModelSerializer):
category = serializers.SlugRelatedField(slug_field='name', queryset=Category.objects.all(), required=False)
class Meta:
model = Property
fields = ['name', 'description', 'category', 'unit_symbol', 'unit_name', 'unit_name_plural', 'base2_prefix',
'dimensions', 'origin']
read_only_fields = ['origin']
class TagSerializer(serializers.ModelSerializer):
category = serializers.SlugRelatedField(slug_field='name', queryset=Category.objects.all(), required=False)
class Meta:
model = Tag
fields = ['name', 'description', 'category', 'origin']
read_only_fields = ['origin']

View file

@ -1,16 +1,18 @@
from django.test import TestCase
from authentication.models import ToolshedUser
from authentication.tests import SignatureAuthClient, UserTestMixin, ToolshedTestCase
from hostadmin.models import Domain
from django.test import Client
from toolshed.tests import CategoryTestMixin, TagTestMixin, PropertyTestMixin
anonymous_client = Client()
client = SignatureAuthClient()
class DomainTestCase(TestCase):
class DomainTestCase(ToolshedTestCase):
def setUp(self):
admin = ToolshedUser.objects.create_superuser('admin', 'admin@localhost', '')
admin.set_password('testpassword')
admin.save()
admin = ToolshedUser.objects.create_superuser('admin', 'admin@localhost', 'testpassword')
example_com = Domain.objects.create(name='example.com', owner=admin, open_registration=True)
example_com.save()
def test_domain(self):
example_com = Domain.objects.get(name='example.com')
@ -18,3 +20,268 @@ class DomainTestCase(TestCase):
self.assertEqual(example_com.owner.username, 'admin')
self.assertEqual(example_com.open_registration, True)
self.assertEqual(str(example_com), 'example.com')
class DomainApiTestCase(UserTestMixin, ToolshedTestCase):
def setUp(self):
super().setUp()
self.prepare_users()
def test_get_domains(self):
response = client.get('/api/domains/', self.f['local_user1'])
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json(), ['example.com'])
def test_admin_get_domains_fail(self):
response = client.get('/admin/domains/', self.f['local_user1'])
self.assertEqual(response.status_code, 403)
def test_admin_get_domains(self):
response = client.get('/admin/domains/', self.f['admin'])
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.json()), 1)
self.assertEqual(response.json()[0]['name'], 'example.com')
self.assertEqual(response.json()[0]['owner'], str(self.f['admin']))
self.assertEqual(response.json()[0]['open_registration'], True)
def test_admin_create_domain(self):
response = client.post('/admin/domains/', self.f['admin'],
{'name': 'example2.com', 'owner': 'local_user1', 'open_registration': False})
self.assertEqual(response.status_code, 201)
self.assertEqual(response.json()['name'], 'example2.com')
self.assertEqual(response.json()['owner'], str(self.f['admin']))
self.assertEqual(response.json()['open_registration'], False)
self.assertEqual(Domain.objects.count(), 2)
self.assertEqual(Domain.objects.get(name='example2.com').owner, self.f['admin'])
self.assertEqual(Domain.objects.get(name='example2.com').open_registration, False)
def test_admin_create_domain_fail(self):
response = client.post('/admin/domains/', self.f['local_user1'],
{'name': 'example2.com', 'owner': 'local_user1', 'open_registration': False})
self.assertEqual(response.status_code, 403)
self.assertEqual(Domain.objects.count(), 1)
def test_admin_update_domain(self):
response = client.put('/admin/domains/1/', self.f['admin'],
{'name': 'example.com', 'owner': 'local_user1', 'open_registration': False})
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json()['name'], 'example.com')
self.assertEqual(response.json()['owner'], str(self.f['admin']))
self.assertEqual(response.json()['open_registration'], False)
self.assertEqual(Domain.objects.count(), 1)
self.assertEqual(Domain.objects.get(name='example.com').owner, self.f['admin'])
self.assertEqual(Domain.objects.get(name='example.com').open_registration, False)
def test_admin_update_domain_fail(self):
response = client.put('/admin/domains/1/', self.f['local_user1'],
{'name': 'example.com', 'owner': 'local_user1', 'open_registration': False})
self.assertEqual(response.status_code, 403)
self.assertEqual(Domain.objects.count(), 1)
def test_admin_delete_domain(self):
response = client.delete('/admin/domains/1/', self.f['admin'])
self.assertEqual(response.status_code, 204)
self.assertEqual(Domain.objects.count(), 0)
def test_admin_delete_domain_fail(self):
response = client.delete('/admin/domains/1/', self.f['local_user1'])
self.assertEqual(response.status_code, 403)
self.assertEqual(Domain.objects.count(), 1)
class CategoryApiTestCase(UserTestMixin, CategoryTestMixin, ToolshedTestCase):
def setUp(self):
super().setUp()
self.prepare_users()
self.prepare_categories()
def test_get_categories(self):
response = client.get('/api/categories/', self.f['local_user1'])
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json(),
["cat1", "cat2", "cat3", "cat1/subcat1", "cat1/subcat2", "cat1/subcat1/subcat3"])
def test_admin_get_categories_fail(self):
response = client.get('/admin/categories/', self.f['local_user1'])
self.assertEqual(response.status_code, 403)
def test_admin_get_categories(self):
response = client.get('/admin/categories/', self.f['admin'])
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.json()), 6)
self.assertEqual(response.json()[0]['name'], 'cat1')
self.assertEqual(response.json()[1]['name'], 'cat2')
self.assertEqual(response.json()[2]['name'], 'cat3')
self.assertEqual(response.json()[3]['name'], 'subcat1')
self.assertEqual(response.json()[3]['parent'], 'cat1')
self.assertEqual(response.json()[4]['name'], 'subcat2')
self.assertEqual(response.json()[4]['parent'], 'cat1')
self.assertEqual(response.json()[5]['name'], 'subcat3')
self.assertEqual(response.json()[5]['parent'], 'subcat1')
def test_admin_create_category(self):
response = client.post('/admin/categories/', self.f['admin'], {'name': 'cat4'})
self.assertEqual(response.status_code, 201)
self.assertEqual(response.json()['name'], 'cat4')
self.assertEqual(response.json()['description'], None)
self.assertEqual(response.json()['parent'], None)
self.assertEqual(response.json()['origin'], 'api')
def test_admin_post_subcategory(self):
response = client.post('/admin/categories/', self.f['admin'], {'name': 'subcat4', 'parent': 'cat1'})
self.assertEqual(response.status_code, 201)
self.assertEqual(response.json()['name'], 'subcat4')
self.assertEqual(response.json()['description'], None)
self.assertEqual(response.json()['parent'], 'cat1')
self.assertEqual(response.json()['origin'], 'api')
def test_admin_put_category(self):
response = client.put('/admin/categories/1/', self.f['admin'], {'name': 'cat5'})
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json()['name'], 'cat5')
self.assertEqual(response.json()['description'], None)
self.assertEqual(response.json()['parent'], None)
self.assertEqual(response.json()['origin'], 'test')
def test_admin_patch_category(self):
response = client.patch('/admin/categories/1/', self.f['admin'], {'name': 'cat5'})
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json()['name'], 'cat5')
self.assertEqual(response.json()['description'], None)
self.assertEqual(response.json()['parent'], None)
self.assertEqual(response.json()['origin'], 'test')
def test_admin_delete_category(self):
response = client.delete('/admin/categories/2/', self.f['admin'])
self.assertEqual(response.status_code, 204)
class TagApiTestCase(UserTestMixin, CategoryTestMixin, TagTestMixin, ToolshedTestCase):
def setUp(self):
super().setUp()
self.prepare_users()
self.prepare_categories()
self.prepare_tags()
def test_get_tags(self):
response = client.get('/api/tags/', self.f['local_user1'])
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json(), ["tag1", "tag2", "tag3"])
def test_admin_get_tags_fail(self):
response = client.get('/admin/tags/', self.f['local_user1'])
self.assertEqual(response.status_code, 403)
def test_admin_get_tags(self):
response = client.get('/admin/tags/', self.f['admin'])
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.json()), 3)
self.assertEqual(response.json()[0]['name'], 'tag1')
def test_admin_create_tag(self):
response = client.post('/admin/tags/', self.f['admin'], {'name': 'tag4'})
self.assertEqual(response.status_code, 201)
self.assertEqual(response.json()['name'], 'tag4')
self.assertEqual(response.json()['description'], None)
self.assertEqual(response.json()['origin'], 'api')
self.assertEqual(response.json()['category'], None)
def test_admin_put_tag(self):
response = client.put('/admin/tags/1/', self.f['admin'], {'name': 'tag5'})
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json()['name'], 'tag5')
self.assertEqual(response.json()['description'], 'tag1 description')
self.assertEqual(response.json()['origin'], 'test')
self.assertEqual(response.json()['category'], 'cat1')
def test_admin_patch_tag(self):
response = client.patch('/admin/tags/1/', self.f['admin'], {'name': 'tag5'})
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json()['name'], 'tag5')
self.assertEqual(response.json()['description'], 'tag1 description')
self.assertEqual(response.json()['origin'], 'test')
self.assertEqual(response.json()['category'], 'cat1')
def test_admin_delete_tag(self):
response = client.delete('/admin/tags/2/', self.f['admin'])
self.assertEqual(response.status_code, 204)
class PropertyApiTestCase(UserTestMixin, CategoryTestMixin, PropertyTestMixin, ToolshedTestCase):
def setUp(self):
super().setUp()
self.prepare_users()
self.prepare_categories()
self.prepare_properties()
def test_get_properties(self):
response = client.get('/api/properties/', self.f['local_user1'])
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.json()), 3)
self.assertEqual(response.json()[0]['name'], 'prop1')
self.assertEqual(response.json()[1]['name'], 'prop2')
self.assertEqual(response.json()[2]['name'], 'prop3')
def test_admin_get_properties_fail(self):
response = client.get('/admin/properties/', self.f['local_user1'])
self.assertEqual(response.status_code, 403)
def test_admin_get_properties(self):
response = client.get('/admin/properties/', self.f['admin'])
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.json()), 3)
self.assertEqual(response.json()[0]['name'], 'prop1')
self.assertEqual(response.json()[1]['name'], 'prop2')
self.assertEqual(response.json()[2]['name'], 'prop3')
def test_admin_create_property(self):
response = client.post('/admin/properties/', self.f['admin'], {'name': 'prop4'})
self.assertEqual(response.status_code, 201)
self.assertEqual(response.json()['name'], 'prop4')
self.assertEqual(response.json()['description'], None)
self.assertEqual(response.json()['origin'], 'api')
self.assertEqual(response.json()['category'], None)
self.assertEqual(response.json()['unit_symbol'], None)
self.assertEqual(response.json()['unit_name'], None)
self.assertEqual(response.json()['unit_name_plural'], None)
self.assertEqual(response.json()['base2_prefix'], False)
self.assertEqual(response.json()['dimensions'], 1)
# self.assertEqual(response.json()['sort_lexicographically'], False)
def test_admin_put_property(self):
response = client.put('/admin/properties/1/', self.f['admin'], {'name': 'prop5'})
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json()['name'], 'prop5')
self.assertEqual(response.json()['description'], None)
self.assertEqual(response.json()['origin'], 'test')
self.assertEqual(response.json()['category'], None)
self.assertEqual(response.json()['unit_symbol'], None)
self.assertEqual(response.json()['unit_name'], None)
self.assertEqual(response.json()['unit_name_plural'], None)
self.assertEqual(response.json()['base2_prefix'], False)
self.assertEqual(response.json()['dimensions'], 1)
# self.assertEqual(response.json()['sort_lexicographically'], False)
def test_admin_patch_property(self):
response = client.patch('/admin/properties/1/', self.f['admin'], {'name': 'prop5'})
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json()['name'], 'prop5')
self.assertEqual(response.json()['description'], None)
self.assertEqual(response.json()['origin'], 'test')
self.assertEqual(response.json()['category'], None)
self.assertEqual(response.json()['unit_symbol'], None)
self.assertEqual(response.json()['unit_name'], None)
self.assertEqual(response.json()['unit_name_plural'], None)
self.assertEqual(response.json()['base2_prefix'], False)
self.assertEqual(response.json()['dimensions'], 1)
# self.assertEqual(response.json()['sort_lexicographically'], False)
def test_admin_delete_property(self):
response = client.delete('/admin/properties/2/', self.f['admin'])
self.assertEqual(response.status_code, 204)