Fixing inappropriate warning log level usage
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is a thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 import traceback
36 from typing import Dict
37 from unittest.mock import Mock
38
39 from importlib_metadata import entry_points
40 from osm_common.dbbase import DbException
41 from osm_ng_ro.vim_admin import LockRenew
42 from osm_ro_plugin import sdnconn, vimconn
43 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
44 from osm_ro_plugin.vim_dummy import VimDummyConnector
45 import yaml
46
47 __author__ = "Alfonso Tierno"
48 __date__ = "$28-Sep-2017 12:07:15$"
49
50
def deep_get(target_dict, *args, **kwargs):
    """
    Walk a nested dictionary following a sequence of keys and return the value found at the end.
    Example: with target_dict={a: {b: 5}}, keys (a, b) give 5; keys (a, b, c) and (f, h) give the default.
    :param target_dict: dictionary to be read
    :param args: keys to follow, one nesting level per key
    :param kwargs: may contain default=value, returned when the path cannot be followed
    :return: the value at the end of the path; otherwise the provided default, or None if no default is given
    """
    current = target_dict

    for key in args:
        # the walk stops as soon as the current level is not a dict or lacks the key
        if isinstance(current, dict) and key in current:
            current = current[key]
        else:
            return kwargs.get("default")

    return current
65
66
class NsWorkerException(Exception):
    """Generic error raised by the VIM interaction classes of this module."""
69
70
class FailingConnector:
    """Stand-in connector: every public attribute name of the VIM and SDN connector base classes
    is replaced by a Mock that raises the corresponding connector exception with error_msg"""

    def __init__(self, error_msg):
        self.error_msg = error_msg

        # (connector base class, exception raised by its replaced attributes); SDN names are set last,
        # so they win over VIM names when both classes share a name
        failing_interfaces = (
            (vimconn.VimConnector, vimconn.VimConnException),
            (sdnconn.SdnConnectorBase, sdnconn.SdnConnectorError),
        )

        for connector_class, exception_class in failing_interfaces:
            for name in dir(connector_class):
                if name.startswith("_"):
                    continue

                setattr(self, name, Mock(side_effect=exception_class(error_msg)))
86
87
class NsWorkerExceptionNotFound(NsWorkerException):
    """NsWorkerException used when a wanted element cannot be found or the search criteria are invalid."""
90
91
class VimInteractionBase:
    """Common parent of the classes that call a VIM/SDN to create, delete and refresh items
    (networks, VMs, flavors, ...). The methods here do not contact any VIM and just report success"""

    def __init__(self, db, my_vims, db_vims, logger):
        self.db, self.logger = db, logger
        self.my_vims, self.db_vims = my_vims, db_vims

    def new(self, ro_task, task_index, task_depends):
        """Nothing is created. Returns status "BUILD" with an empty update"""
        ro_vim_item_update = {}

        return "BUILD", ro_vim_item_update

    def refresh(self, ro_task):
        """No VIM call is done; "FAILED" only if the stored vim_status is VIM_ERROR, "DONE" otherwise"""
        in_error = ro_task["vim_info"]["vim_status"] == "VIM_ERROR"

        return ("FAILED" if in_error else "DONE"), {}

    def delete(self, ro_task, task_index):
        """No VIM call is done. Assumes the deletion is ok"""
        ro_vim_item_update = {}

        return "DONE", ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """Nothing is executed. Returns "DONE" with neither item update nor task update"""
        return "DONE", None, None
118
119
class VimInteractionNet(VimInteractionBase):
    """Find/create, refresh and delete networks using the VIM connector of the ro_task target"""

    def new(self, ro_task, task_index, task_depends):
        """
        Look for an existing network (task "find_params") or create one (task "params")
        :param ro_task: ro_task content; "target_id" selects the connector at self.my_vims
        :param task_index: index of the task to process at ro_task["tasks"]
        :param task_depends: not used here
        :return: ("BUILD", ro_vim_item_update) when ok; ("FAILED", ro_vim_item_update) with vim_status
            VIM_ERROR when a VimConnException or a NsWorkerException is raised
        """
        vim_net_id = None
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_id = ro_task["target_id"]
        target_vim = self.my_vims[target_id]
        is_mgmt_net = False
        mgmt_net_set_at_vim = False

        try:
            find_params = task.get("find_params")

            if find_params:
                # FIND: decide the filter to query the VIM with
                if find_params.get("filter_dict"):
                    vim_filter = find_params["filter_dict"]
                elif find_params.get("mgmt"):
                    # management network: the id/name configured at the VIM account goes first
                    is_mgmt_net = True
                    db_vim = self.db_vims[target_id]

                    if deep_get(db_vim, "config", "management_network_id"):
                        mgmt_net_set_at_vim = True
                        vim_filter = {"id": db_vim["config"]["management_network_id"]}
                    elif deep_get(db_vim, "config", "management_network_name"):
                        mgmt_net_set_at_vim = True
                        vim_filter = {
                            "name": db_vim["config"]["management_network_name"]
                        }
                    else:
                        vim_filter = {"name": find_params["name"]}
                else:
                    raise NsWorkerExceptionNotFound(
                        "Invalid find_params for new_net {}".format(find_params)
                    )

                vim_nets = target_vim.get_network_list(vim_filter)

                if not vim_nets and not task.get("params"):
                    if not is_mgmt_net or mgmt_net_set_at_vim:
                        raise NsWorkerExceptionNotFound(
                            "Network not found with this criteria: '{}'".format(
                                find_params
                            )
                        )

                    # A mgmt-network of the descriptor that is mapped to a VIM network neither
                    # in the descriptor nor in the VIM configuration is created here
                    net_name = vim_filter.get("name") or vim_filter.get("id")[:16]
                    vim_net_id, created_items = target_vim.new_network(net_name, None)
                    self.logger.debug(
                        "Created mgmt network vim_net_id: {}".format(vim_net_id)
                    )
                    created = True
                elif len(vim_nets) > 1:
                    raise NsWorkerException(
                        "More than one network found with this criteria: '{}'".format(
                            find_params
                        )
                    )

                if vim_nets:
                    vim_net_id = vim_nets[0]["id"]
            else:
                # CREATE
                vim_net_id, created_items = target_vim.new_network(**task["params"])
                created = True

            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, target_id, vim_net_id, created
                )
            )

            return "BUILD", {
                "vim_id": vim_net_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, target_id, e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

    def refresh(self, ro_task):
        """
        Ask the VIM for the network status and compute what changed against ro_task["vim_info"]
        :param ro_task: ro_task content
        :return: (task_status, ro_vim_item_update); task_status is DONE for ACTIVE, BUILD for BUILD and
            FAILED for anything else, including a VimConnException; the update contains only changed fields
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        stored_info = ro_task["vim_info"]
        vim_id = stored_info["vim_id"]

        try:
            vim_info = target_vim.refresh_nets_status([vim_id])[vim_id]

            if vim_info["status"] in ("ACTIVE", "BUILD"):
                task_status = "DONE" if vim_info["status"] == "ACTIVE" else "BUILD"
            else:
                task_status = "FAILED"
        except vimconn.VimConnException as e:
            # The VIM cannot be reached: report VIM_ERROR
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        if stored_info["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if stored_info["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if stored_info["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        elif stored_info["vim_details"] != vim_info["vim_info"]:
            ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            new_status = ro_vim_item_update.get("vim_status")
            # the message is not logged when the new status is ACTIVE
            shown_message = (
                "" if new_status == "ACTIVE" else ro_vim_item_update.get("vim_message")
            )
            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, new_status, shown_message
                )
            )

        return task_status, ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the network at the VIM when there is a vim_id or created_items stored
        :param ro_task: ro_task content
        :param task_index: index of the task to process at ro_task["tasks"]
        :return: ("DONE", ro_vim_item_update) when deleted or not found at VIM;
            ("FAILED", ro_vim_item_update) on any other VimConnException
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        net_vim_id = ro_task["vim_info"]["vim_id"]
        deleted_update = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if net_vim_id or ro_task["vim_info"]["created_items"]:
                self.my_vims[ro_task["target_id"]].delete_network(
                    net_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            deleted_update["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id,
                ro_task["target_id"],
                net_vim_id,
                deleted_update.get("vim_message", ""),
            )
        )

        return "DONE", deleted_update
337
338
class VimInteractionVdu(VimInteractionBase):
    """Create, delete and refresh VMs and inject ssh keys using the VIM connector of the ro_task target"""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """
        Create a VM at the VIM. References of type "TASK-..." in net_list, image_id, flavor_id and
        affinity_group_list are replaced by the values provided at task_depends
        :param ro_task: ro_task content; "target_id" selects the connector at self.my_vims
        :param task_index: index of the task to process at ro_task["tasks"]
        :param task_depends: dictionary that maps "TASK-..." references to the resolved identifiers
        :return: ("BUILD", ro_vim_item_update) when ok; ("FAILED", ro_vim_item_update) with vim_status
            VIM_ERROR when a VimConnException or a NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            created = True
            params = task["params"]
            # work over a copy, so that the stored task params keep the TASK- references
            params_copy = deepcopy(params)
            net_list = params_copy["net_list"]

            for net in net_list:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            affinity_group_list = params_copy["affinity_group_list"]
            for affinity_group in affinity_group_list:
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        # complete message, in line with the one used for networks above
                        raise NsWorkerException(
                            "Cannot create VM because depends on an affinity group not created or "
                            "found for {}".format(affinity_group["affinity_group_id"])
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id

            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            # "vim_id" of each net is read after the call to new_vminstance
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            # add to created items previous_created_volumes (healing)
            if task.get("previous_created_volumes"):
                for k, v in task["previous_created_volumes"].items():
                    created_items[k] = v

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
                "interfaces_backup": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the VM at the VIM when there is a vim_id or created_items stored
        :param ro_task: ro_task content
        :param task_index: index of the task to process at ro_task["tasks"]
        :return: ("DONE", ro_vim_item_update) when deleted or not found at VIM;
            ("FAILED", ro_vim_item_update) on any other VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            self.logger.debug(
                "delete_vminstance: vm_vim_id={} created_items={}".format(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
            )
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id,
                    ro_task["vim_info"]["created_items"],
                    ro_task["vim_info"].get("volumes_to_hold", []),
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def refresh(self, ro_task):
        """
        Call VIM to get vm status and compute what changed against ro_task["vim_info"]
        :param ro_task: ro_task content
        :return: (None, None) when there is no vim_id stored; otherwise (task_status, ro_vim_item_update),
            where task_status is DONE for ACTIVE, BUILD for BUILD and FAILED for anything else,
            including a VimConnException
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information; a failure here is logged but it is not fatal
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception as vim_info_error:
                self.logger.exception(
                    f"{vim_info_error} occured while getting the vim_info from yaml"
                )
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            # keep the order of interfaces_vim_ids; None is stored for an id not reported by the VIM
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                vim_interfaces.append(iface)

        # first CREATE task that is not FINISHED; StopIteration is raised if there is none
        task_create = next(
            t
            for t in ro_task["tasks"]
            if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
        )
        if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
            vim_interfaces[task_create["mgmt_vnf_interface"]][
                "mgmt_vnf_interface"
            ] = True

        # the vdu management interface defaults to the vnf management one, or to the first interface
        mgmt_vdu_iface = task_create.get(
            "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
        )
        if vim_interfaces:
            vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True

        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """
        Inject a ssh key into the VM. The private key of the task params is decrypted with self.db.decrypt
        :param ro_task: ro_task content
        :param task_index: index of the task to process at ro_task["tasks"]
        :param task_depends: not used here
        :return: ("DONE", None, db_task_update) when ok. On VimConnException or NsWorkerException:
            ("BUILD", None, {"retries", "next_retry"}) while the number of retries is below
            max_retries_inject_ssh_key, ("FAILED", ro_vim_item_update, db_task_update) afterwards
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params = task["params"]
            params_copy = deepcopy(params)
            # private_key, schema_version and salt are consumed here and not passed to the connector
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                None,
                db_task_update,
            )  # params_copy["key"]
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            self.logger.debug(traceback.format_exc())
            if retries < self.max_retries_inject_ssh_key:
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_message": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update
635
636
class VimInteractionImage(VimInteractionBase):
    """Look for images at the VIM. Images are only found, never created, by this class"""

    def new(self, ro_task, task_index, task_depends):
        """
        Find the image at the VIM that matches the task "find_params"
        :param ro_task: ro_task content; "target_id" selects the connector at self.my_vims
        :param task_index: index of the task to process at ro_task["tasks"]
        :param task_depends: not used here
        :return: ("DONE", ro_vim_item_update) when ok, with vim_id None if the task has no find_params;
            ("FAILED", ro_vim_item_update) with vim_status VIM_ERROR when none or several images match
            or a VimConnException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # FIND
            # bound beforehand, so that a task without find_params does not end in UnboundLocalError
            vim_image_id = None

            if task.get("find_params"):
                vim_images = target_vim.get_image_list(**task["find_params"])

                if not vim_images:
                    raise NsWorkerExceptionNotFound(
                        "Image not found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )
                elif len(vim_images) > 1:
                    raise NsWorkerException(
                        "More than one image found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )
                else:
                    vim_image_id = vim_images[0]["id"]

            ro_vim_item_update = {
                "vim_id": vim_image_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, ro_task["target_id"], vim_image_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (NsWorkerException, vimconn.VimConnException) as e:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
691
692
class VimInteractionFlavor(VimInteractionBase):
    """Find/create and delete flavors using the VIM connector of the ro_task target"""

    def delete(self, ro_task, task_index):
        """
        Delete the flavor at the VIM when there is a vim_id stored
        :param ro_task: ro_task content
        :param task_index: index of the task to process at ro_task["tasks"]
        :return: ("DONE", ro_vim_item_update) when deleted or not found at VIM;
            ("FAILED", ro_vim_item_update) on any other VimConnException
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        flavor_vim_id = ro_task["vim_info"]["vim_id"]
        deleted_update = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if flavor_vim_id:
                self.my_vims[ro_task["target_id"]].delete_flavor(flavor_vim_id)
        except vimconn.VimConnNotFoundException:
            deleted_update["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id,
                ro_task["target_id"],
                flavor_vim_id,
                deleted_update.get("vim_message", ""),
            )
        )

        return "DONE", deleted_update

    def new(self, ro_task, task_index, task_depends):
        """
        Look for a flavor matching the task "find_params"; if none is obtained and the task
        has "params", create the flavor
        :param ro_task: ro_task content; "target_id" selects the connector at self.my_vims
        :param task_index: index of the task to process at ro_task["tasks"]
        :param task_depends: not used here
        :return: ("DONE", ro_vim_item_update) when ok; ("FAILED", ro_vim_item_update) with vim_status
            VIM_ERROR when a VimConnException or a NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_id = ro_task["target_id"]
        target_vim = self.my_vims[target_id]

        try:
            vim_flavor_id = None

            # FIND
            if task.get("find_params"):
                try:
                    vim_flavor_id = target_vim.get_flavor_id_from_data(
                        task["find_params"]["flavor_data"]
                    )
                except vimconn.VimConnNotFoundException:
                    # not finding the flavor is not an error, it may be created below
                    self.logger.warning("VimConnNotFoundException occured.")

            # CREATE
            if not vim_flavor_id and task.get("params"):
                vim_flavor_id = target_vim.new_flavor(task["params"]["flavor_data"])
                created = True

            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, target_id, vim_flavor_id, created
                )
            )

            return "DONE", {
                "vim_id": vim_flavor_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(task_id, target_id, e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }
785
786
class VimInteractionAffinityGroup(VimInteractionBase):
    """Find/create and delete affinity or anti-affinity groups using the VIM connector of the target"""

    def delete(self, ro_task, task_index):
        """
        Delete the affinity group at the VIM when there is a vim_id stored
        :param ro_task: ro_task content
        :param task_index: index of the task to process at ro_task["tasks"]
        :return: ("DONE", ro_vim_item_update) when deleted or not found at VIM;
            ("FAILED", ro_vim_item_update) on any other VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if affinity_group_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_affinity_group(affinity_group_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id,
                ro_task["target_id"],
                affinity_group_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """
        Use the affinity group whose id is provided at "vim-affinity-group-id" if it exists at the VIM;
        otherwise create a new one from the task "affinity_group_data"
        :param ro_task: ro_task content; "target_id" selects the connector at self.my_vims
        :param task_index: index of the task to process at ro_task["tasks"]
        :param task_depends: not used here
        :return: ("DONE", ro_vim_item_update) when ok; ("FAILED", ro_vim_item_update) with vim_status
            VIM_ERROR when a VimConnException or a NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            affinity_group_vim_id = None
            affinity_group_data = None

            if task.get("params"):
                affinity_group_data = task["params"].get("affinity_group_data")

            if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
                param_affinity_group_id = affinity_group_data.get(
                    "vim-affinity-group-id"
                )

                try:
                    affinity_group_vim_id = target_vim.get_affinity_group(
                        param_affinity_group_id
                    ).get("id")
                except vimconn.VimConnNotFoundException:
                    # a space separates the id from the rest of the sentence; without it
                    # the two string literals are glued together in the log
                    self.logger.error(
                        "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {} "
                        "could not be found at VIM. Creating a new one.".format(
                            task_id, ro_task["target_id"], param_affinity_group_id
                        )
                    )

            if not affinity_group_vim_id and affinity_group_data:
                affinity_group_vim_id = target_vim.new_affinity_group(
                    affinity_group_data
                )
                created = True

            ro_vim_item_update = {
                "vim_id": affinity_group_vim_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], affinity_group_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
892
893
class VimInteractionUpdateVdu(VimInteractionBase):
    """Run an action over an existing VM using the VIM connector of the ro_task target"""

    def exec(self, ro_task, task_index, task_depends):
        """
        Call action_vminstance at the VIM for the VM and action provided at the task "params"
        :param ro_task: ro_task content; "target_id" selects the connector at self.my_vims
        :param task_index: index of the task to process at ro_task["tasks"]
        :param task_depends: not used here
        :return: ("DONE", ro_vim_item_update, db_task_update) when ok;
            ("FAILED", ro_vim_item_update, db_task_update) with vim_status VIM_ERROR when a
            VimConnException or a NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        created_items = {}
        target_id = ro_task["target_id"]
        target_vim = self.my_vims[target_id]

        try:
            params = task.get("params")

            if params:
                vim_vm_id = params.get("vim_vm_id")
                action = params.get("action")
                # the action name is used both as key and as value of the dictionary sent to the VIM
                target_vim.action_vminstance(vim_vm_id, {action: action})

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug("task={} {} vm-migration done".format(task_id, target_id))

            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:" " {}".format(task_id, target_id, e)
            )

            return (
                "FAILED",
                {
                    "vim_status": "VIM_ERROR",
                    "created": created,
                    "vim_message": str(e),
                },
                db_task_update,
            )
934
935
936 class VimInteractionSdnNet(VimInteractionBase):
937 @staticmethod
938 def _match_pci(port_pci, mapping):
939 """
940 Check if port_pci matches with mapping
941 mapping can have brackets to indicate that several chars are accepted. e.g
942 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
943 :param port_pci: text
944 :param mapping: text, can contain brackets to indicate several chars are available
945 :return: True if matches, False otherwise
946 """
947 if not port_pci or not mapping:
948 return False
949 if port_pci == mapping:
950 return True
951
952 mapping_index = 0
953 pci_index = 0
954 while True:
955 bracket_start = mapping.find("[", mapping_index)
956
957 if bracket_start == -1:
958 break
959
960 bracket_end = mapping.find("]", bracket_start)
961 if bracket_end == -1:
962 break
963
964 length = bracket_start - mapping_index
965 if (
966 length
967 and port_pci[pci_index : pci_index + length]
968 != mapping[mapping_index:bracket_start]
969 ):
970 return False
971
972 if (
973 port_pci[pci_index + length]
974 not in mapping[bracket_start + 1 : bracket_end]
975 ):
976 return False
977
978 pci_index += length + 1
979 mapping_index = bracket_end + 1
980
981 if port_pci[pci_index:] != mapping[mapping_index:]:
982 return False
983
984 return True
985
986 def _get_interfaces(self, vlds_to_connect, vim_account_id):
987 """
988 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
989 :param vim_account_id:
990 :return:
991 """
992 interfaces = []
993
994 for vld in vlds_to_connect:
995 table, _, db_id = vld.partition(":")
996 db_id, _, vld = db_id.partition(":")
997 _, _, vld_id = vld.partition(".")
998
999 if table == "vnfrs":
1000 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
1001 iface_key = "vnf-vld-id"
1002 else: # table == "nsrs"
1003 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
1004 iface_key = "ns-vld-id"
1005
1006 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
1007
1008 for db_vnfr in db_vnfrs:
1009 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
1010 for iface_index, interface in enumerate(vdur["interfaces"]):
1011 if interface.get(iface_key) == vld_id and interface.get(
1012 "type"
1013 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
1014 # only SR-IOV o PT
1015 interface_ = interface.copy()
1016 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
1017 db_vnfr["_id"], vdu_index, iface_index
1018 )
1019
1020 if vdur.get("status") == "ERROR":
1021 interface_["status"] = "ERROR"
1022
1023 interfaces.append(interface_)
1024
1025 return interfaces
1026
1027 def refresh(self, ro_task):
1028 # look for task create
1029 task_create_index, _ = next(
1030 i_t
1031 for i_t in enumerate(ro_task["tasks"])
1032 if i_t[1]
1033 and i_t[1]["action"] == "CREATE"
1034 and i_t[1]["status"] != "FINISHED"
1035 )
1036
1037 return self.new(ro_task, task_create_index, None)
1038
def new(self, ro_task, task_index, task_depends):
    """
    Create, update or poll the SDN connectivity service of an sdn_net ro_task.

    The ports to connect are the interfaces returned by self._get_interfaces plus the
    optional external ports given in params["sdn-ports"]. Depending on the stored state
    it calls create_connectivity_service, edit_connectivity_service or
    get_connectivity_service_status on the target connector.

    :param ro_task: ro_task dictionary; "vim_info", "target_id" and "tasks" are read
    :param task_index: index of the task to execute inside ro_task["tasks"]
    :param task_depends: not used by this method
    :return: tuple (status, ro_vim_item_update). On any exception it returns
        ("FAILED", {...}) with vim_status "VIM_ERROR" instead of raising
    """
    task = ro_task["tasks"][task_index]
    task_id = task["task_id"]
    target_vim = self.my_vims[ro_task["target_id"]]

    sdn_net_id = ro_task["vim_info"]["vim_id"]

    # state stored by previous executions of this ro_task
    created_items = ro_task["vim_info"].get("created_items")
    connected_ports = ro_task["vim_info"].get("connected_ports", [])
    new_connected_ports = []
    last_update = ro_task["vim_info"].get("last_update", 0)
    sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
    error_list = []
    created = ro_task["vim_info"].get("created", False)

    try:
        # CREATE
        params = task["params"]
        vlds_to_connect = params["vlds"]
        associated_vim = params["target_vim"]
        # external additional ports
        additional_ports = params.get("sdn-ports") or ()
        _, _, vim_account_id = associated_vim.partition(":")

        # NOTE(review): db_vim is only bound when associated_vim is not empty, but the
        # port loop below reads db_vim["config"] unconditionally
        if associated_vim:
            # get associated VIM
            if associated_vim not in self.db_vims:
                self.db_vims[associated_vim] = self.db.get_one(
                    "vim_accounts", {"_id": vim_account_id}
                )

            db_vim = self.db_vims[associated_vim]

        # look for ports to connect
        ports = self._get_interfaces(vlds_to_connect, vim_account_id)
        # print(ports)

        sdn_ports = []
        pending_ports = error_ports = 0
        vlan_used = None
        # never set to True at present, see the TODO inside the loop
        sdn_need_update = False

        for port in ports:
            # remember the last vlan seen, it is the default for the external ports
            vlan_used = port.get("vlan") or vlan_used

            # TODO. Do not connect if already done
            if not port.get("compute_node") or not port.get("pci"):
                # location not known yet: count it as failed or as still pending
                if port.get("status") == "ERROR":
                    error_ports += 1
                else:
                    pending_ports += 1
                continue

            pmap = None
            # first entry of "sdn-port-mapping" for the compute node of this port
            compute_node_mappings = next(
                (
                    c
                    for c in db_vim["config"].get("sdn-port-mapping", ())
                    if c and c["compute_node"] == port["compute_node"]
                ),
                None,
            )

            if compute_node_mappings:
                # process port_mapping pci of type 0000:af:1[01].[1357]
                pmap = next(
                    (
                        p
                        for p in compute_node_mappings["ports"]
                        if self._match_pci(port["pci"], p.get("pci"))
                    ),
                    None,
                )

            if not pmap:
                # without mapping the port is an error unless config "mapping_not_needed" is set
                if not db_vim["config"].get("mapping_not_needed"):
                    error_list.append(
                        "Port mapping not found for compute_node={} pci={}".format(
                            port["compute_node"], port["pci"]
                        )
                    )
                    continue

                pmap = {}

            # values of the mapping take precedence over the ones derived from the port
            service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
            new_port = {
                "service_endpoint_id": pmap.get("service_endpoint_id")
                or service_endpoint_id,
                "service_endpoint_encapsulation_type": "dot1q"
                if port["type"] == "SR-IOV"
                else None,
                "service_endpoint_encapsulation_info": {
                    "vlan": port.get("vlan"),
                    "mac": port.get("mac-address"),
                    "device_id": pmap.get("device_id") or port["compute_node"],
                    "device_interface_id": pmap.get("device_interface_id")
                    or port["pci"],
                    "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
                    "switch_port": pmap.get("switch_port"),
                    "service_mapping_info": pmap.get("service_mapping_info"),
                },
            }

            # TODO
            # if port["modified_at"] > last_update:
            #     sdn_need_update = True
            new_connected_ports.append(port["id"])  # TODO
            sdn_ports.append(new_port)

        if error_ports:
            error_list.append(
                "{} interfaces have not been created as VDU is on ERROR status".format(
                    error_ports
                )
            )

        # connect external ports
        for index, additional_port in enumerate(additional_ports):
            additional_port_id = additional_port.get(
                "service_endpoint_id"
            ) or "external-{}".format(index)
            sdn_ports.append(
                {
                    "service_endpoint_id": additional_port_id,
                    "service_endpoint_encapsulation_type": additional_port.get(
                        "service_endpoint_encapsulation_type", "dot1q"
                    ),
                    "service_endpoint_encapsulation_info": {
                        "vlan": additional_port.get("vlan") or vlan_used,
                        "mac": additional_port.get("mac_address"),
                        "device_id": additional_port.get("device_id"),
                        "device_interface_id": additional_port.get(
                            "device_interface_id"
                        ),
                        "switch_dpid": additional_port.get("switch_dpid")
                        or additional_port.get("switch_id"),
                        "switch_port": additional_port.get("switch_port"),
                        "service_mapping_info": additional_port.get(
                            "service_mapping_info"
                        ),
                    },
                }
            )
            new_connected_ports.append(additional_port_id)
        sdn_info = ""

        # if there are more ports to connect or they have been modified, call create/update
        if error_list:
            sdn_status = "ERROR"
            sdn_info = "; ".join(error_list)
        elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
            last_update = time.time()

            if not sdn_net_id:
                if len(sdn_ports) < 2:
                    # nothing is created at the connector with less than 2 ports
                    sdn_status = "ACTIVE"

                    if not pending_ports:
                        self.logger.debug(
                            "task={} {} new-sdn-net done, less than 2 ports".format(
                                task_id, ro_task["target_id"]
                            )
                        )
                else:
                    net_type = params.get("type") or "ELAN"
                    (
                        sdn_net_id,
                        created_items,
                    ) = target_vim.create_connectivity_service(net_type, sdn_ports)
                    created = True
                    self.logger.debug(
                        "task={} {} new-sdn-net={} created={}".format(
                            task_id, ro_task["target_id"], sdn_net_id, created
                        )
                    )
            else:
                # the service already exists: update its connection points
                created_items = target_vim.edit_connectivity_service(
                    sdn_net_id, conn_info=created_items, connection_points=sdn_ports
                )
                created = True
                self.logger.debug(
                    "task={} {} update-sdn-net={} created={}".format(
                        task_id, ro_task["target_id"], sdn_net_id, created
                    )
                )

            connected_ports = new_connected_ports
        elif sdn_net_id:
            # same ports as before: only poll the status of the existing service
            wim_status_dict = target_vim.get_connectivity_service_status(
                sdn_net_id, conn_info=created_items
            )
            sdn_status = wim_status_dict["sdn_status"]

            if wim_status_dict.get("sdn_info"):
                sdn_info = str(wim_status_dict.get("sdn_info")) or ""

            # error_msg, when present, replaces sdn_info
            if wim_status_dict.get("error_msg"):
                sdn_info = wim_status_dict.get("error_msg") or ""

        if pending_ports:
            if sdn_status != "ERROR":
                sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
                    len(ports) - pending_ports, len(ports)
                )

            # it cannot be reported as ACTIVE while some ports are still pending
            if sdn_status == "ACTIVE":
                sdn_status = "BUILD"

        ro_vim_item_update = {
            "vim_id": sdn_net_id,
            "vim_status": sdn_status,
            "created": created,
            "created_items": created_items,
            "connected_ports": connected_ports,
            "vim_details": sdn_info,
            "vim_message": None,
            "last_update": last_update,
        }

        return sdn_status, ro_vim_item_update
    except Exception as e:
        # traceback is logged only for exceptions that are not connector exceptions
        self.logger.error(
            "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
            exc_info=not isinstance(
                e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
            ),
        )
        ro_vim_item_update = {
            "vim_status": "VIM_ERROR",
            "created": created,
            "vim_message": str(e),
        }

        return "FAILED", ro_vim_item_update
1275
def delete(self, ro_task, task_index):
    """
    Delete the SDN connectivity service referenced by ro_task["vim_info"]["vim_id"].

    :param ro_task: ro_task dictionary
    :param task_index: index of the delete task inside ro_task["tasks"]
    :return: ("DONE", update) when deleted, already deleted (NOT_FOUND) or when there
        is nothing to delete; ("FAILED", update) on any other error
    """
    task = ro_task["tasks"][task_index]
    task_id = task["task_id"]
    vim_info = ro_task["vim_info"]
    sdn_vim_id = vim_info.get("vim_id")
    ro_vim_item_update_ok = {
        "vim_status": "DELETED",
        "created": False,
        "vim_message": "DELETED",
        "vim_id": None,
    }

    try:
        if sdn_vim_id:
            connector = self.my_vims[ro_task["target_id"]]
            connector.delete_connectivity_service(
                sdn_vim_id, vim_info.get("created_items")
            )
    except Exception as e:
        already_gone = (
            isinstance(e, sdnconn.SdnConnectorError)
            and e.http_code == HTTPStatus.NOT_FOUND.value
        )

        if not already_gone:
            # traceback only for exceptions that do not come from the connector
            self.logger.error(
                "ro_task={} vim={} del-sdn-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
                ),
                exc_info=not isinstance(
                    e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                ),
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        # a NOT_FOUND answer is considered a successful deletion
        ro_vim_item_update_ok["vim_message"] = "already deleted"

    self.logger.debug(
        "task={} {} del-sdn-net={} {}".format(
            task_id,
            ro_task["target_id"],
            sdn_vim_id,
            ro_vim_item_update_ok.get("vim_message", ""),
        )
    )

    return "DONE", ro_vim_item_update_ok
1326
1327
class VimInteractionMigration(VimInteractionBase):
    """Executes the "migrate" tasks: migrates a VM instance at the target VIM."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Migrate the VM given in the task params and collect its refreshed information.

        :param ro_task: ro_task dictionary; "tasks" and "target_id" are read
        :param task_index: index of the task to execute inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (status, ro_vim_item_update, db_task_update); status is "DONE",
            or "FAILED" when the connector raises VimConnException or the task is not
            valid (NsWorkerException)
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_interfaces = []
        created = False
        created_items = {}
        refreshed_vim_info = {}

        try:
            params = task.get("params")

            if not params:
                # Without params there is no VM to migrate. Fail explicitly instead of
                # reaching the result below with vim_vm_id unbound
                raise NsWorkerException(
                    "Missing params at migrate task, 'vim_vm_id' is needed"
                )

            vim_vm_id = params.get("vim_vm_id")
            migrate_host = params.get("migrate_host")
            _, migrated_compute_node = target_vim.migrate_instance(
                vim_vm_id, migrate_host
            )

            if migrated_compute_node:
                # When VM is migrated, vdu["vim_info"] needs to be updated
                vdu_old_vim_info = params["vdu_vim_info"].get(ro_task["target_id"])

                # Refresh VM to get new vim_info
                vim_dict = target_vim.refresh_vms_status([vim_vm_id])
                refreshed_vim_info = vim_dict[vim_vm_id]

                if refreshed_vim_info.get("interfaces"):
                    # the old vim_info may be missing for this target or have no interfaces
                    old_interfaces = (vdu_old_vim_info or {}).get("interfaces") or ()

                    for old_iface in old_interfaces:
                        # same interface after the migration, None if it is not found
                        iface = next(
                            (
                                iface
                                for iface in refreshed_vim_info["interfaces"]
                                if old_iface["vim_interface_id"]
                                == iface["vim_interface_id"]
                            ),
                            None,
                        )
                        vim_interfaces.append(iface)

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            if vim_interfaces:
                ro_vim_item_update["interfaces"] = vim_interfaces

            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update

        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
1407
1408
class VimInteractionResize(VimInteractionBase):
    """Executes the "verticalscale" tasks: resizes a VM instance to another flavor."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Resize the VM given in the task params to the flavor described by "flavor_dict".

        The flavor is looked up at the VIM and created when the lookup fails.

        :param ro_task: ro_task dictionary; "tasks" and "target_id" are read
        :param task_index: index of the task to execute inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (status, ro_vim_item_update, db_task_update); status is "DONE",
            or "FAILED" when the connector raises VimConnException, the task has no
            params or no flavor can be obtained (NsWorkerException)
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        target_flavor_uuid = None
        created_items = {}
        refreshed_vim_info = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            params = task.get("params")

            if not params:
                # Without params there is no VM to resize. Fail explicitly instead of
                # reaching the result below with vim_vm_id unbound
                raise NsWorkerException(
                    "Missing params at resize task, 'vim_vm_id' is needed"
                )

            vim_vm_id = params.get("vim_vm_id")
            flavor_dict = params.get("flavor_dict")
            self.logger.info("flavor_dict %s", flavor_dict)

            try:
                target_flavor_uuid = target_vim.get_flavor_id_from_data(flavor_dict)
            except Exception as find_error:
                self.logger.info("Cannot find any flavor matching %s.", str(find_error))
                try:
                    target_flavor_uuid = target_vim.new_flavor(flavor_dict)
                except Exception as create_error:
                    self.logger.error(
                        "Error creating flavor at VIM %s.", str(create_error)
                    )

            if target_flavor_uuid is None:
                # Nothing can be resized without a flavor: report the failure instead of
                # finishing the task as "DONE" with the VM untouched
                raise NsWorkerException(
                    "Cannot find or create a flavor to resize vm '{}'".format(vim_vm_id)
                )

            resized_status = target_vim.resize_instance(vim_vm_id, target_flavor_uuid)

            if resized_status:
                # Refresh VM to get new vim_info
                vim_dict = target_vim.refresh_vms_status([vim_vm_id])
                refreshed_vim_info = vim_dict[vim_vm_id]

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            self.logger.debug(
                "task={} {} resize done".format(task_id, ro_task["target_id"])
            )
            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} Resize:" " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
1476
1477
class ConfigValidate:
    """Read-only view over the "period" section of the RO configuration."""

    def __init__(self, config: Dict):
        # whole configuration dictionary; only conf["period"] is read by the properties
        self.conf = config

    def _period_value(self, key):
        """Return the value stored at conf["period"][key]."""
        return self.conf["period"][key]

    @property
    def active(self):
        # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
        configured = self._period_value("refresh_active")

        if configured >= 60 or configured == -1:
            return configured

        return 60

    @property
    def build(self):
        return self._period_value("refresh_build")

    @property
    def image(self):
        return self._period_value("refresh_image")

    @property
    def error(self):
        return self._period_value("refresh_error")

    @property
    def queue_size(self):
        return self._period_value("queue_size")
1508
1509
1510 class NsWorker(threading.Thread):
def __init__(self, worker_index, config, plugins, db):
    """
    :param worker_index: thread index
    :param config: general configuration of RO, among others the process_id with the docker id where it runs
    :param plugins: global shared dict with the loaded plugins
    :param db: database class instance to use
    """
    threading.Thread.__init__(self)
    self.config = config
    self.plugins = plugins
    self.plugin_name = "unknown"
    self.logger = logging.getLogger("ro.worker{}".format(worker_index))
    self.worker_index = worker_index
    # refresh periods for created items
    self.refresh_config = ConfigValidate(config)
    self.task_queue = queue.Queue(self.refresh_config.queue_size)
    # targetvim: vimplugin class
    self.my_vims = {}
    # targetvim: vim information from database
    self.db_vims = {}
    # targetvim list
    self.vim_targets = []
    self.my_id = config["process_id"] + ":" + str(worker_index)
    self.db = db

    # task item -> class that executes it; every class receives the same shared state
    interaction_classes = (
        ("net", VimInteractionNet),
        ("vdu", VimInteractionVdu),
        ("image", VimInteractionImage),
        ("flavor", VimInteractionFlavor),
        ("sdn_net", VimInteractionSdnNet),
        ("update", VimInteractionUpdateVdu),
        ("affinity-or-anti-affinity-group", VimInteractionAffinityGroup),
        ("migrate", VimInteractionMigration),
        ("verticalscale", VimInteractionResize),
    )
    self.item2class = {
        item: interaction_class(self.db, self.my_vims, self.db_vims, self.logger)
        for item, interaction_class in interaction_classes
    }

    self.time_last_task_processed = None
    # lists of tasks to delete because nsrs or vnfrs has been deleted from db
    self.tasks_to_delete = []
    # it is idle when there are not vim_targets associated
    self.idle = True
    self.task_locked_time = config["global"]["task_locked_time"]
1566
def insert_task(self, task):
    """
    Put a task at the worker queue without blocking.

    :param task: item to enqueue
    :return: None
    :raises NsWorkerException: when the queue is full
    """
    try:
        self.task_queue.put_nowait(task)
    except queue.Full:
        raise NsWorkerException("timeout inserting a task")

    return None
1573
def terminate(self):
    """Enqueue the "exit" item at the task queue of this worker."""
    exit_item = "exit"
    self.insert_task(exit_item)
1576
def del_task(self, task):
    """
    Mark a task as superseded if it has not started yet.

    :param task: task dictionary; its "status" is read and may be modified
    :return: True if the task was "SCHEDULED" and is now "SUPERSEDED", False otherwise
    """
    # NOTE(review): self.task_lock is not assigned at the __init__ of this class;
    # it has to be set before this method is used
    with self.task_lock:
        if task["status"] == "SCHEDULED":
            task["status"] = "SUPERSEDED"
            return True

        # task["status"] == "processing"
        # The lock is released by the "with" statement. An explicit release() here makes
        # the exit of the "with" block release it a second time (RuntimeError on a Lock)
        return False
1585
1586 def _process_vim_config(self, target_id, db_vim):
1587 """
1588 Process vim config, creating vim configuration files as ca_cert
1589 :param target_id: vim/sdn/wim + id
1590 :param db_vim: Vim dictionary obtained from database
1591 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1592 """
1593 if not db_vim.get("config"):
1594 return
1595
1596 file_name = ""
1597
1598 try:
1599 if db_vim["config"].get("ca_cert_content"):
1600 file_name = "{}:{}".format(target_id, self.worker_index)
1601
1602 try:
1603 mkdir(file_name)
1604 except FileExistsError:
1605 self.logger.exception(
1606 "FileExistsError occured while processing vim_config."
1607 )
1608
1609 file_name = file_name + "/ca_cert"
1610
1611 with open(file_name, "w") as f:
1612 f.write(db_vim["config"]["ca_cert_content"])
1613 del db_vim["config"]["ca_cert_content"]
1614 db_vim["config"]["ca_cert"] = file_name
1615 except Exception as e:
1616 raise NsWorkerException(
1617 "Error writing to file '{}': {}".format(file_name, e)
1618 )
1619
1620 def _load_plugin(self, name, type="vim"):
1621 # type can be vim or sdn
1622 if "rovim_dummy" not in self.plugins:
1623 self.plugins["rovim_dummy"] = VimDummyConnector
1624
1625 if "rosdn_dummy" not in self.plugins:
1626 self.plugins["rosdn_dummy"] = SdnDummyConnector
1627
1628 if name in self.plugins:
1629 return self.plugins[name]
1630
1631 try:
1632 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1633 self.plugins[name] = ep.load()
1634 except Exception as e:
1635 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1636
1637 if name and name not in self.plugins:
1638 raise NsWorkerException(
1639 "Plugin 'osm_{n}' has not been installed".format(n=name)
1640 )
1641
1642 return self.plugins[name]
1643
1644 def _unload_vim(self, target_id):
1645 """
1646 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1647 :param target_id: Contains type:_id; where type can be 'vim', ...
1648 :return: None.
1649 """
1650 try:
1651 self.db_vims.pop(target_id, None)
1652 self.my_vims.pop(target_id, None)
1653
1654 if target_id in self.vim_targets:
1655 self.vim_targets.remove(target_id)
1656
1657 self.logger.info("Unloaded {}".format(target_id))
1658 rmtree("{}:{}".format(target_id, self.worker_index))
1659 except FileNotFoundError:
1660 # This is raised by rmtree if folder does not exist.
1661 self.logger.exception("FileNotFoundError occured while unloading VIM.")
1662 except Exception as e:
1663 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1664
def _check_vim(self, target_id):
    """
    Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
    Only the first operation in "PROCESSING" state that is not locked by other worker is handled.
    The result is written at database in the "finally" block; if the target was not loaded before
    the call, it is unloaded again at the end.
    :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
    :return: None.
    """
    target, _, _id = target_id.partition(":")
    now = time.time()
    update_dict = {}
    unset_dict = {}
    op_text = ""
    step = ""
    # remember if it was already loaded, to leave it in the same state at the end
    loaded = target_id in self.vim_targets
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts"
        if target == "wim"
        else "sdns"
    )

    try:
        step = "Getting {} from db".format(target_id)
        db_vim = self.db.get_one(target_database, {"_id": _id})

        for op_index, operation in enumerate(
            db_vim["_admin"].get("operations", ())
        ):
            if operation["operationState"] != "PROCESSING":
                continue

            locked_at = operation.get("locked_at")

            if locked_at is not None and locked_at >= now - self.task_locked_time:
                # some other thread is doing this operation
                return

            # lock
            op_text = "_admin.operations.{}.".format(op_index)

            # the filter includes the locked_at value read above, so the update matches
            # nothing if the operation has been modified in the meanwhile
            # NOTE(review): "admin.current_operation" is written here while the key unset
            # below is "current_operation"; confirm which key is the intended one
            if not self.db.set_one(
                target_database,
                q_filter={
                    "_id": _id,
                    op_text + "operationState": "PROCESSING",
                    op_text + "locked_at": locked_at,
                },
                update_dict={
                    op_text + "locked_at": now,
                    "admin.current_operation": op_index,
                },
                fail_on_empty=False,
            ):
                return

            unset_dict[op_text + "locked_at"] = None
            unset_dict["current_operation"] = None
            step = "Loading " + target_id
            error_text = self._load_vim(target_id)

            if not error_text:
                step = "Checking connectivity"

                if target == "vim":
                    self.my_vims[target_id].check_vim_connectivity()
                else:
                    self.my_vims[target_id].check_credentials()

            # success values; the "finally" block overwrites them when error_text is set
            update_dict["_admin.operationalState"] = "ENABLED"
            update_dict["_admin.detailed-status"] = ""
            unset_dict[op_text + "detailed-status"] = None
            update_dict[op_text + "operationState"] = "COMPLETED"

            return

    except Exception as e:
        error_text = "{}: {}".format(step, e)
        self.logger.error("{} for {}: {}".format(step, target_id, e))

    finally:
        # both dictionaries are empty unless the operation was locked above, and in
        # that case error_text has already been assigned
        if update_dict or unset_dict:
            if error_text:
                update_dict[op_text + "operationState"] = "FAILED"
                update_dict[op_text + "detailed-status"] = error_text
                unset_dict.pop(op_text + "detailed-status", None)
                update_dict["_admin.operationalState"] = "ERROR"
                update_dict["_admin.detailed-status"] = error_text

            if op_text:
                update_dict[op_text + "statusEnteredTime"] = now

            self.db.set_one(
                target_database,
                q_filter={"_id": _id},
                update_dict=update_dict,
                unset=unset_dict,
                fail_on_empty=False,
            )

        if not loaded:
            self._unload_vim(target_id)
1766
1767 def _reload_vim(self, target_id):
1768 if target_id in self.vim_targets:
1769 self._load_vim(target_id)
1770 else:
1771 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1772 # just remove it to force load again next time it is needed
1773 self.db_vims.pop(target_id, None)
1774
def _load_vim(self, target_id):
    """
    Load or reload a vim_account, sdn_controller or wim_account.
    Read content from database, load the plugin if not loaded.
    In case of error loading the plugin, it load a failing VIM_connector
    It fills self db_vims dictionary, my_vims dictionary and vim_targets list
    :param target_id: Contains type:_id; where type can be 'vim', ...
    :return: None if ok, descriptive text if error
    """
    target, _, _id = target_id.partition(":")
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts"
        if target == "wim"
        else "sdns"
    )
    plugin_name = ""
    vim = None

    try:
        # 'step' describes the current action, it is used at the error messages
        step = "Getting {}={} from db".format(target, _id)
        # TODO process for wim, sdnc, ...
        vim = self.db.get_one(target_database, {"_id": _id})

        # if deep_get(vim, "config", "sdn-controller"):
        #     step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
        #     db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})

        step = "Decrypting password"
        schema_version = vim.get("schema_version")
        self.db.encrypt_decrypt_fields(
            vim,
            "decrypt",
            fields=("password", "secret"),
            schema_version=schema_version,
            salt=_id,
        )
        self._process_vim_config(target_id, vim)

        if target == "vim":
            plugin_name = "rovim_" + vim["vim_type"]
            step = "Loading plugin '{}'".format(plugin_name)
            vim_module_conn = self._load_plugin(plugin_name)
            step = "Loading {}'".format(target_id)
            self.my_vims[target_id] = vim_module_conn(
                uuid=vim["_id"],
                name=vim["name"],
                tenant_id=vim.get("vim_tenant_id"),
                tenant_name=vim.get("vim_tenant_name"),
                url=vim["vim_url"],
                url_admin=None,
                user=vim["vim_user"],
                passwd=vim["vim_password"],
                config=vim.get("config") or {},
                persistent_info={},
            )
        else:  # sdn
            plugin_name = "rosdn_" + vim["type"]
            step = "Loading plugin '{}'".format(plugin_name)
            vim_module_conn = self._load_plugin(plugin_name, "sdn")
            step = "Loading {}'".format(target_id)
            # work over a copy, the record stored at self.db_vims is not modified
            wim = deepcopy(vim)
            wim_config = wim.pop("config", {}) or {}
            wim["uuid"] = wim["_id"]
            wim["wim_url"] = wim["url"]

            if wim.get("dpid"):
                wim_config["dpid"] = wim.pop("dpid")

            if wim.get("switch_id"):
                wim_config["switch_id"] = wim.pop("switch_id")

            # wim, wim_account, config
            self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
        self.db_vims[target_id] = vim
        self.error_status = None

        self.logger.info(
            "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
        )
    except Exception as e:
        self.logger.error(
            "Cannot load {} plugin={}: {} {}".format(
                target_id, plugin_name, step, e
            )
        )

        self.db_vims[target_id] = vim or {}
        # The failing connector is stored as the connector of the target (my_vims).
        # Storing it at db_vims overwrote the database record set in the previous line
        # and left my_vims without an entry for this target
        self.my_vims[target_id] = FailingConnector(str(e))
        error_status = "{} Error: {}".format(step, e)

        return error_status
    finally:
        # the target is registered in any case, also when the load has failed
        if target_id not in self.vim_targets:
            self.vim_targets.append(target_id)
1871
1872 def _get_db_task(self):
1873 """
1874 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1875 :return: None
1876 """
1877 now = time.time()
1878
1879 if not self.time_last_task_processed:
1880 self.time_last_task_processed = now
1881
1882 try:
1883 while True:
1884 """
1885 # Log RO tasks only when loglevel is DEBUG
1886 if self.logger.getEffectiveLevel() == logging.DEBUG:
1887 self._log_ro_task(
1888 None,
1889 None,
1890 None,
1891 "TASK_WF",
1892 "task_locked_time="
1893 + str(self.task_locked_time)
1894 + " "
1895 + "time_last_task_processed="
1896 + str(self.time_last_task_processed)
1897 + " "
1898 + "now="
1899 + str(now),
1900 )
1901 """
1902 locked = self.db.set_one(
1903 "ro_tasks",
1904 q_filter={
1905 "target_id": self.vim_targets,
1906 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1907 "locked_at.lt": now - self.task_locked_time,
1908 "to_check_at.lt": self.time_last_task_processed,
1909 "to_check_at.gt": -1,
1910 },
1911 update_dict={"locked_by": self.my_id, "locked_at": now},
1912 fail_on_empty=False,
1913 )
1914
1915 if locked:
1916 # read and return
1917 ro_task = self.db.get_one(
1918 "ro_tasks",
1919 q_filter={
1920 "target_id": self.vim_targets,
1921 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1922 "locked_at": now,
1923 },
1924 )
1925 return ro_task
1926
1927 if self.time_last_task_processed == now:
1928 self.time_last_task_processed = None
1929 return None
1930 else:
1931 self.time_last_task_processed = now
1932 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1933
1934 except DbException as e:
1935 self.logger.error("Database exception at _get_db_task: {}".format(e))
1936 except Exception as e:
1937 self.logger.critical(
1938 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1939 )
1940
1941 return None
1942
1943 def _get_db_all_tasks(self):
1944 """
1945 Read all content of table ro_tasks to log it
1946 :return: None
1947 """
1948 try:
1949 # Checking the content of the BD:
1950
1951 # read and return
1952 ro_task = self.db.get_list("ro_tasks")
1953 for rt in ro_task:
1954 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1955 return ro_task
1956
1957 except DbException as e:
1958 self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1959 except Exception as e:
1960 self.logger.critical(
1961 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1962 )
1963
1964 return None
1965
1966 def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1967 """
1968 Generate a log with the following format:
1969
1970 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1971 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1972 task_array_index;task_id;task_action;task_item;task_args
1973
1974 Example:
1975
1976 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1977 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1978 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1979 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1980 'vim_details': None, 'vim_message': None, 'refresh_at': None};1;SCHEDULED;
1981 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1982 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1983 """
1984 try:
1985 line = []
1986 i = 0
1987 if ro_task is not None and isinstance(ro_task, dict):
1988 for t in ro_task["tasks"]:
1989 line.clear()
1990 line.append(mark)
1991 line.append(event)
1992 line.append(ro_task.get("_id", ""))
1993 line.append(str(ro_task.get("locked_at", "")))
1994 line.append(str(ro_task.get("modified_at", "")))
1995 line.append(str(ro_task.get("created_at", "")))
1996 line.append(str(ro_task.get("to_check_at", "")))
1997 line.append(str(ro_task.get("locked_by", "")))
1998 line.append(str(ro_task.get("target_id", "")))
1999 line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
2000 line.append(str(ro_task.get("vim_info", "")))
2001 line.append(str(ro_task.get("tasks", "")))
2002 if isinstance(t, dict):
2003 line.append(str(t.get("status", "")))
2004 line.append(str(t.get("action_id", "")))
2005 line.append(str(i))
2006 line.append(str(t.get("task_id", "")))
2007 line.append(str(t.get("action", "")))
2008 line.append(str(t.get("item", "")))
2009 line.append(str(t.get("find_params", "")))
2010 line.append(str(t.get("params", "")))
2011 else:
2012 line.extend([""] * 2)
2013 line.append(str(i))
2014 line.extend([""] * 5)
2015
2016 i += 1
2017 self.logger.debug(";".join(line))
2018 elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
2019 i = 0
2020 while True:
2021 st = "tasks.{}.status".format(i)
2022 if st not in db_ro_task_update:
2023 break
2024 line.clear()
2025 line.append(mark)
2026 line.append(event)
2027 line.append(db_ro_task_update.get("_id", ""))
2028 line.append(str(db_ro_task_update.get("locked_at", "")))
2029 line.append(str(db_ro_task_update.get("modified_at", "")))
2030 line.append("")
2031 line.append(str(db_ro_task_update.get("to_check_at", "")))
2032 line.append(str(db_ro_task_update.get("locked_by", "")))
2033 line.append("")
2034 line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
2035 line.append("")
2036 line.append(str(db_ro_task_update.get("vim_info", "")))
2037 line.append(str(str(db_ro_task_update).count(".status")))
2038 line.append(db_ro_task_update.get(st, ""))
2039 line.append("")
2040 line.append(str(i))
2041 line.extend([""] * 3)
2042 i += 1
2043 self.logger.debug(";".join(line))
2044
2045 elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
2046 line.clear()
2047 line.append(mark)
2048 line.append(event)
2049 line.append(db_ro_task_delete.get("_id", ""))
2050 line.append("")
2051 line.append(db_ro_task_delete.get("modified_at", ""))
2052 line.extend([""] * 13)
2053 self.logger.debug(";".join(line))
2054
2055 else:
2056 line.clear()
2057 line.append(mark)
2058 line.append(event)
2059 line.extend([""] * 16)
2060 self.logger.debug(";".join(line))
2061
2062 except Exception as e:
2063 self.logger.error("Error logging ro_task: {}".format(e))
2064
def _delete_task(self, ro_task, task_index, task_depends, db_update):
    """
    Decide whether the DELETE task must really be executed or is superseded

    Sibling CREATE tasks pointing to the same target_record are marked as FINISHED, both in the
    in-memory ro_task and in db_update. When another CREATE task is still alive (neither FINISHED
    nor SUPERSEDED) the element is kept and this task is SUPERSEDED.
    :param ro_task: ro_task content, including "tasks" and "vim_info"
    :param task_index: index of the DELETE task inside ro_task["tasks"]
    :param task_depends: not used by this method
    :param db_update: dictionary filled with the "tasks.<index>.status" entries to store
    :return: tuple (status, vim_info_update): (None, None) if the task is FAILED;
        ("SUPERSEDED", None) if nothing must be removed; whatever the item handler delete() returns
        otherwise; ("FAILED", {vim_status, vim_message}) if any exception is raised
    """
    own_task = ro_task["tasks"][task_index]
    own_task_id = own_task["task_id"]
    vim_info = ro_task["vim_info"]
    must_delete = vim_info["created"] or vim_info.get("created_items", False)

    self.logger.debug("Needed delete: {}".format(must_delete))
    if own_task["status"] == "FAILED":
        return None, None  # TODO need to be retry??

    try:
        for position, other in enumerate(ro_task["tasks"]):
            # skip the own task and the deleted ones
            if position == task_index or not other:
                continue

            same_target = own_task["target_record"] == other["target_record"]
            if other["action"] != "CREATE":
                continue

            if same_target:
                # the creation that this task undoes is set to finished
                db_update["tasks.{}.status".format(position)] = "FINISHED"
                other["status"] = "FINISHED"
            elif other["status"] not in ("FINISHED", "SUPERSEDED"):
                # somebody else still needs the element
                must_delete = False

        if not must_delete:
            return "SUPERSEDED", None

        self.logger.debug(
            "Deleting ro_task={} task_index={}".format(ro_task, task_index)
        )
        return self.item2class[own_task["item"]].delete(ro_task, task_index)
    except NsWorkerException as exc:
        # expected error, already reported by the one raising it
        return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(exc)}
    except Exception as exc:
        self.logger.critical(
            "Unexpected exception at _delete_task task={}: {}".format(
                own_task_id, exc
            ),
            exc_info=True,
        )
        return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(exc)}
2116
def _create_task(self, ro_task, task_index, task_depends, db_update):
    """
    Decide whether the CREATE task needs to create something at VIM and, if so, do it

    Only tasks in SCHEDULED status are handled. When another CREATE task of the same ro_task is
    already in progress or done, its status is reused instead of creating the element again.
    :param ro_task: ro_task content, including "tasks"
    :param task_index: index of the CREATE task inside ro_task["tasks"]
    :param task_depends: dependencies handed over to the item handler new()
    :param db_update: not used by this method
    :return: tuple (status, update): (None, None) if the task is not SCHEDULED;
        (<other task status>, "COPY_VIM_INFO") if another CREATE task already handled the element;
        the result of the item handler new() otherwise, or ("FAILED", {vim_status, vim_message})
        if it raises
    """
    own_task = ro_task["tasks"][task_index]
    own_task_id = own_task["task_id"]

    # FAILED tasks are not retried; any other non SCHEDULED status has nothing to create
    if own_task["status"] != "SCHEDULED":
        return None, None

    # check if already created by another task
    for position, other in enumerate(ro_task["tasks"]):
        if position == task_index or not other:
            continue  # own task or deleted one

        if other["action"] != "CREATE":
            continue

        if other["status"] not in ("SCHEDULED", "FINISHED", "SUPERSEDED"):
            return other["status"], "COPY_VIM_INFO"

    try:
        handler = self.item2class[own_task["item"]]
        outcome, vim_item_update = handler.new(ro_task, task_index, task_depends)
        # TODO update other CREATE tasks
    except Exception as exc:
        if not isinstance(exc, NsWorkerException):
            self.logger.error(
                "Error executing task={}: {}".format(own_task_id, exc), exc_info=True
            )

        outcome = "FAILED"
        vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(exc)}
        # TODO update ro_vim_item_update

    return outcome, vim_item_update
2159
def _get_dependency(self, task_id, ro_task=None, target_id=None):
    """
    Look for dependency task
    :param task_id: Can be one of
        1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
        2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
        3. task.task_id: "<action_id>:number"
    :param ro_task: optional ro_task looked up first (only for format 3) before querying the database
    :param target_id: target used for the database query of formats 1 and 2. It is overwritten by
        the one contained in task_id when format 1 is used
    :return: database ro_task plus index of task
    :raises NsWorkerException: if no task matches task_id
    """
    if task_id.startswith(("vim:", "sdn:", "wim:")):
        # format 1: split "<target_id> <target_record_id>"
        target_id, _, task_id = task_id.partition(" ")

    if task_id.startswith(("nsrs:", "vnfrs:")):
        ro_task_dependency = self.db.get_one(
            "ro_tasks",
            q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
            fail_on_empty=False,
        )

        if ro_task_dependency:
            for task_index, task in enumerate(ro_task_dependency["tasks"]):
                # deleted tasks are stored as None, they must be skipped
                if task and task["target_record_id"] == task_id:
                    return ro_task_dependency, task_index

    else:
        if ro_task:
            for task_index, task in enumerate(ro_task["tasks"]):
                if task and task["task_id"] == task_id:
                    return ro_task, task_index

        ro_task_dependency = self.db.get_one(
            "ro_tasks",
            q_filter={
                "tasks.ANYINDEX.task_id": task_id,
                "tasks.ANYINDEX.target_record.ne": None,
            },
            fail_on_empty=False,
        )

        self.logger.debug("ro_task_dependency={}".format(ro_task_dependency))
        if ro_task_dependency:
            for task_index, task in enumerate(ro_task_dependency["tasks"]):
                # deleted tasks are stored as None, they must be skipped
                if task and task["task_id"] == task_id:
                    return ro_task_dependency, task_index

    raise NsWorkerException("Cannot get depending task {}".format(task_id))
2211
def update_vm_refresh(self):
    """Re-enable the VM status updates when self.refresh_config.active is not -1

    In that case every ro_task having a task in DONE status and a negative "to_check_at" gets
    new "vim_info.refresh_at" and "to_check_at" values at database. Any exception is logged
    and not propagated.
    """
    try:
        self.logger.debug("Checking if VM status update config")
        started_at = time.time()
        active_period = self.refresh_config.active
        refresh_at = -1 if active_period == -1 else started_at + active_period

        if refresh_at == -1:
            # refreshing is disabled, nothing to update
            return

        # never wait more than one day for the next check
        one_day_later = time.time() + (24 * 60 * 60)
        changes = {
            "vim_info.refresh_at": refresh_at,
            "to_check_at": min(one_day_later, refresh_at),
        }

        self.logger.debug(
            "Finding tasks which to be updated to enable VM status updates"
        )
        disabled_ro_tasks = self.db.get_list(
            "ro_tasks",
            q_filter={
                "tasks.status": "DONE",
                "to_check_at.lt": 0,
            },
        )
        self.logger.debug("Updating tasks to change the to_check_at status")
        for disabled_ro_task in disabled_ro_tasks:
            self.db.set_one(
                "ro_tasks",
                q_filter={"_id": disabled_ro_task["_id"]},
                update_dict=changes,
                fail_on_empty=True,
            )

    except Exception as exc:
        self.logger.error(f"Error updating tasks to enable VM status updates: {exc}")
2257
def _process_pending_tasks(self, ro_task):
    """
    Process the pending tasks of one ro_task and store the result at database

    Tasks are visited by action in the order DELETE, CREATE, EXEC. For every task not skipped:
    dependencies of SCHEDULED tasks are resolved; the action is executed by means of _delete_task
    or the item handler (exec/new/refresh); the obtained vim information is copied to the target
    record with _update_target. Finally the accumulated changes, the new "to_check_at" and the
    unlocking are written into the "ro_tasks" table.
    :param ro_task: ro_task content obtained from database. Its "vim_info" is modified in place
    :return: None. Exceptions are logged and not propagated
    """
    ro_task_id = ro_task["_id"]
    now = time.time()
    # one day
    next_check_at = now + (24 * 60 * 60)
    db_ro_task_update = {}

    def _update_refresh(new_status):
        # compute next_refresh for the task being processed ("task" of the enclosing loop)
        # next_check_at is the only enclosing variable that is re-bound here
        nonlocal next_check_at

        next_refresh = time.time()

        if task["item"] in ("image", "flavor"):
            next_refresh += self.refresh_config.image
        elif new_status == "BUILD":
            next_refresh += self.refresh_config.build
        elif new_status == "DONE":
            if self.refresh_config.active == -1:
                next_refresh = -1
            else:
                next_refresh += self.refresh_config.active
        else:
            next_refresh += self.refresh_config.error

        next_check_at = min(next_check_at, next_refresh)
        db_ro_task_update["vim_info.refresh_at"] = next_refresh
        ro_task["vim_info"]["refresh_at"] = next_refresh

    try:
        # Disabled: log RO tasks only when loglevel is DEBUG
        # if self.logger.getEffectiveLevel() == logging.DEBUG:
        #     self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")

        # Check if vim status refresh is enabled again
        self.update_vm_refresh()
        # 0: get task_status_create
        lock_object = None
        task_status_create = None
        task_create = next(
            (
                t
                for t in ro_task["tasks"]
                if t
                and t["action"] == "CREATE"
                and t["status"] in ("BUILD", "DONE")
            ),
            None,
        )

        if task_create:
            task_status_create = task_create["status"]

        # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
        for task_action in ("DELETE", "CREATE", "EXEC"):
            # NOTE both variables are reset per action, not per task: a value obtained for one
            # task is still visible for the following tasks of the same action
            db_vim_update = None
            new_status = None

            for task_index, task in enumerate(ro_task["tasks"]):
                if not task:
                    continue  # task deleted

                task_depends = {}
                target_update = None

                if (
                    (
                        task_action in ("DELETE", "EXEC")
                        and task["status"] not in ("SCHEDULED", "BUILD")
                    )
                    or task["action"] != task_action
                    or (
                        task_action == "CREATE"
                        and task["status"] in ("FINISHED", "SUPERSEDED")
                    )
                ):
                    continue

                task_path = "tasks.{}.status".format(task_index)
                try:
                    db_vim_info_update = None

                    if task["status"] == "SCHEDULED":
                        # check if tasks that this depends on have been completed
                        dependency_not_completed = False

                        for dependency_task_id in task.get("depends_on") or ():
                            (
                                dependency_ro_task,
                                dependency_task_index,
                            ) = self._get_dependency(
                                dependency_task_id, target_id=ro_task["target_id"]
                            )
                            dependency_task = dependency_ro_task["tasks"][
                                dependency_task_index
                            ]
                            self.logger.debug(
                                "dependency_ro_task={} dependency_task_index={}".format(
                                    dependency_ro_task, dependency_task_index
                                )
                            )

                            if dependency_task["status"] == "SCHEDULED":
                                dependency_not_completed = True
                                next_check_at = min(
                                    next_check_at, dependency_ro_task["to_check_at"]
                                )
                                # must allow dependent task to be processed first
                                # to do this set time after last_task_processed
                                next_check_at = max(
                                    self.time_last_task_processed, next_check_at
                                )
                                break
                            elif dependency_task["status"] == "FAILED":
                                error_text = "Cannot {} {} because depends on failed {} {} (id={}): {}".format(
                                    task["action"],
                                    task["item"],
                                    dependency_task["action"],
                                    dependency_task["item"],
                                    dependency_task_id,
                                    dependency_ro_task["vim_info"].get(
                                        "vim_message"
                                    ),
                                )
                                self.logger.error(
                                    "task={} {}".format(task["task_id"], error_text)
                                )
                                raise NsWorkerException(error_text)

                            # the dependency is available under both its id and "TASK-<id>"
                            task_depends[dependency_task_id] = dependency_ro_task[
                                "vim_info"
                            ]["vim_id"]
                            task_depends[
                                "TASK-{}".format(dependency_task_id)
                            ] = dependency_ro_task["vim_info"]["vim_id"]

                        if dependency_not_completed:
                            self.logger.warning(
                                "DEPENDENCY NOT COMPLETED {}".format(
                                    dependency_ro_task["vim_info"]["vim_id"]
                                )
                            )
                            # TODO set at vim_info.vim_details that it is waiting
                            continue

                    # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
                    # the task of renew this locking. It will update database locket_at periodically
                    if not lock_object:
                        lock_object = LockRenew.add_lock_object(
                            "ro_tasks", ro_task, self
                        )

                    if task["action"] == "DELETE":
                        (new_status, db_vim_info_update,) = self._delete_task(
                            ro_task, task_index, task_depends, db_ro_task_update
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if new_status in ("FINISHED", "SUPERSEDED"):
                            target_update = "DELETE"
                    elif task["action"] == "EXEC":
                        (
                            new_status,
                            db_vim_info_update,
                            db_task_update,
                        ) = self.item2class[task["item"]].exec(
                            ro_task, task_index, task_depends
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if db_task_update:
                            # load into database the modified db_task_update "retries" and "next_retry"
                            if db_task_update.get("retries"):
                                db_ro_task_update[
                                    "tasks.{}.retries".format(task_index)
                                ] = db_task_update["retries"]

                            next_check_at = time.time() + db_task_update.get(
                                "next_retry", 60
                            )
                        target_update = None
                    elif task["action"] == "CREATE":
                        if task["status"] == "SCHEDULED":
                            if task_status_create:
                                # element already handled by other CREATE task, copy its info
                                new_status = task_status_create
                                target_update = "COPY_VIM_INFO"
                            else:
                                new_status, db_vim_info_update = self.item2class[
                                    task["item"]
                                ].new(ro_task, task_index, task_depends)
                                # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
                                _update_refresh(new_status)
                        else:
                            refresh_at = ro_task["vim_info"]["refresh_at"]
                            if refresh_at and refresh_at != -1 and now > refresh_at:
                                (new_status, db_vim_info_update,) = self.item2class[
                                    task["item"]
                                ].refresh(ro_task)
                                _update_refresh(new_status)
                            else:
                                # The refresh is updated to avoid set the value of "refresh_at" to
                                # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
                                # because it can happen that in this case the task is never processed
                                _update_refresh(task["status"])

                except Exception as e:
                    new_status = "FAILED"
                    db_vim_info_update = {
                        "vim_status": "VIM_ERROR",
                        "vim_message": str(e),
                    }

                    if not isinstance(
                        e, (NsWorkerException, vimconn.VimConnException)
                    ):
                        # this handler covers every action, not only the deletion
                        self.logger.error(
                            "Unexpected exception at _process_pending_tasks task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

                try:
                    if db_vim_info_update:
                        db_vim_update = db_vim_info_update.copy()
                        db_ro_task_update.update(
                            {
                                "vim_info." + k: v
                                for k, v in db_vim_info_update.items()
                            }
                        )
                        ro_task["vim_info"].update(db_vim_info_update)

                    if new_status:
                        if task_action == "CREATE":
                            task_status_create = new_status
                        db_ro_task_update[task_path] = new_status

                    if target_update or db_vim_update:
                        if target_update == "DELETE":
                            self._update_target(task, None)
                        elif target_update == "COPY_VIM_INFO":
                            self._update_target(task, ro_task["vim_info"])
                        else:
                            self._update_target(task, db_vim_update)

                except Exception as e:
                    if (
                        isinstance(e, DbException)
                        and e.http_code == HTTPStatus.NOT_FOUND
                    ):
                        # if the vnfrs or nsrs has been removed from database, this task must be removed
                        self.logger.debug(
                            "marking to delete task={}".format(task["task_id"])
                        )
                        self.tasks_to_delete.append(task)
                    else:
                        self.logger.error(
                            "Unexpected exception at _update_target task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

        locked_at = ro_task["locked_at"]

        if lock_object:
            locked_at = [
                lock_object["locked_at"],
                lock_object["locked_at"] + self.task_locked_time,
            ]
            # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
            # contain exactly locked_at + self.task_locked_time
            LockRenew.remove_lock_object(lock_object)

        q_filter = {
            "_id": ro_task["_id"],
            "to_check_at": ro_task["to_check_at"],
            "locked_at": locked_at,
        }
        # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
        # outside this task (by ro_nbi) do not update it
        db_ro_task_update["locked_by"] = None
        # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
        db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
        db_ro_task_update["modified_at"] = now
        db_ro_task_update["to_check_at"] = next_check_at

        # Disabled: log RO tasks only when loglevel is DEBUG
        # if self.logger.getEffectiveLevel() == logging.DEBUG:
        #     db_ro_task_update_log = db_ro_task_update.copy()
        #     db_ro_task_update_log["_id"] = q_filter["_id"]
        #     self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")

        if not self.db.set_one(
            "ro_tasks",
            update_dict=db_ro_task_update,
            q_filter=q_filter,
            fail_on_empty=False,
        ):
            # to_check_at was changed meanwhile: store the rest without touching it
            del db_ro_task_update["to_check_at"]
            del q_filter["to_check_at"]
            # Disabled: log RO tasks only when loglevel is DEBUG
            # if self.logger.getEffectiveLevel() == logging.DEBUG:
            #     self._log_ro_task(
            #         None,
            #         db_ro_task_update_log,
            #         None,
            #         "TASK_WF",
            #         "SET_TASK " + str(q_filter),
            #     )
            self.db.set_one(
                "ro_tasks",
                q_filter=q_filter,
                update_dict=db_ro_task_update,
                fail_on_empty=True,
            )
    except DbException as e:
        self.logger.error(
            "ro_task={} Error updating database {}".format(ro_task_id, e)
        )
    except Exception as e:
        self.logger.error(
            "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
        )
2597
def _update_target(self, task, ro_vim_item_update):
    """
    Copy the vim information of a task to its target record at database

    :param task: task content. Its "target_record" has the form "<table>:<_id>:<path_vim_status>"
    :param ro_vim_item_update: dictionary with the vim information to store. If empty or None
        the item status is set to "DELETED" and the vim information is unset
    :return: None
    """
    table, _, remainder = task["target_record"].partition(":")
    record_id, _, vim_path = remainder.partition(":")
    # vim_path: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
    # item_path: dot separated list targeting record information, e.g. "vdur.10"
    # (vim_path without its last two components)
    item_path = vim_path[: vim_path.rfind(".")]
    item_path = item_path[: item_path.rfind(".")]

    if not ro_vim_item_update:
        # nothing at VIM: mark the item as deleted and remove its vim information
        self.db.set_one(
            table,
            q_filter={"_id": record_id},
            update_dict={item_path + ".status": "DELETED"},
            unset={vim_path: None},
        )
        return

    stored_keys = (
        "vim_id",
        "vim_details",
        "vim_message",
        "vim_name",
        "vim_status",
        "interfaces",
        "interfaces_backup",
    )
    changes = {}
    for key, value in ro_vim_item_update.items():
        if key in stored_keys:
            changes[vim_path + "." + key] = value

    if vim_path.startswith("vdur."):
        # for backward compatibility vdur.name and vdur.vim-id are added apart from
        # vdur.vim_name and vdur.vim_id; vdur.status holds the general status
        for source_key, item_suffix in (
            ("vim_name", ".name"),
            ("vim_id", ".vim-id"),
            ("vim_status", ".status"),
        ):
            if ro_vim_item_update.get(source_key):
                changes[item_path + item_suffix] = ro_vim_item_update[source_key]

    interfaces = ro_vim_item_update.get("interfaces")
    if interfaces:
        for position, iface in enumerate(interfaces):
            if not iface:
                continue

            iface_prefix = item_path + ".interfaces" + ".{}.".format(position)
            for key, value in iface.items():
                if key in ("vlan", "compute_node", "pci"):
                    changes[iface_prefix + key] = value

            # ip_address and mac_address are stored as ip-address and mac-address
            ip_address = iface.get("ip_address")
            if ip_address:
                changes[iface_prefix + "ip-address"] = ip_address

            mac_address = iface.get("mac_address")
            if mac_address:
                changes[iface_prefix + "mac-address"] = mac_address

            # management addresses: only the first one of a ";" separated list
            if iface.get("mgmt_vnf_interface") and ip_address:
                changes["ip-address"] = ip_address.split(";")[0]

            if iface.get("mgmt_vdu_interface") and ip_address:
                changes[item_path + ".ip-address"] = ip_address.split(";")[0]

    self.db.set_one(table, q_filter={"_id": record_id}, update_dict=changes)

    # If interfaces exists, it backups VDU interfaces in the DB for healing operations
    if interfaces:
        stored_interfaces = changes.get(vim_path + ".interfaces")
        if stored_interfaces:
            self.db.set_one(
                table,
                q_filter={"_id": record_id},
                update_dict={vim_path + ".interfaces_backup": stored_interfaces},
            )
2695
def _process_delete_db_tasks(self):
    """
    Remove from database the tasks queued at self.tasks_to_delete, because their vnfrs or nsrs
    or both have been deleted

    For a task targeting a vnfrs, only the tasks of that vnfr are removed when the nsrs is still
    present at database; otherwise all the tasks of the nsr are removed.
    :return: None. Uses and modify self.tasks_to_delete. Deletion errors are logged
    """
    while self.tasks_to_delete:
        first_task = self.tasks_to_delete[0]
        ns_id = first_task["nsr_id"]
        record = first_task["target_record"]

        # limit the deletion to the vnfr only when the nsrs is still present
        only_vnfr_id = None
        if record.startswith("vnfrs:") and self.db.get_one(
            "nsrs", {"_id": ns_id}, fail_on_empty=False
        ):
            only_vnfr_id = first_task["target_record"].split(":")[1]

        try:
            self.delete_db_tasks(self.db, ns_id, only_vnfr_id)
        except Exception as exc:
            self.logger.error(
                "Error deleting task={}: {}".format(first_task["task_id"], exc)
            )

        # the entry is discarded whatever the result is
        del self.tasks_to_delete[0]
2718
@staticmethod
def delete_db_tasks(db, nsr_id, vnfrs_deleted):
    """
    Remove from the "ro_tasks" table the tasks of a nsr, or only the ones of one of its vnfrs

    A ro_task is deleted when none of its tasks is left; otherwise only the affected tasks are
    set to None. Both operations are filtered by "modified_at" in order to detect changes done
    by others meanwhile; in such a case the whole procedure is repeated.
    Static method because it is called from osm_ng_ro.ns
    :param db: instance of database to use
    :param nsr_id: affected nsrs id
    :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
    :return: None
    :raises NsWorkerException: if a conflict remains after all the retries
    """
    retries = 5
    for _ in range(retries):
        candidates = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
        now = time.time()
        conflict = False

        for candidate in candidates:
            emptied_tasks = {}
            still_in_use = False

            for position, task in enumerate(candidate["tasks"]):
                if not task:
                    continue  # already deleted

                if vnfrs_deleted:
                    affected = task["target_record"].startswith(
                        "vnfrs:" + vnfrs_deleted
                    )
                else:
                    affected = task["nsr_id"] == nsr_id

                if affected:
                    emptied_tasks["tasks.{}".format(position)] = None
                else:
                    # used by other nsr, ro_task cannot be deleted
                    still_in_use = True

            # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
            unchanged_filter = {
                "_id": candidate["_id"],
                "modified_at": candidate["modified_at"],
            }
            if not still_in_use:
                done = db.del_one(
                    "ro_tasks", q_filter=unchanged_filter, fail_on_empty=False
                )
                if not done:
                    conflict = True
            elif emptied_tasks:
                emptied_tasks["modified_at"] = now
                done = db.set_one(
                    "ro_tasks",
                    q_filter=unchanged_filter,
                    update_dict=emptied_tasks,
                    fail_on_empty=False,
                )
                if not done:
                    conflict = True

        if not conflict:
            return

    raise NsWorkerException("Exceeded {} retries".format(retries))
2777
def run(self):
    """
    Main loop of the thread

    Each iteration first attends one command of self.task_queue ("terminate", "load_vim",
    "unload_vim", "reload_vim", "check_vim") if any. When there is no command, it processes the
    tasks marked for deletion and one pending ro_task obtained from database. The loop ends
    with the "terminate" command.
    :return: None
    """
    self.logger.info("Starting")
    while True:
        # step 1: get commands from queue
        try:
            if self.vim_targets:
                # there are vims to attend, do not block waiting for commands
                order = self.task_queue.get(block=False)
            else:
                if not self.idle:
                    self.logger.debug("enters in idle state")
                self.idle = True
                order = self.task_queue.get(block=True)
                self.idle = False

            command = order[0]
            if command == "terminate":
                break

            if command == "load_vim":
                self.logger.info("order to load vim {}".format(order[1]))
                self._load_vim(order[1])
            elif command == "unload_vim":
                self.logger.info("order to unload vim {}".format(order[1]))
                self._unload_vim(order[1])
            elif command == "reload_vim":
                self._reload_vim(order[1])
            elif command == "check_vim":
                self.logger.info("order to check vim {}".format(order[1]))
                self._check_vim(order[1])
            # look for more commands before processing tasks
            continue
        except queue.Empty:
            # no command is waiting, go on with the pending tasks
            pass
        except Exception as exc:
            self.logger.critical(
                "Error processing task: {}".format(exc), exc_info=True
            )

        # step 2: process pending_tasks, delete not needed tasks
        try:
            if self.tasks_to_delete:
                self._process_delete_db_tasks()

            # Disabled: log RO tasks only when loglevel is DEBUG
            # if self.logger.getEffectiveLevel() == logging.DEBUG:
            #     _ = self._get_db_all_tasks()
            pending_ro_task = self._get_db_task()
            if pending_ro_task:
                self.logger.debug("Task to process: {}".format(pending_ro_task))
                time.sleep(1)
                self._process_pending_tasks(pending_ro_task)
            else:
                # nothing to do, wait before polling the database again
                time.sleep(5)
        except Exception as exc:
            self.logger.critical(
                "Unexpected exception at run: " + str(exc), exc_info=True
            )

    self.logger.info("Finishing")