From: Gabriel Cuba Date: Mon, 30 Oct 2023 18:44:49 +0000 (-0500) Subject: Feature 10996: Adds handling of nslcmop cancel event. kafka_read_callback is now... X-Git-Tag: release-v15.0-start~4 X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FLCM.git;a=commitdiff_plain;h=b6049d378301d13e0ddaadb8433b8ad20f4f8a23 Feature 10996: Adds handling of nslcmop cancel event. kafka_read_callback is now an async method. Change-Id: I2aff05aa282fd469f3ecaa83c3263c0be2fb07c4 Signed-off-by: Gabriel Cuba --- diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index 5ed39ab..9b62d82 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -294,7 +294,7 @@ class Lcm: wait_time = 2 if not first_start else 5 await asyncio.sleep(wait_time) - def kafka_read_callback(self, topic, command, params): + async def kafka_read_callback(self, topic, command, params): order_id = 1 if topic != "admin" and command != "ping": @@ -332,6 +332,44 @@ class Lcm: ) ) return + elif topic == "nslcmops": + if command == "cancel": + nslcmop_id = params["_id"] + self.logger.debug("Cancelling nslcmop {}".format(nslcmop_id)) + nsr_id = params["nsInstanceId"] + # cancel the tasks and wait + for task in self.lcm_tasks.cancel("ns", nsr_id, nslcmop_id): + try: + await task + self.logger.debug( + "Cancelled task ended {},{},{}".format( + nsr_id, nslcmop_id, task + ) + ) + except asyncio.CancelledError: + self.logger.debug( + "Task already cancelled and finished {},{},{}".format( + nsr_id, nslcmop_id, task + ) + ) + # update DB + q_filter = {"_id": nslcmop_id} + update_dict = { + "operationState": "FAILED_TEMP", + "isCancelPending": False, + } + unset_dict = { + "cancelMode": None, + } + self.db.set_one( + "nslcmops", + q_filter=q_filter, + update_dict=update_dict, + fail_on_empty=False, + unset=unset_dict, + ) + self.logger.debug("LCM task cancelled {},{}".format(nsr_id, nslcmop_id)) + return elif topic == "pla": if command == "placement": self.ns.update_nsrs_with_pla_result(params) @@ -650,7 +688,6 @@ class Lcm: self.logger.debug( "Task kafka_read Enter with worker_id={}".format(self.worker_id) ) - # future = asyncio.Future() self.consecutive_errors = 0 self.first_start = True while self.consecutive_errors < 10: @@ -665,15 +702,18 @@ class Lcm: "vca", "k8srepo", "pla", + "nslcmops", ) topics_admin = ("admin",) await asyncio.gather( self.msg.aioread( - topics, self.kafka_read_callback, from_beginning=True + topics, + aiocallback=self.kafka_read_callback, + from_beginning=True, ), self.msg_admin.aioread( topics_admin, - self.kafka_read_callback, + aiocallback=self.kafka_read_callback, group_id=False, ), ) @@ -698,7 +738,6 @@ class Lcm: wait_time = 2 if not self.first_start else 5 await asyncio.sleep(wait_time) - # self.logger.debug("Task kafka_read terminating") self.logger.debug("Task kafka_read exit") async def kafka_read_ping(self): diff --git a/osm_lcm/lcm_utils.py b/osm_lcm/lcm_utils.py index 5817b16..12fd7fb 100644 --- a/osm_lcm/lcm_utils.py +++ b/osm_lcm/lcm_utils.py @@ -567,17 +567,19 @@ class TaskRegistry(LcmBase): """ Cancel all active tasks of a concrete ns, nsi, vim_account, sdn identified for _id. If op_id is supplied only this is cancelled, and the same with task_name + :return: cancelled task to be awaited if needed """ if not self.task_registry[topic].get(_id): return for op_id in reversed(self.task_registry[topic][_id]): if target_op_id and target_op_id != op_id: continue - for task_name, task in self.task_registry[topic][_id][op_id].items(): + for task_name, task in list(self.task_registry[topic][_id][op_id].items()): if target_task_name and target_task_name != task_name: continue # result = task.cancel() + yield task # if result: # self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, op_id, task_name)) diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index 3628e50..8238a24 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -2954,7 +2954,15 @@ class NsLcm(LcmBase): stage[1] = stage[2] = "" except asyncio.CancelledError: error_list.append("Cancelled") - # TODO cancel all tasks + await self._cancel_pending_tasks(logging_text, tasks_dict_info) + await self._wait_for_tasks( + logging_text, + tasks_dict_info, + timeout_ns_deploy, + stage, + nslcmop_id, + nsr_id=nsr_id, + ) except Exception as exc: error_list.append(str(exc)) @@ -4614,7 +4622,14 @@ class NsLcm(LcmBase): stage[1] = stage[2] = "" except asyncio.CancelledError: error_list.append("Cancelled") - # TODO cancell all tasks + await self._cancel_pending_tasks(logging_text, tasks_dict_info) + await self._wait_for_tasks( + logging_text, + tasks_dict_info, + timeout_ns_terminate, + stage, + nslcmop_id, + ) except Exception as exc: error_list.append(str(exc)) # update status at database @@ -4777,6 +4792,11 @@ class NsLcm(LcmBase): self._write_op_status(nslcmop_id, stage) return error_detail_list + async def _cancel_pending_tasks(self, logging_text, created_tasks_info): + for task, name in created_tasks_info.items(): + self.logger.debug(logging_text + "Cancelling task: " + name) + task.cancel() + @staticmethod def _map_primitive_params(primitive_desc, params, instantiation_params): """ @@ -7141,16 +7161,31 @@ class NsLcm(LcmBase): exc_info=True, ) finally: + error_list = list() + if exc: + error_list.append(str(exc)) self._write_ns_status( nsr_id=nsr_id, ns_state=None, current_operation="IDLE", current_operation_id=None, ) - if tasks_dict_info: - stage[1] = "Waiting for instantiate pending tasks." - self.logger.debug(logging_text + stage[1]) - exc = await self._wait_for_tasks( + try: + if tasks_dict_info: + stage[1] = "Waiting for instantiate pending tasks." + self.logger.debug(logging_text + stage[1]) + exc = await self._wait_for_tasks( + logging_text, + tasks_dict_info, + self.timeout.ns_deploy, + stage, + nslcmop_id, + nsr_id=nsr_id, + ) + except asyncio.CancelledError: + error_list.append("Cancelled") + await self._cancel_pending_tasks(logging_text, tasks_dict_info) + await self._wait_for_tasks( logging_text, tasks_dict_info, self.timeout.ns_deploy, @@ -7158,10 +7193,13 @@ class NsLcm(LcmBase): nslcmop_id, nsr_id=nsr_id, ) - if exc: + if error_list: + error_detail = "; ".join(error_list) db_nslcmop_update[ "detailed-status" - ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc) + ] = error_description_nslcmop = "FAILED {}: {}".format( + step, error_detail + ) nslcmop_operation_state = "FAILED" if db_nsr: db_nsr_update["operational-status"] = old_operational_status @@ -7175,7 +7213,7 @@ class NsLcm(LcmBase): db_nsr_update[ "detailed-status" ] = "FAILED scaling nslcmop={} {}: {}".format( - nslcmop_id, step, exc + nslcmop_id, step, error_detail ) else: error_description_nslcmop = None @@ -7924,10 +7962,25 @@ class NsLcm(LcmBase): exc_info=True, ) finally: - if tasks_dict_info: - stage[1] = "Waiting for healing pending tasks." - self.logger.debug(logging_text + stage[1]) - exc = await self._wait_for_tasks( + error_list = list() + if exc: + error_list.append(str(exc)) + try: + if tasks_dict_info: + stage[1] = "Waiting for healing pending tasks." + self.logger.debug(logging_text + stage[1]) + exc = await self._wait_for_tasks( + logging_text, + tasks_dict_info, + self.timeout.ns_deploy, + stage, + nslcmop_id, + nsr_id=nsr_id, + ) + except asyncio.CancelledError: + error_list.append("Cancelled") + await self._cancel_pending_tasks(logging_text, tasks_dict_info) + await self._wait_for_tasks( logging_text, tasks_dict_info, self.timeout.ns_deploy, @@ -7935,17 +7988,22 @@ class NsLcm(LcmBase): nslcmop_id, nsr_id=nsr_id, ) - if exc: + if error_list: + error_detail = "; ".join(error_list) db_nslcmop_update[ "detailed-status" - ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc) + ] = error_description_nslcmop = "FAILED {}: {}".format( + step, error_detail + ) nslcmop_operation_state = "FAILED" if db_nsr: db_nsr_update["operational-status"] = old_operational_status db_nsr_update["config-status"] = old_config_status db_nsr_update[ "detailed-status" - ] = "FAILED healing nslcmop={} {}: {}".format(nslcmop_id, step, exc) + ] = "FAILED healing nslcmop={} {}: {}".format( + nslcmop_id, step, error_detail + ) for task, task_name in tasks_dict_info.items(): if not task.done() or task.cancelled() or task.exception(): if task_name.startswith(self.task_name_deploy_vca): diff --git a/osm_lcm/tests/test_lcm.py b/osm_lcm/tests/test_lcm.py index cc18c37..df6b49e 100644 --- a/osm_lcm/tests/test_lcm.py +++ b/osm_lcm/tests/test_lcm.py @@ -85,40 +85,40 @@ class TestLcm(TestCase): # with self.assertRaises(LcmException): # Lcm(config_file=self.config_file_without_storage_path) - def test_kafka_admin_topic_ping_command(self): + async def test_kafka_admin_topic_ping_command(self): params = { "to": "lcm", "from": "lcm", "worker_id": self.my_lcm.worker_id, } self.my_lcm.health_check_file = tempfile.mkstemp()[1] - self.my_lcm.kafka_read_callback("admin", "ping", params) + await self.my_lcm.kafka_read_callback("admin", "ping", params) pattern = "[0-9]{10}.[0-9]{5,8}" # Epoch time is written in health check file. result = re.findall(pattern, check_file_content(self.my_lcm.health_check_file)) self.assertTrue(result) - def test_kafka_wrong_topic_ping_command(self): + async def test_kafka_wrong_topic_ping_command(self): params = { "to": "lcm", "from": "lcm", "worker_id": self.my_lcm.worker_id, } self.my_lcm.health_check_file = tempfile.mkstemp()[1] - self.my_lcm.kafka_read_callback("kafka", "ping", params) + await self.my_lcm.kafka_read_callback("kafka", "ping", params) pattern = "[0-9]{10}.[0-9]{5,8}" # Health check file is empty. result = re.findall(pattern, check_file_content(self.my_lcm.health_check_file)) self.assertFalse(result) - def test_kafka_admin_topic_ping_command_wrong_worker_id(self): + async def test_kafka_admin_topic_ping_command_wrong_worker_id(self): params = { "to": "lcm", "from": "lcm", "worker_id": 5, } self.my_lcm.health_check_file = tempfile.mkstemp()[1] - self.my_lcm.kafka_read_callback("admin", "ping", params) + await self.my_lcm.kafka_read_callback("admin", "ping", params) pattern = "[0-9]{10}.[0-9]{5,8}" # Health check file is empty. result = re.findall(pattern, check_file_content(self.my_lcm.health_check_file))