1 # -*- coding: utf-8 -*-
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
27 from copy
import deepcopy
28 from http
import HTTPStatus
32 from shutil
import rmtree
36 from typing
import Dict
37 from unittest
.mock
import Mock
39 from importlib_metadata
import entry_points
40 from osm_common
.dbbase
import DbException
41 from osm_ng_ro
.vim_admin
import LockRenew
42 from osm_ro_plugin
import sdnconn
, vimconn
43 from osm_ro_plugin
.sdn_dummy
import SdnDummyConnector
44 from osm_ro_plugin
.vim_dummy
import VimDummyConnector
48 __author__
= "Alfonso Tierno"
49 __date__
= "$28-Sep-2017 12:07:15$"
52 def deep_get(target_dict
, *args
, **kwargs
):
54 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
55 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
56 :param target_dict: dictionary to be read
57 :param args: list of keys to read from target_dict
58 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
59 :return: The wanted value if exist, None or default otherwise
62 if not isinstance(target_dict
, dict) or key
not in target_dict
:
63 return kwargs
.get("default")
64 target_dict
= target_dict
[key
]
68 class NsWorkerException(Exception):
72 class FailingConnector
:
73 def __init__(self
, error_msg
):
74 self
.error_msg
= error_msg
76 for method
in dir(vimconn
.VimConnector
):
79 self
, method
, Mock(side_effect
=vimconn
.VimConnException(error_msg
))
82 for method
in dir(sdnconn
.SdnConnectorBase
):
85 self
, method
, Mock(side_effect
=sdnconn
.SdnConnectorError(error_msg
))
89 class NsWorkerExceptionNotFound(NsWorkerException
):
93 class VimInteractionBase
:
94 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
95 It implements methods that does nothing and return ok"""
97 def __init__(self
, db
, my_vims
, db_vims
, logger
):
100 self
.my_vims
= my_vims
101 self
.db_vims
= db_vims
103 def new(self
, ro_task
, task_index
, task_depends
):
106 def refresh(self
, ro_task
):
107 """skip calling VIM to get image, flavor status. Assumes ok"""
108 if ro_task
["vim_info"]["vim_status"] == "VIM_ERROR":
113 def delete(self
, ro_task
, task_index
):
114 """skip calling VIM to delete image. Assumes ok"""
117 def exec(self
, ro_task
, task_index
, task_depends
):
118 return "DONE", None, None
121 class VimInteractionNet(VimInteractionBase
):
122 def new(self
, ro_task
, task_index
, task_depends
):
124 task
= ro_task
["tasks"][task_index
]
125 task_id
= task
["task_id"]
128 target_vim
= self
.my_vims
[ro_task
["target_id"]]
130 mgmtnet_defined_in_vim
= False
134 if task
.get("find_params"):
135 # if management, get configuration of VIM
136 if task
["find_params"].get("filter_dict"):
137 vim_filter
= task
["find_params"]["filter_dict"]
139 elif task
["find_params"].get("mgmt"):
142 self
.db_vims
[ro_task
["target_id"]],
144 "management_network_id",
146 mgmtnet_defined_in_vim
= True
148 "id": self
.db_vims
[ro_task
["target_id"]]["config"][
149 "management_network_id"
153 self
.db_vims
[ro_task
["target_id"]],
155 "management_network_name",
157 mgmtnet_defined_in_vim
= True
159 "name": self
.db_vims
[ro_task
["target_id"]]["config"][
160 "management_network_name"
164 vim_filter
= {"name": task
["find_params"]["name"]}
166 raise NsWorkerExceptionNotFound(
167 "Invalid find_params for new_net {}".format(task
["find_params"])
170 vim_nets
= target_vim
.get_network_list(vim_filter
)
171 if not vim_nets
and not task
.get("params"):
172 # If there is mgmt-network in the descriptor,
173 # there is no mapping of that network to a VIM network in the descriptor,
174 # also there is no mapping in the "--config" parameter or at VIM creation;
175 # that mgmt-network will be created.
176 if mgmtnet
and not mgmtnet_defined_in_vim
:
178 vim_filter
.get("name")
179 if vim_filter
.get("name")
180 else vim_filter
.get("id")[:16]
182 vim_net_id
, created_items
= target_vim
.new_network(
186 "Created mgmt network vim_net_id: {}".format(vim_net_id
)
190 raise NsWorkerExceptionNotFound(
191 "Network not found with this criteria: '{}'".format(
192 task
.get("find_params")
195 elif len(vim_nets
) > 1:
196 raise NsWorkerException(
197 "More than one network found with this criteria: '{}'".format(
203 vim_net_id
= vim_nets
[0]["id"]
206 params
= task
["params"]
207 vim_net_id
, created_items
= target_vim
.new_network(**params
)
210 ro_vim_item_update
= {
211 "vim_id": vim_net_id
,
212 "vim_status": "BUILD",
214 "created_items": created_items
,
218 "task={} {} new-net={} created={}".format(
219 task_id
, ro_task
["target_id"], vim_net_id
, created
223 return "BUILD", ro_vim_item_update
224 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
226 "task={} vim={} new-net: {}".format(task_id
, ro_task
["target_id"], e
)
228 ro_vim_item_update
= {
229 "vim_status": "VIM_ERROR",
231 "vim_details": str(e
),
234 return "FAILED", ro_vim_item_update
236 def refresh(self
, ro_task
):
237 """Call VIM to get network status"""
238 ro_task_id
= ro_task
["_id"]
239 target_vim
= self
.my_vims
[ro_task
["target_id"]]
240 vim_id
= ro_task
["vim_info"]["vim_id"]
241 net_to_refresh_list
= [vim_id
]
244 vim_dict
= target_vim
.refresh_nets_status(net_to_refresh_list
)
245 vim_info
= vim_dict
[vim_id
]
247 if vim_info
["status"] == "ACTIVE":
249 elif vim_info
["status"] == "BUILD":
250 task_status
= "BUILD"
252 task_status
= "FAILED"
253 except vimconn
.VimConnException
as e
:
254 # Mark all tasks at VIM_ERROR status
256 "ro_task={} vim={} get-net={}: {}".format(
257 ro_task_id
, ro_task
["target_id"], vim_id
, e
260 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
261 task_status
= "FAILED"
263 ro_vim_item_update
= {}
264 if ro_task
["vim_info"]["vim_status"] != vim_info
["status"]:
265 ro_vim_item_update
["vim_status"] = vim_info
["status"]
267 if ro_task
["vim_info"]["vim_name"] != vim_info
.get("name"):
268 ro_vim_item_update
["vim_name"] = vim_info
.get("name")
270 if vim_info
["status"] in ("ERROR", "VIM_ERROR"):
271 if ro_task
["vim_info"]["vim_details"] != vim_info
.get("error_msg"):
272 ro_vim_item_update
["vim_details"] = vim_info
.get("error_msg")
273 elif vim_info
["status"] == "DELETED":
274 ro_vim_item_update
["vim_id"] = None
275 ro_vim_item_update
["vim_details"] = "Deleted externally"
277 if ro_task
["vim_info"]["vim_details"] != vim_info
["vim_info"]:
278 ro_vim_item_update
["vim_details"] = vim_info
["vim_info"]
280 if ro_vim_item_update
:
282 "ro_task={} {} get-net={}: status={} {}".format(
284 ro_task
["target_id"],
286 ro_vim_item_update
.get("vim_status"),
287 ro_vim_item_update
.get("vim_details")
288 if ro_vim_item_update
.get("vim_status") != "ACTIVE"
293 return task_status
, ro_vim_item_update
295 def delete(self
, ro_task
, task_index
):
296 task
= ro_task
["tasks"][task_index
]
297 task_id
= task
["task_id"]
298 net_vim_id
= ro_task
["vim_info"]["vim_id"]
299 ro_vim_item_update_ok
= {
300 "vim_status": "DELETED",
302 "vim_details": "DELETED",
307 if net_vim_id
or ro_task
["vim_info"]["created_items"]:
308 target_vim
= self
.my_vims
[ro_task
["target_id"]]
309 target_vim
.delete_network(
310 net_vim_id
, ro_task
["vim_info"]["created_items"]
312 except vimconn
.VimConnNotFoundException
:
313 ro_vim_item_update_ok
["vim_details"] = "already deleted"
314 except vimconn
.VimConnException
as e
:
316 "ro_task={} vim={} del-net={}: {}".format(
317 ro_task
["_id"], ro_task
["target_id"], net_vim_id
, e
320 ro_vim_item_update
= {
321 "vim_status": "VIM_ERROR",
322 "vim_details": "Error while deleting: {}".format(e
),
325 return "FAILED", ro_vim_item_update
328 "task={} {} del-net={} {}".format(
330 ro_task
["target_id"],
332 ro_vim_item_update_ok
.get("vim_details", ""),
336 return "DONE", ro_vim_item_update_ok
339 class VimInteractionVdu(VimInteractionBase
):
340 max_retries_inject_ssh_key
= 20 # 20 times
341 time_retries_inject_ssh_key
= 30 # wevery 30 seconds
343 def new(self
, ro_task
, task_index
, task_depends
):
344 task
= ro_task
["tasks"][task_index
]
345 task_id
= task
["task_id"]
348 target_vim
= self
.my_vims
[ro_task
["target_id"]]
352 params
= task
["params"]
353 params_copy
= deepcopy(params
)
354 net_list
= params_copy
["net_list"]
357 # change task_id into network_id
358 if "net_id" in net
and net
["net_id"].startswith("TASK-"):
359 network_id
= task_depends
[net
["net_id"]]
362 raise NsWorkerException(
363 "Cannot create VM because depends on a network not created or found "
364 "for {}".format(net
["net_id"])
367 net
["net_id"] = network_id
369 if params_copy
["image_id"].startswith("TASK-"):
370 params_copy
["image_id"] = task_depends
[params_copy
["image_id"]]
372 if params_copy
["flavor_id"].startswith("TASK-"):
373 params_copy
["flavor_id"] = task_depends
[params_copy
["flavor_id"]]
375 affinity_group_list
= params_copy
["affinity_group_list"]
376 for affinity_group
in affinity_group_list
:
377 # change task_id into affinity_group_id
378 if "affinity_group_id" in affinity_group
and affinity_group
[
380 ].startswith("TASK-"):
381 affinity_group_id
= task_depends
[
382 affinity_group
["affinity_group_id"]
385 if not affinity_group_id
:
386 raise NsWorkerException(
387 "found for {}".format(affinity_group
["affinity_group_id"])
390 affinity_group
["affinity_group_id"] = affinity_group_id
392 vim_vm_id
, created_items
= target_vim
.new_vminstance(**params_copy
)
393 interfaces
= [iface
["vim_id"] for iface
in params_copy
["net_list"]]
395 ro_vim_item_update
= {
397 "vim_status": "BUILD",
399 "created_items": created_items
,
401 "interfaces_vim_ids": interfaces
,
405 "task={} {} new-vm={} created={}".format(
406 task_id
, ro_task
["target_id"], vim_vm_id
, created
410 return "BUILD", ro_vim_item_update
411 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
413 "task={} {} new-vm: {}".format(task_id
, ro_task
["target_id"], e
)
415 ro_vim_item_update
= {
416 "vim_status": "VIM_ERROR",
418 "vim_details": str(e
),
421 return "FAILED", ro_vim_item_update
423 def delete(self
, ro_task
, task_index
):
424 task
= ro_task
["tasks"][task_index
]
425 task_id
= task
["task_id"]
426 vm_vim_id
= ro_task
["vim_info"]["vim_id"]
427 ro_vim_item_update_ok
= {
428 "vim_status": "DELETED",
430 "vim_details": "DELETED",
435 if vm_vim_id
or ro_task
["vim_info"]["created_items"]:
436 target_vim
= self
.my_vims
[ro_task
["target_id"]]
437 target_vim
.delete_vminstance(
438 vm_vim_id
, ro_task
["vim_info"]["created_items"]
440 except vimconn
.VimConnNotFoundException
:
441 ro_vim_item_update_ok
["vim_details"] = "already deleted"
442 except vimconn
.VimConnException
as e
:
444 "ro_task={} vim={} del-vm={}: {}".format(
445 ro_task
["_id"], ro_task
["target_id"], vm_vim_id
, e
448 ro_vim_item_update
= {
449 "vim_status": "VIM_ERROR",
450 "vim_details": "Error while deleting: {}".format(e
),
453 return "FAILED", ro_vim_item_update
456 "task={} {} del-vm={} {}".format(
458 ro_task
["target_id"],
460 ro_vim_item_update_ok
.get("vim_details", ""),
464 return "DONE", ro_vim_item_update_ok
466 def refresh(self
, ro_task
):
467 """Call VIM to get vm status"""
468 ro_task_id
= ro_task
["_id"]
469 target_vim
= self
.my_vims
[ro_task
["target_id"]]
470 vim_id
= ro_task
["vim_info"]["vim_id"]
475 vm_to_refresh_list
= [vim_id
]
477 vim_dict
= target_vim
.refresh_vms_status(vm_to_refresh_list
)
478 vim_info
= vim_dict
[vim_id
]
480 if vim_info
["status"] == "ACTIVE":
482 elif vim_info
["status"] == "BUILD":
483 task_status
= "BUILD"
485 task_status
= "FAILED"
487 # try to load and parse vim_information
489 vim_info_info
= yaml
.safe_load(vim_info
["vim_info"])
490 if vim_info_info
.get("name"):
491 vim_info
["name"] = vim_info_info
["name"]
492 except Exception as vim_info_error
:
493 self
.logger
.exception(
494 f
"{vim_info_error} occured while getting the vim_info from yaml"
496 except vimconn
.VimConnException
as e
:
497 # Mark all tasks at VIM_ERROR status
499 "ro_task={} vim={} get-vm={}: {}".format(
500 ro_task_id
, ro_task
["target_id"], vim_id
, e
503 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
504 task_status
= "FAILED"
506 ro_vim_item_update
= {}
508 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
510 if vim_info
.get("interfaces"):
511 for vim_iface_id
in ro_task
["vim_info"]["interfaces_vim_ids"]:
515 for iface
in vim_info
["interfaces"]
516 if vim_iface_id
== iface
["vim_interface_id"]
521 # iface.pop("vim_info", None)
522 vim_interfaces
.append(iface
)
526 for t
in ro_task
["tasks"]
527 if t
and t
["action"] == "CREATE" and t
["status"] != "FINISHED"
529 if vim_interfaces
and task_create
.get("mgmt_vnf_interface") is not None:
530 vim_interfaces
[task_create
["mgmt_vnf_interface"]][
534 mgmt_vdu_iface
= task_create
.get(
535 "mgmt_vdu_interface", task_create
.get("mgmt_vnf_interface", 0)
538 vim_interfaces
[mgmt_vdu_iface
]["mgmt_vdu_interface"] = True
540 if ro_task
["vim_info"]["interfaces"] != vim_interfaces
:
541 ro_vim_item_update
["interfaces"] = vim_interfaces
543 if ro_task
["vim_info"]["vim_status"] != vim_info
["status"]:
544 ro_vim_item_update
["vim_status"] = vim_info
["status"]
546 if ro_task
["vim_info"]["vim_name"] != vim_info
.get("name"):
547 ro_vim_item_update
["vim_name"] = vim_info
.get("name")
549 if vim_info
["status"] in ("ERROR", "VIM_ERROR"):
550 if ro_task
["vim_info"]["vim_details"] != vim_info
.get("error_msg"):
551 ro_vim_item_update
["vim_details"] = vim_info
.get("error_msg")
552 elif vim_info
["status"] == "DELETED":
553 ro_vim_item_update
["vim_id"] = None
554 ro_vim_item_update
["vim_details"] = "Deleted externally"
556 if ro_task
["vim_info"]["vim_details"] != vim_info
["vim_info"]:
557 ro_vim_item_update
["vim_details"] = vim_info
["vim_info"]
559 if ro_vim_item_update
:
561 "ro_task={} {} get-vm={}: status={} {}".format(
563 ro_task
["target_id"],
565 ro_vim_item_update
.get("vim_status"),
566 ro_vim_item_update
.get("vim_details")
567 if ro_vim_item_update
.get("vim_status") != "ACTIVE"
572 return task_status
, ro_vim_item_update
574 def exec(self
, ro_task
, task_index
, task_depends
):
575 task
= ro_task
["tasks"][task_index
]
576 task_id
= task
["task_id"]
577 target_vim
= self
.my_vims
[ro_task
["target_id"]]
578 db_task_update
= {"retries": 0}
579 retries
= task
.get("retries", 0)
582 params
= task
["params"]
583 params_copy
= deepcopy(params
)
584 params_copy
["ro_key"] = self
.db
.decrypt(
585 params_copy
.pop("private_key"),
586 params_copy
.pop("schema_version"),
587 params_copy
.pop("salt"),
589 params_copy
["ip_addr"] = params_copy
.pop("ip_address")
590 target_vim
.inject_user_key(**params_copy
)
592 "task={} {} action-vm=inject_key".format(task_id
, ro_task
["target_id"])
599 ) # params_copy["key"]
600 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
603 self
.logger
.debug(traceback
.format_exc())
604 if retries
< self
.max_retries_inject_ssh_key
:
610 "next_retry": self
.time_retries_inject_ssh_key
,
615 "task={} {} inject-ssh-key: {}".format(task_id
, ro_task
["target_id"], e
)
617 ro_vim_item_update
= {"vim_details": str(e
)}
619 return "FAILED", ro_vim_item_update
, db_task_update
622 class VimInteractionImage(VimInteractionBase
):
623 def new(self
, ro_task
, task_index
, task_depends
):
624 task
= ro_task
["tasks"][task_index
]
625 task_id
= task
["task_id"]
628 target_vim
= self
.my_vims
[ro_task
["target_id"]]
632 if task
.get("find_params"):
633 vim_images
= target_vim
.get_image_list(**task
["find_params"])
636 raise NsWorkerExceptionNotFound(
637 "Image not found with this criteria: '{}'".format(
641 elif len(vim_images
) > 1:
642 raise NsWorkerException(
643 "More than one image found with this criteria: '{}'".format(
648 vim_image_id
= vim_images
[0]["id"]
650 ro_vim_item_update
= {
651 "vim_id": vim_image_id
,
652 "vim_status": "DONE",
654 "created_items": created_items
,
658 "task={} {} new-image={} created={}".format(
659 task_id
, ro_task
["target_id"], vim_image_id
, created
663 return "DONE", ro_vim_item_update
664 except (NsWorkerException
, vimconn
.VimConnException
) as e
:
666 "task={} {} new-image: {}".format(task_id
, ro_task
["target_id"], e
)
668 ro_vim_item_update
= {
669 "vim_status": "VIM_ERROR",
671 "vim_details": str(e
),
674 return "FAILED", ro_vim_item_update
677 class VimInteractionFlavor(VimInteractionBase
):
678 def delete(self
, ro_task
, task_index
):
679 task
= ro_task
["tasks"][task_index
]
680 task_id
= task
["task_id"]
681 flavor_vim_id
= ro_task
["vim_info"]["vim_id"]
682 ro_vim_item_update_ok
= {
683 "vim_status": "DELETED",
685 "vim_details": "DELETED",
691 target_vim
= self
.my_vims
[ro_task
["target_id"]]
692 target_vim
.delete_flavor(flavor_vim_id
)
693 except vimconn
.VimConnNotFoundException
:
694 ro_vim_item_update_ok
["vim_details"] = "already deleted"
695 except vimconn
.VimConnException
as e
:
697 "ro_task={} vim={} del-flavor={}: {}".format(
698 ro_task
["_id"], ro_task
["target_id"], flavor_vim_id
, e
701 ro_vim_item_update
= {
702 "vim_status": "VIM_ERROR",
703 "vim_details": "Error while deleting: {}".format(e
),
706 return "FAILED", ro_vim_item_update
709 "task={} {} del-flavor={} {}".format(
711 ro_task
["target_id"],
713 ro_vim_item_update_ok
.get("vim_details", ""),
717 return "DONE", ro_vim_item_update_ok
719 def new(self
, ro_task
, task_index
, task_depends
):
720 task
= ro_task
["tasks"][task_index
]
721 task_id
= task
["task_id"]
724 target_vim
= self
.my_vims
[ro_task
["target_id"]]
730 if task
.get("find_params"):
732 flavor_data
= task
["find_params"]["flavor_data"]
733 vim_flavor_id
= target_vim
.get_flavor_id_from_data(flavor_data
)
734 except vimconn
.VimConnNotFoundException
:
735 self
.logger
.exception("VimConnNotFoundException occured.")
737 if not vim_flavor_id
and task
.get("params"):
739 flavor_data
= task
["params"]["flavor_data"]
740 vim_flavor_id
= target_vim
.new_flavor(flavor_data
)
743 ro_vim_item_update
= {
744 "vim_id": vim_flavor_id
,
745 "vim_status": "DONE",
747 "created_items": created_items
,
751 "task={} {} new-flavor={} created={}".format(
752 task_id
, ro_task
["target_id"], vim_flavor_id
, created
756 return "DONE", ro_vim_item_update
757 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
759 "task={} vim={} new-flavor: {}".format(task_id
, ro_task
["target_id"], e
)
761 ro_vim_item_update
= {
762 "vim_status": "VIM_ERROR",
764 "vim_details": str(e
),
767 return "FAILED", ro_vim_item_update
770 class VimInteractionAffinityGroup(VimInteractionBase
):
771 def delete(self
, ro_task
, task_index
):
772 task
= ro_task
["tasks"][task_index
]
773 task_id
= task
["task_id"]
774 affinity_group_vim_id
= ro_task
["vim_info"]["vim_id"]
775 ro_vim_item_update_ok
= {
776 "vim_status": "DELETED",
778 "vim_details": "DELETED",
783 if affinity_group_vim_id
:
784 target_vim
= self
.my_vims
[ro_task
["target_id"]]
785 target_vim
.delete_affinity_group(affinity_group_vim_id
)
786 except vimconn
.VimConnNotFoundException
:
787 ro_vim_item_update_ok
["vim_details"] = "already deleted"
788 except vimconn
.VimConnException
as e
:
790 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
791 ro_task
["_id"], ro_task
["target_id"], affinity_group_vim_id
, e
794 ro_vim_item_update
= {
795 "vim_status": "VIM_ERROR",
796 "vim_details": "Error while deleting: {}".format(e
),
799 return "FAILED", ro_vim_item_update
802 "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
804 ro_task
["target_id"],
805 affinity_group_vim_id
,
806 ro_vim_item_update_ok
.get("vim_details", ""),
810 return "DONE", ro_vim_item_update_ok
812 def new(self
, ro_task
, task_index
, task_depends
):
813 task
= ro_task
["tasks"][task_index
]
814 task_id
= task
["task_id"]
817 target_vim
= self
.my_vims
[ro_task
["target_id"]]
820 affinity_group_vim_id
= None
821 affinity_group_data
= None
823 if task
.get("params"):
824 affinity_group_data
= task
["params"].get("affinity_group_data")
826 if affinity_group_data
and affinity_group_data
.get("vim-affinity-group-id"):
828 param_affinity_group_id
= task
["params"]["affinity_group_data"].get(
829 "vim-affinity-group-id"
831 affinity_group_vim_id
= target_vim
.get_affinity_group(
832 param_affinity_group_id
834 except vimconn
.VimConnNotFoundException
:
836 "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
837 "could not be found at VIM. Creating a new one.".format(
838 task_id
, ro_task
["target_id"], param_affinity_group_id
842 if not affinity_group_vim_id
and affinity_group_data
:
843 affinity_group_vim_id
= target_vim
.new_affinity_group(
848 ro_vim_item_update
= {
849 "vim_id": affinity_group_vim_id
,
850 "vim_status": "DONE",
852 "created_items": created_items
,
856 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
857 task_id
, ro_task
["target_id"], affinity_group_vim_id
, created
861 return "DONE", ro_vim_item_update
862 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
864 "task={} vim={} new-affinity-or-anti-affinity-group:"
865 " {}".format(task_id
, ro_task
["target_id"], e
)
867 ro_vim_item_update
= {
868 "vim_status": "VIM_ERROR",
870 "vim_details": str(e
),
873 return "FAILED", ro_vim_item_update
876 class VimInteractionSdnNet(VimInteractionBase
):
878 def _match_pci(port_pci
, mapping
):
880 Check if port_pci matches with mapping
881 mapping can have brackets to indicate that several chars are accepted. e.g
882 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
883 :param port_pci: text
884 :param mapping: text, can contain brackets to indicate several chars are available
885 :return: True if matches, False otherwise
887 if not port_pci
or not mapping
:
889 if port_pci
== mapping
:
895 bracket_start
= mapping
.find("[", mapping_index
)
897 if bracket_start
== -1:
900 bracket_end
= mapping
.find("]", bracket_start
)
901 if bracket_end
== -1:
904 length
= bracket_start
- mapping_index
907 and port_pci
[pci_index
: pci_index
+ length
]
908 != mapping
[mapping_index
:bracket_start
]
913 port_pci
[pci_index
+ length
]
914 not in mapping
[bracket_start
+ 1 : bracket_end
]
918 pci_index
+= length
+ 1
919 mapping_index
= bracket_end
+ 1
921 if port_pci
[pci_index
:] != mapping
[mapping_index
:]:
926 def _get_interfaces(self
, vlds_to_connect
, vim_account_id
):
928 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
929 :param vim_account_id:
934 for vld
in vlds_to_connect
:
935 table
, _
, db_id
= vld
.partition(":")
936 db_id
, _
, vld
= db_id
.partition(":")
937 _
, _
, vld_id
= vld
.partition(".")
940 q_filter
= {"vim-account-id": vim_account_id
, "_id": db_id
}
941 iface_key
= "vnf-vld-id"
942 else: # table == "nsrs"
943 q_filter
= {"vim-account-id": vim_account_id
, "nsr-id-ref": db_id
}
944 iface_key
= "ns-vld-id"
946 db_vnfrs
= self
.db
.get_list("vnfrs", q_filter
=q_filter
)
948 for db_vnfr
in db_vnfrs
:
949 for vdu_index
, vdur
in enumerate(db_vnfr
.get("vdur", ())):
950 for iface_index
, interface
in enumerate(vdur
["interfaces"]):
951 if interface
.get(iface_key
) == vld_id
and interface
.get(
953 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
955 interface_
= interface
.copy()
956 interface_
["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
957 db_vnfr
["_id"], vdu_index
, iface_index
960 if vdur
.get("status") == "ERROR":
961 interface_
["status"] = "ERROR"
963 interfaces
.append(interface_
)
967 def refresh(self
, ro_task
):
968 # look for task create
969 task_create_index
, _
= next(
971 for i_t
in enumerate(ro_task
["tasks"])
973 and i_t
[1]["action"] == "CREATE"
974 and i_t
[1]["status"] != "FINISHED"
977 return self
.new(ro_task
, task_create_index
, None)
979 def new(self
, ro_task
, task_index
, task_depends
):
980 task
= ro_task
["tasks"][task_index
]
981 task_id
= task
["task_id"]
982 target_vim
= self
.my_vims
[ro_task
["target_id"]]
984 sdn_net_id
= ro_task
["vim_info"]["vim_id"]
986 created_items
= ro_task
["vim_info"].get("created_items")
987 connected_ports
= ro_task
["vim_info"].get("connected_ports", [])
988 new_connected_ports
= []
989 last_update
= ro_task
["vim_info"].get("last_update", 0)
990 sdn_status
= ro_task
["vim_info"].get("vim_status", "BUILD") or "BUILD"
992 created
= ro_task
["vim_info"].get("created", False)
996 params
= task
["params"]
997 vlds_to_connect
= params
.get("vlds", [])
998 associated_vim
= params
.get("target_vim")
999 # external additional ports
1000 additional_ports
= params
.get("sdn-ports") or ()
1001 _
, _
, vim_account_id
= (
1003 if associated_vim
is None
1004 else associated_vim
.partition(":")
1008 # get associated VIM
1009 if associated_vim
not in self
.db_vims
:
1010 self
.db_vims
[associated_vim
] = self
.db
.get_one(
1011 "vim_accounts", {"_id": vim_account_id
}
1014 db_vim
= self
.db_vims
[associated_vim
]
1016 # look for ports to connect
1017 ports
= self
._get
_interfaces
(vlds_to_connect
, vim_account_id
)
1021 pending_ports
= error_ports
= 0
1023 sdn_need_update
= False
1026 vlan_used
= port
.get("vlan") or vlan_used
1028 # TODO. Do not connect if already done
1029 if not port
.get("compute_node") or not port
.get("pci"):
1030 if port
.get("status") == "ERROR":
1037 compute_node_mappings
= next(
1040 for c
in db_vim
["config"].get("sdn-port-mapping", ())
1041 if c
and c
["compute_node"] == port
["compute_node"]
1046 if compute_node_mappings
:
1047 # process port_mapping pci of type 0000:af:1[01].[1357]
1051 for p
in compute_node_mappings
["ports"]
1052 if self
._match
_pci
(port
["pci"], p
.get("pci"))
1058 if not db_vim
["config"].get("mapping_not_needed"):
1060 "Port mapping not found for compute_node={} pci={}".format(
1061 port
["compute_node"], port
["pci"]
1068 service_endpoint_id
= "{}:{}".format(port
["compute_node"], port
["pci"])
1070 "service_endpoint_id": pmap
.get("service_endpoint_id")
1071 or service_endpoint_id
,
1072 "service_endpoint_encapsulation_type": "dot1q"
1073 if port
["type"] == "SR-IOV"
1075 "service_endpoint_encapsulation_info": {
1076 "vlan": port
.get("vlan"),
1077 "mac": port
.get("mac-address"),
1078 "device_id": pmap
.get("device_id") or port
["compute_node"],
1079 "device_interface_id": pmap
.get("device_interface_id")
1081 "switch_dpid": pmap
.get("switch_id") or pmap
.get("switch_dpid"),
1082 "switch_port": pmap
.get("switch_port"),
1083 "service_mapping_info": pmap
.get("service_mapping_info"),
1088 # if port["modified_at"] > last_update:
1089 # sdn_need_update = True
1090 new_connected_ports
.append(port
["id"]) # TODO
1091 sdn_ports
.append(new_port
)
1095 "{} interfaces have not been created as VDU is on ERROR status".format(
1100 # connect external ports
1101 for index
, additional_port
in enumerate(additional_ports
):
1102 additional_port_id
= additional_port
.get(
1103 "service_endpoint_id"
1104 ) or "external-{}".format(index
)
1107 "service_endpoint_id": additional_port_id
,
1108 "service_endpoint_encapsulation_type": additional_port
.get(
1109 "service_endpoint_encapsulation_type", "dot1q"
1111 "service_endpoint_encapsulation_info": {
1112 "vlan": additional_port
.get("vlan") or vlan_used
,
1113 "mac": additional_port
.get("mac_address"),
1114 "device_id": additional_port
.get("device_id"),
1115 "device_interface_id": additional_port
.get(
1116 "device_interface_id"
1118 "switch_dpid": additional_port
.get("switch_dpid")
1119 or additional_port
.get("switch_id"),
1120 "switch_port": additional_port
.get("switch_port"),
1121 "service_mapping_info": additional_port
.get(
1122 "service_mapping_info"
1127 new_connected_ports
.append(additional_port_id
)
1130 # if there are more ports to connect or they have been modified, call create/update
1132 sdn_status
= "ERROR"
1133 sdn_info
= "; ".join(error_list
)
1134 elif set(connected_ports
) != set(new_connected_ports
) or sdn_need_update
:
1135 last_update
= time
.time()
1138 if len(sdn_ports
) < 2:
1139 sdn_status
= "ACTIVE"
1141 if not pending_ports
:
1143 "task={} {} new-sdn-net done, less than 2 ports".format(
1144 task_id
, ro_task
["target_id"]
1148 net_type
= params
.get("type") or "ELAN"
1152 ) = target_vim
.create_connectivity_service(net_type
, sdn_ports
)
1155 "task={} {} new-sdn-net={} created={}".format(
1156 task_id
, ro_task
["target_id"], sdn_net_id
, created
1160 created_items
= target_vim
.edit_connectivity_service(
1161 sdn_net_id
, conn_info
=created_items
, connection_points
=sdn_ports
1165 "task={} {} update-sdn-net={} created={}".format(
1166 task_id
, ro_task
["target_id"], sdn_net_id
, created
1170 connected_ports
= new_connected_ports
1172 wim_status_dict
= target_vim
.get_connectivity_service_status(
1173 sdn_net_id
, conn_info
=created_items
1175 sdn_status
= wim_status_dict
["sdn_status"]
1177 if wim_status_dict
.get("sdn_info"):
1178 sdn_info
= str(wim_status_dict
.get("sdn_info")) or ""
1180 if wim_status_dict
.get("error_msg"):
1181 sdn_info
= wim_status_dict
.get("error_msg") or ""
1184 if sdn_status
!= "ERROR":
1185 sdn_info
= "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1186 len(ports
) - pending_ports
, len(ports
)
1189 if sdn_status
== "ACTIVE":
1190 sdn_status
= "BUILD"
1192 ro_vim_item_update
= {
1193 "vim_id": sdn_net_id
,
1194 "vim_status": sdn_status
,
1196 "created_items": created_items
,
1197 "connected_ports": connected_ports
,
1198 "vim_details": sdn_info
,
1199 "last_update": last_update
,
1202 return sdn_status
, ro_vim_item_update
1203 except Exception as e
:
1205 "task={} vim={} new-net: {}".format(task_id
, ro_task
["target_id"], e
),
1206 exc_info
=not isinstance(
1207 e
, (sdnconn
.SdnConnectorError
, vimconn
.VimConnException
)
1210 ro_vim_item_update
= {
1211 "vim_status": "VIM_ERROR",
1213 "vim_details": str(e
),
1216 return "FAILED", ro_vim_item_update
1218 def delete(self
, ro_task
, task_index
):
1219 task
= ro_task
["tasks"][task_index
]
1220 task_id
= task
["task_id"]
1221 sdn_vim_id
= ro_task
["vim_info"].get("vim_id")
1222 ro_vim_item_update_ok
= {
1223 "vim_status": "DELETED",
1225 "vim_details": "DELETED",
1231 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1232 target_vim
.delete_connectivity_service(
1233 sdn_vim_id
, ro_task
["vim_info"].get("created_items")
1236 except Exception as e
:
1238 isinstance(e
, sdnconn
.SdnConnectorError
)
1239 and e
.http_code
== HTTPStatus
.NOT_FOUND
.value
1241 ro_vim_item_update_ok
["vim_details"] = "already deleted"
1244 "ro_task={} vim={} del-sdn-net={}: {}".format(
1245 ro_task
["_id"], ro_task
["target_id"], sdn_vim_id
, e
1247 exc_info
=not isinstance(
1248 e
, (sdnconn
.SdnConnectorError
, vimconn
.VimConnException
)
1251 ro_vim_item_update
= {
1252 "vim_status": "VIM_ERROR",
1253 "vim_details": "Error while deleting: {}".format(e
),
1256 return "FAILED", ro_vim_item_update
1259 "task={} {} del-sdn-net={} {}".format(
1261 ro_task
["target_id"],
1263 ro_vim_item_update_ok
.get("vim_details", ""),
1267 return "DONE", ro_vim_item_update_ok
1270 class ConfigValidate
:
1271 def __init__(self
, config
: Dict
):
1276 # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
1278 self
.conf
["period"]["refresh_active"] >= 60
1279 or self
.conf
["period"]["refresh_active"] == -1
1281 return self
.conf
["period"]["refresh_active"]
1287 return self
.conf
["period"]["refresh_build"]
1291 return self
.conf
["period"]["refresh_image"]
1295 return self
.conf
["period"]["refresh_error"]
1298 def queue_size(self
):
1299 return self
.conf
["period"]["queue_size"]
1302 class NsWorker(threading
.Thread
):
1303 def __init__(self
, worker_index
, config
, plugins
, db
):
1306 :param worker_index: thread index
1307 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1308 :param plugins: global shared dict with the loaded plugins
1309 :param db: database class instance to use
1311 threading
.Thread
.__init
__(self
)
1312 self
.config
= config
1313 self
.plugins
= plugins
1314 self
.plugin_name
= "unknown"
1315 self
.logger
= logging
.getLogger("ro.worker{}".format(worker_index
))
1316 self
.worker_index
= worker_index
1317 # refresh periods for created items
1318 self
.refresh_config
= ConfigValidate(config
)
1319 self
.task_queue
= queue
.Queue(self
.refresh_config
.queue_size
)
1320 # targetvim: vimplugin class
1322 # targetvim: vim information from database
1325 self
.vim_targets
= []
1326 self
.my_id
= config
["process_id"] + ":" + str(worker_index
)
1329 "net": VimInteractionNet(self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
),
1330 "vdu": VimInteractionVdu(self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
),
1331 "image": VimInteractionImage(
1332 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1334 "flavor": VimInteractionFlavor(
1335 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1337 "sdn_net": VimInteractionSdnNet(
1338 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1340 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
1341 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1344 self
.time_last_task_processed
= None
1345 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1346 self
.tasks_to_delete
= []
1347 # it is idle when there are not vim_targets associated
1349 self
.task_locked_time
= config
["global"]["task_locked_time"]
1351 def insert_task(self
, task
):
1353 self
.task_queue
.put(task
, False)
1356 raise NsWorkerException("timeout inserting a task")
1358 def terminate(self
):
1359 self
.insert_task("exit")
1361 def del_task(self
, task
):
1362 with self
.task_lock
:
1363 if task
["status"] == "SCHEDULED":
1364 task
["status"] = "SUPERSEDED"
1366 else: # task["status"] == "processing"
1367 self
.task_lock
.release()
1370 def _process_vim_config(self
, target_id
, db_vim
):
1372 Process vim config, creating vim configuration files as ca_cert
1373 :param target_id: vim/sdn/wim + id
1374 :param db_vim: Vim dictionary obtained from database
1375 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1377 if not db_vim
.get("config"):
1383 if db_vim
["config"].get("ca_cert_content"):
1384 file_name
= "{}:{}".format(target_id
, self
.worker_index
)
1388 except FileExistsError
:
1389 self
.logger
.exception(
1390 "FileExistsError occured while processing vim_config."
1393 file_name
= file_name
+ "/ca_cert"
1395 with
open(file_name
, "w") as f
:
1396 f
.write(db_vim
["config"]["ca_cert_content"])
1397 del db_vim
["config"]["ca_cert_content"]
1398 db_vim
["config"]["ca_cert"] = file_name
1399 except Exception as e
:
1400 raise NsWorkerException(
1401 "Error writing to file '{}': {}".format(file_name
, e
)
1404 def _load_plugin(self
, name
, type="vim"):
1405 # type can be vim or sdn
1406 if "rovim_dummy" not in self
.plugins
:
1407 self
.plugins
["rovim_dummy"] = VimDummyConnector
1409 if "rosdn_dummy" not in self
.plugins
:
1410 self
.plugins
["rosdn_dummy"] = SdnDummyConnector
1412 if name
in self
.plugins
:
1413 return self
.plugins
[name
]
1416 for ep
in entry_points(group
="osm_ro{}.plugins".format(type), name
=name
):
1417 self
.plugins
[name
] = ep
.load()
1418 except Exception as e
:
1419 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name
, e
))
1421 if name
and name
not in self
.plugins
:
1422 raise NsWorkerException(
1423 "Plugin 'osm_{n}' has not been installed".format(n
=name
)
1426 return self
.plugins
[name
]
1428 def _unload_vim(self
, target_id
):
1430 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1431 :param target_id: Contains type:_id; where type can be 'vim', ...
1435 self
.db_vims
.pop(target_id
, None)
1436 self
.my_vims
.pop(target_id
, None)
1438 if target_id
in self
.vim_targets
:
1439 self
.vim_targets
.remove(target_id
)
1441 self
.logger
.info("Unloaded {}".format(target_id
))
1442 rmtree("{}:{}".format(target_id
, self
.worker_index
))
1443 except FileNotFoundError
:
1444 # This is raised by rmtree if folder does not exist.
1445 self
.logger
.exception("FileNotFoundError occured while unloading VIM.")
1446 except Exception as e
:
1447 self
.logger
.error("Cannot unload {}: {}".format(target_id
, e
))
1449 def _check_vim(self
, target_id
):
1451 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1452 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1455 target
, _
, _id
= target_id
.partition(":")
1461 loaded
= target_id
in self
.vim_targets
1471 step
= "Getting {} from db".format(target_id
)
1472 db_vim
= self
.db
.get_one(target_database
, {"_id": _id
})
1474 for op_index
, operation
in enumerate(
1475 db_vim
["_admin"].get("operations", ())
1477 if operation
["operationState"] != "PROCESSING":
1480 locked_at
= operation
.get("locked_at")
1482 if locked_at
is not None and locked_at
>= now
- self
.task_locked_time
:
1483 # some other thread is doing this operation
1487 op_text
= "_admin.operations.{}.".format(op_index
)
1489 if not self
.db
.set_one(
1493 op_text
+ "operationState": "PROCESSING",
1494 op_text
+ "locked_at": locked_at
,
1497 op_text
+ "locked_at": now
,
1498 "admin.current_operation": op_index
,
1500 fail_on_empty
=False,
1504 unset_dict
[op_text
+ "locked_at"] = None
1505 unset_dict
["current_operation"] = None
1506 step
= "Loading " + target_id
1507 error_text
= self
._load
_vim
(target_id
)
1510 step
= "Checking connectivity"
1513 self
.my_vims
[target_id
].check_vim_connectivity()
1515 self
.my_vims
[target_id
].check_credentials()
1517 update_dict
["_admin.operationalState"] = "ENABLED"
1518 update_dict
["_admin.detailed-status"] = ""
1519 unset_dict
[op_text
+ "detailed-status"] = None
1520 update_dict
[op_text
+ "operationState"] = "COMPLETED"
1524 except Exception as e
:
1525 error_text
= "{}: {}".format(step
, e
)
1526 self
.logger
.error("{} for {}: {}".format(step
, target_id
, e
))
1529 if update_dict
or unset_dict
:
1531 update_dict
[op_text
+ "operationState"] = "FAILED"
1532 update_dict
[op_text
+ "detailed-status"] = error_text
1533 unset_dict
.pop(op_text
+ "detailed-status", None)
1534 update_dict
["_admin.operationalState"] = "ERROR"
1535 update_dict
["_admin.detailed-status"] = error_text
1538 update_dict
[op_text
+ "statusEnteredTime"] = now
1542 q_filter
={"_id": _id
},
1543 update_dict
=update_dict
,
1545 fail_on_empty
=False,
1549 self
._unload
_vim
(target_id
)
1551 def _reload_vim(self
, target_id
):
1552 if target_id
in self
.vim_targets
:
1553 self
._load
_vim
(target_id
)
1555 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1556 # just remove it to force load again next time it is needed
1557 self
.db_vims
.pop(target_id
, None)
1559 def _load_vim(self
, target_id
):
1561 Load or reload a vim_account, sdn_controller or wim_account.
1562 Read content from database, load the plugin if not loaded.
1563 In case of error loading the plugin, it load a failing VIM_connector
1564 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1565 :param target_id: Contains type:_id; where type can be 'vim', ...
1566 :return: None if ok, descriptive text if error
1568 target
, _
, _id
= target_id
.partition(":")
1580 step
= "Getting {}={} from db".format(target
, _id
)
1581 # TODO process for wim, sdnc, ...
1582 vim
= self
.db
.get_one(target_database
, {"_id": _id
})
1584 # if deep_get(vim, "config", "sdn-controller"):
1585 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1586 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1588 step
= "Decrypting password"
1589 schema_version
= vim
.get("schema_version")
1590 self
.db
.encrypt_decrypt_fields(
1593 fields
=("password", "secret"),
1594 schema_version
=schema_version
,
1597 self
._process
_vim
_config
(target_id
, vim
)
1600 plugin_name
= "rovim_" + vim
["vim_type"]
1601 step
= "Loading plugin '{}'".format(plugin_name
)
1602 vim_module_conn
= self
._load
_plugin
(plugin_name
)
1603 step
= "Loading {}'".format(target_id
)
1604 self
.my_vims
[target_id
] = vim_module_conn(
1607 tenant_id
=vim
.get("vim_tenant_id"),
1608 tenant_name
=vim
.get("vim_tenant_name"),
1611 user
=vim
["vim_user"],
1612 passwd
=vim
["vim_password"],
1613 config
=vim
.get("config") or {},
1617 plugin_name
= "rosdn_" + (vim
.get("type") or vim
.get("wim_type"))
1618 step
= "Loading plugin '{}'".format(plugin_name
)
1619 vim_module_conn
= self
._load
_plugin
(plugin_name
, "sdn")
1620 step
= "Loading {}'".format(target_id
)
1622 wim_config
= wim
.pop("config", {}) or {}
1623 wim
["uuid"] = wim
["_id"]
1624 if "url" in wim
and "wim_url" not in wim
:
1625 wim
["wim_url"] = wim
["url"]
1626 elif "url" not in wim
and "wim_url" in wim
:
1627 wim
["url"] = wim
["wim_url"]
1630 wim_config
["dpid"] = wim
.pop("dpid")
1632 if wim
.get("switch_id"):
1633 wim_config
["switch_id"] = wim
.pop("switch_id")
1635 # wim, wim_account, config
1636 self
.my_vims
[target_id
] = vim_module_conn(wim
, wim
, wim_config
)
1637 self
.db_vims
[target_id
] = vim
1638 self
.error_status
= None
1641 "Connector loaded for {}, plugin={}".format(target_id
, plugin_name
)
1643 except Exception as e
:
1645 "Cannot load {} plugin={}: {} {}".format(
1646 target_id
, plugin_name
, step
, e
1650 self
.db_vims
[target_id
] = vim
or {}
1651 self
.db_vims
[target_id
] = FailingConnector(str(e
))
1652 error_status
= "{} Error: {}".format(step
, e
)
1656 if target_id
not in self
.vim_targets
:
1657 self
.vim_targets
.append(target_id
)
1659 def _get_db_task(self
):
1661 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1666 if not self
.time_last_task_processed
:
1667 self
.time_last_task_processed
= now
1672 # Log RO tasks only when loglevel is DEBUG
1673 if self.logger.getEffectiveLevel() == logging.DEBUG:
1680 + str(self.task_locked_time)
1682 + "time_last_task_processed="
1683 + str(self.time_last_task_processed)
1689 locked
= self
.db
.set_one(
1692 "target_id": self
.vim_targets
,
1693 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1694 "locked_at.lt": now
- self
.task_locked_time
,
1695 "to_check_at.lt": self
.time_last_task_processed
,
1696 "to_check_at.gt": -1,
1698 update_dict
={"locked_by": self
.my_id
, "locked_at": now
},
1699 fail_on_empty
=False,
1704 ro_task
= self
.db
.get_one(
1707 "target_id": self
.vim_targets
,
1708 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1714 if self
.time_last_task_processed
== now
:
1715 self
.time_last_task_processed
= None
1718 self
.time_last_task_processed
= now
1719 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1721 except DbException
as e
:
1722 self
.logger
.error("Database exception at _get_db_task: {}".format(e
))
1723 except Exception as e
:
1724 self
.logger
.critical(
1725 "Unexpected exception at _get_db_task: {}".format(e
), exc_info
=True
1730 def _get_db_all_tasks(self
):
1732 Read all content of table ro_tasks to log it
1736 # Checking the content of the BD:
1739 ro_task
= self
.db
.get_list("ro_tasks")
1741 self
._log
_ro
_task
(rt
, None, None, "TASK_WF", "GET_ALL_TASKS")
1744 except DbException
as e
:
1745 self
.logger
.error("Database exception at _get_db_all_tasks: {}".format(e
))
1746 except Exception as e
:
1747 self
.logger
.critical(
1748 "Unexpected exception at _get_db_all_tasks: {}".format(e
), exc_info
=True
1753 def _log_ro_task(self
, ro_task
, db_ro_task_update
, db_ro_task_delete
, mark
, event
):
1755 Generate a log with the following format:
1757 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1758 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1759 task_array_index;task_id;task_action;task_item;task_args
1763 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1764 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1765 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1766 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1767 'vim_details': None, 'refresh_at': None};1;SCHEDULED;
1768 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1769 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1774 if ro_task
is not None and isinstance(ro_task
, dict):
1775 for t
in ro_task
["tasks"]:
1779 line
.append(ro_task
.get("_id", ""))
1780 line
.append(str(ro_task
.get("locked_at", "")))
1781 line
.append(str(ro_task
.get("modified_at", "")))
1782 line
.append(str(ro_task
.get("created_at", "")))
1783 line
.append(str(ro_task
.get("to_check_at", "")))
1784 line
.append(str(ro_task
.get("locked_by", "")))
1785 line
.append(str(ro_task
.get("target_id", "")))
1786 line
.append(str(ro_task
.get("vim_info", {}).get("refresh_at", "")))
1787 line
.append(str(ro_task
.get("vim_info", "")))
1788 line
.append(str(ro_task
.get("tasks", "")))
1789 if isinstance(t
, dict):
1790 line
.append(str(t
.get("status", "")))
1791 line
.append(str(t
.get("action_id", "")))
1793 line
.append(str(t
.get("task_id", "")))
1794 line
.append(str(t
.get("action", "")))
1795 line
.append(str(t
.get("item", "")))
1796 line
.append(str(t
.get("find_params", "")))
1797 line
.append(str(t
.get("params", "")))
1799 line
.extend([""] * 2)
1801 line
.extend([""] * 5)
1804 self
.logger
.debug(";".join(line
))
1805 elif db_ro_task_update
is not None and isinstance(db_ro_task_update
, dict):
1808 st
= "tasks.{}.status".format(i
)
1809 if st
not in db_ro_task_update
:
1814 line
.append(db_ro_task_update
.get("_id", ""))
1815 line
.append(str(db_ro_task_update
.get("locked_at", "")))
1816 line
.append(str(db_ro_task_update
.get("modified_at", "")))
1818 line
.append(str(db_ro_task_update
.get("to_check_at", "")))
1819 line
.append(str(db_ro_task_update
.get("locked_by", "")))
1821 line
.append(str(db_ro_task_update
.get("vim_info.refresh_at", "")))
1823 line
.append(str(db_ro_task_update
.get("vim_info", "")))
1824 line
.append(str(str(db_ro_task_update
).count(".status")))
1825 line
.append(db_ro_task_update
.get(st
, ""))
1828 line
.extend([""] * 3)
1830 self
.logger
.debug(";".join(line
))
1832 elif db_ro_task_delete
is not None and isinstance(db_ro_task_delete
, dict):
1836 line
.append(db_ro_task_delete
.get("_id", ""))
1838 line
.append(db_ro_task_delete
.get("modified_at", ""))
1839 line
.extend([""] * 13)
1840 self
.logger
.debug(";".join(line
))
1846 line
.extend([""] * 16)
1847 self
.logger
.debug(";".join(line
))
1849 except Exception as e
:
1850 self
.logger
.error("Error logging ro_task: {}".format(e
))
1852 def _delete_task(self
, ro_task
, task_index
, task_depends
, db_update
):
1854 Determine if this task need to be done or superseded
1857 my_task
= ro_task
["tasks"][task_index
]
1858 task_id
= my_task
["task_id"]
1859 needed_delete
= ro_task
["vim_info"]["created"] or ro_task
["vim_info"].get(
1860 "created_items", False
1863 if my_task
["status"] == "FAILED":
1864 return None, None # TODO need to be retry??
1867 for index
, task
in enumerate(ro_task
["tasks"]):
1868 if index
== task_index
or not task
:
1872 my_task
["target_record"] == task
["target_record"]
1873 and task
["action"] == "CREATE"
1876 db_update
["tasks.{}.status".format(index
)] = task
[
1879 elif task
["action"] == "CREATE" and task
["status"] not in (
1883 needed_delete
= False
1886 return self
.item2class
[my_task
["item"]].delete(ro_task
, task_index
)
1888 return "SUPERSEDED", None
1889 except Exception as e
:
1890 if not isinstance(e
, NsWorkerException
):
1891 self
.logger
.critical(
1892 "Unexpected exception at _delete_task task={}: {}".format(
1898 return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e
)}
1900 def _create_task(self
, ro_task
, task_index
, task_depends
, db_update
):
1902 Determine if this task need to create something at VIM
1905 my_task
= ro_task
["tasks"][task_index
]
1906 task_id
= my_task
["task_id"]
1909 if my_task
["status"] == "FAILED":
1910 return None, None # TODO need to be retry??
1911 elif my_task
["status"] == "SCHEDULED":
1912 # check if already created by another task
1913 for index
, task
in enumerate(ro_task
["tasks"]):
1914 if index
== task_index
or not task
:
1917 if task
["action"] == "CREATE" and task
["status"] not in (
1922 return task
["status"], "COPY_VIM_INFO"
1925 task_status
, ro_vim_item_update
= self
.item2class
[my_task
["item"]].new(
1926 ro_task
, task_index
, task_depends
1928 # TODO update other CREATE tasks
1929 except Exception as e
:
1930 if not isinstance(e
, NsWorkerException
):
1932 "Error executing task={}: {}".format(task_id
, e
), exc_info
=True
1935 task_status
= "FAILED"
1936 ro_vim_item_update
= {"vim_status": "VIM_ERROR", "vim_details": str(e
)}
1937 # TODO update ro_vim_item_update
1939 return task_status
, ro_vim_item_update
1943 def _get_dependency(self
, task_id
, ro_task
=None, target_id
=None):
1945 Look for dependency task
1946 :param task_id: Can be one of
1947 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1948 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1949 3. task.task_id: "<action_id>:number"
1952 :return: database ro_task plus index of task
1955 task_id
.startswith("vim:")
1956 or task_id
.startswith("sdn:")
1957 or task_id
.startswith("wim:")
1959 target_id
, _
, task_id
= task_id
.partition(" ")
1961 if task_id
.startswith("nsrs:") or task_id
.startswith("vnfrs:"):
1962 ro_task_dependency
= self
.db
.get_one(
1964 q_filter
={"target_id": target_id
, "tasks.target_record_id": task_id
},
1965 fail_on_empty
=False,
1968 if ro_task_dependency
:
1969 for task_index
, task
in enumerate(ro_task_dependency
["tasks"]):
1970 if task
["target_record_id"] == task_id
:
1971 return ro_task_dependency
, task_index
1975 for task_index
, task
in enumerate(ro_task
["tasks"]):
1976 if task
and task
["task_id"] == task_id
:
1977 return ro_task
, task_index
1979 ro_task_dependency
= self
.db
.get_one(
1982 "tasks.ANYINDEX.task_id": task_id
,
1983 "tasks.ANYINDEX.target_record.ne": None,
1985 fail_on_empty
=False,
1988 if ro_task_dependency
:
1989 for task_index
, task
in ro_task_dependency
["tasks"]:
1990 if task
["task_id"] == task_id
:
1991 return ro_task_dependency
, task_index
1992 raise NsWorkerException("Cannot get depending task {}".format(task_id
))
1994 def update_vm_refresh(self
):
1995 """Enables the VM status updates if self.refresh_config.active parameter
1996 is not -1 and than updates the DB accordingly
2000 self
.logger
.debug("Checking if VM status update config")
2001 next_refresh
= time
.time()
2002 if self
.refresh_config
.active
== -1:
2005 next_refresh
+= self
.refresh_config
.active
2007 if next_refresh
!= -1:
2008 db_ro_task_update
= {}
2010 next_check_at
= now
+ (24 * 60 * 60)
2011 next_check_at
= min(next_check_at
, next_refresh
)
2012 db_ro_task_update
["vim_info.refresh_at"] = next_refresh
2013 db_ro_task_update
["to_check_at"] = next_check_at
2016 "Finding tasks which to be updated to enable VM status updates"
2018 refresh_tasks
= self
.db
.get_list(
2021 "tasks.status": "DONE",
2022 "to_check_at.lt": 0,
2025 self
.logger
.debug("Updating tasks to change the to_check_at status")
2026 for task
in refresh_tasks
:
2033 update_dict
=db_ro_task_update
,
2037 except Exception as e
:
2038 self
.logger
.error(f
"Error updating tasks to enable VM status updates: {e}")
2040 def _process_pending_tasks(self
, ro_task
):
2041 ro_task_id
= ro_task
["_id"]
2044 next_check_at
= now
+ (24 * 60 * 60)
2045 db_ro_task_update
= {}
2047 def _update_refresh(new_status
):
2048 # compute next_refresh
2050 nonlocal next_check_at
2051 nonlocal db_ro_task_update
2054 next_refresh
= time
.time()
2056 if task
["item"] in ("image", "flavor"):
2057 next_refresh
+= self
.refresh_config
.image
2058 elif new_status
== "BUILD":
2059 next_refresh
+= self
.refresh_config
.build
2060 elif new_status
== "DONE":
2061 if self
.refresh_config
.active
== -1:
2064 next_refresh
+= self
.refresh_config
.active
2066 next_refresh
+= self
.refresh_config
.error
2068 next_check_at
= min(next_check_at
, next_refresh
)
2069 db_ro_task_update
["vim_info.refresh_at"] = next_refresh
2070 ro_task
["vim_info"]["refresh_at"] = next_refresh
2074 # Log RO tasks only when loglevel is DEBUG
2075 if self.logger.getEffectiveLevel() == logging.DEBUG:
2076 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2078 # Check if vim status refresh is enabled again
2079 self
.update_vm_refresh()
2080 # 0: get task_status_create
2082 task_status_create
= None
2086 for t
in ro_task
["tasks"]
2088 and t
["action"] == "CREATE"
2089 and t
["status"] in ("BUILD", "DONE")
2095 task_status_create
= task_create
["status"]
2097 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2098 for task_action
in ("DELETE", "CREATE", "EXEC"):
2099 db_vim_update
= None
2102 for task_index
, task
in enumerate(ro_task
["tasks"]):
2104 continue # task deleted
2107 target_update
= None
2111 task_action
in ("DELETE", "EXEC")
2112 and task
["status"] not in ("SCHEDULED", "BUILD")
2114 or task
["action"] != task_action
2116 task_action
== "CREATE"
2117 and task
["status"] in ("FINISHED", "SUPERSEDED")
2122 task_path
= "tasks.{}.status".format(task_index
)
2124 db_vim_info_update
= None
2126 if task
["status"] == "SCHEDULED":
2127 # check if tasks that this depends on have been completed
2128 dependency_not_completed
= False
2130 for dependency_task_id
in task
.get("depends_on") or ():
2133 dependency_task_index
,
2134 ) = self
._get
_dependency
(
2135 dependency_task_id
, target_id
=ro_task
["target_id"]
2137 dependency_task
= dependency_ro_task
["tasks"][
2138 dependency_task_index
2141 if dependency_task
["status"] == "SCHEDULED":
2142 dependency_not_completed
= True
2143 next_check_at
= min(
2144 next_check_at
, dependency_ro_task
["to_check_at"]
2146 # must allow dependent task to be processed first
2147 # to do this set time after last_task_processed
2148 next_check_at
= max(
2149 self
.time_last_task_processed
, next_check_at
2152 elif dependency_task
["status"] == "FAILED":
2153 error_text
= "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2156 dependency_task
["action"],
2157 dependency_task
["item"],
2159 dependency_ro_task
["vim_info"].get(
2164 "task={} {}".format(task
["task_id"], error_text
)
2166 raise NsWorkerException(error_text
)
2168 task_depends
[dependency_task_id
] = dependency_ro_task
[
2172 "TASK-{}".format(dependency_task_id
)
2173 ] = dependency_ro_task
["vim_info"]["vim_id"]
2175 if dependency_not_completed
:
2176 # TODO set at vim_info.vim_details that it is waiting
2179 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2180 # the task of renew this locking. It will update database locket_at periodically
2182 lock_object
= LockRenew
.add_lock_object(
2183 "ro_tasks", ro_task
, self
2186 if task
["action"] == "DELETE":
2190 ) = self
._delete
_task
(
2191 ro_task
, task_index
, task_depends
, db_ro_task_update
2194 "FINISHED" if new_status
== "DONE" else new_status
2196 # ^with FINISHED instead of DONE it will not be refreshing
2198 if new_status
in ("FINISHED", "SUPERSEDED"):
2199 target_update
= "DELETE"
2200 elif task
["action"] == "EXEC":
2205 ) = self
.item2class
[task
["item"]].exec(
2206 ro_task
, task_index
, task_depends
2209 "FINISHED" if new_status
== "DONE" else new_status
2211 # ^with FINISHED instead of DONE it will not be refreshing
2214 # load into database the modified db_task_update "retries" and "next_retry"
2215 if db_task_update
.get("retries"):
2217 "tasks.{}.retries".format(task_index
)
2218 ] = db_task_update
["retries"]
2220 next_check_at
= time
.time() + db_task_update
.get(
2223 target_update
= None
2224 elif task
["action"] == "CREATE":
2225 if task
["status"] == "SCHEDULED":
2226 if task_status_create
:
2227 new_status
= task_status_create
2228 target_update
= "COPY_VIM_INFO"
2230 new_status
, db_vim_info_update
= self
.item2class
[
2232 ].new(ro_task
, task_index
, task_depends
)
2233 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2234 _update_refresh(new_status
)
2236 refresh_at
= ro_task
["vim_info"]["refresh_at"]
2237 if refresh_at
and refresh_at
!= -1 and now
> refresh_at
:
2238 new_status
, db_vim_info_update
= self
.item2class
[
2241 _update_refresh(new_status
)
2243 # The refresh is updated to avoid set the value of "refresh_at" to
2244 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2245 # because it can happen that in this case the task is never processed
2246 _update_refresh(task
["status"])
2248 except Exception as e
:
2249 new_status
= "FAILED"
2250 db_vim_info_update
= {
2251 "vim_status": "VIM_ERROR",
2252 "vim_details": str(e
),
2256 e
, (NsWorkerException
, vimconn
.VimConnException
)
2259 "Unexpected exception at _delete_task task={}: {}".format(
2266 if db_vim_info_update
:
2267 db_vim_update
= db_vim_info_update
.copy()
2268 db_ro_task_update
.update(
2271 for k
, v
in db_vim_info_update
.items()
2274 ro_task
["vim_info"].update(db_vim_info_update
)
2277 if task_action
== "CREATE":
2278 task_status_create
= new_status
2279 db_ro_task_update
[task_path
] = new_status
2281 if target_update
or db_vim_update
:
2282 if target_update
== "DELETE":
2283 self
._update
_target
(task
, None)
2284 elif target_update
== "COPY_VIM_INFO":
2285 self
._update
_target
(task
, ro_task
["vim_info"])
2287 self
._update
_target
(task
, db_vim_update
)
2289 except Exception as e
:
2291 isinstance(e
, DbException
)
2292 and e
.http_code
== HTTPStatus
.NOT_FOUND
2294 # if the vnfrs or nsrs has been removed from database, this task must be removed
2296 "marking to delete task={}".format(task
["task_id"])
2298 self
.tasks_to_delete
.append(task
)
2301 "Unexpected exception at _update_target task={}: {}".format(
2307 locked_at
= ro_task
["locked_at"]
2311 lock_object
["locked_at"],
2312 lock_object
["locked_at"] + self
.task_locked_time
,
2314 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2315 # contain exactly locked_at + self.task_locked_time
2316 LockRenew
.remove_lock_object(lock_object
)
2319 "_id": ro_task
["_id"],
2320 "to_check_at": ro_task
["to_check_at"],
2321 "locked_at": locked_at
,
2323 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2324 # outside this task (by ro_nbi) do not update it
2325 db_ro_task_update
["locked_by"] = None
2326 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2327 db_ro_task_update
["locked_at"] = int(now
- self
.task_locked_time
)
2328 db_ro_task_update
["modified_at"] = now
2329 db_ro_task_update
["to_check_at"] = next_check_at
2332 # Log RO tasks only when loglevel is DEBUG
2333 if self.logger.getEffectiveLevel() == logging.DEBUG:
2334 db_ro_task_update_log = db_ro_task_update.copy()
2335 db_ro_task_update_log["_id"] = q_filter["_id"]
2336 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2339 if not self
.db
.set_one(
2341 update_dict
=db_ro_task_update
,
2343 fail_on_empty
=False,
2345 del db_ro_task_update
["to_check_at"]
2346 del q_filter
["to_check_at"]
2348 # Log RO tasks only when loglevel is DEBUG
2349 if self.logger.getEffectiveLevel() == logging.DEBUG:
2352 db_ro_task_update_log,
2355 "SET_TASK " + str(q_filter),
2361 update_dict
=db_ro_task_update
,
2364 except DbException
as e
:
2366 "ro_task={} Error updating database {}".format(ro_task_id
, e
)
2368 except Exception as e
:
2370 "Error executing ro_task={}: {}".format(ro_task_id
, e
), exc_info
=True
2373 def _update_target(self
, task
, ro_vim_item_update
):
2374 table
, _
, temp
= task
["target_record"].partition(":")
2375 _id
, _
, path_vim_status
= temp
.partition(":")
2376 path_item
= path_vim_status
[: path_vim_status
.rfind(".")]
2377 path_item
= path_item
[: path_item
.rfind(".")]
2378 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2379 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2381 if ro_vim_item_update
:
2383 path_vim_status
+ "." + k
: v
2384 for k
, v
in ro_vim_item_update
.items()
2386 in ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces")
2389 if path_vim_status
.startswith("vdur."):
2390 # for backward compatibility, add vdur.name apart from vdur.vim_name
2391 if ro_vim_item_update
.get("vim_name"):
2392 update_dict
[path_item
+ ".name"] = ro_vim_item_update
["vim_name"]
2394 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2395 if ro_vim_item_update
.get("vim_id"):
2396 update_dict
[path_item
+ ".vim-id"] = ro_vim_item_update
["vim_id"]
2398 # update general status
2399 if ro_vim_item_update
.get("vim_status"):
2400 update_dict
[path_item
+ ".status"] = ro_vim_item_update
[
2404 if ro_vim_item_update
.get("interfaces"):
2405 path_interfaces
= path_item
+ ".interfaces"
2407 for i
, iface
in enumerate(ro_vim_item_update
.get("interfaces")):
2411 path_interfaces
+ ".{}.".format(i
) + k
: v
2412 for k
, v
in iface
.items()
2413 if k
in ("vlan", "compute_node", "pci")
2417 # put ip_address and mac_address with ip-address and mac-address
2418 if iface
.get("ip_address"):
2420 path_interfaces
+ ".{}.".format(i
) + "ip-address"
2421 ] = iface
["ip_address"]
2423 if iface
.get("mac_address"):
2425 path_interfaces
+ ".{}.".format(i
) + "mac-address"
2426 ] = iface
["mac_address"]
2428 if iface
.get("mgmt_vnf_interface") and iface
.get("ip_address"):
2429 update_dict
["ip-address"] = iface
.get("ip_address").split(
2433 if iface
.get("mgmt_vdu_interface") and iface
.get("ip_address"):
2434 update_dict
[path_item
+ ".ip-address"] = iface
.get(
2438 self
.db
.set_one(table
, q_filter
={"_id": _id
}, update_dict
=update_dict
)
2440 update_dict
= {path_item
+ ".status": "DELETED"}
2443 q_filter
={"_id": _id
},
2444 update_dict
=update_dict
,
2445 unset
={path_vim_status
: None},
2448 def _process_delete_db_tasks(self
):
2450 Delete task from database because vnfrs or nsrs or both have been deleted
2451 :return: None. Uses and modify self.tasks_to_delete
2453 while self
.tasks_to_delete
:
2454 task
= self
.tasks_to_delete
[0]
2455 vnfrs_deleted
= None
2456 nsr_id
= task
["nsr_id"]
2458 if task
["target_record"].startswith("vnfrs:"):
2459 # check if nsrs is present
2460 if self
.db
.get_one("nsrs", {"_id": nsr_id
}, fail_on_empty
=False):
2461 vnfrs_deleted
= task
["target_record"].split(":")[1]
2464 self
.delete_db_tasks(self
.db
, nsr_id
, vnfrs_deleted
)
2465 except Exception as e
:
2467 "Error deleting task={}: {}".format(task
["task_id"], e
)
2469 self
.tasks_to_delete
.pop(0)
2472 def delete_db_tasks(db
, nsr_id
, vnfrs_deleted
):
2474 Static method because it is called from osm_ng_ro.ns
2475 :param db: instance of database to use
2476 :param nsr_id: affected nsrs id
2477 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2478 :return: None, exception is fails
2481 for retry
in range(retries
):
2482 ro_tasks
= db
.get_list("ro_tasks", {"tasks.nsr_id": nsr_id
})
2486 for ro_task
in ro_tasks
:
2488 to_delete_ro_task
= True
2490 for index
, task
in enumerate(ro_task
["tasks"]):
2493 elif (not vnfrs_deleted
and task
["nsr_id"] == nsr_id
) or (
2495 and task
["target_record"].startswith("vnfrs:" + vnfrs_deleted
)
2497 db_update
["tasks.{}".format(index
)] = None
2499 # used by other nsr, ro_task cannot be deleted
2500 to_delete_ro_task
= False
2502 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2503 if to_delete_ro_task
:
2507 "_id": ro_task
["_id"],
2508 "modified_at": ro_task
["modified_at"],
2510 fail_on_empty
=False,
2514 db_update
["modified_at"] = now
2518 "_id": ro_task
["_id"],
2519 "modified_at": ro_task
["modified_at"],
2521 update_dict
=db_update
,
2522 fail_on_empty
=False,
2528 raise NsWorkerException("Exceeded {} retries".format(retries
))
2532 self
.logger
.info("Starting")
2534 # step 1: get commands from queue
2536 if self
.vim_targets
:
2537 task
= self
.task_queue
.get(block
=False)
2540 self
.logger
.debug("enters in idle state")
2542 task
= self
.task_queue
.get(block
=True)
2545 if task
[0] == "terminate":
2547 elif task
[0] == "load_vim":
2548 self
.logger
.info("order to load vim {}".format(task
[1]))
2549 self
._load
_vim
(task
[1])
2550 elif task
[0] == "unload_vim":
2551 self
.logger
.info("order to unload vim {}".format(task
[1]))
2552 self
._unload
_vim
(task
[1])
2553 elif task
[0] == "reload_vim":
2554 self
._reload
_vim
(task
[1])
2555 elif task
[0] == "check_vim":
2556 self
.logger
.info("order to check vim {}".format(task
[1]))
2557 self
._check
_vim
(task
[1])
2559 except Exception as e
:
2560 if isinstance(e
, queue
.Empty
):
2563 self
.logger
.critical(
2564 "Error processing task: {}".format(e
), exc_info
=True
2567 # step 2: process pending_tasks, delete not needed tasks
2569 if self
.tasks_to_delete
:
2570 self
._process
_delete
_db
_tasks
()
2573 # Log RO tasks only when loglevel is DEBUG
2574 if self.logger.getEffectiveLevel() == logging.DEBUG:
2575 _ = self._get_db_all_tasks()
2577 ro_task
= self
._get
_db
_task
()
2579 self
._process
_pending
_tasks
(ro_task
)
2583 except Exception as e
:
2584 self
.logger
.critical(
2585 "Unexpected exception at run: " + str(e
), exc_info
=True
2588 self
.logger
.info("Finishing")