from http import HTTPStatus
import logging
from random import choice as random_choice
-
-import yaml
from threading import Lock
from time import time
from traceback import format_exc as traceback_format_exc
from osm_common.msgbase import MsgException
from osm_ng_ro.ns_thread import deep_get, NsWorker, NsWorkerException
from osm_ng_ro.validation import deploy_schema, validate_input
+import yaml
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
min_common_version = "0.1.16"
"vim_name": None,
"vim_status": None,
"vim_details": None,
+ "vim_message": None,
"refresh_at": None,
},
"modified_at": now,
extra_dict = {"depends_on": [image_text, flavor_text]}
net_list = []
+ # If the position info is provided for all the interfaces, it will be sorted
+ # according to position number ascendingly.
+ if all(i.get("position") for i in target_vdu["interfaces"]):
+ sorted_interfaces = sorted(
+ target_vdu["interfaces"],
+ key=lambda x: (x.get("position") is None, x.get("position")),
+ )
+ target_vdu["interfaces"] = sorted_interfaces
+
+ # If the position info is provided for some interfaces but not all of them, the interfaces
+ # which has specific position numbers will be placed and others' positions will not be taken care.
+ else:
+ if any(i.get("position") for i in target_vdu["interfaces"]):
+ n = len(target_vdu["interfaces"])
+ sorted_interfaces = [-1] * n
+ k, m = 0, 0
+ while k < n:
+ if target_vdu["interfaces"][k].get("position"):
+ idx = target_vdu["interfaces"][k]["position"]
+ sorted_interfaces[idx - 1] = target_vdu["interfaces"][k]
+ k += 1
+ while m < n:
+ if not target_vdu["interfaces"][m].get("position"):
+ idy = sorted_interfaces.index(-1)
+ sorted_interfaces[idy] = target_vdu["interfaces"][m]
+ m += 1
+
+ target_vdu["interfaces"] = sorted_interfaces
+
+ # If the position info is not provided for the interfaces, interfaces will be attached
+ # according to the order in the VNFD.
for iface_index, interface in enumerate(target_vdu["interfaces"]):
if interface.get("ns-vld-id"):
net_text = ns_preffix + ":vld." + interface["ns-vld-id"]
"""
vnfr = kwargs.get("vnfr")
vdu2cloud_init = kwargs.get("vdu2cloud_init")
- #logger = kwargs.get("logger")
+ # logger = kwargs.get("logger")
db = kwargs.get("db")
fs = kwargs.get("fs")
ro_nsr_public_key = kwargs.get("ro_nsr_public_key")
"floating_ip",
)
}
- existing_ifaces = existing_vdu["vim_info"][target_id].get("interfaces", [])
+ existing_ifaces = existing_vdu["vim_info"][target_id].get(
+ "interfaces_backup", []
+ )
net_id = next(
- (i["vim_net_id"] for i in existing_ifaces if i["ip_address"] == interface["ip-address"]),
+ (
+ i["vim_net_id"]
+ for i in existing_ifaces
+ if i["ip_address"] == interface["ip-address"]
+ ),
None,
)
affinity_group = {}
for affinity_group_id in existing_vdu["affinity-or-anti-affinity-group-id"]:
for group in db_nsr.get("affinity-or-anti-affinity-group"):
- if group["id"] == affinity_group_id and group["vim_info"][target_id].get("vim_id", None) is not None:
- affinity_group["affinity_group_id"] = group["vim_info"][target_id].get("vim_id", None)
+ if (
+ group["id"] == affinity_group_id
+ and group["vim_info"][target_id].get("vim_id", None) is not None
+ ):
+ affinity_group["affinity_group_id"] = group["vim_info"][
+ target_id
+ ].get("vim_id", None)
affinity_group_list.append(affinity_group)
extra_dict["params"] = {
extra_dict=change.get("extra_dict", None),
)
- self.logger.warning(
- "ns.define_all_tasks task={}".format(task)
- )
+ self.logger.warning("ns.define_all_tasks task={}".format(task))
tasks_by_target_record_id[change["target_record_id"]] = task
db_new_tasks.append(task)
target_id = db_task.pop("target_id")
common_id = db_task.get("common_id")
+ # Do not chek tasks with vim_status DELETED
+ # because in manual heealing there are two tasks for the same vdur:
+ # one with vim_status deleted and the other one with the actual VM status.
+
if common_id:
if self.db.set_one(
"ro_tasks",
q_filter={
"target_id": target_id,
"tasks.common_id": common_id,
+ "vim_info.vim_status.ne": "DELETED",
},
update_dict={"to_check_at": now, "modified_at": now},
push={"tasks": db_task},
q_filter={
"target_id": target_id,
"tasks.target_record": db_task["target_record"],
+ "vim_info.vim_status.ne": "DELETED",
},
update_dict={"to_check_at": now, "modified_at": now},
push={"tasks": db_task},
self.logger.debug("Updating database, Creating ro_tasks")
db_ro_task = Ns._create_ro_task(target_id, db_task)
- # If DELETE task: the associated created items shoud be removed
+ # If DELETE task: the associated created items should be removed
# (except persistent volumes):
if action == "DELETE":
db_ro_task["vim_info"]["created"] = True
- db_ro_task["vim_info"]["created_items"] = db_task.get("created_items", {})
+ db_ro_task["vim_info"]["created_items"] = db_task.get(
+ "created_items", {}
+ )
+ db_ro_task["vim_info"]["volumes_to_hold"] = db_task.get(
+ "volumes_to_hold", []
+ )
db_ro_task["vim_info"]["vim_id"] = db_task.get("vim_id", None)
nb_ro_tasks += 1
def _prepare_created_items_for_healing(
self,
- target_id,
- existing_vdu,
+ nsr_id,
+ target_record,
):
- # Only ports are considered because created volumes are persistent
- ports_list = {}
- vim_interfaces = existing_vdu["vim_info"][target_id].get("interfaces", [])
- for iface in vim_interfaces:
- ports_list["port:" + iface["vim_interface_id"]] = True
+ created_items = {}
+ # Get created_items from ro_task
+ ro_tasks = self.db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
+ for ro_task in ro_tasks:
+ for task in ro_task["tasks"]:
+ if (
+ task["target_record"] == target_record
+ and task["action"] == "CREATE"
+ and ro_task["vim_info"]["created_items"]
+ ):
+ created_items = ro_task["vim_info"]["created_items"]
+ break
- return ports_list
+ return created_items
def _prepare_persistent_volumes_for_healing(
self,
item_index = 0
existing_instance = None
for instance in existing_vnf.get("vdur", None):
- if (instance["vdu-name"] == vdu_name and instance["count-index"] == count_index):
+ if (
+ instance["vdu-name"] == vdu_name
+ and instance["count-index"] == count_index
+ ):
existing_instance = instance
break
else:
target_record = f"{db_record}.{item_index}.vim_info.{target_vim}"
created_items = self._prepare_created_items_for_healing(
- target_vim, existing_instance
+ nsr_id, target_record
)
volumes_to_hold = self._prepare_persistent_volumes_for_healing(
# The CREATE task depens on the DELETE task
extra_dict["depends_on"] = [delete_task_id]
+ # Add volumes created from created_items if any
+ # Ports should be deleted with delete task and automatically created with create task
+ volumes = {}
+ for k, v in created_items.items():
+ try:
+ k_item, _, k_id = k.partition(":")
+ if k_item == "volume":
+ volumes[k] = v
+ except Exception as e:
+ self.logger.error(
+ "Error evaluating created item {}: {}".format(k, e)
+ )
+ extra_dict["previous_created_volumes"] = volumes
+
deployment_info = {
"action_id": action_id,
"nsr_id": nsr_id,
)
# Delete all ro_tasks registered for the targets vdurs (target_record)
- # If task of type CREATE exist then vim will try to get info form deleted VMs.
+ # If task of type CREATE exist then vim will try to get info form deleted VMs.
# So remove all task related to target record.
ro_tasks = self.db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
for change in changes_list:
},
fail_on_empty=False,
)
-
+
step = "Updating database, Appending tasks to ro_tasks"
self.upload_recreate_tasks(
db_new_tasks=db_new_tasks,
error_text = "Error at {} {}: {}".format(
task["action"].lower(),
task["item"],
- ro_task["vim_info"].get("vim_details") or "unknown",
+ ro_task["vim_info"].get("vim_message") or "unknown",
)
details.append(error_text)
elif task["status"] in ("SCHEDULED", "BUILD"):
return None, None, True
+ def rebuild_start_stop_task(
+ self,
+ vdu_id,
+ vnf_id,
+ vdu_index,
+ action_id,
+ nsr_id,
+ task_index,
+ target_vim,
+ extra_dict,
+ ):
+ self._assign_vim(target_vim)
+ target_record = "vnfrs:{}:vdur.{}".format(vnf_id, vdu_index)
+ target_record_id = "vnfrs:{}:vdur.{}".format(vnf_id, vdu_id)
+ deployment_info = {
+ "action_id": action_id,
+ "nsr_id": nsr_id,
+ "task_index": task_index,
+ }
+
+ task = Ns._create_task(
+ deployment_info=deployment_info,
+ target_id=target_vim,
+ item="update",
+ action="EXEC",
+ target_record=target_record,
+ target_record_id=target_record_id,
+ extra_dict=extra_dict,
+ )
+ return task
+
+ def rebuild_start_stop(
+ self, session, action_dict, version, nsr_id, *args, **kwargs
+ ):
+ task_index = 0
+ extra_dict = {}
+ now = time()
+ action_id = action_dict.get("action_id", str(uuid4()))
+ step = ""
+ logging_text = "Task deploy nsr_id={} action_id={} ".format(nsr_id, action_id)
+ self.logger.debug(logging_text + "Enter")
+
+ action = list(action_dict.keys())[0]
+ task_dict = action_dict.get(action)
+ vim_vm_id = action_dict.get(action).get("vim_vm_id")
+
+ if action_dict.get("stop"):
+ action = "shutoff"
+ db_new_tasks = []
+ try:
+ step = "lock the operation & do task creation"
+ with self.write_lock:
+ extra_dict["params"] = {
+ "vim_vm_id": vim_vm_id,
+ "action": action,
+ }
+ task = self.rebuild_start_stop_task(
+ task_dict["vdu_id"],
+ task_dict["vnf_id"],
+ task_dict["vdu_index"],
+ action_id,
+ nsr_id,
+ task_index,
+ task_dict["target_vim"],
+ extra_dict,
+ )
+ db_new_tasks.append(task)
+ step = "upload Task to db"
+ self.upload_all_tasks(
+ db_new_tasks=db_new_tasks,
+ now=now,
+ )
+ self.logger.debug(
+ logging_text + "Exit. Created {} tasks".format(len(db_new_tasks))
+ )
+ return (
+ {"status": "ok", "nsr_id": nsr_id, "action_id": action_id},
+ action_id,
+ True,
+ )
+ except Exception as e:
+ if isinstance(e, (DbException, NsException)):
+ self.logger.error(
+ logging_text + "Exit Exception while '{}': {}".format(step, e)
+ )
+ else:
+ e = traceback_format_exc()
+ self.logger.critical(
+ logging_text + "Exit Exception while '{}': {}".format(step, e),
+ exc_info=True,
+ )
+ raise NsException(e)
+
def get_deploy(self, session, indata, version, nsr_id, action_id, *args, **kwargs):
nsrs = self.db.get_list("nsrs", {})
return_data = []
return_data.append(task["action_id"])
return return_data, None, True
+
+ def migrate_task(
+ self, vdu, vnf, vdu_index, action_id, nsr_id, task_index, extra_dict
+ ):
+ target_vim, vim_info = next(k_v for k_v in vdu["vim_info"].items())
+ self._assign_vim(target_vim)
+ target_record = "vnfrs:{}:vdur.{}".format(vnf["_id"], vdu_index)
+ target_record_id = "vnfrs:{}:vdur.{}".format(vnf["_id"], vdu["id"])
+ deployment_info = {
+ "action_id": action_id,
+ "nsr_id": nsr_id,
+ "task_index": task_index,
+ }
+
+ task = Ns._create_task(
+ deployment_info=deployment_info,
+ target_id=target_vim,
+ item="migrate",
+ action="EXEC",
+ target_record=target_record,
+ target_record_id=target_record_id,
+ extra_dict=extra_dict,
+ )
+
+ return task
+
+ def migrate(self, session, indata, version, nsr_id, *args, **kwargs):
+ task_index = 0
+ extra_dict = {}
+ now = time()
+ action_id = indata.get("action_id", str(uuid4()))
+ step = ""
+ logging_text = "Task deploy nsr_id={} action_id={} ".format(nsr_id, action_id)
+ self.logger.debug(logging_text + "Enter")
+ try:
+ vnf_instance_id = indata["vnfInstanceId"]
+ step = "Getting vnfrs from db"
+ db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id})
+ vdu = indata.get("vdu")
+ migrateToHost = indata.get("migrateToHost")
+ db_new_tasks = []
+
+ with self.write_lock:
+ if vdu is not None:
+ vdu_id = indata["vdu"]["vduId"]
+ vdu_count_index = indata["vdu"].get("vduCountIndex", 0)
+ for vdu_index, vdu in enumerate(db_vnfr["vdur"]):
+ if (
+ vdu["vdu-id-ref"] == vdu_id
+ and vdu["count-index"] == vdu_count_index
+ ):
+ extra_dict["params"] = {
+ "vim_vm_id": vdu["vim-id"],
+ "migrate_host": migrateToHost,
+ "vdu_vim_info": vdu["vim_info"],
+ }
+ step = "Creating migration task for vdu:{}".format(vdu)
+ task = self.migrate_task(
+ vdu,
+ db_vnfr,
+ vdu_index,
+ action_id,
+ nsr_id,
+ task_index,
+ extra_dict,
+ )
+ db_new_tasks.append(task)
+ task_index += 1
+ break
+ else:
+
+ for vdu_index, vdu in enumerate(db_vnfr["vdur"]):
+ extra_dict["params"] = {
+ "vim_vm_id": vdu["vim-id"],
+ "migrate_host": migrateToHost,
+ "vdu_vim_info": vdu["vim_info"],
+ }
+ step = "Creating migration task for vdu:{}".format(vdu)
+ task = self.migrate_task(
+ vdu,
+ db_vnfr,
+ vdu_index,
+ action_id,
+ nsr_id,
+ task_index,
+ extra_dict,
+ )
+ db_new_tasks.append(task)
+ task_index += 1
+
+ self.upload_all_tasks(
+ db_new_tasks=db_new_tasks,
+ now=now,
+ )
+
+ self.logger.debug(
+ logging_text + "Exit. Created {} tasks".format(len(db_new_tasks))
+ )
+ return (
+ {"status": "ok", "nsr_id": nsr_id, "action_id": action_id},
+ action_id,
+ True,
+ )
+ except Exception as e:
+ if isinstance(e, (DbException, NsException)):
+ self.logger.error(
+ logging_text + "Exit Exception while '{}': {}".format(step, e)
+ )
+ else:
+ e = traceback_format_exc()
+ self.logger.critical(
+ logging_text + "Exit Exception while '{}': {}".format(step, e),
+ exc_info=True,
+ )
+ raise NsException(e)