From e7e0b7585149d1d380532b905eb16507476ea2e9 Mon Sep 17 00:00:00 2001 From: Gulsum Atici Date: Fri, 30 Sep 2022 14:31:26 +0300 Subject: [PATCH] Adding PaaS Service Creation Adding AbstractPaasConnector and JujuPaasService Classes Change-Id: I1678a8aa9d9fa453c5e21a340c29c35c82989594 Signed-off-by: Gulsum Atici --- osm_lcm/lcm.py | 157 ++++-- osm_lcm/lcm_utils.py | 26 + osm_lcm/paas_conn.py | 213 +++++++ osm_lcm/paas_service.py | 847 ++++++++++++++++++++++++++++ osm_lcm/tests/test_lcm_helm_conn.py | 7 +- 5 files changed, 1207 insertions(+), 43 deletions(-) create mode 100644 osm_lcm/paas_conn.py create mode 100644 osm_lcm/paas_service.py diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index 5bbeade..5f34280 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -34,7 +34,14 @@ from osm_lcm.ng_ro import NgRoException, NgRoClient from osm_lcm.ROclient import ROClient, ROClientException from time import time -from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit +from osm_lcm.lcm_utils import ( + get_paas_id_by_nsr_id, + get_paas_type_by_paas_id, + LcmException, + LcmExceptionExit, + TaskRegistry, + versiontuple, +) from osm_lcm import version as lcm_version, version_date as lcm_version_date from osm_common import msglocal, msgkafka @@ -45,6 +52,7 @@ from osm_common.msgbase import MsgException from osm_lcm.data_utils.database.database import Database from osm_lcm.data_utils.filesystem.filesystem import Filesystem from osm_lcm.lcm_hc import get_health_check_file +from osm_lcm.paas_service import paas_service_factory from os import environ, path from random import choice as random_choice from n2vc import version as n2vc_version @@ -78,7 +86,7 @@ class Lcm: def __init__(self, config_file, loop=None): """ Init, Connect to database, filesystem storage, and messaging - :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', + :param config_file: two level dictionary with configuration. Top level should contain 'database', 'storage', :return: None """ self.db = None @@ -123,8 +131,13 @@ class Lcm: self.vim ) = ( self.wim - ) = self.sdn = self.k8scluster = self.vca = self.k8srepo = self.paas = None - + ) = ( + self.sdn + ) = ( + self.k8scluster + ) = ( + self.vca + ) = self.k8srepo = self.paas = self.paas_service = self.juju_paas = None # logging log_format_simple = ( "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s" @@ -334,6 +347,77 @@ class Lcm: else: self.logger.error("Invalid command {} for PaaS topic".format(command)) + def _kafka_read_ns_instantiate(self, params: dict) -> None: + """Operations to be performed if the topic is ns and command is instantiate. + Args: + params (dict): Dictionary including NS related parameters + """ + nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"] + paas_id = params["operationParams"].get("paasAccountId") + + if paas_id: + paas_type = get_paas_type_by_paas_id(paas_id, self.db) + task = asyncio.ensure_future( + self.paas_service[paas_type].instantiate(nsr_id, nslcmop_id) + ) + self.logger.debug( + "Deploying NS {} using PaaS account {}".format(nsr_id, paas_id) + ) + + else: + task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id)) + self.logger.debug("Deploying NS {}".format(nsr_id)) + + self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task) + + def _kafka_read_ns_terminate(self, params: dict, topic: str) -> None: + """Operations to be performed if the topic is ns and command is terminate. + Args: + params (dict): Dictionary including NS related parameters + topic (str): Name of Kafka topic + """ + nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"] + paas_id = get_paas_id_by_nsr_id(nsr_id, self.db) + + if paas_id: + paas_type = get_paas_type_by_paas_id(paas_id, self.db) + task = asyncio.ensure_future( + self.paas_service[paas_type].terminate(nsr_id, nslcmop_id) + ) + self.logger.debug( + "Terminating NS {} using PaaS account {}".format(nsr_id, paas_id) + ) + + else: + self.lcm_tasks.cancel(topic, nsr_id) + task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id)) + self.logger.debug("Terminating NS {}".format(nsr_id)) + + self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task) + + def _kafka_read_ns_action(self, params: dict) -> None: + """Operations to be performed if the topic is ns and command is action. + Args: + params (dict): Dictionary including NS related parameters + """ + nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"] + paas_id = get_paas_id_by_nsr_id(nsr_id, self.db) + + if paas_id: + paas_type = get_paas_type_by_paas_id(paas_id, self.db) + task = asyncio.ensure_future( + self.paas_service[paas_type].action(nsr_id, nslcmop_id) + ) + self.logger.debug( + "Running action on NS {} using PaaS account {}".format(nsr_id, paas_id) + ) + + else: + task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id)) + self.logger.debug("Running action on NS {}".format(nsr_id)) + + self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task) + def kafka_read_callback(self, topic, command, params): order_id = 1 @@ -423,24 +507,13 @@ class Lcm: return elif topic == "ns": if command == "instantiate": - # self.logger.debug("Deploying NS {}".format(nsr_id)) - nslcmop = params - nslcmop_id = nslcmop["_id"] - nsr_id = nslcmop["nsInstanceId"] - task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id)) - self.lcm_tasks.register( - "ns", nsr_id, nslcmop_id, "ns_instantiate", task - ) + self._kafka_read_ns_instantiate(params) return + elif command == "terminate": - # self.logger.debug("Deleting NS {}".format(nsr_id)) - nslcmop = params - nslcmop_id = nslcmop["_id"] - nsr_id = nslcmop["nsInstanceId"] - self.lcm_tasks.cancel(topic, nsr_id) - task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id)) - self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task) + self._kafka_read_ns_terminate(params, topic) return + elif command == "vca_status_refresh": nslcmop = params nslcmop_id = nslcmop["_id"] @@ -452,49 +525,36 @@ class Lcm: "ns", nsr_id, nslcmop_id, "ns_vca_status_refresh", task ) return + elif command == "action": - # self.logger.debug("Update NS {}".format(nsr_id)) - nslcmop = params - nslcmop_id = nslcmop["_id"] - nsr_id = nslcmop["nsInstanceId"] - task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id)) - self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task) + self._kafka_read_ns_action(params) return + elif command == "update": # self.logger.debug("Update NS {}".format(nsr_id)) - nslcmop = params - nslcmop_id = nslcmop["_id"] - nsr_id = nslcmop["nsInstanceId"] + nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"] task = asyncio.ensure_future(self.ns.update(nsr_id, nslcmop_id)) self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_update", task) return elif command == "scale": # self.logger.debug("Update NS {}".format(nsr_id)) - nslcmop = params - nslcmop_id = nslcmop["_id"] - nsr_id = nslcmop["nsInstanceId"] + nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"] task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id)) self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task) return elif command == "heal": # self.logger.debug("Healing NS {}".format(nsr_id)) - nslcmop = params - nslcmop_id = nslcmop["_id"] - nsr_id = nslcmop["nsInstanceId"] + nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"] task = asyncio.ensure_future(self.ns.heal(nsr_id, nslcmop_id)) self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_heal", task) return elif command == "migrate": - nslcmop = params - nslcmop_id = nslcmop["_id"] - nsr_id = nslcmop["nsInstanceId"] + nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"] task = asyncio.ensure_future(self.ns.migrate(nsr_id, nslcmop_id)) self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_migrate", task) return elif command == "verticalscale": - nslcmop = params - nslcmop_id = nslcmop["_id"] - nsr_id = nslcmop["nsInstanceId"] + nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"] task = asyncio.ensure_future(self.ns.vertical_scale(nsr_id, nslcmop_id)) self.logger.debug( "nsr_id,nslcmop_id,task {},{},{}".format(nsr_id, nslcmop_id, task) @@ -519,7 +579,7 @@ class Lcm: db_nsr["config-status"], db_nsr["detailed-status"], db_nsr["_admin"]["deployed"], - self.lcm_ns_tasks.get(nsr_id), + self.lcm_tasks.task_registry["ns"][nsr_id], ) ) except Exception as e: @@ -755,6 +815,21 @@ class Lcm: self.msg, self.lcm_tasks, self.config, self.loop ) + # Specific PaaS Service Object for "Juju" PaaS Orchestrator type + self.juju_paas = paas_service_factory( + self.msg, + self.lcm_tasks, + self.db, + self.fs, + self.logger, + self.loop, + self.config, + "juju", + ) + # Mapping between paas_type and PaaS service object + self.paas_service = { + "juju": self.juju_paas, + } self.loop.run_until_complete( asyncio.gather(self.kafka_read(), self.kafka_ping()) ) diff --git a/osm_lcm/lcm_utils.py b/osm_lcm/lcm_utils.py index 19852d0..5cd5a2f 100644 --- a/osm_lcm/lcm_utils.py +++ b/osm_lcm/lcm_utils.py @@ -88,6 +88,32 @@ def get_iterable(in_dict, in_key): return in_dict[in_key] +def get_paas_id_by_nsr_id(nsr_id: str, db: object) -> str: + """Get the PaaS account ID using NS record ID. + Args: + nsr_id (str): NS record ID + db (object): Database Object + + Returns: + paas_id (str) PaaS account ID + """ + db_nsr = db.get_one("nsrs", {"_id": nsr_id}) + return db_nsr.get("paasdatacenter") + + +def get_paas_type_by_paas_id(paas_id: str, db: object) -> str: + """Get the PaaS type using PaaS account ID. + Args: + paas_id (str): PaaS account ID + db (object): Database Object + + Returns: + paas_type (str) Paas Orchestrator type + """ + db_paas = db.get_one("paas", {"_id": paas_id}) + return db_paas["paas_type"] + + def check_juju_bundle_existence(vnfd: dict) -> str: """Checks the existence of juju-bundle in the descriptor diff --git a/osm_lcm/paas_conn.py b/osm_lcm/paas_conn.py new file mode 100644 index 0000000..78638b3 --- /dev/null +++ b/osm_lcm/paas_conn.py @@ -0,0 +1,213 @@ +#!/usr/bin/python3 +# +# Copyright 2022 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import asyncio +import logging + + +def paas_connector_factory( + uuid: str, + name: str, + db: object, + fs: object, + loop: object, + log: object, + config: dict, + paas_type="juju", +): + """Factory Method to create the paas_connector objects according to PaaS Type. + Args: + uuid (str): Internal id of PaaS account + name (str): name assigned to PaaS account, can be used for logging + db (object): Database object to write current operation status + fs (object): Filesystem object to use during operations + loop (object): Async event loop object + log (object): Logger for tracing + config (dict): Dictionary with extra PaaS information. + paas_type (str): Identifier to create paas_connector object using correct PaaS Connector Class + + Returns: + paas_connector (object): paas_connector objects created according to given PaaS Type + + Raises: + PaasConnException + """ + connectors = { + "juju": JujuPaasConnector, + } + if paas_type not in connectors.keys(): + raise PaasConnException(f"PaaS type: {paas_type} is not available.") + + return connectors[paas_type](uuid, name, db, fs, loop, log, config) + + +class PaasConnException(Exception): + """PaaS Connector Exception Base Class""" + + def __init__(self, message: str = ""): + """Constructor of PaaS Connector Exception + Args: + message (str): error message to be raised + """ + Exception.__init__(self, message) + self.message = message + + def __str__(self): + return self.message + + def __repr__(self): + return "{}({})".format(type(self), self.message) + + +class JujuPaasConnException(PaasConnException): + """Juju PaaS Connector Exception Class""" + + +class AbstractPaasConnector(abc.ABC): + """Abstract PaaS Connector class to perform operations using PaaS Orchestrator.""" + + def __init__( + self, + uuid=None, + name=None, + db=None, + fs=None, + logger=None, + loop=None, + config=None, + ): + """Constructor of PaaS Connector. + Args: + uuid (str): internal id of PaaS account + name (str): name assigned to this account, can be used for logging + db (object): database object to write current operation status + fs (object): Filesystem object to use during operations + logger (object): Logger for tracing + loop (object): Async event loop object + config (dict): Dictionary with extra PaaS information. + """ + self.id = uuid + self.name = name + self.db = db + self.fs = fs + self.config = config or {} + self.logger = logger + + @abc.abstractmethod + async def connect(self, endpoints: str, user: str = None, secret: str = None): + """Abstract method to connect PaaS account using endpoints, user and secret. + Args: + endpoints (str): Endpoint/URL to connect PaaS account + user (str): User which is used to connect PaaS account + secret (str): Used for authentication + """ + + @abc.abstractmethod + async def instantiate(self, nsr_id: str, nslcmop_id: str): + """Abstract method to perform PaaS Service instantiation. + Args: + nsr_id (str): NS service record to be used + nslcmop_id (str): NS LCM operation id + """ + + @abc.abstractmethod + async def terminate(self, nsr_id: str, nslcmop_id: str): + """Abstract method to perform PaaS Service termination. + Args: + nsr_id (str): NS service record to be used + nslcmop_id (str): NS LCM operation id + """ + + @abc.abstractmethod + async def action(self, nsr_id: str, nslcmop_id: str): + """Abstract method to perform action on PaaS Service. + Args: + nsr_id (str): NS service record to be used + nslcmop_id (str): NS LCM operation id + """ + + +class JujuPaasConnector(AbstractPaasConnector): + """Concrete PaaS Connector class to perform operations using the Juju PaaS Orchestrator.""" + + def __init__( + self, + uuid=None, + name=None, + db=None, + fs=None, + logger=None, + loop=None, + config=None, + ): + self.logger = logging.getLogger("lcm.juju_paas_connector") + super(JujuPaasConnector, self).__init__(logger=self.logger) + + async def connect(self, endpoints: str, user: str = None, secret: str = None): + """Connect Juju PaaS account using endpoints, user and secret. + Args: + endpoints (str): Endpoint/URL to connect PaaS account + user (str): User which is used to connect PaaS account + secret (str): Used for authentication + + Raises: + NotImplementedError + """ + raise NotImplementedError( + "Juju Paas Connector connect method is not implemented" + ) + + async def instantiate(self, nsr_id: str, nslcmop_id: str): + """Perform Service instantiation. + Args: + nsr_id (str): NS service record to be used + nslcmop_id (str): NS LCM operation id + + Raises: + JujuPaasConnException + """ + # This is not the real implementation + # Sample code blocks to validate method execution + await asyncio.sleep(1) + self.logger.debug("Juju Paas Connector instantiate method is called") + + async def terminate(self, nsr_id: str, nslcmop_id: str): + """Perform PaaS Service termination. + Args: + nsr_id (str): NS service record to be used + nslcmop_id (str): NS LCM operation id + + Raises: + JujuPaasConnException + """ + # This is not the real implementation + # Sample code blocks to validate method execution + await asyncio.sleep(1) + self.logger.debug("Juju Paas Connector terminate method is called") + + async def action(self, nsr_id: str, nslcmop_id: str): + """Perform action on PaaS Service. + Args: + nsr_id (str): NS service record to be used + nslcmop_id (str): NS LCM operation id + + Raises: + NotImplementedError + """ + raise NotImplementedError( + "Juju Paas Connector instantiate method is not implemented" + ) diff --git a/osm_lcm/paas_service.py b/osm_lcm/paas_service.py new file mode 100644 index 0000000..0a01157 --- /dev/null +++ b/osm_lcm/paas_service.py @@ -0,0 +1,847 @@ +# !/usr/bin/python3 +# +# Copyright 2022 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import logging +import traceback + +from osm_common.dbbase import DbException +from osm_common.msgbase import MsgException +from osm_lcm.lcm_utils import LcmBase +from osm_lcm.lcm_utils import LcmException +from osm_lcm.paas_conn import JujuPaasConnException, paas_connector_factory + +from time import time + + +def paas_service_factory( + msg: object, + lcm_tasks: object, + db: object, + fs: object, + log: object, + loop: object, + config: dict, + paas_type="juju", +) -> object: + """Factory Method to create the paas_service objects according to PaaS Type. + Args: + msg (object): Message object to be used to write the messages to Kafka Bus + lcm_tasks (object): Task object to register the tasks + db (object): Database object to write current operation status + fs (object): Filesystem object to use during operations + log (object) Logger for tracing + loop (object) Async event loop object + config (dict): Dictionary with extra PaaS Service information. + paas_type (str): Identifier to create paas_service object using correct PaaS Service Class + + Returns: + paas_service (object): paas_service objects created according to given PaaS Type + + Raises: + PaasServiceException + """ + orchestrators = { + "juju": JujuPaasService, + } + + if paas_type not in orchestrators.keys(): + raise PaasServiceException(f"PaaS type: {paas_type} is not available.") + + return orchestrators[paas_type]( + msg=msg, lcm_tasks=lcm_tasks, db=db, fs=fs, loop=loop, logger=log, config=config + ) + + +class PaasServiceException(Exception): + """PaaS Service Exception Base Class""" + + def __init__(self, message: str = ""): + """Constructor of PaaS Service Exception + Args: + message (str): error message to be raised + """ + Exception.__init__(self, message) + self.message = message + + def __str__(self): + return self.message + + def __repr__(self): + return "{}({})".format(type(self), self.message) + + +class JujuPaasServiceException(PaasServiceException): + """Juju PaaS Service exception class""" + + +class JujuPaasService(LcmBase): + """Juju PaaS Service class to handle ns operations such as instantiate, terminate, action etc.""" + + timeout_ns_deploy = 3600 + + def __init__( + self, + msg: object, + lcm_tasks: object, + db: object, + fs: object, + loop: object, + logger: object, + config: dict, + ): + """ + Args: + msg (object): Message object to be used to write the messages to Kafka Bus + lcm_tasks (object): Task object to register the tasks + db (object): Database object to write current operation status + fs (object): Filesystem object to use during operations + loop (object) Async event loop object + logger (object): Logger for tracing + config (dict): Dictionary with extra PaaS Service information. + """ + self.logger = logging.getLogger("lcm.juju_paas_service") + self.loop = loop + self.lcm_tasks = lcm_tasks + self.config = config + super(JujuPaasService, self).__init__(msg=msg, logger=self.logger) + + self.paas_connector = paas_connector_factory( + self.msg, + self.lcm_tasks, + self.db, + self.fs, + self.loop, + self.logger, + self.config, + "juju", + ) + + def _lock_ha_task(self, nslcmop_id: str, nsr_id: str, keyword: str) -> bool: + """Lock the task. + Args: + nslcmop_id (str): NS LCM operation id + nsr_id (str): NS service record to be used + keyword (str): Word which indicates action such as instantiate, terminate + + Returns: + task_is_locked_by_me (Boolean): True if task_is_locked_by_me else False + """ + task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id) + if not task_is_locked_by_me: + self.logger.debug( + f"{keyword}() task is not locked by me, ns={nsr_id}, exiting." + ) + return task_is_locked_by_me + + def _write_ns_status( + self, + nsr_id: str, + ns_state: str, + current_operation: str, + current_operation_id: str, + error_description: str = None, + error_detail: str = None, + other_update: dict = None, + ) -> None: + """Update NS record. + Args: + nsr_id (str): NS service record to be used + ns_state (str): NS state + current_operation (str): Current operation name + current_operation_id (str): Current operation ID + error_description (str): Error description + error_detail (str): Details of error + other_update: (dict): Other required changes at database if provided + + Raises: + DbException + """ + try: + db_dict = other_update or {} + db_update_dict = { + "_admin.nslcmop": current_operation_id, + "_admin.current-operation": current_operation_id, + "_admin.operation-type": current_operation + if current_operation != "IDLE" + else None, + "currentOperation": current_operation, + "currentOperationID": current_operation_id, + "errorDescription": error_description, + "errorDetail": error_detail, + } + db_dict.update(db_update_dict) + + if ns_state: + db_dict["nsState"] = ns_state + self.update_db_2("nsrs", nsr_id, db_dict) + + except DbException as e: + error = f"Error writing NS status, ns={nsr_id}: {e}" + self.logger.error(error) + raise JujuPaasServiceException(error) + + def _write_op_status( + self, + op_id: str, + stage: str = None, + error_message: str = None, + queue_position: int = 0, + operation_state: str = None, + other_update: dict = None, + ) -> None: + """Update NS LCM Operation Status. + Args: + op_id (str): Operation ID + stage (str): Indicates the stage of operations + error_message (str): Error description + queue_position (int): Operation position in the queue + operation_state (str): State of operation + other_update: (dict): Other required changes at database if provided + + Raises: + DbException + """ + try: + db_dict = other_update or {} + db_dict["queuePosition"] = queue_position + if stage: + db_dict["stage"] = str(stage) + if error_message: + db_dict["errorMessage"] = error_message + if operation_state: + db_dict["operationState"] = operation_state + db_dict["statusEnteredTime"] = time() + self.update_db_2("nslcmops", op_id, db_dict) + + except DbException as e: + error = f"Error writing OPERATION status for op_id: {op_id} -> {e}" + self.logger.error(error) + raise JujuPaasServiceException(error) + + def _update_nsr_error_desc( + self, + stage: str, + new_error: str, + error_list: list, + error_detail_list: list, + nsr_id: str, + ) -> None: + """Update error description in NS record. + Args: + stage (str): Indicates the stage of operations + new_error (str): New detected error + error_list (str): Updated error list + error_detail_list: Updated detailed error list + nsr_id (str): NS service record to be used + + Raises: + DbException + """ + if new_error: + stage += " Errors: " + ". ".join(error_detail_list) + "." + if nsr_id: + try: + # Update nsr + self.update_db_2( + "nsrs", + nsr_id, + { + "errorDescription": "Error at: " + ", ".join(error_list), + "errorDetail": ". ".join(error_detail_list), + }, + ) + + except DbException as e: + error = f"Error updating NSR error description for nsr_id: {nsr_id} -> {e}" + self.logger.error(error) + raise JujuPaasServiceException(error) + + def _check_tasks_in_done( + self, + completed_tasks_list: list, + created_tasks_info: dict, + error_list: list, + error_detail_list: list, + logging_text: str, + ) -> (str, str, str): + """Check the completed tasks to detect errors + Args: + completed_tasks_list (list): List of completed tasks + created_tasks_info: Dictionary which includes the tasks + error_list: List of errors + error_detail_list: List includes details of errors + logging_text: Main log message + + Returns: + new_error (str): New detected error + error_list (str): Updated error list + error_detail_list: Updated detailed error list + """ + new_error = "" + for task in completed_tasks_list: + if task.cancelled(): + exc = "Cancelled" + else: + exc = task.exception() + if exc: + if isinstance(exc, asyncio.TimeoutError): + exc = "Timeout" + new_error = created_tasks_info[task] + ": {}".format(exc) + error_list.append(created_tasks_info[task]) + error_detail_list.append(new_error) + if isinstance( + exc, + ( + str, + DbException, + LcmException, + JujuPaasConnException, + JujuPaasServiceException, + ), + ): + self.logger.error(logging_text + new_error) + else: + exc_traceback = "".join( + traceback.format_exception(None, exc, exc.__traceback__) + ) + self.logger.error( + logging_text + created_tasks_info[task] + " " + exc_traceback + ) + else: + self.logger.debug(logging_text + created_tasks_info[task] + ": Done") + + return new_error, error_list, error_detail_list + + async def _wait_for_tasks( + self, + logging_text: str, + created_tasks_info: dict, + timeout: int, + stage: str, + nslcmop_id: str, + nsr_id: str, + ) -> None: + """Wait for tasks to be completed. + Args: + logging_text (str): Log message + created_tasks_info (dict): Dictionary which includes the tasks + timeout (inst): Timeout in seconds + stage (str): Indicates the stage of operations + nslcmop_id (str): NS LCM Operation ID + nsr_id (str): NS service record to be used + """ + time_start = time() + error_detail_list, error_list = [], [] + pending_tasks = list(created_tasks_info.keys()) + num_tasks = len(pending_tasks) + num_done = 0 + + self._write_op_status(nslcmop_id, stage=f"{stage}: {num_done}/{num_tasks}") + + while pending_tasks: + _timeout = timeout + time_start - time() + done, pending_tasks = await asyncio.wait( + pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED + ) + num_done += len(done) + if not done: + # Timeout error + for task in pending_tasks: + new_error = created_tasks_info[task] + ": Timeout" + error_detail_list.append(new_error) + error_list.append(new_error) + break + # Find out the errors in completed tasks + new_error, error_list, error_detail_list = self._check_tasks_in_done( + completed_tasks_list=done, + created_tasks_info=created_tasks_info, + error_detail_list=error_detail_list, + error_list=error_list, + logging_text=logging_text, + ) + + self._update_nsr_error_desc( + stage=f"{stage}: {num_done}/{num_tasks}", + new_error=new_error, + error_list=error_list, + error_detail_list=error_detail_list, + nsr_id=nsr_id, + ) + + self._write_op_status(nslcmop_id, stage=f"{stage}: {num_done}/{num_tasks}") + + return error_detail_list + + def _prepare_db_before_operation( + self, + db_nsr_update: dict, + nsr_id: str, + nslcmop_id: str, + detailed: str = None, + operational: str = None, + ns_state: str = None, + current_op: str = None, + stage: str = None, + ) -> None: + """Update DB before performing NS operations + Args: + db_nsr_update (dict): NS record update dictionary + nsr_id (str): NS record ID + nslcmop_id (str): NS LCM Operation ID + detailed: (str): Detailed status + operational (str): Operational status + ns_state (str): NS state + current_op (str): Current operation name + stage (str): Indicates the stage of operations + """ + db_nsr_update["detailed-status"] = detailed + db_nsr_update["operational-status"] = operational + + self._write_ns_status( + nsr_id=nsr_id, + ns_state=ns_state, + current_operation=current_op, + current_operation_id=nslcmop_id, + other_update=db_nsr_update, + ) + self._write_op_status(op_id=nslcmop_id, stage=stage, queue_position=0) + + async def _report_to_kafka( + self, + nsr_id: str, + nslcmop_id: str, + nslcmop_operation_state: str, + logging_text: str, + message: str, + autoremove="False", + ) -> None: + """Report operation status to Kafka. + Args: + nsr_id (str): NS record ID + nslcmop_id (str): NS LCM Operation ID + nslcmop_operation_state (str): NS LCM Operation status + logging_text (str): Common log message + message (str): Message which is sent through Kafka + autoremove (Boolean): True/False If True NBI deletes NS from DB + + Raises: + PaasServiceException + """ + if nslcmop_operation_state: + update_dict = { + "nsr_id": nsr_id, + "nslcmop_id": nslcmop_id, + "operationState": nslcmop_operation_state, + } + if message == "terminated": + update_dict["autoremove"] = autoremove + try: + await self.msg.aiowrite( + "ns", + message, + update_dict, + loop=self.loop, + ) + except MsgException as e: + error = logging_text + f"kafka_write notification Exception: {e}" + self.logger.error(error) + raise PaasServiceException(error) + + def _update_ns_state(self, nsr_id: str, db_nsr_update: dict, ns_state: str) -> None: + """Update NS state in NSR and VNFRs + Args: + nsr_id (str): NS record ID + db_nsr_update (dict): NS record dictionary + ns_state (str): NS status + """ + db_nsr_update["_admin.nsState"] = ns_state + self.update_db_2("nsrs", nsr_id, db_nsr_update) + self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": ns_state}) + + def _update_db_ns_state_after_operation( + self, + error_list: list, + operation_type: str, + nslcmop_id: str, + db_nsr_update: dict, + db_nsr: dict, + nsr_id: str, + ) -> None: + """Update NS status at database after performing operations + Args: + error_list (list): List of errors + operation_type (str): Type of operation such as instantiate/terminate + nslcmop_id (str): NS LCM Operation ID + db_nsr_update (dict): NSR update dictionary + db_nsr (dict): NS record dictionary + nsr_id (str): NS record ID + """ + ns_state = "" + if error_list: + error_detail = ". ".join(error_list) + error_description_nsr = "Operation: {}.{}".format( + operation_type, nslcmop_id + ) + db_nsr_update["detailed-status"] = ( + error_description_nsr + " Detail: " + error_detail + ) + ns_state = "BROKEN" + + else: + error_detail = None + error_description_nsr = None + db_nsr_update["detailed-status"] = "Done" + if operation_type == "instantiate": + ns_state = "READY" + elif operation_type == "terminate": + ns_state = "NOT_INSTANTIATED" + db_nsr_update["operational-status"] = "terminated" + db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED" + + if db_nsr: + self._write_ns_status( + nsr_id=nsr_id, + ns_state=ns_state, + current_operation="IDLE", + current_operation_id=None, + error_description=error_description_nsr, + error_detail=error_detail, + other_update=db_nsr_update, + ) + + if ns_state == "NOT_INSTANTIATED": + self.db.set_list( + "vnfrs", + {"nsr-id-ref": nsr_id}, + {"_admin.nsState": "NOT_INSTANTIATED"}, + ) + + def _update_db_nslcmop_status_after_operation( + self, error_list: list, db_nslcmop_update: dict, nslcmop_id: str + ) -> str: + """Update NS LCM operation status at database after performing operation + Args + error_list (list): List of errors + db_nslcmop_update (dict): NS LCM operation update dictionary + nslcmop_id (str): NS LCM Operation ID + + Returns: + nslcmop_operation_state (str): State of NS LCM operation + """ + if error_list: + error_detail = ". ".join(error_list) + error_description_nslcmop = "Detail: {}".format(error_detail) + db_nslcmop_update["detailed-status"] = error_detail + nslcmop_operation_state = "FAILED" + + else: + error_description_nslcmop = None + db_nslcmop_update["detailed-status"] = "Done" + nslcmop_operation_state = "COMPLETED" + + self._write_op_status( + op_id=nslcmop_id, + stage=nslcmop_operation_state, + error_message=error_description_nslcmop, + operation_state=nslcmop_operation_state, + other_update=db_nslcmop_update, + ) + + return nslcmop_operation_state + + def _update_db_after_operation( + self, + nslcmop_id: str, + db_nsr: str, + nsr_id: str, + db_nslcmop_update: dict = None, + db_nsr_update: dict = None, + error_list: list = None, + operation_type: str = None, + ) -> str: + """Update database after operation is performed. + Args: + nslcmop_id (str): NS LCM Operation ID + db_nsr (dict): NS record dictionary + nsr_id (str): NS record ID + db_nslcmop_update (dict): NS LCM operation update dictionary + db_nsr_update (dict): NSR update dictionary + error_list (list): List of errors + operation_type (str): Type of operation such as instantiate/terminate + + Returns: + nslcmop_operation_state (str): State of NS LCM operation + """ + # Update NS state + self._update_db_ns_state_after_operation( + error_list=error_list, + operation_type=operation_type, + nslcmop_id=nslcmop_id, + db_nsr_update=db_nsr_update, + db_nsr=db_nsr, + nsr_id=nsr_id, + ) + + # Update NS LCM Operation State + nslcmop_operation_state = self._update_db_nslcmop_status_after_operation( + error_list, db_nslcmop_update, nslcmop_id + ) + return nslcmop_operation_state + + async def instantiate(self, nsr_id: str, nslcmop_id: str) -> None: + """Perform PaaS Service instantiation. + Args: + nsr_id (str): NS service record to be used + nslcmop_id (str): NS LCM operation id + """ + # Locking HA task + if not self._lock_ha_task(nslcmop_id, nsr_id, keyword="instantiate"): + return + + logging_text = f"Task ns={nsr_id} instantiate={nslcmop_id} " + self.logger.debug(logging_text + "Enter") + + # Required containers + db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) + db_nsr_update, db_nslcmop_update, tasks_dict_info = {}, {}, {} + exc = None + error_list = [] + + try: + # Wait for any previous tasks in process + await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id) + # Update nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id + self._prepare_db_before_operation( + db_nsr_update, + nsr_id, + nslcmop_id, + detailed="creating", + operational="init", + ns_state="BUILDING", + current_op="INSTANTIATING", + stage="Building", + ) + + # Perform PaaS Service Deployment using PaaS Connector + self.logger.debug(logging_text + "Creating instantiate task") + task_instantiate = asyncio.ensure_future( + self.paas_connector.instantiate(nsr_id, nslcmop_id) + ) + self.lcm_tasks.register( + "ns", + nsr_id, + nslcmop_id, + "instantiate_juju_paas_service", + task_instantiate, + ) + tasks_dict_info[task_instantiate] = "Instantiate juju PaaS Service" + + # Update nsState="INSTANTIATED" + self.logger.debug(logging_text + "INSTANTIATED") + self._update_ns_state(nsr_id, db_nsr_update, "INSTANTIATED") + + except ( + DbException, + LcmException, + JujuPaasConnException, + JujuPaasServiceException, + ) as e: + self.logger.error(logging_text + "Exit Exception: {}".format(e)) + exc = e + except asyncio.CancelledError: + self.logger.error(logging_text + "Cancelled Exception") + exc = "Operation was cancelled" + + finally: + if exc: + error_list.append(str(exc)) + try: + if tasks_dict_info: + # Wait for pending tasks + stage = "Waiting for instantiate pending tasks." + self.logger.debug(logging_text + stage) + error_list += await self._wait_for_tasks( + logging_text, + tasks_dict_info, + self.timeout_ns_deploy, + stage, + nslcmop_id, + nsr_id=nsr_id, + ) + except asyncio.CancelledError: + error_list.append("Cancelled") + except Exception as exc: + error_list.append(str(exc)) + + # Update operational-status + self.logger.debug("updating operational status") + db_nsr_update["operational-status"] = "running" + + # Update status at database after operation + self.logger.debug(logging_text + "Updating DB after operation") + nslcmop_operation_state = self._update_db_after_operation( + nslcmop_id, + db_nsr, + nsr_id, + db_nslcmop_update=db_nslcmop_update, + db_nsr_update=db_nsr_update, + error_list=error_list, + operation_type="instantiate", + ) + + # Write to Kafka bus to report the operation status + await self._report_to_kafka( + nsr_id, + nslcmop_id, + nslcmop_operation_state, + logging_text, + "instantiated", + ) + self.logger.debug(logging_text + "Exit") + + # Remove task + self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate") + + async def terminate(self, nsr_id: str, nslcmop_id: str) -> None: + """Perform PaaS Service termination. + Args: + nsr_id (str): NS service record to be used + nslcmop_id (str): NS LCM operation id + """ + # Locking HA task + if not self._lock_ha_task(nslcmop_id, nsr_id, keyword="terminate"): + return + + logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id) + self.logger.debug(logging_text + "Enter") + + # Update ns termination timeout + timeout_ns_terminate = self.timeout_ns_deploy + db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) + db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) + operation_params = db_nslcmop.get("operationParams") or {} + + if operation_params.get("timeout_ns_terminate"): + timeout_ns_terminate = operation_params["timeout_ns_terminate"] + + # Required containers + autoremove = False + db_nsr_update, db_nslcmop_update, tasks_dict_info = {}, {}, {} + exc = None + error_list = [] + + try: + # Wait for any previous tasks in process + await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id) + + # Update nsState="TERMINATING", currentOperation="TERMINATING", currentOperationID=nslcmop_id + self._prepare_db_before_operation( + db_nsr_update, + nsr_id, + nslcmop_id, + detailed="terminating", + operational="terminate", + ns_state="TERMINATING", + current_op="TERMINATING", + stage="terminating", + ) + + # Perform PaaS Service deletion using PaaS Connector + self.logger.debug(logging_text + "Creating terminate task") + task_terminate = asyncio.ensure_future( + self.paas_connector.terminate(nsr_id, nslcmop_id) + ) + self.lcm_tasks.register( + "ns", nsr_id, nslcmop_id, "terminate_juju_paas_service", task_terminate + ) + tasks_dict_info[task_terminate] = "Terminate juju PaaS Service" + + # Update nsState="TERMINATED" + self.logger.debug(logging_text + "TERMINATED") + self._update_ns_state(nsr_id, db_nsr_update, "TERMINATED") + + except ( + DbException, + LcmException, + JujuPaasConnException, + JujuPaasServiceException, + ) as e: + self.logger.error(logging_text + "Exit Exception: {}".format(e)) + exc = e + except asyncio.CancelledError: + self.logger.error(logging_text + "Cancelled Exception") + exc = "Operation was cancelled" + + finally: + if exc: + error_list.append(str(exc)) + try: + if tasks_dict_info: + # Wait for pending tasks + stage = "Waiting for pending tasks for termination." + self.logger.debug(logging_text + stage) + error_list += await self._wait_for_tasks( + logging_text, + tasks_dict_info, + min(self.timeout_ns_deploy, timeout_ns_terminate), + stage, + nslcmop_id, + nsr_id=nsr_id, + ) + except asyncio.CancelledError: + error_list.append("Cancelled") + except Exception as exc: + error_list.append(str(exc)) + + # Update status at database + nslcmop_operation_state = self._update_db_after_operation( + nslcmop_id, + db_nsr, + nsr_id, + db_nslcmop_update=db_nslcmop_update, + db_nsr_update=db_nsr_update, + error_list=error_list, + operation_type="terminate", + ) + + # Write to Kafka bus to report the operation status + if operation_params: + autoremove = operation_params.get("autoremove", False) + + await self._report_to_kafka( + nsr_id, + nslcmop_id, + nslcmop_operation_state, + logging_text, + "terminated", + autoremove=autoremove, + ) + self.logger.debug(logging_text + "Exit") + + # Remove task + self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate") + + async def action(self, nsr_id: str, nslcmop_id: str): + """Perform action on PaaS service. + Args: + nsr_id (str): NS service record to be used + nslcmop_id (str): NS LCM operation id + + Raises: + NotImplementedError + """ + raise NotImplementedError("Juju Paas Service action method is not implemented") diff --git a/osm_lcm/tests/test_lcm_helm_conn.py b/osm_lcm/tests/test_lcm_helm_conn.py index 7116517..3730162 100644 --- a/osm_lcm/tests/test_lcm_helm_conn.py +++ b/osm_lcm/tests/test_lcm_helm_conn.py @@ -79,8 +79,11 @@ class TestLcmHelmConn(asynctest.TestCase): self.db.get_one.return_value = {"_admin": {"helm-chart-v3": {"id": "myk8s_id"}}} ee_id, _ = await self.helm_conn.create_execution_environment( - namespace, db_dict, artifact_path=artifact_path, - chart_model=chart_model, vca_type="helm-v3" + namespace, + db_dict, + artifact_path=artifact_path, + chart_model=chart_model, + vca_type="helm-v3", ) self.assertEqual( ee_id, -- 2.25.1