Fix Bug 2158 and Bug 2254 by arranging exception handling
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """
21 This is the thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored in the database, in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import makedirs
31 from os import path
32 import queue
33 import threading
34 import time
35 import traceback
36 from typing import Dict
37 from unittest.mock import Mock
38
39 from importlib_metadata import entry_points
40 from osm_common.dbbase import DbException
41 from osm_ng_ro.vim_admin import LockRenew
42 from osm_ro_plugin import sdnconn
43 from osm_ro_plugin import vimconn
44 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
45 from osm_ro_plugin.vim_dummy import VimDummyConnector
46 import yaml
47
48 __author__ = "Alfonso Tierno"
49 __date__ = "$28-Sep-2017 12:07:15$"
50
51
def deep_get(target_dict, *args, **kwargs):
    """
    Walk a nested dictionary following a sequence of keys.
    Example target_dict={a: {b: 5}}; keys (a, b) return 5; keys (a, b, c) and (f, h) return the default
    :param target_dict: dictionary to be read
    :param args: keys to follow, outermost first
    :param kwargs: may contain default=value, returned when the path cannot be followed
    :return: the value found at the end of the path; otherwise kwargs["default"], or None if not given
    """
    current = target_dict
    for key in args:
        if isinstance(current, dict) and key in current:
            current = current[key]
        else:
            # path is broken: either a non-dict level or a missing key
            return kwargs.get("default")
    return current
66
67
class NsWorkerException(Exception):
    """Error raised by the task handlers of this module.

    The handlers catch it together with the VIM connector exceptions to report a task as FAILED.
    """
70
71
class FailingConnector:
    """Connector stand-in whose public methods always raise.

    Every public attribute name of vimconn.VimConnector is set to a Mock raising VimConnException(error_msg);
    then every public attribute name of sdnconn.SdnConnectorBase is set to a Mock raising
    SdnConnectorError(error_msg), replacing any entry with the same name set in the first pass.
    """

    def __init__(self, error_msg):
        self.error_msg = error_msg

        vim_public_names = [
            name for name in dir(vimconn.VimConnector) if name[0] != "_"
        ]
        for name in vim_public_names:
            vim_failure = Mock(side_effect=vimconn.VimConnException(error_msg))
            setattr(self, name, vim_failure)

        sdn_public_names = [
            name for name in dir(sdnconn.SdnConnectorBase) if name[0] != "_"
        ]
        for name in sdn_public_names:
            sdn_failure = Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
            setattr(self, name, sdn_failure)
87
88
class NsWorkerExceptionNotFound(NsWorkerException):
    """Raised when a lookup at the VIM cannot be done or finds nothing (e.g. network or image not found)."""
91
92
class VimInteractionBase:
    """Base class to call VIM/SDN for creating, deleting and refreshing networks, VMs, flavors, ...
    Its methods do not contact the VIM; they just return a successful result"""

    def __init__(self, db, my_vims, db_vims, logger):
        # Plain references are stored; nothing is copied or validated here
        self.db, self.logger = db, logger
        self.my_vims, self.db_vims = my_vims, db_vims

    def new(self, ro_task, task_index, task_depends):
        """Nothing is created. Returns status BUILD and an empty update"""
        return "BUILD", {}

    def refresh(self, ro_task):
        """Skip calling VIM to get the item status. FAILED if already at VIM_ERROR, DONE otherwise"""
        already_failed = ro_task["vim_info"]["vim_status"] == "VIM_ERROR"
        return ("FAILED" if already_failed else "DONE"), {}

    def delete(self, ro_task, task_index):
        """Skip calling VIM to delete the item. Assumes ok"""
        return "DONE", {}

    def exec(self, ro_task, task_index, task_depends):
        """Nothing is executed. Returns DONE, no vim_info update and no task update"""
        return "DONE", None, None
119
120
class VimInteractionNet(VimInteractionBase):
    """Find/create, refresh and delete networks at a VIM"""

    def _get_find_filter(self, ro_task, task):
        """
        Build the filter used to look for an existing network from task["find_params"]
        :param ro_task: ro_task document, "target_id" selects the entry of self.db_vims
        :param task: task with a non-empty "find_params"
        :return: tuple (vim_filter, mgmtnet, mgmtnet_defined_in_vim)
        :raises NsWorkerExceptionNotFound: find_params has neither "filter_dict" nor "mgmt"
        """
        find_params = task["find_params"]

        if find_params.get("filter_dict"):
            return find_params["filter_dict"], False, False

        if not find_params.get("mgmt"):
            raise NsWorkerExceptionNotFound(
                "Invalid find_params for new_net {}".format(find_params)
            )

        # management network: the VIM configuration, when present, takes precedence
        db_vim = self.db_vims[ro_task["target_id"]]
        if deep_get(db_vim, "config", "management_network_id"):
            return {"id": db_vim["config"]["management_network_id"]}, True, True

        if deep_get(db_vim, "config", "management_network_name"):
            return {"name": db_vim["config"]["management_network_name"]}, True, True

        return {"name": find_params["name"]}, True, False

    def new(self, ro_task, task_index, task_depends):
        """
        Look for an existing network (task "find_params") or create a new one (task "params")
        :param ro_task: ro_task document; "target_id" selects the VIM, "tasks" contains the task
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used here
        :return: ("BUILD", vim_info update) if ok; ("FAILED", vim_info update) on VimConnException
            or NsWorkerException
        """
        vim_net_id = None
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            if task.get("find_params"):
                # FIND
                vim_filter, mgmtnet, mgmtnet_defined_in_vim = self._get_find_filter(
                    ro_task, task
                )
                vim_nets = target_vim.get_network_list(vim_filter)

                if not vim_nets and not task.get("params"):
                    # If there is mgmt-network in the descriptor,
                    # there is no mapping of that network to a VIM network in the descriptor,
                    # also there is no mapping in the "--config" parameter or at VIM creation;
                    # that mgmt-network will be created.
                    if not mgmtnet or mgmtnet_defined_in_vim:
                        raise NsWorkerExceptionNotFound(
                            "Network not found with this criteria: '{}'".format(
                                task.get("find_params")
                            )
                        )

                    # The id fallback is truncated to 16 chars. Without name and id there is
                    # nothing to name the network after: fail the task instead of crashing
                    # with a TypeError when slicing None.
                    net_name = (
                        vim_filter.get("name") or (vim_filter.get("id") or "")[:16]
                    )
                    if not net_name:
                        raise NsWorkerExceptionNotFound(
                            "Cannot create management network without a name: '{}'".format(
                                task.get("find_params")
                            )
                        )

                    vim_net_id, created_items = target_vim.new_network(net_name, None)
                    self.logger.debug(
                        "Created mgmt network vim_net_id: {}".format(vim_net_id)
                    )
                    created = True
                elif len(vim_nets) > 1:
                    raise NsWorkerException(
                        "More than one network found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )

                if vim_nets:
                    vim_net_id = vim_nets[0]["id"]
            else:
                # CREATE
                params = task["params"]
                vim_net_id, created_items = target_vim.new_network(**params)
                created = True

            ro_vim_item_update = {
                "vim_id": vim_net_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, ro_task["target_id"], vim_net_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def refresh(self, ro_task):
        """
        Call VIM to get network status
        :param ro_task: ro_task document; its "vim_info" holds the last known values
        :return: (task_status, vim_info update) where task_status is DONE (ACTIVE), BUILD or FAILED
            and the update only contains the fields that have changed
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]
        net_to_refresh_list = [vim_id]

        try:
            vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        # store only the differences against the stored vim_info
        ro_vim_item_update = {}
        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the network at VIM. A network not found at VIM is considered already deleted
        :param ro_task: ro_task document; "vim_info" holds "vim_id" and "created_items"
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", vim_info update) if ok; ("FAILED", vim_info update) on VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        net_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if net_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_network(
                    net_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id,
                ro_task["target_id"],
                net_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
338
339
class VimInteractionVdu(VimInteractionBase):
    """Create, refresh and delete VMs at a VIM, and inject ssh keys into them"""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """
        Create a VM at VIM. References "TASK-xxx" at the parameters (networks, image, flavor,
        affinity groups) are replaced by the VIM ids obtained from task_depends
        :param ro_task: ro_task document; "target_id" selects the VIM, "tasks" contains the task
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: dictionary mapping "TASK-xxx" references to the VIM id of that task
        :return: ("BUILD", vim_info update) if ok; ("FAILED", vim_info update) on VimConnException
            or NsWorkerException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]
        try:
            created = True
            params = task["params"]
            # work over a copy, the task stored at ro_task keeps the "TASK-xxx" references
            params_copy = deepcopy(params)
            net_list = params_copy["net_list"]

            for net in net_list:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            affinity_group_list = params_copy["affinity_group_list"]
            for affinity_group in affinity_group_list:
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on an affinity group not created "
                            "or found for {}".format(
                                affinity_group["affinity_group_id"]
                            )
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id
            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            # add to created items previous_created_volumes (healing)
            if task.get("previous_created_volumes"):
                for k, v in task["previous_created_volumes"].items():
                    created_items[k] = v

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
                "interfaces_backup": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the VM at VIM. A VM not found at VIM is considered already deleted
        :param ro_task: ro_task document; "vim_info" holds "vim_id", "created_items" and optionally
            "volumes_to_hold"
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", vim_info update) if ok; ("FAILED", vim_info update) on VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            self.logger.debug(
                "delete_vminstance: vm_vim_id={} created_items={}".format(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
            )
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id,
                    ro_task["vim_info"]["created_items"],
                    ro_task["vim_info"].get("volumes_to_hold", []),
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def refresh(self, ro_task):
        """
        Call VIM to get vm status
        :param ro_task: ro_task document; its "vim_info" holds the last known values
        :return: (None, None) if there is no vim_id; otherwise (task_status, vim_info update) where
            task_status is DONE (ACTIVE), BUILD or FAILED and the update only contains changed fields
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information
            # best effort: any failure here is logged and the refresh continues
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception as vim_info_error:
                self.logger.exception(
                    f"{vim_info_error} occured while getting the vim_info from yaml"
                )
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                # NOTE(review): iface is None when the VIM does not report this interface id;
                # the position is kept so that indexes match interfaces_vim_ids
                vim_interfaces.append(iface)

        # NOTE(review): next() has no default here, so StopIteration is raised if there is no
        # CREATE task pending to be FINISHED
        task_create = next(
            t
            for t in ro_task["tasks"]
            if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
        )
        if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
            vim_interfaces[task_create["mgmt_vnf_interface"]][
                "mgmt_vnf_interface"
            ] = True

        # the vdu management interface defaults to the vnf management one, or to the first one
        mgmt_vdu_iface = task_create.get(
            "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
        )
        if vim_interfaces:
            vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True

        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """
        Inject a ssh key into a VM. On failure it asks to be retried up to
        max_retries_inject_ssh_key times, every time_retries_inject_ssh_key seconds
        :param ro_task: ro_task document; "target_id" selects the VIM, "tasks" contains the task
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used here
        :return: (task_status, vim_info update or None, task update) with task_status DONE if ok,
            BUILD while retries remain, FAILED when retries are exhausted
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params = task["params"]
            params_copy = deepcopy(params)
            # the encrypted key and its decrypt parameters are replaced by the clear "ro_key"
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                None,
                db_task_update,
            )  # params_copy["key"]
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            self.logger.debug(traceback.format_exc())
            if retries < self.max_retries_inject_ssh_key:
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_message": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update
633
634
class VimInteractionImage(VimInteractionBase):
    """Look for images at a VIM. Images are never created by this class"""

    def new(self, ro_task, task_index, task_depends):
        """
        Look for the image described at task "find_params". Exactly one image must match
        :param ro_task: ro_task document; "target_id" selects the VIM, "tasks" contains the task
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used here
        :return: ("DONE", vim_info update) if ok; ("FAILED", vim_info update) on VimConnException
            or NsWorkerException. Without find_params the vim_id is an empty string
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # FIND
            vim_image_id = ""
            find_params = task.get("find_params")
            if find_params:
                candidates = target_vim.get_image_list(
                    find_params.get("filter_dict", {})
                )

                if not candidates:
                    raise NsWorkerExceptionNotFound(
                        "Image not found with this criteria: '{}'".format(find_params)
                    )
                if len(candidates) > 1:
                    raise NsWorkerException(
                        "More than one image found with this criteria: '{}'".format(
                            find_params
                        )
                    )
                vim_image_id = candidates[0]["id"]

            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, ro_task["target_id"], vim_image_id, created
                )
            )

            return "DONE", {
                "vim_id": vim_image_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
        except (NsWorkerException, vimconn.VimConnException) as e:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }
692
693
class VimInteractionSharedVolume(VimInteractionBase):
    """Create and delete shared volumes at a VIM"""

    def delete(self, ro_task, task_index):
        """
        Delete the shared volume at VIM unless it is marked with "keep" at created_items.
        A volume not found at VIM is considered already deleted
        :param ro_task: ro_task document; "vim_info" holds "vim_id" and "created_items"
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", vim_info update) if ok or kept; ("FAILED", vim_info update) on
            VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        shared_volume_vim_id = ro_task["vim_info"]["vim_id"]
        created_items = ro_task["vim_info"]["created_items"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }
        # created_items may be empty or may not contain this volume: treat both as "not kept"
        # instead of failing with AttributeError over None
        volume_item = (created_items or {}).get(shared_volume_vim_id) or {}
        if volume_item.get("keep"):
            ro_vim_item_update_ok = {
                "vim_status": "ACTIVE",
                "created": False,
                "vim_message": None,
            }
            return "DONE", ro_vim_item_update_ok
        try:
            if shared_volume_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_shared_volumes(shared_volume_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-shared-volume={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], shared_volume_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-shared-volume={} {}".format(
                task_id,
                ro_task["target_id"],
                shared_volume_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """
        Create a shared volume at VIM from task "params". Nothing is created without params
        :param ro_task: ro_task document; "target_id" selects the VIM, "tasks" contains the task
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used here
        :return: ("DONE", vim_info update) if ok; ("FAILED", vim_info update) on VimConnException
            or NsWorkerException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            shared_volume_vim_id = None
            shared_volume_data = None

            if task.get("params"):
                shared_volume_data = task["params"]

            if shared_volume_data:
                self.logger.info(
                    f"Creating the new shared_volume for {shared_volume_data}\n"
                )
                (
                    shared_volume_name,
                    shared_volume_vim_id,
                ) = target_vim.new_shared_volumes(shared_volume_data)
                created = True
                # "keep" is stored so that delete() can skip the removal of this volume
                created_items[shared_volume_vim_id] = {
                    "name": shared_volume_name,
                    "keep": shared_volume_data.get("keep"),
                }

            ro_vim_item_update = {
                "vim_id": shared_volume_vim_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-shared-volume={} created={}".format(
                    task_id, ro_task["target_id"], shared_volume_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-shared-volume:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
796
797 return "FAILED", ro_vim_item_update
798
799
class VimInteractionFlavor(VimInteractionBase):
    """Find/create and delete flavors at a VIM"""

    def delete(self, ro_task, task_index):
        """
        Delete the flavor at VIM. A flavor not found at VIM is considered already deleted
        :param ro_task: ro_task document; "vim_info" holds the "vim_id" of the flavor
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", vim_info update) if ok; ("FAILED", vim_info update) on VimConnException
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        flavor_vim_id = ro_task["vim_info"]["vim_id"]
        deleted_update = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if flavor_vim_id:
                self.my_vims[ro_task["target_id"]].delete_flavor(flavor_vim_id)
        except vimconn.VimConnNotFoundException:
            deleted_update["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id,
                ro_task["target_id"],
                flavor_vim_id,
                deleted_update.get("vim_message", ""),
            )
        )

        return "DONE", deleted_update

    def new(self, ro_task, task_index, task_depends):
        """
        Get the flavor from find_params (explicit vim_flavor_id, or lookup by flavor_data) and,
        if not obtained and the task has "params", create it
        :param ro_task: ro_task document; "target_id" selects the VIM, "tasks" contains the task
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used here
        :return: ("DONE", vim_info update) if ok; ("FAILED", vim_info update) on VimConnException
            or NsWorkerException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        try:
            # FIND
            vim_flavor_id = None
            find_params = task.get("find_params", {})

            if find_params.get("vim_flavor_id"):
                vim_flavor_id = find_params["vim_flavor_id"]
            elif find_params.get("flavor_data"):
                try:
                    vim_flavor_id = target_vim.get_flavor_id_from_data(
                        find_params["flavor_data"]
                    )
                except vimconn.VimConnNotFoundException as flavor_not_found_msg:
                    # not found is not an error here, the flavor may still be created below
                    self.logger.warning(
                        f"VimConnNotFoundException occured: {flavor_not_found_msg}"
                    )

            if not vim_flavor_id and task.get("params"):
                # CREATE
                vim_flavor_id = target_vim.new_flavor(task["params"]["flavor_data"])
                created = True

            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, ro_task["target_id"], vim_flavor_id, created
                )
            )

            return "DONE", {
                "vim_id": vim_flavor_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }
895
896
class VimInteractionAffinityGroup(VimInteractionBase):
    """Find/create and delete affinity or anti-affinity groups at a VIM"""

    def delete(self, ro_task, task_index):
        """
        Delete the affinity group at VIM. A group not found at VIM is considered already deleted
        :param ro_task: ro_task document; "vim_info" holds the "vim_id" of the group
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", vim_info update) if ok; ("FAILED", vim_info update) on VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if affinity_group_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_affinity_group(affinity_group_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id,
                ro_task["target_id"],
                affinity_group_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """
        Use the affinity group given by "vim-affinity-group-id" if it exists at VIM; otherwise
        create a new one from "affinity_group_data". Nothing is done without that data
        :param ro_task: ro_task document; "target_id" selects the VIM, "tasks" contains the task
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used here
        :return: ("DONE", vim_info update) if ok; ("FAILED", vim_info update) on VimConnException
            or NsWorkerException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            affinity_group_vim_id = None
            affinity_group_data = None
            param_affinity_group_id = ""

            if task.get("params"):
                affinity_group_data = task["params"].get("affinity_group_data")

            if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
                try:
                    param_affinity_group_id = task["params"]["affinity_group_data"].get(
                        "vim-affinity-group-id"
                    )
                    affinity_group_vim_id = target_vim.get_affinity_group(
                        param_affinity_group_id
                    ).get("id")
                except vimconn.VimConnNotFoundException:
                    # not fatal: a new group is created below
                    self.logger.error(
                        "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {} "
                        "could not be found at VIM. Creating a new one.".format(
                            task_id, ro_task["target_id"], param_affinity_group_id
                        )
                    )

            if not affinity_group_vim_id and affinity_group_data:
                affinity_group_vim_id = target_vim.new_affinity_group(
                    affinity_group_data
                )
                created = True

            ro_vim_item_update = {
                "vim_id": affinity_group_vim_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], affinity_group_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
1003
1004
class VimInteractionUpdateVdu(VimInteractionBase):
    """Run an action over an existing VM at a VIM"""

    def exec(self, ro_task, task_index, task_depends):
        """
        Call action_vminstance at VIM with the "vim_vm_id" and "action" of the task params.
        The action is sent as a dictionary {action: action}. Nothing is called without params
        :param ro_task: ro_task document; "target_id" selects the VIM, "tasks" contains the task
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used here
        :return: ("DONE", vim_info update, task update) if ok; ("FAILED", vim_info update,
            task update) on VimConnException or NsWorkerException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            vim_vm_id = ""
            task_params = task.get("params")
            if task_params:
                vim_vm_id = task_params.get("vim_vm_id")
                action = task_params.get("action")
                target_vim.action_vminstance(vim_vm_id, {action: action})
                # created = True

            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )
            return (
                "DONE",
                {
                    "vim_id": vim_vm_id,
                    "vim_status": "ACTIVE",
                    "created": created,
                    "created_items": created_items,
                    "vim_details": None,
                    "vim_message": None,
                },
                db_task_update,
            )
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            failed_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", failed_update, db_task_update
1046
1047
class VimInteractionSdnNet(VimInteractionBase):
    """
    Handle "sdn_net" items: create, refresh and delete the connectivity service that
    joins SR-IOV / PCI-PASSTHROUGH interfaces and external ports through an SDN/WIM connector.
    """

    @staticmethod
    def _match_pci(port_pci, mapping):
        """
        Check if port_pci matches with mapping.
        The mapping can have brackets to indicate that several chars are accepted. e.g
        pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
        :param port_pci: text
        :param mapping: text, can contain brackets to indicate several chars are available
        :return: True if matches, False otherwise
        """
        if not port_pci or not mapping:
            return False
        if port_pci == mapping:
            return True

        mapping_index = 0
        pci_index = 0
        while True:
            bracket_start = mapping.find("[", mapping_index)

            if bracket_start == -1:
                break

            bracket_end = mapping.find("]", bracket_start)
            if bracket_end == -1:
                # unbalanced bracket: the rest of the mapping is compared literally below
                break

            # literal text placed before the bracket must be equal
            length = bracket_start - mapping_index
            if (
                port_pci[pci_index : pci_index + length]
                != mapping[mapping_index:bracket_start]
            ):
                return False

            # the char at the bracket position must exist (a shorter pci cannot match)
            # and be one of the accepted chars
            bracket_pos = pci_index + length
            if (
                bracket_pos >= len(port_pci)
                or port_pci[bracket_pos]
                not in mapping[bracket_start + 1 : bracket_end]
            ):
                return False

            pci_index = bracket_pos + 1
            mapping_index = bracket_end + 1

        # whatever remains after the last bracket must be identical
        return port_pci[pci_index:] == mapping[mapping_index:]

    def _get_interfaces(self, vlds_to_connect, vim_account_id):
        """
        Collect from the "vnfrs" table the SR-IOV and PCI-PASSTHROUGH interfaces attached to the vlds.

        :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
        :param vim_account_id: only vnfrs deployed at this vim account are considered
        :return: list of interface dicts (copies) with an added "id" key, and "status": "ERROR"
            when the owning vdur is on ERROR status
        """
        interfaces = []

        for vld in vlds_to_connect:
            table, _, db_id = vld.partition(":")
            db_id, _, vld = db_id.partition(":")
            _, _, vld_id = vld.partition(".")

            if table == "vnfrs":
                q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
                iface_key = "vnf-vld-id"
            else:  # table == "nsrs"
                q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
                iface_key = "ns-vld-id"

            db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)

            for db_vnfr in db_vnfrs:
                for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
                    for iface_index, interface in enumerate(vdur["interfaces"]):
                        if interface.get(iface_key) == vld_id and interface.get(
                            "type"
                        ) in ("SR-IOV", "PCI-PASSTHROUGH"):
                            # only SR-IOV o PT
                            interface_ = interface.copy()
                            interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
                                db_vnfr["_id"], vdu_index, iface_index
                            )

                            if vdur.get("status") == "ERROR":
                                interface_["status"] = "ERROR"

                            interfaces.append(interface_)

        return interfaces

    def refresh(self, ro_task):
        """
        Refresh the sdn net by running again new() over the first CREATE task not yet FINISHED.

        :param ro_task: ro_task entry to refresh
        :return: same tuple as new()
        :raises StopIteration: when the ro_task has no pending CREATE task
        """
        # look for task create
        task_create_index, _ = next(
            i_t
            for i_t in enumerate(ro_task["tasks"])
            if i_t[1]
            and i_t[1]["action"] == "CREATE"
            and i_t[1]["status"] != "FINISHED"
        )

        return self.new(ro_task, task_create_index, None)

    def new(self, ro_task, task_index, task_depends):
        """
        Create or update the connectivity service with the ports that are currently known,
        or read its status when the set of connected ports has not changed.

        :param ro_task: ro_task entry; "vim_info" holds the state stored by previous runs
        :param task_index: position inside ro_task["tasks"] of the CREATE task
        :param task_depends: not used
        :return: tuple (status, ro_vim_item_update). On any exception status is "FAILED" and
            ro_vim_item_update carries "vim_status": "VIM_ERROR" with the error text
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]

        sdn_net_id = ro_task["vim_info"]["vim_id"]

        created_items = ro_task["vim_info"].get("created_items")
        connected_ports = ro_task["vim_info"].get("connected_ports", [])
        new_connected_ports = []
        last_update = ro_task["vim_info"].get("last_update", 0)
        sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
        error_list = []
        created = ro_task["vim_info"].get("created", False)

        try:
            # CREATE
            db_vim = {}
            params = task["params"]
            vlds_to_connect = params.get("vlds", [])
            associated_vim = params.get("target_vim")
            # external additional ports
            additional_ports = params.get("sdn-ports") or ()
            _, _, vim_account_id = (
                (None, None, None)
                if associated_vim is None
                else associated_vim.partition(":")
            )

            if associated_vim:
                # get associated VIM
                if associated_vim not in self.db_vims:
                    self.db_vims[associated_vim] = self.db.get_one(
                        "vim_accounts", {"_id": vim_account_id}
                    )

                db_vim = self.db_vims[associated_vim]

            # a VIM without "config" (or no associated VIM) is handled as an empty config, so
            # that a missing port mapping is reported as such and not as a KeyError('config')
            vim_config = db_vim.get("config") or {}

            # look for ports to connect
            ports = self._get_interfaces(vlds_to_connect, vim_account_id)
            # print(ports)

            sdn_ports = []
            pending_ports = error_ports = 0
            vlan_used = None
            sdn_need_update = False

            for port in ports:
                vlan_used = port.get("vlan") or vlan_used

                # TODO. Do not connect if already done
                if not port.get("compute_node") or not port.get("pci"):
                    # location still unknown: count it as failed or as pending
                    if port.get("status") == "ERROR":
                        error_ports += 1
                    else:
                        pending_ports += 1
                    continue

                pmap = None
                compute_node_mappings = next(
                    (
                        c
                        for c in vim_config.get("sdn-port-mapping", ())
                        if c and c["compute_node"] == port["compute_node"]
                    ),
                    None,
                )

                if compute_node_mappings:
                    # process port_mapping pci of type 0000:af:1[01].[1357]
                    pmap = next(
                        (
                            p
                            for p in compute_node_mappings["ports"]
                            if self._match_pci(port["pci"], p.get("pci"))
                        ),
                        None,
                    )

                if not pmap:
                    if not vim_config.get("mapping_not_needed"):
                        error_list.append(
                            "Port mapping not found for compute_node={} pci={}".format(
                                port["compute_node"], port["pci"]
                            )
                        )
                        continue

                    pmap = {}

                service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
                new_port = {
                    "service_endpoint_id": pmap.get("service_endpoint_id")
                    or service_endpoint_id,
                    "service_endpoint_encapsulation_type": "dot1q"
                    if port["type"] == "SR-IOV"
                    else None,
                    "service_endpoint_encapsulation_info": {
                        "vlan": port.get("vlan"),
                        "mac": port.get("mac-address"),
                        "device_id": pmap.get("device_id") or port["compute_node"],
                        "device_interface_id": pmap.get("device_interface_id")
                        or port["pci"],
                        "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
                        "switch_port": pmap.get("switch_port"),
                        "service_mapping_info": pmap.get("service_mapping_info"),
                    },
                }

                # TODO
                # if port["modified_at"] > last_update:
                #     sdn_need_update = True
                new_connected_ports.append(port["id"])  # TODO
                sdn_ports.append(new_port)

            if error_ports:
                error_list.append(
                    "{} interfaces have not been created as VDU is on ERROR status".format(
                        error_ports
                    )
                )

            # connect external ports
            for index, additional_port in enumerate(additional_ports):
                additional_port_id = additional_port.get(
                    "service_endpoint_id"
                ) or "external-{}".format(index)
                sdn_ports.append(
                    {
                        "service_endpoint_id": additional_port_id,
                        "service_endpoint_encapsulation_type": additional_port.get(
                            "service_endpoint_encapsulation_type", "dot1q"
                        ),
                        "service_endpoint_encapsulation_info": {
                            "vlan": additional_port.get("vlan") or vlan_used,
                            "mac": additional_port.get("mac_address"),
                            "device_id": additional_port.get("device_id"),
                            "device_interface_id": additional_port.get(
                                "device_interface_id"
                            ),
                            "switch_dpid": additional_port.get("switch_dpid")
                            or additional_port.get("switch_id"),
                            "switch_port": additional_port.get("switch_port"),
                            "service_mapping_info": additional_port.get(
                                "service_mapping_info"
                            ),
                        },
                    }
                )
                new_connected_ports.append(additional_port_id)
            sdn_info = ""

            # if there are more ports to connect or they have been modified, call create/update
            if error_list:
                sdn_status = "ERROR"
                sdn_info = "; ".join(error_list)
            elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
                last_update = time.time()

                if not sdn_net_id:
                    if len(sdn_ports) < 2:
                        # nothing is created at the connector with less than two ports
                        sdn_status = "ACTIVE"

                        if not pending_ports:
                            self.logger.debug(
                                "task={} {} new-sdn-net done, less than 2 ports".format(
                                    task_id, ro_task["target_id"]
                                )
                            )
                    else:
                        net_type = params.get("type") or "ELAN"
                        (
                            sdn_net_id,
                            created_items,
                        ) = target_vim.create_connectivity_service(net_type, sdn_ports)
                        created = True
                        self.logger.debug(
                            "task={} {} new-sdn-net={} created={}".format(
                                task_id, ro_task["target_id"], sdn_net_id, created
                            )
                        )
                else:
                    created_items = target_vim.edit_connectivity_service(
                        sdn_net_id, conn_info=created_items, connection_points=sdn_ports
                    )
                    created = True
                    self.logger.debug(
                        "task={} {} update-sdn-net={} created={}".format(
                            task_id, ro_task["target_id"], sdn_net_id, created
                        )
                    )

                connected_ports = new_connected_ports
            elif sdn_net_id:
                wim_status_dict = target_vim.get_connectivity_service_status(
                    sdn_net_id, conn_info=created_items
                )
                sdn_status = wim_status_dict["sdn_status"]

                if wim_status_dict.get("sdn_info"):
                    sdn_info = str(wim_status_dict.get("sdn_info")) or ""

                # an error message, when present, takes precedence over sdn_info
                if wim_status_dict.get("error_msg"):
                    sdn_info = wim_status_dict.get("error_msg") or ""

            if pending_ports:
                if sdn_status != "ERROR":
                    sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
                        len(ports) - pending_ports, len(ports)
                    )

                # not all the ports are connected yet, so it cannot be reported as ACTIVE
                if sdn_status == "ACTIVE":
                    sdn_status = "BUILD"

            ro_vim_item_update = {
                "vim_id": sdn_net_id,
                "vim_status": sdn_status,
                "created": created,
                "created_items": created_items,
                "connected_ports": connected_ports,
                "vim_details": sdn_info,
                "vim_message": None,
                "last_update": last_update,
            }

            return sdn_status, ro_vim_item_update
        except Exception as e:
            # traceback is only logged for exceptions not raised by the connectors
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
                exc_info=not isinstance(
                    e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                ),
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the connectivity service stored at ro_task["vim_info"]["vim_id"], if any.

        :param ro_task: ro_task entry
        :param task_index: position inside ro_task["tasks"] of the DELETE task
        :return: tuple (status, ro_vim_item_update); "DONE" also when the connector answers
            NOT_FOUND, "FAILED" on any other exception
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        sdn_vim_id = ro_task["vim_info"].get("vim_id")
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if sdn_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_connectivity_service(
                    sdn_vim_id, ro_task["vim_info"].get("created_items")
                )

        except Exception as e:
            if (
                isinstance(e, sdnconn.SdnConnectorError)
                and e.http_code == HTTPStatus.NOT_FOUND.value
            ):
                ro_vim_item_update_ok["vim_message"] = "already deleted"
            else:
                self.logger.error(
                    "ro_task={} vim={} del-sdn-net={}: {}".format(
                        ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
                    ),
                    exc_info=not isinstance(
                        e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                    ),
                )
                ro_vim_item_update = {
                    "vim_status": "VIM_ERROR",
                    "vim_message": "Error while deleting: {}".format(e),
                }

                return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-sdn-net={} {}".format(
                task_id,
                ro_task["target_id"],
                sdn_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
1442
1443
class VimInteractionMigration(VimInteractionBase):
    """Handle "migrate" items: migrate an existing VM and refresh its information."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Migrate the VM given at the task params and, when migrated, refresh its vim information.

        :param ro_task: ro_task entry; its "target_id" selects the connector at self.my_vims
        :param task_index: position inside ro_task["tasks"] of the task to execute
        :param task_depends: not used by this action
        :return: tuple (status, ro_vim_item_update, db_task_update); status is "DONE" or "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_interfaces = []
        created = False
        created_items = {}
        refreshed_vim_info = {}

        try:
            vim_vm_id = ""
            if task.get("params"):
                vim_vm_id = task["params"].get("vim_vm_id")
                migrate_host = task["params"].get("migrate_host")
                _, migrated_compute_node = target_vim.migrate_instance(
                    vim_vm_id, migrate_host
                )

                if migrated_compute_node:
                    # When VM is migrated, vdu["vim_info"] needs to be updated
                    # the old vim_info can be missing for this target: use an empty one
                    vdu_old_vim_info = (
                        task["params"]["vdu_vim_info"].get(ro_task["target_id"]) or {}
                    )

                    # Refresh VM to get new vim_info
                    vm_to_refresh_list = [vim_vm_id]
                    vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
                    refreshed_vim_info = vim_dict[vim_vm_id]

                    if refreshed_vim_info.get("interfaces"):
                        for old_iface in vdu_old_vim_info.get("interfaces") or ():
                            iface = next(
                                (
                                    iface
                                    for iface in refreshed_vim_info["interfaces"]
                                    if old_iface["vim_interface_id"]
                                    == iface["vim_interface_id"]
                                ),
                                None,
                            )
                            # an old interface that is not reported any more is skipped
                            # instead of storing a None entry at the interfaces list
                            if iface is not None:
                                vim_interfaces.append(iface)

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info.get("vim_info")

            if vim_interfaces:
                ro_vim_item_update["interfaces"] = vim_interfaces

            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update

        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
1524
1525
class VimInteractionResize(VimInteractionBase):
    """Handle "verticalscale" items: resize an existing VM to another flavor."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Resize the VM given at the task params to the flavor described by params["flavor_dict"].
        The flavor is looked up at the VIM and created when the lookup fails.

        :param ro_task: ro_task entry; its "target_id" selects the connector at self.my_vims
        :param task_index: position inside ro_task["tasks"] of the task to execute
        :param task_depends: not used by this action
        :return: tuple (status, ro_vim_item_update, db_task_update); status is "DONE" or "FAILED".
            "FAILED" is also returned when no target flavor can be obtained
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        target_flavor_uuid = None
        created_items = {}
        refreshed_vim_info = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            vim_vm_id = ""
            if task.get("params"):
                vim_vm_id = task["params"].get("vim_vm_id")
                flavor_dict = task["params"].get("flavor_dict")
                self.logger.info("flavor_dict %s", flavor_dict)

                try:
                    target_flavor_uuid = target_vim.get_flavor_id_from_data(flavor_dict)
                except Exception as e:
                    self.logger.info("Cannot find any flavor matching %s.", str(e))
                    try:
                        target_flavor_uuid = target_vim.new_flavor(flavor_dict)
                    except Exception as e:
                        self.logger.error("Error creating flavor at VIM %s.", str(e))

                if target_flavor_uuid is None:
                    # without a flavor nothing can be resized: report the failure instead
                    # of answering DONE for a resize that was never requested to the VIM
                    raise NsWorkerException(
                        "Cannot resize vm={}: target flavor neither found nor created at VIM".format(
                            vim_vm_id
                        )
                    )

                resized_status = target_vim.resize_instance(
                    vim_vm_id, target_flavor_uuid
                )

                if resized_status:
                    # Refresh VM to get new vim_info
                    vm_to_refresh_list = [vim_vm_id]
                    vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
                    refreshed_vim_info = vim_dict[vim_vm_id]

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            self.logger.debug(
                "task={} {} resize done".format(task_id, ro_task["target_id"])
            )
            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} Resize:" " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
1594
1595
class ConfigValidate:
    """Read-only accessors over the "period" section of the RO configuration."""

    def __init__(self, config: Dict):
        self.conf = config

    def _period(self, key):
        """Return the value stored under conf["period"][key]."""
        return self.conf["period"][key]

    @property
    def active(self):
        """Refresh period for active items: -1 and values >= 60 are kept, anything else is 60."""
        # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
        refresh_active = self._period("refresh_active")
        if refresh_active >= 60 or refresh_active == -1:
            return refresh_active

        return 60

    @property
    def build(self):
        """Refresh period configured as "refresh_build"."""
        return self._period("refresh_build")

    @property
    def image(self):
        """Refresh period configured as "refresh_image"."""
        return self._period("refresh_image")

    @property
    def error(self):
        """Refresh period configured as "refresh_error"."""
        return self._period("refresh_error")

    @property
    def queue_size(self):
        """Value configured as "queue_size" inside the "period" section."""
        return self._period("queue_size")
1626
1627
1628 class NsWorker(threading.Thread):
def __init__(self, worker_index, config, plugins, db):
    """
    :param worker_index: thread index
    :param config: general configuration of RO, among others the process_id with the docker id where it runs
    :param plugins: global shared dict with the loaded plugins
    :param db: database class instance to use
    """
    threading.Thread.__init__(self)
    self.config = config
    self.plugins = plugins
    self.plugin_name = "unknown"
    self.logger = logging.getLogger("ro.worker{}".format(worker_index))
    self.worker_index = worker_index
    # refresh periods for created items
    self.refresh_config = ConfigValidate(config)
    self.task_queue = queue.Queue(self.refresh_config.queue_size)
    # lock taken by del_task while it changes the status of a task
    self.task_lock = threading.Lock()
    # targetvim: vimplugin class
    self.my_vims = {}
    # targetvim: vim information from database
    self.db_vims = {}
    # targetvim list
    self.vim_targets = []
    self.my_id = config["process_id"] + ":" + str(worker_index)
    self.db = db
    # item type: class in charge of it. All of them share the same db, connectors and logger
    interaction_classes = {
        "net": VimInteractionNet,
        "shared-volumes": VimInteractionSharedVolume,
        "vdu": VimInteractionVdu,
        "image": VimInteractionImage,
        "flavor": VimInteractionFlavor,
        "sdn_net": VimInteractionSdnNet,
        "update": VimInteractionUpdateVdu,
        "affinity-or-anti-affinity-group": VimInteractionAffinityGroup,
        "migrate": VimInteractionMigration,
        "verticalscale": VimInteractionResize,
    }
    self.item2class = {
        item: interaction_class(self.db, self.my_vims, self.db_vims, self.logger)
        for item, interaction_class in interaction_classes.items()
    }
    self.time_last_task_processed = None
    # lists of tasks to delete because nsrs or vnfrs has been deleted from db
    self.tasks_to_delete = []
    # it is idle when there are not vim_targets associated
    self.idle = True
    self.task_locked_time = config["global"]["task_locked_time"]
1687
def insert_task(self, task):
    """
    Queue a task for this worker without blocking.

    :param task: element to put at the worker queue
    :return: None
    :raises NsWorkerException: when the queue is full
    """
    try:
        self.task_queue.put_nowait(task)
    except queue.Full:
        raise NsWorkerException("timeout inserting a task")

    return None
1694
def terminate(self):
    """Queue the special "exit" task for this worker."""
    exit_order = "exit"
    self.insert_task(exit_order)
1697
def del_task(self, task):
    """
    Mark a task as SUPERSEDED when it has not been started yet.

    :param task: task dict; its "status" is modified in place when it is "SCHEDULED"
    :return: True if the task was superseded, False if it is in any other status
    """
    # the lock is released by the "with" statement on every path; releasing it by hand
    # inside the block would make the exit of the "with" fail with RuntimeError
    with self.task_lock:
        if task["status"] == "SCHEDULED":
            task["status"] = "SUPERSEDED"
            return True

        # task["status"] == "processing"
        return False
1706
def _process_vim_config(self, target_id: str, db_vim: dict) -> None:
    """
    Dump to disk the vim configuration content that connectors read from files (ca_cert).
    The certificate is written at <work_dir>/<target_id>:<worker_index>/ca_cert; the
    "ca_cert_content" entry is removed from the config and "ca_cert" is set to the file path.
    :param target_id: vim/sdn/wim + id
    :param db_vim: Vim dictionary obtained from database. It is modified
    :return: None
    :raises NsWorkerException: on any error creating the folder or writing the file
    """
    vim_config = db_vim.get("config")
    if not vim_config:
        return

    # file_name follows the path being handled, as it is reported at the error message
    file_name = ""
    work_dir = "/app/osm_ro/certs"

    try:
        if vim_config.get("ca_cert_content"):
            file_name = f"{work_dir}/{target_id}:{self.worker_index}"

            if not path.isdir(file_name):
                makedirs(file_name)

            file_name += "/ca_cert"

            with open(file_name, "w") as cert_file:
                cert_file.write(vim_config["ca_cert_content"])
                del vim_config["ca_cert_content"]
                vim_config["ca_cert"] = file_name
    except Exception as e:
        raise NsWorkerException(
            "Error writing to file '{}': {}".format(file_name, e)
        )
1737
def _load_plugin(self, name, type="vim"):
    """
    Return the connector class registered as 'name', loading it from the entry points
    group "osm_ro<type>.plugins" when it is not yet at self.plugins.
    :param name: plugin name
    :param type: can be vim or sdn
    :return: the plugin stored at self.plugins[name]
    :raises NsWorkerException: if the plugin cannot be loaded or it is not installed
    """
    registry = self.plugins

    # dummy connectors are always available
    if "rovim_dummy" not in registry:
        registry["rovim_dummy"] = VimDummyConnector

    if "rosdn_dummy" not in registry:
        registry["rosdn_dummy"] = SdnDummyConnector

    if name not in registry:
        try:
            for ep in entry_points(group=f"osm_ro{type}.plugins", name=name):
                registry[name] = ep.load()
        except Exception as e:
            raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))

        if name and name not in registry:
            raise NsWorkerException(
                "Plugin 'osm_{n}' has not been installed".format(n=name)
            )

    return registry[name]
1761
def _unload_vim(self, target_id):
    """
    Forget a vim_account: drop it from the db_vims and my_vims dictionaries and from the
    vim_targets list. Errors are logged, never raised.
    :param target_id: Contains type:_id; where type can be 'vim', ...
    :return: None.
    """
    try:
        self.db_vims.pop(target_id, None)
        self.my_vims.pop(target_id, None)

        try:
            self.vim_targets.remove(target_id)
        except ValueError:
            # it was not at the list of targets
            pass

        self.logger.info("Unloaded {}".format(target_id))
    except Exception as e:
        self.logger.error("Cannot unload {}: {}".format(target_id, e))
1778
def _check_vim(self, target_id):
    """
    Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
    Only the first operation found at "_admin.operations" with operationState PROCESSING that is
    not locked by other worker is handled. The database is written at the "finally" clause.
    :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
    :return: None.
    """
    target, _, _id = target_id.partition(":")
    now = time.time()
    # changes to apply to the database record at the "finally" clause
    update_dict = {}
    unset_dict = {}
    # database path prefix of the operation being processed; empty while none is locked
    op_text = ""
    # description of the current stage, used at error messages
    step = ""
    # a target that was not loaded before this check is unloaded again at the end
    loaded = target_id in self.vim_targets
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts"
        if target == "wim"
        else "sdns"
    )
    error_text = ""

    try:
        step = "Getting {} from db".format(target_id)
        db_vim = self.db.get_one(target_database, {"_id": _id})

        for op_index, operation in enumerate(
            db_vim["_admin"].get("operations", ())
        ):
            if operation["operationState"] != "PROCESSING":
                continue

            locked_at = operation.get("locked_at")

            if locked_at is not None and locked_at >= now - self.task_locked_time:
                # some other thread is doing this operation
                return

            # lock
            op_text = "_admin.operations.{}.".format(op_index)

            # the filter contains the locked_at value read before, so the update matches
            # nothing when the record has changed in the meantime
            # NOTE(review): "admin.current_operation" lacks the leading underscore used by the
            # other "_admin." keys, and "current_operation" is what is unset below - confirm
            if not self.db.set_one(
                target_database,
                q_filter={
                    "_id": _id,
                    op_text + "operationState": "PROCESSING",
                    op_text + "locked_at": locked_at,
                },
                update_dict={
                    op_text + "locked_at": now,
                    "admin.current_operation": op_index,
                },
                fail_on_empty=False,
            ):
                return

            unset_dict[op_text + "locked_at"] = None
            unset_dict["current_operation"] = None
            step = "Loading " + target_id
            error_text = self._load_vim(target_id)

            if not error_text:
                step = "Checking connectivity"

                if target == "vim":
                    self.my_vims[target_id].check_vim_connectivity()
                else:
                    self.my_vims[target_id].check_credentials()

            # success values; the "finally" clause overrides them when error_text is set
            update_dict["_admin.operationalState"] = "ENABLED"
            update_dict["_admin.detailed-status"] = ""
            unset_dict[op_text + "detailed-status"] = None
            update_dict[op_text + "operationState"] = "COMPLETED"

            return

    except Exception as e:
        error_text = "{}: {}".format(step, e)
        self.logger.error("{} for {}: {}".format(step, target_id, e))

    finally:
        # executed also on the "return" statements above
        if update_dict or unset_dict:
            if error_text:
                update_dict[op_text + "operationState"] = "FAILED"
                update_dict[op_text + "detailed-status"] = error_text
                # the same key is not sent both at update and at unset
                unset_dict.pop(op_text + "detailed-status", None)
                update_dict["_admin.operationalState"] = "ERROR"
                update_dict["_admin.detailed-status"] = error_text

            if op_text:
                update_dict[op_text + "statusEnteredTime"] = now

            self.db.set_one(
                target_database,
                q_filter={"_id": _id},
                update_dict=update_dict,
                unset=unset_dict,
                fail_on_empty=False,
            )

        if not loaded:
            self._unload_vim(target_id)
1881
def _reload_vim(self, target_id):
    """
    Reload a target that is in use by this worker, or invalidate its cached database data.
    :param target_id: Contains type:_id; where type can be 'vim', ...
    :return: None
    """
    if target_id not in self.vim_targets:
        # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
        # just remove it to force load again next time it is needed
        self.db_vims.pop(target_id, None)
        return

    self._load_vim(target_id)
1889
def _load_vim(self, target_id):
    """
    Load or reload a vim_account, sdn_controller or wim_account.
    Read content from database, load the plugin if not loaded.
    In case of error loading the plugin, it loads a failing VIM_connector
    It fills self db_vims dictionary, my_vims dictionary and vim_targets list
    :param target_id: Contains type:_id; where type can be 'vim', ...
    :return: None if ok, descriptive text if error
    """
    target, _, _id = target_id.partition(":")
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts"
        if target == "wim"
        else "sdns"
    )
    plugin_name = ""
    vim = None
    # description of the current stage, used at error messages
    step = "Getting {}={} from db".format(target, _id)

    try:
        # TODO process for wim, sdnc, ...
        vim = self.db.get_one(target_database, {"_id": _id})

        # if deep_get(vim, "config", "sdn-controller"):
        #     step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
        #     db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})

        step = "Decrypting password"
        schema_version = vim.get("schema_version")
        self.db.encrypt_decrypt_fields(
            vim,
            "decrypt",
            fields=("password", "secret"),
            schema_version=schema_version,
            salt=_id,
        )
        self._process_vim_config(target_id, vim)

        if target == "vim":
            plugin_name = "rovim_" + vim["vim_type"]
            step = "Loading plugin '{}'".format(plugin_name)
            vim_module_conn = self._load_plugin(plugin_name)
            step = "Loading {}'".format(target_id)
            self.my_vims[target_id] = vim_module_conn(
                uuid=vim["_id"],
                name=vim["name"],
                tenant_id=vim.get("vim_tenant_id"),
                tenant_name=vim.get("vim_tenant_name"),
                url=vim["vim_url"],
                url_admin=None,
                user=vim["vim_user"],
                passwd=vim["vim_password"],
                config=vim.get("config") or {},
                persistent_info={},
            )
        else:  # sdn
            plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
            step = "Loading plugin '{}'".format(plugin_name)
            vim_module_conn = self._load_plugin(plugin_name, "sdn")
            step = "Loading {}'".format(target_id)
            # the connector receives a copy, so the cached database content is not modified
            wim = deepcopy(vim)
            wim_config = wim.pop("config", {}) or {}
            wim["uuid"] = wim["_id"]
            # provide both "url" and "wim_url", whichever of them is at the database
            if "url" in wim and "wim_url" not in wim:
                wim["wim_url"] = wim["url"]
            elif "url" not in wim and "wim_url" in wim:
                wim["url"] = wim["wim_url"]

            if wim.get("dpid"):
                wim_config["dpid"] = wim.pop("dpid")

            if wim.get("switch_id"):
                wim_config["switch_id"] = wim.pop("switch_id")

            # wim, wim_account, config
            self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
        self.db_vims[target_id] = vim
        self.error_status = None

        self.logger.info(
            "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
        )
    except Exception as e:
        self.logger.error(
            "Cannot load {} plugin={}: {} {}".format(
                target_id, plugin_name, step, e
            )
        )

        # keep the database content (if it was read) at db_vims and place the failing
        # connector at my_vims, which is where connectors are looked up
        self.db_vims[target_id] = vim or {}
        self.my_vims[target_id] = FailingConnector(str(e))
        error_status = "{} Error: {}".format(step, e)

        return error_status
    finally:
        # the target is registered at this worker whatever the result is
        if target_id not in self.vim_targets:
            self.vim_targets.append(target_id)
1989
def _get_db_task(self):
    """
    Lock at database one ro_task of the targets of this worker that needs to be processed and
    read it. Database and unexpected errors are logged, not raised.
    :return: the locked ro_task, or None if there is nothing to process or on error
    """
    now = time.time()

    if not self.time_last_task_processed:
        self.time_last_task_processed = now

    try:
        while True:
            # try to lock one ro_task that is not locked and is pending to be checked
            # (a disabled block that logged the lock times on DEBUG level used to be here)
            lock_filter = {
                "target_id": self.vim_targets,
                "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                "locked_at.lt": now - self.task_locked_time,
                "to_check_at.lt": self.time_last_task_processed,
                "to_check_at.gt": -1,
            }
            lock_values = {"locked_by": self.my_id, "locked_at": now}
            locked = self.db.set_one(
                "ro_tasks",
                q_filter=lock_filter,
                update_dict=lock_values,
                fail_on_empty=False,
            )

            if locked:
                # read and return the ro_task just locked, identified by its locked_at value
                read_filter = {
                    "target_id": self.vim_targets,
                    "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                    "locked_at": now,
                }
                return self.db.get_one("ro_tasks", q_filter=read_filter)

            if self.time_last_task_processed == now:
                self.time_last_task_processed = None
                return None

            # nothing found up to the last processed time: try again up to now
            self.time_last_task_processed = now
            # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)

    except DbException as e:
        self.logger.error("Database exception at _get_db_task: {}".format(e))
    except Exception as e:
        self.logger.critical(
            "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
        )

    return None
2060
def _delete_task(self, ro_task, task_index, task_depends, db_update):
    """
    Determine if this task need to be done or superseded
    CREATE tasks over the same target_record are set to FINISHED (also at db_update). The item
    is deleted only if it was created and no other CREATE task is still alive.
    :param ro_task: ro_task entry
    :param task_index: position inside ro_task["tasks"] of the DELETE task
    :param task_depends: not used
    :param db_update: dictionary where the changes for the database are added
    :return: (None, None) for a FAILED task, ("SUPERSEDED", None) when nothing has to be deleted,
        the result of the item class delete() otherwise, or ("FAILED", error info) on exception
    """
    my_task = ro_task["tasks"][task_index]
    task_id = my_task["task_id"]
    vim_info = ro_task["vim_info"]
    needed_delete = vim_info["created"] or vim_info.get("created_items", False)

    self.logger.debug("Needed delete: {}".format(needed_delete))
    if my_task["status"] == "FAILED":
        return None, None  # TODO need to be retry??

    try:
        for index, task in enumerate(ro_task["tasks"]):
            if index == task_index or not task:
                continue  # own task

            same_record = my_task["target_record"] == task["target_record"]
            if task["action"] != "CREATE":
                continue

            if same_record:
                # set to finished
                db_update["tasks.{}.status".format(index)] = "FINISHED"
                task["status"] = "FINISHED"
            elif task["status"] not in ("FINISHED", "SUPERSEDED"):
                # the item is still needed by other record
                needed_delete = False

        if not needed_delete:
            return "SUPERSEDED", None

        self.logger.debug(
            "Deleting ro_task={} task_index={}".format(ro_task, task_index)
        )
        return self.item2class[my_task["item"]].delete(ro_task, task_index)
    except Exception as e:
        if not isinstance(e, NsWorkerException):
            self.logger.critical(
                "Unexpected exception at _delete_task task={}: {}".format(
                    task_id, e
                ),
                exc_info=True,
            )

        return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2112
def _create_task(self, ro_task, task_index, task_depends, db_update):
    """
    Determine if this task need to create something at VIM
    Only SCHEDULED tasks are processed. If other CREATE task of the same ro_task is already in
    progress or done, its status is returned together with the "COPY_VIM_INFO" mark.
    :param ro_task: ro_task entry
    :param task_index: position inside ro_task["tasks"] of the CREATE task
    :param task_depends: passed to the new() method of the item class
    :param db_update: not used
    :return: (None, None) when the task is not SCHEDULED, (status, "COPY_VIM_INFO") when already
        created by other task, otherwise the result of the item class new(), or
        ("FAILED", error info) if it raises
    """
    my_task = ro_task["tasks"][task_index]
    task_id = my_task["task_id"]

    if my_task["status"] != "SCHEDULED":
        # FAILED tasks are not retried (TODO need to be retry??); others have nothing to do
        return None, None

    # check if already created by another task
    for index, task in enumerate(ro_task["tasks"]):
        if index == task_index or not task:
            continue  # own task

        if task["action"] != "CREATE":
            continue

        if task["status"] not in ("SCHEDULED", "FINISHED", "SUPERSEDED"):
            return task["status"], "COPY_VIM_INFO"

    try:
        item_class = self.item2class[my_task["item"]]
        task_status, ro_vim_item_update = item_class.new(
            ro_task, task_index, task_depends
        )
        # TODO update other CREATE tasks
    except Exception as e:
        if not isinstance(e, NsWorkerException):
            self.logger.error(
                "Error executing task={}: {}".format(task_id, e), exc_info=True
            )

        task_status = "FAILED"
        ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)}
        # TODO update ro_vim_item_update

    return task_status, ro_vim_item_update
2154
2155 def _get_dependency(self, task_id, ro_task=None, target_id=None):
2156 """
2157 Look for dependency task
2158 :param task_id: Can be one of
2159 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2160 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2161 3. task.task_id: "<action_id>:number"
2162 :param ro_task:
2163 :param target_id:
2164 :return: database ro_task plus index of task
2165 """
2166 if (
2167 task_id.startswith("vim:")
2168 or task_id.startswith("sdn:")
2169 or task_id.startswith("wim:")
2170 ):
2171 target_id, _, task_id = task_id.partition(" ")
2172
2173 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
2174 ro_task_dependency = self.db.get_one(
2175 "ro_tasks",
2176 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
2177 fail_on_empty=False,
2178 )
2179
2180 if ro_task_dependency:
2181 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2182 if task["target_record_id"] == task_id:
2183 return ro_task_dependency, task_index
2184
2185 else:
2186 if ro_task:
2187 for task_index, task in enumerate(ro_task["tasks"]):
2188 if task and task["task_id"] == task_id:
2189 return ro_task, task_index
2190
2191 ro_task_dependency = self.db.get_one(
2192 "ro_tasks",
2193 q_filter={
2194 "tasks.ANYINDEX.task_id": task_id,
2195 "tasks.ANYINDEX.target_record.ne": None,
2196 },
2197 fail_on_empty=False,
2198 )
2199
2200 self.logger.debug("ro_task_dependency={}".format(ro_task_dependency))
2201 if ro_task_dependency:
2202 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2203 if task["task_id"] == task_id:
2204 return ro_task_dependency, task_index
2205 raise NsWorkerException("Cannot get depending task {}".format(task_id))
2206
def update_vm_refresh(self, ro_task):
    """Re-enable the VM status refresh of the ro_tasks that have it disabled.

    Nothing is done when self._get_next_refresh() returns -1 for the given ro_task. Otherwise
    every "ro_tasks" entry with a task in "DONE" status and a negative "to_check_at" gets a new
    "vim_info.refresh_at" and "to_check_at" at the database.
    Any exception is logged and not propagated.

    Args:
        ro_task (dict): ro_task details, passed to self._get_next_refresh()
    """
    try:
        self.logger.debug("Checking if VM status update config")
        next_refresh = self._get_next_refresh(ro_task, time.time())
        if next_refresh == -1:
            return

        # check again in one day at most
        one_day_ahead = time.time() + (24 * 60 * 60)
        db_ro_task_update = {
            "vim_info.refresh_at": next_refresh,
            "to_check_at": min(one_day_ahead, next_refresh),
        }

        self.logger.debug(
            "Finding tasks which to be updated to enable VM status updates"
        )
        disabled_ro_tasks = self.db.get_list(
            "ro_tasks",
            q_filter={"tasks.status": "DONE", "to_check_at.lt": 0},
        )
        self.logger.debug("Updating tasks to change the to_check_at status")
        for disabled_ro_task in disabled_ro_tasks:
            self.db.set_one(
                "ro_tasks",
                q_filter={"_id": disabled_ro_task["_id"]},
                update_dict=db_ro_task_update,
                fail_on_empty=True,
            )

    except Exception as e:
        self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
2249
2250 def _get_next_refresh(self, ro_task: dict, next_refresh: float):
2251 """Decide the next_refresh according to vim type and refresh config period.
2252 Args:
2253 ro_task (dict): ro_task details
2254 next_refresh (float): next refresh time as epoch format
2255
2256 Returns:
2257 next_refresh (float) -1 if vm updates are disabled or vim type is openstack.
2258 """
2259 target_vim = ro_task["target_id"]
2260 vim_type = self.db_vims[target_vim]["vim_type"]
2261 if self.refresh_config.active == -1 or vim_type == "openstack":
2262 next_refresh = -1
2263 else:
2264 next_refresh += self.refresh_config.active
2265 return next_refresh
2266
def _process_pending_tasks(self, ro_task):
    """
    Process every pending task of a ro_task and store the result at database.

    Tasks are visited by action in the order DELETE, CREATE, EXEC. For each selected task:
      - if it is SCHEDULED, its "depends_on" tasks are resolved with self._get_dependency();
        a dependency still SCHEDULED postpones the task, a FAILED one makes the task fail
      - the action is run (self._delete_task(), or exec()/new()/refresh() of the item class)
      - the resulting status and vim_info are accumulated in one database update and the
        target record is updated with self._update_target()
    Finally the ro_task is unlocked and written to "ro_tasks" together with the new "to_check_at".
    Exceptions are logged and not propagated.
    :param ro_task: ro_task content, as obtained from database. Its "vim_info" and the "status"
        of its "tasks" are modified in place
    :return: None
    """
    ro_task_id = ro_task["_id"]
    now = time.time()
    # one day
    next_check_at = now + (24 * 60 * 60)
    db_ro_task_update = {}

    def _update_refresh(new_status):
        # compute next_refresh for the task being processed, store it at ro_task and at the
        # database update, and bring next_check_at forward if needed
        nonlocal task
        nonlocal next_check_at
        nonlocal db_ro_task_update
        nonlocal ro_task

        next_refresh = time.time()

        if task["item"] in ("image", "flavor"):
            next_refresh += self.refresh_config.image
        elif new_status == "BUILD":
            next_refresh += self.refresh_config.build
        elif new_status == "DONE":
            next_refresh = self._get_next_refresh(ro_task, next_refresh)
        else:
            next_refresh += self.refresh_config.error

        next_check_at = min(next_check_at, next_refresh)
        db_ro_task_update["vim_info.refresh_at"] = next_refresh
        ro_task["vim_info"]["refresh_at"] = next_refresh

    try:
        """
        # Log RO tasks only when loglevel is DEBUG
        if self.logger.getEffectiveLevel() == logging.DEBUG:
            self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
        """
        # Check if vim status refresh is enabled again
        self.update_vm_refresh(ro_task)
        # 0: get task_status_create
        lock_object = None
        task_status_create = None
        task_create = next(
            (
                t
                for t in ro_task["tasks"]
                if t
                and t["action"] == "CREATE"
                and t["status"] in ("BUILD", "DONE")
            ),
            None,
        )

        if task_create:
            task_status_create = task_create["status"]

        # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
        for task_action in ("DELETE", "CREATE", "EXEC"):
            db_vim_update = None
            new_status = None

            for task_index, task in enumerate(ro_task["tasks"]):
                if not task:
                    continue  # task deleted

                task_depends = {}
                target_update = None

                # skip tasks of other actions and tasks that do not need processing
                if (
                    (
                        task_action in ("DELETE", "EXEC")
                        and task["status"] not in ("SCHEDULED", "BUILD")
                    )
                    or task["action"] != task_action
                    or (
                        task_action == "CREATE"
                        and task["status"] in ("FINISHED", "SUPERSEDED")
                    )
                ):
                    continue

                task_path = "tasks.{}.status".format(task_index)
                try:
                    db_vim_info_update = None
                    dependency_ro_task = {}

                    if task["status"] == "SCHEDULED":
                        # check if tasks that this depends on have been completed
                        dependency_not_completed = False

                        for dependency_task_id in task.get("depends_on") or ():
                            (
                                dependency_ro_task,
                                dependency_task_index,
                            ) = self._get_dependency(
                                dependency_task_id, target_id=ro_task["target_id"]
                            )
                            dependency_task = dependency_ro_task["tasks"][
                                dependency_task_index
                            ]
                            self.logger.debug(
                                "dependency_ro_task={} dependency_task_index={}".format(
                                    dependency_ro_task, dependency_task_index
                                )
                            )

                            if dependency_task["status"] == "SCHEDULED":
                                dependency_not_completed = True
                                next_check_at = min(
                                    next_check_at, dependency_ro_task["to_check_at"]
                                )
                                # must allow dependent task to be processed first
                                # to do this set time after last_task_processed
                                next_check_at = max(
                                    self.time_last_task_processed, next_check_at
                                )
                                break
                            elif dependency_task["status"] == "FAILED":
                                error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
                                    task["action"],
                                    task["item"],
                                    dependency_task["action"],
                                    dependency_task["item"],
                                    dependency_task_id,
                                    dependency_ro_task["vim_info"].get(
                                        "vim_message"
                                    ),
                                )
                                self.logger.error(
                                    "task={} {}".format(task["task_id"], error_text)
                                )
                                raise NsWorkerException(error_text)

                            # the vim_id of the dependency is provided under two keys
                            task_depends[dependency_task_id] = dependency_ro_task[
                                "vim_info"
                            ]["vim_id"]
                            task_depends[
                                "TASK-{}".format(dependency_task_id)
                            ] = dependency_ro_task["vim_info"]["vim_id"]

                        if dependency_not_completed:
                            self.logger.warning(
                                "DEPENDENCY NOT COMPLETED {}".format(
                                    dependency_ro_task["vim_info"]["vim_id"]
                                )
                            )
                            # TODO set at vim_info.vim_details that it is waiting
                            continue

                    # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
                    # the task of renew this locking. It will update database locket_at periodically
                    if not lock_object:
                        lock_object = LockRenew.add_lock_object(
                            "ro_tasks", ro_task, self
                        )
                    if task["action"] == "DELETE":
                        (
                            new_status,
                            db_vim_info_update,
                        ) = self._delete_task(
                            ro_task, task_index, task_depends, db_ro_task_update
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if new_status in ("FINISHED", "SUPERSEDED"):
                            target_update = "DELETE"
                    elif task["action"] == "EXEC":
                        (
                            new_status,
                            db_vim_info_update,
                            db_task_update,
                        ) = self.item2class[task["item"]].exec(
                            ro_task, task_index, task_depends
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if db_task_update:
                            # load into database the modified db_task_update "retries" and "next_retry"
                            if db_task_update.get("retries"):
                                db_ro_task_update[
                                    "tasks.{}.retries".format(task_index)
                                ] = db_task_update["retries"]

                            next_check_at = time.time() + db_task_update.get(
                                "next_retry", 60
                            )
                        target_update = None
                    elif task["action"] == "CREATE":
                        if task["status"] == "SCHEDULED":
                            if task_status_create:
                                # already created by another task: reuse its status and vim_info
                                new_status = task_status_create
                                target_update = "COPY_VIM_INFO"
                            else:
                                new_status, db_vim_info_update = self.item2class[
                                    task["item"]
                                ].new(ro_task, task_index, task_depends)
                                _update_refresh(new_status)
                        else:
                            refresh_at = ro_task["vim_info"]["refresh_at"]
                            if refresh_at and refresh_at != -1 and now > refresh_at:
                                (
                                    new_status,
                                    db_vim_info_update,
                                ) = self.item2class[
                                    task["item"]
                                ].refresh(ro_task)
                                _update_refresh(new_status)
                            else:
                                # The refresh is updated to avoid set the value of "refresh_at" to
                                # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
                                # because it can happen that in this case the task is never processed
                                _update_refresh(task["status"])

                except Exception as e:
                    new_status = "FAILED"
                    db_vim_info_update = {
                        "vim_status": "VIM_ERROR",
                        "vim_message": str(e),
                    }

                    # NsWorkerException and VimConnException are expected errors; anything else
                    # is logged with its traceback
                    if not isinstance(
                        e, (NsWorkerException, vimconn.VimConnException)
                    ):
                        self.logger.error(
                            "Unexpected exception at _process_pending_tasks task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

                try:
                    if db_vim_info_update:
                        db_vim_update = db_vim_info_update.copy()
                        db_ro_task_update.update(
                            {
                                "vim_info." + k: v
                                for k, v in db_vim_info_update.items()
                            }
                        )
                        ro_task["vim_info"].update(db_vim_info_update)

                    if new_status:
                        if task_action == "CREATE":
                            task_status_create = new_status
                        db_ro_task_update[task_path] = new_status

                    if target_update or db_vim_update:
                        if target_update == "DELETE":
                            self._update_target(task, None)
                        elif target_update == "COPY_VIM_INFO":
                            self._update_target(task, ro_task["vim_info"])
                        else:
                            self._update_target(task, db_vim_update)

                except Exception as e:
                    if (
                        isinstance(e, DbException)
                        and e.http_code == HTTPStatus.NOT_FOUND
                    ):
                        # if the vnfrs or nsrs has been removed from database, this task must be removed
                        self.logger.debug(
                            "marking to delete task={}".format(task["task_id"])
                        )
                        self.tasks_to_delete.append(task)
                    else:
                        self.logger.error(
                            "Unexpected exception at _update_target task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

        locked_at = ro_task["locked_at"]

        if lock_object:
            locked_at = [
                lock_object["locked_at"],
                lock_object["locked_at"] + self.task_locked_time,
            ]
            # locked_at contains two times to avoid race condition. In case the lock has been renewed, it will
            # contain exactly locked_at + self.task_locked_time
            LockRenew.remove_lock_object(lock_object)

        q_filter = {
            "_id": ro_task["_id"],
            "to_check_at": ro_task["to_check_at"],
            "locked_at": locked_at,
        }
        # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
        # outside this task (by ro_nbi) do not update it
        db_ro_task_update["locked_by"] = None
        # locked_at converted to int only for debugging. When it is not decimals it means it has been unlocked
        db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
        db_ro_task_update["modified_at"] = now
        db_ro_task_update["to_check_at"] = next_check_at

        """
        # Log RO tasks only when loglevel is DEBUG
        if self.logger.getEffectiveLevel() == logging.DEBUG:
            db_ro_task_update_log = db_ro_task_update.copy()
            db_ro_task_update_log["_id"] = q_filter["_id"]
            self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
        """

        if not self.db.set_one(
            "ro_tasks",
            update_dict=db_ro_task_update,
            q_filter=q_filter,
            fail_on_empty=False,
        ):
            # nothing matched: retry without touching nor filtering by to_check_at
            del db_ro_task_update["to_check_at"]
            del q_filter["to_check_at"]
            """
            # Log RO tasks only when loglevel is DEBUG
            if self.logger.getEffectiveLevel() == logging.DEBUG:
                self._log_ro_task(
                    None,
                    db_ro_task_update_log,
                    None,
                    "TASK_WF",
                    "SET_TASK " + str(q_filter),
                )
            """
            self.db.set_one(
                "ro_tasks",
                q_filter=q_filter,
                update_dict=db_ro_task_update,
                fail_on_empty=True,
            )
    except DbException as e:
        self.logger.error(
            "ro_task={} Error updating database {}".format(ro_task_id, e)
        )
    except Exception as e:
        self.logger.error(
            "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
        )
2608
2609 def _update_target(self, task, ro_vim_item_update):
2610 table, _, temp = task["target_record"].partition(":")
2611 _id, _, path_vim_status = temp.partition(":")
2612 path_item = path_vim_status[: path_vim_status.rfind(".")]
2613 path_item = path_item[: path_item.rfind(".")]
2614 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2615 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2616
2617 if ro_vim_item_update:
2618 update_dict = {
2619 path_vim_status + "." + k: v
2620 for k, v in ro_vim_item_update.items()
2621 if k
2622 in (
2623 "vim_id",
2624 "vim_details",
2625 "vim_message",
2626 "vim_name",
2627 "vim_status",
2628 "interfaces",
2629 "interfaces_backup",
2630 )
2631 }
2632
2633 if path_vim_status.startswith("vdur."):
2634 # for backward compatibility, add vdur.name apart from vdur.vim_name
2635 if ro_vim_item_update.get("vim_name"):
2636 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2637
2638 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2639 if ro_vim_item_update.get("vim_id"):
2640 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2641
2642 # update general status
2643 if ro_vim_item_update.get("vim_status"):
2644 update_dict[path_item + ".status"] = ro_vim_item_update[
2645 "vim_status"
2646 ]
2647
2648 if ro_vim_item_update.get("interfaces"):
2649 path_interfaces = path_item + ".interfaces"
2650
2651 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2652 if iface:
2653 update_dict.update(
2654 {
2655 path_interfaces + ".{}.".format(i) + k: v
2656 for k, v in iface.items()
2657 if k in ("vlan", "compute_node", "pci")
2658 }
2659 )
2660
2661 # put ip_address and mac_address with ip-address and mac-address
2662 if iface.get("ip_address"):
2663 update_dict[
2664 path_interfaces + ".{}.".format(i) + "ip-address"
2665 ] = iface["ip_address"]
2666
2667 if iface.get("mac_address"):
2668 update_dict[
2669 path_interfaces + ".{}.".format(i) + "mac-address"
2670 ] = iface["mac_address"]
2671
2672 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2673 update_dict["ip-address"] = iface.get("ip_address").split(
2674 ";"
2675 )[0]
2676
2677 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2678 update_dict[path_item + ".ip-address"] = iface.get(
2679 "ip_address"
2680 ).split(";")[0]
2681
2682 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2683
2684 # If interfaces exists, it backups VDU interfaces in the DB for healing operations
2685 if ro_vim_item_update.get("interfaces"):
2686 search_key = path_vim_status + ".interfaces"
2687 if update_dict.get(search_key):
2688 interfaces_backup_update = {
2689 path_vim_status + ".interfaces_backup": update_dict[search_key]
2690 }
2691
2692 self.db.set_one(
2693 table,
2694 q_filter={"_id": _id},
2695 update_dict=interfaces_backup_update,
2696 )
2697
2698 else:
2699 update_dict = {path_item + ".status": "DELETED"}
2700 self.db.set_one(
2701 table,
2702 q_filter={"_id": _id},
2703 update_dict=update_dict,
2704 unset={path_vim_status: None},
2705 )
2706
2707 def _process_delete_db_tasks(self):
2708 """
2709 Delete task from database because vnfrs or nsrs or both have been deleted
2710 :return: None. Uses and modify self.tasks_to_delete
2711 """
2712 while self.tasks_to_delete:
2713 task = self.tasks_to_delete[0]
2714 vnfrs_deleted = None
2715 nsr_id = task["nsr_id"]
2716
2717 if task["target_record"].startswith("vnfrs:"):
2718 # check if nsrs is present
2719 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2720 vnfrs_deleted = task["target_record"].split(":")[1]
2721
2722 try:
2723 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2724 except Exception as e:
2725 self.logger.error(
2726 "Error deleting task={}: {}".format(task["task_id"], e)
2727 )
2728 self.tasks_to_delete.pop(0)
2729
2730 @staticmethod
2731 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2732 """
2733 Static method because it is called from osm_ng_ro.ns
2734 :param db: instance of database to use
2735 :param nsr_id: affected nsrs id
2736 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2737 :return: None, exception is fails
2738 """
2739 retries = 5
2740 for retry in range(retries):
2741 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2742 now = time.time()
2743 conflict = False
2744
2745 for ro_task in ro_tasks:
2746 db_update = {}
2747 to_delete_ro_task = True
2748
2749 for index, task in enumerate(ro_task["tasks"]):
2750 if not task:
2751 pass
2752 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2753 vnfrs_deleted
2754 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2755 ):
2756 db_update["tasks.{}".format(index)] = None
2757 else:
2758 # used by other nsr, ro_task cannot be deleted
2759 to_delete_ro_task = False
2760
2761 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2762 if to_delete_ro_task:
2763 if not db.del_one(
2764 "ro_tasks",
2765 q_filter={
2766 "_id": ro_task["_id"],
2767 "modified_at": ro_task["modified_at"],
2768 },
2769 fail_on_empty=False,
2770 ):
2771 conflict = True
2772 elif db_update:
2773 db_update["modified_at"] = now
2774 if not db.set_one(
2775 "ro_tasks",
2776 q_filter={
2777 "_id": ro_task["_id"],
2778 "modified_at": ro_task["modified_at"],
2779 },
2780 update_dict=db_update,
2781 fail_on_empty=False,
2782 ):
2783 conflict = True
2784 if not conflict:
2785 return
2786 else:
2787 raise NsWorkerException("Exceeded {} retries".format(retries))
2788
def run(self):
    """
    Main loop of the thread.

    Each iteration first attends one command of self.task_queue ("terminate", "load_vim",
    "unload_vim", "reload_vim" or "check_vim"). When a command has been attended the loop starts
    again; when the queue is empty one pending ro_task from database is processed, if any.
    The queue is read in blocking mode while there are no vim targets loaded.
    The loop only finishes with the "terminate" command; exceptions are logged.
    """
    # load database
    self.logger.info("Starting")
    while True:
        # step 1: get commands from queue
        try:
            if self.vim_targets:
                task = self.task_queue.get(block=False)
            else:
                if not self.idle:
                    self.logger.debug("enters in idle state")
                self.idle = True
                task = self.task_queue.get(block=True)
                self.idle = False

            command = task[0]
            if command == "terminate":
                break

            if command == "load_vim":
                self.logger.info("order to load vim {}".format(task[1]))
                self._load_vim(task[1])
            elif command == "unload_vim":
                self.logger.info("order to unload vim {}".format(task[1]))
                self._unload_vim(task[1])
            elif command == "reload_vim":
                self._reload_vim(task[1])
            elif command == "check_vim":
                self.logger.info("order to check vim {}".format(task[1]))
                self._check_vim(task[1])
            # a command has been attended: look for the next one before processing tasks
            continue
        except queue.Empty:
            # no pending commands, go on with the pending tasks
            pass
        except Exception as e:
            self.logger.critical(
                "Error processing task: {}".format(e), exc_info=True
            )

        # step 2: process pending_tasks, delete not needed tasks
        try:
            if self.tasks_to_delete:
                self._process_delete_db_tasks()

            # Disabled debugging aid: log all the RO tasks when loglevel is DEBUG
            #     _ = self._get_db_all_tasks()
            ro_task = self._get_db_task()
            if ro_task:
                self.logger.debug("Task to process: {}".format(ro_task))
                time.sleep(1)
                self._process_pending_tasks(ro_task)
            else:
                # nothing to do, wait before polling again
                time.sleep(5)
        except Exception as e:
            self.logger.critical(
                "Unexpected exception at run: " + str(e), exc_info=True
            )

    self.logger.info("Finishing")