From c2033f23c8111a172f70873beda52735e23f41a6 Mon Sep 17 00:00:00 2001 From: Felipe Vicens Date: Thu, 15 Nov 2018 15:09:58 +0100 Subject: [PATCH] Network Slice Manager: Instantiate and Terminate actions Change-Id: I0c7e815b84f33ca981358ff25e17f9903611e854 Signed-off-by: Felipe Vicens --- Dockerfile.local | 4 +- osm_lcm/ROclient.py | 4 +- osm_lcm/lcm.py | 39 +++++- osm_lcm/lcm_utils.py | 9 +- osm_lcm/netslice.py | 300 +++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 348 insertions(+), 8 deletions(-) create mode 100644 osm_lcm/netslice.py diff --git a/Dockerfile.local b/Dockerfile.local index 65c7ce1..e5bdb86 100644 --- a/Dockerfile.local +++ b/Dockerfile.local @@ -19,7 +19,7 @@ FROM ubuntu:16.04 WORKDIR /app/osm_lcm # Copy the current directory contents into the container at /app -ADD . /app +#ADD . /app RUN apt-get update && apt-get install -y git tox python3 \ python3-pip python3-aiohttp \ @@ -82,6 +82,8 @@ ENV OSMLCM_MESSAGE_PORT 9092 ENV OSMLCM_GLOBAL_LOGFILE /app/log/lcm.log ENV OSMLCM_GLOBAL_LOGLEVEL DEBUG +ADD . /app + # Run app.py when the container launches CMD ["python3", "lcm.py"] diff --git a/osm_lcm/ROclient.py b/osm_lcm/ROclient.py index d0bef33..1fd5f95 100644 --- a/osm_lcm/ROclient.py +++ b/osm_lcm/ROclient.py @@ -99,7 +99,7 @@ def remove_envelop(item, indata=None): class ROClient: headers_req = {'Accept': 'application/yaml', 'content-type': 'application/yaml'} client_to_RO = {'tenant': 'tenants', 'vim': 'datacenters', 'vim_account': 'datacenters', 'sdn': 'sdn_controllers', - 'vnfd': 'vnfs', 'nsd': 'scenarios', + 'vnfd': 'vnfs', 'nsd': 'scenarios', 'ns': 'instances'} mandatory_for_create = { 'tenant': ("name", ), @@ -757,7 +757,7 @@ class ROClient: async def create(self, item, descriptor=None, descriptor_format=None, **kwargs): """ Creates an item from its descriptor - :param item: can be 'tenant', 'vnfd', 'nsd', 'ns', 'vim', 'vim_account', 'sdn' + :param item: can be 'tenant', 'vnfd', 'nsd', 'ns', 'vim', 'vim_account', 'sdn', nstd :param descriptor: can be a dict, or a yaml/json text. Autodetect unless descriptor_format is provided :param descriptor_format: Can be 'json' or 'yaml' :param kwargs: Overrides descriptor with values as name, description, vim_url, vim_url_admin, vim_type diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index 39a254b..d4a09db 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -10,6 +10,7 @@ import sys import ROclient import ns import vim_sdn +import netslice from lcm_utils import versiontuple, LcmException, TaskRegistry # from osm_lcm import version as lcm_version, version_date as lcm_version_date, ROclient @@ -142,6 +143,8 @@ class Lcm: raise LcmException(str(e)) self.ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.vca_config, self.loop) + self.netslice = netslice.NetsliceLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, + self.vca_config, self.loop) self.vim = vim_sdn.VimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop) self.sdn = vim_sdn.SdnLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop) @@ -201,7 +204,7 @@ class Lcm: first_start = True while consecutive_errors < 10: try: - topics = ("admin", "ns", "vim_account", "sdn") + topics = ("admin", "ns", "vim_account", "sdn", "nsi") topic, command, params = await self.msg.aioread(topics, self.loop) if topic != "admin" and command != "ping": self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params)) @@ -276,6 +279,40 @@ class Lcm: continue # TODO cleaning of task just in case should be done elif command in ("terminated", "instantiated", "scaled", "actioned"): # "scaled-cooldown-time" continue + elif topic == "nsi": # netslice LCM processes (instantiate, terminate, etc) + if command == "instantiate": + # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"])) + nsilcmop = params + nsilcmop_id = nsilcmop["_id"] # slice operation id + nsir_id = nsilcmop["netsliceInstanceId"] # slice record id + task = asyncio.ensure_future(self.netslice.instantiate(nsir_id, nsilcmop_id)) + self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task) + continue + elif command == "terminate": + # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"])) + nsilcmop = params + nsilcmop_id = nsilcmop["_id"] # slice operation id + nsir_id = nsilcmop["netsliceInstanceId"] # slice record id + self.lcm_tasks.cancel(topic, nsir_id) + task = asyncio.ensure_future(self.netslice.terminate(nsir_id, nsilcmop_id)) + self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_terminate", task) + continue + elif command == "show": + try: + db_nsir = self.db.get_one("nsirs", {"_id": nsir_id}) + print("nsir:\n _id={}\n operational-status: {}\n config-status: {}" + "\n detailed-status: {}\n deploy: {}\n tasks: {}" + "".format(nsir_id, db_nsir["operational-status"], db_nsir["config-status"], + db_nsir["detailed-status"], + db_nsir["_admin"]["deployed"], self.lcm_netslice_tasks.get(nsir_id))) + except Exception as e: + print("nsir {} not found: {}".format(nsir_id, e)) + sys.stdout.flush() + continue + elif command == "deleted": + continue # TODO cleaning of task just in case should be done + elif command in ("terminated", "instantiated", "scaled", "actioned"): # "scaled-cooldown-time" + continue elif topic == "vim_account": vim_id = params["_id"] if command == "create": diff --git a/osm_lcm/lcm_utils.py b/osm_lcm/lcm_utils.py index cb74118..2c84356 100644 --- a/osm_lcm/lcm_utils.py +++ b/osm_lcm/lcm_utils.py @@ -40,6 +40,7 @@ class TaskRegistry: def __init__(self): self.task_registry = { "ns": {}, + "nsi": {}, "vim_account": {}, "sdn": {}, } @@ -47,7 +48,7 @@ class TaskRegistry: def register(self, topic, _id, op_id, task_name, task): """ Register a new task - :param topic: Can be "ns", "vim_account", "sdn" + :param topic: Can be "ns", "nsi", "vim_account", "sdn" :param _id: _id of the related item :param op_id: id of the operation of the related item :param task_name: Task descriptive name, as create, instantiate, terminate. Must be unique in this op_id @@ -65,7 +66,7 @@ class TaskRegistry: def remove(self, topic, _id, op_id, task_name=None): """ When task is ended, it should removed. It ignores missing tasks - :param topic: Can be "ns", "vim_account", "sdn" + :param topic: Can be "ns", "nsi", "vim_account", "sdn" :param _id: _id of the related item :param op_id: id of the operation of the related item :param task_name: Task descriptive name. If note it deletes all @@ -103,8 +104,8 @@ class TaskRegistry: def cancel(self, topic, _id, target_op_id=None, target_task_name=None): """ - Cancel all active tasks of a concrete ns, vim_account, sdn identified for _id. If op_id is supplied only this is - cancelled, and the same with task_name + Cancel all active tasks of a concrete ns, nsi, vim_account, sdn identified for _id. If op_id is supplied only + this is cancelled, and the same with task_name """ if not self.task_registry[topic].get(_id): return diff --git a/osm_lcm/netslice.py b/osm_lcm/netslice.py new file mode 100644 index 0000000..aecac77 --- /dev/null +++ b/osm_lcm/netslice.py @@ -0,0 +1,300 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- + +import asyncio +import logging +import logging.handlers +import traceback +import ns +from lcm_utils import LcmException, LcmBase +from osm_common.dbbase import DbException +from time import time + +__author__ = "Felipe Vicens, Pol Alemany, Alfonso Tierno" + + +class NetsliceLcm(LcmBase): + + def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop): + """ + Init, Connect to database, filesystem storage, and messaging + :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', + :return: None + """ + # logging + self.logger = logging.getLogger('lcm.netslice') + self.loop = loop + self.lcm_tasks = lcm_tasks + self.ns = ns.NsLcm(db, msg, fs, lcm_tasks, ro_config, vca_config, loop) + + super().__init__(db, msg, fs, self.logger) + + # TODO: check logging_text within the self.logger.info/debug + async def instantiate(self, nsir_id, nsilcmop_id): + logging_text = "Task netslice={} instantiate={} ".format(nsir_id, nsilcmop_id) + self.logger.debug(logging_text + "Enter") + # get all needed from database + exc = None + db_nsir = None + db_nsilcmop = None + db_nsir_update = {"_admin.nsilcmop": nsilcmop_id} + db_nsilcmop_update = {} + nsilcmop_operation_state = None + + try: + step = "Getting nsir={} from db".format(nsir_id) + db_nsir = self.db.get_one("nsis", {"_id": nsir_id}) + step = "Getting nsilcmop={} from db".format(nsilcmop_id) + db_nsilcmop = self.db.get_one("nsilcmops", {"_id": nsilcmop_id}) + + # look if previous tasks is in process + task_name, task_dependency = self.lcm_tasks.lookfor_related("nsi", nsir_id, nsilcmop_id) + if task_dependency: + step = db_nsilcmop_update["detailed-status"] = \ + "Waiting for related tasks to be completed: {}".format(task_name) + self.logger.debug(logging_text + step) + self.update_db_2("nsilcmops", nsilcmop_id, db_nsilcmop_update) + _, pending = await asyncio.wait(task_dependency, timeout=3600) + if pending: + raise LcmException("Timeout waiting related tasks to be completed") + + # Empty list to keep track of network service records status in the netslice + nsir_admin = db_nsir["_admin"] + + nsir_admin["nsrs-detailed-list"] = [] + + # Slice status Creating + db_nsir_update["detailed-status"] = "creating" + db_nsir_update["operational-status"] = "init" + self.update_db_2("nsis", nsir_id, db_nsir_update) + + # Iterate over the network services operation ids to instantiate NSs + # TODO: (future improvement) look another way check the tasks instead of keep asking + # -> https://docs.python.org/3/library/asyncio-task.html#waiting-primitives + # steps: declare ns_tasks, add task when terminate is called, await asyncio.wait(vca_task_list, timeout=300) + # ns_tasks = [] + nslcmop_ids = db_nsilcmop["operationParams"].get("nslcmops_ids") + for nslcmop_id in nslcmop_ids: + nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) + nsr_id = nslcmop.get("nsInstanceId") + step = "Launching ns={} instantiate={} task".format(nsr_id, nslcmop) + task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id)) + self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task) + + # Wait until Network Slice is ready + step = nsir_status_detailed = " Waiting nsi ready. nsi_id={}".format(nsir_id) + nsrs_detailed_list_old = None + self.logger.debug(logging_text + step) + + # TODO: substitute while for await (all task to be done or not) + deployment_timeout = 2 * 3600 # Two hours + while deployment_timeout > 0: + # Check ns instantiation status + nsi_ready = True + nsrs_detailed_list = [] + for nslcmop_item in nslcmop_ids: + nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_item}) + status = nslcmop.get("operationState") + # TODO: (future improvement) other possible status: ROLLING_BACK,ROLLED_BACK + nsrs_detailed_list.append({"nsrId": nslcmop["nsInstanceId"], "status": nslcmop["operationState"], + "detailed-status": + nsir_status_detailed + "; {}".format(nslcmop.get("detailed-status"))}) + if status not in ["COMPLETED", "PARTIALLY_COMPLETED", "FAILED", "FAILED_TEMP"]: + nsi_ready = False + + if nsrs_detailed_list != nsrs_detailed_list_old: + nsir_admin["nsrs-detailed-list"] = nsrs_detailed_list + nsrs_detailed_list_old = nsrs_detailed_list + db_nsir_update["_admin"] = nsir_admin + self.update_db_2("nsis", nsir_id, db_nsir_update) + + if nsi_ready: + step = "Network Slice Instance is ready. nsi_id={}".format(nsir_id) + for items in nsrs_detailed_list: + if "FAILED" in items.values(): + raise LcmException("Error deploying NSI: {}".format(nsir_id)) + break + + # TODO: future improvement due to synchronism -> await asyncio.wait(vca_task_list, timeout=300) + await asyncio.sleep(5, loop=self.loop) + deployment_timeout -= 5 + + if deployment_timeout <= 0: + raise LcmException("Timeout waiting nsi to be ready. nsi_id={}".format(nsir_id)) + + db_nsir_update["operational-status"] = "running" + db_nsir_update["detailed-status"] = "done" + db_nsir_update["config-status"] = "configured" + db_nsilcmop_update["operationState"] = nsilcmop_operation_state = "COMPLETED" + db_nsilcmop_update["statusEnteredTime"] = time() + db_nsilcmop_update["detailed-status"] = "done" + return + + except (LcmException, DbException) as e: + self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e)) + exc = e + except asyncio.CancelledError: + self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step)) + exc = "Operation was cancelled" + except Exception as e: + exc = traceback.format_exc() + self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e), + exc_info=True) + finally: + if exc: + if db_nsir: + db_nsir_update["detailed-status"] = "ERROR {}: {}".format(step, exc) + db_nsir_update["operational-status"] = "failed" + if db_nsilcmop: + db_nsilcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc) + db_nsilcmop_update["operationState"] = nsilcmop_operation_state = "FAILED" + db_nsilcmop_update["statusEnteredTime"] = time() + if db_nsir: + db_nsir_update["_admin.nsiState"] = "INSTANTIATED" + db_nsir_update["_admin.nsilcmop"] = None + self.update_db_2("nsis", nsir_id, db_nsir_update) + if db_nsilcmop: + + self.update_db_2("nsilcmops", nsilcmop_id, db_nsilcmop_update) + if nsilcmop_operation_state: + try: + await self.msg.aiowrite("nsi", "instantiated", {"nsir_id": nsir_id, "nsilcmop_id": nsilcmop_id, + "operationState": nsilcmop_operation_state}) + except Exception as e: + self.logger.error(logging_text + "kafka_write notification Exception {}".format(e)) + self.logger.debug(logging_text + "Exit") + self.lcm_tasks.remove("nsi", nsir_id, nsilcmop_id, "nsi_instantiate") + + async def terminate(self, nsir_id, nsilcmop_id): + logging_text = "Task nsi={} terminate={} ".format(nsir_id, nsilcmop_id) + self.logger.debug(logging_text + "Enter") + exc = None + db_nsir = None + db_nsilcmop = None + db_nsir_update = {"_admin.nsilcmop": nsilcmop_id} + db_nsilcmop_update = {} + nsilcmop_operation_state = None + try: + step = "Getting nsir={} from db".format(nsir_id) + db_nsir = self.db.get_one("nsis", {"_id": nsir_id}) + step = "Getting nsilcmop={} from db".format(nsilcmop_id) + db_nsilcmop = self.db.get_one("nsilcmops", {"_id": nsilcmop_id}) + + # TODO: Check if makes sense check the nsiState=NOT_INSTANTIATED when terminate + # CASE: Instance was terminated but there is a second request to terminate the instance + if db_nsir["_admin"]["nsiState"] == "NOT_INSTANTIATED": + return + + # Slice status Terminating + db_nsir_update["operational-status"] = "terminating" + db_nsir_update["config-status"] = "terminating" + self.update_db_2("nsis", nsir_id, db_nsir_update) + + # look if previous tasks is in process + task_name, task_dependency = self.lcm_tasks.lookfor_related("nsi", nsir_id, nsilcmop_id) + if task_dependency: + step = db_nsilcmop_update["detailed-status"] = \ + "Waiting for related tasks to be completed: {}".format(task_name) + self.logger.debug(logging_text + step) + self.update_db_2("nsilcmops", nsilcmop_id, db_nsilcmop_update) + _, pending = await asyncio.wait(task_dependency, timeout=3600) + if pending: + raise LcmException("Timeout waiting related tasks to be completed") + + # Gets the list to keep track of network service records status in the netslice + nsir_admin = db_nsir["_admin"] + nsrs_detailed_list = [] + + # Iterate over the network services operation ids to terminate NSs + # TODO: (future improvement) look another way check the tasks instead of keep asking + # -> https://docs.python.org/3/library/asyncio-task.html#waiting-primitives + # steps: declare ns_tasks, add task when terminate is called, await asyncio.wait(vca_task_list, timeout=300) + # ns_tasks = [] + nslcmop_ids = db_nsilcmop["operationParams"].get("nslcmops_ids") + for nslcmop_id in nslcmop_ids: + nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) + nsr_id = nslcmop["operationParams"].get("nsInstanceId") + task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id)) + self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task) + + # Wait until Network Slice is terminated + step = nsir_status_detailed = " Waiting nsi terminated. nsi_id={}".format(nsir_id) + nsrs_detailed_list_old = None + self.logger.debug(logging_text + step) + + termination_timeout = 2 * 3600 # Two hours + while termination_timeout > 0: + # Check ns termination status + nsi_ready = True + nsrs_detailed_list = [] + for nslcmop_item in nslcmop_ids: + nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_item}) + status = nslcmop["operationState"] + # TODO: (future improvement) other possible status: ROLLING_BACK,ROLLED_BACK + nsrs_detailed_list.append({"nsrId": nslcmop["nsInstanceId"], "status": nslcmop["operationState"], + "detailed-status": + nsir_status_detailed + "; {}".format(nslcmop.get("detailed-status"))}) + if status not in ["COMPLETED", "PARTIALLY_COMPLETED", "FAILED", "FAILED_TEMP"]: + nsi_ready = False + + if nsrs_detailed_list != nsrs_detailed_list_old: + nsir_admin["nsrs-detailed-list"] = nsrs_detailed_list + nsrs_detailed_list_old = nsrs_detailed_list + db_nsir_update["_admin"] = nsir_admin + self.update_db_2("nsis", nsir_id, db_nsir_update) + + if nsi_ready: + step = "Network Slice Instance is terminated. nsi_id={}".format(nsir_id) + for items in nsrs_detailed_list: + if "FAILED" in items.values(): + raise LcmException("Error terminating NSI: {}".format(nsir_id)) + break + + await asyncio.sleep(5, loop=self.loop) + termination_timeout -= 5 + + if termination_timeout <= 0: + raise LcmException("Timeout waiting nsi to be terminated. nsi_id={}".format(nsir_id)) + + db_nsir_update["operational-status"] = "terminated" + db_nsir_update["config-status"] = "configured" + db_nsir_update["detailed-status"] = "done" + db_nsilcmop_update["operationState"] = nsilcmop_operation_state = "COMPLETED" + db_nsilcmop_update["statusEnteredTime"] = time() + db_nsilcmop_update["detailed-status"] = "done" + return + + except (LcmException, DbException) as e: + self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e)) + exc = e + except asyncio.CancelledError: + self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step)) + exc = "Operation was cancelled" + except Exception as e: + exc = traceback.format_exc() + self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e), + exc_info=True) + finally: + if exc: + if db_nsir: + db_nsir_update["detailed-status"] = "ERROR {}: {}".format(step, exc) + db_nsir_update["operational-status"] = "failed" + if db_nsilcmop: + db_nsilcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc) + db_nsilcmop_update["operationState"] = nsilcmop_operation_state = "FAILED" + db_nsilcmop_update["statusEnteredTime"] = time() + if db_nsir: + db_nsir_update["_admin.nsilcmop"] = None + db_nsir_update["_admin.nsiState"] = "TERMINATED" + self.update_db_2("nsis", nsir_id, db_nsir_update) + if db_nsilcmop: + self.update_db_2("nsilcmops", nsilcmop_id, db_nsilcmop_update) + + if nsilcmop_operation_state: + try: + await self.msg.aiowrite("nsi", "terminated", {"nsir_id": nsir_id, "nsilcmop_id": nsilcmop_id, + "operationState": nsilcmop_operation_state}) + except Exception as e: + self.logger.error(logging_text + "kafka_write notification Exception {}".format(e)) + self.logger.debug(logging_text + "Exit") + self.lcm_tasks.remove("nsi", nsir_id, nsilcmop_id, "nsi_terminate") -- 2.17.1