Adding PaaS Service Creation
Adding AbstractPaasConnector and JujuPaasService Classes
Change-Id: I1678a8aa9d9fa453c5e21a340c29c35c82989594
Signed-off-by: Gulsum Atici <gulsum.atici@canonical.com>
diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py
index 5bbeade..5f34280 100644
--- a/osm_lcm/lcm.py
+++ b/osm_lcm/lcm.py
@@ -34,7 +34,14 @@
from osm_lcm.ROclient import ROClient, ROClientException
from time import time
-from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
+from osm_lcm.lcm_utils import (
+ get_paas_id_by_nsr_id,
+ get_paas_type_by_paas_id,
+ LcmException,
+ LcmExceptionExit,
+ TaskRegistry,
+ versiontuple,
+)
from osm_lcm import version as lcm_version, version_date as lcm_version_date
from osm_common import msglocal, msgkafka
@@ -45,6 +52,7 @@
from osm_lcm.data_utils.database.database import Database
from osm_lcm.data_utils.filesystem.filesystem import Filesystem
from osm_lcm.lcm_hc import get_health_check_file
+from osm_lcm.paas_service import paas_service_factory
from os import environ, path
from random import choice as random_choice
from n2vc import version as n2vc_version
@@ -78,7 +86,7 @@
def __init__(self, config_file, loop=None):
"""
Init, Connect to database, filesystem storage, and messaging
- :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
+ :param config_file: two level dictionary with configuration. Top level should contain 'database', 'storage',
:return: None
"""
self.db = None
@@ -123,8 +131,13 @@
self.vim
) = (
self.wim
- ) = self.sdn = self.k8scluster = self.vca = self.k8srepo = self.paas = None
-
+ ) = (
+ self.sdn
+ ) = (
+ self.k8scluster
+ ) = (
+ self.vca
+ ) = self.k8srepo = self.paas = self.paas_service = self.juju_paas = None
# logging
log_format_simple = (
"%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
@@ -334,6 +347,77 @@
else:
self.logger.error("Invalid command {} for PaaS topic".format(command))
+ def _kafka_read_ns_instantiate(self, params: dict) -> None:
+ """Operations to be performed if the topic is ns and command is instantiate.
+ Args:
+ params (dict): Dictionary including NS related parameters
+ """
+ nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
+ paas_id = params["operationParams"].get("paasAccountId")
+
+ if paas_id:
+ paas_type = get_paas_type_by_paas_id(paas_id, self.db)
+ task = asyncio.ensure_future(
+ self.paas_service[paas_type].instantiate(nsr_id, nslcmop_id)
+ )
+ self.logger.debug(
+ "Deploying NS {} using PaaS account {}".format(nsr_id, paas_id)
+ )
+
+ else:
+ task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
+ self.logger.debug("Deploying NS {}".format(nsr_id))
+
+ self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task)
+
+ def _kafka_read_ns_terminate(self, params: dict, topic: str) -> None:
+ """Operations to be performed if the topic is ns and command is terminate.
+ Args:
+ params (dict): Dictionary including NS related parameters
+ topic (str): Name of Kafka topic
+ """
+ nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
+ paas_id = get_paas_id_by_nsr_id(nsr_id, self.db)
+
+ if paas_id:
+ paas_type = get_paas_type_by_paas_id(paas_id, self.db)
+ task = asyncio.ensure_future(
+ self.paas_service[paas_type].terminate(nsr_id, nslcmop_id)
+ )
+ self.logger.debug(
+ "Terminating NS {} using PaaS account {}".format(nsr_id, paas_id)
+ )
+
+ else:
+ self.lcm_tasks.cancel(topic, nsr_id)
+ task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
+ self.logger.debug("Terminating NS {}".format(nsr_id))
+
+ self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
+
+ def _kafka_read_ns_action(self, params: dict) -> None:
+ """Operations to be performed if the topic is ns and command is action.
+ Args:
+ params (dict): Dictionary including NS related parameters
+ """
+ nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
+ paas_id = get_paas_id_by_nsr_id(nsr_id, self.db)
+
+ if paas_id:
+ paas_type = get_paas_type_by_paas_id(paas_id, self.db)
+ task = asyncio.ensure_future(
+ self.paas_service[paas_type].action(nsr_id, nslcmop_id)
+ )
+ self.logger.debug(
+ "Running action on NS {} using PaaS account {}".format(nsr_id, paas_id)
+ )
+
+ else:
+ task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
+ self.logger.debug("Running action on NS {}".format(nsr_id))
+
+ self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
+
def kafka_read_callback(self, topic, command, params):
order_id = 1
@@ -423,24 +507,13 @@
return
elif topic == "ns":
if command == "instantiate":
- # self.logger.debug("Deploying NS {}".format(nsr_id))
- nslcmop = params
- nslcmop_id = nslcmop["_id"]
- nsr_id = nslcmop["nsInstanceId"]
- task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
- self.lcm_tasks.register(
- "ns", nsr_id, nslcmop_id, "ns_instantiate", task
- )
+ self._kafka_read_ns_instantiate(params)
return
+
elif command == "terminate":
- # self.logger.debug("Deleting NS {}".format(nsr_id))
- nslcmop = params
- nslcmop_id = nslcmop["_id"]
- nsr_id = nslcmop["nsInstanceId"]
- self.lcm_tasks.cancel(topic, nsr_id)
- task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
- self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
+ self._kafka_read_ns_terminate(params, topic)
return
+
elif command == "vca_status_refresh":
nslcmop = params
nslcmop_id = nslcmop["_id"]
@@ -452,49 +525,36 @@
"ns", nsr_id, nslcmop_id, "ns_vca_status_refresh", task
)
return
+
elif command == "action":
- # self.logger.debug("Update NS {}".format(nsr_id))
- nslcmop = params
- nslcmop_id = nslcmop["_id"]
- nsr_id = nslcmop["nsInstanceId"]
- task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
- self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
+ self._kafka_read_ns_action(params)
return
+
elif command == "update":
# self.logger.debug("Update NS {}".format(nsr_id))
- nslcmop = params
- nslcmop_id = nslcmop["_id"]
- nsr_id = nslcmop["nsInstanceId"]
+ nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
task = asyncio.ensure_future(self.ns.update(nsr_id, nslcmop_id))
self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_update", task)
return
elif command == "scale":
# self.logger.debug("Update NS {}".format(nsr_id))
- nslcmop = params
- nslcmop_id = nslcmop["_id"]
- nsr_id = nslcmop["nsInstanceId"]
+ nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
return
elif command == "heal":
# self.logger.debug("Healing NS {}".format(nsr_id))
- nslcmop = params
- nslcmop_id = nslcmop["_id"]
- nsr_id = nslcmop["nsInstanceId"]
+ nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
task = asyncio.ensure_future(self.ns.heal(nsr_id, nslcmop_id))
self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_heal", task)
return
elif command == "migrate":
- nslcmop = params
- nslcmop_id = nslcmop["_id"]
- nsr_id = nslcmop["nsInstanceId"]
+ nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
task = asyncio.ensure_future(self.ns.migrate(nsr_id, nslcmop_id))
self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_migrate", task)
return
elif command == "verticalscale":
- nslcmop = params
- nslcmop_id = nslcmop["_id"]
- nsr_id = nslcmop["nsInstanceId"]
+ nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
task = asyncio.ensure_future(self.ns.vertical_scale(nsr_id, nslcmop_id))
self.logger.debug(
"nsr_id,nslcmop_id,task {},{},{}".format(nsr_id, nslcmop_id, task)
@@ -519,7 +579,7 @@
db_nsr["config-status"],
db_nsr["detailed-status"],
db_nsr["_admin"]["deployed"],
- self.lcm_ns_tasks.get(nsr_id),
+ self.lcm_tasks.task_registry["ns"][nsr_id],
)
)
except Exception as e:
@@ -755,6 +815,21 @@
self.msg, self.lcm_tasks, self.config, self.loop
)
+ # Specific PaaS Service Object for "Juju" PaaS Orchestrator type
+ self.juju_paas = paas_service_factory(
+ self.msg,
+ self.lcm_tasks,
+ self.db,
+ self.fs,
+ self.logger,
+ self.loop,
+ self.config,
+ "juju",
+ )
+ # Mapping between paas_type and PaaS service object
+ self.paas_service = {
+ "juju": self.juju_paas,
+ }
self.loop.run_until_complete(
asyncio.gather(self.kafka_read(), self.kafka_ping())
)