1.Created new file for VIO connector as vimconn_vio.py that extends the existing...
[osm/RO.git] / osm_ro / vim_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 """
25 This is the thread that interacts with the VIM, through its vimconn connector, to manage VMs and networks
26 One thread will be launched per host
27 """
28
29 import threading
30 import time
31 import Queue
32 import logging
33 import vimconn
34 from db_base import db_base_Exception
35 from lib_osm_openvim.ovim import ovimException
36
37 __author__ = "Alfonso Tierno, Pablo Montes"
38 __date__ = "$10-feb-2017 12:07:15$"
39
40 # from logging import Logger
41 # import auxiliary_functions as af
42
43
def is_task_id(task_id):
    """Tell whether an identifier is a task placeholder instead of a real VIM identifier.

    Placeholders are recognized by their "TASK." prefix. new_vm, del_vm and del_net use this check to
    decide whether an id has to be resolved through the "depends" entries of the task.

    :param task_id: identifier to check; it must support slicing (normally a string)
    :return: True when the first five characters are "TASK.", False otherwise
    """
    # the comparison already yields a bool, no conditional expression is needed
    return task_id[:5] == "TASK."
46
47
48 class vim_thread(threading.Thread):
49 REFRESH_BUILD = 5 # 5 seconds
50 REFRESH_ACTIVE = 60 # 1 minute
51
52 def __init__(self, vimconn, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None,
53 db=None, db_lock=None, ovim=None):
54 """Init a thread.
55 Arguments:
56 'id' number of thead
57 'name' name of thread
58 'host','user': host ip or name to manage and user
59 'db', 'db_lock': database class and lock to use it in exclusion
60 """
61 self.tasksResult = {}
62 """ It will contain a dictionary with
63 task_id:
64 status: enqueued,done,error,deleted,processing
65 result: VIM result,
66 """
67 threading.Thread.__init__(self)
68 self.vim = vimconn
69 self.datacenter_name = datacenter_name
70 self.datacenter_tenant_id = datacenter_tenant_id
71 self.ovim = ovim
72 if not name:
73 self.name = vimconn["id"] + "." + vimconn["config"]["datacenter_tenant_id"]
74 else:
75 self.name = name
76
77 self.logger = logging.getLogger('openmano.vim.'+self.name)
78 self.db = db
79 self.db_lock = db_lock
80
81 self.task_lock = task_lock
82 self.task_queue = Queue.Queue(2000)
83 self.refresh_list = []
84 """Contains time ordered task list for refreshing the status of VIM VMs and nets"""
85
# _refres_elements: one polling pass over self.refresh_list.
#  - Takes from the front of self.refresh_list the tasks whose task['time'] is not in the future
#    (tasks marked 'deleted' are discarded) until 10 of them have been counted.
#  - 'get-vm' tasks are refreshed with a single call to self.vim.refresh_vms_status() and 'get-net'
#    tasks with a single call to self.vim.refresh_nets_status(); the differences found are written to
#    the tables 'instance_vms', 'instance_interfaces' and 'instance_nets' through self.db.
#  - A refreshed task is inserted again with _insert_refresh(): REFRESH_BUILD seconds later while its
#    status is "BUILD", REFRESH_ACTIVE seconds later otherwise.
#  - When no task has been counted, it sleeps 1 second before returning.
# NOTE(review): this listing does not show indentation, so the nesting described in the comments below
# is presumed from the order of the statements - confirm against the real file.
86 def _refres_elements(self):
87 """Call VIM to get VMs and networks status until 10 elements"""
88 now = time.time()
89 vm_to_refresh_list = []
90 net_to_refresh_list = []
91 vm_to_refresh_dict = {}
92 net_to_refresh_dict = {}
93 items_to_refresh = 0
# Step 1: pick the due tasks and group them by kind; the dicts map vim_id -> task so that the task
# can be found again from the keys of the VIM answer
94 while self.refresh_list:
95 task = self.refresh_list[0]
96 with self.task_lock:
97 if task['status'] == 'deleted':
98 self.refresh_list.pop(0)
99 continue
# the list is ordered by time, so the first entry that is not due yet ends the collection
100 if task['time'] > now:
101 break
102 task["status"] = "processing"
103 self.refresh_list.pop(0)
104 if task["name"] == 'get-vm':
105 vm_to_refresh_list.append(task["vim_id"])
106 vm_to_refresh_dict[task["vim_id"]] = task
107 elif task["name"] == 'get-net':
108 net_to_refresh_list.append(task["vim_id"])
109 net_to_refresh_dict[task["vim_id"]] = task
110 else:
# NOTE(review): a task with an unknown name is only logged; it has already been removed from
# refresh_list and is not stored anywhere else in this method
111 error_text = "unknown task {}".format(task["name"])
112 self.logger.error(error_text)
113 items_to_refresh += 1
114 if items_to_refresh == 10:
115 break
116
# Step 2: refresh the collected VMs
117 if vm_to_refresh_list:
118 try:
119 vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list)
120 for vim_id, vim_info in vim_dict.items():
121 #look for task
122 task = vm_to_refresh_dict[vim_id]
123 self.logger.debug("get-vm vm_id=%s result=%s", task["vim_id"], str(vim_info))
124
125 # update database
126 if vim_info.get("error_msg"):
127 vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"])
# the row of 'instance_vms' is written only when status, error_msg or vim_info differ from what the
# task stored in the previous refresh
128 if task["vim_info"].get("status") != vim_info["status"] or \
129 task["vim_info"].get("error_msg") != vim_info.get("error_msg") or \
130 task["vim_info"].get("vim_info") != vim_info["vim_info"]:
131 with self.db_lock:
132 temp_dict = {"status": vim_info["status"],
133 "error_msg": vim_info.get("error_msg"),
134 "vim_info": vim_info["vim_info"]}
135 self.db.update_rows('instance_vms', UPDATE=temp_dict, WHERE={"vim_vm_id": vim_id})
# compare every interface reported by the VIM with the one stored in the task for the same vim_net_id
136 for interface in vim_info["interfaces"]:
137 for task_interface in task["vim_info"]["interfaces"]:
138 if task_interface["vim_net_id"] == interface["vim_net_id"]:
139 break
# presumably the else of the inner 'for': no stored interface matched, so a new entry is created
140 else:
141 task_interface = {"vim_net_id": interface["vim_net_id"]}
142 task["vim_info"]["interfaces"].append(task_interface)
143 if task_interface != interface:
144 #delete old port
145 if task_interface.get("sdn_port_id"):
146 try:
147 self.ovim.delete_port(task_interface["sdn_port_id"])
148 task_interface["sdn_port_id"] = None
149 except ovimException as e:
150 self.logger.error("ovimException deleting external_port={} ".format(
151 task_interface["sdn_port_id"]) + str(e), exc_info=True)
152 # TODO Set error_msg at instance_nets
153 vim_net_id = interface["vim_net_id"]
154 sdn_net_id = None
155 sdn_port_name = None
# look for the database interface that joins this VM with this VIM network
156 with self.db_lock:
157 where_= {'iv.vim_vm_id': vim_id, "ine.vim_net_id": vim_net_id,
158 'ine.datacenter_tenant_id': self.datacenter_tenant_id}
159 # TODO check why vim_interface_id is not present at database
160 # if interface.get("vim_interface_id"):
161 # where_["vim_interface_id"] = interface["vim_interface_id"]
162 db_ifaces = self.db.get_rows(
163 FROM="instance_interfaces as ii left join instance_nets as ine on "
164 "ii.instance_net_id=ine.uuid left join instance_vms as iv on "
165 "ii.instance_vm_id=iv.uuid",
166 SELECT=("ii.uuid as iface_id", "ine.uuid as net_id", "iv.uuid as vm_id", "sdn_net_id", "vim_net_id"),
167 WHERE=where_)
# NOTE(review): when more than one row is found only a message is logged and db_iface is not assigned
# in that branch; unless the statements that use db_iface are nested under the final 'else', they
# would use an undefined or stale db_iface - confirm
168 if len(db_ifaces)>1:
169 self.logger.critical("Refresing interfaces. "
170 "Found more than one interface at database for '{}'".format(where_))
171 elif len(db_ifaces)==0:
172 self.logger.critical("Refresing interfaces. "
173 "Not found any interface at database for '{}'".format(where_))
174 continue
175 else:
176 db_iface = db_ifaces[0]
177 #If there is no sdn_net_id, check if it is because an already created vim network is being used
178 #in that case, the sdn_net_id will be in that entry of the instance_nets table
179 if not db_iface.get("sdn_net_id"):
180 result = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets',
181 WHERE={'vim_net_id': db_iface.get("vim_net_id"), 'instance_scenario_id': None, "datacenter_tenant_id": self.datacenter_tenant_id})
182 if len(result) == 1:
183 db_iface["sdn_net_id"] = result[0]['sdn_net_id']
184
# a new external port is created at ovim only when the network has sdn_net_id and the VIM reports
# both compute_node and pci for the interface
185 if db_iface.get("sdn_net_id") and interface.get("compute_node") and interface.get("pci"):
186 sdn_net_id = db_iface["sdn_net_id"]
187 sdn_port_name = sdn_net_id + "." + db_iface["vm_id"]
# the port name is cut to 63 characters
188 sdn_port_name = sdn_port_name[:63]
189 try:
190 sdn_port_id = self.ovim.new_external_port(
191 {"compute_node": interface["compute_node"],
192 "pci": interface["pci"],
193 "vlan": interface.get("vlan"),
194 "net_id": sdn_net_id,
195 "region": self.vim["config"]["datacenter_id"],
196 "name": sdn_port_name,
197 "mac": interface.get("mac_address")})
198 interface["sdn_port_id"] = sdn_port_id
# any failure creating the port is logged and the refresh goes on without sdn_port_id
199 except (ovimException, Exception) as e:
200 self.logger.error(
201 "ovimException creating new_external_port compute_node={} " \
202 "pci={} vlan={} ".format(
203 interface["compute_node"],
204 interface["pci"],
205 interface.get("vlan")) + str(e),
206 exc_info=True)
207 # TODO Set error_msg at instance_nets
# vim_net_id is taken out of the dictionary while it is used as UPDATE content for
# 'instance_interfaces', and put back afterwards
208 with self.db_lock:
209 vim_net_id = interface.pop("vim_net_id")
210 self.db.update_rows('instance_interfaces', UPDATE=interface,
211 WHERE={'uuid': db_iface["iface_id"]})
212 interface["vim_net_id"] = vim_net_id
213 # TODO insert instance_id
214
# the answer of the VIM becomes the stored state, and the task is scheduled for its next refresh
215 task["vim_info"] = vim_info
216 if task["vim_info"]["status"] == "BUILD":
217 self._insert_refresh(task, now + self.REFRESH_BUILD)
218 else:
219 self._insert_refresh(task, now + self.REFRESH_ACTIVE)
# NOTE(review): 'task' is whatever task the name was last bound to, so at most one task is inserted
# again here; the other collected VM tasks look like they are left out of refresh_list - confirm
220 except vimconn.vimconnException as e:
221 self.logger.error("vimconnException Exception when trying to refresh vms " + str(e))
222 self._insert_refresh(task, now + self.REFRESH_ACTIVE)
223
# Step 3: refresh the collected networks
224 if net_to_refresh_list:
225 try:
226 vim_dict = self.vim.refresh_nets_status(net_to_refresh_list)
227 for vim_id, vim_info in vim_dict.items():
228 #look for task
229 task = net_to_refresh_dict[vim_id]
230 self.logger.debug("get-net net_id=%s result=%s", task["vim_id"], str(vim_info))
231
232 #get database info
233 where_ = {"vim_net_id": vim_id, 'datacenter_tenant_id': self.datacenter_tenant_id}
234 with self.db_lock:
235 db_nets = self.db.get_rows(
236 FROM="instance_nets",
237 SELECT=("uuid as net_id", "sdn_net_id"),
238 WHERE=where_)
# NOTE(review): same pattern as with the interfaces: db_net is assigned only in the final 'else'
239 if len(db_nets) > 1:
240 self.logger.critical("Refresing networks. "
241 "Found more than one instance-networks at database for '{}'".format(where_))
242 elif len(db_nets) == 0:
243 self.logger.critical("Refresing networks. "
244 "Not found any instance-network at database for '{}'".format(where_))
245 continue
246 else:
247 db_net = db_nets[0]
248 if db_net.get("sdn_net_id"):
249 # get ovim status
# an "ERROR" status of the SDN network is merged into vim_info: status becomes "VIM_SDN_ERROR" if the
# VIM already reported "VIM_ERROR", "SDN_ERROR" otherwise
250 try:
251 sdn_net = self.ovim.show_network(db_net["sdn_net_id"])
252 if sdn_net["status"] == "ERROR":
253 if not vim_info.get("error_msg"):
254 vim_info["error_msg"] = sdn_net["error_msg"]
255 else:
# each message is shortened with a limit of 1024//2-14; presumably so that both texts plus the
# "VIM_ERROR: " and " && SDN_ERROR: " labels stay under 1024 characters - TODO confirm
256 vim_info["error_msg"] = "VIM_ERROR: {} && SDN_ERROR: {}".format(
257 self._format_vim_error_msg(vim_info["error_msg"], 1024//2-14),
258 self._format_vim_error_msg(sdn_net["error_msg"], 1024//2-14))
259 if vim_info["status"] == "VIM_ERROR":
260 vim_info["status"] = "VIM_SDN_ERROR"
261 else:
262 vim_info["status"] = "SDN_ERROR"
263
264 except (ovimException, Exception) as e:
265 self.logger.error(
266 "ovimException getting network infor snd_net_id={}".format(db_net["sdn_net_id"]),
267 exc_info=True)
268 # TODO Set error_msg at instance_nets
269
270 # update database
271 if vim_info.get("error_msg"):
272 vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"])
# as with the VMs, the row of 'instance_nets' is written only when something changed
273 if task["vim_info"].get("status") != vim_info["status"] or \
274 task["vim_info"].get("error_msg") != vim_info.get("error_msg") or \
275 task["vim_info"].get("vim_info") != vim_info["vim_info"]:
276 with self.db_lock:
277 temp_dict = {"status": vim_info["status"],
278 "error_msg": vim_info.get("error_msg"),
279 "vim_info": vim_info["vim_info"]}
280 self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"vim_net_id": vim_id})
281
282 task["vim_info"] = vim_info
283 if task["vim_info"]["status"] == "BUILD":
284 self._insert_refresh(task, now + self.REFRESH_BUILD)
285 else:
286 self._insert_refresh(task, now + self.REFRESH_ACTIVE)
# NOTE(review): as in the VM step, only the task currently bound to 'task' is inserted again - confirm
287 except vimconn.vimconnException as e:
288 self.logger.error("vimconnException Exception when trying to refresh nets " + str(e))
289 self._insert_refresh(task, now + self.REFRESH_ACTIVE)
290
# nothing was due: wait a second; run() calls this method again while its task queue is empty
291 if not items_to_refresh:
292 time.sleep(1)
293
294 def _insert_refresh(self, task, threshold_time):
295 """Insert a task at list of refreshing elements. The refreshing list is ordered by threshold_time (task['time']
296 It is assumed that this is called inside this thread
297 """
298 task["time"] = threshold_time
299 for index in range(0, len(self.refresh_list)):
300 if self.refresh_list[index]["time"] > threshold_time:
301 self.refresh_list.insert(index, task)
302 break
303 else:
304 index = len(self.refresh_list)
305 self.refresh_list.append(task)
306 self.logger.debug("new refresh task={} name={}, time={} index={}".format(
307 task["id"], task["name"], task["time"], index))
308
309 def _remove_refresh(self, task_name, vim_id):
310 """Remove a task with this name and vim_id from the list of refreshing elements.
311 It is assumed that this is called inside this thread outside _refres_elements method
312 Return True if self.refresh_list is modified, task is found
313 Return False if not found
314 """
315 index_to_delete = None
316 for index in range(0, len(self.refresh_list)):
317 if self.refresh_list[index]["name"] == task_name and self.refresh_list[index]["vim_id"] == vim_id:
318 index_to_delete = index
319 break
320 else:
321 return False
322 if index_to_delete != None:
323 del self.refresh_list[index_to_delete]
324 return True
325
def insert_task(self, task):
    """Put a task in the queue of this thread without waiting for a free slot.

    :param task: task dictionary; it must contain "id"
    :return: the "id" of the task
    :raises vimconn.vimconnException: if the queue is full
    """
    try:
        self.task_queue.put_nowait(task)
    except Queue.Full:
        raise vimconn.vimconnException(self.name + ": timeout inserting a task")
    return task["id"]
332
def del_task(self, task):
    """Mark a task as deleted, provided that this thread has not started to process it.

    run() and _refres_elements() discard the tasks whose status is "deleted" when they reach them.

    :param task: task dictionary; its "status" entry is read and possibly changed
    :return: True if the task was "enqueued" and is now "deleted"; False otherwise (e.g. "processing")
    """
    # The 'with' block is the only owner of the lock: releasing it by hand inside the block makes the
    # exit of the block release it a second time, which raises an error
    with self.task_lock:
        if task["status"] == "enqueued":
            task["status"] = "deleted"
            return True
        return False
341
def run(self):
    """Main loop of the thread: process the queued tasks and refresh elements while there are none.

    Every task taken from self.task_queue is a dictionary with "id", "name", "params" and "status".
    Tasks marked as "deleted" are skipped. The rest are set to "processing", dispatched by "name"
    ('exit', 'reload', 'new-vm', 'del-vm', 'new-net', 'del-net') and finally set to "done" or "error",
    with the content returned by the handler stored at "result". While the queue is empty
    _refres_elements() is called instead.

    An unexpected exception is logged and the loop goes on with the next task.

    :return: 0 when an 'exit' task is processed; it does not return otherwise
    """
    self.logger.debug("Starting")
    while True:
        #TODO reload service
        while True:
            try:
                if self.task_queue.empty():
                    self._refres_elements()
                    continue
                task = self.task_queue.get()
                # 'with' guarantees that the lock is released even if the task is malformed;
                # otherwise the next locked section of this thread would wait forever
                with self.task_lock:
                    deleted = task["status"] == "deleted"
                    if not deleted:
                        task["status"] = "processing"
                if deleted:
                    continue
                self.logger.debug("processing task id={} name={} params={}".format(task["id"], task["name"],
                                                                                     str(task["params"])))
                if task["name"] == 'exit' or task["name"] == 'reload':
                    result, content = self.terminate(task)
                elif task["name"] == 'new-vm':
                    result, content = self.new_vm(task)
                elif task["name"] == 'del-vm':
                    result, content = self.del_vm(task)
                elif task["name"] == 'new-net':
                    result, content = self.new_net(task)
                elif task["name"] == 'del-net':
                    result, content = self.del_net(task)
                else:
                    error_text = "unknown task {}".format(task["name"])
                    self.logger.error(error_text)
                    result = False
                    content = error_text
                self.logger.debug("task id={} name={} result={}:{} params={}".format(task["id"], task["name"],
                                                                                    result, content,
                                                                                    str(task["params"])))

                with self.task_lock:
                    task["status"] = "done" if result else "error"
                    task["result"] = content
                self.task_queue.task_done()

                if task["name"] == 'exit':
                    return 0
                elif task["name"] == 'reload':
                    # leave the inner loop; the outer loop starts it again
                    break
            except Exception as e:
                self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)

        self.logger.debug("Finishing")
393
def terminate(self, task):
    """Handler that run() uses for the 'exit' and 'reload' tasks; it does nothing and always succeeds.

    :param task: task being processed (not used)
    :return: tuple (True, None), with the same (result, content) shape as the other task handlers
    """
    result, content = True, None
    return result, content
396
397 def _format_vim_error_msg(self, error_text, max_length=1024):
398 if error_text and len(error_text) >= max_length:
399 return error_text[:max_length//2-3] + " ... " + error_text[-max_length//2+3:]
400 return error_text
401
def new_net(self, task):
    """Create a network at the VIM and, when it applies, the associated network at ovim (SDN).

    The SDN network is created only if the VIM config contains 'sdn-controller' and the type of the
    network (second parameter) is "data" or "ptp"; the VIM network must then have 'vlan'
    encapsulation. A failure creating the SDN network is logged and does not fail the task.
    Afterwards the row of 'instance_nets' that holds the task id as vim_net_id is updated with the
    real ids, and a 'get-net' task is inserted at the refresh list.

    :param task: dictionary with "id" and "params"; "params" are passed as they are to
        self.vim.new_network(), being params[0] the name and params[1] the type of the network
    :return: (True, net_id) if the network is created at VIM, even if the database cannot be updated;
        (False, error_text) if the VIM connector raises a vimconnException
    """
    # read before the 'try' so that the handlers below can always use them
    task_id = task["id"]
    params = task["params"]
    net_id = None
    try:
        net_id = self.vim.new_network(*params)

        net_name = params[0]
        net_type = params[1]

        sdn_net_id = None
        sdn_controller = self.vim.config.get('sdn-controller')
        if sdn_controller and net_type in ("data", "ptp"):
            network = {"name": net_name, "type": net_type, "region": self.vim["config"]["datacenter_id"]}

            vim_net = self.vim.get_network(net_id)
            if vim_net.get('encapsulation') != 'vlan':
                # .get() also here: the key can be missing, which is one of the ways to reach this point
                raise vimconn.vimconnException(
                    "net '{}' defined as type '{}' has not vlan encapsulation '{}'".format(
                        net_name, net_type, vim_net.get('encapsulation')))
            network["vlan"] = vim_net.get('segmentation_id')
            try:
                sdn_net_id = self.ovim.new_network(network)
            except Exception as e:
                self.logger.error("task=%s cannot create SDN network vim_net_id=%s input='%s' ovimException='%s'",
                                  str(task_id), net_id, str(network), str(e))
        with self.db_lock:
            self.db.update_rows("instance_nets", UPDATE={"vim_net_id": net_id, "sdn_net_id": sdn_net_id},
                                WHERE={"vim_net_id": task_id})
        new_refresh_task = {"status": "enqueued",
                            "id": task_id,
                            "name": "get-net",
                            "vim_id": net_id,
                            "vim_info": {} }
        self._insert_refresh(new_refresh_task, time.time())
        return True, net_id
    except db_base_Exception as e:
        self.logger.error("Error updating database %s", str(e))
        return True, net_id
    except vimconn.vimconnException as e:
        self.logger.error("Error creating NET, task=%s: %s", str(task_id), str(e))
        try:
            with self.db_lock:
                self.db.update_rows("instance_nets",
                                    UPDATE={"error_msg": self._format_vim_error_msg(str(e)), "status": "VIM_ERROR"},
                                    WHERE={"vim_net_id": task_id})
        # a different name, so that the VIM exception reported below is not replaced or unbound
        except db_base_Exception as edb:
            self.logger.error("Error updating database %s", str(edb))
        return False, str(e)
    #except ovimException as e:
    #    self.logger.error("Error creating NET in ovim, task=%s: %s", str(task_id), str(e))
    #    return False, str(e)
454
def new_vm(self, task):
    """Create a VM at the VIM, resolving first the networks that are still referenced by task id.

    Every element of the net list (params[5]) whose "net_id" is a task id is replaced by the result
    of the task it depends on, taken from task["depends"]. If that task failed or has not finished,
    the VM is not created and the row of 'instance_vms' is set to VIM_ERROR. Otherwise the VM is
    created, the row of 'instance_vms' gets the real vim_vm_id and a 'get-vm' task is inserted at the
    refresh list. Database errors are only logged.

    :param task: dictionary with "id", "name", "params" and optionally "depends"; "params" are passed
        as they are to self.vim.new_vminstance()
    :return: (True, vm_id) if the VM is created; (False, error_text) if a dependency is not available
        or the VIM connector raises a vimconnException
    """
    params = task["params"]
    task_id = task["id"]
    # stays None when the VM is not created, so that the log message below can always be composed
    vm_id = None
    try:
        depends = task.get("depends")
        net_list = params[5]
        error_text = ""
        for net in net_list:
            if "net_id" in net and is_task_id(net["net_id"]):  # change task_id into network_id
                try:
                    task_net = depends[net["net_id"]]
                    with self.task_lock:
                        if task_net["status"] == "error":
                            error_text = "Cannot create VM because depends on a network that cannot be created: " +\
                                         str(task_net["result"])
                            break
                        elif task_net["status"] == "enqueued" or task_net["status"] == "processing":
                            error_text = "Cannot create VM because depends on a network still not created"
                            break
                        network_id = task_net["result"]
                    net["net_id"] = network_id
                except Exception as e:
                    error_text = "Error trying to map from task_id={} to task result: {}".format(
                        net["net_id"],str(e))
                    break
        if not error_text:
            vm_id = self.vim.new_vminstance(*params)
        try:
            with self.db_lock:
                if error_text:
                    update = self.db.update_rows("instance_vms",
                                                 UPDATE={"status": "VIM_ERROR", "error_msg": error_text},
                                                 WHERE={"vim_vm_id": task_id})
                else:
                    update = self.db.update_rows("instance_vms", UPDATE={"vim_vm_id": vm_id}, WHERE={"vim_vm_id": task_id})
                if not update:
                    self.logger.error("task id={} name={} database not updated vim_vm_id={}".format(
                        task["id"], task["name"], vm_id))
        except db_base_Exception as e:
            self.logger.error("Error updating database %s", str(e))
        if error_text:
            return False, error_text
        new_refresh_task = {"status": "enqueued",
                            "id": task_id,
                            "name": "get-vm",
                            "vim_id": vm_id,
                            "vim_info": {"interfaces":[]} }
        self._insert_refresh(new_refresh_task, time.time())
        return True, vm_id
    except vimconn.vimconnException as e:
        self.logger.error("Error creating VM, task=%s: %s", str(task_id), str(e))
        try:
            with self.db_lock:
                self.db.update_rows("instance_vms",
                                    UPDATE={"error_msg": self._format_vim_error_msg(str(e)), "status": "VIM_ERROR"},
                                    WHERE={"vim_vm_id": task_id})
        except db_base_Exception as edb:
            self.logger.error("Error updating database %s", str(edb))
        return False, str(e)
514
def del_vm(self, task):
    """Delete a VM from the VIM, together with the SDN ports of its interfaces.

    If the VM is still referenced by a task id, the id is resolved with the result of the creation
    task found at task["depends"]. The 'get-vm' task of the VM is removed from the refresh list. An
    error deleting an SDN port is logged and does not stop the deletion.

    :param task: dictionary with "params" = (vm_id, interfaces) and, when vm_id is a task id,
        "depends"; every interface is a dictionary that may contain "sdn_port_id"
    :return: (True, content): content is what self.vim.delete_vminstance() returns, or an explanation
            when the VM was never created
        (False, error_text): the VM is still being created, the creation task cannot be obtained or
            the VIM connector raises a vimconnException
    """
    vm_id = task["params"][0]
    interfaces = task["params"][1]
    if is_task_id(vm_id):
        try:
            task_create = task["depends"][vm_id]
            with self.task_lock:
                if task_create["status"] == "error":
                    return True, "VM was not created. It has error: " + str(task_create["result"])
                elif task_create["status"] == "enqueued" or task_create["status"] == "processing":
                    return False, "Cannot delete VM vim_id={} because still creating".format(vm_id)
                vm_id = task_create["result"]
        except Exception as e:
            # one placeholder per argument: the reason of the failure is part of the returned text
            return False, "Error trying to get task_id='{}': {}".format(vm_id, str(e))
    try:
        self._remove_refresh("get-vm", vm_id)
        for iface in interfaces:
            if iface.get("sdn_port_id"):
                try:
                    self.ovim.delete_port(iface["sdn_port_id"])
                except ovimException as e:
                    self.logger.error("ovimException deleting external_port={} at VM vim_id={} deletion ".format(
                        iface["sdn_port_id"], vm_id) + str(e), exc_info=True)
                    # TODO Set error_msg at instance_nets

        return True, self.vim.delete_vminstance(vm_id)
    except vimconn.vimconnException as e:
        return False, str(e)
543
def del_net(self, task):
    """Delete a network from the VIM and, if it has one, its SDN network with the ports attached to it.

    If the network is still referenced by a task id, the id is resolved with the result of the
    creation task found at task["depends"]. The 'get-net' task of the network is removed from the
    refresh list.

    :param task: dictionary with "params" = (net_id, sdn_net_id) and, when net_id is a task id,
        "depends"; sdn_net_id can be None
    :return: (True, content): content is what self.vim.delete_network() returns, or an explanation
            when the network was never created
        (False, error_text): the network is still being created, the creation task cannot be
            obtained, or the VIM connector or ovim fails
    """
    net_id = task["params"][0]
    sdn_net_id = task["params"][1]
    if is_task_id(net_id):
        try:
            task_create = task["depends"][net_id]
            with self.task_lock:
                if task_create["status"] == "error":
                    return True, "net was not created. It has error: " + str(task_create["result"])
                elif task_create["status"] == "enqueued" or task_create["status"] == "processing":
                    return False, "Cannot delete net because still creating"
                net_id = task_create["result"]
        except Exception as e:
            # one placeholder per argument: the reason of the failure is part of the returned text
            return False, "Error trying to get task_id='{}': {}".format(net_id, str(e))
    try:
        self._remove_refresh("get-net", net_id)
        result = self.vim.delete_network(net_id)
        if sdn_net_id:
            #Delete any attached port to this sdn network
            #At this point, there will be ports associated to this network in case it was manually done using 'openmano vim-net-sdn-attach'
            try:
                port_list = self.ovim.get_ports(columns={'uuid'}, filter={'name': 'external_port', 'net_id': sdn_net_id})
            except ovimException as e:
                raise vimconn.vimconnException(
                    "ovimException obtaining external ports for net {}. ".format(sdn_net_id) + str(e))

            for port in port_list:
                try:
                    self.ovim.delete_port(port['uuid'])
                except ovimException as e:
                    raise vimconn.vimconnException(
                        "ovimException deleting port {} for net {}. ".format(port['uuid'], sdn_net_id) + str(e))
            with self.db_lock:
                self.ovim.delete_network(sdn_net_id)
        return True, result
    except vimconn.vimconnException as e:
        return False, str(e)
    except ovimException as e:
        # logged with the logger of this thread, as the other methods of the class do
        self.logger.error("Error deleting network from ovim. net_id: {}, sdn_net_id: {}".format(net_id, sdn_net_id))
        return False, str(e)
584
585