X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_ro%2Fvim_thread.py;h=48c8e326b872193a85b9632ef56ebd23496d6ca1;hb=00e29ddd2bc186460698cb2431f6751daa24eca8;hp=c981e31f56f603c92ca5c5d052b9b373482128ae;hpb=fafbf29413f4cc3002cbf1b88effcd832be5c602;p=osm%2FRO.git diff --git a/osm_ro/vim_thread.py b/osm_ro/vim_thread.py index c981e31f..48c8e326 100644 --- a/osm_ro/vim_thread.py +++ b/osm_ro/vim_thread.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- ## -# Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U. +# Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U. # This file is part of openvim # All Rights Reserved. # @@ -23,7 +23,7 @@ """" This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM. -The tasks are stored at database in table vim_actions +The tasks are stored at database in table vim_wim_actions The task content is (M: stored at memory, D: stored at database): MD instance_action_id: reference a global action over an instance-scenario: database instance_actions MD task_index: index number of the task. This together with the previous forms a unique key identifier @@ -49,8 +49,8 @@ The task content is (M: stored at memory, D: stored at database): vim_status: VIM status of the element. Stored also at database in the instance_XXX M depends: dict with task_index(from depends_on) to task class M params: same as extra[params] but with the resolved dependencies - M vim_interfaces: similar to extra[interfaces] but with VIM information. Stored at database in the instance_XXX but not at vim_actions - M vim_info: Detailed information of a vm,net from the VIM. Stored at database in the instance_XXX but not at vim_actions + M vim_interfaces: similar to extra[interfaces] but with VIM information. Stored at database in the instance_XXX but not at vim_wim_actions + M vim_info: Detailed information of a vm,net from the VIM. Stored at database in the instance_XXX but not at vim_wim_actions MD error_msg: descriptive text upon an error.Stored also at database instance_XXX MD created_at: task creation time MD modified_at: last task update time. On refresh it contains when this task need to be refreshed @@ -189,7 +189,7 @@ class vim_thread(threading.Thread): while True: # get 200 (database_limit) entries each time with self.db_lock: - vim_actions = self.db.get_rows(FROM="vim_actions", + vim_actions = self.db.get_rows(FROM="vim_wim_actions", WHERE={"datacenter_vim_id": self.datacenter_tenant_id, "item_id>=": old_item_id}, ORDER_BY=("item_id", "item", "created_at",), @@ -393,7 +393,7 @@ class vim_thread(threading.Thread): if task_need_update: with self.db_lock: self.db.update_rows( - 'vim_actions', + 'vim_wim_actions', UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256), "error_msg": task.get("error_msg"), "modified_at": now}, WHERE={'instance_action_id': task['instance_action_id'], @@ -463,7 +463,7 @@ class vim_thread(threading.Thread): with self.db_lock: self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]}) self.db.update_rows( - 'vim_actions', + 'vim_wim_actions', UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256), "error_msg": task.get("error_msg"), "modified_at": now}, WHERE={'instance_action_id': task['instance_action_id'], @@ -644,7 +644,7 @@ class vim_thread(threading.Thread): now = time.time() with self.db_lock: self.db.update_rows( - table="vim_actions", + table="vim_wim_actions", UPDATE={"status": task["status"], "vim_id": task.get("vim_id"), "modified_at": now, "error_msg": task["error_msg"], "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)}, @@ -811,7 +811,7 @@ class vim_thread(threading.Thread): instance_action_id = ins_action_id with self.db_lock: - tasks = self.db.get_rows(FROM="vim_actions", WHERE={"instance_action_id": instance_action_id, + tasks = self.db.get_rows(FROM="vim_wim_actions", WHERE={"instance_action_id": instance_action_id, "task_index": task_index}) if not tasks: return None @@ -1064,15 +1064,14 @@ class vim_thread(threading.Thread): def new_sfi(self, task): vim_sfi_id = None try: - params = task["params"] + dep_id = "TASK-" + str(task["extra"]["depends_on"][0]) task_id = task["instance_action_id"] + "." + str(task["task_index"]) - depends = task.get("depends") error_text = "" - interfaces = task.get("depends").values()[0].get("extra").get("params")[5] + interfaces = task.get("depends").get(dep_id).get("extra").get("interfaces").keys() # At the moment, every port associated with the VM will be used both as ingress and egress ports. # Bear in mind that different VIM connectors might support SFI differently. In the case of OpenStack, only the # first ingress and first egress ports will be used to create the SFI (Port Pair). - port_id_list = [interfaces[0].get("vim_id")] + port_id_list = [interfaces[0]] name = "sfi-%s" % task["item_id"][:8] # By default no form of IETF SFC Encapsulation will be used vim_sfi_id = self.vim.new_sfi(name, port_id_list, port_id_list, sfc_encap=False) @@ -1113,12 +1112,11 @@ class vim_thread(threading.Thread): def new_sf(self, task): vim_sf_id = None try: - params = task["params"] task_id = task["instance_action_id"] + "." + str(task["task_index"]) - depends = task.get("depends") error_text = "" + depending_tasks = [ "TASK-" + str(dep_id) for dep_id in task["extra"]["depends_on"]] #sfis = task.get("depends").values()[0].get("extra").get("params")[5] - sfis = task.get("depends").values() + sfis = [task.get("depends").get(dep_task) for dep_task in depending_tasks] sfi_id_list = [] for sfi in sfis: sfi_id_list.append(sfi.get("vim_id")) @@ -1164,9 +1162,9 @@ class vim_thread(threading.Thread): try: params = task["params"] task_id = task["instance_action_id"] + "." + str(task["task_index"]) - depends = task.get("depends") + depending_task = "TASK-" + str(task.get("extra").get("depends_on")[0]) error_text = "" - interfaces = task.get("depends").values()[0].get("extra").get("params")[5] + interfaces = task.get("depends").get(depending_task).get("vim_interfaces").keys() # Bear in mind that different VIM connectors might support Classifications differently. # In the case of OpenStack, only the first VNF attached to the classifier will be used # to create the Classification(s) (the "logical source port" of the "Flow Classifier"). @@ -1188,7 +1186,7 @@ class vim_thread(threading.Thread): if '/' not in destination_ip: destination_ip += '/32' definition = { - "logical_source_port": interfaces[0].get("vim_id"), + "logical_source_port": interfaces[0], "protocol": ip_proto, "source_ip_prefix": source_ip, "destination_ip_prefix": destination_ip, @@ -1239,12 +1237,11 @@ class vim_thread(threading.Thread): try: params = task["params"] task_id = task["instance_action_id"] + "." + str(task["task_index"]) - depends = task.get("depends") + depending_tasks = [task.get("depends").get("TASK-" + str(tsk_id)) for tsk_id in task.get("extra").get("depends_on")] error_text = "" - deps = task.get("depends").values() sf_id_list = [] classification_id_list = [] - for dep in deps: + for dep in depending_tasks: vim_id = dep.get("vim_id") resource = dep.get("item") if resource == "instance_sfs":