Reformat files according to new black validation
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
"""
This is a thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
The tasks are stored in the database, in table ro_tasks.
A single ro_task refers to a VIM element (flavor, image, network, ...).
A ro_task can contain several 'tasks', each one with a target where the results are stored.
"""
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 import traceback
36 from typing import Dict
37 from unittest.mock import Mock
38
39 from importlib_metadata import entry_points
40 from osm_common.dbbase import DbException
41 from osm_ng_ro.vim_admin import LockRenew
42 from osm_ro_plugin import sdnconn, vimconn
43 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
44 from osm_ro_plugin.vim_dummy import VimDummyConnector
45 import yaml
46
# Module metadata
__author__ = "Alfonso Tierno"
__date__ = "$28-Sep-2017 12:07:15$"
49
50
def deep_get(target_dict, *args, **kwargs):
    """
    Walk target_dict through a chain of nested keys and return what is found at the end.

    Example: with target_dict={a: {b: 5}}, keys (a, b) give 5, while keys (a, b, c) and
    keys (f, h) both give the default.
    :param target_dict: dictionary to be read
    :param args: keys to follow, outermost first
    :param kwargs: may carry default=value, returned when the chain cannot be followed
    :return: the nested value when every key exists; otherwise the default (None if not given)
    """
    node = target_dict
    for step in args:
        if isinstance(node, dict) and step in node:
            node = node[step]
        else:
            # Either we ran into a non-dict or the key is missing at this level
            return kwargs.get("default")

    return node
65
66
class NsWorkerException(Exception):
    """Base class for the errors raised by the task handlers of this module."""
69
70
class FailingConnector:
    """Stand-in connector: every public VIM/SDN connector method raises the stored error."""

    def __init__(self, error_msg):
        """
        :param error_msg: text carried by the exception raised on any public method call
        """
        self.error_msg = error_msg

        # (connector base class, exception class) pairs. VIM goes first and SDN second,
        # so a name exposed by both bases ends up bound to the SDN error.
        templates = (
            (vimconn.VimConnector, vimconn.VimConnException),
            (sdnconn.SdnConnectorBase, sdnconn.SdnConnectorError),
        )

        for base_cls, error_cls in templates:
            for name in dir(base_cls):
                if name[0] == "_":
                    # private and dunder attributes are left untouched
                    continue

                setattr(self, name, Mock(side_effect=error_cls(error_msg)))
86
87
class NsWorkerExceptionNotFound(NsWorkerException):
    """NsWorkerException raised when a searched element cannot be found."""
90
91
class VimInteractionBase:
    """Common base for the classes that create, delete and refresh VIM/SDN elements
    (networks, VMs, flavors, ...). Every operation here is a no-op reporting success."""

    def __init__(self, db, my_vims, db_vims, logger):
        """
        :param db: database handler
        :param my_vims: connectors, indexed by target_id
        :param db_vims: VIM account documents, indexed by target_id
        :param logger: logger used by the interaction methods
        """
        self.db, self.logger = db, logger
        self.my_vims, self.db_vims = my_vims, db_vims

    def new(self, ro_task, task_index, task_depends):
        """Nothing is created; report the task as being built with no changes."""
        return "BUILD", {}

    def refresh(self, ro_task):
        """Do not call the VIM; only a stored VIM_ERROR status makes the task fail."""
        in_error = ro_task["vim_info"]["vim_status"] == "VIM_ERROR"

        return ("FAILED" if in_error else "DONE"), {}

    def delete(self, ro_task, task_index):
        """Do not call the VIM to delete anything; assume it went ok."""
        return "DONE", {}

    def exec(self, ro_task, task_index, task_depends):
        """Nothing to execute; report done with no vim_info nor task changes."""
        return "DONE", None, None
118
119
class VimInteractionNet(VimInteractionBase):
    """VIM interaction for network items: find-or-create, status refresh and deletion."""

    def new(self, ro_task, task_index, task_depends):
        """
        Find an existing network at the VIM, or create one, for the task at task_index.

        When the task carries "find_params" the network is searched with
        get_network_list(); a management network that is neither found nor configured in
        the VIM account is created on the fly. When there are no "find_params" the network
        is created from task["params"].
        :param ro_task: ro_task document; "tasks" and "target_id" are read here
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("BUILD", vim_info update) on success; ("FAILED", vim_info update with
            vim_status "VIM_ERROR") when a VimConnException or NsWorkerException is caught
        """
        vim_net_id = None
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        mgmtnet = False
        mgmtnet_defined_in_vim = False

        try:
            # FIND
            if task.get("find_params"):
                # an explicit filter takes precedence over the management lookup
                if task["find_params"].get("filter_dict"):
                    vim_filter = task["find_params"]["filter_dict"]
                # management network: prefer what is configured at the VIM account
                elif task["find_params"].get("mgmt"):
                    mgmtnet = True
                    if deep_get(
                        self.db_vims[ro_task["target_id"]],
                        "config",
                        "management_network_id",
                    ):
                        mgmtnet_defined_in_vim = True
                        vim_filter = {
                            "id": self.db_vims[ro_task["target_id"]]["config"][
                                "management_network_id"
                            ]
                        }
                    elif deep_get(
                        self.db_vims[ro_task["target_id"]],
                        "config",
                        "management_network_name",
                    ):
                        mgmtnet_defined_in_vim = True
                        vim_filter = {
                            "name": self.db_vims[ro_task["target_id"]]["config"][
                                "management_network_name"
                            ]
                        }
                    else:
                        # nothing configured at the VIM account: search by the name given
                        vim_filter = {"name": task["find_params"]["name"]}
                else:
                    raise NsWorkerExceptionNotFound(
                        "Invalid find_params for new_net {}".format(task["find_params"])
                    )

                vim_nets = target_vim.get_network_list(vim_filter)
                if not vim_nets and not task.get("params"):
                    # If there is mgmt-network in the descriptor,
                    # there is no mapping of that network to a VIM network in the descriptor,
                    # also there is no mapping in the "--config" parameter or at VIM creation;
                    # that mgmt-network will be created.
                    if mgmtnet and not mgmtnet_defined_in_vim:
                        net_name = (
                            vim_filter.get("name")
                            if vim_filter.get("name")
                            else vim_filter.get("id")[:16]
                        )
                        vim_net_id, created_items = target_vim.new_network(
                            net_name, None
                        )
                        self.logger.debug(
                            "Created mgmt network vim_net_id: {}".format(vim_net_id)
                        )
                        created = True
                    else:
                        raise NsWorkerExceptionNotFound(
                            "Network not found with this criteria: '{}'".format(
                                task.get("find_params")
                            )
                        )
                elif len(vim_nets) > 1:
                    raise NsWorkerException(
                        "More than one network found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )

                if vim_nets:
                    vim_net_id = vim_nets[0]["id"]
            else:
                # CREATE
                params = task["params"]
                vim_net_id, created_items = target_vim.new_network(**params)
                created = True

            ro_vim_item_update = {
                "vim_id": vim_net_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, ro_task["target_id"], vim_net_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def refresh(self, ro_task):
        """
        Call VIM to get network status.

        VIM status "ACTIVE" maps to task status "DONE", "BUILD" to "BUILD" and anything
        else to "FAILED". A VimConnException is logged and reported as "VIM_ERROR".
        :param ro_task: ro_task document; "_id", "target_id" and "vim_info" are read
        :return: (task_status, vim_info update holding only the fields that changed)
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]
        net_to_refresh_list = [vim_id]

        try:
            vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        # only the fields that differ from the stored vim_info are reported
        ro_vim_item_update = {}
        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the network of ro_task at the VIM, when it has a vim_id or created_items.

        :param ro_task: ro_task document; "tasks", "vim_info", "target_id" and "_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: ("DONE", update) when deleted, not found at the VIM or nothing to delete;
            ("FAILED", update with "VIM_ERROR") on any other VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        net_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if net_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_network(
                    net_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            # not present at the VIM: treated as a successful deletion
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id,
                ro_task["target_id"],
                net_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
337
338
class VimInteractionVdu(VimInteractionBase):
    """VIM interaction for VDU (VM) items: create, delete, refresh and ssh key injection."""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """
        Create a VM at the VIM from task["params"].

        References of the form "TASK-..." found in the net list, image_id, flavor_id and
        affinity group list are replaced by the ids stored in task_depends.
        :param ro_task: ro_task document; "tasks" and "target_id" are read here
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: dictionary mapping "TASK-..." references to VIM ids
        :return: ("BUILD", vim_info update) on success; ("FAILED", vim_info update with
            vim_status "VIM_ERROR") when a VimConnException or NsWorkerException is caught
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            created = True
            params = task["params"]
            # work on a copy so the stored task params keep their TASK- references
            params_copy = deepcopy(params)
            net_list = params_copy["net_list"]

            for net in net_list:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            affinity_group_list = params_copy["affinity_group_list"]
            for affinity_group in affinity_group_list:
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        # full sentence, in line with the network dependency error above
                        raise NsWorkerException(
                            "Cannot create VM because depends on an affinity group not created "
                            "or found for {}".format(affinity_group["affinity_group_id"])
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id

            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            # add to created items previous_created_volumes (healing)
            if task.get("previous_created_volumes"):
                for k, v in task["previous_created_volumes"].items():
                    created_items[k] = v

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
                "interfaces_backup": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the VM of ro_task at the VIM, when it has a vim_id or created_items.

        :param ro_task: ro_task document; "tasks", "vim_info", "target_id" and "_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: ("DONE", update) when deleted, not found at the VIM or nothing to delete;
            ("FAILED", update with "VIM_ERROR") on any other VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            self.logger.debug(
                "delete_vminstance: vm_vim_id={} created_items={}".format(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
            )
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id,
                    ro_task["vim_info"]["created_items"],
                    ro_task["vim_info"].get("volumes_to_hold", []),
                )
        except vimconn.VimConnNotFoundException:
            # not present at the VIM: treated as a successful deletion
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def refresh(self, ro_task):
        """
        Call VIM to get vm status.

        VIM status "ACTIVE" maps to task status "DONE", "BUILD" to "BUILD" and anything
        else to "FAILED". A VimConnException is logged and reported as "VIM_ERROR".
        :param ro_task: ro_task document; "_id", "target_id", "vim_info" and "tasks" are read
        :return: (None, None) when there is no vim_id; otherwise (task_status, vim_info
            update holding only the fields that changed)
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information; a failure here is only logged
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception as vim_info_error:
                self.logger.exception(
                    f"{vim_info_error} occured while getting the vim_info from yaml"
                )
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            # keep the order of interfaces_vim_ids; an unmatched id leaves a None entry
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                # if iface:
                #     iface.pop("vim_info", None)
                vim_interfaces.append(iface)

        # first CREATE task not yet finished (StopIteration if there is none)
        task_create = next(
            t
            for t in ro_task["tasks"]
            if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
        )
        if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
            vim_interfaces[task_create["mgmt_vnf_interface"]][
                "mgmt_vnf_interface"
            ] = True

        # falls back to the vnf management interface index, and then to the first one
        mgmt_vdu_iface = task_create.get(
            "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
        )
        if vim_interfaces:
            vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True

        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """
        Inject an ssh key into the VM through the VIM connector.

        The private key in the task params is decrypted with self.db.decrypt and passed
        as "ro_key"; "ip_address" is passed as "ip_addr".
        :param ro_task: ro_task document; "tasks" and "target_id" are read here
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", None, task update) on success; ("BUILD", None, retry info) while
            failures stay below max_retries_inject_ssh_key; ("FAILED", vim_info update,
            task update) afterwards
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params = task["params"]
            params_copy = deepcopy(params)
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                None,
                db_task_update,
            )  # params_copy["key"]
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            self.logger.debug(traceback.format_exc())
            if retries < self.max_retries_inject_ssh_key:
                # schedule another attempt instead of failing the task
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_message": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update
635
636
class VimInteractionImage(VimInteractionBase):
    """VIM interaction for image items: images are only looked up, never created."""

    def new(self, ro_task, task_index, task_depends):
        """
        Look for the image described by task["find_params"] at the VIM.

        Exactly one image must match. A task without "find_params" reports a vim_id of
        None.
        :param ro_task: ro_task document; "tasks" and "target_id" are read here
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", vim_info update) on success; ("FAILED", vim_info update with
            vim_status "VIM_ERROR") when a VimConnException or NsWorkerException is caught
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # bound up front so that a task without find_params does not reach the
            # update below with an unbound name
            vim_image_id = None

            # FIND
            if task.get("find_params"):
                vim_images = target_vim.get_image_list(**task["find_params"])

                if not vim_images:
                    raise NsWorkerExceptionNotFound(
                        "Image not found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )
                elif len(vim_images) > 1:
                    raise NsWorkerException(
                        "More than one image found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )
                else:
                    vim_image_id = vim_images[0]["id"]

            ro_vim_item_update = {
                "vim_id": vim_image_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, ro_task["target_id"], vim_image_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (NsWorkerException, vimconn.VimConnException) as e:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
691
692
class VimInteractionFlavor(VimInteractionBase):
    """VIM interaction for flavor items: find-or-create on new(), removal on delete()."""

    def delete(self, ro_task, task_index):
        """
        Remove the flavor of ro_task from the VIM, provided it has a VIM id.

        :param ro_task: ro_task document; "tasks", "vim_info", "target_id" and "_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: ("DONE", update) when deleted, not found at the VIM or nothing to delete;
            ("FAILED", update with "VIM_ERROR") on any other VimConnException
        """
        current_task = ro_task["tasks"][task_index]
        task_id = current_task["task_id"]
        flavor_vim_id = ro_task["vim_info"]["vim_id"]
        done_update = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if flavor_vim_id:
                self.my_vims[ro_task["target_id"]].delete_flavor(flavor_vim_id)
        except vimconn.VimConnNotFoundException:
            # the flavor is gone already, which is the wanted outcome
            done_update["vim_message"] = "already deleted"
        except vimconn.VimConnException as exc:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], flavor_vim_id, exc
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(exc),
            }

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id,
                ro_task["target_id"],
                flavor_vim_id,
                done_update.get("vim_message", ""),
            )
        )

        return "DONE", done_update

    def new(self, ro_task, task_index, task_depends):
        """
        Obtain a flavor id from the VIM: reuse a matching flavor or create a new one.

        The search uses task["find_params"]["flavor_data"]; when nothing is obtained and
        the task has "params", a flavor is created from task["params"]["flavor_data"].
        :param ro_task: ro_task document; "tasks" and "target_id" are read here
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", vim_info update) on success; ("FAILED", vim_info update with
            vim_status "VIM_ERROR") when a VimConnException or NsWorkerException is caught
        """
        current_task = ro_task["tasks"][task_index]
        task_id = current_task["task_id"]
        was_created = False
        vim = self.my_vims[ro_task["target_id"]]

        try:
            flavor_id = None

            # search step; a flavor that is not found is logged, not treated as a failure
            if current_task.get("find_params"):
                try:
                    flavor_id = vim.get_flavor_id_from_data(
                        current_task["find_params"]["flavor_data"]
                    )
                except vimconn.VimConnNotFoundException:
                    self.logger.exception("VimConnNotFoundException occured.")

            # creation step, only when the search gave nothing and there are params
            if not flavor_id and current_task.get("params"):
                flavor_id = vim.new_flavor(current_task["params"]["flavor_data"])
                was_created = True

            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, ro_task["target_id"], flavor_id, was_created
                )
            )

            return "DONE", {
                "vim_id": flavor_id,
                "vim_status": "DONE",
                "created": was_created,
                "created_items": {},
                "vim_details": None,
                "vim_message": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as exc:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], exc)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": was_created,
                "vim_message": str(exc),
            }
785
786
class VimInteractionAffinityGroup(VimInteractionBase):
    """VIM interaction for affinity or anti-affinity group items: find-or-create and delete."""

    def delete(self, ro_task, task_index):
        """
        Delete the affinity group of ro_task at the VIM, provided it has a VIM id.

        :param ro_task: ro_task document; "tasks", "vim_info", "target_id" and "_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: ("DONE", update) when deleted, not found at the VIM or nothing to delete;
            ("FAILED", update with "VIM_ERROR") on any other VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if affinity_group_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_affinity_group(affinity_group_vim_id)
        except vimconn.VimConnNotFoundException:
            # not present at the VIM: treated as a successful deletion
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id,
                ro_task["target_id"],
                affinity_group_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """
        Reuse the affinity group given by "vim-affinity-group-id", or create a new one.

        When the provided id is not found at the VIM, an error is logged and a new group
        is created from task["params"]["affinity_group_data"].
        :param ro_task: ro_task document; "tasks" and "target_id" are read here
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", vim_info update) on success; ("FAILED", vim_info update with
            vim_status "VIM_ERROR") when a VimConnException or NsWorkerException is caught
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            affinity_group_vim_id = None
            affinity_group_data = None

            if task.get("params"):
                affinity_group_data = task["params"].get("affinity_group_data")

            if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
                try:
                    param_affinity_group_id = task["params"]["affinity_group_data"].get(
                        "vim-affinity-group-id"
                    )
                    affinity_group_vim_id = target_vim.get_affinity_group(
                        param_affinity_group_id
                    ).get("id")
                except vimconn.VimConnNotFoundException:
                    # the two literals are concatenated: the first one must end with a
                    # space so the id is not glued to the next word
                    self.logger.error(
                        "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {} "
                        "could not be found at VIM. Creating a new one.".format(
                            task_id, ro_task["target_id"], param_affinity_group_id
                        )
                    )

            if not affinity_group_vim_id and affinity_group_data:
                affinity_group_vim_id = target_vim.new_affinity_group(
                    affinity_group_data
                )
                created = True

            ro_vim_item_update = {
                "vim_id": affinity_group_vim_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], affinity_group_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
892
893
class VimInteractionUpdateVdu(VimInteractionBase):
    """VIM interaction that runs an action on an existing VM through action_vminstance."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Run the action named in task["params"]["action"] on the VM "vim_vm_id".

        The connector receives the context {action: action}.
        :param ro_task: ro_task document; "tasks" and "target_id" are read here
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", vim_info update, task update) on success; ("FAILED", vim_info
            update with vim_status "VIM_ERROR", task update) when a VimConnException or
            NsWorkerException is caught, which includes a task without params
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            if not task.get("params"):
                # without params there is neither a VM id nor an action: report it via
                # the regular failure path rather than hitting an unbound vim_vm_id below
                raise NsWorkerException("No params provided to update the VM")

            vim_vm_id = task["params"].get("vim_vm_id")
            action = task["params"].get("action")
            context = {action: action}
            target_vim.action_vminstance(vim_vm_id, context)
            # created = True
            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )
            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
934
935
936 class VimInteractionSdnNet(VimInteractionBase):
937 @staticmethod
938 def _match_pci(port_pci, mapping):
939 """
940 Check if port_pci matches with mapping
941 mapping can have brackets to indicate that several chars are accepted. e.g
942 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
943 :param port_pci: text
944 :param mapping: text, can contain brackets to indicate several chars are available
945 :return: True if matches, False otherwise
946 """
947 if not port_pci or not mapping:
948 return False
949 if port_pci == mapping:
950 return True
951
952 mapping_index = 0
953 pci_index = 0
954 while True:
955 bracket_start = mapping.find("[", mapping_index)
956
957 if bracket_start == -1:
958 break
959
960 bracket_end = mapping.find("]", bracket_start)
961 if bracket_end == -1:
962 break
963
964 length = bracket_start - mapping_index
965 if (
966 length
967 and port_pci[pci_index : pci_index + length]
968 != mapping[mapping_index:bracket_start]
969 ):
970 return False
971
972 if (
973 port_pci[pci_index + length]
974 not in mapping[bracket_start + 1 : bracket_end]
975 ):
976 return False
977
978 pci_index += length + 1
979 mapping_index = bracket_end + 1
980
981 if port_pci[pci_index:] != mapping[mapping_index:]:
982 return False
983
984 return True
985
986 def _get_interfaces(self, vlds_to_connect, vim_account_id):
987 """
988 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
989 :param vim_account_id:
990 :return:
991 """
992 interfaces = []
993
994 for vld in vlds_to_connect:
995 table, _, db_id = vld.partition(":")
996 db_id, _, vld = db_id.partition(":")
997 _, _, vld_id = vld.partition(".")
998
999 if table == "vnfrs":
1000 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
1001 iface_key = "vnf-vld-id"
1002 else: # table == "nsrs"
1003 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
1004 iface_key = "ns-vld-id"
1005
1006 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
1007
1008 for db_vnfr in db_vnfrs:
1009 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
1010 for iface_index, interface in enumerate(vdur["interfaces"]):
1011 if interface.get(iface_key) == vld_id and interface.get(
1012 "type"
1013 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
1014 # only SR-IOV o PT
1015 interface_ = interface.copy()
1016 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
1017 db_vnfr["_id"], vdu_index, iface_index
1018 )
1019
1020 if vdur.get("status") == "ERROR":
1021 interface_["status"] = "ERROR"
1022
1023 interfaces.append(interface_)
1024
1025 return interfaces
1026
1027 def refresh(self, ro_task):
1028 # look for task create
1029 task_create_index, _ = next(
1030 i_t
1031 for i_t in enumerate(ro_task["tasks"])
1032 if i_t[1]
1033 and i_t[1]["action"] == "CREATE"
1034 and i_t[1]["status"] != "FINISHED"
1035 )
1036
1037 return self.new(ro_task, task_create_index, None)
1038
def new(self, ro_task, task_index, task_depends):
    """
    Create or update the SDN connectivity service of a ro_task, or read its status.
    The ports to connect are the SR-IOV/passthrough interfaces returned by self._get_interfaces plus the
    external ports listed at params["sdn-ports"]. When the set of ports differs from the stored
    "connected_ports", the connector is asked to create (no vim_id yet) or edit the service; otherwise,
    if the service exists, its status is obtained from the connector.
    :param ro_task: ro_task dictionary; "vim_info", "tasks" and "target_id" are read
    :param task_index: index inside ro_task["tasks"] of the task to process
    :param task_depends: not used by this method
    :return: tuple (status, ro_vim_item_update). On any exception it returns "FAILED" and a
        ro_vim_item_update with vim_status "VIM_ERROR"
    """
    task = ro_task["tasks"][task_index]
    task_id = task["task_id"]
    target_vim = self.my_vims[ro_task["target_id"]]

    sdn_net_id = ro_task["vim_info"]["vim_id"]

    # state stored by previous executions over this same ro_task
    created_items = ro_task["vim_info"].get("created_items")
    connected_ports = ro_task["vim_info"].get("connected_ports", [])
    new_connected_ports = []
    last_update = ro_task["vim_info"].get("last_update", 0)
    sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
    error_list = []
    created = ro_task["vim_info"].get("created", False)

    try:
        # CREATE
        params = task["params"]
        vlds_to_connect = params.get("vlds", [])
        associated_vim = params.get("target_vim")
        # external additional ports
        additional_ports = params.get("sdn-ports") or ()
        # associated_vim has the form "<type>:<id>"; only the part after the first ":" is used
        _, _, vim_account_id = (
            (None, None, None)
            if associated_vim is None
            else associated_vim.partition(":")
        )

        if associated_vim:
            # get associated VIM
            if associated_vim not in self.db_vims:
                self.db_vims[associated_vim] = self.db.get_one(
                    "vim_accounts", {"_id": vim_account_id}
                )

            db_vim = self.db_vims[associated_vim]
        # NOTE(review): db_vim is only bound when "target_vim" is truthy. If ports are found without it,
        # the loop below fails with an unbound local, which the generic except turns into "FAILED".
        # Confirm that callers always provide target_vim when there are vlds to connect.

        # look for ports to connect
        ports = self._get_interfaces(vlds_to_connect, vim_account_id)
        # print(ports)

        sdn_ports = []
        pending_ports = error_ports = 0
        vlan_used = None
        # NOTE(review): never set to True in this method, the code that would do it is commented below
        sdn_need_update = False

        for port in ports:
            # remember the last vlan seen, it is the default for external ports without vlan
            vlan_used = port.get("vlan") or vlan_used

            # TODO. Do not connect if already done
            # without compute_node and pci the port cannot be connected yet: count it and skip it
            if not port.get("compute_node") or not port.get("pci"):
                if port.get("status") == "ERROR":
                    error_ports += 1
                else:
                    pending_ports += 1
                continue

            pmap = None
            # entry of the VIM config "sdn-port-mapping" for the compute node of this port
            compute_node_mappings = next(
                (
                    c
                    for c in db_vim["config"].get("sdn-port-mapping", ())
                    if c and c["compute_node"] == port["compute_node"]
                ),
                None,
            )

            if compute_node_mappings:
                # process port_mapping pci of type 0000:af:1[01].[1357]
                pmap = next(
                    (
                        p
                        for p in compute_node_mappings["ports"]
                        if self._match_pci(port["pci"], p.get("pci"))
                    ),
                    None,
                )

            if not pmap:
                # a missing mapping is an error unless the VIM config declares "mapping_not_needed"
                if not db_vim["config"].get("mapping_not_needed"):
                    error_list.append(
                        "Port mapping not found for compute_node={} pci={}".format(
                            port["compute_node"], port["pci"]
                        )
                    )
                    continue

                pmap = {}

            # values of the port mapping take precedence over the ones derived from the port
            service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
            new_port = {
                "service_endpoint_id": pmap.get("service_endpoint_id")
                or service_endpoint_id,
                "service_endpoint_encapsulation_type": "dot1q"
                if port["type"] == "SR-IOV"
                else None,
                "service_endpoint_encapsulation_info": {
                    "vlan": port.get("vlan"),
                    "mac": port.get("mac-address"),
                    "device_id": pmap.get("device_id") or port["compute_node"],
                    "device_interface_id": pmap.get("device_interface_id")
                    or port["pci"],
                    "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
                    "switch_port": pmap.get("switch_port"),
                    "service_mapping_info": pmap.get("service_mapping_info"),
                },
            }

            # TODO
            # if port["modified_at"] > last_update:
            #     sdn_need_update = True
            new_connected_ports.append(port["id"])  # TODO
            sdn_ports.append(new_port)

        if error_ports:
            error_list.append(
                "{} interfaces have not been created as VDU is on ERROR status".format(
                    error_ports
                )
            )

        # connect external ports
        for index, additional_port in enumerate(additional_ports):
            # ports without "service_endpoint_id" get a positional identifier
            additional_port_id = additional_port.get(
                "service_endpoint_id"
            ) or "external-{}".format(index)
            sdn_ports.append(
                {
                    "service_endpoint_id": additional_port_id,
                    "service_endpoint_encapsulation_type": additional_port.get(
                        "service_endpoint_encapsulation_type", "dot1q"
                    ),
                    "service_endpoint_encapsulation_info": {
                        "vlan": additional_port.get("vlan") or vlan_used,
                        "mac": additional_port.get("mac_address"),
                        "device_id": additional_port.get("device_id"),
                        "device_interface_id": additional_port.get(
                            "device_interface_id"
                        ),
                        "switch_dpid": additional_port.get("switch_dpid")
                        or additional_port.get("switch_id"),
                        "switch_port": additional_port.get("switch_port"),
                        "service_mapping_info": additional_port.get(
                            "service_mapping_info"
                        ),
                    },
                }
            )
            new_connected_ports.append(additional_port_id)
        sdn_info = ""

        # if there are more ports to connect or they have been modified, call create/update
        if error_list:
            sdn_status = "ERROR"
            sdn_info = "; ".join(error_list)
        elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
            last_update = time.time()

            if not sdn_net_id:
                # nothing is created at the connector while there are less than 2 ports
                if len(sdn_ports) < 2:
                    sdn_status = "ACTIVE"

                    if not pending_ports:
                        self.logger.debug(
                            "task={} {} new-sdn-net done, less than 2 ports".format(
                                task_id, ro_task["target_id"]
                            )
                        )
                else:
                    net_type = params.get("type") or "ELAN"
                    (
                        sdn_net_id,
                        created_items,
                    ) = target_vim.create_connectivity_service(net_type, sdn_ports)
                    created = True
                    self.logger.debug(
                        "task={} {} new-sdn-net={} created={}".format(
                            task_id, ro_task["target_id"], sdn_net_id, created
                        )
                    )
            else:
                # the service already exists: update its connection points
                created_items = target_vim.edit_connectivity_service(
                    sdn_net_id, conn_info=created_items, connection_points=sdn_ports
                )
                created = True
                self.logger.debug(
                    "task={} {} update-sdn-net={} created={}".format(
                        task_id, ro_task["target_id"], sdn_net_id, created
                    )
                )

            connected_ports = new_connected_ports
        elif sdn_net_id:
            # same ports as before: only refresh the status reported by the connector
            wim_status_dict = target_vim.get_connectivity_service_status(
                sdn_net_id, conn_info=created_items
            )
            sdn_status = wim_status_dict["sdn_status"]

            if wim_status_dict.get("sdn_info"):
                sdn_info = str(wim_status_dict.get("sdn_info")) or ""

            # "error_msg", when present, replaces the "sdn_info" text
            if wim_status_dict.get("error_msg"):
                sdn_info = wim_status_dict.get("error_msg") or ""

        if pending_ports:
            if sdn_status != "ERROR":
                sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
                    len(ports) - pending_ports, len(ports)
                )

            # it cannot be reported as ACTIVE while there are ports pending to be connected
            if sdn_status == "ACTIVE":
                sdn_status = "BUILD"

        ro_vim_item_update = {
            "vim_id": sdn_net_id,
            "vim_status": sdn_status,
            "created": created,
            "created_items": created_items,
            "connected_ports": connected_ports,
            "vim_details": sdn_info,
            "vim_message": None,
            "last_update": last_update,
        }

        return sdn_status, ro_vim_item_update
    except Exception as e:
        # the traceback is only logged for exceptions that are not connector exceptions
        self.logger.error(
            "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
            exc_info=not isinstance(
                e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
            ),
        )
        ro_vim_item_update = {
            "vim_status": "VIM_ERROR",
            "created": created,
            "vim_message": str(e),
        }

        return "FAILED", ro_vim_item_update
1278
def delete(self, ro_task, task_index):
    """
    Delete the SDN connectivity service stored at ro_task["vim_info"]["vim_id"], if any.
    A connector error with http_code NOT_FOUND is treated as success ("already deleted").
    :param ro_task: ro_task dictionary; "vim_info", "tasks", "target_id" and "_id" are read
    :param task_index: index inside ro_task["tasks"] of the task being processed
    :return: tuple ("DONE", vim_info update) or, on any other error, ("FAILED", vim_info update)
    """
    current_task = ro_task["tasks"][task_index]
    task_id = current_task["task_id"]
    sdn_vim_id = ro_task["vim_info"].get("vim_id")
    ro_vim_item_update_ok = {
        "vim_status": "DELETED",
        "created": False,
        "vim_message": "DELETED",
        "vim_id": None,
    }

    try:
        if sdn_vim_id:
            connector = self.my_vims[ro_task["target_id"]]
            previous_items = ro_task["vim_info"].get("created_items")
            connector.delete_connectivity_service(sdn_vim_id, previous_items)

    except Exception as e:
        already_gone = (
            isinstance(e, sdnconn.SdnConnectorError)
            and e.http_code == HTTPStatus.NOT_FOUND.value
        )

        if not already_gone:
            # the traceback is only logged for exceptions that are not connector exceptions
            is_connector_error = isinstance(
                e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
            )
            self.logger.error(
                "ro_task={} vim={} del-sdn-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
                ),
                exc_info=not is_connector_error,
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        ro_vim_item_update_ok["vim_message"] = "already deleted"

    self.logger.debug(
        "task={} {} del-sdn-net={} {}".format(
            task_id,
            ro_task["target_id"],
            sdn_vim_id,
            ro_vim_item_update_ok.get("vim_message", ""),
        )
    )

    return "DONE", ro_vim_item_update_ok
1329
1330
class VimInteractionMigration(VimInteractionBase):
    """VIM interaction that migrates a VM and reports the refreshed information of the migrated VM."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Migrate the VM indicated at the task params and build the vim_info update.
        :param ro_task: ro_task dictionary; "tasks" and "target_id" are read
        :param task_index: index inside ro_task["tasks"] of the task to execute
        :param task_depends: not used by this method
        :return: tuple (status, ro_vim_item_update, db_task_update), where status is "DONE" or "FAILED".
            "FAILED" is returned when the connector raises a VimConnException or when the task has no
            params
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_interfaces = []
        created = False
        created_items = {}
        refreshed_vim_info = {}

        try:
            params = task.get("params")

            if not params:
                # without params there is no VM to migrate; report it as a failed task instead of
                # failing later with an unbound vim_vm_id
                raise NsWorkerException("Migration task without params")

            vim_vm_id = params.get("vim_vm_id")
            migrate_host = params.get("migrate_host")
            _, migrated_compute_node = target_vim.migrate_instance(
                vim_vm_id, migrate_host
            )

            if migrated_compute_node:
                # When VM is migrated, vdu["vim_info"] needs to be updated
                vdu_old_vim_info = params["vdu_vim_info"].get(ro_task["target_id"])
                # tolerate a vdu without previous vim_info or without interfaces for this target
                old_interfaces = (vdu_old_vim_info or {}).get("interfaces") or ()

                # Refresh VM to get new vim_info
                vm_to_refresh_list = [vim_vm_id]
                vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
                refreshed_vim_info = vim_dict[vim_vm_id]

                if refreshed_vim_info.get("interfaces"):
                    # keep the order of the old interfaces, taking the refreshed data of each one;
                    # None is stored when an old interface is not among the refreshed ones
                    for old_iface in old_interfaces:
                        iface = next(
                            (
                                iface
                                for iface in refreshed_vim_info["interfaces"]
                                if old_iface["vim_interface_id"]
                                == iface["vim_interface_id"]
                            ),
                            None,
                        )
                        vim_interfaces.append(iface)

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            if vim_interfaces:
                ro_vim_item_update["interfaces"] = vim_interfaces

            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update

        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration: {}".format(
                    task_id, ro_task["target_id"], e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
1410
1411
class VimInteractionResize(VimInteractionBase):
    """VIM interaction that resizes (vertical scale) a VM to the flavor described at the task params."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Resize the VM indicated at the task params and build the vim_info update.
        The flavor is looked up at the VIM from params["flavor_dict"] and created when the lookup fails.
        :param ro_task: ro_task dictionary; "tasks" and "target_id" are read
        :param task_index: index inside ro_task["tasks"] of the task to execute
        :param task_depends: not used by this method
        :return: tuple (status, ro_vim_item_update, db_task_update), where status is "DONE" or "FAILED".
            "FAILED" is returned when the connector raises a VimConnException or when the task has no
            params
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        target_flavor_uuid = None
        created_items = {}
        refreshed_vim_info = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            params = task.get("params")

            if not params:
                # without params there is no VM to resize; report it as a failed task instead of
                # failing later with an unbound vim_vm_id
                raise NsWorkerException("Resize task without params")

            vim_vm_id = params.get("vim_vm_id")
            flavor_dict = params.get("flavor_dict")
            self.logger.info("flavor_dict %s", flavor_dict)

            try:
                target_flavor_uuid = target_vim.get_flavor_id_from_data(flavor_dict)
            except Exception as lookup_error:
                self.logger.info(
                    "Cannot find any flavor matching %s.", str(lookup_error)
                )
                try:
                    target_flavor_uuid = target_vim.new_flavor(flavor_dict)
                except Exception as creation_error:
                    self.logger.error(
                        "Error creating flavor at VIM %s.", str(creation_error)
                    )

            # NOTE(review): when the flavor can neither be found nor created the resize is skipped
            # but the task is still reported as "DONE"; confirm whether it should be "FAILED"
            if target_flavor_uuid is not None:
                resized_status = target_vim.resize_instance(
                    vim_vm_id, target_flavor_uuid
                )

                if resized_status:
                    # Refresh VM to get new vim_info
                    vm_to_refresh_list = [vim_vm_id]
                    vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
                    refreshed_vim_info = vim_dict[vim_vm_id]

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            self.logger.debug(
                "task={} {} resize done".format(task_id, ro_task["target_id"])
            )
            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} Resize: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
1479
1480
class ConfigValidate:
    """Accessor of the values stored under the "period" section of the configuration."""

    def __init__(self, config: Dict):
        """
        :param config: configuration dictionary; its "period" entry is read by the properties
        """
        self.conf = config

    def _period(self, key):
        """Return the value stored at conf["period"][key]."""
        return self.conf["period"][key]

    @property
    def active(self):
        """Value of "refresh_active" when it is -1 or at least 60; otherwise 60."""
        # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
        refresh_active = self._period("refresh_active")

        if refresh_active == -1 or refresh_active >= 60:
            return refresh_active

        return 60

    @property
    def build(self):
        """Value of "refresh_build"."""
        return self._period("refresh_build")

    @property
    def image(self):
        """Value of "refresh_image"."""
        return self._period("refresh_image")

    @property
    def error(self):
        """Value of "refresh_error"."""
        return self._period("refresh_error")

    @property
    def queue_size(self):
        """Value of "queue_size"."""
        return self._period("queue_size")
1511
1512
1513 class NsWorker(threading.Thread):
def __init__(self, worker_index, config, plugins, db):
    """
    :param worker_index: thread index
    :param config: general configuration of RO, among others the process_id with the docker id where it runs
    :param plugins: global shared dict with the loaded plugins
    :param db: database class instance to use
    """
    threading.Thread.__init__(self)
    self.config = config
    self.plugins = plugins
    self.plugin_name = "unknown"
    self.logger = logging.getLogger("ro.worker{}".format(worker_index))
    self.worker_index = worker_index
    # refresh periods for created items
    self.refresh_config = ConfigValidate(config)
    self.task_queue = queue.Queue(self.refresh_config.queue_size)
    # lock used by del_task to change the status of a task
    self.task_lock = threading.Lock()
    # targetvim: vimplugin class
    self.my_vims = {}
    # targetvim: vim information from database
    self.db_vims = {}
    # targetvim list
    self.vim_targets = []
    self.my_id = config["process_id"] + ":" + str(worker_index)
    self.db = db
    # item type: class that implements the interaction with the VIM/SDN for that item.
    # All of them share the same db, connectors, cached db information and logger
    interaction_classes = {
        "net": VimInteractionNet,
        "vdu": VimInteractionVdu,
        "image": VimInteractionImage,
        "flavor": VimInteractionFlavor,
        "sdn_net": VimInteractionSdnNet,
        "update": VimInteractionUpdateVdu,
        "affinity-or-anti-affinity-group": VimInteractionAffinityGroup,
        "migrate": VimInteractionMigration,
        "verticalscale": VimInteractionResize,
    }
    self.item2class = {
        item: interaction_class(self.db, self.my_vims, self.db_vims, self.logger)
        for item, interaction_class in interaction_classes.items()
    }
    self.time_last_task_processed = None
    # lists of tasks to delete because nsrs or vnfrs has been deleted from db
    self.tasks_to_delete = []
    # it is idle when there are not vim_targets associated
    self.idle = True
    self.task_locked_time = config["global"]["task_locked_time"]
1569
def insert_task(self, task):
    """
    Put a task into the worker queue without blocking.
    :param task: element to insert
    :return: None
    :raises NsWorkerException: if the queue is full
    """
    try:
        self.task_queue.put_nowait(task)
    except queue.Full:
        raise NsWorkerException("timeout inserting a task")

    return None
1576
def terminate(self):
    """Insert the "exit" element into the task queue by means of insert_task."""
    exit_mark = "exit"
    self.insert_task(exit_mark)
1579
def del_task(self, task):
    """
    Mark a task as superseded if it is still scheduled.
    :param task: task dictionary; its "status" is read and, when it is "SCHEDULED", changed to "SUPERSEDED"
    :return: True if the task has been superseded, False otherwise
    """
    # the lock is released by the 'with' statement on every exit path; releasing it by hand as well
    # would make the context manager fail when trying to release an already unlocked lock
    with self.task_lock:
        if task["status"] == "SCHEDULED":
            task["status"] = "SUPERSEDED"
            return True

        # task["status"] == "processing"
        return False
1588
1589 def _process_vim_config(self, target_id, db_vim):
1590 """
1591 Process vim config, creating vim configuration files as ca_cert
1592 :param target_id: vim/sdn/wim + id
1593 :param db_vim: Vim dictionary obtained from database
1594 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1595 """
1596 if not db_vim.get("config"):
1597 return
1598
1599 file_name = ""
1600
1601 try:
1602 if db_vim["config"].get("ca_cert_content"):
1603 file_name = "{}:{}".format(target_id, self.worker_index)
1604
1605 try:
1606 mkdir(file_name)
1607 except FileExistsError:
1608 self.logger.exception(
1609 "FileExistsError occured while processing vim_config."
1610 )
1611
1612 file_name = file_name + "/ca_cert"
1613
1614 with open(file_name, "w") as f:
1615 f.write(db_vim["config"]["ca_cert_content"])
1616 del db_vim["config"]["ca_cert_content"]
1617 db_vim["config"]["ca_cert"] = file_name
1618 except Exception as e:
1619 raise NsWorkerException(
1620 "Error writing to file '{}': {}".format(file_name, e)
1621 )
1622
1623 def _load_plugin(self, name, type="vim"):
1624 # type can be vim or sdn
1625 if "rovim_dummy" not in self.plugins:
1626 self.plugins["rovim_dummy"] = VimDummyConnector
1627
1628 if "rosdn_dummy" not in self.plugins:
1629 self.plugins["rosdn_dummy"] = SdnDummyConnector
1630
1631 if name in self.plugins:
1632 return self.plugins[name]
1633
1634 try:
1635 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1636 self.plugins[name] = ep.load()
1637 except Exception as e:
1638 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1639
1640 if name and name not in self.plugins:
1641 raise NsWorkerException(
1642 "Plugin 'osm_{n}' has not been installed".format(n=name)
1643 )
1644
1645 return self.plugins[name]
1646
1647 def _unload_vim(self, target_id):
1648 """
1649 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1650 :param target_id: Contains type:_id; where type can be 'vim', ...
1651 :return: None.
1652 """
1653 try:
1654 self.db_vims.pop(target_id, None)
1655 self.my_vims.pop(target_id, None)
1656
1657 if target_id in self.vim_targets:
1658 self.vim_targets.remove(target_id)
1659
1660 self.logger.info("Unloaded {}".format(target_id))
1661 rmtree("{}:{}".format(target_id, self.worker_index))
1662 except FileNotFoundError:
1663 # This is raised by rmtree if folder does not exist.
1664 self.logger.exception("FileNotFoundError occured while unloading VIM.")
1665 except Exception as e:
1666 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1667
def _check_vim(self, target_id):
    """
    Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
    Only the first operation in "PROCESSING" state found at _admin.operations is processed, and only if it
    is not locked by another thread during the last self.task_locked_time seconds
    :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
    :return: None.
    """
    target, _, _id = target_id.partition(":")
    now = time.time()
    update_dict = {}
    unset_dict = {}
    op_text = ""
    step = ""
    # remember if it was already loaded, to unload it at the end in case it was not
    loaded = target_id in self.vim_targets
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts"
        if target == "wim"
        else "sdns"
    )

    try:
        step = "Getting {} from db".format(target_id)
        db_vim = self.db.get_one(target_database, {"_id": _id})

        for op_index, operation in enumerate(
            db_vim["_admin"].get("operations", ())
        ):
            if operation["operationState"] != "PROCESSING":
                continue

            locked_at = operation.get("locked_at")

            if locked_at is not None and locked_at >= now - self.task_locked_time:
                # some other thread is doing this operation
                return

            # lock
            op_text = "_admin.operations.{}.".format(op_index)

            # the filter includes the locked_at value just read, so the update only matches if
            # nobody has changed it in the meantime
            # NOTE(review): "admin.current_operation" is written here while "current_operation" is
            # the key unset below, and the rest of keys use the "_admin." prefix; confirm intended keys
            if not self.db.set_one(
                target_database,
                q_filter={
                    "_id": _id,
                    op_text + "operationState": "PROCESSING",
                    op_text + "locked_at": locked_at,
                },
                update_dict={
                    op_text + "locked_at": now,
                    "admin.current_operation": op_index,
                },
                fail_on_empty=False,
            ):
                return

            unset_dict[op_text + "locked_at"] = None
            unset_dict["current_operation"] = None
            step = "Loading " + target_id
            error_text = self._load_vim(target_id)

            if not error_text:
                step = "Checking connectivity"

                if target == "vim":
                    self.my_vims[target_id].check_vim_connectivity()
                else:
                    self.my_vims[target_id].check_credentials()

            # success values; the finally clause overwrites them when error_text is not empty
            update_dict["_admin.operationalState"] = "ENABLED"
            update_dict["_admin.detailed-status"] = ""
            unset_dict[op_text + "detailed-status"] = None
            update_dict[op_text + "operationState"] = "COMPLETED"

            # only one operation is processed per call; the finally clause still runs
            return

    except Exception as e:
        error_text = "{}: {}".format(step, e)
        self.logger.error("{} for {}: {}".format(step, target_id, e))

    finally:
        # write the result at database only if the lock was obtained or there is something to update
        if update_dict or unset_dict:
            if error_text:
                update_dict[op_text + "operationState"] = "FAILED"
                update_dict[op_text + "detailed-status"] = error_text
                unset_dict.pop(op_text + "detailed-status", None)
                update_dict["_admin.operationalState"] = "ERROR"
                update_dict["_admin.detailed-status"] = error_text

            if op_text:
                update_dict[op_text + "statusEnteredTime"] = now

            self.db.set_one(
                target_database,
                q_filter={"_id": _id},
                update_dict=update_dict,
                unset=unset_dict,
                fail_on_empty=False,
            )

        if not loaded:
            self._unload_vim(target_id)
1769
1770 def _reload_vim(self, target_id):
1771 if target_id in self.vim_targets:
1772 self._load_vim(target_id)
1773 else:
1774 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1775 # just remove it to force load again next time it is needed
1776 self.db_vims.pop(target_id, None)
1777
1778 def _load_vim(self, target_id):
1779 """
1780 Load or reload a vim_account, sdn_controller or wim_account.
1781 Read content from database, load the plugin if not loaded.
1782 In case of error loading the plugin, it load a failing VIM_connector
1783 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1784 :param target_id: Contains type:_id; where type can be 'vim', ...
1785 :return: None if ok, descriptive text if error
1786 """
1787 target, _, _id = target_id.partition(":")
1788 target_database = (
1789 "vim_accounts"
1790 if target == "vim"
1791 else "wim_accounts"
1792 if target == "wim"
1793 else "sdns"
1794 )
1795 plugin_name = ""
1796 vim = None
1797
1798 try:
1799 step = "Getting {}={} from db".format(target, _id)
1800 # TODO process for wim, sdnc, ...
1801 vim = self.db.get_one(target_database, {"_id": _id})
1802
1803 # if deep_get(vim, "config", "sdn-controller"):
1804 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1805 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1806
1807 step = "Decrypting password"
1808 schema_version = vim.get("schema_version")
1809 self.db.encrypt_decrypt_fields(
1810 vim,
1811 "decrypt",
1812 fields=("password", "secret"),
1813 schema_version=schema_version,
1814 salt=_id,
1815 )
1816 self._process_vim_config(target_id, vim)
1817
1818 if target == "vim":
1819 plugin_name = "rovim_" + vim["vim_type"]
1820 step = "Loading plugin '{}'".format(plugin_name)
1821 vim_module_conn = self._load_plugin(plugin_name)
1822 step = "Loading {}'".format(target_id)
1823 self.my_vims[target_id] = vim_module_conn(
1824 uuid=vim["_id"],
1825 name=vim["name"],
1826 tenant_id=vim.get("vim_tenant_id"),
1827 tenant_name=vim.get("vim_tenant_name"),
1828 url=vim["vim_url"],
1829 url_admin=None,
1830 user=vim["vim_user"],
1831 passwd=vim["vim_password"],
1832 config=vim.get("config") or {},
1833 persistent_info={},
1834 )
1835 else: # sdn
1836 plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
1837 step = "Loading plugin '{}'".format(plugin_name)
1838 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1839 step = "Loading {}'".format(target_id)
1840 wim = deepcopy(vim)
1841 wim_config = wim.pop("config", {}) or {}
1842 wim["uuid"] = wim["_id"]
1843 if "url" in wim and "wim_url" not in wim:
1844 wim["wim_url"] = wim["url"]
1845 elif "url" not in wim and "wim_url" in wim:
1846 wim["url"] = wim["wim_url"]
1847
1848 if wim.get("dpid"):
1849 wim_config["dpid"] = wim.pop("dpid")
1850
1851 if wim.get("switch_id"):
1852 wim_config["switch_id"] = wim.pop("switch_id")
1853
1854 # wim, wim_account, config
1855 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1856 self.db_vims[target_id] = vim
1857 self.error_status = None
1858
1859 self.logger.info(
1860 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1861 )
1862 except Exception as e:
1863 self.logger.error(
1864 "Cannot load {} plugin={}: {} {}".format(
1865 target_id, plugin_name, step, e
1866 )
1867 )
1868
1869 self.db_vims[target_id] = vim or {}
1870 self.db_vims[target_id] = FailingConnector(str(e))
1871 error_status = "{} Error: {}".format(step, e)
1872
1873 return error_status
1874 finally:
1875 if target_id not in self.vim_targets:
1876 self.vim_targets.append(target_id)
1877
1878 def _get_db_task(self):
1879 """
1880 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1881 :return: None
1882 """
1883 now = time.time()
1884
1885 if not self.time_last_task_processed:
1886 self.time_last_task_processed = now
1887
1888 try:
1889 while True:
1890 """
1891 # Log RO tasks only when loglevel is DEBUG
1892 if self.logger.getEffectiveLevel() == logging.DEBUG:
1893 self._log_ro_task(
1894 None,
1895 None,
1896 None,
1897 "TASK_WF",
1898 "task_locked_time="
1899 + str(self.task_locked_time)
1900 + " "
1901 + "time_last_task_processed="
1902 + str(self.time_last_task_processed)
1903 + " "
1904 + "now="
1905 + str(now),
1906 )
1907 """
1908 locked = self.db.set_one(
1909 "ro_tasks",
1910 q_filter={
1911 "target_id": self.vim_targets,
1912 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1913 "locked_at.lt": now - self.task_locked_time,
1914 "to_check_at.lt": self.time_last_task_processed,
1915 "to_check_at.gt": -1,
1916 },
1917 update_dict={"locked_by": self.my_id, "locked_at": now},
1918 fail_on_empty=False,
1919 )
1920
1921 if locked:
1922 # read and return
1923 ro_task = self.db.get_one(
1924 "ro_tasks",
1925 q_filter={
1926 "target_id": self.vim_targets,
1927 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1928 "locked_at": now,
1929 },
1930 )
1931 return ro_task
1932
1933 if self.time_last_task_processed == now:
1934 self.time_last_task_processed = None
1935 return None
1936 else:
1937 self.time_last_task_processed = now
1938 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1939
1940 except DbException as e:
1941 self.logger.error("Database exception at _get_db_task: {}".format(e))
1942 except Exception as e:
1943 self.logger.critical(
1944 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1945 )
1946
1947 return None
1948
1949 def _get_db_all_tasks(self):
1950 """
1951 Read all content of table ro_tasks to log it
1952 :return: None
1953 """
1954 try:
1955 # Checking the content of the BD:
1956
1957 # read and return
1958 ro_task = self.db.get_list("ro_tasks")
1959 for rt in ro_task:
1960 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1961 return ro_task
1962
1963 except DbException as e:
1964 self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1965 except Exception as e:
1966 self.logger.critical(
1967 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1968 )
1969
1970 return None
1971
1972 def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1973 """
1974 Generate a log with the following format:
1975
1976 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1977 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1978 task_array_index;task_id;task_action;task_item;task_args
1979
1980 Example:
1981
1982 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1983 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1984 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1985 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1986 'vim_details': None, 'vim_message': None, 'refresh_at': None};1;SCHEDULED;
1987 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1988 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1989 """
1990 try:
1991 line = []
1992 i = 0
1993 if ro_task is not None and isinstance(ro_task, dict):
1994 for t in ro_task["tasks"]:
1995 line.clear()
1996 line.append(mark)
1997 line.append(event)
1998 line.append(ro_task.get("_id", ""))
1999 line.append(str(ro_task.get("locked_at", "")))
2000 line.append(str(ro_task.get("modified_at", "")))
2001 line.append(str(ro_task.get("created_at", "")))
2002 line.append(str(ro_task.get("to_check_at", "")))
2003 line.append(str(ro_task.get("locked_by", "")))
2004 line.append(str(ro_task.get("target_id", "")))
2005 line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
2006 line.append(str(ro_task.get("vim_info", "")))
2007 line.append(str(ro_task.get("tasks", "")))
2008 if isinstance(t, dict):
2009 line.append(str(t.get("status", "")))
2010 line.append(str(t.get("action_id", "")))
2011 line.append(str(i))
2012 line.append(str(t.get("task_id", "")))
2013 line.append(str(t.get("action", "")))
2014 line.append(str(t.get("item", "")))
2015 line.append(str(t.get("find_params", "")))
2016 line.append(str(t.get("params", "")))
2017 else:
2018 line.extend([""] * 2)
2019 line.append(str(i))
2020 line.extend([""] * 5)
2021
2022 i += 1
2023 self.logger.debug(";".join(line))
2024 elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
2025 i = 0
2026 while True:
2027 st = "tasks.{}.status".format(i)
2028 if st not in db_ro_task_update:
2029 break
2030 line.clear()
2031 line.append(mark)
2032 line.append(event)
2033 line.append(db_ro_task_update.get("_id", ""))
2034 line.append(str(db_ro_task_update.get("locked_at", "")))
2035 line.append(str(db_ro_task_update.get("modified_at", "")))
2036 line.append("")
2037 line.append(str(db_ro_task_update.get("to_check_at", "")))
2038 line.append(str(db_ro_task_update.get("locked_by", "")))
2039 line.append("")
2040 line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
2041 line.append("")
2042 line.append(str(db_ro_task_update.get("vim_info", "")))
2043 line.append(str(str(db_ro_task_update).count(".status")))
2044 line.append(db_ro_task_update.get(st, ""))
2045 line.append("")
2046 line.append(str(i))
2047 line.extend([""] * 3)
2048 i += 1
2049 self.logger.debug(";".join(line))
2050
2051 elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
2052 line.clear()
2053 line.append(mark)
2054 line.append(event)
2055 line.append(db_ro_task_delete.get("_id", ""))
2056 line.append("")
2057 line.append(db_ro_task_delete.get("modified_at", ""))
2058 line.extend([""] * 13)
2059 self.logger.debug(";".join(line))
2060
2061 else:
2062 line.clear()
2063 line.append(mark)
2064 line.append(event)
2065 line.extend([""] * 16)
2066 self.logger.debug(";".join(line))
2067
2068 except Exception as e:
2069 self.logger.error("Error logging ro_task: {}".format(e))
2070
2071 def _delete_task(self, ro_task, task_index, task_depends, db_update):
2072 """
2073 Determine if this task need to be done or superseded
2074 :return: None
2075 """
2076 my_task = ro_task["tasks"][task_index]
2077 task_id = my_task["task_id"]
2078 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
2079 "created_items", False
2080 )
2081
2082 self.logger.warning("Needed delete: {}".format(needed_delete))
2083 if my_task["status"] == "FAILED":
2084 return None, None # TODO need to be retry??
2085
2086 try:
2087 for index, task in enumerate(ro_task["tasks"]):
2088 if index == task_index or not task:
2089 continue # own task
2090
2091 if (
2092 my_task["target_record"] == task["target_record"]
2093 and task["action"] == "CREATE"
2094 ):
2095 # set to finished
2096 db_update["tasks.{}.status".format(index)] = task[
2097 "status"
2098 ] = "FINISHED"
2099 elif task["action"] == "CREATE" and task["status"] not in (
2100 "FINISHED",
2101 "SUPERSEDED",
2102 ):
2103 needed_delete = False
2104
2105 if needed_delete:
2106 self.logger.warning(
2107 "Deleting ro_task={} task_index={}".format(ro_task, task_index)
2108 )
2109 return self.item2class[my_task["item"]].delete(ro_task, task_index)
2110 else:
2111 return "SUPERSEDED", None
2112 except Exception as e:
2113 if not isinstance(e, NsWorkerException):
2114 self.logger.critical(
2115 "Unexpected exception at _delete_task task={}: {}".format(
2116 task_id, e
2117 ),
2118 exc_info=True,
2119 )
2120
2121 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2122
2123 def _create_task(self, ro_task, task_index, task_depends, db_update):
2124 """
2125 Determine if this task need to create something at VIM
2126 :return: None
2127 """
2128 my_task = ro_task["tasks"][task_index]
2129 task_id = my_task["task_id"]
2130 task_status = None
2131
2132 if my_task["status"] == "FAILED":
2133 return None, None # TODO need to be retry??
2134 elif my_task["status"] == "SCHEDULED":
2135 # check if already created by another task
2136 for index, task in enumerate(ro_task["tasks"]):
2137 if index == task_index or not task:
2138 continue # own task
2139
2140 if task["action"] == "CREATE" and task["status"] not in (
2141 "SCHEDULED",
2142 "FINISHED",
2143 "SUPERSEDED",
2144 ):
2145 return task["status"], "COPY_VIM_INFO"
2146
2147 try:
2148 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
2149 ro_task, task_index, task_depends
2150 )
2151 # TODO update other CREATE tasks
2152 except Exception as e:
2153 if not isinstance(e, NsWorkerException):
2154 self.logger.error(
2155 "Error executing task={}: {}".format(task_id, e), exc_info=True
2156 )
2157
2158 task_status = "FAILED"
2159 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2160 # TODO update ro_vim_item_update
2161
2162 return task_status, ro_vim_item_update
2163 else:
2164 return None, None
2165
2166 def _get_dependency(self, task_id, ro_task=None, target_id=None):
2167 """
2168 Look for dependency task
2169 :param task_id: Can be one of
2170 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2171 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2172 3. task.task_id: "<action_id>:number"
2173 :param ro_task:
2174 :param target_id:
2175 :return: database ro_task plus index of task
2176 """
2177 if (
2178 task_id.startswith("vim:")
2179 or task_id.startswith("sdn:")
2180 or task_id.startswith("wim:")
2181 ):
2182 target_id, _, task_id = task_id.partition(" ")
2183
2184 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
2185 ro_task_dependency = self.db.get_one(
2186 "ro_tasks",
2187 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
2188 fail_on_empty=False,
2189 )
2190
2191 if ro_task_dependency:
2192 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2193 if task["target_record_id"] == task_id:
2194 return ro_task_dependency, task_index
2195
2196 else:
2197 if ro_task:
2198 for task_index, task in enumerate(ro_task["tasks"]):
2199 if task and task["task_id"] == task_id:
2200 return ro_task, task_index
2201
2202 ro_task_dependency = self.db.get_one(
2203 "ro_tasks",
2204 q_filter={
2205 "tasks.ANYINDEX.task_id": task_id,
2206 "tasks.ANYINDEX.target_record.ne": None,
2207 },
2208 fail_on_empty=False,
2209 )
2210
2211 self.logger.warning("ro_task_dependency={}".format(ro_task_dependency))
2212 if ro_task_dependency:
2213 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2214 if task["task_id"] == task_id:
2215 return ro_task_dependency, task_index
2216 raise NsWorkerException("Cannot get depending task {}".format(task_id))
2217
def update_vm_refresh(self):
    """Re-enable the periodic VM status refresh of already processed ro_tasks.

    Nothing is done when self.refresh_config.active is -1. Otherwise every ro_task
    having a task in status "DONE" and a negative "to_check_at" gets a new
    "vim_info.refresh_at" and "to_check_at" at database.

    Any exception is logged and not propagated.
    """
    try:
        self.logger.debug("Checking if VM status update config")
        started_at = time.time()
        active_period = self.refresh_config.active
        next_refresh = -1 if active_period == -1 else started_at + active_period

        if next_refresh == -1:
            # refresh of active VMs is disabled
            return

        # never wait more than one day for the next check
        one_day_later = time.time() + (24 * 60 * 60)
        db_ro_task_update = {
            "vim_info.refresh_at": next_refresh,
            "to_check_at": min(one_day_later, next_refresh),
        }

        self.logger.debug(
            "Finding tasks which to be updated to enable VM status updates"
        )
        refresh_tasks = self.db.get_list(
            "ro_tasks",
            q_filter={"tasks.status": "DONE", "to_check_at.lt": 0},
        )
        self.logger.debug("Updating tasks to change the to_check_at status")
        for stale_ro_task in refresh_tasks:
            self.db.set_one(
                "ro_tasks",
                q_filter={"_id": stale_ro_task["_id"]},
                update_dict=db_ro_task_update,
                fail_on_empty=True,
            )

    except Exception as e:
        self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
2263
def _process_pending_tasks(self, ro_task):
    """
    Process the tasks of one ro_task: DELETE, CREATE and EXEC actions, in this order.

    For every task it checks its dependencies, calls the needed operation (deletion, execution,
    creation or refresh), copies the result to the target record by means of _update_target and
    accumulates the changes at a dictionary. Finally this dictionary is written at database table
    "ro_tasks" together with the unlock fields ("locked_by", "locked_at") and the new "to_check_at".

    :param ro_task: database content of the ro_task to process. Its "vim_info" is modified in place
    :return: None. Exceptions raised inside the processing are logged and not propagated
    """
    ro_task_id = ro_task["_id"]
    now = time.time()
    # one day
    next_check_at = now + (24 * 60 * 60)
    db_ro_task_update = {}

    def _update_refresh(new_status):
        """
        Compute when the VIM item must be refreshed again, depending on the item type and on
        new_status, and store it at db_ro_task_update and at ro_task["vim_info"]["refresh_at"].
        It also brings forward next_check_at if the refresh must happen before.
        It uses the 'task' variable of the enclosing loop at the moment of the call.
        """
        # compute next_refresh
        nonlocal task
        nonlocal next_check_at
        nonlocal db_ro_task_update
        nonlocal ro_task

        next_refresh = time.time()

        if task["item"] in ("image", "flavor"):
            next_refresh += self.refresh_config.image
        elif new_status == "BUILD":
            next_refresh += self.refresh_config.build
        elif new_status == "DONE":
            if self.refresh_config.active == -1:
                # -1 means that items in status DONE are not refreshed
                next_refresh = -1
            else:
                next_refresh += self.refresh_config.active
        else:
            next_refresh += self.refresh_config.error

        # NOTE(review): when next_refresh is -1, this min() leaves next_check_at at -1 — confirm it is intended
        next_check_at = min(next_check_at, next_refresh)
        db_ro_task_update["vim_info.refresh_at"] = next_refresh
        ro_task["vim_info"]["refresh_at"] = next_refresh

    try:
        """
        # Log RO tasks only when loglevel is DEBUG
        if self.logger.getEffectiveLevel() == logging.DEBUG:
            self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
        """
        # Check if vim status refresh is enabled again
        self.update_vm_refresh()
        # 0: get task_status_create
        lock_object = None
        task_status_create = None
        # first CREATE task, if any, whose item is already being built or done at VIM
        task_create = next(
            (
                t
                for t in ro_task["tasks"]
                if t
                and t["action"] == "CREATE"
                and t["status"] in ("BUILD", "DONE")
            ),
            None,
        )

        if task_create:
            task_status_create = task_create["status"]

        # 1: process DELETE and EXEC tasks in status SCHEDULED or BUILD, and CREATE tasks in any status but
        # FINISHED or SUPERSEDED
        for task_action in ("DELETE", "CREATE", "EXEC"):
            # NOTE(review): these two variables are initialized per action, not per task, so the values set
            # by one task are still visible while processing the next task of the same action — confirm
            db_vim_update = None
            new_status = None

            for task_index, task in enumerate(ro_task["tasks"]):
                if not task:
                    continue  # task deleted

                task_depends = {}
                target_update = None

                # skip tasks of other actions and tasks with nothing pending
                if (
                    (
                        task_action in ("DELETE", "EXEC")
                        and task["status"] not in ("SCHEDULED", "BUILD")
                    )
                    or task["action"] != task_action
                    or (
                        task_action == "CREATE"
                        and task["status"] in ("FINISHED", "SUPERSEDED")
                    )
                ):
                    continue

                task_path = "tasks.{}.status".format(task_index)
                try:
                    db_vim_info_update = None

                    if task["status"] == "SCHEDULED":
                        # check if tasks that this depends on have been completed
                        dependency_not_completed = False

                        for dependency_task_id in task.get("depends_on") or ():
                            (
                                dependency_ro_task,
                                dependency_task_index,
                            ) = self._get_dependency(
                                dependency_task_id, target_id=ro_task["target_id"]
                            )
                            dependency_task = dependency_ro_task["tasks"][
                                dependency_task_index
                            ]
                            self.logger.warning(
                                "dependency_ro_task={} dependency_task_index={}".format(
                                    dependency_ro_task, dependency_task_index
                                )
                            )

                            if dependency_task["status"] == "SCHEDULED":
                                dependency_not_completed = True
                                next_check_at = min(
                                    next_check_at, dependency_ro_task["to_check_at"]
                                )
                                # must allow dependent task to be processed first
                                # to do this set time after last_task_processed
                                next_check_at = max(
                                    self.time_last_task_processed, next_check_at
                                )
                                break
                            elif dependency_task["status"] == "FAILED":
                                error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
                                    task["action"],
                                    task["item"],
                                    dependency_task["action"],
                                    dependency_task["item"],
                                    dependency_task_id,
                                    dependency_ro_task["vim_info"].get(
                                        "vim_message"
                                    ),
                                )
                                self.logger.error(
                                    "task={} {}".format(task["task_id"], error_text)
                                )
                                raise NsWorkerException(error_text)

                            # the vim_id of the dependency is stored both under the dependency id and
                            # under the same id prefixed with "TASK-"
                            task_depends[dependency_task_id] = dependency_ro_task[
                                "vim_info"
                            ]["vim_id"]
                            task_depends[
                                "TASK-{}".format(dependency_task_id)
                            ] = dependency_ro_task["vim_info"]["vim_id"]

                        if dependency_not_completed:
                            self.logger.warning(
                                "DEPENDENCY NOT COMPLETED {}".format(
                                    dependency_ro_task["vim_info"]["vim_id"]
                                )
                            )
                            # TODO set at vim_info.vim_details that it is waiting
                            # the task remains SCHEDULED, it is not processed at this round
                            continue

                    # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
                    # the task of renew this locking. It will update database locked_at periodically
                    if not lock_object:
                        lock_object = LockRenew.add_lock_object(
                            "ro_tasks", ro_task, self
                        )

                    if task["action"] == "DELETE":
                        (
                            new_status,
                            db_vim_info_update,
                        ) = self._delete_task(
                            ro_task, task_index, task_depends, db_ro_task_update
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if new_status in ("FINISHED", "SUPERSEDED"):
                            target_update = "DELETE"
                    elif task["action"] == "EXEC":
                        (
                            new_status,
                            db_vim_info_update,
                            db_task_update,
                        ) = self.item2class[task["item"]].exec(
                            ro_task, task_index, task_depends
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if db_task_update:
                            # load into database the modified db_task_update "retries" and "next_retry"
                            if db_task_update.get("retries"):
                                db_ro_task_update[
                                    "tasks.{}.retries".format(task_index)
                                ] = db_task_update["retries"]

                            # check again after "next_retry" seconds, 60 by default
                            next_check_at = time.time() + db_task_update.get(
                                "next_retry", 60
                            )
                        target_update = None
                    elif task["action"] == "CREATE":
                        if task["status"] == "SCHEDULED":
                            if task_status_create:
                                # another CREATE task is already BUILD or DONE: reuse its status and
                                # copy the current vim_info to the target of this task
                                new_status = task_status_create
                                target_update = "COPY_VIM_INFO"
                            else:
                                new_status, db_vim_info_update = self.item2class[
                                    task["item"]
                                ].new(ro_task, task_index, task_depends)
                                # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
                                _update_refresh(new_status)
                        else:
                            refresh_at = ro_task["vim_info"]["refresh_at"]
                            # refresh the VIM status only when refresh is enabled and its time has passed
                            if refresh_at and refresh_at != -1 and now > refresh_at:
                                (
                                    new_status,
                                    db_vim_info_update,
                                ) = self.item2class[
                                    task["item"]
                                ].refresh(ro_task)
                                _update_refresh(new_status)
                            else:
                                # The refresh is updated to avoid set the value of "refresh_at" to
                                # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
                                # because it can happen that in this case the task is never processed
                                _update_refresh(task["status"])

                except Exception as e:
                    # any failure of the previous step marks the task as FAILED with the error as vim_message
                    new_status = "FAILED"
                    db_vim_info_update = {
                        "vim_status": "VIM_ERROR",
                        "vim_message": str(e),
                    }

                    if not isinstance(
                        e, (NsWorkerException, vimconn.VimConnException)
                    ):
                        # NOTE(review): this message names _delete_task, but it is reached for any action
                        self.logger.error(
                            "Unexpected exception at _delete_task task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

                try:
                    if db_vim_info_update:
                        # keep the VIM changes both for the ro_task at database and for the in-memory copy
                        db_vim_update = db_vim_info_update.copy()
                        db_ro_task_update.update(
                            {
                                "vim_info." + k: v
                                for k, v in db_vim_info_update.items()
                            }
                        )
                        ro_task["vim_info"].update(db_vim_info_update)

                    if new_status:
                        if task_action == "CREATE":
                            # following SCHEDULED CREATE tasks of this ro_task will reuse this status
                            task_status_create = new_status
                        db_ro_task_update[task_path] = new_status

                    # propagate the result to the record pointed by task["target_record"]
                    if target_update or db_vim_update:
                        if target_update == "DELETE":
                            self._update_target(task, None)
                        elif target_update == "COPY_VIM_INFO":
                            self._update_target(task, ro_task["vim_info"])
                        else:
                            self._update_target(task, db_vim_update)

                except Exception as e:
                    if (
                        isinstance(e, DbException)
                        and e.http_code == HTTPStatus.NOT_FOUND
                    ):
                        # if the vnfrs or nsrs has been removed from database, this task must be removed
                        self.logger.debug(
                            "marking to delete task={}".format(task["task_id"])
                        )
                        self.tasks_to_delete.append(task)
                    else:
                        self.logger.error(
                            "Unexpected exception at _update_target task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

        # 2: write the accumulated changes at database and unlock the ro_task
        locked_at = ro_task["locked_at"]

        if lock_object:
            locked_at = [
                lock_object["locked_at"],
                lock_object["locked_at"] + self.task_locked_time,
            ]
            # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
            # contain exactly locked_at + self.task_locked_time
            LockRenew.remove_lock_object(lock_object)

        q_filter = {
            "_id": ro_task["_id"],
            "to_check_at": ro_task["to_check_at"],
            "locked_at": locked_at,
        }
        # modify own task. Try filtering by to_check_at. For race condition if to_check_at has been modified,
        # outside this task (by ro_nbi) do not update it
        db_ro_task_update["locked_by"] = None
        # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
        db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
        db_ro_task_update["modified_at"] = now
        db_ro_task_update["to_check_at"] = next_check_at

        """
        # Log RO tasks only when loglevel is DEBUG
        if self.logger.getEffectiveLevel() == logging.DEBUG:
            db_ro_task_update_log = db_ro_task_update.copy()
            db_ro_task_update_log["_id"] = q_filter["_id"]
            self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
        """

        if not self.db.set_one(
            "ro_tasks",
            update_dict=db_ro_task_update,
            q_filter=q_filter,
            fail_on_empty=False,
        ):
            # nothing matched, to_check_at was changed meanwhile: retry without filtering by it and
            # without overwriting it
            del db_ro_task_update["to_check_at"]
            del q_filter["to_check_at"]
            """
            # Log RO tasks only when loglevel is DEBUG
            if self.logger.getEffectiveLevel() == logging.DEBUG:
                self._log_ro_task(
                    None,
                    db_ro_task_update_log,
                    None,
                    "TASK_WF",
                    "SET_TASK " + str(q_filter),
                )
            """
            self.db.set_one(
                "ro_tasks",
                q_filter=q_filter,
                update_dict=db_ro_task_update,
                fail_on_empty=True,
            )
    except DbException as e:
        self.logger.error(
            "ro_task={} Error updating database {}".format(ro_task_id, e)
        )
    except Exception as e:
        self.logger.error(
            "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
        )
2609
2610 def _update_target(self, task, ro_vim_item_update):
2611 table, _, temp = task["target_record"].partition(":")
2612 _id, _, path_vim_status = temp.partition(":")
2613 path_item = path_vim_status[: path_vim_status.rfind(".")]
2614 path_item = path_item[: path_item.rfind(".")]
2615 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2616 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2617
2618 if ro_vim_item_update:
2619 update_dict = {
2620 path_vim_status + "." + k: v
2621 for k, v in ro_vim_item_update.items()
2622 if k
2623 in (
2624 "vim_id",
2625 "vim_details",
2626 "vim_message",
2627 "vim_name",
2628 "vim_status",
2629 "interfaces",
2630 "interfaces_backup",
2631 )
2632 }
2633
2634 if path_vim_status.startswith("vdur."):
2635 # for backward compatibility, add vdur.name apart from vdur.vim_name
2636 if ro_vim_item_update.get("vim_name"):
2637 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2638
2639 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2640 if ro_vim_item_update.get("vim_id"):
2641 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2642
2643 # update general status
2644 if ro_vim_item_update.get("vim_status"):
2645 update_dict[path_item + ".status"] = ro_vim_item_update[
2646 "vim_status"
2647 ]
2648
2649 if ro_vim_item_update.get("interfaces"):
2650 path_interfaces = path_item + ".interfaces"
2651
2652 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2653 if iface:
2654 update_dict.update(
2655 {
2656 path_interfaces + ".{}.".format(i) + k: v
2657 for k, v in iface.items()
2658 if k in ("vlan", "compute_node", "pci")
2659 }
2660 )
2661
2662 # put ip_address and mac_address with ip-address and mac-address
2663 if iface.get("ip_address"):
2664 update_dict[
2665 path_interfaces + ".{}.".format(i) + "ip-address"
2666 ] = iface["ip_address"]
2667
2668 if iface.get("mac_address"):
2669 update_dict[
2670 path_interfaces + ".{}.".format(i) + "mac-address"
2671 ] = iface["mac_address"]
2672
2673 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2674 update_dict["ip-address"] = iface.get("ip_address").split(
2675 ";"
2676 )[0]
2677
2678 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2679 update_dict[path_item + ".ip-address"] = iface.get(
2680 "ip_address"
2681 ).split(";")[0]
2682
2683 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2684
2685 # If interfaces exists, it backups VDU interfaces in the DB for healing operations
2686 if ro_vim_item_update.get("interfaces"):
2687 search_key = path_vim_status + ".interfaces"
2688 if update_dict.get(search_key):
2689 interfaces_backup_update = {
2690 path_vim_status + ".interfaces_backup": update_dict[search_key]
2691 }
2692
2693 self.db.set_one(
2694 table,
2695 q_filter={"_id": _id},
2696 update_dict=interfaces_backup_update,
2697 )
2698
2699 else:
2700 update_dict = {path_item + ".status": "DELETED"}
2701 self.db.set_one(
2702 table,
2703 q_filter={"_id": _id},
2704 update_dict=update_dict,
2705 unset={path_vim_status: None},
2706 )
2707
2708 def _process_delete_db_tasks(self):
2709 """
2710 Delete task from database because vnfrs or nsrs or both have been deleted
2711 :return: None. Uses and modify self.tasks_to_delete
2712 """
2713 while self.tasks_to_delete:
2714 task = self.tasks_to_delete[0]
2715 vnfrs_deleted = None
2716 nsr_id = task["nsr_id"]
2717
2718 if task["target_record"].startswith("vnfrs:"):
2719 # check if nsrs is present
2720 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2721 vnfrs_deleted = task["target_record"].split(":")[1]
2722
2723 try:
2724 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2725 except Exception as e:
2726 self.logger.error(
2727 "Error deleting task={}: {}".format(task["task_id"], e)
2728 )
2729 self.tasks_to_delete.pop(0)
2730
2731 @staticmethod
2732 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2733 """
2734 Static method because it is called from osm_ng_ro.ns
2735 :param db: instance of database to use
2736 :param nsr_id: affected nsrs id
2737 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2738 :return: None, exception is fails
2739 """
2740 retries = 5
2741 for retry in range(retries):
2742 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2743 now = time.time()
2744 conflict = False
2745
2746 for ro_task in ro_tasks:
2747 db_update = {}
2748 to_delete_ro_task = True
2749
2750 for index, task in enumerate(ro_task["tasks"]):
2751 if not task:
2752 pass
2753 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2754 vnfrs_deleted
2755 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2756 ):
2757 db_update["tasks.{}".format(index)] = None
2758 else:
2759 # used by other nsr, ro_task cannot be deleted
2760 to_delete_ro_task = False
2761
2762 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2763 if to_delete_ro_task:
2764 if not db.del_one(
2765 "ro_tasks",
2766 q_filter={
2767 "_id": ro_task["_id"],
2768 "modified_at": ro_task["modified_at"],
2769 },
2770 fail_on_empty=False,
2771 ):
2772 conflict = True
2773 elif db_update:
2774 db_update["modified_at"] = now
2775 if not db.set_one(
2776 "ro_tasks",
2777 q_filter={
2778 "_id": ro_task["_id"],
2779 "modified_at": ro_task["modified_at"],
2780 },
2781 update_dict=db_update,
2782 fail_on_empty=False,
2783 ):
2784 conflict = True
2785 if not conflict:
2786 return
2787 else:
2788 raise NsWorkerException("Exceeded {} retries".format(retries))
2789
def run(self):
    """
    Main loop of the worker. It runs until a "terminate" order is received.

    At each iteration:
      1. An order is taken from self.task_queue (blocking, in idle state, when there are no
         vim_targets). Orders are tuples whose first item is one of "terminate", "load_vim",
         "unload_vim", "reload_vim" or "check_vim"; the second item is passed to the handler.
         After serving an order the loop starts again.
      2. If there is no order at the queue, the tasks marked for deletion are removed and one
         pending ro_task, if any, is obtained from database and processed.
    Exceptions are logged and never stop the loop.
    """
    # load database
    self.logger.info("Starting")
    while True:
        # step 1: get commands from queue
        try:
            if self.vim_targets:
                order = self.task_queue.get(block=False)
            else:
                if not self.idle:
                    self.logger.debug("enters in idle state")
                self.idle = True
                order = self.task_queue.get(block=True)
                self.idle = False

            command = order[0]
            if command == "terminate":
                break

            if command == "load_vim":
                self.logger.info("order to load vim {}".format(order[1]))
                self._load_vim(order[1])
            elif command == "unload_vim":
                self.logger.info("order to unload vim {}".format(order[1]))
                self._unload_vim(order[1])
            elif command == "reload_vim":
                self._reload_vim(order[1])
            elif command == "check_vim":
                self.logger.info("order to check vim {}".format(order[1]))
                self._check_vim(order[1])
            continue
        except queue.Empty:
            # no orders pending, go on with the database tasks
            pass
        except Exception as e:
            self.logger.critical(
                "Error processing task: {}".format(e), exc_info=True
            )

        # step 2: process pending_tasks, delete not needed tasks
        try:
            if self.tasks_to_delete:
                self._process_delete_db_tasks()

            ro_task = self._get_db_task()
            if ro_task:
                self.logger.warning("Task to process: {}".format(ro_task))
                time.sleep(1)
                self._process_pending_tasks(ro_task)
            else:
                # nothing to do, wait before polling database again
                time.sleep(5)
        except Exception as e:
            self.logger.critical(
                "Unexpected exception at run: " + str(e), exc_info=True
            )

    self.logger.info("Finishing")