Compare commits

..

No commits in common. "master" and "v0.4.0" have entirely different histories.

13 changed files with 641 additions and 2152 deletions

View File

@ -1,37 +0,0 @@
name: Run Tox
on:
push:
branches:
- master
pull_request:
jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
python: [2.7, 3.7, 3.8, 3.9, "3.10", 3.11]
steps:
- uses: actions/checkout@v2
- name: Setup Python
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python }}
- name: Install Tox
run: pip install tox
- name: Run Tox
run: tox -e py
pep8:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Setup python
uses: actions/setup-python@v2
with:
python-version: 3.9
- name: Install Tox
run: pip install tox
- name: Run Tox pep8
run: "tox -e pep8"

5
.gitignore vendored
View File

@ -1,7 +1,2 @@
MANIFEST
dist/
*.pyc
__pycache__
*.swp
servefile.egg-info
.tox

125
ChangeLog
View File

@ -1,125 +0,0 @@
servefile changelog
===================
2023-01-23 v0.5.4
-----------------
0.5.4 released
* code reformatting for better maintainability
* upload to uploaddir instead of /tmp for large files
* add python3.10 / python3.11 support
* drop python3.6 support
2021-11-18 v0.5.3
-----------------
0.5.3 released
* improved test performance
2021-09-08 v0.5.2
-----------------
0.5.2 released
* fixed bug where exception was shown on transmission abort with python3
* fixed wrong/outdated pyopenssl package names
* tests are now using a free non-default port to avoid clashes; if
wished the ports can be set from outside by specifying the
environment variables SERVEFILE_DEFAULT_PORT and
SERVEFILE_SECONDARY_PORT
* fixed broken redirect when filename contained umlauts or other characters
that should have been quoted
* fixed broken special char handling in directory listing for python2
* drop python3.5 support
* fixed PUT uploads with python3 and documented PUT-uploads with curl
2020-10-30 v0.5.1
-----------------
0.5.1 released
* version bump for broken pypi release
2020-10-29 v0.5.0
-----------------
0.5.0 released
* python3 support
* test suite
* fixed an endless redirect loop when serving ../
* added sorting for list view
* added lzma/xz as compression method
2015-11-10 v0.4.4
-----------------
0.4.4 released
* prefer using TLS1.2/TLS1 with --ssl if available
* issue v3 certificates for self signed certificates with --ssl
* removed lots of unnecessary error output
* fixed a bug where wrong ranges were used on a HEAD request in directory listing mode
* fixed a bug where directory listing mode allowed path traversal
2013-12-28 v0.4.3
-----------------
0.4.3 released
* display user/password in url-list when authentication is used
* various directory-listing patches by Sebastian Pipping
* case-insensitive sorting
* sort directories to top
* hide .. in top directory
* better error reporting/exception handling patch by Robert Buchholz
* properly tell clients that http keep-alive is not available
2012-06-27 v0.4.2
-----------------
0.4.2 released
* new directory listing + nicer index
* IPv6 support
* basic auth realm configurable
* various bugfixes
2012-05-04 v0.4.1
-----------------
0.4.1 released
* tar + compression feature
* compression
* shows fingerprint for self generated certs
* added manpage
2012-04-16 v0.4.0
-----------------
0.4.0 released
* SSL capabilities
* Automatic creation of self signed certificates
* HTTP basic auth
* HEAD support
* POST/Multipart upload support
* PUT/POST upload support
2012-04-05 v0.3.2
-----------------
0.3.2 released
* argparse

View File

@ -1,2 +0,0 @@
include ChangeLog
include servefile.1

View File

@ -1,33 +0,0 @@
Servefile
=========
Serve files from shell via a small HTTP server. The server redirects all HTTP
requests to the file, so only IP and port must be given to another user to
access the file. Its main purpose is to quickly send a file to users in your
local network, independent of their current setup (OS/software). Besides that
it also supports uploads, SSL, HTTP basic auth and directory listings.
Features:
* serve single file
* serve a directory with directory index
* file upload via webinterface
* HTTPS with on the fly generated self signed SSL certificates
* HTTP basic authentication
* serving files/directories as on request generated tar files
Install
-------
Via pip
```shell
pip install servefile
```
After installation either execute `servefile --help` or `python -m servefile --help`
Standalone:
If you don't have pip available just copy `servefile/servefile.py` onto the target machine, make it executable and you are ready to go.
```shell
$ wget https://raw.githubusercontent.com/sebageek/servefile/master/servefile/servefile.py -O servefile
$ chmod +x servefile
$ ./servefile --help
```

628
servefile Executable file
View File

@ -0,0 +1,628 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Licensed under GNU General Public License v3 or later
# Written by Sebastian Lohff (seba@seba-geek.de)
# http://seba-geek.de/stuff/servefile/
__version__ = '0.4.0'
import argparse
import base64
import cgi
import BaseHTTPServer
import commands
import datetime
import urllib
import os
import re
import SimpleHTTPServer
import SocketServer
import socket
from stat import ST_SIZE
from subprocess import Popen, PIPE
import sys
import time
# only activate SSL if available
HAVE_SSL = False
try:
from OpenSSL import SSL, crypto
HAVE_SSL = True
except ImportError:
pass
def getDateStrNow():
""" Get the current time formatted for HTTP header """
now = datetime.datetime.fromtimestamp(time.mktime(time.gmtime()))
return now.strftime("%a, %d %b %Y %H:%M:%S GMT")
class FileHandler(BaseHTTPServer.BaseHTTPRequestHandler):
fileName = "Undefined"
filePath = "/dev/null"
fileLength = 0
startTime = getDateStrNow()
blockSize = 1024 * 1024
def checkAndDoRedirect(self):
""" If request didn't request self.fileName redirect to self.fileName.
Returns True if a redirect was issued. """
if urllib.unquote(self.path) != "/" + self.fileName:
self.send_response(302)
self.send_header('Location', '/' + self.fileName)
self.end_headers()
return True
return False
def do_HEAD(self):
if self.checkAndDoRedirect():
return
self.send_response(200)
self.send_header('Content-Length', self.fileLength)
self.send_header('Last-Modified', self.startTime)
self.send_header('Content-Type', 'application/octet-stream')
self.send_header('Content-Disposition', 'attachment; filename="%s"' % self.fileName)
self.end_headers()
def do_GET(self):
if self.checkAndDoRedirect():
return
myfile = open(self.filePath, 'rb')
# find out if this is a continuing download
fromto = None
if "Range" in self.headers:
cont = self.headers.get("Range").split("=")
if len(cont) > 1 and cont[0] == 'bytes':
fromto = cont[1].split('-')
if len(fromto) > 1:
if fromto[1] == '':
fromto[1] = self.fileLength-1
fromto[0] = int(fromto[0])
fromto[1] = int(fromto[1])
if fromto[0] >= self.fileLength or fromto[0] < 0 or fromto[1] >= self.fileLength or fromto[1]-fromto[0] < 0:
# oops, already done!
self.send_response(416)
self.send_header('Content-Range', 'bytes */%s' % self.fileLength)
self.end_headers()
return
# now we can wind the file *brrrrrr*
myfile.seek(fromto[0])
if fromto != None:
self.send_response(216)
self.send_header('Content-Range', 'bytes %s-%s/%s' % (fromto[0], fromto[1], self.fileLength))
self.send_header('Content-Length', fromto[1]-fromto[0]+1)
else:
self.send_response(200)
self.send_header('Content-Length', self.fileLength)
self.send_header('Content-Disposition', 'attachment; filename="%s"' % self.fileName)
self.send_header('Content-Type', 'application/octet-stream')
self.send_header('Content-Transfer-Encoding', 'binary')
self.end_headers()
block = self.getChunk(myfile, fromto)
while block:
try:
self.wfile.write(block)
except socket.error, e:
print "%s ABORTED transmission (Reason %s: %s)" % (self.client_address[0], e[0], e[1])
return
block = self.getChunk(myfile, fromto)
myfile.close()
print "%s finished downloading" % (self.client_address[0])
return
def getChunk(self, myfile, fromto):
if fromto and myfile.tell()+self.blockSize >= fromto[1]:
readsize = fromto[1]-myfile.tell()+1
else:
readsize = self.blockSize
return myfile.read(readsize)
class FilePutter(BaseHTTPServer.BaseHTTPRequestHandler):
""" Simple HTTP Server which allows uploading to a specified directory
either via multipart/form-data or POST/PUT requests containing the file.
"""
targetDir = None
maxUploadSize = 0
uploadPage = """
<!docype html>
<html>
<form action="/" method="post" enctype="multipart/form-data">
<label for="file">Filename:</label>
<input type="file" name="file" id="file" />
<br />
<input type="submit" name="submit" value="Upload" />
</form>
</html>
"""
def do_GET(self):
""" Answer every GET request with the upload form """
self.sendResponse(200, self.uploadPage)
def do_POST(self):
""" Upload a file via POST
If the content-type is multipart/form-data it checks for the file
field and saves the data to disk. For other content-types it just
calls do_PUT and is handled as such except for the http response code.
Files can be uploaded with wget --post-file=path/to/file <url> or
curl -X POST -d @file <url> .
"""
length = self.getContentLength()
if length < 0:
return
ctype = self.headers.getheader('Content-Type')
# check for multipart/form-data.
if not (ctype and ctype.lower().startswith("multipart/form-data")):
# not a normal multipart request ==> handle as PUT request
return self.do_PUT(fromPost=True)
# create FieldStorage object for multipart parsing
env = os.environ
env['REQUEST_METHOD'] = "POST"
fstorage = cgi.FieldStorage(fp=self.rfile, headers=self.headers, environ=env)
if not "file" in fstorage:
self.sendResponse(400, "No file found in request.")
return
destFileName = self.getTargetName(fstorage["file"].filename)
if destFileName == "":
self.sendResponse(400, "Filename was empty or invalid")
return
# write file down to disk, send an
target = open(destFileName, "w")
target.write(fstorage["file"].file.read(length))
target.close()
self.sendResponse(200, "OK!")
def do_PUT(self, fromPost=False):
""" Upload a file via PUT
The request path is used as filename, so uploading a file to the url
http://host:8080/testfile will cause the file to be named testfile. If
no filename is given, a random name will be generated.
Files can be uploaded with e.g. curl -X POST -d @file <url> .
"""
length = self.getContentLength()
if length < 0:
return
fileName = urllib.unquote(self.path)
if fileName == "/":
# if no filename was given we have to generate one
fileName = str(time.time())
cleanFileName = self.getTargetName(fileName)
if cleanFileName == "":
self.sendResponse(400, "Filename was invalid")
return
# Sometimes clients want to be told to continue with their transfer
if self.headers.getheader("Expect") == "100-continue":
self.send_response(100)
self.end_headers()
target = open(cleanFileName, "w")
target.write(self.rfile.read(int(self.headers['Content-Length'])))
target.close()
self.sendResponse(fromPost and 200 or 201, "OK!")
def getContentLength(self):
length = 0
try:
length = int(self.headers['Content-Length'])
except (ValueError, KeyError):
pass
if length <= 0:
self.sendResponse(400, "Content-Length was invalid or not set.")
return -1
if length > self.maxUploadSize:
self.sendResponse(413, "Your file was too big! Maximum allowed size is %d byte. <a href=\"/\">back</a>" % self.maxUploadSize)
return -1
return length
def sendResponse(self, code, msg):
""" Send a HTTP response with code and msg, providing the correct
content-length.
"""
self.send_response(code)
self.send_header('Content-Type', 'text/html')
self.send_header('Content-Length', str(len(msg)))
self.end_headers()
self.wfile.write(msg)
def getTargetName(self, fname):
""" Generate a clean and secure filename.
This function takes a filename and strips all the slashes out of it.
If the file already exists in the target directory, a (NUM) will be
appended, so no file will be overwritten.
"""
cleanFileName = fname.replace("/", "")
if cleanFileName == "":
return ""
destFileName = self.targetDir + "/" + cleanFileName
if not os.path.exists(destFileName):
return destFileName
else:
i = 1
extraDestFileName = destFileName + "(%s)" % i
while os.path.exists(extraDestFileName):
i += 1
extraDestFileName = destFileName + "(%s)" % i
return extraDestFileName
# never reached
class ThreadedHTTPServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
pass
def catchSSLErrors(BaseSSLClass):
""" Class decorator which catches SSL errors and prints them. """
class X(BaseSSLClass):
def handle_one_request(self, *args, **kwargs):
try:
BaseSSLClass.handle_one_request(self, *args, **kwargs)
except SSL.Error, e:
if str(e) == "":
print "%s SSL Error (Empty error message)" % (self.client_address[0],)
else:
print "%s SSL Error: %s" % (self.client_address[0], e)
return X
class SecureThreadedHTTPServer(ThreadedHTTPServer):
def __init__(self, pubKey, privKey, *args, **kwargs):
ThreadedHTTPServer.__init__(self, *args, **kwargs)
ctx = SSL.Context(SSL.SSLv23_METHOD)
if type(pubKey) == crypto.X509 and type(privKey) == crypto.PKey:
ctx.use_certificate(pubKey)
ctx.use_privatekey(privKey)
else:
ctx.use_certificate_file(pubKey)
ctx.use_privatekey_file(privKey)
self.bsocket = socket.socket(self.address_family, self.socket_type)
self.socket = SSL.Connection(ctx, self.bsocket)
self.server_bind()
self.server_activate()
def shutdown_request(self, request):
request.shutdown()
class SecureHandler():
def setup(self):
self.connection = self.request
self.rfile = socket._fileobject(self.request, "rb", self.rbufsize)
self.wfile = socket._fileobject(self.request, "wb", self.wbufsize)
class ServeFileException(Exception):
pass
class ServeFile():
""" Main class to manage everything. """
(MODE_SINGLE, MODE_UPLOAD, MODE_LISTDIR) = range(3)
def __init__(self, target, port=8080, serveMode=0, useSSL=False):
self.target = target
self.port = port
self.serveMode = serveMode
self.dirCreated = False
self.useSSL = useSSL
self.cert = self.key = None
self.auth = None
self.maxUploadSize = 0
if self.serveMode not in range(3):
self.serveMode = None
raise ValueError("Unknown serve mode, needs to be MODE_SINGLE, MODE_UPLOAD or MODE_DIRLIST")
def getIPs(self):
""" Get IPs from all interfaces via ip or ifconfig. """
# ip and ifconfig sometimes are located in /sbin/
os.environ['PATH'] += ':/sbin:/usr/sbin'
proc = Popen(r"ip addr|" + \
"sed -n -e 's/.*inet6\? \([0-9.a-fA-F:]\+\)\/.*/\\1/ p'|" + \
"grep -v '^fe80\|^127.0.0.1\|^::1'", \
shell=True, stdout=PIPE, stderr=PIPE)
if proc.wait() != 0:
# ip failed somehow, falling back to ifconfig
oldLang = os.environ.get("LC_ALL", None)
os.environ['LC_ALL'] = "C"
proc = Popen(r"ifconfig|" + \
"sed -n 's/.*inet6\? addr: \?\([0-9a-fA-F.:]*\).*/" + \
"\\1/p'|" + \
"grep -v '^fe80\|^127.0.0.1\|^::1'", \
shell=True, stdout=PIPE, stderr=PIPE)
if oldLang:
os.environ['LC_ALL'] = oldLang
else:
del(os.environ['LC_ALL'])
if proc.wait() != 0:
# we couldn't find any ip address
proc = None
if proc:
ips = proc.stdout.read().strip().split("\n")
# FIXME: When BaseHTTP supports ipv6 properly, delete this line
ips = filter(lambda ip: ip.find(":") == -1, ips)
return ips
return None
def setSSLKeys(self, cert, key):
""" Set SSL cert/key. Can be either path to file or pyssl X509/PKey object. """
self.cert = cert
self.key = key
def setMaxUploadSize(self, limit):
""" Set the maximum upload size in byte """
self.maxUploadSize = limit
def genKeyPair(self):
print "Generating SSL certificate...",
sys.stdout.flush()
pkey = crypto.PKey()
pkey.generate_key(crypto.TYPE_RSA, 2048)
req = crypto.X509Req()
subj = req.get_subject()
subj.CN = "127.0.0.1"
subj.O = "servefile laboratories"
subj.OU = "servefile"
# generate altnames
altNames = []
for ip in self.getIPs() + ["127.0.0.1"]:
altNames.append("IP:%s" % ip)
altNames.append("DNS:localhost")
ext = crypto.X509Extension("subjectAltName", False, ",".join(altNames))
req.add_extensions([ext])
req.set_pubkey(pkey)
req.sign(pkey, "sha1")
cert = crypto.X509()
# some browsers complain if they see a cert from the same authority
# with the same serial ==> we just use the seconds as serial.
cert.set_serial_number(int(time.time()))
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(365*24*60*60)
cert.set_issuer(req.get_subject())
cert.set_subject(req.get_subject())
cert.add_extensions([ext])
cert.set_pubkey(req.get_pubkey())
cert.sign(pkey, "sha1")
self.cert = cert
self.key = pkey
print "done."
def _getCert(self):
return self.cert
def _getKey(self):
return self.key
def setAuth(self, user, password):
if len(user) == "" or len(password) == "":
raise ServeFileException("User and password both need to be at least one character long")
self.auth = base64.b64encode("%s:%s" % (user, password))
def _createServer(self, handler):
server = None
if self.useSSL:
if not self._getKey():
self.genKeyPair()
server = SecureThreadedHTTPServer(self._getCert(), self._getKey(), ('', self.port), handler)
else:
server = ThreadedHTTPServer(('', self.port), handler)
return server
def serve(self):
self.handler = self._confAndFindHandler()
self.server = self._createServer(self.handler)
if self.serveMode != self.MODE_UPLOAD:
print "Serving \"%s\" under port %d" % (self.target, self.port)
else:
print "Serving \"%s\" for uploads under port %d" % (self.target, self.port)
# print urls with local network adresses
print "\nSome addresses this will be available under:"
ips = self.getIPs()
if not ips or len(ips) == 0 or ips[0] == '':
print "Could not find any addresses"
else:
for ip in ips:
print "http%s://%s:%d/" % (self.useSSL and "s" or "", ip, self.port)
print ""
try:
self.server.serve_forever()
except KeyboardInterrupt:
self.server.socket.close()
# cleanup potential upload directory
if self.dirCreated and len(os.listdir(self.target)) == 0:
# created upload dir was not used
os.rmdir(self.target)
def _confAndFindHandler(self):
handler = None
if self.serveMode == self.MODE_SINGLE:
try:
testit = open(self.target, 'r')
testit.close()
FileHandler.filePath = self.target
FileHandler.fileName = os.path.basename(self.target)
FileHandler.fileLength = os.stat(self.target)[ST_SIZE]
except IOError:
raise ServeFileException("Error: Could not open file!")
handler = FileHandler
elif self.serveMode == self.MODE_UPLOAD:
if os.path.isdir(self.target):
print "Warning: Uploading to an already existing directory"
elif not os.path.exists(self.target):
self.dirCreated = True
try:
os.mkdir(self.target)
except IOError, OSError:
raise ServeFileException("Error: Could not create directory '%s' for uploads" % (self.target,) )
else:
raise ServeFileException("Error: Upload directory already exists and is a file")
FilePutter.targetDir = self.target
FilePutter.maxUploadSize = self.maxUploadSize
handler = FilePutter
elif self.serveMode == self.MODE_LISTDIR:
try:
os.chdir(self.target)
except OSError:
raise ServeFileException("Error: Could not change directory to '%s'" % self.target)
handler = SimpleHTTPServer.SimpleHTTPRequestHandler
if self.auth:
# do authentication
AuthenticationHandler.authString = self.auth
class AuthenticatedHandler(AuthenticationHandler, handler):
pass
handler = AuthenticatedHandler
if self.useSSL:
# secure handler
@catchSSLErrors
class AlreadySecuredHandler(SecureHandler, handler):
pass
handler = AlreadySecuredHandler
return handler
class AuthenticationHandler():
# base64 encoded user:password string for authentication
authString = None
realm = "Restricted area"
def handle_one_request(self):
""" Overloaded function to handle one request.
Before calling the responsible do_METHOD function, check credentials
"""
self.raw_requestline = self.rfile.readline()
if not self.raw_requestline:
self.close_connection = 1
return
if not self.parse_request(): # An error code has been sent, just exit
return
authorized = False
if "Authorization" in self.headers:
if self.headers["Authorization"] == ("Basic " + self.authString):
authorized = True
if authorized:
mname = 'do_' + self.command
if not hasattr(self, mname):
self.send_error(501, "Unsupported method (%r)" % self.command)
return
method = getattr(self, mname)
method()
else:
self.send_response(401)
self.send_header("WWW-Authenticate", "Basic realm=\"%s\"" % self.realm)
def main():
parser = argparse.ArgumentParser(description='Serve a single file via HTTP')
parser.add_argument('--version', action='version', version='%(prog)s ' + __version__)
parser.add_argument('target', metavar='file/directory', type=str)
parser.add_argument('-p', '--port', type=int, default=8080, \
help='port to listen on')
parser.add_argument('-u', '--upload', action="store_true", default=False, \
help="Enable uploads to a given directory")
parser.add_argument('-s', '--max-upload-size', type=str, \
help="Limit uploadsize in kb. Size modifiers are allowed, e.g. 2G, 12Mb, 1b.")
parser.add_argument('-l', '--list-dir', action="store_true", default=False, \
help="Show directory indexes and allow access to all subdirectories")
parser.add_argument('--ssl', action="store_true", default=False, \
help="Enable SSL. If no key/cert is specified one will be generated.")
parser.add_argument('--key', type=str, \
help="Keyfile to use for SSL. If no cert is given with --cert the keyfile will also be searched for a cert")
parser.add_argument('--cert', type=str, \
help="Certfile to use for SSL")
parser.add_argument('-a', '--auth', type=str, metavar='user:password', \
help="Set user and password for HTTP basic authentication")
args = parser.parse_args()
maxUploadSize = 0
# check for invalid option combinations/preparse stuff
if args.max_upload_size and not args.upload:
print "Error: max upload size can only be specified when in upload mode"
sys.exit(1)
if args.max_upload_size:
sizeRe = re.match("^(\d+(?:[,.]\d+)?)(?:([bkmgtpe])(?:(?<!b)b?)?)?$", args.max_upload_size.lower())
if not sizeRe:
print "Error: Your max upload size param is broken."
sys.exit(1)
uploadSize, modifier = sizeRe.groups()
uploadSize = float(uploadSize.replace(",", "."))
sizes = ["b", "k", "m", "g", "t", "p", "e"]
maxUploadSize = int(uploadSize * pow(1024, sizes.index(modifier or "k")))
if maxUploadSize < 0:
print "Error: Your max upload size can't be negative"
sys.exit(1)
if args.ssl and not HAVE_SSL:
print "Error: SSL is not available, please install pyssl (python-openssl)"
sys.exit(1)
if args.cert and not args.key:
print "Error: Please specify a key along with your cert"
sys.exit(1)
if not args.ssl and (args.cert or args.key):
print "Error: You need to turn on ssl with --ssl when specifying certs/keys"
sys.exit(1)
if args.auth:
dpos = args.auth.find(":")
if dpos <= 0 or dpos == (len(args.auth)-1):
print "Error: User and password for HTTP basic auth need to be both at least one character long and have to be seperated by a \":\""
sys.exit(1)
mode = None
if args.upload:
mode = ServeFile.MODE_UPLOAD
elif args.list_dir:
mode = ServeFile.MODE_LISTDIR
else:
mode = ServeFile.MODE_SINGLE
server = None
try:
server = ServeFile(args.target, args.port, mode, args.ssl)
if maxUploadSize > 0:
server.setMaxUploadSize(maxUploadSize)
if args.ssl and args.key:
cert = args.cert or args.key
server.setSSLKeys(cert, args.key)
if args.auth:
user, password = args.auth.split(":", 1)
server.setAuth(user, password)
server.serve()
except ServeFileException, e:
print e
sys.exit(1)
print "Good bye.."
if __name__ == '__main__':
main()

View File

@ -1,124 +0,0 @@
.TH SERVEFILE 1 "January 2023" "servefile 0.5.4" "User Commands"
.SH NAME
servefile \- small HTTP-Server for temporary file transfer
.SH SYNOPSIS
.B servefile
[\fI\-h\fR\fR] [\fI\-\-version\fR] [\fI\-p PORT\fR] [\fI\-u\fR] [\fI\-s MAX_UPLOAD_SIZE\fR] [\fI\-l\fR]
.IP
[\fI\-\-ssl\fR] [\fI\-\-key KEY\fR] [\fI\-\-cert CERT\fR] [\fI\-a user:password\fR]
\fIfile/directory\fR
.SH DISCLAIMER
Do not use this as a normal web server. This server is optimized for running
a short time and to send files to other people, not for doing high-performance
static file serving.
.SH DESCRIPTION
Servefile is a small HTTP-server intended for temporary file transfer mostly
in the local network. It aims to make transferring single files as painless as
possible and to replace tar/netcat solutions.
With just a file as argument servefile serves just that one file and redirects
all HTTP requests to that file.
Uploads can be done with curl, wget (see EXAMPLES) or a normal browser.
In upload mode with \fB\-u\fR servefile creates a directory and saves all
uploaded files into that directory. When uploading with curl or wget the
filename is extracted from the path part of the url used for the upload.
For SSL support pyopenssl (python3-openssl) needs to be installed. If no key
and cert is given, servefile will generate a key pair for you and display its
fingerprint.
In \fB--tar\fR mode the given file or directory will be packed on (each)
request and piped to the client through the HTTP connection, thus serving
always the latest content of the directory and preventing temporary file
creaton. Tar files will be created containing only the lowest directory name
from the full path, so using /path/to/dir/ as \fIfile/directory\fR argument
will create a tar file starting with the dir/ directory. When giving a file
as argument, only the file without any path will be in the tarfile.
Symlinks will not be dereferenced.
.SH COMMAND SUMMARY
.SS "positional arguments:"
.TP
\fIfile/directory\fR
file or directory (with \fB\-l\fR or \fB\-u\fR) which should be served or uploaded to
.SS "optional arguments:"
.TP
\fB\-h\fR, \fB\-\-help\fR
Show a help message and exit
.TP
\fB\-\-version\fR
Show program's version number and exit
.TP
\fB\-p\fR PORT, \fB\-\-port\fR PORT
Port to listen on
.TP
\fB\-u\fR, \fB\-\-upload\fR
Enable uploads to a given directory
.TP
\fB\-s\fR MAX_UPLOAD_SIZE, \fB\-\-max\-upload\-size\fR MAX_UPLOAD_SIZE
Limit upload size in kB. Size modifiers are allowed,
e.g. 2G, 12MB, 1B.
.TP
\fB\-l\fR, \fB\-\-list\-dir\fR
Show directory indexes and allow access to all
subdirectories
.TP
\fB\-\-ssl\fR
Enable SSL. If no key/cert is specified one will be
generated.
.TP
\fB\-\-key\fR KEY
Key file to use for SSL. If no cert is given with
\fB\-\-cert\fR the key file will also be searched for a cert
.TP
\fB\-\-cert\fR CERT
Certfile to use for SSL
.TP
\fB\-a\fR user:password, \fB\-\-auth\fR user:password
Set user and password for HTTP basic authentication
.TP
\fB\-\-realm\fR REALM
Set a realm for HTTP basic authentication. This is an
arbitrary string which is displayed when doing HTTP
basic authentication
.TP
\fB\-t\fR, \fB\-\-tar\fR
Enable on the fly tar creation for given file or
directory. Note: Download continuation will not be
available.
.TP
\fB\-c\fR method, \fB\-\-compression\fR method
Set compression method, only in combination with
\fB\-\-tar\fR. Can be one of none, gzip, bzip2, xz.
.TP
\fB\-4\fR, \fB\-\-ipv4\-only\fR
Listen on IPv4 only
.TP
\fB\-6\fR, \fB\-\-ipv6\-only\fR
Listen on IPv6 only
.SH EXAMPLES
Serving a single file with SSL and HTTP Basic auth:
.IP
servefile \-\-ssl \-\-auth foo:bar the_file
.PP
Enabling uploads to a directory:
.IP
servefile \-u dir/
.PP
Uploading file foo as bar to servefile via command line:
.PP
curl \-X PUT http://ip:port/bar \-\-data-binary @foo
curl \-X POST http://ip:port/bar \-\-data-binary @foo
wget http://ip:port/bar \-\-post-file=foo
.PP
Serving a on the fly generated tar.gz file of a directory:
.IP
servefile \-\-tar \-c gzip path/to/dir
.PP
.SH AUTHOR
servefile is developed by Sebastian Lohff <seba@someserver.de>

View File

View File

@ -1,3 +0,0 @@
from . import servefile
servefile.main()

File diff suppressed because it is too large Load Diff

View File

@ -1,53 +1,17 @@
#!/usr/bin/env python
#!/usr/bin/python
from setuptools import setup
with open("README.md") as f:
long_description = f.read()
from distutils.core import setup
setup(
name='servefile',
description='Serve files from shell via a small HTTP server',
long_description=long_description,
long_description_content_type='text/markdown',
platforms='posix',
version='0.5.4',
license='GPLv3 or later',
url='https://github.com/sebageek/servefile/',
author='Sebastian Lohff',
author_email='seba@someserver.de',
install_requires=['pyopenssl'],
tests_require=[
'pathlib2; python_version<"3"',
'pytest',
'requests',
],
packages=["servefile"],
entry_points={
"console_scripts": [
"servefile = servefile.servefile:main",
],
},
classifiers=[
'Development Status :: 5 - Production/Stable',
'Environment :: Console',
'Intended Audience :: End Users/Desktop',
'License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)',
'Natural Language :: English',
'Programming Language :: Python',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'Topic :: Communications',
'Topic :: Communications :: File Sharing',
'Topic :: Internet',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Internet :: WWW/HTTP :: HTTP Servers',
'Topic :: Utilities',
],
name='servefile',
description='Serve files from shell via a small HTTP server',
long_description='Serve files from shell via a small HTTP server. The server redirects all HTTP requests to the file, so only IP and port must be given to another user to access the file. Its main purpose is to quickly send a file to users in your local network, independent of their current setup (OS/software). Beneath that it also supports uploads, SSL, HTTP basic auth and directory listings.',
platforms='posix',
version='0.4.0',
license='GPLv3 or later',
url='http://seba-geek.de/stuff/servefile/',
author='Sebastian Lohff',
author_email='seba@someserver.de',
scripts=['servefile'],
)

View File

@ -1,458 +0,0 @@
# -*- coding: utf-8 -*-
import io
import os
import pytest
import requests
import socket
import subprocess
import sys
import tarfile
import time
import urllib3
from requests.exceptions import ConnectionError
# crudly written to learn more about pytest and to have a base for refactoring
if sys.version_info.major >= 3:
from pathlib import Path
from urllib.parse import quote
connrefused_exc = ConnectionRefusedError
else:
from pathlib2 import Path
from urllib import quote
connrefused_exc = socket.error
def _get_port_from_env(var_name, default):
port = int(os.environ.get(var_name, default))
if port == 0:
# do a one-time port selection for a free port, use it for all tests
s = socket.socket()
s.bind(('', 0))
port = s.getsockname()[1]
s.close()
return port
SERVEFILE_DEFAULT_PORT = _get_port_from_env('SERVEFILE_DEFAULT_PORT', 0)
SERVEFILE_SECONDARY_PORT = _get_port_from_env('SERVEFILE_SECONDARY_PORT', 0)
@pytest.fixture
def run_servefile():
instances = []
def _run_servefile(args, **kwargs):
if not isinstance(args, list):
args = [args]
if kwargs.pop('standalone', None):
# directly call servefile.py
servefile_path = [str(Path(__file__).parent.parent / 'servefile' / 'servefile.py')]
else:
# call servefile as python module
servefile_path = ['-m', 'servefile']
# use non-default default port, if one is given via env (and none via args)
if '-p' not in args and '--port' not in args:
args.extend(['-p', str(SERVEFILE_DEFAULT_PORT)])
print("running {} with args {}".format(", ".join(servefile_path), args))
p = subprocess.Popen([sys.executable] + servefile_path + args, **kwargs)
instances.append(p)
return p
yield _run_servefile
for instance in instances:
try:
instance.terminate()
except OSError:
pass
instance.wait()
@pytest.fixture
def datadir(tmp_path):
def _datadir(data, path=None):
path = path or tmp_path
for k, v in data.items():
if isinstance(v, dict):
new_path = path / k
new_path.mkdir()
_datadir(v, new_path)
else:
if hasattr(v, 'decode'):
v = v.decode('utf-8') # python2 compability
(path / k).write_text(v)
return path
return _datadir
def make_request(path='/', host='localhost', port=SERVEFILE_DEFAULT_PORT, method='get', protocol='http',
encoding='utf-8', **kwargs):
url = '{}://{}:{}{}'.format(protocol, host, port, path)
print('Calling {} on {} with {}'.format(method, url, kwargs))
r = getattr(requests, method)(url, **kwargs)
if r.encoding is None and encoding:
r.encoding = encoding
return r
def check_download(expected_data=None, path='/', fname=None, **kwargs):
if fname is None:
fname = os.path.basename(path)
r = make_request(path, **kwargs)
assert r.status_code == 200
assert r.text == expected_data
assert r.headers.get('Content-Type') == 'application/octet-stream'
if fname:
assert r.headers.get('Content-Disposition') == 'attachment; filename="{}"'.format(fname)
assert r.headers.get('Content-Transfer-Encoding') == 'binary'
return r # for additional tests
def _retry_while(exception, function, timeout=2):
now = time.time # float seconds since epoch
def wrapped(*args, **kwargs):
timeout_after = now() + timeout
while True:
try:
return function(*args, **kwargs)
except exception:
if now() >= timeout_after:
raise
time.sleep(0.1)
return wrapped
def _test_version(run_servefile, standalone):
# we expect the version on stdout (python3.4+) or stderr(python2.6-3.3)
s = run_servefile('--version', standalone=standalone, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
s.wait()
version = s.stdout.readline().decode().strip()
# python2 is deprecated, but we still want our tests to run for it
# CryptographyDeprecationWarnings get in the way for this
if 'CryptographyDeprecationWarning' in version:
s.stdout.readline() # ignore "from x import y" line
version = s.stdout.readline().decode().strip()
# hardcode version as string until servefile is a module
assert version == 'servefile 0.5.4'
def test_version(run_servefile):
_test_version(run_servefile, standalone=False)
def test_version_standalone(run_servefile):
# test if servefile also works by calling servefile.py directly
_test_version(run_servefile, standalone=True)
def test_correct_headers(run_servefile, datadir):
data = "NOOT NOOT"
p = datadir({'testfile': data}) / 'testfile'
run_servefile(str(p))
r = _retry_while(ConnectionError, make_request)()
assert r.status_code == 200
assert r.headers.get('Content-Type') == 'application/octet-stream'
assert r.headers.get('Content-Disposition') == 'attachment; filename="testfile"'
assert r.headers.get('Content-Transfer-Encoding') == 'binary'
def test_redirect_and_download(run_servefile, datadir):
data = "NOOT NOOT"
p = datadir({'testfile': data}) / 'testfile'
run_servefile(str(p))
# redirect
r = _retry_while(ConnectionError, make_request)(allow_redirects=False)
assert r.status_code == 302
assert r.headers.get('Location') == '/testfile'
# normal download
check_download(data, fname='testfile')
def test_redirect_and_download_with_umlaut(run_servefile, datadir):
data = "NÖÖT NÖÖT"
filename = "tästføile"
p = datadir({filename: data}) / filename
run_servefile(str(p))
# redirect
r = _retry_while(ConnectionError, make_request)(allow_redirects=False)
assert r.status_code == 302
assert r.headers.get('Location') == '/{}'.format(quote(filename))
# normal download
if sys.version_info.major < 3:
data = unicode(data, 'utf-8')
check_download(data, fname=filename)
def test_specify_port(run_servefile, datadir):
data = "NOOT NOOT"
p = datadir({'testfile': data}) / 'testfile'
run_servefile([str(p), '-p', str(SERVEFILE_SECONDARY_PORT)])
_retry_while(ConnectionError, check_download)(data, fname='testfile', port=SERVEFILE_SECONDARY_PORT)
def test_ipv4_only(run_servefile, datadir):
data = "NOOT NOOT"
p = datadir({'testfile': data}) / 'testfile'
run_servefile([str(p), '-4'])
_retry_while(ConnectionError, check_download)(data, fname='testfile', host='127.0.0.1')
sock = socket.socket(socket.AF_INET6)
with pytest.raises(connrefused_exc):
sock.connect(("::1", SERVEFILE_DEFAULT_PORT))
def test_big_download(run_servefile, datadir):
# test with about 10 mb of data
data = "x" * (10 * 1024 ** 2)
p = datadir({'testfile': data}) / 'testfile'
run_servefile(str(p))
_retry_while(ConnectionError, check_download)(data, fname='testfile')
def test_authentication(run_servefile, datadir):
data = "NOOT NOOT"
p = datadir({'testfile': data}) / 'testfile'
run_servefile([str(p), '-a', 'user:password'])
for auth in [('foo', 'bar'), ('user', 'wrong'), ('unknown', 'password')]:
r = _retry_while(ConnectionError, make_request)(auth=auth)
assert '401 - Unauthorized' in r.text
assert r.status_code == 401
_retry_while(ConnectionError, check_download)(data, fname='testfile', auth=('user', 'password'))
def test_serve_directory(run_servefile, datadir):
d = {
'foo': {'kratzbaum': 'cat', 'I like Cats!': 'kitteh', '&&&&&&&': 'wheee'},
'bar': {'thisisaverylongfilenamefortestingthatthisstillworksproperly': 'jup!'},
'noot': 'still data in here',
'bigfile': 'x' * (10 * 1024 ** 2),
'möwe': 'KRAKRAKRAKA',
}
p = datadir(d)
run_servefile([str(p), '-l'])
# check if all files are in directory listing
# (could be made more sophisticated with beautifulsoup)
for path in '/', '/../':
r = _retry_while(ConnectionError, make_request)(path)
for k in d:
assert quote(k) in r.text
for fname, content in d['foo'].items():
_retry_while(ConnectionError, check_download)(content, '/foo/' + fname)
r = make_request('/unknown')
assert r.status_code == 404
# download
check_download('jup!', '/bar/thisisaverylongfilenamefortestingthatthisstillworksproperly')
def test_serve_relative_directory(run_servefile, datadir):
d = {
'foo': {'kratzbaum': 'cat', 'I like Cats!': 'kitteh', '&&&&&&&': 'wheee'},
'bar': {'thisisaverylongfilenamefortestingthatthisstillworksproperly': 'jup!'},
'noot': 'still data in here',
'bigfile': 'x' * (10 * 1024 ** 2),
}
p = datadir(d)
run_servefile(['../', '-l'], cwd=os.path.join(str(p), 'foo'))
# check if all files are in directory listing
# (could be made more sophisticated with beautifulsoup)
for path in '/', '/../':
r = _retry_while(ConnectionError, make_request)(path)
for k in d:
assert k in r.text
for fname, content in d['foo'].items():
check_download(content, '/foo/' + fname)
r = make_request('/unknown')
assert r.status_code == 404
# download
check_download('jup!', '/bar/thisisaverylongfilenamefortestingthatthisstillworksproperly')
def test_upload(run_servefile, tmp_path):
data = ('this is my live now\n'
'uploading strings to servers\n'
'so very joyful')
uploaddir = tmp_path / 'upload'
# check that uploaddir does not exist before servefile is started
assert not uploaddir.is_dir()
run_servefile(['-u', str(uploaddir)])
# check upload form present
r = _retry_while(ConnectionError, make_request)()
assert r.status_code == 200
assert 'multipart/form-data' in r.text
# check that servefile created the directory
assert uploaddir.is_dir()
# upload file
files = {'file': ('haiku.txt', data)}
r = make_request(method='post', files=files)
assert 'Thanks' in r.text
assert r.status_code == 200
with open(str(uploaddir / 'haiku.txt')) as f:
assert f.read() == data
# upload file AGAIN!! (and check it is available unter a different name)
files = {'file': ('haiku.txt', data)}
r = make_request(method='post', files=files)
assert r.status_code == 200
with open(str(uploaddir / 'haiku.txt(1)')) as f:
assert f.read() == data
# upload file using PUT
r = make_request("/haiku.txt", method='put', data=data)
assert r.status_code == 201
assert 'OK!' in r.text
with open(str(uploaddir / 'haiku.txt(2)')) as f:
assert f.read() == data
def test_upload_size_limit(run_servefile, tmp_path):
uploaddir = tmp_path / 'upload'
run_servefile(['-s', '2kb', '-u', str(uploaddir)])
# upload file that is too big
files = {'file': ('toobig', "x" * 2049)}
r = _retry_while(ConnectionError, make_request)(method='post', files=files)
assert 'Your file was too big' in r.text
assert r.status_code == 413
assert not (uploaddir / 'toobig').exists()
# upload file that should fit
# the size has to be smaller than 2kb, as the sent size also includes mime-headers
files = {'file': ('justright', "x" * 1900)}
r = make_request(method='post', files=files)
assert r.status_code == 200
def test_upload_large_file(run_servefile, tmp_path):
# small files end up in BytesIO while large files get temporary files. this
# test makes sure we hit the large file codepath at least once
uploaddir = tmp_path / 'upload'
run_servefile(['-u', str(uploaddir)])
data = "asdf" * 1024
files = {'file': ('more_data.txt', data)}
r = _retry_while(ConnectionError, make_request)(method='post', files=files)
assert r.status_code == 200
with open(str(uploaddir / 'more_data.txt')) as f:
assert f.read() == data
def test_tar_mode(run_servefile, datadir):
d = {
'foo': {
'bar': 'hello testmode my old friend',
'baz': 'you came to test me once again',
}
}
p = datadir(d)
run_servefile(['-t', str(p / 'foo')])
# test redirect?
# test contents of tar file
r = _retry_while(ConnectionError, make_request)()
assert r.status_code == 200
tar = tarfile.open(fileobj=io.BytesIO(r.content))
assert len(tar.getmembers()) == 3
assert tar.getmember('foo').isdir()
for filename, content in d['foo'].items():
info = tar.getmember('foo/{}'.format(filename))
assert info.isfile
assert tar.extractfile(info.path).read().decode() == content
def test_tar_compression(run_servefile, datadir):
d = {'foo': 'blubb'}
p = datadir(d)
run_servefile(['-c', 'gzip', '-t', str(p / 'foo')])
r = _retry_while(ConnectionError, make_request)()
assert r.status_code == 200
tar = tarfile.open(fileobj=io.BytesIO(r.content), mode='r:gz')
assert len(tar.getmembers()) == 1
def test_https(run_servefile, datadir):
data = "NOOT NOOT"
p = datadir({'testfile': data}) / 'testfile'
run_servefile(['--ssl', str(p)])
# fingerprint = None
# while not fingerprint:
# line = s.stdout.readline()
# print(line)
# # if we find this line we went too far...
# assert not line.startswith("Some addresses this file will be available at")
# if line.startswith("SHA1 fingerprint"):
# fingerprint = line.replace("SHA1 fingerprint: ", "").strip()
# break
# assert fingerprint
urllib3.disable_warnings()
_retry_while(ConnectionError, check_download)(data, protocol='https', verify=False)
def test_https_big_download(run_servefile, datadir):
# test with about 10 mb of data
data = "x" * (10 * 1024 ** 2)
p = datadir({'testfile': data}) / 'testfile'
run_servefile(['--ssl', str(p)])
urllib3.disable_warnings()
_retry_while(ConnectionError, check_download)(data, protocol='https', verify=False)
def test_abort_download(run_servefile, datadir):
data = "x" * (10 * 1024 ** 2)
p = datadir({'testfile': data}) / 'testfile'
env = os.environ.copy()
env['PYTHONUNBUFFERED'] = '1'
proc = run_servefile(str(p), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env)
# provoke a connection abort
# hopefully the buffers will not fill up with all of the 10mb
sock = socket.socket(socket.AF_INET)
_retry_while(connrefused_exc, sock.connect)(("localhost", SERVEFILE_DEFAULT_PORT))
sock.send(b"GET /testfile HTTP/1.0\n\n")
resp = sock.recv(100)
assert resp != b''
sock.close()
time.sleep(0.1)
proc.kill()
out = proc.stdout.read().decode()
assert "127.0.0.1 ABORTED transmission" in out

19
tox.ini
View File

@ -1,19 +0,0 @@
[tox]
envlist = py27,py37,py38,py39,py310,py311,pep8
[testenv]
deps =
pathlib2; python_version<"3"
pytest
requests
flake8
commands = pytest -v --tb=short {posargs}
[testenv:pep8]
commands = flake8 servefile/ {posargs}
[flake8]
show-source = True
max-line-length = 120
ignore = E123,E125,E241,E402,E741,W503,W504,H301
exclude=.venv,.git,.tox,dist,doc,*lib/python*,*egg,build