1 # -*- coding: utf-8 -*-
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 This is a thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored in the database, in the table ro_tasks.
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target (where to store the results).
27 from copy
import deepcopy
28 from http
import HTTPStatus
32 from shutil
import rmtree
36 from typing
import Dict
37 from unittest
.mock
import Mock
39 from importlib_metadata
import entry_points
40 from osm_common
.dbbase
import DbException
41 from osm_ng_ro
.vim_admin
import LockRenew
42 from osm_ro_plugin
import sdnconn
, vimconn
43 from osm_ro_plugin
.sdn_dummy
import SdnDummyConnector
44 from osm_ro_plugin
.vim_dummy
import VimDummyConnector
48 __author__
= "Alfonso Tierno"
49 __date__
= "$28-Sep-2017 12:07:15$"
def deep_get(target_dict, *args, **kwargs):
    """
    Walk a nested dictionary following a sequence of keys and return what is found there.

    Example: with target_dict={a: {b: 5}}, the keys (a, b) give 5, while both
    (a, b, c) and (f, h) give None (or the supplied default).

    :param target_dict: dictionary to be read
    :param args: keys to follow, one nesting level per key
    :param kwargs: may only carry default=value, returned when a key cannot be followed
    :return: the value stored under the nested keys; otherwise the default, or None
    """
    current = target_dict

    for step in args:
        # Descend only while we are still on a dictionary that holds the key.
        if isinstance(current, dict) and step in current:
            current = current[step]
        else:
            return kwargs.get("default")

    return current
68 class NsWorkerException(Exception):
72 class FailingConnector
:
73 def __init__(self
, error_msg
):
74 self
.error_msg
= error_msg
76 for method
in dir(vimconn
.VimConnector
):
79 self
, method
, Mock(side_effect
=vimconn
.VimConnException(error_msg
))
82 for method
in dir(sdnconn
.SdnConnectorBase
):
85 self
, method
, Mock(side_effect
=sdnconn
.SdnConnectorError(error_msg
))
89 class NsWorkerExceptionNotFound(NsWorkerException
):
93 class VimInteractionBase
:
94 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
95 It implements methods that does nothing and return ok"""
97 def __init__(self
, db
, my_vims
, db_vims
, logger
):
100 self
.my_vims
= my_vims
101 self
.db_vims
= db_vims
103 def new(self
, ro_task
, task_index
, task_depends
):
106 def refresh(self
, ro_task
):
107 """skip calling VIM to get image, flavor status. Assumes ok"""
108 if ro_task
["vim_info"]["vim_status"] == "VIM_ERROR":
113 def delete(self
, ro_task
, task_index
):
114 """skip calling VIM to delete image. Assumes ok"""
def exec(self, ro_task, task_index, task_depends):
    """Default action handler: does not contact the VIM and reports the task as done.

    :param ro_task: ro_task document (unused here)
    :param task_index: index of the task inside ro_task["tasks"] (unused here)
    :param task_depends: results of the tasks this one depends on (unused here)
    :return: the tuple ("DONE", None, None); the two None slots are where subclasses
        in this file return their ro_vim_item_update and db_task_update
    """
    outcome = ("DONE", None, None)
    return outcome
121 class VimInteractionNet(VimInteractionBase
):
122 def new(self
, ro_task
, task_index
, task_depends
):
124 task
= ro_task
["tasks"][task_index
]
125 task_id
= task
["task_id"]
128 target_vim
= self
.my_vims
[ro_task
["target_id"]]
130 mgmtnet_defined_in_vim
= False
134 if task
.get("find_params"):
135 # if management, get configuration of VIM
136 if task
["find_params"].get("filter_dict"):
137 vim_filter
= task
["find_params"]["filter_dict"]
139 elif task
["find_params"].get("mgmt"):
142 self
.db_vims
[ro_task
["target_id"]],
144 "management_network_id",
146 mgmtnet_defined_in_vim
= True
148 "id": self
.db_vims
[ro_task
["target_id"]]["config"][
149 "management_network_id"
153 self
.db_vims
[ro_task
["target_id"]],
155 "management_network_name",
157 mgmtnet_defined_in_vim
= True
159 "name": self
.db_vims
[ro_task
["target_id"]]["config"][
160 "management_network_name"
164 vim_filter
= {"name": task
["find_params"]["name"]}
166 raise NsWorkerExceptionNotFound(
167 "Invalid find_params for new_net {}".format(task
["find_params"])
170 vim_nets
= target_vim
.get_network_list(vim_filter
)
171 if not vim_nets
and not task
.get("params"):
172 # If there is mgmt-network in the descriptor,
173 # there is no mapping of that network to a VIM network in the descriptor,
174 # also there is no mapping in the "--config" parameter or at VIM creation;
175 # that mgmt-network will be created.
176 if mgmtnet
and not mgmtnet_defined_in_vim
:
178 vim_filter
.get("name")
179 if vim_filter
.get("name")
180 else vim_filter
.get("id")[:16]
182 vim_net_id
, created_items
= target_vim
.new_network(
186 "Created mgmt network vim_net_id: {}".format(vim_net_id
)
190 raise NsWorkerExceptionNotFound(
191 "Network not found with this criteria: '{}'".format(
192 task
.get("find_params")
195 elif len(vim_nets
) > 1:
196 raise NsWorkerException(
197 "More than one network found with this criteria: '{}'".format(
203 vim_net_id
= vim_nets
[0]["id"]
206 params
= task
["params"]
207 vim_net_id
, created_items
= target_vim
.new_network(**params
)
210 ro_vim_item_update
= {
211 "vim_id": vim_net_id
,
212 "vim_status": "BUILD",
214 "created_items": created_items
,
218 "task={} {} new-net={} created={}".format(
219 task_id
, ro_task
["target_id"], vim_net_id
, created
223 return "BUILD", ro_vim_item_update
224 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
226 "task={} vim={} new-net: {}".format(task_id
, ro_task
["target_id"], e
)
228 ro_vim_item_update
= {
229 "vim_status": "VIM_ERROR",
231 "vim_details": str(e
),
234 return "FAILED", ro_vim_item_update
236 def refresh(self
, ro_task
):
237 """Call VIM to get network status"""
238 ro_task_id
= ro_task
["_id"]
239 target_vim
= self
.my_vims
[ro_task
["target_id"]]
240 vim_id
= ro_task
["vim_info"]["vim_id"]
241 net_to_refresh_list
= [vim_id
]
244 vim_dict
= target_vim
.refresh_nets_status(net_to_refresh_list
)
245 vim_info
= vim_dict
[vim_id
]
247 if vim_info
["status"] == "ACTIVE":
249 elif vim_info
["status"] == "BUILD":
250 task_status
= "BUILD"
252 task_status
= "FAILED"
253 except vimconn
.VimConnException
as e
:
254 # Mark all tasks at VIM_ERROR status
256 "ro_task={} vim={} get-net={}: {}".format(
257 ro_task_id
, ro_task
["target_id"], vim_id
, e
260 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
261 task_status
= "FAILED"
263 ro_vim_item_update
= {}
264 if ro_task
["vim_info"]["vim_status"] != vim_info
["status"]:
265 ro_vim_item_update
["vim_status"] = vim_info
["status"]
267 if ro_task
["vim_info"]["vim_name"] != vim_info
.get("name"):
268 ro_vim_item_update
["vim_name"] = vim_info
.get("name")
270 if vim_info
["status"] in ("ERROR", "VIM_ERROR"):
271 if ro_task
["vim_info"]["vim_details"] != vim_info
.get("error_msg"):
272 ro_vim_item_update
["vim_details"] = vim_info
.get("error_msg")
273 elif vim_info
["status"] == "DELETED":
274 ro_vim_item_update
["vim_id"] = None
275 ro_vim_item_update
["vim_details"] = "Deleted externally"
277 if ro_task
["vim_info"]["vim_details"] != vim_info
["vim_info"]:
278 ro_vim_item_update
["vim_details"] = vim_info
["vim_info"]
280 if ro_vim_item_update
:
282 "ro_task={} {} get-net={}: status={} {}".format(
284 ro_task
["target_id"],
286 ro_vim_item_update
.get("vim_status"),
287 ro_vim_item_update
.get("vim_details")
288 if ro_vim_item_update
.get("vim_status") != "ACTIVE"
293 return task_status
, ro_vim_item_update
295 def delete(self
, ro_task
, task_index
):
296 task
= ro_task
["tasks"][task_index
]
297 task_id
= task
["task_id"]
298 net_vim_id
= ro_task
["vim_info"]["vim_id"]
299 ro_vim_item_update_ok
= {
300 "vim_status": "DELETED",
302 "vim_details": "DELETED",
307 if net_vim_id
or ro_task
["vim_info"]["created_items"]:
308 target_vim
= self
.my_vims
[ro_task
["target_id"]]
309 target_vim
.delete_network(
310 net_vim_id
, ro_task
["vim_info"]["created_items"]
312 except vimconn
.VimConnNotFoundException
:
313 ro_vim_item_update_ok
["vim_details"] = "already deleted"
314 except vimconn
.VimConnException
as e
:
316 "ro_task={} vim={} del-net={}: {}".format(
317 ro_task
["_id"], ro_task
["target_id"], net_vim_id
, e
320 ro_vim_item_update
= {
321 "vim_status": "VIM_ERROR",
322 "vim_details": "Error while deleting: {}".format(e
),
325 return "FAILED", ro_vim_item_update
328 "task={} {} del-net={} {}".format(
330 ro_task
["target_id"],
332 ro_vim_item_update_ok
.get("vim_details", ""),
336 return "DONE", ro_vim_item_update_ok
339 class VimInteractionVdu(VimInteractionBase
):
340 max_retries_inject_ssh_key
= 20 # 20 times
341 time_retries_inject_ssh_key
= 30 # wevery 30 seconds
343 def new(self
, ro_task
, task_index
, task_depends
):
344 task
= ro_task
["tasks"][task_index
]
345 task_id
= task
["task_id"]
348 target_vim
= self
.my_vims
[ro_task
["target_id"]]
352 params
= task
["params"]
353 params_copy
= deepcopy(params
)
354 net_list
= params_copy
["net_list"]
357 # change task_id into network_id
358 if "net_id" in net
and net
["net_id"].startswith("TASK-"):
359 network_id
= task_depends
[net
["net_id"]]
362 raise NsWorkerException(
363 "Cannot create VM because depends on a network not created or found "
364 "for {}".format(net
["net_id"])
367 net
["net_id"] = network_id
369 if params_copy
["image_id"].startswith("TASK-"):
370 params_copy
["image_id"] = task_depends
[params_copy
["image_id"]]
372 if params_copy
["flavor_id"].startswith("TASK-"):
373 params_copy
["flavor_id"] = task_depends
[params_copy
["flavor_id"]]
375 affinity_group_list
= params_copy
["affinity_group_list"]
376 for affinity_group
in affinity_group_list
:
377 # change task_id into affinity_group_id
378 if "affinity_group_id" in affinity_group
and affinity_group
[
380 ].startswith("TASK-"):
381 affinity_group_id
= task_depends
[
382 affinity_group
["affinity_group_id"]
385 if not affinity_group_id
:
386 raise NsWorkerException(
387 "found for {}".format(affinity_group
["affinity_group_id"])
390 affinity_group
["affinity_group_id"] = affinity_group_id
392 vim_vm_id
, created_items
= target_vim
.new_vminstance(**params_copy
)
393 interfaces
= [iface
["vim_id"] for iface
in params_copy
["net_list"]]
395 ro_vim_item_update
= {
397 "vim_status": "BUILD",
399 "created_items": created_items
,
401 "interfaces_vim_ids": interfaces
,
405 "task={} {} new-vm={} created={}".format(
406 task_id
, ro_task
["target_id"], vim_vm_id
, created
410 return "BUILD", ro_vim_item_update
411 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
413 "task={} {} new-vm: {}".format(task_id
, ro_task
["target_id"], e
)
415 ro_vim_item_update
= {
416 "vim_status": "VIM_ERROR",
418 "vim_details": str(e
),
421 return "FAILED", ro_vim_item_update
423 def delete(self
, ro_task
, task_index
):
424 task
= ro_task
["tasks"][task_index
]
425 task_id
= task
["task_id"]
426 vm_vim_id
= ro_task
["vim_info"]["vim_id"]
427 ro_vim_item_update_ok
= {
428 "vim_status": "DELETED",
430 "vim_details": "DELETED",
435 if vm_vim_id
or ro_task
["vim_info"]["created_items"]:
436 target_vim
= self
.my_vims
[ro_task
["target_id"]]
437 target_vim
.delete_vminstance(
438 vm_vim_id
, ro_task
["vim_info"]["created_items"]
440 except vimconn
.VimConnNotFoundException
:
441 ro_vim_item_update_ok
["vim_details"] = "already deleted"
442 except vimconn
.VimConnException
as e
:
444 "ro_task={} vim={} del-vm={}: {}".format(
445 ro_task
["_id"], ro_task
["target_id"], vm_vim_id
, e
448 ro_vim_item_update
= {
449 "vim_status": "VIM_ERROR",
450 "vim_details": "Error while deleting: {}".format(e
),
453 return "FAILED", ro_vim_item_update
456 "task={} {} del-vm={} {}".format(
458 ro_task
["target_id"],
460 ro_vim_item_update_ok
.get("vim_details", ""),
464 return "DONE", ro_vim_item_update_ok
466 def refresh(self
, ro_task
):
467 """Call VIM to get vm status"""
468 ro_task_id
= ro_task
["_id"]
469 target_vim
= self
.my_vims
[ro_task
["target_id"]]
470 vim_id
= ro_task
["vim_info"]["vim_id"]
475 vm_to_refresh_list
= [vim_id
]
477 vim_dict
= target_vim
.refresh_vms_status(vm_to_refresh_list
)
478 vim_info
= vim_dict
[vim_id
]
480 if vim_info
["status"] == "ACTIVE":
482 elif vim_info
["status"] == "BUILD":
483 task_status
= "BUILD"
485 task_status
= "FAILED"
487 # try to load and parse vim_information
489 vim_info_info
= yaml
.safe_load(vim_info
["vim_info"])
490 if vim_info_info
.get("name"):
491 vim_info
["name"] = vim_info_info
["name"]
494 except vimconn
.VimConnException
as e
:
495 # Mark all tasks at VIM_ERROR status
497 "ro_task={} vim={} get-vm={}: {}".format(
498 ro_task_id
, ro_task
["target_id"], vim_id
, e
501 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
502 task_status
= "FAILED"
504 ro_vim_item_update
= {}
506 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
508 if vim_info
.get("interfaces"):
509 for vim_iface_id
in ro_task
["vim_info"]["interfaces_vim_ids"]:
513 for iface
in vim_info
["interfaces"]
514 if vim_iface_id
== iface
["vim_interface_id"]
519 # iface.pop("vim_info", None)
520 vim_interfaces
.append(iface
)
524 for t
in ro_task
["tasks"]
525 if t
and t
["action"] == "CREATE" and t
["status"] != "FINISHED"
527 if vim_interfaces
and task_create
.get("mgmt_vnf_interface") is not None:
528 vim_interfaces
[task_create
["mgmt_vnf_interface"]][
532 mgmt_vdu_iface
= task_create
.get(
533 "mgmt_vdu_interface", task_create
.get("mgmt_vnf_interface", 0)
536 vim_interfaces
[mgmt_vdu_iface
]["mgmt_vdu_interface"] = True
538 if ro_task
["vim_info"]["interfaces"] != vim_interfaces
:
539 ro_vim_item_update
["interfaces"] = vim_interfaces
541 if ro_task
["vim_info"]["vim_status"] != vim_info
["status"]:
542 ro_vim_item_update
["vim_status"] = vim_info
["status"]
544 if ro_task
["vim_info"]["vim_name"] != vim_info
.get("name"):
545 ro_vim_item_update
["vim_name"] = vim_info
.get("name")
547 if vim_info
["status"] in ("ERROR", "VIM_ERROR"):
548 if ro_task
["vim_info"]["vim_details"] != vim_info
.get("error_msg"):
549 ro_vim_item_update
["vim_details"] = vim_info
.get("error_msg")
550 elif vim_info
["status"] == "DELETED":
551 ro_vim_item_update
["vim_id"] = None
552 ro_vim_item_update
["vim_details"] = "Deleted externally"
554 if ro_task
["vim_info"]["vim_details"] != vim_info
["vim_info"]:
555 ro_vim_item_update
["vim_details"] = vim_info
["vim_info"]
557 if ro_vim_item_update
:
559 "ro_task={} {} get-vm={}: status={} {}".format(
561 ro_task
["target_id"],
563 ro_vim_item_update
.get("vim_status"),
564 ro_vim_item_update
.get("vim_details")
565 if ro_vim_item_update
.get("vim_status") != "ACTIVE"
570 return task_status
, ro_vim_item_update
572 def exec(self
, ro_task
, task_index
, task_depends
):
573 task
= ro_task
["tasks"][task_index
]
574 task_id
= task
["task_id"]
575 target_vim
= self
.my_vims
[ro_task
["target_id"]]
576 db_task_update
= {"retries": 0}
577 retries
= task
.get("retries", 0)
580 params
= task
["params"]
581 params_copy
= deepcopy(params
)
582 params_copy
["ro_key"] = self
.db
.decrypt(
583 params_copy
.pop("private_key"),
584 params_copy
.pop("schema_version"),
585 params_copy
.pop("salt"),
587 params_copy
["ip_addr"] = params_copy
.pop("ip_address")
588 target_vim
.inject_user_key(**params_copy
)
590 "task={} {} action-vm=inject_key".format(task_id
, ro_task
["target_id"])
597 ) # params_copy["key"]
598 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
601 self
.logger
.debug(traceback
.format_exc())
602 if retries
< self
.max_retries_inject_ssh_key
:
608 "next_retry": self
.time_retries_inject_ssh_key
,
613 "task={} {} inject-ssh-key: {}".format(task_id
, ro_task
["target_id"], e
)
615 ro_vim_item_update
= {"vim_details": str(e
)}
617 return "FAILED", ro_vim_item_update
, db_task_update
620 class VimInteractionImage(VimInteractionBase
):
621 def new(self
, ro_task
, task_index
, task_depends
):
622 task
= ro_task
["tasks"][task_index
]
623 task_id
= task
["task_id"]
626 target_vim
= self
.my_vims
[ro_task
["target_id"]]
630 if task
.get("find_params"):
631 vim_images
= target_vim
.get_image_list(**task
["find_params"])
634 raise NsWorkerExceptionNotFound(
635 "Image not found with this criteria: '{}'".format(
639 elif len(vim_images
) > 1:
640 raise NsWorkerException(
641 "More than one image found with this criteria: '{}'".format(
646 vim_image_id
= vim_images
[0]["id"]
648 ro_vim_item_update
= {
649 "vim_id": vim_image_id
,
650 "vim_status": "DONE",
652 "created_items": created_items
,
656 "task={} {} new-image={} created={}".format(
657 task_id
, ro_task
["target_id"], vim_image_id
, created
661 return "DONE", ro_vim_item_update
662 except (NsWorkerException
, vimconn
.VimConnException
) as e
:
664 "task={} {} new-image: {}".format(task_id
, ro_task
["target_id"], e
)
666 ro_vim_item_update
= {
667 "vim_status": "VIM_ERROR",
669 "vim_details": str(e
),
672 return "FAILED", ro_vim_item_update
675 class VimInteractionFlavor(VimInteractionBase
):
676 def delete(self
, ro_task
, task_index
):
677 task
= ro_task
["tasks"][task_index
]
678 task_id
= task
["task_id"]
679 flavor_vim_id
= ro_task
["vim_info"]["vim_id"]
680 ro_vim_item_update_ok
= {
681 "vim_status": "DELETED",
683 "vim_details": "DELETED",
689 target_vim
= self
.my_vims
[ro_task
["target_id"]]
690 target_vim
.delete_flavor(flavor_vim_id
)
691 except vimconn
.VimConnNotFoundException
:
692 ro_vim_item_update_ok
["vim_details"] = "already deleted"
693 except vimconn
.VimConnException
as e
:
695 "ro_task={} vim={} del-flavor={}: {}".format(
696 ro_task
["_id"], ro_task
["target_id"], flavor_vim_id
, e
699 ro_vim_item_update
= {
700 "vim_status": "VIM_ERROR",
701 "vim_details": "Error while deleting: {}".format(e
),
704 return "FAILED", ro_vim_item_update
707 "task={} {} del-flavor={} {}".format(
709 ro_task
["target_id"],
711 ro_vim_item_update_ok
.get("vim_details", ""),
715 return "DONE", ro_vim_item_update_ok
717 def new(self
, ro_task
, task_index
, task_depends
):
718 task
= ro_task
["tasks"][task_index
]
719 task_id
= task
["task_id"]
722 target_vim
= self
.my_vims
[ro_task
["target_id"]]
728 if task
.get("find_params"):
730 flavor_data
= task
["find_params"]["flavor_data"]
731 vim_flavor_id
= target_vim
.get_flavor_id_from_data(flavor_data
)
732 except vimconn
.VimConnNotFoundException
:
735 if not vim_flavor_id
and task
.get("params"):
737 flavor_data
= task
["params"]["flavor_data"]
738 vim_flavor_id
= target_vim
.new_flavor(flavor_data
)
741 ro_vim_item_update
= {
742 "vim_id": vim_flavor_id
,
743 "vim_status": "DONE",
745 "created_items": created_items
,
749 "task={} {} new-flavor={} created={}".format(
750 task_id
, ro_task
["target_id"], vim_flavor_id
, created
754 return "DONE", ro_vim_item_update
755 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
757 "task={} vim={} new-flavor: {}".format(task_id
, ro_task
["target_id"], e
)
759 ro_vim_item_update
= {
760 "vim_status": "VIM_ERROR",
762 "vim_details": str(e
),
765 return "FAILED", ro_vim_item_update
768 class VimInteractionAffinityGroup(VimInteractionBase
):
769 def delete(self
, ro_task
, task_index
):
770 task
= ro_task
["tasks"][task_index
]
771 task_id
= task
["task_id"]
772 affinity_group_vim_id
= ro_task
["vim_info"]["vim_id"]
773 ro_vim_item_update_ok
= {
774 "vim_status": "DELETED",
776 "vim_details": "DELETED",
781 if affinity_group_vim_id
:
782 target_vim
= self
.my_vims
[ro_task
["target_id"]]
783 target_vim
.delete_affinity_group(affinity_group_vim_id
)
784 except vimconn
.VimConnNotFoundException
:
785 ro_vim_item_update_ok
["vim_details"] = "already deleted"
786 except vimconn
.VimConnException
as e
:
788 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
789 ro_task
["_id"], ro_task
["target_id"], affinity_group_vim_id
, e
792 ro_vim_item_update
= {
793 "vim_status": "VIM_ERROR",
794 "vim_details": "Error while deleting: {}".format(e
),
797 return "FAILED", ro_vim_item_update
800 "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
802 ro_task
["target_id"],
803 affinity_group_vim_id
,
804 ro_vim_item_update_ok
.get("vim_details", ""),
808 return "DONE", ro_vim_item_update_ok
810 def new(self
, ro_task
, task_index
, task_depends
):
811 task
= ro_task
["tasks"][task_index
]
812 task_id
= task
["task_id"]
815 target_vim
= self
.my_vims
[ro_task
["target_id"]]
818 affinity_group_vim_id
= None
819 affinity_group_data
= None
821 if task
.get("params"):
822 affinity_group_data
= task
["params"].get("affinity_group_data")
824 if affinity_group_data
and affinity_group_data
.get("vim-affinity-group-id"):
826 param_affinity_group_id
= task
["params"]["affinity_group_data"].get(
827 "vim-affinity-group-id"
829 affinity_group_vim_id
= target_vim
.get_affinity_group(
830 param_affinity_group_id
832 except vimconn
.VimConnNotFoundException
:
834 "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
835 "could not be found at VIM. Creating a new one.".format(
836 task_id
, ro_task
["target_id"], param_affinity_group_id
840 if not affinity_group_vim_id
and affinity_group_data
:
841 affinity_group_vim_id
= target_vim
.new_affinity_group(
846 ro_vim_item_update
= {
847 "vim_id": affinity_group_vim_id
,
848 "vim_status": "DONE",
850 "created_items": created_items
,
854 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
855 task_id
, ro_task
["target_id"], affinity_group_vim_id
, created
859 return "DONE", ro_vim_item_update
860 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
862 "task={} vim={} new-affinity-or-anti-affinity-group:"
863 " {}".format(task_id
, ro_task
["target_id"], e
)
865 ro_vim_item_update
= {
866 "vim_status": "VIM_ERROR",
868 "vim_details": str(e
),
871 return "FAILED", ro_vim_item_update
874 class VimInteractionSdnNet(VimInteractionBase
):
876 def _match_pci(port_pci
, mapping
):
878 Check if port_pci matches with mapping
879 mapping can have brackets to indicate that several chars are accepted. e.g
880 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
881 :param port_pci: text
882 :param mapping: text, can contain brackets to indicate several chars are available
883 :return: True if matches, False otherwise
885 if not port_pci
or not mapping
:
887 if port_pci
== mapping
:
893 bracket_start
= mapping
.find("[", mapping_index
)
895 if bracket_start
== -1:
898 bracket_end
= mapping
.find("]", bracket_start
)
899 if bracket_end
== -1:
902 length
= bracket_start
- mapping_index
905 and port_pci
[pci_index
: pci_index
+ length
]
906 != mapping
[mapping_index
:bracket_start
]
911 port_pci
[pci_index
+ length
]
912 not in mapping
[bracket_start
+ 1 : bracket_end
]
916 pci_index
+= length
+ 1
917 mapping_index
= bracket_end
+ 1
919 if port_pci
[pci_index
:] != mapping
[mapping_index
:]:
924 def _get_interfaces(self
, vlds_to_connect
, vim_account_id
):
926 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
927 :param vim_account_id:
932 for vld
in vlds_to_connect
:
933 table
, _
, db_id
= vld
.partition(":")
934 db_id
, _
, vld
= db_id
.partition(":")
935 _
, _
, vld_id
= vld
.partition(".")
938 q_filter
= {"vim-account-id": vim_account_id
, "_id": db_id
}
939 iface_key
= "vnf-vld-id"
940 else: # table == "nsrs"
941 q_filter
= {"vim-account-id": vim_account_id
, "nsr-id-ref": db_id
}
942 iface_key
= "ns-vld-id"
944 db_vnfrs
= self
.db
.get_list("vnfrs", q_filter
=q_filter
)
946 for db_vnfr
in db_vnfrs
:
947 for vdu_index
, vdur
in enumerate(db_vnfr
.get("vdur", ())):
948 for iface_index
, interface
in enumerate(vdur
["interfaces"]):
949 if interface
.get(iface_key
) == vld_id
and interface
.get(
951 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
953 interface_
= interface
.copy()
954 interface_
["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
955 db_vnfr
["_id"], vdu_index
, iface_index
958 if vdur
.get("status") == "ERROR":
959 interface_
["status"] = "ERROR"
961 interfaces
.append(interface_
)
965 def refresh(self
, ro_task
):
966 # look for task create
967 task_create_index
, _
= next(
969 for i_t
in enumerate(ro_task
["tasks"])
971 and i_t
[1]["action"] == "CREATE"
972 and i_t
[1]["status"] != "FINISHED"
975 return self
.new(ro_task
, task_create_index
, None)
977 def new(self
, ro_task
, task_index
, task_depends
):
979 task
= ro_task
["tasks"][task_index
]
980 task_id
= task
["task_id"]
981 target_vim
= self
.my_vims
[ro_task
["target_id"]]
983 sdn_net_id
= ro_task
["vim_info"]["vim_id"]
985 created_items
= ro_task
["vim_info"].get("created_items")
986 connected_ports
= ro_task
["vim_info"].get("connected_ports", [])
987 new_connected_ports
= []
988 last_update
= ro_task
["vim_info"].get("last_update", 0)
989 sdn_status
= ro_task
["vim_info"].get("vim_status", "BUILD") or "BUILD"
991 created
= ro_task
["vim_info"].get("created", False)
995 params
= task
["params"]
996 vlds_to_connect
= params
["vlds"]
997 associated_vim
= params
["target_vim"]
998 # external additional ports
999 additional_ports
= params
.get("sdn-ports") or ()
1000 _
, _
, vim_account_id
= associated_vim
.partition(":")
1003 # get associated VIM
1004 if associated_vim
not in self
.db_vims
:
1005 self
.db_vims
[associated_vim
] = self
.db
.get_one(
1006 "vim_accounts", {"_id": vim_account_id
}
1009 db_vim
= self
.db_vims
[associated_vim
]
1011 # look for ports to connect
1012 ports
= self
._get
_interfaces
(vlds_to_connect
, vim_account_id
)
1016 pending_ports
= error_ports
= 0
1018 sdn_need_update
= False
1021 vlan_used
= port
.get("vlan") or vlan_used
1023 # TODO. Do not connect if already done
1024 if not port
.get("compute_node") or not port
.get("pci"):
1025 if port
.get("status") == "ERROR":
1032 compute_node_mappings
= next(
1035 for c
in db_vim
["config"].get("sdn-port-mapping", ())
1036 if c
and c
["compute_node"] == port
["compute_node"]
1041 if compute_node_mappings
:
1042 # process port_mapping pci of type 0000:af:1[01].[1357]
1046 for p
in compute_node_mappings
["ports"]
1047 if self
._match
_pci
(port
["pci"], p
.get("pci"))
1053 if not db_vim
["config"].get("mapping_not_needed"):
1055 "Port mapping not found for compute_node={} pci={}".format(
1056 port
["compute_node"], port
["pci"]
1063 service_endpoint_id
= "{}:{}".format(port
["compute_node"], port
["pci"])
1065 "service_endpoint_id": pmap
.get("service_endpoint_id")
1066 or service_endpoint_id
,
1067 "service_endpoint_encapsulation_type": "dot1q"
1068 if port
["type"] == "SR-IOV"
1070 "service_endpoint_encapsulation_info": {
1071 "vlan": port
.get("vlan"),
1072 "mac": port
.get("mac-address"),
1073 "device_id": pmap
.get("device_id") or port
["compute_node"],
1074 "device_interface_id": pmap
.get("device_interface_id")
1076 "switch_dpid": pmap
.get("switch_id") or pmap
.get("switch_dpid"),
1077 "switch_port": pmap
.get("switch_port"),
1078 "service_mapping_info": pmap
.get("service_mapping_info"),
1083 # if port["modified_at"] > last_update:
1084 # sdn_need_update = True
1085 new_connected_ports
.append(port
["id"]) # TODO
1086 sdn_ports
.append(new_port
)
1090 "{} interfaces have not been created as VDU is on ERROR status".format(
1095 # connect external ports
1096 for index
, additional_port
in enumerate(additional_ports
):
1097 additional_port_id
= additional_port
.get(
1098 "service_endpoint_id"
1099 ) or "external-{}".format(index
)
1102 "service_endpoint_id": additional_port_id
,
1103 "service_endpoint_encapsulation_type": additional_port
.get(
1104 "service_endpoint_encapsulation_type", "dot1q"
1106 "service_endpoint_encapsulation_info": {
1107 "vlan": additional_port
.get("vlan") or vlan_used
,
1108 "mac": additional_port
.get("mac_address"),
1109 "device_id": additional_port
.get("device_id"),
1110 "device_interface_id": additional_port
.get(
1111 "device_interface_id"
1113 "switch_dpid": additional_port
.get("switch_dpid")
1114 or additional_port
.get("switch_id"),
1115 "switch_port": additional_port
.get("switch_port"),
1116 "service_mapping_info": additional_port
.get(
1117 "service_mapping_info"
1122 new_connected_ports
.append(additional_port_id
)
1125 # if there are more ports to connect or they have been modified, call create/update
1127 sdn_status
= "ERROR"
1128 sdn_info
= "; ".join(error_list
)
1129 elif set(connected_ports
) != set(new_connected_ports
) or sdn_need_update
:
1130 last_update
= time
.time()
1133 if len(sdn_ports
) < 2:
1134 sdn_status
= "ACTIVE"
1136 if not pending_ports
:
1138 "task={} {} new-sdn-net done, less than 2 ports".format(
1139 task_id
, ro_task
["target_id"]
1143 net_type
= params
.get("type") or "ELAN"
1147 ) = target_vim
.create_connectivity_service(net_type
, sdn_ports
)
1150 "task={} {} new-sdn-net={} created={}".format(
1151 task_id
, ro_task
["target_id"], sdn_net_id
, created
1155 created_items
= target_vim
.edit_connectivity_service(
1156 sdn_net_id
, conn_info
=created_items
, connection_points
=sdn_ports
1160 "task={} {} update-sdn-net={} created={}".format(
1161 task_id
, ro_task
["target_id"], sdn_net_id
, created
1165 connected_ports
= new_connected_ports
1167 wim_status_dict
= target_vim
.get_connectivity_service_status(
1168 sdn_net_id
, conn_info
=created_items
1170 sdn_status
= wim_status_dict
["sdn_status"]
1172 if wim_status_dict
.get("sdn_info"):
1173 sdn_info
= str(wim_status_dict
.get("sdn_info")) or ""
1175 if wim_status_dict
.get("error_msg"):
1176 sdn_info
= wim_status_dict
.get("error_msg") or ""
1179 if sdn_status
!= "ERROR":
1180 sdn_info
= "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1181 len(ports
) - pending_ports
, len(ports
)
1184 if sdn_status
== "ACTIVE":
1185 sdn_status
= "BUILD"
1187 ro_vim_item_update
= {
1188 "vim_id": sdn_net_id
,
1189 "vim_status": sdn_status
,
1191 "created_items": created_items
,
1192 "connected_ports": connected_ports
,
1193 "vim_details": sdn_info
,
1194 "last_update": last_update
,
1197 return sdn_status
, ro_vim_item_update
1198 except Exception as e
:
1200 "task={} vim={} new-net: {}".format(task_id
, ro_task
["target_id"], e
),
1201 exc_info
=not isinstance(
1202 e
, (sdnconn
.SdnConnectorError
, vimconn
.VimConnException
)
1205 ro_vim_item_update
= {
1206 "vim_status": "VIM_ERROR",
1208 "vim_details": str(e
),
1211 return "FAILED", ro_vim_item_update
1213 def delete(self
, ro_task
, task_index
):
1214 task
= ro_task
["tasks"][task_index
]
1215 task_id
= task
["task_id"]
1216 sdn_vim_id
= ro_task
["vim_info"].get("vim_id")
1217 ro_vim_item_update_ok
= {
1218 "vim_status": "DELETED",
1220 "vim_details": "DELETED",
1226 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1227 target_vim
.delete_connectivity_service(
1228 sdn_vim_id
, ro_task
["vim_info"].get("created_items")
1231 except Exception as e
:
1233 isinstance(e
, sdnconn
.SdnConnectorError
)
1234 and e
.http_code
== HTTPStatus
.NOT_FOUND
.value
1236 ro_vim_item_update_ok
["vim_details"] = "already deleted"
1239 "ro_task={} vim={} del-sdn-net={}: {}".format(
1240 ro_task
["_id"], ro_task
["target_id"], sdn_vim_id
, e
1242 exc_info
=not isinstance(
1243 e
, (sdnconn
.SdnConnectorError
, vimconn
.VimConnException
)
1246 ro_vim_item_update
= {
1247 "vim_status": "VIM_ERROR",
1248 "vim_details": "Error while deleting: {}".format(e
),
1251 return "FAILED", ro_vim_item_update
1254 "task={} {} del-sdn-net={} {}".format(
1256 ro_task
["target_id"],
1258 ro_vim_item_update_ok
.get("vim_details", ""),
1262 return "DONE", ro_vim_item_update_ok
1265 class ConfigValidate
:
1266 def __init__(self
, config
: Dict
):
1271 # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
1273 self
.conf
["period"]["refresh_active"] >= 60
1274 or self
.conf
["period"]["refresh_active"] == -1
1276 return self
.conf
["period"]["refresh_active"]
1282 return self
.conf
["period"]["refresh_build"]
1286 return self
.conf
["period"]["refresh_image"]
1290 return self
.conf
["period"]["refresh_error"]
def queue_size(self):
    """Return the "queue_size" value found under the "period" section of self.conf."""
    period_section = self.conf["period"]
    return period_section["queue_size"]
1297 class NsWorker(threading
.Thread
):
    def __init__(self, worker_index, config, plugins, db):
        """
        Initialize the worker thread state; nothing is loaded from the database here.

        :param worker_index: thread index
        :param config: general configuration of RO, among others the process_id with the docker id where it runs
        :param plugins: global shared dict with the loaded plugins
        :param db: database class instance to use
        """
        threading.Thread.__init__(self)
        self.config = config
        self.plugins = plugins
        self.plugin_name = "unknown"
        self.logger = logging.getLogger("ro.worker{}".format(worker_index))
        self.worker_index = worker_index
        # refresh periods for created items
        self.refresh_config = ConfigValidate(config)
        # queue of commands for this worker, sized from the "period" configuration
        self.task_queue = queue.Queue(self.refresh_config.queue_size)
        # targetvim: vimplugin class
        self.my_vims = {}
        # targetvim: vim information from database
        self.db_vims = {}
        # targetvim list
        self.vim_targets = []
        # identifier written at "locked_by" when this worker locks a ro_task
        self.my_id = config["process_id"] + ":" + str(worker_index)
        self.db = db
        # task["item"] -> object implementing the operations for that kind of item;
        # all of them share this worker's db, connectors, cached vim data and logger
        self.item2class = {
            "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
            "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
            "image": VimInteractionImage(
                self.db, self.my_vims, self.db_vims, self.logger
            ),
            "flavor": VimInteractionFlavor(
                self.db, self.my_vims, self.db_vims, self.logger
            ),
            "sdn_net": VimInteractionSdnNet(
                self.db, self.my_vims, self.db_vims, self.logger
            ),
            "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
                self.db, self.my_vims, self.db_vims, self.logger
            ),
        }
        self.time_last_task_processed = None
        # lists of tasks to delete because nsrs or vnfrs has been deleted from db
        self.tasks_to_delete = []
        # it is idle when there are not vim_targets associated
        self.idle = True
        # seconds a lock is honoured; read from config["global"]["task_locked_time"]
        self.task_locked_time = config["global"]["task_locked_time"]
1346 def insert_task(self
, task
):
1348 self
.task_queue
.put(task
, False)
1351 raise NsWorkerException("timeout inserting a task")
1353 def terminate(self
):
1354 self
.insert_task("exit")
1356 def del_task(self
, task
):
1357 with self
.task_lock
:
1358 if task
["status"] == "SCHEDULED":
1359 task
["status"] = "SUPERSEDED"
1361 else: # task["status"] == "processing"
1362 self
.task_lock
.release()
1365 def _process_vim_config(self
, target_id
, db_vim
):
1367 Process vim config, creating vim configuration files as ca_cert
1368 :param target_id: vim/sdn/wim + id
1369 :param db_vim: Vim dictionary obtained from database
1370 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1372 if not db_vim
.get("config"):
1378 if db_vim
["config"].get("ca_cert_content"):
1379 file_name
= "{}:{}".format(target_id
, self
.worker_index
)
1383 except FileExistsError
:
1386 file_name
= file_name
+ "/ca_cert"
1388 with
open(file_name
, "w") as f
:
1389 f
.write(db_vim
["config"]["ca_cert_content"])
1390 del db_vim
["config"]["ca_cert_content"]
1391 db_vim
["config"]["ca_cert"] = file_name
1392 except Exception as e
:
1393 raise NsWorkerException(
1394 "Error writing to file '{}': {}".format(file_name
, e
)
1397 def _load_plugin(self
, name
, type="vim"):
1398 # type can be vim or sdn
1399 if "rovim_dummy" not in self
.plugins
:
1400 self
.plugins
["rovim_dummy"] = VimDummyConnector
1402 if "rosdn_dummy" not in self
.plugins
:
1403 self
.plugins
["rosdn_dummy"] = SdnDummyConnector
1405 if name
in self
.plugins
:
1406 return self
.plugins
[name
]
1409 for ep
in entry_points(group
="osm_ro{}.plugins".format(type), name
=name
):
1410 self
.plugins
[name
] = ep
.load()
1411 except Exception as e
:
1412 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name
, e
))
1414 if name
and name
not in self
.plugins
:
1415 raise NsWorkerException(
1416 "Plugin 'osm_{n}' has not been installed".format(n
=name
)
1419 return self
.plugins
[name
]
1421 def _unload_vim(self
, target_id
):
1423 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1424 :param target_id: Contains type:_id; where type can be 'vim', ...
1428 self
.db_vims
.pop(target_id
, None)
1429 self
.my_vims
.pop(target_id
, None)
1431 if target_id
in self
.vim_targets
:
1432 self
.vim_targets
.remove(target_id
)
1434 self
.logger
.info("Unloaded {}".format(target_id
))
1435 rmtree("{}:{}".format(target_id
, self
.worker_index
))
1436 except FileNotFoundError
:
1437 pass # this is raised by rmtree if folder does not exist
1438 except Exception as e
:
1439 self
.logger
.error("Cannot unload {}: {}".format(target_id
, e
))
1441 def _check_vim(self
, target_id
):
1443 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1444 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1447 target
, _
, _id
= target_id
.partition(":")
1453 loaded
= target_id
in self
.vim_targets
1463 step
= "Getting {} from db".format(target_id
)
1464 db_vim
= self
.db
.get_one(target_database
, {"_id": _id
})
1466 for op_index
, operation
in enumerate(
1467 db_vim
["_admin"].get("operations", ())
1469 if operation
["operationState"] != "PROCESSING":
1472 locked_at
= operation
.get("locked_at")
1474 if locked_at
is not None and locked_at
>= now
- self
.task_locked_time
:
1475 # some other thread is doing this operation
1479 op_text
= "_admin.operations.{}.".format(op_index
)
1481 if not self
.db
.set_one(
1485 op_text
+ "operationState": "PROCESSING",
1486 op_text
+ "locked_at": locked_at
,
1489 op_text
+ "locked_at": now
,
1490 "admin.current_operation": op_index
,
1492 fail_on_empty
=False,
1496 unset_dict
[op_text
+ "locked_at"] = None
1497 unset_dict
["current_operation"] = None
1498 step
= "Loading " + target_id
1499 error_text
= self
._load
_vim
(target_id
)
1502 step
= "Checking connectivity"
1505 self
.my_vims
[target_id
].check_vim_connectivity()
1507 self
.my_vims
[target_id
].check_credentials()
1509 update_dict
["_admin.operationalState"] = "ENABLED"
1510 update_dict
["_admin.detailed-status"] = ""
1511 unset_dict
[op_text
+ "detailed-status"] = None
1512 update_dict
[op_text
+ "operationState"] = "COMPLETED"
1516 except Exception as e
:
1517 error_text
= "{}: {}".format(step
, e
)
1518 self
.logger
.error("{} for {}: {}".format(step
, target_id
, e
))
1521 if update_dict
or unset_dict
:
1523 update_dict
[op_text
+ "operationState"] = "FAILED"
1524 update_dict
[op_text
+ "detailed-status"] = error_text
1525 unset_dict
.pop(op_text
+ "detailed-status", None)
1526 update_dict
["_admin.operationalState"] = "ERROR"
1527 update_dict
["_admin.detailed-status"] = error_text
1530 update_dict
[op_text
+ "statusEnteredTime"] = now
1534 q_filter
={"_id": _id
},
1535 update_dict
=update_dict
,
1537 fail_on_empty
=False,
1541 self
._unload
_vim
(target_id
)
1543 def _reload_vim(self
, target_id
):
1544 if target_id
in self
.vim_targets
:
1545 self
._load
_vim
(target_id
)
1547 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1548 # just remove it to force load again next time it is needed
1549 self
.db_vims
.pop(target_id
, None)
1551 def _load_vim(self
, target_id
):
1553 Load or reload a vim_account, sdn_controller or wim_account.
1554 Read content from database, load the plugin if not loaded.
1555 In case of error loading the plugin, it load a failing VIM_connector
1556 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1557 :param target_id: Contains type:_id; where type can be 'vim', ...
1558 :return: None if ok, descriptive text if error
1560 target
, _
, _id
= target_id
.partition(":")
1572 step
= "Getting {}={} from db".format(target
, _id
)
1573 # TODO process for wim, sdnc, ...
1574 vim
= self
.db
.get_one(target_database
, {"_id": _id
})
1576 # if deep_get(vim, "config", "sdn-controller"):
1577 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1578 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1580 step
= "Decrypting password"
1581 schema_version
= vim
.get("schema_version")
1582 self
.db
.encrypt_decrypt_fields(
1585 fields
=("password", "secret"),
1586 schema_version
=schema_version
,
1589 self
._process
_vim
_config
(target_id
, vim
)
1592 plugin_name
= "rovim_" + vim
["vim_type"]
1593 step
= "Loading plugin '{}'".format(plugin_name
)
1594 vim_module_conn
= self
._load
_plugin
(plugin_name
)
1595 step
= "Loading {}'".format(target_id
)
1596 self
.my_vims
[target_id
] = vim_module_conn(
1599 tenant_id
=vim
.get("vim_tenant_id"),
1600 tenant_name
=vim
.get("vim_tenant_name"),
1603 user
=vim
["vim_user"],
1604 passwd
=vim
["vim_password"],
1605 config
=vim
.get("config") or {},
1609 plugin_name
= "rosdn_" + vim
["type"]
1610 step
= "Loading plugin '{}'".format(plugin_name
)
1611 vim_module_conn
= self
._load
_plugin
(plugin_name
, "sdn")
1612 step
= "Loading {}'".format(target_id
)
1614 wim_config
= wim
.pop("config", {}) or {}
1615 wim
["uuid"] = wim
["_id"]
1616 wim
["wim_url"] = wim
["url"]
1619 wim_config
["dpid"] = wim
.pop("dpid")
1621 if wim
.get("switch_id"):
1622 wim_config
["switch_id"] = wim
.pop("switch_id")
1624 # wim, wim_account, config
1625 self
.my_vims
[target_id
] = vim_module_conn(wim
, wim
, wim_config
)
1626 self
.db_vims
[target_id
] = vim
1627 self
.error_status
= None
1630 "Connector loaded for {}, plugin={}".format(target_id
, plugin_name
)
1632 except Exception as e
:
1634 "Cannot load {} plugin={}: {} {}".format(
1635 target_id
, plugin_name
, step
, e
1639 self
.db_vims
[target_id
] = vim
or {}
1640 self
.db_vims
[target_id
] = FailingConnector(str(e
))
1641 error_status
= "{} Error: {}".format(step
, e
)
1645 if target_id
not in self
.vim_targets
:
1646 self
.vim_targets
.append(target_id
)
1648 def _get_db_task(self
):
1650 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1655 if not self
.time_last_task_processed
:
1656 self
.time_last_task_processed
= now
1661 # Log RO tasks only when loglevel is DEBUG
1662 if self.logger.getEffectiveLevel() == logging.DEBUG:
1669 + str(self.task_locked_time)
1671 + "time_last_task_processed="
1672 + str(self.time_last_task_processed)
1678 locked
= self
.db
.set_one(
1681 "target_id": self
.vim_targets
,
1682 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1683 "locked_at.lt": now
- self
.task_locked_time
,
1684 "to_check_at.lt": self
.time_last_task_processed
,
1685 "to_check_at.gt": -1,
1687 update_dict
={"locked_by": self
.my_id
, "locked_at": now
},
1688 fail_on_empty
=False,
1693 ro_task
= self
.db
.get_one(
1696 "target_id": self
.vim_targets
,
1697 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1703 if self
.time_last_task_processed
== now
:
1704 self
.time_last_task_processed
= None
1707 self
.time_last_task_processed
= now
1708 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1710 except DbException
as e
:
1711 self
.logger
.error("Database exception at _get_db_task: {}".format(e
))
1712 except Exception as e
:
1713 self
.logger
.critical(
1714 "Unexpected exception at _get_db_task: {}".format(e
), exc_info
=True
1719 def _get_db_all_tasks(self
):
1721 Read all content of table ro_tasks to log it
1725 # Checking the content of the BD:
1728 ro_task
= self
.db
.get_list("ro_tasks")
1730 self
._log
_ro
_task
(rt
, None, None, "TASK_WF", "GET_ALL_TASKS")
1733 except DbException
as e
:
1734 self
.logger
.error("Database exception at _get_db_all_tasks: {}".format(e
))
1735 except Exception as e
:
1736 self
.logger
.critical(
1737 "Unexpected exception at _get_db_all_tasks: {}".format(e
), exc_info
=True
1742 def _log_ro_task(self
, ro_task
, db_ro_task_update
, db_ro_task_delete
, mark
, event
):
1744 Generate a log with the following format:
1746 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1747 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1748 task_array_index;task_id;task_action;task_item;task_args
1752 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1753 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1754 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1755 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1756 'vim_details': None, 'refresh_at': None};1;SCHEDULED;
1757 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1758 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1763 if ro_task
is not None and isinstance(ro_task
, dict):
1764 for t
in ro_task
["tasks"]:
1768 line
.append(ro_task
.get("_id", ""))
1769 line
.append(str(ro_task
.get("locked_at", "")))
1770 line
.append(str(ro_task
.get("modified_at", "")))
1771 line
.append(str(ro_task
.get("created_at", "")))
1772 line
.append(str(ro_task
.get("to_check_at", "")))
1773 line
.append(str(ro_task
.get("locked_by", "")))
1774 line
.append(str(ro_task
.get("target_id", "")))
1775 line
.append(str(ro_task
.get("vim_info", {}).get("refresh_at", "")))
1776 line
.append(str(ro_task
.get("vim_info", "")))
1777 line
.append(str(ro_task
.get("tasks", "")))
1778 if isinstance(t
, dict):
1779 line
.append(str(t
.get("status", "")))
1780 line
.append(str(t
.get("action_id", "")))
1782 line
.append(str(t
.get("task_id", "")))
1783 line
.append(str(t
.get("action", "")))
1784 line
.append(str(t
.get("item", "")))
1785 line
.append(str(t
.get("find_params", "")))
1786 line
.append(str(t
.get("params", "")))
1788 line
.extend([""] * 2)
1790 line
.extend([""] * 5)
1793 self
.logger
.debug(";".join(line
))
1794 elif db_ro_task_update
is not None and isinstance(db_ro_task_update
, dict):
1797 st
= "tasks.{}.status".format(i
)
1798 if st
not in db_ro_task_update
:
1803 line
.append(db_ro_task_update
.get("_id", ""))
1804 line
.append(str(db_ro_task_update
.get("locked_at", "")))
1805 line
.append(str(db_ro_task_update
.get("modified_at", "")))
1807 line
.append(str(db_ro_task_update
.get("to_check_at", "")))
1808 line
.append(str(db_ro_task_update
.get("locked_by", "")))
1810 line
.append(str(db_ro_task_update
.get("vim_info.refresh_at", "")))
1812 line
.append(str(db_ro_task_update
.get("vim_info", "")))
1813 line
.append(str(str(db_ro_task_update
).count(".status")))
1814 line
.append(db_ro_task_update
.get(st
, ""))
1817 line
.extend([""] * 3)
1819 self
.logger
.debug(";".join(line
))
1821 elif db_ro_task_delete
is not None and isinstance(db_ro_task_delete
, dict):
1825 line
.append(db_ro_task_delete
.get("_id", ""))
1827 line
.append(db_ro_task_delete
.get("modified_at", ""))
1828 line
.extend([""] * 13)
1829 self
.logger
.debug(";".join(line
))
1835 line
.extend([""] * 16)
1836 self
.logger
.debug(";".join(line
))
1838 except Exception as e
:
1839 self
.logger
.error("Error logging ro_task: {}".format(e
))
1841 def _delete_task(self
, ro_task
, task_index
, task_depends
, db_update
):
1843 Determine if this task need to be done or superseded
1846 my_task
= ro_task
["tasks"][task_index
]
1847 task_id
= my_task
["task_id"]
1848 needed_delete
= ro_task
["vim_info"]["created"] or ro_task
["vim_info"].get(
1849 "created_items", False
1852 if my_task
["status"] == "FAILED":
1853 return None, None # TODO need to be retry??
1856 for index
, task
in enumerate(ro_task
["tasks"]):
1857 if index
== task_index
or not task
:
1861 my_task
["target_record"] == task
["target_record"]
1862 and task
["action"] == "CREATE"
1865 db_update
["tasks.{}.status".format(index
)] = task
[
1868 elif task
["action"] == "CREATE" and task
["status"] not in (
1872 needed_delete
= False
1875 return self
.item2class
[my_task
["item"]].delete(ro_task
, task_index
)
1877 return "SUPERSEDED", None
1878 except Exception as e
:
1879 if not isinstance(e
, NsWorkerException
):
1880 self
.logger
.critical(
1881 "Unexpected exception at _delete_task task={}: {}".format(
1887 return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e
)}
1889 def _create_task(self
, ro_task
, task_index
, task_depends
, db_update
):
1891 Determine if this task need to create something at VIM
1894 my_task
= ro_task
["tasks"][task_index
]
1895 task_id
= my_task
["task_id"]
1898 if my_task
["status"] == "FAILED":
1899 return None, None # TODO need to be retry??
1900 elif my_task
["status"] == "SCHEDULED":
1901 # check if already created by another task
1902 for index
, task
in enumerate(ro_task
["tasks"]):
1903 if index
== task_index
or not task
:
1906 if task
["action"] == "CREATE" and task
["status"] not in (
1911 return task
["status"], "COPY_VIM_INFO"
1914 task_status
, ro_vim_item_update
= self
.item2class
[my_task
["item"]].new(
1915 ro_task
, task_index
, task_depends
1917 # TODO update other CREATE tasks
1918 except Exception as e
:
1919 if not isinstance(e
, NsWorkerException
):
1921 "Error executing task={}: {}".format(task_id
, e
), exc_info
=True
1924 task_status
= "FAILED"
1925 ro_vim_item_update
= {"vim_status": "VIM_ERROR", "vim_details": str(e
)}
1926 # TODO update ro_vim_item_update
1928 return task_status
, ro_vim_item_update
1932 def _get_dependency(self
, task_id
, ro_task
=None, target_id
=None):
1934 Look for dependency task
1935 :param task_id: Can be one of
1936 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1937 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1938 3. task.task_id: "<action_id>:number"
1941 :return: database ro_task plus index of task
1944 task_id
.startswith("vim:")
1945 or task_id
.startswith("sdn:")
1946 or task_id
.startswith("wim:")
1948 target_id
, _
, task_id
= task_id
.partition(" ")
1950 if task_id
.startswith("nsrs:") or task_id
.startswith("vnfrs:"):
1951 ro_task_dependency
= self
.db
.get_one(
1953 q_filter
={"target_id": target_id
, "tasks.target_record_id": task_id
},
1954 fail_on_empty
=False,
1957 if ro_task_dependency
:
1958 for task_index
, task
in enumerate(ro_task_dependency
["tasks"]):
1959 if task
["target_record_id"] == task_id
:
1960 return ro_task_dependency
, task_index
1964 for task_index
, task
in enumerate(ro_task
["tasks"]):
1965 if task
and task
["task_id"] == task_id
:
1966 return ro_task
, task_index
1968 ro_task_dependency
= self
.db
.get_one(
1971 "tasks.ANYINDEX.task_id": task_id
,
1972 "tasks.ANYINDEX.target_record.ne": None,
1974 fail_on_empty
=False,
1977 if ro_task_dependency
:
1978 for task_index
, task
in ro_task_dependency
["tasks"]:
1979 if task
["task_id"] == task_id
:
1980 return ro_task_dependency
, task_index
1981 raise NsWorkerException("Cannot get depending task {}".format(task_id
))
1983 def update_vm_refresh(self
):
1984 """Enables the VM status updates if self.refresh_config.active parameter
1985 is not -1 and than updates the DB accordingly
1989 self
.logger
.debug("Checking if VM status update config")
1990 next_refresh
= time
.time()
1991 if self
.refresh_config
.active
== -1:
1994 next_refresh
+= self
.refresh_config
.active
1996 if next_refresh
!= -1:
1997 db_ro_task_update
= {}
1999 next_check_at
= now
+ (24 * 60 * 60)
2000 next_check_at
= min(next_check_at
, next_refresh
)
2001 db_ro_task_update
["vim_info.refresh_at"] = next_refresh
2002 db_ro_task_update
["to_check_at"] = next_check_at
2005 "Finding tasks which to be updated to enable VM status updates"
2007 refresh_tasks
= self
.db
.get_list(
2010 "tasks.status": "DONE",
2011 "to_check_at.lt": 0,
2014 self
.logger
.debug("Updating tasks to change the to_check_at status")
2015 for task
in refresh_tasks
:
2022 update_dict
=db_ro_task_update
,
2026 except Exception as e
:
2027 self
.logger
.error(f
"Error updating tasks to enable VM status updates: {e}")
2029 def _process_pending_tasks(self
, ro_task
):
2030 ro_task_id
= ro_task
["_id"]
2033 next_check_at
= now
+ (24 * 60 * 60)
2034 db_ro_task_update
= {}
2036 def _update_refresh(new_status
):
2037 # compute next_refresh
2039 nonlocal next_check_at
2040 nonlocal db_ro_task_update
2043 next_refresh
= time
.time()
2045 if task
["item"] in ("image", "flavor"):
2046 next_refresh
+= self
.refresh_config
.image
2047 elif new_status
== "BUILD":
2048 next_refresh
+= self
.refresh_config
.build
2049 elif new_status
== "DONE":
2050 if self
.refresh_config
.active
== -1:
2053 next_refresh
+= self
.refresh_config
.active
2055 next_refresh
+= self
.refresh_config
.error
2057 next_check_at
= min(next_check_at
, next_refresh
)
2058 db_ro_task_update
["vim_info.refresh_at"] = next_refresh
2059 ro_task
["vim_info"]["refresh_at"] = next_refresh
2063 # Log RO tasks only when loglevel is DEBUG
2064 if self.logger.getEffectiveLevel() == logging.DEBUG:
2065 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2067 # Check if vim status refresh is enabled again
2068 self
.update_vm_refresh()
2069 # 0: get task_status_create
2071 task_status_create
= None
2075 for t
in ro_task
["tasks"]
2077 and t
["action"] == "CREATE"
2078 and t
["status"] in ("BUILD", "DONE")
2084 task_status_create
= task_create
["status"]
2086 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2087 for task_action
in ("DELETE", "CREATE", "EXEC"):
2088 db_vim_update
= None
2091 for task_index
, task
in enumerate(ro_task
["tasks"]):
2093 continue # task deleted
2096 target_update
= None
2100 task_action
in ("DELETE", "EXEC")
2101 and task
["status"] not in ("SCHEDULED", "BUILD")
2103 or task
["action"] != task_action
2105 task_action
== "CREATE"
2106 and task
["status"] in ("FINISHED", "SUPERSEDED")
2111 task_path
= "tasks.{}.status".format(task_index
)
2113 db_vim_info_update
= None
2115 if task
["status"] == "SCHEDULED":
2116 # check if tasks that this depends on have been completed
2117 dependency_not_completed
= False
2119 for dependency_task_id
in task
.get("depends_on") or ():
2122 dependency_task_index
,
2123 ) = self
._get
_dependency
(
2124 dependency_task_id
, target_id
=ro_task
["target_id"]
2126 dependency_task
= dependency_ro_task
["tasks"][
2127 dependency_task_index
2130 if dependency_task
["status"] == "SCHEDULED":
2131 dependency_not_completed
= True
2132 next_check_at
= min(
2133 next_check_at
, dependency_ro_task
["to_check_at"]
2135 # must allow dependent task to be processed first
2136 # to do this set time after last_task_processed
2137 next_check_at
= max(
2138 self
.time_last_task_processed
, next_check_at
2141 elif dependency_task
["status"] == "FAILED":
2142 error_text
= "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2145 dependency_task
["action"],
2146 dependency_task
["item"],
2148 dependency_ro_task
["vim_info"].get(
2153 "task={} {}".format(task
["task_id"], error_text
)
2155 raise NsWorkerException(error_text
)
2157 task_depends
[dependency_task_id
] = dependency_ro_task
[
2161 "TASK-{}".format(dependency_task_id
)
2162 ] = dependency_ro_task
["vim_info"]["vim_id"]
2164 if dependency_not_completed
:
2165 # TODO set at vim_info.vim_details that it is waiting
2168 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2169 # the task of renew this locking. It will update database locket_at periodically
2171 lock_object
= LockRenew
.add_lock_object(
2172 "ro_tasks", ro_task
, self
2175 if task
["action"] == "DELETE":
2176 (new_status
, db_vim_info_update
,) = self
._delete
_task
(
2177 ro_task
, task_index
, task_depends
, db_ro_task_update
2180 "FINISHED" if new_status
== "DONE" else new_status
2182 # ^with FINISHED instead of DONE it will not be refreshing
2184 if new_status
in ("FINISHED", "SUPERSEDED"):
2185 target_update
= "DELETE"
2186 elif task
["action"] == "EXEC":
2191 ) = self
.item2class
[task
["item"]].exec(
2192 ro_task
, task_index
, task_depends
2195 "FINISHED" if new_status
== "DONE" else new_status
2197 # ^with FINISHED instead of DONE it will not be refreshing
2200 # load into database the modified db_task_update "retries" and "next_retry"
2201 if db_task_update
.get("retries"):
2203 "tasks.{}.retries".format(task_index
)
2204 ] = db_task_update
["retries"]
2206 next_check_at
= time
.time() + db_task_update
.get(
2209 target_update
= None
2210 elif task
["action"] == "CREATE":
2211 if task
["status"] == "SCHEDULED":
2212 if task_status_create
:
2213 new_status
= task_status_create
2214 target_update
= "COPY_VIM_INFO"
2216 new_status
, db_vim_info_update
= self
.item2class
[
2218 ].new(ro_task
, task_index
, task_depends
)
2219 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2220 _update_refresh(new_status
)
2222 refresh_at
= ro_task
["vim_info"]["refresh_at"]
2223 if refresh_at
and refresh_at
!= -1 and now
> refresh_at
:
2224 new_status
, db_vim_info_update
= self
.item2class
[
2227 _update_refresh(new_status
)
2229 # The refresh is updated to avoid set the value of "refresh_at" to
2230 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2231 # because it can happen that in this case the task is never processed
2232 _update_refresh(task
["status"])
2234 except Exception as e
:
2235 new_status
= "FAILED"
2236 db_vim_info_update
= {
2237 "vim_status": "VIM_ERROR",
2238 "vim_details": str(e
),
2242 e
, (NsWorkerException
, vimconn
.VimConnException
)
2245 "Unexpected exception at _delete_task task={}: {}".format(
2252 if db_vim_info_update
:
2253 db_vim_update
= db_vim_info_update
.copy()
2254 db_ro_task_update
.update(
2257 for k
, v
in db_vim_info_update
.items()
2260 ro_task
["vim_info"].update(db_vim_info_update
)
2263 if task_action
== "CREATE":
2264 task_status_create
= new_status
2265 db_ro_task_update
[task_path
] = new_status
2267 if target_update
or db_vim_update
:
2268 if target_update
== "DELETE":
2269 self
._update
_target
(task
, None)
2270 elif target_update
== "COPY_VIM_INFO":
2271 self
._update
_target
(task
, ro_task
["vim_info"])
2273 self
._update
_target
(task
, db_vim_update
)
2275 except Exception as e
:
2277 isinstance(e
, DbException
)
2278 and e
.http_code
== HTTPStatus
.NOT_FOUND
2280 # if the vnfrs or nsrs has been removed from database, this task must be removed
2282 "marking to delete task={}".format(task
["task_id"])
2284 self
.tasks_to_delete
.append(task
)
2287 "Unexpected exception at _update_target task={}: {}".format(
2293 locked_at
= ro_task
["locked_at"]
2297 lock_object
["locked_at"],
2298 lock_object
["locked_at"] + self
.task_locked_time
,
2300 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2301 # contain exactly locked_at + self.task_locked_time
2302 LockRenew
.remove_lock_object(lock_object
)
2305 "_id": ro_task
["_id"],
2306 "to_check_at": ro_task
["to_check_at"],
2307 "locked_at": locked_at
,
2309 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2310 # outside this task (by ro_nbi) do not update it
2311 db_ro_task_update
["locked_by"] = None
2312 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2313 db_ro_task_update
["locked_at"] = int(now
- self
.task_locked_time
)
2314 db_ro_task_update
["modified_at"] = now
2315 db_ro_task_update
["to_check_at"] = next_check_at
2318 # Log RO tasks only when loglevel is DEBUG
2319 if self.logger.getEffectiveLevel() == logging.DEBUG:
2320 db_ro_task_update_log = db_ro_task_update.copy()
2321 db_ro_task_update_log["_id"] = q_filter["_id"]
2322 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2325 if not self
.db
.set_one(
2327 update_dict
=db_ro_task_update
,
2329 fail_on_empty
=False,
2331 del db_ro_task_update
["to_check_at"]
2332 del q_filter
["to_check_at"]
2334 # Log RO tasks only when loglevel is DEBUG
2335 if self.logger.getEffectiveLevel() == logging.DEBUG:
2338 db_ro_task_update_log,
2341 "SET_TASK " + str(q_filter),
2347 update_dict
=db_ro_task_update
,
2350 except DbException
as e
:
2352 "ro_task={} Error updating database {}".format(ro_task_id
, e
)
2354 except Exception as e
:
2356 "Error executing ro_task={}: {}".format(ro_task_id
, e
), exc_info
=True
2359 def _update_target(self
, task
, ro_vim_item_update
):
2360 table
, _
, temp
= task
["target_record"].partition(":")
2361 _id
, _
, path_vim_status
= temp
.partition(":")
2362 path_item
= path_vim_status
[: path_vim_status
.rfind(".")]
2363 path_item
= path_item
[: path_item
.rfind(".")]
2364 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2365 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2367 if ro_vim_item_update
:
2369 path_vim_status
+ "." + k
: v
2370 for k
, v
in ro_vim_item_update
.items()
2372 in ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces")
2375 if path_vim_status
.startswith("vdur."):
2376 # for backward compatibility, add vdur.name apart from vdur.vim_name
2377 if ro_vim_item_update
.get("vim_name"):
2378 update_dict
[path_item
+ ".name"] = ro_vim_item_update
["vim_name"]
2380 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2381 if ro_vim_item_update
.get("vim_id"):
2382 update_dict
[path_item
+ ".vim-id"] = ro_vim_item_update
["vim_id"]
2384 # update general status
2385 if ro_vim_item_update
.get("vim_status"):
2386 update_dict
[path_item
+ ".status"] = ro_vim_item_update
[
2390 if ro_vim_item_update
.get("interfaces"):
2391 path_interfaces
= path_item
+ ".interfaces"
2393 for i
, iface
in enumerate(ro_vim_item_update
.get("interfaces")):
2397 path_interfaces
+ ".{}.".format(i
) + k
: v
2398 for k
, v
in iface
.items()
2399 if k
in ("vlan", "compute_node", "pci")
2403 # put ip_address and mac_address with ip-address and mac-address
2404 if iface
.get("ip_address"):
2406 path_interfaces
+ ".{}.".format(i
) + "ip-address"
2407 ] = iface
["ip_address"]
2409 if iface
.get("mac_address"):
2411 path_interfaces
+ ".{}.".format(i
) + "mac-address"
2412 ] = iface
["mac_address"]
2414 if iface
.get("mgmt_vnf_interface") and iface
.get("ip_address"):
2415 update_dict
["ip-address"] = iface
.get("ip_address").split(
2419 if iface
.get("mgmt_vdu_interface") and iface
.get("ip_address"):
2420 update_dict
[path_item
+ ".ip-address"] = iface
.get(
2424 self
.db
.set_one(table
, q_filter
={"_id": _id
}, update_dict
=update_dict
)
2426 update_dict
= {path_item
+ ".status": "DELETED"}
2429 q_filter
={"_id": _id
},
2430 update_dict
=update_dict
,
2431 unset
={path_vim_status
: None},
2434 def _process_delete_db_tasks(self
):
2436 Delete task from database because vnfrs or nsrs or both have been deleted
2437 :return: None. Uses and modify self.tasks_to_delete
2439 while self
.tasks_to_delete
:
2440 task
= self
.tasks_to_delete
[0]
2441 vnfrs_deleted
= None
2442 nsr_id
= task
["nsr_id"]
2444 if task
["target_record"].startswith("vnfrs:"):
2445 # check if nsrs is present
2446 if self
.db
.get_one("nsrs", {"_id": nsr_id
}, fail_on_empty
=False):
2447 vnfrs_deleted
= task
["target_record"].split(":")[1]
2450 self
.delete_db_tasks(self
.db
, nsr_id
, vnfrs_deleted
)
2451 except Exception as e
:
2453 "Error deleting task={}: {}".format(task
["task_id"], e
)
2455 self
.tasks_to_delete
.pop(0)
2458 def delete_db_tasks(db
, nsr_id
, vnfrs_deleted
):
2460 Static method because it is called from osm_ng_ro.ns
2461 :param db: instance of database to use
2462 :param nsr_id: affected nsrs id
2463 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2464 :return: None, exception is fails
2467 for retry
in range(retries
):
2468 ro_tasks
= db
.get_list("ro_tasks", {"tasks.nsr_id": nsr_id
})
2472 for ro_task
in ro_tasks
:
2474 to_delete_ro_task
= True
2476 for index
, task
in enumerate(ro_task
["tasks"]):
2479 elif (not vnfrs_deleted
and task
["nsr_id"] == nsr_id
) or (
2481 and task
["target_record"].startswith("vnfrs:" + vnfrs_deleted
)
2483 db_update
["tasks.{}".format(index
)] = None
2485 # used by other nsr, ro_task cannot be deleted
2486 to_delete_ro_task
= False
2488 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2489 if to_delete_ro_task
:
2493 "_id": ro_task
["_id"],
2494 "modified_at": ro_task
["modified_at"],
2496 fail_on_empty
=False,
2500 db_update
["modified_at"] = now
2504 "_id": ro_task
["_id"],
2505 "modified_at": ro_task
["modified_at"],
2507 update_dict
=db_update
,
2508 fail_on_empty
=False,
2514 raise NsWorkerException("Exceeded {} retries".format(retries
))
2518 self
.logger
.info("Starting")
2520 # step 1: get commands from queue
2522 if self
.vim_targets
:
2523 task
= self
.task_queue
.get(block
=False)
2526 self
.logger
.debug("enters in idle state")
2528 task
= self
.task_queue
.get(block
=True)
2531 if task
[0] == "terminate":
2533 elif task
[0] == "load_vim":
2534 self
.logger
.info("order to load vim {}".format(task
[1]))
2535 self
._load
_vim
(task
[1])
2536 elif task
[0] == "unload_vim":
2537 self
.logger
.info("order to unload vim {}".format(task
[1]))
2538 self
._unload
_vim
(task
[1])
2539 elif task
[0] == "reload_vim":
2540 self
._reload
_vim
(task
[1])
2541 elif task
[0] == "check_vim":
2542 self
.logger
.info("order to check vim {}".format(task
[1]))
2543 self
._check
_vim
(task
[1])
2545 except Exception as e
:
2546 if isinstance(e
, queue
.Empty
):
2549 self
.logger
.critical(
2550 "Error processing task: {}".format(e
), exc_info
=True
2553 # step 2: process pending_tasks, delete not needed tasks
2555 if self
.tasks_to_delete
:
2556 self
._process
_delete
_db
_tasks
()
2559 # Log RO tasks only when loglevel is DEBUG
2560 if self.logger.getEffectiveLevel() == logging.DEBUG:
2561 _ = self._get_db_all_tasks()
2563 ro_task
= self
._get
_db
_task
()
2565 self
._process
_pending
_tasks
(ro_task
)
2569 except Exception as e
:
2570 self
.logger
.critical(
2571 "Unexpected exception at run: " + str(e
), exc_info
=True
2574 self
.logger
.info("Finishing")