Feature 7184 New Generation RO
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
"""
This is the thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
The tasks are stored in the database, in table ro_tasks.
A single ro_task refers to a VIM element (flavor, image, network, ...).
A ro_task can contain several 'tasks', each one with a target where the results are stored.
"""
26
27 import threading
28 import time
29 import queue
30 import logging
31 from pkg_resources import iter_entry_points
32 # from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version
33 from osm_common.dbbase import DbException
34 # from osm_common.fsbase import FsException
35 # from osm_common.msgbase import MsgException
36 from osm_ro_plugin.vim_dummy import VimDummyConnector
37 from osm_ro_plugin import vimconn
38 from copy import deepcopy
39 from unittest.mock import Mock
40
41 __author__ = "Alfonso Tierno"
42 __date__ = "$28-Sep-2017 12:07:15$"
43
44
def deep_get(target_dict, *args, **kwargs):
    """
    Walk a nested dictionary following a sequence of keys.
    Example: with target_dict={a: {b: 5}}, keys (a, b) give 5; keys (a, b, c) or (f, h) give the default
    :param target_dict: dictionary to be read
    :param args: keys to follow, outermost first
    :param kwargs: may carry default=value, returned when the path cannot be followed (None if omitted)
    :return: the value found at the end of the path; the default (or None) otherwise
    """
    node = target_dict
    for step in args:
        can_descend = isinstance(node, dict) and step in node
        if not can_descend:
            return kwargs.get("default")
        node = node[step]
    return node
59
60
class NsWorkerException(Exception):
    """Error raised by the NS worker; NsWorkerExceptionNotFound derives from it."""
63
64
class FailingConnector:
    """Stand-in connector: every public attribute of vimconn.VimConnector becomes a call that raises
    vimconn.VimConnException(error_msg)."""

    def __init__(self, error_msg):
        # keep the message so that callers can report why the real connector is not available
        self.error_msg = error_msg
        public_names = [name for name in dir(vimconn.VimConnector) if name[0] != "_"]
        for name in public_names:
            failing_call = Mock(side_effect=vimconn.VimConnException(error_msg))
            setattr(self, name, failing_call)
71
72
class NsWorkerExceptionNotFound(NsWorkerException):
    """Raised by the worker when a lookup at the VIM (e.g. an image or a network) gives no valid result."""
75
76
class NsWorker(threading.Thread):
    """Thread that reads ro_tasks from the database and executes them against the loaded VIM connectors."""
    # Seconds until the next refresh of a VIM element, selected by _update_refresh from the task status:
    # BUILD -> REFRESH_BUILD, DONE -> REFRESH_ACTIVE, any other status -> REFRESH_ERROR.
    # Images and flavors always use REFRESH_IMAGE.
    REFRESH_BUILD = 5  # 5 seconds
    REFRESH_ACTIVE = 60  # 1 minute
    REFRESH_ERROR = 600
    REFRESH_IMAGE = 3600 * 10
    REFRESH_DELETE = 3600 * 10
    # maximum number of entries of the in-memory task queue created at __init__
    QUEUE_SIZE = 2000
    # TODO delete assigment_lock = Lock()
    # NOTE(review): this attribute is rebound by the method 'terminate' defined later in the class body,
    # so the value False is never visible on the finished class
    terminate = False
    # TODO delete assignment = {}
    # seconds after which a lock on a ro_task is considered expired (see the filter used by _get_db_task)
    MAX_TIME_LOCKED = 3600
88
def __init__(self, worker, config, plugins, db):
    """
    Init a worker thread.
    :param worker: identifier of this worker; used for the logger name and, together with
        config["process_id"], to build the id written at 'locked_by' when a ro_task is locked
    :param config: configuration dictionary; it must contain "process_id"
    :param plugins: dictionary of already loaded connector plugins indexed by plugin name; _load_plugin adds
        entries to it
    :param db: database object used to read and update the tables ro_tasks and vim_accounts
    """
    threading.Thread.__init__(self)
    self.config = config
    self.plugins = plugins
    self.plugin_name = "unknown"
    # one logger per worker; formatting the literal "worker" gave every thread the name 'ro.workerworker'
    self.logger = logging.getLogger('ro.worker{}'.format(worker))
    self.worker_id = worker
    self.task_queue = queue.Queue(self.QUEUE_SIZE)
    # del_task protects the change of a task status with this lock; it was used but never created
    self.task_lock = threading.Lock()
    self.my_vims = {}   # targetvim: vimplugin class
    self.db_vims = {}   # targetvim: vim information from database
    self.vim_targets = []   # targetvim list
    self.my_id = config["process_id"] + ":" + str(worker)
    self.db = db
    # dispatch tables: task item -> method that performs the action for that kind of item
    self.item2create = {
        "net": self.new_net,
        "vdu": self.new_vm,
        "image": self.new_image,
        "flavor": self.new_flavor,
    }
    self.item2refresh = {
        "net": self.refresh_net,
        "vdu": self.refresh_vm,
        "image": self.refresh_ok,
        "flavor": self.refresh_ok,
    }
    self.item2delete = {
        "net": self.del_net,
        "vdu": self.del_vm,
        "image": self.delete_ok,
        "flavor": self.del_flavor,
    }
    self.item2action = {
        "vdu": self.exec_vm,
    }
    # reference time used by _get_db_task to select the ro_tasks that are due
    self.time_last_task_processed = None
131
def insert_task(self, task):
    """
    Put a task at the worker queue without blocking.
    :param task: item to enqueue
    :return: None
    :raises NsWorkerException: when the queue is full
    """
    try:
        self.task_queue.put(task, False)
    except queue.Full:
        raise NsWorkerException("timeout inserting a task")
    else:
        return None
138
def terminate(self):
    """Queue the "exit" command for this worker."""
    exit_command = "exit"
    self.insert_task(exit_command)
141
def del_task(self, task):
    """
    Supersede a task that has not been started yet.
    :param task: task dictionary; its "status" is read and, if "SCHEDULED", changed to "SUPERSEDED"
    :return: True if the task has been superseded, False if it was in any other status
    """
    # The 'with' statement releases the lock on every exit path. Releasing it by hand as well made the
    # implicit release fail with RuntimeError (release of an unlocked lock) whenever False was returned
    with self.task_lock:
        if task["status"] == "SCHEDULED":
            task["status"] = "SUPERSEDED"
            return True
        return False   # task["status"] == "processing"
150
def _load_plugin(self, name, type="vim"):
    """
    Get the connector plugin registered as 'name', loading it from the entry points the first time.
    :param name: plugin name, e.g. "rovim_dummy"
    :param type: "vim" or "sdn"; selects the entry point group 'osm_ro<type>.plugins'
    :return: the object stored at self.plugins[name]. When the plugin cannot be loaded this is a
        FailingConnector instance carrying the error text at 'error_msg'
    """
    # type can be vim or sdn
    if "rovim_dummy" not in self.plugins:
        self.plugins["rovim_dummy"] = VimDummyConnector
    if name in self.plugins:
        return self.plugins[name]
    try:
        for v in iter_entry_points('osm_ro{}.plugins'.format(type), name):
            self.plugins[name] = v.load()
    except Exception as e:
        self.logger.critical("Cannot load osm_{}: {}".format(name, e))
        if name:
            self.plugins[name] = FailingConnector("Cannot load osm_{}: {}".format(name, e))
    if name not in self.plugins:
        error_text = "Cannot load a module for {t} type '{n}'. The plugin 'osm_{n}' has not been" \
                     " registered".format(t=type, n=name)
        self.logger.critical(error_text)
        failing = FailingConnector(error_text)
        if not name:
            # An empty name is never cached (as before), but it must not end in a KeyError at the final
            # lookup: report the failure through the same FailingConnector mechanism
            return failing
        self.plugins[name] = failing

    return self.plugins[name]
171
def _load_vim(self, vim_account_id):
    """
    Read a VIM account from the database and create the connector for it.

    On success the connector is stored at self.my_vims["vim:<id>"], the account at self.db_vims, the target
    is appended to self.vim_targets and self.error_status is cleared. On any failure a FailingConnector is
    stored instead and self.error_status contains the error.
    :param vim_account_id: _id of the entry at table vim_accounts
    :return: None
    """
    target_id = "vim:" + vim_account_id
    plugin_name = ""
    vim = None
    try:
        # 'step' names the operation in progress; it is included in the error log
        step = "Getting vim={} from db".format(vim_account_id)
        vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})

        # if deep_get(vim, "config", "sdn-controller"):
        #     step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
        #     db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})

        step = "Decrypt password"
        schema_version = vim.get("schema_version")
        self.db.encrypt_decrypt_fields(vim, "decrypt", fields=('password', 'secret'),
                                       schema_version=schema_version, salt=vim_account_id)

        step = "Load plugin 'rovim_{}'".format(vim.get("vim_type"))
        plugin_name = "rovim_" + vim["vim_type"]
        vim_module_conn = self._load_plugin(plugin_name)
        if isinstance(vim_module_conn, FailingConnector):
            # _load_plugin reports a failure with a FailingConnector *instance*, which is not callable.
            # Calling it below would replace the real cause by a "not callable" TypeError
            raise NsWorkerException(vim_module_conn.error_msg)
        self.my_vims[target_id] = vim_module_conn(
            uuid=vim['_id'], name=vim['name'],
            tenant_id=vim.get('vim_tenant_id'), tenant_name=vim.get('vim_tenant_name'),
            url=vim['vim_url'], url_admin=None,
            user=vim['vim_user'], passwd=vim['vim_password'],
            config=vim.get('config'), persistent_info={}
        )
        self.vim_targets.append(target_id)
        self.db_vims[target_id] = vim
        self.error_status = None
        self.logger.info("Vim Connector loaded for vim_account={}, plugin={}".format(
            vim_account_id, plugin_name))
    except Exception as e:
        self.logger.error("Cannot load vimconnector for vim_account={} plugin={}: {} {}".format(
            vim_account_id, plugin_name, step, e))
        # keep whatever was read from the database and install a connector that fails on every call
        self.db_vims[target_id] = vim or {}
        self.my_vims[target_id] = FailingConnector(str(e))
        self.error_status = "Error loading vimconnector: {}".format(e)
210
def _get_db_task(self):
    """
    Lock one pending ro_task of the managed VIM targets at the database and read it.

    The candidate must have a task in status SCHEDULED, BUILD, DONE or FAILED and match the 'locked_at.lt'
    and 'to_check_at.lt' filters (presumably "less than"; confirm against the database driver).
    When nothing is locked with the previous reference time, a second attempt is done using the current
    time as reference.
    :return: the locked ro_task, or None if there is nothing to process or the database fails
    """
    now = time.time()
    if not self.time_last_task_processed:
        self.time_last_task_processed = now
    try:
        while True:
            lock_filter = {
                "target_id": self.vim_targets,
                "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
                "locked_at.lt": now - self.MAX_TIME_LOCKED,
                "to_check_at.lt": self.time_last_task_processed,
            }
            lock_mark = {"locked_by": self.my_id, "locked_at": now}
            locked = self.db.set_one("ro_tasks", q_filter=lock_filter, update_dict=lock_mark,
                                     fail_on_empty=False)
            if locked:
                # read and return; the entry just locked is recognized by its 'locked_at' value
                read_filter = {
                    "target_id": self.vim_targets,
                    "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
                    "locked_at": now,
                }
                return self.db.get_one("ro_tasks", q_filter=read_filter)
            if self.time_last_task_processed == now:
                # already tried with the current time: nothing is pending
                self.time_last_task_processed = None
                return None
            self.time_last_task_processed = now
            # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)

    except DbException as e:
        self.logger.error("Database exception at _get_db_task: {}".format(e))
    except Exception as e:
        self.logger.critical("Unexpected exception at _get_db_task: {}".format(e), exc_info=True)
    return None
249
def _delete_task(self, ro_task, task_index, task_depends, db_update):
    """
    Decide whether a DELETE task must really delete the VIM element or is just superseded, and do it.

    CREATE tasks of the same target_record are marked as FINISHED (in memory and at db_update). The element
    is deleted only if it was created by RO and no other CREATE task still needs it.
    :param ro_task: ro_task that contains the task
    :param task_index: position of the DELETE task at ro_task["tasks"]
    :param task_depends: not used here
    :param db_update: dictionary where the status changes of other tasks are written
    :return: (None, None) if the task is FAILED; ("SUPERSEDED", None) if nothing has to be deleted;
        ("FAILED", vim_info update) on error; otherwise the result of the delete method of the item
    """
    own_task = ro_task["tasks"][task_index]
    task_id = own_task["task_id"]
    must_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get("created_items", False)
    if own_task["status"] == "FAILED":
        return None, None  # TODO need to be retry??
    try:
        for other_index, other in enumerate(ro_task["tasks"]):
            if other_index == task_index:
                continue  # own task
            same_target = own_task["target_record"] == other["target_record"]
            is_create = other["action"] == "CREATE"
            if same_target and is_create:
                # set to finished
                db_update["tasks.{}.status".format(other_index)] = "FINISHED"
                other["status"] = "FINISHED"
            elif is_create and other["status"] not in ("FINISHED", "SUPERSEDED"):
                # another CREATE task still uses the element
                must_delete = False
        if not must_delete:
            return "SUPERSEDED", None
        return self.item2delete[own_task["item"]](ro_task, task_index)
    except Exception as e:
        if not isinstance(e, NsWorkerException):
            self.logger.critical("Unexpected exception at _delete_task task={}: {}".format(task_id, e),
                                 exc_info=True)
        return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
278
def _create_task(self, ro_task, task_index, task_depends, db_update):
    """
    Execute a SCHEDULED CREATE task unless another CREATE task of the ro_task is already in progress.
    :param ro_task: ro_task that contains the task
    :param task_index: position of the task at ro_task["tasks"]
    :param task_depends: dictionary passed to the create method of the item
    :param db_update: not used here
    :return: (None, None) if the task is not SCHEDULED; (status of the other task, "COPY_VIM_INFO") if
        another CREATE task was already started; otherwise (task_status, vim_info update)
    """
    own_task = ro_task["tasks"][task_index]
    task_id = own_task["task_id"]
    if own_task["status"] != "SCHEDULED":
        # covers FAILED (TODO need to be retry??) and any other status
        return None, None

    # check if already created by another task
    for other_index, other in enumerate(ro_task["tasks"]):
        if other_index == task_index:
            continue  # own task
        if other["action"] == "CREATE" and other["status"] not in ("SCHEDULED", "FINISHED", "SUPERSEDED"):
            return other["status"], "COPY_VIM_INFO"

    try:
        creator = self.item2create[own_task["item"]]
        task_status, ro_vim_item_update = creator(ro_task, task_index, task_depends)
        # TODO update other CREATE tasks
    except Exception as e:
        if not isinstance(e, NsWorkerException):
            self.logger.error("Error executing task={}: {}".format(task_id, e), exc_info=True)
        task_status = "FAILED"
        ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
        # TODO update ro_vim_item_update
    return task_status, ro_vim_item_update
309
def _get_dependency(self, task_id, ro_task=None, target_id=None):
    """
    Look for the task that another task depends on.
    :param task_id: either a task_id, or a target_record_id starting with "nsrs:" or "vnfrs:"
    :param ro_task: optional ro_task searched first when task_id is a plain task_id
    :param target_id: VIM target used to filter the database when task_id is a target_record_id
    :return: tuple (ro_task that contains the task, index of the task inside its "tasks" list)
    :raises NsWorkerException: if the task cannot be found
    """
    if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
        ro_task_dependency = self.db.get_one(
            "ro_tasks",
            q_filter={"target_id": target_id,
                      "tasks.target_record_id": task_id
                      },
            fail_on_empty=False)
        if ro_task_dependency:
            for task_index, task in enumerate(ro_task_dependency["tasks"]):
                if task["target_record_id"] == task_id:
                    return ro_task_dependency, task_index

    else:
        if ro_task:
            for task_index, task in enumerate(ro_task["tasks"]):
                if task["task_id"] == task_id:
                    return ro_task, task_index
        ro_task_dependency = self.db.get_one(
            "ro_tasks",
            q_filter={"tasks.ANYINDEX.task_id": task_id,
                      "tasks.ANYINDEX.target_record.ne": None
                      },
            fail_on_empty=False)
        if ro_task_dependency:
            # enumerate was missing: the loop unpacked every task dictionary as (task_index, task)
            for task_index, task in enumerate(ro_task_dependency["tasks"]):
                if task["task_id"] == task_id:
                    return ro_task_dependency, task_index
    raise NsWorkerException("Cannot get depending task {}".format(task_id))
339
def _proccess_pending_tasks(self, ro_task):
    """
    Process the tasks of a locked ro_task and write the result at the database.

    The tasks are visited in three passes, by action: DELETE, CREATE and EXEC. SCHEDULED tasks are executed
    once the tasks they depend on are no longer SCHEDULED; CREATE tasks already started are refreshed when
    vim_info.refresh_at has expired. Finally the ro_task is unlocked and its next 'to_check_at' is stored.
    :param ro_task: ro_task dictionary, as returned by _get_db_task. Its "vim_info" is updated in place
    :return: None. Errors are logged, not raised
    """
    ro_task_id = ro_task["_id"]
    now = time.time()
    next_check_at = now + (24*60*60)  # one day
    db_ro_task_update = {}

    def _update_refresh(new_status):
        # compute next_refresh of the task in progress and keep the earliest one as next_check_at.
        # Only next_check_at is rebound; task, ro_task and db_ro_task_update are only read or mutated
        nonlocal next_check_at

        next_refresh = time.time()
        if task["item"] in ("image", "flavor"):
            next_refresh += self.REFRESH_IMAGE
        elif new_status == "BUILD":
            next_refresh += self.REFRESH_BUILD
        elif new_status == "DONE":
            next_refresh += self.REFRESH_ACTIVE
        else:
            next_refresh += self.REFRESH_ERROR
        next_check_at = min(next_check_at, next_refresh)
        db_ro_task_update["vim_info.refresh_at"] = next_refresh
        ro_task["vim_info"]["refresh_at"] = next_refresh

    try:
        # 0 get task_status_create
        task_status_create = None
        task_create = next((t for t in ro_task["tasks"] if t["action"] == "CREATE" and
                            t["status"] in ("BUILD", "DONE")), None)
        if task_create:
            task_status_create = task_create["status"]
        # 1. look for SCHEDULED or if CREATE also DONE,BUILD
        for task_action in ("DELETE", "CREATE", "EXEC"):
            db_vim_update = None
            # Reset the results at every pass: they were unbound for a task that needs no work and,
            # worse, the result of a previous pass (e.g. SUPERSEDED of a DELETE) was applied to it.
            # NOTE(review): inside a pass the last result is still shared by the following tasks, as
            # db_vim_update is; presumably because all of them refer to the same VIM element - confirm
            new_status = None
            db_vim_info_update = None
            for task_index, task in enumerate(ro_task["tasks"]):
                target_update = None
                if (task_action in ("DELETE", "EXEC") and task["status"] != "SCHEDULED") or\
                        task["action"] != task_action or \
                        (task_action == "CREATE" and task["status"] in ("FINISHED", "SUPERSEDED")):
                    continue
                task_path = "tasks.{}.status".format(task_index)
                try:
                    if task["status"] == "SCHEDULED":
                        task_depends = {}
                        # check if tasks that this depends on have been completed
                        dependency_not_completed = False
                        for dependency_task_id in (task.get("depends_on") or ()):
                            dependency_ro_task, dependency_task_index = \
                                self._get_dependency(dependency_task_id, target_id=ro_task["target_id"])
                            dependency_task = dependency_ro_task["tasks"][dependency_task_index]
                            if dependency_task["status"] == "SCHEDULED":
                                dependency_not_completed = True
                                next_check_at = min(next_check_at, dependency_ro_task["to_check_at"])
                                break
                            elif dependency_task["status"] == "FAILED":
                                error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
                                    task["action"], task["item"], dependency_task["action"],
                                    dependency_task["item"], dependency_task_id,
                                    dependency_ro_task["vim_info"].get("vim_details"))
                                self.logger.error("task={} {}".format(task["task_id"], error_text))
                                raise NsWorkerException(error_text)

                            # the vim_id of the dependency is published under both key styles
                            task_depends[dependency_task_id] = dependency_ro_task["vim_info"]["vim_id"]
                            task_depends["TASK-{}".format(dependency_task_id)] = \
                                dependency_ro_task["vim_info"]["vim_id"]
                        if dependency_not_completed:
                            # TODO set at vim_info.vim_details that it is waiting
                            continue

                    if task["action"] == "DELETE":
                        new_status, db_vim_info_update = self._delete_task(ro_task, task_index,
                                                                           task_depends, db_ro_task_update)
                        new_status = "FINISHED" if new_status == "DONE" else new_status
                        # ^with FINISHED instead of DONE it will not be refreshing
                        if new_status in ("FINISHED", "SUPERSEDED"):
                            target_update = "DELETE"
                    elif task["action"] == "EXEC":
                        # The result of the handler was discarded, leaving new_status unset.
                        # NOTE(review): assumes the handler returns (status, vim_info update) as the
                        # create/delete/refresh handlers do - confirm against exec_vm
                        new_status, db_vim_info_update = \
                            self.item2action[task["item"]](ro_task, task_index, task_depends,
                                                           db_ro_task_update)
                        new_status = "FINISHED" if new_status == "DONE" else new_status
                        # ^with FINISHED instead of DONE it will not be refreshing
                        if new_status in ("FINISHED", "SUPERSEDED"):
                            target_update = "DELETE"
                    elif task["action"] == "CREATE":
                        if task["status"] == "SCHEDULED":
                            if task_status_create:
                                # element already created by another task: copy its status and info
                                new_status = task_status_create
                                target_update = "COPY_VIM_INFO"
                            else:
                                new_status, db_vim_info_update = \
                                    self.item2create[task["item"]](ro_task, task_index, task_depends)
                                # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
                                _update_refresh(new_status)
                        else:
                            if ro_task["vim_info"]["refresh_at"] and now > ro_task["vim_info"]["refresh_at"]:
                                new_status, db_vim_info_update = self.item2refresh[task["item"]](ro_task)
                                _update_refresh(new_status)
                except Exception as e:
                    new_status = "FAILED"
                    db_vim_info_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
                    if not isinstance(e, (NsWorkerException, vimconn.VimConnException)):
                        # this handler covers every action, not only the deletion
                        self.logger.error("Unexpected exception processing task={}: {}".
                                          format(task["task_id"], e), exc_info=True)

                try:
                    if db_vim_info_update:
                        db_vim_update = db_vim_info_update.copy()
                        db_ro_task_update.update({"vim_info." + k: v for k, v in db_vim_info_update.items()})
                        ro_task["vim_info"].update(db_vim_info_update)

                    if new_status:
                        if task_action == "CREATE":
                            task_status_create = new_status
                        db_ro_task_update[task_path] = new_status
                    if target_update or db_vim_update:

                        if target_update == "DELETE":
                            self._update_target(task, None)
                        elif target_update == "COPY_VIM_INFO":
                            self._update_target(task, ro_task["vim_info"])
                        else:
                            self._update_target(task, db_vim_update)

                except Exception as e:
                    self.logger.error("Unexpected exception at _update_target task={}: {}".
                                      format(task["task_id"], e), exc_info=True)

        # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
        # outside this task (by ro_nbi) do not update it
        db_ro_task_update["locked_by"] = None
        # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
        db_ro_task_update["locked_at"] = int(now - self.MAX_TIME_LOCKED)
        db_ro_task_update["to_check_at"] = next_check_at
        if not self.db.set_one("ro_tasks",
                               update_dict=db_ro_task_update,
                               q_filter={"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"]},
                               fail_on_empty=False):
            del db_ro_task_update["to_check_at"]
            self.db.set_one("ro_tasks",
                            q_filter={"_id": ro_task["_id"]},
                            update_dict=db_ro_task_update,
                            fail_on_empty=True)
    except DbException as e:
        self.logger.error("ro_task={} Error updating database {}".format(ro_task_id, e))
    except Exception as e:
        self.logger.error("Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True)
487
def _update_target(self, task, ro_vim_item_update):
    """
    Write the VIM information of a task at its target record, or remove it.
    :param task: task dictionary; "target_record" has the form "<table>:<_id>:<path>"
    :param ro_vim_item_update: dictionary with the VIM information to store under <path>. When empty or
        None, <path> is unset from the record
    :return: None. Database errors are logged
    """
    copied_keys = ('vim_id', 'vim_details', 'vim_name', 'vim_status', 'interfaces')
    iface_keys = ('ip_address', 'mac_address', 'vlan', 'compute_node', 'pci')
    try:
        table, _id, path = task["target_record"].split(":")
        if not ro_vim_item_update:
            self.db.set_one(table, q_filter={"_id": _id}, update_dict=None,
                            unset={path: None})
            return

        update_dict = {}
        for key, value in ro_vim_item_update.items():
            if key in copied_keys:
                update_dict[path + "." + key] = value

        if ro_vim_item_update.get("interfaces"):
            # the interfaces are stored two levels above <path>
            path_vdu = path[:path.rfind(".")]
            path_vdu = path_vdu[:path_vdu.rfind(".")]
            path_interfaces = path_vdu + ".interfaces"
            for position, iface in enumerate(ro_vim_item_update.get("interfaces")):
                if not iface:
                    continue
                iface_prefix = path_interfaces + ".{}.".format(position)
                for key, value in iface.items():
                    if key in iface_keys:
                        update_dict[iface_prefix + key] = value
                if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
                    update_dict["ip-address"] = iface.get("ip_address").split(";")[0]
                if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
                    update_dict[path_vdu + ".ip-address"] = iface.get("ip_address").split(";")[0]

        self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
    except DbException as e:
        self.logger.error("Cannot update database '{}': '{}'".format(task["target_record"], e))
513
def new_image(self, ro_task, task_index, task_depends):
    """
    Look for an image at the VIM. Images are only found, never created, by this method.
    :param ro_task: ro_task that contains the task; "target_id" selects the VIM connector
    :param task_index: position of the task at ro_task["tasks"]
    :param task_depends: not used here
    :return: ("DONE", vim_info update with the image id) if exactly one image matches the "find_params"
        of the task; ("FAILED", vim_info update with the error) otherwise
    """
    task = ro_task["tasks"][task_index]
    task_id = task["task_id"]
    created = False
    created_items = {}
    target_vim = self.my_vims[ro_task["target_id"]]
    try:
        # FIND
        if not task.get("find_params"):
            # without criteria vim_image_id was never assigned and the method ended in UnboundLocalError
            raise NsWorkerExceptionNotFound("Image cannot be found because no find_params are provided")
        vim_images = target_vim.get_image_list(**task["find_params"])
        if not vim_images:
            raise NsWorkerExceptionNotFound("Image not found with this criteria: '{}'".format(
                task["find_params"]))
        elif len(vim_images) > 1:
            # the message wrongly talked about networks
            raise NsWorkerException(
                "More than one image found with this criteria: '{}'".format(task["find_params"]))
        vim_image_id = vim_images[0]["id"]

        ro_vim_item_update = {"vim_id": vim_image_id,
                              "vim_status": "DONE",
                              "created": created,
                              "created_items": created_items,
                              "vim_details": None}
        self.logger.debug(
            "task={} {} new-image={} created={}".format(task_id, ro_task["target_id"], vim_image_id, created))
        return "DONE", ro_vim_item_update
    except (NsWorkerException, vimconn.VimConnException) as e:
        self.logger.error("task={} {} new-image: {}".format(task_id, ro_task["target_id"], e))
        ro_vim_item_update = {"vim_status": "VIM_ERROR",
                              "created": created,
                              "vim_details": str(e)}
        return "FAILED", ro_vim_item_update
547
def del_flavor(self, ro_task, task_index):
    """
    Delete at the VIM the flavor referenced by ro_task["vim_info"]["vim_id"], if any.
    :param ro_task: ro_task that contains the task; "target_id" selects the VIM connector
    :param task_index: position of the task at ro_task["tasks"]
    :return: ("DONE", vim_info update) when deleted, not found at the VIM or nothing to delete;
        ("FAILED", vim_info update with the error) if the VIM reports another error
    """
    current_task = ro_task["tasks"][task_index]
    task_id = current_task["task_id"]
    vim_flavor = ro_task["vim_info"]["vim_id"]
    deleted_update = dict(vim_status="DELETED", created=False, vim_details="DELETED", vim_id=None)
    try:
        if vim_flavor:
            self.my_vims[ro_task["target_id"]].delete_flavor(vim_flavor)

    except vimconn.VimConnNotFoundException:
        # nothing to delete at the VIM: still a success
        deleted_update["vim_details"] = "already deleted"

    except vimconn.VimConnException as e:
        self.logger.error("ro_task={} vim={} del-flavor={}: {}".format(
            ro_task["_id"], ro_task["target_id"], vim_flavor, e))
        return "FAILED", {"vim_status": "VIM_ERROR",
                          "vim_details": "Error while deleting: {}".format(e)}

    self.logger.debug("task={} {} del-flavor={} {}".format(
        task_id, ro_task["target_id"], vim_flavor, deleted_update.get("vim_details", "")))
    return "DONE", deleted_update
574
def refresh_ok(self, ro_task):
    """Refresh without calling the VIM: "FAILED" if the stored vim_status is VIM_ERROR, "DONE" otherwise.
    The vim_info update returned is always empty"""
    in_error = ro_task["vim_info"]["vim_status"] == "VIM_ERROR"
    task_status = "FAILED" if in_error else "DONE"
    return task_status, {}
580
def delete_ok(self, ro_task, task_index=None):
    """
    Skip calling the VIM to delete the element (used for images). Assumes ok.
    :param ro_task: not used
    :param task_index: not used. Accepted because the methods registered at item2delete are called as
        method(ro_task, task_index); without it that call failed with TypeError
    :return: ("DONE", {})
    """
    return "DONE", {}
584
def new_flavor(self, ro_task, task_index, task_depends):
    """
    Find a flavor at the VIM and, if it is not found and the task provides "params", create it.
    :param ro_task: ro_task that contains the task; "target_id" selects the VIM connector
    :param task_index: position of the task at ro_task["tasks"]
    :param task_depends: not used here
    :return: ("DONE", vim_info update with the flavor id and whether it was created), or
        ("FAILED", vim_info update with the error)
    """
    current_task = ro_task["tasks"][task_index]
    task_id = current_task["task_id"]
    created = False
    created_items = {}
    target_vim = self.my_vims[ro_task["target_id"]]
    try:
        vim_flavor_id = None
        # FIND
        if current_task.get("find_params"):
            wanted = current_task["find_params"]["flavor_data"]
            try:
                vim_flavor_id = target_vim.get_flavor_id_from_data(wanted)
            except vimconn.VimConnNotFoundException:
                pass  # not found: it may still be created below

        # CREATE
        if not vim_flavor_id and current_task.get("params"):
            vim_flavor_id = target_vim.new_flavor(current_task["params"]["flavor_data"])
            created = True

        self.logger.debug(
            "task={} {} new-flavor={} created={}".format(task_id, ro_task["target_id"], vim_flavor_id, created))
        return "DONE", {"vim_id": vim_flavor_id,
                        "vim_status": "DONE",
                        "created": created,
                        "created_items": created_items,
                        "vim_details": None}
    except (vimconn.VimConnException, NsWorkerException) as e:
        self.logger.error("task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e))
        return "FAILED", {"vim_status": "VIM_ERROR",
                          "created": created,
                          "vim_details": str(e)}
621
def new_net(self, ro_task, task_index, task_depends):
    """
    Find a network at the VIM and, if it is not found (or no search is requested), create it.

    With "find_params" the network is searched by "filter_dict" or, for a management network ("mgmt"), by
    the management_network_id / management_network_name of the VIM configuration, or by
    find_params["name"]. The network is created from "params" when the search gives nothing.
    :param ro_task: ro_task that contains the task; "target_id" selects the VIM connector
    :param task_index: position of the task at ro_task["tasks"]
    :param task_depends: not used here
    :return: ("BUILD", vim_info update with the network id and whether it was created), or
        ("FAILED", vim_info update with the error)
    """
    vim_net_id = None
    task = ro_task["tasks"][task_index]
    task_id = task["task_id"]
    created = False
    created_items = {}
    target_vim = self.my_vims[ro_task["target_id"]]
    try:
        # FIND
        # vim_nets is always bound, so find and create are chained whatever parameters the task has
        vim_nets = []
        if task.get("find_params"):
            # if management, get configuration of VIM
            if task["find_params"].get("filter_dict"):
                vim_filter = task["find_params"]["filter_dict"]
            elif task["find_params"].get("mgmt"):  # management network
                if deep_get(self.db_vims[ro_task["target_id"]], "config", "management_network_id"):
                    vim_filter = {"id": self.db_vims[ro_task["target_id"]]["config"]["management_network_id"]}
                elif deep_get(self.db_vims[ro_task["target_id"]], "config", "management_network_name"):
                    vim_filter = {"name": self.db_vims[ro_task["target_id"]]["config"]["management_network_name"]}
                else:
                    vim_filter = {"name": task["find_params"]["name"]}
            else:
                raise NsWorkerExceptionNotFound("Invalid find_params for new_net {}".format(task["find_params"]))

            vim_nets = target_vim.get_network_list(vim_filter)
            if not vim_nets and not task.get("params"):
                raise NsWorkerExceptionNotFound("Network not found with this criteria: '{}'".format(
                    task.get("find_params")))
            elif len(vim_nets) > 1:
                raise NsWorkerException(
                    "More than one network found with this criteria: '{}'".format(task["find_params"]))

        if vim_nets:
            vim_net_id = vim_nets[0]["id"]
        else:
            # CREATE
            params = task["params"]
            vim_net_id, created_items = target_vim.new_network(**params)
            created = True

        ro_vim_item_update = {"vim_id": vim_net_id,
                              "vim_status": "BUILD",
                              "created": created,
                              "created_items": created_items,
                              "vim_details": None}
        self.logger.debug(
            "task={} {} new-net={} created={}".format(task_id, ro_task["target_id"], vim_net_id, created))
        return "BUILD", ro_vim_item_update
    except (vimconn.VimConnException, NsWorkerException) as e:
        self.logger.error("task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e))
        ro_vim_item_update = {"vim_status": "VIM_ERROR",
                              "created": created,
                              "vim_details": str(e)}
        return "FAILED", ro_vim_item_update
674
def refresh_net(self, ro_task):
    """
    Call VIM to get network status and compute what has changed against the stored vim_info.
    :param ro_task: ro_task of the network; "target_id" selects the VIM connector
    :return: (task_status, vim_info update). task_status is "DONE" if the VIM reports ACTIVE, "BUILD" if
        it reports BUILD and "FAILED" otherwise. The update only contains the values that differ
    """
    ro_task_id = ro_task["_id"]
    target_vim = self.my_vims[ro_task["target_id"]]
    stored = ro_task["vim_info"]
    vim_id = stored["vim_id"]
    try:
        vim_info = target_vim.refresh_nets_status([vim_id])[vim_id]
        if vim_info["status"] in ("ACTIVE", "BUILD"):
            task_status = "DONE" if vim_info["status"] == "ACTIVE" else "BUILD"
        else:
            task_status = "FAILED"
    except vimconn.VimConnException as e:
        # Mark all tasks at VIM_ERROR status
        self.logger.error("ro_task={} vim={} get-net={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e))
        vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
        task_status = "FAILED"

    changes = {}
    if stored["vim_status"] != vim_info["status"]:
        changes["vim_status"] = vim_info["status"]
    if stored["vim_name"] != vim_info.get("name"):
        changes["vim_name"] = vim_info.get("name")

    if vim_info["status"] in ("ERROR", "VIM_ERROR"):
        if stored["vim_details"] != vim_info["error_msg"]:
            changes["vim_details"] = vim_info["error_msg"]
    elif vim_info["status"] == "DELETED":
        changes["vim_id"] = None
        changes["vim_details"] = "Deleted externally"
    elif stored["vim_details"] != vim_info["vim_info"]:
        changes["vim_details"] = vim_info["vim_info"]

    if changes:
        self.logger.debug("ro_task={} {} get-net={}: status={} {}".format(
            ro_task_id, ro_task["target_id"], vim_id, changes.get("vim_status"),
            changes.get("vim_details") if changes.get("vim_status") != "ACTIVE" else ''))
    return task_status, changes
716
def del_net(self, ro_task, task_index):
    """
    Delete at the VIM the network of the ro_task, if there is a vim_id or created_items to remove.
    :param ro_task: ro_task that contains the task; "target_id" selects the VIM connector
    :param task_index: position of the task at ro_task["tasks"]
    :return: ("DONE", vim_info update) when deleted, not found at the VIM or nothing to delete;
        ("FAILED", vim_info update with the error) if the VIM reports another error
    """
    current_task = ro_task["tasks"][task_index]
    task_id = current_task["task_id"]
    vim_net = ro_task["vim_info"]["vim_id"]
    deleted_update = dict(vim_status="DELETED", created=False, vim_details="DELETED", vim_id=None)
    try:
        if vim_net or ro_task["vim_info"]["created_items"]:
            connector = self.my_vims[ro_task["target_id"]]
            connector.delete_network(vim_net, ro_task["vim_info"]["created_items"])

    except vimconn.VimConnNotFoundException:
        # nothing to delete at the VIM: still a success
        deleted_update["vim_details"] = "already deleted"

    except vimconn.VimConnException as e:
        self.logger.error("ro_task={} vim={} del-net={}: {}".format(ro_task["_id"], ro_task["target_id"],
                                                                    vim_net, e))
        return "FAILED", {"vim_status": "VIM_ERROR",
                          "vim_details": "Error while deleting: {}".format(e)}

    self.logger.debug("task={} {} del-net={} {}".format(task_id, ro_task["target_id"], vim_net,
                                                        deleted_update.get("vim_details", "")))
    return "DONE", deleted_update
743
def new_vm(self, ro_task, task_index, task_depends):
    """
    Create a VM at the VIM from the "params" of the task.

    The references "TASK-<id>" found at net_id of the net_list, image_id and flavor_id are replaced by the
    values of task_depends. The replacement is done over a copy; the params of the task are not modified.
    :param ro_task: ro_task that contains the task; "target_id" selects the VIM connector
    :param task_index: position of the task at ro_task["tasks"]
    :param task_depends: dictionary "TASK-<id>" -> vim id of the element that the task depends on
    :return: ("BUILD", vim_info update with the VM id and the vim_id of every entry of net_list), or
        ("FAILED", vim_info update with the error)
    """
    current_task = ro_task["tasks"][task_index]
    task_id = current_task["task_id"]
    created = False
    created_items = {}
    target_vim = self.my_vims[ro_task["target_id"]]
    try:
        created = True
        vm_params = deepcopy(current_task["params"])
        for net in vm_params["net_list"]:
            if "net_id" not in net or not net["net_id"].startswith("TASK-"):
                continue
            # change task_id into network_id
            network_id = task_depends[net["net_id"]]
            if not network_id:
                raise NsWorkerException("Cannot create VM because depends on a network not created or found "
                                        "for {}".format(net["net_id"]))
            net["net_id"] = network_id
        for reference in ("image_id", "flavor_id"):
            if vm_params[reference].startswith("TASK-"):
                vm_params[reference] = task_depends[vm_params[reference]]

        vim_vm_id, created_items = target_vim.new_vminstance(**vm_params)
        # every entry of net_list is expected to carry "vim_id" once the VM has been created
        interfaces = [iface["vim_id"] for iface in vm_params["net_list"]]

        self.logger.debug(
            "task={} {} new-vm={} created={}".format(task_id, ro_task["target_id"], vim_vm_id, created))
        return "BUILD", {"vim_id": vim_vm_id,
                         "vim_status": "BUILD",
                         "created": created,
                         "created_items": created_items,
                         "vim_details": None,
                         "interfaces_vim_ids": interfaces,
                         "interfaces": [],
                         }
    except (vimconn.VimConnException, NsWorkerException) as e:
        self.logger.error("task={} vim={} new-vm: {}".format(task_id, ro_task["target_id"], e))
        return "FAILED", {"vim_status": "VIM_ERROR",
                          "created": created,
                          "vim_details": str(e)}
787
def del_vm(self, ro_task, task_index):
    """
    Delete at the VIM the VM of the ro_task, if there is a vim_id or created_items to remove.
    :param ro_task: ro_task that contains the task; "target_id" selects the VIM connector
    :param task_index: position of the task at ro_task["tasks"]
    :return: ("DONE", vim_info update) when deleted, not found at the VIM or nothing to delete;
        ("FAILED", vim_info update with the error) if the VIM reports another error
    """
    current_task = ro_task["tasks"][task_index]
    task_id = current_task["task_id"]
    vim_vm = ro_task["vim_info"]["vim_id"]
    deleted_update = dict(vim_status="DELETED", created=False, vim_details="DELETED", vim_id=None)
    try:
        if vim_vm or ro_task["vim_info"]["created_items"]:
            connector = self.my_vims[ro_task["target_id"]]
            connector.delete_vminstance(vim_vm, ro_task["vim_info"]["created_items"])

    except vimconn.VimConnNotFoundException:
        # nothing to delete at the VIM: still a success
        deleted_update["vim_details"] = "already deleted"

    except vimconn.VimConnException as e:
        self.logger.error("ro_task={} vim={} del-vm={}: {}".format(ro_task["_id"], ro_task["target_id"],
                                                                   vim_vm, e))
        return "FAILED", {"vim_status": "VIM_ERROR",
                          "vim_details": "Error while deleting: {}".format(e)}

    self.logger.debug("task={} {} del-vm={} {}".format(task_id, ro_task["target_id"], vim_vm,
                                                       deleted_update.get("vim_details", "")))
    return "DONE", deleted_update
814
def refresh_vm(self, ro_task):
    """
    Call VIM to get the VM status and compute which entries of the stored "vim_info" have changed.

    :param ro_task: ro_task content. It reads "_id", "target_id", "tasks" (only the first task, to locate the
        management interfaces) and, from "vim_info", the keys "vim_id", "interfaces_vim_ids", "interfaces",
        "vim_status", "vim_name" and "vim_details"
    :return: tuple (task_status, ro_vim_item_update). task_status is "DONE" if VIM reports "ACTIVE", "BUILD"
        if VIM reports "BUILD" and "FAILED" in any other case, including a VimConnException at the VIM call.
        ro_vim_item_update is a dictionary with only the changed entries. It returns (None, None) when the
        ro_task has no vim_id to refresh.
    """
    ro_task_id = ro_task["_id"]
    target_vim = self.my_vims[ro_task["target_id"]]

    vim_id = ro_task["vim_info"]["vim_id"]
    if not vim_id:
        return None, None
    try:
        vim_dict = target_vim.refresh_vms_status([vim_id])
        vim_info = vim_dict[vim_id]
        if vim_info["status"] == "ACTIVE":
            task_status = "DONE"
        elif vim_info["status"] == "BUILD":
            task_status = "BUILD"
        else:
            task_status = "FAILED"
    except vimconn.VimConnException as e:
        # Mark all tasks at VIM_ERROR status
        self.logger.error("ro_task={} vim={} get-vm={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e))
        vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
        task_status = "FAILED"

    ro_vim_item_update = {}
    # TODO check and update interfaces
    # Interfaces are not present when the VIM call failed (the VIM_ERROR content built above has none) and may
    # be missing when the VM is in error or deleted. In that case an empty list is used instead of crashing
    vim_interfaces = []
    reported_interfaces = vim_info.get("interfaces")
    if reported_interfaces:
        for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
            iface = next((iface for iface in reported_interfaces if vim_iface_id == iface["vim_interface_id"]),
                         None)
            # if iface:
            #     iface.pop("vim_info", None)
            vim_interfaces.append(iface)

    task = ro_task["tasks"][0]  # TODO look for a task CREATE and active
    mgmt_vnf_iface = task.get("mgmt_vnf_interface")
    mgmt_vdu_iface = task.get("mgmt_vdu_interface", task.get("mgmt_vnf_interface", 0))
    for flag, iface_index in (("mgmt_vnf_interface", mgmt_vnf_iface), ("mgmt_vdu_interface", mgmt_vdu_iface)):
        if iface_index is None:
            continue
        try:
            iface = vim_interfaces[iface_index]
        except (IndexError, TypeError):
            # the interface list is empty/shorter than expected or the index is not usable: nothing to flag
            continue
        if iface is not None:  # None means that VIM has not reported this interface
            iface[flag] = True

    if ro_task["vim_info"]["interfaces"] != vim_interfaces:
        ro_vim_item_update["interfaces"] = vim_interfaces
    if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
        ro_vim_item_update["vim_status"] = vim_info["status"]
    if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
        ro_vim_item_update["vim_name"] = vim_info.get("name")
    if vim_info["status"] in ("ERROR", "VIM_ERROR"):
        # a VIM can report an error status without any message
        error_msg = vim_info.get("error_msg") or "Error at VIM"
        if ro_task["vim_info"]["vim_details"] != error_msg:
            ro_vim_item_update["vim_details"] = error_msg
    elif vim_info["status"] == "DELETED":
        ro_vim_item_update["vim_id"] = None
        ro_vim_item_update["vim_details"] = "Deleted externally"
    else:
        if ro_task["vim_info"]["vim_details"] != vim_info.get("vim_info"):
            ro_vim_item_update["vim_details"] = vim_info.get("vim_info")
    if ro_vim_item_update:
        self.logger.debug("ro_task={} {} get-vm={}: status={} {}".format(
            ro_task_id, ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"),
            ro_vim_item_update.get("vim_details") if ro_vim_item_update.get("vim_status") != "ACTIVE" else ''))
    return task_status, ro_vim_item_update
874
def exec_vm(self, ro_task, task_index, task_depends):
    """
    Execute an action over a VM. The only action done here is calling the VIM connector "inject_user_key".

    The task "params" are copied; "private_key", "schema_version" and "salt" are removed from the copy and
    used to obtain, with self.db.decrypt, the value passed as "use_pri_key". The rest of params are passed
    unmodified to the VIM connector.

    :param ro_task: ro_task content. It reads "target_id" and "tasks"
    :param task_index: position inside ro_task["tasks"] of the task to execute
    :param task_depends: not used by this method
    :return: tuple (status, result). ("DONE", params["key"]) on success; note that here the second item is
        the key and not a dictionary. ("FAILED", {"vim_details": error text}) if a VimConnException or
        NsWorkerException is raised
    """
    task = ro_task["tasks"][task_index]
    task_id = task["task_id"]
    target_vim = self.my_vims[ro_task["target_id"]]
    try:
        # work over a copy, so that the stored task params are not modified
        params_copy = deepcopy(task["params"])
        params_copy["use_pri_key"] = self.db.decrypt(params_copy.pop("private_key"),
                                                     params_copy.pop("schema_version"), params_copy.pop("salt"))

        target_vim.inject_user_key(**params_copy)
        self.logger.debug(
            "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"]))
        return "DONE", params_copy["key"]
    except (vimconn.VimConnException, NsWorkerException) as e:
        # log with the same action label used on success (it was wrongly labelled as "new-vm")
        self.logger.error("task={} vim={} action-vm=inject_key: {}".format(task_id, ro_task["target_id"], e))
        ro_vim_item_update = {"vim_details": str(e)}
        return "FAILED", ro_vim_item_update
893
def run(self):
    """
    Main loop of the thread. At each iteration it:
    1) reads one command from self.task_queue. It only blocks waiting for a command while self.my_vims is
       empty. "terminate" finishes the loop; "load_vim" calls self._load_vim and starts a new iteration
    2) gets one pending ro_task with self._get_db_task and processes it; it sleeps 5 seconds if there is none.
       Any exception at this step is logged as critical and the loop continues
    """
    # load database
    self.logger.debug("Starting")
    while True:
        try:
            order = self.task_queue.get(block=not self.my_vims)
            if order[0] == "terminate":
                break
            if order[0] == "load_vim":
                self._load_vim(order[1])
                continue
        except queue.Empty:
            # no command pending, go on with the database tasks
            pass

        try:
            pending_ro_task = self._get_db_task()
            if pending_ro_task:
                self._proccess_pending_tasks(pending_ro_task)
            else:
                # nothing to do, wait before polling again
                time.sleep(5)
        except Exception as e:
            self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)

    self.logger.debug("Finishing")