Avoid database growth by cleaning old vim_actions
[osm/RO.git] / osm_ro / vim_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 """"
25 This is a thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
26 The tasks are stored at database in table vim_actions
27 The task content is (M: stored at memory, D: stored at database):
28 MD instance_action_id: reference a global action over an instance-scenario: database instance_actions
29 MD task_index: index number of the task. This together with the previous forms a unique key identifier
30 MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread
31 MD vim_id: id of the vm,net,etc at VIM
32 MD action: CREATE, DELETE, FIND
33 MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
34 MD item_id: uuid of the referenced entry in the previous table
35 MD status: SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED
36 MD extra: text with yaml format at database, dict at memory with:
37 params: list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken from other related tasks
38 find: (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains the FIND params
39 depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation depends on a net creation
40 sdn_net_id: used for net.
41 tries:
42 interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
43 iface_id: uuid of instance_interfaces
44 sdn_port_id:
45 sdn_net_id:
46 created_items: dictionary with extra elements created that need to be deleted. e.g. ports, volumes,...
47 created: False if the VIM element is not created by other actions, and it should not be deleted
48 vim_status: VIM status of the element. Stored also at database in the instance_XXX
49 M depends: dict with task_index(from depends_on) to task class
50 M params: same as extra[params] but with the resolved dependencies
51 M vim_interfaces: similar to extra[interfaces] but with VIM information. Stored at database in the instance_XXX but not at vim_actions
52 M vim_info: Detailed information of a vm,net from the VIM. Stored at database in the instance_XXX but not at vim_actions
53 MD error_msg: descriptive text upon an error.Stored also at database instance_XXX
54 MD created_at: task creation time
55 MD modified_at: last task update time. On refresh it contains when this task need to be refreshed
56
57 """
58
59 import threading
60 import time
61 import Queue
62 import logging
63 import vimconn
64 import yaml
65 from db_base import db_base_Exception
66 from lib_osm_openvim.ovim import ovimException
67
68 __author__ = "Alfonso Tierno, Pablo Montes"
69 __date__ = "$28-Sep-2017 12:07:15$"
70
71
def is_task_id(task_id):
    """Tell whether 'task_id' is a reference to a task ("TASK-<index>") instead of a real identifier.

    :param task_id: text to check
    :return: True if the text begins with "TASK-", False otherwise
    """
    task_prefix = "TASK-"
    return task_id.startswith(task_prefix)
74
75
class VimThreadException(Exception):
    """Base error raised by the VIM thread while processing a task."""
78
79
class VimThreadExceptionNotFound(VimThreadException):
    """Error raised by the VIM thread when the element looked for is not found."""
82
83
84 class vim_thread(threading.Thread):
85 REFRESH_BUILD = 5 # 5 seconds
86 REFRESH_ACTIVE = 60 # 1 minute
87
def __init__(self, myvimconn, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None,
             db=None, db_lock=None, ovim=None):
    """Init a thread.

    :param myvimconn: connector used to talk to the VIM, or a vimconn.vimconnException. In the latter case
        the thread has no VIM (self.vim is None) and the error text is kept at self.error_status
    :param task_lock: lock used when reading or changing the status of a task
    :param name: name of the thread. If not provided it is composed with the connector "id" and the
        "datacenter_tenant_id" of its config
    :param datacenter_name: name of the datacenter; it is only stored
    :param datacenter_tenant_id: value that the 'datacenter_vim_id' of a task must match to be processed here
    :param db: database class
    :param db_lock: lock to use the database in exclusion
    :param ovim: object used to manage the SDN ports and networks
    """
    threading.Thread.__init__(self)
    if isinstance(myvimconn, vimconn.vimconnException):
        self.vim = None
        self.error_status = "Error accesing to VIM: {}".format(myvimconn)
    else:
        self.vim = myvimconn
        self.error_status = None
    self.datacenter_name = datacenter_name
    self.datacenter_tenant_id = datacenter_tenant_id
    self.ovim = ovim
    if not name:
        # The name is taken from the connector received as argument. The original code indexed the
        # 'vimconn' module itself, which always raised a TypeError
        self.name = myvimconn["id"] + "." + myvimconn["config"]["datacenter_tenant_id"]
    else:
        self.name = name

    self.logger = logging.getLogger('openmano.vim.'+self.name)
    self.db = db
    self.db_lock = db_lock

    self.task_lock = task_lock
    self.task_queue = Queue.Queue(2000)

    # Contains time ordered task list for refreshing the status of VIM VMs and nets
    self.refresh_tasks = []

    # Contains time ordered task list for creation, deletion of VIM VMs and nets
    self.pending_tasks = []

    # It contains all the creation/deletion pending tasks grouped by its concrete vm, net, etc
    #     <item><item_id>:
    #         -   <task1>  # e.g. CREATE task
    #             <task2>  # e.g. DELETE task
    self.grouped_tasks = {}
131
def _reload_vim_actions(self):
    """
    Read actions from database and reload them at memory.
    The rows of table vim_actions of this datacenter_vim_id are read in pages of 'database_limit' entries,
    ordered by item_id, item and created_at. The tasks over the same element (same item and item_id) are grouped
    and passed to self._insert_pending_tasks, unless the group contains a DELETE that is not SCHEDULED
    :return: None
    """
    # NOTE(review): the in-memory lists are not emptied here; confirm that a 'reload' cannot insert tasks twice
    action_completed = False  # True when the current group contains a DELETE with a status other than SCHEDULED
    task_list = []  # tasks collected for the current group
    old_action_key = None  # item + item_id of the current group

    # content of the last row of the previous page; used to know where to resume
    old_item_id = ""
    old_item = ""
    old_created_at = 0.0
    database_limit = 200
    while True:
        # get 200 (database_limit) entries each time
        with self.db_lock:
            vim_actions = self.db.get_rows(FROM="vim_actions",
                                           WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
                                                  "item_id>=": old_item_id},
                                           ORDER_BY=("item_id", "item", "created_at",),
                                           LIMIT=database_limit)
        for task in vim_actions:
            item = task["item"]
            item_id = task["item_id"]

            # skip the first entries that are already processed in the previous pool of 200
            if old_item_id:
                if item_id == old_item_id and item == old_item and task["created_at"] == old_created_at:
                    old_item_id = False  # next one will be a new un-processed task
                continue

            action_key = item + item_id
            if old_action_key != action_key:
                # a new group starts: flush the previous one if it is not completed
                if not action_completed and task_list:
                    # This will fill needed task parameters into memory, and insert the task if needed in
                    # self.pending_tasks or self.refresh_tasks
                    self._insert_pending_tasks(task_list)
                task_list = []
                old_action_key = action_key
                action_completed = False
            elif action_completed:
                # rest of the tasks of a completed group are ignored
                continue

            if task["status"] == "SCHEDULED" or task["action"] == "CREATE" or task["action"] == "FIND":
                task_list.append(task)
            elif task["action"] == "DELETE":
                # action completed because deleted and status is not SCHEDULED. Not needed anything
                action_completed = True
        if len(vim_actions) == database_limit:
            # a full page has been obtained, so there can be more rows
            # update variables for get the next database iteration
            old_item_id = item_id
            old_item = item
            old_created_at = task["created_at"]
        else:
            break
    # Last actions group need to be inserted too
    if not action_completed and task_list:
        self._insert_pending_tasks(task_list)
    self.logger.debug("reloaded vim actions pending:{} refresh:{}".format(
        len(self.pending_tasks), len(self.refresh_tasks)))
192
def _refres_elements(self):
    """Call VIM to get VMs and networks status until 10 elements

    The tasks at the head of self.refresh_tasks whose refresh time (task["modified_at"]) has expired are taken
    from the list, the VIM is asked for the status of their VMs/networks, the database is updated when something
    changes, and each task is inserted again to be refreshed later (REFRESH_BUILD seconds if the status is
    BUILD, REFRESH_ACTIVE otherwise).
    A task taken from the list that cannot be refreshed (the VIM call fails or the VIM does not report it) is
    also inserted again, so that it is retried later instead of being lost.
    :return: number of tasks taken for being refreshed
    """
    now = time.time()
    nb_processed = 0
    vm_to_refresh_list = []
    net_to_refresh_list = []
    vm_to_refresh_dict = {}
    net_to_refresh_dict = {}
    items_to_refresh = 0
    while self.refresh_tasks:
        task = self.refresh_tasks[0]
        with self.task_lock:
            if task['status'] == 'SUPERSEDED':
                self.refresh_tasks.pop(0)
                continue
            if task['modified_at'] > now:
                break
            # task["status"] = "processing"
            nb_processed += 1
        self.refresh_tasks.pop(0)
        if task["item"] == 'instance_vms':
            vm_to_refresh_list.append(task["vim_id"])
            vm_to_refresh_dict[task["vim_id"]] = task
        elif task["item"] == 'instance_nets':
            net_to_refresh_list.append(task["vim_id"])
            net_to_refresh_dict[task["vim_id"]] = task
        else:
            error_text = "unknown task {}".format(task["item"])
            self.logger.error(error_text)
        items_to_refresh += 1
        if items_to_refresh == 10:
            break

    if vm_to_refresh_list:
        now = time.time()
        try:
            vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list)
            for vim_id, vim_info in vim_dict.items():
                # look for task
                task_need_update = False
                task = vm_to_refresh_dict[vim_id]
                self.logger.debug("get-vm vm_id=%s result=%s", task["vim_id"], str(vim_info))

                # update database
                task_vim_info = task.get("vim_info")
                task_error_msg = task.get("error_msg")
                task_vim_status = task["extra"].get("vim_status")
                if vim_info.get("error_msg"):
                    vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"])
                if task_vim_status != vim_info["status"] or task_error_msg != vim_info.get("error_msg") or \
                        task_vim_info != vim_info.get("vim_info"):
                    with self.db_lock:
                        temp_dict = {"status": vim_info["status"],
                                     "error_msg": vim_info.get("error_msg"),
                                     "vim_info": vim_info.get("vim_info")}
                        self.db.update_rows('instance_vms', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
                    task["extra"]["vim_status"] = vim_info["status"]
                    task["error_msg"] = vim_info.get("error_msg")
                    task["vim_info"] = vim_info.get("vim_info")
                    task_need_update = True
                for interface in vim_info.get("interfaces", ()):
                    vim_interface_id = interface["vim_interface_id"]
                    if vim_interface_id not in task["extra"]["interfaces"]:
                        self.logger.critical("Interface not found {} on task info {}".format(
                            vim_interface_id, task["extra"]["interfaces"]), exc_info=True)
                        continue
                    task_interface = task["extra"]["interfaces"][vim_interface_id]
                    task_vim_interface = task["vim_interfaces"].get(vim_interface_id)
                    if task_vim_interface != interface:
                        # delete old port
                        if task_interface.get("sdn_port_id"):
                            try:
                                with self.db_lock:
                                    self.ovim.delete_port(task_interface["sdn_port_id"])
                                    task_interface["sdn_port_id"] = None
                                    task_need_update = True
                            except ovimException as e:
                                self.logger.error("ovimException deleting external_port={} ".format(
                                    task_interface["sdn_port_id"]) + str(e), exc_info=True)
                                # TODO Set error_msg at instance_nets

                        # Create SDN port
                        sdn_net_id = task_interface.get("sdn_net_id")
                        if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
                            sdn_port_name = sdn_net_id + "." + task["vim_id"]
                            sdn_port_name = sdn_port_name[:63]
                            try:
                                with self.db_lock:
                                    sdn_port_id = self.ovim.new_external_port(
                                        {"compute_node": interface["compute_node"],
                                         "pci": interface["pci"],
                                         "vlan": interface.get("vlan"),
                                         "net_id": sdn_net_id,
                                         "region": self.vim["config"]["datacenter_id"],
                                         "name": sdn_port_name,
                                         "mac": interface.get("mac_address")})
                                    task_interface["sdn_port_id"] = sdn_port_id
                                    task_need_update = True
                            except (ovimException, Exception) as e:
                                self.logger.error(
                                    "ovimException creating new_external_port compute_node={} "
                                    "pci={} vlan={} ".format(
                                        interface["compute_node"],
                                        interface["pci"],
                                        interface.get("vlan")) + str(e),
                                    exc_info=True)
                                # TODO Set error_msg at instance_nets
                        with self.db_lock:
                            self.db.update_rows(
                                'instance_interfaces',
                                UPDATE={"mac_address": interface.get("mac_address"),
                                        "ip_address": interface.get("ip_address"),
                                        "vim_info": interface.get("vim_info"),
                                        "sdn_port_id": task_interface.get("sdn_port_id"),
                                        "compute_node": interface.get("compute_node"),
                                        "pci": interface.get("pci"),
                                        "vlan": interface.get("vlan"),
                                        },
                                WHERE={'uuid': task_interface["iface_id"]})
                        task["vim_interfaces"][vim_interface_id] = interface
                if task_need_update:
                    with self.db_lock:
                        self.db.update_rows(
                            'vim_actions',
                            UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
                                    "error_msg": task.get("error_msg"), "modified_at": now},
                            WHERE={'instance_action_id': task['instance_action_id'],
                                   'task_index': task['task_index']})
                if task["extra"].get("vim_status") == "BUILD":
                    self._insert_refresh(task, now + self.REFRESH_BUILD)
                else:
                    self._insert_refresh(task, now + self.REFRESH_ACTIVE)
                # this task is already back at the refresh list
                del vm_to_refresh_dict[vim_id]
        except vimconn.vimconnException as e:
            self.logger.error("vimconnException Exception when trying to refresh vms " + str(e))
        # Insert again every VM task not refreshed. The original code re-inserted only the last task
        # assigned to the 'task' variable (which could even be a net task), losing the rest
        for task in vm_to_refresh_dict.values():
            self._insert_refresh(task, now + self.REFRESH_ACTIVE)

    if net_to_refresh_list:
        now = time.time()
        try:
            vim_dict = self.vim.refresh_nets_status(net_to_refresh_list)
            for vim_id, vim_info in vim_dict.items():
                # look for task
                task = net_to_refresh_dict[vim_id]
                self.logger.debug("get-net net_id=%s result=%s", task["vim_id"], str(vim_info))

                task_vim_info = task.get("vim_info")
                task_vim_status = task["extra"].get("vim_status")
                task_error_msg = task.get("error_msg")
                task_sdn_net_id = task["extra"].get("sdn_net_id")

                # get ovim status
                if task_sdn_net_id:
                    try:
                        with self.db_lock:
                            sdn_net = self.ovim.show_network(task_sdn_net_id)
                        if sdn_net["status"] == "ERROR":
                            if not vim_info.get("error_msg"):
                                vim_info["error_msg"] = sdn_net["error_msg"]
                            else:
                                # each half is shortened so that both fit together with the labels
                                vim_info["error_msg"] = "VIM_ERROR: {} && SDN_ERROR: {}".format(
                                    self._format_vim_error_msg(vim_info["error_msg"], 1024//2-14),
                                    self._format_vim_error_msg(sdn_net["error_msg"], 1024//2-14))
                            if vim_info["status"] == "VIM_ERROR":
                                vim_info["status"] = "VIM_SDN_ERROR"
                            else:
                                vim_info["status"] = "SDN_ERROR"

                    except (ovimException, Exception) as e:
                        self.logger.error(
                            "ovimException getting network infor snd_net_id={}".format(task_sdn_net_id),
                            exc_info=True)
                        # TODO Set error_msg at instance_nets

                # update database
                if vim_info.get("error_msg"):
                    vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"])
                if task_vim_status != vim_info["status"] or task_error_msg != vim_info.get("error_msg") or \
                        task_vim_info != vim_info.get("vim_info"):
                    task["extra"]["vim_status"] = vim_info["status"]
                    task["error_msg"] = vim_info.get("error_msg")
                    task["vim_info"] = vim_info.get("vim_info")
                    temp_dict = {"status": vim_info["status"],
                                 "error_msg": vim_info.get("error_msg"),
                                 "vim_info": vim_info.get("vim_info")}
                    with self.db_lock:
                        self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
                        self.db.update_rows(
                            'vim_actions',
                            UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
                                    "error_msg": task.get("error_msg"), "modified_at": now},
                            WHERE={'instance_action_id': task['instance_action_id'],
                                   'task_index': task['task_index']})
                if task["extra"].get("vim_status") == "BUILD":
                    self._insert_refresh(task, now + self.REFRESH_BUILD)
                else:
                    self._insert_refresh(task, now + self.REFRESH_ACTIVE)
                # this task is already back at the refresh list
                del net_to_refresh_dict[vim_id]
        except vimconn.vimconnException as e:
            self.logger.error("vimconnException Exception when trying to refresh nets " + str(e))
        # Insert again every net task not refreshed, so that none is lost
        for task in net_to_refresh_dict.values():
            self._insert_refresh(task, now + self.REFRESH_ACTIVE)

    return nb_processed
394
395 def _insert_refresh(self, task, threshold_time=None):
396 """Insert a task at list of refreshing elements. The refreshing list is ordered by threshold_time (task['modified_at']
397 It is assumed that this is called inside this thread
398 """
399 if not self.vim:
400 return
401 if not threshold_time:
402 threshold_time = time.time()
403 task["modified_at"] = threshold_time
404 task_name = task["item"][9:] + "-" + task["action"]
405 task_id = task["instance_action_id"] + "." + str(task["task_index"])
406 for index in range(0, len(self.refresh_tasks)):
407 if self.refresh_tasks[index]["modified_at"] > threshold_time:
408 self.refresh_tasks.insert(index, task)
409 break
410 else:
411 index = len(self.refresh_tasks)
412 self.refresh_tasks.append(task)
413 self.logger.debug("new refresh task={} name={}, modified_at={} index={}".format(
414 task_id, task_name, task["modified_at"], index))
415
416 def _remove_refresh(self, task_name, vim_id):
417 """Remove a task with this name and vim_id from the list of refreshing elements.
418 It is assumed that this is called inside this thread outside _refres_elements method
419 Return True if self.refresh_list is modified, task is found
420 Return False if not found
421 """
422 index_to_delete = None
423 for index in range(0, len(self.refresh_tasks)):
424 if self.refresh_tasks[index]["name"] == task_name and self.refresh_tasks[index]["vim_id"] == vim_id:
425 index_to_delete = index
426 break
427 else:
428 return False
429 if index_to_delete != None:
430 del self.refresh_tasks[index_to_delete]
431 return True
432
def _proccess_pending_tasks(self):
    """Process the tasks of self.pending_tasks: creation, deletion or finding of VMs and nets at VIM.

    For each task, the tasks it depends on are checked. If one is still SCHEDULED the task is moved to the end
    of the list (up to 3 times, then it fails); if one is FAILED the task fails too. Otherwise the action is
    performed and the result is stored at database tables vim_actions, instance_actions and the one of the item.
    It stops when the list is empty or after 10 creations.
    :return: number of tasks taken from the pending list
    """
    nb_created = 0
    nb_processed = 0
    while self.pending_tasks:
        task = self.pending_tasks.pop(0)
        nb_processed += 1
        try:
            # check if tasks that this depends on have been completed
            dependency_not_completed = False
            for task_index in task["extra"].get("depends_on", ()):
                task_dependency = task["depends"].get("TASK-" + str(task_index))
                if not task_dependency:
                    task_dependency = self._look_for_task(task["instance_action_id"], "TASK-" + str(task_index))
                if not task_dependency:
                    raise VimThreadException(
                        "Cannot get depending net task trying to get depending task {}.{}".format(
                            task["instance_action_id"], task_index))
                # task["depends"]["TASK-" + str(task_index)] = task_dependency #it references another object,so database must be look again
                if task_dependency["status"] == "SCHEDULED":
                    dependency_not_completed = True
                    break
                elif task_dependency["status"] == "FAILED":
                    # The original text had more placeholders than arguments, so format() raised an
                    # IndexError that was not captured here
                    raise VimThreadException(
                        "Cannot {} {}, task {}.{} because depends on failed {} {}, task {}.{}".format(
                            task["action"], task["item"],
                            task["instance_action_id"], task["task_index"],
                            task_dependency["action"], task_dependency["item"],
                            task_dependency["instance_action_id"], task_dependency["task_index"]))
            if dependency_not_completed:
                # Move this task to the end.
                task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
                if task["extra"]["tries"] <= 3:
                    self.pending_tasks.append(task)
                    continue
                else:
                    raise VimThreadException(
                        "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, "
                        "(task {}.{})".format(task["action"], task["item"],
                                              task["instance_action_id"], task["task_index"],
                                              task_dependency["action"], task_dependency["item"],
                                              task_dependency["instance_action_id"],
                                              task_dependency["task_index"]))

            if task["status"] == "SUPERSEDED":
                # not needed to do anything but update database with the new status
                result = True
                database_update = None
            elif not self.vim:
                # The original line was a comparison (==) without effect. "FAILED" is used because it is the
                # error status of a task used in the rest of this class
                task["status"] = "FAILED"
                task["error_msg"] = self.error_status
                result = False
                database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
            elif task["item"] == 'instance_vms':
                if task["action"] == "CREATE":
                    result, database_update = self.new_vm(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_vm(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            elif task["item"] == 'instance_nets':
                if task["action"] == "CREATE":
                    result, database_update = self.new_net(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_net(task)
                elif task["action"] == "FIND":
                    result, database_update = self.get_net(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            else:
                raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
                # TODO
        except VimThreadException as e:
            result = False
            task["error_msg"] = str(e)
            task["status"] = "FAILED"
            database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
            # the column with the VIM identifier depends on the table of the item
            if task["item"] == 'instance_vms':
                database_update["vim_vm_id"] = None
            elif task["item"] == 'instance_nets':
                database_update["vim_net_id"] = None

        if task["action"] == "DELETE":
            # the element does not need to be tracked anymore. pop() tolerates a group already removed
            action_key = task["item"] + task["item_id"]
            self.grouped_tasks.pop(action_key, None)
        elif task["action"] in ("CREATE", "FIND") and task["status"] in ("DONE", "BUILD"):
            self._insert_refresh(task)

        self.logger.debug("vim_action id={}.{} item={} action={} result={}:{} params={}".format(
            task["instance_action_id"], task["task_index"], task["item"], task["action"],
            task["status"], task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"),
            task["params"]))
        try:
            now = time.time()
            with self.db_lock:
                self.db.update_rows(
                    table="vim_actions",
                    UPDATE={"status": task["status"], "vim_id": task.get("vim_id"), "modified_at": now,
                            "error_msg": task["error_msg"],
                            "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
                    WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
                if result is not None:
                    self.db.update_rows(
                        table="instance_actions",
                        UPDATE={("number_done" if result else "number_failed"): {"INCREMENT": 1},
                                "modified_at": now},
                        WHERE={"uuid": task["instance_action_id"]})
                if database_update:
                    self.db.update_rows(table=task["item"],
                                        UPDATE=database_update,
                                        WHERE={"uuid": task["item_id"]})
        except db_base_Exception as e:
            self.logger.error("Error updating database %s", str(e), exc_info=True)

        if nb_created == 10:
            break
    return nb_processed
546
def _insert_pending_tasks(self, vim_actions_list):
    """Fill the memory-only fields of the tasks and insert them into the lists of this thread.

    Tasks whose 'datacenter_vim_id' is not the one of this thread are ignored. Every other task is added to
    self.grouped_tasks. DELETE and SCHEDULED tasks are added to self.pending_tasks; CREATE and FIND tasks with
    status DONE or BUILD are inserted for refreshing.
    A DELETE task marks as SUPERSEDED the previous tasks over the same element, taking from them the
    information needed for the deletion. If none of them has created the element at VIM, the DELETE task
    itself is marked as SUPERSEDED.
    :param vim_actions_list: list of tasks (dict with the content of a vim_actions database row)
    :return: None. Raises vimconn.vimconnException on an unknown action
    """
    now = time.time()  # NOTE(review): not used in this method
    for task in vim_actions_list:
        if task["datacenter_vim_id"] != self.datacenter_tenant_id:
            continue
        item = task["item"]
        item_id = task["item_id"]
        action_key = item + item_id
        if action_key not in self.grouped_tasks:
            self.grouped_tasks[action_key] = []
        task["params"] = None
        task["depends"] = {}
        if task["extra"]:
            # 'extra' is yaml text at database and a dict at memory
            # NOTE(review): yaml.load can build arbitrary objects; consider yaml.safe_load after confirming
            # that every writer of this column uses yaml.safe_dump
            extra = yaml.load(task["extra"])
            task["extra"] = extra
            task["params"] = extra.get("params")
            depends_on_list = extra.get("depends_on")
            if depends_on_list:
                for index in depends_on_list:
                    # the dependency is resolved here only when it is in this same list at the position of
                    # its task_index and it belongs to the same instance action
                    if index < len(vim_actions_list) and vim_actions_list[index]["task_index"] == index and\
                            vim_actions_list[index]["instance_action_id"] == task["instance_action_id"]:
                        task["depends"]["TASK-" + str(index)] = vim_actions_list[index]
            if extra.get("interfaces"):
                task["vim_interfaces"] = {}
        else:
            task["extra"] = {}
        if "error_msg" not in task:
            task["error_msg"] = None
        if "vim_id" not in task:
            task["vim_id"] = None

        if task["action"] == "DELETE":
            # becomes True when a previous CREATE has really created the element at VIM
            need_delete_action = False
            for to_supersede in self.grouped_tasks.get(action_key, ()):
                if to_supersede["action"] == "FIND" and to_supersede.get("vim_id"):
                    task["vim_id"] = to_supersede["vim_id"]
                if to_supersede["action"] == "CREATE" and to_supersede.get("vim_id") and \
                        to_supersede["extra"].get("created", True):
                    need_delete_action = True
                    task["vim_id"] = to_supersede["vim_id"]
                    # copy what the deletion needs from the creation task
                    if to_supersede["extra"].get("sdn_vim_id"):
                        task["extra"]["sdn_vim_id"] = to_supersede["extra"]["sdn_vim_id"]
                    if to_supersede["extra"].get("interfaces"):
                        task["extra"]["interfaces"] = to_supersede["extra"]["interfaces"]
                    if to_supersede["extra"].get("created_items"):
                        if not task["extra"].get("created_items"):
                            task["extra"]["created_items"] = {}
                        task["extra"]["created_items"].update(to_supersede["extra"]["created_items"])
                # Mark task as SUPERSEDED.
                #   If task is in self.pending_tasks, it will be removed and database will be update
                #   If task is in self.refresh_tasks, it will be removed
                to_supersede["status"] = "SUPERSEDED"
            if not need_delete_action:
                task["status"] = "SUPERSEDED"

            self.grouped_tasks[action_key].append(task)
            self.pending_tasks.append(task)
        elif task["status"] == "SCHEDULED":
            self.grouped_tasks[action_key].append(task)
            self.pending_tasks.append(task)
        elif task["action"] in ("CREATE", "FIND"):
            self.grouped_tasks[action_key].append(task)
            if task["status"] in ("DONE", "BUILD"):
                self._insert_refresh(task)
        # TODO add VM reset, get console, etc...
        else:
            raise vimconn.vimconnException(self.name + "unknown vim_action action {}".format(task["action"]))
614
def insert_task(self, task):
    """Put a task (or a control message) into the queue of this thread, without waiting.

    :param task: element to insert
    :return: None. Raises vimconn.vimconnException if the queue is full
    """
    try:
        self.task_queue.put_nowait(task)
    except Queue.Full:
        raise vimconn.vimconnException(self.name + ": timeout inserting a task")
    return None
621
def del_task(self, task):
    """Cancel a task that has not been processed yet.

    :param task: task to cancel. If its status is SCHEDULED it is changed to SUPERSEDED
    :return: True if the task has been marked as SUPERSEDED, False if its status was not SCHEDULED
    """
    with self.task_lock:
        if task["status"] == "SCHEDULED":
            # The original line was a comparison (==), so the task was never marked
            task["status"] = "SUPERSEDED"
            return True
        # task["status"] == "processing"
        # The lock is released by the 'with' statement. Releasing it also by hand made the 'with' fail
        # when leaving, because the lock was released twice
        return False
630
def run(self):
    """Main loop of the thread.

    The actions are loaded from database. Then, in a loop, the messages of self.task_queue are attended
    (a list of tasks to insert, 'exit' to finish the thread or 'reload' to load the actions from database
    again), the pending tasks are processed and the elements are refreshed. It sleeps one second when there
    is nothing to do.
    :return: 0 when the 'exit' message is received
    """
    self.logger.debug("Starting")
    while True:
        self._reload_vim_actions()
        reload_thread = False
        while True:
            try:
                while not self.task_queue.empty():
                    task = self.task_queue.get()
                    if isinstance(task, list):
                        self._insert_pending_tasks(task)
                    elif isinstance(task, str):
                        if task == 'exit':
                            # control messages are also marked as done at the queue
                            self.task_queue.task_done()
                            # logged here because nothing after the loops can be reached
                            self.logger.debug("Finishing")
                            return 0
                        elif task == 'reload':
                            self.task_queue.task_done()
                            reload_thread = True
                            break
                    self.task_queue.task_done()
                if reload_thread:
                    break
                nb_processed = self._proccess_pending_tasks()
                nb_processed += self._refres_elements()
                if not nb_processed:
                    time.sleep(1)

            except Exception as e:
                self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
660
def terminate(self, task):
    """Nothing is done with the task.

    :param task: not used
    :return: the tuple (True, None)
    """
    outcome = (True, None)
    return outcome
663
664 def _look_for_task(self, instance_action_id, task_id):
665 task_index = task_id.split("-")[-1]
666 with self.db_lock:
667 tasks = self.db.get_rows(FROM="vim_actions", WHERE={"instance_action_id": instance_action_id,
668 "task_index": task_index})
669 if not tasks:
670 return None
671 task = tasks[0]
672 task["params"] = None
673 task["depends"] = {}
674 if task["extra"]:
675 extra = yaml.load(task["extra"])
676 task["extra"] = extra
677 task["params"] = extra.get("params")
678 if extra.get("interfaces"):
679 task["vim_interfaces"] = {}
680 else:
681 task["extra"] = {}
682 return task
683
684 def _format_vim_error_msg(self, error_text, max_length=1024):
685 if error_text and len(error_text) >= max_length:
686 return error_text[:max_length//2-3] + " ... " + error_text[-max_length//2+3:]
687 return error_text
688
def new_vm(self, task):
    """Create a VM at the VIM.

    The networks of the VM that are references to other tasks ("TASK-<index>") are replaced by the vim_id
    obtained by those tasks before calling the VIM.
    :param task: CREATE task over an instance_vms. 'status', 'vim_id', 'error_msg' and 'extra' are updated
    :return: tuple (True, dict) on success or (False, dict) on error. The dict contains the content to
        update the instance_vms database table
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        params = task["params"]
        net_list = params[5]
        for net in net_list:
            if "net_id" in net and is_task_id(net["net_id"]):  # change task_id into network_id
                task_dependency = task["depends"].get(net["net_id"])
                if not task_dependency:
                    task_dependency = self._look_for_task(task["instance_action_id"], net["net_id"])
                if not task_dependency:
                    raise VimThreadException(
                        "Cannot get depending net task trying to get depending task {}.{}".format(
                            task["instance_action_id"], net["net_id"]))
                network_id = task_dependency.get("vim_id")
                if not network_id:
                    # The error is read from the task found. The original code read it from
                    # task["depends"], which fails when the task has been obtained from database
                    raise VimThreadException(
                        "Cannot create VM because depends on a network not created or found: " +
                        str(task_dependency.get("error_msg")))
                net["net_id"] = network_id
        vim_vm_id, created_items = self.vim.new_vminstance(*params)

        # fill task_interfaces. Look for snd_net_id at database for each interface
        task_interfaces = {}
        for iface in net_list:
            task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]}
            with self.db_lock:
                result = self.db.get_rows(SELECT=('sdn_net_id',),
                                          FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
                                          WHERE={'ii.uuid': iface["uuid"]})
            if result:
                task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id']
            else:
                # The original text mixed '%s' and '{}', so the logging module could not compose it
                self.logger.critical("Error creating VM, task=%s Network %s not found at DB", task_id,
                                     iface["uuid"])

        task["vim_info"] = {}
        task["vim_interfaces"] = {}
        task["extra"]["interfaces"] = task_interfaces
        task["extra"]["created"] = True
        task["extra"]["created_items"] = created_items
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_vm_id
        instance_element_update = {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating VM, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": error_text}
        return False, instance_element_update
746
def del_vm(self, task):
    """Delete a VM at the VIM, deleting before the SDN ports of its interfaces.

    :param task: DELETE task over an instance_vms. 'status' and 'error_msg' are updated
    :return: tuple (True, None) if the VM is deleted or it is not found at VIM; (False, None) on other
        VIM errors
    """
    vm_vim_id = task["vim_id"]
    # An empty dict is used when the task has no interfaces. The original default was a tuple, which has
    # no values() method
    interfaces = task["extra"].get("interfaces") or {}
    try:
        for iface in interfaces.values():
            if iface.get("sdn_port_id"):
                try:
                    with self.db_lock:
                        self.ovim.delete_port(iface["sdn_port_id"])
                except ovimException as e:
                    # an error deleting a port does not stop the deletion of the VM
                    self.logger.error("ovimException deleting external_port={} at VM vim_id={} deletion ".format(
                        iface["sdn_port_id"], vm_vim_id) + str(e), exc_info=True)
                    # TODO Set error_msg at instance_nets

        self.vim.delete_vminstance(vm_vim_id, task["extra"].get("created_items"))
        task["status"] = "DONE"
        task["error_msg"] = None
        return True, None

    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        if isinstance(e, vimconn.vimconnNotFoundException):
            # If not found mark as Done and fill error_msg
            task["status"] = "DONE"
            return True, None
        task["status"] = "FAILED"
        return False, None
774
775 def _get_net_internal(self, task, filter_param):
776 """
777 Common code for get_net and new_net. It looks for a network on VIM with the filter_params
778 :param task: task for this find or find-or-create action
779 :param filter_param: parameters to send to the vimconnector
780 :return: a dict with the content to update the instance_nets database table. Raises an exception on error, or
781 when network is not found or found more than one
782 """
783 vim_nets = self.vim.get_network_list(filter_param)
784 if not vim_nets:
785 raise VimThreadExceptionNotFound("Network not found with this criteria: '{}'".format(filter))
786 elif len(vim_nets) > 1:
787 raise VimThreadException("More than one network found with this criteria: '{}'".format(filter))
788 vim_net_id = vim_nets[0]["id"]
789
790 # Discover if this network is managed by a sdn controller
791 sdn_net_id = None
792 with self.db_lock:
793 result = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets',
794 WHERE={'vim_net_id': vim_net_id, 'instance_scenario_id': None,
795 'datacenter_tenant_id': self.datacenter_tenant_id})
796 if result:
797 sdn_net_id = result[0]['sdn_net_id']
798
799 task["status"] = "DONE"
800 task["extra"]["vim_info"] = {}
801 task["extra"]["created"] = False
802 task["extra"]["sdn_net_id"] = sdn_net_id
803 task["error_msg"] = None
804 task["vim_id"] = vim_net_id
805 instance_element_update = {"vim_net_id": vim_net_id, "created": False, "status": "BUILD",
806 "error_msg": None}
807 return instance_element_update
808
def get_net(self, task):
    """Look for a network at the VIM, using the first element of the task params as the filter.

    :param task: FIND task over an instance_nets. On error its 'status', 'vim_id' and 'error_msg' are updated
    :return: tuple (True, dict) if the network is found or (False, dict) on error. The dict contains the
        content to update the instance_nets database table
    """
    try:
        task_id = task["instance_action_id"] + "." + str(task["task_index"])
        search_filter = task["params"][0]
        found_update = self._get_net_internal(task, search_filter)
        return True, found_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error looking for NET, task=%s: %s", str(task_id), str(e))
        failure_text = self._format_vim_error_msg(str(e))
        task["status"] = "FAILED"
        task["vim_id"] = None
        task["error_msg"] = failure_text
        failed_update = {"vim_net_id": None, "status": "VIM_ERROR",
                         "error_msg": task["error_msg"]}
        return False, failed_update
825
def new_net(self, task):
    """
    Process a CREATE task over a network. If task["extra"]["find"] is present, it first looks for an existing
    network at VIM and uses it if found; otherwise the network is created with task["params"]
    :param task: task to process. task["params"] are the parameters sent to the vimconnector new_network; the
        first two are read as the net name and the net type. The task "status", "vim_id", "error_msg" and
        "extra" ("vim_info", "sdn_net_id", "created") entries are updated in place
    :return: tuple (True, dict to update instance_nets) on success, or (False, dict with status "VIM_ERROR")
        when a vimconn.vimconnException is raised
    """
    try:
        task_id = task["instance_action_id"] + "." + str(task["task_index"])
        # FIND
        if task["extra"].get("find"):
            action_text = "finding"
            filter_param = task["extra"]["find"][0]
            try:
                instance_element_update = self._get_net_internal(task, filter_param)
                return True, instance_element_update
            except VimThreadExceptionNotFound:
                # Not found is not an error here: go on and create the network
                pass
            # NOTE(review): any other VimThreadException raised by _get_net_internal (e.g. more than one network
            # found) is only handled below if it derives from vimconn.vimconnException; get_net catches it
            # explicitly - confirm whether the same is wanted here
        # CREATE
        params = task["params"]
        action_text = "creating"
        vim_net_id = self.vim.new_network(*params)

        net_name = params[0]
        net_type = params[1]

        sdn_net_id = None
        sdn_controller = self.vim.config.get('sdn-controller')
        if sdn_controller and (net_type == "data" or net_type == "ptp"):
            network = {"name": net_name, "type": net_type, "region": self.vim["config"]["datacenter_id"]}

            vim_net = self.vim.get_network(vim_net_id)
            encapsulation = vim_net.get('encapsulation')
            if encapsulation != 'vlan':
                # 'encapsulation' can be missing at vim_net, so the value obtained with get() is reported
                # instead of indexing the dict again, which would raise an uncaught KeyError
                raise vimconn.vimconnException(
                    "net '{}' defined as type '{}' has not vlan encapsulation '{}'".format(
                        net_name, net_type, encapsulation))
            network["vlan"] = vim_net.get('segmentation_id')
            try:
                with self.db_lock:
                    sdn_net_id = self.ovim.new_network(network)
            except (ovimException, Exception) as e:
                # Best effort: a failure creating the SDN network is logged, the task goes on without sdn_net_id
                self.logger.error("task=%s cannot create SDN network vim_net_id=%s input='%s' ovimException='%s'",
                                  str(task_id), vim_net_id, str(network), str(e))
        task["status"] = "DONE"
        task["extra"]["vim_info"] = {}
        task["extra"]["sdn_net_id"] = sdn_net_id
        task["extra"]["created"] = True
        task["error_msg"] = None
        task["vim_id"] = vim_net_id
        instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "BUILD",
                                   "created": True, "error_msg": None}
        return True, instance_element_update
    except vimconn.vimconnException as e:
        # Only %-style placeholders are used: logging applies 'msg % args', so a '{}' would be left unformatted
        # and the extra argument would make the whole record fail
        self.logger.error("Error %s NET, task=%s: %s", action_text, str(task_id), str(e))
        task["status"] = "FAILED"
        task["vim_id"] = None
        task["error_msg"] = self._format_vim_error_msg(str(e))
        instance_element_update = {"vim_net_id": None, "sdn_net_id": None, "status": "VIM_ERROR",
                                   "error_msg": task["error_msg"]}
        return False, instance_element_update
880
def del_net(self, task):
    """
    Process a DELETE task over a network: remove the associated SDN network (if any) and the VIM network
    :param task: task to process. Reads task["vim_id"] and task["extra"]["sdn_net_id"]; "status" and "error_msg"
        are updated in place
    :return: tuple (True, None) when deleted or when the VIM reports it as not found; (False, None) on any other
        vimconn.vimconnException or on ovimException
    """
    vim_net_id = task["vim_id"]
    sdn_net_id = task["extra"].get("sdn_net_id")
    try:
        if sdn_net_id:
            # Delete any attached port to this sdn network. There can be ports associated to this network in case
            # it was manually done using 'openmano vim-net-sdn-attach'
            with self.db_lock:
                external_ports = self.ovim.get_ports(columns={'uuid'},
                                                     filter={'name': 'external_port', 'net_id': sdn_net_id})
                for external_port in external_ports:
                    self.ovim.delete_port(external_port['uuid'])
                self.ovim.delete_network(sdn_net_id)
        self.vim.delete_network(vim_net_id)
    except ovimException as sdn_error:
        task["error_msg"] = self._format_vim_error_msg("ovimException obtaining and deleting external "
                                                       "ports for net {}: {}".format(sdn_net_id, str(sdn_error)))
    except vimconn.vimconnException as vim_error:
        task["error_msg"] = self._format_vim_error_msg(str(vim_error))
        if isinstance(vim_error, vimconn.vimconnNotFoundException):
            # If not found mark as Done and fill error_msg
            task["status"] = "DONE"
            return True, None
    else:
        task["status"] = "DONE"
        task["error_msg"] = None
        return True, None
    # Reached only after an error other than "not found"
    task["status"] = "FAILED"
    return False, None