1 # -*- coding: utf-8 -*-
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 This is a thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored in the database, in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
27 from copy
import deepcopy
28 from http
import HTTPStatus
32 from shutil
import rmtree
36 from typing
import Dict
37 from unittest
.mock
import Mock
39 from importlib_metadata
import entry_points
40 from osm_common
.dbbase
import DbException
41 from osm_ng_ro
.vim_admin
import LockRenew
42 from osm_ro_plugin
import sdnconn
, vimconn
43 from osm_ro_plugin
.sdn_dummy
import SdnDummyConnector
44 from osm_ro_plugin
.vim_dummy
import VimDummyConnector
48 __author__
= "Alfonso Tierno"
49 __date__
= "$28-Sep-2017 12:07:15$"
def deep_get(target_dict, *args, **kwargs):
    """
    Get a value from target_dict by walking a chain of nested keys.

    Example: with target_dict={a: {b: 5}}, the keys (a, b) return 5, while both
    (a, b, c) and (f, h) return None (or the supplied default).

    :param target_dict: dictionary to be read
    :param args: keys to follow inside target_dict, outermost first; with no keys
        target_dict itself is returned
    :param kwargs: can only contain default=value, returned when a key is not present
        in the nested dictionary; any other keyword is ignored
    :return: the wanted value if it exists, otherwise the default (None when not given)
    """
    for key in args:
        # Stop as soon as the current level is not a dict or lacks the key
        if not isinstance(target_dict, dict) or key not in target_dict:
            return kwargs.get("default")
        target_dict = target_dict[key]
    return target_dict
68 class NsWorkerException(Exception):
72 class FailingConnector
:
73 def __init__(self
, error_msg
):
74 self
.error_msg
= error_msg
76 for method
in dir(vimconn
.VimConnector
):
79 self
, method
, Mock(side_effect
=vimconn
.VimConnException(error_msg
))
82 for method
in dir(sdnconn
.SdnConnectorBase
):
85 self
, method
, Mock(side_effect
=sdnconn
.SdnConnectorError(error_msg
))
89 class NsWorkerExceptionNotFound(NsWorkerException
):
93 class VimInteractionBase
:
94 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
95 It implements methods that does nothing and return ok"""
97 def __init__(self
, db
, my_vims
, db_vims
, logger
):
100 self
.my_vims
= my_vims
101 self
.db_vims
= db_vims
103 def new(self
, ro_task
, task_index
, task_depends
):
106 def refresh(self
, ro_task
):
107 """skip calling VIM to get image, flavor status. Assumes ok"""
108 if ro_task
["vim_info"]["vim_status"] == "VIM_ERROR":
113 def delete(self
, ro_task
, task_index
):
114 """skip calling VIM to delete image. Assumes ok"""
117 def exec(self
, ro_task
, task_index
, task_depends
):
118 return "DONE", None, None
121 class VimInteractionNet(VimInteractionBase
):
122 def new(self
, ro_task
, task_index
, task_depends
):
124 task
= ro_task
["tasks"][task_index
]
125 task_id
= task
["task_id"]
128 target_vim
= self
.my_vims
[ro_task
["target_id"]]
130 mgmtnet_defined_in_vim
= False
134 if task
.get("find_params"):
135 # if management, get configuration of VIM
136 if task
["find_params"].get("filter_dict"):
137 vim_filter
= task
["find_params"]["filter_dict"]
139 elif task
["find_params"].get("mgmt"):
142 self
.db_vims
[ro_task
["target_id"]],
144 "management_network_id",
146 mgmtnet_defined_in_vim
= True
148 "id": self
.db_vims
[ro_task
["target_id"]]["config"][
149 "management_network_id"
153 self
.db_vims
[ro_task
["target_id"]],
155 "management_network_name",
157 mgmtnet_defined_in_vim
= True
159 "name": self
.db_vims
[ro_task
["target_id"]]["config"][
160 "management_network_name"
164 vim_filter
= {"name": task
["find_params"]["name"]}
166 raise NsWorkerExceptionNotFound(
167 "Invalid find_params for new_net {}".format(task
["find_params"])
170 vim_nets
= target_vim
.get_network_list(vim_filter
)
171 if not vim_nets
and not task
.get("params"):
172 # If there is mgmt-network in the descriptor,
173 # there is no mapping of that network to a VIM network in the descriptor,
174 # also there is no mapping in the "--config" parameter or at VIM creation;
175 # that mgmt-network will be created.
176 if mgmtnet
and not mgmtnet_defined_in_vim
:
178 vim_filter
.get("name")
179 if vim_filter
.get("name")
180 else vim_filter
.get("id")[:16]
182 vim_net_id
, created_items
= target_vim
.new_network(
186 "Created mgmt network vim_net_id: {}".format(vim_net_id
)
190 raise NsWorkerExceptionNotFound(
191 "Network not found with this criteria: '{}'".format(
192 task
.get("find_params")
195 elif len(vim_nets
) > 1:
196 raise NsWorkerException(
197 "More than one network found with this criteria: '{}'".format(
203 vim_net_id
= vim_nets
[0]["id"]
206 params
= task
["params"]
207 vim_net_id
, created_items
= target_vim
.new_network(**params
)
210 ro_vim_item_update
= {
211 "vim_id": vim_net_id
,
212 "vim_status": "BUILD",
214 "created_items": created_items
,
218 "task={} {} new-net={} created={}".format(
219 task_id
, ro_task
["target_id"], vim_net_id
, created
223 return "BUILD", ro_vim_item_update
224 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
226 "task={} vim={} new-net: {}".format(task_id
, ro_task
["target_id"], e
)
228 ro_vim_item_update
= {
229 "vim_status": "VIM_ERROR",
231 "vim_details": str(e
),
234 return "FAILED", ro_vim_item_update
236 def refresh(self
, ro_task
):
237 """Call VIM to get network status"""
238 ro_task_id
= ro_task
["_id"]
239 target_vim
= self
.my_vims
[ro_task
["target_id"]]
240 vim_id
= ro_task
["vim_info"]["vim_id"]
241 net_to_refresh_list
= [vim_id
]
244 vim_dict
= target_vim
.refresh_nets_status(net_to_refresh_list
)
245 vim_info
= vim_dict
[vim_id
]
247 if vim_info
["status"] == "ACTIVE":
249 elif vim_info
["status"] == "BUILD":
250 task_status
= "BUILD"
252 task_status
= "FAILED"
253 except vimconn
.VimConnException
as e
:
254 # Mark all tasks at VIM_ERROR status
256 "ro_task={} vim={} get-net={}: {}".format(
257 ro_task_id
, ro_task
["target_id"], vim_id
, e
260 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
261 task_status
= "FAILED"
263 ro_vim_item_update
= {}
264 if ro_task
["vim_info"]["vim_status"] != vim_info
["status"]:
265 ro_vim_item_update
["vim_status"] = vim_info
["status"]
267 if ro_task
["vim_info"]["vim_name"] != vim_info
.get("name"):
268 ro_vim_item_update
["vim_name"] = vim_info
.get("name")
270 if vim_info
["status"] in ("ERROR", "VIM_ERROR"):
271 if ro_task
["vim_info"]["vim_details"] != vim_info
.get("error_msg"):
272 ro_vim_item_update
["vim_details"] = vim_info
.get("error_msg")
273 elif vim_info
["status"] == "DELETED":
274 ro_vim_item_update
["vim_id"] = None
275 ro_vim_item_update
["vim_details"] = "Deleted externally"
277 if ro_task
["vim_info"]["vim_details"] != vim_info
["vim_info"]:
278 ro_vim_item_update
["vim_details"] = vim_info
["vim_info"]
280 if ro_vim_item_update
:
282 "ro_task={} {} get-net={}: status={} {}".format(
284 ro_task
["target_id"],
286 ro_vim_item_update
.get("vim_status"),
287 ro_vim_item_update
.get("vim_details")
288 if ro_vim_item_update
.get("vim_status") != "ACTIVE"
293 return task_status
, ro_vim_item_update
295 def delete(self
, ro_task
, task_index
):
296 task
= ro_task
["tasks"][task_index
]
297 task_id
= task
["task_id"]
298 net_vim_id
= ro_task
["vim_info"]["vim_id"]
299 ro_vim_item_update_ok
= {
300 "vim_status": "DELETED",
302 "vim_details": "DELETED",
307 if net_vim_id
or ro_task
["vim_info"]["created_items"]:
308 target_vim
= self
.my_vims
[ro_task
["target_id"]]
309 target_vim
.delete_network(
310 net_vim_id
, ro_task
["vim_info"]["created_items"]
312 except vimconn
.VimConnNotFoundException
:
313 ro_vim_item_update_ok
["vim_details"] = "already deleted"
314 except vimconn
.VimConnException
as e
:
316 "ro_task={} vim={} del-net={}: {}".format(
317 ro_task
["_id"], ro_task
["target_id"], net_vim_id
, e
320 ro_vim_item_update
= {
321 "vim_status": "VIM_ERROR",
322 "vim_details": "Error while deleting: {}".format(e
),
325 return "FAILED", ro_vim_item_update
328 "task={} {} del-net={} {}".format(
330 ro_task
["target_id"],
332 ro_vim_item_update_ok
.get("vim_details", ""),
336 return "DONE", ro_vim_item_update_ok
339 class VimInteractionVdu(VimInteractionBase
):
340 max_retries_inject_ssh_key
= 20 # 20 times
341 time_retries_inject_ssh_key
= 30 # every 30 seconds
343 def new(self
, ro_task
, task_index
, task_depends
):
344 task
= ro_task
["tasks"][task_index
]
345 task_id
= task
["task_id"]
348 target_vim
= self
.my_vims
[ro_task
["target_id"]]
352 params
= task
["params"]
353 params_copy
= deepcopy(params
)
354 net_list
= params_copy
["net_list"]
357 # change task_id into network_id
358 if "net_id" in net
and net
["net_id"].startswith("TASK-"):
359 network_id
= task_depends
[net
["net_id"]]
362 raise NsWorkerException(
363 "Cannot create VM because depends on a network not created or found "
364 "for {}".format(net
["net_id"])
367 net
["net_id"] = network_id
369 if params_copy
["image_id"].startswith("TASK-"):
370 params_copy
["image_id"] = task_depends
[params_copy
["image_id"]]
372 if params_copy
["flavor_id"].startswith("TASK-"):
373 params_copy
["flavor_id"] = task_depends
[params_copy
["flavor_id"]]
375 affinity_group_list
= params_copy
["affinity_group_list"]
376 for affinity_group
in affinity_group_list
:
377 # change task_id into affinity_group_id
378 if "affinity_group_id" in affinity_group
and affinity_group
[
380 ].startswith("TASK-"):
381 affinity_group_id
= task_depends
[
382 affinity_group
["affinity_group_id"]
385 if not affinity_group_id
:
386 raise NsWorkerException(
387 "found for {}".format(affinity_group
["affinity_group_id"])
390 affinity_group
["affinity_group_id"] = affinity_group_id
392 vim_vm_id
, created_items
= target_vim
.new_vminstance(**params_copy
)
393 interfaces
= [iface
["vim_id"] for iface
in params_copy
["net_list"]]
395 ro_vim_item_update
= {
397 "vim_status": "BUILD",
399 "created_items": created_items
,
401 "interfaces_vim_ids": interfaces
,
405 "task={} {} new-vm={} created={}".format(
406 task_id
, ro_task
["target_id"], vim_vm_id
, created
410 return "BUILD", ro_vim_item_update
411 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
413 "task={} {} new-vm: {}".format(task_id
, ro_task
["target_id"], e
)
415 ro_vim_item_update
= {
416 "vim_status": "VIM_ERROR",
418 "vim_details": str(e
),
421 return "FAILED", ro_vim_item_update
423 def delete(self
, ro_task
, task_index
):
424 task
= ro_task
["tasks"][task_index
]
425 task_id
= task
["task_id"]
426 vm_vim_id
= ro_task
["vim_info"]["vim_id"]
427 ro_vim_item_update_ok
= {
428 "vim_status": "DELETED",
430 "vim_details": "DELETED",
435 if vm_vim_id
or ro_task
["vim_info"]["created_items"]:
436 target_vim
= self
.my_vims
[ro_task
["target_id"]]
437 target_vim
.delete_vminstance(
438 vm_vim_id
, ro_task
["vim_info"]["created_items"]
440 except vimconn
.VimConnNotFoundException
:
441 ro_vim_item_update_ok
["vim_details"] = "already deleted"
442 except vimconn
.VimConnException
as e
:
444 "ro_task={} vim={} del-vm={}: {}".format(
445 ro_task
["_id"], ro_task
["target_id"], vm_vim_id
, e
448 ro_vim_item_update
= {
449 "vim_status": "VIM_ERROR",
450 "vim_details": "Error while deleting: {}".format(e
),
453 return "FAILED", ro_vim_item_update
456 "task={} {} del-vm={} {}".format(
458 ro_task
["target_id"],
460 ro_vim_item_update_ok
.get("vim_details", ""),
464 return "DONE", ro_vim_item_update_ok
466 def refresh(self
, ro_task
):
467 """Call VIM to get vm status"""
468 ro_task_id
= ro_task
["_id"]
469 target_vim
= self
.my_vims
[ro_task
["target_id"]]
470 vim_id
= ro_task
["vim_info"]["vim_id"]
475 vm_to_refresh_list
= [vim_id
]
477 vim_dict
= target_vim
.refresh_vms_status(vm_to_refresh_list
)
478 vim_info
= vim_dict
[vim_id
]
480 if vim_info
["status"] == "ACTIVE":
482 elif vim_info
["status"] == "BUILD":
483 task_status
= "BUILD"
485 task_status
= "FAILED"
487 # try to load and parse vim_information
489 vim_info_info
= yaml
.safe_load(vim_info
["vim_info"])
490 if vim_info_info
.get("name"):
491 vim_info
["name"] = vim_info_info
["name"]
492 except Exception as vim_info_error
:
493 self
.logger
.exception(
494 f
"{vim_info_error} occured while getting the vim_info from yaml"
496 except vimconn
.VimConnException
as e
:
497 # Mark all tasks at VIM_ERROR status
499 "ro_task={} vim={} get-vm={}: {}".format(
500 ro_task_id
, ro_task
["target_id"], vim_id
, e
503 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
504 task_status
= "FAILED"
506 ro_vim_item_update
= {}
508 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
510 if vim_info
.get("interfaces"):
511 for vim_iface_id
in ro_task
["vim_info"]["interfaces_vim_ids"]:
515 for iface
in vim_info
["interfaces"]
516 if vim_iface_id
== iface
["vim_interface_id"]
521 # iface.pop("vim_info", None)
522 vim_interfaces
.append(iface
)
526 for t
in ro_task
["tasks"]
527 if t
and t
["action"] == "CREATE" and t
["status"] != "FINISHED"
529 if vim_interfaces
and task_create
.get("mgmt_vnf_interface") is not None:
530 vim_interfaces
[task_create
["mgmt_vnf_interface"]][
534 mgmt_vdu_iface
= task_create
.get(
535 "mgmt_vdu_interface", task_create
.get("mgmt_vnf_interface", 0)
538 vim_interfaces
[mgmt_vdu_iface
]["mgmt_vdu_interface"] = True
540 if ro_task
["vim_info"]["interfaces"] != vim_interfaces
:
541 ro_vim_item_update
["interfaces"] = vim_interfaces
543 if ro_task
["vim_info"]["vim_status"] != vim_info
["status"]:
544 ro_vim_item_update
["vim_status"] = vim_info
["status"]
546 if ro_task
["vim_info"]["vim_name"] != vim_info
.get("name"):
547 ro_vim_item_update
["vim_name"] = vim_info
.get("name")
549 if vim_info
["status"] in ("ERROR", "VIM_ERROR"):
550 if ro_task
["vim_info"]["vim_details"] != vim_info
.get("error_msg"):
551 ro_vim_item_update
["vim_details"] = vim_info
.get("error_msg")
552 elif vim_info
["status"] == "DELETED":
553 ro_vim_item_update
["vim_id"] = None
554 ro_vim_item_update
["vim_details"] = "Deleted externally"
556 if ro_task
["vim_info"]["vim_details"] != vim_info
["vim_info"]:
557 ro_vim_item_update
["vim_details"] = vim_info
["vim_info"]
559 if ro_vim_item_update
:
561 "ro_task={} {} get-vm={}: status={} {}".format(
563 ro_task
["target_id"],
565 ro_vim_item_update
.get("vim_status"),
566 ro_vim_item_update
.get("vim_details")
567 if ro_vim_item_update
.get("vim_status") != "ACTIVE"
572 return task_status
, ro_vim_item_update
574 def exec(self
, ro_task
, task_index
, task_depends
):
575 task
= ro_task
["tasks"][task_index
]
576 task_id
= task
["task_id"]
577 target_vim
= self
.my_vims
[ro_task
["target_id"]]
578 db_task_update
= {"retries": 0}
579 retries
= task
.get("retries", 0)
582 params
= task
["params"]
583 params_copy
= deepcopy(params
)
584 params_copy
["ro_key"] = self
.db
.decrypt(
585 params_copy
.pop("private_key"),
586 params_copy
.pop("schema_version"),
587 params_copy
.pop("salt"),
589 params_copy
["ip_addr"] = params_copy
.pop("ip_address")
590 target_vim
.inject_user_key(**params_copy
)
592 "task={} {} action-vm=inject_key".format(task_id
, ro_task
["target_id"])
599 ) # params_copy["key"]
600 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
603 self
.logger
.debug(traceback
.format_exc())
604 if retries
< self
.max_retries_inject_ssh_key
:
610 "next_retry": self
.time_retries_inject_ssh_key
,
615 "task={} {} inject-ssh-key: {}".format(task_id
, ro_task
["target_id"], e
)
617 ro_vim_item_update
= {"vim_details": str(e
)}
619 return "FAILED", ro_vim_item_update
, db_task_update
622 class VimInteractionImage(VimInteractionBase
):
623 def new(self
, ro_task
, task_index
, task_depends
):
624 task
= ro_task
["tasks"][task_index
]
625 task_id
= task
["task_id"]
628 target_vim
= self
.my_vims
[ro_task
["target_id"]]
632 if task
.get("find_params"):
633 vim_images
= target_vim
.get_image_list(**task
["find_params"])
636 raise NsWorkerExceptionNotFound(
637 "Image not found with this criteria: '{}'".format(
641 elif len(vim_images
) > 1:
642 raise NsWorkerException(
643 "More than one image found with this criteria: '{}'".format(
648 vim_image_id
= vim_images
[0]["id"]
650 ro_vim_item_update
= {
651 "vim_id": vim_image_id
,
652 "vim_status": "DONE",
654 "created_items": created_items
,
658 "task={} {} new-image={} created={}".format(
659 task_id
, ro_task
["target_id"], vim_image_id
, created
663 return "DONE", ro_vim_item_update
664 except (NsWorkerException
, vimconn
.VimConnException
) as e
:
666 "task={} {} new-image: {}".format(task_id
, ro_task
["target_id"], e
)
668 ro_vim_item_update
= {
669 "vim_status": "VIM_ERROR",
671 "vim_details": str(e
),
674 return "FAILED", ro_vim_item_update
677 class VimInteractionFlavor(VimInteractionBase
):
678 def delete(self
, ro_task
, task_index
):
679 task
= ro_task
["tasks"][task_index
]
680 task_id
= task
["task_id"]
681 flavor_vim_id
= ro_task
["vim_info"]["vim_id"]
682 ro_vim_item_update_ok
= {
683 "vim_status": "DELETED",
685 "vim_details": "DELETED",
691 target_vim
= self
.my_vims
[ro_task
["target_id"]]
692 target_vim
.delete_flavor(flavor_vim_id
)
693 except vimconn
.VimConnNotFoundException
:
694 ro_vim_item_update_ok
["vim_details"] = "already deleted"
695 except vimconn
.VimConnException
as e
:
697 "ro_task={} vim={} del-flavor={}: {}".format(
698 ro_task
["_id"], ro_task
["target_id"], flavor_vim_id
, e
701 ro_vim_item_update
= {
702 "vim_status": "VIM_ERROR",
703 "vim_details": "Error while deleting: {}".format(e
),
706 return "FAILED", ro_vim_item_update
709 "task={} {} del-flavor={} {}".format(
711 ro_task
["target_id"],
713 ro_vim_item_update_ok
.get("vim_details", ""),
717 return "DONE", ro_vim_item_update_ok
719 def new(self
, ro_task
, task_index
, task_depends
):
720 task
= ro_task
["tasks"][task_index
]
721 task_id
= task
["task_id"]
724 target_vim
= self
.my_vims
[ro_task
["target_id"]]
730 if task
.get("find_params"):
732 flavor_data
= task
["find_params"]["flavor_data"]
733 vim_flavor_id
= target_vim
.get_flavor_id_from_data(flavor_data
)
734 except vimconn
.VimConnNotFoundException
:
735 self
.logger
.exception("VimConnNotFoundException occured.")
737 if not vim_flavor_id
and task
.get("params"):
739 flavor_data
= task
["params"]["flavor_data"]
740 vim_flavor_id
= target_vim
.new_flavor(flavor_data
)
743 ro_vim_item_update
= {
744 "vim_id": vim_flavor_id
,
745 "vim_status": "DONE",
747 "created_items": created_items
,
751 "task={} {} new-flavor={} created={}".format(
752 task_id
, ro_task
["target_id"], vim_flavor_id
, created
756 return "DONE", ro_vim_item_update
757 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
759 "task={} vim={} new-flavor: {}".format(task_id
, ro_task
["target_id"], e
)
761 ro_vim_item_update
= {
762 "vim_status": "VIM_ERROR",
764 "vim_details": str(e
),
767 return "FAILED", ro_vim_item_update
770 class VimInteractionAffinityGroup(VimInteractionBase
):
771 def delete(self
, ro_task
, task_index
):
772 task
= ro_task
["tasks"][task_index
]
773 task_id
= task
["task_id"]
774 affinity_group_vim_id
= ro_task
["vim_info"]["vim_id"]
775 ro_vim_item_update_ok
= {
776 "vim_status": "DELETED",
778 "vim_details": "DELETED",
783 if affinity_group_vim_id
:
784 target_vim
= self
.my_vims
[ro_task
["target_id"]]
785 target_vim
.delete_affinity_group(affinity_group_vim_id
)
786 except vimconn
.VimConnNotFoundException
:
787 ro_vim_item_update_ok
["vim_details"] = "already deleted"
788 except vimconn
.VimConnException
as e
:
790 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
791 ro_task
["_id"], ro_task
["target_id"], affinity_group_vim_id
, e
794 ro_vim_item_update
= {
795 "vim_status": "VIM_ERROR",
796 "vim_details": "Error while deleting: {}".format(e
),
799 return "FAILED", ro_vim_item_update
802 "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
804 ro_task
["target_id"],
805 affinity_group_vim_id
,
806 ro_vim_item_update_ok
.get("vim_details", ""),
810 return "DONE", ro_vim_item_update_ok
812 def new(self
, ro_task
, task_index
, task_depends
):
813 task
= ro_task
["tasks"][task_index
]
814 task_id
= task
["task_id"]
817 target_vim
= self
.my_vims
[ro_task
["target_id"]]
820 affinity_group_vim_id
= None
821 affinity_group_data
= None
823 if task
.get("params"):
824 affinity_group_data
= task
["params"].get("affinity_group_data")
826 if affinity_group_data
and affinity_group_data
.get("vim-affinity-group-id"):
828 param_affinity_group_id
= task
["params"]["affinity_group_data"].get(
829 "vim-affinity-group-id"
831 affinity_group_vim_id
= target_vim
.get_affinity_group(
832 param_affinity_group_id
834 except vimconn
.VimConnNotFoundException
:
836 "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
837 "could not be found at VIM. Creating a new one.".format(
838 task_id
, ro_task
["target_id"], param_affinity_group_id
842 if not affinity_group_vim_id
and affinity_group_data
:
843 affinity_group_vim_id
= target_vim
.new_affinity_group(
848 ro_vim_item_update
= {
849 "vim_id": affinity_group_vim_id
,
850 "vim_status": "DONE",
852 "created_items": created_items
,
856 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
857 task_id
, ro_task
["target_id"], affinity_group_vim_id
, created
861 return "DONE", ro_vim_item_update
862 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
864 "task={} vim={} new-affinity-or-anti-affinity-group:"
865 " {}".format(task_id
, ro_task
["target_id"], e
)
867 ro_vim_item_update
= {
868 "vim_status": "VIM_ERROR",
870 "vim_details": str(e
),
873 return "FAILED", ro_vim_item_update
876 class VimInteractionSdnNet(VimInteractionBase
):
878 def _match_pci(port_pci
, mapping
):
880 Check if port_pci matches with mapping
881 mapping can have brackets to indicate that several chars are accepted. e.g
882 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
883 :param port_pci: text
884 :param mapping: text, can contain brackets to indicate several chars are available
885 :return: True if matches, False otherwise
887 if not port_pci
or not mapping
:
889 if port_pci
== mapping
:
895 bracket_start
= mapping
.find("[", mapping_index
)
897 if bracket_start
== -1:
900 bracket_end
= mapping
.find("]", bracket_start
)
901 if bracket_end
== -1:
904 length
= bracket_start
- mapping_index
907 and port_pci
[pci_index
: pci_index
+ length
]
908 != mapping
[mapping_index
:bracket_start
]
913 port_pci
[pci_index
+ length
]
914 not in mapping
[bracket_start
+ 1 : bracket_end
]
918 pci_index
+= length
+ 1
919 mapping_index
= bracket_end
+ 1
921 if port_pci
[pci_index
:] != mapping
[mapping_index
:]:
926 def _get_interfaces(self
, vlds_to_connect
, vim_account_id
):
928 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
929 :param vim_account_id:
934 for vld
in vlds_to_connect
:
935 table
, _
, db_id
= vld
.partition(":")
936 db_id
, _
, vld
= db_id
.partition(":")
937 _
, _
, vld_id
= vld
.partition(".")
940 q_filter
= {"vim-account-id": vim_account_id
, "_id": db_id
}
941 iface_key
= "vnf-vld-id"
942 else: # table == "nsrs"
943 q_filter
= {"vim-account-id": vim_account_id
, "nsr-id-ref": db_id
}
944 iface_key
= "ns-vld-id"
946 db_vnfrs
= self
.db
.get_list("vnfrs", q_filter
=q_filter
)
948 for db_vnfr
in db_vnfrs
:
949 for vdu_index
, vdur
in enumerate(db_vnfr
.get("vdur", ())):
950 for iface_index
, interface
in enumerate(vdur
["interfaces"]):
951 if interface
.get(iface_key
) == vld_id
and interface
.get(
953 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
955 interface_
= interface
.copy()
956 interface_
["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
957 db_vnfr
["_id"], vdu_index
, iface_index
960 if vdur
.get("status") == "ERROR":
961 interface_
["status"] = "ERROR"
963 interfaces
.append(interface_
)
967 def refresh(self
, ro_task
):
968 # look for task create
969 task_create_index
, _
= next(
971 for i_t
in enumerate(ro_task
["tasks"])
973 and i_t
[1]["action"] == "CREATE"
974 and i_t
[1]["status"] != "FINISHED"
977 return self
.new(ro_task
, task_create_index
, None)
979 def new(self
, ro_task
, task_index
, task_depends
):
980 task
= ro_task
["tasks"][task_index
]
981 task_id
= task
["task_id"]
982 target_vim
= self
.my_vims
[ro_task
["target_id"]]
984 sdn_net_id
= ro_task
["vim_info"]["vim_id"]
986 created_items
= ro_task
["vim_info"].get("created_items")
987 connected_ports
= ro_task
["vim_info"].get("connected_ports", [])
988 new_connected_ports
= []
989 last_update
= ro_task
["vim_info"].get("last_update", 0)
990 sdn_status
= ro_task
["vim_info"].get("vim_status", "BUILD") or "BUILD"
992 created
= ro_task
["vim_info"].get("created", False)
996 params
= task
["params"]
997 vlds_to_connect
= params
.get("vlds", [])
998 associated_vim
= params
.get("target_vim")
999 # external additional ports
1000 additional_ports
= params
.get("sdn-ports") or ()
1001 _
, _
, vim_account_id
= (
1003 if associated_vim
is None
1004 else associated_vim
.partition(":")
1008 # get associated VIM
1009 if associated_vim
not in self
.db_vims
:
1010 self
.db_vims
[associated_vim
] = self
.db
.get_one(
1011 "vim_accounts", {"_id": vim_account_id
}
1014 db_vim
= self
.db_vims
[associated_vim
]
1016 # look for ports to connect
1017 ports
= self
._get
_interfaces
(vlds_to_connect
, vim_account_id
)
1021 pending_ports
= error_ports
= 0
1023 sdn_need_update
= False
1026 vlan_used
= port
.get("vlan") or vlan_used
1028 # TODO. Do not connect if already done
1029 if not port
.get("compute_node") or not port
.get("pci"):
1030 if port
.get("status") == "ERROR":
1037 compute_node_mappings
= next(
1040 for c
in db_vim
["config"].get("sdn-port-mapping", ())
1041 if c
and c
["compute_node"] == port
["compute_node"]
1046 if compute_node_mappings
:
1047 # process port_mapping pci of type 0000:af:1[01].[1357]
1051 for p
in compute_node_mappings
["ports"]
1052 if self
._match
_pci
(port
["pci"], p
.get("pci"))
1058 if not db_vim
["config"].get("mapping_not_needed"):
1060 "Port mapping not found for compute_node={} pci={}".format(
1061 port
["compute_node"], port
["pci"]
1068 service_endpoint_id
= "{}:{}".format(port
["compute_node"], port
["pci"])
1070 "service_endpoint_id": pmap
.get("service_endpoint_id")
1071 or service_endpoint_id
,
1072 "service_endpoint_encapsulation_type": "dot1q"
1073 if port
["type"] == "SR-IOV"
1075 "service_endpoint_encapsulation_info": {
1076 "vlan": port
.get("vlan"),
1077 "mac": port
.get("mac-address"),
1078 "device_id": pmap
.get("device_id") or port
["compute_node"],
1079 "device_interface_id": pmap
.get("device_interface_id")
1081 "switch_dpid": pmap
.get("switch_id") or pmap
.get("switch_dpid"),
1082 "switch_port": pmap
.get("switch_port"),
1083 "service_mapping_info": pmap
.get("service_mapping_info"),
1088 # if port["modified_at"] > last_update:
1089 # sdn_need_update = True
1090 new_connected_ports
.append(port
["id"]) # TODO
1091 sdn_ports
.append(new_port
)
1095 "{} interfaces have not been created as VDU is on ERROR status".format(
1100 # connect external ports
1101 for index
, additional_port
in enumerate(additional_ports
):
1102 additional_port_id
= additional_port
.get(
1103 "service_endpoint_id"
1104 ) or "external-{}".format(index
)
1107 "service_endpoint_id": additional_port_id
,
1108 "service_endpoint_encapsulation_type": additional_port
.get(
1109 "service_endpoint_encapsulation_type", "dot1q"
1111 "service_endpoint_encapsulation_info": {
1112 "vlan": additional_port
.get("vlan") or vlan_used
,
1113 "mac": additional_port
.get("mac_address"),
1114 "device_id": additional_port
.get("device_id"),
1115 "device_interface_id": additional_port
.get(
1116 "device_interface_id"
1118 "switch_dpid": additional_port
.get("switch_dpid")
1119 or additional_port
.get("switch_id"),
1120 "switch_port": additional_port
.get("switch_port"),
1121 "service_mapping_info": additional_port
.get(
1122 "service_mapping_info"
1127 new_connected_ports
.append(additional_port_id
)
1130 # if there are more ports to connect or they have been modified, call create/update
1132 sdn_status
= "ERROR"
1133 sdn_info
= "; ".join(error_list
)
1134 elif set(connected_ports
) != set(new_connected_ports
) or sdn_need_update
:
1135 last_update
= time
.time()
1138 if len(sdn_ports
) < 2:
1139 sdn_status
= "ACTIVE"
1141 if not pending_ports
:
1143 "task={} {} new-sdn-net done, less than 2 ports".format(
1144 task_id
, ro_task
["target_id"]
1148 net_type
= params
.get("type") or "ELAN"
1152 ) = target_vim
.create_connectivity_service(net_type
, sdn_ports
)
1155 "task={} {} new-sdn-net={} created={}".format(
1156 task_id
, ro_task
["target_id"], sdn_net_id
, created
1160 created_items
= target_vim
.edit_connectivity_service(
1161 sdn_net_id
, conn_info
=created_items
, connection_points
=sdn_ports
1165 "task={} {} update-sdn-net={} created={}".format(
1166 task_id
, ro_task
["target_id"], sdn_net_id
, created
1170 connected_ports
= new_connected_ports
1172 wim_status_dict
= target_vim
.get_connectivity_service_status(
1173 sdn_net_id
, conn_info
=created_items
1175 sdn_status
= wim_status_dict
["sdn_status"]
1177 if wim_status_dict
.get("sdn_info"):
1178 sdn_info
= str(wim_status_dict
.get("sdn_info")) or ""
1180 if wim_status_dict
.get("error_msg"):
1181 sdn_info
= wim_status_dict
.get("error_msg") or ""
1184 if sdn_status
!= "ERROR":
1185 sdn_info
= "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1186 len(ports
) - pending_ports
, len(ports
)
1189 if sdn_status
== "ACTIVE":
1190 sdn_status
= "BUILD"
1192 ro_vim_item_update
= {
1193 "vim_id": sdn_net_id
,
1194 "vim_status": sdn_status
,
1196 "created_items": created_items
,
1197 "connected_ports": connected_ports
,
1198 "vim_details": sdn_info
,
1199 "last_update": last_update
,
1202 return sdn_status
, ro_vim_item_update
1203 except Exception as e
:
1205 "task={} vim={} new-net: {}".format(task_id
, ro_task
["target_id"], e
),
1206 exc_info
=not isinstance(
1207 e
, (sdnconn
.SdnConnectorError
, vimconn
.VimConnException
)
1210 ro_vim_item_update
= {
1211 "vim_status": "VIM_ERROR",
1213 "vim_details": str(e
),
1216 return "FAILED", ro_vim_item_update
1218 def delete(self
, ro_task
, task_index
):
1219 task
= ro_task
["tasks"][task_index
]
1220 task_id
= task
["task_id"]
1221 sdn_vim_id
= ro_task
["vim_info"].get("vim_id")
1222 ro_vim_item_update_ok
= {
1223 "vim_status": "DELETED",
1225 "vim_details": "DELETED",
1231 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1232 target_vim
.delete_connectivity_service(
1233 sdn_vim_id
, ro_task
["vim_info"].get("created_items")
1236 except Exception as e
:
1238 isinstance(e
, sdnconn
.SdnConnectorError
)
1239 and e
.http_code
== HTTPStatus
.NOT_FOUND
.value
1241 ro_vim_item_update_ok
["vim_details"] = "already deleted"
1244 "ro_task={} vim={} del-sdn-net={}: {}".format(
1245 ro_task
["_id"], ro_task
["target_id"], sdn_vim_id
, e
1247 exc_info
=not isinstance(
1248 e
, (sdnconn
.SdnConnectorError
, vimconn
.VimConnException
)
1251 ro_vim_item_update
= {
1252 "vim_status": "VIM_ERROR",
1253 "vim_details": "Error while deleting: {}".format(e
),
1256 return "FAILED", ro_vim_item_update
1259 "task={} {} del-sdn-net={} {}".format(
1261 ro_task
["target_id"],
1263 ro_vim_item_update_ok
.get("vim_details", ""),
1267 return "DONE", ro_vim_item_update_ok
class ConfigValidate:
    """Accessor for the values stored under the "period" section of the RO configuration."""

    def __init__(self, config: Dict):
        self.conf = config

    def _period(self, key):
        # every value exposed by this class is read from config["period"]
        return self.conf["period"][key]

    @property
    def active(self):
        # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
        configured = self._period("refresh_active")
        accepted = configured >= 60 or configured == -1

        return configured if accepted else 60

    @property
    def build(self):
        return self._period("refresh_build")

    @property
    def image(self):
        return self._period("refresh_image")

    @property
    def error(self):
        return self._period("refresh_error")

    @property
    def queue_size(self):
        return self._period("queue_size")
1302 class NsWorker(threading
.Thread
):
1303 def __init__(self
, worker_index
, config
, plugins
, db
):
1306 :param worker_index: thread index
1307 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1308 :param plugins: global shared dict with the loaded plugins
1309 :param db: database class instance to use
1311 threading
.Thread
.__init
__(self
)
1312 self
.config
= config
1313 self
.plugins
= plugins
1314 self
.plugin_name
= "unknown"
1315 self
.logger
= logging
.getLogger("ro.worker{}".format(worker_index
))
1316 self
.worker_index
= worker_index
1317 # refresh periods for created items
1318 self
.refresh_config
= ConfigValidate(config
)
1319 self
.task_queue
= queue
.Queue(self
.refresh_config
.queue_size
)
1320 # targetvim: vimplugin class
1322 # targetvim: vim information from database
1325 self
.vim_targets
= []
1326 self
.my_id
= config
["process_id"] + ":" + str(worker_index
)
1329 "net": VimInteractionNet(self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
),
1330 "vdu": VimInteractionVdu(self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
),
1331 "image": VimInteractionImage(
1332 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1334 "flavor": VimInteractionFlavor(
1335 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1337 "sdn_net": VimInteractionSdnNet(
1338 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1340 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
1341 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1344 self
.time_last_task_processed
= None
1345 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1346 self
.tasks_to_delete
= []
1347 # it is idle when there are not vim_targets associated
1349 self
.task_locked_time
= config
["global"]["task_locked_time"]
1351 def insert_task(self
, task
):
1353 self
.task_queue
.put(task
, False)
1356 raise NsWorkerException("timeout inserting a task")
1358 def terminate(self
):
1359 self
.insert_task("exit")
1361 def del_task(self
, task
):
1362 with self
.task_lock
:
1363 if task
["status"] == "SCHEDULED":
1364 task
["status"] = "SUPERSEDED"
1366 else: # task["status"] == "processing"
1367 self
.task_lock
.release()
1370 def _process_vim_config(self
, target_id
, db_vim
):
1372 Process vim config, creating vim configuration files as ca_cert
1373 :param target_id: vim/sdn/wim + id
1374 :param db_vim: Vim dictionary obtained from database
1375 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1377 if not db_vim
.get("config"):
1383 if db_vim
["config"].get("ca_cert_content"):
1384 file_name
= "{}:{}".format(target_id
, self
.worker_index
)
1388 except FileExistsError
:
1389 self
.logger
.exception(
1390 "FileExistsError occured while processing vim_config."
1393 file_name
= file_name
+ "/ca_cert"
1395 with
open(file_name
, "w") as f
:
1396 f
.write(db_vim
["config"]["ca_cert_content"])
1397 del db_vim
["config"]["ca_cert_content"]
1398 db_vim
["config"]["ca_cert"] = file_name
1399 except Exception as e
:
1400 raise NsWorkerException(
1401 "Error writing to file '{}': {}".format(file_name
, e
)
1404 def _load_plugin(self
, name
, type="vim"):
1405 # type can be vim or sdn
1406 if "rovim_dummy" not in self
.plugins
:
1407 self
.plugins
["rovim_dummy"] = VimDummyConnector
1409 if "rosdn_dummy" not in self
.plugins
:
1410 self
.plugins
["rosdn_dummy"] = SdnDummyConnector
1412 if name
in self
.plugins
:
1413 return self
.plugins
[name
]
1416 for ep
in entry_points(group
="osm_ro{}.plugins".format(type), name
=name
):
1417 self
.plugins
[name
] = ep
.load()
1418 except Exception as e
:
1419 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name
, e
))
1421 if name
and name
not in self
.plugins
:
1422 raise NsWorkerException(
1423 "Plugin 'osm_{n}' has not been installed".format(n
=name
)
1426 return self
.plugins
[name
]
1428 def _unload_vim(self
, target_id
):
1430 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1431 :param target_id: Contains type:_id; where type can be 'vim', ...
1435 self
.db_vims
.pop(target_id
, None)
1436 self
.my_vims
.pop(target_id
, None)
1438 if target_id
in self
.vim_targets
:
1439 self
.vim_targets
.remove(target_id
)
1441 self
.logger
.info("Unloaded {}".format(target_id
))
1442 rmtree("{}:{}".format(target_id
, self
.worker_index
))
1443 except FileNotFoundError
:
1444 # This is raised by rmtree if folder does not exist.
1445 self
.logger
.exception("FileNotFoundError occured while unloading VIM.")
1446 except Exception as e
:
1447 self
.logger
.error("Cannot unload {}: {}".format(target_id
, e
))
1449 def _check_vim(self
, target_id
):
1451 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1452 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1455 target
, _
, _id
= target_id
.partition(":")
1461 loaded
= target_id
in self
.vim_targets
1471 step
= "Getting {} from db".format(target_id
)
1472 db_vim
= self
.db
.get_one(target_database
, {"_id": _id
})
1474 for op_index
, operation
in enumerate(
1475 db_vim
["_admin"].get("operations", ())
1477 if operation
["operationState"] != "PROCESSING":
1480 locked_at
= operation
.get("locked_at")
1482 if locked_at
is not None and locked_at
>= now
- self
.task_locked_time
:
1483 # some other thread is doing this operation
1487 op_text
= "_admin.operations.{}.".format(op_index
)
1489 if not self
.db
.set_one(
1493 op_text
+ "operationState": "PROCESSING",
1494 op_text
+ "locked_at": locked_at
,
1497 op_text
+ "locked_at": now
,
1498 "admin.current_operation": op_index
,
1500 fail_on_empty
=False,
1504 unset_dict
[op_text
+ "locked_at"] = None
1505 unset_dict
["current_operation"] = None
1506 step
= "Loading " + target_id
1507 error_text
= self
._load
_vim
(target_id
)
1510 step
= "Checking connectivity"
1513 self
.my_vims
[target_id
].check_vim_connectivity()
1515 self
.my_vims
[target_id
].check_credentials()
1517 update_dict
["_admin.operationalState"] = "ENABLED"
1518 update_dict
["_admin.detailed-status"] = ""
1519 unset_dict
[op_text
+ "detailed-status"] = None
1520 update_dict
[op_text
+ "operationState"] = "COMPLETED"
1524 except Exception as e
:
1525 error_text
= "{}: {}".format(step
, e
)
1526 self
.logger
.error("{} for {}: {}".format(step
, target_id
, e
))
1529 if update_dict
or unset_dict
:
1531 update_dict
[op_text
+ "operationState"] = "FAILED"
1532 update_dict
[op_text
+ "detailed-status"] = error_text
1533 unset_dict
.pop(op_text
+ "detailed-status", None)
1534 update_dict
["_admin.operationalState"] = "ERROR"
1535 update_dict
["_admin.detailed-status"] = error_text
1538 update_dict
[op_text
+ "statusEnteredTime"] = now
1542 q_filter
={"_id": _id
},
1543 update_dict
=update_dict
,
1545 fail_on_empty
=False,
1549 self
._unload
_vim
(target_id
)
1551 def _reload_vim(self
, target_id
):
1552 if target_id
in self
.vim_targets
:
1553 self
._load
_vim
(target_id
)
1555 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1556 # just remove it to force load again next time it is needed
1557 self
.db_vims
.pop(target_id
, None)
1559 def _load_vim(self
, target_id
):
1561 Load or reload a vim_account, sdn_controller or wim_account.
1562 Read content from database, load the plugin if not loaded.
1563 In case of error loading the plugin, it load a failing VIM_connector
1564 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1565 :param target_id: Contains type:_id; where type can be 'vim', ...
1566 :return: None if ok, descriptive text if error
1568 target
, _
, _id
= target_id
.partition(":")
1580 step
= "Getting {}={} from db".format(target
, _id
)
1581 # TODO process for wim, sdnc, ...
1582 vim
= self
.db
.get_one(target_database
, {"_id": _id
})
1584 # if deep_get(vim, "config", "sdn-controller"):
1585 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1586 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1588 step
= "Decrypting password"
1589 schema_version
= vim
.get("schema_version")
1590 self
.db
.encrypt_decrypt_fields(
1593 fields
=("password", "secret"),
1594 schema_version
=schema_version
,
1597 self
._process
_vim
_config
(target_id
, vim
)
1600 plugin_name
= "rovim_" + vim
["vim_type"]
1601 step
= "Loading plugin '{}'".format(plugin_name
)
1602 vim_module_conn
= self
._load
_plugin
(plugin_name
)
1603 step
= "Loading {}'".format(target_id
)
1604 self
.my_vims
[target_id
] = vim_module_conn(
1607 tenant_id
=vim
.get("vim_tenant_id"),
1608 tenant_name
=vim
.get("vim_tenant_name"),
1611 user
=vim
["vim_user"],
1612 passwd
=vim
["vim_password"],
1613 config
=vim
.get("config") or {},
1617 plugin_name
= "rosdn_" + (vim
.get("type") or vim
.get("wim_type"))
1618 step
= "Loading plugin '{}'".format(plugin_name
)
1619 vim_module_conn
= self
._load
_plugin
(plugin_name
, "sdn")
1620 step
= "Loading {}'".format(target_id
)
1622 wim_config
= wim
.pop("config", {}) or {}
1623 wim
["uuid"] = wim
["_id"]
1624 if "url" in wim
and "wim_url" not in wim
:
1625 wim
["wim_url"] = wim
["url"]
1626 elif "url" not in wim
and "wim_url" in wim
:
1627 wim
["url"] = wim
["wim_url"]
1630 wim_config
["dpid"] = wim
.pop("dpid")
1632 if wim
.get("switch_id"):
1633 wim_config
["switch_id"] = wim
.pop("switch_id")
1635 # wim, wim_account, config
1636 self
.my_vims
[target_id
] = vim_module_conn(wim
, wim
, wim_config
)
1637 self
.db_vims
[target_id
] = vim
1638 self
.error_status
= None
1641 "Connector loaded for {}, plugin={}".format(target_id
, plugin_name
)
1643 except Exception as e
:
1645 "Cannot load {} plugin={}: {} {}".format(
1646 target_id
, plugin_name
, step
, e
1650 self
.db_vims
[target_id
] = vim
or {}
1651 self
.db_vims
[target_id
] = FailingConnector(str(e
))
1652 error_status
= "{} Error: {}".format(step
, e
)
1656 if target_id
not in self
.vim_targets
:
1657 self
.vim_targets
.append(target_id
)
1659 def _get_db_task(self
):
1661 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1666 if not self
.time_last_task_processed
:
1667 self
.time_last_task_processed
= now
1672 # Log RO tasks only when loglevel is DEBUG
1673 if self.logger.getEffectiveLevel() == logging.DEBUG:
1680 + str(self.task_locked_time)
1682 + "time_last_task_processed="
1683 + str(self.time_last_task_processed)
1689 locked
= self
.db
.set_one(
1692 "target_id": self
.vim_targets
,
1693 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1694 "locked_at.lt": now
- self
.task_locked_time
,
1695 "to_check_at.lt": self
.time_last_task_processed
,
1696 "to_check_at.gt": -1,
1698 update_dict
={"locked_by": self
.my_id
, "locked_at": now
},
1699 fail_on_empty
=False,
1704 ro_task
= self
.db
.get_one(
1707 "target_id": self
.vim_targets
,
1708 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1714 if self
.time_last_task_processed
== now
:
1715 self
.time_last_task_processed
= None
1718 self
.time_last_task_processed
= now
1719 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1721 except DbException
as e
:
1722 self
.logger
.error("Database exception at _get_db_task: {}".format(e
))
1723 except Exception as e
:
1724 self
.logger
.critical(
1725 "Unexpected exception at _get_db_task: {}".format(e
), exc_info
=True
1730 def _get_db_all_tasks(self
):
1732 Read all content of table ro_tasks to log it
1736 # Checking the content of the BD:
1739 ro_task
= self
.db
.get_list("ro_tasks")
1741 self
._log
_ro
_task
(rt
, None, None, "TASK_WF", "GET_ALL_TASKS")
1744 except DbException
as e
:
1745 self
.logger
.error("Database exception at _get_db_all_tasks: {}".format(e
))
1746 except Exception as e
:
1747 self
.logger
.critical(
1748 "Unexpected exception at _get_db_all_tasks: {}".format(e
), exc_info
=True
1753 def _log_ro_task(self
, ro_task
, db_ro_task_update
, db_ro_task_delete
, mark
, event
):
1755 Generate a log with the following format:
1757 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1758 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1759 task_array_index;task_id;task_action;task_item;task_args
1763 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1764 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1765 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1766 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1767 'vim_details': None, 'refresh_at': None};1;SCHEDULED;
1768 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1769 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1774 if ro_task
is not None and isinstance(ro_task
, dict):
1775 for t
in ro_task
["tasks"]:
1779 line
.append(ro_task
.get("_id", ""))
1780 line
.append(str(ro_task
.get("locked_at", "")))
1781 line
.append(str(ro_task
.get("modified_at", "")))
1782 line
.append(str(ro_task
.get("created_at", "")))
1783 line
.append(str(ro_task
.get("to_check_at", "")))
1784 line
.append(str(ro_task
.get("locked_by", "")))
1785 line
.append(str(ro_task
.get("target_id", "")))
1786 line
.append(str(ro_task
.get("vim_info", {}).get("refresh_at", "")))
1787 line
.append(str(ro_task
.get("vim_info", "")))
1788 line
.append(str(ro_task
.get("tasks", "")))
1789 if isinstance(t
, dict):
1790 line
.append(str(t
.get("status", "")))
1791 line
.append(str(t
.get("action_id", "")))
1793 line
.append(str(t
.get("task_id", "")))
1794 line
.append(str(t
.get("action", "")))
1795 line
.append(str(t
.get("item", "")))
1796 line
.append(str(t
.get("find_params", "")))
1797 line
.append(str(t
.get("params", "")))
1799 line
.extend([""] * 2)
1801 line
.extend([""] * 5)
1804 self
.logger
.debug(";".join(line
))
1805 elif db_ro_task_update
is not None and isinstance(db_ro_task_update
, dict):
1808 st
= "tasks.{}.status".format(i
)
1809 if st
not in db_ro_task_update
:
1814 line
.append(db_ro_task_update
.get("_id", ""))
1815 line
.append(str(db_ro_task_update
.get("locked_at", "")))
1816 line
.append(str(db_ro_task_update
.get("modified_at", "")))
1818 line
.append(str(db_ro_task_update
.get("to_check_at", "")))
1819 line
.append(str(db_ro_task_update
.get("locked_by", "")))
1821 line
.append(str(db_ro_task_update
.get("vim_info.refresh_at", "")))
1823 line
.append(str(db_ro_task_update
.get("vim_info", "")))
1824 line
.append(str(str(db_ro_task_update
).count(".status")))
1825 line
.append(db_ro_task_update
.get(st
, ""))
1828 line
.extend([""] * 3)
1830 self
.logger
.debug(";".join(line
))
1832 elif db_ro_task_delete
is not None and isinstance(db_ro_task_delete
, dict):
1836 line
.append(db_ro_task_delete
.get("_id", ""))
1838 line
.append(db_ro_task_delete
.get("modified_at", ""))
1839 line
.extend([""] * 13)
1840 self
.logger
.debug(";".join(line
))
1846 line
.extend([""] * 16)
1847 self
.logger
.debug(";".join(line
))
1849 except Exception as e
:
1850 self
.logger
.error("Error logging ro_task: {}".format(e
))
1852 def _delete_task(self
, ro_task
, task_index
, task_depends
, db_update
):
1854 Determine if this task need to be done or superseded
1857 my_task
= ro_task
["tasks"][task_index
]
1858 task_id
= my_task
["task_id"]
1859 needed_delete
= ro_task
["vim_info"]["created"] or ro_task
["vim_info"].get(
1860 "created_items", False
1863 if my_task
["status"] == "FAILED":
1864 return None, None # TODO need to be retry??
1867 for index
, task
in enumerate(ro_task
["tasks"]):
1868 if index
== task_index
or not task
:
1872 my_task
["target_record"] == task
["target_record"]
1873 and task
["action"] == "CREATE"
1876 db_update
["tasks.{}.status".format(index
)] = task
[
1879 elif task
["action"] == "CREATE" and task
["status"] not in (
1883 needed_delete
= False
1886 return self
.item2class
[my_task
["item"]].delete(ro_task
, task_index
)
1888 return "SUPERSEDED", None
1889 except Exception as e
:
1890 if not isinstance(e
, NsWorkerException
):
1891 self
.logger
.critical(
1892 "Unexpected exception at _delete_task task={}: {}".format(
1898 return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e
)}
1900 def _create_task(self
, ro_task
, task_index
, task_depends
, db_update
):
1902 Determine if this task need to create something at VIM
1905 my_task
= ro_task
["tasks"][task_index
]
1906 task_id
= my_task
["task_id"]
1909 if my_task
["status"] == "FAILED":
1910 return None, None # TODO need to be retry??
1911 elif my_task
["status"] == "SCHEDULED":
1912 # check if already created by another task
1913 for index
, task
in enumerate(ro_task
["tasks"]):
1914 if index
== task_index
or not task
:
1917 if task
["action"] == "CREATE" and task
["status"] not in (
1922 return task
["status"], "COPY_VIM_INFO"
1925 task_status
, ro_vim_item_update
= self
.item2class
[my_task
["item"]].new(
1926 ro_task
, task_index
, task_depends
1928 # TODO update other CREATE tasks
1929 except Exception as e
:
1930 if not isinstance(e
, NsWorkerException
):
1932 "Error executing task={}: {}".format(task_id
, e
), exc_info
=True
1935 task_status
= "FAILED"
1936 ro_vim_item_update
= {"vim_status": "VIM_ERROR", "vim_details": str(e
)}
1937 # TODO update ro_vim_item_update
1939 return task_status
, ro_vim_item_update
1943 def _get_dependency(self
, task_id
, ro_task
=None, target_id
=None):
1945 Look for dependency task
1946 :param task_id: Can be one of
1947 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1948 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1949 3. task.task_id: "<action_id>:number"
1952 :return: database ro_task plus index of task
1955 task_id
.startswith("vim:")
1956 or task_id
.startswith("sdn:")
1957 or task_id
.startswith("wim:")
1959 target_id
, _
, task_id
= task_id
.partition(" ")
1961 if task_id
.startswith("nsrs:") or task_id
.startswith("vnfrs:"):
1962 ro_task_dependency
= self
.db
.get_one(
1964 q_filter
={"target_id": target_id
, "tasks.target_record_id": task_id
},
1965 fail_on_empty
=False,
1968 if ro_task_dependency
:
1969 for task_index
, task
in enumerate(ro_task_dependency
["tasks"]):
1970 if task
["target_record_id"] == task_id
:
1971 return ro_task_dependency
, task_index
1975 for task_index
, task
in enumerate(ro_task
["tasks"]):
1976 if task
and task
["task_id"] == task_id
:
1977 return ro_task
, task_index
1979 ro_task_dependency
= self
.db
.get_one(
1982 "tasks.ANYINDEX.task_id": task_id
,
1983 "tasks.ANYINDEX.target_record.ne": None,
1985 fail_on_empty
=False,
1988 if ro_task_dependency
:
1989 for task_index
, task
in ro_task_dependency
["tasks"]:
1990 if task
["task_id"] == task_id
:
1991 return ro_task_dependency
, task_index
1992 raise NsWorkerException("Cannot get depending task {}".format(task_id
))
1994 def update_vm_refresh(self
, ro_task
):
1995 """Enables the VM status updates if self.refresh_config.active parameter
1996 is not -1 and then updates the DB accordingly
2000 self
.logger
.debug("Checking if VM status update config")
2001 next_refresh
= time
.time()
2002 next_refresh
= self
._get
_next
_refresh
(ro_task
, next_refresh
)
2004 if next_refresh
!= -1:
2005 db_ro_task_update
= {}
2007 next_check_at
= now
+ (24 * 60 * 60)
2008 next_check_at
= min(next_check_at
, next_refresh
)
2009 db_ro_task_update
["vim_info.refresh_at"] = next_refresh
2010 db_ro_task_update
["to_check_at"] = next_check_at
2013 "Finding tasks which to be updated to enable VM status updates"
2015 refresh_tasks
= self
.db
.get_list(
2018 "tasks.status": "DONE",
2019 "to_check_at.lt": 0,
2022 self
.logger
.debug("Updating tasks to change the to_check_at status")
2023 for task
in refresh_tasks
:
2030 update_dict
=db_ro_task_update
,
2034 except Exception as e
:
2035 self
.logger
.error(f
"Error updating tasks to enable VM status updates: {e}")
2037 def _get_next_refresh(self
, ro_task
: dict, next_refresh
: float):
2038 """Decide the next_refresh according to vim type and refresh config period.
2040 ro_task (dict): ro_task details
2041 next_refresh (float): next refresh time as epoch format
2044 next_refresh (float) -1 if vm updates are disabled or vim type is openstack.
2046 target_vim
= ro_task
["target_id"]
2047 vim_type
= self
.db_vims
[target_vim
]["vim_type"]
2048 if self
.refresh_config
.active
== -1 or vim_type
== "openstack":
2051 next_refresh
+= self
.refresh_config
.active
2054 def _process_pending_tasks(self
, ro_task
):
2055 ro_task_id
= ro_task
["_id"]
2058 next_check_at
= now
+ (24 * 60 * 60)
2059 db_ro_task_update
= {}
2061 def _update_refresh(new_status
):
2062 # compute next_refresh
2064 nonlocal next_check_at
2065 nonlocal db_ro_task_update
2068 next_refresh
= time
.time()
2070 if task
["item"] in ("image", "flavor"):
2071 next_refresh
+= self
.refresh_config
.image
2072 elif new_status
== "BUILD":
2073 next_refresh
+= self
.refresh_config
.build
2074 elif new_status
== "DONE":
2075 next_refresh
= self
._get
_next
_refresh
(ro_task
, next_refresh
)
2077 next_refresh
+= self
.refresh_config
.error
2079 next_check_at
= min(next_check_at
, next_refresh
)
2080 db_ro_task_update
["vim_info.refresh_at"] = next_refresh
2081 ro_task
["vim_info"]["refresh_at"] = next_refresh
2085 # Log RO tasks only when loglevel is DEBUG
2086 if self.logger.getEffectiveLevel() == logging.DEBUG:
2087 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2089 # Check if vim status refresh is enabled again
2090 self
.update_vm_refresh(ro_task
)
2091 # 0: get task_status_create
2093 task_status_create
= None
2097 for t
in ro_task
["tasks"]
2099 and t
["action"] == "CREATE"
2100 and t
["status"] in ("BUILD", "DONE")
2106 task_status_create
= task_create
["status"]
2108 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2109 for task_action
in ("DELETE", "CREATE", "EXEC"):
2110 db_vim_update
= None
2113 for task_index
, task
in enumerate(ro_task
["tasks"]):
2115 continue # task deleted
2118 target_update
= None
2122 task_action
in ("DELETE", "EXEC")
2123 and task
["status"] not in ("SCHEDULED", "BUILD")
2125 or task
["action"] != task_action
2127 task_action
== "CREATE"
2128 and task
["status"] in ("FINISHED", "SUPERSEDED")
2133 task_path
= "tasks.{}.status".format(task_index
)
2135 db_vim_info_update
= None
2137 if task
["status"] == "SCHEDULED":
2138 # check if tasks that this depends on have been completed
2139 dependency_not_completed
= False
2141 for dependency_task_id
in task
.get("depends_on") or ():
2144 dependency_task_index
,
2145 ) = self
._get
_dependency
(
2146 dependency_task_id
, target_id
=ro_task
["target_id"]
2148 dependency_task
= dependency_ro_task
["tasks"][
2149 dependency_task_index
2152 if dependency_task
["status"] == "SCHEDULED":
2153 dependency_not_completed
= True
2154 next_check_at
= min(
2155 next_check_at
, dependency_ro_task
["to_check_at"]
2157 # must allow dependent task to be processed first
2158 # to do this set time after last_task_processed
2159 next_check_at
= max(
2160 self
.time_last_task_processed
, next_check_at
2163 elif dependency_task
["status"] == "FAILED":
2164 error_text
= "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2167 dependency_task
["action"],
2168 dependency_task
["item"],
2170 dependency_ro_task
["vim_info"].get(
2175 "task={} {}".format(task
["task_id"], error_text
)
2177 raise NsWorkerException(error_text
)
2179 task_depends
[dependency_task_id
] = dependency_ro_task
[
2183 "TASK-{}".format(dependency_task_id
)
2184 ] = dependency_ro_task
["vim_info"]["vim_id"]
2186 if dependency_not_completed
:
2187 # TODO set at vim_info.vim_details that it is waiting
2190 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2191 # the task of renew this locking. It will update database locket_at periodically
2193 lock_object
= LockRenew
.add_lock_object(
2194 "ro_tasks", ro_task
, self
2197 if task
["action"] == "DELETE":
2201 ) = self
._delete
_task
(
2202 ro_task
, task_index
, task_depends
, db_ro_task_update
2205 "FINISHED" if new_status
== "DONE" else new_status
2207 # ^with FINISHED instead of DONE it will not be refreshing
2209 if new_status
in ("FINISHED", "SUPERSEDED"):
2210 target_update
= "DELETE"
2211 elif task
["action"] == "EXEC":
2216 ) = self
.item2class
[task
["item"]].exec(
2217 ro_task
, task_index
, task_depends
2220 "FINISHED" if new_status
== "DONE" else new_status
2222 # ^with FINISHED instead of DONE it will not be refreshing
2225 # load into database the modified db_task_update "retries" and "next_retry"
2226 if db_task_update
.get("retries"):
2228 "tasks.{}.retries".format(task_index
)
2229 ] = db_task_update
["retries"]
2231 next_check_at
= time
.time() + db_task_update
.get(
2234 target_update
= None
2235 elif task
["action"] == "CREATE":
2236 if task
["status"] == "SCHEDULED":
2237 if task_status_create
:
2238 new_status
= task_status_create
2239 target_update
= "COPY_VIM_INFO"
2241 new_status
, db_vim_info_update
= self
.item2class
[
2243 ].new(ro_task
, task_index
, task_depends
)
2244 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2245 _update_refresh(new_status
)
2247 refresh_at
= ro_task
["vim_info"]["refresh_at"]
2248 if refresh_at
and refresh_at
!= -1 and now
> refresh_at
:
2249 new_status
, db_vim_info_update
= self
.item2class
[
2252 _update_refresh(new_status
)
2254 # The refresh is updated to avoid set the value of "refresh_at" to
2255 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2256 # because it can happen that in this case the task is never processed
2257 _update_refresh(task
["status"])
2259 except Exception as e
:
2260 new_status
= "FAILED"
2261 db_vim_info_update
= {
2262 "vim_status": "VIM_ERROR",
2263 "vim_details": str(e
),
2267 e
, (NsWorkerException
, vimconn
.VimConnException
)
2270 "Unexpected exception at _delete_task task={}: {}".format(
2277 if db_vim_info_update
:
2278 db_vim_update
= db_vim_info_update
.copy()
2279 db_ro_task_update
.update(
2282 for k
, v
in db_vim_info_update
.items()
2285 ro_task
["vim_info"].update(db_vim_info_update
)
2288 if task_action
== "CREATE":
2289 task_status_create
= new_status
2290 db_ro_task_update
[task_path
] = new_status
2292 if target_update
or db_vim_update
:
2293 if target_update
== "DELETE":
2294 self
._update
_target
(task
, None)
2295 elif target_update
== "COPY_VIM_INFO":
2296 self
._update
_target
(task
, ro_task
["vim_info"])
2298 self
._update
_target
(task
, db_vim_update
)
2300 except Exception as e
:
2302 isinstance(e
, DbException
)
2303 and e
.http_code
== HTTPStatus
.NOT_FOUND
2305 # if the vnfrs or nsrs has been removed from database, this task must be removed
2307 "marking to delete task={}".format(task
["task_id"])
2309 self
.tasks_to_delete
.append(task
)
2312 "Unexpected exception at _update_target task={}: {}".format(
2318 locked_at
= ro_task
["locked_at"]
2322 lock_object
["locked_at"],
2323 lock_object
["locked_at"] + self
.task_locked_time
,
2325 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2326 # contain exactly locked_at + self.task_locked_time
2327 LockRenew
.remove_lock_object(lock_object
)
2330 "_id": ro_task
["_id"],
2331 "to_check_at": ro_task
["to_check_at"],
2332 "locked_at": locked_at
,
2334 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2335 # outside this task (by ro_nbi) do not update it
2336 db_ro_task_update
["locked_by"] = None
2337 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2338 db_ro_task_update
["locked_at"] = int(now
- self
.task_locked_time
)
2339 db_ro_task_update
["modified_at"] = now
2340 db_ro_task_update
["to_check_at"] = next_check_at
2343 # Log RO tasks only when loglevel is DEBUG
2344 if self.logger.getEffectiveLevel() == logging.DEBUG:
2345 db_ro_task_update_log = db_ro_task_update.copy()
2346 db_ro_task_update_log["_id"] = q_filter["_id"]
2347 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2350 if not self
.db
.set_one(
2352 update_dict
=db_ro_task_update
,
2354 fail_on_empty
=False,
2356 del db_ro_task_update
["to_check_at"]
2357 del q_filter
["to_check_at"]
2359 # Log RO tasks only when loglevel is DEBUG
2360 if self.logger.getEffectiveLevel() == logging.DEBUG:
2363 db_ro_task_update_log,
2366 "SET_TASK " + str(q_filter),
2372 update_dict
=db_ro_task_update
,
2375 except DbException
as e
:
2377 "ro_task={} Error updating database {}".format(ro_task_id
, e
)
2379 except Exception as e
:
2381 "Error executing ro_task={}: {}".format(ro_task_id
, e
), exc_info
=True
2384 def _update_target(self
, task
, ro_vim_item_update
):
2385 table
, _
, temp
= task
["target_record"].partition(":")
2386 _id
, _
, path_vim_status
= temp
.partition(":")
2387 path_item
= path_vim_status
[: path_vim_status
.rfind(".")]
2388 path_item
= path_item
[: path_item
.rfind(".")]
2389 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2390 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2392 if ro_vim_item_update
:
2394 path_vim_status
+ "." + k
: v
2395 for k
, v
in ro_vim_item_update
.items()
2397 in ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces")
2400 if path_vim_status
.startswith("vdur."):
2401 # for backward compatibility, add vdur.name apart from vdur.vim_name
2402 if ro_vim_item_update
.get("vim_name"):
2403 update_dict
[path_item
+ ".name"] = ro_vim_item_update
["vim_name"]
2405 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2406 if ro_vim_item_update
.get("vim_id"):
2407 update_dict
[path_item
+ ".vim-id"] = ro_vim_item_update
["vim_id"]
2409 # update general status
2410 if ro_vim_item_update
.get("vim_status"):
2411 update_dict
[path_item
+ ".status"] = ro_vim_item_update
[
2415 if ro_vim_item_update
.get("interfaces"):
2416 path_interfaces
= path_item
+ ".interfaces"
2418 for i
, iface
in enumerate(ro_vim_item_update
.get("interfaces")):
2422 path_interfaces
+ ".{}.".format(i
) + k
: v
2423 for k
, v
in iface
.items()
2424 if k
in ("vlan", "compute_node", "pci")
2428 # put ip_address and mac_address with ip-address and mac-address
2429 if iface
.get("ip_address"):
2431 path_interfaces
+ ".{}.".format(i
) + "ip-address"
2432 ] = iface
["ip_address"]
2434 if iface
.get("mac_address"):
2436 path_interfaces
+ ".{}.".format(i
) + "mac-address"
2437 ] = iface
["mac_address"]
2439 if iface
.get("mgmt_vnf_interface") and iface
.get("ip_address"):
2440 update_dict
["ip-address"] = iface
.get("ip_address").split(
2444 if iface
.get("mgmt_vdu_interface") and iface
.get("ip_address"):
2445 update_dict
[path_item
+ ".ip-address"] = iface
.get(
2449 self
.db
.set_one(table
, q_filter
={"_id": _id
}, update_dict
=update_dict
)
2451 update_dict
= {path_item
+ ".status": "DELETED"}
2454 q_filter
={"_id": _id
},
2455 update_dict
=update_dict
,
2456 unset
={path_vim_status
: None},
2459 def _process_delete_db_tasks(self
):
2461 Delete task from database because vnfrs or nsrs or both have been deleted
2462 :return: None. Uses and modify self.tasks_to_delete
2464 while self
.tasks_to_delete
:
2465 task
= self
.tasks_to_delete
[0]
2466 vnfrs_deleted
= None
2467 nsr_id
= task
["nsr_id"]
2469 if task
["target_record"].startswith("vnfrs:"):
2470 # check if nsrs is present
2471 if self
.db
.get_one("nsrs", {"_id": nsr_id
}, fail_on_empty
=False):
2472 vnfrs_deleted
= task
["target_record"].split(":")[1]
2475 self
.delete_db_tasks(self
.db
, nsr_id
, vnfrs_deleted
)
2476 except Exception as e
:
2478 "Error deleting task={}: {}".format(task
["task_id"], e
)
2480 self
.tasks_to_delete
.pop(0)
def delete_db_tasks(db, nsr_id, vnfrs_deleted):
    """
    Static method because it is called from osm_ng_ro.ns

    Reads the ro_tasks that contain tasks of nsr_id. A ro_task where every task is affected
    (or already empty) is deleted; a ro_task shared with other tasks is only updated, setting
    the affected "tasks.<index>" entries to None. Both operations are filtered by the
    "modified_at" value that was read, so they only apply if nobody changed the ro_task
    meanwhile; if any of them did not apply, the whole pass is repeated.
    :param db: instance of database to use
    :param nsr_id: affected nsrs id
    :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
    :return: None
    :raises NsWorkerException: if the retries are exhausted
    """
    import time  # local import: keeps the block self-contained

    retries = 5  # NOTE(review): value not visible in the reviewed chunk - confirm
    # invariant for the whole call, so it is built only once
    vnfr_prefix = "vnfrs:" + vnfrs_deleted if vnfrs_deleted else None

    for _ in range(retries):
        ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
        now = time.time()
        conflict = False

        for ro_task in ro_tasks:
            db_update = {}
            to_delete_ro_task = True

            for index, task in enumerate(ro_task["tasks"]):
                if not task:
                    # slot already emptied by a previous deletion
                    continue

                if vnfr_prefix:
                    affected = task["target_record"].startswith(vnfr_prefix)
                else:
                    affected = task["nsr_id"] == nsr_id

                if affected:
                    db_update["tasks.{}".format(index)] = None
                else:
                    # used by other nsr, ro_task cannot be deleted
                    to_delete_ro_task = False

            # delete or update only if nobody has changed ro_task meanwhile;
            # modified_at is used to know if it has changed
            unchanged = {
                "_id": ro_task["_id"],
                "modified_at": ro_task["modified_at"],
            }

            if to_delete_ro_task:
                # NOTE(review): the method name (del_one) is not visible in the reviewed chunk - confirm
                if not db.del_one(
                    "ro_tasks",
                    q_filter=unchanged,
                    fail_on_empty=False,
                ):
                    conflict = True
            elif db_update:
                db_update["modified_at"] = now

                if not db.set_one(
                    "ro_tasks",
                    q_filter=unchanged,
                    update_dict=db_update,
                    fail_on_empty=False,
                ):
                    conflict = True

        if not conflict:
            return

    raise NsWorkerException("Exceeded {} retries".format(retries))
2543 self
.logger
.info("Starting")
2545 # step 1: get commands from queue
2547 if self
.vim_targets
:
2548 task
= self
.task_queue
.get(block
=False)
2551 self
.logger
.debug("enters in idle state")
2553 task
= self
.task_queue
.get(block
=True)
2556 if task
[0] == "terminate":
2558 elif task
[0] == "load_vim":
2559 self
.logger
.info("order to load vim {}".format(task
[1]))
2560 self
._load
_vim
(task
[1])
2561 elif task
[0] == "unload_vim":
2562 self
.logger
.info("order to unload vim {}".format(task
[1]))
2563 self
._unload
_vim
(task
[1])
2564 elif task
[0] == "reload_vim":
2565 self
._reload
_vim
(task
[1])
2566 elif task
[0] == "check_vim":
2567 self
.logger
.info("order to check vim {}".format(task
[1]))
2568 self
._check
_vim
(task
[1])
2570 except Exception as e
:
2571 if isinstance(e
, queue
.Empty
):
2574 self
.logger
.critical(
2575 "Error processing task: {}".format(e
), exc_info
=True
2578 # step 2: process pending_tasks, delete not needed tasks
2580 if self
.tasks_to_delete
:
2581 self
._process
_delete
_db
_tasks
()
2584 # Log RO tasks only when loglevel is DEBUG
2585 if self.logger.getEffectiveLevel() == logging.DEBUG:
2586 _ = self._get_db_all_tasks()
2588 ro_task
= self
._get
_db
_task
()
2590 self
._process
_pending
_tasks
(ro_task
)
2594 except Exception as e
:
2595 self
.logger
.critical(
2596 "Unexpected exception at run: " + str(e
), exc_info
=True
2599 self
.logger
.info("Finishing")