+ @staticmethod
+ def _create_task(
+ deployment_info: Dict[str, Any],
+ target_id: str,
+ item: str,
+ action: str,
+ target_record: str,
+ target_record_id: str,
+ extra_dict: Dict[str, Any] = None,
+ ) -> Dict[str, Any]:
+ """Function to create task dict from deployment information.
+
+ Args:
+ deployment_info (Dict[str, Any]): [description]
+ target_id (str): [description]
+ item (str): [description]
+ action (str): [description]
+ target_record (str): [description]
+ target_record_id (str): [description]
+ extra_dict (Dict[str, Any], optional): [description]. Defaults to None.
+
+ Returns:
+ Dict[str, Any]: [description]
+ """
+ task = {
+ "target_id": target_id, # it will be removed before pushing at database
+ "action_id": deployment_info.get("action_id"),
+ "nsr_id": deployment_info.get("nsr_id"),
+ "task_id": f"{deployment_info.get('action_id')}:{deployment_info.get('task_index')}",
+ "status": "SCHEDULED",
+ "action": action,
+ "item": item,
+ "target_record": target_record,
+ "target_record_id": target_record_id,
+ }
+
+ if extra_dict:
+ task.update(extra_dict) # params, find_params, depends_on
+
+ deployment_info["task_index"] = deployment_info.get("task_index", 0) + 1
+
+ return task
+
+ @staticmethod
+ def _create_ro_task(
+ target_id: str,
+ task: Dict[str, Any],
+ ) -> Dict[str, Any]:
+ """Function to create an RO task from task information.
+
+ Args:
+ target_id (str): [description]
+ task (Dict[str, Any]): [description]
+
+ Returns:
+ Dict[str, Any]: [description]
+ """
+ now = time()
+
+ _id = task.get("task_id")
+ db_ro_task = {
+ "_id": _id,
+ "locked_by": None,
+ "locked_at": 0.0,
+ "target_id": target_id,
+ "vim_info": {
+ "created": False,
+ "created_items": None,
+ "vim_id": None,
+ "vim_name": None,
+ "vim_status": None,
+ "vim_details": None,
+ "refresh_at": None,
+ },
+ "modified_at": now,
+ "created_at": now,
+ "to_check_at": now,
+ "tasks": [task],
+ }
+
+ return db_ro_task
+
+ @staticmethod
+ def _process_image_params(
+ target_image: Dict[str, Any],
+ vim_info: Dict[str, Any],
+ target_record_id: str,
+ ) -> Dict[str, Any]:
+ """Function to process VDU image parameters.
+
+ Args:
+ target_image (Dict[str, Any]): [description]
+ vim_info (Dict[str, Any]): [description]
+ target_record_id (str): [description]
+
+ Returns:
+ Dict[str, Any]: [description]
+ """
+ find_params = {}
+
+ if target_image.get("image"):
+ find_params["filter_dict"] = {"name": target_image.get("image")}
+
+ if target_image.get("vim_image_id"):
+ find_params["filter_dict"] = {"id": target_image.get("vim_image_id")}
+
+ if target_image.get("image_checksum"):
+ find_params["filter_dict"] = {
+ "checksum": target_image.get("image_checksum")
+ }
+
+ return {"find_params": find_params}
+