"k8scluster", k8scluster_id, order_id, "k8scluster_create", task
)
return
+ elif command == "edit" or command == "edited":
+ k8scluster_id = params.get("_id")
+ task = asyncio.ensure_future(self.k8scluster.edit(params, order_id))
+ self.lcm_tasks.register(
+ "k8scluster", k8scluster_id, order_id, "k8scluster_edit", task
+ )
+ return
elif command == "delete" or command == "deleted":
k8scluster_id = params.get("_id")
task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
from osm_common import msgbase
from osm_common.dbbase import DbException
-from osm_lcm.vim_sdn import VcaLcm
+from osm_lcm.vim_sdn import K8sClusterLcm, VcaLcm
class AsyncMock(MagicMock):
)
self.lcm_tasks.unlock_HA.not_called()
self.lcm_tasks.remove.assert_called_with("vca", "id", "order-id")
+
+
+class TestK8SClusterLcm(TestCase):
+ @patch("osm_lcm.vim_sdn.K8sHelmConnector")
+ @patch("osm_lcm.vim_sdn.K8sHelm3Connector")
+ @patch("osm_lcm.vim_sdn.K8sJujuConnector")
+ @patch("osm_lcm.lcm_utils.Database")
+ @patch("osm_lcm.lcm_utils.Filesystem")
+ def setUp(
+ self,
+ mock_filesystem,
+ mock_database,
+ juju_connector,
+ helm3_connector,
+ helm_connector,
+ ):
+ self.loop = asyncio.get_event_loop()
+ self.msg = Mock(msgbase.MsgBase())
+ self.lcm_tasks = Mock()
+ self.config = {"database": {"driver": "mongo"}}
+ self.vca_config = {
+ "VCA": {
+ "helmpath": "/usr/local/bin/helm",
+ "helm3path": "/usr/local/bin/helm3",
+ "kubectlpath": "/usr/bin/kubectl",
+ }
+ }
+ self.k8scluster_lcm = K8sClusterLcm(
+ self.msg, self.lcm_tasks, self.vca_config, self.loop
+ )
+ self.k8scluster_lcm.db = Mock()
+ self.k8scluster_lcm.fs = Mock()
+
+ def test_k8scluster_edit(self):
+ k8scluster_content = {"op_id": "op-id", "_id": "id"}
+ order_id = "order-id"
+ self.lcm_tasks.lock_HA.return_value = True
+ self.loop.run_until_complete(
+ self.k8scluster_lcm.edit(k8scluster_content, order_id)
+ )
+ self.lcm_tasks.unlock_HA.assert_called_with(
+ "k8scluster",
+ "edit",
+ "op-id",
+ operationState="COMPLETED",
+ detailed_status="Not implemented",
+ )
+ self.lcm_tasks.remove.assert_called_with("k8scluster", "id", order_id)
+
+ def test_k8scluster_edit_lock_false(self):
+ k8scluster_content = {"op_id": "op-id", "_id": "id"}
+ order_id = "order-id"
+ self.lcm_tasks.lock_HA.return_value = False
+ self.loop.run_until_complete(
+ self.k8scluster_lcm.edit(k8scluster_content, order_id)
+ )
+ self.lcm_tasks.unlock_HA.assert_not_called()
+ self.lcm_tasks.remove.assert_not_called()
+
+ def test_k8scluster_edit_no_opid(self):
+ k8scluster_content = {"_id": "id"}
+ order_id = "order-id"
+ self.lcm_tasks.lock_HA.return_value = True
+ self.loop.run_until_complete(
+ self.k8scluster_lcm.edit(k8scluster_content, order_id)
+ )
+ self.lcm_tasks.unlock_HA.assert_called_with(
+ "k8scluster",
+ "edit",
+ None,
+ operationState="COMPLETED",
+ detailed_status="Not implemented",
+ )
+ self.lcm_tasks.remove.assert_called_with("k8scluster", "id", order_id)
+
+ def test_k8scluster_edit_no_orderid(self):
+ k8scluster_content = {"op_id": "op-id", "_id": "id"}
+ order_id = None
+ self.lcm_tasks.lock_HA.return_value = True
+ self.loop.run_until_complete(
+ self.k8scluster_lcm.edit(k8scluster_content, order_id)
+ )
+ self.lcm_tasks.unlock_HA.assert_called_with(
+ "k8scluster",
+ "edit",
+ "op-id",
+ operationState="COMPLETED",
+ detailed_status="Not implemented",
+ )
+ self.lcm_tasks.remove.assert_called_with("k8scluster", "id", order_id)
self.logger.error(logging_text + "Cannot update database: {}".format(e))
self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id)
+ async def edit(self, k8scluster_content, order_id):
+
+ op_id = k8scluster_content.pop("op_id", None)
+ if not self.lcm_tasks.lock_HA("k8scluster", "edit", op_id):
+ return
+
+ k8scluster_id = k8scluster_content["_id"]
+ logging_text = "Task k8scluster_edit={} ".format(k8scluster_id)
+ self.logger.debug(logging_text + "Enter")
+
+ # TODO the implementation is pending and will be part of a new feature
+ # It will support rotation of certificates, update of credentials and K8S API endpoint
+ # At the moment the operation is set as completed
+
+ operation_state = "COMPLETED"
+ operation_details = "Not implemented"
+
+ self.lcm_tasks.unlock_HA(
+ "k8scluster",
+ "edit",
+ op_id,
+ operationState=operation_state,
+ detailed_status=operation_details,
+ )
+ self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id)
+
async def delete(self, k8scluster_content, order_id):
# HA tasks and backward compatibility: