return task
+ @staticmethod
+ def _create_ro_task(
+ target_id: str,
+ task: Dict[str, Any],
+ ) -> Dict[str, Any]:
+ """Function to create an RO task from task information.
+
+ Args:
+ target_id (str): [description]
+ task (Dict[str, Any]): [description]
+
+ Returns:
+ Dict[str, Any]: [description]
+ """
+ now = time()
+
+ _id = task.get("task_id")
+ db_ro_task = {
+ "_id": _id,
+ "locked_by": None,
+ "locked_at": 0.0,
+ "target_id": target_id,
+ "vim_info": {
+ "created": False,
+ "created_items": None,
+ "vim_id": None,
+ "vim_name": None,
+ "vim_status": None,
+ "vim_details": None,
+ "refresh_at": None,
+ },
+ "modified_at": now,
+ "created_at": now,
+ "to_check_at": now,
+ "tasks": [task],
+ }
+
+ return db_ro_task
+
def deploy(self, session, indata, version, nsr_id, *args, **kwargs):
self.logger.debug("ns.deploy nsr_id={} indata={}".format(nsr_id, indata))
validate_input(indata, deploy_schema)
index += 1
- def _create_ro_task(target_id, task):
- nonlocal action_id
- nonlocal task_index
- nonlocal now
-
- _id = task["task_id"]
- db_ro_task = {
- "_id": _id,
- "locked_by": None,
- "locked_at": 0.0,
- "target_id": target_id,
- "vim_info": {
- "created": False,
- "created_items": None,
- "vim_id": None,
- "vim_name": None,
- "vim_status": None,
- "vim_details": None,
- "refresh_at": None,
- },
- "modified_at": now,
- "created_at": now,
- "to_check_at": now,
- "tasks": [task],
- }
-
- return db_ro_task
-
def _process_image_params(target_image, vim_info, target_record_id):
find_params = {}
):
# Create a ro_task
step = "Updating database, Creating ro_tasks"
- db_ro_task = _create_ro_task(target_id, db_task)
+ db_ro_task = Ns._create_ro_task(target_id, db_task)
nb_ro_tasks += 1
self.db.create("ro_tasks", db_ro_task)