1 # -*- coding: utf-8 -*-
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
12 # http://www.apache.org/licenses/LICENSE-2.0
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
25 This is thread that interact with the host and the libvirt to manage VM
26 One thread will be launched per host
28 __author__
= "Alfonso Tierno, Pablo Montes"
29 __date__
= "$10-feb-2017 12:07:15$"
36 from db_base
import db_base_Exception
37 from openvim
.ovim
import ovimException
40 # from logging import Logger
41 # import auxiliary_functions as af
45 return True if id[:5] == "TASK." else False
48 class vim_thread(threading
.Thread
):
50 def __init__(self
, vimconn
, task_lock
, name
=None, datacenter_name
=None, datacenter_tenant_id
=None, db
=None, db_lock
=None, ovim
=None):
55 'host','user': host ip or name to manage and user
56 'db', 'db_lock': database class and lock to use it in exclusion
59 """ It will contain a dictionary with
61 status: enqueued,done,error,deleted,processing
64 threading
.Thread
.__init
__(self
)
66 self
.datacenter_name
= datacenter_name
67 self
.datacenter_tenant_id
= datacenter_tenant_id
70 self
.name
= vimconn
["id"] + "." + vimconn
["config"]["datacenter_tenant_id"]
74 self
.logger
= logging
.getLogger('openmano.vim.'+self
.name
)
76 self
.db_lock
= db_lock
78 self
.task_lock
= task_lock
79 self
.task_queue
= Queue
.Queue(2000)
80 self
.refresh_list
= []
81 """Contains time ordered task list for refreshing the status of VIM VMs and nets"""
83 def _refres_elements(self
):
84 """Call VIM to get VMs and networks status until 10 elements"""
86 vm_to_refresh_list
= []
87 net_to_refresh_list
= []
88 vm_to_refresh_dict
= {}
89 net_to_refresh_dict
= {}
91 while self
.refresh_list
:
92 task
= self
.refresh_list
[0]
94 if task
['status'] == 'deleted':
95 self
.refresh_list
.pop(0)
97 if task
['time'] > now
:
99 task
["status"] = "processing"
100 self
.refresh_list
.pop(0)
101 if task
["name"] == 'get-vm':
102 vm_to_refresh_list
.append(task
["vim_id"])
103 vm_to_refresh_dict
[task
["vim_id"]] = task
104 elif task
["name"] == 'get-net':
105 net_to_refresh_list
.append(task
["vim_id"])
106 net_to_refresh_dict
[task
["vim_id"]] = task
108 error_text
= "unknown task {}".format(task
["name"])
109 self
.logger
.error(error_text
)
110 items_to_refresh
+= 1
111 if items_to_refresh
== 10:
114 if vm_to_refresh_list
:
116 vim_dict
= self
.vim
.refresh_vms_status(vm_to_refresh_list
)
117 for vim_id
, vim_info
in vim_dict
.items():
119 task
= vm_to_refresh_dict
[vim_id
]
120 self
.logger
.debug("get-vm vm_id=%s result=%s", task
["vim_id"], str(vim_info
))
123 if vim_info
.get("error_msg"):
124 vim_info
["error_msg"] = self
._format
_vim
_error
_msg
(vim_info
["error_msg"])
125 if task
["vim_info"].get("status") != vim_info
["status"] or \
126 task
["vim_info"].get("error_msg") != vim_info
.get("error_msg") or \
127 task
["vim_info"].get("vim_info") != vim_info
["vim_info"]:
129 temp_dict
={ "status": vim_info
["status"],
130 "error_msg": vim_info
.get("error_msg"),
131 "vim_info": vim_info
["vim_info"]
133 self
.db
.update_rows('instance_vms', UPDATE
=temp_dict
, WHERE
={"vim_vm_id": vim_id
})
134 for interface
in vim_info
["interfaces"]:
135 for task_interface
in task
["vim_info"]["interfaces"]:
136 if task_interface
["vim_net_id"] == interface
["vim_net_id"]:
139 task_interface
= {"vim_net_id": interface
["vim_net_id"]}
140 task
["vim_info"]["interfaces"].append(task_interface
)
141 if task_interface
!= interface
:
143 if task_interface
.get("sdn_port_id"):
145 self
.ovim
.delete_port(task_interface
["sdn_port_id"])
146 task_interface
["sdn_port_id"] = None
147 except ovimException
as e
:
148 self
.logger
.error("ovimException deleting external_port={} ".format(
149 task_interface
["sdn_port_id"]) + str(e
), exc_info
=True)
150 # TODO Set error_msg at instance_nets
151 vim_net_id
= interface
.pop("vim_net_id")
155 where_
= {'iv.vim_vm_id': vim_id
, "ine.vim_net_id": vim_net_id
,
156 'ine.datacenter_tenant_id': self
.datacenter_tenant_id
}
157 # TODO check why vim_interface_id is not present at database
158 # if interface.get("vim_interface_id"):
159 # where_["vim_interface_id"] = interface["vim_interface_id"]
160 db_ifaces
= self
.db
.get_rows(
161 FROM
="instance_interfaces as ii left join instance_nets as ine on "
162 "ii.instance_net_id=ine.uuid left join instance_vms as iv on "
163 "ii.instance_vm_id=iv.uuid",
164 SELECT
=("ii.uuid as iface_id", "ine.uuid as net_id", "iv.uuid as vm_id", "sdn_net_id"),
167 self
.logger
.error("Refresing interfaces. "
168 "Found more than one interface at database for '{}'".format(where_
))
169 elif len(db_ifaces
)==0:
170 self
.logger
.error("Refresing interfaces. "
171 "Not found any interface at database for '{}'".format(where_
))
173 db_iface
= db_ifaces
[0]
174 if db_iface
.get("sdn_net_id") and interface
.get("compute_node") and interface
.get("pci"):
175 sdn_net_id
= db_iface
["sdn_net_id"]
176 sdn_port_name
= sdn_net_id
+ "." + db_iface
["vm_id"]
177 sdn_port_name
= sdn_port_name
[:63]
179 sdn_port_id
= self
.ovim
.new_external_port(
180 {"compute_node": interface
["compute_node"],
181 "pci": interface
["pci"],
182 "vlan": interface
.get("vlan"),
183 "net_id": sdn_net_id
,
184 "region": self
.vim
["config"]["datacenter_id"],
185 "name": sdn_port_name
})
186 interface
["sdn_port_id"] = sdn_port_id
187 except (ovimException
, Exception) as e
:
189 "ovimException creating new_external_port compute_node={} " \
190 "pci={} vlan={} ".format(
191 interface
["compute_node"],
193 interface
.get("vlan")) + str(e
),
195 # TODO Set error_msg at instance_nets
197 self
.db
.update_rows('instance_interfaces', UPDATE
=interface
,
198 WHERE
={'uuid': db_iface
["iface_id"]})
199 # TODO insert instance_id
200 interface
["vim_net_id"] = vim_net_id
202 task
["vim_info"] = vim_info
203 if "ACTIVE" in task
["vim_info"]["status"]:
204 self
._insert
_refresh
(task
, now
+300) # 5minutes
206 self
._insert
_refresh
(task
, now
+5) # 5seconds
207 except vimconn
.vimconnException
as e
:
208 self
.logger
.error("vimconnException Exception when trying to refresh vms " + str(e
))
209 self
._insert
_refresh
(task
, now
+ 300) # 5minutes
211 if not items_to_refresh
:
214 def _insert_refresh(self
, task
, threshold_time
):
215 """Insert a task at list of refreshing elements. The refreshing list is ordered by threshold_time (task['time']
216 It is assumed that this is called inside this thread
218 task
["time"] = threshold_time
219 for index
in range(0, len(self
.refresh_list
)):
220 if self
.refresh_list
[index
]["time"] > threshold_time
:
221 self
.refresh_list
.insert(index
, task
)
224 index
= len(self
.refresh_list
)
225 self
.refresh_list
.append(task
)
226 self
.logger
.debug("new refresh task={} name={}, time={} index={}".format(
227 task
["id"], task
["name"], task
["time"], index
))
229 def _remove_refresh(self
, task_name
, vim_id
):
230 """Remove a task with this name and vim_id from the list of refreshing elements.
231 It is assumed that this is called inside this thread outside _refres_elements method
232 Return True if self.refresh_list is modified, task is found
233 Return False if not found
235 index_to_delete
= None
236 for index
in range(0, len(self
.refresh_list
)):
237 if self
.refresh_list
[index
]["name"] == task_name
and self
.refresh_list
[index
]["vim_id"] == vim_id
:
238 index_to_delete
= index
242 if index_to_delete
!= None:
243 del self
.refresh_list
[index_to_delete
]
246 def insert_task(self
, task
):
248 self
.task_queue
.put(task
, False)
251 raise vimconn
.vimconnException(self
.name
+ ": timeout inserting a task")
253 def del_task(self
, task
):
255 if task
["status"] == "enqueued":
256 task
["status"] == "deleted"
258 else: # task["status"] == "processing"
259 self
.task_lock
.release()
263 self
.logger
.debug("Starting")
267 if not self
.task_queue
.empty():
268 task
= self
.task_queue
.get()
269 self
.task_lock
.acquire()
270 if task
["status"] == "deleted":
271 self
.task_lock
.release()
273 task
["status"] = "processing"
274 self
.task_lock
.release()
276 self
._refres
_elements
()
278 self
.logger
.debug("processing task id={} name={} params={}".format(task
["id"], task
["name"],
279 str(task
["params"])))
280 if task
["name"] == 'exit' or task
["name"] == 'reload':
281 result
, content
= self
.terminate(task
)
282 elif task
["name"] == 'new-vm':
283 result
, content
= self
.new_vm(task
)
284 elif task
["name"] == 'del-vm':
285 result
, content
= self
.del_vm(task
)
286 elif task
["name"] == 'new-net':
287 result
, content
= self
.new_net(task
)
288 elif task
["name"] == 'del-net':
289 result
, content
= self
.del_net(task
)
291 error_text
= "unknown task {}".format(task
["name"])
292 self
.logger
.error(error_text
)
295 self
.logger
.debug("task id={} name={} result={}:{} params={}".format(task
["id"], task
["name"],
297 str(task
["params"])))
300 task
["status"] = "done" if result
else "error"
301 task
["result"] = content
302 self
.task_queue
.task_done()
304 if task
["name"] == 'exit':
306 elif task
["name"] == 'reload':
309 self
.logger
.debug("Finishing")
311 def terminate(self
, task
):
314 def _format_vim_error_msg(self
, error_text
):
315 if error_text
and len(error_text
) >= 1024:
316 return error_text
[:516] + " ... " + error_text
[-500:]
319 def new_net(self
, task
):
322 params
= task
["params"]
323 net_id
= self
.vim
.new_network(*params
)
329 sdn_controller
= self
.vim
.config
.get('sdn-controller')
330 if sdn_controller
and (net_type
== "data" or net_type
== "ptp"):
331 network
= {"name": net_name
, "type": net_type
}
333 vim_net
= self
.vim
.get_network(net_id
)
334 if vim_net
.get('encapsulation') != 'vlan':
335 raise vimconn
.vimconnException(net_name
+ "defined as type " + net_type
+ " but the created network in vim is " + vim_net
['encapsulation'])
337 network
["vlan"] = vim_net
.get('segmentation_id')
342 sdn_net_id
= self
.ovim
.new_network(network
)
343 self
.db
.update_rows("instance_nets", UPDATE
={"vim_net_id": net_id
, "sdn_net_id": sdn_net_id
}, WHERE
={"vim_net_id": task_id
})
346 except db_base_Exception
as e
:
347 self
.logger
.error("Error updating database %s", str(e
))
349 except vimconn
.vimconnException
as e
:
350 self
.logger
.error("Error creating NET, task=%s: %s", str(task_id
), str(e
))
353 self
.db
.update_rows("instance_nets",
354 UPDATE
={"error_msg": self
._format
_vim
_error
_msg
(str(e
)), "status": "VIM_ERROR"},
355 WHERE
={"vim_net_id": task_id
})
356 except db_base_Exception
as e
:
357 self
.logger
.error("Error updating database %s", str(e
))
359 except ovimException
as e
:
360 self
.logger
.error("Error creating NET in ovim, task=%s: %s", str(task_id
), str(e
))
363 def new_vm(self
, task
):
365 params
= task
["params"]
367 depends
= task
.get("depends")
371 if "net_id" in net
and is_task_id(net
["net_id"]): # change task_id into network_id
373 task_net
= depends
[net
["net_id"]]
375 if task_net
["status"] == "error":
376 error_text
= "Cannot create VM because depends on a network that cannot be created: " +\
377 str(task_net
["result"])
379 elif task_net
["status"] == "enqueued" or task_net
["status"] == "processing":
380 error_text
= "Cannot create VM because depends on a network still not created"
382 network_id
= task_net
["result"]
383 net
["net_id"] = network_id
384 except Exception as e
:
385 error_text
= "Error trying to map from task_id={} to task result: {}".format(
386 net
["net_id"],str(e
))
389 vm_id
= self
.vim
.new_vminstance(*params
)
393 update
= self
.db
.update_rows("instance_vms",
394 UPDATE
={"status": "VIM_ERROR", "error_msg": error_text
},
395 WHERE
={"vim_vm_id": task_id
})
397 update
= self
.db
.update_rows("instance_vms", UPDATE
={"vim_vm_id": vm_id
}, WHERE
={"vim_vm_id": task_id
})
399 self
.logger
.error("task id={} name={} database not updated vim_vm_id={}".format(
400 task
["id"], task
["name"], vm_id
))
401 except db_base_Exception
as e
:
402 self
.logger
.error("Error updating database %s", str(e
))
404 return False, error_text
405 new_refresh_task
= {"status": "enqueued",
409 "vim_info": {"interfaces":[]} }
410 self
._insert
_refresh
(new_refresh_task
, time
.time())
412 except vimconn
.vimconnException
as e
:
413 self
.logger
.error("Error creating VM, task=%s: %s", str(task_id
), str(e
))
416 self
.db
.update_rows("instance_vms",
417 UPDATE
={"error_msg": self
._format
_vim
_error
_msg
(str(e
)), "status": "VIM_ERROR"},
418 WHERE
={"vim_vm_id": task_id
})
419 except db_base_Exception
as edb
:
420 self
.logger
.error("Error updating database %s", str(edb
))
423 def del_vm(self
, task
):
424 vm_id
= task
["params"][0]
425 interfaces
= task
["params"][1]
426 if is_task_id(vm_id
):
428 task_create
= task
["depends"][vm_id
]
430 if task_create
["status"] == "error":
431 return True, "VM was not created. It has error: " + str(task_create
["result"])
432 elif task_create
["status"] == "enqueued" or task_create
["status"] == "processing":
433 return False, "Cannot delete VM vim_id={} because still creating".format(vm_id
)
434 vm_id
= task_create
["result"]
435 except Exception as e
:
436 return False, "Error trying to get task_id='{}':".format(vm_id
, str(e
))
438 self
._remove
_refresh
("get-vm", vm_id
)
439 for iface
in interfaces
:
440 if iface
.get("sdn_port_id"):
442 self
.ovim
.delete_port(iface
["sdn_port_id"])
443 except ovimException
as e
:
444 self
.logger
.error("ovimException deleting external_port={} at VM vim_id={} deletion ".format(
445 iface
["sdn_port_id"], vm_id
) + str(e
), exc_info
=True)
446 # TODO Set error_msg at instance_nets
448 return True, self
.vim
.delete_vminstance(vm_id
)
449 except vimconn
.vimconnException
as e
:
452 def del_net(self
, task
):
453 net_id
= task
["params"][0]
454 sdn_net_id
= task
["params"][1]
455 if is_task_id(net_id
):
457 task_create
= task
["depends"][net_id
]
459 if task_create
["status"] == "error":
460 return True, "net was not created. It has error: " + str(task_create
["result"])
461 elif task_create
["status"] == "enqueued" or task_create
["status"] == "processing":
462 return False, "Cannot delete net because still creating"
463 net_id
= task_create
["result"]
464 except Exception as e
:
465 return False, "Error trying to get task_id='{}':".format(net_id
, str(e
))
467 result
= self
.vim
.delete_network(net_id
)
470 self
.ovim
.delete_network(sdn_net_id
)
472 except vimconn
.vimconnException
as e
:
474 except ovimException
as e
:
475 logging
.error("Error deleting network from ovim. net_id: {}, sdn_net_id: {}".format(net_id
, sdn_net_id
))