1 # -*- coding: utf-8 -*-
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
"""
This is a thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
The tasks are stored in the database, in table ro_tasks.
A single ro_task refers to a VIM element (flavor, image, network, ...).
A ro_task can contain several 'tasks', each one with a target indicating where to store the results.
"""
27 from copy
import deepcopy
28 from http
import HTTPStatus
32 from shutil
import rmtree
36 from typing
import Dict
37 from unittest
.mock
import Mock
39 from importlib_metadata
import entry_points
40 from osm_common
.dbbase
import DbException
41 from osm_ng_ro
.vim_admin
import LockRenew
42 from osm_ro_plugin
import sdnconn
, vimconn
43 from osm_ro_plugin
.sdn_dummy
import SdnDummyConnector
44 from osm_ro_plugin
.vim_dummy
import VimDummyConnector
48 __author__
= "Alfonso Tierno"
49 __date__
= "$28-Sep-2017 12:07:15$"
def deep_get(target_dict, *args, **kwargs):
    """
    Get a value from target_dict by walking down the nested keys given in args.

    Example: target_dict={a: {b: 5}}; keys (a, b) return 5; both keys (a, b, c)
    and keys (f, h) return None (or the supplied default).

    :param target_dict: dictionary to be read
    :param args: sequence of keys to follow, outermost first
    :param kwargs: may only contain default=value, returned when a key is not
        present in the nested dictionary
    :return: the wanted value if it exists, None or default otherwise
    """
    for key in args:
        # Stop as soon as the current level is not a dict or lacks the key.
        if not isinstance(target_dict, dict) or key not in target_dict:
            return kwargs.get("default")

        target_dict = target_dict[key]

    return target_dict
class NsWorkerException(Exception):
    """Error raised by the VIM interaction classes of this module when a task cannot be completed."""
class FailingConnector:
    """Stand-in connector whose connector methods all fail with a fixed error.

    For every public attribute name of ``vimconn.VimConnector`` and of
    ``sdnconn.SdnConnectorBase`` the instance gets a ``Mock`` that raises the
    matching connector exception carrying ``error_msg`` when called.
    """

    def __init__(self, error_msg):
        """
        :param error_msg: text stored on the instance and carried by every raised exception
        """
        self.error_msg = error_msg

        # NOTE(review): the guard line is not visible in the extracted source.
        # Private/dunder names are skipped because assigning a Mock over
        # attributes such as __class__ would fail - confirm against upstream.
        for method in dir(vimconn.VimConnector):
            if not method.startswith("_"):
                setattr(
                    self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
                )

        # The SDN loop runs second, so a name shared by both base classes
        # ends up raising SdnConnectorError.
        for method in dir(sdnconn.SdnConnectorBase):
            if not method.startswith("_"):
                setattr(
                    self,
                    method,
                    Mock(side_effect=sdnconn.SdnConnectorError(error_msg)),
                )
class NsWorkerExceptionNotFound(NsWorkerException):
    """Raised when a requested VIM element (e.g. a network or an image) cannot be found."""
class VimInteractionBase:
    """Base class to call VIM/SDN for creating, deleting and refreshing networks, VMs, flavors, ...
    Its methods do nothing against the VIM and report success."""

    def __init__(self, db, my_vims, db_vims, logger):
        """
        :param db: database instance (subclasses call e.g. db.decrypt / db.get_list)
        :param my_vims: dict indexed by target_id with the connector to use
        :param db_vims: dict indexed by target_id with the VIM database content
        :param logger: logger used by the subclasses
        """
        self.db = db
        self.logger = logger
        self.my_vims = my_vims
        self.db_vims = db_vims

    # NOTE(review): the return statements of new/refresh/delete are not visible
    # in the extracted source. They are reconstructed from the docstrings and
    # from the (status, update-dict) pairs returned by the subclasses - confirm.

    def new(self, ro_task, task_index, task_depends):
        """Skip calling the VIM; report the item as being built with nothing to update."""
        return "BUILD", {}

    def refresh(self, ro_task):
        """skip calling VIM to get image, flavor status. Assumes ok"""
        if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
            return "FAILED", {}

        return "DONE", {}

    def delete(self, ro_task, task_index):
        """skip calling VIM to delete image. Assumes ok"""
        return "DONE", {}

    def exec(self, ro_task, task_index, task_depends):
        """Nothing to execute; returns a (status, vim item update, task update) triple."""
        return "DONE", None, None
121 class VimInteractionNet(VimInteractionBase
):
122 def new(self
, ro_task
, task_index
, task_depends
):
124 task
= ro_task
["tasks"][task_index
]
125 task_id
= task
["task_id"]
128 target_vim
= self
.my_vims
[ro_task
["target_id"]]
130 mgmtnet_defined_in_vim
= False
134 if task
.get("find_params"):
135 # if management, get configuration of VIM
136 if task
["find_params"].get("filter_dict"):
137 vim_filter
= task
["find_params"]["filter_dict"]
139 elif task
["find_params"].get("mgmt"):
142 self
.db_vims
[ro_task
["target_id"]],
144 "management_network_id",
146 mgmtnet_defined_in_vim
= True
148 "id": self
.db_vims
[ro_task
["target_id"]]["config"][
149 "management_network_id"
153 self
.db_vims
[ro_task
["target_id"]],
155 "management_network_name",
157 mgmtnet_defined_in_vim
= True
159 "name": self
.db_vims
[ro_task
["target_id"]]["config"][
160 "management_network_name"
164 vim_filter
= {"name": task
["find_params"]["name"]}
166 raise NsWorkerExceptionNotFound(
167 "Invalid find_params for new_net {}".format(task
["find_params"])
170 vim_nets
= target_vim
.get_network_list(vim_filter
)
171 if not vim_nets
and not task
.get("params"):
172 # If there is mgmt-network in the descriptor,
173 # there is no mapping of that network to a VIM network in the descriptor,
174 # also there is no mapping in the "--config" parameter or at VIM creation;
175 # that mgmt-network will be created.
176 if mgmtnet
and not mgmtnet_defined_in_vim
:
178 vim_filter
.get("name")
179 if vim_filter
.get("name")
180 else vim_filter
.get("id")[:16]
182 vim_net_id
, created_items
= target_vim
.new_network(
186 "Created mgmt network vim_net_id: {}".format(vim_net_id
)
190 raise NsWorkerExceptionNotFound(
191 "Network not found with this criteria: '{}'".format(
192 task
.get("find_params")
195 elif len(vim_nets
) > 1:
196 raise NsWorkerException(
197 "More than one network found with this criteria: '{}'".format(
203 vim_net_id
= vim_nets
[0]["id"]
206 params
= task
["params"]
207 vim_net_id
, created_items
= target_vim
.new_network(**params
)
210 ro_vim_item_update
= {
211 "vim_id": vim_net_id
,
212 "vim_status": "BUILD",
214 "created_items": created_items
,
218 "task={} {} new-net={} created={}".format(
219 task_id
, ro_task
["target_id"], vim_net_id
, created
223 return "BUILD", ro_vim_item_update
224 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
226 "task={} vim={} new-net: {}".format(task_id
, ro_task
["target_id"], e
)
228 ro_vim_item_update
= {
229 "vim_status": "VIM_ERROR",
231 "vim_details": str(e
),
234 return "FAILED", ro_vim_item_update
236 def refresh(self
, ro_task
):
237 """Call VIM to get network status"""
238 ro_task_id
= ro_task
["_id"]
239 target_vim
= self
.my_vims
[ro_task
["target_id"]]
240 vim_id
= ro_task
["vim_info"]["vim_id"]
241 net_to_refresh_list
= [vim_id
]
244 vim_dict
= target_vim
.refresh_nets_status(net_to_refresh_list
)
245 vim_info
= vim_dict
[vim_id
]
247 if vim_info
["status"] == "ACTIVE":
249 elif vim_info
["status"] == "BUILD":
250 task_status
= "BUILD"
252 task_status
= "FAILED"
253 except vimconn
.VimConnException
as e
:
254 # Mark all tasks at VIM_ERROR status
256 "ro_task={} vim={} get-net={}: {}".format(
257 ro_task_id
, ro_task
["target_id"], vim_id
, e
260 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
261 task_status
= "FAILED"
263 ro_vim_item_update
= {}
264 if ro_task
["vim_info"]["vim_status"] != vim_info
["status"]:
265 ro_vim_item_update
["vim_status"] = vim_info
["status"]
267 if ro_task
["vim_info"]["vim_name"] != vim_info
.get("name"):
268 ro_vim_item_update
["vim_name"] = vim_info
.get("name")
270 if vim_info
["status"] in ("ERROR", "VIM_ERROR"):
271 if ro_task
["vim_info"]["vim_details"] != vim_info
.get("error_msg"):
272 ro_vim_item_update
["vim_details"] = vim_info
.get("error_msg")
273 elif vim_info
["status"] == "DELETED":
274 ro_vim_item_update
["vim_id"] = None
275 ro_vim_item_update
["vim_details"] = "Deleted externally"
277 if ro_task
["vim_info"]["vim_details"] != vim_info
["vim_info"]:
278 ro_vim_item_update
["vim_details"] = vim_info
["vim_info"]
280 if ro_vim_item_update
:
282 "ro_task={} {} get-net={}: status={} {}".format(
284 ro_task
["target_id"],
286 ro_vim_item_update
.get("vim_status"),
287 ro_vim_item_update
.get("vim_details")
288 if ro_vim_item_update
.get("vim_status") != "ACTIVE"
293 return task_status
, ro_vim_item_update
295 def delete(self
, ro_task
, task_index
):
296 task
= ro_task
["tasks"][task_index
]
297 task_id
= task
["task_id"]
298 net_vim_id
= ro_task
["vim_info"]["vim_id"]
299 ro_vim_item_update_ok
= {
300 "vim_status": "DELETED",
302 "vim_details": "DELETED",
307 if net_vim_id
or ro_task
["vim_info"]["created_items"]:
308 target_vim
= self
.my_vims
[ro_task
["target_id"]]
309 target_vim
.delete_network(
310 net_vim_id
, ro_task
["vim_info"]["created_items"]
312 except vimconn
.VimConnNotFoundException
:
313 ro_vim_item_update_ok
["vim_details"] = "already deleted"
314 except vimconn
.VimConnException
as e
:
316 "ro_task={} vim={} del-net={}: {}".format(
317 ro_task
["_id"], ro_task
["target_id"], net_vim_id
, e
320 ro_vim_item_update
= {
321 "vim_status": "VIM_ERROR",
322 "vim_details": "Error while deleting: {}".format(e
),
325 return "FAILED", ro_vim_item_update
328 "task={} {} del-net={} {}".format(
330 ro_task
["target_id"],
332 ro_vim_item_update_ok
.get("vim_details", ""),
336 return "DONE", ro_vim_item_update_ok
339 class VimInteractionVdu(VimInteractionBase
):
340 max_retries_inject_ssh_key
= 20 # 20 times
341 time_retries_inject_ssh_key
= 30 # wevery 30 seconds
343 def new(self
, ro_task
, task_index
, task_depends
):
344 task
= ro_task
["tasks"][task_index
]
345 task_id
= task
["task_id"]
348 target_vim
= self
.my_vims
[ro_task
["target_id"]]
352 params
= task
["params"]
353 params_copy
= deepcopy(params
)
354 net_list
= params_copy
["net_list"]
357 # change task_id into network_id
358 if "net_id" in net
and net
["net_id"].startswith("TASK-"):
359 network_id
= task_depends
[net
["net_id"]]
362 raise NsWorkerException(
363 "Cannot create VM because depends on a network not created or found "
364 "for {}".format(net
["net_id"])
367 net
["net_id"] = network_id
369 if params_copy
["image_id"].startswith("TASK-"):
370 params_copy
["image_id"] = task_depends
[params_copy
["image_id"]]
372 if params_copy
["flavor_id"].startswith("TASK-"):
373 params_copy
["flavor_id"] = task_depends
[params_copy
["flavor_id"]]
375 affinity_group_list
= params_copy
["affinity_group_list"]
376 for affinity_group
in affinity_group_list
:
377 # change task_id into affinity_group_id
378 if "affinity_group_id" in affinity_group
and affinity_group
[
380 ].startswith("TASK-"):
381 affinity_group_id
= task_depends
[
382 affinity_group
["affinity_group_id"]
385 if not affinity_group_id
:
386 raise NsWorkerException(
387 "found for {}".format(affinity_group
["affinity_group_id"])
390 affinity_group
["affinity_group_id"] = affinity_group_id
392 vim_vm_id
, created_items
= target_vim
.new_vminstance(**params_copy
)
393 interfaces
= [iface
["vim_id"] for iface
in params_copy
["net_list"]]
395 ro_vim_item_update
= {
397 "vim_status": "BUILD",
399 "created_items": created_items
,
401 "interfaces_vim_ids": interfaces
,
405 "task={} {} new-vm={} created={}".format(
406 task_id
, ro_task
["target_id"], vim_vm_id
, created
410 return "BUILD", ro_vim_item_update
411 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
413 "task={} {} new-vm: {}".format(task_id
, ro_task
["target_id"], e
)
415 ro_vim_item_update
= {
416 "vim_status": "VIM_ERROR",
418 "vim_details": str(e
),
421 return "FAILED", ro_vim_item_update
423 def delete(self
, ro_task
, task_index
):
424 task
= ro_task
["tasks"][task_index
]
425 task_id
= task
["task_id"]
426 vm_vim_id
= ro_task
["vim_info"]["vim_id"]
427 ro_vim_item_update_ok
= {
428 "vim_status": "DELETED",
430 "vim_details": "DELETED",
435 if vm_vim_id
or ro_task
["vim_info"]["created_items"]:
436 target_vim
= self
.my_vims
[ro_task
["target_id"]]
437 target_vim
.delete_vminstance(
438 vm_vim_id
, ro_task
["vim_info"]["created_items"]
440 except vimconn
.VimConnNotFoundException
:
441 ro_vim_item_update_ok
["vim_details"] = "already deleted"
442 except vimconn
.VimConnException
as e
:
444 "ro_task={} vim={} del-vm={}: {}".format(
445 ro_task
["_id"], ro_task
["target_id"], vm_vim_id
, e
448 ro_vim_item_update
= {
449 "vim_status": "VIM_ERROR",
450 "vim_details": "Error while deleting: {}".format(e
),
453 return "FAILED", ro_vim_item_update
456 "task={} {} del-vm={} {}".format(
458 ro_task
["target_id"],
460 ro_vim_item_update_ok
.get("vim_details", ""),
464 return "DONE", ro_vim_item_update_ok
466 def refresh(self
, ro_task
):
467 """Call VIM to get vm status"""
468 ro_task_id
= ro_task
["_id"]
469 target_vim
= self
.my_vims
[ro_task
["target_id"]]
470 vim_id
= ro_task
["vim_info"]["vim_id"]
475 vm_to_refresh_list
= [vim_id
]
477 vim_dict
= target_vim
.refresh_vms_status(vm_to_refresh_list
)
478 vim_info
= vim_dict
[vim_id
]
480 if vim_info
["status"] == "ACTIVE":
482 elif vim_info
["status"] == "BUILD":
483 task_status
= "BUILD"
485 task_status
= "FAILED"
487 # try to load and parse vim_information
489 vim_info_info
= yaml
.safe_load(vim_info
["vim_info"])
490 if vim_info_info
.get("name"):
491 vim_info
["name"] = vim_info_info
["name"]
492 except Exception as vim_info_error
:
493 self
.logger
.exception(
494 f
"{vim_info_error} occured while getting the vim_info from yaml"
496 except vimconn
.VimConnException
as e
:
497 # Mark all tasks at VIM_ERROR status
499 "ro_task={} vim={} get-vm={}: {}".format(
500 ro_task_id
, ro_task
["target_id"], vim_id
, e
503 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
504 task_status
= "FAILED"
506 ro_vim_item_update
= {}
508 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
510 if vim_info
.get("interfaces"):
511 for vim_iface_id
in ro_task
["vim_info"]["interfaces_vim_ids"]:
515 for iface
in vim_info
["interfaces"]
516 if vim_iface_id
== iface
["vim_interface_id"]
521 # iface.pop("vim_info", None)
522 vim_interfaces
.append(iface
)
526 for t
in ro_task
["tasks"]
527 if t
and t
["action"] == "CREATE" and t
["status"] != "FINISHED"
529 if vim_interfaces
and task_create
.get("mgmt_vnf_interface") is not None:
530 vim_interfaces
[task_create
["mgmt_vnf_interface"]][
534 mgmt_vdu_iface
= task_create
.get(
535 "mgmt_vdu_interface", task_create
.get("mgmt_vnf_interface", 0)
538 vim_interfaces
[mgmt_vdu_iface
]["mgmt_vdu_interface"] = True
540 if ro_task
["vim_info"]["interfaces"] != vim_interfaces
:
541 ro_vim_item_update
["interfaces"] = vim_interfaces
543 if ro_task
["vim_info"]["vim_status"] != vim_info
["status"]:
544 ro_vim_item_update
["vim_status"] = vim_info
["status"]
546 if ro_task
["vim_info"]["vim_name"] != vim_info
.get("name"):
547 ro_vim_item_update
["vim_name"] = vim_info
.get("name")
549 if vim_info
["status"] in ("ERROR", "VIM_ERROR"):
550 if ro_task
["vim_info"]["vim_details"] != vim_info
.get("error_msg"):
551 ro_vim_item_update
["vim_details"] = vim_info
.get("error_msg")
552 elif vim_info
["status"] == "DELETED":
553 ro_vim_item_update
["vim_id"] = None
554 ro_vim_item_update
["vim_details"] = "Deleted externally"
556 if ro_task
["vim_info"]["vim_details"] != vim_info
["vim_info"]:
557 ro_vim_item_update
["vim_details"] = vim_info
["vim_info"]
559 if ro_vim_item_update
:
561 "ro_task={} {} get-vm={}: status={} {}".format(
563 ro_task
["target_id"],
565 ro_vim_item_update
.get("vim_status"),
566 ro_vim_item_update
.get("vim_details")
567 if ro_vim_item_update
.get("vim_status") != "ACTIVE"
572 return task_status
, ro_vim_item_update
574 def exec(self
, ro_task
, task_index
, task_depends
):
575 task
= ro_task
["tasks"][task_index
]
576 task_id
= task
["task_id"]
577 target_vim
= self
.my_vims
[ro_task
["target_id"]]
578 db_task_update
= {"retries": 0}
579 retries
= task
.get("retries", 0)
582 params
= task
["params"]
583 params_copy
= deepcopy(params
)
584 params_copy
["ro_key"] = self
.db
.decrypt(
585 params_copy
.pop("private_key"),
586 params_copy
.pop("schema_version"),
587 params_copy
.pop("salt"),
589 params_copy
["ip_addr"] = params_copy
.pop("ip_address")
590 target_vim
.inject_user_key(**params_copy
)
592 "task={} {} action-vm=inject_key".format(task_id
, ro_task
["target_id"])
599 ) # params_copy["key"]
600 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
603 self
.logger
.debug(traceback
.format_exc())
604 if retries
< self
.max_retries_inject_ssh_key
:
610 "next_retry": self
.time_retries_inject_ssh_key
,
615 "task={} {} inject-ssh-key: {}".format(task_id
, ro_task
["target_id"], e
)
617 ro_vim_item_update
= {"vim_details": str(e
)}
619 return "FAILED", ro_vim_item_update
, db_task_update
622 class VimInteractionImage(VimInteractionBase
):
623 def new(self
, ro_task
, task_index
, task_depends
):
624 task
= ro_task
["tasks"][task_index
]
625 task_id
= task
["task_id"]
628 target_vim
= self
.my_vims
[ro_task
["target_id"]]
632 if task
.get("find_params"):
633 vim_images
= target_vim
.get_image_list(**task
["find_params"])
636 raise NsWorkerExceptionNotFound(
637 "Image not found with this criteria: '{}'".format(
641 elif len(vim_images
) > 1:
642 raise NsWorkerException(
643 "More than one image found with this criteria: '{}'".format(
648 vim_image_id
= vim_images
[0]["id"]
650 ro_vim_item_update
= {
651 "vim_id": vim_image_id
,
652 "vim_status": "DONE",
654 "created_items": created_items
,
658 "task={} {} new-image={} created={}".format(
659 task_id
, ro_task
["target_id"], vim_image_id
, created
663 return "DONE", ro_vim_item_update
664 except (NsWorkerException
, vimconn
.VimConnException
) as e
:
666 "task={} {} new-image: {}".format(task_id
, ro_task
["target_id"], e
)
668 ro_vim_item_update
= {
669 "vim_status": "VIM_ERROR",
671 "vim_details": str(e
),
674 return "FAILED", ro_vim_item_update
677 class VimInteractionFlavor(VimInteractionBase
):
678 def delete(self
, ro_task
, task_index
):
679 task
= ro_task
["tasks"][task_index
]
680 task_id
= task
["task_id"]
681 flavor_vim_id
= ro_task
["vim_info"]["vim_id"]
682 ro_vim_item_update_ok
= {
683 "vim_status": "DELETED",
685 "vim_details": "DELETED",
691 target_vim
= self
.my_vims
[ro_task
["target_id"]]
692 target_vim
.delete_flavor(flavor_vim_id
)
693 except vimconn
.VimConnNotFoundException
:
694 ro_vim_item_update_ok
["vim_details"] = "already deleted"
695 except vimconn
.VimConnException
as e
:
697 "ro_task={} vim={} del-flavor={}: {}".format(
698 ro_task
["_id"], ro_task
["target_id"], flavor_vim_id
, e
701 ro_vim_item_update
= {
702 "vim_status": "VIM_ERROR",
703 "vim_details": "Error while deleting: {}".format(e
),
706 return "FAILED", ro_vim_item_update
709 "task={} {} del-flavor={} {}".format(
711 ro_task
["target_id"],
713 ro_vim_item_update_ok
.get("vim_details", ""),
717 return "DONE", ro_vim_item_update_ok
719 def new(self
, ro_task
, task_index
, task_depends
):
720 task
= ro_task
["tasks"][task_index
]
721 task_id
= task
["task_id"]
724 target_vim
= self
.my_vims
[ro_task
["target_id"]]
730 if task
.get("find_params"):
732 flavor_data
= task
["find_params"]["flavor_data"]
733 vim_flavor_id
= target_vim
.get_flavor_id_from_data(flavor_data
)
734 except vimconn
.VimConnNotFoundException
:
735 self
.logger
.exception("VimConnNotFoundException occured.")
737 if not vim_flavor_id
and task
.get("params"):
739 flavor_data
= task
["params"]["flavor_data"]
740 vim_flavor_id
= target_vim
.new_flavor(flavor_data
)
743 ro_vim_item_update
= {
744 "vim_id": vim_flavor_id
,
745 "vim_status": "DONE",
747 "created_items": created_items
,
751 "task={} {} new-flavor={} created={}".format(
752 task_id
, ro_task
["target_id"], vim_flavor_id
, created
756 return "DONE", ro_vim_item_update
757 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
759 "task={} vim={} new-flavor: {}".format(task_id
, ro_task
["target_id"], e
)
761 ro_vim_item_update
= {
762 "vim_status": "VIM_ERROR",
764 "vim_details": str(e
),
767 return "FAILED", ro_vim_item_update
770 class VimInteractionAffinityGroup(VimInteractionBase
):
771 def delete(self
, ro_task
, task_index
):
772 task
= ro_task
["tasks"][task_index
]
773 task_id
= task
["task_id"]
774 affinity_group_vim_id
= ro_task
["vim_info"]["vim_id"]
775 ro_vim_item_update_ok
= {
776 "vim_status": "DELETED",
778 "vim_details": "DELETED",
783 if affinity_group_vim_id
:
784 target_vim
= self
.my_vims
[ro_task
["target_id"]]
785 target_vim
.delete_affinity_group(affinity_group_vim_id
)
786 except vimconn
.VimConnNotFoundException
:
787 ro_vim_item_update_ok
["vim_details"] = "already deleted"
788 except vimconn
.VimConnException
as e
:
790 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
791 ro_task
["_id"], ro_task
["target_id"], affinity_group_vim_id
, e
794 ro_vim_item_update
= {
795 "vim_status": "VIM_ERROR",
796 "vim_details": "Error while deleting: {}".format(e
),
799 return "FAILED", ro_vim_item_update
802 "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
804 ro_task
["target_id"],
805 affinity_group_vim_id
,
806 ro_vim_item_update_ok
.get("vim_details", ""),
810 return "DONE", ro_vim_item_update_ok
812 def new(self
, ro_task
, task_index
, task_depends
):
813 task
= ro_task
["tasks"][task_index
]
814 task_id
= task
["task_id"]
817 target_vim
= self
.my_vims
[ro_task
["target_id"]]
820 affinity_group_vim_id
= None
821 affinity_group_data
= None
823 if task
.get("params"):
824 affinity_group_data
= task
["params"].get("affinity_group_data")
826 if affinity_group_data
and affinity_group_data
.get("vim-affinity-group-id"):
828 param_affinity_group_id
= task
["params"]["affinity_group_data"].get(
829 "vim-affinity-group-id"
831 affinity_group_vim_id
= target_vim
.get_affinity_group(
832 param_affinity_group_id
834 except vimconn
.VimConnNotFoundException
:
836 "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
837 "could not be found at VIM. Creating a new one.".format(
838 task_id
, ro_task
["target_id"], param_affinity_group_id
842 if not affinity_group_vim_id
and affinity_group_data
:
843 affinity_group_vim_id
= target_vim
.new_affinity_group(
848 ro_vim_item_update
= {
849 "vim_id": affinity_group_vim_id
,
850 "vim_status": "DONE",
852 "created_items": created_items
,
856 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
857 task_id
, ro_task
["target_id"], affinity_group_vim_id
, created
861 return "DONE", ro_vim_item_update
862 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
864 "task={} vim={} new-affinity-or-anti-affinity-group:"
865 " {}".format(task_id
, ro_task
["target_id"], e
)
867 ro_vim_item_update
= {
868 "vim_status": "VIM_ERROR",
870 "vim_details": str(e
),
873 return "FAILED", ro_vim_item_update
876 class VimInteractionSdnNet(VimInteractionBase
):
878 def _match_pci(port_pci
, mapping
):
880 Check if port_pci matches with mapping
881 mapping can have brackets to indicate that several chars are accepted. e.g
882 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
883 :param port_pci: text
884 :param mapping: text, can contain brackets to indicate several chars are available
885 :return: True if matches, False otherwise
887 if not port_pci
or not mapping
:
889 if port_pci
== mapping
:
895 bracket_start
= mapping
.find("[", mapping_index
)
897 if bracket_start
== -1:
900 bracket_end
= mapping
.find("]", bracket_start
)
901 if bracket_end
== -1:
904 length
= bracket_start
- mapping_index
907 and port_pci
[pci_index
: pci_index
+ length
]
908 != mapping
[mapping_index
:bracket_start
]
913 port_pci
[pci_index
+ length
]
914 not in mapping
[bracket_start
+ 1 : bracket_end
]
918 pci_index
+= length
+ 1
919 mapping_index
= bracket_end
+ 1
921 if port_pci
[pci_index
:] != mapping
[mapping_index
:]:
926 def _get_interfaces(self
, vlds_to_connect
, vim_account_id
):
928 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
929 :param vim_account_id:
934 for vld
in vlds_to_connect
:
935 table
, _
, db_id
= vld
.partition(":")
936 db_id
, _
, vld
= db_id
.partition(":")
937 _
, _
, vld_id
= vld
.partition(".")
940 q_filter
= {"vim-account-id": vim_account_id
, "_id": db_id
}
941 iface_key
= "vnf-vld-id"
942 else: # table == "nsrs"
943 q_filter
= {"vim-account-id": vim_account_id
, "nsr-id-ref": db_id
}
944 iface_key
= "ns-vld-id"
946 db_vnfrs
= self
.db
.get_list("vnfrs", q_filter
=q_filter
)
948 for db_vnfr
in db_vnfrs
:
949 for vdu_index
, vdur
in enumerate(db_vnfr
.get("vdur", ())):
950 for iface_index
, interface
in enumerate(vdur
["interfaces"]):
951 if interface
.get(iface_key
) == vld_id
and interface
.get(
953 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
955 interface_
= interface
.copy()
956 interface_
["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
957 db_vnfr
["_id"], vdu_index
, iface_index
960 if vdur
.get("status") == "ERROR":
961 interface_
["status"] = "ERROR"
963 interfaces
.append(interface_
)
967 def refresh(self
, ro_task
):
968 # look for task create
969 task_create_index
, _
= next(
971 for i_t
in enumerate(ro_task
["tasks"])
973 and i_t
[1]["action"] == "CREATE"
974 and i_t
[1]["status"] != "FINISHED"
977 return self
.new(ro_task
, task_create_index
, None)
979 def new(self
, ro_task
, task_index
, task_depends
):
981 task
= ro_task
["tasks"][task_index
]
982 task_id
= task
["task_id"]
983 target_vim
= self
.my_vims
[ro_task
["target_id"]]
985 sdn_net_id
= ro_task
["vim_info"]["vim_id"]
987 created_items
= ro_task
["vim_info"].get("created_items")
988 connected_ports
= ro_task
["vim_info"].get("connected_ports", [])
989 new_connected_ports
= []
990 last_update
= ro_task
["vim_info"].get("last_update", 0)
991 sdn_status
= ro_task
["vim_info"].get("vim_status", "BUILD") or "BUILD"
993 created
= ro_task
["vim_info"].get("created", False)
997 params
= task
["params"]
998 vlds_to_connect
= params
.get("vlds", [])
999 associated_vim
= params
.get("target_vim")
1000 # external additional ports
1001 additional_ports
= params
.get("sdn-ports") or ()
1002 _
, _
, vim_account_id
= (
1004 if associated_vim
is None
1005 else associated_vim
.partition(":")
1009 # get associated VIM
1010 if associated_vim
not in self
.db_vims
:
1011 self
.db_vims
[associated_vim
] = self
.db
.get_one(
1012 "vim_accounts", {"_id": vim_account_id
}
1015 db_vim
= self
.db_vims
[associated_vim
]
1017 # look for ports to connect
1018 ports
= self
._get
_interfaces
(vlds_to_connect
, vim_account_id
)
1022 pending_ports
= error_ports
= 0
1024 sdn_need_update
= False
1027 vlan_used
= port
.get("vlan") or vlan_used
1029 # TODO. Do not connect if already done
1030 if not port
.get("compute_node") or not port
.get("pci"):
1031 if port
.get("status") == "ERROR":
1038 compute_node_mappings
= next(
1041 for c
in db_vim
["config"].get("sdn-port-mapping", ())
1042 if c
and c
["compute_node"] == port
["compute_node"]
1047 if compute_node_mappings
:
1048 # process port_mapping pci of type 0000:af:1[01].[1357]
1052 for p
in compute_node_mappings
["ports"]
1053 if self
._match
_pci
(port
["pci"], p
.get("pci"))
1059 if not db_vim
["config"].get("mapping_not_needed"):
1061 "Port mapping not found for compute_node={} pci={}".format(
1062 port
["compute_node"], port
["pci"]
1069 service_endpoint_id
= "{}:{}".format(port
["compute_node"], port
["pci"])
1071 "service_endpoint_id": pmap
.get("service_endpoint_id")
1072 or service_endpoint_id
,
1073 "service_endpoint_encapsulation_type": "dot1q"
1074 if port
["type"] == "SR-IOV"
1076 "service_endpoint_encapsulation_info": {
1077 "vlan": port
.get("vlan"),
1078 "mac": port
.get("mac-address"),
1079 "device_id": pmap
.get("device_id") or port
["compute_node"],
1080 "device_interface_id": pmap
.get("device_interface_id")
1082 "switch_dpid": pmap
.get("switch_id") or pmap
.get("switch_dpid"),
1083 "switch_port": pmap
.get("switch_port"),
1084 "service_mapping_info": pmap
.get("service_mapping_info"),
1089 # if port["modified_at"] > last_update:
1090 # sdn_need_update = True
1091 new_connected_ports
.append(port
["id"]) # TODO
1092 sdn_ports
.append(new_port
)
1096 "{} interfaces have not been created as VDU is on ERROR status".format(
1101 # connect external ports
1102 for index
, additional_port
in enumerate(additional_ports
):
1103 additional_port_id
= additional_port
.get(
1104 "service_endpoint_id"
1105 ) or "external-{}".format(index
)
1108 "service_endpoint_id": additional_port_id
,
1109 "service_endpoint_encapsulation_type": additional_port
.get(
1110 "service_endpoint_encapsulation_type", "dot1q"
1112 "service_endpoint_encapsulation_info": {
1113 "vlan": additional_port
.get("vlan") or vlan_used
,
1114 "mac": additional_port
.get("mac_address"),
1115 "device_id": additional_port
.get("device_id"),
1116 "device_interface_id": additional_port
.get(
1117 "device_interface_id"
1119 "switch_dpid": additional_port
.get("switch_dpid")
1120 or additional_port
.get("switch_id"),
1121 "switch_port": additional_port
.get("switch_port"),
1122 "service_mapping_info": additional_port
.get(
1123 "service_mapping_info"
1128 new_connected_ports
.append(additional_port_id
)
1131 # if there are more ports to connect or they have been modified, call create/update
1133 sdn_status
= "ERROR"
1134 sdn_info
= "; ".join(error_list
)
1135 elif set(connected_ports
) != set(new_connected_ports
) or sdn_need_update
:
1136 last_update
= time
.time()
1139 if len(sdn_ports
) < 2:
1140 sdn_status
= "ACTIVE"
1142 if not pending_ports
:
1144 "task={} {} new-sdn-net done, less than 2 ports".format(
1145 task_id
, ro_task
["target_id"]
1149 net_type
= params
.get("type") or "ELAN"
1153 ) = target_vim
.create_connectivity_service(net_type
, sdn_ports
)
1156 "task={} {} new-sdn-net={} created={}".format(
1157 task_id
, ro_task
["target_id"], sdn_net_id
, created
1161 created_items
= target_vim
.edit_connectivity_service(
1162 sdn_net_id
, conn_info
=created_items
, connection_points
=sdn_ports
1166 "task={} {} update-sdn-net={} created={}".format(
1167 task_id
, ro_task
["target_id"], sdn_net_id
, created
1171 connected_ports
= new_connected_ports
1173 wim_status_dict
= target_vim
.get_connectivity_service_status(
1174 sdn_net_id
, conn_info
=created_items
1176 sdn_status
= wim_status_dict
["sdn_status"]
1178 if wim_status_dict
.get("sdn_info"):
1179 sdn_info
= str(wim_status_dict
.get("sdn_info")) or ""
1181 if wim_status_dict
.get("error_msg"):
1182 sdn_info
= wim_status_dict
.get("error_msg") or ""
1185 if sdn_status
!= "ERROR":
1186 sdn_info
= "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1187 len(ports
) - pending_ports
, len(ports
)
1190 if sdn_status
== "ACTIVE":
1191 sdn_status
= "BUILD"
1193 ro_vim_item_update
= {
1194 "vim_id": sdn_net_id
,
1195 "vim_status": sdn_status
,
1197 "created_items": created_items
,
1198 "connected_ports": connected_ports
,
1199 "vim_details": sdn_info
,
1200 "last_update": last_update
,
1203 return sdn_status
, ro_vim_item_update
1204 except Exception as e
:
1206 "task={} vim={} new-net: {}".format(task_id
, ro_task
["target_id"], e
),
1207 exc_info
=not isinstance(
1208 e
, (sdnconn
.SdnConnectorError
, vimconn
.VimConnException
)
1211 ro_vim_item_update
= {
1212 "vim_status": "VIM_ERROR",
1214 "vim_details": str(e
),
1217 return "FAILED", ro_vim_item_update
1219 def delete(self
, ro_task
, task_index
):
1220 task
= ro_task
["tasks"][task_index
]
1221 task_id
= task
["task_id"]
1222 sdn_vim_id
= ro_task
["vim_info"].get("vim_id")
1223 ro_vim_item_update_ok
= {
1224 "vim_status": "DELETED",
1226 "vim_details": "DELETED",
1232 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1233 target_vim
.delete_connectivity_service(
1234 sdn_vim_id
, ro_task
["vim_info"].get("created_items")
1237 except Exception as e
:
1239 isinstance(e
, sdnconn
.SdnConnectorError
)
1240 and e
.http_code
== HTTPStatus
.NOT_FOUND
.value
1242 ro_vim_item_update_ok
["vim_details"] = "already deleted"
1245 "ro_task={} vim={} del-sdn-net={}: {}".format(
1246 ro_task
["_id"], ro_task
["target_id"], sdn_vim_id
, e
1248 exc_info
=not isinstance(
1249 e
, (sdnconn
.SdnConnectorError
, vimconn
.VimConnException
)
1252 ro_vim_item_update
= {
1253 "vim_status": "VIM_ERROR",
1254 "vim_details": "Error while deleting: {}".format(e
),
1257 return "FAILED", ro_vim_item_update
1260 "task={} {} del-sdn-net={} {}".format(
1262 ro_task
["target_id"],
1264 ro_vim_item_update_ok
.get("vim_details", ""),
1268 return "DONE", ro_vim_item_update_ok
class ConfigValidate:
    """Accessor for the values stored under the ``period`` section of the configuration."""

    def __init__(self, config: Dict):
        """
        :param config: configuration dictionary; must contain a "period" dictionary
        """
        self.conf = config

    # NOTE(review): only the name `queue_size` is visible in the extracted
    # source. The names of the other four accessors are reconstructed from the
    # configuration keys they read - confirm against the callers.

    @property
    def active(self):
        """Refresh period for active items; falls back to 60 when the value is not allowed."""
        # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
        refresh_active = self.conf["period"]["refresh_active"]

        if refresh_active >= 60 or refresh_active == -1:
            return refresh_active

        return 60

    @property
    def build(self):
        """Value configured as period.refresh_build."""
        return self.conf["period"]["refresh_build"]

    @property
    def image(self):
        """Value configured as period.refresh_image."""
        return self.conf["period"]["refresh_image"]

    @property
    def error(self):
        """Value configured as period.refresh_error."""
        return self.conf["period"]["refresh_error"]

    @property
    def queue_size(self):
        """Value configured as period.queue_size."""
        return self.conf["period"]["queue_size"]
1303 class NsWorker(threading
.Thread
):
1304 def __init__(self
, worker_index
, config
, plugins
, db
):
1307 :param worker_index: thread index
1308 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1309 :param plugins: global shared dict with the loaded plugins
1310 :param db: database class instance to use
1312 threading
.Thread
.__init
__(self
)
1313 self
.config
= config
1314 self
.plugins
= plugins
1315 self
.plugin_name
= "unknown"
1316 self
.logger
= logging
.getLogger("ro.worker{}".format(worker_index
))
1317 self
.worker_index
= worker_index
1318 # refresh periods for created items
1319 self
.refresh_config
= ConfigValidate(config
)
1320 self
.task_queue
= queue
.Queue(self
.refresh_config
.queue_size
)
1321 # targetvim: vimplugin class
1323 # targetvim: vim information from database
1326 self
.vim_targets
= []
1327 self
.my_id
= config
["process_id"] + ":" + str(worker_index
)
1330 "net": VimInteractionNet(self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
),
1331 "vdu": VimInteractionVdu(self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
),
1332 "image": VimInteractionImage(
1333 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1335 "flavor": VimInteractionFlavor(
1336 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1338 "sdn_net": VimInteractionSdnNet(
1339 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1341 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
1342 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1345 self
.time_last_task_processed
= None
1346 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1347 self
.tasks_to_delete
= []
1348 # it is idle when there are not vim_targets associated
1350 self
.task_locked_time
= config
["global"]["task_locked_time"]
def insert_task(self, task):
    """
    Put a task into this worker's queue without waiting for free room
    :param task: item to append to self.task_queue
    :return: None
    :raises NsWorkerException: if the queue does not accept the task
    """
    try:
        self.task_queue.put_nowait(task)
    except queue.Full:
        raise NsWorkerException("timeout inserting a task")
def terminate(self):
    """
    Queue the order that the worker main loop recognises as "terminate"
    :return: None
    :raises NsWorkerException: propagated from insert_task when the task cannot be queued
    """
    # The main loop dispatches on task[0]. The string "exit" that was queued before could never
    # match: task[0] of a plain string is its first character ("e"). Orders are sequences whose
    # first element is the command name, so send a one-element tuple.
    self.insert_task(("terminate",))
def del_task(self, task):
    """
    Mark a task as SUPERSEDED if it is still waiting to be processed
    :param task: task dictionary; its "status" entry is modified in place
    :return: True if the task was in status SCHEDULED (now SUPERSEDED), False otherwise
    """
    with self.task_lock:
        if task["status"] == "SCHEDULED":
            task["status"] = "SUPERSEDED"
            return True

        # task["status"] == "processing"
        # The lock must not be released by hand here: the with-statement releases it on exit,
        # and releasing it twice raises RuntimeError instead of returning False.
        return False
1371 def _process_vim_config(self
, target_id
, db_vim
):
1373 Process vim config, creating vim configuration files as ca_cert
1374 :param target_id: vim/sdn/wim + id
1375 :param db_vim: Vim dictionary obtained from database
1376 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1378 if not db_vim
.get("config"):
1384 if db_vim
["config"].get("ca_cert_content"):
1385 file_name
= "{}:{}".format(target_id
, self
.worker_index
)
1389 except FileExistsError
:
1390 self
.logger
.exception(
1391 "FileExistsError occured while processing vim_config."
1394 file_name
= file_name
+ "/ca_cert"
1396 with
open(file_name
, "w") as f
:
1397 f
.write(db_vim
["config"]["ca_cert_content"])
1398 del db_vim
["config"]["ca_cert_content"]
1399 db_vim
["config"]["ca_cert"] = file_name
1400 except Exception as e
:
1401 raise NsWorkerException(
1402 "Error writing to file '{}': {}".format(file_name
, e
)
1405 def _load_plugin(self
, name
, type="vim"):
1406 # type can be vim or sdn
1407 if "rovim_dummy" not in self
.plugins
:
1408 self
.plugins
["rovim_dummy"] = VimDummyConnector
1410 if "rosdn_dummy" not in self
.plugins
:
1411 self
.plugins
["rosdn_dummy"] = SdnDummyConnector
1413 if name
in self
.plugins
:
1414 return self
.plugins
[name
]
1417 for ep
in entry_points(group
="osm_ro{}.plugins".format(type), name
=name
):
1418 self
.plugins
[name
] = ep
.load()
1419 except Exception as e
:
1420 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name
, e
))
1422 if name
and name
not in self
.plugins
:
1423 raise NsWorkerException(
1424 "Plugin 'osm_{n}' has not been installed".format(n
=name
)
1427 return self
.plugins
[name
]
1429 def _unload_vim(self
, target_id
):
1431 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1432 :param target_id: Contains type:_id; where type can be 'vim', ...
1436 self
.db_vims
.pop(target_id
, None)
1437 self
.my_vims
.pop(target_id
, None)
1439 if target_id
in self
.vim_targets
:
1440 self
.vim_targets
.remove(target_id
)
1442 self
.logger
.info("Unloaded {}".format(target_id
))
1443 rmtree("{}:{}".format(target_id
, self
.worker_index
))
1444 except FileNotFoundError
:
1445 # This is raised by rmtree if folder does not exist.
1446 self
.logger
.exception("FileNotFoundError occured while unloading VIM.")
1447 except Exception as e
:
1448 self
.logger
.error("Cannot unload {}: {}".format(target_id
, e
))
1450 def _check_vim(self
, target_id
):
1452 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1453 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1456 target
, _
, _id
= target_id
.partition(":")
1462 loaded
= target_id
in self
.vim_targets
1472 step
= "Getting {} from db".format(target_id
)
1473 db_vim
= self
.db
.get_one(target_database
, {"_id": _id
})
1475 for op_index
, operation
in enumerate(
1476 db_vim
["_admin"].get("operations", ())
1478 if operation
["operationState"] != "PROCESSING":
1481 locked_at
= operation
.get("locked_at")
1483 if locked_at
is not None and locked_at
>= now
- self
.task_locked_time
:
1484 # some other thread is doing this operation
1488 op_text
= "_admin.operations.{}.".format(op_index
)
1490 if not self
.db
.set_one(
1494 op_text
+ "operationState": "PROCESSING",
1495 op_text
+ "locked_at": locked_at
,
1498 op_text
+ "locked_at": now
,
1499 "admin.current_operation": op_index
,
1501 fail_on_empty
=False,
1505 unset_dict
[op_text
+ "locked_at"] = None
1506 unset_dict
["current_operation"] = None
1507 step
= "Loading " + target_id
1508 error_text
= self
._load
_vim
(target_id
)
1511 step
= "Checking connectivity"
1514 self
.my_vims
[target_id
].check_vim_connectivity()
1516 self
.my_vims
[target_id
].check_credentials()
1518 update_dict
["_admin.operationalState"] = "ENABLED"
1519 update_dict
["_admin.detailed-status"] = ""
1520 unset_dict
[op_text
+ "detailed-status"] = None
1521 update_dict
[op_text
+ "operationState"] = "COMPLETED"
1525 except Exception as e
:
1526 error_text
= "{}: {}".format(step
, e
)
1527 self
.logger
.error("{} for {}: {}".format(step
, target_id
, e
))
1530 if update_dict
or unset_dict
:
1532 update_dict
[op_text
+ "operationState"] = "FAILED"
1533 update_dict
[op_text
+ "detailed-status"] = error_text
1534 unset_dict
.pop(op_text
+ "detailed-status", None)
1535 update_dict
["_admin.operationalState"] = "ERROR"
1536 update_dict
["_admin.detailed-status"] = error_text
1539 update_dict
[op_text
+ "statusEnteredTime"] = now
1543 q_filter
={"_id": _id
},
1544 update_dict
=update_dict
,
1546 fail_on_empty
=False,
1550 self
._unload
_vim
(target_id
)
1552 def _reload_vim(self
, target_id
):
1553 if target_id
in self
.vim_targets
:
1554 self
._load
_vim
(target_id
)
1556 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1557 # just remove it to force load again next time it is needed
1558 self
.db_vims
.pop(target_id
, None)
1560 def _load_vim(self
, target_id
):
1562 Load or reload a vim_account, sdn_controller or wim_account.
1563 Read content from database, load the plugin if not loaded.
1564 In case of error loading the plugin, it load a failing VIM_connector
1565 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1566 :param target_id: Contains type:_id; where type can be 'vim', ...
1567 :return: None if ok, descriptive text if error
1569 target
, _
, _id
= target_id
.partition(":")
1581 step
= "Getting {}={} from db".format(target
, _id
)
1582 # TODO process for wim, sdnc, ...
1583 vim
= self
.db
.get_one(target_database
, {"_id": _id
})
1585 # if deep_get(vim, "config", "sdn-controller"):
1586 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1587 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1589 step
= "Decrypting password"
1590 schema_version
= vim
.get("schema_version")
1591 self
.db
.encrypt_decrypt_fields(
1594 fields
=("password", "secret"),
1595 schema_version
=schema_version
,
1598 self
._process
_vim
_config
(target_id
, vim
)
1601 plugin_name
= "rovim_" + vim
["vim_type"]
1602 step
= "Loading plugin '{}'".format(plugin_name
)
1603 vim_module_conn
= self
._load
_plugin
(plugin_name
)
1604 step
= "Loading {}'".format(target_id
)
1605 self
.my_vims
[target_id
] = vim_module_conn(
1608 tenant_id
=vim
.get("vim_tenant_id"),
1609 tenant_name
=vim
.get("vim_tenant_name"),
1612 user
=vim
["vim_user"],
1613 passwd
=vim
["vim_password"],
1614 config
=vim
.get("config") or {},
1618 plugin_name
= "rosdn_" + (vim
.get("type") or vim
.get("wim_type"))
1619 step
= "Loading plugin '{}'".format(plugin_name
)
1620 vim_module_conn
= self
._load
_plugin
(plugin_name
, "sdn")
1621 step
= "Loading {}'".format(target_id
)
1623 wim_config
= wim
.pop("config", {}) or {}
1624 wim
["uuid"] = wim
["_id"]
1625 if "url" in wim
and "wim_url" not in wim
:
1626 wim
["wim_url"] = wim
["url"]
1627 elif "url" not in wim
and "wim_url" in wim
:
1628 wim
["url"] = wim
["wim_url"]
1631 wim_config
["dpid"] = wim
.pop("dpid")
1633 if wim
.get("switch_id"):
1634 wim_config
["switch_id"] = wim
.pop("switch_id")
1636 # wim, wim_account, config
1637 self
.my_vims
[target_id
] = vim_module_conn(wim
, wim
, wim_config
)
1638 self
.db_vims
[target_id
] = vim
1639 self
.error_status
= None
1642 "Connector loaded for {}, plugin={}".format(target_id
, plugin_name
)
1644 except Exception as e
:
1646 "Cannot load {} plugin={}: {} {}".format(
1647 target_id
, plugin_name
, step
, e
1651 self
.db_vims
[target_id
] = vim
or {}
1652 self
.db_vims
[target_id
] = FailingConnector(str(e
))
1653 error_status
= "{} Error: {}".format(step
, e
)
1657 if target_id
not in self
.vim_targets
:
1658 self
.vim_targets
.append(target_id
)
1660 def _get_db_task(self
):
1662 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1667 if not self
.time_last_task_processed
:
1668 self
.time_last_task_processed
= now
1673 # Log RO tasks only when loglevel is DEBUG
1674 if self.logger.getEffectiveLevel() == logging.DEBUG:
1681 + str(self.task_locked_time)
1683 + "time_last_task_processed="
1684 + str(self.time_last_task_processed)
1690 locked
= self
.db
.set_one(
1693 "target_id": self
.vim_targets
,
1694 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1695 "locked_at.lt": now
- self
.task_locked_time
,
1696 "to_check_at.lt": self
.time_last_task_processed
,
1697 "to_check_at.gt": -1,
1699 update_dict
={"locked_by": self
.my_id
, "locked_at": now
},
1700 fail_on_empty
=False,
1705 ro_task
= self
.db
.get_one(
1708 "target_id": self
.vim_targets
,
1709 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1715 if self
.time_last_task_processed
== now
:
1716 self
.time_last_task_processed
= None
1719 self
.time_last_task_processed
= now
1720 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1722 except DbException
as e
:
1723 self
.logger
.error("Database exception at _get_db_task: {}".format(e
))
1724 except Exception as e
:
1725 self
.logger
.critical(
1726 "Unexpected exception at _get_db_task: {}".format(e
), exc_info
=True
1731 def _get_db_all_tasks(self
):
1733 Read all content of table ro_tasks to log it
1737 # Checking the content of the BD:
1740 ro_task
= self
.db
.get_list("ro_tasks")
1742 self
._log
_ro
_task
(rt
, None, None, "TASK_WF", "GET_ALL_TASKS")
1745 except DbException
as e
:
1746 self
.logger
.error("Database exception at _get_db_all_tasks: {}".format(e
))
1747 except Exception as e
:
1748 self
.logger
.critical(
1749 "Unexpected exception at _get_db_all_tasks: {}".format(e
), exc_info
=True
1754 def _log_ro_task(self
, ro_task
, db_ro_task_update
, db_ro_task_delete
, mark
, event
):
1756 Generate a log with the following format:
1758 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1759 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1760 task_array_index;task_id;task_action;task_item;task_args
1764 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1765 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1766 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1767 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1768 'vim_details': None, 'refresh_at': None};1;SCHEDULED;
1769 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1770 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1775 if ro_task
is not None and isinstance(ro_task
, dict):
1776 for t
in ro_task
["tasks"]:
1780 line
.append(ro_task
.get("_id", ""))
1781 line
.append(str(ro_task
.get("locked_at", "")))
1782 line
.append(str(ro_task
.get("modified_at", "")))
1783 line
.append(str(ro_task
.get("created_at", "")))
1784 line
.append(str(ro_task
.get("to_check_at", "")))
1785 line
.append(str(ro_task
.get("locked_by", "")))
1786 line
.append(str(ro_task
.get("target_id", "")))
1787 line
.append(str(ro_task
.get("vim_info", {}).get("refresh_at", "")))
1788 line
.append(str(ro_task
.get("vim_info", "")))
1789 line
.append(str(ro_task
.get("tasks", "")))
1790 if isinstance(t
, dict):
1791 line
.append(str(t
.get("status", "")))
1792 line
.append(str(t
.get("action_id", "")))
1794 line
.append(str(t
.get("task_id", "")))
1795 line
.append(str(t
.get("action", "")))
1796 line
.append(str(t
.get("item", "")))
1797 line
.append(str(t
.get("find_params", "")))
1798 line
.append(str(t
.get("params", "")))
1800 line
.extend([""] * 2)
1802 line
.extend([""] * 5)
1805 self
.logger
.debug(";".join(line
))
1806 elif db_ro_task_update
is not None and isinstance(db_ro_task_update
, dict):
1809 st
= "tasks.{}.status".format(i
)
1810 if st
not in db_ro_task_update
:
1815 line
.append(db_ro_task_update
.get("_id", ""))
1816 line
.append(str(db_ro_task_update
.get("locked_at", "")))
1817 line
.append(str(db_ro_task_update
.get("modified_at", "")))
1819 line
.append(str(db_ro_task_update
.get("to_check_at", "")))
1820 line
.append(str(db_ro_task_update
.get("locked_by", "")))
1822 line
.append(str(db_ro_task_update
.get("vim_info.refresh_at", "")))
1824 line
.append(str(db_ro_task_update
.get("vim_info", "")))
1825 line
.append(str(str(db_ro_task_update
).count(".status")))
1826 line
.append(db_ro_task_update
.get(st
, ""))
1829 line
.extend([""] * 3)
1831 self
.logger
.debug(";".join(line
))
1833 elif db_ro_task_delete
is not None and isinstance(db_ro_task_delete
, dict):
1837 line
.append(db_ro_task_delete
.get("_id", ""))
1839 line
.append(db_ro_task_delete
.get("modified_at", ""))
1840 line
.extend([""] * 13)
1841 self
.logger
.debug(";".join(line
))
1847 line
.extend([""] * 16)
1848 self
.logger
.debug(";".join(line
))
1850 except Exception as e
:
1851 self
.logger
.error("Error logging ro_task: {}".format(e
))
1853 def _delete_task(self
, ro_task
, task_index
, task_depends
, db_update
):
1855 Determine if this task need to be done or superseded
1858 my_task
= ro_task
["tasks"][task_index
]
1859 task_id
= my_task
["task_id"]
1860 needed_delete
= ro_task
["vim_info"]["created"] or ro_task
["vim_info"].get(
1861 "created_items", False
1864 if my_task
["status"] == "FAILED":
1865 return None, None # TODO need to be retry??
1868 for index
, task
in enumerate(ro_task
["tasks"]):
1869 if index
== task_index
or not task
:
1873 my_task
["target_record"] == task
["target_record"]
1874 and task
["action"] == "CREATE"
1877 db_update
["tasks.{}.status".format(index
)] = task
[
1880 elif task
["action"] == "CREATE" and task
["status"] not in (
1884 needed_delete
= False
1887 return self
.item2class
[my_task
["item"]].delete(ro_task
, task_index
)
1889 return "SUPERSEDED", None
1890 except Exception as e
:
1891 if not isinstance(e
, NsWorkerException
):
1892 self
.logger
.critical(
1893 "Unexpected exception at _delete_task task={}: {}".format(
1899 return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e
)}
1901 def _create_task(self
, ro_task
, task_index
, task_depends
, db_update
):
1903 Determine if this task need to create something at VIM
1906 my_task
= ro_task
["tasks"][task_index
]
1907 task_id
= my_task
["task_id"]
1910 if my_task
["status"] == "FAILED":
1911 return None, None # TODO need to be retry??
1912 elif my_task
["status"] == "SCHEDULED":
1913 # check if already created by another task
1914 for index
, task
in enumerate(ro_task
["tasks"]):
1915 if index
== task_index
or not task
:
1918 if task
["action"] == "CREATE" and task
["status"] not in (
1923 return task
["status"], "COPY_VIM_INFO"
1926 task_status
, ro_vim_item_update
= self
.item2class
[my_task
["item"]].new(
1927 ro_task
, task_index
, task_depends
1929 # TODO update other CREATE tasks
1930 except Exception as e
:
1931 if not isinstance(e
, NsWorkerException
):
1933 "Error executing task={}: {}".format(task_id
, e
), exc_info
=True
1936 task_status
= "FAILED"
1937 ro_vim_item_update
= {"vim_status": "VIM_ERROR", "vim_details": str(e
)}
1938 # TODO update ro_vim_item_update
1940 return task_status
, ro_vim_item_update
1944 def _get_dependency(self
, task_id
, ro_task
=None, target_id
=None):
1946 Look for dependency task
1947 :param task_id: Can be one of
1948 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1949 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1950 3. task.task_id: "<action_id>:number"
1953 :return: database ro_task plus index of task
1956 task_id
.startswith("vim:")
1957 or task_id
.startswith("sdn:")
1958 or task_id
.startswith("wim:")
1960 target_id
, _
, task_id
= task_id
.partition(" ")
1962 if task_id
.startswith("nsrs:") or task_id
.startswith("vnfrs:"):
1963 ro_task_dependency
= self
.db
.get_one(
1965 q_filter
={"target_id": target_id
, "tasks.target_record_id": task_id
},
1966 fail_on_empty
=False,
1969 if ro_task_dependency
:
1970 for task_index
, task
in enumerate(ro_task_dependency
["tasks"]):
1971 if task
["target_record_id"] == task_id
:
1972 return ro_task_dependency
, task_index
1976 for task_index
, task
in enumerate(ro_task
["tasks"]):
1977 if task
and task
["task_id"] == task_id
:
1978 return ro_task
, task_index
1980 ro_task_dependency
= self
.db
.get_one(
1983 "tasks.ANYINDEX.task_id": task_id
,
1984 "tasks.ANYINDEX.target_record.ne": None,
1986 fail_on_empty
=False,
1989 if ro_task_dependency
:
1990 for task_index
, task
in ro_task_dependency
["tasks"]:
1991 if task
["task_id"] == task_id
:
1992 return ro_task_dependency
, task_index
1993 raise NsWorkerException("Cannot get depending task {}".format(task_id
))
1995 def update_vm_refresh(self
):
1996 """Enables the VM status updates if self.refresh_config.active parameter
1997 is not -1 and than updates the DB accordingly
2001 self
.logger
.debug("Checking if VM status update config")
2002 next_refresh
= time
.time()
2003 if self
.refresh_config
.active
== -1:
2006 next_refresh
+= self
.refresh_config
.active
2008 if next_refresh
!= -1:
2009 db_ro_task_update
= {}
2011 next_check_at
= now
+ (24 * 60 * 60)
2012 next_check_at
= min(next_check_at
, next_refresh
)
2013 db_ro_task_update
["vim_info.refresh_at"] = next_refresh
2014 db_ro_task_update
["to_check_at"] = next_check_at
2017 "Finding tasks which to be updated to enable VM status updates"
2019 refresh_tasks
= self
.db
.get_list(
2022 "tasks.status": "DONE",
2023 "to_check_at.lt": 0,
2026 self
.logger
.debug("Updating tasks to change the to_check_at status")
2027 for task
in refresh_tasks
:
2034 update_dict
=db_ro_task_update
,
2038 except Exception as e
:
2039 self
.logger
.error(f
"Error updating tasks to enable VM status updates: {e}")
2041 def _process_pending_tasks(self
, ro_task
):
2042 ro_task_id
= ro_task
["_id"]
2045 next_check_at
= now
+ (24 * 60 * 60)
2046 db_ro_task_update
= {}
2048 def _update_refresh(new_status
):
2049 # compute next_refresh
2051 nonlocal next_check_at
2052 nonlocal db_ro_task_update
2055 next_refresh
= time
.time()
2057 if task
["item"] in ("image", "flavor"):
2058 next_refresh
+= self
.refresh_config
.image
2059 elif new_status
== "BUILD":
2060 next_refresh
+= self
.refresh_config
.build
2061 elif new_status
== "DONE":
2062 if self
.refresh_config
.active
== -1:
2065 next_refresh
+= self
.refresh_config
.active
2067 next_refresh
+= self
.refresh_config
.error
2069 next_check_at
= min(next_check_at
, next_refresh
)
2070 db_ro_task_update
["vim_info.refresh_at"] = next_refresh
2071 ro_task
["vim_info"]["refresh_at"] = next_refresh
2075 # Log RO tasks only when loglevel is DEBUG
2076 if self.logger.getEffectiveLevel() == logging.DEBUG:
2077 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2079 # Check if vim status refresh is enabled again
2080 self
.update_vm_refresh()
2081 # 0: get task_status_create
2083 task_status_create
= None
2087 for t
in ro_task
["tasks"]
2089 and t
["action"] == "CREATE"
2090 and t
["status"] in ("BUILD", "DONE")
2096 task_status_create
= task_create
["status"]
2098 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2099 for task_action
in ("DELETE", "CREATE", "EXEC"):
2100 db_vim_update
= None
2103 for task_index
, task
in enumerate(ro_task
["tasks"]):
2105 continue # task deleted
2108 target_update
= None
2112 task_action
in ("DELETE", "EXEC")
2113 and task
["status"] not in ("SCHEDULED", "BUILD")
2115 or task
["action"] != task_action
2117 task_action
== "CREATE"
2118 and task
["status"] in ("FINISHED", "SUPERSEDED")
2123 task_path
= "tasks.{}.status".format(task_index
)
2125 db_vim_info_update
= None
2127 if task
["status"] == "SCHEDULED":
2128 # check if tasks that this depends on have been completed
2129 dependency_not_completed
= False
2131 for dependency_task_id
in task
.get("depends_on") or ():
2134 dependency_task_index
,
2135 ) = self
._get
_dependency
(
2136 dependency_task_id
, target_id
=ro_task
["target_id"]
2138 dependency_task
= dependency_ro_task
["tasks"][
2139 dependency_task_index
2142 if dependency_task
["status"] == "SCHEDULED":
2143 dependency_not_completed
= True
2144 next_check_at
= min(
2145 next_check_at
, dependency_ro_task
["to_check_at"]
2147 # must allow dependent task to be processed first
2148 # to do this set time after last_task_processed
2149 next_check_at
= max(
2150 self
.time_last_task_processed
, next_check_at
2153 elif dependency_task
["status"] == "FAILED":
2154 error_text
= "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2157 dependency_task
["action"],
2158 dependency_task
["item"],
2160 dependency_ro_task
["vim_info"].get(
2165 "task={} {}".format(task
["task_id"], error_text
)
2167 raise NsWorkerException(error_text
)
2169 task_depends
[dependency_task_id
] = dependency_ro_task
[
2173 "TASK-{}".format(dependency_task_id
)
2174 ] = dependency_ro_task
["vim_info"]["vim_id"]
2176 if dependency_not_completed
:
2177 # TODO set at vim_info.vim_details that it is waiting
2180 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2181 # the task of renew this locking. It will update database locket_at periodically
2183 lock_object
= LockRenew
.add_lock_object(
2184 "ro_tasks", ro_task
, self
2187 if task
["action"] == "DELETE":
2188 (new_status
, db_vim_info_update
,) = self
._delete
_task
(
2189 ro_task
, task_index
, task_depends
, db_ro_task_update
2192 "FINISHED" if new_status
== "DONE" else new_status
2194 # ^with FINISHED instead of DONE it will not be refreshing
2196 if new_status
in ("FINISHED", "SUPERSEDED"):
2197 target_update
= "DELETE"
2198 elif task
["action"] == "EXEC":
2203 ) = self
.item2class
[task
["item"]].exec(
2204 ro_task
, task_index
, task_depends
2207 "FINISHED" if new_status
== "DONE" else new_status
2209 # ^with FINISHED instead of DONE it will not be refreshing
2212 # load into database the modified db_task_update "retries" and "next_retry"
2213 if db_task_update
.get("retries"):
2215 "tasks.{}.retries".format(task_index
)
2216 ] = db_task_update
["retries"]
2218 next_check_at
= time
.time() + db_task_update
.get(
2221 target_update
= None
2222 elif task
["action"] == "CREATE":
2223 if task
["status"] == "SCHEDULED":
2224 if task_status_create
:
2225 new_status
= task_status_create
2226 target_update
= "COPY_VIM_INFO"
2228 new_status
, db_vim_info_update
= self
.item2class
[
2230 ].new(ro_task
, task_index
, task_depends
)
2231 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2232 _update_refresh(new_status
)
2234 refresh_at
= ro_task
["vim_info"]["refresh_at"]
2235 if refresh_at
and refresh_at
!= -1 and now
> refresh_at
:
2236 new_status
, db_vim_info_update
= self
.item2class
[
2239 _update_refresh(new_status
)
2241 # The refresh is updated to avoid set the value of "refresh_at" to
2242 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2243 # because it can happen that in this case the task is never processed
2244 _update_refresh(task
["status"])
2246 except Exception as e
:
2247 new_status
= "FAILED"
2248 db_vim_info_update
= {
2249 "vim_status": "VIM_ERROR",
2250 "vim_details": str(e
),
2254 e
, (NsWorkerException
, vimconn
.VimConnException
)
2257 "Unexpected exception at _delete_task task={}: {}".format(
2264 if db_vim_info_update
:
2265 db_vim_update
= db_vim_info_update
.copy()
2266 db_ro_task_update
.update(
2269 for k
, v
in db_vim_info_update
.items()
2272 ro_task
["vim_info"].update(db_vim_info_update
)
2275 if task_action
== "CREATE":
2276 task_status_create
= new_status
2277 db_ro_task_update
[task_path
] = new_status
2279 if target_update
or db_vim_update
:
2280 if target_update
== "DELETE":
2281 self
._update
_target
(task
, None)
2282 elif target_update
== "COPY_VIM_INFO":
2283 self
._update
_target
(task
, ro_task
["vim_info"])
2285 self
._update
_target
(task
, db_vim_update
)
2287 except Exception as e
:
2289 isinstance(e
, DbException
)
2290 and e
.http_code
== HTTPStatus
.NOT_FOUND
2292 # if the vnfrs or nsrs has been removed from database, this task must be removed
2294 "marking to delete task={}".format(task
["task_id"])
2296 self
.tasks_to_delete
.append(task
)
2299 "Unexpected exception at _update_target task={}: {}".format(
2305 locked_at
= ro_task
["locked_at"]
2309 lock_object
["locked_at"],
2310 lock_object
["locked_at"] + self
.task_locked_time
,
2312 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2313 # contain exactly locked_at + self.task_locked_time
2314 LockRenew
.remove_lock_object(lock_object
)
2317 "_id": ro_task
["_id"],
2318 "to_check_at": ro_task
["to_check_at"],
2319 "locked_at": locked_at
,
2321 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2322 # outside this task (by ro_nbi) do not update it
2323 db_ro_task_update
["locked_by"] = None
2324 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2325 db_ro_task_update
["locked_at"] = int(now
- self
.task_locked_time
)
2326 db_ro_task_update
["modified_at"] = now
2327 db_ro_task_update
["to_check_at"] = next_check_at
2330 # Log RO tasks only when loglevel is DEBUG
2331 if self.logger.getEffectiveLevel() == logging.DEBUG:
2332 db_ro_task_update_log = db_ro_task_update.copy()
2333 db_ro_task_update_log["_id"] = q_filter["_id"]
2334 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2337 if not self
.db
.set_one(
2339 update_dict
=db_ro_task_update
,
2341 fail_on_empty
=False,
2343 del db_ro_task_update
["to_check_at"]
2344 del q_filter
["to_check_at"]
2346 # Log RO tasks only when loglevel is DEBUG
2347 if self.logger.getEffectiveLevel() == logging.DEBUG:
2350 db_ro_task_update_log,
2353 "SET_TASK " + str(q_filter),
2359 update_dict
=db_ro_task_update
,
2362 except DbException
as e
:
2364 "ro_task={} Error updating database {}".format(ro_task_id
, e
)
2366 except Exception as e
:
2368 "Error executing ro_task={}: {}".format(ro_task_id
, e
), exc_info
=True
2371 def _update_target(self
, task
, ro_vim_item_update
):
2372 table
, _
, temp
= task
["target_record"].partition(":")
2373 _id
, _
, path_vim_status
= temp
.partition(":")
2374 path_item
= path_vim_status
[: path_vim_status
.rfind(".")]
2375 path_item
= path_item
[: path_item
.rfind(".")]
2376 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2377 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2379 if ro_vim_item_update
:
2381 path_vim_status
+ "." + k
: v
2382 for k
, v
in ro_vim_item_update
.items()
2384 in ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces")
2387 if path_vim_status
.startswith("vdur."):
2388 # for backward compatibility, add vdur.name apart from vdur.vim_name
2389 if ro_vim_item_update
.get("vim_name"):
2390 update_dict
[path_item
+ ".name"] = ro_vim_item_update
["vim_name"]
2392 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2393 if ro_vim_item_update
.get("vim_id"):
2394 update_dict
[path_item
+ ".vim-id"] = ro_vim_item_update
["vim_id"]
2396 # update general status
2397 if ro_vim_item_update
.get("vim_status"):
2398 update_dict
[path_item
+ ".status"] = ro_vim_item_update
[
2402 if ro_vim_item_update
.get("interfaces"):
2403 path_interfaces
= path_item
+ ".interfaces"
2405 for i
, iface
in enumerate(ro_vim_item_update
.get("interfaces")):
2409 path_interfaces
+ ".{}.".format(i
) + k
: v
2410 for k
, v
in iface
.items()
2411 if k
in ("vlan", "compute_node", "pci")
2415 # put ip_address and mac_address with ip-address and mac-address
2416 if iface
.get("ip_address"):
2418 path_interfaces
+ ".{}.".format(i
) + "ip-address"
2419 ] = iface
["ip_address"]
2421 if iface
.get("mac_address"):
2423 path_interfaces
+ ".{}.".format(i
) + "mac-address"
2424 ] = iface
["mac_address"]
2426 if iface
.get("mgmt_vnf_interface") and iface
.get("ip_address"):
2427 update_dict
["ip-address"] = iface
.get("ip_address").split(
2431 if iface
.get("mgmt_vdu_interface") and iface
.get("ip_address"):
2432 update_dict
[path_item
+ ".ip-address"] = iface
.get(
2436 self
.db
.set_one(table
, q_filter
={"_id": _id
}, update_dict
=update_dict
)
2438 update_dict
= {path_item
+ ".status": "DELETED"}
2441 q_filter
={"_id": _id
},
2442 update_dict
=update_dict
,
2443 unset
={path_vim_status
: None},
2446 def _process_delete_db_tasks(self
):
2448 Delete task from database because vnfrs or nsrs or both have been deleted
2449 :return: None. Uses and modify self.tasks_to_delete
2451 while self
.tasks_to_delete
:
2452 task
= self
.tasks_to_delete
[0]
2453 vnfrs_deleted
= None
2454 nsr_id
= task
["nsr_id"]
2456 if task
["target_record"].startswith("vnfrs:"):
2457 # check if nsrs is present
2458 if self
.db
.get_one("nsrs", {"_id": nsr_id
}, fail_on_empty
=False):
2459 vnfrs_deleted
= task
["target_record"].split(":")[1]
2462 self
.delete_db_tasks(self
.db
, nsr_id
, vnfrs_deleted
)
2463 except Exception as e
:
2465 "Error deleting task={}: {}".format(task
["task_id"], e
)
2467 self
.tasks_to_delete
.pop(0)
def delete_db_tasks(db, nsr_id, vnfrs_deleted):
    """
    Remove from table "ro_tasks" the tasks that belong to a deleted nsr or vnfr.

    Static method because it is called from osm_ng_ro.ns.
    Every ro_task returned for nsr_id is inspected: affected tasks are set to
    None; a ro_task whose tasks are all affected (or already empty) is deleted
    as a whole. Writes are conditioned on the "modified_at" value that was
    read, so a ro_task changed by somebody else meanwhile is not overwritten;
    in that case the whole pass is repeated.
    :param db: instance of database to use
    :param nsr_id: affected nsrs id
    :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
    :return: None
    :raises NsWorkerException: when every retry ended with a conflicting write
    """
    # NOTE(review): the assignment of 'retries' is not visible in the reviewed
    # chunk; 5 is the assumed value - confirm against the file.
    retries = 5

    for _attempt in range(retries):
        ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
        now = time.time()
        conflict = False

        for ro_task in ro_tasks:
            db_update = {}
            removable = True

            for index, task in enumerate(ro_task["tasks"]):
                if not task:
                    # slot already emptied
                    continue

                if vnfrs_deleted:
                    affected = task["target_record"].startswith(
                        "vnfrs:" + vnfrs_deleted
                    )
                else:
                    affected = task["nsr_id"] == nsr_id

                if affected:
                    db_update["tasks.{}".format(index)] = None
                else:
                    # used by other nsr, ro_task cannot be deleted
                    removable = False

            if not removable and not db_update:
                continue

            # delete or update only if nobody has changed ro_task meanwhile;
            # modified_at is used to detect the change
            unchanged = {
                "_id": ro_task["_id"],
                "modified_at": ro_task["modified_at"],
            }
            if removable:
                applied = db.del_one(
                    "ro_tasks", q_filter=unchanged, fail_on_empty=False
                )
            else:
                db_update["modified_at"] = now
                applied = db.set_one(
                    "ro_tasks",
                    q_filter=unchanged,
                    update_dict=db_update,
                    fail_on_empty=False,
                )

            # a falsy result is taken as "somebody else modified the ro_task"
            if not applied:
                conflict = True

        if not conflict:
            return

    raise NsWorkerException("Exceeded {} retries".format(retries))
def run(self):
    """
    Main loop of the thread.

    Step 1 reads one command from self.task_queue: without blocking while
    self.vim_targets is not empty, blocking otherwise. "terminate" ends the
    loop; "load_vim", "unload_vim", "reload_vim" and "check_vim" are passed to
    the matching private method with task[1] as argument. After any command
    the loop starts again, so step 2 only runs when the queue is empty.
    Step 2 removes the tasks queued at self.tasks_to_delete and processes the
    next ro_task obtained from the database, if any.
    Exceptions of both steps are logged as critical and the loop goes on.
    :return: None
    """
    self.logger.info("Starting")

    while True:
        # step 1: get commands from queue
        try:
            if self.vim_targets:
                order = self.task_queue.get(block=False)
            else:
                # NOTE(review): the idle bookkeeping lines are not visible in
                # the reviewed chunk; reconstructed - confirm against the file.
                if not self.idle:
                    self.logger.debug("enters in idle state")
                self.idle = True
                order = self.task_queue.get(block=True)
                self.idle = False

            command = order[0]
            if command == "terminate":
                break

            if command == "load_vim":
                self.logger.info("order to load vim {}".format(order[1]))
                self._load_vim(order[1])
            elif command == "unload_vim":
                self.logger.info("order to unload vim {}".format(order[1]))
                self._unload_vim(order[1])
            elif command == "reload_vim":
                self._reload_vim(order[1])
            elif command == "check_vim":
                self.logger.info("order to check vim {}".format(order[1]))
                self._check_vim(order[1])

            # a command was consumed (known or not): look for the next one
            continue
        except queue.Empty:
            # nothing queued, go on with the pending tasks
            pass
        except Exception as exc:
            self.logger.critical(
                "Error processing task: {}".format(exc), exc_info=True
            )

        # step 2: process pending_tasks, delete not needed tasks
        try:
            if self.tasks_to_delete:
                self._process_delete_db_tasks()

            # Disabled in the original (kept there inside a bare string):
            # # Log RO tasks only when loglevel is DEBUG
            # if self.logger.getEffectiveLevel() == logging.DEBUG:
            #     _ = self._get_db_all_tasks()

            ro_task = self._get_db_task()
            if ro_task:
                self._process_pending_tasks(ro_task)
            else:
                # NOTE(review): pause when there is nothing to do; the exact
                # statement is not visible in the reviewed chunk - confirm.
                time.sleep(5)
        except Exception as exc:
            self.logger.critical(
                "Unexpected exception at run: " + str(exc), exc_info=True
            )

    self.logger.info("Finishing")