vim thread logging enhancement
[osm/RO.git] / osm_ro / vim_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 """"
25 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
26 The tasks are stored at database in table vim_actions
27 The task content is (M: stored at memory, D: stored at database):
28 MD instance_action_id: reference a global action over an instance-scenario: database instance_actions
29 MD task_index: index number of the task. This together with the previous forms a unique key identifier
30 MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread
31 MD vim_id: id of the vm,net,etc at VIM
32 MD action: CREATE, DELETE, FIND
33 MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
34 MD item_id: uuid of the referenced entry in the previous table
35 MD status: SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED
36 MD extra: text with yaml format at database, dict at memory with:
37 params: list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken from other related tasks
38 find: (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains the FIND params
39 depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation depends on a net creation
40 sdn_net_id: used for net.
41 tries:
42 interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
43 iface_id: uuid of intance_interfaces
44 sdn_port_id:
45 sdn_net_id:
46 created_items: dictionary with extra elements created that need to be deleted. e.g. ports, volumes,...
47 created: False if the VIM element is not created by other actions, and it should not be deleted
48 vim_status: VIM status of the element. Stored also at database in the instance_XXX
49 M depends: dict with task_index(from depends_on) to task class
50 M params: same as extra[params] but with the resolved dependencies
51 M vim_interfaces: similar to extra[interfaces] but with VIM information. Stored at database in the instance_XXX but not at vim_actions
52 M vim_info: Detailed information of a vm,net from the VIM. Stored at database in the instance_XXX but not at vim_actions
53 MD error_msg: descriptive text upon an error.Stored also at database instance_XXX
54 MD created_at: task creation time
55 MD modified_at: last task update time. On refresh it contains when this task need to be refreshed
56
57 """
58
59 import threading
60 import time
61 import Queue
62 import logging
63 import vimconn
64 import yaml
65 from db_base import db_base_Exception
66 from lib_osm_openvim.ovim import ovimException
67
68 __author__ = "Alfonso Tierno, Pablo Montes"
69 __date__ = "$28-Sep-2017 12:07:15$"
70
71
72 def is_task_id(task_id):
73 return task_id.startswith("TASK-")
74
75
76 class VimThreadException(Exception):
77 pass
78
79
80 class VimThreadExceptionNotFound(VimThreadException):
81 pass
82
83
84 class vim_thread(threading.Thread):
85 REFRESH_BUILD = 5 # 5 seconds
86 REFRESH_ACTIVE = 60 # 1 minute
87
88 def __init__(self, myvimconn, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None,
89 db=None, db_lock=None, ovim=None):
90 """Init a thread.
91 Arguments:
92 'id' number of thead
93 'name' name of thread
94 'host','user': host ip or name to manage and user
95 'db', 'db_lock': database class and lock to use it in exclusion
96 """
97 threading.Thread.__init__(self)
98 if isinstance(myvimconn, vimconn.vimconnException):
99 self.vim = None
100 self.error_status = "Error accesing to VIM: {}".format(myvimconn)
101 else:
102 self.vim = myvimconn
103 self.error_status = None
104 self.datacenter_name = datacenter_name
105 self.datacenter_tenant_id = datacenter_tenant_id
106 self.ovim = ovim
107 if not name:
108 self.name = vimconn["id"] + "." + vimconn["config"]["datacenter_tenant_id"]
109 else:
110 self.name = name
111
112 self.logger = logging.getLogger('openmano.vim.'+self.name)
113 self.db = db
114 self.db_lock = db_lock
115
116 self.task_lock = task_lock
117 self.task_queue = Queue.Queue(2000)
118
119 self.refresh_tasks = []
120 """Contains time ordered task list for refreshing the status of VIM VMs and nets"""
121
122 self.pending_tasks = []
123 """Contains time ordered task list for creation, deletion of VIM VMs and nets"""
124
125 self.grouped_tasks = {}
126 """ It contains all the creation/deletion pending tasks grouped by its concrete vm, net, etc
127 <item><item_id>:
128 - <task1> # e.g. CREATE task
129 <task2> # e.g. DELETE task
130 """
131
132 def _reload_vim_actions(self):
133 """
134 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
135 :return: None
136 """
137 try:
138 action_completed = False
139 task_list = []
140 old_action_key = None
141
142 old_item_id = ""
143 old_item = ""
144 old_created_at = 0.0
145 database_limit = 200
146 while True:
147 # get 200 (database_limit) entries each time
148 with self.db_lock:
149 vim_actions = self.db.get_rows(FROM="vim_actions",
150 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
151 "item_id>=": old_item_id},
152 ORDER_BY=("item_id", "item", "created_at",),
153 LIMIT=database_limit)
154 for task in vim_actions:
155 item = task["item"]
156 item_id = task["item_id"]
157
158 # skip the first entries that are already processed in the previous pool of 200
159 if old_item_id:
160 if item_id == old_item_id and item == old_item and task["created_at"] == old_created_at:
161 old_item_id = False # next one will be a new un-processed task
162 continue
163
164 action_key = item + item_id
165 if old_action_key != action_key:
166 if not action_completed and task_list:
167 # This will fill needed task parameters into memory, and insert the task if needed in
168 # self.pending_tasks or self.refresh_tasks
169 try:
170 self._insert_pending_tasks(task_list)
171 except Exception as e:
172 self.logger.critical(
173 "Unexpected exception at _reload_vim_actions:_insert_pending_tasks: " + str(e),
174 exc_info=True)
175 task_list = []
176 old_action_key = action_key
177 action_completed = False
178 elif action_completed:
179 continue
180
181 if task["status"] == "SCHEDULED" or task["action"] == "CREATE" or task["action"] == "FIND":
182 task_list.append(task)
183 elif task["action"] == "DELETE":
184 # action completed because deleted and status is not SCHEDULED. Not needed anything
185 action_completed = True
186 if len(vim_actions) == database_limit:
187 # update variables for get the next database iteration
188 old_item_id = item_id
189 old_item = item
190 old_created_at = task["created_at"]
191 else:
192 break
193 # Last actions group need to be inserted too
194 if not action_completed and task_list:
195 try:
196 self._insert_pending_tasks(task_list)
197 except Exception as e:
198 self.logger.critical("Unexpected exception at _reload_vim_actions:_insert_pending_tasks: " + str(e),
199 exc_info=True)
200 self.logger.debug("reloaded vim actions pending:{} refresh:{}".format(
201 len(self.pending_tasks), len(self.refresh_tasks)))
202 except Exception as e:
203 self.logger.critical("Unexpected exception at _reload_vim_actions: " + str(e), exc_info=True)
204
205 def _refres_elements(self):
206 """Call VIM to get VMs and networks status until 10 elements"""
207 now = time.time()
208 nb_processed = 0
209 vm_to_refresh_list = []
210 net_to_refresh_list = []
211 vm_to_refresh_dict = {}
212 net_to_refresh_dict = {}
213 items_to_refresh = 0
214 while self.refresh_tasks:
215 task = self.refresh_tasks[0]
216 with self.task_lock:
217 if task['status'] == 'SUPERSEDED':
218 self.refresh_tasks.pop(0)
219 continue
220 if task['modified_at'] > now:
221 break
222 # task["status"] = "processing"
223 nb_processed += 1
224 self.refresh_tasks.pop(0)
225 if task["item"] == 'instance_vms':
226 vm_to_refresh_list.append(task["vim_id"])
227 vm_to_refresh_dict[task["vim_id"]] = task
228 elif task["item"] == 'instance_nets':
229 net_to_refresh_list.append(task["vim_id"])
230 net_to_refresh_dict[task["vim_id"]] = task
231 else:
232 task_id = task["instance_action_id"] + "." + str(task["task_index"])
233 self.logger.critical("task={}: unknown task {}".format(task_id, task["item"]), exc_info=True)
234 items_to_refresh += 1
235 if items_to_refresh == 10:
236 break
237
238 if vm_to_refresh_list:
239 now = time.time()
240 try:
241 vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list)
242 except vimconn.vimconnException as e:
243 # Mark all tasks at VIM_ERROR status
244 self.logger.error("task=several get-VM: vimconnException when trying to refresh vms " + str(e))
245 vim_dict = {}
246 for vim_id in vm_to_refresh_list:
247 vim_dict[vim_id] = {"status": "VIM_ERROR", "error_msg": str(e)}
248
249 for vim_id, vim_info in vim_dict.items():
250 # look for task
251 task_need_update = False
252 task = vm_to_refresh_dict[vim_id]
253 task_id = task["instance_action_id"] + "." + str(task["task_index"])
254 self.logger.debug("task={} get-VM: vim_vm_id={} result={}".format(task_id, task["vim_id"], vim_info))
255
256 # check and update interfaces
257 task_warning_msg = ""
258 for interface in vim_info.get("interfaces", ()):
259 vim_interface_id = interface["vim_interface_id"]
260 if vim_interface_id not in task["extra"]["interfaces"]:
261 self.logger.critical("task={} get-VM: Interface not found {} on task info {}".format(
262 task_id, vim_interface_id, task["extra"]["interfaces"]), exc_info=True)
263 continue
264 task_interface = task["extra"]["interfaces"][vim_interface_id]
265 task_vim_interface = task["vim_interfaces"].get(vim_interface_id)
266 if task_vim_interface != interface:
267 # delete old port
268 if task_interface.get("sdn_port_id"):
269 try:
270 with self.db_lock:
271 self.ovim.delete_port(task_interface["sdn_port_id"])
272 task_interface["sdn_port_id"] = None
273 task_need_update = True
274 except ovimException as e:
275 error_text = "ovimException deleting external_port={}: {}".format(
276 task_interface["sdn_port_id"], e)
277 self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
278 task_warning_msg += error_text
279 # TODO Set error_msg at instance_nets instead of instance VMs
280
281 # Create SDN port
282 sdn_net_id = task_interface.get("sdn_net_id")
283 if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
284 sdn_port_name = sdn_net_id + "." + task["vim_id"]
285 sdn_port_name = sdn_port_name[:63]
286 try:
287 with self.db_lock:
288 sdn_port_id = self.ovim.new_external_port(
289 {"compute_node": interface["compute_node"],
290 "pci": interface["pci"],
291 "vlan": interface.get("vlan"),
292 "net_id": sdn_net_id,
293 "region": self.vim["config"]["datacenter_id"],
294 "name": sdn_port_name,
295 "mac": interface.get("mac_address")})
296 task_interface["sdn_port_id"] = sdn_port_id
297 task_need_update = True
298 except (ovimException, Exception) as e:
299 error_text = "ovimException creating new_external_port compute_node={}"\
300 " pci={} vlan={} {}".format(
301 interface["compute_node"],
302 interface["pci"],
303 interface.get("vlan"), e)
304 self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
305 task_warning_msg += error_text
306 # TODO Set error_msg at instance_nets instead of instance VMs
307
308 with self.db_lock:
309 self.db.update_rows(
310 'instance_interfaces',
311 UPDATE={"mac_address": interface.get("mac_address"),
312 "ip_address": interface.get("ip_address"),
313 "vim_info": interface.get("vim_info"),
314 "sdn_port_id": task_interface.get("sdn_port_id"),
315 "compute_node": interface.get("compute_node"),
316 "pci": interface.get("pci"),
317 "vlan": interface.get("vlan")},
318 WHERE={'uuid': task_interface["iface_id"]})
319 task["vim_interfaces"][vim_interface_id] = interface
320
321 # check and update task and instance_vms database
322 if vim_info.get("error_msg"):
323 vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"] + task_warning_msg)
324 elif task_warning_msg:
325 vim_info["error_msg"] = self._format_vim_error_msg(task_warning_msg)
326
327 task_vim_info = task.get("vim_info")
328 task_error_msg = task.get("error_msg")
329 task_vim_status = task["extra"].get("vim_status")
330 if task_vim_status != vim_info["status"] or task_error_msg != vim_info.get("error_msg") or \
331 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
332 temp_dict = {"status": vim_info["status"], "error_msg": vim_info.get("error_msg")}
333 if vim_info.get("vim_info"):
334 temp_dict["vim_info"] = vim_info["vim_info"]
335 with self.db_lock:
336 self.db.update_rows('instance_vms', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
337 task["extra"]["vim_status"] = vim_info["status"]
338 task["error_msg"] = vim_info.get("error_msg")
339 if vim_info.get("vim_info"):
340 task["vim_info"] = vim_info["vim_info"]
341 task_need_update = True
342
343 if task_need_update:
344 with self.db_lock:
345 self.db.update_rows(
346 'vim_actions',
347 UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
348 "error_msg": task.get("error_msg"), "modified_at": now},
349 WHERE={'instance_action_id': task['instance_action_id'],
350 'task_index': task['task_index']})
351 if task["extra"].get("vim_status") == "BUILD":
352 self._insert_refresh(task, now + self.REFRESH_BUILD)
353 else:
354 self._insert_refresh(task, now + self.REFRESH_ACTIVE)
355
356 if net_to_refresh_list:
357 now = time.time()
358 try:
359 vim_dict = self.vim.refresh_nets_status(net_to_refresh_list)
360 except vimconn.vimconnException as e:
361 # Mark all tasks at VIM_ERROR status
362 self.logger.error("task=several get-net: vimconnException when trying to refresh nets " + str(e))
363 vim_dict = {}
364 for vim_id in net_to_refresh_list:
365 vim_dict[vim_id] = {"status": "VIM_ERROR", "error_msg": str(e)}
366
367 for vim_id, vim_info in vim_dict.items():
368 # look for task
369 task = net_to_refresh_dict[vim_id]
370 task_id = task["instance_action_id"] + "." + str(task["task_index"])
371 self.logger.debug("task={} get-net: vim_net_id={} result={}".format(task_id, task["vim_id"], vim_info))
372
373 task_vim_info = task.get("vim_info")
374 task_vim_status = task["extra"].get("vim_status")
375 task_error_msg = task.get("error_msg")
376 task_sdn_net_id = task["extra"].get("sdn_net_id")
377
378 # get ovim status
379 if task_sdn_net_id:
380 try:
381 with self.db_lock:
382 sdn_net = self.ovim.show_network(task_sdn_net_id)
383 except (ovimException, Exception) as e:
384 text_error = "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id, e)
385 self.logger.error("task={} get-net: {}".format(task_id, text_error), exc_info=True)
386 sdn_net = {"status": "ERROR", "error_msg": text_error}
387 if sdn_net["status"] == "ERROR":
388 if not vim_info.get("error_msg"):
389 vim_info["error_msg"] = sdn_net["error_msg"]
390 else:
391 vim_info["error_msg"] = "VIM_ERROR: {} && SDN_ERROR: {}".format(
392 self._format_vim_error_msg(vim_info["error_msg"], 1024//2-14),
393 self._format_vim_error_msg(sdn_net["error_msg"], 1024//2-14))
394 if vim_info["status"] == "VIM_ERROR":
395 vim_info["status"] = "VIM_SDN_ERROR"
396 else:
397 vim_info["status"] = "SDN_ERROR"
398
399 # update database
400 if vim_info.get("error_msg"):
401 vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"])
402 if task_vim_status != vim_info["status"] or task_error_msg != vim_info.get("error_msg") or \
403 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
404 task["extra"]["vim_status"] = vim_info["status"]
405 task["error_msg"] = vim_info.get("error_msg")
406 if vim_info.get("vim_info"):
407 task["vim_info"] = vim_info["vim_info"]
408 temp_dict = {"status": vim_info["status"], "error_msg": vim_info.get("error_msg")}
409 if vim_info.get("vim_info"):
410 temp_dict["vim_info"] = vim_info["vim_info"]
411 with self.db_lock:
412 self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
413 self.db.update_rows(
414 'vim_actions',
415 UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
416 "error_msg": task.get("error_msg"), "modified_at": now},
417 WHERE={'instance_action_id': task['instance_action_id'],
418 'task_index': task['task_index']})
419 if task["extra"].get("vim_status") == "BUILD":
420 self._insert_refresh(task, now + self.REFRESH_BUILD)
421 else:
422 self._insert_refresh(task, now + self.REFRESH_ACTIVE)
423
424 return nb_processed
425
426 def _insert_refresh(self, task, threshold_time=None):
427 """Insert a task at list of refreshing elements. The refreshing list is ordered by threshold_time (task['modified_at']
428 It is assumed that this is called inside this thread
429 """
430 if not self.vim:
431 return
432 if not threshold_time:
433 threshold_time = time.time()
434 task["modified_at"] = threshold_time
435 task_name = task["item"][9:] + "-" + task["action"]
436 task_id = task["instance_action_id"] + "." + str(task["task_index"])
437 for index in range(0, len(self.refresh_tasks)):
438 if self.refresh_tasks[index]["modified_at"] > threshold_time:
439 self.refresh_tasks.insert(index, task)
440 break
441 else:
442 index = len(self.refresh_tasks)
443 self.refresh_tasks.append(task)
444 self.logger.debug("task={} new refresh name={}, modified_at={} index={}".format(
445 task_id, task_name, task["modified_at"], index))
446
447 def _remove_refresh(self, task_name, vim_id):
448 """Remove a task with this name and vim_id from the list of refreshing elements.
449 It is assumed that this is called inside this thread outside _refres_elements method
450 Return True if self.refresh_list is modified, task is found
451 Return False if not found
452 """
453 index_to_delete = None
454 for index in range(0, len(self.refresh_tasks)):
455 if self.refresh_tasks[index]["name"] == task_name and self.refresh_tasks[index]["vim_id"] == vim_id:
456 index_to_delete = index
457 break
458 else:
459 return False
460 if not index_to_delete:
461 del self.refresh_tasks[index_to_delete]
462 return True
463
464 def _proccess_pending_tasks(self):
465 nb_created = 0
466 nb_processed = 0
467 while self.pending_tasks:
468 task = self.pending_tasks.pop(0)
469 nb_processed += 1
470 try:
471 # check if tasks that this depends on have been completed
472 dependency_not_completed = False
473 for task_index in task["extra"].get("depends_on", ()):
474 task_dependency = task["depends"].get("TASK-" + str(task_index))
475 if not task_dependency:
476 task_dependency = self._look_for_task(task["instance_action_id"], "TASK-" + str(task_index))
477 if not task_dependency:
478 raise VimThreadException(
479 "Cannot get depending net task trying to get depending task {}.{}".format(
480 task["instance_action_id"], task_index))
481 # task["depends"]["TASK-" + str(task_index)] = task_dependency #it references another object,so database must be look again
482 if task_dependency["status"] == "SCHEDULED":
483 dependency_not_completed = True
484 break
485 elif task_dependency["status"] == "FAILED":
486 raise VimThreadException(
487 "Cannot {} {}, (task {}.{}) because depends on failed {} {}, (task{}.{})".format(
488 task["action"], task["item"],
489 task["instance_action_id"], task["task_index"],
490 task_dependency["instance_action_id"], task_dependency["task_index"],
491 task_dependency["action"], task_dependency["item"]))
492 if dependency_not_completed:
493 # Move this task to the end.
494 task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
495 if task["extra"]["tries"] <= 3:
496 self.pending_tasks.append(task)
497 continue
498 else:
499 raise VimThreadException(
500 "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, "
501 "(task {}.{})".format(task["action"], task["item"],
502 task["instance_action_id"], task["task_index"],
503 task_dependency["instance_action_id"], task_dependency["task_index"],
504 task_dependency["action"], task_dependency["item"]))
505
506 if task["status"] == "SUPERSEDED":
507 # not needed to do anything but update database with the new status
508 result = True
509 database_update = None
510 elif not self.vim:
511 task["status"] = "ERROR"
512 task["error_msg"] = self.error_status
513 result = False
514 database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
515 elif task["item"] == 'instance_vms':
516 if task["action"] == "CREATE":
517 result, database_update = self.new_vm(task)
518 nb_created += 1
519 elif task["action"] == "DELETE":
520 result, database_update = self.del_vm(task)
521 else:
522 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
523 elif task["item"] == 'instance_nets':
524 if task["action"] == "CREATE":
525 result, database_update = self.new_net(task)
526 nb_created += 1
527 elif task["action"] == "DELETE":
528 result, database_update = self.del_net(task)
529 elif task["action"] == "FIND":
530 result, database_update = self.get_net(task)
531 else:
532 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
533 else:
534 raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
535 # TODO
536 except VimThreadException as e:
537 result = False
538 task["error_msg"] = str(e)
539 task["status"] = "FAILED"
540 database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
541 if task["item"] == 'instance_vms':
542 database_update["vim_vm_id"] = None
543 elif task["item"] == 'instance_nets':
544 database_update["vim_net_id"] = None
545
546 if task["action"] == "DELETE":
547 action_key = task["item"] + task["item_id"]
548 del self.grouped_tasks[action_key]
549 elif task["action"] in ("CREATE", "FIND") and task["status"] in ("DONE", "BUILD"):
550 self._insert_refresh(task)
551
552 task_id = task["instance_action_id"] + "." + str(task["task_index"])
553 self.logger.debug("task={} item={} action={} result={}:'{}' params={}".format(
554 task_id, task["item"], task["action"], task["status"],
555 task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"), task["params"]))
556 try:
557 now = time.time()
558 with self.db_lock:
559 self.db.update_rows(
560 table="vim_actions",
561 UPDATE={"status": task["status"], "vim_id": task.get("vim_id"), "modified_at": now,
562 "error_msg": task["error_msg"],
563 "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
564 WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
565 if result is not None:
566 self.db.update_rows(
567 table="instance_actions",
568 UPDATE={("number_done" if result else "number_failed"): {"INCREMENT": 1},
569 "modified_at": now},
570 WHERE={"uuid": task["instance_action_id"]})
571 if database_update:
572 self.db.update_rows(table=task["item"],
573 UPDATE=database_update,
574 WHERE={"uuid": task["item_id"]})
575 except db_base_Exception as e:
576 self.logger.error("task={} Error updating database {}".format(task_id, e), exc_info=True)
577
578 if nb_created == 10:
579 break
580 return nb_processed
581
582 def _insert_pending_tasks(self, vim_actions_list):
583 for task in vim_actions_list:
584 if task["datacenter_vim_id"] != self.datacenter_tenant_id:
585 continue
586 item = task["item"]
587 item_id = task["item_id"]
588 action_key = item + item_id
589 if action_key not in self.grouped_tasks:
590 self.grouped_tasks[action_key] = []
591 task["params"] = None
592 task["depends"] = {}
593 if task["extra"]:
594 extra = yaml.load(task["extra"])
595 task["extra"] = extra
596 task["params"] = extra.get("params")
597 depends_on_list = extra.get("depends_on")
598 if depends_on_list:
599 for index in depends_on_list:
600 if index < len(vim_actions_list) and vim_actions_list[index]["task_index"] == index and\
601 vim_actions_list[index]["instance_action_id"] == task["instance_action_id"]:
602 task["depends"]["TASK-" + str(index)] = vim_actions_list[index]
603 if extra.get("interfaces"):
604 task["vim_interfaces"] = {}
605 else:
606 task["extra"] = {}
607 if "error_msg" not in task:
608 task["error_msg"] = None
609 if "vim_id" not in task:
610 task["vim_id"] = None
611
612 if task["action"] == "DELETE":
613 need_delete_action = False
614 for to_supersede in self.grouped_tasks.get(action_key, ()):
615 if to_supersede["action"] == "FIND" and to_supersede.get("vim_id"):
616 task["vim_id"] = to_supersede["vim_id"]
617 if to_supersede["action"] == "CREATE" and to_supersede["extra"].get("created", True) and \
618 (to_supersede.get("vim_id") or to_supersede["extra"].get("sdn_net_id")):
619 need_delete_action = True
620 task["vim_id"] = to_supersede["vim_id"]
621 if to_supersede["extra"].get("sdn_net_id"):
622 task["extra"]["sdn_net_id"] = to_supersede["extra"]["sdn_net_id"]
623 if to_supersede["extra"].get("interfaces"):
624 task["extra"]["interfaces"] = to_supersede["extra"]["interfaces"]
625 if to_supersede["extra"].get("created_items"):
626 if not task["extra"].get("created_items"):
627 task["extra"]["created_items"] = {}
628 task["extra"]["created_items"].update(to_supersede["extra"]["created_items"])
629 # Mark task as SUPERSEDED.
630 # If task is in self.pending_tasks, it will be removed and database will be update
631 # If task is in self.refresh_tasks, it will be removed
632 to_supersede["status"] = "SUPERSEDED"
633 if not need_delete_action:
634 task["status"] = "SUPERSEDED"
635
636 self.grouped_tasks[action_key].append(task)
637 self.pending_tasks.append(task)
638 elif task["status"] == "SCHEDULED":
639 self.grouped_tasks[action_key].append(task)
640 self.pending_tasks.append(task)
641 elif task["action"] in ("CREATE", "FIND"):
642 self.grouped_tasks[action_key].append(task)
643 if task["status"] in ("DONE", "BUILD"):
644 self._insert_refresh(task)
645 # TODO add VM reset, get console, etc...
646 else:
647 raise vimconn.vimconnException(self.name + "unknown vim_action action {}".format(task["action"]))
648
649 def insert_task(self, task):
650 try:
651 self.task_queue.put(task, False)
652 return None
653 except Queue.Full:
654 raise vimconn.vimconnException(self.name + ": timeout inserting a task")
655
656 def del_task(self, task):
657 with self.task_lock:
658 if task["status"] == "SCHEDULED":
659 task["status"] = "SUPERSEDED"
660 return True
661 else: # task["status"] == "processing"
662 self.task_lock.release()
663 return False
664
665 def run(self):
666 self.logger.debug("Starting")
667 while True:
668 self._reload_vim_actions()
669 reload_thread = False
670
671 while True:
672 try:
673 while not self.task_queue.empty():
674 task = self.task_queue.get()
675 if isinstance(task, list):
676 self._insert_pending_tasks(task)
677 elif isinstance(task, str):
678 if task == 'exit':
679 return 0
680 elif task == 'reload':
681 reload_thread = True
682 break
683 self.task_queue.task_done()
684 if reload_thread:
685 break
686 nb_processed = self._proccess_pending_tasks()
687 nb_processed += self._refres_elements()
688 if not nb_processed:
689 time.sleep(1)
690
691 except Exception as e:
692 self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
693
694 self.logger.debug("Finishing")
695
696 def _look_for_task(self, instance_action_id, task_id):
697 task_index = task_id.split("-")[-1]
698 with self.db_lock:
699 tasks = self.db.get_rows(FROM="vim_actions", WHERE={"instance_action_id": instance_action_id,
700 "task_index": task_index})
701 if not tasks:
702 return None
703 task = tasks[0]
704 task["params"] = None
705 task["depends"] = {}
706 if task["extra"]:
707 extra = yaml.load(task["extra"])
708 task["extra"] = extra
709 task["params"] = extra.get("params")
710 if extra.get("interfaces"):
711 task["vim_interfaces"] = {}
712 else:
713 task["extra"] = {}
714 return task
715
716 @staticmethod
717 def _format_vim_error_msg(error_text, max_length=1024):
718 if error_text and len(error_text) >= max_length:
719 return error_text[:max_length//2-3] + " ... " + error_text[-max_length//2+3:]
720 return error_text
721
722 def new_vm(self, task):
723 task_id = task["instance_action_id"] + "." + str(task["task_index"])
724 try:
725 params = task["params"]
726 depends = task.get("depends")
727 net_list = params[5]
728 for net in net_list:
729 if "net_id" in net and is_task_id(net["net_id"]): # change task_id into network_id
730 task_dependency = task["depends"].get(net["net_id"])
731 if not task_dependency:
732 task_dependency = self._look_for_task(task["instance_action_id"], net["net_id"])
733 if not task_dependency:
734 raise VimThreadException(
735 "Cannot get depending net task trying to get depending task {}.{}".format(
736 task["instance_action_id"], net["net_id"]))
737 network_id = task_dependency.get("vim_id")
738 if not network_id:
739 raise VimThreadException(
740 "Cannot create VM because depends on a network not created or found: " +
741 str(depends[net["net_id"]]["error_msg"]))
742 net["net_id"] = network_id
743 vim_vm_id, created_items = self.vim.new_vminstance(*params)
744
745 # fill task_interfaces. Look for snd_net_id at database for each interface
746 task_interfaces = {}
747 for iface in net_list:
748 task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]}
749 with self.db_lock:
750 result = self.db.get_rows(
751 SELECT=('sdn_net_id',),
752 FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
753 WHERE={'ii.uuid': iface["uuid"]})
754 if result:
755 task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id']
756 else:
757 self.logger.critical("task={} new-VM: instance_nets uuid={} not found at DB".format(task_id,
758 iface["uuid"]), exc_info=True)
759
760 task["vim_info"] = {}
761 task["vim_interfaces"] = {}
762 task["extra"]["interfaces"] = task_interfaces
763 task["extra"]["created"] = True
764 task["extra"]["created_items"] = created_items
765 task["error_msg"] = None
766 task["status"] = "DONE"
767 task["vim_id"] = vim_vm_id
768 instance_element_update = {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None}
769 return True, instance_element_update
770
771 except (vimconn.vimconnException, VimThreadException) as e:
772 self.logger.error("task={} new-VM: {}".format(task_id, e))
773 error_text = self._format_vim_error_msg(str(e))
774 task["error_msg"] = error_text
775 task["status"] = "FAILED"
776 task["vim_id"] = None
777 instance_element_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": error_text}
778 return False, instance_element_update
779
780 def del_vm(self, task):
781 task_id = task["instance_action_id"] + "." + str(task["task_index"])
782 vm_vim_id = task["vim_id"]
783 interfaces = task["extra"].get("interfaces", ())
784 try:
785 for iface in interfaces.values():
786 if iface.get("sdn_port_id"):
787 try:
788 with self.db_lock:
789 self.ovim.delete_port(iface["sdn_port_id"])
790 except ovimException as e:
791 self.logger.error("task={} del-VM: ovimException when deleting external_port={}: {} ".format(
792 task_id, iface["sdn_port_id"], e), exc_info=True)
793 # TODO Set error_msg at instance_nets
794
795 self.vim.delete_vminstance(vm_vim_id, task["extra"].get("created_items"))
796 task["status"] = "DONE"
797 task["error_msg"] = None
798 return True, None
799
800 except vimconn.vimconnException as e:
801 task["error_msg"] = self._format_vim_error_msg(str(e))
802 if isinstance(e, vimconn.vimconnNotFoundException):
803 # If not found mark as Done and fill error_msg
804 task["status"] = "DONE"
805 return True, None
806 task["status"] = "FAILED"
807 return False, None
808
809 def _get_net_internal(self, task, filter_param):
810 """
811 Common code for get_net and new_net. It looks for a network on VIM with the filter_params
812 :param task: task for this find or find-or-create action
813 :param filter_param: parameters to send to the vimconnector
814 :return: a dict with the content to update the instance_nets database table. Raises an exception on error, or
815 when network is not found or found more than one
816 """
817 vim_nets = self.vim.get_network_list(filter_param)
818 if not vim_nets:
819 raise VimThreadExceptionNotFound("Network not found with this criteria: '{}'".format(filter_param))
820 elif len(vim_nets) > 1:
821 raise VimThreadException("More than one network found with this criteria: '{}'".format(filter_param))
822 vim_net_id = vim_nets[0]["id"]
823
824 # Discover if this network is managed by a sdn controller
825 sdn_net_id = None
826 with self.db_lock:
827 result = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets',
828 WHERE={'vim_net_id': vim_net_id, 'instance_scenario_id': None,
829 'datacenter_tenant_id': self.datacenter_tenant_id})
830 if result:
831 sdn_net_id = result[0]['sdn_net_id']
832
833 task["status"] = "DONE"
834 task["extra"]["vim_info"] = {}
835 task["extra"]["created"] = False
836 task["extra"]["sdn_net_id"] = sdn_net_id
837 task["error_msg"] = None
838 task["vim_id"] = vim_net_id
839 instance_element_update = {"vim_net_id": vim_net_id, "created": False, "status": "BUILD",
840 "error_msg": None, "sdn_net_id": sdn_net_id}
841 return instance_element_update
842
843 def get_net(self, task):
844 task_id = task["instance_action_id"] + "." + str(task["task_index"])
845 try:
846 params = task["params"]
847 filter_param = params[0]
848 instance_element_update = self._get_net_internal(task, filter_param)
849 return True, instance_element_update
850
851 except (vimconn.vimconnException, VimThreadException) as e:
852 self.logger.error("task={} get-net: {}".format(task_id, e))
853 task["status"] = "FAILED"
854 task["vim_id"] = None
855 task["error_msg"] = self._format_vim_error_msg(str(e))
856 instance_element_update = {"vim_net_id": None, "status": "VIM_ERROR",
857 "error_msg": task["error_msg"]}
858 return False, instance_element_update
859
860 def new_net(self, task):
861 vim_net_id = None
862 sdn_net_id = None
863 task_id = task["instance_action_id"] + "." + str(task["task_index"])
864 action_text = ""
865 try:
866 # FIND
867 if task["extra"].get("find"):
868 action_text = "finding"
869 filter_param = task["extra"]["find"][0]
870 try:
871 instance_element_update = self._get_net_internal(task, filter_param)
872 return True, instance_element_update
873 except VimThreadExceptionNotFound:
874 pass
875 # CREATE
876 params = task["params"]
877 action_text = "creating VIM"
878 vim_net_id = self.vim.new_network(*params)
879
880 net_name = params[0]
881 net_type = params[1]
882
883 sdn_controller = self.vim.config.get('sdn-controller')
884 if sdn_controller and (net_type == "data" or net_type == "ptp"):
885 network = {"name": net_name, "type": net_type, "region": self.vim["config"]["datacenter_id"]}
886
887 vim_net = self.vim.get_network(vim_net_id)
888 if vim_net.get('encapsulation') != 'vlan':
889 raise vimconn.vimconnException(
890 "net '{}' defined as type '{}' has not vlan encapsulation '{}'".format(
891 net_name, net_type, vim_net['encapsulation']))
892 network["vlan"] = vim_net.get('segmentation_id')
893 action_text = "creating SDN"
894 with self.db_lock:
895 sdn_net_id = self.ovim.new_network(network)
896 task["status"] = "DONE"
897 task["extra"]["vim_info"] = {}
898 task["extra"]["sdn_net_id"] = sdn_net_id
899 task["extra"]["created"] = True
900 task["error_msg"] = None
901 task["vim_id"] = vim_net_id
902 instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "BUILD",
903 "created": True, "error_msg": None}
904 return True, instance_element_update
905 except (vimconn.vimconnException, ovimException) as e:
906 self.logger.error("task={} new-net: Error {}: {}".format(task_id, action_text, e))
907 task["status"] = "FAILED"
908 task["vim_id"] = vim_net_id
909 task["error_msg"] = self._format_vim_error_msg(str(e))
910 task["extra"]["sdn_net_id"] = sdn_net_id
911 instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "VIM_ERROR",
912 "error_msg": task["error_msg"]}
913 return False, instance_element_update
914
915 def del_net(self, task):
916 net_vim_id = task["vim_id"]
917 sdn_net_id = task["extra"].get("sdn_net_id")
918 try:
919 if sdn_net_id:
920 # Delete any attached port to this sdn network. There can be ports associated to this network in case
921 # it was manually done using 'openmano vim-net-sdn-attach'
922 with self.db_lock:
923 port_list = self.ovim.get_ports(columns={'uuid'},
924 filter={'name': 'external_port', 'net_id': sdn_net_id})
925 for port in port_list:
926 self.ovim.delete_port(port['uuid'])
927 self.ovim.delete_network(sdn_net_id)
928 if net_vim_id:
929 self.vim.delete_network(net_vim_id)
930 task["status"] = "DONE"
931 task["error_msg"] = None
932 return True, None
933 except ovimException as e:
934 task["error_msg"] = self._format_vim_error_msg("ovimException obtaining and deleting external "
935 "ports for net {}: {}".format(sdn_net_id, str(e)))
936 except vimconn.vimconnException as e:
937 task["error_msg"] = self._format_vim_error_msg(str(e))
938 if isinstance(e, vimconn.vimconnNotFoundException):
939 # If not found mark as Done and fill error_msg
940 task["status"] = "DONE"
941 return True, None
942 task["status"] = "FAILED"
943 return False, None