Compare commits

..

1 Commits

Author SHA1 Message Date
Sebastian Lohff 41a0f64ff7 Explicitly set encoding for http requests in tests
Due to the upgrade to charset-normalizer 2.0.4 guessing the encoding
inside the tests did not work anymore and caused the umlaut tests to
fail. Explicitly specifying the encoding on the requests' response
object fixes this.
2021-09-07 23:12:01 +02:00
7 changed files with 993 additions and 1104 deletions

View File

@ -11,7 +11,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python: [2.7, 3.7, 3.8, 3.9, "3.10", 3.11]
python: [2.7, 3.6, 3.7, 3.8, 3.9]
steps:
- uses: actions/checkout@v2
@ -23,15 +23,3 @@ jobs:
run: pip install tox
- name: Run Tox
run: tox -e py
pep8:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Setup python
uses: actions/setup-python@v2
with:
python-version: 3.9
- name: Install Tox
run: pip install tox
- name: Run Tox pep8
run: "tox -e pep8"

View File

@ -1,29 +1,9 @@
servefile changelog
===================
2023-01-23 v0.5.4
-----------------
0.5.4 released
* code reformatting for better maintainability
* upload to uploaddir instead of /tmp for large files
* add python3.10 / python3.11 support
* drop python3.6 support
2021-11-18 v0.5.3
-----------------
0.5.3 released
* improved test performance
2021-09-08 v0.5.2
-----------------
0.5.2 released
Unreleased
----------
* fixed bug where exception was shown on transmission abort with python3
* fixed wrong/outdated pyopenssl package names

View File

@ -1,4 +1,4 @@
.TH SERVEFILE 1 "January 2023" "servefile 0.5.4" "User Commands"
.TH SERVEFILE 1 "September 2020" "servefile 0.5.1" "User Commands"
.SH NAME
servefile \- small HTTP-Server for temporary file transfer

View File

@ -7,10 +7,11 @@
from __future__ import print_function
__version__ = '0.5.4'
__version__ = '0.5.1'
import argparse
import base64
import cgi
import datetime
import io
import mimetypes
@ -20,9 +21,7 @@ import select
import socket
from subprocess import Popen, PIPE
import sys
import tempfile
import time
import warnings
# fix imports for python2/python3
try:
@ -43,18 +42,11 @@ try:
except ImportError:
pass
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=DeprecationWarning)
# scheduled for removal in python3.13, used for FieldStorage
import cgi
def getDateStrNow():
""" Get the current time formatted for HTTP header """
now = datetime.datetime.fromtimestamp(time.mktime(time.gmtime()))
return now.strftime("%a, %d %b %Y %H:%M:%S GMT")
class FileBaseHandler(BaseHTTPServer.BaseHTTPRequestHandler):
fileName = None
blockSize = 1024 * 1024
@ -114,7 +106,7 @@ class FileBaseHandler(BaseHTTPServer.BaseHTTPRequestHandler):
except ValueError:
return (False, None)
if fromto[0] >= fileLength or fromto[0] < 0 or fromto[1] >= fileLength or fromto[1] - fromto[0] < 0:
if fromto[0] >= fileLength or fromto[0] < 0 or fromto[1] >= fileLength or fromto[1]-fromto[0] < 0:
# oops, already done! (requested range out of range)
self.send_response(416)
self.send_header('Content-Range', 'bytes */%d' % fileLength)
@ -149,7 +141,7 @@ class FileBaseHandler(BaseHTTPServer.BaseHTTPRequestHandler):
# now we can wind the file *brrrrrr*
myfile.seek(fromto[0])
if fromto is not None:
if fromto != None:
self.send_response(216)
self.send_header('Content-Range', 'bytes %d-%d/%d' % (fromto[0], fromto[1], fileLength))
fileLength = fromto[1] - fromto[0] + 1
@ -170,8 +162,8 @@ class FileBaseHandler(BaseHTTPServer.BaseHTTPRequestHandler):
return True
def getChunk(self, myfile, fromto):
if fromto and myfile.tell() + self.blockSize >= fromto[1]:
readsize = fromto[1] - myfile.tell() + 1
if fromto and myfile.tell()+self.blockSize >= fromto[1]:
readsize = fromto[1]-myfile.tell()+1
else:
readsize = self.blockSize
return myfile.read(readsize)
@ -254,7 +246,7 @@ class TarFileHandler(FileBaseHandler):
# give the process a short time to find out if it can
# pack/compress the file
time.sleep(0.05)
if tarCmd.poll() is not None and tarCmd.poll() != 0:
if tarCmd.poll() != None and tarCmd.poll() != 0:
# something went wrong
print("Error while compressing '%s'. Aborting request." % self.target)
self.send_response(500)
@ -424,7 +416,7 @@ class DirListingHandler(FileBaseHandler):
</tr>
</thead>
<tbody>
""" % {'path': os.path.normpath(unquote(self.path))} # noqa: E501
""" % {'path': os.path.normpath(unquote(self.path))}
footer = """</tbody></table></div>
<div class="footer"><a href="http://seba-geek.de/stuff/servefile/">servefile %(version)s</a></div>
<script>
@ -510,7 +502,7 @@ class DirListingHandler(FileBaseHandler):
dir_items = list()
file_items = list()
for item in [".."] + sorted(os.listdir(path), key=lambda x: x.lower()):
for item in [".."] + sorted(os.listdir(path), key=lambda x:x.lower()):
# create path to item
itemPath = os.path.join(path, item)
@ -532,7 +524,10 @@ class DirListingHandler(FileBaseHandler):
target_items.append((item, itemPath, stat))
# Directories first, then files
for (tuple_list, is_dir) in ((dir_items, True), (file_items, False)):
for (tuple_list, is_dir) in (
(dir_items, True),
(file_items, False),
):
for (item, itemPath, stat) in tuple_list:
self._appendToListing(content, item, itemPath, stat, is_dir=is_dir)
@ -614,25 +609,8 @@ class FilePutter(BaseHTTPServer.BaseHTTPRequestHandler):
# create FieldStorage object for multipart parsing
env = os.environ
env['REQUEST_METHOD'] = "POST"
targetDir = self.targetDir
class CustomFieldStorage(cgi.FieldStorage):
def make_file(self, *args, **kwargs):
"""Overwritten to use a named file and the upload directory
Python 2.7 has an unused "binary" argument while Python 3 does
not have any arguments. Python 2.7 does not have a
self._binary_file attribute.
"""
if sys.version_info.major == 2 or self._binary_file:
return tempfile.NamedTemporaryFile("wb+", dir=targetDir)
else:
return tempfile.NamedTemporaryFile(
"w+", encoding=self.encoding, newline='\n', dir=targetDir)
fstorage = CustomFieldStorage(fp=self.rfile, headers=self.headers, environ=env)
if "file" not in fstorage:
fstorage = cgi.FieldStorage(fp=self.rfile, headers=self.headers, environ=env)
if not "file" in fstorage:
self.sendResponse(400, "No file found in request.")
return
@ -641,14 +619,7 @@ class FilePutter(BaseHTTPServer.BaseHTTPRequestHandler):
self.sendResponse(400, "Filename was empty or invalid")
return
# put the file at the right place, send 200 afterwards
if getattr(fstorage["file"].file, "name", None):
# the sent file was large, so we can just hard link the temporary
# file and are done
os.link(fstorage["file"].file.name, destFileName)
else:
# write file to disk. it was small enough so no temporary file was
# created
# write file down to disk, send a 200 afterwards
target = open(destFileName, "wb")
bytesLeft = length
while bytesLeft > 0:
@ -706,8 +677,7 @@ class FilePutter(BaseHTTPServer.BaseHTTPRequestHandler):
self.sendResponse(411, "Content-Length was invalid or not set.")
return -1
if self.maxUploadSize > 0 and length > self.maxUploadSize:
self.sendResponse(413, "Your file was too big! Maximum allowed size is %d byte. <a href=\"/\">back</a>" %
self.maxUploadSize)
self.sendResponse(413, "Your file was too big! Maximum allowed size is %d byte. <a href=\"/\">back</a>" % self.maxUploadSize)
return -1
return length
@ -744,7 +714,6 @@ class FilePutter(BaseHTTPServer.BaseHTTPRequestHandler):
return extraDestFileName
# never reached
class ThreadedHTTPServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
def handle_error(self, request, client_address):
_, exc_value, _ = sys.exc_info()
@ -820,7 +789,6 @@ class SecureHandler():
self.rfile = socket._fileobject(self.request, "rb", self.rbufsize)
self.wfile = socket._fileobject(self.request, "wb", self.wbufsize)
class ServeFileException(Exception):
pass
@ -845,8 +813,7 @@ class ServeFile():
if self.serveMode not in range(self._NUM_MODES):
self.serveMode = None
raise ValueError("Unknown serve mode, needs to be MODE_SINGLE, "
"MODE_SINGLETAR, MODE_UPLOAD or MODE_DIRLIST.")
raise ValueError("Unknown serve mode, needs to be MODE_SINGLE, MODE_SINGLETAR, MODE_UPLOAD or MODE_DIRLIST.")
def setIPv4(self, ipv4):
""" En- or disable ipv4 """
@ -860,23 +827,23 @@ class ServeFile():
""" Get IPs from all interfaces via ip or ifconfig. """
# ip and ifconfig sometimes are located in /sbin/
os.environ['PATH'] += ':/sbin:/usr/sbin'
proc = Popen(r"ip addr|"
r"sed -n -e 's/.*inet6\{0,1\} \([0-9.a-fA-F:]\+\).*/\1/ p'|"
r"grep -v '^fe80\|^127.0.0.1\|^::1'",
proc = Popen(r"ip addr|" + \
"sed -n -e 's/.*inet6\{0,1\} \([0-9.a-fA-F:]\+\).*/\\1/ p'|" + \
"grep -v '^fe80\|^127.0.0.1\|^::1'", \
shell=True, stdout=PIPE, stderr=PIPE)
if proc.wait() != 0:
# ip failed somehow, falling back to ifconfig
oldLang = os.environ.get("LC_ALL", None)
os.environ['LC_ALL'] = "C"
proc = Popen(r"ifconfig|"
r"sed -n 's/.*inet6\{0,1\}\( addr:\)\{0,1\} \{0,1\}\([0-9a-fA-F.:]*\).*/"
r"\2/p'|"
r"grep -v '^fe80\|^127.0.0.1\|^::1'",
proc = Popen(r"ifconfig|" + \
"sed -n 's/.*inet6\{0,1\}\( addr:\)\{0,1\} \{0,1\}\([0-9a-fA-F.:]*\).*/" + \
"\\2/p'|" + \
"grep -v '^fe80\|^127.0.0.1\|^::1'", \
shell=True, stdout=PIPE, stderr=PIPE)
if oldLang:
os.environ['LC_ALL'] = oldLang
else:
del os.environ['LC_ALL']
del(os.environ['LC_ALL'])
if proc.wait() != 0:
# we couldn't find any ip address
proc = None
@ -919,7 +886,7 @@ class ServeFile():
req = crypto.X509Req()
subj = req.get_subject()
subj.CN = "127.0.0.1"
subj.O = "servefile laboratories" # noqa: E741
subj.O = "servefile laboratories"
subj.OU = "servefile"
# generate altnames
@ -940,7 +907,7 @@ class ServeFile():
# with the same serial ==> we just use the seconds as serial.
cert.set_serial_number(int(time.time()))
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(365 * 24 * 60 * 60)
cert.gmtime_adj_notAfter(365*24*60*60)
cert.set_issuer(req.get_subject())
cert.set_subject(req.get_subject())
cert.add_extensions([ext])
@ -984,8 +951,7 @@ class ServeFile():
server = SecureThreadedHTTPServer(self._getCert(), self._getKey(),
(listenIp, self.port), handler, bind_and_activate=False)
except SSL.Error as e:
raise ServeFileException("SSL error: Could not read SSL public/private key "
"from file(s) (error was: \"%s\")" % (e[0][0][2],))
raise ServeFileException("SSL error: Could not read SSL public/private key from file(s) (error was: \"%s\")" % (e[0][0][2],))
else:
server = ThreadedHTTPServer((listenIp, self.port), handler,
bind_and_activate=False)
@ -1018,7 +984,7 @@ class ServeFile():
print("Serving \"%s\" for uploads at port %d." % (self.target, self.port))
# print urls with local network adresses
print("\nSome addresses %s will be available at:" %
print("\nSome addresses %s will be available at:" % \
("this file" if (self.serveMode != self.MODE_UPLOAD) else "the uploadform", ))
ips = self.getIPs()
if not ips or len(ips) == 0 or ips[0] == '':
@ -1075,8 +1041,7 @@ class ServeFile():
try:
os.mkdir(self.target)
except (IOError, OSError) as e:
raise ServeFileException("Error: Could not create directory '%s' for uploads, %r" %
(self.target, str(e)))
raise ServeFileException("Error: Could not create directory '%s' for uploads, %r" % (self.target, str(e)))
else:
raise ServeFileException("Error: Upload directory already exists and is a file.")
FilePutter.targetDir = os.path.abspath(self.target)
@ -1095,7 +1060,6 @@ class ServeFile():
AuthenticationHandler.authString = self.auth
if self.authrealm:
AuthenticationHandler.realm = self.authrealm
class AuthenticatedHandler(AuthenticationHandler, handler):
pass
handler = AuthenticatedHandler
@ -1141,8 +1105,7 @@ class AuthenticationHandler():
self.send_response(401)
self.send_header("WWW-Authenticate", "Basic realm=\"%s\"" % self.realm)
self.send_header("Connection", "close")
errorMsg = ("<html><head><title>401 - Unauthorized</title></head>"
"<body><h1>401 - Unauthorized</h1></body></html>")
errorMsg = "<html><head><title>401 - Unauthorized</title></head><body><h1>401 - Unauthorized</h1></body></html>"
self.send_header("Content-Length", str(len(errorMsg)))
self.end_headers()
self.wfile.write(errorMsg.encode())
@ -1152,35 +1115,32 @@ def main():
parser = argparse.ArgumentParser(prog='servefile', description='Serve a single file via HTTP.')
parser.add_argument('--version', action='version', version='%(prog)s ' + __version__)
parser.add_argument('target', metavar='file/directory', type=str)
parser.add_argument('-p', '--port', type=int, default=8080,
parser.add_argument('-p', '--port', type=int, default=8080, \
help='Port to listen on')
parser.add_argument('-u', '--upload', action="store_true", default=False,
parser.add_argument('-u', '--upload', action="store_true", default=False, \
help="Enable uploads to a given directory")
parser.add_argument('-s', '--max-upload-size', type=str,
parser.add_argument('-s', '--max-upload-size', type=str, \
help="Limit upload size in kB. Size modifiers are allowed, e.g. 2G, 12MB, 1B")
parser.add_argument('-l', '--list-dir', action="store_true", default=False,
parser.add_argument('-l', '--list-dir', action="store_true", default=False, \
help="Show directory indexes and allow access to all subdirectories")
parser.add_argument('--ssl', action="store_true", default=False,
parser.add_argument('--ssl', action="store_true", default=False, \
help="Enable SSL. If no key/cert is specified one will be generated")
parser.add_argument('--key', type=str,
help="Keyfile to use for SSL. If no cert is given with --cert the keyfile "
"will also be searched for a cert")
parser.add_argument('--cert', type=str,
parser.add_argument('--key', type=str, \
help="Keyfile to use for SSL. If no cert is given with --cert the keyfile will also be searched for a cert")
parser.add_argument('--cert', type=str, \
help="Certfile to use for SSL")
parser.add_argument('-a', '--auth', type=str, metavar='user:password',
parser.add_argument('-a', '--auth', type=str, metavar='user:password', \
help="Set user and password for HTTP basic authentication")
parser.add_argument('--realm', type=str, default=None,
parser.add_argument('--realm', type=str, default=None,\
help="Set a realm for HTTP basic authentication")
parser.add_argument('-t', '--tar', action="store_true", default=False,
help="Enable on the fly tar creation for given file or directory. "
"Note: Download continuation will not be available")
parser.add_argument('-c', '--compression', type=str, metavar='method',
default="none",
help="Set compression method, only in combination with --tar. "
"Can be one of %s" % ", ".join(TarFileHandler.compressionMethods))
parser.add_argument('-4', '--ipv4-only', action="store_true", default=False,
parser.add_argument('-t', '--tar', action="store_true", default=False, \
help="Enable on the fly tar creation for given file or directory. Note: Download continuation will not be available")
parser.add_argument('-c', '--compression', type=str, metavar='method', \
default="none", \
help="Set compression method, only in combination with --tar. Can be one of %s" % ", ".join(TarFileHandler.compressionMethods))
parser.add_argument('-4', '--ipv4-only', action="store_true", default=False, \
help="Listen on IPv4 only")
parser.add_argument('-6', '--ipv6-only', action="store_true", default=False,
parser.add_argument('-6', '--ipv6-only', action="store_true", default=False, \
help="Listen on IPv6 only")
args = parser.parse_args()
@ -1196,7 +1156,7 @@ def main():
sys.exit(1)
if args.max_upload_size:
sizeRe = re.match(r"^(\d+(?:[,.]\d+)?)(?:([bkmgtpe])(?:(?<!b)b?)?)?$", args.max_upload_size.lower())
sizeRe = re.match("^(\d+(?:[,.]\d+)?)(?:([bkmgtpe])(?:(?<!b)b?)?)?$", args.max_upload_size.lower())
if not sizeRe:
print("Error: Your max upload size param is broken. Try something like 3M or 2.5Gb.")
sys.exit(1)
@ -1222,9 +1182,8 @@ def main():
if args.auth:
dpos = args.auth.find(":")
if dpos <= 0 or dpos == (len(args.auth) - 1):
print("Error: User and password for HTTP basic authentication need to be both "
"at least one character and have to be separated by a \":\".")
if dpos <= 0 or dpos == (len(args.auth)-1):
print("Error: User and password for HTTP basic authentication need to be both at least one character and have to be separated by a \":\".")
sys.exit(1)
if args.realm and not args.auth:
@ -1295,3 +1254,4 @@ def main():
if __name__ == '__main__':
main()

View File

@ -11,7 +11,7 @@ setup(
long_description=long_description,
long_description_content_type='text/markdown',
platforms='posix',
version='0.5.4',
version='0.5.1',
license='GPLv3 or later',
url='https://github.com/sebageek/servefile/',
author='Sebastian Lohff',
@ -38,11 +38,10 @@ setup(
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'Topic :: Communications',
'Topic :: Communications :: File Sharing',
'Topic :: Internet',

View File

@ -9,7 +9,6 @@ import sys
import tarfile
import time
import urllib3
from requests.exceptions import ConnectionError
# crudly written to learn more about pytest and to have a base for refactoring
@ -59,6 +58,7 @@ def run_servefile():
print("running {} with args {}".format(", ".join(servefile_path), args))
p = subprocess.Popen([sys.executable] + servefile_path + args, **kwargs)
time.sleep(kwargs.get('timeout', 0.3))
instances.append(p)
return p
@ -91,7 +91,7 @@ def datadir(tmp_path):
return _datadir
def make_request(path='/', host='localhost', port=SERVEFILE_DEFAULT_PORT, method='get', protocol='http',
def make_request(path='/', host='localhost', port=SERVEFILE_DEFAULT_PORT, method='get', protocol='http', timeout=5,
encoding='utf-8', **kwargs):
url = '{}://{}:{}{}'.format(protocol, host, port, path)
print('Calling {} on {} with {}'.format(method, url, kwargs))
@ -103,7 +103,7 @@ def make_request(path='/', host='localhost', port=SERVEFILE_DEFAULT_PORT, method
return r
def check_download(expected_data=None, path='/', fname=None, **kwargs):
def check_download(expected_data=None, path='/', fname=None, status_code=200, **kwargs):
if fname is None:
fname = os.path.basename(path)
r = make_request(path, **kwargs)
@ -117,22 +117,6 @@ def check_download(expected_data=None, path='/', fname=None, **kwargs):
return r # for additional tests
def _retry_while(exception, function, timeout=2):
now = time.time # float seconds since epoch
def wrapped(*args, **kwargs):
timeout_after = now() + timeout
while True:
try:
return function(*args, **kwargs)
except exception:
if now() >= timeout_after:
raise
time.sleep(0.1)
return wrapped
def _test_version(run_servefile, standalone):
# we expect the version on stdout (python3.4+) or stderr(python2.6-3.3)
s = run_servefile('--version', standalone=standalone, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
@ -146,7 +130,7 @@ def _test_version(run_servefile, standalone):
version = s.stdout.readline().decode().strip()
# hardcode version as string until servefile is a module
assert version == 'servefile 0.5.4'
assert version == 'servefile 0.5.1'
def test_version(run_servefile):
@ -163,7 +147,7 @@ def test_correct_headers(run_servefile, datadir):
p = datadir({'testfile': data}) / 'testfile'
run_servefile(str(p))
r = _retry_while(ConnectionError, make_request)()
r = make_request()
assert r.status_code == 200
assert r.headers.get('Content-Type') == 'application/octet-stream'
assert r.headers.get('Content-Disposition') == 'attachment; filename="testfile"'
@ -176,7 +160,7 @@ def test_redirect_and_download(run_servefile, datadir):
run_servefile(str(p))
# redirect
r = _retry_while(ConnectionError, make_request)(allow_redirects=False)
r = make_request(allow_redirects=False)
assert r.status_code == 302
assert r.headers.get('Location') == '/testfile'
@ -191,7 +175,7 @@ def test_redirect_and_download_with_umlaut(run_servefile, datadir):
run_servefile(str(p))
# redirect
r = _retry_while(ConnectionError, make_request)(allow_redirects=False)
r = make_request(allow_redirects=False)
assert r.status_code == 302
assert r.headers.get('Location') == '/{}'.format(quote(filename))
@ -206,7 +190,7 @@ def test_specify_port(run_servefile, datadir):
p = datadir({'testfile': data}) / 'testfile'
run_servefile([str(p), '-p', str(SERVEFILE_SECONDARY_PORT)])
_retry_while(ConnectionError, check_download)(data, fname='testfile', port=SERVEFILE_SECONDARY_PORT)
check_download(data, fname='testfile', port=SERVEFILE_SECONDARY_PORT)
def test_ipv4_only(run_servefile, datadir):
@ -214,7 +198,7 @@ def test_ipv4_only(run_servefile, datadir):
p = datadir({'testfile': data}) / 'testfile'
run_servefile([str(p), '-4'])
_retry_while(ConnectionError, check_download)(data, fname='testfile', host='127.0.0.1')
check_download(data, fname='testfile', host='127.0.0.1')
sock = socket.socket(socket.AF_INET6)
with pytest.raises(connrefused_exc):
@ -227,7 +211,7 @@ def test_big_download(run_servefile, datadir):
p = datadir({'testfile': data}) / 'testfile'
run_servefile(str(p))
_retry_while(ConnectionError, check_download)(data, fname='testfile')
check_download(data, fname='testfile')
def test_authentication(run_servefile, datadir):
@ -236,11 +220,11 @@ def test_authentication(run_servefile, datadir):
run_servefile([str(p), '-a', 'user:password'])
for auth in [('foo', 'bar'), ('user', 'wrong'), ('unknown', 'password')]:
r = _retry_while(ConnectionError, make_request)(auth=auth)
r = make_request(auth=auth)
assert '401 - Unauthorized' in r.text
assert r.status_code == 401
_retry_while(ConnectionError, check_download)(data, fname='testfile', auth=('user', 'password'))
check_download(data, fname='testfile', auth=('user', 'password'))
def test_serve_directory(run_servefile, datadir):
@ -257,12 +241,12 @@ def test_serve_directory(run_servefile, datadir):
# check if all files are in directory listing
# (could be made more sophisticated with beautifulsoup)
for path in '/', '/../':
r = _retry_while(ConnectionError, make_request)(path)
r = make_request(path)
for k in d:
assert quote(k) in r.text
for fname, content in d['foo'].items():
_retry_while(ConnectionError, check_download)(content, '/foo/' + fname)
check_download(content, '/foo/' + fname)
r = make_request('/unknown')
assert r.status_code == 404
@ -284,7 +268,7 @@ def test_serve_relative_directory(run_servefile, datadir):
# check if all files are in directory listing
# (could be made more sophisticated with beautifulsoup)
for path in '/', '/../':
r = _retry_while(ConnectionError, make_request)(path)
r = make_request(path)
for k in d:
assert k in r.text
@ -308,14 +292,14 @@ def test_upload(run_servefile, tmp_path):
run_servefile(['-u', str(uploaddir)])
# check upload form present
r = _retry_while(ConnectionError, make_request)()
assert r.status_code == 200
assert 'multipart/form-data' in r.text
# check that servefile created the directory
assert uploaddir.is_dir()
# check upload form present
r = make_request()
assert r.status_code == 200
assert 'multipart/form-data' in r.text
# upload file
files = {'file': ('haiku.txt', data)}
r = make_request(method='post', files=files)
@ -345,7 +329,7 @@ def test_upload_size_limit(run_servefile, tmp_path):
# upload file that is too big
files = {'file': ('toobig', "x" * 2049)}
r = _retry_while(ConnectionError, make_request)(method='post', files=files)
r = make_request(method='post', files=files)
assert 'Your file was too big' in r.text
assert r.status_code == 413
assert not (uploaddir / 'toobig').exists()
@ -357,20 +341,6 @@ def test_upload_size_limit(run_servefile, tmp_path):
assert r.status_code == 200
def test_upload_large_file(run_servefile, tmp_path):
# small files end up in BytesIO while large files get temporary files. this
# test makes sure we hit the large file codepath at least once
uploaddir = tmp_path / 'upload'
run_servefile(['-u', str(uploaddir)])
data = "asdf" * 1024
files = {'file': ('more_data.txt', data)}
r = _retry_while(ConnectionError, make_request)(method='post', files=files)
assert r.status_code == 200
with open(str(uploaddir / 'more_data.txt')) as f:
assert f.read() == data
def test_tar_mode(run_servefile, datadir):
d = {
'foo': {
@ -384,7 +354,7 @@ def test_tar_mode(run_servefile, datadir):
# test redirect?
# test contents of tar file
r = _retry_while(ConnectionError, make_request)()
r = make_request()
assert r.status_code == 200
tar = tarfile.open(fileobj=io.BytesIO(r.content))
assert len(tar.getmembers()) == 3
@ -400,7 +370,7 @@ def test_tar_compression(run_servefile, datadir):
p = datadir(d)
run_servefile(['-c', 'gzip', '-t', str(p / 'foo')])
r = _retry_while(ConnectionError, make_request)()
r = make_request()
assert r.status_code == 200
tar = tarfile.open(fileobj=io.BytesIO(r.content), mode='r:gz')
assert len(tar.getmembers()) == 1
@ -410,6 +380,7 @@ def test_https(run_servefile, datadir):
data = "NOOT NOOT"
p = datadir({'testfile': data}) / 'testfile'
run_servefile(['--ssl', str(p)])
time.sleep(0.2) # time for generating ssl certificates
# fingerprint = None
# while not fingerprint:
@ -424,7 +395,7 @@ def test_https(run_servefile, datadir):
# assert fingerprint
urllib3.disable_warnings()
_retry_while(ConnectionError, check_download)(data, protocol='https', verify=False)
check_download(data, protocol='https', verify=False)
def test_https_big_download(run_servefile, datadir):
@ -432,9 +403,10 @@ def test_https_big_download(run_servefile, datadir):
data = "x" * (10 * 1024 ** 2)
p = datadir({'testfile': data}) / 'testfile'
run_servefile(['--ssl', str(p)])
time.sleep(0.2) # time for generating ssl certificates
urllib3.disable_warnings()
_retry_while(ConnectionError, check_download)(data, protocol='https', verify=False)
check_download(data, protocol='https', verify=False)
def test_abort_download(run_servefile, datadir):
@ -447,7 +419,7 @@ def test_abort_download(run_servefile, datadir):
# provoke a connection abort
# hopefully the buffers will not fill up with all of the 10mb
sock = socket.socket(socket.AF_INET)
_retry_while(connrefused_exc, sock.connect)(("localhost", SERVEFILE_DEFAULT_PORT))
sock.connect(("localhost", SERVEFILE_DEFAULT_PORT))
sock.send(b"GET /testfile HTTP/1.0\n\n")
resp = sock.recv(100)
assert resp != b''

14
tox.ini
View File

@ -1,19 +1,9 @@
[tox]
envlist = py27,py37,py38,py39,py310,py311,pep8
envlist = py27,py36,py37,py38,py39
[testenv]
deps =
pathlib2; python_version<"3"
pytest
requests
flake8
commands = pytest -v --tb=short {posargs}
[testenv:pep8]
commands = flake8 servefile/ {posargs}
[flake8]
show-source = True
max-line-length = 120
ignore = E123,E125,E241,E402,E741,W503,W504,H301
exclude=.venv,.git,.tox,dist,doc,*lib/python*,*egg,build
commands = pytest --tb=short {posargs}