1 # -*- coding: utf-8 -*-
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
12 # http://www.apache.org/licenses/LICENSE-2.0
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
25 This is thread that interact with the host and the libvirt to manage VM
26 One thread will be launched per host
28 __author__
= "Alfonso Tierno"
29 __date__
= "$10-feb-2017 12:07:15$"
36 from db_base
import db_base_Exception
39 # from logging import Logger
40 # import auxiliary_functions as af
44 return True if id[:5] == "TASK." else False
47 class vim_thread(threading
.Thread
):
49 def __init__(self
, vimconn
, task_lock
, name
=None, datacenter_name
=None, datacenter_tenant_id
=None, db
=None, db_lock
=None):
54 'host','user': host ip or name to manage and user
55 'db', 'db_lock': database class and lock to use it in exclusion
58 """ It will contain a dictionary with
60 status: enqueued,done,error,deleted,processing
63 threading
.Thread
.__init
__(self
)
65 self
.datacenter_name
= datacenter_name
66 self
.datacenter_tenant_id
= datacenter_tenant_id
68 self
.name
= vimconn
["id"] + "." + vimconn
["config"]["datacenter_tenant_id"]
72 self
.logger
= logging
.getLogger('openmano.vim.'+self
.name
)
74 self
.db_lock
= db_lock
76 self
.task_lock
= task_lock
77 self
.task_queue
= Queue
.Queue(2000)
79 def insert_task(self
, task
):
81 self
.task_queue
.put(task
, False)
84 raise vimconn
.vimconnException(self
.name
+ ": timeout inserting a task")
86 def del_task(self
, task
):
88 if task
["status"] == "enqueued":
89 task
["status"] == "deleted"
91 else: # task["status"] == "processing"
92 self
.task_lock
.release()
96 self
.logger
.debug("Starting")
100 if not self
.task_queue
.empty():
101 task
= self
.task_queue
.get()
102 self
.task_lock
.acquire()
103 if task
["status"] == "deleted":
104 self
.task_lock
.release()
106 task
["status"] == "processing"
107 self
.task_lock
.release()
112 self
.logger
.debug("processing task id={} name={} params={}".format(task
["id"], task
["name"],
113 str(task
["params"])))
114 if task
["name"] == 'exit' or task
["name"] == 'reload':
115 result
, content
= self
.terminate(task
)
116 elif task
["name"] == 'new-vm':
117 result
, content
= self
.new_vm(task
)
118 elif task
["name"] == 'del-vm':
119 result
, content
= self
.del_vm(task
)
120 elif task
["name"] == 'new-net':
121 result
, content
= self
.new_net(task
)
122 elif task
["name"] == 'del-net':
123 result
, content
= self
.del_net(task
)
125 error_text
= "unknown task {}".format(task
["name"])
126 self
.logger
.error(error_text
)
131 task
["status"] = "done" if result
else "error"
132 task
["result"] = content
133 self
.task_queue
.task_done()
135 if task
["name"] == 'exit':
137 elif task
["name"] == 'reload':
140 self
.logger
.debug("Finishing")
142 def terminate(self
, task
):
145 def _format_vim_error_msg(self
, error_text
):
146 if len(error_text
) >= 1024:
147 return error_text
[:516] + " ... " + error_text
[-500:]
150 def new_net(self
, task
):
153 params
= task
["params"]
154 net_id
= self
.vim
.new_network(*params
)
157 self
.db
.update_rows("instance_nets", UPDATE
={"vim_net_id": net_id
}, WHERE
={"vim_net_id": task_id
})
158 except db_base_Exception
as e
:
159 self
.logger
.error("Error updating database %s", str(e
))
161 except vimconn
.vimconnException
as e
:
162 self
.logger
.error("Error creating NET, task=%s: %s", str(task_id
), str(e
))
165 self
.db
.update_rows("instance_nets",
166 UPDATE
={"error_msg": self
._format
_vim
_error
_msg
(str(e
)), "status": "VIM_ERROR"},
167 WHERE
={"vim_net_id": task_id
})
168 except db_base_Exception
as e
:
169 self
.logger
.error("Error updating database %s", str(e
))
172 def new_vm(self
, task
):
174 params
= task
["params"]
176 depends
= task
.get("depends")
179 if is_task_id(net
["net_id"]): # change task_id into network_id
181 task_net
= depends
[net
["net_id"]]
183 if task_net
["status"] == "error":
184 return False, "Cannot create VM because depends on a network that cannot be created: " + \
185 str(task_net
["result"])
186 elif task_net
["status"] == "enqueued" or task_net
["status"] == "processing":
187 return False, "Cannot create VM because depends on a network still not created"
188 network_id
= task_net
["result"]
189 net
["net_id"] = network_id
190 except Exception as e
:
191 return False, "Error trying to map from task_id={} to task result: {}".format(net
["net_id"],
193 vm_id
= self
.vim
.new_vminstance(*params
)
196 self
.db
.update_rows("instance_vms", UPDATE
={"vim_vm_id": vm_id
}, WHERE
={"vim_vm_id": task_id
})
197 except db_base_Exception
as e
:
198 self
.logger
.error("Error updating database %s", str(e
))
200 except vimconn
.vimconnException
as e
:
201 self
.logger
.error("Error creating VM, task=%s: %s", str(task_id
), str(e
))
204 self
.db
.update_rows("instance_vms",
205 UPDATE
={"error_msg": self
._format
_vim
_error
_msg
(str(e
)), "status": "VIM_ERROR"},
206 WHERE
={"vim_vm_id": task_id
})
207 except db_base_Exception
as e
:
208 self
.logger
.error("Error updating database %s", str(e
))
211 def del_vm(self
, task
):
212 vm_id
= task
["params"]
213 if is_task_id(vm_id
):
215 task_create
= task
["depends"][vm_id
]
217 if task_create
["status"] == "error":
218 return True, "VM was not created. It has error: " + str(task_create
["result"])
219 elif task_create
["status"] == "enqueued" or task_create
["status"] == "processing":
220 return False, "Cannot delete VM because still creating"
221 vm_id
= task_create
["result"]
222 except Exception as e
:
223 return False, "Error trying to get task_id='{}':".format(vm_id
, str(e
))
225 return True, self
.vim
.delete_vminstance(vm_id
)
226 except vimconn
.vimconnException
as e
:
229 def del_net(self
, task
):
230 net_id
= task
["params"]
231 if is_task_id(net_id
):
233 task_create
= task
["depends"][net_id
]
235 if task_create
["status"] == "error":
236 return True, "net was not created. It has error: " + str(task_create
["result"])
237 elif task_create
["status"] == "enqueued" or task_create
["status"] == "processing":
238 return False, "Cannot delete net because still creating"
239 net_id
= task_create
["result"]
240 except Exception as e
:
241 return False, "Error trying to get task_id='{}':".format(net_id
, str(e
))
243 return True, self
.vim
.delete_network(net_id
)
244 except vimconn
.vimconnException
as e
: