22a882ed71fa1a5999fa121867ba4c15a906ee4d
[osm/RO.git] / osm_ro / vim_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 """"
25 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
26 The tasks are stored at database in table vim_actions
27 The task content are (M: stored at memory, D: stored at database):
28 MD instance_action_id: reference a global action over an instance-scenario: database instance_actions
29 MD task_index: index number of the task. This with the previous are a key
30 MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread
31 MD vim_id: id of the vm,net,etc at VIM
32 MD action: CREATE, DELETE, FIND
33 MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
34 MD item_id: uuid of the referenced entry in the preious table
35 MD status: SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED
36 MD extra: text with yaml format at database, dict at memory with:
37 params: list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken from other related tasks
38 find: (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains the FIND params
39 depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm depends on a net
40 sdn_net_id: used for net.
41 tries:
42 interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
43 iface_id: uuid of intance_interfaces
44 sdn_port_id:
45 sdn_net_id:
46 created: False if the VIM element is not created by other actions, and it should not be deleted
47 vim_status: VIM status of the element. Stored also at database in the instance_XXX
48 M depends: dict with task_index(from depends_on) to task class
49 M params: same as extra[params] but with the resolved dependencies
50 M vim_interfaces: similar to extra[interfaces] but with VIM information. Stored at database in the instance_XXX but not at vim_actions
51 M vim_info: Detailed information of a vm,net from the VIM. Stored at database in the instance_XXX but not at vim_actions
52 MD error_msg: descriptive text upon an error.Stored also at database instance_XXX
53 MD created_at: task creation time
54 MD modified_at: last task update time. On refresh it contains when this task need to be refreshed
55
56 """
57
58 import threading
59 import time
60 import Queue
61 import logging
62 import vimconn
63 import yaml
64 from db_base import db_base_Exception
65 from lib_osm_openvim.ovim import ovimException
66
67 __author__ = "Alfonso Tierno, Pablo Montes"
68 __date__ = "$28-Sep-2017 12:07:15$"
69
70 # from logging import Logger
71 # import auxiliary_functions as af
72
73
74 def is_task_id(task_id):
75 return task_id.startswith("TASK-")
76
77
78 class VimThreadException(Exception):
79 pass
80
81
82 class VimThreadExceptionNotFound(VimThreadException):
83 pass
84
85
86 class vim_thread(threading.Thread):
87 REFRESH_BUILD = 5 # 5 seconds
88 REFRESH_ACTIVE = 60 # 1 minute
89
90 def __init__(self, myvimconn, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None,
91 db=None, db_lock=None, ovim=None):
92 """Init a thread.
93 Arguments:
94 'id' number of thead
95 'name' name of thread
96 'host','user': host ip or name to manage and user
97 'db', 'db_lock': database class and lock to use it in exclusion
98 """
99 threading.Thread.__init__(self)
100 if isinstance(myvimconn, vimconn.vimconnException):
101 self.vim = None
102 self.error_status = "Error accesing to VIM: {}".format(myvimconn)
103 else:
104 self.vim = myvimconn
105 self.error_status = None
106 self.datacenter_name = datacenter_name
107 self.datacenter_tenant_id = datacenter_tenant_id
108 self.ovim = ovim
109 if not name:
110 self.name = vimconn["id"] + "." + vimconn["config"]["datacenter_tenant_id"]
111 else:
112 self.name = name
113
114 self.logger = logging.getLogger('openmano.vim.'+self.name)
115 self.db = db
116 self.db_lock = db_lock
117
118 self.task_lock = task_lock
119 self.task_queue = Queue.Queue(2000)
120
121 self.refresh_tasks = []
122 """Contains time ordered task list for refreshing the status of VIM VMs and nets"""
123
124 self.pending_tasks = []
125 """Contains time ordered task list for creation, deletion of VIM VMs and nets"""
126
127 self.grouped_tasks = {}
128 """ It contains all the creation/deletion pending tasks grouped by its concrete vm, net, etc
129 <item><item_id>:
130 - <task1> # e.g. CREATE task
131 <task2> # e.g. DELETE task
132 """
133
134 def _reload_vim_actions(self):
135 """
136 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
137 :return: None
138 """
139 action_completed = False
140 task_list = []
141 old_action_key = None
142
143 with self.db_lock:
144 vim_actions = self.db.get_rows(FROM="vim_actions",
145 WHERE={"datacenter_vim_id": self.datacenter_tenant_id },
146 ORDER_BY=("item", "item_id", "created_at",))
147 for task in vim_actions:
148 item = task["item"]
149 item_id = task["item_id"]
150 action_key = item + item_id
151 if old_action_key != action_key:
152 if not action_completed and task_list:
153 # This will fill needed task parameters into memory, and insert the task if needed in
154 # self.pending_tasks or self.refresh_tasks
155 self._insert_pending_tasks(task_list)
156 task_list = []
157 old_action_key = action_key
158 action_completed = False
159 elif action_completed:
160 continue
161
162 if task["status"] == "SCHEDULED" or task["action"] == "CREATE" or task["action"] == "FIND":
163 task_list.append(task)
164 elif task["action"] == "DELETE":
165 # action completed because deleted and status is not SCHEDULED. Not needed anything
166 action_completed = True
167
168 # Last actions group need to be inserted too
169 if not action_completed and task_list:
170 self._insert_pending_tasks(task_list)
171
172 def _refres_elements(self):
173 """Call VIM to get VMs and networks status until 10 elements"""
174 now = time.time()
175 nb_processed = 0
176 vm_to_refresh_list = []
177 net_to_refresh_list = []
178 vm_to_refresh_dict = {}
179 net_to_refresh_dict = {}
180 items_to_refresh = 0
181 while self.refresh_tasks:
182 task = self.refresh_tasks[0]
183 with self.task_lock:
184 if task['status'] == 'SUPERSEDED':
185 self.refresh_tasks.pop(0)
186 continue
187 if task['modified_at'] > now:
188 break
189 # task["status"] = "processing"
190 nb_processed += 1
191 self.refresh_tasks.pop(0)
192 if task["item"] == 'instance_vms':
193 vm_to_refresh_list.append(task["vim_id"])
194 vm_to_refresh_dict[task["vim_id"]] = task
195 elif task["item"] == 'instance_nets':
196 net_to_refresh_list.append(task["vim_id"])
197 net_to_refresh_dict[task["vim_id"]] = task
198 else:
199 error_text = "unknown task {}".format(task["item"])
200 self.logger.error(error_text)
201 items_to_refresh += 1
202 if items_to_refresh == 10:
203 break
204
205 if vm_to_refresh_list:
206 try:
207 now = time.time()
208 vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list)
209 for vim_id, vim_info in vim_dict.items():
210 # look for task
211 task_need_update = False
212 task = vm_to_refresh_dict[vim_id]
213 self.logger.debug("get-vm vm_id=%s result=%s", task["vim_id"], str(vim_info))
214
215 # update database
216 task_vim_info = task.get("vim_info")
217 task_error_msg = task.get("error_msg")
218 task_vim_status = task["extra"].get("vim_status")
219 if vim_info.get("error_msg"):
220 vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"])
221 if task_vim_status != vim_info["status"] or task_error_msg != vim_info.get("error_msg") or \
222 task_vim_info != vim_info.get("vim_info"):
223 with self.db_lock:
224 temp_dict = {"status": vim_info["status"],
225 "error_msg": vim_info.get("error_msg"),
226 "vim_info": vim_info.get("vim_info")}
227 self.db.update_rows('instance_vms', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
228 task["extra"]["vim_status"] = vim_info["status"]
229 task["error_msg"] = vim_info.get("error_msg")
230 task["vim_info"] = vim_info.get("vim_info")
231 task_need_update = True
232 for interface in vim_info.get("interfaces", ()):
233 vim_interface_id = interface["vim_interface_id"]
234 if vim_interface_id not in task["extra"]["interfaces"]:
235 self.logger.critical("Interface not found {} on task info {}".format(
236 vim_interface_id, task["extra"]["interfaces"]), exc_info=True)
237 continue
238 task_interface = task["extra"]["interfaces"][vim_interface_id]
239 task_vim_interface = task["vim_interfaces"].get(vim_interface_id)
240 if task_vim_interface != interface:
241 # delete old port
242 if task_interface.get("sdn_port_id"):
243 try:
244 with self.db_lock:
245 self.ovim.delete_port(task_interface["sdn_port_id"])
246 task_interface["sdn_port_id"] = None
247 task_need_update = True
248 except ovimException as e:
249 self.logger.error("ovimException deleting external_port={} ".format(
250 task_interface["sdn_port_id"]) + str(e), exc_info=True)
251 # TODO Set error_msg at instance_nets
252
253 # Create SDN port
254 sdn_net_id = task_interface.get("sdn_net_id")
255 if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
256 sdn_port_name = sdn_net_id + "." + task["vim_id"]
257 sdn_port_name = sdn_port_name[:63]
258 try:
259 with self.db_lock:
260 sdn_port_id = self.ovim.new_external_port(
261 {"compute_node": interface["compute_node"],
262 "pci": interface["pci"],
263 "vlan": interface.get("vlan"),
264 "net_id": sdn_net_id,
265 "region": self.vim["config"]["datacenter_id"],
266 "name": sdn_port_name,
267 "mac": interface.get("mac_address")})
268 task_interface["sdn_port_id"] = sdn_port_id
269 task_need_update = True
270 except (ovimException, Exception) as e:
271 self.logger.error(
272 "ovimException creating new_external_port compute_node={} "
273 "pci={} vlan={} ".format(
274 interface["compute_node"],
275 interface["pci"],
276 interface.get("vlan")) + str(e),
277 exc_info=True)
278 # TODO Set error_msg at instance_nets
279 with self.db_lock:
280 self.db.update_rows(
281 'instance_interfaces',
282 UPDATE={"mac_address": interface.get("mac_address"),
283 "ip_address": interface.get("ip_address"),
284 "vim_info": interface.get("vim_info"),
285 "sdn_port_id": task_interface.get("sdn_port_id"),
286 "compute_node": interface.get("compute_node"),
287 "pci": interface.get("pci"),
288 "vlan": interface.get("vlan"),
289 },
290 WHERE={'uuid': task_interface["iface_id"]})
291 task["vim_interfaces"][vim_interface_id] = interface
292 if task_need_update:
293 with self.db_lock:
294 self.db.update_rows(
295 'vim_actions',
296 UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
297 "error_msg": task.get("error_msg"), "modified_at": now},
298 WHERE={'instance_action_id': task['instance_action_id'],
299 'task_index': task['task_index']})
300 if task["extra"].get("vim_status") == "BUILD":
301 self._insert_refresh(task, now + self.REFRESH_BUILD)
302 else:
303 self._insert_refresh(task, now + self.REFRESH_ACTIVE)
304 except vimconn.vimconnException as e:
305 self.logger.error("vimconnException Exception when trying to refresh vms " + str(e))
306 self._insert_refresh(task, now + self.REFRESH_ACTIVE)
307
308 if net_to_refresh_list:
309 try:
310 now = time.time()
311 vim_dict = self.vim.refresh_nets_status(net_to_refresh_list)
312 for vim_id, vim_info in vim_dict.items():
313 # look for task
314 task = net_to_refresh_dict[vim_id]
315 self.logger.debug("get-net net_id=%s result=%s", task["vim_id"], str(vim_info))
316
317 task_vim_info = task.get("vim_info")
318 task_vim_status = task["extra"].get("vim_status")
319 task_error_msg = task.get("error_msg")
320 task_sdn_net_id = task["extra"].get("sdn_net_id")
321
322 # get ovim status
323 if task_sdn_net_id:
324 try:
325 with self.db_lock:
326 sdn_net = self.ovim.show_network(task_sdn_net_id)
327 if sdn_net["status"] == "ERROR":
328 if not vim_info.get("error_msg"):
329 vim_info["error_msg"] = sdn_net["error_msg"]
330 else:
331 vim_info["error_msg"] = "VIM_ERROR: {} && SDN_ERROR: {}".format(
332 self._format_vim_error_msg(vim_info["error_msg"], 1024//2-14),
333 self._format_vim_error_msg(sdn_net["error_msg"], 1024//2-14))
334 if vim_info["status"] == "VIM_ERROR":
335 vim_info["status"] = "VIM_SDN_ERROR"
336 else:
337 vim_info["status"] = "SDN_ERROR"
338
339 except (ovimException, Exception) as e:
340 self.logger.error(
341 "ovimException getting network infor snd_net_id={}".format(task_sdn_net_id),
342 exc_info=True)
343 # TODO Set error_msg at instance_nets
344
345 # update database
346 if vim_info.get("error_msg"):
347 vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"])
348 if task_vim_status != vim_info["status"] or task_error_msg != vim_info.get("error_msg") or \
349 task_vim_info != vim_info["vim_info"]:
350 task["extra"]["vim_status"] = vim_info["status"]
351 task["error_msg"] = vim_info.get("error_msg")
352 task["vim_info"] = vim_info["vim_info"]
353 temp_dict = {"status": vim_info["status"],
354 "error_msg": vim_info.get("error_msg"),
355 "vim_info": vim_info["vim_info"]}
356 with self.db_lock:
357 self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
358 self.db.update_rows(
359 'vim_actions',
360 UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
361 "error_msg": task.get("error_msg"), "modified_at": now},
362 WHERE={'instance_action_id': task['instance_action_id'],
363 'task_index': task['task_index']})
364 if task["extra"].get("vim_status") == "BUILD":
365 self._insert_refresh(task, now + self.REFRESH_BUILD)
366 else:
367 self._insert_refresh(task, now + self.REFRESH_ACTIVE)
368 except vimconn.vimconnException as e:
369 self.logger.error("vimconnException Exception when trying to refresh nets " + str(e))
370 self._insert_refresh(task, now + self.REFRESH_ACTIVE)
371
372 return nb_processed
373
374 def _insert_refresh(self, task, threshold_time=None):
375 """Insert a task at list of refreshing elements. The refreshing list is ordered by threshold_time (task['modified_at']
376 It is assumed that this is called inside this thread
377 """
378 if not self.vim:
379 return
380 if not threshold_time:
381 threshold_time = time.time()
382 task["modified_at"] = threshold_time
383 task_name = task["item"][9:] + "-" + task["action"]
384 task_id = task["instance_action_id"] + "." + str(task["task_index"])
385 for index in range(0, len(self.refresh_tasks)):
386 if self.refresh_tasks[index]["modified_at"] > threshold_time:
387 self.refresh_tasks.insert(index, task)
388 break
389 else:
390 index = len(self.refresh_tasks)
391 self.refresh_tasks.append(task)
392 self.logger.debug("new refresh task={} name={}, modified_at={} index={}".format(
393 task_id, task_name, task["modified_at"], index))
394
395 def _remove_refresh(self, task_name, vim_id):
396 """Remove a task with this name and vim_id from the list of refreshing elements.
397 It is assumed that this is called inside this thread outside _refres_elements method
398 Return True if self.refresh_list is modified, task is found
399 Return False if not found
400 """
401 index_to_delete = None
402 for index in range(0, len(self.refresh_tasks)):
403 if self.refresh_tasks[index]["name"] == task_name and self.refresh_tasks[index]["vim_id"] == vim_id:
404 index_to_delete = index
405 break
406 else:
407 return False
408 if index_to_delete != None:
409 del self.refresh_tasks[index_to_delete]
410 return True
411
412 def _proccess_pending_tasks(self):
413 nb_created = 0
414 nb_processed = 0
415 while self.pending_tasks:
416 task = self.pending_tasks.pop(0)
417 nb_processed += 1
418 if task["status"] == "SUPERSEDED":
419 # not needed to do anything but update database with the new status
420 result = True
421 database_update = None
422 elif not self.vim:
423 task["status"] == "ERROR"
424 task["error_msg"] = self.error_status
425 result = False
426 database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
427 elif task["item"] == 'instance_vms':
428 if task["action"] == "CREATE":
429 result, database_update = self.new_vm(task)
430 nb_created += 1
431 elif task["action"] == "DELETE":
432 result, database_update = self.del_vm(task)
433 else:
434 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
435 elif task["item"] == 'instance_nets':
436 if task["action"] == "CREATE":
437 result, database_update = self.new_net(task)
438 nb_created += 1
439 elif task["action"] == "DELETE":
440 result, database_update = self.del_net(task)
441 elif task["action"] == "FIND":
442 result, database_update = self.get_net(task)
443 else:
444 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
445 else:
446 raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
447 # TODO
448
449 if task["status"] == "SCHEDULED":
450 # This is because a depend task is not completed. Moved to the end. NOT USED YET
451 if task["extra"].get("tries", 0) > 3:
452 task["status"] == "FAILED"
453 else:
454 task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
455 self.pending_tasks.append(task)
456 elif task["action"] == "DELETE":
457 action_key = task["item"] + task["item_id"]
458 del self.grouped_tasks[action_key]
459 elif task["action"] in ("CREATE", "FIND") and task["status"] in ("DONE", "BUILD"):
460 self._insert_refresh(task)
461
462 self.logger.debug("vim_action id={}.{} item={} action={} result={}:{} params={}".format(
463 task["instance_action_id"], task["task_index"], task["item"], task["action"],
464 task["status"], task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"),
465 task["params"]))
466 try:
467 now = time.time()
468 with self.db_lock:
469 self.db.update_rows(
470 table="vim_actions",
471 UPDATE={"status": task["status"], "vim_id": task["vim_id"], "modified_at": now,
472 "error_msg": task["error_msg"],
473 "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
474 WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
475 if result is not None:
476 self.db.update_rows(
477 table="instance_actions",
478 UPDATE={("number_done" if result else "number_failed"): {"INCREMENT": 1},
479 "modified_at": now},
480 WHERE={"uuid": task["instance_action_id"]})
481 if database_update:
482 self.db.update_rows(table=task["item"],
483 UPDATE=database_update,
484 WHERE={"uuid": task["item_id"]})
485 except db_base_Exception as e:
486 self.logger.error("Error updating database %s", str(e), exc_info=True)
487
488 if nb_created == 10:
489 break
490 return nb_processed
491
492 def _insert_pending_tasks(self, vim_actions_list):
493 now = time.time()
494 for task in vim_actions_list:
495 if task["datacenter_vim_id"] != self.datacenter_tenant_id:
496 continue
497 item = task["item"]
498 item_id = task["item_id"]
499 action_key = item + item_id
500 if action_key not in self.grouped_tasks:
501 self.grouped_tasks[action_key] = []
502 task["params"] = None
503 task["depends"] = {}
504 if task["extra"]:
505 extra = yaml.load(task["extra"])
506 task["extra"] = extra
507 task["params"] = extra.get("params")
508 depends_on_list = extra.get("depends_on")
509 if depends_on_list:
510 for index in depends_on_list:
511 if index < len(vim_actions_list) and vim_actions_list[index]["task_index"] == index and\
512 vim_actions_list[index]["instance_action_id"] == task["instance_action_id"]:
513 task["depends"]["TASK-" + str(index)] = vim_actions_list[index]
514 if extra.get("interfaces"):
515 task["vim_interfaces"] = {}
516 else:
517 task["extra"] = {}
518 if "error_msg" not in task:
519 task["error_msg"] = None
520 if "vim_id" not in task:
521 task["vim_id"] = None
522
523 if task["action"] == "DELETE":
524 need_delete_action = False
525 for to_supersede in self.grouped_tasks.get(action_key, ()):
526 if to_supersede["action"] == "FIND" and to_supersede.get("vim_id"):
527 task["vim_id"] = to_supersede["vim_id"]
528 if to_supersede["action"] == "CREATE" and to_supersede.get("vim_id") and \
529 to_supersede["extra"].get("created", True):
530 need_delete_action = True
531 task["vim_id"] = to_supersede["vim_id"]
532 if to_supersede["extra"].get("sdn_vim_id"):
533 task["extra"]["sdn_vim_id"] = to_supersede["extra"]["sdn_vim_id"]
534 if to_supersede["extra"].get("interfaces"):
535 task["extra"]["interfaces"] = to_supersede["extra"]["interfaces"]
536 # Mark task as SUPERSEDED.
537 # If task is in self.pending_tasks, it will be removed and database will be update
538 # If task is in self.refresh_tasks, it will be removed
539 to_supersede["status"] = "SUPERSEDED"
540 if not need_delete_action:
541 task["status"] = "SUPERSEDED"
542
543 self.grouped_tasks[action_key].append(task)
544 self.pending_tasks.append(task)
545 elif task["status"] == "SCHEDULED":
546 self.grouped_tasks[action_key].append(task)
547 self.pending_tasks.append(task)
548 elif task["action"] in ("CREATE", "FIND"):
549 self.grouped_tasks[action_key].append(task)
550 if task["status"] in ("DONE", "BUILD"):
551 self._insert_refresh(task)
552 # TODO add VM reset, get console, etc...
553 else:
554 raise vimconn.vimconnException(self.name + "unknown vim_action action {}".format(task["action"]))
555
556 def insert_task(self, task):
557 try:
558 self.task_queue.put(task, False)
559 return None
560 except Queue.Full:
561 raise vimconn.vimconnException(self.name + ": timeout inserting a task")
562
563 def del_task(self, task):
564 with self.task_lock:
565 if task["status"] == "SCHEDULED":
566 task["status"] == "SUPERSEDED"
567 return True
568 else: # task["status"] == "processing"
569 self.task_lock.release()
570 return False
571
572 def run(self):
573 self.logger.debug("Starting")
574 while True:
575 self._reload_vim_actions()
576 reload_thread = False
577 while True:
578 try:
579 while not self.task_queue.empty():
580 task = self.task_queue.get()
581 if isinstance(task, list):
582 self._insert_pending_tasks(task)
583 elif isinstance(task, str):
584 if task == 'exit':
585 return 0
586 elif task == 'reload':
587 reload_thread = True
588 break
589 self.task_queue.task_done()
590 if reload_thread:
591 break
592 nb_processed = self._proccess_pending_tasks()
593 nb_processed += self._refres_elements()
594 if not nb_processed:
595 time.sleep(1)
596
597 except Exception as e:
598 self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
599
600 self.logger.debug("Finishing")
601
602 def terminate(self, task):
603 return True, None
604
605 def _look_for_task(self, instance_action_id, task_id):
606 task_index = task_id.split("-")[-1]
607 with self.db_lock:
608 tasks = self.db.get_rows(FROM="vim_actions", WHERE={"instance_action_id": instance_action_id,
609 "task_index": task_index})
610 if not tasks:
611 return None
612 task = tasks[0]
613 task["params"] = None
614 task["depends"] = {}
615 if task["extra"]:
616 extra = yaml.load(task["extra"])
617 task["extra"] = extra
618 task["params"] = extra.get("params")
619 if extra.get("interfaces"):
620 task["vim_interfaces"] = {}
621 else:
622 task["extra"] = {}
623 return task
624
625 def _format_vim_error_msg(self, error_text, max_length=1024):
626 if error_text and len(error_text) >= max_length:
627 return error_text[:max_length//2-3] + " ... " + error_text[-max_length//2+3:]
628 return error_text
629
630 def new_vm(self, task):
631 try:
632 params = task["params"]
633 task_id = task["instance_action_id"] + "." + str(task["task_index"])
634 depends = task.get("depends")
635 net_list = params[5]
636 error_text = ""
637 for net in net_list:
638 if "net_id" in net and is_task_id(net["net_id"]): # change task_id into network_id
639 if net["net_id"] in depends:
640 task_net = depends[net["net_id"]]
641 else:
642 task_net = self._look_for_task(task["instance_action_id"], net["net_id"])
643 if not task_net:
644 raise VimThreadException(
645 "Error trying to get depending task from task_index={}".format(net["net_id"]))
646 network_id = task_net.get("vim_id")
647 if not network_id:
648 raise VimThreadException(
649 "Cannot create VM because depends on a network not created or found: " +
650 str(task_net["error_msg"]))
651 net["net_id"] = network_id
652 vim_vm_id = self.vim.new_vminstance(*params)
653
654 # fill task_interfaces. Look for snd_net_id at database for each interface
655 task_interfaces = {}
656 for iface in net_list:
657 task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]}
658 with self.db_lock:
659 result = self.db.get_rows(SELECT=('sdn_net_id',),
660 FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
661 WHERE={'ii.uuid': iface["uuid"]})
662 if result:
663 task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id']
664 else:
665 self.logger.critical("Error creating VM, task=%s Network {} not found at DB", task_id,
666 iface["uuid"], exc_info=True)
667
668 task["vim_info"] = {}
669 task["vim_interfaces"] = {}
670 task["extra"]["interfaces"] = task_interfaces
671 task["extra"]["created"] = True
672 task["error_msg"] = None
673 task["status"] = "DONE"
674 task["vim_id"] = vim_vm_id
675 instance_element_update = {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None}
676 return True, instance_element_update
677
678 except (vimconn.vimconnException, VimThreadException) as e:
679 self.logger.error("Error creating VM, task=%s: %s", task_id, str(e))
680 error_text = self._format_vim_error_msg(str(e))
681 task["error_msg"] = error_text
682 task["status"] = "FAILED"
683 task["vim_id"] = None
684 instance_element_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": error_text}
685 return False, instance_element_update
686
687 def del_vm(self, task):
688 vm_vim_id = task["vim_id"]
689 interfaces = task["extra"].get("interfaces", ())
690 try:
691 for iface in interfaces.values():
692 if iface.get("sdn_port_id"):
693 try:
694 with self.db_lock:
695 self.ovim.delete_port(iface["sdn_port_id"])
696 except ovimException as e:
697 self.logger.error("ovimException deleting external_port={} at VM vim_id={} deletion ".format(
698 iface["sdn_port_id"], vm_vim_id) + str(e), exc_info=True)
699 # TODO Set error_msg at instance_nets
700
701 self.vim.delete_vminstance(vm_vim_id)
702 task["status"] = "DONE"
703 task["error_msg"] = None
704 return True, None
705
706 except vimconn.vimconnException as e:
707 task["error_msg"] = self._format_vim_error_msg(str(e))
708 if isinstance(e, vimconn.vimconnNotFoundException):
709 # If not found mark as Done and fill error_msg
710 task["status"] = "DONE"
711 return True, None
712 task["status"] = "FAILED"
713 return False, None
714
715 def _get_net_internal(self, task, filter_param):
716 vim_nets = self.vim.get_network_list(filter_param)
717 if not vim_nets:
718 raise VimThreadExceptionNotFound("Network not found with this criteria: '{}'".format(filter))
719 elif len(vim_nets) > 1:
720 raise VimThreadException("More than one network found with this criteria: '{}'".format(filter))
721 vim_net_id = vim_nets[0]["id"]
722
723 # Discover if this network is managed by a sdn controller
724 sdn_net_id = None
725 with self.db_lock:
726 result = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets',
727 WHERE={'vim_net_id': vim_net_id, 'instance_scenario_id': None,
728 'datacenter_tenant_id': self.datacenter_tenant_id})
729 if result:
730 sdn_net_id = result[0]['sdn_net_id']
731
732 task["status"] = "DONE"
733 task["extra"]["vim_info"] = {}
734 task["extra"]["created"] = False
735 task["extra"]["sdn_net_id"] = sdn_net_id
736 task["error_msg"] = None
737 task["vim_id"] = vim_net_id
738 instance_element_update = {"vim_net_id": vim_net_id, "created": False, "status": "BUILD",
739 "error_msg": None}
740 return instance_element_update
741
742 def get_net(self, task):
743 try:
744 task_id = task["instance_action_id"] + "." + str(task["task_index"])
745 params = task["params"]
746 filter_param = params[0]
747 instance_element_update = self._get_net_internal(task, filter_param)
748 return True, instance_element_update
749
750 except (vimconn.vimconnException, VimThreadException) as e:
751 self.logger.error("Error looking for NET, task=%s: %s", str(task_id), str(e))
752 task["status"] = "FAILED"
753 task["vim_id"] = None
754 task["error_msg"] = self._format_vim_error_msg(str(e))
755 instance_element_update = {"vim_net_id": None, "status": "VIM_ERROR",
756 "error_msg": task["error_msg"]}
757 return False, instance_element_update
758
759 def new_net(self, task):
760 try:
761 task_id = task["instance_action_id"] + "." + str(task["task_index"])
762 # FIND
763 if task["extra"].get("find"):
764 action_text = "finding"
765 filter_param = task["extra"]["find"][0]
766 try:
767 instance_element_update = self._get_net_internal(task, filter_param)
768 return True, instance_element_update
769 except VimThreadExceptionNotFound:
770 pass
771 # CREATE
772 params = task["params"]
773 action_text = "creating"
774 vim_net_id = self.vim.new_network(*params)
775
776 net_name = params[0]
777 net_type = params[1]
778
779 sdn_net_id = None
780 sdn_controller = self.vim.config.get('sdn-controller')
781 if sdn_controller and (net_type == "data" or net_type == "ptp"):
782 network = {"name": net_name, "type": net_type, "region": self.vim["config"]["datacenter_id"]}
783
784 vim_net = self.vim.get_network(vim_net_id)
785 if vim_net.get('encapsulation') != 'vlan':
786 raise vimconn.vimconnException(
787 "net '{}' defined as type '{}' has not vlan encapsulation '{}'".format(
788 net_name, net_type, vim_net['encapsulation']))
789 network["vlan"] = vim_net.get('segmentation_id')
790 try:
791 with self.db_lock:
792 sdn_net_id = self.ovim.new_network(network)
793 except (ovimException, Exception) as e:
794 self.logger.error("task=%s cannot create SDN network vim_net_id=%s input='%s' ovimException='%s'",
795 str(task_id), vim_net_id, str(network), str(e))
796 task["status"] = "DONE"
797 task["extra"]["vim_info"] = {}
798 task["extra"]["sdn_net_id"] = sdn_net_id
799 task["extra"]["created"] = True
800 task["error_msg"] = None
801 task["vim_id"] = vim_net_id
802 instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "BUILD",
803 "created": True, "error_msg": None}
804 return True, instance_element_update
805 except vimconn.vimconnException as e:
806 self.logger.error("Error {} NET, task=%s: %s", action_text, str(task_id), str(e))
807 task["status"] = "FAILED"
808 task["vim_id"] = None
809 task["error_msg"] = self._format_vim_error_msg(str(e))
810 instance_element_update = {"vim_net_id": None, "sdn_net_id": None, "status": "VIM_ERROR",
811 "error_msg": task["error_msg"]}
812 return False, instance_element_update
813
814 def del_net(self, task):
815 net_vim_id = task["vim_id"]
816 sdn_net_id = task["extra"].get("sdn_net_id")
817 try:
818 if sdn_net_id:
819 # Delete any attached port to this sdn network. There can be ports associated to this network in case
820 # it was manually done using 'openmano vim-net-sdn-attach'
821 with self.db_lock:
822 port_list = self.ovim.get_ports(columns={'uuid'},
823 filter={'name': 'external_port', 'net_id': sdn_net_id})
824 for port in port_list:
825 self.ovim.delete_port(port['uuid'])
826 self.ovim.delete_network(sdn_net_id)
827 self.vim.delete_network(net_vim_id)
828 task["status"] = "DONE"
829 task["error_msg"] = None
830 return True, None
831 except ovimException as e:
832 task["error_msg"] = self._format_vim_error_msg("ovimException obtaining and deleting external "
833 "ports for net {}: {}".format(sdn_net_id, str(e)))
834 except vimconn.vimconnException as e:
835 task["error_msg"] = self._format_vim_error_msg(str(e))
836 if isinstance(e, vimconn.vimconnNotFoundException):
837 # If not found mark as Done and fill error_msg
838 task["status"] = "DONE"
839 return True, None
840 task["status"] = "FAILED"
841 return False, None