From: tierno Date: Fri, 3 Mar 2017 22:51:05 +0000 (+0100) Subject: new persistent_info param at vimconn class X-Git-Tag: v2.0.0~40 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=refs%2Fchanges%2F26%2F1226%2F1;p=osm%2FRO.git new persistent_info param at vimconn class Task management at vim_thread Change-Id: Ica8ef928811a973ac8edc669692ad2c867d9518a Signed-off-by: tierno --- diff --git a/nfvo.py b/nfvo.py index 80a606ba..14ec54f1 100644 --- a/nfvo.py +++ b/nfvo.py @@ -39,6 +39,9 @@ import vimconn import logging import collections from db_base import db_base_Exception +import nfvo_db +from threading import Lock +from time import time global global_config global vimconn_imported @@ -49,7 +52,13 @@ default_volume_size = '5' #size in GB vimconn_imported = {} # dictionary with VIM type as key, loaded module as value vim_threads = {"running":{}, "deleting": {}, "names": []} # threads running for attached-VIMs +vim_persistent_info = {} logger = logging.getLogger('openmano.nfvo') +task_lock = Lock() +task_dict = {} +last_task_id = 0.0 +db=None +db_lock=Lock() class NfvoException(Exception): def __init__(self, message, http_code): @@ -57,12 +66,35 @@ class NfvoException(Exception): Exception.__init__(self, message) +def get_task_id(): + global last_task_id + task_id = time() + if task_id <= last_task_id: + task_id = last_task_id + 0.000001 + last_task_id = task_id + return "TASK.{:.6f}".format(task_id) + + +def new_task(name, params, store=True, depends=None): + task_id = get_task_id() + task = {"status": "enqueued", "id": task_id, "name": name, "params": params} + if depends: + task["depends"] = depends + if store: + task_dict[task_id] = task + return task + + +def is_task_id(id): + return True if id[:5] == "TASK." else False + + def get_non_used_vim_name(datacenter_name, datacenter_id, tenant_name, tenant_id): name = datacenter_name[:16] if name not in vim_threads["names"]: vim_threads["names"].append(name) return name - name = datatacenter_name[:16] + "." + tenant_name[:16] + name = datacenter_name[:16] + "." + tenant_name[:16] if name not in vim_threads["names"]: vim_threads["names"].append(name) return name @@ -72,6 +104,9 @@ def get_non_used_vim_name(datacenter_name, datacenter_id, tenant_name, tenant_id def start_service(mydb): + global db, global_config + db = nfvo_db.nfvo_db() + db.connect(global_config['db_host'], global_config['db_user'], global_config['db_passwd'], global_config['db_name']) from_= 'tenants_datacenters as td join datacenters as d on td.datacenter_id=d.uuid join datacenter_tenants as dt on td.datacenter_tenant_id=dt.uuid' select_ = ('type','d.config as config','d.uuid as datacenter_id', 'vim_url', 'vim_url_admin', 'd.name as datacenter_name', 'dt.uuid as datacenter_tenant_id','dt.vim_tenant_name as vim_tenant_name','dt.vim_tenant_id as vim_tenant_id', @@ -95,33 +130,37 @@ def start_service(mydb): if module_info and module_info[0]: file.close(module_info[0]) raise NfvoException("Unknown vim type '{}'. Can not open file '{}.py'; {}: {}".format( - vim["type"], module, type(e).__name__, str(e)), HTTP_Bad_Request) + vim["type"], module, type(e).__name__, str(e)), HTTP_Bad_Request) + thread_id = vim["datacenter_id"] + "." + vim['nfvo_tenant_id'] + vim_persistent_info[thread_id] = {} try: #if not tenant: # return -HTTP_Bad_Request, "You must provide a valid tenant name or uuid for VIM %s" % ( vim["type"]) myvim = vimconn_imported[ vim["type"] ].vimconnector( - uuid=vim['datacenter_id'], name=vim['datacenter_name'], - tenant_id=vim['vim_tenant_id'], tenant_name=vim['vim_tenant_name'], - url=vim['vim_url'], url_admin=vim['vim_url_admin'], - user=vim['user'], passwd=vim['passwd'], - config=extra - ) + uuid=vim['datacenter_id'], name=vim['datacenter_name'], + tenant_id=vim['vim_tenant_id'], tenant_name=vim['vim_tenant_name'], + url=vim['vim_url'], url_admin=vim['vim_url_admin'], + user=vim['user'], passwd=vim['passwd'], + config=extra, persistent_info=vim_persistent_info[thread_id] + ) except Exception as e: raise NfvoException("Error at VIM {}; {}: {}".format(vim["type"], type(e).__name__, str(e)), HTTP_Internal_Server_Error) thread_name = get_non_used_vim_name(vim['datacenter_name'], vim['vim_tenant_id'], vim['vim_tenant_name'], vim['vim_tenant_id']) - new_thread = vim_thread.vim_thread(myvim, thread_name) + new_thread = vim_thread.vim_thread(myvim, task_lock, thread_name, vim['datacenter_name'], + vim.get('datacenter_tenant_id'), db=db, db_lock=db_lock) new_thread.start() - thread_id = vim["datacenter_id"] + "-" + vim['nfvo_tenant_id'] vim_threads["running"][thread_id] = new_thread except db_base_Exception as e: raise NfvoException(str(e) + " at nfvo.get_vim", e.http_code) + def stop_service(): for thread_id,thread in vim_threads["running"].items(): - thread.insert_task("exit") + thread.insert_task(new_task("exit", None, store=False)) vim_threads["deleting"][thread_id] = thread - vim_threads["running"]={} + vim_threads["running"] = {} + def get_flavorlist(mydb, vnf_id, nfvo_tenant=None): '''Obtain flavorList @@ -144,6 +183,7 @@ def get_flavorlist(mydb, vnf_id, nfvo_tenant=None): flavorList.append(flavor['flavor_id']) return flavorList + def get_imagelist(mydb, vnf_id, nfvo_tenant=None): '''Obtain imageList return result, content: @@ -162,6 +202,7 @@ def get_imagelist(mydb, vnf_id, nfvo_tenant=None): imageList.append(image['image_id']) return imageList + def get_vim(mydb, nfvo_tenant=None, datacenter_id=None, datacenter_name=None, datacenter_tenant_id=None, vim_tenant=None, vim_tenant_name=None, vim_user=None, vim_passwd=None): '''Obtain a dictionary of VIM (datacenter) classes with some of the input parameters @@ -207,14 +248,22 @@ def get_vim(mydb, nfvo_tenant=None, datacenter_id=None, datacenter_name=None, da vim["type"], module, type(e).__name__, str(e)), HTTP_Bad_Request) try: + if 'nfvo_tenant_id' in vim: + thread_id = vim["datacenter_id"] + "." + vim['nfvo_tenant_id'] + if thread_id not in vim_persistent_info: + vim_persistent_info[thread_id] = {} + persistent_info = vim_persistent_info[thread_id] + else: + persistent_info = {} #if not tenant: # return -HTTP_Bad_Request, "You must provide a valid tenant name or uuid for VIM %s" % ( vim["type"]) vim_dict[ vim['datacenter_id'] ] = vimconn_imported[ vim["type"] ].vimconnector( uuid=vim['datacenter_id'], name=vim['datacenter_name'], - tenant_id=vim.get('vim_tenant_id',vim_tenant), tenant_name=vim.get('vim_tenant_name',vim_tenant_name), + tenant_id=vim.get('vim_tenant_id',vim_tenant), + tenant_name=vim.get('vim_tenant_name',vim_tenant_name), url=vim['vim_url'], url_admin=vim['vim_url_admin'], user=vim.get('user',vim_user), passwd=vim.get('passwd',vim_passwd), - config=extra + config=extra, persistent_info=persistent_info ) except Exception as e: raise NfvoException("Error at VIM {}; {}: {}".format(vim["type"], type(e).__name__, str(e)), HTTP_Internal_Server_Error) @@ -222,6 +271,7 @@ def get_vim(mydb, nfvo_tenant=None, datacenter_id=None, datacenter_name=None, da except db_base_Exception as e: raise NfvoException(str(e) + " at nfvo.get_vim", e.http_code) + def rollback(mydb, vims, rollback_list): undeleted_items=[] #delete things by reverse order @@ -262,6 +312,7 @@ def rollback(mydb, vims, rollback_list): else: return False," Rollback fails to delete: " + str(undeleted_items) + def check_vnf_descriptor(vnf_descriptor, vnf_descriptor_version=1): global global_config #create a dictionary with vnfc-name: vnfc:interface-list key:values pairs @@ -458,6 +509,7 @@ def create_or_use_image(mydb, vims, image_dict, rollback_list, only_create_at_vi return image_vim_id if only_create_at_vim else image_mano_id + def create_or_use_flavor(mydb, vims, flavor_dict, rollback_list, only_create_at_vim=False, return_on_error = None): temp_flavor_dict= {'disk':flavor_dict.get('disk',1), 'ram':flavor_dict.get('ram'), @@ -610,6 +662,7 @@ def create_or_use_flavor(mydb, vims, flavor_dict, rollback_list, only_create_at_ return flavor_vim_id if only_create_at_vim else flavor_mano_id + def new_vnf(mydb, tenant_id, vnf_descriptor): global global_config @@ -744,6 +797,7 @@ def new_vnf(mydb, tenant_id, vnf_descriptor): #logger.error("start_scenario %s", error_text) raise NfvoException(error_text, e.http_code) + def new_vnf_v02(mydb, tenant_id, vnf_descriptor): global global_config @@ -877,6 +931,7 @@ def new_vnf_v02(mydb, tenant_id, vnf_descriptor): #logger.error("start_scenario %s", error_text) raise NfvoException(error_text, e.http_code) + def get_vnf_id(mydb, tenant_id, vnf_id): #check valid tenant_id check_tenant(mydb, tenant_id) @@ -1034,6 +1089,7 @@ def delete_vnf(mydb,tenant_id,vnf_id,datacenter=None,vim_tenant=None): #if undeletedItems: # return "delete_vnf. Undeleted: %s" %(undeletedItems) + def get_hosts_info(mydb, nfvo_tenant_id, datacenter_name=None): result, vims = get_vim(mydb, nfvo_tenant_id, None, datacenter_name) if result < 0: @@ -1047,6 +1103,7 @@ def get_hosts_info(mydb, nfvo_tenant_id, datacenter_name=None): topology = {'name':myvim['name'] , 'servers': servers} return result, topology + def get_hosts(mydb, nfvo_tenant_id): vims = get_vim(mydb, nfvo_tenant_id) if len(vims) == 0: @@ -1082,6 +1139,7 @@ def get_hosts(mydb, nfvo_tenant_id): except vimconn.vimconnException as e: raise NfvoException("Not possible to get_host_list from VIM: {}".format(str(e)), e.http_code) + def new_scenario(mydb, tenant_id, topo): # result, vims = get_vim(mydb, tenant_id) @@ -1365,6 +1423,7 @@ def new_scenario(mydb, tenant_id, topo): return c + def new_scenario_v02(mydb, tenant_id, scenario_dict, version): """ This creates a new scenario for version 0.2 and 0.3""" scenario = scenario_dict["scenario"] @@ -1488,12 +1547,14 @@ def new_scenario_v02(mydb, tenant_id, scenario_dict, version): scenario_id = mydb.new_scenario(scenario) return scenario_id + def edit_scenario(mydb, tenant_id, scenario_id, data): data["uuid"] = scenario_id data["tenant_id"] = tenant_id c = mydb.edit_scenario( data ) return c + def start_scenario(mydb, tenant_id, scenario_id, instance_scenario_name, instance_scenario_description, datacenter=None,vim_tenant=None, startvms=True): #print "Checking that nfvo_tenant_id exists and getting the VIM URI and the VIM tenant_id" datacenter_id, myvim = get_datacenter_by_name_uuid(mydb, tenant_id, datacenter, vim_tenant=vim_tenant) @@ -1696,6 +1757,7 @@ def start_scenario(mydb, tenant_id, scenario_id, instance_scenario_name, instanc #logger.error("start_scenario %s", error_text) raise NfvoException(error_text, e.http_code) + def unify_cloud_config(cloud_config_preserve, cloud_config): ''' join the cloud config information into cloud_config_preserve. In case of conflict cloud_config_preserve preserves @@ -1772,6 +1834,33 @@ def unify_cloud_config(cloud_config_preserve, cloud_config): return new_cloud_config +def get_vim_thread(tenant_id, datacenter_id_name=None, datacenter_tenant_id=None): + datacenter_id = None + datacenter_name = None + thread = None + if datacenter_id_name: + if utils.check_valid_uuid(datacenter_id_name): + datacenter_id = datacenter_id_name + else: + datacenter_name = datacenter_id_name + if datacenter_id: + thread = vim_threads["running"].get(datacenter_id + "." + tenant_id) + else: + for k, v in vim_threads["running"].items(): + datacenter_tenant = k.split(".") + if datacenter_tenant[0] == datacenter_id and datacenter_tenant[1] == tenant_id: + if thread: + raise NfvoException("More than one datacenters found, try to identify with uuid", HTTP_Conflict) + thread = v + elif not datacenter_id and datacenter_tenant[1] == tenant_id: + if thread.datacenter_name == datacenter_name: + if thread: + raise NfvoException("More than one datacenters found, try to identify with uuid", HTTP_Conflict) + thread = v + if not thread: + raise NfvoException("datacenter '{}' not found".format(str(datacenter_id_name)), HTTP_Not_Found) + return thread + def get_datacenter_by_name_uuid(mydb, tenant_id, datacenter_id_name=None, **extra_filter): datacenter_id = None @@ -1789,6 +1878,7 @@ def get_datacenter_by_name_uuid(mydb, tenant_id, datacenter_id_name=None, **extr raise NfvoException("More than one datacenters found, try to identify with uuid", HTTP_Conflict) return vims.keys()[0], vims.values()[0] + def update(d, u): '''Takes dict d and updates it with the values in dict u.''' '''It merges all depth levels''' @@ -1800,17 +1890,20 @@ def update(d, u): d[k] = u[k] return d + def create_instance(mydb, tenant_id, instance_dict): - #print "Checking that nfvo_tenant_id exists and getting the VIM URI and the VIM tenant_id" - #logger.debug("Creating instance...") + # print "Checking that nfvo_tenant_id exists and getting the VIM URI and the VIM tenant_id" + # logger.debug("Creating instance...") scenario = instance_dict["scenario"] #find main datacenter myvims = {} + myvim_threads = {} datacenter2tenant = {} datacenter = instance_dict.get("datacenter") default_datacenter_id, vim = get_datacenter_by_name_uuid(mydb, tenant_id, datacenter) myvims[default_datacenter_id] = vim + myvim_threads[default_datacenter_id] = get_vim_thread(tenant_id, default_datacenter_id) datacenter2tenant[default_datacenter_id] = vim['config']['datacenter_tenant_id'] #myvim_tenant = myvim['tenant_id'] # default_datacenter_name = vim['name'] @@ -1831,10 +1924,11 @@ def create_instance(mydb, tenant_id, instance_dict): logger.debug("Creating instance from scenario-dict:\n%s", yaml.safe_dump(scenarioDict, indent=4, default_flow_style=False)) #TODO remove instance_name = instance_dict["name"] instance_description = instance_dict.get("description") + instance_tasks={} try: - #0 check correct parameters + # 0 check correct parameters for net_name, net_instance_desc in instance_dict.get("networks",{}).iteritems(): - found=False + found = False for scenario_net in scenarioDict['nets']: if net_name == scenario_net["name"]: found = True @@ -1850,13 +1944,14 @@ def create_instance(mydb, tenant_id, instance_dict): #Add this datacenter to myvims d, v = get_datacenter_by_name_uuid(mydb, tenant_id, site["datacenter"]) myvims[d] = v + myvim_threads[d] = get_vim_thread(tenant_id, site["datacenter"]) datacenter2tenant[d] = v['config']['datacenter_tenant_id'] - site["datacenter"] = d #change name to id + site["datacenter"] = d #change name to id else: if site_without_datacenter_field: raise NfvoException("Found more than one entries without datacenter field at instance:networks:{}:sites".format(net_name), HTTP_Bad_Request) site_without_datacenter_field = True - site["datacenter"] = default_datacenter_id #change name to id + site["datacenter"] = default_datacenter_id #change name to id for vnf_name, vnf_instance_desc in instance_dict.get("vnfs",{}).iteritems(): found=False @@ -1867,10 +1962,11 @@ def create_instance(mydb, tenant_id, instance_dict): if not found: raise NfvoException("Invalid vnf name '{}' at instance:vnfs".format(vnf_instance_desc), HTTP_Bad_Request) if "datacenter" in vnf_instance_desc: - #Add this datacenter to myvims + # Add this datacenter to myvims if vnf_instance_desc["datacenter"] not in myvims: d, v = get_datacenter_by_name_uuid(mydb, tenant_id, vnf_instance_desc["datacenter"]) myvims[d] = v + myvim_threads[d] = get_vim_thread(tenant_id, vnf_instance_desc["datacenter"]) datacenter2tenant[d] = v['config']['datacenter_tenant_id'] scenario_vnf["datacenter"] = vnf_instance_desc["datacenter"] @@ -1910,7 +2006,7 @@ def create_instance(mydb, tenant_id, instance_dict): logger.debug("Creating instance scenario-dict MERGED:\n%s", yaml.safe_dump(scenarioDict, indent=4, default_flow_style=False)) - #1. Creating new nets (sce_nets) in the VIM" + # 1. Creating new nets (sce_nets) in the VIM" for sce_net in scenarioDict['nets']: sce_net["vim_id_sites"]={} descriptor_net = instance_dict.get("networks",{}).get(sce_net["name"],{}) @@ -1922,9 +2018,11 @@ def create_instance(mydb, tenant_id, instance_dict): if site.get("datacenter"): vim = myvims[ site["datacenter"] ] datacenter_id = site["datacenter"] + myvim_thread = myvim_threads[ site["datacenter"] ] else: vim = myvims[ default_datacenter_id ] datacenter_id = default_datacenter_id + myvim_thread = myvim_threads[default_datacenter_id] net_type = sce_net['type'] lookfor_filter = {'admin_state_up': True, 'status': 'ACTIVE'} #'shared': True if sce_net["external"]: @@ -1982,48 +2080,58 @@ def create_instance(mydb, tenant_id, instance_dict): create_network = False if create_network: #if network is not external - network_id = vim.new_network(net_vim_name, net_type, sce_net.get('ip_profile',None)) - sce_net["vim_id_sites"][datacenter_id] = network_id - auxNetDict['scenario'][sce_net['uuid']][datacenter_id] = network_id - rollbackList.append({'what':'network', 'where':'vim', 'vim_id':datacenter_id, 'uuid':network_id}) + task = new_task("new-net", (net_vim_name, net_type, sce_net.get('ip_profile',None))) + task_id = myvim_thread.insert_task(task) + instance_tasks[task_id] = task + #network_id = vim.new_network(net_vim_name, net_type, sce_net.get('ip_profile',None)) + sce_net["vim_id_sites"][datacenter_id] = task_id + auxNetDict['scenario'][sce_net['uuid']][datacenter_id] = task_id + rollbackList.append({'what':'network', 'where':'vim', 'vim_id':datacenter_id, 'uuid':task_id}) sce_net["created"] = True - #2. Creating new nets (vnf internal nets) in the VIM" + # 2. Creating new nets (vnf internal nets) in the VIM" #For each vnf net, we create it and we add it to instanceNetlist. for sce_vnf in scenarioDict['vnfs']: for net in sce_vnf['nets']: if sce_vnf.get("datacenter"): vim = myvims[ sce_vnf["datacenter"] ] datacenter_id = sce_vnf["datacenter"] + myvim_thread = myvim_threads[ sce_vnf["datacenter"]] else: vim = myvims[ default_datacenter_id ] datacenter_id = default_datacenter_id + myvim_thread = myvim_threads[default_datacenter_id] descriptor_net = instance_dict.get("vnfs",{}).get(sce_vnf["name"],{}) net_name = descriptor_net.get("name") if not net_name: net_name = "%s.%s" %(instance_name, net["name"]) net_name = net_name[:255] #limit length net_type = net['type'] - network_id = vim.new_network(net_name, net_type, net.get('ip_profile',None)) - net['vim_id'] = network_id + task = new_task("new-net", (net_name, net_type, net.get('ip_profile',None))) + task_id = myvim_thread.insert_task(task) + instance_tasks[task_id] = task + # network_id = vim.new_network(net_name, net_type, net.get('ip_profile',None)) + net['vim_id'] = task_id if sce_vnf['uuid'] not in auxNetDict: auxNetDict[sce_vnf['uuid']] = {} - auxNetDict[sce_vnf['uuid']][net['uuid']] = network_id - rollbackList.append({'what':'network','where':'vim','vim_id':datacenter_id,'uuid':network_id}) + auxNetDict[sce_vnf['uuid']][net['uuid']] = task_id + rollbackList.append({'what':'network','where':'vim','vim_id':datacenter_id,'uuid':task_id}) net["created"] = True #print "auxNetDict:" #print yaml.safe_dump(auxNetDict, indent=4, default_flow_style=False) - #3. Creating new vm instances in the VIM + # 3. Creating new vm instances in the VIM #myvim.new_vminstance(self,vimURI,tenant_id,name,description,image_id,flavor_id,net_dict) for sce_vnf in scenarioDict['vnfs']: if sce_vnf.get("datacenter"): vim = myvims[ sce_vnf["datacenter"] ] + myvim_thread = myvim_threads[ sce_vnf["datacenter"] ] datacenter_id = sce_vnf["datacenter"] else: vim = myvims[ default_datacenter_id ] + myvim_thread = myvim_threads[ default_datacenter_id ] datacenter_id = default_datacenter_id sce_vnf["datacenter_id"] = datacenter_id i = 0 @@ -2046,9 +2154,6 @@ def create_instance(mydb, tenant_id, instance_dict): flavor_dict['extended']= yaml.load(flavor_dict['extended']) flavor_id = create_or_use_flavor(mydb, {datacenter_id: vim}, flavor_dict, rollbackList, True) - - - #Obtain information for additional disks extended_flavor_dict = mydb.get_rows(FROM='datacenters_flavors', SELECT=('extended',), WHERE={'vim_id': flavor_id}) if not extended_flavor_dict: @@ -2063,14 +2168,11 @@ def create_instance(mydb, tenant_id, instance_dict): if 'disks' in extended_flavor_dict_yaml: myVMDict['disks'] = extended_flavor_dict_yaml['disks'] - - - vm['vim_flavor_id'] = flavor_id - myVMDict['imageRef'] = vm['vim_image_id'] myVMDict['flavorRef'] = vm['vim_flavor_id'] myVMDict['networks'] = [] + task_depends = {} #TODO ALF. connect_mgmt_interfaces. Connect management interfaces if this is true for iface in vm['interfaces']: netDict = {} @@ -2121,6 +2223,8 @@ def create_instance(mydb, tenant_id, instance_dict): break else: netDict['net_id'] = auxNetDict[ sce_vnf['uuid'] ][ iface['net_id'] ] + if is_task_id(netDict['net_id']): + task_depends[netDict['net_id']] = instance_tasks[netDict['net_id']] #skip bridge ifaces not connected to any net #if 'net_id' not in netDict or netDict['net_id']==None: # continue @@ -2134,9 +2238,11 @@ def create_instance(mydb, tenant_id, instance_dict): cloud_config_vm = unify_cloud_config(vm["boot_data"], cloud_config) else: cloud_config_vm = cloud_config - vm_id = vim.new_vminstance(myVMDict['name'],myVMDict['description'],myVMDict.get('start', None), - myVMDict['imageRef'],myVMDict['flavorRef'],myVMDict['networks'], cloud_config = cloud_config_vm, - disk_list = myVMDict['disks']) + task = new_task("new-vm", (myVMDict['name'], myVMDict['description'], myVMDict.get('start', None), + myVMDict['imageRef'], myVMDict['flavorRef'], myVMDict['networks'], + cloud_config_vm, myVMDict['disks']), depends=task_depends) + vm_id = myvim_thread.insert_task(task) + instance_tasks[vm_id] = task vm['vim_id'] = vm_id rollbackList.append({'what':'vm','where':'vim','vim_id':datacenter_id,'uuid':vm_id}) @@ -2151,6 +2257,15 @@ def create_instance(mydb, tenant_id, instance_dict): logger.debug("create_instance Deployment done scenarioDict: %s", yaml.safe_dump(scenarioDict, indent=4, default_flow_style=False) ) instance_id = mydb.new_instance_scenario_as_a_whole(tenant_id,instance_name, instance_description, scenarioDict) + # Update database with those ended tasks + for task in instance_tasks.values(): + if task["status"] == "ok": + if task["name"] == "new-vm": + mydb.update_rows("instance_vms", UPDATE={"vim_vm_id": task["result"]}, + WHERE={"vim_vm_id": task["id"]}) + elif task["name"] == "new-net": + mydb.update_rows("instance_nets", UPDATE={"vim_net_id": task["result"]}, + WHERE={"vim_net_id": task["id"]}) return mydb.get_instance_scenario(instance_id) except (NfvoException, vimconn.vimconnException,db_base_Exception) as e: message = rollback(mydb, myvims, rollbackList) @@ -2164,6 +2279,7 @@ def create_instance(mydb, tenant_id, instance_dict): #logger.error("create_instance: %s", error_text) raise NfvoException(error_text, e.http_code) + def delete_instance(mydb, tenant_id, instance_id): #print "Checking that the instance_id exists and getting the instance dictionary" instanceDict = mydb.get_instance_scenario(instance_id, tenant_id) @@ -2176,13 +2292,20 @@ def delete_instance(mydb, tenant_id, instance_id): #2. delete from VIM error_msg = "" - myvims={} + myvims = {} + myvim_threads = {} #2.1 deleting VMs #vm_fail_list=[] for sce_vnf in instanceDict['vnfs']: datacenter_key = (sce_vnf["datacenter_id"], sce_vnf["datacenter_tenant_id"]) if datacenter_key not in myvims: + try: + myvim_thread = get_vim_thread(tenant_id, sce_vnf["datacenter_id"], sce_vnf["datacenter_tenant_id"]) + except NfvoException as e: + logger.error(str(e)) + myvim_thread = None + myvim_threads[datacenter_key] = myvim_thread vims = get_vim(mydb, tenant_id, datacenter_id=sce_vnf["datacenter_id"], datacenter_tenant_id=sce_vnf["datacenter_tenant_id"]) if len(vims) == 0: @@ -2192,12 +2315,32 @@ def delete_instance(mydb, tenant_id, instance_id): else: myvims[datacenter_key] = vims.values()[0] myvim = myvims[datacenter_key] + myvim_thread = myvim_threads[datacenter_key] for vm in sce_vnf['vms']: if not myvim: error_msg += "\n VM id={} cannot be deleted because datacenter={} not found".format(vm['vim_vm_id'], sce_vnf["datacenter_id"]) continue try: - myvim.delete_vminstance(vm['vim_vm_id']) + task=None + if is_task_id(vm['vim_vm_id']): + task_id = vm['vim_vm_id'] + old_task = task_dict.get(task_id) + if not old_task: + error_msg += "\n VM was scheduled for create, but task {} is not found".format(task_id) + continue + with task_lock: + if old_task["status"] == "enqueued": + old_task["status"] = "deleted" + elif old_task["status"] == "error": + continue + elif old_task["status"] == "processing": + task = new_task("del-vm", task_id, depends={task_id: old_task}) + else: #ok + task = new_task("del-vm", old_task["result"]) + else: + task = new_task("del-vm", vm['vim_vm_id'], store=False) + if task: + myvim_thread.insert_task(task) except vimconn.vimconnNotFoundException as e: error_msg+="\n VM VIM_id={} not found at datacenter={}".format(vm['vim_vm_id'], sce_vnf["datacenter_id"]) logger.warn("VM instance '%s'uuid '%s', VIM id '%s', from VNF_id '%s' not found", @@ -2214,6 +2357,12 @@ def delete_instance(mydb, tenant_id, instance_id): continue #skip not created nets datacenter_key = (net["datacenter_id"], net["datacenter_tenant_id"]) if datacenter_key not in myvims: + try: + myvim_thread = get_vim_thread(tenant_id, sce_vnf["datacenter_id"], sce_vnf["datacenter_tenant_id"]) + except NfvoException as e: + logger.error(str(e)) + myvim_thread = None + myvim_threads[datacenter_key] = myvim_thread vims = get_vim(mydb, tenant_id, datacenter_id=net["datacenter_id"], datacenter_tenant_id=net["datacenter_tenant_id"]) if len(vims) == 0: @@ -2222,25 +2371,48 @@ def delete_instance(mydb, tenant_id, instance_id): else: myvims[datacenter_key] = vims.values()[0] myvim = myvims[datacenter_key] + myvim_thread = myvim_threads[datacenter_key] if not myvim: error_msg += "\n Net VIM_id={} cannot be deleted because datacenter={} not found".format(net['vim_net_id'], net["datacenter_id"]) continue try: - myvim.delete_network(net['vim_net_id']) + task = None + if is_task_id(net['vim_net_id']): + task_id = net['vim_net_id'] + old_task = task_dict.get(task_id) + if not old_task: + error_msg += "\n NET was scheduled for create, but task {} is not found".format(task_id) + continue + with task_lock: + if old_task["status"] == "enqueued": + old_task["status"] = "deleted" + elif old_task["status"] == "error": + continue + elif old_task["status"] == "processing": + task = new_task("del-net", task_id, depends={task_id: old_task}) + else: # ok + task = new_task("del-net", old_task["result"]) + else: + task = new_task("del-net", net['vim_net_id'], store=False) + if task: + myvim_thread.insert_task(task) except vimconn.vimconnNotFoundException as e: - error_msg+="\n NET VIM_id={} not found at datacenter={}".format(net['vim_net_id'], net["datacenter_id"]) + error_msg += "\n NET VIM_id={} not found at datacenter={}".format(net['vim_net_id'], net["datacenter_id"]) logger.warn("NET '%s', VIM_id '%s', from VNF_net_id '%s' not found", - net['uuid'], net['vim_net_id'], str(net['vnf_net_id'])) + net['uuid'], net['vim_net_id'], str(net['vnf_net_id'])) except vimconn.vimconnException as e: - error_msg+="\n NET VIM_id={} at datacenter={} Error: {} {}".format(net['vim_net_id'], net["datacenter_id"], e.http_code, str(e)) + error_msg += "\n NET VIM_id={} at datacenter={} Error: {} {}".format(net['vim_net_id'], + net["datacenter_id"], + e.http_code, str(e)) logger.error("Error %d deleting NET '%s', VIM_id '%s', from VNF_net_id '%s': %s", - e.http_code, net['uuid'], net['vim_net_id'], str(net['vnf_net_id']), str(e)) - if len(error_msg)>0: + e.http_code, net['uuid'], net['vim_net_id'], str(net['vnf_net_id']), str(e)) + if len(error_msg) > 0: return 'instance ' + message + ' deleted but some elements could not be deleted, or already deleted (error: 404) from VIM: ' + error_msg else: return 'instance ' + message + ' deleted' + def refresh_instance(mydb, nfvo_tenant, instanceDict, datacenter=None, vim_tenant=None): '''Refreshes a scenario instance. It modifies instanceDict''' '''Returns: @@ -2405,6 +2577,7 @@ def refresh_instance(mydb, nfvo_tenant, instanceDict, datacenter=None, vim_tenan return 0, 'Scenario instance ' + instance_id + ' refreshed.' + def instance_action(mydb,nfvo_tenant,instance_id, action_dict): #print "Checking that the instance_id exists and getting the instance dictionary" instanceDict = mydb.get_instance_scenario(instance_id, nfvo_tenant) @@ -2477,6 +2650,7 @@ def instance_action(mydb,nfvo_tenant,instance_id, action_dict): else: return vm_result + def create_or_use_console_proxy_thread(console_server, console_port): #look for a non-used port console_thread_key = console_server + ":" + str(console_port) @@ -2501,6 +2675,7 @@ def create_or_use_console_proxy_thread(console_server, console_port): raise NfvoException(str(e), HTTP_Bad_Request) raise NfvoException("Not found any free 'http_console_ports'", HTTP_Conflict) + def check_tenant(mydb, tenant_id): '''check that tenant exists at database''' tenant = mydb.get_rows(FROM='nfvo_tenants', SELECT=('uuid',), WHERE={'uuid': tenant_id}) @@ -2508,10 +2683,12 @@ def check_tenant(mydb, tenant_id): raise NfvoException("tenant '{}' not found".format(tenant_id), HTTP_Not_Found) return + def new_tenant(mydb, tenant_dict): tenant_id = mydb.new_row("nfvo_tenants", tenant_dict, add_uuid=True) return tenant_id + def delete_tenant(mydb, tenant): #get nfvo_tenant info @@ -2519,6 +2696,7 @@ def delete_tenant(mydb, tenant): mydb.delete_row_by_id("nfvo_tenants", tenant_dict['uuid']) return tenant_dict['uuid'] + " " + tenant_dict["name"] + def new_datacenter(mydb, datacenter_descriptor): if "config" in datacenter_descriptor: datacenter_descriptor["config"]=yaml.safe_dump(datacenter_descriptor["config"],default_flow_style=True,width=256) @@ -2536,6 +2714,7 @@ def new_datacenter(mydb, datacenter_descriptor): datacenter_id = mydb.new_row("datacenters", datacenter_descriptor, add_uuid=True) return datacenter_id + def edit_datacenter(mydb, datacenter_id_name, datacenter_descriptor): #obtain data, check that only one exist datacenter = mydb.get_table_by_uuid_name('datacenters', datacenter_id_name) @@ -2563,12 +2742,14 @@ def edit_datacenter(mydb, datacenter_id_name, datacenter_descriptor): mydb.update_rows('datacenters', datacenter_descriptor, where) return datacenter_id + def delete_datacenter(mydb, datacenter): #get nfvo_tenant info datacenter_dict = mydb.get_table_by_uuid_name('datacenters', datacenter, 'datacenter') mydb.delete_row_by_id("datacenters", datacenter_dict['uuid']) return datacenter_dict['uuid'] + " " + datacenter_dict['name'] + def associate_datacenter_to_tenant(mydb, nfvo_tenant, datacenter, vim_tenant_id=None, vim_tenant_name=None, vim_username=None, vim_password=None, config=None): #get datacenter info datacenter_id, myvim = get_datacenter_by_name_uuid(mydb, None, datacenter) @@ -2630,12 +2811,13 @@ def associate_datacenter_to_tenant(mydb, nfvo_tenant, datacenter, vim_tenant_id= # create thread datacenter_id, myvim = get_datacenter_by_name_uuid(mydb, tenant_dict['uuid'], datacenter_id) # reload data thread_name = get_non_used_vim_name(datacenter_name, datacenter_id, tenant_dict['name'], tenant_dict['uuid']) - new_thread = vim_thread.vim_thread(myvim, thread_name) + new_thread = vim_thread.vim_thread(myvim, task_lock, thread_name, datacenter_name, db=db, db_lock=db_lock) new_thread.start() - vim_threads["running"][datacenter_id + "-" + tenant_dict['uuid']] = new_thread - + thread_id = datacenter_id + "." + tenant_dict['uuid'] + vim_threads["running"][thread_id] = new_thread return datacenter_id + def deassociate_datacenter_to_tenant(mydb, tenant_id, datacenter, vim_tenant_id=None): #get datacenter info datacenter_id, myvim = get_datacenter_by_name_uuid(mydb, None, datacenter) @@ -2675,12 +2857,13 @@ def deassociate_datacenter_to_tenant(mydb, tenant_id, datacenter, vim_tenant_id= except db_base_Exception as e: logger.error("Cannot delete datacenter_tenants " + str(e)) pass # the error will be caused because dependencies, vim_tenant can not be deleted - thread_id = datacenter_id + "-" + tenant_datacenter_item["nfvo_tenant_id"] + thread_id = datacenter_id + "." + tenant_datacenter_item["nfvo_tenant_id"] thread = vim_threads["running"][thread_id] - thread.insert_task("exit") + thread.insert_task(new_task("exit", None, store=False)) vim_threads["deleting"][thread_id] = thread return "datacenter {} detached. {}".format(datacenter_id, warning) + def datacenter_action(mydb, tenant_id, datacenter, action_dict): #DEPRECATED #get datacenter info @@ -2723,6 +2906,7 @@ def datacenter_action(mydb, tenant_id, datacenter, action_dict): else: raise NfvoException("Unknown action " + str(action_dict), HTTP_Bad_Request) + def datacenter_edit_netmap(mydb, tenant_id, datacenter, netmap, action_dict): #get datacenter info datacenter_id, _ = get_datacenter_by_name_uuid(mydb, tenant_id, datacenter) @@ -2732,6 +2916,7 @@ def datacenter_edit_netmap(mydb, tenant_id, datacenter, netmap, action_dict): WHERE={'datacenter_id':datacenter_id, what: netmap}) return result + def datacenter_new_netmap(mydb, tenant_id, datacenter, action_dict=None): #get datacenter info datacenter_id, myvim = get_datacenter_by_name_uuid(mydb, tenant_id, datacenter) @@ -2778,6 +2963,7 @@ def datacenter_new_netmap(mydb, tenant_id, datacenter, action_dict=None): net_list.append(net_nfvo) return net_list + def vim_action_get(mydb, tenant_id, datacenter, item, name): #get datacenter info datacenter_id, myvim = get_datacenter_by_name_uuid(mydb, tenant_id, datacenter) @@ -2809,6 +2995,7 @@ def vim_action_get(mydb, tenant_id, datacenter, item, name): print "vim_action Not possible to get_%s_list from VIM: %s " % (item, str(e)) raise NfvoException("Not possible to get_{}_list from VIM: {}".format(item, str(e)), e.http_code) + def vim_action_delete(mydb, tenant_id, datacenter, item, name): #get datacenter info if tenant_id == "any": @@ -2842,6 +3029,7 @@ def vim_action_delete(mydb, tenant_id, datacenter, item, name): return "{} {} {} deleted".format(item[:-1], item_id,item_name) + def vim_action_create(mydb, tenant_id, datacenter, item, descriptor): #get datacenter info logger.debug("vim_action_create descriptor %s", str(descriptor)) diff --git a/vim_thread.py b/vim_thread.py index 5460523b..42279a23 100644 --- a/vim_thread.py +++ b/vim_thread.py @@ -33,97 +33,194 @@ import time import Queue import logging import vimconn - +from db_base import db_base_Exception # from logging import Logger # import auxiliary_functions as af -# TODO: insert a logging system + +def is_task_id(id): + return True if id[:5] == "TASK." else False class vim_thread(threading.Thread): - def __init__(self, vimconn, name=None): - '''Init a thread. + def __init__(self, vimconn, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None, db=None, db_lock=None): + """Init a thread. Arguments: 'id' number of thead 'name' name of thread 'host','user': host ip or name to manage and user 'db', 'db_lock': database class and lock to use it in exclusion - ''' + """ + self.tasksResult = {} + """ It will contain a dictionary with + task_id: + status: enqueued,done,error,deleted,processing + result: VIM result, + """ threading.Thread.__init__(self) self.vim = vimconn + self.datacenter_name = datacenter_name + self.datacenter_tenant_id = datacenter_tenant_id if not name: - self.name = vimconn["id"] + "-" + vimconn["config"]["datacenter_tenant_id"] + self.name = vimconn["id"] + "." + vimconn["config"]["datacenter_tenant_id"] else: self.name = name self.logger = logging.getLogger('openmano.vim.'+self.name) + self.db = db + self.db_lock = db_lock - self.queueLock = threading.Lock() - self.taskQueue = Queue.Queue(2000) + self.task_lock = task_lock + self.task_queue = Queue.Queue(2000) - - def insert_task(self, task, *aditional): + def insert_task(self, task): try: - self.queueLock.acquire() - task = self.taskQueue.put( (task,) + aditional, timeout=5) - self.queueLock.release() - return 1, None + self.task_queue.put(task, False) + return task["id"] except Queue.Full: - return -1, "timeout inserting a task over host " + self.name + raise vimconn.vimconnException(self.name + ": timeout inserting a task") + + def del_task(self, task): + with self.task_lock: + if task["status"] == "enqueued": + task["status"] == "deleted" + return True + else: # task["status"] == "processing" + self.task_lock.release() + return False def run(self): self.logger.debug("Starting") while True: #TODO reload service while True: - self.queueLock.acquire() - if not self.taskQueue.empty(): - task = self.taskQueue.get() + if not self.task_queue.empty(): + task = self.task_queue.get() + self.task_lock.acquire() + if task["status"] == "deleted": + self.task_lock.release() + continue + task["status"] == "processing" + self.task_lock.release() else: - task = None - self.queueLock.release() - - if task is None: now=time.time() time.sleep(1) - continue - - if task[0] == 'instance': - pass - elif task[0] == 'image': - pass - elif task[0] == 'exit': - print self.name, ": processing task exit" - self.terminate() + continue + self.logger.debug("processing task id={} name={} params={}".format(task["id"], task["name"], + str(task["params"]))) + if task["name"] == 'exit' or task["name"] == 'reload': + result, content = self.terminate(task) + elif task["name"] == 'new-vm': + result, content = self.new_vm(task) + elif task["name"] == 'del-vm': + result, content = self.del_vm(task) + elif task["name"] == 'new-net': + result, content = self.new_net(task) + elif task["name"] == 'del-net': + result, content = self.del_net(task) + else: + error_text = "unknown task {}".format(task["name"]) + self.logger.error(error_text) + result = False + content = error_text + + with self.task_lock: + task["status"] = "done" if result else "error" + task["result"] = content + self.task_queue.task_done() + + if task["name"] == 'exit': return 0 - elif task[0] == 'reload': - print self.name, ": processing task reload terminating and relaunching" - self.terminate() + elif task["name"] == 'reload': break - elif task[0] == 'edit-iface': - pass - elif task[0] == 'restore-iface': - pass - elif task[0] == 'new-ovsbridge': - pass - elif task[0] == 'new-vxlan': - pass - elif task[0] == 'del-ovsbridge': - pass - elif task[0] == 'del-vxlan': - pass - elif task[0] == 'create-ovs-bridge-port': - pass - elif task[0] == 'del-ovs-port': - pass - else: - self.logger.error("unknown task %s", str(task)) self.logger.debug("Finishing") - def terminate(self): - pass + def terminate(self, task): + return True, None + + def new_net(self, task): + try: + task_id = task["id"] + params = task["params"] + net_id = self.vim.new_network(*params) + with self.db_lock: + self.db.update_rows("instance_nets", UPDATE={"vim_net_id": net_id}, WHERE={"vim_net_id": task_id}) + return True, net_id + except db_base_Exception as e: + self.logger.error("Error updating database %s", str(e)) + return True, net_id + except vimconn.vimconnException as e: + return False, str(e) + + def new_vm(self, task): + try: + params = task["params"] + task_id = task["id"] + depends = task.get("depends") + net_list = params[5] + for net in net_list: + if is_task_id(net["net_id"]): # change task_id into network_id + try: + task_net = depends[net["net_id"]] + with self.task_lock: + if task_net["status"] == "error": + return False, "Cannot create VM because depends on a network that cannot be created: " + \ + str(task_net["result"]) + elif task_net["status"] == "enqueued" or task_net["status"] == "processing": + return False, "Cannot create VM because depends on a network still not created" + network_id = task_net["result"] + net["net_id"] = network_id + except Exception as e: + return False, "Error trying to map from task_id={} to task result: {}".format(net["net_id"], + str(e)) + vm_id = self.vim.new_vminstance(*params) + with self.db_lock: + self.db.update_rows("instance_vms", UPDATE={"vim_vm_id": vm_id}, WHERE={"vim_vm_id": task_id}) + return True, vm_id + except db_base_Exception as e: + self.logger.error("Error updtaing database %s", str(e)) + return True, vm_id + except vimconn.vimconnException as e: + return False, str(e) + + def del_vm(self, task): + vm_id = task["params"] + if is_task_id(vm_id): + try: + task_create = task["depends"][vm_id] + with self.task_lock: + if task_create["status"] == "error": + return True, "VM was not created. It has error: " + str(task_create["result"]) + elif task_create["status"] == "enqueued" or task_create["status"] == "processing": + return False, "Cannot delete VM because still creating" + vm_id = task_create["result"] + except Exception as e: + return False, "Error trying to get task_id='{}':".format(vm_id, str(e)) + try: + return True, self.vim.delete_vminstance(vm_id) + except vimconn.vimconnException as e: + return False, str(e) + + def del_net(self, task): + net_id = task["params"] + if is_task_id(net_id): + try: + task_create = task["depends"][net_id] + with self.task_lock: + if task_create["status"] == "error": + return True, "net was not created. It has error: " + str(task_create["result"]) + elif task_create["status"] == "enqueued" or task_create["status"] == "processing": + return False, "Cannot delete net because still creating" + net_id = task_create["result"] + except Exception as e: + return False, "Error trying to get task_id='{}':".format(net_id, str(e)) + try: + return True, self.vim.delete_network(net_id) + except vimconn.vimconnException as e: + return False, str(e) + diff --git a/vimconn.py b/vimconn.py index c1721cdb..a9bd9be6 100644 --- a/vimconn.py +++ b/vimconn.py @@ -373,6 +373,8 @@ class vimconnector(): 'VFnotShared'(SRIOV without VLAN tag) same as PF for network connectivity. VF where no other VFs are allocated on the same physical NIC 'bw': (optional) only for PF/VF/VFnotShared. Minimal Bandwidth required for the interface in GBPS + 'port_security': (optional) If False it must avoid any traffic filtering at this interface. If missing + or True, it must apply the default VIM behaviour After execution the method will add the key: 'vim_id': must be filled/added by this method with the VIM identifier generated by the VIM for this interface. 'net_list' is modified diff --git a/vimconn_openstack.py b/vimconn_openstack.py index 7ba280b0..b501d9da 100644 --- a/vimconn_openstack.py +++ b/vimconn_openstack.py @@ -68,7 +68,8 @@ volume_timeout = 60 server_timeout = 60 class vimconnector(vimconn.vimconnector): - def __init__(self, uuid, name, tenant_id, tenant_name, url, url_admin=None, user=None, passwd=None, log_level=None, config={}): + def __init__(self, uuid, name, tenant_id, tenant_name, url, url_admin=None, user=None, passwd=None, + log_level=None, config={}, persistent_info={}): '''using common constructor parameters. In this case 'url' is the keystone authorization url, 'url_admin' is not use @@ -77,7 +78,8 @@ class vimconnector(vimconn.vimconnector): if config.get('APIversion') == 'v3.3': self.osc_api_version = 'v3.3' vimconn.vimconnector.__init__(self, uuid, name, tenant_id, tenant_name, url, url_admin, user, passwd, log_level, config) - + + self.persistent_info = persistent_info self.k_creds={} self.n_creds={} if self.config.get("insecure"): diff --git a/vimconn_openvim.py b/vimconn_openvim.py index 9d6748a9..e86b54db 100644 --- a/vimconn_openvim.py +++ b/vimconn_openvim.py @@ -323,11 +323,13 @@ get_processor_rankings_response_schema = { } class vimconnector(vimconn.vimconnector): - def __init__(self, uuid, name, tenant_id, tenant_name, url, url_admin=None, user=None, passwd=None,log_level="DEBUG",config={}): + def __init__(self, uuid, name, tenant_id, tenant_name, url, url_admin=None, user=None, passwd=None, + log_level="DEBUG", config={}, persistent_info={}): vimconn.vimconnector.__init__(self, uuid, name, tenant_id, tenant_name, url, url_admin, user, passwd, log_level, config) self.tenant = None self.headers_req = {'content-type': 'application/json'} self.logger = logging.getLogger('openmano.vim.openvim') + self.persistent_info = persistent_info if tenant_id: self.tenant = tenant_id diff --git a/vimconn_vmware.py b/vimconn_vmware.py index 3e81f513..cf7445d0 100644 --- a/vimconn_vmware.py +++ b/vimconn_vmware.py @@ -127,7 +127,7 @@ flavorlist = {} class vimconnector(vimconn.vimconnector): def __init__(self, uuid=None, name=None, tenant_id=None, tenant_name=None, - url=None, url_admin=None, user=None, passwd=None, log_level=None, config={}): + url=None, url_admin=None, user=None, passwd=None, log_level=None, config={}, persistent_info={}): """ Constructor create vmware connector to vCloud director. @@ -164,6 +164,7 @@ class vimconnector(vimconn.vimconnector): self.logger = logging.getLogger('openmano.vim.vmware') self.logger.setLevel(10) + self.persistent_info = persistent_info self.name = name self.id = uuid