X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Flcm.py;h=9b62d82eef07a337eebb1445c675f0de256919ff;hb=b6049d378301d13e0ddaadb8433b8ad20f4f8a23;hp=2fc479feb3da55673ee9632f3950a1989eba26ab;hpb=1addc93e479dcb97fdfecc74606559d9897217ec;p=osm%2FLCM.git diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index 2fc479f..9b62d82 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -19,7 +19,6 @@ # DEBUG WITH PDB -import os import pdb import asyncio @@ -28,6 +27,7 @@ import logging import logging.handlers import getopt import sys +from random import SystemRandom from osm_lcm import ns, vim_sdn, netslice from osm_lcm.ng_ro import NgRoException, NgRoClient @@ -46,12 +46,11 @@ from osm_lcm.data_utils.database.database import Database from osm_lcm.data_utils.filesystem.filesystem import Filesystem from osm_lcm.data_utils.lcm_config import LcmCfg from osm_lcm.lcm_hc import get_health_check_file -from os import path -from random import choice as random_choice +from os import path, getenv from n2vc import version as n2vc_version import traceback -if os.getenv("OSMLCM_PDB_DEBUG", None) is not None: +if getenv("OSMLCM_PDB_DEBUG", None) is not None: pdb.set_trace() @@ -295,7 +294,7 @@ class Lcm: wait_time = 2 if not first_start else 5 await asyncio.sleep(wait_time) - def kafka_read_callback(self, topic, command, params): + async def kafka_read_callback(self, topic, command, params): order_id = 1 if topic != "admin" and command != "ping": @@ -333,6 +332,44 @@ class Lcm: ) ) return + elif topic == "nslcmops": + if command == "cancel": + nslcmop_id = params["_id"] + self.logger.debug("Cancelling nslcmop {}".format(nslcmop_id)) + nsr_id = params["nsInstanceId"] + # cancel the tasks and wait + for task in self.lcm_tasks.cancel("ns", nsr_id, nslcmop_id): + try: + await task + self.logger.debug( + "Cancelled task ended {},{},{}".format( + nsr_id, nslcmop_id, task + ) + ) + except asyncio.CancelledError: + self.logger.debug( + "Task already cancelled and finished {},{},{}".format( + nsr_id, nslcmop_id, task + ) + ) + # update DB + q_filter = {"_id": nslcmop_id} + update_dict = { + "operationState": "FAILED_TEMP", + "isCancelPending": False, + } + unset_dict = { + "cancelMode": None, + } + self.db.set_one( + "nslcmops", + q_filter=q_filter, + update_dict=update_dict, + fail_on_empty=False, + unset=unset_dict, + ) + self.logger.debug("LCM task cancelled {},{}".format(nsr_id, nslcmop_id)) + return elif topic == "pla": if command == "placement": self.ns.update_nsrs_with_pla_result(params) @@ -651,7 +688,6 @@ class Lcm: self.logger.debug( "Task kafka_read Enter with worker_id={}".format(self.worker_id) ) - # future = asyncio.Future() self.consecutive_errors = 0 self.first_start = True while self.consecutive_errors < 10: @@ -666,15 +702,18 @@ class Lcm: "vca", "k8srepo", "pla", + "nslcmops", ) topics_admin = ("admin",) await asyncio.gather( self.msg.aioread( - topics, self.kafka_read_callback, from_beginning=True + topics, + aiocallback=self.kafka_read_callback, + from_beginning=True, ), self.msg_admin.aioread( topics_admin, - self.kafka_read_callback, + aiocallback=self.kafka_read_callback, group_id=False, ), ) @@ -699,7 +738,6 @@ class Lcm: wait_time = 2 if not self.first_start else 5 await asyncio.sleep(wait_time) - # self.logger.debug("Task kafka_read terminating") self.logger.debug("Task kafka_read exit") async def kafka_read_ping(self): @@ -761,18 +799,22 @@ class Lcm: will provide a random one :return: Obtained ID """ - # Try getting docker id. If fails, get pid - try: - with open("/proc/self/cgroup", "r") as f: - text_id_ = f.readline() - _, _, text_id = text_id_.rpartition("/") - text_id = text_id.replace("\n", "")[:12] - if text_id: - return text_id - except Exception: - pass - # Return a random id - return "".join(random_choice("0123456789abcdef") for _ in range(12)) + + def get_docker_id(): + try: + with open("/proc/self/cgroup", "r") as f: + text_id_ = f.readline() + _, _, text_id = text_id_.rpartition("/") + return text_id.replace("\n", "")[:12] + except Exception: + return None + + def generate_random_id(): + return "".join(SystemRandom().choice("0123456789abcdef") for _ in range(12)) + + # Try getting docker id. If it fails, generate a random id + docker_id = get_docker_id() + return docker_id if docker_id else generate_random_id() def usage(): @@ -813,14 +855,9 @@ if __name__ == "__main__": from osm_lcm.lcm_hc import health_check health_check(config_file, Lcm.ping_interval_pace) - # elif o == "--log-socket-port": - # log_socket_port = a - # elif o == "--log-socket-host": - # log_socket_host = a - # elif o == "--log-file": - # log_file = a else: - assert False, "Unhandled option" + print(f"Unhandled option: {o}") + exit(1) if config_file: if not path.isfile(config_file):