Ubuntu 22.04 and Python 3.10 preparation
[osm/LCM.git] / osm_lcm / lcm.py
index 5638943..1edc960 100644 (file)
@@ -63,7 +63,6 @@ min_common_version = "0.1.19"
 
 
 class Lcm:
-
     ping_interval_pace = (
         120  # how many time ping is send once is confirmed all is running
     )
@@ -71,7 +70,7 @@ class Lcm:
 
     main_config = LcmCfg()
 
-    def __init__(self, config_file, loop=None):
+    def __init__(self, config_file):
         """
         Init, Connect to database, filesystem storage, and messaging
         :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
@@ -97,7 +96,6 @@ class Lcm:
         self.logger.critical("Loaded configuration:" + str(self.main_config.to_dict()))
         # TODO: check if lcm_hc.py is necessary
         self.health_check_file = get_health_check_file(self.main_config.to_dict())
-        self.loop = loop or asyncio.get_event_loop()
         self.ns = (
             self.netslice
         ) = (
@@ -169,7 +167,7 @@ class Lcm:
 
             # copy message configuration in order to remove 'group_id' for msg_admin
             config_message = self.main_config.message.to_dict()
-            config_message["loop"] = self.loop
+            config_message["loop"] = asyncio.get_event_loop()
             if config_message["driver"] == "local":
                 self.msg = msglocal.MsgLocal()
                 self.msg.connect(config_message)
@@ -206,12 +204,12 @@ class Lcm:
                 # try new  RO, if fail old RO
                 try:
                     self.main_config.RO.uri = ro_uri + "ro"
-                    ro_server = NgRoClient(self.loop, **self.main_config.RO.to_dict())
+                    ro_server = NgRoClient(**self.main_config.RO.to_dict())
                     ro_version = await ro_server.get_version()
                     self.main_config.RO.ng = True
                 except Exception:
                     self.main_config.RO.uri = ro_uri + "openmano"
-                    ro_server = ROClient(self.loop, **self.main_config.RO.to_dict())
+                    ro_server = ROClient(**self.main_config.RO.to_dict())
                     ro_version = await ro_server.get_version()
                     self.main_config.RO.ng = False
                 if versiontuple(ro_version) < versiontuple(min_RO_version):
@@ -263,7 +261,6 @@ class Lcm:
                         "worker_id": self.worker_id,
                         "version": lcm_version,
                     },
-                    self.loop,
                 )
                 # time between pings are low when it is not received and at starting
                 wait_time = (
@@ -274,7 +271,7 @@ class Lcm:
                 if not self.pings_not_received:
                     kafka_has_received = True
                 self.pings_not_received += 1
-                await asyncio.sleep(wait_time, loop=self.loop)
+                await asyncio.sleep(wait_time)
                 if self.pings_not_received > 10:
                     raise LcmException("It is not receiving pings from Kafka bus")
                 consecutive_errors = 0
@@ -296,7 +293,7 @@ class Lcm:
                     "Task kafka_read retrying after Exception {}".format(e)
                 )
                 wait_time = 2 if not first_start else 5
-                await asyncio.sleep(wait_time, loop=self.loop)
+                await asyncio.sleep(wait_time)
 
     def kafka_read_callback(self, topic, command, params):
         order_id = 1
@@ -318,7 +315,7 @@ class Lcm:
             sys.stdout.flush()
             return
         elif command == "test":
-            asyncio.Task(self.test(params), loop=self.loop)
+            asyncio.Task(self.test(params))
             return
 
         if topic == "admin":
@@ -348,6 +345,13 @@ class Lcm:
                     "k8scluster", k8scluster_id, order_id, "k8scluster_create", task
                 )
                 return
+            elif command == "edit" or command == "edited":
+                k8scluster_id = params.get("_id")
+                task = asyncio.ensure_future(self.k8scluster.edit(params, order_id))
+                self.lcm_tasks.register(
+                    "k8scluster", k8scluster_id, order_id, "k8scluster_edit", task
+                )
+                return
             elif command == "delete" or command == "deleted":
                 k8scluster_id = params.get("_id")
                 task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
@@ -361,6 +365,11 @@ class Lcm:
                 task = asyncio.ensure_future(self.vca.create(params, order_id))
                 self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task)
                 return
+            elif command == "edit" or command == "edited":
+                vca_id = params.get("_id")
+                task = asyncio.ensure_future(self.vca.edit(params, order_id))
+                self.lcm_tasks.register("vca", vca_id, order_id, "vca_edit", task)
+                return
             elif command == "delete" or command == "deleted":
                 vca_id = params.get("_id")
                 task = asyncio.ensure_future(self.vca.delete(params, order_id))
@@ -480,7 +489,7 @@ class Lcm:
                             db_nsr["config-status"],
                             db_nsr["detailed-status"],
                             db_nsr["_admin"]["deployed"],
-                            self.lcm_ns_tasks.get(nsr_id),
+                            self.lcm_tasks.task_registry["ns"].get(nsr_id, ""),
                         )
                     )
                 except Exception as e:
@@ -542,7 +551,7 @@ class Lcm:
                             db_nsir["config-status"],
                             db_nsir["detailed-status"],
                             db_nsir["_admin"]["deployed"],
-                            self.lcm_netslice_tasks.get(nsir_id),
+                            self.lcm_tasks.task_registry["nsi"].get(nsir_id, ""),
                         )
                     )
                 except Exception as e:
@@ -661,11 +670,10 @@ class Lcm:
                 topics_admin = ("admin",)
                 await asyncio.gather(
                     self.msg.aioread(
-                        topics, self.loop, self.kafka_read_callback, from_beginning=True
+                        topics, self.kafka_read_callback, from_beginning=True
                     ),
                     self.msg_admin.aioread(
                         topics_admin,
-                        self.loop,
                         self.kafka_read_callback,
                         group_id=False,
                     ),
@@ -689,43 +697,35 @@ class Lcm:
                     "Task kafka_read retrying after Exception {}".format(e)
                 )
                 wait_time = 2 if not self.first_start else 5
-                await asyncio.sleep(wait_time, loop=self.loop)
+                await asyncio.sleep(wait_time)
 
         # self.logger.debug("Task kafka_read terminating")
         self.logger.debug("Task kafka_read exit")
 
-    def start(self):
+    async def kafka_read_ping(self):
+        await asyncio.gather(self.kafka_read(), self.kafka_ping())
 
+    def start(self):
         # check RO version
-        self.loop.run_until_complete(self.check_RO_version())
+        asyncio.run(self.check_RO_version())
 
-        self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config, self.loop)
+        self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.main_config)
         # TODO: modify the rest of classes to use the LcmCfg object instead of dicts
         self.netslice = netslice.NetsliceLcm(
-            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop, self.ns
-        )
-        self.vim = vim_sdn.VimLcm(
-            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
-        )
-        self.wim = vim_sdn.WimLcm(
-            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
-        )
-        self.sdn = vim_sdn.SdnLcm(
-            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
+            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.ns
         )
+        self.vim = vim_sdn.VimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
+        self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
+        self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
         self.k8scluster = vim_sdn.K8sClusterLcm(
-            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
-        )
-        self.vca = vim_sdn.VcaLcm(
-            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
+            self.msg, self.lcm_tasks, self.main_config.to_dict()
         )
+        self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.main_config.to_dict())
         self.k8srepo = vim_sdn.K8sRepoLcm(
-            self.msg, self.lcm_tasks, self.main_config.to_dict(), self.loop
+            self.msg, self.lcm_tasks, self.main_config.to_dict()
         )
 
-        self.loop.run_until_complete(
-            asyncio.gather(self.kafka_read(), self.kafka_ping())
-        )
+        asyncio.run(self.kafka_read_ping())
 
         # TODO
         # self.logger.debug("Terminating cancelling creation tasks")
@@ -733,12 +733,10 @@ class Lcm:
         # timeout = 200
         # while self.is_pending_tasks():
         #     self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
-        #     await asyncio.sleep(2, loop=self.loop)
+        #     await asyncio.sleep(2)
         #     timeout -= 2
         #     if not timeout:
         #         self.lcm_tasks.cancel("ALL", "ALL")
-        self.loop.close()
-        self.loop = None
         if self.db:
             self.db.db_disconnect()
         if self.msg:
@@ -792,7 +790,6 @@ def usage():
 
 
 if __name__ == "__main__":
-
     try:
         # print("SYS.PATH='{}'".format(sys.path))
         # load parameters and configuration