fix some more issues at SDN assist and deleting openstack ports
[osm/RO.git] / osm_ro / vim_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 """"
25 This is a thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
26 The tasks are stored at database in table vim_actions
27 The task content is (M: stored at memory, D: stored at database):
28 MD instance_action_id: reference a global action over an instance-scenario: database instance_actions
29 MD task_index: index number of the task. This together with the previous forms a unique key identifier
30 MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread
31 MD vim_id: id of the vm,net,etc at VIM
32 MD action: CREATE, DELETE, FIND
33 MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
34 MD item_id: uuid of the referenced entry in the previous table
35 MD status: SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED
36 MD extra: text with yaml format at database, dict at memory with:
37 params: list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken from other related tasks
38 find: (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains the FIND params
39 depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation depends on a net creation
40 sdn_net_id: used for net.
41 tries:
42 interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
43 iface_id: uuid of instance_interfaces
44 sdn_port_id:
45 sdn_net_id:
46 created_items: dictionary with extra elements created that need to be deleted. e.g. ports, volumes,...
47 created: False if the VIM element is not created by other actions, and it should not be deleted
48 vim_status: VIM status of the element. Stored also at database in the instance_XXX
49 M depends: dict with task_index(from depends_on) to task class
50 M params: same as extra[params] but with the resolved dependencies
51 M vim_interfaces: similar to extra[interfaces] but with VIM information. Stored at database in the instance_XXX but not at vim_actions
52 M vim_info: Detailed information of a vm,net from the VIM. Stored at database in the instance_XXX but not at vim_actions
53 MD error_msg: descriptive text upon an error.Stored also at database instance_XXX
54 MD created_at: task creation time
55 MD modified_at: last task update time. On refresh it contains when this task need to be refreshed
56
57 """
58
59 import threading
60 import time
61 import Queue
62 import logging
63 import vimconn
64 import yaml
65 from db_base import db_base_Exception
66 from lib_osm_openvim.ovim import ovimException
67
68 __author__ = "Alfonso Tierno, Pablo Montes"
69 __date__ = "$28-Sep-2017 12:07:15$"
70
71
def is_task_id(task_id):
    """Tell whether a value is a task reference of the form "TASK-<index>".

    :param task_id: candidate value, normally a text such as the 'net_id' of a VM interface
    :return: True when task_id is a text starting with "TASK-". False otherwise, including None or any other
        value that is not a text
    """
    try:
        return task_id.startswith("TASK-")
    except (AttributeError, TypeError):
        # not a text value (e.g. None), so it cannot be a task reference
        return False
74
75
class VimThreadException(Exception):
    """Generic error raised by this module while processing a VIM task."""
78
79
class VimThreadExceptionNotFound(VimThreadException):
    """Specialization of VimThreadException raised when a looked-up element does not exist."""
82
83
84 class vim_thread(threading.Thread):
85 REFRESH_BUILD = 5 # 5 seconds
86 REFRESH_ACTIVE = 60 # 1 minute
87
def __init__(self, myvimconn, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None,
             db=None, db_lock=None, ovim=None):
    """Init a thread.

    :param myvimconn: VIM connector used by this thread. If a vimconn.vimconnException instance is received
        instead, the thread is created without VIM (self.vim is None) and the text of the exception is stored
        at self.error_status
    :param task_lock: lock used to protect the changes of task status
    :param name: name of the thread. If not provided it is built from the VIM connector 'id' and its
        config 'datacenter_tenant_id'
    :param datacenter_name: stored at self.datacenter_name
    :param datacenter_tenant_id: stored at self.datacenter_tenant_id. It is compared against the
        'datacenter_vim_id' of the tasks
    :param db: database class
    :param db_lock: lock to use the database in exclusion
    :param ovim: object used for the SDN operations (ports, networks)
    """
    threading.Thread.__init__(self)
    if isinstance(myvimconn, vimconn.vimconnException):
        self.vim = None
        self.error_status = "Error accesing to VIM: {}".format(myvimconn)
    else:
        self.vim = myvimconn
        self.error_status = None
    self.datacenter_name = datacenter_name
    self.datacenter_tenant_id = datacenter_tenant_id
    self.ovim = ovim
    if name:
        self.name = name
    elif self.vim is not None:
        # the name is taken from the connector received as argument; 'vimconn' is the imported module and
        # cannot be indexed
        self.name = self.vim["id"] + "." + self.vim["config"]["datacenter_tenant_id"]
    else:
        # no usable connector to take the name from; fall back to the tenant identifier
        self.name = str(datacenter_tenant_id)

    self.logger = logging.getLogger('openmano.vim.'+self.name)
    self.db = db
    self.db_lock = db_lock

    self.task_lock = task_lock
    self.task_queue = Queue.Queue(2000)

    # Contains time ordered task list for refreshing the status of VIM VMs and nets
    self.refresh_tasks = []

    # Contains time ordered task list for creation, deletion of VIM VMs and nets
    self.pending_tasks = []

    # It contains all the creation/deletion pending tasks grouped by its concrete vm, net, etc
    #     <item><item_id>:
    #         -   <task1>  # e.g. CREATE task
    #             <task2>  # e.g. DELETE task
    self.grouped_tasks = {}
131
def _reload_vim_actions(self):
    """
    Read actions from database and reload them at memory. The rows of table vim_actions that belong to
    self.datacenter_tenant_id are read in pools of 'database_limit' entries ordered by item_id, item and
    created_at. Consecutive rows that reference the same item + item_id are collected in a list that is handed
    over to self._insert_pending_tasks, unless a DELETE row with status different from SCHEDULED is found
    for that element.
    :return: None
    """
    action_completed = False   # True when the current element has an already processed DELETE
    task_list = []             # rows collected for the current element (same item + item_id)
    old_action_key = None      # item + item_id of the element of the previous row

    # position (item_id, item, created_at) of the last row of the previous pool
    old_item_id = ""
    old_item = ""
    old_created_at = 0.0
    database_limit = 200
    while True:
        # get 200 (database_limit) entries each time
        with self.db_lock:
            vim_actions = self.db.get_rows(FROM="vim_actions",
                                           WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
                                                  "item_id>=": old_item_id},
                                           ORDER_BY=("item_id", "item", "created_at",),
                                           LIMIT=database_limit)
        for task in vim_actions:
            item = task["item"]
            item_id = task["item_id"]

            # skip the first entries that are already processed in the previous pool of 200
            if old_item_id:
                if item_id == old_item_id and item == old_item and task["created_at"] == old_created_at:
                    old_item_id = False  # next one will be a new un-processed task
                continue

            action_key = item + item_id
            if old_action_key != action_key:
                # a new element starts: flush the rows collected for the previous one
                if not action_completed and task_list:
                    # This will fill needed task parameters into memory, and insert the task if needed in
                    # self.pending_tasks or self.refresh_tasks
                    self._insert_pending_tasks(task_list)
                task_list = []
                old_action_key = action_key
                action_completed = False
            elif action_completed:
                # remaining rows of an element already deleted are ignored
                continue

            if task["status"] == "SCHEDULED" or task["action"] == "CREATE" or task["action"] == "FIND":
                task_list.append(task)
            elif task["action"] == "DELETE":
                # action completed because deleted and status is not SCHEDULED. Not needed anything
                action_completed = True
        if len(vim_actions) == database_limit:
            # update variables for get the next database iteration
            old_item_id = item_id
            old_item = item
            old_created_at = task["created_at"]
        else:
            # a pool with less than database_limit rows is the last one
            break
    # Last actions group need to be inserted too
    if not action_completed and task_list:
        self._insert_pending_tasks(task_list)
    self.logger.debug("reloaded vim actions pending:{} refresh:{}".format(
        len(self.pending_tasks), len(self.refresh_tasks)))
192
def _refres_elements(self):
    """Call VIM to get VMs and networks status until 10 elements

    Tasks are taken from the head of self.refresh_tasks while their 'modified_at' is not later than the current
    time. The VIM is asked for the status of the collected VMs and nets, the changes are stored at database
    (instance_interfaces, instance_vms, instance_nets, vim_actions) and every refreshed task is inserted again
    in self.refresh_tasks with a new refresh time.
    :return: number of tasks taken for refreshing
    """
    now = time.time()
    nb_processed = 0
    vm_to_refresh_list = []     # vim_id of the VMs to ask for
    net_to_refresh_list = []    # vim_id of the nets to ask for
    vm_to_refresh_dict = {}     # vim_id -> task, for VMs
    net_to_refresh_dict = {}    # vim_id -> task, for nets
    items_to_refresh = 0
    while self.refresh_tasks:
        task = self.refresh_tasks[0]
        with self.task_lock:
            if task['status'] == 'SUPERSEDED':
                # superseded tasks are dropped without refreshing
                self.refresh_tasks.pop(0)
                continue
            if task['modified_at'] > now:
                # the list is time ordered, so the remaining tasks are not due yet
                break
            # task["status"] = "processing"
            nb_processed += 1
        self.refresh_tasks.pop(0)
        if task["item"] == 'instance_vms':
            vm_to_refresh_list.append(task["vim_id"])
            vm_to_refresh_dict[task["vim_id"]] = task
        elif task["item"] == 'instance_nets':
            net_to_refresh_list.append(task["vim_id"])
            net_to_refresh_dict[task["vim_id"]] = task
        else:
            error_text = "unknown task {}".format(task["item"])
            self.logger.error(error_text)
        items_to_refresh += 1
        if items_to_refresh == 10:
            break

    if vm_to_refresh_list:
        now = time.time()
        try:
            vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list)
        except vimconn.vimconnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error("vimconnException Exception when trying to refresh vms " + str(e))
            vim_dict = {}
            for vim_id in vm_to_refresh_list:
                vim_dict[vim_id] = {"status": "VIM_ERROR", "error_msg": str(e)}

        for vim_id, vim_info in vim_dict.items():
            # look for task
            task_need_update = False
            task = vm_to_refresh_dict[vim_id]
            self.logger.debug("get-vm vm_id=%s result=%s", task["vim_id"], str(vim_info))

            # check and update interfaces
            task_warning_msg = ""  # accumulates the SDN errors, appended later to the VM error_msg
            for interface in vim_info.get("interfaces", ()):
                vim_interface_id = interface["vim_interface_id"]
                if vim_interface_id not in task["extra"]["interfaces"]:
                    self.logger.critical("Interface not found {} on task info {}".format(
                        vim_interface_id, task["extra"]["interfaces"]), exc_info=True)
                    continue
                task_interface = task["extra"]["interfaces"][vim_interface_id]
                task_vim_interface = task["vim_interfaces"].get(vim_interface_id)
                # only interfaces whose VIM information differs from the one kept at memory are processed
                if task_vim_interface != interface:
                    # delete old port
                    if task_interface.get("sdn_port_id"):
                        try:
                            with self.db_lock:
                                self.ovim.delete_port(task_interface["sdn_port_id"])
                                task_interface["sdn_port_id"] = None
                                task_need_update = True
                        except ovimException as e:
                            error_text =" ovimException deleting external_port={} {}".format(
                                task_interface["sdn_port_id"], e)
                            self.logger.error(error_text, exc_info=True)
                            task_warning_msg += error_text
                            # TODO Set error_msg at instance_nets instead of instance VMs

                    # Create SDN port
                    sdn_net_id = task_interface.get("sdn_net_id")
                    if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
                        sdn_port_name = sdn_net_id + "." + task["vim_id"]
                        sdn_port_name = sdn_port_name[:63]  # name is truncated to 63 characters
                        try:
                            with self.db_lock:
                                sdn_port_id = self.ovim.new_external_port(
                                    {"compute_node": interface["compute_node"],
                                     "pci": interface["pci"],
                                     "vlan": interface.get("vlan"),
                                     "net_id": sdn_net_id,
                                     "region": self.vim["config"]["datacenter_id"],
                                     "name": sdn_port_name,
                                     "mac": interface.get("mac_address")})
                                task_interface["sdn_port_id"] = sdn_port_id
                                task_need_update = True
                        except (ovimException, Exception) as e:
                            error_text = "ovimException creating new_external_port compute_node={} "\
                                         "pci={} vlan={} {}".format(
                                interface["compute_node"],
                                interface["pci"],
                                interface.get("vlan"), e)
                            self.logger.error(error_text, exc_info=True)
                            task_warning_msg += error_text
                            # TODO Set error_msg at instance_nets instead of instance VMs

                    # store the new interface information at database and at memory
                    with self.db_lock:
                        self.db.update_rows(
                            'instance_interfaces',
                            UPDATE={"mac_address": interface.get("mac_address"),
                                    "ip_address": interface.get("ip_address"),
                                    "vim_info": interface.get("vim_info"),
                                    "sdn_port_id": task_interface.get("sdn_port_id"),
                                    "compute_node": interface.get("compute_node"),
                                    "pci": interface.get("pci"),
                                    "vlan": interface.get("vlan"),
                                    },
                            WHERE={'uuid': task_interface["iface_id"]})
                    task["vim_interfaces"][vim_interface_id] = interface

            # check and update task and instance_vms database
            if vim_info.get("error_msg"):
                vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"] + task_warning_msg)
            elif task_warning_msg:
                vim_info["error_msg"] = self._format_vim_error_msg(task_warning_msg)

            task_vim_info = task.get("vim_info")
            task_error_msg = task.get("error_msg")
            task_vim_status = task["extra"].get("vim_status")
            # database is written only when status, error_msg or vim_info have changed
            if task_vim_status != vim_info["status"] or task_error_msg != vim_info.get("error_msg") or \
                    (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
                temp_dict = {"status": vim_info["status"], "error_msg": vim_info.get("error_msg")}
                if vim_info.get("vim_info"):
                    temp_dict["vim_info"] = vim_info["vim_info"]
                with self.db_lock:
                    self.db.update_rows('instance_vms', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
                task["extra"]["vim_status"] = vim_info["status"]
                task["error_msg"] = vim_info.get("error_msg")
                if vim_info.get("vim_info"):
                    task["vim_info"] = vim_info["vim_info"]
                task_need_update = True

            if task_need_update:
                with self.db_lock:
                    self.db.update_rows(
                        'vim_actions',
                        UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
                                "error_msg": task.get("error_msg"), "modified_at": now},
                        WHERE={'instance_action_id': task['instance_action_id'],
                               'task_index': task['task_index']})
            # schedule the next refresh: elements at BUILD status are refreshed sooner
            if task["extra"].get("vim_status") == "BUILD":
                self._insert_refresh(task, now + self.REFRESH_BUILD)
            else:
                self._insert_refresh(task, now + self.REFRESH_ACTIVE)

    if net_to_refresh_list:
        now = time.time()
        try:
            vim_dict = self.vim.refresh_nets_status(net_to_refresh_list)
        except vimconn.vimconnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error("vimconnException Exception when trying to refresh nets " + str(e))
            vim_dict = {}
            for vim_id in net_to_refresh_list:
                vim_dict[vim_id] = {"status": "VIM_ERROR", "error_msg": str(e)}

        for vim_id, vim_info in vim_dict.items():
            # look for task
            task = net_to_refresh_dict[vim_id]
            self.logger.debug("get-net net_id=%s result=%s", task["vim_id"], str(vim_info))

            task_vim_info = task.get("vim_info")
            task_vim_status = task["extra"].get("vim_status")
            task_error_msg = task.get("error_msg")
            task_sdn_net_id = task["extra"].get("sdn_net_id")

            # get ovim status
            if task_sdn_net_id:
                try:
                    with self.db_lock:
                        sdn_net = self.ovim.show_network(task_sdn_net_id)
                except (ovimException, Exception) as e:
                    text_error = "ovimException getting network infor snd_net_id={}".format(task_sdn_net_id)
                    self.logger.error(text_error, exc_info = True)
                    sdn_net = {"status": "ERROR", "error_msg": text_error}
                if sdn_net["status"] == "ERROR":
                    # merge the SDN error into the information obtained from the VIM
                    if not vim_info.get("error_msg"):
                        vim_info["error_msg"] = sdn_net["error_msg"]
                    else:
                        vim_info["error_msg"] = "VIM_ERROR: {} && SDN_ERROR: {}".format(
                            self._format_vim_error_msg(vim_info["error_msg"], 1024//2-14),
                            self._format_vim_error_msg(sdn_net["error_msg"], 1024//2-14))
                    if vim_info["status"] == "VIM_ERROR":
                        vim_info["status"] = "VIM_SDN_ERROR"
                    else:
                        vim_info["status"] = "SDN_ERROR"

            # update database
            if vim_info.get("error_msg"):
                vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"])
            # database is written only when status, error_msg or vim_info have changed
            if task_vim_status != vim_info["status"] or task_error_msg != vim_info.get("error_msg") or \
                    (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
                task["extra"]["vim_status"] = vim_info["status"]
                task["error_msg"] = vim_info.get("error_msg")
                if vim_info.get("vim_info"):
                    task["vim_info"] = vim_info["vim_info"]
                temp_dict = {"status": vim_info["status"], "error_msg": vim_info.get("error_msg")}
                if vim_info.get("vim_info"):
                    temp_dict["vim_info"] = vim_info["vim_info"]
                with self.db_lock:
                    self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
                    self.db.update_rows(
                        'vim_actions',
                        UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
                                "error_msg": task.get("error_msg"), "modified_at": now},
                        WHERE={'instance_action_id': task['instance_action_id'],
                               'task_index': task['task_index']})
            # schedule the next refresh: elements at BUILD status are refreshed sooner
            if task["extra"].get("vim_status") == "BUILD":
                self._insert_refresh(task, now + self.REFRESH_BUILD)
            else:
                self._insert_refresh(task, now + self.REFRESH_ACTIVE)

    return nb_processed
412
413 def _insert_refresh(self, task, threshold_time=None):
414 """Insert a task at list of refreshing elements. The refreshing list is ordered by threshold_time (task['modified_at']
415 It is assumed that this is called inside this thread
416 """
417 if not self.vim:
418 return
419 if not threshold_time:
420 threshold_time = time.time()
421 task["modified_at"] = threshold_time
422 task_name = task["item"][9:] + "-" + task["action"]
423 task_id = task["instance_action_id"] + "." + str(task["task_index"])
424 for index in range(0, len(self.refresh_tasks)):
425 if self.refresh_tasks[index]["modified_at"] > threshold_time:
426 self.refresh_tasks.insert(index, task)
427 break
428 else:
429 index = len(self.refresh_tasks)
430 self.refresh_tasks.append(task)
431 self.logger.debug("new refresh task={} name={}, modified_at={} index={}".format(
432 task_id, task_name, task["modified_at"], index))
433
434 def _remove_refresh(self, task_name, vim_id):
435 """Remove a task with this name and vim_id from the list of refreshing elements.
436 It is assumed that this is called inside this thread outside _refres_elements method
437 Return True if self.refresh_list is modified, task is found
438 Return False if not found
439 """
440 index_to_delete = None
441 for index in range(0, len(self.refresh_tasks)):
442 if self.refresh_tasks[index]["name"] == task_name and self.refresh_tasks[index]["vim_id"] == vim_id:
443 index_to_delete = index
444 break
445 else:
446 return False
447 if index_to_delete != None:
448 del self.refresh_tasks[index_to_delete]
449 return True
450
def _proccess_pending_tasks(self):
    """Process the tasks queued at self.pending_tasks against the VIM and store the result at database.

    For each task its dependencies ('depends_on' at extra) are checked first: the task is moved to the end of
    the queue while a dependency is still SCHEDULED (up to 3 tries) and it fails if a dependency has FAILED.
    Then the action (CREATE, DELETE, FIND over instance_vms or instance_nets) is executed and tables
    vim_actions, instance_actions and the item table are updated. The loop ends when the queue is empty or
    after 10 CREATE actions.
    :return: number of tasks taken from self.pending_tasks
    """
    nb_created = 0
    nb_processed = 0
    while self.pending_tasks:
        task = self.pending_tasks.pop(0)
        nb_processed += 1
        try:
            # check if tasks that this depends on have been completed
            dependency_not_completed = False
            for task_index in task["extra"].get("depends_on", ()):
                task_dependency = task["depends"].get("TASK-" + str(task_index))
                if not task_dependency:
                    task_dependency = self._look_for_task(task["instance_action_id"], "TASK-" + str(task_index))
                    if not task_dependency:
                        raise VimThreadException(
                            "Cannot get depending net task trying to get depending task {}.{}".format(
                                task["instance_action_id"], task_index))
                    # task["depends"]["TASK-" + str(task_index)] = task_dependency
                    # it references another object, so database must be look again
                if task_dependency["status"] == "SCHEDULED":
                    dependency_not_completed = True
                    break
                elif task_dependency["status"] == "FAILED":
                    # arguments follow the order of the placeholders: action, item, action id, task index
                    raise VimThreadException(
                        "Cannot {} {}, (task {}.{}) because depends on failed {} {}, (task{}.{})".format(
                            task["action"], task["item"],
                            task["instance_action_id"], task["task_index"],
                            task_dependency["action"], task_dependency["item"],
                            task_dependency["instance_action_id"], task_dependency["task_index"]))
            if dependency_not_completed:
                # Move this task to the end.
                task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
                if task["extra"]["tries"] <= 3:
                    self.pending_tasks.append(task)
                    continue
                else:
                    raise VimThreadException(
                        "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, "
                        "(task {}.{})".format(task["action"], task["item"],
                                              task["instance_action_id"], task["task_index"],
                                              task_dependency["action"], task_dependency["item"],
                                              task_dependency["instance_action_id"],
                                              task_dependency["task_index"]))

            if task["status"] == "SUPERSEDED":
                # not needed to do anything but update database with the new status
                result = True
                database_update = None
            elif not self.vim:
                # there is no VIM connector: the task fails with the error obtained at thread creation.
                # FAILED is the failure status that the rest of this module assigns and checks
                task["status"] = "FAILED"
                task["error_msg"] = self.error_status
                result = False
                database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
            elif task["item"] == 'instance_vms':
                if task["action"] == "CREATE":
                    result, database_update = self.new_vm(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_vm(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            elif task["item"] == 'instance_nets':
                if task["action"] == "CREATE":
                    result, database_update = self.new_net(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_net(task)
                elif task["action"] == "FIND":
                    result, database_update = self.get_net(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            else:
                raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
                # TODO
        except VimThreadException as e:
            result = False
            task["error_msg"] = str(e)
            task["status"] = "FAILED"
            database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
            # the column with the VIM identifier depends on the table: instance_nets has no 'vim_vm_id'
            if task["item"] == 'instance_vms':
                database_update["vim_vm_id"] = None
            elif task["item"] == 'instance_nets':
                database_update["vim_net_id"] = None

        if task["action"] == "DELETE":
            action_key = task["item"] + task["item_id"]
            # the group may be already removed (e.g. by a previous DELETE over the same element)
            self.grouped_tasks.pop(action_key, None)
        elif task["action"] in ("CREATE", "FIND") and task["status"] in ("DONE", "BUILD"):
            self._insert_refresh(task)

        self.logger.debug("vim_action id={}.{} item={} action={} result={}:{} params={}".format(
            task["instance_action_id"], task["task_index"], task["item"], task["action"],
            task["status"], task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"),
            task["params"]))
        try:
            now = time.time()
            with self.db_lock:
                self.db.update_rows(
                    table="vim_actions",
                    UPDATE={"status": task["status"], "vim_id": task.get("vim_id"), "modified_at": now,
                            "error_msg": task["error_msg"],
                            "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
                    WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
                if result is not None:
                    self.db.update_rows(
                        table="instance_actions",
                        UPDATE={("number_done" if result else "number_failed"): {"INCREMENT": 1},
                                "modified_at": now},
                        WHERE={"uuid": task["instance_action_id"]})
                if database_update:
                    self.db.update_rows(table=task["item"],
                                        UPDATE=database_update,
                                        WHERE={"uuid": task["item_id"]})
        except db_base_Exception as e:
            self.logger.error("Error updating database %s", str(e), exc_info=True)

        # limit the number of creations done in a single call
        if nb_created == 10:
            break
    return nb_processed
564
def _insert_pending_tasks(self, vim_actions_list):
    """Load a list of vim_actions entries into memory.

    Tasks of other datacenter_vim_id are ignored. For the rest, 'extra' is converted from yaml text to dict and
    the memory-only fields (params, depends, vim_interfaces, error_msg, vim_id) are filled. Every task is
    added to self.grouped_tasks under the key <item><item_id>; besides:
      - DELETE: the previous tasks of the same element are marked as SUPERSEDED and their VIM identifiers
        are inherited. The task is queued at self.pending_tasks
      - SCHEDULED: the task is queued at self.pending_tasks
      - CREATE, FIND already processed: the task is inserted for refreshing if status is DONE or BUILD
    :param vim_actions_list: list of dicts with the content of the vim_actions entries
    :return: None. Raises vimconn.vimconnException for an unknown action
    """
    for task in vim_actions_list:
        if task["datacenter_vim_id"] != self.datacenter_tenant_id:
            continue
        action_key = task["item"] + task["item_id"]
        group = self.grouped_tasks.setdefault(action_key, [])

        task["params"] = None
        task["depends"] = {}
        extra_text = task["extra"]
        if not extra_text:
            task["extra"] = {}
        else:
            # NOTE(review): yaml.load without an explicit Loader can build arbitrary objects. The text is
            # written by this module with yaml.safe_dump, so yaml.safe_load looks like a safe replacement.
            extra = yaml.load(extra_text)
            task["extra"] = extra
            task["params"] = extra.get("params")
            for index in extra.get("depends_on") or ():
                # a dependency is resolved here only when it is part of this same list
                if index < len(vim_actions_list):
                    candidate = vim_actions_list[index]
                    if candidate["task_index"] == index and \
                            candidate["instance_action_id"] == task["instance_action_id"]:
                        task["depends"]["TASK-" + str(index)] = candidate
            if extra.get("interfaces"):
                task["vim_interfaces"] = {}
        task.setdefault("error_msg", None)
        task.setdefault("vim_id", None)

        action = task["action"]
        if action == "DELETE":
            need_delete_action = False
            for previous in group:
                previous_extra = previous["extra"]
                if previous["action"] == "FIND" and previous.get("vim_id"):
                    task["vim_id"] = previous["vim_id"]
                if previous["action"] == "CREATE" and previous_extra.get("created", True) and \
                        (previous.get("vim_id") or previous_extra.get("sdn_net_id")):
                    need_delete_action = True
                    task["vim_id"] = previous["vim_id"]
                    if previous_extra.get("sdn_net_id"):
                        task["extra"]["sdn_net_id"] = previous_extra["sdn_net_id"]
                    if previous_extra.get("interfaces"):
                        task["extra"]["interfaces"] = previous_extra["interfaces"]
                    if previous_extra.get("created_items"):
                        if not task["extra"].get("created_items"):
                            task["extra"]["created_items"] = {}
                        task["extra"]["created_items"].update(previous_extra["created_items"])
                # Mark task as SUPERSEDED.
                #   If task is in self.pending_tasks, it will be removed and database will be update
                #   If task is in self.refresh_tasks, it will be removed
                previous["status"] = "SUPERSEDED"
            if not need_delete_action:
                # nothing was created at VIM, so there is nothing to delete
                task["status"] = "SUPERSEDED"

            group.append(task)
            self.pending_tasks.append(task)
        elif task["status"] == "SCHEDULED":
            group.append(task)
            self.pending_tasks.append(task)
        elif action in ("CREATE", "FIND"):
            group.append(task)
            if task["status"] in ("DONE", "BUILD"):
                self._insert_refresh(task)
        # TODO add VM reset, get console, etc...
        else:
            raise vimconn.vimconnException(self.name + "unknown vim_action action {}".format(action))
632
def insert_task(self, task):
    """Put a task at the queue of this thread without waiting.

    :param task: element to insert at self.task_queue
    :return: None. Raises vimconn.vimconnException if the queue is full
    """
    try:
        self.task_queue.put(task, block=False)
    except Queue.Full:
        raise vimconn.vimconnException(self.name + ": timeout inserting a task")
    return None
639
def del_task(self, task):
    """Cancel a task that has not been processed yet.

    :param task: task dict. Its "status" is changed to SUPERSEDED only if it is SCHEDULED
    :return: True if the task has been marked as SUPERSEDED, False if it cannot be cancelled because its
        status is not SCHEDULED
    """
    with self.task_lock:
        if task["status"] == "SCHEDULED":
            task["status"] = "SUPERSEDED"
            return True
        # task["status"] == "processing"
        # the lock is released by the 'with' statement; releasing it here too would release it twice
        return False
648
def run(self):
    """Main loop of the thread.

    The vim actions are loaded from database and then, in a loop, the content of self.task_queue is
    attended, the pending tasks are processed and the elements are refreshed. It sleeps 1 second when nothing
    has been processed. The queue admits:
      - a list: tasks to be inserted with self._insert_pending_tasks
      - 'reload': the vim actions are loaded again from database
      - 'exit': the thread finishes
    Unexpected exceptions inside the loop are logged and the loop continues.
    :return: 0 when 'exit' is received
    """
    self.logger.debug("Starting")
    while True:
        self._reload_vim_actions()
        reload_thread = False
        while True:
            try:
                while not self.task_queue.empty():
                    task = self.task_queue.get()
                    # every get() is acknowledged, including the control messages 'exit' and 'reload'
                    self.task_queue.task_done()
                    if isinstance(task, list):
                        self._insert_pending_tasks(task)
                    elif isinstance(task, str):
                        if task == 'exit':
                            # logged here because nothing after the loops can be reached
                            self.logger.debug("Finishing")
                            return 0
                        elif task == 'reload':
                            reload_thread = True
                            break
                if reload_thread:
                    break
                nb_processed = self._proccess_pending_tasks()
                nb_processed += self._refres_elements()
                if not nb_processed:
                    time.sleep(1)

            except Exception as e:
                self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
678
def terminate(self, task):
    """Do nothing with the task and report success.

    :param task: not used
    :return: the tuple (True, None)
    """
    outcome = (True, None)
    return outcome
681
682 def _look_for_task(self, instance_action_id, task_id):
683 task_index = task_id.split("-")[-1]
684 with self.db_lock:
685 tasks = self.db.get_rows(FROM="vim_actions", WHERE={"instance_action_id": instance_action_id,
686 "task_index": task_index})
687 if not tasks:
688 return None
689 task = tasks[0]
690 task["params"] = None
691 task["depends"] = {}
692 if task["extra"]:
693 extra = yaml.load(task["extra"])
694 task["extra"] = extra
695 task["params"] = extra.get("params")
696 if extra.get("interfaces"):
697 task["vim_interfaces"] = {}
698 else:
699 task["extra"] = {}
700 return task
701
702 def _format_vim_error_msg(self, error_text, max_length=1024):
703 if error_text and len(error_text) >= max_length:
704 return error_text[:max_length//2-3] + " ... " + error_text[-max_length//2+3:]
705 return error_text
706
def new_vm(self, task):
    """Create a VM at VIM.

    The interfaces (params[5]) whose 'net_id' is a task reference ("TASK-<index>") are resolved to the
    'vim_id' of the referenced task, taken from task["depends"] or from database. After the creation, the
    'sdn_net_id' of each interface is looked for at database and stored at task["extra"]["interfaces"].
    :param task: CREATE task over instance_vms. It is updated with status, vim_id, error_msg and extra
    :return: tuple (result, instance_element_update). result is True on success, False on error;
        instance_element_update is the dict with the content to update at instance_vms
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        params = task["params"]
        net_list = params[5]
        for net in net_list:
            if "net_id" in net and is_task_id(net["net_id"]):  # change task_id into network_id
                task_dependency = task["depends"].get(net["net_id"])
                if not task_dependency:
                    task_dependency = self._look_for_task(task["instance_action_id"], net["net_id"])
                if not task_dependency:
                    raise VimThreadException(
                        "Cannot get depending net task trying to get depending task {}.{}".format(
                            task["instance_action_id"], net["net_id"]))
                network_id = task_dependency.get("vim_id")
                if not network_id:
                    # the error is read from the dependency found above; task["depends"] does not contain
                    # the dependencies obtained from database
                    raise VimThreadException(
                        "Cannot create VM because depends on a network not created or found: " +
                        str(task_dependency.get("error_msg")))
                net["net_id"] = network_id
        vim_vm_id, created_items = self.vim.new_vminstance(*params)

        # fill task_interfaces. Look for snd_net_id at database for each interface
        task_interfaces = {}
        for iface in net_list:
            task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]}
            with self.db_lock:
                result = self.db.get_rows(
                    SELECT=('sdn_net_id',),
                    FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
                    WHERE={'ii.uuid': iface["uuid"]})
            if result:
                task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id']
            else:
                # one '%s' placeholder per argument, as the logging module requires
                self.logger.critical("Error creating VM, task=%s Network %s not found at DB", task_id,
                                     iface["uuid"])

        task["vim_info"] = {}
        task["vim_interfaces"] = {}
        task["extra"]["interfaces"] = task_interfaces
        task["extra"]["created"] = True
        task["extra"]["created_items"] = created_items
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_vm_id
        instance_element_update = {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating VM, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": error_text}
        return False, instance_element_update
764
def del_vm(self, task):
    """Delete a VM at VIM, removing first the SDN ports of its interfaces.

    Errors deleting a SDN port are logged and the deletion continues. A VM not found at VIM is considered
    as deleted.
    :param task: DELETE task over instance_vms. "vim_id" is the VM to delete; extra can contain
        "interfaces" and "created_items". Its status and error_msg are updated
    :return: tuple (result, None). result is True if the VM is deleted or not found at VIM, False on other
        VIM error
    """
    vm_vim_id = task["vim_id"]
    # a dict is needed also when the task has no interfaces: a tuple has no values()
    interfaces = task["extra"].get("interfaces") or {}
    try:
        for iface in interfaces.values():
            if iface.get("sdn_port_id"):
                try:
                    with self.db_lock:
                        self.ovim.delete_port(iface["sdn_port_id"])
                except ovimException as e:
                    self.logger.error("ovimException deleting external_port={} at VM vim_id={} deletion ".format(
                        iface["sdn_port_id"], vm_vim_id) + str(e), exc_info=True)
                    # TODO Set error_msg at instance_nets

        self.vim.delete_vminstance(vm_vim_id, task["extra"].get("created_items"))
        task["status"] = "DONE"
        task["error_msg"] = None
        return True, None

    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        if isinstance(e, vimconn.vimconnNotFoundException):
            # If not found mark as Done and fill error_msg
            task["status"] = "DONE"
            return True, None
        task["status"] = "FAILED"
        return False, None
792
def _get_net_internal(self, task, filter_param):
    """
    Shared lookup used by get_net and new_net: searches at the VIM for exactly one network matching filter_param
    :param task: task of the find or find-or-create action; it is marked as DONE and filled in place
    :param filter_param: filter sent to the vimconnector get_network_list
    :return: dict with the content to update the instance_nets database table.
        Raises VimThreadExceptionNotFound when no network matches and VimThreadException when several match
    """
    candidates = self.vim.get_network_list(filter_param)
    if not candidates:
        raise VimThreadExceptionNotFound("Network not found with this criteria: '{}'".format(filter_param))
    if len(candidates) > 1:
        raise VimThreadException("More than one network found with this criteria: '{}'".format(filter_param))
    vim_net_id = candidates[0]["id"]

    # A row at instance_nets with the same vim_net_id and without instance_scenario_id tells the sdn network
    # (if any) associated to this VIM network
    where = {'vim_net_id': vim_net_id, 'instance_scenario_id': None,
             'datacenter_tenant_id': self.datacenter_tenant_id}
    with self.db_lock:
        rows = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets', WHERE=where)
    sdn_net_id = rows[0]['sdn_net_id'] if rows else None

    task["status"] = "DONE"
    task["extra"].update({"vim_info": {}, "created": False, "sdn_net_id": sdn_net_id})
    task.update({"error_msg": None, "vim_id": vim_net_id})
    return {"vim_net_id": vim_net_id, "created": False, "status": "BUILD",
            "error_msg": None, "sdn_net_id": sdn_net_id}
826
def get_net(self, task):
    """
    Processes a FIND network task: looks at the VIM for the network matching the first item of task["params"]
    :param task: FIND task; status, vim_id and error_msg are updated in place
    :return: tuple (success, instance_element_update), where the second item is the dict with the content to
        update the instance_nets database table
    """
    try:
        task_id = ".".join((task["instance_action_id"], str(task["task_index"])))
        net_filter = task["params"][0]
        return True, self._get_net_internal(task, net_filter)

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error looking for NET, task=%s: %s", str(task_id), str(e))
        task["status"] = "FAILED"
        task["vim_id"] = None
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        return False, {"vim_net_id": None, "status": "VIM_ERROR", "error_msg": error_text}
843
def new_net(self, task):
    """
    Processes a CREATE network task. When task["extra"]["find"] is present it first looks for an existing network
    and uses it if found; otherwise the network is created at the VIM. If the VIM config contains 'sdn-controller'
    and the net type is 'data' or 'ptp', the network is also created at the SDN assist (ovim)
    :param task: CREATE task. task["params"] are the positional params for the vimconnector new_network, being
        the first one the net name and the second one the net type. status, vim_id, error_msg and extra are
        updated in place
    :return: tuple (success, instance_element_update), where the second item is the dict with the content to
        update the instance_nets database table
    """
    vim_net_id = None
    sdn_net_id = None
    try:
        task_id = task["instance_action_id"] + "." + str(task["task_index"])
        # FIND
        if task["extra"].get("find"):
            action_text = "finding"
            filter_param = task["extra"]["find"][0]
            try:
                instance_element_update = self._get_net_internal(task, filter_param)
                return True, instance_element_update
            except VimThreadExceptionNotFound:
                # Not found is not an error here: the network is created below
                pass
        # CREATE
        params = task["params"]
        action_text = "creating VIM"
        vim_net_id = self.vim.new_network(*params)

        net_name = params[0]
        net_type = params[1]

        sdn_controller = self.vim.config.get('sdn-controller')
        if sdn_controller and net_type in ("data", "ptp"):
            # Read the config through the attribute, as done for 'sdn-controller' above
            network = {"name": net_name, "type": net_type, "region": self.vim.config["datacenter_id"]}

            vim_net = self.vim.get_network(vim_net_id)
            encapsulation = vim_net.get('encapsulation')
            if encapsulation != 'vlan':
                # Use the value obtained with get(): indexing vim_net['encapsulation'] raised an uncaught
                # KeyError when the VIM does not report this field
                raise vimconn.vimconnException(
                    "net '{}' defined as type '{}' has not vlan encapsulation '{}'".format(
                        net_name, net_type, encapsulation))
            network["vlan"] = vim_net.get('segmentation_id')
            action_text = "creating SDN"
            with self.db_lock:
                sdn_net_id = self.ovim.new_network(network)
        task["status"] = "DONE"
        task["extra"]["vim_info"] = {}
        task["extra"]["sdn_net_id"] = sdn_net_id
        task["extra"]["created"] = True
        task["error_msg"] = None
        task["vim_id"] = vim_net_id
        instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "BUILD",
                                   "created": True, "error_msg": None}
        return True, instance_element_update
    except (vimconn.vimconnException, ovimException, VimThreadException) as e:
        # VimThreadException (e.g. more than one network matching the FIND criteria) is handled as in get_net,
        # so that the task is marked as FAILED instead of propagating the exception to the caller.
        # vim_net_id/sdn_net_id are kept with whatever was created before the failure
        self.logger.error("Error {} NET, task={}: {}".format(action_text, task_id, e))
        task["status"] = "FAILED"
        task["vim_id"] = vim_net_id
        task["error_msg"] = self._format_vim_error_msg(str(e))
        task["extra"]["sdn_net_id"] = sdn_net_id
        instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "VIM_ERROR",
                                   "error_msg": task["error_msg"]}
        return False, instance_element_update
897
def del_net(self, task):
    """
    Processes a DELETE network task: removes the SDN network (and its external ports) and then the VIM network
    :param task: DELETE task. It reads task["vim_id"] and task["extra"]["sdn_net_id"]; status and error_msg are
        updated in place
    :return: tuple (success, None). success is True when everything has been deleted or the VIM reports the
        network as not found; False on an ovimException or any other vimconnException
    """
    vim_net = task["vim_id"]
    sdn_net = task["extra"].get("sdn_net_id")
    try:
        if sdn_net:
            # Ports may remain attached to this sdn network when they were added manually using
            # 'openmano vim-net-sdn-attach'; they are removed before deleting the network
            with self.db_lock:
                attached_ports = self.ovim.get_ports(columns={'uuid'},
                                                     filter={'name': 'external_port', 'net_id': sdn_net})
                for attached_port in attached_ports:
                    self.ovim.delete_port(attached_port['uuid'])
                self.ovim.delete_network(sdn_net)
        if vim_net:
            self.vim.delete_network(vim_net)
    except ovimException as e:
        task["error_msg"] = self._format_vim_error_msg(
            "ovimException obtaining and deleting external ports for net {}: {}".format(sdn_net, str(e)))
        task["status"] = "FAILED"
        return False, None
    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        # A network not found at VIM is considered as deleted; error_msg is kept filled
        not_found = isinstance(e, vimconn.vimconnNotFoundException)
        task["status"] = "DONE" if not_found else "FAILED"
        return not_found, None

    task["status"] = "DONE"
    task["error_msg"] = None
    return True, None