From 863646d133c58d900301012ee6c6338bb57d038c Mon Sep 17 00:00:00 2001 From: lakshmj Date: Wed, 25 Mar 2026 20:16:52 +0000 Subject: [PATCH 1/3] Code changes for supporting Offline LAS Licensing in restricted mode. Signed-off-by: lakshmj --- examples/nslaslicense_offline.yaml | 20 ++++++++++++- plugins/module_utils/las_utils.py | 37 +++++++++++++++++++------ plugins/modules/nslaslicense_offline.py | 35 +++++++++++++++++++---- 3 files changed, 77 insertions(+), 15 deletions(-) diff --git a/examples/nslaslicense_offline.yaml b/examples/nslaslicense_offline.yaml index 9180a699..f5dfe3bc 100644 --- a/examples/nslaslicense_offline.yaml +++ b/examples/nslaslicense_offline.yaml @@ -3,7 +3,7 @@ hosts: demo_netscalers gather_facts: false tasks: - - name: Apply offline LAS license to NetScaler ADC + - name: Apply offline LAS license to NetScaler ADC (standard mode) delegate_to: localhost netscaler.adc.nslaslicense_offline: nsip: "{{ nsip }}" @@ -19,3 +19,21 @@ - name: Display license result ansible.builtin.debug: var: lic_result + + - name: Apply offline LAS license to NetScaler ADC (restricted mode) + delegate_to: localhost + netscaler.adc.nslaslicense_offline: + nsip: "{{ nsip }}" + nitro_user: "{{ nitro_user }}" + nitro_pass: "{{ nitro_pass }}" + nitro_protocol: "{{ nitro_protocol | default('https') }}" + validate_certs: false + entitlement_name: "VPX 10000 Premium" + is_fips: false + las_secrets_json: /path/to/las_secrets.json + restricted_mode: true + register: lic_result_restricted + + - name: Display restricted mode license result + ansible.builtin.debug: + var: lic_result_restricted diff --git a/plugins/module_utils/las_utils.py b/plugins/module_utils/las_utils.py index 889fe631..95dad28f 100644 --- a/plugins/module_utils/las_utils.py +++ b/plugins/module_utils/las_utils.py @@ -299,6 +299,17 @@ def import_offline_activation_request(self, request_file, fingerprint, bearer, l loglines.append("ERROR: import_offline_activation_request: {0}".format(str(e))) return "EXCEPTION ERROR" + def import_restricted_offline_activation_request(self, lsid, pubkey, bearer, loglines): + url = "{0}/support/{1}/importrestrictedofflineactivationrequest".format(self._base_url, self._ccid) + headers = {"Content-Type": "application/json", "Authorization": "CWSAuth bearer={0}".format(bearer)} + payload = {"ver": "1.0", "lsid": lsid, "pubkey": pubkey} + try: + result = self._post_json(url, headers, payload) + return result.get("importrequesttoken", "") + except Exception as e: + loglines.append("ERROR: import_restricted_offline_activation_request: {0}".format(str(e))) + return "EXCEPTION ERROR" + def generate_offline_activation(self, import_token, bearer, ent_name, loglines): url = "{0}/{1}/{2}/generateofflineactivation".format(self._base_url, self._ccid, self.endpoint) headers = {"Content-Type": "application/json", "Authorization": "CWSAuth bearer={0}".format(bearer)} @@ -458,7 +469,8 @@ def get_offline_request_package(nitro, ip, username, password, local_dir, new_ap # --------------------------------------------------------------------------- -def extract_lsguid(file_path, loglines): +def extract_request_fields(file_path, loglines): + """Extract lsguid, lsid, and pubkey from the NS offline activation request tgz in one pass.""" dest_dir = os.path.dirname(file_path) # Validate that file_path is within dest_dir to guard against path traversal. real_file_path = os.path.realpath(file_path) @@ -504,8 +516,12 @@ def extract_lsguid(file_path, loglines): loglines.append("DEBUG: Could not remove temp file lasData.tgz: {0}".format(str(e))) lsguid = data["lsguid"] + inner = data.get("data", {}) + lsid = inner["lsid"] + pubkey = inner["pubkey"] loglines.append("INFO: Extracted lsguid: {0}".format(lsguid)) - return lsguid + loglines.append("INFO: Extracted lsid: {0}".format(lsid)) + return lsguid, lsid, pubkey # --------------------------------------------------------------------------- @@ -558,7 +574,7 @@ def get_ent_name(request_pem, request_ed, is_fips, loglines): # --------------------------------------------------------------------------- -def generate_offline_package(lsguid, request_file, output_file, ent_name, secret_file, loglines): +def generate_offline_package(lsguid, request_file, output_file, ent_name, secret_file, loglines, restricted_mode=False, lsid=None, pubkey=None): client = LASClient(lsguid, secret_file) bearer = client.validate_bearer_cache() @@ -572,13 +588,16 @@ def generate_offline_package(lsguid, request_file, output_file, ent_name, secret loglines.append("ERROR: Failed to obtain bearer token from LAS") return None - fingerprint = client.get_fingerprint_for_lsguid(bearer, loglines) - if "ERROR" in str(fingerprint): - loglines.append("ERROR: Failed to get device fingerprint for lsguid {0}".format(lsguid)) - return None - loglines.append("INFO: Device fingerprint in LAS: {0!r}".format(fingerprint)) + if restricted_mode: + import_token = client.import_restricted_offline_activation_request(lsid, pubkey, bearer, loglines) + else: + fingerprint = client.get_fingerprint_for_lsguid(bearer, loglines) + if "ERROR" in str(fingerprint): + loglines.append("ERROR: Failed to get device fingerprint for lsguid {0}".format(lsguid)) + return None + loglines.append("INFO: Device fingerprint in LAS: {0!r}".format(fingerprint)) + import_token = client.import_offline_activation_request(request_file, fingerprint, bearer, loglines) - import_token = client.import_offline_activation_request(request_file, fingerprint, bearer, loglines) if not import_token or "ERROR" in import_token: loglines.append("ERROR: Failed to import offline activation request") return None diff --git a/plugins/modules/nslaslicense_offline.py b/plugins/modules/nslaslicense_offline.py index ffe4d472..3104fe67 100644 --- a/plugins/modules/nslaslicense_offline.py +++ b/plugins/modules/nslaslicense_offline.py @@ -78,6 +78,13 @@ - The file must contain the keys C(ccid), C(client), C(password), C(las_endpoint), and C(cc_endpoint). type: str required: true + restricted_mode: + description: + - Set to C(true) to use the restricted offline activation request flow. + - When enabled, the module extracts C(lsid) and C(pubkey) from the activation request package + and calls the restricted import endpoint instead of the standard file-upload endpoint. + type: bool + default: false """ EXAMPLES = r""" @@ -112,6 +119,19 @@ nitro_pass: "{{ nitro_pass }}" entitlement_name: "MPX 8905 Standard" las_secrets_json: ./path/to/las_secrets.json + +- name: Generate and apply offline LAS license using restricted mode (no file upload) + delegate_to: localhost + netscaler.adc.nslaslicense_offline: + nsip: 10.102.201.230 + nitro_user: nsroot + nitro_pass: "{{ nitro_pass }}" + nitro_protocol: https + validate_certs: false + entitlement_name: "VPX 10000 Premium" + is_fips: false + las_secrets_json: ./path/to/las_secrets.json + restricted_mode: true """ RETURN = r""" @@ -156,7 +176,7 @@ apply_license_blob_ns, check_if_new_api, check_ns_version, - extract_lsguid, + extract_request_fields, generate_offline_package, get_offline_request_package, ) @@ -177,6 +197,7 @@ def main(): entitlement_name=dict(required=True, type="str"), is_fips=dict(type="bool", default=False), las_secrets_json=dict(required=True, type="str", no_log=False), + restricted_mode=dict(type="bool", default=False), ) module = AnsibleModule( @@ -196,6 +217,7 @@ def main(): ent_name = module.params["entitlement_name"] is_fips = module.params["is_fips"] las_secrets_json = module.params["las_secrets_json"] + restricted_mode = module.params["restricted_mode"] if username != "nsroot": module.fail_json(msg="Only the 'nsroot' account is supported. Got: '{0}'".format(username), **result) @@ -289,20 +311,23 @@ def main(): request_file = os.path.join(temp_dir, ns_file_name) loglines.append("INFO: Got request package: {0}".format(request_file)) - # Extract lsguid (retry once on parse failure) + # Extract lsguid (always) and lsid+pubkey (retry once on parse failure) try: - lsguid = extract_lsguid(request_file, loglines) + lsguid, lsid, pubkey = extract_request_fields(request_file, loglines) except Exception as e: loglines.append("WARNING: First parse attempt failed ({0}), re-downloading package".format(str(e))) ns_file_name = get_offline_request_package(nitro, ip, username, password, temp_dir, new_api, loglines) if not ns_file_name: module.fail_json(msg="Re-download of activation request package failed", **result) request_file = os.path.join(temp_dir, ns_file_name) - lsguid = extract_lsguid(request_file, loglines) + lsguid, lsid, pubkey = extract_request_fields(request_file, loglines) # Generate offline token from LAS cloud output_file = "offline_token_{0}_activation.blob.tgz".format(ip) - if generate_offline_package(lsguid, request_file, output_file, ent_name, las_secrets_json, loglines) is None: + if generate_offline_package( + lsguid, request_file, output_file, ent_name, las_secrets_json, loglines, + restricted_mode=restricted_mode, lsid=lsid if restricted_mode else None, pubkey=pubkey if restricted_mode else None, + ) is None: module.fail_json(msg="Failed to generate offline license token from LAS", **result) # Apply license blob to device From 99a72f8b9b669534142d9001b99732c7ad135297 Mon Sep 17 00:00:00 2001 From: lakshmj Date: Thu, 26 Mar 2026 01:37:22 +0000 Subject: [PATCH 2/3] Code changes for supporting Offline LAS Licensing in restricted mode. Signed-off-by: lakshmj --- plugins/module_utils/las_utils.py | 388 +++++++++++++++---- plugins/modules/nslaslicense_offline.py | 136 +++++-- plugins/modules/routerdynamicrouting_info.py | 2 +- 3 files changed, 424 insertions(+), 102 deletions(-) diff --git a/plugins/module_utils/las_utils.py b/plugins/module_utils/las_utils.py index 95dad28f..f29f999c 100644 --- a/plugins/module_utils/las_utils.py +++ b/plugins/module_utils/las_utils.py @@ -109,7 +109,13 @@ ) MPX14K_PEMS = frozenset( - ("CNS_14020_SERVER", "CNS_14030_SERVER", "CNS_14060_SERVER", "CNS_14080_SERVER", "CNS_14500_SERVER") + ( + "CNS_14020_SERVER", + "CNS_14030_SERVER", + "CNS_14060_SERVER", + "CNS_14080_SERVER", + "CNS_14500_SERVER", + ) ) @@ -134,7 +140,9 @@ def __init__(self, ip, protocol, user, password, validate_certs, loglines=None): self.last_response_body = "" def _url(self, resource): - return "{0}://{1}/nitro/v1/config/{2}".format(self._protocol, self._ip, resource) + return "{0}://{1}/nitro/v1/config/{2}".format( + self._protocol, self._ip, resource + ) def get(self, resource): url = self._url(resource) @@ -147,7 +155,11 @@ def get(self, resource): method="GET", ) body = resp.read() - self._loglines.append("DEBUG: NITRO GET response: {0}".format(body.decode("utf-8", errors="replace").strip())) + self._loglines.append( + "DEBUG: NITRO GET response: {0}".format( + body.decode("utf-8", errors="replace").strip() + ) + ) return json.loads(body) if body.strip() else {} except Exception as e: self._loglines.append("DEBUG: NITRO GET exception: {0}".format(str(e))) @@ -157,7 +169,9 @@ def post(self, resource, payload, action=None): url = self._url(resource) if action: url += "?action={0}".format(action) - self._loglines.append("DEBUG: NITRO POST {0} request: {1}".format(url, json.dumps(payload))) + self._loglines.append( + "DEBUG: NITRO POST {0} request: {1}".format(url, json.dumps(payload)) + ) try: resp = open_url( url, @@ -168,7 +182,9 @@ def post(self, resource, payload, action=None): ) body = resp.read() self.last_response_body = body.decode("utf-8", errors="replace").strip() - self._loglines.append("DEBUG: NITRO POST response: {0!r}".format(self.last_response_body)) + self._loglines.append( + "DEBUG: NITRO POST response: {0!r}".format(self.last_response_body) + ) return json.loads(body) if body.strip() else {} except Exception as e: self._loglines.append("DEBUG: NITRO POST exception: {0}".format(str(e))) @@ -187,7 +203,13 @@ def build_multipart(fields, files): body = b"" for name, value in fields.items(): body += b"--" + boundary.encode() + crlf - body += b'Content-Disposition: form-data; name="' + name.encode() + b'"' + crlf + crlf + body += ( + b'Content-Disposition: form-data; name="' + + name.encode() + + b'"' + + crlf + + crlf + ) body += value.encode() + crlf for name, (filename, file_content) in files.items(): body += b"--" + boundary.encode() + crlf @@ -209,9 +231,11 @@ class LASClient: """Client for the LAS (License Activation Service) cloud API.""" # Namespaced by effective user ID to avoid insecure shared /tmp file access. - _BEARER_CACHE = os.path.join(tempfile.gettempdir(), "r56_bearer_{0}".format(os.geteuid())) + _BEARER_CACHE = os.path.join( + tempfile.gettempdir(), "r56_bearer_{0}".format(os.geteuid()) + ) - def __init__(self, lsguid, secret_file): + def __init__(self, lsguid, secret_file, loglines=None): self.endpoint = "netscalerfixedbw" self.lsguid = lsguid with open(secret_file, "r") as f: @@ -221,16 +245,47 @@ def __init__(self, lsguid, secret_file): self._client_secret = x["password"] self._base_url = x["las_endpoint"] self._cc_token_url = x["cc_endpoint"] + self._loglines = loglines if loglines is not None else [] - def _post_json(self, url, headers, payload): - resp = open_url( - url, - headers=headers, - method="POST", - data=json.dumps(payload).encode("utf-8"), - timeout=60, + def _post_json(self, url, headers, payload, log_payload=True): + logged_payload = ( + payload + if log_payload + else { + k: ("***" if k in ("clientSecret", "password") else v) + for k, v in payload.items() + } + ) + self._loglines.append( + "DEBUG: LAS POST {0} request: {1}".format(url, json.dumps(logged_payload)) ) - return json.loads(resp.read()) + try: + resp = open_url( + url, + headers=headers, + method="POST", + data=json.dumps(payload).encode("utf-8"), + timeout=60, + ) + body = resp.read() + self._loglines.append( + "DEBUG: LAS POST response: {0}".format( + body.decode("utf-8", errors="replace").strip() + ) + ) + return json.loads(body) + except Exception as e: + error_body = "" + if hasattr(e, "read"): + try: + error_body = e.read().decode("utf-8", errors="replace").strip() + except Exception: + pass + msg = "DEBUG: LAS POST exception: {0}".format(str(e)) + if error_body: + msg += " response_body={0}".format(error_body) + self._loglines.append(msg) + raise def generate_bearer_token(self): headers = {"Content-Type": "application/json"} @@ -238,6 +293,7 @@ def generate_bearer_token(self): self._cc_token_url, headers, {"clientId": self._client_id, "clientSecret": self._client_secret}, + log_payload=False, ) token = result.get("token", "") with open(self._BEARER_CACHE, "w") as f: @@ -251,17 +307,30 @@ def validate_bearer_cache(self): bearer = f.read().strip() if not bearer: return None - url = "{0}/support/{1}/{2}/listls".format(self._base_url, self._ccid, self.endpoint) - headers = {"Content-Type": "application/json", "Authorization": "CWSAuth bearer={0}".format(bearer)} + url = "{0}/support/{1}/{2}/listls".format( + self._base_url, self._ccid, self.endpoint + ) + headers = { + "Content-Type": "application/json", + "Authorization": "CWSAuth bearer={0}".format(bearer), + } try: self._post_json(url, headers, {"ver": "1.0"}) return bearer - except Exception: + except Exception as e: + self._loglines.append( + "DEBUG: LAS bearer cache validation failed: {0}".format(str(e)) + ) return None def get_fingerprint_for_lsguid(self, bearer, loglines): - url = "{0}/support/{1}/{2}/listls".format(self._base_url, self._ccid, self.endpoint) - headers = {"Content-Type": "application/json", "Authorization": "CWSAuth bearer={0}".format(bearer)} + url = "{0}/support/{1}/{2}/listls".format( + self._base_url, self._ccid, self.endpoint + ) + headers = { + "Content-Type": "application/json", + "Authorization": "CWSAuth bearer={0}".format(bearer), + } try: ls_list = self._post_json(url, headers, {"ver": "1.0"}) for ls in ls_list.get("lstlasactivatedls", []): @@ -273,16 +342,29 @@ def get_fingerprint_for_lsguid(self, bearer, loglines): return "EXCEPTION ERROR" def get_customer_entitlements(self, bearer, platform, loglines): - url = "{0}/{1}/netscalerfixedbw/customerentitlements".format(self._base_url, self._ccid) - headers = {"Content-Type": "application/json", "Authorization": "CWSAuth bearer={0}".format(bearer)} + url = "{0}/{1}/netscalerfixedbw/customerentitlements".format( + self._base_url, self._ccid + ) + headers = { + "Content-Type": "application/json", + "Authorization": "CWSAuth bearer={0}".format(bearer), + } try: return self._post_json(url, headers, {"ver": "1.0", "platform": platform}) except Exception as e: - loglines.append("ERROR: get_customer_entitlements platform={0}: {1}".format(platform, str(e))) + loglines.append( + "ERROR: get_customer_entitlements platform={0}: {1}".format( + platform, str(e) + ) + ) return None - def import_offline_activation_request(self, request_file, fingerprint, bearer, loglines): - url = "{0}/support/{1}/{2}/importofflineactivationrequest".format(self._base_url, self._ccid, self.endpoint) + def import_offline_activation_request( + self, request_file, fingerprint, bearer, loglines + ): + url = "{0}/support/{1}/{2}/importofflineactivationrequest".format( + self._base_url, self._ccid, self.endpoint + ) base_data = json.dumps({"ver": "1.0", "lsfingerprint": fingerprint}) with open(request_file, "rb") as f: file_content = f.read() @@ -290,44 +372,104 @@ def import_offline_activation_request(self, request_file, fingerprint, bearer, l fields={"data": base_data}, files={"file": (os.path.basename(request_file), file_content)}, ) - headers = {"Authorization": "CWSAuth bearer={0}".format(bearer), "Content-Type": content_type} + headers = { + "Authorization": "CWSAuth bearer={0}".format(bearer), + "Content-Type": content_type, + } + loglines.append( + "DEBUG: LAS POST {0} request: multipart/form-data file={1}".format( + url, os.path.basename(request_file) + ) + ) try: resp = open_url(url, headers=headers, method="POST", data=body, timeout=120) - result = json.loads(resp.read()) + raw = resp.read() + loglines.append( + "DEBUG: LAS POST response: {0}".format( + raw.decode("utf-8", errors="replace").strip() + ) + ) + result = json.loads(raw) return result.get("importrequesttoken", "") except Exception as e: - loglines.append("ERROR: import_offline_activation_request: {0}".format(str(e))) + loglines.append( + "ERROR: import_offline_activation_request: {0}".format(str(e)) + ) return "EXCEPTION ERROR" - def import_restricted_offline_activation_request(self, lsid, pubkey, bearer, loglines): - url = "{0}/support/{1}/importrestrictedofflineactivationrequest".format(self._base_url, self._ccid) - headers = {"Content-Type": "application/json", "Authorization": "CWSAuth bearer={0}".format(bearer)} + def import_restricted_offline_activation_request( + self, lsid, pubkey, bearer, loglines + ): + url = "{0}/support/{1}/{2}/importrestrictedofflineactivationrequest".format( + self._base_url, self._ccid, self.endpoint + ) + headers = { + "Content-Type": "application/json", + "Authorization": "CWSAuth bearer={0}".format(bearer), + } payload = {"ver": "1.0", "lsid": lsid, "pubkey": pubkey} try: result = self._post_json(url, headers, payload) return result.get("importrequesttoken", "") except Exception as e: - loglines.append("ERROR: import_restricted_offline_activation_request: {0}".format(str(e))) + loglines.append( + "ERROR: import_restricted_offline_activation_request: {0}".format( + str(e) + ) + ) return "EXCEPTION ERROR" def generate_offline_activation(self, import_token, bearer, ent_name, loglines): - url = "{0}/{1}/{2}/generateofflineactivation".format(self._base_url, self._ccid, self.endpoint) - headers = {"Content-Type": "application/json", "Authorization": "CWSAuth bearer={0}".format(bearer)} - data = {"ver": "1.0", "importrequesttoken": import_token, "entitlementname": ent_name} + url = "{0}/{1}/{2}/generateofflineactivation".format( + self._base_url, self._ccid, self.endpoint + ) + headers = { + "Content-Type": "application/json", + "Authorization": "CWSAuth bearer={0}".format(bearer), + } + data = { + "ver": "1.0", + "importrequesttoken": import_token, + "entitlementname": ent_name, + } try: return self._post_json(url, headers, data) except Exception as e: loglines.append("ERROR: generate_offline_activation: {0}".format(str(e))) return "EXCEPTION ERROR" - def get_blob_from_las(self, newactivationid, lsfingerprint, output_file, bearer, loglines): - url = "{0}/support/{1}/{2}/exportofflineactivationresponse".format(self._base_url, self._ccid, self.endpoint) - headers = {"Content-Type": "application/json", "Authorization": "CWSAuth bearer={0}".format(bearer)} - payload = {"ver": "1.0", "lsfingerprint": lsfingerprint, "newactivationid": newactivationid} + def get_blob_from_las( + self, newactivationid, lsfingerprint, output_file, bearer, loglines + ): + url = "{0}/support/{1}/{2}/exportofflineactivationresponse".format( + self._base_url, self._ccid, self.endpoint + ) + headers = { + "Content-Type": "application/json", + "Authorization": "CWSAuth bearer={0}".format(bearer), + } + payload = { + "ver": "1.0", + "lsfingerprint": lsfingerprint, + "newactivationid": newactivationid, + } + loglines.append( + "DEBUG: LAS POST {0} request: {1}".format(url, json.dumps(payload)) + ) try: - resp = open_url(url, headers=headers, method="POST", data=json.dumps(payload).encode("utf-8"), timeout=120) + resp = open_url( + url, + headers=headers, + method="POST", + data=json.dumps(payload).encode("utf-8"), + timeout=120, + ) + blob = resp.read() + loglines.append( + "DEBUG: LAS POST response: ".format(len(blob)) + ) with open(output_file, "wb") as f: - f.write(resp.read()) + f.write(blob) return "SUCCESS" except Exception as e: loglines.append("ERROR: get_blob_from_las: {0}".format(str(e))) @@ -347,9 +489,13 @@ def sftp_get(ip, username, password, remote_path, local_path, loglines): ssh.connect(ip, username=username, password=password) sftp = ssh.open_sftp() sftp.get(remote_path, local_path) - loglines.append("INFO: SFTP downloaded {0} -> {1}".format(remote_path, local_path)) + loglines.append( + "INFO: SFTP downloaded {0} -> {1}".format(remote_path, local_path) + ) except Exception as e: - raise RuntimeError("SFTP get failed ({0} -> {1}): {2}".format(remote_path, local_path, str(e))) + raise RuntimeError( + "SFTP get failed ({0} -> {1}): {2}".format(remote_path, local_path, str(e)) + ) finally: if sftp: sftp.close() @@ -364,9 +510,13 @@ def sftp_put(ip, username, password, local_path, remote_path, loglines): ssh.connect(ip, port=22, username=username, password=password) sftp = ssh.open_sftp() sftp.put(local_path, remote_path) - loglines.append("INFO: SFTP uploaded {0} -> {1}".format(local_path, remote_path)) + loglines.append( + "INFO: SFTP uploaded {0} -> {1}".format(local_path, remote_path) + ) except Exception as e: - raise RuntimeError("SFTP put failed ({0} -> {1}): {2}".format(local_path, remote_path, str(e))) + raise RuntimeError( + "SFTP put failed ({0} -> {1}): {2}".format(local_path, remote_path, str(e)) + ) finally: if sftp: sftp.close() @@ -387,20 +537,40 @@ def check_ns_version(nitro, is_fips, loglines): o = nitro.get("nsversion") ns = o.get("nsversion", {}) if not isinstance(ns, dict): - return {"version": None, "build": None, "las_ok": False, "reason": "Missing nsversion in NITRO response"} + return { + "version": None, + "build": None, + "las_ok": False, + "reason": "Missing nsversion in NITRO response", + } ver_str = ns.get("version", "") if not ver_str: - return {"version": None, "build": None, "las_ok": False, "reason": "Empty version field in nsversion"} + return { + "version": None, + "build": None, + "las_ok": False, + "reason": "Empty version field in nsversion", + } loglines.append("INFO: NS version string: {0}".format(ver_str)) version_match = re.search(r"NS(\d+\.\d+)", ver_str) if not version_match: - return {"version": None, "build": None, "las_ok": False, "reason": "Unable to parse version from: {0}".format(ver_str)} + return { + "version": None, + "build": None, + "las_ok": False, + "reason": "Unable to parse version from: {0}".format(ver_str), + } version = version_match.group(1) build_match = re.search(r"Build\s+(\d+)\.(\d+)", ver_str) if not build_match: - return {"version": version, "build": None, "las_ok": False, "reason": "Unable to parse build from: {0}".format(ver_str)} + return { + "version": version, + "build": None, + "las_ok": False, + "reason": "Unable to parse build from: {0}".format(ver_str), + } major_build = int(build_match.group(1)) minor_build = int(build_match.group(2)) @@ -412,14 +582,27 @@ def check_ns_version(nitro, is_fips, loglines): elif version == "13.1": if is_fips: las_ok = is_build_ge(major_build, minor_build, 37, 247) - reason = "Minimum required build is 13.1-37.247 (FIPS)" if not las_ok else "Meets minimum build 13.1-37.247 (FIPS)" + reason = ( + "Minimum required build is 13.1-37.247 (FIPS)" + if not las_ok + else "Meets minimum build 13.1-37.247 (FIPS)" + ) else: las_ok = is_build_ge(major_build, minor_build, 60, 29) - reason = "Minimum required build is 13.1-60.29" if not las_ok else "Meets minimum build 13.1-60.29" + reason = ( + "Minimum required build is 13.1-60.29" + if not las_ok + else "Meets minimum build 13.1-60.29" + ) else: reason = "Unsupported version {0} for LAS offline licensing".format(version) - return {"version": version, "build": "{0}.{1}".format(major_build, minor_build), "las_ok": las_ok, "reason": reason} + return { + "version": version, + "build": "{0}.{1}".format(major_build, minor_build), + "las_ok": las_ok, + "reason": reason, + } def check_if_new_api(mapping, release, major, minor): @@ -449,18 +632,28 @@ def check_if_new_api(mapping, release, major, minor): # --------------------------------------------------------------------------- -def get_offline_request_package(nitro, ip, username, password, local_dir, new_api, loglines): +def get_offline_request_package( + nitro, ip, username, password, local_dir, new_api, loglines +): """Trigger NITRO to generate the NS offline activation request tgz, then SFTP it to local_dir.""" - resource = "nslicenseactivationdata?args=usehostname:true" if new_api else "nslicenseactivationdata" + resource = ( + "nslicenseactivationdata?args=usehostname:true" + if new_api + else "nslicenseactivationdata" + ) o = nitro.get(resource) src_file = (o.get("nslicenseactivationdata") or {}).get("filename", "") if not src_file: - loglines.append("ERROR: Could not get package filename from NITRO response: {0}".format(o)) + loglines.append( + "ERROR: Could not get package filename from NITRO response: {0}".format(o) + ) return "" local_path = os.path.join(local_dir, src_file) - sftp_get(ip, username, password, "/nsconfig/license/" + src_file, local_path, loglines) + sftp_get( + ip, username, password, "/nsconfig/license/" + src_file, local_path, loglines + ) return src_file @@ -476,7 +669,9 @@ def extract_request_fields(file_path, loglines): real_file_path = os.path.realpath(file_path) real_dest_dir = os.path.realpath(dest_dir) if not real_file_path.startswith(real_dest_dir + os.sep): - raise RuntimeError("Invalid file path outside temp directory: {0}".format(file_path)) + raise RuntimeError( + "Invalid file path outside temp directory: {0}".format(file_path) + ) json_file = "ns_offline_activation_request.json" # shell=False ensures no shell metacharacter interpretation; all args are controlled internally. cmd = [ @@ -489,7 +684,9 @@ def extract_request_fields(file_path, loglines): "-C", dest_dir, ] - proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, shell=False) # nosec B603 + proc = subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, shell=False + ) # nosec B603 stdout, stderr = proc.communicate() if proc.returncode != 0: raise RuntimeError("tar extraction failed: {0}".format(stderr)) @@ -509,11 +706,15 @@ def extract_request_fields(file_path, loglines): try: os.remove(json_path) except Exception as e: - loglines.append("DEBUG: Could not remove temp file {0}: {1}".format(json_path, str(e))) + loglines.append( + "DEBUG: Could not remove temp file {0}: {1}".format(json_path, str(e)) + ) try: os.remove(os.path.join(dest_dir, "lasData.tgz")) except Exception as e: - loglines.append("DEBUG: Could not remove temp file lasData.tgz: {0}".format(str(e))) + loglines.append( + "DEBUG: Could not remove temp file lasData.tgz: {0}".format(str(e)) + ) lsguid = data["lsguid"] inner = data.get("data", {}) @@ -533,7 +734,11 @@ def apply_license_blob_ns(nitro, ip, username, password, fname, loglines): sftp_put(ip, username, password, fname, "/nsconfig/license/" + fname, loglines) payload = { "params": {"action": "apply", "warning": "YES"}, - "nslaslicense": {"filename": fname, "filelocation": "/nsconfig/license", "fixedbandwidth": True}, + "nslaslicense": { + "filename": fname, + "filelocation": "/nsconfig/license", + "fixedbandwidth": True, + }, } r = nitro.post("nslaslicense", payload, action="apply") if r.get("errorcode") == 1043: @@ -550,7 +755,9 @@ def apply_license_blob_ns(nitro, ip, username, password, fname, loglines): def get_ent_name(request_pem, request_ed, is_fips, loglines): base_ent = PEM_ENT_NAME_MAPPING.get(request_pem) if not base_ent: - loglines.append("ERROR: PEM {0} not found in entitlement mapping".format(request_pem)) + loglines.append( + "ERROR: PEM {0} not found in entitlement mapping".format(request_pem) + ) return None if is_fips: @@ -560,10 +767,16 @@ def get_ent_name(request_pem, request_ed, is_fips, loglines): if request_pem in FIPS_MPX_PREMIUM_ONLY_PEMS and request_ed != "Premium": loglines.append("ERROR: FIPS MPX devices only support the Premium edition") return None - base_ent = "FIPS MPX 15120-50G" if request_pem == "CNS_15120_SERVER" else "FIPS " + base_ent + base_ent = ( + "FIPS MPX 15120-50G" + if request_pem == "CNS_15120_SERVER" + else "FIPS " + base_ent + ) if request_ed not in ("Advanced", "Standard", "Premium"): - loglines.append("ERROR: Invalid edition {0} for PEM {1}".format(request_ed, request_pem)) + loglines.append( + "ERROR: Invalid edition {0} for PEM {1}".format(request_ed, request_pem) + ) return None return base_ent + " " + request_ed @@ -574,8 +787,18 @@ def get_ent_name(request_pem, request_ed, is_fips, loglines): # --------------------------------------------------------------------------- -def generate_offline_package(lsguid, request_file, output_file, ent_name, secret_file, loglines, restricted_mode=False, lsid=None, pubkey=None): - client = LASClient(lsguid, secret_file) +def generate_offline_package( + lsguid, + request_file, + output_file, + ent_name, + secret_file, + loglines, + restricted_mode=False, + lsid=None, + pubkey=None, +): + client = LASClient(lsguid, secret_file, loglines=loglines) bearer = client.validate_bearer_cache() if not bearer: @@ -589,27 +812,46 @@ def generate_offline_package(lsguid, request_file, output_file, ent_name, secret return None if restricted_mode: - import_token = client.import_restricted_offline_activation_request(lsid, pubkey, bearer, loglines) + import_token = client.import_restricted_offline_activation_request( + lsid, pubkey, bearer, loglines + ) else: fingerprint = client.get_fingerprint_for_lsguid(bearer, loglines) if "ERROR" in str(fingerprint): - loglines.append("ERROR: Failed to get device fingerprint for lsguid {0}".format(lsguid)) + loglines.append( + "ERROR: Failed to get device fingerprint for lsguid {0}".format(lsguid) + ) return None loglines.append("INFO: Device fingerprint in LAS: {0!r}".format(fingerprint)) - import_token = client.import_offline_activation_request(request_file, fingerprint, bearer, loglines) + import_token = client.import_offline_activation_request( + request_file, fingerprint, bearer, loglines + ) if not import_token or "ERROR" in import_token: loglines.append("ERROR: Failed to import offline activation request") return None loglines.append("INFO: Import token: {0}".format(import_token)) - gen_resp = client.generate_offline_activation(import_token, bearer, ent_name, loglines) + gen_resp = client.generate_offline_activation( + import_token, bearer, ent_name, loglines + ) if not isinstance(gen_resp, dict): loglines.append("ERROR: Failed to generate offline activation from LAS") return None - loglines.append("INFO: New activation ID: {0}".format(gen_resp.get("newactivationid"))) + loglines.append( + "INFO: New activation ID: {0}".format(gen_resp.get("newactivationid")) + ) - if client.get_blob_from_las(gen_resp["newactivationid"], gen_resp["lsfingerprint"], output_file, bearer, loglines) != "SUCCESS": + if ( + client.get_blob_from_las( + gen_resp["newactivationid"], + gen_resp["lsfingerprint"], + output_file, + bearer, + loglines, + ) + != "SUCCESS" + ): loglines.append("ERROR: Failed to retrieve license blob from LAS") return None diff --git a/plugins/modules/nslaslicense_offline.py b/plugins/modules/nslaslicense_offline.py index 3104fe67..a0731ac6 100644 --- a/plugins/modules/nslaslicense_offline.py +++ b/plugins/modules/nslaslicense_offline.py @@ -123,7 +123,7 @@ - name: Generate and apply offline LAS license using restricted mode (no file upload) delegate_to: localhost netscaler.adc.nslaslicense_offline: - nsip: 10.102.201.230 + nsip: 10.102.201.231 nitro_user: nsroot nitro_pass: "{{ nitro_pass }}" nitro_protocol: https @@ -189,11 +189,27 @@ def main(): argument_spec = dict( - nsip=dict(required=True, type="str", fallback=(env_fallback, ["NETSCALER_NSIP"])), - nitro_user=dict(required=True, type="str", no_log=True, fallback=(env_fallback, ["NETSCALER_NITRO_USER"])), - nitro_pass=dict(required=True, type="str", no_log=True, fallback=(env_fallback, ["NETSCALER_NITRO_PASS"])), + nsip=dict( + required=True, type="str", fallback=(env_fallback, ["NETSCALER_NSIP"]) + ), + nitro_user=dict( + required=True, + type="str", + no_log=True, + fallback=(env_fallback, ["NETSCALER_NITRO_USER"]), + ), + nitro_pass=dict( + required=True, + type="str", + no_log=True, + fallback=(env_fallback, ["NETSCALER_NITRO_PASS"]), + ), nitro_protocol=dict(type="str", choices=["http", "https"], default="https"), - validate_certs=dict(type="bool", default=True, fallback=(env_fallback, ["NETSCALER_VALIDATE_CERTS"])), + validate_certs=dict( + type="bool", + default=True, + fallback=(env_fallback, ["NETSCALER_VALIDATE_CERTS"]), + ), entitlement_name=dict(required=True, type="str"), is_fips=dict(type="bool", default=False), las_secrets_json=dict(required=True, type="str", no_log=False), @@ -206,7 +222,9 @@ def main(): ) if not HAS_PARAMIKO: - module.fail_json(msg="The 'paramiko' Python library is required. Install it with: pip install paramiko") + module.fail_json( + msg="The 'paramiko' Python library is required. Install it with: pip install paramiko" + ) loglines = [] result = dict(changed=False, failed=False, loglines=loglines) @@ -219,10 +237,15 @@ def main(): las_secrets_json = module.params["las_secrets_json"] restricted_mode = module.params["restricted_mode"] if username != "nsroot": - module.fail_json(msg="Only the 'nsroot' account is supported. Got: '{0}'".format(username), **result) + module.fail_json( + msg="Only the 'nsroot' account is supported. Got: '{0}'".format(username), + **result, + ) if not os.path.isfile(las_secrets_json): - module.fail_json(msg="las_secrets_json not found: {0}".format(las_secrets_json), **result) + module.fail_json( + msg="las_secrets_json not found: {0}".format(las_secrets_json), **result + ) _valid_ent_prefixes = ( "FIPS MPX 14", @@ -245,7 +268,9 @@ def main(): ) if not ent_name.startswith(_valid_ent_prefixes): module.fail_json( - msg="Invalid entitlement_name '{0}'. Must start with one of: {1}".format(ent_name, ", ".join(_valid_ent_prefixes)), + msg="Invalid entitlement_name '{0}'. Must start with one of: {1}".format( + ent_name, ", ".join(_valid_ent_prefixes) + ), **result, ) @@ -262,27 +287,49 @@ def main(): else: loglines.append("INFO: Using cached bearer token for entitlement validation") if not bearer: - module.fail_json(msg="Failed to obtain bearer token from LAS to validate entitlement_name", **result) + module.fail_json( + msg="Failed to obtain bearer token from LAS to validate entitlement_name", + **result, + ) ent_resp = las_client.get_customer_entitlements(bearer, platform, loglines) if ent_resp is None: module.fail_json( - msg="Failed to fetch customer entitlements from LAS for platform '{0}'".format(platform), + msg="Failed to fetch customer entitlements from LAS for platform '{0}'".format( + platform + ), **result, ) valid_entitlements = [e.get("type", "") for e in ent_resp.get("entitlements", [])] - loglines.append("INFO: Valid entitlements for platform '{0}': {1}".format(platform, valid_entitlements)) + loglines.append( + "INFO: Valid entitlements for platform '{0}': {1}".format( + platform, valid_entitlements + ) + ) if ent_name not in valid_entitlements: module.fail_json( msg="entitlement_name '{0}' is not a valid customer entitlement for platform '{1}'. Valid entitlements: [{2}]".format( - ent_name, platform, ", ".join(valid_entitlements) if valid_entitlements else "none found" + ent_name, + platform, + ", ".join(valid_entitlements) if valid_entitlements else "none found", ), **result, ) - loglines.append("INFO: entitlement_name '{0}' validated successfully against LAS".format(ent_name)) + loglines.append( + "INFO: entitlement_name '{0}' validated successfully against LAS".format( + ent_name + ) + ) - nitro = NitroHelper(ip, module.params["nitro_protocol"], username, password, module.params["validate_certs"], loglines) + nitro = NitroHelper( + ip, + module.params["nitro_protocol"], + username, + password, + module.params["validate_certs"], + loglines, + ) # Version check and new_api flag ver_info = check_ns_version(nitro, is_fips, loglines) @@ -298,15 +345,24 @@ def main(): release = ver_info["version"] build = ver_info["build"] mapping = NEW_API_MAPPING_FIPS if is_fips else NEW_API_MAPPING_NS - new_api = check_if_new_api(mapping, release, build.split(".")[0], build.split(".")[-1]) - loglines.append("INFO: release={0} build={1} new_api={2}".format(release, build, new_api)) + new_api = check_if_new_api( + mapping, release, build.split(".")[0], build.split(".")[-1] + ) + loglines.append( + "INFO: release={0} build={1} new_api={2}".format(release, build, new_api) + ) # Get activation request package from device temp_dir = os.path.join(tempfile.mkdtemp(prefix="nslas_"), "") try: - ns_file_name = get_offline_request_package(nitro, ip, username, password, temp_dir, new_api, loglines) + ns_file_name = get_offline_request_package( + nitro, ip, username, password, temp_dir, new_api, loglines + ) if not ns_file_name: - module.fail_json(msg="Failed to retrieve activation request package from device", **result) + module.fail_json( + msg="Failed to retrieve activation request package from device", + **result, + ) request_file = os.path.join(temp_dir, ns_file_name) loglines.append("INFO: Got request package: {0}".format(request_file)) @@ -315,27 +371,51 @@ def main(): try: lsguid, lsid, pubkey = extract_request_fields(request_file, loglines) except Exception as e: - loglines.append("WARNING: First parse attempt failed ({0}), re-downloading package".format(str(e))) - ns_file_name = get_offline_request_package(nitro, ip, username, password, temp_dir, new_api, loglines) + loglines.append( + "WARNING: First parse attempt failed ({0}), re-downloading package".format( + str(e) + ) + ) + ns_file_name = get_offline_request_package( + nitro, ip, username, password, temp_dir, new_api, loglines + ) if not ns_file_name: - module.fail_json(msg="Re-download of activation request package failed", **result) + module.fail_json( + msg="Re-download of activation request package failed", **result + ) request_file = os.path.join(temp_dir, ns_file_name) lsguid, lsid, pubkey = extract_request_fields(request_file, loglines) # Generate offline token from LAS cloud output_file = "offline_token_{0}_activation.blob.tgz".format(ip) - if generate_offline_package( - lsguid, request_file, output_file, ent_name, las_secrets_json, loglines, - restricted_mode=restricted_mode, lsid=lsid if restricted_mode else None, pubkey=pubkey if restricted_mode else None, - ) is None: - module.fail_json(msg="Failed to generate offline license token from LAS", **result) + if ( + generate_offline_package( + lsguid, + request_file, + output_file, + ent_name, + las_secrets_json, + loglines, + restricted_mode=restricted_mode, + lsid=lsid if restricted_mode else None, + pubkey=pubkey if restricted_mode else None, + ) + is None + ): + module.fail_json( + msg="Failed to generate offline license token from LAS", **result + ) # Apply license blob to device apply_license_blob_ns(nitro, ip, username, password, output_file, loglines) result["changed"] = True result["output_file"] = output_file - loglines.append("INFO: Successfully generated and applied offline license blob to {0}".format(ip)) + loglines.append( + "INFO: Successfully generated and applied offline license blob to {0}".format( + ip + ) + ) except Exception as e: loglines.append("ERROR: {0}".format(str(e))) diff --git a/plugins/modules/routerdynamicrouting_info.py b/plugins/modules/routerdynamicrouting_info.py index 82cf039c..fbd29ced 100644 --- a/plugins/modules/routerdynamicrouting_info.py +++ b/plugins/modules/routerdynamicrouting_info.py @@ -232,7 +232,7 @@ def main(): if is_exist and response: module_result["info"] = { "commandstring": commandstring, - "response": response[0].get("output", "") + "response": response[0].get("output", ""), } log("INFO: Successfully retrieved routing information") else: From 86a93f6a7bc1fb145f46b85c5ef703f39debbce1 Mon Sep 17 00:00:00 2001 From: lakshmj Date: Thu, 26 Mar 2026 02:14:13 +0000 Subject: [PATCH 3/3] Fixing Bandit security check failure. Signed-off-by: lakshmj --- plugins/module_utils/las_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/module_utils/las_utils.py b/plugins/module_utils/las_utils.py index f29f999c..93df96af 100644 --- a/plugins/module_utils/las_utils.py +++ b/plugins/module_utils/las_utils.py @@ -280,7 +280,7 @@ def _post_json(self, url, headers, payload, log_payload=True): try: error_body = e.read().decode("utf-8", errors="replace").strip() except Exception: - pass + error_body = "" msg = "DEBUG: LAS POST exception: {0}".format(str(e)) if error_body: msg += " response_body={0}".format(error_body)