| tierno | 1d213f4 | 2020-04-24 14:02:51 +0000 | [diff] [blame] | 1 | # -*- coding: utf-8 -*- |
| 2 | |
| 3 | ## |
| 4 | # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U. |
| 5 | # |
| 6 | # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| 7 | # not use this file except in compliance with the License. You may obtain |
| 8 | # a copy of the License at |
| 9 | # |
| 10 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | # |
| 12 | # Unless required by applicable law or agreed to in writing, software |
| 13 | # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| 14 | # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| 15 | # License for the specific language governing permissions and limitations |
| 16 | # under the License. |
| 17 | # |
| 18 | ## |
| 19 | |
| 20 | """" |
| 21 | This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM. |
| 22 | The tasks are stored at database in table ro_tasks |
| 23 | A single ro_task refers to a VIM element (flavor, image, network, ...). |
| 24 | A ro_task can contain several 'tasks', each one with a target, where to store the results |
| 25 | """ |
| 26 | |
| 27 | import threading |
| 28 | import time |
| 29 | import queue |
| 30 | import logging |
| 31 | from pkg_resources import iter_entry_points |
| 32 | # from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version |
| 33 | from osm_common.dbbase import DbException |
| 34 | # from osm_common.fsbase import FsException |
| 35 | # from osm_common.msgbase import MsgException |
| 36 | from osm_ro_plugin.vim_dummy import VimDummyConnector |
| 37 | from osm_ro_plugin import vimconn |
| 38 | from copy import deepcopy |
| 39 | from unittest.mock import Mock |
| 40 | |
| 41 | __author__ = "Alfonso Tierno" |
| 42 | __date__ = "$28-Sep-2017 12:07:15$" |
| 43 | |
| 44 | |
| 45 | def deep_get(target_dict, *args, **kwargs): |
| 46 | """ |
| 47 | Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None |
| 48 | Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None |
| 49 | :param target_dict: dictionary to be read |
| 50 | :param args: list of keys to read from target_dict |
| 51 | :param kwargs: only can contain default=value to return if key is not present in the nested dictionary |
| 52 | :return: The wanted value if exist, None or default otherwise |
| 53 | """ |
| 54 | for key in args: |
| 55 | if not isinstance(target_dict, dict) or key not in target_dict: |
| 56 | return kwargs.get("default") |
| 57 | target_dict = target_dict[key] |
| 58 | return target_dict |
| 59 | |
| 60 | |
| 61 | class NsWorkerException(Exception): |
| 62 | pass |
| 63 | |
| 64 | |
| 65 | class FailingConnector: |
| 66 | def __init__(self, error_msg): |
| 67 | self.error_msg = error_msg |
| 68 | for method in dir(vimconn.VimConnector): |
| 69 | if method[0] != "_": |
| 70 | setattr(self, method, Mock(side_effect=vimconn.VimConnException(error_msg))) |
| 71 | |
| 72 | |
| 73 | class NsWorkerExceptionNotFound(NsWorkerException): |
| 74 | pass |
| 75 | |
| 76 | |
| 77 | class NsWorker(threading.Thread): |
| 78 | REFRESH_BUILD = 5 # 5 seconds |
| 79 | REFRESH_ACTIVE = 60 # 1 minute |
| 80 | REFRESH_ERROR = 600 |
| 81 | REFRESH_IMAGE = 3600 * 10 |
| 82 | REFRESH_DELETE = 3600 * 10 |
| 83 | QUEUE_SIZE = 2000 |
| 84 | # TODO delete assigment_lock = Lock() |
| 85 | terminate = False |
| 86 | # TODO delete assignment = {} |
| 87 | MAX_TIME_LOCKED = 3600 |
| 88 | |
| 89 | def __init__(self, worker, config, plugins, db): |
| 90 | """Init a thread. |
| 91 | Arguments: |
| 92 | 'id' number of thead |
| 93 | 'name' name of thread |
| 94 | 'host','user': host ip or name to manage and user |
| 95 | 'db', 'db_lock': database class and lock to use it in exclusion |
| 96 | """ |
| 97 | threading.Thread.__init__(self) |
| 98 | self.config = config |
| 99 | self.plugins = plugins |
| 100 | self.plugin_name = "unknown" |
| 101 | self.logger = logging.getLogger('ro.worker{}'.format("worker")) |
| 102 | self.worker_id = worker |
| 103 | self.task_queue = queue.Queue(self.QUEUE_SIZE) |
| 104 | self.my_vims = {} # targetvim: vimplugin class |
| 105 | self.db_vims = {} # targetvim: vim information from database |
| 106 | self.vim_targets = [] # targetvim list |
| 107 | self.my_id = config["process_id"] + ":" + str(worker) |
| 108 | self.db = db |
| 109 | self.item2create = { |
| 110 | "net": self.new_net, |
| 111 | "vdu": self.new_vm, |
| 112 | "image": self.new_image, |
| 113 | "flavor": self.new_flavor, |
| 114 | } |
| 115 | self.item2refresh = { |
| 116 | "net": self.refresh_net, |
| 117 | "vdu": self.refresh_vm, |
| 118 | "image": self.refresh_ok, |
| 119 | "flavor": self.refresh_ok, |
| 120 | } |
| 121 | self.item2delete = { |
| 122 | "net": self.del_net, |
| 123 | "vdu": self.del_vm, |
| 124 | "image": self.delete_ok, |
| 125 | "flavor": self.del_flavor, |
| 126 | } |
| 127 | self.item2action = { |
| 128 | "vdu": self.exec_vm, |
| 129 | } |
| 130 | self.time_last_task_processed = None |
| 131 | |
| 132 | def insert_task(self, task): |
| 133 | try: |
| 134 | self.task_queue.put(task, False) |
| 135 | return None |
| 136 | except queue.Full: |
| 137 | raise NsWorkerException("timeout inserting a task") |
| 138 | |
| 139 | def terminate(self): |
| 140 | self.insert_task("exit") |
| 141 | |
| 142 | def del_task(self, task): |
| 143 | with self.task_lock: |
| 144 | if task["status"] == "SCHEDULED": |
| 145 | task["status"] = "SUPERSEDED" |
| 146 | return True |
| 147 | else: # task["status"] == "processing" |
| 148 | self.task_lock.release() |
| 149 | return False |
| 150 | |
| 151 | def _load_plugin(self, name, type="vim"): |
| 152 | # type can be vim or sdn |
| 153 | if "rovim_dummy" not in self.plugins: |
| 154 | self.plugins["rovim_dummy"] = VimDummyConnector |
| 155 | if name in self.plugins: |
| 156 | return self.plugins[name] |
| 157 | try: |
| 158 | for v in iter_entry_points('osm_ro{}.plugins'.format(type), name): |
| 159 | self.plugins[name] = v.load() |
| 160 | except Exception as e: |
| 161 | self.logger.critical("Cannot load osm_{}: {}".format(name, e)) |
| 162 | if name: |
| 163 | self.plugins[name] = FailingConnector("Cannot load osm_{}: {}".format(name, e)) |
| 164 | if name and name not in self.plugins: |
| 165 | error_text = "Cannot load a module for {t} type '{n}'. The plugin 'osm_{n}' has not been" \ |
| 166 | " registered".format(t=type, n=name) |
| 167 | self.logger.critical(error_text) |
| 168 | self.plugins[name] = FailingConnector(error_text) |
| 169 | |
| 170 | return self.plugins[name] |
| 171 | |
| 172 | def _load_vim(self, vim_account_id): |
| 173 | target_id = "vim:" + vim_account_id |
| 174 | plugin_name = "" |
| 175 | vim = None |
| 176 | try: |
| 177 | step = "Getting vim={} from db".format(vim_account_id) |
| 178 | vim = self.db.get_one("vim_accounts", {"_id": vim_account_id}) |
| 179 | |
| 180 | # if deep_get(vim, "config", "sdn-controller"): |
| 181 | # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"]) |
| 182 | # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]}) |
| 183 | |
| 184 | step = "Decrypt password" |
| 185 | schema_version = vim.get("schema_version") |
| 186 | self.db.encrypt_decrypt_fields(vim, "decrypt", fields=('password', 'secret'), |
| 187 | schema_version=schema_version, salt=vim_account_id) |
| 188 | |
| 189 | step = "Load plugin 'rovim_{}'".format(vim.get("vim_type")) |
| 190 | plugin_name = "rovim_" + vim["vim_type"] |
| 191 | vim_module_conn = self._load_plugin(plugin_name) |
| 192 | self.my_vims[target_id] = vim_module_conn( |
| 193 | uuid=vim['_id'], name=vim['name'], |
| 194 | tenant_id=vim.get('vim_tenant_id'), tenant_name=vim.get('vim_tenant_name'), |
| 195 | url=vim['vim_url'], url_admin=None, |
| 196 | user=vim['vim_user'], passwd=vim['vim_password'], |
| 197 | config=vim.get('config'), persistent_info={} |
| 198 | ) |
| 199 | self.vim_targets.append(target_id) |
| 200 | self.db_vims[target_id] = vim |
| 201 | self.error_status = None |
| 202 | self.logger.info("Vim Connector loaded for vim_account={}, plugin={}".format( |
| 203 | vim_account_id, plugin_name)) |
| 204 | except Exception as e: |
| 205 | self.logger.error("Cannot load vimconnector for vim_account={} plugin={}: {} {}".format( |
| 206 | vim_account_id, plugin_name, step, e)) |
| 207 | self.db_vims[target_id] = vim or {} |
| 208 | self.my_vims[target_id] = FailingConnector(str(e)) |
| 209 | self.error_status = "Error loading vimconnector: {}".format(e) |
| 210 | |
| 211 | def _get_db_task(self): |
| 212 | """ |
| 213 | Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions |
| 214 | :return: None |
| 215 | """ |
| 216 | now = time.time() |
| 217 | if not self.time_last_task_processed: |
| 218 | self.time_last_task_processed = now |
| 219 | try: |
| 220 | while True: |
| 221 | locked = self.db.set_one( |
| 222 | "ro_tasks", |
| 223 | q_filter={"target_id": self.vim_targets, |
| 224 | "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'], |
| 225 | "locked_at.lt": now - self.MAX_TIME_LOCKED, |
| 226 | "to_check_at.lt": self.time_last_task_processed}, |
| 227 | update_dict={"locked_by": self.my_id, "locked_at": now}, |
| 228 | fail_on_empty=False) |
| 229 | if locked: |
| 230 | # read and return |
| 231 | ro_task = self.db.get_one( |
| 232 | "ro_tasks", |
| 233 | q_filter={"target_id": self.vim_targets, |
| 234 | "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'], |
| 235 | "locked_at": now}) |
| 236 | return ro_task |
| 237 | if self.time_last_task_processed == now: |
| 238 | self.time_last_task_processed = None |
| 239 | return None |
| 240 | else: |
| 241 | self.time_last_task_processed = now |
| 242 | # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now) |
| 243 | |
| 244 | except DbException as e: |
| 245 | self.logger.error("Database exception at _get_db_task: {}".format(e)) |
| 246 | except Exception as e: |
| 247 | self.logger.critical("Unexpected exception at _get_db_task: {}".format(e), exc_info=True) |
| 248 | return None |
| 249 | |
| 250 | def _delete_task(self, ro_task, task_index, task_depends, db_update): |
| 251 | """ |
| 252 | Determine if this task need to be done or superseded |
| 253 | :return: None |
| 254 | """ |
| 255 | my_task = ro_task["tasks"][task_index] |
| 256 | task_id = my_task["task_id"] |
| 257 | needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get("created_items", False) |
| 258 | if my_task["status"] == "FAILED": |
| 259 | return None, None # TODO need to be retry?? |
| 260 | try: |
| 261 | for index, task in enumerate(ro_task["tasks"]): |
| 262 | if index == task_index: |
| 263 | continue # own task |
| 264 | if my_task["target_record"] == task["target_record"] and task["action"] == "CREATE": |
| 265 | # set to finished |
| 266 | db_update["tasks.{}.status".format(index)] = task["status"] = "FINISHED" |
| 267 | elif task["action"] == "CREATE" and task["status"] not in ("FINISHED", "SUPERSEDED"): |
| 268 | needed_delete = False |
| 269 | if needed_delete: |
| 270 | return self.item2delete[my_task["item"]](ro_task, task_index) |
| 271 | else: |
| 272 | return "SUPERSEDED", None |
| 273 | except Exception as e: |
| 274 | if not isinstance(e, NsWorkerException): |
| 275 | self.logger.critical("Unexpected exception at _delete_task task={}: {}".format(task_id, e), |
| 276 | exc_info=True) |
| 277 | return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)} |
| 278 | |
| 279 | def _create_task(self, ro_task, task_index, task_depends, db_update): |
| 280 | """ |
| 281 | Determine if this task need to be created |
| 282 | :return: None |
| 283 | """ |
| 284 | my_task = ro_task["tasks"][task_index] |
| 285 | task_id = my_task["task_id"] |
| 286 | task_status = None |
| 287 | if my_task["status"] == "FAILED": |
| 288 | return None, None # TODO need to be retry?? |
| 289 | elif my_task["status"] == "SCHEDULED": |
| 290 | # check if already created by another task |
| 291 | for index, task in enumerate(ro_task["tasks"]): |
| 292 | if index == task_index: |
| 293 | continue # own task |
| 294 | if task["action"] == "CREATE" and task["status"] not in ("SCHEDULED", "FINISHED", "SUPERSEDED"): |
| 295 | return task["status"], "COPY_VIM_INFO" |
| 296 | |
| 297 | try: |
| 298 | task_status, ro_vim_item_update = self.item2create[my_task["item"]](ro_task, task_index, task_depends) |
| 299 | # TODO update other CREATE tasks |
| 300 | except Exception as e: |
| 301 | if not isinstance(e, NsWorkerException): |
| 302 | self.logger.error("Error executing task={}: {}".format(task_id, e), exc_info=True) |
| 303 | task_status = "FAILED" |
| 304 | ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)} |
| 305 | # TODO update ro_vim_item_update |
| 306 | return task_status, ro_vim_item_update |
| 307 | else: |
| 308 | return None, None |
| 309 | |
| 310 | def _get_dependency(self, task_id, ro_task=None, target_id=None): |
| 311 | if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"): |
| 312 | ro_task_dependency = self.db.get_one( |
| 313 | "ro_tasks", |
| 314 | q_filter={"target_id": target_id, |
| 315 | "tasks.target_record_id": task_id |
| 316 | }, |
| 317 | fail_on_empty=False) |
| 318 | if ro_task_dependency: |
| 319 | for task_index, task in enumerate(ro_task_dependency["tasks"]): |
| 320 | if task["target_record_id"] == task_id: |
| 321 | return ro_task_dependency, task_index |
| 322 | |
| 323 | else: |
| 324 | if ro_task: |
| 325 | for task_index, task in enumerate(ro_task["tasks"]): |
| 326 | if task["task_id"] == task_id: |
| 327 | return ro_task, task_index |
| 328 | ro_task_dependency = self.db.get_one( |
| 329 | "ro_tasks", |
| 330 | q_filter={"tasks.ANYINDEX.task_id": task_id, |
| 331 | "tasks.ANYINDEX.target_record.ne": None |
| 332 | }, |
| 333 | fail_on_empty=False) |
| 334 | if ro_task_dependency: |
| 335 | for task_index, task in ro_task_dependency["tasks"]: |
| 336 | if task["task_id"] == task_id: |
| 337 | return ro_task_dependency, task_index |
| 338 | raise NsWorkerException("Cannot get depending task {}".format(task_id)) |
| 339 | |
| 340 | def _proccess_pending_tasks(self, ro_task): |
| 341 | ro_task_id = ro_task["_id"] |
| 342 | now = time.time() |
| 343 | next_check_at = now + (24*60*60) # one day |
| 344 | db_ro_task_update = {} |
| 345 | |
| 346 | def _update_refresh(new_status): |
| 347 | # compute next_refresh |
| 348 | nonlocal task |
| 349 | nonlocal next_check_at |
| 350 | nonlocal db_ro_task_update |
| 351 | nonlocal ro_task |
| 352 | |
| 353 | next_refresh = time.time() |
| 354 | if task["item"] in ("image", "flavor"): |
| 355 | next_refresh += self.REFRESH_IMAGE |
| 356 | elif new_status == "BUILD": |
| 357 | next_refresh += self.REFRESH_BUILD |
| 358 | elif new_status == "DONE": |
| 359 | next_refresh += self.REFRESH_ACTIVE |
| 360 | else: |
| 361 | next_refresh += self.REFRESH_ERROR |
| 362 | next_check_at = min(next_check_at, next_refresh) |
| 363 | db_ro_task_update["vim_info.refresh_at"] = next_refresh |
| 364 | ro_task["vim_info"]["refresh_at"] = next_refresh |
| 365 | |
| 366 | try: |
| 367 | # 0 get task_status_create |
| 368 | task_status_create = None |
| 369 | task_create = next((t for t in ro_task["tasks"] if t["action"] == "CREATE" and |
| 370 | t["status"] in ("BUILD", "DONE")), None) |
| 371 | if task_create: |
| 372 | task_status_create = task_create["status"] |
| 373 | # 1. look for SCHEDULED or if CREATE also DONE,BUILD |
| 374 | for task_action in ("DELETE", "CREATE", "EXEC"): |
| 375 | db_vim_update = None |
| 376 | for task_index, task in enumerate(ro_task["tasks"]): |
| 377 | target_update = None |
| 378 | if (task_action in ("DELETE", "EXEC") and task["status"] != "SCHEDULED") or\ |
| 379 | task["action"] != task_action or \ |
| 380 | (task_action == "CREATE" and task["status"] in ("FINISHED", "SUPERSEDED")): |
| 381 | continue |
| 382 | task_path = "tasks.{}.status".format(task_index) |
| 383 | try: |
| 384 | if task["status"] == "SCHEDULED": |
| 385 | task_depends = {} |
| 386 | # check if tasks that this depends on have been completed |
| 387 | dependency_not_completed = False |
| 388 | for dependency_task_id in (task.get("depends_on") or ()): |
| 389 | dependency_ro_task, dependency_task_index = \ |
| 390 | self._get_dependency(dependency_task_id, target_id=ro_task["target_id"]) |
| 391 | dependency_task = dependency_ro_task["tasks"][dependency_task_index] |
| 392 | if dependency_task["status"] == "SCHEDULED": |
| 393 | dependency_not_completed = True |
| 394 | next_check_at = min(next_check_at, dependency_ro_task["to_check_at"]) |
| 395 | break |
| 396 | elif dependency_task["status"] == "FAILED": |
| 397 | error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format( |
| 398 | task["action"], task["item"], dependency_task["action"], |
| 399 | dependency_task["item"], dependency_task_id, |
| 400 | dependency_ro_task["vim_info"].get("vim_details")) |
| 401 | self.logger.error("task={} {}".format(task["task_id"], error_text)) |
| 402 | raise NsWorkerException(error_text) |
| 403 | |
| 404 | task_depends[dependency_task_id] = dependency_ro_task["vim_info"]["vim_id"] |
| 405 | task_depends["TASK-{}".format(dependency_task_id)] = \ |
| 406 | dependency_ro_task["vim_info"]["vim_id"] |
| 407 | if dependency_not_completed: |
| 408 | # TODO set at vim_info.vim_details that it is waiting |
| 409 | continue |
| 410 | |
| 411 | if task["action"] == "DELETE": |
| 412 | new_status, db_vim_info_update = self._delete_task(ro_task, task_index, |
| 413 | task_depends, db_ro_task_update) |
| 414 | new_status = "FINISHED" if new_status == "DONE" else new_status |
| 415 | # ^with FINISHED instead of DONE it will not be refreshing |
| 416 | if new_status in ("FINISHED", "SUPERSEDED"): |
| 417 | target_update = "DELETE" |
| 418 | elif task["action"] == "EXEC": |
| 419 | self.item2action[task["item"]](ro_task, task_index, task_depends, db_ro_task_update) |
| 420 | new_status = "FINISHED" if new_status == "DONE" else new_status |
| 421 | # ^with FINISHED instead of DONE it will not be refreshing |
| 422 | if new_status in ("FINISHED", "SUPERSEDED"): |
| 423 | target_update = "DELETE" |
| 424 | elif task["action"] == "CREATE": |
| 425 | if task["status"] == "SCHEDULED": |
| 426 | if task_status_create: |
| 427 | new_status = task_status_create |
| 428 | target_update = "COPY_VIM_INFO" |
| 429 | else: |
| 430 | new_status, db_vim_info_update = \ |
| 431 | self.item2create[task["item"]](ro_task, task_index, task_depends) |
| 432 | # self._create_task(ro_task, task_index, task_depends, db_ro_task_update) |
| 433 | _update_refresh(new_status) |
| 434 | else: |
| 435 | if ro_task["vim_info"]["refresh_at"] and now > ro_task["vim_info"]["refresh_at"]: |
| 436 | new_status, db_vim_info_update = self.item2refresh[task["item"]](ro_task) |
| 437 | _update_refresh(new_status) |
| 438 | except Exception as e: |
| 439 | new_status = "FAILED" |
| 440 | db_vim_info_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)} |
| 441 | if not isinstance(e, (NsWorkerException, vimconn.VimConnException)): |
| 442 | self.logger.error("Unexpected exception at _delete_task task={}: {}". |
| 443 | format(task["task_id"], e), exc_info=True) |
| 444 | |
| 445 | try: |
| 446 | if db_vim_info_update: |
| 447 | db_vim_update = db_vim_info_update.copy() |
| 448 | db_ro_task_update.update({"vim_info." + k: v for k, v in db_vim_info_update.items()}) |
| 449 | ro_task["vim_info"].update(db_vim_info_update) |
| 450 | |
| 451 | if new_status: |
| 452 | if task_action == "CREATE": |
| 453 | task_status_create = new_status |
| 454 | db_ro_task_update[task_path] = new_status |
| 455 | if target_update or db_vim_update: |
| 456 | |
| 457 | if target_update == "DELETE": |
| 458 | self._update_target(task, None) |
| 459 | elif target_update == "COPY_VIM_INFO": |
| 460 | self._update_target(task, ro_task["vim_info"]) |
| 461 | else: |
| 462 | self._update_target(task, db_vim_update) |
| 463 | |
| 464 | except Exception as e: |
| 465 | self.logger.error("Unexpected exception at _update_target task={}: {}". |
| 466 | format(task["task_id"], e), exc_info=True) |
| 467 | |
| 468 | # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified, |
| 469 | # outside this task (by ro_nbi) do not update it |
| 470 | db_ro_task_update["locked_by"] = None |
| 471 | # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked |
| 472 | db_ro_task_update["locked_at"] = int(now - self.MAX_TIME_LOCKED) |
| 473 | db_ro_task_update["to_check_at"] = next_check_at |
| 474 | if not self.db.set_one("ro_tasks", |
| 475 | update_dict=db_ro_task_update, |
| 476 | q_filter={"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"]}, |
| 477 | fail_on_empty=False): |
| 478 | del db_ro_task_update["to_check_at"] |
| 479 | self.db.set_one("ro_tasks", |
| 480 | q_filter={"_id": ro_task["_id"]}, |
| 481 | update_dict=db_ro_task_update, |
| 482 | fail_on_empty=True) |
| 483 | except DbException as e: |
| 484 | self.logger.error("ro_task={} Error updating database {}".format(ro_task_id, e)) |
| 485 | except Exception as e: |
| 486 | self.logger.error("Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True) |
| 487 | |
| 488 | def _update_target(self, task, ro_vim_item_update): |
| 489 | try: |
| 490 | table, _id, path = task["target_record"].split(":") |
| 491 | if ro_vim_item_update: |
| 492 | update_dict = {path + "." + k: v for k, v in ro_vim_item_update.items() if k in |
| 493 | ('vim_id', 'vim_details', 'vim_name', 'vim_status', 'interfaces')} |
| 494 | if ro_vim_item_update.get("interfaces"): |
| 495 | path_vdu = path[:path.rfind(".")] |
| 496 | path_vdu = path_vdu[:path_vdu.rfind(".")] |
| 497 | path_interfaces = path_vdu + ".interfaces" |
| 498 | for i, iface in enumerate(ro_vim_item_update.get("interfaces")): |
| 499 | if iface: |
| 500 | update_dict.update({path_interfaces + ".{}.".format(i) + k: v for k, v in iface.items() if |
| 501 | k in ('ip_address', 'mac_address', 'vlan', 'compute_node', 'pci')}) |
| 502 | if iface.get("mgmt_vnf_interface") and iface.get("ip_address"): |
| 503 | update_dict["ip-address"] = iface.get("ip_address").split(";")[0] |
| 504 | if iface.get("mgmt_vdu_interface") and iface.get("ip_address"): |
| 505 | update_dict[path_vdu + ".ip-address"] = iface.get("ip_address").split(";")[0] |
| 506 | |
| 507 | self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict) |
| 508 | else: |
| 509 | self.db.set_one(table, q_filter={"_id": _id}, update_dict=None, |
| 510 | unset={path: None}) |
| 511 | except DbException as e: |
| 512 | self.logger.error("Cannot update database '{}': '{}'".format(task["target_record"], e)) |
| 513 | |
| 514 | def new_image(self, ro_task, task_index, task_depends): |
| 515 | task = ro_task["tasks"][task_index] |
| 516 | task_id = task["task_id"] |
| 517 | created = False |
| 518 | created_items = {} |
| 519 | target_vim = self.my_vims[ro_task["target_id"]] |
| 520 | try: |
| 521 | # FIND |
| 522 | if task.get("find_params"): |
| 523 | vim_images = target_vim.get_image_list(**task["find_params"]) |
| 524 | if not vim_images: |
| 525 | raise NsWorkerExceptionNotFound("Image not found with this criteria: '{}'".format( |
| 526 | task["find_params"])) |
| 527 | elif len(vim_images) > 1: |
| 528 | raise NsWorkerException( |
| 529 | "More than one network found with this criteria: '{}'".format(task["find_params"])) |
| 530 | else: |
| 531 | vim_image_id = vim_images[0]["id"] |
| 532 | |
| 533 | ro_vim_item_update = {"vim_id": vim_image_id, |
| 534 | "vim_status": "DONE", |
| 535 | "created": created, |
| 536 | "created_items": created_items, |
| 537 | "vim_details": None} |
| 538 | self.logger.debug( |
| 539 | "task={} {} new-image={} created={}".format(task_id, ro_task["target_id"], vim_image_id, created)) |
| 540 | return "DONE", ro_vim_item_update |
| 541 | except (NsWorkerException, vimconn.VimConnException) as e: |
| 542 | self.logger.error("task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)) |
| 543 | ro_vim_item_update = {"vim_status": "VIM_ERROR", |
| 544 | "created": created, |
| 545 | "vim_details": str(e)} |
| 546 | return "FAILED", ro_vim_item_update |
| 547 | |
| 548 | def del_flavor(self, ro_task, task_index): |
| 549 | task = ro_task["tasks"][task_index] |
| 550 | task_id = task["task_id"] |
| 551 | flavor_vim_id = ro_task["vim_info"]["vim_id"] |
| 552 | ro_vim_item_update_ok = {"vim_status": "DELETED", |
| 553 | "created": False, |
| 554 | "vim_details": "DELETED", |
| 555 | "vim_id": None} |
| 556 | try: |
| 557 | if flavor_vim_id: |
| 558 | target_vim = self.my_vims[ro_task["target_id"]] |
| 559 | target_vim.delete_flavor(flavor_vim_id) |
| 560 | |
| 561 | except vimconn.VimConnNotFoundException: |
| 562 | ro_vim_item_update_ok["vim_details"] = "already deleted" |
| 563 | |
| 564 | except vimconn.VimConnException as e: |
| 565 | self.logger.error("ro_task={} vim={} del-flavor={}: {}".format( |
| 566 | ro_task["_id"], ro_task["target_id"], flavor_vim_id, e)) |
| 567 | ro_vim_item_update = {"vim_status": "VIM_ERROR", |
| 568 | "vim_details": "Error while deleting: {}".format(e)} |
| 569 | return "FAILED", ro_vim_item_update |
| 570 | |
| 571 | self.logger.debug("task={} {} del-flavor={} {}".format( |
| 572 | task_id, ro_task["target_id"], flavor_vim_id, ro_vim_item_update_ok.get("vim_details", ""))) |
| 573 | return "DONE", ro_vim_item_update_ok |
| 574 | |
| 575 | def refresh_ok(self, ro_task): |
| 576 | """skip calling VIM to get image status. Assumes ok""" |
| 577 | if ro_task["vim_info"]["vim_status"] == "VIM_ERROR": |
| 578 | return "FAILED", {} |
| 579 | return "DONE", {} |
| 580 | |
| 581 | def delete_ok(self, ro_task): |
| 582 | """skip calling VIM to delete image status. Assumes ok""" |
| 583 | return "DONE", {} |
| 584 | |
| 585 | def new_flavor(self, ro_task, task_index, task_depends): |
| 586 | task = ro_task["tasks"][task_index] |
| 587 | task_id = task["task_id"] |
| 588 | created = False |
| 589 | created_items = {} |
| 590 | target_vim = self.my_vims[ro_task["target_id"]] |
| 591 | try: |
| 592 | # FIND |
| 593 | vim_flavor_id = None |
| 594 | if task.get("find_params"): |
| 595 | try: |
| 596 | flavor_data = task["find_params"]["flavor_data"] |
| 597 | vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data) |
| 598 | except vimconn.VimConnNotFoundException: |
| 599 | pass |
| 600 | |
| 601 | if not vim_flavor_id and task.get("params"): |
| 602 | # CREATE |
| 603 | flavor_data = task["params"]["flavor_data"] |
| 604 | vim_flavor_id = target_vim.new_flavor(flavor_data) |
| 605 | created = True |
| 606 | |
| 607 | ro_vim_item_update = {"vim_id": vim_flavor_id, |
| 608 | "vim_status": "DONE", |
| 609 | "created": created, |
| 610 | "created_items": created_items, |
| 611 | "vim_details": None} |
| 612 | self.logger.debug( |
| 613 | "task={} {} new-flavor={} created={}".format(task_id, ro_task["target_id"], vim_flavor_id, created)) |
| 614 | return "DONE", ro_vim_item_update |
| 615 | except (vimconn.VimConnException, NsWorkerException) as e: |
| 616 | self.logger.error("task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)) |
| 617 | ro_vim_item_update = {"vim_status": "VIM_ERROR", |
| 618 | "created": created, |
| 619 | "vim_details": str(e)} |
| 620 | return "FAILED", ro_vim_item_update |
| 621 | |
| 622 | def new_net(self, ro_task, task_index, task_depends): |
| 623 | vim_net_id = None |
| 624 | task = ro_task["tasks"][task_index] |
| 625 | task_id = task["task_id"] |
| 626 | created = False |
| 627 | created_items = {} |
| 628 | target_vim = self.my_vims[ro_task["target_id"]] |
| 629 | try: |
| 630 | # FIND |
| 631 | if task.get("find_params"): |
| 632 | # if management, get configuration of VIM |
| 633 | if task["find_params"].get("filter_dict"): |
| 634 | vim_filter = task["find_params"]["filter_dict"] |
| 635 | elif task["find_params"].get("mgmt"): # mamagement network |
| 636 | if deep_get(self.db_vims[ro_task["target_id"]], "config", "management_network_id"): |
| 637 | vim_filter = {"id": self.db_vims[ro_task["target_id"]]["config"]["management_network_id"]} |
| 638 | elif deep_get(self.db_vims[ro_task["target_id"]], "config", "management_network_name"): |
| 639 | vim_filter = {"name": self.db_vims[ro_task["target_id"]]["config"]["management_network_name"]} |
| 640 | else: |
| 641 | vim_filter = {"name": task["find_params"]["name"]} |
| 642 | else: |
| 643 | raise NsWorkerExceptionNotFound("Invalid find_params for new_net {}".format(task["find_params"])) |
| 644 | |
| 645 | vim_nets = target_vim.get_network_list(vim_filter) |
| 646 | if not vim_nets and not task.get("params"): |
| 647 | raise NsWorkerExceptionNotFound("Network not found with this criteria: '{}'".format( |
| 648 | task.get("find_params"))) |
| 649 | elif len(vim_nets) > 1: |
| 650 | raise NsWorkerException( |
| 651 | "More than one network found with this criteria: '{}'".format(task["find_params"])) |
| 652 | if vim_nets: |
| 653 | vim_net_id = vim_nets[0]["id"] |
| 654 | else: |
| 655 | # CREATE |
| 656 | params = task["params"] |
| 657 | vim_net_id, created_items = target_vim.new_network(**params) |
| 658 | created = True |
| 659 | |
| 660 | ro_vim_item_update = {"vim_id": vim_net_id, |
| 661 | "vim_status": "BUILD", |
| 662 | "created": created, |
| 663 | "created_items": created_items, |
| 664 | "vim_details": None} |
| 665 | self.logger.debug( |
| 666 | "task={} {} new-net={} created={}".format(task_id, ro_task["target_id"], vim_net_id, created)) |
| 667 | return "BUILD", ro_vim_item_update |
| 668 | except (vimconn.VimConnException, NsWorkerException) as e: |
| 669 | self.logger.error("task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)) |
| 670 | ro_vim_item_update = {"vim_status": "VIM_ERROR", |
| 671 | "created": created, |
| 672 | "vim_details": str(e)} |
| 673 | return "FAILED", ro_vim_item_update |
| 674 | |
| 675 | def refresh_net(self, ro_task): |
| 676 | """Call VIM to get network status""" |
| 677 | ro_task_id = ro_task["_id"] |
| 678 | target_vim = self.my_vims[ro_task["target_id"]] |
| 679 | |
| 680 | vim_id = ro_task["vim_info"]["vim_id"] |
| 681 | net_to_refresh_list = [vim_id] |
| 682 | try: |
| 683 | vim_dict = target_vim.refresh_nets_status(net_to_refresh_list) |
| 684 | vim_info = vim_dict[vim_id] |
| 685 | if vim_info["status"] == "ACTIVE": |
| 686 | task_status = "DONE" |
| 687 | elif vim_info["status"] == "BUILD": |
| 688 | task_status = "BUILD" |
| 689 | else: |
| 690 | task_status = "FAILED" |
| 691 | except vimconn.VimConnException as e: |
| 692 | # Mark all tasks at VIM_ERROR status |
| 693 | self.logger.error("ro_task={} vim={} get-net={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e)) |
| 694 | vim_info = {"status": "VIM_ERROR", "error_msg": str(e)} |
| 695 | task_status = "FAILED" |
| 696 | |
| 697 | ro_vim_item_update = {} |
| 698 | if ro_task["vim_info"]["vim_status"] != vim_info["status"]: |
| 699 | ro_vim_item_update["vim_status"] = vim_info["status"] |
| 700 | if ro_task["vim_info"]["vim_name"] != vim_info.get("name"): |
| 701 | ro_vim_item_update["vim_name"] = vim_info.get("name") |
| 702 | if vim_info["status"] in ("ERROR", "VIM_ERROR"): |
| 703 | if ro_task["vim_info"]["vim_details"] != vim_info["error_msg"]: |
| 704 | ro_vim_item_update["vim_details"] = vim_info["error_msg"] |
| 705 | elif vim_info["status"] == "DELETED": |
| 706 | ro_vim_item_update["vim_id"] = None |
| 707 | ro_vim_item_update["vim_details"] = "Deleted externally" |
| 708 | else: |
| 709 | if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]: |
| 710 | ro_vim_item_update["vim_details"] = vim_info["vim_info"] |
| 711 | if ro_vim_item_update: |
| 712 | self.logger.debug("ro_task={} {} get-net={}: status={} {}".format( |
| 713 | ro_task_id, ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"), |
| 714 | ro_vim_item_update.get("vim_details") if ro_vim_item_update.get("vim_status") != "ACTIVE" else '')) |
| 715 | return task_status, ro_vim_item_update |
| 716 | |
| 717 | def del_net(self, ro_task, task_index): |
| 718 | task = ro_task["tasks"][task_index] |
| 719 | task_id = task["task_id"] |
| 720 | net_vim_id = ro_task["vim_info"]["vim_id"] |
| 721 | ro_vim_item_update_ok = {"vim_status": "DELETED", |
| 722 | "created": False, |
| 723 | "vim_details": "DELETED", |
| 724 | "vim_id": None} |
| 725 | try: |
| 726 | if net_vim_id or ro_task["vim_info"]["created_items"]: |
| 727 | target_vim = self.my_vims[ro_task["target_id"]] |
| 728 | target_vim.delete_network(net_vim_id, ro_task["vim_info"]["created_items"]) |
| 729 | |
| 730 | except vimconn.VimConnNotFoundException: |
| 731 | ro_vim_item_update_ok["vim_details"] = "already deleted" |
| 732 | |
| 733 | except vimconn.VimConnException as e: |
| 734 | self.logger.error("ro_task={} vim={} del-net={}: {}".format(ro_task["_id"], ro_task["target_id"], |
| 735 | net_vim_id, e)) |
| 736 | ro_vim_item_update = {"vim_status": "VIM_ERROR", |
| 737 | "vim_details": "Error while deleting: {}".format(e)} |
| 738 | return "FAILED", ro_vim_item_update |
| 739 | |
| 740 | self.logger.debug("task={} {} del-net={} {}".format(task_id, ro_task["target_id"], net_vim_id, |
| 741 | ro_vim_item_update_ok.get("vim_details", ""))) |
| 742 | return "DONE", ro_vim_item_update_ok |
| 743 | |
| 744 | def new_vm(self, ro_task, task_index, task_depends): |
| 745 | task = ro_task["tasks"][task_index] |
| 746 | task_id = task["task_id"] |
| 747 | created = False |
| 748 | created_items = {} |
| 749 | target_vim = self.my_vims[ro_task["target_id"]] |
| 750 | try: |
| 751 | created = True |
| 752 | params = task["params"] |
| 753 | params_copy = deepcopy(params) |
| 754 | net_list = params_copy["net_list"] |
| 755 | for net in net_list: |
| 756 | if "net_id" in net and net["net_id"].startswith("TASK-"): # change task_id into network_id |
| 757 | network_id = task_depends[net["net_id"]] |
| 758 | if not network_id: |
| 759 | raise NsWorkerException("Cannot create VM because depends on a network not created or found " |
| 760 | "for {}".format(net["net_id"])) |
| 761 | net["net_id"] = network_id |
| 762 | if params_copy["image_id"].startswith("TASK-"): |
| 763 | params_copy["image_id"] = task_depends[params_copy["image_id"]] |
| 764 | if params_copy["flavor_id"].startswith("TASK-"): |
| 765 | params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]] |
| 766 | |
| 767 | vim_vm_id, created_items = target_vim.new_vminstance(**params_copy) |
| 768 | interfaces = [iface["vim_id"] for iface in params_copy["net_list"]] |
| 769 | |
| 770 | ro_vim_item_update = {"vim_id": vim_vm_id, |
| 771 | "vim_status": "BUILD", |
| 772 | "created": created, |
| 773 | "created_items": created_items, |
| 774 | "vim_details": None, |
| 775 | "interfaces_vim_ids": interfaces, |
| 776 | "interfaces": [], |
| 777 | } |
| 778 | self.logger.debug( |
| 779 | "task={} {} new-vm={} created={}".format(task_id, ro_task["target_id"], vim_vm_id, created)) |
| 780 | return "BUILD", ro_vim_item_update |
| 781 | except (vimconn.VimConnException, NsWorkerException) as e: |
| 782 | self.logger.error("task={} vim={} new-vm: {}".format(task_id, ro_task["target_id"], e)) |
| 783 | ro_vim_item_update = {"vim_status": "VIM_ERROR", |
| 784 | "created": created, |
| 785 | "vim_details": str(e)} |
| 786 | return "FAILED", ro_vim_item_update |
| 787 | |
| 788 | def del_vm(self, ro_task, task_index): |
| 789 | task = ro_task["tasks"][task_index] |
| 790 | task_id = task["task_id"] |
| 791 | vm_vim_id = ro_task["vim_info"]["vim_id"] |
| 792 | ro_vim_item_update_ok = {"vim_status": "DELETED", |
| 793 | "created": False, |
| 794 | "vim_details": "DELETED", |
| 795 | "vim_id": None} |
| 796 | try: |
| 797 | if vm_vim_id or ro_task["vim_info"]["created_items"]: |
| 798 | target_vim = self.my_vims[ro_task["target_id"]] |
| 799 | target_vim.delete_vminstance(vm_vim_id, ro_task["vim_info"]["created_items"]) |
| 800 | |
| 801 | except vimconn.VimConnNotFoundException: |
| 802 | ro_vim_item_update_ok["vim_details"] = "already deleted" |
| 803 | |
| 804 | except vimconn.VimConnException as e: |
| 805 | self.logger.error("ro_task={} vim={} del-vm={}: {}".format(ro_task["_id"], ro_task["target_id"], |
| 806 | vm_vim_id, e)) |
| 807 | ro_vim_item_update = {"vim_status": "VIM_ERROR", |
| 808 | "vim_details": "Error while deleting: {}".format(e)} |
| 809 | return "FAILED", ro_vim_item_update |
| 810 | |
| 811 | self.logger.debug("task={} {} del-vm={} {}".format(task_id, ro_task["target_id"], vm_vim_id, |
| 812 | ro_vim_item_update_ok.get("vim_details", ""))) |
| 813 | return "DONE", ro_vim_item_update_ok |
| 814 | |
| 815 | def refresh_vm(self, ro_task): |
| 816 | """Call VIM to get vm status""" |
| 817 | ro_task_id = ro_task["_id"] |
| 818 | target_vim = self.my_vims[ro_task["target_id"]] |
| 819 | |
| 820 | vim_id = ro_task["vim_info"]["vim_id"] |
| 821 | if not vim_id: |
| 822 | return None, None |
| 823 | vm_to_refresh_list = [vim_id] |
| 824 | try: |
| 825 | vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list) |
| 826 | vim_info = vim_dict[vim_id] |
| 827 | if vim_info["status"] == "ACTIVE": |
| 828 | task_status = "DONE" |
| 829 | elif vim_info["status"] == "BUILD": |
| 830 | task_status = "BUILD" |
| 831 | else: |
| 832 | task_status = "FAILED" |
| 833 | except vimconn.VimConnException as e: |
| 834 | # Mark all tasks at VIM_ERROR status |
| 835 | self.logger.error("ro_task={} vim={} get-vm={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e)) |
| 836 | vim_info = {"status": "VIM_ERROR", "error_msg": str(e)} |
| 837 | task_status = "FAILED" |
| 838 | |
| 839 | ro_vim_item_update = {} |
| 840 | # TODO check and update interfaces |
| 841 | vim_interfaces = [] |
| 842 | for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]: |
| 843 | iface = next((iface for iface in vim_info["interfaces"] if vim_iface_id == iface["vim_interface_id"]), None) |
| 844 | # if iface: |
| 845 | # iface.pop("vim_info", None) |
| 846 | vim_interfaces.append(iface) |
| 847 | |
| 848 | task = ro_task["tasks"][0] # TODO look for a task CREATE and active |
| 849 | if task.get("mgmt_vnf_interface") is not None: |
| 850 | vim_interfaces[task["mgmt_vnf_interface"]]["mgmt_vnf_interface"] = True |
| 851 | mgmt_vdu_iface = task.get("mgmt_vdu_interface", task.get("mgmt_vnf_interface", 0)) |
| 852 | vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True |
| 853 | |
| 854 | if ro_task["vim_info"]["interfaces"] != vim_interfaces: |
| 855 | ro_vim_item_update["interfaces"] = vim_interfaces |
| 856 | if ro_task["vim_info"]["vim_status"] != vim_info["status"]: |
| 857 | ro_vim_item_update["vim_status"] = vim_info["status"] |
| 858 | if ro_task["vim_info"]["vim_name"] != vim_info.get("name"): |
| 859 | ro_vim_item_update["vim_name"] = vim_info.get("name") |
| 860 | if vim_info["status"] in ("ERROR", "VIM_ERROR"): |
| 861 | if ro_task["vim_info"]["vim_details"] != vim_info["error_msg"]: |
| 862 | ro_vim_item_update["vim_details"] = vim_info["error_msg"] |
| 863 | elif vim_info["status"] == "DELETED": |
| 864 | ro_vim_item_update["vim_id"] = None |
| 865 | ro_vim_item_update["vim_details"] = "Deleted externally" |
| 866 | else: |
| 867 | if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]: |
| 868 | ro_vim_item_update["vim_details"] = vim_info["vim_info"] |
| 869 | if ro_vim_item_update: |
| 870 | self.logger.debug("ro_task={} {} get-vm={}: status={} {}".format( |
| 871 | ro_task_id, ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"), |
| 872 | ro_vim_item_update.get("vim_details") if ro_vim_item_update.get("vim_status") != "ACTIVE" else '')) |
| 873 | return task_status, ro_vim_item_update |
| 874 | |
| 875 | def exec_vm(self, ro_task, task_index, task_depends): |
| 876 | task = ro_task["tasks"][task_index] |
| 877 | task_id = task["task_id"] |
| 878 | target_vim = self.my_vims[ro_task["target_id"]] |
| 879 | try: |
| 880 | params = task["params"] |
| 881 | params_copy = deepcopy(params) |
| 882 | params_copy["use_pri_key"] = self.db.decrypt(params_copy.pop("private_key"), |
| 883 | params_copy.pop("schema_version"), params_copy.pop("salt")) |
| 884 | |
| 885 | target_vim.inject_user_key(**params_copy) |
| 886 | self.logger.debug( |
| 887 | "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])) |
| 888 | return "DONE", params_copy["key"] |
| 889 | except (vimconn.VimConnException, NsWorkerException) as e: |
| 890 | self.logger.error("task={} vim={} new-vm: {}".format(task_id, ro_task["target_id"], e)) |
| 891 | ro_vim_item_update = {"vim_details": str(e)} |
| 892 | return "FAILED", ro_vim_item_update |
| 893 | |
| 894 | def run(self): |
| 895 | # load database |
| 896 | self.logger.debug("Starting") |
| 897 | while True: |
| 898 | try: |
| 899 | task = self.task_queue.get(block=False if self.my_vims else True) |
| 900 | if task[0] == "terminate": |
| 901 | break |
| 902 | if task[0] == "load_vim": |
| 903 | self._load_vim(task[1]) |
| 904 | continue |
| 905 | except queue.Empty: |
| 906 | pass |
| 907 | |
| 908 | try: |
| 909 | busy = False |
| 910 | ro_task = self._get_db_task() |
| 911 | if ro_task: |
| 912 | self._proccess_pending_tasks(ro_task) |
| 913 | busy = True |
| 914 | if not busy: |
| 915 | time.sleep(5) |
| 916 | except Exception as e: |
| 917 | self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True) |
| 918 | |
| 919 | self.logger.debug("Finishing") |