X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FLCM.git;a=blobdiff_plain;f=osm_lcm%2Flcm.py;fp=osm_lcm%2Flcm.py;h=9b62d82eef07a337eebb1445c675f0de256919ff;hp=5ed39abcf89c26acc1d25462256a2a217d89feb3;hb=b6049d378301d13e0ddaadb8433b8ad20f4f8a23;hpb=e11384e1797ea0a5f8cd084d6f336948170bc640;ds=sidebyside diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index 5ed39ab..9b62d82 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -294,7 +294,7 @@ class Lcm: wait_time = 2 if not first_start else 5 await asyncio.sleep(wait_time) - def kafka_read_callback(self, topic, command, params): + async def kafka_read_callback(self, topic, command, params): order_id = 1 if topic != "admin" and command != "ping": @@ -332,6 +332,44 @@ class Lcm: ) ) return + elif topic == "nslcmops": + if command == "cancel": + nslcmop_id = params["_id"] + self.logger.debug("Cancelling nslcmop {}".format(nslcmop_id)) + nsr_id = params["nsInstanceId"] + # cancel the tasks and wait + for task in self.lcm_tasks.cancel("ns", nsr_id, nslcmop_id): + try: + await task + self.logger.debug( + "Cancelled task ended {},{},{}".format( + nsr_id, nslcmop_id, task + ) + ) + except asyncio.CancelledError: + self.logger.debug( + "Task already cancelled and finished {},{},{}".format( + nsr_id, nslcmop_id, task + ) + ) + # update DB + q_filter = {"_id": nslcmop_id} + update_dict = { + "operationState": "FAILED_TEMP", + "isCancelPending": False, + } + unset_dict = { + "cancelMode": None, + } + self.db.set_one( + "nslcmops", + q_filter=q_filter, + update_dict=update_dict, + fail_on_empty=False, + unset=unset_dict, + ) + self.logger.debug("LCM task cancelled {},{}".format(nsr_id, nslcmop_id)) + return elif topic == "pla": if command == "placement": self.ns.update_nsrs_with_pla_result(params) @@ -650,7 +688,6 @@ class Lcm: self.logger.debug( "Task kafka_read Enter with worker_id={}".format(self.worker_id) ) - # future = asyncio.Future() self.consecutive_errors = 0 self.first_start = True while self.consecutive_errors < 10: @@ -665,15 +702,18 @@ class Lcm: "vca", "k8srepo", "pla", + "nslcmops", ) topics_admin = ("admin",) await asyncio.gather( self.msg.aioread( - topics, self.kafka_read_callback, from_beginning=True + topics, + aiocallback=self.kafka_read_callback, + from_beginning=True, ), self.msg_admin.aioread( topics_admin, - self.kafka_read_callback, + aiocallback=self.kafka_read_callback, group_id=False, ), ) @@ -698,7 +738,6 @@ class Lcm: wait_time = 2 if not self.first_start else 5 await asyncio.sleep(wait_time) - # self.logger.debug("Task kafka_read terminating") self.logger.debug("Task kafka_read exit") async def kafka_read_ping(self):