1 # -*- coding: utf-8 -*-
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
"""
This is the thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
The tasks are stored in the database, in table ro_tasks.
A single ro_task refers to a VIM element (flavor, image, network, ...).
A ro_task can contain several 'tasks', each one with a target where the results are stored.
"""
import logging
import queue
import threading
import time
from copy import deepcopy
from unittest.mock import Mock

from pkg_resources import iter_entry_points
# from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version
from osm_common.dbbase import DbException
# from osm_common.fsbase import FsException
# from osm_common.msgbase import MsgException
from osm_ro_plugin import vimconn
from osm_ro_plugin.vim_dummy import VimDummyConnector
41 __author__
= "Alfonso Tierno"
42 __date__
= "$28-Sep-2017 12:07:15$"
def deep_get(target_dict, *args, **kwargs):
    """
    Get a value from target_dict entering in the nested keys. If a key does not exist, it returns None
    (or the supplied default).
    Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
    :param target_dict: dictionary to be read
    :param args: list of keys to read from target_dict, from the outermost to the innermost level
    :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
    :return: The wanted value if exist, None or default otherwise
    """
    for key in args:
        # stop as soon as the current level is not a dictionary or lacks the requested key
        if not isinstance(target_dict, dict) or key not in target_dict:
            return kwargs.get("default")
        target_dict = target_dict[key]
    return target_dict
class NsWorkerException(Exception):
    """Error raised by the worker while processing a task; its text is stored as the task failure details."""
class FailingConnector:
    """Stand-in for a VIM connector that could not be loaded.

    Every public attribute name found on vimconn.VimConnector is replaced by a Mock that raises
    vimconn.VimConnException(error_msg) when called, so any use of the connector reports the load error.
    """

    def __init__(self, error_msg):
        """
        :param error_msg: text explaining why the real connector is not available
        """
        self.error_msg = error_msg
        for method in dir(vimconn.VimConnector):
            # private and dunder names (e.g. __class__) must keep their real implementation
            if not method.startswith("_"):
                setattr(self, method, Mock(side_effect=vimconn.VimConnException(error_msg)))
class NsWorkerExceptionNotFound(NsWorkerException):
    """Raised when an element looked up at the VIM (image, network, ...) is not found."""
class NsWorker(threading.Thread):
    """Thread that executes the ro_tasks stored in the database against the VIMs it has loaded.

    Orders ("load_vim", "terminate") arrive through an in-memory queue. Pending work is read from the
    "ro_tasks" database table, locked for this worker, executed through the VIM connector and written back.
    """
    REFRESH_BUILD = 5  # 5 seconds
    REFRESH_ACTIVE = 60  # 1 minute
    # NOTE(review): REFRESH_ERROR and QUEUE_SIZE are used by this class; the values below are assumed - confirm
    REFRESH_ERROR = 600
    REFRESH_IMAGE = 3600 * 10
    REFRESH_DELETE = 3600 * 10
    QUEUE_SIZE = 2000
    # TODO delete assigment_lock = Lock()
    # TODO delete assignment = {}
    MAX_TIME_LOCKED = 3600

    def __init__(self, worker, config, plugins, db):
        """Init a thread.

        :param worker: identifier of this worker; used in the logger name and in the lock owner id
        :param config: configuration dictionary; "process_id" is read to build the lock owner id
        :param plugins: dictionary {plugin name: connector class} used as cache of the loaded plugins
        :param db: database object used to read and update ro_tasks, vim accounts and target records
        """
        threading.Thread.__init__(self)
        self.config = config
        self.plugins = plugins
        self.plugin_name = "unknown"
        # one logger per worker, so that log lines can be attributed to the thread that wrote them
        self.logger = logging.getLogger('ro.worker{}'.format(worker))
        self.worker_id = worker
        self.task_queue = queue.Queue(self.QUEUE_SIZE)
        self.task_lock = threading.Lock()  # protects the status changes done at del_task
        self.my_vims = {}   # targetvim: vimplugin class
        self.db_vims = {}   # targetvim: vim information from database
        self.vim_targets = []   # targetvim list
        self.my_id = config["process_id"] + ":" + str(worker)
        self.db = db
        # dispatch tables: item type -> method that creates / refreshes / deletes / acts on it
        self.item2create = {
            "net": self.new_net,
            "vdu": self.new_vm,
            "image": self.new_image,
            "flavor": self.new_flavor,
        }
        self.item2refresh = {
            "net": self.refresh_net,
            "vdu": self.refresh_vm,
            "image": self.refresh_ok,
            "flavor": self.refresh_ok,
        }
        self.item2delete = {
            "net": self.del_net,
            "vdu": self.del_vm,
            "image": self.delete_ok,
            "flavor": self.del_flavor,
        }
        self.item2action = {
            "vdu": self.exec_vm,
        }
        self.time_last_task_processed = None

    def insert_task(self, task):
        """Queue an order for this thread without blocking.

        :param task: order to queue; run() understands tuples ("load_vim", vim_account_id) and ("terminate",)
        :return: None
        :raises NsWorkerException: if the queue is full
        """
        try:
            self.task_queue.put(task, False)
            return None
        except queue.Full:
            raise NsWorkerException("timeout inserting a task")

    def terminate(self):
        """Ask the thread to finish by queuing the order that run() recognizes as the stop signal."""
        self.insert_task(("terminate",))

    def del_task(self, task):
        """Mark a task as SUPERSEDED if it has not been started yet.

        :param task: task dictionary; its "status" is read and possibly modified
        :return: True if the task was still SCHEDULED and has been superseded, False otherwise
        """
        with self.task_lock:
            if task["status"] == "SCHEDULED":
                task["status"] = "SUPERSEDED"
                return True
            # task["status"] == "processing". The lock is released by the 'with' statement
            return False

    def _load_plugin(self, name, type="vim"):
        """Return the connector registered under 'name', loading it from the entry points when needed.

        :param name: plugin name, e.g. "rovim_dummy"
        :param type: can be vim or sdn; selects the entry point group 'osm_ro<type>.plugins'
        :return: the connector class, or a FailingConnector instance if the plugin cannot be loaded
        """
        if "rovim_dummy" not in self.plugins:
            self.plugins["rovim_dummy"] = VimDummyConnector
        if name in self.plugins:
            return self.plugins[name]
        try:
            for v in iter_entry_points('osm_ro{}.plugins'.format(type), name):
                self.plugins[name] = v.load()
        except Exception as e:
            self.logger.critical("Cannot load osm_{}: {}".format(name, e))
            if name:
                self.plugins[name] = FailingConnector("Cannot load osm_{}: {}".format(name, e))
        if name and name not in self.plugins:
            error_text = "Cannot load a module for {t} type '{n}'. The plugin 'osm_{n}' has not been" \
                         " registered".format(t=type, n=name)
            self.logger.critical(error_text)
            self.plugins[name] = FailingConnector(error_text)

        return self.plugins[name]

    def _load_vim(self, vim_account_id):
        """Read a vim account from database and instantiate its connector.

        On success the connector is stored at self.my_vims["vim:<id>"] and the target is appended to
        self.vim_targets. On any error a FailingConnector is stored instead and self.error_status is set.
        :param vim_account_id: _id of the entry of the "vim_accounts" table
        :return: None
        """
        target_id = "vim:" + vim_account_id
        plugin_name = ""
        vim = None
        step = ""
        try:
            step = "Getting vim={} from db".format(vim_account_id)
            vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})

            # if deep_get(vim, "config", "sdn-controller"):
            #     step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
            #     db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})

            step = "Decrypt password"
            schema_version = vim.get("schema_version")
            self.db.encrypt_decrypt_fields(vim, "decrypt", fields=('password', 'secret'),
                                           schema_version=schema_version, salt=vim_account_id)

            step = "Load plugin 'rovim_{}'".format(vim.get("vim_type"))
            plugin_name = "rovim_" + vim["vim_type"]
            vim_module_conn = self._load_plugin(plugin_name)
            if isinstance(vim_module_conn, FailingConnector):
                # _load_plugin returns an instance, not a class, when the plugin is unavailable; calling it
                # would only hide the real cause behind a "not callable" error
                raise NsWorkerException(vim_module_conn.error_msg)
            self.my_vims[target_id] = vim_module_conn(
                uuid=vim['_id'], name=vim['name'],
                tenant_id=vim.get('vim_tenant_id'), tenant_name=vim.get('vim_tenant_name'),
                url=vim['vim_url'], url_admin=None,
                user=vim['vim_user'], passwd=vim['vim_password'],
                config=vim.get('config'), persistent_info={}
            )
            self.vim_targets.append(target_id)
            self.db_vims[target_id] = vim
            self.error_status = None
            self.logger.info("Vim Connector loaded for vim_account={}, plugin={}".format(
                vim_account_id, plugin_name))
        except Exception as e:
            self.logger.error("Cannot load vimconnector for vim_account={} plugin={}: {} {}".format(
                vim_account_id, plugin_name, step, e))
            self.db_vims[target_id] = vim or {}
            self.my_vims[target_id] = FailingConnector(str(e))
            self.error_status = "Error loading vimconnector: {}".format(e)

    def _get_db_task(self):
        """Lock one pending ro_task of the loaded VIM targets and read it from database.

        A ro_task is a candidate when its lock is older than MAX_TIME_LOCKED and its "to_check_at" is below
        self.time_last_task_processed (presumably 'less than' filters of the database layer - confirm).
        :return: the locked ro_task, or None if there is nothing to process or the database fails
        """
        now = time.time()
        if not self.time_last_task_processed:
            self.time_last_task_processed = now
        try:
            while True:
                locked = self.db.set_one(
                    "ro_tasks",
                    q_filter={"target_id": self.vim_targets,
                              "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
                              "locked_at.lt": now - self.MAX_TIME_LOCKED,
                              "to_check_at.lt": self.time_last_task_processed},
                    update_dict={"locked_by": self.my_id, "locked_at": now},
                    fail_on_empty=False)
                if locked:
                    # read and return the entry just locked: it is the one whose locked_at is 'now'
                    ro_task = self.db.get_one(
                        "ro_tasks",
                        q_filter={"target_id": self.vim_targets,
                                  "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
                                  "locked_at": now})
                    return ro_task
                if self.time_last_task_processed == now:
                    # already tried with the most recent limit: nothing pending
                    self.time_last_task_processed = None
                    return None
                # move the limit up to now and try once more
                self.time_last_task_processed = now
                # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)

        except DbException as e:
            self.logger.error("Database exception at _get_db_task: {}".format(e))
        except Exception as e:
            self.logger.critical("Unexpected exception at _get_db_task: {}".format(e), exc_info=True)
        return None

    def _delete_task(self, ro_task, task_index, task_depends, db_update):
        """
        Determine if this task need to be done or superseded
        :param ro_task: ro_task that contains the DELETE task
        :param task_index: position of the DELETE task at ro_task["tasks"]
        :param task_depends: not used here
        :param db_update: dictionary of pending database changes; status changes of sibling tasks are added
        :return: tuple (new task status, vim_info changes); (None, None) if the task is already FAILED
        """
        my_task = ro_task["tasks"][task_index]
        task_id = my_task["task_id"]
        needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get("created_items", False)
        if my_task["status"] == "FAILED":
            return None, None  # TODO need to be retry??
        try:
            for index, task in enumerate(ro_task["tasks"]):
                if index == task_index:
                    continue  # own task
                if my_task["target_record"] == task["target_record"] and task["action"] == "CREATE":
                    # the CREATE of the same target is set to finished
                    db_update["tasks.{}.status".format(index)] = task["status"] = "FINISHED"
                elif task["action"] == "CREATE" and task["status"] not in ("FINISHED", "SUPERSEDED"):
                    # another target still uses this VIM element: do not delete it
                    needed_delete = False
            if needed_delete:
                return self.item2delete[my_task["item"]](ro_task, task_index)
            return "SUPERSEDED", None
        except Exception as e:
            if not isinstance(e, NsWorkerException):
                self.logger.critical("Unexpected exception at _delete_task task={}: {}".format(task_id, e),
                                     exc_info=True)
            return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}

    def _create_task(self, ro_task, task_index, task_depends, db_update):
        """
        Determine if this task need to be created
        :param ro_task: ro_task that contains the CREATE task
        :param task_index: position of the CREATE task at ro_task["tasks"]
        :param task_depends: dictionary with the vim ids of the tasks this one depends on
        :param db_update: not used here
        :return: tuple (new task status, vim_info changes or "COPY_VIM_INFO"); (None, None) if nothing to do
        """
        my_task = ro_task["tasks"][task_index]
        task_id = my_task["task_id"]
        if my_task["status"] == "FAILED":
            return None, None  # TODO need to be retry??
        if my_task["status"] != "SCHEDULED":
            return None, None

        # check if already created by another task
        for index, task in enumerate(ro_task["tasks"]):
            if index == task_index:
                continue  # own task
            if task["action"] == "CREATE" and task["status"] not in ("SCHEDULED", "FINISHED", "SUPERSEDED"):
                return task["status"], "COPY_VIM_INFO"

        try:
            task_status, ro_vim_item_update = self.item2create[my_task["item"]](ro_task, task_index, task_depends)
            # TODO update other CREATE tasks
        except Exception as e:
            if not isinstance(e, NsWorkerException):
                self.logger.error("Error executing task={}: {}".format(task_id, e), exc_info=True)
            task_status = "FAILED"
            ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
            # TODO update ro_vim_item_update
        return task_status, ro_vim_item_update

    def _get_dependency(self, task_id, ro_task=None, target_id=None):
        """Locate the task that another task depends on.

        :param task_id: either a target record id (starts with "nsrs:" or "vnfrs:") or a task id
        :param ro_task: ro_task searched first when task_id is a task id; can be None
        :param target_id: target used to filter the database search when task_id is a target record id
        :return: tuple (ro_task that contains the task, index of the task at its "tasks" list)
        :raises NsWorkerException: if the task is not found
        """
        if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
            ro_task_dependency = self.db.get_one(
                "ro_tasks",
                q_filter={"target_id": target_id,
                          "tasks.target_record_id": task_id
                          },
                fail_on_empty=False)
            if ro_task_dependency:
                for task_index, task in enumerate(ro_task_dependency["tasks"]):
                    if task["target_record_id"] == task_id:
                        return ro_task_dependency, task_index

        else:
            if ro_task:
                for task_index, task in enumerate(ro_task["tasks"]):
                    if task["task_id"] == task_id:
                        return ro_task, task_index
            ro_task_dependency = self.db.get_one(
                "ro_tasks",
                q_filter={"tasks.ANYINDEX.task_id": task_id,
                          "tasks.ANYINDEX.target_record.ne": None
                          },
                fail_on_empty=False)
            if ro_task_dependency:
                # enumerate is needed: the index of the task is part of the returned value
                for task_index, task in enumerate(ro_task_dependency["tasks"]):
                    if task["task_id"] == task_id:
                        return ro_task_dependency, task_index
        raise NsWorkerException("Cannot get depending task {}".format(task_id))

    def _proccess_pending_tasks(self, ro_task):
        """Execute the pending tasks of a locked ro_task and write the result to database.

        Tasks are processed by action in the order DELETE, CREATE, EXEC. DELETE and EXEC tasks are only
        processed while SCHEDULED; CREATE tasks are also refreshed once created. Finally the ro_task is
        unlocked and its next check time is stored.
        :param ro_task: ro_task dictionary as returned by _get_db_task; its content is modified
        :return: None. Exceptions are logged, not raised
        """
        ro_task_id = ro_task["_id"]
        now = time.time()
        next_check_at = now + (24 * 60 * 60)   # one day
        db_ro_task_update = {}

        def _update_refresh(task, new_status):
            # compute when the VIM element must be refreshed again, depending on its kind and status
            nonlocal next_check_at

            next_refresh = time.time()
            if task["item"] in ("image", "flavor"):
                next_refresh += self.REFRESH_IMAGE
            elif new_status == "BUILD":
                next_refresh += self.REFRESH_BUILD
            elif new_status == "DONE":
                next_refresh += self.REFRESH_ACTIVE
            else:
                next_refresh += self.REFRESH_ERROR
            next_check_at = min(next_check_at, next_refresh)
            db_ro_task_update["vim_info.refresh_at"] = next_refresh
            ro_task["vim_info"]["refresh_at"] = next_refresh

        try:
            # 0 get task_status_create
            task_status_create = None
            task_create = next((t for t in ro_task["tasks"] if t["action"] == "CREATE" and
                                t["status"] in ("BUILD", "DONE")), None)
            if task_create:
                task_status_create = task_create["status"]
            # 1. look for SCHEDULED or if CREATE also DONE,BUILD
            for task_action in ("DELETE", "CREATE", "EXEC"):
                # these two are kept along the tasks of the same action: all of them refer to the same VIM
                # element, so the result obtained for one task is also applied to the following ones
                db_vim_update = None
                new_status = None
                for task_index, task in enumerate(ro_task["tasks"]):
                    if (task_action in ("DELETE", "EXEC") and task["status"] != "SCHEDULED") or \
                            task["action"] != task_action or \
                            (task_action == "CREATE" and task["status"] in ("FINISHED", "SUPERSEDED")):
                        continue
                    task_path = "tasks.{}.status".format(task_index)
                    task_depends = {}
                    target_update = None
                    db_vim_info_update = None
                    try:
                        if task["status"] == "SCHEDULED":
                            # check if tasks that this depends on have been completed
                            dependency_not_completed = False
                            for dependency_task_id in (task.get("depends_on") or ()):
                                dependency_ro_task, dependency_task_index = \
                                    self._get_dependency(dependency_task_id, target_id=ro_task["target_id"])
                                dependency_task = dependency_ro_task["tasks"][dependency_task_index]
                                if dependency_task["status"] == "SCHEDULED":
                                    dependency_not_completed = True
                                    next_check_at = min(next_check_at, dependency_ro_task["to_check_at"])
                                    break
                                elif dependency_task["status"] == "FAILED":
                                    error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
                                        task["action"], task["item"], dependency_task["action"],
                                        dependency_task["item"], dependency_task_id,
                                        dependency_ro_task["vim_info"].get("vim_details"))
                                    self.logger.error("task={} {}".format(task["task_id"], error_text))
                                    raise NsWorkerException(error_text)

                                task_depends[dependency_task_id] = dependency_ro_task["vim_info"]["vim_id"]
                                task_depends["TASK-{}".format(dependency_task_id)] = \
                                    dependency_ro_task["vim_info"]["vim_id"]
                            if dependency_not_completed:
                                # TODO set at vim_info.vim_details that it is waiting
                                continue

                        if task["action"] == "DELETE":
                            new_status, db_vim_info_update = self._delete_task(ro_task, task_index,
                                                                               task_depends, db_ro_task_update)
                            new_status = "FINISHED" if new_status == "DONE" else new_status
                            # ^with FINISHED instead of DONE it will not be refreshing
                            if new_status in ("FINISHED", "SUPERSEDED"):
                                target_update = "DELETE"
                        elif task["action"] == "EXEC":
                            new_status, db_vim_info_update = \
                                self.item2action[task["item"]](ro_task, task_index, task_depends)
                            new_status = "FINISHED" if new_status == "DONE" else new_status
                            # ^with FINISHED instead of DONE it will not be refreshing
                            if new_status in ("FINISHED", "SUPERSEDED"):
                                target_update = "DELETE"
                        elif task["action"] == "CREATE":
                            if task["status"] == "SCHEDULED":
                                if task_status_create:
                                    # the VIM element already exists because of another task: copy its info
                                    new_status = task_status_create
                                    target_update = "COPY_VIM_INFO"
                                else:
                                    new_status, db_vim_info_update = \
                                        self.item2create[task["item"]](ro_task, task_index, task_depends)
                                    # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
                                    _update_refresh(task, new_status)
                            else:
                                if ro_task["vim_info"]["refresh_at"] and now > ro_task["vim_info"]["refresh_at"]:
                                    new_status, db_vim_info_update = self.item2refresh[task["item"]](ro_task)
                                    _update_refresh(task, new_status)
                    except Exception as e:
                        new_status = "FAILED"
                        db_vim_info_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
                        if not isinstance(e, (NsWorkerException, vimconn.VimConnException)):
                            self.logger.error("Unexpected exception at _delete_task task={}: {}".
                                              format(task["task_id"], e), exc_info=True)

                    try:
                        if db_vim_info_update:
                            db_vim_update = db_vim_info_update.copy()
                            db_ro_task_update.update({"vim_info." + k: v for k, v in db_vim_info_update.items()})
                            ro_task["vim_info"].update(db_vim_info_update)

                        if new_status:
                            if task_action == "CREATE":
                                task_status_create = new_status
                            db_ro_task_update[task_path] = new_status
                        if target_update or db_vim_update:

                            if target_update == "DELETE":
                                self._update_target(task, None)
                            elif target_update == "COPY_VIM_INFO":
                                self._update_target(task, ro_task["vim_info"])
                            else:
                                self._update_target(task, db_vim_update)

                    except Exception as e:
                        self.logger.error("Unexpected exception at _update_target task={}: {}".
                                          format(task["task_id"], e), exc_info=True)

            # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
            # outside this task (by ro_nbi) do not update it
            db_ro_task_update["locked_by"] = None
            # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
            db_ro_task_update["locked_at"] = int(now - self.MAX_TIME_LOCKED)
            db_ro_task_update["to_check_at"] = next_check_at
            if not self.db.set_one("ro_tasks",
                                   update_dict=db_ro_task_update,
                                   q_filter={"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"]},
                                   fail_on_empty=False):
                del db_ro_task_update["to_check_at"]
                self.db.set_one("ro_tasks",
                                q_filter={"_id": ro_task["_id"]},
                                update_dict=db_ro_task_update,
                                fail_on_empty=True)
        except DbException as e:
            self.logger.error("ro_task={} Error updating database {}".format(ro_task_id, e))
        except Exception as e:
            self.logger.error("Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True)

    def _update_target(self, task, ro_vim_item_update):
        """Copy the VIM information of a task to the database record it targets.

        :param task: task dictionary; "target_record" has the format "<table>:<_id>:<path>"
        :param ro_vim_item_update: dictionary with the VIM information to store. If empty or None the
            target path is removed from the record
        :return: None. Database errors are logged, not raised
        """
        try:
            table, _id, path = task["target_record"].split(":")
            if ro_vim_item_update:
                update_dict = {path + "." + k: v for k, v in ro_vim_item_update.items() if k in
                               ('vim_id', 'vim_details', 'vim_name', 'vim_status', 'interfaces')}
                if ro_vim_item_update.get("interfaces"):
                    # the interfaces of the vdu are stored two levels above the target path
                    path_vdu = path[:path.rfind(".")]
                    path_vdu = path_vdu[:path_vdu.rfind(".")]
                    path_interfaces = path_vdu + ".interfaces"
                    for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
                        if iface:
                            update_dict.update({path_interfaces + ".{}.".format(i) + k: v for k, v in iface.items() if
                                                k in ('ip_address', 'mac_address', 'vlan', 'compute_node', 'pci')})
                            if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
                                update_dict["ip-address"] = iface.get("ip_address").split(";")[0]
                            if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
                                update_dict[path_vdu + ".ip-address"] = iface.get("ip_address").split(";")[0]

                self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
            else:
                self.db.set_one(table, q_filter={"_id": _id}, update_dict=None,
                                unset={path: None})
        except DbException as e:
            self.logger.error("Cannot update database '{}': '{}'".format(task["target_record"], e))

    def new_image(self, ro_task, task_index, task_depends):
        """Look for an existing image at the VIM. Images are only found, never created.

        :param ro_task: ro_task that contains the task
        :param task_index: position of the task at ro_task["tasks"]
        :param task_depends: not used here
        :return: tuple ("DONE" or "FAILED", vim_info changes)
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        try:
            # FIND
            if not task.get("find_params"):
                # without search criteria there is no image to report
                raise NsWorkerExceptionNotFound("Image not found with this criteria: '{}'".format(
                    task.get("find_params")))
            vim_images = target_vim.get_image_list(**task["find_params"])
            if not vim_images:
                raise NsWorkerExceptionNotFound("Image not found with this criteria: '{}'".format(
                    task["find_params"]))
            elif len(vim_images) > 1:
                raise NsWorkerException(
                    "More than one image found with this criteria: '{}'".format(task["find_params"]))
            vim_image_id = vim_images[0]["id"]

            ro_vim_item_update = {"vim_id": vim_image_id,
                                  "vim_status": "DONE",
                                  "created": created,
                                  "created_items": created_items,
                                  "vim_details": None}
            self.logger.debug(
                "task={} {} new-image={} created={}".format(task_id, ro_task["target_id"], vim_image_id, created))
            return "DONE", ro_vim_item_update
        except (NsWorkerException, vimconn.VimConnException) as e:
            self.logger.error("task={} {} new-image: {}".format(task_id, ro_task["target_id"], e))
            ro_vim_item_update = {"vim_status": "VIM_ERROR",
                                  "created": created,
                                  "vim_details": str(e)}
            return "FAILED", ro_vim_item_update

    def del_flavor(self, ro_task, task_index):
        """Delete a flavor at the VIM.

        :param ro_task: ro_task that contains the task; "vim_info.vim_id" is the flavor to delete
        :param task_index: position of the task at ro_task["tasks"]
        :return: tuple ("DONE" or "FAILED", vim_info changes). A flavor not found at VIM counts as deleted
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        flavor_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {"vim_status": "DELETED",
                                 "created": False,
                                 "vim_details": "DELETED",
                                 "vim_id": None}
        try:
            if flavor_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_flavor(flavor_vim_id)

        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_details"] = "already deleted"

        except vimconn.VimConnException as e:
            self.logger.error("ro_task={} vim={} del-flavor={}: {}".format(
                ro_task["_id"], ro_task["target_id"], flavor_vim_id, e))
            ro_vim_item_update = {"vim_status": "VIM_ERROR",
                                  "vim_details": "Error while deleting: {}".format(e)}
            return "FAILED", ro_vim_item_update

        self.logger.debug("task={} {} del-flavor={} {}".format(
            task_id, ro_task["target_id"], flavor_vim_id, ro_vim_item_update_ok.get("vim_details", "")))
        return "DONE", ro_vim_item_update_ok

    def refresh_ok(self, ro_task):
        """skip calling VIM to get image status. Assumes ok"""
        if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
            return "FAILED", {}
        return "DONE", {}

    def delete_ok(self, ro_task, task_index=None):
        """skip calling VIM to delete image status. Assumes ok

        :param ro_task: not used
        :param task_index: not used; accepted because delete methods are called with (ro_task, task_index)
        :return: tuple ("DONE", None)
        """
        return "DONE", None

    def new_flavor(self, ro_task, task_index, task_depends):
        """Find a flavor at the VIM and, if not found and creation params are given, create it.

        :param ro_task: ro_task that contains the task
        :param task_index: position of the task at ro_task["tasks"]
        :param task_depends: not used here
        :return: tuple ("DONE" or "FAILED", vim_info changes)
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        try:
            # get vim_id
            vim_flavor_id = None
            if task.get("find_params"):
                try:
                    flavor_data = task["find_params"]["flavor_data"]
                    vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
                except vimconn.VimConnNotFoundException:
                    pass  # not found: it is created below when params are provided

            if not vim_flavor_id and task.get("params"):
                # CREATE
                flavor_data = task["params"]["flavor_data"]
                vim_flavor_id = target_vim.new_flavor(flavor_data)
                created = True

            ro_vim_item_update = {"vim_id": vim_flavor_id,
                                  "vim_status": "DONE",
                                  "created": created,
                                  "created_items": created_items,
                                  "vim_details": None}
            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(task_id, ro_task["target_id"], vim_flavor_id, created))
            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error("task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e))
            ro_vim_item_update = {"vim_status": "VIM_ERROR",
                                  "created": created,
                                  "vim_details": str(e)}
            return "FAILED", ro_vim_item_update

    def new_net(self, ro_task, task_index, task_depends):
        """Find a network at the VIM when "find_params" is given, otherwise create it from "params".

        :param ro_task: ro_task that contains the task
        :param task_index: position of the task at ro_task["tasks"]
        :param task_depends: not used here
        :return: tuple ("BUILD" or "FAILED", vim_info changes)
        """
        vim_net_id = None
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        try:
            # FIND
            if task.get("find_params"):
                find_params = task["find_params"]
                # if management, get configuration of VIM
                if find_params.get("filter_dict"):
                    vim_filter = find_params["filter_dict"]
                elif find_params.get("mgmt"):   # management network
                    db_vim = self.db_vims[ro_task["target_id"]]
                    if deep_get(db_vim, "config", "management_network_id"):
                        vim_filter = {"id": db_vim["config"]["management_network_id"]}
                    elif deep_get(db_vim, "config", "management_network_name"):
                        vim_filter = {"name": db_vim["config"]["management_network_name"]}
                    else:
                        vim_filter = {"name": find_params["name"]}
                else:
                    raise NsWorkerExceptionNotFound("Invalid find_params for new_net {}".format(task["find_params"]))

                vim_nets = target_vim.get_network_list(vim_filter)
                if not vim_nets and not task.get("params"):
                    raise NsWorkerExceptionNotFound("Network not found with this criteria: '{}'".format(
                        task.get("find_params")))
                elif len(vim_nets) > 1:
                    raise NsWorkerException(
                        "More than one network found with this criteria: '{}'".format(task["find_params"]))
                if vim_nets:
                    vim_net_id = vim_nets[0]["id"]
            else:
                # CREATE
                params = task["params"]
                vim_net_id, created_items = target_vim.new_network(**params)
                created = True

            ro_vim_item_update = {"vim_id": vim_net_id,
                                  "vim_status": "BUILD",
                                  "created": created,
                                  "created_items": created_items,
                                  "vim_details": None}
            self.logger.debug(
                "task={} {} new-net={} created={}".format(task_id, ro_task["target_id"], vim_net_id, created))
            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error("task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e))
            ro_vim_item_update = {"vim_status": "VIM_ERROR",
                                  "created": created,
                                  "vim_details": str(e)}
            return "FAILED", ro_vim_item_update

    def refresh_net(self, ro_task):
        """Call VIM to get network status

        :param ro_task: ro_task of the network; "vim_info" holds the last known VIM information
        :return: tuple (task status "DONE", "BUILD" or "FAILED", vim_info entries that have changed)
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]

        vim_id = ro_task["vim_info"]["vim_id"]
        net_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
            vim_info = vim_dict[vim_id]
            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error("ro_task={} vim={} get-net={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e))
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        # report only what differs from the information already stored
        ro_vim_item_update = {}
        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]
        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")
        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_details"] != vim_info["error_msg"]:
                ro_vim_item_update["vim_details"] = vim_info["error_msg"]
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_details"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]
        if ro_vim_item_update:
            self.logger.debug("ro_task={} {} get-net={}: status={} {}".format(
                ro_task_id, ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"),
                ro_vim_item_update.get("vim_details") if ro_vim_item_update.get("vim_status") != "ACTIVE" else ''))
        return task_status, ro_vim_item_update

    def del_net(self, ro_task, task_index):
        """Delete a network at the VIM.

        :param ro_task: ro_task that contains the task; "vim_info" has the "vim_id" and "created_items"
        :param task_index: position of the task at ro_task["tasks"]
        :return: tuple ("DONE" or "FAILED", vim_info changes). A network not found at VIM counts as deleted
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        net_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {"vim_status": "DELETED",
                                 "created": False,
                                 "vim_details": "DELETED",
                                 "vim_id": None}
        try:
            if net_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_network(net_vim_id, ro_task["vim_info"]["created_items"])

        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_details"] = "already deleted"

        except vimconn.VimConnException as e:
            self.logger.error("ro_task={} vim={} del-net={}: {}".format(ro_task["_id"], ro_task["target_id"],
                                                                        net_vim_id, e))
            ro_vim_item_update = {"vim_status": "VIM_ERROR",
                                  "vim_details": "Error while deleting: {}".format(e)}
            return "FAILED", ro_vim_item_update

        self.logger.debug("task={} {} del-net={} {}".format(task_id, ro_task["target_id"], net_vim_id,
                                                            ro_vim_item_update_ok.get("vim_details", "")))
        return "DONE", ro_vim_item_update_ok

    def new_vm(self, ro_task, task_index, task_depends):
        """Create a VM at the VIM, replacing the "TASK-..." references by the vim ids of the dependencies.

        :param ro_task: ro_task that contains the task
        :param task_index: position of the task at ro_task["tasks"]
        :param task_depends: dictionary {"TASK-<task id>": vim id} of the tasks this one depends on
        :return: tuple ("BUILD" or "FAILED", vim_info changes)
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        try:
            created = True
            params = task["params"]
            params_copy = deepcopy(params)  # the stored task params must keep the TASK- references
            net_list = params_copy["net_list"]
            for net in net_list:
                if "net_id" in net and net["net_id"].startswith("TASK-"):  # change task_id into network_id
                    network_id = task_depends[net["net_id"]]
                    if not network_id:
                        raise NsWorkerException("Cannot create VM because depends on a network not created or found "
                                                "for {}".format(net["net_id"]))
                    net["net_id"] = network_id
            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]
            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            ro_vim_item_update = {"vim_id": vim_vm_id,
                                  "vim_status": "BUILD",
                                  "created": created,
                                  "created_items": created_items,
                                  "vim_details": None,
                                  "interfaces_vim_ids": interfaces,
                                  "interfaces": [],
                                  }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(task_id, ro_task["target_id"], vim_vm_id, created))
            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error("task={} vim={} new-vm: {}".format(task_id, ro_task["target_id"], e))
            ro_vim_item_update = {"vim_status": "VIM_ERROR",
                                  "created": created,
                                  "vim_details": str(e)}
            return "FAILED", ro_vim_item_update

    def del_vm(self, ro_task, task_index):
        """Delete a VM at the VIM.

        :param ro_task: ro_task that contains the task; "vim_info" has the "vim_id" and "created_items"
        :param task_index: position of the task at ro_task["tasks"]
        :return: tuple ("DONE" or "FAILED", vim_info changes). A VM not found at VIM counts as deleted
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {"vim_status": "DELETED",
                                 "created": False,
                                 "vim_details": "DELETED",
                                 "vim_id": None}
        try:
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(vm_vim_id, ro_task["vim_info"]["created_items"])

        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_details"] = "already deleted"

        except vimconn.VimConnException as e:
            self.logger.error("ro_task={} vim={} del-vm={}: {}".format(ro_task["_id"], ro_task["target_id"],
                                                                       vm_vim_id, e))
            ro_vim_item_update = {"vim_status": "VIM_ERROR",
                                  "vim_details": "Error while deleting: {}".format(e)}
            return "FAILED", ro_vim_item_update

        self.logger.debug("task={} {} del-vm={} {}".format(task_id, ro_task["target_id"], vm_vim_id,
                                                           ro_vim_item_update_ok.get("vim_details", "")))
        return "DONE", ro_vim_item_update_ok

    def refresh_vm(self, ro_task):
        """Call VIM to get vm status

        :param ro_task: ro_task of the VM; "vim_info" holds the last known VIM information
        :return: tuple (task status "DONE", "BUILD" or "FAILED", vim_info entries that have changed)
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]

        vim_id = ro_task["vim_info"]["vim_id"]
        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]
            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error("ro_task={} vim={} get-vm={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e))
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}
        # TODO check and update interfaces
        vim_interfaces = []
        # the VIM does not report interfaces when the refresh fails: skip the matching in that case
        if vim_info.get("interfaces"):
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                iface = next((iface for iface in vim_info["interfaces"]
                              if vim_iface_id == iface["vim_interface_id"]), None)
                # if iface:
                #     iface.pop("vim_info", None)
                vim_interfaces.append(iface)

        def _flag_interface(index, flag):
            # mark the interface only if the VIM has reported it; unmatched interfaces are stored as None
            if 0 <= index < len(vim_interfaces) and vim_interfaces[index]:
                vim_interfaces[index][flag] = True

        task = ro_task["tasks"][0]  # TODO look for a task CREATE and active
        if task.get("mgmt_vnf_interface") is not None:
            _flag_interface(task["mgmt_vnf_interface"], "mgmt_vnf_interface")
        mgmt_vdu_iface = task.get("mgmt_vdu_interface", task.get("mgmt_vnf_interface", 0))
        _flag_interface(mgmt_vdu_iface, "mgmt_vdu_interface")

        # report only what differs from the information already stored
        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces
        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]
        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")
        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_details"] != vim_info["error_msg"]:
                ro_vim_item_update["vim_details"] = vim_info["error_msg"]
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_details"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]
        if ro_vim_item_update:
            self.logger.debug("ro_task={} {} get-vm={}: status={} {}".format(
                ro_task_id, ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"),
                ro_vim_item_update.get("vim_details") if ro_vim_item_update.get("vim_status") != "ACTIVE" else ''))
        return task_status, ro_vim_item_update

    def exec_vm(self, ro_task, task_index, task_depends):
        """Inject a user key into a VM through the VIM connector.

        :param ro_task: ro_task that contains the task
        :param task_index: position of the task at ro_task["tasks"]
        :param task_depends: not used here
        :return: tuple ("DONE", None) on success; ("FAILED", vim_info changes) on error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        try:
            params = task["params"]
            params_copy = deepcopy(params)
            # the private key is stored encrypted; the decryption parameters are not connector arguments
            params_copy["use_pri_key"] = self.db.decrypt(params_copy.pop("private_key"),
                                                         params_copy.pop("schema_version"), params_copy.pop("salt"))

            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"]))
            # there is no vim_info to update; the caller expects a dictionary or None here
            return "DONE", None
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error("task={} vim={} new-vm: {}".format(task_id, ro_task["target_id"], e))
            ro_vim_item_update = {"vim_details": str(e)}
            return "FAILED", ro_vim_item_update

    def run(self):
        """Main loop of the thread.

        Orders of the queue are attended first: ("load_vim", vim_account_id) loads a VIM and ("terminate",)
        ends the loop. When the queue is empty one pending ro_task is processed, sleeping 5 seconds if there
        is none.
        """
        self.logger.debug("Starting")
        while True:
            try:
                # block waiting for an order only while no VIM is loaded; otherwise just poll the queue
                task = self.task_queue.get(block=False if self.my_vims else True)
                if task[0] == "terminate":
                    break
                if task[0] == "load_vim":
                    self._load_vim(task[1])
                continue
            except queue.Empty:
                pass
            except Exception as e:
                # a malformed order must not kill the thread
                self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
                continue

            try:
                busy = False
                ro_task = self._get_db_task()
                if ro_task:
                    self._proccess_pending_tasks(ro_task)
                    busy = True
                if not busy:
                    time.sleep(5)
            except Exception as e:
                self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)

        self.logger.debug("Finishing")