1 # -*- coding: utf-8 -*-
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
12 # http://www.apache.org/licenses/LICENSE-2.0
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
25 This is thread that interact with the host and the libvirt to manage VM
26 One thread will be launched per host
34 from db_base
import db_base_Exception
35 from lib_osm_openvim
.ovim
import ovimException
37 __author__
= "Alfonso Tierno, Pablo Montes"
38 __date__
= "$10-feb-2017 12:07:15$"
40 # from logging import Logger
41 # import auxiliary_functions as af
44 def is_task_id(task_id
):
45 return True if task_id
[:5] == "TASK." else False
48 class vim_thread(threading
.Thread
):
49 REFRESH_BUILD
= 5 # 5 seconds
50 REFRESH_ACTIVE
= 60 # 1 minute
52 def __init__(self
, vimconn
, task_lock
, name
=None, datacenter_name
=None, datacenter_tenant_id
=None,
53 db
=None, db_lock
=None, ovim
=None):
58 'host','user': host ip or name to manage and user
59 'db', 'db_lock': database class and lock to use it in exclusion
62 """ It will contain a dictionary with
64 status: enqueued,done,error,deleted,processing
67 threading
.Thread
.__init
__(self
)
69 self
.datacenter_name
= datacenter_name
70 self
.datacenter_tenant_id
= datacenter_tenant_id
73 self
.name
= vimconn
["id"] + "." + vimconn
["config"]["datacenter_tenant_id"]
77 self
.logger
= logging
.getLogger('openmano.vim.'+self
.name
)
79 self
.db_lock
= db_lock
81 self
.task_lock
= task_lock
82 self
.task_queue
= Queue
.Queue(2000)
83 self
.refresh_list
= []
84 """Contains time ordered task list for refreshing the status of VIM VMs and nets"""
86 def _refres_elements(self
):
87 """Call VIM to get VMs and networks status until 10 elements"""
89 vm_to_refresh_list
= []
90 net_to_refresh_list
= []
91 vm_to_refresh_dict
= {}
92 net_to_refresh_dict
= {}
94 while self
.refresh_list
:
95 task
= self
.refresh_list
[0]
97 if task
['status'] == 'deleted':
98 self
.refresh_list
.pop(0)
100 if task
['time'] > now
:
102 task
["status"] = "processing"
103 self
.refresh_list
.pop(0)
104 if task
["name"] == 'get-vm':
105 vm_to_refresh_list
.append(task
["vim_id"])
106 vm_to_refresh_dict
[task
["vim_id"]] = task
107 elif task
["name"] == 'get-net':
108 net_to_refresh_list
.append(task
["vim_id"])
109 net_to_refresh_dict
[task
["vim_id"]] = task
111 error_text
= "unknown task {}".format(task
["name"])
112 self
.logger
.error(error_text
)
113 items_to_refresh
+= 1
114 if items_to_refresh
== 10:
117 if vm_to_refresh_list
:
119 vim_dict
= self
.vim
.refresh_vms_status(vm_to_refresh_list
)
120 for vim_id
, vim_info
in vim_dict
.items():
122 task
= vm_to_refresh_dict
[vim_id
]
123 self
.logger
.debug("get-vm vm_id=%s result=%s", task
["vim_id"], str(vim_info
))
126 if vim_info
.get("error_msg"):
127 vim_info
["error_msg"] = self
._format
_vim
_error
_msg
(vim_info
["error_msg"])
128 if task
["vim_info"].get("status") != vim_info
["status"] or \
129 task
["vim_info"].get("error_msg") != vim_info
.get("error_msg") or \
130 task
["vim_info"].get("vim_info") != vim_info
["vim_info"]:
132 temp_dict
= {"status": vim_info
["status"],
133 "error_msg": vim_info
.get("error_msg"),
134 "vim_info": vim_info
["vim_info"]}
135 self
.db
.update_rows('instance_vms', UPDATE
=temp_dict
, WHERE
={"vim_vm_id": vim_id
})
136 for interface
in vim_info
["interfaces"]:
137 for task_interface
in task
["vim_info"]["interfaces"]:
138 if task_interface
["vim_net_id"] == interface
["vim_net_id"]:
141 task_interface
= {"vim_net_id": interface
["vim_net_id"]}
142 task
["vim_info"]["interfaces"].append(task_interface
)
143 if task_interface
!= interface
:
145 if task_interface
.get("sdn_port_id"):
147 self
.ovim
.delete_port(task_interface
["sdn_port_id"])
148 task_interface
["sdn_port_id"] = None
149 except ovimException
as e
:
150 self
.logger
.error("ovimException deleting external_port={} ".format(
151 task_interface
["sdn_port_id"]) + str(e
), exc_info
=True)
152 # TODO Set error_msg at instance_nets
153 vim_net_id
= interface
["vim_net_id"]
157 where_
= {'iv.vim_vm_id': vim_id
, "ine.vim_net_id": vim_net_id
,
158 'ine.datacenter_tenant_id': self
.datacenter_tenant_id
}
159 # TODO check why vim_interface_id is not present at database
160 # if interface.get("vim_interface_id"):
161 # where_["vim_interface_id"] = interface["vim_interface_id"]
162 db_ifaces
= self
.db
.get_rows(
163 FROM
="instance_interfaces as ii left join instance_nets as ine on "
164 "ii.instance_net_id=ine.uuid left join instance_vms as iv on "
165 "ii.instance_vm_id=iv.uuid",
166 SELECT
=("ii.uuid as iface_id", "ine.uuid as net_id", "iv.uuid as vm_id", "sdn_net_id", "vim_net_id"),
169 self
.logger
.critical("Refresing interfaces. "
170 "Found more than one interface at database for '{}'".format(where_
))
171 elif len(db_ifaces
)==0:
172 self
.logger
.critical("Refresing interfaces. "
173 "Not found any interface at database for '{}'".format(where_
))
176 db_iface
= db_ifaces
[0]
177 #If there is no sdn_net_id, check if it is because an already created vim network is being used
178 #in that case, the sdn_net_id will be in that entry of the instance_nets table
179 if not db_iface
.get("sdn_net_id"):
180 result
= self
.db
.get_rows(SELECT
=('sdn_net_id',), FROM
='instance_nets',
181 WHERE
={'vim_net_id': db_iface
.get("vim_net_id"), 'instance_scenario_id': None, "datacenter_tenant_id": self
.datacenter_tenant_id
})
183 db_iface
["sdn_net_id"] = result
[0]['sdn_net_id']
185 if db_iface
.get("sdn_net_id") and interface
.get("compute_node") and interface
.get("pci"):
186 sdn_net_id
= db_iface
["sdn_net_id"]
187 sdn_port_name
= sdn_net_id
+ "." + db_iface
["vm_id"]
188 sdn_port_name
= sdn_port_name
[:63]
190 sdn_port_id
= self
.ovim
.new_external_port(
191 {"compute_node": interface
["compute_node"],
192 "pci": interface
["pci"],
193 "vlan": interface
.get("vlan"),
194 "net_id": sdn_net_id
,
195 "region": self
.vim
["config"]["datacenter_id"],
196 "name": sdn_port_name
,
197 "mac": interface
.get("mac_address")})
198 interface
["sdn_port_id"] = sdn_port_id
199 except (ovimException
, Exception) as e
:
201 "ovimException creating new_external_port compute_node={} " \
202 "pci={} vlan={} ".format(
203 interface
["compute_node"],
205 interface
.get("vlan")) + str(e
),
207 # TODO Set error_msg at instance_nets
209 vim_net_id
= interface
.pop("vim_net_id")
210 self
.db
.update_rows('instance_interfaces', UPDATE
=interface
,
211 WHERE
={'uuid': db_iface
["iface_id"]})
212 interface
["vim_net_id"] = vim_net_id
213 # TODO insert instance_id
215 task
["vim_info"] = vim_info
216 if task
["vim_info"]["status"] == "BUILD":
217 self
._insert
_refresh
(task
, now
+ self
.REFRESH_BUILD
)
219 self
._insert
_refresh
(task
, now
+ self
.REFRESH_ACTIVE
)
220 except vimconn
.vimconnException
as e
:
221 self
.logger
.error("vimconnException Exception when trying to refresh vms " + str(e
))
222 self
._insert
_refresh
(task
, now
+ self
.REFRESH_ACTIVE
)
224 if net_to_refresh_list
:
226 vim_dict
= self
.vim
.refresh_nets_status(net_to_refresh_list
)
227 for vim_id
, vim_info
in vim_dict
.items():
229 task
= net_to_refresh_dict
[vim_id
]
230 self
.logger
.debug("get-net net_id=%s result=%s", task
["vim_id"], str(vim_info
))
233 where_
= {"vim_net_id": vim_id
, 'datacenter_tenant_id': self
.datacenter_tenant_id
}
235 db_nets
= self
.db
.get_rows(
236 FROM
="instance_nets",
237 SELECT
=("uuid as net_id", "sdn_net_id"),
240 self
.logger
.critical("Refresing networks. "
241 "Found more than one instance-networks at database for '{}'".format(where_
))
242 elif len(db_nets
) == 0:
243 self
.logger
.critical("Refresing networks. "
244 "Not found any instance-network at database for '{}'".format(where_
))
248 if db_net
.get("sdn_net_id"):
251 sdn_net
= self
.ovim
.show_network(db_net
["sdn_net_id"])
252 if sdn_net
["status"] == "ERROR":
253 if not vim_info
.get("error_msg"):
254 vim_info
["error_msg"] = sdn_net
["error_msg"]
256 vim_info
["error_msg"] = "VIM_ERROR: {} && SDN_ERROR: {}".format(
257 self
._format
_vim
_error
_msg
(vim_info
["error_msg"], 1024//2-14),
258 self
._format
_vim
_error
_msg
(sdn_net
["error_msg"], 1024//2-14))
259 if vim_info
["status"] == "VIM_ERROR":
260 vim_info
["status"] = "VIM_SDN_ERROR"
262 vim_info
["status"] = "SDN_ERROR"
264 except (ovimException
, Exception) as e
:
266 "ovimException getting network infor snd_net_id={}".format(db_net
["sdn_net_id"]),
268 # TODO Set error_msg at instance_nets
271 if vim_info
.get("error_msg"):
272 vim_info
["error_msg"] = self
._format
_vim
_error
_msg
(vim_info
["error_msg"])
273 if task
["vim_info"].get("status") != vim_info
["status"] or \
274 task
["vim_info"].get("error_msg") != vim_info
.get("error_msg") or \
275 task
["vim_info"].get("vim_info") != vim_info
["vim_info"]:
277 temp_dict
= {"status": vim_info
["status"],
278 "error_msg": vim_info
.get("error_msg"),
279 "vim_info": vim_info
["vim_info"]}
280 self
.db
.update_rows('instance_nets', UPDATE
=temp_dict
, WHERE
={"vim_net_id": vim_id
})
282 task
["vim_info"] = vim_info
283 if task
["vim_info"]["status"] == "BUILD":
284 self
._insert
_refresh
(task
, now
+ self
.REFRESH_BUILD
)
286 self
._insert
_refresh
(task
, now
+ self
.REFRESH_ACTIVE
)
287 except vimconn
.vimconnException
as e
:
288 self
.logger
.error("vimconnException Exception when trying to refresh nets " + str(e
))
289 self
._insert
_refresh
(task
, now
+ self
.REFRESH_ACTIVE
)
291 if not items_to_refresh
:
294 def _insert_refresh(self
, task
, threshold_time
):
295 """Insert a task at list of refreshing elements. The refreshing list is ordered by threshold_time (task['time']
296 It is assumed that this is called inside this thread
298 task
["time"] = threshold_time
299 for index
in range(0, len(self
.refresh_list
)):
300 if self
.refresh_list
[index
]["time"] > threshold_time
:
301 self
.refresh_list
.insert(index
, task
)
304 index
= len(self
.refresh_list
)
305 self
.refresh_list
.append(task
)
306 self
.logger
.debug("new refresh task={} name={}, time={} index={}".format(
307 task
["id"], task
["name"], task
["time"], index
))
309 def _remove_refresh(self
, task_name
, vim_id
):
310 """Remove a task with this name and vim_id from the list of refreshing elements.
311 It is assumed that this is called inside this thread outside _refres_elements method
312 Return True if self.refresh_list is modified, task is found
313 Return False if not found
315 index_to_delete
= None
316 for index
in range(0, len(self
.refresh_list
)):
317 if self
.refresh_list
[index
]["name"] == task_name
and self
.refresh_list
[index
]["vim_id"] == vim_id
:
318 index_to_delete
= index
322 if index_to_delete
!= None:
323 del self
.refresh_list
[index_to_delete
]
326 def insert_task(self
, task
):
328 self
.task_queue
.put(task
, False)
331 raise vimconn
.vimconnException(self
.name
+ ": timeout inserting a task")
333 def del_task(self
, task
):
335 if task
["status"] == "enqueued":
336 task
["status"] == "deleted"
338 else: # task["status"] == "processing"
339 self
.task_lock
.release()
343 self
.logger
.debug("Starting")
348 if not self
.task_queue
.empty():
349 task
= self
.task_queue
.get()
350 self
.task_lock
.acquire()
351 if task
["status"] == "deleted":
352 self
.task_lock
.release()
354 task
["status"] = "processing"
355 self
.task_lock
.release()
357 self
._refres
_elements
()
359 self
.logger
.debug("processing task id={} name={} params={}".format(task
["id"], task
["name"],
360 str(task
["params"])))
361 if task
["name"] == 'exit' or task
["name"] == 'reload':
362 result
, content
= self
.terminate(task
)
363 elif task
["name"] == 'new-vm':
364 result
, content
= self
.new_vm(task
)
365 elif task
["name"] == 'del-vm':
366 result
, content
= self
.del_vm(task
)
367 elif task
["name"] == 'new-net':
368 result
, content
= self
.new_net(task
)
369 elif task
["name"] == 'del-net':
370 result
, content
= self
.del_net(task
)
372 error_text
= "unknown task {}".format(task
["name"])
373 self
.logger
.error(error_text
)
376 self
.logger
.debug("task id={} name={} result={}:{} params={}".format(task
["id"], task
["name"],
378 str(task
["params"])))
381 task
["status"] = "done" if result
else "error"
382 task
["result"] = content
383 self
.task_queue
.task_done()
385 if task
["name"] == 'exit':
387 elif task
["name"] == 'reload':
389 except Exception as e
:
390 self
.logger
.critical("Unexpected exception at run: " + str(e
), exc_info
=True)
392 self
.logger
.debug("Finishing")
394 def terminate(self
, task
):
397 def _format_vim_error_msg(self
, error_text
, max_length
=1024):
398 if error_text
and len(error_text
) >= max_length
:
399 return error_text
[:max_length
//2-3] + " ... " + error_text
[-max_length
//2+3:]
402 def new_net(self
, task
):
405 params
= task
["params"]
406 net_id
= self
.vim
.new_network(*params
)
413 sdn_controller
= self
.vim
.config
.get('sdn-controller')
414 if sdn_controller
and (net_type
== "data" or net_type
== "ptp"):
415 network
= {"name": net_name
, "type": net_type
, "region": self
.vim
["config"]["datacenter_id"]}
417 vim_net
= self
.vim
.get_network(net_id
)
418 if vim_net
.get('encapsulation') != 'vlan':
419 raise vimconn
.vimconnException(
420 "net '{}' defined as type '{}' has not vlan encapsulation '{}'".format(
421 net_name
, net_type
, vim_net
['encapsulation']))
422 network
["vlan"] = vim_net
.get('segmentation_id')
424 sdn_net_id
= self
.ovim
.new_network(network
)
425 except (ovimException
, Exception) as e
:
426 self
.logger
.error("task=%s cannot create SDN network vim_net_id=%s input='%s' ovimException='%s'",
427 str(task_id
), net_id
, str(network
), str(e
))
429 self
.db
.update_rows("instance_nets", UPDATE
={"vim_net_id": net_id
, "sdn_net_id": sdn_net_id
},
430 WHERE
={"vim_net_id": task_id
})
431 new_refresh_task
= {"status": "enqueued",
436 self
._insert
_refresh
(new_refresh_task
, time
.time())
438 except db_base_Exception
as e
:
439 self
.logger
.error("Error updating database %s", str(e
))
441 except vimconn
.vimconnException
as e
:
442 self
.logger
.error("Error creating NET, task=%s: %s", str(task_id
), str(e
))
445 self
.db
.update_rows("instance_nets",
446 UPDATE
={"error_msg": self
._format
_vim
_error
_msg
(str(e
)), "status": "VIM_ERROR"},
447 WHERE
={"vim_net_id": task_id
})
448 except db_base_Exception
as e
:
449 self
.logger
.error("Error updating database %s", str(e
))
451 #except ovimException as e:
452 # self.logger.error("Error creating NET in ovim, task=%s: %s", str(task_id), str(e))
453 # return False, str(e)
455 def new_vm(self
, task
):
457 params
= task
["params"]
459 depends
= task
.get("depends")
463 if "net_id" in net
and is_task_id(net
["net_id"]): # change task_id into network_id
465 task_net
= depends
[net
["net_id"]]
467 if task_net
["status"] == "error":
468 error_text
= "Cannot create VM because depends on a network that cannot be created: " +\
469 str(task_net
["result"])
471 elif task_net
["status"] == "enqueued" or task_net
["status"] == "processing":
472 error_text
= "Cannot create VM because depends on a network still not created"
474 network_id
= task_net
["result"]
475 net
["net_id"] = network_id
476 except Exception as e
:
477 error_text
= "Error trying to map from task_id={} to task result: {}".format(
478 net
["net_id"],str(e
))
481 vm_id
= self
.vim
.new_vminstance(*params
)
485 update
= self
.db
.update_rows("instance_vms",
486 UPDATE
={"status": "VIM_ERROR", "error_msg": error_text
},
487 WHERE
={"vim_vm_id": task_id
})
489 update
= self
.db
.update_rows("instance_vms", UPDATE
={"vim_vm_id": vm_id
}, WHERE
={"vim_vm_id": task_id
})
491 self
.logger
.error("task id={} name={} database not updated vim_vm_id={}".format(
492 task
["id"], task
["name"], vm_id
))
493 except db_base_Exception
as e
:
494 self
.logger
.error("Error updating database %s", str(e
))
496 return False, error_text
497 new_refresh_task
= {"status": "enqueued",
501 "vim_info": {"interfaces":[]} }
502 self
._insert
_refresh
(new_refresh_task
, time
.time())
504 except vimconn
.vimconnException
as e
:
505 self
.logger
.error("Error creating VM, task=%s: %s", str(task_id
), str(e
))
508 self
.db
.update_rows("instance_vms",
509 UPDATE
={"error_msg": self
._format
_vim
_error
_msg
(str(e
)), "status": "VIM_ERROR"},
510 WHERE
={"vim_vm_id": task_id
})
511 except db_base_Exception
as edb
:
512 self
.logger
.error("Error updating database %s", str(edb
))
515 def del_vm(self
, task
):
516 vm_id
= task
["params"][0]
517 interfaces
= task
["params"][1]
518 if is_task_id(vm_id
):
520 task_create
= task
["depends"][vm_id
]
522 if task_create
["status"] == "error":
523 return True, "VM was not created. It has error: " + str(task_create
["result"])
524 elif task_create
["status"] == "enqueued" or task_create
["status"] == "processing":
525 return False, "Cannot delete VM vim_id={} because still creating".format(vm_id
)
526 vm_id
= task_create
["result"]
527 except Exception as e
:
528 return False, "Error trying to get task_id='{}':".format(vm_id
, str(e
))
530 self
._remove
_refresh
("get-vm", vm_id
)
531 for iface
in interfaces
:
532 if iface
.get("sdn_port_id"):
534 self
.ovim
.delete_port(iface
["sdn_port_id"])
535 except ovimException
as e
:
536 self
.logger
.error("ovimException deleting external_port={} at VM vim_id={} deletion ".format(
537 iface
["sdn_port_id"], vm_id
) + str(e
), exc_info
=True)
538 # TODO Set error_msg at instance_nets
540 return True, self
.vim
.delete_vminstance(vm_id
)
541 except vimconn
.vimconnException
as e
:
544 def del_net(self
, task
):
545 net_id
= task
["params"][0]
546 sdn_net_id
= task
["params"][1]
547 if is_task_id(net_id
):
549 task_create
= task
["depends"][net_id
]
551 if task_create
["status"] == "error":
552 return True, "net was not created. It has error: " + str(task_create
["result"])
553 elif task_create
["status"] == "enqueued" or task_create
["status"] == "processing":
554 return False, "Cannot delete net because still creating"
555 net_id
= task_create
["result"]
556 except Exception as e
:
557 return False, "Error trying to get task_id='{}':".format(net_id
, str(e
))
559 self
._remove
_refresh
("get-net", net_id
)
560 result
= self
.vim
.delete_network(net_id
)
562 #Delete any attached port to this sdn network
563 #At this point, there will be ports associated to this network in case it was manually done using 'openmano vim-net-sdn-attach'
565 port_list
= self
.ovim
.get_ports(columns
={'uuid'}, filter={'name': 'external_port', 'net_id': sdn_net_id
})
566 except ovimException
as e
:
567 raise vimconn
.vimconnException(
568 "ovimException obtaining external ports for net {}. ".format(sdn_net_id
) + str(e
))
570 for port
in port_list
:
572 self
.ovim
.delete_port(port
['uuid'])
573 except ovimException
as e
:
574 raise vimconn
.vimconnException(
575 "ovimException deleting port {} for net {}. ".format(port
['uuid'], sdn_net_id
) + str(e
))
577 self
.ovim
.delete_network(sdn_net_id
)
579 except vimconn
.vimconnException
as e
:
581 except ovimException
as e
:
582 logging
.error("Error deleting network from ovim. net_id: {}, sdn_net_id: {}".format(net_id
, sdn_net_id
))