Reformatting RO
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 import logging
28 import queue
29 import threading
30 import time
31 import yaml
32 from copy import deepcopy
33 from http import HTTPStatus
34 from os import mkdir
35 from pkg_resources import iter_entry_points
36 from shutil import rmtree
37 from unittest.mock import Mock
38
39 # from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version
40 from osm_common.dbbase import DbException
41 from osm_ro_plugin.vim_dummy import VimDummyConnector
42 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
43 from osm_ro_plugin import vimconn, sdnconn
44 from osm_ng_ro.vim_admin import LockRenew
45
46
47 __author__ = "Alfonso Tierno"
48 __date__ = "$28-Sep-2017 12:07:15$"
49
50
51 def deep_get(target_dict, *args, **kwargs):
52 """
53 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
54 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
55 :param target_dict: dictionary to be read
56 :param args: list of keys to read from target_dict
57 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
58 :return: The wanted value if exist, None or default otherwise
59 """
60 for key in args:
61 if not isinstance(target_dict, dict) or key not in target_dict:
62 return kwargs.get("default")
63 target_dict = target_dict[key]
64 return target_dict
65
66
67 class NsWorkerException(Exception):
68 pass
69
70
71 class FailingConnector:
72 def __init__(self, error_msg):
73 self.error_msg = error_msg
74
75 for method in dir(vimconn.VimConnector):
76 if method[0] != "_":
77 setattr(
78 self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
79 )
80
81 for method in dir(sdnconn.SdnConnectorBase):
82 if method[0] != "_":
83 setattr(
84 self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
85 )
86
87
88 class NsWorkerExceptionNotFound(NsWorkerException):
89 pass
90
91
92 class VimInteractionBase:
93 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
94 It implements methods that does nothing and return ok"""
95
96 def __init__(self, db, my_vims, db_vims, logger):
97 self.db = db
98 self.logger = logger
99 self.my_vims = my_vims
100 self.db_vims = db_vims
101
102 def new(self, ro_task, task_index, task_depends):
103 return "BUILD", {}
104
105 def refresh(self, ro_task):
106 """skip calling VIM to get image, flavor status. Assumes ok"""
107 if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
108 return "FAILED", {}
109
110 return "DONE", {}
111
112 def delete(self, ro_task, task_index):
113 """skip calling VIM to delete image. Assumes ok"""
114 return "DONE", {}
115
116 def exec(self, ro_task, task_index, task_depends):
117 return "DONE", None, None
118
119
120 class VimInteractionNet(VimInteractionBase):
121 def new(self, ro_task, task_index, task_depends):
122 vim_net_id = None
123 task = ro_task["tasks"][task_index]
124 task_id = task["task_id"]
125 created = False
126 created_items = {}
127 target_vim = self.my_vims[ro_task["target_id"]]
128
129 try:
130 # FIND
131 if task.get("find_params"):
132 # if management, get configuration of VIM
133 if task["find_params"].get("filter_dict"):
134 vim_filter = task["find_params"]["filter_dict"]
135 # mamagement network
136 elif task["find_params"].get("mgmt"):
137 if deep_get(
138 self.db_vims[ro_task["target_id"]],
139 "config",
140 "management_network_id",
141 ):
142 vim_filter = {
143 "id": self.db_vims[ro_task["target_id"]]["config"][
144 "management_network_id"
145 ]
146 }
147 elif deep_get(
148 self.db_vims[ro_task["target_id"]],
149 "config",
150 "management_network_name",
151 ):
152 vim_filter = {
153 "name": self.db_vims[ro_task["target_id"]]["config"][
154 "management_network_name"
155 ]
156 }
157 else:
158 vim_filter = {"name": task["find_params"]["name"]}
159 else:
160 raise NsWorkerExceptionNotFound(
161 "Invalid find_params for new_net {}".format(task["find_params"])
162 )
163
164 vim_nets = target_vim.get_network_list(vim_filter)
165 if not vim_nets and not task.get("params"):
166 raise NsWorkerExceptionNotFound(
167 "Network not found with this criteria: '{}'".format(
168 task.get("find_params")
169 )
170 )
171 elif len(vim_nets) > 1:
172 raise NsWorkerException(
173 "More than one network found with this criteria: '{}'".format(
174 task["find_params"]
175 )
176 )
177
178 if vim_nets:
179 vim_net_id = vim_nets[0]["id"]
180 else:
181 # CREATE
182 params = task["params"]
183 vim_net_id, created_items = target_vim.new_network(**params)
184 created = True
185
186 ro_vim_item_update = {
187 "vim_id": vim_net_id,
188 "vim_status": "BUILD",
189 "created": created,
190 "created_items": created_items,
191 "vim_details": None,
192 }
193 self.logger.debug(
194 "task={} {} new-net={} created={}".format(
195 task_id, ro_task["target_id"], vim_net_id, created
196 )
197 )
198
199 return "BUILD", ro_vim_item_update
200 except (vimconn.VimConnException, NsWorkerException) as e:
201 self.logger.error(
202 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
203 )
204 ro_vim_item_update = {
205 "vim_status": "VIM_ERROR",
206 "created": created,
207 "vim_details": str(e),
208 }
209
210 return "FAILED", ro_vim_item_update
211
212 def refresh(self, ro_task):
213 """Call VIM to get network status"""
214 ro_task_id = ro_task["_id"]
215 target_vim = self.my_vims[ro_task["target_id"]]
216 vim_id = ro_task["vim_info"]["vim_id"]
217 net_to_refresh_list = [vim_id]
218
219 try:
220 vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
221 vim_info = vim_dict[vim_id]
222
223 if vim_info["status"] == "ACTIVE":
224 task_status = "DONE"
225 elif vim_info["status"] == "BUILD":
226 task_status = "BUILD"
227 else:
228 task_status = "FAILED"
229 except vimconn.VimConnException as e:
230 # Mark all tasks at VIM_ERROR status
231 self.logger.error(
232 "ro_task={} vim={} get-net={}: {}".format(
233 ro_task_id, ro_task["target_id"], vim_id, e
234 )
235 )
236 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
237 task_status = "FAILED"
238
239 ro_vim_item_update = {}
240 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
241 ro_vim_item_update["vim_status"] = vim_info["status"]
242
243 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
244 ro_vim_item_update["vim_name"] = vim_info.get("name")
245
246 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
247 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
248 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
249 elif vim_info["status"] == "DELETED":
250 ro_vim_item_update["vim_id"] = None
251 ro_vim_item_update["vim_details"] = "Deleted externally"
252 else:
253 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
254 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
255
256 if ro_vim_item_update:
257 self.logger.debug(
258 "ro_task={} {} get-net={}: status={} {}".format(
259 ro_task_id,
260 ro_task["target_id"],
261 vim_id,
262 ro_vim_item_update.get("vim_status"),
263 ro_vim_item_update.get("vim_details")
264 if ro_vim_item_update.get("vim_status") != "ACTIVE"
265 else "",
266 )
267 )
268
269 return task_status, ro_vim_item_update
270
271 def delete(self, ro_task, task_index):
272 task = ro_task["tasks"][task_index]
273 task_id = task["task_id"]
274 net_vim_id = ro_task["vim_info"]["vim_id"]
275 ro_vim_item_update_ok = {
276 "vim_status": "DELETED",
277 "created": False,
278 "vim_details": "DELETED",
279 "vim_id": None,
280 }
281
282 try:
283 if net_vim_id or ro_task["vim_info"]["created_items"]:
284 target_vim = self.my_vims[ro_task["target_id"]]
285 target_vim.delete_network(
286 net_vim_id, ro_task["vim_info"]["created_items"]
287 )
288 except vimconn.VimConnNotFoundException:
289 ro_vim_item_update_ok["vim_details"] = "already deleted"
290 except vimconn.VimConnException as e:
291 self.logger.error(
292 "ro_task={} vim={} del-net={}: {}".format(
293 ro_task["_id"], ro_task["target_id"], net_vim_id, e
294 )
295 )
296 ro_vim_item_update = {
297 "vim_status": "VIM_ERROR",
298 "vim_details": "Error while deleting: {}".format(e),
299 }
300
301 return "FAILED", ro_vim_item_update
302
303 self.logger.debug(
304 "task={} {} del-net={} {}".format(
305 task_id,
306 ro_task["target_id"],
307 net_vim_id,
308 ro_vim_item_update_ok.get("vim_details", ""),
309 )
310 )
311
312 return "DONE", ro_vim_item_update_ok
313
314
315 class VimInteractionVdu(VimInteractionBase):
316 max_retries_inject_ssh_key = 20 # 20 times
317 time_retries_inject_ssh_key = 30 # wevery 30 seconds
318
319 def new(self, ro_task, task_index, task_depends):
320 task = ro_task["tasks"][task_index]
321 task_id = task["task_id"]
322 created = False
323 created_items = {}
324 target_vim = self.my_vims[ro_task["target_id"]]
325
326 try:
327 created = True
328 params = task["params"]
329 params_copy = deepcopy(params)
330 net_list = params_copy["net_list"]
331
332 for net in net_list:
333 # change task_id into network_id
334 if "net_id" in net and net["net_id"].startswith("TASK-"):
335 network_id = task_depends[net["net_id"]]
336
337 if not network_id:
338 raise NsWorkerException(
339 "Cannot create VM because depends on a network not created or found "
340 "for {}".format(net["net_id"])
341 )
342
343 net["net_id"] = network_id
344
345 if params_copy["image_id"].startswith("TASK-"):
346 params_copy["image_id"] = task_depends[params_copy["image_id"]]
347
348 if params_copy["flavor_id"].startswith("TASK-"):
349 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
350
351 vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
352 interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
353
354 ro_vim_item_update = {
355 "vim_id": vim_vm_id,
356 "vim_status": "BUILD",
357 "created": created,
358 "created_items": created_items,
359 "vim_details": None,
360 "interfaces_vim_ids": interfaces,
361 "interfaces": [],
362 }
363 self.logger.debug(
364 "task={} {} new-vm={} created={}".format(
365 task_id, ro_task["target_id"], vim_vm_id, created
366 )
367 )
368
369 return "BUILD", ro_vim_item_update
370 except (vimconn.VimConnException, NsWorkerException) as e:
371 self.logger.error(
372 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
373 )
374 ro_vim_item_update = {
375 "vim_status": "VIM_ERROR",
376 "created": created,
377 "vim_details": str(e),
378 }
379
380 return "FAILED", ro_vim_item_update
381
382 def delete(self, ro_task, task_index):
383 task = ro_task["tasks"][task_index]
384 task_id = task["task_id"]
385 vm_vim_id = ro_task["vim_info"]["vim_id"]
386 ro_vim_item_update_ok = {
387 "vim_status": "DELETED",
388 "created": False,
389 "vim_details": "DELETED",
390 "vim_id": None,
391 }
392
393 try:
394 if vm_vim_id or ro_task["vim_info"]["created_items"]:
395 target_vim = self.my_vims[ro_task["target_id"]]
396 target_vim.delete_vminstance(
397 vm_vim_id, ro_task["vim_info"]["created_items"]
398 )
399 except vimconn.VimConnNotFoundException:
400 ro_vim_item_update_ok["vim_details"] = "already deleted"
401 except vimconn.VimConnException as e:
402 self.logger.error(
403 "ro_task={} vim={} del-vm={}: {}".format(
404 ro_task["_id"], ro_task["target_id"], vm_vim_id, e
405 )
406 )
407 ro_vim_item_update = {
408 "vim_status": "VIM_ERROR",
409 "vim_details": "Error while deleting: {}".format(e),
410 }
411
412 return "FAILED", ro_vim_item_update
413
414 self.logger.debug(
415 "task={} {} del-vm={} {}".format(
416 task_id,
417 ro_task["target_id"],
418 vm_vim_id,
419 ro_vim_item_update_ok.get("vim_details", ""),
420 )
421 )
422
423 return "DONE", ro_vim_item_update_ok
424
425 def refresh(self, ro_task):
426 """Call VIM to get vm status"""
427 ro_task_id = ro_task["_id"]
428 target_vim = self.my_vims[ro_task["target_id"]]
429 vim_id = ro_task["vim_info"]["vim_id"]
430
431 if not vim_id:
432 return None, None
433
434 vm_to_refresh_list = [vim_id]
435 try:
436 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
437 vim_info = vim_dict[vim_id]
438
439 if vim_info["status"] == "ACTIVE":
440 task_status = "DONE"
441 elif vim_info["status"] == "BUILD":
442 task_status = "BUILD"
443 else:
444 task_status = "FAILED"
445
446 # try to load and parse vim_information
447 try:
448 vim_info_info = yaml.safe_load(vim_info["vim_info"])
449 if vim_info_info.get("name"):
450 vim_info["name"] = vim_info_info["name"]
451 except Exception:
452 pass
453 except vimconn.VimConnException as e:
454 # Mark all tasks at VIM_ERROR status
455 self.logger.error(
456 "ro_task={} vim={} get-vm={}: {}".format(
457 ro_task_id, ro_task["target_id"], vim_id, e
458 )
459 )
460 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
461 task_status = "FAILED"
462
463 ro_vim_item_update = {}
464
465 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
466 vim_interfaces = []
467 if vim_info.get("interfaces"):
468 for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
469 iface = next(
470 (
471 iface
472 for iface in vim_info["interfaces"]
473 if vim_iface_id == iface["vim_interface_id"]
474 ),
475 None,
476 )
477 # if iface:
478 # iface.pop("vim_info", None)
479 vim_interfaces.append(iface)
480
481 task_create = next(
482 t
483 for t in ro_task["tasks"]
484 if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
485 )
486 if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
487 vim_interfaces[task_create["mgmt_vnf_interface"]][
488 "mgmt_vnf_interface"
489 ] = True
490
491 mgmt_vdu_iface = task_create.get(
492 "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
493 )
494 if vim_interfaces:
495 vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
496
497 if ro_task["vim_info"]["interfaces"] != vim_interfaces:
498 ro_vim_item_update["interfaces"] = vim_interfaces
499
500 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
501 ro_vim_item_update["vim_status"] = vim_info["status"]
502
503 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
504 ro_vim_item_update["vim_name"] = vim_info.get("name")
505
506 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
507 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
508 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
509 elif vim_info["status"] == "DELETED":
510 ro_vim_item_update["vim_id"] = None
511 ro_vim_item_update["vim_details"] = "Deleted externally"
512 else:
513 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
514 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
515
516 if ro_vim_item_update:
517 self.logger.debug(
518 "ro_task={} {} get-vm={}: status={} {}".format(
519 ro_task_id,
520 ro_task["target_id"],
521 vim_id,
522 ro_vim_item_update.get("vim_status"),
523 ro_vim_item_update.get("vim_details")
524 if ro_vim_item_update.get("vim_status") != "ACTIVE"
525 else "",
526 )
527 )
528
529 return task_status, ro_vim_item_update
530
531 def exec(self, ro_task, task_index, task_depends):
532 task = ro_task["tasks"][task_index]
533 task_id = task["task_id"]
534 target_vim = self.my_vims[ro_task["target_id"]]
535 db_task_update = {"retries": 0}
536 retries = task.get("retries", 0)
537
538 try:
539 params = task["params"]
540 params_copy = deepcopy(params)
541 params_copy["ro_key"] = self.db.decrypt(
542 params_copy.pop("private_key"),
543 params_copy.pop("schema_version"),
544 params_copy.pop("salt"),
545 )
546 params_copy["ip_addr"] = params_copy.pop("ip_address")
547 target_vim.inject_user_key(**params_copy)
548 self.logger.debug(
549 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
550 )
551
552 return (
553 "DONE",
554 None,
555 db_task_update,
556 ) # params_copy["key"]
557 except (vimconn.VimConnException, NsWorkerException) as e:
558 retries += 1
559
560 if retries < self.max_retries_inject_ssh_key:
561 return (
562 "BUILD",
563 None,
564 {
565 "retries": retries,
566 "next_retry": self.time_retries_inject_ssh_key,
567 },
568 )
569
570 self.logger.error(
571 "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
572 )
573 ro_vim_item_update = {"vim_details": str(e)}
574
575 return "FAILED", ro_vim_item_update, db_task_update
576
577
578 class VimInteractionImage(VimInteractionBase):
579 def new(self, ro_task, task_index, task_depends):
580 task = ro_task["tasks"][task_index]
581 task_id = task["task_id"]
582 created = False
583 created_items = {}
584 target_vim = self.my_vims[ro_task["target_id"]]
585
586 try:
587 # FIND
588 if task.get("find_params"):
589 vim_images = target_vim.get_image_list(**task["find_params"])
590
591 if not vim_images:
592 raise NsWorkerExceptionNotFound(
593 "Image not found with this criteria: '{}'".format(
594 task["find_params"]
595 )
596 )
597 elif len(vim_images) > 1:
598 raise NsWorkerException(
599 "More than one network found with this criteria: '{}'".format(
600 task["find_params"]
601 )
602 )
603 else:
604 vim_image_id = vim_images[0]["id"]
605
606 ro_vim_item_update = {
607 "vim_id": vim_image_id,
608 "vim_status": "DONE",
609 "created": created,
610 "created_items": created_items,
611 "vim_details": None,
612 }
613 self.logger.debug(
614 "task={} {} new-image={} created={}".format(
615 task_id, ro_task["target_id"], vim_image_id, created
616 )
617 )
618
619 return "DONE", ro_vim_item_update
620 except (NsWorkerException, vimconn.VimConnException) as e:
621 self.logger.error(
622 "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
623 )
624 ro_vim_item_update = {
625 "vim_status": "VIM_ERROR",
626 "created": created,
627 "vim_details": str(e),
628 }
629
630 return "FAILED", ro_vim_item_update
631
632
633 class VimInteractionFlavor(VimInteractionBase):
634 def delete(self, ro_task, task_index):
635 task = ro_task["tasks"][task_index]
636 task_id = task["task_id"]
637 flavor_vim_id = ro_task["vim_info"]["vim_id"]
638 ro_vim_item_update_ok = {
639 "vim_status": "DELETED",
640 "created": False,
641 "vim_details": "DELETED",
642 "vim_id": None,
643 }
644
645 try:
646 if flavor_vim_id:
647 target_vim = self.my_vims[ro_task["target_id"]]
648 target_vim.delete_flavor(flavor_vim_id)
649 except vimconn.VimConnNotFoundException:
650 ro_vim_item_update_ok["vim_details"] = "already deleted"
651 except vimconn.VimConnException as e:
652 self.logger.error(
653 "ro_task={} vim={} del-flavor={}: {}".format(
654 ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
655 )
656 )
657 ro_vim_item_update = {
658 "vim_status": "VIM_ERROR",
659 "vim_details": "Error while deleting: {}".format(e),
660 }
661
662 return "FAILED", ro_vim_item_update
663
664 self.logger.debug(
665 "task={} {} del-flavor={} {}".format(
666 task_id,
667 ro_task["target_id"],
668 flavor_vim_id,
669 ro_vim_item_update_ok.get("vim_details", ""),
670 )
671 )
672
673 return "DONE", ro_vim_item_update_ok
674
675 def new(self, ro_task, task_index, task_depends):
676 task = ro_task["tasks"][task_index]
677 task_id = task["task_id"]
678 created = False
679 created_items = {}
680 target_vim = self.my_vims[ro_task["target_id"]]
681
682 try:
683 # FIND
684 vim_flavor_id = None
685
686 if task.get("find_params"):
687 try:
688 flavor_data = task["find_params"]["flavor_data"]
689 vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
690 except vimconn.VimConnNotFoundException:
691 pass
692
693 if not vim_flavor_id and task.get("params"):
694 # CREATE
695 flavor_data = task["params"]["flavor_data"]
696 vim_flavor_id = target_vim.new_flavor(flavor_data)
697 created = True
698
699 ro_vim_item_update = {
700 "vim_id": vim_flavor_id,
701 "vim_status": "DONE",
702 "created": created,
703 "created_items": created_items,
704 "vim_details": None,
705 }
706 self.logger.debug(
707 "task={} {} new-flavor={} created={}".format(
708 task_id, ro_task["target_id"], vim_flavor_id, created
709 )
710 )
711
712 return "DONE", ro_vim_item_update
713 except (vimconn.VimConnException, NsWorkerException) as e:
714 self.logger.error(
715 "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
716 )
717 ro_vim_item_update = {
718 "vim_status": "VIM_ERROR",
719 "created": created,
720 "vim_details": str(e),
721 }
722
723 return "FAILED", ro_vim_item_update
724
725
726 class VimInteractionSdnNet(VimInteractionBase):
727 @staticmethod
728 def _match_pci(port_pci, mapping):
729 """
730 Check if port_pci matches with mapping
731 mapping can have brackets to indicate that several chars are accepted. e.g
732 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
733 :param port_pci: text
734 :param mapping: text, can contain brackets to indicate several chars are available
735 :return: True if matches, False otherwise
736 """
737 if not port_pci or not mapping:
738 return False
739 if port_pci == mapping:
740 return True
741
742 mapping_index = 0
743 pci_index = 0
744 while True:
745 bracket_start = mapping.find("[", mapping_index)
746
747 if bracket_start == -1:
748 break
749
750 bracket_end = mapping.find("]", bracket_start)
751 if bracket_end == -1:
752 break
753
754 length = bracket_start - mapping_index
755 if (
756 length
757 and port_pci[pci_index : pci_index + length]
758 != mapping[mapping_index:bracket_start]
759 ):
760 return False
761
762 if (
763 port_pci[pci_index + length]
764 not in mapping[bracket_start + 1 : bracket_end]
765 ):
766 return False
767
768 pci_index += length + 1
769 mapping_index = bracket_end + 1
770
771 if port_pci[pci_index:] != mapping[mapping_index:]:
772 return False
773
774 return True
775
776 def _get_interfaces(self, vlds_to_connect, vim_account_id):
777 """
778 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
779 :param vim_account_id:
780 :return:
781 """
782 interfaces = []
783
784 for vld in vlds_to_connect:
785 table, _, db_id = vld.partition(":")
786 db_id, _, vld = db_id.partition(":")
787 _, _, vld_id = vld.partition(".")
788
789 if table == "vnfrs":
790 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
791 iface_key = "vnf-vld-id"
792 else: # table == "nsrs"
793 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
794 iface_key = "ns-vld-id"
795
796 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
797
798 for db_vnfr in db_vnfrs:
799 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
800 for iface_index, interface in enumerate(vdur["interfaces"]):
801 if interface.get(iface_key) == vld_id and interface.get(
802 "type"
803 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
804 # only SR-IOV o PT
805 interface_ = interface.copy()
806 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
807 db_vnfr["_id"], vdu_index, iface_index
808 )
809
810 if vdur.get("status") == "ERROR":
811 interface_["status"] = "ERROR"
812
813 interfaces.append(interface_)
814
815 return interfaces
816
817 def refresh(self, ro_task):
818 # look for task create
819 task_create_index, _ = next(
820 i_t
821 for i_t in enumerate(ro_task["tasks"])
822 if i_t[1]
823 and i_t[1]["action"] == "CREATE"
824 and i_t[1]["status"] != "FINISHED"
825 )
826
827 return self.new(ro_task, task_create_index, None)
828
829 def new(self, ro_task, task_index, task_depends):
830
831 task = ro_task["tasks"][task_index]
832 task_id = task["task_id"]
833 target_vim = self.my_vims[ro_task["target_id"]]
834
835 sdn_net_id = ro_task["vim_info"]["vim_id"]
836
837 created_items = ro_task["vim_info"].get("created_items")
838 connected_ports = ro_task["vim_info"].get("connected_ports", [])
839 new_connected_ports = []
840 last_update = ro_task["vim_info"].get("last_update", 0)
841 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
842 error_list = []
843 created = ro_task["vim_info"].get("created", False)
844
845 try:
846 # CREATE
847 params = task["params"]
848 vlds_to_connect = params["vlds"]
849 associated_vim = params["target_vim"]
850 # external additional ports
851 additional_ports = params.get("sdn-ports") or ()
852 _, _, vim_account_id = associated_vim.partition(":")
853
854 if associated_vim:
855 # get associated VIM
856 if associated_vim not in self.db_vims:
857 self.db_vims[associated_vim] = self.db.get_one(
858 "vim_accounts", {"_id": vim_account_id}
859 )
860
861 db_vim = self.db_vims[associated_vim]
862
863 # look for ports to connect
864 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
865 # print(ports)
866
867 sdn_ports = []
868 pending_ports = error_ports = 0
869 vlan_used = None
870 sdn_need_update = False
871
872 for port in ports:
873 vlan_used = port.get("vlan") or vlan_used
874
875 # TODO. Do not connect if already done
876 if not port.get("compute_node") or not port.get("pci"):
877 if port.get("status") == "ERROR":
878 error_ports += 1
879 else:
880 pending_ports += 1
881 continue
882
883 pmap = None
884 compute_node_mappings = next(
885 (
886 c
887 for c in db_vim["config"].get("sdn-port-mapping", ())
888 if c and c["compute_node"] == port["compute_node"]
889 ),
890 None,
891 )
892
893 if compute_node_mappings:
894 # process port_mapping pci of type 0000:af:1[01].[1357]
895 pmap = next(
896 (
897 p
898 for p in compute_node_mappings["ports"]
899 if self._match_pci(port["pci"], p.get("pci"))
900 ),
901 None,
902 )
903
904 if not pmap:
905 if not db_vim["config"].get("mapping_not_needed"):
906 error_list.append(
907 "Port mapping not found for compute_node={} pci={}".format(
908 port["compute_node"], port["pci"]
909 )
910 )
911 continue
912
913 pmap = {}
914
915 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
916 new_port = {
917 "service_endpoint_id": pmap.get("service_endpoint_id")
918 or service_endpoint_id,
919 "service_endpoint_encapsulation_type": "dot1q"
920 if port["type"] == "SR-IOV"
921 else None,
922 "service_endpoint_encapsulation_info": {
923 "vlan": port.get("vlan"),
924 "mac": port.get("mac_address"),
925 "device_id": pmap.get("device_id") or port["compute_node"],
926 "device_interface_id": pmap.get("device_interface_id")
927 or port["pci"],
928 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
929 "switch_port": pmap.get("switch_port"),
930 "service_mapping_info": pmap.get("service_mapping_info"),
931 },
932 }
933
934 # TODO
935 # if port["modified_at"] > last_update:
936 # sdn_need_update = True
937 new_connected_ports.append(port["id"]) # TODO
938 sdn_ports.append(new_port)
939
940 if error_ports:
941 error_list.append(
942 "{} interfaces have not been created as VDU is on ERROR status".format(
943 error_ports
944 )
945 )
946
947 # connect external ports
948 for index, additional_port in enumerate(additional_ports):
949 additional_port_id = additional_port.get(
950 "service_endpoint_id"
951 ) or "external-{}".format(index)
952 sdn_ports.append(
953 {
954 "service_endpoint_id": additional_port_id,
955 "service_endpoint_encapsulation_type": additional_port.get(
956 "service_endpoint_encapsulation_type", "dot1q"
957 ),
958 "service_endpoint_encapsulation_info": {
959 "vlan": additional_port.get("vlan") or vlan_used,
960 "mac": additional_port.get("mac_address"),
961 "device_id": additional_port.get("device_id"),
962 "device_interface_id": additional_port.get(
963 "device_interface_id"
964 ),
965 "switch_dpid": additional_port.get("switch_dpid")
966 or additional_port.get("switch_id"),
967 "switch_port": additional_port.get("switch_port"),
968 "service_mapping_info": additional_port.get(
969 "service_mapping_info"
970 ),
971 },
972 }
973 )
974 new_connected_ports.append(additional_port_id)
975 sdn_info = ""
976
977 # if there are more ports to connect or they have been modified, call create/update
978 if error_list:
979 sdn_status = "ERROR"
980 sdn_info = "; ".join(error_list)
981 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
982 last_update = time.time()
983
984 if not sdn_net_id:
985 if len(sdn_ports) < 2:
986 sdn_status = "ACTIVE"
987
988 if not pending_ports:
989 self.logger.debug(
990 "task={} {} new-sdn-net done, less than 2 ports".format(
991 task_id, ro_task["target_id"]
992 )
993 )
994 else:
995 net_type = params.get("type") or "ELAN"
996 (
997 sdn_net_id,
998 created_items,
999 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1000 created = True
1001 self.logger.debug(
1002 "task={} {} new-sdn-net={} created={}".format(
1003 task_id, ro_task["target_id"], sdn_net_id, created
1004 )
1005 )
1006 else:
1007 created_items = target_vim.edit_connectivity_service(
1008 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1009 )
1010 created = True
1011 self.logger.debug(
1012 "task={} {} update-sdn-net={} created={}".format(
1013 task_id, ro_task["target_id"], sdn_net_id, created
1014 )
1015 )
1016
1017 connected_ports = new_connected_ports
1018 elif sdn_net_id:
1019 wim_status_dict = target_vim.get_connectivity_service_status(
1020 sdn_net_id, conn_info=created_items
1021 )
1022 sdn_status = wim_status_dict["sdn_status"]
1023
1024 if wim_status_dict.get("sdn_info"):
1025 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1026
1027 if wim_status_dict.get("error_msg"):
1028 sdn_info = wim_status_dict.get("error_msg") or ""
1029
1030 if pending_ports:
1031 if sdn_status != "ERROR":
1032 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1033 len(ports) - pending_ports, len(ports)
1034 )
1035
1036 if sdn_status == "ACTIVE":
1037 sdn_status = "BUILD"
1038
1039 ro_vim_item_update = {
1040 "vim_id": sdn_net_id,
1041 "vim_status": sdn_status,
1042 "created": created,
1043 "created_items": created_items,
1044 "connected_ports": connected_ports,
1045 "vim_details": sdn_info,
1046 "last_update": last_update,
1047 }
1048
1049 return sdn_status, ro_vim_item_update
1050 except Exception as e:
1051 self.logger.error(
1052 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1053 exc_info=not isinstance(
1054 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1055 ),
1056 )
1057 ro_vim_item_update = {
1058 "vim_status": "VIM_ERROR",
1059 "created": created,
1060 "vim_details": str(e),
1061 }
1062
1063 return "FAILED", ro_vim_item_update
1064
1065 def delete(self, ro_task, task_index):
1066 task = ro_task["tasks"][task_index]
1067 task_id = task["task_id"]
1068 sdn_vim_id = ro_task["vim_info"].get("vim_id")
1069 ro_vim_item_update_ok = {
1070 "vim_status": "DELETED",
1071 "created": False,
1072 "vim_details": "DELETED",
1073 "vim_id": None,
1074 }
1075
1076 try:
1077 if sdn_vim_id:
1078 target_vim = self.my_vims[ro_task["target_id"]]
1079 target_vim.delete_connectivity_service(
1080 sdn_vim_id, ro_task["vim_info"].get("created_items")
1081 )
1082
1083 except Exception as e:
1084 if (
1085 isinstance(e, sdnconn.SdnConnectorError)
1086 and e.http_code == HTTPStatus.NOT_FOUND.value
1087 ):
1088 ro_vim_item_update_ok["vim_details"] = "already deleted"
1089 else:
1090 self.logger.error(
1091 "ro_task={} vim={} del-sdn-net={}: {}".format(
1092 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1093 ),
1094 exc_info=not isinstance(
1095 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1096 ),
1097 )
1098 ro_vim_item_update = {
1099 "vim_status": "VIM_ERROR",
1100 "vim_details": "Error while deleting: {}".format(e),
1101 }
1102
1103 return "FAILED", ro_vim_item_update
1104
1105 self.logger.debug(
1106 "task={} {} del-sdn-net={} {}".format(
1107 task_id,
1108 ro_task["target_id"],
1109 sdn_vim_id,
1110 ro_vim_item_update_ok.get("vim_details", ""),
1111 )
1112 )
1113
1114 return "DONE", ro_vim_item_update_ok
1115
1116
1117 class NsWorker(threading.Thread):
1118 REFRESH_BUILD = 5 # 5 seconds
1119 REFRESH_ACTIVE = 60 # 1 minute
1120 REFRESH_ERROR = 600
1121 REFRESH_IMAGE = 3600 * 10
1122 REFRESH_DELETE = 3600 * 10
1123 QUEUE_SIZE = 100
1124 terminate = False
1125
1126 def __init__(self, worker_index, config, plugins, db):
1127 """
1128
1129 :param worker_index: thread index
1130 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1131 :param plugins: global shared dict with the loaded plugins
1132 :param db: database class instance to use
1133 """
1134 threading.Thread.__init__(self)
1135 self.config = config
1136 self.plugins = plugins
1137 self.plugin_name = "unknown"
1138 self.logger = logging.getLogger("ro.worker{}".format(worker_index))
1139 self.worker_index = worker_index
1140 self.task_queue = queue.Queue(self.QUEUE_SIZE)
1141 # targetvim: vimplugin class
1142 self.my_vims = {}
1143 # targetvim: vim information from database
1144 self.db_vims = {}
1145 # targetvim list
1146 self.vim_targets = []
1147 self.my_id = config["process_id"] + ":" + str(worker_index)
1148 self.db = db
1149 self.item2class = {
1150 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
1151 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
1152 "image": VimInteractionImage(
1153 self.db, self.my_vims, self.db_vims, self.logger
1154 ),
1155 "flavor": VimInteractionFlavor(
1156 self.db, self.my_vims, self.db_vims, self.logger
1157 ),
1158 "sdn_net": VimInteractionSdnNet(
1159 self.db, self.my_vims, self.db_vims, self.logger
1160 ),
1161 }
1162 self.time_last_task_processed = None
1163 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1164 self.tasks_to_delete = []
1165 # it is idle when there are not vim_targets associated
1166 self.idle = True
1167 self.task_locked_time = config["global"]["task_locked_time"]
1168
1169 def insert_task(self, task):
1170 try:
1171 self.task_queue.put(task, False)
1172 return None
1173 except queue.Full:
1174 raise NsWorkerException("timeout inserting a task")
1175
1176 def terminate(self):
1177 self.insert_task("exit")
1178
1179 def del_task(self, task):
1180 with self.task_lock:
1181 if task["status"] == "SCHEDULED":
1182 task["status"] = "SUPERSEDED"
1183 return True
1184 else: # task["status"] == "processing"
1185 self.task_lock.release()
1186 return False
1187
1188 def _process_vim_config(self, target_id, db_vim):
1189 """
1190 Process vim config, creating vim configuration files as ca_cert
1191 :param target_id: vim/sdn/wim + id
1192 :param db_vim: Vim dictionary obtained from database
1193 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1194 """
1195 if not db_vim.get("config"):
1196 return
1197
1198 file_name = ""
1199
1200 try:
1201 if db_vim["config"].get("ca_cert_content"):
1202 file_name = "{}:{}".format(target_id, self.worker_index)
1203
1204 try:
1205 mkdir(file_name)
1206 except FileExistsError:
1207 pass
1208
1209 file_name = file_name + "/ca_cert"
1210
1211 with open(file_name, "w") as f:
1212 f.write(db_vim["config"]["ca_cert_content"])
1213 del db_vim["config"]["ca_cert_content"]
1214 db_vim["config"]["ca_cert"] = file_name
1215 except Exception as e:
1216 raise NsWorkerException(
1217 "Error writing to file '{}': {}".format(file_name, e)
1218 )
1219
1220 def _load_plugin(self, name, type="vim"):
1221 # type can be vim or sdn
1222 if "rovim_dummy" not in self.plugins:
1223 self.plugins["rovim_dummy"] = VimDummyConnector
1224
1225 if "rosdn_dummy" not in self.plugins:
1226 self.plugins["rosdn_dummy"] = SdnDummyConnector
1227
1228 if name in self.plugins:
1229 return self.plugins[name]
1230
1231 try:
1232 for v in iter_entry_points("osm_ro{}.plugins".format(type), name):
1233 self.plugins[name] = v.load()
1234 except Exception as e:
1235 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1236
1237 if name and name not in self.plugins:
1238 raise NsWorkerException(
1239 "Plugin 'osm_{n}' has not been installed".format(n=name)
1240 )
1241
1242 return self.plugins[name]
1243
1244 def _unload_vim(self, target_id):
1245 """
1246 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1247 :param target_id: Contains type:_id; where type can be 'vim', ...
1248 :return: None.
1249 """
1250 try:
1251 self.db_vims.pop(target_id, None)
1252 self.my_vims.pop(target_id, None)
1253
1254 if target_id in self.vim_targets:
1255 self.vim_targets.remove(target_id)
1256
1257 self.logger.info("Unloaded {}".format(target_id))
1258 rmtree("{}:{}".format(target_id, self.worker_index))
1259 except FileNotFoundError:
1260 pass # this is raised by rmtree if folder does not exist
1261 except Exception as e:
1262 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1263
1264 def _check_vim(self, target_id):
1265 """
1266 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1267 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1268 :return: None.
1269 """
1270 target, _, _id = target_id.partition(":")
1271 now = time.time()
1272 update_dict = {}
1273 unset_dict = {}
1274 op_text = ""
1275 step = ""
1276 loaded = target_id in self.vim_targets
1277 target_database = (
1278 "vim_accounts"
1279 if target == "vim"
1280 else "wim_accounts"
1281 if target == "wim"
1282 else "sdns"
1283 )
1284
1285 try:
1286 step = "Getting {} from db".format(target_id)
1287 db_vim = self.db.get_one(target_database, {"_id": _id})
1288
1289 for op_index, operation in enumerate(
1290 db_vim["_admin"].get("operations", ())
1291 ):
1292 if operation["operationState"] != "PROCESSING":
1293 continue
1294
1295 locked_at = operation.get("locked_at")
1296
1297 if locked_at is not None and locked_at >= now - self.task_locked_time:
1298 # some other thread is doing this operation
1299 return
1300
1301 # lock
1302 op_text = "_admin.operations.{}.".format(op_index)
1303
1304 if not self.db.set_one(
1305 target_database,
1306 q_filter={
1307 "_id": _id,
1308 op_text + "operationState": "PROCESSING",
1309 op_text + "locked_at": locked_at,
1310 },
1311 update_dict={
1312 op_text + "locked_at": now,
1313 "admin.current_operation": op_index,
1314 },
1315 fail_on_empty=False,
1316 ):
1317 return
1318
1319 unset_dict[op_text + "locked_at"] = None
1320 unset_dict["current_operation"] = None
1321 step = "Loading " + target_id
1322 error_text = self._load_vim(target_id)
1323
1324 if not error_text:
1325 step = "Checking connectivity"
1326
1327 if target == "vim":
1328 self.my_vims[target_id].check_vim_connectivity()
1329 else:
1330 self.my_vims[target_id].check_credentials()
1331
1332 update_dict["_admin.operationalState"] = "ENABLED"
1333 update_dict["_admin.detailed-status"] = ""
1334 unset_dict[op_text + "detailed-status"] = None
1335 update_dict[op_text + "operationState"] = "COMPLETED"
1336
1337 return
1338
1339 except Exception as e:
1340 error_text = "{}: {}".format(step, e)
1341 self.logger.error("{} for {}: {}".format(step, target_id, e))
1342
1343 finally:
1344 if update_dict or unset_dict:
1345 if error_text:
1346 update_dict[op_text + "operationState"] = "FAILED"
1347 update_dict[op_text + "detailed-status"] = error_text
1348 unset_dict.pop(op_text + "detailed-status", None)
1349 update_dict["_admin.operationalState"] = "ERROR"
1350 update_dict["_admin.detailed-status"] = error_text
1351
1352 if op_text:
1353 update_dict[op_text + "statusEnteredTime"] = now
1354
1355 self.db.set_one(
1356 target_database,
1357 q_filter={"_id": _id},
1358 update_dict=update_dict,
1359 unset=unset_dict,
1360 fail_on_empty=False,
1361 )
1362
1363 if not loaded:
1364 self._unload_vim(target_id)
1365
1366 def _reload_vim(self, target_id):
1367 if target_id in self.vim_targets:
1368 self._load_vim(target_id)
1369 else:
1370 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1371 # just remove it to force load again next time it is needed
1372 self.db_vims.pop(target_id, None)
1373
1374 def _load_vim(self, target_id):
1375 """
1376 Load or reload a vim_account, sdn_controller or wim_account.
1377 Read content from database, load the plugin if not loaded.
1378 In case of error loading the plugin, it load a failing VIM_connector
1379 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1380 :param target_id: Contains type:_id; where type can be 'vim', ...
1381 :return: None if ok, descriptive text if error
1382 """
1383 target, _, _id = target_id.partition(":")
1384 target_database = (
1385 "vim_accounts"
1386 if target == "vim"
1387 else "wim_accounts"
1388 if target == "wim"
1389 else "sdns"
1390 )
1391 plugin_name = ""
1392 vim = None
1393
1394 try:
1395 step = "Getting {}={} from db".format(target, _id)
1396 # TODO process for wim, sdnc, ...
1397 vim = self.db.get_one(target_database, {"_id": _id})
1398
1399 # if deep_get(vim, "config", "sdn-controller"):
1400 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1401 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1402
1403 step = "Decrypting password"
1404 schema_version = vim.get("schema_version")
1405 self.db.encrypt_decrypt_fields(
1406 vim,
1407 "decrypt",
1408 fields=("password", "secret"),
1409 schema_version=schema_version,
1410 salt=_id,
1411 )
1412 self._process_vim_config(target_id, vim)
1413
1414 if target == "vim":
1415 plugin_name = "rovim_" + vim["vim_type"]
1416 step = "Loading plugin '{}'".format(plugin_name)
1417 vim_module_conn = self._load_plugin(plugin_name)
1418 step = "Loading {}'".format(target_id)
1419 self.my_vims[target_id] = vim_module_conn(
1420 uuid=vim["_id"],
1421 name=vim["name"],
1422 tenant_id=vim.get("vim_tenant_id"),
1423 tenant_name=vim.get("vim_tenant_name"),
1424 url=vim["vim_url"],
1425 url_admin=None,
1426 user=vim["vim_user"],
1427 passwd=vim["vim_password"],
1428 config=vim.get("config") or {},
1429 persistent_info={},
1430 )
1431 else: # sdn
1432 plugin_name = "rosdn_" + vim["type"]
1433 step = "Loading plugin '{}'".format(plugin_name)
1434 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1435 step = "Loading {}'".format(target_id)
1436 wim = deepcopy(vim)
1437 wim_config = wim.pop("config", {}) or {}
1438 wim["uuid"] = wim["_id"]
1439 wim["wim_url"] = wim["url"]
1440
1441 if wim.get("dpid"):
1442 wim_config["dpid"] = wim.pop("dpid")
1443
1444 if wim.get("switch_id"):
1445 wim_config["switch_id"] = wim.pop("switch_id")
1446
1447 # wim, wim_account, config
1448 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1449 self.db_vims[target_id] = vim
1450 self.error_status = None
1451
1452 self.logger.info(
1453 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1454 )
1455 except Exception as e:
1456 self.logger.error(
1457 "Cannot load {} plugin={}: {} {}".format(
1458 target_id, plugin_name, step, e
1459 )
1460 )
1461
1462 self.db_vims[target_id] = vim or {}
1463 self.db_vims[target_id] = FailingConnector(str(e))
1464 error_status = "{} Error: {}".format(step, e)
1465
1466 return error_status
1467 finally:
1468 if target_id not in self.vim_targets:
1469 self.vim_targets.append(target_id)
1470
1471 def _get_db_task(self):
1472 """
1473 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1474 :return: None
1475 """
1476 now = time.time()
1477
1478 if not self.time_last_task_processed:
1479 self.time_last_task_processed = now
1480
1481 try:
1482 while True:
1483 locked = self.db.set_one(
1484 "ro_tasks",
1485 q_filter={
1486 "target_id": self.vim_targets,
1487 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1488 "locked_at.lt": now - self.task_locked_time,
1489 "to_check_at.lt": self.time_last_task_processed,
1490 },
1491 update_dict={"locked_by": self.my_id, "locked_at": now},
1492 fail_on_empty=False,
1493 )
1494
1495 if locked:
1496 # read and return
1497 ro_task = self.db.get_one(
1498 "ro_tasks",
1499 q_filter={
1500 "target_id": self.vim_targets,
1501 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1502 "locked_at": now,
1503 },
1504 )
1505 return ro_task
1506
1507 if self.time_last_task_processed == now:
1508 self.time_last_task_processed = None
1509 return None
1510 else:
1511 self.time_last_task_processed = now
1512 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1513
1514 except DbException as e:
1515 self.logger.error("Database exception at _get_db_task: {}".format(e))
1516 except Exception as e:
1517 self.logger.critical(
1518 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1519 )
1520
1521 return None
1522
1523 def _delete_task(self, ro_task, task_index, task_depends, db_update):
1524 """
1525 Determine if this task need to be done or superseded
1526 :return: None
1527 """
1528 my_task = ro_task["tasks"][task_index]
1529 task_id = my_task["task_id"]
1530 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
1531 "created_items", False
1532 )
1533
1534 if my_task["status"] == "FAILED":
1535 return None, None # TODO need to be retry??
1536
1537 try:
1538 for index, task in enumerate(ro_task["tasks"]):
1539 if index == task_index or not task:
1540 continue # own task
1541
1542 if (
1543 my_task["target_record"] == task["target_record"]
1544 and task["action"] == "CREATE"
1545 ):
1546 # set to finished
1547 db_update["tasks.{}.status".format(index)] = task[
1548 "status"
1549 ] = "FINISHED"
1550 elif task["action"] == "CREATE" and task["status"] not in (
1551 "FINISHED",
1552 "SUPERSEDED",
1553 ):
1554 needed_delete = False
1555
1556 if needed_delete:
1557 return self.item2class[my_task["item"]].delete(ro_task, task_index)
1558 else:
1559 return "SUPERSEDED", None
1560 except Exception as e:
1561 if not isinstance(e, NsWorkerException):
1562 self.logger.critical(
1563 "Unexpected exception at _delete_task task={}: {}".format(
1564 task_id, e
1565 ),
1566 exc_info=True,
1567 )
1568
1569 return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1570
1571 def _create_task(self, ro_task, task_index, task_depends, db_update):
1572 """
1573 Determine if this task need to create something at VIM
1574 :return: None
1575 """
1576 my_task = ro_task["tasks"][task_index]
1577 task_id = my_task["task_id"]
1578 task_status = None
1579
1580 if my_task["status"] == "FAILED":
1581 return None, None # TODO need to be retry??
1582 elif my_task["status"] == "SCHEDULED":
1583 # check if already created by another task
1584 for index, task in enumerate(ro_task["tasks"]):
1585 if index == task_index or not task:
1586 continue # own task
1587
1588 if task["action"] == "CREATE" and task["status"] not in (
1589 "SCHEDULED",
1590 "FINISHED",
1591 "SUPERSEDED",
1592 ):
1593 return task["status"], "COPY_VIM_INFO"
1594
1595 try:
1596 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
1597 ro_task, task_index, task_depends
1598 )
1599 # TODO update other CREATE tasks
1600 except Exception as e:
1601 if not isinstance(e, NsWorkerException):
1602 self.logger.error(
1603 "Error executing task={}: {}".format(task_id, e), exc_info=True
1604 )
1605
1606 task_status = "FAILED"
1607 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1608 # TODO update ro_vim_item_update
1609
1610 return task_status, ro_vim_item_update
1611 else:
1612 return None, None
1613
1614 def _get_dependency(self, task_id, ro_task=None, target_id=None):
1615 """
1616 Look for dependency task
1617 :param task_id: Can be one of
1618 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1619 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1620 3. task.task_id: "<action_id>:number"
1621 :param ro_task:
1622 :param target_id:
1623 :return: database ro_task plus index of task
1624 """
1625 if (
1626 task_id.startswith("vim:")
1627 or task_id.startswith("sdn:")
1628 or task_id.startswith("wim:")
1629 ):
1630 target_id, _, task_id = task_id.partition(" ")
1631
1632 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
1633 ro_task_dependency = self.db.get_one(
1634 "ro_tasks",
1635 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
1636 fail_on_empty=False,
1637 )
1638
1639 if ro_task_dependency:
1640 for task_index, task in enumerate(ro_task_dependency["tasks"]):
1641 if task["target_record_id"] == task_id:
1642 return ro_task_dependency, task_index
1643
1644 else:
1645 if ro_task:
1646 for task_index, task in enumerate(ro_task["tasks"]):
1647 if task and task["task_id"] == task_id:
1648 return ro_task, task_index
1649
1650 ro_task_dependency = self.db.get_one(
1651 "ro_tasks",
1652 q_filter={
1653 "tasks.ANYINDEX.task_id": task_id,
1654 "tasks.ANYINDEX.target_record.ne": None,
1655 },
1656 fail_on_empty=False,
1657 )
1658
1659 if ro_task_dependency:
1660 for task_index, task in ro_task_dependency["tasks"]:
1661 if task["task_id"] == task_id:
1662 return ro_task_dependency, task_index
1663 raise NsWorkerException("Cannot get depending task {}".format(task_id))
1664
1665 def _process_pending_tasks(self, ro_task):
1666 ro_task_id = ro_task["_id"]
1667 now = time.time()
1668 # one day
1669 next_check_at = now + (24 * 60 * 60)
1670 db_ro_task_update = {}
1671
1672 def _update_refresh(new_status):
1673 # compute next_refresh
1674 nonlocal task
1675 nonlocal next_check_at
1676 nonlocal db_ro_task_update
1677 nonlocal ro_task
1678
1679 next_refresh = time.time()
1680
1681 if task["item"] in ("image", "flavor"):
1682 next_refresh += self.REFRESH_IMAGE
1683 elif new_status == "BUILD":
1684 next_refresh += self.REFRESH_BUILD
1685 elif new_status == "DONE":
1686 next_refresh += self.REFRESH_ACTIVE
1687 else:
1688 next_refresh += self.REFRESH_ERROR
1689
1690 next_check_at = min(next_check_at, next_refresh)
1691 db_ro_task_update["vim_info.refresh_at"] = next_refresh
1692 ro_task["vim_info"]["refresh_at"] = next_refresh
1693
1694 try:
1695 # 0: get task_status_create
1696 lock_object = None
1697 task_status_create = None
1698 task_create = next(
1699 (
1700 t
1701 for t in ro_task["tasks"]
1702 if t
1703 and t["action"] == "CREATE"
1704 and t["status"] in ("BUILD", "DONE")
1705 ),
1706 None,
1707 )
1708
1709 if task_create:
1710 task_status_create = task_create["status"]
1711
1712 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
1713 for task_action in ("DELETE", "CREATE", "EXEC"):
1714 db_vim_update = None
1715 new_status = None
1716
1717 for task_index, task in enumerate(ro_task["tasks"]):
1718 if not task:
1719 continue # task deleted
1720
1721 task_depends = {}
1722 target_update = None
1723
1724 if (
1725 (
1726 task_action in ("DELETE", "EXEC")
1727 and task["status"] not in ("SCHEDULED", "BUILD")
1728 )
1729 or task["action"] != task_action
1730 or (
1731 task_action == "CREATE"
1732 and task["status"] in ("FINISHED", "SUPERSEDED")
1733 )
1734 ):
1735 continue
1736
1737 task_path = "tasks.{}.status".format(task_index)
1738 try:
1739 db_vim_info_update = None
1740
1741 if task["status"] == "SCHEDULED":
1742 # check if tasks that this depends on have been completed
1743 dependency_not_completed = False
1744
1745 for dependency_task_id in task.get("depends_on") or ():
1746 (
1747 dependency_ro_task,
1748 dependency_task_index,
1749 ) = self._get_dependency(
1750 dependency_task_id, target_id=ro_task["target_id"]
1751 )
1752 dependency_task = dependency_ro_task["tasks"][
1753 dependency_task_index
1754 ]
1755
1756 if dependency_task["status"] == "SCHEDULED":
1757 dependency_not_completed = True
1758 next_check_at = min(
1759 next_check_at, dependency_ro_task["to_check_at"]
1760 )
1761 break
1762 elif dependency_task["status"] == "FAILED":
1763 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
1764 task["action"],
1765 task["item"],
1766 dependency_task["action"],
1767 dependency_task["item"],
1768 dependency_task_id,
1769 dependency_ro_task["vim_info"].get(
1770 "vim_details"
1771 ),
1772 )
1773 self.logger.error(
1774 "task={} {}".format(task["task_id"], error_text)
1775 )
1776 raise NsWorkerException(error_text)
1777
1778 task_depends[dependency_task_id] = dependency_ro_task[
1779 "vim_info"
1780 ]["vim_id"]
1781 task_depends[
1782 "TASK-{}".format(dependency_task_id)
1783 ] = dependency_ro_task["vim_info"]["vim_id"]
1784
1785 if dependency_not_completed:
1786 # TODO set at vim_info.vim_details that it is waiting
1787 continue
1788
1789 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
1790 # the task of renew this locking. It will update database locket_at periodically
1791 if not lock_object:
1792 lock_object = LockRenew.add_lock_object(
1793 "ro_tasks", ro_task, self
1794 )
1795
1796 if task["action"] == "DELETE":
1797 (new_status, db_vim_info_update,) = self._delete_task(
1798 ro_task, task_index, task_depends, db_ro_task_update
1799 )
1800 new_status = (
1801 "FINISHED" if new_status == "DONE" else new_status
1802 )
1803 # ^with FINISHED instead of DONE it will not be refreshing
1804
1805 if new_status in ("FINISHED", "SUPERSEDED"):
1806 target_update = "DELETE"
1807 elif task["action"] == "EXEC":
1808 (
1809 new_status,
1810 db_vim_info_update,
1811 db_task_update,
1812 ) = self.item2class[task["item"]].exec(
1813 ro_task, task_index, task_depends
1814 )
1815 new_status = (
1816 "FINISHED" if new_status == "DONE" else new_status
1817 )
1818 # ^with FINISHED instead of DONE it will not be refreshing
1819
1820 if db_task_update:
1821 # load into database the modified db_task_update "retries" and "next_retry"
1822 if db_task_update.get("retries"):
1823 db_ro_task_update[
1824 "tasks.{}.retries".format(task_index)
1825 ] = db_task_update["retries"]
1826
1827 next_check_at = time.time() + db_task_update.get(
1828 "next_retry", 60
1829 )
1830 target_update = None
1831 elif task["action"] == "CREATE":
1832 if task["status"] == "SCHEDULED":
1833 if task_status_create:
1834 new_status = task_status_create
1835 target_update = "COPY_VIM_INFO"
1836 else:
1837 new_status, db_vim_info_update = self.item2class[
1838 task["item"]
1839 ].new(ro_task, task_index, task_depends)
1840 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
1841 _update_refresh(new_status)
1842 else:
1843 if (
1844 ro_task["vim_info"]["refresh_at"]
1845 and now > ro_task["vim_info"]["refresh_at"]
1846 ):
1847 new_status, db_vim_info_update = self.item2class[
1848 task["item"]
1849 ].refresh(ro_task)
1850 _update_refresh(new_status)
1851
1852 except Exception as e:
1853 new_status = "FAILED"
1854 db_vim_info_update = {
1855 "vim_status": "VIM_ERROR",
1856 "vim_details": str(e),
1857 }
1858
1859 if not isinstance(
1860 e, (NsWorkerException, vimconn.VimConnException)
1861 ):
1862 self.logger.error(
1863 "Unexpected exception at _delete_task task={}: {}".format(
1864 task["task_id"], e
1865 ),
1866 exc_info=True,
1867 )
1868
1869 try:
1870 if db_vim_info_update:
1871 db_vim_update = db_vim_info_update.copy()
1872 db_ro_task_update.update(
1873 {
1874 "vim_info." + k: v
1875 for k, v in db_vim_info_update.items()
1876 }
1877 )
1878 ro_task["vim_info"].update(db_vim_info_update)
1879
1880 if new_status:
1881 if task_action == "CREATE":
1882 task_status_create = new_status
1883 db_ro_task_update[task_path] = new_status
1884
1885 if target_update or db_vim_update:
1886 if target_update == "DELETE":
1887 self._update_target(task, None)
1888 elif target_update == "COPY_VIM_INFO":
1889 self._update_target(task, ro_task["vim_info"])
1890 else:
1891 self._update_target(task, db_vim_update)
1892
1893 except Exception as e:
1894 if (
1895 isinstance(e, DbException)
1896 and e.http_code == HTTPStatus.NOT_FOUND
1897 ):
1898 # if the vnfrs or nsrs has been removed from database, this task must be removed
1899 self.logger.debug(
1900 "marking to delete task={}".format(task["task_id"])
1901 )
1902 self.tasks_to_delete.append(task)
1903 else:
1904 self.logger.error(
1905 "Unexpected exception at _update_target task={}: {}".format(
1906 task["task_id"], e
1907 ),
1908 exc_info=True,
1909 )
1910
1911 locked_at = ro_task["locked_at"]
1912
1913 if lock_object:
1914 locked_at = [
1915 lock_object["locked_at"],
1916 lock_object["locked_at"] + self.task_locked_time,
1917 ]
1918 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
1919 # contain exactly locked_at + self.task_locked_time
1920 LockRenew.remove_lock_object(lock_object)
1921
1922 q_filter = {
1923 "_id": ro_task["_id"],
1924 "to_check_at": ro_task["to_check_at"],
1925 "locked_at": locked_at,
1926 }
1927 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
1928 # outside this task (by ro_nbi) do not update it
1929 db_ro_task_update["locked_by"] = None
1930 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
1931 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
1932 db_ro_task_update["modified_at"] = now
1933 db_ro_task_update["to_check_at"] = next_check_at
1934
1935 if not self.db.set_one(
1936 "ro_tasks",
1937 update_dict=db_ro_task_update,
1938 q_filter=q_filter,
1939 fail_on_empty=False,
1940 ):
1941 del db_ro_task_update["to_check_at"]
1942 del q_filter["to_check_at"]
1943 self.db.set_one(
1944 "ro_tasks",
1945 q_filter=q_filter,
1946 update_dict=db_ro_task_update,
1947 fail_on_empty=True,
1948 )
1949 except DbException as e:
1950 self.logger.error(
1951 "ro_task={} Error updating database {}".format(ro_task_id, e)
1952 )
1953 except Exception as e:
1954 self.logger.error(
1955 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
1956 )
1957
1958 def _update_target(self, task, ro_vim_item_update):
1959 table, _, temp = task["target_record"].partition(":")
1960 _id, _, path_vim_status = temp.partition(":")
1961 path_item = path_vim_status[: path_vim_status.rfind(".")]
1962 path_item = path_item[: path_item.rfind(".")]
1963 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
1964 # path_item: dot separated list targeting record information, e.g. "vdur.10"
1965
1966 if ro_vim_item_update:
1967 update_dict = {
1968 path_vim_status + "." + k: v
1969 for k, v in ro_vim_item_update.items()
1970 if k
1971 in ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces")
1972 }
1973
1974 if path_vim_status.startswith("vdur."):
1975 # for backward compatibility, add vdur.name apart from vdur.vim_name
1976 if ro_vim_item_update.get("vim_name"):
1977 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
1978
1979 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
1980 if ro_vim_item_update.get("vim_id"):
1981 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
1982
1983 # update general status
1984 if ro_vim_item_update.get("vim_status"):
1985 update_dict[path_item + ".status"] = ro_vim_item_update[
1986 "vim_status"
1987 ]
1988
1989 if ro_vim_item_update.get("interfaces"):
1990 path_interfaces = path_item + ".interfaces"
1991
1992 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
1993 if iface:
1994 update_dict.update(
1995 {
1996 path_interfaces + ".{}.".format(i) + k: v
1997 for k, v in iface.items()
1998 if k in ("vlan", "compute_node", "pci")
1999 }
2000 )
2001
2002 # put ip_address and mac_address with ip-address and mac-address
2003 if iface.get("ip_address"):
2004 update_dict[
2005 path_interfaces + ".{}.".format(i) + "ip-address"
2006 ] = iface["ip_address"]
2007
2008 if iface.get("mac_address"):
2009 update_dict[
2010 path_interfaces + ".{}.".format(i) + "mac-address"
2011 ] = iface["mac_address"]
2012
2013 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2014 update_dict["ip-address"] = iface.get("ip_address").split(
2015 ";"
2016 )[0]
2017
2018 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2019 update_dict[path_item + ".ip-address"] = iface.get(
2020 "ip_address"
2021 ).split(";")[0]
2022
2023 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2024 else:
2025 update_dict = {path_item + ".status": "DELETED"}
2026 self.db.set_one(
2027 table,
2028 q_filter={"_id": _id},
2029 update_dict=update_dict,
2030 unset={path_vim_status: None},
2031 )
2032
2033 def _process_delete_db_tasks(self):
2034 """
2035 Delete task from database because vnfrs or nsrs or both have been deleted
2036 :return: None. Uses and modify self.tasks_to_delete
2037 """
2038 while self.tasks_to_delete:
2039 task = self.tasks_to_delete[0]
2040 vnfrs_deleted = None
2041 nsr_id = task["nsr_id"]
2042
2043 if task["target_record"].startswith("vnfrs:"):
2044 # check if nsrs is present
2045 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2046 vnfrs_deleted = task["target_record"].split(":")[1]
2047
2048 try:
2049 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2050 except Exception as e:
2051 self.logger.error(
2052 "Error deleting task={}: {}".format(task["task_id"], e)
2053 )
2054 self.tasks_to_delete.pop(0)
2055
2056 @staticmethod
2057 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2058 """
2059 Static method because it is called from osm_ng_ro.ns
2060 :param db: instance of database to use
2061 :param nsr_id: affected nsrs id
2062 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2063 :return: None, exception is fails
2064 """
2065 retries = 5
2066 for retry in range(retries):
2067 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2068 now = time.time()
2069 conflict = False
2070
2071 for ro_task in ro_tasks:
2072 db_update = {}
2073 to_delete_ro_task = True
2074
2075 for index, task in enumerate(ro_task["tasks"]):
2076 if not task:
2077 pass
2078 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2079 vnfrs_deleted
2080 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2081 ):
2082 db_update["tasks.{}".format(index)] = None
2083 else:
2084 # used by other nsr, ro_task cannot be deleted
2085 to_delete_ro_task = False
2086
2087 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2088 if to_delete_ro_task:
2089 if not db.del_one(
2090 "ro_tasks",
2091 q_filter={
2092 "_id": ro_task["_id"],
2093 "modified_at": ro_task["modified_at"],
2094 },
2095 fail_on_empty=False,
2096 ):
2097 conflict = True
2098 elif db_update:
2099 db_update["modified_at"] = now
2100 if not db.set_one(
2101 "ro_tasks",
2102 q_filter={
2103 "_id": ro_task["_id"],
2104 "modified_at": ro_task["modified_at"],
2105 },
2106 update_dict=db_update,
2107 fail_on_empty=False,
2108 ):
2109 conflict = True
2110 if not conflict:
2111 return
2112 else:
2113 raise NsWorkerException("Exceeded {} retries".format(retries))
2114
2115 def run(self):
2116 # load database
2117 self.logger.info("Starting")
2118 while True:
2119 # step 1: get commands from queue
2120 try:
2121 if self.vim_targets:
2122 task = self.task_queue.get(block=False)
2123 else:
2124 if not self.idle:
2125 self.logger.debug("enters in idle state")
2126 self.idle = True
2127 task = self.task_queue.get(block=True)
2128 self.idle = False
2129
2130 if task[0] == "terminate":
2131 break
2132 elif task[0] == "load_vim":
2133 self.logger.info("order to load vim {}".format(task[1]))
2134 self._load_vim(task[1])
2135 elif task[0] == "unload_vim":
2136 self.logger.info("order to unload vim {}".format(task[1]))
2137 self._unload_vim(task[1])
2138 elif task[0] == "reload_vim":
2139 self._reload_vim(task[1])
2140 elif task[0] == "check_vim":
2141 self.logger.info("order to check vim {}".format(task[1]))
2142 self._check_vim(task[1])
2143 continue
2144 except Exception as e:
2145 if isinstance(e, queue.Empty):
2146 pass
2147 else:
2148 self.logger.critical(
2149 "Error processing task: {}".format(e), exc_info=True
2150 )
2151
2152 # step 2: process pending_tasks, delete not needed tasks
2153 try:
2154 if self.tasks_to_delete:
2155 self._process_delete_db_tasks()
2156 busy = False
2157 ro_task = self._get_db_task()
2158 if ro_task:
2159 self._process_pending_tasks(ro_task)
2160 busy = True
2161 if not busy:
2162 time.sleep(5)
2163 except Exception as e:
2164 self.logger.critical(
2165 "Unexpected exception at run: " + str(e), exc_info=True
2166 )
2167
2168 self.logger.info("Finishing")