Fix bug 1446: NS ends in state BUILDING
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 import logging
28 import queue
29 import threading
30 import time
31 import yaml
32 from copy import deepcopy
33 from http import HTTPStatus
34 from os import mkdir
35 from pkg_resources import iter_entry_points
36 from shutil import rmtree
37 from unittest.mock import Mock
38
39 # from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version
40 from osm_common.dbbase import DbException
41 from osm_ro_plugin.vim_dummy import VimDummyConnector
42 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
43 from osm_ro_plugin import vimconn, sdnconn
44 from osm_ng_ro.vim_admin import LockRenew
45
46
47 __author__ = "Alfonso Tierno"
48 __date__ = "$28-Sep-2017 12:07:15$"
49
50
51 def deep_get(target_dict, *args, **kwargs):
52 """
53 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
54 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
55 :param target_dict: dictionary to be read
56 :param args: list of keys to read from target_dict
57 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
58 :return: The wanted value if exist, None or default otherwise
59 """
60 for key in args:
61 if not isinstance(target_dict, dict) or key not in target_dict:
62 return kwargs.get("default")
63 target_dict = target_dict[key]
64 return target_dict
65
66
67 class NsWorkerException(Exception):
68 pass
69
70
71 class FailingConnector:
72 def __init__(self, error_msg):
73 self.error_msg = error_msg
74
75 for method in dir(vimconn.VimConnector):
76 if method[0] != "_":
77 setattr(
78 self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
79 )
80
81 for method in dir(sdnconn.SdnConnectorBase):
82 if method[0] != "_":
83 setattr(
84 self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
85 )
86
87
88 class NsWorkerExceptionNotFound(NsWorkerException):
89 pass
90
91
92 class VimInteractionBase:
93 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
94 It implements methods that does nothing and return ok"""
95
96 def __init__(self, db, my_vims, db_vims, logger):
97 self.db = db
98 self.logger = logger
99 self.my_vims = my_vims
100 self.db_vims = db_vims
101
102 def new(self, ro_task, task_index, task_depends):
103 return "BUILD", {}
104
105 def refresh(self, ro_task):
106 """skip calling VIM to get image, flavor status. Assumes ok"""
107 if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
108 return "FAILED", {}
109
110 return "DONE", {}
111
112 def delete(self, ro_task, task_index):
113 """skip calling VIM to delete image. Assumes ok"""
114 return "DONE", {}
115
116 def exec(self, ro_task, task_index, task_depends):
117 return "DONE", None, None
118
119
120 class VimInteractionNet(VimInteractionBase):
121 def new(self, ro_task, task_index, task_depends):
122 vim_net_id = None
123 task = ro_task["tasks"][task_index]
124 task_id = task["task_id"]
125 created = False
126 created_items = {}
127 target_vim = self.my_vims[ro_task["target_id"]]
128
129 try:
130 # FIND
131 if task.get("find_params"):
132 # if management, get configuration of VIM
133 if task["find_params"].get("filter_dict"):
134 vim_filter = task["find_params"]["filter_dict"]
135 # mamagement network
136 elif task["find_params"].get("mgmt"):
137 if deep_get(
138 self.db_vims[ro_task["target_id"]],
139 "config",
140 "management_network_id",
141 ):
142 vim_filter = {
143 "id": self.db_vims[ro_task["target_id"]]["config"][
144 "management_network_id"
145 ]
146 }
147 elif deep_get(
148 self.db_vims[ro_task["target_id"]],
149 "config",
150 "management_network_name",
151 ):
152 vim_filter = {
153 "name": self.db_vims[ro_task["target_id"]]["config"][
154 "management_network_name"
155 ]
156 }
157 else:
158 vim_filter = {"name": task["find_params"]["name"]}
159 else:
160 raise NsWorkerExceptionNotFound(
161 "Invalid find_params for new_net {}".format(task["find_params"])
162 )
163
164 vim_nets = target_vim.get_network_list(vim_filter)
165 if not vim_nets and not task.get("params"):
166 raise NsWorkerExceptionNotFound(
167 "Network not found with this criteria: '{}'".format(
168 task.get("find_params")
169 )
170 )
171 elif len(vim_nets) > 1:
172 raise NsWorkerException(
173 "More than one network found with this criteria: '{}'".format(
174 task["find_params"]
175 )
176 )
177
178 if vim_nets:
179 vim_net_id = vim_nets[0]["id"]
180 else:
181 # CREATE
182 params = task["params"]
183 vim_net_id, created_items = target_vim.new_network(**params)
184 created = True
185
186 ro_vim_item_update = {
187 "vim_id": vim_net_id,
188 "vim_status": "BUILD",
189 "created": created,
190 "created_items": created_items,
191 "vim_details": None,
192 }
193 self.logger.debug(
194 "task={} {} new-net={} created={}".format(
195 task_id, ro_task["target_id"], vim_net_id, created
196 )
197 )
198
199 return "BUILD", ro_vim_item_update
200 except (vimconn.VimConnException, NsWorkerException) as e:
201 self.logger.error(
202 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
203 )
204 ro_vim_item_update = {
205 "vim_status": "VIM_ERROR",
206 "created": created,
207 "vim_details": str(e),
208 }
209
210 return "FAILED", ro_vim_item_update
211
212 def refresh(self, ro_task):
213 """Call VIM to get network status"""
214 ro_task_id = ro_task["_id"]
215 target_vim = self.my_vims[ro_task["target_id"]]
216 vim_id = ro_task["vim_info"]["vim_id"]
217 net_to_refresh_list = [vim_id]
218
219 try:
220 vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
221 vim_info = vim_dict[vim_id]
222
223 if vim_info["status"] == "ACTIVE":
224 task_status = "DONE"
225 elif vim_info["status"] == "BUILD":
226 task_status = "BUILD"
227 else:
228 task_status = "FAILED"
229 except vimconn.VimConnException as e:
230 # Mark all tasks at VIM_ERROR status
231 self.logger.error(
232 "ro_task={} vim={} get-net={}: {}".format(
233 ro_task_id, ro_task["target_id"], vim_id, e
234 )
235 )
236 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
237 task_status = "FAILED"
238
239 ro_vim_item_update = {}
240 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
241 ro_vim_item_update["vim_status"] = vim_info["status"]
242
243 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
244 ro_vim_item_update["vim_name"] = vim_info.get("name")
245
246 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
247 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
248 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
249 elif vim_info["status"] == "DELETED":
250 ro_vim_item_update["vim_id"] = None
251 ro_vim_item_update["vim_details"] = "Deleted externally"
252 else:
253 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
254 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
255
256 if ro_vim_item_update:
257 self.logger.debug(
258 "ro_task={} {} get-net={}: status={} {}".format(
259 ro_task_id,
260 ro_task["target_id"],
261 vim_id,
262 ro_vim_item_update.get("vim_status"),
263 ro_vim_item_update.get("vim_details")
264 if ro_vim_item_update.get("vim_status") != "ACTIVE"
265 else "",
266 )
267 )
268
269 return task_status, ro_vim_item_update
270
271 def delete(self, ro_task, task_index):
272 task = ro_task["tasks"][task_index]
273 task_id = task["task_id"]
274 net_vim_id = ro_task["vim_info"]["vim_id"]
275 ro_vim_item_update_ok = {
276 "vim_status": "DELETED",
277 "created": False,
278 "vim_details": "DELETED",
279 "vim_id": None,
280 }
281
282 try:
283 if net_vim_id or ro_task["vim_info"]["created_items"]:
284 target_vim = self.my_vims[ro_task["target_id"]]
285 target_vim.delete_network(
286 net_vim_id, ro_task["vim_info"]["created_items"]
287 )
288 except vimconn.VimConnNotFoundException:
289 ro_vim_item_update_ok["vim_details"] = "already deleted"
290 except vimconn.VimConnException as e:
291 self.logger.error(
292 "ro_task={} vim={} del-net={}: {}".format(
293 ro_task["_id"], ro_task["target_id"], net_vim_id, e
294 )
295 )
296 ro_vim_item_update = {
297 "vim_status": "VIM_ERROR",
298 "vim_details": "Error while deleting: {}".format(e),
299 }
300
301 return "FAILED", ro_vim_item_update
302
303 self.logger.debug(
304 "task={} {} del-net={} {}".format(
305 task_id,
306 ro_task["target_id"],
307 net_vim_id,
308 ro_vim_item_update_ok.get("vim_details", ""),
309 )
310 )
311
312 return "DONE", ro_vim_item_update_ok
313
314
315 class VimInteractionVdu(VimInteractionBase):
316 max_retries_inject_ssh_key = 20 # 20 times
317 time_retries_inject_ssh_key = 30 # wevery 30 seconds
318
319 def new(self, ro_task, task_index, task_depends):
320 task = ro_task["tasks"][task_index]
321 task_id = task["task_id"]
322 created = False
323 created_items = {}
324 target_vim = self.my_vims[ro_task["target_id"]]
325
326 try:
327 created = True
328 params = task["params"]
329 params_copy = deepcopy(params)
330 net_list = params_copy["net_list"]
331
332 for net in net_list:
333 # change task_id into network_id
334 if "net_id" in net and net["net_id"].startswith("TASK-"):
335 network_id = task_depends[net["net_id"]]
336
337 if not network_id:
338 raise NsWorkerException(
339 "Cannot create VM because depends on a network not created or found "
340 "for {}".format(net["net_id"])
341 )
342
343 net["net_id"] = network_id
344
345 if params_copy["image_id"].startswith("TASK-"):
346 params_copy["image_id"] = task_depends[params_copy["image_id"]]
347
348 if params_copy["flavor_id"].startswith("TASK-"):
349 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
350
351 vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
352 interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
353
354 ro_vim_item_update = {
355 "vim_id": vim_vm_id,
356 "vim_status": "BUILD",
357 "created": created,
358 "created_items": created_items,
359 "vim_details": None,
360 "interfaces_vim_ids": interfaces,
361 "interfaces": [],
362 }
363 self.logger.debug(
364 "task={} {} new-vm={} created={}".format(
365 task_id, ro_task["target_id"], vim_vm_id, created
366 )
367 )
368
369 return "BUILD", ro_vim_item_update
370 except (vimconn.VimConnException, NsWorkerException) as e:
371 self.logger.error(
372 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
373 )
374 ro_vim_item_update = {
375 "vim_status": "VIM_ERROR",
376 "created": created,
377 "vim_details": str(e),
378 }
379
380 return "FAILED", ro_vim_item_update
381
382 def delete(self, ro_task, task_index):
383 task = ro_task["tasks"][task_index]
384 task_id = task["task_id"]
385 vm_vim_id = ro_task["vim_info"]["vim_id"]
386 ro_vim_item_update_ok = {
387 "vim_status": "DELETED",
388 "created": False,
389 "vim_details": "DELETED",
390 "vim_id": None,
391 }
392
393 try:
394 if vm_vim_id or ro_task["vim_info"]["created_items"]:
395 target_vim = self.my_vims[ro_task["target_id"]]
396 target_vim.delete_vminstance(
397 vm_vim_id, ro_task["vim_info"]["created_items"]
398 )
399 except vimconn.VimConnNotFoundException:
400 ro_vim_item_update_ok["vim_details"] = "already deleted"
401 except vimconn.VimConnException as e:
402 self.logger.error(
403 "ro_task={} vim={} del-vm={}: {}".format(
404 ro_task["_id"], ro_task["target_id"], vm_vim_id, e
405 )
406 )
407 ro_vim_item_update = {
408 "vim_status": "VIM_ERROR",
409 "vim_details": "Error while deleting: {}".format(e),
410 }
411
412 return "FAILED", ro_vim_item_update
413
414 self.logger.debug(
415 "task={} {} del-vm={} {}".format(
416 task_id,
417 ro_task["target_id"],
418 vm_vim_id,
419 ro_vim_item_update_ok.get("vim_details", ""),
420 )
421 )
422
423 return "DONE", ro_vim_item_update_ok
424
425 def refresh(self, ro_task):
426 """Call VIM to get vm status"""
427 ro_task_id = ro_task["_id"]
428 target_vim = self.my_vims[ro_task["target_id"]]
429 vim_id = ro_task["vim_info"]["vim_id"]
430
431 if not vim_id:
432 return None, None
433
434 vm_to_refresh_list = [vim_id]
435 try:
436 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
437 vim_info = vim_dict[vim_id]
438
439 if vim_info["status"] == "ACTIVE":
440 task_status = "DONE"
441 elif vim_info["status"] == "BUILD":
442 task_status = "BUILD"
443 else:
444 task_status = "FAILED"
445
446 # try to load and parse vim_information
447 try:
448 vim_info_info = yaml.safe_load(vim_info["vim_info"])
449 if vim_info_info.get("name"):
450 vim_info["name"] = vim_info_info["name"]
451 except Exception:
452 pass
453 except vimconn.VimConnException as e:
454 # Mark all tasks at VIM_ERROR status
455 self.logger.error(
456 "ro_task={} vim={} get-vm={}: {}".format(
457 ro_task_id, ro_task["target_id"], vim_id, e
458 )
459 )
460 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
461 task_status = "FAILED"
462
463 ro_vim_item_update = {}
464
465 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
466 vim_interfaces = []
467 if vim_info.get("interfaces"):
468 for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
469 iface = next(
470 (
471 iface
472 for iface in vim_info["interfaces"]
473 if vim_iface_id == iface["vim_interface_id"]
474 ),
475 None,
476 )
477 # if iface:
478 # iface.pop("vim_info", None)
479 vim_interfaces.append(iface)
480
481 task_create = next(
482 t
483 for t in ro_task["tasks"]
484 if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
485 )
486 if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
487 vim_interfaces[task_create["mgmt_vnf_interface"]][
488 "mgmt_vnf_interface"
489 ] = True
490
491 mgmt_vdu_iface = task_create.get(
492 "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
493 )
494 if vim_interfaces:
495 vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
496
497 if ro_task["vim_info"]["interfaces"] != vim_interfaces:
498 ro_vim_item_update["interfaces"] = vim_interfaces
499
500 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
501 ro_vim_item_update["vim_status"] = vim_info["status"]
502
503 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
504 ro_vim_item_update["vim_name"] = vim_info.get("name")
505
506 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
507 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
508 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
509 elif vim_info["status"] == "DELETED":
510 ro_vim_item_update["vim_id"] = None
511 ro_vim_item_update["vim_details"] = "Deleted externally"
512 else:
513 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
514 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
515
516 if ro_vim_item_update:
517 self.logger.debug(
518 "ro_task={} {} get-vm={}: status={} {}".format(
519 ro_task_id,
520 ro_task["target_id"],
521 vim_id,
522 ro_vim_item_update.get("vim_status"),
523 ro_vim_item_update.get("vim_details")
524 if ro_vim_item_update.get("vim_status") != "ACTIVE"
525 else "",
526 )
527 )
528
529 return task_status, ro_vim_item_update
530
531 def exec(self, ro_task, task_index, task_depends):
532 task = ro_task["tasks"][task_index]
533 task_id = task["task_id"]
534 target_vim = self.my_vims[ro_task["target_id"]]
535 db_task_update = {"retries": 0}
536 retries = task.get("retries", 0)
537
538 try:
539 params = task["params"]
540 params_copy = deepcopy(params)
541 params_copy["ro_key"] = self.db.decrypt(
542 params_copy.pop("private_key"),
543 params_copy.pop("schema_version"),
544 params_copy.pop("salt"),
545 )
546 params_copy["ip_addr"] = params_copy.pop("ip_address")
547 target_vim.inject_user_key(**params_copy)
548 self.logger.debug(
549 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
550 )
551
552 return (
553 "DONE",
554 None,
555 db_task_update,
556 ) # params_copy["key"]
557 except (vimconn.VimConnException, NsWorkerException) as e:
558 retries += 1
559
560 if retries < self.max_retries_inject_ssh_key:
561 return (
562 "BUILD",
563 None,
564 {
565 "retries": retries,
566 "next_retry": self.time_retries_inject_ssh_key,
567 },
568 )
569
570 self.logger.error(
571 "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
572 )
573 ro_vim_item_update = {"vim_details": str(e)}
574
575 return "FAILED", ro_vim_item_update, db_task_update
576
577
578 class VimInteractionImage(VimInteractionBase):
579 def new(self, ro_task, task_index, task_depends):
580 task = ro_task["tasks"][task_index]
581 task_id = task["task_id"]
582 created = False
583 created_items = {}
584 target_vim = self.my_vims[ro_task["target_id"]]
585
586 try:
587 # FIND
588 if task.get("find_params"):
589 vim_images = target_vim.get_image_list(**task["find_params"])
590
591 if not vim_images:
592 raise NsWorkerExceptionNotFound(
593 "Image not found with this criteria: '{}'".format(
594 task["find_params"]
595 )
596 )
597 elif len(vim_images) > 1:
598 raise NsWorkerException(
599 "More than one network found with this criteria: '{}'".format(
600 task["find_params"]
601 )
602 )
603 else:
604 vim_image_id = vim_images[0]["id"]
605
606 ro_vim_item_update = {
607 "vim_id": vim_image_id,
608 "vim_status": "DONE",
609 "created": created,
610 "created_items": created_items,
611 "vim_details": None,
612 }
613 self.logger.debug(
614 "task={} {} new-image={} created={}".format(
615 task_id, ro_task["target_id"], vim_image_id, created
616 )
617 )
618
619 return "DONE", ro_vim_item_update
620 except (NsWorkerException, vimconn.VimConnException) as e:
621 self.logger.error(
622 "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
623 )
624 ro_vim_item_update = {
625 "vim_status": "VIM_ERROR",
626 "created": created,
627 "vim_details": str(e),
628 }
629
630 return "FAILED", ro_vim_item_update
631
632
633 class VimInteractionFlavor(VimInteractionBase):
634 def delete(self, ro_task, task_index):
635 task = ro_task["tasks"][task_index]
636 task_id = task["task_id"]
637 flavor_vim_id = ro_task["vim_info"]["vim_id"]
638 ro_vim_item_update_ok = {
639 "vim_status": "DELETED",
640 "created": False,
641 "vim_details": "DELETED",
642 "vim_id": None,
643 }
644
645 try:
646 if flavor_vim_id:
647 target_vim = self.my_vims[ro_task["target_id"]]
648 target_vim.delete_flavor(flavor_vim_id)
649 except vimconn.VimConnNotFoundException:
650 ro_vim_item_update_ok["vim_details"] = "already deleted"
651 except vimconn.VimConnException as e:
652 self.logger.error(
653 "ro_task={} vim={} del-flavor={}: {}".format(
654 ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
655 )
656 )
657 ro_vim_item_update = {
658 "vim_status": "VIM_ERROR",
659 "vim_details": "Error while deleting: {}".format(e),
660 }
661
662 return "FAILED", ro_vim_item_update
663
664 self.logger.debug(
665 "task={} {} del-flavor={} {}".format(
666 task_id,
667 ro_task["target_id"],
668 flavor_vim_id,
669 ro_vim_item_update_ok.get("vim_details", ""),
670 )
671 )
672
673 return "DONE", ro_vim_item_update_ok
674
675 def new(self, ro_task, task_index, task_depends):
676 task = ro_task["tasks"][task_index]
677 task_id = task["task_id"]
678 created = False
679 created_items = {}
680 target_vim = self.my_vims[ro_task["target_id"]]
681
682 try:
683 # FIND
684 vim_flavor_id = None
685
686 if task.get("find_params"):
687 try:
688 flavor_data = task["find_params"]["flavor_data"]
689 vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
690 except vimconn.VimConnNotFoundException:
691 pass
692
693 if not vim_flavor_id and task.get("params"):
694 # CREATE
695 flavor_data = task["params"]["flavor_data"]
696 vim_flavor_id = target_vim.new_flavor(flavor_data)
697 created = True
698
699 ro_vim_item_update = {
700 "vim_id": vim_flavor_id,
701 "vim_status": "DONE",
702 "created": created,
703 "created_items": created_items,
704 "vim_details": None,
705 }
706 self.logger.debug(
707 "task={} {} new-flavor={} created={}".format(
708 task_id, ro_task["target_id"], vim_flavor_id, created
709 )
710 )
711
712 return "DONE", ro_vim_item_update
713 except (vimconn.VimConnException, NsWorkerException) as e:
714 self.logger.error(
715 "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
716 )
717 ro_vim_item_update = {
718 "vim_status": "VIM_ERROR",
719 "created": created,
720 "vim_details": str(e),
721 }
722
723 return "FAILED", ro_vim_item_update
724
725
726 class VimInteractionSdnNet(VimInteractionBase):
727 @staticmethod
728 def _match_pci(port_pci, mapping):
729 """
730 Check if port_pci matches with mapping
731 mapping can have brackets to indicate that several chars are accepted. e.g
732 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
733 :param port_pci: text
734 :param mapping: text, can contain brackets to indicate several chars are available
735 :return: True if matches, False otherwise
736 """
737 if not port_pci or not mapping:
738 return False
739 if port_pci == mapping:
740 return True
741
742 mapping_index = 0
743 pci_index = 0
744 while True:
745 bracket_start = mapping.find("[", mapping_index)
746
747 if bracket_start == -1:
748 break
749
750 bracket_end = mapping.find("]", bracket_start)
751 if bracket_end == -1:
752 break
753
754 length = bracket_start - mapping_index
755 if (
756 length
757 and port_pci[pci_index : pci_index + length]
758 != mapping[mapping_index:bracket_start]
759 ):
760 return False
761
762 if (
763 port_pci[pci_index + length]
764 not in mapping[bracket_start + 1 : bracket_end]
765 ):
766 return False
767
768 pci_index += length + 1
769 mapping_index = bracket_end + 1
770
771 if port_pci[pci_index:] != mapping[mapping_index:]:
772 return False
773
774 return True
775
776 def _get_interfaces(self, vlds_to_connect, vim_account_id):
777 """
778 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
779 :param vim_account_id:
780 :return:
781 """
782 interfaces = []
783
784 for vld in vlds_to_connect:
785 table, _, db_id = vld.partition(":")
786 db_id, _, vld = db_id.partition(":")
787 _, _, vld_id = vld.partition(".")
788
789 if table == "vnfrs":
790 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
791 iface_key = "vnf-vld-id"
792 else: # table == "nsrs"
793 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
794 iface_key = "ns-vld-id"
795
796 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
797
798 for db_vnfr in db_vnfrs:
799 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
800 for iface_index, interface in enumerate(vdur["interfaces"]):
801 if interface.get(iface_key) == vld_id and interface.get(
802 "type"
803 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
804 # only SR-IOV o PT
805 interface_ = interface.copy()
806 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
807 db_vnfr["_id"], vdu_index, iface_index
808 )
809
810 if vdur.get("status") == "ERROR":
811 interface_["status"] = "ERROR"
812
813 interfaces.append(interface_)
814
815 return interfaces
816
817 def refresh(self, ro_task):
818 # look for task create
819 task_create_index, _ = next(
820 i_t
821 for i_t in enumerate(ro_task["tasks"])
822 if i_t[1]
823 and i_t[1]["action"] == "CREATE"
824 and i_t[1]["status"] != "FINISHED"
825 )
826
827 return self.new(ro_task, task_create_index, None)
828
829 def new(self, ro_task, task_index, task_depends):
830
831 task = ro_task["tasks"][task_index]
832 task_id = task["task_id"]
833 target_vim = self.my_vims[ro_task["target_id"]]
834
835 sdn_net_id = ro_task["vim_info"]["vim_id"]
836
837 created_items = ro_task["vim_info"].get("created_items")
838 connected_ports = ro_task["vim_info"].get("connected_ports", [])
839 new_connected_ports = []
840 last_update = ro_task["vim_info"].get("last_update", 0)
841 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
842 error_list = []
843 created = ro_task["vim_info"].get("created", False)
844
845 try:
846 # CREATE
847 params = task["params"]
848 vlds_to_connect = params["vlds"]
849 associated_vim = params["target_vim"]
850 # external additional ports
851 additional_ports = params.get("sdn-ports") or ()
852 _, _, vim_account_id = associated_vim.partition(":")
853
854 if associated_vim:
855 # get associated VIM
856 if associated_vim not in self.db_vims:
857 self.db_vims[associated_vim] = self.db.get_one(
858 "vim_accounts", {"_id": vim_account_id}
859 )
860
861 db_vim = self.db_vims[associated_vim]
862
863 # look for ports to connect
864 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
865 # print(ports)
866
867 sdn_ports = []
868 pending_ports = error_ports = 0
869 vlan_used = None
870 sdn_need_update = False
871
872 for port in ports:
873 vlan_used = port.get("vlan") or vlan_used
874
875 # TODO. Do not connect if already done
876 if not port.get("compute_node") or not port.get("pci"):
877 if port.get("status") == "ERROR":
878 error_ports += 1
879 else:
880 pending_ports += 1
881 continue
882
883 pmap = None
884 compute_node_mappings = next(
885 (
886 c
887 for c in db_vim["config"].get("sdn-port-mapping", ())
888 if c and c["compute_node"] == port["compute_node"]
889 ),
890 None,
891 )
892
893 if compute_node_mappings:
894 # process port_mapping pci of type 0000:af:1[01].[1357]
895 pmap = next(
896 (
897 p
898 for p in compute_node_mappings["ports"]
899 if self._match_pci(port["pci"], p.get("pci"))
900 ),
901 None,
902 )
903
904 if not pmap:
905 if not db_vim["config"].get("mapping_not_needed"):
906 error_list.append(
907 "Port mapping not found for compute_node={} pci={}".format(
908 port["compute_node"], port["pci"]
909 )
910 )
911 continue
912
913 pmap = {}
914
915 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
916 new_port = {
917 "service_endpoint_id": pmap.get("service_endpoint_id")
918 or service_endpoint_id,
919 "service_endpoint_encapsulation_type": "dot1q"
920 if port["type"] == "SR-IOV"
921 else None,
922 "service_endpoint_encapsulation_info": {
923 "vlan": port.get("vlan"),
924 "mac": port.get("mac-address"),
925 "device_id": pmap.get("device_id") or port["compute_node"],
926 "device_interface_id": pmap.get("device_interface_id")
927 or port["pci"],
928 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
929 "switch_port": pmap.get("switch_port"),
930 "service_mapping_info": pmap.get("service_mapping_info"),
931 },
932 }
933
934 # TODO
935 # if port["modified_at"] > last_update:
936 # sdn_need_update = True
937 new_connected_ports.append(port["id"]) # TODO
938 sdn_ports.append(new_port)
939
940 if error_ports:
941 error_list.append(
942 "{} interfaces have not been created as VDU is on ERROR status".format(
943 error_ports
944 )
945 )
946
947 # connect external ports
948 for index, additional_port in enumerate(additional_ports):
949 additional_port_id = additional_port.get(
950 "service_endpoint_id"
951 ) or "external-{}".format(index)
952 sdn_ports.append(
953 {
954 "service_endpoint_id": additional_port_id,
955 "service_endpoint_encapsulation_type": additional_port.get(
956 "service_endpoint_encapsulation_type", "dot1q"
957 ),
958 "service_endpoint_encapsulation_info": {
959 "vlan": additional_port.get("vlan") or vlan_used,
960 "mac": additional_port.get("mac_address"),
961 "device_id": additional_port.get("device_id"),
962 "device_interface_id": additional_port.get(
963 "device_interface_id"
964 ),
965 "switch_dpid": additional_port.get("switch_dpid")
966 or additional_port.get("switch_id"),
967 "switch_port": additional_port.get("switch_port"),
968 "service_mapping_info": additional_port.get(
969 "service_mapping_info"
970 ),
971 },
972 }
973 )
974 new_connected_ports.append(additional_port_id)
975 sdn_info = ""
976
977 # if there are more ports to connect or they have been modified, call create/update
978 if error_list:
979 sdn_status = "ERROR"
980 sdn_info = "; ".join(error_list)
981 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
982 last_update = time.time()
983
984 if not sdn_net_id:
985 if len(sdn_ports) < 2:
986 sdn_status = "ACTIVE"
987
988 if not pending_ports:
989 self.logger.debug(
990 "task={} {} new-sdn-net done, less than 2 ports".format(
991 task_id, ro_task["target_id"]
992 )
993 )
994 else:
995 net_type = params.get("type") or "ELAN"
996 (
997 sdn_net_id,
998 created_items,
999 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1000 created = True
1001 self.logger.debug(
1002 "task={} {} new-sdn-net={} created={}".format(
1003 task_id, ro_task["target_id"], sdn_net_id, created
1004 )
1005 )
1006 else:
1007 created_items = target_vim.edit_connectivity_service(
1008 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1009 )
1010 created = True
1011 self.logger.debug(
1012 "task={} {} update-sdn-net={} created={}".format(
1013 task_id, ro_task["target_id"], sdn_net_id, created
1014 )
1015 )
1016
1017 connected_ports = new_connected_ports
1018 elif sdn_net_id:
1019 wim_status_dict = target_vim.get_connectivity_service_status(
1020 sdn_net_id, conn_info=created_items
1021 )
1022 sdn_status = wim_status_dict["sdn_status"]
1023
1024 if wim_status_dict.get("sdn_info"):
1025 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1026
1027 if wim_status_dict.get("error_msg"):
1028 sdn_info = wim_status_dict.get("error_msg") or ""
1029
1030 if pending_ports:
1031 if sdn_status != "ERROR":
1032 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1033 len(ports) - pending_ports, len(ports)
1034 )
1035
1036 if sdn_status == "ACTIVE":
1037 sdn_status = "BUILD"
1038
1039 ro_vim_item_update = {
1040 "vim_id": sdn_net_id,
1041 "vim_status": sdn_status,
1042 "created": created,
1043 "created_items": created_items,
1044 "connected_ports": connected_ports,
1045 "vim_details": sdn_info,
1046 "last_update": last_update,
1047 }
1048
1049 return sdn_status, ro_vim_item_update
1050 except Exception as e:
1051 self.logger.error(
1052 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1053 exc_info=not isinstance(
1054 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1055 ),
1056 )
1057 ro_vim_item_update = {
1058 "vim_status": "VIM_ERROR",
1059 "created": created,
1060 "vim_details": str(e),
1061 }
1062
1063 return "FAILED", ro_vim_item_update
1064
1065 def delete(self, ro_task, task_index):
1066 task = ro_task["tasks"][task_index]
1067 task_id = task["task_id"]
1068 sdn_vim_id = ro_task["vim_info"].get("vim_id")
1069 ro_vim_item_update_ok = {
1070 "vim_status": "DELETED",
1071 "created": False,
1072 "vim_details": "DELETED",
1073 "vim_id": None,
1074 }
1075
1076 try:
1077 if sdn_vim_id:
1078 target_vim = self.my_vims[ro_task["target_id"]]
1079 target_vim.delete_connectivity_service(
1080 sdn_vim_id, ro_task["vim_info"].get("created_items")
1081 )
1082
1083 except Exception as e:
1084 if (
1085 isinstance(e, sdnconn.SdnConnectorError)
1086 and e.http_code == HTTPStatus.NOT_FOUND.value
1087 ):
1088 ro_vim_item_update_ok["vim_details"] = "already deleted"
1089 else:
1090 self.logger.error(
1091 "ro_task={} vim={} del-sdn-net={}: {}".format(
1092 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1093 ),
1094 exc_info=not isinstance(
1095 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1096 ),
1097 )
1098 ro_vim_item_update = {
1099 "vim_status": "VIM_ERROR",
1100 "vim_details": "Error while deleting: {}".format(e),
1101 }
1102
1103 return "FAILED", ro_vim_item_update
1104
1105 self.logger.debug(
1106 "task={} {} del-sdn-net={} {}".format(
1107 task_id,
1108 ro_task["target_id"],
1109 sdn_vim_id,
1110 ro_vim_item_update_ok.get("vim_details", ""),
1111 )
1112 )
1113
1114 return "DONE", ro_vim_item_update_ok
1115
1116
1117 class NsWorker(threading.Thread):
1118 REFRESH_BUILD = 5 # 5 seconds
1119 REFRESH_ACTIVE = 60 # 1 minute
1120 REFRESH_ERROR = 600
1121 REFRESH_IMAGE = 3600 * 10
1122 REFRESH_DELETE = 3600 * 10
1123 QUEUE_SIZE = 100
1124 terminate = False
1125
1126 def __init__(self, worker_index, config, plugins, db):
1127 """
1128
1129 :param worker_index: thread index
1130 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1131 :param plugins: global shared dict with the loaded plugins
1132 :param db: database class instance to use
1133 """
1134 threading.Thread.__init__(self)
1135 self.config = config
1136 self.plugins = plugins
1137 self.plugin_name = "unknown"
1138 self.logger = logging.getLogger("ro.worker{}".format(worker_index))
1139 self.worker_index = worker_index
1140 self.task_queue = queue.Queue(self.QUEUE_SIZE)
1141 # targetvim: vimplugin class
1142 self.my_vims = {}
1143 # targetvim: vim information from database
1144 self.db_vims = {}
1145 # targetvim list
1146 self.vim_targets = []
1147 self.my_id = config["process_id"] + ":" + str(worker_index)
1148 self.db = db
1149 self.item2class = {
1150 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
1151 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
1152 "image": VimInteractionImage(
1153 self.db, self.my_vims, self.db_vims, self.logger
1154 ),
1155 "flavor": VimInteractionFlavor(
1156 self.db, self.my_vims, self.db_vims, self.logger
1157 ),
1158 "sdn_net": VimInteractionSdnNet(
1159 self.db, self.my_vims, self.db_vims, self.logger
1160 ),
1161 }
1162 self.time_last_task_processed = None
1163 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1164 self.tasks_to_delete = []
1165 # it is idle when there are not vim_targets associated
1166 self.idle = True
1167 self.task_locked_time = config["global"]["task_locked_time"]
1168
1169 def insert_task(self, task):
1170 try:
1171 self.task_queue.put(task, False)
1172 return None
1173 except queue.Full:
1174 raise NsWorkerException("timeout inserting a task")
1175
1176 def terminate(self):
1177 self.insert_task("exit")
1178
1179 def del_task(self, task):
1180 with self.task_lock:
1181 if task["status"] == "SCHEDULED":
1182 task["status"] = "SUPERSEDED"
1183 return True
1184 else: # task["status"] == "processing"
1185 self.task_lock.release()
1186 return False
1187
1188 def _process_vim_config(self, target_id, db_vim):
1189 """
1190 Process vim config, creating vim configuration files as ca_cert
1191 :param target_id: vim/sdn/wim + id
1192 :param db_vim: Vim dictionary obtained from database
1193 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1194 """
1195 if not db_vim.get("config"):
1196 return
1197
1198 file_name = ""
1199
1200 try:
1201 if db_vim["config"].get("ca_cert_content"):
1202 file_name = "{}:{}".format(target_id, self.worker_index)
1203
1204 try:
1205 mkdir(file_name)
1206 except FileExistsError:
1207 pass
1208
1209 file_name = file_name + "/ca_cert"
1210
1211 with open(file_name, "w") as f:
1212 f.write(db_vim["config"]["ca_cert_content"])
1213 del db_vim["config"]["ca_cert_content"]
1214 db_vim["config"]["ca_cert"] = file_name
1215 except Exception as e:
1216 raise NsWorkerException(
1217 "Error writing to file '{}': {}".format(file_name, e)
1218 )
1219
1220 def _load_plugin(self, name, type="vim"):
1221 # type can be vim or sdn
1222 if "rovim_dummy" not in self.plugins:
1223 self.plugins["rovim_dummy"] = VimDummyConnector
1224
1225 if "rosdn_dummy" not in self.plugins:
1226 self.plugins["rosdn_dummy"] = SdnDummyConnector
1227
1228 if name in self.plugins:
1229 return self.plugins[name]
1230
1231 try:
1232 for v in iter_entry_points("osm_ro{}.plugins".format(type), name):
1233 self.plugins[name] = v.load()
1234 except Exception as e:
1235 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1236
1237 if name and name not in self.plugins:
1238 raise NsWorkerException(
1239 "Plugin 'osm_{n}' has not been installed".format(n=name)
1240 )
1241
1242 return self.plugins[name]
1243
1244 def _unload_vim(self, target_id):
1245 """
1246 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1247 :param target_id: Contains type:_id; where type can be 'vim', ...
1248 :return: None.
1249 """
1250 try:
1251 self.db_vims.pop(target_id, None)
1252 self.my_vims.pop(target_id, None)
1253
1254 if target_id in self.vim_targets:
1255 self.vim_targets.remove(target_id)
1256
1257 self.logger.info("Unloaded {}".format(target_id))
1258 rmtree("{}:{}".format(target_id, self.worker_index))
1259 except FileNotFoundError:
1260 pass # this is raised by rmtree if folder does not exist
1261 except Exception as e:
1262 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1263
1264 def _check_vim(self, target_id):
1265 """
1266 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1267 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1268 :return: None.
1269 """
1270 target, _, _id = target_id.partition(":")
1271 now = time.time()
1272 update_dict = {}
1273 unset_dict = {}
1274 op_text = ""
1275 step = ""
1276 loaded = target_id in self.vim_targets
1277 target_database = (
1278 "vim_accounts"
1279 if target == "vim"
1280 else "wim_accounts"
1281 if target == "wim"
1282 else "sdns"
1283 )
1284
1285 try:
1286 step = "Getting {} from db".format(target_id)
1287 db_vim = self.db.get_one(target_database, {"_id": _id})
1288
1289 for op_index, operation in enumerate(
1290 db_vim["_admin"].get("operations", ())
1291 ):
1292 if operation["operationState"] != "PROCESSING":
1293 continue
1294
1295 locked_at = operation.get("locked_at")
1296
1297 if locked_at is not None and locked_at >= now - self.task_locked_time:
1298 # some other thread is doing this operation
1299 return
1300
1301 # lock
1302 op_text = "_admin.operations.{}.".format(op_index)
1303
1304 if not self.db.set_one(
1305 target_database,
1306 q_filter={
1307 "_id": _id,
1308 op_text + "operationState": "PROCESSING",
1309 op_text + "locked_at": locked_at,
1310 },
1311 update_dict={
1312 op_text + "locked_at": now,
1313 "admin.current_operation": op_index,
1314 },
1315 fail_on_empty=False,
1316 ):
1317 return
1318
1319 unset_dict[op_text + "locked_at"] = None
1320 unset_dict["current_operation"] = None
1321 step = "Loading " + target_id
1322 error_text = self._load_vim(target_id)
1323
1324 if not error_text:
1325 step = "Checking connectivity"
1326
1327 if target == "vim":
1328 self.my_vims[target_id].check_vim_connectivity()
1329 else:
1330 self.my_vims[target_id].check_credentials()
1331
1332 update_dict["_admin.operationalState"] = "ENABLED"
1333 update_dict["_admin.detailed-status"] = ""
1334 unset_dict[op_text + "detailed-status"] = None
1335 update_dict[op_text + "operationState"] = "COMPLETED"
1336
1337 return
1338
1339 except Exception as e:
1340 error_text = "{}: {}".format(step, e)
1341 self.logger.error("{} for {}: {}".format(step, target_id, e))
1342
1343 finally:
1344 if update_dict or unset_dict:
1345 if error_text:
1346 update_dict[op_text + "operationState"] = "FAILED"
1347 update_dict[op_text + "detailed-status"] = error_text
1348 unset_dict.pop(op_text + "detailed-status", None)
1349 update_dict["_admin.operationalState"] = "ERROR"
1350 update_dict["_admin.detailed-status"] = error_text
1351
1352 if op_text:
1353 update_dict[op_text + "statusEnteredTime"] = now
1354
1355 self.db.set_one(
1356 target_database,
1357 q_filter={"_id": _id},
1358 update_dict=update_dict,
1359 unset=unset_dict,
1360 fail_on_empty=False,
1361 )
1362
1363 if not loaded:
1364 self._unload_vim(target_id)
1365
1366 def _reload_vim(self, target_id):
1367 if target_id in self.vim_targets:
1368 self._load_vim(target_id)
1369 else:
1370 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1371 # just remove it to force load again next time it is needed
1372 self.db_vims.pop(target_id, None)
1373
1374 def _load_vim(self, target_id):
1375 """
1376 Load or reload a vim_account, sdn_controller or wim_account.
1377 Read content from database, load the plugin if not loaded.
1378 In case of error loading the plugin, it load a failing VIM_connector
1379 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1380 :param target_id: Contains type:_id; where type can be 'vim', ...
1381 :return: None if ok, descriptive text if error
1382 """
1383 target, _, _id = target_id.partition(":")
1384 target_database = (
1385 "vim_accounts"
1386 if target == "vim"
1387 else "wim_accounts"
1388 if target == "wim"
1389 else "sdns"
1390 )
1391 plugin_name = ""
1392 vim = None
1393
1394 try:
1395 step = "Getting {}={} from db".format(target, _id)
1396 # TODO process for wim, sdnc, ...
1397 vim = self.db.get_one(target_database, {"_id": _id})
1398
1399 # if deep_get(vim, "config", "sdn-controller"):
1400 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1401 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1402
1403 step = "Decrypting password"
1404 schema_version = vim.get("schema_version")
1405 self.db.encrypt_decrypt_fields(
1406 vim,
1407 "decrypt",
1408 fields=("password", "secret"),
1409 schema_version=schema_version,
1410 salt=_id,
1411 )
1412 self._process_vim_config(target_id, vim)
1413
1414 if target == "vim":
1415 plugin_name = "rovim_" + vim["vim_type"]
1416 step = "Loading plugin '{}'".format(plugin_name)
1417 vim_module_conn = self._load_plugin(plugin_name)
1418 step = "Loading {}'".format(target_id)
1419 self.my_vims[target_id] = vim_module_conn(
1420 uuid=vim["_id"],
1421 name=vim["name"],
1422 tenant_id=vim.get("vim_tenant_id"),
1423 tenant_name=vim.get("vim_tenant_name"),
1424 url=vim["vim_url"],
1425 url_admin=None,
1426 user=vim["vim_user"],
1427 passwd=vim["vim_password"],
1428 config=vim.get("config") or {},
1429 persistent_info={},
1430 )
1431 else: # sdn
1432 plugin_name = "rosdn_" + vim["type"]
1433 step = "Loading plugin '{}'".format(plugin_name)
1434 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1435 step = "Loading {}'".format(target_id)
1436 wim = deepcopy(vim)
1437 wim_config = wim.pop("config", {}) or {}
1438 wim["uuid"] = wim["_id"]
1439 wim["wim_url"] = wim["url"]
1440
1441 if wim.get("dpid"):
1442 wim_config["dpid"] = wim.pop("dpid")
1443
1444 if wim.get("switch_id"):
1445 wim_config["switch_id"] = wim.pop("switch_id")
1446
1447 # wim, wim_account, config
1448 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1449 self.db_vims[target_id] = vim
1450 self.error_status = None
1451
1452 self.logger.info(
1453 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1454 )
1455 except Exception as e:
1456 self.logger.error(
1457 "Cannot load {} plugin={}: {} {}".format(
1458 target_id, plugin_name, step, e
1459 )
1460 )
1461
1462 self.db_vims[target_id] = vim or {}
1463 self.db_vims[target_id] = FailingConnector(str(e))
1464 error_status = "{} Error: {}".format(step, e)
1465
1466 return error_status
1467 finally:
1468 if target_id not in self.vim_targets:
1469 self.vim_targets.append(target_id)
1470
1471 def _get_db_task(self):
1472 """
1473 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1474 :return: None
1475 """
1476 now = time.time()
1477
1478 if not self.time_last_task_processed:
1479 self.time_last_task_processed = now
1480
1481 try:
1482 while True:
1483 locked = self.db.set_one(
1484 "ro_tasks",
1485 q_filter={
1486 "target_id": self.vim_targets,
1487 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1488 "locked_at.lt": now - self.task_locked_time,
1489 "to_check_at.lt": self.time_last_task_processed,
1490 },
1491 update_dict={"locked_by": self.my_id, "locked_at": now},
1492 fail_on_empty=False,
1493 )
1494
1495 if locked:
1496 # read and return
1497 ro_task = self.db.get_one(
1498 "ro_tasks",
1499 q_filter={
1500 "target_id": self.vim_targets,
1501 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1502 "locked_at": now,
1503 },
1504 )
1505 return ro_task
1506
1507 if self.time_last_task_processed == now:
1508 self.time_last_task_processed = None
1509 return None
1510 else:
1511 self.time_last_task_processed = now
1512 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1513
1514 except DbException as e:
1515 self.logger.error("Database exception at _get_db_task: {}".format(e))
1516 except Exception as e:
1517 self.logger.critical(
1518 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1519 )
1520
1521 return None
1522
1523 def _delete_task(self, ro_task, task_index, task_depends, db_update):
1524 """
1525 Determine if this task need to be done or superseded
1526 :return: None
1527 """
1528 my_task = ro_task["tasks"][task_index]
1529 task_id = my_task["task_id"]
1530 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
1531 "created_items", False
1532 )
1533
1534 if my_task["status"] == "FAILED":
1535 return None, None # TODO need to be retry??
1536
1537 try:
1538 for index, task in enumerate(ro_task["tasks"]):
1539 if index == task_index or not task:
1540 continue # own task
1541
1542 if (
1543 my_task["target_record"] == task["target_record"]
1544 and task["action"] == "CREATE"
1545 ):
1546 # set to finished
1547 db_update["tasks.{}.status".format(index)] = task[
1548 "status"
1549 ] = "FINISHED"
1550 elif task["action"] == "CREATE" and task["status"] not in (
1551 "FINISHED",
1552 "SUPERSEDED",
1553 ):
1554 needed_delete = False
1555
1556 if needed_delete:
1557 return self.item2class[my_task["item"]].delete(ro_task, task_index)
1558 else:
1559 return "SUPERSEDED", None
1560 except Exception as e:
1561 if not isinstance(e, NsWorkerException):
1562 self.logger.critical(
1563 "Unexpected exception at _delete_task task={}: {}".format(
1564 task_id, e
1565 ),
1566 exc_info=True,
1567 )
1568
1569 return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1570
1571 def _create_task(self, ro_task, task_index, task_depends, db_update):
1572 """
1573 Determine if this task need to create something at VIM
1574 :return: None
1575 """
1576 my_task = ro_task["tasks"][task_index]
1577 task_id = my_task["task_id"]
1578 task_status = None
1579
1580 if my_task["status"] == "FAILED":
1581 return None, None # TODO need to be retry??
1582 elif my_task["status"] == "SCHEDULED":
1583 # check if already created by another task
1584 for index, task in enumerate(ro_task["tasks"]):
1585 if index == task_index or not task:
1586 continue # own task
1587
1588 if task["action"] == "CREATE" and task["status"] not in (
1589 "SCHEDULED",
1590 "FINISHED",
1591 "SUPERSEDED",
1592 ):
1593 return task["status"], "COPY_VIM_INFO"
1594
1595 try:
1596 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
1597 ro_task, task_index, task_depends
1598 )
1599 # TODO update other CREATE tasks
1600 except Exception as e:
1601 if not isinstance(e, NsWorkerException):
1602 self.logger.error(
1603 "Error executing task={}: {}".format(task_id, e), exc_info=True
1604 )
1605
1606 task_status = "FAILED"
1607 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1608 # TODO update ro_vim_item_update
1609
1610 return task_status, ro_vim_item_update
1611 else:
1612 return None, None
1613
1614 def _get_dependency(self, task_id, ro_task=None, target_id=None):
1615 """
1616 Look for dependency task
1617 :param task_id: Can be one of
1618 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1619 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1620 3. task.task_id: "<action_id>:number"
1621 :param ro_task:
1622 :param target_id:
1623 :return: database ro_task plus index of task
1624 """
1625 if (
1626 task_id.startswith("vim:")
1627 or task_id.startswith("sdn:")
1628 or task_id.startswith("wim:")
1629 ):
1630 target_id, _, task_id = task_id.partition(" ")
1631
1632 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
1633 ro_task_dependency = self.db.get_one(
1634 "ro_tasks",
1635 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
1636 fail_on_empty=False,
1637 )
1638
1639 if ro_task_dependency:
1640 for task_index, task in enumerate(ro_task_dependency["tasks"]):
1641 if task["target_record_id"] == task_id:
1642 return ro_task_dependency, task_index
1643
1644 else:
1645 if ro_task:
1646 for task_index, task in enumerate(ro_task["tasks"]):
1647 if task and task["task_id"] == task_id:
1648 return ro_task, task_index
1649
1650 ro_task_dependency = self.db.get_one(
1651 "ro_tasks",
1652 q_filter={
1653 "tasks.ANYINDEX.task_id": task_id,
1654 "tasks.ANYINDEX.target_record.ne": None,
1655 },
1656 fail_on_empty=False,
1657 )
1658
1659 if ro_task_dependency:
1660 for task_index, task in ro_task_dependency["tasks"]:
1661 if task["task_id"] == task_id:
1662 return ro_task_dependency, task_index
1663 raise NsWorkerException("Cannot get depending task {}".format(task_id))
1664
1665 def _process_pending_tasks(self, ro_task):
1666 ro_task_id = ro_task["_id"]
1667 now = time.time()
1668 # one day
1669 next_check_at = now + (24 * 60 * 60)
1670 db_ro_task_update = {}
1671
1672 def _update_refresh(new_status):
1673 # compute next_refresh
1674 nonlocal task
1675 nonlocal next_check_at
1676 nonlocal db_ro_task_update
1677 nonlocal ro_task
1678
1679 next_refresh = time.time()
1680
1681 if task["item"] in ("image", "flavor"):
1682 next_refresh += self.REFRESH_IMAGE
1683 elif new_status == "BUILD":
1684 next_refresh += self.REFRESH_BUILD
1685 elif new_status == "DONE":
1686 next_refresh += self.REFRESH_ACTIVE
1687 else:
1688 next_refresh += self.REFRESH_ERROR
1689
1690 next_check_at = min(next_check_at, next_refresh)
1691 db_ro_task_update["vim_info.refresh_at"] = next_refresh
1692 ro_task["vim_info"]["refresh_at"] = next_refresh
1693
1694 try:
1695 # 0: get task_status_create
1696 lock_object = None
1697 task_status_create = None
1698 task_create = next(
1699 (
1700 t
1701 for t in ro_task["tasks"]
1702 if t
1703 and t["action"] == "CREATE"
1704 and t["status"] in ("BUILD", "DONE")
1705 ),
1706 None,
1707 )
1708
1709 if task_create:
1710 task_status_create = task_create["status"]
1711
1712 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
1713 for task_action in ("DELETE", "CREATE", "EXEC"):
1714 db_vim_update = None
1715 new_status = None
1716
1717 for task_index, task in enumerate(ro_task["tasks"]):
1718 if not task:
1719 continue # task deleted
1720
1721 task_depends = {}
1722 target_update = None
1723
1724 if (
1725 (
1726 task_action in ("DELETE", "EXEC")
1727 and task["status"] not in ("SCHEDULED", "BUILD")
1728 )
1729 or task["action"] != task_action
1730 or (
1731 task_action == "CREATE"
1732 and task["status"] in ("FINISHED", "SUPERSEDED")
1733 )
1734 ):
1735 continue
1736
1737 task_path = "tasks.{}.status".format(task_index)
1738 try:
1739 db_vim_info_update = None
1740
1741 if task["status"] == "SCHEDULED":
1742 # check if tasks that this depends on have been completed
1743 dependency_not_completed = False
1744
1745 for dependency_task_id in task.get("depends_on") or ():
1746 (
1747 dependency_ro_task,
1748 dependency_task_index,
1749 ) = self._get_dependency(
1750 dependency_task_id, target_id=ro_task["target_id"]
1751 )
1752 dependency_task = dependency_ro_task["tasks"][
1753 dependency_task_index
1754 ]
1755
1756 if dependency_task["status"] == "SCHEDULED":
1757 dependency_not_completed = True
1758 next_check_at = min(
1759 next_check_at, dependency_ro_task["to_check_at"]
1760 )
1761 # must allow dependent task to be processed first
1762 # to do this set time after last_task_processed
1763 next_check_at = max(
1764 self.time_last_task_processed, next_check_at
1765 )
1766 break
1767 elif dependency_task["status"] == "FAILED":
1768 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
1769 task["action"],
1770 task["item"],
1771 dependency_task["action"],
1772 dependency_task["item"],
1773 dependency_task_id,
1774 dependency_ro_task["vim_info"].get(
1775 "vim_details"
1776 ),
1777 )
1778 self.logger.error(
1779 "task={} {}".format(task["task_id"], error_text)
1780 )
1781 raise NsWorkerException(error_text)
1782
1783 task_depends[dependency_task_id] = dependency_ro_task[
1784 "vim_info"
1785 ]["vim_id"]
1786 task_depends[
1787 "TASK-{}".format(dependency_task_id)
1788 ] = dependency_ro_task["vim_info"]["vim_id"]
1789
1790 if dependency_not_completed:
1791 # TODO set at vim_info.vim_details that it is waiting
1792 continue
1793
1794 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
1795 # the task of renew this locking. It will update database locket_at periodically
1796 if not lock_object:
1797 lock_object = LockRenew.add_lock_object(
1798 "ro_tasks", ro_task, self
1799 )
1800
1801 if task["action"] == "DELETE":
1802 (new_status, db_vim_info_update,) = self._delete_task(
1803 ro_task, task_index, task_depends, db_ro_task_update
1804 )
1805 new_status = (
1806 "FINISHED" if new_status == "DONE" else new_status
1807 )
1808 # ^with FINISHED instead of DONE it will not be refreshing
1809
1810 if new_status in ("FINISHED", "SUPERSEDED"):
1811 target_update = "DELETE"
1812 elif task["action"] == "EXEC":
1813 (
1814 new_status,
1815 db_vim_info_update,
1816 db_task_update,
1817 ) = self.item2class[task["item"]].exec(
1818 ro_task, task_index, task_depends
1819 )
1820 new_status = (
1821 "FINISHED" if new_status == "DONE" else new_status
1822 )
1823 # ^with FINISHED instead of DONE it will not be refreshing
1824
1825 if db_task_update:
1826 # load into database the modified db_task_update "retries" and "next_retry"
1827 if db_task_update.get("retries"):
1828 db_ro_task_update[
1829 "tasks.{}.retries".format(task_index)
1830 ] = db_task_update["retries"]
1831
1832 next_check_at = time.time() + db_task_update.get(
1833 "next_retry", 60
1834 )
1835 target_update = None
1836 elif task["action"] == "CREATE":
1837 if task["status"] == "SCHEDULED":
1838 if task_status_create:
1839 new_status = task_status_create
1840 target_update = "COPY_VIM_INFO"
1841 else:
1842 new_status, db_vim_info_update = self.item2class[
1843 task["item"]
1844 ].new(ro_task, task_index, task_depends)
1845 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
1846 _update_refresh(new_status)
1847 else:
1848 if (
1849 ro_task["vim_info"]["refresh_at"]
1850 and now > ro_task["vim_info"]["refresh_at"]
1851 ):
1852 new_status, db_vim_info_update = self.item2class[
1853 task["item"]
1854 ].refresh(ro_task)
1855 _update_refresh(new_status)
1856
1857 except Exception as e:
1858 new_status = "FAILED"
1859 db_vim_info_update = {
1860 "vim_status": "VIM_ERROR",
1861 "vim_details": str(e),
1862 }
1863
1864 if not isinstance(
1865 e, (NsWorkerException, vimconn.VimConnException)
1866 ):
1867 self.logger.error(
1868 "Unexpected exception at _delete_task task={}: {}".format(
1869 task["task_id"], e
1870 ),
1871 exc_info=True,
1872 )
1873
1874 try:
1875 if db_vim_info_update:
1876 db_vim_update = db_vim_info_update.copy()
1877 db_ro_task_update.update(
1878 {
1879 "vim_info." + k: v
1880 for k, v in db_vim_info_update.items()
1881 }
1882 )
1883 ro_task["vim_info"].update(db_vim_info_update)
1884
1885 if new_status:
1886 if task_action == "CREATE":
1887 task_status_create = new_status
1888 db_ro_task_update[task_path] = new_status
1889
1890 if target_update or db_vim_update:
1891 if target_update == "DELETE":
1892 self._update_target(task, None)
1893 elif target_update == "COPY_VIM_INFO":
1894 self._update_target(task, ro_task["vim_info"])
1895 else:
1896 self._update_target(task, db_vim_update)
1897
1898 except Exception as e:
1899 if (
1900 isinstance(e, DbException)
1901 and e.http_code == HTTPStatus.NOT_FOUND
1902 ):
1903 # if the vnfrs or nsrs has been removed from database, this task must be removed
1904 self.logger.debug(
1905 "marking to delete task={}".format(task["task_id"])
1906 )
1907 self.tasks_to_delete.append(task)
1908 else:
1909 self.logger.error(
1910 "Unexpected exception at _update_target task={}: {}".format(
1911 task["task_id"], e
1912 ),
1913 exc_info=True,
1914 )
1915
1916 locked_at = ro_task["locked_at"]
1917
1918 if lock_object:
1919 locked_at = [
1920 lock_object["locked_at"],
1921 lock_object["locked_at"] + self.task_locked_time,
1922 ]
1923 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
1924 # contain exactly locked_at + self.task_locked_time
1925 LockRenew.remove_lock_object(lock_object)
1926
1927 q_filter = {
1928 "_id": ro_task["_id"],
1929 "to_check_at": ro_task["to_check_at"],
1930 "locked_at": locked_at,
1931 }
1932 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
1933 # outside this task (by ro_nbi) do not update it
1934 db_ro_task_update["locked_by"] = None
1935 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
1936 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
1937 db_ro_task_update["modified_at"] = now
1938 db_ro_task_update["to_check_at"] = next_check_at
1939
1940 if not self.db.set_one(
1941 "ro_tasks",
1942 update_dict=db_ro_task_update,
1943 q_filter=q_filter,
1944 fail_on_empty=False,
1945 ):
1946 del db_ro_task_update["to_check_at"]
1947 del q_filter["to_check_at"]
1948 self.db.set_one(
1949 "ro_tasks",
1950 q_filter=q_filter,
1951 update_dict=db_ro_task_update,
1952 fail_on_empty=True,
1953 )
1954 except DbException as e:
1955 self.logger.error(
1956 "ro_task={} Error updating database {}".format(ro_task_id, e)
1957 )
1958 except Exception as e:
1959 self.logger.error(
1960 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
1961 )
1962
1963 def _update_target(self, task, ro_vim_item_update):
1964 table, _, temp = task["target_record"].partition(":")
1965 _id, _, path_vim_status = temp.partition(":")
1966 path_item = path_vim_status[: path_vim_status.rfind(".")]
1967 path_item = path_item[: path_item.rfind(".")]
1968 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
1969 # path_item: dot separated list targeting record information, e.g. "vdur.10"
1970
1971 if ro_vim_item_update:
1972 update_dict = {
1973 path_vim_status + "." + k: v
1974 for k, v in ro_vim_item_update.items()
1975 if k
1976 in ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces")
1977 }
1978
1979 if path_vim_status.startswith("vdur."):
1980 # for backward compatibility, add vdur.name apart from vdur.vim_name
1981 if ro_vim_item_update.get("vim_name"):
1982 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
1983
1984 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
1985 if ro_vim_item_update.get("vim_id"):
1986 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
1987
1988 # update general status
1989 if ro_vim_item_update.get("vim_status"):
1990 update_dict[path_item + ".status"] = ro_vim_item_update[
1991 "vim_status"
1992 ]
1993
1994 if ro_vim_item_update.get("interfaces"):
1995 path_interfaces = path_item + ".interfaces"
1996
1997 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
1998 if iface:
1999 update_dict.update(
2000 {
2001 path_interfaces + ".{}.".format(i) + k: v
2002 for k, v in iface.items()
2003 if k in ("vlan", "compute_node", "pci")
2004 }
2005 )
2006
2007 # put ip_address and mac_address with ip-address and mac-address
2008 if iface.get("ip_address"):
2009 update_dict[
2010 path_interfaces + ".{}.".format(i) + "ip-address"
2011 ] = iface["ip_address"]
2012
2013 if iface.get("mac_address"):
2014 update_dict[
2015 path_interfaces + ".{}.".format(i) + "mac-address"
2016 ] = iface["mac_address"]
2017
2018 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2019 update_dict["ip-address"] = iface.get("ip_address").split(
2020 ";"
2021 )[0]
2022
2023 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2024 update_dict[path_item + ".ip-address"] = iface.get(
2025 "ip_address"
2026 ).split(";")[0]
2027
2028 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2029 else:
2030 update_dict = {path_item + ".status": "DELETED"}
2031 self.db.set_one(
2032 table,
2033 q_filter={"_id": _id},
2034 update_dict=update_dict,
2035 unset={path_vim_status: None},
2036 )
2037
2038 def _process_delete_db_tasks(self):
2039 """
2040 Delete task from database because vnfrs or nsrs or both have been deleted
2041 :return: None. Uses and modify self.tasks_to_delete
2042 """
2043 while self.tasks_to_delete:
2044 task = self.tasks_to_delete[0]
2045 vnfrs_deleted = None
2046 nsr_id = task["nsr_id"]
2047
2048 if task["target_record"].startswith("vnfrs:"):
2049 # check if nsrs is present
2050 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2051 vnfrs_deleted = task["target_record"].split(":")[1]
2052
2053 try:
2054 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2055 except Exception as e:
2056 self.logger.error(
2057 "Error deleting task={}: {}".format(task["task_id"], e)
2058 )
2059 self.tasks_to_delete.pop(0)
2060
2061 @staticmethod
2062 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2063 """
2064 Static method because it is called from osm_ng_ro.ns
2065 :param db: instance of database to use
2066 :param nsr_id: affected nsrs id
2067 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2068 :return: None, exception is fails
2069 """
2070 retries = 5
2071 for retry in range(retries):
2072 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2073 now = time.time()
2074 conflict = False
2075
2076 for ro_task in ro_tasks:
2077 db_update = {}
2078 to_delete_ro_task = True
2079
2080 for index, task in enumerate(ro_task["tasks"]):
2081 if not task:
2082 pass
2083 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2084 vnfrs_deleted
2085 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2086 ):
2087 db_update["tasks.{}".format(index)] = None
2088 else:
2089 # used by other nsr, ro_task cannot be deleted
2090 to_delete_ro_task = False
2091
2092 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2093 if to_delete_ro_task:
2094 if not db.del_one(
2095 "ro_tasks",
2096 q_filter={
2097 "_id": ro_task["_id"],
2098 "modified_at": ro_task["modified_at"],
2099 },
2100 fail_on_empty=False,
2101 ):
2102 conflict = True
2103 elif db_update:
2104 db_update["modified_at"] = now
2105 if not db.set_one(
2106 "ro_tasks",
2107 q_filter={
2108 "_id": ro_task["_id"],
2109 "modified_at": ro_task["modified_at"],
2110 },
2111 update_dict=db_update,
2112 fail_on_empty=False,
2113 ):
2114 conflict = True
2115 if not conflict:
2116 return
2117 else:
2118 raise NsWorkerException("Exceeded {} retries".format(retries))
2119
2120 def run(self):
2121 # load database
2122 self.logger.info("Starting")
2123 while True:
2124 # step 1: get commands from queue
2125 try:
2126 if self.vim_targets:
2127 task = self.task_queue.get(block=False)
2128 else:
2129 if not self.idle:
2130 self.logger.debug("enters in idle state")
2131 self.idle = True
2132 task = self.task_queue.get(block=True)
2133 self.idle = False
2134
2135 if task[0] == "terminate":
2136 break
2137 elif task[0] == "load_vim":
2138 self.logger.info("order to load vim {}".format(task[1]))
2139 self._load_vim(task[1])
2140 elif task[0] == "unload_vim":
2141 self.logger.info("order to unload vim {}".format(task[1]))
2142 self._unload_vim(task[1])
2143 elif task[0] == "reload_vim":
2144 self._reload_vim(task[1])
2145 elif task[0] == "check_vim":
2146 self.logger.info("order to check vim {}".format(task[1]))
2147 self._check_vim(task[1])
2148 continue
2149 except Exception as e:
2150 if isinstance(e, queue.Empty):
2151 pass
2152 else:
2153 self.logger.critical(
2154 "Error processing task: {}".format(e), exc_info=True
2155 )
2156
2157 # step 2: process pending_tasks, delete not needed tasks
2158 try:
2159 if self.tasks_to_delete:
2160 self._process_delete_db_tasks()
2161 busy = False
2162 ro_task = self._get_db_task()
2163 if ro_task:
2164 self._process_pending_tasks(ro_task)
2165 busy = True
2166 if not busy:
2167 time.sleep(5)
2168 except Exception as e:
2169 self.logger.critical(
2170 "Unexpected exception at run: " + str(e), exc_info=True
2171 )
2172
2173 self.logger.info("Finishing")