d89c4d141af69984ed3838c01aa734682968a855
1 # -*- coding: utf-8 -*-
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
12 # http://www.apache.org/licenses/LICENSE-2.0
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
25 This is thread that interact with the host and the libvirt to manage VM
26 One thread will be launched per host
28 __author__
= "Alfonso Tierno, Pablo Montes"
29 __date__
= "$10-feb-2017 12:07:15$"
36 from db_base
import db_base_Exception
37 from openvim
.ovim
import ovimException
40 # from logging import Logger
41 # import auxiliary_functions as af
45 return True if id[:5] == "TASK." else False
48 class vim_thread(threading
.Thread
):
50 def __init__(self
, vimconn
, task_lock
, name
=None, datacenter_name
=None, datacenter_tenant_id
=None, db
=None, db_lock
=None, ovim
=None):
55 'host','user': host ip or name to manage and user
56 'db', 'db_lock': database class and lock to use it in exclusion
59 """ It will contain a dictionary with
61 status: enqueued,done,error,deleted,processing
64 threading
.Thread
.__init
__(self
)
66 self
.datacenter_name
= datacenter_name
67 self
.datacenter_tenant_id
= datacenter_tenant_id
70 self
.name
= vimconn
["id"] + "." + vimconn
["config"]["datacenter_tenant_id"]
74 self
.logger
= logging
.getLogger('openmano.vim.'+self
.name
)
76 self
.db_lock
= db_lock
78 self
.task_lock
= task_lock
79 self
.task_queue
= Queue
.Queue(2000)
81 def insert_task(self
, task
):
83 self
.task_queue
.put(task
, False)
86 raise vimconn
.vimconnException(self
.name
+ ": timeout inserting a task")
88 def del_task(self
, task
):
90 if task
["status"] == "enqueued":
91 task
["status"] == "deleted"
93 else: # task["status"] == "processing"
94 self
.task_lock
.release()
98 self
.logger
.debug("Starting")
102 if not self
.task_queue
.empty():
103 task
= self
.task_queue
.get()
104 self
.task_lock
.acquire()
105 if task
["status"] == "deleted":
106 self
.task_lock
.release()
108 task
["status"] == "processing"
109 self
.task_lock
.release()
114 self
.logger
.debug("processing task id={} name={} params={}".format(task
["id"], task
["name"],
115 str(task
["params"])))
116 if task
["name"] == 'exit' or task
["name"] == 'reload':
117 result
, content
= self
.terminate(task
)
118 elif task
["name"] == 'new-vm':
119 result
, content
= self
.new_vm(task
)
120 elif task
["name"] == 'del-vm':
121 result
, content
= self
.del_vm(task
)
122 elif task
["name"] == 'new-net':
123 result
, content
= self
.new_net(task
)
124 elif task
["name"] == 'del-net':
125 result
, content
= self
.del_net(task
)
127 error_text
= "unknown task {}".format(task
["name"])
128 self
.logger
.error(error_text
)
133 task
["status"] = "done" if result
else "error"
134 task
["result"] = content
135 self
.task_queue
.task_done()
137 if task
["name"] == 'exit':
139 elif task
["name"] == 'reload':
142 self
.logger
.debug("Finishing")
144 def terminate(self
, task
):
147 def _format_vim_error_msg(self
, error_text
):
148 if len(error_text
) >= 1024:
149 return error_text
[:516] + " ... " + error_text
[-500:]
152 def new_net(self
, task
):
155 params
= task
["params"]
156 net_id
= self
.vim
.new_network(*params
)
162 sdn_controller
= self
.vim
.config
.get('sdn-controller')
163 if sdn_controller
and (net_type
== "data" or net_type
== "ptp"):
164 network
= {"name": net_name
, "type": net_type
}
166 vim_net
= self
.vim
.get_network(net_id
)
167 if vim_net
.get('network_type') != 'vlan':
168 raise vimconn
.vimconnException(net_name
+ "defined as type " + net_type
+ " but the created network in vim is " + vim_net
['provider:network_type'])
170 network
["vlan"] = vim_net
.get('segmentation_id')
175 sdn_net_id
= self
.ovim
.new_network(network
)
176 self
.db
.update_rows("instance_nets", UPDATE
={"vim_net_id": net_id
, "sdn_net_id": sdn_net_id
}, WHERE
={"vim_net_id": task_id
})
179 except db_base_Exception
as e
:
180 self
.logger
.error("Error updating database %s", str(e
))
182 except vimconn
.vimconnException
as e
:
183 self
.logger
.error("Error creating NET, task=%s: %s", str(task_id
), str(e
))
186 self
.db
.update_rows("instance_nets",
187 UPDATE
={"error_msg": self
._format
_vim
_error
_msg
(str(e
)), "status": "VIM_ERROR"},
188 WHERE
={"vim_net_id": task_id
})
189 except db_base_Exception
as e
:
190 self
.logger
.error("Error updating database %s", str(e
))
192 except ovimException
as e
:
193 self
.logger
.error("Error creating NET in ovim, task=%s: %s", str(task_id
), str(e
))
196 def new_vm(self
, task
):
198 params
= task
["params"]
200 depends
= task
.get("depends")
203 if is_task_id(net
["net_id"]): # change task_id into network_id
205 task_net
= depends
[net
["net_id"]]
207 if task_net
["status"] == "error":
208 return False, "Cannot create VM because depends on a network that cannot be created: " + \
209 str(task_net
["result"])
210 elif task_net
["status"] == "enqueued" or task_net
["status"] == "processing":
211 return False, "Cannot create VM because depends on a network still not created"
212 network_id
= task_net
["result"]
213 net
["net_id"] = network_id
214 except Exception as e
:
215 return False, "Error trying to map from task_id={} to task result: {}".format(net
["net_id"],
217 vm_id
= self
.vim
.new_vminstance(*params
)
220 self
.db
.update_rows("instance_vms", UPDATE
={"vim_vm_id": vm_id
}, WHERE
={"vim_vm_id": task_id
})
221 except db_base_Exception
as e
:
222 self
.logger
.error("Error updating database %s", str(e
))
224 except vimconn
.vimconnException
as e
:
225 self
.logger
.error("Error creating VM, task=%s: %s", str(task_id
), str(e
))
228 self
.db
.update_rows("instance_vms",
229 UPDATE
={"error_msg": self
._format
_vim
_error
_msg
(str(e
)), "status": "VIM_ERROR"},
230 WHERE
={"vim_vm_id": task_id
})
231 except db_base_Exception
as e
:
232 self
.logger
.error("Error updating database %s", str(e
))
235 def del_vm(self
, task
):
236 vm_id
= task
["params"]
237 if is_task_id(vm_id
):
239 task_create
= task
["depends"][vm_id
]
241 if task_create
["status"] == "error":
242 return True, "VM was not created. It has error: " + str(task_create
["result"])
243 elif task_create
["status"] == "enqueued" or task_create
["status"] == "processing":
244 return False, "Cannot delete VM because still creating"
245 vm_id
= task_create
["result"]
246 except Exception as e
:
247 return False, "Error trying to get task_id='{}':".format(vm_id
, str(e
))
249 return True, self
.vim
.delete_vminstance(vm_id
)
250 except vimconn
.vimconnException
as e
:
253 def del_net(self
, task
):
254 net_id
= task
["params"][0]
255 sdn_net_id
= task
["params"][1]
256 if is_task_id(net_id
):
258 task_create
= task
["depends"][net_id
]
260 if task_create
["status"] == "error":
261 return True, "net was not created. It has error: " + str(task_create
["result"])
262 elif task_create
["status"] == "enqueued" or task_create
["status"] == "processing":
263 return False, "Cannot delete net because still creating"
264 net_id
= task_create
["result"]
265 except Exception as e
:
266 return False, "Error trying to get task_id='{}':".format(net_id
, str(e
))
268 result
= self
.vim
.delete_network(net_id
)
271 self
.ovim
.delete_network(sdn_net_id
)
273 except vimconn
.vimconnException
as e
:
275 except ovimException
as e
:
276 logging
.error("Error deleting network from ovim. net_id: {}, sdn_net_id: {}".format(net_id
, sdn_net_id
))