1 # -*- coding: utf-8 -*-
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
"""
This is a thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
The tasks are stored in the database, in the table ro_tasks.
A single ro_task refers to a VIM element (flavor, image, network, ...).
A ro_task can contain several 'tasks', each one with a target where the results are stored.
"""
27 from copy
import deepcopy
28 from http
import HTTPStatus
30 from os
import makedirs
36 from typing
import Dict
37 from unittest
.mock
import Mock
39 from importlib_metadata
import entry_points
40 from osm_common
.dbbase
import DbException
41 from osm_ng_ro
.vim_admin
import LockRenew
42 from osm_ro_plugin
import sdnconn
43 from osm_ro_plugin
import vimconn
44 from osm_ro_plugin
.sdn_dummy
import SdnDummyConnector
45 from osm_ro_plugin
.vim_dummy
import VimDummyConnector
48 __author__
= "Alfonso Tierno"
49 __date__
= "$28-Sep-2017 12:07:15$"
def deep_get(target_dict, *args, **kwargs):
    """
    Get a value from target_dict by walking a chain of nested keys.

    Example: target_dict={a: {b: 5}}; keys (a, b) return 5; both keys (a, b, c)
    and keys (f, h) return None.

    :param target_dict: dictionary to be read
    :param args: sequence of keys to follow, outermost first
    :param kwargs: may only contain default=value, returned when a key is not
        present in the nested dictionary
    :return: The wanted value if it exists, otherwise the default (None if no
        default was given). With no keys, target_dict itself is returned.
    """
    for key in args:
        # Stop as soon as the current level is not a dict or lacks the key.
        if not isinstance(target_dict, dict) or key not in target_dict:
            return kwargs.get("default")

        target_dict = target_dict[key]

    return target_dict
class NsWorkerException(Exception):
    """Base exception raised by the worker code in this module."""
class FailingConnector:
    """Stand-in connector whose public methods all raise a connector error.

    Every public attribute name found on ``vimconn.VimConnector`` and on
    ``sdnconn.SdnConnectorBase`` is replaced on the instance by a ``Mock`` whose
    side effect raises the matching connector exception with ``error_msg``.
    """

    def __init__(self, error_msg):
        """
        :param error_msg: text carried by the exceptions raised by every method
        """
        self.error_msg = error_msg

        for method in dir(vimconn.VimConnector):
            # Leave private/dunder names untouched.
            if method[0] != "_":
                setattr(
                    self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
                )

        for method in dir(sdnconn.SdnConnectorBase):
            if method[0] != "_":
                setattr(
                    self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
                )
class NsWorkerExceptionNotFound(NsWorkerException):
    """Raised when a requested VIM element cannot be found."""
class VimInteractionBase:
    """Base class to call VIM/SDN for creating, deleting and refreshing networks, VMs, flavors, ...
    It implements methods that do nothing and return ok"""

    def __init__(self, db, my_vims, db_vims, logger):
        """
        :param db: database handle (subclasses use it, e.g. for ``decrypt``)
        :param my_vims: mapping target_id -> connector instance
        :param db_vims: mapping target_id -> VIM database record
        :param logger: logger used by the interaction methods
        """
        self.db = db
        self.logger = logger
        self.my_vims = my_vims
        self.db_vims = db_vims

    def new(self, ro_task, task_index, task_depends):
        """Default creation: does nothing.

        :return: tuple ("BUILD", {}) - task status and empty vim_info update
        """
        return "BUILD", {}

    def refresh(self, ro_task):
        """skip calling VIM to get image, flavor status. Assumes ok

        :return: ("FAILED", {}) when the stored vim_status is "VIM_ERROR",
            ("DONE", {}) otherwise
        """
        if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
            return "FAILED", {}

        return "DONE", {}

    def delete(self, ro_task, task_index):
        """skip calling VIM to delete image. Assumes ok

        :return: tuple ("DONE", {})
        """
        return "DONE", {}

    def exec(self, ro_task, task_index, task_depends):
        """Default action execution: does nothing.

        :return: tuple ("DONE", None, None) - status, vim_info update, task update
        """
        return "DONE", None, None
class VimInteractionNet(VimInteractionBase):
    """VIM interaction for networks: find/create, refresh status and delete."""

    def new(self, ro_task, task_index, task_depends):
        """Find an existing VIM network or create a new one.

        When the task carries ``find_params`` the network is looked up with a
        filter built from ``filter_dict``, from the VIM's configured management
        network (id or name), or from the task name. Otherwise the network is
        created from ``task["params"]``.

        :param ro_task: ro_task record containing "tasks" and "target_id"
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: unused here
        :return: ("BUILD", vim_info update) on success,
            ("FAILED", vim_info update) on connector/worker errors
        """
        vim_net_id = None
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        mgmtnet = False
        mgmtnet_defined_in_vim = False

        try:
            # FIND
            if task.get("find_params"):
                # if management, get configuration of VIM
                if task["find_params"].get("filter_dict"):
                    vim_filter = task["find_params"]["filter_dict"]
                # management network
                elif task["find_params"].get("mgmt"):
                    mgmtnet = True
                    if deep_get(
                        self.db_vims[ro_task["target_id"]],
                        "config",
                        "management_network_id",
                    ):
                        mgmtnet_defined_in_vim = True
                        vim_filter = {
                            "id": self.db_vims[ro_task["target_id"]]["config"][
                                "management_network_id"
                            ]
                        }
                    elif deep_get(
                        self.db_vims[ro_task["target_id"]],
                        "config",
                        "management_network_name",
                    ):
                        mgmtnet_defined_in_vim = True
                        vim_filter = {
                            "name": self.db_vims[ro_task["target_id"]]["config"][
                                "management_network_name"
                            ]
                        }
                    else:
                        vim_filter = {"name": task["find_params"]["name"]}
                else:
                    raise NsWorkerExceptionNotFound(
                        "Invalid find_params for new_net {}".format(task["find_params"])
                    )

                vim_nets = target_vim.get_network_list(vim_filter)
                if not vim_nets and not task.get("params"):
                    # If there is mgmt-network in the descriptor,
                    # there is no mapping of that network to a VIM network in the descriptor,
                    # also there is no mapping in the "--config" parameter or at VIM creation;
                    # that mgmt-network will be created.
                    if mgmtnet and not mgmtnet_defined_in_vim:
                        net_name = (
                            vim_filter.get("name")
                            if vim_filter.get("name")
                            else vim_filter.get("id")[:16]
                        )
                        vim_net_id, created_items = target_vim.new_network(
                            net_name, None
                        )
                        self.logger.debug(
                            "Created mgmt network vim_net_id: {}".format(vim_net_id)
                        )
                        created = True
                    else:
                        raise NsWorkerExceptionNotFound(
                            "Network not found with this criteria: '{}'".format(
                                task.get("find_params")
                            )
                        )
                elif len(vim_nets) > 1:
                    raise NsWorkerException(
                        "More than one network found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )

                if vim_nets:
                    vim_net_id = vim_nets[0]["id"]
            else:
                # CREATE
                params = task["params"]
                vim_net_id, created_items = target_vim.new_network(**params)
                created = True

            ro_vim_item_update = {
                "vim_id": vim_net_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, ro_task["target_id"], vim_net_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def refresh(self, ro_task):
        """Call VIM to get network status

        :return: (task_status, vim_info update) where task_status is "DONE",
            "BUILD" or "FAILED" and the update holds only the changed fields
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]
        net_to_refresh_list = [vim_id]

        try:
            vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        # Record only the fields that differ from what is already stored.
        ro_vim_item_update = {}
        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    (
                        ro_vim_item_update.get("vim_message")
                        if ro_vim_item_update.get("vim_status") != "ACTIVE"
                        else ""
                    ),
                )
            )

        return task_status, ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the network (and its created items) at the VIM.

        A ``VimConnNotFoundException`` is treated as success ("already deleted").

        :return: ("DONE", vim_info update) on success,
            ("FAILED", vim_info update) on other connector errors
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        net_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if net_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_network(
                    net_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id,
                ro_task["target_id"],
                net_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
class VimInteractionClassification(VimInteractionBase):
    """VIM interaction for flow classifications: create and delete."""

    def new(self, ro_task, task_index, task_depends):
        """Create a classification at the VIM from ``task["params"]``.

        When ``logical_source_port`` starts with "TASK-", it is resolved through
        ``task_depends`` to a VM id and replaced by the ``vim_interface_id`` of
        that VM's interface at ``logical_source_port_index``.

        :return: ("DONE", vim_info update) on success,
            ("FAILED", vim_info update) on connector/worker errors
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            created = True
            params = task["params"]
            params_copy = deepcopy(params)

            name = params_copy.pop("name")
            logical_source_port_index = int(
                params_copy.pop("logical_source_port_index")
            )
            logical_source_port = params_copy["logical_source_port"]

            if logical_source_port.startswith("TASK-"):
                vm_id = task_depends[logical_source_port]
                params_copy["logical_source_port"] = target_vim.refresh_vms_status(
                    [vm_id]
                )[vm_id]["interfaces"][logical_source_port_index]["vim_interface_id"]

            vim_classification_id = target_vim.new_classification(
                name, "legacy_flow_classifier", params_copy
            )

            ro_vim_item_update = {
                "vim_id": vim_classification_id,
                "vim_status": "DONE",
                "created": created,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} created={}".format(task_id, ro_task["target_id"], created)
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the classification at the VIM.

        A ``VimConnNotFoundException`` is treated as success ("already deleted").

        :return: ("DONE", vim_info update) on success,
            ("FAILED", vim_info update) on other connector errors
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        classification_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if classification_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_classification(classification_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-classification={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], classification_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-classification={} {}".format(
                task_id,
                ro_task["target_id"],
                classification_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
class VimInteractionSfi(VimInteractionBase):
    """VIM interaction for service function instances (SFI): create and delete."""

    def new(self, ro_task, task_index, task_depends):
        """Create an SFI at the VIM from ``task["params"]``.

        Ports given as "TASK-..." references are resolved to the
        ``vim_interface_id`` of the interface at the given port index of the VM
        obtained from ``task_depends``.

        :return: ("DONE", vim_info update) on success,
            ("FAILED", vim_info update) on connector/worker errors
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            created = True
            params = task["params"]
            params_copy = deepcopy(params)
            name = params_copy["name"]
            ingress_port = params_copy["ingress_port"]
            egress_port = params_copy["egress_port"]
            ingress_port_index = params_copy["ingress_port_index"]
            egress_port_index = params_copy["egress_port_index"]

            ingress_port_id = ingress_port
            egress_port_id = egress_port

            # NOTE(review): this lookup runs even when ingress_port is not a
            # "TASK-" reference, and the same vm_id is reused for the egress
            # port - confirm callers always pass a task reference here.
            vm_id = task_depends[ingress_port]

            if ingress_port.startswith("TASK-"):
                ingress_port_id = target_vim.refresh_vms_status([vm_id])[vm_id][
                    "interfaces"
                ][ingress_port_index]["vim_interface_id"]

            if ingress_port == egress_port:
                egress_port_id = ingress_port_id
            else:
                if egress_port.startswith("TASK-"):
                    egress_port_id = target_vim.refresh_vms_status([vm_id])[vm_id][
                        "interfaces"
                    ][egress_port_index]["vim_interface_id"]

            ingress_port_id_list = [ingress_port_id]
            egress_port_id_list = [egress_port_id]

            vim_sfi_id = target_vim.new_sfi(
                name, ingress_port_id_list, egress_port_id_list, sfc_encap=False
            )

            ro_vim_item_update = {
                "vim_id": vim_sfi_id,
                "vim_status": "DONE",
                "created": created,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} created={}".format(task_id, ro_task["target_id"], created)
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the SFI at the VIM.

        A ``VimConnNotFoundException`` is treated as success ("already deleted").

        :return: ("DONE", vim_info update) on success,
            ("FAILED", vim_info update) on other connector errors
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        sfi_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if sfi_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_sfi(sfi_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-sfi={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], sfi_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-sfi={} {}".format(
                task_id,
                ro_task["target_id"],
                sfi_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
class VimInteractionSf(VimInteractionBase):
    """VIM interaction for service functions (SF): create and delete."""

    def new(self, ro_task, task_index, task_depends):
        """Create an SF at the VIM from the SFIs listed in ``task["params"]``.

        Each entry of ``sfis`` starting with "TASK-" is replaced by the value
        stored for it in ``task_depends``; other entries are used as given.

        :return: ("DONE", vim_info update) on success,
            ("FAILED", vim_info update) on connector/worker errors
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            created = True
            params = task["params"]
            params_copy = deepcopy(params)
            name = params_copy["name"]
            sfi_list = params_copy["sfis"]
            sfi_id_list = []

            for sfi in sfi_list:
                sfi_id = task_depends[sfi] if sfi.startswith("TASK-") else sfi
                sfi_id_list.append(sfi_id)

            vim_sf_id = target_vim.new_sf(name, sfi_id_list, sfc_encap=False)

            ro_vim_item_update = {
                "vim_id": vim_sf_id,
                "vim_status": "DONE",
                "created": created,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} created={}".format(task_id, ro_task["target_id"], created)
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the SF at the VIM.

        A ``VimConnNotFoundException`` is treated as success ("already deleted").

        :return: ("DONE", vim_info update) on success,
            ("FAILED", vim_info update) on other connector errors
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        sf_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if sf_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_sf(sf_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-sf={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], sf_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-sf={} {}".format(
                task_id,
                ro_task["target_id"],
                sf_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
class VimInteractionSfp(VimInteractionBase):
    """VIM interaction for service function paths (SFP): create and delete."""

    def new(self, ro_task, task_index, task_depends):
        """Create an SFP at the VIM from ``task["params"]``.

        Entries of ``classifications`` and ``sfs`` starting with "TASK-" are
        replaced by the value stored for them in ``task_depends``.

        :return: ("DONE", vim_info update) on success,
            ("FAILED", vim_info update) on connector/worker errors
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            created = True
            params = task["params"]
            params_copy = deepcopy(params)
            name = params_copy["name"]
            sf_list = params_copy["sfs"]
            classification_list = params_copy["classifications"]

            classification_id_list = []
            sf_id_list = []

            for classification in classification_list:
                classi_id = (
                    task_depends[classification]
                    if classification.startswith("TASK-")
                    else classification
                )
                classification_id_list.append(classi_id)

            for sf in sf_list:
                sf_id = task_depends[sf] if sf.startswith("TASK-") else sf
                sf_id_list.append(sf_id)

            vim_sfp_id = target_vim.new_sfp(
                name, classification_id_list, sf_id_list, sfc_encap=False
            )

            ro_vim_item_update = {
                "vim_id": vim_sfp_id,
                "vim_status": "DONE",
                "created": created,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} created={}".format(task_id, ro_task["target_id"], created)
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the SFP at the VIM.

        A ``VimConnNotFoundException`` is treated as success ("already deleted").

        :return: ("DONE", vim_info update) on success,
            ("FAILED", vim_info update) on other connector errors
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        sfp_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if sfp_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_sfp(sfp_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-sfp={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], sfp_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-sfp={} {}".format(
                task_id,
                ro_task["target_id"],
                sfp_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
class VimInteractionVdu(VimInteractionBase):
    """VIM interaction for VDUs (VMs): create, delete, refresh and key injection."""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """Create a VM at the VIM from ``task["params"]``.

        "TASK-..." references in the net list, image id, flavor id and affinity
        group list are replaced by the ids stored in ``task_depends``.

        :return: ("BUILD", vim_info update) on success,
            ("FAILED", vim_info update) on connector/worker errors
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]
        try:
            created = True
            params = task["params"]
            params_copy = deepcopy(params)
            net_list = params_copy["net_list"]

            for net in net_list:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            affinity_group_list = params_copy["affinity_group_list"]
            for affinity_group in affinity_group_list:
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        raise NsWorkerException(
                            "found for {}".format(affinity_group["affinity_group_id"])
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id
            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            # add to created items previous_created_volumes (healing)
            if task.get("previous_created_volumes"):
                for k, v in task["previous_created_volumes"].items():
                    created_items[k] = v

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
                "interfaces_backup": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the VM (and its created items) at the VIM.

        A ``VimConnNotFoundException`` is treated as success ("already deleted").

        :return: ("DONE", vim_info update) on success,
            ("FAILED", vim_info update) on other connector errors
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            self.logger.debug(
                "delete_vminstance: vm_vim_id={} created_items={}".format(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
            )
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id,
                    ro_task["vim_info"]["created_items"],
                    ro_task["vim_info"].get("volumes_to_hold", []),
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def refresh(self, ro_task):
        """Call VIM to get vm status

        :return: (None, None) when the ro_task has no vim_id; otherwise
            (task_status, vim_info update) where task_status is "DONE", "BUILD"
            or "FAILED" and the update holds only the changed fields
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception as vim_info_error:
                # Best effort: a parsing failure is logged and otherwise ignored.
                self.logger.exception(
                    f"{vim_info_error} occured while getting the vim_info from yaml"
                )
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                # NOTE(review): iface is None when the VIM reports no interface
                # with this id; the None is still appended to keep positions.
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                # if iface:
                #     iface.pop("vim_info", None)
                vim_interfaces.append(iface)

        # NOTE(review): next() has no default, so this raises StopIteration when
        # no unfinished CREATE task exists - confirm that cannot happen here.
        task_create = next(
            t
            for t in ro_task["tasks"]
            if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
        )
        if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
            vim_interfaces[task_create["mgmt_vnf_interface"]][
                "mgmt_vnf_interface"
            ] = True

        mgmt_vdu_iface = task_create.get(
            "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
        )
        if vim_interfaces:
            vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True

        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    (
                        ro_vim_item_update.get("vim_message")
                        if ro_vim_item_update.get("vim_status") != "ACTIVE"
                        else ""
                    ),
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """Inject an ssh key into the VM through the VIM connector.

        The private key from the task params is decrypted with ``self.db.decrypt``
        and passed to ``inject_user_key`` as ``ro_key``.

        :return: ("DONE", None, task update) on success;
            ("BUILD", None, retry info) while the retry count is below
            ``max_retries_inject_ssh_key``; ("FAILED", vim_info update,
            task update) once retries are exhausted
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params = task["params"]
            params_copy = deepcopy(params)
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                None,
                db_task_update,
            )  # params_copy["key"]
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            self.logger.debug(traceback.format_exc())
            if retries < self.max_retries_inject_ssh_key:
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_message": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update
class VimInteractionImage(VimInteractionBase):
    """VIM interaction for images: look up an existing image."""

    def new(self, ro_task, task_index, task_depends):
        """Find the VIM image matching the task's ``find_params``.

        Exactly one image must match the ``filter_dict``; zero or several
        matches are reported as failures. Without ``find_params`` the vim_id is
        left as an empty string.

        :return: ("DONE", vim_info update) on success,
            ("FAILED", vim_info update) on connector/worker errors
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # FIND
            vim_image_id = ""
            if task.get("find_params"):
                vim_images = target_vim.get_image_list(
                    task["find_params"].get("filter_dict", {})
                )

                if not vim_images:
                    raise NsWorkerExceptionNotFound(
                        "Image not found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )
                elif len(vim_images) > 1:
                    raise NsWorkerException(
                        "More than one image found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )
                else:
                    vim_image_id = vim_images[0]["id"]

            ro_vim_item_update = {
                "vim_id": vim_image_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, ro_task["target_id"], vim_image_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (NsWorkerException, vimconn.VimConnException) as e:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
class VimInteractionSharedVolume(VimInteractionBase):
    """VIM interaction for shared volumes: create and delete."""

    def delete(self, ro_task, task_index):
        """Delete the shared volume at the VIM unless it is marked "keep".

        A ``VimConnNotFoundException`` is treated as success ("already deleted").

        :return: ("DONE", vim_info update) on success or when the volume is
            kept, ("FAILED", vim_info update) on other connector errors
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        shared_volume_vim_id = ro_task["vim_info"]["vim_id"]
        created_items = ro_task["vim_info"]["created_items"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }
        # Guard against the volume id being absent from created_items, which
        # would otherwise raise AttributeError on None.
        if created_items and (created_items.get(shared_volume_vim_id) or {}).get(
            "keep"
        ):
            ro_vim_item_update_ok = {
                "vim_status": "ACTIVE",
                "created": False,
                "vim_message": None,
            }
            return "DONE", ro_vim_item_update_ok
        try:
            if shared_volume_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_shared_volumes(shared_volume_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-shared-volume={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], shared_volume_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-shared-volume={} {}".format(
                task_id,
                ro_task["target_id"],
                shared_volume_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """Create a shared volume at the VIM from ``task["params"]``.

        When the task has no params nothing is created and the vim_id stays None.

        :return: ("DONE", vim_info update) on success,
            ("FAILED", vim_info update) on connector/worker errors
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            shared_volume_vim_id = None
            shared_volume_data = None

            if task.get("params"):
                shared_volume_data = task["params"]

            if shared_volume_data:
                self.logger.info(
                    f"Creating the new shared_volume for {shared_volume_data}\n"
                )
                (
                    shared_volume_name,
                    shared_volume_vim_id,
                ) = target_vim.new_shared_volumes(shared_volume_data)
                created = True
                created_items[shared_volume_vim_id] = {
                    "name": shared_volume_name,
                    "keep": shared_volume_data.get("keep"),
                }

            ro_vim_item_update = {
                "vim_id": shared_volume_vim_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-shared-volume={} created={}".format(
                    task_id, ro_task["target_id"], shared_volume_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-shared-volume:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
class VimInteractionFlavor(VimInteractionBase):
    """VIM interaction for flavors: find/create and delete."""

    def delete(self, ro_task, task_index):
        """Delete the flavor at the VIM.

        A ``VimConnNotFoundException`` is treated as success ("already deleted").

        :return: ("DONE", vim_info update) on success,
            ("FAILED", vim_info update) on other connector errors
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        flavor_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if flavor_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_flavor(flavor_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id,
                ro_task["target_id"],
                flavor_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """Find a matching flavor at the VIM, or create one.

        The flavor id is taken from ``find_params["vim_flavor_id"]`` or looked up
        from ``find_params["flavor_data"]``. If none is found and the task has
        ``params``, a new flavor is created from ``params["flavor_data"]``.

        :return: ("DONE", vim_info update) on success,
            ("FAILED", vim_info update) on connector/worker errors
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        try:
            # FIND
            vim_flavor_id = None

            if task.get("find_params", {}).get("vim_flavor_id"):
                vim_flavor_id = task["find_params"]["vim_flavor_id"]
            elif task.get("find_params", {}).get("flavor_data"):
                try:
                    flavor_data = task["find_params"]["flavor_data"]
                    vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
                except vimconn.VimConnNotFoundException as flavor_not_found_msg:
                    # Not finding a flavor is not fatal: fall through to creation.
                    self.logger.warning(
                        f"VimConnNotFoundException occured: {flavor_not_found_msg}"
                    )

            if not vim_flavor_id and task.get("params"):
                # CREATE
                flavor_data = task["params"]["flavor_data"]
                vim_flavor_id = target_vim.new_flavor(flavor_data)
                created = True

            ro_vim_item_update = {
                "vim_id": vim_flavor_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, ro_task["target_id"], vim_flavor_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
class VimInteractionAffinityGroup(VimInteractionBase):
    """Create and delete affinity or anti-affinity groups at the target VIM."""

    def delete(self, ro_task, task_index):
        """
        Delete the affinity group referenced by ro_task["vim_info"]["vim_id"], if any.
        :param ro_task: ro_task record; "vim_info", "target_id", "_id" and "tasks" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: tuple (status, vim_info update) where status is "DONE" or "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if affinity_group_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_affinity_group(affinity_group_vim_id)
        except vimconn.VimConnNotFoundException:
            # a group that is already gone is reported as a successful deletion
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id,
                ro_task["target_id"],
                affinity_group_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """
        Reuse or create the affinity group described in the task params.

        When params "affinity_group_data" carries "vim-affinity-group-id", that group is
        looked up at the VIM first; if it is not found, or no id was given, a new group
        is created from "affinity_group_data".
        :param ro_task: ro_task record; "target_id" and "tasks" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (status, vim_info update) where status is "DONE" or "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            affinity_group_vim_id = None
            affinity_group_data = None
            param_affinity_group_id = ""

            if task.get("params"):
                affinity_group_data = task["params"].get("affinity_group_data")

            if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
                try:
                    param_affinity_group_id = affinity_group_data.get(
                        "vim-affinity-group-id"
                    )
                    affinity_group_vim_id = target_vim.get_affinity_group(
                        param_affinity_group_id
                    ).get("id")
                except vimconn.VimConnNotFoundException:
                    # fixed: the two message parts were joined without a blank, so
                    # the id ran into the following word
                    self.logger.error(
                        "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {} "
                        "could not be found at VIM. Creating a new one.".format(
                            task_id, ro_task["target_id"], param_affinity_group_id
                        )
                    )

            if not affinity_group_vim_id and affinity_group_data:
                affinity_group_vim_id = target_vim.new_affinity_group(
                    affinity_group_data
                )
                created = True

            ro_vim_item_update = {
                "vim_id": affinity_group_vim_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], affinity_group_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
class VimInteractionUpdateVdu(VimInteractionBase):
    """Run an action over an existing VM through the VIM connector."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Execute the action given in the task params over the VM "vim_vm_id".
        :param ro_task: ro_task record; "target_id" and "tasks" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (status, vim_info update, task update) where status is
            "DONE" or "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            vim_vm_id = ""
            if task.get("params"):
                vim_vm_id = task["params"].get("vim_vm_id")
                action = task["params"].get("action")
                # the connector receives a dict keyed by the action name
                context = {action: action}
                target_vim.action_vminstance(vim_vm_id, context)

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
            }
            # fixed: the messages said "vm-migration"/"VM Migration", copied from the
            # migration class, which made update failures look like migration failures
            self.logger.debug(
                "task={} {} vdu-update done".format(task_id, ro_task["target_id"])
            )
            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VDU update:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
class VimInteractionSdnNet(VimInteractionBase):
    """Create, refresh and delete connectivity services at an SDN/WIM target."""

    @staticmethod
    def _match_pci(port_pci, mapping):
        """
        Check if port_pci matches with mapping.
        The mapping can have brackets to indicate that several chars are accepted. e.g
        pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
        A port_pci shorter than what the mapping requires does not match.
        :param port_pci: text
        :param mapping: text, can contain brackets to indicate several chars are available
        :return: True if matches, False otherwise
        """
        if not port_pci or not mapping:
            return False

        if port_pci == mapping:
            return True

        mapping_index = 0
        pci_index = 0

        while True:
            bracket_start = mapping.find("[", mapping_index)

            if bracket_start == -1:
                break

            bracket_end = mapping.find("]", bracket_start)
            if bracket_end == -1:
                break

            # literal text before the bracket must be equal in both strings
            length = bracket_start - mapping_index
            if (
                port_pci[pci_index : pci_index + length]
                != mapping[mapping_index:bracket_start]
            ):
                return False

            # fixed: a port_pci that ends before the bracketed position raised
            # IndexError; now it simply does not match
            char_index = pci_index + length
            if char_index >= len(port_pci):
                return False

            if port_pci[char_index] not in mapping[bracket_start + 1 : bracket_end]:
                return False

            pci_index = char_index + 1
            mapping_index = bracket_end + 1

        # whatever remains after the last bracket must be equal too
        return port_pci[pci_index:] == mapping[mapping_index:]

    def _get_interfaces(self, vlds_to_connect, vim_account_id):
        """
        Collect the SR-IOV and PCI-PASSTHROUGH interfaces attached to the given vlds.
        :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
        :param vim_account_id: only vnfrs of this vim account are read
        :return: list of interface dicts (copies); each one gets an "id" of the form
            vnfrs:<vnfr_id>:vdu.<index>.interfaces.<index>, and "status"="ERROR" when its
            vdur is at ERROR status
        """
        interfaces = []

        for vld in vlds_to_connect:
            table, _, db_id = vld.partition(":")
            db_id, _, vld = db_id.partition(":")
            _, _, vld_id = vld.partition(".")

            if table == "vnfrs":
                q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
                iface_key = "vnf-vld-id"
            else:  # table == "nsrs"
                q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
                iface_key = "ns-vld-id"

            db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)

            for db_vnfr in db_vnfrs:
                for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
                    for iface_index, interface in enumerate(vdur["interfaces"]):
                        if interface.get(iface_key) == vld_id and interface.get(
                            "type"
                        ) in ("SR-IOV", "PCI-PASSTHROUGH"):
                            # only SR-IOV or passthrough interfaces are returned
                            interface_ = interface.copy()
                            interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
                                db_vnfr["_id"], vdu_index, iface_index
                            )

                            if vdur.get("status") == "ERROR":
                                interface_["status"] = "ERROR"

                            interfaces.append(interface_)

        return interfaces

    def refresh(self, ro_task):
        """
        Re-run new() over the first CREATE task of ro_task that is not FINISHED.
        :param ro_task: ro_task record
        :return: whatever new() returns
        """
        # look for task create
        task_create_index, _ = next(
            i_t
            for i_t in enumerate(ro_task["tasks"])
            if i_t[1]
            and i_t[1]["action"] == "CREATE"
            and i_t[1]["status"] != "FINISHED"
        )

        return self.new(ro_task, task_create_index, None)

    def new(self, ro_task, task_index, task_depends):
        """
        Create or update the connectivity service for the ports of a network.

        Ports are taken from the interfaces of the vlds listed in the task params plus
        the external "sdn-ports". The service is created when there is no vim_id yet and
        at least two ports are available, edited when the set of ports has changed, and
        only polled for status otherwise.
        :param ro_task: ro_task record; "vim_info", "target_id" and "tasks" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (status, vim_info update); status is the sdn status obtained, or
            "FAILED" on any exception
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]

        sdn_net_id = ro_task["vim_info"]["vim_id"]

        created_items = ro_task["vim_info"].get("created_items")
        connected_ports = ro_task["vim_info"].get("connected_ports", [])
        new_connected_ports = []
        last_update = ro_task["vim_info"].get("last_update", 0)
        sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
        error_list = []
        created = ro_task["vim_info"].get("created", False)

        try:
            # CREATE
            db_vim = {}
            params = task["params"]
            vlds_to_connect = params.get("vlds", [])
            associated_vim = params.get("target_vim")
            # external additional ports
            additional_ports = params.get("sdn-ports") or ()
            _, _, vim_account_id = (
                (None, None, None)
                if associated_vim is None
                else associated_vim.partition(":")
            )

            if associated_vim:
                # get associated VIM
                if associated_vim not in self.db_vims:
                    self.db_vims[associated_vim] = self.db.get_one(
                        "vim_accounts", {"_id": vim_account_id}
                    )

                db_vim = self.db_vims[associated_vim]

            # look for ports to connect
            ports = self._get_interfaces(vlds_to_connect, vim_account_id)

            sdn_ports = []
            pending_ports = error_ports = 0
            vlan_used = None
            sdn_need_update = False

            for port in ports:
                vlan_used = port.get("vlan") or vlan_used

                # TODO. Do not connect if already done
                if not port.get("compute_node") or not port.get("pci"):
                    # location not known yet: count it and try again later
                    if port.get("status") == "ERROR":
                        error_ports += 1
                    else:
                        pending_ports += 1
                    continue

                pmap = None
                compute_node_mappings = next(
                    (
                        c
                        for c in db_vim["config"].get("sdn-port-mapping", ())
                        if c and c["compute_node"] == port["compute_node"]
                    ),
                    None,
                )

                if compute_node_mappings:
                    # process port_mapping pci of type 0000:af:1[01].[1357]
                    pmap = next(
                        (
                            p
                            for p in compute_node_mappings["ports"]
                            if self._match_pci(port["pci"], p.get("pci"))
                        ),
                        None,
                    )

                if not pmap:
                    if not db_vim["config"].get("mapping_not_needed"):
                        error_list.append(
                            "Port mapping not found for compute_node={} pci={}".format(
                                port["compute_node"], port["pci"]
                            )
                        )
                        continue

                    pmap = {}

                service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
                new_port = {
                    "service_endpoint_id": pmap.get("service_endpoint_id")
                    or service_endpoint_id,
                    "service_endpoint_encapsulation_type": (
                        "dot1q" if port["type"] == "SR-IOV" else None
                    ),
                    "service_endpoint_encapsulation_info": {
                        "vlan": port.get("vlan"),
                        "mac": port.get("mac-address"),
                        "device_id": pmap.get("device_id") or port["compute_node"],
                        "device_interface_id": pmap.get("device_interface_id")
                        or port["pci"],
                        "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
                        "switch_port": pmap.get("switch_port"),
                        "service_mapping_info": pmap.get("service_mapping_info"),
                    },
                }

                # TODO
                # if port["modified_at"] > last_update:
                #     sdn_need_update = True
                new_connected_ports.append(port["id"])  # TODO
                sdn_ports.append(new_port)

            if error_ports:
                error_list.append(
                    "{} interfaces have not been created as VDU is on ERROR status".format(
                        error_ports
                    )
                )

            # connect external ports
            for index, additional_port in enumerate(additional_ports):
                additional_port_id = additional_port.get(
                    "service_endpoint_id"
                ) or "external-{}".format(index)
                sdn_ports.append(
                    {
                        "service_endpoint_id": additional_port_id,
                        "service_endpoint_encapsulation_type": additional_port.get(
                            "service_endpoint_encapsulation_type", "dot1q"
                        ),
                        "service_endpoint_encapsulation_info": {
                            "vlan": additional_port.get("vlan") or vlan_used,
                            "mac": additional_port.get("mac_address"),
                            "device_id": additional_port.get("device_id"),
                            "device_interface_id": additional_port.get(
                                "device_interface_id"
                            ),
                            "switch_dpid": additional_port.get("switch_dpid")
                            or additional_port.get("switch_id"),
                            "switch_port": additional_port.get("switch_port"),
                            "service_mapping_info": additional_port.get(
                                "service_mapping_info"
                            ),
                        },
                    }
                )
                new_connected_ports.append(additional_port_id)
            sdn_info = ""

            # if there are more ports to connect or they have been modified, call create/update
            if error_list:
                sdn_status = "ERROR"
                sdn_info = "; ".join(error_list)
            elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
                last_update = time.time()

                if not sdn_net_id:
                    if len(sdn_ports) < 2:
                        # nothing to connect yet
                        sdn_status = "ACTIVE"

                        if not pending_ports:
                            self.logger.debug(
                                "task={} {} new-sdn-net done, less than 2 ports".format(
                                    task_id, ro_task["target_id"]
                                )
                            )
                    else:
                        net_type = params.get("type") or "ELAN"
                        (
                            sdn_net_id,
                            created_items,
                        ) = target_vim.create_connectivity_service(net_type, sdn_ports)
                        created = True
                        self.logger.debug(
                            "task={} {} new-sdn-net={} created={}".format(
                                task_id, ro_task["target_id"], sdn_net_id, created
                            )
                        )
                else:
                    created_items = target_vim.edit_connectivity_service(
                        sdn_net_id, conn_info=created_items, connection_points=sdn_ports
                    )
                    created = True
                    self.logger.debug(
                        "task={} {} update-sdn-net={} created={}".format(
                            task_id, ro_task["target_id"], sdn_net_id, created
                        )
                    )

                connected_ports = new_connected_ports
            elif sdn_net_id:
                wim_status_dict = target_vim.get_connectivity_service_status(
                    sdn_net_id, conn_info=created_items
                )
                sdn_status = wim_status_dict["sdn_status"]

                if wim_status_dict.get("sdn_info"):
                    sdn_info = str(wim_status_dict.get("sdn_info")) or ""

                if wim_status_dict.get("error_msg"):
                    sdn_info = wim_status_dict.get("error_msg") or ""

            if pending_ports:
                if sdn_status != "ERROR":
                    sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
                        len(ports) - pending_ports, len(ports)
                    )

                # with ports still pending the net cannot be reported as ACTIVE
                if sdn_status == "ACTIVE":
                    sdn_status = "BUILD"

            ro_vim_item_update = {
                "vim_id": sdn_net_id,
                "vim_status": sdn_status,
                "created": created,
                "created_items": created_items,
                "connected_ports": connected_ports,
                "vim_details": sdn_info,
                "vim_message": None,
                "last_update": last_update,
            }

            return sdn_status, ro_vim_item_update
        except Exception as e:
            # the traceback is logged only for exceptions that are not connector errors
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
                exc_info=not isinstance(
                    e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                ),
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the connectivity service referenced by ro_task["vim_info"]["vim_id"], if any.
        :param ro_task: ro_task record; "vim_info", "target_id", "_id" and "tasks" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: tuple (status, vim_info update) where status is "DONE" or "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        sdn_vim_id = ro_task["vim_info"].get("vim_id")
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if sdn_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_connectivity_service(
                    sdn_vim_id, ro_task["vim_info"].get("created_items")
                )

        except Exception as e:
            if (
                isinstance(e, sdnconn.SdnConnectorError)
                and e.http_code == HTTPStatus.NOT_FOUND.value
            ):
                # a service that is already gone is reported as a successful deletion
                ro_vim_item_update_ok["vim_message"] = "already deleted"
            else:
                self.logger.error(
                    "ro_task={} vim={} del-sdn-net={}: {}".format(
                        ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
                    ),
                    exc_info=not isinstance(
                        e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                    ),
                )
                ro_vim_item_update = {
                    "vim_status": "VIM_ERROR",
                    "vim_message": "Error while deleting: {}".format(e),
                }

                return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-sdn-net={} {}".format(
                task_id,
                ro_task["target_id"],
                sdn_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
class VimInteractionMigration(VimInteractionBase):
    """Migrate a VM to another compute host through the VIM connector."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Migrate the VM "vim_vm_id" to "migrate_host" and refresh its vim information.
        :param ro_task: ro_task record; "target_id" and "tasks" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (status, vim_info update, task update) where status is
            "DONE" or "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_interfaces = []
        refreshed_vim_info = {}

        try:
            vim_vm_id = ""
            if task.get("params"):
                vim_vm_id = task["params"].get("vim_vm_id")
                migrate_host = task["params"].get("migrate_host")
                _, migrated_compute_node = target_vim.migrate_instance(
                    vim_vm_id, migrate_host
                )

                if migrated_compute_node:
                    # When VM is migrated, vdu["vim_info"] needs to be updated
                    vdu_old_vim_info = task["params"]["vdu_vim_info"].get(
                        ro_task["target_id"]
                    )
                    # fixed: the previous info of this target, or its interface list,
                    # can be missing; iterating None raised an error that the except
                    # clause below does not handle
                    old_interfaces = (vdu_old_vim_info or {}).get("interfaces") or ()

                    # Refresh VM to get new vim_info
                    vm_to_refresh_list = [vim_vm_id]
                    vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
                    refreshed_vim_info = vim_dict[vim_vm_id]

                    if refreshed_vim_info.get("interfaces"):
                        for old_iface in old_interfaces:
                            # one entry per old interface; None when the refreshed
                            # info has no interface with the same vim_interface_id
                            iface = next(
                                (
                                    iface
                                    for iface in refreshed_vim_info["interfaces"]
                                    if old_iface["vim_interface_id"]
                                    == iface["vim_interface_id"]
                                ),
                                None,
                            )
                            vim_interfaces.append(iface)

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            if vim_interfaces:
                ro_vim_item_update["interfaces"] = vim_interfaces

            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update

        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
class VimInteractionResize(VimInteractionBase):
    """Resize (vertical scale) a VM to another flavor through the VIM connector."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Resize the VM given in the task params to the flavor it depends on.

        The target flavor is read from task_depends using params "flavor_id" as key.
        :param ro_task: ro_task record; "target_id" and "tasks" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: dict from which the target flavor uuid is obtained
        :return: tuple (status, vim_info update, task update) where status is
            "DONE" or "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_flavor_uuid = None
        refreshed_vim_info = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            params = task["params"]
            # params is only read here, so no copy is needed
            target_flavor_uuid = task_depends[params["flavor_id"]]
            # fixed: take the VM id from the task params so that it is the one logged
            # and resized  # NOTE(review): assumes params carries "vim_vm_id" as in
            # the sibling exec() methods - confirm against the task producer
            vim_vm_id = params.get("vim_vm_id", "")

            if params:
                self.logger.info("vim_vm_id %s", vim_vm_id)

                if target_flavor_uuid is not None:
                    resized_status = target_vim.resize_instance(
                        vim_vm_id, target_flavor_uuid
                    )

                    if resized_status:
                        # Refresh VM to get new vim_info
                        vm_to_refresh_list = [vim_vm_id]
                        vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
                        refreshed_vim_info = vim_dict[vim_vm_id]

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            self.logger.debug(
                "task={} {} resize done".format(task_id, ro_task["target_id"])
            )
            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} Resize:" " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
class ConfigValidate:
    """Read-only view over the "period" section of the RO configuration."""

    def __init__(self, config: Dict):
        # the configuration dict is kept by reference, not copied
        self.conf = config

    @property
    def active(self):
        """Refresh period for active items: the configured value when it is >= 60 or -1, else 60."""
        # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
        configured = self.conf["period"]["refresh_active"]
        return configured if (configured >= 60 or configured == -1) else 60

    @property
    def build(self):
        """Configured value of period "refresh_build"."""
        period = self.conf["period"]
        return period["refresh_build"]

    @property
    def image(self):
        """Configured value of period "refresh_image"."""
        period = self.conf["period"]
        return period["refresh_image"]

    @property
    def error(self):
        """Configured value of period "refresh_error"."""
        period = self.conf["period"]
        return period["refresh_error"]

    @property
    def queue_size(self):
        """Configured value of period "queue_size"."""
        period = self.conf["period"]
        return period["queue_size"]
2000 class NsWorker(threading
.Thread
):
def __init__(self, worker_index, config, plugins, db):
    """
    :param worker_index: thread index
    :param config: general configuration of RO, among others the process_id with the docker id where it runs
    :param plugins: global shared dict with the loaded plugins
    :param db: database class instance to use
    """
    threading.Thread.__init__(self)
    self.worker_index = worker_index
    self.config = config
    self.plugins = plugins
    self.plugin_name = "unknown"
    self.logger = logging.getLogger("ro.worker{}".format(worker_index))
    # refresh periods for created items
    self.refresh_config = ConfigValidate(config)
    self.task_queue = queue.Queue(self.refresh_config.queue_size)
    # targetvim: vimplugin class
    self.my_vims = {}
    # targetvim: vim information from database
    self.db_vims = {}
    # targetvim list
    self.vim_targets = []
    self.my_id = config["process_id"] + ":" + str(worker_index)
    self.db = db

    # item name -> class that handles it; every handler shares the same db,
    # connector dict, vim record dict and logger of this worker
    interaction_classes = {
        "net": VimInteractionNet,
        "shared-volumes": VimInteractionSharedVolume,
        "classification": VimInteractionClassification,
        "sfi": VimInteractionSfi,
        "sf": VimInteractionSf,
        "sfp": VimInteractionSfp,
        "vdu": VimInteractionVdu,
        "image": VimInteractionImage,
        "flavor": VimInteractionFlavor,
        "sdn_net": VimInteractionSdnNet,
        "update": VimInteractionUpdateVdu,
        "affinity-or-anti-affinity-group": VimInteractionAffinityGroup,
        "migrate": VimInteractionMigration,
        "verticalscale": VimInteractionResize,
    }
    self.item2class = {
        item: handler_class(self.db, self.my_vims, self.db_vims, self.logger)
        for item, handler_class in interaction_classes.items()
    }

    self.time_last_task_processed = None
    # lists of tasks to delete because nsrs or vnfrs has been deleted from db
    self.tasks_to_delete = []
    # it is idle when there are not vim_targets associated
    self.idle = True
    self.task_locked_time = config["global"]["task_locked_time"]
def insert_task(self, task):
    """
    Put a task in the worker queue without waiting for a free slot.
    :param task: element to queue
    :return: None
    :raises NsWorkerException: when the queue is full
    """
    try:
        self.task_queue.put(task, block=False)
    except queue.Full:
        raise NsWorkerException("timeout inserting a task")

    return None
def terminate(self):
    """Queue the "exit" marker through insert_task()."""
    exit_marker = "exit"
    self.insert_task(exit_marker)
def del_task(self, task):
    """
    Mark a task as superseded if it has not started yet.
    :param task: task dict; its "status" is read and, when "SCHEDULED", set to "SUPERSEDED"
    :return: True if the task was superseded, False otherwise
    """
    with self.task_lock:
        if task["status"] == "SCHEDULED":
            task["status"] = "SUPERSEDED"
            return True

        # task["status"] == "processing"
        # fixed: the lock was released by hand here and then again when leaving the
        # "with" block, which raises RuntimeError; the context manager is enough
        return False
def _process_vim_config(self, target_id: str, db_vim: dict) -> None:
    """
    Process vim config, creating vim configuration files as ca_cert
    :param target_id: vim/sdn/wim + id
    :param db_vim: Vim dictionary obtained from database
    :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
    :raises NsWorkerException: on any error while creating the folder or the file
    """
    if not db_vim.get("config"):
        return

    work_dir = "/app/osm_ro/certs"
    file_name = ""

    try:
        vim_config = db_vim["config"]

        if not vim_config.get("ca_cert_content"):
            return

        # one folder per target and worker
        file_name = f"{work_dir}/{target_id}:{self.worker_index}"

        if not path.isdir(file_name):
            makedirs(file_name)

        file_name = file_name + "/ca_cert"

        with open(file_name, "w") as cert_file:
            cert_file.write(db_vim["config"]["ca_cert_content"])
            # the content is replaced at the record by the path of the file written
            del db_vim["config"]["ca_cert_content"]
            db_vim["config"]["ca_cert"] = file_name
    except Exception as e:
        raise NsWorkerException(
            "Error writing to file '{}': {}".format(file_name, e)
        )
def _load_plugin(self, name, type="vim"):
    """
    Return the connector class registered with the given plugin name, loading it from
    the "osm_ro<type>.plugins" entry points group the first time it is requested.
    :param name: plugin name, used as key at self.plugins
    :param type: can be vim or sdn
    :return: the plugin class
    :raises NsWorkerException: if the plugin cannot be loaded or is not installed
    """
    registry = self.plugins

    # the dummy connectors are always available
    if "rovim_dummy" not in registry:
        registry["rovim_dummy"] = VimDummyConnector

    if "rosdn_dummy" not in registry:
        registry["rosdn_dummy"] = SdnDummyConnector

    if name in registry:
        return registry[name]

    try:
        group_name = "osm_ro{}.plugins".format(type)

        for ep in entry_points(group=group_name, name=name):
            registry[name] = ep.load()
    except Exception as e:
        raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))

    if name and name not in registry:
        raise NsWorkerException(
            "Plugin 'osm_{n}' has not been installed".format(n=name)
        )

    return registry[name]
def _unload_vim(self, target_id):
    """
    Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
    :param target_id: Contains type:_id; where type can be 'vim', ...
    :return: None. Errors are logged, not raised
    """
    try:
        self.db_vims.pop(target_id, None)
        self.my_vims.pop(target_id, None)

        still_registered = target_id in self.vim_targets
        if still_registered:
            self.vim_targets.remove(target_id)

        self.logger.info("Unloaded {}".format(target_id))
    except Exception as e:
        self.logger.error("Cannot unload {}: {}".format(target_id, e))
def _check_vim(self, target_id):
    """
    Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
    :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
    :return: None.
    """
    target, _, _id = target_id.partition(":")
    target_database = {"vim": "vim_accounts", "wim": "wim_accounts"}.get(
        target, "sdns"
    )
    # remembered in order to unload at the end what was loaded only for this check
    was_loaded = target_id in self.vim_targets
    now = time.time()
    update_dict = {}
    unset_dict = {}
    op_text = ""
    step = ""
    error_text = ""

    try:
        step = "Getting {} from db".format(target_id)
        db_vim = self.db.get_one(target_database, {"_id": _id})
        operations = db_vim["_admin"].get("operations", ())

        for op_index, operation in enumerate(operations):
            if operation["operationState"] != "PROCESSING":
                continue

            locked_at = operation.get("locked_at")
            lock_is_fresh = (
                locked_at is not None and locked_at >= now - self.task_locked_time
            )

            if lock_is_fresh:
                # some other thread is doing this operation
                return

            # lock: the update only succeeds if "locked_at" still has the value read
            op_text = "_admin.operations.{}.".format(op_index)
            lock_filter = {
                "_id": _id,
                op_text + "operationState": "PROCESSING",
                op_text + "locked_at": locked_at,
            }
            lock_update = {
                op_text + "locked_at": now,
                "admin.current_operation": op_index,
            }
            got_lock = self.db.set_one(
                target_database,
                q_filter=lock_filter,
                update_dict=lock_update,
                fail_on_empty=False,
            )

            if not got_lock:
                return

            unset_dict[op_text + "locked_at"] = None
            unset_dict["current_operation"] = None
            step = "Loading " + target_id
            error_text = self._load_vim(target_id)

            if not error_text:
                step = "Checking connectivity"
                connector = self.my_vims[target_id]

                if target == "vim":
                    connector.check_vim_connectivity()
                else:
                    connector.check_credentials()

            # when error_text is set, these values are overwritten at "finally"
            update_dict["_admin.operationalState"] = "ENABLED"
            update_dict["_admin.detailed-status"] = ""
            unset_dict[op_text + "detailed-status"] = None
            update_dict[op_text + "operationState"] = "COMPLETED"

            return

    except Exception as e:
        error_text = "{}: {}".format(step, e)
        self.logger.error("{} for {}: {}".format(step, target_id, e))

    finally:
        if update_dict or unset_dict:
            if error_text:
                update_dict[op_text + "operationState"] = "FAILED"
                update_dict[op_text + "detailed-status"] = error_text
                unset_dict.pop(op_text + "detailed-status", None)
                update_dict["_admin.operationalState"] = "ERROR"
                update_dict["_admin.detailed-status"] = error_text

            if op_text:
                update_dict[op_text + "statusEnteredTime"] = now

            self.db.set_one(
                target_database,
                q_filter={"_id": _id},
                update_dict=update_dict,
                unset=unset_dict,
                fail_on_empty=False,
            )

        if not was_loaded:
            self._unload_vim(target_id)
def _reload_vim(self, target_id):
    """
    Reload a target that is loaded at this worker, or drop its cached database record.
    :param target_id: Contains type:_id; where type can be 'vim', ...
    :return: None
    """
    if target_id not in self.vim_targets:
        # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
        # just remove it to force load again next time it is needed
        self.db_vims.pop(target_id, None)
        return

    self._load_vim(target_id)
def _load_vim(self, target_id):
    """
    Load or reload a vim_account, sdn_controller or wim_account.
    Read content from database, load the plugin if not loaded.
    In case of error loading the plugin, it loads a failing VIM_connector
    It fills self db_vims dictionary, my_vims dictionary and vim_targets list
    :param target_id: Contains type:_id; where type can be 'vim', ...
    :return: None if ok, descriptive text if error
    """
    target, _, _id = target_id.partition(":")
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts" if target == "wim" else "sdns"
    )
    plugin_name = ""
    vim = None
    # "step" names the operation in progress; it is used at the error messages
    step = "Getting {}={} from db".format(target, _id)

    try:
        # TODO process for wim, sdnc, ...
        vim = self.db.get_one(target_database, {"_id": _id})

        # if deep_get(vim, "config", "sdn-controller"):
        #     step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
        #     db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})

        step = "Decrypting password"
        schema_version = vim.get("schema_version")
        self.db.encrypt_decrypt_fields(
            vim,
            "decrypt",
            fields=("password", "secret"),
            schema_version=schema_version,
            salt=_id,
        )
        self._process_vim_config(target_id, vim)

        if target == "vim":
            plugin_name = "rovim_" + vim["vim_type"]
            step = "Loading plugin '{}'".format(plugin_name)
            vim_module_conn = self._load_plugin(plugin_name)
            step = "Loading {}'".format(target_id)
            self.my_vims[target_id] = vim_module_conn(
                uuid=vim["_id"],
                name=vim["name"],
                tenant_id=vim.get("vim_tenant_id"),
                tenant_name=vim.get("vim_tenant_name"),
                url=vim["vim_url"],
                url_admin=None,
                user=vim["vim_user"],
                passwd=vim["vim_password"],
                config=vim.get("config") or {},
                persistent_info={},
            )
        else:  # sdn
            plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
            step = "Loading plugin '{}'".format(plugin_name)
            vim_module_conn = self._load_plugin(plugin_name, "sdn")
            step = "Loading {}'".format(target_id)
            # work over a copy, as the record is modified before giving it to the plugin
            wim = deepcopy(vim)
            wim_config = wim.pop("config", {}) or {}
            wim["uuid"] = wim["_id"]
            if "url" in wim and "wim_url" not in wim:
                wim["wim_url"] = wim["url"]
            elif "url" not in wim and "wim_url" in wim:
                wim["url"] = wim["wim_url"]

            if wim.get("dpid"):
                wim_config["dpid"] = wim.pop("dpid")

            if wim.get("switch_id"):
                wim_config["switch_id"] = wim.pop("switch_id")

            # wim, wim_account, config
            self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
        self.db_vims[target_id] = vim
        self.error_status = None

        self.logger.info(
            "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
        )
    except Exception as e:
        self.logger.error(
            "Cannot load {} plugin={}: {} {}".format(
                target_id, plugin_name, step, e
            )
        )

        self.db_vims[target_id] = vim or {}
        # fixed: the failing connector was stored at db_vims, overwriting the record
        # set in the line above and leaving my_vims without a connector for this target
        self.my_vims[target_id] = FailingConnector(str(e))
        error_status = "{} Error: {}".format(step, e)

        return error_status
    finally:
        if target_id not in self.vim_targets:
            self.vim_targets.append(target_id)
def _get_db_task(self):
    """
    Lock and read from database one ro_task of the targets of this worker.

    A task is locked by writing locked_by/locked_at over a record whose previous lock
    is older than self.task_locked_time and whose to_check_at lies between -1 and
    self.time_last_task_processed. When nothing can be locked, the search is repeated
    once with time_last_task_processed moved to the current time.
    :return: the locked ro_task, or None if there is nothing to do or on error
    """
    now = time.time()
    wanted_status = ["SCHEDULED", "BUILD", "DONE", "FAILED"]

    if not self.time_last_task_processed:
        self.time_last_task_processed = now

    try:
        while True:
            # NOTE: a block that logged task_locked_time, time_last_task_processed and
            # now through self._log_ro_task when the log level is DEBUG was disabled here
            lock_filter = {
                "target_id": self.vim_targets,
                "tasks.status": wanted_status,
                "locked_at.lt": now - self.task_locked_time,
                "to_check_at.lt": self.time_last_task_processed,
                "to_check_at.gt": -1,
            }
            locked = self.db.set_one(
                "ro_tasks",
                q_filter=lock_filter,
                update_dict={"locked_by": self.my_id, "locked_at": now},
                fail_on_empty=False,
            )

            if locked:
                # read and return the record that has just been locked
                return self.db.get_one(
                    "ro_tasks",
                    q_filter={
                        "target_id": self.vim_targets,
                        "tasks.status": wanted_status,
                        "locked_at": now,
                    },
                )

            if self.time_last_task_processed == now:
                # already searched up to now: nothing to do
                self.time_last_task_processed = None
                return None

            self.time_last_task_processed = now
            # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)

    except DbException as e:
        self.logger.error("Database exception at _get_db_task: {}".format(e))
    except Exception as e:
        self.logger.critical(
            "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
        )

    return None
def _delete_task(self, ro_task, task_index, task_depends, db_update):
    """
    Determine if this task need to be done or superseded
    :param ro_task: ro_task record; "vim_info" and "tasks" are read
    :param task_index: position of the delete task inside ro_task["tasks"]
    :param task_depends: not used by this method
    :param db_update: dict that receives "tasks.<index>.status" of the tasks set to FINISHED
    :return: (None, None) if the task is FAILED; ("SUPERSEDED", None) if nothing has to
        be deleted; the result of the delete() of the item class otherwise; or
        ("FAILED", vim_info update) on error
    """
    my_task = ro_task["tasks"][task_index]
    task_id = my_task["task_id"]
    vim_info = ro_task["vim_info"]
    needed_delete = vim_info["created"] or vim_info.get("created_items", False)

    self.logger.debug("Needed delete: {}".format(needed_delete))
    if my_task["status"] == "FAILED":
        return None, None  # TODO need to be retry??

    try:
        for index, task in enumerate(ro_task["tasks"]):
            if not task or index == task_index:
                continue  # own task

            same_record = my_task["target_record"] == task["target_record"]
            is_create = task["action"] == "CREATE"

            if same_record and is_create:
                # set to finished
                task["status"] = "FINISHED"
                db_update["tasks.{}.status".format(index)] = "FINISHED"
            elif is_create and task["status"] not in ("FINISHED", "SUPERSEDED"):
                # a pending CREATE of another record still needs the element
                needed_delete = False

        if not needed_delete:
            return "SUPERSEDED", None

        self.logger.debug(
            "Deleting ro_task={} task_index={}".format(ro_task, task_index)
        )
        return self.item2class[my_task["item"]].delete(ro_task, task_index)
    except Exception as e:
        if not isinstance(e, NsWorkerException):
            self.logger.critical(
                "Unexpected exception at _delete_task task={}: {}".format(
                    task_id, e
                ),
                exc_info=True,
            )

        return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)}
def _create_task(self, ro_task, task_index, task_depends, db_update):
    """
    Determine if this task need to create something at VIM
    :param ro_task: ro_task record; "tasks" is read
    :param task_index: position of the create task inside ro_task["tasks"]
    :param task_depends: passed as is to the new() of the item class
    :param db_update: not used by this method
    :return: (None, None) if the task is not SCHEDULED; (status, "COPY_VIM_INFO") when
        another CREATE task has already been processed; otherwise the
        (status, vim_info update) obtained from new(), or "FAILED" on error
    """
    my_task = ro_task["tasks"][task_index]
    task_id = my_task["task_id"]

    # only SCHEDULED tasks are processed; for FAILED ones: TODO need to be retry??
    if my_task["status"] != "SCHEDULED":
        return None, None

    # check if already created by another task
    not_processed_yet = ("SCHEDULED", "FINISHED", "SUPERSEDED")

    for index, task in enumerate(ro_task["tasks"]):
        if not task or index == task_index:
            continue  # own task

        if task["action"] == "CREATE" and task["status"] not in not_processed_yet:
            return task["status"], "COPY_VIM_INFO"

    try:
        item_handler = self.item2class[my_task["item"]]
        task_status, ro_vim_item_update = item_handler.new(
            ro_task, task_index, task_depends
        )
        # TODO update other CREATE tasks
    except Exception as e:
        if not isinstance(e, NsWorkerException):
            self.logger.error(
                "Error executing task={}: {}".format(task_id, e), exc_info=True
            )

        task_status = "FAILED"
        ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)}
        # TODO update ro_vim_item_update

    return task_status, ro_vim_item_update
2529 def _get_dependency(self
, task_id
, ro_task
=None, target_id
=None):
2531 Look for dependency task
2532 :param task_id: Can be one of
2533 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2534 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2535 3. task.task_id: "<action_id>:number"
2538 :return: database ro_task plus index of task
2541 task_id
.startswith("vim:")
2542 or task_id
.startswith("sdn:")
2543 or task_id
.startswith("wim:")
2545 target_id
, _
, task_id
= task_id
.partition(" ")
2547 if task_id
.startswith("nsrs:") or task_id
.startswith("vnfrs:"):
2548 ro_task_dependency
= self
.db
.get_one(
2550 q_filter
={"target_id": target_id
, "tasks.target_record_id": task_id
},
2551 fail_on_empty
=False,
2554 if ro_task_dependency
:
2555 for task_index
, task
in enumerate(ro_task_dependency
["tasks"]):
2556 if task
["target_record_id"] == task_id
:
2557 return ro_task_dependency
, task_index
2561 for task_index
, task
in enumerate(ro_task
["tasks"]):
2562 if task
and task
["task_id"] == task_id
:
2563 return ro_task
, task_index
2565 ro_task_dependency
= self
.db
.get_one(
2568 "tasks.ANYINDEX.task_id": task_id
,
2569 "tasks.ANYINDEX.target_record.ne": None,
2571 fail_on_empty
=False,
2574 self
.logger
.debug("ro_task_dependency={}".format(ro_task_dependency
))
2575 if ro_task_dependency
:
2576 for task_index
, task
in enumerate(ro_task_dependency
["tasks"]):
2577 if task
["task_id"] == task_id
:
2578 return ro_task_dependency
, task_index
2579 raise NsWorkerException("Cannot get depending task {}".format(task_id
))
def update_vm_refresh(self, ro_task):
    """Enable again the VM status updates when the refresh is active.

    The next refresh is obtained with self._get_next_refresh(). If it is not
    -1, every "ro_tasks" entry with a task in status DONE and a negative
    to_check_at gets a new "vim_info.refresh_at" and "to_check_at".
    Any exception is logged and not propagated.

    Args:
        ro_task (dict): ro_task details, passed to self._get_next_refresh()
    """
    try:
        self.logger.debug("Checking if VM status update config")
        next_refresh = self._get_next_refresh(ro_task, time.time())
        if next_refresh == -1:
            return

        # never wait more than one day for the next check
        one_day_ahead = time.time() + (24 * 60 * 60)
        db_ro_task_update = {
            "vim_info.refresh_at": next_refresh,
            "to_check_at": min(one_day_ahead, next_refresh),
        }

        self.logger.debug(
            "Finding tasks which to be updated to enable VM status updates"
        )
        refresh_tasks = self.db.get_list(
            "ro_tasks",
            q_filter={"tasks.status": "DONE", "to_check_at.lt": 0},
        )

        self.logger.debug("Updating tasks to change the to_check_at status")
        for stored_task in refresh_tasks:
            self.db.set_one(
                "ro_tasks",
                q_filter={"_id": stored_task["_id"]},
                update_dict=db_ro_task_update,
                fail_on_empty=True,
            )

    except Exception as e:
        self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
2624 def _get_next_refresh(self
, ro_task
: dict, next_refresh
: float):
2625 """Decide the next_refresh according to vim type and refresh config period.
2627 ro_task (dict): ro_task details
2628 next_refresh (float): next refresh time as epoch format
2631 next_refresh (float) -1 if vm updates are disabled or vim type is openstack.
2633 target_vim
= ro_task
["target_id"]
2634 vim_type
= self
.db_vims
[target_vim
]["vim_type"]
2635 if self
.refresh_config
.active
== -1 or vim_type
== "openstack":
2638 next_refresh
+= self
.refresh_config
.active
def _process_pending_tasks(self, ro_task):
    """
    Process the tasks of one ro_task and store the outcome at database.

    Tasks are visited grouped by action, in the order DELETE, CREATE, EXEC.
    For each processed task the new status and the vim_info changes are
    accumulated at db_ro_task_update, and the target record is updated with
    self._update_target(). Finally the ro_task is unlocked and written to
    "ro_tasks" in a single set_one.
    :param ro_task: ro_task content. Keys "_id", "tasks", "vim_info",
        "target_id", "locked_at" and "to_check_at" are read; "vim_info" and
        the tasks can be modified
    :return: None. Exceptions are logged and not propagated
    """
    ro_task_id = ro_task["_id"]
    now = time.time()
    # one day
    next_check_at = now + (24 * 60 * 60)
    db_ro_task_update = {}

    def _update_refresh(new_status):
        # compute next_refresh
        # 'task' is the loop variable of the enclosing function: the refresh
        # period depends on the item of the task being processed at that moment
        nonlocal task
        nonlocal next_check_at
        nonlocal db_ro_task_update
        nonlocal ro_task

        next_refresh = time.time()

        if task["item"] in ("image", "flavor"):
            next_refresh += self.refresh_config.image
        elif new_status == "BUILD":
            next_refresh += self.refresh_config.build
        elif new_status == "DONE":
            # can return -1 (see _get_next_refresh)
            next_refresh = self._get_next_refresh(ro_task, next_refresh)
        else:
            next_refresh += self.refresh_config.error

        next_check_at = min(next_check_at, next_refresh)
        db_ro_task_update["vim_info.refresh_at"] = next_refresh
        ro_task["vim_info"]["refresh_at"] = next_refresh

    try:
        """
        # Log RO tasks only when loglevel is DEBUG
        if self.logger.getEffectiveLevel() == logging.DEBUG:
            self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
        """
        # Check if vim status refresh is enabled again
        self.update_vm_refresh(ro_task)
        # 0: get task_status_create
        lock_object = None
        task_status_create = None
        # first CREATE task already BUILD or DONE, if any
        task_create = next(
            (
                t
                for t in ro_task["tasks"]
                if t
                and t["action"] == "CREATE"
                and t["status"] in ("BUILD", "DONE")
            ),
            None,
        )

        if task_create:
            task_status_create = task_create["status"]

        # 1: look for tasks in status SCHEDULED, or in status CREATE if action is  DONE or BUILD
        for task_action in ("DELETE", "CREATE", "EXEC"):
            # NOTE(review): both are initialized per action, not per task, so the
            # values of a previous task of the same action remain set - confirm
            db_vim_update = None
            new_status = None

            for task_index, task in enumerate(ro_task["tasks"]):
                if not task:
                    continue  # task deleted

                task_depends = {}
                target_update = None

                # skip tasks of other actions, DELETE/EXEC tasks not pending
                # and CREATE tasks already FINISHED or SUPERSEDED
                if (
                    (
                        task_action in ("DELETE", "EXEC")
                        and task["status"] not in ("SCHEDULED", "BUILD")
                    )
                    or task["action"] != task_action
                    or (
                        task_action == "CREATE"
                        and task["status"] in ("FINISHED", "SUPERSEDED")
                    )
                ):
                    continue

                task_path = "tasks.{}.status".format(task_index)
                try:
                    db_vim_info_update = None
                    dependency_ro_task = {}

                    if task["status"] == "SCHEDULED":
                        # check if tasks that this depends on have been completed
                        dependency_not_completed = False

                        for dependency_task_id in task.get("depends_on") or ():
                            (
                                dependency_ro_task,
                                dependency_task_index,
                            ) = self._get_dependency(
                                dependency_task_id, target_id=ro_task["target_id"]
                            )
                            dependency_task = dependency_ro_task["tasks"][
                                dependency_task_index
                            ]
                            self.logger.debug(
                                "dependency_ro_task={} dependency_task_index={}".format(
                                    dependency_ro_task, dependency_task_index
                                )
                            )

                            if dependency_task["status"] == "SCHEDULED":
                                dependency_not_completed = True
                                next_check_at = min(
                                    next_check_at, dependency_ro_task["to_check_at"]
                                )
                                # must allow dependent task to be processed first
                                # to do this set time after last_task_processed
                                next_check_at = max(
                                    self.time_last_task_processed, next_check_at
                                )
                                break
                            elif dependency_task["status"] == "FAILED":
                                error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
                                    task["action"],
                                    task["item"],
                                    dependency_task["action"],
                                    dependency_task["item"],
                                    dependency_task_id,
                                    dependency_ro_task["vim_info"].get(
                                        "vim_message"
                                    ),
                                )
                                self.logger.error(
                                    "task={} {}".format(task["task_id"], error_text)
                                )
                                raise NsWorkerException(error_text)

                            # the vim_id of the dependency is provided under two keys:
                            # the dependency id and "TASK-" plus the dependency id
                            task_depends[dependency_task_id] = dependency_ro_task[
                                "vim_info"
                            ]["vim_id"]
                            task_depends["TASK-{}".format(dependency_task_id)] = (
                                dependency_ro_task["vim_info"]["vim_id"]
                            )

                        if dependency_not_completed:
                            self.logger.warning(
                                "DEPENDENCY NOT COMPLETED {}".format(
                                    dependency_ro_task["vim_info"]["vim_id"]
                                )
                            )
                            # TODO set at vim_info.vim_details that it is waiting
                            continue

                    # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
                    # the task of renew this locking. It will update database locket_at periodically
                    if not lock_object:
                        lock_object = LockRenew.add_lock_object(
                            "ro_tasks", ro_task, self
                        )
                    if task["action"] == "DELETE":
                        (
                            new_status,
                            db_vim_info_update,
                        ) = self._delete_task(
                            ro_task, task_index, task_depends, db_ro_task_update
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if new_status in ("FINISHED", "SUPERSEDED"):
                            target_update = "DELETE"
                    elif task["action"] == "EXEC":
                        (
                            new_status,
                            db_vim_info_update,
                            db_task_update,
                        ) = self.item2class[task["item"]].exec(
                            ro_task, task_index, task_depends
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if db_task_update:
                            # load into database the modified db_task_update "retries" and "next_retry"
                            if db_task_update.get("retries"):
                                db_ro_task_update[
                                    "tasks.{}.retries".format(task_index)
                                ] = db_task_update["retries"]

                            next_check_at = time.time() + db_task_update.get(
                                "next_retry", 60
                            )
                        target_update = None
                    elif task["action"] == "CREATE":
                        if task["status"] == "SCHEDULED":
                            if task_status_create:
                                # another CREATE task has already done the work
                                new_status = task_status_create
                                target_update = "COPY_VIM_INFO"
                            else:
                                new_status, db_vim_info_update = self.item2class[
                                    task["item"]
                                ].new(ro_task, task_index, task_depends)
                                _update_refresh(new_status)
                        else:
                            refresh_at = ro_task["vim_info"]["refresh_at"]
                            if refresh_at and refresh_at != -1 and now > refresh_at:
                                (
                                    new_status,
                                    db_vim_info_update,
                                ) = self.item2class[
                                    task["item"]
                                ].refresh(ro_task)
                                _update_refresh(new_status)
                            else:
                                # The refresh is updated to avoid set the value of "refresh_at" to
                                # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
                                # because it can happen that in this case the task is never processed
                                _update_refresh(task["status"])

                except Exception as e:
                    new_status = "FAILED"
                    db_vim_info_update = {
                        "vim_status": "VIM_ERROR",
                        "vim_message": str(e),
                    }

                    # NOTE(review): this handler covers every action, although the
                    # logged text mentions only _delete_task
                    if not isinstance(
                        e, (NsWorkerException, vimconn.VimConnException)
                    ):
                        self.logger.error(
                            "Unexpected exception at _delete_task task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

                try:
                    if db_vim_info_update:
                        db_vim_update = db_vim_info_update.copy()
                        db_ro_task_update.update(
                            {
                                "vim_info." + k: v
                                for k, v in db_vim_info_update.items()
                            }
                        )
                        ro_task["vim_info"].update(db_vim_info_update)

                    if new_status:
                        if task_action == "CREATE":
                            task_status_create = new_status
                        db_ro_task_update[task_path] = new_status

                    if target_update or db_vim_update:
                        if target_update == "DELETE":
                            self._update_target(task, None)
                        elif target_update == "COPY_VIM_INFO":
                            self._update_target(task, ro_task["vim_info"])
                        else:
                            self._update_target(task, db_vim_update)

                except Exception as e:
                    if (
                        isinstance(e, DbException)
                        and e.http_code == HTTPStatus.NOT_FOUND
                    ):
                        # if the vnfrs or nsrs has been removed from database, this task must be removed
                        self.logger.debug(
                            "marking to delete task={}".format(task["task_id"])
                        )
                        self.tasks_to_delete.append(task)
                    else:
                        self.logger.error(
                            "Unexpected exception at _update_target task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

        locked_at = ro_task["locked_at"]

        if lock_object:
            locked_at = [
                lock_object["locked_at"],
                lock_object["locked_at"] + self.task_locked_time,
            ]
            # locked_at contains two times to avoid race condition. In case the lock has been renewed, it will
            # contain exactly locked_at + self.task_locked_time
            LockRenew.remove_lock_object(lock_object)

        q_filter = {
            "_id": ro_task["_id"],
            "to_check_at": ro_task["to_check_at"],
            "locked_at": locked_at,
        }
        # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
        # outside this task (by ro_nbi) do not update it
        db_ro_task_update["locked_by"] = None
        # locked_at converted to int only for debugging. When it is not decimals it means it has been unlocked
        db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
        db_ro_task_update["modified_at"] = now
        db_ro_task_update["to_check_at"] = next_check_at

        """
        # Log RO tasks only when loglevel is DEBUG
        if self.logger.getEffectiveLevel() == logging.DEBUG:
            db_ro_task_update_log = db_ro_task_update.copy()
            db_ro_task_update_log["_id"] = q_filter["_id"]
            self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
        """

        if not self.db.set_one(
            "ro_tasks",
            update_dict=db_ro_task_update,
            q_filter=q_filter,
            fail_on_empty=False,
        ):
            # nothing matched with the to_check_at filter: retry without
            # filtering by to_check_at and without modifying it
            del db_ro_task_update["to_check_at"]
            del q_filter["to_check_at"]
            """
            # Log RO tasks only when loglevel is DEBUG
            if self.logger.getEffectiveLevel() == logging.DEBUG:
                self._log_ro_task(
                    None,
                    db_ro_task_update_log,
                    None,
                    "TASK_WF",
                    "SET_TASK " + str(q_filter),
                )
            """
            self.db.set_one(
                "ro_tasks",
                q_filter=q_filter,
                update_dict=db_ro_task_update,
                fail_on_empty=True,
            )
    except DbException as e:
        self.logger.error(
            "ro_task={} Error updating database {}".format(ro_task_id, e)
        )
    except Exception as e:
        self.logger.error(
            "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
        )
2983 def _update_target(self
, task
, ro_vim_item_update
):
2984 table
, _
, temp
= task
["target_record"].partition(":")
2985 _id
, _
, path_vim_status
= temp
.partition(":")
2986 path_item
= path_vim_status
[: path_vim_status
.rfind(".")]
2987 path_item
= path_item
[: path_item
.rfind(".")]
2988 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2989 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2991 if ro_vim_item_update
:
2993 path_vim_status
+ "." + k
: v
2994 for k
, v
in ro_vim_item_update
.items()
3003 "interfaces_backup",
3007 if path_vim_status
.startswith("vdur."):
3008 # for backward compatibility, add vdur.name apart from vdur.vim_name
3009 if ro_vim_item_update
.get("vim_name"):
3010 update_dict
[path_item
+ ".name"] = ro_vim_item_update
["vim_name"]
3012 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
3013 if ro_vim_item_update
.get("vim_id"):
3014 update_dict
[path_item
+ ".vim-id"] = ro_vim_item_update
["vim_id"]
3016 # update general status
3017 if ro_vim_item_update
.get("vim_status"):
3018 update_dict
[path_item
+ ".status"] = ro_vim_item_update
[
3022 if ro_vim_item_update
.get("interfaces"):
3023 path_interfaces
= path_item
+ ".interfaces"
3025 for i
, iface
in enumerate(ro_vim_item_update
.get("interfaces")):
3029 path_interfaces
+ ".{}.".format(i
) + k
: v
3030 for k
, v
in iface
.items()
3031 if k
in ("vlan", "compute_node", "pci")
3035 # put ip_address and mac_address with ip-address and mac-address
3036 if iface
.get("ip_address"):
3038 path_interfaces
+ ".{}.".format(i
) + "ip-address"
3039 ] = iface
["ip_address"]
3041 if iface
.get("mac_address"):
3043 path_interfaces
+ ".{}.".format(i
) + "mac-address"
3044 ] = iface
["mac_address"]
3046 if iface
.get("mgmt_vnf_interface") and iface
.get("ip_address"):
3047 update_dict
["ip-address"] = iface
.get("ip_address").split(
3051 if iface
.get("mgmt_vdu_interface") and iface
.get("ip_address"):
3052 update_dict
[path_item
+ ".ip-address"] = iface
.get(
3056 self
.db
.set_one(table
, q_filter
={"_id": _id
}, update_dict
=update_dict
)
3058 # If interfaces exists, it backups VDU interfaces in the DB for healing operations
3059 if ro_vim_item_update
.get("interfaces"):
3060 search_key
= path_vim_status
+ ".interfaces"
3061 if update_dict
.get(search_key
):
3062 interfaces_backup_update
= {
3063 path_vim_status
+ ".interfaces_backup": update_dict
[search_key
]
3068 q_filter
={"_id": _id
},
3069 update_dict
=interfaces_backup_update
,
3073 update_dict
= {path_item
+ ".status": "DELETED"}
3076 q_filter
={"_id": _id
},
3077 update_dict
=update_dict
,
3078 unset
={path_vim_status
: None},
3081 def _process_delete_db_tasks(self
):
3083 Delete task from database because vnfrs or nsrs or both have been deleted
3084 :return: None. Uses and modify self.tasks_to_delete
3086 while self
.tasks_to_delete
:
3087 task
= self
.tasks_to_delete
[0]
3088 vnfrs_deleted
= None
3089 nsr_id
= task
["nsr_id"]
3091 if task
["target_record"].startswith("vnfrs:"):
3092 # check if nsrs is present
3093 if self
.db
.get_one("nsrs", {"_id": nsr_id
}, fail_on_empty
=False):
3094 vnfrs_deleted
= task
["target_record"].split(":")[1]
3097 self
.delete_db_tasks(self
.db
, nsr_id
, vnfrs_deleted
)
3098 except Exception as e
:
3100 "Error deleting task={}: {}".format(task
["task_id"], e
)
3102 self
.tasks_to_delete
.pop(0)
3105 def delete_db_tasks(db
, nsr_id
, vnfrs_deleted
):
3107 Static method because it is called from osm_ng_ro.ns
3108 :param db: instance of database to use
3109 :param nsr_id: affected nsrs id
3110 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
3111 :return: None, exception is fails
3114 for retry
in range(retries
):
3115 ro_tasks
= db
.get_list("ro_tasks", {"tasks.nsr_id": nsr_id
})
3119 for ro_task
in ro_tasks
:
3121 to_delete_ro_task
= True
3123 for index
, task
in enumerate(ro_task
["tasks"]):
3126 elif (not vnfrs_deleted
and task
["nsr_id"] == nsr_id
) or (
3128 and task
["target_record"].startswith("vnfrs:" + vnfrs_deleted
)
3130 db_update
["tasks.{}".format(index
)] = None
3132 # used by other nsr, ro_task cannot be deleted
3133 to_delete_ro_task
= False
3135 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
3136 if to_delete_ro_task
:
3140 "_id": ro_task
["_id"],
3141 "modified_at": ro_task
["modified_at"],
3143 fail_on_empty
=False,
3147 db_update
["modified_at"] = now
3151 "_id": ro_task
["_id"],
3152 "modified_at": ro_task
["modified_at"],
3154 update_dict
=db_update
,
3155 fail_on_empty
=False,
3161 raise NsWorkerException("Exceeded {} retries".format(retries
))
def run(self):
    """
    Main loop of the thread.

    Each iteration first tries to get one command from self.task_queue
    ("terminate", "load_vim", "unload_vim", "reload_vim", "check_vim"); when a
    command is obtained it is executed and the loop starts again. Otherwise
    the pending deletions and one ro_task from database are processed.
    The loop ends when the "terminate" command is received.
    :return: None
    """
    # load database
    self.logger.info("Starting")
    while True:
        # step 1: get commands from queue
        try:
            if self.vim_targets:
                # do not block: a queue.Empty exception leads to step 2
                task = self.task_queue.get(block=False)
            else:
                if not self.idle:
                    self.logger.debug("enters in idle state")
                self.idle = True
                # block until a command arrives
                task = self.task_queue.get(block=True)
                self.idle = False

            if task[0] == "terminate":
                break
            elif task[0] == "load_vim":
                self.logger.info("order to load vim {}".format(task[1]))
                self._load_vim(task[1])
            elif task[0] == "unload_vim":
                self.logger.info("order to unload vim {}".format(task[1]))
                self._unload_vim(task[1])
            elif task[0] == "reload_vim":
                self._reload_vim(task[1])
            elif task[0] == "check_vim":
                self.logger.info("order to check vim {}".format(task[1]))
                self._check_vim(task[1])
            # a command has been obtained: look for more before step 2
            continue
        except Exception as e:
            if isinstance(e, queue.Empty):
                # no command pending, go on with step 2
                pass
            else:
                self.logger.critical(
                    "Error processing task: {}".format(e), exc_info=True
                )

        # step 2: process pending_tasks, delete not needed tasks
        try:
            if self.tasks_to_delete:
                self._process_delete_db_tasks()
            busy = False
            """
            # Log RO tasks only when loglevel is DEBUG
            if self.logger.getEffectiveLevel() == logging.DEBUG:
                _ = self._get_db_all_tasks()
            """
            ro_task = self._get_db_task()
            if ro_task:
                self.logger.debug("Task to process: {}".format(ro_task))
                time.sleep(1)
                self._process_pending_tasks(ro_task)
                busy = True
            if not busy:
                # nothing to process, wait before asking the database again
                time.sleep(5)
        except Exception as e:
            self.logger.critical(
                "Unexpected exception at run: " + str(e), exc_info=True
            )

    self.logger.info("Finishing")