blob: 1fba7a60fc573418f37cd8a6a6c050f960481203 [file] [log] [blame]
tierno42026a02017-02-10 15:13:40 +01001# -*- coding: utf-8 -*-
2
3##
4# Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5# This file is part of openvim
6# All Rights Reserved.
7#
8# Licensed under the Apache License, Version 2.0 (the "License"); you may
9# not use this file except in compliance with the License. You may obtain
10# a copy of the License at
11#
12# http://www.apache.org/licenses/LICENSE-2.0
13#
14# Unless required by applicable law or agreed to in writing, software
15# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17# License for the specific language governing permissions and limitations
18# under the License.
19#
20# For those usages not covered by the Apache License, Version 2.0 please
21# contact with: nfvlabs@tid.es
22##
23
tierno99314902017-04-26 13:23:09 +020024""""
tierno868220c2017-09-26 00:11:05 +020025This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
26The tasks are stored at database in table vim_actions
27The task content is:
28 MD instance_action_id: reference a global action over an instance-scenario: database instance_actions
29 MD task_index: index number of the task. This with the previous are a key
30 MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread
31 MD vim_id: id of the vm,net,etc at VIM
32 MD action: CREATE, DELETE, LOOK, TODO: LOOK_CREATE
33 MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
34 MD item_id: uuid of the referenced entry in the preious table
35 MD status: SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED
36 MD extra: text with yaml format at database, dict at memory with:
37 params: list with the params to be sent to the VIM for CREATE or LOOK. For DELETE the vim_id is taken from other related tasks
38 depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm depends on a net
39 sdn_net_id: used for net.
40 tries:
41 interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
42 iface_id: uuid of intance_interfaces
43 sdn_port_id:
44 sdn_net_id:
45 created: False if the VIM element is not created by other actions, and it should not be deleted
46 vim_status: VIM status of the element. Stored also at database in the instance_XXX
47 M depends: dict with task_index(from depends_on) to task class
48 M params: same as extra[params] but with the resolved dependencies
49 M vim_interfaces: similar to extra[interfaces] but with VIM information. Stored at database in the instance_XXX but not at vim_actions
50 M vim_info: Detailed information of a vm,net from the VIM. Stored at database in the instance_XXX but not at vim_actions
51 MD error_msg: descriptive text upon an error.Stored also at database instance_XXX
52 MD created_at: task creation time
53 MD modified_at: last task update time. On refresh it contains when this task need to be refreshed
54
tierno99314902017-04-26 13:23:09 +020055"""
tierno42026a02017-02-10 15:13:40 +010056
57import threading
58import time
59import Queue
60import logging
61import vimconn
tierno868220c2017-09-26 00:11:05 +020062import yaml
tiernob3d36742017-03-03 23:51:05 +010063from db_base import db_base_Exception
tierno01b3e172017-04-21 10:52:34 +020064from lib_osm_openvim.ovim import ovimException
tierno42026a02017-02-10 15:13:40 +010065
tierno99314902017-04-26 13:23:09 +020066__author__ = "Alfonso Tierno, Pablo Montes"
tierno868220c2017-09-26 00:11:05 +020067__date__ = "$28-Sep-2017 12:07:15$"
tierno42026a02017-02-10 15:13:40 +010068
69# from logging import Logger
70# import auxiliary_functions as af
71
tiernob3d36742017-03-03 23:51:05 +010072
tierno99314902017-04-26 13:23:09 +020073def is_task_id(task_id):
tierno868220c2017-09-26 00:11:05 +020074 return task_id.startswith("TASK-")
75
76
77class VimThreadException(Exception):
78 pass
tierno42026a02017-02-10 15:13:40 +010079
80
81class vim_thread(threading.Thread):
garciadeblasb334d442017-06-02 14:32:47 +020082 REFRESH_BUILD = 5 # 5 seconds
83 REFRESH_ACTIVE = 60 # 1 minute
tierno42026a02017-02-10 15:13:40 +010084
tierno9c22f2d2017-10-09 16:23:55 +020085 def __init__(self, myvimconn, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None,
tierno99314902017-04-26 13:23:09 +020086 db=None, db_lock=None, ovim=None):
tiernob3d36742017-03-03 23:51:05 +010087 """Init a thread.
tierno42026a02017-02-10 15:13:40 +010088 Arguments:
89 'id' number of thead
90 'name' name of thread
91 'host','user': host ip or name to manage and user
92 'db', 'db_lock': database class and lock to use it in exclusion
tiernob3d36742017-03-03 23:51:05 +010093 """
tierno42026a02017-02-10 15:13:40 +010094 threading.Thread.__init__(self)
tierno9c22f2d2017-10-09 16:23:55 +020095 if isinstance(myvimconn, vimconn.vimconnException):
96 self.vim = None
97 self.error_status = "Error accesing to VIM: {}".format(myvimconn)
98 else:
99 self.vim = myvimconn
100 self.error_status = None
tiernob3d36742017-03-03 23:51:05 +0100101 self.datacenter_name = datacenter_name
102 self.datacenter_tenant_id = datacenter_tenant_id
Pablo Montes Moreno3fbff9b2017-03-08 11:28:15 +0100103 self.ovim = ovim
tierno42026a02017-02-10 15:13:40 +0100104 if not name:
tiernob3d36742017-03-03 23:51:05 +0100105 self.name = vimconn["id"] + "." + vimconn["config"]["datacenter_tenant_id"]
tierno42026a02017-02-10 15:13:40 +0100106 else:
107 self.name = name
108
109 self.logger = logging.getLogger('openmano.vim.'+self.name)
tiernob3d36742017-03-03 23:51:05 +0100110 self.db = db
111 self.db_lock = db_lock
tierno42026a02017-02-10 15:13:40 +0100112
tiernob3d36742017-03-03 23:51:05 +0100113 self.task_lock = task_lock
114 self.task_queue = Queue.Queue(2000)
tierno868220c2017-09-26 00:11:05 +0200115
116 self.refresh_tasks = []
tierno867ffe92017-03-27 12:50:34 +0200117 """Contains time ordered task list for refreshing the status of VIM VMs and nets"""
118
tierno868220c2017-09-26 00:11:05 +0200119 self.pending_tasks = []
120 """Contains time ordered task list for creation, deletion of VIM VMs and nets"""
121
122 self.grouped_tasks = {}
123 """ It contains all the creation/deletion pending tasks grouped by its concrete vm, net, etc
124 <item><item_id>:
125 - <task1> # e.g. CREATE task
126 <task2> # e.g. DELETE task
127 """
128
129 def _reload_vim_actions(self):
130 """
131 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
132 :return: None
133 """
134 action_completed = False
135 task_list = []
136 old_action_key = None
137
138 with self.db_lock:
139 vim_actions = self.db.get_rows(FROM="vim_actions",
140 WHERE={"datacenter_vim_id": self.datacenter_tenant_id },
141 ORDER_BY=("item", "item_id", "created_at",))
142 for task in vim_actions:
143 item = task["item"]
144 item_id = task["item_id"]
145 action_key = item + item_id
146 if old_action_key != action_key:
147 if not action_completed and task_list:
148 # This will fill needed task parameters into memory, and insert the task if needed in
149 # self.pending_tasks or self.refresh_tasks
150 self._insert_pending_tasks(task_list)
151 task_list = []
152 old_action_key = action_key
153 action_completed = False
154 elif action_completed:
155 continue
156
157 if task["status"] == "SCHEDULED" or task["action"] == "CREATE" or task["action"] == "FIND":
158 task_list.append(task)
159 elif task["action"] == "DELETE":
160 # action completed because deleted and status is not SCHEDULED. Not needed anything
161 action_completed = True
162
163 # Last actions group need to be inserted too
164 if not action_completed and task_list:
165 self._insert_pending_tasks(task_list)
166
tierno867ffe92017-03-27 12:50:34 +0200167 def _refres_elements(self):
168 """Call VIM to get VMs and networks status until 10 elements"""
169 now = time.time()
tierno868220c2017-09-26 00:11:05 +0200170 nb_processed = 0
tierno867ffe92017-03-27 12:50:34 +0200171 vm_to_refresh_list = []
172 net_to_refresh_list = []
173 vm_to_refresh_dict = {}
174 net_to_refresh_dict = {}
175 items_to_refresh = 0
tierno868220c2017-09-26 00:11:05 +0200176 while self.refresh_tasks:
177 task = self.refresh_tasks[0]
tierno867ffe92017-03-27 12:50:34 +0200178 with self.task_lock:
tierno868220c2017-09-26 00:11:05 +0200179 if task['status'] == 'SUPERSEDED':
180 self.refresh_tasks.pop(0)
tierno867ffe92017-03-27 12:50:34 +0200181 continue
tierno868220c2017-09-26 00:11:05 +0200182 if task['modified_at'] > now:
tierno867ffe92017-03-27 12:50:34 +0200183 break
tierno868220c2017-09-26 00:11:05 +0200184 # task["status"] = "processing"
185 nb_processed += 1
186 self.refresh_tasks.pop(0)
187 if task["item"] == 'instance_vms':
tierno867ffe92017-03-27 12:50:34 +0200188 vm_to_refresh_list.append(task["vim_id"])
189 vm_to_refresh_dict[task["vim_id"]] = task
tierno868220c2017-09-26 00:11:05 +0200190 elif task["item"] == 'instance_nets':
tierno867ffe92017-03-27 12:50:34 +0200191 net_to_refresh_list.append(task["vim_id"])
192 net_to_refresh_dict[task["vim_id"]] = task
193 else:
tierno868220c2017-09-26 00:11:05 +0200194 error_text = "unknown task {}".format(task["item"])
tierno867ffe92017-03-27 12:50:34 +0200195 self.logger.error(error_text)
196 items_to_refresh += 1
197 if items_to_refresh == 10:
198 break
199
200 if vm_to_refresh_list:
201 try:
tierno868220c2017-09-26 00:11:05 +0200202 now = time.time()
tierno867ffe92017-03-27 12:50:34 +0200203 vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list)
204 for vim_id, vim_info in vim_dict.items():
tierno868220c2017-09-26 00:11:05 +0200205 # look for task
206 task_need_update = False
tierno867ffe92017-03-27 12:50:34 +0200207 task = vm_to_refresh_dict[vim_id]
208 self.logger.debug("get-vm vm_id=%s result=%s", task["vim_id"], str(vim_info))
209
210 # update database
tierno868220c2017-09-26 00:11:05 +0200211 task_vim_info = task.get("vim_info")
212 task_error_msg = task.get("error_msg")
213 task_vim_status = task["extra"].get("vim_status")
tierno867ffe92017-03-27 12:50:34 +0200214 if vim_info.get("error_msg"):
215 vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"])
tierno868220c2017-09-26 00:11:05 +0200216 if task_vim_status != vim_info["status"] or task_error_msg != vim_info.get("error_msg") or \
217 task_vim_info != vim_info.get("vim_info"):
tierno867ffe92017-03-27 12:50:34 +0200218 with self.db_lock:
tierno91baf982017-03-30 19:37:57 +0200219 temp_dict = {"status": vim_info["status"],
220 "error_msg": vim_info.get("error_msg"),
tierno868220c2017-09-26 00:11:05 +0200221 "vim_info": vim_info.get("vim_info")}
222 self.db.update_rows('instance_vms', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
223 task["extra"]["vim_status"] = vim_info["status"]
224 task["error_msg"] = vim_info.get("error_msg")
225 task["vim_info"] = vim_info.get("vim_info")
226 task_need_update = True
227 for interface in vim_info.get("interfaces", ()):
228 vim_interface_id = interface["vim_interface_id"]
229 if vim_interface_id not in task["extra"]["interfaces"]:
230 self.logger.critical("Interface not found {} on task info {}".format(
231 vim_interface_id, task["extra"]["interfaces"]), exc_info=True)
232 continue
233 task_interface = task["extra"]["interfaces"][vim_interface_id]
234 task_vim_interface = task["vim_interfaces"].get(vim_interface_id)
235 if task_vim_interface != interface:
236 # delete old port
tierno867ffe92017-03-27 12:50:34 +0200237 if task_interface.get("sdn_port_id"):
238 try:
tierno2391cc12017-08-02 12:48:13 +0200239 with self.db_lock:
240 self.ovim.delete_port(task_interface["sdn_port_id"])
241 task_interface["sdn_port_id"] = None
tierno868220c2017-09-26 00:11:05 +0200242 task_need_update = True
tierno867ffe92017-03-27 12:50:34 +0200243 except ovimException as e:
244 self.logger.error("ovimException deleting external_port={} ".format(
245 task_interface["sdn_port_id"]) + str(e), exc_info=True)
246 # TODO Set error_msg at instance_nets
tierno868220c2017-09-26 00:11:05 +0200247
248 # Create SDN port
249 sdn_net_id = task_interface.get("sdn_net_id")
250 if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
251 sdn_port_name = sdn_net_id + "." + task["vim_id"]
252 sdn_port_name = sdn_port_name[:63]
253 try:
tierno2391cc12017-08-02 12:48:13 +0200254 with self.db_lock:
tierno868220c2017-09-26 00:11:05 +0200255 sdn_port_id = self.ovim.new_external_port(
256 {"compute_node": interface["compute_node"],
257 "pci": interface["pci"],
258 "vlan": interface.get("vlan"),
259 "net_id": sdn_net_id,
260 "region": self.vim["config"]["datacenter_id"],
261 "name": sdn_port_name,
262 "mac": interface.get("mac_address")})
263 task_interface["sdn_port_id"] = sdn_port_id
264 task_need_update = True
265 except (ovimException, Exception) as e:
266 self.logger.error(
267 "ovimException creating new_external_port compute_node={} "
268 "pci={} vlan={} ".format(
269 interface["compute_node"],
270 interface["pci"],
271 interface.get("vlan")) + str(e),
272 exc_info=True)
273 # TODO Set error_msg at instance_nets
274 with self.db_lock:
275 self.db.update_rows(
276 'instance_interfaces',
277 UPDATE={"mac_address": interface.get("mac_address"),
278 "ip_address": interface.get("ip_address"),
279 "vim_info": interface.get("vim_info"),
280 "sdn_port_id": task_interface.get("sdn_port_id"),
281 "compute_node": interface.get("compute_node"),
282 "pci": interface.get("pci"),
283 "vlan": interface.get("vlan"),
284 },
285 WHERE={'uuid': task_interface["iface_id"]})
286 task["vim_interfaces"][vim_interface_id] = interface
287 if task_need_update:
288 with self.db_lock:
289 self.db.update_rows(
290 'vim_actions',
291 UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
292 "error_msg": task.get("error_msg"), "modified_at": now},
293 WHERE={'instance_action_id': task['instance_action_id'],
294 'task_index': task['task_index']})
295 if task["extra"].get("vim_status") == "BUILD":
garciadeblasb334d442017-06-02 14:32:47 +0200296 self._insert_refresh(task, now + self.REFRESH_BUILD)
tierno91baf982017-03-30 19:37:57 +0200297 else:
garciadeblasb334d442017-06-02 14:32:47 +0200298 self._insert_refresh(task, now + self.REFRESH_ACTIVE)
tierno867ffe92017-03-27 12:50:34 +0200299 except vimconn.vimconnException as e:
300 self.logger.error("vimconnException Exception when trying to refresh vms " + str(e))
garciadeblasb334d442017-06-02 14:32:47 +0200301 self._insert_refresh(task, now + self.REFRESH_ACTIVE)
tierno867ffe92017-03-27 12:50:34 +0200302
tierno91baf982017-03-30 19:37:57 +0200303 if net_to_refresh_list:
304 try:
tierno868220c2017-09-26 00:11:05 +0200305 now = time.time()
tierno91baf982017-03-30 19:37:57 +0200306 vim_dict = self.vim.refresh_nets_status(net_to_refresh_list)
307 for vim_id, vim_info in vim_dict.items():
tierno868220c2017-09-26 00:11:05 +0200308 # look for task
tierno91baf982017-03-30 19:37:57 +0200309 task = net_to_refresh_dict[vim_id]
310 self.logger.debug("get-net net_id=%s result=%s", task["vim_id"], str(vim_info))
311
tierno868220c2017-09-26 00:11:05 +0200312 task_vim_info = task.get("vim_info")
313 task_vim_status = task["extra"].get("vim_status")
314 task_error_msg = task.get("error_msg")
315 task_sdn_net_id = task["extra"].get("sdn_net_id")
tierno91baf982017-03-30 19:37:57 +0200316
tierno868220c2017-09-26 00:11:05 +0200317 # get ovim status
318 if task_sdn_net_id:
319 try:
320 with self.db_lock:
321 sdn_net = self.ovim.show_network(task_sdn_net_id)
322 if sdn_net["status"] == "ERROR":
323 if not vim_info.get("error_msg"):
324 vim_info["error_msg"] = sdn_net["error_msg"]
325 else:
326 vim_info["error_msg"] = "VIM_ERROR: {} && SDN_ERROR: {}".format(
327 self._format_vim_error_msg(vim_info["error_msg"], 1024//2-14),
328 self._format_vim_error_msg(sdn_net["error_msg"], 1024//2-14))
329 if vim_info["status"] == "VIM_ERROR":
330 vim_info["status"] = "VIM_SDN_ERROR"
331 else:
332 vim_info["status"] = "SDN_ERROR"
333
334 except (ovimException, Exception) as e:
335 self.logger.error(
336 "ovimException getting network infor snd_net_id={}".format(task_sdn_net_id),
337 exc_info=True)
338 # TODO Set error_msg at instance_nets
tierno91baf982017-03-30 19:37:57 +0200339
340 # update database
341 if vim_info.get("error_msg"):
342 vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"])
tierno868220c2017-09-26 00:11:05 +0200343 if task_vim_status != vim_info["status"] or task_error_msg != vim_info.get("error_msg") or \
344 task_vim_info != vim_info["vim_info"]:
345 task["extra"]["vim_status"] = vim_info["status"]
346 task["error_msg"] = vim_info.get("error_msg")
347 task["vim_info"] = vim_info["vim_info"]
348 temp_dict = {"status": vim_info["status"],
tierno91baf982017-03-30 19:37:57 +0200349 "error_msg": vim_info.get("error_msg"),
350 "vim_info": vim_info["vim_info"]}
tierno868220c2017-09-26 00:11:05 +0200351 with self.db_lock:
352 self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
353 self.db.update_rows(
354 'vim_actions',
355 UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
356 "error_msg": task.get("error_msg"), "modified_at": now},
357 WHERE={'instance_action_id': task['instance_action_id'],
358 'task_index': task['task_index']})
359 if task["extra"].get("vim_status") == "BUILD":
garciadeblasb334d442017-06-02 14:32:47 +0200360 self._insert_refresh(task, now + self.REFRESH_BUILD)
tierno91baf982017-03-30 19:37:57 +0200361 else:
garciadeblasb334d442017-06-02 14:32:47 +0200362 self._insert_refresh(task, now + self.REFRESH_ACTIVE)
tierno91baf982017-03-30 19:37:57 +0200363 except vimconn.vimconnException as e:
364 self.logger.error("vimconnException Exception when trying to refresh nets " + str(e))
garciadeblasb334d442017-06-02 14:32:47 +0200365 self._insert_refresh(task, now + self.REFRESH_ACTIVE)
tierno91baf982017-03-30 19:37:57 +0200366
tierno868220c2017-09-26 00:11:05 +0200367 return nb_processed
tierno867ffe92017-03-27 12:50:34 +0200368
tierno868220c2017-09-26 00:11:05 +0200369 def _insert_refresh(self, task, threshold_time=None):
370 """Insert a task at list of refreshing elements. The refreshing list is ordered by threshold_time (task['modified_at']
tierno867ffe92017-03-27 12:50:34 +0200371 It is assumed that this is called inside this thread
372 """
tierno9c22f2d2017-10-09 16:23:55 +0200373 if not self.vim:
374 return
tierno868220c2017-09-26 00:11:05 +0200375 if not threshold_time:
376 threshold_time = time.time()
377 task["modified_at"] = threshold_time
378 task_name = task["item"][9:] + "-" + task["action"]
379 task_id = task["instance_action_id"] + "." + str(task["task_index"])
380 for index in range(0, len(self.refresh_tasks)):
381 if self.refresh_tasks[index]["modified_at"] > threshold_time:
382 self.refresh_tasks.insert(index, task)
tierno867ffe92017-03-27 12:50:34 +0200383 break
384 else:
tierno868220c2017-09-26 00:11:05 +0200385 index = len(self.refresh_tasks)
386 self.refresh_tasks.append(task)
387 self.logger.debug("new refresh task={} name={}, modified_at={} index={}".format(
388 task_id, task_name, task["modified_at"], index))
tierno867ffe92017-03-27 12:50:34 +0200389
390 def _remove_refresh(self, task_name, vim_id):
391 """Remove a task with this name and vim_id from the list of refreshing elements.
392 It is assumed that this is called inside this thread outside _refres_elements method
393 Return True if self.refresh_list is modified, task is found
394 Return False if not found
395 """
396 index_to_delete = None
tierno868220c2017-09-26 00:11:05 +0200397 for index in range(0, len(self.refresh_tasks)):
398 if self.refresh_tasks[index]["name"] == task_name and self.refresh_tasks[index]["vim_id"] == vim_id:
tierno867ffe92017-03-27 12:50:34 +0200399 index_to_delete = index
400 break
401 else:
402 return False
403 if index_to_delete != None:
tierno868220c2017-09-26 00:11:05 +0200404 del self.refresh_tasks[index_to_delete]
tierno867ffe92017-03-27 12:50:34 +0200405 return True
tierno42026a02017-02-10 15:13:40 +0100406
tierno868220c2017-09-26 00:11:05 +0200407 def _proccess_pending_tasks(self):
408 nb_created = 0
409 nb_processed = 0
410 while self.pending_tasks:
411 task = self.pending_tasks.pop(0)
412 nb_processed += 1
413 if task["status"] == "SUPERSEDED":
414 # not needed to do anything but update database with the new status
415 result = True
416 database_update = None
tierno9c22f2d2017-10-09 16:23:55 +0200417 elif not self.vim:
418 task["status"] == "ERROR"
419 task["error_msg"] = self.error_status
420 result = False
421 database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
tierno868220c2017-09-26 00:11:05 +0200422 elif task["item"] == 'instance_vms':
423 if task["action"] == "CREATE":
424 result, database_update = self.new_vm(task)
425 nb_created += 1
426 elif task["action"] == "DELETE":
427 result, database_update = self.del_vm(task)
428 else:
429 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
430 elif task["item"] == 'instance_nets':
431 if task["action"] == "CREATE":
432 result, database_update = self.new_net(task)
433 nb_created += 1
434 elif task["action"] == "DELETE":
435 result, database_update = self.del_net(task)
436 elif task["action"] == "FIND":
437 result, database_update = self.get_net(task)
438 else:
439 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
440 else:
441 raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
442 # TODO
443
444 if task["status"] == "SCHEDULED":
445 # This is because a depend task is not completed. Moved to the end. NOT USED YET
446 if task["extra"].get("tries", 0) > 3:
447 task["status"] == "FAILED"
448 else:
449 task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
450 self.pending_tasks.append(task)
451 elif task["action"] == "DELETE":
452 action_key = task["item"] + task["item_id"]
453 del self.grouped_tasks[action_key]
454 elif task["action"] in ("CREATE", "FIND") and task["status"] in ("DONE", "BUILD"):
455 self._insert_refresh(task)
456
457 self.logger.debug("vim_action id={}.{} item={} action={} result={}:{} params={}".format(
458 task["instance_action_id"], task["task_index"], task["item"], task["action"],
459 task["status"], task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"),
460 task["params"]))
461 try:
462 now = time.time()
463 with self.db_lock:
464 self.db.update_rows(
465 table="vim_actions",
466 UPDATE={"status": task["status"], "vim_id": task["vim_id"], "modified_at": now,
467 "error_msg": task["error_msg"],
468 "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
469 WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
470 if result is not None:
471 self.db.update_rows(
472 table="instance_actions",
473 UPDATE={("number_done" if result else "number_failed"): {"INCREMENT": 1},
474 "modified_at": now},
475 WHERE={"uuid": task["instance_action_id"]})
476 if database_update:
477 self.db.update_rows(table=task["item"],
478 UPDATE=database_update,
479 WHERE={"uuid": task["item_id"]})
480 except db_base_Exception as e:
481 self.logger.error("Error updating database %s", str(e), exc_info=True)
482
483 if nb_created == 10:
484 break
485 return nb_processed
486
487 def _insert_pending_tasks(self, vim_actions_list):
488 now = time.time()
489 for task in vim_actions_list:
490 if task["datacenter_vim_id"] != self.datacenter_tenant_id:
491 continue
492 item = task["item"]
493 item_id = task["item_id"]
494 action_key = item + item_id
495 if action_key not in self.grouped_tasks:
496 self.grouped_tasks[action_key] = []
497 task["params"] = None
498 task["depends"] = {}
499 if task["extra"]:
500 extra = yaml.load(task["extra"])
501 task["extra"] = extra
502 task["params"] = extra.get("params")
503 depends_on_list = extra.get("depends_on")
504 if depends_on_list:
505 for index in depends_on_list:
506 if index < len(vim_actions_list) and vim_actions_list[index]["task_index"] == index and\
507 vim_actions_list[index]["instance_action_id"] == task["instance_action_id"]:
508 task["depends"]["TASK-" + str(index)] = vim_actions_list[index]
509 if extra.get("interfaces"):
510 task["vim_interfaces"] = {}
511 else:
512 task["extra"] = {}
513 if "error_msg" not in task:
514 task["error_msg"] = None
515 if "vim_id" not in task:
516 task["vim_id"] = None
517
518 if task["action"] == "DELETE":
519 need_delete_action = False
520 for to_supersede in self.grouped_tasks.get(action_key, ()):
521 if to_supersede["action"] == "FIND" and to_supersede.get("vim_id"):
522 task["vim_id"] = to_supersede["vim_id"]
523 if to_supersede["action"] == "CREATE" and to_supersede.get("vim_id"):
524 need_delete_action = True
525 task["vim_id"] = to_supersede["vim_id"]
526 if to_supersede["extra"].get("sdn_vim_id"):
527 task["extra"]["sdn_vim_id"] = to_supersede["extra"]["sdn_vim_id"]
528 if to_supersede["extra"].get("interfaces"):
529 task["extra"]["interfaces"] = to_supersede["extra"]["interfaces"]
530 # Mark task as SUPERSEDED.
531 # If task is in self.pending_tasks, it will be removed and database will be update
532 # If task is in self.refresh_tasks, it will be removed
533 to_supersede["status"] = "SUPERSEDED"
534 if not need_delete_action:
535 task["status"] = "SUPERSEDED"
536
537 self.grouped_tasks[action_key].append(task)
538 self.pending_tasks.append(task)
539 elif task["status"] == "SCHEDULED":
540 self.grouped_tasks[action_key].append(task)
541 self.pending_tasks.append(task)
542 elif task["action"] in ("CREATE", "FIND"):
543 self.grouped_tasks[action_key].append(task)
544 if task["status"] in ("DONE", "BUILD"):
545 self._insert_refresh(task)
546 # TODO add VM reset, get console, etc...
547 else:
548 raise vimconn.vimconnException(self.name + "unknown vim_action action {}".format(task["action"]))
549
tiernob3d36742017-03-03 23:51:05 +0100550 def insert_task(self, task):
tierno42026a02017-02-10 15:13:40 +0100551 try:
tiernob3d36742017-03-03 23:51:05 +0100552 self.task_queue.put(task, False)
tierno868220c2017-09-26 00:11:05 +0200553 return None
tierno42026a02017-02-10 15:13:40 +0100554 except Queue.Full:
tiernob3d36742017-03-03 23:51:05 +0100555 raise vimconn.vimconnException(self.name + ": timeout inserting a task")
556
557 def del_task(self, task):
558 with self.task_lock:
tierno868220c2017-09-26 00:11:05 +0200559 if task["status"] == "SCHEDULED":
560 task["status"] == "SUPERSEDED"
tiernob3d36742017-03-03 23:51:05 +0100561 return True
562 else: # task["status"] == "processing"
563 self.task_lock.release()
564 return False
tierno42026a02017-02-10 15:13:40 +0100565
566 def run(self):
567 self.logger.debug("Starting")
568 while True:
tierno868220c2017-09-26 00:11:05 +0200569 self._reload_vim_actions()
570 reload_thread = False
tierno42026a02017-02-10 15:13:40 +0100571 while True:
tierno639520f2017-04-05 19:55:36 +0200572 try:
tierno868220c2017-09-26 00:11:05 +0200573 while not self.task_queue.empty():
tierno639520f2017-04-05 19:55:36 +0200574 task = self.task_queue.get()
tierno868220c2017-09-26 00:11:05 +0200575 if isinstance(task, list):
576 self._insert_pending_tasks(task)
577 elif isinstance(task, str):
578 if task == 'exit':
579 return 0
580 elif task == 'reload':
581 reload_thread = True
582 break
583 self.task_queue.task_done()
584 if reload_thread:
tierno639520f2017-04-05 19:55:36 +0200585 break
tierno868220c2017-09-26 00:11:05 +0200586 nb_processed = self._proccess_pending_tasks()
587 nb_processed += self._refres_elements()
588 if not nb_processed:
589 time.sleep(1)
590
tierno639520f2017-04-05 19:55:36 +0200591 except Exception as e:
592 self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
tierno42026a02017-02-10 15:13:40 +0100593
594 self.logger.debug("Finishing")
595
tiernob3d36742017-03-03 23:51:05 +0100596 def terminate(self, task):
597 return True, None
598
tierno868220c2017-09-26 00:11:05 +0200599 def _look_for_task(self, instance_action_id, task_id):
600 task_index = task_id.split("-")[-1]
601 with self.db_lock:
602 tasks = self.db.get_rows(FROM="vim_actions", WHERE={"instance_action_id": instance_action_id,
603 "task_index": task_index})
604 if not tasks:
605 return None
606 task = tasks[0]
607 task["params"] = None
608 task["depends"] = {}
609 if task["extra"]:
610 extra = yaml.load(task["extra"])
611 task["extra"] = extra
612 task["params"] = extra.get("params")
613 if extra.get("interfaces"):
614 task["vim_interfaces"] = {}
615 else:
616 task["extra"] = {}
617 return task
618
tierno639520f2017-04-05 19:55:36 +0200619 def _format_vim_error_msg(self, error_text, max_length=1024):
620 if error_text and len(error_text) >= max_length:
621 return error_text[:max_length//2-3] + " ... " + error_text[-max_length//2+3:]
tierno95a1ae52017-03-16 13:17:24 +0100622 return error_text
623
tiernob3d36742017-03-03 23:51:05 +0100624 def new_net(self, task):
625 try:
tierno868220c2017-09-26 00:11:05 +0200626 task_id = task["instance_action_id"] + "." + str(task["task_index"])
tiernob3d36742017-03-03 23:51:05 +0100627 params = task["params"]
tierno868220c2017-09-26 00:11:05 +0200628 vim_net_id = self.vim.new_network(*params)
Pablo Montes Moreno3fbff9b2017-03-08 11:28:15 +0100629
630 net_name = params[0]
631 net_type = params[1]
632
633 network = None
tiernof2bd1e82017-04-06 14:25:39 +0200634 sdn_net_id = None
Pablo Montes Moreno7e0e9c62017-03-27 12:42:32 +0200635 sdn_controller = self.vim.config.get('sdn-controller')
Pablo Montes Moreno3fbff9b2017-03-08 11:28:15 +0100636 if sdn_controller and (net_type == "data" or net_type == "ptp"):
tierno518a8662017-05-30 11:35:46 +0200637 network = {"name": net_name, "type": net_type, "region": self.vim["config"]["datacenter_id"]}
Pablo Montes Moreno3fbff9b2017-03-08 11:28:15 +0100638
tierno868220c2017-09-26 00:11:05 +0200639 vim_net = self.vim.get_network(vim_net_id)
Pablo Montes Moreno51e553b2017-03-23 16:39:12 +0100640 if vim_net.get('encapsulation') != 'vlan':
tierno91baf982017-03-30 19:37:57 +0200641 raise vimconn.vimconnException(
642 "net '{}' defined as type '{}' has not vlan encapsulation '{}'".format(
643 net_name, net_type, vim_net['encapsulation']))
Pablo Montes Moreno3fbff9b2017-03-08 11:28:15 +0100644 network["vlan"] = vim_net.get('segmentation_id')
tierno91baf982017-03-30 19:37:57 +0200645 try:
tierno2391cc12017-08-02 12:48:13 +0200646 with self.db_lock:
647 sdn_net_id = self.ovim.new_network(network)
tierno91baf982017-03-30 19:37:57 +0200648 except (ovimException, Exception) as e:
649 self.logger.error("task=%s cannot create SDN network vim_net_id=%s input='%s' ovimException='%s'",
tierno868220c2017-09-26 00:11:05 +0200650 str(task_id), vim_net_id, str(network), str(e))
651 task["status"] = "DONE"
652 task["extra"]["vim_info"] = {}
653 task["extra"]["sdn_net_id"] = sdn_net_id
654 task["error_msg"] = None
655 task["vim_id"] = vim_net_id
656 instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "BUILD", "error_msg": None}
657 return True, instance_element_update
tiernob3d36742017-03-03 23:51:05 +0100658 except vimconn.vimconnException as e:
tierno95a1ae52017-03-16 13:17:24 +0100659 self.logger.error("Error creating NET, task=%s: %s", str(task_id), str(e))
tierno868220c2017-09-26 00:11:05 +0200660 task["status"] = "FAILED"
661 task["vim_id"] = None
662 task["error_msg"] = self._format_vim_error_msg(str(e))
663 instance_element_update = {"vim_net_id": None, "sdn_net_id": None, "status": "VIM_ERROR", "error_msg": task["error_msg"]}
664 return False, instance_element_update
tiernob3d36742017-03-03 23:51:05 +0100665
666 def new_vm(self, task):
667 try:
668 params = task["params"]
tierno868220c2017-09-26 00:11:05 +0200669 task_id = task["instance_action_id"] + "." + str(task["task_index"])
tiernob3d36742017-03-03 23:51:05 +0100670 depends = task.get("depends")
671 net_list = params[5]
tierno867ffe92017-03-27 12:50:34 +0200672 error_text = ""
tiernob3d36742017-03-03 23:51:05 +0100673 for net in net_list:
tierno867ffe92017-03-27 12:50:34 +0200674 if "net_id" in net and is_task_id(net["net_id"]): # change task_id into network_id
tierno868220c2017-09-26 00:11:05 +0200675 if net["net_id"] in depends:
tiernob3d36742017-03-03 23:51:05 +0100676 task_net = depends[net["net_id"]]
tierno867ffe92017-03-27 12:50:34 +0200677 else:
tierno868220c2017-09-26 00:11:05 +0200678 task_net = self._look_for_task(task["instance_action_id"], net["net_id"])
679 if not task_net:
680 raise VimThreadException(
681 "Error trying to get depending task from task_index={}".format(net["net_id"]))
682 network_id = task_net.get("vim_id")
683 if not network_id:
684 raise VimThreadException(
685 "Cannot create VM because depends on a network not created or found: " +
686 str(task_net["error_msg"]))
687 net["net_id"] = network_id
688 vim_vm_id = self.vim.new_vminstance(*params)
689
690 # fill task_interfaces. Look for snd_net_id at database for each interface
691 task_interfaces = {}
692 for iface in net_list:
693 task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]}
tierno95a1ae52017-03-16 13:17:24 +0100694 with self.db_lock:
tierno868220c2017-09-26 00:11:05 +0200695 result = self.db.get_rows(SELECT=('sdn_net_id',),
696 FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
697 WHERE={'ii.uuid': iface["uuid"]})
698 if result:
699 task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id']
700 else:
701 self.logger.critical("Error creating VM, task=%s Network {} not found at DB", task_id,
702 iface["uuid"], exc_info=True)
703
704 task["vim_info"] = {}
705 task["vim_interfaces"] = {}
706 task["extra"]["interfaces"] = task_interfaces
707 task["error_msg"] = None
708 task["status"] = "DONE"
709 task["vim_id"] = vim_vm_id
710 instance_element_update = {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None}
711 return True, instance_element_update
712
713 except (vimconn.vimconnException, VimThreadException) as e:
714 self.logger.error("Error creating VM, task=%s: %s", task_id, str(e))
715 error_text = self._format_vim_error_msg(str(e))
716 task["error_msg"] = error_text
717 task["status"] = "FAILED"
718 task["vim_id"] = None
719 instance_element_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": error_text}
720 return False, instance_element_update
tiernob3d36742017-03-03 23:51:05 +0100721
722 def del_vm(self, task):
tierno868220c2017-09-26 00:11:05 +0200723 vm_vim_id = task["vim_id"]
724 interfaces = task["extra"].get("interfaces", ())
tiernob3d36742017-03-03 23:51:05 +0100725 try:
tierno868220c2017-09-26 00:11:05 +0200726 for iface in interfaces.values():
tierno867ffe92017-03-27 12:50:34 +0200727 if iface.get("sdn_port_id"):
728 try:
tierno2391cc12017-08-02 12:48:13 +0200729 with self.db_lock:
730 self.ovim.delete_port(iface["sdn_port_id"])
tierno867ffe92017-03-27 12:50:34 +0200731 except ovimException as e:
732 self.logger.error("ovimException deleting external_port={} at VM vim_id={} deletion ".format(
tierno868220c2017-09-26 00:11:05 +0200733 iface["sdn_port_id"], vm_vim_id) + str(e), exc_info=True)
tierno867ffe92017-03-27 12:50:34 +0200734 # TODO Set error_msg at instance_nets
735
tierno868220c2017-09-26 00:11:05 +0200736 self.vim.delete_vminstance(vm_vim_id)
737 task["status"] = "DONE"
738 task["error_msg"] = None
739 return True, None
740
tiernob3d36742017-03-03 23:51:05 +0100741 except vimconn.vimconnException as e:
tierno868220c2017-09-26 00:11:05 +0200742 task["error_msg"] = self._format_vim_error_msg(str(e))
743 if isinstance(e, vimconn.vimconnNotFoundException):
744 # If not found mark as Done and fill error_msg
745 task["status"] = "DONE"
746 return True, None
747 task["status"] = "FAILED"
748 return False, None
tiernob3d36742017-03-03 23:51:05 +0100749
750 def del_net(self, task):
tierno868220c2017-09-26 00:11:05 +0200751 net_vim_id = task["vim_id"]
752 sdn_net_id = task["extra"].get("sdn_net_id")
tiernob3d36742017-03-03 23:51:05 +0100753 try:
Pablo Montes Moreno3fbff9b2017-03-08 11:28:15 +0100754 if sdn_net_id:
tierno868220c2017-09-26 00:11:05 +0200755 # Delete any attached port to this sdn network. There can be ports associated to this network in case
756 # it was manually done using 'openmano vim-net-sdn-attach'
Pablo Montes Moreno3fbff9b2017-03-08 11:28:15 +0100757 with self.db_lock:
tierno868220c2017-09-26 00:11:05 +0200758 port_list = self.ovim.get_ports(columns={'uuid'},
759 filter={'name': 'external_port', 'net_id': sdn_net_id})
760 for port in port_list:
761 self.ovim.delete_port(port['uuid'])
Pablo Montes Moreno3fbff9b2017-03-08 11:28:15 +0100762 self.ovim.delete_network(sdn_net_id)
tierno868220c2017-09-26 00:11:05 +0200763 self.vim.delete_network(net_vim_id)
764 task["status"] = "DONE"
765 task["error_msg"] = None
766 return True, None
Pablo Montes Moreno7e0e9c62017-03-27 12:42:32 +0200767 except ovimException as e:
tierno868220c2017-09-26 00:11:05 +0200768 task["error_msg"] = self._format_vim_error_msg("ovimException obtaining and deleting external "
769 "ports for net {}: {}".format(sdn_net_id, str(e)))
770 except vimconn.vimconnException as e:
771 task["error_msg"] = self._format_vim_error_msg(str(e))
772 if isinstance(e, vimconn.vimconnNotFoundException):
773 # If not found mark as Done and fill error_msg
774 task["status"] = "DONE"
775 return True, None
776 task["status"] = "FAILED"
777 return False, None
Pablo Montes Moreno7e0e9c62017-03-27 12:42:32 +0200778
tierno868220c2017-09-26 00:11:05 +0200779 def get_net(self, task):
780 try:
781 task_id = task["instance_action_id"] + "." + str(task["task_index"])
782 params = task["params"]
783 filter = params[0]
784 vim_nets = self.vim.get_network_list(filter)
785 if not vim_nets:
786 raise VimThreadException("Network not found with this criteria: '{}'".format(filter))
787 elif len(vim_nets) > 1:
788 raise VimThreadException("More than one network found with this criteria: '{}'".format(filter))
789 vim_net_id = vim_nets[0]["id"]
tierno42026a02017-02-10 15:13:40 +0100790
tierno868220c2017-09-26 00:11:05 +0200791 # Discover if this network is managed by a sdn controller
792 sdn_net_id = None
793 with self.db_lock:
794 result = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets',
795 WHERE={'vim_net_id': vim_net_id, 'instance_scenario_id': None,
796 'datacenter_tenant_id': self.datacenter_tenant_id})
797 if result:
798 sdn_net_id = result[0]['sdn_net_id']
799
800 task["status"] = "DONE"
801 task["extra"]["vim_info"] = {}
802 task["extra"]["created"] = False
803 task["extra"]["sdn_net_id"] = sdn_net_id
804 task["error_msg"] = None
805 task["vim_id"] = vim_net_id
806 instance_element_update = {"vim_net_id": vim_net_id, "created": False, "status": "BUILD",
807 "error_msg": None}
808 return True, instance_element_update
809 except (vimconn.vimconnException, VimThreadException) as e:
810 self.logger.error("Error looking for NET, task=%s: %s", str(task_id), str(e))
811 task["status"] = "FAILED"
812 task["vim_id"] = None
813 task["error_msg"] = self._format_vim_error_msg(str(e))
814 instance_element_update = {"vim_net_id": None, "status": "VIM_ERROR",
815 "error_msg": task["error_msg"]}
816 return False, instance_element_update