Fix error at creating vm_thread adding a datacenter
[osm/RO.git] / osm_ro / vim_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 """
25 This is the thread that interacts with the host and libvirt to manage VMs.
26 One thread is launched per host.
27 """
28
29 import threading
30 import time
31 import Queue
32 import logging
33 import vimconn
34 from db_base import db_base_Exception
35 from lib_osm_openvim.ovim import ovimException
36
37 __author__ = "Alfonso Tierno, Pablo Montes"
38 __date__ = "$10-feb-2017 12:07:15$"
39
40 # from logging import Logger
41 # import auxiliary_functions as af
42
43
def is_task_id(task_id):
    """Return True when 'task_id' starts with the "TASK." prefix, False otherwise.

    Such identifiers are placeholders for an element whose creation task has not
    yet produced a real VIM identifier.
    """
    # The comparison already yields a boolean; no conditional expression is needed
    return task_id[:5] == "TASK."
46
47
class vim_thread(threading.Thread):
    """Thread that executes queued tasks against a VIM connector.

    Tasks ('new-vm', 'del-vm', 'new-net', 'del-net', 'exit', 'reload') are taken from
    an internal queue. While the queue is empty, the thread refreshes the status of the
    VMs and networks it has created and stores the changes at the database.
    """

    def __init__(self, vimconn, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None,
                 db=None, db_lock=None, ovim=None):
        """Init a thread.
        Arguments:
            'vimconn': VIM connector used to run the tasks. It is indexed with "id" and
                "config" to build the thread name when 'name' is not provided
            'task_lock': lock used to read/modify the "status" and "result" of the tasks
            'name': name of thread. By default vimconn id + "." + datacenter_tenant_id
            'datacenter_name': stored at self.datacenter_name
            'datacenter_tenant_id': used to filter the database rows of this thread
            'db', 'db_lock': database class and lock to use it in exclusion
            'ovim': object used to manage the SDN networks and ports
        """
        threading.Thread.__init__(self)
        # It will contain a dictionary with
        #   task_id:
        #       status: enqueued,done,error,deleted,processing
        #       result: VIM result,
        self.tasksResult = {}
        self.vim = vimconn
        self.datacenter_name = datacenter_name
        self.datacenter_tenant_id = datacenter_tenant_id
        self.ovim = ovim
        if not name:
            self.name = vimconn["id"] + "." + vimconn["config"]["datacenter_tenant_id"]
        else:
            self.name = name

        self.logger = logging.getLogger('openmano.vim.'+self.name)
        self.db = db
        self.db_lock = db_lock

        self.task_lock = task_lock
        self.task_queue = Queue.Queue(2000)
        # Contains time ordered task list for refreshing the status of VIM VMs and nets
        self.refresh_list = []

    def _refres_elements(self):
        """Call VIM to get VMs and networks status until 10 elements.

        Tasks whose 'time' has been reached are extracted from self.refresh_list, the VIM
        is asked for their status, the database is updated and the tasks are inserted
        again with a new threshold time. Sleeps 1 second when nothing was refreshed.
        """
        now = time.time()
        vm_to_refresh_list = []
        net_to_refresh_list = []
        vm_to_refresh_dict = {}
        net_to_refresh_dict = {}
        items_to_refresh = 0
        while self.refresh_list:
            task = self.refresh_list[0]
            with self.task_lock:
                if task['status'] == 'deleted':
                    self.refresh_list.pop(0)
                    continue
                if task['time'] > now:
                    # the list is time ordered, so no other task is ready
                    break
                task["status"] = "processing"
            self.refresh_list.pop(0)
            if task["name"] == 'get-vm':
                vm_to_refresh_list.append(task["vim_id"])
                vm_to_refresh_dict[task["vim_id"]] = task
            elif task["name"] == 'get-net':
                net_to_refresh_list.append(task["vim_id"])
                net_to_refresh_dict[task["vim_id"]] = task
            else:
                error_text = "unknown task {}".format(task["name"])
                self.logger.error(error_text)
            items_to_refresh += 1
            if items_to_refresh == 10:
                break

        if vm_to_refresh_list:
            self._refresh_vms(now, vm_to_refresh_list, vm_to_refresh_dict)

        if net_to_refresh_list:
            self._refresh_nets(now, net_to_refresh_list, net_to_refresh_dict)

        if not items_to_refresh:
            time.sleep(1)

    @staticmethod
    def _vim_status_changed(old_info, new_info):
        """Return True if status, error_msg or vim_info of 'new_info' differ from 'old_info'"""
        return (old_info.get("status") != new_info["status"] or
                old_info.get("error_msg") != new_info.get("error_msg") or
                old_info.get("vim_info") != new_info["vim_info"])

    def _reschedule_pending(self, vim_ids, pending_tasks, threshold_time):
        """Insert again at the refreshing list the tasks of 'pending_tasks' (vim_id: task).

        'vim_ids' provides the order. Every inserted task is removed from 'pending_tasks'.
        """
        for vim_id in vim_ids:
            task = pending_tasks.pop(vim_id, None)
            if task is not None:
                self._insert_refresh(task, threshold_time)

    def _refresh_vms(self, now, vim_ids, tasks):
        """Get from VIM the status of the VMs 'vim_ids', update database and reschedule its tasks.

        'tasks' is a dictionary vim_id: 'get-vm' task. Tasks are rescheduled in 5 seconds while
        the VM status is "BUILD", and in 5 minutes otherwise.
        """
        # tasks extracted from the refreshing list that are not rescheduled yet
        pending_tasks = dict(tasks)
        try:
            vim_dict = self.vim.refresh_vms_status(vim_ids)
            for vim_id, vim_info in vim_dict.items():
                # look for task
                task = tasks[vim_id]
                self.logger.debug("get-vm vm_id=%s result=%s", task["vim_id"], str(vim_info))

                # update database
                if vim_info.get("error_msg"):
                    vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"])
                if self._vim_status_changed(task["vim_info"], vim_info):
                    with self.db_lock:
                        temp_dict = {"status": vim_info["status"],
                                     "error_msg": vim_info.get("error_msg"),
                                     "vim_info": vim_info["vim_info"]}
                        self.db.update_rows('instance_vms', UPDATE=temp_dict, WHERE={"vim_vm_id": vim_id})
                self._refresh_vm_interfaces(task, vim_id, vim_info)

                task["vim_info"] = vim_info
                pending_tasks.pop(vim_id, None)
                if task["vim_info"]["status"] == "BUILD":
                    self._insert_refresh(task, now+5)  # 5seconds
                else:
                    self._insert_refresh(task, now+300)  # 5minutes
        except vimconn.vimconnException as e:
            self.logger.error("vimconnException Exception when trying to refresh vms " + str(e))
            # reschedule every task not rescheduled yet, otherwise they would never be refreshed again
            self._reschedule_pending(vim_ids, pending_tasks, now + 300)  # 5minutes

    def _refresh_vm_interfaces(self, task, vim_id, vim_info):
        """Update SDN ports and database with the interfaces reported by VIM at 'vim_info'.

        Each reported interface is compared with the one stored at the task with the same
        "vim_net_id". When they differ, the old SDN port (if any) is deleted, a new one is
        created when the database network has "sdn_net_id" and the interface reports
        "compute_node" and "pci", and the 'instance_interfaces' row is updated.
        """
        for interface in vim_info["interfaces"]:
            for task_interface in task["vim_info"]["interfaces"]:
                if task_interface["vim_net_id"] == interface["vim_net_id"]:
                    break
            else:
                task_interface = {"vim_net_id": interface["vim_net_id"]}
                task["vim_info"]["interfaces"].append(task_interface)
            if task_interface == interface:
                continue
            # delete old port
            if task_interface.get("sdn_port_id"):
                try:
                    self.ovim.delete_port(task_interface["sdn_port_id"])
                    task_interface["sdn_port_id"] = None
                except ovimException as e:
                    self.logger.error("ovimException deleting external_port={} ".format(
                        task_interface["sdn_port_id"]) + str(e), exc_info=True)
                    # TODO Set error_msg at instance_nets
            vim_net_id = interface["vim_net_id"]
            with self.db_lock:
                where_ = {'iv.vim_vm_id': vim_id, "ine.vim_net_id": vim_net_id,
                          'ine.datacenter_tenant_id': self.datacenter_tenant_id}
                # TODO check why vim_interface_id is not present at database
                # if interface.get("vim_interface_id"):
                #     where_["vim_interface_id"] = interface["vim_interface_id"]
                db_ifaces = self.db.get_rows(
                    FROM="instance_interfaces as ii left join instance_nets as ine on "
                         "ii.instance_net_id=ine.uuid left join instance_vms as iv on "
                         "ii.instance_vm_id=iv.uuid",
                    SELECT=("ii.uuid as iface_id", "ine.uuid as net_id", "iv.uuid as vm_id", "sdn_net_id"),
                    WHERE=where_)
            if len(db_ifaces) > 1:
                self.logger.critical("Refresing interfaces. "
                                     "Found more than one interface at database for '{}'".format(where_))
            elif len(db_ifaces) == 0:
                self.logger.critical("Refresing interfaces. "
                                     "Not found any interface at database for '{}'".format(where_))
                continue
            else:
                db_iface = db_ifaces[0]
                if db_iface.get("sdn_net_id") and interface.get("compute_node") and interface.get("pci"):
                    sdn_net_id = db_iface["sdn_net_id"]
                    sdn_port_name = sdn_net_id + "." + db_iface["vm_id"]
                    sdn_port_name = sdn_port_name[:63]
                    try:
                        sdn_port_id = self.ovim.new_external_port(
                            {"compute_node": interface["compute_node"],
                             "pci": interface["pci"],
                             "vlan": interface.get("vlan"),
                             "net_id": sdn_net_id,
                             "region": self.vim["config"]["datacenter_id"],
                             "name": sdn_port_name,
                             "mac": interface.get("mac_address")})
                        interface["sdn_port_id"] = sdn_port_id
                    except (ovimException, Exception) as e:
                        self.logger.error(
                            "ovimException creating new_external_port compute_node={} "
                            "pci={} vlan={} ".format(
                                interface["compute_node"],
                                interface["pci"],
                                interface.get("vlan")) + str(e),
                            exc_info=True)
                        # TODO Set error_msg at instance_nets
                with self.db_lock:
                    # "vim_net_id" is temporarily removed so that it is not part of the UPDATE
                    vim_net_id = interface.pop("vim_net_id")
                    try:
                        self.db.update_rows('instance_interfaces', UPDATE=interface,
                                            WHERE={'uuid': db_iface["iface_id"]})
                    finally:
                        # restore the key even if the database update fails
                        interface["vim_net_id"] = vim_net_id
                # TODO insert instance_id

    def _refresh_nets(self, now, vim_ids, tasks):
        """Get from VIM the status of the nets 'vim_ids', update database and reschedule its tasks.

        'tasks' is a dictionary vim_id: 'get-net' task. When the database network has
        "sdn_net_id", the SDN network status is merged into the VIM status. Tasks are
        rescheduled in 5 seconds while the status is "BUILD", and in 5 minutes otherwise.
        A task is not rescheduled when its network is not found at database.
        """
        # tasks extracted from the refreshing list that are not rescheduled yet
        pending_tasks = dict(tasks)
        try:
            vim_dict = self.vim.refresh_nets_status(vim_ids)
            for vim_id, vim_info in vim_dict.items():
                # look for task
                task = tasks[vim_id]
                self.logger.debug("get-net net_id=%s result=%s", task["vim_id"], str(vim_info))

                # get database info
                where_ = {"vim_net_id": vim_id, 'datacenter_tenant_id': self.datacenter_tenant_id}
                with self.db_lock:
                    db_nets = self.db.get_rows(
                        FROM="instance_nets",
                        SELECT=("uuid as net_id", "sdn_net_id"),
                        WHERE=where_)
                if len(db_nets) > 1:
                    self.logger.critical("Refresing networks. "
                                         "Found more than one instance-networks at database for '{}'".format(where_))
                elif len(db_nets) == 0:
                    self.logger.critical("Refresing networks. "
                                         "Not found any instance-network at database for '{}'".format(where_))
                    # this task is dropped on purpose, so it must not be rescheduled on a later error
                    pending_tasks.pop(vim_id, None)
                    continue
                else:
                    db_net = db_nets[0]
                    if db_net.get("sdn_net_id"):
                        # get ovim status
                        try:
                            sdn_net = self.ovim.show_network(db_net["sdn_net_id"])
                            if sdn_net["status"] == "ERROR":
                                if not vim_info.get("error_msg"):
                                    vim_info["error_msg"] = sdn_net["error_msg"]
                                else:
                                    vim_info["error_msg"] = "VIM_ERROR: {} && SDN_ERROR: {}".format(
                                        self._format_vim_error_msg(vim_info["error_msg"], 1024//2-14),
                                        self._format_vim_error_msg(sdn_net["error_msg"], 1024//2-14))
                                if vim_info["status"] == "VIM_ERROR":
                                    vim_info["status"] = "VIM_SDN_ERROR"
                                else:
                                    vim_info["status"] = "SDN_ERROR"

                        except (ovimException, Exception) as e:
                            self.logger.error(
                                "ovimException getting network infor snd_net_id={}".format(db_net["sdn_net_id"]),
                                exc_info=True)
                            # TODO Set error_msg at instance_nets

                # update database
                if vim_info.get("error_msg"):
                    vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"])
                if self._vim_status_changed(task["vim_info"], vim_info):
                    with self.db_lock:
                        temp_dict = {"status": vim_info["status"],
                                     "error_msg": vim_info.get("error_msg"),
                                     "vim_info": vim_info["vim_info"]}
                        self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"vim_net_id": vim_id})

                task["vim_info"] = vim_info
                pending_tasks.pop(vim_id, None)
                if task["vim_info"]["status"] == "BUILD":
                    self._insert_refresh(task, now+5)  # 5seconds
                else:
                    self._insert_refresh(task, now+300)  # 5minutes
        except vimconn.vimconnException as e:
            self.logger.error("vimconnException Exception when trying to refresh nets " + str(e))
            # reschedule every task not rescheduled yet, otherwise they would never be refreshed again
            self._reschedule_pending(vim_ids, pending_tasks, now + 300)  # 5minutes

    def _insert_refresh(self, task, threshold_time):
        """Insert a task at list of refreshing elements. The refreshing list is ordered by threshold_time (task['time'])
        It is assumed that this is called inside this thread
        """
        task["time"] = threshold_time
        for index, queued_task in enumerate(self.refresh_list):
            if queued_task["time"] > threshold_time:
                # insert before the first task that must be refreshed later
                self.refresh_list.insert(index, task)
                break
        else:
            index = len(self.refresh_list)
            self.refresh_list.append(task)
        self.logger.debug("new refresh task={} name={}, time={} index={}".format(
            task["id"], task["name"], task["time"], index))

    def _remove_refresh(self, task_name, vim_id):
        """Remove a task with this name and vim_id from the list of refreshing elements.
        It is assumed that this is called inside this thread outside _refres_elements method
        Return True if self.refresh_list is modified, task is found
        Return False if not found
        """
        for index, queued_task in enumerate(self.refresh_list):
            if queued_task["name"] == task_name and queued_task["vim_id"] == vim_id:
                del self.refresh_list[index]
                return True
        return False

    def insert_task(self, task):
        """Put a task at the queue of this thread without blocking.
        Return the task "id". Raise vimconn.vimconnException if the queue is full
        """
        try:
            self.task_queue.put(task, False)
            return task["id"]
        except Queue.Full:
            raise vimconn.vimconnException(self.name + ": timeout inserting a task")

    def del_task(self, task):
        """Mark a task as "deleted" if it is still "enqueued".
        Return True if the task has been marked, False otherwise (its status is not modified)
        """
        with self.task_lock:
            if task["status"] == "enqueued":
                task["status"] = "deleted"
                return True
            # e.g. task["status"] == "processing". The lock is released by the 'with' statement
            return False

    def run(self):
        """Main loop: process the queued tasks, or refresh elements when the queue is empty.
        Return 0 when an 'exit' task is processed
        """
        self.logger.debug("Starting")
        while True:
            # TODO reload service
            while True:
                try:
                    if not self.task_queue.empty():
                        task = self.task_queue.get()
                        # 'with' guarantees the lock is released even if the task is malformed
                        with self.task_lock:
                            if task["status"] == "deleted":
                                continue
                            task["status"] = "processing"
                    else:
                        self._refres_elements()
                        continue
                    self.logger.debug("processing task id={} name={} params={}".format(task["id"], task["name"],
                                                                                       str(task["params"])))
                    if task["name"] == 'exit' or task["name"] == 'reload':
                        result, content = self.terminate(task)
                    elif task["name"] == 'new-vm':
                        result, content = self.new_vm(task)
                    elif task["name"] == 'del-vm':
                        result, content = self.del_vm(task)
                    elif task["name"] == 'new-net':
                        result, content = self.new_net(task)
                    elif task["name"] == 'del-net':
                        result, content = self.del_net(task)
                    else:
                        error_text = "unknown task {}".format(task["name"])
                        self.logger.error(error_text)
                        result = False
                        content = error_text
                    self.logger.debug("task id={} name={} result={}:{} params={}".format(task["id"], task["name"],
                                                                                        result, content,
                                                                                        str(task["params"])))

                    with self.task_lock:
                        task["status"] = "done" if result else "error"
                        task["result"] = content
                    self.task_queue.task_done()

                    if task["name"] == 'exit':
                        self.logger.debug("Finishing")
                        return 0
                    elif task["name"] == 'reload':
                        break
                except Exception as e:
                    self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)

    def terminate(self, task):
        """Process an 'exit' or 'reload' task. Return (True, None)"""
        return True, None

    def _format_vim_error_msg(self, error_text, max_length=1024):
        """Shorten 'error_text' to less than 'max_length' characters.
        Texts shorter than 'max_length' (and empty/None values) are returned unmodified. Longer
        texts keep their beginning and end, joined by " ... ". When 'max_length' is too small
        to keep both ends, the text is just cut to 'max_length' characters
        """
        if error_text and len(error_text) >= max_length:
            head = max_length//2-3
            tail = -max_length//2+3
            if head <= 0 or tail >= 0:
                # a tail index of 0 would select the whole text instead of its end
                return error_text[:max(max_length, 0)]
            return error_text[:head] + " ... " + error_text[tail:]
        return error_text

    def new_net(self, task):
        """Create a network at VIM, and at SDN when needed, and start refreshing its status.

        task["params"] are the arguments of vim.new_network: params[0] is the net name and
        params[1] the net type. A SDN network is created when the VIM config contains
        'sdn-controller' and the type is "data" or "ptp".
        Return (True, vim_net_id) or (False, error_text)
        """
        task_id = task["id"]
        try:
            params = task["params"]
            net_id = self.vim.new_network(*params)

            net_name = params[0]
            net_type = params[1]

            network = None
            sdn_net_id = None
            sdn_controller = self.vim.config.get('sdn-controller')
            if sdn_controller and (net_type == "data" or net_type == "ptp"):
                network = {"name": net_name, "type": net_type}

                vim_net = self.vim.get_network(net_id)
                if vim_net.get('encapsulation') != 'vlan':
                    # get() is used because 'encapsulation' can be missing at this point
                    raise vimconn.vimconnException(
                        "net '{}' defined as type '{}' has not vlan encapsulation '{}'".format(
                            net_name, net_type, vim_net.get('encapsulation')))
                network["vlan"] = vim_net.get('segmentation_id')
                try:
                    sdn_net_id = self.ovim.new_network(network)
                except (ovimException, Exception) as e:
                    self.logger.error("task=%s cannot create SDN network vim_net_id=%s input='%s' ovimException='%s'",
                                      str(task_id), net_id, str(network), str(e))
            with self.db_lock:
                self.db.update_rows("instance_nets", UPDATE={"vim_net_id": net_id, "sdn_net_id": sdn_net_id},
                                    WHERE={"vim_net_id": task_id})
            new_refresh_task = {"status": "enqueued",
                                "id": task_id,
                                "name": "get-net",
                                "vim_id": net_id,
                                "vim_info": {}}
            self._insert_refresh(new_refresh_task, time.time())
            return True, net_id
        except db_base_Exception as e:
            self.logger.error("Error updating database %s", str(e))
            return True, net_id
        except vimconn.vimconnException as e:
            self.logger.error("Error creating NET, task=%s: %s", str(task_id), str(e))
            try:
                with self.db_lock:
                    self.db.update_rows("instance_nets",
                                        UPDATE={"error_msg": self._format_vim_error_msg(str(e)), "status": "VIM_ERROR"},
                                        WHERE={"vim_net_id": task_id})
            except db_base_Exception as edb:
                # a different name is used so that the VIM exception 'e' is still available below
                self.logger.error("Error updating database %s", str(edb))
            return False, str(e)
        # except ovimException as e:
        #     self.logger.error("Error creating NET in ovim, task=%s: %s", str(task_id), str(e))
        #     return False, str(e)

    def new_vm(self, task):
        """Create a VM at VIM and start refreshing its status.

        task["params"] are the arguments of vim.new_vminstance; params[5] is the list of nets.
        Each "net_id" that is a task id is replaced by the result of that task, obtained from
        task["depends"]. The VM is not created if any of those tasks is not successfully finished.
        Return (True, vim_vm_id) or (False, error_text)
        """
        task_id = task["id"]
        try:
            params = task["params"]
            depends = task.get("depends")
            net_list = params[5]
            error_text = ""
            vm_id = None  # it stays None when the VM is not created because of a dependency error
            for net in net_list:
                if "net_id" in net and is_task_id(net["net_id"]):  # change task_id into network_id
                    try:
                        task_net = depends[net["net_id"]]
                        with self.task_lock:
                            if task_net["status"] == "error":
                                error_text = "Cannot create VM because depends on a network that cannot be created: " +\
                                             str(task_net["result"])
                                break
                            elif task_net["status"] == "enqueued" or task_net["status"] == "processing":
                                error_text = "Cannot create VM because depends on a network still not created"
                                break
                            network_id = task_net["result"]
                        net["net_id"] = network_id
                    except Exception as e:
                        error_text = "Error trying to map from task_id={} to task result: {}".format(
                            net["net_id"], str(e))
                        break
            if not error_text:
                vm_id = self.vim.new_vminstance(*params)
            try:
                with self.db_lock:
                    if error_text:
                        update = self.db.update_rows("instance_vms",
                                                     UPDATE={"status": "VIM_ERROR", "error_msg": error_text},
                                                     WHERE={"vim_vm_id": task_id})
                    else:
                        update = self.db.update_rows("instance_vms", UPDATE={"vim_vm_id": vm_id},
                                                     WHERE={"vim_vm_id": task_id})
                    if not update:
                        self.logger.error("task id={} name={} database not updated vim_vm_id={}".format(
                            task["id"], task["name"], vm_id))
            except db_base_Exception as e:
                self.logger.error("Error updating database %s", str(e))
            if error_text:
                return False, error_text
            new_refresh_task = {"status": "enqueued",
                                "id": task_id,
                                "name": "get-vm",
                                "vim_id": vm_id,
                                "vim_info": {"interfaces": []}}
            self._insert_refresh(new_refresh_task, time.time())
            return True, vm_id
        except vimconn.vimconnException as e:
            self.logger.error("Error creating VM, task=%s: %s", str(task_id), str(e))
            try:
                with self.db_lock:
                    self.db.update_rows("instance_vms",
                                        UPDATE={"error_msg": self._format_vim_error_msg(str(e)), "status": "VIM_ERROR"},
                                        WHERE={"vim_vm_id": task_id})
            except db_base_Exception as edb:
                self.logger.error("Error updating database %s", str(edb))
            return False, str(e)

    def del_vm(self, task):
        """Delete a VM from VIM, its SDN ports, and stop refreshing its status.

        task["params"][0] is the VM id, or the id of the task that creates it (looked up at
        task["depends"]); task["params"][1] is the list of interfaces, whose "sdn_port_id"
        ports are deleted.
        Return (True, result) or (False, error_text)
        """
        vm_id = task["params"][0]
        interfaces = task["params"][1]
        if is_task_id(vm_id):
            try:
                task_create = task["depends"][vm_id]
                with self.task_lock:
                    if task_create["status"] == "error":
                        return True, "VM was not created. It has error: " + str(task_create["result"])
                    elif task_create["status"] == "enqueued" or task_create["status"] == "processing":
                        return False, "Cannot delete VM vim_id={} because still creating".format(vm_id)
                    vm_id = task_create["result"]
            except Exception as e:
                # the second placeholder was missing, so the exception text was lost
                return False, "Error trying to get task_id='{}': {}".format(vm_id, str(e))
        try:
            self._remove_refresh("get-vm", vm_id)
            for iface in interfaces:
                if iface.get("sdn_port_id"):
                    try:
                        self.ovim.delete_port(iface["sdn_port_id"])
                    except ovimException as e:
                        self.logger.error("ovimException deleting external_port={} at VM vim_id={} deletion ".format(
                            iface["sdn_port_id"], vm_id) + str(e), exc_info=True)
                        # TODO Set error_msg at instance_nets

            return True, self.vim.delete_vminstance(vm_id)
        except vimconn.vimconnException as e:
            return False, str(e)

    def del_net(self, task):
        """Delete a network from VIM, and from SDN if provided, and stop refreshing its status.

        task["params"][0] is the net id, or the id of the task that creates it (looked up at
        task["depends"]); task["params"][1] is the SDN net id, that can be empty.
        Return (True, result) or (False, error_text)
        """
        net_id = task["params"][0]
        sdn_net_id = task["params"][1]
        if is_task_id(net_id):
            try:
                task_create = task["depends"][net_id]
                with self.task_lock:
                    if task_create["status"] == "error":
                        return True, "net was not created. It has error: " + str(task_create["result"])
                    elif task_create["status"] == "enqueued" or task_create["status"] == "processing":
                        return False, "Cannot delete net because still creating"
                    net_id = task_create["result"]
            except Exception as e:
                # the second placeholder was missing, so the exception text was lost
                return False, "Error trying to get task_id='{}': {}".format(net_id, str(e))
        try:
            self._remove_refresh("get-net", net_id)
            result = self.vim.delete_network(net_id)
            if sdn_net_id:
                with self.db_lock:
                    self.ovim.delete_network(sdn_net_id)
            return True, result
        except vimconn.vimconnException as e:
            return False, str(e)
        except ovimException as e:
            # use the logger of this thread, as the rest of the class, instead of the root logger
            self.logger.error("Error deleting network from ovim. net_id: {}, sdn_net_id: {}".format(
                net_id, sdn_net_id))
            return False, str(e)
560
561