Compare commits

...

4 Commits

Author SHA1 Message Date
Sebastian Lohff d52669b8b0 Initial tests 2019-02-14 23:35:41 +01:00
Sebastian Lohff 30738981f4 Document lzma/xz support in manpage 2019-02-14 23:25:36 +01:00
Sebastian Lohff 53d803626d
Merge pull request #2 from bitwave/tar-lzma-support
added compression method lzma for tar files
2019-02-14 23:22:58 +01:00
bitwave aeb8588198 added compression method lzma for tar files 2018-04-25 19:08:58 +02:00
6 changed files with 296 additions and 3 deletions

5
.gitignore vendored
View File

@ -1,2 +1,7 @@
MANIFEST
dist/
*.pyc
__pycache__
*.swp
servefile.egg-info
.tox

View File

@ -7,7 +7,7 @@
from __future__ import print_function
__version__ = '0.4.4'
__version__ = '0.4.5-unreleased'
import argparse
import base64
@ -227,7 +227,7 @@ class FileHandler(FileBaseHandler):
class TarFileHandler(FileBaseHandler):
target = None
compression = "none"
compressionMethods = ("none", "gzip", "bzip2")
compressionMethods = ("none", "gzip", "bzip2", "xz")
def do_HEAD(self):
if self.checkAndDoRedirect():
@ -269,6 +269,8 @@ class TarFileHandler(FileBaseHandler):
cmd = ["tar", "-cz"]
elif self.compression == "bzip2":
cmd = ["tar", "-cj"]
elif self.compression == "xz":
cmd = ["tar", "-cJ"]
else:
raise ValueError("Unknown compression mode '%s'." % self.compression)
@ -287,6 +289,8 @@ class TarFileHandler(FileBaseHandler):
return ".tar.gz"
elif TarFileHandler.compression == "bzip2":
return ".tar.bz2"
elif TarFileHandler.compression == "xz":
return ".tar.xz"
raise ValueError("Unknown compression mode '%s'." % TarFileHandler.compression)

View File

@ -94,7 +94,7 @@ available.
.TP
\fB\-c\fR method, \fB\-\-compression\fR method
Set compression method, only in combination with
\fB\-\-tar\fR. Can be one of none, gzip, bzip2.
\fB\-\-tar\fR. Can be one of none, gzip, bzip2, xz.
.TP
\fB\-4\fR, \fB\-\-ipv4\-only\fR
Listen on IPv4 only

View File

@ -12,6 +12,7 @@ setup(
url='http://seba-geek.de/stuff/servefile/',
author='Sebastian Lohff',
author_email='seba@someserver.de',
install_requires=['pyopenssl'],
scripts=['servefile'],
)

275
tests/test_servefile.py Normal file
View File

@ -0,0 +1,275 @@
import io
import pytest
import requests
import subprocess
import tarfile
import time
import urllib3
# crudly written to learn more about pytest and to have a base for refactoring
@pytest.fixture
def run_servefile():
instances = []
def _run_servefile(args, **kwargs):
if not isinstance(args, list):
args = [args]
print("running with args", args)
p = subprocess.Popen(['servefile'] + args, **kwargs)
time.sleep(kwargs.get('timeout', 0.3))
instances.append(p)
return p
yield _run_servefile
for instance in instances:
try:
instance.terminate()
except OSError:
pass
instance.wait()
@pytest.fixture
def datadir(tmp_path):
def _datadir(data, path=None):
path = path or tmp_path
for k, v in data.items():
if isinstance(v, dict):
new_path = path / k
new_path.mkdir()
_datadir(v, new_path)
else:
(path / k).write_text(v.decode())
return path
return _datadir
@pytest.fixture(scope='class')
def servefile_simple(run_servefile, datadir):
data = "NOOT NOOT"
p = datadir({'testfile': data}) / 'testfile'
run_servefile(str(p))
return dict(host='localhost', port=8080, protocol='http')
def download_ok(r, expected_data, expected_code=200):
assert r.status_code == expected_code
assert r.text == expected_data
def make_request(path='/', host='localhost', port=8080, method='get', protocol='http', **kwargs):
url = '{}://{}:{}{}'.format(protocol, host, port, path)
r = getattr(requests, method)(url, **kwargs)
return r
def check_download(expected_data=None, path='/', status_code=200, **kwargs):
r = make_request(path, **kwargs)
assert r.status_code == 200
assert r.text == expected_data
assert r.headers.get('Content-Type') == 'application/octet-stream'
# assert r.headers.get('Content-Disposition') == 'attachment; filename=""'
assert r.headers.get('Content-Transfer-Encoding') == 'binary'
return r # for additional tests
def test_version(run_servefile):
s = run_servefile('--version', stderr=subprocess.PIPE)
s.wait()
version = s.stderr.readline().strip()
assert version == 'servefile 0.4.5-unreleased'
def test_correct_headers(run_servefile, datadir):
data = "NOOT NOOT"
p = datadir({'testfile': data}) / 'testfile'
run_servefile(str(p))
r = make_request()
assert r.status_code == 200
assert r.headers.get('Content-Type') == 'application/octet-stream'
assert r.headers.get('Content-Disposition') == 'attachment; filename="testfile"'
assert r.headers.get('Content-Transfer-Encoding') == 'binary'
def test_redirect_and_download(run_servefile, datadir):
data = "NOOT NOOT"
p = datadir({'testfile': data}) / 'testfile'
run_servefile(str(p))
r = make_request(allow_redirects=False)
assert r.status_code == 302
assert r.headers.get('Location') == '/testfile'
r = requests.get("http://127.0.0.1:8080")
download_ok(r, data)
def test_specify_port(run_servefile, datadir):
data = "NOOT NOOT"
p = datadir({'testfile': data}) / 'testfile'
run_servefile([str(p), '-p', '8081'])
r = make_request(port=8081)
download_ok(r, data)
def test_big_download(run_servefile, datadir):
# test with about 10 mb of data
data = "x" * (10 * 1024 ** 2)
p = datadir({'testfile': data}) / 'testfile'
run_servefile(str(p))
r = make_request()
download_ok(r, data)
def test_authentication(run_servefile, datadir):
data = "NOOT NOOT"
p = datadir({'testfile': data}) / 'testfile'
run_servefile([str(p), '-a', 'user:password'])
for auth in [('foo', 'bar'), ('user', 'wrong'), ('unknown', 'password')]:
r = make_request(auth=auth)
assert '401 - Unauthorized' in r.text
assert r.status_code == 401
r = make_request(auth=('user', 'password'))
download_ok(r, data)
def test_serve_directory(run_servefile, datadir):
d = {
'foo': {'kratzbaum': 'cat', 'I like Cats!': 'kitteh', '&&&&&&&': 'wheee'},
'bar': {'thisisaverylongfilenamefortestingthatthisstillworksproperly': 'jup!'},
'noot': 'still data in here',
'bigfile': 'x' * (10 * 1024 ** 2),
}
p = datadir(d)
run_servefile([str(p), '-l'])
# check if all files are in directory listing
# (could be made more sophisticated with beautifulsoup)
for path in '/', '/../':
r = make_request(path)
for k in d:
assert k in r.text
for fname, content in d['foo'].items():
check_download(content, '/foo/' + fname)
r = make_request('/unknown')
assert r.status_code == 404
# download
check_download('jup!', '/bar/thisisaverylongfilenamefortestingthatthisstillworksproperly')
def test_upload(run_servefile, tmp_path):
data = ('this is my live now\n'
'uploading strings to servers\n'
'so very joyful')
uploaddir = tmp_path / 'upload'
# check that uploaddir does not exist before servefile is started
assert not uploaddir.is_dir()
run_servefile(['-u', str(uploaddir)])
# check that servefile created the directory
assert uploaddir.is_dir()
# check upload form present
r = make_request()
assert r.status_code == 200
assert 'multipart/form-data' in r.text
# upload file
files = {'file': ('haiku.txt', data.decode())}
r = make_request(method='post', files=files)
assert 'Thanks' in r.text
assert r.status_code == 200
with open(str(uploaddir / 'haiku.txt')) as f:
assert f.read() == data
# upload file AGAIN!! (and check it is available unter a different name)
files = {'file': ('haiku.txt', data.decode())}
r = make_request(method='post', files=files)
assert r.status_code == 200
with open(str(uploaddir / 'haiku.txt(1)')) as f:
assert f.read() == data
def test_upload_size_limit(run_servefile, tmp_path):
uploaddir = tmp_path / 'upload'
run_servefile(['-s', '2kb', '-u', str(uploaddir)])
# upload file that is too big
files = {'file': ('toobig', "x" * 2049)}
r = make_request(method='post', files=files)
assert 'Your file was too big' in r.text
assert r.status_code == 413
assert not (uploaddir / 'toobig').exists()
# upload file that should fit
# the size has to be smaller than 2kb, as the sent size also includes mime-headers
files = {'file': ('justright', "x" * 1900)}
r = make_request(method='post', files=files)
assert r.status_code == 200
def test_tar_mode(run_servefile, datadir):
d = {
'foo': {
'bar': 'hello testmode my old friend',
'baz': 'you came to test me once again',
}
}
p = datadir(d)
run_servefile(['-t', str(p / 'foo')])
# test redirect?
# test contents of tar file
r = make_request()
assert r.status_code == 200
tar = tarfile.open(fileobj=io.BytesIO(r.text.encode()))
assert len(tar.getmembers()) == 3
assert tar.getmember('foo').isdir()
for filename, content in d['foo'].items():
info = tar.getmember('foo/{}'.format(filename))
assert info.isfile
assert tar.extractfile(info.path).read() == content
def test_tar_compression(run_servefile, datadir):
pass
def test_https(run_servefile, datadir):
data = "NOOT NOOT"
p = datadir({'testfile': data}) / 'testfile'
run_servefile(['--ssl', str(p)])
time.sleep(0.2) # time for generating ssl certificates
# fingerprint = None
# while not fingerprint:
# line = s.stdout.readline()
# print(line)
# # if we find this line we went too far...
# assert not line.startswith("Some addresses this file will be available at")
# if line.startswith("SHA1 fingerprint"):
# fingerprint = line.replace("SHA1 fingerprint: ", "").strip()
# break
# assert fingerprint
urllib3.disable_warnings()
check_download(data, protocol='https', verify=False)

8
tox.ini Normal file
View File

@ -0,0 +1,8 @@
[tox]
envlist = py27,py36
[testenv]
deps =
pytest
requests
commands = pytest --tb=short