# Source code for owtf.utils.error

"""
owtf.utils.error
~~~~~~~~~~~~~~~~

The error handler provides a centralised control for aborting the application
and logging errors for debugging later.
"""
import logging
import multiprocessing
import signal
import sys

try:
    from raven.contrib.tornado import AsyncSentryClient

    raven_installed = True
except ImportError:
    raven_installed = False

from owtf.settings import SENTRY_API_KEY
from owtf.lib.exceptions import FrameworkAbortException, PluginAbortException

# Public API of this module: the names exported by a star-import.
__all__ = [
    "abort_framework",
    "user_abort",
    "get_option_from_user",
    "SentryProxy",
    "get_sentry_client",
    "log_and_exit_handler",
    "setup_signal_handlers",
]

# NOTE(review): not referenced in the visible part of this module; presumably
# a placeholder for the command currently being run -- confirm against callers.
command = None
# Width, in characters, of the separator rows built below.
len_padding = 100
# Separator: a newline, a row of ``len_padding`` underscores, then two newlines.
padding = "\n{}\n\n".format("_" * len_padding)
# Separator: a row of ``len_padding`` asterisks with one newline on each side.
sub_padding = "\n{}\n".format("*" * len_padding)


def abort_framework(message):
    """Abort the OWTF framework.

    The message is prefixed with ``Aborted by Framework:``, logged at error
    level and then handed to :func:`sys.exit`.

    :warning: This function never returns. Because ``sys.exit()`` is given a
        string rather than an integer, the interpreter prints that string to
        stderr and terminates with exit status 1.

    :param str message: Descriptive message about the abort.
    :raises SystemExit: Always; its ``code`` attribute is the full message.
    """
    message = "Aborted by Framework: {0}".format(message)
    logging.error(message)
    sys.exit(message)
def get_option_from_user(options):
    """Prompt the user to either exit or continue with the next test.

    :param options: Text inserted into the prompt right after the exit hint
    :type options: `str`
    :return: The line typed by the user, exactly as returned by ``input()``
    :rtype: `str`
    """
    prompt = "Options: 'e'+Enter= Exit {!s}, Enter= Next test\n".format(options)
    return input(prompt)
def user_abort(level, partial_output=""):
    """Handle the next steps when a user presses Ctrl-C.

    :param level: The level which was aborted (so far ``"Command"`` or ``"Plugin"``)
    :type level: `str`
    :param partial_output: Partial output generated by the command or plugin
    :type partial_output: `str`
    :return: Message to present to the user (only when ``level`` is not ``"Command"``)
    :rtype: `str`
    :raises PluginAbortException: When ``level`` is ``"Command"``; carries
        ``partial_output`` so the next handler does not lose results.
    """
    logging.info("\nThe %s was aborted by the user: Please check the report and plugin output files", level)
    message = "\nThe {} was aborted by the user: Please check the report and plugin output files".format(level)
    if level == "Command":
        # The choice used to be stored in a local hard-coded to "p" (move on
        # to the next plugin), so the alternative "e" branch that raised
        # FrameworkAbortException could never run. Only the reachable path is
        # kept: jump to the next handler, passing the partial output along.
        raise PluginAbortException(partial_output)
    return message
# Reverse lookup table: signal number -> symbolic name (e.g. "SIGINT").
# Names beginning with "SIG_" (such as SIG_DFL / SIG_IGN) are left out.
signame_by_signum = {
    number: name
    for name, number in vars(signal).items()
    if name.startswith("SIG") and not name.startswith("SIG_")
}
class SentryProxy(object):
    """Simple proxy for a sentry client.

    The exception is always recorded through :mod:`logging`, even when no
    sentry client exists.
    """

    def __init__(self, sentry_client):
        """Store the wrapped client.

        :param sentry_client: Object exposing ``capture_exception``, or a
            falsy value (e.g. ``None``) when Sentry is unavailable.
        """
        self.sentry_client = sentry_client

    def capture_exception(self, exc_info=None, **kwargs):
        """Forward an exception to the sentry client (if any) and log it.

        :param exc_info: Exception information to report. When omitted, the
            exception currently being handled (if any) is logged.
        :param kwargs: Extra keyword arguments passed through to the client's
            ``capture_exception``.
        """
        if self.sentry_client:
            self.sentry_client.capture_exception(exc_info=exc_info, **kwargs)

        # ``logging.exception`` always reads the ambient exception and would
        # drop an explicitly supplied ``exc_info`` (e.g. when called outside
        # an ``except`` block). Prefer the caller's value and fall back to
        # ``True``, which is what ``logging.exception`` uses internally.
        logging.error("exception occurred", exc_info=exc_info or True)
def get_sentry_client(sentry_key=SENTRY_API_KEY):
    """Build a :class:`SentryProxy`, backed by a real client when possible.

    A real ``AsyncSentryClient`` is wrapped only when a key is given *and*
    the raven package could be imported; otherwise the proxy wraps ``None``
    and the reason is logged at info level.

    :param sentry_key: Key handed to ``AsyncSentryClient``; defaults to
        ``SENTRY_API_KEY`` from the settings.
    :return: Proxy around the sentry client (or around ``None``)
    :rtype: :class:`SentryProxy`
    """
    if sentry_key and raven_installed:
        # NOTE(review): this writes the Sentry key to the log; confirm that
        # is acceptable for the deployment's log handling.
        logging.info("[+] Sentry client setup key: %s", sentry_key)
        return SentryProxy(sentry_client=AsyncSentryClient(sentry_key))

    # Both reasons may apply at once, so each is reported independently.
    if not sentry_key:
        logging.info("[-] No Sentry key specified")
    if not raven_installed:
        logging.info("[-] Raven (sentry client) not installed")
    return SentryProxy(sentry_client=None)
def log_and_exit_handler(signum, frame):
    """Signal handler: log which signal was caught, then exit with status 1.

    :param signum: Number of the signal that was received
    :param frame: Current stack frame supplied by the signal machinery (unused)
    :raises SystemExit: Always, with exit code 1.
    """
    # Fall back to the raw number so an unmapped signal cannot raise KeyError
    # from inside the handler and mask the intended exit.
    signal_name = signame_by_signum.get(signum, signum)
    logging.debug("%s: caught signal %s, exiting", multiprocessing.current_process().name, signal_name)
    sys.exit(1)
def setup_signal_handlers():
    """Install :func:`log_and_exit_handler` for ``SIGINT`` and ``SIGTERM``."""
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, log_and_exit_handler)