diff --git a/.gitignore b/.gitignore index 3400509..2d17488 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,7 @@ MANIFEST dist/ +*.pyc +__pycache__ +*.swp +servefile.egg-info +.tox diff --git a/setup.py b/setup.py index abf7650..252161f 100755 --- a/setup.py +++ b/setup.py @@ -12,6 +12,7 @@ setup( url='http://seba-geek.de/stuff/servefile/', author='Sebastian Lohff', author_email='seba@someserver.de', + install_requires=['pyopenssl'], scripts=['servefile'], ) diff --git a/tests/test_servefile.py b/tests/test_servefile.py new file mode 100644 index 0000000..866ac6f --- /dev/null +++ b/tests/test_servefile.py @@ -0,0 +1,276 @@ +import io +import os +import pytest +import requests +import subprocess +import tarfile +import time +import urllib3 + +# crudly written to learn more about pytest and to have a base for refactoring + + +@pytest.fixture +def run_servefile(): + instances = [] + + def _run_servefile(args, **kwargs): + if not isinstance(args, list): + args = [args] + print("running with args", args) + p = subprocess.Popen(['servefile'] + args, **kwargs) + time.sleep(kwargs.get('timeout', 0.3)) + instances.append(p) + + return p + + yield _run_servefile + + for instance in instances: + try: + instance.terminate() + except OSError: + pass + instance.wait() + + +@pytest.fixture +def datadir(tmp_path): + def _datadir(data, path=None): + path = path or tmp_path + for k, v in data.items(): + if isinstance(v, dict): + new_path = path / k + new_path.mkdir() + _datadir(v, new_path) + else: + if hasattr(v, 'decode'): + v = v.decode() # python2 compability + (path / k).write_text(v) + + return path + return _datadir + + +def make_request(path='/', host='localhost', port=8080, method='get', protocol='http', **kwargs): + url = '{}://{}:{}{}'.format(protocol, host, port, path) + print('Calling {} on {} with {}'.format(method, url, kwargs)) + r = getattr(requests, method)(url, **kwargs) + + return r + + +def check_download(expected_data=None, path='/', fname=None, status_code=200, **kwargs): + if fname is None: + fname = os.path.basename(path) + r = make_request(path, **kwargs) + assert r.status_code == 200 + assert r.text == expected_data + assert r.headers.get('Content-Type') == 'application/octet-stream' + if fname: + assert r.headers.get('Content-Disposition') == 'attachment; filename="{}"'.format(fname) + assert r.headers.get('Content-Transfer-Encoding') == 'binary' + + return r # for additional tests + + +def test_version(run_servefile): + # we expect the version on stdout (python3.4+) or stderr(python2.6-3.3) + s = run_servefile('--version', stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + s.wait() + version = s.stdout.readline().decode().strip() + + # hardcode version as string until servefile is a module + assert version == 'servefile 0.4.4' + + +def test_correct_headers(run_servefile, datadir): + data = "NOOT NOOT" + p = datadir({'testfile': data}) / 'testfile' + run_servefile(str(p)) + + r = make_request() + assert r.status_code == 200 + assert r.headers.get('Content-Type') == 'application/octet-stream' + assert r.headers.get('Content-Disposition') == 'attachment; filename="testfile"' + assert r.headers.get('Content-Transfer-Encoding') == 'binary' + + +def test_redirect_and_download(run_servefile, datadir): + data = "NOOT NOOT" + p = datadir({'testfile': data}) / 'testfile' + run_servefile(str(p)) + + # redirect + r = make_request(allow_redirects=False) + assert r.status_code == 302 + assert r.headers.get('Location') == '/testfile' + + # normal download + check_download(data, fname='testfile') + + +def test_specify_port(run_servefile, datadir): + data = "NOOT NOOT" + p = datadir({'testfile': data}) / 'testfile' + run_servefile([str(p), '-p', '8081']) + + check_download(data, fname='testfile', port=8081) + + +def test_big_download(run_servefile, datadir): + # test with about 10 mb of data + data = "x" * (10 * 1024 ** 2) + p = datadir({'testfile': data}) / 'testfile' + run_servefile(str(p)) + + check_download(data, fname='testfile') + + +def test_authentication(run_servefile, datadir): + data = "NOOT NOOT" + p = datadir({'testfile': data}) / 'testfile' + + run_servefile([str(p), '-a', 'user:password']) + for auth in [('foo', 'bar'), ('user', 'wrong'), ('unknown', 'password')]: + r = make_request(auth=auth) + assert '401 - Unauthorized' in r.text + assert r.status_code == 401 + + check_download(data, fname='testfile', auth=('user', 'password')) + + +def test_serve_directory(run_servefile, datadir): + d = { + 'foo': {'kratzbaum': 'cat', 'I like Cats!': 'kitteh', '&&&&&&&': 'wheee'}, + 'bar': {'thisisaverylongfilenamefortestingthatthisstillworksproperly': 'jup!'}, + 'noot': 'still data in here', + 'bigfile': 'x' * (10 * 1024 ** 2), + } + p = datadir(d) + run_servefile([str(p), '-l']) + + # check if all files are in directory listing + # (could be made more sophisticated with beautifulsoup) + for path in '/', '/../': + r = make_request(path) + for k in d: + assert k in r.text + + for fname, content in d['foo'].items(): + check_download(content, '/foo/' + fname) + + r = make_request('/unknown') + assert r.status_code == 404 + + # download + check_download('jup!', '/bar/thisisaverylongfilenamefortestingthatthisstillworksproperly') + + +def test_upload(run_servefile, tmp_path): + data = ('this is my live now\n' + 'uploading strings to servers\n' + 'so very joyful') + uploaddir = tmp_path / 'upload' + # check that uploaddir does not exist before servefile is started + assert not uploaddir.is_dir() + + run_servefile(['-u', str(uploaddir)]) + + # check that servefile created the directory + assert uploaddir.is_dir() + + # check upload form present + r = make_request() + assert r.status_code == 200 + assert 'multipart/form-data' in r.text + + # upload file + files = {'file': ('haiku.txt', data)} + r = make_request(method='post', files=files) + assert 'Thanks' in r.text + assert r.status_code == 200 + with open(str(uploaddir / 'haiku.txt')) as f: + assert f.read() == data + + # upload file AGAIN!! (and check it is available unter a different name) + files = {'file': ('haiku.txt', data)} + r = make_request(method='post', files=files) + assert r.status_code == 200 + with open(str(uploaddir / 'haiku.txt(1)')) as f: + assert f.read() == data + + +def test_upload_size_limit(run_servefile, tmp_path): + uploaddir = tmp_path / 'upload' + run_servefile(['-s', '2kb', '-u', str(uploaddir)]) + + # upload file that is too big + files = {'file': ('toobig', "x" * 2049)} + r = make_request(method='post', files=files) + assert 'Your file was too big' in r.text + assert r.status_code == 413 + assert not (uploaddir / 'toobig').exists() + + # upload file that should fit + # the size has to be smaller than 2kb, as the sent size also includes mime-headers + files = {'file': ('justright', "x" * 1900)} + r = make_request(method='post', files=files) + assert r.status_code == 200 + + +def test_tar_mode(run_servefile, datadir): + d = { + 'foo': { + 'bar': 'hello testmode my old friend', + 'baz': 'you came to test me once again', + } + } + p = datadir(d) + run_servefile(['-t', str(p / 'foo')]) + + # test redirect? + + # test contents of tar file + r = make_request() + assert r.status_code == 200 + tar = tarfile.open(fileobj=io.BytesIO(r.content)) + assert len(tar.getmembers()) == 3 + assert tar.getmember('foo').isdir() + for filename, content in d['foo'].items(): + info = tar.getmember('foo/{}'.format(filename)) + assert info.isfile + assert tar.extractfile(info.path).read().decode() == content + + +def test_tar_compression(run_servefile, datadir): + d = {'foo': 'blubb'} + p = datadir(d) + run_servefile(['-c', 'gzip', '-t', str(p / 'foo')]) + + r = make_request() + assert r.status_code == 200 + tar = tarfile.open(fileobj=io.BytesIO(r.content), mode='r:gz') + assert len(tar.getmembers()) == 1 + + +def test_https(run_servefile, datadir): + data = "NOOT NOOT" + p = datadir({'testfile': data}) / 'testfile' + run_servefile(['--ssl', str(p)]) + time.sleep(0.2) # time for generating ssl certificates + + # fingerprint = None + # while not fingerprint: + # line = s.stdout.readline() + # print(line) + # # if we find this line we went too far... + # assert not line.startswith("Some addresses this file will be available at") + + # if line.startswith("SHA1 fingerprint"): + # fingerprint = line.replace("SHA1 fingerprint: ", "").strip() + # break + + # assert fingerprint + urllib3.disable_warnings() + check_download(data, protocol='https', verify=False) diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..0160c41 --- /dev/null +++ b/tox.ini @@ -0,0 +1,8 @@ +[tox] +envlist = py27,py36 + +[testenv] +deps = + pytest + requests +commands = pytest --tb=short