Pin pylint version to 3.1.1 in tox.ini
[osm/LCM.git] / osm_lcm / lcm.py
index e6c4e19..9b62d82 100644 (file)
@@ -19,7 +19,6 @@
 
 
 # DEBUG WITH PDB
-import os
 import pdb
 
 import asyncio
@@ -28,6 +27,7 @@ import logging
 import logging.handlers
 import getopt
 import sys
+from random import SystemRandom
 
 from osm_lcm import ns, vim_sdn, netslice
 from osm_lcm.ng_ro import NgRoException, NgRoClient
@@ -46,12 +46,11 @@ from osm_lcm.data_utils.database.database import Database
 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
 from osm_lcm.data_utils.lcm_config import LcmCfg
 from osm_lcm.lcm_hc import get_health_check_file
-from os import path
-from random import choice as random_choice
+from os import path, getenv
 from n2vc import version as n2vc_version
 import traceback
 
-if os.getenv("OSMLCM_PDB_DEBUG", None) is not None:
+if getenv("OSMLCM_PDB_DEBUG", None) is not None:
     pdb.set_trace()
 
 
@@ -63,7 +62,6 @@ min_common_version = "0.1.19"
 
 
 class Lcm:
-
     ping_interval_pace = (
         120  # how many time ping is send once is confirmed all is running
     )
@@ -71,7 +69,7 @@ class Lcm:
 
     main_config = LcmCfg()
 
-    def __init__(self, config_file, loop=None):
+    def __init__(self, config_file):
         """
         Init, Connect to database, filesystem storage, and messaging
         :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
@@ -97,7 +95,6 @@ class Lcm:
         self.logger.critical("Loaded configuration:" + str(self.main_config.to_dict()))
         # TODO: check if lcm_hc.py is necessary
         self.health_check_file = get_health_check_file(self.main_config.to_dict())
-        self.loop = loop or asyncio.get_event_loop()
         self.ns = (
             self.netslice
         ) = (
@@ -169,7 +166,7 @@ class Lcm:
 
             # copy message configuration in order to remove 'group_id' for msg_admin
             config_message = self.main_config.message.to_dict()
-            config_message["loop"] = self.loop
+            config_message["loop"] = asyncio.get_event_loop()
             if config_message["driver"] == "local":
                 self.msg = msglocal.MsgLocal()
                 self.msg.connect(config_message)
@@ -206,12 +203,12 @@ class Lcm:
                 # try new  RO, if fail old RO
                 try:
                     self.main_config.RO.uri = ro_uri + "ro"
-                    ro_server = NgRoClient(self.loop, **self.main_config.RO.to_dict())
+                    ro_server = NgRoClient(**self.main_config.RO.to_dict())
                     ro_version = await ro_server.get_version()
                     self.main_config.RO.ng = True
                 except Exception:
                     self.main_config.RO.uri = ro_uri + "openmano"
-                    ro_server = ROClient(self.loop, **self.main_config.RO.to_dict())
+                    ro_server = ROClient(**self.main_config.RO.to_dict())
                     ro_version = await ro_server.get_version()
                     self.main_config.RO.ng = False
                 if versiontuple(ro_version) < versiontuple(min_RO_version):
@@ -263,7 +260,6 @@ class Lcm:
                         "worker_id": self.worker_id,
                         "version": lcm_version,
                     },
-                    self.loop,
                 )
                 # time between pings are low when it is not received and at starting
                 wait_time = (
@@ -274,7 +270,7 @@ class Lcm:
                 if not self.pings_not_received:
                     kafka_has_received = True
                 self.pings_not_received += 1
-                await asyncio.sleep(wait_time, loop=self.loop)
+                await asyncio.sleep(wait_time)
                 if self.pings_not_received > 10:
                     raise LcmException("It is not receiving pings from Kafka bus")
                 consecutive_errors = 0
@@ -296,9 +292,9 @@ class Lcm:
                     "Task kafka_read retrying after Exception {}".format(e)
                 )
                 wait_time = 2 if not first_start else 5
-                await asyncio.sleep(wait_time, loop=self.loop)
+                await asyncio.sleep(wait_time)
 
-    def kafka_read_callback(self, topic, command, params):
+    async def kafka_read_callback(self, topic, command, params):
         order_id = 1
 
         if topic != "admin" and command != "ping":
@@ -318,7 +314,7 @@ class Lcm:
             sys.stdout.flush()
             return
         elif command == "test":
-            asyncio.Task(self.test(params), loop=self.loop)
+            asyncio.Task(self.test(params))
             return
 
         if topic == "admin":
@@ -336,6 +332,44 @@ class Lcm:
                         )
                     )
             return
+        elif topic == "nslcmops":
+            if command == "cancel":
+                nslcmop_id = params["_id"]
+                self.logger.debug("Cancelling nslcmop {}".format(nslcmop_id))
+                nsr_id = params["nsInstanceId"]
+                # cancel the tasks and wait
+                for task in self.lcm_tasks.cancel("ns", nsr_id, nslcmop_id):
+                    try:
+                        await task
+                        self.logger.debug(
+                            "Cancelled task ended {},{},{}".format(
+                                nsr_id, nslcmop_id, task
+                            )
+                        )
+                    except asyncio.CancelledError:
+                        self.logger.debug(
+                            "Task already cancelled and finished {},{},{}".format(
+                                nsr_id, nslcmop_id, task
+                            )
+                        )
+                # update DB
+                q_filter = {"_id": nslcmop_id}
+                update_dict = {
+                    "operationState": "FAILED_TEMP",
+                    "isCancelPending": False,
+                }
+                unset_dict = {
+                    "cancelMode": None,
+                }
+                self.db.set_one(
+                    "nslcmops",
+                    q_filter=q_filter,
+                    update_dict=update_dict,
+                    fail_on_empty=False,
+                    unset=unset_dict,
+                )
+                self.logger.debug("LCM task cancelled {},{}".format(nsr_id, nslcmop_id))
+            return
         elif topic == "pla":
             if command == "placement":
                 self.ns.update_nsrs_with_pla_result(params)
@@ -348,6 +382,13 @@ class Lcm:
                     "k8scluster", k8scluster_id, order_id, "k8scluster_create", task
                 )
                 return
+            elif command == "edit" or command == "edited":
+                k8scluster_id = params.get("_id")
+                task = asyncio.ensure_future(self.k8scluster.edit(params, order_id))
+                self.lcm_tasks.register(
+                    "k8scluster", k8scluster_id, order_id, "k8scluster_edit", task
+                )
+                return
             elif command == "delete" or command == "deleted":
                 k8scluster_id = params.get("_id")
                 task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
@@ -361,6 +402,11 @@ class Lcm:
                 task = asyncio.ensure_future(self.vca.create(params, order_id))
                 self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task)
                 return
+            elif command == "edit" or command == "edited":
+                vca_id = params.get("_id")
+                task = asyncio.ensure_future(self.vca.edit(params, order_id))
+                self.lcm_tasks.register("vca", vca_id, order_id, "vca_edit", task)
+                return
             elif command == "delete" or command == "deleted":
                 vca_id = params.get("_id")
                 task = asyncio.ensure_future(self.vca.delete(params, order_id))
@@ -642,7 +688,6 @@ class Lcm:
         self.logger.debug(
             "Task kafka_read Enter with worker_id={}".format(self.worker_id)
         )
-        # future = asyncio.Future()
         self.consecutive_errors = 0
         self.first_start = True
         while self.consecutive_errors < 10:
@@ -657,16 +702,18 @@ class Lcm:
                     "vca",
                     "k8srepo",
                     "pla",
+                    "nslcmops",
                 )
                 topics_admin = ("admin",)
                 await asyncio.gather(
                     self.msg.aioread(
-                        topics, self.loop, self.kafka_read_callback, from_beginning=True
+                        topics,
+                        aiocallback=self.kafka_read_callback,
+                        from_beginning=True,
                     ),
                     self.msg_admin.aioread(
                         topics_admin,
-                        self.loop,
-                        self.kafka_read_callback,
+                        aiocallback=self.kafka_read_callback,
                         group_id=False,
                     ),
                 )
@@ -689,43 +736,34 @@ class Lcm:
                     "Task kafka_read retrying after Exception {}".format(e)
                 )
                 wait_time = 2 if not self.first_start else 5
-                await asyncio.sleep(wait_time, loop=self.loop)
+                await asyncio.sleep(wait_time)
 
-        # self.logger.debug("Task kafka_read terminating")
         self.logger.debug("Task kafka_read exit")
 
-    def start(self):
+    async def kafka_read_ping(self):
+        await asyncio.gather(self.kafka_read(), self.kafka_ping())
 
+    async def start(self):
         # check RO version
-        self.loop.run_until_complete(self.check_RO_version())
+        await self.check_RO_version()
 
-        self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config, self.loop)
+        self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config)
         # TODO: modify the rest of classes to use the LcmCfg object instead of dicts
         self.netslice = netslice.NetsliceLcm(
-            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop, self.ns
-        )
-        self.vim = vim_sdn.VimLcm(
-            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
-        )
-        self.wim = vim_sdn.WimLcm(
-            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
-        )
-        self.sdn = vim_sdn.SdnLcm(
-            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
+            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.ns
         )
+        self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
+        self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
+        self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
         self.k8scluster = vim_sdn.K8sClusterLcm(
-            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
-        )
-        self.vca = vim_sdn.VcaLcm(
-            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
+            self.msg, self.lcm_tasks, self.main_config.to_dict()
         )
+        self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
         self.k8srepo = vim_sdn.K8sRepoLcm(
-            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
+            self.msg, self.lcm_tasks, self.main_config.to_dict()
         )
 
-        self.loop.run_until_complete(
-            asyncio.gather(self.kafka_read(), self.kafka_ping())
-        )
+        await self.kafka_read_ping()
 
         # TODO
         # self.logger.debug("Terminating cancelling creation tasks")
@@ -733,12 +771,10 @@ class Lcm:
         # timeout = 200
         # while self.is_pending_tasks():
         #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
-        #     await asyncio.sleep(2, loop=self.loop)
+        #     await asyncio.sleep(2)
         #     timeout -= 2
         #     if not timeout:
         #         self.lcm_tasks.cancel("ALL", "ALL")
-        self.loop.close()
-        self.loop = None
         if self.db:
             self.db.db_disconnect()
         if self.msg:
@@ -763,18 +799,22 @@ class Lcm:
         will provide a random one
         :return: Obtained ID
         """
-        # Try getting docker id. If fails, get pid
-        try:
-            with open("/proc/self/cgroup", "r") as f:
-                text_id_ = f.readline()
-                _, _, text_id = text_id_.rpartition("/")
-                text_id = text_id.replace("\n", "")[:12]
-                if text_id:
-                    return text_id
-        except Exception:
-            pass
-        # Return a random id
-        return "".join(random_choice("0123456789abcdef") for _ in range(12))
+
+        def get_docker_id():
+            try:
+                with open("/proc/self/cgroup", "r") as f:
+                    text_id_ = f.readline()
+                    _, _, text_id = text_id_.rpartition("/")
+                    return text_id.replace("\n", "")[:12]
+            except Exception:
+                return None
+
+        def generate_random_id():
+            return "".join(SystemRandom().choice("0123456789abcdef") for _ in range(12))
+
+        # Try getting docker id. If it fails, generate a random id
+        docker_id = get_docker_id()
+        return docker_id if docker_id else generate_random_id()
 
 
 def usage():
@@ -792,7 +832,6 @@ def usage():
 
 
 if __name__ == "__main__":
-
     try:
         # print("SYS.PATH='{}'".format(sys.path))
         # load parameters and configuration
@@ -816,14 +855,9 @@ if __name__ == "__main__":
                 from osm_lcm.lcm_hc import health_check
 
                 health_check(config_file, Lcm.ping_interval_pace)
-            # elif o == "--log-socket-port":
-            #     log_socket_port = a
-            # elif o == "--log-socket-host":
-            #     log_socket_host = a
-            # elif o == "--log-file":
-            #     log_file = a
             else:
-                assert False, "Unhandled option"
+                print(f"Unhandled option: {o}")
+                exit(1)
 
         if config_file:
             if not path.isfile(config_file):
@@ -847,7 +881,7 @@ if __name__ == "__main__":
                 )
                 exit(1)
         lcm = Lcm(config_file)
-        lcm.start()
+        asyncio.run(lcm.start())
     except (LcmException, getopt.GetoptError) as e:
         print(str(e), file=sys.stderr)
         # usage()