import logging
import collections
from db_base import db_base_Exception
+import nfvo_db
+from threading import Lock
+from time import time
global global_config
global vimconn_imported
vimconn_imported = {} # dictionary with VIM type as key, loaded module as value
vim_threads = {"running":{}, "deleting": {}, "names": []} # threads running for attached-VIMs
+vim_persistent_info = {}
logger = logging.getLogger('openmano.nfvo')
+task_lock = Lock()
+task_dict = {}
+last_task_id = 0.0
+db=None
+db_lock=Lock()
class NfvoException(Exception):
def __init__(self, message, http_code):
Exception.__init__(self, message)
+def get_task_id():
+ global last_task_id
+ task_id = time()
+ if task_id <= last_task_id:
+ task_id = last_task_id + 0.000001
+ last_task_id = task_id
+ return "TASK.{:.6f}".format(task_id)
+
+
+def new_task(name, params, store=True, depends=None):
+ task_id = get_task_id()
+ task = {"status": "enqueued", "id": task_id, "name": name, "params": params}
+ if depends:
+ task["depends"] = depends
+ if store:
+ task_dict[task_id] = task
+ return task
+
+
+def is_task_id(id):
+ return True if id[:5] == "TASK." else False
+
+
def get_non_used_vim_name(datacenter_name, datacenter_id, tenant_name, tenant_id):
name = datacenter_name[:16]
if name not in vim_threads["names"]:
vim_threads["names"].append(name)
return name
- name = datatacenter_name[:16] + "." + tenant_name[:16]
+ name = datacenter_name[:16] + "." + tenant_name[:16]
if name not in vim_threads["names"]:
vim_threads["names"].append(name)
return name
def start_service(mydb):
+ global db, global_config
+ db = nfvo_db.nfvo_db()
+ db.connect(global_config['db_host'], global_config['db_user'], global_config['db_passwd'], global_config['db_name'])
from_= 'tenants_datacenters as td join datacenters as d on td.datacenter_id=d.uuid join datacenter_tenants as dt on td.datacenter_tenant_id=dt.uuid'
select_ = ('type','d.config as config','d.uuid as datacenter_id', 'vim_url', 'vim_url_admin', 'd.name as datacenter_name',
'dt.uuid as datacenter_tenant_id','dt.vim_tenant_name as vim_tenant_name','dt.vim_tenant_id as vim_tenant_id',
if module_info and module_info[0]:
file.close(module_info[0])
raise NfvoException("Unknown vim type '{}'. Can not open file '{}.py'; {}: {}".format(
- vim["type"], module, type(e).__name__, str(e)), HTTP_Bad_Request)
+ vim["type"], module, type(e).__name__, str(e)), HTTP_Bad_Request)
+ thread_id = vim["datacenter_id"] + "." + vim['nfvo_tenant_id']
+ vim_persistent_info[thread_id] = {}
try:
#if not tenant:
# return -HTTP_Bad_Request, "You must provide a valid tenant name or uuid for VIM %s" % ( vim["type"])
myvim = vimconn_imported[ vim["type"] ].vimconnector(
- uuid=vim['datacenter_id'], name=vim['datacenter_name'],
- tenant_id=vim['vim_tenant_id'], tenant_name=vim['vim_tenant_name'],
- url=vim['vim_url'], url_admin=vim['vim_url_admin'],
- user=vim['user'], passwd=vim['passwd'],
- config=extra
- )
+ uuid=vim['datacenter_id'], name=vim['datacenter_name'],
+ tenant_id=vim['vim_tenant_id'], tenant_name=vim['vim_tenant_name'],
+ url=vim['vim_url'], url_admin=vim['vim_url_admin'],
+ user=vim['user'], passwd=vim['passwd'],
+ config=extra, persistent_info=vim_persistent_info[thread_id]
+ )
except Exception as e:
raise NfvoException("Error at VIM {}; {}: {}".format(vim["type"], type(e).__name__, str(e)), HTTP_Internal_Server_Error)
thread_name = get_non_used_vim_name(vim['datacenter_name'], vim['vim_tenant_id'], vim['vim_tenant_name'], vim['vim_tenant_id'])
- new_thread = vim_thread.vim_thread(myvim, thread_name)
+ new_thread = vim_thread.vim_thread(myvim, task_lock, thread_name, vim['datacenter_name'],
+ vim.get('datacenter_tenant_id'), db=db, db_lock=db_lock)
new_thread.start()
- thread_id = vim["datacenter_id"] + "-" + vim['nfvo_tenant_id']
vim_threads["running"][thread_id] = new_thread
except db_base_Exception as e:
raise NfvoException(str(e) + " at nfvo.get_vim", e.http_code)
+
def stop_service():
for thread_id,thread in vim_threads["running"].items():
- thread.insert_task("exit")
+ thread.insert_task(new_task("exit", None, store=False))
vim_threads["deleting"][thread_id] = thread
- vim_threads["running"]={}
+ vim_threads["running"] = {}
+
def get_flavorlist(mydb, vnf_id, nfvo_tenant=None):
'''Obtain flavorList
flavorList.append(flavor['flavor_id'])
return flavorList
+
def get_imagelist(mydb, vnf_id, nfvo_tenant=None):
'''Obtain imageList
return result, content:
imageList.append(image['image_id'])
return imageList
+
def get_vim(mydb, nfvo_tenant=None, datacenter_id=None, datacenter_name=None, datacenter_tenant_id=None,
vim_tenant=None, vim_tenant_name=None, vim_user=None, vim_passwd=None):
'''Obtain a dictionary of VIM (datacenter) classes with some of the input parameters
vim["type"], module, type(e).__name__, str(e)), HTTP_Bad_Request)
try:
+ if 'nfvo_tenant_id' in vim:
+ thread_id = vim["datacenter_id"] + "." + vim['nfvo_tenant_id']
+ if thread_id not in vim_persistent_info:
+ vim_persistent_info[thread_id] = {}
+ persistent_info = vim_persistent_info[thread_id]
+ else:
+ persistent_info = {}
#if not tenant:
# return -HTTP_Bad_Request, "You must provide a valid tenant name or uuid for VIM %s" % ( vim["type"])
vim_dict[ vim['datacenter_id'] ] = vimconn_imported[ vim["type"] ].vimconnector(
uuid=vim['datacenter_id'], name=vim['datacenter_name'],
- tenant_id=vim.get('vim_tenant_id',vim_tenant), tenant_name=vim.get('vim_tenant_name',vim_tenant_name),
+ tenant_id=vim.get('vim_tenant_id',vim_tenant),
+ tenant_name=vim.get('vim_tenant_name',vim_tenant_name),
url=vim['vim_url'], url_admin=vim['vim_url_admin'],
user=vim.get('user',vim_user), passwd=vim.get('passwd',vim_passwd),
- config=extra
+ config=extra, persistent_info=persistent_info
)
except Exception as e:
raise NfvoException("Error at VIM {}; {}: {}".format(vim["type"], type(e).__name__, str(e)), HTTP_Internal_Server_Error)
except db_base_Exception as e:
raise NfvoException(str(e) + " at nfvo.get_vim", e.http_code)
+
def rollback(mydb, vims, rollback_list):
undeleted_items=[]
#delete things by reverse order
else:
return False," Rollback fails to delete: " + str(undeleted_items)
+
def check_vnf_descriptor(vnf_descriptor, vnf_descriptor_version=1):
global global_config
#create a dictionary with vnfc-name: vnfc:interface-list key:values pairs
return image_vim_id if only_create_at_vim else image_mano_id
+
def create_or_use_flavor(mydb, vims, flavor_dict, rollback_list, only_create_at_vim=False, return_on_error = None):
temp_flavor_dict= {'disk':flavor_dict.get('disk',1),
'ram':flavor_dict.get('ram'),
return flavor_vim_id if only_create_at_vim else flavor_mano_id
+
def new_vnf(mydb, tenant_id, vnf_descriptor):
global global_config
#logger.error("start_scenario %s", error_text)
raise NfvoException(error_text, e.http_code)
+
def new_vnf_v02(mydb, tenant_id, vnf_descriptor):
global global_config
#logger.error("start_scenario %s", error_text)
raise NfvoException(error_text, e.http_code)
+
def get_vnf_id(mydb, tenant_id, vnf_id):
#check valid tenant_id
check_tenant(mydb, tenant_id)
#if undeletedItems:
# return "delete_vnf. Undeleted: %s" %(undeletedItems)
+
def get_hosts_info(mydb, nfvo_tenant_id, datacenter_name=None):
result, vims = get_vim(mydb, nfvo_tenant_id, None, datacenter_name)
if result < 0:
topology = {'name':myvim['name'] , 'servers': servers}
return result, topology
+
def get_hosts(mydb, nfvo_tenant_id):
vims = get_vim(mydb, nfvo_tenant_id)
if len(vims) == 0:
except vimconn.vimconnException as e:
raise NfvoException("Not possible to get_host_list from VIM: {}".format(str(e)), e.http_code)
+
def new_scenario(mydb, tenant_id, topo):
# result, vims = get_vim(mydb, tenant_id)
return c
+
def new_scenario_v02(mydb, tenant_id, scenario_dict, version):
""" This creates a new scenario for version 0.2 and 0.3"""
scenario = scenario_dict["scenario"]
scenario_id = mydb.new_scenario(scenario)
return scenario_id
+
def edit_scenario(mydb, tenant_id, scenario_id, data):
data["uuid"] = scenario_id
data["tenant_id"] = tenant_id
c = mydb.edit_scenario( data )
return c
+
def start_scenario(mydb, tenant_id, scenario_id, instance_scenario_name, instance_scenario_description, datacenter=None,vim_tenant=None, startvms=True):
#print "Checking that nfvo_tenant_id exists and getting the VIM URI and the VIM tenant_id"
datacenter_id, myvim = get_datacenter_by_name_uuid(mydb, tenant_id, datacenter, vim_tenant=vim_tenant)
#logger.error("start_scenario %s", error_text)
raise NfvoException(error_text, e.http_code)
+
def unify_cloud_config(cloud_config_preserve, cloud_config):
''' join the cloud config information into cloud_config_preserve.
In case of conflict cloud_config_preserve preserves
return new_cloud_config
+def get_vim_thread(tenant_id, datacenter_id_name=None, datacenter_tenant_id=None):
+ datacenter_id = None
+ datacenter_name = None
+ thread = None
+ if datacenter_id_name:
+ if utils.check_valid_uuid(datacenter_id_name):
+ datacenter_id = datacenter_id_name
+ else:
+ datacenter_name = datacenter_id_name
+ if datacenter_id:
+ thread = vim_threads["running"].get(datacenter_id + "." + tenant_id)
+ else:
+ for k, v in vim_threads["running"].items():
+ datacenter_tenant = k.split(".")
+ if datacenter_tenant[0] == datacenter_id and datacenter_tenant[1] == tenant_id:
+ if thread:
+ raise NfvoException("More than one datacenters found, try to identify with uuid", HTTP_Conflict)
+ thread = v
+ elif not datacenter_id and datacenter_tenant[1] == tenant_id:
+ if thread.datacenter_name == datacenter_name:
+ if thread:
+ raise NfvoException("More than one datacenters found, try to identify with uuid", HTTP_Conflict)
+ thread = v
+ if not thread:
+ raise NfvoException("datacenter '{}' not found".format(str(datacenter_id_name)), HTTP_Not_Found)
+ return thread
+
def get_datacenter_by_name_uuid(mydb, tenant_id, datacenter_id_name=None, **extra_filter):
datacenter_id = None
raise NfvoException("More than one datacenters found, try to identify with uuid", HTTP_Conflict)
return vims.keys()[0], vims.values()[0]
+
def update(d, u):
'''Takes dict d and updates it with the values in dict u.'''
'''It merges all depth levels'''
d[k] = u[k]
return d
+
def create_instance(mydb, tenant_id, instance_dict):
- #print "Checking that nfvo_tenant_id exists and getting the VIM URI and the VIM tenant_id"
- #logger.debug("Creating instance...")
+ # print "Checking that nfvo_tenant_id exists and getting the VIM URI and the VIM tenant_id"
+ # logger.debug("Creating instance...")
scenario = instance_dict["scenario"]
#find main datacenter
myvims = {}
+ myvim_threads = {}
datacenter2tenant = {}
datacenter = instance_dict.get("datacenter")
default_datacenter_id, vim = get_datacenter_by_name_uuid(mydb, tenant_id, datacenter)
myvims[default_datacenter_id] = vim
+ myvim_threads[default_datacenter_id] = get_vim_thread(tenant_id, default_datacenter_id)
datacenter2tenant[default_datacenter_id] = vim['config']['datacenter_tenant_id']
#myvim_tenant = myvim['tenant_id']
# default_datacenter_name = vim['name']
logger.debug("Creating instance from scenario-dict:\n%s", yaml.safe_dump(scenarioDict, indent=4, default_flow_style=False)) #TODO remove
instance_name = instance_dict["name"]
instance_description = instance_dict.get("description")
+ instance_tasks={}
try:
- #0 check correct parameters
+ # 0 check correct parameters
for net_name, net_instance_desc in instance_dict.get("networks",{}).iteritems():
- found=False
+ found = False
for scenario_net in scenarioDict['nets']:
if net_name == scenario_net["name"]:
found = True
#Add this datacenter to myvims
d, v = get_datacenter_by_name_uuid(mydb, tenant_id, site["datacenter"])
myvims[d] = v
+ myvim_threads[d] = get_vim_thread(tenant_id, site["datacenter"])
datacenter2tenant[d] = v['config']['datacenter_tenant_id']
- site["datacenter"] = d #change name to id
+ site["datacenter"] = d #change name to id
else:
if site_without_datacenter_field:
raise NfvoException("Found more than one entries without datacenter field at instance:networks:{}:sites".format(net_name), HTTP_Bad_Request)
site_without_datacenter_field = True
- site["datacenter"] = default_datacenter_id #change name to id
+ site["datacenter"] = default_datacenter_id #change name to id
for vnf_name, vnf_instance_desc in instance_dict.get("vnfs",{}).iteritems():
found=False
if not found:
raise NfvoException("Invalid vnf name '{}' at instance:vnfs".format(vnf_instance_desc), HTTP_Bad_Request)
if "datacenter" in vnf_instance_desc:
- #Add this datacenter to myvims
+ # Add this datacenter to myvims
if vnf_instance_desc["datacenter"] not in myvims:
d, v = get_datacenter_by_name_uuid(mydb, tenant_id, vnf_instance_desc["datacenter"])
myvims[d] = v
+ myvim_threads[d] = get_vim_thread(tenant_id, vnf_instance_desc["datacenter"])
datacenter2tenant[d] = v['config']['datacenter_tenant_id']
scenario_vnf["datacenter"] = vnf_instance_desc["datacenter"]
logger.debug("Creating instance scenario-dict MERGED:\n%s", yaml.safe_dump(scenarioDict, indent=4, default_flow_style=False))
- #1. Creating new nets (sce_nets) in the VIM"
+ # 1. Creating new nets (sce_nets) in the VIM"
for sce_net in scenarioDict['nets']:
sce_net["vim_id_sites"]={}
descriptor_net = instance_dict.get("networks",{}).get(sce_net["name"],{})
if site.get("datacenter"):
vim = myvims[ site["datacenter"] ]
datacenter_id = site["datacenter"]
+ myvim_thread = myvim_threads[ site["datacenter"] ]
else:
vim = myvims[ default_datacenter_id ]
datacenter_id = default_datacenter_id
+ myvim_thread = myvim_threads[default_datacenter_id]
net_type = sce_net['type']
lookfor_filter = {'admin_state_up': True, 'status': 'ACTIVE'} #'shared': True
if sce_net["external"]:
create_network = False
if create_network:
#if network is not external
- network_id = vim.new_network(net_vim_name, net_type, sce_net.get('ip_profile',None))
- sce_net["vim_id_sites"][datacenter_id] = network_id
- auxNetDict['scenario'][sce_net['uuid']][datacenter_id] = network_id
- rollbackList.append({'what':'network', 'where':'vim', 'vim_id':datacenter_id, 'uuid':network_id})
+ task = new_task("new-net", (net_vim_name, net_type, sce_net.get('ip_profile',None)))
+ task_id = myvim_thread.insert_task(task)
+ instance_tasks[task_id] = task
+ #network_id = vim.new_network(net_vim_name, net_type, sce_net.get('ip_profile',None))
+ sce_net["vim_id_sites"][datacenter_id] = task_id
+ auxNetDict['scenario'][sce_net['uuid']][datacenter_id] = task_id
+ rollbackList.append({'what':'network', 'where':'vim', 'vim_id':datacenter_id, 'uuid':task_id})
sce_net["created"] = True
- #2. Creating new nets (vnf internal nets) in the VIM"
+ # 2. Creating new nets (vnf internal nets) in the VIM"
#For each vnf net, we create it and we add it to instanceNetlist.
for sce_vnf in scenarioDict['vnfs']:
for net in sce_vnf['nets']:
if sce_vnf.get("datacenter"):
vim = myvims[ sce_vnf["datacenter"] ]
datacenter_id = sce_vnf["datacenter"]
+ myvim_thread = myvim_threads[ sce_vnf["datacenter"]]
else:
vim = myvims[ default_datacenter_id ]
datacenter_id = default_datacenter_id
+ myvim_thread = myvim_threads[default_datacenter_id]
descriptor_net = instance_dict.get("vnfs",{}).get(sce_vnf["name"],{})
net_name = descriptor_net.get("name")
if not net_name:
net_name = "%s.%s" %(instance_name, net["name"])
net_name = net_name[:255] #limit length
net_type = net['type']
- network_id = vim.new_network(net_name, net_type, net.get('ip_profile',None))
- net['vim_id'] = network_id
+ task = new_task("new-net", (net_name, net_type, net.get('ip_profile',None)))
+ task_id = myvim_thread.insert_task(task)
+ instance_tasks[task_id] = task
+ # network_id = vim.new_network(net_name, net_type, net.get('ip_profile',None))
+ net['vim_id'] = task_id
if sce_vnf['uuid'] not in auxNetDict:
auxNetDict[sce_vnf['uuid']] = {}
- auxNetDict[sce_vnf['uuid']][net['uuid']] = network_id
- rollbackList.append({'what':'network','where':'vim','vim_id':datacenter_id,'uuid':network_id})
+ auxNetDict[sce_vnf['uuid']][net['uuid']] = task_id
+ rollbackList.append({'what':'network','where':'vim','vim_id':datacenter_id,'uuid':task_id})
net["created"] = True
#print "auxNetDict:"
#print yaml.safe_dump(auxNetDict, indent=4, default_flow_style=False)
- #3. Creating new vm instances in the VIM
+ # 3. Creating new vm instances in the VIM
#myvim.new_vminstance(self,vimURI,tenant_id,name,description,image_id,flavor_id,net_dict)
for sce_vnf in scenarioDict['vnfs']:
if sce_vnf.get("datacenter"):
vim = myvims[ sce_vnf["datacenter"] ]
+ myvim_thread = myvim_threads[ sce_vnf["datacenter"] ]
datacenter_id = sce_vnf["datacenter"]
else:
vim = myvims[ default_datacenter_id ]
+ myvim_thread = myvim_threads[ default_datacenter_id ]
datacenter_id = default_datacenter_id
sce_vnf["datacenter_id"] = datacenter_id
i = 0
flavor_dict['extended']= yaml.load(flavor_dict['extended'])
flavor_id = create_or_use_flavor(mydb, {datacenter_id: vim}, flavor_dict, rollbackList, True)
-
-
-
#Obtain information for additional disks
extended_flavor_dict = mydb.get_rows(FROM='datacenters_flavors', SELECT=('extended',), WHERE={'vim_id': flavor_id})
if not extended_flavor_dict:
if 'disks' in extended_flavor_dict_yaml:
myVMDict['disks'] = extended_flavor_dict_yaml['disks']
-
-
-
vm['vim_flavor_id'] = flavor_id
-
myVMDict['imageRef'] = vm['vim_image_id']
myVMDict['flavorRef'] = vm['vim_flavor_id']
myVMDict['networks'] = []
+ task_depends = {}
#TODO ALF. connect_mgmt_interfaces. Connect management interfaces if this is true
for iface in vm['interfaces']:
netDict = {}
break
else:
netDict['net_id'] = auxNetDict[ sce_vnf['uuid'] ][ iface['net_id'] ]
+ if is_task_id(netDict['net_id']):
+ task_depends[netDict['net_id']] = instance_tasks[netDict['net_id']]
#skip bridge ifaces not connected to any net
#if 'net_id' not in netDict or netDict['net_id']==None:
# continue
cloud_config_vm = unify_cloud_config(vm["boot_data"], cloud_config)
else:
cloud_config_vm = cloud_config
- vm_id = vim.new_vminstance(myVMDict['name'],myVMDict['description'],myVMDict.get('start', None),
- myVMDict['imageRef'],myVMDict['flavorRef'],myVMDict['networks'], cloud_config = cloud_config_vm,
- disk_list = myVMDict['disks'])
+ task = new_task("new-vm", (myVMDict['name'], myVMDict['description'], myVMDict.get('start', None),
+ myVMDict['imageRef'], myVMDict['flavorRef'], myVMDict['networks'],
+ cloud_config_vm, myVMDict['disks']), depends=task_depends)
+ vm_id = myvim_thread.insert_task(task)
+ instance_tasks[vm_id] = task
vm['vim_id'] = vm_id
rollbackList.append({'what':'vm','where':'vim','vim_id':datacenter_id,'uuid':vm_id})
logger.debug("create_instance Deployment done scenarioDict: %s",
yaml.safe_dump(scenarioDict, indent=4, default_flow_style=False) )
instance_id = mydb.new_instance_scenario_as_a_whole(tenant_id,instance_name, instance_description, scenarioDict)
+ # Update database with those ended tasks
+ for task in instance_tasks.values():
+ if task["status"] == "ok":
+ if task["name"] == "new-vm":
+ mydb.update_rows("instance_vms", UPDATE={"vim_vm_id": task["result"]},
+ WHERE={"vim_vm_id": task["id"]})
+ elif task["name"] == "new-net":
+ mydb.update_rows("instance_nets", UPDATE={"vim_net_id": task["result"]},
+ WHERE={"vim_net_id": task["id"]})
return mydb.get_instance_scenario(instance_id)
except (NfvoException, vimconn.vimconnException,db_base_Exception) as e:
message = rollback(mydb, myvims, rollbackList)
#logger.error("create_instance: %s", error_text)
raise NfvoException(error_text, e.http_code)
+
def delete_instance(mydb, tenant_id, instance_id):
#print "Checking that the instance_id exists and getting the instance dictionary"
instanceDict = mydb.get_instance_scenario(instance_id, tenant_id)
#2. delete from VIM
error_msg = ""
- myvims={}
+ myvims = {}
+ myvim_threads = {}
#2.1 deleting VMs
#vm_fail_list=[]
for sce_vnf in instanceDict['vnfs']:
datacenter_key = (sce_vnf["datacenter_id"], sce_vnf["datacenter_tenant_id"])
if datacenter_key not in myvims:
+ try:
+ myvim_thread = get_vim_thread(tenant_id, sce_vnf["datacenter_id"], sce_vnf["datacenter_tenant_id"])
+ except NfvoException as e:
+ logger.error(str(e))
+ myvim_thread = None
+ myvim_threads[datacenter_key] = myvim_thread
vims = get_vim(mydb, tenant_id, datacenter_id=sce_vnf["datacenter_id"],
datacenter_tenant_id=sce_vnf["datacenter_tenant_id"])
if len(vims) == 0:
else:
myvims[datacenter_key] = vims.values()[0]
myvim = myvims[datacenter_key]
+ myvim_thread = myvim_threads[datacenter_key]
for vm in sce_vnf['vms']:
if not myvim:
error_msg += "\n VM id={} cannot be deleted because datacenter={} not found".format(vm['vim_vm_id'], sce_vnf["datacenter_id"])
continue
try:
- myvim.delete_vminstance(vm['vim_vm_id'])
+ task=None
+ if is_task_id(vm['vim_vm_id']):
+ task_id = vm['vim_vm_id']
+ old_task = task_dict.get(task_id)
+ if not old_task:
+ error_msg += "\n VM was scheduled for create, but task {} is not found".format(task_id)
+ continue
+ with task_lock:
+ if old_task["status"] == "enqueued":
+ old_task["status"] = "deleted"
+ elif old_task["status"] == "error":
+ continue
+ elif old_task["status"] == "processing":
+ task = new_task("del-vm", task_id, depends={task_id: old_task})
+ else: #ok
+ task = new_task("del-vm", old_task["result"])
+ else:
+ task = new_task("del-vm", vm['vim_vm_id'], store=False)
+ if task:
+ myvim_thread.insert_task(task)
except vimconn.vimconnNotFoundException as e:
error_msg+="\n VM VIM_id={} not found at datacenter={}".format(vm['vim_vm_id'], sce_vnf["datacenter_id"])
logger.warn("VM instance '%s'uuid '%s', VIM id '%s', from VNF_id '%s' not found",
continue #skip not created nets
datacenter_key = (net["datacenter_id"], net["datacenter_tenant_id"])
if datacenter_key not in myvims:
+ try:
+ myvim_thread = get_vim_thread(tenant_id, sce_vnf["datacenter_id"], sce_vnf["datacenter_tenant_id"])
+ except NfvoException as e:
+ logger.error(str(e))
+ myvim_thread = None
+ myvim_threads[datacenter_key] = myvim_thread
vims = get_vim(mydb, tenant_id, datacenter_id=net["datacenter_id"],
datacenter_tenant_id=net["datacenter_tenant_id"])
if len(vims) == 0:
else:
myvims[datacenter_key] = vims.values()[0]
myvim = myvims[datacenter_key]
+ myvim_thread = myvim_threads[datacenter_key]
if not myvim:
error_msg += "\n Net VIM_id={} cannot be deleted because datacenter={} not found".format(net['vim_net_id'], net["datacenter_id"])
continue
try:
- myvim.delete_network(net['vim_net_id'])
+ task = None
+ if is_task_id(net['vim_net_id']):
+ task_id = net['vim_net_id']
+ old_task = task_dict.get(task_id)
+ if not old_task:
+ error_msg += "\n NET was scheduled for create, but task {} is not found".format(task_id)
+ continue
+ with task_lock:
+ if old_task["status"] == "enqueued":
+ old_task["status"] = "deleted"
+ elif old_task["status"] == "error":
+ continue
+ elif old_task["status"] == "processing":
+ task = new_task("del-net", task_id, depends={task_id: old_task})
+ else: # ok
+ task = new_task("del-net", old_task["result"])
+ else:
+ task = new_task("del-net", net['vim_net_id'], store=False)
+ if task:
+ myvim_thread.insert_task(task)
except vimconn.vimconnNotFoundException as e:
- error_msg+="\n NET VIM_id={} not found at datacenter={}".format(net['vim_net_id'], net["datacenter_id"])
+ error_msg += "\n NET VIM_id={} not found at datacenter={}".format(net['vim_net_id'], net["datacenter_id"])
logger.warn("NET '%s', VIM_id '%s', from VNF_net_id '%s' not found",
- net['uuid'], net['vim_net_id'], str(net['vnf_net_id']))
+ net['uuid'], net['vim_net_id'], str(net['vnf_net_id']))
except vimconn.vimconnException as e:
- error_msg+="\n NET VIM_id={} at datacenter={} Error: {} {}".format(net['vim_net_id'], net["datacenter_id"], e.http_code, str(e))
+ error_msg += "\n NET VIM_id={} at datacenter={} Error: {} {}".format(net['vim_net_id'],
+ net["datacenter_id"],
+ e.http_code, str(e))
logger.error("Error %d deleting NET '%s', VIM_id '%s', from VNF_net_id '%s': %s",
- e.http_code, net['uuid'], net['vim_net_id'], str(net['vnf_net_id']), str(e))
- if len(error_msg)>0:
+ e.http_code, net['uuid'], net['vim_net_id'], str(net['vnf_net_id']), str(e))
+ if len(error_msg) > 0:
return 'instance ' + message + ' deleted but some elements could not be deleted, or already deleted (error: 404) from VIM: ' + error_msg
else:
return 'instance ' + message + ' deleted'
+
def refresh_instance(mydb, nfvo_tenant, instanceDict, datacenter=None, vim_tenant=None):
'''Refreshes a scenario instance. It modifies instanceDict'''
'''Returns:
return 0, 'Scenario instance ' + instance_id + ' refreshed.'
+
def instance_action(mydb,nfvo_tenant,instance_id, action_dict):
#print "Checking that the instance_id exists and getting the instance dictionary"
instanceDict = mydb.get_instance_scenario(instance_id, nfvo_tenant)
else:
return vm_result
+
def create_or_use_console_proxy_thread(console_server, console_port):
#look for a non-used port
console_thread_key = console_server + ":" + str(console_port)
raise NfvoException(str(e), HTTP_Bad_Request)
raise NfvoException("Not found any free 'http_console_ports'", HTTP_Conflict)
+
def check_tenant(mydb, tenant_id):
'''check that tenant exists at database'''
tenant = mydb.get_rows(FROM='nfvo_tenants', SELECT=('uuid',), WHERE={'uuid': tenant_id})
raise NfvoException("tenant '{}' not found".format(tenant_id), HTTP_Not_Found)
return
+
def new_tenant(mydb, tenant_dict):
tenant_id = mydb.new_row("nfvo_tenants", tenant_dict, add_uuid=True)
return tenant_id
+
def delete_tenant(mydb, tenant):
#get nfvo_tenant info
mydb.delete_row_by_id("nfvo_tenants", tenant_dict['uuid'])
return tenant_dict['uuid'] + " " + tenant_dict["name"]
+
def new_datacenter(mydb, datacenter_descriptor):
if "config" in datacenter_descriptor:
datacenter_descriptor["config"]=yaml.safe_dump(datacenter_descriptor["config"],default_flow_style=True,width=256)
datacenter_id = mydb.new_row("datacenters", datacenter_descriptor, add_uuid=True)
return datacenter_id
+
def edit_datacenter(mydb, datacenter_id_name, datacenter_descriptor):
#obtain data, check that only one exist
datacenter = mydb.get_table_by_uuid_name('datacenters', datacenter_id_name)
mydb.update_rows('datacenters', datacenter_descriptor, where)
return datacenter_id
+
def delete_datacenter(mydb, datacenter):
#get nfvo_tenant info
datacenter_dict = mydb.get_table_by_uuid_name('datacenters', datacenter, 'datacenter')
mydb.delete_row_by_id("datacenters", datacenter_dict['uuid'])
return datacenter_dict['uuid'] + " " + datacenter_dict['name']
+
def associate_datacenter_to_tenant(mydb, nfvo_tenant, datacenter, vim_tenant_id=None, vim_tenant_name=None, vim_username=None, vim_password=None, config=None):
#get datacenter info
datacenter_id, myvim = get_datacenter_by_name_uuid(mydb, None, datacenter)
# create thread
datacenter_id, myvim = get_datacenter_by_name_uuid(mydb, tenant_dict['uuid'], datacenter_id) # reload data
thread_name = get_non_used_vim_name(datacenter_name, datacenter_id, tenant_dict['name'], tenant_dict['uuid'])
- new_thread = vim_thread.vim_thread(myvim, thread_name)
+ new_thread = vim_thread.vim_thread(myvim, task_lock, thread_name, datacenter_name, db=db, db_lock=db_lock)
new_thread.start()
- vim_threads["running"][datacenter_id + "-" + tenant_dict['uuid']] = new_thread
-
+ thread_id = datacenter_id + "." + tenant_dict['uuid']
+ vim_threads["running"][thread_id] = new_thread
return datacenter_id
+
def deassociate_datacenter_to_tenant(mydb, tenant_id, datacenter, vim_tenant_id=None):
#get datacenter info
datacenter_id, myvim = get_datacenter_by_name_uuid(mydb, None, datacenter)
except db_base_Exception as e:
logger.error("Cannot delete datacenter_tenants " + str(e))
pass # the error will be caused because dependencies, vim_tenant can not be deleted
- thread_id = datacenter_id + "-" + tenant_datacenter_item["nfvo_tenant_id"]
+ thread_id = datacenter_id + "." + tenant_datacenter_item["nfvo_tenant_id"]
thread = vim_threads["running"][thread_id]
- thread.insert_task("exit")
+ thread.insert_task(new_task("exit", None, store=False))
vim_threads["deleting"][thread_id] = thread
return "datacenter {} detached. {}".format(datacenter_id, warning)
+
def datacenter_action(mydb, tenant_id, datacenter, action_dict):
#DEPRECATED
#get datacenter info
else:
raise NfvoException("Unknown action " + str(action_dict), HTTP_Bad_Request)
+
def datacenter_edit_netmap(mydb, tenant_id, datacenter, netmap, action_dict):
#get datacenter info
datacenter_id, _ = get_datacenter_by_name_uuid(mydb, tenant_id, datacenter)
WHERE={'datacenter_id':datacenter_id, what: netmap})
return result
+
def datacenter_new_netmap(mydb, tenant_id, datacenter, action_dict=None):
#get datacenter info
datacenter_id, myvim = get_datacenter_by_name_uuid(mydb, tenant_id, datacenter)
net_list.append(net_nfvo)
return net_list
+
def vim_action_get(mydb, tenant_id, datacenter, item, name):
#get datacenter info
datacenter_id, myvim = get_datacenter_by_name_uuid(mydb, tenant_id, datacenter)
print "vim_action Not possible to get_%s_list from VIM: %s " % (item, str(e))
raise NfvoException("Not possible to get_{}_list from VIM: {}".format(item, str(e)), e.http_code)
+
def vim_action_delete(mydb, tenant_id, datacenter, item, name):
#get datacenter info
if tenant_id == "any":
return "{} {} {} deleted".format(item[:-1], item_id,item_name)
+
def vim_action_create(mydb, tenant_id, datacenter, item, descriptor):
#get datacenter info
logger.debug("vim_action_create descriptor %s", str(descriptor))
import Queue
import logging
import vimconn
-
+from db_base import db_base_Exception
# from logging import Logger
# import auxiliary_functions as af
-# TODO: insert a logging system
+
+def is_task_id(id):
+ return True if id[:5] == "TASK." else False
class vim_thread(threading.Thread):
- def __init__(self, vimconn, name=None):
- '''Init a thread.
+ def __init__(self, vimconn, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None, db=None, db_lock=None):
+ """Init a thread.
Arguments:
'id' number of thead
'name' name of thread
'host','user': host ip or name to manage and user
'db', 'db_lock': database class and lock to use it in exclusion
- '''
+ """
+ self.tasksResult = {}
+ """ It will contain a dictionary with
+ task_id:
+ status: enqueued,done,error,deleted,processing
+ result: VIM result,
+ """
threading.Thread.__init__(self)
self.vim = vimconn
+ self.datacenter_name = datacenter_name
+ self.datacenter_tenant_id = datacenter_tenant_id
if not name:
- self.name = vimconn["id"] + "-" + vimconn["config"]["datacenter_tenant_id"]
+ self.name = vimconn["id"] + "." + vimconn["config"]["datacenter_tenant_id"]
else:
self.name = name
self.logger = logging.getLogger('openmano.vim.'+self.name)
+ self.db = db
+ self.db_lock = db_lock
- self.queueLock = threading.Lock()
- self.taskQueue = Queue.Queue(2000)
+ self.task_lock = task_lock
+ self.task_queue = Queue.Queue(2000)
-
- def insert_task(self, task, *aditional):
+ def insert_task(self, task):
try:
- self.queueLock.acquire()
- task = self.taskQueue.put( (task,) + aditional, timeout=5)
- self.queueLock.release()
- return 1, None
+ self.task_queue.put(task, False)
+ return task["id"]
except Queue.Full:
- return -1, "timeout inserting a task over host " + self.name
+ raise vimconn.vimconnException(self.name + ": timeout inserting a task")
+
+ def del_task(self, task):
+ with self.task_lock:
+ if task["status"] == "enqueued":
+ task["status"] == "deleted"
+ return True
+ else: # task["status"] == "processing"
+ self.task_lock.release()
+ return False
def run(self):
self.logger.debug("Starting")
while True:
#TODO reload service
while True:
- self.queueLock.acquire()
- if not self.taskQueue.empty():
- task = self.taskQueue.get()
+ if not self.task_queue.empty():
+ task = self.task_queue.get()
+ self.task_lock.acquire()
+ if task["status"] == "deleted":
+ self.task_lock.release()
+ continue
+ task["status"] == "processing"
+ self.task_lock.release()
else:
- task = None
- self.queueLock.release()
-
- if task is None:
now=time.time()
time.sleep(1)
- continue
-
- if task[0] == 'instance':
- pass
- elif task[0] == 'image':
- pass
- elif task[0] == 'exit':
- print self.name, ": processing task exit"
- self.terminate()
+ continue
+ self.logger.debug("processing task id={} name={} params={}".format(task["id"], task["name"],
+ str(task["params"])))
+ if task["name"] == 'exit' or task["name"] == 'reload':
+ result, content = self.terminate(task)
+ elif task["name"] == 'new-vm':
+ result, content = self.new_vm(task)
+ elif task["name"] == 'del-vm':
+ result, content = self.del_vm(task)
+ elif task["name"] == 'new-net':
+ result, content = self.new_net(task)
+ elif task["name"] == 'del-net':
+ result, content = self.del_net(task)
+ else:
+ error_text = "unknown task {}".format(task["name"])
+ self.logger.error(error_text)
+ result = False
+ content = error_text
+
+ with self.task_lock:
+ task["status"] = "done" if result else "error"
+ task["result"] = content
+ self.task_queue.task_done()
+
+ if task["name"] == 'exit':
return 0
- elif task[0] == 'reload':
- print self.name, ": processing task reload terminating and relaunching"
- self.terminate()
+ elif task["name"] == 'reload':
break
- elif task[0] == 'edit-iface':
- pass
- elif task[0] == 'restore-iface':
- pass
- elif task[0] == 'new-ovsbridge':
- pass
- elif task[0] == 'new-vxlan':
- pass
- elif task[0] == 'del-ovsbridge':
- pass
- elif task[0] == 'del-vxlan':
- pass
- elif task[0] == 'create-ovs-bridge-port':
- pass
- elif task[0] == 'del-ovs-port':
- pass
- else:
- self.logger.error("unknown task %s", str(task))
self.logger.debug("Finishing")
- def terminate(self):
- pass
+ def terminate(self, task):
+ return True, None
+
+ def new_net(self, task):
+ try:
+ task_id = task["id"]
+ params = task["params"]
+ net_id = self.vim.new_network(*params)
+ with self.db_lock:
+ self.db.update_rows("instance_nets", UPDATE={"vim_net_id": net_id}, WHERE={"vim_net_id": task_id})
+ return True, net_id
+ except db_base_Exception as e:
+ self.logger.error("Error updating database %s", str(e))
+ return True, net_id
+ except vimconn.vimconnException as e:
+ return False, str(e)
+
+ def new_vm(self, task):
+ try:
+ params = task["params"]
+ task_id = task["id"]
+ depends = task.get("depends")
+ net_list = params[5]
+ for net in net_list:
+ if is_task_id(net["net_id"]): # change task_id into network_id
+ try:
+ task_net = depends[net["net_id"]]
+ with self.task_lock:
+ if task_net["status"] == "error":
+ return False, "Cannot create VM because depends on a network that cannot be created: " + \
+ str(task_net["result"])
+ elif task_net["status"] == "enqueued" or task_net["status"] == "processing":
+ return False, "Cannot create VM because depends on a network still not created"
+ network_id = task_net["result"]
+ net["net_id"] = network_id
+ except Exception as e:
+ return False, "Error trying to map from task_id={} to task result: {}".format(net["net_id"],
+ str(e))
+ vm_id = self.vim.new_vminstance(*params)
+ with self.db_lock:
+ self.db.update_rows("instance_vms", UPDATE={"vim_vm_id": vm_id}, WHERE={"vim_vm_id": task_id})
+ return True, vm_id
+ except db_base_Exception as e:
+ self.logger.error("Error updtaing database %s", str(e))
+ return True, vm_id
+ except vimconn.vimconnException as e:
+ return False, str(e)
+
+ def del_vm(self, task):
+ vm_id = task["params"]
+ if is_task_id(vm_id):
+ try:
+ task_create = task["depends"][vm_id]
+ with self.task_lock:
+ if task_create["status"] == "error":
+ return True, "VM was not created. It has error: " + str(task_create["result"])
+ elif task_create["status"] == "enqueued" or task_create["status"] == "processing":
+ return False, "Cannot delete VM because still creating"
+ vm_id = task_create["result"]
+ except Exception as e:
+ return False, "Error trying to get task_id='{}':".format(vm_id, str(e))
+ try:
+ return True, self.vim.delete_vminstance(vm_id)
+ except vimconn.vimconnException as e:
+ return False, str(e)
+
+ def del_net(self, task):
+ net_id = task["params"]
+ if is_task_id(net_id):
+ try:
+ task_create = task["depends"][net_id]
+ with self.task_lock:
+ if task_create["status"] == "error":
+ return True, "net was not created. It has error: " + str(task_create["result"])
+ elif task_create["status"] == "enqueued" or task_create["status"] == "processing":
+ return False, "Cannot delete net because still creating"
+ net_id = task_create["result"]
+ except Exception as e:
+ return False, "Error trying to get task_id='{}':".format(net_id, str(e))
+ try:
+ return True, self.vim.delete_network(net_id)
+ except vimconn.vimconnException as e:
+ return False, str(e)
+