1 # -*- coding: utf-8 -*-
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
12 # http://www.apache.org/licenses/LICENSE-2.0
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
"""
This is the thread that interacts with the host and libvirt to manage VMs.
One thread is launched per host.
"""
28 __author__
= "Alfonso Tierno"
29 __date__
= "$10-feb-2017 12:07:15$"
36 from db_base
import db_base_Exception
39 # from logging import Logger
40 # import auxiliary_functions as af
def is_task_id(id):
    """Tell whether *id* is a placeholder task identifier instead of a real VIM id.

    Task identifiers are recognised by their "TASK." prefix.

    :param id: identifier to check (any sliceable value, normally a string)
    :return: True when the first five characters are exactly "TASK.", False otherwise
    """
    # The comparison already yields a bool; no conditional expression is needed.
    return id[:5] == "TASK."
class vim_thread(threading.Thread):
    """Worker thread that executes queued VIM tasks one at a time.

    A task is a dictionary. The keys used by this class are:
        "id":      task identifier (placeholder ids start with "TASK.")
        "name":    one of 'new-net', 'del-net', 'new-vm', 'del-vm', 'reload', 'exit'
        "params":  arguments for the VIM connector call
        "depends": optional dictionary task_id -> task this one depends on
        "status":  enqueued, processing, done, error or deleted
        "result":  filled in by run() with the content returned by the handler

    Every task handler returns a tuple (result, content): result is True on
    success and False on error; content is the VIM result or an error text.
    """

    def __init__(self, vimconn, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None, db=None,
                 db_lock=None):
        """Init a thread.

        :param vimconn: VIM connector. It is indexed with "id" and "config" to build the default
            thread name, and its methods are called by the task handlers.
        :param task_lock: lock that protects the "status"/"result" fields of the tasks
        :param name: name of the thread; when empty it is built from the connector data
        :param datacenter_name: stored as an attribute, not used by this class
        :param datacenter_tenant_id: stored as an attribute, not used by this class
        :param db: database class, used to store the identifiers returned by the VIM
        :param db_lock: lock to use the database in exclusion
        """
        # It will contain a dictionary with, per task,
        #     status: enqueued,done,error,deleted,processing
        # NOTE(review): no method of this class reads this attribute; its name is assumed - confirm.
        self.tasksResult = {}
        threading.Thread.__init__(self)
        # The handlers call the connector through self.vim (new_network, new_vminstance, ...).
        self.vim = vimconn
        self.datacenter_name = datacenter_name
        self.datacenter_tenant_id = datacenter_tenant_id
        # NOTE(review): assumes an explicit 'name' takes precedence over the generated one - confirm.
        if not name:
            self.name = vimconn["id"] + "." + vimconn["config"]["datacenter_tenant_id"]
        else:
            self.name = name

        self.logger = logging.getLogger('openmano.vim.'+self.name)
        self.db = db
        self.db_lock = db_lock

        self.task_lock = task_lock
        self.task_queue = Queue.Queue(2000)

    def insert_task(self, task):
        """Enqueue a task without blocking.

        :param task: task dictionary to be processed by run()
        :return: the task "id"  (NOTE(review): success value assumed - confirm with the callers)
        :raise vimconn.vimconnException: when the queue is full
        """
        try:
            self.task_queue.put(task, False)
        except Queue.Full:
            raise vimconn.vimconnException(self.name + ": timeout inserting a task")
        return task["id"]

    def del_task(self, task):
        """Cancel a task that has not been started yet.

        :param task: task dictionary previously inserted
        :return: True if the task was still enqueued and is now marked as "deleted";
            False if it cannot be cancelled any more (e.g. it is being processed)
        """
        # The 'with' block acquires and releases the lock exactly once on every path.
        with self.task_lock:
            if task["status"] == "enqueued":
                task["status"] = "deleted"  # assignment: run() skips tasks in this state
                return True
            return False   # task["status"] == "processing"

    def run(self):
        """Main loop: take tasks from the queue and execute them until an 'exit' task arrives."""
        self.logger.debug("Starting")
        handlers = {
            'exit': self.terminate,
            'reload': self.terminate,
            'new-vm': self.new_vm,
            'del-vm': self.del_vm,
            'new-net': self.new_net,
            'del-net': self.del_net,
        }
        while True:
            # Blocking get: waits for the next task instead of polling empty().
            task = self.task_queue.get()
            with self.task_lock:
                skip = task["status"] == "deleted"
                if not skip:
                    task["status"] = "processing"
            if skip:
                # Cancelled by del_task(); still balance the get() so Queue.join() can finish.
                self.task_queue.task_done()
                continue

            self.logger.debug("processing task id={} name={} params={}".format(task["id"], task["name"],
                                                                                 str(task["params"])))
            handler = handlers.get(task["name"])
            if handler:
                result, content = handler(task)
            else:
                error_text = "unknown task {}".format(task["name"])
                self.logger.error(error_text)
                result = False
                content = error_text

            with self.task_lock:
                task["status"] = "done" if result else "error"
                task["result"] = content
            self.task_queue.task_done()

            if task["name"] == 'exit':
                break
            # NOTE(review): 'reload' has no specific handling here (TODO reload service);
            # the loop simply goes on with the next task - confirm the intended behavior.

        self.logger.debug("Finishing")

    def terminate(self, task):
        """Handler of the 'exit' and 'reload' tasks.

        :return: (True, None)  (NOTE(review): body assumed to be a no-op success - confirm)
        """
        return True, None

    def new_net(self, task):
        """Create a network at the VIM and store its identifier at the database.

        :param task: task whose "params" are the positional arguments of vim.new_network
        :return: (True, net_id) if the network is created, even if the database update fails;
            (False, error text) if the VIM connector fails
        """
        task_id = task["id"]
        params = task["params"]
        try:
            net_id = self.vim.new_network(*params)
        except vimconn.vimconnException as e:
            return False, str(e)
        # Replace the placeholder task id stored at the database by the real VIM id.
        # A database failure is only logged, because the network already exists at the VIM.
        try:
            with self.db_lock:
                self.db.update_rows("instance_nets", UPDATE={"vim_net_id": net_id}, WHERE={"vim_net_id": task_id})
        except db_base_Exception as e:
            self.logger.error("Error updating database %s", str(e))
        return True, net_id

    def new_vm(self, task):
        """Create a VM at the VIM and store its identifier at the database.

        Networks referenced by a task id are first replaced by the network identifier
        produced by the task they depend on.

        :param task: task whose "params" are the positional arguments of vim.new_vminstance
            and whose optional "depends" maps task ids to the tasks that create the networks
        :return: (True, vm_id) if the VM is created, even if the database update fails;
            (False, error text) if a dependency is not solved or the VIM connector fails
        """
        params = task["params"]
        task_id = task["id"]
        depends = task.get("depends")
        # NOTE(review): assumes the list of networks is the sixth positional argument of
        # new_vminstance (name, description, start, image_id, flavor_id, net_list) - confirm.
        net_list = params[5]
        for net in net_list:
            if not is_task_id(net["net_id"]):
                continue
            # change task_id into network_id
            try:
                task_net = depends[net["net_id"]]
                with self.task_lock:
                    if task_net["status"] == "error":
                        return False, "Cannot create VM because depends on a network that cannot be created: " + \
                               str(task_net["result"])
                    elif task_net["status"] == "enqueued" or task_net["status"] == "processing":
                        return False, "Cannot create VM because depends on a network still not created"
                    network_id = task_net["result"]
                net["net_id"] = network_id
            except Exception as e:
                return False, "Error trying to map from task_id={} to task result: {}".format(net["net_id"],
                                                                                              str(e))
        try:
            vm_id = self.vim.new_vminstance(*params)
        except vimconn.vimconnException as e:
            return False, str(e)
        # A database failure is only logged, because the VM already exists at the VIM.
        try:
            with self.db_lock:
                self.db.update_rows("instance_vms", UPDATE={"vim_vm_id": vm_id}, WHERE={"vim_vm_id": task_id})
        except db_base_Exception as e:
            self.logger.error("Error updating database %s", str(e))
        return True, vm_id

    def _resolve_created_id(self, task, item_id, what):
        """Translate a task id into the VIM id produced by the creation task.

        :param task: deletion task; task["depends"] maps task ids to creation tasks
        :param item_id: identifier to delete, either a VIM id or a task id
        :param what: text for the messages, "VM" or "net"
        :return: tuple (vim_id, outcome). outcome is None when vim_id can be deleted;
            otherwise it is the (result, content) tuple the deletion handler must return.
        """
        if not is_task_id(item_id):
            return item_id, None
        try:
            task_create = task["depends"][item_id]
            with self.task_lock:
                if task_create["status"] == "error":
                    # Nothing was created, so there is nothing to delete: report success.
                    return None, (True, what + " was not created. It has error: " + str(task_create["result"]))
                elif task_create["status"] == "enqueued" or task_create["status"] == "processing":
                    return None, (False, "Cannot delete " + what + " because still creating")
                return task_create["result"], None
        except Exception as e:
            # Two placeholders: the exception text is part of the message.
            return None, (False, "Error trying to get task_id='{}': {}".format(item_id, str(e)))

    def del_vm(self, task):
        """Delete a VM at the VIM.

        :param task: task whose "params" is the VM identifier, or the id of the task that creates it
        :return: (True, content returned by vim.delete_vminstance) on success;
            (True, text) if the VM was never created; (False, error text) otherwise
        """
        vm_id, outcome = self._resolve_created_id(task, task["params"], "VM")
        if outcome is not None:
            return outcome
        try:
            return True, self.vim.delete_vminstance(vm_id)
        except vimconn.vimconnException as e:
            return False, str(e)

    def del_net(self, task):
        """Delete a network at the VIM.

        :param task: task whose "params" is the network identifier, or the id of the task that creates it
        :return: (True, content returned by vim.delete_network) on success;
            (True, text) if the network was never created; (False, error text) otherwise
        """
        net_id, outcome = self._resolve_created_id(task, task["params"], "net")
        if outcome is not None:
            return outcome
        try:
            return True, self.vim.delete_network(net_id)
        except vimconn.vimconnException as e:
            return False, str(e)