super().__init__(msg, self.logger)
async def create(self, vim_content, order_id):
-
# HA tasks and backward compatibility:
# If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
# In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
self.lcm_tasks.remove("vim_account", vim_id, order_id)
async def edit(self, vim_content, order_id):
-
# HA tasks and backward compatibility:
# If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
# In such a case, HA is not supported by NBI, and the HA check always returns True
self.lcm_tasks.remove("vim_account", vim_id, order_id)
async def delete(self, vim_content, order_id):
-
# HA tasks and backward compatibility:
# If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
# In such a case, HA is not supported by NBI, and the HA check always returns True
super().__init__(msg, self.logger)
async def create(self, wim_content, order_id):
-
# HA tasks and backward compatibility:
# If 'wim_content' does not include 'op_id', we a running a legacy NBI version.
# In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
self.lcm_tasks.remove("wim_account", wim_id, order_id)
async def edit(self, wim_content, order_id):
-
# HA tasks and backward compatibility:
# If 'wim_content' does not include 'op_id', we a running a legacy NBI version.
# In such a case, HA is not supported by NBI, and the HA check always returns True
and db_wim["_admin"].get("deployed")
and db_wim["_admin"]["deployed"].get("RO")
):
-
RO_wim_id = db_wim["_admin"]["deployed"]["RO"]
step = "Editing wim at RO"
RO = ROclient.ROClient(self.loop, **self.ro_config)
self.lcm_tasks.remove("wim_account", wim_id, order_id)
async def delete(self, wim_content, order_id):
-
# HA tasks and backward compatibility:
# If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
# In such a case, HA is not supported by NBI, and the HA check always returns True
super().__init__(msg, self.logger)
async def create(self, sdn_content, order_id):
-
# HA tasks and backward compatibility:
# If 'sdn_content' does not include 'op_id', we a running a legacy NBI version.
# In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
self.lcm_tasks.remove("sdn", sdn_id, order_id)
async def edit(self, sdn_content, order_id):
-
# HA tasks and backward compatibility:
# If 'sdn_content' does not include 'op_id', we a running a legacy NBI version.
# In such a case, HA is not supported by NBI, and the HA check always returns True
self.lcm_tasks.remove("sdn", sdn_id, order_id)
async def delete(self, sdn_content, order_id):
-
# HA tasks and backward compatibility:
# If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
# In such a case, HA is not supported by NBI, and the HA check always returns True
}
async def create(self, k8scluster_content, order_id):
-
op_id = k8scluster_content.pop("op_id", None)
if not self.lcm_tasks.lock_HA("k8scluster", "create", op_id):
return
self.logger.error(logging_text + "Cannot update database: {}".format(e))
self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id)
- async def delete(self, k8scluster_content, order_id):
+ async def edit(self, k8scluster_content, order_id):
+ op_id = k8scluster_content.pop("op_id", None)
+ if not self.lcm_tasks.lock_HA("k8scluster", "edit", op_id):
+ return
+
+ k8scluster_id = k8scluster_content["_id"]
+ logging_text = "Task k8scluster_edit={} ".format(k8scluster_id)
+ self.logger.debug(logging_text + "Enter")
+
+ # TODO the implementation is pending and will be part of a new feature
+ # It will support rotation of certificates, update of credentials and K8S API endpoint
+ # At the moment the operation is set as completed
+
+ operation_state = "COMPLETED"
+ operation_details = "Not implemented"
+
+ self.lcm_tasks.unlock_HA(
+ "k8scluster",
+ "edit",
+ op_id,
+ operationState=operation_state,
+ detailed_status=operation_details,
+ )
+ self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id)
+
+ async def delete(self, k8scluster_content, order_id):
# HA tasks and backward compatibility:
# If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
# In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
db_k8scluster_update["_admin.helm-chart.operationalState"] = "DISABLED"
if k8s_h3c_id:
- step = "Removing helm-chart-v3 '{}'".format(k8s_hc_id)
+ step = "Removing helm-chart-v3 '{}'".format(k8s_h3c_id)
uninstall_sw = (
deep_get(db_k8scluster, ("_admin", "helm-chart-v3", "created"))
or False
self.lcm_tasks.remove("vca", vca_id, order_id)
async def delete(self, vca_content, order_id):
-
# HA tasks and backward compatibility:
# If "vim_content" does not include "op_id", we a running a legacy NBI version.
# In such a case, HA is not supported by NBI, "op_id" is None, and lock_HA() will do nothing.
)
async def create(self, k8srepo_content, order_id):
-
# HA tasks and backward compatibility:
# If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
# In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
self.lcm_tasks.remove("k8srepo", k8srepo_id, order_id)
async def delete(self, k8srepo_content, order_id):
-
# HA tasks and backward compatibility:
# If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
# In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.