| 
							
							
							
						 |  |  | @ -0,0 +1,275 @@ | 
		
	
		
			
				|  |  |  |  | import io | 
		
	
		
			
				|  |  |  |  | import pytest | 
		
	
		
			
				|  |  |  |  | import requests | 
		
	
		
			
				|  |  |  |  | import subprocess | 
		
	
		
			
				|  |  |  |  | import tarfile | 
		
	
		
			
				|  |  |  |  | import time | 
		
	
		
			
				|  |  |  |  | import urllib3 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | # crudly written to learn more about pytest and to have a base for refactoring | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | @pytest.fixture | 
		
	
		
			
				|  |  |  |  | def run_servefile(): | 
		
	
		
			
				|  |  |  |  |     instances = [] | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     def _run_servefile(args, **kwargs): | 
		
	
		
			
				|  |  |  |  |         if not isinstance(args, list): | 
		
	
		
			
				|  |  |  |  |             args = [args] | 
		
	
		
			
				|  |  |  |  |         print("running with args", args) | 
		
	
		
			
				|  |  |  |  |         p = subprocess.Popen(['servefile'] + args, **kwargs) | 
		
	
		
			
				|  |  |  |  |         time.sleep(kwargs.get('timeout', 0.3)) | 
		
	
		
			
				|  |  |  |  |         instances.append(p) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |         return p | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     yield _run_servefile | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     for instance in instances: | 
		
	
		
			
				|  |  |  |  |         try: | 
		
	
		
			
				|  |  |  |  |             instance.terminate() | 
		
	
		
			
				|  |  |  |  |         except OSError: | 
		
	
		
			
				|  |  |  |  |             pass | 
		
	
		
			
				|  |  |  |  |         instance.wait() | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | @pytest.fixture | 
		
	
		
			
				|  |  |  |  | def datadir(tmp_path): | 
		
	
		
			
				|  |  |  |  |     def _datadir(data, path=None): | 
		
	
		
			
				|  |  |  |  |         path = path or tmp_path | 
		
	
		
			
				|  |  |  |  |         for k, v in data.items(): | 
		
	
		
			
				|  |  |  |  |             if isinstance(v, dict): | 
		
	
		
			
				|  |  |  |  |                 new_path = path / k | 
		
	
		
			
				|  |  |  |  |                 new_path.mkdir() | 
		
	
		
			
				|  |  |  |  |                 _datadir(v, new_path) | 
		
	
		
			
				|  |  |  |  |             else: | 
		
	
		
			
				|  |  |  |  |                 (path / k).write_text(v.decode()) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |         return path | 
		
	
		
			
				|  |  |  |  |     return _datadir | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | @pytest.fixture(scope='class') | 
		
	
		
			
				|  |  |  |  | def servefile_simple(run_servefile, datadir): | 
		
	
		
			
				|  |  |  |  |     data = "NOOT NOOT" | 
		
	
		
			
				|  |  |  |  |     p = datadir({'testfile': data}) / 'testfile' | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     run_servefile(str(p)) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     return dict(host='localhost', port=8080, protocol='http') | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def download_ok(r, expected_data, expected_code=200): | 
		
	
		
			
				|  |  |  |  |     assert r.status_code == expected_code | 
		
	
		
			
				|  |  |  |  |     assert r.text == expected_data | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def make_request(path='/', host='localhost', port=8080, method='get', protocol='http', **kwargs): | 
		
	
		
			
				|  |  |  |  |     url = '{}://{}:{}{}'.format(protocol, host, port, path) | 
		
	
		
			
				|  |  |  |  |     r = getattr(requests, method)(url, **kwargs) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     return r | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def check_download(expected_data=None, path='/', status_code=200, **kwargs): | 
		
	
		
			
				|  |  |  |  |     r = make_request(path, **kwargs) | 
		
	
		
			
				|  |  |  |  |     assert r.status_code == 200 | 
		
	
		
			
				|  |  |  |  |     assert r.text == expected_data | 
		
	
		
			
				|  |  |  |  |     assert r.headers.get('Content-Type') == 'application/octet-stream' | 
		
	
		
			
				|  |  |  |  |     # assert r.headers.get('Content-Disposition') == 'attachment; filename=""' | 
		
	
		
			
				|  |  |  |  |     assert r.headers.get('Content-Transfer-Encoding') == 'binary' | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     return r  # for additional tests | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def test_version(run_servefile): | 
		
	
		
			
				|  |  |  |  |     s = run_servefile('--version', stderr=subprocess.PIPE) | 
		
	
		
			
				|  |  |  |  |     s.wait() | 
		
	
		
			
				|  |  |  |  |     version = s.stderr.readline().strip() | 
		
	
		
			
				|  |  |  |  |     assert version == 'servefile 0.4.5-unreleased' | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def test_correct_headers(run_servefile, datadir): | 
		
	
		
			
				|  |  |  |  |     data = "NOOT NOOT" | 
		
	
		
			
				|  |  |  |  |     p = datadir({'testfile': data}) / 'testfile' | 
		
	
		
			
				|  |  |  |  |     run_servefile(str(p)) | 
		
	
		
			
				|  |  |  |  |     r = make_request() | 
		
	
		
			
				|  |  |  |  |     assert r.status_code == 200 | 
		
	
		
			
				|  |  |  |  |     assert r.headers.get('Content-Type') == 'application/octet-stream' | 
		
	
		
			
				|  |  |  |  |     assert r.headers.get('Content-Disposition') == 'attachment; filename="testfile"' | 
		
	
		
			
				|  |  |  |  |     assert r.headers.get('Content-Transfer-Encoding') == 'binary' | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def test_redirect_and_download(run_servefile, datadir): | 
		
	
		
			
				|  |  |  |  |     data = "NOOT NOOT" | 
		
	
		
			
				|  |  |  |  |     p = datadir({'testfile': data}) / 'testfile' | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     run_servefile(str(p)) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     r = make_request(allow_redirects=False) | 
		
	
		
			
				|  |  |  |  |     assert r.status_code == 302 | 
		
	
		
			
				|  |  |  |  |     assert r.headers.get('Location') == '/testfile' | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     r = requests.get("http://127.0.0.1:8080") | 
		
	
		
			
				|  |  |  |  |     download_ok(r, data) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def test_specify_port(run_servefile, datadir): | 
		
	
		
			
				|  |  |  |  |     data = "NOOT NOOT" | 
		
	
		
			
				|  |  |  |  |     p = datadir({'testfile': data}) / 'testfile' | 
		
	
		
			
				|  |  |  |  |     run_servefile([str(p), '-p', '8081']) | 
		
	
		
			
				|  |  |  |  |     r = make_request(port=8081) | 
		
	
		
			
				|  |  |  |  |     download_ok(r, data) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def test_big_download(run_servefile, datadir): | 
		
	
		
			
				|  |  |  |  |     # test with about 10 mb of data | 
		
	
		
			
				|  |  |  |  |     data = "x" * (10 * 1024 ** 2) | 
		
	
		
			
				|  |  |  |  |     p = datadir({'testfile': data}) / 'testfile' | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     run_servefile(str(p)) | 
		
	
		
			
				|  |  |  |  |     r = make_request() | 
		
	
		
			
				|  |  |  |  |     download_ok(r, data) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def test_authentication(run_servefile, datadir): | 
		
	
		
			
				|  |  |  |  |     data = "NOOT NOOT" | 
		
	
		
			
				|  |  |  |  |     p = datadir({'testfile': data}) / 'testfile' | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     run_servefile([str(p), '-a', 'user:password']) | 
		
	
		
			
				|  |  |  |  |     for auth in [('foo', 'bar'), ('user', 'wrong'), ('unknown', 'password')]: | 
		
	
		
			
				|  |  |  |  |         r = make_request(auth=auth) | 
		
	
		
			
				|  |  |  |  |         assert '401 - Unauthorized' in r.text | 
		
	
		
			
				|  |  |  |  |         assert r.status_code == 401 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     r = make_request(auth=('user', 'password')) | 
		
	
		
			
				|  |  |  |  |     download_ok(r, data) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def test_serve_directory(run_servefile, datadir): | 
		
	
		
			
				|  |  |  |  |     d = { | 
		
	
		
			
				|  |  |  |  |         'foo': {'kratzbaum': 'cat', 'I like Cats!': 'kitteh', '&&&&&&&': 'wheee'}, | 
		
	
		
			
				|  |  |  |  |         'bar': {'thisisaverylongfilenamefortestingthatthisstillworksproperly': 'jup!'}, | 
		
	
		
			
				|  |  |  |  |         'noot': 'still data in here', | 
		
	
		
			
				|  |  |  |  |         'bigfile': 'x' * (10 * 1024 ** 2), | 
		
	
		
			
				|  |  |  |  |     } | 
		
	
		
			
				|  |  |  |  |     p = datadir(d) | 
		
	
		
			
				|  |  |  |  |     run_servefile([str(p), '-l']) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     # check if all files are in directory listing | 
		
	
		
			
				|  |  |  |  |     # (could be made more sophisticated with beautifulsoup) | 
		
	
		
			
				|  |  |  |  |     for path in '/', '/../': | 
		
	
		
			
				|  |  |  |  |         r = make_request(path) | 
		
	
		
			
				|  |  |  |  |         for k in d: | 
		
	
		
			
				|  |  |  |  |             assert k in r.text | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     for fname, content in d['foo'].items(): | 
		
	
		
			
				|  |  |  |  |         check_download(content, '/foo/' + fname) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     r = make_request('/unknown') | 
		
	
		
			
				|  |  |  |  |     assert r.status_code == 404 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     # download | 
		
	
		
			
				|  |  |  |  |     check_download('jup!', '/bar/thisisaverylongfilenamefortestingthatthisstillworksproperly') | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def test_upload(run_servefile, tmp_path): | 
		
	
		
			
				|  |  |  |  |     data = ('this is my live now\n' | 
		
	
		
			
				|  |  |  |  |             'uploading strings to servers\n' | 
		
	
		
			
				|  |  |  |  |             'so very joyful') | 
		
	
		
			
				|  |  |  |  |     uploaddir = tmp_path / 'upload' | 
		
	
		
			
				|  |  |  |  |     # check that uploaddir does not exist before servefile is started | 
		
	
		
			
				|  |  |  |  |     assert not uploaddir.is_dir() | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     run_servefile(['-u', str(uploaddir)]) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     # check that servefile created the directory | 
		
	
		
			
				|  |  |  |  |     assert uploaddir.is_dir() | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     # check upload form present | 
		
	
		
			
				|  |  |  |  |     r = make_request() | 
		
	
		
			
				|  |  |  |  |     assert r.status_code == 200 | 
		
	
		
			
				|  |  |  |  |     assert 'multipart/form-data' in r.text | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     # upload file | 
		
	
		
			
				|  |  |  |  |     files = {'file': ('haiku.txt', data.decode())} | 
		
	
		
			
				|  |  |  |  |     r = make_request(method='post', files=files) | 
		
	
		
			
				|  |  |  |  |     assert 'Thanks' in r.text | 
		
	
		
			
				|  |  |  |  |     assert r.status_code == 200 | 
		
	
		
			
				|  |  |  |  |     with open(str(uploaddir / 'haiku.txt')) as f: | 
		
	
		
			
				|  |  |  |  |         assert f.read() == data | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     # upload file AGAIN!! (and check it is available unter a different name) | 
		
	
		
			
				|  |  |  |  |     files = {'file': ('haiku.txt', data.decode())} | 
		
	
		
			
				|  |  |  |  |     r = make_request(method='post', files=files) | 
		
	
		
			
				|  |  |  |  |     assert r.status_code == 200 | 
		
	
		
			
				|  |  |  |  |     with open(str(uploaddir / 'haiku.txt(1)')) as f: | 
		
	
		
			
				|  |  |  |  |         assert f.read() == data | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def test_upload_size_limit(run_servefile, tmp_path): | 
		
	
		
			
				|  |  |  |  |     uploaddir = tmp_path / 'upload' | 
		
	
		
			
				|  |  |  |  |     run_servefile(['-s', '2kb', '-u', str(uploaddir)]) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     # upload file that is too big | 
		
	
		
			
				|  |  |  |  |     files = {'file': ('toobig', "x" * 2049)} | 
		
	
		
			
				|  |  |  |  |     r = make_request(method='post', files=files) | 
		
	
		
			
				|  |  |  |  |     assert 'Your file was too big' in r.text | 
		
	
		
			
				|  |  |  |  |     assert r.status_code == 413 | 
		
	
		
			
				|  |  |  |  |     assert not (uploaddir / 'toobig').exists() | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     # upload file that should fit | 
		
	
		
			
				|  |  |  |  |     # the size has to be smaller than 2kb, as the sent size also includes mime-headers | 
		
	
		
			
				|  |  |  |  |     files = {'file': ('justright', "x" * 1900)} | 
		
	
		
			
				|  |  |  |  |     r = make_request(method='post', files=files) | 
		
	
		
			
				|  |  |  |  |     assert r.status_code == 200 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def test_tar_mode(run_servefile, datadir): | 
		
	
		
			
				|  |  |  |  |     d = { | 
		
	
		
			
				|  |  |  |  |         'foo': { | 
		
	
		
			
				|  |  |  |  |             'bar': 'hello testmode my old friend', | 
		
	
		
			
				|  |  |  |  |             'baz': 'you came to test me once again', | 
		
	
		
			
				|  |  |  |  |         } | 
		
	
		
			
				|  |  |  |  |     } | 
		
	
		
			
				|  |  |  |  |     p = datadir(d) | 
		
	
		
			
				|  |  |  |  |     run_servefile(['-t', str(p / 'foo')]) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     # test redirect? | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     # test contents of tar file | 
		
	
		
			
				|  |  |  |  |     r = make_request() | 
		
	
		
			
				|  |  |  |  |     assert r.status_code == 200 | 
		
	
		
			
				|  |  |  |  |     tar = tarfile.open(fileobj=io.BytesIO(r.text.encode())) | 
		
	
		
			
				|  |  |  |  |     assert len(tar.getmembers()) == 3 | 
		
	
		
			
				|  |  |  |  |     assert tar.getmember('foo').isdir() | 
		
	
		
			
				|  |  |  |  |     for filename, content in d['foo'].items(): | 
		
	
		
			
				|  |  |  |  |         info = tar.getmember('foo/{}'.format(filename)) | 
		
	
		
			
				|  |  |  |  |         assert info.isfile | 
		
	
		
			
				|  |  |  |  |         assert tar.extractfile(info.path).read() == content | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def test_tar_compression(run_servefile, datadir): | 
		
	
		
			
				|  |  |  |  |     pass | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def test_https(run_servefile, datadir): | 
		
	
		
			
				|  |  |  |  |     data = "NOOT NOOT" | 
		
	
		
			
				|  |  |  |  |     p = datadir({'testfile': data}) / 'testfile' | 
		
	
		
			
				|  |  |  |  |     run_servefile(['--ssl', str(p)]) | 
		
	
		
			
				|  |  |  |  |     time.sleep(0.2)  # time for generating ssl certificates | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     # fingerprint = None | 
		
	
		
			
				|  |  |  |  |     # while not fingerprint: | 
		
	
		
			
				|  |  |  |  |     #     line = s.stdout.readline() | 
		
	
		
			
				|  |  |  |  |     #     print(line) | 
		
	
		
			
				|  |  |  |  |     #     # if we find this line we went too far... | 
		
	
		
			
				|  |  |  |  |     #     assert not line.startswith("Some addresses this file will be available at") | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     #     if line.startswith("SHA1 fingerprint"): | 
		
	
		
			
				|  |  |  |  |     #         fingerprint = line.replace("SHA1 fingerprint: ", "").strip() | 
		
	
		
			
				|  |  |  |  |     #         break | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     # assert fingerprint | 
		
	
		
			
				|  |  |  |  |     urllib3.disable_warnings() | 
		
	
		
			
				|  |  |  |  |     check_download(data, protocol='https', verify=False) |