from osm_lcm.ROclient import ROClient, ROClientException
from time import time
-from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit
+from osm_lcm.lcm_utils import (
+ get_paas_id_by_nsr_id,
+ get_paas_type_by_paas_id,
+ LcmException,
+ LcmExceptionExit,
+ TaskRegistry,
+ versiontuple,
+)
from osm_lcm import version as lcm_version, version_date as lcm_version_date
from osm_common import msglocal, msgkafka
from osm_lcm.data_utils.database.database import Database
from osm_lcm.data_utils.filesystem.filesystem import Filesystem
from osm_lcm.lcm_hc import get_health_check_file
+from osm_lcm.paas_service import paas_service_factory
from os import environ, path
from random import choice as random_choice
from n2vc import version as n2vc_version
def __init__(self, config_file, loop=None):
"""
Init, Connect to database, filesystem storage, and messaging
- :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
+ :param config_file: two level dictionary with configuration. Top level should contain 'database', 'storage',
:return: None
"""
self.db = None
self.vim
) = (
self.wim
- ) = self.sdn = self.k8scluster = self.vca = self.k8srepo = self.paas = None
-
+ ) = (
+ self.sdn
+ ) = (
+ self.k8scluster
+ ) = (
+ self.vca
+ ) = self.k8srepo = self.paas = self.paas_service = self.juju_paas = None
# logging
log_format_simple = (
"%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
else:
self.logger.error("Invalid command {} for PaaS topic".format(command))
+ def _kafka_read_ns_instantiate(self, params: dict) -> None:
+ """Operations to be performed if the topic is ns and command is instantiate.
+ Args:
+ params (dict): Dictionary including NS related parameters
+ """
+ nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
+ paas_id = params["operationParams"].get("paasAccountId")
+
+ if paas_id:
+ paas_type = get_paas_type_by_paas_id(paas_id, self.db)
+ task = asyncio.ensure_future(
+ self.paas_service[paas_type].instantiate(nsr_id, nslcmop_id)
+ )
+ self.logger.debug(
+ "Deploying NS {} using PaaS account {}".format(nsr_id, paas_id)
+ )
+
+ else:
+ task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
+ self.logger.debug("Deploying NS {}".format(nsr_id))
+
+ self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task)
+
+ def _kafka_read_ns_terminate(self, params: dict, topic: str) -> None:
+ """Operations to be performed if the topic is ns and command is terminate.
+ Args:
+ params (dict): Dictionary including NS related parameters
+ topic (str): Name of Kafka topic
+ """
+ nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
+ paas_id = get_paas_id_by_nsr_id(nsr_id, self.db)
+
+ if paas_id:
+ paas_type = get_paas_type_by_paas_id(paas_id, self.db)
+ task = asyncio.ensure_future(
+ self.paas_service[paas_type].terminate(nsr_id, nslcmop_id)
+ )
+ self.logger.debug(
+ "Terminating NS {} using PaaS account {}".format(nsr_id, paas_id)
+ )
+
+ else:
+ self.lcm_tasks.cancel(topic, nsr_id)
+ task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
+ self.logger.debug("Terminating NS {}".format(nsr_id))
+
+ self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
+
+ def _kafka_read_ns_action(self, params: dict) -> None:
+ """Operations to be performed if the topic is ns and command is action.
+ Args:
+ params (dict): Dictionary including NS related parameters
+ """
+ nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
+ paas_id = get_paas_id_by_nsr_id(nsr_id, self.db)
+
+ if paas_id:
+ paas_type = get_paas_type_by_paas_id(paas_id, self.db)
+ task = asyncio.ensure_future(
+ self.paas_service[paas_type].action(nsr_id, nslcmop_id)
+ )
+ self.logger.debug(
+ "Running action on NS {} using PaaS account {}".format(nsr_id, paas_id)
+ )
+
+ else:
+ task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
+ self.logger.debug("Running action on NS {}".format(nsr_id))
+
+ self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
+
def kafka_read_callback(self, topic, command, params):
order_id = 1
return
elif topic == "ns":
if command == "instantiate":
- # self.logger.debug("Deploying NS {}".format(nsr_id))
- nslcmop = params
- nslcmop_id = nslcmop["_id"]
- nsr_id = nslcmop["nsInstanceId"]
- task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
- self.lcm_tasks.register(
- "ns", nsr_id, nslcmop_id, "ns_instantiate", task
- )
+ self._kafka_read_ns_instantiate(params)
return
+
elif command == "terminate":
- # self.logger.debug("Deleting NS {}".format(nsr_id))
- nslcmop = params
- nslcmop_id = nslcmop["_id"]
- nsr_id = nslcmop["nsInstanceId"]
- self.lcm_tasks.cancel(topic, nsr_id)
- task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
- self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)
+ self._kafka_read_ns_terminate(params, topic)
return
+
elif command == "vca_status_refresh":
nslcmop = params
nslcmop_id = nslcmop["_id"]
"ns", nsr_id, nslcmop_id, "ns_vca_status_refresh", task
)
return
+
elif command == "action":
- # self.logger.debug("Update NS {}".format(nsr_id))
- nslcmop = params
- nslcmop_id = nslcmop["_id"]
- nsr_id = nslcmop["nsInstanceId"]
- task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id))
- self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task)
+ self._kafka_read_ns_action(params)
return
+
elif command == "update":
# self.logger.debug("Update NS {}".format(nsr_id))
- nslcmop = params
- nslcmop_id = nslcmop["_id"]
- nsr_id = nslcmop["nsInstanceId"]
+ nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
task = asyncio.ensure_future(self.ns.update(nsr_id, nslcmop_id))
self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_update", task)
return
elif command == "scale":
# self.logger.debug("Update NS {}".format(nsr_id))
- nslcmop = params
- nslcmop_id = nslcmop["_id"]
- nsr_id = nslcmop["nsInstanceId"]
+ nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
return
elif command == "heal":
# self.logger.debug("Healing NS {}".format(nsr_id))
- nslcmop = params
- nslcmop_id = nslcmop["_id"]
- nsr_id = nslcmop["nsInstanceId"]
+ nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
task = asyncio.ensure_future(self.ns.heal(nsr_id, nslcmop_id))
self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_heal", task)
return
elif command == "migrate":
- nslcmop = params
- nslcmop_id = nslcmop["_id"]
- nsr_id = nslcmop["nsInstanceId"]
+ nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
task = asyncio.ensure_future(self.ns.migrate(nsr_id, nslcmop_id))
self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_migrate", task)
return
elif command == "verticalscale":
- nslcmop = params
- nslcmop_id = nslcmop["_id"]
- nsr_id = nslcmop["nsInstanceId"]
+ nsr_id, nslcmop_id = params["nsInstanceId"], params["_id"]
task = asyncio.ensure_future(self.ns.vertical_scale(nsr_id, nslcmop_id))
self.logger.debug(
"nsr_id,nslcmop_id,task {},{},{}".format(nsr_id, nslcmop_id, task)
db_nsr["config-status"],
db_nsr["detailed-status"],
db_nsr["_admin"]["deployed"],
- self.lcm_ns_tasks.get(nsr_id),
+ self.lcm_tasks.task_registry["ns"][nsr_id],
)
)
except Exception as e:
self.msg, self.lcm_tasks, self.config, self.loop
)
+ # Specific PaaS Service Object for "Juju" PaaS Orchestrator type
+ self.juju_paas = paas_service_factory(
+ self.msg,
+ self.lcm_tasks,
+ self.db,
+ self.fs,
+ self.logger,
+ self.loop,
+ self.config,
+ "juju",
+ )
+ # Mapping between paas_type and PaaS service object
+ self.paas_service = {
+ "juju": self.juju_paas,
+ }
self.loop.run_until_complete(
asyncio.gather(self.kafka_read(), self.kafka_ping())
)
--- /dev/null
+#!/usr/bin/python3
+#
+# Copyright 2022 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import abc
+import asyncio
+import logging
+
+
+def paas_connector_factory(
+ uuid: str,
+ name: str,
+ db: object,
+ fs: object,
+ loop: object,
+ log: object,
+ config: dict,
+ paas_type="juju",
+):
+ """Factory Method to create the paas_connector objects according to PaaS Type.
+ Args:
+ uuid (str): Internal id of PaaS account
+ name (str): name assigned to PaaS account, can be used for logging
+ db (object): Database object to write current operation status
+ fs (object): Filesystem object to use during operations
+ loop (object): Async event loop object
+ log (object): Logger for tracing
+ config (dict): Dictionary with extra PaaS information.
+ paas_type (str): Identifier to create paas_connector object using correct PaaS Connector Class
+
+ Returns:
+ paas_connector (object): paas_connector objects created according to given PaaS Type
+
+ Raises:
+ PaasConnException
+ """
+ connectors = {
+ "juju": JujuPaasConnector,
+ }
+ if paas_type not in connectors.keys():
+ raise PaasConnException(f"PaaS type: {paas_type} is not available.")
+
+ return connectors[paas_type](uuid, name, db, fs, loop, log, config)
+
+
+class PaasConnException(Exception):
+ """PaaS Connector Exception Base Class"""
+
+ def __init__(self, message: str = ""):
+ """Constructor of PaaS Connector Exception
+ Args:
+ message (str): error message to be raised
+ """
+ Exception.__init__(self, message)
+ self.message = message
+
+ def __str__(self):
+ return self.message
+
+ def __repr__(self):
+ return "{}({})".format(type(self), self.message)
+
+
+class JujuPaasConnException(PaasConnException):
+ """Juju PaaS Connector Exception Class"""
+
+
+class AbstractPaasConnector(abc.ABC):
+ """Abstract PaaS Connector class to perform operations using PaaS Orchestrator."""
+
+ def __init__(
+ self,
+ uuid=None,
+ name=None,
+ db=None,
+ fs=None,
+ logger=None,
+ loop=None,
+ config=None,
+ ):
+ """Constructor of PaaS Connector.
+ Args:
+ uuid (str): internal id of PaaS account
+ name (str): name assigned to this account, can be used for logging
+ db (object): database object to write current operation status
+ fs (object): Filesystem object to use during operations
+ logger (object): Logger for tracing
+ loop (object): Async event loop object
+ config (dict): Dictionary with extra PaaS information.
+ """
+ self.id = uuid
+ self.name = name
+ self.db = db
+ self.fs = fs
+ self.config = config or {}
+ self.logger = logger
+
+ @abc.abstractmethod
+ async def connect(self, endpoints: str, user: str = None, secret: str = None):
+ """Abstract method to connect PaaS account using endpoints, user and secret.
+ Args:
+ endpoints (str): Endpoint/URL to connect PaaS account
+ user (str): User which is used to connect PaaS account
+ secret (str): Used for authentication
+ """
+
+ @abc.abstractmethod
+ async def instantiate(self, nsr_id: str, nslcmop_id: str):
+ """Abstract method to perform PaaS Service instantiation.
+ Args:
+ nsr_id (str): NS service record to be used
+ nslcmop_id (str): NS LCM operation id
+ """
+
+ @abc.abstractmethod
+ async def terminate(self, nsr_id: str, nslcmop_id: str):
+ """Abstract method to perform PaaS Service termination.
+ Args:
+ nsr_id (str): NS service record to be used
+ nslcmop_id (str): NS LCM operation id
+ """
+
+ @abc.abstractmethod
+ async def action(self, nsr_id: str, nslcmop_id: str):
+ """Abstract method to perform action on PaaS Service.
+ Args:
+ nsr_id (str): NS service record to be used
+ nslcmop_id (str): NS LCM operation id
+ """
+
+
+class JujuPaasConnector(AbstractPaasConnector):
+ """Concrete PaaS Connector class to perform operations using the Juju PaaS Orchestrator."""
+
+ def __init__(
+ self,
+ uuid=None,
+ name=None,
+ db=None,
+ fs=None,
+ logger=None,
+ loop=None,
+ config=None,
+ ):
+ self.logger = logging.getLogger("lcm.juju_paas_connector")
+ super(JujuPaasConnector, self).__init__(logger=self.logger)
+
+ async def connect(self, endpoints: str, user: str = None, secret: str = None):
+ """Connect Juju PaaS account using endpoints, user and secret.
+ Args:
+ endpoints (str): Endpoint/URL to connect PaaS account
+ user (str): User which is used to connect PaaS account
+ secret (str): Used for authentication
+
+ Raises:
+ NotImplementedError
+ """
+ raise NotImplementedError(
+ "Juju Paas Connector connect method is not implemented"
+ )
+
+ async def instantiate(self, nsr_id: str, nslcmop_id: str):
+ """Perform Service instantiation.
+ Args:
+ nsr_id (str): NS service record to be used
+ nslcmop_id (str): NS LCM operation id
+
+ Raises:
+ JujuPaasConnException
+ """
+ # This is not the real implementation
+ # Sample code blocks to validate method execution
+ await asyncio.sleep(1)
+ self.logger.debug("Juju Paas Connector instantiate method is called")
+
+ async def terminate(self, nsr_id: str, nslcmop_id: str):
+ """Perform PaaS Service termination.
+ Args:
+ nsr_id (str): NS service record to be used
+ nslcmop_id (str): NS LCM operation id
+
+ Raises:
+ JujuPaasConnException
+ """
+ # This is not the real implementation
+ # Sample code blocks to validate method execution
+ await asyncio.sleep(1)
+ self.logger.debug("Juju Paas Connector terminate method is called")
+
+ async def action(self, nsr_id: str, nslcmop_id: str):
+ """Perform action on PaaS Service.
+ Args:
+ nsr_id (str): NS service record to be used
+ nslcmop_id (str): NS LCM operation id
+
+ Raises:
+ NotImplementedError
+ """
+ raise NotImplementedError(
+ "Juju Paas Connector instantiate method is not implemented"
+ )
--- /dev/null
+# !/usr/bin/python3
+#
+# Copyright 2022 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+import logging
+import traceback
+
+from osm_common.dbbase import DbException
+from osm_common.msgbase import MsgException
+from osm_lcm.lcm_utils import LcmBase
+from osm_lcm.lcm_utils import LcmException
+from osm_lcm.paas_conn import JujuPaasConnException, paas_connector_factory
+
+from time import time
+
+
+def paas_service_factory(
+ msg: object,
+ lcm_tasks: object,
+ db: object,
+ fs: object,
+ log: object,
+ loop: object,
+ config: dict,
+ paas_type="juju",
+) -> object:
+ """Factory Method to create the paas_service objects according to PaaS Type.
+ Args:
+ msg (object): Message object to be used to write the messages to Kafka Bus
+ lcm_tasks (object): Task object to register the tasks
+ db (object): Database object to write current operation status
+ fs (object): Filesystem object to use during operations
+ log (object) Logger for tracing
+ loop (object) Async event loop object
+ config (dict): Dictionary with extra PaaS Service information.
+ paas_type (str): Identifier to create paas_service object using correct PaaS Service Class
+
+ Returns:
+ paas_service (object): paas_service objects created according to given PaaS Type
+
+ Raises:
+ PaasServiceException
+ """
+ orchestrators = {
+ "juju": JujuPaasService,
+ }
+
+ if paas_type not in orchestrators.keys():
+ raise PaasServiceException(f"PaaS type: {paas_type} is not available.")
+
+ return orchestrators[paas_type](
+ msg=msg, lcm_tasks=lcm_tasks, db=db, fs=fs, loop=loop, logger=log, config=config
+ )
+
+
+class PaasServiceException(Exception):
+ """PaaS Service Exception Base Class"""
+
+ def __init__(self, message: str = ""):
+ """Constructor of PaaS Service Exception
+ Args:
+ message (str): error message to be raised
+ """
+ Exception.__init__(self, message)
+ self.message = message
+
+ def __str__(self):
+ return self.message
+
+ def __repr__(self):
+ return "{}({})".format(type(self), self.message)
+
+
+class JujuPaasServiceException(PaasServiceException):
+ """Juju PaaS Service exception class"""
+
+
+class JujuPaasService(LcmBase):
+ """Juju PaaS Service class to handle ns operations such as instantiate, terminate, action etc."""
+
+ timeout_ns_deploy = 3600
+
+ def __init__(
+ self,
+ msg: object,
+ lcm_tasks: object,
+ db: object,
+ fs: object,
+ loop: object,
+ logger: object,
+ config: dict,
+ ):
+ """
+ Args:
+ msg (object): Message object to be used to write the messages to Kafka Bus
+ lcm_tasks (object): Task object to register the tasks
+ db (object): Database object to write current operation status
+ fs (object): Filesystem object to use during operations
+ loop (object) Async event loop object
+ logger (object): Logger for tracing
+ config (dict): Dictionary with extra PaaS Service information.
+ """
+ self.logger = logging.getLogger("lcm.juju_paas_service")
+ self.loop = loop
+ self.lcm_tasks = lcm_tasks
+ self.config = config
+ super(JujuPaasService, self).__init__(msg=msg, logger=self.logger)
+
+ self.paas_connector = paas_connector_factory(
+ self.msg,
+ self.lcm_tasks,
+ self.db,
+ self.fs,
+ self.loop,
+ self.logger,
+ self.config,
+ "juju",
+ )
+
+ def _lock_ha_task(self, nslcmop_id: str, nsr_id: str, keyword: str) -> bool:
+ """Lock the task.
+ Args:
+ nslcmop_id (str): NS LCM operation id
+ nsr_id (str): NS service record to be used
+ keyword (str): Word which indicates action such as instantiate, terminate
+
+ Returns:
+ task_is_locked_by_me (Boolean): True if task_is_locked_by_me else False
+ """
+ task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
+ if not task_is_locked_by_me:
+ self.logger.debug(
+ f"{keyword}() task is not locked by me, ns={nsr_id}, exiting."
+ )
+ return task_is_locked_by_me
+
+ def _write_ns_status(
+ self,
+ nsr_id: str,
+ ns_state: str,
+ current_operation: str,
+ current_operation_id: str,
+ error_description: str = None,
+ error_detail: str = None,
+ other_update: dict = None,
+ ) -> None:
+ """Update NS record.
+ Args:
+ nsr_id (str): NS service record to be used
+ ns_state (str): NS state
+ current_operation (str): Current operation name
+ current_operation_id (str): Current operation ID
+ error_description (str): Error description
+ error_detail (str): Details of error
+ other_update: (dict): Other required changes at database if provided
+
+ Raises:
+ DbException
+ """
+ try:
+ db_dict = other_update or {}
+ db_update_dict = {
+ "_admin.nslcmop": current_operation_id,
+ "_admin.current-operation": current_operation_id,
+ "_admin.operation-type": current_operation
+ if current_operation != "IDLE"
+ else None,
+ "currentOperation": current_operation,
+ "currentOperationID": current_operation_id,
+ "errorDescription": error_description,
+ "errorDetail": error_detail,
+ }
+ db_dict.update(db_update_dict)
+
+ if ns_state:
+ db_dict["nsState"] = ns_state
+ self.update_db_2("nsrs", nsr_id, db_dict)
+
+ except DbException as e:
+ error = f"Error writing NS status, ns={nsr_id}: {e}"
+ self.logger.error(error)
+ raise JujuPaasServiceException(error)
+
+ def _write_op_status(
+ self,
+ op_id: str,
+ stage: str = None,
+ error_message: str = None,
+ queue_position: int = 0,
+ operation_state: str = None,
+ other_update: dict = None,
+ ) -> None:
+ """Update NS LCM Operation Status.
+ Args:
+ op_id (str): Operation ID
+ stage (str): Indicates the stage of operations
+ error_message (str): Error description
+ queue_position (int): Operation position in the queue
+ operation_state (str): State of operation
+ other_update: (dict): Other required changes at database if provided
+
+ Raises:
+ DbException
+ """
+ try:
+ db_dict = other_update or {}
+ db_dict["queuePosition"] = queue_position
+ if stage:
+ db_dict["stage"] = str(stage)
+ if error_message:
+ db_dict["errorMessage"] = error_message
+ if operation_state:
+ db_dict["operationState"] = operation_state
+ db_dict["statusEnteredTime"] = time()
+ self.update_db_2("nslcmops", op_id, db_dict)
+
+ except DbException as e:
+ error = f"Error writing OPERATION status for op_id: {op_id} -> {e}"
+ self.logger.error(error)
+ raise JujuPaasServiceException(error)
+
+ def _update_nsr_error_desc(
+ self,
+ stage: str,
+ new_error: str,
+ error_list: list,
+ error_detail_list: list,
+ nsr_id: str,
+ ) -> None:
+ """Update error description in NS record.
+ Args:
+ stage (str): Indicates the stage of operations
+ new_error (str): New detected error
+ error_list (str): Updated error list
+ error_detail_list: Updated detailed error list
+ nsr_id (str): NS service record to be used
+
+ Raises:
+ DbException
+ """
+ if new_error:
+ stage += " Errors: " + ". ".join(error_detail_list) + "."
+ if nsr_id:
+ try:
+ # Update nsr
+ self.update_db_2(
+ "nsrs",
+ nsr_id,
+ {
+ "errorDescription": "Error at: " + ", ".join(error_list),
+ "errorDetail": ". ".join(error_detail_list),
+ },
+ )
+
+ except DbException as e:
+ error = f"Error updating NSR error description for nsr_id: {nsr_id} -> {e}"
+ self.logger.error(error)
+ raise JujuPaasServiceException(error)
+
+ def _check_tasks_in_done(
+ self,
+ completed_tasks_list: list,
+ created_tasks_info: dict,
+ error_list: list,
+ error_detail_list: list,
+ logging_text: str,
+ ) -> (str, str, str):
+ """Check the completed tasks to detect errors
+ Args:
+ completed_tasks_list (list): List of completed tasks
+ created_tasks_info: Dictionary which includes the tasks
+ error_list: List of errors
+ error_detail_list: List includes details of errors
+ logging_text: Main log message
+
+ Returns:
+ new_error (str): New detected error
+ error_list (str): Updated error list
+ error_detail_list: Updated detailed error list
+ """
+ new_error = ""
+ for task in completed_tasks_list:
+ if task.cancelled():
+ exc = "Cancelled"
+ else:
+ exc = task.exception()
+ if exc:
+ if isinstance(exc, asyncio.TimeoutError):
+ exc = "Timeout"
+ new_error = created_tasks_info[task] + ": {}".format(exc)
+ error_list.append(created_tasks_info[task])
+ error_detail_list.append(new_error)
+ if isinstance(
+ exc,
+ (
+ str,
+ DbException,
+ LcmException,
+ JujuPaasConnException,
+ JujuPaasServiceException,
+ ),
+ ):
+ self.logger.error(logging_text + new_error)
+ else:
+ exc_traceback = "".join(
+ traceback.format_exception(None, exc, exc.__traceback__)
+ )
+ self.logger.error(
+ logging_text + created_tasks_info[task] + " " + exc_traceback
+ )
+ else:
+ self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
+
+ return new_error, error_list, error_detail_list
+
+ async def _wait_for_tasks(
+ self,
+ logging_text: str,
+ created_tasks_info: dict,
+ timeout: int,
+ stage: str,
+ nslcmop_id: str,
+ nsr_id: str,
+ ) -> None:
+ """Wait for tasks to be completed.
+ Args:
+ logging_text (str): Log message
+ created_tasks_info (dict): Dictionary which includes the tasks
+ timeout (inst): Timeout in seconds
+ stage (str): Indicates the stage of operations
+ nslcmop_id (str): NS LCM Operation ID
+ nsr_id (str): NS service record to be used
+ """
+ time_start = time()
+ error_detail_list, error_list = [], []
+ pending_tasks = list(created_tasks_info.keys())
+ num_tasks = len(pending_tasks)
+ num_done = 0
+
+ self._write_op_status(nslcmop_id, stage=f"{stage}: {num_done}/{num_tasks}")
+
+ while pending_tasks:
+ _timeout = timeout + time_start - time()
+ done, pending_tasks = await asyncio.wait(
+ pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
+ )
+ num_done += len(done)
+ if not done:
+ # Timeout error
+ for task in pending_tasks:
+ new_error = created_tasks_info[task] + ": Timeout"
+ error_detail_list.append(new_error)
+ error_list.append(new_error)
+ break
+ # Find out the errors in completed tasks
+ new_error, error_list, error_detail_list = self._check_tasks_in_done(
+ completed_tasks_list=done,
+ created_tasks_info=created_tasks_info,
+ error_detail_list=error_detail_list,
+ error_list=error_list,
+ logging_text=logging_text,
+ )
+
+ self._update_nsr_error_desc(
+ stage=f"{stage}: {num_done}/{num_tasks}",
+ new_error=new_error,
+ error_list=error_list,
+ error_detail_list=error_detail_list,
+ nsr_id=nsr_id,
+ )
+
+ self._write_op_status(nslcmop_id, stage=f"{stage}: {num_done}/{num_tasks}")
+
+ return error_detail_list
+
+ def _prepare_db_before_operation(
+ self,
+ db_nsr_update: dict,
+ nsr_id: str,
+ nslcmop_id: str,
+ detailed: str = None,
+ operational: str = None,
+ ns_state: str = None,
+ current_op: str = None,
+ stage: str = None,
+ ) -> None:
+ """Update DB before performing NS operations
+ Args:
+ db_nsr_update (dict): NS record update dictionary
+ nsr_id (str): NS record ID
+ nslcmop_id (str): NS LCM Operation ID
+ detailed: (str): Detailed status
+ operational (str): Operational status
+ ns_state (str): NS state
+ current_op (str): Current operation name
+ stage (str): Indicates the stage of operations
+ """
+ db_nsr_update["detailed-status"] = detailed
+ db_nsr_update["operational-status"] = operational
+
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=ns_state,
+ current_operation=current_op,
+ current_operation_id=nslcmop_id,
+ other_update=db_nsr_update,
+ )
+ self._write_op_status(op_id=nslcmop_id, stage=stage, queue_position=0)
+
+ async def _report_to_kafka(
+ self,
+ nsr_id: str,
+ nslcmop_id: str,
+ nslcmop_operation_state: str,
+ logging_text: str,
+ message: str,
+ autoremove="False",
+ ) -> None:
+ """Report operation status to Kafka.
+ Args:
+ nsr_id (str): NS record ID
+ nslcmop_id (str): NS LCM Operation ID
+ nslcmop_operation_state (str): NS LCM Operation status
+ logging_text (str): Common log message
+ message (str): Message which is sent through Kafka
+ autoremove (Boolean): True/False If True NBI deletes NS from DB
+
+ Raises:
+ PaasServiceException
+ """
+ if nslcmop_operation_state:
+ update_dict = {
+ "nsr_id": nsr_id,
+ "nslcmop_id": nslcmop_id,
+ "operationState": nslcmop_operation_state,
+ }
+ if message == "terminated":
+ update_dict["autoremove"] = autoremove
+ try:
+ await self.msg.aiowrite(
+ "ns",
+ message,
+ update_dict,
+ loop=self.loop,
+ )
+ except MsgException as e:
+ error = logging_text + f"kafka_write notification Exception: {e}"
+ self.logger.error(error)
+ raise PaasServiceException(error)
+
+ def _update_ns_state(self, nsr_id: str, db_nsr_update: dict, ns_state: str) -> None:
+ """Update NS state in NSR and VNFRs
+ Args:
+ nsr_id (str): NS record ID
+ db_nsr_update (dict): NS record dictionary
+ ns_state (str): NS status
+ """
+ db_nsr_update["_admin.nsState"] = ns_state
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": ns_state})
+
+ def _update_db_ns_state_after_operation(
+ self,
+ error_list: list,
+ operation_type: str,
+ nslcmop_id: str,
+ db_nsr_update: dict,
+ db_nsr: dict,
+ nsr_id: str,
+ ) -> None:
+ """Update NS status at database after performing operations
+ Args:
+ error_list (list): List of errors
+ operation_type (str): Type of operation such as instantiate/terminate
+ nslcmop_id (str): NS LCM Operation ID
+ db_nsr_update (dict): NSR update dictionary
+ db_nsr (dict): NS record dictionary
+ nsr_id (str): NS record ID
+ """
+ ns_state = ""
+ if error_list:
+ error_detail = ". ".join(error_list)
+ error_description_nsr = "Operation: {}.{}".format(
+ operation_type, nslcmop_id
+ )
+ db_nsr_update["detailed-status"] = (
+ error_description_nsr + " Detail: " + error_detail
+ )
+ ns_state = "BROKEN"
+
+ else:
+ error_detail = None
+ error_description_nsr = None
+ db_nsr_update["detailed-status"] = "Done"
+ if operation_type == "instantiate":
+ ns_state = "READY"
+ elif operation_type == "terminate":
+ ns_state = "NOT_INSTANTIATED"
+ db_nsr_update["operational-status"] = "terminated"
+ db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
+
+ if db_nsr:
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=ns_state,
+ current_operation="IDLE",
+ current_operation_id=None,
+ error_description=error_description_nsr,
+ error_detail=error_detail,
+ other_update=db_nsr_update,
+ )
+
+ if ns_state == "NOT_INSTANTIATED":
+ self.db.set_list(
+ "vnfrs",
+ {"nsr-id-ref": nsr_id},
+ {"_admin.nsState": "NOT_INSTANTIATED"},
+ )
+
+ def _update_db_nslcmop_status_after_operation(
+ self, error_list: list, db_nslcmop_update: dict, nslcmop_id: str
+ ) -> str:
+ """Update NS LCM operation status at database after performing operation
+ Args
+ error_list (list): List of errors
+ db_nslcmop_update (dict): NS LCM operation update dictionary
+ nslcmop_id (str): NS LCM Operation ID
+
+ Returns:
+ nslcmop_operation_state (str): State of NS LCM operation
+ """
+ if error_list:
+ error_detail = ". ".join(error_list)
+ error_description_nslcmop = "Detail: {}".format(error_detail)
+ db_nslcmop_update["detailed-status"] = error_detail
+ nslcmop_operation_state = "FAILED"
+
+ else:
+ error_description_nslcmop = None
+ db_nslcmop_update["detailed-status"] = "Done"
+ nslcmop_operation_state = "COMPLETED"
+
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage=nslcmop_operation_state,
+ error_message=error_description_nslcmop,
+ operation_state=nslcmop_operation_state,
+ other_update=db_nslcmop_update,
+ )
+
+ return nslcmop_operation_state
+
+ def _update_db_after_operation(
+ self,
+ nslcmop_id: str,
+ db_nsr: str,
+ nsr_id: str,
+ db_nslcmop_update: dict = None,
+ db_nsr_update: dict = None,
+ error_list: list = None,
+ operation_type: str = None,
+ ) -> str:
+ """Update database after operation is performed.
+ Args:
+ nslcmop_id (str): NS LCM Operation ID
+ db_nsr (dict): NS record dictionary
+ nsr_id (str): NS record ID
+ db_nslcmop_update (dict): NS LCM operation update dictionary
+ db_nsr_update (dict): NSR update dictionary
+ error_list (list): List of errors
+ operation_type (str): Type of operation such as instantiate/terminate
+
+ Returns:
+ nslcmop_operation_state (str): State of NS LCM operation
+ """
+ # Update NS state
+ self._update_db_ns_state_after_operation(
+ error_list=error_list,
+ operation_type=operation_type,
+ nslcmop_id=nslcmop_id,
+ db_nsr_update=db_nsr_update,
+ db_nsr=db_nsr,
+ nsr_id=nsr_id,
+ )
+
+ # Update NS LCM Operation State
+ nslcmop_operation_state = self._update_db_nslcmop_status_after_operation(
+ error_list, db_nslcmop_update, nslcmop_id
+ )
+ return nslcmop_operation_state
+
+ async def instantiate(self, nsr_id: str, nslcmop_id: str) -> None:
+ """Perform PaaS Service instantiation.
+ Args:
+ nsr_id (str): NS service record to be used
+ nslcmop_id (str): NS LCM operation id
+ """
+ # Locking HA task
+ if not self._lock_ha_task(nslcmop_id, nsr_id, keyword="instantiate"):
+ return
+
+ logging_text = f"Task ns={nsr_id} instantiate={nslcmop_id} "
+ self.logger.debug(logging_text + "Enter")
+
+ # Required containers
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ db_nsr_update, db_nslcmop_update, tasks_dict_info = {}, {}, {}
+ exc = None
+ error_list = []
+
+ try:
+ # Wait for any previous tasks in process
+ await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
+ # Update nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
+ self._prepare_db_before_operation(
+ db_nsr_update,
+ nsr_id,
+ nslcmop_id,
+ detailed="creating",
+ operational="init",
+ ns_state="BUILDING",
+ current_op="INSTANTIATING",
+ stage="Building",
+ )
+
+ # Perform PaaS Service Deployment using PaaS Connector
+ self.logger.debug(logging_text + "Creating instantiate task")
+ task_instantiate = asyncio.ensure_future(
+ self.paas_connector.instantiate(nsr_id, nslcmop_id)
+ )
+ self.lcm_tasks.register(
+ "ns",
+ nsr_id,
+ nslcmop_id,
+ "instantiate_juju_paas_service",
+ task_instantiate,
+ )
+ tasks_dict_info[task_instantiate] = "Instantiate juju PaaS Service"
+
+ # Update nsState="INSTANTIATED"
+ self.logger.debug(logging_text + "INSTANTIATED")
+ self._update_ns_state(nsr_id, db_nsr_update, "INSTANTIATED")
+
+ except (
+ DbException,
+ LcmException,
+ JujuPaasConnException,
+ JujuPaasServiceException,
+ ) as e:
+ self.logger.error(logging_text + "Exit Exception: {}".format(e))
+ exc = e
+ except asyncio.CancelledError:
+ self.logger.error(logging_text + "Cancelled Exception")
+ exc = "Operation was cancelled"
+
+ finally:
+ if exc:
+ error_list.append(str(exc))
+ try:
+ if tasks_dict_info:
+ # Wait for pending tasks
+ stage = "Waiting for instantiate pending tasks."
+ self.logger.debug(logging_text + stage)
+ error_list += await self._wait_for_tasks(
+ logging_text,
+ tasks_dict_info,
+ self.timeout_ns_deploy,
+ stage,
+ nslcmop_id,
+ nsr_id=nsr_id,
+ )
+ except asyncio.CancelledError:
+ error_list.append("Cancelled")
+ except Exception as exc:
+ error_list.append(str(exc))
+
+ # Update operational-status
+ self.logger.debug("updating operational status")
+ db_nsr_update["operational-status"] = "running"
+
+ # Update status at database after operation
+ self.logger.debug(logging_text + "Updating DB after operation")
+ nslcmop_operation_state = self._update_db_after_operation(
+ nslcmop_id,
+ db_nsr,
+ nsr_id,
+ db_nslcmop_update=db_nslcmop_update,
+ db_nsr_update=db_nsr_update,
+ error_list=error_list,
+ operation_type="instantiate",
+ )
+
+ # Write to Kafka bus to report the operation status
+ await self._report_to_kafka(
+ nsr_id,
+ nslcmop_id,
+ nslcmop_operation_state,
+ logging_text,
+ "instantiated",
+ )
+ self.logger.debug(logging_text + "Exit")
+
+ # Remove task
+ self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
+
+ async def terminate(self, nsr_id: str, nslcmop_id: str) -> None:
+ """Perform PaaS Service termination.
+ Args:
+ nsr_id (str): NS service record to be used
+ nslcmop_id (str): NS LCM operation id
+ """
+ # Locking HA task
+ if not self._lock_ha_task(nslcmop_id, nsr_id, keyword="terminate"):
+ return
+
+ logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
+ self.logger.debug(logging_text + "Enter")
+
+ # Update ns termination timeout
+ timeout_ns_terminate = self.timeout_ns_deploy
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ operation_params = db_nslcmop.get("operationParams") or {}
+
+ if operation_params.get("timeout_ns_terminate"):
+ timeout_ns_terminate = operation_params["timeout_ns_terminate"]
+
+ # Required containers
+ autoremove = False
+ db_nsr_update, db_nslcmop_update, tasks_dict_info = {}, {}, {}
+ exc = None
+ error_list = []
+
+ try:
+ # Wait for any previous tasks in process
+ await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
+
+ # Update nsState="TERMINATING", currentOperation="TERMINATING", currentOperationID=nslcmop_id
+ self._prepare_db_before_operation(
+ db_nsr_update,
+ nsr_id,
+ nslcmop_id,
+ detailed="terminating",
+ operational="terminate",
+ ns_state="TERMINATING",
+ current_op="TERMINATING",
+ stage="terminating",
+ )
+
+ # Perform PaaS Service deletion using PaaS Connector
+ self.logger.debug(logging_text + "Creating terminate task")
+ task_terminate = asyncio.ensure_future(
+ self.paas_connector.terminate(nsr_id, nslcmop_id)
+ )
+ self.lcm_tasks.register(
+ "ns", nsr_id, nslcmop_id, "terminate_juju_paas_service", task_terminate
+ )
+ tasks_dict_info[task_terminate] = "Terminate juju PaaS Service"
+
+ # Update nsState="TERMINATED"
+ self.logger.debug(logging_text + "TERMINATED")
+ self._update_ns_state(nsr_id, db_nsr_update, "TERMINATED")
+
+ except (
+ DbException,
+ LcmException,
+ JujuPaasConnException,
+ JujuPaasServiceException,
+ ) as e:
+ self.logger.error(logging_text + "Exit Exception: {}".format(e))
+ exc = e
+ except asyncio.CancelledError:
+ self.logger.error(logging_text + "Cancelled Exception")
+ exc = "Operation was cancelled"
+
+ finally:
+ if exc:
+ error_list.append(str(exc))
+ try:
+ if tasks_dict_info:
+ # Wait for pending tasks
+ stage = "Waiting for pending tasks for termination."
+ self.logger.debug(logging_text + stage)
+ error_list += await self._wait_for_tasks(
+ logging_text,
+ tasks_dict_info,
+ min(self.timeout_ns_deploy, timeout_ns_terminate),
+ stage,
+ nslcmop_id,
+ nsr_id=nsr_id,
+ )
+ except asyncio.CancelledError:
+ error_list.append("Cancelled")
+ except Exception as exc:
+ error_list.append(str(exc))
+
+ # Update status at database
+ nslcmop_operation_state = self._update_db_after_operation(
+ nslcmop_id,
+ db_nsr,
+ nsr_id,
+ db_nslcmop_update=db_nslcmop_update,
+ db_nsr_update=db_nsr_update,
+ error_list=error_list,
+ operation_type="terminate",
+ )
+
+ # Write to Kafka bus to report the operation status
+ if operation_params:
+ autoremove = operation_params.get("autoremove", False)
+
+ await self._report_to_kafka(
+ nsr_id,
+ nslcmop_id,
+ nslcmop_operation_state,
+ logging_text,
+ "terminated",
+ autoremove=autoremove,
+ )
+ self.logger.debug(logging_text + "Exit")
+
+ # Remove task
+ self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
+
+ async def action(self, nsr_id: str, nslcmop_id: str):
+ """Perform action on PaaS service.
+ Args:
+ nsr_id (str): NS service record to be used
+ nslcmop_id (str): NS LCM operation id
+
+ Raises:
+ NotImplementedError
+ """
+ raise NotImplementedError("Juju Paas Service action method is not implemented")