# Source code for owtf.utils.error
"""
owtf.utils.error
~~~~~~~~~~~~~~~~
The error handler provides a centralised control for aborting the application
and logging errors for debugging later.
"""
import logging
import multiprocessing
import signal
import sys
try:
from raven.contrib.tornado import AsyncSentryClient
raven_installed = True
except ImportError:
raven_installed = False
from owtf.lib.exceptions import FrameworkAbortException, PluginAbortException
# Names exported by ``from owtf.utils.error import *``.
__all__ = [
    'abort_framework',
    'user_abort',
    'get_option_from_user',
    'SentryProxy',
    'get_sentry_client',
    'log_and_exit_handler',
    'setup_signal_handlers',
]
# Module-level placeholder; nothing in this module assigns it a value.
command = None

# Width, in characters, of the separator rules built below.
len_padding = 100

# Rule of underscores: preceded by one newline, followed by two.
padding = "\n{0}\n\n".format("_" * len_padding)

# Rule of asterisks: preceded and followed by a single newline.
sub_padding = "\n{0}\n".format("*" * len_padding)
def abort_framework(message):
    """Abort the OWTF framework.

    The reason is prefixed with ``Aborted by Framework:``, logged at error
    level and then handed to :func:`sys.exit`, so this function never
    returns to its caller.

    :param str message: Descriptive message about the abort.
    :raises SystemExit: always; the prefixed message is the exit payload.
    """
    full_message = "Aborted by Framework: {0}".format(message)
    # Log first so the reason is recorded before the exit unwinds the stack.
    logging.error(full_message)
    sys.exit(full_message)
def get_option_from_user(options):
    """Prompt the user to either exit or carry on with the next test.

    :param options: Text naming what will be exited if the user answers
        ``e`` (it is inserted after the word "Exit" in the prompt)
    :type options: `str`
    :return: The line typed by the user, without the trailing newline
    :rtype: `str`
    """
    # The prompt previously mixed a printf-style ``%s`` placeholder with
    # ``str.format``, so ``options`` was never substituted and a literal
    # "%s" was shown to the user.
    return input("Options: 'e'+Enter= Exit {0}, Enter= Next test\n".format(options))
def user_abort(level, partial_output=''):
    """Handle the next steps when a user presses Ctrl-C.

    The abort is logged at info level. No interactive choice is offered:
    an aborted ``Command`` always moves on to the next plugin.

    :param level: The level which was aborted (so far ``Command`` or ``Plugin``)
    :type level: `str`
    :param partial_output: Partial output generated by the command or plugin
    :type partial_output: `str`
    :return: Message to present to the user (only reached when ``level`` is
        not ``Command``)
    :rtype: `str`
    :raises PluginAbortException: when ``level`` is ``Command``; carries
        ``partial_output`` so the results gathered so far are not lost.
    """
    logging.info("\nThe %s was aborted by the user: Please check the report and plugin output files", level)
    message = "\nThe {0} was aborted by the user: Please check the report and plugin output files".format(level)
    if level == 'Command':
        # The option used to be hard-coded to 'p' right before being tested,
        # which made the FrameworkAbortException ('e') branch unreachable.
        # Only the effective behaviour is kept: jump to the next handler and
        # pass the partial output along to avoid losing results.
        raise PluginAbortException(partial_output)
    return message
# Reverse lookup from signal value to its ``SIG*`` name, built from the
# attributes of the ``signal`` module. Names starting with ``SIG_`` are
# left out.
signame_by_signum = {
    sig_value: sig_name
    for sig_name, sig_value in vars(signal).items()
    if sig_name.startswith('SIG') and not sig_name.startswith('SIG_')
}
class SentryProxy(object):
    """Simple proxy for a sentry client that still logs the exception when no sentry client exists."""

    def __init__(self, sentry_client):
        """Wrap ``sentry_client``.

        :param sentry_client: Object exposing ``capture_exception``, or a
            falsy value (e.g. ``None``) to only log.
        """
        self.sentry_client = sentry_client

    def capture_exception(self, exc_info=None, **kwargs):
        """Forward the exception to the wrapped client (if any) and log it.

        :param exc_info: Exception information to report; when ``None`` the
            exception currently being handled is logged.
        :param kwargs: Extra keyword arguments passed through unchanged to
            the wrapped client's ``capture_exception``.
        """
        if self.sentry_client:
            self.sentry_client.capture_exception(exc_info=exc_info, **kwargs)

        # logging.exception() defaults to exc_info=True, i.e. it reads
        # sys.exc_info(). Pass an explicitly supplied exc_info through so the
        # traceback is still logged when this is called outside an ``except``
        # block.
        logging.exception("exception occurred", exc_info=True if exc_info is None else exc_info)
def get_sentry_client(sentry_key):
    """Build the :class:`SentryProxy` used for exception reporting.

    A real ``AsyncSentryClient`` is wrapped only when a key is supplied and
    the optional raven import succeeded; otherwise the proxy wraps ``None``
    and the reason(s) are logged at info level.

    :param sentry_key: Value handed to ``AsyncSentryClient``; a falsy value
        disables Sentry.
    :return: A proxy that is always safe to call.
    :rtype: SentryProxy
    """
    sentry_usable = sentry_key and raven_installed
    if not sentry_usable:
        # Both reasons are reported when both apply.
        if not sentry_key:
            logging.info("[-] No Sentry key specified")
        if not raven_installed:
            logging.info("[-] Raven (sentry client) not installed")
        return SentryProxy(sentry_client=None)

    # NOTE(review): the key is written to the log verbatim - confirm this is
    # acceptable if the key embeds credentials.
    logging.info("[+] Sentry client setup key={}".format(sentry_key))
    return SentryProxy(sentry_client=AsyncSentryClient(sentry_key))
def log_and_exit_handler(signum, frame):
    """Signal handler that logs the caught signal and exits with status 1.

    :param signum: Number of the delivered signal; looked up in
        ``signame_by_signum`` to obtain its name.
    :param frame: Stack frame argument of the handler signature (unused).
    :raises SystemExit: always, with exit code 1.
    """
    process_name = multiprocessing.current_process().name
    signal_name = signame_by_signum[signum]
    logging.debug("{}: caught signal {}, exiting".format(process_name, signal_name))
    sys.exit(1)
def setup_signal_handlers():
    """Register :func:`log_and_exit_handler` for ``SIGINT`` and ``SIGTERM``."""
    signal.signal(signal.SIGINT, log_and_exit_handler)
    signal.signal(signal.SIGTERM, log_and_exit_handler)