bug 373 get cp port_security_enable from descriptor instead of pyangbind
[osm/RO.git] / osm_ro / vim_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 """
25 This is the thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
26 The tasks are stored at database in table vim_actions
27 The task content is:
28 MD instance_action_id: reference a global action over an instance-scenario: database instance_actions
29 MD task_index: index number of the task. This with the previous are a key
30 MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread
31 MD vim_id: id of the vm,net,etc at VIM
32 MD action: CREATE, DELETE, LOOK, TODO: LOOK_CREATE
33 MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
34 MD item_id: uuid of the referenced entry in the previous table
35 MD status: SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED
36 MD extra: text with yaml format at database, dict at memory with:
37 params: list with the params to be sent to the VIM for CREATE or LOOK. For DELETE the vim_id is taken from other related tasks
38 depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm depends on a net
39 sdn_net_id: used for net.
40 tries:
41 interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
42 iface_id: uuid of instance_interfaces
43 sdn_port_id:
44 sdn_net_id:
45 created: False if the VIM element is not created by other actions, and it should not be deleted
46 vim_status: VIM status of the element. Stored also at database in the instance_XXX
47 M depends: dict with task_index(from depends_on) to task class
48 M params: same as extra[params] but with the resolved dependencies
49 M vim_interfaces: similar to extra[interfaces] but with VIM information. Stored at database in the instance_XXX but not at vim_actions
50 M vim_info: Detailed information of a vm,net from the VIM. Stored at database in the instance_XXX but not at vim_actions
51 MD error_msg: descriptive text upon an error.Stored also at database instance_XXX
52 MD created_at: task creation time
53 MD modified_at: last task update time. On refresh it contains when this task need to be refreshed
54
55 """
56
57 import threading
58 import time
59 import Queue
60 import logging
61 import vimconn
62 import yaml
63 from db_base import db_base_Exception
64 from lib_osm_openvim.ovim import ovimException
65
66 __author__ = "Alfonso Tierno, Pablo Montes"
67 __date__ = "$28-Sep-2017 12:07:15$"
68
69 # from logging import Logger
70 # import auxiliary_functions as af
71
72
def is_task_id(task_id):
    """Tell whether *task_id* is a reference to a task, i.e. a string that begins with "TASK-"."""
    task_prefix = "TASK-"
    return task_id.startswith(task_prefix)
75
76
class VimThreadException(Exception):
    """Error raised by the task handlers of this module (e.g. an unresolved dependency or a lookup failure)."""
79
80
class vim_thread(threading.Thread):
    """Thread that processes, one after another, the tasks addressed to a single VIM.

    Tasks are dictionaries read from the database table 'vim_actions' or received through insert_task().
    Creation/deletion/look-up tasks wait in self.pending_tasks; tasks whose VIM element must be polled
    wait in self.refresh_tasks, ordered by the time at which they have to be refreshed.
    """
    REFRESH_BUILD = 5      # seconds between refreshes while the element is in BUILD status
    REFRESH_ACTIVE = 60    # seconds between refreshes in any other status

    def __init__(self, myvimconn, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None,
                 db=None, db_lock=None, ovim=None):
        """Init a thread.

        :param myvimconn: VIM connector, or the vimconn.vimconnException obtained when trying to build it.
            In the latter case the thread has no VIM and every task fails with that error text
        :param task_lock: lock that protects the status of the tasks
        :param name: name of the thread. By default it is derived from the VIM connector
        :param datacenter_name: name of the datacenter
        :param datacenter_tenant_id: uuid used to select the 'vim_actions' rows served by this thread
        :param db: database object
        :param db_lock: lock used to access the database (and ovim) in exclusion
        :param ovim: SDN (ovim) library object
        """
        threading.Thread.__init__(self)
        if isinstance(myvimconn, vimconn.vimconnException):
            self.vim = None
            self.error_status = "Error accesing to VIM: {}".format(myvimconn)
        else:
            self.vim = myvimconn
            self.error_status = None
        self.datacenter_name = datacenter_name
        self.datacenter_tenant_id = datacenter_tenant_id
        self.ovim = ovim
        if name:
            self.name = name
        elif self.vim is not None:
            # the connector must be indexed here, not the 'vimconn' module
            self.name = self.vim["id"] + "." + self.vim["config"]["datacenter_tenant_id"]
        else:
            # no usable connector: build a name from what is known
            self.name = "{}.{}".format(datacenter_name, datacenter_tenant_id)

        self.logger = logging.getLogger('openmano.vim.'+self.name)
        self.db = db
        self.db_lock = db_lock

        self.task_lock = task_lock
        self.task_queue = Queue.Queue(2000)

        # Time ordered task list for refreshing the status of VIM VMs and nets
        self.refresh_tasks = []

        # Time ordered task list for creation, deletion of VIM VMs and nets
        self.pending_tasks = []

        # All the creation/deletion pending tasks grouped by its concrete vm, net, etc
        #     <item><item_id>:
        #         -   <task1>  # e.g. CREATE task
        #             <task2>  # e.g. DELETE task
        self.grouped_tasks = {}

    def _reload_vim_actions(self):
        """
        Read actions from database and reload them at memory. Fill self.refresh_tasks, self.pending_tasks and
        self.grouped_tasks (through _insert_pending_tasks)
        :return: None
        """
        action_completed = False
        task_list = []
        old_action_key = None

        with self.db_lock:
            vim_actions = self.db.get_rows(FROM="vim_actions",
                                           WHERE={"datacenter_vim_id": self.datacenter_tenant_id},
                                           ORDER_BY=("item", "item_id", "created_at",))
        for task in vim_actions:
            action_key = task["item"] + task["item_id"]
            if old_action_key != action_key:
                # rows are ordered by item,item_id: a new key means the previous group is complete
                if not action_completed and task_list:
                    # This will fill needed task parameters into memory, and insert the task if needed in
                    # self.pending_tasks or self.refresh_tasks
                    self._insert_pending_tasks(task_list)
                task_list = []
                old_action_key = action_key
                action_completed = False
            elif action_completed:
                continue

            if task["status"] == "SCHEDULED" or task["action"] == "CREATE" or task["action"] == "FIND":
                task_list.append(task)
            elif task["action"] == "DELETE":
                # action completed because deleted and status is not SCHEDULED. Not needed anything
                action_completed = True

        # Last actions group need to be inserted too
        if not action_completed and task_list:
            self._insert_pending_tasks(task_list)

    def _refres_elements(self):
        """Call VIM to get VMs and networks status until 10 elements

        :return: number of refresh tasks taken from self.refresh_tasks in this call
        """
        now = time.time()
        nb_processed = 0
        vm_ids = []       # vim_id of the VMs to ask for, without repetitions
        net_ids = []      # same for nets
        vm_tasks = {}     # vim_id -> list of tasks. Several tasks can point to the same VIM element
        net_tasks = {}
        items_to_refresh = 0
        while self.refresh_tasks:
            task = self.refresh_tasks[0]
            with self.task_lock:
                if task['status'] == 'SUPERSEDED':
                    self.refresh_tasks.pop(0)
                    continue
                if task['modified_at'] > now:
                    # the list is time ordered: nothing else is due
                    break
                nb_processed += 1
            self.refresh_tasks.pop(0)
            if task["item"] == 'instance_vms':
                if task["vim_id"] not in vm_tasks:
                    vm_ids.append(task["vim_id"])
                vm_tasks.setdefault(task["vim_id"], []).append(task)
            elif task["item"] == 'instance_nets':
                if task["vim_id"] not in net_tasks:
                    net_ids.append(task["vim_id"])
                net_tasks.setdefault(task["vim_id"], []).append(task)
            else:
                self.logger.error("unknown task {}".format(task["item"]))
            items_to_refresh += 1
            if items_to_refresh == 10:
                break

        if vm_ids:
            self._refresh_group(vm_ids, vm_tasks, self.vim.refresh_vms_status, self._refresh_vm_task, "vms")
        if net_ids:
            self._refresh_group(net_ids, net_tasks, self.vim.refresh_nets_status, self._refresh_net_task, "nets")

        return nb_processed

    def _refresh_group(self, vim_ids, tasks_by_vim_id, vim_refresh, refresh_one, what):
        """Ask the VIM for the status of 'vim_ids' and apply the answer to every related task.

        Every task of 'tasks_by_vim_id' ends up again in self.refresh_tasks: the handled ones by 'refresh_one',
        the rest (VIM call failed, element not reported, error in the middle) with the REFRESH_ACTIVE period.
        :param vim_ids: list of VIM identifiers sent to the VIM
        :param tasks_by_vim_id: dict vim_id -> list of tasks; handled tasks are removed from the lists
        :param vim_refresh: VIM connector method that receives the list and returns a dict vim_id -> info
        :param refresh_one: method called as refresh_one(task, vim_info, now) that updates and reschedules a task
        :param what: text used at the error log, 'vms' or 'nets'
        """
        now = time.time()
        try:
            vim_dict = vim_refresh(vim_ids)
            for vim_id, vim_info in vim_dict.items():
                tasks = tasks_by_vim_id[vim_id]
                while tasks:
                    # a copy is passed because refresh_one rewrites some keys of vim_info
                    refresh_one(tasks[0], dict(vim_info), now)
                    tasks.pop(0)
        except vimconn.vimconnException as e:
            self.logger.error("vimconnException Exception when trying to refresh {} ".format(what) + str(e))
        finally:
            # do not lose the tasks that could not be handled, otherwise they would never be refreshed again
            for tasks in tasks_by_vim_id.values():
                for task in tasks:
                    self._insert_refresh(task, now + self.REFRESH_ACTIVE)

    def _refresh_vm_task(self, task, vim_info, now):
        """Store at database the VM status reported by the VIM, update its SDN ports and reschedule the task."""
        task_need_update = False
        self.logger.debug("get-vm vm_id=%s result=%s", task["vim_id"], str(vim_info))

        # update database
        task_vim_info = task.get("vim_info")
        task_error_msg = task.get("error_msg")
        task_vim_status = task["extra"].get("vim_status")
        if vim_info.get("error_msg"):
            vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"])
        if task_vim_status != vim_info["status"] or task_error_msg != vim_info.get("error_msg") or \
                task_vim_info != vim_info.get("vim_info"):
            with self.db_lock:
                temp_dict = {"status": vim_info["status"],
                             "error_msg": vim_info.get("error_msg"),
                             "vim_info": vim_info.get("vim_info")}
                self.db.update_rows('instance_vms', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
            task["extra"]["vim_status"] = vim_info["status"]
            task["error_msg"] = vim_info.get("error_msg")
            task["vim_info"] = vim_info.get("vim_info")
            task_need_update = True

        # both dictionaries can be missing when the task was stored without interfaces
        task_interfaces = task["extra"].get("interfaces") or {}
        task_vim_interfaces = task.setdefault("vim_interfaces", {})
        for interface in vim_info.get("interfaces", ()):
            vim_interface_id = interface["vim_interface_id"]
            if vim_interface_id not in task_interfaces:
                self.logger.critical("Interface not found {} on task info {}".format(
                    vim_interface_id, task_interfaces))
                continue
            task_interface = task_interfaces[vim_interface_id]
            if task_vim_interfaces.get(vim_interface_id) == interface:
                continue    # nothing has changed since the last refresh

            # delete old port
            if task_interface.get("sdn_port_id"):
                try:
                    with self.db_lock:
                        self.ovim.delete_port(task_interface["sdn_port_id"])
                        task_interface["sdn_port_id"] = None
                        task_need_update = True
                except ovimException as e:
                    self.logger.error("ovimException deleting external_port={} ".format(
                        task_interface["sdn_port_id"]) + str(e), exc_info=True)
                    # TODO Set error_msg at instance_nets

            # Create SDN port
            sdn_net_id = task_interface.get("sdn_net_id")
            if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
                sdn_port_name = sdn_net_id + "." + task["vim_id"]
                sdn_port_name = sdn_port_name[:63]
                try:
                    with self.db_lock:
                        sdn_port_id = self.ovim.new_external_port(
                            {"compute_node": interface["compute_node"],
                             "pci": interface["pci"],
                             "vlan": interface.get("vlan"),
                             "net_id": sdn_net_id,
                             "region": self.vim["config"]["datacenter_id"],
                             "name": sdn_port_name,
                             "mac": interface.get("mac_address")})
                        task_interface["sdn_port_id"] = sdn_port_id
                        task_need_update = True
                except (ovimException, Exception) as e:
                    self.logger.error(
                        "ovimException creating new_external_port compute_node={} "
                        "pci={} vlan={} ".format(
                            interface["compute_node"],
                            interface["pci"],
                            interface.get("vlan")) + str(e),
                        exc_info=True)
                    # TODO Set error_msg at instance_nets
            with self.db_lock:
                self.db.update_rows(
                    'instance_interfaces',
                    UPDATE={"mac_address": interface.get("mac_address"),
                            "ip_address": interface.get("ip_address"),
                            "vim_info": interface.get("vim_info"),
                            "sdn_port_id": task_interface.get("sdn_port_id"),
                            "compute_node": interface.get("compute_node"),
                            "pci": interface.get("pci"),
                            "vlan": interface.get("vlan"),
                            },
                    WHERE={'uuid': task_interface["iface_id"]})
            task_vim_interfaces[vim_interface_id] = interface

        if task_need_update:
            with self.db_lock:
                self.db.update_rows(
                    'vim_actions',
                    UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
                            "error_msg": task.get("error_msg"), "modified_at": now},
                    WHERE={'instance_action_id': task['instance_action_id'],
                           'task_index': task['task_index']})
        if task["extra"].get("vim_status") == "BUILD":
            self._insert_refresh(task, now + self.REFRESH_BUILD)
        else:
            self._insert_refresh(task, now + self.REFRESH_ACTIVE)

    def _refresh_net_task(self, task, vim_info, now):
        """Store at database the net status reported by the VIM, merged with the SDN one, and reschedule."""
        self.logger.debug("get-net net_id=%s result=%s", task["vim_id"], str(vim_info))

        task_vim_info = task.get("vim_info")
        task_vim_status = task["extra"].get("vim_status")
        task_error_msg = task.get("error_msg")
        task_sdn_net_id = task["extra"].get("sdn_net_id")

        # get ovim status
        if task_sdn_net_id:
            try:
                with self.db_lock:
                    sdn_net = self.ovim.show_network(task_sdn_net_id)
                if sdn_net["status"] == "ERROR":
                    if not vim_info.get("error_msg"):
                        vim_info["error_msg"] = sdn_net["error_msg"]
                    else:
                        # each half is shortened so that the composed text fits in 1024 characters
                        vim_info["error_msg"] = "VIM_ERROR: {} && SDN_ERROR: {}".format(
                            self._format_vim_error_msg(vim_info["error_msg"], 1024//2-14),
                            self._format_vim_error_msg(sdn_net["error_msg"], 1024//2-14))
                    if vim_info["status"] == "VIM_ERROR":
                        vim_info["status"] = "VIM_SDN_ERROR"
                    else:
                        vim_info["status"] = "SDN_ERROR"

            except (ovimException, Exception) as e:
                self.logger.error(
                    "ovimException getting network infor snd_net_id={}".format(task_sdn_net_id),
                    exc_info=True)
                # TODO Set error_msg at instance_nets

        # update database
        if vim_info.get("error_msg"):
            vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"])
        if task_vim_status != vim_info["status"] or task_error_msg != vim_info.get("error_msg") or \
                task_vim_info != vim_info.get("vim_info"):
            task["extra"]["vim_status"] = vim_info["status"]
            task["error_msg"] = vim_info.get("error_msg")
            task["vim_info"] = vim_info.get("vim_info")
            temp_dict = {"status": vim_info["status"],
                         "error_msg": vim_info.get("error_msg"),
                         "vim_info": vim_info.get("vim_info")}
            with self.db_lock:
                self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
                self.db.update_rows(
                    'vim_actions',
                    UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
                            "error_msg": task.get("error_msg"), "modified_at": now},
                    WHERE={'instance_action_id': task['instance_action_id'],
                           'task_index': task['task_index']})
        if task["extra"].get("vim_status") == "BUILD":
            self._insert_refresh(task, now + self.REFRESH_BUILD)
        else:
            self._insert_refresh(task, now + self.REFRESH_ACTIVE)

    def _insert_refresh(self, task, threshold_time=None):
        """Insert a task at list of refreshing elements. The refreshing list is ordered by threshold_time
        (task['modified_at']). Nothing is done if this thread has no VIM.
        It is assumed that this is called inside this thread
        :param task: task to insert. Its 'modified_at' is overwritten with threshold_time
        :param threshold_time: time at which the task must be refreshed. By default, now
        """
        if not self.vim:
            return
        if not threshold_time:
            threshold_time = time.time()
        task["modified_at"] = threshold_time
        task_name = task["item"][9:] + "-" + task["action"]    # [9:] removes the 'instance_' prefix
        task_id = task["instance_action_id"] + "." + str(task["task_index"])
        for index in range(0, len(self.refresh_tasks)):
            if self.refresh_tasks[index]["modified_at"] > threshold_time:
                self.refresh_tasks.insert(index, task)
                break
        else:
            index = len(self.refresh_tasks)
            self.refresh_tasks.append(task)
        self.logger.debug("new refresh task={} name={}, modified_at={} index={}".format(
            task_id, task_name, task["modified_at"], index))

    def _remove_refresh(self, task_name, vim_id):
        """Remove a task with this name and vim_id from the list of refreshing elements.
        It is assumed that this is called inside this thread outside _refres_elements method
        Return True if self.refresh_tasks is modified, task is found
        Return False if not found
        """
        for index, task in enumerate(self.refresh_tasks):
            name = task.get("name")
            if name is None:
                # tasks handled by this class carry no 'name' key; use the name logged by _insert_refresh
                # NOTE(review): no caller is visible in this file - confirm this is the expected name format
                name = task["item"][9:] + "-" + task["action"]
            if name == task_name and task["vim_id"] == vim_id:
                del self.refresh_tasks[index]
                return True
        return False

    def _proccess_pending_tasks(self):
        """Execute the pending tasks against the VIM and store the result at database.

        It stops when self.pending_tasks is empty or after 10 creations.
        :return: number of tasks taken from self.pending_tasks
        :raise vimconn.vimconnException: if the task contains an unknown item or action
        """
        nb_created = 0
        nb_processed = 0
        while self.pending_tasks:
            task = self.pending_tasks.pop(0)
            nb_processed += 1
            if task["status"] == "SUPERSEDED":
                # not needed to do anything but update database with the new status
                result = True
                database_update = None
            elif not self.vim:
                # this must be an assignment; a comparison left the task SCHEDULED, so it was retried
                task["status"] = "FAILED"
                task["error_msg"] = self.error_status
                result = False
                database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
            elif task["item"] == 'instance_vms':
                if task["action"] == "CREATE":
                    result, database_update = self.new_vm(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_vm(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            elif task["item"] == 'instance_nets':
                if task["action"] == "CREATE":
                    result, database_update = self.new_net(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_net(task)
                elif task["action"] == "FIND":
                    result, database_update = self.get_net(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            else:
                raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
                # TODO

            if task["status"] == "SCHEDULED":
                # This is because a depend task is not completed. Moved to the end. NOT USED YET
                if task["extra"].get("tries", 0) > 3:
                    task["status"] = "FAILED"
                else:
                    task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
                    self.pending_tasks.append(task)
            elif task["action"] == "DELETE":
                # the element does not exist any more: forget its group of tasks
                self.grouped_tasks.pop(task["item"] + task["item_id"], None)
            elif task["action"] in ("CREATE", "FIND") and task["status"] in ("DONE", "BUILD"):
                self._insert_refresh(task)

            self.logger.debug("vim_action id={}.{} item={} action={} result={}:{} params={}".format(
                task["instance_action_id"], task["task_index"], task["item"], task["action"],
                task["status"], task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"),
                task["params"]))
            try:
                now = time.time()
                with self.db_lock:
                    self.db.update_rows(
                        table="vim_actions",
                        UPDATE={"status": task["status"], "vim_id": task["vim_id"], "modified_at": now,
                                "error_msg": task["error_msg"],
                                "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
                        WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
                    if result is not None:
                        self.db.update_rows(
                            table="instance_actions",
                            UPDATE={("number_done" if result else "number_failed"): {"INCREMENT": 1},
                                    "modified_at": now},
                            WHERE={"uuid": task["instance_action_id"]})
                    if database_update:
                        self.db.update_rows(table=task["item"],
                                            UPDATE=database_update,
                                            WHERE={"uuid": task["item_id"]})
            except db_base_Exception as e:
                self.logger.error("Error updating database %s", str(e), exc_info=True)

            if nb_created == 10:
                break
        return nb_processed

    def _insert_pending_tasks(self, vim_actions_list):
        """Complete the tasks with the in-memory fields and insert them in the lists of this thread.

        Tasks addressed to another datacenter_vim_id are ignored. A DELETE task marks as SUPERSEDED the
        previous tasks over the same element and inherits from them the VIM identifiers.
        :param vim_actions_list: list of tasks (dict), as stored at database table 'vim_actions'
        :raise vimconn.vimconnException: if a non scheduled task contains an unknown action
        """
        for task in vim_actions_list:
            if task["datacenter_vim_id"] != self.datacenter_tenant_id:
                continue
            action_key = task["item"] + task["item_id"]
            if action_key not in self.grouped_tasks:
                self.grouped_tasks[action_key] = []
            task["params"] = None
            task["depends"] = {}
            if task["extra"]:
                # NOTE(review): yaml.load can build arbitrary objects. 'extra' is written by this class with
                # yaml.safe_dump, so yaml.safe_load looks like a drop-in replacement - confirm the other writers
                extra = yaml.load(task["extra"])
                task["extra"] = extra
                task["params"] = extra.get("params")
                depends_on_list = extra.get("depends_on")
                if depends_on_list:
                    for index in depends_on_list:
                        # only dependencies contained in this same list can be resolved here
                        if index < len(vim_actions_list) and vim_actions_list[index]["task_index"] == index and\
                                vim_actions_list[index]["instance_action_id"] == task["instance_action_id"]:
                            task["depends"]["TASK-" + str(index)] = vim_actions_list[index]
                if extra.get("interfaces"):
                    task["vim_interfaces"] = {}
            else:
                task["extra"] = {}
            if "error_msg" not in task:
                task["error_msg"] = None
            if "vim_id" not in task:
                task["vim_id"] = None

            if task["action"] == "DELETE":
                need_delete_action = False
                for to_supersede in self.grouped_tasks.get(action_key, ()):
                    if to_supersede["action"] == "FIND" and to_supersede.get("vim_id"):
                        task["vim_id"] = to_supersede["vim_id"]
                    if to_supersede["action"] == "CREATE" and to_supersede.get("vim_id"):
                        need_delete_action = True
                        task["vim_id"] = to_supersede["vim_id"]
                        # 'sdn_net_id' is the key written by new_net/get_net and read by del_net
                        if to_supersede["extra"].get("sdn_net_id"):
                            task["extra"]["sdn_net_id"] = to_supersede["extra"]["sdn_net_id"]
                        if to_supersede["extra"].get("interfaces"):
                            task["extra"]["interfaces"] = to_supersede["extra"]["interfaces"]
                    # Mark task as SUPERSEDED.
                    #   If task is in self.pending_tasks, it will be removed and database will be update
                    #   If task is in self.refresh_tasks, it will be removed
                    to_supersede["status"] = "SUPERSEDED"
                if not need_delete_action:
                    # nothing was created at VIM, so there is nothing to delete
                    task["status"] = "SUPERSEDED"

                self.grouped_tasks[action_key].append(task)
                self.pending_tasks.append(task)
            elif task["status"] == "SCHEDULED":
                self.grouped_tasks[action_key].append(task)
                self.pending_tasks.append(task)
            elif task["action"] in ("CREATE", "FIND"):
                self.grouped_tasks[action_key].append(task)
                if task["status"] in ("DONE", "BUILD"):
                    self._insert_refresh(task)
            # TODO add VM reset, get console, etc...
            else:
                raise vimconn.vimconnException(self.name + "unknown vim_action action {}".format(task["action"]))

    def insert_task(self, task):
        """Put a task (list of tasks, or the text 'exit' or 'reload') in the queue of this thread.

        :return: None
        :raise vimconn.vimconnException: if the queue is full
        """
        try:
            self.task_queue.put(task, False)
            return None
        except Queue.Full:
            raise vimconn.vimconnException(self.name + ": timeout inserting a task")

    def del_task(self, task):
        """Cancel a task that has not been processed yet.

        :return: True if the task was SCHEDULED, and it is marked as SUPERSEDED; False otherwise
        """
        # the 'with' statement releases the lock by itself; an extra release() would fail
        with self.task_lock:
            if task["status"] == "SCHEDULED":
                task["status"] = "SUPERSEDED"
                return True
            return False

    def run(self):
        """Main loop: attend the queue, process the pending tasks and refresh the elements.

        It finishes, returning 0, when the text 'exit' is received through the queue. The text 'reload'
        makes the tasks be read again from database.
        """
        self.logger.debug("Starting")
        while True:
            self._reload_vim_actions()
            reload_thread = False
            while True:
                try:
                    while not self.task_queue.empty():
                        task = self.task_queue.get()
                        if isinstance(task, list):
                            self._insert_pending_tasks(task)
                        elif isinstance(task, str):
                            if task == 'exit':
                                self.logger.debug("Finishing")
                                return 0
                            elif task == 'reload':
                                reload_thread = True
                                break
                        self.task_queue.task_done()
                    if reload_thread:
                        break
                    nb_processed = self._proccess_pending_tasks()
                    nb_processed += self._refres_elements()
                    if not nb_processed:
                        time.sleep(1)

                except Exception as e:
                    self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)

    def terminate(self, task):
        """Do nothing. Return the tuple (True, None)."""
        return True, None

    def _look_for_task(self, instance_action_id, task_id):
        """Read a task from database.

        :param instance_action_id: uuid of the instance action the task belongs to
        :param task_id: text ended by '-<task_index>', e.g. 'TASK-3'
        :return: the task (dict) with 'extra' decoded and 'params', 'depends' filled; None if not found
        """
        task_index = task_id.split("-")[-1]
        with self.db_lock:
            tasks = self.db.get_rows(FROM="vim_actions", WHERE={"instance_action_id": instance_action_id,
                                                                "task_index": task_index})
        if not tasks:
            return None
        task = tasks[0]
        task["params"] = None
        task["depends"] = {}
        if task["extra"]:
            # NOTE(review): yaml.load can build arbitrary objects; yaml.safe_load looks enough here - confirm
            extra = yaml.load(task["extra"])
            task["extra"] = extra
            task["params"] = extra.get("params")
            if extra.get("interfaces"):
                task["vim_interfaces"] = {}
        else:
            task["extra"] = {}
        return task

    def _format_vim_error_msg(self, error_text, max_length=1024):
        """Shorten a text of max_length or more characters, keeping its beginning and its end.

        :return: error_text unchanged if it is empty or shorter than max_length; otherwise its first and
            last characters joined by ' ... '
        """
        if error_text and len(error_text) >= max_length:
            return error_text[:max_length//2-3] + " ... " + error_text[-max_length//2+3:]
        return error_text

    def new_net(self, task):
        """Create a network at VIM and, for 'data' and 'ptp' nets of a VIM with SDN controller, at ovim.

        A failure creating the SDN network is only logged.
        :param task: CREATE task; task["params"] are the arguments for the VIM new_network
        :return: (True, dict to update instance_nets) if created at VIM; (False, dict with the error) otherwise.
            The task is updated with the status, vim_id and error_msg
        """
        task_id = task["instance_action_id"] + "." + str(task["task_index"])
        try:
            params = task["params"]
            vim_net_id = self.vim.new_network(*params)

            net_name = params[0]
            net_type = params[1]

            network = None
            sdn_net_id = None
            sdn_controller = self.vim.config.get('sdn-controller')
            if sdn_controller and (net_type == "data" or net_type == "ptp"):
                network = {"name": net_name, "type": net_type, "region": self.vim["config"]["datacenter_id"]}

                vim_net = self.vim.get_network(vim_net_id)
                if vim_net.get('encapsulation') != 'vlan':
                    # .get() because the key can be missing, which is precisely one of the reported cases
                    raise vimconn.vimconnException(
                        "net '{}' defined as type '{}' has not vlan encapsulation '{}'".format(
                            net_name, net_type, vim_net.get('encapsulation')))
                network["vlan"] = vim_net.get('segmentation_id')
                try:
                    with self.db_lock:
                        sdn_net_id = self.ovim.new_network(network)
                except (ovimException, Exception) as e:
                    self.logger.error("task=%s cannot create SDN network vim_net_id=%s input='%s' ovimException='%s'",
                                      str(task_id), vim_net_id, str(network), str(e))
            task["status"] = "DONE"
            task["extra"]["vim_info"] = {}
            task["extra"]["sdn_net_id"] = sdn_net_id
            task["error_msg"] = None
            task["vim_id"] = vim_net_id
            instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "BUILD",
                                       "error_msg": None}
            return True, instance_element_update
        except vimconn.vimconnException as e:
            self.logger.error("Error creating NET, task=%s: %s", str(task_id), str(e))
            task["status"] = "FAILED"
            task["vim_id"] = None
            task["error_msg"] = self._format_vim_error_msg(str(e))
            instance_element_update = {"vim_net_id": None, "sdn_net_id": None, "status": "VIM_ERROR",
                                       "error_msg": task["error_msg"]}
            return False, instance_element_update

    def new_vm(self, task):
        """Create a VM at VIM, once the task references of its networks are changed by the VIM net ids.

        :param task: CREATE task; task["params"] are the arguments for the VIM new_vminstance, where the
            item at position 5 is the list of nets
        :return: (True, dict to update instance_vms) if created; (False, dict with the error) otherwise.
            The task is updated with the status, vim_id, error_msg and extra["interfaces"]
        """
        task_id = task["instance_action_id"] + "." + str(task["task_index"])
        try:
            params = task["params"]
            depends = task.get("depends")
            net_list = params[5]
            for net in net_list:
                if "net_id" in net and is_task_id(net["net_id"]):  # change task_id into network_id
                    if net["net_id"] in depends:
                        task_net = depends[net["net_id"]]
                    else:
                        task_net = self._look_for_task(task["instance_action_id"], net["net_id"])
                    if not task_net:
                        raise VimThreadException(
                            "Error trying to get depending task from task_index={}".format(net["net_id"]))
                    network_id = task_net.get("vim_id")
                    if not network_id:
                        raise VimThreadException(
                            "Cannot create VM because depends on a network not created or found: " +
                            str(task_net["error_msg"]))
                    net["net_id"] = network_id
            vim_vm_id = self.vim.new_vminstance(*params)

            # fill task_interfaces. Look for snd_net_id at database for each interface
            task_interfaces = {}
            for iface in net_list:
                task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]}
                with self.db_lock:
                    result = self.db.get_rows(
                        SELECT=('sdn_net_id',),
                        FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
                        WHERE={'ii.uuid': iface["uuid"]})
                if result:
                    task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id']
                else:
                    # logging uses %-style placeholders; a '{}' here produced a formatting error
                    self.logger.critical("Error creating VM, task=%s Network %s not found at DB", task_id,
                                         iface["uuid"], exc_info=True)

            task["vim_info"] = {}
            task["vim_interfaces"] = {}
            task["extra"]["interfaces"] = task_interfaces
            task["error_msg"] = None
            task["status"] = "DONE"
            task["vim_id"] = vim_vm_id
            instance_element_update = {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None}
            return True, instance_element_update

        except (vimconn.vimconnException, VimThreadException) as e:
            self.logger.error("Error creating VM, task=%s: %s", task_id, str(e))
            error_text = self._format_vim_error_msg(str(e))
            task["error_msg"] = error_text
            task["status"] = "FAILED"
            task["vim_id"] = None
            instance_element_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": error_text}
            return False, instance_element_update

    def del_vm(self, task):
        """Delete the SDN ports of a VM and the VM itself at VIM.

        A failure deleting a port is only logged. A VM not found at VIM is considered deleted.
        :param task: DELETE task with the 'vim_id' of the VM and, optionally, extra["interfaces"]
        :return: (True, None) if deleted or not found; (False, None) on other VIM errors
        """
        vm_vim_id = task["vim_id"]
        # the default must be a dict: .values() is called below
        interfaces = task["extra"].get("interfaces") or {}
        try:
            for iface in interfaces.values():
                if iface.get("sdn_port_id"):
                    try:
                        with self.db_lock:
                            self.ovim.delete_port(iface["sdn_port_id"])
                    except ovimException as e:
                        self.logger.error("ovimException deleting external_port={} at VM vim_id={} deletion ".format(
                            iface["sdn_port_id"], vm_vim_id) + str(e), exc_info=True)
                        # TODO Set error_msg at instance_nets

            self.vim.delete_vminstance(vm_vim_id)
            task["status"] = "DONE"
            task["error_msg"] = None
            return True, None

        except vimconn.vimconnException as e:
            task["error_msg"] = self._format_vim_error_msg(str(e))
            if isinstance(e, vimconn.vimconnNotFoundException):
                # If not found mark as Done and fill error_msg
                task["status"] = "DONE"
                return True, None
            task["status"] = "FAILED"
            return False, None

    def del_net(self, task):
        """Delete a network at ovim (with its external ports) and at VIM.

        A network not found at VIM is considered deleted.
        :param task: DELETE task with the 'vim_id' of the net and, optionally, extra["sdn_net_id"]
        :return: (True, None) if deleted or not found at VIM; (False, None) on other VIM or ovim errors
        """
        net_vim_id = task["vim_id"]
        sdn_net_id = task["extra"].get("sdn_net_id")
        try:
            if sdn_net_id:
                # Delete any attached port to this sdn network. There can be ports associated to this network in case
                # it was manually done using 'openmano vim-net-sdn-attach'
                with self.db_lock:
                    port_list = self.ovim.get_ports(columns={'uuid'},
                                                    filter={'name': 'external_port', 'net_id': sdn_net_id})
                    for port in port_list:
                        self.ovim.delete_port(port['uuid'])
                    self.ovim.delete_network(sdn_net_id)
            self.vim.delete_network(net_vim_id)
            task["status"] = "DONE"
            task["error_msg"] = None
            return True, None
        except ovimException as e:
            task["error_msg"] = self._format_vim_error_msg("ovimException obtaining and deleting external "
                                                           "ports for net {}: {}".format(sdn_net_id, str(e)))
        except vimconn.vimconnException as e:
            task["error_msg"] = self._format_vim_error_msg(str(e))
            if isinstance(e, vimconn.vimconnNotFoundException):
                # If not found mark as Done and fill error_msg
                task["status"] = "DONE"
                return True, None
        # reached after an ovim error or a VIM error different from 'not found'
        task["status"] = "FAILED"
        return False, None

    def get_net(self, task):
        """Look for an existing network at VIM. Exactly one network must match.

        :param task: FIND task; task["params"][0] is the filter for the VIM get_network_list
        :return: (True, dict to update instance_nets) if found; (False, dict with the error) otherwise.
            The task is updated with the status, vim_id, error_msg, extra["created"] and extra["sdn_net_id"]
        """
        task_id = task["instance_action_id"] + "." + str(task["task_index"])
        try:
            params = task["params"]
            net_filter = params[0]
            vim_nets = self.vim.get_network_list(net_filter)
            if not vim_nets:
                raise VimThreadException("Network not found with this criteria: '{}'".format(net_filter))
            elif len(vim_nets) > 1:
                raise VimThreadException("More than one network found with this criteria: '{}'".format(net_filter))
            vim_net_id = vim_nets[0]["id"]

            # Discover if this network is managed by a sdn controller
            sdn_net_id = None
            with self.db_lock:
                result = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets',
                                          WHERE={'vim_net_id': vim_net_id, 'instance_scenario_id': None,
                                                 'datacenter_tenant_id': self.datacenter_tenant_id})
            if result:
                sdn_net_id = result[0]['sdn_net_id']

            task["status"] = "DONE"
            task["extra"]["vim_info"] = {}
            task["extra"]["created"] = False
            task["extra"]["sdn_net_id"] = sdn_net_id
            task["error_msg"] = None
            task["vim_id"] = vim_net_id
            instance_element_update = {"vim_net_id": vim_net_id, "created": False, "status": "BUILD",
                                       "error_msg": None}
            return True, instance_element_update
        except (vimconn.vimconnException, VimThreadException) as e:
            self.logger.error("Error looking for NET, task=%s: %s", str(task_id), str(e))
            task["status"] = "FAILED"
            task["vim_id"] = None
            task["error_msg"] = self._format_vim_error_msg(str(e))
            instance_element_update = {"vim_net_id": None, "status": "VIM_ERROR",
                                       "error_msg": task["error_msg"]}
            return False, instance_element_update