Avoid database growth by cleaning old vim_actions 29/5629/5
authortierno <alfonso.tiernosepulveda@telefonica.com>
Tue, 24 Oct 2017 05:48:24 +0000 (07:48 +0200)
committertierno <alfonso.tiernosepulveda@telefonica.com>
Tue, 14 Nov 2017 12:42:13 +0000 (13:42 +0100)
Change-Id: Ib3ad233c1a85e72fe24229282b59a88bc536c692
Signed-off-by: tierno <alfonso.tiernosepulveda@telefonica.com>
openmanod
osm_ro/db_base.py
osm_ro/httpserver.py
osm_ro/nfvo.py
osm_ro/vim_thread.py

index 313b839..02428d7 100755 (executable)
--- a/openmanod
+++ b/openmanod
@@ -48,7 +48,7 @@ import osm_ro
 
 __author__ = "Alfonso Tierno, Gerardo Garcia, Pablo Montes"
 __date__ = "$26-aug-2014 11:09:29$"
-__version__ = "0.5.40-r550"
+__version__ = "0.5.41-r551"
 version_date = "Nov 2017"
 database_version = 27      # expected database schema version
 
index c8e5eb1..5bdd02a 100644 (file)
@@ -290,34 +290,58 @@ class db_base():
         else:
             return str(data[0]) + '=' + json.dumps(str(data[1]))
     
-    def __tuple2db_format_where(self, data):
-        '''Compose the needed text for a SQL WHERE, parameter 'data' is a pair tuple (A,B),
-        and it returns the text 'A="B"', where A is a field of a table and B is the value 
-        If B is None it returns the 'A is Null' text, without surrounding Null by quotes
-        If B is not None it returns the text "A='B'" or 'A="B"' where B is surrounded by quotes,
-        and it ensures internal quotes of B are escaped.
-        '''
-        if data[1]==None:
-            return str(data[0]) + " is Null"
-        elif isinstance(data[1], str):
-            return str(data[0]) + '=' + json.dumps(data[1])
-        else:
-            return str(data[0]) + '=' + json.dumps(str(data[1]))
+    def __create_where(self, data, use_or=None):
+        """
+        Compose the needed text for a SQL WHERE, parameter 'data' can be a dict or a list of dict. By default lists are
+        concatenated with OR and dict with AND, unless parameter 'use_or' indicates other thing.
+        If a dict it will generate 'key1="value1" AND key2="value2" AND ...'.
+            If value is None, it will produce 'key is null'
+            If value is a list or tuple, it will produce 'key="value[0]" OR key="value[1]" OR ...'
+            keys can be suffixed by >,<,<>,>=,<= so that this is used to compare key and value instead of "="
+        The special keys "OR", "AND" with a dict value is used to create a nested WHERE
+        If a list, each item will be a dictionary that will be concatenated with OR by default
+        :param data: dict or list of dicts
+        :param use_or: Can be None (use default behaviour), True (use OR) or False (use AND)
+        :return: a string with the content to send to mysql
+        """
+        cmd = []
+        if isinstance(data, dict):
+            for k, v in data.items():
+                if k == "OR":
+                    cmd.append("(" + self.__create_where(v, use_or=True) + ")")
+                    continue
+                elif k == "AND":
+                    cmd.append("(" + self.__create_where(v, use_or=False) + ")")
+                    continue
 
-    def __tuple2db_format_where_not(self, data):
-        '''Compose the needed text for a SQL WHERE(not). parameter 'data' is a pair tuple (A,B),
-        and it returns the text 'A<>"B"', where A is a field of a table and B is the value 
-        If B is None it returns the 'A is not Null' text, without surrounding Null by quotes
-        If B is not None it returns the text "A<>'B'" or 'A<>"B"' where B is surrounded by quotes,
-        and it ensures internal quotes of B are escaped.
-        '''
-        if data[1]==None:
-            return str(data[0]) + " is not Null"
-        elif isinstance(data[1], str):
-            return str(data[0]) + '<>' + json.dumps(data[1])
+                if k.endswith(">") or k.endswith("<") or k.endswith("="):
+                    pass
+                else:
+                    k += "="
+
+                if v is None:
+                    cmd.append(k.replace("=", " is").replace("<>", " is not") + " Null")
+                elif isinstance(v, (tuple, list)):
+                    cmd2 = []
+                    for v2 in v:
+                        if v2 is None:
+                            cmd2.append(k.replace("=", " is").replace("<>", " is not") + " Null")
+                        else:
+                            cmd2.append(k + json.dumps(str(v2)))
+                    cmd.append("(" + " OR ".join(cmd2) + ")")
+                else:
+                    cmd.append(k + json.dumps(str(v)))
+        elif isinstance(data, (tuple, list)):
+            if use_or is None:
+                use_or = True
+            for k in data:
+                cmd.append("(" + self.__create_where(k) + ")")
         else:
-            return str(data[0]) + '<>' + json.dumps(str(data[1]))
-    
+            raise db_base_Exception("invalid WHERE clause at '{}'".format(data))
+        if use_or:
+            return " OR ".join(cmd)
+        return " AND ".join(cmd)
+
     def __remove_quotes(self, data):
         '''remove single quotes ' of any string content of data dictionary'''
         for k,v in data.items():
@@ -326,19 +350,23 @@ class db_base():
                     data[k] = data[k].replace("'","_")
     
     def _update_rows(self, table, UPDATE, WHERE, modified_time=0):
-        ''' Update one or several rows into a table.
-        Atributes
-            UPDATE: dictionary with the key: value to change
-            table: table where to update
-            WHERE: dictionary of elements to update
-        Return: the number of updated rows, exception if error
-        '''
-                #gettting uuid 
+        """ Update one or several rows of a table.
+        :param UPDATE: dictionary with the changes. dict keys are database columns that will be set with the dict values
+        :param table: database table to update
+        :param WHERE: dict or list of dicts to compose the SQL WHERE clause.
+            If a dict it will generate 'key1="value1" AND key2="value2" AND ...'.
+                If value is None, it will produce 'key is null'
+                If value is a list or tuple, it will produce 'key="value[0]" OR key="value[1]" OR ...'
+                keys can be suffixed by >,<,<>,>=,<= so that this is used to compare key and value instead of "="
+                The special keys "OR", "AND" with a dict value is used to create a nested WHERE
+            If a list, each item will be a dictionary that will be concatenated with OR
+        :return: the number of updated rows, raises exception upon error
+        """
+        # gettting uuid
         values = ",".join(map(self.__tuple2db_format_set, UPDATE.iteritems() ))
         if modified_time:
             values += ",modified_at={:f}".format(modified_time)
-        cmd= "UPDATE " + table +" SET " + values +\
-            " WHERE " + " and ".join(map(self.__tuple2db_format_where, WHERE.iteritems() ))
+        cmd= "UPDATE " + table + " SET " + values + " WHERE " + self.__create_where(WHERE)
         self.logger.debug(cmd)
         self.cur.execute(cmd) 
         return self.cur.rowcount
@@ -444,13 +472,19 @@ class db_base():
             tries -= 1
 
     def update_rows(self, table, UPDATE, WHERE, modified_time=0):
-        ''' Update one or several rows into a table.
-        Atributes
-            UPDATE: dictionary with the key: value to change
-            table: table where to update
-            WHERE: dictionary of elements to update
-        Return: (result, descriptive text) where result indicates the number of updated files
-        '''
+        """ Update one or several rows of a table.
+        :param UPDATE: dictionary with the changes. dict keys are database columns that will be set with the dict values
+        :param table: database table to update
+        :param WHERE: dict or list of dicts to compose the SQL WHERE clause.
+            If a dict it will generate 'key1="value1" AND key2="value2" AND ...'.
+                If value is None, it will produce 'key is null'
+                If value is a list or tuple, it will produce 'key="value[0]" OR key="value[1]" OR ...'
+                keys can be suffixed by >,<,<>,>=,<= so that this is used to compare key and value instead of "="
+                The special keys "OR", "AND" with a dict value is used to create a nested WHERE
+            If a list, each item will be a dictionary that will be concatenated with OR
+        :param modified_time: Can contain the time to be set to the table row
+        :return: the number of updated rows, raises exception upon error
+        """
         if table in self.tables_with_created_field and modified_time==0:
             modified_time=time.time()
         tries = 2
@@ -487,31 +521,24 @@ class db_base():
             tries -= 1
 
     def delete_row(self, **sql_dict):
-        ''' Deletes rows from a table.
-        Attribute sql_dir: dictionary with the following key: value
-            'FROM': string of table name (Mandatory)
-            'WHERE': dict of key:values, translated to key=value AND ... (Optional)
-            'WHERE_NOT': dict of key:values, translated to key<>value AND ... (Optional) 
-                if value is None, it is translated to key is not null
-            'LIMIT': limit of number of rows (Optional)
-        Return: the number of deleted or exception if error
-        '''
-        #print sql_dict
-        from_  = "FROM " + str(sql_dict['FROM'])
-        #print 'from_', from_
-        if 'WHERE' in sql_dict and len(sql_dict['WHERE']) > 0:
-            w=sql_dict['WHERE']
-            where_ = "WHERE " + " AND ".join(map(self.__tuple2db_format_where, w.iteritems())) 
-        else: where_ = ""
-        if 'WHERE_NOT' in sql_dict and len(sql_dict['WHERE_NOT']) > 0: 
-            w=sql_dict['WHERE_NOT']
-            where_2 = " AND ".join(map(self.__tuple2db_format_where_not, w.iteritems()))
-            if len(where_)==0:   where_ = "WHERE " + where_2
-            else:                where_ = where_ + " AND " + where_2
-        #print 'where_', where_
-        limit_ = "LIMIT " + str(sql_dict['LIMIT']) if 'LIMIT' in sql_dict else ""
-        #print 'limit_', limit_
-        cmd =  " ".join( ("DELETE", from_, where_, limit_) )
+        """ Deletes rows from a table.
+        :param UPDATE: dictionary with the changes. dict keys are database columns that will be set with the dict values
+        :param FROM: string with table name (Mandatory)
+        :param WHERE: dict or list of dicts to compose the SQL WHERE clause. (Optional)
+            If a dict it will generate 'key1="value1" AND key2="value2" AND ...'.
+                If value is None, it will produce 'key is null'
+                If value is a list or tuple, it will produce 'key="value[0]" OR key="value[1]" OR ...'
+                keys can be suffixed by >,<,<>,>=,<= so that this is used to compare key and value instead of "="
+                The special keys "OR", "AND" with a dict value is used to create a nested WHERE
+            If a list, each item will be a dictionary that will be concatenated with OR
+        :return: the number of deleted rows, raises exception upon error
+        """
+        # print sql_dict
+        cmd = "DELETE FROM " + str(sql_dict['FROM'])
+        if sql_dict.get('WHERE'):
+            cmd += " WHERE " + self.__create_where(sql_dict['WHERE'])
+        if sql_dict.get('LIMIT'):
+            cmd += " LIMIT " + str(sql_dict['LIMIT'])
         tries = 2
         while tries:
             try:
@@ -542,52 +569,44 @@ class db_base():
             tries -= 1
     
     def get_rows(self, **sql_dict):
-        ''' Obtain rows from a table.
-        Attribute sql_dir: dictionary with the following key: value
-            'SELECT':    list or tuple of fields to retrieve) (by default all)
-            'FROM':      string of table name (Mandatory)
-            'WHERE':     dict of key:values, translated to key=value (key is null) AND ... (Optional)
-            'WHERE_NOT': dict of key:values, translated to key<>value (key is not null) AND ... (Optional)
-            'WHERE_OR': dict of key:values, translated to key=value OR ... (Optional)
-            'WHERE_AND_OR: str 'AND' or 'OR'(by default) mark the priority to 'WHERE AND (WHERE_OR)' or (WHERE) OR WHERE_OR' (Optional)
-            'LIMIT':     limit of number of rows (Optional)
-            'ORDER_BY':  list or tuple of fields to order, add ' DESC'  to each item if inverse order is required
-        Return: a list with dictionaries at each row
-        '''
-        #print sql_dict
-        select_= "SELECT " + ("*" if 'SELECT' not in sql_dict else ",".join(map(str,sql_dict['SELECT'])) )
-        #print 'select_', select_
-        from_  = "FROM " + str(sql_dict['FROM'])
-        #print 'from_', from_
-        where_and = ""
-        where_or = ""
-        w=sql_dict.get('WHERE')
-        if w:
-            where_and = " AND ".join(map(self.__tuple2db_format_where, w.iteritems() ))
-        w=sql_dict.get('WHERE_NOT')
-        if w: 
-            if where_and: where_and += " AND "
-            where_and += " AND ".join(map(self.__tuple2db_format_where_not, w.iteritems() ) )
-        w=sql_dict.get('WHERE_OR')
-        if w:
-            where_or =  " OR ".join(map(self.__tuple2db_format_where, w.iteritems() ))
-        if where_and and where_or:
-            if sql_dict.get("WHERE_AND_OR") == "AND":
-                where_ = "WHERE " + where_and + " AND (" + where_or + ")"
+        """ Obtain rows from a table.
+        :param SELECT: list or tuple of fields to retrieve) (by default all)
+        :param FROM: string with table name (Mandatory)
+        :param WHERE: dict or list of dicts to compose the SQL WHERE clause. (Optional)
+            If a dict it will generate 'key1="value1" AND key2="value2" AND ...'.
+                If value is None, it will produce 'key is null'
+                If value is a list or tuple, it will produce 'key="value[0]" OR key="value[1]" OR ...'
+                keys can be suffixed by >,<,<>,>=,<= so that this is used to compare key and value instead of "="
+                The special keys "OR", "AND" with a dict value is used to create a nested WHERE
+            If a list, each item will be a dictionary that will be concatenated with OR
+        :param LIMIT: limit the number of obtianied entries (Optional)
+        :param ORDER_BY:  list or tuple of fields to order, add ' DESC' to each item if inverse order is required
+        :return: a list with dictionaries at each row, raises exception upon error
+        """
+        # print sql_dict
+        cmd = "SELECT "
+        if 'SELECT' in sql_dict:
+            if isinstance(sql_dict['SELECT'], (tuple, list)):
+                cmd += ",".join(map(str, sql_dict['SELECT']))
             else:
-                where_ = "WHERE (" + where_and + ") OR " + where_or
-        elif where_and and not where_or:
-            where_ = "WHERE " + where_and
-        elif not where_and and where_or:
-            where_ = "WHERE " + where_or
+                cmd += sql_dict['SELECT']
         else:
-            where_ = ""
-        #print 'where_', where_
-        limit_ = "LIMIT " + str(sql_dict['LIMIT']) if 'LIMIT' in sql_dict else ""
-        order_ = "ORDER BY " + ",".join(map(str,sql_dict['ORDER_BY'])) if 'ORDER_BY' in sql_dict else ""
-        
-        #print 'limit_', limit_
-        cmd =  " ".join( (select_, from_, where_, limit_, order_) )
+            cmd += "*"
+
+        cmd += " FROM " + str(sql_dict['FROM'])
+        if sql_dict.get('WHERE'):
+            cmd += " WHERE " + self.__create_where(sql_dict['WHERE'])
+
+        if 'ORDER_BY' in sql_dict:
+            cmd += " ORDER BY "
+            if isinstance(sql_dict['ORDER_BY'], (tuple, list)):
+                cmd += ",".join(map(str, sql_dict['ORDER_BY']))
+            else:
+                cmd += str(sql_dict['ORDER_BY'])
+
+        if 'LIMIT' in sql_dict:
+            cmd += " LIMIT " + str(sql_dict['LIMIT'])
+
         tries = 2
         while tries:
             try:
@@ -617,15 +636,14 @@ class db_base():
         if error_item_text==None:
             error_item_text = table
         what = 'uuid' if af.check_valid_uuid(uuid_name) else 'name'
-        cmd =  " SELECT * FROM {} WHERE {}='{}'".format(table, what, uuid_name)
+        cmd = " SELECT * FROM {} WHERE {}='{}'".format(table, what, uuid_name)
         if WHERE_OR:
-            where_or =  " OR ".join(map(self.__tuple2db_format_where, WHERE_OR.iteritems() ))
+            where_or = self.__create_where(WHERE_OR, use_or=True)
             if WHERE_AND_OR == "AND":
                 cmd += " AND (" + where_or + ")"
             else:
                 cmd += " OR " + where_or
 
-        
         tries = 2
         while tries:
             try:
index b043267..0497bba 100644 (file)
@@ -1088,15 +1088,13 @@ def http_get_vnfs(tenant_id):
             nfvo.check_tenant(mydb, tenant_id)
         select_,where_,limit_ = filter_query_string(bottle.request.query, None,
                 ('uuid', 'name', 'osm_id', 'description', 'public', "tenant_id", "created_at") )
-        where_or = {}
         if tenant_id != "any":
-            where_or["tenant_id"] = tenant_id
-            where_or["public"] = True
-        vnfs = mydb.get_rows(FROM='vnfs', SELECT=select_,WHERE=where_,WHERE_OR=where_or, WHERE_AND_OR="AND",LIMIT=limit_)
-        #change_keys_http2db(content, http2db_vnf, reverse=True)
+            where_["OR"]={"tenant_id": tenant_id, "public": True}
+        vnfs = mydb.get_rows(FROM='vnfs', SELECT=select_, WHERE=where_, LIMIT=limit_)
+        # change_keys_http2db(content, http2db_vnf, reverse=True)
         utils.convert_str2boolean(vnfs, ('public',))
         convert_datetime2str(vnfs)
-        data={'vnfs' : vnfs}
+        data={'vnfs': vnfs}
         return format_out(data)
     except bottle.HTTPError:
         raise
@@ -1398,11 +1396,9 @@ def http_get_scenarios(tenant_id):
         #obtain data
         s,w,l=filter_query_string(bottle.request.query, None,
                                   ('uuid', 'name', 'osm_id', 'description', 'tenant_id', 'created_at', 'public'))
-        where_or={}
         if tenant_id != "any":
-            where_or["tenant_id"] = tenant_id
-            where_or["public"] = True
-        scenarios = mydb.get_rows(SELECT=s, WHERE=w, WHERE_OR=where_or, WHERE_AND_OR="AND", LIMIT=l, FROM='scenarios')
+            w["OR"] = {"tenant_id": tenant_id, "public": True}
+        scenarios = mydb.get_rows(SELECT=s, WHERE=w, LIMIT=l, FROM='scenarios')
         convert_datetime2str(scenarios)
         utils.convert_str2boolean(scenarios, ('public',) )
         data={'scenarios':scenarios}
index 69c4ac3..e5d13eb 100644 (file)
@@ -138,9 +138,14 @@ def start_service(mydb):
         'log_level_of': 'DEBUG'
     }
     try:
+        # starts ovim library
         ovim = ovim_module.ovim(ovim_configuration)
         ovim.start_service()
 
+        #delete old unneeded vim_actions
+        clean_db(mydb)
+
+        # starts vim_threads
         from_= 'tenants_datacenters as td join datacenters as d on td.datacenter_id=d.uuid join '\
                 'datacenter_tenants as dt on td.datacenter_tenant_id=dt.uuid'
         select_ = ('type', 'd.config as config', 'd.uuid as datacenter_id', 'vim_url', 'vim_url_admin',
@@ -224,6 +229,37 @@ def get_version():
     return  ("openmanod version {} {}\n(c) Copyright Telefonica".format(global_config["version"],
                                                                         global_config["version_date"] ))
 
+def clean_db(mydb):
+    """
+    Clean unused or old entries at database to avoid unlimited growing
+    :param mydb: database connector
+    :return: None
+    """
+    # get and delete unused vim_actions: all elements deleted, one week before, instance not present
+    now = t.time()-3600*24*7
+    instance_action_id = None
+    nb_deleted = 0
+    while True:
+        actions_to_delete = mydb.get_rows(
+            SELECT=("item", "item_id", "instance_action_id"),
+            FROM="vim_actions as va join instance_actions as ia on va.instance_action_id=ia.uuid "
+                    "left join instance_scenarios as i on ia.instance_id=i.uuid",
+            WHERE={"va.action": "DELETE", "va.modified_at<": now, "i.uuid": None,
+                   "va.status": ("DONE", "SUPERSEDED")},
+            LIMIT=100
+        )
+        for to_delete in actions_to_delete:
+            mydb.delete_row(FROM="vim_actions", WHERE=to_delete)
+            if instance_action_id != to_delete["instance_action_id"]:
+                instance_action_id = to_delete["instance_action_id"]
+                mydb.delete_row(FROM="instance_actions", WHERE={"uuid": instance_action_id})
+        nb_deleted += len(actions_to_delete)
+        if len(actions_to_delete) < 100:
+            break
+    if nb_deleted:
+        logger.debug("Removed {} unused vim_actions".format(nb_deleted))
+
+
 
 def get_flavorlist(mydb, vnf_id, nfvo_tenant=None):
     '''Obtain flavorList
@@ -1474,8 +1510,7 @@ def get_vnf_id(mydb, tenant_id, vnf_id):
     content = mydb.get_rows(FROM='vnfs join vms on vnfs.uuid=vms.vnf_id join interfaces on vms.uuid=interfaces.vm_id',\
                                     SELECT=('interfaces.uuid as uuid','interfaces.external_name as external_name', 'vms.name as vm_name', 'interfaces.vm_id as vm_id', \
                                             'interfaces.internal_name as internal_name', 'interfaces.type as type', 'interfaces.vpci as vpci','interfaces.bw as bw'),\
-                                    WHERE={'vnfs.uuid': vnf_id},
-                                    WHERE_NOT={'interfaces.external_name': None} )
+                                    WHERE={'vnfs.uuid': vnf_id, 'interfaces.external_name<>': None} )
     #print content
     data['vnf']['external-connections'] = content
 
@@ -1662,8 +1697,7 @@ def new_scenario(mydb, tenant_id, topo):
 
 #1.2: Check that VNF are present at database table vnfs. Insert uuid, description and external interfaces
     for name,vnf in vnfs.items():
-        where={}
-        where_or={"tenant_id": tenant_id, 'public': "true"}
+        where = {"OR": {"tenant_id": tenant_id, 'public': "true"}}
         error_text = ""
         error_pos = "'topology':'nodes':'" + name + "'"
         if 'vnf_id' in vnf:
@@ -1672,14 +1706,12 @@ def new_scenario(mydb, tenant_id, topo):
         if 'VNF model' in vnf:
             error_text += " 'VNF model' " +  vnf['VNF model']
             where['name'] = vnf['VNF model']
-        if len(where) == 0:
+        if len(where) == 1:
             raise NfvoException("Descriptor need a 'vnf_id' or 'VNF model' field at " + error_pos, HTTP_Bad_Request)
 
         vnf_db = mydb.get_rows(SELECT=('uuid','name','description'),
                                FROM='vnfs',
-                               WHERE=where,
-                               WHERE_OR=where_or,
-                               WHERE_AND_OR="AND")
+                               WHERE=where)
         if len(vnf_db)==0:
             raise NfvoException("unknown" + error_text + " at " + error_pos, HTTP_Not_Found)
         elif len(vnf_db)>1:
@@ -1689,7 +1721,7 @@ def new_scenario(mydb, tenant_id, topo):
         #get external interfaces
         ext_ifaces = mydb.get_rows(SELECT=('external_name as name','i.uuid as iface_uuid', 'i.type as type'),
             FROM='vnfs join vms on vnfs.uuid=vms.vnf_id join interfaces as i on vms.uuid=i.vm_id',
-            WHERE={'vnfs.uuid':vnf['uuid']}, WHERE_NOT={'external_name':None} )
+            WHERE={'vnfs.uuid':vnf['uuid'], 'external_name<>': None} )
         for ext_iface in ext_ifaces:
             vnf['ifaces'][ ext_iface['name'] ] = {'uuid':ext_iface['iface_uuid'], 'type':ext_iface['type']}
 
@@ -1928,8 +1960,7 @@ def new_scenario_v02(mydb, tenant_id, scenario_dict, version):
 
     # 1: Check that VNF are present at database table vnfs and update content into scenario dict
     for name,vnf in scenario["vnfs"].iteritems():
-        where={}
-        where_or={"tenant_id": tenant_id, 'public': "true"}
+        where = {"OR": {"tenant_id": tenant_id, 'public': "true"}}
         error_text = ""
         error_pos = "'scenario':'vnfs':'" + name + "'"
         if 'vnf_id' in vnf:
@@ -1938,13 +1969,11 @@ def new_scenario_v02(mydb, tenant_id, scenario_dict, version):
         if 'vnf_name' in vnf:
             error_text += " 'vnf_name' " + vnf['vnf_name']
             where['name'] = vnf['vnf_name']
-        if len(where) == 0:
+        if len(where) == 1:
             raise NfvoException("Needed a 'vnf_id' or 'vnf_name' at " + error_pos, HTTP_Bad_Request)
         vnf_db = mydb.get_rows(SELECT=('uuid', 'name', 'description'),
                                FROM='vnfs',
-                               WHERE=where,
-                               WHERE_OR=where_or,
-                               WHERE_AND_OR="AND")
+                               WHERE=where)
         if len(vnf_db) == 0:
             raise NfvoException("Unknown" + error_text + " at " + error_pos, HTTP_Not_Found)
         elif len(vnf_db) > 1:
@@ -1955,7 +1984,7 @@ def new_scenario_v02(mydb, tenant_id, scenario_dict, version):
         # get external interfaces
         ext_ifaces = mydb.get_rows(SELECT=('external_name as name', 'i.uuid as iface_uuid', 'i.type as type'),
                                    FROM='vnfs join vms on vnfs.uuid=vms.vnf_id join interfaces as i on vms.uuid=i.vm_id',
-                                   WHERE={'vnfs.uuid':vnf['uuid']}, WHERE_NOT={'external_name': None} )
+                                   WHERE={'vnfs.uuid':vnf['uuid'], 'external_name<>': None} )
         for ext_iface in ext_ifaces:
             vnf['ifaces'][ ext_iface['name'] ] = {'uuid':ext_iface['iface_uuid'], 'type': ext_iface['type']}
         # TODO? get internal-connections from db.nets and their profiles, and update scenario[vnfs][internal-connections] accordingly
@@ -3224,6 +3253,7 @@ def delete_instance(mydb, tenant_id, instance_id):
     myvims = {}
     myvim_threads = {}
     vimthread_affected = {}
+    net2vm_dependencies = {}
 
     task_index = 0
     instance_action_id = get_task_id()
@@ -3262,36 +3292,29 @@ def delete_instance(mydb, tenant_id, instance_id):
             if not myvim:
                 error_msg += "\n    VM id={} cannot be deleted because datacenter={} not found".format(vm['vim_vm_id'], sce_vnf["datacenter_id"])
                 continue
-            try:
-                db_vim_action = {
-                    "instance_action_id": instance_action_id,
-                    "task_index": task_index,
-                    "datacenter_vim_id": sce_vnf["datacenter_tenant_id"],
-                    "action": "DELETE",
-                    "status": "SCHEDULED",
-                    "item": "instance_vms",
-                    "item_id": vm["uuid"],
-                    "extra": yaml.safe_dump({"params": vm["interfaces"]},
-                                            default_flow_style=True, width=256)
-                }
-                task_index += 1
-                db_vim_actions.append(db_vim_action)
-
-            except vimconn.vimconnNotFoundException as e:
-                error_msg+="\n    VM VIM_id={} not found at datacenter={}".format(vm['vim_vm_id'], sce_vnf["datacenter_id"])
-                logger.warn("VM instance '%s'uuid '%s', VIM id '%s', from VNF_id '%s' not found",
-                    vm['name'], vm['uuid'], vm['vim_vm_id'], sce_vnf['vnf_id'])
-            except vimconn.vimconnException as e:
-                error_msg+="\n    VM VIM_id={} at datacenter={} Error: {} {}".format(vm['vim_vm_id'], sce_vnf["datacenter_id"], e.http_code, str(e))
-                logger.error("Error %d deleting VM instance '%s'uuid '%s', VIM_id '%s', from VNF_id '%s': %s",
-                    e.http_code, vm['name'], vm['uuid'], vm['vim_vm_id'], sce_vnf['vnf_id'], str(e))
+            db_vim_action = {
+                "instance_action_id": instance_action_id,
+                "task_index": task_index,
+                "datacenter_vim_id": sce_vnf["datacenter_tenant_id"],
+                "action": "DELETE",
+                "status": "SCHEDULED",
+                "item": "instance_vms",
+                "item_id": vm["uuid"],
+                "extra": yaml.safe_dump({"params": vm["interfaces"]},
+                                        default_flow_style=True, width=256)
+            }
+            db_vim_actions.append(db_vim_action)
+            for interface in vm["interfaces"]:
+                if not interface.get("instance_net_id"):
+                    continue
+                if interface["instance_net_id"] not in net2vm_dependencies:
+                    net2vm_dependencies[interface["instance_net_id"]] = []
+                net2vm_dependencies[interface["instance_net_id"]].append(task_index)
+            task_index += 1
 
     # 2.2 deleting NETS
     # net_fail_list=[]
     for net in instanceDict['nets']:
-        # TODO if not net['created']:
-        # TODO    continue #skip not created nets
-
         vimthread_affected[net["datacenter_tenant_id"]] = None
         datacenter_key = (net["datacenter_id"], net["datacenter_tenant_id"])
         if datacenter_key not in myvims:
@@ -3314,31 +3337,21 @@ def delete_instance(mydb, tenant_id, instance_id):
         if not myvim:
             error_msg += "\n    Net VIM_id={} cannot be deleted because datacenter={} not found".format(net['vim_net_id'], net["datacenter_id"])
             continue
-        try:
-            db_vim_action = {
-                "instance_action_id": instance_action_id,
-                "task_index": task_index,
-                "datacenter_vim_id": net["datacenter_tenant_id"],
-                "action": "DELETE",
-                "status": "SCHEDULED",
-                "item": "instance_nets",
-                "item_id": net["uuid"],
-                "extra": yaml.safe_dump({"params": (net['vim_net_id'], net['sdn_net_id'])},
-                                        default_flow_style=True, width=256)
-            }
-            task_index += 1
-            db_vim_actions.append(db_vim_action)
-
-        except vimconn.vimconnNotFoundException as e:
-            error_msg += "\n    NET VIM_id={} not found at datacenter={}".format(net['vim_net_id'], net["datacenter_id"])
-            logger.warn("NET '%s', VIM_id '%s', from VNF_net_id '%s' not found",
-                        net['uuid'], net['vim_net_id'], str(net['vnf_net_id']))
-        except vimconn.vimconnException as e:
-            error_msg += "\n    NET VIM_id={} at datacenter={} Error: {} {}".format(net['vim_net_id'],
-                                                                                    net["datacenter_id"],
-                                                                                    e.http_code, str(e))
-            logger.error("Error %d deleting NET '%s', VIM_id '%s', from VNF_net_id '%s': %s",
-                         e.http_code, net['uuid'], net['vim_net_id'], str(net['vnf_net_id']), str(e))
+        extra = {"params": (net['vim_net_id'], net['sdn_net_id'])}
+        if net2vm_dependencies.get(net["uuid"]):
+            extra["depends_on"] = net2vm_dependencies[net["uuid"]]
+        db_vim_action = {
+            "instance_action_id": instance_action_id,
+            "task_index": task_index,
+            "datacenter_vim_id": net["datacenter_tenant_id"],
+            "action": "DELETE",
+            "status": "SCHEDULED",
+            "item": "instance_nets",
+            "item_id": net["uuid"],
+            "extra": yaml.safe_dump(extra, default_flow_style=True, width=256)
+        }
+        task_index += 1
+        db_vim_actions.append(db_vim_action)
 
     db_instance_action["number_tasks"] = task_index
     db_tables = [
index 2c30fb9..69e7f8c 100644 (file)
 """"
 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
 The tasks are stored at database in table vim_actions
-The task content are (M: stored at memory, D: stored at database):
+The task content is (M: stored at memory, D: stored at database):
     MD  instance_action_id:  reference a global action over an instance-scenario: database instance_actions
-    MD  task_index:     index number of the task. This with the previous are a key
+    MD  task_index:     index number of the task. This together with the previous forms a unique key identifier
     MD  datacenter_vim_id:  should contain the uuid of the VIM managed by this thread
     MD  vim_id:     id of the vm,net,etc at VIM
     MD  action:     CREATE, DELETE, FIND
     MD  item:       database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
-    MD  item_id:    uuid of the referenced entry in the preious table
+    MD  item_id:    uuid of the referenced entry in the previous table
     MD  status:     SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED
     MD  extra:      text with yaml format at database, dict at memory with:
             params:     list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken from other related tasks
             find:       (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains the FIND params
-            depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm depends on a net
+            depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation depends on a net creation
             sdn_net_id: used for net.
             tries:
             interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
@@ -68,9 +68,6 @@ from lib_osm_openvim.ovim import ovimException
 __author__ = "Alfonso Tierno, Pablo Montes"
 __date__ = "$28-Sep-2017 12:07:15$"
 
-# from logging import Logger
-# import auxiliary_functions as af
-
 
 def is_task_id(task_id):
     return task_id.startswith("TASK-")
@@ -141,34 +138,57 @@ class vim_thread(threading.Thread):
         task_list = []
         old_action_key = None
 
-        with self.db_lock:
-            vim_actions = self.db.get_rows(FROM="vim_actions",
-                                           WHERE={"datacenter_vim_id":  self.datacenter_tenant_id },
-                                           ORDER_BY=("item", "item_id", "created_at",))
-        for task in vim_actions:
-            item = task["item"]
-            item_id = task["item_id"]
-            action_key = item + item_id
-            if old_action_key != action_key:
-                if not action_completed and task_list:
-                    # This will fill needed task parameters into memory, and insert the task if needed in
-                    # self.pending_tasks or self.refresh_tasks
-                    self._insert_pending_tasks(task_list)
-                task_list = []
-                old_action_key = action_key
-                action_completed = False
-            elif action_completed:
-                continue
+        old_item_id = ""
+        old_item = ""
+        old_created_at = 0.0
+        database_limit = 200
+        while True:
+            # get 200 (database_limit) entries each time
+            with self.db_lock:
+                vim_actions = self.db.get_rows(FROM="vim_actions",
+                                               WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
+                                                      "item_id>=": old_item_id},
+                                               ORDER_BY=("item_id", "item", "created_at",),
+                                               LIMIT=database_limit)
+            for task in vim_actions:
+                item = task["item"]
+                item_id = task["item_id"]
+
+                # skip the first entries that are already processed in the previous pool of 200
+                if old_item_id:
+                    if item_id == old_item_id and item == old_item and task["created_at"] == old_created_at:
+                        old_item_id = False  # next one will be a new un-processed task
+                    continue
 
-            if task["status"] == "SCHEDULED" or task["action"] == "CREATE" or task["action"] == "FIND":
-                task_list.append(task)
-            elif task["action"] == "DELETE":
-                # action completed because deleted and status is not SCHEDULED. Not needed anything
-                action_completed = True
+                action_key = item + item_id
+                if old_action_key != action_key:
+                    if not action_completed and task_list:
+                        # This will fill needed task parameters into memory, and insert the task if needed in
+                        # self.pending_tasks or self.refresh_tasks
+                        self._insert_pending_tasks(task_list)
+                    task_list = []
+                    old_action_key = action_key
+                    action_completed = False
+                elif action_completed:
+                    continue
 
+                if task["status"] == "SCHEDULED" or task["action"] == "CREATE" or task["action"] == "FIND":
+                    task_list.append(task)
+                elif task["action"] == "DELETE":
+                    # action completed because deleted and status is not SCHEDULED. Not needed anything
+                    action_completed = True
+            if len(vim_actions) == database_limit:
+                # update variables for get the next database iteration
+                old_item_id = item_id
+                old_item = item
+                old_created_at = task["created_at"]
+            else:
+                break
         # Last actions group need to be inserted too
         if not action_completed and task_list:
             self._insert_pending_tasks(task_list)
+        self.logger.debug("reloaded vim actions pending:{} refresh:{}".format(
+            len(self.pending_tasks), len(self.refresh_tasks)))
 
     def _refres_elements(self):
         """Call VIM to get VMs and networks status until 10 elements"""
@@ -347,13 +367,13 @@ class vim_thread(threading.Thread):
                     if vim_info.get("error_msg"):
                         vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"])
                     if task_vim_status != vim_info["status"] or task_error_msg != vim_info.get("error_msg") or \
-                            task_vim_info != vim_info["vim_info"]:
+                            task_vim_info != vim_info.get("vim_info"):
                         task["extra"]["vim_status"] = vim_info["status"]
                         task["error_msg"] = vim_info.get("error_msg")
-                        task["vim_info"] = vim_info["vim_info"]
+                        task["vim_info"] = vim_info.get("vim_info")
                         temp_dict = {"status": vim_info["status"],
                                          "error_msg": vim_info.get("error_msg"),
-                                         "vim_info": vim_info["vim_info"]}
+                                         "vim_info": vim_info.get("vim_info")}
                         with self.db_lock:
                             self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
                             self.db.update_rows(
@@ -416,45 +436,79 @@ class vim_thread(threading.Thread):
         while self.pending_tasks:
             task = self.pending_tasks.pop(0)
             nb_processed += 1
-            if task["status"] == "SUPERSEDED":
-                # not needed to do anything but update database with the new status
-                result = True
-                database_update = None
-            elif not self.vim:
-                task["status"] == "ERROR"
-                task["error_msg"] = self.error_status
-                result = False
-                database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
-            elif task["item"] == 'instance_vms':
-                if task["action"] == "CREATE":
-                    result, database_update = self.new_vm(task)
-                    nb_created += 1
-                elif task["action"] == "DELETE":
-                    result, database_update = self.del_vm(task)
-                else:
-                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
-            elif task["item"] == 'instance_nets':
-                if task["action"] == "CREATE":
-                    result, database_update = self.new_net(task)
-                    nb_created += 1
-                elif task["action"] == "DELETE":
-                    result, database_update = self.del_net(task)
-                elif task["action"] == "FIND":
-                    result, database_update = self.get_net(task)
+            try:
+                # check if tasks that this depends on have been completed
+                dependency_not_completed = False
+                for task_index in task["extra"].get("depends_on", ()):
+                    task_dependency = task["depends"].get("TASK-" + str(task_index))
+                    if not task_dependency:
+                        task_dependency = self._look_for_task(task["instance_action_id"], "TASK-" + str(task_index))
+                        if not task_dependency:
+                            raise VimThreadException(
+                                "Cannot get depending net task trying to get depending task {}.{}".format(
+                                    task["instance_action_id"], task_index))
+                        # task["depends"]["TASK-" + str(task_index)] = task_dependency #it references another object,so database must be look again
+                    if task_dependency["status"] == "SCHEDULED":
+                        dependency_not_completed = True
+                        break
+                    elif task_dependency["status"] == "FAILED":
+                        raise VimThreadException(
+                            "Cannot {} {}, task {}.{} {} because depends on failed {} {}, task{}.{}"
+                            "task {}.{}".format(task["action"], task["item"],
+                                                task["instance_action_id"], task["task_index"],
+                                                task_dependency["instance_action_id"], task_dependency["task_index"],
+                                                task_dependency["action"], task_dependency["item"] ))
+                if dependency_not_completed:
+                    # Move this task to the end.
+                    task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
+                    if task["extra"]["tries"] <= 3:
+                        self.pending_tasks.append(task)
+                        continue
+                    else:
+                        raise VimThreadException(
+                            "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, "
+                            "(task {}.{})".format(task["action"], task["item"],
+                                                  task["instance_action_id"], task["task_index"],
+                                                  task_dependency["instance_action_id"], task_dependency["task_index"],
+                                                  task_dependency["action"], task_dependency["item"]))
+
+                if task["status"] == "SUPERSEDED":
+                    # not needed to do anything but update database with the new status
+                    result = True
+                    database_update = None
+                elif not self.vim:
+                    task["status"] == "ERROR"
+                    task["error_msg"] = self.error_status
+                    result = False
+                    database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
+                elif task["item"] == 'instance_vms':
+                    if task["action"] == "CREATE":
+                        result, database_update = self.new_vm(task)
+                        nb_created += 1
+                    elif task["action"] == "DELETE":
+                        result, database_update = self.del_vm(task)
+                    else:
+                        raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
+                elif task["item"] == 'instance_nets':
+                    if task["action"] == "CREATE":
+                        result, database_update = self.new_net(task)
+                        nb_created += 1
+                    elif task["action"] == "DELETE":
+                        result, database_update = self.del_net(task)
+                    elif task["action"] == "FIND":
+                        result, database_update = self.get_net(task)
+                    else:
+                        raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
                 else:
-                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
-            else:
-                raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
-                # TODO
+                    raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
+                    # TODO
+            except VimThreadException as e:
+                result = False
+                task["error_msg"] = str(e)
+                task["status"] = "FAILED"
+                database_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": task["error_msg"]}
 
-            if task["status"] == "SCHEDULED":
-                # This is because a depend task is not completed. Moved to the end. NOT USED YET
-                if task["extra"].get("tries", 0) > 3:
-                    task["status"] == "FAILED"
-                else:
-                    task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
-                    self.pending_tasks.append(task)
-            elif task["action"] == "DELETE":
+            if task["action"] == "DELETE":
                 action_key = task["item"] + task["item_id"]
                 del self.grouped_tasks[action_key]
             elif task["action"] in ("CREATE", "FIND") and task["status"] in ("DONE", "BUILD"):
@@ -469,7 +523,7 @@ class vim_thread(threading.Thread):
                 with self.db_lock:
                     self.db.update_rows(
                         table="vim_actions",
-                        UPDATE={"status": task["status"], "vim_id": task["vim_id"], "modified_at": now,
+                        UPDATE={"status": task["status"], "vim_id": task.get("vim_id"), "modified_at": now,
                                 "error_msg": task["error_msg"],
                                 "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
                         WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
@@ -641,18 +695,18 @@ class vim_thread(threading.Thread):
             error_text = ""
             for net in net_list:
                 if "net_id" in net and is_task_id(net["net_id"]):  # change task_id into network_id
-                    if net["net_id"] in depends:
-                        task_net = depends[net["net_id"]]
-                    else:
-                        task_net = self._look_for_task(task["instance_action_id"], net["net_id"])
-                    if not task_net:
-                        raise VimThreadException(
-                            "Error trying to get depending task from task_index={}".format(net["net_id"]))
-                    network_id = task_net.get("vim_id")
+                    task_dependency = task["depends"].get(net["net_id"])
+                    if not task_dependency:
+                        task_dependency = self._look_for_task(task["instance_action_id"], net["net_id"])
+                        if not task_dependency:
+                            raise VimThreadException(
+                                "Cannot get depending net task trying to get depending task {}.{}".format(
+                                    task["instance_action_id"], net["net_id"]))
+                    network_id = task_dependency.get("vim_id")
                     if not network_id:
                         raise VimThreadException(
                             "Cannot create VM because depends on a network not created or found: " +
-                            str(task_net["error_msg"]))
+                            str(depends[net["net_id"]]["error_msg"]))
                     net["net_id"] = network_id
             vim_vm_id, created_items = self.vim.new_vminstance(*params)