Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor!: multi-level snapshot comparison and config parser refactor #128

Merged
merged 16 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/report/fw2.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@
"num-gtpu-pending": "0"
},
"content_version": {
"version": "8647-7730"
"version": "8647-7740"
},
"arp_table": {},
"license": {
Expand Down Expand Up @@ -184,4 +184,4 @@
"ethernet1/2": "up",
"ethernet1/3": "up"
}
}
}
46 changes: 46 additions & 0 deletions examples/report/lic-5.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
{
"license": {
"Logging Service": {
"authcode": null,
"custom": {
"_Log_Storage_TB": "7",
"logtype": "disk",
"alogging": "enabled"
},
"description": "Device Logging Service",
"expired": "no",
"expires": "August 04, 2024",
"feature": "Logging Service",
"issued": "June 27, 2022",
"serial": "013201027229"
},
"Y feature": {
"authcode": null,
"description": "Device Logging Service",
"expired": "no",
"expires": "August 04, 2024",
"feature": "Logging Service",
"issued": "June 29, 2022",
"serial": "013201027248"
},
"PAN-DB URL Filtering": {
"Logging Service": "somehting",
"authcode": null,
"description": "Palo Alto Networks URL Filtering License",
"expired": "no",
"expires": "June 30, 2028",
"feature": "PAN-DB URL Filtering",
"issued": "April 27, 2023",
"serial": "013201027229"
},
"C feature": {
"authcode": null,
"description": "Palo Alto Networks URL Filtering License",
"expired": "no",
"expires": "June 30, 2028",
"feature": "PAN-DB URL Filtering",
"issued": "April 27, 2023",
"serial": "013201027229"
}
}
}
45 changes: 45 additions & 0 deletions examples/report/lic-6.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{
"license": {
"Logging Service": {
"authcode": null,
"custom": {
"_Log_Storage_TB": "9",
"logtype": "tape",
"blogging": "enabled"
},
"description": "Device Logging Service",
"expired": "no",
"expires": "August 04, 2024",
"feature": "Logging Service",
"issued": "June 29, 2022",
"serial": "013201027229"
},
"X feature": {
"authcode": null,
"description": "Device Logging Service",
"expired": "no",
"expires": "August 04, 2024",
"feature": "Logging Service",
"issued": "June 29, 2022",
"serial": "013201027248"
},
"PAN-DB URL Filtering": {
"authcode": null,
"description": "Palo Alto Networks URL Filtering License",
"expired": "no",
"expires": "June 30, 2028",
"feature": "PAN-DB URL Filtering",
"issued": "April 29, 2023",
"serial": "013201027248"
},
"C feature": {
"authcode": null,
"description": "Palo Alto Networks URL Filtering License",
"expired": "no",
"expires": "June 30, 2028",
"feature": "PAN-DB URL Filtering",
"issued": "April 27, 2023",
"serial": "013201027229"
}
}
}
109 changes: 93 additions & 16 deletions examples/report/snapshot_load_compare.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,101 @@ def load_snap(fname: str) -> dict:


if __name__ == "__main__":
snapshots = {"fw1": load_snap("fw1.snapshot"), "fw2": load_snap("fw2.snapshot")}
# snapshots = {"fw1": load_snap("fw1.snapshot"), "fw2": load_snap("fw2.snapshot")}
# snapshots = {"fw1": load_snap("arp_table_left.snapshot"), "fw2": load_snap("arp_table_right.snapshot")}
# snapshots = {"fw1": load_snap("lic-1.json"), "fw2": load_snap("lic-2.json")}
snapshots = {"fw1": load_snap("lic-5.json"), "fw2": load_snap("lic-6.json")}


reports = [
"all",
{"ip_sec_tunnels": {"properties": ["state"], "count_change_threshold": 5}},
{"arp_table": {"properties": ["!ttl"], "count_change_threshold": 10}},
{"nics": {"count_change_threshold": 10}},
{"license": {"properties": ["!serial"]}},
{"routes": {"properties": ["!flags"], "count_change_threshold": 10}},
"!content_version",
{
"session_stats": {
"thresholds": [
{"num-max": 10},
{"num-tcp": 10},
]
}
},
# "all",
# {"ip_sec_tunnels": {"properties": ["state"], "count_change_threshold": 5}},
# {"arp_table": {"properties": ["!ttl"], "count_change_threshold": 10}},
# {"nics": {"count_change_threshold": 10}},
# {"license": {"properties": ["!serial"]}},

# NOTE lic-* files and below tests are added for testing during review - will be removed afterwards

# "!license",
# "license",
######## Top level keys - not intented but works
# {"license": {
# "properties": ["Logging Service"] # even works for parent level
# }},
# {"license": {
# "properties": ["!Logging Service"] # also support if property exists in different levels in different dicts
# }},
# {"license": {
# "properties": ["issued", "PAN-DB URL Filtering"] # multi-level "AND" operation (combination with parent) for properties is not supported on purpose - "PAN-DB URL Filtering" diff will be made for all its attributes since its the parent
# }},
# {"license": {
# "properties": ["X feature"]
# }},
# {"license": {
# "properties": ["!X feature"]
# }},
# {"license": {
# "properties": ["C feature"]
# }},

######## 1st and 2nd level keys
# {"license": {
# "properties": ["!_Log_Storage_TB"] # works
# }},
# {"license": {
# "properties": ["_Log_Storage_TB"] # works
# }},
# {"license": {
# "properties": ["_Log_Storage_TB","issued"] # works
# }},
# {"license": {
# "properties": ["issued", "!logtype"] # works
# }},
# {"license": {
# "properties": ["serial", "!logtype"] # works
# }},
# {"license": {
# "properties": ["custom"] # works
# }},
# {"license": {
# "properties": ["!custom"] # works
# }},
# {"license": {
# "properties": ["alogging"] # works
# }},
# {"license": {
# "properties": ["blogging"] # works
# }},
# {"license": {
# "properties": ["something"] # since no such key is there it passes - works
# }},
# {"license": {
# "properties": ["serial", "non-existing"] # works - compare only requested
# }},
# {"license": {
# "properties": ["all"] # works
# }},
# {"license": {
# "properties": ["!issued", "all"] # works - compare all except
# }},
# {"license": {
# "properties": ["!logtype"] # works
# }},
{"license": {
"properties": ["!Logging Service", "issued"] # works
}},


# {"routes": {"properties": ["!flags"], "count_change_threshold": 10}},
# "!content_version",
# {
# "session_stats": {
# "thresholds": [
# {"num-max": 10},
# {"num-tcp": 10},
# ]
# }
# },
]

compare = SnapshotCompare(
Expand Down
75 changes: 42 additions & 33 deletions panos_upgrade_assurance/snapshot_compare.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Optional, Union, List, Dict
from panos_upgrade_assurance.utils import ConfigParser, SnapType
from panos_upgrade_assurance.utils import ConfigParser, SnapType, get_all_dict_keys
from panos_upgrade_assurance import exceptions

from itertools import chain

class SnapshotCompare:
"""Class comparing snapshots of Firewall Nodes.
Expand Down Expand Up @@ -237,10 +237,10 @@ def calculate_diff_on_dicts(
) -> Dict[str, dict]:
"""The static method to calculate a difference between two dictionaries.

By default dictionaries are compared by going down all nested levels, to the point where key-value pairs are just strings
or numbers. It is possible to configure which keys from these pairs should be compared (by default we compare all
available keys). This is done using the `properties` parameter. It's a list of the bottom most level keys. For example,
when comparing route tables snapshots are formatted like:
By default dictionaries are compared by going down all nested levels. It is possible to configure which keys on each
level should be compared (by default we compare all available keys). This is done using the `properties` parameter.
It's a list of keys that can be compared or skipped on each level. For example, when comparing route tables snapshots are
formatted like:

```python showLineNumbers
{
Expand All @@ -260,8 +260,9 @@ def calculate_diff_on_dicts(
}
```

The bottom most level keys are:
The keys to process here can be:

- 'default_0.0.0.0/0_ethernet1/3',
- `virtual-router`,
- `destination`,
- `nexthop`,
Expand Down Expand Up @@ -393,51 +394,59 @@ def calculate_diff_on_dicts(
changed=dict(passed=True, changed_raw={}),
)

chain_unique_set = lambda set1, set2: set(chain(set1,set2))

missing = left_side_to_compare.keys() - right_side_to_compare.keys()
if missing:
result["missing"]["passed"] = False
for key in missing:
for key in missing:
if ConfigParser.is_element_included(key, properties):
result["missing"]["missing_keys"].append(key)
result["missing"]["passed"] = False

added = right_side_to_compare.keys() - left_side_to_compare.keys()
if added:
result["added"]["passed"] = False
for key in added:
for key in added:
if ConfigParser.is_element_included(key, properties):
result["added"]["added_keys"].append(key)
result["added"]["passed"] = False

common_keys = left_side_to_compare.keys() & right_side_to_compare.keys()
if common_keys:
next_level_value = left_side_to_compare[next(iter(common_keys))]
at_lowest_level = True if not isinstance(next_level_value, dict) else False
keys_to_check = (
ConfigParser(valid_elements=set(common_keys), requested_config=properties).prepare_config()
if at_lowest_level
else common_keys
)

item_changed = False
for key in keys_to_check:
if right_side_to_compare[key] != left_side_to_compare[key]:
if isinstance(left_side_to_compare[key], str):
item_changed = False
for key in common_keys:
if right_side_to_compare[key] != left_side_to_compare[key]:
if isinstance(left_side_to_compare[key], str):
if ConfigParser.is_element_included(key, properties):
result["changed"]["changed_raw"][key] = dict(
left_snap=left_side_to_compare[key],
right_snap=right_side_to_compare[key],
)
item_changed = True
elif isinstance(left_side_to_compare[key], dict):

elif isinstance(left_side_to_compare[key], dict):
nested_keys_within_common_key = chain_unique_set(get_all_dict_keys(left_side_to_compare[key]),
get_all_dict_keys(right_side_to_compare[key]))
if ConfigParser.is_element_explicit_excluded(key, properties):
continue # skip to the next key

if properties and key in properties:
# call without properties - do not allow multi level (combined with parent) filtering..
nested_results = SnapshotCompare.calculate_diff_on_dicts(
left_side_to_compare=left_side_to_compare[key],
right_side_to_compare=right_side_to_compare[key],
)
else:
nested_results = SnapshotCompare.calculate_diff_on_dicts(
left_side_to_compare=left_side_to_compare[key],
right_side_to_compare=right_side_to_compare[key],
properties=properties,
)

SnapshotCompare.calculate_passed(nested_results)
if not nested_results["passed"]:
result["changed"]["changed_raw"][key] = nested_results
item_changed = True
else:
raise exceptions.WrongDataTypeException(f"Unknown value format for key {key}.")
result["changed"]["passed"] = not item_changed
SnapshotCompare.calculate_passed(nested_results)
if not nested_results["passed"]:
result["changed"]["changed_raw"][key] = nested_results
item_changed = True
else:
raise exceptions.WrongDataTypeException(f"Unknown value format for key {key}.")
result["changed"]["passed"] = not item_changed

return result

Expand Down
Loading
Loading