blob: 0b96c53619cdfbed424b0f21027250f9705ce773 [file] [log] [blame]
# -*- coding: utf-8 -*-
##
# Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
##
""""
This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
The tasks are stored at database in table ro_tasks
A single ro_task refers to a VIM element (flavor, image, network, ...).
A ro_task can contain several 'tasks', each one with a target, where to store the results
"""
import threading
import time
import queue
import logging
from pkg_resources import iter_entry_points
# from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version
from osm_common.dbbase import DbException
# from osm_common.fsbase import FsException
# from osm_common.msgbase import MsgException
from osm_ro_plugin.vim_dummy import VimDummyConnector
from osm_ro_plugin import vimconn
from copy import deepcopy
from unittest.mock import Mock
__author__ = "Alfonso Tierno"
__date__ = "$28-Sep-2017 12:07:15$"
def deep_get(target_dict, *args, **kwargs):
"""
Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
:param target_dict: dictionary to be read
:param args: list of keys to read from target_dict
:param kwargs: only can contain default=value to return if key is not present in the nested dictionary
:return: The wanted value if exist, None or default otherwise
"""
for key in args:
if not isinstance(target_dict, dict) or key not in target_dict:
return kwargs.get("default")
target_dict = target_dict[key]
return target_dict
class NsWorkerException(Exception):
pass
class FailingConnector:
def __init__(self, error_msg):
self.error_msg = error_msg
for method in dir(vimconn.VimConnector):
if method[0] != "_":
setattr(self, method, Mock(side_effect=vimconn.VimConnException(error_msg)))
class NsWorkerExceptionNotFound(NsWorkerException):
pass
class NsWorker(threading.Thread):
REFRESH_BUILD = 5 # 5 seconds
REFRESH_ACTIVE = 60 # 1 minute
REFRESH_ERROR = 600
REFRESH_IMAGE = 3600 * 10
REFRESH_DELETE = 3600 * 10
QUEUE_SIZE = 2000
# TODO delete assigment_lock = Lock()
terminate = False
# TODO delete assignment = {}
MAX_TIME_LOCKED = 3600
def __init__(self, worker, config, plugins, db):
"""Init a thread.
Arguments:
'id' number of thead
'name' name of thread
'host','user': host ip or name to manage and user
'db', 'db_lock': database class and lock to use it in exclusion
"""
threading.Thread.__init__(self)
self.config = config
self.plugins = plugins
self.plugin_name = "unknown"
self.logger = logging.getLogger('ro.worker{}'.format("worker"))
self.worker_id = worker
self.task_queue = queue.Queue(self.QUEUE_SIZE)
self.my_vims = {} # targetvim: vimplugin class
self.db_vims = {} # targetvim: vim information from database
self.vim_targets = [] # targetvim list
self.my_id = config["process_id"] + ":" + str(worker)
self.db = db
self.item2create = {
"net": self.new_net,
"vdu": self.new_vm,
"image": self.new_image,
"flavor": self.new_flavor,
}
self.item2refresh = {
"net": self.refresh_net,
"vdu": self.refresh_vm,
"image": self.refresh_ok,
"flavor": self.refresh_ok,
}
self.item2delete = {
"net": self.del_net,
"vdu": self.del_vm,
"image": self.delete_ok,
"flavor": self.del_flavor,
}
self.item2action = {
"vdu": self.exec_vm,
}
self.time_last_task_processed = None
def insert_task(self, task):
try:
self.task_queue.put(task, False)
return None
except queue.Full:
raise NsWorkerException("timeout inserting a task")
def terminate(self):
self.insert_task("exit")
def del_task(self, task):
with self.task_lock:
if task["status"] == "SCHEDULED":
task["status"] = "SUPERSEDED"
return True
else: # task["status"] == "processing"
self.task_lock.release()
return False
def _load_plugin(self, name, type="vim"):
# type can be vim or sdn
if "rovim_dummy" not in self.plugins:
self.plugins["rovim_dummy"] = VimDummyConnector
if name in self.plugins:
return self.plugins[name]
try:
for v in iter_entry_points('osm_ro{}.plugins'.format(type), name):
self.plugins[name] = v.load()
except Exception as e:
self.logger.critical("Cannot load osm_{}: {}".format(name, e))
if name:
self.plugins[name] = FailingConnector("Cannot load osm_{}: {}".format(name, e))
if name and name not in self.plugins:
error_text = "Cannot load a module for {t} type '{n}'. The plugin 'osm_{n}' has not been" \
" registered".format(t=type, n=name)
self.logger.critical(error_text)
self.plugins[name] = FailingConnector(error_text)
return self.plugins[name]
def _load_vim(self, vim_account_id):
target_id = "vim:" + vim_account_id
plugin_name = ""
vim = None
try:
step = "Getting vim={} from db".format(vim_account_id)
vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
# if deep_get(vim, "config", "sdn-controller"):
# step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
# db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
step = "Decrypt password"
schema_version = vim.get("schema_version")
self.db.encrypt_decrypt_fields(vim, "decrypt", fields=('password', 'secret'),
schema_version=schema_version, salt=vim_account_id)
step = "Load plugin 'rovim_{}'".format(vim.get("vim_type"))
plugin_name = "rovim_" + vim["vim_type"]
vim_module_conn = self._load_plugin(plugin_name)
self.my_vims[target_id] = vim_module_conn(
uuid=vim['_id'], name=vim['name'],
tenant_id=vim.get('vim_tenant_id'), tenant_name=vim.get('vim_tenant_name'),
url=vim['vim_url'], url_admin=None,
user=vim['vim_user'], passwd=vim['vim_password'],
config=vim.get('config'), persistent_info={}
)
self.vim_targets.append(target_id)
self.db_vims[target_id] = vim
self.error_status = None
self.logger.info("Vim Connector loaded for vim_account={}, plugin={}".format(
vim_account_id, plugin_name))
except Exception as e:
self.logger.error("Cannot load vimconnector for vim_account={} plugin={}: {} {}".format(
vim_account_id, plugin_name, step, e))
self.db_vims[target_id] = vim or {}
self.my_vims[target_id] = FailingConnector(str(e))
self.error_status = "Error loading vimconnector: {}".format(e)
def _get_db_task(self):
"""
Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
:return: None
"""
now = time.time()
if not self.time_last_task_processed:
self.time_last_task_processed = now
try:
while True:
locked = self.db.set_one(
"ro_tasks",
q_filter={"target_id": self.vim_targets,
"tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
"locked_at.lt": now - self.MAX_TIME_LOCKED,
"to_check_at.lt": self.time_last_task_processed},
update_dict={"locked_by": self.my_id, "locked_at": now},
fail_on_empty=False)
if locked:
# read and return
ro_task = self.db.get_one(
"ro_tasks",
q_filter={"target_id": self.vim_targets,
"tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
"locked_at": now})
return ro_task
if self.time_last_task_processed == now:
self.time_last_task_processed = None
return None
else:
self.time_last_task_processed = now
# self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
except DbException as e:
self.logger.error("Database exception at _get_db_task: {}".format(e))
except Exception as e:
self.logger.critical("Unexpected exception at _get_db_task: {}".format(e), exc_info=True)
return None
def _delete_task(self, ro_task, task_index, task_depends, db_update):
"""
Determine if this task need to be done or superseded
:return: None
"""
my_task = ro_task["tasks"][task_index]
task_id = my_task["task_id"]
needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get("created_items", False)
if my_task["status"] == "FAILED":
return None, None # TODO need to be retry??
try:
for index, task in enumerate(ro_task["tasks"]):
if index == task_index:
continue # own task
if my_task["target_record"] == task["target_record"] and task["action"] == "CREATE":
# set to finished
db_update["tasks.{}.status".format(index)] = task["status"] = "FINISHED"
elif task["action"] == "CREATE" and task["status"] not in ("FINISHED", "SUPERSEDED"):
needed_delete = False
if needed_delete:
return self.item2delete[my_task["item"]](ro_task, task_index)
else:
return "SUPERSEDED", None
except Exception as e:
if not isinstance(e, NsWorkerException):
self.logger.critical("Unexpected exception at _delete_task task={}: {}".format(task_id, e),
exc_info=True)
return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
def _create_task(self, ro_task, task_index, task_depends, db_update):
"""
Determine if this task need to be created
:return: None
"""
my_task = ro_task["tasks"][task_index]
task_id = my_task["task_id"]
task_status = None
if my_task["status"] == "FAILED":
return None, None # TODO need to be retry??
elif my_task["status"] == "SCHEDULED":
# check if already created by another task
for index, task in enumerate(ro_task["tasks"]):
if index == task_index:
continue # own task
if task["action"] == "CREATE" and task["status"] not in ("SCHEDULED", "FINISHED", "SUPERSEDED"):
return task["status"], "COPY_VIM_INFO"
try:
task_status, ro_vim_item_update = self.item2create[my_task["item"]](ro_task, task_index, task_depends)
# TODO update other CREATE tasks
except Exception as e:
if not isinstance(e, NsWorkerException):
self.logger.error("Error executing task={}: {}".format(task_id, e), exc_info=True)
task_status = "FAILED"
ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
# TODO update ro_vim_item_update
return task_status, ro_vim_item_update
else:
return None, None
def _get_dependency(self, task_id, ro_task=None, target_id=None):
if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
ro_task_dependency = self.db.get_one(
"ro_tasks",
q_filter={"target_id": target_id,
"tasks.target_record_id": task_id
},
fail_on_empty=False)
if ro_task_dependency:
for task_index, task in enumerate(ro_task_dependency["tasks"]):
if task["target_record_id"] == task_id:
return ro_task_dependency, task_index
else:
if ro_task:
for task_index, task in enumerate(ro_task["tasks"]):
if task["task_id"] == task_id:
return ro_task, task_index
ro_task_dependency = self.db.get_one(
"ro_tasks",
q_filter={"tasks.ANYINDEX.task_id": task_id,
"tasks.ANYINDEX.target_record.ne": None
},
fail_on_empty=False)
if ro_task_dependency:
for task_index, task in ro_task_dependency["tasks"]:
if task["task_id"] == task_id:
return ro_task_dependency, task_index
raise NsWorkerException("Cannot get depending task {}".format(task_id))
def _proccess_pending_tasks(self, ro_task):
ro_task_id = ro_task["_id"]
now = time.time()
next_check_at = now + (24*60*60) # one day
db_ro_task_update = {}
def _update_refresh(new_status):
# compute next_refresh
nonlocal task
nonlocal next_check_at
nonlocal db_ro_task_update
nonlocal ro_task
next_refresh = time.time()
if task["item"] in ("image", "flavor"):
next_refresh += self.REFRESH_IMAGE
elif new_status == "BUILD":
next_refresh += self.REFRESH_BUILD
elif new_status == "DONE":
next_refresh += self.REFRESH_ACTIVE
else:
next_refresh += self.REFRESH_ERROR
next_check_at = min(next_check_at, next_refresh)
db_ro_task_update["vim_info.refresh_at"] = next_refresh
ro_task["vim_info"]["refresh_at"] = next_refresh
try:
# 0 get task_status_create
task_status_create = None
task_create = next((t for t in ro_task["tasks"] if t["action"] == "CREATE" and
t["status"] in ("BUILD", "DONE")), None)
if task_create:
task_status_create = task_create["status"]
# 1. look for SCHEDULED or if CREATE also DONE,BUILD
for task_action in ("DELETE", "CREATE", "EXEC"):
db_vim_update = None
for task_index, task in enumerate(ro_task["tasks"]):
target_update = None
if (task_action in ("DELETE", "EXEC") and task["status"] != "SCHEDULED") or\
task["action"] != task_action or \
(task_action == "CREATE" and task["status"] in ("FINISHED", "SUPERSEDED")):
continue
task_path = "tasks.{}.status".format(task_index)
try:
if task["status"] == "SCHEDULED":
task_depends = {}
# check if tasks that this depends on have been completed
dependency_not_completed = False
for dependency_task_id in (task.get("depends_on") or ()):
dependency_ro_task, dependency_task_index = \
self._get_dependency(dependency_task_id, target_id=ro_task["target_id"])
dependency_task = dependency_ro_task["tasks"][dependency_task_index]
if dependency_task["status"] == "SCHEDULED":
dependency_not_completed = True
next_check_at = min(next_check_at, dependency_ro_task["to_check_at"])
break
elif dependency_task["status"] == "FAILED":
error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
task["action"], task["item"], dependency_task["action"],
dependency_task["item"], dependency_task_id,
dependency_ro_task["vim_info"].get("vim_details"))
self.logger.error("task={} {}".format(task["task_id"], error_text))
raise NsWorkerException(error_text)
task_depends[dependency_task_id] = dependency_ro_task["vim_info"]["vim_id"]
task_depends["TASK-{}".format(dependency_task_id)] = \
dependency_ro_task["vim_info"]["vim_id"]
if dependency_not_completed:
# TODO set at vim_info.vim_details that it is waiting
continue
if task["action"] == "DELETE":
new_status, db_vim_info_update = self._delete_task(ro_task, task_index,
task_depends, db_ro_task_update)
new_status = "FINISHED" if new_status == "DONE" else new_status
# ^with FINISHED instead of DONE it will not be refreshing
if new_status in ("FINISHED", "SUPERSEDED"):
target_update = "DELETE"
elif task["action"] == "EXEC":
self.item2action[task["item"]](ro_task, task_index, task_depends, db_ro_task_update)
new_status = "FINISHED" if new_status == "DONE" else new_status
# ^with FINISHED instead of DONE it will not be refreshing
if new_status in ("FINISHED", "SUPERSEDED"):
target_update = "DELETE"
elif task["action"] == "CREATE":
if task["status"] == "SCHEDULED":
if task_status_create:
new_status = task_status_create
target_update = "COPY_VIM_INFO"
else:
new_status, db_vim_info_update = \
self.item2create[task["item"]](ro_task, task_index, task_depends)
# self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
_update_refresh(new_status)
else:
if ro_task["vim_info"]["refresh_at"] and now > ro_task["vim_info"]["refresh_at"]:
new_status, db_vim_info_update = self.item2refresh[task["item"]](ro_task)
_update_refresh(new_status)
except Exception as e:
new_status = "FAILED"
db_vim_info_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
if not isinstance(e, (NsWorkerException, vimconn.VimConnException)):
self.logger.error("Unexpected exception at _delete_task task={}: {}".
format(task["task_id"], e), exc_info=True)
try:
if db_vim_info_update:
db_vim_update = db_vim_info_update.copy()
db_ro_task_update.update({"vim_info." + k: v for k, v in db_vim_info_update.items()})
ro_task["vim_info"].update(db_vim_info_update)
if new_status:
if task_action == "CREATE":
task_status_create = new_status
db_ro_task_update[task_path] = new_status
if target_update or db_vim_update:
if target_update == "DELETE":
self._update_target(task, None)
elif target_update == "COPY_VIM_INFO":
self._update_target(task, ro_task["vim_info"])
else:
self._update_target(task, db_vim_update)
except Exception as e:
self.logger.error("Unexpected exception at _update_target task={}: {}".
format(task["task_id"], e), exc_info=True)
# modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
# outside this task (by ro_nbi) do not update it
db_ro_task_update["locked_by"] = None
# locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
db_ro_task_update["locked_at"] = int(now - self.MAX_TIME_LOCKED)
db_ro_task_update["to_check_at"] = next_check_at
if not self.db.set_one("ro_tasks",
update_dict=db_ro_task_update,
q_filter={"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"]},
fail_on_empty=False):
del db_ro_task_update["to_check_at"]
self.db.set_one("ro_tasks",
q_filter={"_id": ro_task["_id"]},
update_dict=db_ro_task_update,
fail_on_empty=True)
except DbException as e:
self.logger.error("ro_task={} Error updating database {}".format(ro_task_id, e))
except Exception as e:
self.logger.error("Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True)
def _update_target(self, task, ro_vim_item_update):
try:
table, _id, path = task["target_record"].split(":")
if ro_vim_item_update:
update_dict = {path + "." + k: v for k, v in ro_vim_item_update.items() if k in
('vim_id', 'vim_details', 'vim_name', 'vim_status', 'interfaces')}
if ro_vim_item_update.get("interfaces"):
path_vdu = path[:path.rfind(".")]
path_vdu = path_vdu[:path_vdu.rfind(".")]
path_interfaces = path_vdu + ".interfaces"
for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
if iface:
update_dict.update({path_interfaces + ".{}.".format(i) + k: v for k, v in iface.items() if
k in ('ip_address', 'mac_address', 'vlan', 'compute_node', 'pci')})
if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
update_dict["ip-address"] = iface.get("ip_address").split(";")[0]
if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
update_dict[path_vdu + ".ip-address"] = iface.get("ip_address").split(";")[0]
self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
else:
self.db.set_one(table, q_filter={"_id": _id}, update_dict=None,
unset={path: None})
except DbException as e:
self.logger.error("Cannot update database '{}': '{}'".format(task["target_record"], e))
def new_image(self, ro_task, task_index, task_depends):
task = ro_task["tasks"][task_index]
task_id = task["task_id"]
created = False
created_items = {}
target_vim = self.my_vims[ro_task["target_id"]]
try:
# FIND
if task.get("find_params"):
vim_images = target_vim.get_image_list(**task["find_params"])
if not vim_images:
raise NsWorkerExceptionNotFound("Image not found with this criteria: '{}'".format(
task["find_params"]))
elif len(vim_images) > 1:
raise NsWorkerException(
"More than one network found with this criteria: '{}'".format(task["find_params"]))
else:
vim_image_id = vim_images[0]["id"]
ro_vim_item_update = {"vim_id": vim_image_id,
"vim_status": "DONE",
"created": created,
"created_items": created_items,
"vim_details": None}
self.logger.debug(
"task={} {} new-image={} created={}".format(task_id, ro_task["target_id"], vim_image_id, created))
return "DONE", ro_vim_item_update
except (NsWorkerException, vimconn.VimConnException) as e:
self.logger.error("task={} {} new-image: {}".format(task_id, ro_task["target_id"], e))
ro_vim_item_update = {"vim_status": "VIM_ERROR",
"created": created,
"vim_details": str(e)}
return "FAILED", ro_vim_item_update
def del_flavor(self, ro_task, task_index):
task = ro_task["tasks"][task_index]
task_id = task["task_id"]
flavor_vim_id = ro_task["vim_info"]["vim_id"]
ro_vim_item_update_ok = {"vim_status": "DELETED",
"created": False,
"vim_details": "DELETED",
"vim_id": None}
try:
if flavor_vim_id:
target_vim = self.my_vims[ro_task["target_id"]]
target_vim.delete_flavor(flavor_vim_id)
except vimconn.VimConnNotFoundException:
ro_vim_item_update_ok["vim_details"] = "already deleted"
except vimconn.VimConnException as e:
self.logger.error("ro_task={} vim={} del-flavor={}: {}".format(
ro_task["_id"], ro_task["target_id"], flavor_vim_id, e))
ro_vim_item_update = {"vim_status": "VIM_ERROR",
"vim_details": "Error while deleting: {}".format(e)}
return "FAILED", ro_vim_item_update
self.logger.debug("task={} {} del-flavor={} {}".format(
task_id, ro_task["target_id"], flavor_vim_id, ro_vim_item_update_ok.get("vim_details", "")))
return "DONE", ro_vim_item_update_ok
def refresh_ok(self, ro_task):
"""skip calling VIM to get image status. Assumes ok"""
if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
return "FAILED", {}
return "DONE", {}
def delete_ok(self, ro_task):
"""skip calling VIM to delete image status. Assumes ok"""
return "DONE", {}
def new_flavor(self, ro_task, task_index, task_depends):
task = ro_task["tasks"][task_index]
task_id = task["task_id"]
created = False
created_items = {}
target_vim = self.my_vims[ro_task["target_id"]]
try:
# FIND
vim_flavor_id = None
if task.get("find_params"):
try:
flavor_data = task["find_params"]["flavor_data"]
vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
except vimconn.VimConnNotFoundException:
pass
if not vim_flavor_id and task.get("params"):
# CREATE
flavor_data = task["params"]["flavor_data"]
vim_flavor_id = target_vim.new_flavor(flavor_data)
created = True
ro_vim_item_update = {"vim_id": vim_flavor_id,
"vim_status": "DONE",
"created": created,
"created_items": created_items,
"vim_details": None}
self.logger.debug(
"task={} {} new-flavor={} created={}".format(task_id, ro_task["target_id"], vim_flavor_id, created))
return "DONE", ro_vim_item_update
except (vimconn.VimConnException, NsWorkerException) as e:
self.logger.error("task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e))
ro_vim_item_update = {"vim_status": "VIM_ERROR",
"created": created,
"vim_details": str(e)}
return "FAILED", ro_vim_item_update
def new_net(self, ro_task, task_index, task_depends):
vim_net_id = None
task = ro_task["tasks"][task_index]
task_id = task["task_id"]
created = False
created_items = {}
target_vim = self.my_vims[ro_task["target_id"]]
try:
# FIND
if task.get("find_params"):
# if management, get configuration of VIM
if task["find_params"].get("filter_dict"):
vim_filter = task["find_params"]["filter_dict"]
elif task["find_params"].get("mgmt"): # mamagement network
if deep_get(self.db_vims[ro_task["target_id"]], "config", "management_network_id"):
vim_filter = {"id": self.db_vims[ro_task["target_id"]]["config"]["management_network_id"]}
elif deep_get(self.db_vims[ro_task["target_id"]], "config", "management_network_name"):
vim_filter = {"name": self.db_vims[ro_task["target_id"]]["config"]["management_network_name"]}
else:
vim_filter = {"name": task["find_params"]["name"]}
else:
raise NsWorkerExceptionNotFound("Invalid find_params for new_net {}".format(task["find_params"]))
vim_nets = target_vim.get_network_list(vim_filter)
if not vim_nets and not task.get("params"):
raise NsWorkerExceptionNotFound("Network not found with this criteria: '{}'".format(
task.get("find_params")))
elif len(vim_nets) > 1:
raise NsWorkerException(
"More than one network found with this criteria: '{}'".format(task["find_params"]))
if vim_nets:
vim_net_id = vim_nets[0]["id"]
else:
# CREATE
params = task["params"]
vim_net_id, created_items = target_vim.new_network(**params)
created = True
ro_vim_item_update = {"vim_id": vim_net_id,
"vim_status": "BUILD",
"created": created,
"created_items": created_items,
"vim_details": None}
self.logger.debug(
"task={} {} new-net={} created={}".format(task_id, ro_task["target_id"], vim_net_id, created))
return "BUILD", ro_vim_item_update
except (vimconn.VimConnException, NsWorkerException) as e:
self.logger.error("task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e))
ro_vim_item_update = {"vim_status": "VIM_ERROR",
"created": created,
"vim_details": str(e)}
return "FAILED", ro_vim_item_update
def refresh_net(self, ro_task):
"""Call VIM to get network status"""
ro_task_id = ro_task["_id"]
target_vim = self.my_vims[ro_task["target_id"]]
vim_id = ro_task["vim_info"]["vim_id"]
net_to_refresh_list = [vim_id]
try:
vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
vim_info = vim_dict[vim_id]
if vim_info["status"] == "ACTIVE":
task_status = "DONE"
elif vim_info["status"] == "BUILD":
task_status = "BUILD"
else:
task_status = "FAILED"
except vimconn.VimConnException as e:
# Mark all tasks at VIM_ERROR status
self.logger.error("ro_task={} vim={} get-net={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e))
vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
task_status = "FAILED"
ro_vim_item_update = {}
if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
ro_vim_item_update["vim_status"] = vim_info["status"]
if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
ro_vim_item_update["vim_name"] = vim_info.get("name")
if vim_info["status"] in ("ERROR", "VIM_ERROR"):
if ro_task["vim_info"]["vim_details"] != vim_info["error_msg"]:
ro_vim_item_update["vim_details"] = vim_info["error_msg"]
elif vim_info["status"] == "DELETED":
ro_vim_item_update["vim_id"] = None
ro_vim_item_update["vim_details"] = "Deleted externally"
else:
if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
ro_vim_item_update["vim_details"] = vim_info["vim_info"]
if ro_vim_item_update:
self.logger.debug("ro_task={} {} get-net={}: status={} {}".format(
ro_task_id, ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"),
ro_vim_item_update.get("vim_details") if ro_vim_item_update.get("vim_status") != "ACTIVE" else ''))
return task_status, ro_vim_item_update
def del_net(self, ro_task, task_index):
task = ro_task["tasks"][task_index]
task_id = task["task_id"]
net_vim_id = ro_task["vim_info"]["vim_id"]
ro_vim_item_update_ok = {"vim_status": "DELETED",
"created": False,
"vim_details": "DELETED",
"vim_id": None}
try:
if net_vim_id or ro_task["vim_info"]["created_items"]:
target_vim = self.my_vims[ro_task["target_id"]]
target_vim.delete_network(net_vim_id, ro_task["vim_info"]["created_items"])
except vimconn.VimConnNotFoundException:
ro_vim_item_update_ok["vim_details"] = "already deleted"
except vimconn.VimConnException as e:
self.logger.error("ro_task={} vim={} del-net={}: {}".format(ro_task["_id"], ro_task["target_id"],
net_vim_id, e))
ro_vim_item_update = {"vim_status": "VIM_ERROR",
"vim_details": "Error while deleting: {}".format(e)}
return "FAILED", ro_vim_item_update
self.logger.debug("task={} {} del-net={} {}".format(task_id, ro_task["target_id"], net_vim_id,
ro_vim_item_update_ok.get("vim_details", "")))
return "DONE", ro_vim_item_update_ok
def new_vm(self, ro_task, task_index, task_depends):
task = ro_task["tasks"][task_index]
task_id = task["task_id"]
created = False
created_items = {}
target_vim = self.my_vims[ro_task["target_id"]]
try:
created = True
params = task["params"]
params_copy = deepcopy(params)
net_list = params_copy["net_list"]
for net in net_list:
if "net_id" in net and net["net_id"].startswith("TASK-"): # change task_id into network_id
network_id = task_depends[net["net_id"]]
if not network_id:
raise NsWorkerException("Cannot create VM because depends on a network not created or found "
"for {}".format(net["net_id"]))
net["net_id"] = network_id
if params_copy["image_id"].startswith("TASK-"):
params_copy["image_id"] = task_depends[params_copy["image_id"]]
if params_copy["flavor_id"].startswith("TASK-"):
params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
ro_vim_item_update = {"vim_id": vim_vm_id,
"vim_status": "BUILD",
"created": created,
"created_items": created_items,
"vim_details": None,
"interfaces_vim_ids": interfaces,
"interfaces": [],
}
self.logger.debug(
"task={} {} new-vm={} created={}".format(task_id, ro_task["target_id"], vim_vm_id, created))
return "BUILD", ro_vim_item_update
except (vimconn.VimConnException, NsWorkerException) as e:
self.logger.error("task={} vim={} new-vm: {}".format(task_id, ro_task["target_id"], e))
ro_vim_item_update = {"vim_status": "VIM_ERROR",
"created": created,
"vim_details": str(e)}
return "FAILED", ro_vim_item_update
def del_vm(self, ro_task, task_index):
task = ro_task["tasks"][task_index]
task_id = task["task_id"]
vm_vim_id = ro_task["vim_info"]["vim_id"]
ro_vim_item_update_ok = {"vim_status": "DELETED",
"created": False,
"vim_details": "DELETED",
"vim_id": None}
try:
if vm_vim_id or ro_task["vim_info"]["created_items"]:
target_vim = self.my_vims[ro_task["target_id"]]
target_vim.delete_vminstance(vm_vim_id, ro_task["vim_info"]["created_items"])
except vimconn.VimConnNotFoundException:
ro_vim_item_update_ok["vim_details"] = "already deleted"
except vimconn.VimConnException as e:
self.logger.error("ro_task={} vim={} del-vm={}: {}".format(ro_task["_id"], ro_task["target_id"],
vm_vim_id, e))
ro_vim_item_update = {"vim_status": "VIM_ERROR",
"vim_details": "Error while deleting: {}".format(e)}
return "FAILED", ro_vim_item_update
self.logger.debug("task={} {} del-vm={} {}".format(task_id, ro_task["target_id"], vm_vim_id,
ro_vim_item_update_ok.get("vim_details", "")))
return "DONE", ro_vim_item_update_ok
def refresh_vm(self, ro_task):
"""Call VIM to get vm status"""
ro_task_id = ro_task["_id"]
target_vim = self.my_vims[ro_task["target_id"]]
vim_id = ro_task["vim_info"]["vim_id"]
if not vim_id:
return None, None
vm_to_refresh_list = [vim_id]
try:
vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
vim_info = vim_dict[vim_id]
if vim_info["status"] == "ACTIVE":
task_status = "DONE"
elif vim_info["status"] == "BUILD":
task_status = "BUILD"
else:
task_status = "FAILED"
except vimconn.VimConnException as e:
# Mark all tasks at VIM_ERROR status
self.logger.error("ro_task={} vim={} get-vm={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e))
vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
task_status = "FAILED"
ro_vim_item_update = {}
# TODO check and update interfaces
vim_interfaces = []
for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
iface = next((iface for iface in vim_info["interfaces"] if vim_iface_id == iface["vim_interface_id"]), None)
# if iface:
# iface.pop("vim_info", None)
vim_interfaces.append(iface)
task = ro_task["tasks"][0] # TODO look for a task CREATE and active
if task.get("mgmt_vnf_interface") is not None:
vim_interfaces[task["mgmt_vnf_interface"]]["mgmt_vnf_interface"] = True
mgmt_vdu_iface = task.get("mgmt_vdu_interface", task.get("mgmt_vnf_interface", 0))
vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
if ro_task["vim_info"]["interfaces"] != vim_interfaces:
ro_vim_item_update["interfaces"] = vim_interfaces
if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
ro_vim_item_update["vim_status"] = vim_info["status"]
if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
ro_vim_item_update["vim_name"] = vim_info.get("name")
if vim_info["status"] in ("ERROR", "VIM_ERROR"):
if ro_task["vim_info"]["vim_details"] != vim_info["error_msg"]:
ro_vim_item_update["vim_details"] = vim_info["error_msg"]
elif vim_info["status"] == "DELETED":
ro_vim_item_update["vim_id"] = None
ro_vim_item_update["vim_details"] = "Deleted externally"
else:
if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
ro_vim_item_update["vim_details"] = vim_info["vim_info"]
if ro_vim_item_update:
self.logger.debug("ro_task={} {} get-vm={}: status={} {}".format(
ro_task_id, ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"),
ro_vim_item_update.get("vim_details") if ro_vim_item_update.get("vim_status") != "ACTIVE" else ''))
return task_status, ro_vim_item_update
def exec_vm(self, ro_task, task_index, task_depends):
task = ro_task["tasks"][task_index]
task_id = task["task_id"]
target_vim = self.my_vims[ro_task["target_id"]]
try:
params = task["params"]
params_copy = deepcopy(params)
params_copy["use_pri_key"] = self.db.decrypt(params_copy.pop("private_key"),
params_copy.pop("schema_version"), params_copy.pop("salt"))
target_vim.inject_user_key(**params_copy)
self.logger.debug(
"task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"]))
return "DONE", params_copy["key"]
except (vimconn.VimConnException, NsWorkerException) as e:
self.logger.error("task={} vim={} new-vm: {}".format(task_id, ro_task["target_id"], e))
ro_vim_item_update = {"vim_details": str(e)}
return "FAILED", ro_vim_item_update
def run(self):
# load database
self.logger.debug("Starting")
while True:
try:
task = self.task_queue.get(block=False if self.my_vims else True)
if task[0] == "terminate":
break
if task[0] == "load_vim":
self._load_vim(task[1])
continue
except queue.Empty:
pass
try:
busy = False
ro_task = self._get_db_task()
if ro_task:
self._proccess_pending_tasks(ro_task)
busy = True
if not busy:
time.sleep(5)
except Exception as e:
self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
self.logger.debug("Finishing")