1 # -*- coding: utf-8 -*-
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
27 from copy
import deepcopy
28 from http
import HTTPStatus
32 from shutil
import rmtree
36 from typing
import Dict
37 from unittest
.mock
import Mock
39 from importlib_metadata
import entry_points
40 from osm_common
.dbbase
import DbException
41 from osm_ng_ro
.vim_admin
import LockRenew
42 from osm_ro_plugin
import sdnconn
, vimconn
43 from osm_ro_plugin
.sdn_dummy
import SdnDummyConnector
44 from osm_ro_plugin
.vim_dummy
import VimDummyConnector
48 __author__
= "Alfonso Tierno"
49 __date__
= "$28-Sep-2017 12:07:15$"
52 def deep_get(target_dict
, *args
, **kwargs
):
54 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
55 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
56 :param target_dict: dictionary to be read
57 :param args: list of keys to read from target_dict
58 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
59 :return: The wanted value if exist, None or default otherwise
62 if not isinstance(target_dict
, dict) or key
not in target_dict
:
63 return kwargs
.get("default")
64 target_dict
= target_dict
[key
]
68 class NsWorkerException(Exception):
72 class FailingConnector
:
73 def __init__(self
, error_msg
):
74 self
.error_msg
= error_msg
76 for method
in dir(vimconn
.VimConnector
):
79 self
, method
, Mock(side_effect
=vimconn
.VimConnException(error_msg
))
82 for method
in dir(sdnconn
.SdnConnectorBase
):
85 self
, method
, Mock(side_effect
=sdnconn
.SdnConnectorError(error_msg
))
89 class NsWorkerExceptionNotFound(NsWorkerException
):
93 class VimInteractionBase
:
94 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
95 It implements methods that does nothing and return ok"""
97 def __init__(self
, db
, my_vims
, db_vims
, logger
):
100 self
.my_vims
= my_vims
101 self
.db_vims
= db_vims
103 def new(self
, ro_task
, task_index
, task_depends
):
106 def refresh(self
, ro_task
):
107 """skip calling VIM to get image, flavor status. Assumes ok"""
108 if ro_task
["vim_info"]["vim_status"] == "VIM_ERROR":
113 def delete(self
, ro_task
, task_index
):
114 """skip calling VIM to delete image. Assumes ok"""
117 def exec(self
, ro_task
, task_index
, task_depends
):
118 return "DONE", None, None
121 class VimInteractionNet(VimInteractionBase
):
122 def new(self
, ro_task
, task_index
, task_depends
):
124 task
= ro_task
["tasks"][task_index
]
125 task_id
= task
["task_id"]
128 target_vim
= self
.my_vims
[ro_task
["target_id"]]
130 mgmtnet_defined_in_vim
= False
134 if task
.get("find_params"):
135 # if management, get configuration of VIM
136 if task
["find_params"].get("filter_dict"):
137 vim_filter
= task
["find_params"]["filter_dict"]
139 elif task
["find_params"].get("mgmt"):
142 self
.db_vims
[ro_task
["target_id"]],
144 "management_network_id",
146 mgmtnet_defined_in_vim
= True
148 "id": self
.db_vims
[ro_task
["target_id"]]["config"][
149 "management_network_id"
153 self
.db_vims
[ro_task
["target_id"]],
155 "management_network_name",
157 mgmtnet_defined_in_vim
= True
159 "name": self
.db_vims
[ro_task
["target_id"]]["config"][
160 "management_network_name"
164 vim_filter
= {"name": task
["find_params"]["name"]}
166 raise NsWorkerExceptionNotFound(
167 "Invalid find_params for new_net {}".format(task
["find_params"])
170 vim_nets
= target_vim
.get_network_list(vim_filter
)
171 if not vim_nets
and not task
.get("params"):
172 # If there is mgmt-network in the descriptor,
173 # there is no mapping of that network to a VIM network in the descriptor,
174 # also there is no mapping in the "--config" parameter or at VIM creation;
175 # that mgmt-network will be created.
176 if mgmtnet
and not mgmtnet_defined_in_vim
:
178 vim_filter
.get("name")
179 if vim_filter
.get("name")
180 else vim_filter
.get("id")[:16]
182 vim_net_id
, created_items
= target_vim
.new_network(
186 "Created mgmt network vim_net_id: {}".format(vim_net_id
)
190 raise NsWorkerExceptionNotFound(
191 "Network not found with this criteria: '{}'".format(
192 task
.get("find_params")
195 elif len(vim_nets
) > 1:
196 raise NsWorkerException(
197 "More than one network found with this criteria: '{}'".format(
203 vim_net_id
= vim_nets
[0]["id"]
206 params
= task
["params"]
207 vim_net_id
, created_items
= target_vim
.new_network(**params
)
210 ro_vim_item_update
= {
211 "vim_id": vim_net_id
,
212 "vim_status": "BUILD",
214 "created_items": created_items
,
218 "task={} {} new-net={} created={}".format(
219 task_id
, ro_task
["target_id"], vim_net_id
, created
223 return "BUILD", ro_vim_item_update
224 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
226 "task={} vim={} new-net: {}".format(task_id
, ro_task
["target_id"], e
)
228 ro_vim_item_update
= {
229 "vim_status": "VIM_ERROR",
231 "vim_details": str(e
),
234 return "FAILED", ro_vim_item_update
236 def refresh(self
, ro_task
):
237 """Call VIM to get network status"""
238 ro_task_id
= ro_task
["_id"]
239 target_vim
= self
.my_vims
[ro_task
["target_id"]]
240 vim_id
= ro_task
["vim_info"]["vim_id"]
241 net_to_refresh_list
= [vim_id
]
244 vim_dict
= target_vim
.refresh_nets_status(net_to_refresh_list
)
245 vim_info
= vim_dict
[vim_id
]
247 if vim_info
["status"] == "ACTIVE":
249 elif vim_info
["status"] == "BUILD":
250 task_status
= "BUILD"
252 task_status
= "FAILED"
253 except vimconn
.VimConnException
as e
:
254 # Mark all tasks at VIM_ERROR status
256 "ro_task={} vim={} get-net={}: {}".format(
257 ro_task_id
, ro_task
["target_id"], vim_id
, e
260 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
261 task_status
= "FAILED"
263 ro_vim_item_update
= {}
264 if ro_task
["vim_info"]["vim_status"] != vim_info
["status"]:
265 ro_vim_item_update
["vim_status"] = vim_info
["status"]
267 if ro_task
["vim_info"]["vim_name"] != vim_info
.get("name"):
268 ro_vim_item_update
["vim_name"] = vim_info
.get("name")
270 if vim_info
["status"] in ("ERROR", "VIM_ERROR"):
271 if ro_task
["vim_info"]["vim_details"] != vim_info
.get("error_msg"):
272 ro_vim_item_update
["vim_details"] = vim_info
.get("error_msg")
273 elif vim_info
["status"] == "DELETED":
274 ro_vim_item_update
["vim_id"] = None
275 ro_vim_item_update
["vim_details"] = "Deleted externally"
277 if ro_task
["vim_info"]["vim_details"] != vim_info
["vim_info"]:
278 ro_vim_item_update
["vim_details"] = vim_info
["vim_info"]
280 if ro_vim_item_update
:
282 "ro_task={} {} get-net={}: status={} {}".format(
284 ro_task
["target_id"],
286 ro_vim_item_update
.get("vim_status"),
287 ro_vim_item_update
.get("vim_details")
288 if ro_vim_item_update
.get("vim_status") != "ACTIVE"
293 return task_status
, ro_vim_item_update
295 def delete(self
, ro_task
, task_index
):
296 task
= ro_task
["tasks"][task_index
]
297 task_id
= task
["task_id"]
298 net_vim_id
= ro_task
["vim_info"]["vim_id"]
299 ro_vim_item_update_ok
= {
300 "vim_status": "DELETED",
302 "vim_details": "DELETED",
307 if net_vim_id
or ro_task
["vim_info"]["created_items"]:
308 target_vim
= self
.my_vims
[ro_task
["target_id"]]
309 target_vim
.delete_network(
310 net_vim_id
, ro_task
["vim_info"]["created_items"]
312 except vimconn
.VimConnNotFoundException
:
313 ro_vim_item_update_ok
["vim_details"] = "already deleted"
314 except vimconn
.VimConnException
as e
:
316 "ro_task={} vim={} del-net={}: {}".format(
317 ro_task
["_id"], ro_task
["target_id"], net_vim_id
, e
320 ro_vim_item_update
= {
321 "vim_status": "VIM_ERROR",
322 "vim_details": "Error while deleting: {}".format(e
),
325 return "FAILED", ro_vim_item_update
328 "task={} {} del-net={} {}".format(
330 ro_task
["target_id"],
332 ro_vim_item_update_ok
.get("vim_details", ""),
336 return "DONE", ro_vim_item_update_ok
339 class VimInteractionVdu(VimInteractionBase
):
340 max_retries_inject_ssh_key
= 20 # 20 times
341 time_retries_inject_ssh_key
= 30 # wevery 30 seconds
343 def new(self
, ro_task
, task_index
, task_depends
):
344 task
= ro_task
["tasks"][task_index
]
345 task_id
= task
["task_id"]
348 target_vim
= self
.my_vims
[ro_task
["target_id"]]
352 params
= task
["params"]
353 params_copy
= deepcopy(params
)
354 net_list
= params_copy
["net_list"]
357 # change task_id into network_id
358 if "net_id" in net
and net
["net_id"].startswith("TASK-"):
359 network_id
= task_depends
[net
["net_id"]]
362 raise NsWorkerException(
363 "Cannot create VM because depends on a network not created or found "
364 "for {}".format(net
["net_id"])
367 net
["net_id"] = network_id
369 if params_copy
["image_id"].startswith("TASK-"):
370 params_copy
["image_id"] = task_depends
[params_copy
["image_id"]]
372 if params_copy
["flavor_id"].startswith("TASK-"):
373 params_copy
["flavor_id"] = task_depends
[params_copy
["flavor_id"]]
375 affinity_group_list
= params_copy
["affinity_group_list"]
376 for affinity_group
in affinity_group_list
:
377 # change task_id into affinity_group_id
378 if "affinity_group_id" in affinity_group
and affinity_group
[
380 ].startswith("TASK-"):
381 affinity_group_id
= task_depends
[
382 affinity_group
["affinity_group_id"]
385 if not affinity_group_id
:
386 raise NsWorkerException(
387 "found for {}".format(affinity_group
["affinity_group_id"])
390 affinity_group
["affinity_group_id"] = affinity_group_id
392 vim_vm_id
, created_items
= target_vim
.new_vminstance(**params_copy
)
393 interfaces
= [iface
["vim_id"] for iface
in params_copy
["net_list"]]
395 ro_vim_item_update
= {
397 "vim_status": "BUILD",
399 "created_items": created_items
,
401 "interfaces_vim_ids": interfaces
,
405 "task={} {} new-vm={} created={}".format(
406 task_id
, ro_task
["target_id"], vim_vm_id
, created
410 return "BUILD", ro_vim_item_update
411 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
413 "task={} {} new-vm: {}".format(task_id
, ro_task
["target_id"], e
)
415 ro_vim_item_update
= {
416 "vim_status": "VIM_ERROR",
418 "vim_details": str(e
),
421 return "FAILED", ro_vim_item_update
423 def delete(self
, ro_task
, task_index
):
424 task
= ro_task
["tasks"][task_index
]
425 task_id
= task
["task_id"]
426 vm_vim_id
= ro_task
["vim_info"]["vim_id"]
427 ro_vim_item_update_ok
= {
428 "vim_status": "DELETED",
430 "vim_details": "DELETED",
435 if vm_vim_id
or ro_task
["vim_info"]["created_items"]:
436 target_vim
= self
.my_vims
[ro_task
["target_id"]]
437 target_vim
.delete_vminstance(
438 vm_vim_id
, ro_task
["vim_info"]["created_items"]
440 except vimconn
.VimConnNotFoundException
:
441 ro_vim_item_update_ok
["vim_details"] = "already deleted"
442 except vimconn
.VimConnException
as e
:
444 "ro_task={} vim={} del-vm={}: {}".format(
445 ro_task
["_id"], ro_task
["target_id"], vm_vim_id
, e
448 ro_vim_item_update
= {
449 "vim_status": "VIM_ERROR",
450 "vim_details": "Error while deleting: {}".format(e
),
453 return "FAILED", ro_vim_item_update
456 "task={} {} del-vm={} {}".format(
458 ro_task
["target_id"],
460 ro_vim_item_update_ok
.get("vim_details", ""),
464 return "DONE", ro_vim_item_update_ok
466 def refresh(self
, ro_task
):
467 """Call VIM to get vm status"""
468 ro_task_id
= ro_task
["_id"]
469 target_vim
= self
.my_vims
[ro_task
["target_id"]]
470 vim_id
= ro_task
["vim_info"]["vim_id"]
475 vm_to_refresh_list
= [vim_id
]
477 vim_dict
= target_vim
.refresh_vms_status(vm_to_refresh_list
)
478 vim_info
= vim_dict
[vim_id
]
480 if vim_info
["status"] == "ACTIVE":
482 elif vim_info
["status"] == "BUILD":
483 task_status
= "BUILD"
485 task_status
= "FAILED"
487 # try to load and parse vim_information
489 vim_info_info
= yaml
.safe_load(vim_info
["vim_info"])
490 if vim_info_info
.get("name"):
491 vim_info
["name"] = vim_info_info
["name"]
494 except vimconn
.VimConnException
as e
:
495 # Mark all tasks at VIM_ERROR status
497 "ro_task={} vim={} get-vm={}: {}".format(
498 ro_task_id
, ro_task
["target_id"], vim_id
, e
501 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
502 task_status
= "FAILED"
504 ro_vim_item_update
= {}
506 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
508 if vim_info
.get("interfaces"):
509 for vim_iface_id
in ro_task
["vim_info"]["interfaces_vim_ids"]:
513 for iface
in vim_info
["interfaces"]
514 if vim_iface_id
== iface
["vim_interface_id"]
519 # iface.pop("vim_info", None)
520 vim_interfaces
.append(iface
)
524 for t
in ro_task
["tasks"]
525 if t
and t
["action"] == "CREATE" and t
["status"] != "FINISHED"
527 if vim_interfaces
and task_create
.get("mgmt_vnf_interface") is not None:
528 vim_interfaces
[task_create
["mgmt_vnf_interface"]][
532 mgmt_vdu_iface
= task_create
.get(
533 "mgmt_vdu_interface", task_create
.get("mgmt_vnf_interface", 0)
536 vim_interfaces
[mgmt_vdu_iface
]["mgmt_vdu_interface"] = True
538 if ro_task
["vim_info"]["interfaces"] != vim_interfaces
:
539 ro_vim_item_update
["interfaces"] = vim_interfaces
541 if ro_task
["vim_info"]["vim_status"] != vim_info
["status"]:
542 ro_vim_item_update
["vim_status"] = vim_info
["status"]
544 if ro_task
["vim_info"]["vim_name"] != vim_info
.get("name"):
545 ro_vim_item_update
["vim_name"] = vim_info
.get("name")
547 if vim_info
["status"] in ("ERROR", "VIM_ERROR"):
548 if ro_task
["vim_info"]["vim_details"] != vim_info
.get("error_msg"):
549 ro_vim_item_update
["vim_details"] = vim_info
.get("error_msg")
550 elif vim_info
["status"] == "DELETED":
551 ro_vim_item_update
["vim_id"] = None
552 ro_vim_item_update
["vim_details"] = "Deleted externally"
554 if ro_task
["vim_info"]["vim_details"] != vim_info
["vim_info"]:
555 ro_vim_item_update
["vim_details"] = vim_info
["vim_info"]
557 if ro_vim_item_update
:
559 "ro_task={} {} get-vm={}: status={} {}".format(
561 ro_task
["target_id"],
563 ro_vim_item_update
.get("vim_status"),
564 ro_vim_item_update
.get("vim_details")
565 if ro_vim_item_update
.get("vim_status") != "ACTIVE"
570 return task_status
, ro_vim_item_update
572 def exec(self
, ro_task
, task_index
, task_depends
):
573 task
= ro_task
["tasks"][task_index
]
574 task_id
= task
["task_id"]
575 target_vim
= self
.my_vims
[ro_task
["target_id"]]
576 db_task_update
= {"retries": 0}
577 retries
= task
.get("retries", 0)
580 params
= task
["params"]
581 params_copy
= deepcopy(params
)
582 params_copy
["ro_key"] = self
.db
.decrypt(
583 params_copy
.pop("private_key"),
584 params_copy
.pop("schema_version"),
585 params_copy
.pop("salt"),
587 params_copy
["ip_addr"] = params_copy
.pop("ip_address")
588 target_vim
.inject_user_key(**params_copy
)
590 "task={} {} action-vm=inject_key".format(task_id
, ro_task
["target_id"])
597 ) # params_copy["key"]
598 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
601 self
.logger
.debug(traceback
.format_exc())
602 if retries
< self
.max_retries_inject_ssh_key
:
608 "next_retry": self
.time_retries_inject_ssh_key
,
613 "task={} {} inject-ssh-key: {}".format(task_id
, ro_task
["target_id"], e
)
615 ro_vim_item_update
= {"vim_details": str(e
)}
617 return "FAILED", ro_vim_item_update
, db_task_update
620 class VimInteractionImage(VimInteractionBase
):
621 def new(self
, ro_task
, task_index
, task_depends
):
622 task
= ro_task
["tasks"][task_index
]
623 task_id
= task
["task_id"]
626 target_vim
= self
.my_vims
[ro_task
["target_id"]]
630 if task
.get("find_params"):
631 vim_images
= target_vim
.get_image_list(**task
["find_params"])
634 raise NsWorkerExceptionNotFound(
635 "Image not found with this criteria: '{}'".format(
639 elif len(vim_images
) > 1:
640 raise NsWorkerException(
641 "More than one image found with this criteria: '{}'".format(
646 vim_image_id
= vim_images
[0]["id"]
648 ro_vim_item_update
= {
649 "vim_id": vim_image_id
,
650 "vim_status": "DONE",
652 "created_items": created_items
,
656 "task={} {} new-image={} created={}".format(
657 task_id
, ro_task
["target_id"], vim_image_id
, created
661 return "DONE", ro_vim_item_update
662 except (NsWorkerException
, vimconn
.VimConnException
) as e
:
664 "task={} {} new-image: {}".format(task_id
, ro_task
["target_id"], e
)
666 ro_vim_item_update
= {
667 "vim_status": "VIM_ERROR",
669 "vim_details": str(e
),
672 return "FAILED", ro_vim_item_update
675 class VimInteractionFlavor(VimInteractionBase
):
676 def delete(self
, ro_task
, task_index
):
677 task
= ro_task
["tasks"][task_index
]
678 task_id
= task
["task_id"]
679 flavor_vim_id
= ro_task
["vim_info"]["vim_id"]
680 ro_vim_item_update_ok
= {
681 "vim_status": "DELETED",
683 "vim_details": "DELETED",
689 target_vim
= self
.my_vims
[ro_task
["target_id"]]
690 target_vim
.delete_flavor(flavor_vim_id
)
691 except vimconn
.VimConnNotFoundException
:
692 ro_vim_item_update_ok
["vim_details"] = "already deleted"
693 except vimconn
.VimConnException
as e
:
695 "ro_task={} vim={} del-flavor={}: {}".format(
696 ro_task
["_id"], ro_task
["target_id"], flavor_vim_id
, e
699 ro_vim_item_update
= {
700 "vim_status": "VIM_ERROR",
701 "vim_details": "Error while deleting: {}".format(e
),
704 return "FAILED", ro_vim_item_update
707 "task={} {} del-flavor={} {}".format(
709 ro_task
["target_id"],
711 ro_vim_item_update_ok
.get("vim_details", ""),
715 return "DONE", ro_vim_item_update_ok
717 def new(self
, ro_task
, task_index
, task_depends
):
718 task
= ro_task
["tasks"][task_index
]
719 task_id
= task
["task_id"]
722 target_vim
= self
.my_vims
[ro_task
["target_id"]]
728 if task
.get("find_params"):
730 flavor_data
= task
["find_params"]["flavor_data"]
731 vim_flavor_id
= target_vim
.get_flavor_id_from_data(flavor_data
)
732 except vimconn
.VimConnNotFoundException
:
735 if not vim_flavor_id
and task
.get("params"):
737 flavor_data
= task
["params"]["flavor_data"]
738 vim_flavor_id
= target_vim
.new_flavor(flavor_data
)
741 ro_vim_item_update
= {
742 "vim_id": vim_flavor_id
,
743 "vim_status": "DONE",
745 "created_items": created_items
,
749 "task={} {} new-flavor={} created={}".format(
750 task_id
, ro_task
["target_id"], vim_flavor_id
, created
754 return "DONE", ro_vim_item_update
755 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
757 "task={} vim={} new-flavor: {}".format(task_id
, ro_task
["target_id"], e
)
759 ro_vim_item_update
= {
760 "vim_status": "VIM_ERROR",
762 "vim_details": str(e
),
765 return "FAILED", ro_vim_item_update
768 class VimInteractionAffinityGroup(VimInteractionBase
):
769 def delete(self
, ro_task
, task_index
):
770 task
= ro_task
["tasks"][task_index
]
771 task_id
= task
["task_id"]
772 affinity_group_vim_id
= ro_task
["vim_info"]["vim_id"]
773 ro_vim_item_update_ok
= {
774 "vim_status": "DELETED",
776 "vim_details": "DELETED",
781 if affinity_group_vim_id
:
782 target_vim
= self
.my_vims
[ro_task
["target_id"]]
783 target_vim
.delete_affinity_group(affinity_group_vim_id
)
784 except vimconn
.VimConnNotFoundException
:
785 ro_vim_item_update_ok
["vim_details"] = "already deleted"
786 except vimconn
.VimConnException
as e
:
788 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
789 ro_task
["_id"], ro_task
["target_id"], affinity_group_vim_id
, e
792 ro_vim_item_update
= {
793 "vim_status": "VIM_ERROR",
794 "vim_details": "Error while deleting: {}".format(e
),
797 return "FAILED", ro_vim_item_update
800 "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
802 ro_task
["target_id"],
803 affinity_group_vim_id
,
804 ro_vim_item_update_ok
.get("vim_details", ""),
808 return "DONE", ro_vim_item_update_ok
810 def new(self
, ro_task
, task_index
, task_depends
):
811 task
= ro_task
["tasks"][task_index
]
812 task_id
= task
["task_id"]
815 target_vim
= self
.my_vims
[ro_task
["target_id"]]
818 affinity_group_vim_id
= None
819 affinity_group_data
= None
821 if task
.get("params"):
822 affinity_group_data
= task
["params"].get("affinity_group_data")
824 if affinity_group_data
and affinity_group_data
.get("vim-affinity-group-id"):
826 param_affinity_group_id
= task
["params"]["affinity_group_data"].get(
827 "vim-affinity-group-id"
829 affinity_group_vim_id
= target_vim
.get_affinity_group(
830 param_affinity_group_id
832 except vimconn
.VimConnNotFoundException
:
834 "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
835 "could not be found at VIM. Creating a new one.".format(
836 task_id
, ro_task
["target_id"], param_affinity_group_id
840 if not affinity_group_vim_id
and affinity_group_data
:
841 affinity_group_vim_id
= target_vim
.new_affinity_group(
846 ro_vim_item_update
= {
847 "vim_id": affinity_group_vim_id
,
848 "vim_status": "DONE",
850 "created_items": created_items
,
854 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
855 task_id
, ro_task
["target_id"], affinity_group_vim_id
, created
859 return "DONE", ro_vim_item_update
860 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
862 "task={} vim={} new-affinity-or-anti-affinity-group:"
863 " {}".format(task_id
, ro_task
["target_id"], e
)
865 ro_vim_item_update
= {
866 "vim_status": "VIM_ERROR",
868 "vim_details": str(e
),
871 return "FAILED", ro_vim_item_update
874 class VimInteractionSdnNet(VimInteractionBase
):
876 def _match_pci(port_pci
, mapping
):
878 Check if port_pci matches with mapping
879 mapping can have brackets to indicate that several chars are accepted. e.g
880 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
881 :param port_pci: text
882 :param mapping: text, can contain brackets to indicate several chars are available
883 :return: True if matches, False otherwise
885 if not port_pci
or not mapping
:
887 if port_pci
== mapping
:
893 bracket_start
= mapping
.find("[", mapping_index
)
895 if bracket_start
== -1:
898 bracket_end
= mapping
.find("]", bracket_start
)
899 if bracket_end
== -1:
902 length
= bracket_start
- mapping_index
905 and port_pci
[pci_index
: pci_index
+ length
]
906 != mapping
[mapping_index
:bracket_start
]
911 port_pci
[pci_index
+ length
]
912 not in mapping
[bracket_start
+ 1 : bracket_end
]
916 pci_index
+= length
+ 1
917 mapping_index
= bracket_end
+ 1
919 if port_pci
[pci_index
:] != mapping
[mapping_index
:]:
924 def _get_interfaces(self
, vlds_to_connect
, vim_account_id
):
926 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
927 :param vim_account_id:
932 for vld
in vlds_to_connect
:
933 table
, _
, db_id
= vld
.partition(":")
934 db_id
, _
, vld
= db_id
.partition(":")
935 _
, _
, vld_id
= vld
.partition(".")
938 q_filter
= {"vim-account-id": vim_account_id
, "_id": db_id
}
939 iface_key
= "vnf-vld-id"
940 else: # table == "nsrs"
941 q_filter
= {"vim-account-id": vim_account_id
, "nsr-id-ref": db_id
}
942 iface_key
= "ns-vld-id"
944 db_vnfrs
= self
.db
.get_list("vnfrs", q_filter
=q_filter
)
946 for db_vnfr
in db_vnfrs
:
947 for vdu_index
, vdur
in enumerate(db_vnfr
.get("vdur", ())):
948 for iface_index
, interface
in enumerate(vdur
["interfaces"]):
949 if interface
.get(iface_key
) == vld_id
and interface
.get(
951 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
953 interface_
= interface
.copy()
954 interface_
["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
955 db_vnfr
["_id"], vdu_index
, iface_index
958 if vdur
.get("status") == "ERROR":
959 interface_
["status"] = "ERROR"
961 interfaces
.append(interface_
)
965 def refresh(self
, ro_task
):
966 # look for task create
967 task_create_index
, _
= next(
969 for i_t
in enumerate(ro_task
["tasks"])
971 and i_t
[1]["action"] == "CREATE"
972 and i_t
[1]["status"] != "FINISHED"
975 return self
.new(ro_task
, task_create_index
, None)
977 def new(self
, ro_task
, task_index
, task_depends
):
979 task
= ro_task
["tasks"][task_index
]
980 task_id
= task
["task_id"]
981 target_vim
= self
.my_vims
[ro_task
["target_id"]]
983 sdn_net_id
= ro_task
["vim_info"]["vim_id"]
985 created_items
= ro_task
["vim_info"].get("created_items")
986 connected_ports
= ro_task
["vim_info"].get("connected_ports", [])
987 new_connected_ports
= []
988 last_update
= ro_task
["vim_info"].get("last_update", 0)
989 sdn_status
= ro_task
["vim_info"].get("vim_status", "BUILD") or "BUILD"
991 created
= ro_task
["vim_info"].get("created", False)
995 params
= task
["params"]
996 vlds_to_connect
= params
.get("vlds", [])
997 associated_vim
= params
.get("target_vim")
998 # external additional ports
999 additional_ports
= params
.get("sdn-ports") or ()
1000 _
, _
, vim_account_id
= (
1002 if associated_vim
is None
1003 else associated_vim
.partition(":")
1007 # get associated VIM
1008 if associated_vim
not in self
.db_vims
:
1009 self
.db_vims
[associated_vim
] = self
.db
.get_one(
1010 "vim_accounts", {"_id": vim_account_id
}
1013 db_vim
= self
.db_vims
[associated_vim
]
1015 # look for ports to connect
1016 ports
= self
._get
_interfaces
(vlds_to_connect
, vim_account_id
)
1020 pending_ports
= error_ports
= 0
1022 sdn_need_update
= False
1025 vlan_used
= port
.get("vlan") or vlan_used
1027 # TODO. Do not connect if already done
1028 if not port
.get("compute_node") or not port
.get("pci"):
1029 if port
.get("status") == "ERROR":
1036 compute_node_mappings
= next(
1039 for c
in db_vim
["config"].get("sdn-port-mapping", ())
1040 if c
and c
["compute_node"] == port
["compute_node"]
1045 if compute_node_mappings
:
1046 # process port_mapping pci of type 0000:af:1[01].[1357]
1050 for p
in compute_node_mappings
["ports"]
1051 if self
._match
_pci
(port
["pci"], p
.get("pci"))
1057 if not db_vim
["config"].get("mapping_not_needed"):
1059 "Port mapping not found for compute_node={} pci={}".format(
1060 port
["compute_node"], port
["pci"]
1067 service_endpoint_id
= "{}:{}".format(port
["compute_node"], port
["pci"])
1069 "service_endpoint_id": pmap
.get("service_endpoint_id")
1070 or service_endpoint_id
,
1071 "service_endpoint_encapsulation_type": "dot1q"
1072 if port
["type"] == "SR-IOV"
1074 "service_endpoint_encapsulation_info": {
1075 "vlan": port
.get("vlan"),
1076 "mac": port
.get("mac-address"),
1077 "device_id": pmap
.get("device_id") or port
["compute_node"],
1078 "device_interface_id": pmap
.get("device_interface_id")
1080 "switch_dpid": pmap
.get("switch_id") or pmap
.get("switch_dpid"),
1081 "switch_port": pmap
.get("switch_port"),
1082 "service_mapping_info": pmap
.get("service_mapping_info"),
1087 # if port["modified_at"] > last_update:
1088 # sdn_need_update = True
1089 new_connected_ports
.append(port
["id"]) # TODO
1090 sdn_ports
.append(new_port
)
1094 "{} interfaces have not been created as VDU is on ERROR status".format(
1099 # connect external ports
1100 for index
, additional_port
in enumerate(additional_ports
):
1101 additional_port_id
= additional_port
.get(
1102 "service_endpoint_id"
1103 ) or "external-{}".format(index
)
1106 "service_endpoint_id": additional_port_id
,
1107 "service_endpoint_encapsulation_type": additional_port
.get(
1108 "service_endpoint_encapsulation_type", "dot1q"
1110 "service_endpoint_encapsulation_info": {
1111 "vlan": additional_port
.get("vlan") or vlan_used
,
1112 "mac": additional_port
.get("mac_address"),
1113 "device_id": additional_port
.get("device_id"),
1114 "device_interface_id": additional_port
.get(
1115 "device_interface_id"
1117 "switch_dpid": additional_port
.get("switch_dpid")
1118 or additional_port
.get("switch_id"),
1119 "switch_port": additional_port
.get("switch_port"),
1120 "service_mapping_info": additional_port
.get(
1121 "service_mapping_info"
1126 new_connected_ports
.append(additional_port_id
)
1129 # if there are more ports to connect or they have been modified, call create/update
1131 sdn_status
= "ERROR"
1132 sdn_info
= "; ".join(error_list
)
1133 elif set(connected_ports
) != set(new_connected_ports
) or sdn_need_update
:
1134 last_update
= time
.time()
1137 if len(sdn_ports
) < 2:
1138 sdn_status
= "ACTIVE"
1140 if not pending_ports
:
1142 "task={} {} new-sdn-net done, less than 2 ports".format(
1143 task_id
, ro_task
["target_id"]
1147 net_type
= params
.get("type") or "ELAN"
1151 ) = target_vim
.create_connectivity_service(net_type
, sdn_ports
)
1154 "task={} {} new-sdn-net={} created={}".format(
1155 task_id
, ro_task
["target_id"], sdn_net_id
, created
1159 created_items
= target_vim
.edit_connectivity_service(
1160 sdn_net_id
, conn_info
=created_items
, connection_points
=sdn_ports
1164 "task={} {} update-sdn-net={} created={}".format(
1165 task_id
, ro_task
["target_id"], sdn_net_id
, created
1169 connected_ports
= new_connected_ports
1171 wim_status_dict
= target_vim
.get_connectivity_service_status(
1172 sdn_net_id
, conn_info
=created_items
1174 sdn_status
= wim_status_dict
["sdn_status"]
1176 if wim_status_dict
.get("sdn_info"):
1177 sdn_info
= str(wim_status_dict
.get("sdn_info")) or ""
1179 if wim_status_dict
.get("error_msg"):
1180 sdn_info
= wim_status_dict
.get("error_msg") or ""
1183 if sdn_status
!= "ERROR":
1184 sdn_info
= "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1185 len(ports
) - pending_ports
, len(ports
)
1188 if sdn_status
== "ACTIVE":
1189 sdn_status
= "BUILD"
1191 ro_vim_item_update
= {
1192 "vim_id": sdn_net_id
,
1193 "vim_status": sdn_status
,
1195 "created_items": created_items
,
1196 "connected_ports": connected_ports
,
1197 "vim_details": sdn_info
,
1198 "last_update": last_update
,
1201 return sdn_status
, ro_vim_item_update
1202 except Exception as e
:
1204 "task={} vim={} new-net: {}".format(task_id
, ro_task
["target_id"], e
),
1205 exc_info
=not isinstance(
1206 e
, (sdnconn
.SdnConnectorError
, vimconn
.VimConnException
)
1209 ro_vim_item_update
= {
1210 "vim_status": "VIM_ERROR",
1212 "vim_details": str(e
),
1215 return "FAILED", ro_vim_item_update
1217 def delete(self
, ro_task
, task_index
):
1218 task
= ro_task
["tasks"][task_index
]
1219 task_id
= task
["task_id"]
1220 sdn_vim_id
= ro_task
["vim_info"].get("vim_id")
1221 ro_vim_item_update_ok
= {
1222 "vim_status": "DELETED",
1224 "vim_details": "DELETED",
1230 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1231 target_vim
.delete_connectivity_service(
1232 sdn_vim_id
, ro_task
["vim_info"].get("created_items")
1235 except Exception as e
:
1237 isinstance(e
, sdnconn
.SdnConnectorError
)
1238 and e
.http_code
== HTTPStatus
.NOT_FOUND
.value
1240 ro_vim_item_update_ok
["vim_details"] = "already deleted"
1243 "ro_task={} vim={} del-sdn-net={}: {}".format(
1244 ro_task
["_id"], ro_task
["target_id"], sdn_vim_id
, e
1246 exc_info
=not isinstance(
1247 e
, (sdnconn
.SdnConnectorError
, vimconn
.VimConnException
)
1250 ro_vim_item_update
= {
1251 "vim_status": "VIM_ERROR",
1252 "vim_details": "Error while deleting: {}".format(e
),
1255 return "FAILED", ro_vim_item_update
1258 "task={} {} del-sdn-net={} {}".format(
1260 ro_task
["target_id"],
1262 ro_vim_item_update_ok
.get("vim_details", ""),
1266 return "DONE", ro_vim_item_update_ok
1269 class ConfigValidate
:
1270 def __init__(self
, config
: Dict
):
1275 # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
1277 self
.conf
["period"]["refresh_active"] >= 60
1278 or self
.conf
["period"]["refresh_active"] == -1
1280 return self
.conf
["period"]["refresh_active"]
1286 return self
.conf
["period"]["refresh_build"]
1290 return self
.conf
["period"]["refresh_image"]
1294 return self
.conf
["period"]["refresh_error"]
1297 def queue_size(self
):
1298 return self
.conf
["period"]["queue_size"]
1301 class NsWorker(threading
.Thread
):
1302 def __init__(self
, worker_index
, config
, plugins
, db
):
1305 :param worker_index: thread index
1306 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1307 :param plugins: global shared dict with the loaded plugins
1308 :param db: database class instance to use
1310 threading
.Thread
.__init
__(self
)
1311 self
.config
= config
1312 self
.plugins
= plugins
1313 self
.plugin_name
= "unknown"
1314 self
.logger
= logging
.getLogger("ro.worker{}".format(worker_index
))
1315 self
.worker_index
= worker_index
1316 # refresh periods for created items
1317 self
.refresh_config
= ConfigValidate(config
)
1318 self
.task_queue
= queue
.Queue(self
.refresh_config
.queue_size
)
1319 # targetvim: vimplugin class
1321 # targetvim: vim information from database
1324 self
.vim_targets
= []
1325 self
.my_id
= config
["process_id"] + ":" + str(worker_index
)
1328 "net": VimInteractionNet(self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
),
1329 "vdu": VimInteractionVdu(self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
),
1330 "image": VimInteractionImage(
1331 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1333 "flavor": VimInteractionFlavor(
1334 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1336 "sdn_net": VimInteractionSdnNet(
1337 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1339 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
1340 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1343 self
.time_last_task_processed
= None
1344 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1345 self
.tasks_to_delete
= []
1346 # it is idle when there are not vim_targets associated
1348 self
.task_locked_time
= config
["global"]["task_locked_time"]
1350 def insert_task(self
, task
):
1352 self
.task_queue
.put(task
, False)
1355 raise NsWorkerException("timeout inserting a task")
1357 def terminate(self
):
1358 self
.insert_task("exit")
1360 def del_task(self
, task
):
1361 with self
.task_lock
:
1362 if task
["status"] == "SCHEDULED":
1363 task
["status"] = "SUPERSEDED"
1365 else: # task["status"] == "processing"
1366 self
.task_lock
.release()
1369 def _process_vim_config(self
, target_id
, db_vim
):
1371 Process vim config, creating vim configuration files as ca_cert
1372 :param target_id: vim/sdn/wim + id
1373 :param db_vim: Vim dictionary obtained from database
1374 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1376 if not db_vim
.get("config"):
1382 if db_vim
["config"].get("ca_cert_content"):
1383 file_name
= "{}:{}".format(target_id
, self
.worker_index
)
1387 except FileExistsError
:
1390 file_name
= file_name
+ "/ca_cert"
1392 with
open(file_name
, "w") as f
:
1393 f
.write(db_vim
["config"]["ca_cert_content"])
1394 del db_vim
["config"]["ca_cert_content"]
1395 db_vim
["config"]["ca_cert"] = file_name
1396 except Exception as e
:
1397 raise NsWorkerException(
1398 "Error writing to file '{}': {}".format(file_name
, e
)
1401 def _load_plugin(self
, name
, type="vim"):
1402 # type can be vim or sdn
1403 if "rovim_dummy" not in self
.plugins
:
1404 self
.plugins
["rovim_dummy"] = VimDummyConnector
1406 if "rosdn_dummy" not in self
.plugins
:
1407 self
.plugins
["rosdn_dummy"] = SdnDummyConnector
1409 if name
in self
.plugins
:
1410 return self
.plugins
[name
]
1413 for ep
in entry_points(group
="osm_ro{}.plugins".format(type), name
=name
):
1414 self
.plugins
[name
] = ep
.load()
1415 except Exception as e
:
1416 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name
, e
))
1418 if name
and name
not in self
.plugins
:
1419 raise NsWorkerException(
1420 "Plugin 'osm_{n}' has not been installed".format(n
=name
)
1423 return self
.plugins
[name
]
1425 def _unload_vim(self
, target_id
):
1427 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1428 :param target_id: Contains type:_id; where type can be 'vim', ...
1432 self
.db_vims
.pop(target_id
, None)
1433 self
.my_vims
.pop(target_id
, None)
1435 if target_id
in self
.vim_targets
:
1436 self
.vim_targets
.remove(target_id
)
1438 self
.logger
.info("Unloaded {}".format(target_id
))
1439 rmtree("{}:{}".format(target_id
, self
.worker_index
))
1440 except FileNotFoundError
:
1441 pass # this is raised by rmtree if folder does not exist
1442 except Exception as e
:
1443 self
.logger
.error("Cannot unload {}: {}".format(target_id
, e
))
1445 def _check_vim(self
, target_id
):
1447 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1448 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1451 target
, _
, _id
= target_id
.partition(":")
1457 loaded
= target_id
in self
.vim_targets
1467 step
= "Getting {} from db".format(target_id
)
1468 db_vim
= self
.db
.get_one(target_database
, {"_id": _id
})
1470 for op_index
, operation
in enumerate(
1471 db_vim
["_admin"].get("operations", ())
1473 if operation
["operationState"] != "PROCESSING":
1476 locked_at
= operation
.get("locked_at")
1478 if locked_at
is not None and locked_at
>= now
- self
.task_locked_time
:
1479 # some other thread is doing this operation
1483 op_text
= "_admin.operations.{}.".format(op_index
)
1485 if not self
.db
.set_one(
1489 op_text
+ "operationState": "PROCESSING",
1490 op_text
+ "locked_at": locked_at
,
1493 op_text
+ "locked_at": now
,
1494 "admin.current_operation": op_index
,
1496 fail_on_empty
=False,
1500 unset_dict
[op_text
+ "locked_at"] = None
1501 unset_dict
["current_operation"] = None
1502 step
= "Loading " + target_id
1503 error_text
= self
._load
_vim
(target_id
)
1506 step
= "Checking connectivity"
1509 self
.my_vims
[target_id
].check_vim_connectivity()
1511 self
.my_vims
[target_id
].check_credentials()
1513 update_dict
["_admin.operationalState"] = "ENABLED"
1514 update_dict
["_admin.detailed-status"] = ""
1515 unset_dict
[op_text
+ "detailed-status"] = None
1516 update_dict
[op_text
+ "operationState"] = "COMPLETED"
1520 except Exception as e
:
1521 error_text
= "{}: {}".format(step
, e
)
1522 self
.logger
.error("{} for {}: {}".format(step
, target_id
, e
))
1525 if update_dict
or unset_dict
:
1527 update_dict
[op_text
+ "operationState"] = "FAILED"
1528 update_dict
[op_text
+ "detailed-status"] = error_text
1529 unset_dict
.pop(op_text
+ "detailed-status", None)
1530 update_dict
["_admin.operationalState"] = "ERROR"
1531 update_dict
["_admin.detailed-status"] = error_text
1534 update_dict
[op_text
+ "statusEnteredTime"] = now
1538 q_filter
={"_id": _id
},
1539 update_dict
=update_dict
,
1541 fail_on_empty
=False,
1545 self
._unload
_vim
(target_id
)
1547 def _reload_vim(self
, target_id
):
1548 if target_id
in self
.vim_targets
:
1549 self
._load
_vim
(target_id
)
1551 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1552 # just remove it to force load again next time it is needed
1553 self
.db_vims
.pop(target_id
, None)
1555 def _load_vim(self
, target_id
):
1557 Load or reload a vim_account, sdn_controller or wim_account.
1558 Read content from database, load the plugin if not loaded.
1559 In case of error loading the plugin, it load a failing VIM_connector
1560 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1561 :param target_id: Contains type:_id; where type can be 'vim', ...
1562 :return: None if ok, descriptive text if error
1564 target
, _
, _id
= target_id
.partition(":")
1576 step
= "Getting {}={} from db".format(target
, _id
)
1577 # TODO process for wim, sdnc, ...
1578 vim
= self
.db
.get_one(target_database
, {"_id": _id
})
1580 # if deep_get(vim, "config", "sdn-controller"):
1581 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1582 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1584 step
= "Decrypting password"
1585 schema_version
= vim
.get("schema_version")
1586 self
.db
.encrypt_decrypt_fields(
1589 fields
=("password", "secret"),
1590 schema_version
=schema_version
,
1593 self
._process
_vim
_config
(target_id
, vim
)
1596 plugin_name
= "rovim_" + vim
["vim_type"]
1597 step
= "Loading plugin '{}'".format(plugin_name
)
1598 vim_module_conn
= self
._load
_plugin
(plugin_name
)
1599 step
= "Loading {}'".format(target_id
)
1600 self
.my_vims
[target_id
] = vim_module_conn(
1603 tenant_id
=vim
.get("vim_tenant_id"),
1604 tenant_name
=vim
.get("vim_tenant_name"),
1607 user
=vim
["vim_user"],
1608 passwd
=vim
["vim_password"],
1609 config
=vim
.get("config") or {},
1613 plugin_name
= "rosdn_" + (vim
.get("type") or vim
.get("wim_type"))
1614 step
= "Loading plugin '{}'".format(plugin_name
)
1615 vim_module_conn
= self
._load
_plugin
(plugin_name
, "sdn")
1616 step
= "Loading {}'".format(target_id
)
1618 wim_config
= wim
.pop("config", {}) or {}
1619 wim
["uuid"] = wim
["_id"]
1620 if "url" in wim
and "wim_url" not in wim
:
1621 wim
["wim_url"] = wim
["url"]
1622 elif "url" not in wim
and "wim_url" in wim
:
1623 wim
["url"] = wim
["wim_url"]
1626 wim_config
["dpid"] = wim
.pop("dpid")
1628 if wim
.get("switch_id"):
1629 wim_config
["switch_id"] = wim
.pop("switch_id")
1631 # wim, wim_account, config
1632 self
.my_vims
[target_id
] = vim_module_conn(wim
, wim
, wim_config
)
1633 self
.db_vims
[target_id
] = vim
1634 self
.error_status
= None
1637 "Connector loaded for {}, plugin={}".format(target_id
, plugin_name
)
1639 except Exception as e
:
1641 "Cannot load {} plugin={}: {} {}".format(
1642 target_id
, plugin_name
, step
, e
1646 self
.db_vims
[target_id
] = vim
or {}
1647 self
.db_vims
[target_id
] = FailingConnector(str(e
))
1648 error_status
= "{} Error: {}".format(step
, e
)
1652 if target_id
not in self
.vim_targets
:
1653 self
.vim_targets
.append(target_id
)
1655 def _get_db_task(self
):
1657 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1662 if not self
.time_last_task_processed
:
1663 self
.time_last_task_processed
= now
1668 # Log RO tasks only when loglevel is DEBUG
1669 if self.logger.getEffectiveLevel() == logging.DEBUG:
1676 + str(self.task_locked_time)
1678 + "time_last_task_processed="
1679 + str(self.time_last_task_processed)
1685 locked
= self
.db
.set_one(
1688 "target_id": self
.vim_targets
,
1689 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1690 "locked_at.lt": now
- self
.task_locked_time
,
1691 "to_check_at.lt": self
.time_last_task_processed
,
1692 "to_check_at.gt": -1,
1694 update_dict
={"locked_by": self
.my_id
, "locked_at": now
},
1695 fail_on_empty
=False,
1700 ro_task
= self
.db
.get_one(
1703 "target_id": self
.vim_targets
,
1704 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1710 if self
.time_last_task_processed
== now
:
1711 self
.time_last_task_processed
= None
1714 self
.time_last_task_processed
= now
1715 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1717 except DbException
as e
:
1718 self
.logger
.error("Database exception at _get_db_task: {}".format(e
))
1719 except Exception as e
:
1720 self
.logger
.critical(
1721 "Unexpected exception at _get_db_task: {}".format(e
), exc_info
=True
1726 def _get_db_all_tasks(self
):
1728 Read all content of table ro_tasks to log it
1732 # Checking the content of the BD:
1735 ro_task
= self
.db
.get_list("ro_tasks")
1737 self
._log
_ro
_task
(rt
, None, None, "TASK_WF", "GET_ALL_TASKS")
1740 except DbException
as e
:
1741 self
.logger
.error("Database exception at _get_db_all_tasks: {}".format(e
))
1742 except Exception as e
:
1743 self
.logger
.critical(
1744 "Unexpected exception at _get_db_all_tasks: {}".format(e
), exc_info
=True
1749 def _log_ro_task(self
, ro_task
, db_ro_task_update
, db_ro_task_delete
, mark
, event
):
1751 Generate a log with the following format:
1753 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1754 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1755 task_array_index;task_id;task_action;task_item;task_args
1759 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1760 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1761 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1762 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1763 'vim_details': None, 'refresh_at': None};1;SCHEDULED;
1764 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1765 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1770 if ro_task
is not None and isinstance(ro_task
, dict):
1771 for t
in ro_task
["tasks"]:
1775 line
.append(ro_task
.get("_id", ""))
1776 line
.append(str(ro_task
.get("locked_at", "")))
1777 line
.append(str(ro_task
.get("modified_at", "")))
1778 line
.append(str(ro_task
.get("created_at", "")))
1779 line
.append(str(ro_task
.get("to_check_at", "")))
1780 line
.append(str(ro_task
.get("locked_by", "")))
1781 line
.append(str(ro_task
.get("target_id", "")))
1782 line
.append(str(ro_task
.get("vim_info", {}).get("refresh_at", "")))
1783 line
.append(str(ro_task
.get("vim_info", "")))
1784 line
.append(str(ro_task
.get("tasks", "")))
1785 if isinstance(t
, dict):
1786 line
.append(str(t
.get("status", "")))
1787 line
.append(str(t
.get("action_id", "")))
1789 line
.append(str(t
.get("task_id", "")))
1790 line
.append(str(t
.get("action", "")))
1791 line
.append(str(t
.get("item", "")))
1792 line
.append(str(t
.get("find_params", "")))
1793 line
.append(str(t
.get("params", "")))
1795 line
.extend([""] * 2)
1797 line
.extend([""] * 5)
1800 self
.logger
.debug(";".join(line
))
1801 elif db_ro_task_update
is not None and isinstance(db_ro_task_update
, dict):
1804 st
= "tasks.{}.status".format(i
)
1805 if st
not in db_ro_task_update
:
1810 line
.append(db_ro_task_update
.get("_id", ""))
1811 line
.append(str(db_ro_task_update
.get("locked_at", "")))
1812 line
.append(str(db_ro_task_update
.get("modified_at", "")))
1814 line
.append(str(db_ro_task_update
.get("to_check_at", "")))
1815 line
.append(str(db_ro_task_update
.get("locked_by", "")))
1817 line
.append(str(db_ro_task_update
.get("vim_info.refresh_at", "")))
1819 line
.append(str(db_ro_task_update
.get("vim_info", "")))
1820 line
.append(str(str(db_ro_task_update
).count(".status")))
1821 line
.append(db_ro_task_update
.get(st
, ""))
1824 line
.extend([""] * 3)
1826 self
.logger
.debug(";".join(line
))
1828 elif db_ro_task_delete
is not None and isinstance(db_ro_task_delete
, dict):
1832 line
.append(db_ro_task_delete
.get("_id", ""))
1834 line
.append(db_ro_task_delete
.get("modified_at", ""))
1835 line
.extend([""] * 13)
1836 self
.logger
.debug(";".join(line
))
1842 line
.extend([""] * 16)
1843 self
.logger
.debug(";".join(line
))
1845 except Exception as e
:
1846 self
.logger
.error("Error logging ro_task: {}".format(e
))
1848 def _delete_task(self
, ro_task
, task_index
, task_depends
, db_update
):
1850 Determine if this task need to be done or superseded
1853 my_task
= ro_task
["tasks"][task_index
]
1854 task_id
= my_task
["task_id"]
1855 needed_delete
= ro_task
["vim_info"]["created"] or ro_task
["vim_info"].get(
1856 "created_items", False
1859 if my_task
["status"] == "FAILED":
1860 return None, None # TODO need to be retry??
1863 for index
, task
in enumerate(ro_task
["tasks"]):
1864 if index
== task_index
or not task
:
1868 my_task
["target_record"] == task
["target_record"]
1869 and task
["action"] == "CREATE"
1872 db_update
["tasks.{}.status".format(index
)] = task
[
1875 elif task
["action"] == "CREATE" and task
["status"] not in (
1879 needed_delete
= False
1882 return self
.item2class
[my_task
["item"]].delete(ro_task
, task_index
)
1884 return "SUPERSEDED", None
1885 except Exception as e
:
1886 if not isinstance(e
, NsWorkerException
):
1887 self
.logger
.critical(
1888 "Unexpected exception at _delete_task task={}: {}".format(
1894 return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e
)}
1896 def _create_task(self
, ro_task
, task_index
, task_depends
, db_update
):
1898 Determine if this task need to create something at VIM
1901 my_task
= ro_task
["tasks"][task_index
]
1902 task_id
= my_task
["task_id"]
1905 if my_task
["status"] == "FAILED":
1906 return None, None # TODO need to be retry??
1907 elif my_task
["status"] == "SCHEDULED":
1908 # check if already created by another task
1909 for index
, task
in enumerate(ro_task
["tasks"]):
1910 if index
== task_index
or not task
:
1913 if task
["action"] == "CREATE" and task
["status"] not in (
1918 return task
["status"], "COPY_VIM_INFO"
1921 task_status
, ro_vim_item_update
= self
.item2class
[my_task
["item"]].new(
1922 ro_task
, task_index
, task_depends
1924 # TODO update other CREATE tasks
1925 except Exception as e
:
1926 if not isinstance(e
, NsWorkerException
):
1928 "Error executing task={}: {}".format(task_id
, e
), exc_info
=True
1931 task_status
= "FAILED"
1932 ro_vim_item_update
= {"vim_status": "VIM_ERROR", "vim_details": str(e
)}
1933 # TODO update ro_vim_item_update
1935 return task_status
, ro_vim_item_update
1939 def _get_dependency(self
, task_id
, ro_task
=None, target_id
=None):
1941 Look for dependency task
1942 :param task_id: Can be one of
1943 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1944 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1945 3. task.task_id: "<action_id>:number"
1948 :return: database ro_task plus index of task
1951 task_id
.startswith("vim:")
1952 or task_id
.startswith("sdn:")
1953 or task_id
.startswith("wim:")
1955 target_id
, _
, task_id
= task_id
.partition(" ")
1957 if task_id
.startswith("nsrs:") or task_id
.startswith("vnfrs:"):
1958 ro_task_dependency
= self
.db
.get_one(
1960 q_filter
={"target_id": target_id
, "tasks.target_record_id": task_id
},
1961 fail_on_empty
=False,
1964 if ro_task_dependency
:
1965 for task_index
, task
in enumerate(ro_task_dependency
["tasks"]):
1966 if task
["target_record_id"] == task_id
:
1967 return ro_task_dependency
, task_index
1971 for task_index
, task
in enumerate(ro_task
["tasks"]):
1972 if task
and task
["task_id"] == task_id
:
1973 return ro_task
, task_index
1975 ro_task_dependency
= self
.db
.get_one(
1978 "tasks.ANYINDEX.task_id": task_id
,
1979 "tasks.ANYINDEX.target_record.ne": None,
1981 fail_on_empty
=False,
1984 if ro_task_dependency
:
1985 for task_index
, task
in ro_task_dependency
["tasks"]:
1986 if task
["task_id"] == task_id
:
1987 return ro_task_dependency
, task_index
1988 raise NsWorkerException("Cannot get depending task {}".format(task_id
))
1990 def update_vm_refresh(self
):
1991 """Enables the VM status updates if self.refresh_config.active parameter
1992 is not -1 and than updates the DB accordingly
1996 self
.logger
.debug("Checking if VM status update config")
1997 next_refresh
= time
.time()
1998 if self
.refresh_config
.active
== -1:
2001 next_refresh
+= self
.refresh_config
.active
2003 if next_refresh
!= -1:
2004 db_ro_task_update
= {}
2006 next_check_at
= now
+ (24 * 60 * 60)
2007 next_check_at
= min(next_check_at
, next_refresh
)
2008 db_ro_task_update
["vim_info.refresh_at"] = next_refresh
2009 db_ro_task_update
["to_check_at"] = next_check_at
2012 "Finding tasks which to be updated to enable VM status updates"
2014 refresh_tasks
= self
.db
.get_list(
2017 "tasks.status": "DONE",
2018 "to_check_at.lt": 0,
2021 self
.logger
.debug("Updating tasks to change the to_check_at status")
2022 for task
in refresh_tasks
:
2029 update_dict
=db_ro_task_update
,
2033 except Exception as e
:
2034 self
.logger
.error(f
"Error updating tasks to enable VM status updates: {e}")
2036 def _process_pending_tasks(self
, ro_task
):
2037 ro_task_id
= ro_task
["_id"]
2040 next_check_at
= now
+ (24 * 60 * 60)
2041 db_ro_task_update
= {}
2043 def _update_refresh(new_status
):
2044 # compute next_refresh
2046 nonlocal next_check_at
2047 nonlocal db_ro_task_update
2050 next_refresh
= time
.time()
2052 if task
["item"] in ("image", "flavor"):
2053 next_refresh
+= self
.refresh_config
.image
2054 elif new_status
== "BUILD":
2055 next_refresh
+= self
.refresh_config
.build
2056 elif new_status
== "DONE":
2057 if self
.refresh_config
.active
== -1:
2060 next_refresh
+= self
.refresh_config
.active
2062 next_refresh
+= self
.refresh_config
.error
2064 next_check_at
= min(next_check_at
, next_refresh
)
2065 db_ro_task_update
["vim_info.refresh_at"] = next_refresh
2066 ro_task
["vim_info"]["refresh_at"] = next_refresh
2070 # Log RO tasks only when loglevel is DEBUG
2071 if self.logger.getEffectiveLevel() == logging.DEBUG:
2072 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2074 # Check if vim status refresh is enabled again
2075 self
.update_vm_refresh()
2076 # 0: get task_status_create
2078 task_status_create
= None
2082 for t
in ro_task
["tasks"]
2084 and t
["action"] == "CREATE"
2085 and t
["status"] in ("BUILD", "DONE")
2091 task_status_create
= task_create
["status"]
2093 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2094 for task_action
in ("DELETE", "CREATE", "EXEC"):
2095 db_vim_update
= None
2098 for task_index
, task
in enumerate(ro_task
["tasks"]):
2100 continue # task deleted
2103 target_update
= None
2107 task_action
in ("DELETE", "EXEC")
2108 and task
["status"] not in ("SCHEDULED", "BUILD")
2110 or task
["action"] != task_action
2112 task_action
== "CREATE"
2113 and task
["status"] in ("FINISHED", "SUPERSEDED")
2118 task_path
= "tasks.{}.status".format(task_index
)
2120 db_vim_info_update
= None
2122 if task
["status"] == "SCHEDULED":
2123 # check if tasks that this depends on have been completed
2124 dependency_not_completed
= False
2126 for dependency_task_id
in task
.get("depends_on") or ():
2129 dependency_task_index
,
2130 ) = self
._get
_dependency
(
2131 dependency_task_id
, target_id
=ro_task
["target_id"]
2133 dependency_task
= dependency_ro_task
["tasks"][
2134 dependency_task_index
2137 if dependency_task
["status"] == "SCHEDULED":
2138 dependency_not_completed
= True
2139 next_check_at
= min(
2140 next_check_at
, dependency_ro_task
["to_check_at"]
2142 # must allow dependent task to be processed first
2143 # to do this set time after last_task_processed
2144 next_check_at
= max(
2145 self
.time_last_task_processed
, next_check_at
2148 elif dependency_task
["status"] == "FAILED":
2149 error_text
= "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2152 dependency_task
["action"],
2153 dependency_task
["item"],
2155 dependency_ro_task
["vim_info"].get(
2160 "task={} {}".format(task
["task_id"], error_text
)
2162 raise NsWorkerException(error_text
)
2164 task_depends
[dependency_task_id
] = dependency_ro_task
[
2168 "TASK-{}".format(dependency_task_id
)
2169 ] = dependency_ro_task
["vim_info"]["vim_id"]
2171 if dependency_not_completed
:
2172 # TODO set at vim_info.vim_details that it is waiting
2175 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2176 # the task of renew this locking. It will update database locket_at periodically
2178 lock_object
= LockRenew
.add_lock_object(
2179 "ro_tasks", ro_task
, self
2182 if task
["action"] == "DELETE":
2183 (new_status
, db_vim_info_update
,) = self
._delete
_task
(
2184 ro_task
, task_index
, task_depends
, db_ro_task_update
2187 "FINISHED" if new_status
== "DONE" else new_status
2189 # ^with FINISHED instead of DONE it will not be refreshing
2191 if new_status
in ("FINISHED", "SUPERSEDED"):
2192 target_update
= "DELETE"
2193 elif task
["action"] == "EXEC":
2198 ) = self
.item2class
[task
["item"]].exec(
2199 ro_task
, task_index
, task_depends
2202 "FINISHED" if new_status
== "DONE" else new_status
2204 # ^with FINISHED instead of DONE it will not be refreshing
2207 # load into database the modified db_task_update "retries" and "next_retry"
2208 if db_task_update
.get("retries"):
2210 "tasks.{}.retries".format(task_index
)
2211 ] = db_task_update
["retries"]
2213 next_check_at
= time
.time() + db_task_update
.get(
2216 target_update
= None
2217 elif task
["action"] == "CREATE":
2218 if task
["status"] == "SCHEDULED":
2219 if task_status_create
:
2220 new_status
= task_status_create
2221 target_update
= "COPY_VIM_INFO"
2223 new_status
, db_vim_info_update
= self
.item2class
[
2225 ].new(ro_task
, task_index
, task_depends
)
2226 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2227 _update_refresh(new_status
)
2229 refresh_at
= ro_task
["vim_info"]["refresh_at"]
2230 if refresh_at
and refresh_at
!= -1 and now
> refresh_at
:
2231 new_status
, db_vim_info_update
= self
.item2class
[
2234 _update_refresh(new_status
)
2236 # The refresh is updated to avoid set the value of "refresh_at" to
2237 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2238 # because it can happen that in this case the task is never processed
2239 _update_refresh(task
["status"])
2241 except Exception as e
:
2242 new_status
= "FAILED"
2243 db_vim_info_update
= {
2244 "vim_status": "VIM_ERROR",
2245 "vim_details": str(e
),
2249 e
, (NsWorkerException
, vimconn
.VimConnException
)
2252 "Unexpected exception at _delete_task task={}: {}".format(
2259 if db_vim_info_update
:
2260 db_vim_update
= db_vim_info_update
.copy()
2261 db_ro_task_update
.update(
2264 for k
, v
in db_vim_info_update
.items()
2267 ro_task
["vim_info"].update(db_vim_info_update
)
2270 if task_action
== "CREATE":
2271 task_status_create
= new_status
2272 db_ro_task_update
[task_path
] = new_status
2274 if target_update
or db_vim_update
:
2275 if target_update
== "DELETE":
2276 self
._update
_target
(task
, None)
2277 elif target_update
== "COPY_VIM_INFO":
2278 self
._update
_target
(task
, ro_task
["vim_info"])
2280 self
._update
_target
(task
, db_vim_update
)
2282 except Exception as e
:
2284 isinstance(e
, DbException
)
2285 and e
.http_code
== HTTPStatus
.NOT_FOUND
2287 # if the vnfrs or nsrs has been removed from database, this task must be removed
2289 "marking to delete task={}".format(task
["task_id"])
2291 self
.tasks_to_delete
.append(task
)
2294 "Unexpected exception at _update_target task={}: {}".format(
2300 locked_at
= ro_task
["locked_at"]
2304 lock_object
["locked_at"],
2305 lock_object
["locked_at"] + self
.task_locked_time
,
2307 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2308 # contain exactly locked_at + self.task_locked_time
2309 LockRenew
.remove_lock_object(lock_object
)
2312 "_id": ro_task
["_id"],
2313 "to_check_at": ro_task
["to_check_at"],
2314 "locked_at": locked_at
,
2316 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2317 # outside this task (by ro_nbi) do not update it
2318 db_ro_task_update
["locked_by"] = None
2319 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2320 db_ro_task_update
["locked_at"] = int(now
- self
.task_locked_time
)
2321 db_ro_task_update
["modified_at"] = now
2322 db_ro_task_update
["to_check_at"] = next_check_at
2325 # Log RO tasks only when loglevel is DEBUG
2326 if self.logger.getEffectiveLevel() == logging.DEBUG:
2327 db_ro_task_update_log = db_ro_task_update.copy()
2328 db_ro_task_update_log["_id"] = q_filter["_id"]
2329 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2332 if not self
.db
.set_one(
2334 update_dict
=db_ro_task_update
,
2336 fail_on_empty
=False,
2338 del db_ro_task_update
["to_check_at"]
2339 del q_filter
["to_check_at"]
2341 # Log RO tasks only when loglevel is DEBUG
2342 if self.logger.getEffectiveLevel() == logging.DEBUG:
2345 db_ro_task_update_log,
2348 "SET_TASK " + str(q_filter),
2354 update_dict
=db_ro_task_update
,
2357 except DbException
as e
:
2359 "ro_task={} Error updating database {}".format(ro_task_id
, e
)
2361 except Exception as e
:
2363 "Error executing ro_task={}: {}".format(ro_task_id
, e
), exc_info
=True
2366 def _update_target(self
, task
, ro_vim_item_update
):
2367 table
, _
, temp
= task
["target_record"].partition(":")
2368 _id
, _
, path_vim_status
= temp
.partition(":")
2369 path_item
= path_vim_status
[: path_vim_status
.rfind(".")]
2370 path_item
= path_item
[: path_item
.rfind(".")]
2371 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2372 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2374 if ro_vim_item_update
:
2376 path_vim_status
+ "." + k
: v
2377 for k
, v
in ro_vim_item_update
.items()
2379 in ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces")
2382 if path_vim_status
.startswith("vdur."):
2383 # for backward compatibility, add vdur.name apart from vdur.vim_name
2384 if ro_vim_item_update
.get("vim_name"):
2385 update_dict
[path_item
+ ".name"] = ro_vim_item_update
["vim_name"]
2387 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2388 if ro_vim_item_update
.get("vim_id"):
2389 update_dict
[path_item
+ ".vim-id"] = ro_vim_item_update
["vim_id"]
2391 # update general status
2392 if ro_vim_item_update
.get("vim_status"):
2393 update_dict
[path_item
+ ".status"] = ro_vim_item_update
[
2397 if ro_vim_item_update
.get("interfaces"):
2398 path_interfaces
= path_item
+ ".interfaces"
2400 for i
, iface
in enumerate(ro_vim_item_update
.get("interfaces")):
2404 path_interfaces
+ ".{}.".format(i
) + k
: v
2405 for k
, v
in iface
.items()
2406 if k
in ("vlan", "compute_node", "pci")
2410 # put ip_address and mac_address with ip-address and mac-address
2411 if iface
.get("ip_address"):
2413 path_interfaces
+ ".{}.".format(i
) + "ip-address"
2414 ] = iface
["ip_address"]
2416 if iface
.get("mac_address"):
2418 path_interfaces
+ ".{}.".format(i
) + "mac-address"
2419 ] = iface
["mac_address"]
2421 if iface
.get("mgmt_vnf_interface") and iface
.get("ip_address"):
2422 update_dict
["ip-address"] = iface
.get("ip_address").split(
2426 if iface
.get("mgmt_vdu_interface") and iface
.get("ip_address"):
2427 update_dict
[path_item
+ ".ip-address"] = iface
.get(
2431 self
.db
.set_one(table
, q_filter
={"_id": _id
}, update_dict
=update_dict
)
2433 update_dict
= {path_item
+ ".status": "DELETED"}
2436 q_filter
={"_id": _id
},
2437 update_dict
=update_dict
,
2438 unset
={path_vim_status
: None},
2441 def _process_delete_db_tasks(self
):
2443 Delete task from database because vnfrs or nsrs or both have been deleted
2444 :return: None. Uses and modify self.tasks_to_delete
2446 while self
.tasks_to_delete
:
2447 task
= self
.tasks_to_delete
[0]
2448 vnfrs_deleted
= None
2449 nsr_id
= task
["nsr_id"]
2451 if task
["target_record"].startswith("vnfrs:"):
2452 # check if nsrs is present
2453 if self
.db
.get_one("nsrs", {"_id": nsr_id
}, fail_on_empty
=False):
2454 vnfrs_deleted
= task
["target_record"].split(":")[1]
2457 self
.delete_db_tasks(self
.db
, nsr_id
, vnfrs_deleted
)
2458 except Exception as e
:
2460 "Error deleting task={}: {}".format(task
["task_id"], e
)
2462 self
.tasks_to_delete
.pop(0)
2465 def delete_db_tasks(db
, nsr_id
, vnfrs_deleted
):
2467 Static method because it is called from osm_ng_ro.ns
2468 :param db: instance of database to use
2469 :param nsr_id: affected nsrs id
2470 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2471 :return: None, exception is fails
2474 for retry
in range(retries
):
2475 ro_tasks
= db
.get_list("ro_tasks", {"tasks.nsr_id": nsr_id
})
2479 for ro_task
in ro_tasks
:
2481 to_delete_ro_task
= True
2483 for index
, task
in enumerate(ro_task
["tasks"]):
2486 elif (not vnfrs_deleted
and task
["nsr_id"] == nsr_id
) or (
2488 and task
["target_record"].startswith("vnfrs:" + vnfrs_deleted
)
2490 db_update
["tasks.{}".format(index
)] = None
2492 # used by other nsr, ro_task cannot be deleted
2493 to_delete_ro_task
= False
2495 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2496 if to_delete_ro_task
:
2500 "_id": ro_task
["_id"],
2501 "modified_at": ro_task
["modified_at"],
2503 fail_on_empty
=False,
2507 db_update
["modified_at"] = now
2511 "_id": ro_task
["_id"],
2512 "modified_at": ro_task
["modified_at"],
2514 update_dict
=db_update
,
2515 fail_on_empty
=False,
2521 raise NsWorkerException("Exceeded {} retries".format(retries
))
2525 self
.logger
.info("Starting")
2527 # step 1: get commands from queue
2529 if self
.vim_targets
:
2530 task
= self
.task_queue
.get(block
=False)
2533 self
.logger
.debug("enters in idle state")
2535 task
= self
.task_queue
.get(block
=True)
2538 if task
[0] == "terminate":
2540 elif task
[0] == "load_vim":
2541 self
.logger
.info("order to load vim {}".format(task
[1]))
2542 self
._load
_vim
(task
[1])
2543 elif task
[0] == "unload_vim":
2544 self
.logger
.info("order to unload vim {}".format(task
[1]))
2545 self
._unload
_vim
(task
[1])
2546 elif task
[0] == "reload_vim":
2547 self
._reload
_vim
(task
[1])
2548 elif task
[0] == "check_vim":
2549 self
.logger
.info("order to check vim {}".format(task
[1]))
2550 self
._check
_vim
(task
[1])
2552 except Exception as e
:
2553 if isinstance(e
, queue
.Empty
):
2556 self
.logger
.critical(
2557 "Error processing task: {}".format(e
), exc_info
=True
2560 # step 2: process pending_tasks, delete not needed tasks
2562 if self
.tasks_to_delete
:
2563 self
._process
_delete
_db
_tasks
()
2566 # Log RO tasks only when loglevel is DEBUG
2567 if self.logger.getEffectiveLevel() == logging.DEBUG:
2568 _ = self._get_db_all_tasks()
2570 ro_task
= self
._get
_db
_task
()
2572 self
._process
_pending
_tasks
(ro_task
)
2576 except Exception as e
:
2577 self
.logger
.critical(
2578 "Unexpected exception at run: " + str(e
), exc_info
=True
2581 self
.logger
.info("Finishing")