Refactor tests, add compression test

This commit is contained in:
Sebastian Lohff 2019-02-17 20:53:37 +01:00
parent 0dcd09ac80
commit 1ec1671cb1
1 changed files with 24 additions and 29 deletions

View File

@ -1,4 +1,5 @@
import io import io
import os
import pytest import pytest
import requests import requests
import subprocess import subprocess
@ -51,21 +52,6 @@ def datadir(tmp_path):
return _datadir return _datadir
@pytest.fixture(scope='class')
def servefile_simple(run_servefile, datadir):
data = "NOOT NOOT"
p = datadir({'testfile': data}) / 'testfile'
run_servefile(str(p))
return dict(host='localhost', port=8080, protocol='http')
def download_ok(r, expected_data, expected_code=200):
assert r.status_code == expected_code
assert r.text == expected_data
def make_request(path='/', host='localhost', port=8080, method='get', protocol='http', **kwargs): def make_request(path='/', host='localhost', port=8080, method='get', protocol='http', **kwargs):
url = '{}://{}:{}{}'.format(protocol, host, port, path) url = '{}://{}:{}{}'.format(protocol, host, port, path)
print('Calling {} on {} with {}'.format(method, url, kwargs)) print('Calling {} on {} with {}'.format(method, url, kwargs))
@ -74,12 +60,15 @@ def make_request(path='/', host='localhost', port=8080, method='get', protocol='
return r return r
def check_download(expected_data=None, path='/', status_code=200, **kwargs): def check_download(expected_data=None, path='/', fname=None, status_code=200, **kwargs):
if fname is None:
fname = os.path.basename(path)
r = make_request(path, **kwargs) r = make_request(path, **kwargs)
assert r.status_code == 200 assert r.status_code == 200
assert r.text == expected_data assert r.text == expected_data
assert r.headers.get('Content-Type') == 'application/octet-stream' assert r.headers.get('Content-Type') == 'application/octet-stream'
# assert r.headers.get('Content-Disposition') == 'attachment; filename=""' if fname:
assert r.headers.get('Content-Disposition') == 'attachment; filename="{}"'.format(fname)
assert r.headers.get('Content-Transfer-Encoding') == 'binary' assert r.headers.get('Content-Transfer-Encoding') == 'binary'
return r # for additional tests return r # for additional tests
@ -97,6 +86,7 @@ def test_correct_headers(run_servefile, datadir):
data = "NOOT NOOT" data = "NOOT NOOT"
p = datadir({'testfile': data}) / 'testfile' p = datadir({'testfile': data}) / 'testfile'
run_servefile(str(p)) run_servefile(str(p))
r = make_request() r = make_request()
assert r.status_code == 200 assert r.status_code == 200
assert r.headers.get('Content-Type') == 'application/octet-stream' assert r.headers.get('Content-Type') == 'application/octet-stream'
@ -107,33 +97,32 @@ def test_correct_headers(run_servefile, datadir):
def test_redirect_and_download(run_servefile, datadir): def test_redirect_and_download(run_servefile, datadir):
data = "NOOT NOOT" data = "NOOT NOOT"
p = datadir({'testfile': data}) / 'testfile' p = datadir({'testfile': data}) / 'testfile'
run_servefile(str(p)) run_servefile(str(p))
# redirect
r = make_request(allow_redirects=False) r = make_request(allow_redirects=False)
assert r.status_code == 302 assert r.status_code == 302
assert r.headers.get('Location') == '/testfile' assert r.headers.get('Location') == '/testfile'
r = requests.get("http://127.0.0.1:8080") # normal download
download_ok(r, data) check_download(data, fname='testfile')
def test_specify_port(run_servefile, datadir): def test_specify_port(run_servefile, datadir):
data = "NOOT NOOT" data = "NOOT NOOT"
p = datadir({'testfile': data}) / 'testfile' p = datadir({'testfile': data}) / 'testfile'
run_servefile([str(p), '-p', '8081']) run_servefile([str(p), '-p', '8081'])
r = make_request(port=8081)
download_ok(r, data) check_download(data, fname='testfile', port=8081)
def test_big_download(run_servefile, datadir): def test_big_download(run_servefile, datadir):
# test with about 10 mb of data # test with about 10 mb of data
data = "x" * (10 * 1024 ** 2) data = "x" * (10 * 1024 ** 2)
p = datadir({'testfile': data}) / 'testfile' p = datadir({'testfile': data}) / 'testfile'
run_servefile(str(p)) run_servefile(str(p))
r = make_request()
download_ok(r, data) check_download(data, fname='testfile')
def test_authentication(run_servefile, datadir): def test_authentication(run_servefile, datadir):
@ -146,8 +135,7 @@ def test_authentication(run_servefile, datadir):
assert '401 - Unauthorized' in r.text assert '401 - Unauthorized' in r.text
assert r.status_code == 401 assert r.status_code == 401
r = make_request(auth=('user', 'password')) check_download(data, fname='testfile', auth=('user', 'password'))
download_ok(r, data)
def test_serve_directory(run_servefile, datadir): def test_serve_directory(run_servefile, datadir):
@ -244,7 +232,7 @@ def test_tar_mode(run_servefile, datadir):
# test contents of tar file # test contents of tar file
r = make_request() r = make_request()
assert r.status_code == 200 assert r.status_code == 200
tar = tarfile.open(fileobj=io.BytesIO(r.text.encode())) tar = tarfile.open(fileobj=io.BytesIO(r.content))
assert len(tar.getmembers()) == 3 assert len(tar.getmembers()) == 3
assert tar.getmember('foo').isdir() assert tar.getmember('foo').isdir()
for filename, content in d['foo'].items(): for filename, content in d['foo'].items():
@ -254,7 +242,14 @@ def test_tar_mode(run_servefile, datadir):
def test_tar_compression(run_servefile, datadir): def test_tar_compression(run_servefile, datadir):
pass d = {'foo': 'blubb'}
p = datadir(d)
run_servefile(['-c', 'gzip', '-t', str(p / 'foo')])
r = make_request()
assert r.status_code == 200
tar = tarfile.open(fileobj=io.BytesIO(r.content), mode='r:gz')
assert len(tar.getmembers()) == 1
def test_https(run_servefile, datadir): def test_https(run_servefile, datadir):