X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Flcm.py;h=f8a97aeb4b0443a88a0399f0c2ad777d4bd6b9d5;hb=refs%2Fchanges%2F84%2F14284%2F4;hp=5ed39abcf89c26acc1d25462256a2a217d89feb3;hpb=4c0e6805c44f9ed1d0bb35161bf69645f5b84151;p=osm%2FLCM.git diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index 5ed39ab..f8a97ae 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -21,6 +21,7 @@ # DEBUG WITH PDB import pdb +import os import asyncio import yaml import logging @@ -294,7 +295,7 @@ class Lcm: wait_time = 2 if not first_start else 5 await asyncio.sleep(wait_time) - def kafka_read_callback(self, topic, command, params): + async def kafka_read_callback(self, topic, command, params): order_id = 1 if topic != "admin" and command != "ping": @@ -332,6 +333,44 @@ class Lcm: ) ) return + elif topic == "nslcmops": + if command == "cancel": + nslcmop_id = params["_id"] + self.logger.debug("Cancelling nslcmop {}".format(nslcmop_id)) + nsr_id = params["nsInstanceId"] + # cancel the tasks and wait + for task in self.lcm_tasks.cancel("ns", nsr_id, nslcmop_id): + try: + await task + self.logger.debug( + "Cancelled task ended {},{},{}".format( + nsr_id, nslcmop_id, task + ) + ) + except asyncio.CancelledError: + self.logger.debug( + "Task already cancelled and finished {},{},{}".format( + nsr_id, nslcmop_id, task + ) + ) + # update DB + q_filter = {"_id": nslcmop_id} + update_dict = { + "operationState": "FAILED_TEMP", + "isCancelPending": False, + } + unset_dict = { + "cancelMode": None, + } + self.db.set_one( + "nslcmops", + q_filter=q_filter, + update_dict=update_dict, + fail_on_empty=False, + unset=unset_dict, + ) + self.logger.debug("LCM task cancelled {},{}".format(nsr_id, nslcmop_id)) + return elif topic == "pla": if command == "placement": self.ns.update_nsrs_with_pla_result(params) @@ -650,7 +689,6 @@ class Lcm: self.logger.debug( "Task kafka_read Enter with worker_id={}".format(self.worker_id) ) - # future = asyncio.Future() self.consecutive_errors = 0 self.first_start = True while self.consecutive_errors < 10: @@ -665,15 +703,18 @@ class Lcm: "vca", "k8srepo", "pla", + "nslcmops", ) topics_admin = ("admin",) await asyncio.gather( self.msg.aioread( - topics, self.kafka_read_callback, from_beginning=True + topics, + aiocallback=self.kafka_read_callback, + from_beginning=True, ), self.msg_admin.aioread( topics_admin, - self.kafka_read_callback, + aiocallback=self.kafka_read_callback, group_id=False, ), ) @@ -698,7 +739,6 @@ class Lcm: wait_time = 2 if not self.first_start else 5 await asyncio.sleep(wait_time) - # self.logger.debug("Task kafka_read terminating") self.logger.debug("Task kafka_read exit") async def kafka_read_ping(self): @@ -841,6 +881,7 @@ if __name__ == "__main__": file=sys.stderr, ) exit(1) + config_file = os.path.realpath(os.path.normpath(os.path.abspath(config_file))) lcm = Lcm(config_file) asyncio.run(lcm.start()) except (LcmException, getopt.GetoptError) as e: