Source code for owtf.utils.file

"""
owtf.utils.file
~~~~~~~~~~~~~~~

"""
import codecs
import logging
import os
import shutil
import sys
import tempfile

from owtf.settings import LOGS_DIR, OUTPUT_PATH, OWTF_CONF, TARGETS_DIR, WORKER_LOG_DIR
from owtf.utils.error import abort_framework
from owtf.utils.strings import wipe_bad_chars


[docs]def catch_io_errors(func): """Decorator on I/O functions. If an error is detected, force OWTF to quit properly. """ def io_error(*args, **kwargs): """Call the original function while checking for errors. If `owtf_clean` parameter is not explicitely passed or if it is set to `True`, it force OWTF to properly exit. """ owtf_clean = kwargs.pop("owtf_clean", True) try: return func(*args, **kwargs) except (OSError, IOError) as e: if owtf_clean: abort_framework("Error when calling '{!s}'! {!s}.".format(func.__name__, str(e))) raise e return io_error
[docs]class FileOperations(object):
[docs] @staticmethod @catch_io_errors def create_missing_dirs(path): """Creates missing directories if not already present :param path: Path of the directory to create :type path: `str` :return: :rtype: None """ # truncate filepath to 255 char (*nix limit) # See issue #521 directory = path[:255] if os.path.isfile(directory): directory = os.path.dirname(directory) if not os.path.exists(directory): # Create any missing directories. FileOperations.make_dirs(directory)
[docs] @staticmethod @catch_io_errors def codecs_open(*args, **kwargs): """ A wrapper for Python codes with additional argument support""" return codecs.open(*args, **kwargs)
[docs] @staticmethod @catch_io_errors def dump_file(filename, contents, directory): """Create a file with user supplied contents :param filename: File to be created :type filename: `str` :param contents: Contents to write to the file :type contents: `str` :param directory: Directory where the file will be saved :type directory: `str` :return: The final full path of the created file :rtype: `str` """ save_path = os.path.join(directory, wipe_bad_chars(filename)) FileOperations.create_missing_dirs(directory) with FileOperations.codecs_open(save_path, "w", "utf-8") as f: f.write(contents) return save_path
[docs] @staticmethod @catch_io_errors def make_dirs(*args, **kwargs): return os.makedirs(*args, **kwargs)
[docs] @staticmethod @catch_io_errors def open(*args, **kwargs): return open(*args, **kwargs)
[docs] @staticmethod @catch_io_errors def rm_tree(*args, **kwargs): return shutil.rmtree(*args, **kwargs)
[docs] @staticmethod @catch_io_errors def mkdir(*args, **kwargs): return os.mkdir(*args, **kwargs)
[docs]def directory_access(path, mode): """Check if a directory can be accessed in the specified mode by the current user. :param str path: Directory path. :param str mode: Access type. :return: Valid access rights :rtype: `str` """ try: temp_file = tempfile.NamedTemporaryFile(mode=mode, dir=path, delete=True) except (IOError, OSError): return False return True
[docs]def get_file_as_list(filename): """Get file contents as a list :param filename: File path :type filename: `str` :return: Output list of the content :rtype: `list` """ try: with open(filename, "r") as f: output = f.read().split("\n") logging.info("Loaded file: %s", filename) except IOError: logging.error("Cannot open file: %s (%s)", filename, str(sys.exc_info())) output = [] return output
[docs]def create_temp_storage_dirs(owtf_pid): """Create a temporary directory in /tmp with pid suffix. :return: :rtype: None """ tmp_dir = os.path.join("/tmp", "owtf") if not os.path.exists(tmp_dir): tmp_dir = os.path.join(tmp_dir, str(owtf_pid)) if not os.path.exists(tmp_dir): FileOperations.make_dirs(tmp_dir)
[docs]def clean_temp_storage_dirs(owtf_pid): """Rename older temporary directory to avoid any further confusions. :return: :rtype: None """ curr_tmp_dir = os.path.join("/tmp", "owtf", str(owtf_pid)) new_tmp_dir = os.path.join("/tmp", "owtf", "old-{!s}".format(owtf_pid)) if os.path.exists(curr_tmp_dir) and os.access(curr_tmp_dir, os.W_OK): os.rename(curr_tmp_dir, new_tmp_dir)
[docs]def get_output_dir(): """Gets the output directory for the session :return: The path to the output directory :rtype: `str` """ output_dir = os.path.expanduser(OUTPUT_PATH) if not os.path.isabs(output_dir) and directory_access(os.getcwd(), "w+"): return output_dir else: # The output_dir may not be created yet, so check its parent. if directory_access(os.path.dirname(output_dir), "w+"): return output_dir return os.path.expanduser(os.path.join(OWTF_CONF, output_dir))
[docs]def get_output_dir_target(): """Returns the output directory for the targets :return: Path to output directory :rtype: `str` """ return os.path.join(get_output_dir(), TARGETS_DIR)
[docs]def get_dir_worker_logs(): """Returns the output directory for the worker logs :return: Path to output directory for the worker logs :rtype: `str` """ return os.path.join(get_output_dir(), WORKER_LOG_DIR)
[docs]def cleanup_target_dirs(target_url): """Cleanup the directories for the specific target :return: None :rtype: None """ return FileOperations.rm_tree(get_target_dir(target_url))
[docs]def get_target_dir(target_url): """Gets the specific directory for a target in the target output directory :param target_url: Target URL for which directory path is needed :type target_url: `str` :return: Path to the target URL specific directory :rtype: `str` """ clean_target_url = target_url.replace("/", "_").replace(":", "").replace("#", "") return os.path.join(get_output_dir_target(), clean_target_url)
[docs]def create_output_dir_target(target_url): """Creates output directories for the target URL :param target_url: The target URL :type target_url: `str` :return: None :rtype: None """ FileOperations.create_missing_dirs(get_target_dir(target_url))
[docs]def get_logs_dir(): """ Get log directory by checking if abs or relative path is provided in config file """ # Check access for logs dir parent directory because logs dir may not be created. if os.path.isabs(LOGS_DIR) and directory_access(os.path.dirname(LOGS_DIR), "w+"): return LOGS_DIR else: return os.path.join(get_output_dir(), LOGS_DIR)
[docs]def get_log_path(process_name): """Get the log file path based on the process name :param process_name: Process name :type process_name: `str` :return: Path to the specific log file :rtype: `str` """ log_file_name = "{!s}.log".format(process_name) return os.path.join(get_logs_dir(), log_file_name)