+ # look for primitive
+ config_primitive_desc = descriptor_configuration = None
+ if vdu_id:
+ descriptor_configuration = get_configuration(db_vnfd, vdu_id)
+ elif kdu_name:
+ descriptor_configuration = get_configuration(db_vnfd, kdu_name)
+ elif vnf_index:
+ descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
+ else:
+ descriptor_configuration = db_nsd.get("ns-configuration")
+
+ if descriptor_configuration and descriptor_configuration.get(
+ "config-primitive"
+ ):
+ for config_primitive in descriptor_configuration["config-primitive"]:
+ if config_primitive["name"] == primitive:
+ config_primitive_desc = config_primitive
+ break
+
+ if not config_primitive_desc:
+ if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
+ raise LcmException(
+ "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
+ primitive
+ )
+ )
+ primitive_name = primitive
+ ee_descriptor_id = None
+ else:
+ primitive_name = config_primitive_desc.get(
+ "execution-environment-primitive", primitive
+ )
+ ee_descriptor_id = config_primitive_desc.get(
+ "execution-environment-ref"
+ )
+
+ if vnf_index:
+ if vdu_id:
+ vdur = next(
+ (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
+ )
+ desc_params = parse_yaml_strings(vdur.get("additionalParams"))
+ elif kdu_name:
+ kdur = next(
+ (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
+ )
+ desc_params = parse_yaml_strings(kdur.get("additionalParams"))
+ else:
+ desc_params = parse_yaml_strings(
+ db_vnfr.get("additionalParamsForVnf")
+ )
+ else:
+ desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
+ if kdu_name and get_configuration(db_vnfd, kdu_name):
+ kdu_configuration = get_configuration(db_vnfd, kdu_name)
+ actions = set()
+ for primitive in kdu_configuration.get("initial-config-primitive", []):
+ actions.add(primitive["name"])
+ for primitive in kdu_configuration.get("config-primitive", []):
+ actions.add(primitive["name"])
+ kdu = find_in_list(
+ nsr_deployed["K8s"],
+ lambda kdu: kdu_name == kdu["kdu-name"]
+ and kdu["member-vnf-index"] == vnf_index,
+ )
+ kdu_action = (
+ True
+ if primitive_name in actions
+ and kdu["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
+ else False
+ )
+
+ # TODO check if ns is in a proper status
+ if kdu_name and (
+ primitive_name in ("upgrade", "rollback", "status") or kdu_action
+ ):
+ # kdur and desc_params already set from before
+ if primitive_params:
+ desc_params.update(primitive_params)
+ # TODO Check if we will need something at vnf level
+ for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
+ if (
+ kdu_name == kdu["kdu-name"]
+ and kdu["member-vnf-index"] == vnf_index
+ ):
+ break
+ else:
+ raise LcmException(
+ "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
+ )
+
+ if kdu.get("k8scluster-type") not in self.k8scluster_map:
+ msg = "unknown k8scluster-type '{}'".format(
+ kdu.get("k8scluster-type")
+ )
+ raise LcmException(msg)
+
+ db_dict = {
+ "collection": "nsrs",
+ "filter": {"_id": nsr_id},
+ "path": "_admin.deployed.K8s.{}".format(index),
+ }
+ self.logger.debug(
+ logging_text
+ + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
+ )
+ step = "Executing kdu {}".format(primitive_name)
+ if primitive_name == "upgrade":
+ if desc_params.get("kdu_model"):
+ kdu_model = desc_params.get("kdu_model")
+ del desc_params["kdu_model"]
+ else:
+ kdu_model = kdu.get("kdu-model")
+ parts = kdu_model.split(sep=":")
+ if len(parts) == 2:
+ kdu_model = parts[0]
+
+ detailed_status = await asyncio.wait_for(
+ self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
+ cluster_uuid=kdu.get("k8scluster-uuid"),
+ kdu_instance=kdu.get("kdu-instance"),
+ atomic=True,
+ kdu_model=kdu_model,
+ params=desc_params,
+ db_dict=db_dict,
+ timeout=timeout_ns_action,
+ ),
+ timeout=timeout_ns_action + 10,
+ )
+ self.logger.debug(
+ logging_text + " Upgrade of kdu {} done".format(detailed_status)
+ )
+ elif primitive_name == "rollback":
+ detailed_status = await asyncio.wait_for(
+ self.k8scluster_map[kdu["k8scluster-type"]].rollback(
+ cluster_uuid=kdu.get("k8scluster-uuid"),
+ kdu_instance=kdu.get("kdu-instance"),
+ db_dict=db_dict,
+ ),
+ timeout=timeout_ns_action,
+ )
+ elif primitive_name == "status":
+ detailed_status = await asyncio.wait_for(
+ self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
+ cluster_uuid=kdu.get("k8scluster-uuid"),
+ kdu_instance=kdu.get("kdu-instance"),
+ vca_id=vca_id,
+ ),
+ timeout=timeout_ns_action,
+ )
+ else:
+ kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
+ kdu["kdu-name"], nsr_id
+ )
+ params = self._map_primitive_params(
+ config_primitive_desc, primitive_params, desc_params
+ )
+
+ detailed_status = await asyncio.wait_for(
+ self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
+ cluster_uuid=kdu.get("k8scluster-uuid"),
+ kdu_instance=kdu_instance,
+ primitive_name=primitive_name,
+ params=params,
+ db_dict=db_dict,
+ timeout=timeout_ns_action,
+ vca_id=vca_id,
+ ),
+ timeout=timeout_ns_action,
+ )
+
+ if detailed_status:
+ nslcmop_operation_state = "COMPLETED"
+ else:
+ detailed_status = ""
+ nslcmop_operation_state = "FAILED"
+ else:
+ ee_id, vca_type = self._look_for_deployed_vca(
+ nsr_deployed["VCA"],
+ member_vnf_index=vnf_index,
+ vdu_id=vdu_id,
+ vdu_count_index=vdu_count_index,
+ ee_descriptor_id=ee_descriptor_id,
+ )
+ for vca_index, vca_deployed in enumerate(
+ db_nsr["_admin"]["deployed"]["VCA"]
+ ):
+ if vca_deployed.get("member-vnf-index") == vnf_index:
+ db_dict = {
+ "collection": "nsrs",
+ "filter": {"_id": nsr_id},
+ "path": "_admin.deployed.VCA.{}.".format(vca_index),
+ }
+ break
+ (
+ nslcmop_operation_state,
+ detailed_status,
+ ) = await self._ns_execute_primitive(
+ ee_id,
+ primitive=primitive_name,
+ primitive_params=self._map_primitive_params(
+ config_primitive_desc, primitive_params, desc_params
+ ),
+ timeout=timeout_ns_action,
+ vca_type=vca_type,
+ db_dict=db_dict,
+ vca_id=vca_id,
+ )
+
+ db_nslcmop_update["detailed-status"] = detailed_status
+ error_description_nslcmop = (
+ detailed_status if nslcmop_operation_state == "FAILED" else ""
+ )
+ self.logger.debug(
+ logging_text
+ + " task Done with result {} {}".format(
+ nslcmop_operation_state, detailed_status
+ )
+ )
+ return # database update is called inside finally
+
+ except (DbException, LcmException, N2VCException, K8sException) as e:
+ self.logger.error(logging_text + "Exit Exception {}".format(e))
+ exc = e
+ except asyncio.CancelledError:
+ self.logger.error(
+ logging_text + "Cancelled Exception while '{}'".format(step)
+ )
+ exc = "Operation was cancelled"
+ except asyncio.TimeoutError:
+ self.logger.error(logging_text + "Timeout while '{}'".format(step))
+ exc = "Timeout"
+ except Exception as e:
+ exc = traceback.format_exc()
+ self.logger.critical(
+ logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
+ exc_info=True,
+ )
+ finally:
+ if exc:
+ db_nslcmop_update[
+ "detailed-status"
+ ] = (
+ detailed_status
+ ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
+ nslcmop_operation_state = "FAILED"
+ if db_nsr:
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=db_nsr[
+ "nsState"
+ ], # TODO check if degraded. For the moment use previous status
+ current_operation="IDLE",
+ current_operation_id=None,
+ # error_description=error_description_nsr,
+ # error_detail=error_detail,
+ other_update=db_nsr_update,
+ )
+
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage="",
+ error_message=error_description_nslcmop,
+ operation_state=nslcmop_operation_state,
+ other_update=db_nslcmop_update,
+ )
+
+ if nslcmop_operation_state:
+ try:
+ await self.msg.aiowrite(
+ "ns",
+ "actioned",
+ {
+ "nsr_id": nsr_id,
+ "nslcmop_id": nslcmop_id,
+ "operationState": nslcmop_operation_state,
+ },
+ loop=self.loop,
+ )
+ except Exception as e:
+ self.logger.error(
+ logging_text + "kafka_write notification Exception {}".format(e)
+ )
+ self.logger.debug(logging_text + "Exit")
+ self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
+ return nslcmop_operation_state, detailed_status
+
+ async def _ns_charm_upgrade(
+ self,
+ ee_id,
+ charm_id,
+ charm_type,
+ path,
+ timeout: float = None,
+ ) -> (str, str):
+ """This method upgrade charms in VNF instances
+
+ Args:
+ ee_id: Execution environment id
+ path: Local path to the charm
+ charm_id: charm-id
+ charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
+ timeout: (Float) Timeout for the ns update operation
+
+ Returns:
+ result: (str, str) COMPLETED/FAILED, details
+ """
+ try:
+ charm_type = charm_type or "lxc_proxy_charm"
+ output = await self.vca_map[charm_type].upgrade_charm(
+ ee_id=ee_id,
+ path=path,
+ charm_id=charm_id,
+ charm_type=charm_type,
+ timeout=timeout or self.timeout_ns_update,
+ )
+
+ if output:
+ return "COMPLETED", output
+
+ except (LcmException, asyncio.CancelledError):
+ raise
+
+ except Exception as e:
+
+ self.logger.debug("Error upgrading charm {}".format(path))
+
+ return "FAILED", "Error upgrading charm {}: {}".format(path, e)
+
+ async def update(self, nsr_id, nslcmop_id):
+ """Update NS according to different update types
+
+ This method performs upgrade of VNF instances then updates the revision
+ number in VNF record
+
+ Args:
+ nsr_id: Network service will be updated
+ nslcmop_id: ns lcm operation id
+
+ Returns:
+ It may raise DbException, LcmException, N2VCException, K8sException
+
+ """
+ # Try to lock HA task here
+ task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
+ if not task_is_locked_by_me:
+ return
+
+ logging_text = "Task ns={} update={} ".format(nsr_id, nslcmop_id)
+ self.logger.debug(logging_text + "Enter")
+
+ # Set the required variables to be filled up later
+ db_nsr = None
+ db_nslcmop_update = {}
+ vnfr_update = {}
+ nslcmop_operation_state = None
+ db_nsr_update = {}
+ error_description_nslcmop = ""
+ exc = None
+ change_type = ""
+ detailed_status = ""
+
+ try:
+ # wait for any previous tasks in process
+ step = "Waiting for previous operations to terminate"
+ await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=None,
+ current_operation="UPDATING",
+ current_operation_id=nslcmop_id,
+ )
+
+ step = "Getting nslcmop from database"
+ db_nslcmop = self.db.get_one(
+ "nslcmops", {"_id": nslcmop_id}, fail_on_empty=False
+ )
+ update_type = db_nslcmop["operationParams"]["updateType"]
+
+ step = "Getting nsr from database"
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ old_operational_status = db_nsr["operational-status"]
+ db_nsr_update["operational-status"] = "updating"
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ nsr_deployed = db_nsr["_admin"].get("deployed")
+
+ if update_type == "CHANGE_VNFPKG":
+
+ # Get the input parameters given through update request
+ vnf_instance_id = db_nslcmop["operationParams"][
+ "changeVnfPackageData"
+ ].get("vnfInstanceId")
+
+ vnfd_id = db_nslcmop["operationParams"]["changeVnfPackageData"].get(
+ "vnfdId"
+ )
+ timeout_seconds = db_nslcmop["operationParams"].get("timeout_ns_update")
+
+ step = "Getting vnfr from database"
+ db_vnfr = self.db.get_one(
+ "vnfrs", {"_id": vnf_instance_id}, fail_on_empty=False
+ )