From: tierno Date: Wed, 12 Sep 2018 15:47:18 +0000 (+0200) Subject: fixed issue where kafka message is sent before database update X-Git-Tag: v5.0.0~27 X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FLCM.git;a=commitdiff_plain;h=6e9d2eb347c5daca2a598b607dc6d51c15662406 fixed issue where kafka message is sent before database update Change-Id: I09345c0e299d155f422696cf5fadd645bdac3fa1 Signed-off-by: tierno --- diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index bd83c12..b9417f2 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -28,11 +28,21 @@ from time import time __author__ = "Alfonso Tierno" min_RO_version = [0, 5, 72] +min_n2vc_version = "0.0.2" +min_common_version = "0.1.7" # uncomment if LCM is installed as library and installed, and get them from __init__.py -lcm_version = '0.1.14' -lcm_version_date = '2018-09-11' +lcm_version = '0.1.15' +lcm_version_date = '2018-09-13' +def versiontuple(v): + """utility for compare dot separate versions. Fills with zeros to proper number comparison""" + filled = [] + for point in v.split("."): + filled.append(point.zfill(8)) + return tuple(filled) + + class LcmException(Exception): pass @@ -207,11 +217,12 @@ class Lcm: # check version of N2VC # TODO enhance with int conversion or from distutils.version import LooseVersion # or with list(map(int, version.split("."))) - if N2VC_version < "0.0.2": - raise LcmException("Not compatible osm/N2VC version '{}'. Needed '0.0.2' or higher".format(N2VC_version)) - if common_version < "0.1.7": - raise LcmException("Not compatible osm/common version '{}'. Needed '0.1.7' or higher".format( - common_version)) + if versiontuple(N2VC_version) < versiontuple(min_n2vc_version): + raise LcmException("Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format( + N2VC_version, min_n2vc_version)) + if versiontuple(common_version) < versiontuple("0.1.7"): + raise LcmException("Not compatible osm/common version '{}'. Needed '{}' or higher".format( + common_version, min_common_version)) try: # TODO check database version @@ -662,7 +673,7 @@ class Lcm: if ci_file: ci_file.close() - def n2vc_callback(self, model_name, application_name, status, message, db_nsr, db_nslcmop, task=None): + def n2vc_callback(self, model_name, application_name, status, message, n2vc_info, task=None): """ Callback both for charm status change and task completion :param model_name: Charm model name @@ -677,23 +688,26 @@ class Lcm: - removing, - removed :param message: detailed message error - :param db_nsr: nsr database content - :param db_nslcmop: nslcmop database content + :param n2vc_info dictionary with information shared with instantiate task. Contains: + nsr_id: + nslcmop_id: + lcmOperationType: currently "instantiate" + deployed: dictionary with {: {operational-status: , detailed-status: }} + db_update: dictionary to be filled with the changes to be wrote to database with format key.key.key: value + n2vc_event: event used to notify instantiation task that some change has been produced :param task: None for charm status change, or task for completion task callback :return: """ - nsr_id = None - nslcmop_id = None - db_nsr_update = {} - db_nslcmop_update = {} try: - nsr_id = db_nsr["_id"] - nsr_lcm = db_nsr["_admin"]["deployed"] - nslcmop_id = db_nslcmop["_id"] - ns_operation = db_nslcmop["lcmOperationType"] + nsr_id = n2vc_info["nsr_id"] + deployed = n2vc_info["deployed"] + db_nsr_update = n2vc_info["db_update"] + nslcmop_id = n2vc_info["nslcmop_id"] + ns_operation = n2vc_info["lcmOperationType"] + n2vc_event = n2vc_info["n2vc_event"] logging_text = "Task ns={} {}={} [n2vc_callback] application={}".format(nsr_id, ns_operation, nslcmop_id, application_name) - vca_deployed = nsr_lcm["VCA"].get(application_name) + vca_deployed = deployed.get(application_name) if not vca_deployed: self.logger.error(logging_text + " Not present at nsr._admin.deployed.VCA") return @@ -701,33 +715,21 @@ class Lcm: if task: if task.cancelled(): self.logger.debug(logging_text + " task Cancelled") - # TODO update db_nslcmop - return + vca_deployed['operational-status'] = "error" + db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(application_name)] = "error" + vca_deployed['detailed-status'] = "Task Cancelled" + db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(application_name)] = "Task Cancelled" - if task.done(): + elif task.done(): exc = task.exception() if exc: self.logger.error(logging_text + " task Exception={}".format(exc)) - if ns_operation in ("instantiate", "terminate"): - vca_deployed['operational-status'] = "error" - db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(application_name)] = \ - "error" - vca_deployed['detailed-status'] = str(exc) - db_nsr_update[ - "_admin.deployed.VCA.{}.detailed-status".format(application_name)] = str(exc) - elif ns_operation == "action": - db_nslcmop_update["operationState"] = "FAILED" - db_nslcmop_update["detailed-status"] = str(exc) - db_nslcmop_update["statusEnteredTime"] = time() - return - + vca_deployed['operational-status'] = "error" + db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(application_name)] = "error" + vca_deployed['detailed-status'] = str(exc) + db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(application_name)] = str(exc) else: self.logger.debug(logging_text + " task Done") - # TODO revise with Adam if action is finished and ok when task is done - if ns_operation == "action": - db_nslcmop_update["operationState"] = "COMPLETED" - db_nslcmop_update["detailed-status"] = "Done" - db_nslcmop_update["statusEnteredTime"] = time() # task is Done, but callback is still ongoing. So ignore return elif status: @@ -741,58 +743,10 @@ class Lcm: else: self.logger.critical(logging_text + " Enter with bad parameters", exc_info=True) return - - all_active = True - status_map = {} - n2vc_error_text = [] # contain text error list. If empty no one is in error status - for _, vca_info in nsr_lcm["VCA"].items(): - vca_status = vca_info["operational-status"] - if vca_status not in status_map: - # Initialize it - status_map[vca_status] = 0 - status_map[vca_status] += 1 - - if vca_status != "active": - all_active = False - if vca_status in ("error", "blocked"): - n2vc_error_text.append("member_vnf_index={} vdu_id={} {}: {}".format(vca_info["member-vnf-index"], - vca_info["vdu_id"], vca_status, - vca_info["detailed-status"])) - - if all_active: - self.logger.debug(logging_text + " All active") - db_nsr_update["config-status"] = "configured" - db_nsr_update["detailed-status"] = "done" - db_nslcmop_update["operationState"] = "COMPLETED" - db_nslcmop_update["detailed-status"] = "Done" - db_nslcmop_update["statusEnteredTime"] = time() - elif n2vc_error_text: - db_nsr_update["config-status"] = "failed" - error_text = "fail configuring " + ";".join(n2vc_error_text) - db_nsr_update["detailed-status"] = error_text - db_nslcmop_update["operationState"] = "FAILED_TEMP" - db_nslcmop_update["detailed-status"] = error_text - db_nslcmop_update["statusEnteredTime"] = time() - else: - cs = "configuring: " - separator = "" - for status, num in status_map.items(): - cs += separator + "{}: {}".format(status, num) - separator = ", " - db_nsr_update["config-status"] = cs - db_nsr_update["detailed-status"] = cs - db_nslcmop_update["detailed-status"] = cs - + # wake up instantiate task + n2vc_event.set() except Exception as e: self.logger.critical(logging_text + " Exception {}".format(e), exc_info=True) - finally: - try: - if db_nslcmop_update: - self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) - if db_nsr_update: - self.update_db_2("nsrs", nsr_id, db_nsr_update) - except Exception as e: - self.logger.critical(logging_text + " Update database Exception {}".format(e), exc_info=True) def ns_params_2_RO(self, ns_params, nsd, vnfd_dict): """ @@ -1071,9 +1025,11 @@ class Lcm: db_nslcmop = None db_nsr_update = {} db_nslcmop_update = {} + nslcmop_operation_state = None db_vnfrs = {} RO_descriptor_number = 0 # number of descriptors created at RO descriptor_id_2_RO = {} # map between vnfd/nsd id to the id used at RO + n2vc_info = {} exc = None try: step = "Getting nslcmop={} from db".format(nslcmop_id) @@ -1262,7 +1218,7 @@ class Lcm: # The parameters we'll need to deploy a charm number_to_configure = 0 - def deploy(vnf_index, vdu_id, mgmt_ip_address, config_primitive=None): + def deploy(vnf_index, vdu_id, mgmt_ip_address, n2vc_info, config_primitive=None): """An inner function to deploy the charm from either vnf or vdu vnf_index is mandatory. vdu_id can be None for a vnf configuration or the id for vdu configuration """ @@ -1319,6 +1275,13 @@ class Lcm: self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path, proxy_charm)) + if not n2vc_info: + n2vc_info["nsr_id"] = nsr_id + n2vc_info["nslcmop_id"] = nslcmop_id + n2vc_info["n2vc_event"] = asyncio.Event(loop=self.loop) + n2vc_info["lcmOperationType"] = "instantiate" + n2vc_info["deployed"] = nsr_lcm["VCA"] + n2vc_info["db_update"] = db_nsr_update task = asyncio.ensure_future( self.n2vc.DeployCharms( model_name, # The network service name @@ -1328,13 +1291,12 @@ class Lcm: params, # Runtime params, like mgmt ip {}, # for native charms only self.n2vc_callback, # Callback for status changes - db_nsr, # Callback parameter - db_nslcmop, + n2vc_info, # Callback parameter None, # Callback parameter (task) ) ) task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, None, - db_nsr, db_nslcmop)) + n2vc_info)) self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "create_charm:" + application_name, task) step = "Looking for needed vnfd to configure" @@ -1360,7 +1322,7 @@ class Lcm: # subsequent calls will be a nop and return immediately. step = "connecting to N2VC to configure vnf {}".format(vnf_index) await self.n2vc.login() - deploy(vnf_index, None, db_vnfrs[vnf_index]["ip-address"], config_primitive) + deploy(vnf_index, None, db_vnfrs[vnf_index]["ip-address"], n2vc_info, config_primitive) number_to_configure += 1 # Deploy charms for each VDU that supports one. @@ -1380,31 +1342,78 @@ class Lcm: step = "connecting to N2VC to configure vdu {} from vnf {}".format(vdu["id"], vnf_index) await self.n2vc.login() deploy(vnf_index, vdu["id"], db_vnfrs[vnf_index]["vdur"][vdu_index]["ip-address"], - config_primitive) + n2vc_info, config_primitive) number_to_configure += 1 vdu_index += 1 + db_nsr_update["operational-status"] = "running" + configuration_failed = False if number_to_configure: - db_nsr_update["config-status"] = "configuring" - db_nsr_update["operational-status"] = "running" - db_nsr_update["detailed-status"] = "configuring: init: {}".format(number_to_configure) - db_nslcmop_update["detailed-status"] = "configuring: init: {}".format(number_to_configure) - else: - db_nslcmop_update["operationState"] = "COMPLETED" + old_status = "configuring: init: {}".format(number_to_configure) + db_nsr_update["config-status"] = old_status + db_nsr_update["detailed-status"] = old_status + db_nslcmop_update["detailed-status"] = old_status + + # wait until all are configured. + while True: + if db_nsr_update: + self.update_db_2("nsrs", nsr_id, db_nsr_update) + if db_nslcmop_update: + self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) + await n2vc_info["n2vc_event"].wait() + n2vc_info["n2vc_event"].clear() + all_active = True + status_map = {} + n2vc_error_text = [] # contain text error list. If empty no one is in error status + for _, vca_info in nsr_lcm["VCA"].items(): + vca_status = vca_info["operational-status"] + if vca_status not in status_map: + # Initialize it + status_map[vca_status] = 0 + status_map[vca_status] += 1 + + if vca_status != "active": + all_active = False + if vca_status in ("error", "blocked"): + n2vc_error_text.append( + "member_vnf_index={} vdu_id={} {}: {}".format(vca_info["member-vnf-index"], + vca_info["vdu_id"], vca_status, + vca_info["detailed-status"])) + + if all_active: + break + elif n2vc_error_text: + db_nsr_update["config-status"] = "failed" + error_text = "fail configuring " + ";".join(n2vc_error_text) + db_nsr_update["detailed-status"] = error_text + db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP" + db_nslcmop_update["detailed-status"] = error_text + db_nslcmop_update["statusEnteredTime"] = time() + configuration_failed = True + break + else: + cs = "configuring: " + separator = "" + for status, num in status_map.items(): + cs += separator + "{}: {}".format(status, num) + separator = ", " + if old_status != cs: + db_nsr_update["config-status"] = cs + db_nsr_update["detailed-status"] = cs + db_nslcmop_update["detailed-status"] = cs + old_status = cs + + if not configuration_failed: + # all is done + db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED" db_nslcmop_update["statusEnteredTime"] = time() db_nslcmop_update["detailed-status"] = "done" db_nsr_update["config-status"] = "configured" db_nsr_update["detailed-status"] = "done" - db_nsr_update["operational-status"] = "running" - step = "Sending monitoring parameters to PM" + + # step = "Sending monitoring parameters to PM" # for c_vnf in nsd["constituent-vnfd"]: # await self.create_monitoring(nsr_id, c_vnf["member-vnf-index"], needed_vnfd[c_vnf["vnfd-id-ref"]]) - try: - await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id}) - except Exception as e: - self.logger.error(logging_text + "kafka_write notification Exception {}".format(e)) - - self.logger.debug(logging_text + "Exit") return except (ROclient.ROClientException, DbException, LcmException) as e: @@ -1424,12 +1433,20 @@ class Lcm: db_nsr_update["operational-status"] = "failed" if db_nslcmop: db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc) - db_nslcmop_update["operationState"] = "FAILED" + db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED" db_nslcmop_update["statusEnteredTime"] = time() if db_nsr_update: self.update_db_2("nsrs", nsr_id, db_nsr_update) if db_nslcmop_update: self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) + if nslcmop_operation_state: + try: + await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id, + "operationState": nslcmop_operation_state}) + except Exception as e: + self.logger.error(logging_text + "kafka_write notification Exception {}".format(e)) + + self.logger.debug(logging_text + "Exit") self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate") async def ns_terminate(self, nsr_id, nslcmop_id): @@ -1443,6 +1460,7 @@ class Lcm: vca_task_dict = {} db_nsr_update = {} db_nslcmop_update = {} + nslcmop_operation_state = None try: step = "Getting nslcmop={} from db".format(nslcmop_id) db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) @@ -1613,12 +1631,13 @@ class Lcm: db_nsr_update["operational-status"] = "failed" db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail) db_nslcmop_update["detailed-status"] = "; ".join(failed_detail) - db_nslcmop_update["operationState"] = "FAILED" + db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED" db_nslcmop_update["statusEnteredTime"] = time() elif db_nslcmop["operationParams"].get("autoremove"): self.db.del_one("nsrs", {"_id": nsr_id}) db_nsr_update.clear() self.db.del_list("nslcmops", {"nsInstanceId": nsr_id}) + nslcmop_operation_state = "COMPLETED" db_nslcmop_update.clear() self.db.del_list("vnfrs", {"nsr-id-ref": nsr_id}) self.logger.debug(logging_text + "Delete from database") @@ -1627,13 +1646,8 @@ class Lcm: db_nsr_update["detailed-status"] = "Done" db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED" db_nslcmop_update["detailed-status"] = "Done" - db_nslcmop_update["operationState"] = "COMPLETED" + db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED" db_nslcmop_update["statusEnteredTime"] = time() - try: - await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id}) - except Exception as e: - self.logger.error(logging_text + "kafka_write notification Exception {}".format(e)) - self.logger.debug(logging_text + "Exit") except (ROclient.ROClientException, DbException) as e: self.logger.error(logging_text + "Exit Exception {}".format(e)) @@ -1646,15 +1660,20 @@ class Lcm: self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True) finally: if exc and db_nslcmop: - db_nslcmop_update = { - "detailed-status": "FAILED {}: {}".format(step, exc), - "operationState": "FAILED", - "statusEnteredTime": time(), - } + db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc) + db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED" + db_nslcmop_update["statusEnteredTime"] = time() if db_nslcmop_update: self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) if db_nsr_update: self.update_db_2("nsrs", nsr_id, db_nsr_update) + if nslcmop_operation_state: + try: + await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id, + "operationState": nslcmop_operation_state}) + except Exception as e: + self.logger.error(logging_text + "kafka_write notification Exception {}".format(e)) + self.logger.debug(logging_text + "Exit") self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate") async def _ns_execute_primitive(self, db_deployed, nsr_name, member_vnf_index, vdu_id, primitive, primitive_params): @@ -1720,7 +1739,8 @@ class Lcm: # get all needed from database db_nsr = None db_nslcmop = None - db_nslcmop_update = None + db_nslcmop_update = {} + nslcmop_operation_state = None exc = None try: step = "Getting information from database" @@ -1736,11 +1756,9 @@ class Lcm: primitive_params = db_nslcmop["operationParams"]["primitive_params"] result, result_detail = await self._ns_execute_primitive(nsr_lcm, nsr_name, vnf_index, vdu_id, primitive, primitive_params) - db_nslcmop_update = { - "detailed-status": result_detail, - "operationState": result, - "statusEnteredTime": time() - } + db_nslcmop_update["detailed-status"] = result_detail + db_nslcmop_update["operationState"] = nslcmop_operation_state = result + db_nslcmop_update["statusEnteredTime"] = time() self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail)) return # database update is called inside finally @@ -1755,13 +1773,19 @@ class Lcm: self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True) finally: if exc and db_nslcmop: - db_nslcmop_update = { - "detailed-status": "FAILED {}: {}".format(step, exc), - "operationState": "FAILED", - "statusEnteredTime": time(), - } + db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc) + db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED" + db_nslcmop_update["statusEnteredTime"] = time() if db_nslcmop_update: self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) + self.logger.debug(logging_text + "Exit") + if nslcmop_operation_state: + try: + await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id, + "operationState": nslcmop_operation_state}) + except Exception as e: + self.logger.error(logging_text + "kafka_write notification Exception {}".format(e)) + self.logger.debug(logging_text + "Exit") self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action") async def ns_scale(self, nsr_id, nslcmop_id): @@ -1771,6 +1795,7 @@ class Lcm: db_nsr = None db_nslcmop = None db_nslcmop_update = {} + nslcmop_operation_state = None db_nsr_update = {} exc = None try: @@ -2011,19 +2036,11 @@ class Lcm: if result == "FAILED": raise LcmException(result_detail) - db_nslcmop_update["operationState"] = "COMPLETED" + db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED" db_nslcmop_update["statusEnteredTime"] = time() db_nslcmop_update["detailed-status"] = "done" db_nsr_update["detailed-status"] = "done" db_nsr_update["operational-status"] = "running" - try: - await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id}) - # if cooldown_time: - # await asyncio.sleep(cooldown_time) - # await self.msg.aiowrite("ns", "scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id}) - except Exception as e: - self.logger.error(logging_text + "kafka_write notification Exception {}".format(e)) - self.logger.debug(logging_text + "Exit Ok") return except (ROclient.ROClientException, DbException, LcmException) as e: self.logger.error(logging_text + "Exit Exception {}".format(e)) @@ -2037,11 +2054,9 @@ class Lcm: finally: if exc: if db_nslcmop: - db_nslcmop_update = { - "detailed-status": "FAILED {}: {}".format(step, exc), - "operationState": "FAILED", - "statusEnteredTime": time(), - } + db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc) + db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED" + db_nslcmop_update["statusEnteredTime"] = time() if db_nsr: db_nsr_update["operational-status"] = "FAILED {}: {}".format(step, exc), db_nsr_update["detailed-status"] = "failed" @@ -2049,6 +2064,16 @@ class Lcm: self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) if db_nsr_update: self.update_db_2("nsrs", nsr_id, db_nsr_update) + if nslcmop_operation_state: + try: + await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id, + "operationState": nslcmop_operation_state}) + # if cooldown_time: + # await asyncio.sleep(cooldown_time) + # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id}) + except Exception as e: + self.logger.error(logging_text + "kafka_write notification Exception {}".format(e)) + self.logger.debug(logging_text + "Exit") self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale") async def test(self, param=None):