Fix bug 2157 to resolve issues with IETF L2VPN WIM connector
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 import traceback
36 from typing import Dict
37 from unittest.mock import Mock
38
39 from importlib_metadata import entry_points
40 from osm_common.dbbase import DbException
41 from osm_ng_ro.vim_admin import LockRenew
42 from osm_ro_plugin import sdnconn, vimconn
43 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
44 from osm_ro_plugin.vim_dummy import VimDummyConnector
45 import yaml
46
47 __author__ = "Alfonso Tierno"
48 __date__ = "$28-Sep-2017 12:07:15$"
49
50
51 def deep_get(target_dict, *args, **kwargs):
52 """
53 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
54 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
55 :param target_dict: dictionary to be read
56 :param args: list of keys to read from target_dict
57 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
58 :return: The wanted value if exist, None or default otherwise
59 """
60 for key in args:
61 if not isinstance(target_dict, dict) or key not in target_dict:
62 return kwargs.get("default")
63 target_dict = target_dict[key]
64 return target_dict
65
66
67 class NsWorkerException(Exception):
68 pass
69
70
71 class FailingConnector:
72 def __init__(self, error_msg):
73 self.error_msg = error_msg
74
75 for method in dir(vimconn.VimConnector):
76 if method[0] != "_":
77 setattr(
78 self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
79 )
80
81 for method in dir(sdnconn.SdnConnectorBase):
82 if method[0] != "_":
83 setattr(
84 self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
85 )
86
87
88 class NsWorkerExceptionNotFound(NsWorkerException):
89 pass
90
91
92 class VimInteractionBase:
93 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
94 It implements methods that does nothing and return ok"""
95
96 def __init__(self, db, my_vims, db_vims, logger):
97 self.db = db
98 self.logger = logger
99 self.my_vims = my_vims
100 self.db_vims = db_vims
101
102 def new(self, ro_task, task_index, task_depends):
103 return "BUILD", {}
104
105 def refresh(self, ro_task):
106 """skip calling VIM to get image, flavor status. Assumes ok"""
107 if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
108 return "FAILED", {}
109
110 return "DONE", {}
111
112 def delete(self, ro_task, task_index):
113 """skip calling VIM to delete image. Assumes ok"""
114 return "DONE", {}
115
116 def exec(self, ro_task, task_index, task_depends):
117 return "DONE", None, None
118
119
120 class VimInteractionNet(VimInteractionBase):
121 def new(self, ro_task, task_index, task_depends):
122 vim_net_id = None
123 task = ro_task["tasks"][task_index]
124 task_id = task["task_id"]
125 created = False
126 created_items = {}
127 target_vim = self.my_vims[ro_task["target_id"]]
128 mgmtnet = False
129 mgmtnet_defined_in_vim = False
130
131 try:
132 # FIND
133 if task.get("find_params"):
134 # if management, get configuration of VIM
135 if task["find_params"].get("filter_dict"):
136 vim_filter = task["find_params"]["filter_dict"]
137 # management network
138 elif task["find_params"].get("mgmt"):
139 mgmtnet = True
140 if deep_get(
141 self.db_vims[ro_task["target_id"]],
142 "config",
143 "management_network_id",
144 ):
145 mgmtnet_defined_in_vim = True
146 vim_filter = {
147 "id": self.db_vims[ro_task["target_id"]]["config"][
148 "management_network_id"
149 ]
150 }
151 elif deep_get(
152 self.db_vims[ro_task["target_id"]],
153 "config",
154 "management_network_name",
155 ):
156 mgmtnet_defined_in_vim = True
157 vim_filter = {
158 "name": self.db_vims[ro_task["target_id"]]["config"][
159 "management_network_name"
160 ]
161 }
162 else:
163 vim_filter = {"name": task["find_params"]["name"]}
164 else:
165 raise NsWorkerExceptionNotFound(
166 "Invalid find_params for new_net {}".format(task["find_params"])
167 )
168
169 vim_nets = target_vim.get_network_list(vim_filter)
170 if not vim_nets and not task.get("params"):
171 # If there is mgmt-network in the descriptor,
172 # there is no mapping of that network to a VIM network in the descriptor,
173 # also there is no mapping in the "--config" parameter or at VIM creation;
174 # that mgmt-network will be created.
175 if mgmtnet and not mgmtnet_defined_in_vim:
176 net_name = (
177 vim_filter.get("name")
178 if vim_filter.get("name")
179 else vim_filter.get("id")[:16]
180 )
181 vim_net_id, created_items = target_vim.new_network(
182 net_name, None
183 )
184 self.logger.debug(
185 "Created mgmt network vim_net_id: {}".format(vim_net_id)
186 )
187 created = True
188 else:
189 raise NsWorkerExceptionNotFound(
190 "Network not found with this criteria: '{}'".format(
191 task.get("find_params")
192 )
193 )
194 elif len(vim_nets) > 1:
195 raise NsWorkerException(
196 "More than one network found with this criteria: '{}'".format(
197 task["find_params"]
198 )
199 )
200
201 if vim_nets:
202 vim_net_id = vim_nets[0]["id"]
203 else:
204 # CREATE
205 params = task["params"]
206 vim_net_id, created_items = target_vim.new_network(**params)
207 created = True
208
209 ro_vim_item_update = {
210 "vim_id": vim_net_id,
211 "vim_status": "BUILD",
212 "created": created,
213 "created_items": created_items,
214 "vim_details": None,
215 "vim_message": None,
216 }
217 self.logger.debug(
218 "task={} {} new-net={} created={}".format(
219 task_id, ro_task["target_id"], vim_net_id, created
220 )
221 )
222
223 return "BUILD", ro_vim_item_update
224 except (vimconn.VimConnException, NsWorkerException) as e:
225 self.logger.error(
226 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
227 )
228 ro_vim_item_update = {
229 "vim_status": "VIM_ERROR",
230 "created": created,
231 "vim_message": str(e),
232 }
233
234 return "FAILED", ro_vim_item_update
235
236 def refresh(self, ro_task):
237 """Call VIM to get network status"""
238 ro_task_id = ro_task["_id"]
239 target_vim = self.my_vims[ro_task["target_id"]]
240 vim_id = ro_task["vim_info"]["vim_id"]
241 net_to_refresh_list = [vim_id]
242
243 try:
244 vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
245 vim_info = vim_dict[vim_id]
246
247 if vim_info["status"] == "ACTIVE":
248 task_status = "DONE"
249 elif vim_info["status"] == "BUILD":
250 task_status = "BUILD"
251 else:
252 task_status = "FAILED"
253 except vimconn.VimConnException as e:
254 # Mark all tasks at VIM_ERROR status
255 self.logger.error(
256 "ro_task={} vim={} get-net={}: {}".format(
257 ro_task_id, ro_task["target_id"], vim_id, e
258 )
259 )
260 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
261 task_status = "FAILED"
262
263 ro_vim_item_update = {}
264 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
265 ro_vim_item_update["vim_status"] = vim_info["status"]
266
267 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
268 ro_vim_item_update["vim_name"] = vim_info.get("name")
269
270 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
271 if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
272 ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
273 elif vim_info["status"] == "DELETED":
274 ro_vim_item_update["vim_id"] = None
275 ro_vim_item_update["vim_message"] = "Deleted externally"
276 else:
277 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
278 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
279
280 if ro_vim_item_update:
281 self.logger.debug(
282 "ro_task={} {} get-net={}: status={} {}".format(
283 ro_task_id,
284 ro_task["target_id"],
285 vim_id,
286 ro_vim_item_update.get("vim_status"),
287 ro_vim_item_update.get("vim_message")
288 if ro_vim_item_update.get("vim_status") != "ACTIVE"
289 else "",
290 )
291 )
292
293 return task_status, ro_vim_item_update
294
295 def delete(self, ro_task, task_index):
296 task = ro_task["tasks"][task_index]
297 task_id = task["task_id"]
298 net_vim_id = ro_task["vim_info"]["vim_id"]
299 ro_vim_item_update_ok = {
300 "vim_status": "DELETED",
301 "created": False,
302 "vim_message": "DELETED",
303 "vim_id": None,
304 }
305
306 try:
307 if net_vim_id or ro_task["vim_info"]["created_items"]:
308 target_vim = self.my_vims[ro_task["target_id"]]
309 target_vim.delete_network(
310 net_vim_id, ro_task["vim_info"]["created_items"]
311 )
312 except vimconn.VimConnNotFoundException:
313 ro_vim_item_update_ok["vim_message"] = "already deleted"
314 except vimconn.VimConnException as e:
315 self.logger.error(
316 "ro_task={} vim={} del-net={}: {}".format(
317 ro_task["_id"], ro_task["target_id"], net_vim_id, e
318 )
319 )
320 ro_vim_item_update = {
321 "vim_status": "VIM_ERROR",
322 "vim_message": "Error while deleting: {}".format(e),
323 }
324
325 return "FAILED", ro_vim_item_update
326
327 self.logger.debug(
328 "task={} {} del-net={} {}".format(
329 task_id,
330 ro_task["target_id"],
331 net_vim_id,
332 ro_vim_item_update_ok.get("vim_message", ""),
333 )
334 )
335
336 return "DONE", ro_vim_item_update_ok
337
338
339 class VimInteractionVdu(VimInteractionBase):
340 max_retries_inject_ssh_key = 20 # 20 times
341 time_retries_inject_ssh_key = 30 # wevery 30 seconds
342
343 def new(self, ro_task, task_index, task_depends):
344 task = ro_task["tasks"][task_index]
345 task_id = task["task_id"]
346 created = False
347 created_items = {}
348 target_vim = self.my_vims[ro_task["target_id"]]
349
350 try:
351 created = True
352 params = task["params"]
353 params_copy = deepcopy(params)
354 net_list = params_copy["net_list"]
355
356 for net in net_list:
357 # change task_id into network_id
358 if "net_id" in net and net["net_id"].startswith("TASK-"):
359 network_id = task_depends[net["net_id"]]
360
361 if not network_id:
362 raise NsWorkerException(
363 "Cannot create VM because depends on a network not created or found "
364 "for {}".format(net["net_id"])
365 )
366
367 net["net_id"] = network_id
368
369 if params_copy["image_id"].startswith("TASK-"):
370 params_copy["image_id"] = task_depends[params_copy["image_id"]]
371
372 if params_copy["flavor_id"].startswith("TASK-"):
373 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
374
375 affinity_group_list = params_copy["affinity_group_list"]
376 for affinity_group in affinity_group_list:
377 # change task_id into affinity_group_id
378 if "affinity_group_id" in affinity_group and affinity_group[
379 "affinity_group_id"
380 ].startswith("TASK-"):
381 affinity_group_id = task_depends[
382 affinity_group["affinity_group_id"]
383 ]
384
385 if not affinity_group_id:
386 raise NsWorkerException(
387 "found for {}".format(affinity_group["affinity_group_id"])
388 )
389
390 affinity_group["affinity_group_id"] = affinity_group_id
391
392 vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
393 interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
394
395 # add to created items previous_created_volumes (healing)
396 if task.get("previous_created_volumes"):
397 for k, v in task["previous_created_volumes"].items():
398 created_items[k] = v
399
400 ro_vim_item_update = {
401 "vim_id": vim_vm_id,
402 "vim_status": "BUILD",
403 "created": created,
404 "created_items": created_items,
405 "vim_details": None,
406 "vim_message": None,
407 "interfaces_vim_ids": interfaces,
408 "interfaces": [],
409 "interfaces_backup": [],
410 }
411 self.logger.debug(
412 "task={} {} new-vm={} created={}".format(
413 task_id, ro_task["target_id"], vim_vm_id, created
414 )
415 )
416
417 return "BUILD", ro_vim_item_update
418 except (vimconn.VimConnException, NsWorkerException) as e:
419 self.logger.debug(traceback.format_exc())
420 self.logger.error(
421 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
422 )
423 ro_vim_item_update = {
424 "vim_status": "VIM_ERROR",
425 "created": created,
426 "vim_message": str(e),
427 }
428
429 return "FAILED", ro_vim_item_update
430
431 def delete(self, ro_task, task_index):
432 task = ro_task["tasks"][task_index]
433 task_id = task["task_id"]
434 vm_vim_id = ro_task["vim_info"]["vim_id"]
435 ro_vim_item_update_ok = {
436 "vim_status": "DELETED",
437 "created": False,
438 "vim_message": "DELETED",
439 "vim_id": None,
440 }
441
442 try:
443 self.logger.debug(
444 "delete_vminstance: vm_vim_id={} created_items={}".format(
445 vm_vim_id, ro_task["vim_info"]["created_items"]
446 )
447 )
448 if vm_vim_id or ro_task["vim_info"]["created_items"]:
449 target_vim = self.my_vims[ro_task["target_id"]]
450 target_vim.delete_vminstance(
451 vm_vim_id,
452 ro_task["vim_info"]["created_items"],
453 ro_task["vim_info"].get("volumes_to_hold", []),
454 )
455 except vimconn.VimConnNotFoundException:
456 ro_vim_item_update_ok["vim_message"] = "already deleted"
457 except vimconn.VimConnException as e:
458 self.logger.error(
459 "ro_task={} vim={} del-vm={}: {}".format(
460 ro_task["_id"], ro_task["target_id"], vm_vim_id, e
461 )
462 )
463 ro_vim_item_update = {
464 "vim_status": "VIM_ERROR",
465 "vim_message": "Error while deleting: {}".format(e),
466 }
467
468 return "FAILED", ro_vim_item_update
469
470 self.logger.debug(
471 "task={} {} del-vm={} {}".format(
472 task_id,
473 ro_task["target_id"],
474 vm_vim_id,
475 ro_vim_item_update_ok.get("vim_message", ""),
476 )
477 )
478
479 return "DONE", ro_vim_item_update_ok
480
481 def refresh(self, ro_task):
482 """Call VIM to get vm status"""
483 ro_task_id = ro_task["_id"]
484 target_vim = self.my_vims[ro_task["target_id"]]
485 vim_id = ro_task["vim_info"]["vim_id"]
486
487 if not vim_id:
488 return None, None
489
490 vm_to_refresh_list = [vim_id]
491 try:
492 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
493 vim_info = vim_dict[vim_id]
494
495 if vim_info["status"] == "ACTIVE":
496 task_status = "DONE"
497 elif vim_info["status"] == "BUILD":
498 task_status = "BUILD"
499 else:
500 task_status = "FAILED"
501
502 # try to load and parse vim_information
503 try:
504 vim_info_info = yaml.safe_load(vim_info["vim_info"])
505 if vim_info_info.get("name"):
506 vim_info["name"] = vim_info_info["name"]
507 except Exception as vim_info_error:
508 self.logger.exception(
509 f"{vim_info_error} occured while getting the vim_info from yaml"
510 )
511 except vimconn.VimConnException as e:
512 # Mark all tasks at VIM_ERROR status
513 self.logger.error(
514 "ro_task={} vim={} get-vm={}: {}".format(
515 ro_task_id, ro_task["target_id"], vim_id, e
516 )
517 )
518 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
519 task_status = "FAILED"
520
521 ro_vim_item_update = {}
522
523 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
524 vim_interfaces = []
525 if vim_info.get("interfaces"):
526 for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
527 iface = next(
528 (
529 iface
530 for iface in vim_info["interfaces"]
531 if vim_iface_id == iface["vim_interface_id"]
532 ),
533 None,
534 )
535 # if iface:
536 # iface.pop("vim_info", None)
537 vim_interfaces.append(iface)
538
539 task_create = next(
540 t
541 for t in ro_task["tasks"]
542 if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
543 )
544 if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
545 vim_interfaces[task_create["mgmt_vnf_interface"]][
546 "mgmt_vnf_interface"
547 ] = True
548
549 mgmt_vdu_iface = task_create.get(
550 "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
551 )
552 if vim_interfaces:
553 vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
554
555 if ro_task["vim_info"]["interfaces"] != vim_interfaces:
556 ro_vim_item_update["interfaces"] = vim_interfaces
557
558 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
559 ro_vim_item_update["vim_status"] = vim_info["status"]
560
561 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
562 ro_vim_item_update["vim_name"] = vim_info.get("name")
563
564 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
565 if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
566 ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
567 elif vim_info["status"] == "DELETED":
568 ro_vim_item_update["vim_id"] = None
569 ro_vim_item_update["vim_message"] = "Deleted externally"
570 else:
571 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
572 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
573
574 if ro_vim_item_update:
575 self.logger.debug(
576 "ro_task={} {} get-vm={}: status={} {}".format(
577 ro_task_id,
578 ro_task["target_id"],
579 vim_id,
580 ro_vim_item_update.get("vim_status"),
581 ro_vim_item_update.get("vim_message")
582 if ro_vim_item_update.get("vim_status") != "ACTIVE"
583 else "",
584 )
585 )
586
587 return task_status, ro_vim_item_update
588
589 def exec(self, ro_task, task_index, task_depends):
590 task = ro_task["tasks"][task_index]
591 task_id = task["task_id"]
592 target_vim = self.my_vims[ro_task["target_id"]]
593 db_task_update = {"retries": 0}
594 retries = task.get("retries", 0)
595
596 try:
597 params = task["params"]
598 params_copy = deepcopy(params)
599 params_copy["ro_key"] = self.db.decrypt(
600 params_copy.pop("private_key"),
601 params_copy.pop("schema_version"),
602 params_copy.pop("salt"),
603 )
604 params_copy["ip_addr"] = params_copy.pop("ip_address")
605 target_vim.inject_user_key(**params_copy)
606 self.logger.debug(
607 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
608 )
609
610 return (
611 "DONE",
612 None,
613 db_task_update,
614 ) # params_copy["key"]
615 except (vimconn.VimConnException, NsWorkerException) as e:
616 retries += 1
617
618 self.logger.debug(traceback.format_exc())
619 if retries < self.max_retries_inject_ssh_key:
620 return (
621 "BUILD",
622 None,
623 {
624 "retries": retries,
625 "next_retry": self.time_retries_inject_ssh_key,
626 },
627 )
628
629 self.logger.error(
630 "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
631 )
632 ro_vim_item_update = {"vim_message": str(e)}
633
634 return "FAILED", ro_vim_item_update, db_task_update
635
636
637 class VimInteractionImage(VimInteractionBase):
638 def new(self, ro_task, task_index, task_depends):
639 task = ro_task["tasks"][task_index]
640 task_id = task["task_id"]
641 created = False
642 created_items = {}
643 target_vim = self.my_vims[ro_task["target_id"]]
644
645 try:
646 # FIND
647 if task.get("find_params"):
648 vim_images = target_vim.get_image_list(**task["find_params"])
649
650 if not vim_images:
651 raise NsWorkerExceptionNotFound(
652 "Image not found with this criteria: '{}'".format(
653 task["find_params"]
654 )
655 )
656 elif len(vim_images) > 1:
657 raise NsWorkerException(
658 "More than one image found with this criteria: '{}'".format(
659 task["find_params"]
660 )
661 )
662 else:
663 vim_image_id = vim_images[0]["id"]
664
665 ro_vim_item_update = {
666 "vim_id": vim_image_id,
667 "vim_status": "DONE",
668 "created": created,
669 "created_items": created_items,
670 "vim_details": None,
671 "vim_message": None,
672 }
673 self.logger.debug(
674 "task={} {} new-image={} created={}".format(
675 task_id, ro_task["target_id"], vim_image_id, created
676 )
677 )
678
679 return "DONE", ro_vim_item_update
680 except (NsWorkerException, vimconn.VimConnException) as e:
681 self.logger.error(
682 "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
683 )
684 ro_vim_item_update = {
685 "vim_status": "VIM_ERROR",
686 "created": created,
687 "vim_message": str(e),
688 }
689
690 return "FAILED", ro_vim_item_update
691
692
693 class VimInteractionFlavor(VimInteractionBase):
694 def delete(self, ro_task, task_index):
695 task = ro_task["tasks"][task_index]
696 task_id = task["task_id"]
697 flavor_vim_id = ro_task["vim_info"]["vim_id"]
698 ro_vim_item_update_ok = {
699 "vim_status": "DELETED",
700 "created": False,
701 "vim_message": "DELETED",
702 "vim_id": None,
703 }
704
705 try:
706 if flavor_vim_id:
707 target_vim = self.my_vims[ro_task["target_id"]]
708 target_vim.delete_flavor(flavor_vim_id)
709 except vimconn.VimConnNotFoundException:
710 ro_vim_item_update_ok["vim_message"] = "already deleted"
711 except vimconn.VimConnException as e:
712 self.logger.error(
713 "ro_task={} vim={} del-flavor={}: {}".format(
714 ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
715 )
716 )
717 ro_vim_item_update = {
718 "vim_status": "VIM_ERROR",
719 "vim_message": "Error while deleting: {}".format(e),
720 }
721
722 return "FAILED", ro_vim_item_update
723
724 self.logger.debug(
725 "task={} {} del-flavor={} {}".format(
726 task_id,
727 ro_task["target_id"],
728 flavor_vim_id,
729 ro_vim_item_update_ok.get("vim_message", ""),
730 )
731 )
732
733 return "DONE", ro_vim_item_update_ok
734
735 def new(self, ro_task, task_index, task_depends):
736 task = ro_task["tasks"][task_index]
737 task_id = task["task_id"]
738 created = False
739 created_items = {}
740 target_vim = self.my_vims[ro_task["target_id"]]
741
742 try:
743 # FIND
744 vim_flavor_id = None
745
746 if task.get("find_params"):
747 try:
748 flavor_data = task["find_params"]["flavor_data"]
749 vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
750 except vimconn.VimConnNotFoundException:
751 self.logger.warning("VimConnNotFoundException occured.")
752
753 if not vim_flavor_id and task.get("params"):
754 # CREATE
755 flavor_data = task["params"]["flavor_data"]
756 vim_flavor_id = target_vim.new_flavor(flavor_data)
757 created = True
758
759 ro_vim_item_update = {
760 "vim_id": vim_flavor_id,
761 "vim_status": "DONE",
762 "created": created,
763 "created_items": created_items,
764 "vim_details": None,
765 "vim_message": None,
766 }
767 self.logger.debug(
768 "task={} {} new-flavor={} created={}".format(
769 task_id, ro_task["target_id"], vim_flavor_id, created
770 )
771 )
772
773 return "DONE", ro_vim_item_update
774 except (vimconn.VimConnException, NsWorkerException) as e:
775 self.logger.error(
776 "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
777 )
778 ro_vim_item_update = {
779 "vim_status": "VIM_ERROR",
780 "created": created,
781 "vim_message": str(e),
782 }
783
784 return "FAILED", ro_vim_item_update
785
786
787 class VimInteractionAffinityGroup(VimInteractionBase):
788 def delete(self, ro_task, task_index):
789 task = ro_task["tasks"][task_index]
790 task_id = task["task_id"]
791 affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
792 ro_vim_item_update_ok = {
793 "vim_status": "DELETED",
794 "created": False,
795 "vim_message": "DELETED",
796 "vim_id": None,
797 }
798
799 try:
800 if affinity_group_vim_id:
801 target_vim = self.my_vims[ro_task["target_id"]]
802 target_vim.delete_affinity_group(affinity_group_vim_id)
803 except vimconn.VimConnNotFoundException:
804 ro_vim_item_update_ok["vim_message"] = "already deleted"
805 except vimconn.VimConnException as e:
806 self.logger.error(
807 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
808 ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
809 )
810 )
811 ro_vim_item_update = {
812 "vim_status": "VIM_ERROR",
813 "vim_message": "Error while deleting: {}".format(e),
814 }
815
816 return "FAILED", ro_vim_item_update
817
818 self.logger.debug(
819 "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
820 task_id,
821 ro_task["target_id"],
822 affinity_group_vim_id,
823 ro_vim_item_update_ok.get("vim_message", ""),
824 )
825 )
826
827 return "DONE", ro_vim_item_update_ok
828
829 def new(self, ro_task, task_index, task_depends):
830 task = ro_task["tasks"][task_index]
831 task_id = task["task_id"]
832 created = False
833 created_items = {}
834 target_vim = self.my_vims[ro_task["target_id"]]
835
836 try:
837 affinity_group_vim_id = None
838 affinity_group_data = None
839
840 if task.get("params"):
841 affinity_group_data = task["params"].get("affinity_group_data")
842
843 if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
844 try:
845 param_affinity_group_id = task["params"]["affinity_group_data"].get(
846 "vim-affinity-group-id"
847 )
848 affinity_group_vim_id = target_vim.get_affinity_group(
849 param_affinity_group_id
850 ).get("id")
851 except vimconn.VimConnNotFoundException:
852 self.logger.error(
853 "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
854 "could not be found at VIM. Creating a new one.".format(
855 task_id, ro_task["target_id"], param_affinity_group_id
856 )
857 )
858
859 if not affinity_group_vim_id and affinity_group_data:
860 affinity_group_vim_id = target_vim.new_affinity_group(
861 affinity_group_data
862 )
863 created = True
864
865 ro_vim_item_update = {
866 "vim_id": affinity_group_vim_id,
867 "vim_status": "DONE",
868 "created": created,
869 "created_items": created_items,
870 "vim_details": None,
871 "vim_message": None,
872 }
873 self.logger.debug(
874 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
875 task_id, ro_task["target_id"], affinity_group_vim_id, created
876 )
877 )
878
879 return "DONE", ro_vim_item_update
880 except (vimconn.VimConnException, NsWorkerException) as e:
881 self.logger.error(
882 "task={} vim={} new-affinity-or-anti-affinity-group:"
883 " {}".format(task_id, ro_task["target_id"], e)
884 )
885 ro_vim_item_update = {
886 "vim_status": "VIM_ERROR",
887 "created": created,
888 "vim_message": str(e),
889 }
890
891 return "FAILED", ro_vim_item_update
892
893
894 class VimInteractionUpdateVdu(VimInteractionBase):
895 def exec(self, ro_task, task_index, task_depends):
896 task = ro_task["tasks"][task_index]
897 task_id = task["task_id"]
898 db_task_update = {"retries": 0}
899 created = False
900 created_items = {}
901 target_vim = self.my_vims[ro_task["target_id"]]
902
903 try:
904 if task.get("params"):
905 vim_vm_id = task["params"].get("vim_vm_id")
906 action = task["params"].get("action")
907 context = {action: action}
908 target_vim.action_vminstance(vim_vm_id, context)
909 # created = True
910 ro_vim_item_update = {
911 "vim_id": vim_vm_id,
912 "vim_status": "DONE",
913 "created": created,
914 "created_items": created_items,
915 "vim_details": None,
916 "vim_message": None,
917 }
918 self.logger.debug(
919 "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
920 )
921 return "DONE", ro_vim_item_update, db_task_update
922 except (vimconn.VimConnException, NsWorkerException) as e:
923 self.logger.error(
924 "task={} vim={} VM Migration:"
925 " {}".format(task_id, ro_task["target_id"], e)
926 )
927 ro_vim_item_update = {
928 "vim_status": "VIM_ERROR",
929 "created": created,
930 "vim_message": str(e),
931 }
932
933 return "FAILED", ro_vim_item_update, db_task_update
934
935
936 class VimInteractionSdnNet(VimInteractionBase):
937 @staticmethod
938 def _match_pci(port_pci, mapping):
939 """
940 Check if port_pci matches with mapping
941 mapping can have brackets to indicate that several chars are accepted. e.g
942 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
943 :param port_pci: text
944 :param mapping: text, can contain brackets to indicate several chars are available
945 :return: True if matches, False otherwise
946 """
947 if not port_pci or not mapping:
948 return False
949 if port_pci == mapping:
950 return True
951
952 mapping_index = 0
953 pci_index = 0
954 while True:
955 bracket_start = mapping.find("[", mapping_index)
956
957 if bracket_start == -1:
958 break
959
960 bracket_end = mapping.find("]", bracket_start)
961 if bracket_end == -1:
962 break
963
964 length = bracket_start - mapping_index
965 if (
966 length
967 and port_pci[pci_index : pci_index + length]
968 != mapping[mapping_index:bracket_start]
969 ):
970 return False
971
972 if (
973 port_pci[pci_index + length]
974 not in mapping[bracket_start + 1 : bracket_end]
975 ):
976 return False
977
978 pci_index += length + 1
979 mapping_index = bracket_end + 1
980
981 if port_pci[pci_index:] != mapping[mapping_index:]:
982 return False
983
984 return True
985
986 def _get_interfaces(self, vlds_to_connect, vim_account_id):
987 """
988 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
989 :param vim_account_id:
990 :return:
991 """
992 interfaces = []
993
994 for vld in vlds_to_connect:
995 table, _, db_id = vld.partition(":")
996 db_id, _, vld = db_id.partition(":")
997 _, _, vld_id = vld.partition(".")
998
999 if table == "vnfrs":
1000 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
1001 iface_key = "vnf-vld-id"
1002 else: # table == "nsrs"
1003 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
1004 iface_key = "ns-vld-id"
1005
1006 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
1007
1008 for db_vnfr in db_vnfrs:
1009 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
1010 for iface_index, interface in enumerate(vdur["interfaces"]):
1011 if interface.get(iface_key) == vld_id and interface.get(
1012 "type"
1013 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
1014 # only SR-IOV o PT
1015 interface_ = interface.copy()
1016 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
1017 db_vnfr["_id"], vdu_index, iface_index
1018 )
1019
1020 if vdur.get("status") == "ERROR":
1021 interface_["status"] = "ERROR"
1022
1023 interfaces.append(interface_)
1024
1025 return interfaces
1026
1027 def refresh(self, ro_task):
1028 # look for task create
1029 task_create_index, _ = next(
1030 i_t
1031 for i_t in enumerate(ro_task["tasks"])
1032 if i_t[1]
1033 and i_t[1]["action"] == "CREATE"
1034 and i_t[1]["status"] != "FINISHED"
1035 )
1036
1037 return self.new(ro_task, task_create_index, None)
1038
1039 def new(self, ro_task, task_index, task_depends):
1040
1041 task = ro_task["tasks"][task_index]
1042 task_id = task["task_id"]
1043 target_vim = self.my_vims[ro_task["target_id"]]
1044
1045 sdn_net_id = ro_task["vim_info"]["vim_id"]
1046
1047 created_items = ro_task["vim_info"].get("created_items")
1048 connected_ports = ro_task["vim_info"].get("connected_ports", [])
1049 new_connected_ports = []
1050 last_update = ro_task["vim_info"].get("last_update", 0)
1051 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
1052 error_list = []
1053 created = ro_task["vim_info"].get("created", False)
1054
1055 try:
1056 # CREATE
1057 params = task["params"]
1058 vlds_to_connect = params.get("vlds", [])
1059 associated_vim = params.get("target_vim")
1060 # external additional ports
1061 additional_ports = params.get("sdn-ports") or ()
1062 _, _, vim_account_id = (
1063 (None, None, None)
1064 if associated_vim is None
1065 else associated_vim.partition(":")
1066 )
1067
1068 if associated_vim:
1069 # get associated VIM
1070 if associated_vim not in self.db_vims:
1071 self.db_vims[associated_vim] = self.db.get_one(
1072 "vim_accounts", {"_id": vim_account_id}
1073 )
1074
1075 db_vim = self.db_vims[associated_vim]
1076
1077 # look for ports to connect
1078 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
1079 # print(ports)
1080
1081 sdn_ports = []
1082 pending_ports = error_ports = 0
1083 vlan_used = None
1084 sdn_need_update = False
1085
1086 for port in ports:
1087 vlan_used = port.get("vlan") or vlan_used
1088
1089 # TODO. Do not connect if already done
1090 if not port.get("compute_node") or not port.get("pci"):
1091 if port.get("status") == "ERROR":
1092 error_ports += 1
1093 else:
1094 pending_ports += 1
1095 continue
1096
1097 pmap = None
1098 compute_node_mappings = next(
1099 (
1100 c
1101 for c in db_vim["config"].get("sdn-port-mapping", ())
1102 if c and c["compute_node"] == port["compute_node"]
1103 ),
1104 None,
1105 )
1106
1107 if compute_node_mappings:
1108 # process port_mapping pci of type 0000:af:1[01].[1357]
1109 pmap = next(
1110 (
1111 p
1112 for p in compute_node_mappings["ports"]
1113 if self._match_pci(port["pci"], p.get("pci"))
1114 ),
1115 None,
1116 )
1117
1118 if not pmap:
1119 if not db_vim["config"].get("mapping_not_needed"):
1120 error_list.append(
1121 "Port mapping not found for compute_node={} pci={}".format(
1122 port["compute_node"], port["pci"]
1123 )
1124 )
1125 continue
1126
1127 pmap = {}
1128
1129 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
1130 new_port = {
1131 "service_endpoint_id": pmap.get("service_endpoint_id")
1132 or service_endpoint_id,
1133 "service_endpoint_encapsulation_type": "dot1q"
1134 if port["type"] == "SR-IOV"
1135 else None,
1136 "service_endpoint_encapsulation_info": {
1137 "vlan": port.get("vlan"),
1138 "mac": port.get("mac-address"),
1139 "device_id": pmap.get("device_id") or port["compute_node"],
1140 "device_interface_id": pmap.get("device_interface_id")
1141 or port["pci"],
1142 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
1143 "switch_port": pmap.get("switch_port"),
1144 "service_mapping_info": pmap.get("service_mapping_info"),
1145 },
1146 }
1147
1148 # TODO
1149 # if port["modified_at"] > last_update:
1150 # sdn_need_update = True
1151 new_connected_ports.append(port["id"]) # TODO
1152 sdn_ports.append(new_port)
1153
1154 if error_ports:
1155 error_list.append(
1156 "{} interfaces have not been created as VDU is on ERROR status".format(
1157 error_ports
1158 )
1159 )
1160
1161 # connect external ports
1162 for index, additional_port in enumerate(additional_ports):
1163 additional_port_id = additional_port.get(
1164 "service_endpoint_id"
1165 ) or "external-{}".format(index)
1166 sdn_ports.append(
1167 {
1168 "service_endpoint_id": additional_port_id,
1169 "service_endpoint_encapsulation_type": additional_port.get(
1170 "service_endpoint_encapsulation_type", "dot1q"
1171 ),
1172 "service_endpoint_encapsulation_info": {
1173 "vlan": additional_port.get("vlan") or vlan_used,
1174 "mac": additional_port.get("mac_address"),
1175 "device_id": additional_port.get("device_id"),
1176 "device_interface_id": additional_port.get(
1177 "device_interface_id"
1178 ),
1179 "switch_dpid": additional_port.get("switch_dpid")
1180 or additional_port.get("switch_id"),
1181 "switch_port": additional_port.get("switch_port"),
1182 "service_mapping_info": additional_port.get(
1183 "service_mapping_info"
1184 ),
1185 },
1186 }
1187 )
1188 new_connected_ports.append(additional_port_id)
1189 sdn_info = ""
1190
1191 # if there are more ports to connect or they have been modified, call create/update
1192 if error_list:
1193 sdn_status = "ERROR"
1194 sdn_info = "; ".join(error_list)
1195 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1196 last_update = time.time()
1197
1198 if not sdn_net_id:
1199 if len(sdn_ports) < 2:
1200 sdn_status = "ACTIVE"
1201
1202 if not pending_ports:
1203 self.logger.debug(
1204 "task={} {} new-sdn-net done, less than 2 ports".format(
1205 task_id, ro_task["target_id"]
1206 )
1207 )
1208 else:
1209 net_type = params.get("type") or "ELAN"
1210 (
1211 sdn_net_id,
1212 created_items,
1213 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1214 created = True
1215 self.logger.debug(
1216 "task={} {} new-sdn-net={} created={}".format(
1217 task_id, ro_task["target_id"], sdn_net_id, created
1218 )
1219 )
1220 else:
1221 created_items = target_vim.edit_connectivity_service(
1222 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1223 )
1224 created = True
1225 self.logger.debug(
1226 "task={} {} update-sdn-net={} created={}".format(
1227 task_id, ro_task["target_id"], sdn_net_id, created
1228 )
1229 )
1230
1231 connected_ports = new_connected_ports
1232 elif sdn_net_id:
1233 wim_status_dict = target_vim.get_connectivity_service_status(
1234 sdn_net_id, conn_info=created_items
1235 )
1236 sdn_status = wim_status_dict["sdn_status"]
1237
1238 if wim_status_dict.get("sdn_info"):
1239 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1240
1241 if wim_status_dict.get("error_msg"):
1242 sdn_info = wim_status_dict.get("error_msg") or ""
1243
1244 if pending_ports:
1245 if sdn_status != "ERROR":
1246 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1247 len(ports) - pending_ports, len(ports)
1248 )
1249
1250 if sdn_status == "ACTIVE":
1251 sdn_status = "BUILD"
1252
1253 ro_vim_item_update = {
1254 "vim_id": sdn_net_id,
1255 "vim_status": sdn_status,
1256 "created": created,
1257 "created_items": created_items,
1258 "connected_ports": connected_ports,
1259 "vim_details": sdn_info,
1260 "vim_message": None,
1261 "last_update": last_update,
1262 }
1263
1264 return sdn_status, ro_vim_item_update
1265 except Exception as e:
1266 self.logger.error(
1267 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1268 exc_info=not isinstance(
1269 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1270 ),
1271 )
1272 ro_vim_item_update = {
1273 "vim_status": "VIM_ERROR",
1274 "created": created,
1275 "vim_message": str(e),
1276 }
1277
1278 return "FAILED", ro_vim_item_update
1279
1280 def delete(self, ro_task, task_index):
1281 task = ro_task["tasks"][task_index]
1282 task_id = task["task_id"]
1283 sdn_vim_id = ro_task["vim_info"].get("vim_id")
1284 ro_vim_item_update_ok = {
1285 "vim_status": "DELETED",
1286 "created": False,
1287 "vim_message": "DELETED",
1288 "vim_id": None,
1289 }
1290
1291 try:
1292 if sdn_vim_id:
1293 target_vim = self.my_vims[ro_task["target_id"]]
1294 target_vim.delete_connectivity_service(
1295 sdn_vim_id, ro_task["vim_info"].get("created_items")
1296 )
1297
1298 except Exception as e:
1299 if (
1300 isinstance(e, sdnconn.SdnConnectorError)
1301 and e.http_code == HTTPStatus.NOT_FOUND.value
1302 ):
1303 ro_vim_item_update_ok["vim_message"] = "already deleted"
1304 else:
1305 self.logger.error(
1306 "ro_task={} vim={} del-sdn-net={}: {}".format(
1307 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1308 ),
1309 exc_info=not isinstance(
1310 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1311 ),
1312 )
1313 ro_vim_item_update = {
1314 "vim_status": "VIM_ERROR",
1315 "vim_message": "Error while deleting: {}".format(e),
1316 }
1317
1318 return "FAILED", ro_vim_item_update
1319
1320 self.logger.debug(
1321 "task={} {} del-sdn-net={} {}".format(
1322 task_id,
1323 ro_task["target_id"],
1324 sdn_vim_id,
1325 ro_vim_item_update_ok.get("vim_message", ""),
1326 )
1327 )
1328
1329 return "DONE", ro_vim_item_update_ok
1330
1331
1332 class VimInteractionMigration(VimInteractionBase):
1333 def exec(self, ro_task, task_index, task_depends):
1334 task = ro_task["tasks"][task_index]
1335 task_id = task["task_id"]
1336 db_task_update = {"retries": 0}
1337 target_vim = self.my_vims[ro_task["target_id"]]
1338 vim_interfaces = []
1339 created = False
1340 created_items = {}
1341 refreshed_vim_info = {}
1342
1343 try:
1344 if task.get("params"):
1345 vim_vm_id = task["params"].get("vim_vm_id")
1346 migrate_host = task["params"].get("migrate_host")
1347 _, migrated_compute_node = target_vim.migrate_instance(
1348 vim_vm_id, migrate_host
1349 )
1350
1351 if migrated_compute_node:
1352 # When VM is migrated, vdu["vim_info"] needs to be updated
1353 vdu_old_vim_info = task["params"]["vdu_vim_info"].get(
1354 ro_task["target_id"]
1355 )
1356
1357 # Refresh VM to get new vim_info
1358 vm_to_refresh_list = [vim_vm_id]
1359 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
1360 refreshed_vim_info = vim_dict[vim_vm_id]
1361
1362 if refreshed_vim_info.get("interfaces"):
1363 for old_iface in vdu_old_vim_info.get("interfaces"):
1364 iface = next(
1365 (
1366 iface
1367 for iface in refreshed_vim_info["interfaces"]
1368 if old_iface["vim_interface_id"]
1369 == iface["vim_interface_id"]
1370 ),
1371 None,
1372 )
1373 vim_interfaces.append(iface)
1374
1375 ro_vim_item_update = {
1376 "vim_id": vim_vm_id,
1377 "vim_status": "ACTIVE",
1378 "created": created,
1379 "created_items": created_items,
1380 "vim_details": None,
1381 "vim_message": None,
1382 }
1383
1384 if refreshed_vim_info and refreshed_vim_info.get("status") not in (
1385 "ERROR",
1386 "VIM_ERROR",
1387 ):
1388 ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]
1389
1390 if vim_interfaces:
1391 ro_vim_item_update["interfaces"] = vim_interfaces
1392
1393 self.logger.debug(
1394 "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
1395 )
1396
1397 return "DONE", ro_vim_item_update, db_task_update
1398
1399 except (vimconn.VimConnException, NsWorkerException) as e:
1400 self.logger.error(
1401 "task={} vim={} VM Migration:"
1402 " {}".format(task_id, ro_task["target_id"], e)
1403 )
1404 ro_vim_item_update = {
1405 "vim_status": "VIM_ERROR",
1406 "created": created,
1407 "vim_message": str(e),
1408 }
1409
1410 return "FAILED", ro_vim_item_update, db_task_update
1411
1412
1413 class VimInteractionResize(VimInteractionBase):
1414 def exec(self, ro_task, task_index, task_depends):
1415 task = ro_task["tasks"][task_index]
1416 task_id = task["task_id"]
1417 db_task_update = {"retries": 0}
1418 created = False
1419 target_flavor_uuid = None
1420 created_items = {}
1421 refreshed_vim_info = {}
1422 target_vim = self.my_vims[ro_task["target_id"]]
1423
1424 try:
1425 if task.get("params"):
1426 vim_vm_id = task["params"].get("vim_vm_id")
1427 flavor_dict = task["params"].get("flavor_dict")
1428 self.logger.info("flavor_dict %s", flavor_dict)
1429
1430 try:
1431 target_flavor_uuid = target_vim.get_flavor_id_from_data(flavor_dict)
1432 except Exception as e:
1433 self.logger.info("Cannot find any flavor matching %s.", str(e))
1434 try:
1435 target_flavor_uuid = target_vim.new_flavor(flavor_dict)
1436 except Exception as e:
1437 self.logger.error("Error creating flavor at VIM %s.", str(e))
1438
1439 if target_flavor_uuid is not None:
1440 resized_status = target_vim.resize_instance(
1441 vim_vm_id, target_flavor_uuid
1442 )
1443
1444 if resized_status:
1445 # Refresh VM to get new vim_info
1446 vm_to_refresh_list = [vim_vm_id]
1447 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
1448 refreshed_vim_info = vim_dict[vim_vm_id]
1449
1450 ro_vim_item_update = {
1451 "vim_id": vim_vm_id,
1452 "vim_status": "DONE",
1453 "created": created,
1454 "created_items": created_items,
1455 "vim_details": None,
1456 "vim_message": None,
1457 }
1458
1459 if refreshed_vim_info and refreshed_vim_info.get("status") not in (
1460 "ERROR",
1461 "VIM_ERROR",
1462 ):
1463 ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]
1464
1465 self.logger.debug(
1466 "task={} {} resize done".format(task_id, ro_task["target_id"])
1467 )
1468 return "DONE", ro_vim_item_update, db_task_update
1469 except (vimconn.VimConnException, NsWorkerException) as e:
1470 self.logger.error(
1471 "task={} vim={} Resize:" " {}".format(task_id, ro_task["target_id"], e)
1472 )
1473 ro_vim_item_update = {
1474 "vim_status": "VIM_ERROR",
1475 "created": created,
1476 "vim_message": str(e),
1477 }
1478
1479 return "FAILED", ro_vim_item_update, db_task_update
1480
1481
1482 class ConfigValidate:
1483 def __init__(self, config: Dict):
1484 self.conf = config
1485
1486 @property
1487 def active(self):
1488 # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
1489 if (
1490 self.conf["period"]["refresh_active"] >= 60
1491 or self.conf["period"]["refresh_active"] == -1
1492 ):
1493 return self.conf["period"]["refresh_active"]
1494
1495 return 60
1496
1497 @property
1498 def build(self):
1499 return self.conf["period"]["refresh_build"]
1500
1501 @property
1502 def image(self):
1503 return self.conf["period"]["refresh_image"]
1504
1505 @property
1506 def error(self):
1507 return self.conf["period"]["refresh_error"]
1508
1509 @property
1510 def queue_size(self):
1511 return self.conf["period"]["queue_size"]
1512
1513
1514 class NsWorker(threading.Thread):
1515 def __init__(self, worker_index, config, plugins, db):
1516 """
1517 :param worker_index: thread index
1518 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1519 :param plugins: global shared dict with the loaded plugins
1520 :param db: database class instance to use
1521 """
1522 threading.Thread.__init__(self)
1523 self.config = config
1524 self.plugins = plugins
1525 self.plugin_name = "unknown"
1526 self.logger = logging.getLogger("ro.worker{}".format(worker_index))
1527 self.worker_index = worker_index
1528 # refresh periods for created items
1529 self.refresh_config = ConfigValidate(config)
1530 self.task_queue = queue.Queue(self.refresh_config.queue_size)
1531 # targetvim: vimplugin class
1532 self.my_vims = {}
1533 # targetvim: vim information from database
1534 self.db_vims = {}
1535 # targetvim list
1536 self.vim_targets = []
1537 self.my_id = config["process_id"] + ":" + str(worker_index)
1538 self.db = db
1539 self.item2class = {
1540 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
1541 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
1542 "image": VimInteractionImage(
1543 self.db, self.my_vims, self.db_vims, self.logger
1544 ),
1545 "flavor": VimInteractionFlavor(
1546 self.db, self.my_vims, self.db_vims, self.logger
1547 ),
1548 "sdn_net": VimInteractionSdnNet(
1549 self.db, self.my_vims, self.db_vims, self.logger
1550 ),
1551 "update": VimInteractionUpdateVdu(
1552 self.db, self.my_vims, self.db_vims, self.logger
1553 ),
1554 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
1555 self.db, self.my_vims, self.db_vims, self.logger
1556 ),
1557 "migrate": VimInteractionMigration(
1558 self.db, self.my_vims, self.db_vims, self.logger
1559 ),
1560 "verticalscale": VimInteractionResize(
1561 self.db, self.my_vims, self.db_vims, self.logger
1562 ),
1563 }
1564 self.time_last_task_processed = None
1565 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1566 self.tasks_to_delete = []
1567 # it is idle when there are not vim_targets associated
1568 self.idle = True
1569 self.task_locked_time = config["global"]["task_locked_time"]
1570
1571 def insert_task(self, task):
1572 try:
1573 self.task_queue.put(task, False)
1574 return None
1575 except queue.Full:
1576 raise NsWorkerException("timeout inserting a task")
1577
1578 def terminate(self):
1579 self.insert_task("exit")
1580
1581 def del_task(self, task):
1582 with self.task_lock:
1583 if task["status"] == "SCHEDULED":
1584 task["status"] = "SUPERSEDED"
1585 return True
1586 else: # task["status"] == "processing"
1587 self.task_lock.release()
1588 return False
1589
1590 def _process_vim_config(self, target_id, db_vim):
1591 """
1592 Process vim config, creating vim configuration files as ca_cert
1593 :param target_id: vim/sdn/wim + id
1594 :param db_vim: Vim dictionary obtained from database
1595 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1596 """
1597 if not db_vim.get("config"):
1598 return
1599
1600 file_name = ""
1601
1602 try:
1603 if db_vim["config"].get("ca_cert_content"):
1604 file_name = "{}:{}".format(target_id, self.worker_index)
1605
1606 try:
1607 mkdir(file_name)
1608 except FileExistsError:
1609 self.logger.exception(
1610 "FileExistsError occured while processing vim_config."
1611 )
1612
1613 file_name = file_name + "/ca_cert"
1614
1615 with open(file_name, "w") as f:
1616 f.write(db_vim["config"]["ca_cert_content"])
1617 del db_vim["config"]["ca_cert_content"]
1618 db_vim["config"]["ca_cert"] = file_name
1619 except Exception as e:
1620 raise NsWorkerException(
1621 "Error writing to file '{}': {}".format(file_name, e)
1622 )
1623
1624 def _load_plugin(self, name, type="vim"):
1625 # type can be vim or sdn
1626 if "rovim_dummy" not in self.plugins:
1627 self.plugins["rovim_dummy"] = VimDummyConnector
1628
1629 if "rosdn_dummy" not in self.plugins:
1630 self.plugins["rosdn_dummy"] = SdnDummyConnector
1631
1632 if name in self.plugins:
1633 return self.plugins[name]
1634
1635 try:
1636 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1637 self.plugins[name] = ep.load()
1638 except Exception as e:
1639 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1640
1641 if name and name not in self.plugins:
1642 raise NsWorkerException(
1643 "Plugin 'osm_{n}' has not been installed".format(n=name)
1644 )
1645
1646 return self.plugins[name]
1647
1648 def _unload_vim(self, target_id):
1649 """
1650 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1651 :param target_id: Contains type:_id; where type can be 'vim', ...
1652 :return: None.
1653 """
1654 try:
1655 self.db_vims.pop(target_id, None)
1656 self.my_vims.pop(target_id, None)
1657
1658 if target_id in self.vim_targets:
1659 self.vim_targets.remove(target_id)
1660
1661 self.logger.info("Unloaded {}".format(target_id))
1662 rmtree("{}:{}".format(target_id, self.worker_index))
1663 except FileNotFoundError:
1664 # This is raised by rmtree if folder does not exist.
1665 self.logger.exception("FileNotFoundError occured while unloading VIM.")
1666 except Exception as e:
1667 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1668
1669 def _check_vim(self, target_id):
1670 """
1671 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1672 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1673 :return: None.
1674 """
1675 target, _, _id = target_id.partition(":")
1676 now = time.time()
1677 update_dict = {}
1678 unset_dict = {}
1679 op_text = ""
1680 step = ""
1681 loaded = target_id in self.vim_targets
1682 target_database = (
1683 "vim_accounts"
1684 if target == "vim"
1685 else "wim_accounts"
1686 if target == "wim"
1687 else "sdns"
1688 )
1689
1690 try:
1691 step = "Getting {} from db".format(target_id)
1692 db_vim = self.db.get_one(target_database, {"_id": _id})
1693
1694 for op_index, operation in enumerate(
1695 db_vim["_admin"].get("operations", ())
1696 ):
1697 if operation["operationState"] != "PROCESSING":
1698 continue
1699
1700 locked_at = operation.get("locked_at")
1701
1702 if locked_at is not None and locked_at >= now - self.task_locked_time:
1703 # some other thread is doing this operation
1704 return
1705
1706 # lock
1707 op_text = "_admin.operations.{}.".format(op_index)
1708
1709 if not self.db.set_one(
1710 target_database,
1711 q_filter={
1712 "_id": _id,
1713 op_text + "operationState": "PROCESSING",
1714 op_text + "locked_at": locked_at,
1715 },
1716 update_dict={
1717 op_text + "locked_at": now,
1718 "admin.current_operation": op_index,
1719 },
1720 fail_on_empty=False,
1721 ):
1722 return
1723
1724 unset_dict[op_text + "locked_at"] = None
1725 unset_dict["current_operation"] = None
1726 step = "Loading " + target_id
1727 error_text = self._load_vim(target_id)
1728
1729 if not error_text:
1730 step = "Checking connectivity"
1731
1732 if target == "vim":
1733 self.my_vims[target_id].check_vim_connectivity()
1734 else:
1735 self.my_vims[target_id].check_credentials()
1736
1737 update_dict["_admin.operationalState"] = "ENABLED"
1738 update_dict["_admin.detailed-status"] = ""
1739 unset_dict[op_text + "detailed-status"] = None
1740 update_dict[op_text + "operationState"] = "COMPLETED"
1741
1742 return
1743
1744 except Exception as e:
1745 error_text = "{}: {}".format(step, e)
1746 self.logger.error("{} for {}: {}".format(step, target_id, e))
1747
1748 finally:
1749 if update_dict or unset_dict:
1750 if error_text:
1751 update_dict[op_text + "operationState"] = "FAILED"
1752 update_dict[op_text + "detailed-status"] = error_text
1753 unset_dict.pop(op_text + "detailed-status", None)
1754 update_dict["_admin.operationalState"] = "ERROR"
1755 update_dict["_admin.detailed-status"] = error_text
1756
1757 if op_text:
1758 update_dict[op_text + "statusEnteredTime"] = now
1759
1760 self.db.set_one(
1761 target_database,
1762 q_filter={"_id": _id},
1763 update_dict=update_dict,
1764 unset=unset_dict,
1765 fail_on_empty=False,
1766 )
1767
1768 if not loaded:
1769 self._unload_vim(target_id)
1770
1771 def _reload_vim(self, target_id):
1772 if target_id in self.vim_targets:
1773 self._load_vim(target_id)
1774 else:
1775 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1776 # just remove it to force load again next time it is needed
1777 self.db_vims.pop(target_id, None)
1778
1779 def _load_vim(self, target_id):
1780 """
1781 Load or reload a vim_account, sdn_controller or wim_account.
1782 Read content from database, load the plugin if not loaded.
1783 In case of error loading the plugin, it load a failing VIM_connector
1784 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1785 :param target_id: Contains type:_id; where type can be 'vim', ...
1786 :return: None if ok, descriptive text if error
1787 """
1788 target, _, _id = target_id.partition(":")
1789 target_database = (
1790 "vim_accounts"
1791 if target == "vim"
1792 else "wim_accounts"
1793 if target == "wim"
1794 else "sdns"
1795 )
1796 plugin_name = ""
1797 vim = None
1798
1799 try:
1800 step = "Getting {}={} from db".format(target, _id)
1801 # TODO process for wim, sdnc, ...
1802 vim = self.db.get_one(target_database, {"_id": _id})
1803
1804 # if deep_get(vim, "config", "sdn-controller"):
1805 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1806 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1807
1808 step = "Decrypting password"
1809 schema_version = vim.get("schema_version")
1810 self.db.encrypt_decrypt_fields(
1811 vim,
1812 "decrypt",
1813 fields=("password", "secret"),
1814 schema_version=schema_version,
1815 salt=_id,
1816 )
1817 self._process_vim_config(target_id, vim)
1818
1819 if target == "vim":
1820 plugin_name = "rovim_" + vim["vim_type"]
1821 step = "Loading plugin '{}'".format(plugin_name)
1822 vim_module_conn = self._load_plugin(plugin_name)
1823 step = "Loading {}'".format(target_id)
1824 self.my_vims[target_id] = vim_module_conn(
1825 uuid=vim["_id"],
1826 name=vim["name"],
1827 tenant_id=vim.get("vim_tenant_id"),
1828 tenant_name=vim.get("vim_tenant_name"),
1829 url=vim["vim_url"],
1830 url_admin=None,
1831 user=vim["vim_user"],
1832 passwd=vim["vim_password"],
1833 config=vim.get("config") or {},
1834 persistent_info={},
1835 )
1836 else: # sdn
1837 plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
1838 step = "Loading plugin '{}'".format(plugin_name)
1839 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1840 step = "Loading {}'".format(target_id)
1841 wim = deepcopy(vim)
1842 wim_config = wim.pop("config", {}) or {}
1843 wim["uuid"] = wim["_id"]
1844 if "url" in wim and "wim_url" not in wim:
1845 wim["wim_url"] = wim["url"]
1846 elif "url" not in wim and "wim_url" in wim:
1847 wim["url"] = wim["wim_url"]
1848
1849 if wim.get("dpid"):
1850 wim_config["dpid"] = wim.pop("dpid")
1851
1852 if wim.get("switch_id"):
1853 wim_config["switch_id"] = wim.pop("switch_id")
1854
1855 # wim, wim_account, config
1856 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1857 self.db_vims[target_id] = vim
1858 self.error_status = None
1859
1860 self.logger.info(
1861 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1862 )
1863 except Exception as e:
1864 self.logger.error(
1865 "Cannot load {} plugin={}: {} {}".format(
1866 target_id, plugin_name, step, e
1867 )
1868 )
1869
1870 self.db_vims[target_id] = vim or {}
1871 self.db_vims[target_id] = FailingConnector(str(e))
1872 error_status = "{} Error: {}".format(step, e)
1873
1874 return error_status
1875 finally:
1876 if target_id not in self.vim_targets:
1877 self.vim_targets.append(target_id)
1878
1879 def _get_db_task(self):
1880 """
1881 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1882 :return: None
1883 """
1884 now = time.time()
1885
1886 if not self.time_last_task_processed:
1887 self.time_last_task_processed = now
1888
1889 try:
1890 while True:
1891 """
1892 # Log RO tasks only when loglevel is DEBUG
1893 if self.logger.getEffectiveLevel() == logging.DEBUG:
1894 self._log_ro_task(
1895 None,
1896 None,
1897 None,
1898 "TASK_WF",
1899 "task_locked_time="
1900 + str(self.task_locked_time)
1901 + " "
1902 + "time_last_task_processed="
1903 + str(self.time_last_task_processed)
1904 + " "
1905 + "now="
1906 + str(now),
1907 )
1908 """
1909 locked = self.db.set_one(
1910 "ro_tasks",
1911 q_filter={
1912 "target_id": self.vim_targets,
1913 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1914 "locked_at.lt": now - self.task_locked_time,
1915 "to_check_at.lt": self.time_last_task_processed,
1916 "to_check_at.gt": -1,
1917 },
1918 update_dict={"locked_by": self.my_id, "locked_at": now},
1919 fail_on_empty=False,
1920 )
1921
1922 if locked:
1923 # read and return
1924 ro_task = self.db.get_one(
1925 "ro_tasks",
1926 q_filter={
1927 "target_id": self.vim_targets,
1928 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1929 "locked_at": now,
1930 },
1931 )
1932 return ro_task
1933
1934 if self.time_last_task_processed == now:
1935 self.time_last_task_processed = None
1936 return None
1937 else:
1938 self.time_last_task_processed = now
1939 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1940
1941 except DbException as e:
1942 self.logger.error("Database exception at _get_db_task: {}".format(e))
1943 except Exception as e:
1944 self.logger.critical(
1945 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1946 )
1947
1948 return None
1949
1950 def _get_db_all_tasks(self):
1951 """
1952 Read all content of table ro_tasks to log it
1953 :return: None
1954 """
1955 try:
1956 # Checking the content of the BD:
1957
1958 # read and return
1959 ro_task = self.db.get_list("ro_tasks")
1960 for rt in ro_task:
1961 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1962 return ro_task
1963
1964 except DbException as e:
1965 self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1966 except Exception as e:
1967 self.logger.critical(
1968 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1969 )
1970
1971 return None
1972
1973 def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1974 """
1975 Generate a log with the following format:
1976
1977 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1978 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1979 task_array_index;task_id;task_action;task_item;task_args
1980
1981 Example:
1982
1983 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1984 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1985 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1986 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1987 'vim_details': None, 'vim_message': None, 'refresh_at': None};1;SCHEDULED;
1988 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1989 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1990 """
1991 try:
1992 line = []
1993 i = 0
1994 if ro_task is not None and isinstance(ro_task, dict):
1995 for t in ro_task["tasks"]:
1996 line.clear()
1997 line.append(mark)
1998 line.append(event)
1999 line.append(ro_task.get("_id", ""))
2000 line.append(str(ro_task.get("locked_at", "")))
2001 line.append(str(ro_task.get("modified_at", "")))
2002 line.append(str(ro_task.get("created_at", "")))
2003 line.append(str(ro_task.get("to_check_at", "")))
2004 line.append(str(ro_task.get("locked_by", "")))
2005 line.append(str(ro_task.get("target_id", "")))
2006 line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
2007 line.append(str(ro_task.get("vim_info", "")))
2008 line.append(str(ro_task.get("tasks", "")))
2009 if isinstance(t, dict):
2010 line.append(str(t.get("status", "")))
2011 line.append(str(t.get("action_id", "")))
2012 line.append(str(i))
2013 line.append(str(t.get("task_id", "")))
2014 line.append(str(t.get("action", "")))
2015 line.append(str(t.get("item", "")))
2016 line.append(str(t.get("find_params", "")))
2017 line.append(str(t.get("params", "")))
2018 else:
2019 line.extend([""] * 2)
2020 line.append(str(i))
2021 line.extend([""] * 5)
2022
2023 i += 1
2024 self.logger.debug(";".join(line))
2025 elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
2026 i = 0
2027 while True:
2028 st = "tasks.{}.status".format(i)
2029 if st not in db_ro_task_update:
2030 break
2031 line.clear()
2032 line.append(mark)
2033 line.append(event)
2034 line.append(db_ro_task_update.get("_id", ""))
2035 line.append(str(db_ro_task_update.get("locked_at", "")))
2036 line.append(str(db_ro_task_update.get("modified_at", "")))
2037 line.append("")
2038 line.append(str(db_ro_task_update.get("to_check_at", "")))
2039 line.append(str(db_ro_task_update.get("locked_by", "")))
2040 line.append("")
2041 line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
2042 line.append("")
2043 line.append(str(db_ro_task_update.get("vim_info", "")))
2044 line.append(str(str(db_ro_task_update).count(".status")))
2045 line.append(db_ro_task_update.get(st, ""))
2046 line.append("")
2047 line.append(str(i))
2048 line.extend([""] * 3)
2049 i += 1
2050 self.logger.debug(";".join(line))
2051
2052 elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
2053 line.clear()
2054 line.append(mark)
2055 line.append(event)
2056 line.append(db_ro_task_delete.get("_id", ""))
2057 line.append("")
2058 line.append(db_ro_task_delete.get("modified_at", ""))
2059 line.extend([""] * 13)
2060 self.logger.debug(";".join(line))
2061
2062 else:
2063 line.clear()
2064 line.append(mark)
2065 line.append(event)
2066 line.extend([""] * 16)
2067 self.logger.debug(";".join(line))
2068
2069 except Exception as e:
2070 self.logger.error("Error logging ro_task: {}".format(e))
2071
2072 def _delete_task(self, ro_task, task_index, task_depends, db_update):
2073 """
2074 Determine if this task need to be done or superseded
2075 :return: None
2076 """
2077 my_task = ro_task["tasks"][task_index]
2078 task_id = my_task["task_id"]
2079 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
2080 "created_items", False
2081 )
2082
2083 self.logger.debug("Needed delete: {}".format(needed_delete))
2084 if my_task["status"] == "FAILED":
2085 return None, None # TODO need to be retry??
2086
2087 try:
2088 for index, task in enumerate(ro_task["tasks"]):
2089 if index == task_index or not task:
2090 continue # own task
2091
2092 if (
2093 my_task["target_record"] == task["target_record"]
2094 and task["action"] == "CREATE"
2095 ):
2096 # set to finished
2097 db_update["tasks.{}.status".format(index)] = task[
2098 "status"
2099 ] = "FINISHED"
2100 elif task["action"] == "CREATE" and task["status"] not in (
2101 "FINISHED",
2102 "SUPERSEDED",
2103 ):
2104 needed_delete = False
2105
2106 if needed_delete:
2107 self.logger.debug(
2108 "Deleting ro_task={} task_index={}".format(ro_task, task_index)
2109 )
2110 return self.item2class[my_task["item"]].delete(ro_task, task_index)
2111 else:
2112 return "SUPERSEDED", None
2113 except Exception as e:
2114 if not isinstance(e, NsWorkerException):
2115 self.logger.critical(
2116 "Unexpected exception at _delete_task task={}: {}".format(
2117 task_id, e
2118 ),
2119 exc_info=True,
2120 )
2121
2122 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2123
2124 def _create_task(self, ro_task, task_index, task_depends, db_update):
2125 """
2126 Determine if this task need to create something at VIM
2127 :return: None
2128 """
2129 my_task = ro_task["tasks"][task_index]
2130 task_id = my_task["task_id"]
2131 task_status = None
2132
2133 if my_task["status"] == "FAILED":
2134 return None, None # TODO need to be retry??
2135 elif my_task["status"] == "SCHEDULED":
2136 # check if already created by another task
2137 for index, task in enumerate(ro_task["tasks"]):
2138 if index == task_index or not task:
2139 continue # own task
2140
2141 if task["action"] == "CREATE" and task["status"] not in (
2142 "SCHEDULED",
2143 "FINISHED",
2144 "SUPERSEDED",
2145 ):
2146 return task["status"], "COPY_VIM_INFO"
2147
2148 try:
2149 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
2150 ro_task, task_index, task_depends
2151 )
2152 # TODO update other CREATE tasks
2153 except Exception as e:
2154 if not isinstance(e, NsWorkerException):
2155 self.logger.error(
2156 "Error executing task={}: {}".format(task_id, e), exc_info=True
2157 )
2158
2159 task_status = "FAILED"
2160 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2161 # TODO update ro_vim_item_update
2162
2163 return task_status, ro_vim_item_update
2164 else:
2165 return None, None
2166
2167 def _get_dependency(self, task_id, ro_task=None, target_id=None):
2168 """
2169 Look for dependency task
2170 :param task_id: Can be one of
2171 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2172 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2173 3. task.task_id: "<action_id>:number"
2174 :param ro_task:
2175 :param target_id:
2176 :return: database ro_task plus index of task
2177 """
2178 if (
2179 task_id.startswith("vim:")
2180 or task_id.startswith("sdn:")
2181 or task_id.startswith("wim:")
2182 ):
2183 target_id, _, task_id = task_id.partition(" ")
2184
2185 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
2186 ro_task_dependency = self.db.get_one(
2187 "ro_tasks",
2188 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
2189 fail_on_empty=False,
2190 )
2191
2192 if ro_task_dependency:
2193 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2194 if task["target_record_id"] == task_id:
2195 return ro_task_dependency, task_index
2196
2197 else:
2198 if ro_task:
2199 for task_index, task in enumerate(ro_task["tasks"]):
2200 if task and task["task_id"] == task_id:
2201 return ro_task, task_index
2202
2203 ro_task_dependency = self.db.get_one(
2204 "ro_tasks",
2205 q_filter={
2206 "tasks.ANYINDEX.task_id": task_id,
2207 "tasks.ANYINDEX.target_record.ne": None,
2208 },
2209 fail_on_empty=False,
2210 )
2211
2212 self.logger.debug("ro_task_dependency={}".format(ro_task_dependency))
2213 if ro_task_dependency:
2214 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2215 if task["task_id"] == task_id:
2216 return ro_task_dependency, task_index
2217 raise NsWorkerException("Cannot get depending task {}".format(task_id))
2218
2219 def update_vm_refresh(self):
2220 """Enables the VM status updates if self.refresh_config.active parameter
2221 is not -1 and than updates the DB accordingly
2222
2223 """
2224 try:
2225 self.logger.debug("Checking if VM status update config")
2226 next_refresh = time.time()
2227 if self.refresh_config.active == -1:
2228 next_refresh = -1
2229 else:
2230 next_refresh += self.refresh_config.active
2231
2232 if next_refresh != -1:
2233 db_ro_task_update = {}
2234 now = time.time()
2235 next_check_at = now + (24 * 60 * 60)
2236 next_check_at = min(next_check_at, next_refresh)
2237 db_ro_task_update["vim_info.refresh_at"] = next_refresh
2238 db_ro_task_update["to_check_at"] = next_check_at
2239
2240 self.logger.debug(
2241 "Finding tasks which to be updated to enable VM status updates"
2242 )
2243 refresh_tasks = self.db.get_list(
2244 "ro_tasks",
2245 q_filter={
2246 "tasks.status": "DONE",
2247 "to_check_at.lt": 0,
2248 },
2249 )
2250 self.logger.debug("Updating tasks to change the to_check_at status")
2251 for task in refresh_tasks:
2252 q_filter = {
2253 "_id": task["_id"],
2254 }
2255 self.db.set_one(
2256 "ro_tasks",
2257 q_filter=q_filter,
2258 update_dict=db_ro_task_update,
2259 fail_on_empty=True,
2260 )
2261
2262 except Exception as e:
2263 self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
2264
2265 def _process_pending_tasks(self, ro_task):
2266 ro_task_id = ro_task["_id"]
2267 now = time.time()
2268 # one day
2269 next_check_at = now + (24 * 60 * 60)
2270 db_ro_task_update = {}
2271
2272 def _update_refresh(new_status):
2273 # compute next_refresh
2274 nonlocal task
2275 nonlocal next_check_at
2276 nonlocal db_ro_task_update
2277 nonlocal ro_task
2278
2279 next_refresh = time.time()
2280
2281 if task["item"] in ("image", "flavor"):
2282 next_refresh += self.refresh_config.image
2283 elif new_status == "BUILD":
2284 next_refresh += self.refresh_config.build
2285 elif new_status == "DONE":
2286 if self.refresh_config.active == -1:
2287 next_refresh = -1
2288 else:
2289 next_refresh += self.refresh_config.active
2290 else:
2291 next_refresh += self.refresh_config.error
2292
2293 next_check_at = min(next_check_at, next_refresh)
2294 db_ro_task_update["vim_info.refresh_at"] = next_refresh
2295 ro_task["vim_info"]["refresh_at"] = next_refresh
2296
2297 try:
2298 """
2299 # Log RO tasks only when loglevel is DEBUG
2300 if self.logger.getEffectiveLevel() == logging.DEBUG:
2301 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2302 """
2303 # Check if vim status refresh is enabled again
2304 self.update_vm_refresh()
2305 # 0: get task_status_create
2306 lock_object = None
2307 task_status_create = None
2308 task_create = next(
2309 (
2310 t
2311 for t in ro_task["tasks"]
2312 if t
2313 and t["action"] == "CREATE"
2314 and t["status"] in ("BUILD", "DONE")
2315 ),
2316 None,
2317 )
2318
2319 if task_create:
2320 task_status_create = task_create["status"]
2321
2322 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2323 for task_action in ("DELETE", "CREATE", "EXEC"):
2324 db_vim_update = None
2325 new_status = None
2326
2327 for task_index, task in enumerate(ro_task["tasks"]):
2328 if not task:
2329 continue # task deleted
2330
2331 task_depends = {}
2332 target_update = None
2333
2334 if (
2335 (
2336 task_action in ("DELETE", "EXEC")
2337 and task["status"] not in ("SCHEDULED", "BUILD")
2338 )
2339 or task["action"] != task_action
2340 or (
2341 task_action == "CREATE"
2342 and task["status"] in ("FINISHED", "SUPERSEDED")
2343 )
2344 ):
2345 continue
2346
2347 task_path = "tasks.{}.status".format(task_index)
2348 try:
2349 db_vim_info_update = None
2350
2351 if task["status"] == "SCHEDULED":
2352 # check if tasks that this depends on have been completed
2353 dependency_not_completed = False
2354
2355 for dependency_task_id in task.get("depends_on") or ():
2356 (
2357 dependency_ro_task,
2358 dependency_task_index,
2359 ) = self._get_dependency(
2360 dependency_task_id, target_id=ro_task["target_id"]
2361 )
2362 dependency_task = dependency_ro_task["tasks"][
2363 dependency_task_index
2364 ]
2365 self.logger.debug(
2366 "dependency_ro_task={} dependency_task_index={}".format(
2367 dependency_ro_task, dependency_task_index
2368 )
2369 )
2370
2371 if dependency_task["status"] == "SCHEDULED":
2372 dependency_not_completed = True
2373 next_check_at = min(
2374 next_check_at, dependency_ro_task["to_check_at"]
2375 )
2376 # must allow dependent task to be processed first
2377 # to do this set time after last_task_processed
2378 next_check_at = max(
2379 self.time_last_task_processed, next_check_at
2380 )
2381 break
2382 elif dependency_task["status"] == "FAILED":
2383 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2384 task["action"],
2385 task["item"],
2386 dependency_task["action"],
2387 dependency_task["item"],
2388 dependency_task_id,
2389 dependency_ro_task["vim_info"].get(
2390 "vim_message"
2391 ),
2392 )
2393 self.logger.error(
2394 "task={} {}".format(task["task_id"], error_text)
2395 )
2396 raise NsWorkerException(error_text)
2397
2398 task_depends[dependency_task_id] = dependency_ro_task[
2399 "vim_info"
2400 ]["vim_id"]
2401 task_depends[
2402 "TASK-{}".format(dependency_task_id)
2403 ] = dependency_ro_task["vim_info"]["vim_id"]
2404
2405 if dependency_not_completed:
2406 self.logger.warning(
2407 "DEPENDENCY NOT COMPLETED {}".format(
2408 dependency_ro_task["vim_info"]["vim_id"]
2409 )
2410 )
2411 # TODO set at vim_info.vim_details that it is waiting
2412 continue
2413
2414 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2415 # the task of renew this locking. It will update database locket_at periodically
2416 if not lock_object:
2417 lock_object = LockRenew.add_lock_object(
2418 "ro_tasks", ro_task, self
2419 )
2420
2421 if task["action"] == "DELETE":
2422 (new_status, db_vim_info_update,) = self._delete_task(
2423 ro_task, task_index, task_depends, db_ro_task_update
2424 )
2425 new_status = (
2426 "FINISHED" if new_status == "DONE" else new_status
2427 )
2428 # ^with FINISHED instead of DONE it will not be refreshing
2429
2430 if new_status in ("FINISHED", "SUPERSEDED"):
2431 target_update = "DELETE"
2432 elif task["action"] == "EXEC":
2433 (
2434 new_status,
2435 db_vim_info_update,
2436 db_task_update,
2437 ) = self.item2class[task["item"]].exec(
2438 ro_task, task_index, task_depends
2439 )
2440 new_status = (
2441 "FINISHED" if new_status == "DONE" else new_status
2442 )
2443 # ^with FINISHED instead of DONE it will not be refreshing
2444
2445 if db_task_update:
2446 # load into database the modified db_task_update "retries" and "next_retry"
2447 if db_task_update.get("retries"):
2448 db_ro_task_update[
2449 "tasks.{}.retries".format(task_index)
2450 ] = db_task_update["retries"]
2451
2452 next_check_at = time.time() + db_task_update.get(
2453 "next_retry", 60
2454 )
2455 target_update = None
2456 elif task["action"] == "CREATE":
2457 if task["status"] == "SCHEDULED":
2458 if task_status_create:
2459 new_status = task_status_create
2460 target_update = "COPY_VIM_INFO"
2461 else:
2462 new_status, db_vim_info_update = self.item2class[
2463 task["item"]
2464 ].new(ro_task, task_index, task_depends)
2465 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2466 _update_refresh(new_status)
2467 else:
2468 refresh_at = ro_task["vim_info"]["refresh_at"]
2469 if refresh_at and refresh_at != -1 and now > refresh_at:
2470 (new_status, db_vim_info_update,) = self.item2class[
2471 task["item"]
2472 ].refresh(ro_task)
2473 _update_refresh(new_status)
2474 else:
2475 # The refresh is updated to avoid set the value of "refresh_at" to
2476 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2477 # because it can happen that in this case the task is never processed
2478 _update_refresh(task["status"])
2479
2480 except Exception as e:
2481 new_status = "FAILED"
2482 db_vim_info_update = {
2483 "vim_status": "VIM_ERROR",
2484 "vim_message": str(e),
2485 }
2486
2487 if not isinstance(
2488 e, (NsWorkerException, vimconn.VimConnException)
2489 ):
2490 self.logger.error(
2491 "Unexpected exception at _delete_task task={}: {}".format(
2492 task["task_id"], e
2493 ),
2494 exc_info=True,
2495 )
2496
2497 try:
2498 if db_vim_info_update:
2499 db_vim_update = db_vim_info_update.copy()
2500 db_ro_task_update.update(
2501 {
2502 "vim_info." + k: v
2503 for k, v in db_vim_info_update.items()
2504 }
2505 )
2506 ro_task["vim_info"].update(db_vim_info_update)
2507
2508 if new_status:
2509 if task_action == "CREATE":
2510 task_status_create = new_status
2511 db_ro_task_update[task_path] = new_status
2512
2513 if target_update or db_vim_update:
2514 if target_update == "DELETE":
2515 self._update_target(task, None)
2516 elif target_update == "COPY_VIM_INFO":
2517 self._update_target(task, ro_task["vim_info"])
2518 else:
2519 self._update_target(task, db_vim_update)
2520
2521 except Exception as e:
2522 if (
2523 isinstance(e, DbException)
2524 and e.http_code == HTTPStatus.NOT_FOUND
2525 ):
2526 # if the vnfrs or nsrs has been removed from database, this task must be removed
2527 self.logger.debug(
2528 "marking to delete task={}".format(task["task_id"])
2529 )
2530 self.tasks_to_delete.append(task)
2531 else:
2532 self.logger.error(
2533 "Unexpected exception at _update_target task={}: {}".format(
2534 task["task_id"], e
2535 ),
2536 exc_info=True,
2537 )
2538
2539 locked_at = ro_task["locked_at"]
2540
2541 if lock_object:
2542 locked_at = [
2543 lock_object["locked_at"],
2544 lock_object["locked_at"] + self.task_locked_time,
2545 ]
2546 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2547 # contain exactly locked_at + self.task_locked_time
2548 LockRenew.remove_lock_object(lock_object)
2549
2550 q_filter = {
2551 "_id": ro_task["_id"],
2552 "to_check_at": ro_task["to_check_at"],
2553 "locked_at": locked_at,
2554 }
2555 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2556 # outside this task (by ro_nbi) do not update it
2557 db_ro_task_update["locked_by"] = None
2558 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2559 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2560 db_ro_task_update["modified_at"] = now
2561 db_ro_task_update["to_check_at"] = next_check_at
2562
2563 """
2564 # Log RO tasks only when loglevel is DEBUG
2565 if self.logger.getEffectiveLevel() == logging.DEBUG:
2566 db_ro_task_update_log = db_ro_task_update.copy()
2567 db_ro_task_update_log["_id"] = q_filter["_id"]
2568 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2569 """
2570
2571 if not self.db.set_one(
2572 "ro_tasks",
2573 update_dict=db_ro_task_update,
2574 q_filter=q_filter,
2575 fail_on_empty=False,
2576 ):
2577 del db_ro_task_update["to_check_at"]
2578 del q_filter["to_check_at"]
2579 """
2580 # Log RO tasks only when loglevel is DEBUG
2581 if self.logger.getEffectiveLevel() == logging.DEBUG:
2582 self._log_ro_task(
2583 None,
2584 db_ro_task_update_log,
2585 None,
2586 "TASK_WF",
2587 "SET_TASK " + str(q_filter),
2588 )
2589 """
2590 self.db.set_one(
2591 "ro_tasks",
2592 q_filter=q_filter,
2593 update_dict=db_ro_task_update,
2594 fail_on_empty=True,
2595 )
2596 except DbException as e:
2597 self.logger.error(
2598 "ro_task={} Error updating database {}".format(ro_task_id, e)
2599 )
2600 except Exception as e:
2601 self.logger.error(
2602 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2603 )
2604
2605 def _update_target(self, task, ro_vim_item_update):
2606 table, _, temp = task["target_record"].partition(":")
2607 _id, _, path_vim_status = temp.partition(":")
2608 path_item = path_vim_status[: path_vim_status.rfind(".")]
2609 path_item = path_item[: path_item.rfind(".")]
2610 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2611 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2612
2613 if ro_vim_item_update:
2614 update_dict = {
2615 path_vim_status + "." + k: v
2616 for k, v in ro_vim_item_update.items()
2617 if k
2618 in (
2619 "vim_id",
2620 "vim_details",
2621 "vim_message",
2622 "vim_name",
2623 "vim_status",
2624 "interfaces",
2625 "interfaces_backup",
2626 )
2627 }
2628
2629 if path_vim_status.startswith("vdur."):
2630 # for backward compatibility, add vdur.name apart from vdur.vim_name
2631 if ro_vim_item_update.get("vim_name"):
2632 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2633
2634 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2635 if ro_vim_item_update.get("vim_id"):
2636 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2637
2638 # update general status
2639 if ro_vim_item_update.get("vim_status"):
2640 update_dict[path_item + ".status"] = ro_vim_item_update[
2641 "vim_status"
2642 ]
2643
2644 if ro_vim_item_update.get("interfaces"):
2645 path_interfaces = path_item + ".interfaces"
2646
2647 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2648 if iface:
2649 update_dict.update(
2650 {
2651 path_interfaces + ".{}.".format(i) + k: v
2652 for k, v in iface.items()
2653 if k in ("vlan", "compute_node", "pci")
2654 }
2655 )
2656
2657 # put ip_address and mac_address with ip-address and mac-address
2658 if iface.get("ip_address"):
2659 update_dict[
2660 path_interfaces + ".{}.".format(i) + "ip-address"
2661 ] = iface["ip_address"]
2662
2663 if iface.get("mac_address"):
2664 update_dict[
2665 path_interfaces + ".{}.".format(i) + "mac-address"
2666 ] = iface["mac_address"]
2667
2668 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2669 update_dict["ip-address"] = iface.get("ip_address").split(
2670 ";"
2671 )[0]
2672
2673 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2674 update_dict[path_item + ".ip-address"] = iface.get(
2675 "ip_address"
2676 ).split(";")[0]
2677
2678 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2679
2680 # If interfaces exists, it backups VDU interfaces in the DB for healing operations
2681 if ro_vim_item_update.get("interfaces"):
2682 search_key = path_vim_status + ".interfaces"
2683 if update_dict.get(search_key):
2684 interfaces_backup_update = {
2685 path_vim_status + ".interfaces_backup": update_dict[search_key]
2686 }
2687
2688 self.db.set_one(
2689 table,
2690 q_filter={"_id": _id},
2691 update_dict=interfaces_backup_update,
2692 )
2693
2694 else:
2695 update_dict = {path_item + ".status": "DELETED"}
2696 self.db.set_one(
2697 table,
2698 q_filter={"_id": _id},
2699 update_dict=update_dict,
2700 unset={path_vim_status: None},
2701 )
2702
2703 def _process_delete_db_tasks(self):
2704 """
2705 Delete task from database because vnfrs or nsrs or both have been deleted
2706 :return: None. Uses and modify self.tasks_to_delete
2707 """
2708 while self.tasks_to_delete:
2709 task = self.tasks_to_delete[0]
2710 vnfrs_deleted = None
2711 nsr_id = task["nsr_id"]
2712
2713 if task["target_record"].startswith("vnfrs:"):
2714 # check if nsrs is present
2715 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2716 vnfrs_deleted = task["target_record"].split(":")[1]
2717
2718 try:
2719 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2720 except Exception as e:
2721 self.logger.error(
2722 "Error deleting task={}: {}".format(task["task_id"], e)
2723 )
2724 self.tasks_to_delete.pop(0)
2725
2726 @staticmethod
2727 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2728 """
2729 Static method because it is called from osm_ng_ro.ns
2730 :param db: instance of database to use
2731 :param nsr_id: affected nsrs id
2732 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2733 :return: None, exception is fails
2734 """
2735 retries = 5
2736 for retry in range(retries):
2737 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2738 now = time.time()
2739 conflict = False
2740
2741 for ro_task in ro_tasks:
2742 db_update = {}
2743 to_delete_ro_task = True
2744
2745 for index, task in enumerate(ro_task["tasks"]):
2746 if not task:
2747 pass
2748 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2749 vnfrs_deleted
2750 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2751 ):
2752 db_update["tasks.{}".format(index)] = None
2753 else:
2754 # used by other nsr, ro_task cannot be deleted
2755 to_delete_ro_task = False
2756
2757 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2758 if to_delete_ro_task:
2759 if not db.del_one(
2760 "ro_tasks",
2761 q_filter={
2762 "_id": ro_task["_id"],
2763 "modified_at": ro_task["modified_at"],
2764 },
2765 fail_on_empty=False,
2766 ):
2767 conflict = True
2768 elif db_update:
2769 db_update["modified_at"] = now
2770 if not db.set_one(
2771 "ro_tasks",
2772 q_filter={
2773 "_id": ro_task["_id"],
2774 "modified_at": ro_task["modified_at"],
2775 },
2776 update_dict=db_update,
2777 fail_on_empty=False,
2778 ):
2779 conflict = True
2780 if not conflict:
2781 return
2782 else:
2783 raise NsWorkerException("Exceeded {} retries".format(retries))
2784
2785 def run(self):
2786 # load database
2787 self.logger.info("Starting")
2788 while True:
2789 # step 1: get commands from queue
2790 try:
2791 if self.vim_targets:
2792 task = self.task_queue.get(block=False)
2793 else:
2794 if not self.idle:
2795 self.logger.debug("enters in idle state")
2796 self.idle = True
2797 task = self.task_queue.get(block=True)
2798 self.idle = False
2799
2800 if task[0] == "terminate":
2801 break
2802 elif task[0] == "load_vim":
2803 self.logger.info("order to load vim {}".format(task[1]))
2804 self._load_vim(task[1])
2805 elif task[0] == "unload_vim":
2806 self.logger.info("order to unload vim {}".format(task[1]))
2807 self._unload_vim(task[1])
2808 elif task[0] == "reload_vim":
2809 self._reload_vim(task[1])
2810 elif task[0] == "check_vim":
2811 self.logger.info("order to check vim {}".format(task[1]))
2812 self._check_vim(task[1])
2813 continue
2814 except Exception as e:
2815 if isinstance(e, queue.Empty):
2816 pass
2817 else:
2818 self.logger.critical(
2819 "Error processing task: {}".format(e), exc_info=True
2820 )
2821
2822 # step 2: process pending_tasks, delete not needed tasks
2823 try:
2824 if self.tasks_to_delete:
2825 self._process_delete_db_tasks()
2826 busy = False
2827 """
2828 # Log RO tasks only when loglevel is DEBUG
2829 if self.logger.getEffectiveLevel() == logging.DEBUG:
2830 _ = self._get_db_all_tasks()
2831 """
2832 ro_task = self._get_db_task()
2833 if ro_task:
2834 self.logger.debug("Task to process: {}".format(ro_task))
2835 time.sleep(1)
2836 self._process_pending_tasks(ro_task)
2837 busy = True
2838 if not busy:
2839 time.sleep(5)
2840 except Exception as e:
2841 self.logger.critical(
2842 "Unexpected exception at run: " + str(e), exc_info=True
2843 )
2844
2845 self.logger.info("Finishing")