Source code for owtf.filesrv.handlers
"""
owtf.filesrv.handlers
~~~~~~~~~~~~~~~~~~~~~
"""
import datetime
import email.utils
import hashlib
import mimetypes
import os
import stat
import subprocess
import time
import tornado
import tornado.template
import tornado.web
__all__ = ['StaticFileHandler']
[docs]class StaticFileHandler(tornado.web.StaticFileHandler):
[docs] def set_default_headers(self):
self.add_header("Access-Control-Allow-Origin", "*")
self.add_header("Access-Control-Allow-Methods", "GET, POST, DELETE")
[docs] def get(self, path, include_body=True):
"""
This is an edited method of original class so that we can show
directory listing and set correct Content-Type
"""
path = self.parse_url_path(path)
abspath = os.path.abspath(os.path.join(self.root, path))
self.absolute_path = abspath
if not os.path.exists(abspath):
raise tornado.web.HTTPError(404)
# Check if a directory if so provide listing
if os.path.isdir(abspath):
# need to look at the request.path here for when path is empty
# but there is some prefix to the path that was already
# trimmed by the routing
# Just loop once to get dirnames and filenames :P
for abspath, dirnames, filenames in os.walk(abspath):
break
directory_listing_template = tornado.template.Template("""
<html>
<head>
<title>Directory Listing</title>
</head>
<body>
<h1>Index of</h1>
<hr>
<ul>
<li><a href="../">../</a></li>
{% if len(dirnames) > 0 %}
<h2>Directories</h2>
{% for item in dirnames %}
<li><a href="{{ url_escape(item, plus=False) }}/">{{ item }}/</a></li>
{% end %}
{% end %}
{% if len(filenames) > 0 %}
<h2>Files</h2>
{% for item in filenames %}
<li><a href="{{ url_escape(item, plus=False) }}">{{ item }}</a></li>
{% end %}
{% end %}
</ul>
</body>
</html>
""")
self.write(directory_listing_template.generate(dirnames=dirnames, filenames=filenames))
return
if os.path.isfile(abspath): # So file
stat_result = os.stat(abspath)
modified = datetime.datetime.fromtimestamp(stat_result[stat.ST_MTIME])
self.set_header("Last-Modified", modified)
mime_type, encoding = mimetypes.guess_type(abspath)
if mime_type:
self.set_header("Content-Type", mime_type)
cache_time = self.get_cache_time(path, modified, mime_type)
if cache_time > 0:
self.set_header("Expires", datetime.datetime.utcnow() + datetime.timedelta(seconds=cache_time))
self.set_header("Cache-Control", "max-age=%s" % str(cache_time))
else:
self.set_header("Cache-Control", "public")
self.set_extra_headers(path)
# Check the If-Modified-Since, and don't send the result if the
# content has not been modified
ims_value = self.request.headers.get("If-Modified-Since")
if ims_value is not None:
date_tuple = email.utils.parsedate(ims_value)
if_since = datetime.datetime.fromtimestamp(time.mktime(date_tuple))
if if_since >= modified:
self.set_status(304)
return
no_of_lines = self.get_argument("lines", default="-1")
if no_of_lines != "-1":
data = subprocess.check_output(["tail", "-" + no_of_lines, abspath])
else:
with open(abspath, "rb") as file:
data = file.read()
hasher = hashlib.sha1()
hasher.update(data)
self.set_header("Etag", '"%s"' % hasher.hexdigest())
if include_body:
self.write(data)
else:
assert self.request.method == "HEAD"
self.set_header("Content-Length", len(data))