import logging
# import yaml
from traceback import format_exc as traceback_format_exc
-from osm_ng_ro.ns_thread import NsWorker
+from osm_ng_ro.ns_thread import NsWorker, NsWorkerException, deep_get
from osm_ng_ro.validation import validate_input, deploy_schema
from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version
from osm_common.dbbase import DbException
from threading import Lock
from random import choice as random_choice
from time import time
-from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
+from jinja2 import Environment, TemplateError, TemplateNotFound, StrictUndefined, UndefinedError
from cryptography.hazmat.primitives import serialization as crypto_serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend as crypto_default_backend
self.msg = None
self.config = None
# self.operations = None
- self.logger = logging.getLogger("ro.ns")
+ self.logger = None
+ # ^ Getting logger inside method self.start because parent logger (ro) is not available yet.
+ # If done now it will not be linked to parent not getting its handler and level
self.map_topic = {}
self.write_lock = None
self.assignment = {}
+ self.assignment_list = []
self.next_worker = 0
self.plugins = {}
self.workers = []
"""
self.config = config
self.config["process_id"] = get_process_id() # used for HA identity
+ self.logger = logging.getLogger("ro.ns")
# check right version of common
if versiontuple(common_version) < versiontuple(min_common_version):
raise NsException("Not compatible osm/common version '{}'. Needed '{}' or higher".format(
elif config["storage"]["driver"] == "mongo":
self.fs = fsmongo.FsMongo()
self.fs.fs_connect(config["storage"])
+ elif config["storage"]["driver"] is None:
+ pass
else:
raise NsException("Invalid configuration param '{}' at '[storage]':'driver'".format(
config["storage"]["driver"]))
for worker in self.workers:
worker.insert_task(("terminate",))
- def _create_worker(self, vim_account_id):
- # TODO make use of the limit self.config["global"]["server.ns_threads"]
+ def _create_worker(self, target_id, load=True):
+ # Look for a thread not alive
worker_id = next((i for i in range(len(self.workers)) if not self.workers[i].is_alive()), None)
- if worker_id is None:
- worker_id = len(self.workers)
- self.workers.append(NsWorker(worker_id, self.config, self.plugins, self.db))
+ if worker_id:
+ # re-start worker
self.workers[worker_id].start()
- self.workers[worker_id].insert_task(("load_vim", vim_account_id))
+ else:
+ worker_id = len(self.workers)
+ if worker_id < self.config["global"]["server.ns_threads"]:
+ # create a new worker
+ self.workers.append(NsWorker(worker_id, self.config, self.plugins, self.db))
+ self.workers[worker_id].start()
+ else:
+ # reached maximum number of threads, assign VIM to an existing one
+ worker_id = self.next_worker
+ self.next_worker = (self.next_worker + 1) % self.config["global"]["server.ns_threads"]
+ if load:
+ self.workers[worker_id].insert_task(("load_vim", target_id))
return worker_id
- def _assign_vim(self, vim_account_id):
- if vim_account_id not in self.assignment:
- self.assignment[vim_account_id] = self._create_worker(vim_account_id)
+ def assign_vim(self, target_id):
+ if target_id not in self.assignment:
+ self.assignment[target_id] = self._create_worker(target_id)
+ self.assignment_list.append(target_id)
+
+ def reload_vim(self, target_id):
+ # send reload_vim to the thread working with this VIM and inform all that a VIM has been changed,
+ # this is because database VIM information is cached for threads working with SDN
+ # if target_id in self.assignment:
+ # worker_id = self.assignment[target_id]
+ # self.workers[worker_id].insert_task(("reload_vim", target_id))
+ for worker in self.workers:
+ if worker.is_alive():
+ worker.insert_task(("reload_vim", target_id))
+
+ def unload_vim(self, target_id):
+ if target_id in self.assignment:
+ worker_id = self.assignment[target_id]
+ self.workers[worker_id].insert_task(("unload_vim", target_id))
+ del self.assignment[target_id]
+ self.assignment_list.remove(target_id)
+
+ def check_vim(self, target_id):
+ if target_id in self.assignment:
+ worker_id = self.assignment[target_id]
+ else:
+ worker_id = self._create_worker(target_id, load=False)
+
+ worker = self.workers[worker_id]
+ worker.insert_task(("check_vim", target_id))
def _get_cloud_init(self, where):
"""
if _type == "file":
base_folder = vnfd["_admin"]["storage"]
cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"], name)
+ if not self.fs:
+ raise NsException("Cannot read file '{}'. Filesystem not loaded, change configuration at storage.driver"
+ .format(cloud_init_file))
with self.fs.file_open(cloud_init_file, "r") as ci_file:
cloud_init_content = ci_file.read()
elif _type == "vdu":
return cloud_init_content
def _parse_jinja2(self, cloud_init_content, params, context):
+
try:
- env = Environment()
- ast = env.parse(cloud_init_content)
- mandatory_vars = meta.find_undeclared_variables(ast)
- if mandatory_vars:
- for var in mandatory_vars:
- if not params or var not in params:
- raise NsException(
- "Variable '{}' defined at vnfd='{}' must be provided in the instantiation parameters"
- "inside the 'additionalParamsForVnf' block".format(var, context))
- template = Template(cloud_init_content)
+ env = Environment(undefined=StrictUndefined)
+ template = env.from_string(cloud_init_content)
return template.render(params or {})
-
- except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
+ except UndefinedError as e:
+ raise NsException(
+ "Variable '{}' defined at vnfd='{}' must be provided in the instantiation parameters"
+ "inside the 'additionalParamsForVnf' block".format(e, context))
+ except (TemplateError, TemplateNotFound) as e:
raise NsException("Error parsing Jinja2 to cloud-init content at vnfd='{}': {}".format(context, e))
def _create_db_ro_nsrs(self, nsr_id, now):
crypto_serialization.PublicFormat.OpenSSH
)
private_key = private_key.decode('utf8')
+ # Change first line because Paramiko needs a explicit start with 'BEGIN RSA PRIVATE KEY'
+ i = private_key.find("\n")
+ private_key = "-----BEGIN RSA PRIVATE KEY-----" + private_key[i:]
public_key = public_key.decode('utf8')
except Exception as e:
raise NsException("Cannot create ssh-keys: {}".format(e))
return db_content
def deploy(self, session, indata, version, nsr_id, *args, **kwargs):
- print("ns.deploy session={} indata={} version={} nsr_id={}".format(session, indata, version, nsr_id))
+ self.logger.debug("ns.deploy nsr_id={} indata={}".format(nsr_id, indata))
validate_input(indata, deploy_schema)
action_id = indata.get("action_id", str(uuid4()))
task_index = 0
# get current deployment
- db_nsr = None
- # db_nslcmop = None
db_nsr_update = {} # update operation on nsrs
db_vnfrs_update = {}
- # db_nslcmop_update = {} # update operation on nslcmops
db_vnfrs = {} # vnf's info indexed by _id
- vdu2cloud_init = {}
+ nb_ro_tasks = 0 # for logging
+ vdu2cloud_init = indata.get("cloud_init_content") or {}
step = ''
logging_text = "Task deploy nsr_id={} action_id={} ".format(nsr_id, action_id)
self.logger.debug(logging_text + "Enter")
try:
step = "Getting ns and vnfr record from db"
- # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
- db_ro_tasks = []
db_new_tasks = []
+ tasks_by_target_record_id = {}
# read from db: vnf's of this ns
step = "Getting vnfrs from db"
db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
break
index += 1
- def _create_task(item, action, target_record, target_record_id, extra_dict=None):
+ def _create_task(target_id, item, action, target_record, target_record_id, extra_dict=None):
nonlocal task_index
nonlocal action_id
nonlocal nsr_id
task = {
+ "target_id": target_id, # it will be removed before pushing at database
"action_id": action_id,
"nsr_id": nsr_id,
"task_id": "{}:{}".format(action_id, task_index),
task_index += 1
return task
- def _create_ro_task(vim_account_id, item, action, target_record, target_record_id, extra_dict=None):
+ def _create_ro_task(target_id, task):
nonlocal action_id
nonlocal task_index
nonlocal now
- _id = action_id + ":" + str(task_index)
+ _id = task["task_id"]
db_ro_task = {
"_id": _id,
"locked_by": None,
"locked_at": 0.0,
- "target_id": "vim:" + vim_account_id,
+ "target_id": target_id,
"vim_info": {
"created": False,
"created_items": None,
"modified_at": now,
"created_at": now,
"to_check_at": now,
- "tasks": [_create_task(item, action, target_record, target_record_id, extra_dict)],
+ "tasks": [task],
}
return db_ro_task
- def _process_image_params(target_image, vim_info):
+ def _process_image_params(target_image, vim_info, target_record_id):
find_params = {}
if target_image.get("image"):
find_params["filter_dict"] = {"name": target_image.get("image")}
find_params["filter_dict"] = {"checksum": target_image.get("image_checksum")}
return {"find_params": find_params}
- def _process_flavor_params(target_flavor, vim_info):
+ def _process_flavor_params(target_flavor, vim_info, target_record_id):
def _get_resource_allocation_params(quota_descriptor):
"""
"ram": int(target_flavor["memory-mb"]),
"vcpus": target_flavor["vcpu-count"],
}
+ numa = {}
+ extended = {}
if target_flavor.get("guest-epa"):
extended = {}
- numa = {}
epa_vcpu_set = False
if target_flavor["guest-epa"].get("numa-node-policy"):
numa_node_policy = target_flavor["guest-epa"].get("numa-node-policy")
extra_dict["params"] = {"flavor_data": flavor_data_name}
return extra_dict
- def _process_net_params(target_vld, vim_info):
+ def _ip_profile_2_ro(ip_profile):
+ if not ip_profile:
+ return None
+ ro_ip_profile = {
+ "ip_version": "IPv4" if "v4" in ip_profile.get("ip-version", "ipv4") else "IPv6",
+ "subnet_address": ip_profile.get("subnet-address"),
+ "gateway_address": ip_profile.get("gateway-address"),
+ "dhcp_enabled": ip_profile["dhcp-params"].get("enabled", True),
+ "dhcp_start_address": ip_profile["dhcp-params"].get("start-address"),
+ "dhcp_count": ip_profile["dhcp-params"].get("count"),
+
+ }
+ if ip_profile.get("dns-server"):
+ ro_ip_profile["dns_address"] = ";".join([v["address"] for v in ip_profile["dns-server"]])
+ if ip_profile.get('security-group'):
+ ro_ip_profile["security_group"] = ip_profile['security-group']
+ return ro_ip_profile
+
+ def _process_net_params(target_vld, vim_info, target_record_id):
nonlocal indata
extra_dict = {}
+
+ if vim_info.get("sdn"):
+ # vnf_preffix = "vnfrs:{}".format(vnfr_id)
+ # ns_preffix = "nsrs:{}".format(nsr_id)
+ vld_target_record_id, _, _ = target_record_id.rpartition(".") # remove the ending ".sdn
+ extra_dict["params"] = {k: vim_info[k] for k in ("sdn-ports", "target_vim", "vlds", "type")
+ if vim_info.get(k)}
+ # TODO needed to add target_id in the dependency.
+ if vim_info.get("target_vim"):
+ extra_dict["depends_on"] = [vim_info.get("target_vim") + " " + vld_target_record_id]
+ return extra_dict
+
if vim_info.get("vim_network_name"):
extra_dict["find_params"] = {"filter_dict": {"name": vim_info.get("vim_network_name")}}
elif vim_info.get("vim_network_id"):
# create
extra_dict["params"] = {
"net_name": "{}-{}".format(indata["name"][:16], target_vld.get("name", target_vld["id"])[:16]),
- "ip_profile": vim_info.get('ip_profile'),
+ "ip_profile": _ip_profile_2_ro(vim_info.get('ip_profile')),
"provider_network_profile": vim_info.get('provider_network'),
}
if not target_vld.get("underlay"):
extra_dict["params"]["net_type"] = "ptp" if target_vld.get("type") == "ELINE" else "data"
return extra_dict
- def _process_vdu_params(target_vdu, vim_info):
+ def _process_vdu_params(target_vdu, vim_info, target_record_id):
nonlocal vnfr_id
nonlocal nsr_id
nonlocal indata
nonlocal vnfr
nonlocal vdu2cloud_init
+ nonlocal tasks_by_target_record_id
vnf_preffix = "vnfrs:{}".format(vnfr_id)
ns_preffix = "nsrs:{}".format(nsr_id)
image_text = ns_preffix + ":image." + target_vdu["ns-image-id"]
else:
net_text = vnf_preffix + ":vld." + interface["vnf-vld-id"]
extra_dict["depends_on"].append(net_text)
- net_item = {
- "name": interface["name"],
- "net_id": "TASK-" + net_text,
- "vpci": interface.get("vpci"),
- "type": "virtual",
- # TODO mac_address: used for SR-IOV ifaces #TODO for other types
- # TODO floating_ip: True/False (or it can be None)
- }
+ net_item = {x: v for x, v in interface.items() if x in
+ ("name", "vpci", "port_security", "port_security_disable_strategy", "floating_ip")}
+ net_item["net_id"] = "TASK-" + net_text
+ net_item["type"] = "virtual"
+ # TODO mac_address: used for SR-IOV ifaces #TODO for other types
+ # TODO floating_ip: True/False (or it can be None)
if interface.get("type") in ("SR-IOV", "PCI-PASSTHROUGH"):
+ # mark the net create task as type data
+ if deep_get(tasks_by_target_record_id, net_text, "params", "net_type"):
+ tasks_by_target_record_id[net_text]["params"]["net_type"] = "data"
net_item["use"] = "data"
net_item["model"] = interface["type"]
net_item["type"] = interface["type"]
else: # if interface.get("type") in ("VIRTIO", "E1000", "PARAVIRT"):
net_item["use"] = "bridge"
net_item["model"] = interface.get("type")
+ if interface.get("ip-address"):
+ net_item["ip_address"] = interface["ip-address"]
+ if interface.get("mac-address"):
+ net_item["mac_address"] = interface["mac-address"]
net_list.append(net_item)
if interface.get("mgmt-vnf"):
extra_dict["mgmt_vnf_interface"] = iface_index
elif interface.get("mgmt-interface"):
extra_dict["mgmt_vdu_interface"] = iface_index
-
# cloud config
cloud_config = {}
if target_vdu.get("cloud-init"):
return extra_dict
def _process_items(target_list, existing_list, db_record, db_update, db_path, item, process_params):
- nonlocal db_ro_tasks
nonlocal db_new_tasks
+ nonlocal tasks_by_target_record_id
nonlocal task_index
- # ensure all the target_list elements has an "id". If not assign the index
+ # ensure all the target_list elements has an "id". If not assign the index as id
for target_index, tl in enumerate(target_list):
if tl and not tl.get("id"):
tl["id"] = str(target_index)
- # step 1 networks to be deleted/updated
- for vld_index, existing_vld in enumerate(existing_list):
- target_vld = next((vld for vld in target_list if vld["id"] == existing_vld["id"]), None)
- for existing_vim_index, existing_vim_info in enumerate(existing_vld.get("vim_info", ())):
- if not existing_vim_info:
+ # step 1 items (networks,vdus,...) to be deleted/updated
+ for item_index, existing_item in enumerate(existing_list):
+ target_item = next((t for t in target_list if t["id"] == existing_item["id"]), None)
+ for target_vim, existing_viminfo in existing_item.get("vim_info", {}).items():
+ if existing_viminfo is None:
continue
- if target_vld:
- target_viminfo = next((target_viminfo for target_viminfo in target_vld.get("vim_info", ())
- if existing_vim_info["vim_account_id"] == target_viminfo[
- "vim_account_id"]), None)
+ if target_item:
+ target_viminfo = target_item.get("vim_info", {}).get(target_vim)
else:
target_viminfo = None
- if not target_viminfo:
+ if target_viminfo is None:
# must be deleted
- self._assign_vim(existing_vim_info["vim_account_id"])
- db_new_tasks.append(_create_task(
- item, "DELETE",
- target_record="{}.{}.vim_info.{}".format(db_record, vld_index, existing_vim_index),
- target_record_id="{}.{}".format(db_record, existing_vld["id"])))
+ self.assign_vim(target_vim)
+ target_record_id = "{}.{}".format(db_record, existing_item["id"])
+ item_ = item
+ if target_vim.startswith("sdn"):
+ # item must be sdn-net instead of net if target_vim is a sdn
+ item_ = "sdn_net"
+ target_record_id += ".sdn"
+ task = _create_task(
+ target_vim, item_, "DELETE",
+ target_record="{}.{}.vim_info.{}".format(db_record, item_index, target_vim),
+ target_record_id=target_record_id)
+ tasks_by_target_record_id[target_record_id] = task
+ db_new_tasks.append(task)
# TODO delete
# TODO check one by one the vims to be created/deleted
- # step 2 networks to be created
- for target_vld in target_list:
- vld_index = -1
- for vld_index, existing_vld in enumerate(existing_list):
- if existing_vld["id"] == target_vld["id"]:
+ # step 2 items (networks,vdus,...) to be created
+ for target_item in target_list:
+ item_index = -1
+ for item_index, existing_item in enumerate(existing_list):
+ if existing_item["id"] == target_item["id"]:
break
else:
- vld_index += 1
- db_update[db_path + ".{}".format(vld_index)] = target_vld
- existing_list.append(target_vld)
- existing_vld = None
+ item_index += 1
+ db_update[db_path + ".{}".format(item_index)] = target_item
+ existing_list.append(target_item)
+ existing_item = None
- for vim_index, vim_info in enumerate(target_vld["vim_info"]):
+ for target_vim, target_viminfo in target_item.get("vim_info", {}).items():
existing_viminfo = None
- if existing_vld:
- existing_viminfo = next(
- (existing_viminfo for existing_viminfo in existing_vld.get("vim_info", ())
- if vim_info["vim_account_id"] == existing_viminfo["vim_account_id"]), None)
+ if existing_item:
+ existing_viminfo = existing_item.get("vim_info", {}).get(target_vim)
# TODO check if different. Delete and create???
# TODO delete if not exist
- if existing_viminfo:
+ if existing_viminfo is not None:
continue
- extra_dict = process_params(target_vld, vim_info)
-
- self._assign_vim(vim_info["vim_account_id"])
- db_ro_tasks.append(_create_ro_task(
- vim_info["vim_account_id"], item, "CREATE",
- target_record="{}.{}.vim_info.{}".format(db_record, vld_index, vim_index),
- target_record_id="{}.{}".format(db_record, target_vld["id"]),
- extra_dict=extra_dict))
-
- db_update[db_path + ".{}".format(vld_index)] = target_vld
+ target_record_id = "{}.{}".format(db_record, target_item["id"])
+ item_ = item
+ if target_vim.startswith("sdn"):
+ # item must be sdn-net instead of net if target_vim is a sdn
+ item_ = "sdn_net"
+ target_record_id += ".sdn"
+ extra_dict = process_params(target_item, target_viminfo, target_record_id)
+
+ self.assign_vim(target_vim)
+ task = _create_task(
+ target_vim, item_, "CREATE",
+ target_record="{}.{}.vim_info.{}".format(db_record, item_index, target_vim),
+ target_record_id=target_record_id,
+ extra_dict=extra_dict)
+ tasks_by_target_record_id[target_record_id] = task
+ db_new_tasks.append(task)
+ if target_item.get("common_id"):
+ task["common_id"] = target_item["common_id"]
+
+ db_update[db_path + ".{}".format(item_index)] = target_item
def _process_action(indata):
- nonlocal db_ro_tasks
nonlocal db_new_tasks
nonlocal task_index
nonlocal db_vnfrs
nonlocal db_ro_nsr
- if indata["action"] == "inject_ssh_key":
- key = indata.get("key")
- user = indata.get("user")
- password = indata.get("password")
+ if indata["action"]["action"] == "inject_ssh_key":
+ key = indata["action"].get("key")
+ user = indata["action"].get("user")
+ password = indata["action"].get("password")
for vnf in indata.get("vnf", ()):
- if vnf.get("_id") not in db_vnfrs:
+ if vnf["_id"] not in db_vnfrs:
raise NsException("Invalid vnf={}".format(vnf["_id"]))
db_vnfr = db_vnfrs[vnf["_id"]]
for target_vdu in vnf.get("vdur", ()):
i_v[1]["id"] == target_vdu["id"]), (None, None))
if not vdur:
raise NsException("Invalid vdu vnf={}.{}".format(vnf["_id"], target_vdu["id"]))
- vim_info = vdur["vim_info"][0]
- self._assign_vim(vim_info["vim_account_id"])
+ target_vim, vim_info = next(k_v for k_v in vdur["vim_info"].items())
+ self.assign_vim(target_vim)
target_record = "vnfrs:{}:vdur.{}.ssh_keys".format(vnf["_id"], vdu_index)
extra_dict = {
"depends_on": ["vnfrs:{}:vdur.{}".format(vnf["_id"], vdur["id"])],
"params": {
- "ip_address": vdur.gt("ip_address"),
+ "ip_address": vdur.get("ip-address"),
"user": user,
"key": key,
"password": password,
"schema_version": db_ro_nsr["_admin"]["schema_version"]
}
}
- db_ro_tasks.append(_create_ro_task(vim_info["vim_account_id"], "vdu", "EXEC",
- target_record=target_record,
- target_record_id=None,
- extra_dict=extra_dict))
+ task = _create_task(target_vim, "vdu", "EXEC",
+ target_record=target_record,
+ target_record_id=None,
+ extra_dict=extra_dict)
+ db_new_tasks.append(task)
with self.write_lock:
if indata.get("action"):
db_path="vld", item="net", process_params=_process_net_params)
step = "process NS images"
- _process_items(target_list=indata["image"] or [], existing_list=db_nsr.get("image") or [],
+ _process_items(target_list=indata.get("image") or [], existing_list=db_nsr.get("image") or [],
db_record="nsrs:{}:image".format(nsr_id),
db_update=db_nsr_update, db_path="image", item="image",
process_params=_process_image_params)
step = "process NS flavors"
- _process_items(target_list=indata["flavor"] or [], existing_list=db_nsr.get("flavor") or [],
+ _process_items(target_list=indata.get("flavor") or [], existing_list=db_nsr.get("flavor") or [],
db_record="nsrs:{}:flavor".format(nsr_id),
db_update=db_nsr_update, db_path="flavor", item="flavor",
process_params=_process_flavor_params)
db_update=db_vnfrs_update[vnfr["_id"]], db_path="vdur", item="vdu",
process_params=_process_vdu_params)
- step = "Updating database, Creating ro_tasks"
- if db_ro_tasks:
- self.db.create_list("ro_tasks", db_ro_tasks)
- step = "Updating database, Appending tasks to ro_tasks"
- for task in db_new_tasks:
- if not self.db.set_one("ro_tasks", q_filter={"tasks.target_record": task["target_record"]},
+ for db_task in db_new_tasks:
+ step = "Updating database, Appending tasks to ro_tasks"
+ target_id = db_task.pop("target_id")
+ common_id = db_task.get("common_id")
+ if common_id:
+ if self.db.set_one("ro_tasks",
+ q_filter={"target_id": target_id,
+ "tasks.common_id": common_id},
update_dict={"to_check_at": now, "modified_at": now},
- push={"tasks": task}, fail_on_empty=False):
- self.logger.error(logging_text + "Cannot find task for target_record={}".
- format(task["target_record"]))
- # TODO something else appart from logging?
+ push={"tasks": db_task}, fail_on_empty=False):
+ continue
+ if not self.db.set_one("ro_tasks",
+ q_filter={"target_id": target_id,
+ "tasks.target_record": db_task["target_record"]},
+ update_dict={"to_check_at": now, "modified_at": now},
+ push={"tasks": db_task}, fail_on_empty=False):
+ # Create a ro_task
+ step = "Updating database, Creating ro_tasks"
+ db_ro_task = _create_ro_task(target_id, db_task)
+ nb_ro_tasks += 1
+ self.db.create("ro_tasks", db_ro_task)
step = "Updating database, nsrs"
if db_nsr_update:
self.db.set_one("nsrs", {"_id": nsr_id}, db_nsr_update)
step = "Updating database, vnfrs={}".format(vnfr_id)
self.db.set_one("vnfrs", {"_id": vnfr_id}, db_vnfr_update)
- self.logger.debug(logging_text + "Exit")
+ self.logger.debug(logging_text + "Exit. Created {} ro_tasks; {} tasks".format(nb_ro_tasks,
+ len(db_new_tasks)))
return {"status": "ok", "nsr_id": nsr_id, "action_id": action_id}, action_id, True
except Exception as e:
raise NsException(e)
def delete(self, session, indata, version, nsr_id, *args, **kwargs):
- print("ns.delete session={} indata={} version={} nsr_id={}".format(session, indata, version, nsr_id))
- # TODO del when ALL "tasks.nsr_id" are None of nsr_id
+ self.logger.debug("ns.delete version={} nsr_id={}".format(version, nsr_id))
# self.db.del_list({"_id": ro_task["_id"], "tasks.nsr_id.ne": nsr_id})
- retries = 5
- for retry in range(retries):
- with self.write_lock:
- ro_tasks = self.db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
- if not ro_tasks:
- break
- now = time()
- conflict = False
- for ro_task in ro_tasks:
- db_update = {}
- to_delete = True
- for index, task in enumerate(ro_task["tasks"]):
- if not task:
- pass
- elif task["nsr_id"] == nsr_id:
- db_update["tasks.{}".format(index)] = None
- else:
- to_delete = False # used by other nsr, cannot be deleted
- # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
- if to_delete:
- if not self.db.del_one("ro_tasks",
- q_filter={"_id": ro_task["_id"], "modified_at": ro_task["modified_at"]},
- fail_on_empty=False):
- conflict = True
- elif db_update:
- db_update["modified_at"] = now
- if not self.db.set_one("ro_tasks",
- q_filter={"_id": ro_task["_id"], "modified_at": ro_task["modified_at"]},
- update_dict=db_update,
- fail_on_empty=False):
- conflict = True
- if not conflict:
- break
- else:
- raise NsException("Exceeded {} retries".format(retries))
-
+ with self.write_lock:
+ try:
+ NsWorker.delete_db_tasks(self.db, nsr_id, None)
+ except NsWorkerException as e:
+ raise NsException(e)
return None, None, True
def status(self, session, indata, version, nsr_id, action_id, *args, **kwargs):
- print("ns.status session={} indata={} version={} nsr_id={}, action_id={}".format(session, indata, version,
- nsr_id, action_id))
+ # self.logger.debug("ns.status version={} nsr_id={}, action_id={} indata={}"
+ # .format(version, nsr_id, action_id, indata))
task_list = []
done = 0
total = 0
details = []
for ro_task in ro_tasks:
for task in ro_task["tasks"]:
- if task["action_id"] == action_id:
+ if task and task["action_id"] == action_id:
task_list.append(task)
total += 1
if task["status"] == "FAILED":
global_status = "FAILED"
- details.append(ro_task.get("vim_details", ''))
+ error_text = "Error at {} {}: {}".format(task["action"].lower(), task["item"],
+ ro_task["vim_info"].get("vim_details") or "unknown")
+ details.append(error_text)
elif task["status"] in ("SCHEDULED", "BUILD"):
if global_status != "FAILED":
global_status = "BUILD"
import time
import queue
import logging
+import yaml
from pkg_resources import iter_entry_points
# from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version
from osm_common.dbbase import DbException
# from osm_common.fsbase import FsException
# from osm_common.msgbase import MsgException
from osm_ro_plugin.vim_dummy import VimDummyConnector
-from osm_ro_plugin import vimconn
+from osm_ro_plugin.sdn_dummy import SdnDummyConnector
+from osm_ro_plugin import vimconn, sdnconn
from copy import deepcopy
from unittest.mock import Mock
+from http import HTTPStatus
+from os import mkdir
+from shutil import rmtree
__author__ = "Alfonso Tierno"
__date__ = "$28-Sep-2017 12:07:15$"
for method in dir(vimconn.VimConnector):
if method[0] != "_":
setattr(self, method, Mock(side_effect=vimconn.VimConnException(error_msg)))
+ for method in dir(sdnconn.SdnConnectorBase):
+ if method[0] != "_":
+ setattr(self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg)))
class NsWorkerExceptionNotFound(NsWorkerException):
pass
-class NsWorker(threading.Thread):
- REFRESH_BUILD = 5 # 5 seconds
- REFRESH_ACTIVE = 60 # 1 minute
- REFRESH_ERROR = 600
- REFRESH_IMAGE = 3600 * 10
- REFRESH_DELETE = 3600 * 10
- QUEUE_SIZE = 2000
- # TODO delete assigment_lock = Lock()
- terminate = False
- # TODO delete assignment = {}
- MAX_TIME_LOCKED = 3600
-
- def __init__(self, worker, config, plugins, db):
- """Init a thread.
- Arguments:
- 'id' number of thead
- 'name' name of thread
- 'host','user': host ip or name to manage and user
- 'db', 'db_lock': database class and lock to use it in exclusion
- """
- threading.Thread.__init__(self)
- self.config = config
- self.plugins = plugins
- self.plugin_name = "unknown"
- self.logger = logging.getLogger('ro.worker{}'.format("worker"))
- self.worker_id = worker
- self.task_queue = queue.Queue(self.QUEUE_SIZE)
- self.my_vims = {} # targetvim: vimplugin class
- self.db_vims = {} # targetvim: vim information from database
- self.vim_targets = [] # targetvim list
- self.my_id = config["process_id"] + ":" + str(worker)
+class VimInteractionBase:
+ """ Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
+ It implements methods that does nothing and return ok"""
+ def __init__(self, db, my_vims, db_vims, logger):
self.db = db
- self.item2create = {
- "net": self.new_net,
- "vdu": self.new_vm,
- "image": self.new_image,
- "flavor": self.new_flavor,
- }
- self.item2refresh = {
- "net": self.refresh_net,
- "vdu": self.refresh_vm,
- "image": self.refresh_ok,
- "flavor": self.refresh_ok,
- }
- self.item2delete = {
- "net": self.del_net,
- "vdu": self.del_vm,
- "image": self.delete_ok,
- "flavor": self.del_flavor,
- }
- self.item2action = {
- "vdu": self.exec_vm,
- }
- self.time_last_task_processed = None
-
- def insert_task(self, task):
- try:
- self.task_queue.put(task, False)
- return None
- except queue.Full:
- raise NsWorkerException("timeout inserting a task")
-
- def terminate(self):
- self.insert_task("exit")
+ self.logger = logger
+ self.my_vims = my_vims
+ self.db_vims = db_vims
- def del_task(self, task):
- with self.task_lock:
- if task["status"] == "SCHEDULED":
- task["status"] = "SUPERSEDED"
- return True
- else: # task["status"] == "processing"
- self.task_lock.release()
- return False
+ def new(self, ro_task, task_index, task_depends):
+ return "BUILD", {}
- def _load_plugin(self, name, type="vim"):
- # type can be vim or sdn
- if "rovim_dummy" not in self.plugins:
- self.plugins["rovim_dummy"] = VimDummyConnector
- if name in self.plugins:
- return self.plugins[name]
- try:
- for v in iter_entry_points('osm_ro{}.plugins'.format(type), name):
- self.plugins[name] = v.load()
- except Exception as e:
- self.logger.critical("Cannot load osm_{}: {}".format(name, e))
- if name:
- self.plugins[name] = FailingConnector("Cannot load osm_{}: {}".format(name, e))
- if name and name not in self.plugins:
- error_text = "Cannot load a module for {t} type '{n}'. The plugin 'osm_{n}' has not been" \
- " registered".format(t=type, n=name)
- self.logger.critical(error_text)
- self.plugins[name] = FailingConnector(error_text)
+ def refresh(self, ro_task):
+ """skip calling VIM to get image, flavor status. Assumes ok"""
+ if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
+ return "FAILED", {}
+ return "DONE", {}
- return self.plugins[name]
+ def delete(self, ro_task, task_index):
+ """skip calling VIM to delete image. Assumes ok"""
+ return "DONE", {}
- def _load_vim(self, vim_account_id):
- target_id = "vim:" + vim_account_id
- plugin_name = ""
- vim = None
- try:
- step = "Getting vim={} from db".format(vim_account_id)
- vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
+ def exec(self, ro_task, task_index, task_depends):
+ return "DONE", None, None
- # if deep_get(vim, "config", "sdn-controller"):
- # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
- # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
- step = "Decrypt password"
- schema_version = vim.get("schema_version")
- self.db.encrypt_decrypt_fields(vim, "decrypt", fields=('password', 'secret'),
- schema_version=schema_version, salt=vim_account_id)
-
- step = "Load plugin 'rovim_{}'".format(vim.get("vim_type"))
- plugin_name = "rovim_" + vim["vim_type"]
- vim_module_conn = self._load_plugin(plugin_name)
- self.my_vims[target_id] = vim_module_conn(
- uuid=vim['_id'], name=vim['name'],
- tenant_id=vim.get('vim_tenant_id'), tenant_name=vim.get('vim_tenant_name'),
- url=vim['vim_url'], url_admin=None,
- user=vim['vim_user'], passwd=vim['vim_password'],
- config=vim.get('config'), persistent_info={}
- )
- self.vim_targets.append(target_id)
- self.db_vims[target_id] = vim
- self.error_status = None
- self.logger.info("Vim Connector loaded for vim_account={}, plugin={}".format(
- vim_account_id, plugin_name))
- except Exception as e:
- self.logger.error("Cannot load vimconnector for vim_account={} plugin={}: {} {}".format(
- vim_account_id, plugin_name, step, e))
- self.db_vims[target_id] = vim or {}
- self.my_vims[target_id] = FailingConnector(str(e))
- self.error_status = "Error loading vimconnector: {}".format(e)
+class VimInteractionNet(VimInteractionBase):
- def _get_db_task(self):
- """
- Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
- :return: None
- """
- now = time.time()
- if not self.time_last_task_processed:
- self.time_last_task_processed = now
+ def new(self, ro_task, task_index, task_depends):
+ vim_net_id = None
+ task = ro_task["tasks"][task_index]
+ task_id = task["task_id"]
+ created = False
+ created_items = {}
+ target_vim = self.my_vims[ro_task["target_id"]]
try:
- while True:
- locked = self.db.set_one(
- "ro_tasks",
- q_filter={"target_id": self.vim_targets,
- "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
- "locked_at.lt": now - self.MAX_TIME_LOCKED,
- "to_check_at.lt": self.time_last_task_processed},
- update_dict={"locked_by": self.my_id, "locked_at": now},
- fail_on_empty=False)
- if locked:
- # read and return
- ro_task = self.db.get_one(
- "ro_tasks",
- q_filter={"target_id": self.vim_targets,
- "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
- "locked_at": now})
- return ro_task
- if self.time_last_task_processed == now:
- self.time_last_task_processed = None
- return None
+ # FIND
+ if task.get("find_params"):
+ # if management, get configuration of VIM
+ if task["find_params"].get("filter_dict"):
+ vim_filter = task["find_params"]["filter_dict"]
+ elif task["find_params"].get("mgmt"): # mamagement network
+ if deep_get(self.db_vims[ro_task["target_id"]], "config", "management_network_id"):
+ vim_filter = {"id": self.db_vims[ro_task["target_id"]]["config"]["management_network_id"]}
+ elif deep_get(self.db_vims[ro_task["target_id"]], "config", "management_network_name"):
+ vim_filter = {"name": self.db_vims[ro_task["target_id"]]["config"]["management_network_name"]}
+ else:
+ vim_filter = {"name": task["find_params"]["name"]}
else:
- self.time_last_task_processed = now
- # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
-
- except DbException as e:
- self.logger.error("Database exception at _get_db_task: {}".format(e))
- except Exception as e:
- self.logger.critical("Unexpected exception at _get_db_task: {}".format(e), exc_info=True)
- return None
+ raise NsWorkerExceptionNotFound("Invalid find_params for new_net {}".format(task["find_params"]))
- def _delete_task(self, ro_task, task_index, task_depends, db_update):
- """
- Determine if this task need to be done or superseded
- :return: None
- """
- my_task = ro_task["tasks"][task_index]
- task_id = my_task["task_id"]
- needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get("created_items", False)
- if my_task["status"] == "FAILED":
- return None, None # TODO need to be retry??
- try:
- for index, task in enumerate(ro_task["tasks"]):
- if index == task_index:
- continue # own task
- if my_task["target_record"] == task["target_record"] and task["action"] == "CREATE":
- # set to finished
- db_update["tasks.{}.status".format(index)] = task["status"] = "FINISHED"
- elif task["action"] == "CREATE" and task["status"] not in ("FINISHED", "SUPERSEDED"):
- needed_delete = False
- if needed_delete:
- return self.item2delete[my_task["item"]](ro_task, task_index)
+ vim_nets = target_vim.get_network_list(vim_filter)
+ if not vim_nets and not task.get("params"):
+ raise NsWorkerExceptionNotFound("Network not found with this criteria: '{}'".format(
+ task.get("find_params")))
+ elif len(vim_nets) > 1:
+ raise NsWorkerException(
+ "More than one network found with this criteria: '{}'".format(task["find_params"]))
+ if vim_nets:
+ vim_net_id = vim_nets[0]["id"]
else:
- return "SUPERSEDED", None
- except Exception as e:
- if not isinstance(e, NsWorkerException):
- self.logger.critical("Unexpected exception at _delete_task task={}: {}".format(task_id, e),
- exc_info=True)
- return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
+ # CREATE
+ params = task["params"]
+ vim_net_id, created_items = target_vim.new_network(**params)
+ created = True
- def _create_task(self, ro_task, task_index, task_depends, db_update):
- """
- Determine if this task need to be created
- :return: None
- """
- my_task = ro_task["tasks"][task_index]
- task_id = my_task["task_id"]
- task_status = None
- if my_task["status"] == "FAILED":
- return None, None # TODO need to be retry??
- elif my_task["status"] == "SCHEDULED":
- # check if already created by another task
- for index, task in enumerate(ro_task["tasks"]):
- if index == task_index:
- continue # own task
- if task["action"] == "CREATE" and task["status"] not in ("SCHEDULED", "FINISHED", "SUPERSEDED"):
- return task["status"], "COPY_VIM_INFO"
+ ro_vim_item_update = {"vim_id": vim_net_id,
+ "vim_status": "BUILD",
+ "created": created,
+ "created_items": created_items,
+ "vim_details": None}
+ self.logger.debug(
+ "task={} {} new-net={} created={}".format(task_id, ro_task["target_id"], vim_net_id, created))
+ return "BUILD", ro_vim_item_update
+ except (vimconn.VimConnException, NsWorkerException) as e:
+ self.logger.error("task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e))
+ ro_vim_item_update = {"vim_status": "VIM_ERROR",
+ "created": created,
+ "vim_details": str(e)}
+ return "FAILED", ro_vim_item_update
- try:
- task_status, ro_vim_item_update = self.item2create[my_task["item"]](ro_task, task_index, task_depends)
- # TODO update other CREATE tasks
- except Exception as e:
- if not isinstance(e, NsWorkerException):
- self.logger.error("Error executing task={}: {}".format(task_id, e), exc_info=True)
- task_status = "FAILED"
- ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
- # TODO update ro_vim_item_update
- return task_status, ro_vim_item_update
- else:
- return None, None
+ def refresh(self, ro_task):
+ """Call VIM to get network status"""
+ ro_task_id = ro_task["_id"]
+ target_vim = self.my_vims[ro_task["target_id"]]
- def _get_dependency(self, task_id, ro_task=None, target_id=None):
- if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
- ro_task_dependency = self.db.get_one(
- "ro_tasks",
- q_filter={"target_id": target_id,
- "tasks.target_record_id": task_id
- },
- fail_on_empty=False)
- if ro_task_dependency:
- for task_index, task in enumerate(ro_task_dependency["tasks"]):
- if task["target_record_id"] == task_id:
- return ro_task_dependency, task_index
+ vim_id = ro_task["vim_info"]["vim_id"]
+ net_to_refresh_list = [vim_id]
+ try:
+ vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
+ vim_info = vim_dict[vim_id]
+ if vim_info["status"] == "ACTIVE":
+ task_status = "DONE"
+ elif vim_info["status"] == "BUILD":
+ task_status = "BUILD"
+ else:
+ task_status = "FAILED"
+ except vimconn.VimConnException as e:
+ # Mark all tasks at VIM_ERROR status
+ self.logger.error("ro_task={} vim={} get-net={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e))
+ vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
+ task_status = "FAILED"
+ ro_vim_item_update = {}
+ if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
+ ro_vim_item_update["vim_status"] = vim_info["status"]
+ if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
+ ro_vim_item_update["vim_name"] = vim_info.get("name")
+ if vim_info["status"] in ("ERROR", "VIM_ERROR"):
+ if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
+ ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
+ elif vim_info["status"] == "DELETED":
+ ro_vim_item_update["vim_id"] = None
+ ro_vim_item_update["vim_details"] = "Deleted externally"
else:
- if ro_task:
- for task_index, task in enumerate(ro_task["tasks"]):
- if task["task_id"] == task_id:
- return ro_task, task_index
- ro_task_dependency = self.db.get_one(
- "ro_tasks",
- q_filter={"tasks.ANYINDEX.task_id": task_id,
- "tasks.ANYINDEX.target_record.ne": None
- },
- fail_on_empty=False)
- if ro_task_dependency:
- for task_index, task in ro_task_dependency["tasks"]:
- if task["task_id"] == task_id:
- return ro_task_dependency, task_index
- raise NsWorkerException("Cannot get depending task {}".format(task_id))
-
- def _proccess_pending_tasks(self, ro_task):
- ro_task_id = ro_task["_id"]
- now = time.time()
- next_check_at = now + (24*60*60) # one day
- db_ro_task_update = {}
-
- def _update_refresh(new_status):
- # compute next_refresh
- nonlocal task
- nonlocal next_check_at
- nonlocal db_ro_task_update
- nonlocal ro_task
-
- next_refresh = time.time()
- if task["item"] in ("image", "flavor"):
- next_refresh += self.REFRESH_IMAGE
- elif new_status == "BUILD":
- next_refresh += self.REFRESH_BUILD
- elif new_status == "DONE":
- next_refresh += self.REFRESH_ACTIVE
- else:
- next_refresh += self.REFRESH_ERROR
- next_check_at = min(next_check_at, next_refresh)
- db_ro_task_update["vim_info.refresh_at"] = next_refresh
- ro_task["vim_info"]["refresh_at"] = next_refresh
+ if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
+ ro_vim_item_update["vim_details"] = vim_info["vim_info"]
+ if ro_vim_item_update:
+ self.logger.debug("ro_task={} {} get-net={}: status={} {}".format(
+ ro_task_id, ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"),
+ ro_vim_item_update.get("vim_details") if ro_vim_item_update.get("vim_status") != "ACTIVE" else ''))
+ return task_status, ro_vim_item_update
+ def delete(self, ro_task, task_index):
+ task = ro_task["tasks"][task_index]
+ task_id = task["task_id"]
+ net_vim_id = ro_task["vim_info"]["vim_id"]
+ ro_vim_item_update_ok = {"vim_status": "DELETED",
+ "created": False,
+ "vim_details": "DELETED",
+ "vim_id": None}
try:
- # 0 get task_status_create
- task_status_create = None
- task_create = next((t for t in ro_task["tasks"] if t["action"] == "CREATE" and
- t["status"] in ("BUILD", "DONE")), None)
- if task_create:
- task_status_create = task_create["status"]
- # 1. look for SCHEDULED or if CREATE also DONE,BUILD
- for task_action in ("DELETE", "CREATE", "EXEC"):
- db_vim_update = None
- for task_index, task in enumerate(ro_task["tasks"]):
- target_update = None
- if (task_action in ("DELETE", "EXEC") and task["status"] != "SCHEDULED") or\
- task["action"] != task_action or \
- (task_action == "CREATE" and task["status"] in ("FINISHED", "SUPERSEDED")):
- continue
- task_path = "tasks.{}.status".format(task_index)
- try:
- if task["status"] == "SCHEDULED":
- task_depends = {}
- # check if tasks that this depends on have been completed
- dependency_not_completed = False
- for dependency_task_id in (task.get("depends_on") or ()):
- dependency_ro_task, dependency_task_index = \
- self._get_dependency(dependency_task_id, target_id=ro_task["target_id"])
- dependency_task = dependency_ro_task["tasks"][dependency_task_index]
- if dependency_task["status"] == "SCHEDULED":
- dependency_not_completed = True
- next_check_at = min(next_check_at, dependency_ro_task["to_check_at"])
- break
- elif dependency_task["status"] == "FAILED":
- error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
- task["action"], task["item"], dependency_task["action"],
- dependency_task["item"], dependency_task_id,
- dependency_ro_task["vim_info"].get("vim_details"))
- self.logger.error("task={} {}".format(task["task_id"], error_text))
- raise NsWorkerException(error_text)
-
- task_depends[dependency_task_id] = dependency_ro_task["vim_info"]["vim_id"]
- task_depends["TASK-{}".format(dependency_task_id)] = \
- dependency_ro_task["vim_info"]["vim_id"]
- if dependency_not_completed:
- # TODO set at vim_info.vim_details that it is waiting
- continue
-
- if task["action"] == "DELETE":
- new_status, db_vim_info_update = self._delete_task(ro_task, task_index,
- task_depends, db_ro_task_update)
- new_status = "FINISHED" if new_status == "DONE" else new_status
- # ^with FINISHED instead of DONE it will not be refreshing
- if new_status in ("FINISHED", "SUPERSEDED"):
- target_update = "DELETE"
- elif task["action"] == "EXEC":
- self.item2action[task["item"]](ro_task, task_index, task_depends, db_ro_task_update)
- new_status = "FINISHED" if new_status == "DONE" else new_status
- # ^with FINISHED instead of DONE it will not be refreshing
- if new_status in ("FINISHED", "SUPERSEDED"):
- target_update = "DELETE"
- elif task["action"] == "CREATE":
- if task["status"] == "SCHEDULED":
- if task_status_create:
- new_status = task_status_create
- target_update = "COPY_VIM_INFO"
- else:
- new_status, db_vim_info_update = \
- self.item2create[task["item"]](ro_task, task_index, task_depends)
- # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
- _update_refresh(new_status)
- else:
- if ro_task["vim_info"]["refresh_at"] and now > ro_task["vim_info"]["refresh_at"]:
- new_status, db_vim_info_update = self.item2refresh[task["item"]](ro_task)
- _update_refresh(new_status)
- except Exception as e:
- new_status = "FAILED"
- db_vim_info_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
- if not isinstance(e, (NsWorkerException, vimconn.VimConnException)):
- self.logger.error("Unexpected exception at _delete_task task={}: {}".
- format(task["task_id"], e), exc_info=True)
-
- try:
- if db_vim_info_update:
- db_vim_update = db_vim_info_update.copy()
- db_ro_task_update.update({"vim_info." + k: v for k, v in db_vim_info_update.items()})
- ro_task["vim_info"].update(db_vim_info_update)
+ if net_vim_id or ro_task["vim_info"]["created_items"]:
+ target_vim = self.my_vims[ro_task["target_id"]]
+ target_vim.delete_network(net_vim_id, ro_task["vim_info"]["created_items"])
- if new_status:
- if task_action == "CREATE":
- task_status_create = new_status
- db_ro_task_update[task_path] = new_status
- if target_update or db_vim_update:
+ except vimconn.VimConnNotFoundException:
+ ro_vim_item_update_ok["vim_details"] = "already deleted"
- if target_update == "DELETE":
- self._update_target(task, None)
- elif target_update == "COPY_VIM_INFO":
- self._update_target(task, ro_task["vim_info"])
- else:
- self._update_target(task, db_vim_update)
+ except vimconn.VimConnException as e:
+ self.logger.error("ro_task={} vim={} del-net={}: {}".format(ro_task["_id"], ro_task["target_id"],
+ net_vim_id, e))
+ ro_vim_item_update = {"vim_status": "VIM_ERROR",
+ "vim_details": "Error while deleting: {}".format(e)}
+ return "FAILED", ro_vim_item_update
- except Exception as e:
- self.logger.error("Unexpected exception at _update_target task={}: {}".
- format(task["task_id"], e), exc_info=True)
+ self.logger.debug("task={} {} del-net={} {}".format(task_id, ro_task["target_id"], net_vim_id,
+ ro_vim_item_update_ok.get("vim_details", "")))
+ return "DONE", ro_vim_item_update_ok
- # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
- # outside this task (by ro_nbi) do not update it
- db_ro_task_update["locked_by"] = None
- # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
- db_ro_task_update["locked_at"] = int(now - self.MAX_TIME_LOCKED)
- db_ro_task_update["to_check_at"] = next_check_at
- if not self.db.set_one("ro_tasks",
- update_dict=db_ro_task_update,
- q_filter={"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"]},
- fail_on_empty=False):
- del db_ro_task_update["to_check_at"]
- self.db.set_one("ro_tasks",
- q_filter={"_id": ro_task["_id"]},
- update_dict=db_ro_task_update,
- fail_on_empty=True)
- except DbException as e:
- self.logger.error("ro_task={} Error updating database {}".format(ro_task_id, e))
- except Exception as e:
- self.logger.error("Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True)
- def _update_target(self, task, ro_vim_item_update):
- try:
- table, _id, path = task["target_record"].split(":")
- if ro_vim_item_update:
- update_dict = {path + "." + k: v for k, v in ro_vim_item_update.items() if k in
- ('vim_id', 'vim_details', 'vim_name', 'vim_status', 'interfaces')}
- if ro_vim_item_update.get("interfaces"):
- path_vdu = path[:path.rfind(".")]
- path_vdu = path_vdu[:path_vdu.rfind(".")]
- path_interfaces = path_vdu + ".interfaces"
- for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
- if iface:
- update_dict.update({path_interfaces + ".{}.".format(i) + k: v for k, v in iface.items() if
- k in ('ip_address', 'mac_address', 'vlan', 'compute_node', 'pci')})
- if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
- update_dict["ip-address"] = iface.get("ip_address").split(";")[0]
- if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
- update_dict[path_vdu + ".ip-address"] = iface.get("ip_address").split(";")[0]
-
- self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
- else:
- self.db.set_one(table, q_filter={"_id": _id}, update_dict=None,
- unset={path: None})
- except DbException as e:
- self.logger.error("Cannot update database '{}': '{}'".format(task["target_record"], e))
+class VimInteractionVdu(VimInteractionBase):
+ max_retries_inject_ssh_key = 20 # 20 times
+ time_retries_inject_ssh_key = 30 # wevery 30 seconds
- def new_image(self, ro_task, task_index, task_depends):
+ def new(self, ro_task, task_index, task_depends):
task = ro_task["tasks"][task_index]
task_id = task["task_id"]
created = False
created_items = {}
target_vim = self.my_vims[ro_task["target_id"]]
try:
- # FIND
- if task.get("find_params"):
- vim_images = target_vim.get_image_list(**task["find_params"])
- if not vim_images:
- raise NsWorkerExceptionNotFound("Image not found with this criteria: '{}'".format(
- task["find_params"]))
- elif len(vim_images) > 1:
- raise NsWorkerException(
- "More than one network found with this criteria: '{}'".format(task["find_params"]))
- else:
- vim_image_id = vim_images[0]["id"]
+ created = True
+ params = task["params"]
+ params_copy = deepcopy(params)
+ net_list = params_copy["net_list"]
+ for net in net_list:
+ if "net_id" in net and net["net_id"].startswith("TASK-"): # change task_id into network_id
+ network_id = task_depends[net["net_id"]]
+ if not network_id:
+ raise NsWorkerException("Cannot create VM because depends on a network not created or found "
+ "for {}".format(net["net_id"]))
+ net["net_id"] = network_id
+ if params_copy["image_id"].startswith("TASK-"):
+ params_copy["image_id"] = task_depends[params_copy["image_id"]]
+ if params_copy["flavor_id"].startswith("TASK-"):
+ params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
- ro_vim_item_update = {"vim_id": vim_image_id,
- "vim_status": "DONE",
+ vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
+ interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
+
+ ro_vim_item_update = {"vim_id": vim_vm_id,
+ "vim_status": "BUILD",
"created": created,
"created_items": created_items,
- "vim_details": None}
+ "vim_details": None,
+ "interfaces_vim_ids": interfaces,
+ "interfaces": [],
+ }
self.logger.debug(
- "task={} {} new-image={} created={}".format(task_id, ro_task["target_id"], vim_image_id, created))
- return "DONE", ro_vim_item_update
- except (NsWorkerException, vimconn.VimConnException) as e:
- self.logger.error("task={} {} new-image: {}".format(task_id, ro_task["target_id"], e))
+ "task={} {} new-vm={} created={}".format(task_id, ro_task["target_id"], vim_vm_id, created))
+ return "BUILD", ro_vim_item_update
+ except (vimconn.VimConnException, NsWorkerException) as e:
+ self.logger.error("task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e))
ro_vim_item_update = {"vim_status": "VIM_ERROR",
"created": created,
"vim_details": str(e)}
return "FAILED", ro_vim_item_update
- def del_flavor(self, ro_task, task_index):
+ def delete(self, ro_task, task_index):
task = ro_task["tasks"][task_index]
task_id = task["task_id"]
- flavor_vim_id = ro_task["vim_info"]["vim_id"]
+ vm_vim_id = ro_task["vim_info"]["vim_id"]
ro_vim_item_update_ok = {"vim_status": "DELETED",
"created": False,
"vim_details": "DELETED",
"vim_id": None}
try:
- if flavor_vim_id:
+ if vm_vim_id or ro_task["vim_info"]["created_items"]:
target_vim = self.my_vims[ro_task["target_id"]]
- target_vim.delete_flavor(flavor_vim_id)
+ target_vim.delete_vminstance(vm_vim_id, ro_task["vim_info"]["created_items"])
except vimconn.VimConnNotFoundException:
ro_vim_item_update_ok["vim_details"] = "already deleted"
except vimconn.VimConnException as e:
- self.logger.error("ro_task={} vim={} del-flavor={}: {}".format(
- ro_task["_id"], ro_task["target_id"], flavor_vim_id, e))
+ self.logger.error("ro_task={} vim={} del-vm={}: {}".format(ro_task["_id"], ro_task["target_id"],
+ vm_vim_id, e))
ro_vim_item_update = {"vim_status": "VIM_ERROR",
"vim_details": "Error while deleting: {}".format(e)}
return "FAILED", ro_vim_item_update
- self.logger.debug("task={} {} del-flavor={} {}".format(
- task_id, ro_task["target_id"], flavor_vim_id, ro_vim_item_update_ok.get("vim_details", "")))
+ self.logger.debug("task={} {} del-vm={} {}".format(task_id, ro_task["target_id"], vm_vim_id,
+ ro_vim_item_update_ok.get("vim_details", "")))
return "DONE", ro_vim_item_update_ok
- def refresh_ok(self, ro_task):
- """skip calling VIM to get image status. Assumes ok"""
- if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
- return "FAILED", {}
- return "DONE", {}
-
- def delete_ok(self, ro_task):
- """skip calling VIM to delete image status. Assumes ok"""
- return "DONE", {}
-
- def new_flavor(self, ro_task, task_index, task_depends):
- task = ro_task["tasks"][task_index]
- task_id = task["task_id"]
- created = False
- created_items = {}
- target_vim = self.my_vims[ro_task["target_id"]]
- try:
- # FIND
- vim_flavor_id = None
- if task.get("find_params"):
- try:
- flavor_data = task["find_params"]["flavor_data"]
- vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
- except vimconn.VimConnNotFoundException:
- pass
-
- if not vim_flavor_id and task.get("params"):
- # CREATE
- flavor_data = task["params"]["flavor_data"]
- vim_flavor_id = target_vim.new_flavor(flavor_data)
- created = True
-
- ro_vim_item_update = {"vim_id": vim_flavor_id,
- "vim_status": "DONE",
- "created": created,
- "created_items": created_items,
- "vim_details": None}
- self.logger.debug(
- "task={} {} new-flavor={} created={}".format(task_id, ro_task["target_id"], vim_flavor_id, created))
- return "DONE", ro_vim_item_update
- except (vimconn.VimConnException, NsWorkerException) as e:
- self.logger.error("task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e))
- ro_vim_item_update = {"vim_status": "VIM_ERROR",
- "created": created,
- "vim_details": str(e)}
- return "FAILED", ro_vim_item_update
-
- def new_net(self, ro_task, task_index, task_depends):
- vim_net_id = None
- task = ro_task["tasks"][task_index]
- task_id = task["task_id"]
- created = False
- created_items = {}
- target_vim = self.my_vims[ro_task["target_id"]]
- try:
- # FIND
- if task.get("find_params"):
- # if management, get configuration of VIM
- if task["find_params"].get("filter_dict"):
- vim_filter = task["find_params"]["filter_dict"]
- elif task["find_params"].get("mgmt"): # mamagement network
- if deep_get(self.db_vims[ro_task["target_id"]], "config", "management_network_id"):
- vim_filter = {"id": self.db_vims[ro_task["target_id"]]["config"]["management_network_id"]}
- elif deep_get(self.db_vims[ro_task["target_id"]], "config", "management_network_name"):
- vim_filter = {"name": self.db_vims[ro_task["target_id"]]["config"]["management_network_name"]}
- else:
- vim_filter = {"name": task["find_params"]["name"]}
- else:
- raise NsWorkerExceptionNotFound("Invalid find_params for new_net {}".format(task["find_params"]))
-
- vim_nets = target_vim.get_network_list(vim_filter)
- if not vim_nets and not task.get("params"):
- raise NsWorkerExceptionNotFound("Network not found with this criteria: '{}'".format(
- task.get("find_params")))
- elif len(vim_nets) > 1:
- raise NsWorkerException(
- "More than one network found with this criteria: '{}'".format(task["find_params"]))
- if vim_nets:
- vim_net_id = vim_nets[0]["id"]
- else:
- # CREATE
- params = task["params"]
- vim_net_id, created_items = target_vim.new_network(**params)
- created = True
-
- ro_vim_item_update = {"vim_id": vim_net_id,
- "vim_status": "BUILD",
- "created": created,
- "created_items": created_items,
- "vim_details": None}
- self.logger.debug(
- "task={} {} new-net={} created={}".format(task_id, ro_task["target_id"], vim_net_id, created))
- return "BUILD", ro_vim_item_update
- except (vimconn.VimConnException, NsWorkerException) as e:
- self.logger.error("task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e))
- ro_vim_item_update = {"vim_status": "VIM_ERROR",
- "created": created,
- "vim_details": str(e)}
- return "FAILED", ro_vim_item_update
-
- def refresh_net(self, ro_task):
- """Call VIM to get network status"""
- ro_task_id = ro_task["_id"]
+ def refresh(self, ro_task):
+ """Call VIM to get vm status"""
+ ro_task_id = ro_task["_id"]
target_vim = self.my_vims[ro_task["target_id"]]
vim_id = ro_task["vim_info"]["vim_id"]
- net_to_refresh_list = [vim_id]
+ if not vim_id:
+ return None, None
+ vm_to_refresh_list = [vim_id]
try:
- vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
+ vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
vim_info = vim_dict[vim_id]
if vim_info["status"] == "ACTIVE":
task_status = "DONE"
task_status = "BUILD"
else:
task_status = "FAILED"
+ # try to load and parse vim_information
+ try:
+ vim_info_info = yaml.safe_load(vim_info["vim_info"])
+ if vim_info_info.get("name"):
+ vim_info["name"] = vim_info_info["name"]
+ except Exception:
+ pass
except vimconn.VimConnException as e:
# Mark all tasks at VIM_ERROR status
- self.logger.error("ro_task={} vim={} get-net={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e))
+ self.logger.error("ro_task={} vim={} get-vm={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e))
vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
task_status = "FAILED"
ro_vim_item_update = {}
+ # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
+ vim_interfaces = []
+ if vim_info.get("interfaces"):
+ for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
+ iface = next((iface for iface in vim_info["interfaces"] if vim_iface_id == iface["vim_interface_id"]),
+ None)
+ # if iface:
+ # iface.pop("vim_info", None)
+ vim_interfaces.append(iface)
+
+ task_create = next(t for t in ro_task["tasks"] if t and t["action"] == "CREATE" and t["status"] != "FINISHED")
+ if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
+ vim_interfaces[task_create["mgmt_vnf_interface"]]["mgmt_vnf_interface"] = True
+ mgmt_vdu_iface = task_create.get("mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0))
+ if vim_interfaces:
+ vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
+
+ if ro_task["vim_info"]["interfaces"] != vim_interfaces:
+ ro_vim_item_update["interfaces"] = vim_interfaces
if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
ro_vim_item_update["vim_status"] = vim_info["status"]
if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
ro_vim_item_update["vim_name"] = vim_info.get("name")
if vim_info["status"] in ("ERROR", "VIM_ERROR"):
- if ro_task["vim_info"]["vim_details"] != vim_info["error_msg"]:
- ro_vim_item_update["vim_details"] = vim_info["error_msg"]
+ if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
+ ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
elif vim_info["status"] == "DELETED":
ro_vim_item_update["vim_id"] = None
ro_vim_item_update["vim_details"] = "Deleted externally"
if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
ro_vim_item_update["vim_details"] = vim_info["vim_info"]
if ro_vim_item_update:
- self.logger.debug("ro_task={} {} get-net={}: status={} {}".format(
+ self.logger.debug("ro_task={} {} get-vm={}: status={} {}".format(
ro_task_id, ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"),
ro_vim_item_update.get("vim_details") if ro_vim_item_update.get("vim_status") != "ACTIVE" else ''))
return task_status, ro_vim_item_update
- def del_net(self, ro_task, task_index):
+ def exec(self, ro_task, task_index, task_depends):
task = ro_task["tasks"][task_index]
task_id = task["task_id"]
- net_vim_id = ro_task["vim_info"]["vim_id"]
+ target_vim = self.my_vims[ro_task["target_id"]]
+ db_task_update = {"retries": 0}
+ retries = task.get("retries", 0)
+ try:
+ params = task["params"]
+ params_copy = deepcopy(params)
+ params_copy["ro_key"] = self.db.decrypt(params_copy.pop("private_key"),
+ params_copy.pop("schema_version"), params_copy.pop("salt"))
+ params_copy["ip_addr"] = params_copy.pop("ip_address")
+ target_vim.inject_user_key(**params_copy)
+ self.logger.debug(
+ "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"]))
+ return "DONE", None, db_task_update, # params_copy["key"]
+ except (vimconn.VimConnException, NsWorkerException) as e:
+ retries += 1
+ if retries < self.max_retries_inject_ssh_key:
+ return "BUILD", None, {"retries": retries, "next_retry": self.time_retries_inject_ssh_key}
+ self.logger.error("task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e))
+ ro_vim_item_update = {"vim_details": str(e)}
+ return "FAILED", ro_vim_item_update, db_task_update
+
+
+class VimInteractionImage(VimInteractionBase):
+
+ def new(self, ro_task, task_index, task_depends):
+ task = ro_task["tasks"][task_index]
+ task_id = task["task_id"]
+ created = False
+ created_items = {}
+ target_vim = self.my_vims[ro_task["target_id"]]
+ try:
+ # FIND
+ if task.get("find_params"):
+ vim_images = target_vim.get_image_list(**task["find_params"])
+ if not vim_images:
+ raise NsWorkerExceptionNotFound("Image not found with this criteria: '{}'".format(
+ task["find_params"]))
+ elif len(vim_images) > 1:
+ raise NsWorkerException(
+ "More than one network found with this criteria: '{}'".format(task["find_params"]))
+ else:
+ vim_image_id = vim_images[0]["id"]
+
+ ro_vim_item_update = {"vim_id": vim_image_id,
+ "vim_status": "DONE",
+ "created": created,
+ "created_items": created_items,
+ "vim_details": None}
+ self.logger.debug(
+ "task={} {} new-image={} created={}".format(task_id, ro_task["target_id"], vim_image_id, created))
+ return "DONE", ro_vim_item_update
+ except (NsWorkerException, vimconn.VimConnException) as e:
+ self.logger.error("task={} {} new-image: {}".format(task_id, ro_task["target_id"], e))
+ ro_vim_item_update = {"vim_status": "VIM_ERROR",
+ "created": created,
+ "vim_details": str(e)}
+ return "FAILED", ro_vim_item_update
+
+
+class VimInteractionFlavor(VimInteractionBase):
+
+ def delete(self, ro_task, task_index):
+ task = ro_task["tasks"][task_index]
+ task_id = task["task_id"]
+ flavor_vim_id = ro_task["vim_info"]["vim_id"]
ro_vim_item_update_ok = {"vim_status": "DELETED",
"created": False,
"vim_details": "DELETED",
"vim_id": None}
try:
- if net_vim_id or ro_task["vim_info"]["created_items"]:
+ if flavor_vim_id:
target_vim = self.my_vims[ro_task["target_id"]]
- target_vim.delete_network(net_vim_id, ro_task["vim_info"]["created_items"])
+ target_vim.delete_flavor(flavor_vim_id)
except vimconn.VimConnNotFoundException:
ro_vim_item_update_ok["vim_details"] = "already deleted"
except vimconn.VimConnException as e:
- self.logger.error("ro_task={} vim={} del-net={}: {}".format(ro_task["_id"], ro_task["target_id"],
- net_vim_id, e))
+ self.logger.error("ro_task={} vim={} del-flavor={}: {}".format(
+ ro_task["_id"], ro_task["target_id"], flavor_vim_id, e))
ro_vim_item_update = {"vim_status": "VIM_ERROR",
"vim_details": "Error while deleting: {}".format(e)}
return "FAILED", ro_vim_item_update
- self.logger.debug("task={} {} del-net={} {}".format(task_id, ro_task["target_id"], net_vim_id,
- ro_vim_item_update_ok.get("vim_details", "")))
+ self.logger.debug("task={} {} del-flavor={} {}".format(
+ task_id, ro_task["target_id"], flavor_vim_id, ro_vim_item_update_ok.get("vim_details", "")))
+ return "DONE", ro_vim_item_update_ok
+
+ def new(self, ro_task, task_index, task_depends):
+ task = ro_task["tasks"][task_index]
+ task_id = task["task_id"]
+ created = False
+ created_items = {}
+ target_vim = self.my_vims[ro_task["target_id"]]
+ try:
+ # FIND
+ vim_flavor_id = None
+ if task.get("find_params"):
+ try:
+ flavor_data = task["find_params"]["flavor_data"]
+ vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
+ except vimconn.VimConnNotFoundException:
+ pass
+
+ if not vim_flavor_id and task.get("params"):
+ # CREATE
+ flavor_data = task["params"]["flavor_data"]
+ vim_flavor_id = target_vim.new_flavor(flavor_data)
+ created = True
+
+ ro_vim_item_update = {"vim_id": vim_flavor_id,
+ "vim_status": "DONE",
+ "created": created,
+ "created_items": created_items,
+ "vim_details": None}
+ self.logger.debug(
+ "task={} {} new-flavor={} created={}".format(task_id, ro_task["target_id"], vim_flavor_id, created))
+ return "DONE", ro_vim_item_update
+ except (vimconn.VimConnException, NsWorkerException) as e:
+ self.logger.error("task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e))
+ ro_vim_item_update = {"vim_status": "VIM_ERROR",
+ "created": created,
+ "vim_details": str(e)}
+ return "FAILED", ro_vim_item_update
+
+
+class VimInteractionSdnNet(VimInteractionBase):
+
+ @staticmethod
+ def _match_pci(port_pci, mapping):
+ """
+ Check if port_pci matches with mapping
+ mapping can have brackets to indicate that several chars are accepted. e.g
+ pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
+ :param port_pci: text
+ :param mapping: text, can contain brackets to indicate several chars are available
+ :return: True if matches, False otherwise
+ """
+ if not port_pci or not mapping:
+ return False
+ if port_pci == mapping:
+ return True
+
+ mapping_index = 0
+ pci_index = 0
+ while True:
+ bracket_start = mapping.find("[", mapping_index)
+ if bracket_start == -1:
+ break
+ bracket_end = mapping.find("]", bracket_start)
+ if bracket_end == -1:
+ break
+ length = bracket_start - mapping_index
+ if length and port_pci[pci_index:pci_index + length] != mapping[mapping_index:bracket_start]:
+ return False
+ if port_pci[pci_index + length] not in mapping[bracket_start+1:bracket_end]:
+ return False
+ pci_index += length + 1
+ mapping_index = bracket_end + 1
+
+ if port_pci[pci_index:] != mapping[mapping_index:]:
+ return False
+ return True
+
+ def _get_interfaces(self, vlds_to_connect, vim_account_id):
+ """
+ :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
+ :param vim_account_id:
+ :return:
+ """
+ interfaces = []
+ for vld in vlds_to_connect:
+ table, _, db_id = vld.partition(":")
+ db_id, _, vld = db_id.partition(":")
+ _, _, vld_id = vld.partition(".")
+ if table == "vnfrs":
+ q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
+ iface_key = "vnf-vld-id"
+ else: # table == "nsrs"
+ q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
+ iface_key = "ns-vld-id"
+ db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
+ for db_vnfr in db_vnfrs:
+ for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
+ for iface_index, interface in enumerate(vdur["interfaces"]):
+ if interface.get(iface_key) == vld_id and \
+ interface.get("type") in ("SR-IOV", "PCI-PASSTHROUGH"):
+ # only SR-IOV o PT
+ interface_ = interface.copy()
+ interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(db_vnfr["_id"], vdu_index,
+ iface_index)
+ if vdur.get("status") == "ERROR":
+ interface_["status"] = "ERROR"
+ interfaces.append(interface_)
+ return interfaces
+
+ def refresh(self, ro_task):
+ # look for task create
+ task_create_index, _ = next(i_t for i_t in enumerate(ro_task["tasks"])
+ if i_t[1] and i_t[1]["action"] == "CREATE" and i_t[1]["status"] != "FINISHED")
+
+ return self.new(ro_task, task_create_index, None)
+
+ def new(self, ro_task, task_index, task_depends):
+
+ task = ro_task["tasks"][task_index]
+ task_id = task["task_id"]
+ target_vim = self.my_vims[ro_task["target_id"]]
+
+ sdn_net_id = ro_task["vim_info"]["vim_id"]
+
+ created_items = ro_task["vim_info"].get("created_items")
+ connected_ports = ro_task["vim_info"].get("connected_ports", [])
+ new_connected_ports = []
+ last_update = ro_task["vim_info"].get("last_update", 0)
+ sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
+ error_list = []
+ created = ro_task["vim_info"].get("created", False)
+
+ try:
+
+ # CREATE
+ params = task["params"]
+ vlds_to_connect = params["vlds"]
+ associated_vim = params["target_vim"]
+ additional_ports = params.get("sdn-ports") or () # external additional ports
+ _, _, vim_account_id = associated_vim.partition(":")
+ if associated_vim:
+ # get associated VIM
+ if associated_vim not in self.db_vims:
+ self.db_vims[associated_vim] = self.db.get_one("vim_accounts", {"_id": vim_account_id})
+ db_vim = self.db_vims[associated_vim]
+
+ # look for ports to connect
+ ports = self._get_interfaces(vlds_to_connect, vim_account_id)
+ # print(ports)
+
+ sdn_ports = []
+ pending_ports = error_ports = 0
+ vlan_used = None
+ sdn_need_update = False
+ for port in ports:
+ vlan_used = port.get("vlan") or vlan_used
+ # TODO. Do not connect if already done
+ if not port.get("compute_node") or not port.get("pci"):
+ if port.get("status") == "ERROR":
+ error_ports += 1
+ else:
+ pending_ports += 1
+ continue
+ pmap = None
+ compute_node_mappings = next((c for c in db_vim["config"].get("sdn-port-mapping", ())
+ if c and c["compute_node"] == port["compute_node"]), None)
+ if compute_node_mappings:
+ # process port_mapping pci of type 0000:af:1[01].[1357]
+ pmap = next((p for p in compute_node_mappings["ports"]
+ if self._match_pci(port["pci"], p.get("pci"))), None)
+ if not pmap:
+ if not db_vim["config"].get("mapping_not_needed"):
+ error_list.append("Port mapping not found for compute_node={} pci={}".format(
+ port["compute_node"], port["pci"]))
+ continue
+ pmap = {}
+
+ service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
+ new_port = {
+ "service_endpoint_id": pmap.get("service_endpoint_id") or service_endpoint_id,
+ "service_endpoint_encapsulation_type": "dot1q" if port["type"] == "SR-IOV" else None,
+ "service_endpoint_encapsulation_info": {
+ "vlan": port.get("vlan"),
+ "mac": port.get("mac_address"),
+ "device_id": pmap.get("device_id") or port["compute_node"], # device_id
+ "device_interface_id": pmap.get("device_interface_id") or port["pci"],
+ "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
+ "switch_port": pmap.get("switch_port"),
+ "service_mapping_info": pmap.get("service_mapping_info"),
+ }
+ }
+
+ # TODO
+ # if port["modified_at"] > last_update:
+ # sdn_need_update = True
+ new_connected_ports.append(port["id"]) # TODO
+ sdn_ports.append(new_port)
+
+ if error_ports:
+ error_list.append("{} interfaces have not been created as VDU is on ERROR status".format(error_ports))
+
+ # connect external ports
+ for index, additional_port in enumerate(additional_ports):
+ additional_port_id = additional_port.get("service_endpoint_id") or "external-{}".format(index)
+ sdn_ports.append({
+ "service_endpoint_id": additional_port_id,
+ "service_endpoint_encapsulation_type": additional_port.get("service_endpoint_encapsulation_type",
+ "dot1q"),
+ "service_endpoint_encapsulation_info": {
+ "vlan": additional_port.get("vlan") or vlan_used,
+ "mac": additional_port.get("mac_address"),
+ "device_id": additional_port.get("device_id"),
+ "device_interface_id": additional_port.get("device_interface_id"),
+ "switch_dpid": additional_port.get("switch_dpid") or additional_port.get("switch_id"),
+ "switch_port": additional_port.get("switch_port"),
+ "service_mapping_info": additional_port.get("service_mapping_info"),
+ }})
+ new_connected_ports.append(additional_port_id)
+ sdn_info = ""
+ # if there are more ports to connect or they have been modified, call create/update
+ if error_list:
+ sdn_status = "ERROR"
+ sdn_info = "; ".join(error_list)
+ elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
+ last_update = time.time()
+ if not sdn_net_id:
+ if len(sdn_ports) < 2:
+ sdn_status = "ACTIVE"
+ if not pending_ports:
+ self.logger.debug("task={} {} new-sdn-net done, less than 2 ports".
+ format(task_id, ro_task["target_id"]))
+ else:
+ net_type = params.get("type") or "ELAN"
+ sdn_net_id, created_items = target_vim.create_connectivity_service(
+ net_type, sdn_ports)
+ created = True
+ self.logger.debug("task={} {} new-sdn-net={} created={}".
+ format(task_id, ro_task["target_id"], sdn_net_id, created))
+ else:
+ created_items = target_vim.edit_connectivity_service(
+ sdn_net_id, conn_info=created_items, connection_points=sdn_ports)
+ created = True
+ self.logger.debug("task={} {} update-sdn-net={} created={}".
+ format(task_id, ro_task["target_id"], sdn_net_id, created))
+ connected_ports = new_connected_ports
+ elif sdn_net_id:
+ wim_status_dict = target_vim.get_connectivity_service_status(sdn_net_id, conn_info=created_items)
+ sdn_status = wim_status_dict["sdn_status"]
+ if wim_status_dict.get("sdn_info"):
+ sdn_info = str(wim_status_dict.get("sdn_info")) or ""
+ if wim_status_dict.get("error_msg"):
+ sdn_info = wim_status_dict.get("error_msg") or ""
+
+ if pending_ports:
+ if sdn_status != "ERROR":
+ sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
+ len(ports)-pending_ports, len(ports))
+ if sdn_status == "ACTIVE":
+ sdn_status = "BUILD"
+
+ ro_vim_item_update = {"vim_id": sdn_net_id,
+ "vim_status": sdn_status,
+ "created": created,
+ "created_items": created_items,
+ "connected_ports": connected_ports,
+ "vim_details": sdn_info,
+ "last_update": last_update}
+ return sdn_status, ro_vim_item_update
+ except Exception as e:
+ self.logger.error("task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
+ exc_info=not isinstance(e, (sdnconn.SdnConnectorError, vimconn.VimConnException)))
+ ro_vim_item_update = {"vim_status": "VIM_ERROR",
+ "created": created,
+ "vim_details": str(e)}
+ return "FAILED", ro_vim_item_update
+
+ def delete(self, ro_task, task_index):
+ task = ro_task["tasks"][task_index]
+ task_id = task["task_id"]
+ sdn_vim_id = ro_task["vim_info"].get("vim_id")
+ ro_vim_item_update_ok = {"vim_status": "DELETED",
+ "created": False,
+ "vim_details": "DELETED",
+ "vim_id": None}
+ try:
+ if sdn_vim_id:
+ target_vim = self.my_vims[ro_task["target_id"]]
+ target_vim.delete_connectivity_service(sdn_vim_id, ro_task["vim_info"].get("created_items"))
+
+ except Exception as e:
+ if isinstance(e, sdnconn.SdnConnectorError) and e.http_code == HTTPStatus.NOT_FOUND.value:
+ ro_vim_item_update_ok["vim_details"] = "already deleted"
+ else:
+ self.logger.error("ro_task={} vim={} del-sdn-net={}: {}".format(ro_task["_id"], ro_task["target_id"],
+ sdn_vim_id, e),
+ exc_info=not isinstance(e, (sdnconn.SdnConnectorError, vimconn.VimConnException)))
+ ro_vim_item_update = {"vim_status": "VIM_ERROR",
+ "vim_details": "Error while deleting: {}".format(e)}
+ return "FAILED", ro_vim_item_update
+
+ self.logger.debug("task={} {} del-sdn-net={} {}".format(task_id, ro_task["target_id"], sdn_vim_id,
+ ro_vim_item_update_ok.get("vim_details", "")))
return "DONE", ro_vim_item_update_ok
- def new_vm(self, ro_task, task_index, task_depends):
- task = ro_task["tasks"][task_index]
- task_id = task["task_id"]
- created = False
- created_items = {}
- target_vim = self.my_vims[ro_task["target_id"]]
- try:
- created = True
- params = task["params"]
- params_copy = deepcopy(params)
- net_list = params_copy["net_list"]
- for net in net_list:
- if "net_id" in net and net["net_id"].startswith("TASK-"): # change task_id into network_id
- network_id = task_depends[net["net_id"]]
- if not network_id:
- raise NsWorkerException("Cannot create VM because depends on a network not created or found "
- "for {}".format(net["net_id"]))
- net["net_id"] = network_id
- if params_copy["image_id"].startswith("TASK-"):
- params_copy["image_id"] = task_depends[params_copy["image_id"]]
- if params_copy["flavor_id"].startswith("TASK-"):
- params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
- vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
- interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
+class NsWorker(threading.Thread):
+ REFRESH_BUILD = 5 # 5 seconds
+ REFRESH_ACTIVE = 60 # 1 minute
+ REFRESH_ERROR = 600
+ REFRESH_IMAGE = 3600 * 10
+ REFRESH_DELETE = 3600 * 10
+ QUEUE_SIZE = 2000
+ # TODO delete assigment_lock = Lock()
+ terminate = False
+ # TODO delete assignment = {}
+ MAX_TIME_LOCKED = 3600
+ MAX_TIME_VIM_LOCKED = 120
+
+ def __init__(self, worker_index, config, plugins, db):
+ """
+
+ :param worker_index: thread index
+ :param config: general configuration of RO, among others the process_id with the docker id where it runs
+ :param plugins: global shared dict with the loaded plugins
+ :param db: database class instance to use
+ """
+ threading.Thread.__init__(self)
+ self.config = config
+ self.plugins = plugins
+ self.plugin_name = "unknown"
+ self.logger = logging.getLogger('ro.worker{}'.format(worker_index))
+ self.worker_index = worker_index
+ self.task_queue = queue.Queue(self.QUEUE_SIZE)
+ self.my_vims = {} # targetvim: vimplugin class
+ self.db_vims = {} # targetvim: vim information from database
+ self.vim_targets = [] # targetvim list
+ self.my_id = config["process_id"] + ":" + str(worker_index)
+ self.db = db
+ self.item2class = {
+ "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
+ "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
+ "image": VimInteractionImage(self.db, self.my_vims, self.db_vims, self.logger),
+ "flavor": VimInteractionFlavor(self.db, self.my_vims, self.db_vims, self.logger),
+ "sdn_net": VimInteractionSdnNet(self.db, self.my_vims, self.db_vims, self.logger),
+ }
+ self.time_last_task_processed = None
+ self.tasks_to_delete = [] # lists of tasks to delete because nsrs or vnfrs has been deleted from db
+
+ def insert_task(self, task):
+ try:
+ self.task_queue.put(task, False)
+ return None
+ except queue.Full:
+ raise NsWorkerException("timeout inserting a task")
+
+ def terminate(self):
+ self.insert_task("exit")
+
+ def del_task(self, task):
+ with self.task_lock:
+ if task["status"] == "SCHEDULED":
+ task["status"] = "SUPERSEDED"
+ return True
+ else: # task["status"] == "processing"
+ self.task_lock.release()
+ return False
+
+ def _process_vim_config(self, target_id, db_vim):
+ """
+ Process vim config, creating vim configuration files as ca_cert
+ :param target_id: vim/sdn/wim + id
+ :param db_vim: Vim dictionary obtained from database
+ :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
+ """
+ if not db_vim.get("config"):
+ return
+ file_name = ""
+ try:
+ if db_vim["config"].get("ca_cert_content"):
+ file_name = "{}:{}".format(target_id, self.worker_index)
+ try:
+ mkdir(file_name)
+ except FileExistsError:
+ pass
+ file_name = file_name + "/ca_cert"
+ with open(file_name, "w") as f:
+ f.write(db_vim["config"]["ca_cert_content"])
+ del db_vim["config"]["ca_cert_content"]
+ db_vim["config"]["ca_cert"] = file_name
+ except Exception as e:
+ raise NsWorkerException("Error writing to file '{}': {}".format(file_name, e))
+
+ def _load_plugin(self, name, type="vim"):
+ # type can be vim or sdn
+ if "rovim_dummy" not in self.plugins:
+ self.plugins["rovim_dummy"] = VimDummyConnector
+ if "rosdn_dummy" not in self.plugins:
+ self.plugins["rosdn_dummy"] = SdnDummyConnector
+ if name in self.plugins:
+ return self.plugins[name]
+ try:
+ for v in iter_entry_points('osm_ro{}.plugins'.format(type), name):
+ self.plugins[name] = v.load()
+ except Exception as e:
+ raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
+ if name and name not in self.plugins:
+ raise NsWorkerException("Plugin 'osm_{n}' has not been installed".format(n=name))
+ return self.plugins[name]
+
+ def _unload_vim(self, target_id):
+ """
+ Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
+ :param target_id: Contains type:_id; where type can be 'vim', ...
+ :return: None.
+ """
+ try:
+ target, _, _id = target_id.partition(":")
+ self.db_vims.pop(target_id, None)
+ self.my_vims.pop(target_id, None)
+ self.vim_targets.remove(target_id)
+ rmtree("{}:{}".format(target_id, self.worker_index))
+ except FileNotFoundError:
+ pass # this is raised by rmtree if folder does not exist
+ except Exception as e:
+ self.logger.error("Cannot unload {}: {}".format(target_id, e))
+
+ def _check_vim(self, target_id):
+ """
+ Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
+ :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
+ :return: None.
+ """
+ target, _, _id = target_id.partition(":")
+ now = time.time()
+ update_dict = {}
+ unset_dict = {}
+ op_text = ""
+ step = ""
+ loaded = target_id in self.my_vims
+ target_database = "vim_accounts" if target == "vim" else "wim_accounts" if target == "wim" else "sdns"
+ try:
+ step = "Getting {} from db".format(target_id)
+ db_vim = self.db.get_one(target_database, {"_id": _id})
+ for op_index, operation in enumerate(db_vim["_admin"].get("operations", ())):
+ if operation["operationState"] != "PROCESSING":
+ continue
+ locked_at = operation.get("locked_at")
+ if locked_at is not None and locked_at >= now - self.MAX_TIME_VIM_LOCKED:
+ # some other thread is doing this operation
+ return
+ # lock
+ op_text = "_admin.operations.{}.".format(op_index)
+ if not self.db.set_one(target_database,
+ q_filter={"_id": _id,
+ op_text + "operationState": "PROCESSING",
+ op_text + "locked_at": locked_at
+ },
+ update_dict={op_text + "locked_at": now,
+ "admin.current_operation": op_index},
+ fail_on_empty=False):
+ return
+ unset_dict[op_text + "locked_at"] = None
+ unset_dict["current_operation"] = None
+ step = "Loading " + target_id
+ error_text = self._load_vim(target_id)
+ if not error_text:
+ step = "Checking connectivity"
+ if target == 'vim':
+ self.my_vims[target_id].check_vim_connectivity()
+ else:
+ self.my_vims[target_id].check_credentials()
+ update_dict["_admin.operationalState"] = "ENABLED"
+ update_dict["_admin.detailed-status"] = ""
+ unset_dict[op_text + "detailed-status"] = None
+ update_dict[op_text + "operationState"] = "COMPLETED"
+ return
+
+ except Exception as e:
+ error_text = "{}: {}".format(step, e)
+ self.logger.error("{} for {}: {}".format(step, target_id, e))
+
+ finally:
+ if update_dict or unset_dict:
+ if error_text:
+ update_dict[op_text + "operationState"] = "FAILED"
+ update_dict[op_text + "detailed-status"] = error_text
+ unset_dict.pop(op_text + "detailed-status", None)
+ update_dict["_admin.operationalState"] = "ERROR"
+ update_dict["_admin.detailed-status"] = error_text
+ if op_text:
+ update_dict[op_text + "statusEnteredTime"] = now
+ self.db.set_one(target_database, q_filter={"_id": _id}, update_dict=update_dict, unset=unset_dict,
+ fail_on_empty=False)
+ if not loaded:
+ self._unload_vim(target_id)
+
+ def _reload_vim(self, target_id):
+ if target_id in self.vim_targets:
+ self._load_vim(target_id)
+ else:
+ # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
+ # just remove it to force load again next time it is needed
+ self.db_vims.pop(target_id, None)
+
+ def _load_vim(self, target_id):
+ """
+ Load or reload a vim_account, sdn_controller or wim_account.
+ Read content from database, load the plugin if not loaded.
+ In case of error loading the plugin, it load a failing VIM_connector
+ It fills self db_vims dictionary, my_vims dictionary and vim_targets list
+ :param target_id: Contains type:_id; where type can be 'vim', ...
+ :return: None if ok, descriptive text if error
+ """
+ target, _, _id = target_id.partition(":")
+ target_database = "vim_accounts" if target == "vim" else "wim_accounts" if target == "wim" else "sdns"
+ plugin_name = ""
+ vim = None
+ try:
+ step = "Getting {}={} from db".format(target, _id)
+ # TODO process for wim, sdnc, ...
+ vim = self.db.get_one(target_database, {"_id": _id})
+
+ # if deep_get(vim, "config", "sdn-controller"):
+ # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
+ # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
+
+ step = "Decrypting password"
+ schema_version = vim.get("schema_version")
+ self.db.encrypt_decrypt_fields(vim, "decrypt", fields=('password', 'secret'),
+ schema_version=schema_version, salt=_id)
+ self._process_vim_config(target_id, vim)
+ if target == "vim":
+ plugin_name = "rovim_" + vim["vim_type"]
+ step = "Loading plugin '{}'".format(plugin_name)
+ vim_module_conn = self._load_plugin(plugin_name)
+ step = "Loading {}'".format(target_id)
+ self.my_vims[target_id] = vim_module_conn(
+ uuid=vim['_id'], name=vim['name'],
+ tenant_id=vim.get('vim_tenant_id'), tenant_name=vim.get('vim_tenant_name'),
+ url=vim['vim_url'], url_admin=None,
+ user=vim['vim_user'], passwd=vim['vim_password'],
+ config=vim.get('config') or {}, persistent_info={}
+ )
+ else: # sdn
+ plugin_name = "rosdn_" + vim["type"]
+ step = "Loading plugin '{}'".format(plugin_name)
+ vim_module_conn = self._load_plugin(plugin_name, "sdn")
+ step = "Loading {}'".format(target_id)
+ wim = deepcopy(vim)
+ wim_config = wim.pop("config", {}) or {}
+ wim["uuid"] = wim["_id"]
+ wim["wim_url"] = wim["url"]
+ if wim.get("dpid"):
+ wim_config["dpid"] = wim.pop("dpid")
+ if wim.get("switch_id"):
+ wim_config["switch_id"] = wim.pop("switch_id")
+ self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config) # wim, wim_account, config
+ self.db_vims[target_id] = vim
+ self.error_status = None
+ self.logger.info("Connector loaded for {}, plugin={}".format(target_id, plugin_name))
+ except Exception as e:
+ self.logger.error("Cannot load {} plugin={}: {} {}".format(
+ target_id, plugin_name, step, e))
+ self.db_vims[target_id] = vim or {}
+ self.db_vims[target_id] = FailingConnector(str(e))
+ error_status = "{} Error: {}".format(step, e)
+ return error_status
+ finally:
+ if target_id not in self.vim_targets:
+ self.vim_targets.append(target_id)
+
+ def _get_db_task(self):
+ """
+ Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
+ :return: None
+ """
+ now = time.time()
+ if not self.time_last_task_processed:
+ self.time_last_task_processed = now
+ try:
+ while True:
+ locked = self.db.set_one(
+ "ro_tasks",
+ q_filter={"target_id": self.vim_targets,
+ "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
+ "locked_at.lt": now - self.MAX_TIME_LOCKED,
+ "to_check_at.lt": self.time_last_task_processed},
+ update_dict={"locked_by": self.my_id, "locked_at": now},
+ fail_on_empty=False)
+ if locked:
+ # read and return
+ ro_task = self.db.get_one(
+ "ro_tasks",
+ q_filter={"target_id": self.vim_targets,
+ "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
+ "locked_at": now})
+ return ro_task
+ if self.time_last_task_processed == now:
+ self.time_last_task_processed = None
+ return None
+ else:
+ self.time_last_task_processed = now
+ # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
+
+ except DbException as e:
+ self.logger.error("Database exception at _get_db_task: {}".format(e))
+ except Exception as e:
+ self.logger.critical("Unexpected exception at _get_db_task: {}".format(e), exc_info=True)
+ return None
+
+ def _delete_task(self, ro_task, task_index, task_depends, db_update):
+ """
+ Determine if this task need to be done or superseded
+ :return: None
+ """
+ my_task = ro_task["tasks"][task_index]
+ task_id = my_task["task_id"]
+ needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get("created_items", False)
+ if my_task["status"] == "FAILED":
+ return None, None # TODO need to be retry??
+ try:
+ for index, task in enumerate(ro_task["tasks"]):
+ if index == task_index or not task:
+ continue # own task
+ if my_task["target_record"] == task["target_record"] and task["action"] == "CREATE":
+ # set to finished
+ db_update["tasks.{}.status".format(index)] = task["status"] = "FINISHED"
+ elif task["action"] == "CREATE" and task["status"] not in ("FINISHED", "SUPERSEDED"):
+ needed_delete = False
+ if needed_delete:
+ return self.item2class[my_task["item"]].delete(ro_task, task_index)
+ else:
+ return "SUPERSEDED", None
+ except Exception as e:
+ if not isinstance(e, NsWorkerException):
+ self.logger.critical("Unexpected exception at _delete_task task={}: {}".format(task_id, e),
+ exc_info=True)
+ return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
+
+ def _create_task(self, ro_task, task_index, task_depends, db_update):
+ """
+ Determine if this task need to create something at VIM
+ :return: None
+ """
+ my_task = ro_task["tasks"][task_index]
+ task_id = my_task["task_id"]
+ task_status = None
+ if my_task["status"] == "FAILED":
+ return None, None # TODO need to be retry??
+ elif my_task["status"] == "SCHEDULED":
+ # check if already created by another task
+ for index, task in enumerate(ro_task["tasks"]):
+ if index == task_index or not task:
+ continue # own task
+ if task["action"] == "CREATE" and task["status"] not in ("SCHEDULED", "FINISHED", "SUPERSEDED"):
+ return task["status"], "COPY_VIM_INFO"
+
+ try:
+ task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
+ ro_task, task_index, task_depends)
+ # TODO update other CREATE tasks
+ except Exception as e:
+ if not isinstance(e, NsWorkerException):
+ self.logger.error("Error executing task={}: {}".format(task_id, e), exc_info=True)
+ task_status = "FAILED"
+ ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
+ # TODO update ro_vim_item_update
+ return task_status, ro_vim_item_update
+ else:
+ return None, None
+
+ def _get_dependency(self, task_id, ro_task=None, target_id=None):
+ """
+ Look for dependency task
+ :param task_id: Can be one of
+ 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
+ 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
+ 3. task.task_id: "<action_id>:number"
+ :param ro_task:
+ :param target_id:
+ :return: database ro_task plus index of task
+ """
+ if task_id.startswith("vim:") or task_id.startswith("sdn:") or task_id.startswith("wim:"):
+ target_id, _, task_id = task_id.partition(" ")
+
+ if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
+ ro_task_dependency = self.db.get_one(
+ "ro_tasks",
+ q_filter={"target_id": target_id,
+ "tasks.target_record_id": task_id
+ },
+ fail_on_empty=False)
+ if ro_task_dependency:
+ for task_index, task in enumerate(ro_task_dependency["tasks"]):
+ if task["target_record_id"] == task_id:
+ return ro_task_dependency, task_index
+
+ else:
+ if ro_task:
+ for task_index, task in enumerate(ro_task["tasks"]):
+ if task and task["task_id"] == task_id:
+ return ro_task, task_index
+ ro_task_dependency = self.db.get_one(
+ "ro_tasks",
+ q_filter={"tasks.ANYINDEX.task_id": task_id,
+ "tasks.ANYINDEX.target_record.ne": None
+ },
+ fail_on_empty=False)
+ if ro_task_dependency:
+ for task_index, task in ro_task_dependency["tasks"]:
+ if task["task_id"] == task_id:
+ return ro_task_dependency, task_index
+ raise NsWorkerException("Cannot get depending task {}".format(task_id))
+
+ def _process_pending_tasks(self, ro_task):
+ ro_task_id = ro_task["_id"]
+ now = time.time()
+ next_check_at = now + (24*60*60) # one day
+ db_ro_task_update = {}
+
+ def _update_refresh(new_status):
+ # compute next_refresh
+ nonlocal task
+ nonlocal next_check_at
+ nonlocal db_ro_task_update
+ nonlocal ro_task
- ro_vim_item_update = {"vim_id": vim_vm_id,
- "vim_status": "BUILD",
- "created": created,
- "created_items": created_items,
- "vim_details": None,
- "interfaces_vim_ids": interfaces,
- "interfaces": [],
- }
- self.logger.debug(
- "task={} {} new-vm={} created={}".format(task_id, ro_task["target_id"], vim_vm_id, created))
- return "BUILD", ro_vim_item_update
- except (vimconn.VimConnException, NsWorkerException) as e:
- self.logger.error("task={} vim={} new-vm: {}".format(task_id, ro_task["target_id"], e))
- ro_vim_item_update = {"vim_status": "VIM_ERROR",
- "created": created,
- "vim_details": str(e)}
- return "FAILED", ro_vim_item_update
+ next_refresh = time.time()
+ if task["item"] in ("image", "flavor"):
+ next_refresh += self.REFRESH_IMAGE
+ elif new_status == "BUILD":
+ next_refresh += self.REFRESH_BUILD
+ elif new_status == "DONE":
+ next_refresh += self.REFRESH_ACTIVE
+ else:
+ next_refresh += self.REFRESH_ERROR
+ next_check_at = min(next_check_at, next_refresh)
+ db_ro_task_update["vim_info.refresh_at"] = next_refresh
+ ro_task["vim_info"]["refresh_at"] = next_refresh
- def del_vm(self, ro_task, task_index):
- task = ro_task["tasks"][task_index]
- task_id = task["task_id"]
- vm_vim_id = ro_task["vim_info"]["vim_id"]
- ro_vim_item_update_ok = {"vim_status": "DELETED",
- "created": False,
- "vim_details": "DELETED",
- "vim_id": None}
try:
- if vm_vim_id or ro_task["vim_info"]["created_items"]:
- target_vim = self.my_vims[ro_task["target_id"]]
- target_vim.delete_vminstance(vm_vim_id, ro_task["vim_info"]["created_items"])
+ # 0 get task_status_create
+ task_status_create = None
+ task_create = next((t for t in ro_task["tasks"] if t and t["action"] == "CREATE" and
+ t["status"] in ("BUILD", "DONE")), None)
+ if task_create:
+ task_status_create = task_create["status"]
+ # 1. look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
+ for task_action in ("DELETE", "CREATE", "EXEC"):
+ db_vim_update = None
+ new_status = None
+ for task_index, task in enumerate(ro_task["tasks"]):
+ if not task:
+ continue # task deleted
+ target_update = None
+ if (task_action in ("DELETE", "EXEC") and task["status"] not in ("SCHEDULED", "BUILD")) or \
+ task["action"] != task_action or \
+ (task_action == "CREATE" and task["status"] in ("FINISHED", "SUPERSEDED")):
+ continue
+ task_path = "tasks.{}.status".format(task_index)
+ try:
+ db_vim_info_update = None
+ if task["status"] == "SCHEDULED":
+ task_depends = {}
+ # check if tasks that this depends on have been completed
+ dependency_not_completed = False
+ for dependency_task_id in (task.get("depends_on") or ()):
+ dependency_ro_task, dependency_task_index = \
+ self._get_dependency(dependency_task_id, target_id=ro_task["target_id"])
+ dependency_task = dependency_ro_task["tasks"][dependency_task_index]
+ if dependency_task["status"] == "SCHEDULED":
+ dependency_not_completed = True
+ next_check_at = min(next_check_at, dependency_ro_task["to_check_at"])
+ break
+ elif dependency_task["status"] == "FAILED":
+ error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
+ task["action"], task["item"], dependency_task["action"],
+ dependency_task["item"], dependency_task_id,
+ dependency_ro_task["vim_info"].get("vim_details"))
+ self.logger.error("task={} {}".format(task["task_id"], error_text))
+ raise NsWorkerException(error_text)
- except vimconn.VimConnNotFoundException:
- ro_vim_item_update_ok["vim_details"] = "already deleted"
+ task_depends[dependency_task_id] = dependency_ro_task["vim_info"]["vim_id"]
+ task_depends["TASK-{}".format(dependency_task_id)] = \
+ dependency_ro_task["vim_info"]["vim_id"]
+ if dependency_not_completed:
+ # TODO set at vim_info.vim_details that it is waiting
+ continue
- except vimconn.VimConnException as e:
- self.logger.error("ro_task={} vim={} del-vm={}: {}".format(ro_task["_id"], ro_task["target_id"],
- vm_vim_id, e))
- ro_vim_item_update = {"vim_status": "VIM_ERROR",
- "vim_details": "Error while deleting: {}".format(e)}
- return "FAILED", ro_vim_item_update
+ if task["action"] == "DELETE":
+ new_status, db_vim_info_update = self._delete_task(ro_task, task_index,
+ task_depends, db_ro_task_update)
+ new_status = "FINISHED" if new_status == "DONE" else new_status
+ # ^with FINISHED instead of DONE it will not be refreshing
+ if new_status in ("FINISHED", "SUPERSEDED"):
+ target_update = "DELETE"
+ elif task["action"] == "EXEC":
+ new_status, db_vim_info_update, db_task_update = self.item2class[task["item"]].exec(
+ ro_task, task_index, task_depends)
+ new_status = "FINISHED" if new_status == "DONE" else new_status
+ # ^with FINISHED instead of DONE it will not be refreshing
+ if db_task_update:
+ # load into database the modified db_task_update "retries" and "next_retry"
+ if db_task_update.get("retries"):
+ db_ro_task_update["tasks.{}.retries".format(task_index)] = db_task_update["retries"]
+ next_check_at = time.time() + db_task_update.get("next_retry", 60)
+ target_update = None
+ elif task["action"] == "CREATE":
+ if task["status"] == "SCHEDULED":
+ if task_status_create:
+ new_status = task_status_create
+ target_update = "COPY_VIM_INFO"
+ else:
+ new_status, db_vim_info_update = \
+ self.item2class[task["item"]].new(ro_task, task_index, task_depends)
+ # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
+ _update_refresh(new_status)
+ else:
+ if ro_task["vim_info"]["refresh_at"] and now > ro_task["vim_info"]["refresh_at"]:
+ new_status, db_vim_info_update = self.item2class[task["item"]].refresh(ro_task)
+ _update_refresh(new_status)
+ except Exception as e:
+ new_status = "FAILED"
+ db_vim_info_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
+ if not isinstance(e, (NsWorkerException, vimconn.VimConnException)):
+ self.logger.error("Unexpected exception at _delete_task task={}: {}".
+ format(task["task_id"], e), exc_info=True)
- self.logger.debug("task={} {} del-vm={} {}".format(task_id, ro_task["target_id"], vm_vim_id,
- ro_vim_item_update_ok.get("vim_details", "")))
- return "DONE", ro_vim_item_update_ok
+ try:
+ if db_vim_info_update:
+ db_vim_update = db_vim_info_update.copy()
+ db_ro_task_update.update({"vim_info." + k: v for k, v in db_vim_info_update.items()})
+ ro_task["vim_info"].update(db_vim_info_update)
- def refresh_vm(self, ro_task):
- """Call VIM to get vm status"""
- ro_task_id = ro_task["_id"]
- target_vim = self.my_vims[ro_task["target_id"]]
+ if new_status:
+ if task_action == "CREATE":
+ task_status_create = new_status
+ db_ro_task_update[task_path] = new_status
+ if target_update or db_vim_update:
- vim_id = ro_task["vim_info"]["vim_id"]
- if not vim_id:
- return None, None
- vm_to_refresh_list = [vim_id]
- try:
- vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
- vim_info = vim_dict[vim_id]
- if vim_info["status"] == "ACTIVE":
- task_status = "DONE"
- elif vim_info["status"] == "BUILD":
- task_status = "BUILD"
- else:
- task_status = "FAILED"
- except vimconn.VimConnException as e:
- # Mark all tasks at VIM_ERROR status
- self.logger.error("ro_task={} vim={} get-vm={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e))
- vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
- task_status = "FAILED"
+ if target_update == "DELETE":
+ self._update_target(task, None)
+ elif target_update == "COPY_VIM_INFO":
+ self._update_target(task, ro_task["vim_info"])
+ else:
+ self._update_target(task, db_vim_update)
- ro_vim_item_update = {}
- # TODO check and update interfaces
- vim_interfaces = []
- for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
- iface = next((iface for iface in vim_info["interfaces"] if vim_iface_id == iface["vim_interface_id"]), None)
- # if iface:
- # iface.pop("vim_info", None)
- vim_interfaces.append(iface)
-
- task = ro_task["tasks"][0] # TODO look for a task CREATE and active
- if task.get("mgmt_vnf_interface") is not None:
- vim_interfaces[task["mgmt_vnf_interface"]]["mgmt_vnf_interface"] = True
- mgmt_vdu_iface = task.get("mgmt_vdu_interface", task.get("mgmt_vnf_interface", 0))
- vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
+ except Exception as e:
+ if isinstance(e, DbException) and e.http_code == HTTPStatus.NOT_FOUND:
+ # if the vnfrs or nsrs has been removed from database, this task must be removed
+ self.logger.debug("marking to delete task={}".format(task["task_id"]))
+ self.tasks_to_delete.append(task)
+ else:
+ self.logger.error("Unexpected exception at _update_target task={}: {}".
+ format(task["task_id"], e), exc_info=True)
- if ro_task["vim_info"]["interfaces"] != vim_interfaces:
- ro_vim_item_update["interfaces"] = vim_interfaces
- if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
- ro_vim_item_update["vim_status"] = vim_info["status"]
- if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
- ro_vim_item_update["vim_name"] = vim_info.get("name")
- if vim_info["status"] in ("ERROR", "VIM_ERROR"):
- if ro_task["vim_info"]["vim_details"] != vim_info["error_msg"]:
- ro_vim_item_update["vim_details"] = vim_info["error_msg"]
- elif vim_info["status"] == "DELETED":
- ro_vim_item_update["vim_id"] = None
- ro_vim_item_update["vim_details"] = "Deleted externally"
- else:
- if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
- ro_vim_item_update["vim_details"] = vim_info["vim_info"]
+ q_filter = {"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"], "locked_at": ro_task["locked_at"]}
+ # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
+ # outside this task (by ro_nbi) do not update it
+ db_ro_task_update["locked_by"] = None
+ # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
+ db_ro_task_update["locked_at"] = int(now - self.MAX_TIME_LOCKED)
+ db_ro_task_update["to_check_at"] = next_check_at
+ if not self.db.set_one("ro_tasks",
+ update_dict=db_ro_task_update,
+ q_filter=q_filter,
+ fail_on_empty=False):
+ del db_ro_task_update["to_check_at"]
+ del q_filter["to_check_at"]
+ self.db.set_one("ro_tasks",
+ q_filter=q_filter,
+ update_dict=db_ro_task_update,
+ fail_on_empty=True)
+ except DbException as e:
+ self.logger.error("ro_task={} Error updating database {}".format(ro_task_id, e))
+ except Exception as e:
+ self.logger.error("Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True)
+
+ def _update_target(self, task, ro_vim_item_update):
+ table, _, temp = task["target_record"].partition(":")
+ _id, _, path_vim_status = temp.partition(":")
+ path_item = path_vim_status[:path_vim_status.rfind(".")]
+ path_item = path_item[:path_item.rfind(".")]
+ # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
+ # path_item: dot separated list targeting record information, e.g. "vdur.10"
if ro_vim_item_update:
- self.logger.debug("ro_task={} {} get-vm={}: status={} {}".format(
- ro_task_id, ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"),
- ro_vim_item_update.get("vim_details") if ro_vim_item_update.get("vim_status") != "ACTIVE" else ''))
- return task_status, ro_vim_item_update
+ update_dict = {path_vim_status + "." + k: v for k, v in ro_vim_item_update.items() if k in
+ ('vim_id', 'vim_details', 'vim_name', 'vim_status', 'interfaces')}
+ if path_vim_status.startswith("vdur."):
+ # for backward compatibility, add vdur.name apart from vdur.vim_name
+ if ro_vim_item_update.get("vim_name"):
+ update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
+ # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
+ if ro_vim_item_update.get("vim_id"):
+ update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
+ # update general status
+ if ro_vim_item_update.get("vim_status"):
+ update_dict[path_item + ".status"] = ro_vim_item_update["vim_status"]
+ if ro_vim_item_update.get("interfaces"):
+ path_interfaces = path_item + ".interfaces"
+ for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
+ if iface:
+ update_dict.update({path_interfaces + ".{}.".format(i) + k: v for k, v in iface.items() if
+ k in ('vlan', 'compute_node', 'pci')})
+ # put ip_address and mac_address with ip-address and mac-address
+ if iface.get('ip_address'):
+ update_dict[path_interfaces + ".{}.".format(i) + "ip-address"] = iface['ip_address']
+ if iface.get('mac_address'):
+ update_dict[path_interfaces + ".{}.".format(i) + "mac-address"] = iface['mac_address']
+ if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
+ update_dict["ip-address"] = iface.get("ip_address").split(";")[0]
+ if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
+ update_dict[path_item + ".ip-address"] = iface.get("ip_address").split(";")[0]
+
+ self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
+ else:
+ update_dict = {path_item + ".status": "DELETED"}
+ self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict, unset={path_vim_status: None})
- def exec_vm(self, ro_task, task_index, task_depends):
- task = ro_task["tasks"][task_index]
- task_id = task["task_id"]
- target_vim = self.my_vims[ro_task["target_id"]]
- try:
- params = task["params"]
- params_copy = deepcopy(params)
- params_copy["use_pri_key"] = self.db.decrypt(params_copy.pop("private_key"),
- params_copy.pop("schema_version"), params_copy.pop("salt"))
+ def _process_delete_db_tasks(self):
+ """
+ Delete task from database because vnfrs or nsrs or both have been deleted
+ :return: None. Uses and modify self.tasks_to_delete
+ """
+ while self.tasks_to_delete:
+ task = self.tasks_to_delete[0]
+ vnfrs_deleted = None
+ nsr_id = task["nsr_id"]
+ if task["target_record"].startswith("vnfrs:"):
+ # check if nsrs is present
+ if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
+ vnfrs_deleted = task["target_record"].split(":")[1]
+ try:
+ self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
+ except Exception as e:
+ self.logger.error("Error deleting task={}: {}".format(task["task_id"], e))
+ self.tasks_to_delete.pop(0)
- target_vim.inject_user_key(**params_copy)
- self.logger.debug(
- "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"]))
- return "DONE", params_copy["key"]
- except (vimconn.VimConnException, NsWorkerException) as e:
- self.logger.error("task={} vim={} new-vm: {}".format(task_id, ro_task["target_id"], e))
- ro_vim_item_update = {"vim_details": str(e)}
- return "FAILED", ro_vim_item_update
+ @staticmethod
+ def delete_db_tasks(db, nsr_id, vnfrs_deleted):
+ """
+ Static method because it is called from osm_ng_ro.ns
+ :param db: instance of database to use
+ :param nsr_id: affected nsrs id
+ :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
+ :return: None, exception is fails
+ """
+ retries = 5
+ for retry in range(retries):
+ ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
+ now = time.time()
+ conflict = False
+ for ro_task in ro_tasks:
+ db_update = {}
+ to_delete_ro_task = True
+ for index, task in enumerate(ro_task["tasks"]):
+ if not task:
+ pass
+ elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or \
+ (vnfrs_deleted and task["target_record"].startswith("vnfrs:"+vnfrs_deleted)):
+ db_update["tasks.{}".format(index)] = None
+ else:
+ to_delete_ro_task = False # used by other nsr, ro_task cannot be deleted
+ # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
+ if to_delete_ro_task:
+ if not db.del_one("ro_tasks",
+ q_filter={"_id": ro_task["_id"], "modified_at": ro_task["modified_at"]},
+ fail_on_empty=False):
+ conflict = True
+ elif db_update:
+ db_update["modified_at"] = now
+ if not db.set_one("ro_tasks",
+ q_filter={"_id": ro_task["_id"], "modified_at": ro_task["modified_at"]},
+ update_dict=db_update,
+ fail_on_empty=False):
+ conflict = True
+ if not conflict:
+ return
+ else:
+ raise NsWorkerException("Exceeded {} retries".format(retries))
def run(self):
# load database
self.logger.debug("Starting")
while True:
+ # step 1: get commands from queue
try:
task = self.task_queue.get(block=False if self.my_vims else True)
if task[0] == "terminate":
break
- if task[0] == "load_vim":
+ elif task[0] == "load_vim":
self._load_vim(task[1])
+ elif task[0] == "unload_vim":
+ self._unload_vim(task[1])
+ elif task[0] == "reload_vim":
+ self._reload_vim(task[1])
+ elif task[0] == "check_vim":
+ self._check_vim(task[1])
continue
- except queue.Empty:
- pass
+ except Exception as e:
+ if isinstance(e, queue.Empty):
+ pass
+ else:
+ self.logger.critical("Error processing task: {}".format(e), exc_info=True)
+ # step 2: process pending_tasks, delete not needed tasks
try:
+ if self.tasks_to_delete:
+ self._process_delete_db_tasks()
busy = False
ro_task = self._get_db_task()
if ro_task:
- self._proccess_pending_tasks(ro_task)
+ self._process_pending_tasks(ro_task)
busy = True
if not busy:
time.sleep(5)