__author__ = "Alfonso Tierno, Gerardo Garcia, Pablo Montes"
__date__ = "$26-aug-2014 11:09:29$"
-__version__ = "0.5.40-r550"
+__version__ = "0.5.41-r551"
version_date = "Nov 2017"
database_version = 27 # expected database schema version
else:
return str(data[0]) + '=' + json.dumps(str(data[1]))
- def __tuple2db_format_where(self, data):
- '''Compose the needed text for a SQL WHERE, parameter 'data' is a pair tuple (A,B),
- and it returns the text 'A="B"', where A is a field of a table and B is the value
- If B is None it returns the 'A is Null' text, without surrounding Null by quotes
- If B is not None it returns the text "A='B'" or 'A="B"' where B is surrounded by quotes,
- and it ensures internal quotes of B are escaped.
- '''
- if data[1]==None:
- return str(data[0]) + " is Null"
- elif isinstance(data[1], str):
- return str(data[0]) + '=' + json.dumps(data[1])
- else:
- return str(data[0]) + '=' + json.dumps(str(data[1]))
+ def __create_where(self, data, use_or=None):
+ """
+ Compose the needed text for a SQL WHERE, parameter 'data' can be a dict or a list of dict. By default lists are
+ concatenated with OR and dict with AND, unless parameter 'use_or' indicates other thing.
+ If a dict it will generate 'key1="value1" AND key2="value2" AND ...'.
+ If value is None, it will produce 'key is null'
+ If value is a list or tuple, it will produce 'key="value[0]" OR key="value[1]" OR ...'
+ keys can be suffixed by >,<,<>,>=,<= so that this is used to compare key and value instead of "="
+ The special keys "OR", "AND" with a dict value is used to create a nested WHERE
+ If a list, each item will be a dictionary that will be concatenated with OR by default
+ :param data: dict or list of dicts
+ :param use_or: Can be None (use default behaviour), True (use OR) or False (use AND)
+ :return: a string with the content to send to mysql
+ """
+ cmd = []
+ if isinstance(data, dict):
+ for k, v in data.items():
+ if k == "OR":
+ cmd.append("(" + self.__create_where(v, use_or=True) + ")")
+ continue
+ elif k == "AND":
+ cmd.append("(" + self.__create_where(v, use_or=False) + ")")
+ continue
- def __tuple2db_format_where_not(self, data):
- '''Compose the needed text for a SQL WHERE(not). parameter 'data' is a pair tuple (A,B),
- and it returns the text 'A<>"B"', where A is a field of a table and B is the value
- If B is None it returns the 'A is not Null' text, without surrounding Null by quotes
- If B is not None it returns the text "A<>'B'" or 'A<>"B"' where B is surrounded by quotes,
- and it ensures internal quotes of B are escaped.
- '''
- if data[1]==None:
- return str(data[0]) + " is not Null"
- elif isinstance(data[1], str):
- return str(data[0]) + '<>' + json.dumps(data[1])
+ if k.endswith(">") or k.endswith("<") or k.endswith("="):
+ pass
+ else:
+ k += "="
+
+ if v is None:
+ cmd.append(k.replace("=", " is").replace("<>", " is not") + " Null")
+ elif isinstance(v, (tuple, list)):
+ cmd2 = []
+ for v2 in v:
+ if v2 is None:
+ cmd2.append(k.replace("=", " is").replace("<>", " is not") + " Null")
+ else:
+ cmd2.append(k + json.dumps(str(v2)))
+ cmd.append("(" + " OR ".join(cmd2) + ")")
+ else:
+ cmd.append(k + json.dumps(str(v)))
+ elif isinstance(data, (tuple, list)):
+ if use_or is None:
+ use_or = True
+ for k in data:
+ cmd.append("(" + self.__create_where(k) + ")")
else:
- return str(data[0]) + '<>' + json.dumps(str(data[1]))
-
+ raise db_base_Exception("invalid WHERE clause at '{}'".format(data))
+ if use_or:
+ return " OR ".join(cmd)
+ return " AND ".join(cmd)
+
def __remove_quotes(self, data):
'''remove single quotes ' of any string content of data dictionary'''
for k,v in data.items():
data[k] = data[k].replace("'","_")
def _update_rows(self, table, UPDATE, WHERE, modified_time=0):
- ''' Update one or several rows into a table.
- Atributes
- UPDATE: dictionary with the key: value to change
- table: table where to update
- WHERE: dictionary of elements to update
- Return: the number of updated rows, exception if error
- '''
- #gettting uuid
+ """ Update one or several rows of a table.
+ :param UPDATE: dictionary with the changes. dict keys are database columns that will be set with the dict values
+ :param table: database table to update
+ :param WHERE: dict or list of dicts to compose the SQL WHERE clause.
+ If a dict it will generate 'key1="value1" AND key2="value2" AND ...'.
+ If value is None, it will produce 'key is null'
+ If value is a list or tuple, it will produce 'key="value[0]" OR key="value[1]" OR ...'
+ keys can be suffixed by >,<,<>,>=,<= so that this is used to compare key and value instead of "="
+ The special keys "OR", "AND" with a dict value is used to create a nested WHERE
+ If a list, each item will be a dictionary that will be concatenated with OR
+ :return: the number of updated rows, raises exception upon error
+ """
+ # gettting uuid
values = ",".join(map(self.__tuple2db_format_set, UPDATE.iteritems() ))
if modified_time:
values += ",modified_at={:f}".format(modified_time)
- cmd= "UPDATE " + table +" SET " + values +\
- " WHERE " + " and ".join(map(self.__tuple2db_format_where, WHERE.iteritems() ))
+ cmd= "UPDATE " + table + " SET " + values + " WHERE " + self.__create_where(WHERE)
self.logger.debug(cmd)
self.cur.execute(cmd)
return self.cur.rowcount
tries -= 1
def update_rows(self, table, UPDATE, WHERE, modified_time=0):
- ''' Update one or several rows into a table.
- Atributes
- UPDATE: dictionary with the key: value to change
- table: table where to update
- WHERE: dictionary of elements to update
- Return: (result, descriptive text) where result indicates the number of updated files
- '''
+ """ Update one or several rows of a table.
+ :param UPDATE: dictionary with the changes. dict keys are database columns that will be set with the dict values
+ :param table: database table to update
+ :param WHERE: dict or list of dicts to compose the SQL WHERE clause.
+ If a dict it will generate 'key1="value1" AND key2="value2" AND ...'.
+ If value is None, it will produce 'key is null'
+ If value is a list or tuple, it will produce 'key="value[0]" OR key="value[1]" OR ...'
+ keys can be suffixed by >,<,<>,>=,<= so that this is used to compare key and value instead of "="
+ The special keys "OR", "AND" with a dict value is used to create a nested WHERE
+ If a list, each item will be a dictionary that will be concatenated with OR
+ :param modified_time: Can contain the time to be set to the table row
+ :return: the number of updated rows, raises exception upon error
+ """
if table in self.tables_with_created_field and modified_time==0:
modified_time=time.time()
tries = 2
tries -= 1
def delete_row(self, **sql_dict):
- ''' Deletes rows from a table.
- Attribute sql_dir: dictionary with the following key: value
- 'FROM': string of table name (Mandatory)
- 'WHERE': dict of key:values, translated to key=value AND ... (Optional)
- 'WHERE_NOT': dict of key:values, translated to key<>value AND ... (Optional)
- if value is None, it is translated to key is not null
- 'LIMIT': limit of number of rows (Optional)
- Return: the number of deleted or exception if error
- '''
- #print sql_dict
- from_ = "FROM " + str(sql_dict['FROM'])
- #print 'from_', from_
- if 'WHERE' in sql_dict and len(sql_dict['WHERE']) > 0:
- w=sql_dict['WHERE']
- where_ = "WHERE " + " AND ".join(map(self.__tuple2db_format_where, w.iteritems()))
- else: where_ = ""
- if 'WHERE_NOT' in sql_dict and len(sql_dict['WHERE_NOT']) > 0:
- w=sql_dict['WHERE_NOT']
- where_2 = " AND ".join(map(self.__tuple2db_format_where_not, w.iteritems()))
- if len(where_)==0: where_ = "WHERE " + where_2
- else: where_ = where_ + " AND " + where_2
- #print 'where_', where_
- limit_ = "LIMIT " + str(sql_dict['LIMIT']) if 'LIMIT' in sql_dict else ""
- #print 'limit_', limit_
- cmd = " ".join( ("DELETE", from_, where_, limit_) )
+ """ Deletes rows from a table.
+ :param UPDATE: dictionary with the changes. dict keys are database columns that will be set with the dict values
+ :param FROM: string with table name (Mandatory)
+ :param WHERE: dict or list of dicts to compose the SQL WHERE clause. (Optional)
+ If a dict it will generate 'key1="value1" AND key2="value2" AND ...'.
+ If value is None, it will produce 'key is null'
+ If value is a list or tuple, it will produce 'key="value[0]" OR key="value[1]" OR ...'
+ keys can be suffixed by >,<,<>,>=,<= so that this is used to compare key and value instead of "="
+ The special keys "OR", "AND" with a dict value is used to create a nested WHERE
+ If a list, each item will be a dictionary that will be concatenated with OR
+ :return: the number of deleted rows, raises exception upon error
+ """
+ # print sql_dict
+ cmd = "DELETE FROM " + str(sql_dict['FROM'])
+ if sql_dict.get('WHERE'):
+ cmd += " WHERE " + self.__create_where(sql_dict['WHERE'])
+ if sql_dict.get('LIMIT'):
+ cmd += " LIMIT " + str(sql_dict['LIMIT'])
tries = 2
while tries:
try:
tries -= 1
def get_rows(self, **sql_dict):
- ''' Obtain rows from a table.
- Attribute sql_dir: dictionary with the following key: value
- 'SELECT': list or tuple of fields to retrieve) (by default all)
- 'FROM': string of table name (Mandatory)
- 'WHERE': dict of key:values, translated to key=value (key is null) AND ... (Optional)
- 'WHERE_NOT': dict of key:values, translated to key<>value (key is not null) AND ... (Optional)
- 'WHERE_OR': dict of key:values, translated to key=value OR ... (Optional)
- 'WHERE_AND_OR: str 'AND' or 'OR'(by default) mark the priority to 'WHERE AND (WHERE_OR)' or (WHERE) OR WHERE_OR' (Optional)
- 'LIMIT': limit of number of rows (Optional)
- 'ORDER_BY': list or tuple of fields to order, add ' DESC' to each item if inverse order is required
- Return: a list with dictionaries at each row
- '''
- #print sql_dict
- select_= "SELECT " + ("*" if 'SELECT' not in sql_dict else ",".join(map(str,sql_dict['SELECT'])) )
- #print 'select_', select_
- from_ = "FROM " + str(sql_dict['FROM'])
- #print 'from_', from_
- where_and = ""
- where_or = ""
- w=sql_dict.get('WHERE')
- if w:
- where_and = " AND ".join(map(self.__tuple2db_format_where, w.iteritems() ))
- w=sql_dict.get('WHERE_NOT')
- if w:
- if where_and: where_and += " AND "
- where_and += " AND ".join(map(self.__tuple2db_format_where_not, w.iteritems() ) )
- w=sql_dict.get('WHERE_OR')
- if w:
- where_or = " OR ".join(map(self.__tuple2db_format_where, w.iteritems() ))
- if where_and and where_or:
- if sql_dict.get("WHERE_AND_OR") == "AND":
- where_ = "WHERE " + where_and + " AND (" + where_or + ")"
+ """ Obtain rows from a table.
+ :param SELECT: list or tuple of fields to retrieve) (by default all)
+ :param FROM: string with table name (Mandatory)
+ :param WHERE: dict or list of dicts to compose the SQL WHERE clause. (Optional)
+ If a dict it will generate 'key1="value1" AND key2="value2" AND ...'.
+ If value is None, it will produce 'key is null'
+ If value is a list or tuple, it will produce 'key="value[0]" OR key="value[1]" OR ...'
+ keys can be suffixed by >,<,<>,>=,<= so that this is used to compare key and value instead of "="
+ The special keys "OR", "AND" with a dict value is used to create a nested WHERE
+ If a list, each item will be a dictionary that will be concatenated with OR
+ :param LIMIT: limit the number of obtianied entries (Optional)
+ :param ORDER_BY: list or tuple of fields to order, add ' DESC' to each item if inverse order is required
+ :return: a list with dictionaries at each row, raises exception upon error
+ """
+ # print sql_dict
+ cmd = "SELECT "
+ if 'SELECT' in sql_dict:
+ if isinstance(sql_dict['SELECT'], (tuple, list)):
+ cmd += ",".join(map(str, sql_dict['SELECT']))
else:
- where_ = "WHERE (" + where_and + ") OR " + where_or
- elif where_and and not where_or:
- where_ = "WHERE " + where_and
- elif not where_and and where_or:
- where_ = "WHERE " + where_or
+ cmd += sql_dict['SELECT']
else:
- where_ = ""
- #print 'where_', where_
- limit_ = "LIMIT " + str(sql_dict['LIMIT']) if 'LIMIT' in sql_dict else ""
- order_ = "ORDER BY " + ",".join(map(str,sql_dict['ORDER_BY'])) if 'ORDER_BY' in sql_dict else ""
-
- #print 'limit_', limit_
- cmd = " ".join( (select_, from_, where_, limit_, order_) )
+ cmd += "*"
+
+ cmd += " FROM " + str(sql_dict['FROM'])
+ if sql_dict.get('WHERE'):
+ cmd += " WHERE " + self.__create_where(sql_dict['WHERE'])
+
+ if 'ORDER_BY' in sql_dict:
+ cmd += " ORDER BY "
+ if isinstance(sql_dict['ORDER_BY'], (tuple, list)):
+ cmd += ",".join(map(str, sql_dict['ORDER_BY']))
+ else:
+ cmd += str(sql_dict['ORDER_BY'])
+
+ if 'LIMIT' in sql_dict:
+ cmd += " LIMIT " + str(sql_dict['LIMIT'])
+
tries = 2
while tries:
try:
if error_item_text==None:
error_item_text = table
what = 'uuid' if af.check_valid_uuid(uuid_name) else 'name'
- cmd = " SELECT * FROM {} WHERE {}='{}'".format(table, what, uuid_name)
+ cmd = " SELECT * FROM {} WHERE {}='{}'".format(table, what, uuid_name)
if WHERE_OR:
- where_or = " OR ".join(map(self.__tuple2db_format_where, WHERE_OR.iteritems() ))
+ where_or = self.__create_where(WHERE_OR, use_or=True)
if WHERE_AND_OR == "AND":
cmd += " AND (" + where_or + ")"
else:
cmd += " OR " + where_or
-
tries = 2
while tries:
try:
nfvo.check_tenant(mydb, tenant_id)
select_,where_,limit_ = filter_query_string(bottle.request.query, None,
('uuid', 'name', 'osm_id', 'description', 'public', "tenant_id", "created_at") )
- where_or = {}
if tenant_id != "any":
- where_or["tenant_id"] = tenant_id
- where_or["public"] = True
- vnfs = mydb.get_rows(FROM='vnfs', SELECT=select_,WHERE=where_,WHERE_OR=where_or, WHERE_AND_OR="AND",LIMIT=limit_)
- #change_keys_http2db(content, http2db_vnf, reverse=True)
+ where_["OR"]={"tenant_id": tenant_id, "public": True}
+ vnfs = mydb.get_rows(FROM='vnfs', SELECT=select_, WHERE=where_, LIMIT=limit_)
+ # change_keys_http2db(content, http2db_vnf, reverse=True)
utils.convert_str2boolean(vnfs, ('public',))
convert_datetime2str(vnfs)
- data={'vnfs' : vnfs}
+ data={'vnfs': vnfs}
return format_out(data)
except bottle.HTTPError:
raise
#obtain data
s,w,l=filter_query_string(bottle.request.query, None,
('uuid', 'name', 'osm_id', 'description', 'tenant_id', 'created_at', 'public'))
- where_or={}
if tenant_id != "any":
- where_or["tenant_id"] = tenant_id
- where_or["public"] = True
- scenarios = mydb.get_rows(SELECT=s, WHERE=w, WHERE_OR=where_or, WHERE_AND_OR="AND", LIMIT=l, FROM='scenarios')
+ w["OR"] = {"tenant_id": tenant_id, "public": True}
+ scenarios = mydb.get_rows(SELECT=s, WHERE=w, LIMIT=l, FROM='scenarios')
convert_datetime2str(scenarios)
utils.convert_str2boolean(scenarios, ('public',) )
data={'scenarios':scenarios}
'log_level_of': 'DEBUG'
}
try:
+ # starts ovim library
ovim = ovim_module.ovim(ovim_configuration)
ovim.start_service()
+ #delete old unneeded vim_actions
+ clean_db(mydb)
+
+ # starts vim_threads
from_= 'tenants_datacenters as td join datacenters as d on td.datacenter_id=d.uuid join '\
'datacenter_tenants as dt on td.datacenter_tenant_id=dt.uuid'
select_ = ('type', 'd.config as config', 'd.uuid as datacenter_id', 'vim_url', 'vim_url_admin',
return ("openmanod version {} {}\n(c) Copyright Telefonica".format(global_config["version"],
global_config["version_date"] ))
+def clean_db(mydb):
+ """
+ Clean unused or old entries at database to avoid unlimited growing
+ :param mydb: database connector
+ :return: None
+ """
+ # get and delete unused vim_actions: all elements deleted, one week before, instance not present
+ now = t.time()-3600*24*7
+ instance_action_id = None
+ nb_deleted = 0
+ while True:
+ actions_to_delete = mydb.get_rows(
+ SELECT=("item", "item_id", "instance_action_id"),
+ FROM="vim_actions as va join instance_actions as ia on va.instance_action_id=ia.uuid "
+ "left join instance_scenarios as i on ia.instance_id=i.uuid",
+ WHERE={"va.action": "DELETE", "va.modified_at<": now, "i.uuid": None,
+ "va.status": ("DONE", "SUPERSEDED")},
+ LIMIT=100
+ )
+ for to_delete in actions_to_delete:
+ mydb.delete_row(FROM="vim_actions", WHERE=to_delete)
+ if instance_action_id != to_delete["instance_action_id"]:
+ instance_action_id = to_delete["instance_action_id"]
+ mydb.delete_row(FROM="instance_actions", WHERE={"uuid": instance_action_id})
+ nb_deleted += len(actions_to_delete)
+ if len(actions_to_delete) < 100:
+ break
+ if nb_deleted:
+ logger.debug("Removed {} unused vim_actions".format(nb_deleted))
+
+
def get_flavorlist(mydb, vnf_id, nfvo_tenant=None):
'''Obtain flavorList
content = mydb.get_rows(FROM='vnfs join vms on vnfs.uuid=vms.vnf_id join interfaces on vms.uuid=interfaces.vm_id',\
SELECT=('interfaces.uuid as uuid','interfaces.external_name as external_name', 'vms.name as vm_name', 'interfaces.vm_id as vm_id', \
'interfaces.internal_name as internal_name', 'interfaces.type as type', 'interfaces.vpci as vpci','interfaces.bw as bw'),\
- WHERE={'vnfs.uuid': vnf_id},
- WHERE_NOT={'interfaces.external_name': None} )
+ WHERE={'vnfs.uuid': vnf_id, 'interfaces.external_name<>': None} )
#print content
data['vnf']['external-connections'] = content
#1.2: Check that VNF are present at database table vnfs. Insert uuid, description and external interfaces
for name,vnf in vnfs.items():
- where={}
- where_or={"tenant_id": tenant_id, 'public': "true"}
+ where = {"OR": {"tenant_id": tenant_id, 'public': "true"}}
error_text = ""
error_pos = "'topology':'nodes':'" + name + "'"
if 'vnf_id' in vnf:
if 'VNF model' in vnf:
error_text += " 'VNF model' " + vnf['VNF model']
where['name'] = vnf['VNF model']
- if len(where) == 0:
+ if len(where) == 1:
raise NfvoException("Descriptor need a 'vnf_id' or 'VNF model' field at " + error_pos, HTTP_Bad_Request)
vnf_db = mydb.get_rows(SELECT=('uuid','name','description'),
FROM='vnfs',
- WHERE=where,
- WHERE_OR=where_or,
- WHERE_AND_OR="AND")
+ WHERE=where)
if len(vnf_db)==0:
raise NfvoException("unknown" + error_text + " at " + error_pos, HTTP_Not_Found)
elif len(vnf_db)>1:
#get external interfaces
ext_ifaces = mydb.get_rows(SELECT=('external_name as name','i.uuid as iface_uuid', 'i.type as type'),
FROM='vnfs join vms on vnfs.uuid=vms.vnf_id join interfaces as i on vms.uuid=i.vm_id',
- WHERE={'vnfs.uuid':vnf['uuid']}, WHERE_NOT={'external_name':None} )
+ WHERE={'vnfs.uuid':vnf['uuid'], 'external_name<>': None} )
for ext_iface in ext_ifaces:
vnf['ifaces'][ ext_iface['name'] ] = {'uuid':ext_iface['iface_uuid'], 'type':ext_iface['type']}
# 1: Check that VNF are present at database table vnfs and update content into scenario dict
for name,vnf in scenario["vnfs"].iteritems():
- where={}
- where_or={"tenant_id": tenant_id, 'public': "true"}
+ where = {"OR": {"tenant_id": tenant_id, 'public': "true"}}
error_text = ""
error_pos = "'scenario':'vnfs':'" + name + "'"
if 'vnf_id' in vnf:
if 'vnf_name' in vnf:
error_text += " 'vnf_name' " + vnf['vnf_name']
where['name'] = vnf['vnf_name']
- if len(where) == 0:
+ if len(where) == 1:
raise NfvoException("Needed a 'vnf_id' or 'vnf_name' at " + error_pos, HTTP_Bad_Request)
vnf_db = mydb.get_rows(SELECT=('uuid', 'name', 'description'),
FROM='vnfs',
- WHERE=where,
- WHERE_OR=where_or,
- WHERE_AND_OR="AND")
+ WHERE=where)
if len(vnf_db) == 0:
raise NfvoException("Unknown" + error_text + " at " + error_pos, HTTP_Not_Found)
elif len(vnf_db) > 1:
# get external interfaces
ext_ifaces = mydb.get_rows(SELECT=('external_name as name', 'i.uuid as iface_uuid', 'i.type as type'),
FROM='vnfs join vms on vnfs.uuid=vms.vnf_id join interfaces as i on vms.uuid=i.vm_id',
- WHERE={'vnfs.uuid':vnf['uuid']}, WHERE_NOT={'external_name': None} )
+ WHERE={'vnfs.uuid':vnf['uuid'], 'external_name<>': None} )
for ext_iface in ext_ifaces:
vnf['ifaces'][ ext_iface['name'] ] = {'uuid':ext_iface['iface_uuid'], 'type': ext_iface['type']}
# TODO? get internal-connections from db.nets and their profiles, and update scenario[vnfs][internal-connections] accordingly
myvims = {}
myvim_threads = {}
vimthread_affected = {}
+ net2vm_dependencies = {}
task_index = 0
instance_action_id = get_task_id()
if not myvim:
error_msg += "\n VM id={} cannot be deleted because datacenter={} not found".format(vm['vim_vm_id'], sce_vnf["datacenter_id"])
continue
- try:
- db_vim_action = {
- "instance_action_id": instance_action_id,
- "task_index": task_index,
- "datacenter_vim_id": sce_vnf["datacenter_tenant_id"],
- "action": "DELETE",
- "status": "SCHEDULED",
- "item": "instance_vms",
- "item_id": vm["uuid"],
- "extra": yaml.safe_dump({"params": vm["interfaces"]},
- default_flow_style=True, width=256)
- }
- task_index += 1
- db_vim_actions.append(db_vim_action)
-
- except vimconn.vimconnNotFoundException as e:
- error_msg+="\n VM VIM_id={} not found at datacenter={}".format(vm['vim_vm_id'], sce_vnf["datacenter_id"])
- logger.warn("VM instance '%s'uuid '%s', VIM id '%s', from VNF_id '%s' not found",
- vm['name'], vm['uuid'], vm['vim_vm_id'], sce_vnf['vnf_id'])
- except vimconn.vimconnException as e:
- error_msg+="\n VM VIM_id={} at datacenter={} Error: {} {}".format(vm['vim_vm_id'], sce_vnf["datacenter_id"], e.http_code, str(e))
- logger.error("Error %d deleting VM instance '%s'uuid '%s', VIM_id '%s', from VNF_id '%s': %s",
- e.http_code, vm['name'], vm['uuid'], vm['vim_vm_id'], sce_vnf['vnf_id'], str(e))
+ db_vim_action = {
+ "instance_action_id": instance_action_id,
+ "task_index": task_index,
+ "datacenter_vim_id": sce_vnf["datacenter_tenant_id"],
+ "action": "DELETE",
+ "status": "SCHEDULED",
+ "item": "instance_vms",
+ "item_id": vm["uuid"],
+ "extra": yaml.safe_dump({"params": vm["interfaces"]},
+ default_flow_style=True, width=256)
+ }
+ db_vim_actions.append(db_vim_action)
+ for interface in vm["interfaces"]:
+ if not interface.get("instance_net_id"):
+ continue
+ if interface["instance_net_id"] not in net2vm_dependencies:
+ net2vm_dependencies[interface["instance_net_id"]] = []
+ net2vm_dependencies[interface["instance_net_id"]].append(task_index)
+ task_index += 1
# 2.2 deleting NETS
# net_fail_list=[]
for net in instanceDict['nets']:
- # TODO if not net['created']:
- # TODO continue #skip not created nets
-
vimthread_affected[net["datacenter_tenant_id"]] = None
datacenter_key = (net["datacenter_id"], net["datacenter_tenant_id"])
if datacenter_key not in myvims:
if not myvim:
error_msg += "\n Net VIM_id={} cannot be deleted because datacenter={} not found".format(net['vim_net_id'], net["datacenter_id"])
continue
- try:
- db_vim_action = {
- "instance_action_id": instance_action_id,
- "task_index": task_index,
- "datacenter_vim_id": net["datacenter_tenant_id"],
- "action": "DELETE",
- "status": "SCHEDULED",
- "item": "instance_nets",
- "item_id": net["uuid"],
- "extra": yaml.safe_dump({"params": (net['vim_net_id'], net['sdn_net_id'])},
- default_flow_style=True, width=256)
- }
- task_index += 1
- db_vim_actions.append(db_vim_action)
-
- except vimconn.vimconnNotFoundException as e:
- error_msg += "\n NET VIM_id={} not found at datacenter={}".format(net['vim_net_id'], net["datacenter_id"])
- logger.warn("NET '%s', VIM_id '%s', from VNF_net_id '%s' not found",
- net['uuid'], net['vim_net_id'], str(net['vnf_net_id']))
- except vimconn.vimconnException as e:
- error_msg += "\n NET VIM_id={} at datacenter={} Error: {} {}".format(net['vim_net_id'],
- net["datacenter_id"],
- e.http_code, str(e))
- logger.error("Error %d deleting NET '%s', VIM_id '%s', from VNF_net_id '%s': %s",
- e.http_code, net['uuid'], net['vim_net_id'], str(net['vnf_net_id']), str(e))
+ extra = {"params": (net['vim_net_id'], net['sdn_net_id'])}
+ if net2vm_dependencies.get(net["uuid"]):
+ extra["depends_on"] = net2vm_dependencies[net["uuid"]]
+ db_vim_action = {
+ "instance_action_id": instance_action_id,
+ "task_index": task_index,
+ "datacenter_vim_id": net["datacenter_tenant_id"],
+ "action": "DELETE",
+ "status": "SCHEDULED",
+ "item": "instance_nets",
+ "item_id": net["uuid"],
+ "extra": yaml.safe_dump(extra, default_flow_style=True, width=256)
+ }
+ task_index += 1
+ db_vim_actions.append(db_vim_action)
db_instance_action["number_tasks"] = task_index
db_tables = [
""""
This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
The tasks are stored at database in table vim_actions
-The task content are (M: stored at memory, D: stored at database):
+The task content is (M: stored at memory, D: stored at database):
MD instance_action_id: reference a global action over an instance-scenario: database instance_actions
- MD task_index: index number of the task. This with the previous are a key
+ MD task_index: index number of the task. This together with the previous forms a unique key identifier
MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread
MD vim_id: id of the vm,net,etc at VIM
MD action: CREATE, DELETE, FIND
MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
- MD item_id: uuid of the referenced entry in the preious table
+ MD item_id: uuid of the referenced entry in the previous table
MD status: SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED
MD extra: text with yaml format at database, dict at memory with:
params: list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken from other related tasks
find: (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains the FIND params
- depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm depends on a net
+ depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation depends on a net creation
sdn_net_id: used for net.
tries:
interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
__author__ = "Alfonso Tierno, Pablo Montes"
__date__ = "$28-Sep-2017 12:07:15$"
-# from logging import Logger
-# import auxiliary_functions as af
-
def is_task_id(task_id):
return task_id.startswith("TASK-")
task_list = []
old_action_key = None
- with self.db_lock:
- vim_actions = self.db.get_rows(FROM="vim_actions",
- WHERE={"datacenter_vim_id": self.datacenter_tenant_id },
- ORDER_BY=("item", "item_id", "created_at",))
- for task in vim_actions:
- item = task["item"]
- item_id = task["item_id"]
- action_key = item + item_id
- if old_action_key != action_key:
- if not action_completed and task_list:
- # This will fill needed task parameters into memory, and insert the task if needed in
- # self.pending_tasks or self.refresh_tasks
- self._insert_pending_tasks(task_list)
- task_list = []
- old_action_key = action_key
- action_completed = False
- elif action_completed:
- continue
+ old_item_id = ""
+ old_item = ""
+ old_created_at = 0.0
+ database_limit = 200
+ while True:
+ # get 200 (database_limit) entries each time
+ with self.db_lock:
+ vim_actions = self.db.get_rows(FROM="vim_actions",
+ WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
+ "item_id>=": old_item_id},
+ ORDER_BY=("item_id", "item", "created_at",),
+ LIMIT=database_limit)
+ for task in vim_actions:
+ item = task["item"]
+ item_id = task["item_id"]
+
+ # skip the first entries that are already processed in the previous pool of 200
+ if old_item_id:
+ if item_id == old_item_id and item == old_item and task["created_at"] == old_created_at:
+ old_item_id = False # next one will be a new un-processed task
+ continue
- if task["status"] == "SCHEDULED" or task["action"] == "CREATE" or task["action"] == "FIND":
- task_list.append(task)
- elif task["action"] == "DELETE":
- # action completed because deleted and status is not SCHEDULED. Not needed anything
- action_completed = True
+ action_key = item + item_id
+ if old_action_key != action_key:
+ if not action_completed and task_list:
+ # This will fill needed task parameters into memory, and insert the task if needed in
+ # self.pending_tasks or self.refresh_tasks
+ self._insert_pending_tasks(task_list)
+ task_list = []
+ old_action_key = action_key
+ action_completed = False
+ elif action_completed:
+ continue
+ if task["status"] == "SCHEDULED" or task["action"] == "CREATE" or task["action"] == "FIND":
+ task_list.append(task)
+ elif task["action"] == "DELETE":
+ # action completed because deleted and status is not SCHEDULED. Not needed anything
+ action_completed = True
+ if len(vim_actions) == database_limit:
+ # update variables for get the next database iteration
+ old_item_id = item_id
+ old_item = item
+ old_created_at = task["created_at"]
+ else:
+ break
# Last actions group need to be inserted too
if not action_completed and task_list:
self._insert_pending_tasks(task_list)
+ self.logger.debug("reloaded vim actions pending:{} refresh:{}".format(
+ len(self.pending_tasks), len(self.refresh_tasks)))
def _refres_elements(self):
"""Call VIM to get VMs and networks status until 10 elements"""
if vim_info.get("error_msg"):
vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"])
if task_vim_status != vim_info["status"] or task_error_msg != vim_info.get("error_msg") or \
- task_vim_info != vim_info["vim_info"]:
+ task_vim_info != vim_info.get("vim_info"):
task["extra"]["vim_status"] = vim_info["status"]
task["error_msg"] = vim_info.get("error_msg")
- task["vim_info"] = vim_info["vim_info"]
+ task["vim_info"] = vim_info.get("vim_info")
temp_dict = {"status": vim_info["status"],
"error_msg": vim_info.get("error_msg"),
- "vim_info": vim_info["vim_info"]}
+ "vim_info": vim_info.get("vim_info")}
with self.db_lock:
self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
self.db.update_rows(
while self.pending_tasks:
task = self.pending_tasks.pop(0)
nb_processed += 1
- if task["status"] == "SUPERSEDED":
- # not needed to do anything but update database with the new status
- result = True
- database_update = None
- elif not self.vim:
- task["status"] == "ERROR"
- task["error_msg"] = self.error_status
- result = False
- database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
- elif task["item"] == 'instance_vms':
- if task["action"] == "CREATE":
- result, database_update = self.new_vm(task)
- nb_created += 1
- elif task["action"] == "DELETE":
- result, database_update = self.del_vm(task)
- else:
- raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
- elif task["item"] == 'instance_nets':
- if task["action"] == "CREATE":
- result, database_update = self.new_net(task)
- nb_created += 1
- elif task["action"] == "DELETE":
- result, database_update = self.del_net(task)
- elif task["action"] == "FIND":
- result, database_update = self.get_net(task)
+ try:
+ # check if tasks that this depends on have been completed
+ dependency_not_completed = False
+ for task_index in task["extra"].get("depends_on", ()):
+ task_dependency = task["depends"].get("TASK-" + str(task_index))
+ if not task_dependency:
+ task_dependency = self._look_for_task(task["instance_action_id"], "TASK-" + str(task_index))
+ if not task_dependency:
+ raise VimThreadException(
+ "Cannot get depending net task trying to get depending task {}.{}".format(
+ task["instance_action_id"], task_index))
+ # task["depends"]["TASK-" + str(task_index)] = task_dependency #it references another object,so database must be look again
+ if task_dependency["status"] == "SCHEDULED":
+ dependency_not_completed = True
+ break
+ elif task_dependency["status"] == "FAILED":
+ raise VimThreadException(
+ "Cannot {} {}, task {}.{} {} because depends on failed {} {}, task{}.{}"
+ "task {}.{}".format(task["action"], task["item"],
+ task["instance_action_id"], task["task_index"],
+ task_dependency["instance_action_id"], task_dependency["task_index"],
+ task_dependency["action"], task_dependency["item"] ))
+ if dependency_not_completed:
+ # Move this task to the end.
+ task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
+ if task["extra"]["tries"] <= 3:
+ self.pending_tasks.append(task)
+ continue
+ else:
+ raise VimThreadException(
+ "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, "
+ "(task {}.{})".format(task["action"], task["item"],
+ task["instance_action_id"], task["task_index"],
+ task_dependency["instance_action_id"], task_dependency["task_index"],
+ task_dependency["action"], task_dependency["item"]))
+
+ if task["status"] == "SUPERSEDED":
+ # not needed to do anything but update database with the new status
+ result = True
+ database_update = None
+ elif not self.vim:
+ task["status"] == "ERROR"
+ task["error_msg"] = self.error_status
+ result = False
+ database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
+ elif task["item"] == 'instance_vms':
+ if task["action"] == "CREATE":
+ result, database_update = self.new_vm(task)
+ nb_created += 1
+ elif task["action"] == "DELETE":
+ result, database_update = self.del_vm(task)
+ else:
+ raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
+ elif task["item"] == 'instance_nets':
+ if task["action"] == "CREATE":
+ result, database_update = self.new_net(task)
+ nb_created += 1
+ elif task["action"] == "DELETE":
+ result, database_update = self.del_net(task)
+ elif task["action"] == "FIND":
+ result, database_update = self.get_net(task)
+ else:
+ raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
else:
- raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
- else:
- raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
- # TODO
+ raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
+ # TODO
+ except VimThreadException as e:
+ result = False
+ task["error_msg"] = str(e)
+ task["status"] = "FAILED"
+ database_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": task["error_msg"]}
- if task["status"] == "SCHEDULED":
- # This is because a depend task is not completed. Moved to the end. NOT USED YET
- if task["extra"].get("tries", 0) > 3:
- task["status"] == "FAILED"
- else:
- task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
- self.pending_tasks.append(task)
- elif task["action"] == "DELETE":
+ if task["action"] == "DELETE":
action_key = task["item"] + task["item_id"]
del self.grouped_tasks[action_key]
elif task["action"] in ("CREATE", "FIND") and task["status"] in ("DONE", "BUILD"):
with self.db_lock:
self.db.update_rows(
table="vim_actions",
- UPDATE={"status": task["status"], "vim_id": task["vim_id"], "modified_at": now,
+ UPDATE={"status": task["status"], "vim_id": task.get("vim_id"), "modified_at": now,
"error_msg": task["error_msg"],
"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
error_text = ""
for net in net_list:
if "net_id" in net and is_task_id(net["net_id"]): # change task_id into network_id
- if net["net_id"] in depends:
- task_net = depends[net["net_id"]]
- else:
- task_net = self._look_for_task(task["instance_action_id"], net["net_id"])
- if not task_net:
- raise VimThreadException(
- "Error trying to get depending task from task_index={}".format(net["net_id"]))
- network_id = task_net.get("vim_id")
+ task_dependency = task["depends"].get(net["net_id"])
+ if not task_dependency:
+ task_dependency = self._look_for_task(task["instance_action_id"], net["net_id"])
+ if not task_dependency:
+ raise VimThreadException(
+ "Cannot get depending net task trying to get depending task {}.{}".format(
+ task["instance_action_id"], net["net_id"]))
+ network_id = task_dependency.get("vim_id")
if not network_id:
raise VimThreadException(
"Cannot create VM because depends on a network not created or found: " +
- str(task_net["error_msg"]))
+ str(depends[net["net_id"]]["error_msg"]))
net["net_id"] = network_id
vim_vm_id, created_items = self.vim.new_vminstance(*params)