1 # -*- coding: utf-8 -*-
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
27 from copy
import deepcopy
28 from http
import HTTPStatus
32 from shutil
import rmtree
36 from typing
import Dict
37 from unittest
.mock
import Mock
39 from importlib_metadata
import entry_points
40 from osm_common
.dbbase
import DbException
41 from osm_ng_ro
.vim_admin
import LockRenew
42 from osm_ro_plugin
import sdnconn
, vimconn
43 from osm_ro_plugin
.sdn_dummy
import SdnDummyConnector
44 from osm_ro_plugin
.vim_dummy
import VimDummyConnector
47 __author__
= "Alfonso Tierno"
48 __date__
= "$28-Sep-2017 12:07:15$"
51 def deep_get(target_dict
, *args
, **kwargs
):
53 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
54 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
55 :param target_dict: dictionary to be read
56 :param args: list of keys to read from target_dict
57 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
58 :return: The wanted value if exist, None or default otherwise
61 if not isinstance(target_dict
, dict) or key
not in target_dict
:
62 return kwargs
.get("default")
63 target_dict
= target_dict
[key
]
67 class NsWorkerException(Exception):
71 class FailingConnector
:
72 def __init__(self
, error_msg
):
73 self
.error_msg
= error_msg
75 for method
in dir(vimconn
.VimConnector
):
78 self
, method
, Mock(side_effect
=vimconn
.VimConnException(error_msg
))
81 for method
in dir(sdnconn
.SdnConnectorBase
):
84 self
, method
, Mock(side_effect
=sdnconn
.SdnConnectorError(error_msg
))
88 class NsWorkerExceptionNotFound(NsWorkerException
):
92 class VimInteractionBase
:
93 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
94 It implements methods that does nothing and return ok"""
96 def __init__(self
, db
, my_vims
, db_vims
, logger
):
99 self
.my_vims
= my_vims
100 self
.db_vims
= db_vims
102 def new(self
, ro_task
, task_index
, task_depends
):
105 def refresh(self
, ro_task
):
106 """skip calling VIM to get image, flavor status. Assumes ok"""
107 if ro_task
["vim_info"]["vim_status"] == "VIM_ERROR":
112 def delete(self
, ro_task
, task_index
):
113 """skip calling VIM to delete image. Assumes ok"""
116 def exec(self
, ro_task
, task_index
, task_depends
):
117 return "DONE", None, None
120 class VimInteractionNet(VimInteractionBase
):
121 def new(self
, ro_task
, task_index
, task_depends
):
123 task
= ro_task
["tasks"][task_index
]
124 task_id
= task
["task_id"]
127 target_vim
= self
.my_vims
[ro_task
["target_id"]]
129 mgmtnet_defined_in_vim
= False
133 if task
.get("find_params"):
134 # if management, get configuration of VIM
135 if task
["find_params"].get("filter_dict"):
136 vim_filter
= task
["find_params"]["filter_dict"]
138 elif task
["find_params"].get("mgmt"):
141 self
.db_vims
[ro_task
["target_id"]],
143 "management_network_id",
145 mgmtnet_defined_in_vim
= True
147 "id": self
.db_vims
[ro_task
["target_id"]]["config"][
148 "management_network_id"
152 self
.db_vims
[ro_task
["target_id"]],
154 "management_network_name",
156 mgmtnet_defined_in_vim
= True
158 "name": self
.db_vims
[ro_task
["target_id"]]["config"][
159 "management_network_name"
163 vim_filter
= {"name": task
["find_params"]["name"]}
165 raise NsWorkerExceptionNotFound(
166 "Invalid find_params for new_net {}".format(task
["find_params"])
169 vim_nets
= target_vim
.get_network_list(vim_filter
)
170 if not vim_nets
and not task
.get("params"):
171 # If there is mgmt-network in the descriptor,
172 # there is no mapping of that network to a VIM network in the descriptor,
173 # also there is no mapping in the "--config" parameter or at VIM creation;
174 # that mgmt-network will be created.
175 if mgmtnet
and not mgmtnet_defined_in_vim
:
177 vim_filter
.get("name")
178 if vim_filter
.get("name")
179 else vim_filter
.get("id")[:16]
181 vim_net_id
, created_items
= target_vim
.new_network(
185 "Created mgmt network vim_net_id: {}".format(vim_net_id
)
189 raise NsWorkerExceptionNotFound(
190 "Network not found with this criteria: '{}'".format(
191 task
.get("find_params")
194 elif len(vim_nets
) > 1:
195 raise NsWorkerException(
196 "More than one network found with this criteria: '{}'".format(
202 vim_net_id
= vim_nets
[0]["id"]
205 params
= task
["params"]
206 vim_net_id
, created_items
= target_vim
.new_network(**params
)
209 ro_vim_item_update
= {
210 "vim_id": vim_net_id
,
211 "vim_status": "BUILD",
213 "created_items": created_items
,
218 "task={} {} new-net={} created={}".format(
219 task_id
, ro_task
["target_id"], vim_net_id
, created
223 return "BUILD", ro_vim_item_update
224 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
226 "task={} vim={} new-net: {}".format(task_id
, ro_task
["target_id"], e
)
228 ro_vim_item_update
= {
229 "vim_status": "VIM_ERROR",
231 "vim_message": str(e
),
234 return "FAILED", ro_vim_item_update
236 def refresh(self
, ro_task
):
237 """Call VIM to get network status"""
238 ro_task_id
= ro_task
["_id"]
239 target_vim
= self
.my_vims
[ro_task
["target_id"]]
240 vim_id
= ro_task
["vim_info"]["vim_id"]
241 net_to_refresh_list
= [vim_id
]
244 vim_dict
= target_vim
.refresh_nets_status(net_to_refresh_list
)
245 vim_info
= vim_dict
[vim_id
]
247 if vim_info
["status"] == "ACTIVE":
249 elif vim_info
["status"] == "BUILD":
250 task_status
= "BUILD"
252 task_status
= "FAILED"
253 except vimconn
.VimConnException
as e
:
254 # Mark all tasks at VIM_ERROR status
256 "ro_task={} vim={} get-net={}: {}".format(
257 ro_task_id
, ro_task
["target_id"], vim_id
, e
260 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
261 task_status
= "FAILED"
263 ro_vim_item_update
= {}
264 if ro_task
["vim_info"]["vim_status"] != vim_info
["status"]:
265 ro_vim_item_update
["vim_status"] = vim_info
["status"]
267 if ro_task
["vim_info"]["vim_name"] != vim_info
.get("name"):
268 ro_vim_item_update
["vim_name"] = vim_info
.get("name")
270 if vim_info
["status"] in ("ERROR", "VIM_ERROR"):
271 if ro_task
["vim_info"]["vim_message"] != vim_info
.get("error_msg"):
272 ro_vim_item_update
["vim_message"] = vim_info
.get("error_msg")
273 elif vim_info
["status"] == "DELETED":
274 ro_vim_item_update
["vim_id"] = None
275 ro_vim_item_update
["vim_message"] = "Deleted externally"
277 if ro_task
["vim_info"]["vim_details"] != vim_info
["vim_info"]:
278 ro_vim_item_update
["vim_details"] = vim_info
["vim_info"]
280 if ro_vim_item_update
:
282 "ro_task={} {} get-net={}: status={} {}".format(
284 ro_task
["target_id"],
286 ro_vim_item_update
.get("vim_status"),
287 ro_vim_item_update
.get("vim_message")
288 if ro_vim_item_update
.get("vim_status") != "ACTIVE"
293 return task_status
, ro_vim_item_update
295 def delete(self
, ro_task
, task_index
):
296 task
= ro_task
["tasks"][task_index
]
297 task_id
= task
["task_id"]
298 net_vim_id
= ro_task
["vim_info"]["vim_id"]
299 ro_vim_item_update_ok
= {
300 "vim_status": "DELETED",
302 "vim_message": "DELETED",
307 if net_vim_id
or ro_task
["vim_info"]["created_items"]:
308 target_vim
= self
.my_vims
[ro_task
["target_id"]]
309 target_vim
.delete_network(
310 net_vim_id
, ro_task
["vim_info"]["created_items"]
312 except vimconn
.VimConnNotFoundException
:
313 ro_vim_item_update_ok
["vim_message"] = "already deleted"
314 except vimconn
.VimConnException
as e
:
316 "ro_task={} vim={} del-net={}: {}".format(
317 ro_task
["_id"], ro_task
["target_id"], net_vim_id
, e
320 ro_vim_item_update
= {
321 "vim_status": "VIM_ERROR",
322 "vim_message": "Error while deleting: {}".format(e
),
325 return "FAILED", ro_vim_item_update
328 "task={} {} del-net={} {}".format(
330 ro_task
["target_id"],
332 ro_vim_item_update_ok
.get("vim_message", ""),
336 return "DONE", ro_vim_item_update_ok
339 class VimInteractionVdu(VimInteractionBase
):
340 max_retries_inject_ssh_key
= 20 # 20 times
341 time_retries_inject_ssh_key
= 30 # wevery 30 seconds
343 def new(self
, ro_task
, task_index
, task_depends
):
344 task
= ro_task
["tasks"][task_index
]
345 task_id
= task
["task_id"]
348 target_vim
= self
.my_vims
[ro_task
["target_id"]]
352 params
= task
["params"]
353 params_copy
= deepcopy(params
)
354 net_list
= params_copy
["net_list"]
357 # change task_id into network_id
358 if "net_id" in net
and net
["net_id"].startswith("TASK-"):
359 network_id
= task_depends
[net
["net_id"]]
362 raise NsWorkerException(
363 "Cannot create VM because depends on a network not created or found "
364 "for {}".format(net
["net_id"])
367 net
["net_id"] = network_id
369 if params_copy
["image_id"].startswith("TASK-"):
370 params_copy
["image_id"] = task_depends
[params_copy
["image_id"]]
372 if params_copy
["flavor_id"].startswith("TASK-"):
373 params_copy
["flavor_id"] = task_depends
[params_copy
["flavor_id"]]
375 affinity_group_list
= params_copy
["affinity_group_list"]
376 for affinity_group
in affinity_group_list
:
377 # change task_id into affinity_group_id
378 if "affinity_group_id" in affinity_group
and affinity_group
[
380 ].startswith("TASK-"):
381 affinity_group_id
= task_depends
[
382 affinity_group
["affinity_group_id"]
385 if not affinity_group_id
:
386 raise NsWorkerException(
387 "found for {}".format(affinity_group
["affinity_group_id"])
390 affinity_group
["affinity_group_id"] = affinity_group_id
392 vim_vm_id
, created_items
= target_vim
.new_vminstance(**params_copy
)
393 interfaces
= [iface
["vim_id"] for iface
in params_copy
["net_list"]]
395 # add to created items previous_created_volumes (healing)
396 if task
.get("previous_created_volumes"):
397 for k
, v
in task
["previous_created_volumes"].items():
400 ro_vim_item_update
= {
402 "vim_status": "BUILD",
404 "created_items": created_items
,
407 "interfaces_vim_ids": interfaces
,
409 "interfaces_backup": [],
412 "task={} {} new-vm={} created={}".format(
413 task_id
, ro_task
["target_id"], vim_vm_id
, created
417 return "BUILD", ro_vim_item_update
418 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
419 self
.logger
.debug(traceback
.format_exc())
421 "task={} {} new-vm: {}".format(task_id
, ro_task
["target_id"], e
)
423 ro_vim_item_update
= {
424 "vim_status": "VIM_ERROR",
426 "vim_message": str(e
),
429 return "FAILED", ro_vim_item_update
431 def delete(self
, ro_task
, task_index
):
432 task
= ro_task
["tasks"][task_index
]
433 task_id
= task
["task_id"]
434 vm_vim_id
= ro_task
["vim_info"]["vim_id"]
435 ro_vim_item_update_ok
= {
436 "vim_status": "DELETED",
438 "vim_message": "DELETED",
444 "delete_vminstance: vm_vim_id={} created_items={}".format(
445 vm_vim_id
, ro_task
["vim_info"]["created_items"]
448 if vm_vim_id
or ro_task
["vim_info"]["created_items"]:
449 target_vim
= self
.my_vims
[ro_task
["target_id"]]
450 target_vim
.delete_vminstance(
452 ro_task
["vim_info"]["created_items"],
453 ro_task
["vim_info"].get("volumes_to_hold", []),
455 except vimconn
.VimConnNotFoundException
:
456 ro_vim_item_update_ok
["vim_message"] = "already deleted"
457 except vimconn
.VimConnException
as e
:
459 "ro_task={} vim={} del-vm={}: {}".format(
460 ro_task
["_id"], ro_task
["target_id"], vm_vim_id
, e
463 ro_vim_item_update
= {
464 "vim_status": "VIM_ERROR",
465 "vim_message": "Error while deleting: {}".format(e
),
468 return "FAILED", ro_vim_item_update
471 "task={} {} del-vm={} {}".format(
473 ro_task
["target_id"],
475 ro_vim_item_update_ok
.get("vim_message", ""),
479 return "DONE", ro_vim_item_update_ok
481 def refresh(self
, ro_task
):
482 """Call VIM to get vm status"""
483 ro_task_id
= ro_task
["_id"]
484 target_vim
= self
.my_vims
[ro_task
["target_id"]]
485 vim_id
= ro_task
["vim_info"]["vim_id"]
490 vm_to_refresh_list
= [vim_id
]
492 vim_dict
= target_vim
.refresh_vms_status(vm_to_refresh_list
)
493 vim_info
= vim_dict
[vim_id
]
495 if vim_info
["status"] == "ACTIVE":
497 elif vim_info
["status"] == "BUILD":
498 task_status
= "BUILD"
500 task_status
= "FAILED"
502 # try to load and parse vim_information
504 vim_info_info
= yaml
.safe_load(vim_info
["vim_info"])
505 if vim_info_info
.get("name"):
506 vim_info
["name"] = vim_info_info
["name"]
509 except vimconn
.VimConnException
as e
:
510 # Mark all tasks at VIM_ERROR status
512 "ro_task={} vim={} get-vm={}: {}".format(
513 ro_task_id
, ro_task
["target_id"], vim_id
, e
516 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
517 task_status
= "FAILED"
519 ro_vim_item_update
= {}
521 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
523 if vim_info
.get("interfaces"):
524 for vim_iface_id
in ro_task
["vim_info"]["interfaces_vim_ids"]:
528 for iface
in vim_info
["interfaces"]
529 if vim_iface_id
== iface
["vim_interface_id"]
534 # iface.pop("vim_info", None)
535 vim_interfaces
.append(iface
)
539 for t
in ro_task
["tasks"]
540 if t
and t
["action"] == "CREATE" and t
["status"] != "FINISHED"
542 if vim_interfaces
and task_create
.get("mgmt_vnf_interface") is not None:
543 vim_interfaces
[task_create
["mgmt_vnf_interface"]][
547 mgmt_vdu_iface
= task_create
.get(
548 "mgmt_vdu_interface", task_create
.get("mgmt_vnf_interface", 0)
551 vim_interfaces
[mgmt_vdu_iface
]["mgmt_vdu_interface"] = True
553 if ro_task
["vim_info"]["interfaces"] != vim_interfaces
:
554 ro_vim_item_update
["interfaces"] = vim_interfaces
556 if ro_task
["vim_info"]["vim_status"] != vim_info
["status"]:
557 ro_vim_item_update
["vim_status"] = vim_info
["status"]
559 if ro_task
["vim_info"]["vim_name"] != vim_info
.get("name"):
560 ro_vim_item_update
["vim_name"] = vim_info
.get("name")
562 if vim_info
["status"] in ("ERROR", "VIM_ERROR"):
563 if ro_task
["vim_info"]["vim_message"] != vim_info
.get("error_msg"):
564 ro_vim_item_update
["vim_message"] = vim_info
.get("error_msg")
565 elif vim_info
["status"] == "DELETED":
566 ro_vim_item_update
["vim_id"] = None
567 ro_vim_item_update
["vim_message"] = "Deleted externally"
569 if ro_task
["vim_info"]["vim_details"] != vim_info
["vim_info"]:
570 ro_vim_item_update
["vim_details"] = vim_info
["vim_info"]
572 if ro_vim_item_update
:
574 "ro_task={} {} get-vm={}: status={} {}".format(
576 ro_task
["target_id"],
578 ro_vim_item_update
.get("vim_status"),
579 ro_vim_item_update
.get("vim_message")
580 if ro_vim_item_update
.get("vim_status") != "ACTIVE"
585 return task_status
, ro_vim_item_update
587 def exec(self
, ro_task
, task_index
, task_depends
):
588 task
= ro_task
["tasks"][task_index
]
589 task_id
= task
["task_id"]
590 target_vim
= self
.my_vims
[ro_task
["target_id"]]
591 db_task_update
= {"retries": 0}
592 retries
= task
.get("retries", 0)
595 params
= task
["params"]
596 params_copy
= deepcopy(params
)
597 params_copy
["ro_key"] = self
.db
.decrypt(
598 params_copy
.pop("private_key"),
599 params_copy
.pop("schema_version"),
600 params_copy
.pop("salt"),
602 params_copy
["ip_addr"] = params_copy
.pop("ip_address")
603 target_vim
.inject_user_key(**params_copy
)
605 "task={} {} action-vm=inject_key".format(task_id
, ro_task
["target_id"])
612 ) # params_copy["key"]
613 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
616 self
.logger
.debug(traceback
.format_exc())
617 if retries
< self
.max_retries_inject_ssh_key
:
623 "next_retry": self
.time_retries_inject_ssh_key
,
628 "task={} {} inject-ssh-key: {}".format(task_id
, ro_task
["target_id"], e
)
630 ro_vim_item_update
= {"vim_message": str(e
)}
632 return "FAILED", ro_vim_item_update
, db_task_update
635 class VimInteractionImage(VimInteractionBase
):
636 def new(self
, ro_task
, task_index
, task_depends
):
637 task
= ro_task
["tasks"][task_index
]
638 task_id
= task
["task_id"]
641 target_vim
= self
.my_vims
[ro_task
["target_id"]]
645 if task
.get("find_params"):
646 vim_images
= target_vim
.get_image_list(**task
["find_params"])
649 raise NsWorkerExceptionNotFound(
650 "Image not found with this criteria: '{}'".format(
654 elif len(vim_images
) > 1:
655 raise NsWorkerException(
656 "More than one image found with this criteria: '{}'".format(
661 vim_image_id
= vim_images
[0]["id"]
663 ro_vim_item_update
= {
664 "vim_id": vim_image_id
,
665 "vim_status": "DONE",
667 "created_items": created_items
,
672 "task={} {} new-image={} created={}".format(
673 task_id
, ro_task
["target_id"], vim_image_id
, created
677 return "DONE", ro_vim_item_update
678 except (NsWorkerException
, vimconn
.VimConnException
) as e
:
680 "task={} {} new-image: {}".format(task_id
, ro_task
["target_id"], e
)
682 ro_vim_item_update
= {
683 "vim_status": "VIM_ERROR",
685 "vim_message": str(e
),
688 return "FAILED", ro_vim_item_update
691 class VimInteractionFlavor(VimInteractionBase
):
692 def delete(self
, ro_task
, task_index
):
693 task
= ro_task
["tasks"][task_index
]
694 task_id
= task
["task_id"]
695 flavor_vim_id
= ro_task
["vim_info"]["vim_id"]
696 ro_vim_item_update_ok
= {
697 "vim_status": "DELETED",
699 "vim_message": "DELETED",
705 target_vim
= self
.my_vims
[ro_task
["target_id"]]
706 target_vim
.delete_flavor(flavor_vim_id
)
707 except vimconn
.VimConnNotFoundException
:
708 ro_vim_item_update_ok
["vim_message"] = "already deleted"
709 except vimconn
.VimConnException
as e
:
711 "ro_task={} vim={} del-flavor={}: {}".format(
712 ro_task
["_id"], ro_task
["target_id"], flavor_vim_id
, e
715 ro_vim_item_update
= {
716 "vim_status": "VIM_ERROR",
717 "vim_message": "Error while deleting: {}".format(e
),
720 return "FAILED", ro_vim_item_update
723 "task={} {} del-flavor={} {}".format(
725 ro_task
["target_id"],
727 ro_vim_item_update_ok
.get("vim_message", ""),
731 return "DONE", ro_vim_item_update_ok
733 def new(self
, ro_task
, task_index
, task_depends
):
734 task
= ro_task
["tasks"][task_index
]
735 task_id
= task
["task_id"]
738 target_vim
= self
.my_vims
[ro_task
["target_id"]]
744 if task
.get("find_params"):
746 flavor_data
= task
["find_params"]["flavor_data"]
747 vim_flavor_id
= target_vim
.get_flavor_id_from_data(flavor_data
)
748 except vimconn
.VimConnNotFoundException
:
751 if not vim_flavor_id
and task
.get("params"):
753 flavor_data
= task
["params"]["flavor_data"]
754 vim_flavor_id
= target_vim
.new_flavor(flavor_data
)
757 ro_vim_item_update
= {
758 "vim_id": vim_flavor_id
,
759 "vim_status": "DONE",
761 "created_items": created_items
,
766 "task={} {} new-flavor={} created={}".format(
767 task_id
, ro_task
["target_id"], vim_flavor_id
, created
771 return "DONE", ro_vim_item_update
772 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
774 "task={} vim={} new-flavor: {}".format(task_id
, ro_task
["target_id"], e
)
776 ro_vim_item_update
= {
777 "vim_status": "VIM_ERROR",
779 "vim_message": str(e
),
782 return "FAILED", ro_vim_item_update
785 class VimInteractionAffinityGroup(VimInteractionBase
):
786 def delete(self
, ro_task
, task_index
):
787 task
= ro_task
["tasks"][task_index
]
788 task_id
= task
["task_id"]
789 affinity_group_vim_id
= ro_task
["vim_info"]["vim_id"]
790 ro_vim_item_update_ok
= {
791 "vim_status": "DELETED",
793 "vim_message": "DELETED",
798 if affinity_group_vim_id
:
799 target_vim
= self
.my_vims
[ro_task
["target_id"]]
800 target_vim
.delete_affinity_group(affinity_group_vim_id
)
801 except vimconn
.VimConnNotFoundException
:
802 ro_vim_item_update_ok
["vim_message"] = "already deleted"
803 except vimconn
.VimConnException
as e
:
805 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
806 ro_task
["_id"], ro_task
["target_id"], affinity_group_vim_id
, e
809 ro_vim_item_update
= {
810 "vim_status": "VIM_ERROR",
811 "vim_message": "Error while deleting: {}".format(e
),
814 return "FAILED", ro_vim_item_update
817 "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
819 ro_task
["target_id"],
820 affinity_group_vim_id
,
821 ro_vim_item_update_ok
.get("vim_message", ""),
825 return "DONE", ro_vim_item_update_ok
827 def new(self
, ro_task
, task_index
, task_depends
):
828 task
= ro_task
["tasks"][task_index
]
829 task_id
= task
["task_id"]
832 target_vim
= self
.my_vims
[ro_task
["target_id"]]
835 affinity_group_vim_id
= None
836 affinity_group_data
= None
838 if task
.get("params"):
839 affinity_group_data
= task
["params"].get("affinity_group_data")
841 if affinity_group_data
and affinity_group_data
.get("vim-affinity-group-id"):
843 param_affinity_group_id
= task
["params"]["affinity_group_data"].get(
844 "vim-affinity-group-id"
846 affinity_group_vim_id
= target_vim
.get_affinity_group(
847 param_affinity_group_id
849 except vimconn
.VimConnNotFoundException
:
851 "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
852 "could not be found at VIM. Creating a new one.".format(
853 task_id
, ro_task
["target_id"], param_affinity_group_id
857 if not affinity_group_vim_id
and affinity_group_data
:
858 affinity_group_vim_id
= target_vim
.new_affinity_group(
863 ro_vim_item_update
= {
864 "vim_id": affinity_group_vim_id
,
865 "vim_status": "DONE",
867 "created_items": created_items
,
872 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
873 task_id
, ro_task
["target_id"], affinity_group_vim_id
, created
877 return "DONE", ro_vim_item_update
878 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
880 "task={} vim={} new-affinity-or-anti-affinity-group:"
881 " {}".format(task_id
, ro_task
["target_id"], e
)
883 ro_vim_item_update
= {
884 "vim_status": "VIM_ERROR",
886 "vim_message": str(e
),
889 return "FAILED", ro_vim_item_update
892 class VimInteractionUpdateVdu(VimInteractionBase
):
893 def exec(self
, ro_task
, task_index
, task_depends
):
894 task
= ro_task
["tasks"][task_index
]
895 task_id
= task
["task_id"]
896 db_task_update
= {"retries": 0}
899 target_vim
= self
.my_vims
[ro_task
["target_id"]]
902 if task
.get("params"):
903 vim_vm_id
= task
["params"].get("vim_vm_id")
904 action
= task
["params"].get("action")
905 context
= {action
: action
}
906 target_vim
.action_vminstance(vim_vm_id
, context
)
908 ro_vim_item_update
= {
910 "vim_status": "DONE",
912 "created_items": created_items
,
917 "task={} {} vm-migration done".format(task_id
, ro_task
["target_id"])
919 return "DONE", ro_vim_item_update
, db_task_update
920 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
922 "task={} vim={} VM Migration:"
923 " {}".format(task_id
, ro_task
["target_id"], e
)
925 ro_vim_item_update
= {
926 "vim_status": "VIM_ERROR",
928 "vim_message": str(e
),
931 return "FAILED", ro_vim_item_update
, db_task_update
934 class VimInteractionSdnNet(VimInteractionBase
):
936 def _match_pci(port_pci
, mapping
):
938 Check if port_pci matches with mapping
939 mapping can have brackets to indicate that several chars are accepted. e.g
940 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
941 :param port_pci: text
942 :param mapping: text, can contain brackets to indicate several chars are available
943 :return: True if matches, False otherwise
945 if not port_pci
or not mapping
:
947 if port_pci
== mapping
:
953 bracket_start
= mapping
.find("[", mapping_index
)
955 if bracket_start
== -1:
958 bracket_end
= mapping
.find("]", bracket_start
)
959 if bracket_end
== -1:
962 length
= bracket_start
- mapping_index
965 and port_pci
[pci_index
: pci_index
+ length
]
966 != mapping
[mapping_index
:bracket_start
]
971 port_pci
[pci_index
+ length
]
972 not in mapping
[bracket_start
+ 1 : bracket_end
]
976 pci_index
+= length
+ 1
977 mapping_index
= bracket_end
+ 1
979 if port_pci
[pci_index
:] != mapping
[mapping_index
:]:
984 def _get_interfaces(self
, vlds_to_connect
, vim_account_id
):
986 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
987 :param vim_account_id:
992 for vld
in vlds_to_connect
:
993 table
, _
, db_id
= vld
.partition(":")
994 db_id
, _
, vld
= db_id
.partition(":")
995 _
, _
, vld_id
= vld
.partition(".")
998 q_filter
= {"vim-account-id": vim_account_id
, "_id": db_id
}
999 iface_key
= "vnf-vld-id"
1000 else: # table == "nsrs"
1001 q_filter
= {"vim-account-id": vim_account_id
, "nsr-id-ref": db_id
}
1002 iface_key
= "ns-vld-id"
1004 db_vnfrs
= self
.db
.get_list("vnfrs", q_filter
=q_filter
)
1006 for db_vnfr
in db_vnfrs
:
1007 for vdu_index
, vdur
in enumerate(db_vnfr
.get("vdur", ())):
1008 for iface_index
, interface
in enumerate(vdur
["interfaces"]):
1009 if interface
.get(iface_key
) == vld_id
and interface
.get(
1011 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
1013 interface_
= interface
.copy()
1014 interface_
["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
1015 db_vnfr
["_id"], vdu_index
, iface_index
1018 if vdur
.get("status") == "ERROR":
1019 interface_
["status"] = "ERROR"
1021 interfaces
.append(interface_
)
1025 def refresh(self
, ro_task
):
1026 # look for task create
1027 task_create_index
, _
= next(
1029 for i_t
in enumerate(ro_task
["tasks"])
1031 and i_t
[1]["action"] == "CREATE"
1032 and i_t
[1]["status"] != "FINISHED"
1035 return self
.new(ro_task
, task_create_index
, None)
1037 def new(self
, ro_task
, task_index
, task_depends
):
1039 task
= ro_task
["tasks"][task_index
]
1040 task_id
= task
["task_id"]
1041 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1043 sdn_net_id
= ro_task
["vim_info"]["vim_id"]
1045 created_items
= ro_task
["vim_info"].get("created_items")
1046 connected_ports
= ro_task
["vim_info"].get("connected_ports", [])
1047 new_connected_ports
= []
1048 last_update
= ro_task
["vim_info"].get("last_update", 0)
1049 sdn_status
= ro_task
["vim_info"].get("vim_status", "BUILD") or "BUILD"
1051 created
= ro_task
["vim_info"].get("created", False)
1055 params
= task
["params"]
1056 vlds_to_connect
= params
["vlds"]
1057 associated_vim
= params
["target_vim"]
1058 # external additional ports
1059 additional_ports
= params
.get("sdn-ports") or ()
1060 _
, _
, vim_account_id
= associated_vim
.partition(":")
1063 # get associated VIM
1064 if associated_vim
not in self
.db_vims
:
1065 self
.db_vims
[associated_vim
] = self
.db
.get_one(
1066 "vim_accounts", {"_id": vim_account_id
}
1069 db_vim
= self
.db_vims
[associated_vim
]
1071 # look for ports to connect
1072 ports
= self
._get
_interfaces
(vlds_to_connect
, vim_account_id
)
1076 pending_ports
= error_ports
= 0
1078 sdn_need_update
= False
1081 vlan_used
= port
.get("vlan") or vlan_used
1083 # TODO. Do not connect if already done
1084 if not port
.get("compute_node") or not port
.get("pci"):
1085 if port
.get("status") == "ERROR":
1092 compute_node_mappings
= next(
1095 for c
in db_vim
["config"].get("sdn-port-mapping", ())
1096 if c
and c
["compute_node"] == port
["compute_node"]
1101 if compute_node_mappings
:
1102 # process port_mapping pci of type 0000:af:1[01].[1357]
1106 for p
in compute_node_mappings
["ports"]
1107 if self
._match
_pci
(port
["pci"], p
.get("pci"))
1113 if not db_vim
["config"].get("mapping_not_needed"):
1115 "Port mapping not found for compute_node={} pci={}".format(
1116 port
["compute_node"], port
["pci"]
1123 service_endpoint_id
= "{}:{}".format(port
["compute_node"], port
["pci"])
1125 "service_endpoint_id": pmap
.get("service_endpoint_id")
1126 or service_endpoint_id
,
1127 "service_endpoint_encapsulation_type": "dot1q"
1128 if port
["type"] == "SR-IOV"
1130 "service_endpoint_encapsulation_info": {
1131 "vlan": port
.get("vlan"),
1132 "mac": port
.get("mac-address"),
1133 "device_id": pmap
.get("device_id") or port
["compute_node"],
1134 "device_interface_id": pmap
.get("device_interface_id")
1136 "switch_dpid": pmap
.get("switch_id") or pmap
.get("switch_dpid"),
1137 "switch_port": pmap
.get("switch_port"),
1138 "service_mapping_info": pmap
.get("service_mapping_info"),
1143 # if port["modified_at"] > last_update:
1144 # sdn_need_update = True
1145 new_connected_ports
.append(port
["id"]) # TODO
1146 sdn_ports
.append(new_port
)
1150 "{} interfaces have not been created as VDU is on ERROR status".format(
1155 # connect external ports
1156 for index
, additional_port
in enumerate(additional_ports
):
1157 additional_port_id
= additional_port
.get(
1158 "service_endpoint_id"
1159 ) or "external-{}".format(index
)
1162 "service_endpoint_id": additional_port_id
,
1163 "service_endpoint_encapsulation_type": additional_port
.get(
1164 "service_endpoint_encapsulation_type", "dot1q"
1166 "service_endpoint_encapsulation_info": {
1167 "vlan": additional_port
.get("vlan") or vlan_used
,
1168 "mac": additional_port
.get("mac_address"),
1169 "device_id": additional_port
.get("device_id"),
1170 "device_interface_id": additional_port
.get(
1171 "device_interface_id"
1173 "switch_dpid": additional_port
.get("switch_dpid")
1174 or additional_port
.get("switch_id"),
1175 "switch_port": additional_port
.get("switch_port"),
1176 "service_mapping_info": additional_port
.get(
1177 "service_mapping_info"
1182 new_connected_ports
.append(additional_port_id
)
1185 # if there are more ports to connect or they have been modified, call create/update
1187 sdn_status
= "ERROR"
1188 sdn_info
= "; ".join(error_list
)
1189 elif set(connected_ports
) != set(new_connected_ports
) or sdn_need_update
:
1190 last_update
= time
.time()
1193 if len(sdn_ports
) < 2:
1194 sdn_status
= "ACTIVE"
1196 if not pending_ports
:
1198 "task={} {} new-sdn-net done, less than 2 ports".format(
1199 task_id
, ro_task
["target_id"]
1203 net_type
= params
.get("type") or "ELAN"
1207 ) = target_vim
.create_connectivity_service(net_type
, sdn_ports
)
1210 "task={} {} new-sdn-net={} created={}".format(
1211 task_id
, ro_task
["target_id"], sdn_net_id
, created
1215 created_items
= target_vim
.edit_connectivity_service(
1216 sdn_net_id
, conn_info
=created_items
, connection_points
=sdn_ports
1220 "task={} {} update-sdn-net={} created={}".format(
1221 task_id
, ro_task
["target_id"], sdn_net_id
, created
1225 connected_ports
= new_connected_ports
1227 wim_status_dict
= target_vim
.get_connectivity_service_status(
1228 sdn_net_id
, conn_info
=created_items
1230 sdn_status
= wim_status_dict
["sdn_status"]
1232 if wim_status_dict
.get("sdn_info"):
1233 sdn_info
= str(wim_status_dict
.get("sdn_info")) or ""
1235 if wim_status_dict
.get("error_msg"):
1236 sdn_info
= wim_status_dict
.get("error_msg") or ""
1239 if sdn_status
!= "ERROR":
1240 sdn_info
= "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1241 len(ports
) - pending_ports
, len(ports
)
1244 if sdn_status
== "ACTIVE":
1245 sdn_status
= "BUILD"
1247 ro_vim_item_update
= {
1248 "vim_id": sdn_net_id
,
1249 "vim_status": sdn_status
,
1251 "created_items": created_items
,
1252 "connected_ports": connected_ports
,
1253 "vim_details": sdn_info
,
1254 "vim_message": None,
1255 "last_update": last_update
,
1258 return sdn_status
, ro_vim_item_update
1259 except Exception as e
:
1261 "task={} vim={} new-net: {}".format(task_id
, ro_task
["target_id"], e
),
1262 exc_info
=not isinstance(
1263 e
, (sdnconn
.SdnConnectorError
, vimconn
.VimConnException
)
1266 ro_vim_item_update
= {
1267 "vim_status": "VIM_ERROR",
1269 "vim_message": str(e
),
1272 return "FAILED", ro_vim_item_update
1274 def delete(self
, ro_task
, task_index
):
1275 task
= ro_task
["tasks"][task_index
]
1276 task_id
= task
["task_id"]
1277 sdn_vim_id
= ro_task
["vim_info"].get("vim_id")
1278 ro_vim_item_update_ok
= {
1279 "vim_status": "DELETED",
1281 "vim_message": "DELETED",
1287 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1288 target_vim
.delete_connectivity_service(
1289 sdn_vim_id
, ro_task
["vim_info"].get("created_items")
1292 except Exception as e
:
1294 isinstance(e
, sdnconn
.SdnConnectorError
)
1295 and e
.http_code
== HTTPStatus
.NOT_FOUND
.value
1297 ro_vim_item_update_ok
["vim_message"] = "already deleted"
1300 "ro_task={} vim={} del-sdn-net={}: {}".format(
1301 ro_task
["_id"], ro_task
["target_id"], sdn_vim_id
, e
1303 exc_info
=not isinstance(
1304 e
, (sdnconn
.SdnConnectorError
, vimconn
.VimConnException
)
1307 ro_vim_item_update
= {
1308 "vim_status": "VIM_ERROR",
1309 "vim_message": "Error while deleting: {}".format(e
),
1312 return "FAILED", ro_vim_item_update
1315 "task={} {} del-sdn-net={} {}".format(
1317 ro_task
["target_id"],
1319 ro_vim_item_update_ok
.get("vim_message", ""),
1323 return "DONE", ro_vim_item_update_ok
1326 class VimInteractionMigration(VimInteractionBase
):
1327 def exec(self
, ro_task
, task_index
, task_depends
):
1328 task
= ro_task
["tasks"][task_index
]
1329 task_id
= task
["task_id"]
1330 db_task_update
= {"retries": 0}
1331 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1335 refreshed_vim_info
= {}
1338 if task
.get("params"):
1339 vim_vm_id
= task
["params"].get("vim_vm_id")
1340 migrate_host
= task
["params"].get("migrate_host")
1341 _
, migrated_compute_node
= target_vim
.migrate_instance(
1342 vim_vm_id
, migrate_host
1345 if migrated_compute_node
:
1346 # When VM is migrated, vdu["vim_info"] needs to be updated
1347 vdu_old_vim_info
= task
["params"]["vdu_vim_info"].get(
1348 ro_task
["target_id"]
1351 # Refresh VM to get new vim_info
1352 vm_to_refresh_list
= [vim_vm_id
]
1353 vim_dict
= target_vim
.refresh_vms_status(vm_to_refresh_list
)
1354 refreshed_vim_info
= vim_dict
[vim_vm_id
]
1356 if refreshed_vim_info
.get("interfaces"):
1357 for old_iface
in vdu_old_vim_info
.get("interfaces"):
1361 for iface
in refreshed_vim_info
["interfaces"]
1362 if old_iface
["vim_interface_id"]
1363 == iface
["vim_interface_id"]
1367 vim_interfaces
.append(iface
)
1369 ro_vim_item_update
= {
1370 "vim_id": vim_vm_id
,
1371 "vim_status": "ACTIVE",
1373 "created_items": created_items
,
1374 "vim_details": None,
1375 "vim_message": None,
1378 if refreshed_vim_info
and refreshed_vim_info
.get("status") not in (
1382 ro_vim_item_update
["vim_details"] = refreshed_vim_info
["vim_info"]
1385 ro_vim_item_update
["interfaces"] = vim_interfaces
1388 "task={} {} vm-migration done".format(task_id
, ro_task
["target_id"])
1391 return "DONE", ro_vim_item_update
, db_task_update
1393 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
1395 "task={} vim={} VM Migration:"
1396 " {}".format(task_id
, ro_task
["target_id"], e
)
1398 ro_vim_item_update
= {
1399 "vim_status": "VIM_ERROR",
1401 "vim_message": str(e
),
1404 return "FAILED", ro_vim_item_update
, db_task_update
1407 class VimInteractionResize(VimInteractionBase
):
1408 def exec(self
, ro_task
, task_index
, task_depends
):
1409 task
= ro_task
["tasks"][task_index
]
1410 task_id
= task
["task_id"]
1411 db_task_update
= {"retries": 0}
1413 target_flavor_uuid
= None
1415 refreshed_vim_info
= {}
1416 target_vim
= self
.my_vims
[ro_task
["target_id"]]
1419 if task
.get("params"):
1420 vim_vm_id
= task
["params"].get("vim_vm_id")
1421 flavor_dict
= task
["params"].get("flavor_dict")
1422 self
.logger
.info("flavor_dict %s", flavor_dict
)
1425 target_flavor_uuid
= target_vim
.get_flavor_id_from_data(flavor_dict
)
1426 except Exception as e
:
1427 self
.logger
.info("Cannot find any flavor matching %s.", str(e
))
1429 target_flavor_uuid
= target_vim
.new_flavor(flavor_dict
)
1430 except Exception as e
:
1431 self
.logger
.error("Error creating flavor at VIM %s.", str(e
))
1433 if target_flavor_uuid
is not None:
1434 resized_status
= target_vim
.resize_instance(
1435 vim_vm_id
, target_flavor_uuid
1439 # Refresh VM to get new vim_info
1440 vm_to_refresh_list
= [vim_vm_id
]
1441 vim_dict
= target_vim
.refresh_vms_status(vm_to_refresh_list
)
1442 refreshed_vim_info
= vim_dict
[vim_vm_id
]
1444 ro_vim_item_update
= {
1445 "vim_id": vim_vm_id
,
1446 "vim_status": "DONE",
1448 "created_items": created_items
,
1449 "vim_details": None,
1450 "vim_message": None,
1453 if refreshed_vim_info
and refreshed_vim_info
.get("status") not in (
1457 ro_vim_item_update
["vim_details"] = refreshed_vim_info
["vim_info"]
1460 "task={} {} resize done".format(task_id
, ro_task
["target_id"])
1462 return "DONE", ro_vim_item_update
, db_task_update
1463 except (vimconn
.VimConnException
, NsWorkerException
) as e
:
1465 "task={} vim={} Resize:" " {}".format(task_id
, ro_task
["target_id"], e
)
1467 ro_vim_item_update
= {
1468 "vim_status": "VIM_ERROR",
1470 "vim_message": str(e
),
1473 return "FAILED", ro_vim_item_update
, db_task_update
1476 class ConfigValidate
:
1477 def __init__(self
, config
: Dict
):
1482 # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
1484 self
.conf
["period"]["refresh_active"] >= 60
1485 or self
.conf
["period"]["refresh_active"] == -1
1487 return self
.conf
["period"]["refresh_active"]
1493 return self
.conf
["period"]["refresh_build"]
1497 return self
.conf
["period"]["refresh_image"]
1501 return self
.conf
["period"]["refresh_error"]
1504 def queue_size(self
):
1505 return self
.conf
["period"]["queue_size"]
1508 class NsWorker(threading
.Thread
):
1509 def __init__(self
, worker_index
, config
, plugins
, db
):
1511 :param worker_index: thread index
1512 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1513 :param plugins: global shared dict with the loaded plugins
1514 :param db: database class instance to use
1516 threading
.Thread
.__init
__(self
)
1517 self
.config
= config
1518 self
.plugins
= plugins
1519 self
.plugin_name
= "unknown"
1520 self
.logger
= logging
.getLogger("ro.worker{}".format(worker_index
))
1521 self
.worker_index
= worker_index
1522 # refresh periods for created items
1523 self
.refresh_config
= ConfigValidate(config
)
1524 self
.task_queue
= queue
.Queue(self
.refresh_config
.queue_size
)
1525 # targetvim: vimplugin class
1527 # targetvim: vim information from database
1530 self
.vim_targets
= []
1531 self
.my_id
= config
["process_id"] + ":" + str(worker_index
)
1534 "net": VimInteractionNet(self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
),
1535 "vdu": VimInteractionVdu(self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
),
1536 "image": VimInteractionImage(
1537 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1539 "flavor": VimInteractionFlavor(
1540 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1542 "sdn_net": VimInteractionSdnNet(
1543 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1545 "update": VimInteractionUpdateVdu(
1546 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1548 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
1549 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1551 "migrate": VimInteractionMigration(
1552 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1554 "verticalscale": VimInteractionResize(
1555 self
.db
, self
.my_vims
, self
.db_vims
, self
.logger
1558 self
.time_last_task_processed
= None
1559 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1560 self
.tasks_to_delete
= []
1561 # it is idle when there are not vim_targets associated
1563 self
.task_locked_time
= config
["global"]["task_locked_time"]
1565 def insert_task(self
, task
):
1567 self
.task_queue
.put(task
, False)
1570 raise NsWorkerException("timeout inserting a task")
1572 def terminate(self
):
1573 self
.insert_task("exit")
1575 def del_task(self
, task
):
1576 with self
.task_lock
:
1577 if task
["status"] == "SCHEDULED":
1578 task
["status"] = "SUPERSEDED"
1580 else: # task["status"] == "processing"
1581 self
.task_lock
.release()
1584 def _process_vim_config(self
, target_id
, db_vim
):
1586 Process vim config, creating vim configuration files as ca_cert
1587 :param target_id: vim/sdn/wim + id
1588 :param db_vim: Vim dictionary obtained from database
1589 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1591 if not db_vim
.get("config"):
1597 if db_vim
["config"].get("ca_cert_content"):
1598 file_name
= "{}:{}".format(target_id
, self
.worker_index
)
1602 except FileExistsError
:
1605 file_name
= file_name
+ "/ca_cert"
1607 with
open(file_name
, "w") as f
:
1608 f
.write(db_vim
["config"]["ca_cert_content"])
1609 del db_vim
["config"]["ca_cert_content"]
1610 db_vim
["config"]["ca_cert"] = file_name
1611 except Exception as e
:
1612 raise NsWorkerException(
1613 "Error writing to file '{}': {}".format(file_name
, e
)
1616 def _load_plugin(self
, name
, type="vim"):
1617 # type can be vim or sdn
1618 if "rovim_dummy" not in self
.plugins
:
1619 self
.plugins
["rovim_dummy"] = VimDummyConnector
1621 if "rosdn_dummy" not in self
.plugins
:
1622 self
.plugins
["rosdn_dummy"] = SdnDummyConnector
1624 if name
in self
.plugins
:
1625 return self
.plugins
[name
]
1628 for ep
in entry_points(group
="osm_ro{}.plugins".format(type), name
=name
):
1629 self
.plugins
[name
] = ep
.load()
1630 except Exception as e
:
1631 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name
, e
))
1633 if name
and name
not in self
.plugins
:
1634 raise NsWorkerException(
1635 "Plugin 'osm_{n}' has not been installed".format(n
=name
)
1638 return self
.plugins
[name
]
1640 def _unload_vim(self
, target_id
):
1642 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1643 :param target_id: Contains type:_id; where type can be 'vim', ...
1647 self
.db_vims
.pop(target_id
, None)
1648 self
.my_vims
.pop(target_id
, None)
1650 if target_id
in self
.vim_targets
:
1651 self
.vim_targets
.remove(target_id
)
1653 self
.logger
.info("Unloaded {}".format(target_id
))
1654 rmtree("{}:{}".format(target_id
, self
.worker_index
))
1655 except FileNotFoundError
:
1656 pass # this is raised by rmtree if folder does not exist
1657 except Exception as e
:
1658 self
.logger
.error("Cannot unload {}: {}".format(target_id
, e
))
1660 def _check_vim(self
, target_id
):
1662 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1663 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1666 target
, _
, _id
= target_id
.partition(":")
1672 loaded
= target_id
in self
.vim_targets
1682 step
= "Getting {} from db".format(target_id
)
1683 db_vim
= self
.db
.get_one(target_database
, {"_id": _id
})
1685 for op_index
, operation
in enumerate(
1686 db_vim
["_admin"].get("operations", ())
1688 if operation
["operationState"] != "PROCESSING":
1691 locked_at
= operation
.get("locked_at")
1693 if locked_at
is not None and locked_at
>= now
- self
.task_locked_time
:
1694 # some other thread is doing this operation
1698 op_text
= "_admin.operations.{}.".format(op_index
)
1700 if not self
.db
.set_one(
1704 op_text
+ "operationState": "PROCESSING",
1705 op_text
+ "locked_at": locked_at
,
1708 op_text
+ "locked_at": now
,
1709 "admin.current_operation": op_index
,
1711 fail_on_empty
=False,
1715 unset_dict
[op_text
+ "locked_at"] = None
1716 unset_dict
["current_operation"] = None
1717 step
= "Loading " + target_id
1718 error_text
= self
._load
_vim
(target_id
)
1721 step
= "Checking connectivity"
1724 self
.my_vims
[target_id
].check_vim_connectivity()
1726 self
.my_vims
[target_id
].check_credentials()
1728 update_dict
["_admin.operationalState"] = "ENABLED"
1729 update_dict
["_admin.detailed-status"] = ""
1730 unset_dict
[op_text
+ "detailed-status"] = None
1731 update_dict
[op_text
+ "operationState"] = "COMPLETED"
1735 except Exception as e
:
1736 error_text
= "{}: {}".format(step
, e
)
1737 self
.logger
.error("{} for {}: {}".format(step
, target_id
, e
))
1740 if update_dict
or unset_dict
:
1742 update_dict
[op_text
+ "operationState"] = "FAILED"
1743 update_dict
[op_text
+ "detailed-status"] = error_text
1744 unset_dict
.pop(op_text
+ "detailed-status", None)
1745 update_dict
["_admin.operationalState"] = "ERROR"
1746 update_dict
["_admin.detailed-status"] = error_text
1749 update_dict
[op_text
+ "statusEnteredTime"] = now
1753 q_filter
={"_id": _id
},
1754 update_dict
=update_dict
,
1756 fail_on_empty
=False,
1760 self
._unload
_vim
(target_id
)
1762 def _reload_vim(self
, target_id
):
1763 if target_id
in self
.vim_targets
:
1764 self
._load
_vim
(target_id
)
1766 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1767 # just remove it to force load again next time it is needed
1768 self
.db_vims
.pop(target_id
, None)
1770 def _load_vim(self
, target_id
):
1772 Load or reload a vim_account, sdn_controller or wim_account.
1773 Read content from database, load the plugin if not loaded.
1774 In case of error loading the plugin, it load a failing VIM_connector
1775 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1776 :param target_id: Contains type:_id; where type can be 'vim', ...
1777 :return: None if ok, descriptive text if error
1779 target
, _
, _id
= target_id
.partition(":")
1791 step
= "Getting {}={} from db".format(target
, _id
)
1792 # TODO process for wim, sdnc, ...
1793 vim
= self
.db
.get_one(target_database
, {"_id": _id
})
1795 # if deep_get(vim, "config", "sdn-controller"):
1796 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1797 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1799 step
= "Decrypting password"
1800 schema_version
= vim
.get("schema_version")
1801 self
.db
.encrypt_decrypt_fields(
1804 fields
=("password", "secret"),
1805 schema_version
=schema_version
,
1808 self
._process
_vim
_config
(target_id
, vim
)
1811 plugin_name
= "rovim_" + vim
["vim_type"]
1812 step
= "Loading plugin '{}'".format(plugin_name
)
1813 vim_module_conn
= self
._load
_plugin
(plugin_name
)
1814 step
= "Loading {}'".format(target_id
)
1815 self
.my_vims
[target_id
] = vim_module_conn(
1818 tenant_id
=vim
.get("vim_tenant_id"),
1819 tenant_name
=vim
.get("vim_tenant_name"),
1822 user
=vim
["vim_user"],
1823 passwd
=vim
["vim_password"],
1824 config
=vim
.get("config") or {},
1828 plugin_name
= "rosdn_" + vim
["type"]
1829 step
= "Loading plugin '{}'".format(plugin_name
)
1830 vim_module_conn
= self
._load
_plugin
(plugin_name
, "sdn")
1831 step
= "Loading {}'".format(target_id
)
1833 wim_config
= wim
.pop("config", {}) or {}
1834 wim
["uuid"] = wim
["_id"]
1835 wim
["wim_url"] = wim
["url"]
1838 wim_config
["dpid"] = wim
.pop("dpid")
1840 if wim
.get("switch_id"):
1841 wim_config
["switch_id"] = wim
.pop("switch_id")
1843 # wim, wim_account, config
1844 self
.my_vims
[target_id
] = vim_module_conn(wim
, wim
, wim_config
)
1845 self
.db_vims
[target_id
] = vim
1846 self
.error_status
= None
1849 "Connector loaded for {}, plugin={}".format(target_id
, plugin_name
)
1851 except Exception as e
:
1853 "Cannot load {} plugin={}: {} {}".format(
1854 target_id
, plugin_name
, step
, e
1858 self
.db_vims
[target_id
] = vim
or {}
1859 self
.db_vims
[target_id
] = FailingConnector(str(e
))
1860 error_status
= "{} Error: {}".format(step
, e
)
1864 if target_id
not in self
.vim_targets
:
1865 self
.vim_targets
.append(target_id
)
1867 def _get_db_task(self
):
1869 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1874 if not self
.time_last_task_processed
:
1875 self
.time_last_task_processed
= now
1880 # Log RO tasks only when loglevel is DEBUG
1881 if self.logger.getEffectiveLevel() == logging.DEBUG:
1888 + str(self.task_locked_time)
1890 + "time_last_task_processed="
1891 + str(self.time_last_task_processed)
1897 locked
= self
.db
.set_one(
1900 "target_id": self
.vim_targets
,
1901 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1902 "locked_at.lt": now
- self
.task_locked_time
,
1903 "to_check_at.lt": self
.time_last_task_processed
,
1904 "to_check_at.gt": -1,
1906 update_dict
={"locked_by": self
.my_id
, "locked_at": now
},
1907 fail_on_empty
=False,
1912 ro_task
= self
.db
.get_one(
1915 "target_id": self
.vim_targets
,
1916 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1922 if self
.time_last_task_processed
== now
:
1923 self
.time_last_task_processed
= None
1926 self
.time_last_task_processed
= now
1927 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1929 except DbException
as e
:
1930 self
.logger
.error("Database exception at _get_db_task: {}".format(e
))
1931 except Exception as e
:
1932 self
.logger
.critical(
1933 "Unexpected exception at _get_db_task: {}".format(e
), exc_info
=True
1938 def _get_db_all_tasks(self
):
1940 Read all content of table ro_tasks to log it
1944 # Checking the content of the BD:
1947 ro_task
= self
.db
.get_list("ro_tasks")
1949 self
._log
_ro
_task
(rt
, None, None, "TASK_WF", "GET_ALL_TASKS")
1952 except DbException
as e
:
1953 self
.logger
.error("Database exception at _get_db_all_tasks: {}".format(e
))
1954 except Exception as e
:
1955 self
.logger
.critical(
1956 "Unexpected exception at _get_db_all_tasks: {}".format(e
), exc_info
=True
1961 def _log_ro_task(self
, ro_task
, db_ro_task_update
, db_ro_task_delete
, mark
, event
):
1963 Generate a log with the following format:
1965 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1966 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1967 task_array_index;task_id;task_action;task_item;task_args
1971 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1972 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1973 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1974 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1975 'vim_details': None, 'vim_message': None, 'refresh_at': None};1;SCHEDULED;
1976 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1977 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1982 if ro_task
is not None and isinstance(ro_task
, dict):
1983 for t
in ro_task
["tasks"]:
1987 line
.append(ro_task
.get("_id", ""))
1988 line
.append(str(ro_task
.get("locked_at", "")))
1989 line
.append(str(ro_task
.get("modified_at", "")))
1990 line
.append(str(ro_task
.get("created_at", "")))
1991 line
.append(str(ro_task
.get("to_check_at", "")))
1992 line
.append(str(ro_task
.get("locked_by", "")))
1993 line
.append(str(ro_task
.get("target_id", "")))
1994 line
.append(str(ro_task
.get("vim_info", {}).get("refresh_at", "")))
1995 line
.append(str(ro_task
.get("vim_info", "")))
1996 line
.append(str(ro_task
.get("tasks", "")))
1997 if isinstance(t
, dict):
1998 line
.append(str(t
.get("status", "")))
1999 line
.append(str(t
.get("action_id", "")))
2001 line
.append(str(t
.get("task_id", "")))
2002 line
.append(str(t
.get("action", "")))
2003 line
.append(str(t
.get("item", "")))
2004 line
.append(str(t
.get("find_params", "")))
2005 line
.append(str(t
.get("params", "")))
2007 line
.extend([""] * 2)
2009 line
.extend([""] * 5)
2012 self
.logger
.debug(";".join(line
))
2013 elif db_ro_task_update
is not None and isinstance(db_ro_task_update
, dict):
2016 st
= "tasks.{}.status".format(i
)
2017 if st
not in db_ro_task_update
:
2022 line
.append(db_ro_task_update
.get("_id", ""))
2023 line
.append(str(db_ro_task_update
.get("locked_at", "")))
2024 line
.append(str(db_ro_task_update
.get("modified_at", "")))
2026 line
.append(str(db_ro_task_update
.get("to_check_at", "")))
2027 line
.append(str(db_ro_task_update
.get("locked_by", "")))
2029 line
.append(str(db_ro_task_update
.get("vim_info.refresh_at", "")))
2031 line
.append(str(db_ro_task_update
.get("vim_info", "")))
2032 line
.append(str(str(db_ro_task_update
).count(".status")))
2033 line
.append(db_ro_task_update
.get(st
, ""))
2036 line
.extend([""] * 3)
2038 self
.logger
.debug(";".join(line
))
2040 elif db_ro_task_delete
is not None and isinstance(db_ro_task_delete
, dict):
2044 line
.append(db_ro_task_delete
.get("_id", ""))
2046 line
.append(db_ro_task_delete
.get("modified_at", ""))
2047 line
.extend([""] * 13)
2048 self
.logger
.debug(";".join(line
))
2054 line
.extend([""] * 16)
2055 self
.logger
.debug(";".join(line
))
2057 except Exception as e
:
2058 self
.logger
.error("Error logging ro_task: {}".format(e
))
2060 def _delete_task(self
, ro_task
, task_index
, task_depends
, db_update
):
2062 Determine if this task need to be done or superseded
2065 my_task
= ro_task
["tasks"][task_index
]
2066 task_id
= my_task
["task_id"]
2067 needed_delete
= ro_task
["vim_info"]["created"] or ro_task
["vim_info"].get(
2068 "created_items", False
2071 self
.logger
.warning("Needed delete: {}".format(needed_delete
))
2072 if my_task
["status"] == "FAILED":
2073 return None, None # TODO need to be retry??
2076 for index
, task
in enumerate(ro_task
["tasks"]):
2077 if index
== task_index
or not task
:
2081 my_task
["target_record"] == task
["target_record"]
2082 and task
["action"] == "CREATE"
2085 db_update
["tasks.{}.status".format(index
)] = task
[
2088 elif task
["action"] == "CREATE" and task
["status"] not in (
2092 needed_delete
= False
2095 self
.logger
.warning(
2096 "Deleting ro_task={} task_index={}".format(ro_task
, task_index
)
2098 return self
.item2class
[my_task
["item"]].delete(ro_task
, task_index
)
2100 return "SUPERSEDED", None
2101 except Exception as e
:
2102 if not isinstance(e
, NsWorkerException
):
2103 self
.logger
.critical(
2104 "Unexpected exception at _delete_task task={}: {}".format(
2110 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e
)}
2112 def _create_task(self
, ro_task
, task_index
, task_depends
, db_update
):
2114 Determine if this task need to create something at VIM
2117 my_task
= ro_task
["tasks"][task_index
]
2118 task_id
= my_task
["task_id"]
2121 if my_task
["status"] == "FAILED":
2122 return None, None # TODO need to be retry??
2123 elif my_task
["status"] == "SCHEDULED":
2124 # check if already created by another task
2125 for index
, task
in enumerate(ro_task
["tasks"]):
2126 if index
== task_index
or not task
:
2129 if task
["action"] == "CREATE" and task
["status"] not in (
2134 return task
["status"], "COPY_VIM_INFO"
2137 task_status
, ro_vim_item_update
= self
.item2class
[my_task
["item"]].new(
2138 ro_task
, task_index
, task_depends
2140 # TODO update other CREATE tasks
2141 except Exception as e
:
2142 if not isinstance(e
, NsWorkerException
):
2144 "Error executing task={}: {}".format(task_id
, e
), exc_info
=True
2147 task_status
= "FAILED"
2148 ro_vim_item_update
= {"vim_status": "VIM_ERROR", "vim_message": str(e
)}
2149 # TODO update ro_vim_item_update
2151 return task_status
, ro_vim_item_update
2155 def _get_dependency(self
, task_id
, ro_task
=None, target_id
=None):
2157 Look for dependency task
2158 :param task_id: Can be one of
2159 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2160 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2161 3. task.task_id: "<action_id>:number"
2164 :return: database ro_task plus index of task
2167 task_id
.startswith("vim:")
2168 or task_id
.startswith("sdn:")
2169 or task_id
.startswith("wim:")
2171 target_id
, _
, task_id
= task_id
.partition(" ")
2173 if task_id
.startswith("nsrs:") or task_id
.startswith("vnfrs:"):
2174 ro_task_dependency
= self
.db
.get_one(
2176 q_filter
={"target_id": target_id
, "tasks.target_record_id": task_id
},
2177 fail_on_empty
=False,
2180 if ro_task_dependency
:
2181 for task_index
, task
in enumerate(ro_task_dependency
["tasks"]):
2182 if task
["target_record_id"] == task_id
:
2183 return ro_task_dependency
, task_index
2187 for task_index
, task
in enumerate(ro_task
["tasks"]):
2188 if task
and task
["task_id"] == task_id
:
2189 return ro_task
, task_index
2191 ro_task_dependency
= self
.db
.get_one(
2194 "tasks.ANYINDEX.task_id": task_id
,
2195 "tasks.ANYINDEX.target_record.ne": None,
2197 fail_on_empty
=False,
2200 self
.logger
.warning("ro_task_dependency={}".format(ro_task_dependency
))
2201 if ro_task_dependency
:
2202 for task_index
, task
in enumerate(ro_task_dependency
["tasks"]):
2203 if task
["task_id"] == task_id
:
2204 return ro_task_dependency
, task_index
2205 raise NsWorkerException("Cannot get depending task {}".format(task_id
))
2207 def update_vm_refresh(self
):
2208 """Enables the VM status updates if self.refresh_config.active parameter
2209 is not -1 and than updates the DB accordingly
2213 self
.logger
.debug("Checking if VM status update config")
2214 next_refresh
= time
.time()
2215 if self
.refresh_config
.active
== -1:
2218 next_refresh
+= self
.refresh_config
.active
2220 if next_refresh
!= -1:
2221 db_ro_task_update
= {}
2223 next_check_at
= now
+ (24 * 60 * 60)
2224 next_check_at
= min(next_check_at
, next_refresh
)
2225 db_ro_task_update
["vim_info.refresh_at"] = next_refresh
2226 db_ro_task_update
["to_check_at"] = next_check_at
2229 "Finding tasks which to be updated to enable VM status updates"
2231 refresh_tasks
= self
.db
.get_list(
2234 "tasks.status": "DONE",
2235 "to_check_at.lt": 0,
2238 self
.logger
.debug("Updating tasks to change the to_check_at status")
2239 for task
in refresh_tasks
:
2246 update_dict
=db_ro_task_update
,
2250 except Exception as e
:
2251 self
.logger
.error(f
"Error updating tasks to enable VM status updates: {e}")
2253 def _process_pending_tasks(self
, ro_task
):
2254 ro_task_id
= ro_task
["_id"]
2257 next_check_at
= now
+ (24 * 60 * 60)
2258 db_ro_task_update
= {}
2260 def _update_refresh(new_status
):
2261 # compute next_refresh
2263 nonlocal next_check_at
2264 nonlocal db_ro_task_update
2267 next_refresh
= time
.time()
2269 if task
["item"] in ("image", "flavor"):
2270 next_refresh
+= self
.refresh_config
.image
2271 elif new_status
== "BUILD":
2272 next_refresh
+= self
.refresh_config
.build
2273 elif new_status
== "DONE":
2274 if self
.refresh_config
.active
== -1:
2277 next_refresh
+= self
.refresh_config
.active
2279 next_refresh
+= self
.refresh_config
.error
2281 next_check_at
= min(next_check_at
, next_refresh
)
2282 db_ro_task_update
["vim_info.refresh_at"] = next_refresh
2283 ro_task
["vim_info"]["refresh_at"] = next_refresh
2287 # Log RO tasks only when loglevel is DEBUG
2288 if self.logger.getEffectiveLevel() == logging.DEBUG:
2289 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2291 # Check if vim status refresh is enabled again
2292 self
.update_vm_refresh()
2293 # 0: get task_status_create
2295 task_status_create
= None
2299 for t
in ro_task
["tasks"]
2301 and t
["action"] == "CREATE"
2302 and t
["status"] in ("BUILD", "DONE")
2308 task_status_create
= task_create
["status"]
2310 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2311 for task_action
in ("DELETE", "CREATE", "EXEC"):
2312 db_vim_update
= None
2315 for task_index
, task
in enumerate(ro_task
["tasks"]):
2317 continue # task deleted
2320 target_update
= None
2324 task_action
in ("DELETE", "EXEC")
2325 and task
["status"] not in ("SCHEDULED", "BUILD")
2327 or task
["action"] != task_action
2329 task_action
== "CREATE"
2330 and task
["status"] in ("FINISHED", "SUPERSEDED")
2335 task_path
= "tasks.{}.status".format(task_index
)
2337 db_vim_info_update
= None
2339 if task
["status"] == "SCHEDULED":
2340 # check if tasks that this depends on have been completed
2341 dependency_not_completed
= False
2343 for dependency_task_id
in task
.get("depends_on") or ():
2346 dependency_task_index
,
2347 ) = self
._get
_dependency
(
2348 dependency_task_id
, target_id
=ro_task
["target_id"]
2350 dependency_task
= dependency_ro_task
["tasks"][
2351 dependency_task_index
2353 self
.logger
.warning(
2354 "dependency_ro_task={} dependency_task_index={}".format(
2355 dependency_ro_task
, dependency_task_index
2359 if dependency_task
["status"] == "SCHEDULED":
2360 dependency_not_completed
= True
2361 next_check_at
= min(
2362 next_check_at
, dependency_ro_task
["to_check_at"]
2364 # must allow dependent task to be processed first
2365 # to do this set time after last_task_processed
2366 next_check_at
= max(
2367 self
.time_last_task_processed
, next_check_at
2370 elif dependency_task
["status"] == "FAILED":
2371 error_text
= "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2374 dependency_task
["action"],
2375 dependency_task
["item"],
2377 dependency_ro_task
["vim_info"].get(
2382 "task={} {}".format(task
["task_id"], error_text
)
2384 raise NsWorkerException(error_text
)
2386 task_depends
[dependency_task_id
] = dependency_ro_task
[
2390 "TASK-{}".format(dependency_task_id
)
2391 ] = dependency_ro_task
["vim_info"]["vim_id"]
2393 if dependency_not_completed
:
2394 self
.logger
.warning(
2395 "DEPENDENCY NOT COMPLETED {}".format(
2396 dependency_ro_task
["vim_info"]["vim_id"]
2399 # TODO set at vim_info.vim_details that it is waiting
2402 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2403 # the task of renew this locking. It will update database locket_at periodically
2405 lock_object
= LockRenew
.add_lock_object(
2406 "ro_tasks", ro_task
, self
2409 if task
["action"] == "DELETE":
2410 (new_status
, db_vim_info_update
,) = self
._delete
_task
(
2411 ro_task
, task_index
, task_depends
, db_ro_task_update
2414 "FINISHED" if new_status
== "DONE" else new_status
2416 # ^with FINISHED instead of DONE it will not be refreshing
2418 if new_status
in ("FINISHED", "SUPERSEDED"):
2419 target_update
= "DELETE"
2420 elif task
["action"] == "EXEC":
2425 ) = self
.item2class
[task
["item"]].exec(
2426 ro_task
, task_index
, task_depends
2429 "FINISHED" if new_status
== "DONE" else new_status
2431 # ^with FINISHED instead of DONE it will not be refreshing
2434 # load into database the modified db_task_update "retries" and "next_retry"
2435 if db_task_update
.get("retries"):
2437 "tasks.{}.retries".format(task_index
)
2438 ] = db_task_update
["retries"]
2440 next_check_at
= time
.time() + db_task_update
.get(
2443 target_update
= None
2444 elif task
["action"] == "CREATE":
2445 if task
["status"] == "SCHEDULED":
2446 if task_status_create
:
2447 new_status
= task_status_create
2448 target_update
= "COPY_VIM_INFO"
2450 new_status
, db_vim_info_update
= self
.item2class
[
2452 ].new(ro_task
, task_index
, task_depends
)
2453 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2454 _update_refresh(new_status
)
2456 refresh_at
= ro_task
["vim_info"]["refresh_at"]
2457 if refresh_at
and refresh_at
!= -1 and now
> refresh_at
:
2458 (new_status
, db_vim_info_update
,) = self
.item2class
[
2461 _update_refresh(new_status
)
2463 # The refresh is updated to avoid set the value of "refresh_at" to
2464 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2465 # because it can happen that in this case the task is never processed
2466 _update_refresh(task
["status"])
2468 except Exception as e
:
2469 new_status
= "FAILED"
2470 db_vim_info_update
= {
2471 "vim_status": "VIM_ERROR",
2472 "vim_message": str(e
),
2476 e
, (NsWorkerException
, vimconn
.VimConnException
)
2479 "Unexpected exception at _delete_task task={}: {}".format(
2486 if db_vim_info_update
:
2487 db_vim_update
= db_vim_info_update
.copy()
2488 db_ro_task_update
.update(
2491 for k
, v
in db_vim_info_update
.items()
2494 ro_task
["vim_info"].update(db_vim_info_update
)
2497 if task_action
== "CREATE":
2498 task_status_create
= new_status
2499 db_ro_task_update
[task_path
] = new_status
2501 if target_update
or db_vim_update
:
2502 if target_update
== "DELETE":
2503 self
._update
_target
(task
, None)
2504 elif target_update
== "COPY_VIM_INFO":
2505 self
._update
_target
(task
, ro_task
["vim_info"])
2507 self
._update
_target
(task
, db_vim_update
)
2509 except Exception as e
:
2511 isinstance(e
, DbException
)
2512 and e
.http_code
== HTTPStatus
.NOT_FOUND
2514 # if the vnfrs or nsrs has been removed from database, this task must be removed
2516 "marking to delete task={}".format(task
["task_id"])
2518 self
.tasks_to_delete
.append(task
)
2521 "Unexpected exception at _update_target task={}: {}".format(
2527 locked_at
= ro_task
["locked_at"]
2531 lock_object
["locked_at"],
2532 lock_object
["locked_at"] + self
.task_locked_time
,
2534 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2535 # contain exactly locked_at + self.task_locked_time
2536 LockRenew
.remove_lock_object(lock_object
)
2539 "_id": ro_task
["_id"],
2540 "to_check_at": ro_task
["to_check_at"],
2541 "locked_at": locked_at
,
2543 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2544 # outside this task (by ro_nbi) do not update it
2545 db_ro_task_update
["locked_by"] = None
2546 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2547 db_ro_task_update
["locked_at"] = int(now
- self
.task_locked_time
)
2548 db_ro_task_update
["modified_at"] = now
2549 db_ro_task_update
["to_check_at"] = next_check_at
2552 # Log RO tasks only when loglevel is DEBUG
2553 if self.logger.getEffectiveLevel() == logging.DEBUG:
2554 db_ro_task_update_log = db_ro_task_update.copy()
2555 db_ro_task_update_log["_id"] = q_filter["_id"]
2556 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2559 if not self
.db
.set_one(
2561 update_dict
=db_ro_task_update
,
2563 fail_on_empty
=False,
2565 del db_ro_task_update
["to_check_at"]
2566 del q_filter
["to_check_at"]
2568 # Log RO tasks only when loglevel is DEBUG
2569 if self.logger.getEffectiveLevel() == logging.DEBUG:
2572 db_ro_task_update_log,
2575 "SET_TASK " + str(q_filter),
2581 update_dict
=db_ro_task_update
,
2584 except DbException
as e
:
2586 "ro_task={} Error updating database {}".format(ro_task_id
, e
)
2588 except Exception as e
:
2590 "Error executing ro_task={}: {}".format(ro_task_id
, e
), exc_info
=True
2593 def _update_target(self
, task
, ro_vim_item_update
):
2594 table
, _
, temp
= task
["target_record"].partition(":")
2595 _id
, _
, path_vim_status
= temp
.partition(":")
2596 path_item
= path_vim_status
[: path_vim_status
.rfind(".")]
2597 path_item
= path_item
[: path_item
.rfind(".")]
2598 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2599 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2601 if ro_vim_item_update
:
2603 path_vim_status
+ "." + k
: v
2604 for k
, v
in ro_vim_item_update
.items()
2613 "interfaces_backup",
2617 if path_vim_status
.startswith("vdur."):
2618 # for backward compatibility, add vdur.name apart from vdur.vim_name
2619 if ro_vim_item_update
.get("vim_name"):
2620 update_dict
[path_item
+ ".name"] = ro_vim_item_update
["vim_name"]
2622 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2623 if ro_vim_item_update
.get("vim_id"):
2624 update_dict
[path_item
+ ".vim-id"] = ro_vim_item_update
["vim_id"]
2626 # update general status
2627 if ro_vim_item_update
.get("vim_status"):
2628 update_dict
[path_item
+ ".status"] = ro_vim_item_update
[
2632 if ro_vim_item_update
.get("interfaces"):
2633 path_interfaces
= path_item
+ ".interfaces"
2635 for i
, iface
in enumerate(ro_vim_item_update
.get("interfaces")):
2639 path_interfaces
+ ".{}.".format(i
) + k
: v
2640 for k
, v
in iface
.items()
2641 if k
in ("vlan", "compute_node", "pci")
2645 # put ip_address and mac_address with ip-address and mac-address
2646 if iface
.get("ip_address"):
2648 path_interfaces
+ ".{}.".format(i
) + "ip-address"
2649 ] = iface
["ip_address"]
2651 if iface
.get("mac_address"):
2653 path_interfaces
+ ".{}.".format(i
) + "mac-address"
2654 ] = iface
["mac_address"]
2656 if iface
.get("mgmt_vnf_interface") and iface
.get("ip_address"):
2657 update_dict
["ip-address"] = iface
.get("ip_address").split(
2661 if iface
.get("mgmt_vdu_interface") and iface
.get("ip_address"):
2662 update_dict
[path_item
+ ".ip-address"] = iface
.get(
2666 self
.db
.set_one(table
, q_filter
={"_id": _id
}, update_dict
=update_dict
)
2668 # If interfaces exists, it backups VDU interfaces in the DB for healing operations
2669 if ro_vim_item_update
.get("interfaces"):
2670 search_key
= path_vim_status
+ ".interfaces"
2671 if update_dict
.get(search_key
):
2672 interfaces_backup_update
= {
2673 path_vim_status
+ ".interfaces_backup": update_dict
[search_key
]
2678 q_filter
={"_id": _id
},
2679 update_dict
=interfaces_backup_update
,
2683 update_dict
= {path_item
+ ".status": "DELETED"}
2686 q_filter
={"_id": _id
},
2687 update_dict
=update_dict
,
2688 unset
={path_vim_status
: None},
2691 def _process_delete_db_tasks(self
):
2693 Delete task from database because vnfrs or nsrs or both have been deleted
2694 :return: None. Uses and modify self.tasks_to_delete
2696 while self
.tasks_to_delete
:
2697 task
= self
.tasks_to_delete
[0]
2698 vnfrs_deleted
= None
2699 nsr_id
= task
["nsr_id"]
2701 if task
["target_record"].startswith("vnfrs:"):
2702 # check if nsrs is present
2703 if self
.db
.get_one("nsrs", {"_id": nsr_id
}, fail_on_empty
=False):
2704 vnfrs_deleted
= task
["target_record"].split(":")[1]
2707 self
.delete_db_tasks(self
.db
, nsr_id
, vnfrs_deleted
)
2708 except Exception as e
:
2710 "Error deleting task={}: {}".format(task
["task_id"], e
)
2712 self
.tasks_to_delete
.pop(0)
2715 def delete_db_tasks(db
, nsr_id
, vnfrs_deleted
):
2717 Static method because it is called from osm_ng_ro.ns
2718 :param db: instance of database to use
2719 :param nsr_id: affected nsrs id
2720 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2721 :return: None, exception is fails
2724 for retry
in range(retries
):
2725 ro_tasks
= db
.get_list("ro_tasks", {"tasks.nsr_id": nsr_id
})
2729 for ro_task
in ro_tasks
:
2731 to_delete_ro_task
= True
2733 for index
, task
in enumerate(ro_task
["tasks"]):
2736 elif (not vnfrs_deleted
and task
["nsr_id"] == nsr_id
) or (
2738 and task
["target_record"].startswith("vnfrs:" + vnfrs_deleted
)
2740 db_update
["tasks.{}".format(index
)] = None
2742 # used by other nsr, ro_task cannot be deleted
2743 to_delete_ro_task
= False
2745 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2746 if to_delete_ro_task
:
2750 "_id": ro_task
["_id"],
2751 "modified_at": ro_task
["modified_at"],
2753 fail_on_empty
=False,
2757 db_update
["modified_at"] = now
2761 "_id": ro_task
["_id"],
2762 "modified_at": ro_task
["modified_at"],
2764 update_dict
=db_update
,
2765 fail_on_empty
=False,
2771 raise NsWorkerException("Exceeded {} retries".format(retries
))
2775 self
.logger
.info("Starting")
2777 # step 1: get commands from queue
2779 if self
.vim_targets
:
2780 task
= self
.task_queue
.get(block
=False)
2783 self
.logger
.debug("enters in idle state")
2785 task
= self
.task_queue
.get(block
=True)
2788 if task
[0] == "terminate":
2790 elif task
[0] == "load_vim":
2791 self
.logger
.info("order to load vim {}".format(task
[1]))
2792 self
._load
_vim
(task
[1])
2793 elif task
[0] == "unload_vim":
2794 self
.logger
.info("order to unload vim {}".format(task
[1]))
2795 self
._unload
_vim
(task
[1])
2796 elif task
[0] == "reload_vim":
2797 self
._reload
_vim
(task
[1])
2798 elif task
[0] == "check_vim":
2799 self
.logger
.info("order to check vim {}".format(task
[1]))
2800 self
._check
_vim
(task
[1])
2802 except Exception as e
:
2803 if isinstance(e
, queue
.Empty
):
2806 self
.logger
.critical(
2807 "Error processing task: {}".format(e
), exc_info
=True
2810 # step 2: process pending_tasks, delete not needed tasks
2812 if self
.tasks_to_delete
:
2813 self
._process
_delete
_db
_tasks
()
2816 # Log RO tasks only when loglevel is DEBUG
2817 if self.logger.getEffectiveLevel() == logging.DEBUG:
2818 _ = self._get_db_all_tasks()
2820 ro_task
= self
._get
_db
_task
()
2822 self
.logger
.warning("Task to process: {}".format(ro_task
))
2824 self
._process
_pending
_tasks
(ro_task
)
2828 except Exception as e
:
2829 self
.logger
.critical(
2830 "Unexpected exception at run: " + str(e
), exc_info
=True
2833 self
.logger
.info("Finishing")