7656a5448511c7eb28633cfd1227039a943786b8
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 import traceback
36 from typing import Dict
37 from unittest.mock import Mock
38
39 from importlib_metadata import entry_points
40 from osm_common.dbbase import DbException
41 from osm_ng_ro.vim_admin import LockRenew
42 from osm_ro_plugin import sdnconn, vimconn
43 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
44 from osm_ro_plugin.vim_dummy import VimDummyConnector
45 import yaml
46
47 __author__ = "Alfonso Tierno"
48 __date__ = "$28-Sep-2017 12:07:15$"
49
50
51 def deep_get(target_dict, *args, **kwargs):
52 """
53 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
54 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
55 :param target_dict: dictionary to be read
56 :param args: list of keys to read from target_dict
57 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
58 :return: The wanted value if exist, None or default otherwise
59 """
60 for key in args:
61 if not isinstance(target_dict, dict) or key not in target_dict:
62 return kwargs.get("default")
63 target_dict = target_dict[key]
64 return target_dict
65
66
67 class NsWorkerException(Exception):
68 pass
69
70
71 class FailingConnector:
72 def __init__(self, error_msg):
73 self.error_msg = error_msg
74
75 for method in dir(vimconn.VimConnector):
76 if method[0] != "_":
77 setattr(
78 self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
79 )
80
81 for method in dir(sdnconn.SdnConnectorBase):
82 if method[0] != "_":
83 setattr(
84 self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
85 )
86
87
88 class NsWorkerExceptionNotFound(NsWorkerException):
89 pass
90
91
92 class VimInteractionBase:
93 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
94 It implements methods that does nothing and return ok"""
95
96 def __init__(self, db, my_vims, db_vims, logger):
97 self.db = db
98 self.logger = logger
99 self.my_vims = my_vims
100 self.db_vims = db_vims
101
102 def new(self, ro_task, task_index, task_depends):
103 return "BUILD", {}
104
105 def refresh(self, ro_task):
106 """skip calling VIM to get image, flavor status. Assumes ok"""
107 if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
108 return "FAILED", {}
109
110 return "DONE", {}
111
112 def delete(self, ro_task, task_index):
113 """skip calling VIM to delete image. Assumes ok"""
114 return "DONE", {}
115
116 def exec(self, ro_task, task_index, task_depends):
117 return "DONE", None, None
118
119
120 class VimInteractionNet(VimInteractionBase):
121 def new(self, ro_task, task_index, task_depends):
122 vim_net_id = None
123 task = ro_task["tasks"][task_index]
124 task_id = task["task_id"]
125 created = False
126 created_items = {}
127 target_vim = self.my_vims[ro_task["target_id"]]
128 mgmtnet = False
129 mgmtnet_defined_in_vim = False
130
131 try:
132 # FIND
133 if task.get("find_params"):
134 # if management, get configuration of VIM
135 if task["find_params"].get("filter_dict"):
136 vim_filter = task["find_params"]["filter_dict"]
137 # management network
138 elif task["find_params"].get("mgmt"):
139 mgmtnet = True
140 if deep_get(
141 self.db_vims[ro_task["target_id"]],
142 "config",
143 "management_network_id",
144 ):
145 mgmtnet_defined_in_vim = True
146 vim_filter = {
147 "id": self.db_vims[ro_task["target_id"]]["config"][
148 "management_network_id"
149 ]
150 }
151 elif deep_get(
152 self.db_vims[ro_task["target_id"]],
153 "config",
154 "management_network_name",
155 ):
156 mgmtnet_defined_in_vim = True
157 vim_filter = {
158 "name": self.db_vims[ro_task["target_id"]]["config"][
159 "management_network_name"
160 ]
161 }
162 else:
163 vim_filter = {"name": task["find_params"]["name"]}
164 else:
165 raise NsWorkerExceptionNotFound(
166 "Invalid find_params for new_net {}".format(task["find_params"])
167 )
168
169 vim_nets = target_vim.get_network_list(vim_filter)
170 if not vim_nets and not task.get("params"):
171 # If there is mgmt-network in the descriptor,
172 # there is no mapping of that network to a VIM network in the descriptor,
173 # also there is no mapping in the "--config" parameter or at VIM creation;
174 # that mgmt-network will be created.
175 if mgmtnet and not mgmtnet_defined_in_vim:
176 net_name = (
177 vim_filter.get("name")
178 if vim_filter.get("name")
179 else vim_filter.get("id")[:16]
180 )
181 vim_net_id, created_items = target_vim.new_network(
182 net_name, None
183 )
184 self.logger.debug(
185 "Created mgmt network vim_net_id: {}".format(vim_net_id)
186 )
187 created = True
188 else:
189 raise NsWorkerExceptionNotFound(
190 "Network not found with this criteria: '{}'".format(
191 task.get("find_params")
192 )
193 )
194 elif len(vim_nets) > 1:
195 raise NsWorkerException(
196 "More than one network found with this criteria: '{}'".format(
197 task["find_params"]
198 )
199 )
200
201 if vim_nets:
202 vim_net_id = vim_nets[0]["id"]
203 else:
204 # CREATE
205 params = task["params"]
206 vim_net_id, created_items = target_vim.new_network(**params)
207 created = True
208
209 ro_vim_item_update = {
210 "vim_id": vim_net_id,
211 "vim_status": "BUILD",
212 "created": created,
213 "created_items": created_items,
214 "vim_details": None,
215 "vim_message": None,
216 }
217 self.logger.debug(
218 "task={} {} new-net={} created={}".format(
219 task_id, ro_task["target_id"], vim_net_id, created
220 )
221 )
222
223 return "BUILD", ro_vim_item_update
224 except (vimconn.VimConnException, NsWorkerException) as e:
225 self.logger.error(
226 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
227 )
228 ro_vim_item_update = {
229 "vim_status": "VIM_ERROR",
230 "created": created,
231 "vim_message": str(e),
232 }
233
234 return "FAILED", ro_vim_item_update
235
236 def refresh(self, ro_task):
237 """Call VIM to get network status"""
238 ro_task_id = ro_task["_id"]
239 target_vim = self.my_vims[ro_task["target_id"]]
240 vim_id = ro_task["vim_info"]["vim_id"]
241 net_to_refresh_list = [vim_id]
242
243 try:
244 vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
245 vim_info = vim_dict[vim_id]
246
247 if vim_info["status"] == "ACTIVE":
248 task_status = "DONE"
249 elif vim_info["status"] == "BUILD":
250 task_status = "BUILD"
251 else:
252 task_status = "FAILED"
253 except vimconn.VimConnException as e:
254 # Mark all tasks at VIM_ERROR status
255 self.logger.error(
256 "ro_task={} vim={} get-net={}: {}".format(
257 ro_task_id, ro_task["target_id"], vim_id, e
258 )
259 )
260 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
261 task_status = "FAILED"
262
263 ro_vim_item_update = {}
264 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
265 ro_vim_item_update["vim_status"] = vim_info["status"]
266
267 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
268 ro_vim_item_update["vim_name"] = vim_info.get("name")
269
270 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
271 if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
272 ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
273 elif vim_info["status"] == "DELETED":
274 ro_vim_item_update["vim_id"] = None
275 ro_vim_item_update["vim_message"] = "Deleted externally"
276 else:
277 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
278 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
279
280 if ro_vim_item_update:
281 self.logger.debug(
282 "ro_task={} {} get-net={}: status={} {}".format(
283 ro_task_id,
284 ro_task["target_id"],
285 vim_id,
286 ro_vim_item_update.get("vim_status"),
287 ro_vim_item_update.get("vim_message")
288 if ro_vim_item_update.get("vim_status") != "ACTIVE"
289 else "",
290 )
291 )
292
293 return task_status, ro_vim_item_update
294
295 def delete(self, ro_task, task_index):
296 task = ro_task["tasks"][task_index]
297 task_id = task["task_id"]
298 net_vim_id = ro_task["vim_info"]["vim_id"]
299 ro_vim_item_update_ok = {
300 "vim_status": "DELETED",
301 "created": False,
302 "vim_message": "DELETED",
303 "vim_id": None,
304 }
305
306 try:
307 if net_vim_id or ro_task["vim_info"]["created_items"]:
308 target_vim = self.my_vims[ro_task["target_id"]]
309 target_vim.delete_network(
310 net_vim_id, ro_task["vim_info"]["created_items"]
311 )
312 except vimconn.VimConnNotFoundException:
313 ro_vim_item_update_ok["vim_message"] = "already deleted"
314 except vimconn.VimConnException as e:
315 self.logger.error(
316 "ro_task={} vim={} del-net={}: {}".format(
317 ro_task["_id"], ro_task["target_id"], net_vim_id, e
318 )
319 )
320 ro_vim_item_update = {
321 "vim_status": "VIM_ERROR",
322 "vim_message": "Error while deleting: {}".format(e),
323 }
324
325 return "FAILED", ro_vim_item_update
326
327 self.logger.debug(
328 "task={} {} del-net={} {}".format(
329 task_id,
330 ro_task["target_id"],
331 net_vim_id,
332 ro_vim_item_update_ok.get("vim_message", ""),
333 )
334 )
335
336 return "DONE", ro_vim_item_update_ok
337
338
339 class VimInteractionVdu(VimInteractionBase):
340 max_retries_inject_ssh_key = 20 # 20 times
341 time_retries_inject_ssh_key = 30 # wevery 30 seconds
342
343 def new(self, ro_task, task_index, task_depends):
344 task = ro_task["tasks"][task_index]
345 task_id = task["task_id"]
346 created = False
347 created_items = {}
348 target_vim = self.my_vims[ro_task["target_id"]]
349
350 try:
351 created = True
352 params = task["params"]
353 params_copy = deepcopy(params)
354 net_list = params_copy["net_list"]
355
356 for net in net_list:
357 # change task_id into network_id
358 if "net_id" in net and net["net_id"].startswith("TASK-"):
359 network_id = task_depends[net["net_id"]]
360
361 if not network_id:
362 raise NsWorkerException(
363 "Cannot create VM because depends on a network not created or found "
364 "for {}".format(net["net_id"])
365 )
366
367 net["net_id"] = network_id
368
369 if params_copy["image_id"].startswith("TASK-"):
370 params_copy["image_id"] = task_depends[params_copy["image_id"]]
371
372 if params_copy["flavor_id"].startswith("TASK-"):
373 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
374
375 affinity_group_list = params_copy["affinity_group_list"]
376 for affinity_group in affinity_group_list:
377 # change task_id into affinity_group_id
378 if "affinity_group_id" in affinity_group and affinity_group[
379 "affinity_group_id"
380 ].startswith("TASK-"):
381 affinity_group_id = task_depends[
382 affinity_group["affinity_group_id"]
383 ]
384
385 if not affinity_group_id:
386 raise NsWorkerException(
387 "found for {}".format(affinity_group["affinity_group_id"])
388 )
389
390 affinity_group["affinity_group_id"] = affinity_group_id
391
392 vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
393 interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
394
395 # add to created items previous_created_volumes (healing)
396 if task.get("previous_created_volumes"):
397 for k, v in task["previous_created_volumes"].items():
398 created_items[k] = v
399
400 ro_vim_item_update = {
401 "vim_id": vim_vm_id,
402 "vim_status": "BUILD",
403 "created": created,
404 "created_items": created_items,
405 "vim_details": None,
406 "vim_message": None,
407 "interfaces_vim_ids": interfaces,
408 "interfaces": [],
409 "interfaces_backup": [],
410 }
411 self.logger.debug(
412 "task={} {} new-vm={} created={}".format(
413 task_id, ro_task["target_id"], vim_vm_id, created
414 )
415 )
416
417 return "BUILD", ro_vim_item_update
418 except (vimconn.VimConnException, NsWorkerException) as e:
419 self.logger.debug(traceback.format_exc())
420 self.logger.error(
421 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
422 )
423 ro_vim_item_update = {
424 "vim_status": "VIM_ERROR",
425 "created": created,
426 "vim_message": str(e),
427 }
428
429 return "FAILED", ro_vim_item_update
430
431 def delete(self, ro_task, task_index):
432 task = ro_task["tasks"][task_index]
433 task_id = task["task_id"]
434 vm_vim_id = ro_task["vim_info"]["vim_id"]
435 ro_vim_item_update_ok = {
436 "vim_status": "DELETED",
437 "created": False,
438 "vim_message": "DELETED",
439 "vim_id": None,
440 }
441
442 try:
443 self.logger.debug(
444 "delete_vminstance: vm_vim_id={} created_items={}".format(
445 vm_vim_id, ro_task["vim_info"]["created_items"]
446 )
447 )
448 if vm_vim_id or ro_task["vim_info"]["created_items"]:
449 target_vim = self.my_vims[ro_task["target_id"]]
450 target_vim.delete_vminstance(
451 vm_vim_id,
452 ro_task["vim_info"]["created_items"],
453 ro_task["vim_info"].get("volumes_to_hold", []),
454 )
455 except vimconn.VimConnNotFoundException:
456 ro_vim_item_update_ok["vim_message"] = "already deleted"
457 except vimconn.VimConnException as e:
458 self.logger.error(
459 "ro_task={} vim={} del-vm={}: {}".format(
460 ro_task["_id"], ro_task["target_id"], vm_vim_id, e
461 )
462 )
463 ro_vim_item_update = {
464 "vim_status": "VIM_ERROR",
465 "vim_message": "Error while deleting: {}".format(e),
466 }
467
468 return "FAILED", ro_vim_item_update
469
470 self.logger.debug(
471 "task={} {} del-vm={} {}".format(
472 task_id,
473 ro_task["target_id"],
474 vm_vim_id,
475 ro_vim_item_update_ok.get("vim_message", ""),
476 )
477 )
478
479 return "DONE", ro_vim_item_update_ok
480
481 def refresh(self, ro_task):
482 """Call VIM to get vm status"""
483 ro_task_id = ro_task["_id"]
484 target_vim = self.my_vims[ro_task["target_id"]]
485 vim_id = ro_task["vim_info"]["vim_id"]
486
487 if not vim_id:
488 return None, None
489
490 vm_to_refresh_list = [vim_id]
491 try:
492 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
493 vim_info = vim_dict[vim_id]
494
495 if vim_info["status"] == "ACTIVE":
496 task_status = "DONE"
497 elif vim_info["status"] == "BUILD":
498 task_status = "BUILD"
499 else:
500 task_status = "FAILED"
501
502 # try to load and parse vim_information
503 try:
504 vim_info_info = yaml.safe_load(vim_info["vim_info"])
505 if vim_info_info.get("name"):
506 vim_info["name"] = vim_info_info["name"]
507 except Exception:
508 pass
509 except vimconn.VimConnException as e:
510 # Mark all tasks at VIM_ERROR status
511 self.logger.error(
512 "ro_task={} vim={} get-vm={}: {}".format(
513 ro_task_id, ro_task["target_id"], vim_id, e
514 )
515 )
516 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
517 task_status = "FAILED"
518
519 ro_vim_item_update = {}
520
521 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
522 vim_interfaces = []
523 if vim_info.get("interfaces"):
524 for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
525 iface = next(
526 (
527 iface
528 for iface in vim_info["interfaces"]
529 if vim_iface_id == iface["vim_interface_id"]
530 ),
531 None,
532 )
533 # if iface:
534 # iface.pop("vim_info", None)
535 vim_interfaces.append(iface)
536
537 task_create = next(
538 t
539 for t in ro_task["tasks"]
540 if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
541 )
542 if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
543 vim_interfaces[task_create["mgmt_vnf_interface"]][
544 "mgmt_vnf_interface"
545 ] = True
546
547 mgmt_vdu_iface = task_create.get(
548 "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
549 )
550 if vim_interfaces:
551 vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
552
553 if ro_task["vim_info"]["interfaces"] != vim_interfaces:
554 ro_vim_item_update["interfaces"] = vim_interfaces
555
556 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
557 ro_vim_item_update["vim_status"] = vim_info["status"]
558
559 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
560 ro_vim_item_update["vim_name"] = vim_info.get("name")
561
562 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
563 if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
564 ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
565 elif vim_info["status"] == "DELETED":
566 ro_vim_item_update["vim_id"] = None
567 ro_vim_item_update["vim_message"] = "Deleted externally"
568 else:
569 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
570 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
571
572 if ro_vim_item_update:
573 self.logger.debug(
574 "ro_task={} {} get-vm={}: status={} {}".format(
575 ro_task_id,
576 ro_task["target_id"],
577 vim_id,
578 ro_vim_item_update.get("vim_status"),
579 ro_vim_item_update.get("vim_message")
580 if ro_vim_item_update.get("vim_status") != "ACTIVE"
581 else "",
582 )
583 )
584
585 return task_status, ro_vim_item_update
586
587 def exec(self, ro_task, task_index, task_depends):
588 task = ro_task["tasks"][task_index]
589 task_id = task["task_id"]
590 target_vim = self.my_vims[ro_task["target_id"]]
591 db_task_update = {"retries": 0}
592 retries = task.get("retries", 0)
593
594 try:
595 params = task["params"]
596 params_copy = deepcopy(params)
597 params_copy["ro_key"] = self.db.decrypt(
598 params_copy.pop("private_key"),
599 params_copy.pop("schema_version"),
600 params_copy.pop("salt"),
601 )
602 params_copy["ip_addr"] = params_copy.pop("ip_address")
603 target_vim.inject_user_key(**params_copy)
604 self.logger.debug(
605 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
606 )
607
608 return (
609 "DONE",
610 None,
611 db_task_update,
612 ) # params_copy["key"]
613 except (vimconn.VimConnException, NsWorkerException) as e:
614 retries += 1
615
616 self.logger.debug(traceback.format_exc())
617 if retries < self.max_retries_inject_ssh_key:
618 return (
619 "BUILD",
620 None,
621 {
622 "retries": retries,
623 "next_retry": self.time_retries_inject_ssh_key,
624 },
625 )
626
627 self.logger.error(
628 "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
629 )
630 ro_vim_item_update = {"vim_message": str(e)}
631
632 return "FAILED", ro_vim_item_update, db_task_update
633
634
635 class VimInteractionImage(VimInteractionBase):
636 def new(self, ro_task, task_index, task_depends):
637 task = ro_task["tasks"][task_index]
638 task_id = task["task_id"]
639 created = False
640 created_items = {}
641 target_vim = self.my_vims[ro_task["target_id"]]
642
643 try:
644 # FIND
645 if task.get("find_params"):
646 vim_images = target_vim.get_image_list(**task["find_params"])
647
648 if not vim_images:
649 raise NsWorkerExceptionNotFound(
650 "Image not found with this criteria: '{}'".format(
651 task["find_params"]
652 )
653 )
654 elif len(vim_images) > 1:
655 raise NsWorkerException(
656 "More than one image found with this criteria: '{}'".format(
657 task["find_params"]
658 )
659 )
660 else:
661 vim_image_id = vim_images[0]["id"]
662
663 ro_vim_item_update = {
664 "vim_id": vim_image_id,
665 "vim_status": "DONE",
666 "created": created,
667 "created_items": created_items,
668 "vim_details": None,
669 "vim_message": None,
670 }
671 self.logger.debug(
672 "task={} {} new-image={} created={}".format(
673 task_id, ro_task["target_id"], vim_image_id, created
674 )
675 )
676
677 return "DONE", ro_vim_item_update
678 except (NsWorkerException, vimconn.VimConnException) as e:
679 self.logger.error(
680 "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
681 )
682 ro_vim_item_update = {
683 "vim_status": "VIM_ERROR",
684 "created": created,
685 "vim_message": str(e),
686 }
687
688 return "FAILED", ro_vim_item_update
689
690
691 class VimInteractionFlavor(VimInteractionBase):
692 def delete(self, ro_task, task_index):
693 task = ro_task["tasks"][task_index]
694 task_id = task["task_id"]
695 flavor_vim_id = ro_task["vim_info"]["vim_id"]
696 ro_vim_item_update_ok = {
697 "vim_status": "DELETED",
698 "created": False,
699 "vim_message": "DELETED",
700 "vim_id": None,
701 }
702
703 try:
704 if flavor_vim_id:
705 target_vim = self.my_vims[ro_task["target_id"]]
706 target_vim.delete_flavor(flavor_vim_id)
707 except vimconn.VimConnNotFoundException:
708 ro_vim_item_update_ok["vim_message"] = "already deleted"
709 except vimconn.VimConnException as e:
710 self.logger.error(
711 "ro_task={} vim={} del-flavor={}: {}".format(
712 ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
713 )
714 )
715 ro_vim_item_update = {
716 "vim_status": "VIM_ERROR",
717 "vim_message": "Error while deleting: {}".format(e),
718 }
719
720 return "FAILED", ro_vim_item_update
721
722 self.logger.debug(
723 "task={} {} del-flavor={} {}".format(
724 task_id,
725 ro_task["target_id"],
726 flavor_vim_id,
727 ro_vim_item_update_ok.get("vim_message", ""),
728 )
729 )
730
731 return "DONE", ro_vim_item_update_ok
732
733 def new(self, ro_task, task_index, task_depends):
734 task = ro_task["tasks"][task_index]
735 task_id = task["task_id"]
736 created = False
737 created_items = {}
738 target_vim = self.my_vims[ro_task["target_id"]]
739
740 try:
741 # FIND
742 vim_flavor_id = None
743
744 if task.get("find_params"):
745 try:
746 flavor_data = task["find_params"]["flavor_data"]
747 vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
748 except vimconn.VimConnNotFoundException:
749 pass
750
751 if not vim_flavor_id and task.get("params"):
752 # CREATE
753 flavor_data = task["params"]["flavor_data"]
754 vim_flavor_id = target_vim.new_flavor(flavor_data)
755 created = True
756
757 ro_vim_item_update = {
758 "vim_id": vim_flavor_id,
759 "vim_status": "DONE",
760 "created": created,
761 "created_items": created_items,
762 "vim_details": None,
763 "vim_message": None,
764 }
765 self.logger.debug(
766 "task={} {} new-flavor={} created={}".format(
767 task_id, ro_task["target_id"], vim_flavor_id, created
768 )
769 )
770
771 return "DONE", ro_vim_item_update
772 except (vimconn.VimConnException, NsWorkerException) as e:
773 self.logger.error(
774 "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
775 )
776 ro_vim_item_update = {
777 "vim_status": "VIM_ERROR",
778 "created": created,
779 "vim_message": str(e),
780 }
781
782 return "FAILED", ro_vim_item_update
783
784
785 class VimInteractionAffinityGroup(VimInteractionBase):
786 def delete(self, ro_task, task_index):
787 task = ro_task["tasks"][task_index]
788 task_id = task["task_id"]
789 affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
790 ro_vim_item_update_ok = {
791 "vim_status": "DELETED",
792 "created": False,
793 "vim_message": "DELETED",
794 "vim_id": None,
795 }
796
797 try:
798 if affinity_group_vim_id:
799 target_vim = self.my_vims[ro_task["target_id"]]
800 target_vim.delete_affinity_group(affinity_group_vim_id)
801 except vimconn.VimConnNotFoundException:
802 ro_vim_item_update_ok["vim_message"] = "already deleted"
803 except vimconn.VimConnException as e:
804 self.logger.error(
805 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
806 ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
807 )
808 )
809 ro_vim_item_update = {
810 "vim_status": "VIM_ERROR",
811 "vim_message": "Error while deleting: {}".format(e),
812 }
813
814 return "FAILED", ro_vim_item_update
815
816 self.logger.debug(
817 "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
818 task_id,
819 ro_task["target_id"],
820 affinity_group_vim_id,
821 ro_vim_item_update_ok.get("vim_message", ""),
822 )
823 )
824
825 return "DONE", ro_vim_item_update_ok
826
827 def new(self, ro_task, task_index, task_depends):
828 task = ro_task["tasks"][task_index]
829 task_id = task["task_id"]
830 created = False
831 created_items = {}
832 target_vim = self.my_vims[ro_task["target_id"]]
833
834 try:
835 affinity_group_vim_id = None
836 affinity_group_data = None
837
838 if task.get("params"):
839 affinity_group_data = task["params"].get("affinity_group_data")
840
841 if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
842 try:
843 param_affinity_group_id = task["params"]["affinity_group_data"].get(
844 "vim-affinity-group-id"
845 )
846 affinity_group_vim_id = target_vim.get_affinity_group(
847 param_affinity_group_id
848 ).get("id")
849 except vimconn.VimConnNotFoundException:
850 self.logger.error(
851 "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
852 "could not be found at VIM. Creating a new one.".format(
853 task_id, ro_task["target_id"], param_affinity_group_id
854 )
855 )
856
857 if not affinity_group_vim_id and affinity_group_data:
858 affinity_group_vim_id = target_vim.new_affinity_group(
859 affinity_group_data
860 )
861 created = True
862
863 ro_vim_item_update = {
864 "vim_id": affinity_group_vim_id,
865 "vim_status": "DONE",
866 "created": created,
867 "created_items": created_items,
868 "vim_details": None,
869 "vim_message": None,
870 }
871 self.logger.debug(
872 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
873 task_id, ro_task["target_id"], affinity_group_vim_id, created
874 )
875 )
876
877 return "DONE", ro_vim_item_update
878 except (vimconn.VimConnException, NsWorkerException) as e:
879 self.logger.error(
880 "task={} vim={} new-affinity-or-anti-affinity-group:"
881 " {}".format(task_id, ro_task["target_id"], e)
882 )
883 ro_vim_item_update = {
884 "vim_status": "VIM_ERROR",
885 "created": created,
886 "vim_message": str(e),
887 }
888
889 return "FAILED", ro_vim_item_update
890
891
892 class VimInteractionUpdateVdu(VimInteractionBase):
893 def exec(self, ro_task, task_index, task_depends):
894 task = ro_task["tasks"][task_index]
895 task_id = task["task_id"]
896 db_task_update = {"retries": 0}
897 created = False
898 created_items = {}
899 target_vim = self.my_vims[ro_task["target_id"]]
900
901 try:
902 if task.get("params"):
903 vim_vm_id = task["params"].get("vim_vm_id")
904 action = task["params"].get("action")
905 context = {action: action}
906 target_vim.action_vminstance(vim_vm_id, context)
907 # created = True
908 ro_vim_item_update = {
909 "vim_id": vim_vm_id,
910 "vim_status": "DONE",
911 "created": created,
912 "created_items": created_items,
913 "vim_details": None,
914 "vim_message": None,
915 }
916 self.logger.debug(
917 "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
918 )
919 return "DONE", ro_vim_item_update, db_task_update
920 except (vimconn.VimConnException, NsWorkerException) as e:
921 self.logger.error(
922 "task={} vim={} VM Migration:"
923 " {}".format(task_id, ro_task["target_id"], e)
924 )
925 ro_vim_item_update = {
926 "vim_status": "VIM_ERROR",
927 "created": created,
928 "vim_message": str(e),
929 }
930
931 return "FAILED", ro_vim_item_update, db_task_update
932
933
934 class VimInteractionSdnNet(VimInteractionBase):
935 @staticmethod
936 def _match_pci(port_pci, mapping):
937 """
938 Check if port_pci matches with mapping
939 mapping can have brackets to indicate that several chars are accepted. e.g
940 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
941 :param port_pci: text
942 :param mapping: text, can contain brackets to indicate several chars are available
943 :return: True if matches, False otherwise
944 """
945 if not port_pci or not mapping:
946 return False
947 if port_pci == mapping:
948 return True
949
950 mapping_index = 0
951 pci_index = 0
952 while True:
953 bracket_start = mapping.find("[", mapping_index)
954
955 if bracket_start == -1:
956 break
957
958 bracket_end = mapping.find("]", bracket_start)
959 if bracket_end == -1:
960 break
961
962 length = bracket_start - mapping_index
963 if (
964 length
965 and port_pci[pci_index : pci_index + length]
966 != mapping[mapping_index:bracket_start]
967 ):
968 return False
969
970 if (
971 port_pci[pci_index + length]
972 not in mapping[bracket_start + 1 : bracket_end]
973 ):
974 return False
975
976 pci_index += length + 1
977 mapping_index = bracket_end + 1
978
979 if port_pci[pci_index:] != mapping[mapping_index:]:
980 return False
981
982 return True
983
984 def _get_interfaces(self, vlds_to_connect, vim_account_id):
985 """
986 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
987 :param vim_account_id:
988 :return:
989 """
990 interfaces = []
991
992 for vld in vlds_to_connect:
993 table, _, db_id = vld.partition(":")
994 db_id, _, vld = db_id.partition(":")
995 _, _, vld_id = vld.partition(".")
996
997 if table == "vnfrs":
998 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
999 iface_key = "vnf-vld-id"
1000 else: # table == "nsrs"
1001 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
1002 iface_key = "ns-vld-id"
1003
1004 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
1005
1006 for db_vnfr in db_vnfrs:
1007 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
1008 for iface_index, interface in enumerate(vdur["interfaces"]):
1009 if interface.get(iface_key) == vld_id and interface.get(
1010 "type"
1011 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
1012 # only SR-IOV o PT
1013 interface_ = interface.copy()
1014 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
1015 db_vnfr["_id"], vdu_index, iface_index
1016 )
1017
1018 if vdur.get("status") == "ERROR":
1019 interface_["status"] = "ERROR"
1020
1021 interfaces.append(interface_)
1022
1023 return interfaces
1024
1025 def refresh(self, ro_task):
1026 # look for task create
1027 task_create_index, _ = next(
1028 i_t
1029 for i_t in enumerate(ro_task["tasks"])
1030 if i_t[1]
1031 and i_t[1]["action"] == "CREATE"
1032 and i_t[1]["status"] != "FINISHED"
1033 )
1034
1035 return self.new(ro_task, task_create_index, None)
1036
1037 def new(self, ro_task, task_index, task_depends):
1038
1039 task = ro_task["tasks"][task_index]
1040 task_id = task["task_id"]
1041 target_vim = self.my_vims[ro_task["target_id"]]
1042
1043 sdn_net_id = ro_task["vim_info"]["vim_id"]
1044
1045 created_items = ro_task["vim_info"].get("created_items")
1046 connected_ports = ro_task["vim_info"].get("connected_ports", [])
1047 new_connected_ports = []
1048 last_update = ro_task["vim_info"].get("last_update", 0)
1049 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
1050 error_list = []
1051 created = ro_task["vim_info"].get("created", False)
1052
1053 try:
1054 # CREATE
1055 params = task["params"]
1056 vlds_to_connect = params["vlds"]
1057 associated_vim = params["target_vim"]
1058 # external additional ports
1059 additional_ports = params.get("sdn-ports") or ()
1060 _, _, vim_account_id = associated_vim.partition(":")
1061
1062 if associated_vim:
1063 # get associated VIM
1064 if associated_vim not in self.db_vims:
1065 self.db_vims[associated_vim] = self.db.get_one(
1066 "vim_accounts", {"_id": vim_account_id}
1067 )
1068
1069 db_vim = self.db_vims[associated_vim]
1070
1071 # look for ports to connect
1072 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
1073 # print(ports)
1074
1075 sdn_ports = []
1076 pending_ports = error_ports = 0
1077 vlan_used = None
1078 sdn_need_update = False
1079
1080 for port in ports:
1081 vlan_used = port.get("vlan") or vlan_used
1082
1083 # TODO. Do not connect if already done
1084 if not port.get("compute_node") or not port.get("pci"):
1085 if port.get("status") == "ERROR":
1086 error_ports += 1
1087 else:
1088 pending_ports += 1
1089 continue
1090
1091 pmap = None
1092 compute_node_mappings = next(
1093 (
1094 c
1095 for c in db_vim["config"].get("sdn-port-mapping", ())
1096 if c and c["compute_node"] == port["compute_node"]
1097 ),
1098 None,
1099 )
1100
1101 if compute_node_mappings:
1102 # process port_mapping pci of type 0000:af:1[01].[1357]
1103 pmap = next(
1104 (
1105 p
1106 for p in compute_node_mappings["ports"]
1107 if self._match_pci(port["pci"], p.get("pci"))
1108 ),
1109 None,
1110 )
1111
1112 if not pmap:
1113 if not db_vim["config"].get("mapping_not_needed"):
1114 error_list.append(
1115 "Port mapping not found for compute_node={} pci={}".format(
1116 port["compute_node"], port["pci"]
1117 )
1118 )
1119 continue
1120
1121 pmap = {}
1122
1123 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
1124 new_port = {
1125 "service_endpoint_id": pmap.get("service_endpoint_id")
1126 or service_endpoint_id,
1127 "service_endpoint_encapsulation_type": "dot1q"
1128 if port["type"] == "SR-IOV"
1129 else None,
1130 "service_endpoint_encapsulation_info": {
1131 "vlan": port.get("vlan"),
1132 "mac": port.get("mac-address"),
1133 "device_id": pmap.get("device_id") or port["compute_node"],
1134 "device_interface_id": pmap.get("device_interface_id")
1135 or port["pci"],
1136 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
1137 "switch_port": pmap.get("switch_port"),
1138 "service_mapping_info": pmap.get("service_mapping_info"),
1139 },
1140 }
1141
1142 # TODO
1143 # if port["modified_at"] > last_update:
1144 # sdn_need_update = True
1145 new_connected_ports.append(port["id"]) # TODO
1146 sdn_ports.append(new_port)
1147
1148 if error_ports:
1149 error_list.append(
1150 "{} interfaces have not been created as VDU is on ERROR status".format(
1151 error_ports
1152 )
1153 )
1154
1155 # connect external ports
1156 for index, additional_port in enumerate(additional_ports):
1157 additional_port_id = additional_port.get(
1158 "service_endpoint_id"
1159 ) or "external-{}".format(index)
1160 sdn_ports.append(
1161 {
1162 "service_endpoint_id": additional_port_id,
1163 "service_endpoint_encapsulation_type": additional_port.get(
1164 "service_endpoint_encapsulation_type", "dot1q"
1165 ),
1166 "service_endpoint_encapsulation_info": {
1167 "vlan": additional_port.get("vlan") or vlan_used,
1168 "mac": additional_port.get("mac_address"),
1169 "device_id": additional_port.get("device_id"),
1170 "device_interface_id": additional_port.get(
1171 "device_interface_id"
1172 ),
1173 "switch_dpid": additional_port.get("switch_dpid")
1174 or additional_port.get("switch_id"),
1175 "switch_port": additional_port.get("switch_port"),
1176 "service_mapping_info": additional_port.get(
1177 "service_mapping_info"
1178 ),
1179 },
1180 }
1181 )
1182 new_connected_ports.append(additional_port_id)
1183 sdn_info = ""
1184
1185 # if there are more ports to connect or they have been modified, call create/update
1186 if error_list:
1187 sdn_status = "ERROR"
1188 sdn_info = "; ".join(error_list)
1189 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1190 last_update = time.time()
1191
1192 if not sdn_net_id:
1193 if len(sdn_ports) < 2:
1194 sdn_status = "ACTIVE"
1195
1196 if not pending_ports:
1197 self.logger.debug(
1198 "task={} {} new-sdn-net done, less than 2 ports".format(
1199 task_id, ro_task["target_id"]
1200 )
1201 )
1202 else:
1203 net_type = params.get("type") or "ELAN"
1204 (
1205 sdn_net_id,
1206 created_items,
1207 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1208 created = True
1209 self.logger.debug(
1210 "task={} {} new-sdn-net={} created={}".format(
1211 task_id, ro_task["target_id"], sdn_net_id, created
1212 )
1213 )
1214 else:
1215 created_items = target_vim.edit_connectivity_service(
1216 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1217 )
1218 created = True
1219 self.logger.debug(
1220 "task={} {} update-sdn-net={} created={}".format(
1221 task_id, ro_task["target_id"], sdn_net_id, created
1222 )
1223 )
1224
1225 connected_ports = new_connected_ports
1226 elif sdn_net_id:
1227 wim_status_dict = target_vim.get_connectivity_service_status(
1228 sdn_net_id, conn_info=created_items
1229 )
1230 sdn_status = wim_status_dict["sdn_status"]
1231
1232 if wim_status_dict.get("sdn_info"):
1233 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1234
1235 if wim_status_dict.get("error_msg"):
1236 sdn_info = wim_status_dict.get("error_msg") or ""
1237
1238 if pending_ports:
1239 if sdn_status != "ERROR":
1240 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1241 len(ports) - pending_ports, len(ports)
1242 )
1243
1244 if sdn_status == "ACTIVE":
1245 sdn_status = "BUILD"
1246
1247 ro_vim_item_update = {
1248 "vim_id": sdn_net_id,
1249 "vim_status": sdn_status,
1250 "created": created,
1251 "created_items": created_items,
1252 "connected_ports": connected_ports,
1253 "vim_details": sdn_info,
1254 "vim_message": None,
1255 "last_update": last_update,
1256 }
1257
1258 return sdn_status, ro_vim_item_update
1259 except Exception as e:
1260 self.logger.error(
1261 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1262 exc_info=not isinstance(
1263 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1264 ),
1265 )
1266 ro_vim_item_update = {
1267 "vim_status": "VIM_ERROR",
1268 "created": created,
1269 "vim_message": str(e),
1270 }
1271
1272 return "FAILED", ro_vim_item_update
1273
1274 def delete(self, ro_task, task_index):
1275 task = ro_task["tasks"][task_index]
1276 task_id = task["task_id"]
1277 sdn_vim_id = ro_task["vim_info"].get("vim_id")
1278 ro_vim_item_update_ok = {
1279 "vim_status": "DELETED",
1280 "created": False,
1281 "vim_message": "DELETED",
1282 "vim_id": None,
1283 }
1284
1285 try:
1286 if sdn_vim_id:
1287 target_vim = self.my_vims[ro_task["target_id"]]
1288 target_vim.delete_connectivity_service(
1289 sdn_vim_id, ro_task["vim_info"].get("created_items")
1290 )
1291
1292 except Exception as e:
1293 if (
1294 isinstance(e, sdnconn.SdnConnectorError)
1295 and e.http_code == HTTPStatus.NOT_FOUND.value
1296 ):
1297 ro_vim_item_update_ok["vim_message"] = "already deleted"
1298 else:
1299 self.logger.error(
1300 "ro_task={} vim={} del-sdn-net={}: {}".format(
1301 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1302 ),
1303 exc_info=not isinstance(
1304 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1305 ),
1306 )
1307 ro_vim_item_update = {
1308 "vim_status": "VIM_ERROR",
1309 "vim_message": "Error while deleting: {}".format(e),
1310 }
1311
1312 return "FAILED", ro_vim_item_update
1313
1314 self.logger.debug(
1315 "task={} {} del-sdn-net={} {}".format(
1316 task_id,
1317 ro_task["target_id"],
1318 sdn_vim_id,
1319 ro_vim_item_update_ok.get("vim_message", ""),
1320 )
1321 )
1322
1323 return "DONE", ro_vim_item_update_ok
1324
1325
1326 class VimInteractionMigration(VimInteractionBase):
1327 def exec(self, ro_task, task_index, task_depends):
1328 task = ro_task["tasks"][task_index]
1329 task_id = task["task_id"]
1330 db_task_update = {"retries": 0}
1331 target_vim = self.my_vims[ro_task["target_id"]]
1332 vim_interfaces = []
1333 created = False
1334 created_items = {}
1335 refreshed_vim_info = {}
1336
1337 try:
1338 if task.get("params"):
1339 vim_vm_id = task["params"].get("vim_vm_id")
1340 migrate_host = task["params"].get("migrate_host")
1341 _, migrated_compute_node = target_vim.migrate_instance(
1342 vim_vm_id, migrate_host
1343 )
1344
1345 if migrated_compute_node:
1346 # When VM is migrated, vdu["vim_info"] needs to be updated
1347 vdu_old_vim_info = task["params"]["vdu_vim_info"].get(
1348 ro_task["target_id"]
1349 )
1350
1351 # Refresh VM to get new vim_info
1352 vm_to_refresh_list = [vim_vm_id]
1353 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
1354 refreshed_vim_info = vim_dict[vim_vm_id]
1355
1356 if refreshed_vim_info.get("interfaces"):
1357 for old_iface in vdu_old_vim_info.get("interfaces"):
1358 iface = next(
1359 (
1360 iface
1361 for iface in refreshed_vim_info["interfaces"]
1362 if old_iface["vim_interface_id"]
1363 == iface["vim_interface_id"]
1364 ),
1365 None,
1366 )
1367 vim_interfaces.append(iface)
1368
1369 ro_vim_item_update = {
1370 "vim_id": vim_vm_id,
1371 "vim_status": "ACTIVE",
1372 "created": created,
1373 "created_items": created_items,
1374 "vim_details": None,
1375 "vim_message": None,
1376 }
1377
1378 if refreshed_vim_info and refreshed_vim_info.get("status") not in (
1379 "ERROR",
1380 "VIM_ERROR",
1381 ):
1382 ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]
1383
1384 if vim_interfaces:
1385 ro_vim_item_update["interfaces"] = vim_interfaces
1386
1387 self.logger.debug(
1388 "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
1389 )
1390
1391 return "DONE", ro_vim_item_update, db_task_update
1392
1393 except (vimconn.VimConnException, NsWorkerException) as e:
1394 self.logger.error(
1395 "task={} vim={} VM Migration:"
1396 " {}".format(task_id, ro_task["target_id"], e)
1397 )
1398 ro_vim_item_update = {
1399 "vim_status": "VIM_ERROR",
1400 "created": created,
1401 "vim_message": str(e),
1402 }
1403
1404 return "FAILED", ro_vim_item_update, db_task_update
1405
1406
1407 class VimInteractionResize(VimInteractionBase):
1408 def exec(self, ro_task, task_index, task_depends):
1409 task = ro_task["tasks"][task_index]
1410 task_id = task["task_id"]
1411 db_task_update = {"retries": 0}
1412 created = False
1413 target_flavor_uuid = None
1414 created_items = {}
1415 refreshed_vim_info = {}
1416 target_vim = self.my_vims[ro_task["target_id"]]
1417
1418 try:
1419 if task.get("params"):
1420 vim_vm_id = task["params"].get("vim_vm_id")
1421 flavor_dict = task["params"].get("flavor_dict")
1422 self.logger.info("flavor_dict %s", flavor_dict)
1423
1424 try:
1425 target_flavor_uuid = target_vim.get_flavor_id_from_data(flavor_dict)
1426 except Exception as e:
1427 self.logger.info("Cannot find any flavor matching %s.", str(e))
1428 try:
1429 target_flavor_uuid = target_vim.new_flavor(flavor_dict)
1430 except Exception as e:
1431 self.logger.error("Error creating flavor at VIM %s.", str(e))
1432
1433 if target_flavor_uuid is not None:
1434 resized_status = target_vim.resize_instance(
1435 vim_vm_id, target_flavor_uuid
1436 )
1437
1438 if resized_status:
1439 # Refresh VM to get new vim_info
1440 vm_to_refresh_list = [vim_vm_id]
1441 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
1442 refreshed_vim_info = vim_dict[vim_vm_id]
1443
1444 ro_vim_item_update = {
1445 "vim_id": vim_vm_id,
1446 "vim_status": "DONE",
1447 "created": created,
1448 "created_items": created_items,
1449 "vim_details": None,
1450 "vim_message": None,
1451 }
1452
1453 if refreshed_vim_info and refreshed_vim_info.get("status") not in (
1454 "ERROR",
1455 "VIM_ERROR",
1456 ):
1457 ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]
1458
1459 self.logger.debug(
1460 "task={} {} resize done".format(task_id, ro_task["target_id"])
1461 )
1462 return "DONE", ro_vim_item_update, db_task_update
1463 except (vimconn.VimConnException, NsWorkerException) as e:
1464 self.logger.error(
1465 "task={} vim={} Resize:" " {}".format(task_id, ro_task["target_id"], e)
1466 )
1467 ro_vim_item_update = {
1468 "vim_status": "VIM_ERROR",
1469 "created": created,
1470 "vim_message": str(e),
1471 }
1472
1473 return "FAILED", ro_vim_item_update, db_task_update
1474
1475
1476 class ConfigValidate:
1477 def __init__(self, config: Dict):
1478 self.conf = config
1479
1480 @property
1481 def active(self):
1482 # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
1483 if (
1484 self.conf["period"]["refresh_active"] >= 60
1485 or self.conf["period"]["refresh_active"] == -1
1486 ):
1487 return self.conf["period"]["refresh_active"]
1488
1489 return 60
1490
1491 @property
1492 def build(self):
1493 return self.conf["period"]["refresh_build"]
1494
1495 @property
1496 def image(self):
1497 return self.conf["period"]["refresh_image"]
1498
1499 @property
1500 def error(self):
1501 return self.conf["period"]["refresh_error"]
1502
1503 @property
1504 def queue_size(self):
1505 return self.conf["period"]["queue_size"]
1506
1507
1508 class NsWorker(threading.Thread):
1509 def __init__(self, worker_index, config, plugins, db):
1510 """
1511 :param worker_index: thread index
1512 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1513 :param plugins: global shared dict with the loaded plugins
1514 :param db: database class instance to use
1515 """
1516 threading.Thread.__init__(self)
1517 self.config = config
1518 self.plugins = plugins
1519 self.plugin_name = "unknown"
1520 self.logger = logging.getLogger("ro.worker{}".format(worker_index))
1521 self.worker_index = worker_index
1522 # refresh periods for created items
1523 self.refresh_config = ConfigValidate(config)
1524 self.task_queue = queue.Queue(self.refresh_config.queue_size)
1525 # targetvim: vimplugin class
1526 self.my_vims = {}
1527 # targetvim: vim information from database
1528 self.db_vims = {}
1529 # targetvim list
1530 self.vim_targets = []
1531 self.my_id = config["process_id"] + ":" + str(worker_index)
1532 self.db = db
1533 self.item2class = {
1534 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
1535 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
1536 "image": VimInteractionImage(
1537 self.db, self.my_vims, self.db_vims, self.logger
1538 ),
1539 "flavor": VimInteractionFlavor(
1540 self.db, self.my_vims, self.db_vims, self.logger
1541 ),
1542 "sdn_net": VimInteractionSdnNet(
1543 self.db, self.my_vims, self.db_vims, self.logger
1544 ),
1545 "update": VimInteractionUpdateVdu(
1546 self.db, self.my_vims, self.db_vims, self.logger
1547 ),
1548 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
1549 self.db, self.my_vims, self.db_vims, self.logger
1550 ),
1551 "migrate": VimInteractionMigration(
1552 self.db, self.my_vims, self.db_vims, self.logger
1553 ),
1554 "verticalscale": VimInteractionResize(
1555 self.db, self.my_vims, self.db_vims, self.logger
1556 ),
1557 }
1558 self.time_last_task_processed = None
1559 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1560 self.tasks_to_delete = []
1561 # it is idle when there are not vim_targets associated
1562 self.idle = True
1563 self.task_locked_time = config["global"]["task_locked_time"]
1564
1565 def insert_task(self, task):
1566 try:
1567 self.task_queue.put(task, False)
1568 return None
1569 except queue.Full:
1570 raise NsWorkerException("timeout inserting a task")
1571
1572 def terminate(self):
1573 self.insert_task("exit")
1574
1575 def del_task(self, task):
1576 with self.task_lock:
1577 if task["status"] == "SCHEDULED":
1578 task["status"] = "SUPERSEDED"
1579 return True
1580 else: # task["status"] == "processing"
1581 self.task_lock.release()
1582 return False
1583
1584 def _process_vim_config(self, target_id, db_vim):
1585 """
1586 Process vim config, creating vim configuration files as ca_cert
1587 :param target_id: vim/sdn/wim + id
1588 :param db_vim: Vim dictionary obtained from database
1589 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1590 """
1591 if not db_vim.get("config"):
1592 return
1593
1594 file_name = ""
1595
1596 try:
1597 if db_vim["config"].get("ca_cert_content"):
1598 file_name = "{}:{}".format(target_id, self.worker_index)
1599
1600 try:
1601 mkdir(file_name)
1602 except FileExistsError:
1603 pass
1604
1605 file_name = file_name + "/ca_cert"
1606
1607 with open(file_name, "w") as f:
1608 f.write(db_vim["config"]["ca_cert_content"])
1609 del db_vim["config"]["ca_cert_content"]
1610 db_vim["config"]["ca_cert"] = file_name
1611 except Exception as e:
1612 raise NsWorkerException(
1613 "Error writing to file '{}': {}".format(file_name, e)
1614 )
1615
1616 def _load_plugin(self, name, type="vim"):
1617 # type can be vim or sdn
1618 if "rovim_dummy" not in self.plugins:
1619 self.plugins["rovim_dummy"] = VimDummyConnector
1620
1621 if "rosdn_dummy" not in self.plugins:
1622 self.plugins["rosdn_dummy"] = SdnDummyConnector
1623
1624 if name in self.plugins:
1625 return self.plugins[name]
1626
1627 try:
1628 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1629 self.plugins[name] = ep.load()
1630 except Exception as e:
1631 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1632
1633 if name and name not in self.plugins:
1634 raise NsWorkerException(
1635 "Plugin 'osm_{n}' has not been installed".format(n=name)
1636 )
1637
1638 return self.plugins[name]
1639
1640 def _unload_vim(self, target_id):
1641 """
1642 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1643 :param target_id: Contains type:_id; where type can be 'vim', ...
1644 :return: None.
1645 """
1646 try:
1647 self.db_vims.pop(target_id, None)
1648 self.my_vims.pop(target_id, None)
1649
1650 if target_id in self.vim_targets:
1651 self.vim_targets.remove(target_id)
1652
1653 self.logger.info("Unloaded {}".format(target_id))
1654 rmtree("{}:{}".format(target_id, self.worker_index))
1655 except FileNotFoundError:
1656 pass # this is raised by rmtree if folder does not exist
1657 except Exception as e:
1658 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1659
1660 def _check_vim(self, target_id):
1661 """
1662 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1663 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1664 :return: None.
1665 """
1666 target, _, _id = target_id.partition(":")
1667 now = time.time()
1668 update_dict = {}
1669 unset_dict = {}
1670 op_text = ""
1671 step = ""
1672 loaded = target_id in self.vim_targets
1673 target_database = (
1674 "vim_accounts"
1675 if target == "vim"
1676 else "wim_accounts"
1677 if target == "wim"
1678 else "sdns"
1679 )
1680
1681 try:
1682 step = "Getting {} from db".format(target_id)
1683 db_vim = self.db.get_one(target_database, {"_id": _id})
1684
1685 for op_index, operation in enumerate(
1686 db_vim["_admin"].get("operations", ())
1687 ):
1688 if operation["operationState"] != "PROCESSING":
1689 continue
1690
1691 locked_at = operation.get("locked_at")
1692
1693 if locked_at is not None and locked_at >= now - self.task_locked_time:
1694 # some other thread is doing this operation
1695 return
1696
1697 # lock
1698 op_text = "_admin.operations.{}.".format(op_index)
1699
1700 if not self.db.set_one(
1701 target_database,
1702 q_filter={
1703 "_id": _id,
1704 op_text + "operationState": "PROCESSING",
1705 op_text + "locked_at": locked_at,
1706 },
1707 update_dict={
1708 op_text + "locked_at": now,
1709 "admin.current_operation": op_index,
1710 },
1711 fail_on_empty=False,
1712 ):
1713 return
1714
1715 unset_dict[op_text + "locked_at"] = None
1716 unset_dict["current_operation"] = None
1717 step = "Loading " + target_id
1718 error_text = self._load_vim(target_id)
1719
1720 if not error_text:
1721 step = "Checking connectivity"
1722
1723 if target == "vim":
1724 self.my_vims[target_id].check_vim_connectivity()
1725 else:
1726 self.my_vims[target_id].check_credentials()
1727
1728 update_dict["_admin.operationalState"] = "ENABLED"
1729 update_dict["_admin.detailed-status"] = ""
1730 unset_dict[op_text + "detailed-status"] = None
1731 update_dict[op_text + "operationState"] = "COMPLETED"
1732
1733 return
1734
1735 except Exception as e:
1736 error_text = "{}: {}".format(step, e)
1737 self.logger.error("{} for {}: {}".format(step, target_id, e))
1738
1739 finally:
1740 if update_dict or unset_dict:
1741 if error_text:
1742 update_dict[op_text + "operationState"] = "FAILED"
1743 update_dict[op_text + "detailed-status"] = error_text
1744 unset_dict.pop(op_text + "detailed-status", None)
1745 update_dict["_admin.operationalState"] = "ERROR"
1746 update_dict["_admin.detailed-status"] = error_text
1747
1748 if op_text:
1749 update_dict[op_text + "statusEnteredTime"] = now
1750
1751 self.db.set_one(
1752 target_database,
1753 q_filter={"_id": _id},
1754 update_dict=update_dict,
1755 unset=unset_dict,
1756 fail_on_empty=False,
1757 )
1758
1759 if not loaded:
1760 self._unload_vim(target_id)
1761
1762 def _reload_vim(self, target_id):
1763 if target_id in self.vim_targets:
1764 self._load_vim(target_id)
1765 else:
1766 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1767 # just remove it to force load again next time it is needed
1768 self.db_vims.pop(target_id, None)
1769
1770 def _load_vim(self, target_id):
1771 """
1772 Load or reload a vim_account, sdn_controller or wim_account.
1773 Read content from database, load the plugin if not loaded.
1774 In case of error loading the plugin, it load a failing VIM_connector
1775 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1776 :param target_id: Contains type:_id; where type can be 'vim', ...
1777 :return: None if ok, descriptive text if error
1778 """
1779 target, _, _id = target_id.partition(":")
1780 target_database = (
1781 "vim_accounts"
1782 if target == "vim"
1783 else "wim_accounts"
1784 if target == "wim"
1785 else "sdns"
1786 )
1787 plugin_name = ""
1788 vim = None
1789
1790 try:
1791 step = "Getting {}={} from db".format(target, _id)
1792 # TODO process for wim, sdnc, ...
1793 vim = self.db.get_one(target_database, {"_id": _id})
1794
1795 # if deep_get(vim, "config", "sdn-controller"):
1796 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1797 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1798
1799 step = "Decrypting password"
1800 schema_version = vim.get("schema_version")
1801 self.db.encrypt_decrypt_fields(
1802 vim,
1803 "decrypt",
1804 fields=("password", "secret"),
1805 schema_version=schema_version,
1806 salt=_id,
1807 )
1808 self._process_vim_config(target_id, vim)
1809
1810 if target == "vim":
1811 plugin_name = "rovim_" + vim["vim_type"]
1812 step = "Loading plugin '{}'".format(plugin_name)
1813 vim_module_conn = self._load_plugin(plugin_name)
1814 step = "Loading {}'".format(target_id)
1815 self.my_vims[target_id] = vim_module_conn(
1816 uuid=vim["_id"],
1817 name=vim["name"],
1818 tenant_id=vim.get("vim_tenant_id"),
1819 tenant_name=vim.get("vim_tenant_name"),
1820 url=vim["vim_url"],
1821 url_admin=None,
1822 user=vim["vim_user"],
1823 passwd=vim["vim_password"],
1824 config=vim.get("config") or {},
1825 persistent_info={},
1826 )
1827 else: # sdn
1828 plugin_name = "rosdn_" + vim["type"]
1829 step = "Loading plugin '{}'".format(plugin_name)
1830 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1831 step = "Loading {}'".format(target_id)
1832 wim = deepcopy(vim)
1833 wim_config = wim.pop("config", {}) or {}
1834 wim["uuid"] = wim["_id"]
1835 wim["wim_url"] = wim["url"]
1836
1837 if wim.get("dpid"):
1838 wim_config["dpid"] = wim.pop("dpid")
1839
1840 if wim.get("switch_id"):
1841 wim_config["switch_id"] = wim.pop("switch_id")
1842
1843 # wim, wim_account, config
1844 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1845 self.db_vims[target_id] = vim
1846 self.error_status = None
1847
1848 self.logger.info(
1849 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1850 )
1851 except Exception as e:
1852 self.logger.error(
1853 "Cannot load {} plugin={}: {} {}".format(
1854 target_id, plugin_name, step, e
1855 )
1856 )
1857
1858 self.db_vims[target_id] = vim or {}
1859 self.db_vims[target_id] = FailingConnector(str(e))
1860 error_status = "{} Error: {}".format(step, e)
1861
1862 return error_status
1863 finally:
1864 if target_id not in self.vim_targets:
1865 self.vim_targets.append(target_id)
1866
1867 def _get_db_task(self):
1868 """
1869 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1870 :return: None
1871 """
1872 now = time.time()
1873
1874 if not self.time_last_task_processed:
1875 self.time_last_task_processed = now
1876
1877 try:
1878 while True:
1879 """
1880 # Log RO tasks only when loglevel is DEBUG
1881 if self.logger.getEffectiveLevel() == logging.DEBUG:
1882 self._log_ro_task(
1883 None,
1884 None,
1885 None,
1886 "TASK_WF",
1887 "task_locked_time="
1888 + str(self.task_locked_time)
1889 + " "
1890 + "time_last_task_processed="
1891 + str(self.time_last_task_processed)
1892 + " "
1893 + "now="
1894 + str(now),
1895 )
1896 """
1897 locked = self.db.set_one(
1898 "ro_tasks",
1899 q_filter={
1900 "target_id": self.vim_targets,
1901 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1902 "locked_at.lt": now - self.task_locked_time,
1903 "to_check_at.lt": self.time_last_task_processed,
1904 "to_check_at.gt": -1,
1905 },
1906 update_dict={"locked_by": self.my_id, "locked_at": now},
1907 fail_on_empty=False,
1908 )
1909
1910 if locked:
1911 # read and return
1912 ro_task = self.db.get_one(
1913 "ro_tasks",
1914 q_filter={
1915 "target_id": self.vim_targets,
1916 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1917 "locked_at": now,
1918 },
1919 )
1920 return ro_task
1921
1922 if self.time_last_task_processed == now:
1923 self.time_last_task_processed = None
1924 return None
1925 else:
1926 self.time_last_task_processed = now
1927 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1928
1929 except DbException as e:
1930 self.logger.error("Database exception at _get_db_task: {}".format(e))
1931 except Exception as e:
1932 self.logger.critical(
1933 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1934 )
1935
1936 return None
1937
1938 def _get_db_all_tasks(self):
1939 """
1940 Read all content of table ro_tasks to log it
1941 :return: None
1942 """
1943 try:
1944 # Checking the content of the BD:
1945
1946 # read and return
1947 ro_task = self.db.get_list("ro_tasks")
1948 for rt in ro_task:
1949 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1950 return ro_task
1951
1952 except DbException as e:
1953 self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1954 except Exception as e:
1955 self.logger.critical(
1956 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1957 )
1958
1959 return None
1960
1961 def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1962 """
1963 Generate a log with the following format:
1964
1965 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1966 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1967 task_array_index;task_id;task_action;task_item;task_args
1968
1969 Example:
1970
1971 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1972 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1973 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1974 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1975 'vim_details': None, 'vim_message': None, 'refresh_at': None};1;SCHEDULED;
1976 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1977 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1978 """
1979 try:
1980 line = []
1981 i = 0
1982 if ro_task is not None and isinstance(ro_task, dict):
1983 for t in ro_task["tasks"]:
1984 line.clear()
1985 line.append(mark)
1986 line.append(event)
1987 line.append(ro_task.get("_id", ""))
1988 line.append(str(ro_task.get("locked_at", "")))
1989 line.append(str(ro_task.get("modified_at", "")))
1990 line.append(str(ro_task.get("created_at", "")))
1991 line.append(str(ro_task.get("to_check_at", "")))
1992 line.append(str(ro_task.get("locked_by", "")))
1993 line.append(str(ro_task.get("target_id", "")))
1994 line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
1995 line.append(str(ro_task.get("vim_info", "")))
1996 line.append(str(ro_task.get("tasks", "")))
1997 if isinstance(t, dict):
1998 line.append(str(t.get("status", "")))
1999 line.append(str(t.get("action_id", "")))
2000 line.append(str(i))
2001 line.append(str(t.get("task_id", "")))
2002 line.append(str(t.get("action", "")))
2003 line.append(str(t.get("item", "")))
2004 line.append(str(t.get("find_params", "")))
2005 line.append(str(t.get("params", "")))
2006 else:
2007 line.extend([""] * 2)
2008 line.append(str(i))
2009 line.extend([""] * 5)
2010
2011 i += 1
2012 self.logger.debug(";".join(line))
2013 elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
2014 i = 0
2015 while True:
2016 st = "tasks.{}.status".format(i)
2017 if st not in db_ro_task_update:
2018 break
2019 line.clear()
2020 line.append(mark)
2021 line.append(event)
2022 line.append(db_ro_task_update.get("_id", ""))
2023 line.append(str(db_ro_task_update.get("locked_at", "")))
2024 line.append(str(db_ro_task_update.get("modified_at", "")))
2025 line.append("")
2026 line.append(str(db_ro_task_update.get("to_check_at", "")))
2027 line.append(str(db_ro_task_update.get("locked_by", "")))
2028 line.append("")
2029 line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
2030 line.append("")
2031 line.append(str(db_ro_task_update.get("vim_info", "")))
2032 line.append(str(str(db_ro_task_update).count(".status")))
2033 line.append(db_ro_task_update.get(st, ""))
2034 line.append("")
2035 line.append(str(i))
2036 line.extend([""] * 3)
2037 i += 1
2038 self.logger.debug(";".join(line))
2039
2040 elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
2041 line.clear()
2042 line.append(mark)
2043 line.append(event)
2044 line.append(db_ro_task_delete.get("_id", ""))
2045 line.append("")
2046 line.append(db_ro_task_delete.get("modified_at", ""))
2047 line.extend([""] * 13)
2048 self.logger.debug(";".join(line))
2049
2050 else:
2051 line.clear()
2052 line.append(mark)
2053 line.append(event)
2054 line.extend([""] * 16)
2055 self.logger.debug(";".join(line))
2056
2057 except Exception as e:
2058 self.logger.error("Error logging ro_task: {}".format(e))
2059
2060 def _delete_task(self, ro_task, task_index, task_depends, db_update):
2061 """
2062 Determine if this task need to be done or superseded
2063 :return: None
2064 """
2065 my_task = ro_task["tasks"][task_index]
2066 task_id = my_task["task_id"]
2067 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
2068 "created_items", False
2069 )
2070
2071 self.logger.warning("Needed delete: {}".format(needed_delete))
2072 if my_task["status"] == "FAILED":
2073 return None, None # TODO need to be retry??
2074
2075 try:
2076 for index, task in enumerate(ro_task["tasks"]):
2077 if index == task_index or not task:
2078 continue # own task
2079
2080 if (
2081 my_task["target_record"] == task["target_record"]
2082 and task["action"] == "CREATE"
2083 ):
2084 # set to finished
2085 db_update["tasks.{}.status".format(index)] = task[
2086 "status"
2087 ] = "FINISHED"
2088 elif task["action"] == "CREATE" and task["status"] not in (
2089 "FINISHED",
2090 "SUPERSEDED",
2091 ):
2092 needed_delete = False
2093
2094 if needed_delete:
2095 self.logger.warning(
2096 "Deleting ro_task={} task_index={}".format(ro_task, task_index)
2097 )
2098 return self.item2class[my_task["item"]].delete(ro_task, task_index)
2099 else:
2100 return "SUPERSEDED", None
2101 except Exception as e:
2102 if not isinstance(e, NsWorkerException):
2103 self.logger.critical(
2104 "Unexpected exception at _delete_task task={}: {}".format(
2105 task_id, e
2106 ),
2107 exc_info=True,
2108 )
2109
2110 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2111
2112 def _create_task(self, ro_task, task_index, task_depends, db_update):
2113 """
2114 Determine if this task need to create something at VIM
2115 :return: None
2116 """
2117 my_task = ro_task["tasks"][task_index]
2118 task_id = my_task["task_id"]
2119 task_status = None
2120
2121 if my_task["status"] == "FAILED":
2122 return None, None # TODO need to be retry??
2123 elif my_task["status"] == "SCHEDULED":
2124 # check if already created by another task
2125 for index, task in enumerate(ro_task["tasks"]):
2126 if index == task_index or not task:
2127 continue # own task
2128
2129 if task["action"] == "CREATE" and task["status"] not in (
2130 "SCHEDULED",
2131 "FINISHED",
2132 "SUPERSEDED",
2133 ):
2134 return task["status"], "COPY_VIM_INFO"
2135
2136 try:
2137 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
2138 ro_task, task_index, task_depends
2139 )
2140 # TODO update other CREATE tasks
2141 except Exception as e:
2142 if not isinstance(e, NsWorkerException):
2143 self.logger.error(
2144 "Error executing task={}: {}".format(task_id, e), exc_info=True
2145 )
2146
2147 task_status = "FAILED"
2148 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2149 # TODO update ro_vim_item_update
2150
2151 return task_status, ro_vim_item_update
2152 else:
2153 return None, None
2154
2155 def _get_dependency(self, task_id, ro_task=None, target_id=None):
2156 """
2157 Look for dependency task
2158 :param task_id: Can be one of
2159 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2160 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2161 3. task.task_id: "<action_id>:number"
2162 :param ro_task:
2163 :param target_id:
2164 :return: database ro_task plus index of task
2165 """
2166 if (
2167 task_id.startswith("vim:")
2168 or task_id.startswith("sdn:")
2169 or task_id.startswith("wim:")
2170 ):
2171 target_id, _, task_id = task_id.partition(" ")
2172
2173 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
2174 ro_task_dependency = self.db.get_one(
2175 "ro_tasks",
2176 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
2177 fail_on_empty=False,
2178 )
2179
2180 if ro_task_dependency:
2181 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2182 if task["target_record_id"] == task_id:
2183 return ro_task_dependency, task_index
2184
2185 else:
2186 if ro_task:
2187 for task_index, task in enumerate(ro_task["tasks"]):
2188 if task and task["task_id"] == task_id:
2189 return ro_task, task_index
2190
2191 ro_task_dependency = self.db.get_one(
2192 "ro_tasks",
2193 q_filter={
2194 "tasks.ANYINDEX.task_id": task_id,
2195 "tasks.ANYINDEX.target_record.ne": None,
2196 },
2197 fail_on_empty=False,
2198 )
2199
2200 self.logger.warning("ro_task_dependency={}".format(ro_task_dependency))
2201 if ro_task_dependency:
2202 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2203 if task["task_id"] == task_id:
2204 return ro_task_dependency, task_index
2205 raise NsWorkerException("Cannot get depending task {}".format(task_id))
2206
2207 def update_vm_refresh(self):
2208 """Enables the VM status updates if self.refresh_config.active parameter
2209 is not -1 and than updates the DB accordingly
2210
2211 """
2212 try:
2213 self.logger.debug("Checking if VM status update config")
2214 next_refresh = time.time()
2215 if self.refresh_config.active == -1:
2216 next_refresh = -1
2217 else:
2218 next_refresh += self.refresh_config.active
2219
2220 if next_refresh != -1:
2221 db_ro_task_update = {}
2222 now = time.time()
2223 next_check_at = now + (24 * 60 * 60)
2224 next_check_at = min(next_check_at, next_refresh)
2225 db_ro_task_update["vim_info.refresh_at"] = next_refresh
2226 db_ro_task_update["to_check_at"] = next_check_at
2227
2228 self.logger.debug(
2229 "Finding tasks which to be updated to enable VM status updates"
2230 )
2231 refresh_tasks = self.db.get_list(
2232 "ro_tasks",
2233 q_filter={
2234 "tasks.status": "DONE",
2235 "to_check_at.lt": 0,
2236 },
2237 )
2238 self.logger.debug("Updating tasks to change the to_check_at status")
2239 for task in refresh_tasks:
2240 q_filter = {
2241 "_id": task["_id"],
2242 }
2243 self.db.set_one(
2244 "ro_tasks",
2245 q_filter=q_filter,
2246 update_dict=db_ro_task_update,
2247 fail_on_empty=True,
2248 )
2249
2250 except Exception as e:
2251 self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
2252
2253 def _process_pending_tasks(self, ro_task):
2254 ro_task_id = ro_task["_id"]
2255 now = time.time()
2256 # one day
2257 next_check_at = now + (24 * 60 * 60)
2258 db_ro_task_update = {}
2259
2260 def _update_refresh(new_status):
2261 # compute next_refresh
2262 nonlocal task
2263 nonlocal next_check_at
2264 nonlocal db_ro_task_update
2265 nonlocal ro_task
2266
2267 next_refresh = time.time()
2268
2269 if task["item"] in ("image", "flavor"):
2270 next_refresh += self.refresh_config.image
2271 elif new_status == "BUILD":
2272 next_refresh += self.refresh_config.build
2273 elif new_status == "DONE":
2274 if self.refresh_config.active == -1:
2275 next_refresh = -1
2276 else:
2277 next_refresh += self.refresh_config.active
2278 else:
2279 next_refresh += self.refresh_config.error
2280
2281 next_check_at = min(next_check_at, next_refresh)
2282 db_ro_task_update["vim_info.refresh_at"] = next_refresh
2283 ro_task["vim_info"]["refresh_at"] = next_refresh
2284
2285 try:
2286 """
2287 # Log RO tasks only when loglevel is DEBUG
2288 if self.logger.getEffectiveLevel() == logging.DEBUG:
2289 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2290 """
2291 # Check if vim status refresh is enabled again
2292 self.update_vm_refresh()
2293 # 0: get task_status_create
2294 lock_object = None
2295 task_status_create = None
2296 task_create = next(
2297 (
2298 t
2299 for t in ro_task["tasks"]
2300 if t
2301 and t["action"] == "CREATE"
2302 and t["status"] in ("BUILD", "DONE")
2303 ),
2304 None,
2305 )
2306
2307 if task_create:
2308 task_status_create = task_create["status"]
2309
2310 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2311 for task_action in ("DELETE", "CREATE", "EXEC"):
2312 db_vim_update = None
2313 new_status = None
2314
2315 for task_index, task in enumerate(ro_task["tasks"]):
2316 if not task:
2317 continue # task deleted
2318
2319 task_depends = {}
2320 target_update = None
2321
2322 if (
2323 (
2324 task_action in ("DELETE", "EXEC")
2325 and task["status"] not in ("SCHEDULED", "BUILD")
2326 )
2327 or task["action"] != task_action
2328 or (
2329 task_action == "CREATE"
2330 and task["status"] in ("FINISHED", "SUPERSEDED")
2331 )
2332 ):
2333 continue
2334
2335 task_path = "tasks.{}.status".format(task_index)
2336 try:
2337 db_vim_info_update = None
2338
2339 if task["status"] == "SCHEDULED":
2340 # check if tasks that this depends on have been completed
2341 dependency_not_completed = False
2342
2343 for dependency_task_id in task.get("depends_on") or ():
2344 (
2345 dependency_ro_task,
2346 dependency_task_index,
2347 ) = self._get_dependency(
2348 dependency_task_id, target_id=ro_task["target_id"]
2349 )
2350 dependency_task = dependency_ro_task["tasks"][
2351 dependency_task_index
2352 ]
2353 self.logger.warning(
2354 "dependency_ro_task={} dependency_task_index={}".format(
2355 dependency_ro_task, dependency_task_index
2356 )
2357 )
2358
2359 if dependency_task["status"] == "SCHEDULED":
2360 dependency_not_completed = True
2361 next_check_at = min(
2362 next_check_at, dependency_ro_task["to_check_at"]
2363 )
2364 # must allow dependent task to be processed first
2365 # to do this set time after last_task_processed
2366 next_check_at = max(
2367 self.time_last_task_processed, next_check_at
2368 )
2369 break
2370 elif dependency_task["status"] == "FAILED":
2371 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2372 task["action"],
2373 task["item"],
2374 dependency_task["action"],
2375 dependency_task["item"],
2376 dependency_task_id,
2377 dependency_ro_task["vim_info"].get(
2378 "vim_message"
2379 ),
2380 )
2381 self.logger.error(
2382 "task={} {}".format(task["task_id"], error_text)
2383 )
2384 raise NsWorkerException(error_text)
2385
2386 task_depends[dependency_task_id] = dependency_ro_task[
2387 "vim_info"
2388 ]["vim_id"]
2389 task_depends[
2390 "TASK-{}".format(dependency_task_id)
2391 ] = dependency_ro_task["vim_info"]["vim_id"]
2392
2393 if dependency_not_completed:
2394 self.logger.warning(
2395 "DEPENDENCY NOT COMPLETED {}".format(
2396 dependency_ro_task["vim_info"]["vim_id"]
2397 )
2398 )
2399 # TODO set at vim_info.vim_details that it is waiting
2400 continue
2401
2402 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2403 # the task of renew this locking. It will update database locket_at periodically
2404 if not lock_object:
2405 lock_object = LockRenew.add_lock_object(
2406 "ro_tasks", ro_task, self
2407 )
2408
2409 if task["action"] == "DELETE":
2410 (new_status, db_vim_info_update,) = self._delete_task(
2411 ro_task, task_index, task_depends, db_ro_task_update
2412 )
2413 new_status = (
2414 "FINISHED" if new_status == "DONE" else new_status
2415 )
2416 # ^with FINISHED instead of DONE it will not be refreshing
2417
2418 if new_status in ("FINISHED", "SUPERSEDED"):
2419 target_update = "DELETE"
2420 elif task["action"] == "EXEC":
2421 (
2422 new_status,
2423 db_vim_info_update,
2424 db_task_update,
2425 ) = self.item2class[task["item"]].exec(
2426 ro_task, task_index, task_depends
2427 )
2428 new_status = (
2429 "FINISHED" if new_status == "DONE" else new_status
2430 )
2431 # ^with FINISHED instead of DONE it will not be refreshing
2432
2433 if db_task_update:
2434 # load into database the modified db_task_update "retries" and "next_retry"
2435 if db_task_update.get("retries"):
2436 db_ro_task_update[
2437 "tasks.{}.retries".format(task_index)
2438 ] = db_task_update["retries"]
2439
2440 next_check_at = time.time() + db_task_update.get(
2441 "next_retry", 60
2442 )
2443 target_update = None
2444 elif task["action"] == "CREATE":
2445 if task["status"] == "SCHEDULED":
2446 if task_status_create:
2447 new_status = task_status_create
2448 target_update = "COPY_VIM_INFO"
2449 else:
2450 new_status, db_vim_info_update = self.item2class[
2451 task["item"]
2452 ].new(ro_task, task_index, task_depends)
2453 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2454 _update_refresh(new_status)
2455 else:
2456 refresh_at = ro_task["vim_info"]["refresh_at"]
2457 if refresh_at and refresh_at != -1 and now > refresh_at:
2458 (new_status, db_vim_info_update,) = self.item2class[
2459 task["item"]
2460 ].refresh(ro_task)
2461 _update_refresh(new_status)
2462 else:
2463 # The refresh is updated to avoid set the value of "refresh_at" to
2464 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2465 # because it can happen that in this case the task is never processed
2466 _update_refresh(task["status"])
2467
2468 except Exception as e:
2469 new_status = "FAILED"
2470 db_vim_info_update = {
2471 "vim_status": "VIM_ERROR",
2472 "vim_message": str(e),
2473 }
2474
2475 if not isinstance(
2476 e, (NsWorkerException, vimconn.VimConnException)
2477 ):
2478 self.logger.error(
2479 "Unexpected exception at _delete_task task={}: {}".format(
2480 task["task_id"], e
2481 ),
2482 exc_info=True,
2483 )
2484
2485 try:
2486 if db_vim_info_update:
2487 db_vim_update = db_vim_info_update.copy()
2488 db_ro_task_update.update(
2489 {
2490 "vim_info." + k: v
2491 for k, v in db_vim_info_update.items()
2492 }
2493 )
2494 ro_task["vim_info"].update(db_vim_info_update)
2495
2496 if new_status:
2497 if task_action == "CREATE":
2498 task_status_create = new_status
2499 db_ro_task_update[task_path] = new_status
2500
2501 if target_update or db_vim_update:
2502 if target_update == "DELETE":
2503 self._update_target(task, None)
2504 elif target_update == "COPY_VIM_INFO":
2505 self._update_target(task, ro_task["vim_info"])
2506 else:
2507 self._update_target(task, db_vim_update)
2508
2509 except Exception as e:
2510 if (
2511 isinstance(e, DbException)
2512 and e.http_code == HTTPStatus.NOT_FOUND
2513 ):
2514 # if the vnfrs or nsrs has been removed from database, this task must be removed
2515 self.logger.debug(
2516 "marking to delete task={}".format(task["task_id"])
2517 )
2518 self.tasks_to_delete.append(task)
2519 else:
2520 self.logger.error(
2521 "Unexpected exception at _update_target task={}: {}".format(
2522 task["task_id"], e
2523 ),
2524 exc_info=True,
2525 )
2526
2527 locked_at = ro_task["locked_at"]
2528
2529 if lock_object:
2530 locked_at = [
2531 lock_object["locked_at"],
2532 lock_object["locked_at"] + self.task_locked_time,
2533 ]
2534 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2535 # contain exactly locked_at + self.task_locked_time
2536 LockRenew.remove_lock_object(lock_object)
2537
2538 q_filter = {
2539 "_id": ro_task["_id"],
2540 "to_check_at": ro_task["to_check_at"],
2541 "locked_at": locked_at,
2542 }
2543 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2544 # outside this task (by ro_nbi) do not update it
2545 db_ro_task_update["locked_by"] = None
2546 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2547 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2548 db_ro_task_update["modified_at"] = now
2549 db_ro_task_update["to_check_at"] = next_check_at
2550
2551 """
2552 # Log RO tasks only when loglevel is DEBUG
2553 if self.logger.getEffectiveLevel() == logging.DEBUG:
2554 db_ro_task_update_log = db_ro_task_update.copy()
2555 db_ro_task_update_log["_id"] = q_filter["_id"]
2556 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2557 """
2558
2559 if not self.db.set_one(
2560 "ro_tasks",
2561 update_dict=db_ro_task_update,
2562 q_filter=q_filter,
2563 fail_on_empty=False,
2564 ):
2565 del db_ro_task_update["to_check_at"]
2566 del q_filter["to_check_at"]
2567 """
2568 # Log RO tasks only when loglevel is DEBUG
2569 if self.logger.getEffectiveLevel() == logging.DEBUG:
2570 self._log_ro_task(
2571 None,
2572 db_ro_task_update_log,
2573 None,
2574 "TASK_WF",
2575 "SET_TASK " + str(q_filter),
2576 )
2577 """
2578 self.db.set_one(
2579 "ro_tasks",
2580 q_filter=q_filter,
2581 update_dict=db_ro_task_update,
2582 fail_on_empty=True,
2583 )
2584 except DbException as e:
2585 self.logger.error(
2586 "ro_task={} Error updating database {}".format(ro_task_id, e)
2587 )
2588 except Exception as e:
2589 self.logger.error(
2590 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2591 )
2592
2593 def _update_target(self, task, ro_vim_item_update):
2594 table, _, temp = task["target_record"].partition(":")
2595 _id, _, path_vim_status = temp.partition(":")
2596 path_item = path_vim_status[: path_vim_status.rfind(".")]
2597 path_item = path_item[: path_item.rfind(".")]
2598 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2599 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2600
2601 if ro_vim_item_update:
2602 update_dict = {
2603 path_vim_status + "." + k: v
2604 for k, v in ro_vim_item_update.items()
2605 if k
2606 in (
2607 "vim_id",
2608 "vim_details",
2609 "vim_message",
2610 "vim_name",
2611 "vim_status",
2612 "interfaces",
2613 "interfaces_backup",
2614 )
2615 }
2616
2617 if path_vim_status.startswith("vdur."):
2618 # for backward compatibility, add vdur.name apart from vdur.vim_name
2619 if ro_vim_item_update.get("vim_name"):
2620 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2621
2622 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2623 if ro_vim_item_update.get("vim_id"):
2624 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2625
2626 # update general status
2627 if ro_vim_item_update.get("vim_status"):
2628 update_dict[path_item + ".status"] = ro_vim_item_update[
2629 "vim_status"
2630 ]
2631
2632 if ro_vim_item_update.get("interfaces"):
2633 path_interfaces = path_item + ".interfaces"
2634
2635 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2636 if iface:
2637 update_dict.update(
2638 {
2639 path_interfaces + ".{}.".format(i) + k: v
2640 for k, v in iface.items()
2641 if k in ("vlan", "compute_node", "pci")
2642 }
2643 )
2644
2645 # put ip_address and mac_address with ip-address and mac-address
2646 if iface.get("ip_address"):
2647 update_dict[
2648 path_interfaces + ".{}.".format(i) + "ip-address"
2649 ] = iface["ip_address"]
2650
2651 if iface.get("mac_address"):
2652 update_dict[
2653 path_interfaces + ".{}.".format(i) + "mac-address"
2654 ] = iface["mac_address"]
2655
2656 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2657 update_dict["ip-address"] = iface.get("ip_address").split(
2658 ";"
2659 )[0]
2660
2661 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2662 update_dict[path_item + ".ip-address"] = iface.get(
2663 "ip_address"
2664 ).split(";")[0]
2665
2666 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2667
2668 # If interfaces exists, it backups VDU interfaces in the DB for healing operations
2669 if ro_vim_item_update.get("interfaces"):
2670 search_key = path_vim_status + ".interfaces"
2671 if update_dict.get(search_key):
2672 interfaces_backup_update = {
2673 path_vim_status + ".interfaces_backup": update_dict[search_key]
2674 }
2675
2676 self.db.set_one(
2677 table,
2678 q_filter={"_id": _id},
2679 update_dict=interfaces_backup_update,
2680 )
2681
2682 else:
2683 update_dict = {path_item + ".status": "DELETED"}
2684 self.db.set_one(
2685 table,
2686 q_filter={"_id": _id},
2687 update_dict=update_dict,
2688 unset={path_vim_status: None},
2689 )
2690
2691 def _process_delete_db_tasks(self):
2692 """
2693 Delete task from database because vnfrs or nsrs or both have been deleted
2694 :return: None. Uses and modify self.tasks_to_delete
2695 """
2696 while self.tasks_to_delete:
2697 task = self.tasks_to_delete[0]
2698 vnfrs_deleted = None
2699 nsr_id = task["nsr_id"]
2700
2701 if task["target_record"].startswith("vnfrs:"):
2702 # check if nsrs is present
2703 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2704 vnfrs_deleted = task["target_record"].split(":")[1]
2705
2706 try:
2707 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2708 except Exception as e:
2709 self.logger.error(
2710 "Error deleting task={}: {}".format(task["task_id"], e)
2711 )
2712 self.tasks_to_delete.pop(0)
2713
2714 @staticmethod
2715 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2716 """
2717 Static method because it is called from osm_ng_ro.ns
2718 :param db: instance of database to use
2719 :param nsr_id: affected nsrs id
2720 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2721 :return: None, exception is fails
2722 """
2723 retries = 5
2724 for retry in range(retries):
2725 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2726 now = time.time()
2727 conflict = False
2728
2729 for ro_task in ro_tasks:
2730 db_update = {}
2731 to_delete_ro_task = True
2732
2733 for index, task in enumerate(ro_task["tasks"]):
2734 if not task:
2735 pass
2736 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2737 vnfrs_deleted
2738 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2739 ):
2740 db_update["tasks.{}".format(index)] = None
2741 else:
2742 # used by other nsr, ro_task cannot be deleted
2743 to_delete_ro_task = False
2744
2745 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2746 if to_delete_ro_task:
2747 if not db.del_one(
2748 "ro_tasks",
2749 q_filter={
2750 "_id": ro_task["_id"],
2751 "modified_at": ro_task["modified_at"],
2752 },
2753 fail_on_empty=False,
2754 ):
2755 conflict = True
2756 elif db_update:
2757 db_update["modified_at"] = now
2758 if not db.set_one(
2759 "ro_tasks",
2760 q_filter={
2761 "_id": ro_task["_id"],
2762 "modified_at": ro_task["modified_at"],
2763 },
2764 update_dict=db_update,
2765 fail_on_empty=False,
2766 ):
2767 conflict = True
2768 if not conflict:
2769 return
2770 else:
2771 raise NsWorkerException("Exceeded {} retries".format(retries))
2772
2773 def run(self):
2774 # load database
2775 self.logger.info("Starting")
2776 while True:
2777 # step 1: get commands from queue
2778 try:
2779 if self.vim_targets:
2780 task = self.task_queue.get(block=False)
2781 else:
2782 if not self.idle:
2783 self.logger.debug("enters in idle state")
2784 self.idle = True
2785 task = self.task_queue.get(block=True)
2786 self.idle = False
2787
2788 if task[0] == "terminate":
2789 break
2790 elif task[0] == "load_vim":
2791 self.logger.info("order to load vim {}".format(task[1]))
2792 self._load_vim(task[1])
2793 elif task[0] == "unload_vim":
2794 self.logger.info("order to unload vim {}".format(task[1]))
2795 self._unload_vim(task[1])
2796 elif task[0] == "reload_vim":
2797 self._reload_vim(task[1])
2798 elif task[0] == "check_vim":
2799 self.logger.info("order to check vim {}".format(task[1]))
2800 self._check_vim(task[1])
2801 continue
2802 except Exception as e:
2803 if isinstance(e, queue.Empty):
2804 pass
2805 else:
2806 self.logger.critical(
2807 "Error processing task: {}".format(e), exc_info=True
2808 )
2809
2810 # step 2: process pending_tasks, delete not needed tasks
2811 try:
2812 if self.tasks_to_delete:
2813 self._process_delete_db_tasks()
2814 busy = False
2815 """
2816 # Log RO tasks only when loglevel is DEBUG
2817 if self.logger.getEffectiveLevel() == logging.DEBUG:
2818 _ = self._get_db_all_tasks()
2819 """
2820 ro_task = self._get_db_task()
2821 if ro_task:
2822 self.logger.warning("Task to process: {}".format(ro_task))
2823 time.sleep(1)
2824 self._process_pending_tasks(ro_task)
2825 busy = True
2826 if not busy:
2827 time.sleep(5)
2828 except Exception as e:
2829 self.logger.critical(
2830 "Unexpected exception at run: " + str(e), exc_info=True
2831 )
2832
2833 self.logger.info("Finishing")