676ba207e28f881f4825c2a3be0d2a055466f03d
1 # -*- coding: utf-8 -*-
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
12 # http://www.apache.org/licenses/LICENSE-2.0
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
"""
This is the thread that interacts with the host and libvirt to manage VMs.
One thread is launched per host.
"""
# Module metadata: authorship and date stamp.
__author__ = "Alfonso Tierno, Pablo Montes"
__date__ = "$10-feb-2017 12:07:15$"
36 from db_base
import db_base_Exception
37 #from openvim.ovim import ovimException
40 # from logging import Logger
41 # import auxiliary_functions as af
def is_task_id(id):
    """Tell whether an identifier is a task placeholder instead of a real id.

    Task identifiers are recognised by the literal prefix "TASK.".

    :param id: identifier to check (any sliceable value, normally a string)
    :return: True when the first five characters are exactly "TASK."
    """
    # The comparison already yields a bool; no conditional expression needed.
    return id[:5] == "TASK."
class vim_thread(threading.Thread):
    """Worker thread that executes, one at a time, the tasks queued for a VIM.

    Tasks are plain dictionaries pushed with insert_task() and consumed by
    run(). The keys used by this class are:
        "id":      task identifier, returned by insert_task()
        "name":    'new-net', 'del-net', 'new-vm', 'del-vm', 'reload' or 'exit'
        "params":  arguments of the operation
        "depends": (optional) dictionary task_id -> task this one depends on
        "status":  enqueued, processing, done, error or deleted
        "result":  written by run() with the content returned by the handler
    """

    def __init__(self, vimconn, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None,
                 db=None, db_lock=None, ovim=None):
        """Init a thread.

        Arguments:
            'vimconn': VIM connector, stored as self.vim. When 'name' is not
                given it is also read as vimconn["id"] and
                vimconn["config"]["datacenter_tenant_id"] to build the name
            'task_lock': lock held while the status/result of a task is
                read or modified
            'name': name of the thread; also used as logger suffix
            'datacenter_name', 'datacenter_tenant_id': stored as attributes
            'db', 'db_lock': database class and lock to use it in exclusion
            'ovim': object used to create/delete the SDN networks
        """
        threading.Thread.__init__(self)
        # It will contain a dictionary with
        #     task_id:
        #         status: enqueued,done,error,deleted,processing
        #         result: VIM result,
        self.tasksResult = {}
        self.vim = vimconn
        self.datacenter_name = datacenter_name
        self.datacenter_tenant_id = datacenter_tenant_id
        self.ovim = ovim
        if not name:
            self.name = vimconn["id"] + "." + vimconn["config"]["datacenter_tenant_id"]
        else:
            self.name = name

        self.logger = logging.getLogger('openmano.vim.' + self.name)
        self.db = db
        self.db_lock = db_lock

        self.task_lock = task_lock
        self.task_queue = Queue.Queue(2000)

    def insert_task(self, task):
        """Enqueue a task without blocking.

        :param task: task dictionary (see class documentation)
        :return: the content of task["id"]
        :raises vimconn.vimconnException: when the queue is full
        """
        try:
            self.task_queue.put(task, False)
            return task["id"]
        except Queue.Full:
            raise vimconn.vimconnException(self.name + ": timeout inserting a task")

    def del_task(self, task):
        """Cancel a task that has not been started yet.

        :param task: task dictionary previously passed to insert_task()
        :return: True if the task was still "enqueued" and has been marked as
            "deleted" (run() skips it); False otherwise
        """
        # The 'with' statement releases the lock on every exit path, so no
        # explicit release() is needed (a second release would raise).
        with self.task_lock:
            if task["status"] == "enqueued":
                task["status"] = "deleted"
                return True
            # task["status"] == "processing" (or already finished)
            return False

    def run(self):
        """Main loop: take tasks from the queue and dispatch them by name.

        For every task the status is set to "processing", the handler is
        called and its (result, content) pair is stored as
        task["status"] ("done"/"error") and task["result"]. Tasks marked as
        "deleted" are discarded. The loop ends after an 'exit' task.

        :return: 0 when the thread finishes
        """
        self.logger.debug("Starting")
        handlers = {
            'exit': self.terminate,
            'reload': self.terminate,
            'new-vm': self.new_vm,
            'del-vm': self.del_vm,
            'new-net': self.new_net,
            'del-net': self.del_net,
        }
        while True:
            # Blocking get: sleeps until a task is available instead of
            # polling empty() periodically.
            task = self.task_queue.get()
            with self.task_lock:
                discarded = task["status"] == "deleted"
                if not discarded:
                    task["status"] = "processing"
            if discarded:
                # every get() is paired with task_done() so that
                # task_queue.join() cannot hang on cancelled tasks
                self.task_queue.task_done()
                continue

            self.logger.debug("processing task id={} name={} params={}".format(task["id"], task["name"],
                                                                               str(task["params"])))
            handler = handlers.get(task["name"])
            if handler is None:
                content = "unknown task {}".format(task["name"])
                self.logger.error(content)
                result = False
            else:
                try:
                    result, content = handler(task)
                except Exception as e:
                    # An unexpected failure must not kill the worker nor leave
                    # the task in "processing" forever.
                    self.logger.exception("Unexpected error processing task id=%s", str(task["id"]))
                    result, content = False, str(e)

            with self.task_lock:
                task["status"] = "done" if result else "error"
                task["result"] = content
            self.task_queue.task_done()

            if task["name"] == 'exit':
                break
            # 'reload' only acknowledges the task; the loop keeps serving

        self.logger.debug("Finishing")
        return 0

    def terminate(self, task):
        """Handler of the 'exit' and 'reload' tasks; nothing to clean up.

        :return: (True, None)
        """
        return True, None

    def _format_vim_error_msg(self, error_text):
        """Shorten an error text so that it stays below 1024 characters.

        Texts of 1024 characters or more are reduced to the first 516
        characters, the separator " ... " and the last 500 characters.
        """
        if len(error_text) >= 1024:
            return error_text[:516] + " ... " + error_text[-500:]
        return error_text

    def new_net(self, task):
        """Create a network at the VIM and record its identifier at database.

        task["params"] is passed as positional arguments to
        self.vim.new_network(); the first two items are read as the network
        name and type. The row of "instance_nets" whose vim_net_id is the task
        id is updated with the real identifiers, or with the error message and
        status VIM_ERROR when the VIM fails.

        :return: (True, net_id) on success, also when only the database update
            fails; (False, error text) when the VIM fails
        """
        task_id = task["id"]
        net_id = None
        try:
            params = task["params"]
            net_id = self.vim.new_network(*params)

            net_name = params[0]
            net_type = params[1]

            network = None
            # NOTE: SDN handling is disabled while sdn_controller is forced to None
            # sdn_controller = self.vim.config.get('sdn-controller')
            sdn_controller = None
            if sdn_controller and net_type in ("data", "ptp"):
                network = {"name": net_name, "type": net_type}

                vim_net = self.vim.get_network(net_id)
                encapsulation = vim_net.get('encapsulation')
                if encapsulation != 'vlan':
                    # str(): the value can be missing (None) in this branch
                    raise vimconn.vimconnException(
                        net_name + " defined as type " + net_type +
                        " but the created network in vim is " + str(encapsulation))

                network["vlan"] = vim_net.get('segmentation_id')

            sdn_net_id = None
            with self.db_lock:
                if network:
                    sdn_net_id = self.ovim.new_network(network)
                self.db.update_rows("instance_nets", UPDATE={"vim_net_id": net_id, "sdn_net_id": sdn_net_id},
                                    WHERE={"vim_net_id": task_id})

            return True, net_id
        except db_base_Exception as e:
            # the network exists at the VIM, so the task is still a success
            self.logger.error("Error updating database %s", str(e))
            return True, net_id
        except vimconn.vimconnException as e:
            self.logger.error("Error creating NET, task=%s: %s", str(task_id), str(e))
            try:
                with self.db_lock:
                    self.db.update_rows("instance_nets",
                                        UPDATE={"error_msg": self._format_vim_error_msg(str(e)),
                                                "status": "VIM_ERROR"},
                                        WHERE={"vim_net_id": task_id})
            except db_base_Exception as db_error:
                # distinct name: 'e' is still needed for the return below
                self.logger.error("Error updating database %s", str(db_error))
            return False, str(e)
        # except ovimException as e:
        #     self.logger.error("Error creating NET in ovim, task=%s: %s", str(task_id), str(e))
        #     return False, str(e)

    def new_vm(self, task):
        """Create a VM at the VIM and record its identifier at database.

        Before calling self.vim.new_vminstance(*task["params"]) every
        interface whose "net_id" is a task id is rewritten, in place, with the
        result of the network task found at task["depends"].

        :return: (True, vm_id) on success, also when only the database update
            fails; (False, error text) when a needed network is not available
            or the VIM fails
        """
        task_id = task["id"]
        try:
            params = task["params"]
            depends = task.get("depends")
            # interface list: item 5 of the new_vminstance positional arguments
            net_list = params[5]
            for net in net_list:
                if not is_task_id(net["net_id"]):
                    continue
                # change task_id into network_id
                try:
                    task_net = depends[net["net_id"]]
                    with self.task_lock:
                        if task_net["status"] == "error":
                            return False, "Cannot create VM because depends on a network that cannot be created: " + \
                                   str(task_net["result"])
                        elif task_net["status"] == "enqueued" or task_net["status"] == "processing":
                            return False, "Cannot create VM because depends on a network still not created"
                        network_id = task_net["result"]
                    net["net_id"] = network_id
                except Exception as e:
                    return False, "Error trying to map from task_id={} to task result: {}".format(net["net_id"],
                                                                                                  str(e))
            vm_id = self.vim.new_vminstance(*params)
            try:
                with self.db_lock:
                    self.db.update_rows("instance_vms", UPDATE={"vim_vm_id": vm_id}, WHERE={"vim_vm_id": task_id})
            except db_base_Exception as e:
                # the VM exists at the VIM, so the task is still a success
                self.logger.error("Error updating database %s", str(e))
            return True, vm_id
        except vimconn.vimconnException as e:
            self.logger.error("Error creating VM, task=%s: %s", str(task_id), str(e))
            try:
                with self.db_lock:
                    self.db.update_rows("instance_vms",
                                        UPDATE={"error_msg": self._format_vim_error_msg(str(e)),
                                                "status": "VIM_ERROR"},
                                        WHERE={"vim_vm_id": task_id})
            except db_base_Exception as db_error:
                # distinct name: 'e' is still needed for the return below
                self.logger.error("Error updating database %s", str(db_error))
            return False, str(e)

    def del_vm(self, task):
        """Delete a VM from the VIM.

        task["params"] is the VM identifier. When it is a task id, the real
        identifier is taken from the creation task at task["depends"].

        :return: (True, content returned by the VIM) on success; (True, text)
            when the VM was never created; (False, error text) otherwise
        """
        vm_id = task["params"]
        if is_task_id(vm_id):
            try:
                task_create = task["depends"][vm_id]
                with self.task_lock:
                    if task_create["status"] == "error":
                        return True, "VM was not created. It has error: " + str(task_create["result"])
                    elif task_create["status"] == "enqueued" or task_create["status"] == "processing":
                        return False, "Cannot delete VM because still creating"
                    vm_id = task_create["result"]
            except Exception as e:
                return False, "Error trying to get task_id='{}': {}".format(vm_id, str(e))
        try:
            return True, self.vim.delete_vminstance(vm_id)
        except vimconn.vimconnException as e:
            return False, str(e)

    def del_net(self, task):
        """Delete a network from the VIM.

        task["params"][0] is the network identifier. When it is a task id, the
        real identifier is taken from the creation task at task["depends"].

        :return: (True, content returned by the VIM) on success; (True, text)
            when the network was never created; (False, error text) otherwise
        """
        net_id = task["params"][0]
        # NOTE: SDN handling is disabled while sdn_net_id is forced to None
        # sdn_net_id = task["params"][1]
        sdn_net_id = None
        if is_task_id(net_id):
            try:
                task_create = task["depends"][net_id]
                with self.task_lock:
                    if task_create["status"] == "error":
                        return True, "net was not created. It has error: " + str(task_create["result"])
                    elif task_create["status"] == "enqueued" or task_create["status"] == "processing":
                        return False, "Cannot delete net because still creating"
                    net_id = task_create["result"]
            except Exception as e:
                return False, "Error trying to get task_id='{}': {}".format(net_id, str(e))
        try:
            result = self.vim.delete_network(net_id)
            if sdn_net_id:
                with self.db_lock:
                    self.ovim.delete_network(sdn_net_id)
            return True, result
        except vimconn.vimconnException as e:
            return False, str(e)
        # except ovimException as e:
        #     logging.error("Error deleting network from ovim. net_id: {}, sdn_net_id: {}".format(net_id, sdn_net_id))
        #     return False, str(e)