Compare commits
	
		
			3 Commits
		
	
	
		
			93e0c3dd7e
			...
			1ec1671cb1
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | 1ec1671cb1 | |
|  | 0dcd09ac80 | |
|  | ae1800e157 | 
|  | @ -1,2 +1,7 @@ | ||||||
| MANIFEST | MANIFEST | ||||||
| dist/ | dist/ | ||||||
|  | *.pyc | ||||||
|  | __pycache__ | ||||||
|  | *.swp | ||||||
|  | servefile.egg-info | ||||||
|  | .tox | ||||||
|  |  | ||||||
|  | @ -7,7 +7,7 @@ | ||||||
| 
 | 
 | ||||||
| from __future__ import print_function | from __future__ import print_function | ||||||
| 
 | 
 | ||||||
| __version__ = '0.4.4' | __version__ = '0.4.5-unreleased' | ||||||
| 
 | 
 | ||||||
| import argparse | import argparse | ||||||
| import base64 | import base64 | ||||||
|  |  | ||||||
							
								
								
									
										1
									
								
								setup.py
								
								
								
								
							
							
						
						
									
										1
									
								
								setup.py
								
								
								
								
							|  | @ -12,6 +12,7 @@ setup( | ||||||
| 	url='http://seba-geek.de/stuff/servefile/', | 	url='http://seba-geek.de/stuff/servefile/', | ||||||
| 	author='Sebastian Lohff', | 	author='Sebastian Lohff', | ||||||
| 	author_email='seba@someserver.de', | 	author_email='seba@someserver.de', | ||||||
|  | 	install_requires=['pyopenssl'], | ||||||
| 	scripts=['servefile'], | 	scripts=['servefile'], | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -0,0 +1,274 @@ | ||||||
|  | import io | ||||||
|  | import os | ||||||
|  | import pytest | ||||||
|  | import requests | ||||||
|  | import subprocess | ||||||
|  | import tarfile | ||||||
|  | import time | ||||||
|  | import urllib3 | ||||||
|  | 
 | ||||||
|  | # crudly written to learn more about pytest and to have a base for refactoring | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | @pytest.fixture | ||||||
|  | def run_servefile(): | ||||||
|  |     instances = [] | ||||||
|  | 
 | ||||||
|  |     def _run_servefile(args, **kwargs): | ||||||
|  |         if not isinstance(args, list): | ||||||
|  |             args = [args] | ||||||
|  |         print("running with args", args) | ||||||
|  |         p = subprocess.Popen(['servefile'] + args, **kwargs) | ||||||
|  |         time.sleep(kwargs.get('timeout', 0.3)) | ||||||
|  |         instances.append(p) | ||||||
|  | 
 | ||||||
|  |         return p | ||||||
|  | 
 | ||||||
|  |     yield _run_servefile | ||||||
|  | 
 | ||||||
|  |     for instance in instances: | ||||||
|  |         try: | ||||||
|  |             instance.terminate() | ||||||
|  |         except OSError: | ||||||
|  |             pass | ||||||
|  |         instance.wait() | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | @pytest.fixture | ||||||
|  | def datadir(tmp_path): | ||||||
|  |     def _datadir(data, path=None): | ||||||
|  |         path = path or tmp_path | ||||||
|  |         for k, v in data.items(): | ||||||
|  |             if isinstance(v, dict): | ||||||
|  |                 new_path = path / k | ||||||
|  |                 new_path.mkdir() | ||||||
|  |                 _datadir(v, new_path) | ||||||
|  |             else: | ||||||
|  |                 if hasattr(v, 'decode'): | ||||||
|  |                     v = v.decode()  # python2 compability | ||||||
|  |                 (path / k).write_text(v) | ||||||
|  | 
 | ||||||
|  |         return path | ||||||
|  |     return _datadir | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def make_request(path='/', host='localhost', port=8080, method='get', protocol='http', **kwargs): | ||||||
|  |     url = '{}://{}:{}{}'.format(protocol, host, port, path) | ||||||
|  |     print('Calling {} on {} with {}'.format(method, url, kwargs)) | ||||||
|  |     r = getattr(requests, method)(url, **kwargs) | ||||||
|  | 
 | ||||||
|  |     return r | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def check_download(expected_data=None, path='/', fname=None, status_code=200, **kwargs): | ||||||
|  |     if fname is None: | ||||||
|  |         fname = os.path.basename(path) | ||||||
|  |     r = make_request(path, **kwargs) | ||||||
|  |     assert r.status_code == 200 | ||||||
|  |     assert r.text == expected_data | ||||||
|  |     assert r.headers.get('Content-Type') == 'application/octet-stream' | ||||||
|  |     if fname: | ||||||
|  |         assert r.headers.get('Content-Disposition') == 'attachment; filename="{}"'.format(fname) | ||||||
|  |     assert r.headers.get('Content-Transfer-Encoding') == 'binary' | ||||||
|  | 
 | ||||||
|  |     return r  # for additional tests | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def test_version(run_servefile): | ||||||
|  |     # we expect the version on stdout (python3.4+) or stderr(python2.6-3.3) | ||||||
|  |     s = run_servefile('--version', stdout=subprocess.PIPE, stderr=subprocess.STDOUT) | ||||||
|  |     s.wait() | ||||||
|  |     version = s.stdout.readline().decode().strip() | ||||||
|  |     assert version == 'servefile 0.4.5-unreleased' | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def test_correct_headers(run_servefile, datadir): | ||||||
|  |     data = "NOOT NOOT" | ||||||
|  |     p = datadir({'testfile': data}) / 'testfile' | ||||||
|  |     run_servefile(str(p)) | ||||||
|  | 
 | ||||||
|  |     r = make_request() | ||||||
|  |     assert r.status_code == 200 | ||||||
|  |     assert r.headers.get('Content-Type') == 'application/octet-stream' | ||||||
|  |     assert r.headers.get('Content-Disposition') == 'attachment; filename="testfile"' | ||||||
|  |     assert r.headers.get('Content-Transfer-Encoding') == 'binary' | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def test_redirect_and_download(run_servefile, datadir): | ||||||
|  |     data = "NOOT NOOT" | ||||||
|  |     p = datadir({'testfile': data}) / 'testfile' | ||||||
|  |     run_servefile(str(p)) | ||||||
|  | 
 | ||||||
|  |     # redirect | ||||||
|  |     r = make_request(allow_redirects=False) | ||||||
|  |     assert r.status_code == 302 | ||||||
|  |     assert r.headers.get('Location') == '/testfile' | ||||||
|  | 
 | ||||||
|  |     # normal download | ||||||
|  |     check_download(data, fname='testfile') | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def test_specify_port(run_servefile, datadir): | ||||||
|  |     data = "NOOT NOOT" | ||||||
|  |     p = datadir({'testfile': data}) / 'testfile' | ||||||
|  |     run_servefile([str(p), '-p', '8081']) | ||||||
|  | 
 | ||||||
|  |     check_download(data, fname='testfile', port=8081) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def test_big_download(run_servefile, datadir): | ||||||
|  |     # test with about 10 mb of data | ||||||
|  |     data = "x" * (10 * 1024 ** 2) | ||||||
|  |     p = datadir({'testfile': data}) / 'testfile' | ||||||
|  |     run_servefile(str(p)) | ||||||
|  | 
 | ||||||
|  |     check_download(data, fname='testfile') | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def test_authentication(run_servefile, datadir): | ||||||
|  |     data = "NOOT NOOT" | ||||||
|  |     p = datadir({'testfile': data}) / 'testfile' | ||||||
|  | 
 | ||||||
|  |     run_servefile([str(p), '-a', 'user:password']) | ||||||
|  |     for auth in [('foo', 'bar'), ('user', 'wrong'), ('unknown', 'password')]: | ||||||
|  |         r = make_request(auth=auth) | ||||||
|  |         assert '401 - Unauthorized' in r.text | ||||||
|  |         assert r.status_code == 401 | ||||||
|  | 
 | ||||||
|  |     check_download(data, fname='testfile', auth=('user', 'password')) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def test_serve_directory(run_servefile, datadir): | ||||||
|  |     d = { | ||||||
|  |         'foo': {'kratzbaum': 'cat', 'I like Cats!': 'kitteh', '&&&&&&&': 'wheee'}, | ||||||
|  |         'bar': {'thisisaverylongfilenamefortestingthatthisstillworksproperly': 'jup!'}, | ||||||
|  |         'noot': 'still data in here', | ||||||
|  |         'bigfile': 'x' * (10 * 1024 ** 2), | ||||||
|  |     } | ||||||
|  |     p = datadir(d) | ||||||
|  |     run_servefile([str(p), '-l']) | ||||||
|  | 
 | ||||||
|  |     # check if all files are in directory listing | ||||||
|  |     # (could be made more sophisticated with beautifulsoup) | ||||||
|  |     for path in '/', '/../': | ||||||
|  |         r = make_request(path) | ||||||
|  |         for k in d: | ||||||
|  |             assert k in r.text | ||||||
|  | 
 | ||||||
|  |     for fname, content in d['foo'].items(): | ||||||
|  |         check_download(content, '/foo/' + fname) | ||||||
|  | 
 | ||||||
|  |     r = make_request('/unknown') | ||||||
|  |     assert r.status_code == 404 | ||||||
|  | 
 | ||||||
|  |     # download | ||||||
|  |     check_download('jup!', '/bar/thisisaverylongfilenamefortestingthatthisstillworksproperly') | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def test_upload(run_servefile, tmp_path): | ||||||
|  |     data = ('this is my live now\n' | ||||||
|  |             'uploading strings to servers\n' | ||||||
|  |             'so very joyful') | ||||||
|  |     uploaddir = tmp_path / 'upload' | ||||||
|  |     # check that uploaddir does not exist before servefile is started | ||||||
|  |     assert not uploaddir.is_dir() | ||||||
|  | 
 | ||||||
|  |     run_servefile(['-u', str(uploaddir)]) | ||||||
|  | 
 | ||||||
|  |     # check that servefile created the directory | ||||||
|  |     assert uploaddir.is_dir() | ||||||
|  | 
 | ||||||
|  |     # check upload form present | ||||||
|  |     r = make_request() | ||||||
|  |     assert r.status_code == 200 | ||||||
|  |     assert 'multipart/form-data' in r.text | ||||||
|  | 
 | ||||||
|  |     # upload file | ||||||
|  |     files = {'file': ('haiku.txt', data)} | ||||||
|  |     r = make_request(method='post', files=files) | ||||||
|  |     assert 'Thanks' in r.text | ||||||
|  |     assert r.status_code == 200 | ||||||
|  |     with open(str(uploaddir / 'haiku.txt')) as f: | ||||||
|  |         assert f.read() == data | ||||||
|  | 
 | ||||||
|  |     # upload file AGAIN!! (and check it is available unter a different name) | ||||||
|  |     files = {'file': ('haiku.txt', data)} | ||||||
|  |     r = make_request(method='post', files=files) | ||||||
|  |     assert r.status_code == 200 | ||||||
|  |     with open(str(uploaddir / 'haiku.txt(1)')) as f: | ||||||
|  |         assert f.read() == data | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def test_upload_size_limit(run_servefile, tmp_path): | ||||||
|  |     uploaddir = tmp_path / 'upload' | ||||||
|  |     run_servefile(['-s', '2kb', '-u', str(uploaddir)]) | ||||||
|  | 
 | ||||||
|  |     # upload file that is too big | ||||||
|  |     files = {'file': ('toobig', "x" * 2049)} | ||||||
|  |     r = make_request(method='post', files=files) | ||||||
|  |     assert 'Your file was too big' in r.text | ||||||
|  |     assert r.status_code == 413 | ||||||
|  |     assert not (uploaddir / 'toobig').exists() | ||||||
|  | 
 | ||||||
|  |     # upload file that should fit | ||||||
|  |     # the size has to be smaller than 2kb, as the sent size also includes mime-headers | ||||||
|  |     files = {'file': ('justright', "x" * 1900)} | ||||||
|  |     r = make_request(method='post', files=files) | ||||||
|  |     assert r.status_code == 200 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def test_tar_mode(run_servefile, datadir): | ||||||
|  |     d = { | ||||||
|  |         'foo': { | ||||||
|  |             'bar': 'hello testmode my old friend', | ||||||
|  |             'baz': 'you came to test me once again', | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |     p = datadir(d) | ||||||
|  |     run_servefile(['-t', str(p / 'foo')]) | ||||||
|  | 
 | ||||||
|  |     # test redirect? | ||||||
|  | 
 | ||||||
|  |     # test contents of tar file | ||||||
|  |     r = make_request() | ||||||
|  |     assert r.status_code == 200 | ||||||
|  |     tar = tarfile.open(fileobj=io.BytesIO(r.content)) | ||||||
|  |     assert len(tar.getmembers()) == 3 | ||||||
|  |     assert tar.getmember('foo').isdir() | ||||||
|  |     for filename, content in d['foo'].items(): | ||||||
|  |         info = tar.getmember('foo/{}'.format(filename)) | ||||||
|  |         assert info.isfile | ||||||
|  |         assert tar.extractfile(info.path).read().decode() == content | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def test_tar_compression(run_servefile, datadir): | ||||||
|  |     d = {'foo': 'blubb'} | ||||||
|  |     p = datadir(d) | ||||||
|  |     run_servefile(['-c', 'gzip', '-t', str(p / 'foo')]) | ||||||
|  | 
 | ||||||
|  |     r = make_request() | ||||||
|  |     assert r.status_code == 200 | ||||||
|  |     tar = tarfile.open(fileobj=io.BytesIO(r.content), mode='r:gz') | ||||||
|  |     assert len(tar.getmembers()) == 1 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def test_https(run_servefile, datadir): | ||||||
|  |     data = "NOOT NOOT" | ||||||
|  |     p = datadir({'testfile': data}) / 'testfile' | ||||||
|  |     run_servefile(['--ssl', str(p)]) | ||||||
|  |     time.sleep(0.2)  # time for generating ssl certificates | ||||||
|  | 
 | ||||||
|  |     # fingerprint = None | ||||||
|  |     # while not fingerprint: | ||||||
|  |     #     line = s.stdout.readline() | ||||||
|  |     #     print(line) | ||||||
|  |     #     # if we find this line we went too far... | ||||||
|  |     #     assert not line.startswith("Some addresses this file will be available at") | ||||||
|  | 
 | ||||||
|  |     #     if line.startswith("SHA1 fingerprint"): | ||||||
|  |     #         fingerprint = line.replace("SHA1 fingerprint: ", "").strip() | ||||||
|  |     #         break | ||||||
|  | 
 | ||||||
|  |     # assert fingerprint | ||||||
|  |     urllib3.disable_warnings() | ||||||
|  |     check_download(data, protocol='https', verify=False) | ||||||
		Loading…
	
		Reference in New Issue