Merge branch 'vio' into v2.0
[osm/RO.git] / osm_ro / vim_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 """
25 This is the thread that interacts with the host and libvirt to manage VMs.
26 One thread will be launched per host.
27 """
28
29 import threading
30 import time
31 import Queue
32 import logging
33 import vimconn
34 from db_base import db_base_Exception
35 from lib_osm_openvim.ovim import ovimException
36
37 __author__ = "Alfonso Tierno, Pablo Montes"
38 __date__ = "$10-feb-2017 12:07:15$"
39
40 # from logging import Logger
41 # import auxiliary_functions as af
42
43
def is_task_id(task_id):
    """Tell whether an identifier is a pending-task reference instead of a real VIM id.

    :param task_id: identifier to check; task references start with the prefix "TASK."
    :return: True when 'task_id' starts with "TASK."; False otherwise, including when
        the value cannot be sliced at all (e.g. None)
    """
    try:
        return task_id[:5] == "TASK."
    except TypeError:
        # Values that are not sequences (None, numbers, ...) can never be task references
        return False
46
47
class vim_thread(threading.Thread):
    """Thread that executes, one at a time, the tasks queued for one VIM connector.

    Tasks are dictionaries with at least the keys "id", "name", "params" and "status".
    Creation/deletion tasks ('new-vm', 'del-vm', 'new-net', 'del-net', 'exit', 'reload')
    arrive through 'insert_task'. When the queue is empty the thread polls the VIM for the
    status of the VMs and networks it has created ('get-vm' and 'get-net' refresh tasks kept
    at 'refresh_list') and writes the changes to the database.
    """
    REFRESH_BUILD = 5  # seconds until the next status poll of an element in "BUILD" status
    REFRESH_ACTIVE = 60  # seconds until the next status poll of any other element
    REFRESH_BATCH = 10  # maximum number of elements refreshed at each '_refres_elements' call

    def __init__(self, vimconn, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None,
                 db=None, db_lock=None, ovim=None):
        """Init a thread.
        Arguments:
            'vimconn': VIM connector used for every VIM operation. It is also indexed
                (vimconn["id"], vimconn["config"]) to build a default thread name
            'task_lock': lock that protects the "status" and "result" fields of the tasks
            'name': name of thread. If empty it is composed from the connector id and its
                datacenter_tenant_id
            'datacenter_name', 'datacenter_tenant_id': datacenter data; the tenant id is used to
                filter the database queries
            'db', 'db_lock': database class and lock to use it in exclusion
            'ovim': object used to manage SDN networks and ports
        """
        threading.Thread.__init__(self)
        # It will contain a dictionary with
        #   task_id:
        #       status: enqueued,done,error,deleted,processing
        #       result: VIM result,
        self.tasksResult = {}
        self.vim = vimconn
        self.datacenter_name = datacenter_name
        self.datacenter_tenant_id = datacenter_tenant_id
        self.ovim = ovim
        if not name:
            self.name = vimconn["id"] + "." + vimconn["config"]["datacenter_tenant_id"]
        else:
            self.name = name

        self.logger = logging.getLogger('openmano.vim.'+self.name)
        self.db = db
        self.db_lock = db_lock

        self.task_lock = task_lock
        self.task_queue = Queue.Queue(2000)
        # Contains time ordered task list for refreshing the status of VIM VMs and nets
        self.refresh_list = []

    def _refres_elements(self):
        """Call VIM to get VMs and networks status until REFRESH_BATCH (10) elements.

        Takes from 'refresh_list' the tasks whose time is reached, asks the VIM for the status
        of the VMs and nets, updates the database and schedules the next refresh of each one.
        Sleeps one second when there is nothing to refresh.
        """
        now = time.time()
        vm_ids, vm_tasks, net_ids, net_tasks, items_to_refresh = self._pop_due_refresh_tasks(now)

        try:
            if vm_ids:
                self._refresh_vms(vm_ids, vm_tasks, now)
        finally:
            # nets are processed even if VMs fail unexpectedly, so that the tasks already
            # extracted from the refresh list are not lost
            if net_ids:
                self._refresh_nets(net_ids, net_tasks, now)

        if not items_to_refresh:
            time.sleep(1)

    def _pop_due_refresh_tasks(self, now):
        """Extract from 'refresh_list' the tasks that must be refreshed at time 'now'.

        Deleted tasks found at the head of the list are discarded. Extracted tasks are marked
        as "processing".
        :return: (vm_ids, vm_tasks, net_ids, net_tasks, items) where '*_ids' are lists of vim ids,
            '*_tasks' are dictionaries vim_id -> task, and 'items' is the number of tasks extracted
            (it includes tasks with an unknown name, that are logged and dropped)
        """
        vm_ids = []
        vm_tasks = {}
        net_ids = []
        net_tasks = {}
        items = 0
        while self.refresh_list and items < self.REFRESH_BATCH:
            task = self.refresh_list[0]
            with self.task_lock:
                if task['status'] == 'deleted':
                    self.refresh_list.pop(0)
                    continue
                if task['time'] > now:
                    break  # the list is time ordered, nothing else is due
                task["status"] = "processing"
            self.refresh_list.pop(0)
            if task["name"] == 'get-vm':
                vm_ids.append(task["vim_id"])
                vm_tasks[task["vim_id"]] = task
            elif task["name"] == 'get-net':
                net_ids.append(task["vim_id"])
                net_tasks[task["vim_id"]] = task
            else:
                error_text = "unknown task {}".format(task["name"])
                self.logger.error(error_text)
            items += 1
        return vm_ids, vm_tasks, net_ids, net_tasks, items

    def _refresh_vms(self, vm_ids, vm_tasks, now):
        """Get from VIM the status of the VMs 'vm_ids', update database and schedule next refresh.

        Any task that could not be processed (e.g. the VIM call fails) is scheduled again after
        REFRESH_ACTIVE seconds instead of being lost.
        """
        pending = dict(vm_tasks)
        try:
            vim_dict = self.vim.refresh_vms_status(vm_ids)
            for vim_id, vim_info in vim_dict.items():
                # look for task
                task = pending[vim_id]
                self.logger.debug("get-vm vm_id=%s result=%s", task["vim_id"], str(vim_info))

                # update database
                self._update_status_if_changed('instance_vms', "vim_vm_id", vim_id, task, vim_info)
                # "interfaces" may be missing, e.g. when the VIM reports an error for this VM
                for interface in vim_info.get("interfaces", ()):
                    self._refresh_vm_interface(task, vim_id, interface)

                self._reschedule(pending.pop(vim_id), vim_info, now)
        except vimconn.vimconnException as e:
            self.logger.error("vimconnException Exception when trying to refresh vms " + str(e))
        finally:
            self._requeue_pending(pending, now)

    def _refresh_vm_interface(self, task, vim_id, interface):
        """Update database and SDN port of one VM interface when VIM info differs from the stored one.

        'task' is the 'get-vm' task, whose "vim_info" contains the interfaces obtained at the
        previous refresh; 'interface' is the information just reported by the VIM.
        """
        known_interfaces = task["vim_info"].setdefault("interfaces", [])
        for task_interface in known_interfaces:
            if task_interface["vim_net_id"] == interface["vim_net_id"]:
                break
        else:
            task_interface = {"vim_net_id": interface["vim_net_id"]}
            known_interfaces.append(task_interface)
        if task_interface == interface:
            return

        # delete old port
        if task_interface.get("sdn_port_id"):
            try:
                with self.db_lock:
                    self.ovim.delete_port(task_interface["sdn_port_id"])
                    task_interface["sdn_port_id"] = None
            except ovimException as e:
                self.logger.error("ovimException deleting external_port={} ".format(
                    task_interface["sdn_port_id"]) + str(e), exc_info=True)
                # TODO Set error_msg at instance_nets

        where_ = {'iv.vim_vm_id': vim_id, "ine.vim_net_id": interface["vim_net_id"],
                  'ine.datacenter_tenant_id': self.datacenter_tenant_id}
        # TODO check why vim_interface_id is not present at database
        # if interface.get("vim_interface_id"):
        #     where_["vim_interface_id"] = interface["vim_interface_id"]
        with self.db_lock:
            db_ifaces = self.db.get_rows(
                FROM="instance_interfaces as ii left join instance_nets as ine on "
                     "ii.instance_net_id=ine.uuid left join instance_vms as iv on "
                     "ii.instance_vm_id=iv.uuid",
                SELECT=("ii.uuid as iface_id", "ine.uuid as net_id", "iv.uuid as vm_id", "sdn_net_id", "vim_net_id"),
                WHERE=where_)
        if len(db_ifaces) > 1:
            self.logger.critical("Refresing interfaces. "
                                 "Found more than one interface at database for '{}'".format(where_))
            return
        if len(db_ifaces) == 0:
            self.logger.critical("Refresing interfaces. "
                                 "Not found any interface at database for '{}'".format(where_))
            return

        db_iface = db_ifaces[0]
        # If there is no sdn_net_id, check if it is because an already created vim network is being used
        # in that case, the sdn_net_id will be in that entry of the instance_nets table
        if not db_iface.get("sdn_net_id"):
            with self.db_lock:
                result = self.db.get_rows(
                    SELECT=('sdn_net_id',), FROM='instance_nets',
                    WHERE={'vim_net_id': db_iface.get("vim_net_id"),
                           'instance_scenario_id': None,
                           'datacenter_tenant_id': self.datacenter_tenant_id})
            if len(result) == 1:
                db_iface["sdn_net_id"] = result[0]['sdn_net_id']

        if db_iface.get("sdn_net_id") and interface.get("compute_node") and interface.get("pci"):
            sdn_net_id = db_iface["sdn_net_id"]
            sdn_port_name = (sdn_net_id + "." + db_iface["vm_id"])[:63]
            try:
                with self.db_lock:
                    sdn_port_id = self.ovim.new_external_port(
                        {"compute_node": interface["compute_node"],
                         "pci": interface["pci"],
                         "vlan": interface.get("vlan"),
                         "net_id": sdn_net_id,
                         "region": self.vim["config"]["datacenter_id"],
                         "name": sdn_port_name,
                         "mac": interface.get("mac_address")})
                    interface["sdn_port_id"] = sdn_port_id
            except Exception as e:  # ovimException or any other unexpected error
                self.logger.error(
                    "ovimException creating new_external_port compute_node={} "
                    "pci={} vlan={} ".format(
                        interface["compute_node"],
                        interface["pci"],
                        interface.get("vlan")) + str(e),
                    exc_info=True)
                # TODO Set error_msg at instance_nets

        # "vim_net_id" is not written at instance_interfaces. A filtered copy is used so that
        # 'interface' keeps this key even if the database update fails
        iface_update = {key: value for key, value in interface.items() if key != "vim_net_id"}
        with self.db_lock:
            self.db.update_rows('instance_interfaces', UPDATE=iface_update,
                                WHERE={'uuid': db_iface["iface_id"]})
        # TODO insert instance_id

    def _refresh_nets(self, net_ids, net_tasks, now):
        """Get from VIM the status of the nets 'net_ids', update database and schedule next refresh.

        The status is combined with the one of the associated SDN network, if any. A net that is
        not found at database is not refreshed anymore. Any other task that could not be processed
        is scheduled again after REFRESH_ACTIVE seconds instead of being lost.
        """
        pending = dict(net_tasks)
        try:
            vim_dict = self.vim.refresh_nets_status(net_ids)
            for vim_id, vim_info in vim_dict.items():
                # look for task
                task = pending[vim_id]
                self.logger.debug("get-net net_id=%s result=%s", task["vim_id"], str(vim_info))

                # get database info
                where_ = {"vim_net_id": vim_id, 'datacenter_tenant_id': self.datacenter_tenant_id}
                with self.db_lock:
                    db_nets = self.db.get_rows(
                        FROM="instance_nets",
                        SELECT=("uuid as net_id", "sdn_net_id"),
                        WHERE=where_)
                if len(db_nets) > 1:
                    self.logger.critical("Refresing networks. "
                                         "Found more than one instance-networks at database for '{}'".format(where_))
                elif len(db_nets) == 0:
                    self.logger.critical("Refresing networks. "
                                         "Not found any instance-network at database for '{}'".format(where_))
                    del pending[vim_id]  # unknown at database: stop refreshing it
                    continue
                elif db_nets[0].get("sdn_net_id"):
                    self._merge_sdn_net_status(db_nets[0]["sdn_net_id"], vim_info)

                # update database
                self._update_status_if_changed('instance_nets', "vim_net_id", vim_id, task, vim_info)
                self._reschedule(pending.pop(vim_id), vim_info, now)
        except vimconn.vimconnException as e:
            self.logger.error("vimconnException Exception when trying to refresh nets " + str(e))
        finally:
            self._requeue_pending(pending, now)

    def _merge_sdn_net_status(self, sdn_net_id, vim_info):
        """Modify "status" and "error_msg" of 'vim_info' when the SDN network is in ERROR status."""
        try:
            # get ovim status
            with self.db_lock:
                sdn_net = self.ovim.show_network(sdn_net_id)
            if sdn_net["status"] != "ERROR":
                return
            if not vim_info.get("error_msg"):
                vim_info["error_msg"] = sdn_net["error_msg"]
            else:
                # each message is limited so that the composed text fits in 1024 characters
                vim_info["error_msg"] = "VIM_ERROR: {} && SDN_ERROR: {}".format(
                    self._format_vim_error_msg(vim_info["error_msg"], 1024//2-14),
                    self._format_vim_error_msg(sdn_net["error_msg"], 1024//2-14))
            if vim_info["status"] == "VIM_ERROR":
                vim_info["status"] = "VIM_SDN_ERROR"
            else:
                vim_info["status"] = "SDN_ERROR"
        except Exception:  # ovimException or any other unexpected error
            self.logger.error(
                "ovimException getting network infor snd_net_id={}".format(sdn_net_id),
                exc_info=True)
            # TODO Set error_msg at instance_nets

    def _update_status_if_changed(self, table, id_column, vim_id, task, vim_info):
        """Write "status", "error_msg" and "vim_info" at database 'table' if they have changed.

        The new values of 'vim_info' are compared against the ones stored at task["vim_info"]
        by the previous refresh. The error message is shortened before comparing and storing.
        """
        if vim_info.get("error_msg"):
            vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"])
        previous = task["vim_info"]
        if previous.get("status") == vim_info["status"] and \
                previous.get("error_msg") == vim_info.get("error_msg") and \
                previous.get("vim_info") == vim_info.get("vim_info"):
            return
        temp_dict = {"status": vim_info["status"],
                     "error_msg": vim_info.get("error_msg"),
                     "vim_info": vim_info.get("vim_info")}
        with self.db_lock:
            self.db.update_rows(table, UPDATE=temp_dict, WHERE={id_column: vim_id})

    def _reschedule(self, task, vim_info, now):
        """Store 'vim_info' at the task and insert it again at the refresh list.

        Elements in "BUILD" status are refreshed after REFRESH_BUILD seconds, the rest after
        REFRESH_ACTIVE seconds.
        """
        task["vim_info"] = vim_info
        if vim_info["status"] == "BUILD":
            self._insert_refresh(task, now + self.REFRESH_BUILD)
        else:
            self._insert_refresh(task, now + self.REFRESH_ACTIVE)

    def _requeue_pending(self, pending, now):
        """Insert again at the refresh list the tasks of 'pending' (vim_id -> task) not processed."""
        for task in pending.values():
            self._insert_refresh(task, now + self.REFRESH_ACTIVE)

    def _insert_refresh(self, task, threshold_time):
        """Insert a task at list of refreshing elements. The refreshing list is ordered by threshold_time
        (task['time']). The task is placed after the tasks that have the same or a lower time.
        It is assumed that this is called inside this thread
        """
        task["time"] = threshold_time
        for index, queued in enumerate(self.refresh_list):
            if queued["time"] > threshold_time:
                break
        else:
            index = len(self.refresh_list)
        self.refresh_list.insert(index, task)
        self.logger.debug("new refresh task={} name={}, time={} index={}".format(
            task["id"], task["name"], task["time"], index))

    def _remove_refresh(self, task_name, vim_id):
        """Remove a task with this name and vim_id from the list of refreshing elements.
        Only the first task that matches is removed.
        It is assumed that this is called inside this thread outside _refres_elements method
        Return True if self.refresh_list is modified, task is found
        Return False if not found
        """
        for index, queued in enumerate(self.refresh_list):
            if queued["name"] == task_name and queued["vim_id"] == vim_id:
                del self.refresh_list[index]
                return True
        return False

    def insert_task(self, task):
        """Add a task to the queue of this thread without blocking.

        :return: the task id (task["id"])
        :raise vimconn.vimconnException: if the queue is full
        """
        try:
            self.task_queue.put(task, False)
            return task["id"]
        except Queue.Full:
            raise vimconn.vimconnException(self.name + ": timeout inserting a task")

    def del_task(self, task):
        """Mark a task as deleted if this thread has not started to process it.

        :return: True if the task was "enqueued" and it is now marked as "deleted";
            False in any other status (e.g. "processing"), where the task is left untouched
        """
        with self.task_lock:
            if task["status"] == "enqueued":
                task["status"] = "deleted"
                return True
            # e.g. task["status"] == "processing". The lock is released by the 'with' statement
            return False

    def run(self):
        """Thread main loop: execute the queued tasks, refresh VIM elements while the queue is empty.

        It finishes, returning 0, when an 'exit' task is processed. Unexpected exceptions are
        logged and the loop continues.
        """
        self.logger.debug("Starting")
        handlers = {
            'exit': self.terminate,
            'reload': self.terminate,
            'new-vm': self.new_vm,
            'del-vm': self.del_vm,
            'new-net': self.new_net,
            'del-net': self.del_net,
        }
        while True:
            # TODO reload service
            while True:
                try:
                    if self.task_queue.empty():
                        self._refres_elements()
                        continue
                    task = self.task_queue.get()
                    with self.task_lock:
                        deleted = task["status"] == "deleted"
                        if not deleted:
                            task["status"] = "processing"
                    if deleted:
                        # every 'get' is paired with a 'task_done'
                        self.task_queue.task_done()
                        continue

                    self.logger.debug("processing task id={} name={} params={}".format(task["id"], task["name"],
                                                                                        str(task["params"])))
                    handler = handlers.get(task["name"])
                    if handler:
                        result, content = handler(task)
                    else:
                        error_text = "unknown task {}".format(task["name"])
                        self.logger.error(error_text)
                        result = False
                        content = error_text
                    self.logger.debug("task id={} name={} result={}:{} params={}".format(task["id"], task["name"],
                                                                                          result, content,
                                                                                          str(task["params"])))

                    with self.task_lock:
                        task["status"] = "done" if result else "error"
                        task["result"] = content
                    self.task_queue.task_done()

                    if task["name"] == 'exit':
                        self.logger.debug("Finishing")
                        return 0
                    elif task["name"] == 'reload':
                        break
                except Exception as e:
                    self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)

    def terminate(self, task):
        """Process the 'exit' and 'reload' tasks. Nothing to do; return (True, None)."""
        return True, None

    def _format_vim_error_msg(self, error_text, max_length=1024):
        """Shorten an error text that reaches 'max_length' characters.

        The beginning and the end of the text are kept, joined by " ... ", so that the result is
        shorter than 'max_length'. Empty or short texts are returned unmodified.
        """
        if not error_text or len(error_text) < max_length:
            return error_text
        head = max_length // 2 - 3
        tail = -(-max_length // 2) - 3  # ceil(max_length / 2) - 3
        if head <= 0 or tail <= 0:
            # too small to keep both ends (text[-0:] would be the whole text): plain truncation
            return error_text[:max(max_length, 0)]
        return error_text[:head] + " ... " + error_text[-tail:]

    def new_net(self, task):
        """Create a network at VIM, and the associated SDN network if needed.

        task["params"] are the arguments of the connector 'new_network' method; the first two
        are the net name and type. An SDN network is created for 'data' and 'ptp' nets when the
        connector config contains 'sdn-controller'. The database entry of instance_nets, which
        contains the task id at vim_net_id, is updated with the VIM and SDN identifiers, and a
        'get-net' refresh task is scheduled.
        :return: (True, vim net id) or (False, error text)
        """
        task_id = task["id"]
        params = task["params"]
        try:
            net_id = self.vim.new_network(*params)

            net_name = params[0]
            net_type = params[1]

            network = None
            sdn_net_id = None
            sdn_controller = self.vim.config.get('sdn-controller')
            if sdn_controller and (net_type == "data" or net_type == "ptp"):
                network = {"name": net_name, "type": net_type, "region": self.vim["config"]["datacenter_id"]}

                vim_net = self.vim.get_network(net_id)
                if vim_net.get('encapsulation') != 'vlan':
                    raise vimconn.vimconnException(
                        "net '{}' defined as type '{}' has not vlan encapsulation '{}'".format(
                            net_name, net_type, vim_net.get('encapsulation')))
                network["vlan"] = vim_net.get('segmentation_id')
                try:
                    with self.db_lock:
                        sdn_net_id = self.ovim.new_network(network)
                except Exception as e:  # ovimException or any other unexpected error
                    # the VIM net is kept without SDN network
                    self.logger.error("task=%s cannot create SDN network vim_net_id=%s input='%s' ovimException='%s'",
                                      str(task_id), net_id, str(network), str(e))
            with self.db_lock:
                self.db.update_rows("instance_nets", UPDATE={"vim_net_id": net_id, "sdn_net_id": sdn_net_id},
                                    WHERE={"vim_net_id": task_id})
            new_refresh_task = {"status": "enqueued",
                                "id": task_id,
                                "name": "get-net",
                                "vim_id": net_id,
                                "vim_info": {}}
            self._insert_refresh(new_refresh_task, time.time())
            return True, net_id
        except db_base_Exception as e:
            # only database operations raise it, and they are done once the net is created
            self.logger.error("Error updating database %s", str(e))
            return True, net_id
        except vimconn.vimconnException as e:
            self.logger.error("Error creating NET, task=%s: %s", str(task_id), str(e))
            try:
                with self.db_lock:
                    self.db.update_rows("instance_nets",
                                        UPDATE={"error_msg": self._format_vim_error_msg(str(e)), "status": "VIM_ERROR"},
                                        WHERE={"vim_net_id": task_id})
            except db_base_Exception as edb:
                # different name, to not hide the VIM error that is returned
                self.logger.error("Error updating database %s", str(edb))
            return False, str(e)
        # except ovimException as e:
        #     self.logger.error("Error creating NET in ovim, task=%s: %s", str(task_id), str(e))
        #     return False, str(e)

    def new_vm(self, task):
        """Create a VM at VIM.

        task["params"] are the arguments of the connector 'new_vminstance' method; the sixth one
        (params[5]) is the list of nets. A "net_id" that is a task id is replaced by the result
        of that task, obtained from task["depends"]. The VM is not created if any of these tasks
        has failed or has not finished. The database entry of instance_vms, which contains the
        task id at vim_vm_id, is updated, and a 'get-vm' refresh task is scheduled.
        :return: (True, vim vm id) or (False, error text)
        """
        task_id = task["id"]
        params = task["params"]
        vm_id = None  # it remains None if the VM cannot be created because of its dependencies
        try:
            depends = task.get("depends")
            net_list = params[5]
            error_text = ""
            for net in net_list:
                if "net_id" in net and is_task_id(net["net_id"]):  # change task_id into network_id
                    try:
                        task_net = depends[net["net_id"]]
                        with self.task_lock:
                            if task_net["status"] == "error":
                                error_text = "Cannot create VM because depends on a network that cannot be created: " +\
                                             str(task_net["result"])
                                break
                            elif task_net["status"] == "enqueued" or task_net["status"] == "processing":
                                error_text = "Cannot create VM because depends on a network still not created"
                                break
                        network_id = task_net["result"]
                        net["net_id"] = network_id
                    except Exception as e:
                        error_text = "Error trying to map from task_id={} to task result: {}".format(
                            net["net_id"], str(e))
                        break
            if not error_text:
                vm_id = self.vim.new_vminstance(*params)
            try:
                with self.db_lock:
                    if error_text:
                        update = self.db.update_rows("instance_vms",
                                                     UPDATE={"status": "VIM_ERROR", "error_msg": error_text},
                                                     WHERE={"vim_vm_id": task_id})
                    else:
                        update = self.db.update_rows("instance_vms", UPDATE={"vim_vm_id": vm_id},
                                                     WHERE={"vim_vm_id": task_id})
                    if not update:
                        self.logger.error("task id={} name={} database not updated vim_vm_id={}".format(
                            task["id"], task["name"], vm_id))
            except db_base_Exception as e:
                self.logger.error("Error updating database %s", str(e))
            if error_text:
                return False, error_text
            new_refresh_task = {"status": "enqueued",
                                "id": task_id,
                                "name": "get-vm",
                                "vim_id": vm_id,
                                "vim_info": {"interfaces": []}}
            self._insert_refresh(new_refresh_task, time.time())
            return True, vm_id
        except vimconn.vimconnException as e:
            self.logger.error("Error creating VM, task=%s: %s", str(task_id), str(e))
            try:
                with self.db_lock:
                    self.db.update_rows("instance_vms",
                                        UPDATE={"error_msg": self._format_vim_error_msg(str(e)), "status": "VIM_ERROR"},
                                        WHERE={"vim_vm_id": task_id})
            except db_base_Exception as edb:
                self.logger.error("Error updating database %s", str(edb))
            return False, str(e)

    def del_vm(self, task):
        """Delete a VM from VIM, and the SDN ports of its interfaces.

        task["params"] contains the VM id and the list of interfaces. If the id is a task id, the
        real id is obtained from the result of the creation task found at task["depends"]. The
        VM is also removed from the refresh list.
        :return: (True, result of the connector 'delete_vminstance', or a text if the VM was
            never created) or (False, error text)
        """
        vm_id = task["params"][0]
        interfaces = task["params"][1]
        if is_task_id(vm_id):
            try:
                task_create = task["depends"][vm_id]
                with self.task_lock:
                    if task_create["status"] == "error":
                        return True, "VM was not created. It has error: " + str(task_create["result"])
                    elif task_create["status"] == "enqueued" or task_create["status"] == "processing":
                        return False, "Cannot delete VM vim_id={} because still creating".format(vm_id)
                    vm_id = task_create["result"]
            except Exception as e:
                return False, "Error trying to get task_id='{}': {}".format(vm_id, str(e))
        try:
            self._remove_refresh("get-vm", vm_id)
            for iface in interfaces:
                if iface.get("sdn_port_id"):
                    try:
                        with self.db_lock:
                            self.ovim.delete_port(iface["sdn_port_id"])
                    except ovimException as e:
                        # logged only, the VM is deleted anyway
                        self.logger.error("ovimException deleting external_port={} at VM vim_id={} deletion ".format(
                            iface["sdn_port_id"], vm_id) + str(e), exc_info=True)
                        # TODO Set error_msg at instance_nets

            return True, self.vim.delete_vminstance(vm_id)
        except vimconn.vimconnException as e:
            return False, str(e)

    def del_net(self, task):
        """Delete a network from VIM, and the associated SDN network with its external ports.

        task["params"] contains the net id and the SDN net id (that can be empty). If the net id
        is a task id, the real id is obtained from the result of the creation task found at
        task["depends"]. The net is also removed from the refresh list.
        :return: (True, result of the connector 'delete_network', or a text if the net was
            never created) or (False, error text)
        """
        net_id = task["params"][0]
        sdn_net_id = task["params"][1]
        if is_task_id(net_id):
            try:
                task_create = task["depends"][net_id]
                with self.task_lock:
                    if task_create["status"] == "error":
                        return True, "net was not created. It has error: " + str(task_create["result"])
                    elif task_create["status"] == "enqueued" or task_create["status"] == "processing":
                        return False, "Cannot delete net because still creating"
                    net_id = task_create["result"]
            except Exception as e:
                return False, "Error trying to get task_id='{}': {}".format(net_id, str(e))
        try:
            self._remove_refresh("get-net", net_id)
            result = self.vim.delete_network(net_id)
            if sdn_net_id:
                # Delete any attached port to this sdn network
                # At this point, there will be ports associated to this network in case it was manually done
                # using 'openmano vim-net-sdn-attach'
                try:
                    with self.db_lock:
                        port_list = self.ovim.get_ports(columns={'uuid'},
                                                        filter={'name': 'external_port', 'net_id': sdn_net_id})
                except ovimException as e:
                    raise vimconn.vimconnException(
                        "ovimException obtaining external ports for net {}. ".format(sdn_net_id) + str(e))

                for port in port_list:
                    try:
                        with self.db_lock:
                            self.ovim.delete_port(port['uuid'])
                    except ovimException as e:
                        raise vimconn.vimconnException(
                            "ovimException deleting port {} for net {}. ".format(port['uuid'], sdn_net_id) + str(e))
                with self.db_lock:
                    self.ovim.delete_network(sdn_net_id)
            return True, result
        except vimconn.vimconnException as e:
            return False, str(e)
        except ovimException as e:
            self.logger.error("Error deleting network from ovim. net_id: {}, sdn_net_id: {}".format(net_id, sdn_net_id))
            return False, str(e)
595
596