2023-07-01 18:45:38 +00:00
|
|
|
from django.core.files.base import ContentFile
|
|
|
|
from django.core.files.storage import DefaultStorage
|
|
|
|
from django.db import IntegrityError, transaction
|
|
|
|
from django.test import Client
|
2023-10-23 21:51:30 +00:00
|
|
|
from authentication.tests import SignatureAuthClient, ToolshedTestCase, UserTestMixin
|
2023-11-01 03:32:03 +00:00
|
|
|
from toolshed.tests import InventoryTestMixin
|
2023-07-01 18:45:38 +00:00
|
|
|
from nacl.hash import sha256
|
|
|
|
from nacl.encoding import HexEncoder
|
|
|
|
import base64
|
|
|
|
|
|
|
|
from files.models import File
|
|
|
|
|
|
|
|
# Plain Django test client; used below for requests issued without passing a user.
anonymous_client = Client()
|
|
|
|
# Shared SignatureAuthClient (imported from authentication.tests). Every request
# made through it in this module passes a user fixture as the second argument to get().
# NOTE(review): presumably the client signs the request on behalf of that user —
# confirm against authentication.tests.
client = SignatureAuthClient()
|
|
|
|
|
|
|
|
|
|
|
|
def rmdir(storage, path):
    """Recursively empty and remove the directory ``path`` from ``storage``.

    ``path`` is used as a plain string prefix: the entry names returned by
    ``storage.listdir(path)`` are appended to it directly, so it has to be
    either empty or end with ``/``.

    Files directly inside ``path`` are deleted first, then every
    sub-directory is removed depth-first, and finally ``path`` itself is
    handed to ``storage.delete``.
    """
    subdirs, filenames = storage.listdir(path)

    for filename in filenames:
        storage.delete(f"{path}{filename}")

    for subdir in subdirs:
        # Child directories need the trailing slash so that their own
        # entries can be appended to the prefix in the recursive call.
        rmdir(storage, f"{path}{subdir}/")

    storage.delete(path)
|
|
|
|
|
|
|
|
|
|
|
|
def countdir(storage, path):
    """Return the number of files found below ``path`` in ``storage``.

    Directories themselves are not counted; only the file entries reported
    by ``storage.listdir`` at ``path`` and, recursively, in each of its
    sub-directories.  ``path`` is used as a plain string prefix and must be
    either empty or end with ``/``.
    """
    subdirs, filenames = storage.listdir(path)
    return len(filenames) + sum(
        countdir(storage, f"{path}{subdir}/") for subdir in subdirs
    )
|
|
|
|
|
|
|
|
|
|
|
|
class FilesTestMixin:
    """Mixin that seeds the default storage and ``self.f`` with file fixtures.

    ``self.f`` is not created here; the host test case must already provide
    it as an object that supports item assignment.
    """

    def prepare_files(self):
        """Reset the default storage and register four content fixtures.

        For each ``N`` in 1..4 the following keys are stored in ``self.f``:
        ``test_contentN`` (raw bytes), ``hashN`` (hex-encoded SHA-256 of the
        bytes) and ``encoded_contentN`` (base64 text of the bytes).  A
        ``File`` object is created only for the first three
        (``test_file1`` .. ``test_file3``); the fourth fixture has no
        ``File`` object so that tests can create it themselves.
        """
        # Wipe whatever is in the default storage so file counts are exact.
        rmdir(DefaultStorage(), '')

        payloads = (b'testcontent1', b'testcontent2', b'testcontent3', b'testcontent4')
        persisted = 3  # number of leading payloads that get a File object

        for number, payload in enumerate(payloads, start=1):
            self.f[f'test_content{number}'] = payload
            self.f[f'hash{number}'] = sha256(payload, encoder=HexEncoder).decode('utf-8')
            encoded = base64.b64encode(payload).decode('utf-8')
            self.f[f'encoded_content{number}'] = encoded
            if number <= persisted:
                self.f[f'test_file{number}'] = File.objects.create(mime_type='text/plain', data=encoded)
|
|
|
|
|
|
|
|
|
|
|
|
class FilesTestCase(FilesTestMixin, ToolshedTestCase):
    """Tests for creating ``File`` objects and for the names they get in storage."""

    def setUp(self):
        super().setUp()
        self.prepare_files()

    def test_file_list(self):
        # The fixtures alone account for three File objects and three stored files.
        self.assertEqual(File.objects.count(), 3)
        self.assertEqual(countdir(DefaultStorage(), ''), 3)

    def test_file_upload(self):
        # Creating from base64 ``data`` adds one object and one stored file.
        File.objects.create(mime_type='text/plain', data=self.f['encoded_content4'])
        self.assertEqual(File.objects.count(), 4)
        self.assertEqual(countdir(DefaultStorage(), ''), 4)

        # The stored name is the content hash split into 2/2/2/rest segments.
        digest = self.f['hash4']
        self.assertEqual(File.objects.get(id=4).file.read(), self.f['test_content4'])
        self.assertEqual(
            File.objects.get(id=4).file.name,
            f"{digest[:2]}/{digest[2:4]}/{digest[4:6]}/{digest[6:]}",
        )

    def test_file_upload_fail(self):
        # Passing a ready-made ``file`` instead of ``data`` is expected to be rejected.
        with transaction.atomic(), self.assertRaises(ValueError):
            File.objects.create(file=ContentFile(self.f['test_content4']), mime_type='text/plain')

        # Nothing may have been added by the failed attempt.
        self.assertEqual(File.objects.count(), 3)
        self.assertEqual(countdir(DefaultStorage(), ''), 3)

    def test_file_upload_duplicate(self):
        # Content 3 already exists from the fixtures, so creating it again must fail.
        with transaction.atomic(), self.assertRaises(IntegrityError):
            File.objects.create(mime_type='text/plain', data=self.f['encoded_content3'])

        self.assertEqual(File.objects.count(), 3)
        self.assertEqual(countdir(DefaultStorage(), ''), 3)

    def test_file_upload_get_or_create(self):
        # Existing content: the fixture object is returned and nothing is added.
        existing, was_created = File.objects.get_or_create(data=self.f['encoded_content3'])
        self.assertEqual(File.objects.count(), 3)
        self.assertEqual(countdir(DefaultStorage(), ''), 3)
        self.assertFalse(was_created)
        self.assertEqual(existing.file.read(), self.f['test_content3'])
        digest = self.f['hash3']
        self.assertEqual(
            existing.file.name,
            f"{digest[:2]}/{digest[2:4]}/{digest[4:6]}/{digest[6:]}",
        )

        # New content: a fourth object and a fourth stored file appear.
        fresh, was_created = File.objects.get_or_create(data=self.f['encoded_content4'])
        self.assertEqual(File.objects.count(), 4)
        self.assertEqual(countdir(DefaultStorage(), ''), 4)
        self.assertTrue(was_created)
        self.assertEqual(fresh.file.read(), self.f['test_content4'])
        digest = self.f['hash4']
        self.assertEqual(
            fresh.file.name,
            f"{digest[:2]}/{digest[2:4]}/{digest[4:6]}/{digest[6:]}",
        )

    def test_file_upload_get_or_create_fail(self):
        # get_or_create with only ``hash`` (no ``data``) is expected to raise.
        with transaction.atomic(), self.assertRaises(ValueError):
            File.objects.get_or_create(hash=self.f['hash3'])

        self.assertEqual(File.objects.count(), 3)
        self.assertEqual(countdir(DefaultStorage(), ''), 3)
|
2023-10-23 21:51:30 +00:00
|
|
|
|
|
|
|
|
2023-11-01 03:32:03 +00:00
|
|
|
class MediaUrlTestCase(FilesTestMixin, UserTestMixin, InventoryTestMixin, ToolshedTestCase):
    """Tests for the ``/media/...`` endpoint and the ``X-Accel-Redirect`` header it returns."""

    def setUp(self):
        super().setUp()
        self.prepare_files()
        self.prepare_users()
        self.prepare_categories()
        self.prepare_tags()
        self.prepare_properties()
        self.prepare_inventory()

        # Attach fixture files to inventory items; test_file3 stays unattached.
        attachments = (
            ('item1', 'test_file1'),
            ('item1', 'test_file2'),
            ('item2', 'test_file1'),
        )
        for item_key, file_key in attachments:
            self.f[item_key].files.add(self.f[file_key])

    def test_file_url(self):
        # (hash key, requesting user key, File fixture key) for each allowed request.
        # NOTE(review): why local_user2 may fetch file 2 depends on the inventory
        # fixtures, which are defined outside this module — confirm there.
        allowed = (
            ('hash1', 'local_user1', 'test_file1'),
            ('hash2', 'local_user1', 'test_file2'),
            ('hash2', 'local_user2', 'test_file2'),
        )
        for hash_key, user_key, file_key in allowed:
            digest = self.f[hash_key]
            reply = client.get(
                f"/media/{digest[:2]}/{digest[2:4]}/{digest[4:6]}/{digest[6:]}",
                self.f[user_key],
            )
            self.assertEqual(reply.status_code, 200)
            self.assertEqual(
                reply.headers['X-Accel-Redirect'],
                f"/redirect_media/{digest[:2]}/{digest[2:4]}/{digest[4:6]}/{digest[6:]}",
            )
            self.assertEqual(reply.headers['Content-Type'], self.f[file_key].mime_type)

    def test_file_url_fail(self):
        # A path that matches no stored file yields 404 and no redirect header.
        reply = client.get('/media/{}/'.format('nonexistent'), self.f['local_user1'])
        self.assertEqual(reply.status_code, 404)
        self.assertNotIn('X-Accel-Redirect', reply.headers)

    def test_file_url_anonymous(self):
        # Requests made without a user are refused with 403.
        digest = self.f['hash1']
        reply = anonymous_client.get(
            f"/media/{digest[:2]}/{digest[2:4]}/{digest[4:6]}/{digest[6:]}")
        self.assertEqual(reply.status_code, 403)
        self.assertNotIn('X-Accel-Redirect', reply.headers)

    def test_file_url_wrong_user(self):
        # File 3 is attached to no item in setUp; ext_user1 asks for file 2.
        # Both requests are expected to look like a missing file (404).
        denied = (
            ('hash3', 'local_user1'),
            ('hash2', 'ext_user1'),
        )
        for hash_key, user_key in denied:
            digest = self.f[hash_key]
            reply = client.get(
                f"/media/{digest[:2]}/{digest[2:4]}/{digest[4:6]}/{digest[6:]}",
                self.f[user_key],
            )
            self.assertEqual(reply.status_code, 404)
            self.assertNotIn('X-Accel-Redirect', reply.headers)
|