| # !/usr/bin/python3 |
| # |
| # Copyright 2022 Canonical Ltd. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import asyncio |
| import logging |
| import traceback |
| |
| from osm_common.dbbase import DbException |
| from osm_common.msgbase import MsgException |
| from osm_lcm.lcm_utils import LcmBase |
| from osm_lcm.lcm_utils import LcmException |
| from osm_lcm.paas_conn import JujuPaasConnException, paas_connector_factory |
| |
| from time import time |
| |
| |
def paas_service_factory(
    msg: object,
    lcm_tasks: object,
    db: object,
    fs: object,
    log: object,
    loop: object,
    config: dict,
    paas_type="juju",
) -> object:
    """Build the PaaS service object that matches the requested PaaS type.
    Args:
        msg (object):   Message object to be used to write the messages to Kafka Bus
        lcm_tasks (object): Task object to register the tasks
        db  (object):   Database object to write current operation status
        fs  (object):   Filesystem object to use during operations
        log (object):   Logger for tracing, forwarded to the service as ``logger``
        loop (object):  Async event loop object
        config  (dict): Dictionary with extra PaaS Service information.
        paas_type   (str): Key selecting the PaaS Service class; only "juju" is registered

    Returns:
        paas_service    (object): instance of the service class registered for paas_type

    Raises:
        PaasServiceException: if no service class is registered for paas_type
    """
    # Registry of the supported PaaS types and the class implementing each one.
    service_classes = {"juju": JujuPaasService}

    service_cls = service_classes.get(paas_type)
    if service_cls is None:
        raise PaasServiceException(f"PaaS type: {paas_type} is not available.")

    # The factory receives the logger as "log" but the service expects "logger".
    service_kwargs = {
        "msg": msg,
        "lcm_tasks": lcm_tasks,
        "db": db,
        "fs": fs,
        "loop": loop,
        "logger": log,
        "config": config,
    }
    return service_cls(**service_kwargs)
| |
| |
class PaasServiceException(Exception):
    """PaaS Service Exception Base Class.

    Keeps the error text in the ``message`` attribute in addition to the
    standard ``args`` tuple.
    """

    def __init__(self, message: str = ""):
        """Constructor of PaaS Service Exception
        Args:
            message (str):  error message to be raised
        """
        super().__init__(message)
        self.message = message

    def __str__(self) -> str:
        # __str__ must return a str; coerce in case a non-string (for example
        # another exception object) was passed as the message.
        return str(self.message)

    def __repr__(self) -> str:
        # Use the class name (not the "<class '...'>" object) and the quoted
        # message so the repr is readable and unambiguous for subclasses too.
        return f"{type(self).__name__}({self.message!r})"
| |
| |
class JujuPaasServiceException(PaasServiceException):
    """Juju PaaS Service exception class.

    Raised by the JujuPaasService helpers in this module when a database
    status update fails.
    """
| |
| |
class JujuPaasService(LcmBase):
    """Juju PaaS Service class to handle ns operations such as instantiate, terminate, action etc.

    Each NS operation delegates the actual work to the Juju PaaS connector,
    tracks the resulting asyncio task, keeps the "nsrs"/"nslcmops" records up
    to date and finally reports the operation state through the message bus.
    """

    # Default time, in seconds, to wait for the deploy/terminate tasks.
    timeout_ns_deploy = 3600

    def __init__(
        self,
        msg: object,
        lcm_tasks: object,
        db: object,
        fs: object,
        loop: object,
        logger: object,
        config: dict,
    ):
        """
        Args:
            msg (object):   Message object to be used to write the messages to Kafka Bus
            lcm_tasks (object): Task object to register the tasks
            db  (object):   Database object to write current operation status
            fs  (object):   Filesystem object to use during operations
            loop (object)   Async event loop object
            logger (object): Logger for tracing
            config  (dict): Dictionary with extra PaaS Service information.

        NOTE(review): db, fs and logger are accepted but not stored here; the
        service always logs through "lcm.juju_paas_service", and self.db /
        self.fs are presumably provided by LcmBase.__init__ - confirm.
        """
        self.logger = logging.getLogger("lcm.juju_paas_service")
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.config = config
        super(JujuPaasService, self).__init__(msg=msg, logger=self.logger)

        self.paas_connector = paas_connector_factory(
            self.msg,
            self.lcm_tasks,
            self.db,
            self.fs,
            self.loop,
            self.logger,
            self.config,
            "juju",
        )

    def _lock_ha_task(self, nslcmop_id: str, nsr_id: str, keyword: str) -> bool:
        """Lock the task.
        Args:
            nslcmop_id  (str):  NS LCM operation id
            nsr_id  (str):  NS service record to be used
            keyword (str):  Word which indicates action such as instantiate, terminate

        Returns:
            task_is_locked_by_me    (Boolean): True if task_is_locked_by_me else False
        """
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            self.logger.debug(
                f"{keyword}() task is not locked by me, ns={nsr_id}, exiting."
            )
        return task_is_locked_by_me

    def _write_ns_status(
        self,
        nsr_id: str,
        ns_state: str,
        current_operation: str,
        current_operation_id: str,
        error_description: str = None,
        error_detail: str = None,
        other_update: dict = None,
    ) -> None:
        """Update NS record.
        Args:
            nsr_id  (str):  NS service record to be used
            ns_state    (str):  NS state; "nsState" is only written when this is truthy
            current_operation   (str):  Current operation name
            current_operation_id    (str):  Current operation ID
            error_description   (str):  Error description
            error_detail    (str):  Details of error
            other_update:   (dict): Other required changes at database if provided.
                            A non-empty dict is updated in place with the status fields.

        Raises:
            JujuPaasServiceException: if the database update raises DbException
        """
        try:
            db_dict = other_update or {}
            db_update_dict = {
                "_admin.nslcmop": current_operation_id,
                "_admin.current-operation": current_operation_id,
                # An idle NS has no operation type.
                "_admin.operation-type": (
                    current_operation if current_operation != "IDLE" else None
                ),
                "currentOperation": current_operation,
                "currentOperationID": current_operation_id,
                "errorDescription": error_description,
                "errorDetail": error_detail,
            }
            db_dict.update(db_update_dict)

            if ns_state:
                db_dict["nsState"] = ns_state
            self.update_db_2("nsrs", nsr_id, db_dict)

        except DbException as e:
            error = f"Error writing NS status, ns={nsr_id}: {e}"
            self.logger.error(error)
            raise JujuPaasServiceException(error) from e

    def _write_op_status(
        self,
        op_id: str,
        stage: str = None,
        error_message: str = None,
        queue_position: int = 0,
        operation_state: str = None,
        other_update: dict = None,
    ) -> None:
        """Update NS LCM Operation Status.
        Args:
            op_id   (str):  Operation ID
            stage   (str):  Indicates the stage of operations
            error_message   (str):  Error description
            queue_position  (int):  Operation position in the queue
            operation_state (str):  State of operation
            other_update:   (dict): Other required changes at database if provided.
                            A non-empty dict is updated in place with the status fields.

        Raises:
            JujuPaasServiceException: if the database update raises DbException
        """
        try:
            db_dict = other_update or {}
            db_dict["queuePosition"] = queue_position
            if stage:
                db_dict["stage"] = str(stage)
            if error_message:
                db_dict["errorMessage"] = error_message
            if operation_state:
                db_dict["operationState"] = operation_state
                # Timestamp the moment the operation entered the new state.
                db_dict["statusEnteredTime"] = time()
            self.update_db_2("nslcmops", op_id, db_dict)

        except DbException as e:
            error = f"Error writing OPERATION status for op_id: {op_id} -> {e}"
            self.logger.error(error)
            raise JujuPaasServiceException(error) from e

    def _update_nsr_error_desc(
        self,
        stage: str,
        new_error: str,
        error_list: list,
        error_detail_list: list,
        nsr_id: str,
    ) -> None:
        """Update error description in NS record.

        Nothing is written unless both new_error and nsr_id are truthy.

        Args:
            stage   (str):  Indicates the stage of operations. Kept for interface
                            compatibility; it is not written anywhere by this method.
            new_error   (str):  New detected error
            error_list  (list):  Updated error list
            error_detail_list (list):  Updated detailed error list
            nsr_id  (str):  NS service record to be used

        Raises:
            JujuPaasServiceException: if the database update raises DbException
        """
        # NOTE(review): the previous code appended the errors to the local
        # "stage" string and then discarded it; if the errors were meant to
        # reach the operation stage, the caller has to build and write it.
        if not new_error or not nsr_id:
            return

        try:
            # Update nsr
            self.update_db_2(
                "nsrs",
                nsr_id,
                {
                    "errorDescription": "Error at: " + ", ".join(error_list),
                    "errorDetail": ". ".join(error_detail_list),
                },
            )

        except DbException as e:
            error = f"Error updating NSR error description for nsr_id: {nsr_id} -> {e}"
            self.logger.error(error)
            raise JujuPaasServiceException(error) from e

    def _check_tasks_in_done(
        self,
        completed_tasks_list: list,
        created_tasks_info: dict,
        error_list: list,
        error_detail_list: list,
        logging_text: str,
    ) -> tuple:
        """Check the completed tasks to detect errors
        Args:
            completed_tasks_list    (list): List of completed tasks
            created_tasks_info (dict): Dictionary mapping each task to its description
            error_list (list): List of errors, extended in place
            error_detail_list (list): List includes details of errors, extended in place
            logging_text (str): Main log message

        Returns:
            new_error   (str):  Last detected error, empty string if none was found
            error_list  (list):  Updated error list
            error_detail_list (list):  Updated detailed error list
        """
        new_error = ""
        for task in completed_tasks_list:
            if task.cancelled():
                exc = "Cancelled"
            else:
                exc = task.exception()
            if exc:
                if isinstance(exc, asyncio.TimeoutError):
                    exc = "Timeout"
                new_error = created_tasks_info[task] + ": {}".format(exc)
                error_list.append(created_tasks_info[task])
                error_detail_list.append(new_error)
                if isinstance(
                    exc,
                    (
                        str,
                        DbException,
                        LcmException,
                        JujuPaasConnException,
                        JujuPaasServiceException,
                    ),
                ):
                    # Known error types: the message alone is logged.
                    self.logger.error(logging_text + new_error)
                else:
                    # Any other exception: log the full traceback.
                    exc_traceback = "".join(
                        traceback.format_exception(None, exc, exc.__traceback__)
                    )
                    self.logger.error(
                        logging_text + created_tasks_info[task] + " " + exc_traceback
                    )
            else:
                self.logger.debug(logging_text + created_tasks_info[task] + ": Done")

        return new_error, error_list, error_detail_list

    async def _wait_for_tasks(
        self,
        logging_text: str,
        created_tasks_info: dict,
        timeout: int,
        stage: str,
        nslcmop_id: str,
        nsr_id: str,
    ) -> list:
        """Wait for tasks to be completed.
        Args:
            logging_text    (str):  Log message
            created_tasks_info  (dict): Dictionary mapping each task to its description
            timeout (int):  Overall timeout in seconds, shared by all the tasks
            stage   (str):  Indicates the stage of operations
            nslcmop_id  (str):  NS LCM Operation ID
            nsr_id  (str):  NS service record to be used

        Returns:
            error_detail_list   (list): Detailed errors found, empty if all tasks succeeded
        """
        time_start = time()
        error_detail_list, error_list = [], []
        pending_tasks = list(created_tasks_info.keys())
        num_tasks = len(pending_tasks)
        num_done = 0

        self._write_op_status(nslcmop_id, stage=f"{stage}: {num_done}/{num_tasks}")

        while pending_tasks:
            # Remaining part of the overall timeout.
            _timeout = timeout + time_start - time()
            done, pending_tasks = await asyncio.wait(
                pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
            )
            num_done += len(done)
            if not done:
                # Timeout error
                # NOTE(review): the tasks still pending are reported as timed
                # out but are not cancelled here.
                for task in pending_tasks:
                    new_error = created_tasks_info[task] + ": Timeout"
                    error_detail_list.append(new_error)
                    error_list.append(new_error)
                break
            # Find out the errors in completed tasks
            new_error, error_list, error_detail_list = self._check_tasks_in_done(
                completed_tasks_list=done,
                created_tasks_info=created_tasks_info,
                error_detail_list=error_detail_list,
                error_list=error_list,
                logging_text=logging_text,
            )

            self._update_nsr_error_desc(
                stage=f"{stage}: {num_done}/{num_tasks}",
                new_error=new_error,
                error_list=error_list,
                error_detail_list=error_detail_list,
                nsr_id=nsr_id,
            )

            self._write_op_status(nslcmop_id, stage=f"{stage}: {num_done}/{num_tasks}")

        return error_detail_list

    def _prepare_db_before_operation(
        self,
        db_nsr_update: dict,
        nsr_id: str,
        nslcmop_id: str,
        detailed: str = None,
        operational: str = None,
        ns_state: str = None,
        current_op: str = None,
        stage: str = None,
    ) -> None:
        """Update DB before performing NS operations
        Args:
            db_nsr_update   (dict): NS record update dictionary
            nsr_id  (str):  NS record ID
            nslcmop_id  (str):  NS LCM Operation ID
            detailed:   (str):  Detailed status
            operational (str):  Operational status
            ns_state    (str):  NS state
            current_op  (str):  Current operation name
            stage   (str):  Indicates the stage of operations
        """
        db_nsr_update["detailed-status"] = detailed
        db_nsr_update["operational-status"] = operational

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=ns_state,
            current_operation=current_op,
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update,
        )
        self._write_op_status(op_id=nslcmop_id, stage=stage, queue_position=0)

    async def _report_to_kafka(
        self,
        nsr_id: str,
        nslcmop_id: str,
        nslcmop_operation_state: str,
        logging_text: str,
        message: str,
        autoremove: bool = False,
    ) -> None:
        """Report operation status to Kafka.

        Nothing is sent when nslcmop_operation_state is falsy.

        Args:
            nsr_id  (str):  NS record ID
            nslcmop_id  (str):  NS LCM Operation ID
            nslcmop_operation_state (str):  NS LCM Operation status
            logging_text    (str):  Common log message
            message (str):  Message which is sent through Kafka
            autoremove  (Boolean):  True/False If True NBI deletes NS from DB.
                            Only included when message is "terminated".

        Raises:
            PaasServiceException: if writing to the message bus raises MsgException
        """
        # The default used to be the string "False", which is truthy and would
        # have requested the removal of the NS; a real boolean is used instead.
        if nslcmop_operation_state:
            update_dict = {
                "nsr_id": nsr_id,
                "nslcmop_id": nslcmop_id,
                "operationState": nslcmop_operation_state,
            }
            if message == "terminated":
                update_dict["autoremove"] = autoremove
            try:
                await self.msg.aiowrite(
                    "ns",
                    message,
                    update_dict,
                    loop=self.loop,
                )
            except MsgException as e:
                error = logging_text + f"kafka_write notification Exception: {e}"
                self.logger.error(error)
                raise PaasServiceException(error) from e

    def _update_ns_state(self, nsr_id: str, db_nsr_update: dict, ns_state: str) -> None:
        """Update NS state in NSR and VNFRs
        Args:
            nsr_id  (str):  NS record ID
            db_nsr_update   (dict): NS record dictionary
            ns_state    (str):  NS status
        """
        db_nsr_update["_admin.nsState"] = ns_state
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": ns_state})

    def _update_db_ns_state_after_operation(
        self,
        error_list: list,
        operation_type: str,
        nslcmop_id: str,
        db_nsr_update: dict,
        db_nsr: dict,
        nsr_id: str,
    ) -> None:
        """Update NS status at database after performing operations
        Args:
            error_list  (list): List of errors
            operation_type  (str):  Type of operation such as instantiate/terminate
            nslcmop_id  (str):  NS LCM Operation ID
            db_nsr_update   (dict): NSR update dictionary
            db_nsr  (dict): NS record dictionary; the NS record is only written
                            when this is truthy
            nsr_id  (str):  NS record ID
        """
        ns_state = ""
        if error_list:
            error_detail = ". ".join(error_list)
            error_description_nsr = "Operation: {}.{}".format(
                operation_type, nslcmop_id
            )
            db_nsr_update["detailed-status"] = (
                error_description_nsr + " Detail: " + error_detail
            )
            ns_state = "BROKEN"

        else:
            error_detail = None
            error_description_nsr = None
            db_nsr_update["detailed-status"] = "Done"
            if operation_type == "instantiate":
                ns_state = "READY"
            elif operation_type == "terminate":
                ns_state = "NOT_INSTANTIATED"
                db_nsr_update["operational-status"] = "terminated"
                db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"

        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update,
            )

        if ns_state == "NOT_INSTANTIATED":
            self.db.set_list(
                "vnfrs",
                {"nsr-id-ref": nsr_id},
                {"_admin.nsState": "NOT_INSTANTIATED"},
            )

    def _update_db_nslcmop_status_after_operation(
        self, error_list: list, db_nslcmop_update: dict, nslcmop_id: str
    ) -> str:
        """Update NS LCM operation status at database after performing operation
        Args
            error_list  (list): List of errors
            db_nslcmop_update   (dict): NS LCM operation update dictionary
            nslcmop_id  (str):  NS LCM Operation ID

        Returns:
            nslcmop_operation_state (str):  "FAILED" if error_list is not empty,
                                            "COMPLETED" otherwise
        """
        if error_list:
            error_detail = ". ".join(error_list)
            error_description_nslcmop = "Detail: {}".format(error_detail)
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"

        else:
            error_description_nslcmop = None
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        self._write_op_status(
            op_id=nslcmop_id,
            stage=nslcmop_operation_state,
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )

        return nslcmop_operation_state

    def _update_db_after_operation(
        self,
        nslcmop_id: str,
        db_nsr: dict,
        nsr_id: str,
        db_nslcmop_update: dict = None,
        db_nsr_update: dict = None,
        error_list: list = None,
        operation_type: str = None,
    ) -> str:
        """Update database after operation is performed.
        Args:
            nslcmop_id  (str):  NS LCM Operation ID
            db_nsr  (dict): NS record dictionary
            nsr_id  (str):  NS record ID
            db_nslcmop_update   (dict): NS LCM operation update dictionary
            db_nsr_update   (dict): NSR update dictionary
            error_list  (list): List of errors
            operation_type  (str):  Type of operation such as instantiate/terminate

        Returns:
            nslcmop_operation_state (str):  State of NS LCM operation
        """
        # The helpers below write into both dictionaries, so the None defaults
        # must be replaced by real containers before they are used.
        if db_nslcmop_update is None:
            db_nslcmop_update = {}
        if db_nsr_update is None:
            db_nsr_update = {}

        # Update NS state
        self._update_db_ns_state_after_operation(
            error_list=error_list,
            operation_type=operation_type,
            nslcmop_id=nslcmop_id,
            db_nsr_update=db_nsr_update,
            db_nsr=db_nsr,
            nsr_id=nsr_id,
        )

        # Update NS LCM Operation State
        nslcmop_operation_state = self._update_db_nslcmop_status_after_operation(
            error_list, db_nslcmop_update, nslcmop_id
        )
        return nslcmop_operation_state

    async def instantiate(self, nsr_id: str, nslcmop_id: str) -> None:
        """Perform PaaS Service instantiation.
        Args:
            nsr_id  (str):  NS service record to be used
            nslcmop_id  (str):  NS LCM operation id
        """
        # Locking HA task
        if not self._lock_ha_task(nslcmop_id, nsr_id, keyword="instantiate"):
            return

        logging_text = f"Task ns={nsr_id} instantiate={nslcmop_id} "
        self.logger.debug(logging_text + "Enter")

        # Required containers
        db_nsr = None
        db_nsr_update, db_nslcmop_update, tasks_dict_info = {}, {}, {}
        exc = None
        error_list = []

        try:
            # Read inside the try block so that a database failure is still
            # reported and the task is still removed in the finally block.
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            # Wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
            # Update nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
            self._prepare_db_before_operation(
                db_nsr_update,
                nsr_id,
                nslcmop_id,
                detailed="creating",
                operational="init",
                ns_state="BUILDING",
                current_op="INSTANTIATING",
                stage="Building",
            )

            # Perform PaaS Service Deployment using PaaS Connector
            self.logger.debug(logging_text + "Creating instantiate task")
            task_instantiate = asyncio.ensure_future(
                self.paas_connector.instantiate(nsr_id, nslcmop_id)
            )
            self.lcm_tasks.register(
                "ns",
                nsr_id,
                nslcmop_id,
                "instantiate_juju_paas_service",
                task_instantiate,
            )
            tasks_dict_info[task_instantiate] = "Instantiate juju PaaS Service"

            # Update nsState="INSTANTIATED"
            self.logger.debug(logging_text + "INSTANTIATED")
            self._update_ns_state(nsr_id, db_nsr_update, "INSTANTIATED")

        except (
            DbException,
            LcmException,
            JujuPaasConnException,
            JujuPaasServiceException,
        ) as e:
            self.logger.error(logging_text + "Exit Exception: {}".format(e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error(logging_text + "Cancelled Exception")
            exc = "Operation was cancelled"
        except Exception as e:
            # Unexpected error: record it so the operation is not reported as
            # COMPLETED by the finally block, then let it propagate as before.
            self.logger.critical(
                logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
                exc_info=True,
            )
            exc = e
            raise

        finally:
            if exc:
                error_list.append(str(exc))
            try:
                if tasks_dict_info:
                    # Wait for pending tasks
                    stage = "Waiting for instantiate pending tasks."
                    self.logger.debug(logging_text + stage)
                    error_list += await self._wait_for_tasks(
                        logging_text,
                        tasks_dict_info,
                        self.timeout_ns_deploy,
                        stage,
                        nslcmop_id,
                        nsr_id=nsr_id,
                    )
            except asyncio.CancelledError:
                error_list.append("Cancelled")
            except Exception as wait_error:
                error_list.append(str(wait_error))

            # Update operational-status
            # NOTE(review): "running" is set even when error_list is not
            # empty - confirm this is the intended status for a failed deploy.
            self.logger.debug("updating operational status")
            db_nsr_update["operational-status"] = "running"

            # Update status at database after operation
            self.logger.debug(logging_text + "Updating DB after operation")
            nslcmop_operation_state = self._update_db_after_operation(
                nslcmop_id,
                db_nsr,
                nsr_id,
                db_nslcmop_update=db_nslcmop_update,
                db_nsr_update=db_nsr_update,
                error_list=error_list,
                operation_type="instantiate",
            )

            # Write to Kafka bus to report the operation status
            await self._report_to_kafka(
                nsr_id,
                nslcmop_id,
                nslcmop_operation_state,
                logging_text,
                "instantiated",
            )
            self.logger.debug(logging_text + "Exit")

            # Remove task
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")

    async def terminate(self, nsr_id: str, nslcmop_id: str) -> None:
        """Perform PaaS Service termination.
        Args:
            nsr_id  (str):  NS service record to be used
            nslcmop_id  (str):  NS LCM operation id
        """
        # Locking HA task
        if not self._lock_ha_task(nslcmop_id, nsr_id, keyword="terminate"):
            return

        logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")

        # Required containers and defaults used by the finally block even when
        # the database reads below fail.
        timeout_ns_terminate = self.timeout_ns_deploy
        db_nsr = None
        operation_params = {}
        autoremove = False
        db_nsr_update, db_nslcmop_update, tasks_dict_info = {}, {}, {}
        exc = None
        error_list = []

        try:
            # Read inside the try block so that a database failure is still
            # reported and the task is still removed in the finally block.
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            operation_params = db_nslcmop.get("operationParams") or {}

            # Update ns termination timeout
            if operation_params.get("timeout_ns_terminate"):
                timeout_ns_terminate = operation_params["timeout_ns_terminate"]

            # Wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

            # Update nsState="TERMINATING", currentOperation="TERMINATING", currentOperationID=nslcmop_id
            self._prepare_db_before_operation(
                db_nsr_update,
                nsr_id,
                nslcmop_id,
                detailed="terminating",
                operational="terminate",
                ns_state="TERMINATING",
                current_op="TERMINATING",
                stage="terminating",
            )

            # Perform PaaS Service deletion using PaaS Connector
            self.logger.debug(logging_text + "Creating terminate task")
            task_terminate = asyncio.ensure_future(
                self.paas_connector.terminate(nsr_id, nslcmop_id)
            )
            self.lcm_tasks.register(
                "ns", nsr_id, nslcmop_id, "terminate_juju_paas_service", task_terminate
            )
            tasks_dict_info[task_terminate] = "Terminate juju PaaS Service"

            # Update nsState="TERMINATED"
            self.logger.debug(logging_text + "TERMINATED")
            self._update_ns_state(nsr_id, db_nsr_update, "TERMINATED")

        except (
            DbException,
            LcmException,
            JujuPaasConnException,
            JujuPaasServiceException,
        ) as e:
            self.logger.error(logging_text + "Exit Exception: {}".format(e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error(logging_text + "Cancelled Exception")
            exc = "Operation was cancelled"
        except Exception as e:
            # Unexpected error: record it so the operation is not reported as
            # COMPLETED by the finally block, then let it propagate as before.
            self.logger.critical(
                logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
                exc_info=True,
            )
            exc = e
            raise

        finally:
            if exc:
                error_list.append(str(exc))
            try:
                if tasks_dict_info:
                    # Wait for pending tasks
                    stage = "Waiting for pending tasks for termination."
                    self.logger.debug(logging_text + stage)
                    error_list += await self._wait_for_tasks(
                        logging_text,
                        tasks_dict_info,
                        min(self.timeout_ns_deploy, timeout_ns_terminate),
                        stage,
                        nslcmop_id,
                        nsr_id=nsr_id,
                    )
            except asyncio.CancelledError:
                error_list.append("Cancelled")
            except Exception as wait_error:
                error_list.append(str(wait_error))

            # Update status at database
            nslcmop_operation_state = self._update_db_after_operation(
                nslcmop_id,
                db_nsr,
                nsr_id,
                db_nslcmop_update=db_nslcmop_update,
                db_nsr_update=db_nsr_update,
                error_list=error_list,
                operation_type="terminate",
            )

            # Write to Kafka bus to report the operation status
            if operation_params:
                autoremove = operation_params.get("autoremove", False)

            await self._report_to_kafka(
                nsr_id,
                nslcmop_id,
                nslcmop_operation_state,
                logging_text,
                "terminated",
                autoremove=autoremove,
            )
            self.logger.debug(logging_text + "Exit")

            # Remove task
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")

    async def action(self, nsr_id: str, nslcmop_id: str):
        """Perform action on PaaS service.
        Args:
            nsr_id  (str):  NS service record to be used
            nslcmop_id  (str):  NS LCM operation id

        Raises:
            NotImplementedError
        """
        raise NotImplementedError("Juju Paas Service action method is not implemented")