42a8a5912c2c849499d6ae465d7896424d42f65b
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """
21 This is the thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored in the database, in table ro_tasks.
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target that says where to store the results.
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import makedirs
31 from os import path
32 import queue
33 import threading
34 import time
35 import traceback
36 from typing import Dict
37 from unittest.mock import Mock
38
39 from importlib_metadata import entry_points
40 from osm_common.dbbase import DbException
41 from osm_ng_ro.vim_admin import LockRenew
42 from osm_ro_plugin import sdnconn
43 from osm_ro_plugin import vimconn
44 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
45 from osm_ro_plugin.vim_dummy import VimDummyConnector
46 import yaml
47
48 __author__ = "Alfonso Tierno"
49 __date__ = "$28-Sep-2017 12:07:15$"
50
51
def deep_get(target_dict, *args, **kwargs):
    """
    Follow a path of nested keys inside target_dict and return what is found at the end.
    Example target_dict={a: {b: 5}}; keys (a, b) return 5; both keys (a, b, c) and (f, h) return
    the fallback value
    :param target_dict: dictionary to be read
    :param args: keys to follow, outermost first
    :param kwargs: can only contain default=value, returned when the path cannot be followed
    :return: The wanted value if the whole path exists; the given default (None if not given) otherwise
    """
    node = target_dict

    for step in args:
        # the walk stops as soon as the current level is not a dict or lacks the key
        if isinstance(node, dict) and step in node:
            node = node[step]
        else:
            return kwargs.get("default")

    return node
66
67
class NsWorkerException(Exception):
    """Error raised in this module while processing a task; it is handled together with the
    VIM connector exceptions by the interaction classes."""
70
71
class FailingConnector:
    """Connector replacement whose public VIM and SDN methods always raise an error carrying
    the message given at construction."""

    def __init__(self, error_msg):
        """
        :param error_msg: text placed in every exception raised by the stubbed methods
        """
        self.error_msg = error_msg

        # VIM methods are stubbed first; an SDN method with the same name replaces the VIM stub
        stub_sources = (
            (vimconn.VimConnector, vimconn.VimConnException),
            (sdnconn.SdnConnectorBase, sdnconn.SdnConnectorError),
        )

        for connector_class, error_class in stub_sources:
            for name in dir(connector_class):
                if name.startswith("_"):
                    # private and dunder attributes are left untouched
                    continue

                setattr(self, name, Mock(side_effect=error_class(error_msg)))
87
88
class NsWorkerExceptionNotFound(NsWorkerException):
    """NsWorkerException variant used in this module when a VIM item cannot be found or the
    find parameters of a task are not valid."""
91
92
class VimInteractionBase:
    """Base class to call VIM/SDN for creating, deleting and refreshing networks, VMs, flavors, ...
    The methods implemented here do nothing and report a successful result"""

    def __init__(self, db, my_vims, db_vims, logger):
        """
        :param db: database object, stored as self.db
        :param my_vims: stored as self.my_vims; subclasses index it by ro_task["target_id"]
        :param db_vims: stored as self.db_vims; subclasses index it by ro_task["target_id"]
        :param logger: logger object, stored as self.logger
        """
        self.logger = logger
        self.db = db
        self.db_vims = db_vims
        self.my_vims = my_vims

    def new(self, ro_task, task_index, task_depends):
        """Do nothing; report the item as being built, with no VIM information to update"""
        vim_item_update = {}

        return "BUILD", vim_item_update

    def refresh(self, ro_task):
        """skip calling VIM to get image, flavor status. Assumes ok unless already at VIM_ERROR"""
        already_failed = ro_task["vim_info"]["vim_status"] == "VIM_ERROR"
        task_status = "FAILED" if already_failed else "DONE"

        return task_status, {}

    def delete(self, ro_task, task_index):
        """skip calling VIM to delete image. Assumes ok"""
        vim_item_update = {}

        return "DONE", vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """Do nothing; report done with neither VIM item update nor database task update"""
        result = ("DONE", None, None)

        return result
119
120
class VimInteractionNet(VimInteractionBase):
    """VIM interaction for networks: find or create them, refresh their status and delete them."""

    def new(self, ro_task, task_index, task_depends):
        """
        Find a network at the VIM, or create it, for the task placed at task_index.

        When the task has "find_params" the network is searched using, in this order: the given
        "filter_dict"; for a "mgmt" network, the management network id or name configured at the
        VIM account, falling back to the name given in find_params. When the task has no
        "find_params" the network is created from the task "params".
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple ("BUILD", vim item update) if ok; ("FAILED", vim item update) on
            VimConnException or NsWorkerException
        """
        vim_net_id = None
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        mgmtnet = False
        mgmtnet_defined_in_vim = False

        try:
            # FIND
            if task.get("find_params"):
                # if management, get configuration of VIM
                if task["find_params"].get("filter_dict"):
                    vim_filter = task["find_params"]["filter_dict"]
                # management network
                elif task["find_params"].get("mgmt"):
                    mgmtnet = True
                    if deep_get(
                        self.db_vims[ro_task["target_id"]],
                        "config",
                        "management_network_id",
                    ):
                        mgmtnet_defined_in_vim = True
                        vim_filter = {
                            "id": self.db_vims[ro_task["target_id"]]["config"][
                                "management_network_id"
                            ]
                        }
                    elif deep_get(
                        self.db_vims[ro_task["target_id"]],
                        "config",
                        "management_network_name",
                    ):
                        mgmtnet_defined_in_vim = True
                        vim_filter = {
                            "name": self.db_vims[ro_task["target_id"]]["config"][
                                "management_network_name"
                            ]
                        }
                    else:
                        # nothing configured at the VIM account: search by the descriptor name
                        vim_filter = {"name": task["find_params"]["name"]}
                else:
                    raise NsWorkerExceptionNotFound(
                        "Invalid find_params for new_net {}".format(task["find_params"])
                    )

                vim_nets = target_vim.get_network_list(vim_filter)
                if not vim_nets and not task.get("params"):
                    # If there is mgmt-network in the descriptor,
                    # there is no mapping of that network to a VIM network in the descriptor,
                    # also there is no mapping in the "--config" parameter or at VIM creation;
                    # that mgmt-network will be created.
                    if mgmtnet and not mgmtnet_defined_in_vim:
                        net_name = (
                            vim_filter.get("name")
                            if vim_filter.get("name")
                            else vim_filter.get("id")[:16]
                        )
                        vim_net_id, created_items = target_vim.new_network(
                            net_name, None
                        )
                        self.logger.debug(
                            "Created mgmt network vim_net_id: {}".format(vim_net_id)
                        )
                        created = True
                    else:
                        raise NsWorkerExceptionNotFound(
                            "Network not found with this criteria: '{}'".format(
                                task.get("find_params")
                            )
                        )
                elif len(vim_nets) > 1:
                    raise NsWorkerException(
                        "More than one network found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )

                if vim_nets:
                    # exactly one network matched the filter
                    vim_net_id = vim_nets[0]["id"]
            else:
                # CREATE
                params = task["params"]
                vim_net_id, created_items = target_vim.new_network(**params)
                created = True

            ro_vim_item_update = {
                "vim_id": vim_net_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, ro_task["target_id"], vim_net_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def refresh(self, ro_task):
        """
        Call VIM to get network status
        :param ro_task: ro_task content; "_id", "target_id" and "vim_info" are read
        :return: tuple (task_status, vim item update), where task_status is "DONE", "BUILD" or
            "FAILED" and the update contains only the vim_info fields that have changed
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]
        net_to_refresh_list = [vim_id]

        try:
            vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        # collect only the fields that differ from what is stored at ro_task["vim_info"]
        ro_vim_item_update = {}
        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the network from the VIM, if there is a vim_id or there are created items.
        :param ro_task: ro_task content; "tasks", "_id", "target_id" and "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: tuple ("DONE", vim item update) if deleted or not found at the VIM;
            ("FAILED", vim item update) on any other VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        net_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if net_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_network(
                    net_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            # not present at the VIM: handled as a successful deletion
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id,
                ro_task["target_id"],
                net_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
338
339
class VimInteractionVdu(VimInteractionBase):
    """VIM interaction for VDUs (VMs): create, delete, refresh status and inject an ssh key."""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """
        Create a VM at the VIM for the task placed at task_index.

        References of the form "TASK-..." found at the networks, image, flavor and affinity
        groups of the task params are replaced by the values given at task_depends.
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: dictionary mapping "TASK-..." references to the resolved values
        :return: tuple ("BUILD", vim item update) if ok; ("FAILED", vim item update) on
            VimConnException or NsWorkerException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]
        try:
            # set before calling the VIM, so a failure is also reported with created=True
            created = True
            params = task["params"]
            # work on a copy, so the task params keep their "TASK-..." references
            params_copy = deepcopy(params)
            net_list = params_copy["net_list"]

            for net in net_list:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            affinity_group_list = params_copy["affinity_group_list"]
            for affinity_group in affinity_group_list:
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        # same wording as the network dependency error above
                        raise NsWorkerException(
                            "Cannot create VM because depends on an affinity group not created "
                            "or found for {}".format(
                                affinity_group["affinity_group_id"]
                            )
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id
            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            # each net_list entry is expected to carry "vim_id" after new_vminstance returns
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            # add to created items previous_created_volumes (healing)
            if task.get("previous_created_volumes"):
                for k, v in task["previous_created_volumes"].items():
                    created_items[k] = v

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
                "interfaces_backup": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the VM from the VIM, if there is a vim_id or there are created items.
        :param ro_task: ro_task content; "tasks", "_id", "target_id" and "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: tuple ("DONE", vim item update) if deleted or not found at the VIM;
            ("FAILED", vim item update) on any other VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            self.logger.debug(
                "delete_vminstance: vm_vim_id={} created_items={}".format(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
            )
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id,
                    ro_task["vim_info"]["created_items"],
                    ro_task["vim_info"].get("volumes_to_hold", []),
                )
        except vimconn.VimConnNotFoundException:
            # not present at the VIM: handled as a successful deletion
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def refresh(self, ro_task):
        """
        Call VIM to get vm status
        :param ro_task: ro_task content; "_id", "target_id", "tasks" and "vim_info" are read
        :return: tuple (task_status, vim item update), where task_status is "DONE", "BUILD" or
            "FAILED" and the update contains only the vim_info fields that have changed;
            (None, None) when the ro_task has no vim_id
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception as vim_info_error:
                # best effort: the name is simply not updated when vim_info cannot be parsed
                self.logger.exception(
                    f"{vim_info_error} occured while getting the vim_info from yaml"
                )
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            # keep the order of interfaces_vim_ids; an interface not reported by the VIM is None
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                vim_interfaces.append(iface)

        # NOTE(review): next() has no default here, so StopIteration propagates to the caller
        # when the ro_task has no CREATE task pending to finish
        task_create = next(
            t
            for t in ro_task["tasks"]
            if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
        )
        if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
            vim_interfaces[task_create["mgmt_vnf_interface"]][
                "mgmt_vnf_interface"
            ] = True

        # the VDU management interface defaults to the VNF management one, and then to index 0
        mgmt_vdu_iface = task_create.get(
            "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
        )
        if vim_interfaces:
            vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True

        # collect only the fields that differ from what is stored at ro_task["vim_info"]
        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """
        Inject an ssh key into the VM through the VIM connector, retrying on failure.

        The private key of the task params is decrypted with self.db.decrypt and passed as
        "ro_key"; "ip_address" is passed as "ip_addr".
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (task_status, vim item update, database task update):
            ("DONE", None, {"retries": 0}) if ok;
            ("BUILD", None, {"retries": n, "next_retry": seconds}) while retries remain;
            ("FAILED", {"vim_message": error}, {"retries": 0}) when retries are exhausted
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params = task["params"]
            params_copy = deepcopy(params)
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                None,
                db_task_update,
            )  # params_copy["key"]
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            self.logger.debug(traceback.format_exc())
            if retries < self.max_retries_inject_ssh_key:
                # not a final failure yet: ask for a new attempt after the configured time
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_message": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update
633
634
class VimInteractionImage(VimInteractionBase):
    """VIM interaction for images: they are only looked up at the VIM, never created."""

    def new(self, ro_task, task_index, task_depends):
        """
        Look for the image at the VIM using the task "find_params", if any.
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple ("DONE", vim item update) if ok; ("FAILED", vim item update) when the
            search gives none or several images, or on VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # FIND
            vim_image_id = ""
            find_params = task.get("find_params")

            if find_params:
                found_images = target_vim.get_image_list(**find_params)

                if not found_images:
                    raise NsWorkerExceptionNotFound(
                        "Image not found with this criteria: '{}'".format(find_params)
                    )

                if len(found_images) > 1:
                    raise NsWorkerException(
                        "More than one image found with this criteria: '{}'".format(
                            find_params
                        )
                    )

                vim_image_id = found_images[0]["id"]

            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, ro_task["target_id"], vim_image_id, created
                )
            )

            return "DONE", {
                "vim_id": vim_image_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
        except (NsWorkerException, vimconn.VimConnException) as e:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }
690
691
class VimInteractionSharedVolume(VimInteractionBase):
    """VIM interaction for shared volumes: create and delete them."""

    def delete(self, ro_task, task_index):
        """
        Delete the shared volume from the VIM, unless it is marked to be kept.
        :param ro_task: ro_task content; "tasks", "_id", "target_id" and "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: tuple ("DONE", vim item update) if deleted, kept or not found at the VIM;
            ("FAILED", vim item update) on any other VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        shared_volume_vim_id = ro_task["vim_info"]["vim_id"]
        created_items = ro_task["vim_info"]["created_items"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        # created_items may be empty, or may have no entry for this volume id; both cases mean
        # that the volume is not marked to be kept
        volume_info = (created_items or {}).get(shared_volume_vim_id) or {}
        if volume_info.get("keep"):
            # the volume stays at the VIM; only the task is reported as done
            ro_vim_item_update_ok = {
                "vim_status": "ACTIVE",
                "created": False,
                "vim_message": None,
            }
            return "DONE", ro_vim_item_update_ok

        try:
            if shared_volume_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_shared_volumes(shared_volume_vim_id)
        except vimconn.VimConnNotFoundException:
            # not present at the VIM: handled as a successful deletion
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-shared-volume={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], shared_volume_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-shared-volume={} {}".format(
                task_id,
                ro_task["target_id"],
                shared_volume_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """
        Create a shared volume at the VIM from the task "params", if any.
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple ("DONE", vim item update) if ok; ("FAILED", vim item update) on
            VimConnException or NsWorkerException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            shared_volume_vim_id = None
            shared_volume_data = None

            if task.get("params"):
                shared_volume_data = task["params"]

            if shared_volume_data:
                self.logger.info(
                    f"Creating the new shared_volume for {shared_volume_data}\n"
                )
                (
                    shared_volume_name,
                    shared_volume_vim_id,
                ) = target_vim.new_shared_volumes(shared_volume_data)
                created = True
                # the "keep" flag is stored with the item, to be checked at deletion time
                created_items[shared_volume_vim_id] = {
                    "name": shared_volume_name,
                    "keep": shared_volume_data.get("keep"),
                }

            ro_vim_item_update = {
                "vim_id": shared_volume_vim_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-shared-volume={} created={}".format(
                    task_id, ro_task["target_id"], shared_volume_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-shared-volume:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
796
797
class VimInteractionFlavor(VimInteractionBase):
    """VIM interaction for flavors: find or create them, and delete them."""

    def delete(self, ro_task, task_index):
        """
        Delete the flavor from the VIM, if the ro_task has a vim_id.
        :param ro_task: ro_task content; "tasks", "_id", "target_id" and "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: tuple ("DONE", vim item update) if deleted or not found at the VIM;
            ("FAILED", vim item update) on any other VimConnException
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        flavor_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if flavor_vim_id:
                self.my_vims[ro_task["target_id"]].delete_flavor(flavor_vim_id)
        except vimconn.VimConnNotFoundException:
            # not present at the VIM: handled as a successful deletion
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id,
                ro_task["target_id"],
                flavor_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """
        Obtain a flavor at the VIM: take the given vim_flavor_id, or look for one matching the
        given flavor_data; if none is obtained and the task has "params", create a new one.
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple ("DONE", vim item update) if ok; ("FAILED", vim item update) on
            VimConnException or NsWorkerException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # FIND
            vim_flavor_id = None
            find_params = task.get("find_params", {})

            if find_params.get("vim_flavor_id"):
                vim_flavor_id = find_params["vim_flavor_id"]
            elif find_params.get("flavor_data"):
                try:
                    vim_flavor_id = target_vim.get_flavor_id_from_data(
                        find_params["flavor_data"]
                    )
                except vimconn.VimConnNotFoundException as flavor_not_found_msg:
                    # not found is not an error here: the flavor may be created below
                    self.logger.warning(
                        f"VimConnNotFoundException occured: {flavor_not_found_msg}"
                    )

            if task.get("params") and not vim_flavor_id:
                # CREATE
                vim_flavor_id = target_vim.new_flavor(task["params"]["flavor_data"])
                created = True

            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, ro_task["target_id"], vim_flavor_id, created
                )
            )

            return "DONE", {
                "vim_id": vim_flavor_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }
893
894
class VimInteractionAffinityGroup(VimInteractionBase):
    """VIM interaction for affinity or anti-affinity groups: find or create, and delete them."""

    def delete(self, ro_task, task_index):
        """
        Delete the affinity group from the VIM, if the ro_task has a vim_id.
        :param ro_task: ro_task content; "tasks", "_id", "target_id" and "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: tuple ("DONE", vim item update) if deleted or not found at the VIM;
            ("FAILED", vim item update) on any other VimConnException
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if affinity_group_vim_id:
                self.my_vims[ro_task["target_id"]].delete_affinity_group(
                    affinity_group_vim_id
                )
        except vimconn.VimConnNotFoundException:
            # not present at the VIM: handled as a successful deletion
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id,
                ro_task["target_id"],
                affinity_group_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """
        Obtain an affinity group at the VIM: use the one whose id is given at
        "vim-affinity-group-id" if the VIM has it; otherwise create a new one from the
        "affinity_group_data" of the task params.
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple ("DONE", vim item update) if ok; ("FAILED", vim item update) on
            VimConnException or NsWorkerException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            affinity_group_vim_id = None
            param_affinity_group_id = ""
            task_params = task.get("params")
            affinity_group_data = (
                task_params.get("affinity_group_data") if task_params else None
            )

            if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
                try:
                    param_affinity_group_id = affinity_group_data.get(
                        "vim-affinity-group-id"
                    )
                    existing_group = target_vim.get_affinity_group(
                        param_affinity_group_id
                    )
                    affinity_group_vim_id = existing_group.get("id")
                except vimconn.VimConnNotFoundException:
                    # the given id is not at the VIM; a new group is created below
                    self.logger.error(
                        "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
                        "could not be found at VIM. Creating a new one.".format(
                            task_id, ro_task["target_id"], param_affinity_group_id
                        )
                    )

            if affinity_group_data and not affinity_group_vim_id:
                affinity_group_vim_id = target_vim.new_affinity_group(
                    affinity_group_data
                )
                created = True

            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], affinity_group_vim_id, created
                )
            )

            return "DONE", {
                "vim_id": affinity_group_vim_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group:"
                " {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }
1001
1002
class VimInteractionUpdateVdu(VimInteractionBase):
    """VIM interaction that runs an action over an existing VM."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Call action_vminstance of the VIM connector with the "vim_vm_id" and "action" found at
        the task params; nothing is called when the task has no params.
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple ("DONE", vim item update, {"retries": 0}) if ok;
            ("FAILED", vim item update, {"retries": 0}) on VimConnException or NsWorkerException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            vim_vm_id = ""
            task_params = task.get("params")

            if task_params:
                vim_vm_id = task_params.get("vim_vm_id")
                action = task_params.get("action")
                # the action is used both as key and as value of the context
                target_vim.action_vminstance(vim_vm_id, {action: action})

            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                {
                    "vim_id": vim_vm_id,
                    "vim_status": "ACTIVE",
                    "created": created,
                    "created_items": created_items,
                    "vim_details": None,
                    "vim_message": None,
                },
                db_task_update,
            )
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            failure_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", failure_update, db_task_update
1044
1045
class VimInteractionSdnNet(VimInteractionBase):
    """Creates, updates, refreshes and deletes SDN connectivity services for ro_tasks."""

    @staticmethod
    def _match_pci(port_pci, mapping):
        """
        Check if port_pci matches with mapping.
        The mapping can have brackets to indicate that several chars are accepted. e.g
        pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
        :param port_pci: text
        :param mapping: text, can contain brackets to indicate several chars are available
        :return: True if matches, False otherwise. A port_pci shorter than the pattern does not match
        """
        if not port_pci or not mapping:
            return False
        if port_pci == mapping:
            return True

        mapping_index = 0
        pci_index = 0
        while True:
            bracket_start = mapping.find("[", mapping_index)

            if bracket_start == -1:
                break

            bracket_end = mapping.find("]", bracket_start)
            if bracket_end == -1:
                # unbalanced bracket: the rest of the mapping is compared literally below
                break

            # literal text located before the bracket group must match exactly
            length = bracket_start - mapping_index
            if (
                length
                and port_pci[pci_index : pci_index + length]
                != mapping[mapping_index:bracket_start]
            ):
                return False

            # the bracket group consumes exactly one char of port_pci; if port_pci has
            # no char left at that position it cannot match (this used to raise IndexError)
            char_index = pci_index + length
            if char_index >= len(port_pci):
                return False

            if port_pci[char_index] not in mapping[bracket_start + 1 : bracket_end]:
                return False

            pci_index += length + 1
            mapping_index = bracket_end + 1

        # whatever remains after the last bracket group must be identical
        if port_pci[pci_index:] != mapping[mapping_index:]:
            return False

        return True

    def _get_interfaces(self, vlds_to_connect, vim_account_id):
        """
        Collect the SR-IOV and PCI-PASSTHROUGH interfaces attached to the given vlds.
        :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
        :param vim_account_id: used as "vim-account-id" filter when reading the "vnfrs" table
        :return: list of interface dicts (copies). Each one gets an "id" with the format
            vnfrs:<vnfr_id>:vdu.<vdu_index>.interfaces.<iface_index>, and "status" is set to
            "ERROR" when the owner vdur has status "ERROR"
        """
        interfaces = []

        for vld in vlds_to_connect:
            table, _, db_id = vld.partition(":")
            db_id, _, vld = db_id.partition(":")
            _, _, vld_id = vld.partition(".")

            if table == "vnfrs":
                q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
                iface_key = "vnf-vld-id"
            else:  # table == "nsrs"
                q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
                iface_key = "ns-vld-id"

            db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)

            for db_vnfr in db_vnfrs:
                for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
                    for iface_index, interface in enumerate(vdur["interfaces"]):
                        if interface.get(iface_key) == vld_id and interface.get(
                            "type"
                        ) in ("SR-IOV", "PCI-PASSTHROUGH"):
                            # only SR-IOV o PT
                            interface_ = interface.copy()
                            interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
                                db_vnfr["_id"], vdu_index, iface_index
                            )

                            if vdur.get("status") == "ERROR":
                                interface_["status"] = "ERROR"

                            interfaces.append(interface_)

        return interfaces

    def refresh(self, ro_task):
        """
        Re-run new() over the first CREATE task of ro_task that is not FINISHED.
        :param ro_task: ro_task dictionary
        :return: whatever new() returns
        :raise StopIteration: if ro_task has no CREATE task pending to finish
        """
        # look for task create
        task_create_index, _ = next(
            i_t
            for i_t in enumerate(ro_task["tasks"])
            if i_t[1]
            and i_t[1]["action"] == "CREATE"
            and i_t[1]["status"] != "FINISHED"
        )

        return self.new(ro_task, task_create_index, None)

    def new(self, ro_task, task_index, task_depends):
        """
        Create or update the connectivity service with the ports currently known, or
        get its status when the set of connected ports has not changed.
        :param ro_task: ro_task dictionary; "vim_info" holds the state of previous iterations
        :param task_index: index at ro_task["tasks"] of the task to process
        :param task_depends: not used by this method
        :return: tuple (status, ro_vim_item_update). On any exception it is logged and
            ("FAILED", {"vim_status": "VIM_ERROR", ...}) is returned
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]

        sdn_net_id = ro_task["vim_info"]["vim_id"]

        created_items = ro_task["vim_info"].get("created_items")
        connected_ports = ro_task["vim_info"].get("connected_ports", [])
        new_connected_ports = []
        last_update = ro_task["vim_info"].get("last_update", 0)
        sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
        error_list = []
        created = ro_task["vim_info"].get("created", False)

        try:
            # CREATE
            db_vim = {}
            params = task["params"]
            vlds_to_connect = params.get("vlds", [])
            associated_vim = params.get("target_vim")
            # external additional ports
            additional_ports = params.get("sdn-ports") or ()
            _, _, vim_account_id = (
                (None, None, None)
                if associated_vim is None
                else associated_vim.partition(":")
            )

            if associated_vim:
                # get associated VIM
                if associated_vim not in self.db_vims:
                    self.db_vims[associated_vim] = self.db.get_one(
                        "vim_accounts", {"_id": vim_account_id}
                    )

                db_vim = self.db_vims[associated_vim]

            # look for ports to connect
            ports = self._get_interfaces(vlds_to_connect, vim_account_id)
            # print(ports)

            sdn_ports = []
            pending_ports = error_ports = 0
            vlan_used = None
            sdn_need_update = False

            for port in ports:
                vlan_used = port.get("vlan") or vlan_used

                # TODO. Do not connect if already done
                if not port.get("compute_node") or not port.get("pci"):
                    # location still unknown: count it as failed or as pending
                    if port.get("status") == "ERROR":
                        error_ports += 1
                    else:
                        pending_ports += 1
                    continue

                pmap = None
                compute_node_mappings = next(
                    (
                        c
                        for c in db_vim["config"].get("sdn-port-mapping", ())
                        if c and c["compute_node"] == port["compute_node"]
                    ),
                    None,
                )

                if compute_node_mappings:
                    # process port_mapping pci of type 0000:af:1[01].[1357]
                    pmap = next(
                        (
                            p
                            for p in compute_node_mappings["ports"]
                            if self._match_pci(port["pci"], p.get("pci"))
                        ),
                        None,
                    )

                if not pmap:
                    if not db_vim["config"].get("mapping_not_needed"):
                        error_list.append(
                            "Port mapping not found for compute_node={} pci={}".format(
                                port["compute_node"], port["pci"]
                            )
                        )
                        continue

                    pmap = {}

                service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
                new_port = {
                    "service_endpoint_id": pmap.get("service_endpoint_id")
                    or service_endpoint_id,
                    "service_endpoint_encapsulation_type": "dot1q"
                    if port["type"] == "SR-IOV"
                    else None,
                    "service_endpoint_encapsulation_info": {
                        "vlan": port.get("vlan"),
                        "mac": port.get("mac-address"),
                        "device_id": pmap.get("device_id") or port["compute_node"],
                        "device_interface_id": pmap.get("device_interface_id")
                        or port["pci"],
                        "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
                        "switch_port": pmap.get("switch_port"),
                        "service_mapping_info": pmap.get("service_mapping_info"),
                    },
                }

                # TODO
                # if port["modified_at"] > last_update:
                #     sdn_need_update = True
                new_connected_ports.append(port["id"])  # TODO
                sdn_ports.append(new_port)

            if error_ports:
                error_list.append(
                    "{} interfaces have not been created as VDU is on ERROR status".format(
                        error_ports
                    )
                )

            # connect external ports
            for index, additional_port in enumerate(additional_ports):
                additional_port_id = additional_port.get(
                    "service_endpoint_id"
                ) or "external-{}".format(index)
                sdn_ports.append(
                    {
                        "service_endpoint_id": additional_port_id,
                        "service_endpoint_encapsulation_type": additional_port.get(
                            "service_endpoint_encapsulation_type", "dot1q"
                        ),
                        "service_endpoint_encapsulation_info": {
                            "vlan": additional_port.get("vlan") or vlan_used,
                            "mac": additional_port.get("mac_address"),
                            "device_id": additional_port.get("device_id"),
                            "device_interface_id": additional_port.get(
                                "device_interface_id"
                            ),
                            "switch_dpid": additional_port.get("switch_dpid")
                            or additional_port.get("switch_id"),
                            "switch_port": additional_port.get("switch_port"),
                            "service_mapping_info": additional_port.get(
                                "service_mapping_info"
                            ),
                        },
                    }
                )
                new_connected_ports.append(additional_port_id)
            sdn_info = ""

            # if there are more ports to connect or they have been modified, call create/update
            if error_list:
                sdn_status = "ERROR"
                sdn_info = "; ".join(error_list)
            elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
                last_update = time.time()

                if not sdn_net_id:
                    if len(sdn_ports) < 2:
                        # nothing is created at the connector with less than 2 ports
                        sdn_status = "ACTIVE"

                        if not pending_ports:
                            self.logger.debug(
                                "task={} {} new-sdn-net done, less than 2 ports".format(
                                    task_id, ro_task["target_id"]
                                )
                            )
                    else:
                        net_type = params.get("type") or "ELAN"
                        (
                            sdn_net_id,
                            created_items,
                        ) = target_vim.create_connectivity_service(net_type, sdn_ports)
                        created = True
                        self.logger.debug(
                            "task={} {} new-sdn-net={} created={}".format(
                                task_id, ro_task["target_id"], sdn_net_id, created
                            )
                        )
                else:
                    created_items = target_vim.edit_connectivity_service(
                        sdn_net_id, conn_info=created_items, connection_points=sdn_ports
                    )
                    created = True
                    self.logger.debug(
                        "task={} {} update-sdn-net={} created={}".format(
                            task_id, ro_task["target_id"], sdn_net_id, created
                        )
                    )

                connected_ports = new_connected_ports
            elif sdn_net_id:
                wim_status_dict = target_vim.get_connectivity_service_status(
                    sdn_net_id, conn_info=created_items
                )
                sdn_status = wim_status_dict["sdn_status"]

                if wim_status_dict.get("sdn_info"):
                    sdn_info = str(wim_status_dict.get("sdn_info")) or ""

                # error_msg, when present, takes precedence over sdn_info
                if wim_status_dict.get("error_msg"):
                    sdn_info = wim_status_dict.get("error_msg") or ""

            if pending_ports:
                if sdn_status != "ERROR":
                    sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
                        len(ports) - pending_ports, len(ports)
                    )

                # not all the ports are located yet, so it is reported as BUILD instead of ACTIVE
                if sdn_status == "ACTIVE":
                    sdn_status = "BUILD"

            ro_vim_item_update = {
                "vim_id": sdn_net_id,
                "vim_status": sdn_status,
                "created": created,
                "created_items": created_items,
                "connected_ports": connected_ports,
                "vim_details": sdn_info,
                "vim_message": None,
                "last_update": last_update,
            }

            return sdn_status, ro_vim_item_update
        except Exception as e:
            # traceback is only logged for exceptions that do not come from the connectors
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
                exc_info=not isinstance(
                    e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                ),
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the connectivity service referenced by ro_task["vim_info"]["vim_id"], if any.
        :param ro_task: ro_task dictionary
        :param task_index: index at ro_task["tasks"] of the task to process
        :return: ("DONE", update) on success or when the connector answers NOT_FOUND;
            ("FAILED", update) on any other exception
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        sdn_vim_id = ro_task["vim_info"].get("vim_id")
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if sdn_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_connectivity_service(
                    sdn_vim_id, ro_task["vim_info"].get("created_items")
                )

        except Exception as e:
            if (
                isinstance(e, sdnconn.SdnConnectorError)
                and e.http_code == HTTPStatus.NOT_FOUND.value
            ):
                # a connector NOT_FOUND is treated as a successful deletion
                ro_vim_item_update_ok["vim_message"] = "already deleted"
            else:
                self.logger.error(
                    "ro_task={} vim={} del-sdn-net={}: {}".format(
                        ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
                    ),
                    exc_info=not isinstance(
                        e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                    ),
                )
                ro_vim_item_update = {
                    "vim_status": "VIM_ERROR",
                    "vim_message": "Error while deleting: {}".format(e),
                }

                return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-sdn-net={} {}".format(
                task_id,
                ro_task["target_id"],
                sdn_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
1440
1441
class VimInteractionMigration(VimInteractionBase):
    """Executes the migration of a VM instance and collects its refreshed information."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Migrate the VM indicated at task["params"] and, if a compute node is reported
        back, refresh the VM status to obtain the new vim_info and interfaces.
        :param ro_task: ro_task dictionary
        :param task_index: index at ro_task["tasks"] of the task to process
        :param task_depends: not used by this method
        :return: tuple (status, ro_vim_item_update, db_task_update); status is "DONE", or
            "FAILED" when a VimConnException or NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_interfaces = []
        created = False
        created_items = {}
        refreshed_vim_info = {}

        try:
            vim_vm_id = ""
            if task.get("params"):
                vim_vm_id = task["params"].get("vim_vm_id")
                migrate_host = task["params"].get("migrate_host")
                _, migrated_compute_node = target_vim.migrate_instance(
                    vim_vm_id, migrate_host
                )

                if migrated_compute_node:
                    # When VM is migrated, vdu["vim_info"] needs to be updated
                    vdu_old_vim_info = task["params"]["vdu_vim_info"].get(
                        ro_task["target_id"]
                    )
                    # the old info, or its interface list, can be missing; it is treated
                    # as "no previous interfaces" instead of failing with an
                    # AttributeError/TypeError that the except clause below does not catch
                    old_interfaces = (vdu_old_vim_info or {}).get("interfaces") or ()

                    # Refresh VM to get new vim_info
                    vm_to_refresh_list = [vim_vm_id]
                    vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
                    refreshed_vim_info = vim_dict[vim_vm_id]

                    if refreshed_vim_info.get("interfaces"):
                        for old_iface in old_interfaces:
                            # an old interface without a refreshed counterpart yields a
                            # None entry, so the list keeps the order and length of the
                            # old interface list
                            iface = next(
                                (
                                    iface
                                    for iface in refreshed_vim_info["interfaces"]
                                    if old_iface["vim_interface_id"]
                                    == iface["vim_interface_id"]
                                ),
                                None,
                            )
                            vim_interfaces.append(iface)

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            if vim_interfaces:
                ro_vim_item_update["interfaces"] = vim_interfaces

            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update

        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
1522
1523
class VimInteractionResize(VimInteractionBase):
    """Executes the resize (vertical scale) of a VM instance to a target flavor."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Look for (or create) the flavor described at task["params"]["flavor_dict"],
        resize the VM to it and refresh the VM status when the resize reports success.
        :param ro_task: ro_task dictionary
        :param task_index: index at ro_task["tasks"] of the task to process
        :param task_depends: not used by this method
        :return: tuple (status, ro_vim_item_update, db_task_update); status is "DONE", or
            "FAILED" when a VimConnException or NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            vim_vm_id = ""
            new_vim_info = {}

            if task.get("params"):
                vim_vm_id = task["params"].get("vim_vm_id")
                flavor_dict = task["params"].get("flavor_dict")
                self.logger.info("flavor_dict %s", flavor_dict)

                flavor_uuid = None
                try:
                    flavor_uuid = target_vim.get_flavor_id_from_data(flavor_dict)
                except Exception as lookup_error:
                    self.logger.info(
                        "Cannot find any flavor matching %s.", str(lookup_error)
                    )
                    # no matching flavor: try to create it; a failure is only logged
                    try:
                        flavor_uuid = target_vim.new_flavor(flavor_dict)
                    except Exception as create_error:
                        self.logger.error(
                            "Error creating flavor at VIM %s.", str(create_error)
                        )

                # resize only when a flavor is available; refresh only on a truthy result
                if flavor_uuid is not None and target_vim.resize_instance(
                    vim_vm_id, flavor_uuid
                ):
                    # Refresh VM to get new vim_info
                    new_vim_info = target_vim.refresh_vms_status([vim_vm_id])[vim_vm_id]

            ro_vim_item_update = dict(
                vim_id=vim_vm_id,
                vim_status="ACTIVE",
                created=created,
                created_items=created_items,
                vim_details=None,
                vim_message=None,
            )

            status_is_error = new_vim_info.get("status") in ("ERROR", "VIM_ERROR")
            if new_vim_info and not status_is_error:
                ro_vim_item_update["vim_details"] = new_vim_info["vim_info"]

            self.logger.debug(
                "task={} {} resize done".format(task_id, ro_task["target_id"])
            )
            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} Resize: {}".format(task_id, ro_task["target_id"], e)
            )

            return (
                "FAILED",
                {"vim_status": "VIM_ERROR", "created": created, "vim_message": str(e)},
                db_task_update,
            )
1592
1593
class ConfigValidate:
    """Accessors for the values stored under the "period" key of the configuration."""

    def __init__(self, config: Dict):
        # configuration dictionary; it is read on every access, not copied
        self.conf = config

    def _period(self, key):
        """Return the value of conf["period"][key]."""
        return self.conf["period"][key]

    @property
    def active(self):
        # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
        refresh_active = self._period("refresh_active")
        allowed = refresh_active >= 60 or refresh_active == -1

        return refresh_active if allowed else 60

    @property
    def build(self):
        return self._period("refresh_build")

    @property
    def image(self):
        return self._period("refresh_image")

    @property
    def error(self):
        return self._period("refresh_error")

    @property
    def queue_size(self):
        return self._period("queue_size")
1624
1625
1626 class NsWorker(threading.Thread):
def __init__(self, worker_index, config, plugins, db):
    """
    :param worker_index: thread index
    :param config: general configuration of RO, among others the process_id with the docker id where it runs
    :param plugins: global shared dict with the loaded plugins
    :param db: database class instance to use
    """
    threading.Thread.__init__(self)
    self.config = config
    self.plugins = plugins
    self.plugin_name = "unknown"
    self.logger = logging.getLogger("ro.worker{}".format(worker_index))
    self.worker_index = worker_index
    # refresh periods for created items
    self.refresh_config = ConfigValidate(config)
    self.task_queue = queue.Queue(self.refresh_config.queue_size)
    # lock used by del_task() to guard task status changes; it was not created
    # anywhere in this constructor, so del_task() failed with AttributeError
    self.task_lock = threading.Lock()
    # targetvim: vimplugin class
    self.my_vims = {}
    # targetvim: vim information from database
    self.db_vims = {}
    # targetvim list
    self.vim_targets = []
    self.my_id = config["process_id"] + ":" + str(worker_index)
    self.db = db
    # task "item" name: handler instance. All handlers share this worker's db,
    # my_vims, db_vims and logger objects
    self.item2class = {
        "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
        "shared-volumes": VimInteractionSharedVolume(
            self.db, self.my_vims, self.db_vims, self.logger
        ),
        "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
        "image": VimInteractionImage(
            self.db, self.my_vims, self.db_vims, self.logger
        ),
        "flavor": VimInteractionFlavor(
            self.db, self.my_vims, self.db_vims, self.logger
        ),
        "sdn_net": VimInteractionSdnNet(
            self.db, self.my_vims, self.db_vims, self.logger
        ),
        "update": VimInteractionUpdateVdu(
            self.db, self.my_vims, self.db_vims, self.logger
        ),
        "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
            self.db, self.my_vims, self.db_vims, self.logger
        ),
        "migrate": VimInteractionMigration(
            self.db, self.my_vims, self.db_vims, self.logger
        ),
        "verticalscale": VimInteractionResize(
            self.db, self.my_vims, self.db_vims, self.logger
        ),
    }
    self.time_last_task_processed = None
    # lists of tasks to delete because nsrs or vnfrs has been deleted from db
    self.tasks_to_delete = []
    # it is idle when there are not vim_targets associated
    self.idle = True
    self.task_locked_time = config["global"]["task_locked_time"]
1685
def insert_task(self, task):
    """
    Enqueue a task at this worker queue without blocking.
    :param task: item to put at self.task_queue
    :return: None
    :raise NsWorkerException: if the queue is full
    """
    try:
        self.task_queue.put_nowait(task)
    except queue.Full:
        raise NsWorkerException("timeout inserting a task")

    return None
1692
def terminate(self):
    """Enqueue the "exit" item at this worker queue by means of insert_task."""
    exit_item = "exit"
    self.insert_task(exit_item)
1695
def del_task(self, task):
    """
    Mark a task as superseded if it has not started to be processed.
    :param task: task dictionary; its "status" is changed to "SUPERSEDED" when it is "SCHEDULED"
    :return: True if the task was superseded, False otherwise
    """
    with self.task_lock:
        if task["status"] == "SCHEDULED":
            task["status"] = "SUPERSEDED"
            return True

        # task["status"] == "processing"
        # The lock is released by the "with" block itself. Releasing it here as well
        # made the exit of the "with" block raise RuntimeError (release of an unlocked lock)
        return False
1704
1705 def _process_vim_config(self, target_id: str, db_vim: dict) -> None:
1706 """
1707 Process vim config, creating vim configuration files as ca_cert
1708 :param target_id: vim/sdn/wim + id
1709 :param db_vim: Vim dictionary obtained from database
1710 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1711 """
1712 if not db_vim.get("config"):
1713 return
1714
1715 file_name = ""
1716 work_dir = "/app/osm_ro/certs"
1717
1718 try:
1719 if db_vim["config"].get("ca_cert_content"):
1720 file_name = f"{work_dir}/{target_id}:{self.worker_index}"
1721
1722 if not path.isdir(file_name):
1723 makedirs(file_name)
1724
1725 file_name = file_name + "/ca_cert"
1726
1727 with open(file_name, "w") as f:
1728 f.write(db_vim["config"]["ca_cert_content"])
1729 del db_vim["config"]["ca_cert_content"]
1730 db_vim["config"]["ca_cert"] = file_name
1731 except Exception as e:
1732 raise NsWorkerException(
1733 "Error writing to file '{}': {}".format(file_name, e)
1734 )
1735
def _load_plugin(self, name, type="vim"):
    """
    Return the plugin registered as name, loading it from the entry points if needed.
    :param name: plugin name, used as key of self.plugins and as entry point name
    :param type: can be vim or sdn; it selects the entry point group "osm_ro<type>.plugins"
    :return: the object stored at self.plugins[name]
    :raise NsWorkerException: if loading fails or no entry point provides the plugin
    """
    # the dummy connectors are always available
    self.plugins.setdefault("rovim_dummy", VimDummyConnector)
    self.plugins.setdefault("rosdn_dummy", SdnDummyConnector)

    if name not in self.plugins:
        try:
            group = "osm_ro{}.plugins".format(type)
            for ep in entry_points(group=group, name=name):
                self.plugins[name] = ep.load()
        except Exception as e:
            raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))

        still_missing = name not in self.plugins
        if name and still_missing:
            raise NsWorkerException(
                "Plugin 'osm_{n}' has not been installed".format(n=name)
            )

    return self.plugins[name]
1759
1760 def _unload_vim(self, target_id):
1761 """
1762 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1763 :param target_id: Contains type:_id; where type can be 'vim', ...
1764 :return: None.
1765 """
1766 try:
1767 self.db_vims.pop(target_id, None)
1768 self.my_vims.pop(target_id, None)
1769
1770 if target_id in self.vim_targets:
1771 self.vim_targets.remove(target_id)
1772
1773 self.logger.info("Unloaded {}".format(target_id))
1774 except Exception as e:
1775 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1776
def _check_vim(self, target_id):
    """
    Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
    Only the first operation found in "PROCESSING" state and not locked by other worker
    is processed; the result is written to the database at the finally clause.
    :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
    :return: None.
    """
    target, _, _id = target_id.partition(":")
    now = time.time()
    update_dict = {}
    unset_dict = {}
    op_text = ""
    step = ""
    # remember if it was loaded before, to unload at the end what is loaded only for this check
    loaded = target_id in self.vim_targets
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts"
        if target == "wim"
        else "sdns"
    )
    error_text = ""

    try:
        step = "Getting {} from db".format(target_id)
        db_vim = self.db.get_one(target_database, {"_id": _id})

        for op_index, operation in enumerate(
            db_vim["_admin"].get("operations", ())
        ):
            if operation["operationState"] != "PROCESSING":
                continue

            locked_at = operation.get("locked_at")

            if locked_at is not None and locked_at >= now - self.task_locked_time:
                # some other thread is doing this operation
                return

            # lock
            op_text = "_admin.operations.{}.".format(op_index)

            # the filter includes the locked_at value read above, so the update only
            # applies if the operation has not been modified in the meantime
            # NOTE(review): this writes "admin.current_operation" and the cleanup below
            # unsets "current_operation", while every other key uses the "_admin."
            # prefix - confirm the intended paths
            if not self.db.set_one(
                target_database,
                q_filter={
                    "_id": _id,
                    op_text + "operationState": "PROCESSING",
                    op_text + "locked_at": locked_at,
                },
                update_dict={
                    op_text + "locked_at": now,
                    "admin.current_operation": op_index,
                },
                fail_on_empty=False,
            ):
                return

            unset_dict[op_text + "locked_at"] = None
            unset_dict["current_operation"] = None
            step = "Loading " + target_id
            error_text = self._load_vim(target_id)

            if not error_text:
                step = "Checking connectivity"

                if target == "vim":
                    self.my_vims[target_id].check_vim_connectivity()
                else:
                    self.my_vims[target_id].check_credentials()

            # success values; the finally clause overwrites them when error_text is set
            update_dict["_admin.operationalState"] = "ENABLED"
            update_dict["_admin.detailed-status"] = ""
            unset_dict[op_text + "detailed-status"] = None
            update_dict[op_text + "operationState"] = "COMPLETED"

            # only one operation is processed per call
            return

    except Exception as e:
        error_text = "{}: {}".format(step, e)
        self.logger.error("{} for {}: {}".format(step, target_id, e))

    finally:
        # it runs also for the "return" statements above
        if update_dict or unset_dict:
            if error_text:
                update_dict[op_text + "operationState"] = "FAILED"
                update_dict[op_text + "detailed-status"] = error_text
                unset_dict.pop(op_text + "detailed-status", None)
                update_dict["_admin.operationalState"] = "ERROR"
                update_dict["_admin.detailed-status"] = error_text

            if op_text:
                update_dict[op_text + "statusEnteredTime"] = now

            self.db.set_one(
                target_database,
                q_filter={"_id": _id},
                update_dict=update_dict,
                unset=unset_dict,
                fail_on_empty=False,
            )

        if not loaded:
            self._unload_vim(target_id)
1879
1880 def _reload_vim(self, target_id):
1881 if target_id in self.vim_targets:
1882 self._load_vim(target_id)
1883 else:
1884 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1885 # just remove it to force load again next time it is needed
1886 self.db_vims.pop(target_id, None)
1887
def _load_vim(self, target_id):
    """
    Load or reload a vim_account, sdn_controller or wim_account.
    Read content from database, load the plugin if not loaded.
    In case of error loading the plugin, it loads a failing VIM_connector
    It fills self db_vims dictionary, my_vims dictionary and vim_targets list
    :param target_id: Contains type:_id; where type can be 'vim', ...
    :return: None if ok, descriptive text if error
    """
    target, _, _id = target_id.partition(":")
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts"
        if target == "wim"
        else "sdns"
    )
    plugin_name = ""
    vim = None
    # step describes the action in course; it is used at the error messages
    step = "Getting {}={} from db".format(target, _id)

    try:
        # TODO process for wim, sdnc, ...
        vim = self.db.get_one(target_database, {"_id": _id})

        # if deep_get(vim, "config", "sdn-controller"):
        #     step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
        #     db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})

        step = "Decrypting password"
        schema_version = vim.get("schema_version")
        self.db.encrypt_decrypt_fields(
            vim,
            "decrypt",
            fields=("password", "secret"),
            schema_version=schema_version,
            salt=_id,
        )
        self._process_vim_config(target_id, vim)

        if target == "vim":
            plugin_name = "rovim_" + vim["vim_type"]
            step = "Loading plugin '{}'".format(plugin_name)
            vim_module_conn = self._load_plugin(plugin_name)
            step = "Loading {}'".format(target_id)
            self.my_vims[target_id] = vim_module_conn(
                uuid=vim["_id"],
                name=vim["name"],
                tenant_id=vim.get("vim_tenant_id"),
                tenant_name=vim.get("vim_tenant_name"),
                url=vim["vim_url"],
                url_admin=None,
                user=vim["vim_user"],
                passwd=vim["vim_password"],
                config=vim.get("config") or {},
                persistent_info={},
            )
        else:  # sdn
            plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
            step = "Loading plugin '{}'".format(plugin_name)
            vim_module_conn = self._load_plugin(plugin_name, "sdn")
            step = "Loading {}'".format(target_id)
            # work over a copy, so the record stored at self.db_vims is not altered
            wim = deepcopy(vim)
            wim_config = wim.pop("config", {}) or {}
            wim["uuid"] = wim["_id"]
            # provide both "url" and "wim_url" when only one of them is present
            if "url" in wim and "wim_url" not in wim:
                wim["wim_url"] = wim["url"]
            elif "url" not in wim and "wim_url" in wim:
                wim["url"] = wim["wim_url"]

            if wim.get("dpid"):
                wim_config["dpid"] = wim.pop("dpid")

            if wim.get("switch_id"):
                wim_config["switch_id"] = wim.pop("switch_id")

            # wim, wim_account, config
            self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
        self.db_vims[target_id] = vim
        self.error_status = None

        self.logger.info(
            "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
        )
    except Exception as e:
        self.logger.error(
            "Cannot load {} plugin={}: {} {}".format(
                target_id, plugin_name, step, e
            )
        )

        self.db_vims[target_id] = vim or {}
        # The failing connector goes to my_vims, as stated at the docstring. It used to
        # be assigned to db_vims, overwriting the database record stored one line above
        # and leaving my_vims without an entry for this target
        self.my_vims[target_id] = FailingConnector(str(e))
        error_status = "{} Error: {}".format(step, e)

        return error_status
    finally:
        # the target is registered also when the load fails
        if target_id not in self.vim_targets:
            self.vim_targets.append(target_id)
1987
def _get_db_task(self):
    """
    Lock one pending ro_task of the targets handled by this worker and return it.
    The lock consists of writing locked_by=self.my_id and locked_at=now with db.set_one;
    the locked ro_task is then read back filtering by locked_at == now.
    :return: the locked ro_task, or None if nothing could be locked or on exception
        (exceptions are logged, not raised)
    """
    now = time.time()

    if not self.time_last_task_processed:
        self.time_last_task_processed = now

    try:
        while True:
            """
            # Log RO tasks only when loglevel is DEBUG
            if self.logger.getEffectiveLevel() == logging.DEBUG:
                self._log_ro_task(
                    None,
                    None,
                    None,
                    "TASK_WF",
                    "task_locked_time="
                    + str(self.task_locked_time)
                    + " "
                    + "time_last_task_processed="
                    + str(self.time_last_task_processed)
                    + " "
                    + "now="
                    + str(now),
                )
            """
            # NOTE(review): presumably the ".lt"/".gt" key suffixes are the less-than and
            # greater-than operators of the db layer - confirm against osm_common
            locked = self.db.set_one(
                "ro_tasks",
                q_filter={
                    "target_id": self.vim_targets,
                    "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                    "locked_at.lt": now - self.task_locked_time,
                    "to_check_at.lt": self.time_last_task_processed,
                    "to_check_at.gt": -1,
                },
                update_dict={"locked_by": self.my_id, "locked_at": now},
                fail_on_empty=False,
            )

            if locked:
                # read and return
                ro_task = self.db.get_one(
                    "ro_tasks",
                    q_filter={
                        "target_id": self.vim_targets,
                        "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                        "locked_at": now,
                    },
                )
                return ro_task

            # nothing locked: retry once with the limit of to_check_at moved to now;
            # if the limit was already now, there is nothing to do
            if self.time_last_task_processed == now:
                self.time_last_task_processed = None
                return None
            else:
                self.time_last_task_processed = now
                # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)

    except DbException as e:
        self.logger.error("Database exception at _get_db_task: {}".format(e))
    except Exception as e:
        self.logger.critical(
            "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
        )

    return None
2058
2059 def _delete_task(self, ro_task, task_index, task_depends, db_update):
2060 """
2061 Determine if this task need to be done or superseded
2062 :return: None
2063 """
2064 my_task = ro_task["tasks"][task_index]
2065 task_id = my_task["task_id"]
2066 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
2067 "created_items", False
2068 )
2069
2070 self.logger.debug("Needed delete: {}".format(needed_delete))
2071 if my_task["status"] == "FAILED":
2072 return None, None # TODO need to be retry??
2073
2074 try:
2075 for index, task in enumerate(ro_task["tasks"]):
2076 if index == task_index or not task:
2077 continue # own task
2078
2079 if (
2080 my_task["target_record"] == task["target_record"]
2081 and task["action"] == "CREATE"
2082 ):
2083 # set to finished
2084 db_update["tasks.{}.status".format(index)] = task[
2085 "status"
2086 ] = "FINISHED"
2087 elif task["action"] == "CREATE" and task["status"] not in (
2088 "FINISHED",
2089 "SUPERSEDED",
2090 ):
2091 needed_delete = False
2092
2093 if needed_delete:
2094 self.logger.debug(
2095 "Deleting ro_task={} task_index={}".format(ro_task, task_index)
2096 )
2097 return self.item2class[my_task["item"]].delete(ro_task, task_index)
2098 else:
2099 return "SUPERSEDED", None
2100 except Exception as e:
2101 if not isinstance(e, NsWorkerException):
2102 self.logger.critical(
2103 "Unexpected exception at _delete_task task={}: {}".format(
2104 task_id, e
2105 ),
2106 exc_info=True,
2107 )
2108
2109 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2110
def _create_task(self, ro_task, task_index, task_depends, db_update):
    """
    Decide whether this task has to create something at the VIM and, if so, create it
    :param ro_task: ro_task that contains the task
    :param task_index: position of the task inside ro_task["tasks"]
    :param task_depends: handed over unchanged to the 'new' method of the item handler
    :param db_update: not read nor modified by this method
    :return: tuple (task status, vim item update):
        (None, None) when the task is not in SCHEDULED status;
        (<status>, "COPY_VIM_INFO") when another CREATE task of this ro_task is neither
        SCHEDULED, FINISHED nor SUPERSEDED;
        the result of the item handler otherwise, or ("FAILED", <error dict>) if it raised
    """
    own_task = ro_task["tasks"][task_index]
    task_id = own_task["task_id"]

    # FAILED tasks are not retried here; any other non SCHEDULED status needs no action
    if own_task["status"] != "SCHEDULED":
        return None, None  # TODO need to be retry??

    # look for a sibling CREATE task that has already started working on the same element
    ongoing = next(
        (
            sibling
            for position, sibling in enumerate(ro_task["tasks"])
            if position != task_index
            and sibling
            and sibling["action"] == "CREATE"
            and sibling["status"] not in ("SCHEDULED", "FINISHED", "SUPERSEDED")
        ),
        None,
    )
    if ongoing is not None:
        return ongoing["status"], "COPY_VIM_INFO"

    try:
        handler = self.item2class[own_task["item"]]
        created_status, vim_item_update = handler.new(
            ro_task, task_index, task_depends
        )
        # TODO update other CREATE tasks
    except Exception as exc:
        # NsWorkerException is an expected failure, anything else deserves a traceback
        if not isinstance(exc, NsWorkerException):
            self.logger.error(
                "Error executing task={}: {}".format(task_id, exc), exc_info=True
            )

        # TODO update ro_vim_item_update
        return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(exc)}

    return created_status, vim_item_update
2152
def _get_dependency(self, task_id, ro_task=None, target_id=None):
    """
    Look for dependency task
    :param task_id: Can be one of
        1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
        2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
        3. task.task_id: "<action_id>:number"
    :param ro_task: optional ro_task searched first (form 3 only) before querying the database
    :param target_id: target used for the database query of forms 1 and 2. It is overwritten by the
        target embedded in task_id when form 1 is used
    :return: database ro_task plus index of task
    :raises NsWorkerException: if no task matches
    """
    if task_id.startswith(("vim:", "sdn:", "wim:")):
        target_id, _, task_id = task_id.partition(" ")

    if task_id.startswith(("nsrs:", "vnfrs:")):
        ro_task_dependency = self.db.get_one(
            "ro_tasks",
            q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
            fail_on_empty=False,
        )

        if ro_task_dependency:
            for task_index, task in enumerate(ro_task_dependency["tasks"]):
                # entries of deleted tasks are stored as None, skip them
                if task and task["target_record_id"] == task_id:
                    return ro_task_dependency, task_index

    else:
        if ro_task:
            for task_index, task in enumerate(ro_task["tasks"]):
                if task and task["task_id"] == task_id:
                    return ro_task, task_index

        ro_task_dependency = self.db.get_one(
            "ro_tasks",
            q_filter={
                "tasks.ANYINDEX.task_id": task_id,
                "tasks.ANYINDEX.target_record.ne": None,
            },
            fail_on_empty=False,
        )

        self.logger.debug("ro_task_dependency={}".format(ro_task_dependency))
        if ro_task_dependency:
            for task_index, task in enumerate(ro_task_dependency["tasks"]):
                # entries of deleted tasks are stored as None, skip them
                if task and task["task_id"] == task_id:
                    return ro_task_dependency, task_index
    raise NsWorkerException("Cannot get depending task {}".format(task_id))
2204
def update_vm_refresh(self, ro_task):
    """Re-enable the periodic status refresh of the ro_tasks that have it disabled.

    The next refresh time is obtained from self._get_next_refresh() for the given
    ro_task. Unless that is -1, every ro_task at database with a task in DONE status
    and a negative "to_check_at" gets its "vim_info.refresh_at" and "to_check_at"
    updated. Any exception is logged and swallowed.

    Args:
        ro_task (dict): ro_task used to compute the next refresh time
    """
    try:
        self.logger.debug("Checking if VM status update config")
        refresh_at = self._get_next_refresh(ro_task, time.time())

        if refresh_at == -1:
            return

        # never wait longer than one day before the next check
        one_day_ahead = time.time() + (24 * 60 * 60)
        pending_update = {
            "vim_info.refresh_at": refresh_at,
            "to_check_at": min(one_day_ahead, refresh_at),
        }

        self.logger.debug(
            "Finding tasks which to be updated to enable VM status updates"
        )
        disabled_ro_tasks = self.db.get_list(
            "ro_tasks",
            q_filter={
                "tasks.status": "DONE",
                "to_check_at.lt": 0,
            },
        )
        self.logger.debug("Updating tasks to change the to_check_at status")
        for disabled in disabled_ro_tasks:
            self.db.set_one(
                "ro_tasks",
                q_filter={"_id": disabled["_id"]},
                update_dict=pending_update,
                fail_on_empty=True,
            )

    except Exception as e:
        self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
2247
def _get_next_refresh(self, ro_task: dict, next_refresh: float):
    """Compute the next refresh time from the vim type and the configured refresh period.

    Args:
        ro_task (dict): ro_task details; its "target_id" selects the entry of self.db_vims
        next_refresh (float): base time, as epoch, the period is added to

    Returns:
        next_refresh (float): -1 if self.refresh_config.active is -1 or the vim type is
            "openstack"; otherwise next_refresh plus self.refresh_config.active.
    """
    vim_type = self.db_vims[ro_task["target_id"]]["vim_type"]
    active_period = self.refresh_config.active
    updates_disabled = active_period == -1 or vim_type == "openstack"
    return -1 if updates_disabled else next_refresh + active_period
2264
def _process_pending_tasks(self, ro_task):
    """
    Process the pending tasks of one ro_task and store the outcome at database.

    Tasks are visited per action in the order DELETE, CREATE, EXEC. For each task that needs work the
    dependencies are resolved, the item handler (or self._delete_task) is invoked, the resulting vim
    information is copied to ro_task["vim_info"] and to the target record, and finally the ro_task is
    unlocked and written back with the new "to_check_at".
    Exceptions are logged, never propagated.
    :param ro_task: ro_task to process. Its "vim_info" is modified in place
    :return: None
    """
    ro_task_id = ro_task["_id"]
    now = time.time()
    # one day
    next_check_at = now + (24 * 60 * 60)
    db_ro_task_update = {}

    def _update_refresh(new_status):
        # compute next_refresh
        # 'task' is the loop variable of the enclosing method at the time of the call
        nonlocal task
        nonlocal next_check_at
        nonlocal db_ro_task_update
        nonlocal ro_task

        next_refresh = time.time()

        if task["item"] in ("image", "flavor"):
            next_refresh += self.refresh_config.image
        elif new_status == "BUILD":
            next_refresh += self.refresh_config.build
        elif new_status == "DONE":
            next_refresh = self._get_next_refresh(ro_task, next_refresh)
        else:
            next_refresh += self.refresh_config.error

        next_check_at = min(next_check_at, next_refresh)
        db_ro_task_update["vim_info.refresh_at"] = next_refresh
        ro_task["vim_info"]["refresh_at"] = next_refresh

    try:
        # Disabled code:
        # # Log RO tasks only when loglevel is DEBUG
        # if self.logger.getEffectiveLevel() == logging.DEBUG:
        #     self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")

        # Check if vim status refresh is enabled again
        self.update_vm_refresh(ro_task)
        # 0: get task_status_create
        lock_object = None
        task_status_create = None
        task_create = next(
            (
                t
                for t in ro_task["tasks"]
                if t
                and t["action"] == "CREATE"
                and t["status"] in ("BUILD", "DONE")
            ),
            None,
        )

        if task_create:
            task_status_create = task_create["status"]

        # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
        for task_action in ("DELETE", "CREATE", "EXEC"):
            db_vim_update = None
            new_status = None

            for task_index, task in enumerate(ro_task["tasks"]):
                if not task:
                    continue  # task deleted

                task_depends = {}
                target_update = None

                if (
                    (
                        task_action in ("DELETE", "EXEC")
                        and task["status"] not in ("SCHEDULED", "BUILD")
                    )
                    or task["action"] != task_action
                    or (
                        task_action == "CREATE"
                        and task["status"] in ("FINISHED", "SUPERSEDED")
                    )
                ):
                    continue

                task_path = "tasks.{}.status".format(task_index)
                try:
                    db_vim_info_update = None
                    dependency_ro_task = {}

                    if task["status"] == "SCHEDULED":
                        # check if tasks that this depends on have been completed
                        dependency_not_completed = False

                        for dependency_task_id in task.get("depends_on") or ():
                            (
                                dependency_ro_task,
                                dependency_task_index,
                            ) = self._get_dependency(
                                dependency_task_id, target_id=ro_task["target_id"]
                            )
                            dependency_task = dependency_ro_task["tasks"][
                                dependency_task_index
                            ]
                            self.logger.debug(
                                "dependency_ro_task={} dependency_task_index={}".format(
                                    dependency_ro_task, dependency_task_index
                                )
                            )

                            if dependency_task["status"] == "SCHEDULED":
                                dependency_not_completed = True
                                next_check_at = min(
                                    next_check_at, dependency_ro_task["to_check_at"]
                                )
                                # must allow dependent task to be processed first
                                # to do this set time after last_task_processed
                                next_check_at = max(
                                    self.time_last_task_processed, next_check_at
                                )
                                break
                            elif dependency_task["status"] == "FAILED":
                                error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
                                    task["action"],
                                    task["item"],
                                    dependency_task["action"],
                                    dependency_task["item"],
                                    dependency_task_id,
                                    dependency_ro_task["vim_info"].get(
                                        "vim_message"
                                    ),
                                )
                                self.logger.error(
                                    "task={} {}".format(task["task_id"], error_text)
                                )
                                raise NsWorkerException(error_text)

                            # the dependency vim_id is exposed under both the plain id and "TASK-<id>"
                            task_depends[dependency_task_id] = dependency_ro_task[
                                "vim_info"
                            ]["vim_id"]
                            task_depends[
                                "TASK-{}".format(dependency_task_id)
                            ] = dependency_ro_task["vim_info"]["vim_id"]

                        if dependency_not_completed:
                            self.logger.warning(
                                "DEPENDENCY NOT COMPLETED {}".format(
                                    dependency_ro_task["vim_info"]["vim_id"]
                                )
                            )
                            # TODO set at vim_info.vim_details that it is waiting
                            continue

                    # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
                    # the task of renew this locking. It will update database locket_at periodically
                    if not lock_object:
                        lock_object = LockRenew.add_lock_object(
                            "ro_tasks", ro_task, self
                        )
                    if task["action"] == "DELETE":
                        (
                            new_status,
                            db_vim_info_update,
                        ) = self._delete_task(
                            ro_task, task_index, task_depends, db_ro_task_update
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if new_status in ("FINISHED", "SUPERSEDED"):
                            target_update = "DELETE"
                    elif task["action"] == "EXEC":
                        (
                            new_status,
                            db_vim_info_update,
                            db_task_update,
                        ) = self.item2class[task["item"]].exec(
                            ro_task, task_index, task_depends
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if db_task_update:
                            # load into database the modified db_task_update "retries" and "next_retry"
                            if db_task_update.get("retries"):
                                db_ro_task_update[
                                    "tasks.{}.retries".format(task_index)
                                ] = db_task_update["retries"]

                            next_check_at = time.time() + db_task_update.get(
                                "next_retry", 60
                            )
                        target_update = None
                    elif task["action"] == "CREATE":
                        if task["status"] == "SCHEDULED":
                            if task_status_create:
                                # element already created by a previous CREATE task: reuse its info
                                new_status = task_status_create
                                target_update = "COPY_VIM_INFO"
                            else:
                                new_status, db_vim_info_update = self.item2class[
                                    task["item"]
                                ].new(ro_task, task_index, task_depends)
                                _update_refresh(new_status)
                        else:
                            refresh_at = ro_task["vim_info"]["refresh_at"]
                            if refresh_at and refresh_at != -1 and now > refresh_at:
                                (
                                    new_status,
                                    db_vim_info_update,
                                ) = self.item2class[
                                    task["item"]
                                ].refresh(ro_task)
                                _update_refresh(new_status)
                            else:
                                # The refresh is updated to avoid set the value of "refresh_at" to
                                # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
                                # because it can happen that in this case the task is never processed
                                _update_refresh(task["status"])

                except Exception as e:
                    new_status = "FAILED"
                    db_vim_info_update = {
                        "vim_status": "VIM_ERROR",
                        "vim_message": str(e),
                    }

                    if not isinstance(
                        e, (NsWorkerException, vimconn.VimConnException)
                    ):
                        # this handler covers DELETE, EXEC and CREATE actions, so the message
                        # names this method instead of _delete_task
                        self.logger.error(
                            "Unexpected exception at _process_pending_tasks task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

                try:
                    if db_vim_info_update:
                        db_vim_update = db_vim_info_update.copy()
                        db_ro_task_update.update(
                            {
                                "vim_info." + k: v
                                for k, v in db_vim_info_update.items()
                            }
                        )
                        ro_task["vim_info"].update(db_vim_info_update)

                    if new_status:
                        if task_action == "CREATE":
                            task_status_create = new_status
                        db_ro_task_update[task_path] = new_status

                    if target_update or db_vim_update:
                        if target_update == "DELETE":
                            self._update_target(task, None)
                        elif target_update == "COPY_VIM_INFO":
                            self._update_target(task, ro_task["vim_info"])
                        else:
                            self._update_target(task, db_vim_update)

                except Exception as e:
                    if (
                        isinstance(e, DbException)
                        and e.http_code == HTTPStatus.NOT_FOUND
                    ):
                        # if the vnfrs or nsrs has been removed from database, this task must be removed
                        self.logger.debug(
                            "marking to delete task={}".format(task["task_id"])
                        )
                        self.tasks_to_delete.append(task)
                    else:
                        self.logger.error(
                            "Unexpected exception at _update_target task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

        locked_at = ro_task["locked_at"]

        if lock_object:
            locked_at = [
                lock_object["locked_at"],
                lock_object["locked_at"] + self.task_locked_time,
            ]
            # locked_at contains two times to avoid race condition. In case the lock has been renewed, it will
            # contain exactly locked_at + self.task_locked_time
            LockRenew.remove_lock_object(lock_object)

        q_filter = {
            "_id": ro_task["_id"],
            "to_check_at": ro_task["to_check_at"],
            "locked_at": locked_at,
        }
        # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
        # outside this task (by ro_nbi) do not update it
        db_ro_task_update["locked_by"] = None
        # locked_at converted to int only for debugging. When it is not decimals it means it has been unlocked
        db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
        db_ro_task_update["modified_at"] = now
        db_ro_task_update["to_check_at"] = next_check_at

        # Disabled code:
        # # Log RO tasks only when loglevel is DEBUG
        # if self.logger.getEffectiveLevel() == logging.DEBUG:
        #     db_ro_task_update_log = db_ro_task_update.copy()
        #     db_ro_task_update_log["_id"] = q_filter["_id"]
        #     self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")

        if not self.db.set_one(
            "ro_tasks",
            update_dict=db_ro_task_update,
            q_filter=q_filter,
            fail_on_empty=False,
        ):
            # to_check_at was changed meanwhile: retry without touching nor filtering by it
            del db_ro_task_update["to_check_at"]
            del q_filter["to_check_at"]
            # Disabled code:
            # # Log RO tasks only when loglevel is DEBUG
            # if self.logger.getEffectiveLevel() == logging.DEBUG:
            #     self._log_ro_task(
            #         None,
            #         db_ro_task_update_log,
            #         None,
            #         "TASK_WF",
            #         "SET_TASK " + str(q_filter),
            #     )
            self.db.set_one(
                "ro_tasks",
                q_filter=q_filter,
                update_dict=db_ro_task_update,
                fail_on_empty=True,
            )
    except DbException as e:
        self.logger.error(
            "ro_task={} Error updating database {}".format(ro_task_id, e)
        )
    except Exception as e:
        self.logger.error(
            "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
        )
2606
def _update_target(self, task, ro_vim_item_update):
    """
    Copy the vim information of a task into its target record at database
    :param task: task whose "target_record" has the form "<table>:<_id>:<path_vim_status>"
    :param ro_vim_item_update: dict with the vim information to store. When empty or None the
        item status is set to "DELETED" and the vim information is unset
    :return: None
    """
    table, _, remainder = task["target_record"].partition(":")
    _id, _, path_vim_status = remainder.partition(":")
    # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
    # path_item: dot separated list targeting record information, e.g. "vdur.10"
    path_item = path_vim_status[: path_vim_status.rfind(".")]
    path_item = path_item[: path_item.rfind(".")]

    if not ro_vim_item_update:
        self.db.set_one(
            table,
            q_filter={"_id": _id},
            update_dict={path_item + ".status": "DELETED"},
            unset={path_vim_status: None},
        )
        return

    copied_keys = (
        "vim_id",
        "vim_details",
        "vim_message",
        "vim_name",
        "vim_status",
        "interfaces",
        "interfaces_backup",
    )
    update_dict = {}
    for key, value in ro_vim_item_update.items():
        if key in copied_keys:
            update_dict[path_vim_status + "." + key] = value

    if path_vim_status.startswith("vdur."):
        # for backward compatibility vdur.name and vdur.vim-id are kept apart from
        # vim_name and vim_id; vdur.status holds the general status
        legacy_fields = (
            ("vim_name", ".name"),
            ("vim_id", ".vim-id"),
            ("vim_status", ".status"),
        )
        for source_key, suffix in legacy_fields:
            if ro_vim_item_update.get(source_key):
                update_dict[path_item + suffix] = ro_vim_item_update[source_key]

    interfaces = ro_vim_item_update.get("interfaces")
    if interfaces:
        for position, iface in enumerate(interfaces):
            if not iface:
                continue

            iface_prefix = "{}.interfaces.{}.".format(path_item, position)
            for key, value in iface.items():
                if key in ("vlan", "compute_node", "pci"):
                    update_dict[iface_prefix + key] = value

            # put ip_address and mac_address with ip-address and mac-address
            ip_address = iface.get("ip_address")
            if ip_address:
                update_dict[iface_prefix + "ip-address"] = ip_address

            if iface.get("mac_address"):
                update_dict[iface_prefix + "mac-address"] = iface["mac_address"]

            # management interfaces also provide the ip-address of the vnf / vdu
            if iface.get("mgmt_vnf_interface") and ip_address:
                update_dict["ip-address"] = ip_address.split(";")[0]

            if iface.get("mgmt_vdu_interface") and ip_address:
                update_dict[path_item + ".ip-address"] = ip_address.split(";")[0]

    self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)

    # If interfaces exists, it backups VDU interfaces in the DB for healing operations
    if interfaces:
        stored_interfaces = update_dict.get(path_vim_status + ".interfaces")
        if stored_interfaces:
            self.db.set_one(
                table,
                q_filter={"_id": _id},
                update_dict={
                    path_vim_status + ".interfaces_backup": stored_interfaces
                },
            )
2704
def _process_delete_db_tasks(self):
    """
    Remove from database the tasks queued at self.tasks_to_delete, whose vnfrs or nsrs are gone
    :return: None. Consumes self.tasks_to_delete from the front until it is empty
    """
    while self.tasks_to_delete:
        head = self.tasks_to_delete[0]
        nsr_id = head["nsr_id"]

        # limit the deletion to one vnfr only when the task targets a vnfr and its nsr still exists
        vnfrs_deleted = None
        if head["target_record"].startswith("vnfrs:") and self.db.get_one(
            "nsrs", {"_id": nsr_id}, fail_on_empty=False
        ):
            vnfrs_deleted = head["target_record"].split(":")[1]

        try:
            self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
        except Exception as e:
            # best effort: the error is logged and the entry is dropped anyway
            self.logger.error(
                "Error deleting task={}: {}".format(head["task_id"], e)
            )

        del self.tasks_to_delete[0]
2727
2728 @staticmethod
2729 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2730 """
2731 Static method because it is called from osm_ng_ro.ns
2732 :param db: instance of database to use
2733 :param nsr_id: affected nsrs id
2734 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2735 :return: None, exception is fails
2736 """
2737 retries = 5
2738 for retry in range(retries):
2739 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2740 now = time.time()
2741 conflict = False
2742
2743 for ro_task in ro_tasks:
2744 db_update = {}
2745 to_delete_ro_task = True
2746
2747 for index, task in enumerate(ro_task["tasks"]):
2748 if not task:
2749 pass
2750 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2751 vnfrs_deleted
2752 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2753 ):
2754 db_update["tasks.{}".format(index)] = None
2755 else:
2756 # used by other nsr, ro_task cannot be deleted
2757 to_delete_ro_task = False
2758
2759 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2760 if to_delete_ro_task:
2761 if not db.del_one(
2762 "ro_tasks",
2763 q_filter={
2764 "_id": ro_task["_id"],
2765 "modified_at": ro_task["modified_at"],
2766 },
2767 fail_on_empty=False,
2768 ):
2769 conflict = True
2770 elif db_update:
2771 db_update["modified_at"] = now
2772 if not db.set_one(
2773 "ro_tasks",
2774 q_filter={
2775 "_id": ro_task["_id"],
2776 "modified_at": ro_task["modified_at"],
2777 },
2778 update_dict=db_update,
2779 fail_on_empty=False,
2780 ):
2781 conflict = True
2782 if not conflict:
2783 return
2784 else:
2785 raise NsWorkerException("Exceeded {} retries".format(retries))
2786
def run(self):
    """
    Main loop of the thread.

    Each iteration first attends one order from self.task_queue ("terminate", "load_vim",
    "unload_vim", "reload_vim" or "check_vim"); when there is none pending it removes the tasks
    queued for deletion and processes one ro_task obtained with self._get_db_task().
    The queue is read without blocking while self.vim_targets is not empty, blocking otherwise.
    :return: None, once the "terminate" order is received
    """
    self.logger.info("Starting")
    while True:
        # step 1: get commands from queue
        try:
            if self.vim_targets:
                task = self.task_queue.get(block=False)
            else:
                if not self.idle:
                    self.logger.debug("enters in idle state")
                self.idle = True
                task = self.task_queue.get(block=True)
                self.idle = False

            order = task[0]
            if order == "terminate":
                break
            elif order == "load_vim":
                self.logger.info("order to load vim {}".format(task[1]))
                self._load_vim(task[1])
            elif order == "unload_vim":
                self.logger.info("order to unload vim {}".format(task[1]))
                self._unload_vim(task[1])
            elif order == "reload_vim":
                self._reload_vim(task[1])
            elif order == "check_vim":
                self.logger.info("order to check vim {}".format(task[1]))
                self._check_vim(task[1])
            # an order was attended: look for the next one before touching database tasks
            continue
        except queue.Empty:
            # no order pending, go on with the database tasks
            pass
        except Exception as e:
            self.logger.critical(
                "Error processing task: {}".format(e), exc_info=True
            )

        # step 2: process pending_tasks, delete not needed tasks
        try:
            if self.tasks_to_delete:
                self._process_delete_db_tasks()

            # Disabled code:
            # # Log RO tasks only when loglevel is DEBUG
            # if self.logger.getEffectiveLevel() == logging.DEBUG:
            #     _ = self._get_db_all_tasks()

            ro_task = self._get_db_task()
            if ro_task:
                self.logger.debug("Task to process: {}".format(ro_task))
                time.sleep(1)
                self._process_pending_tasks(ro_task)
            else:
                # nothing to do, wait before polling again
                time.sleep(5)
        except Exception as e:
            self.logger.critical(
                "Unexpected exception at run: " + str(e), exc_info=True
            )

    self.logger.info("Finishing")