Source code for owtf.utils.error
"""
owtf.utils.error
~~~~~~~~~~~~~~~~
The error handler provides a centralised control for aborting the application
and logging errors for debugging later.
"""
import logging
import multiprocessing
import signal
import sys
try:
from raven.contrib.tornado import AsyncSentryClient
raven_installed = True
except ImportError:
raven_installed = False
from owtf.settings import SENTRY_API_KEY
from owtf.lib.exceptions import FrameworkAbortException, PluginAbortException
__all__ = [
"abort_framework",
"user_abort",
"get_option_from_user",
"SentryProxy",
"get_sentry_client",
"log_and_exit_handler",
"setup_signal_handlers",
]
command = None
len_padding = 100
padding = "\n{}\n\n".format("_" * len_padding)
sub_padding = "\n{}\n".format("*" * len_padding)
[docs]def abort_framework(message):
"""Abort the OWTF framework.
:warning: If it happens really early and :class:`framework.core.Core`
has note been instantiated yet, `sys.exit()` is called with error
code -1
:param str message: Descriptive message about the abort.
:return: full message explaining the abort.
:rtype: str
"""
message = "Aborted by Framework: {0}".format(message)
logging.error(message)
sys.exit(message)
[docs]def get_option_from_user(options):
"""Give the user options to select
:param options: Set of available options for the user
:type options: `str`
:return: The different options for the user to choose from
:rtype: `str`
"""
return input("Options: 'e'+Enter= Exit {!s}, Enter= Next test\n".format(options))
[docs]def user_abort(level, partial_output=""):
"""This function handles the next steps when a user presses Ctrl-C
:param level: The level which was aborted
:type level: `str`
:param partial_output: Partial output generated by the command or plugin
:type partial_output: `str`
:return: Message to present to the user
:rtype: `str`
"""
# Levels so far can be Command or Plugin
logging.info("\nThe %s was aborted by the user: Please check the report and plugin output files", level)
message = ("\nThe {} was aborted by the user: Please check the report and plugin output files".format(level))
if level == "Command":
option = "p"
if option == "e":
# Try to save partial plugin results.
raise FrameworkAbortException(partial_output)
elif option == "p": # Move on to next plugin.
# Jump to next handler and pass partial output to avoid losing results.
raise PluginAbortException(partial_output)
return message
signame_by_signum = {v: k for k, v in signal.__dict__.items() if k.startswith("SIG") and not k.startswith("SIG_")}
[docs]class SentryProxy(object):
"""Simple proxy for sentry client that logs to stderr even if no sentry client exists."""
def __init__(self, sentry_client):
self.sentry_client = sentry_client
[docs] def capture_exception(self, exc_info=None, **kwargs):
if self.sentry_client:
self.sentry_client.capture_exception(exc_info=exc_info, **kwargs)
logging.exception("exception occurred")
[docs]def get_sentry_client(sentry_key=SENTRY_API_KEY):
if sentry_key and raven_installed:
logging.info("[+] Sentry client setup key: %s", sentry_key)
sentry_client = SentryProxy(sentry_client=AsyncSentryClient(sentry_key))
else:
if not sentry_key:
logging.info("[-] No Sentry key specified")
if not raven_installed:
logging.info("[-] Raven (sentry client) not installed")
sentry_client = SentryProxy(sentry_client=None)
return sentry_client
[docs]def log_and_exit_handler(signum, frame):
logging.debug("%s: caught signal %s, exiting", multiprocessing.current_process().name, signame_by_signum[signum])
sys.exit(1)
[docs]def setup_signal_handlers():
"""Setup the handlers"""
for signum in [signal.SIGINT, signal.SIGTERM]:
signal.signal(signum, log_and_exit_handler)