Feature 1413 resiliency to single component failure
[osm/RO.git] / osm_ro / vim_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 """"
25 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
26 The tasks are stored at database in table vim_actions
27 The task content is:
28 MD instance_action_id: reference a global action over an instance-scenario: database instance_actions
29 MD task_index: index number of the task. This with the previous are a key
30 MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread
31 MD vim_id: id of the vm,net,etc at VIM
32 MD action: CREATE, DELETE, LOOK, TODO: LOOK_CREATE
33 MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
34 MD item_id: uuid of the referenced entry in the preious table
35 MD status: SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED
36 MD extra: text with yaml format at database, dict at memory with:
37 params: list with the params to be sent to the VIM for CREATE or LOOK. For DELETE the vim_id is taken from other related tasks
38 depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm depends on a net
39 sdn_net_id: used for net.
40 tries:
41 interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
42 iface_id: uuid of intance_interfaces
43 sdn_port_id:
44 sdn_net_id:
45 created: False if the VIM element is not created by other actions, and it should not be deleted
46 vim_status: VIM status of the element. Stored also at database in the instance_XXX
47 M depends: dict with task_index(from depends_on) to task class
48 M params: same as extra[params] but with the resolved dependencies
49 M vim_interfaces: similar to extra[interfaces] but with VIM information. Stored at database in the instance_XXX but not at vim_actions
50 M vim_info: Detailed information of a vm,net from the VIM. Stored at database in the instance_XXX but not at vim_actions
51 MD error_msg: descriptive text upon an error.Stored also at database instance_XXX
52 MD created_at: task creation time
53 MD modified_at: last task update time. On refresh it contains when this task need to be refreshed
54
55 """
56
57 import threading
58 import time
59 import Queue
60 import logging
61 import vimconn
62 import yaml
63 from db_base import db_base_Exception
64 from lib_osm_openvim.ovim import ovimException
65
66 __author__ = "Alfonso Tierno, Pablo Montes"
67 __date__ = "$28-Sep-2017 12:07:15$"
68
69 # from logging import Logger
70 # import auxiliary_functions as af
71
72
73 def is_task_id(task_id):
74 return task_id.startswith("TASK-")
75
76
77 class VimThreadException(Exception):
78 pass
79
80
81 class vim_thread(threading.Thread):
82 REFRESH_BUILD = 5 # 5 seconds
83 REFRESH_ACTIVE = 60 # 1 minute
84
85 def __init__(self, vimconn, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None,
86 db=None, db_lock=None, ovim=None):
87 """Init a thread.
88 Arguments:
89 'id' number of thead
90 'name' name of thread
91 'host','user': host ip or name to manage and user
92 'db', 'db_lock': database class and lock to use it in exclusion
93 """
94 threading.Thread.__init__(self)
95 self.vim = vimconn
96 self.datacenter_name = datacenter_name
97 self.datacenter_tenant_id = datacenter_tenant_id
98 self.ovim = ovim
99 if not name:
100 self.name = vimconn["id"] + "." + vimconn["config"]["datacenter_tenant_id"]
101 else:
102 self.name = name
103
104 self.logger = logging.getLogger('openmano.vim.'+self.name)
105 self.db = db
106 self.db_lock = db_lock
107
108 self.task_lock = task_lock
109 self.task_queue = Queue.Queue(2000)
110
111 self.refresh_tasks = []
112 """Contains time ordered task list for refreshing the status of VIM VMs and nets"""
113
114 self.pending_tasks = []
115 """Contains time ordered task list for creation, deletion of VIM VMs and nets"""
116
117 self.grouped_tasks = {}
118 """ It contains all the creation/deletion pending tasks grouped by its concrete vm, net, etc
119 <item><item_id>:
120 - <task1> # e.g. CREATE task
121 <task2> # e.g. DELETE task
122 """
123
124 def _reload_vim_actions(self):
125 """
126 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
127 :return: None
128 """
129 action_completed = False
130 task_list = []
131 old_action_key = None
132
133 with self.db_lock:
134 vim_actions = self.db.get_rows(FROM="vim_actions",
135 WHERE={"datacenter_vim_id": self.datacenter_tenant_id },
136 ORDER_BY=("item", "item_id", "created_at",))
137 for task in vim_actions:
138 item = task["item"]
139 item_id = task["item_id"]
140 action_key = item + item_id
141 if old_action_key != action_key:
142 if not action_completed and task_list:
143 # This will fill needed task parameters into memory, and insert the task if needed in
144 # self.pending_tasks or self.refresh_tasks
145 self._insert_pending_tasks(task_list)
146 task_list = []
147 old_action_key = action_key
148 action_completed = False
149 elif action_completed:
150 continue
151
152 if task["status"] == "SCHEDULED" or task["action"] == "CREATE" or task["action"] == "FIND":
153 task_list.append(task)
154 elif task["action"] == "DELETE":
155 # action completed because deleted and status is not SCHEDULED. Not needed anything
156 action_completed = True
157
158 # Last actions group need to be inserted too
159 if not action_completed and task_list:
160 self._insert_pending_tasks(task_list)
161
162 def _refres_elements(self):
163 """Call VIM to get VMs and networks status until 10 elements"""
164 now = time.time()
165 nb_processed = 0
166 vm_to_refresh_list = []
167 net_to_refresh_list = []
168 vm_to_refresh_dict = {}
169 net_to_refresh_dict = {}
170 items_to_refresh = 0
171 while self.refresh_tasks:
172 task = self.refresh_tasks[0]
173 with self.task_lock:
174 if task['status'] == 'SUPERSEDED':
175 self.refresh_tasks.pop(0)
176 continue
177 if task['modified_at'] > now:
178 break
179 # task["status"] = "processing"
180 nb_processed += 1
181 self.refresh_tasks.pop(0)
182 if task["item"] == 'instance_vms':
183 vm_to_refresh_list.append(task["vim_id"])
184 vm_to_refresh_dict[task["vim_id"]] = task
185 elif task["item"] == 'instance_nets':
186 net_to_refresh_list.append(task["vim_id"])
187 net_to_refresh_dict[task["vim_id"]] = task
188 else:
189 error_text = "unknown task {}".format(task["item"])
190 self.logger.error(error_text)
191 items_to_refresh += 1
192 if items_to_refresh == 10:
193 break
194
195 if vm_to_refresh_list:
196 try:
197 now = time.time()
198 vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list)
199 for vim_id, vim_info in vim_dict.items():
200 # look for task
201 task_need_update = False
202 task = vm_to_refresh_dict[vim_id]
203 self.logger.debug("get-vm vm_id=%s result=%s", task["vim_id"], str(vim_info))
204
205 # update database
206 task_vim_info = task.get("vim_info")
207 task_error_msg = task.get("error_msg")
208 task_vim_status = task["extra"].get("vim_status")
209 if vim_info.get("error_msg"):
210 vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"])
211 if task_vim_status != vim_info["status"] or task_error_msg != vim_info.get("error_msg") or \
212 task_vim_info != vim_info.get("vim_info"):
213 with self.db_lock:
214 temp_dict = {"status": vim_info["status"],
215 "error_msg": vim_info.get("error_msg"),
216 "vim_info": vim_info.get("vim_info")}
217 self.db.update_rows('instance_vms', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
218 task["extra"]["vim_status"] = vim_info["status"]
219 task["error_msg"] = vim_info.get("error_msg")
220 task["vim_info"] = vim_info.get("vim_info")
221 task_need_update = True
222 for interface in vim_info.get("interfaces", ()):
223 vim_interface_id = interface["vim_interface_id"]
224 if vim_interface_id not in task["extra"]["interfaces"]:
225 self.logger.critical("Interface not found {} on task info {}".format(
226 vim_interface_id, task["extra"]["interfaces"]), exc_info=True)
227 continue
228 task_interface = task["extra"]["interfaces"][vim_interface_id]
229 task_vim_interface = task["vim_interfaces"].get(vim_interface_id)
230 if task_vim_interface != interface:
231 # delete old port
232 if task_interface.get("sdn_port_id"):
233 try:
234 with self.db_lock:
235 self.ovim.delete_port(task_interface["sdn_port_id"])
236 task_interface["sdn_port_id"] = None
237 task_need_update = True
238 except ovimException as e:
239 self.logger.error("ovimException deleting external_port={} ".format(
240 task_interface["sdn_port_id"]) + str(e), exc_info=True)
241 # TODO Set error_msg at instance_nets
242
243 # Create SDN port
244 sdn_net_id = task_interface.get("sdn_net_id")
245 if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
246 sdn_port_name = sdn_net_id + "." + task["vim_id"]
247 sdn_port_name = sdn_port_name[:63]
248 try:
249 with self.db_lock:
250 sdn_port_id = self.ovim.new_external_port(
251 {"compute_node": interface["compute_node"],
252 "pci": interface["pci"],
253 "vlan": interface.get("vlan"),
254 "net_id": sdn_net_id,
255 "region": self.vim["config"]["datacenter_id"],
256 "name": sdn_port_name,
257 "mac": interface.get("mac_address")})
258 task_interface["sdn_port_id"] = sdn_port_id
259 task_need_update = True
260 except (ovimException, Exception) as e:
261 self.logger.error(
262 "ovimException creating new_external_port compute_node={} "
263 "pci={} vlan={} ".format(
264 interface["compute_node"],
265 interface["pci"],
266 interface.get("vlan")) + str(e),
267 exc_info=True)
268 # TODO Set error_msg at instance_nets
269 with self.db_lock:
270 self.db.update_rows(
271 'instance_interfaces',
272 UPDATE={"mac_address": interface.get("mac_address"),
273 "ip_address": interface.get("ip_address"),
274 "vim_info": interface.get("vim_info"),
275 "sdn_port_id": task_interface.get("sdn_port_id"),
276 "compute_node": interface.get("compute_node"),
277 "pci": interface.get("pci"),
278 "vlan": interface.get("vlan"),
279 },
280 WHERE={'uuid': task_interface["iface_id"]})
281 task["vim_interfaces"][vim_interface_id] = interface
282 if task_need_update:
283 with self.db_lock:
284 self.db.update_rows(
285 'vim_actions',
286 UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
287 "error_msg": task.get("error_msg"), "modified_at": now},
288 WHERE={'instance_action_id': task['instance_action_id'],
289 'task_index': task['task_index']})
290 if task["extra"].get("vim_status") == "BUILD":
291 self._insert_refresh(task, now + self.REFRESH_BUILD)
292 else:
293 self._insert_refresh(task, now + self.REFRESH_ACTIVE)
294 except vimconn.vimconnException as e:
295 self.logger.error("vimconnException Exception when trying to refresh vms " + str(e))
296 self._insert_refresh(task, now + self.REFRESH_ACTIVE)
297
298 if net_to_refresh_list:
299 try:
300 now = time.time()
301 vim_dict = self.vim.refresh_nets_status(net_to_refresh_list)
302 for vim_id, vim_info in vim_dict.items():
303 # look for task
304 task = net_to_refresh_dict[vim_id]
305 self.logger.debug("get-net net_id=%s result=%s", task["vim_id"], str(vim_info))
306
307 task_vim_info = task.get("vim_info")
308 task_vim_status = task["extra"].get("vim_status")
309 task_error_msg = task.get("error_msg")
310 task_sdn_net_id = task["extra"].get("sdn_net_id")
311
312 # get ovim status
313 if task_sdn_net_id:
314 try:
315 with self.db_lock:
316 sdn_net = self.ovim.show_network(task_sdn_net_id)
317 if sdn_net["status"] == "ERROR":
318 if not vim_info.get("error_msg"):
319 vim_info["error_msg"] = sdn_net["error_msg"]
320 else:
321 vim_info["error_msg"] = "VIM_ERROR: {} && SDN_ERROR: {}".format(
322 self._format_vim_error_msg(vim_info["error_msg"], 1024//2-14),
323 self._format_vim_error_msg(sdn_net["error_msg"], 1024//2-14))
324 if vim_info["status"] == "VIM_ERROR":
325 vim_info["status"] = "VIM_SDN_ERROR"
326 else:
327 vim_info["status"] = "SDN_ERROR"
328
329 except (ovimException, Exception) as e:
330 self.logger.error(
331 "ovimException getting network infor snd_net_id={}".format(task_sdn_net_id),
332 exc_info=True)
333 # TODO Set error_msg at instance_nets
334
335 # update database
336 if vim_info.get("error_msg"):
337 vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"])
338 if task_vim_status != vim_info["status"] or task_error_msg != vim_info.get("error_msg") or \
339 task_vim_info != vim_info["vim_info"]:
340 task["extra"]["vim_status"] = vim_info["status"]
341 task["error_msg"] = vim_info.get("error_msg")
342 task["vim_info"] = vim_info["vim_info"]
343 temp_dict = {"status": vim_info["status"],
344 "error_msg": vim_info.get("error_msg"),
345 "vim_info": vim_info["vim_info"]}
346 with self.db_lock:
347 self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
348 self.db.update_rows(
349 'vim_actions',
350 UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
351 "error_msg": task.get("error_msg"), "modified_at": now},
352 WHERE={'instance_action_id': task['instance_action_id'],
353 'task_index': task['task_index']})
354 if task["extra"].get("vim_status") == "BUILD":
355 self._insert_refresh(task, now + self.REFRESH_BUILD)
356 else:
357 self._insert_refresh(task, now + self.REFRESH_ACTIVE)
358 except vimconn.vimconnException as e:
359 self.logger.error("vimconnException Exception when trying to refresh nets " + str(e))
360 self._insert_refresh(task, now + self.REFRESH_ACTIVE)
361
362 return nb_processed
363
364 def _insert_refresh(self, task, threshold_time=None):
365 """Insert a task at list of refreshing elements. The refreshing list is ordered by threshold_time (task['modified_at']
366 It is assumed that this is called inside this thread
367 """
368 if not threshold_time:
369 threshold_time = time.time()
370 task["modified_at"] = threshold_time
371 task_name = task["item"][9:] + "-" + task["action"]
372 task_id = task["instance_action_id"] + "." + str(task["task_index"])
373 for index in range(0, len(self.refresh_tasks)):
374 if self.refresh_tasks[index]["modified_at"] > threshold_time:
375 self.refresh_tasks.insert(index, task)
376 break
377 else:
378 index = len(self.refresh_tasks)
379 self.refresh_tasks.append(task)
380 self.logger.debug("new refresh task={} name={}, modified_at={} index={}".format(
381 task_id, task_name, task["modified_at"], index))
382
383 def _remove_refresh(self, task_name, vim_id):
384 """Remove a task with this name and vim_id from the list of refreshing elements.
385 It is assumed that this is called inside this thread outside _refres_elements method
386 Return True if self.refresh_list is modified, task is found
387 Return False if not found
388 """
389 index_to_delete = None
390 for index in range(0, len(self.refresh_tasks)):
391 if self.refresh_tasks[index]["name"] == task_name and self.refresh_tasks[index]["vim_id"] == vim_id:
392 index_to_delete = index
393 break
394 else:
395 return False
396 if index_to_delete != None:
397 del self.refresh_tasks[index_to_delete]
398 return True
399
400 def _proccess_pending_tasks(self):
401 nb_created = 0
402 nb_processed = 0
403 while self.pending_tasks:
404 task = self.pending_tasks.pop(0)
405 nb_processed += 1
406 if task["status"] == "SUPERSEDED":
407 # not needed to do anything but update database with the new status
408 result = True
409 database_update = None
410 elif task["item"] == 'instance_vms':
411 if task["action"] == "CREATE":
412 result, database_update = self.new_vm(task)
413 nb_created += 1
414 elif task["action"] == "DELETE":
415 result, database_update = self.del_vm(task)
416 else:
417 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
418 elif task["item"] == 'instance_nets':
419 if task["action"] == "CREATE":
420 result, database_update = self.new_net(task)
421 nb_created += 1
422 elif task["action"] == "DELETE":
423 result, database_update = self.del_net(task)
424 elif task["action"] == "FIND":
425 result, database_update = self.get_net(task)
426 else:
427 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
428 else:
429 raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
430 # TODO
431
432 if task["status"] == "SCHEDULED":
433 # This is because a depend task is not completed. Moved to the end. NOT USED YET
434 if task["extra"].get("tries", 0) > 3:
435 task["status"] == "FAILED"
436 else:
437 task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
438 self.pending_tasks.append(task)
439 elif task["action"] == "DELETE":
440 action_key = task["item"] + task["item_id"]
441 del self.grouped_tasks[action_key]
442 elif task["action"] in ("CREATE", "FIND") and task["status"] in ("DONE", "BUILD"):
443 self._insert_refresh(task)
444
445 self.logger.debug("vim_action id={}.{} item={} action={} result={}:{} params={}".format(
446 task["instance_action_id"], task["task_index"], task["item"], task["action"],
447 task["status"], task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"),
448 task["params"]))
449 try:
450 now = time.time()
451 with self.db_lock:
452 self.db.update_rows(
453 table="vim_actions",
454 UPDATE={"status": task["status"], "vim_id": task["vim_id"], "modified_at": now,
455 "error_msg": task["error_msg"],
456 "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
457 WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
458 if result is not None:
459 self.db.update_rows(
460 table="instance_actions",
461 UPDATE={("number_done" if result else "number_failed"): {"INCREMENT": 1},
462 "modified_at": now},
463 WHERE={"uuid": task["instance_action_id"]})
464 if database_update:
465 self.db.update_rows(table=task["item"],
466 UPDATE=database_update,
467 WHERE={"uuid": task["item_id"]})
468 except db_base_Exception as e:
469 self.logger.error("Error updating database %s", str(e), exc_info=True)
470
471 if nb_created == 10:
472 break
473 return nb_processed
474
475 def _insert_pending_tasks(self, vim_actions_list):
476 now = time.time()
477 for task in vim_actions_list:
478 if task["datacenter_vim_id"] != self.datacenter_tenant_id:
479 continue
480 item = task["item"]
481 item_id = task["item_id"]
482 action_key = item + item_id
483 if action_key not in self.grouped_tasks:
484 self.grouped_tasks[action_key] = []
485 task["params"] = None
486 task["depends"] = {}
487 if task["extra"]:
488 extra = yaml.load(task["extra"])
489 task["extra"] = extra
490 task["params"] = extra.get("params")
491 depends_on_list = extra.get("depends_on")
492 if depends_on_list:
493 for index in depends_on_list:
494 if index < len(vim_actions_list) and vim_actions_list[index]["task_index"] == index and\
495 vim_actions_list[index]["instance_action_id"] == task["instance_action_id"]:
496 task["depends"]["TASK-" + str(index)] = vim_actions_list[index]
497 if extra.get("interfaces"):
498 task["vim_interfaces"] = {}
499 else:
500 task["extra"] = {}
501 if "error_msg" not in task:
502 task["error_msg"] = None
503 if "vim_id" not in task:
504 task["vim_id"] = None
505
506 if task["action"] == "DELETE":
507 need_delete_action = False
508 for to_supersede in self.grouped_tasks.get(action_key, ()):
509 if to_supersede["action"] == "FIND" and to_supersede.get("vim_id"):
510 task["vim_id"] = to_supersede["vim_id"]
511 if to_supersede["action"] == "CREATE" and to_supersede.get("vim_id"):
512 need_delete_action = True
513 task["vim_id"] = to_supersede["vim_id"]
514 if to_supersede["extra"].get("sdn_vim_id"):
515 task["extra"]["sdn_vim_id"] = to_supersede["extra"]["sdn_vim_id"]
516 if to_supersede["extra"].get("interfaces"):
517 task["extra"]["interfaces"] = to_supersede["extra"]["interfaces"]
518 # Mark task as SUPERSEDED.
519 # If task is in self.pending_tasks, it will be removed and database will be update
520 # If task is in self.refresh_tasks, it will be removed
521 to_supersede["status"] = "SUPERSEDED"
522 if not need_delete_action:
523 task["status"] = "SUPERSEDED"
524
525 self.grouped_tasks[action_key].append(task)
526 self.pending_tasks.append(task)
527 elif task["status"] == "SCHEDULED":
528 self.grouped_tasks[action_key].append(task)
529 self.pending_tasks.append(task)
530 elif task["action"] in ("CREATE", "FIND"):
531 self.grouped_tasks[action_key].append(task)
532 if task["status"] in ("DONE", "BUILD"):
533 self._insert_refresh(task)
534 # TODO add VM reset, get console, etc...
535 else:
536 raise vimconn.vimconnException(self.name + "unknown vim_action action {}".format(task["action"]))
537
538 def insert_task(self, task):
539 try:
540 self.task_queue.put(task, False)
541 return None
542 except Queue.Full:
543 raise vimconn.vimconnException(self.name + ": timeout inserting a task")
544
545 def del_task(self, task):
546 with self.task_lock:
547 if task["status"] == "SCHEDULED":
548 task["status"] == "SUPERSEDED"
549 return True
550 else: # task["status"] == "processing"
551 self.task_lock.release()
552 return False
553
554 def run(self):
555 self.logger.debug("Starting")
556 while True:
557 self._reload_vim_actions()
558 reload_thread = False
559 while True:
560 try:
561 while not self.task_queue.empty():
562 task = self.task_queue.get()
563 if isinstance(task, list):
564 self._insert_pending_tasks(task)
565 elif isinstance(task, str):
566 if task == 'exit':
567 return 0
568 elif task == 'reload':
569 reload_thread = True
570 break
571 self.task_queue.task_done()
572 if reload_thread:
573 break
574 nb_processed = self._proccess_pending_tasks()
575 nb_processed += self._refres_elements()
576 if not nb_processed:
577 time.sleep(1)
578
579 except Exception as e:
580 self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
581
582 self.logger.debug("Finishing")
583
584 def terminate(self, task):
585 return True, None
586
587 def _look_for_task(self, instance_action_id, task_id):
588 task_index = task_id.split("-")[-1]
589 with self.db_lock:
590 tasks = self.db.get_rows(FROM="vim_actions", WHERE={"instance_action_id": instance_action_id,
591 "task_index": task_index})
592 if not tasks:
593 return None
594 task = tasks[0]
595 task["params"] = None
596 task["depends"] = {}
597 if task["extra"]:
598 extra = yaml.load(task["extra"])
599 task["extra"] = extra
600 task["params"] = extra.get("params")
601 if extra.get("interfaces"):
602 task["vim_interfaces"] = {}
603 else:
604 task["extra"] = {}
605 return task
606
607 def _format_vim_error_msg(self, error_text, max_length=1024):
608 if error_text and len(error_text) >= max_length:
609 return error_text[:max_length//2-3] + " ... " + error_text[-max_length//2+3:]
610 return error_text
611
612 def new_net(self, task):
613 try:
614 task_id = task["instance_action_id"] + "." + str(task["task_index"])
615 params = task["params"]
616 vim_net_id = self.vim.new_network(*params)
617
618 net_name = params[0]
619 net_type = params[1]
620
621 network = None
622 sdn_net_id = None
623 sdn_controller = self.vim.config.get('sdn-controller')
624 if sdn_controller and (net_type == "data" or net_type == "ptp"):
625 network = {"name": net_name, "type": net_type, "region": self.vim["config"]["datacenter_id"]}
626
627 vim_net = self.vim.get_network(vim_net_id)
628 if vim_net.get('encapsulation') != 'vlan':
629 raise vimconn.vimconnException(
630 "net '{}' defined as type '{}' has not vlan encapsulation '{}'".format(
631 net_name, net_type, vim_net['encapsulation']))
632 network["vlan"] = vim_net.get('segmentation_id')
633 try:
634 with self.db_lock:
635 sdn_net_id = self.ovim.new_network(network)
636 except (ovimException, Exception) as e:
637 self.logger.error("task=%s cannot create SDN network vim_net_id=%s input='%s' ovimException='%s'",
638 str(task_id), vim_net_id, str(network), str(e))
639 task["status"] = "DONE"
640 task["extra"]["vim_info"] = {}
641 task["extra"]["sdn_net_id"] = sdn_net_id
642 task["error_msg"] = None
643 task["vim_id"] = vim_net_id
644 instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "BUILD", "error_msg": None}
645 return True, instance_element_update
646 except vimconn.vimconnException as e:
647 self.logger.error("Error creating NET, task=%s: %s", str(task_id), str(e))
648 task["status"] = "FAILED"
649 task["vim_id"] = None
650 task["error_msg"] = self._format_vim_error_msg(str(e))
651 instance_element_update = {"vim_net_id": None, "sdn_net_id": None, "status": "VIM_ERROR", "error_msg": task["error_msg"]}
652 return False, instance_element_update
653
654 def new_vm(self, task):
655 try:
656 params = task["params"]
657 task_id = task["instance_action_id"] + "." + str(task["task_index"])
658 depends = task.get("depends")
659 net_list = params[5]
660 error_text = ""
661 for net in net_list:
662 if "net_id" in net and is_task_id(net["net_id"]): # change task_id into network_id
663 if net["net_id"] in depends:
664 task_net = depends[net["net_id"]]
665 else:
666 task_net = self._look_for_task(task["instance_action_id"], net["net_id"])
667 if not task_net:
668 raise VimThreadException(
669 "Error trying to get depending task from task_index={}".format(net["net_id"]))
670 network_id = task_net.get("vim_id")
671 if not network_id:
672 raise VimThreadException(
673 "Cannot create VM because depends on a network not created or found: " +
674 str(task_net["error_msg"]))
675 net["net_id"] = network_id
676 vim_vm_id = self.vim.new_vminstance(*params)
677
678 # fill task_interfaces. Look for snd_net_id at database for each interface
679 task_interfaces = {}
680 for iface in net_list:
681 task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]}
682 with self.db_lock:
683 result = self.db.get_rows(SELECT=('sdn_net_id',),
684 FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
685 WHERE={'ii.uuid': iface["uuid"]})
686 if result:
687 task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id']
688 else:
689 self.logger.critical("Error creating VM, task=%s Network {} not found at DB", task_id,
690 iface["uuid"], exc_info=True)
691
692 task["vim_info"] = {}
693 task["vim_interfaces"] = {}
694 task["extra"]["interfaces"] = task_interfaces
695 task["error_msg"] = None
696 task["status"] = "DONE"
697 task["vim_id"] = vim_vm_id
698 instance_element_update = {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None}
699 return True, instance_element_update
700
701 except (vimconn.vimconnException, VimThreadException) as e:
702 self.logger.error("Error creating VM, task=%s: %s", task_id, str(e))
703 error_text = self._format_vim_error_msg(str(e))
704 task["error_msg"] = error_text
705 task["status"] = "FAILED"
706 task["vim_id"] = None
707 instance_element_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": error_text}
708 return False, instance_element_update
709
710 def del_vm(self, task):
711 vm_vim_id = task["vim_id"]
712 interfaces = task["extra"].get("interfaces", ())
713 try:
714 for iface in interfaces.values():
715 if iface.get("sdn_port_id"):
716 try:
717 with self.db_lock:
718 self.ovim.delete_port(iface["sdn_port_id"])
719 except ovimException as e:
720 self.logger.error("ovimException deleting external_port={} at VM vim_id={} deletion ".format(
721 iface["sdn_port_id"], vm_vim_id) + str(e), exc_info=True)
722 # TODO Set error_msg at instance_nets
723
724 self.vim.delete_vminstance(vm_vim_id)
725 task["status"] = "DONE"
726 task["error_msg"] = None
727 return True, None
728
729 except vimconn.vimconnException as e:
730 task["error_msg"] = self._format_vim_error_msg(str(e))
731 if isinstance(e, vimconn.vimconnNotFoundException):
732 # If not found mark as Done and fill error_msg
733 task["status"] = "DONE"
734 return True, None
735 task["status"] = "FAILED"
736 return False, None
737
738 def del_net(self, task):
739 net_vim_id = task["vim_id"]
740 sdn_net_id = task["extra"].get("sdn_net_id")
741 try:
742 if sdn_net_id:
743 # Delete any attached port to this sdn network. There can be ports associated to this network in case
744 # it was manually done using 'openmano vim-net-sdn-attach'
745 with self.db_lock:
746 port_list = self.ovim.get_ports(columns={'uuid'},
747 filter={'name': 'external_port', 'net_id': sdn_net_id})
748 for port in port_list:
749 self.ovim.delete_port(port['uuid'])
750 self.ovim.delete_network(sdn_net_id)
751 self.vim.delete_network(net_vim_id)
752 task["status"] = "DONE"
753 task["error_msg"] = None
754 return True, None
755 except ovimException as e:
756 task["error_msg"] = self._format_vim_error_msg("ovimException obtaining and deleting external "
757 "ports for net {}: {}".format(sdn_net_id, str(e)))
758 except vimconn.vimconnException as e:
759 task["error_msg"] = self._format_vim_error_msg(str(e))
760 if isinstance(e, vimconn.vimconnNotFoundException):
761 # If not found mark as Done and fill error_msg
762 task["status"] = "DONE"
763 return True, None
764 task["status"] = "FAILED"
765 return False, None
766
767 def get_net(self, task):
768 try:
769 task_id = task["instance_action_id"] + "." + str(task["task_index"])
770 params = task["params"]
771 filter = params[0]
772 vim_nets = self.vim.get_network_list(filter)
773 if not vim_nets:
774 raise VimThreadException("Network not found with this criteria: '{}'".format(filter))
775 elif len(vim_nets) > 1:
776 raise VimThreadException("More than one network found with this criteria: '{}'".format(filter))
777 vim_net_id = vim_nets[0]["id"]
778
779 # Discover if this network is managed by a sdn controller
780 sdn_net_id = None
781 with self.db_lock:
782 result = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets',
783 WHERE={'vim_net_id': vim_net_id, 'instance_scenario_id': None,
784 'datacenter_tenant_id': self.datacenter_tenant_id})
785 if result:
786 sdn_net_id = result[0]['sdn_net_id']
787
788 task["status"] = "DONE"
789 task["extra"]["vim_info"] = {}
790 task["extra"]["created"] = False
791 task["extra"]["sdn_net_id"] = sdn_net_id
792 task["error_msg"] = None
793 task["vim_id"] = vim_net_id
794 instance_element_update = {"vim_net_id": vim_net_id, "created": False, "status": "BUILD",
795 "error_msg": None}
796 return True, instance_element_update
797 except (vimconn.vimconnException, VimThreadException) as e:
798 self.logger.error("Error looking for NET, task=%s: %s", str(task_id), str(e))
799 task["status"] = "FAILED"
800 task["vim_id"] = None
801 task["error_msg"] = self._format_vim_error_msg(str(e))
802 instance_element_update = {"vim_net_id": None, "status": "VIM_ERROR",
803 "error_msg": task["error_msg"]}
804 return False, instance_element_update