Source code for owtf.db.models

"""
owtf.db.models
~~~~~~~~~~~~~~

The SQLAlchemy models for every table in the OWTF DB.
"""
import datetime

from sqlalchemy import Boolean, Column, DateTime, Float, ForeignKey, \
    Index, Integer, String, Table, Text, UniqueConstraint
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship

Base = declarative_base()

# This table actually allows us to make a many to many relationship
# between transactions table and grep_outputs table
target_association_table = Table('target_session_association', Base.metadata,
                                 Column('target_id', Integer, ForeignKey('targets.id')),
                                 Column('session_id', Integer, ForeignKey('sessions.id')))

Index('target_id_idx', target_association_table.c.target_id, postgresql_using='btree')


[docs]class Session(Base): __tablename__ = "sessions" id = Column(Integer, primary_key=True, autoincrement=True) name = Column(String, unique=True) active = Column(Boolean, default=False) targets = relationship("Target", secondary=target_association_table, backref="sessions")
[docs]class Target(Base): __tablename__ = "targets" id = Column(Integer, primary_key=True, autoincrement=True) target_url = Column(String, unique=True) host_ip = Column(String) port_number = Column(String) url_scheme = Column(String) alternative_ips = Column(String, nullable=True) # Comma seperated host_name = Column(String) host_path = Column(String) ip_url = Column(String) top_domain = Column(String) top_url = Column(String) scope = Column(Boolean, default=True) transactions = relationship("Transaction", cascade="delete") poutputs = relationship("PluginOutput", cascade="delete") urls = relationship("Url", cascade="delete") commands = relationship("Command", cascade="delete") # Also has a column session specified as backref in # session model works = relationship("Work", backref="target", cascade="delete") @hybrid_property def max_user_rank(self): user_ranks = [-1] user_ranks += [poutput.user_rank for poutput in self.poutputs] return (max(user_ranks)) @hybrid_property def max_owtf_rank(self): owtf_ranks = [-1] owtf_ranks += [poutput.owtf_rank for poutput in self.poutputs] return (max(owtf_ranks)) def __repr__(self): return "<Target (url='%s')>" % (self.target_url)
# This table actually allows us to make a many to many relationship # between transactions table and grep_outputs table transaction_association_table = Table('transaction_grep_association', Base.metadata, Column('transaction_id', Integer, ForeignKey('transactions.id')), Column('grep_output_id', Integer, ForeignKey('grep_outputs.id'))) Index('transaction_id_idx', transaction_association_table.c.transaction_id, postgresql_using='btree')
[docs]class Transaction(Base): __tablename__ = "transactions" target_id = Column(Integer, ForeignKey("targets.id")) id = Column(Integer, primary_key=True) url = Column(String) scope = Column(Boolean, default=False) method = Column(String) data = Column(String, nullable=True) # Post DATA time = Column(Float(precision=10)) time_human = Column(String) local_timestamp = Column(DateTime) raw_request = Column(Text) response_status = Column(String) response_headers = Column(Text) response_size = Column(Integer, nullable=True) response_body = Column(Text, nullable=True) binary_response = Column(Boolean, nullable=True) session_tokens = Column(String, nullable=True) login = Column(Boolean, nullable=True) logout = Column(Boolean, nullable=True) grep_outputs = relationship( "GrepOutput", secondary=transaction_association_table, cascade="delete", backref="transactions") def __repr__(self): return "<HTTP Transaction (url='%s' method='%s' response_status='%s')>" % (self.url, self.method, self.response_status)
[docs]class GrepOutput(Base): __tablename__ = "grep_outputs" target_id = Column(Integer, ForeignKey("targets.id")) id = Column(Integer, primary_key=True) name = Column(String) output = Column(Text) # Also has a column transactions, which is added by # using backref in transaction __table_args__ = (UniqueConstraint('name', 'output', target_id),)
[docs]class Url(Base): __tablename__ = "urls" target_id = Column(Integer, ForeignKey("targets.id")) url = Column(String, primary_key=True) visited = Column(Boolean, default=False) scope = Column(Boolean, default=True) def __repr__(self): return "<URL (url='%s')>" % (self.url)
[docs]class PluginOutput(Base): __tablename__ = "plugin_outputs" target_id = Column(Integer, ForeignKey("targets.id")) plugin_key = Column(String, ForeignKey("plugins.key")) # There is a column named plugin which is caused by backref from the plugin class id = Column(Integer, primary_key=True) plugin_code = Column(String) # OWTF Code plugin_group = Column(String) plugin_type = Column(String) date_time = Column(DateTime, default=datetime.datetime.now()) start_time = Column(DateTime) end_time = Column(DateTime) output = Column(String, nullable=True) error = Column(String, nullable=True) status = Column(String, nullable=True) user_notes = Column(String, nullable=True) user_rank = Column(Integer, nullable=True, default=-1) owtf_rank = Column(Integer, nullable=True, default=-1) output_path = Column(String, nullable=True) @hybrid_property def run_time(self): return self.end_time - self.start_time __table_args__ = (UniqueConstraint('plugin_key', 'target_id'),)
[docs]class Command(Base): __tablename__ = "command_register" start_time = Column(DateTime) end_time = Column(DateTime) success = Column(Boolean, default=False) target_id = Column(Integer, ForeignKey("targets.id")) plugin_key = Column(String, ForeignKey("plugins.key")) modified_command = Column(String) original_command = Column(String, primary_key=True) @hybrid_property def run_time(self): return self.end_time - self.start_time
[docs]class Error(Base): __tablename__ = "errors" id = Column(Integer, primary_key=True) owtf_message = Column(String) traceback = Column(String, nullable=True) user_message = Column(String, nullable=True) reported = Column(Boolean, default=False) github_issue_url = Column(String, nullable=True) def __repr__(self): return "<Error (traceback='%s')>" % (self.traceback)
[docs]class Resource(Base): __tablename__ = "resources" id = Column(Integer, primary_key=True) dirty = Column(Boolean, default=False) # Dirty if user edited it. Useful while updating resource_name = Column(String) resource_type = Column(String) resource = Column(String) __table_args__ = (UniqueConstraint('resource', 'resource_type', 'resource_name'),)
[docs]class ConfigSetting(Base): __tablename__ = "configuration" key = Column(String, primary_key=True) value = Column(String) section = Column(String) descrip = Column(String, nullable=True) dirty = Column(Boolean, default=False) def __repr__(self): return "<ConfigSetting (key='%s', value='%s', dirty='%r')>" % (self.key, self.value, self.dirty)
[docs]class TestGroup(Base): __tablename__ = "test_groups" code = Column(String, primary_key=True) group = Column(String) # web, network descrip = Column(String) hint = Column(String, nullable=True) url = Column(String) priority = Column(Integer) plugins = relationship("Plugin")
[docs]class Plugin(Base): __tablename__ = "plugins" key = Column(String, primary_key=True) # key = type@code title = Column(String) name = Column(String) code = Column(String, ForeignKey("test_groups.code")) group = Column(String) type = Column(String) descrip = Column(String, nullable=True) file = Column(String) attr = Column(String, nullable=True) works = relationship("Work", backref="plugin", cascade="delete") outputs = relationship("PluginOutput", backref="plugin") def __repr__(self): return "<Plugin (code='%s', group='%s', type='%s')>" % (self.code, self.group, self.type) @hybrid_property def min_time(self): """ Consider last 5 runs only, better performance and accuracy """ poutputs_num = len(self.outputs) if poutputs_num != 0: if poutputs_num < 5: run_times = [poutput.run_time for poutput in self.outputs] else: run_times = [poutput.run_time for poutput in self.outputs[-5:]] return min(run_times) else: return None @hybrid_property def max_time(self): """ Consider last 5 runs only, better performance and accuracy """ poutputs_num = len(self.outputs) if poutputs_num != 0: if poutputs_num < 5: run_times = [poutput.run_time for poutput in self.outputs] else: run_times = [poutput.run_time for poutput in self.outputs[-5:]] return max(run_times) else: return None __table_args__ = (UniqueConstraint('type', 'code'),)
[docs]class Work(Base): __tablename__ = "worklist" id = Column(Integer, primary_key=True, autoincrement=True) target_id = Column(Integer, ForeignKey("targets.id")) plugin_key = Column(String, ForeignKey("plugins.key")) active = Column(Boolean, default=True) # Columns plugin and target are created using backrefs __table_args__ = (UniqueConstraint('target_id', 'plugin_key'),) def __repr__(self): return "<Work (target='%s', plugin='%s')>" % (self.target_id, self.plugin_key)
[docs]class Mapping(Base): __tablename__ = 'mappings' owtf_code = Column(String, primary_key=True) mappings = Column(String) category = Column(String, nullable=True)