class CheckFirewall
Class responsible for running readiness checks and creating Firewall state snapshots.
This class is designed to:
- run one or more FirewallProxy class methods,
- gather and interpret results,
- present results.
It is split into two parts responsible for:
- running readiness checks, all methods related to this functionality are prefixed with check_,
- running state snapshots, all methods related to this functionality are prefixed with get_, although usually the FirewallProxy methods are run directly.
Although it is possible to run the methods directly, the preferred way is to run them through one of the following run methods:
- run_readiness_checks() is responsible for running specified readiness checks,
- run_snapshots() is responsible for getting a snapshot of specified device areas.
Attributes
- _snapshot_method_mapping (dict): Internal variable containing a map of all valid snapshot types mapped to the specific methods. This mapping is used to verify the requested snapshot types and to map the snapshot with an actual method that will eventually run. Keys in this dictionary are snapshot names as defined in the SnapType class, values are references to methods that will be run.
- _check_method_mapping (dict): Internal variable containing the map of all valid check types mapped to the specific methods. This mapping is used to verify requested check types and to map a check with an actual method that will eventually be run. Keys in this dictionary are check names as defined in the CheckType class, values are references to methods that will be run.
- EXPLICIT_CHECKS (set): Class variable containing a set of readiness checks that are only run if passed explicitly in checks_configuration to run_readiness_checks().
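The mapping attributes above describe a name-to-method dispatch pattern. The sketch below illustrates that pattern only; MiniChecker, its check names and its run() method are hypothetical stand-ins, not the library's code, and ValueError is used in place of the library's own exceptions.

```python
from typing import Callable, Dict, List, Optional, Set


class MiniChecker:
    """Hypothetical stand-in for CheckFirewall, used only to show the dispatch idea."""

    # Checks that run only when requested explicitly (example name, assumed).
    EXPLICIT_CHECKS: Set[str] = {"check_b"}

    def __init__(self) -> None:
        # Keys are check names, values are references to the methods to run.
        self._check_method_mapping: Dict[str, Callable[[], str]] = {
            "check_a": self.check_a,
            "check_b": self.check_b,
        }

    def check_a(self) -> str:
        return "a ran"

    def check_b(self) -> str:
        return "b ran"

    def run(self, requested: Optional[List[str]] = None) -> Dict[str, str]:
        if requested is None:
            # Nothing requested: run everything except the explicit-only checks.
            requested = [
                name for name in self._check_method_mapping
                if name not in self.EXPLICIT_CHECKS
            ]
        # The mapping doubles as a validator of the requested names.
        unknown = set(requested) - set(self._check_method_mapping)
        if unknown:
            raise ValueError(f"Unknown check types: {sorted(unknown)}")
        return {name: self._check_method_mapping[name]() for name in requested}
```

Calling MiniChecker().run() runs only check_a, while MiniChecker().run(["check_b"]) runs the explicit-only check because it was named in the request.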
CheckFirewall.__init__
def __init__(node: FirewallProxy,
skip_force_locale: Optional[bool] = False) -> None
CheckFirewall constructor.
Parameters
- node (FirewallProxy): Object representing a device against which checks and/or snapshots are run. See the FirewallProxy class documentation.
- skip_force_locale (bool, optional): (defaults to False) Use with caution. When set to True, setting the locale to en_US.UTF-8 for the module is skipped, so datetime strings in checks are parsed with the current locale setting.
CheckFirewall.check_pending_changes
def check_pending_changes() -> CheckResult
Check if there are pending changes on device.
It checks two states:
- if there is full commit required on the device,
- if not, if there is a candidate config pending on a device.
Returns
CheckResult: Object of CheckResult class representing the result of the pending changes check:
- CheckStatus.SUCCESS if there is no pending configuration,
- CheckStatus.FAIL otherwise.
CheckFirewall.check_panorama_connectivity
def check_panorama_connectivity() -> CheckResult
Check connectivity with the Panorama service.
Returns
CheckResult: Object of CheckResult class representing a state of Panorama connection:
- CheckStatus.SUCCESS when device is connected to Panorama,
- CheckStatus.FAIL otherwise,
- CheckStatus.ERROR is returned when no Panorama configuration is found.
CheckFirewall.check_ha_status
def check_ha_status(
skip_config_sync: Optional[bool] = False,
ignore_non_functional: Optional[bool] = False) -> CheckResult
Checks HA pair status from the perspective of the current device.
Currently, only Active-Passive configuration is supported.
Parameters
- skip_config_sync (bool, optional): (defaults to False) Use with caution, when set to True will skip checking if configuration is synchronized between nodes. Helpful when verifying a state of a partially upgraded HA pair.
- ignore_non_functional (bool, optional): (defaults to False) Use with caution, when set to True will ignore if device state is non-functional on one of the nodes. Helpful when verifying a state of a partially upgraded HA pair with vmseries plugin version mismatch.
Returns
CheckResult: Object of CheckResult class representing results of HA pair status inspection:
- CheckStatus.SUCCESS when pair is configured correctly,
- CheckStatus.FAIL otherwise,
- CheckStatus.ERROR is returned when device is not a member of an HA pair or the pair is not in Active-Passive configuration.
CheckFirewall.check_is_ha_active
def check_is_ha_active(
skip_config_sync: Optional[bool] = False,
ignore_non_functional: Optional[bool] = False) -> CheckResult
Checks whether this is an active node of an HA pair.
Before checking the state of the current device, the check_ha_status() method is run.
If this method does not end with CheckStatus.SUCCESS, its return value is passed as the result of check_is_ha_active().
Detailed results matrix, keyed by the check_ha_status() result, looks like this:
- CheckStatus.SUCCESS: the actual state of the device in an HA pair is checked, if the state is
  - active, CheckStatus.SUCCESS is returned,
  - passive, CheckStatus.FAIL is returned,
- anything other than CheckStatus.SUCCESS: the check_ha_status() return value is passed as a return value of this method.
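The results matrix can be sketched as a small decision function. Status is a hypothetical stand-in for CheckStatus, and treating every local state other than active as a failure is an assumption, since the matrix only lists active and passive.

```python
from enum import Enum


class Status(Enum):
    """Hypothetical stand-in for CheckStatus."""
    SUCCESS = "success"
    FAIL = "fail"
    ERROR = "error"


def is_ha_active(ha_status: Status, local_state: str) -> Status:
    # Any non-successful HA pair status is passed through unchanged.
    if ha_status is not Status.SUCCESS:
        return ha_status
    # Pair is healthy: only the active node passes (assumption: every
    # other state, not just passive, is reported as a failure).
    return Status.SUCCESS if local_state == "active" else Status.FAIL
```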
Parameters
- skip_config_sync (bool, optional): (defaults to False) Use with caution, when set to True will skip checking if configuration is synchronized between nodes. Helpful when working with a partially upgraded HA pair.
- ignore_non_functional (bool, optional): (defaults to False) Use with caution, when set to True will ignore if device state is non-functional on one of the nodes. Helpful when verifying a state of a partially upgraded HA pair with vmseries plugin version mismatch.
Returns
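CheckResult: Object of CheckResult class reflecting the state of the device: CheckStatus.SUCCESS when the device is the active node, CheckStatus.FAIL when it is the passive node, or the check_ha_status() result when that check did not succeed.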
CheckFirewall.check_expired_licenses
def check_expired_licenses(skip_licenses: Optional[list] = []) -> CheckResult
Check if any license is expired.
Parameters
- skip_licenses (list, optional): (defaults to []) List of license names that should be skipped during the check.
Raises
WrongDataTypeException: Raised when skip_licenses is not of type list.
Returns
CheckResult: Object of CheckResult class taking value of:
- CheckStatus.SUCCESS if no license is expired,
- CheckStatus.FAIL otherwise,
- CheckStatus.ERROR when there is no license information available in the API response.
CheckFirewall.check_active_support_license
def check_active_support_license() -> CheckResult
Check active support license with update server.
Returns
CheckResult: Object of CheckResult class taking value of:
- CheckStatus.SUCCESS if the support license is not expired,
- CheckStatus.FAIL otherwise,
- CheckStatus.ERROR when no information can be retrieved or found in the API response.
CheckFirewall.check_critical_session
def check_critical_session(
source: Optional[str] = None,
destination: Optional[str] = None,
dest_port: Optional[Union[str, int]] = None) -> CheckResult
Check if a critical session is present in the sessions table.
Parameters
- source (str, optional): (defaults to None) Source IPv4 address for the examined session.
- destination (str, optional): (defaults to None) Destination IPv4 address for the examined session.
- dest_port (int, str, optional): (defaults to None) Destination port value. This should be an integer value, but string representations such as "8080" are also accepted.
Returns
CheckResult: Object of CheckResult class taking value of:
- CheckStatus.SUCCESS if a session is found in the sessions table,
- CheckStatus.FAIL otherwise,
- CheckStatus.SKIPPED when no config is passed,
- CheckStatus.ERROR if the session table is empty.
CheckFirewall.check_content_version
def check_content_version(version: Optional[str] = None) -> CheckResult
Verify installed version of the Content Database.
This method runs in two modes:
- without any configuration - checks if the latest version of the Content DB is installed.
- with a specific version passed - verifies if the installed Content DB version is at least equal to the passed one.
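The two modes can be illustrated with a short sketch. It assumes content versions follow the 'xxxx-yyyy' format shown for get_content_db_version() and compares them as integer pairs; both helper functions are hypothetical and do not query a device.

```python
from typing import Optional, Tuple


def parse_content_version(version: str) -> Tuple[int, int]:
    # "8000-1000" -> (8000, 1000); assumes the 'xxxx-yyyy' layout.
    major, minor = version.split("-")
    return int(major), int(minor)


def content_version_ok(installed: str, latest_available: str,
                       required: Optional[str] = None) -> bool:
    if required is None:
        # Mode 1: no version passed, the latest available one must be installed.
        return parse_content_version(installed) == parse_content_version(latest_available)
    # Mode 2: the installed version must be at least the requested one.
    return parse_content_version(installed) >= parse_content_version(required)
```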
Parameters
- version (str, optional): (defaults to None) Target version of the content DB.
Returns
CheckResult: Object of CheckResult class taking value of:
- CheckStatus.SUCCESS when the installed Content DB met the requirements.
- CheckStatus.FAIL when it did not.
CheckFirewall.check_ntp_synchronization
def check_ntp_synchronization() -> CheckResult
Check synchronization with NTP server.
Returns
CheckResult: Object of CheckResult class taking value of:
- CheckStatus.SUCCESS when a device is synchronized with the NTP server.
- CheckStatus.FAIL when a device is not synchronized with the NTP server.
- CheckStatus.ERROR when a device is not configured for NTP synchronization.
CheckFirewall.check_arp_entry
def check_arp_entry(ip: Optional[str] = None,
interface: Optional[str] = None) -> CheckResult
Check if a given ARP entry is available in the ARP table.
Parameters
- interface (str, optional): (defaults to None) A name of an interface we examine for the ARP entries. When skipped, all interfaces are examined.
- ip (str, optional): (defaults to None) IP address of the ARP entry we look for.
Returns
CheckResult: Object of CheckResult class taking value of:
- CheckStatus.SUCCESS when the ARP entry is found.
- CheckStatus.FAIL when the ARP entry is not found.
- CheckStatus.SKIPPED when ip is not provided.
- CheckStatus.ERROR when the ARP table is empty.
CheckFirewall.check_ipsec_tunnel_status
def check_ipsec_tunnel_status(
tunnel_name: Optional[str] = None,
proxy_ids: Optional[List[str]] = None,
require_all_active: Optional[bool] = False) -> CheckResult
Check if a given IPSec tunnel is in active state.
Parameters
- tunnel_name (str, optional): (defaults to None) Name of the searched IPSec tunnel.
- proxy_ids (list(str), optional): (defaults to None) ProxyID names to check. All ProxyIDs are checked if None is provided.
- require_all_active (bool, optional): (defaults to False) If set, all ProxyIDs should be in active state. States are checked only within proxy_ids if provided.
Returns
CheckResult: Object of CheckResult class taking value of:
- CheckStatus.SUCCESS when a tunnel is found and is in active state.
- CheckStatus.FAIL when a tunnel is either not active or missing in the current configuration.
- CheckStatus.SKIPPED when tunnel_name is not provided.
- CheckStatus.ERROR when no IPSec tunnels are configured on the device.
CheckFirewall.check_free_disk_space
def check_free_disk_space(image_version: Optional[str] = None) -> CheckResult
Check if there is enough space on the /opt/panrepo volume for downloading a PanOS image.
This is a check intended to be run before the actual upgrade process starts.
The method operates in two modes:
- default - to be used as a last resort, it will verify that the /opt/panrepo volume has at least 3GB of free space available. This amount of free space is somewhat arbitrary and is based on the maximum image sizes (patch level + base image) available at the time the method was written (plus some additional error margin).
- specific target image - suggested mode, it will take one argument, image_version, which is the target PanOS version. For that version the actual image size (patch + base image) will be calculated. Next, the available free space is verified against that image size + 10% (as an error margin).
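The arithmetic behind both modes can be sketched as follows. The function names and the megabyte unit are assumptions made for illustration, and the image sizes are passed in directly instead of being looked up on a device.

```python
from typing import List, Optional

DEFAULT_MIN_FREE_MB = 3 * 1024  # 3GB fallback used in the default mode
ERROR_MARGIN = 0.10             # 10% added on top of the computed image size


def required_free_space_mb(image_sizes_mb: Optional[List[float]] = None) -> float:
    if not image_sizes_mb:
        # Default mode: no target version known, fall back to the fixed 3GB.
        return float(DEFAULT_MIN_FREE_MB)
    # Target image mode: sum of the image parts plus the error margin.
    return sum(image_sizes_mb) * (1 + ERROR_MARGIN)


def has_enough_space(free_mb: float,
                     image_sizes_mb: Optional[List[float]] = None) -> bool:
    return free_mb >= required_free_space_mb(image_sizes_mb)
```

With a hypothetical 1000 MB base image and a 500 MB patch image, about 1650 MB of free space would be required, which is well below the 3GB default.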
Parameters
- image_version (str, optional): (defaults to None) Version of the target PanOS image.
Returns
CheckResult: Object of CheckResult class taking value of:
- CheckStatus.SUCCESS when there is enough free space to download an image.
- CheckStatus.FAIL when there is NOT enough free space, additionally the actual free space available is provided as the fail reason.
CheckFirewall.check_mp_dp_sync
def check_mp_dp_sync(diff_threshold: int = 0) -> CheckResult
Check if the Data and Management clocks are in sync.
Raises
WrongDataTypeException: Raised when the diff_threshold is not of type int.
Parameters
- diff_threshold (int, optional): (defaults to 0) Maximum allowable difference in seconds between both clocks.
Returns
CheckResult: Object of CheckResult class taking value of:
- CheckStatus.SUCCESS when both clocks are the same or within threshold.
- CheckStatus.FAIL when both clocks differ.
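A minimal sketch of the threshold comparison, assuming both clocks are already available as datetime objects. The function name is hypothetical, TypeError stands in for WrongDataTypeException, and treating the threshold as inclusive is an assumption.

```python
from datetime import datetime


def clocks_in_sync(mp_clock: datetime, dp_clock: datetime,
                   diff_threshold: int = 0) -> bool:
    if not isinstance(diff_threshold, int):
        # Stand-in for the library's WrongDataTypeException.
        raise TypeError("diff_threshold must be an int")
    # Absolute difference in seconds, regardless of which clock is ahead.
    diff_seconds = abs((mp_clock - dp_clock).total_seconds())
    return diff_seconds <= diff_threshold
```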
CheckFirewall.check_ssl_cert_requirements
def check_ssl_cert_requirements(rsa: dict = {},
ecdsa: dict = {}) -> CheckResult
Check if the certificates' keys meet minimum size requirements.
This method loops over all certificates installed on a device and compares certificate's properties with the ones
provided in input parameters. There are two parameters available, one describing RSA certificate requirements, the
other for ECDSA certificates. Both parameters are dictionaries accepting the following keys:
- hash_method - a minimum (from security perspective) required hashing method,
- key_size - a minimum size of a key.
Parameters
- rsa (dict, optional): A dictionary describing minimum security requirements of an RSA certificate. Default values for the certificate requirements are as follows:
  - hash_method - SHA256,
  - key_size - 2048.
- ecdsa (dict, optional): A dictionary describing minimum security requirements of an ECDSA certificate. Default values for the certificate requirements are as follows:
  - hash_method - SHA256,
  - key_size - 256.
Returns
CheckResult: Object of CheckResult class taking value of:
- CheckStatus.SUCCESS when all certs meet the size requirements.
- CheckStatus.FAIL if at least one cert does not meet the requirements - certificate names with their current sizes are provided in the CheckResult.reason property.
- CheckStatus.SKIPPED when device does not have certificates installed.
- CheckStatus.ERROR when the certificate's properties (installed or required) are not supported.
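The comparison against minimum requirements could look roughly like the sketch below. The default values come from the parameter descriptions above; the HASH_ORDER ranking, the certificate dictionary layout and the function name are assumptions made for illustration.

```python
from typing import Dict, Union

Requirements = Dict[str, Union[str, int]]

RSA_DEFAULTS: Requirements = {"hash_method": "SHA256", "key_size": 2048}
ECDSA_DEFAULTS: Requirements = {"hash_method": "SHA256", "key_size": 256}

# Assumed ranking of hashing methods, weakest first (illustrative only).
HASH_ORDER = ["MD5", "SHA1", "SHA256", "SHA384", "SHA512"]


def meets_requirements(cert: Requirements, requirements: Requirements,
                       defaults: Requirements) -> bool:
    # Caller-provided keys override the defaults, missing keys fall back.
    required = {**defaults, **requirements}
    hash_ok = (HASH_ORDER.index(cert["hash_method"])
               >= HASH_ORDER.index(required["hash_method"]))
    size_ok = cert["key_size"] >= required["key_size"]
    return hash_ok and size_ok
```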
CheckFirewall._calculate_schedule_time_diff
def _calculate_schedule_time_diff(now_dt: datetime, schedule_type: str,
schedule: dict) -> (int, str)
A method that calculates the time distance between two datetime objects.
This method is used only by CheckFirewall.check_scheduled_updates() method and it expects some information
to be already available.
Parameters
- now_dt (datetime): A datetime object representing the current moment in time. Ideally this should be the device's local time, taken from the management plane clock.
- schedule_type (str): A schedule type returned by PanOS, can be one of: every-*, hourly, daily, weekly, real-time.
- schedule (dict): Value of the recurring key in the API response, see FirewallProxy.get_update_schedules() documentation for details. Both formats (locally configured and pushed from a Panorama template) are supported.
Raises
MalformedResponseException: Thrown when the schedule_type is not recognizable.
Returns
tuple(int, str): A tuple containing the calculated time difference (in minutes) and human-readable description.
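To illustrate the kind of calculation involved, the sketch below handles only a daily schedule. The function name and the "HH:MM" input string are assumptions; the actual layout of the schedule dictionary is described in the FirewallProxy.get_update_schedules() documentation and is not reproduced here.

```python
from datetime import datetime, timedelta
from typing import Tuple


def minutes_until_daily(now_dt: datetime, at: str) -> Tuple[int, str]:
    # "at" is assumed to be a 24-hour "HH:MM" string.
    hour, minute = (int(part) for part in at.split(":"))
    next_run = now_dt.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if next_run < now_dt:
        # Today's occurrence has already passed, so use tomorrow's.
        next_run += timedelta(days=1)
    diff = int((next_run - now_dt).total_seconds() // 60)
    return diff, f"at {next_run.strftime('%H:%M')} (in {diff} minutes)"
```

A caller could then compare the returned number of minutes with a test window such as the one accepted by check_scheduled_updates().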
CheckFirewall.check_scheduled_updates
def check_scheduled_updates(test_window: int = 60) -> CheckResult
Check if any Dynamic Update job is scheduled to run within the specified time window.
When device is configured via Panorama, this includes schedules set up in Templates. It does not however include schedules
configured in Panorama/Device Deployment/Dynamic Updates/Schedules.
Parameters
- test_window (int, optional): (defaults to 60 minutes) A time window in minutes to look for an update job occurrence. Has to be a value between 60 and 10080 (1 week equivalent). The time window is calculated based on the device's local time (taken from the management plane).
Raises
MalformedResponseException: Thrown in case API response does not meet expectations.
Returns
CheckResult: Object of CheckResult class taking value of:
- CheckStatus.SUCCESS when there is no update job planned within the test window.
- CheckStatus.FAIL otherwise, CheckResult.reason field contains information about the planned jobs with next occurrence time provided if possible.
- CheckStatus.ERROR when the test_window parameter does not meet criteria.
CheckFirewall.check_non_finished_jobs
def check_non_finished_jobs() -> CheckResult
Check for any job with status different than FIN.
Returns
CheckResult: Object of CheckResult class taking value of:
- CheckStatus.SUCCESS when all jobs are in FIN state.
- CheckStatus.FAIL otherwise, CheckResult.reason field contains information about the 1st job found with status different than FIN (job ID and the actual status).
- CheckStatus.SKIPPED when there are no jobs on a device.
CheckFirewall.check_global_jumbo_frame
def check_global_jumbo_frame(mode: bool = None) -> CheckResult
Check if the global jumbo frame configuration matches the desired mode.
Parameters
- mode (bool): The desired mode of the global jumbo frame configuration.
Returns
CheckResult: Object of CheckResult class taking value of:
- CheckStatus.SUCCESS when the global jumbo frame mode matches the desired mode.
- CheckStatus.FAIL when the current global jumbo frame and the desired modes differ.
- CheckStatus.SKIPPED when mode is not provided.
CheckFirewall.check_system_environmentals
def check_system_environmentals(
components: Optional[List[str]] = None) -> CheckResult
Check system environmentals for alarms.
Parameters
- components (list(str), optional): (defaults to None) List of components to check for alarms. If None, all components are checked. Valid components are 'thermal', 'fantray', 'fan', 'power', 'power-supply'.
Returns
CheckResult: Object of CheckResult class taking value of:
- CheckStatus.SUCCESS if no alarms are found in the specified components.
- CheckStatus.FAIL if any alarm is found in specified components.
- CheckStatus.ERROR when invalid components are specified or device did not return environmentals.
CheckFirewall.check_dp_cpu_utilization
def check_dp_cpu_utilization(threshold: int = 80,
minutes: int = 5) -> CheckResult
Check if the data plane CPU utilization is below a specified threshold.
This check retrieves the data plane CPU utilization for the specified duration and calculates the average CPU load across all cores. If the average CPU load is below the threshold, the check passes.
Parameters
- threshold (int, optional): (defaults to 80) Maximum acceptable average CPU utilization percentage.
- minutes (int, optional): (defaults to 5) Number of minutes to check, between 1 and 60.
Raises
WrongDataTypeException: Raised when the threshold or minutes parameters are not integers.
Returns
CheckResult: Object of CheckResult class taking value of:
- CheckStatus.SUCCESS when average CPU utilization is below the threshold.
- CheckStatus.FAIL when average CPU utilization is equal to or above the threshold.
- CheckStatus.ERROR when the data cannot be retrieved.
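The averaging step can be sketched as below. The input layout (core name mapped to a list of per-minute load samples) and the function name are assumptions; the way the library retrieves and shapes the data is not shown here.

```python
from statistics import mean
from typing import Dict, List


def dp_cpu_below_threshold(per_core_load: Dict[str, List[int]],
                           threshold: int = 80) -> bool:
    # Average each core over the sampled minutes, then average across cores.
    per_core_avg = [mean(samples) for samples in per_core_load.values()]
    # Strictly below passes; equal to or above the threshold fails.
    return mean(per_core_avg) < threshold
```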
CheckFirewall.check_mp_cpu_utilization
def check_mp_cpu_utilization(threshold: int = 80) -> CheckResult
Check if the management plane CPU utilization is below a specified threshold.
This check retrieves the management plane CPU utilization for the last 1 minute and compares it against the provided threshold.
Parameters
- threshold (int, optional): (defaults to 80) Maximum acceptable CPU utilization percentage.
Raises
WrongDataTypeException: Raised when the threshold parameter is not an integer or is outside the allowed range.
Returns
CheckResult: Object of CheckResult class taking value of:
- CheckStatus.SUCCESS when CPU utilization is below the threshold.
- CheckStatus.FAIL when CPU utilization is equal to or above the threshold.
- CheckStatus.ERROR when the data cannot be retrieved.
CheckFirewall.get_content_db_version
def get_content_db_version() -> Dict[str, str]
Get Content DB version.
Returns
dict(str): To keep the standard of all get methods returning a dictionary this value is also returned as a dictionary in the following format:
{
'version': 'xxxx-yyyy'
}
CheckFirewall.get_ip_sec_tunnels
def get_ip_sec_tunnels() -> Dict[str, dict]
Extract information about IPSEC tunnels from all tunnel data retrieved from a device.
Returns
dict: Currently configured IPSEC tunnels. The returned value is similar to the example below. It can differ though depending on the version of PanOS:
{
"tunnel_name": {
"peerip": "10.26.129.5",
"name": "tunnel_name",
"outer-if": "ethernet1/2",
"gwid": "1",
"localip": "0.0.0.0",
"state": "init",
"inner-if": "tunnel.1",
"mon": "off",
"owner": "1",
"id": "1"
}
}
CheckFirewall.get_global_jumbo_frame
def get_global_jumbo_frame() -> Dict[str, bool]
Get whether global jumbo frame configuration is set or not.
Returns
dict: The global jumbo frame configuration.
{
'mode': True
}
CheckFirewall.run_readiness_checks
def run_readiness_checks(
checks_configuration: Optional[List[Union[str, dict]]] = None,
report_style: bool = False) -> Union[Dict[str, dict], Dict[str, str]]
Run readiness checks.
This method provides a convenient way of running readiness checks methods. For details on configuration see readiness checks documentation.
Parameters
- checks_configuration (list(str,dict), optional): (defaults to None) List of readiness checks to run.
- report_style (bool): (defaults to False) Changes the output to a more descriptive form. Can be used when generating a report from the checks.
Raises
WrongDataTypeException: An exception is raised when the configuration is in a data type different than str or dict.
Returns
dict: Results of all configured checks.
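Since checks_configuration accepts a mix of strings and dictionaries, a caller-side view of how such a list might be normalized is sketched below. The dictionary layout (check name mapped to its keyword arguments), the check names and the function itself are assumptions; the readiness checks documentation defines the actual format. TypeError stands in for WrongDataTypeException.

```python
from typing import Dict, List, Union


def normalize_checks(checks_configuration: List[Union[str, dict]]) -> Dict[str, dict]:
    normalized: Dict[str, dict] = {}
    for entry in checks_configuration:
        if isinstance(entry, str):
            # A bare name means: run this check with default parameters.
            normalized[entry] = {}
        elif isinstance(entry, dict):
            # Assumed layout: {check_name: {keyword arguments}}.
            for name, params in entry.items():
                normalized[name] = params or {}
        else:
            # Stand-in for the library's WrongDataTypeException.
            raise TypeError(f"Unsupported configuration entry: {entry!r}")
    return normalized
```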
CheckFirewall.run_snapshots
def run_snapshots(
snapshots_config: Optional[List[Union[str,
dict]]] = None) -> Dict[str, dict]
Run snapshots of different firewall areas states.
This method provides a convenient way of running snapshots of a device state. For details on configuration see state snapshots documentation.
Parameters
- snapshots_config (list(str,dict), optional): (defaults to None) Defines snapshots of which areas will be taken.
Raises
WrongDataTypeException: An exception is raised when the configuration is in a data type different than a string.
Returns
dict: The results of the executed snapshots.
CheckFirewall.run_health_checks
def run_health_checks(
checks_configuration: Optional[List[Union[str, dict]]] = None,
report_style: bool = False) -> Union[Dict[str, dict], Dict[str, str]]
Run device health checks.
This method provides a convenient way of running health check methods. For details on configuration see the health checks documentation.
Parameters
- checks_configuration (list(str,dict), optional): (defaults to None) List of health checks to run.
- report_style (bool): (defaults to False) Changes the output to a more descriptive form. Can be used when generating a report from the checks.
Raises
WrongDataTypeException: An exception is raised when the configuration is in a data type different than str or dict.
Returns
dict: Results of all configured checks.
CheckFirewall.check_version_against_version_match_dict
@staticmethod
def check_version_against_version_match_dict(version: Version,
match_dict: dict) -> bool
Compare the given software version against the match dict.
Parameters
- version (Version): The software version to compare (e.g. "10.1.11").
- match_dict (dict): A dictionary of tuples mapping major/minor versions to match criteria:
{
"81": [("==", "8.1.21.2"), (">=", "8.1.25.1")],
"90": [(">=", "9.0.16.5")],
}
Returns
bool: True if the given software version matches the provided match criteria.
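One way to evaluate such a match dictionary is sketched below. It uses plain dotted version strings instead of the Version type, derives the dictionary key by concatenating the major and minor numbers, and treats the criteria list as a logical OR; all three are assumptions, not a description of the library's implementation.

```python
import operator
from typing import Callable, Dict, List, Tuple

OPERATORS: Dict[str, Callable[[tuple, tuple], bool]] = {
    "==": operator.eq,
    ">=": operator.ge,
    ">": operator.gt,
    "<=": operator.le,
    "<": operator.lt,
}


def to_tuple(version: str) -> Tuple[int, ...]:
    # "8.1.21.2" -> (8, 1, 21, 2)
    return tuple(int(part) for part in version.split("."))


def version_matches(version: str,
                    match_dict: Dict[str, List[Tuple[str, str]]]) -> bool:
    parts = to_tuple(version)
    key = f"{parts[0]}{parts[1]}"  # e.g. "81" for 8.1.x (assumed key scheme)
    criteria = match_dict.get(key, [])
    # Assumption: matching any single criterion is enough.
    return any(OPERATORS[op](parts, to_tuple(target)) for op, target in criteria)
```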
CheckFirewall.check_device_root_certificate_issue
def check_device_root_certificate_issue(
fail_when_affected_version_only: bool = True) -> CheckResult
Checks whether the target device is affected by the Root Certificate Expiration issue.
This check will FAIL if so, allowing you to build upgrade logic based on when and how it's failed.
This check will fail in the following scenarios:
- The device is running software that is affected by the issue AND is running out of date content AND is NOT running the user-id service or data redistribution
- The device is running software that is affected by the issue AND IS running user-id service OR data redistribution
Parameters
- fail_when_affected_version_only (bool, optional): (defaults to True) When set to False, this test will only fail if the software version is affected by the root certificate issue, AND the device is used for data redistribution OR it's using an out-of-date content DB version.
Returns
CheckResult: Object of CheckResult class taking value of:
- CheckStatus.SUCCESS if the device is not affected,
- CheckStatus.FAIL otherwise.
CheckFirewall.check_cdss_and_panorama_certificate_issue
def check_cdss_and_panorama_certificate_issue() -> CheckResult
Checks whether the device is affected by the PAN-OS Certificate Expirations Jan 2024 advisory.
The check will fail in any of the following scenarios:
- Device is running an affected software version
- Device is running an affected content version
- Device is running the fixed content version or higher but has not been rebooted - note this is best effort, and is based on when the content version was released and the device was rebooted
Returns
CheckResult: Object of CheckResult class taking value of:
- CheckStatus.SUCCESS if the device is not affected,
- CheckStatus.FAIL otherwise.