Network Slice Manager: Instantiate and Terminate actions
[osm/LCM.git] / osm_lcm / lcm.py
index 1e6bba1..d4a09db 100644 (file)
@@ -10,6 +10,7 @@ import sys
 import ROclient
 import ns
 import vim_sdn
+import netslice
 from lcm_utils import versiontuple, LcmException, TaskRegistry
 
 # from osm_lcm import version as lcm_version, version_date as lcm_version_date, ROclient
@@ -25,14 +26,17 @@ from n2vc import version as n2vc_version
 __author__ = "Alfonso Tierno"
 min_RO_version = [0, 5, 72]
 min_n2vc_version = "0.0.2"
-min_common_version = "0.1.7"
+min_common_version = "0.1.11"
 # uncomment if LCM is installed as library and installed, and get them from __init__.py
-lcm_version = '0.1.18'
-lcm_version_date = '2018-10-11'
+lcm_version = '0.1.23'
+lcm_version_date = '2018-11-13'
 
 
 class Lcm:
 
+    ping_interval_pace = 120  # how many time ping is send once is confirmed all is running
+    ping_interval_boot = 5    # how many time ping is sent when booting
+
     def __init__(self, config_file, loop=None):
         """
         Init, Connect to database, filesystem storage, and messaging
@@ -102,7 +106,7 @@ class Lcm:
             raise LcmException("Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
                 n2vc_version, min_n2vc_version))
         # check version of common
-        if versiontuple(common_version) < versiontuple("0.1.7"):
+        if versiontuple(common_version) < versiontuple(min_common_version):
             raise LcmException("Not compatible osm/common version '{}'. Needed '{}' or higher".format(
                 common_version, min_common_version))
 
@@ -139,6 +143,8 @@ class Lcm:
             raise LcmException(str(e))
 
         self.ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.vca_config, self.loop)
+        self.netslice = netslice.NetsliceLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, 
+                                             self.vca_config, self.loop)
         self.vim = vim_sdn.VimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop)
         self.sdn = vim_sdn.SdnLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop)
 
@@ -168,7 +174,7 @@ class Lcm:
             try:
                 await self.msg.aiowrite("admin", "ping", {"from": "lcm", "to": "lcm"}, self.loop)
                 # time between pings are low when it is not received and at starting
-                wait_time = 5 if not kafka_has_received else 120
+                wait_time = self.ping_interval_boot if not kafka_has_received else self.ping_interval_pace
                 if not self.pings_not_received:
                     kafka_has_received = True
                 self.pings_not_received += 1
@@ -198,7 +204,7 @@ class Lcm:
         first_start = True
         while consecutive_errors < 10:
             try:
-                topics = ("admin", "ns", "vim_account", "sdn")
+                topics = ("admin", "ns", "vim_account", "sdn", "nsi")
                 topic, command, params = await self.msg.aioread(topics, self.loop)
                 if topic != "admin" and command != "ping":
                     self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
@@ -273,6 +279,40 @@ class Lcm:
                         continue  # TODO cleaning of task just in case should be done
                     elif command in ("terminated", "instantiated", "scaled", "actioned"):  # "scaled-cooldown-time"
                         continue
+                elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
+                    if command == "instantiate":
+                        # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
+                        nsilcmop = params
+                        nsilcmop_id = nsilcmop["_id"]                               # slice operation id
+                        nsir_id = nsilcmop["netsliceInstanceId"]                    # slice record id
+                        task = asyncio.ensure_future(self.netslice.instantiate(nsir_id, nsilcmop_id))
+                        self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task)
+                        continue
+                    elif command == "terminate":
+                        # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
+                        nsilcmop = params
+                        nsilcmop_id = nsilcmop["_id"]                               # slice operation id
+                        nsir_id = nsilcmop["netsliceInstanceId"]                    # slice record id
+                        self.lcm_tasks.cancel(topic, nsir_id)
+                        task = asyncio.ensure_future(self.netslice.terminate(nsir_id, nsilcmop_id))
+                        self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_terminate", task)
+                        continue
+                    elif command == "show":
+                        try:
+                            db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
+                            print("nsir:\n    _id={}\n    operational-status: {}\n    config-status: {}"
+                                  "\n    detailed-status: {}\n    deploy: {}\n    tasks: {}"
+                                  "".format(nsir_id, db_nsir["operational-status"], db_nsir["config-status"],
+                                            db_nsir["detailed-status"],
+                                            db_nsir["_admin"]["deployed"], self.lcm_netslice_tasks.get(nsir_id)))
+                        except Exception as e:
+                            print("nsir {} not found: {}".format(nsir_id, e))
+                        sys.stdout.flush()
+                        continue
+                    elif command == "deleted":
+                        continue  # TODO cleaning of task just in case should be done
+                    elif command in ("terminated", "instantiated", "scaled", "actioned"):  # "scaled-cooldown-time"
+                        continue
                 elif topic == "vim_account":
                     vim_id = params["_id"]
                     if command == "create":
@@ -322,6 +362,27 @@ class Lcm:
         # self.logger.debug("Task kafka_read terminating")
         self.logger.debug("Task kafka_read exit")
 
+    def health_check(self):
+
+        global exit_code
+        task = None
+        exit_code = 1
+
+        def health_check_callback(topic, command, params):
+            global exit_code
+            print("receiving callback {} {} {}".format(topic, command, params))
+            if topic == "admin" and command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
+                # print("received LCM ping")
+                exit_code = 0
+                task.cancel()
+
+        try:
+            task = asyncio.ensure_future(self.msg.aioread(("admin",), self.loop, health_check_callback))
+            self.loop.run_until_complete(task)
+        except Exception:
+            pass
+        exit(exit_code)
+
     def start(self):
 
         # check RO version
@@ -361,12 +422,14 @@ class Lcm:
                 if not k.startswith("OSMLCM_"):
                     continue
                 k_items = k.lower().split("_")
+                if len(k_items) < 3:
+                    continue
+                if k_items[1] in ("ro", "vca"):
+                    # put in capital letter
+                    k_items[1] = k_items[1].upper()
                 c = conf
                 try:
                     for k_item in k_items[1:-1]:
-                        if k_item in ("ro", "vca"):
-                            # put in capital letter
-                            k_item = k_item.upper()
                         c = c[k_item]
                     if k_items[-1] == "port":
                         c[k_items[-1]] = int(v)
@@ -384,6 +447,7 @@ class Lcm:
 def usage():
     print("""Usage: {} [options]
         -c|--config [configuration_file]: loads the configuration file (default: ./nbi.cfg)
+        --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
         -h|--help: shows this help
         """.format(sys.argv[0]))
     # --log-socket-host HOST: send logs to this host")
@@ -393,15 +457,18 @@ def usage():
 if __name__ == '__main__':
     try:
         # load parameters and configuration
-        opts, args = getopt.getopt(sys.argv[1:], "hc:", ["config=", "help"])
+        opts, args = getopt.getopt(sys.argv[1:], "hc:", ["config=", "help", "health-check"])
         # TODO add  "log-socket-host=", "log-socket-port=", "log-file="
         config_file = None
+        health_check = None
         for o, a in opts:
             if o in ("-h", "--help"):
                 usage()
                 sys.exit()
             elif o in ("-c", "--config"):
                 config_file = a
+            elif o == "--health-check":
+                health_check = True
             # elif o == "--log-socket-port":
             #     log_socket_port = a
             # elif o == "--log-socket-host":
@@ -412,17 +479,20 @@ if __name__ == '__main__':
                 assert False, "Unhandled option"
         if config_file:
             if not path.isfile(config_file):
-                print("configuration file '{}' that not exist".format(config_file), file=sys.stderr)
+                print("configuration file '{}' not exist".format(config_file), file=sys.stderr)
                 exit(1)
         else:
             for config_file in (__file__[:__file__.rfind(".")] + ".cfg", "./lcm.cfg", "/etc/osm/lcm.cfg"):
                 if path.isfile(config_file):
                     break
             else:
-                print("No configuration file 'nbi.cfg' found neither at local folder nor at /etc/osm/", file=sys.stderr)
+                print("No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/", file=sys.stderr)
                 exit(1)
         lcm = Lcm(config_file)
-        lcm.start()
+        if health_check:
+            lcm.health_check()
+        else:
+            lcm.start()
     except (LcmException, getopt.GetoptError) as e:
         print(str(e), file=sys.stderr)
         # usage()