Adding PaaS Service Creation

Adding AbstractPaasConnector and JujuPaasService Classes

Change-Id: I1678a8aa9d9fa453c5e21a340c29c35c82989594
Signed-off-by: Gulsum Atici <gulsum.atici@canonical.com>
diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py
index 5bbeade..5f34280 100644
--- a/osm_lcm/lcm.py
+++ b/osm_lcm/lcm.py
@@ -34,7 +34,14 @@
 from osm_lcm.ROclient import ROClient, ROClientException
 
 from time import time
-from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
+from osm_lcm.lcm_utils import (
+    get_paas_id_by_nsr_id,
+    get_paas_type_by_paas_id,
+    LcmException,
+    LcmExceptionExit,
+    TaskRegistry,
+    versiontuple,
+)
 from osm_lcm import version as lcm_version, version_date as lcm_version_date
 
 from osm_common import msglocal, msgkafka
@@ -45,6 +52,7 @@
 from osm_lcm.data_utils.database.database import Database
 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
 from osm_lcm.lcm_hc import get_health_check_file
+from osm_lcm.paas_service import paas_service_factory
 from os import environ, path
 from random import choice as random_choice
 from n2vc import version as n2vc_version
@@ -78,7 +86,7 @@
     def __init__(self, config_file, loop=None):
         """
         Init, Connect to database, filesystem storage, and messaging
-        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
+        :param config_file: two level dictionary with configuration. Top level should contain 'database', 'storage',
         :return: None
         """
         self.db = None
@@ -123,8 +131,13 @@
             self.vim
         ) = (
             self.wim
-        ) = self.sdn = self.k8scluster = self.vca = self.k8srepo = self.paas = None
-
+        ) = (
+            self.sdn
+        ) = (
+            self.k8scluster
+        ) = (
+            self.vca
+        ) = self.k8srepo = self.paas = self.paas_service = self.juju_paas = None
         # logging
         log_format_simple = (
             "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
@@ -334,6 +347,77 @@
         else:
             self.logger.error("Invalid command {} for PaaS topic".format(command))
 
+    def _kafka_read_ns_instantiate(self, params: dict) -> None:
+        """Operations to be performed if the topic is ns and command is instantiate.
+        Args:
+            params  (dict):     Dictionary including NS related parameters
+        """
+        nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
+        paas_id = params["operationParams"].get("paasAccountId")
+
+        if paas_id:
+            paas_type = get_paas_type_by_paas_id(paas_id, self.db)
+            task = asyncio.ensure_future(
+                self.paas_service[paas_type].instantiate(nsr_id, nslcmop_id)
+            )
+            self.logger.debug(
+                "Deploying NS {} using PaaS account {}".format(nsr_id, paas_id)
+            )
+
+        else:
+            task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
+            self.logger.debug("Deploying NS {}".format(nsr_id))
+
+        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task)
+
+    def _kafka_read_ns_terminate(self, params: dict, topic: str) -> None:
+        """Operations to be performed if the topic is ns and command is terminate.
+        Args:
+            params  (dict):     Dictionary including NS related parameters
+            topic   (str):      Name of Kafka topic
+        """
+        nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
+        paas_id = get_paas_id_by_nsr_id(nsr_id, self.db)
+
+        if paas_id:
+            paas_type = get_paas_type_by_paas_id(paas_id, self.db)
+            task = asyncio.ensure_future(
+                self.paas_service[paas_type].terminate(nsr_id, nslcmop_id)
+            )
+            self.logger.debug(
+                "Terminating NS {} using PaaS account {}".format(nsr_id, paas_id)
+            )
+
+        else:
+            self.lcm_tasks.cancel(topic, nsr_id)
+            task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
+            self.logger.debug("Terminating NS {}".format(nsr_id))
+
+        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
+
+    def _kafka_read_ns_action(self, params: dict) -> None:
+        """Operations to be performed if the topic is ns and command is action.
+        Args:
+            params  (dict):     Dictionary including NS related parameters
+        """
+        nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
+        paas_id = get_paas_id_by_nsr_id(nsr_id, self.db)
+
+        if paas_id:
+            paas_type = get_paas_type_by_paas_id(paas_id, self.db)
+            task = asyncio.ensure_future(
+                self.paas_service[paas_type].action(nsr_id, nslcmop_id)
+            )
+            self.logger.debug(
+                "Running action on NS {} using PaaS account {}".format(nsr_id, paas_id)
+            )
+
+        else:
+            task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
+            self.logger.debug("Running action on NS {}".format(nsr_id))
+
+        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
+
     def kafka_read_callback(self, topic, command, params):
         order_id = 1
 
@@ -423,24 +507,13 @@
                 return
         elif topic == "ns":
             if command == "instantiate":
-                # self.logger.debug("Deploying NS {}".format(nsr_id))
-                nslcmop = params
-                nslcmop_id = nslcmop["_id"]
-                nsr_id = nslcmop["nsInstanceId"]
-                task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
-                self.lcm_tasks.register(
-                    "ns", nsr_id, nslcmop_id, "ns_instantiate", task
-                )
+                self._kafka_read_ns_instantiate(params)
                 return
+
             elif command == "terminate":
-                # self.logger.debug("Deleting NS {}".format(nsr_id))
-                nslcmop = params
-                nslcmop_id = nslcmop["_id"]
-                nsr_id = nslcmop["nsInstanceId"]
-                self.lcm_tasks.cancel(topic, nsr_id)
-                task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
-                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
+                self._kafka_read_ns_terminate(params, topic)
                 return
+
             elif command == "vca_status_refresh":
                 nslcmop = params
                 nslcmop_id = nslcmop["_id"]
@@ -452,49 +525,36 @@
                     "ns", nsr_id, nslcmop_id, "ns_vca_status_refresh", task
                 )
                 return
+
             elif command == "action":
-                # self.logger.debug("Update NS {}".format(nsr_id))
-                nslcmop = params
-                nslcmop_id = nslcmop["_id"]
-                nsr_id = nslcmop["nsInstanceId"]
-                task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
-                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
+                self._kafka_read_ns_action(params)
                 return
+
             elif command == "update":
                 # self.logger.debug("Update NS {}".format(nsr_id))
-                nslcmop = params
-                nslcmop_id = nslcmop["_id"]
-                nsr_id = nslcmop["nsInstanceId"]
+                nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
                 task = asyncio.ensure_future(self.ns.update(nsr_id, nslcmop_id))
                 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_update", task)
                 return
             elif command == "scale":
                 # self.logger.debug("Update NS {}".format(nsr_id))
-                nslcmop = params
-                nslcmop_id = nslcmop["_id"]
-                nsr_id = nslcmop["nsInstanceId"]
+                nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
                 task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
                 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
                 return
             elif command == "heal":
                 # self.logger.debug("Healing NS {}".format(nsr_id))
-                nslcmop = params
-                nslcmop_id = nslcmop["_id"]
-                nsr_id = nslcmop["nsInstanceId"]
+                nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
                 task = asyncio.ensure_future(self.ns.heal(nsr_id, nslcmop_id))
                 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_heal", task)
                 return
             elif command == "migrate":
-                nslcmop = params
-                nslcmop_id = nslcmop["_id"]
-                nsr_id = nslcmop["nsInstanceId"]
+                nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
                 task = asyncio.ensure_future(self.ns.migrate(nsr_id, nslcmop_id))
                 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_migrate", task)
                 return
             elif command == "verticalscale":
-                nslcmop = params
-                nslcmop_id = nslcmop["_id"]
-                nsr_id = nslcmop["nsInstanceId"]
+                nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
                 task = asyncio.ensure_future(self.ns.vertical_scale(nsr_id, nslcmop_id))
                 self.logger.debug(
                     "nsr_id,nslcmop_id,task {},{},{}".format(nsr_id, nslcmop_id, task)
@@ -519,7 +579,7 @@
                             db_nsr["config-status"],
                             db_nsr["detailed-status"],
                             db_nsr["_admin"]["deployed"],
-                            self.lcm_ns_tasks.get(nsr_id),
+                            self.lcm_tasks.task_registry["ns"][nsr_id],
                         )
                     )
                 except Exception as e:
@@ -755,6 +815,21 @@
             self.msg, self.lcm_tasks, self.config, self.loop
         )
 
+        # Specific PaaS Service Object for "Juju" PaaS Orchestrator type
+        self.juju_paas = paas_service_factory(
+            self.msg,
+            self.lcm_tasks,
+            self.db,
+            self.fs,
+            self.logger,
+            self.loop,
+            self.config,
+            "juju",
+        )
+        # Mapping between paas_type and PaaS service object
+        self.paas_service = {
+            "juju": self.juju_paas,
+        }
         self.loop.run_until_complete(
             asyncio.gather(self.kafka_read(), self.kafka_ping())
         )