Enabling Flake8 and import sorting
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 from unittest.mock import Mock
36
37 from importlib_metadata import entry_points
38 from osm_common.dbbase import DbException
39 from osm_ng_ro.vim_admin import LockRenew
40 from osm_ro_plugin import sdnconn, vimconn
41 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
42 from osm_ro_plugin.vim_dummy import VimDummyConnector
43 import yaml
44
45 __author__ = "Alfonso Tierno"
46 __date__ = "$28-Sep-2017 12:07:15$"
47
48
49 def deep_get(target_dict, *args, **kwargs):
50 """
51 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
52 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
53 :param target_dict: dictionary to be read
54 :param args: list of keys to read from target_dict
55 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
56 :return: The wanted value if exist, None or default otherwise
57 """
58 for key in args:
59 if not isinstance(target_dict, dict) or key not in target_dict:
60 return kwargs.get("default")
61 target_dict = target_dict[key]
62 return target_dict
63
64
65 class NsWorkerException(Exception):
66 pass
67
68
69 class FailingConnector:
70 def __init__(self, error_msg):
71 self.error_msg = error_msg
72
73 for method in dir(vimconn.VimConnector):
74 if method[0] != "_":
75 setattr(
76 self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
77 )
78
79 for method in dir(sdnconn.SdnConnectorBase):
80 if method[0] != "_":
81 setattr(
82 self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
83 )
84
85
86 class NsWorkerExceptionNotFound(NsWorkerException):
87 pass
88
89
90 class VimInteractionBase:
91 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
92 It implements methods that does nothing and return ok"""
93
94 def __init__(self, db, my_vims, db_vims, logger):
95 self.db = db
96 self.logger = logger
97 self.my_vims = my_vims
98 self.db_vims = db_vims
99
100 def new(self, ro_task, task_index, task_depends):
101 return "BUILD", {}
102
103 def refresh(self, ro_task):
104 """skip calling VIM to get image, flavor status. Assumes ok"""
105 if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
106 return "FAILED", {}
107
108 return "DONE", {}
109
110 def delete(self, ro_task, task_index):
111 """skip calling VIM to delete image. Assumes ok"""
112 return "DONE", {}
113
114 def exec(self, ro_task, task_index, task_depends):
115 return "DONE", None, None
116
117
118 class VimInteractionNet(VimInteractionBase):
119 def new(self, ro_task, task_index, task_depends):
120 vim_net_id = None
121 task = ro_task["tasks"][task_index]
122 task_id = task["task_id"]
123 created = False
124 created_items = {}
125 target_vim = self.my_vims[ro_task["target_id"]]
126
127 try:
128 # FIND
129 if task.get("find_params"):
130 # if management, get configuration of VIM
131 if task["find_params"].get("filter_dict"):
132 vim_filter = task["find_params"]["filter_dict"]
133 # mamagement network
134 elif task["find_params"].get("mgmt"):
135 if deep_get(
136 self.db_vims[ro_task["target_id"]],
137 "config",
138 "management_network_id",
139 ):
140 vim_filter = {
141 "id": self.db_vims[ro_task["target_id"]]["config"][
142 "management_network_id"
143 ]
144 }
145 elif deep_get(
146 self.db_vims[ro_task["target_id"]],
147 "config",
148 "management_network_name",
149 ):
150 vim_filter = {
151 "name": self.db_vims[ro_task["target_id"]]["config"][
152 "management_network_name"
153 ]
154 }
155 else:
156 vim_filter = {"name": task["find_params"]["name"]}
157 else:
158 raise NsWorkerExceptionNotFound(
159 "Invalid find_params for new_net {}".format(task["find_params"])
160 )
161
162 vim_nets = target_vim.get_network_list(vim_filter)
163 if not vim_nets and not task.get("params"):
164 raise NsWorkerExceptionNotFound(
165 "Network not found with this criteria: '{}'".format(
166 task.get("find_params")
167 )
168 )
169 elif len(vim_nets) > 1:
170 raise NsWorkerException(
171 "More than one network found with this criteria: '{}'".format(
172 task["find_params"]
173 )
174 )
175
176 if vim_nets:
177 vim_net_id = vim_nets[0]["id"]
178 else:
179 # CREATE
180 params = task["params"]
181 vim_net_id, created_items = target_vim.new_network(**params)
182 created = True
183
184 ro_vim_item_update = {
185 "vim_id": vim_net_id,
186 "vim_status": "BUILD",
187 "created": created,
188 "created_items": created_items,
189 "vim_details": None,
190 }
191 self.logger.debug(
192 "task={} {} new-net={} created={}".format(
193 task_id, ro_task["target_id"], vim_net_id, created
194 )
195 )
196
197 return "BUILD", ro_vim_item_update
198 except (vimconn.VimConnException, NsWorkerException) as e:
199 self.logger.error(
200 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
201 )
202 ro_vim_item_update = {
203 "vim_status": "VIM_ERROR",
204 "created": created,
205 "vim_details": str(e),
206 }
207
208 return "FAILED", ro_vim_item_update
209
210 def refresh(self, ro_task):
211 """Call VIM to get network status"""
212 ro_task_id = ro_task["_id"]
213 target_vim = self.my_vims[ro_task["target_id"]]
214 vim_id = ro_task["vim_info"]["vim_id"]
215 net_to_refresh_list = [vim_id]
216
217 try:
218 vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
219 vim_info = vim_dict[vim_id]
220
221 if vim_info["status"] == "ACTIVE":
222 task_status = "DONE"
223 elif vim_info["status"] == "BUILD":
224 task_status = "BUILD"
225 else:
226 task_status = "FAILED"
227 except vimconn.VimConnException as e:
228 # Mark all tasks at VIM_ERROR status
229 self.logger.error(
230 "ro_task={} vim={} get-net={}: {}".format(
231 ro_task_id, ro_task["target_id"], vim_id, e
232 )
233 )
234 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
235 task_status = "FAILED"
236
237 ro_vim_item_update = {}
238 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
239 ro_vim_item_update["vim_status"] = vim_info["status"]
240
241 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
242 ro_vim_item_update["vim_name"] = vim_info.get("name")
243
244 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
245 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
246 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
247 elif vim_info["status"] == "DELETED":
248 ro_vim_item_update["vim_id"] = None
249 ro_vim_item_update["vim_details"] = "Deleted externally"
250 else:
251 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
252 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
253
254 if ro_vim_item_update:
255 self.logger.debug(
256 "ro_task={} {} get-net={}: status={} {}".format(
257 ro_task_id,
258 ro_task["target_id"],
259 vim_id,
260 ro_vim_item_update.get("vim_status"),
261 ro_vim_item_update.get("vim_details")
262 if ro_vim_item_update.get("vim_status") != "ACTIVE"
263 else "",
264 )
265 )
266
267 return task_status, ro_vim_item_update
268
269 def delete(self, ro_task, task_index):
270 task = ro_task["tasks"][task_index]
271 task_id = task["task_id"]
272 net_vim_id = ro_task["vim_info"]["vim_id"]
273 ro_vim_item_update_ok = {
274 "vim_status": "DELETED",
275 "created": False,
276 "vim_details": "DELETED",
277 "vim_id": None,
278 }
279
280 try:
281 if net_vim_id or ro_task["vim_info"]["created_items"]:
282 target_vim = self.my_vims[ro_task["target_id"]]
283 target_vim.delete_network(
284 net_vim_id, ro_task["vim_info"]["created_items"]
285 )
286 except vimconn.VimConnNotFoundException:
287 ro_vim_item_update_ok["vim_details"] = "already deleted"
288 except vimconn.VimConnException as e:
289 self.logger.error(
290 "ro_task={} vim={} del-net={}: {}".format(
291 ro_task["_id"], ro_task["target_id"], net_vim_id, e
292 )
293 )
294 ro_vim_item_update = {
295 "vim_status": "VIM_ERROR",
296 "vim_details": "Error while deleting: {}".format(e),
297 }
298
299 return "FAILED", ro_vim_item_update
300
301 self.logger.debug(
302 "task={} {} del-net={} {}".format(
303 task_id,
304 ro_task["target_id"],
305 net_vim_id,
306 ro_vim_item_update_ok.get("vim_details", ""),
307 )
308 )
309
310 return "DONE", ro_vim_item_update_ok
311
312
313 class VimInteractionVdu(VimInteractionBase):
314 max_retries_inject_ssh_key = 20 # 20 times
315 time_retries_inject_ssh_key = 30 # wevery 30 seconds
316
317 def new(self, ro_task, task_index, task_depends):
318 task = ro_task["tasks"][task_index]
319 task_id = task["task_id"]
320 created = False
321 created_items = {}
322 target_vim = self.my_vims[ro_task["target_id"]]
323
324 try:
325 created = True
326 params = task["params"]
327 params_copy = deepcopy(params)
328 net_list = params_copy["net_list"]
329
330 for net in net_list:
331 # change task_id into network_id
332 if "net_id" in net and net["net_id"].startswith("TASK-"):
333 network_id = task_depends[net["net_id"]]
334
335 if not network_id:
336 raise NsWorkerException(
337 "Cannot create VM because depends on a network not created or found "
338 "for {}".format(net["net_id"])
339 )
340
341 net["net_id"] = network_id
342
343 if params_copy["image_id"].startswith("TASK-"):
344 params_copy["image_id"] = task_depends[params_copy["image_id"]]
345
346 if params_copy["flavor_id"].startswith("TASK-"):
347 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
348
349 vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
350 interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
351
352 ro_vim_item_update = {
353 "vim_id": vim_vm_id,
354 "vim_status": "BUILD",
355 "created": created,
356 "created_items": created_items,
357 "vim_details": None,
358 "interfaces_vim_ids": interfaces,
359 "interfaces": [],
360 }
361 self.logger.debug(
362 "task={} {} new-vm={} created={}".format(
363 task_id, ro_task["target_id"], vim_vm_id, created
364 )
365 )
366
367 return "BUILD", ro_vim_item_update
368 except (vimconn.VimConnException, NsWorkerException) as e:
369 self.logger.error(
370 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
371 )
372 ro_vim_item_update = {
373 "vim_status": "VIM_ERROR",
374 "created": created,
375 "vim_details": str(e),
376 }
377
378 return "FAILED", ro_vim_item_update
379
380 def delete(self, ro_task, task_index):
381 task = ro_task["tasks"][task_index]
382 task_id = task["task_id"]
383 vm_vim_id = ro_task["vim_info"]["vim_id"]
384 ro_vim_item_update_ok = {
385 "vim_status": "DELETED",
386 "created": False,
387 "vim_details": "DELETED",
388 "vim_id": None,
389 }
390
391 try:
392 if vm_vim_id or ro_task["vim_info"]["created_items"]:
393 target_vim = self.my_vims[ro_task["target_id"]]
394 target_vim.delete_vminstance(
395 vm_vim_id, ro_task["vim_info"]["created_items"]
396 )
397 except vimconn.VimConnNotFoundException:
398 ro_vim_item_update_ok["vim_details"] = "already deleted"
399 except vimconn.VimConnException as e:
400 self.logger.error(
401 "ro_task={} vim={} del-vm={}: {}".format(
402 ro_task["_id"], ro_task["target_id"], vm_vim_id, e
403 )
404 )
405 ro_vim_item_update = {
406 "vim_status": "VIM_ERROR",
407 "vim_details": "Error while deleting: {}".format(e),
408 }
409
410 return "FAILED", ro_vim_item_update
411
412 self.logger.debug(
413 "task={} {} del-vm={} {}".format(
414 task_id,
415 ro_task["target_id"],
416 vm_vim_id,
417 ro_vim_item_update_ok.get("vim_details", ""),
418 )
419 )
420
421 return "DONE", ro_vim_item_update_ok
422
423 def refresh(self, ro_task):
424 """Call VIM to get vm status"""
425 ro_task_id = ro_task["_id"]
426 target_vim = self.my_vims[ro_task["target_id"]]
427 vim_id = ro_task["vim_info"]["vim_id"]
428
429 if not vim_id:
430 return None, None
431
432 vm_to_refresh_list = [vim_id]
433 try:
434 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
435 vim_info = vim_dict[vim_id]
436
437 if vim_info["status"] == "ACTIVE":
438 task_status = "DONE"
439 elif vim_info["status"] == "BUILD":
440 task_status = "BUILD"
441 else:
442 task_status = "FAILED"
443
444 # try to load and parse vim_information
445 try:
446 vim_info_info = yaml.safe_load(vim_info["vim_info"])
447 if vim_info_info.get("name"):
448 vim_info["name"] = vim_info_info["name"]
449 except Exception:
450 pass
451 except vimconn.VimConnException as e:
452 # Mark all tasks at VIM_ERROR status
453 self.logger.error(
454 "ro_task={} vim={} get-vm={}: {}".format(
455 ro_task_id, ro_task["target_id"], vim_id, e
456 )
457 )
458 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
459 task_status = "FAILED"
460
461 ro_vim_item_update = {}
462
463 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
464 vim_interfaces = []
465 if vim_info.get("interfaces"):
466 for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
467 iface = next(
468 (
469 iface
470 for iface in vim_info["interfaces"]
471 if vim_iface_id == iface["vim_interface_id"]
472 ),
473 None,
474 )
475 # if iface:
476 # iface.pop("vim_info", None)
477 vim_interfaces.append(iface)
478
479 task_create = next(
480 t
481 for t in ro_task["tasks"]
482 if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
483 )
484 if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
485 vim_interfaces[task_create["mgmt_vnf_interface"]][
486 "mgmt_vnf_interface"
487 ] = True
488
489 mgmt_vdu_iface = task_create.get(
490 "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
491 )
492 if vim_interfaces:
493 vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
494
495 if ro_task["vim_info"]["interfaces"] != vim_interfaces:
496 ro_vim_item_update["interfaces"] = vim_interfaces
497
498 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
499 ro_vim_item_update["vim_status"] = vim_info["status"]
500
501 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
502 ro_vim_item_update["vim_name"] = vim_info.get("name")
503
504 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
505 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
506 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
507 elif vim_info["status"] == "DELETED":
508 ro_vim_item_update["vim_id"] = None
509 ro_vim_item_update["vim_details"] = "Deleted externally"
510 else:
511 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
512 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
513
514 if ro_vim_item_update:
515 self.logger.debug(
516 "ro_task={} {} get-vm={}: status={} {}".format(
517 ro_task_id,
518 ro_task["target_id"],
519 vim_id,
520 ro_vim_item_update.get("vim_status"),
521 ro_vim_item_update.get("vim_details")
522 if ro_vim_item_update.get("vim_status") != "ACTIVE"
523 else "",
524 )
525 )
526
527 return task_status, ro_vim_item_update
528
529 def exec(self, ro_task, task_index, task_depends):
530 task = ro_task["tasks"][task_index]
531 task_id = task["task_id"]
532 target_vim = self.my_vims[ro_task["target_id"]]
533 db_task_update = {"retries": 0}
534 retries = task.get("retries", 0)
535
536 try:
537 params = task["params"]
538 params_copy = deepcopy(params)
539 params_copy["ro_key"] = self.db.decrypt(
540 params_copy.pop("private_key"),
541 params_copy.pop("schema_version"),
542 params_copy.pop("salt"),
543 )
544 params_copy["ip_addr"] = params_copy.pop("ip_address")
545 target_vim.inject_user_key(**params_copy)
546 self.logger.debug(
547 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
548 )
549
550 return (
551 "DONE",
552 None,
553 db_task_update,
554 ) # params_copy["key"]
555 except (vimconn.VimConnException, NsWorkerException) as e:
556 retries += 1
557
558 if retries < self.max_retries_inject_ssh_key:
559 return (
560 "BUILD",
561 None,
562 {
563 "retries": retries,
564 "next_retry": self.time_retries_inject_ssh_key,
565 },
566 )
567
568 self.logger.error(
569 "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
570 )
571 ro_vim_item_update = {"vim_details": str(e)}
572
573 return "FAILED", ro_vim_item_update, db_task_update
574
575
576 class VimInteractionImage(VimInteractionBase):
577 def new(self, ro_task, task_index, task_depends):
578 task = ro_task["tasks"][task_index]
579 task_id = task["task_id"]
580 created = False
581 created_items = {}
582 target_vim = self.my_vims[ro_task["target_id"]]
583
584 try:
585 # FIND
586 if task.get("find_params"):
587 vim_images = target_vim.get_image_list(**task["find_params"])
588
589 if not vim_images:
590 raise NsWorkerExceptionNotFound(
591 "Image not found with this criteria: '{}'".format(
592 task["find_params"]
593 )
594 )
595 elif len(vim_images) > 1:
596 raise NsWorkerException(
597 "More than one image found with this criteria: '{}'".format(
598 task["find_params"]
599 )
600 )
601 else:
602 vim_image_id = vim_images[0]["id"]
603
604 ro_vim_item_update = {
605 "vim_id": vim_image_id,
606 "vim_status": "DONE",
607 "created": created,
608 "created_items": created_items,
609 "vim_details": None,
610 }
611 self.logger.debug(
612 "task={} {} new-image={} created={}".format(
613 task_id, ro_task["target_id"], vim_image_id, created
614 )
615 )
616
617 return "DONE", ro_vim_item_update
618 except (NsWorkerException, vimconn.VimConnException) as e:
619 self.logger.error(
620 "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
621 )
622 ro_vim_item_update = {
623 "vim_status": "VIM_ERROR",
624 "created": created,
625 "vim_details": str(e),
626 }
627
628 return "FAILED", ro_vim_item_update
629
630
631 class VimInteractionFlavor(VimInteractionBase):
632 def delete(self, ro_task, task_index):
633 task = ro_task["tasks"][task_index]
634 task_id = task["task_id"]
635 flavor_vim_id = ro_task["vim_info"]["vim_id"]
636 ro_vim_item_update_ok = {
637 "vim_status": "DELETED",
638 "created": False,
639 "vim_details": "DELETED",
640 "vim_id": None,
641 }
642
643 try:
644 if flavor_vim_id:
645 target_vim = self.my_vims[ro_task["target_id"]]
646 target_vim.delete_flavor(flavor_vim_id)
647 except vimconn.VimConnNotFoundException:
648 ro_vim_item_update_ok["vim_details"] = "already deleted"
649 except vimconn.VimConnException as e:
650 self.logger.error(
651 "ro_task={} vim={} del-flavor={}: {}".format(
652 ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
653 )
654 )
655 ro_vim_item_update = {
656 "vim_status": "VIM_ERROR",
657 "vim_details": "Error while deleting: {}".format(e),
658 }
659
660 return "FAILED", ro_vim_item_update
661
662 self.logger.debug(
663 "task={} {} del-flavor={} {}".format(
664 task_id,
665 ro_task["target_id"],
666 flavor_vim_id,
667 ro_vim_item_update_ok.get("vim_details", ""),
668 )
669 )
670
671 return "DONE", ro_vim_item_update_ok
672
673 def new(self, ro_task, task_index, task_depends):
674 task = ro_task["tasks"][task_index]
675 task_id = task["task_id"]
676 created = False
677 created_items = {}
678 target_vim = self.my_vims[ro_task["target_id"]]
679
680 try:
681 # FIND
682 vim_flavor_id = None
683
684 if task.get("find_params"):
685 try:
686 flavor_data = task["find_params"]["flavor_data"]
687 vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
688 except vimconn.VimConnNotFoundException:
689 pass
690
691 if not vim_flavor_id and task.get("params"):
692 # CREATE
693 flavor_data = task["params"]["flavor_data"]
694 vim_flavor_id = target_vim.new_flavor(flavor_data)
695 created = True
696
697 ro_vim_item_update = {
698 "vim_id": vim_flavor_id,
699 "vim_status": "DONE",
700 "created": created,
701 "created_items": created_items,
702 "vim_details": None,
703 }
704 self.logger.debug(
705 "task={} {} new-flavor={} created={}".format(
706 task_id, ro_task["target_id"], vim_flavor_id, created
707 )
708 )
709
710 return "DONE", ro_vim_item_update
711 except (vimconn.VimConnException, NsWorkerException) as e:
712 self.logger.error(
713 "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
714 )
715 ro_vim_item_update = {
716 "vim_status": "VIM_ERROR",
717 "created": created,
718 "vim_details": str(e),
719 }
720
721 return "FAILED", ro_vim_item_update
722
723
724 class VimInteractionSdnNet(VimInteractionBase):
725 @staticmethod
726 def _match_pci(port_pci, mapping):
727 """
728 Check if port_pci matches with mapping
729 mapping can have brackets to indicate that several chars are accepted. e.g
730 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
731 :param port_pci: text
732 :param mapping: text, can contain brackets to indicate several chars are available
733 :return: True if matches, False otherwise
734 """
735 if not port_pci or not mapping:
736 return False
737 if port_pci == mapping:
738 return True
739
740 mapping_index = 0
741 pci_index = 0
742 while True:
743 bracket_start = mapping.find("[", mapping_index)
744
745 if bracket_start == -1:
746 break
747
748 bracket_end = mapping.find("]", bracket_start)
749 if bracket_end == -1:
750 break
751
752 length = bracket_start - mapping_index
753 if (
754 length
755 and port_pci[pci_index : pci_index + length]
756 != mapping[mapping_index:bracket_start]
757 ):
758 return False
759
760 if (
761 port_pci[pci_index + length]
762 not in mapping[bracket_start + 1 : bracket_end]
763 ):
764 return False
765
766 pci_index += length + 1
767 mapping_index = bracket_end + 1
768
769 if port_pci[pci_index:] != mapping[mapping_index:]:
770 return False
771
772 return True
773
774 def _get_interfaces(self, vlds_to_connect, vim_account_id):
775 """
776 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
777 :param vim_account_id:
778 :return:
779 """
780 interfaces = []
781
782 for vld in vlds_to_connect:
783 table, _, db_id = vld.partition(":")
784 db_id, _, vld = db_id.partition(":")
785 _, _, vld_id = vld.partition(".")
786
787 if table == "vnfrs":
788 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
789 iface_key = "vnf-vld-id"
790 else: # table == "nsrs"
791 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
792 iface_key = "ns-vld-id"
793
794 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
795
796 for db_vnfr in db_vnfrs:
797 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
798 for iface_index, interface in enumerate(vdur["interfaces"]):
799 if interface.get(iface_key) == vld_id and interface.get(
800 "type"
801 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
802 # only SR-IOV o PT
803 interface_ = interface.copy()
804 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
805 db_vnfr["_id"], vdu_index, iface_index
806 )
807
808 if vdur.get("status") == "ERROR":
809 interface_["status"] = "ERROR"
810
811 interfaces.append(interface_)
812
813 return interfaces
814
815 def refresh(self, ro_task):
816 # look for task create
817 task_create_index, _ = next(
818 i_t
819 for i_t in enumerate(ro_task["tasks"])
820 if i_t[1]
821 and i_t[1]["action"] == "CREATE"
822 and i_t[1]["status"] != "FINISHED"
823 )
824
825 return self.new(ro_task, task_create_index, None)
826
827 def new(self, ro_task, task_index, task_depends):
828
829 task = ro_task["tasks"][task_index]
830 task_id = task["task_id"]
831 target_vim = self.my_vims[ro_task["target_id"]]
832
833 sdn_net_id = ro_task["vim_info"]["vim_id"]
834
835 created_items = ro_task["vim_info"].get("created_items")
836 connected_ports = ro_task["vim_info"].get("connected_ports", [])
837 new_connected_ports = []
838 last_update = ro_task["vim_info"].get("last_update", 0)
839 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
840 error_list = []
841 created = ro_task["vim_info"].get("created", False)
842
843 try:
844 # CREATE
845 params = task["params"]
846 vlds_to_connect = params["vlds"]
847 associated_vim = params["target_vim"]
848 # external additional ports
849 additional_ports = params.get("sdn-ports") or ()
850 _, _, vim_account_id = associated_vim.partition(":")
851
852 if associated_vim:
853 # get associated VIM
854 if associated_vim not in self.db_vims:
855 self.db_vims[associated_vim] = self.db.get_one(
856 "vim_accounts", {"_id": vim_account_id}
857 )
858
859 db_vim = self.db_vims[associated_vim]
860
861 # look for ports to connect
862 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
863 # print(ports)
864
865 sdn_ports = []
866 pending_ports = error_ports = 0
867 vlan_used = None
868 sdn_need_update = False
869
870 for port in ports:
871 vlan_used = port.get("vlan") or vlan_used
872
873 # TODO. Do not connect if already done
874 if not port.get("compute_node") or not port.get("pci"):
875 if port.get("status") == "ERROR":
876 error_ports += 1
877 else:
878 pending_ports += 1
879 continue
880
881 pmap = None
882 compute_node_mappings = next(
883 (
884 c
885 for c in db_vim["config"].get("sdn-port-mapping", ())
886 if c and c["compute_node"] == port["compute_node"]
887 ),
888 None,
889 )
890
891 if compute_node_mappings:
892 # process port_mapping pci of type 0000:af:1[01].[1357]
893 pmap = next(
894 (
895 p
896 for p in compute_node_mappings["ports"]
897 if self._match_pci(port["pci"], p.get("pci"))
898 ),
899 None,
900 )
901
902 if not pmap:
903 if not db_vim["config"].get("mapping_not_needed"):
904 error_list.append(
905 "Port mapping not found for compute_node={} pci={}".format(
906 port["compute_node"], port["pci"]
907 )
908 )
909 continue
910
911 pmap = {}
912
913 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
914 new_port = {
915 "service_endpoint_id": pmap.get("service_endpoint_id")
916 or service_endpoint_id,
917 "service_endpoint_encapsulation_type": "dot1q"
918 if port["type"] == "SR-IOV"
919 else None,
920 "service_endpoint_encapsulation_info": {
921 "vlan": port.get("vlan"),
922 "mac": port.get("mac-address"),
923 "device_id": pmap.get("device_id") or port["compute_node"],
924 "device_interface_id": pmap.get("device_interface_id")
925 or port["pci"],
926 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
927 "switch_port": pmap.get("switch_port"),
928 "service_mapping_info": pmap.get("service_mapping_info"),
929 },
930 }
931
932 # TODO
933 # if port["modified_at"] > last_update:
934 # sdn_need_update = True
935 new_connected_ports.append(port["id"]) # TODO
936 sdn_ports.append(new_port)
937
938 if error_ports:
939 error_list.append(
940 "{} interfaces have not been created as VDU is on ERROR status".format(
941 error_ports
942 )
943 )
944
945 # connect external ports
946 for index, additional_port in enumerate(additional_ports):
947 additional_port_id = additional_port.get(
948 "service_endpoint_id"
949 ) or "external-{}".format(index)
950 sdn_ports.append(
951 {
952 "service_endpoint_id": additional_port_id,
953 "service_endpoint_encapsulation_type": additional_port.get(
954 "service_endpoint_encapsulation_type", "dot1q"
955 ),
956 "service_endpoint_encapsulation_info": {
957 "vlan": additional_port.get("vlan") or vlan_used,
958 "mac": additional_port.get("mac_address"),
959 "device_id": additional_port.get("device_id"),
960 "device_interface_id": additional_port.get(
961 "device_interface_id"
962 ),
963 "switch_dpid": additional_port.get("switch_dpid")
964 or additional_port.get("switch_id"),
965 "switch_port": additional_port.get("switch_port"),
966 "service_mapping_info": additional_port.get(
967 "service_mapping_info"
968 ),
969 },
970 }
971 )
972 new_connected_ports.append(additional_port_id)
973 sdn_info = ""
974
975 # if there are more ports to connect or they have been modified, call create/update
976 if error_list:
977 sdn_status = "ERROR"
978 sdn_info = "; ".join(error_list)
979 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
980 last_update = time.time()
981
982 if not sdn_net_id:
983 if len(sdn_ports) < 2:
984 sdn_status = "ACTIVE"
985
986 if not pending_ports:
987 self.logger.debug(
988 "task={} {} new-sdn-net done, less than 2 ports".format(
989 task_id, ro_task["target_id"]
990 )
991 )
992 else:
993 net_type = params.get("type") or "ELAN"
994 (
995 sdn_net_id,
996 created_items,
997 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
998 created = True
999 self.logger.debug(
1000 "task={} {} new-sdn-net={} created={}".format(
1001 task_id, ro_task["target_id"], sdn_net_id, created
1002 )
1003 )
1004 else:
1005 created_items = target_vim.edit_connectivity_service(
1006 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1007 )
1008 created = True
1009 self.logger.debug(
1010 "task={} {} update-sdn-net={} created={}".format(
1011 task_id, ro_task["target_id"], sdn_net_id, created
1012 )
1013 )
1014
1015 connected_ports = new_connected_ports
1016 elif sdn_net_id:
1017 wim_status_dict = target_vim.get_connectivity_service_status(
1018 sdn_net_id, conn_info=created_items
1019 )
1020 sdn_status = wim_status_dict["sdn_status"]
1021
1022 if wim_status_dict.get("sdn_info"):
1023 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1024
1025 if wim_status_dict.get("error_msg"):
1026 sdn_info = wim_status_dict.get("error_msg") or ""
1027
1028 if pending_ports:
1029 if sdn_status != "ERROR":
1030 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1031 len(ports) - pending_ports, len(ports)
1032 )
1033
1034 if sdn_status == "ACTIVE":
1035 sdn_status = "BUILD"
1036
1037 ro_vim_item_update = {
1038 "vim_id": sdn_net_id,
1039 "vim_status": sdn_status,
1040 "created": created,
1041 "created_items": created_items,
1042 "connected_ports": connected_ports,
1043 "vim_details": sdn_info,
1044 "last_update": last_update,
1045 }
1046
1047 return sdn_status, ro_vim_item_update
1048 except Exception as e:
1049 self.logger.error(
1050 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1051 exc_info=not isinstance(
1052 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1053 ),
1054 )
1055 ro_vim_item_update = {
1056 "vim_status": "VIM_ERROR",
1057 "created": created,
1058 "vim_details": str(e),
1059 }
1060
1061 return "FAILED", ro_vim_item_update
1062
1063 def delete(self, ro_task, task_index):
1064 task = ro_task["tasks"][task_index]
1065 task_id = task["task_id"]
1066 sdn_vim_id = ro_task["vim_info"].get("vim_id")
1067 ro_vim_item_update_ok = {
1068 "vim_status": "DELETED",
1069 "created": False,
1070 "vim_details": "DELETED",
1071 "vim_id": None,
1072 }
1073
1074 try:
1075 if sdn_vim_id:
1076 target_vim = self.my_vims[ro_task["target_id"]]
1077 target_vim.delete_connectivity_service(
1078 sdn_vim_id, ro_task["vim_info"].get("created_items")
1079 )
1080
1081 except Exception as e:
1082 if (
1083 isinstance(e, sdnconn.SdnConnectorError)
1084 and e.http_code == HTTPStatus.NOT_FOUND.value
1085 ):
1086 ro_vim_item_update_ok["vim_details"] = "already deleted"
1087 else:
1088 self.logger.error(
1089 "ro_task={} vim={} del-sdn-net={}: {}".format(
1090 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1091 ),
1092 exc_info=not isinstance(
1093 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1094 ),
1095 )
1096 ro_vim_item_update = {
1097 "vim_status": "VIM_ERROR",
1098 "vim_details": "Error while deleting: {}".format(e),
1099 }
1100
1101 return "FAILED", ro_vim_item_update
1102
1103 self.logger.debug(
1104 "task={} {} del-sdn-net={} {}".format(
1105 task_id,
1106 ro_task["target_id"],
1107 sdn_vim_id,
1108 ro_vim_item_update_ok.get("vim_details", ""),
1109 )
1110 )
1111
1112 return "DONE", ro_vim_item_update_ok
1113
1114
1115 class NsWorker(threading.Thread):
1116 REFRESH_BUILD = 5 # 5 seconds
1117 REFRESH_ACTIVE = 60 # 1 minute
1118 REFRESH_ERROR = 600
1119 REFRESH_IMAGE = 3600 * 10
1120 REFRESH_DELETE = 3600 * 10
1121 QUEUE_SIZE = 100
1122 terminate = False
1123
1124 def __init__(self, worker_index, config, plugins, db):
1125 """
1126
1127 :param worker_index: thread index
1128 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1129 :param plugins: global shared dict with the loaded plugins
1130 :param db: database class instance to use
1131 """
1132 threading.Thread.__init__(self)
1133 self.config = config
1134 self.plugins = plugins
1135 self.plugin_name = "unknown"
1136 self.logger = logging.getLogger("ro.worker{}".format(worker_index))
1137 self.worker_index = worker_index
1138 self.task_queue = queue.Queue(self.QUEUE_SIZE)
1139 # targetvim: vimplugin class
1140 self.my_vims = {}
1141 # targetvim: vim information from database
1142 self.db_vims = {}
1143 # targetvim list
1144 self.vim_targets = []
1145 self.my_id = config["process_id"] + ":" + str(worker_index)
1146 self.db = db
1147 self.item2class = {
1148 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
1149 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
1150 "image": VimInteractionImage(
1151 self.db, self.my_vims, self.db_vims, self.logger
1152 ),
1153 "flavor": VimInteractionFlavor(
1154 self.db, self.my_vims, self.db_vims, self.logger
1155 ),
1156 "sdn_net": VimInteractionSdnNet(
1157 self.db, self.my_vims, self.db_vims, self.logger
1158 ),
1159 }
1160 self.time_last_task_processed = None
1161 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1162 self.tasks_to_delete = []
1163 # it is idle when there are not vim_targets associated
1164 self.idle = True
1165 self.task_locked_time = config["global"]["task_locked_time"]
1166
1167 def insert_task(self, task):
1168 try:
1169 self.task_queue.put(task, False)
1170 return None
1171 except queue.Full:
1172 raise NsWorkerException("timeout inserting a task")
1173
1174 def terminate(self):
1175 self.insert_task("exit")
1176
1177 def del_task(self, task):
1178 with self.task_lock:
1179 if task["status"] == "SCHEDULED":
1180 task["status"] = "SUPERSEDED"
1181 return True
1182 else: # task["status"] == "processing"
1183 self.task_lock.release()
1184 return False
1185
1186 def _process_vim_config(self, target_id, db_vim):
1187 """
1188 Process vim config, creating vim configuration files as ca_cert
1189 :param target_id: vim/sdn/wim + id
1190 :param db_vim: Vim dictionary obtained from database
1191 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1192 """
1193 if not db_vim.get("config"):
1194 return
1195
1196 file_name = ""
1197
1198 try:
1199 if db_vim["config"].get("ca_cert_content"):
1200 file_name = "{}:{}".format(target_id, self.worker_index)
1201
1202 try:
1203 mkdir(file_name)
1204 except FileExistsError:
1205 pass
1206
1207 file_name = file_name + "/ca_cert"
1208
1209 with open(file_name, "w") as f:
1210 f.write(db_vim["config"]["ca_cert_content"])
1211 del db_vim["config"]["ca_cert_content"]
1212 db_vim["config"]["ca_cert"] = file_name
1213 except Exception as e:
1214 raise NsWorkerException(
1215 "Error writing to file '{}': {}".format(file_name, e)
1216 )
1217
1218 def _load_plugin(self, name, type="vim"):
1219 # type can be vim or sdn
1220 if "rovim_dummy" not in self.plugins:
1221 self.plugins["rovim_dummy"] = VimDummyConnector
1222
1223 if "rosdn_dummy" not in self.plugins:
1224 self.plugins["rosdn_dummy"] = SdnDummyConnector
1225
1226 if name in self.plugins:
1227 return self.plugins[name]
1228
1229 try:
1230 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1231 self.plugins[name] = ep.load()
1232 except Exception as e:
1233 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1234
1235 if name and name not in self.plugins:
1236 raise NsWorkerException(
1237 "Plugin 'osm_{n}' has not been installed".format(n=name)
1238 )
1239
1240 return self.plugins[name]
1241
1242 def _unload_vim(self, target_id):
1243 """
1244 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1245 :param target_id: Contains type:_id; where type can be 'vim', ...
1246 :return: None.
1247 """
1248 try:
1249 self.db_vims.pop(target_id, None)
1250 self.my_vims.pop(target_id, None)
1251
1252 if target_id in self.vim_targets:
1253 self.vim_targets.remove(target_id)
1254
1255 self.logger.info("Unloaded {}".format(target_id))
1256 rmtree("{}:{}".format(target_id, self.worker_index))
1257 except FileNotFoundError:
1258 pass # this is raised by rmtree if folder does not exist
1259 except Exception as e:
1260 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1261
1262 def _check_vim(self, target_id):
1263 """
1264 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1265 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1266 :return: None.
1267 """
1268 target, _, _id = target_id.partition(":")
1269 now = time.time()
1270 update_dict = {}
1271 unset_dict = {}
1272 op_text = ""
1273 step = ""
1274 loaded = target_id in self.vim_targets
1275 target_database = (
1276 "vim_accounts"
1277 if target == "vim"
1278 else "wim_accounts"
1279 if target == "wim"
1280 else "sdns"
1281 )
1282
1283 try:
1284 step = "Getting {} from db".format(target_id)
1285 db_vim = self.db.get_one(target_database, {"_id": _id})
1286
1287 for op_index, operation in enumerate(
1288 db_vim["_admin"].get("operations", ())
1289 ):
1290 if operation["operationState"] != "PROCESSING":
1291 continue
1292
1293 locked_at = operation.get("locked_at")
1294
1295 if locked_at is not None and locked_at >= now - self.task_locked_time:
1296 # some other thread is doing this operation
1297 return
1298
1299 # lock
1300 op_text = "_admin.operations.{}.".format(op_index)
1301
1302 if not self.db.set_one(
1303 target_database,
1304 q_filter={
1305 "_id": _id,
1306 op_text + "operationState": "PROCESSING",
1307 op_text + "locked_at": locked_at,
1308 },
1309 update_dict={
1310 op_text + "locked_at": now,
1311 "admin.current_operation": op_index,
1312 },
1313 fail_on_empty=False,
1314 ):
1315 return
1316
1317 unset_dict[op_text + "locked_at"] = None
1318 unset_dict["current_operation"] = None
1319 step = "Loading " + target_id
1320 error_text = self._load_vim(target_id)
1321
1322 if not error_text:
1323 step = "Checking connectivity"
1324
1325 if target == "vim":
1326 self.my_vims[target_id].check_vim_connectivity()
1327 else:
1328 self.my_vims[target_id].check_credentials()
1329
1330 update_dict["_admin.operationalState"] = "ENABLED"
1331 update_dict["_admin.detailed-status"] = ""
1332 unset_dict[op_text + "detailed-status"] = None
1333 update_dict[op_text + "operationState"] = "COMPLETED"
1334
1335 return
1336
1337 except Exception as e:
1338 error_text = "{}: {}".format(step, e)
1339 self.logger.error("{} for {}: {}".format(step, target_id, e))
1340
1341 finally:
1342 if update_dict or unset_dict:
1343 if error_text:
1344 update_dict[op_text + "operationState"] = "FAILED"
1345 update_dict[op_text + "detailed-status"] = error_text
1346 unset_dict.pop(op_text + "detailed-status", None)
1347 update_dict["_admin.operationalState"] = "ERROR"
1348 update_dict["_admin.detailed-status"] = error_text
1349
1350 if op_text:
1351 update_dict[op_text + "statusEnteredTime"] = now
1352
1353 self.db.set_one(
1354 target_database,
1355 q_filter={"_id": _id},
1356 update_dict=update_dict,
1357 unset=unset_dict,
1358 fail_on_empty=False,
1359 )
1360
1361 if not loaded:
1362 self._unload_vim(target_id)
1363
1364 def _reload_vim(self, target_id):
1365 if target_id in self.vim_targets:
1366 self._load_vim(target_id)
1367 else:
1368 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1369 # just remove it to force load again next time it is needed
1370 self.db_vims.pop(target_id, None)
1371
1372 def _load_vim(self, target_id):
1373 """
1374 Load or reload a vim_account, sdn_controller or wim_account.
1375 Read content from database, load the plugin if not loaded.
1376 In case of error loading the plugin, it load a failing VIM_connector
1377 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1378 :param target_id: Contains type:_id; where type can be 'vim', ...
1379 :return: None if ok, descriptive text if error
1380 """
1381 target, _, _id = target_id.partition(":")
1382 target_database = (
1383 "vim_accounts"
1384 if target == "vim"
1385 else "wim_accounts"
1386 if target == "wim"
1387 else "sdns"
1388 )
1389 plugin_name = ""
1390 vim = None
1391
1392 try:
1393 step = "Getting {}={} from db".format(target, _id)
1394 # TODO process for wim, sdnc, ...
1395 vim = self.db.get_one(target_database, {"_id": _id})
1396
1397 # if deep_get(vim, "config", "sdn-controller"):
1398 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1399 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1400
1401 step = "Decrypting password"
1402 schema_version = vim.get("schema_version")
1403 self.db.encrypt_decrypt_fields(
1404 vim,
1405 "decrypt",
1406 fields=("password", "secret"),
1407 schema_version=schema_version,
1408 salt=_id,
1409 )
1410 self._process_vim_config(target_id, vim)
1411
1412 if target == "vim":
1413 plugin_name = "rovim_" + vim["vim_type"]
1414 step = "Loading plugin '{}'".format(plugin_name)
1415 vim_module_conn = self._load_plugin(plugin_name)
1416 step = "Loading {}'".format(target_id)
1417 self.my_vims[target_id] = vim_module_conn(
1418 uuid=vim["_id"],
1419 name=vim["name"],
1420 tenant_id=vim.get("vim_tenant_id"),
1421 tenant_name=vim.get("vim_tenant_name"),
1422 url=vim["vim_url"],
1423 url_admin=None,
1424 user=vim["vim_user"],
1425 passwd=vim["vim_password"],
1426 config=vim.get("config") or {},
1427 persistent_info={},
1428 )
1429 else: # sdn
1430 plugin_name = "rosdn_" + vim["type"]
1431 step = "Loading plugin '{}'".format(plugin_name)
1432 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1433 step = "Loading {}'".format(target_id)
1434 wim = deepcopy(vim)
1435 wim_config = wim.pop("config", {}) or {}
1436 wim["uuid"] = wim["_id"]
1437 wim["wim_url"] = wim["url"]
1438
1439 if wim.get("dpid"):
1440 wim_config["dpid"] = wim.pop("dpid")
1441
1442 if wim.get("switch_id"):
1443 wim_config["switch_id"] = wim.pop("switch_id")
1444
1445 # wim, wim_account, config
1446 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1447 self.db_vims[target_id] = vim
1448 self.error_status = None
1449
1450 self.logger.info(
1451 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1452 )
1453 except Exception as e:
1454 self.logger.error(
1455 "Cannot load {} plugin={}: {} {}".format(
1456 target_id, plugin_name, step, e
1457 )
1458 )
1459
1460 self.db_vims[target_id] = vim or {}
1461 self.db_vims[target_id] = FailingConnector(str(e))
1462 error_status = "{} Error: {}".format(step, e)
1463
1464 return error_status
1465 finally:
1466 if target_id not in self.vim_targets:
1467 self.vim_targets.append(target_id)
1468
1469 def _get_db_task(self):
1470 """
1471 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1472 :return: None
1473 """
1474 now = time.time()
1475
1476 if not self.time_last_task_processed:
1477 self.time_last_task_processed = now
1478
1479 try:
1480 while True:
1481 locked = self.db.set_one(
1482 "ro_tasks",
1483 q_filter={
1484 "target_id": self.vim_targets,
1485 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1486 "locked_at.lt": now - self.task_locked_time,
1487 "to_check_at.lt": self.time_last_task_processed,
1488 },
1489 update_dict={"locked_by": self.my_id, "locked_at": now},
1490 fail_on_empty=False,
1491 )
1492
1493 if locked:
1494 # read and return
1495 ro_task = self.db.get_one(
1496 "ro_tasks",
1497 q_filter={
1498 "target_id": self.vim_targets,
1499 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1500 "locked_at": now,
1501 },
1502 )
1503 return ro_task
1504
1505 if self.time_last_task_processed == now:
1506 self.time_last_task_processed = None
1507 return None
1508 else:
1509 self.time_last_task_processed = now
1510 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1511
1512 except DbException as e:
1513 self.logger.error("Database exception at _get_db_task: {}".format(e))
1514 except Exception as e:
1515 self.logger.critical(
1516 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1517 )
1518
1519 return None
1520
1521 def _delete_task(self, ro_task, task_index, task_depends, db_update):
1522 """
1523 Determine if this task need to be done or superseded
1524 :return: None
1525 """
1526 my_task = ro_task["tasks"][task_index]
1527 task_id = my_task["task_id"]
1528 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
1529 "created_items", False
1530 )
1531
1532 if my_task["status"] == "FAILED":
1533 return None, None # TODO need to be retry??
1534
1535 try:
1536 for index, task in enumerate(ro_task["tasks"]):
1537 if index == task_index or not task:
1538 continue # own task
1539
1540 if (
1541 my_task["target_record"] == task["target_record"]
1542 and task["action"] == "CREATE"
1543 ):
1544 # set to finished
1545 db_update["tasks.{}.status".format(index)] = task[
1546 "status"
1547 ] = "FINISHED"
1548 elif task["action"] == "CREATE" and task["status"] not in (
1549 "FINISHED",
1550 "SUPERSEDED",
1551 ):
1552 needed_delete = False
1553
1554 if needed_delete:
1555 return self.item2class[my_task["item"]].delete(ro_task, task_index)
1556 else:
1557 return "SUPERSEDED", None
1558 except Exception as e:
1559 if not isinstance(e, NsWorkerException):
1560 self.logger.critical(
1561 "Unexpected exception at _delete_task task={}: {}".format(
1562 task_id, e
1563 ),
1564 exc_info=True,
1565 )
1566
1567 return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1568
1569 def _create_task(self, ro_task, task_index, task_depends, db_update):
1570 """
1571 Determine if this task need to create something at VIM
1572 :return: None
1573 """
1574 my_task = ro_task["tasks"][task_index]
1575 task_id = my_task["task_id"]
1576 task_status = None
1577
1578 if my_task["status"] == "FAILED":
1579 return None, None # TODO need to be retry??
1580 elif my_task["status"] == "SCHEDULED":
1581 # check if already created by another task
1582 for index, task in enumerate(ro_task["tasks"]):
1583 if index == task_index or not task:
1584 continue # own task
1585
1586 if task["action"] == "CREATE" and task["status"] not in (
1587 "SCHEDULED",
1588 "FINISHED",
1589 "SUPERSEDED",
1590 ):
1591 return task["status"], "COPY_VIM_INFO"
1592
1593 try:
1594 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
1595 ro_task, task_index, task_depends
1596 )
1597 # TODO update other CREATE tasks
1598 except Exception as e:
1599 if not isinstance(e, NsWorkerException):
1600 self.logger.error(
1601 "Error executing task={}: {}".format(task_id, e), exc_info=True
1602 )
1603
1604 task_status = "FAILED"
1605 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1606 # TODO update ro_vim_item_update
1607
1608 return task_status, ro_vim_item_update
1609 else:
1610 return None, None
1611
1612 def _get_dependency(self, task_id, ro_task=None, target_id=None):
1613 """
1614 Look for dependency task
1615 :param task_id: Can be one of
1616 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1617 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1618 3. task.task_id: "<action_id>:number"
1619 :param ro_task:
1620 :param target_id:
1621 :return: database ro_task plus index of task
1622 """
1623 if (
1624 task_id.startswith("vim:")
1625 or task_id.startswith("sdn:")
1626 or task_id.startswith("wim:")
1627 ):
1628 target_id, _, task_id = task_id.partition(" ")
1629
1630 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
1631 ro_task_dependency = self.db.get_one(
1632 "ro_tasks",
1633 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
1634 fail_on_empty=False,
1635 )
1636
1637 if ro_task_dependency:
1638 for task_index, task in enumerate(ro_task_dependency["tasks"]):
1639 if task["target_record_id"] == task_id:
1640 return ro_task_dependency, task_index
1641
1642 else:
1643 if ro_task:
1644 for task_index, task in enumerate(ro_task["tasks"]):
1645 if task and task["task_id"] == task_id:
1646 return ro_task, task_index
1647
1648 ro_task_dependency = self.db.get_one(
1649 "ro_tasks",
1650 q_filter={
1651 "tasks.ANYINDEX.task_id": task_id,
1652 "tasks.ANYINDEX.target_record.ne": None,
1653 },
1654 fail_on_empty=False,
1655 )
1656
1657 if ro_task_dependency:
1658 for task_index, task in ro_task_dependency["tasks"]:
1659 if task["task_id"] == task_id:
1660 return ro_task_dependency, task_index
1661 raise NsWorkerException("Cannot get depending task {}".format(task_id))
1662
1663 def _process_pending_tasks(self, ro_task):
1664 ro_task_id = ro_task["_id"]
1665 now = time.time()
1666 # one day
1667 next_check_at = now + (24 * 60 * 60)
1668 db_ro_task_update = {}
1669
1670 def _update_refresh(new_status):
1671 # compute next_refresh
1672 nonlocal task
1673 nonlocal next_check_at
1674 nonlocal db_ro_task_update
1675 nonlocal ro_task
1676
1677 next_refresh = time.time()
1678
1679 if task["item"] in ("image", "flavor"):
1680 next_refresh += self.REFRESH_IMAGE
1681 elif new_status == "BUILD":
1682 next_refresh += self.REFRESH_BUILD
1683 elif new_status == "DONE":
1684 next_refresh += self.REFRESH_ACTIVE
1685 else:
1686 next_refresh += self.REFRESH_ERROR
1687
1688 next_check_at = min(next_check_at, next_refresh)
1689 db_ro_task_update["vim_info.refresh_at"] = next_refresh
1690 ro_task["vim_info"]["refresh_at"] = next_refresh
1691
1692 try:
1693 # 0: get task_status_create
1694 lock_object = None
1695 task_status_create = None
1696 task_create = next(
1697 (
1698 t
1699 for t in ro_task["tasks"]
1700 if t
1701 and t["action"] == "CREATE"
1702 and t["status"] in ("BUILD", "DONE")
1703 ),
1704 None,
1705 )
1706
1707 if task_create:
1708 task_status_create = task_create["status"]
1709
1710 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
1711 for task_action in ("DELETE", "CREATE", "EXEC"):
1712 db_vim_update = None
1713 new_status = None
1714
1715 for task_index, task in enumerate(ro_task["tasks"]):
1716 if not task:
1717 continue # task deleted
1718
1719 task_depends = {}
1720 target_update = None
1721
1722 if (
1723 (
1724 task_action in ("DELETE", "EXEC")
1725 and task["status"] not in ("SCHEDULED", "BUILD")
1726 )
1727 or task["action"] != task_action
1728 or (
1729 task_action == "CREATE"
1730 and task["status"] in ("FINISHED", "SUPERSEDED")
1731 )
1732 ):
1733 continue
1734
1735 task_path = "tasks.{}.status".format(task_index)
1736 try:
1737 db_vim_info_update = None
1738
1739 if task["status"] == "SCHEDULED":
1740 # check if tasks that this depends on have been completed
1741 dependency_not_completed = False
1742
1743 for dependency_task_id in task.get("depends_on") or ():
1744 (
1745 dependency_ro_task,
1746 dependency_task_index,
1747 ) = self._get_dependency(
1748 dependency_task_id, target_id=ro_task["target_id"]
1749 )
1750 dependency_task = dependency_ro_task["tasks"][
1751 dependency_task_index
1752 ]
1753
1754 if dependency_task["status"] == "SCHEDULED":
1755 dependency_not_completed = True
1756 next_check_at = min(
1757 next_check_at, dependency_ro_task["to_check_at"]
1758 )
1759 # must allow dependent task to be processed first
1760 # to do this set time after last_task_processed
1761 next_check_at = max(
1762 self.time_last_task_processed, next_check_at
1763 )
1764 break
1765 elif dependency_task["status"] == "FAILED":
1766 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
1767 task["action"],
1768 task["item"],
1769 dependency_task["action"],
1770 dependency_task["item"],
1771 dependency_task_id,
1772 dependency_ro_task["vim_info"].get(
1773 "vim_details"
1774 ),
1775 )
1776 self.logger.error(
1777 "task={} {}".format(task["task_id"], error_text)
1778 )
1779 raise NsWorkerException(error_text)
1780
1781 task_depends[dependency_task_id] = dependency_ro_task[
1782 "vim_info"
1783 ]["vim_id"]
1784 task_depends[
1785 "TASK-{}".format(dependency_task_id)
1786 ] = dependency_ro_task["vim_info"]["vim_id"]
1787
1788 if dependency_not_completed:
1789 # TODO set at vim_info.vim_details that it is waiting
1790 continue
1791
1792 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
1793 # the task of renew this locking. It will update database locket_at periodically
1794 if not lock_object:
1795 lock_object = LockRenew.add_lock_object(
1796 "ro_tasks", ro_task, self
1797 )
1798
1799 if task["action"] == "DELETE":
1800 (new_status, db_vim_info_update,) = self._delete_task(
1801 ro_task, task_index, task_depends, db_ro_task_update
1802 )
1803 new_status = (
1804 "FINISHED" if new_status == "DONE" else new_status
1805 )
1806 # ^with FINISHED instead of DONE it will not be refreshing
1807
1808 if new_status in ("FINISHED", "SUPERSEDED"):
1809 target_update = "DELETE"
1810 elif task["action"] == "EXEC":
1811 (
1812 new_status,
1813 db_vim_info_update,
1814 db_task_update,
1815 ) = self.item2class[task["item"]].exec(
1816 ro_task, task_index, task_depends
1817 )
1818 new_status = (
1819 "FINISHED" if new_status == "DONE" else new_status
1820 )
1821 # ^with FINISHED instead of DONE it will not be refreshing
1822
1823 if db_task_update:
1824 # load into database the modified db_task_update "retries" and "next_retry"
1825 if db_task_update.get("retries"):
1826 db_ro_task_update[
1827 "tasks.{}.retries".format(task_index)
1828 ] = db_task_update["retries"]
1829
1830 next_check_at = time.time() + db_task_update.get(
1831 "next_retry", 60
1832 )
1833 target_update = None
1834 elif task["action"] == "CREATE":
1835 if task["status"] == "SCHEDULED":
1836 if task_status_create:
1837 new_status = task_status_create
1838 target_update = "COPY_VIM_INFO"
1839 else:
1840 new_status, db_vim_info_update = self.item2class[
1841 task["item"]
1842 ].new(ro_task, task_index, task_depends)
1843 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
1844 _update_refresh(new_status)
1845 else:
1846 if (
1847 ro_task["vim_info"]["refresh_at"]
1848 and now > ro_task["vim_info"]["refresh_at"]
1849 ):
1850 new_status, db_vim_info_update = self.item2class[
1851 task["item"]
1852 ].refresh(ro_task)
1853 _update_refresh(new_status)
1854
1855 except Exception as e:
1856 new_status = "FAILED"
1857 db_vim_info_update = {
1858 "vim_status": "VIM_ERROR",
1859 "vim_details": str(e),
1860 }
1861
1862 if not isinstance(
1863 e, (NsWorkerException, vimconn.VimConnException)
1864 ):
1865 self.logger.error(
1866 "Unexpected exception at _delete_task task={}: {}".format(
1867 task["task_id"], e
1868 ),
1869 exc_info=True,
1870 )
1871
1872 try:
1873 if db_vim_info_update:
1874 db_vim_update = db_vim_info_update.copy()
1875 db_ro_task_update.update(
1876 {
1877 "vim_info." + k: v
1878 for k, v in db_vim_info_update.items()
1879 }
1880 )
1881 ro_task["vim_info"].update(db_vim_info_update)
1882
1883 if new_status:
1884 if task_action == "CREATE":
1885 task_status_create = new_status
1886 db_ro_task_update[task_path] = new_status
1887
1888 if target_update or db_vim_update:
1889 if target_update == "DELETE":
1890 self._update_target(task, None)
1891 elif target_update == "COPY_VIM_INFO":
1892 self._update_target(task, ro_task["vim_info"])
1893 else:
1894 self._update_target(task, db_vim_update)
1895
1896 except Exception as e:
1897 if (
1898 isinstance(e, DbException)
1899 and e.http_code == HTTPStatus.NOT_FOUND
1900 ):
1901 # if the vnfrs or nsrs has been removed from database, this task must be removed
1902 self.logger.debug(
1903 "marking to delete task={}".format(task["task_id"])
1904 )
1905 self.tasks_to_delete.append(task)
1906 else:
1907 self.logger.error(
1908 "Unexpected exception at _update_target task={}: {}".format(
1909 task["task_id"], e
1910 ),
1911 exc_info=True,
1912 )
1913
1914 locked_at = ro_task["locked_at"]
1915
1916 if lock_object:
1917 locked_at = [
1918 lock_object["locked_at"],
1919 lock_object["locked_at"] + self.task_locked_time,
1920 ]
1921 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
1922 # contain exactly locked_at + self.task_locked_time
1923 LockRenew.remove_lock_object(lock_object)
1924
1925 q_filter = {
1926 "_id": ro_task["_id"],
1927 "to_check_at": ro_task["to_check_at"],
1928 "locked_at": locked_at,
1929 }
1930 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
1931 # outside this task (by ro_nbi) do not update it
1932 db_ro_task_update["locked_by"] = None
1933 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
1934 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
1935 db_ro_task_update["modified_at"] = now
1936 db_ro_task_update["to_check_at"] = next_check_at
1937
1938 if not self.db.set_one(
1939 "ro_tasks",
1940 update_dict=db_ro_task_update,
1941 q_filter=q_filter,
1942 fail_on_empty=False,
1943 ):
1944 del db_ro_task_update["to_check_at"]
1945 del q_filter["to_check_at"]
1946 self.db.set_one(
1947 "ro_tasks",
1948 q_filter=q_filter,
1949 update_dict=db_ro_task_update,
1950 fail_on_empty=True,
1951 )
1952 except DbException as e:
1953 self.logger.error(
1954 "ro_task={} Error updating database {}".format(ro_task_id, e)
1955 )
1956 except Exception as e:
1957 self.logger.error(
1958 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
1959 )
1960
1961 def _update_target(self, task, ro_vim_item_update):
1962 table, _, temp = task["target_record"].partition(":")
1963 _id, _, path_vim_status = temp.partition(":")
1964 path_item = path_vim_status[: path_vim_status.rfind(".")]
1965 path_item = path_item[: path_item.rfind(".")]
1966 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
1967 # path_item: dot separated list targeting record information, e.g. "vdur.10"
1968
1969 if ro_vim_item_update:
1970 update_dict = {
1971 path_vim_status + "." + k: v
1972 for k, v in ro_vim_item_update.items()
1973 if k
1974 in ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces")
1975 }
1976
1977 if path_vim_status.startswith("vdur."):
1978 # for backward compatibility, add vdur.name apart from vdur.vim_name
1979 if ro_vim_item_update.get("vim_name"):
1980 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
1981
1982 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
1983 if ro_vim_item_update.get("vim_id"):
1984 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
1985
1986 # update general status
1987 if ro_vim_item_update.get("vim_status"):
1988 update_dict[path_item + ".status"] = ro_vim_item_update[
1989 "vim_status"
1990 ]
1991
1992 if ro_vim_item_update.get("interfaces"):
1993 path_interfaces = path_item + ".interfaces"
1994
1995 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
1996 if iface:
1997 update_dict.update(
1998 {
1999 path_interfaces + ".{}.".format(i) + k: v
2000 for k, v in iface.items()
2001 if k in ("vlan", "compute_node", "pci")
2002 }
2003 )
2004
2005 # put ip_address and mac_address with ip-address and mac-address
2006 if iface.get("ip_address"):
2007 update_dict[
2008 path_interfaces + ".{}.".format(i) + "ip-address"
2009 ] = iface["ip_address"]
2010
2011 if iface.get("mac_address"):
2012 update_dict[
2013 path_interfaces + ".{}.".format(i) + "mac-address"
2014 ] = iface["mac_address"]
2015
2016 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2017 update_dict["ip-address"] = iface.get("ip_address").split(
2018 ";"
2019 )[0]
2020
2021 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2022 update_dict[path_item + ".ip-address"] = iface.get(
2023 "ip_address"
2024 ).split(";")[0]
2025
2026 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2027 else:
2028 update_dict = {path_item + ".status": "DELETED"}
2029 self.db.set_one(
2030 table,
2031 q_filter={"_id": _id},
2032 update_dict=update_dict,
2033 unset={path_vim_status: None},
2034 )
2035
2036 def _process_delete_db_tasks(self):
2037 """
2038 Delete task from database because vnfrs or nsrs or both have been deleted
2039 :return: None. Uses and modify self.tasks_to_delete
2040 """
2041 while self.tasks_to_delete:
2042 task = self.tasks_to_delete[0]
2043 vnfrs_deleted = None
2044 nsr_id = task["nsr_id"]
2045
2046 if task["target_record"].startswith("vnfrs:"):
2047 # check if nsrs is present
2048 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2049 vnfrs_deleted = task["target_record"].split(":")[1]
2050
2051 try:
2052 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2053 except Exception as e:
2054 self.logger.error(
2055 "Error deleting task={}: {}".format(task["task_id"], e)
2056 )
2057 self.tasks_to_delete.pop(0)
2058
2059 @staticmethod
2060 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2061 """
2062 Static method because it is called from osm_ng_ro.ns
2063 :param db: instance of database to use
2064 :param nsr_id: affected nsrs id
2065 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2066 :return: None, exception is fails
2067 """
2068 retries = 5
2069 for retry in range(retries):
2070 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2071 now = time.time()
2072 conflict = False
2073
2074 for ro_task in ro_tasks:
2075 db_update = {}
2076 to_delete_ro_task = True
2077
2078 for index, task in enumerate(ro_task["tasks"]):
2079 if not task:
2080 pass
2081 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2082 vnfrs_deleted
2083 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2084 ):
2085 db_update["tasks.{}".format(index)] = None
2086 else:
2087 # used by other nsr, ro_task cannot be deleted
2088 to_delete_ro_task = False
2089
2090 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2091 if to_delete_ro_task:
2092 if not db.del_one(
2093 "ro_tasks",
2094 q_filter={
2095 "_id": ro_task["_id"],
2096 "modified_at": ro_task["modified_at"],
2097 },
2098 fail_on_empty=False,
2099 ):
2100 conflict = True
2101 elif db_update:
2102 db_update["modified_at"] = now
2103 if not db.set_one(
2104 "ro_tasks",
2105 q_filter={
2106 "_id": ro_task["_id"],
2107 "modified_at": ro_task["modified_at"],
2108 },
2109 update_dict=db_update,
2110 fail_on_empty=False,
2111 ):
2112 conflict = True
2113 if not conflict:
2114 return
2115 else:
2116 raise NsWorkerException("Exceeded {} retries".format(retries))
2117
2118 def run(self):
2119 # load database
2120 self.logger.info("Starting")
2121 while True:
2122 # step 1: get commands from queue
2123 try:
2124 if self.vim_targets:
2125 task = self.task_queue.get(block=False)
2126 else:
2127 if not self.idle:
2128 self.logger.debug("enters in idle state")
2129 self.idle = True
2130 task = self.task_queue.get(block=True)
2131 self.idle = False
2132
2133 if task[0] == "terminate":
2134 break
2135 elif task[0] == "load_vim":
2136 self.logger.info("order to load vim {}".format(task[1]))
2137 self._load_vim(task[1])
2138 elif task[0] == "unload_vim":
2139 self.logger.info("order to unload vim {}".format(task[1]))
2140 self._unload_vim(task[1])
2141 elif task[0] == "reload_vim":
2142 self._reload_vim(task[1])
2143 elif task[0] == "check_vim":
2144 self.logger.info("order to check vim {}".format(task[1]))
2145 self._check_vim(task[1])
2146 continue
2147 except Exception as e:
2148 if isinstance(e, queue.Empty):
2149 pass
2150 else:
2151 self.logger.critical(
2152 "Error processing task: {}".format(e), exc_info=True
2153 )
2154
2155 # step 2: process pending_tasks, delete not needed tasks
2156 try:
2157 if self.tasks_to_delete:
2158 self._process_delete_db_tasks()
2159 busy = False
2160 ro_task = self._get_db_task()
2161 if ro_task:
2162 self._process_pending_tasks(ro_task)
2163 busy = True
2164 if not busy:
2165 time.sleep(5)
2166 except Exception as e:
2167 self.logger.critical(
2168 "Unexpected exception at run: " + str(e), exc_info=True
2169 )
2170
2171 self.logger.info("Finishing")