# import yaml
import logging
+from typing import Any, Dict
from traceback import format_exc as traceback_format_exc
from osm_ng_ro.ns_thread import NsWorker, NsWorkerException, deep_get
from osm_ng_ro.validation import validate_input, deploy_schema
return db_content
+ @staticmethod
+ def _create_task(
+ deployment_info: Dict[str, Any],
+ target_id: str,
+ item: str,
+ action: str,
+ target_record: str,
+ target_record_id: str,
+ extra_dict: Dict[str, Any] = None,
+ ) -> Dict[str, Any]:
+ """Function to create task dict from deployment information.
+
+ Args:
+ deployment_info (Dict[str, Any]): [description]
+ target_id (str): [description]
+ item (str): [description]
+ action (str): [description]
+ target_record (str): [description]
+ target_record_id (str): [description]
+ extra_dict (Dict[str, Any], optional): [description]. Defaults to None.
+
+ Returns:
+ Dict[str, Any]: [description]
+ """
+ task = {
+ "target_id": target_id, # it will be removed before pushing at database
+ "action_id": deployment_info.get("action_id"),
+ "nsr_id": deployment_info.get("nsr_id"),
+ "task_id": f"{deployment_info.get('action_id')}:{deployment_info.get('task_index')}",
+ "status": "SCHEDULED",
+ "action": action,
+ "item": item,
+ "target_record": target_record,
+ "target_record_id": target_record_id,
+ }
+
+ if extra_dict:
+ task.update(extra_dict) # params, find_params, depends_on
+
+ deployment_info["task_index"] = deployment_info.get("task_index", 0) + 1
+
+ return task
+
+ @staticmethod
+ def _create_ro_task(
+ target_id: str,
+ task: Dict[str, Any],
+ ) -> Dict[str, Any]:
+ """Function to create an RO task from task information.
+
+ Args:
+ target_id (str): [description]
+ task (Dict[str, Any]): [description]
+
+ Returns:
+ Dict[str, Any]: [description]
+ """
+ now = time()
+
+ _id = task.get("task_id")
+ db_ro_task = {
+ "_id": _id,
+ "locked_by": None,
+ "locked_at": 0.0,
+ "target_id": target_id,
+ "vim_info": {
+ "created": False,
+ "created_items": None,
+ "vim_id": None,
+ "vim_name": None,
+ "vim_status": None,
+ "vim_details": None,
+ "refresh_at": None,
+ },
+ "modified_at": now,
+ "created_at": now,
+ "to_check_at": now,
+ "tasks": [task],
+ }
+
+ return db_ro_task
+
def deploy(self, session, indata, version, nsr_id, *args, **kwargs):
self.logger.debug("ns.deploy nsr_id={} indata={}".format(nsr_id, indata))
validate_input(indata, deploy_schema)
index += 1
- def _create_task(
- target_id,
- item,
- action,
- target_record,
- target_record_id,
- extra_dict=None,
- ):
- nonlocal task_index
- nonlocal action_id
- nonlocal nsr_id
-
- task = {
- "target_id": target_id, # it will be removed before pushing at database
- "action_id": action_id,
- "nsr_id": nsr_id,
- "task_id": "{}:{}".format(action_id, task_index),
- "status": "SCHEDULED",
- "action": action,
- "item": item,
- "target_record": target_record,
- "target_record_id": target_record_id,
- }
-
- if extra_dict:
- task.update(extra_dict) # params, find_params, depends_on
-
- task_index += 1
-
- return task
-
- def _create_ro_task(target_id, task):
- nonlocal action_id
- nonlocal task_index
- nonlocal now
-
- _id = task["task_id"]
- db_ro_task = {
- "_id": _id,
- "locked_by": None,
- "locked_at": 0.0,
- "target_id": target_id,
- "vim_info": {
- "created": False,
- "created_items": None,
- "vim_id": None,
- "vim_name": None,
- "vim_status": None,
- "vim_details": None,
- "refresh_at": None,
- },
- "modified_at": now,
- "created_at": now,
- "to_check_at": now,
- "tasks": [task],
- }
-
- return db_ro_task
-
def _process_image_params(target_image, vim_info, target_record_id):
find_params = {}
):
nonlocal db_new_tasks
nonlocal tasks_by_target_record_id
+ nonlocal action_id
+ nonlocal nsr_id
nonlocal task_index
# ensure all the target_list elements has an "id". If not assign the index as id
item_ = "sdn_net"
target_record_id += ".sdn"
- task = _create_task(
- target_vim,
- item_,
- "DELETE",
- target_record="{}.{}.vim_info.{}".format(
- db_record, item_index, target_vim
- ),
+ deployment_info = {
+ "action_id": action_id,
+ "nsr_id": nsr_id,
+ "task_index": task_index,
+ }
+
+ task = Ns._create_task(
+ deployment_info=deployment_info,
+ target_id=target_vim,
+ item=item_,
+ action="DELETE",
+ target_record=f"{db_record}.{item_index}.vim_info.{target_vim}",
target_record_id=target_record_id,
)
+
+ task_index = deployment_info.get("task_index")
+
tasks_by_target_record_id[target_record_id] = task
db_new_tasks.append(task)
# TODO delete
target_item, target_viminfo, target_record_id
)
self._assign_vim(target_vim)
- task = _create_task(
- target_vim,
- item_,
- "CREATE",
- target_record="{}.{}.vim_info.{}".format(
- db_record, item_index, target_vim
- ),
+
+ deployment_info = {
+ "action_id": action_id,
+ "nsr_id": nsr_id,
+ "task_index": task_index,
+ }
+
+ task = Ns._create_task(
+ deployment_info=deployment_info,
+ target_id=target_vim,
+ item=item_,
+ action="CREATE",
+ target_record=f"{db_record}.{item_index}.vim_info.{target_vim}",
target_record_id=target_record_id,
extra_dict=extra_dict,
)
+
+ task_index = deployment_info.get("task_index")
+
tasks_by_target_record_id[target_record_id] = task
db_new_tasks.append(task)
def _process_action(indata):
nonlocal db_new_tasks
+ nonlocal action_id
+ nonlocal nsr_id
nonlocal task_index
nonlocal db_vnfrs
nonlocal db_ro_nsr
],
},
}
- task = _create_task(
- target_vim,
- "vdu",
- "EXEC",
+
+ deployment_info = {
+ "action_id": action_id,
+ "nsr_id": nsr_id,
+ "task_index": task_index,
+ }
+
+ task = Ns._create_task(
+ deployment_info=deployment_info,
+ target_id=target_vim,
+ item="vdu",
+ action="EXEC",
target_record=target_record,
target_record_id=None,
extra_dict=extra_dict,
)
+
+ task_index = deployment_info.get("task_index")
+
db_new_tasks.append(task)
with self.write_lock:
):
# Create a ro_task
step = "Updating database, Creating ro_tasks"
- db_ro_task = _create_ro_task(target_id, db_task)
+ db_ro_task = Ns._create_ro_task(target_id, db_task)
nb_ro_tasks += 1
self.db.create("ro_tasks", db_ro_task)