blob: 0a011575947638f57b381605b7ae529586f9afab [file] [log] [blame]
# !/usr/bin/python3
#
# Copyright 2022 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import logging
import traceback
from osm_common.dbbase import DbException
from osm_common.msgbase import MsgException
from osm_lcm.lcm_utils import LcmBase
from osm_lcm.lcm_utils import LcmException
from osm_lcm.paas_conn import JujuPaasConnException, paas_connector_factory
from time import time
def paas_service_factory(
    msg: object,
    lcm_tasks: object,
    db: object,
    fs: object,
    log: object,
    loop: object,
    config: dict,
    paas_type="juju",
) -> object:
    """Build the PaaS Service object registered for the given PaaS type.

    Args:
        msg (object): Message object used to write messages to the Kafka bus
        lcm_tasks (object): Task object used to register the tasks
        db (object): Database object used to write the current operation status
        fs (object): Filesystem object used during operations
        log (object): Logger for tracing (passed on as ``logger``)
        loop (object): Async event loop object
        config (dict): Dictionary with extra PaaS Service information
        paas_type (str): Key selecting the PaaS Service class; only "juju" is registered

    Returns:
        paas_service (object): Instance of the class registered for ``paas_type``

    Raises:
        PaasServiceException: if no class is registered for ``paas_type``
    """
    # Registry of supported PaaS types -> service implementation.
    service_classes = {"juju": JujuPaasService}
    service_class = service_classes.get(paas_type)
    if service_class is None:
        raise PaasServiceException(f"PaaS type: {paas_type} is not available.")
    return service_class(
        msg=msg,
        lcm_tasks=lcm_tasks,
        db=db,
        fs=fs,
        loop=loop,
        logger=log,
        config=config,
    )
class PaasServiceException(Exception):
    """PaaS Service Exception Base Class.

    Attributes:
        message: The error message given to the constructor; ``str()`` of the
            exception returns it (converted to ``str``).
    """

    def __init__(self, message: str = ""):
        """Constructor of PaaS Service Exception.

        Args:
            message (str): error message to be raised
        """
        super().__init__(message)
        self.message = message

    def __str__(self):
        # str() guards against non-string messages; __str__ must return a str.
        return str(self.message)

    def __repr__(self):
        # Use the class name (type(self) alone renders as "<class '...'>") and
        # repr the message so quoting/escaping is unambiguous.
        return f"{type(self).__name__}({self.message!r})"
class JujuPaasServiceException(PaasServiceException):
    """Exception raised by the Juju PaaS Service.

    Adds nothing to PaasServiceException; it reuses that class's ``message``
    attribute, ``__str__`` and ``__repr__`` unchanged.
    """
class JujuPaasService(LcmBase):
    """Juju PaaS Service class to handle ns operations such as instantiate, terminate, action etc.

    The deployment work itself is delegated to a Juju PaaS connector created
    with ``paas_connector_factory``. This class drives the surrounding
    operation life cycle:

    - HA locking
    - NS record / NS LCM operation updates in the database
    - waiting for the connector tasks
    - reporting the result on the message bus
    """

    # Seconds to wait for instantiate tasks; also the upper bound for terminate.
    timeout_ns_deploy = 3600

    def __init__(
        self,
        msg: object,
        lcm_tasks: object,
        db: object,
        fs: object,
        loop: object,
        logger: object,
        config: dict,
    ):
        """
        Args:
            msg (object): Message object to be used to write the messages to Kafka Bus
            lcm_tasks (object): Task object to register the tasks
            db (object): Database object to write current operation status
            fs (object): Filesystem object to use during operations
            loop (object): Async event loop object
            logger (object): Logger for tracing
            config (dict): Dictionary with extra PaaS Service information.
        """
        # NOTE(review): the ``logger``, ``db`` and ``fs`` arguments are not used
        # here. A module-specific logger is created instead, and ``self.db`` /
        # ``self.fs`` are presumably set by LcmBase.__init__ — confirm.
        self.logger = logging.getLogger("lcm.juju_paas_service")
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.config = config
        super().__init__(msg=msg, logger=self.logger)
        self.paas_connector = paas_connector_factory(
            self.msg,
            self.lcm_tasks,
            self.db,
            self.fs,
            self.loop,
            self.logger,
            self.config,
            "juju",
        )

    def _lock_ha_task(self, nslcmop_id: str, nsr_id: str, keyword: str) -> bool:
        """Lock the task.

        Args:
            nslcmop_id (str): NS LCM operation id
            nsr_id (str): NS service record to be used (only used for logging)
            keyword (str): Word which indicates action such as instantiate, terminate

        Returns:
            task_is_locked_by_me (bool): True if this instance holds the HA lock, else False
        """
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            self.logger.debug(
                f"{keyword}() task is not locked by me, ns={nsr_id}, exiting."
            )
        return task_is_locked_by_me

    def _write_ns_status(
        self,
        nsr_id: str,
        ns_state: str,
        current_operation: str,
        current_operation_id: str,
        error_description: str = None,
        error_detail: str = None,
        other_update: dict = None,
    ) -> None:
        """Update NS record.

        Args:
            nsr_id (str): NS service record to be used
            ns_state (str): NS state; if empty/None, ``nsState`` is not written
            current_operation (str): Current operation name
            current_operation_id (str): Current operation ID
            error_description (str): Error description
            error_detail (str): Details of error
            other_update (dict): Other required changes at database if provided.
                This dict is updated in place and written in the same update.

        Raises:
            JujuPaasServiceException: if the database update fails
        """
        try:
            db_dict = other_update or {}
            db_update_dict = {
                "_admin.nslcmop": current_operation_id,
                "_admin.current-operation": current_operation_id,
                # The operation type is cleared when the NS returns to IDLE.
                "_admin.operation-type": current_operation
                if current_operation != "IDLE"
                else None,
                "currentOperation": current_operation,
                "currentOperationID": current_operation_id,
                "errorDescription": error_description,
                "errorDetail": error_detail,
            }
            db_dict.update(db_update_dict)
            if ns_state:
                db_dict["nsState"] = ns_state
            self.update_db_2("nsrs", nsr_id, db_dict)
        except DbException as e:
            error = f"Error writing NS status, ns={nsr_id}: {e}"
            self.logger.error(error)
            raise JujuPaasServiceException(error) from e

    def _write_op_status(
        self,
        op_id: str,
        stage: str = None,
        error_message: str = None,
        queue_position: int = 0,
        operation_state: str = None,
        other_update: dict = None,
    ) -> None:
        """Update NS LCM Operation Status.

        Args:
            op_id (str): Operation ID
            stage (str): Indicates the stage of operations
            error_message (str): Error description
            queue_position (int): Operation position in the queue
            operation_state (str): State of operation; when given, ``statusEnteredTime`` is also refreshed
            other_update (dict): Other required changes at database if provided (updated in place)

        Raises:
            JujuPaasServiceException: if the database update fails
        """
        try:
            db_dict = other_update or {}
            db_dict["queuePosition"] = queue_position
            if stage:
                db_dict["stage"] = str(stage)
            if error_message:
                db_dict["errorMessage"] = error_message
            if operation_state:
                db_dict["operationState"] = operation_state
                db_dict["statusEnteredTime"] = time()
            self.update_db_2("nslcmops", op_id, db_dict)
        except DbException as e:
            error = f"Error writing OPERATION status for op_id: {op_id} -> {e}"
            self.logger.error(error)
            raise JujuPaasServiceException(error) from e

    def _update_nsr_error_desc(
        self,
        stage: str,
        new_error: str,
        error_list: list,
        error_detail_list: list,
        nsr_id: str,
    ) -> str:
        """Update error description in NS record.

        Args:
            stage (str): Indicates the stage of operations
            new_error (str): New detected error; nothing is done if empty
            error_list (list): Updated error list
            error_detail_list (list): Updated detailed error list
            nsr_id (str): NS service record to be used

        Returns:
            stage (str): The stage text, with an " Errors: ..." summary appended
                when ``new_error`` is set (strings are immutable, so the caller
                must use the returned value).

        Raises:
            JujuPaasServiceException: if the database update fails
        """
        if new_error:
            stage += " Errors: " + ". ".join(error_detail_list) + "."
            if nsr_id:
                try:
                    # Update nsr
                    self.update_db_2(
                        "nsrs",
                        nsr_id,
                        {
                            "errorDescription": "Error at: " + ", ".join(error_list),
                            "errorDetail": ". ".join(error_detail_list),
                        },
                    )
                except DbException as e:
                    error = f"Error updating NSR error description for nsr_id: {nsr_id} -> {e}"
                    self.logger.error(error)
                    raise JujuPaasServiceException(error) from e
        return stage

    def _check_tasks_in_done(
        self,
        completed_tasks_list: list,
        created_tasks_info: dict,
        error_list: list,
        error_detail_list: list,
        logging_text: str,
    ) -> tuple:
        """Check the completed tasks to detect errors.

        Args:
            completed_tasks_list (list): Completed tasks
            created_tasks_info (dict): Task -> human-readable task description
            error_list (list): List of errors (appended to in place)
            error_detail_list (list): List including details of errors (appended to in place)
            logging_text (str): Main log message

        Returns:
            (new_error, error_list, error_detail_list):
                - new_error (str): last error detected in this batch, "" if none
                - error_list (list): updated error list
                - error_detail_list (list): updated detailed error list
        """
        new_error = ""
        for task in completed_tasks_list:
            # task.exception() would raise on a cancelled task, so check first.
            if task.cancelled():
                exc = "Cancelled"
            else:
                exc = task.exception()
            if exc:
                if isinstance(exc, asyncio.TimeoutError):
                    exc = "Timeout"
                new_error = created_tasks_info[task] + ": {}".format(exc)
                error_list.append(created_tasks_info[task])
                error_detail_list.append(new_error)
                # Known/expected errors are logged briefly; anything else gets a traceback.
                if isinstance(
                    exc,
                    (
                        str,
                        DbException,
                        LcmException,
                        JujuPaasConnException,
                        JujuPaasServiceException,
                    ),
                ):
                    self.logger.error(logging_text + new_error)
                else:
                    exc_traceback = "".join(
                        traceback.format_exception(None, exc, exc.__traceback__)
                    )
                    self.logger.error(
                        logging_text + created_tasks_info[task] + " " + exc_traceback
                    )
            else:
                self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
        return new_error, error_list, error_detail_list

    async def _wait_for_tasks(
        self,
        logging_text: str,
        created_tasks_info: dict,
        timeout: int,
        stage: str,
        nslcmop_id: str,
        nsr_id: str,
    ) -> list:
        """Wait for tasks to be completed.

        Args:
            logging_text (str): Log message
            created_tasks_info (dict): Task -> human-readable task description
            timeout (int): Overall timeout in seconds for all tasks
            stage (str): Indicates the stage of operations
            nslcmop_id (str): NS LCM Operation ID
            nsr_id (str): NS service record to be used

        Returns:
            error_detail_list (list): Detailed error messages; empty if all tasks succeeded
        """
        time_start = time()
        error_detail_list, error_list = [], []
        pending_tasks = list(created_tasks_info.keys())
        num_tasks = len(pending_tasks)
        num_done = 0
        self._write_op_status(nslcmop_id, stage=f"{stage}: {num_done}/{num_tasks}")
        while pending_tasks:
            # Remaining budget out of the overall timeout.
            _timeout = timeout + time_start - time()
            done, pending_tasks = await asyncio.wait(
                pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
            )
            num_done += len(done)
            if not done:
                # Timeout error.
                # NOTE(review): timed-out tasks are not cancelled and keep running.
                for task in pending_tasks:
                    new_error = created_tasks_info[task] + ": Timeout"
                    error_detail_list.append(new_error)
                    error_list.append(new_error)
                break
            # Find out the errors in completed tasks
            new_error, error_list, error_detail_list = self._check_tasks_in_done(
                completed_tasks_list=done,
                created_tasks_info=created_tasks_info,
                error_detail_list=error_detail_list,
                error_list=error_list,
                logging_text=logging_text,
            )
            # Returned stage includes the error summary when errors were found.
            current_stage = self._update_nsr_error_desc(
                stage=f"{stage}: {num_done}/{num_tasks}",
                new_error=new_error,
                error_list=error_list,
                error_detail_list=error_detail_list,
                nsr_id=nsr_id,
            )
            self._write_op_status(nslcmop_id, stage=current_stage)
        return error_detail_list

    async def _collect_pending_task_errors(
        self,
        logging_text: str,
        tasks_dict_info: dict,
        timeout: int,
        stage: str,
        nslcmop_id: str,
        nsr_id: str,
    ) -> list:
        """Wait for the given tasks and return the errors they produced.

        Cancellation and ordinary exceptions raised while waiting are turned
        into error-list entries, so callers can still record the final status.

        Args:
            logging_text (str): Common log message
            tasks_dict_info (dict): Task -> human-readable task description
            timeout (int): Timeout in seconds
            stage (str): Indicates the stage of operations
            nslcmop_id (str): NS LCM Operation ID
            nsr_id (str): NS record ID

        Returns:
            errors (list): Error messages; empty if there were no tasks or all succeeded
        """
        if not tasks_dict_info:
            return []
        self.logger.debug(logging_text + stage)
        try:
            return await self._wait_for_tasks(
                logging_text,
                tasks_dict_info,
                timeout,
                stage,
                nslcmop_id,
                nsr_id=nsr_id,
            )
        except asyncio.CancelledError:
            return ["Cancelled"]
        except Exception as e:
            return [str(e)]

    def _prepare_db_before_operation(
        self,
        db_nsr_update: dict,
        nsr_id: str,
        nslcmop_id: str,
        detailed: str = None,
        operational: str = None,
        ns_state: str = None,
        current_op: str = None,
        stage: str = None,
    ) -> None:
        """Update DB before performing NS operations.

        Args:
            db_nsr_update (dict): NS record update dictionary (updated in place)
            nsr_id (str): NS record ID
            nslcmop_id (str): NS LCM Operation ID
            detailed (str): Detailed status
            operational (str): Operational status
            ns_state (str): NS state
            current_op (str): Current operation name
            stage (str): Indicates the stage of operations

        Raises:
            JujuPaasServiceException: if a database update fails
        """
        db_nsr_update["detailed-status"] = detailed
        db_nsr_update["operational-status"] = operational
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=ns_state,
            current_operation=current_op,
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update,
        )
        self._write_op_status(op_id=nslcmop_id, stage=stage, queue_position=0)

    async def _report_to_kafka(
        self,
        nsr_id: str,
        nslcmop_id: str,
        nslcmop_operation_state: str,
        logging_text: str,
        message: str,
        autoremove=False,
    ) -> None:
        """Report operation status to Kafka.

        Nothing is sent if ``nslcmop_operation_state`` is empty.

        Args:
            nsr_id (str): NS record ID
            nslcmop_id (str): NS LCM Operation ID
            nslcmop_operation_state (str): NS LCM Operation status
            logging_text (str): Common log message
            message (str): Message which is sent through Kafka
            autoremove (bool): Only sent with "terminated"; if True NBI deletes the NS from DB

        Raises:
            PaasServiceException: if writing to the message bus fails
        """
        if nslcmop_operation_state:
            update_dict = {
                "nsr_id": nsr_id,
                "nslcmop_id": nslcmop_id,
                "operationState": nslcmop_operation_state,
            }
            if message == "terminated":
                update_dict["autoremove"] = autoremove
            try:
                await self.msg.aiowrite(
                    "ns",
                    message,
                    update_dict,
                    loop=self.loop,
                )
            except MsgException as e:
                error = logging_text + f"kafka_write notification Exception: {e}"
                self.logger.error(error)
                raise PaasServiceException(error) from e

    def _update_ns_state(self, nsr_id: str, db_nsr_update: dict, ns_state: str) -> None:
        """Update NS state in NSR and VNFRs.

        Args:
            nsr_id (str): NS record ID
            db_nsr_update (dict): NS record update dictionary (updated in place)
            ns_state (str): NS status written to ``_admin.nsState``
        """
        db_nsr_update["_admin.nsState"] = ns_state
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": ns_state})

    def _update_db_ns_state_after_operation(
        self,
        error_list: list,
        operation_type: str,
        nslcmop_id: str,
        db_nsr_update: dict,
        db_nsr: dict,
        nsr_id: str,
    ) -> None:
        """Update NS status at database after performing operations.

        Args:
            error_list (list): List of errors; non-empty marks the NS as BROKEN
            operation_type (str): Type of operation such as instantiate/terminate
            nslcmop_id (str): NS LCM Operation ID
            db_nsr_update (dict): NSR update dictionary (updated in place)
            db_nsr (dict): NS record dictionary; if falsy, nothing is written
            nsr_id (str): NS record ID
        """
        ns_state = ""
        if error_list:
            error_detail = ". ".join(error_list)
            error_description_nsr = "Operation: {}.{}".format(
                operation_type, nslcmop_id
            )
            db_nsr_update["detailed-status"] = (
                error_description_nsr + " Detail: " + error_detail
            )
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = None
            db_nsr_update["detailed-status"] = "Done"
            if operation_type == "instantiate":
                ns_state = "READY"
            elif operation_type == "terminate":
                ns_state = "NOT_INSTANTIATED"
                db_nsr_update["operational-status"] = "terminated"
                db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update,
            )
            if ns_state == "NOT_INSTANTIATED":
                self.db.set_list(
                    "vnfrs",
                    {"nsr-id-ref": nsr_id},
                    {"_admin.nsState": "NOT_INSTANTIATED"},
                )

    def _update_db_nslcmop_status_after_operation(
        self, error_list: list, db_nslcmop_update: dict, nslcmop_id: str
    ) -> str:
        """Update NS LCM operation status at database after performing operation.

        Args:
            error_list (list): List of errors; non-empty marks the operation FAILED
            db_nslcmop_update (dict): NS LCM operation update dictionary (updated in place)
            nslcmop_id (str): NS LCM Operation ID

        Returns:
            nslcmop_operation_state (str): "FAILED" or "COMPLETED"
        """
        if error_list:
            error_detail = ". ".join(error_list)
            error_description_nslcmop = "Detail: {}".format(error_detail)
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
        else:
            error_description_nslcmop = None
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"
        self._write_op_status(
            op_id=nslcmop_id,
            stage=nslcmop_operation_state,
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        return nslcmop_operation_state

    def _update_db_after_operation(
        self,
        nslcmop_id: str,
        db_nsr: dict,
        nsr_id: str,
        db_nslcmop_update: dict = None,
        db_nsr_update: dict = None,
        error_list: list = None,
        operation_type: str = None,
    ) -> str:
        """Update database after operation is performed.

        Args:
            nslcmop_id (str): NS LCM Operation ID
            db_nsr (dict): NS record dictionary
            nsr_id (str): NS record ID
            db_nslcmop_update (dict): NS LCM operation update dictionary (new dict if None)
            db_nsr_update (dict): NSR update dictionary (new dict if None)
            error_list (list): List of errors
            operation_type (str): Type of operation such as instantiate/terminate

        Returns:
            nslcmop_operation_state (str): State of NS LCM operation
        """
        # The helpers below subscript these dicts, so None must become {}.
        if db_nslcmop_update is None:
            db_nslcmop_update = {}
        if db_nsr_update is None:
            db_nsr_update = {}
        # Update NS state
        self._update_db_ns_state_after_operation(
            error_list=error_list,
            operation_type=operation_type,
            nslcmop_id=nslcmop_id,
            db_nsr_update=db_nsr_update,
            db_nsr=db_nsr,
            nsr_id=nsr_id,
        )
        # Update NS LCM Operation State
        return self._update_db_nslcmop_status_after_operation(
            error_list, db_nslcmop_update, nslcmop_id
        )

    async def instantiate(self, nsr_id: str, nslcmop_id: str) -> None:
        """Perform PaaS Service instantiation.

        Args:
            nsr_id (str): NS service record to be used
            nslcmop_id (str): NS LCM operation id
        """
        # Locking HA task
        if not self._lock_ha_task(nslcmop_id, nsr_id, keyword="instantiate"):
            return
        logging_text = f"Task ns={nsr_id} instantiate={nslcmop_id} "
        self.logger.debug(logging_text + "Enter")
        # Required containers
        db_nsr = None
        db_nsr_update, db_nslcmop_update, tasks_dict_info = {}, {}, {}
        exc = None
        error_list = []
        try:
            # Read inside the try so a DB failure is recorded as a failed operation
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            # Wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
            # Update nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
            self._prepare_db_before_operation(
                db_nsr_update,
                nsr_id,
                nslcmop_id,
                detailed="creating",
                operational="init",
                ns_state="BUILDING",
                current_op="INSTANTIATING",
                stage="Building",
            )
            # Perform PaaS Service Deployment using PaaS Connector
            self.logger.debug(logging_text + "Creating instantiate task")
            task_instantiate = asyncio.ensure_future(
                self.paas_connector.instantiate(nsr_id, nslcmop_id)
            )
            self.lcm_tasks.register(
                "ns",
                nsr_id,
                nslcmop_id,
                "instantiate_juju_paas_service",
                task_instantiate,
            )
            tasks_dict_info[task_instantiate] = "Instantiate juju PaaS Service"
            # Update nsState="INSTANTIATED" (the task result is awaited in finally)
            self.logger.debug(logging_text + "INSTANTIATED")
            self._update_ns_state(nsr_id, db_nsr_update, "INSTANTIATED")
        except (
            DbException,
            LcmException,
            JujuPaasConnException,
            JujuPaasServiceException,
        ) as e:
            self.logger.error(logging_text + "Exit Exception: {}".format(e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error(logging_text + "Cancelled Exception")
            exc = "Operation was cancelled"
        except Exception as e:
            # Without this branch an unexpected error would leave error_list
            # empty and the operation would be recorded as COMPLETED.
            self.logger.critical(
                logging_text + "Exit Exception: {}".format(e), exc_info=True
            )
            exc = e
        finally:
            if exc:
                error_list.append(str(exc))
            # Wait for pending tasks
            error_list += await self._collect_pending_task_errors(
                logging_text,
                tasks_dict_info,
                self.timeout_ns_deploy,
                "Waiting for instantiate pending tasks.",
                nslcmop_id,
                nsr_id=nsr_id,
            )
            # Update operational-status
            # NOTE(review): set to "running" even when errors occurred (NS becomes BROKEN) — confirm intended.
            self.logger.debug("updating operational status")
            db_nsr_update["operational-status"] = "running"
            try:
                # Update status at database after operation
                self.logger.debug(logging_text + "Updating DB after operation")
                nslcmop_operation_state = self._update_db_after_operation(
                    nslcmop_id,
                    db_nsr,
                    nsr_id,
                    db_nslcmop_update=db_nslcmop_update,
                    db_nsr_update=db_nsr_update,
                    error_list=error_list,
                    operation_type="instantiate",
                )
                # Write to Kafka bus to report the operation status
                await self._report_to_kafka(
                    nsr_id,
                    nslcmop_id,
                    nslcmop_operation_state,
                    logging_text,
                    "instantiated",
                )
                self.logger.debug(logging_text + "Exit")
            finally:
                # Always unregister, even if the DB update or Kafka report failed
                self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")

    async def terminate(self, nsr_id: str, nslcmop_id: str) -> None:
        """Perform PaaS Service termination.

        The wait timeout is ``timeout_ns_deploy``, or the operation's
        ``operationParams.timeout_ns_terminate`` if that is smaller.

        Args:
            nsr_id (str): NS service record to be used
            nslcmop_id (str): NS LCM operation id
        """
        # Locking HA task
        if not self._lock_ha_task(nslcmop_id, nsr_id, keyword="terminate"):
            return
        logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")
        # Default ns termination timeout (may be overridden by operationParams)
        timeout_ns_terminate = self.timeout_ns_deploy
        # Required containers
        db_nsr = None
        operation_params = {}
        db_nsr_update, db_nslcmop_update, tasks_dict_info = {}, {}, {}
        exc = None
        error_list = []
        try:
            # Read inside the try so a DB failure is recorded as a failed operation
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            operation_params = db_nslcmop.get("operationParams") or {}
            if operation_params.get("timeout_ns_terminate"):
                timeout_ns_terminate = operation_params["timeout_ns_terminate"]
            # Wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
            # Update nsState="TERMINATING", currentOperation="TERMINATING", currentOperationID=nslcmop_id
            self._prepare_db_before_operation(
                db_nsr_update,
                nsr_id,
                nslcmop_id,
                detailed="terminating",
                operational="terminate",
                ns_state="TERMINATING",
                current_op="TERMINATING",
                stage="terminating",
            )
            # Perform PaaS Service deletion using PaaS Connector
            self.logger.debug(logging_text + "Creating terminate task")
            task_terminate = asyncio.ensure_future(
                self.paas_connector.terminate(nsr_id, nslcmop_id)
            )
            self.lcm_tasks.register(
                "ns", nsr_id, nslcmop_id, "terminate_juju_paas_service", task_terminate
            )
            tasks_dict_info[task_terminate] = "Terminate juju PaaS Service"
            # Update nsState="TERMINATED" (the task result is awaited in finally)
            self.logger.debug(logging_text + "TERMINATED")
            self._update_ns_state(nsr_id, db_nsr_update, "TERMINATED")
        except (
            DbException,
            LcmException,
            JujuPaasConnException,
            JujuPaasServiceException,
        ) as e:
            self.logger.error(logging_text + "Exit Exception: {}".format(e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error(logging_text + "Cancelled Exception")
            exc = "Operation was cancelled"
        except Exception as e:
            # Without this branch an unexpected error would leave error_list
            # empty and the operation would be recorded as COMPLETED.
            self.logger.critical(
                logging_text + "Exit Exception: {}".format(e), exc_info=True
            )
            exc = e
        finally:
            if exc:
                error_list.append(str(exc))
            # Wait for pending tasks
            error_list += await self._collect_pending_task_errors(
                logging_text,
                tasks_dict_info,
                min(self.timeout_ns_deploy, timeout_ns_terminate),
                "Waiting for pending tasks for termination.",
                nslcmop_id,
                nsr_id=nsr_id,
            )
            try:
                # Update status at database
                nslcmop_operation_state = self._update_db_after_operation(
                    nslcmop_id,
                    db_nsr,
                    nsr_id,
                    db_nslcmop_update=db_nslcmop_update,
                    db_nsr_update=db_nsr_update,
                    error_list=error_list,
                    operation_type="terminate",
                )
                # Write to Kafka bus to report the operation status
                autoremove = operation_params.get("autoremove", False)
                await self._report_to_kafka(
                    nsr_id,
                    nslcmop_id,
                    nslcmop_operation_state,
                    logging_text,
                    "terminated",
                    autoremove=autoremove,
                )
                self.logger.debug(logging_text + "Exit")
            finally:
                # Always unregister, even if the DB update or Kafka report failed
                self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")

    async def action(self, nsr_id: str, nslcmop_id: str):
        """Perform action on PaaS service.

        Args:
            nsr_id (str): NS service record to be used
            nslcmop_id (str): NS LCM operation id

        Raises:
            NotImplementedError: always; actions are not supported yet
        """
        raise NotImplementedError("Juju Paas Service action method is not implemented")