# Source code for owtf.proxy.socket_wrapper

"""
owtf.proxy.socket_wrapper
~~~~~~~~~~~~~~~~~~~~~~~~~
"""
import ssl

from tornado import ioloop

from owtf.proxy.gen_cert import gen_signed_cert


def _wrap_with_context(sock, options):
    """Wrap *sock* using an ``SSLContext`` built from legacy wrap options.

    Accepts the keyword options historically taken by ``ssl.wrap_socket``
    (``keyfile``, ``certfile``, ``cert_reqs``, ``ssl_version``, ``ca_certs``,
    ``ciphers``) and applies them to a fresh context; every remaining option
    is forwarded to ``SSLContext.wrap_socket``.

    :param sock: The socket to wrap
    :param options: Mapping of wrap options; it is not modified
    :type options: `dict`
    :return: The wrapped SSL socket
    :raises ValueError: If a certificate file is required but missing
    :raises TypeError: If an option is not accepted by ``SSLContext.wrap_socket``
    """
    opts = dict(options)
    keyfile = opts.pop("keyfile", None)
    certfile = opts.pop("certfile", None)
    ca_certs = opts.pop("ca_certs", None)
    ciphers = opts.pop("ciphers", None)

    if opts.get("server_side") and not certfile:
        raise ValueError("certfile must be specified for server-side operations")
    if keyfile and not certfile:
        raise ValueError("certfile must be specified")

    context = ssl.SSLContext(opts.pop("ssl_version", ssl.PROTOCOL_TLS))
    context.verify_mode = opts.pop("cert_reqs", ssl.CERT_NONE)
    if ca_certs:
        context.load_verify_locations(ca_certs)
    if certfile:
        context.load_cert_chain(certfile, keyfile)
    if ciphers:
        context.set_ciphers(ciphers)
    return context.wrap_socket(sock, **opts)


def starttls(
    socket, domain, ca_crt, ca_key, ca_pass, certs_folder, success=None, failure=None, io=None, **options
):
    """Wrap an active socket in an SSL socket.

    Taken from https://gist.github.com/weaver/293449/4d9f64652583611d267604531a1d5f8c32ac6b16.

    A key/certificate pair for *domain* is obtained from ``gen_signed_cert``,
    the socket is wrapped as a non-blocking SSL socket and the TLS handshake is
    driven from the IOLoop until it completes or fails.

    :param socket: The active socket to wrap; its IOLoop handler is removed
    :param domain: Host name the certificate is generated for
    :type domain: `str`
    :param ca_crt: Passed through to ``gen_signed_cert``
    :param ca_key: Passed through to ``gen_signed_cert``
    :param ca_pass: Passed through to ``gen_signed_cert``
    :param certs_folder: Passed through to ``gen_signed_cert``
    :param success: Optional callable invoked with the wrapped socket once the
        handshake has finished
    :param failure: Optional callable invoked with the wrapped socket when the
        IOLoop reports an error event; when omitted the socket is closed
    :param io: IOLoop to register handlers on; defaults to the current IOLoop
    :param options: Extra SSL wrap options. Defaults are supplied for
        ``do_handshake_on_connect`` (False), ``ssl_version``
        (``ssl.PROTOCOL_TLS``), ``server_side`` (True), ``certfile`` and
        ``keyfile``
    :return: The wrapped SSL socket (the handshake may still be in progress)
    :raises ssl.SSLError: If the first handshake attempt fails for a reason
        other than needing more I/O
    """
    io = io or ioloop.IOLoop.current()

    # Default options
    options.setdefault("do_handshake_on_connect", False)
    options.setdefault("ssl_version", ssl.PROTOCOL_TLS)
    options.setdefault("server_side", True)

    # Domains with three or more dots are served by a wildcard certificate
    # covering their last three labels.
    if domain.count(".") >= 3:
        cert_name = "*." + ".".join(domain.split(".")[-3:])
    else:
        cert_name = domain
    key, cert = gen_signed_cert(cert_name, ca_crt, ca_key, ca_pass, certs_folder)
    options.setdefault("certfile", cert)
    options.setdefault("keyfile", key)

    # Handshake state; a list is used as a mutable cell shared with the closures.
    state = [io.ERROR]

    def done():
        """Handshake finished successfully."""
        io.remove_handler(wrapped.fileno())
        if success:
            success(wrapped)

    def error():
        """The handshake failed."""
        if failure:
            return failure(wrapped)
        # By default, just close the socket.
        io.remove_handler(wrapped.fileno())
        wrapped.close()

    def handshake(fd, events):
        """Advance the handshake by one step; used as the IOLoop handler.

        :param fd: File descriptor of the wrapped socket
        :param events: IOLoop event mask that triggered the call
        """
        if events & io.ERROR:
            error()
            return

        new_state = io.ERROR
        try:
            wrapped.do_handshake()
        except ssl.SSLWantReadError:
            new_state |= io.READ
        except ssl.SSLWantWriteError:
            new_state |= io.WRITE
        else:
            # Kept outside the try body so that an exception raised by the
            # success callback is never mistaken for a handshake error.
            done()
            return
        # Any other SSLError (or OSError) propagates to the caller / IOLoop.

        if new_state != state[0]:
            state[0] = new_state
            io.update_handler(fd, new_state)

    # Wrap the socket; swap out handlers.
    io.remove_handler(socket.fileno())
    # ssl.wrap_socket() was removed in Python 3.12, so build the context directly.
    wrapped = _wrap_with_context(socket, options)
    wrapped.setblocking(False)
    io.add_handler(wrapped.fileno(), handshake, state[0])

    # Begin the handshake.
    handshake(wrapped.fileno(), 0)
    return wrapped