861ed3d723a78d1cef02e6bc5b5811e119e8e25f
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """
21 This is the thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored in the database, in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import makedirs
31 from os import path
32 import queue
33 import threading
34 import time
35 import traceback
36 from typing import Dict
37 from unittest.mock import Mock
38
39 from importlib_metadata import entry_points
40 from osm_common.dbbase import DbException
41 from osm_ng_ro.vim_admin import LockRenew
42 from osm_ro_plugin import sdnconn
43 from osm_ro_plugin import vimconn
44 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
45 from osm_ro_plugin.vim_dummy import VimDummyConnector
46 import yaml
47
# Module metadata: author name and a date stamp string.
__author__ = "Alfonso Tierno"
__date__ = "$28-Sep-2017 12:07:15$"
50
51
def deep_get(target_dict, *args, **kwargs):
    """
    Walk a nested dictionary following a sequence of keys.
    Example target_dict={a: {b: 5}}; keys (a, b) return 5; keys (a, b, c) and (f, h) return the default
    :param target_dict: dictionary to be read
    :param args: keys to follow, outermost first
    :param kwargs: may contain default=value, returned when the path cannot be followed
    :return: the value found at the end of the path; otherwise the default (None when not given)
    """
    current = target_dict

    for key in args:
        # The walk stops as soon as the current level is not a dict or lacks the key
        if isinstance(current, dict) and key in current:
            current = current[key]
        else:
            return kwargs.get("default")

    return current
66
67
class NsWorkerException(Exception):
    """Generic error raised by the NS worker; adds nothing to Exception."""
70
71
class FailingConnector:
    """Stand-in connector whose public methods always raise.

    Every public name found on vimconn.VimConnector is bound to a Mock raising
    vimconn.VimConnException(error_msg); then every public name found on
    sdnconn.SdnConnectorBase is bound to a Mock raising
    sdnconn.SdnConnectorError(error_msg). A name present on both ends up with the
    SDN variant, because it is assigned last.
    """

    def __init__(self, error_msg):
        self.error_msg = error_msg

        # (connector class to mirror, exception raised by the mirrored methods)
        failure_table = (
            (vimconn.VimConnector, vimconn.VimConnException),
            (sdnconn.SdnConnectorBase, sdnconn.SdnConnectorError),
        )

        for connector_class, error_class in failure_table:
            for name in dir(connector_class):
                if name[0] == "_":
                    continue

                setattr(self, name, Mock(side_effect=error_class(error_msg)))
87
88
class NsWorkerExceptionNotFound(NsWorkerException):
    """NsWorkerException variant used in this file when a searched item is not found."""
91
92
class VimInteractionBase:
    """Base class to call VIM/SDN for creating, deleting and refreshing networks, VMs, flavors, ...
    Its methods do not contact anything; they only return a fixed result"""

    def __init__(self, db, my_vims, db_vims, logger):
        self.db, self.logger = db, logger
        self.my_vims, self.db_vims = my_vims, db_vims

    def new(self, ro_task, task_index, task_depends):
        """Nothing is created. Returns the status "BUILD" and an empty update"""
        return "BUILD", {}

    def refresh(self, ro_task):
        """Skip calling the VIM; report "FAILED" only when the stored vim_status is VIM_ERROR"""
        in_error = ro_task["vim_info"]["vim_status"] == "VIM_ERROR"

        return ("FAILED", {}) if in_error else ("DONE", {})

    def delete(self, ro_task, task_index):
        """Skip calling the VIM to delete. Assumes ok"""
        return "DONE", {}

    def exec(self, ro_task, task_index, task_depends):
        """Nothing is executed. Returns "DONE" with no item update and no task update"""
        return "DONE", None, None
119
120
class VimInteractionNet(VimInteractionBase):
    """Find, create, refresh and delete networks through the target VIM connector."""

    def new(self, ro_task, task_index, task_depends):
        """Look up an existing network (task "find_params") or create one (task "params").

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("BUILD", update) on success, ("FAILED", update) when the VIM or the lookup fails
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_id = ro_task["target_id"]
        target_vim = self.my_vims[target_id]
        vim_net_id = None
        created = False
        created_items = {}
        is_mgmt = False
        mgmt_set_at_vim = False

        try:
            find_params = task.get("find_params")

            if find_params:
                # FIND: build the filter used to query the VIM
                if find_params.get("filter_dict"):
                    vim_filter = find_params["filter_dict"]
                elif find_params.get("mgmt"):
                    # management network: the VIM configuration takes precedence over the task name
                    is_mgmt = True
                    db_vim = self.db_vims[target_id]

                    if deep_get(db_vim, "config", "management_network_id"):
                        mgmt_set_at_vim = True
                        vim_filter = {"id": db_vim["config"]["management_network_id"]}
                    elif deep_get(db_vim, "config", "management_network_name"):
                        mgmt_set_at_vim = True
                        vim_filter = {
                            "name": db_vim["config"]["management_network_name"]
                        }
                    else:
                        vim_filter = {"name": find_params["name"]}
                else:
                    raise NsWorkerExceptionNotFound(
                        "Invalid find_params for new_net {}".format(find_params)
                    )

                vim_nets = target_vim.get_network_list(vim_filter)

                if not vim_nets and not task.get("params"):
                    # Only a management network that is not mapped at the VIM
                    # configuration is created on the fly; anything else is an error.
                    if not is_mgmt or mgmt_set_at_vim:
                        raise NsWorkerExceptionNotFound(
                            "Network not found with this criteria: '{}'".format(
                                task.get("find_params")
                            )
                        )

                    net_name = vim_filter.get("name") or vim_filter.get("id")[:16]
                    vim_net_id, created_items = target_vim.new_network(net_name, None)
                    self.logger.debug(
                        "Created mgmt network vim_net_id: {}".format(vim_net_id)
                    )
                    created = True
                elif len(vim_nets) > 1:
                    raise NsWorkerException(
                        "More than one network found with this criteria: '{}'".format(
                            find_params
                        )
                    )

                if vim_nets:
                    vim_net_id = vim_nets[0]["id"]
            else:
                # CREATE
                vim_net_id, created_items = target_vim.new_network(**task["params"])
                created = True

            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, target_id, vim_net_id, created
                )
            )

            return "BUILD", {
                "vim_id": vim_net_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

    def refresh(self, ro_task):
        """Call VIM to get network status and return only the fields that changed.

        :param ro_task: ro_task document; "_id", "target_id" and "vim_info" are read
        :return: (task_status, ro_vim_item_update); task_status is "DONE", "BUILD" or "FAILED"
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        stored = ro_task["vim_info"]
        vim_id = stored["vim_id"]

        try:
            vim_info = target_vim.refresh_nets_status([vim_id])[vim_id]
            status = vim_info["status"]

            if status == "ACTIVE":
                task_status = "DONE"
            elif status == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        # Only the differences against the stored vim_info are reported
        ro_vim_item_update = {}

        if stored["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if stored["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if stored["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        elif stored["vim_details"] != vim_info["vim_info"]:
            ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the network at the VIM; a network already missing counts as deleted.

        :param ro_task: ro_task document; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: ("DONE", update) on success, ("FAILED", update) on a VIM error
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        net_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            # The VIM is only contacted when there is something to remove
            if net_vim_id or ro_task["vim_info"]["created_items"]:
                self.my_vims[ro_task["target_id"]].delete_network(
                    net_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id,
                ro_task["target_id"],
                net_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
338
339
class VimInteractionVdu(VimInteractionBase):
    """Create, delete and refresh VMs, and inject SSH keys, through the target VIM connector."""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """Create a VM at the VIM after resolving "TASK-..." references.

        Network ids, image id, flavor id and affinity group ids that start with
        "TASK-" are replaced by the value found in task_depends under that key.

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: dict indexed by "TASK-..." reference holding the VIM id it resolves to
        :return: ("BUILD", update) on success, ("FAILED", update) on a VIM or dependency error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # Set before any VIM call: the error path below therefore reports created=True
            created = True
            params_copy = deepcopy(task["params"])

            for net in params_copy["net_list"]:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            for affinity_group in params_copy["affinity_group_list"]:
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        # Same wording as the network case so the failure is self-explanatory
                        raise NsWorkerException(
                            "Cannot create VM because depends on an affinity group not created "
                            "or found for {}".format(
                                affinity_group["affinity_group_id"]
                            )
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id

            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            # "vim_id" is read from each net_list entry after the VIM call
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            # add to created items previous_created_volumes (healing)
            if task.get("previous_created_volumes"):
                for k, v in task["previous_created_volumes"].items():
                    created_items[k] = v

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
                "interfaces_backup": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the VM at the VIM; a VM already missing counts as deleted.

        :param ro_task: ro_task document; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: ("DONE", update) on success, ("FAILED", update) on a VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            self.logger.debug(
                "delete_vminstance: vm_vim_id={} created_items={}".format(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
            )

            # The VIM is only contacted when there is something to remove
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id,
                    ro_task["vim_info"]["created_items"],
                    ro_task["vim_info"].get("volumes_to_hold", []),
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def refresh(self, ro_task):
        """Call VIM to get vm status and return only the fields that changed.

        :param ro_task: ro_task document; "_id", "target_id", "tasks" and "vim_info" are read
        :return: (None, None) when there is no vim_id; otherwise (task_status, ro_vim_item_update)
            where task_status is "DONE", "BUILD" or "FAILED"
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information; a failure here is logged
            # and does not change the task status
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception as vim_info_error:
                self.logger.exception(
                    f"{vim_info_error} occured while getting the vim_info from yaml"
                )
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                # None is appended when the VIM does not report this interface,
                # so positions keep matching interfaces_vim_ids
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                vim_interfaces.append(iface)

        # First CREATE task not yet FINISHED; StopIteration is raised when there is none
        task_create = next(
            t
            for t in ro_task["tasks"]
            if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
        )
        if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
            vim_interfaces[task_create["mgmt_vnf_interface"]][
                "mgmt_vnf_interface"
            ] = True

        mgmt_vdu_iface = task_create.get(
            "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
        )
        if vim_interfaces:
            vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True

        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """Inject an SSH key into the VM through the VIM connector.

        On failure the attempt is retried: "BUILD" is returned with the new retry
        count and the delay (time_retries_inject_ssh_key) until the count reaches
        max_retries_inject_ssh_key; after that "FAILED" is returned.

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: (status, ro_vim_item_update or None, db_task_update)
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params_copy = deepcopy(task["params"])
            # The stored private key is decrypted and handed over as "ro_key"
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                None,
                db_task_update,
            )  # params_copy["key"]
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            self.logger.debug(traceback.format_exc())
            if retries < self.max_retries_inject_ssh_key:
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_message": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update
633
634
class VimInteractionImage(VimInteractionBase):
    """Locate images at the target VIM; no image is ever created here."""

    def new(self, ro_task, task_index, task_depends):
        """Search the VIM for the single image matching the task "find_params".

        Without "find_params" no VIM call is made and the reported vim_id is "".

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", update) on success; ("FAILED", update) when zero or several
            images match, or when the VIM raises
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_id = ro_task["target_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[target_id]

        try:
            # FIND
            vim_image_id = ""
            find_params = task.get("find_params")

            if find_params:
                vim_images = target_vim.get_image_list(**find_params)

                if not vim_images:
                    raise NsWorkerExceptionNotFound(
                        "Image not found with this criteria: '{}'".format(find_params)
                    )

                if len(vim_images) > 1:
                    raise NsWorkerException(
                        "More than one image found with this criteria: '{}'".format(
                            find_params
                        )
                    )

                vim_image_id = vim_images[0]["id"]

            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, target_id, vim_image_id, created
                )
            )

            return "DONE", {
                "vim_id": vim_image_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
        except (NsWorkerException, vimconn.VimConnException) as e:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }
690
691
class VimInteractionSharedVolume(VimInteractionBase):
    """Create and delete shared volumes through the target VIM connector."""

    def delete(self, ro_task, task_index):
        """Delete the shared volume at the VIM unless it is flagged to be kept.

        A volume whose created_items entry has a truthy "keep" is left at the VIM
        and reported as ACTIVE. A volume already missing counts as deleted.

        :param ro_task: ro_task document; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: ("DONE", update) on success or keep, ("FAILED", update) on a VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        shared_volume_vim_id = ro_task["vim_info"]["vim_id"]
        created_items = ro_task["vim_info"]["created_items"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        # created_items may lack an entry for this volume id (or hold None);
        # treat that as "not kept" instead of failing with AttributeError.
        volume_item = (created_items or {}).get(shared_volume_vim_id) or {}
        if volume_item.get("keep"):
            ro_vim_item_update_ok = {
                "vim_status": "ACTIVE",
                "created": False,
                "vim_message": None,
            }
            return "DONE", ro_vim_item_update_ok

        try:
            if shared_volume_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_shared_volumes(shared_volume_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-shared-volume={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], shared_volume_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-shared-volume={} {}".format(
                task_id,
                ro_task["target_id"],
                shared_volume_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """Create a shared volume at the VIM from the task "params".

        Without "params" no VIM call is made and the reported vim_id is None.

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", update) on success, ("FAILED", update) on a VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            shared_volume_vim_id = None
            shared_volume_data = None

            if task.get("params"):
                shared_volume_data = task["params"]

            if shared_volume_data:
                self.logger.info(
                    f"Creating the new shared_volume for {shared_volume_data}\n"
                )
                (
                    shared_volume_name,
                    shared_volume_vim_id,
                ) = target_vim.new_shared_volumes(shared_volume_data)
                created = True
                # Stored per volume id; "keep" is read back by delete()
                created_items[shared_volume_vim_id] = {
                    "name": shared_volume_name,
                    "keep": shared_volume_data.get("keep"),
                }

            ro_vim_item_update = {
                "vim_id": shared_volume_vim_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-shared-volume={} created={}".format(
                    task_id, ro_task["target_id"], shared_volume_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-shared-volume:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
796
797
class VimInteractionFlavor(VimInteractionBase):
    """Find, create and delete flavors through the target VIM connector."""

    def delete(self, ro_task, task_index):
        """Delete the flavor at the VIM; a flavor already missing counts as deleted.

        :param ro_task: ro_task document; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: ("DONE", update) on success, ("FAILED", update) on a VIM error
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        flavor_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            # Nothing to do at the VIM when no flavor id was stored
            if flavor_vim_id:
                self.my_vims[ro_task["target_id"]].delete_flavor(flavor_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id,
                ro_task["target_id"],
                flavor_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """Reuse a matching flavor (task "find_params") or create one (task "params").

        A flavor is created only when the search found nothing (or was not
        requested) and the task carries "params".

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", update) on success, ("FAILED", update) on a VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_id = ro_task["target_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[target_id]

        try:
            # FIND
            vim_flavor_id = None

            if task.get("find_params"):
                try:
                    wanted_flavor = task["find_params"]["flavor_data"]
                    vim_flavor_id = target_vim.get_flavor_id_from_data(wanted_flavor)
                except vimconn.VimConnNotFoundException as flavor_not_found_msg:
                    # Not found is not an error here; creation may follow
                    self.logger.warning(
                        f"VimConnNotFoundException occured: {flavor_not_found_msg}"
                    )

            if not vim_flavor_id and task.get("params"):
                # CREATE
                vim_flavor_id = target_vim.new_flavor(task["params"]["flavor_data"])
                created = True

            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, target_id, vim_flavor_id, created
                )
            )

            return "DONE", {
                "vim_id": vim_flavor_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }
891
892
class VimInteractionAffinityGroup(VimInteractionBase):
    """Find, create and delete affinity or anti-affinity groups through the target VIM connector."""

    def delete(self, ro_task, task_index):
        """Delete the group at the VIM; a group already missing counts as deleted.

        :param ro_task: ro_task document; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: ("DONE", update) on success, ("FAILED", update) on a VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            # Nothing to do at the VIM when no group id was stored
            if affinity_group_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_affinity_group(affinity_group_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id,
                ro_task["target_id"],
                affinity_group_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """Reuse the group named by "vim-affinity-group-id" or create a new one.

        When the provided id is not found at the VIM, a new group is created from
        the same "affinity_group_data". Without that data no VIM call is made.

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", update) on success, ("FAILED", update) on a VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            affinity_group_vim_id = None
            affinity_group_data = None
            param_affinity_group_id = ""

            if task.get("params"):
                affinity_group_data = task["params"].get("affinity_group_data")

            if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
                try:
                    param_affinity_group_id = task["params"]["affinity_group_data"].get(
                        "vim-affinity-group-id"
                    )
                    affinity_group_vim_id = target_vim.get_affinity_group(
                        param_affinity_group_id
                    ).get("id")
                except vimconn.VimConnNotFoundException:
                    # The two literals are concatenated: the trailing space keeps
                    # the id separated from the rest of the sentence.
                    self.logger.error(
                        "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {} "
                        "could not be found at VIM. Creating a new one.".format(
                            task_id, ro_task["target_id"], param_affinity_group_id
                        )
                    )

            if not affinity_group_vim_id and affinity_group_data:
                affinity_group_vim_id = target_vim.new_affinity_group(
                    affinity_group_data
                )
                created = True

            ro_vim_item_update = {
                "vim_id": affinity_group_vim_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], affinity_group_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
999
1000
class VimInteractionUpdateVdu(VimInteractionBase):
    """Run an action on an existing VM through the target VIM connector."""

    def exec(self, ro_task, task_index, task_depends):
        """Ask the VIM to perform the action given in the task "params" on a VM.

        The connector receives the VM id and a one-entry dict whose key and value
        are both the action. Without "params" no VIM call is made.

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: (status, ro_vim_item_update, db_task_update); status is "DONE" or "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            vim_vm_id = ""
            task_params = task.get("params")

            if task_params:
                vim_vm_id = task_params.get("vim_vm_id")
                action = task_params.get("action")
                target_vim.action_vminstance(vim_vm_id, {action: action})

            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                {
                    "vim_id": vim_vm_id,
                    "vim_status": "ACTIVE",
                    "created": created,
                    "created_items": created_items,
                    "vim_details": None,
                    "vim_message": None,
                },
                db_task_update,
            )
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration: {}".format(
                    task_id, ro_task["target_id"], e
                )
            )
            failure = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", failure, db_task_update
1042
1043
1044 class VimInteractionSdnNet(VimInteractionBase):
1045 @staticmethod
1046 def _match_pci(port_pci, mapping):
1047 """
1048 Check if port_pci matches with mapping.
1049 The mapping can have brackets to indicate that several chars are accepted. e.g
1050 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
1051 :param port_pci: text
1052 :param mapping: text, can contain brackets to indicate several chars are available
1053 :return: True if matches, False otherwise
1054 """
1055 if not port_pci or not mapping:
1056 return False
1057 if port_pci == mapping:
1058 return True
1059
1060 mapping_index = 0
1061 pci_index = 0
1062 while True:
1063 bracket_start = mapping.find("[", mapping_index)
1064
1065 if bracket_start == -1:
1066 break
1067
1068 bracket_end = mapping.find("]", bracket_start)
1069 if bracket_end == -1:
1070 break
1071
1072 length = bracket_start - mapping_index
1073 if (
1074 length
1075 and port_pci[pci_index : pci_index + length]
1076 != mapping[mapping_index:bracket_start]
1077 ):
1078 return False
1079
1080 if (
1081 port_pci[pci_index + length]
1082 not in mapping[bracket_start + 1 : bracket_end]
1083 ):
1084 return False
1085
1086 pci_index += length + 1
1087 mapping_index = bracket_end + 1
1088
1089 if port_pci[pci_index:] != mapping[mapping_index:]:
1090 return False
1091
1092 return True
1093
1094 def _get_interfaces(self, vlds_to_connect, vim_account_id):
1095 """
1096 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
1097 :param vim_account_id:
1098 :return:
1099 """
1100 interfaces = []
1101
1102 for vld in vlds_to_connect:
1103 table, _, db_id = vld.partition(":")
1104 db_id, _, vld = db_id.partition(":")
1105 _, _, vld_id = vld.partition(".")
1106
1107 if table == "vnfrs":
1108 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
1109 iface_key = "vnf-vld-id"
1110 else: # table == "nsrs"
1111 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
1112 iface_key = "ns-vld-id"
1113
1114 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
1115
1116 for db_vnfr in db_vnfrs:
1117 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
1118 for iface_index, interface in enumerate(vdur["interfaces"]):
1119 if interface.get(iface_key) == vld_id and interface.get(
1120 "type"
1121 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
1122 # only SR-IOV o PT
1123 interface_ = interface.copy()
1124 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
1125 db_vnfr["_id"], vdu_index, iface_index
1126 )
1127
1128 if vdur.get("status") == "ERROR":
1129 interface_["status"] = "ERROR"
1130
1131 interfaces.append(interface_)
1132
1133 return interfaces
1134
1135 def refresh(self, ro_task):
1136 # look for task create
1137 task_create_index, _ = next(
1138 i_t
1139 for i_t in enumerate(ro_task["tasks"])
1140 if i_t[1]
1141 and i_t[1]["action"] == "CREATE"
1142 and i_t[1]["status"] != "FINISHED"
1143 )
1144
1145 return self.new(ro_task, task_create_index, None)
1146
def new(self, ro_task, task_index, task_depends):
    """
    Create or update the SDN connectivity service of a ro_task, or read its status if nothing changed.

    It collects the SR-IOV / PCI-PASSTHROUGH interfaces of the vlds listed at the task params plus the
    external "sdn-ports", translates them into connection points and compares their ids against the
    "connected_ports" stored at ro_task["vim_info"] to choose among create, edit or a status query.
    :param ro_task: database ro_task. Keys "tasks", "target_id" and "vim_info" are read
    :param task_index: index at ro_task["tasks"] of the task to process
    :param task_depends: not used by this method
    :return: tuple (status, ro_vim_item_update). Any exception is logged and reported as
        ("FAILED", {"vim_status": "VIM_ERROR", "created": ..., "vim_message": ...})
    """
    task = ro_task["tasks"][task_index]
    task_id = task["task_id"]
    target_vim = self.my_vims[ro_task["target_id"]]

    # id of the connectivity service; falsy while it has not been created
    sdn_net_id = ro_task["vim_info"]["vim_id"]

    # values stored by previous executions over this ro_task
    created_items = ro_task["vim_info"].get("created_items")
    connected_ports = ro_task["vim_info"].get("connected_ports", [])
    new_connected_ports = []
    last_update = ro_task["vim_info"].get("last_update", 0)
    sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
    error_list = []
    created = ro_task["vim_info"].get("created", False)

    try:
        # CREATE
        db_vim = {}
        params = task["params"]
        vlds_to_connect = params.get("vlds", [])
        associated_vim = params.get("target_vim")
        # external additional ports
        additional_ports = params.get("sdn-ports") or ()
        # vim_account_id is the text placed after the first ":" of target_vim, None if not provided
        _, _, vim_account_id = (
            (None, None, None)
            if associated_vim is None
            else associated_vim.partition(":")
        )

        if associated_vim:
            # get associated VIM
            if associated_vim not in self.db_vims:
                self.db_vims[associated_vim] = self.db.get_one(
                    "vim_accounts", {"_id": vim_account_id}
                )

            db_vim = self.db_vims[associated_vim]

        # look for ports to connect
        ports = self._get_interfaces(vlds_to_connect, vim_account_id)
        # print(ports)

        sdn_ports = []
        pending_ports = error_ports = 0
        vlan_used = None
        # never set to True in this method: the code that would set it is commented out below
        sdn_need_update = False

        for port in ports:
            # remember the last vlan seen; used as default for the external ports
            vlan_used = port.get("vlan") or vlan_used

            # TODO. Do not connect if already done
            # a port without compute_node or pci cannot be connected yet: count it as error when
            # it is flagged with ERROR status, as pending otherwise
            if not port.get("compute_node") or not port.get("pci"):
                if port.get("status") == "ERROR":
                    error_ports += 1
                else:
                    pending_ports += 1
                continue

            pmap = None
            # NOTE(review): db_vim is {} when no target_vim is provided, so db_vim["config"] raises
            # KeyError here (reported as FAILED by the except below) - confirm it cannot happen
            compute_node_mappings = next(
                (
                    c
                    for c in db_vim["config"].get("sdn-port-mapping", ())
                    if c and c["compute_node"] == port["compute_node"]
                ),
                None,
            )

            if compute_node_mappings:
                # process port_mapping pci of type 0000:af:1[01].[1357]
                pmap = next(
                    (
                        p
                        for p in compute_node_mappings["ports"]
                        if self._match_pci(port["pci"], p.get("pci"))
                    ),
                    None,
                )

            if not pmap:
                # without mapping the port is an error, unless the VIM config sets
                # "mapping_not_needed"; then the port own values are used below
                if not db_vim["config"].get("mapping_not_needed"):
                    error_list.append(
                        "Port mapping not found for compute_node={} pci={}".format(
                            port["compute_node"], port["pci"]
                        )
                    )
                    continue

                pmap = {}

            # values of the mapping take precedence over the ones obtained from the port
            service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
            new_port = {
                "service_endpoint_id": pmap.get("service_endpoint_id")
                or service_endpoint_id,
                "service_endpoint_encapsulation_type": "dot1q"
                if port["type"] == "SR-IOV"
                else None,
                "service_endpoint_encapsulation_info": {
                    "vlan": port.get("vlan"),
                    "mac": port.get("mac-address"),
                    "device_id": pmap.get("device_id") or port["compute_node"],
                    "device_interface_id": pmap.get("device_interface_id")
                    or port["pci"],
                    "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
                    "switch_port": pmap.get("switch_port"),
                    "service_mapping_info": pmap.get("service_mapping_info"),
                },
            }

            # TODO
            # if port["modified_at"] > last_update:
            #     sdn_need_update = True
            new_connected_ports.append(port["id"])  # TODO
            sdn_ports.append(new_port)

        if error_ports:
            error_list.append(
                "{} interfaces have not been created as VDU is on ERROR status".format(
                    error_ports
                )
            )

        # connect external ports
        for index, additional_port in enumerate(additional_ports):
            # an external port without "service_endpoint_id" is identified by its position
            additional_port_id = additional_port.get(
                "service_endpoint_id"
            ) or "external-{}".format(index)
            sdn_ports.append(
                {
                    "service_endpoint_id": additional_port_id,
                    "service_endpoint_encapsulation_type": additional_port.get(
                        "service_endpoint_encapsulation_type", "dot1q"
                    ),
                    "service_endpoint_encapsulation_info": {
                        "vlan": additional_port.get("vlan") or vlan_used,
                        "mac": additional_port.get("mac_address"),
                        "device_id": additional_port.get("device_id"),
                        "device_interface_id": additional_port.get(
                            "device_interface_id"
                        ),
                        "switch_dpid": additional_port.get("switch_dpid")
                        or additional_port.get("switch_id"),
                        "switch_port": additional_port.get("switch_port"),
                        "service_mapping_info": additional_port.get(
                            "service_mapping_info"
                        ),
                    },
                }
            )
            new_connected_ports.append(additional_port_id)
        sdn_info = ""

        # if there are more ports to connect or they have been modified, call create/update
        if error_list:
            # the connector is not called when any port is in error
            sdn_status = "ERROR"
            sdn_info = "; ".join(error_list)
        elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
            last_update = time.time()

            if not sdn_net_id:
                if len(sdn_ports) < 2:
                    # with less than 2 ports no connectivity service is created
                    sdn_status = "ACTIVE"

                    if not pending_ports:
                        self.logger.debug(
                            "task={} {} new-sdn-net done, less than 2 ports".format(
                                task_id, ro_task["target_id"]
                            )
                        )
                else:
                    net_type = params.get("type") or "ELAN"
                    (
                        sdn_net_id,
                        created_items,
                    ) = target_vim.create_connectivity_service(net_type, sdn_ports)
                    created = True
                    self.logger.debug(
                        "task={} {} new-sdn-net={} created={}".format(
                            task_id, ro_task["target_id"], sdn_net_id, created
                        )
                    )
            else:
                # the service already exists: update it with the current list of ports
                created_items = target_vim.edit_connectivity_service(
                    sdn_net_id, conn_info=created_items, connection_points=sdn_ports
                )
                created = True
                self.logger.debug(
                    "task={} {} update-sdn-net={} created={}".format(
                        task_id, ro_task["target_id"], sdn_net_id, created
                    )
                )

            connected_ports = new_connected_ports
        elif sdn_net_id:
            # same ports as the last time: just ask the connector for the service status
            wim_status_dict = target_vim.get_connectivity_service_status(
                sdn_net_id, conn_info=created_items
            )
            sdn_status = wim_status_dict["sdn_status"]

            if wim_status_dict.get("sdn_info"):
                sdn_info = str(wim_status_dict.get("sdn_info")) or ""

            # "error_msg", if present, takes precedence over "sdn_info"
            if wim_status_dict.get("error_msg"):
                sdn_info = wim_status_dict.get("error_msg") or ""

        if pending_ports:
            if sdn_status != "ERROR":
                sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
                    len(ports) - pending_ports, len(ports)
                )

            # it is not reported as ACTIVE while there are ports without location
            if sdn_status == "ACTIVE":
                sdn_status = "BUILD"

        ro_vim_item_update = {
            "vim_id": sdn_net_id,
            "vim_status": sdn_status,
            "created": created,
            "created_items": created_items,
            "connected_ports": connected_ports,
            "vim_details": sdn_info,
            "vim_message": None,
            "last_update": last_update,
        }

        return sdn_status, ro_vim_item_update
    except Exception as e:
        # the traceback is logged only for exceptions different from the connector ones
        self.logger.error(
            "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
            exc_info=not isinstance(
                e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
            ),
        )
        ro_vim_item_update = {
            "vim_status": "VIM_ERROR",
            "created": created,
            "vim_message": str(e),
        }

        return "FAILED", ro_vim_item_update
1387
1388 def delete(self, ro_task, task_index):
1389 task = ro_task["tasks"][task_index]
1390 task_id = task["task_id"]
1391 sdn_vim_id = ro_task["vim_info"].get("vim_id")
1392 ro_vim_item_update_ok = {
1393 "vim_status": "DELETED",
1394 "created": False,
1395 "vim_message": "DELETED",
1396 "vim_id": None,
1397 }
1398
1399 try:
1400 if sdn_vim_id:
1401 target_vim = self.my_vims[ro_task["target_id"]]
1402 target_vim.delete_connectivity_service(
1403 sdn_vim_id, ro_task["vim_info"].get("created_items")
1404 )
1405
1406 except Exception as e:
1407 if (
1408 isinstance(e, sdnconn.SdnConnectorError)
1409 and e.http_code == HTTPStatus.NOT_FOUND.value
1410 ):
1411 ro_vim_item_update_ok["vim_message"] = "already deleted"
1412 else:
1413 self.logger.error(
1414 "ro_task={} vim={} del-sdn-net={}: {}".format(
1415 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1416 ),
1417 exc_info=not isinstance(
1418 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1419 ),
1420 )
1421 ro_vim_item_update = {
1422 "vim_status": "VIM_ERROR",
1423 "vim_message": "Error while deleting: {}".format(e),
1424 }
1425
1426 return "FAILED", ro_vim_item_update
1427
1428 self.logger.debug(
1429 "task={} {} del-sdn-net={} {}".format(
1430 task_id,
1431 ro_task["target_id"],
1432 sdn_vim_id,
1433 ro_vim_item_update_ok.get("vim_message", ""),
1434 )
1435 )
1436
1437 return "DONE", ro_vim_item_update_ok
1438
1439
class VimInteractionMigration(VimInteractionBase):
    """Executes the migration of a VM to another compute host."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Ask the VIM to migrate a VM and, if migrated, refresh the VM information.
        :param ro_task: database ro_task. Keys "tasks" and "target_id" are read
        :param task_index: index at ro_task["tasks"] of the task to process. Its "params" can
            contain "vim_vm_id", "migrate_host" and "vdu_vim_info" (previous vim_info of the vdu,
            indexed by target_id)
        :param task_depends: not used by this method
        :return: tuple (status, ro_vim_item_update, db_task_update); status is "DONE", or "FAILED"
            when the connector raises VimConnException or NsWorkerException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_interfaces = []
        created = False
        created_items = {}
        refreshed_vim_info = {}

        try:
            params = task.get("params") or {}
            vim_vm_id = ""
            # None when not provided, instead of leaving the variable unbound
            migrate_host = None

            if params:
                vim_vm_id = params.get("vim_vm_id")
                migrate_host = params.get("migrate_host")

            _, migrated_compute_node = target_vim.migrate_instance(
                vim_vm_id, migrate_host
            )

            if migrated_compute_node:
                # When VM is migrated, vdu["vim_info"] needs to be updated
                # a missing previous vim_info is treated as one without interfaces
                vdu_old_vim_info = (params.get("vdu_vim_info") or {}).get(
                    ro_task["target_id"]
                ) or {}

                # Refresh VM to get new vim_info
                vim_dict = target_vim.refresh_vms_status([vim_vm_id])
                refreshed_vim_info = vim_dict[vim_vm_id]

                if refreshed_vim_info.get("interfaces"):
                    # keep the order of the old interfaces, taking the refreshed content
                    for old_iface in vdu_old_vim_info.get("interfaces") or ():
                        iface = next(
                            (
                                iface
                                for iface in refreshed_vim_info["interfaces"]
                                if old_iface["vim_interface_id"]
                                == iface["vim_interface_id"]
                            ),
                            None,
                        )
                        # NOTE(review): None is appended when the old interface is not found
                        # among the refreshed ones - confirm that consumers expect it
                        vim_interfaces.append(iface)

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info.get("vim_info")

            if vim_interfaces:
                ro_vim_item_update["interfaces"] = vim_interfaces

            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update

        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
1520
1521
class VimInteractionResize(VimInteractionBase):
    """Executes the vertical scale (resize to another flavor) of a VM."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Look for (or create) the target flavor at the VIM and resize the VM to it.
        :param ro_task: database ro_task. Keys "tasks" and "target_id" are read
        :param task_index: index at ro_task["tasks"] of the task to process. Its "params" can
            contain "vim_vm_id" and "flavor_dict"
        :param task_depends: not used by this method
        :return: tuple (status, ro_vim_item_update, db_task_update); status is "DONE", or "FAILED"
            when VimConnException or NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        target_flavor_uuid = None
        created_items = {}
        refreshed_vim_info = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            vim_vm_id = ""
            # None when not provided, instead of leaving the variable unbound
            flavor_dict = None
            params = task.get("params")

            if params:
                vim_vm_id = params.get("vim_vm_id")
                flavor_dict = params.get("flavor_dict")
                self.logger.info("flavor_dict %s", flavor_dict)

            if flavor_dict is None:
                self.logger.error(
                    "task=%s resize: flavor_dict not provided at task params", task_id
                )
            else:
                try:
                    target_flavor_uuid = target_vim.get_flavor_id_from_data(flavor_dict)
                except Exception as lookup_error:
                    self.logger.info(
                        "Cannot find any flavor matching %s.", str(lookup_error)
                    )
                    # the flavor does not exist at the VIM (or cannot be read): try to create it
                    try:
                        target_flavor_uuid = target_vim.new_flavor(flavor_dict)
                    except Exception as create_error:
                        self.logger.error(
                            "Error creating flavor at VIM %s.", str(create_error)
                        )

            if target_flavor_uuid is not None:
                resized_status = target_vim.resize_instance(
                    vim_vm_id, target_flavor_uuid
                )

                if resized_status:
                    # Refresh VM to get new vim_info
                    vim_dict = target_vim.refresh_vms_status([vim_vm_id])
                    refreshed_vim_info = vim_dict[vim_vm_id]
            else:
                # NOTE(review): the task is still reported as DONE although the VM has not been
                # resized; this keeps the previous behavior - confirm whether it must be FAILED
                self.logger.warning(
                    "task=%s %s resize skipped, no target flavor available",
                    task_id,
                    ro_task["target_id"],
                )

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info.get("vim_info")

            self.logger.debug(
                "task={} {} resize done".format(task_id, ro_task["target_id"])
            )
            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} Resize:" " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
1590
1591
class ConfigValidate:
    """Read-only view over the "period" section of the RO configuration."""

    def __init__(self, config: Dict):
        # the dictionary is stored as it is; values are read at every property access
        self.conf = config

    def _period(self, key):
        """Return the value stored at config["period"][key]"""
        return self.conf["period"][key]

    @property
    def active(self):
        """Value of "refresh_active"; values different from -1 and lower than 60 are given as 60"""
        # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
        refresh_active = self._period("refresh_active")

        if refresh_active >= 60 or refresh_active == -1:
            return refresh_active

        return 60

    @property
    def build(self):
        """Value of "refresh_build" """
        return self._period("refresh_build")

    @property
    def image(self):
        """Value of "refresh_image" """
        return self._period("refresh_image")

    @property
    def error(self):
        """Value of "refresh_error" """
        return self._period("refresh_error")

    @property
    def queue_size(self):
        """Value of "queue_size" """
        return self._period("queue_size")
1622
1623
1624 class NsWorker(threading.Thread):
def __init__(self, worker_index, config, plugins, db):
    """
    :param worker_index: thread index
    :param config: general configuration of RO, among others the process_id with the docker id where it runs
    :param plugins: global shared dict with the loaded plugins
    :param db: database class instance to use
    """
    threading.Thread.__init__(self)
    self.config = config
    self.plugins = plugins
    self.plugin_name = "unknown"
    self.logger = logging.getLogger("ro.worker{}".format(worker_index))
    self.worker_index = worker_index
    # lock used by del_task; it was referenced there but never created
    self.task_lock = threading.Lock()
    # refresh periods for created items
    self.refresh_config = ConfigValidate(config)
    self.task_queue = queue.Queue(self.refresh_config.queue_size)
    # targetvim: vimplugin class
    self.my_vims = {}
    # targetvim: vim information from database
    self.db_vims = {}
    # targetvim list
    self.vim_targets = []
    self.my_id = config["process_id"] + ":" + str(worker_index)
    self.db = db

    # task item: class that knows how to process it. Every instance shares the database and
    # the dictionaries of loaded connectors / database VIM content of this worker
    interaction_classes = {
        "net": VimInteractionNet,
        "shared-volumes": VimInteractionSharedVolume,
        "vdu": VimInteractionVdu,
        "image": VimInteractionImage,
        "flavor": VimInteractionFlavor,
        "sdn_net": VimInteractionSdnNet,
        "update": VimInteractionUpdateVdu,
        "affinity-or-anti-affinity-group": VimInteractionAffinityGroup,
        "migrate": VimInteractionMigration,
        "verticalscale": VimInteractionResize,
    }
    self.item2class = {
        item: interaction(self.db, self.my_vims, self.db_vims, self.logger)
        for item, interaction in interaction_classes.items()
    }

    self.time_last_task_processed = None
    # lists of tasks to delete because nsrs or vnfrs has been deleted from db
    self.tasks_to_delete = []
    # it is idle when there are not vim_targets associated
    self.idle = True
    self.task_locked_time = config["global"]["task_locked_time"]
1683
1684 def insert_task(self, task):
1685 try:
1686 self.task_queue.put(task, False)
1687 return None
1688 except queue.Full:
1689 raise NsWorkerException("timeout inserting a task")
1690
1691 def terminate(self):
1692 self.insert_task("exit")
1693
1694 def del_task(self, task):
1695 with self.task_lock:
1696 if task["status"] == "SCHEDULED":
1697 task["status"] = "SUPERSEDED"
1698 return True
1699 else: # task["status"] == "processing"
1700 self.task_lock.release()
1701 return False
1702
1703 def _process_vim_config(self, target_id: str, db_vim: dict) -> None:
1704 """
1705 Process vim config, creating vim configuration files as ca_cert
1706 :param target_id: vim/sdn/wim + id
1707 :param db_vim: Vim dictionary obtained from database
1708 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1709 """
1710 if not db_vim.get("config"):
1711 return
1712
1713 file_name = ""
1714 work_dir = "/app/osm_ro/certs"
1715
1716 try:
1717 if db_vim["config"].get("ca_cert_content"):
1718 file_name = f"{work_dir}/{target_id}:{self.worker_index}"
1719
1720 if not path.isdir(file_name):
1721 makedirs(file_name)
1722
1723 file_name = file_name + "/ca_cert"
1724
1725 with open(file_name, "w") as f:
1726 f.write(db_vim["config"]["ca_cert_content"])
1727 del db_vim["config"]["ca_cert_content"]
1728 db_vim["config"]["ca_cert"] = file_name
1729 except Exception as e:
1730 raise NsWorkerException(
1731 "Error writing to file '{}': {}".format(file_name, e)
1732 )
1733
1734 def _load_plugin(self, name, type="vim"):
1735 # type can be vim or sdn
1736 if "rovim_dummy" not in self.plugins:
1737 self.plugins["rovim_dummy"] = VimDummyConnector
1738
1739 if "rosdn_dummy" not in self.plugins:
1740 self.plugins["rosdn_dummy"] = SdnDummyConnector
1741
1742 if name in self.plugins:
1743 return self.plugins[name]
1744
1745 try:
1746 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1747 self.plugins[name] = ep.load()
1748 except Exception as e:
1749 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1750
1751 if name and name not in self.plugins:
1752 raise NsWorkerException(
1753 "Plugin 'osm_{n}' has not been installed".format(n=name)
1754 )
1755
1756 return self.plugins[name]
1757
1758 def _unload_vim(self, target_id):
1759 """
1760 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1761 :param target_id: Contains type:_id; where type can be 'vim', ...
1762 :return: None.
1763 """
1764 try:
1765 self.db_vims.pop(target_id, None)
1766 self.my_vims.pop(target_id, None)
1767
1768 if target_id in self.vim_targets:
1769 self.vim_targets.remove(target_id)
1770
1771 self.logger.info("Unloaded {}".format(target_id))
1772 except Exception as e:
1773 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1774
def _check_vim(self, target_id):
    """
    Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
    Only the first operation found in "PROCESSING" state at _admin.operations is handled; the
    method returns without doing anything if that operation is locked by other worker.
    :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
    :return: None.
    """
    target, _, _id = target_id.partition(":")
    now = time.time()
    update_dict = {}
    unset_dict = {}
    op_text = ""
    step = ""
    # if it was not loaded before this check, it is unloaded again at the end
    loaded = target_id in self.vim_targets
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts"
        if target == "wim"
        else "sdns"
    )
    error_text = ""

    try:
        step = "Getting {} from db".format(target_id)
        db_vim = self.db.get_one(target_database, {"_id": _id})

        for op_index, operation in enumerate(
            db_vim["_admin"].get("operations", ())
        ):
            if operation["operationState"] != "PROCESSING":
                continue

            locked_at = operation.get("locked_at")

            if locked_at is not None and locked_at >= now - self.task_locked_time:
                # some other thread is doing this operation
                return

            # lock
            op_text = "_admin.operations.{}.".format(op_index)

            # the filter contains the "locked_at" value just read, so the update does not match
            # if the operation has been modified in the meanwhile; in that case, give up
            # NOTE(review): the key "admin.current_operation" has not the leading underscore
            # used by the rest of the "_admin" keys - confirm whether it is intended
            if not self.db.set_one(
                target_database,
                q_filter={
                    "_id": _id,
                    op_text + "operationState": "PROCESSING",
                    op_text + "locked_at": locked_at,
                },
                update_dict={
                    op_text + "locked_at": now,
                    "admin.current_operation": op_index,
                },
                fail_on_empty=False,
            ):
                return

            unset_dict[op_text + "locked_at"] = None
            # NOTE(review): the key written when locking is "admin.current_operation" while
            # the one unset here is "current_operation" - confirm which one is the right one
            unset_dict["current_operation"] = None
            step = "Loading " + target_id
            error_text = self._load_vim(target_id)

            if not error_text:
                step = "Checking connectivity"

                if target == "vim":
                    self.my_vims[target_id].check_vim_connectivity()
                else:
                    self.my_vims[target_id].check_credentials()

            # these values are overwritten at the finally clause when error_text is not empty
            update_dict["_admin.operationalState"] = "ENABLED"
            update_dict["_admin.detailed-status"] = ""
            unset_dict[op_text + "detailed-status"] = None
            update_dict[op_text + "operationState"] = "COMPLETED"

            # only one operation is processed per call; the finally clause writes the result
            return

    except Exception as e:
        error_text = "{}: {}".format(step, e)
        self.logger.error("{} for {}: {}".format(step, target_id, e))

    finally:
        # it is executed as well for the "return" statements placed inside the try
        if update_dict or unset_dict:
            if error_text:
                update_dict[op_text + "operationState"] = "FAILED"
                update_dict[op_text + "detailed-status"] = error_text
                # detailed-status is set above, so it must not be unset in the same update
                unset_dict.pop(op_text + "detailed-status", None)
                update_dict["_admin.operationalState"] = "ERROR"
                update_dict["_admin.detailed-status"] = error_text

            if op_text:
                update_dict[op_text + "statusEnteredTime"] = now

            self.db.set_one(
                target_database,
                q_filter={"_id": _id},
                update_dict=update_dict,
                unset=unset_dict,
                fail_on_empty=False,
            )

        if not loaded:
            self._unload_vim(target_id)
1877
1878 def _reload_vim(self, target_id):
1879 if target_id in self.vim_targets:
1880 self._load_vim(target_id)
1881 else:
1882 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1883 # just remove it to force load again next time it is needed
1884 self.db_vims.pop(target_id, None)
1885
1886 def _load_vim(self, target_id):
1887 """
1888 Load or reload a vim_account, sdn_controller or wim_account.
1889 Read content from database, load the plugin if not loaded.
1890 In case of error loading the plugin, it loads a failing VIM_connector
1891 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1892 :param target_id: Contains type:_id; where type can be 'vim', ...
1893 :return: None if ok, descriptive text if error
1894 """
1895 target, _, _id = target_id.partition(":")
1896 target_database = (
1897 "vim_accounts"
1898 if target == "vim"
1899 else "wim_accounts"
1900 if target == "wim"
1901 else "sdns"
1902 )
1903 plugin_name = ""
1904 vim = None
1905 step = "Getting {}={} from db".format(target, _id)
1906
1907 try:
1908 # TODO process for wim, sdnc, ...
1909 vim = self.db.get_one(target_database, {"_id": _id})
1910
1911 # if deep_get(vim, "config", "sdn-controller"):
1912 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1913 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1914
1915 step = "Decrypting password"
1916 schema_version = vim.get("schema_version")
1917 self.db.encrypt_decrypt_fields(
1918 vim,
1919 "decrypt",
1920 fields=("password", "secret"),
1921 schema_version=schema_version,
1922 salt=_id,
1923 )
1924 self._process_vim_config(target_id, vim)
1925
1926 if target == "vim":
1927 plugin_name = "rovim_" + vim["vim_type"]
1928 step = "Loading plugin '{}'".format(plugin_name)
1929 vim_module_conn = self._load_plugin(plugin_name)
1930 step = "Loading {}'".format(target_id)
1931 self.my_vims[target_id] = vim_module_conn(
1932 uuid=vim["_id"],
1933 name=vim["name"],
1934 tenant_id=vim.get("vim_tenant_id"),
1935 tenant_name=vim.get("vim_tenant_name"),
1936 url=vim["vim_url"],
1937 url_admin=None,
1938 user=vim["vim_user"],
1939 passwd=vim["vim_password"],
1940 config=vim.get("config") or {},
1941 persistent_info={},
1942 )
1943 else: # sdn
1944 plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
1945 step = "Loading plugin '{}'".format(plugin_name)
1946 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1947 step = "Loading {}'".format(target_id)
1948 wim = deepcopy(vim)
1949 wim_config = wim.pop("config", {}) or {}
1950 wim["uuid"] = wim["_id"]
1951 if "url" in wim and "wim_url" not in wim:
1952 wim["wim_url"] = wim["url"]
1953 elif "url" not in wim and "wim_url" in wim:
1954 wim["url"] = wim["wim_url"]
1955
1956 if wim.get("dpid"):
1957 wim_config["dpid"] = wim.pop("dpid")
1958
1959 if wim.get("switch_id"):
1960 wim_config["switch_id"] = wim.pop("switch_id")
1961
1962 # wim, wim_account, config
1963 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1964 self.db_vims[target_id] = vim
1965 self.error_status = None
1966
1967 self.logger.info(
1968 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1969 )
1970 except Exception as e:
1971 self.logger.error(
1972 "Cannot load {} plugin={}: {} {}".format(
1973 target_id, plugin_name, step, e
1974 )
1975 )
1976
1977 self.db_vims[target_id] = vim or {}
1978 self.db_vims[target_id] = FailingConnector(str(e))
1979 error_status = "{} Error: {}".format(step, e)
1980
1981 return error_status
1982 finally:
1983 if target_id not in self.vim_targets:
1984 self.vim_targets.append(target_id)
1985
1986 def _get_db_task(self):
1987 """
1988 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1989 :return: None
1990 """
1991 now = time.time()
1992
1993 if not self.time_last_task_processed:
1994 self.time_last_task_processed = now
1995
1996 try:
1997 while True:
1998 """
1999 # Log RO tasks only when loglevel is DEBUG
2000 if self.logger.getEffectiveLevel() == logging.DEBUG:
2001 self._log_ro_task(
2002 None,
2003 None,
2004 None,
2005 "TASK_WF",
2006 "task_locked_time="
2007 + str(self.task_locked_time)
2008 + " "
2009 + "time_last_task_processed="
2010 + str(self.time_last_task_processed)
2011 + " "
2012 + "now="
2013 + str(now),
2014 )
2015 """
2016 locked = self.db.set_one(
2017 "ro_tasks",
2018 q_filter={
2019 "target_id": self.vim_targets,
2020 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
2021 "locked_at.lt": now - self.task_locked_time,
2022 "to_check_at.lt": self.time_last_task_processed,
2023 "to_check_at.gt": -1,
2024 },
2025 update_dict={"locked_by": self.my_id, "locked_at": now},
2026 fail_on_empty=False,
2027 )
2028
2029 if locked:
2030 # read and return
2031 ro_task = self.db.get_one(
2032 "ro_tasks",
2033 q_filter={
2034 "target_id": self.vim_targets,
2035 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
2036 "locked_at": now,
2037 },
2038 )
2039 return ro_task
2040
2041 if self.time_last_task_processed == now:
2042 self.time_last_task_processed = None
2043 return None
2044 else:
2045 self.time_last_task_processed = now
2046 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
2047
2048 except DbException as e:
2049 self.logger.error("Database exception at _get_db_task: {}".format(e))
2050 except Exception as e:
2051 self.logger.critical(
2052 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
2053 )
2054
2055 return None
2056
2057 def _delete_task(self, ro_task, task_index, task_depends, db_update):
2058 """
2059 Determine if this task need to be done or superseded
2060 :return: None
2061 """
2062 my_task = ro_task["tasks"][task_index]
2063 task_id = my_task["task_id"]
2064 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
2065 "created_items", False
2066 )
2067
2068 self.logger.debug("Needed delete: {}".format(needed_delete))
2069 if my_task["status"] == "FAILED":
2070 return None, None # TODO need to be retry??
2071
2072 try:
2073 for index, task in enumerate(ro_task["tasks"]):
2074 if index == task_index or not task:
2075 continue # own task
2076
2077 if (
2078 my_task["target_record"] == task["target_record"]
2079 and task["action"] == "CREATE"
2080 ):
2081 # set to finished
2082 db_update["tasks.{}.status".format(index)] = task[
2083 "status"
2084 ] = "FINISHED"
2085 elif task["action"] == "CREATE" and task["status"] not in (
2086 "FINISHED",
2087 "SUPERSEDED",
2088 ):
2089 needed_delete = False
2090
2091 if needed_delete:
2092 self.logger.debug(
2093 "Deleting ro_task={} task_index={}".format(ro_task, task_index)
2094 )
2095 return self.item2class[my_task["item"]].delete(ro_task, task_index)
2096 else:
2097 return "SUPERSEDED", None
2098 except Exception as e:
2099 if not isinstance(e, NsWorkerException):
2100 self.logger.critical(
2101 "Unexpected exception at _delete_task task={}: {}".format(
2102 task_id, e
2103 ),
2104 exc_info=True,
2105 )
2106
2107 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2108
2109 def _create_task(self, ro_task, task_index, task_depends, db_update):
2110 """
2111 Determine if this task need to create something at VIM
2112 :return: None
2113 """
2114 my_task = ro_task["tasks"][task_index]
2115 task_id = my_task["task_id"]
2116
2117 if my_task["status"] == "FAILED":
2118 return None, None # TODO need to be retry??
2119 elif my_task["status"] == "SCHEDULED":
2120 # check if already created by another task
2121 for index, task in enumerate(ro_task["tasks"]):
2122 if index == task_index or not task:
2123 continue # own task
2124
2125 if task["action"] == "CREATE" and task["status"] not in (
2126 "SCHEDULED",
2127 "FINISHED",
2128 "SUPERSEDED",
2129 ):
2130 return task["status"], "COPY_VIM_INFO"
2131
2132 try:
2133 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
2134 ro_task, task_index, task_depends
2135 )
2136 # TODO update other CREATE tasks
2137 except Exception as e:
2138 if not isinstance(e, NsWorkerException):
2139 self.logger.error(
2140 "Error executing task={}: {}".format(task_id, e), exc_info=True
2141 )
2142
2143 task_status = "FAILED"
2144 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2145 # TODO update ro_vim_item_update
2146
2147 return task_status, ro_vim_item_update
2148 else:
2149 return None, None
2150
2151 def _get_dependency(self, task_id, ro_task=None, target_id=None):
2152 """
2153 Look for dependency task
2154 :param task_id: Can be one of
2155 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2156 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2157 3. task.task_id: "<action_id>:number"
2158 :param ro_task:
2159 :param target_id:
2160 :return: database ro_task plus index of task
2161 """
2162 if (
2163 task_id.startswith("vim:")
2164 or task_id.startswith("sdn:")
2165 or task_id.startswith("wim:")
2166 ):
2167 target_id, _, task_id = task_id.partition(" ")
2168
2169 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
2170 ro_task_dependency = self.db.get_one(
2171 "ro_tasks",
2172 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
2173 fail_on_empty=False,
2174 )
2175
2176 if ro_task_dependency:
2177 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2178 if task["target_record_id"] == task_id:
2179 return ro_task_dependency, task_index
2180
2181 else:
2182 if ro_task:
2183 for task_index, task in enumerate(ro_task["tasks"]):
2184 if task and task["task_id"] == task_id:
2185 return ro_task, task_index
2186
2187 ro_task_dependency = self.db.get_one(
2188 "ro_tasks",
2189 q_filter={
2190 "tasks.ANYINDEX.task_id": task_id,
2191 "tasks.ANYINDEX.target_record.ne": None,
2192 },
2193 fail_on_empty=False,
2194 )
2195
2196 self.logger.debug("ro_task_dependency={}".format(ro_task_dependency))
2197 if ro_task_dependency:
2198 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2199 if task["task_id"] == task_id:
2200 return ro_task_dependency, task_index
2201 raise NsWorkerException("Cannot get depending task {}".format(task_id))
2202
def update_vm_refresh(self, ro_task):
    """Re-enable the periodic VM status refresh when the configuration allows it.

    The next refresh time is obtained from self._get_next_refresh(). Unless that
    value is -1, every "ro_tasks" record returned by the query
    {"tasks.status": "DONE", "to_check_at.lt": 0} gets "vim_info.refresh_at" and
    "to_check_at" rewritten. Any exception is logged and not propagated.

    Args:
        ro_task (dict): ro_task details, only used to compute the next refresh
    """
    try:
        self.logger.debug("Checking if VM status update config")
        refresh_at = self._get_next_refresh(ro_task, time.time())
        if refresh_at == -1:
            # status updates are disabled, nothing to change at database
            return

        one_day_ahead = time.time() + (24 * 60 * 60)
        changes = {
            "vim_info.refresh_at": refresh_at,
            "to_check_at": min(one_day_ahead, refresh_at),
        }

        self.logger.debug(
            "Finding tasks which to be updated to enable VM status updates"
        )
        candidates = self.db.get_list(
            "ro_tasks",
            q_filter={"tasks.status": "DONE", "to_check_at.lt": 0},
        )

        self.logger.debug("Updating tasks to change the to_check_at status")
        for candidate in candidates:
            self.db.set_one(
                "ro_tasks",
                q_filter={"_id": candidate["_id"]},
                update_dict=changes,
                fail_on_empty=True,
            )
    except Exception as e:
        self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
2245
2246 def _get_next_refresh(self, ro_task: dict, next_refresh: float):
2247 """Decide the next_refresh according to vim type and refresh config period.
2248 Args:
2249 ro_task (dict): ro_task details
2250 next_refresh (float): next refresh time as epoch format
2251
2252 Returns:
2253 next_refresh (float) -1 if vm updates are disabled or vim type is openstack.
2254 """
2255 target_vim = ro_task["target_id"]
2256 vim_type = self.db_vims[target_vim]["vim_type"]
2257 if self.refresh_config.active == -1 or vim_type == "openstack":
2258 next_refresh = -1
2259 else:
2260 next_refresh += self.refresh_config.active
2261 return next_refresh
2262
def _process_pending_tasks(self, ro_task):
    """
    Process the tasks of one ro_task and store the result in the "ro_tasks" table.

    The tasks are visited in three passes, one per action, in the order DELETE, CREATE, EXEC.
    For every eligible task the matching handler is called (self._delete_task, or the
    "new" / "refresh" / "exec" method of self.item2class[task["item"]]), the changes are
    accumulated in one update dictionary and the target record is updated with
    self._update_target. Finally the ro_task is unlocked and the accumulated update is written
    with self.db.set_one.

    No exception is propagated: an error while processing one task sets that task to FAILED,
    any other error is only logged.

    :param ro_task: ro_task record (the one returned by self._get_db_task(), see run). Its
        "vim_info" content is modified in place
    :return: None
    """
    ro_task_id = ro_task["_id"]
    now = time.time()
    # one day
    next_check_at = now + (24 * 60 * 60)
    db_ro_task_update = {}

    def _update_refresh(new_status):
        # compute next_refresh for the task currently iterated ("task" of the enclosing loop),
        # store it at ro_task["vim_info"] and at the pending database update, and bring
        # next_check_at forward if the refresh is due sooner
        nonlocal task
        nonlocal next_check_at
        nonlocal db_ro_task_update
        nonlocal ro_task

        next_refresh = time.time()

        if task["item"] in ("image", "flavor"):
            next_refresh += self.refresh_config.image
        elif new_status == "BUILD":
            next_refresh += self.refresh_config.build
        elif new_status == "DONE":
            # can be -1, see _get_next_refresh
            next_refresh = self._get_next_refresh(ro_task, next_refresh)
        else:
            next_refresh += self.refresh_config.error

        next_check_at = min(next_check_at, next_refresh)
        db_ro_task_update["vim_info.refresh_at"] = next_refresh
        ro_task["vim_info"]["refresh_at"] = next_refresh

    try:
        """
        # Log RO tasks only when loglevel is DEBUG
        if self.logger.getEffectiveLevel() == logging.DEBUG:
            self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
        """
        # Check if vim status refresh is enabled again
        self.update_vm_refresh(ro_task)
        # 0: get task_status_create
        # lock_object is created lazily, just before the first handler is called
        lock_object = None
        # status of the first CREATE task found in BUILD or DONE, if any. A SCHEDULED CREATE task
        # copies this status instead of creating the element again
        task_status_create = None
        task_create = next(
            (
                t
                for t in ro_task["tasks"]
                if t
                and t["action"] == "CREATE"
                and t["status"] in ("BUILD", "DONE")
            ),
            None,
        )

        if task_create:
            task_status_create = task_create["status"]

        # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
        for task_action in ("DELETE", "CREATE", "EXEC"):
            # NOTE(review): these two are reset once per action pass, not once per task, so a value
            # set while handling one task is still visible when handling the next task of the same
            # action - confirm this is intended
            db_vim_update = None
            new_status = None

            for task_index, task in enumerate(ro_task["tasks"]):
                if not task:
                    continue  # task deleted

                task_depends = {}
                target_update = None

                # skip the task if: it is a DELETE/EXEC not in SCHEDULED or BUILD; or it does not
                # belong to the current action pass; or it is a CREATE already FINISHED or SUPERSEDED
                if (
                    (
                        task_action in ("DELETE", "EXEC")
                        and task["status"] not in ("SCHEDULED", "BUILD")
                    )
                    or task["action"] != task_action
                    or (
                        task_action == "CREATE"
                        and task["status"] in ("FINISHED", "SUPERSEDED")
                    )
                ):
                    continue

                task_path = "tasks.{}.status".format(task_index)
                try:
                    db_vim_info_update = None
                    dependency_ro_task = {}

                    if task["status"] == "SCHEDULED":
                        # check if tasks that this depends on have been completed
                        dependency_not_completed = False

                        for dependency_task_id in task.get("depends_on") or ():
                            (
                                dependency_ro_task,
                                dependency_task_index,
                            ) = self._get_dependency(
                                dependency_task_id, target_id=ro_task["target_id"]
                            )
                            dependency_task = dependency_ro_task["tasks"][
                                dependency_task_index
                            ]
                            self.logger.debug(
                                "dependency_ro_task={} dependency_task_index={}".format(
                                    dependency_ro_task, dependency_task_index
                                )
                            )

                            if dependency_task["status"] == "SCHEDULED":
                                dependency_not_completed = True
                                next_check_at = min(
                                    next_check_at, dependency_ro_task["to_check_at"]
                                )
                                # must allow dependent task to be processed first
                                # to do this set time after last_task_processed
                                next_check_at = max(
                                    self.time_last_task_processed, next_check_at
                                )
                                break
                            elif dependency_task["status"] == "FAILED":
                                error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
                                    task["action"],
                                    task["item"],
                                    dependency_task["action"],
                                    dependency_task["item"],
                                    dependency_task_id,
                                    dependency_ro_task["vim_info"].get(
                                        "vim_message"
                                    ),
                                )
                                self.logger.error(
                                    "task={} {}".format(task["task_id"], error_text)
                                )
                                raise NsWorkerException(error_text)

                            # the vim_id of the dependency is made available to the handler under
                            # both the plain dependency id and the "TASK-" prefixed one
                            task_depends[dependency_task_id] = dependency_ro_task[
                                "vim_info"
                            ]["vim_id"]
                            task_depends[
                                "TASK-{}".format(dependency_task_id)
                            ] = dependency_ro_task["vim_info"]["vim_id"]

                        if dependency_not_completed:
                            self.logger.warning(
                                "DEPENDENCY NOT COMPLETED {}".format(
                                    dependency_ro_task["vim_info"]["vim_id"]
                                )
                            )
                            # TODO set at vim_info.vim_details that it is waiting
                            continue

                    # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
                    # the task of renew this locking. It will update database locked_at periodically
                    if not lock_object:
                        lock_object = LockRenew.add_lock_object(
                            "ro_tasks", ro_task, self
                        )
                    if task["action"] == "DELETE":
                        (
                            new_status,
                            db_vim_info_update,
                        ) = self._delete_task(
                            ro_task, task_index, task_depends, db_ro_task_update
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if new_status in ("FINISHED", "SUPERSEDED"):
                            target_update = "DELETE"
                    elif task["action"] == "EXEC":
                        (
                            new_status,
                            db_vim_info_update,
                            db_task_update,
                        ) = self.item2class[task["item"]].exec(
                            ro_task, task_index, task_depends
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if db_task_update:
                            # load into database the modified db_task_update "retries" and "next_retry"
                            if db_task_update.get("retries"):
                                db_ro_task_update[
                                    "tasks.{}.retries".format(task_index)
                                ] = db_task_update["retries"]

                            # overwrites next_check_at: "next_retry" seconds from now, 60 by default
                            next_check_at = time.time() + db_task_update.get(
                                "next_retry", 60
                            )
                        target_update = None
                    elif task["action"] == "CREATE":
                        if task["status"] == "SCHEDULED":
                            if task_status_create:
                                # another CREATE task is already BUILD or DONE (or got a status in
                                # this pass): reuse its status and copy the vim_info to the target
                                new_status = task_status_create
                                target_update = "COPY_VIM_INFO"
                            else:
                                new_status, db_vim_info_update = self.item2class[
                                    task["item"]
                                ].new(ro_task, task_index, task_depends)
                                _update_refresh(new_status)
                        else:
                            refresh_at = ro_task["vim_info"]["refresh_at"]
                            if refresh_at and refresh_at != -1 and now > refresh_at:
                                (
                                    new_status,
                                    db_vim_info_update,
                                ) = self.item2class[
                                    task["item"]
                                ].refresh(ro_task)
                                _update_refresh(new_status)
                            else:
                                # The refresh is updated to avoid set the value of "refresh_at" to
                                # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
                                # because it can happen that in this case the task is never processed
                                _update_refresh(task["status"])

                except Exception as e:
                    # any error while processing the task (dependency, lock or handler) fails it
                    new_status = "FAILED"
                    db_vim_info_update = {
                        "vim_status": "VIM_ERROR",
                        "vim_message": str(e),
                    }

                    if not isinstance(
                        e, (NsWorkerException, vimconn.VimConnException)
                    ):
                        self.logger.error(
                            "Unexpected exception at _delete_task task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

                # second stage: accumulate the changes for "ro_tasks" and update the target record
                try:
                    if db_vim_info_update:
                        db_vim_update = db_vim_info_update.copy()
                        db_ro_task_update.update(
                            {
                                "vim_info." + k: v
                                for k, v in db_vim_info_update.items()
                            }
                        )
                        ro_task["vim_info"].update(db_vim_info_update)

                    if new_status:
                        if task_action == "CREATE":
                            task_status_create = new_status
                        db_ro_task_update[task_path] = new_status

                    if target_update or db_vim_update:
                        if target_update == "DELETE":
                            self._update_target(task, None)
                        elif target_update == "COPY_VIM_INFO":
                            self._update_target(task, ro_task["vim_info"])
                        else:
                            self._update_target(task, db_vim_update)

                except Exception as e:
                    if (
                        isinstance(e, DbException)
                        and e.http_code == HTTPStatus.NOT_FOUND
                    ):
                        # if the vnfrs or nsrs has been removed from database, this task must be removed
                        self.logger.debug(
                            "marking to delete task={}".format(task["task_id"])
                        )
                        self.tasks_to_delete.append(task)
                    else:
                        self.logger.error(
                            "Unexpected exception at _update_target task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

        locked_at = ro_task["locked_at"]

        if lock_object:
            locked_at = [
                lock_object["locked_at"],
                lock_object["locked_at"] + self.task_locked_time,
            ]
            # locked_at contains two times to avoid race condition. In case the lock has been renewed, it will
            # contain exactly locked_at + self.task_locked_time
            LockRenew.remove_lock_object(lock_object)

        q_filter = {
            "_id": ro_task["_id"],
            "to_check_at": ro_task["to_check_at"],
            "locked_at": locked_at,
        }
        # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
        # outside this task (by ro_nbi) do not update it
        db_ro_task_update["locked_by"] = None
        # locked_at converted to int only for debugging. When it is not decimals it means it has been unlocked
        db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
        db_ro_task_update["modified_at"] = now
        db_ro_task_update["to_check_at"] = next_check_at

        """
        # Log RO tasks only when loglevel is DEBUG
        if self.logger.getEffectiveLevel() == logging.DEBUG:
            db_ro_task_update_log = db_ro_task_update.copy()
            db_ro_task_update_log["_id"] = q_filter["_id"]
            self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
        """

        if not self.db.set_one(
            "ro_tasks",
            update_dict=db_ro_task_update,
            q_filter=q_filter,
            fail_on_empty=False,
        ):
            # nothing matched the filter: retry without touching nor filtering by to_check_at
            del db_ro_task_update["to_check_at"]
            del q_filter["to_check_at"]
            """
            # Log RO tasks only when loglevel is DEBUG
            if self.logger.getEffectiveLevel() == logging.DEBUG:
                self._log_ro_task(
                    None,
                    db_ro_task_update_log,
                    None,
                    "TASK_WF",
                    "SET_TASK " + str(q_filter),
                )
            """
            self.db.set_one(
                "ro_tasks",
                q_filter=q_filter,
                update_dict=db_ro_task_update,
                fail_on_empty=True,
            )
    except DbException as e:
        self.logger.error(
            "ro_task={} Error updating database {}".format(ro_task_id, e)
        )
    except Exception as e:
        self.logger.error(
            "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
        )
2604
2605 def _update_target(self, task, ro_vim_item_update):
2606 table, _, temp = task["target_record"].partition(":")
2607 _id, _, path_vim_status = temp.partition(":")
2608 path_item = path_vim_status[: path_vim_status.rfind(".")]
2609 path_item = path_item[: path_item.rfind(".")]
2610 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2611 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2612
2613 if ro_vim_item_update:
2614 update_dict = {
2615 path_vim_status + "." + k: v
2616 for k, v in ro_vim_item_update.items()
2617 if k
2618 in (
2619 "vim_id",
2620 "vim_details",
2621 "vim_message",
2622 "vim_name",
2623 "vim_status",
2624 "interfaces",
2625 "interfaces_backup",
2626 )
2627 }
2628
2629 if path_vim_status.startswith("vdur."):
2630 # for backward compatibility, add vdur.name apart from vdur.vim_name
2631 if ro_vim_item_update.get("vim_name"):
2632 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2633
2634 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2635 if ro_vim_item_update.get("vim_id"):
2636 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2637
2638 # update general status
2639 if ro_vim_item_update.get("vim_status"):
2640 update_dict[path_item + ".status"] = ro_vim_item_update[
2641 "vim_status"
2642 ]
2643
2644 if ro_vim_item_update.get("interfaces"):
2645 path_interfaces = path_item + ".interfaces"
2646
2647 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2648 if iface:
2649 update_dict.update(
2650 {
2651 path_interfaces + ".{}.".format(i) + k: v
2652 for k, v in iface.items()
2653 if k in ("vlan", "compute_node", "pci")
2654 }
2655 )
2656
2657 # put ip_address and mac_address with ip-address and mac-address
2658 if iface.get("ip_address"):
2659 update_dict[
2660 path_interfaces + ".{}.".format(i) + "ip-address"
2661 ] = iface["ip_address"]
2662
2663 if iface.get("mac_address"):
2664 update_dict[
2665 path_interfaces + ".{}.".format(i) + "mac-address"
2666 ] = iface["mac_address"]
2667
2668 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2669 update_dict["ip-address"] = iface.get("ip_address").split(
2670 ";"
2671 )[0]
2672
2673 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2674 update_dict[path_item + ".ip-address"] = iface.get(
2675 "ip_address"
2676 ).split(";")[0]
2677
2678 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2679
2680 # If interfaces exists, it backups VDU interfaces in the DB for healing operations
2681 if ro_vim_item_update.get("interfaces"):
2682 search_key = path_vim_status + ".interfaces"
2683 if update_dict.get(search_key):
2684 interfaces_backup_update = {
2685 path_vim_status + ".interfaces_backup": update_dict[search_key]
2686 }
2687
2688 self.db.set_one(
2689 table,
2690 q_filter={"_id": _id},
2691 update_dict=interfaces_backup_update,
2692 )
2693
2694 else:
2695 update_dict = {path_item + ".status": "DELETED"}
2696 self.db.set_one(
2697 table,
2698 q_filter={"_id": _id},
2699 update_dict=update_dict,
2700 unset={path_vim_status: None},
2701 )
2702
2703 def _process_delete_db_tasks(self):
2704 """
2705 Delete task from database because vnfrs or nsrs or both have been deleted
2706 :return: None. Uses and modify self.tasks_to_delete
2707 """
2708 while self.tasks_to_delete:
2709 task = self.tasks_to_delete[0]
2710 vnfrs_deleted = None
2711 nsr_id = task["nsr_id"]
2712
2713 if task["target_record"].startswith("vnfrs:"):
2714 # check if nsrs is present
2715 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2716 vnfrs_deleted = task["target_record"].split(":")[1]
2717
2718 try:
2719 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2720 except Exception as e:
2721 self.logger.error(
2722 "Error deleting task={}: {}".format(task["task_id"], e)
2723 )
2724 self.tasks_to_delete.pop(0)
2725
2726 @staticmethod
2727 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2728 """
2729 Static method because it is called from osm_ng_ro.ns
2730 :param db: instance of database to use
2731 :param nsr_id: affected nsrs id
2732 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2733 :return: None, exception is fails
2734 """
2735 retries = 5
2736 for retry in range(retries):
2737 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2738 now = time.time()
2739 conflict = False
2740
2741 for ro_task in ro_tasks:
2742 db_update = {}
2743 to_delete_ro_task = True
2744
2745 for index, task in enumerate(ro_task["tasks"]):
2746 if not task:
2747 pass
2748 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2749 vnfrs_deleted
2750 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2751 ):
2752 db_update["tasks.{}".format(index)] = None
2753 else:
2754 # used by other nsr, ro_task cannot be deleted
2755 to_delete_ro_task = False
2756
2757 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2758 if to_delete_ro_task:
2759 if not db.del_one(
2760 "ro_tasks",
2761 q_filter={
2762 "_id": ro_task["_id"],
2763 "modified_at": ro_task["modified_at"],
2764 },
2765 fail_on_empty=False,
2766 ):
2767 conflict = True
2768 elif db_update:
2769 db_update["modified_at"] = now
2770 if not db.set_one(
2771 "ro_tasks",
2772 q_filter={
2773 "_id": ro_task["_id"],
2774 "modified_at": ro_task["modified_at"],
2775 },
2776 update_dict=db_update,
2777 fail_on_empty=False,
2778 ):
2779 conflict = True
2780 if not conflict:
2781 return
2782 else:
2783 raise NsWorkerException("Exceeded {} retries".format(retries))
2784
def run(self):
    """
    Main loop of the thread.

    Each iteration first tries to read one order from self.task_queue (without blocking if
    self.vim_targets is not empty, blocking otherwise). A received order is executed and the
    loop restarts; the order "terminate" ends the loop. When no order has been read (empty
    queue, or an error while handling it) the pending database work is done: tasks queued for
    deletion are removed and one ro_task, if available, is processed.
    """
    # load database
    self.logger.info("Starting")
    while True:
        # step 1: get commands from queue
        try:
            if self.vim_targets:
                order = self.task_queue.get(block=False)
            else:
                if not self.idle:
                    self.logger.debug("enters in idle state")
                self.idle = True
                order = self.task_queue.get(block=True)
                self.idle = False

            command = order[0]
            if command == "terminate":
                break

            if command == "load_vim":
                self.logger.info("order to load vim {}".format(order[1]))
                self._load_vim(order[1])
            elif command == "unload_vim":
                self.logger.info("order to unload vim {}".format(order[1]))
                self._unload_vim(order[1])
            elif command == "reload_vim":
                self._reload_vim(order[1])
            elif command == "check_vim":
                self.logger.info("order to check vim {}".format(order[1]))
                self._check_vim(order[1])
            continue
        except queue.Empty:
            # no order waiting, go on with the pending tasks
            pass
        except Exception as e:
            self.logger.critical(
                "Error processing task: {}".format(e), exc_info=True
            )

        # step 2: process pending_tasks, delete not needed tasks
        try:
            if self.tasks_to_delete:
                self._process_delete_db_tasks()

            ro_task = self._get_db_task()
            if ro_task:
                self.logger.debug("Task to process: {}".format(ro_task))
                time.sleep(1)
                self._process_pending_tasks(ro_task)
            else:
                # nothing to do, wait before polling again
                time.sleep(5)
        except Exception as e:
            self.logger.critical(
                "Unexpected exception at run: " + str(e), exc_info=True
            )

    self.logger.info("Finishing")