External port implementation for SDN assist
[osm/RO.git] / osm_ro / vim_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 """
25 This is the thread that interacts with the host and libvirt to manage VMs.
26 One thread will be launched per host.
27 """
28
29 import threading
30 import time
31 import Queue
32 import logging
33 import vimconn
34 from db_base import db_base_Exception
35 from lib_osm_openvim.ovim import ovimException
36
37 __author__ = "Alfonso Tierno, Pablo Montes"
38 __date__ = "$10-feb-2017 12:07:15$"
39
40 # from logging import Logger
41 # import auxiliary_functions as af
42
43
def is_task_id(task_id):
    """Tell whether 'task_id' is a task identifier.

    Task identifiers are recognized by their "TASK." prefix.
    Return True when 'task_id' starts with that prefix, False otherwise.
    """
    task_prefix = "TASK."
    return task_id[:len(task_prefix)] == task_prefix
46
47
class vim_thread(threading.Thread):
    """Thread that executes, one at a time, the tasks requested for one VIM connector.

    Tasks are dictionaries with, at least, "id", "name", "params" and "status". They are queued with
    insert_task() and consumed by run(). While the queue is empty the thread refreshes the status of the
    VMs and networks it has created: it asks the VIM, stores the changes at database and, when the
    information is available, creates the external ports at the SDN layer ('ovim').
    """

    def __init__(self, vimconn, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None,
                 db=None, db_lock=None, ovim=None):
        """Init a thread.
        Arguments:
            'vimconn': VIM connector used for every VIM operation. It is also indexed with "id" and "config"
                (note that inside this method it hides the 'vimconn' module)
            'task_lock': lock that protects the "status" and "result" of the tasks
            'name': name of thread. If empty it is built as <vimconn id>.<vimconn datacenter_tenant_id>
            'datacenter_name': stored at the object, not used by this class
            'datacenter_tenant_id': used to filter the database rows handled by this thread
            'db', 'db_lock': database class and lock to use it in exclusion
            'ovim': object used to manage the SDN networks and ports
        """
        # It will contain a dictionary with
        #   task_id:
        #       status: enqueued,done,error,deleted,processing
        #       result: VIM result,
        self.tasksResult = {}
        threading.Thread.__init__(self)
        self.vim = vimconn
        self.datacenter_name = datacenter_name
        self.datacenter_tenant_id = datacenter_tenant_id
        self.ovim = ovim
        if not name:
            self.name = vimconn["id"] + "." + vimconn["config"]["datacenter_tenant_id"]
        else:
            self.name = name

        self.logger = logging.getLogger('openmano.vim.'+self.name)
        self.db = db
        self.db_lock = db_lock

        self.task_lock = task_lock
        self.task_queue = Queue.Queue(2000)
        # Contains time ordered task list for refreshing the status of VIM VMs and nets
        self.refresh_list = []

    def _refres_elements(self):
        """Call VIM to get VMs and networks status until 10 elements

        Takes from the head of self.refresh_list up to 10 tasks whose 'time' has expired (tasks marked as
        'deleted' are discarded), refreshes them and schedules them again.
        Sleeps one second when no task has been taken.
        """
        now = time.time()
        vm_to_refresh_list = []
        net_to_refresh_list = []
        vm_to_refresh_dict = {}
        net_to_refresh_dict = {}
        items_to_refresh = 0
        while self.refresh_list:
            task = self.refresh_list[0]
            with self.task_lock:
                if task['status'] == 'deleted':
                    self.refresh_list.pop(0)
                    continue
                if task['time'] > now:
                    # the list is time ordered, so nothing else has expired
                    break
                task["status"] = "processing"
            self.refresh_list.pop(0)
            if task["name"] == 'get-vm':
                vm_to_refresh_list.append(task["vim_id"])
                vm_to_refresh_dict[task["vim_id"]] = task
            elif task["name"] == 'get-net':
                net_to_refresh_list.append(task["vim_id"])
                net_to_refresh_dict[task["vim_id"]] = task
            else:
                error_text = "unknown task {}".format(task["name"])
                self.logger.error(error_text)
            items_to_refresh += 1
            if items_to_refresh == 10:
                break

        if vm_to_refresh_list:
            self._refresh_vms(vm_to_refresh_list, vm_to_refresh_dict, now)

        if net_to_refresh_list:
            self._refresh_nets(net_to_refresh_list, net_to_refresh_dict, now)

        if not items_to_refresh:
            time.sleep(1)

    def _refresh_vms(self, vim_ids, tasks, now):
        """Refresh the status of the VMs 'vim_ids'. 'tasks' maps each VIM id to its 'get-vm' task"""
        pending = dict(tasks)  # tasks that have not been scheduled again yet
        try:
            vim_dict = self.vim.refresh_vms_status(vim_ids)
            for vim_id, vim_info in vim_dict.items():
                # look for task
                task = tasks[vim_id]
                self.logger.debug("get-vm vm_id=%s result=%s", task["vim_id"], str(vim_info))

                # update database
                self._update_status_at_db('instance_vms', "vim_vm_id", vim_id, task, vim_info)
                for interface in vim_info["interfaces"]:
                    self._refresh_vm_interface(vim_id, task, interface)

                self._reschedule_refresh(task, vim_info, now)
                pending.pop(vim_id, None)
        except vimconn.vimconnException as e:
            self.logger.error("vimconnException Exception when trying to refresh vms " + str(e))
            # schedule again every task not processed; otherwise they would never be refreshed again
            for task in pending.values():
                self._insert_refresh(task, now + 300)  # 5minutes

    def _refresh_vm_interface(self, vim_id, task, interface):
        """Store at database the changes of one VM interface reported by the VIM and renew its SDN port"""
        for task_interface in task["vim_info"]["interfaces"]:
            if task_interface["vim_net_id"] == interface["vim_net_id"]:
                break
        else:
            # first time this interface is seen
            task_interface = {"vim_net_id": interface["vim_net_id"]}
            task["vim_info"]["interfaces"].append(task_interface)
        if task_interface == interface:
            return  # nothing has changed

        # delete old port
        if task_interface.get("sdn_port_id"):
            try:
                self.ovim.delete_port(task_interface["sdn_port_id"])
                task_interface["sdn_port_id"] = None
            except ovimException as e:
                self.logger.error("ovimException deleting external_port={} ".format(
                    task_interface["sdn_port_id"]) + str(e), exc_info=True)
                # TODO Set error_msg at instance_nets
        vim_net_id = interface["vim_net_id"]
        with self.db_lock:
            where_ = {'iv.vim_vm_id': vim_id, "ine.vim_net_id": vim_net_id,
                      'ine.datacenter_tenant_id': self.datacenter_tenant_id}
            # TODO check why vim_interface_id is not present at database
            # if interface.get("vim_interface_id"):
            #     where_["vim_interface_id"] = interface["vim_interface_id"]
            db_ifaces = self.db.get_rows(
                FROM="instance_interfaces as ii left join instance_nets as ine on "
                     "ii.instance_net_id=ine.uuid left join instance_vms as iv on "
                     "ii.instance_vm_id=iv.uuid",
                SELECT=("ii.uuid as iface_id", "ine.uuid as net_id", "iv.uuid as vm_id", "sdn_net_id", "vim_net_id"),
                WHERE=where_)
        if len(db_ifaces) == 0:
            self.logger.critical("Refresing interfaces. "
                                 "Not found any interface at database for '{}'".format(where_))
            return
        if len(db_ifaces) > 1:
            self.logger.critical("Refresing interfaces. "
                                 "Found more than one interface at database for '{}'".format(where_))
            return

        db_iface = db_ifaces[0]
        # If there is no sdn_net_id, check if it is because an already created vim network is being used
        # in that case, the sdn_net_id will be in that entry of the instance_nets table
        if not db_iface.get("sdn_net_id"):
            with self.db_lock:  # as the rest of database accesses of this class
                result = self.db.get_rows(
                    SELECT=('sdn_net_id',), FROM='instance_nets',
                    WHERE={'vim_net_id': db_iface.get("vim_net_id"), 'instance_scenario_id': None,
                           "datacenter_tenant_id": self.datacenter_tenant_id})
            if len(result) == 1:
                db_iface["sdn_net_id"] = result[0]['sdn_net_id']

        if db_iface.get("sdn_net_id") and interface.get("compute_node") and interface.get("pci"):
            sdn_net_id = db_iface["sdn_net_id"]
            sdn_port_name = sdn_net_id + "." + db_iface["vm_id"]
            sdn_port_name = sdn_port_name[:63]
            try:
                sdn_port_id = self.ovim.new_external_port(
                    {"compute_node": interface["compute_node"],
                     "pci": interface["pci"],
                     "vlan": interface.get("vlan"),
                     "net_id": sdn_net_id,
                     "region": self.vim["config"]["datacenter_id"],
                     "name": sdn_port_name,
                     "mac": interface.get("mac_address")})
                interface["sdn_port_id"] = sdn_port_id
            except Exception as e:  # ovimException included
                self.logger.error(
                    "ovimException creating new_external_port compute_node={} "
                    "pci={} vlan={} ".format(
                        interface["compute_node"],
                        interface["pci"],
                        interface.get("vlan")) + str(e),
                    exc_info=True)
                # TODO Set error_msg at instance_nets
        # vim_net_id is not sent in this update. A copy is used, so that 'interface' keeps its
        # content even if the database update fails
        iface_update = {key: value for key, value in interface.items() if key != "vim_net_id"}
        with self.db_lock:
            self.db.update_rows('instance_interfaces', UPDATE=iface_update,
                                WHERE={'uuid': db_iface["iface_id"]})
        # TODO insert instance_id

    def _refresh_nets(self, vim_ids, tasks, now):
        """Refresh the status of the networks 'vim_ids'. 'tasks' maps each VIM id to its 'get-net' task"""
        pending = dict(tasks)  # tasks that have not been scheduled again yet
        try:
            vim_dict = self.vim.refresh_nets_status(vim_ids)
            for vim_id, vim_info in vim_dict.items():
                # look for task
                task = tasks[vim_id]
                self.logger.debug("get-net net_id=%s result=%s", task["vim_id"], str(vim_info))

                # get database info
                where_ = {"vim_net_id": vim_id, 'datacenter_tenant_id': self.datacenter_tenant_id}
                with self.db_lock:
                    db_nets = self.db.get_rows(
                        FROM="instance_nets",
                        SELECT=("uuid as net_id", "sdn_net_id"),
                        WHERE=where_)
                if len(db_nets) == 0:
                    self.logger.critical("Refresing networks. "
                                         "Not found any instance-network at database for '{}'".format(where_))
                    # as before, a network without database entry is not refreshed any more
                    pending.pop(vim_id, None)
                    continue
                if len(db_nets) > 1:
                    self.logger.critical("Refresing networks. "
                                         "Found more than one instance-networks at database for '{}'".format(where_))
                elif db_nets[0].get("sdn_net_id"):
                    self._add_sdn_net_status(db_nets[0]["sdn_net_id"], vim_info)

                # update database
                self._update_status_at_db('instance_nets', "vim_net_id", vim_id, task, vim_info)

                self._reschedule_refresh(task, vim_info, now)
                pending.pop(vim_id, None)
        except vimconn.vimconnException as e:
            self.logger.error("vimconnException Exception when trying to refresh nets " + str(e))
            # schedule again every task not processed; otherwise they would never be refreshed again
            for task in pending.values():
                self._insert_refresh(task, now + 300)  # 5minutes

    def _add_sdn_net_status(self, sdn_net_id, vim_info):
        """Merge into 'vim_info' the error status of the SDN network 'sdn_net_id' obtained from ovim"""
        # get ovim status
        try:
            sdn_net = self.ovim.show_network(sdn_net_id)
            if sdn_net["status"] == "ERROR":
                if not vim_info.get("error_msg"):
                    vim_info["error_msg"] = sdn_net["error_msg"]
                else:
                    # each message is shortened so that both, plus the labels, fit in 1024 characters
                    vim_info["error_msg"] = "VIM_ERROR: {} && SDN_ERROR: {}".format(
                        self._format_vim_error_msg(vim_info["error_msg"], 1024//2-14),
                        self._format_vim_error_msg(sdn_net["error_msg"], 1024//2-14))
                if vim_info["status"] == "VIM_ERROR":
                    vim_info["status"] = "VIM_SDN_ERROR"
                else:
                    vim_info["status"] = "SDN_ERROR"
        except Exception:  # ovimException included
            self.logger.error(
                "ovimException getting network infor snd_net_id={}".format(sdn_net_id),
                exc_info=True)
            # TODO Set error_msg at instance_nets

    def _update_status_at_db(self, table, id_column, vim_id, task, vim_info):
        """Update status, error_msg and vim_info at database 'table' if they differ from the last ones seen"""
        if vim_info.get("error_msg"):
            vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"])
        last_info = task["vim_info"]
        if last_info.get("status") != vim_info["status"] or \
                last_info.get("error_msg") != vim_info.get("error_msg") or \
                last_info.get("vim_info") != vim_info["vim_info"]:
            with self.db_lock:
                temp_dict = {"status": vim_info["status"],
                             "error_msg": vim_info.get("error_msg"),
                             "vim_info": vim_info["vim_info"]}
                self.db.update_rows(table, UPDATE=temp_dict, WHERE={id_column: vim_id})

    def _reschedule_refresh(self, task, vim_info, now):
        """Store 'vim_info' at the task and schedule its next refresh"""
        task["vim_info"] = vim_info
        if task["vim_info"]["status"] == "BUILD":
            self._insert_refresh(task, now+5)  # 5seconds
        else:
            self._insert_refresh(task, now+300)  # 5minutes

    def _insert_refresh(self, task, threshold_time):
        """Insert a task at list of refreshing elements. The refreshing list is ordered by threshold_time (task['time'])
        It is assumed that this is called inside this thread
        The task is placed before the first task with a later time, so tasks with the same time keep their
        insertion order
        """
        task["time"] = threshold_time
        for index, queued_task in enumerate(self.refresh_list):
            if queued_task["time"] > threshold_time:
                self.refresh_list.insert(index, task)
                break
        else:
            index = len(self.refresh_list)
            self.refresh_list.append(task)
        self.logger.debug("new refresh task={} name={}, time={} index={}".format(
            task["id"], task["name"], task["time"], index))

    def _remove_refresh(self, task_name, vim_id):
        """Remove a task with this name and vim_id from the list of refreshing elements.
        It is assumed that this is called inside this thread outside _refres_elements method
        Return True if self.refresh_list is modified, task is found
        Return False if not found
        """
        for index, queued_task in enumerate(self.refresh_list):
            if queued_task["name"] == task_name and queued_task["vim_id"] == vim_id:
                del self.refresh_list[index]
                return True
        return False

    def insert_task(self, task):
        """Queue a task without blocking.
        Return the task id. Raise vimconnException if the queue is full
        """
        try:
            self.task_queue.put(task, False)
            return task["id"]
        except Queue.Full:
            raise vimconn.vimconnException(self.name + ": timeout inserting a task")

    def del_task(self, task):
        """Mark a task as deleted if it is still enqueued.
        Return True if the task has been marked, False if its status is not 'enqueued' (for example it is
        being processed), in which case the task is not modified
        """
        with self.task_lock:
            if task["status"] == "enqueued":
                task["status"] = "deleted"
                return True
            # the lock is released by the 'with' statement; releasing it here too would raise RuntimeError
            return False

    def run(self):
        """Main loop of the thread.
        Gets the tasks from the queue and executes them, storing at each task its final status ('done' or
        'error') and its result. When the queue is empty it refreshes VMs and networks status.
        It finishes, returning 0, after processing a task named 'exit'
        """
        self.logger.debug("Starting")
        task_handlers = {
            'exit': self.terminate,
            'reload': self.terminate,
            'new-vm': self.new_vm,
            'del-vm': self.del_vm,
            'new-net': self.new_net,
            'del-net': self.del_net,
        }
        while True:
            # TODO reload service
            while True:
                try:
                    if self.task_queue.empty():
                        self._refres_elements()
                        continue
                    task = self.task_queue.get()
                    with self.task_lock:
                        if task["status"] == "deleted":
                            continue
                        task["status"] = "processing"
                    self.logger.debug("processing task id={} name={} params={}".format(task["id"], task["name"],
                                                                                       str(task["params"])))
                    handler = task_handlers.get(task["name"])
                    if handler:
                        result, content = handler(task)
                    else:
                        error_text = "unknown task {}".format(task["name"])
                        self.logger.error(error_text)
                        result = False
                        content = error_text
                    self.logger.debug("task id={} name={} result={}:{} params={}".format(task["id"], task["name"],
                                                                                         result, content,
                                                                                         str(task["params"])))

                    with self.task_lock:
                        task["status"] = "done" if result else "error"
                        task["result"] = content
                    self.task_queue.task_done()

                    if task["name"] == 'exit':
                        self.logger.debug("Finishing")
                        return 0
                    elif task["name"] == 'reload':
                        break
                except Exception as e:
                    # the thread must survive to any failure processing a task
                    self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)

    def terminate(self, task):
        """Handler of 'exit' and 'reload' tasks. Nothing to do, returns (True, None)"""
        return True, None

    def _format_vim_error_msg(self, error_text, max_length=1024):
        """Shorten error_text, if its length is max_length or more, keeping its beginning and its end
        joined by ' ... '. Empty or shorter texts are returned unchanged
        """
        if not error_text or len(error_text) < max_length:
            return error_text
        head_length = max_length // 2 - 3
        tail_offset = -max_length // 2 + 3  # negative offset from the end
        if head_length < 0 or tail_offset >= 0:
            # max_length is too small for "head ... tail". An offset of 0 would append the whole text
            return error_text[:max(max_length, 0)]
        return error_text[:head_length] + " ... " + error_text[tail_offset:]

    def new_net(self, task):
        """Create a network at VIM and, for 'data' and 'ptp' networks when the VIM is configured with an
        'sdn-controller', the associated SDN network. Updates the database and starts refreshing its status.
        task["params"] are the arguments for vimconn new_network; the first ones are net name and net type
        Return (True, vim net id) or (False, error text)
        """
        try:
            task_id = task["id"]
            params = task["params"]
            net_id = self.vim.new_network(*params)

            net_name = params[0]
            net_type = params[1]

            network = None
            sdn_net_id = None
            sdn_controller = self.vim.config.get('sdn-controller')
            if sdn_controller and (net_type == "data" or net_type == "ptp"):
                network = {"name": net_name, "type": net_type}

                vim_net = self.vim.get_network(net_id)
                if vim_net.get('encapsulation') != 'vlan':
                    # get() is used because the key can be missing, and the intended error must be reported
                    raise vimconn.vimconnException(
                        "net '{}' defined as type '{}' has not vlan encapsulation '{}'".format(
                            net_name, net_type, vim_net.get('encapsulation')))
                network["vlan"] = vim_net.get('segmentation_id')
                try:
                    sdn_net_id = self.ovim.new_network(network)
                except Exception as e:  # ovimException included
                    self.logger.error("task=%s cannot create SDN network vim_net_id=%s input='%s' ovimException='%s'",
                                      str(task_id), net_id, str(network), str(e))
            with self.db_lock:
                self.db.update_rows("instance_nets", UPDATE={"vim_net_id": net_id, "sdn_net_id": sdn_net_id},
                                    WHERE={"vim_net_id": task_id})
            new_refresh_task = {"status": "enqueued",
                                "id": task_id,
                                "name": "get-net",
                                "vim_id": net_id,
                                "vim_info": {}}
            self._insert_refresh(new_refresh_task, time.time())
            return True, net_id
        except db_base_Exception as e:
            # the network exists at VIM, so the task is reported as done
            self.logger.error("Error updating database %s", str(e))
            return True, net_id
        except vimconn.vimconnException as e:
            self.logger.error("Error creating NET, task=%s: %s", str(task_id), str(e))
            try:
                with self.db_lock:
                    self.db.update_rows("instance_nets",
                                        UPDATE={"error_msg": self._format_vim_error_msg(str(e)), "status": "VIM_ERROR"},
                                        WHERE={"vim_net_id": task_id})
            except db_base_Exception as edb:
                # a different name is used in order not to lose the VIM exception returned below
                self.logger.error("Error updating database %s", str(edb))
            return False, str(e)
        # except ovimException as e:
        #     self.logger.error("Error creating NET in ovim, task=%s: %s", str(task_id), str(e))
        #     return False, str(e)

    def new_vm(self, task):
        """Create a VM at VIM, update the database and start refreshing its status.
        task["params"] are the arguments for vimconn new_vminstance; the item 5 is the list of nets. The
        'net_id' of a net can be the id of a task of task["depends"]; it is replaced by the result of that task
        Return (True, vim vm id) or (False, error text)
        """
        try:
            params = task["params"]
            task_id = task["id"]
            depends = task.get("depends")
            net_list = params[5]
            error_text = ""
            vm_id = None  # it stays None if the VM is not created because of a dependency error
            for net in net_list:
                if "net_id" in net and is_task_id(net["net_id"]):  # change task_id into network_id
                    try:
                        task_net = depends[net["net_id"]]
                        with self.task_lock:
                            if task_net["status"] == "error":
                                error_text = "Cannot create VM because depends on a network that cannot be created: " +\
                                             str(task_net["result"])
                                break
                            elif task_net["status"] == "enqueued" or task_net["status"] == "processing":
                                error_text = "Cannot create VM because depends on a network still not created"
                                break
                            network_id = task_net["result"]
                        net["net_id"] = network_id
                    except Exception as e:
                        error_text = "Error trying to map from task_id={} to task result: {}".format(
                            net["net_id"], str(e))
                        break
            if not error_text:
                vm_id = self.vim.new_vminstance(*params)
            try:
                with self.db_lock:
                    if error_text:
                        update = self.db.update_rows("instance_vms",
                                                     UPDATE={"status": "VIM_ERROR", "error_msg": error_text},
                                                     WHERE={"vim_vm_id": task_id})
                    else:
                        update = self.db.update_rows("instance_vms", UPDATE={"vim_vm_id": vm_id},
                                                     WHERE={"vim_vm_id": task_id})
                    if not update:
                        self.logger.error("task id={} name={} database not updated vim_vm_id={}".format(
                            task["id"], task["name"], vm_id))
            except db_base_Exception as e:
                self.logger.error("Error updating database %s", str(e))
            if error_text:
                return False, error_text
            new_refresh_task = {"status": "enqueued",
                                "id": task_id,
                                "name": "get-vm",
                                "vim_id": vm_id,
                                "vim_info": {"interfaces": []}}
            self._insert_refresh(new_refresh_task, time.time())
            return True, vm_id
        except vimconn.vimconnException as e:
            self.logger.error("Error creating VM, task=%s: %s", str(task_id), str(e))
            try:
                with self.db_lock:
                    self.db.update_rows("instance_vms",
                                        UPDATE={"error_msg": self._format_vim_error_msg(str(e)), "status": "VIM_ERROR"},
                                        WHERE={"vim_vm_id": task_id})
            except db_base_Exception as edb:
                self.logger.error("Error updating database %s", str(edb))
            return False, str(e)

    def del_vm(self, task):
        """Delete a VM at VIM, together with the SDN ports of its interfaces, and stop refreshing it.
        task["params"] contains the VM id and the list of its interfaces. The VM id can be the id of the
        creation task, found at task["depends"]
        Return (True, result) or (False, error text)
        """
        vm_id = task["params"][0]
        interfaces = task["params"][1]
        if is_task_id(vm_id):
            try:
                task_create = task["depends"][vm_id]
                with self.task_lock:
                    if task_create["status"] == "error":
                        return True, "VM was not created. It has error: " + str(task_create["result"])
                    elif task_create["status"] == "enqueued" or task_create["status"] == "processing":
                        return False, "Cannot delete VM vim_id={} because still creating".format(vm_id)
                    vm_id = task_create["result"]
            except Exception as e:
                return False, "Error trying to get task_id='{}': {}".format(vm_id, str(e))
        try:
            self._remove_refresh("get-vm", vm_id)
            for iface in interfaces:
                if iface.get("sdn_port_id"):
                    try:
                        self.ovim.delete_port(iface["sdn_port_id"])
                    except ovimException as e:
                        self.logger.error("ovimException deleting external_port={} at VM vim_id={} deletion ".format(
                            iface["sdn_port_id"], vm_id) + str(e), exc_info=True)
                        # TODO Set error_msg at instance_nets

            return True, self.vim.delete_vminstance(vm_id)
        except vimconn.vimconnException as e:
            return False, str(e)

    def del_net(self, task):
        """Delete a network at VIM, and its SDN network with its external ports, and stop refreshing it.
        task["params"] contains the net id and the SDN net id (can be empty). The net id can be the id of the
        creation task, found at task["depends"]
        Return (True, result) or (False, error text)
        """
        net_id = task["params"][0]
        sdn_net_id = task["params"][1]
        if is_task_id(net_id):
            try:
                task_create = task["depends"][net_id]
                with self.task_lock:
                    if task_create["status"] == "error":
                        return True, "net was not created. It has error: " + str(task_create["result"])
                    elif task_create["status"] == "enqueued" or task_create["status"] == "processing":
                        return False, "Cannot delete net because still creating"
                    net_id = task_create["result"]
            except Exception as e:
                return False, "Error trying to get task_id='{}': {}".format(net_id, str(e))
        try:
            self._remove_refresh("get-net", net_id)
            result = self.vim.delete_network(net_id)
            if sdn_net_id:
                # Delete any attached port to this sdn network
                # At this point, there will be ports associated to this network in case it was manually done
                # using 'openmano vim-net-sdn-attach'
                try:
                    port_list = self.ovim.get_ports(columns={'uuid'},
                                                    filter={'name': 'external_port', 'net_id': sdn_net_id})
                except ovimException as e:
                    raise vimconn.vimconnException(
                        "ovimException obtaining external ports for net {}. ".format(sdn_net_id) + str(e))

                for port in port_list:
                    try:
                        self.ovim.delete_port(port['uuid'])
                    except ovimException as e:
                        raise vimconn.vimconnException(
                            "ovimException deleting port {} for net {}. ".format(port['uuid'], sdn_net_id) + str(e))
                with self.db_lock:
                    self.ovim.delete_network(sdn_net_id)
            return True, result
        except vimconn.vimconnException as e:
            return False, str(e)
        except ovimException as e:
            self.logger.error("Error deleting network from ovim. net_id: {}, sdn_net_id: {}".format(
                net_id, sdn_net_id), exc_info=True)
            return False, str(e)
582
583