Adding PaaS Service Creation 16/12616/3
authorGulsum Atici <gulsum.atici@canonical.com>
Fri, 30 Sep 2022 11:31:26 +0000 (14:31 +0300)
committerGulsum Atici <gulsum.atici@canonical.com>
Wed, 26 Oct 2022 18:15:07 +0000 (21:15 +0300)
Adding AbstractPaasConnector and JujuPaasService Classes

Change-Id: I1678a8aa9d9fa453c5e21a340c29c35c82989594
Signed-off-by: Gulsum Atici <gulsum.atici@canonical.com>
osm_lcm/lcm.py
osm_lcm/lcm_utils.py
osm_lcm/paas_conn.py [new file with mode: 0644]
osm_lcm/paas_service.py [new file with mode: 0644]
osm_lcm/tests/test_lcm_helm_conn.py

index 5bbeade..5f34280 100644 (file)
@@ -34,7 +34,14 @@ from osm_lcm.ng_ro import NgRoException, NgRoClient
 from osm_lcm.ROclient import ROClient, ROClientException
 
 from time import time
-from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
+from osm_lcm.lcm_utils import (
+    get_paas_id_by_nsr_id,
+    get_paas_type_by_paas_id,
+    LcmException,
+    LcmExceptionExit,
+    TaskRegistry,
+    versiontuple,
+)
 from osm_lcm import version as lcm_version, version_date as lcm_version_date
 
 from osm_common import msglocal, msgkafka
@@ -45,6 +52,7 @@ from osm_common.msgbase import MsgException
 from osm_lcm.data_utils.database.database import Database
 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
 from osm_lcm.lcm_hc import get_health_check_file
+from osm_lcm.paas_service import paas_service_factory
 from os import environ, path
 from random import choice as random_choice
 from n2vc import version as n2vc_version
@@ -78,7 +86,7 @@ class Lcm:
     def __init__(self, config_file, loop=None):
         """
         Init, Connect to database, filesystem storage, and messaging
-        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
+        :param config_file: two level dictionary with configuration. Top level should contain 'database', 'storage',
         :return: None
         """
         self.db = None
@@ -123,8 +131,13 @@ class Lcm:
             self.vim
         ) = (
             self.wim
-        ) = self.sdn = self.k8scluster = self.vca = self.k8srepo = self.paas = None
-
+        ) = (
+            self.sdn
+        ) = (
+            self.k8scluster
+        ) = (
+            self.vca
+        ) = self.k8srepo = self.paas = self.paas_service = self.juju_paas = None
         # logging
         log_format_simple = (
             "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
@@ -334,6 +347,77 @@ class Lcm:
         else:
             self.logger.error("Invalid command {} for PaaS topic".format(command))
 
+    def _kafka_read_ns_instantiate(self, params: dict) -> None:
+        """Operations to be performed if the topic is ns and command is instantiate.
+        Args:
+            params  (dict):     Dictionary including NS related parameters
+        """
+        nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
+        paas_id = params["operationParams"].get("paasAccountId")
+
+        if paas_id:
+            paas_type = get_paas_type_by_paas_id(paas_id, self.db)
+            task = asyncio.ensure_future(
+                self.paas_service[paas_type].instantiate(nsr_id, nslcmop_id)
+            )
+            self.logger.debug(
+                "Deploying NS {} using PaaS account {}".format(nsr_id, paas_id)
+            )
+
+        else:
+            task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
+            self.logger.debug("Deploying NS {}".format(nsr_id))
+
+        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task)
+
+    def _kafka_read_ns_terminate(self, params: dict, topic: str) -> None:
+        """Operations to be performed if the topic is ns and command is terminate.
+        Args:
+            params  (dict):     Dictionary including NS related parameters
+            topic   (str):      Name of Kafka topic
+        """
+        nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
+        paas_id = get_paas_id_by_nsr_id(nsr_id, self.db)
+
+        if paas_id:
+            paas_type = get_paas_type_by_paas_id(paas_id, self.db)
+            task = asyncio.ensure_future(
+                self.paas_service[paas_type].terminate(nsr_id, nslcmop_id)
+            )
+            self.logger.debug(
+                "Terminating NS {} using PaaS account {}".format(nsr_id, paas_id)
+            )
+
+        else:
+            self.lcm_tasks.cancel(topic, nsr_id)
+            task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
+            self.logger.debug("Terminating NS {}".format(nsr_id))
+
+        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
+
+    def _kafka_read_ns_action(self, params: dict) -> None:
+        """Operations to be performed if the topic is ns and command is action.
+        Args:
+            params  (dict):     Dictionary including NS related parameters
+        """
+        nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
+        paas_id = get_paas_id_by_nsr_id(nsr_id, self.db)
+
+        if paas_id:
+            paas_type = get_paas_type_by_paas_id(paas_id, self.db)
+            task = asyncio.ensure_future(
+                self.paas_service[paas_type].action(nsr_id, nslcmop_id)
+            )
+            self.logger.debug(
+                "Running action on NS {} using PaaS account {}".format(nsr_id, paas_id)
+            )
+
+        else:
+            task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
+            self.logger.debug("Running action on NS {}".format(nsr_id))
+
+        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
+
     def kafka_read_callback(self, topic, command, params):
         order_id = 1
 
@@ -423,24 +507,13 @@ class Lcm:
                 return
         elif topic == "ns":
             if command == "instantiate":
-                # self.logger.debug("Deploying NS {}".format(nsr_id))
-                nslcmop = params
-                nslcmop_id = nslcmop["_id"]
-                nsr_id = nslcmop["nsInstanceId"]
-                task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
-                self.lcm_tasks.register(
-                    "ns", nsr_id, nslcmop_id, "ns_instantiate", task
-                )
+                self._kafka_read_ns_instantiate(params)
                 return
+
             elif command == "terminate":
-                # self.logger.debug("Deleting NS {}".format(nsr_id))
-                nslcmop = params
-                nslcmop_id = nslcmop["_id"]
-                nsr_id = nslcmop["nsInstanceId"]
-                self.lcm_tasks.cancel(topic, nsr_id)
-                task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
-                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
+                self._kafka_read_ns_terminate(params, topic)
                 return
+
             elif command == "vca_status_refresh":
                 nslcmop = params
                 nslcmop_id = nslcmop["_id"]
@@ -452,49 +525,36 @@ class Lcm:
                     "ns", nsr_id, nslcmop_id, "ns_vca_status_refresh", task
                 )
                 return
+
             elif command == "action":
-                # self.logger.debug("Update NS {}".format(nsr_id))
-                nslcmop = params
-                nslcmop_id = nslcmop["_id"]
-                nsr_id = nslcmop["nsInstanceId"]
-                task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
-                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
+                self._kafka_read_ns_action(params)
                 return
+
             elif command == "update":
                 # self.logger.debug("Update NS {}".format(nsr_id))
-                nslcmop = params
-                nslcmop_id = nslcmop["_id"]
-                nsr_id = nslcmop["nsInstanceId"]
+                nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
                 task = asyncio.ensure_future(self.ns.update(nsr_id, nslcmop_id))
                 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_update", task)
                 return
             elif command == "scale":
                 # self.logger.debug("Update NS {}".format(nsr_id))
-                nslcmop = params
-                nslcmop_id = nslcmop["_id"]
-                nsr_id = nslcmop["nsInstanceId"]
+                nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
                 task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
                 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
                 return
             elif command == "heal":
                 # self.logger.debug("Healing NS {}".format(nsr_id))
-                nslcmop = params
-                nslcmop_id = nslcmop["_id"]
-                nsr_id = nslcmop["nsInstanceId"]
+                nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
                 task = asyncio.ensure_future(self.ns.heal(nsr_id, nslcmop_id))
                 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_heal", task)
                 return
             elif command == "migrate":
-                nslcmop = params
-                nslcmop_id = nslcmop["_id"]
-                nsr_id = nslcmop["nsInstanceId"]
+                nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
                 task = asyncio.ensure_future(self.ns.migrate(nsr_id, nslcmop_id))
                 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_migrate", task)
                 return
             elif command == "verticalscale":
-                nslcmop = params
-                nslcmop_id = nslcmop["_id"]
-                nsr_id = nslcmop["nsInstanceId"]
+                nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
                 task = asyncio.ensure_future(self.ns.vertical_scale(nsr_id, nslcmop_id))
                 self.logger.debug(
                     "nsr_id,nslcmop_id,task {},{},{}".format(nsr_id, nslcmop_id, task)
@@ -519,7 +579,7 @@ class Lcm:
                             db_nsr["config-status"],
                             db_nsr["detailed-status"],
                             db_nsr["_admin"]["deployed"],
-                            self.lcm_ns_tasks.get(nsr_id),
+                            self.lcm_tasks.task_registry["ns"][nsr_id],
                         )
                     )
                 except Exception as e:
@@ -755,6 +815,21 @@ class Lcm:
             self.msg, self.lcm_tasks, self.config, self.loop
         )
 
+        # Specific PaaS Service Object for "Juju" PaaS Orchestrator type
+        self.juju_paas = paas_service_factory(
+            self.msg,
+            self.lcm_tasks,
+            self.db,
+            self.fs,
+            self.logger,
+            self.loop,
+            self.config,
+            "juju",
+        )
+        # Mapping between paas_type and PaaS service object
+        self.paas_service = {
+            "juju": self.juju_paas,
+        }
         self.loop.run_until_complete(
             asyncio.gather(self.kafka_read(), self.kafka_ping())
         )
index 19852d0..5cd5a2f 100644 (file)
@@ -88,6 +88,32 @@ def get_iterable(in_dict, in_key):
     return in_dict[in_key]
 
 
+def get_paas_id_by_nsr_id(nsr_id: str, db: object) -> str:
+    """Get the PaaS account ID using NS record ID.
+    Args:
+        nsr_id (str):       NS record ID
+        db  (object):       Database Object
+
+    Returns:
+        paas_id   (str)     PaaS account ID
+    """
+    db_nsr = db.get_one("nsrs", {"_id": nsr_id})
+    return db_nsr.get("paasdatacenter")
+
+
+def get_paas_type_by_paas_id(paas_id: str, db: object) -> str:
+    """Get the PaaS type using PaaS account ID.
+    Args:
+        paas_id (str):      PaaS account ID
+        db  (object):       Database Object
+
+    Returns:
+        paas_type   (str)   Paas Orchestrator type
+    """
+    db_paas = db.get_one("paas", {"_id": paas_id})
+    return db_paas["paas_type"]
+
+
 def check_juju_bundle_existence(vnfd: dict) -> str:
     """Checks the existence of juju-bundle in the descriptor
 
diff --git a/osm_lcm/paas_conn.py b/osm_lcm/paas_conn.py
new file mode 100644 (file)
index 0000000..78638b3
--- /dev/null
@@ -0,0 +1,213 @@
+#!/usr/bin/python3
+#
+# Copyright 2022 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+#     Unless required by applicable law or agreed to in writing, software
+#     distributed under the License is distributed on an "AS IS" BASIS,
+#     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#     See the License for the specific language governing permissions and
+#     limitations under the License.
+
+import abc
+import asyncio
+import logging
+
+
+def paas_connector_factory(
+    uuid: str,
+    name: str,
+    db: object,
+    fs: object,
+    loop: object,
+    log: object,
+    config: dict,
+    paas_type="juju",
+):
+    """Factory Method to create the paas_connector objects according to PaaS Type.
+    Args:
+        uuid    (str):              Internal id of PaaS account
+        name    (str):              name assigned to PaaS account, can be used for logging
+        db  (object):               Database object to write current operation status
+        fs  (object):               Filesystem object to use during operations
+        loop    (object):           Async event loop object
+        log (object):               Logger for tracing
+        config  (dict):             Dictionary with extra PaaS information.
+        paas_type   (str):          Identifier to create paas_connector object using correct PaaS Connector Class
+
+    Returns:
+        paas_connector  (object):   paas_connector objects created according to given PaaS Type
+
+    Raises:
+        PaasConnException
+    """
+    connectors = {
+        "juju": JujuPaasConnector,
+    }
+    if paas_type not in connectors.keys():
+        raise PaasConnException(f"PaaS type: {paas_type} is not available.")
+
+    return connectors[paas_type](uuid, name, db, fs, loop, log, config)
+
+
+class PaasConnException(Exception):
+    """PaaS Connector Exception Base Class"""
+
+    def __init__(self, message: str = ""):
+        """Constructor of PaaS Connector Exception
+        Args:
+            message (str):  error message to be raised
+        """
+        Exception.__init__(self, message)
+        self.message = message
+
+    def __str__(self):
+        return self.message
+
+    def __repr__(self):
+        return "{}({})".format(type(self), self.message)
+
+
+class JujuPaasConnException(PaasConnException):
+    """Juju PaaS Connector Exception Class"""
+
+
+class AbstractPaasConnector(abc.ABC):
+    """Abstract PaaS Connector class to perform operations using PaaS Orchestrator."""
+
+    def __init__(
+        self,
+        uuid=None,
+        name=None,
+        db=None,
+        fs=None,
+        logger=None,
+        loop=None,
+        config=None,
+    ):
+        """Constructor of PaaS Connector.
+        Args:
+            uuid    (str):      internal id of PaaS account
+            name    (str):      name assigned to this account, can be used for logging
+            db  (object):       database object to write current operation status
+            fs  (object):       Filesystem object to use during operations
+            logger (object):    Logger for tracing
+            loop    (object):   Async event loop object
+            config  (dict):     Dictionary with extra PaaS information.
+        """
+        self.id = uuid
+        self.name = name
+        self.db = db
+        self.fs = fs
+        self.config = config or {}
+        self.logger = logger
+
+    @abc.abstractmethod
+    async def connect(self, endpoints: str, user: str = None, secret: str = None):
+        """Abstract method to connect PaaS account using endpoints, user and secret.
+        Args:
+            endpoints   (str):     Endpoint/URL to connect PaaS account
+            user    (str):         User which is used to connect PaaS account
+            secret  (str):         Used for authentication
+        """
+
+    @abc.abstractmethod
+    async def instantiate(self, nsr_id: str, nslcmop_id: str):
+        """Abstract method to perform PaaS Service instantiation.
+        Args:
+            nsr_id   (str):     NS service record to be used
+            nslcmop_id  (str):  NS LCM operation id
+        """
+
+    @abc.abstractmethod
+    async def terminate(self, nsr_id: str, nslcmop_id: str):
+        """Abstract method to perform PaaS Service termination.
+        Args:
+            nsr_id   (str):     NS service record to be used
+            nslcmop_id  (str):  NS LCM operation id
+        """
+
+    @abc.abstractmethod
+    async def action(self, nsr_id: str, nslcmop_id: str):
+        """Abstract method to perform action on PaaS Service.
+        Args:
+            nsr_id   (str):     NS service record to be used
+            nslcmop_id  (str):  NS LCM operation id
+        """
+
+
+class JujuPaasConnector(AbstractPaasConnector):
+    """Concrete PaaS Connector class to perform operations using the Juju PaaS Orchestrator."""
+
+    def __init__(
+        self,
+        uuid=None,
+        name=None,
+        db=None,
+        fs=None,
+        logger=None,
+        loop=None,
+        config=None,
+    ):
+        self.logger = logging.getLogger("lcm.juju_paas_connector")
+        super(JujuPaasConnector, self).__init__(logger=self.logger)
+
+    async def connect(self, endpoints: str, user: str = None, secret: str = None):
+        """Connect Juju PaaS account using endpoints, user and secret.
+        Args:
+            endpoints   (str):     Endpoint/URL to connect PaaS account
+            user    (str):         User which is used to connect PaaS account
+            secret  (str):         Used for authentication
+
+        Raises:
+            NotImplementedError
+        """
+        raise NotImplementedError(
+            "Juju Paas Connector connect method is not implemented"
+        )
+
+    async def instantiate(self, nsr_id: str, nslcmop_id: str):
+        """Perform Service instantiation.
+        Args:
+            nsr_id   (str):     NS service record to be used
+            nslcmop_id  (str):  NS LCM operation id
+
+        Raises:
+            JujuPaasConnException
+        """
+        # This is not the real implementation
+        # Sample code blocks to validate method execution
+        await asyncio.sleep(1)
+        self.logger.debug("Juju Paas Connector instantiate method is called")
+
+    async def terminate(self, nsr_id: str, nslcmop_id: str):
+        """Perform PaaS Service termination.
+        Args:
+            nsr_id   (str):     NS service record to be used
+            nslcmop_id  (str):  NS LCM operation id
+
+        Raises:
+            JujuPaasConnException
+        """
+        # This is not the real implementation
+        # Sample code blocks to validate method execution
+        await asyncio.sleep(1)
+        self.logger.debug("Juju Paas Connector terminate method is called")
+
+    async def action(self, nsr_id: str, nslcmop_id: str):
+        """Perform action on PaaS Service.
+        Args:
+            nsr_id   (str):     NS service record to be used
+            nslcmop_id  (str):  NS LCM operation id
+
+        Raises:
+            NotImplementedError
+        """
+        raise NotImplementedError(
+            "Juju Paas Connector instantiate method is not implemented"
+        )
diff --git a/osm_lcm/paas_service.py b/osm_lcm/paas_service.py
new file mode 100644 (file)
index 0000000..0a01157
--- /dev/null
@@ -0,0 +1,847 @@
+# !/usr/bin/python3
+#
+# Copyright 2022 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+#     Unless required by applicable law or agreed to in writing, software
+#     distributed under the License is distributed on an "AS IS" BASIS,
+#     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#     See the License for the specific language governing permissions and
+#     limitations under the License.
+
+import asyncio
+import logging
+import traceback
+
+from osm_common.dbbase import DbException
+from osm_common.msgbase import MsgException
+from osm_lcm.lcm_utils import LcmBase
+from osm_lcm.lcm_utils import LcmException
+from osm_lcm.paas_conn import JujuPaasConnException, paas_connector_factory
+
+from time import time
+
+
+def paas_service_factory(
+    msg: object,
+    lcm_tasks: object,
+    db: object,
+    fs: object,
+    log: object,
+    loop: object,
+    config: dict,
+    paas_type="juju",
+) -> object:
+    """Factory Method to create the paas_service objects according to PaaS Type.
+    Args:
+        msg (object):           Message object to be used to write the messages to Kafka Bus
+        lcm_tasks   (object):   Task object to register the tasks
+        db  (object):           Database object to write current operation status
+        fs  (object):           Filesystem object to use during operations
+        log (object)            Logger for tracing
+        loop (object)           Async event loop object
+        config  (dict):         Dictionary with extra PaaS Service information.
+        paas_type   (str):      Identifier to create paas_service object using correct PaaS Service Class
+
+    Returns:
+        paas_service  (object):   paas_service objects created according to given PaaS Type
+
+    Raises:
+        PaasServiceException
+    """
+    orchestrators = {
+        "juju": JujuPaasService,
+    }
+
+    if paas_type not in orchestrators.keys():
+        raise PaasServiceException(f"PaaS type: {paas_type} is not available.")
+
+    return orchestrators[paas_type](
+        msg=msg, lcm_tasks=lcm_tasks, db=db, fs=fs, loop=loop, logger=log, config=config
+    )
+
+
+class PaasServiceException(Exception):
+    """PaaS Service Exception Base Class"""
+
+    def __init__(self, message: str = ""):
+        """Constructor of PaaS Service Exception
+        Args:
+            message (str):  error message to be raised
+        """
+        Exception.__init__(self, message)
+        self.message = message
+
+    def __str__(self):
+        return self.message
+
+    def __repr__(self):
+        return "{}({})".format(type(self), self.message)
+
+
+class JujuPaasServiceException(PaasServiceException):
+    """Juju PaaS Service exception class"""
+
+
+class JujuPaasService(LcmBase):
+    """Juju PaaS Service class to handle ns operations such as instantiate, terminate, action etc."""
+
+    timeout_ns_deploy = 3600
+
+    def __init__(
+        self,
+        msg: object,
+        lcm_tasks: object,
+        db: object,
+        fs: object,
+        loop: object,
+        logger: object,
+        config: dict,
+    ):
+        """
+        Args:
+            msg (object):           Message object to be used to write the messages to Kafka Bus
+            lcm_tasks   (object):   Task object to register the tasks
+            db  (object):           Database object to write current operation status
+            fs  (object):           Filesystem object to use during operations
+            loop (object)           Async event loop object
+            logger (object):        Logger for tracing
+            config  (dict):         Dictionary with extra PaaS Service information.
+        """
+        self.logger = logging.getLogger("lcm.juju_paas_service")
+        self.loop = loop
+        self.lcm_tasks = lcm_tasks
+        self.config = config
+        super(JujuPaasService, self).__init__(msg=msg, logger=self.logger)
+
+        self.paas_connector = paas_connector_factory(
+            self.msg,
+            self.lcm_tasks,
+            self.db,
+            self.fs,
+            self.loop,
+            self.logger,
+            self.config,
+            "juju",
+        )
+
+    def _lock_ha_task(self, nslcmop_id: str, nsr_id: str, keyword: str) -> bool:
+        """Lock the task.
+        Args:
+            nslcmop_id  (str):          NS LCM operation id
+            nsr_id   (str):             NS service record to be used
+            keyword (str):              Word which indicates action such as instantiate, terminate
+
+        Returns:
+            task_is_locked_by_me (Boolean): True if task_is_locked_by_me else False
+        """
+        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
+        if not task_is_locked_by_me:
+            self.logger.debug(
+                f"{keyword}() task is not locked by me, ns={nsr_id}, exiting."
+            )
+        return task_is_locked_by_me
+
+    def _write_ns_status(
+        self,
+        nsr_id: str,
+        ns_state: str,
+        current_operation: str,
+        current_operation_id: str,
+        error_description: str = None,
+        error_detail: str = None,
+        other_update: dict = None,
+    ) -> None:
+        """Update NS record.
+        Args:
+            nsr_id  (str):                      NS service record to be used
+            ns_state    (str):                  NS state
+            current_operation   (str):          Current operation name
+            current_operation_id    (str):      Current operation ID
+            error_description   (str):          Error description
+            error_detail        (str):          Details of error
+            other_update: (dict):               Other required changes at database if provided
+
+        Raises:
+            DbException
+        """
+        try:
+            db_dict = other_update or {}
+            db_update_dict = {
+                "_admin.nslcmop": current_operation_id,
+                "_admin.current-operation": current_operation_id,
+                "_admin.operation-type": current_operation
+                if current_operation != "IDLE"
+                else None,
+                "currentOperation": current_operation,
+                "currentOperationID": current_operation_id,
+                "errorDescription": error_description,
+                "errorDetail": error_detail,
+            }
+            db_dict.update(db_update_dict)
+
+            if ns_state:
+                db_dict["nsState"] = ns_state
+            self.update_db_2("nsrs", nsr_id, db_dict)
+
+        except DbException as e:
+            error = f"Error writing NS status, ns={nsr_id}: {e}"
+            self.logger.error(error)
+            raise JujuPaasServiceException(error)
+
+    def _write_op_status(
+        self,
+        op_id: str,
+        stage: str = None,
+        error_message: str = None,
+        queue_position: int = 0,
+        operation_state: str = None,
+        other_update: dict = None,
+    ) -> None:
+        """Update NS LCM Operation Status.
+        Args:
+            op_id  (str):                       Operation ID
+            stage    (str):                     Indicates the stage of operations
+            error_message   (str):              Error description
+            queue_position    (int):            Operation position in the queue
+            operation_state   (str):            State of operation
+            other_update: (dict):               Other required changes at database if provided
+
+        Raises:
+            DbException
+        """
+        try:
+            db_dict = other_update or {}
+            db_dict["queuePosition"] = queue_position
+            if stage:
+                db_dict["stage"] = str(stage)
+            if error_message:
+                db_dict["errorMessage"] = error_message
+            if operation_state:
+                db_dict["operationState"] = operation_state
+                db_dict["statusEnteredTime"] = time()
+            self.update_db_2("nslcmops", op_id, db_dict)
+
+        except DbException as e:
+            error = f"Error writing OPERATION status for op_id: {op_id} -> {e}"
+            self.logger.error(error)
+            raise JujuPaasServiceException(error)
+
+    def _update_nsr_error_desc(
+        self,
+        stage: str,
+        new_error: str,
+        error_list: list,
+        error_detail_list: list,
+        nsr_id: str,
+    ) -> None:
+        """Update error description in NS record.
+        Args:
+            stage   (str):          Indicates the stage of operations
+            new_error   (str):      New detected error
+            error_list  (str):      Updated error list
+            error_detail_list:      Updated detailed error list
+            nsr_id  (str):          NS service record to be used
+
+        Raises:
+            DbException
+        """
+        if new_error:
+            stage += " Errors: " + ". ".join(error_detail_list) + "."
+            if nsr_id:
+                try:
+                    # Update nsr
+                    self.update_db_2(
+                        "nsrs",
+                        nsr_id,
+                        {
+                            "errorDescription": "Error at: " + ", ".join(error_list),
+                            "errorDetail": ". ".join(error_detail_list),
+                        },
+                    )
+
+                except DbException as e:
+                    error = f"Error updating NSR error description for nsr_id: {nsr_id} -> {e}"
+                    self.logger.error(error)
+                    raise JujuPaasServiceException(error)
+
+    def _check_tasks_in_done(
+        self,
+        completed_tasks_list: list,
+        created_tasks_info: dict,
+        error_list: list,
+        error_detail_list: list,
+        logging_text: str,
+    ) -> (str, str, str):
+        """Check the completed tasks to detect errors
+        Args:
+            completed_tasks_list    (list):         List of completed tasks
+            created_tasks_info:     Dictionary which includes the tasks
+            error_list:             List of errors
+            error_detail_list:      List includes details of errors
+            logging_text:           Main log message
+
+        Returns:
+            new_error   (str):      New detected error
+            error_list  (str):      Updated error list
+            error_detail_list:      Updated detailed error list
+        """
+        new_error = ""
+        for task in completed_tasks_list:
+            if task.cancelled():
+                exc = "Cancelled"
+            else:
+                exc = task.exception()
+            if exc:
+                if isinstance(exc, asyncio.TimeoutError):
+                    exc = "Timeout"
+                new_error = created_tasks_info[task] + ": {}".format(exc)
+                error_list.append(created_tasks_info[task])
+                error_detail_list.append(new_error)
+                if isinstance(
+                    exc,
+                    (
+                        str,
+                        DbException,
+                        LcmException,
+                        JujuPaasConnException,
+                        JujuPaasServiceException,
+                    ),
+                ):
+                    self.logger.error(logging_text + new_error)
+                else:
+                    exc_traceback = "".join(
+                        traceback.format_exception(None, exc, exc.__traceback__)
+                    )
+                    self.logger.error(
+                        logging_text + created_tasks_info[task] + " " + exc_traceback
+                    )
+            else:
+                self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
+
+        return new_error, error_list, error_detail_list
+
+    async def _wait_for_tasks(
+        self,
+        logging_text: str,
+        created_tasks_info: dict,
+        timeout: int,
+        stage: str,
+        nslcmop_id: str,
+        nsr_id: str,
+    ) -> None:
+        """Wait for tasks to be completed.
+        Args:
+            logging_text  (str):                Log message
+            created_tasks_info    (dict):       Dictionary which includes the tasks
+            timeout   (inst):                   Timeout in seconds
+            stage   (str):                      Indicates the stage of operations
+            nslcmop_id   (str):                 NS LCM Operation ID
+            nsr_id        (str):                NS service record to be used
+        """
+        time_start = time()
+        error_detail_list, error_list = [], []
+        pending_tasks = list(created_tasks_info.keys())
+        num_tasks = len(pending_tasks)
+        num_done = 0
+
+        self._write_op_status(nslcmop_id, stage=f"{stage}: {num_done}/{num_tasks}")
+
+        while pending_tasks:
+            _timeout = timeout + time_start - time()
+            done, pending_tasks = await asyncio.wait(
+                pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
+            )
+            num_done += len(done)
+            if not done:
+                # Timeout error
+                for task in pending_tasks:
+                    new_error = created_tasks_info[task] + ": Timeout"
+                    error_detail_list.append(new_error)
+                    error_list.append(new_error)
+                break
+            # Find out the errors in completed tasks
+            new_error, error_list, error_detail_list = self._check_tasks_in_done(
+                completed_tasks_list=done,
+                created_tasks_info=created_tasks_info,
+                error_detail_list=error_detail_list,
+                error_list=error_list,
+                logging_text=logging_text,
+            )
+
+            self._update_nsr_error_desc(
+                stage=f"{stage}: {num_done}/{num_tasks}",
+                new_error=new_error,
+                error_list=error_list,
+                error_detail_list=error_detail_list,
+                nsr_id=nsr_id,
+            )
+
+            self._write_op_status(nslcmop_id, stage=f"{stage}: {num_done}/{num_tasks}")
+
+        return error_detail_list
+
+    def _prepare_db_before_operation(
+        self,
+        db_nsr_update: dict,
+        nsr_id: str,
+        nslcmop_id: str,
+        detailed: str = None,
+        operational: str = None,
+        ns_state: str = None,
+        current_op: str = None,
+        stage: str = None,
+    ) -> None:
+        """Update DB before performing NS operations
+        Args:
+            db_nsr_update   (dict):  NS record update dictionary
+            nsr_id  (str):          NS record ID
+            nslcmop_id  (str):      NS LCM Operation ID
+            detailed:   (str):      Detailed status
+            operational     (str):  Operational status
+            ns_state    (str):      NS state
+            current_op  (str):      Current operation name
+            stage   (str):          Indicates the stage of operations
+        """
+        db_nsr_update["detailed-status"] = detailed
+        db_nsr_update["operational-status"] = operational
+
+        self._write_ns_status(
+            nsr_id=nsr_id,
+            ns_state=ns_state,
+            current_operation=current_op,
+            current_operation_id=nslcmop_id,
+            other_update=db_nsr_update,
+        )
+        self._write_op_status(op_id=nslcmop_id, stage=stage, queue_position=0)
+
+    async def _report_to_kafka(
+        self,
+        nsr_id: str,
+        nslcmop_id: str,
+        nslcmop_operation_state: str,
+        logging_text: str,
+        message: str,
+        autoremove="False",
+    ) -> None:
+        """Report operation status to Kafka.
+        Args:
+            nsr_id  (str):                  NS record ID
+            nslcmop_id  (str):              NS LCM Operation ID
+            nslcmop_operation_state (str):  NS LCM Operation status
+            logging_text    (str):          Common log message
+            message (str):                  Message which is sent through Kafka
+            autoremove  (Boolean):          True/False If True NBI deletes NS from DB
+
+        Raises:
+            PaasServiceException
+        """
+        if nslcmop_operation_state:
+            update_dict = {
+                "nsr_id": nsr_id,
+                "nslcmop_id": nslcmop_id,
+                "operationState": nslcmop_operation_state,
+            }
+            if message == "terminated":
+                update_dict["autoremove"] = autoremove
+            try:
+                await self.msg.aiowrite(
+                    "ns",
+                    message,
+                    update_dict,
+                    loop=self.loop,
+                )
+            except MsgException as e:
+                error = logging_text + f"kafka_write notification Exception: {e}"
+                self.logger.error(error)
+                raise PaasServiceException(error)
+
+    def _update_ns_state(self, nsr_id: str, db_nsr_update: dict, ns_state: str) -> None:
+        """Update NS state in NSR and VNFRs
+        Args:
+            nsr_id  (str):              NS record ID
+            db_nsr_update   (dict):     NS record dictionary
+            ns_state    (str):          NS status
+        """
+        db_nsr_update["_admin.nsState"] = ns_state
+        self.update_db_2("nsrs", nsr_id, db_nsr_update)
+        self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": ns_state})
+
+    def _update_db_ns_state_after_operation(
+        self,
+        error_list: list,
+        operation_type: str,
+        nslcmop_id: str,
+        db_nsr_update: dict,
+        db_nsr: dict,
+        nsr_id: str,
+    ) -> None:
+        """Update NS status at database after performing operations
+        Args:
+            error_list  (list):             List of errors
+            operation_type  (str):          Type of operation such as instantiate/terminate
+            nslcmop_id  (str):              NS LCM Operation ID
+            db_nsr_update   (dict):         NSR update dictionary
+            db_nsr  (dict):                 NS record dictionary
+            nsr_id  (str):                  NS record ID
+        """
+        ns_state = ""
+        if error_list:
+            error_detail = ". ".join(error_list)
+            error_description_nsr = "Operation: {}.{}".format(
+                operation_type, nslcmop_id
+            )
+            db_nsr_update["detailed-status"] = (
+                error_description_nsr + " Detail: " + error_detail
+            )
+            ns_state = "BROKEN"
+
+        else:
+            error_detail = None
+            error_description_nsr = None
+            db_nsr_update["detailed-status"] = "Done"
+            if operation_type == "instantiate":
+                ns_state = "READY"
+            elif operation_type == "terminate":
+                ns_state = "NOT_INSTANTIATED"
+                db_nsr_update["operational-status"] = "terminated"
+                db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
+
+        if db_nsr:
+            self._write_ns_status(
+                nsr_id=nsr_id,
+                ns_state=ns_state,
+                current_operation="IDLE",
+                current_operation_id=None,
+                error_description=error_description_nsr,
+                error_detail=error_detail,
+                other_update=db_nsr_update,
+            )
+
+        if ns_state == "NOT_INSTANTIATED":
+            self.db.set_list(
+                "vnfrs",
+                {"nsr-id-ref": nsr_id},
+                {"_admin.nsState": "NOT_INSTANTIATED"},
+            )
+
+    def _update_db_nslcmop_status_after_operation(
+        self, error_list: list, db_nslcmop_update: dict, nslcmop_id: str
+    ) -> str:
+        """Update NS LCM operation status at database after performing operation
+        Args
+            error_list  (list):             List of errors
+            db_nslcmop_update   (dict):     NS LCM operation update dictionary
+            nslcmop_id  (str):              NS LCM Operation ID
+
+        Returns:
+            nslcmop_operation_state (str):  State of NS LCM operation
+        """
+        if error_list:
+            error_detail = ". ".join(error_list)
+            error_description_nslcmop = "Detail: {}".format(error_detail)
+            db_nslcmop_update["detailed-status"] = error_detail
+            nslcmop_operation_state = "FAILED"
+
+        else:
+            error_description_nslcmop = None
+            db_nslcmop_update["detailed-status"] = "Done"
+            nslcmop_operation_state = "COMPLETED"
+
+        self._write_op_status(
+            op_id=nslcmop_id,
+            stage=nslcmop_operation_state,
+            error_message=error_description_nslcmop,
+            operation_state=nslcmop_operation_state,
+            other_update=db_nslcmop_update,
+        )
+
+        return nslcmop_operation_state
+
+    def _update_db_after_operation(
+        self,
+        nslcmop_id: str,
+        db_nsr: str,
+        nsr_id: str,
+        db_nslcmop_update: dict = None,
+        db_nsr_update: dict = None,
+        error_list: list = None,
+        operation_type: str = None,
+    ) -> str:
+        """Update database after operation is performed.
+        Args:
+            nslcmop_id  (str):              NS LCM Operation ID
+            db_nsr  (dict):                 NS record dictionary
+            nsr_id  (str):                  NS record ID
+            db_nslcmop_update   (dict):     NS LCM operation update dictionary
+            db_nsr_update   (dict):         NSR update dictionary
+            error_list  (list):             List of errors
+            operation_type  (str):          Type of operation such as instantiate/terminate
+
+        Returns:
+            nslcmop_operation_state (str):  State of NS LCM operation
+        """
+        # Update NS state
+        self._update_db_ns_state_after_operation(
+            error_list=error_list,
+            operation_type=operation_type,
+            nslcmop_id=nslcmop_id,
+            db_nsr_update=db_nsr_update,
+            db_nsr=db_nsr,
+            nsr_id=nsr_id,
+        )
+
+        # Update NS LCM Operation State
+        nslcmop_operation_state = self._update_db_nslcmop_status_after_operation(
+            error_list, db_nslcmop_update, nslcmop_id
+        )
+        return nslcmop_operation_state
+
+    async def instantiate(self, nsr_id: str, nslcmop_id: str) -> None:
+        """Perform PaaS Service instantiation.
+        Args:
+            nsr_id   (str):     NS service record to be used
+            nslcmop_id  (str):  NS LCM operation id
+        """
+        # Locking HA task
+        if not self._lock_ha_task(nslcmop_id, nsr_id, keyword="instantiate"):
+            return
+
+        logging_text = f"Task ns={nsr_id} instantiate={nslcmop_id} "
+        self.logger.debug(logging_text + "Enter")
+
+        # Required containers
+        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+        db_nsr_update, db_nslcmop_update, tasks_dict_info = {}, {}, {}
+        exc = None
+        error_list = []
+
+        try:
+            # Wait for any previous tasks in process
+            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
+            # Update nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
+            self._prepare_db_before_operation(
+                db_nsr_update,
+                nsr_id,
+                nslcmop_id,
+                detailed="creating",
+                operational="init",
+                ns_state="BUILDING",
+                current_op="INSTANTIATING",
+                stage="Building",
+            )
+
+            # Perform PaaS Service Deployment using PaaS Connector
+            self.logger.debug(logging_text + "Creating instantiate task")
+            task_instantiate = asyncio.ensure_future(
+                self.paas_connector.instantiate(nsr_id, nslcmop_id)
+            )
+            self.lcm_tasks.register(
+                "ns",
+                nsr_id,
+                nslcmop_id,
+                "instantiate_juju_paas_service",
+                task_instantiate,
+            )
+            tasks_dict_info[task_instantiate] = "Instantiate juju PaaS Service"
+
+            # Update nsState="INSTANTIATED"
+            self.logger.debug(logging_text + "INSTANTIATED")
+            self._update_ns_state(nsr_id, db_nsr_update, "INSTANTIATED")
+
+        except (
+            DbException,
+            LcmException,
+            JujuPaasConnException,
+            JujuPaasServiceException,
+        ) as e:
+            self.logger.error(logging_text + "Exit Exception: {}".format(e))
+            exc = e
+        except asyncio.CancelledError:
+            self.logger.error(logging_text + "Cancelled Exception")
+            exc = "Operation was cancelled"
+
+        finally:
+            if exc:
+                error_list.append(str(exc))
+            try:
+                if tasks_dict_info:
+                    # Wait for pending tasks
+                    stage = "Waiting for instantiate pending tasks."
+                    self.logger.debug(logging_text + stage)
+                    error_list += await self._wait_for_tasks(
+                        logging_text,
+                        tasks_dict_info,
+                        self.timeout_ns_deploy,
+                        stage,
+                        nslcmop_id,
+                        nsr_id=nsr_id,
+                    )
+            except asyncio.CancelledError:
+                error_list.append("Cancelled")
+            except Exception as exc:
+                error_list.append(str(exc))
+
+            # Update operational-status
+            self.logger.debug("updating operational status")
+            db_nsr_update["operational-status"] = "running"
+
+            # Update status at database after operation
+            self.logger.debug(logging_text + "Updating DB after operation")
+            nslcmop_operation_state = self._update_db_after_operation(
+                nslcmop_id,
+                db_nsr,
+                nsr_id,
+                db_nslcmop_update=db_nslcmop_update,
+                db_nsr_update=db_nsr_update,
+                error_list=error_list,
+                operation_type="instantiate",
+            )
+
+            # Write to Kafka bus to report the operation status
+            await self._report_to_kafka(
+                nsr_id,
+                nslcmop_id,
+                nslcmop_operation_state,
+                logging_text,
+                "instantiated",
+            )
+            self.logger.debug(logging_text + "Exit")
+
+            # Remove task
+            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
+
+    async def terminate(self, nsr_id: str, nslcmop_id: str) -> None:
+        """Perform PaaS Service termination.
+        Args:
+            nsr_id   (str):     NS service record to be used
+            nslcmop_id  (str):  NS LCM operation id
+        """
+        # Locking HA task
+        if not self._lock_ha_task(nslcmop_id, nsr_id, keyword="terminate"):
+            return
+
+        logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
+        self.logger.debug(logging_text + "Enter")
+
+        # Update ns termination timeout
+        timeout_ns_terminate = self.timeout_ns_deploy
+        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+        operation_params = db_nslcmop.get("operationParams") or {}
+
+        if operation_params.get("timeout_ns_terminate"):
+            timeout_ns_terminate = operation_params["timeout_ns_terminate"]
+
+        # Required containers
+        autoremove = False
+        db_nsr_update, db_nslcmop_update, tasks_dict_info = {}, {}, {}
+        exc = None
+        error_list = []
+
+        try:
+            # Wait for any previous tasks in process
+            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
+
+            # Update nsState="TERMINATING", currentOperation="TERMINATING", currentOperationID=nslcmop_id
+            self._prepare_db_before_operation(
+                db_nsr_update,
+                nsr_id,
+                nslcmop_id,
+                detailed="terminating",
+                operational="terminate",
+                ns_state="TERMINATING",
+                current_op="TERMINATING",
+                stage="terminating",
+            )
+
+            # Perform PaaS Service deletion using PaaS Connector
+            self.logger.debug(logging_text + "Creating terminate task")
+            task_terminate = asyncio.ensure_future(
+                self.paas_connector.terminate(nsr_id, nslcmop_id)
+            )
+            self.lcm_tasks.register(
+                "ns", nsr_id, nslcmop_id, "terminate_juju_paas_service", task_terminate
+            )
+            tasks_dict_info[task_terminate] = "Terminate juju PaaS Service"
+
+            # Update nsState="TERMINATED"
+            self.logger.debug(logging_text + "TERMINATED")
+            self._update_ns_state(nsr_id, db_nsr_update, "TERMINATED")
+
+        except (
+            DbException,
+            LcmException,
+            JujuPaasConnException,
+            JujuPaasServiceException,
+        ) as e:
+            self.logger.error(logging_text + "Exit Exception: {}".format(e))
+            exc = e
+        except asyncio.CancelledError:
+            self.logger.error(logging_text + "Cancelled Exception")
+            exc = "Operation was cancelled"
+
+        finally:
+            if exc:
+                error_list.append(str(exc))
+            try:
+                if tasks_dict_info:
+                    # Wait for pending tasks
+                    stage = "Waiting for pending tasks for termination."
+                    self.logger.debug(logging_text + stage)
+                    error_list += await self._wait_for_tasks(
+                        logging_text,
+                        tasks_dict_info,
+                        min(self.timeout_ns_deploy, timeout_ns_terminate),
+                        stage,
+                        nslcmop_id,
+                        nsr_id=nsr_id,
+                    )
+            except asyncio.CancelledError:
+                error_list.append("Cancelled")
+            except Exception as exc:
+                error_list.append(str(exc))
+
+            # Update status at database
+            nslcmop_operation_state = self._update_db_after_operation(
+                nslcmop_id,
+                db_nsr,
+                nsr_id,
+                db_nslcmop_update=db_nslcmop_update,
+                db_nsr_update=db_nsr_update,
+                error_list=error_list,
+                operation_type="terminate",
+            )
+
+            # Write to Kafka bus to report the operation status
+            if operation_params:
+                autoremove = operation_params.get("autoremove", False)
+
+            await self._report_to_kafka(
+                nsr_id,
+                nslcmop_id,
+                nslcmop_operation_state,
+                logging_text,
+                "terminated",
+                autoremove=autoremove,
+            )
+            self.logger.debug(logging_text + "Exit")
+
+            # Remove task
+            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
+
+    async def action(self, nsr_id: str, nslcmop_id: str):
+        """Perform action on PaaS service.
+        Args:
+            nsr_id   (str):     NS service record to be used
+            nslcmop_id  (str):  NS LCM operation id
+
+        Raises:
+            NotImplementedError
+        """
+        raise NotImplementedError("Juju Paas Service action method is not implemented")
index 7116517..3730162 100644 (file)
@@ -79,8 +79,11 @@ class TestLcmHelmConn(asynctest.TestCase):
 
         self.db.get_one.return_value = {"_admin": {"helm-chart-v3": {"id": "myk8s_id"}}}
         ee_id, _ = await self.helm_conn.create_execution_environment(
-            namespace, db_dict, artifact_path=artifact_path,
-            chart_model=chart_model, vca_type="helm-v3"
+            namespace,
+            db_dict,
+            artifact_path=artifact_path,
+            chart_model=chart_model,
+            vca_type="helm-v3",
         )
         self.assertEqual(
             ee_id,