Skip to content

Commit

Permalink
Issue #222: fixed Findings creating spuriois Elements
Browse files Browse the repository at this point in the history
  • Loading branch information
izar committed Oct 24, 2023
1 parent 5ec33e1 commit 144b993
Showing 1 changed file with 55 additions and 39 deletions.
94 changes: 55 additions & 39 deletions pytm/pytm.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,15 +242,16 @@ def __ne__(self, other):
def __str__(self):
return ", ".join(sorted(set(d.name for d in self)))


class varControls(var):
def __set__(self, instance, value):
if not isinstance(value, Controls):
raise ValueError(
"expecting an Controls "
"value, got a {}".format(type(value))
"expecting an Controls " "value, got a {}".format(type(value))
)
super().__set__(instance, value)


class Action(Enum):
"""Action taken when validating a threat model."""

Expand Down Expand Up @@ -442,18 +443,25 @@ def _apply_defaults(flows, data):
e._safeset("dstPort", e.sink.port)
if hasattr(e.sink.controls, "isEncrypted"):
e.controls._safeset("isEncrypted", e.sink.controls.isEncrypted)
e.controls._safeset("authenticatesDestination", e.source.controls.authenticatesDestination)
e.controls._safeset("checksDestinationRevocation", e.source.controls.checksDestinationRevocation)
e.controls._safeset(
"authenticatesDestination", e.source.controls.authenticatesDestination
)
e.controls._safeset(
"checksDestinationRevocation", e.source.controls.checksDestinationRevocation
)

for d in e.data:
if d.isStored:
if hasattr(e.sink.controls, "isEncryptedAtRest"):
for d in e.data:
d._safeset("isDestEncryptedAtRest", e.sink.controls.isEncryptedAtRest)
d._safeset(
"isDestEncryptedAtRest", e.sink.controls.isEncryptedAtRest
)
if hasattr(e.source, "isEncryptedAtRest"):
for d in e.data:
d._safeset(
"isSourceEncryptedAtRest", e.source.controls.isEncryptedAtRest
"isSourceEncryptedAtRest",
e.source.controls.isEncryptedAtRest,
)
if d.credentialsLife != Lifetime.NONE and not d.isCredentials:
d._safeset("isCredentials", True)
Expand Down Expand Up @@ -522,28 +530,26 @@ def _describe_classes(classes):

def _list_elements():
"""List all elements which can be used in a threat model with the corresponding description"""

def all_subclasses(cls):
"""Get all sub classes of a class"""
subclasses = set(cls.__subclasses__())
return subclasses.union(
(s for c in subclasses for s in all_subclasses(c)))
return subclasses.union((s for c in subclasses for s in all_subclasses(c)))

def print_components(cls_list):
elements = sorted(cls_list, key=lambda c: c.__name__)
max_len = max((len(e.__name__) for e in elements))
for sc in elements:
doc = sc.__doc__ if sc.__doc__ is not None else ''
print(f'{sc.__name__:<{max_len}} -- {doc}')
#print all elements
print('Elements:')
doc = sc.__doc__ if sc.__doc__ is not None else ""
print(f"{sc.__name__:<{max_len}} -- {doc}")

# print all elements
print("Elements:")
print_components(all_subclasses(Element))

# Print Attributes
print('\nAtributes:')
print_components(
all_subclasses(OrderedEnum) | {Data, Action, Lifetime}
)

print("\nAtributes:")
print_components(all_subclasses(OrderedEnum) | {Data, Action, Lifetime})


def _get_elements_and_boundaries(flows):
Expand Down Expand Up @@ -661,7 +667,7 @@ def __init__(
if args:
element = args[0]
else:
element = kwargs.pop("element", Element("invalid"))
element = kwargs.pop("element", Element("created from Finding"))

self.target = element.name
self.element = element
Expand All @@ -683,6 +689,7 @@ def __init__(

threat_id = kwargs.get("threat_id", None)
for f in element.overrides:
# we override on threat_id, which is
if f.threat_id != threat_id:
continue
for i in dir(f.__class__):
Expand Down Expand Up @@ -729,7 +736,14 @@ class TM:
_data = []
_threatsExcluded = []
_sf = None
_duplicate_ignored_attrs = "name", "note", "order", "response", "responseTo", "controls"
_duplicate_ignored_attrs = (
"name",
"note",
"order",
"response",
"responseTo",
"controls",
)
name = varString("", required=True, doc="Model name")
description = varString("", required=True, doc="Model description")
threatsFile = varString(
Expand Down Expand Up @@ -877,15 +891,14 @@ def _check_duplicates(self, flows):

left_controls_attrs = left.controls._attr_values()
right_controls_attrs = right.controls._attr_values()
#for a in self._duplicate_ignored_attrs:
# for a in self._duplicate_ignored_attrs:
# del left_controls_attrs[a], right_controls_attrs[a]
if left_controls_attrs != right_controls_attrs:
continue
if self.onDuplicates == Action.IGNORE:
right._is_drawn = True
continue


raise ValueError(
"Duplicate Dataflow found between {} and {}: "
"{} is same as {}".format(
Expand Down Expand Up @@ -1087,7 +1100,6 @@ def _stale(self, days):
print(f"Checking for code {days} days older than this model.")

for e in TM._elements:

for src in e.sourceFiles:
try:
src_mtime = datetime.fromtimestamp(
Expand Down Expand Up @@ -1164,6 +1176,7 @@ def get_table(self, db, klass):
]
return db.define_table(name, fields)


class Controls:
"""Controls implemented by/on and Element"""

Expand Down Expand Up @@ -1256,15 +1269,13 @@ def _attr_values(self):
result[i] = value
return result


def _safeset(self, attr, value):
try:
setattr(self, attr, value)
except ValueError:
pass



class Element:
"""A generic element"""

Expand Down Expand Up @@ -1303,7 +1314,8 @@ def __init__(self, name, **kwargs):
self.controls = Controls()
self.uuid = uuid.UUID(int=random.getrandbits(128))
self._is_drawn = False
TM._elements.append(self)
if self.name != "created from Finding":
TM._elements.append(self)

def __repr__(self):
return "<{0}.{1}({2}) at {3}>".format(
Expand Down Expand Up @@ -1607,7 +1619,7 @@ class Datastore(Asset):
* FILE_SYSTEM - files on a file system
* SQL - A SQL Database
* LDAP - An LDAP Server
* AWS_S3 - An S3 Bucket within AWS"""
* AWS_S3 - An S3 Bucket within AWS""",
)

def __init__(self, name, **kwargs):
Expand Down Expand Up @@ -1874,27 +1886,29 @@ def serialize(obj, nested=False):
result[i.lstrip("_")] = value
return result


def encode_element_threat_data(obj):
"""Used to html encode threat data from a list of Elements"""
encoded_elements = []
if (type(obj) is not list):
raise ValueError("expecting a list value, got a {}".format(type(value)))
if type(obj) is not list:
raise ValueError("expecting a list value, got a {}".format(type(obj)))

for o in obj:
c = copy.deepcopy(o)
for a in o._attr_values():
if (a == "findings"):
encoded_findings = encode_threat_data(o.findings)
c._safeset("findings", encoded_findings)
c = copy.deepcopy(o)
for a in o._attr_values():
if a == "findings":
encoded_findings = encode_threat_data(o.findings)
c._safeset("findings", encoded_findings)
else:
v = getattr(o, a)
if (type(v) is not list or (type(v) is list and len(v) != 0)):
c._safeset(a, v)
encoded_elements.append(c)
v = getattr(o, a)
if type(v) is not list or (type(v) is list and len(v) != 0):
c._safeset(a, v)

encoded_elements.append(c)

return encoded_elements


def encode_threat_data(obj):
"""Used to html encode threat data from a list of threats or findings"""
encoded_threat_data = []
Expand Down Expand Up @@ -1955,7 +1969,9 @@ def get_args():
"--describe", help="describe the properties available for a given element"
)
_parser.add_argument(
"--list-elements", action="store_true", help="list all elements which can be part of a threat model"
"--list-elements",
action="store_true",
help="list all elements which can be part of a threat model",
)
_parser.add_argument("--json", help="output a JSON file")
_parser.add_argument(
Expand Down

0 comments on commit 144b993

Please sign in to comment.