| Gulsum Atici | e7e0b75 | 2022-09-30 14:31:26 +0300 | [diff] [blame] | 1 | # !/usr/bin/python3 |
| 2 | # |
| 3 | # Copyright 2022 Canonical Ltd. |
| 4 | # |
| 5 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 | # you may not use this file except in compliance with the License. |
| 7 | # You may obtain a copy of the License at |
| 8 | # |
| 9 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | # |
| 11 | # Unless required by applicable law or agreed to in writing, software |
| 12 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | # See the License for the specific language governing permissions and |
| 15 | # limitations under the License. |
| 16 | |
| 17 | import asyncio |
| 18 | import logging |
| 19 | import traceback |
| 20 | |
| 21 | from osm_common.dbbase import DbException |
| 22 | from osm_common.msgbase import MsgException |
| 23 | from osm_lcm.lcm_utils import LcmBase |
| 24 | from osm_lcm.lcm_utils import LcmException |
| 25 | from osm_lcm.paas_conn import JujuPaasConnException, paas_connector_factory |
| 26 | |
| 27 | from time import time |
| 28 | |
| 29 | |
| 30 | def paas_service_factory( |
| 31 | msg: object, |
| 32 | lcm_tasks: object, |
| 33 | db: object, |
| 34 | fs: object, |
| 35 | log: object, |
| 36 | loop: object, |
| 37 | config: dict, |
| 38 | paas_type="juju", |
| 39 | ) -> object: |
| 40 | """Factory Method to create the paas_service objects according to PaaS Type. |
| 41 | Args: |
| 42 | msg (object): Message object to be used to write the messages to Kafka Bus |
| 43 | lcm_tasks (object): Task object to register the tasks |
| 44 | db (object): Database object to write current operation status |
| 45 | fs (object): Filesystem object to use during operations |
| 46 | log (object) Logger for tracing |
| 47 | loop (object) Async event loop object |
| 48 | config (dict): Dictionary with extra PaaS Service information. |
| 49 | paas_type (str): Identifier to create paas_service object using correct PaaS Service Class |
| 50 | |
| 51 | Returns: |
| 52 | paas_service (object): paas_service objects created according to given PaaS Type |
| 53 | |
| 54 | Raises: |
| 55 | PaasServiceException |
| 56 | """ |
| 57 | orchestrators = { |
| 58 | "juju": JujuPaasService, |
| 59 | } |
| 60 | |
| 61 | if paas_type not in orchestrators.keys(): |
| 62 | raise PaasServiceException(f"PaaS type: {paas_type} is not available.") |
| 63 | |
| 64 | return orchestrators[paas_type]( |
| 65 | msg=msg, lcm_tasks=lcm_tasks, db=db, fs=fs, loop=loop, logger=log, config=config |
| 66 | ) |
| 67 | |
| 68 | |
| 69 | class PaasServiceException(Exception): |
| 70 | """PaaS Service Exception Base Class""" |
| 71 | |
| 72 | def __init__(self, message: str = ""): |
| 73 | """Constructor of PaaS Service Exception |
| 74 | Args: |
| 75 | message (str): error message to be raised |
| 76 | """ |
| 77 | Exception.__init__(self, message) |
| 78 | self.message = message |
| 79 | |
| 80 | def __str__(self): |
| 81 | return self.message |
| 82 | |
| 83 | def __repr__(self): |
| 84 | return "{}({})".format(type(self), self.message) |
| 85 | |
| 86 | |
| 87 | class JujuPaasServiceException(PaasServiceException): |
| 88 | """Juju PaaS Service exception class""" |
| 89 | |
| 90 | |
| 91 | class JujuPaasService(LcmBase): |
| 92 | """Juju PaaS Service class to handle ns operations such as instantiate, terminate, action etc.""" |
| 93 | |
| 94 | timeout_ns_deploy = 3600 |
| 95 | |
| 96 | def __init__( |
| 97 | self, |
| 98 | msg: object, |
| 99 | lcm_tasks: object, |
| 100 | db: object, |
| 101 | fs: object, |
| 102 | loop: object, |
| 103 | logger: object, |
| 104 | config: dict, |
| 105 | ): |
| 106 | """ |
| 107 | Args: |
| 108 | msg (object): Message object to be used to write the messages to Kafka Bus |
| 109 | lcm_tasks (object): Task object to register the tasks |
| 110 | db (object): Database object to write current operation status |
| 111 | fs (object): Filesystem object to use during operations |
| 112 | loop (object) Async event loop object |
| 113 | logger (object): Logger for tracing |
| 114 | config (dict): Dictionary with extra PaaS Service information. |
| 115 | """ |
| 116 | self.logger = logging.getLogger("lcm.juju_paas_service") |
| 117 | self.loop = loop |
| 118 | self.lcm_tasks = lcm_tasks |
| 119 | self.config = config |
| 120 | super(JujuPaasService, self).__init__(msg=msg, logger=self.logger) |
| 121 | |
| 122 | self.paas_connector = paas_connector_factory( |
| 123 | self.msg, |
| 124 | self.lcm_tasks, |
| 125 | self.db, |
| 126 | self.fs, |
| 127 | self.loop, |
| 128 | self.logger, |
| 129 | self.config, |
| 130 | "juju", |
| 131 | ) |
| 132 | |
| 133 | def _lock_ha_task(self, nslcmop_id: str, nsr_id: str, keyword: str) -> bool: |
| 134 | """Lock the task. |
| 135 | Args: |
| 136 | nslcmop_id (str): NS LCM operation id |
| 137 | nsr_id (str): NS service record to be used |
| 138 | keyword (str): Word which indicates action such as instantiate, terminate |
| 139 | |
| 140 | Returns: |
| 141 | task_is_locked_by_me (Boolean): True if task_is_locked_by_me else False |
| 142 | """ |
| 143 | task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id) |
| 144 | if not task_is_locked_by_me: |
| 145 | self.logger.debug( |
| 146 | f"{keyword}() task is not locked by me, ns={nsr_id}, exiting." |
| 147 | ) |
| 148 | return task_is_locked_by_me |
| 149 | |
| 150 | def _write_ns_status( |
| 151 | self, |
| 152 | nsr_id: str, |
| 153 | ns_state: str, |
| 154 | current_operation: str, |
| 155 | current_operation_id: str, |
| 156 | error_description: str = None, |
| 157 | error_detail: str = None, |
| 158 | other_update: dict = None, |
| 159 | ) -> None: |
| 160 | """Update NS record. |
| 161 | Args: |
| 162 | nsr_id (str): NS service record to be used |
| 163 | ns_state (str): NS state |
| 164 | current_operation (str): Current operation name |
| 165 | current_operation_id (str): Current operation ID |
| 166 | error_description (str): Error description |
| 167 | error_detail (str): Details of error |
| 168 | other_update: (dict): Other required changes at database if provided |
| 169 | |
| 170 | Raises: |
| 171 | DbException |
| 172 | """ |
| 173 | try: |
| 174 | db_dict = other_update or {} |
| 175 | db_update_dict = { |
| 176 | "_admin.nslcmop": current_operation_id, |
| 177 | "_admin.current-operation": current_operation_id, |
| 178 | "_admin.operation-type": current_operation |
| 179 | if current_operation != "IDLE" |
| 180 | else None, |
| 181 | "currentOperation": current_operation, |
| 182 | "currentOperationID": current_operation_id, |
| 183 | "errorDescription": error_description, |
| 184 | "errorDetail": error_detail, |
| 185 | } |
| 186 | db_dict.update(db_update_dict) |
| 187 | |
| 188 | if ns_state: |
| 189 | db_dict["nsState"] = ns_state |
| 190 | self.update_db_2("nsrs", nsr_id, db_dict) |
| 191 | |
| 192 | except DbException as e: |
| 193 | error = f"Error writing NS status, ns={nsr_id}: {e}" |
| 194 | self.logger.error(error) |
| 195 | raise JujuPaasServiceException(error) |
| 196 | |
| 197 | def _write_op_status( |
| 198 | self, |
| 199 | op_id: str, |
| 200 | stage: str = None, |
| 201 | error_message: str = None, |
| 202 | queue_position: int = 0, |
| 203 | operation_state: str = None, |
| 204 | other_update: dict = None, |
| 205 | ) -> None: |
| 206 | """Update NS LCM Operation Status. |
| 207 | Args: |
| 208 | op_id (str): Operation ID |
| 209 | stage (str): Indicates the stage of operations |
| 210 | error_message (str): Error description |
| 211 | queue_position (int): Operation position in the queue |
| 212 | operation_state (str): State of operation |
| 213 | other_update: (dict): Other required changes at database if provided |
| 214 | |
| 215 | Raises: |
| 216 | DbException |
| 217 | """ |
| 218 | try: |
| 219 | db_dict = other_update or {} |
| 220 | db_dict["queuePosition"] = queue_position |
| 221 | if stage: |
| 222 | db_dict["stage"] = str(stage) |
| 223 | if error_message: |
| 224 | db_dict["errorMessage"] = error_message |
| 225 | if operation_state: |
| 226 | db_dict["operationState"] = operation_state |
| 227 | db_dict["statusEnteredTime"] = time() |
| 228 | self.update_db_2("nslcmops", op_id, db_dict) |
| 229 | |
| 230 | except DbException as e: |
| 231 | error = f"Error writing OPERATION status for op_id: {op_id} -> {e}" |
| 232 | self.logger.error(error) |
| 233 | raise JujuPaasServiceException(error) |
| 234 | |
| 235 | def _update_nsr_error_desc( |
| 236 | self, |
| 237 | stage: str, |
| 238 | new_error: str, |
| 239 | error_list: list, |
| 240 | error_detail_list: list, |
| 241 | nsr_id: str, |
| 242 | ) -> None: |
| 243 | """Update error description in NS record. |
| 244 | Args: |
| 245 | stage (str): Indicates the stage of operations |
| 246 | new_error (str): New detected error |
| 247 | error_list (str): Updated error list |
| 248 | error_detail_list: Updated detailed error list |
| 249 | nsr_id (str): NS service record to be used |
| 250 | |
| 251 | Raises: |
| 252 | DbException |
| 253 | """ |
| 254 | if new_error: |
| 255 | stage += " Errors: " + ". ".join(error_detail_list) + "." |
| 256 | if nsr_id: |
| 257 | try: |
| 258 | # Update nsr |
| 259 | self.update_db_2( |
| 260 | "nsrs", |
| 261 | nsr_id, |
| 262 | { |
| 263 | "errorDescription": "Error at: " + ", ".join(error_list), |
| 264 | "errorDetail": ". ".join(error_detail_list), |
| 265 | }, |
| 266 | ) |
| 267 | |
| 268 | except DbException as e: |
| 269 | error = f"Error updating NSR error description for nsr_id: {nsr_id} -> {e}" |
| 270 | self.logger.error(error) |
| 271 | raise JujuPaasServiceException(error) |
| 272 | |
| 273 | def _check_tasks_in_done( |
| 274 | self, |
| 275 | completed_tasks_list: list, |
| 276 | created_tasks_info: dict, |
| 277 | error_list: list, |
| 278 | error_detail_list: list, |
| 279 | logging_text: str, |
| 280 | ) -> (str, str, str): |
| 281 | """Check the completed tasks to detect errors |
| 282 | Args: |
| 283 | completed_tasks_list (list): List of completed tasks |
| 284 | created_tasks_info: Dictionary which includes the tasks |
| 285 | error_list: List of errors |
| 286 | error_detail_list: List includes details of errors |
| 287 | logging_text: Main log message |
| 288 | |
| 289 | Returns: |
| 290 | new_error (str): New detected error |
| 291 | error_list (str): Updated error list |
| 292 | error_detail_list: Updated detailed error list |
| 293 | """ |
| 294 | new_error = "" |
| 295 | for task in completed_tasks_list: |
| 296 | if task.cancelled(): |
| 297 | exc = "Cancelled" |
| 298 | else: |
| 299 | exc = task.exception() |
| 300 | if exc: |
| 301 | if isinstance(exc, asyncio.TimeoutError): |
| 302 | exc = "Timeout" |
| 303 | new_error = created_tasks_info[task] + ": {}".format(exc) |
| 304 | error_list.append(created_tasks_info[task]) |
| 305 | error_detail_list.append(new_error) |
| 306 | if isinstance( |
| 307 | exc, |
| 308 | ( |
| 309 | str, |
| 310 | DbException, |
| 311 | LcmException, |
| 312 | JujuPaasConnException, |
| 313 | JujuPaasServiceException, |
| 314 | ), |
| 315 | ): |
| 316 | self.logger.error(logging_text + new_error) |
| 317 | else: |
| 318 | exc_traceback = "".join( |
| 319 | traceback.format_exception(None, exc, exc.__traceback__) |
| 320 | ) |
| 321 | self.logger.error( |
| 322 | logging_text + created_tasks_info[task] + " " + exc_traceback |
| 323 | ) |
| 324 | else: |
| 325 | self.logger.debug(logging_text + created_tasks_info[task] + ": Done") |
| 326 | |
| 327 | return new_error, error_list, error_detail_list |
| 328 | |
| 329 | async def _wait_for_tasks( |
| 330 | self, |
| 331 | logging_text: str, |
| 332 | created_tasks_info: dict, |
| 333 | timeout: int, |
| 334 | stage: str, |
| 335 | nslcmop_id: str, |
| 336 | nsr_id: str, |
| 337 | ) -> None: |
| 338 | """Wait for tasks to be completed. |
| 339 | Args: |
| 340 | logging_text (str): Log message |
| 341 | created_tasks_info (dict): Dictionary which includes the tasks |
| 342 | timeout (inst): Timeout in seconds |
| 343 | stage (str): Indicates the stage of operations |
| 344 | nslcmop_id (str): NS LCM Operation ID |
| 345 | nsr_id (str): NS service record to be used |
| 346 | """ |
| 347 | time_start = time() |
| 348 | error_detail_list, error_list = [], [] |
| 349 | pending_tasks = list(created_tasks_info.keys()) |
| 350 | num_tasks = len(pending_tasks) |
| 351 | num_done = 0 |
| 352 | |
| 353 | self._write_op_status(nslcmop_id, stage=f"{stage}: {num_done}/{num_tasks}") |
| 354 | |
| 355 | while pending_tasks: |
| 356 | _timeout = timeout + time_start - time() |
| 357 | done, pending_tasks = await asyncio.wait( |
| 358 | pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED |
| 359 | ) |
| 360 | num_done += len(done) |
| 361 | if not done: |
| 362 | # Timeout error |
| 363 | for task in pending_tasks: |
| 364 | new_error = created_tasks_info[task] + ": Timeout" |
| 365 | error_detail_list.append(new_error) |
| 366 | error_list.append(new_error) |
| 367 | break |
| 368 | # Find out the errors in completed tasks |
| 369 | new_error, error_list, error_detail_list = self._check_tasks_in_done( |
| 370 | completed_tasks_list=done, |
| 371 | created_tasks_info=created_tasks_info, |
| 372 | error_detail_list=error_detail_list, |
| 373 | error_list=error_list, |
| 374 | logging_text=logging_text, |
| 375 | ) |
| 376 | |
| 377 | self._update_nsr_error_desc( |
| 378 | stage=f"{stage}: {num_done}/{num_tasks}", |
| 379 | new_error=new_error, |
| 380 | error_list=error_list, |
| 381 | error_detail_list=error_detail_list, |
| 382 | nsr_id=nsr_id, |
| 383 | ) |
| 384 | |
| 385 | self._write_op_status(nslcmop_id, stage=f"{stage}: {num_done}/{num_tasks}") |
| 386 | |
| 387 | return error_detail_list |
| 388 | |
| 389 | def _prepare_db_before_operation( |
| 390 | self, |
| 391 | db_nsr_update: dict, |
| 392 | nsr_id: str, |
| 393 | nslcmop_id: str, |
| 394 | detailed: str = None, |
| 395 | operational: str = None, |
| 396 | ns_state: str = None, |
| 397 | current_op: str = None, |
| 398 | stage: str = None, |
| 399 | ) -> None: |
| 400 | """Update DB before performing NS operations |
| 401 | Args: |
| 402 | db_nsr_update (dict): NS record update dictionary |
| 403 | nsr_id (str): NS record ID |
| 404 | nslcmop_id (str): NS LCM Operation ID |
| 405 | detailed: (str): Detailed status |
| 406 | operational (str): Operational status |
| 407 | ns_state (str): NS state |
| 408 | current_op (str): Current operation name |
| 409 | stage (str): Indicates the stage of operations |
| 410 | """ |
| 411 | db_nsr_update["detailed-status"] = detailed |
| 412 | db_nsr_update["operational-status"] = operational |
| 413 | |
| 414 | self._write_ns_status( |
| 415 | nsr_id=nsr_id, |
| 416 | ns_state=ns_state, |
| 417 | current_operation=current_op, |
| 418 | current_operation_id=nslcmop_id, |
| 419 | other_update=db_nsr_update, |
| 420 | ) |
| 421 | self._write_op_status(op_id=nslcmop_id, stage=stage, queue_position=0) |
| 422 | |
| 423 | async def _report_to_kafka( |
| 424 | self, |
| 425 | nsr_id: str, |
| 426 | nslcmop_id: str, |
| 427 | nslcmop_operation_state: str, |
| 428 | logging_text: str, |
| 429 | message: str, |
| 430 | autoremove="False", |
| 431 | ) -> None: |
| 432 | """Report operation status to Kafka. |
| 433 | Args: |
| 434 | nsr_id (str): NS record ID |
| 435 | nslcmop_id (str): NS LCM Operation ID |
| 436 | nslcmop_operation_state (str): NS LCM Operation status |
| 437 | logging_text (str): Common log message |
| 438 | message (str): Message which is sent through Kafka |
| 439 | autoremove (Boolean): True/False If True NBI deletes NS from DB |
| 440 | |
| 441 | Raises: |
| 442 | PaasServiceException |
| 443 | """ |
| 444 | if nslcmop_operation_state: |
| 445 | update_dict = { |
| 446 | "nsr_id": nsr_id, |
| 447 | "nslcmop_id": nslcmop_id, |
| 448 | "operationState": nslcmop_operation_state, |
| 449 | } |
| 450 | if message == "terminated": |
| 451 | update_dict["autoremove"] = autoremove |
| 452 | try: |
| 453 | await self.msg.aiowrite( |
| 454 | "ns", |
| 455 | message, |
| 456 | update_dict, |
| 457 | loop=self.loop, |
| 458 | ) |
| 459 | except MsgException as e: |
| 460 | error = logging_text + f"kafka_write notification Exception: {e}" |
| 461 | self.logger.error(error) |
| 462 | raise PaasServiceException(error) |
| 463 | |
| 464 | def _update_ns_state(self, nsr_id: str, db_nsr_update: dict, ns_state: str) -> None: |
| 465 | """Update NS state in NSR and VNFRs |
| 466 | Args: |
| 467 | nsr_id (str): NS record ID |
| 468 | db_nsr_update (dict): NS record dictionary |
| 469 | ns_state (str): NS status |
| 470 | """ |
| 471 | db_nsr_update["_admin.nsState"] = ns_state |
| 472 | self.update_db_2("nsrs", nsr_id, db_nsr_update) |
| 473 | self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": ns_state}) |
| 474 | |
| 475 | def _update_db_ns_state_after_operation( |
| 476 | self, |
| 477 | error_list: list, |
| 478 | operation_type: str, |
| 479 | nslcmop_id: str, |
| 480 | db_nsr_update: dict, |
| 481 | db_nsr: dict, |
| 482 | nsr_id: str, |
| 483 | ) -> None: |
| 484 | """Update NS status at database after performing operations |
| 485 | Args: |
| 486 | error_list (list): List of errors |
| 487 | operation_type (str): Type of operation such as instantiate/terminate |
| 488 | nslcmop_id (str): NS LCM Operation ID |
| 489 | db_nsr_update (dict): NSR update dictionary |
| 490 | db_nsr (dict): NS record dictionary |
| 491 | nsr_id (str): NS record ID |
| 492 | """ |
| 493 | ns_state = "" |
| 494 | if error_list: |
| 495 | error_detail = ". ".join(error_list) |
| 496 | error_description_nsr = "Operation: {}.{}".format( |
| 497 | operation_type, nslcmop_id |
| 498 | ) |
| 499 | db_nsr_update["detailed-status"] = ( |
| 500 | error_description_nsr + " Detail: " + error_detail |
| 501 | ) |
| 502 | ns_state = "BROKEN" |
| 503 | |
| 504 | else: |
| 505 | error_detail = None |
| 506 | error_description_nsr = None |
| 507 | db_nsr_update["detailed-status"] = "Done" |
| 508 | if operation_type == "instantiate": |
| 509 | ns_state = "READY" |
| 510 | elif operation_type == "terminate": |
| 511 | ns_state = "NOT_INSTANTIATED" |
| 512 | db_nsr_update["operational-status"] = "terminated" |
| 513 | db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED" |
| 514 | |
| 515 | if db_nsr: |
| 516 | self._write_ns_status( |
| 517 | nsr_id=nsr_id, |
| 518 | ns_state=ns_state, |
| 519 | current_operation="IDLE", |
| 520 | current_operation_id=None, |
| 521 | error_description=error_description_nsr, |
| 522 | error_detail=error_detail, |
| 523 | other_update=db_nsr_update, |
| 524 | ) |
| 525 | |
| 526 | if ns_state == "NOT_INSTANTIATED": |
| 527 | self.db.set_list( |
| 528 | "vnfrs", |
| 529 | {"nsr-id-ref": nsr_id}, |
| 530 | {"_admin.nsState": "NOT_INSTANTIATED"}, |
| 531 | ) |
| 532 | |
| 533 | def _update_db_nslcmop_status_after_operation( |
| 534 | self, error_list: list, db_nslcmop_update: dict, nslcmop_id: str |
| 535 | ) -> str: |
| 536 | """Update NS LCM operation status at database after performing operation |
| 537 | Args |
| 538 | error_list (list): List of errors |
| 539 | db_nslcmop_update (dict): NS LCM operation update dictionary |
| 540 | nslcmop_id (str): NS LCM Operation ID |
| 541 | |
| 542 | Returns: |
| 543 | nslcmop_operation_state (str): State of NS LCM operation |
| 544 | """ |
| 545 | if error_list: |
| 546 | error_detail = ". ".join(error_list) |
| 547 | error_description_nslcmop = "Detail: {}".format(error_detail) |
| 548 | db_nslcmop_update["detailed-status"] = error_detail |
| 549 | nslcmop_operation_state = "FAILED" |
| 550 | |
| 551 | else: |
| 552 | error_description_nslcmop = None |
| 553 | db_nslcmop_update["detailed-status"] = "Done" |
| 554 | nslcmop_operation_state = "COMPLETED" |
| 555 | |
| 556 | self._write_op_status( |
| 557 | op_id=nslcmop_id, |
| 558 | stage=nslcmop_operation_state, |
| 559 | error_message=error_description_nslcmop, |
| 560 | operation_state=nslcmop_operation_state, |
| 561 | other_update=db_nslcmop_update, |
| 562 | ) |
| 563 | |
| 564 | return nslcmop_operation_state |
| 565 | |
| 566 | def _update_db_after_operation( |
| 567 | self, |
| 568 | nslcmop_id: str, |
| 569 | db_nsr: str, |
| 570 | nsr_id: str, |
| 571 | db_nslcmop_update: dict = None, |
| 572 | db_nsr_update: dict = None, |
| 573 | error_list: list = None, |
| 574 | operation_type: str = None, |
| 575 | ) -> str: |
| 576 | """Update database after operation is performed. |
| 577 | Args: |
| 578 | nslcmop_id (str): NS LCM Operation ID |
| 579 | db_nsr (dict): NS record dictionary |
| 580 | nsr_id (str): NS record ID |
| 581 | db_nslcmop_update (dict): NS LCM operation update dictionary |
| 582 | db_nsr_update (dict): NSR update dictionary |
| 583 | error_list (list): List of errors |
| 584 | operation_type (str): Type of operation such as instantiate/terminate |
| 585 | |
| 586 | Returns: |
| 587 | nslcmop_operation_state (str): State of NS LCM operation |
| 588 | """ |
| 589 | # Update NS state |
| 590 | self._update_db_ns_state_after_operation( |
| 591 | error_list=error_list, |
| 592 | operation_type=operation_type, |
| 593 | nslcmop_id=nslcmop_id, |
| 594 | db_nsr_update=db_nsr_update, |
| 595 | db_nsr=db_nsr, |
| 596 | nsr_id=nsr_id, |
| 597 | ) |
| 598 | |
| 599 | # Update NS LCM Operation State |
| 600 | nslcmop_operation_state = self._update_db_nslcmop_status_after_operation( |
| 601 | error_list, db_nslcmop_update, nslcmop_id |
| 602 | ) |
| 603 | return nslcmop_operation_state |
| 604 | |
| 605 | async def instantiate(self, nsr_id: str, nslcmop_id: str) -> None: |
| 606 | """Perform PaaS Service instantiation. |
| 607 | Args: |
| 608 | nsr_id (str): NS service record to be used |
| 609 | nslcmop_id (str): NS LCM operation id |
| 610 | """ |
| 611 | # Locking HA task |
| 612 | if not self._lock_ha_task(nslcmop_id, nsr_id, keyword="instantiate"): |
| 613 | return |
| 614 | |
| 615 | logging_text = f"Task ns={nsr_id} instantiate={nslcmop_id} " |
| 616 | self.logger.debug(logging_text + "Enter") |
| 617 | |
| 618 | # Required containers |
| 619 | db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) |
| 620 | db_nsr_update, db_nslcmop_update, tasks_dict_info = {}, {}, {} |
| 621 | exc = None |
| 622 | error_list = [] |
| 623 | |
| 624 | try: |
| 625 | # Wait for any previous tasks in process |
| 626 | await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id) |
| 627 | # Update nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id |
| 628 | self._prepare_db_before_operation( |
| 629 | db_nsr_update, |
| 630 | nsr_id, |
| 631 | nslcmop_id, |
| 632 | detailed="creating", |
| 633 | operational="init", |
| 634 | ns_state="BUILDING", |
| 635 | current_op="INSTANTIATING", |
| 636 | stage="Building", |
| 637 | ) |
| 638 | |
| 639 | # Perform PaaS Service Deployment using PaaS Connector |
| 640 | self.logger.debug(logging_text + "Creating instantiate task") |
| 641 | task_instantiate = asyncio.ensure_future( |
| 642 | self.paas_connector.instantiate(nsr_id, nslcmop_id) |
| 643 | ) |
| 644 | self.lcm_tasks.register( |
| 645 | "ns", |
| 646 | nsr_id, |
| 647 | nslcmop_id, |
| 648 | "instantiate_juju_paas_service", |
| 649 | task_instantiate, |
| 650 | ) |
| 651 | tasks_dict_info[task_instantiate] = "Instantiate juju PaaS Service" |
| 652 | |
| 653 | # Update nsState="INSTANTIATED" |
| 654 | self.logger.debug(logging_text + "INSTANTIATED") |
| 655 | self._update_ns_state(nsr_id, db_nsr_update, "INSTANTIATED") |
| 656 | |
| 657 | except ( |
| 658 | DbException, |
| 659 | LcmException, |
| 660 | JujuPaasConnException, |
| 661 | JujuPaasServiceException, |
| 662 | ) as e: |
| 663 | self.logger.error(logging_text + "Exit Exception: {}".format(e)) |
| 664 | exc = e |
| 665 | except asyncio.CancelledError: |
| 666 | self.logger.error(logging_text + "Cancelled Exception") |
| 667 | exc = "Operation was cancelled" |
| 668 | |
| 669 | finally: |
| 670 | if exc: |
| 671 | error_list.append(str(exc)) |
| 672 | try: |
| 673 | if tasks_dict_info: |
| 674 | # Wait for pending tasks |
| 675 | stage = "Waiting for instantiate pending tasks." |
| 676 | self.logger.debug(logging_text + stage) |
| 677 | error_list += await self._wait_for_tasks( |
| 678 | logging_text, |
| 679 | tasks_dict_info, |
| 680 | self.timeout_ns_deploy, |
| 681 | stage, |
| 682 | nslcmop_id, |
| 683 | nsr_id=nsr_id, |
| 684 | ) |
| 685 | except asyncio.CancelledError: |
| 686 | error_list.append("Cancelled") |
| 687 | except Exception as exc: |
| 688 | error_list.append(str(exc)) |
| 689 | |
| 690 | # Update operational-status |
| 691 | self.logger.debug("updating operational status") |
| 692 | db_nsr_update["operational-status"] = "running" |
| 693 | |
| 694 | # Update status at database after operation |
| 695 | self.logger.debug(logging_text + "Updating DB after operation") |
| 696 | nslcmop_operation_state = self._update_db_after_operation( |
| 697 | nslcmop_id, |
| 698 | db_nsr, |
| 699 | nsr_id, |
| 700 | db_nslcmop_update=db_nslcmop_update, |
| 701 | db_nsr_update=db_nsr_update, |
| 702 | error_list=error_list, |
| 703 | operation_type="instantiate", |
| 704 | ) |
| 705 | |
| 706 | # Write to Kafka bus to report the operation status |
| 707 | await self._report_to_kafka( |
| 708 | nsr_id, |
| 709 | nslcmop_id, |
| 710 | nslcmop_operation_state, |
| 711 | logging_text, |
| 712 | "instantiated", |
| 713 | ) |
| 714 | self.logger.debug(logging_text + "Exit") |
| 715 | |
| 716 | # Remove task |
| 717 | self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate") |
| 718 | |
| 719 | async def terminate(self, nsr_id: str, nslcmop_id: str) -> None: |
| 720 | """Perform PaaS Service termination. |
| 721 | Args: |
| 722 | nsr_id (str): NS service record to be used |
| 723 | nslcmop_id (str): NS LCM operation id |
| 724 | """ |
| 725 | # Locking HA task |
| 726 | if not self._lock_ha_task(nslcmop_id, nsr_id, keyword="terminate"): |
| 727 | return |
| 728 | |
| 729 | logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id) |
| 730 | self.logger.debug(logging_text + "Enter") |
| 731 | |
| 732 | # Update ns termination timeout |
| 733 | timeout_ns_terminate = self.timeout_ns_deploy |
| 734 | db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) |
| 735 | db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) |
| 736 | operation_params = db_nslcmop.get("operationParams") or {} |
| 737 | |
| 738 | if operation_params.get("timeout_ns_terminate"): |
| 739 | timeout_ns_terminate = operation_params["timeout_ns_terminate"] |
| 740 | |
| 741 | # Required containers |
| 742 | autoremove = False |
| 743 | db_nsr_update, db_nslcmop_update, tasks_dict_info = {}, {}, {} |
| 744 | exc = None |
| 745 | error_list = [] |
| 746 | |
| 747 | try: |
| 748 | # Wait for any previous tasks in process |
| 749 | await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id) |
| 750 | |
| 751 | # Update nsState="TERMINATING", currentOperation="TERMINATING", currentOperationID=nslcmop_id |
| 752 | self._prepare_db_before_operation( |
| 753 | db_nsr_update, |
| 754 | nsr_id, |
| 755 | nslcmop_id, |
| 756 | detailed="terminating", |
| 757 | operational="terminate", |
| 758 | ns_state="TERMINATING", |
| 759 | current_op="TERMINATING", |
| 760 | stage="terminating", |
| 761 | ) |
| 762 | |
| 763 | # Perform PaaS Service deletion using PaaS Connector |
| 764 | self.logger.debug(logging_text + "Creating terminate task") |
| 765 | task_terminate = asyncio.ensure_future( |
| 766 | self.paas_connector.terminate(nsr_id, nslcmop_id) |
| 767 | ) |
| 768 | self.lcm_tasks.register( |
| 769 | "ns", nsr_id, nslcmop_id, "terminate_juju_paas_service", task_terminate |
| 770 | ) |
| 771 | tasks_dict_info[task_terminate] = "Terminate juju PaaS Service" |
| 772 | |
| 773 | # Update nsState="TERMINATED" |
| 774 | self.logger.debug(logging_text + "TERMINATED") |
| 775 | self._update_ns_state(nsr_id, db_nsr_update, "TERMINATED") |
| 776 | |
| 777 | except ( |
| 778 | DbException, |
| 779 | LcmException, |
| 780 | JujuPaasConnException, |
| 781 | JujuPaasServiceException, |
| 782 | ) as e: |
| 783 | self.logger.error(logging_text + "Exit Exception: {}".format(e)) |
| 784 | exc = e |
| 785 | except asyncio.CancelledError: |
| 786 | self.logger.error(logging_text + "Cancelled Exception") |
| 787 | exc = "Operation was cancelled" |
| 788 | |
| 789 | finally: |
| 790 | if exc: |
| 791 | error_list.append(str(exc)) |
| 792 | try: |
| 793 | if tasks_dict_info: |
| 794 | # Wait for pending tasks |
| 795 | stage = "Waiting for pending tasks for termination." |
| 796 | self.logger.debug(logging_text + stage) |
| 797 | error_list += await self._wait_for_tasks( |
| 798 | logging_text, |
| 799 | tasks_dict_info, |
| 800 | min(self.timeout_ns_deploy, timeout_ns_terminate), |
| 801 | stage, |
| 802 | nslcmop_id, |
| 803 | nsr_id=nsr_id, |
| 804 | ) |
| 805 | except asyncio.CancelledError: |
| 806 | error_list.append("Cancelled") |
| 807 | except Exception as exc: |
| 808 | error_list.append(str(exc)) |
| 809 | |
| 810 | # Update status at database |
| 811 | nslcmop_operation_state = self._update_db_after_operation( |
| 812 | nslcmop_id, |
| 813 | db_nsr, |
| 814 | nsr_id, |
| 815 | db_nslcmop_update=db_nslcmop_update, |
| 816 | db_nsr_update=db_nsr_update, |
| 817 | error_list=error_list, |
| 818 | operation_type="terminate", |
| 819 | ) |
| 820 | |
| 821 | # Write to Kafka bus to report the operation status |
| 822 | if operation_params: |
| 823 | autoremove = operation_params.get("autoremove", False) |
| 824 | |
| 825 | await self._report_to_kafka( |
| 826 | nsr_id, |
| 827 | nslcmop_id, |
| 828 | nslcmop_operation_state, |
| 829 | logging_text, |
| 830 | "terminated", |
| 831 | autoremove=autoremove, |
| 832 | ) |
| 833 | self.logger.debug(logging_text + "Exit") |
| 834 | |
| 835 | # Remove task |
| 836 | self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate") |
| 837 | |
| 838 | async def action(self, nsr_id: str, nslcmop_id: str): |
| 839 | """Perform action on PaaS service. |
| 840 | Args: |
| 841 | nsr_id (str): NS service record to be used |
| 842 | nslcmop_id (str): NS LCM operation id |
| 843 | |
| 844 | Raises: |
| 845 | NotImplementedError |
| 846 | """ |
| 847 | raise NotImplementedError("Juju Paas Service action method is not implemented") |