bug 331 Ensure neutron port is deleted upon vm creation failed
[osm/RO.git] / osm_ro / vim_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 """"
25 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
26 The tasks are stored at database in table vim_actions
27 The task content are (M: stored at memory, D: stored at database):
28 MD instance_action_id: reference a global action over an instance-scenario: database instance_actions
29 MD task_index: index number of the task. This with the previous are a key
30 MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread
31 MD vim_id: id of the vm,net,etc at VIM
32 MD action: CREATE, DELETE, FIND
33 MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
34 MD item_id: uuid of the referenced entry in the preious table
35 MD status: SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED
36 MD extra: text with yaml format at database, dict at memory with:
37 params: list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken from other related tasks
38 find: (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains the FIND params
39 depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm depends on a net
40 sdn_net_id: used for net.
41 tries:
42 interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
43 iface_id: uuid of intance_interfaces
44 sdn_port_id:
45 sdn_net_id:
46 created_items: dictionary with extra elements created that need to be deleted. e.g. ports, volumes,...
47 created: False if the VIM element is not created by other actions, and it should not be deleted
48 vim_status: VIM status of the element. Stored also at database in the instance_XXX
49 M depends: dict with task_index(from depends_on) to task class
50 M params: same as extra[params] but with the resolved dependencies
51 M vim_interfaces: similar to extra[interfaces] but with VIM information. Stored at database in the instance_XXX but not at vim_actions
52 M vim_info: Detailed information of a vm,net from the VIM. Stored at database in the instance_XXX but not at vim_actions
53 MD error_msg: descriptive text upon an error.Stored also at database instance_XXX
54 MD created_at: task creation time
55 MD modified_at: last task update time. On refresh it contains when this task need to be refreshed
56
57 """
58
59 import threading
60 import time
61 import Queue
62 import logging
63 import vimconn
64 import yaml
65 from db_base import db_base_Exception
66 from lib_osm_openvim.ovim import ovimException
67
68 __author__ = "Alfonso Tierno, Pablo Montes"
69 __date__ = "$28-Sep-2017 12:07:15$"
70
71 # from logging import Logger
72 # import auxiliary_functions as af
73
74
75 def is_task_id(task_id):
76 return task_id.startswith("TASK-")
77
78
79 class VimThreadException(Exception):
80 pass
81
82
83 class VimThreadExceptionNotFound(VimThreadException):
84 pass
85
86
87 class vim_thread(threading.Thread):
88 REFRESH_BUILD = 5 # 5 seconds
89 REFRESH_ACTIVE = 60 # 1 minute
90
91 def __init__(self, myvimconn, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None,
92 db=None, db_lock=None, ovim=None):
93 """Init a thread.
94 Arguments:
95 'id' number of thead
96 'name' name of thread
97 'host','user': host ip or name to manage and user
98 'db', 'db_lock': database class and lock to use it in exclusion
99 """
100 threading.Thread.__init__(self)
101 if isinstance(myvimconn, vimconn.vimconnException):
102 self.vim = None
103 self.error_status = "Error accesing to VIM: {}".format(myvimconn)
104 else:
105 self.vim = myvimconn
106 self.error_status = None
107 self.datacenter_name = datacenter_name
108 self.datacenter_tenant_id = datacenter_tenant_id
109 self.ovim = ovim
110 if not name:
111 self.name = vimconn["id"] + "." + vimconn["config"]["datacenter_tenant_id"]
112 else:
113 self.name = name
114
115 self.logger = logging.getLogger('openmano.vim.'+self.name)
116 self.db = db
117 self.db_lock = db_lock
118
119 self.task_lock = task_lock
120 self.task_queue = Queue.Queue(2000)
121
122 self.refresh_tasks = []
123 """Contains time ordered task list for refreshing the status of VIM VMs and nets"""
124
125 self.pending_tasks = []
126 """Contains time ordered task list for creation, deletion of VIM VMs and nets"""
127
128 self.grouped_tasks = {}
129 """ It contains all the creation/deletion pending tasks grouped by its concrete vm, net, etc
130 <item><item_id>:
131 - <task1> # e.g. CREATE task
132 <task2> # e.g. DELETE task
133 """
134
135 def _reload_vim_actions(self):
136 """
137 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
138 :return: None
139 """
140 action_completed = False
141 task_list = []
142 old_action_key = None
143
144 with self.db_lock:
145 vim_actions = self.db.get_rows(FROM="vim_actions",
146 WHERE={"datacenter_vim_id": self.datacenter_tenant_id },
147 ORDER_BY=("item", "item_id", "created_at",))
148 for task in vim_actions:
149 item = task["item"]
150 item_id = task["item_id"]
151 action_key = item + item_id
152 if old_action_key != action_key:
153 if not action_completed and task_list:
154 # This will fill needed task parameters into memory, and insert the task if needed in
155 # self.pending_tasks or self.refresh_tasks
156 self._insert_pending_tasks(task_list)
157 task_list = []
158 old_action_key = action_key
159 action_completed = False
160 elif action_completed:
161 continue
162
163 if task["status"] == "SCHEDULED" or task["action"] == "CREATE" or task["action"] == "FIND":
164 task_list.append(task)
165 elif task["action"] == "DELETE":
166 # action completed because deleted and status is not SCHEDULED. Not needed anything
167 action_completed = True
168
169 # Last actions group need to be inserted too
170 if not action_completed and task_list:
171 self._insert_pending_tasks(task_list)
172
173 def _refres_elements(self):
174 """Call VIM to get VMs and networks status until 10 elements"""
175 now = time.time()
176 nb_processed = 0
177 vm_to_refresh_list = []
178 net_to_refresh_list = []
179 vm_to_refresh_dict = {}
180 net_to_refresh_dict = {}
181 items_to_refresh = 0
182 while self.refresh_tasks:
183 task = self.refresh_tasks[0]
184 with self.task_lock:
185 if task['status'] == 'SUPERSEDED':
186 self.refresh_tasks.pop(0)
187 continue
188 if task['modified_at'] > now:
189 break
190 # task["status"] = "processing"
191 nb_processed += 1
192 self.refresh_tasks.pop(0)
193 if task["item"] == 'instance_vms':
194 vm_to_refresh_list.append(task["vim_id"])
195 vm_to_refresh_dict[task["vim_id"]] = task
196 elif task["item"] == 'instance_nets':
197 net_to_refresh_list.append(task["vim_id"])
198 net_to_refresh_dict[task["vim_id"]] = task
199 else:
200 error_text = "unknown task {}".format(task["item"])
201 self.logger.error(error_text)
202 items_to_refresh += 1
203 if items_to_refresh == 10:
204 break
205
206 if vm_to_refresh_list:
207 try:
208 now = time.time()
209 vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list)
210 for vim_id, vim_info in vim_dict.items():
211 # look for task
212 task_need_update = False
213 task = vm_to_refresh_dict[vim_id]
214 self.logger.debug("get-vm vm_id=%s result=%s", task["vim_id"], str(vim_info))
215
216 # update database
217 task_vim_info = task.get("vim_info")
218 task_error_msg = task.get("error_msg")
219 task_vim_status = task["extra"].get("vim_status")
220 if vim_info.get("error_msg"):
221 vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"])
222 if task_vim_status != vim_info["status"] or task_error_msg != vim_info.get("error_msg") or \
223 task_vim_info != vim_info.get("vim_info"):
224 with self.db_lock:
225 temp_dict = {"status": vim_info["status"],
226 "error_msg": vim_info.get("error_msg"),
227 "vim_info": vim_info.get("vim_info")}
228 self.db.update_rows('instance_vms', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
229 task["extra"]["vim_status"] = vim_info["status"]
230 task["error_msg"] = vim_info.get("error_msg")
231 task["vim_info"] = vim_info.get("vim_info")
232 task_need_update = True
233 for interface in vim_info.get("interfaces", ()):
234 vim_interface_id = interface["vim_interface_id"]
235 if vim_interface_id not in task["extra"]["interfaces"]:
236 self.logger.critical("Interface not found {} on task info {}".format(
237 vim_interface_id, task["extra"]["interfaces"]), exc_info=True)
238 continue
239 task_interface = task["extra"]["interfaces"][vim_interface_id]
240 task_vim_interface = task["vim_interfaces"].get(vim_interface_id)
241 if task_vim_interface != interface:
242 # delete old port
243 if task_interface.get("sdn_port_id"):
244 try:
245 with self.db_lock:
246 self.ovim.delete_port(task_interface["sdn_port_id"])
247 task_interface["sdn_port_id"] = None
248 task_need_update = True
249 except ovimException as e:
250 self.logger.error("ovimException deleting external_port={} ".format(
251 task_interface["sdn_port_id"]) + str(e), exc_info=True)
252 # TODO Set error_msg at instance_nets
253
254 # Create SDN port
255 sdn_net_id = task_interface.get("sdn_net_id")
256 if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
257 sdn_port_name = sdn_net_id + "." + task["vim_id"]
258 sdn_port_name = sdn_port_name[:63]
259 try:
260 with self.db_lock:
261 sdn_port_id = self.ovim.new_external_port(
262 {"compute_node": interface["compute_node"],
263 "pci": interface["pci"],
264 "vlan": interface.get("vlan"),
265 "net_id": sdn_net_id,
266 "region": self.vim["config"]["datacenter_id"],
267 "name": sdn_port_name,
268 "mac": interface.get("mac_address")})
269 task_interface["sdn_port_id"] = sdn_port_id
270 task_need_update = True
271 except (ovimException, Exception) as e:
272 self.logger.error(
273 "ovimException creating new_external_port compute_node={} "
274 "pci={} vlan={} ".format(
275 interface["compute_node"],
276 interface["pci"],
277 interface.get("vlan")) + str(e),
278 exc_info=True)
279 # TODO Set error_msg at instance_nets
280 with self.db_lock:
281 self.db.update_rows(
282 'instance_interfaces',
283 UPDATE={"mac_address": interface.get("mac_address"),
284 "ip_address": interface.get("ip_address"),
285 "vim_info": interface.get("vim_info"),
286 "sdn_port_id": task_interface.get("sdn_port_id"),
287 "compute_node": interface.get("compute_node"),
288 "pci": interface.get("pci"),
289 "vlan": interface.get("vlan"),
290 },
291 WHERE={'uuid': task_interface["iface_id"]})
292 task["vim_interfaces"][vim_interface_id] = interface
293 if task_need_update:
294 with self.db_lock:
295 self.db.update_rows(
296 'vim_actions',
297 UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
298 "error_msg": task.get("error_msg"), "modified_at": now},
299 WHERE={'instance_action_id': task['instance_action_id'],
300 'task_index': task['task_index']})
301 if task["extra"].get("vim_status") == "BUILD":
302 self._insert_refresh(task, now + self.REFRESH_BUILD)
303 else:
304 self._insert_refresh(task, now + self.REFRESH_ACTIVE)
305 except vimconn.vimconnException as e:
306 self.logger.error("vimconnException Exception when trying to refresh vms " + str(e))
307 self._insert_refresh(task, now + self.REFRESH_ACTIVE)
308
309 if net_to_refresh_list:
310 try:
311 now = time.time()
312 vim_dict = self.vim.refresh_nets_status(net_to_refresh_list)
313 for vim_id, vim_info in vim_dict.items():
314 # look for task
315 task = net_to_refresh_dict[vim_id]
316 self.logger.debug("get-net net_id=%s result=%s", task["vim_id"], str(vim_info))
317
318 task_vim_info = task.get("vim_info")
319 task_vim_status = task["extra"].get("vim_status")
320 task_error_msg = task.get("error_msg")
321 task_sdn_net_id = task["extra"].get("sdn_net_id")
322
323 # get ovim status
324 if task_sdn_net_id:
325 try:
326 with self.db_lock:
327 sdn_net = self.ovim.show_network(task_sdn_net_id)
328 if sdn_net["status"] == "ERROR":
329 if not vim_info.get("error_msg"):
330 vim_info["error_msg"] = sdn_net["error_msg"]
331 else:
332 vim_info["error_msg"] = "VIM_ERROR: {} && SDN_ERROR: {}".format(
333 self._format_vim_error_msg(vim_info["error_msg"], 1024//2-14),
334 self._format_vim_error_msg(sdn_net["error_msg"], 1024//2-14))
335 if vim_info["status"] == "VIM_ERROR":
336 vim_info["status"] = "VIM_SDN_ERROR"
337 else:
338 vim_info["status"] = "SDN_ERROR"
339
340 except (ovimException, Exception) as e:
341 self.logger.error(
342 "ovimException getting network infor snd_net_id={}".format(task_sdn_net_id),
343 exc_info=True)
344 # TODO Set error_msg at instance_nets
345
346 # update database
347 if vim_info.get("error_msg"):
348 vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"])
349 if task_vim_status != vim_info["status"] or task_error_msg != vim_info.get("error_msg") or \
350 task_vim_info != vim_info["vim_info"]:
351 task["extra"]["vim_status"] = vim_info["status"]
352 task["error_msg"] = vim_info.get("error_msg")
353 task["vim_info"] = vim_info["vim_info"]
354 temp_dict = {"status": vim_info["status"],
355 "error_msg": vim_info.get("error_msg"),
356 "vim_info": vim_info["vim_info"]}
357 with self.db_lock:
358 self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
359 self.db.update_rows(
360 'vim_actions',
361 UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
362 "error_msg": task.get("error_msg"), "modified_at": now},
363 WHERE={'instance_action_id': task['instance_action_id'],
364 'task_index': task['task_index']})
365 if task["extra"].get("vim_status") == "BUILD":
366 self._insert_refresh(task, now + self.REFRESH_BUILD)
367 else:
368 self._insert_refresh(task, now + self.REFRESH_ACTIVE)
369 except vimconn.vimconnException as e:
370 self.logger.error("vimconnException Exception when trying to refresh nets " + str(e))
371 self._insert_refresh(task, now + self.REFRESH_ACTIVE)
372
373 return nb_processed
374
375 def _insert_refresh(self, task, threshold_time=None):
376 """Insert a task at list of refreshing elements. The refreshing list is ordered by threshold_time (task['modified_at']
377 It is assumed that this is called inside this thread
378 """
379 if not self.vim:
380 return
381 if not threshold_time:
382 threshold_time = time.time()
383 task["modified_at"] = threshold_time
384 task_name = task["item"][9:] + "-" + task["action"]
385 task_id = task["instance_action_id"] + "." + str(task["task_index"])
386 for index in range(0, len(self.refresh_tasks)):
387 if self.refresh_tasks[index]["modified_at"] > threshold_time:
388 self.refresh_tasks.insert(index, task)
389 break
390 else:
391 index = len(self.refresh_tasks)
392 self.refresh_tasks.append(task)
393 self.logger.debug("new refresh task={} name={}, modified_at={} index={}".format(
394 task_id, task_name, task["modified_at"], index))
395
396 def _remove_refresh(self, task_name, vim_id):
397 """Remove a task with this name and vim_id from the list of refreshing elements.
398 It is assumed that this is called inside this thread outside _refres_elements method
399 Return True if self.refresh_list is modified, task is found
400 Return False if not found
401 """
402 index_to_delete = None
403 for index in range(0, len(self.refresh_tasks)):
404 if self.refresh_tasks[index]["name"] == task_name and self.refresh_tasks[index]["vim_id"] == vim_id:
405 index_to_delete = index
406 break
407 else:
408 return False
409 if index_to_delete != None:
410 del self.refresh_tasks[index_to_delete]
411 return True
412
413 def _proccess_pending_tasks(self):
414 nb_created = 0
415 nb_processed = 0
416 while self.pending_tasks:
417 task = self.pending_tasks.pop(0)
418 nb_processed += 1
419 if task["status"] == "SUPERSEDED":
420 # not needed to do anything but update database with the new status
421 result = True
422 database_update = None
423 elif not self.vim:
424 task["status"] == "ERROR"
425 task["error_msg"] = self.error_status
426 result = False
427 database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
428 elif task["item"] == 'instance_vms':
429 if task["action"] == "CREATE":
430 result, database_update = self.new_vm(task)
431 nb_created += 1
432 elif task["action"] == "DELETE":
433 result, database_update = self.del_vm(task)
434 else:
435 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
436 elif task["item"] == 'instance_nets':
437 if task["action"] == "CREATE":
438 result, database_update = self.new_net(task)
439 nb_created += 1
440 elif task["action"] == "DELETE":
441 result, database_update = self.del_net(task)
442 elif task["action"] == "FIND":
443 result, database_update = self.get_net(task)
444 else:
445 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
446 else:
447 raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
448 # TODO
449
450 if task["status"] == "SCHEDULED":
451 # This is because a depend task is not completed. Moved to the end. NOT USED YET
452 if task["extra"].get("tries", 0) > 3:
453 task["status"] == "FAILED"
454 else:
455 task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
456 self.pending_tasks.append(task)
457 elif task["action"] == "DELETE":
458 action_key = task["item"] + task["item_id"]
459 del self.grouped_tasks[action_key]
460 elif task["action"] in ("CREATE", "FIND") and task["status"] in ("DONE", "BUILD"):
461 self._insert_refresh(task)
462
463 self.logger.debug("vim_action id={}.{} item={} action={} result={}:{} params={}".format(
464 task["instance_action_id"], task["task_index"], task["item"], task["action"],
465 task["status"], task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"),
466 task["params"]))
467 try:
468 now = time.time()
469 with self.db_lock:
470 self.db.update_rows(
471 table="vim_actions",
472 UPDATE={"status": task["status"], "vim_id": task["vim_id"], "modified_at": now,
473 "error_msg": task["error_msg"],
474 "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
475 WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
476 if result is not None:
477 self.db.update_rows(
478 table="instance_actions",
479 UPDATE={("number_done" if result else "number_failed"): {"INCREMENT": 1},
480 "modified_at": now},
481 WHERE={"uuid": task["instance_action_id"]})
482 if database_update:
483 self.db.update_rows(table=task["item"],
484 UPDATE=database_update,
485 WHERE={"uuid": task["item_id"]})
486 except db_base_Exception as e:
487 self.logger.error("Error updating database %s", str(e), exc_info=True)
488
489 if nb_created == 10:
490 break
491 return nb_processed
492
493 def _insert_pending_tasks(self, vim_actions_list):
494 now = time.time()
495 for task in vim_actions_list:
496 if task["datacenter_vim_id"] != self.datacenter_tenant_id:
497 continue
498 item = task["item"]
499 item_id = task["item_id"]
500 action_key = item + item_id
501 if action_key not in self.grouped_tasks:
502 self.grouped_tasks[action_key] = []
503 task["params"] = None
504 task["depends"] = {}
505 if task["extra"]:
506 extra = yaml.load(task["extra"])
507 task["extra"] = extra
508 task["params"] = extra.get("params")
509 depends_on_list = extra.get("depends_on")
510 if depends_on_list:
511 for index in depends_on_list:
512 if index < len(vim_actions_list) and vim_actions_list[index]["task_index"] == index and\
513 vim_actions_list[index]["instance_action_id"] == task["instance_action_id"]:
514 task["depends"]["TASK-" + str(index)] = vim_actions_list[index]
515 if extra.get("interfaces"):
516 task["vim_interfaces"] = {}
517 else:
518 task["extra"] = {}
519 if "error_msg" not in task:
520 task["error_msg"] = None
521 if "vim_id" not in task:
522 task["vim_id"] = None
523
524 if task["action"] == "DELETE":
525 need_delete_action = False
526 for to_supersede in self.grouped_tasks.get(action_key, ()):
527 if to_supersede["action"] == "FIND" and to_supersede.get("vim_id"):
528 task["vim_id"] = to_supersede["vim_id"]
529 if to_supersede["action"] == "CREATE" and to_supersede.get("vim_id") and \
530 to_supersede["extra"].get("created", True):
531 need_delete_action = True
532 task["vim_id"] = to_supersede["vim_id"]
533 if to_supersede["extra"].get("sdn_vim_id"):
534 task["extra"]["sdn_vim_id"] = to_supersede["extra"]["sdn_vim_id"]
535 if to_supersede["extra"].get("interfaces"):
536 task["extra"]["interfaces"] = to_supersede["extra"]["interfaces"]
537 if to_supersede["extra"].get("created_items"):
538 if not task["extra"].get("created_items"):
539 task["extra"]["created_items"] = {}
540 task["extra"]["created_items"].update(to_supersede["extra"]["created_items"])
541 # Mark task as SUPERSEDED.
542 # If task is in self.pending_tasks, it will be removed and database will be update
543 # If task is in self.refresh_tasks, it will be removed
544 to_supersede["status"] = "SUPERSEDED"
545 if not need_delete_action:
546 task["status"] = "SUPERSEDED"
547
548 self.grouped_tasks[action_key].append(task)
549 self.pending_tasks.append(task)
550 elif task["status"] == "SCHEDULED":
551 self.grouped_tasks[action_key].append(task)
552 self.pending_tasks.append(task)
553 elif task["action"] in ("CREATE", "FIND"):
554 self.grouped_tasks[action_key].append(task)
555 if task["status"] in ("DONE", "BUILD"):
556 self._insert_refresh(task)
557 # TODO add VM reset, get console, etc...
558 else:
559 raise vimconn.vimconnException(self.name + "unknown vim_action action {}".format(task["action"]))
560
561 def insert_task(self, task):
562 try:
563 self.task_queue.put(task, False)
564 return None
565 except Queue.Full:
566 raise vimconn.vimconnException(self.name + ": timeout inserting a task")
567
568 def del_task(self, task):
569 with self.task_lock:
570 if task["status"] == "SCHEDULED":
571 task["status"] == "SUPERSEDED"
572 return True
573 else: # task["status"] == "processing"
574 self.task_lock.release()
575 return False
576
577 def run(self):
578 self.logger.debug("Starting")
579 while True:
580 self._reload_vim_actions()
581 reload_thread = False
582 while True:
583 try:
584 while not self.task_queue.empty():
585 task = self.task_queue.get()
586 if isinstance(task, list):
587 self._insert_pending_tasks(task)
588 elif isinstance(task, str):
589 if task == 'exit':
590 return 0
591 elif task == 'reload':
592 reload_thread = True
593 break
594 self.task_queue.task_done()
595 if reload_thread:
596 break
597 nb_processed = self._proccess_pending_tasks()
598 nb_processed += self._refres_elements()
599 if not nb_processed:
600 time.sleep(1)
601
602 except Exception as e:
603 self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
604
605 self.logger.debug("Finishing")
606
607 def terminate(self, task):
608 return True, None
609
610 def _look_for_task(self, instance_action_id, task_id):
611 task_index = task_id.split("-")[-1]
612 with self.db_lock:
613 tasks = self.db.get_rows(FROM="vim_actions", WHERE={"instance_action_id": instance_action_id,
614 "task_index": task_index})
615 if not tasks:
616 return None
617 task = tasks[0]
618 task["params"] = None
619 task["depends"] = {}
620 if task["extra"]:
621 extra = yaml.load(task["extra"])
622 task["extra"] = extra
623 task["params"] = extra.get("params")
624 if extra.get("interfaces"):
625 task["vim_interfaces"] = {}
626 else:
627 task["extra"] = {}
628 return task
629
630 def _format_vim_error_msg(self, error_text, max_length=1024):
631 if error_text and len(error_text) >= max_length:
632 return error_text[:max_length//2-3] + " ... " + error_text[-max_length//2+3:]
633 return error_text
634
635 def new_vm(self, task):
636 try:
637 params = task["params"]
638 task_id = task["instance_action_id"] + "." + str(task["task_index"])
639 depends = task.get("depends")
640 net_list = params[5]
641 error_text = ""
642 for net in net_list:
643 if "net_id" in net and is_task_id(net["net_id"]): # change task_id into network_id
644 if net["net_id"] in depends:
645 task_net = depends[net["net_id"]]
646 else:
647 task_net = self._look_for_task(task["instance_action_id"], net["net_id"])
648 if not task_net:
649 raise VimThreadException(
650 "Error trying to get depending task from task_index={}".format(net["net_id"]))
651 network_id = task_net.get("vim_id")
652 if not network_id:
653 raise VimThreadException(
654 "Cannot create VM because depends on a network not created or found: " +
655 str(task_net["error_msg"]))
656 net["net_id"] = network_id
657 vim_vm_id, created_items = self.vim.new_vminstance(*params)
658
659 # fill task_interfaces. Look for snd_net_id at database for each interface
660 task_interfaces = {}
661 for iface in net_list:
662 task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]}
663 with self.db_lock:
664 result = self.db.get_rows(SELECT=('sdn_net_id',),
665 FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
666 WHERE={'ii.uuid': iface["uuid"]})
667 if result:
668 task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id']
669 else:
670 self.logger.critical("Error creating VM, task=%s Network {} not found at DB", task_id,
671 iface["uuid"], exc_info=True)
672
673 task["vim_info"] = {}
674 task["vim_interfaces"] = {}
675 task["extra"]["interfaces"] = task_interfaces
676 task["extra"]["created"] = True
677 task["extra"]["created_items"] = created_items
678 task["error_msg"] = None
679 task["status"] = "DONE"
680 task["vim_id"] = vim_vm_id
681 instance_element_update = {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None}
682 return True, instance_element_update
683
684 except (vimconn.vimconnException, VimThreadException) as e:
685 self.logger.error("Error creating VM, task=%s: %s", task_id, str(e))
686 error_text = self._format_vim_error_msg(str(e))
687 task["error_msg"] = error_text
688 task["status"] = "FAILED"
689 task["vim_id"] = None
690 instance_element_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": error_text}
691 return False, instance_element_update
692
693 def del_vm(self, task):
694 vm_vim_id = task["vim_id"]
695 interfaces = task["extra"].get("interfaces", ())
696 try:
697 for iface in interfaces.values():
698 if iface.get("sdn_port_id"):
699 try:
700 with self.db_lock:
701 self.ovim.delete_port(iface["sdn_port_id"])
702 except ovimException as e:
703 self.logger.error("ovimException deleting external_port={} at VM vim_id={} deletion ".format(
704 iface["sdn_port_id"], vm_vim_id) + str(e), exc_info=True)
705 # TODO Set error_msg at instance_nets
706
707 self.vim.delete_vminstance(vm_vim_id, task["extra"].get("created_items"))
708 task["status"] = "DONE"
709 task["error_msg"] = None
710 return True, None
711
712 except vimconn.vimconnException as e:
713 task["error_msg"] = self._format_vim_error_msg(str(e))
714 if isinstance(e, vimconn.vimconnNotFoundException):
715 # If not found mark as Done and fill error_msg
716 task["status"] = "DONE"
717 return True, None
718 task["status"] = "FAILED"
719 return False, None
720
721 def _get_net_internal(self, task, filter_param):
722 """
723 Common code for get_net and new_net. It looks for a network on VIM with the filter_params
724 :param task: task for this find or find-or-create action
725 :param filter_param: parameters to send to the vimconnector
726 :return: a dict with the content to update the instance_nets database table. Raises an exception on error, or
727 when network is not found or found more than one
728 """
729 vim_nets = self.vim.get_network_list(filter_param)
730 if not vim_nets:
731 raise VimThreadExceptionNotFound("Network not found with this criteria: '{}'".format(filter))
732 elif len(vim_nets) > 1:
733 raise VimThreadException("More than one network found with this criteria: '{}'".format(filter))
734 vim_net_id = vim_nets[0]["id"]
735
736 # Discover if this network is managed by a sdn controller
737 sdn_net_id = None
738 with self.db_lock:
739 result = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets',
740 WHERE={'vim_net_id': vim_net_id, 'instance_scenario_id': None,
741 'datacenter_tenant_id': self.datacenter_tenant_id})
742 if result:
743 sdn_net_id = result[0]['sdn_net_id']
744
745 task["status"] = "DONE"
746 task["extra"]["vim_info"] = {}
747 task["extra"]["created"] = False
748 task["extra"]["sdn_net_id"] = sdn_net_id
749 task["error_msg"] = None
750 task["vim_id"] = vim_net_id
751 instance_element_update = {"vim_net_id": vim_net_id, "created": False, "status": "BUILD",
752 "error_msg": None}
753 return instance_element_update
754
755 def get_net(self, task):
756 try:
757 task_id = task["instance_action_id"] + "." + str(task["task_index"])
758 params = task["params"]
759 filter_param = params[0]
760 instance_element_update = self._get_net_internal(task, filter_param)
761 return True, instance_element_update
762
763 except (vimconn.vimconnException, VimThreadException) as e:
764 self.logger.error("Error looking for NET, task=%s: %s", str(task_id), str(e))
765 task["status"] = "FAILED"
766 task["vim_id"] = None
767 task["error_msg"] = self._format_vim_error_msg(str(e))
768 instance_element_update = {"vim_net_id": None, "status": "VIM_ERROR",
769 "error_msg": task["error_msg"]}
770 return False, instance_element_update
771
772 def new_net(self, task):
773 try:
774 task_id = task["instance_action_id"] + "." + str(task["task_index"])
775 # FIND
776 if task["extra"].get("find"):
777 action_text = "finding"
778 filter_param = task["extra"]["find"][0]
779 try:
780 instance_element_update = self._get_net_internal(task, filter_param)
781 return True, instance_element_update
782 except VimThreadExceptionNotFound:
783 pass
784 # CREATE
785 params = task["params"]
786 action_text = "creating"
787 vim_net_id = self.vim.new_network(*params)
788
789 net_name = params[0]
790 net_type = params[1]
791
792 sdn_net_id = None
793 sdn_controller = self.vim.config.get('sdn-controller')
794 if sdn_controller and (net_type == "data" or net_type == "ptp"):
795 network = {"name": net_name, "type": net_type, "region": self.vim["config"]["datacenter_id"]}
796
797 vim_net = self.vim.get_network(vim_net_id)
798 if vim_net.get('encapsulation') != 'vlan':
799 raise vimconn.vimconnException(
800 "net '{}' defined as type '{}' has not vlan encapsulation '{}'".format(
801 net_name, net_type, vim_net['encapsulation']))
802 network["vlan"] = vim_net.get('segmentation_id')
803 try:
804 with self.db_lock:
805 sdn_net_id = self.ovim.new_network(network)
806 except (ovimException, Exception) as e:
807 self.logger.error("task=%s cannot create SDN network vim_net_id=%s input='%s' ovimException='%s'",
808 str(task_id), vim_net_id, str(network), str(e))
809 task["status"] = "DONE"
810 task["extra"]["vim_info"] = {}
811 task["extra"]["sdn_net_id"] = sdn_net_id
812 task["extra"]["created"] = True
813 task["error_msg"] = None
814 task["vim_id"] = vim_net_id
815 instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "BUILD",
816 "created": True, "error_msg": None}
817 return True, instance_element_update
818 except vimconn.vimconnException as e:
819 self.logger.error("Error {} NET, task=%s: %s", action_text, str(task_id), str(e))
820 task["status"] = "FAILED"
821 task["vim_id"] = None
822 task["error_msg"] = self._format_vim_error_msg(str(e))
823 instance_element_update = {"vim_net_id": None, "sdn_net_id": None, "status": "VIM_ERROR",
824 "error_msg": task["error_msg"]}
825 return False, instance_element_update
826
827 def del_net(self, task):
828 net_vim_id = task["vim_id"]
829 sdn_net_id = task["extra"].get("sdn_net_id")
830 try:
831 if sdn_net_id:
832 # Delete any attached port to this sdn network. There can be ports associated to this network in case
833 # it was manually done using 'openmano vim-net-sdn-attach'
834 with self.db_lock:
835 port_list = self.ovim.get_ports(columns={'uuid'},
836 filter={'name': 'external_port', 'net_id': sdn_net_id})
837 for port in port_list:
838 self.ovim.delete_port(port['uuid'])
839 self.ovim.delete_network(sdn_net_id)
840 self.vim.delete_network(net_vim_id)
841 task["status"] = "DONE"
842 task["error_msg"] = None
843 return True, None
844 except ovimException as e:
845 task["error_msg"] = self._format_vim_error_msg("ovimException obtaining and deleting external "
846 "ports for net {}: {}".format(sdn_net_id, str(e)))
847 except vimconn.vimconnException as e:
848 task["error_msg"] = self._format_vim_error_msg(str(e))
849 if isinstance(e, vimconn.vimconnNotFoundException):
850 # If not found mark as Done and fill error_msg
851 task["status"] = "DONE"
852 return True, None
853 task["status"] = "FAILED"
854 return False, None