Network Slice Manager: Instantiate and Terminate actions 90/6890/2 netslice
authorFelipe Vicens <felipe.vicens@atos.net>
Thu, 15 Nov 2018 14:09:58 +0000 (15:09 +0100)
committerFelipe Vicens <felipe.vicens@atos.net>
Fri, 16 Nov 2018 17:56:15 +0000 (18:56 +0100)
Change-Id: I0c7e815b84f33ca981358ff25e17f9903611e854
Signed-off-by: Felipe Vicens <felipe.vicens@atos.net>
Dockerfile.local
osm_lcm/ROclient.py
osm_lcm/lcm.py
osm_lcm/lcm_utils.py
osm_lcm/netslice.py [new file with mode: 0644]

index 65c7ce1..e5bdb86 100644 (file)
@@ -19,7 +19,7 @@ FROM ubuntu:16.04
 WORKDIR /app/osm_lcm
 
 # Copy the current directory contents into the container at /app
-ADD . /app
+#ADD . /app
 
 RUN apt-get update && apt-get install -y git tox python3 \
     python3-pip python3-aiohttp \
@@ -82,6 +82,8 @@ ENV OSMLCM_MESSAGE_PORT    9092
 ENV OSMLCM_GLOBAL_LOGFILE  /app/log/lcm.log
 ENV OSMLCM_GLOBAL_LOGLEVEL DEBUG
 
+ADD . /app
+
 # Run app.py when the container launches
 CMD ["python3", "lcm.py"]
 
index d0bef33..1fd5f95 100644 (file)
@@ -99,7 +99,7 @@ def remove_envelop(item, indata=None):
 class ROClient:
     headers_req = {'Accept': 'application/yaml', 'content-type': 'application/yaml'}
     client_to_RO = {'tenant': 'tenants', 'vim': 'datacenters', 'vim_account': 'datacenters', 'sdn': 'sdn_controllers',
-                    'vnfd': 'vnfs', 'nsd': 'scenarios',
+                    'vnfd': 'vnfs', 'nsd': 'scenarios', 
                     'ns': 'instances'}
     mandatory_for_create = {
         'tenant': ("name", ),
@@ -757,7 +757,7 @@ class ROClient:
     async def create(self, item, descriptor=None, descriptor_format=None, **kwargs):
         """
         Creates an item from its descriptor
-        :param item: can be 'tenant', 'vnfd', 'nsd', 'ns', 'vim', 'vim_account', 'sdn'
+        :param item: can be 'tenant', 'vnfd', 'nsd', 'ns', 'vim', 'vim_account', 'sdn', nstd
         :param descriptor: can be a dict, or a yaml/json text. Autodetect unless descriptor_format is provided
         :param descriptor_format: Can be 'json' or 'yaml'
         :param kwargs: Overrides descriptor with values as name, description, vim_url, vim_url_admin, vim_type
index 39a254b..d4a09db 100644 (file)
@@ -10,6 +10,7 @@ import sys
 import ROclient
 import ns
 import vim_sdn
+import netslice
 from lcm_utils import versiontuple, LcmException, TaskRegistry
 
 # from osm_lcm import version as lcm_version, version_date as lcm_version_date, ROclient
@@ -142,6 +143,8 @@ class Lcm:
             raise LcmException(str(e))
 
         self.ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.vca_config, self.loop)
+        self.netslice = netslice.NetsliceLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, 
+                                             self.vca_config, self.loop)
         self.vim = vim_sdn.VimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop)
         self.sdn = vim_sdn.SdnLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop)
 
@@ -201,7 +204,7 @@ class Lcm:
         first_start = True
         while consecutive_errors < 10:
             try:
-                topics = ("admin", "ns", "vim_account", "sdn")
+                topics = ("admin", "ns", "vim_account", "sdn", "nsi")
                 topic, command, params = await self.msg.aioread(topics, self.loop)
                 if topic != "admin" and command != "ping":
                     self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
@@ -276,6 +279,40 @@ class Lcm:
                         continue  # TODO cleaning of task just in case should be done
                     elif command in ("terminated", "instantiated", "scaled", "actioned"):  # "scaled-cooldown-time"
                         continue
+                elif topic == "nsi":  # netslice LCM processes (instantiate, terminate, etc)
+                    if command == "instantiate":
+                        # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
+                        nsilcmop = params
+                        nsilcmop_id = nsilcmop["_id"]                               # slice operation id
+                        nsir_id = nsilcmop["netsliceInstanceId"]                    # slice record id
+                        task = asyncio.ensure_future(self.netslice.instantiate(nsir_id, nsilcmop_id))
+                        self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task)
+                        continue
+                    elif command == "terminate":
+                        # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
+                        nsilcmop = params
+                        nsilcmop_id = nsilcmop["_id"]                               # slice operation id
+                        nsir_id = nsilcmop["netsliceInstanceId"]                    # slice record id
+                        self.lcm_tasks.cancel(topic, nsir_id)
+                        task = asyncio.ensure_future(self.netslice.terminate(nsir_id, nsilcmop_id))
+                        self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_terminate", task)
+                        continue
+                    elif command == "show":
+                        try:
+                            db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
+                            print("nsir:\n    _id={}\n    operational-status: {}\n    config-status: {}"
+                                  "\n    detailed-status: {}\n    deploy: {}\n    tasks: {}"
+                                  "".format(nsir_id, db_nsir["operational-status"], db_nsir["config-status"],
+                                            db_nsir["detailed-status"],
+                                            db_nsir["_admin"]["deployed"], self.lcm_netslice_tasks.get(nsir_id)))
+                        except Exception as e:
+                            print("nsir {} not found: {}".format(nsir_id, e))
+                        sys.stdout.flush()
+                        continue
+                    elif command == "deleted":
+                        continue  # TODO cleaning of task just in case should be done
+                    elif command in ("terminated", "instantiated", "scaled", "actioned"):  # "scaled-cooldown-time"
+                        continue
                 elif topic == "vim_account":
                     vim_id = params["_id"]
                     if command == "create":
index cb74118..2c84356 100644 (file)
@@ -40,6 +40,7 @@ class TaskRegistry:
     def __init__(self):
         self.task_registry = {
             "ns": {},
+            "nsi": {},
             "vim_account": {},
             "sdn": {},
         }
@@ -47,7 +48,7 @@ class TaskRegistry:
     def register(self, topic, _id, op_id, task_name, task):
         """
         Register a new task
-        :param topic: Can be "ns", "vim_account", "sdn"
+        :param topic: Can be "ns", "nsi", "vim_account", "sdn"
         :param _id: _id of the related item
         :param op_id: id of the operation of the related item
         :param task_name: Task descriptive name, as create, instantiate, terminate. Must be unique in this op_id
@@ -65,7 +66,7 @@ class TaskRegistry:
     def remove(self, topic, _id, op_id, task_name=None):
         """
         When task is ended, it should removed. It ignores missing tasks
-        :param topic: Can be "ns", "vim_account", "sdn"
+        :param topic: Can be "ns", "nsi", "vim_account", "sdn"
         :param _id: _id of the related item
         :param op_id: id of the operation of the related item
         :param task_name: Task descriptive name. If note it deletes all
@@ -103,8 +104,8 @@ class TaskRegistry:
 
     def cancel(self, topic, _id, target_op_id=None, target_task_name=None):
         """
-        Cancel all active tasks of a concrete ns, vim_account, sdn identified for _id. If op_id is supplied only this is
-        cancelled, and the same with task_name
+        Cancel all active tasks of a concrete ns, nsi, vim_account, sdn identified for _id. If op_id is supplied only 
+        this is cancelled, and the same with task_name
         """
         if not self.task_registry[topic].get(_id):
             return
diff --git a/osm_lcm/netslice.py b/osm_lcm/netslice.py
new file mode 100644 (file)
index 0000000..aecac77
--- /dev/null
@@ -0,0 +1,300 @@
+#!/usr/bin/python3
+# -*- coding: utf-8 -*-
+
+import asyncio
+import logging
+import logging.handlers
+import traceback
+import ns
+from lcm_utils import LcmException, LcmBase
+from osm_common.dbbase import DbException
+from time import time
+
+__author__ = "Felipe Vicens, Pol Alemany, Alfonso Tierno"
+
+
+class NetsliceLcm(LcmBase):
+
+    def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
+        """
+        Init, Connect to database, filesystem storage, and messaging
+        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
+        :return: None
+        """
+        # logging
+        self.logger = logging.getLogger('lcm.netslice')
+        self.loop = loop
+        self.lcm_tasks = lcm_tasks
+        self.ns = ns.NsLcm(db, msg, fs, lcm_tasks, ro_config, vca_config, loop)
+
+        super().__init__(db, msg, fs, self.logger)
+
+    # TODO: check logging_text within the self.logger.info/debug
+    async def instantiate(self, nsir_id, nsilcmop_id):
+        logging_text = "Task netslice={} instantiate={} ".format(nsir_id, nsilcmop_id)
+        self.logger.debug(logging_text + "Enter")
+        # get all needed from database
+        exc = None 
+        db_nsir = None
+        db_nsilcmop = None
+        db_nsir_update = {"_admin.nsilcmop": nsilcmop_id}
+        db_nsilcmop_update = {}
+        nsilcmop_operation_state = None
+
+        try:
+            step = "Getting nsir={} from db".format(nsir_id)
+            db_nsir = self.db.get_one("nsis", {"_id": nsir_id})
+            step = "Getting nsilcmop={} from db".format(nsilcmop_id)
+            db_nsilcmop = self.db.get_one("nsilcmops", {"_id": nsilcmop_id})
+            
+            # look if previous tasks is in process
+            task_name, task_dependency = self.lcm_tasks.lookfor_related("nsi", nsir_id, nsilcmop_id)
+            if task_dependency:
+                step = db_nsilcmop_update["detailed-status"] = \
+                    "Waiting for related tasks to be completed: {}".format(task_name)
+                self.logger.debug(logging_text + step)
+                self.update_db_2("nsilcmops", nsilcmop_id, db_nsilcmop_update)
+                _, pending = await asyncio.wait(task_dependency, timeout=3600)
+                if pending:
+                    raise LcmException("Timeout waiting related tasks to be completed")
+                       
+            # Empty list to keep track of network service records status in the netslice
+            nsir_admin = db_nsir["_admin"]
+
+            nsir_admin["nsrs-detailed-list"] = []
+
+            # Slice status Creating
+            db_nsir_update["detailed-status"] = "creating"
+            db_nsir_update["operational-status"] = "init"
+            self.update_db_2("nsis", nsir_id, db_nsir_update)            
+
+            # Iterate over the network services operation ids to instantiate NSs
+            # TODO: (future improvement) look another way check the tasks instead of keep asking 
+            # -> https://docs.python.org/3/library/asyncio-task.html#waiting-primitives
+            # steps: declare ns_tasks, add task when terminate is called, await asyncio.wait(vca_task_list, timeout=300)
+            # ns_tasks = []
+            nslcmop_ids = db_nsilcmop["operationParams"].get("nslcmops_ids")
+            for nslcmop_id in nslcmop_ids:
+                nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+                nsr_id = nslcmop.get("nsInstanceId")
+                step = "Launching ns={} instantiate={} task".format(nsr_id, nslcmop)
+                task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
+                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task)
+
+            # Wait until Network Slice is ready
+            step = nsir_status_detailed = " Waiting nsi ready. nsi_id={}".format(nsir_id)
+            nsrs_detailed_list_old = None
+            self.logger.debug(logging_text + step)
+            
+            # TODO: substitute while for await (all task to be done or not)
+            deployment_timeout = 2 * 3600   # Two hours
+            while deployment_timeout > 0:
+                # Check ns instantiation status
+                nsi_ready = True
+                nsrs_detailed_list = []
+                for nslcmop_item in nslcmop_ids:
+                    nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_item})
+                    status = nslcmop.get("operationState")
+                    # TODO: (future improvement) other possible status: ROLLING_BACK,ROLLED_BACK
+                    nsrs_detailed_list.append({"nsrId": nslcmop["nsInstanceId"], "status": nslcmop["operationState"],
+                                               "detailed-status": 
+                                               nsir_status_detailed + "; {}".format(nslcmop.get("detailed-status"))})
+                    if status not in ["COMPLETED", "PARTIALLY_COMPLETED", "FAILED", "FAILED_TEMP"]:  
+                        nsi_ready = False
+
+                if nsrs_detailed_list != nsrs_detailed_list_old:
+                    nsir_admin["nsrs-detailed-list"] = nsrs_detailed_list
+                    nsrs_detailed_list_old = nsrs_detailed_list
+                    db_nsir_update["_admin"] = nsir_admin
+                    self.update_db_2("nsis", nsir_id, db_nsir_update)
+
+                if nsi_ready:
+                    step = "Network Slice Instance is ready. nsi_id={}".format(nsir_id)
+                    for items in nsrs_detailed_list:
+                        if "FAILED" in items.values():
+                            raise LcmException("Error deploying NSI: {}".format(nsir_id))
+                    break
+                # TODO: future improvement due to synchronism -> await asyncio.wait(vca_task_list, timeout=300)
+                await asyncio.sleep(5, loop=self.loop)
+                deployment_timeout -= 5
+            
+            if deployment_timeout <= 0:
+                raise LcmException("Timeout waiting nsi to be ready. nsi_id={}".format(nsir_id))
+
+            db_nsir_update["operational-status"] = "running"
+            db_nsir_update["detailed-status"] = "done"
+            db_nsir_update["config-status"] = "configured"
+            db_nsilcmop_update["operationState"] = nsilcmop_operation_state = "COMPLETED"
+            db_nsilcmop_update["statusEnteredTime"] = time()
+            db_nsilcmop_update["detailed-status"] = "done"
+            return
+
+        except (LcmException, DbException) as e:
+            self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
+            exc = e
+        except asyncio.CancelledError:
+            self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
+            exc = "Operation was cancelled"
+        except Exception as e:
+            exc = traceback.format_exc()
+            self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
+                                 exc_info=True)
+        finally:
+            if exc:
+                if db_nsir:
+                    db_nsir_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
+                    db_nsir_update["operational-status"] = "failed"
+                if db_nsilcmop:
+                    db_nsilcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
+                    db_nsilcmop_update["operationState"] = nsilcmop_operation_state = "FAILED"
+                    db_nsilcmop_update["statusEnteredTime"] = time()
+            if db_nsir:
+                db_nsir_update["_admin.nsiState"] = "INSTANTIATED"
+                db_nsir_update["_admin.nsilcmop"] = None
+                self.update_db_2("nsis", nsir_id, db_nsir_update)
+            if db_nsilcmop:
+
+                self.update_db_2("nsilcmops", nsilcmop_id, db_nsilcmop_update)
+            if nsilcmop_operation_state:
+                try:
+                    await self.msg.aiowrite("nsi", "instantiated", {"nsir_id": nsir_id, "nsilcmop_id": nsilcmop_id,
+                                                                    "operationState": nsilcmop_operation_state})
+                except Exception as e:
+                    self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
+            self.logger.debug(logging_text + "Exit")
+            self.lcm_tasks.remove("nsi", nsir_id, nsilcmop_id, "nsi_instantiate")
+
+    async def terminate(self, nsir_id, nsilcmop_id):
+        logging_text = "Task nsi={} terminate={} ".format(nsir_id, nsilcmop_id)
+        self.logger.debug(logging_text + "Enter")
+        exc = None 
+        db_nsir = None
+        db_nsilcmop = None
+        db_nsir_update = {"_admin.nsilcmop": nsilcmop_id}
+        db_nsilcmop_update = {}
+        nsilcmop_operation_state = None
+        try:
+            step = "Getting nsir={} from db".format(nsir_id)
+            db_nsir = self.db.get_one("nsis", {"_id": nsir_id})
+            step = "Getting nsilcmop={} from db".format(nsilcmop_id)
+            db_nsilcmop = self.db.get_one("nsilcmops", {"_id": nsilcmop_id})
+            
+            # TODO: Check if makes sense check the nsiState=NOT_INSTANTIATED when terminate
+            # CASE: Instance was terminated but there is a second request to terminate the instance
+            if db_nsir["_admin"]["nsiState"] == "NOT_INSTANTIATED":
+                return
+
+            # Slice status Terminating
+            db_nsir_update["operational-status"] = "terminating"
+            db_nsir_update["config-status"] = "terminating"
+            self.update_db_2("nsis", nsir_id, db_nsir_update)
+
+            # look if previous tasks is in process
+            task_name, task_dependency = self.lcm_tasks.lookfor_related("nsi", nsir_id, nsilcmop_id)
+            if task_dependency:
+                step = db_nsilcmop_update["detailed-status"] = \
+                    "Waiting for related tasks to be completed: {}".format(task_name)
+                self.logger.debug(logging_text + step)
+                self.update_db_2("nsilcmops", nsilcmop_id, db_nsilcmop_update)
+                _, pending = await asyncio.wait(task_dependency, timeout=3600)
+                if pending:
+                    raise LcmException("Timeout waiting related tasks to be completed")
+                       
+            # Gets the list to keep track of network service records status in the netslice
+            nsir_admin = db_nsir["_admin"]
+            nsrs_detailed_list = []       
+
+            # Iterate over the network services operation ids to terminate NSs
+            # TODO: (future improvement) look another way check the tasks instead of keep asking 
+            # -> https://docs.python.org/3/library/asyncio-task.html#waiting-primitives
+            # steps: declare ns_tasks, add task when terminate is called, await asyncio.wait(vca_task_list, timeout=300)
+            # ns_tasks = []
+            nslcmop_ids = db_nsilcmop["operationParams"].get("nslcmops_ids")
+            for nslcmop_id in nslcmop_ids:
+                nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+                nsr_id = nslcmop["operationParams"].get("nsInstanceId")
+                task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
+                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task)
+
+            # Wait until Network Slice is terminated
+            step = nsir_status_detailed = " Waiting nsi terminated. nsi_id={}".format(nsir_id)
+            nsrs_detailed_list_old = None
+            self.logger.debug(logging_text + step)
+            
+            termination_timeout = 2 * 3600   # Two hours
+            while termination_timeout > 0:
+                # Check ns termination status
+                nsi_ready = True
+                nsrs_detailed_list = []
+                for nslcmop_item in nslcmop_ids:
+                    nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_item})
+                    status = nslcmop["operationState"]
+                    # TODO: (future improvement) other possible status: ROLLING_BACK,ROLLED_BACK 
+                    nsrs_detailed_list.append({"nsrId": nslcmop["nsInstanceId"], "status": nslcmop["operationState"],
+                                               "detailed-status": 
+                                               nsir_status_detailed + "; {}".format(nslcmop.get("detailed-status"))})
+                    if status not in ["COMPLETED", "PARTIALLY_COMPLETED", "FAILED", "FAILED_TEMP"]:
+                        nsi_ready = False
+
+                if nsrs_detailed_list != nsrs_detailed_list_old:
+                    nsir_admin["nsrs-detailed-list"] = nsrs_detailed_list
+                    nsrs_detailed_list_old = nsrs_detailed_list
+                    db_nsir_update["_admin"] = nsir_admin
+                    self.update_db_2("nsis", nsir_id, db_nsir_update)
+                    
+                if nsi_ready:
+                    step = "Network Slice Instance is terminated. nsi_id={}".format(nsir_id)
+                    for items in nsrs_detailed_list:
+                        if "FAILED" in items.values():
+                            raise LcmException("Error terminating NSI: {}".format(nsir_id))
+                    break
+
+                await asyncio.sleep(5, loop=self.loop)
+                termination_timeout -= 5
+
+            if termination_timeout <= 0:
+                raise LcmException("Timeout waiting nsi to be terminated. nsi_id={}".format(nsir_id))
+
+            db_nsir_update["operational-status"] = "terminated"
+            db_nsir_update["config-status"] = "configured"            
+            db_nsir_update["detailed-status"] = "done" 
+            db_nsilcmop_update["operationState"] = nsilcmop_operation_state = "COMPLETED"
+            db_nsilcmop_update["statusEnteredTime"] = time()
+            db_nsilcmop_update["detailed-status"] = "done"
+            return
+
+        except (LcmException, DbException) as e:
+            self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
+            exc = e
+        except asyncio.CancelledError:
+            self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
+            exc = "Operation was cancelled"
+        except Exception as e:
+            exc = traceback.format_exc()
+            self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
+                                 exc_info=True)
+        finally:
+            if exc:
+                if db_nsir:
+                    db_nsir_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
+                    db_nsir_update["operational-status"] = "failed"
+                if db_nsilcmop:
+                    db_nsilcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
+                    db_nsilcmop_update["operationState"] = nsilcmop_operation_state = "FAILED"
+                    db_nsilcmop_update["statusEnteredTime"] = time()
+            if db_nsir:
+                db_nsir_update["_admin.nsilcmop"] = None
+                db_nsir_update["_admin.nsiState"] = "TERMINATED"
+                self.update_db_2("nsis", nsir_id, db_nsir_update)
+            if db_nsilcmop:
+                self.update_db_2("nsilcmops", nsilcmop_id, db_nsilcmop_update)
+
+            if nsilcmop_operation_state:
+                try:
+                    await self.msg.aiowrite("nsi", "terminated", {"nsir_id": nsir_id, "nsilcmop_id": nsilcmop_id,
+                                                                  "operationState": nsilcmop_operation_state})
+                except Exception as e:
+                    self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
+            self.logger.debug(logging_text + "Exit")
+            self.lcm_tasks.remove("nsi", nsir_id, nsilcmop_id, "nsi_terminate")