71944464588a2d1e8aee6ed805010ecdfe9d0d86
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
"""
This is a thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
The tasks are stored in the database, in the table ro_tasks.
A single ro_task refers to a VIM element (flavor, image, network, ...).
A ro_task can contain several 'tasks', each one with a target, where the results are stored.
"""
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import makedirs
31 from os import path
32 import queue
33 from shutil import rmtree
34 import threading
35 import time
36 import traceback
37 from typing import Dict
38 from unittest.mock import Mock
39
40 from importlib_metadata import entry_points
41 from osm_common.dbbase import DbException
42 from osm_ng_ro.vim_admin import LockRenew
43 from osm_ro_plugin import sdnconn
44 from osm_ro_plugin import vimconn
45 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
46 from osm_ro_plugin.vim_dummy import VimDummyConnector
47 import yaml
48
49 __author__ = "Alfonso Tierno"
50 __date__ = "$28-Sep-2017 12:07:15$"
51
52
def deep_get(target_dict, *args, **kwargs):
    """
    Read a value from target_dict following a path of nested keys.
    Example target_dict={a: {b: 5}}; keys a,b return 5; both keys a,b,c and keys f,h return the default
    :param target_dict: dictionary to be read
    :param args: sequence of keys, each one entering one level deeper into target_dict
    :param kwargs: may only contain default=value, returned when the path cannot be followed
    :return: the value found at the end of the path; otherwise the default, or None if no default is given
    """
    current = target_dict

    for step in args:
        if isinstance(current, dict) and step in current:
            current = current[step]
        else:
            # either an intermediate value is not a dictionary or the key is missing
            return kwargs.get("default")

    return current
67
68
class NsWorkerException(Exception):
    """Base error raised by this module when a task cannot be processed."""
71
72
class FailingConnector:
    """Connector stand-in: every public attribute of the VIM and SDN connector base classes
    is replaced by a Mock that raises an exception carrying error_msg when called"""

    def __init__(self, error_msg):
        self.error_msg = error_msg

        # (connector base class, exception raised by its mocked attributes).
        # SDN is processed last, so a name present in both classes raises SdnConnectorError
        failing_sources = (
            (vimconn.VimConnector, vimconn.VimConnException),
            (sdnconn.SdnConnectorBase, sdnconn.SdnConnectorError),
        )

        for connector_class, exception_class in failing_sources:
            for attr_name in dir(connector_class):
                if attr_name.startswith("_"):
                    # private and dunder attributes are left untouched
                    continue

                setattr(
                    self, attr_name, Mock(side_effect=exception_class(error_msg))
                )
88
89
class NsWorkerExceptionNotFound(NsWorkerException):
    """NsWorkerException specialization raised when a VIM element is not found."""
92
93
class VimInteractionBase:
    """Base class to call VIM/SDN for creating, deleting and refreshing networks, VMs, flavors, ...
    The methods implemented here do not contact the VIM; they just report a fixed status"""

    def __init__(self, db, my_vims, db_vims, logger):
        """
        Store the shared resources used by the interaction methods
        :param db: database object
        :param my_vims: dictionary of VIM/SDN connectors, indexed by target_id
        :param db_vims: dictionary with the database content of each VIM, indexed by target_id
        :param logger: logger used to trace the operations
        """
        self.logger = logger
        self.db = db
        self.db_vims = db_vims
        self.my_vims = my_vims

    def new(self, ro_task, task_index, task_depends):
        """Nothing is created. Returns status "BUILD" and an empty update"""
        ro_vim_item_update = {}

        return "BUILD", ro_vim_item_update

    def refresh(self, ro_task):
        """Skip calling VIM to get the element status. Assumes ok unless it is already at VIM_ERROR"""
        already_failed = ro_task["vim_info"]["vim_status"] == "VIM_ERROR"

        return ("FAILED" if already_failed else "DONE"), {}

    def delete(self, ro_task, task_index):
        """Skip calling VIM to delete the element. Assumes ok"""
        ro_vim_item_update = {}

        return "DONE", ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """Nothing is executed. Returns status "DONE" without any update"""
        return "DONE", None, None
120
121
class VimInteractionNet(VimInteractionBase):
    """Finds, creates, refreshes and deletes networks at the VIM"""

    def new(self, ro_task, task_index, task_depends):
        """
        Look for a network at the VIM, or create it.
        With "find_params" the network is searched using "filter_dict" or, for a management
        network ("mgmt"), the management network id/name configured at the VIM, falling back to
        the name provided in find_params. A management network that is not found, is not
        configured at the VIM and has no "params" is created. Without "find_params" the network
        is created using the task "params".
        :param ro_task: ro_task content; "tasks", "target_id" are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (status, vim_info update): "BUILD" when ok, "FAILED" on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        net_id = None
        net_created = False
        net_created_items = {}
        is_mgmt = False
        mgmt_set_at_vim = False

        try:
            find_params = task.get("find_params")

            if not find_params:
                # CREATE
                net_id, net_created_items = target_vim.new_network(**task["params"])
                net_created = True
            else:
                # FIND
                if find_params.get("filter_dict"):
                    net_filter = find_params["filter_dict"]
                elif find_params.get("mgmt"):
                    # management network: the configuration of the VIM takes precedence
                    is_mgmt = True
                    db_vim = self.db_vims[ro_task["target_id"]]

                    if deep_get(db_vim, "config", "management_network_id"):
                        mgmt_set_at_vim = True
                        net_filter = {"id": db_vim["config"]["management_network_id"]}
                    elif deep_get(db_vim, "config", "management_network_name"):
                        mgmt_set_at_vim = True
                        net_filter = {
                            "name": db_vim["config"]["management_network_name"]
                        }
                    else:
                        net_filter = {"name": find_params["name"]}
                else:
                    raise NsWorkerExceptionNotFound(
                        "Invalid find_params for new_net {}".format(find_params)
                    )

                found_nets = target_vim.get_network_list(net_filter)

                if found_nets or task.get("params"):
                    if len(found_nets) > 1:
                        raise NsWorkerException(
                            "More than one network found with this criteria: '{}'".format(
                                find_params
                            )
                        )
                elif is_mgmt and not mgmt_set_at_vim:
                    # If there is mgmt-network in the descriptor,
                    # there is no mapping of that network to a VIM network in the descriptor,
                    # also there is no mapping in the "--config" parameter or at VIM creation;
                    # that mgmt-network will be created.
                    mgmt_net_name = net_filter.get("name") or net_filter.get("id")[:16]
                    net_id, net_created_items = target_vim.new_network(
                        mgmt_net_name, None
                    )
                    self.logger.debug(
                        "Created mgmt network vim_net_id: {}".format(net_id)
                    )
                    net_created = True
                else:
                    raise NsWorkerExceptionNotFound(
                        "Network not found with this criteria: '{}'".format(find_params)
                    )

                if found_nets:
                    net_id = found_nets[0]["id"]

            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, ro_task["target_id"], net_id, net_created
                )
            )

            return "BUILD", {
                "vim_id": net_id,
                "vim_status": "BUILD",
                "created": net_created,
                "created_items": net_created_items,
                "vim_details": None,
                "vim_message": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": net_created,
                "vim_message": str(e),
            }

    def refresh(self, ro_task):
        """
        Call VIM to get the network status
        :param ro_task: ro_task content; "_id", "target_id" and "vim_info" are read
        :return: tuple (status, vim_info update). Status is "DONE" when the VIM reports ACTIVE,
            "BUILD" when it reports BUILD and "FAILED" otherwise. The update only contains the
            fields that differ from the ones stored at ro_task["vim_info"]
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        net_vim_id = ro_task["vim_info"]["vim_id"]

        try:
            vim_info = target_vim.refresh_nets_status([net_vim_id])[net_vim_id]
            reported = vim_info["status"]
            task_status = (
                "DONE"
                if reported == "ACTIVE"
                else "BUILD"
                if reported == "BUILD"
                else "FAILED"
            )
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, ro_task["target_id"], net_vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        stored = ro_task["vim_info"]
        changes = {}

        if stored["vim_status"] != vim_info["status"]:
            changes["vim_status"] = vim_info["status"]

        if stored["vim_name"] != vim_info.get("name"):
            changes["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if stored["vim_message"] != vim_info.get("error_msg"):
                changes["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            changes["vim_id"] = None
            changes["vim_message"] = "Deleted externally"
        elif stored["vim_details"] != vim_info["vim_info"]:
            changes["vim_details"] = vim_info["vim_info"]

        if changes:
            # the message is only traced when the new status is not ACTIVE
            traced_message = (
                ""
                if changes.get("vim_status") == "ACTIVE"
                else changes.get("vim_message")
            )
            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    net_vim_id,
                    changes.get("vim_status"),
                    traced_message,
                )
            )

        return task_status, changes

    def delete(self, ro_task, task_index):
        """
        Delete the network at the VIM; a network not found at the VIM is considered deleted
        :param ro_task: ro_task content; "tasks", "_id", "target_id" and "vim_info" are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :return: tuple (status, vim_info update): "DONE" when ok, "FAILED" on VIM error
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        net_vim_id = ro_task["vim_info"]["vim_id"]
        deleted_update = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            # the VIM is only contacted when there is something to remove
            if net_vim_id or ro_task["vim_info"]["created_items"]:
                self.my_vims[ro_task["target_id"]].delete_network(
                    net_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            deleted_update["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id,
                ro_task["target_id"],
                net_vim_id,
                deleted_update.get("vim_message", ""),
            )
        )

        return "DONE", deleted_update
339
340
class VimInteractionVdu(VimInteractionBase):
    """Creates, refreshes and deletes VM instances at the VIM, and injects ssh keys into them"""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """
        Create a VM instance at the VIM.
        References of type "TASK-..." present at the net_list, image_id, flavor_id and
        affinity_group_list of the task params are replaced by the value stored at task_depends
        before calling the VIM.
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :param task_depends: dictionary indexed by "TASK-..." reference with the id obtained
            by the task this one depends on
        :return: tuple (status, vim_info update): "BUILD" when ok, "FAILED" on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # set before contacting the VIM, so an error is also reported with created=True
            created = True
            params = task["params"]
            # work over a copy, the task content must not be modified
            params_copy = deepcopy(params)
            net_list = params_copy["net_list"]

            for net in net_list:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            affinity_group_list = params_copy["affinity_group_list"]
            for affinity_group in affinity_group_list:
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        # the message was truncated to "found for {}"; aligned with the
                        # one used above for networks
                        raise NsWorkerException(
                            "Cannot create VM because depends on an affinity group not created "
                            "or found for {}".format(affinity_group["affinity_group_id"])
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id

            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            # "vim_id" of each net is read after the call — presumably filled in by the
            # connector; verify against the vimconn plugin
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            # add to created items previous_created_volumes (healing)
            if task.get("previous_created_volumes"):
                for k, v in task["previous_created_volumes"].items():
                    created_items[k] = v

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
                "interfaces_backup": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the VM instance at the VIM; a VM not found at the VIM is considered deleted
        :param ro_task: ro_task content; "tasks", "_id", "target_id" and "vim_info" are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :return: tuple (status, vim_info update): "DONE" when ok, "FAILED" on VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            self.logger.debug(
                "delete_vminstance: vm_vim_id={} created_items={}".format(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
            )
            # the VIM is only contacted when there is something to remove
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id,
                    ro_task["vim_info"]["created_items"],
                    ro_task["vim_info"].get("volumes_to_hold", []),
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def refresh(self, ro_task):
        """
        Call VIM to get vm status
        :param ro_task: ro_task content; "_id", "target_id", "tasks" and "vim_info" are read
        :return: (None, None) when there is no vim_id; otherwise a tuple (status, vim_info
            update). Status is "DONE" when the VIM reports ACTIVE, "BUILD" when it reports
            BUILD and "FAILED" otherwise. The update only contains the fields that differ
            from the ones stored at ro_task["vim_info"]
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception as vim_info_error:
                # best effort: the name is just not updated when vim_info cannot be parsed
                self.logger.exception(
                    f"{vim_info_error} occured while getting the vim_info from yaml"
                )
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            # keep the order of interfaces_vim_ids; None is stored for an interface
            # that is not reported by the VIM
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                vim_interfaces.append(iface)

        # NOTE(review): raises StopIteration when there is no pending CREATE task
        task_create = next(
            t
            for t in ro_task["tasks"]
            if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
        )
        if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
            vim_interfaces[task_create["mgmt_vnf_interface"]][
                "mgmt_vnf_interface"
            ] = True

        # the vdu management interface defaults to the vnf management one, or to the first
        mgmt_vdu_iface = task_create.get(
            "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
        )
        if vim_interfaces:
            vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True

        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """
        Inject a ssh key into the VM. The private key of the task params is decrypted with
        the database object before calling the VIM.
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (status, vim_info update, task update). "DONE" when the key is
            injected; "BUILD" with the retries counter and "next_retry" when it fails and
            max_retries_inject_ssh_key is not reached; "FAILED" otherwise
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params = task["params"]
            params_copy = deepcopy(params)
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return "DONE", None, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            self.logger.debug(traceback.format_exc())
            if retries < self.max_retries_inject_ssh_key:
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_message": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update
637
638
class VimInteractionImage(VimInteractionBase):
    """Finds images at the VIM"""

    def new(self, ro_task, task_index, task_depends):
        """
        Look for an image at the VIM using the task "find_params". Exactly one image must match.
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (status, vim_info update): "DONE" when one image is found; "FAILED"
            when none or several are found, when the task has no "find_params" or on VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # FIND
            find_params = task.get("find_params")

            if not find_params:
                # without a search criteria there is no image id to report; previously this
                # ended in an unhandled UnboundLocalError
                raise NsWorkerExceptionNotFound(
                    "Image cannot be found because the task does not contain 'find_params'"
                )

            vim_images = target_vim.get_image_list(**find_params)

            if not vim_images:
                raise NsWorkerExceptionNotFound(
                    "Image not found with this criteria: '{}'".format(find_params)
                )

            if len(vim_images) > 1:
                raise NsWorkerException(
                    "More than one image found with this criteria: '{}'".format(
                        find_params
                    )
                )

            vim_image_id = vim_images[0]["id"]

            ro_vim_item_update = {
                "vim_id": vim_image_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, ro_task["target_id"], vim_image_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (NsWorkerException, vimconn.VimConnException) as e:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
693
694
class VimInteractionFlavor(VimInteractionBase):
    """Finds, creates and deletes flavors at the VIM"""

    def delete(self, ro_task, task_index):
        """
        Delete the flavor at the VIM; a flavor not found at the VIM is considered deleted
        :param ro_task: ro_task content; "tasks", "_id", "target_id" and "vim_info" are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :return: tuple (status, vim_info update): "DONE" when ok, "FAILED" on VIM error
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        flavor_vim_id = ro_task["vim_info"]["vim_id"]
        deleted_update = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            # nothing to do at the VIM when the flavor has no id
            if flavor_vim_id:
                self.my_vims[ro_task["target_id"]].delete_flavor(flavor_vim_id)
        except vimconn.VimConnNotFoundException:
            deleted_update["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id,
                ro_task["target_id"],
                flavor_vim_id,
                deleted_update.get("vim_message", ""),
            )
        )

        return "DONE", deleted_update

    def new(self, ro_task, task_index, task_depends):
        """
        Look for a flavor at the VIM using the task "find_params" and, when it is not found
        and the task contains "params", create it
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (status, vim_info update): "DONE" when ok, "FAILED" on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        flavor_created = False
        flavor_created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            flavor_vim_id = None

            # FIND
            if task.get("find_params"):
                try:
                    flavor_vim_id = target_vim.get_flavor_id_from_data(
                        task["find_params"]["flavor_data"]
                    )
                except vimconn.VimConnNotFoundException:
                    self.logger.warning("VimConnNotFoundException occured.")

            # CREATE
            if not flavor_vim_id and task.get("params"):
                flavor_vim_id = target_vim.new_flavor(task["params"]["flavor_data"])
                flavor_created = True

            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, ro_task["target_id"], flavor_vim_id, flavor_created
                )
            )

            return "DONE", {
                "vim_id": flavor_vim_id,
                "vim_status": "DONE",
                "created": flavor_created,
                "created_items": flavor_created_items,
                "vim_details": None,
                "vim_message": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": flavor_created,
                "vim_message": str(e),
            }
787
788
class VimInteractionAffinityGroup(VimInteractionBase):
    """Finds, creates and deletes affinity or anti-affinity groups at the VIM"""

    def delete(self, ro_task, task_index):
        """
        Delete the affinity group at the VIM; a group not found at the VIM is considered deleted
        :param ro_task: ro_task content; "tasks", "_id", "target_id" and "vim_info" are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :return: tuple (status, vim_info update): "DONE" when ok, "FAILED" on VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            # nothing to do at the VIM when the group has no id
            if affinity_group_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_affinity_group(affinity_group_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id,
                ro_task["target_id"],
                affinity_group_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """
        Get the affinity group indicated by "vim-affinity-group-id" at the task params and,
        when it is not provided or not found at the VIM, create a new one
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (status, vim_info update): "DONE" when ok, "FAILED" on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            affinity_group_vim_id = None
            affinity_group_data = None

            if task.get("params"):
                affinity_group_data = task["params"].get("affinity_group_data")

            if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
                param_affinity_group_id = affinity_group_data.get(
                    "vim-affinity-group-id"
                )

                try:
                    affinity_group_vim_id = target_vim.get_affinity_group(
                        param_affinity_group_id
                    ).get("id")
                except vimconn.VimConnNotFoundException:
                    # a space was missing between both parts of the message
                    self.logger.error(
                        "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {} "
                        "could not be found at VIM. Creating a new one.".format(
                            task_id, ro_task["target_id"], param_affinity_group_id
                        )
                    )

            if not affinity_group_vim_id and affinity_group_data:
                affinity_group_vim_id = target_vim.new_affinity_group(
                    affinity_group_data
                )
                created = True

            ro_vim_item_update = {
                "vim_id": affinity_group_vim_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], affinity_group_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
894
895
class VimInteractionUpdateVdu(VimInteractionBase):
    """Executes an action over an existing VM instance at the VIM"""

    def exec(self, ro_task, task_index, task_depends):
        """
        Call the VIM to execute the action indicated at the task params over a VM
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (status, vim_info update, task update): "DONE" when ok; "FAILED" on
            VIM error or when the task does not contain "params"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            params = task.get("params")

            if not params:
                # without params there is neither VM nor action; previously this ended in
                # an unhandled UnboundLocalError
                raise NsWorkerException(
                    "Cannot execute the action because the task does not contain 'params'"
                )

            vim_vm_id = params.get("vim_vm_id")
            action = params.get("action")
            # the action name is used both as key and value of the dictionary sent to the VIM
            context = {action: action}
            target_vim.action_vminstance(vim_vm_id, context)

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            # NOTE(review): both log texts mention a migration although this class executes
            # a generic VM action; left as they are to not alter the traces
            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
936
937
938 class VimInteractionSdnNet(VimInteractionBase):
@staticmethod
def _match_pci(port_pci, mapping):
    """
    Check if port_pci matches with mapping
    mapping can have brackets to indicate that several chars are accepted. e.g
    pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
    A port_pci shorter than the mapping does not match (it used to raise IndexError).
    :param port_pci: text
    :param mapping: text, can contain brackets to indicate several chars are available
    :return: True if matches, False otherwise
    """
    if not port_pci or not mapping:
        return False
    if port_pci == mapping:
        return True

    mapping_index = 0
    pci_index = 0
    while True:
        bracket_start = mapping.find("[", mapping_index)
        if bracket_start == -1:
            break

        bracket_end = mapping.find("]", bracket_start)
        if bracket_end == -1:
            # unclosed bracket: the rest of the mapping is compared literally below
            break

        # literal text placed before the bracket must be equal at both sides
        length = bracket_start - mapping_index
        if (
            port_pci[pci_index : pci_index + length]
            != mapping[mapping_index:bracket_start]
        ):
            return False

        # a slice is used instead of an index, so a too short port_pci gives an empty
        # text instead of IndexError; it is checked explicitly because an empty text
        # is 'in' any other text
        pci_char = port_pci[pci_index + length : pci_index + length + 1]
        if not pci_char or pci_char not in mapping[bracket_start + 1 : bracket_end]:
            return False

        pci_index += length + 1
        mapping_index = bracket_end + 1

    # whatever remains after the last bracket must be equal at both sides
    return port_pci[pci_index:] == mapping[mapping_index:]
987
def _get_interfaces(self, vlds_to_connect, vim_account_id):
    """
    Obtain from the database the SR-IOV and PCI-PASSTHROUGH interfaces connected to the vlds
    :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
    :param vim_account_id: only the vnfrs of this vim account are considered
    :return: list with a copy of each interface, with an extra "id" of format
        vnfrs:<vnfr_id>:vdu.<vdu_index>.interfaces.<iface_index>, and with "status" set to
        "ERROR" when the vdur is at that status
    """
    dataplane_types = ("SR-IOV", "PCI-PASSTHROUGH")
    found_interfaces = []

    for vld_ref in vlds_to_connect:
        table, _, remaining = vld_ref.partition(":")
        db_id, _, vld_part = remaining.partition(":")
        vld_id = vld_part.partition(".")[2]

        q_filter = {"vim-account-id": vim_account_id}
        if table == "vnfrs":
            q_filter["_id"] = db_id
            iface_key = "vnf-vld-id"
        else:  # table == "nsrs"
            q_filter["nsr-id-ref"] = db_id
            iface_key = "ns-vld-id"

        for db_vnfr in self.db.get_list("vnfrs", q_filter=q_filter):
            for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
                for iface_index, interface in enumerate(vdur["interfaces"]):
                    if interface.get(iface_key) != vld_id:
                        continue

                    # only SR-IOV or PT
                    if interface.get("type") not in dataplane_types:
                        continue

                    iface_copy = interface.copy()
                    iface_copy["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
                        db_vnfr["_id"], vdu_index, iface_index
                    )

                    if vdur.get("status") == "ERROR":
                        iface_copy["status"] = "ERROR"

                    found_interfaces.append(iface_copy)

    return found_interfaces
1028
def refresh(self, ro_task):
    """
    Refresh by running again the creation over the pending CREATE task
    :param ro_task: ro_task content; "tasks" is inspected to locate the first task with
        action CREATE whose status is not FINISHED
    :return: whatever self.new returns for that task
    """
    # look for task create; StopIteration is raised when there is none
    pending_create_index = next(
        index
        for index, candidate in enumerate(ro_task["tasks"])
        if candidate
        and candidate["action"] == "CREATE"
        and candidate["status"] != "FINISHED"
    )

    return self.new(ro_task, pending_create_index, None)
1040
def new(self, ro_task, task_index, task_depends):
    """
    Create, update or poll the SDN connectivity service of an ro_task.

    The ports obtained from self._get_interfaces for the VLDs listed at the task
    params are translated into connection points using the 'sdn-port-mapping'
    config of the associated VIM; the external 'sdn-ports' of the params are
    appended. Then:
      - if any error was collected, the status becomes "ERROR";
      - if the set of ports differs from vim_info "connected_ports", the
        connectivity service is created (no vim_id yet and at least 2 ports)
        or edited (vim_id already present);
      - otherwise, if a vim_id exists, its status is read from the connector.

    :param ro_task: ro_task record; reads "tasks", "target_id" and "vim_info"
    :param task_index: index at ro_task["tasks"] of the task to process
    :param task_depends: not used by this method
    :return: tuple (status, ro_vim_item_update). On any exception the tuple is
        ("FAILED", {...}) with "vim_status" set to "VIM_ERROR"
    """

    task = ro_task["tasks"][task_index]
    task_id = task["task_id"]
    target_vim = self.my_vims[ro_task["target_id"]]

    # identifier of the connectivity service at the SDN controller (falsy if not created)
    sdn_net_id = ro_task["vim_info"]["vim_id"]

    created_items = ro_task["vim_info"].get("created_items")
    connected_ports = ro_task["vim_info"].get("connected_ports", [])
    new_connected_ports = []
    last_update = ro_task["vim_info"].get("last_update", 0)
    sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
    error_list = []
    created = ro_task["vim_info"].get("created", False)

    try:
        # CREATE
        params = task["params"]
        vlds_to_connect = params.get("vlds", [])
        associated_vim = params.get("target_vim")
        # external additional ports
        additional_ports = params.get("sdn-ports") or ()
        # associated_vim has the form "<type>:<id>"; keep only the id part
        _, _, vim_account_id = (
            (None, None, None)
            if associated_vim is None
            else associated_vim.partition(":")
        )

        if associated_vim:
            # get associated VIM
            if associated_vim not in self.db_vims:
                self.db_vims[associated_vim] = self.db.get_one(
                    "vim_accounts", {"_id": vim_account_id}
                )

            db_vim = self.db_vims[associated_vim]

        # look for ports to connect
        ports = self._get_interfaces(vlds_to_connect, vim_account_id)
        # print(ports)

        sdn_ports = []
        pending_ports = error_ports = 0
        vlan_used = None
        sdn_need_update = False

        for port in ports:
            # remember the last vlan seen; it is the default for external ports below
            vlan_used = port.get("vlan") or vlan_used

            # TODO. Do not connect if already done
            # a port without compute_node or pci is not yet located at the VIM:
            # count it as errored or pending and skip it
            if not port.get("compute_node") or not port.get("pci"):
                if port.get("status") == "ERROR":
                    error_ports += 1
                else:
                    pending_ports += 1
                continue

            pmap = None
            # NOTE(review): db_vim is only bound when params contain "target_vim";
            # presumably ports are never returned without it - confirm
            compute_node_mappings = next(
                (
                    c
                    for c in db_vim["config"].get("sdn-port-mapping", ())
                    if c and c["compute_node"] == port["compute_node"]
                ),
                None,
            )

            if compute_node_mappings:
                # process port_mapping pci of type 0000:af:1[01].[1357]
                pmap = next(
                    (
                        p
                        for p in compute_node_mappings["ports"]
                        if self._match_pci(port["pci"], p.get("pci"))
                    ),
                    None,
                )

            if not pmap:
                # without mapping the port is an error, unless the VIM config
                # declares "mapping_not_needed"; then defaults from the port are used
                if not db_vim["config"].get("mapping_not_needed"):
                    error_list.append(
                        "Port mapping not found for compute_node={} pci={}".format(
                            port["compute_node"], port["pci"]
                        )
                    )
                    continue

                pmap = {}

            service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
            new_port = {
                "service_endpoint_id": pmap.get("service_endpoint_id")
                or service_endpoint_id,
                "service_endpoint_encapsulation_type": "dot1q"
                if port["type"] == "SR-IOV"
                else None,
                "service_endpoint_encapsulation_info": {
                    "vlan": port.get("vlan"),
                    "mac": port.get("mac-address"),
                    "device_id": pmap.get("device_id") or port["compute_node"],
                    "device_interface_id": pmap.get("device_interface_id")
                    or port["pci"],
                    "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
                    "switch_port": pmap.get("switch_port"),
                    "service_mapping_info": pmap.get("service_mapping_info"),
                },
            }

            # TODO
            # if port["modified_at"] > last_update:
            #     sdn_need_update = True
            new_connected_ports.append(port["id"])  # TODO
            sdn_ports.append(new_port)

        if error_ports:
            error_list.append(
                "{} interfaces have not been created as VDU is on ERROR status".format(
                    error_ports
                )
            )

        # connect external ports
        for index, additional_port in enumerate(additional_ports):
            additional_port_id = additional_port.get(
                "service_endpoint_id"
            ) or "external-{}".format(index)
            sdn_ports.append(
                {
                    "service_endpoint_id": additional_port_id,
                    "service_endpoint_encapsulation_type": additional_port.get(
                        "service_endpoint_encapsulation_type", "dot1q"
                    ),
                    "service_endpoint_encapsulation_info": {
                        "vlan": additional_port.get("vlan") or vlan_used,
                        "mac": additional_port.get("mac_address"),
                        "device_id": additional_port.get("device_id"),
                        "device_interface_id": additional_port.get(
                            "device_interface_id"
                        ),
                        "switch_dpid": additional_port.get("switch_dpid")
                        or additional_port.get("switch_id"),
                        "switch_port": additional_port.get("switch_port"),
                        "service_mapping_info": additional_port.get(
                            "service_mapping_info"
                        ),
                    },
                }
            )
            new_connected_ports.append(additional_port_id)
        sdn_info = ""

        # if there are more ports to connect or they have been modified, call create/update
        if error_list:
            sdn_status = "ERROR"
            sdn_info = "; ".join(error_list)
        elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
            last_update = time.time()

            if not sdn_net_id:
                # nothing created yet: a service is only created with 2 or more ports
                if len(sdn_ports) < 2:
                    sdn_status = "ACTIVE"

                    if not pending_ports:
                        self.logger.debug(
                            "task={} {} new-sdn-net done, less than 2 ports".format(
                                task_id, ro_task["target_id"]
                            )
                        )
                else:
                    net_type = params.get("type") or "ELAN"
                    (
                        sdn_net_id,
                        created_items,
                    ) = target_vim.create_connectivity_service(net_type, sdn_ports)
                    created = True
                    self.logger.debug(
                        "task={} {} new-sdn-net={} created={}".format(
                            task_id, ro_task["target_id"], sdn_net_id, created
                        )
                    )
            else:
                # service already exists: replace its connection points
                created_items = target_vim.edit_connectivity_service(
                    sdn_net_id, conn_info=created_items, connection_points=sdn_ports
                )
                created = True
                self.logger.debug(
                    "task={} {} update-sdn-net={} created={}".format(
                        task_id, ro_task["target_id"], sdn_net_id, created
                    )
                )

            connected_ports = new_connected_ports
        elif sdn_net_id:
            # no port changes: just poll the status of the existing service
            wim_status_dict = target_vim.get_connectivity_service_status(
                sdn_net_id, conn_info=created_items
            )
            sdn_status = wim_status_dict["sdn_status"]

            if wim_status_dict.get("sdn_info"):
                sdn_info = str(wim_status_dict.get("sdn_info")) or ""

            # an error message takes precedence over the informative text
            if wim_status_dict.get("error_msg"):
                sdn_info = wim_status_dict.get("error_msg") or ""

        if pending_ports:
            if sdn_status != "ERROR":
                sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
                    len(ports) - pending_ports, len(ports)
                )

            # while ports are pending the net cannot be reported as ACTIVE
            if sdn_status == "ACTIVE":
                sdn_status = "BUILD"

        ro_vim_item_update = {
            "vim_id": sdn_net_id,
            "vim_status": sdn_status,
            "created": created,
            "created_items": created_items,
            "connected_ports": connected_ports,
            "vim_details": sdn_info,
            "vim_message": None,
            "last_update": last_update,
        }

        return sdn_status, ro_vim_item_update
    except Exception as e:
        # the traceback is logged only for exceptions that are not connector errors
        self.logger.error(
            "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
            exc_info=not isinstance(
                e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
            ),
        )
        ro_vim_item_update = {
            "vim_status": "VIM_ERROR",
            "created": created,
            "vim_message": str(e),
        }

        return "FAILED", ro_vim_item_update
1281
def delete(self, ro_task, task_index):
    """
    Remove the connectivity service of an ro_task from the SDN controller.

    Nothing is requested to the connector when vim_info has no "vim_id". A
    SdnConnectorError with http_code NOT_FOUND is considered a success.

    :param ro_task: ro_task record; reads "tasks", "target_id", "vim_info" and "_id"
    :param task_index: index at ro_task["tasks"] of the delete task
    :return: ("DONE", update_dict) on success, ("FAILED", update_dict) otherwise
    """
    task_id = ro_task["tasks"][task_index]["task_id"]
    sdn_vim_id = ro_task["vim_info"].get("vim_id")
    ro_vim_item_update_ok = {
        "vim_status": "DELETED",
        "created": False,
        "vim_message": "DELETED",
        "vim_id": None,
    }

    try:
        if sdn_vim_id:
            connector = self.my_vims[ro_task["target_id"]]
            connector.delete_connectivity_service(
                sdn_vim_id, ro_task["vim_info"].get("created_items")
            )
    except Exception as e:
        already_gone = (
            isinstance(e, sdnconn.SdnConnectorError)
            and e.http_code == HTTPStatus.NOT_FOUND.value
        )

        if not already_gone:
            is_connector_error = isinstance(
                e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
            )
            self.logger.error(
                "ro_task={} vim={} del-sdn-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
                ),
                exc_info=not is_connector_error,
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        ro_vim_item_update_ok["vim_message"] = "already deleted"

    self.logger.debug(
        "task={} {} del-sdn-net={} {}".format(
            task_id,
            ro_task["target_id"],
            sdn_vim_id,
            ro_vim_item_update_ok.get("vim_message", ""),
        )
    )

    return "DONE", ro_vim_item_update_ok
1332
1333
class VimInteractionMigration(VimInteractionBase):
    """Processes the 'migrate' tasks: moves a VM to another compute host of the VIM."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Migrate the VM referenced by the task params and refresh its VIM information.

        :param ro_task: ro_task record; reads "tasks" and "target_id"
        :param task_index: index at ro_task["tasks"] of the task to execute
        :param task_depends: not used by this method
        :return: tuple (status, ro_vim_item_update, db_task_update); status is
            "DONE", or "FAILED" when the connector raises VimConnException or the
            task is malformed (NsWorkerException)
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_interfaces = []
        created = False
        created_items = {}
        refreshed_vim_info = {}

        try:
            params = task.get("params")

            if not params:
                # Without params there is no VM to migrate. Failing explicitly avoids
                # the UnboundLocalError that building the result below would raise.
                raise NsWorkerException(
                    "migration task does not contain 'params' with the VM to migrate"
                )

            vim_vm_id = params.get("vim_vm_id")
            migrate_host = params.get("migrate_host")
            _, migrated_compute_node = target_vim.migrate_instance(
                vim_vm_id, migrate_host
            )

            if migrated_compute_node:
                # When VM is migrated, vdu["vim_info"] needs to be updated
                vdu_old_vim_info = params["vdu_vim_info"].get(ro_task["target_id"])
                # tolerate a missing vim_info entry or a missing "interfaces" list
                old_interfaces = (vdu_old_vim_info or {}).get("interfaces") or ()

                # Refresh VM to get new vim_info
                vim_dict = target_vim.refresh_vms_status([vim_vm_id])
                refreshed_vim_info = vim_dict[vim_vm_id]

                if refreshed_vim_info.get("interfaces"):
                    for old_iface in old_interfaces:
                        # None is appended when the interface is not found, so that
                        # the positions of the old interface list are preserved
                        vim_interfaces.append(
                            next(
                                (
                                    new_iface
                                    for new_iface in refreshed_vim_info["interfaces"]
                                    if old_iface["vim_interface_id"]
                                    == new_iface["vim_interface_id"]
                                ),
                                None,
                            )
                        )

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            if vim_interfaces:
                ro_vim_item_update["interfaces"] = vim_interfaces

            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update

        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
1413
1414
class VimInteractionResize(VimInteractionBase):
    """Processes the 'verticalscale' tasks: resizes a VM to another flavor."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Resize the VM referenced by the task params to the requested flavor.

        The flavor is looked up at the VIM from params "flavor_dict"; when the
        lookup fails a new flavor is created.

        :param ro_task: ro_task record; reads "tasks" and "target_id"
        :param task_index: index at ro_task["tasks"] of the task to execute
        :param task_depends: not used by this method
        :return: tuple (status, ro_vim_item_update, db_task_update); status is
            "DONE", or "FAILED" when the connector raises VimConnException or the
            task is malformed (NsWorkerException)
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        target_flavor_uuid = None
        created_items = {}
        refreshed_vim_info = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            params = task.get("params")

            if not params:
                # Without params there is no VM to resize. Failing explicitly avoids
                # the UnboundLocalError that building the result below would raise.
                raise NsWorkerException(
                    "resize task does not contain 'params' with the VM to resize"
                )

            vim_vm_id = params.get("vim_vm_id")
            flavor_dict = params.get("flavor_dict")
            self.logger.info("flavor_dict %s", flavor_dict)

            try:
                target_flavor_uuid = target_vim.get_flavor_id_from_data(flavor_dict)
            except Exception as lookup_error:
                self.logger.info("Cannot find any flavor matching %s.", str(lookup_error))
                try:
                    target_flavor_uuid = target_vim.new_flavor(flavor_dict)
                except Exception as create_error:
                    # NOTE(review): the task still ends as "DONE" although nothing
                    # is resized when the flavor cannot be created - confirm intended
                    self.logger.error(
                        "Error creating flavor at VIM %s.", str(create_error)
                    )

            if target_flavor_uuid is not None:
                resized_status = target_vim.resize_instance(
                    vim_vm_id, target_flavor_uuid
                )

                if resized_status:
                    # Refresh VM to get new vim_info
                    vim_dict = target_vim.refresh_vms_status([vim_vm_id])
                    refreshed_vim_info = vim_dict[vim_vm_id]

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            self.logger.debug(
                "task={} {} resize done".format(task_id, ro_task["target_id"])
            )
            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} Resize: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
1482
1483
class ConfigValidate:
    """Read-only view over the "period" section of the RO configuration."""

    def __init__(self, config: Dict):
        # whole configuration dictionary; only config["period"] is read here
        self.conf = config

    @property
    def active(self):
        """Refresh period for active items: the configured value when it is
        >= 60 or exactly -1 (periodic checks disabled), otherwise 60."""
        configured = self.conf["period"]["refresh_active"]
        is_allowed = configured >= 60 or configured == -1

        return configured if is_allowed else 60

    @property
    def build(self):
        """Configured value of period "refresh_build"."""
        period = self.conf["period"]
        return period["refresh_build"]

    @property
    def image(self):
        """Configured value of period "refresh_image"."""
        period = self.conf["period"]
        return period["refresh_image"]

    @property
    def error(self):
        """Configured value of period "refresh_error"."""
        period = self.conf["period"]
        return period["refresh_error"]

    @property
    def queue_size(self):
        """Configured value of period "queue_size"."""
        period = self.conf["period"]
        return period["queue_size"]
1514
1515
1516 class NsWorker(threading.Thread):
def __init__(self, worker_index, config, plugins, db):
    """
    Build a worker thread; no VIM is loaded at construction time.

    :param worker_index: thread index
    :param config: general configuration of RO, among others the process_id with the docker id where it runs
    :param plugins: global shared dict with the loaded plugins
    :param db: database class instance to use
    """
    threading.Thread.__init__(self)
    self.config = config
    self.plugins = plugins
    self.plugin_name = "unknown"
    self.logger = logging.getLogger(f"ro.worker{worker_index}")
    self.worker_index = worker_index
    # refresh periods for created items
    self.refresh_config = ConfigValidate(config)
    self.task_queue = queue.Queue(self.refresh_config.queue_size)
    # targetvim: vimplugin class
    self.my_vims = {}
    # targetvim: vim information from database
    self.db_vims = {}
    # targetvim list
    self.vim_targets = []
    self.my_id = config["process_id"] + ":" + str(worker_index)
    self.db = db

    # task item -> class implementing its interaction with the VIM/SDN
    interaction_classes = {
        "net": VimInteractionNet,
        "vdu": VimInteractionVdu,
        "image": VimInteractionImage,
        "flavor": VimInteractionFlavor,
        "sdn_net": VimInteractionSdnNet,
        "update": VimInteractionUpdateVdu,
        "affinity-or-anti-affinity-group": VimInteractionAffinityGroup,
        "migrate": VimInteractionMigration,
        "verticalscale": VimInteractionResize,
    }
    # every handler shares the db connection, both vim dictionaries and the logger
    self.item2class = {
        item: interaction_class(self.db, self.my_vims, self.db_vims, self.logger)
        for item, interaction_class in interaction_classes.items()
    }

    self.time_last_task_processed = None
    # lists of tasks to delete because nsrs or vnfrs has been deleted from db
    self.tasks_to_delete = []
    # it is idle when there are not vim_targets associated
    self.idle = True
    self.task_locked_time = config["global"]["task_locked_time"]
1572
def insert_task(self, task):
    """
    Enqueue a task at the worker queue without blocking.

    :param task: element to add to self.task_queue
    :return: None
    :raises NsWorkerException: when the queue is full
    """
    try:
        self.task_queue.put(task, block=False)
    except queue.Full:
        raise NsWorkerException("timeout inserting a task")

    return None
1579
def terminate(self):
    """Enqueue the "exit" task at the worker queue (see insert_task)."""
    exit_task = "exit"
    self.insert_task(exit_task)
1582
def del_task(self, task):
    """
    Supersede a task that has not started yet.

    :param task: task dictionary; its "status" is changed to "SUPERSEDED" when
        it was "SCHEDULED"
    :return: True if the task has been superseded, False when it is in any other
        status (e.g. already being processed)
    """
    # The context manager releases the lock on every exit path. The explicit
    # release() that used to be in the 'else' branch released it a second time,
    # so leaving the 'with' raised RuntimeError.
    with self.task_lock:
        if task["status"] == "SCHEDULED":
            task["status"] = "SUPERSEDED"
            return True

        return False
1591
1592 def _process_vim_config(self, target_id: str, db_vim: dict) -> None:
1593 """
1594 Process vim config, creating vim configuration files as ca_cert
1595 :param target_id: vim/sdn/wim + id
1596 :param db_vim: Vim dictionary obtained from database
1597 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1598 """
1599 if not db_vim.get("config"):
1600 return
1601
1602 file_name = ""
1603 work_dir = "/app/osm_ro/certs"
1604
1605 try:
1606 if db_vim["config"].get("ca_cert_content"):
1607 file_name = f"{work_dir}/{target_id}:{self.worker_index}"
1608
1609 if not path.isdir(file_name):
1610 makedirs(file_name)
1611
1612 file_name = file_name + "/ca_cert"
1613
1614 with open(file_name, "w") as f:
1615 f.write(db_vim["config"]["ca_cert_content"])
1616 del db_vim["config"]["ca_cert_content"]
1617 db_vim["config"]["ca_cert"] = file_name
1618 except Exception as e:
1619 raise NsWorkerException(
1620 "Error writing to file '{}': {}".format(file_name, e)
1621 )
1622
1623 def _load_plugin(self, name, type="vim"):
1624 # type can be vim or sdn
1625 if "rovim_dummy" not in self.plugins:
1626 self.plugins["rovim_dummy"] = VimDummyConnector
1627
1628 if "rosdn_dummy" not in self.plugins:
1629 self.plugins["rosdn_dummy"] = SdnDummyConnector
1630
1631 if name in self.plugins:
1632 return self.plugins[name]
1633
1634 try:
1635 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1636 self.plugins[name] = ep.load()
1637 except Exception as e:
1638 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1639
1640 if name and name not in self.plugins:
1641 raise NsWorkerException(
1642 "Plugin 'osm_{n}' has not been installed".format(n=name)
1643 )
1644
1645 return self.plugins[name]
1646
1647 def _unload_vim(self, target_id):
1648 """
1649 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1650 :param target_id: Contains type:_id; where type can be 'vim', ...
1651 :return: None.
1652 """
1653 try:
1654 self.db_vims.pop(target_id, None)
1655 self.my_vims.pop(target_id, None)
1656
1657 if target_id in self.vim_targets:
1658 self.vim_targets.remove(target_id)
1659
1660 self.logger.info("Unloaded {}".format(target_id))
1661 rmtree("{}:{}".format(target_id, self.worker_index))
1662 except FileNotFoundError:
1663 # This is raised by rmtree if folder does not exist.
1664 self.logger.exception("FileNotFoundError occured while unloading VIM.")
1665 except Exception as e:
1666 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1667
def _check_vim(self, target_id):
    """
    Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR

    Only the first operation found at "_admin.operations" in state "PROCESSING"
    and not locked by another thread is attended. The result is written to the
    database at the 'finally' clause, which also unloads the target when it was
    not loaded before this call.

    :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
    :return: None.
    """
    target, _, _id = target_id.partition(":")
    now = time.time()
    update_dict = {}
    unset_dict = {}
    op_text = ""
    step = ""
    # remember if the target was already loaded, to restore that state at the end
    loaded = target_id in self.vim_targets
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts"
        if target == "wim"
        else "sdns"
    )

    try:
        step = "Getting {} from db".format(target_id)
        db_vim = self.db.get_one(target_database, {"_id": _id})

        for op_index, operation in enumerate(
            db_vim["_admin"].get("operations", ())
        ):
            if operation["operationState"] != "PROCESSING":
                continue

            locked_at = operation.get("locked_at")

            if locked_at is not None and locked_at >= now - self.task_locked_time:
                # some other thread is doing this operation
                return

            # lock
            op_text = "_admin.operations.{}.".format(op_index)

            # the filter includes the locked_at value just read, so the update
            # matches nothing if another worker changed it in the meantime
            # NOTE(review): "admin.current_operation" here and "current_operation"
            # at unset_dict below lack the "_admin." prefix used by the other
            # keys - confirm whether this is intended
            if not self.db.set_one(
                target_database,
                q_filter={
                    "_id": _id,
                    op_text + "operationState": "PROCESSING",
                    op_text + "locked_at": locked_at,
                },
                update_dict={
                    op_text + "locked_at": now,
                    "admin.current_operation": op_index,
                },
                fail_on_empty=False,
            ):
                return

            unset_dict[op_text + "locked_at"] = None
            unset_dict["current_operation"] = None
            step = "Loading " + target_id
            error_text = self._load_vim(target_id)

            if not error_text:
                step = "Checking connectivity"

                if target == "vim":
                    self.my_vims[target_id].check_vim_connectivity()
                else:
                    self.my_vims[target_id].check_credentials()

            # success values; the 'finally' clause overwrites them when error_text is set
            update_dict["_admin.operationalState"] = "ENABLED"
            update_dict["_admin.detailed-status"] = ""
            unset_dict[op_text + "detailed-status"] = None
            update_dict[op_text + "operationState"] = "COMPLETED"

            return

    except Exception as e:
        error_text = "{}: {}".format(step, e)
        self.logger.error("{} for {}: {}".format(step, target_id, e))

    finally:
        # error_text is only read when update_dict/unset_dict have content, which
        # happens after the lock is taken; from there it is always assigned,
        # either by _load_vim or by the except clause
        if update_dict or unset_dict:
            if error_text:
                update_dict[op_text + "operationState"] = "FAILED"
                update_dict[op_text + "detailed-status"] = error_text
                unset_dict.pop(op_text + "detailed-status", None)
                update_dict["_admin.operationalState"] = "ERROR"
                update_dict["_admin.detailed-status"] = error_text

            if op_text:
                update_dict[op_text + "statusEnteredTime"] = now

            self.db.set_one(
                target_database,
                q_filter={"_id": _id},
                update_dict=update_dict,
                unset=unset_dict,
                fail_on_empty=False,
            )

        if not loaded:
            self._unload_vim(target_id)
1769
1770 def _reload_vim(self, target_id):
1771 if target_id in self.vim_targets:
1772 self._load_vim(target_id)
1773 else:
1774 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1775 # just remove it to force load again next time it is needed
1776 self.db_vims.pop(target_id, None)
1777
def _load_vim(self, target_id):
    """
    Load or reload a vim_account, sdn_controller or wim_account.
    Read content from database, load the plugin if not loaded.
    In case of error loading the plugin, it loads a FailingConnector at my_vims
    It fills self db_vims dictionary, my_vims dictionary and vim_targets list
    :param target_id: Contains type:_id; where type can be 'vim', ...
    :return: None if ok, descriptive text if error
    """
    target, _, _id = target_id.partition(":")
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts"
        if target == "wim"
        else "sdns"
    )
    plugin_name = ""
    vim = None

    try:
        step = "Getting {}={} from db".format(target, _id)
        # TODO process for wim, sdnc, ...
        vim = self.db.get_one(target_database, {"_id": _id})

        # if deep_get(vim, "config", "sdn-controller"):
        #     step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
        #     db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})

        step = "Decrypting password"
        schema_version = vim.get("schema_version")
        self.db.encrypt_decrypt_fields(
            vim,
            "decrypt",
            fields=("password", "secret"),
            schema_version=schema_version,
            salt=_id,
        )
        self._process_vim_config(target_id, vim)

        if target == "vim":
            plugin_name = "rovim_" + vim["vim_type"]
            step = "Loading plugin '{}'".format(plugin_name)
            vim_module_conn = self._load_plugin(plugin_name)
            step = "Loading {}'".format(target_id)
            self.my_vims[target_id] = vim_module_conn(
                uuid=vim["_id"],
                name=vim["name"],
                tenant_id=vim.get("vim_tenant_id"),
                tenant_name=vim.get("vim_tenant_name"),
                url=vim["vim_url"],
                url_admin=None,
                user=vim["vim_user"],
                passwd=vim["vim_password"],
                config=vim.get("config") or {},
                persistent_info={},
            )
        else:  # sdn
            plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
            step = "Loading plugin '{}'".format(plugin_name)
            vim_module_conn = self._load_plugin(plugin_name, "sdn")
            step = "Loading {}'".format(target_id)
            # work over a copy: the record cached at db_vims must stay untouched
            wim = deepcopy(vim)
            wim_config = wim.pop("config", {}) or {}
            wim["uuid"] = wim["_id"]

            # make both "url" and "wim_url" available, whichever was provided
            if "url" in wim and "wim_url" not in wim:
                wim["wim_url"] = wim["url"]
            elif "url" not in wim and "wim_url" in wim:
                wim["url"] = wim["wim_url"]

            # "dpid" and "switch_id" are passed to the connector inside its config
            if wim.get("dpid"):
                wim_config["dpid"] = wim.pop("dpid")

            if wim.get("switch_id"):
                wim_config["switch_id"] = wim.pop("switch_id")

            # wim, wim_account, config
            self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)

        self.db_vims[target_id] = vim
        # NOTE(review): kept as an instance attribute for compatibility; it is
        # presumably a leftover of a local 'error_status' - confirm
        self.error_status = None

        self.logger.info(
            "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
        )
    except Exception as e:
        self.logger.error(
            "Cannot load {} plugin={}: {} {}".format(
                target_id, plugin_name, step, e
            )
        )

        # Keep the database record (or an empty dict) at db_vims and register a
        # failing connector at my_vims. Previously the connector was written to
        # db_vims, overwriting the record and leaving my_vims without an entry
        # (or with a stale connector) for this target.
        self.db_vims[target_id] = vim or {}
        self.my_vims[target_id] = FailingConnector(str(e))
        error_status = "{} Error: {}".format(step, e)

        return error_status
    finally:
        # the target is registered even when loading fails
        if target_id not in self.vim_targets:
            self.vim_targets.append(target_id)

    return None
1877
1878 def _get_db_task(self):
1879 """
1880 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1881 :return: None
1882 """
1883 now = time.time()
1884
1885 if not self.time_last_task_processed:
1886 self.time_last_task_processed = now
1887
1888 try:
1889 while True:
1890 """
1891 # Log RO tasks only when loglevel is DEBUG
1892 if self.logger.getEffectiveLevel() == logging.DEBUG:
1893 self._log_ro_task(
1894 None,
1895 None,
1896 None,
1897 "TASK_WF",
1898 "task_locked_time="
1899 + str(self.task_locked_time)
1900 + " "
1901 + "time_last_task_processed="
1902 + str(self.time_last_task_processed)
1903 + " "
1904 + "now="
1905 + str(now),
1906 )
1907 """
1908 locked = self.db.set_one(
1909 "ro_tasks",
1910 q_filter={
1911 "target_id": self.vim_targets,
1912 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1913 "locked_at.lt": now - self.task_locked_time,
1914 "to_check_at.lt": self.time_last_task_processed,
1915 "to_check_at.gt": -1,
1916 },
1917 update_dict={"locked_by": self.my_id, "locked_at": now},
1918 fail_on_empty=False,
1919 )
1920
1921 if locked:
1922 # read and return
1923 ro_task = self.db.get_one(
1924 "ro_tasks",
1925 q_filter={
1926 "target_id": self.vim_targets,
1927 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1928 "locked_at": now,
1929 },
1930 )
1931 return ro_task
1932
1933 if self.time_last_task_processed == now:
1934 self.time_last_task_processed = None
1935 return None
1936 else:
1937 self.time_last_task_processed = now
1938 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1939
1940 except DbException as e:
1941 self.logger.error("Database exception at _get_db_task: {}".format(e))
1942 except Exception as e:
1943 self.logger.critical(
1944 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1945 )
1946
1947 return None
1948
1949 def _get_db_all_tasks(self):
1950 """
1951 Read all content of table ro_tasks to log it
1952 :return: None
1953 """
1954 try:
1955 # Checking the content of the BD:
1956
1957 # read and return
1958 ro_task = self.db.get_list("ro_tasks")
1959 for rt in ro_task:
1960 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1961 return ro_task
1962
1963 except DbException as e:
1964 self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1965 except Exception as e:
1966 self.logger.critical(
1967 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1968 )
1969
1970 return None
1971
1972 def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1973 """
1974 Generate a log with the following format:
1975
1976 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1977 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1978 task_array_index;task_id;task_action;task_item;task_args
1979
1980 Example:
1981
1982 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1983 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1984 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1985 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1986 'vim_details': None, 'vim_message': None, 'refresh_at': None};1;SCHEDULED;
1987 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1988 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1989 """
1990 try:
1991 line = []
1992 i = 0
1993 if ro_task is not None and isinstance(ro_task, dict):
1994 for t in ro_task["tasks"]:
1995 line.clear()
1996 line.append(mark)
1997 line.append(event)
1998 line.append(ro_task.get("_id", ""))
1999 line.append(str(ro_task.get("locked_at", "")))
2000 line.append(str(ro_task.get("modified_at", "")))
2001 line.append(str(ro_task.get("created_at", "")))
2002 line.append(str(ro_task.get("to_check_at", "")))
2003 line.append(str(ro_task.get("locked_by", "")))
2004 line.append(str(ro_task.get("target_id", "")))
2005 line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
2006 line.append(str(ro_task.get("vim_info", "")))
2007 line.append(str(ro_task.get("tasks", "")))
2008 if isinstance(t, dict):
2009 line.append(str(t.get("status", "")))
2010 line.append(str(t.get("action_id", "")))
2011 line.append(str(i))
2012 line.append(str(t.get("task_id", "")))
2013 line.append(str(t.get("action", "")))
2014 line.append(str(t.get("item", "")))
2015 line.append(str(t.get("find_params", "")))
2016 line.append(str(t.get("params", "")))
2017 else:
2018 line.extend([""] * 2)
2019 line.append(str(i))
2020 line.extend([""] * 5)
2021
2022 i += 1
2023 self.logger.debug(";".join(line))
2024 elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
2025 i = 0
2026 while True:
2027 st = "tasks.{}.status".format(i)
2028 if st not in db_ro_task_update:
2029 break
2030 line.clear()
2031 line.append(mark)
2032 line.append(event)
2033 line.append(db_ro_task_update.get("_id", ""))
2034 line.append(str(db_ro_task_update.get("locked_at", "")))
2035 line.append(str(db_ro_task_update.get("modified_at", "")))
2036 line.append("")
2037 line.append(str(db_ro_task_update.get("to_check_at", "")))
2038 line.append(str(db_ro_task_update.get("locked_by", "")))
2039 line.append("")
2040 line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
2041 line.append("")
2042 line.append(str(db_ro_task_update.get("vim_info", "")))
2043 line.append(str(str(db_ro_task_update).count(".status")))
2044 line.append(db_ro_task_update.get(st, ""))
2045 line.append("")
2046 line.append(str(i))
2047 line.extend([""] * 3)
2048 i += 1
2049 self.logger.debug(";".join(line))
2050
2051 elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
2052 line.clear()
2053 line.append(mark)
2054 line.append(event)
2055 line.append(db_ro_task_delete.get("_id", ""))
2056 line.append("")
2057 line.append(db_ro_task_delete.get("modified_at", ""))
2058 line.extend([""] * 13)
2059 self.logger.debug(";".join(line))
2060
2061 else:
2062 line.clear()
2063 line.append(mark)
2064 line.append(event)
2065 line.extend([""] * 16)
2066 self.logger.debug(";".join(line))
2067
2068 except Exception as e:
2069 self.logger.error("Error logging ro_task: {}".format(e))
2070
def _delete_task(self, ro_task, task_index, task_depends, db_update):
    """
    Decide whether a DELETE task must remove the VIM element or is superseded.

    Every other CREATE task that points to the same target record is marked as
    "FINISHED", both in ro_task and in db_update. The deletion is skipped when
    vim_info reports neither "created" nor "created_items", or when a CREATE
    task of a different target record is not yet "FINISHED"/"SUPERSEDED".

    :param ro_task: ro_task record; its "tasks" and "vim_info" entries are read
    :param task_index: position of the DELETE task inside ro_task["tasks"]
    :param task_depends: not used by this method
    :param db_update: dictionary of pending database changes, updated in place
    :return: tuple (new status, vim_info update):
        (None, None) if the task is already "FAILED";
        ("SUPERSEDED", None) if nothing has to be deleted;
        ("FAILED", {"vim_status": ..., "vim_message": ...}) on any exception;
        otherwise the result of the item handler delete() method
    """
    own_task = ro_task["tasks"][task_index]
    task_id = own_task["task_id"]
    vim_info = ro_task["vim_info"]
    must_delete = vim_info["created"] or vim_info.get("created_items", False)

    self.logger.debug("Needed delete: {}".format(must_delete))
    if own_task["status"] == "FAILED":
        return None, None  # TODO need to be retry??

    try:
        for position, other in enumerate(ro_task["tasks"]):
            # skip deleted entries and the task being processed
            if not other or position == task_index:
                continue

            same_target = own_task["target_record"] == other["target_record"]
            if other["action"] != "CREATE":
                continue

            if same_target:
                # the creation of this very target is no longer pending
                db_update["tasks.{}.status".format(position)] = "FINISHED"
                other["status"] = "FINISHED"
            elif other["status"] not in ("FINISHED", "SUPERSEDED"):
                # the VIM element is still wanted by another target
                must_delete = False

        if not must_delete:
            return "SUPERSEDED", None

        self.logger.debug(
            "Deleting ro_task={} task_index={}".format(ro_task, task_index)
        )
        return self.item2class[own_task["item"]].delete(ro_task, task_index)
    except Exception as e:
        if not isinstance(e, NsWorkerException):
            self.logger.critical(
                "Unexpected exception at _delete_task task={}: {}".format(
                    task_id, e
                ),
                exc_info=True,
            )

        return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2122
def _create_task(self, ro_task, task_index, task_depends, db_update):
    """
    Decide whether a CREATE task has to create something at the VIM.

    Only tasks in "SCHEDULED" status are handled. If another CREATE task of the
    same ro_task is already in progress or done, its status is reused;
    otherwise the item handler new() method is invoked.

    :param ro_task: ro_task record; its "tasks" entry is read
    :param task_index: position of the CREATE task inside ro_task["tasks"]
    :param task_depends: dependencies forwarded to the item handler new() method
    :param db_update: not used by this method
    :return: tuple (new status, vim_info update):
        (None, None) if the task is not "SCHEDULED";
        (status of the sibling task, "COPY_VIM_INFO") if another CREATE task
        already handled the element;
        ("FAILED", {"vim_status": ..., "vim_message": ...}) if new() raises;
        otherwise the pair returned by new()
    """
    own_task = ro_task["tasks"][task_index]
    task_id = own_task["task_id"]

    # FAILED tasks are not retried (TODO need to be retry??) and any status
    # other than SCHEDULED requires no action here
    if own_task["status"] != "SCHEDULED":
        return None, None

    # check if already created by another task
    for position, sibling in enumerate(ro_task["tasks"]):
        if not sibling or position == task_index:
            continue  # deleted entry or own task

        if sibling["action"] != "CREATE":
            continue

        if sibling["status"] not in ("SCHEDULED", "FINISHED", "SUPERSEDED"):
            return sibling["status"], "COPY_VIM_INFO"

    try:
        item_handler = self.item2class[own_task["item"]]
        task_status, ro_vim_item_update = item_handler.new(
            ro_task, task_index, task_depends
        )
        # TODO update other CREATE tasks
    except Exception as e:
        if not isinstance(e, NsWorkerException):
            self.logger.error(
                "Error executing task={}: {}".format(task_id, e), exc_info=True
            )

        task_status = "FAILED"
        ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)}
        # TODO update ro_vim_item_update

    return task_status, ro_vim_item_update
2165
def _get_dependency(self, task_id, ro_task=None, target_id=None):
    """
    Look for dependency task
    :param task_id: Can be one of
        1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
        2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
        3. task.task_id: "<action_id>:number"
    :param ro_task: optional ro_task searched first (format 3 only) before
        querying the database
    :param target_id: target used in the database filter for formats 1 and 2.
        With format 1 it is overwritten by the target embedded in task_id
    :return: database ro_task plus index of task
    :raises NsWorkerException: if no matching task is found
    """
    if task_id.startswith(("vim:", "sdn:", "wim:")):
        # format 1: split the target from the target_record_id
        target_id, _, task_id = task_id.partition(" ")

    if task_id.startswith(("nsrs:", "vnfrs:")):
        ro_task_dependency = self.db.get_one(
            "ro_tasks",
            q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
            fail_on_empty=False,
        )

        if ro_task_dependency:
            for task_index, task in enumerate(ro_task_dependency["tasks"]):
                # entries of deleted tasks are None and must be skipped
                if task and task["target_record_id"] == task_id:
                    return ro_task_dependency, task_index

    else:
        if ro_task:
            for task_index, task in enumerate(ro_task["tasks"]):
                if task and task["task_id"] == task_id:
                    return ro_task, task_index

        ro_task_dependency = self.db.get_one(
            "ro_tasks",
            q_filter={
                "tasks.ANYINDEX.task_id": task_id,
                "tasks.ANYINDEX.target_record.ne": None,
            },
            fail_on_empty=False,
        )

        self.logger.debug("ro_task_dependency={}".format(ro_task_dependency))
        if ro_task_dependency:
            for task_index, task in enumerate(ro_task_dependency["tasks"]):
                # entries of deleted tasks are None and must be skipped
                if task and task["task_id"] == task_id:
                    return ro_task_dependency, task_index

    raise NsWorkerException("Cannot get depending task {}".format(task_id))
2217
def update_vm_refresh(self):
    """Re-enable the VM status refresh of ro_tasks when it is configured.

    Nothing is done when self.refresh_config.active is -1. Otherwise, every
    ro_task that has a task in "DONE" status and a negative "to_check_at" gets
    a new "vim_info.refresh_at" and "to_check_at" in the database.
    Any exception is logged and not propagated.
    """
    try:
        self.logger.debug("Checking if VM status update config")
        refresh_at = time.time()
        active_period = self.refresh_config.active
        refresh_at = -1 if active_period == -1 else refresh_at + active_period

        if refresh_at == -1:
            return

        # check again in one day at most, or sooner if the refresh comes first
        one_day_ahead = time.time() + (24 * 60 * 60)
        pending_update = {
            "vim_info.refresh_at": refresh_at,
            "to_check_at": min(one_day_ahead, refresh_at),
        }

        self.logger.debug(
            "Finding tasks which to be updated to enable VM status updates"
        )
        disabled_ro_tasks = self.db.get_list(
            "ro_tasks",
            q_filter={"tasks.status": "DONE", "to_check_at.lt": 0},
        )
        self.logger.debug("Updating tasks to change the to_check_at status")
        for disabled_ro_task in disabled_ro_tasks:
            self.db.set_one(
                "ro_tasks",
                q_filter={"_id": disabled_ro_task["_id"]},
                update_dict=pending_update,
                fail_on_empty=True,
            )

    except Exception as e:
        self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
2263
def _process_pending_tasks(self, ro_task):
    """
    Process the pending tasks of one ro_task and store the outcome at database.

    The tasks are visited in three passes, by action: "DELETE", "CREATE" and
    "EXEC". For every eligible task the matching item handler is invoked
    (delete/new/refresh/exec), the resulting status and vim_info are collected
    in a single update dictionary and the target record is updated through
    self._update_target(). Finally the ro_task is unlocked and written back,
    together with the time at which it must be checked again.

    :param ro_task: ro_task record obtained from database. Its "vim_info"
        content is modified in place
    :return: None. Exceptions are logged and not propagated
    """
    ro_task_id = ro_task["_id"]
    now = time.time()
    # by default this ro_task is checked again in one day
    next_check_at = now + (24 * 60 * 60)
    db_ro_task_update = {}

    def _update_refresh(new_status):
        # compute next_refresh for the task currently iterated ("task" is the
        # loop variable of the enclosing function, read through the closure)
        nonlocal next_check_at  # the only enclosing name that is rebound here

        next_refresh = time.time()

        if task["item"] in ("image", "flavor"):
            next_refresh += self.refresh_config.image
        elif new_status == "BUILD":
            next_refresh += self.refresh_config.build
        elif new_status == "DONE":
            if self.refresh_config.active == -1:
                # -1 means that the refresh of active elements is disabled
                next_refresh = -1
            else:
                next_refresh += self.refresh_config.active
        else:
            next_refresh += self.refresh_config.error

        next_check_at = min(next_check_at, next_refresh)
        db_ro_task_update["vim_info.refresh_at"] = next_refresh
        ro_task["vim_info"]["refresh_at"] = next_refresh

    try:
        # Disabled debug aid:
        # # Log RO tasks only when loglevel is DEBUG
        # if self.logger.getEffectiveLevel() == logging.DEBUG:
        #     self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")

        # Check if vim status refresh is enabled again
        self.update_vm_refresh()
        # 0: get task_status_create
        lock_object = None
        task_status_create = None
        task_create = next(
            (
                t
                for t in ro_task["tasks"]
                if t
                and t["action"] == "CREATE"
                and t["status"] in ("BUILD", "DONE")
            ),
            None,
        )

        if task_create:
            task_status_create = task_create["status"]

        # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
        for task_action in ("DELETE", "CREATE", "EXEC"):
            # NOTE(review): these two are reset once per action, not per task,
            # so a value set by one task is still visible to the following
            # tasks of the same action - confirm this is intended
            db_vim_update = None
            new_status = None

            for task_index, task in enumerate(ro_task["tasks"]):
                if not task:
                    continue  # task deleted

                task_depends = {}
                target_update = None

                if (
                    (
                        task_action in ("DELETE", "EXEC")
                        and task["status"] not in ("SCHEDULED", "BUILD")
                    )
                    or task["action"] != task_action
                    or (
                        task_action == "CREATE"
                        and task["status"] in ("FINISHED", "SUPERSEDED")
                    )
                ):
                    continue

                task_path = "tasks.{}.status".format(task_index)
                try:
                    db_vim_info_update = None

                    if task["status"] == "SCHEDULED":
                        # check if tasks that this depends on have been completed
                        dependency_not_completed = False

                        for dependency_task_id in task.get("depends_on") or ():
                            (
                                dependency_ro_task,
                                dependency_task_index,
                            ) = self._get_dependency(
                                dependency_task_id, target_id=ro_task["target_id"]
                            )
                            dependency_task = dependency_ro_task["tasks"][
                                dependency_task_index
                            ]
                            self.logger.debug(
                                "dependency_ro_task={} dependency_task_index={}".format(
                                    dependency_ro_task, dependency_task_index
                                )
                            )

                            if dependency_task["status"] == "SCHEDULED":
                                dependency_not_completed = True
                                next_check_at = min(
                                    next_check_at, dependency_ro_task["to_check_at"]
                                )
                                # must allow dependent task to be processed first
                                # to do this set time after last_task_processed
                                next_check_at = max(
                                    self.time_last_task_processed, next_check_at
                                )
                                break
                            elif dependency_task["status"] == "FAILED":
                                error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
                                    task["action"],
                                    task["item"],
                                    dependency_task["action"],
                                    dependency_task["item"],
                                    dependency_task_id,
                                    dependency_ro_task["vim_info"].get(
                                        "vim_message"
                                    ),
                                )
                                self.logger.error(
                                    "task={} {}".format(task["task_id"], error_text)
                                )
                                raise NsWorkerException(error_text)

                            # the vim_id of the dependency is provided under two keys
                            task_depends[dependency_task_id] = dependency_ro_task[
                                "vim_info"
                            ]["vim_id"]
                            task_depends[
                                "TASK-{}".format(dependency_task_id)
                            ] = dependency_ro_task["vim_info"]["vim_id"]

                        if dependency_not_completed:
                            self.logger.warning(
                                "DEPENDENCY NOT COMPLETED {}".format(
                                    dependency_ro_task["vim_info"]["vim_id"]
                                )
                            )
                            # TODO set at vim_info.vim_details that it is waiting
                            continue

                    # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
                    # the task of renew this locking. It will update database locked_at periodically
                    if not lock_object:
                        lock_object = LockRenew.add_lock_object(
                            "ro_tasks", ro_task, self
                        )

                    if task["action"] == "DELETE":
                        (new_status, db_vim_info_update,) = self._delete_task(
                            ro_task, task_index, task_depends, db_ro_task_update
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if new_status in ("FINISHED", "SUPERSEDED"):
                            target_update = "DELETE"
                    elif task["action"] == "EXEC":
                        (
                            new_status,
                            db_vim_info_update,
                            db_task_update,
                        ) = self.item2class[task["item"]].exec(
                            ro_task, task_index, task_depends
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if db_task_update:
                            # load into database the modified db_task_update "retries" and "next_retry"
                            if db_task_update.get("retries"):
                                db_ro_task_update[
                                    "tasks.{}.retries".format(task_index)
                                ] = db_task_update["retries"]

                            next_check_at = time.time() + db_task_update.get(
                                "next_retry", 60
                            )
                        target_update = None
                    elif task["action"] == "CREATE":
                        if task["status"] == "SCHEDULED":
                            if task_status_create:
                                # already created by another task: copy its result
                                new_status = task_status_create
                                target_update = "COPY_VIM_INFO"
                            else:
                                new_status, db_vim_info_update = self.item2class[
                                    task["item"]
                                ].new(ro_task, task_index, task_depends)
                                # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
                                _update_refresh(new_status)
                        else:
                            refresh_at = ro_task["vim_info"]["refresh_at"]
                            if refresh_at and refresh_at != -1 and now > refresh_at:
                                (new_status, db_vim_info_update,) = self.item2class[
                                    task["item"]
                                ].refresh(ro_task)
                                _update_refresh(new_status)
                            else:
                                # The refresh is updated to avoid set the value of "refresh_at" to
                                # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
                                # because it can happen that in this case the task is never processed
                                _update_refresh(task["status"])

                except Exception as e:
                    new_status = "FAILED"
                    db_vim_info_update = {
                        "vim_status": "VIM_ERROR",
                        "vim_message": str(e),
                    }

                    if not isinstance(
                        e, (NsWorkerException, vimconn.VimConnException)
                    ):
                        # this handler covers every action, not only deletions
                        self.logger.error(
                            "Unexpected exception at _process_pending_tasks task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

                try:
                    if db_vim_info_update:
                        db_vim_update = db_vim_info_update.copy()
                        db_ro_task_update.update(
                            {
                                "vim_info." + k: v
                                for k, v in db_vim_info_update.items()
                            }
                        )
                        ro_task["vim_info"].update(db_vim_info_update)

                    if new_status:
                        if task_action == "CREATE":
                            task_status_create = new_status
                        db_ro_task_update[task_path] = new_status

                    if target_update or db_vim_update:
                        if target_update == "DELETE":
                            self._update_target(task, None)
                        elif target_update == "COPY_VIM_INFO":
                            self._update_target(task, ro_task["vim_info"])
                        else:
                            self._update_target(task, db_vim_update)

                except Exception as e:
                    if (
                        isinstance(e, DbException)
                        and e.http_code == HTTPStatus.NOT_FOUND
                    ):
                        # if the vnfrs or nsrs has been removed from database, this task must be removed
                        self.logger.debug(
                            "marking to delete task={}".format(task["task_id"])
                        )
                        self.tasks_to_delete.append(task)
                    else:
                        self.logger.error(
                            "Unexpected exception at _update_target task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

        locked_at = ro_task["locked_at"]

        if lock_object:
            locked_at = [
                lock_object["locked_at"],
                lock_object["locked_at"] + self.task_locked_time,
            ]
            # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
            # contain exactly locked_at + self.task_locked_time
            LockRenew.remove_lock_object(lock_object)

        q_filter = {
            "_id": ro_task["_id"],
            "to_check_at": ro_task["to_check_at"],
            "locked_at": locked_at,
        }
        # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
        # outside this task (by ro_nbi) do not update it
        db_ro_task_update["locked_by"] = None
        # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
        db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
        db_ro_task_update["modified_at"] = now
        db_ro_task_update["to_check_at"] = next_check_at

        # Disabled debug aid:
        # # Log RO tasks only when loglevel is DEBUG
        # if self.logger.getEffectiveLevel() == logging.DEBUG:
        #     db_ro_task_update_log = db_ro_task_update.copy()
        #     db_ro_task_update_log["_id"] = q_filter["_id"]
        #     self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")

        if not self.db.set_one(
            "ro_tasks",
            update_dict=db_ro_task_update,
            q_filter=q_filter,
            fail_on_empty=False,
        ):
            # to_check_at was changed meanwhile: retry without touching it
            del db_ro_task_update["to_check_at"]
            del q_filter["to_check_at"]
            # Disabled debug aid:
            # # Log RO tasks only when loglevel is DEBUG
            # if self.logger.getEffectiveLevel() == logging.DEBUG:
            #     self._log_ro_task(
            #         None,
            #         db_ro_task_update_log,
            #         None,
            #         "TASK_WF",
            #         "SET_TASK " + str(q_filter),
            #     )
            self.db.set_one(
                "ro_tasks",
                q_filter=q_filter,
                update_dict=db_ro_task_update,
                fail_on_empty=True,
            )
    except DbException as e:
        self.logger.error(
            "ro_task={} Error updating database {}".format(ro_task_id, e)
        )
    except Exception as e:
        self.logger.error(
            "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
        )
2603
def _update_target(self, task, ro_vim_item_update):
    """
    Store the VIM information of a task at its target record in database.

    task["target_record"] has the form "<table>:<_id>:<path_vim_status>".

    :param task: task whose "target_record" identifies the record to update
    :param ro_vim_item_update: dictionary with the VIM information to store.
        If empty or None the record status is set to "DELETED" and the
        vim information entry is unset
    :return: None
    """
    table, _, remainder = task["target_record"].partition(":")
    _id, _, path_vim_status = remainder.partition(":")
    # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
    # path_item: dot separated list targeting record information, e.g. "vdur.10"
    path_item = path_vim_status[: path_vim_status.rfind(".")]
    path_item = path_item[: path_item.rfind(".")]
    record_filter = {"_id": _id}

    if not ro_vim_item_update:
        self.db.set_one(
            table,
            q_filter=record_filter,
            update_dict={path_item + ".status": "DELETED"},
            unset={path_vim_status: None},
        )
        return

    copied_keys = (
        "vim_id",
        "vim_details",
        "vim_message",
        "vim_name",
        "vim_status",
        "interfaces",
        "interfaces_backup",
    )
    update_dict = {}
    for key, value in ro_vim_item_update.items():
        if key in copied_keys:
            update_dict[path_vim_status + "." + key] = value

    if path_vim_status.startswith("vdur."):
        # for backward compatibility, add vdur.name, vdur.vim-id and the
        # general vdur.status apart from the values stored under vim_info
        for legacy_suffix, source_key in (
            (".name", "vim_name"),
            (".vim-id", "vim_id"),
            (".status", "vim_status"),
        ):
            if ro_vim_item_update.get(source_key):
                update_dict[path_item + legacy_suffix] = ro_vim_item_update[
                    source_key
                ]

    if ro_vim_item_update.get("interfaces"):
        for position, iface in enumerate(ro_vim_item_update.get("interfaces")):
            if not iface:
                continue

            iface_prefix = path_item + ".interfaces" + ".{}.".format(position)
            for key, value in iface.items():
                if key in ("vlan", "compute_node", "pci"):
                    update_dict[iface_prefix + key] = value

            # put ip_address and mac_address with ip-address and mac-address
            if iface.get("ip_address"):
                update_dict[iface_prefix + "ip-address"] = iface["ip_address"]

            if iface.get("mac_address"):
                update_dict[iface_prefix + "mac-address"] = iface["mac_address"]

            # management interfaces also provide the first address of the list
            if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
                first_address = iface.get("ip_address").split(";")[0]
                update_dict["ip-address"] = first_address

            if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
                first_address = iface.get("ip_address").split(";")[0]
                update_dict[path_item + ".ip-address"] = first_address

    self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)

    # If interfaces exists, it backups VDU interfaces in the DB for healing operations
    if ro_vim_item_update.get("interfaces"):
        stored_interfaces = update_dict.get(path_vim_status + ".interfaces")
        if stored_interfaces:
            self.db.set_one(
                table,
                q_filter={"_id": _id},
                update_dict={
                    path_vim_status + ".interfaces_backup": stored_interfaces
                },
            )
2701
def _process_delete_db_tasks(self):
    """
    Delete task from database because vnfrs or nsrs or both have been deleted

    Each queued task is removed from the queue once processed, even if the
    deletion failed (the error is only logged).

    :return: None. Uses and modify self.tasks_to_delete
    """
    while self.tasks_to_delete:
        pending = self.tasks_to_delete[0]
        nsr_id = pending["nsr_id"]
        target_record = pending["target_record"]

        # restrict the deletion to one vnfr only when the nsr is still present
        vnfrs_deleted = None
        if target_record.startswith("vnfrs:") and self.db.get_one(
            "nsrs", {"_id": nsr_id}, fail_on_empty=False
        ):
            vnfrs_deleted = pending["target_record"].split(":")[1]

        try:
            self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
        except Exception as e:
            self.logger.error(
                "Error deleting task={}: {}".format(pending["task_id"], e)
            )

        self.tasks_to_delete.pop(0)
2724
@staticmethod
def delete_db_tasks(db, nsr_id, vnfrs_deleted):
    """
    Static method because it is called from osm_ng_ro.ns

    Removes the affected tasks from every ro_task that references nsr_id. A
    ro_task without remaining tasks is deleted; otherwise the affected entries
    are set to None. Writes are conditioned to an unchanged "modified_at", and
    the whole operation is retried up to 5 times when a write finds nothing.

    :param db: instance of database to use
    :param nsr_id: affected nsrs id
    :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
    :return: None, exception is fails
    """
    retries = 5
    for _ in range(retries):
        candidates = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
        now = time.time()
        conflict = False

        for ro_task in candidates:
            db_update = {}
            used_by_others = False

            for index, task in enumerate(ro_task["tasks"]):
                if not task:
                    continue  # entry already removed

                if vnfrs_deleted:
                    affected = task["target_record"].startswith(
                        "vnfrs:" + vnfrs_deleted
                    )
                else:
                    affected = task["nsr_id"] == nsr_id

                if affected:
                    db_update["tasks.{}".format(index)] = None
                else:
                    # used by other nsr, ro_task cannot be deleted
                    used_by_others = True

            # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
            unchanged_filter = {
                "_id": ro_task["_id"],
                "modified_at": ro_task["modified_at"],
            }
            if not used_by_others:
                done = db.del_one(
                    "ro_tasks", q_filter=unchanged_filter, fail_on_empty=False
                )
            elif db_update:
                db_update["modified_at"] = now
                done = db.set_one(
                    "ro_tasks",
                    q_filter=unchanged_filter,
                    update_dict=db_update,
                    fail_on_empty=False,
                )
            else:
                continue  # nothing to change at this ro_task

            if not done:
                conflict = True

        if not conflict:
            return
    else:
        raise NsWorkerException("Exceeded {} retries".format(retries))
2783
def run(self):
    """
    Main loop of the thread.

    Each iteration first serves one command from self.task_queue
    ("terminate", "load_vim", "unload_vim", "reload_vim", "check_vim") and,
    when no command is available, processes one pending ro_task from database.
    The loop ends when the "terminate" command is received.
    """
    # load database
    self.logger.info("Starting")
    while True:
        # step 1: get commands from queue
        try:
            if self.vim_targets:
                task = self.task_queue.get(block=False)
            else:
                # nothing to poll at database: wait until a command arrives
                if not self.idle:
                    self.logger.debug("enters in idle state")
                self.idle = True
                task = self.task_queue.get(block=True)
                self.idle = False

            if task[0] == "terminate":
                break

            if task[0] == "load_vim":
                self.logger.info("order to load vim {}".format(task[1]))
                self._load_vim(task[1])
            elif task[0] == "unload_vim":
                self.logger.info("order to unload vim {}".format(task[1]))
                self._unload_vim(task[1])
            elif task[0] == "reload_vim":
                self._reload_vim(task[1])
            elif task[0] == "check_vim":
                self.logger.info("order to check vim {}".format(task[1]))
                self._check_vim(task[1])

            # a command was served: look for the next one before step 2
            continue
        except queue.Empty:
            pass  # no command pending, go on with the database tasks
        except Exception as e:
            self.logger.critical(
                "Error processing task: {}".format(e), exc_info=True
            )

        # step 2: process pending_tasks, delete not needed tasks
        try:
            if self.tasks_to_delete:
                self._process_delete_db_tasks()

            # Disabled debug aid:
            # # Log RO tasks only when loglevel is DEBUG
            # if self.logger.getEffectiveLevel() == logging.DEBUG:
            #     _ = self._get_db_all_tasks()
            ro_task = self._get_db_task()
            if ro_task:
                self.logger.debug("Task to process: {}".format(ro_task))
                time.sleep(1)
                self._process_pending_tasks(ro_task)
            else:
                # nothing to do: wait before polling again
                time.sleep(5)
        except Exception as e:
            self.logger.critical(
                "Unexpected exception at run: " + str(e), exc_info=True
            )

    self.logger.info("Finishing")