Compare commits
1 Commits
2db018a93e
...
6169afc266
Author | SHA1 | Date |
---|---|---|
Sebastian Lohff | 6169afc266 |
|
@ -1,12 +1,8 @@
|
||||||
import io
|
|
||||||
import pytest
|
import pytest
|
||||||
import requests
|
import requests
|
||||||
import subprocess
|
|
||||||
import tarfile
|
|
||||||
import time
|
import time
|
||||||
import urllib3
|
import subprocess
|
||||||
|
|
||||||
# crudly written to learn more about pytest and to have a base for refactoring
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def run_servefile():
|
def run_servefile():
|
||||||
|
@ -48,38 +44,28 @@ def datadir(tmp_path):
|
||||||
return _datadir
|
return _datadir
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope='class')
|
@pytest.fixture(scope='module')
|
||||||
def servefile_simple(run_servefile, datadir):
|
def servefile_simple(run_servefile, datadir):
|
||||||
data = "NOOT NOOT"
|
data = "NOOT NOOT"
|
||||||
p = datadir({'testfile': data}) / 'testfile'
|
p = datadir({'testfile': data}) / 'testfile'
|
||||||
|
|
||||||
run_servefile(str(p))
|
run_servefile(str(p))
|
||||||
|
|
||||||
return dict(host='localhost', port=8080, protocol='http')
|
|
||||||
|
|
||||||
|
|
||||||
def download_ok(r, expected_data, expected_code=200):
|
def download_ok(r, expected_data, expected_code=200):
|
||||||
assert r.status_code == expected_code
|
assert r.status_code == expected_code
|
||||||
assert r.text == expected_data
|
assert r.text == expected_data
|
||||||
|
|
||||||
|
|
||||||
def make_request(path='/', host='localhost', port=8080, method='get', protocol='http', **kwargs):
|
def check_download(expected_data, path='/', host='localhost', port=8080, protocol='http', status_code=200):
|
||||||
url = '{}://{}:{}{}'.format(protocol, host, port, path)
|
url = '{}://{}:{}{}'.format(protocol, host, port, path)
|
||||||
r = getattr(requests, method)(url, **kwargs)
|
r = requests.get(url, allow_redirects=False)
|
||||||
|
|
||||||
return r
|
|
||||||
|
|
||||||
|
|
||||||
def check_download(expected_data=None, path='/', status_code=200, **kwargs):
|
|
||||||
r = make_request(path, **kwargs)
|
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
assert r.text == expected_data
|
assert r.text == expected_data
|
||||||
assert r.headers.get('Content-Type') == 'application/octet-stream'
|
assert r.headers.get('Content-Type') == 'application/octet-stream'
|
||||||
# assert r.headers.get('Content-Disposition') == 'attachment; filename=""'
|
# assert r.headers.get('Content-Disposition') == 'attachment; filename=""'
|
||||||
assert r.headers.get('Content-Transfer-Encoding') == 'binary'
|
assert r.headers.get('Content-Transfer-Encoding') == 'binary'
|
||||||
|
|
||||||
return r # for additional tests
|
|
||||||
|
|
||||||
|
|
||||||
def test_version(run_servefile):
|
def test_version(run_servefile):
|
||||||
s = run_servefile('--version', stderr=subprocess.PIPE)
|
s = run_servefile('--version', stderr=subprocess.PIPE)
|
||||||
|
@ -92,7 +78,7 @@ def test_correct_headers(run_servefile, datadir):
|
||||||
data = "NOOT NOOT"
|
data = "NOOT NOOT"
|
||||||
p = datadir({'testfile': data}) / 'testfile'
|
p = datadir({'testfile': data}) / 'testfile'
|
||||||
run_servefile(str(p))
|
run_servefile(str(p))
|
||||||
r = make_request()
|
r = requests.get("http://127.0.0.1:8080")
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
assert r.headers.get('Content-Type') == 'application/octet-stream'
|
assert r.headers.get('Content-Type') == 'application/octet-stream'
|
||||||
assert r.headers.get('Content-Disposition') == 'attachment; filename="testfile"'
|
assert r.headers.get('Content-Disposition') == 'attachment; filename="testfile"'
|
||||||
|
@ -105,7 +91,7 @@ def test_redirect_and_download(run_servefile, datadir):
|
||||||
|
|
||||||
run_servefile(str(p))
|
run_servefile(str(p))
|
||||||
|
|
||||||
r = make_request(allow_redirects=False)
|
r = requests.get("http://127.0.0.1:8080", allow_redirects=False)
|
||||||
assert r.status_code == 302
|
assert r.status_code == 302
|
||||||
assert r.headers.get('Location') == '/testfile'
|
assert r.headers.get('Location') == '/testfile'
|
||||||
|
|
||||||
|
@ -117,7 +103,7 @@ def test_specify_port(run_servefile, datadir):
|
||||||
data = "NOOT NOOT"
|
data = "NOOT NOOT"
|
||||||
p = datadir({'testfile': data}) / 'testfile'
|
p = datadir({'testfile': data}) / 'testfile'
|
||||||
run_servefile([str(p), '-p', '8081'])
|
run_servefile([str(p), '-p', '8081'])
|
||||||
r = make_request(port=8081)
|
r = requests.get("http://127.0.0.1:8081")
|
||||||
download_ok(r, data)
|
download_ok(r, data)
|
||||||
|
|
||||||
|
|
||||||
|
@ -127,7 +113,7 @@ def test_big_download(run_servefile, datadir):
|
||||||
p = datadir({'testfile': data}) / 'testfile'
|
p = datadir({'testfile': data}) / 'testfile'
|
||||||
|
|
||||||
run_servefile(str(p))
|
run_servefile(str(p))
|
||||||
r = make_request()
|
r = requests.get("http://127.0.0.1:8080")
|
||||||
download_ok(r, data)
|
download_ok(r, data)
|
||||||
|
|
||||||
|
|
||||||
|
@ -137,17 +123,17 @@ def test_authentication(run_servefile, datadir):
|
||||||
|
|
||||||
run_servefile([str(p), '-a', 'user:password'])
|
run_servefile([str(p), '-a', 'user:password'])
|
||||||
for auth in [('foo', 'bar'), ('user', 'wrong'), ('unknown', 'password')]:
|
for auth in [('foo', 'bar'), ('user', 'wrong'), ('unknown', 'password')]:
|
||||||
r = make_request(auth=auth)
|
r = requests.get('http://127.0.0.1:8080', auth=auth)
|
||||||
assert '401 - Unauthorized' in r.text
|
assert '401 - Unauthorized' in r.text
|
||||||
assert r.status_code == 401
|
assert r.status_code == 401
|
||||||
|
|
||||||
r = make_request(auth=('user', 'password'))
|
r = requests.get("http://127.0.0.1:8080", auth=('user', 'password'))
|
||||||
download_ok(r, data)
|
download_ok(r, data)
|
||||||
|
|
||||||
|
|
||||||
def test_serve_directory(run_servefile, datadir):
|
def test_serve_directory(run_servefile, datadir):
|
||||||
d = {
|
d = {
|
||||||
'foo': {'kratzbaum': 'cat', 'I like Cats!': 'kitteh', '&&&&&&&': 'wheee'},
|
'foo': {'1': 'cat', '2': 'kitteh', '3': 'wheee'},
|
||||||
'bar': {'thisisaverylongfilenamefortestingthatthisstillworksproperly': 'jup!'},
|
'bar': {'thisisaverylongfilenamefortestingthatthisstillworksproperly': 'jup!'},
|
||||||
'noot': 'still data in here',
|
'noot': 'still data in here',
|
||||||
'bigfile': 'x' * (10 * 1024 ** 2),
|
'bigfile': 'x' * (10 * 1024 ** 2),
|
||||||
|
@ -158,112 +144,16 @@ def test_serve_directory(run_servefile, datadir):
|
||||||
# check if all files are in directory listing
|
# check if all files are in directory listing
|
||||||
# (could be made more sophisticated with beautifulsoup)
|
# (could be made more sophisticated with beautifulsoup)
|
||||||
for path in '/', '/../':
|
for path in '/', '/../':
|
||||||
r = make_request(path)
|
r = requests.get("http://127.0.0.1:8080" + path)
|
||||||
for k in d:
|
for k in d:
|
||||||
assert k in r.text
|
assert k in r.text
|
||||||
|
|
||||||
for fname, content in d['foo'].items():
|
|
||||||
check_download(content, '/foo/' + fname)
|
|
||||||
|
|
||||||
r = make_request('/unknown')
|
|
||||||
assert r.status_code == 404
|
|
||||||
|
|
||||||
# download
|
# download
|
||||||
|
check_download('cat', '/foo/1')
|
||||||
|
check_download('kitteh', '/foo/2')
|
||||||
check_download('jup!', '/bar/thisisaverylongfilenamefortestingthatthisstillworksproperly')
|
check_download('jup!', '/bar/thisisaverylongfilenamefortestingthatthisstillworksproperly')
|
||||||
|
|
||||||
def test_upload(run_servefile, tmp_path):
|
|
||||||
data = ('this is my live now\n'
|
|
||||||
'uploading strings to servers\n'
|
|
||||||
'so very joyful')
|
|
||||||
uploaddir = tmp_path / 'upload'
|
|
||||||
# check that uploaddir does not exist before servefile is started
|
|
||||||
assert not uploaddir.is_dir()
|
|
||||||
|
|
||||||
run_servefile(['-u', str(uploaddir)])
|
|
||||||
|
|
||||||
# check that servefile created the directory
|
|
||||||
assert uploaddir.is_dir()
|
|
||||||
|
|
||||||
# check upload form present
|
|
||||||
r = make_request()
|
|
||||||
assert r.status_code == 200
|
|
||||||
assert 'multipart/form-data' in r.text
|
|
||||||
|
|
||||||
# upload file
|
|
||||||
files = {'file': ('haiku.txt', data.decode())}
|
|
||||||
r = make_request(method='post', files=files)
|
|
||||||
assert 'Thanks' in r.text
|
|
||||||
assert r.status_code == 200
|
|
||||||
with open(str(uploaddir / 'haiku.txt')) as f:
|
|
||||||
assert f.read() == data
|
|
||||||
|
|
||||||
# upload file AGAIN!! (and check it is available unter a different name)
|
|
||||||
files = {'file': ('haiku.txt', data.decode())}
|
|
||||||
r = make_request(method='post', files=files)
|
|
||||||
assert r.status_code == 200
|
|
||||||
with open(str(uploaddir / 'haiku.txt(1)')) as f:
|
|
||||||
assert f.read() == data
|
|
||||||
|
|
||||||
|
|
||||||
def test_upload_size_limit(run_servefile, tmp_path):
|
|
||||||
uploaddir = tmp_path / 'upload'
|
|
||||||
run_servefile(['-s', '2kb''-u', str(uploaddir)])
|
|
||||||
|
|
||||||
# upload file that is too big
|
|
||||||
files = {'file': ('toobig', "x" * 2049)}
|
|
||||||
r = make_request(method='post', files=files)
|
|
||||||
assert 'Your file was too big' in r.text
|
|
||||||
assert r.status_code == 200
|
|
||||||
assert not (uploaddir / 'toobig').exists()
|
|
||||||
|
|
||||||
# upload file that should fit
|
|
||||||
assert False
|
|
||||||
|
|
||||||
def test_tar_mode(run_servefile, datadir):
|
|
||||||
d = {
|
|
||||||
'foo': {
|
|
||||||
'bar': 'hello testmode my old friend',
|
|
||||||
'baz': 'you came to test me once again',
|
|
||||||
}
|
|
||||||
}
|
|
||||||
p = datadir(d)
|
|
||||||
run_servefile(['-t', str(p / 'foo')])
|
|
||||||
|
|
||||||
# test redirect?
|
|
||||||
|
|
||||||
# test contents of tar file
|
|
||||||
r = make_request()
|
|
||||||
assert r.status_code == 200
|
|
||||||
tar = tarfile.open(fileobj=io.BytesIO(r.text.encode()))
|
|
||||||
assert len(tar.getmembers()) == 3
|
|
||||||
assert tar.getmember('foo').isdir()
|
|
||||||
for filename, content in d['foo'].items():
|
|
||||||
info = tar.getmember('foo/{}'.format(filename))
|
|
||||||
assert info.isfile
|
|
||||||
assert tar.extractfile(info.path).read() == content
|
|
||||||
|
|
||||||
|
|
||||||
def test_tar_compression(run_servefile, datadir):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def test_https(run_servefile, datadir):
|
|
||||||
data = "NOOT NOOT"
|
|
||||||
p = datadir({'testfile': data}) / 'testfile'
|
|
||||||
run_servefile(['--ssl', str(p)], stdout=subprocess.PIPE)
|
|
||||||
|
|
||||||
# fingerprint = None
|
|
||||||
# while not fingerprint:
|
|
||||||
# line = s.stdout.readline()
|
|
||||||
# print(line)
|
|
||||||
# # if we find this line we went too far...
|
|
||||||
# assert not line.startswith("Some addresses this file will be available at")
|
|
||||||
|
|
||||||
# if line.startswith("SHA1 fingerprint"):
|
|
||||||
# fingerprint = line.replace("SHA1 fingerprint: ", "").strip()
|
|
||||||
# break
|
|
||||||
|
|
||||||
# assert fingerprint
|
|
||||||
|
|
||||||
urllib3
|
|
||||||
check_download(protocol='https')
|
|
||||||
|
|
||||||
|
class Foo:
|
||||||
|
def noot(servefile_simple):
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue