From 3fcfdb7674436861d6ab0740972573293b9a355f Mon Sep 17 00:00:00 2001 From: tierno Date: Tue, 24 Oct 2017 07:48:24 +0200 Subject: [PATCH] Avoid database growth by cleaning old vim_actions Change-Id: Ib3ad233c1a85e72fe24229282b59a88bc536c692 Signed-off-by: tierno --- openmanod | 2 +- osm_ro/db_base.py | 248 +++++++++++++++++++++++-------------------- osm_ro/httpserver.py | 16 ++- osm_ro/nfvo.py | 147 +++++++++++++------------ osm_ro/vim_thread.py | 214 +++++++++++++++++++++++-------------- 5 files changed, 354 insertions(+), 273 deletions(-) diff --git a/openmanod b/openmanod index 313b8390..02428d70 100755 --- a/openmanod +++ b/openmanod @@ -48,7 +48,7 @@ import osm_ro __author__ = "Alfonso Tierno, Gerardo Garcia, Pablo Montes" __date__ = "$26-aug-2014 11:09:29$" -__version__ = "0.5.40-r550" +__version__ = "0.5.41-r551" version_date = "Nov 2017" database_version = 27 # expected database schema version diff --git a/osm_ro/db_base.py b/osm_ro/db_base.py index c8e5eb16..5bdd02ad 100644 --- a/osm_ro/db_base.py +++ b/osm_ro/db_base.py @@ -290,34 +290,58 @@ class db_base(): else: return str(data[0]) + '=' + json.dumps(str(data[1])) - def __tuple2db_format_where(self, data): - '''Compose the needed text for a SQL WHERE, parameter 'data' is a pair tuple (A,B), - and it returns the text 'A="B"', where A is a field of a table and B is the value - If B is None it returns the 'A is Null' text, without surrounding Null by quotes - If B is not None it returns the text "A='B'" or 'A="B"' where B is surrounded by quotes, - and it ensures internal quotes of B are escaped. - ''' - if data[1]==None: - return str(data[0]) + " is Null" - elif isinstance(data[1], str): - return str(data[0]) + '=' + json.dumps(data[1]) - else: - return str(data[0]) + '=' + json.dumps(str(data[1])) + def __create_where(self, data, use_or=None): + """ + Compose the needed text for a SQL WHERE, parameter 'data' can be a dict or a list of dict. By default lists are + concatenated with OR and dict with AND, unless parameter 'use_or' indicates other thing. + If a dict it will generate 'key1="value1" AND key2="value2" AND ...'. + If value is None, it will produce 'key is null' + If value is a list or tuple, it will produce 'key="value[0]" OR key="value[1]" OR ...' + keys can be suffixed by >,<,<>,>=,<= so that this is used to compare key and value instead of "=" + The special keys "OR", "AND" with a dict value is used to create a nested WHERE + If a list, each item will be a dictionary that will be concatenated with OR by default + :param data: dict or list of dicts + :param use_or: Can be None (use default behaviour), True (use OR) or False (use AND) + :return: a string with the content to send to mysql + """ + cmd = [] + if isinstance(data, dict): + for k, v in data.items(): + if k == "OR": + cmd.append("(" + self.__create_where(v, use_or=True) + ")") + continue + elif k == "AND": + cmd.append("(" + self.__create_where(v, use_or=False) + ")") + continue - def __tuple2db_format_where_not(self, data): - '''Compose the needed text for a SQL WHERE(not). parameter 'data' is a pair tuple (A,B), - and it returns the text 'A<>"B"', where A is a field of a table and B is the value - If B is None it returns the 'A is not Null' text, without surrounding Null by quotes - If B is not None it returns the text "A<>'B'" or 'A<>"B"' where B is surrounded by quotes, - and it ensures internal quotes of B are escaped. - ''' - if data[1]==None: - return str(data[0]) + " is not Null" - elif isinstance(data[1], str): - return str(data[0]) + '<>' + json.dumps(data[1]) + if k.endswith(">") or k.endswith("<") or k.endswith("="): + pass + else: + k += "=" + + if v is None: + cmd.append(k.replace("=", " is").replace("<>", " is not") + " Null") + elif isinstance(v, (tuple, list)): + cmd2 = [] + for v2 in v: + if v2 is None: + cmd2.append(k.replace("=", " is").replace("<>", " is not") + " Null") + else: + cmd2.append(k + json.dumps(str(v2))) + cmd.append("(" + " OR ".join(cmd2) + ")") + else: + cmd.append(k + json.dumps(str(v))) + elif isinstance(data, (tuple, list)): + if use_or is None: + use_or = True + for k in data: + cmd.append("(" + self.__create_where(k) + ")") else: - return str(data[0]) + '<>' + json.dumps(str(data[1])) - + raise db_base_Exception("invalid WHERE clause at '{}'".format(data)) + if use_or: + return " OR ".join(cmd) + return " AND ".join(cmd) + def __remove_quotes(self, data): '''remove single quotes ' of any string content of data dictionary''' for k,v in data.items(): @@ -326,19 +350,23 @@ class db_base(): data[k] = data[k].replace("'","_") def _update_rows(self, table, UPDATE, WHERE, modified_time=0): - ''' Update one or several rows into a table. - Atributes - UPDATE: dictionary with the key: value to change - table: table where to update - WHERE: dictionary of elements to update - Return: the number of updated rows, exception if error - ''' - #gettting uuid + """ Update one or several rows of a table. + :param UPDATE: dictionary with the changes. dict keys are database columns that will be set with the dict values + :param table: database table to update + :param WHERE: dict or list of dicts to compose the SQL WHERE clause. + If a dict it will generate 'key1="value1" AND key2="value2" AND ...'. + If value is None, it will produce 'key is null' + If value is a list or tuple, it will produce 'key="value[0]" OR key="value[1]" OR ...' + keys can be suffixed by >,<,<>,>=,<= so that this is used to compare key and value instead of "=" + The special keys "OR", "AND" with a dict value is used to create a nested WHERE + If a list, each item will be a dictionary that will be concatenated with OR + :return: the number of updated rows, raises exception upon error + """ + # gettting uuid values = ",".join(map(self.__tuple2db_format_set, UPDATE.iteritems() )) if modified_time: values += ",modified_at={:f}".format(modified_time) - cmd= "UPDATE " + table +" SET " + values +\ - " WHERE " + " and ".join(map(self.__tuple2db_format_where, WHERE.iteritems() )) + cmd= "UPDATE " + table + " SET " + values + " WHERE " + self.__create_where(WHERE) self.logger.debug(cmd) self.cur.execute(cmd) return self.cur.rowcount @@ -444,13 +472,19 @@ class db_base(): tries -= 1 def update_rows(self, table, UPDATE, WHERE, modified_time=0): - ''' Update one or several rows into a table. - Atributes - UPDATE: dictionary with the key: value to change - table: table where to update - WHERE: dictionary of elements to update - Return: (result, descriptive text) where result indicates the number of updated files - ''' + """ Update one or several rows of a table. + :param UPDATE: dictionary with the changes. dict keys are database columns that will be set with the dict values + :param table: database table to update + :param WHERE: dict or list of dicts to compose the SQL WHERE clause. + If a dict it will generate 'key1="value1" AND key2="value2" AND ...'. + If value is None, it will produce 'key is null' + If value is a list or tuple, it will produce 'key="value[0]" OR key="value[1]" OR ...' + keys can be suffixed by >,<,<>,>=,<= so that this is used to compare key and value instead of "=" + The special keys "OR", "AND" with a dict value is used to create a nested WHERE + If a list, each item will be a dictionary that will be concatenated with OR + :param modified_time: Can contain the time to be set to the table row + :return: the number of updated rows, raises exception upon error + """ if table in self.tables_with_created_field and modified_time==0: modified_time=time.time() tries = 2 @@ -487,31 +521,24 @@ class db_base(): tries -= 1 def delete_row(self, **sql_dict): - ''' Deletes rows from a table. - Attribute sql_dir: dictionary with the following key: value - 'FROM': string of table name (Mandatory) - 'WHERE': dict of key:values, translated to key=value AND ... (Optional) - 'WHERE_NOT': dict of key:values, translated to key<>value AND ... (Optional) - if value is None, it is translated to key is not null - 'LIMIT': limit of number of rows (Optional) - Return: the number of deleted or exception if error - ''' - #print sql_dict - from_ = "FROM " + str(sql_dict['FROM']) - #print 'from_', from_ - if 'WHERE' in sql_dict and len(sql_dict['WHERE']) > 0: - w=sql_dict['WHERE'] - where_ = "WHERE " + " AND ".join(map(self.__tuple2db_format_where, w.iteritems())) - else: where_ = "" - if 'WHERE_NOT' in sql_dict and len(sql_dict['WHERE_NOT']) > 0: - w=sql_dict['WHERE_NOT'] - where_2 = " AND ".join(map(self.__tuple2db_format_where_not, w.iteritems())) - if len(where_)==0: where_ = "WHERE " + where_2 - else: where_ = where_ + " AND " + where_2 - #print 'where_', where_ - limit_ = "LIMIT " + str(sql_dict['LIMIT']) if 'LIMIT' in sql_dict else "" - #print 'limit_', limit_ - cmd = " ".join( ("DELETE", from_, where_, limit_) ) + """ Deletes rows from a table. + :param UPDATE: dictionary with the changes. dict keys are database columns that will be set with the dict values + :param FROM: string with table name (Mandatory) + :param WHERE: dict or list of dicts to compose the SQL WHERE clause. (Optional) + If a dict it will generate 'key1="value1" AND key2="value2" AND ...'. + If value is None, it will produce 'key is null' + If value is a list or tuple, it will produce 'key="value[0]" OR key="value[1]" OR ...' + keys can be suffixed by >,<,<>,>=,<= so that this is used to compare key and value instead of "=" + The special keys "OR", "AND" with a dict value is used to create a nested WHERE + If a list, each item will be a dictionary that will be concatenated with OR + :return: the number of deleted rows, raises exception upon error + """ + # print sql_dict + cmd = "DELETE FROM " + str(sql_dict['FROM']) + if sql_dict.get('WHERE'): + cmd += " WHERE " + self.__create_where(sql_dict['WHERE']) + if sql_dict.get('LIMIT'): + cmd += " LIMIT " + str(sql_dict['LIMIT']) tries = 2 while tries: try: @@ -542,52 +569,44 @@ class db_base(): tries -= 1 def get_rows(self, **sql_dict): - ''' Obtain rows from a table. - Attribute sql_dir: dictionary with the following key: value - 'SELECT': list or tuple of fields to retrieve) (by default all) - 'FROM': string of table name (Mandatory) - 'WHERE': dict of key:values, translated to key=value (key is null) AND ... (Optional) - 'WHERE_NOT': dict of key:values, translated to key<>value (key is not null) AND ... (Optional) - 'WHERE_OR': dict of key:values, translated to key=value OR ... (Optional) - 'WHERE_AND_OR: str 'AND' or 'OR'(by default) mark the priority to 'WHERE AND (WHERE_OR)' or (WHERE) OR WHERE_OR' (Optional) - 'LIMIT': limit of number of rows (Optional) - 'ORDER_BY': list or tuple of fields to order, add ' DESC' to each item if inverse order is required - Return: a list with dictionaries at each row - ''' - #print sql_dict - select_= "SELECT " + ("*" if 'SELECT' not in sql_dict else ",".join(map(str,sql_dict['SELECT'])) ) - #print 'select_', select_ - from_ = "FROM " + str(sql_dict['FROM']) - #print 'from_', from_ - where_and = "" - where_or = "" - w=sql_dict.get('WHERE') - if w: - where_and = " AND ".join(map(self.__tuple2db_format_where, w.iteritems() )) - w=sql_dict.get('WHERE_NOT') - if w: - if where_and: where_and += " AND " - where_and += " AND ".join(map(self.__tuple2db_format_where_not, w.iteritems() ) ) - w=sql_dict.get('WHERE_OR') - if w: - where_or = " OR ".join(map(self.__tuple2db_format_where, w.iteritems() )) - if where_and and where_or: - if sql_dict.get("WHERE_AND_OR") == "AND": - where_ = "WHERE " + where_and + " AND (" + where_or + ")" + """ Obtain rows from a table. + :param SELECT: list or tuple of fields to retrieve) (by default all) + :param FROM: string with table name (Mandatory) + :param WHERE: dict or list of dicts to compose the SQL WHERE clause. (Optional) + If a dict it will generate 'key1="value1" AND key2="value2" AND ...'. + If value is None, it will produce 'key is null' + If value is a list or tuple, it will produce 'key="value[0]" OR key="value[1]" OR ...' + keys can be suffixed by >,<,<>,>=,<= so that this is used to compare key and value instead of "=" + The special keys "OR", "AND" with a dict value is used to create a nested WHERE + If a list, each item will be a dictionary that will be concatenated with OR + :param LIMIT: limit the number of obtianied entries (Optional) + :param ORDER_BY: list or tuple of fields to order, add ' DESC' to each item if inverse order is required + :return: a list with dictionaries at each row, raises exception upon error + """ + # print sql_dict + cmd = "SELECT " + if 'SELECT' in sql_dict: + if isinstance(sql_dict['SELECT'], (tuple, list)): + cmd += ",".join(map(str, sql_dict['SELECT'])) else: - where_ = "WHERE (" + where_and + ") OR " + where_or - elif where_and and not where_or: - where_ = "WHERE " + where_and - elif not where_and and where_or: - where_ = "WHERE " + where_or + cmd += sql_dict['SELECT'] else: - where_ = "" - #print 'where_', where_ - limit_ = "LIMIT " + str(sql_dict['LIMIT']) if 'LIMIT' in sql_dict else "" - order_ = "ORDER BY " + ",".join(map(str,sql_dict['ORDER_BY'])) if 'ORDER_BY' in sql_dict else "" - - #print 'limit_', limit_ - cmd = " ".join( (select_, from_, where_, limit_, order_) ) + cmd += "*" + + cmd += " FROM " + str(sql_dict['FROM']) + if sql_dict.get('WHERE'): + cmd += " WHERE " + self.__create_where(sql_dict['WHERE']) + + if 'ORDER_BY' in sql_dict: + cmd += " ORDER BY " + if isinstance(sql_dict['ORDER_BY'], (tuple, list)): + cmd += ",".join(map(str, sql_dict['ORDER_BY'])) + else: + cmd += str(sql_dict['ORDER_BY']) + + if 'LIMIT' in sql_dict: + cmd += " LIMIT " + str(sql_dict['LIMIT']) + tries = 2 while tries: try: @@ -617,15 +636,14 @@ class db_base(): if error_item_text==None: error_item_text = table what = 'uuid' if af.check_valid_uuid(uuid_name) else 'name' - cmd = " SELECT * FROM {} WHERE {}='{}'".format(table, what, uuid_name) + cmd = " SELECT * FROM {} WHERE {}='{}'".format(table, what, uuid_name) if WHERE_OR: - where_or = " OR ".join(map(self.__tuple2db_format_where, WHERE_OR.iteritems() )) + where_or = self.__create_where(WHERE_OR, use_or=True) if WHERE_AND_OR == "AND": cmd += " AND (" + where_or + ")" else: cmd += " OR " + where_or - tries = 2 while tries: try: diff --git a/osm_ro/httpserver.py b/osm_ro/httpserver.py index b0432678..0497bbac 100644 --- a/osm_ro/httpserver.py +++ b/osm_ro/httpserver.py @@ -1088,15 +1088,13 @@ def http_get_vnfs(tenant_id): nfvo.check_tenant(mydb, tenant_id) select_,where_,limit_ = filter_query_string(bottle.request.query, None, ('uuid', 'name', 'osm_id', 'description', 'public', "tenant_id", "created_at") ) - where_or = {} if tenant_id != "any": - where_or["tenant_id"] = tenant_id - where_or["public"] = True - vnfs = mydb.get_rows(FROM='vnfs', SELECT=select_,WHERE=where_,WHERE_OR=where_or, WHERE_AND_OR="AND",LIMIT=limit_) - #change_keys_http2db(content, http2db_vnf, reverse=True) + where_["OR"]={"tenant_id": tenant_id, "public": True} + vnfs = mydb.get_rows(FROM='vnfs', SELECT=select_, WHERE=where_, LIMIT=limit_) + # change_keys_http2db(content, http2db_vnf, reverse=True) utils.convert_str2boolean(vnfs, ('public',)) convert_datetime2str(vnfs) - data={'vnfs' : vnfs} + data={'vnfs': vnfs} return format_out(data) except bottle.HTTPError: raise @@ -1398,11 +1396,9 @@ def http_get_scenarios(tenant_id): #obtain data s,w,l=filter_query_string(bottle.request.query, None, ('uuid', 'name', 'osm_id', 'description', 'tenant_id', 'created_at', 'public')) - where_or={} if tenant_id != "any": - where_or["tenant_id"] = tenant_id - where_or["public"] = True - scenarios = mydb.get_rows(SELECT=s, WHERE=w, WHERE_OR=where_or, WHERE_AND_OR="AND", LIMIT=l, FROM='scenarios') + w["OR"] = {"tenant_id": tenant_id, "public": True} + scenarios = mydb.get_rows(SELECT=s, WHERE=w, LIMIT=l, FROM='scenarios') convert_datetime2str(scenarios) utils.convert_str2boolean(scenarios, ('public',) ) data={'scenarios':scenarios} diff --git a/osm_ro/nfvo.py b/osm_ro/nfvo.py index 69c4ac36..e5d13eba 100644 --- a/osm_ro/nfvo.py +++ b/osm_ro/nfvo.py @@ -138,9 +138,14 @@ def start_service(mydb): 'log_level_of': 'DEBUG' } try: + # starts ovim library ovim = ovim_module.ovim(ovim_configuration) ovim.start_service() + #delete old unneeded vim_actions + clean_db(mydb) + + # starts vim_threads from_= 'tenants_datacenters as td join datacenters as d on td.datacenter_id=d.uuid join '\ 'datacenter_tenants as dt on td.datacenter_tenant_id=dt.uuid' select_ = ('type', 'd.config as config', 'd.uuid as datacenter_id', 'vim_url', 'vim_url_admin', @@ -224,6 +229,37 @@ def get_version(): return ("openmanod version {} {}\n(c) Copyright Telefonica".format(global_config["version"], global_config["version_date"] )) +def clean_db(mydb): + """ + Clean unused or old entries at database to avoid unlimited growing + :param mydb: database connector + :return: None + """ + # get and delete unused vim_actions: all elements deleted, one week before, instance not present + now = t.time()-3600*24*7 + instance_action_id = None + nb_deleted = 0 + while True: + actions_to_delete = mydb.get_rows( + SELECT=("item", "item_id", "instance_action_id"), + FROM="vim_actions as va join instance_actions as ia on va.instance_action_id=ia.uuid " + "left join instance_scenarios as i on ia.instance_id=i.uuid", + WHERE={"va.action": "DELETE", "va.modified_at<": now, "i.uuid": None, + "va.status": ("DONE", "SUPERSEDED")}, + LIMIT=100 + ) + for to_delete in actions_to_delete: + mydb.delete_row(FROM="vim_actions", WHERE=to_delete) + if instance_action_id != to_delete["instance_action_id"]: + instance_action_id = to_delete["instance_action_id"] + mydb.delete_row(FROM="instance_actions", WHERE={"uuid": instance_action_id}) + nb_deleted += len(actions_to_delete) + if len(actions_to_delete) < 100: + break + if nb_deleted: + logger.debug("Removed {} unused vim_actions".format(nb_deleted)) + + def get_flavorlist(mydb, vnf_id, nfvo_tenant=None): '''Obtain flavorList @@ -1474,8 +1510,7 @@ def get_vnf_id(mydb, tenant_id, vnf_id): content = mydb.get_rows(FROM='vnfs join vms on vnfs.uuid=vms.vnf_id join interfaces on vms.uuid=interfaces.vm_id',\ SELECT=('interfaces.uuid as uuid','interfaces.external_name as external_name', 'vms.name as vm_name', 'interfaces.vm_id as vm_id', \ 'interfaces.internal_name as internal_name', 'interfaces.type as type', 'interfaces.vpci as vpci','interfaces.bw as bw'),\ - WHERE={'vnfs.uuid': vnf_id}, - WHERE_NOT={'interfaces.external_name': None} ) + WHERE={'vnfs.uuid': vnf_id, 'interfaces.external_name<>': None} ) #print content data['vnf']['external-connections'] = content @@ -1662,8 +1697,7 @@ def new_scenario(mydb, tenant_id, topo): #1.2: Check that VNF are present at database table vnfs. Insert uuid, description and external interfaces for name,vnf in vnfs.items(): - where={} - where_or={"tenant_id": tenant_id, 'public': "true"} + where = {"OR": {"tenant_id": tenant_id, 'public': "true"}} error_text = "" error_pos = "'topology':'nodes':'" + name + "'" if 'vnf_id' in vnf: @@ -1672,14 +1706,12 @@ def new_scenario(mydb, tenant_id, topo): if 'VNF model' in vnf: error_text += " 'VNF model' " + vnf['VNF model'] where['name'] = vnf['VNF model'] - if len(where) == 0: + if len(where) == 1: raise NfvoException("Descriptor need a 'vnf_id' or 'VNF model' field at " + error_pos, HTTP_Bad_Request) vnf_db = mydb.get_rows(SELECT=('uuid','name','description'), FROM='vnfs', - WHERE=where, - WHERE_OR=where_or, - WHERE_AND_OR="AND") + WHERE=where) if len(vnf_db)==0: raise NfvoException("unknown" + error_text + " at " + error_pos, HTTP_Not_Found) elif len(vnf_db)>1: @@ -1689,7 +1721,7 @@ def new_scenario(mydb, tenant_id, topo): #get external interfaces ext_ifaces = mydb.get_rows(SELECT=('external_name as name','i.uuid as iface_uuid', 'i.type as type'), FROM='vnfs join vms on vnfs.uuid=vms.vnf_id join interfaces as i on vms.uuid=i.vm_id', - WHERE={'vnfs.uuid':vnf['uuid']}, WHERE_NOT={'external_name':None} ) + WHERE={'vnfs.uuid':vnf['uuid'], 'external_name<>': None} ) for ext_iface in ext_ifaces: vnf['ifaces'][ ext_iface['name'] ] = {'uuid':ext_iface['iface_uuid'], 'type':ext_iface['type']} @@ -1928,8 +1960,7 @@ def new_scenario_v02(mydb, tenant_id, scenario_dict, version): # 1: Check that VNF are present at database table vnfs and update content into scenario dict for name,vnf in scenario["vnfs"].iteritems(): - where={} - where_or={"tenant_id": tenant_id, 'public': "true"} + where = {"OR": {"tenant_id": tenant_id, 'public': "true"}} error_text = "" error_pos = "'scenario':'vnfs':'" + name + "'" if 'vnf_id' in vnf: @@ -1938,13 +1969,11 @@ def new_scenario_v02(mydb, tenant_id, scenario_dict, version): if 'vnf_name' in vnf: error_text += " 'vnf_name' " + vnf['vnf_name'] where['name'] = vnf['vnf_name'] - if len(where) == 0: + if len(where) == 1: raise NfvoException("Needed a 'vnf_id' or 'vnf_name' at " + error_pos, HTTP_Bad_Request) vnf_db = mydb.get_rows(SELECT=('uuid', 'name', 'description'), FROM='vnfs', - WHERE=where, - WHERE_OR=where_or, - WHERE_AND_OR="AND") + WHERE=where) if len(vnf_db) == 0: raise NfvoException("Unknown" + error_text + " at " + error_pos, HTTP_Not_Found) elif len(vnf_db) > 1: @@ -1955,7 +1984,7 @@ def new_scenario_v02(mydb, tenant_id, scenario_dict, version): # get external interfaces ext_ifaces = mydb.get_rows(SELECT=('external_name as name', 'i.uuid as iface_uuid', 'i.type as type'), FROM='vnfs join vms on vnfs.uuid=vms.vnf_id join interfaces as i on vms.uuid=i.vm_id', - WHERE={'vnfs.uuid':vnf['uuid']}, WHERE_NOT={'external_name': None} ) + WHERE={'vnfs.uuid':vnf['uuid'], 'external_name<>': None} ) for ext_iface in ext_ifaces: vnf['ifaces'][ ext_iface['name'] ] = {'uuid':ext_iface['iface_uuid'], 'type': ext_iface['type']} # TODO? get internal-connections from db.nets and their profiles, and update scenario[vnfs][internal-connections] accordingly @@ -3224,6 +3253,7 @@ def delete_instance(mydb, tenant_id, instance_id): myvims = {} myvim_threads = {} vimthread_affected = {} + net2vm_dependencies = {} task_index = 0 instance_action_id = get_task_id() @@ -3262,36 +3292,29 @@ def delete_instance(mydb, tenant_id, instance_id): if not myvim: error_msg += "\n VM id={} cannot be deleted because datacenter={} not found".format(vm['vim_vm_id'], sce_vnf["datacenter_id"]) continue - try: - db_vim_action = { - "instance_action_id": instance_action_id, - "task_index": task_index, - "datacenter_vim_id": sce_vnf["datacenter_tenant_id"], - "action": "DELETE", - "status": "SCHEDULED", - "item": "instance_vms", - "item_id": vm["uuid"], - "extra": yaml.safe_dump({"params": vm["interfaces"]}, - default_flow_style=True, width=256) - } - task_index += 1 - db_vim_actions.append(db_vim_action) - - except vimconn.vimconnNotFoundException as e: - error_msg+="\n VM VIM_id={} not found at datacenter={}".format(vm['vim_vm_id'], sce_vnf["datacenter_id"]) - logger.warn("VM instance '%s'uuid '%s', VIM id '%s', from VNF_id '%s' not found", - vm['name'], vm['uuid'], vm['vim_vm_id'], sce_vnf['vnf_id']) - except vimconn.vimconnException as e: - error_msg+="\n VM VIM_id={} at datacenter={} Error: {} {}".format(vm['vim_vm_id'], sce_vnf["datacenter_id"], e.http_code, str(e)) - logger.error("Error %d deleting VM instance '%s'uuid '%s', VIM_id '%s', from VNF_id '%s': %s", - e.http_code, vm['name'], vm['uuid'], vm['vim_vm_id'], sce_vnf['vnf_id'], str(e)) + db_vim_action = { + "instance_action_id": instance_action_id, + "task_index": task_index, + "datacenter_vim_id": sce_vnf["datacenter_tenant_id"], + "action": "DELETE", + "status": "SCHEDULED", + "item": "instance_vms", + "item_id": vm["uuid"], + "extra": yaml.safe_dump({"params": vm["interfaces"]}, + default_flow_style=True, width=256) + } + db_vim_actions.append(db_vim_action) + for interface in vm["interfaces"]: + if not interface.get("instance_net_id"): + continue + if interface["instance_net_id"] not in net2vm_dependencies: + net2vm_dependencies[interface["instance_net_id"]] = [] + net2vm_dependencies[interface["instance_net_id"]].append(task_index) + task_index += 1 # 2.2 deleting NETS # net_fail_list=[] for net in instanceDict['nets']: - # TODO if not net['created']: - # TODO continue #skip not created nets - vimthread_affected[net["datacenter_tenant_id"]] = None datacenter_key = (net["datacenter_id"], net["datacenter_tenant_id"]) if datacenter_key not in myvims: @@ -3314,31 +3337,21 @@ def delete_instance(mydb, tenant_id, instance_id): if not myvim: error_msg += "\n Net VIM_id={} cannot be deleted because datacenter={} not found".format(net['vim_net_id'], net["datacenter_id"]) continue - try: - db_vim_action = { - "instance_action_id": instance_action_id, - "task_index": task_index, - "datacenter_vim_id": net["datacenter_tenant_id"], - "action": "DELETE", - "status": "SCHEDULED", - "item": "instance_nets", - "item_id": net["uuid"], - "extra": yaml.safe_dump({"params": (net['vim_net_id'], net['sdn_net_id'])}, - default_flow_style=True, width=256) - } - task_index += 1 - db_vim_actions.append(db_vim_action) - - except vimconn.vimconnNotFoundException as e: - error_msg += "\n NET VIM_id={} not found at datacenter={}".format(net['vim_net_id'], net["datacenter_id"]) - logger.warn("NET '%s', VIM_id '%s', from VNF_net_id '%s' not found", - net['uuid'], net['vim_net_id'], str(net['vnf_net_id'])) - except vimconn.vimconnException as e: - error_msg += "\n NET VIM_id={} at datacenter={} Error: {} {}".format(net['vim_net_id'], - net["datacenter_id"], - e.http_code, str(e)) - logger.error("Error %d deleting NET '%s', VIM_id '%s', from VNF_net_id '%s': %s", - e.http_code, net['uuid'], net['vim_net_id'], str(net['vnf_net_id']), str(e)) + extra = {"params": (net['vim_net_id'], net['sdn_net_id'])} + if net2vm_dependencies.get(net["uuid"]): + extra["depends_on"] = net2vm_dependencies[net["uuid"]] + db_vim_action = { + "instance_action_id": instance_action_id, + "task_index": task_index, + "datacenter_vim_id": net["datacenter_tenant_id"], + "action": "DELETE", + "status": "SCHEDULED", + "item": "instance_nets", + "item_id": net["uuid"], + "extra": yaml.safe_dump(extra, default_flow_style=True, width=256) + } + task_index += 1 + db_vim_actions.append(db_vim_action) db_instance_action["number_tasks"] = task_index db_tables = [ diff --git a/osm_ro/vim_thread.py b/osm_ro/vim_thread.py index 2c30fb90..69e7f8c9 100644 --- a/osm_ro/vim_thread.py +++ b/osm_ro/vim_thread.py @@ -24,19 +24,19 @@ """" This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM. The tasks are stored at database in table vim_actions -The task content are (M: stored at memory, D: stored at database): +The task content is (M: stored at memory, D: stored at database): MD instance_action_id: reference a global action over an instance-scenario: database instance_actions - MD task_index: index number of the task. This with the previous are a key + MD task_index: index number of the task. This together with the previous forms a unique key identifier MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread MD vim_id: id of the vm,net,etc at VIM MD action: CREATE, DELETE, FIND MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images - MD item_id: uuid of the referenced entry in the preious table + MD item_id: uuid of the referenced entry in the previous table MD status: SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED MD extra: text with yaml format at database, dict at memory with: params: list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken from other related tasks find: (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains the FIND params - depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm depends on a net + depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation depends on a net creation sdn_net_id: used for net. tries: interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database @@ -68,9 +68,6 @@ from lib_osm_openvim.ovim import ovimException __author__ = "Alfonso Tierno, Pablo Montes" __date__ = "$28-Sep-2017 12:07:15$" -# from logging import Logger -# import auxiliary_functions as af - def is_task_id(task_id): return task_id.startswith("TASK-") @@ -141,34 +138,57 @@ class vim_thread(threading.Thread): task_list = [] old_action_key = None - with self.db_lock: - vim_actions = self.db.get_rows(FROM="vim_actions", - WHERE={"datacenter_vim_id": self.datacenter_tenant_id }, - ORDER_BY=("item", "item_id", "created_at",)) - for task in vim_actions: - item = task["item"] - item_id = task["item_id"] - action_key = item + item_id - if old_action_key != action_key: - if not action_completed and task_list: - # This will fill needed task parameters into memory, and insert the task if needed in - # self.pending_tasks or self.refresh_tasks - self._insert_pending_tasks(task_list) - task_list = [] - old_action_key = action_key - action_completed = False - elif action_completed: - continue + old_item_id = "" + old_item = "" + old_created_at = 0.0 + database_limit = 200 + while True: + # get 200 (database_limit) entries each time + with self.db_lock: + vim_actions = self.db.get_rows(FROM="vim_actions", + WHERE={"datacenter_vim_id": self.datacenter_tenant_id, + "item_id>=": old_item_id}, + ORDER_BY=("item_id", "item", "created_at",), + LIMIT=database_limit) + for task in vim_actions: + item = task["item"] + item_id = task["item_id"] + + # skip the first entries that are already processed in the previous pool of 200 + if old_item_id: + if item_id == old_item_id and item == old_item and task["created_at"] == old_created_at: + old_item_id = False # next one will be a new un-processed task + continue - if task["status"] == "SCHEDULED" or task["action"] == "CREATE" or task["action"] == "FIND": - task_list.append(task) - elif task["action"] == "DELETE": - # action completed because deleted and status is not SCHEDULED. Not needed anything - action_completed = True + action_key = item + item_id + if old_action_key != action_key: + if not action_completed and task_list: + # This will fill needed task parameters into memory, and insert the task if needed in + # self.pending_tasks or self.refresh_tasks + self._insert_pending_tasks(task_list) + task_list = [] + old_action_key = action_key + action_completed = False + elif action_completed: + continue + if task["status"] == "SCHEDULED" or task["action"] == "CREATE" or task["action"] == "FIND": + task_list.append(task) + elif task["action"] == "DELETE": + # action completed because deleted and status is not SCHEDULED. Not needed anything + action_completed = True + if len(vim_actions) == database_limit: + # update variables for get the next database iteration + old_item_id = item_id + old_item = item + old_created_at = task["created_at"] + else: + break # Last actions group need to be inserted too if not action_completed and task_list: self._insert_pending_tasks(task_list) + self.logger.debug("reloaded vim actions pending:{} refresh:{}".format( + len(self.pending_tasks), len(self.refresh_tasks))) def _refres_elements(self): """Call VIM to get VMs and networks status until 10 elements""" @@ -347,13 +367,13 @@ class vim_thread(threading.Thread): if vim_info.get("error_msg"): vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"]) if task_vim_status != vim_info["status"] or task_error_msg != vim_info.get("error_msg") or \ - task_vim_info != vim_info["vim_info"]: + task_vim_info != vim_info.get("vim_info"): task["extra"]["vim_status"] = vim_info["status"] task["error_msg"] = vim_info.get("error_msg") - task["vim_info"] = vim_info["vim_info"] + task["vim_info"] = vim_info.get("vim_info") temp_dict = {"status": vim_info["status"], "error_msg": vim_info.get("error_msg"), - "vim_info": vim_info["vim_info"]} + "vim_info": vim_info.get("vim_info")} with self.db_lock: self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]}) self.db.update_rows( @@ -416,45 +436,79 @@ class vim_thread(threading.Thread): while self.pending_tasks: task = self.pending_tasks.pop(0) nb_processed += 1 - if task["status"] == "SUPERSEDED": - # not needed to do anything but update database with the new status - result = True - database_update = None - elif not self.vim: - task["status"] == "ERROR" - task["error_msg"] = self.error_status - result = False - database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]} - elif task["item"] == 'instance_vms': - if task["action"] == "CREATE": - result, database_update = self.new_vm(task) - nb_created += 1 - elif task["action"] == "DELETE": - result, database_update = self.del_vm(task) - else: - raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) - elif task["item"] == 'instance_nets': - if task["action"] == "CREATE": - result, database_update = self.new_net(task) - nb_created += 1 - elif task["action"] == "DELETE": - result, database_update = self.del_net(task) - elif task["action"] == "FIND": - result, database_update = self.get_net(task) + try: + # check if tasks that this depends on have been completed + dependency_not_completed = False + for task_index in task["extra"].get("depends_on", ()): + task_dependency = task["depends"].get("TASK-" + str(task_index)) + if not task_dependency: + task_dependency = self._look_for_task(task["instance_action_id"], "TASK-" + str(task_index)) + if not task_dependency: + raise VimThreadException( + "Cannot get depending net task trying to get depending task {}.{}".format( + task["instance_action_id"], task_index)) + # task["depends"]["TASK-" + str(task_index)] = task_dependency #it references another object,so database must be look again + if task_dependency["status"] == "SCHEDULED": + dependency_not_completed = True + break + elif task_dependency["status"] == "FAILED": + raise VimThreadException( + "Cannot {} {}, task {}.{} {} because depends on failed {} {}, task{}.{}" + "task {}.{}".format(task["action"], task["item"], + task["instance_action_id"], task["task_index"], + task_dependency["instance_action_id"], task_dependency["task_index"], + task_dependency["action"], task_dependency["item"] )) + if dependency_not_completed: + # Move this task to the end. + task["extra"]["tries"] = task["extra"].get("tries", 0) + 1 + if task["extra"]["tries"] <= 3: + self.pending_tasks.append(task) + continue + else: + raise VimThreadException( + "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, " + "(task {}.{})".format(task["action"], task["item"], + task["instance_action_id"], task["task_index"], + task_dependency["instance_action_id"], task_dependency["task_index"], + task_dependency["action"], task_dependency["item"])) + + if task["status"] == "SUPERSEDED": + # not needed to do anything but update database with the new status + result = True + database_update = None + elif not self.vim: + task["status"] == "ERROR" + task["error_msg"] = self.error_status + result = False + database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]} + elif task["item"] == 'instance_vms': + if task["action"] == "CREATE": + result, database_update = self.new_vm(task) + nb_created += 1 + elif task["action"] == "DELETE": + result, database_update = self.del_vm(task) + else: + raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) + elif task["item"] == 'instance_nets': + if task["action"] == "CREATE": + result, database_update = self.new_net(task) + nb_created += 1 + elif task["action"] == "DELETE": + result, database_update = self.del_net(task) + elif task["action"] == "FIND": + result, database_update = self.get_net(task) + else: + raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) else: - raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) - else: - raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"])) - # TODO + raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"])) + # TODO + except VimThreadException as e: + result = False + task["error_msg"] = str(e) + task["status"] = "FAILED" + database_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": task["error_msg"]} - if task["status"] == "SCHEDULED": - # This is because a depend task is not completed. Moved to the end. NOT USED YET - if task["extra"].get("tries", 0) > 3: - task["status"] == "FAILED" - else: - task["extra"]["tries"] = task["extra"].get("tries", 0) + 1 - self.pending_tasks.append(task) - elif task["action"] == "DELETE": + if task["action"] == "DELETE": action_key = task["item"] + task["item_id"] del self.grouped_tasks[action_key] elif task["action"] in ("CREATE", "FIND") and task["status"] in ("DONE", "BUILD"): @@ -469,7 +523,7 @@ class vim_thread(threading.Thread): with self.db_lock: self.db.update_rows( table="vim_actions", - UPDATE={"status": task["status"], "vim_id": task["vim_id"], "modified_at": now, + UPDATE={"status": task["status"], "vim_id": task.get("vim_id"), "modified_at": now, "error_msg": task["error_msg"], "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)}, WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]}) @@ -641,18 +695,18 @@ class vim_thread(threading.Thread): error_text = "" for net in net_list: if "net_id" in net and is_task_id(net["net_id"]): # change task_id into network_id - if net["net_id"] in depends: - task_net = depends[net["net_id"]] - else: - task_net = self._look_for_task(task["instance_action_id"], net["net_id"]) - if not task_net: - raise VimThreadException( - "Error trying to get depending task from task_index={}".format(net["net_id"])) - network_id = task_net.get("vim_id") + task_dependency = task["depends"].get(net["net_id"]) + if not task_dependency: + task_dependency = self._look_for_task(task["instance_action_id"], net["net_id"]) + if not task_dependency: + raise VimThreadException( + "Cannot get depending net task trying to get depending task {}.{}".format( + task["instance_action_id"], net["net_id"])) + network_id = task_dependency.get("vim_id") if not network_id: raise VimThreadException( "Cannot create VM because depends on a network not created or found: " + - str(task_net["error_msg"])) + str(depends[net["net_id"]]["error_msg"])) net["net_id"] = network_id vim_vm_id, created_items = self.vim.new_vminstance(*params) -- 2.25.1