:return: reference to the new revision number of the KDU instance
"""
+ @abc.abstractmethod
+ async def scale(
+ self, kdu_instance: str,
+ scale: int,
+ resource_name: str,
+ total_timeout: float = 1800,
+ **kwargs,
+ ) -> bool:
+ """
+ Scales an application in KDU instance.
+
+ :param: kdu_instance str: KDU instance name
+ :param: scale int: Scale to which to set this application
+ :param: resource_name str: Resource name (Application name)
+ :param: timeout float: The time, in seconds, to wait for the install
+ to finish
+ :param kwargs: Additional parameters
+
+ :return: If successful, returns True
+ """
+
+ @abc.abstractmethod
+ async def get_scale_count(
+ self,
+ resource_name: str,
+ kdu_instance: str,
+ **kwargs,
+ ) -> int:
+ """
+ Get an application scale count.
+
+ :param: resource_name str: Resource name (Application name)
+ :param: kdu_instance str: KDU instance name
+ :param kwargs: Additional parameters
+
+ :return: Return application instance count
+ """
+
@abc.abstractmethod
async def rollback(
self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
else:
return 0
+ async def scale(
+ self,
+ kdu_instance: str,
+ scale: int,
+ resource_name: str,
+ total_timeout: float = 1800,
+ **kwargs,
+ ):
+ raise NotImplementedError("Method not implemented")
+
+ async def get_scale_count(
+ self,
+ resource_name: str,
+ kdu_instance: str,
+ **kwargs,
+ ):
+ raise NotImplementedError("Method not implemented")
+
async def rollback(
self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
):
)
return True
+ async def scale(
+ self,
+ kdu_instance: str,
+ scale: int,
+ resource_name: str,
+ total_timeout: float = 1800,
+ **kwargs,
+ ) -> bool:
+ """Scale an application in a model
+
+ :param: kdu_instance str: KDU instance name
+ :param: scale int: Scale to which to set this application
+ :param: resource_name str: Resource name (Application name)
+ :param: timeout float: The time, in seconds, to wait for the install
+ to finish
+ :param kwargs: Additional parameters
+ vca_id (str): VCA ID
+
+ :return: If successful, returns True
+ """
+
+ try:
+ libjuju = await self._get_libjuju(kwargs.get("vca_id"))
+ await libjuju.scale_application(
+ model_name=kdu_instance,
+ application_name=resource_name,
+ scale=scale,
+ total_timeout=total_timeout
+ )
+ except Exception as e:
+ error_msg = "Error scaling application {} in kdu instance {}: {}".format(
+ resource_name, kdu_instance, e)
+ self.log.error(error_msg)
+ raise K8sException(message=error_msg)
+ return True
+
+ async def get_scale_count(
+ self,
+ resource_name: str,
+ kdu_instance: str,
+ **kwargs,
+ ) -> int:
+ """Get an application scale count
+
+ :param: resource_name str: Resource name (Application name)
+ :param: kdu_instance str: KDU instance name
+ :param kwargs: Additional parameters
+ vca_id (str): VCA ID
+ :return: Return application instance count
+ """
+ try:
+ libjuju = await self._get_libjuju(kwargs.get("vca_id"))
+ status = await libjuju.get_model_status(kdu_instance)
+ return len(status.applications[resource_name].units)
+ except Exception as e:
+ error_msg = "Error getting scale count from application {} in kdu instance {}: {}".format(
+ resource_name, kdu_instance, e)
+ self.log.error(error_msg)
+ raise K8sException(message=error_msg)
+
async def instances_list(self, cluster_uuid: str) -> list:
"""
returns a list of deployed releases in a cluster
return application
+ async def scale_application(
+ self,
+ model_name: str,
+ application_name: str,
+ scale: int = 1,
+ total_timeout: float = None,
+ ):
+ """
+ Scale application (K8s)
+
+ :param: model_name: Model name
+ :param: application_name: Application name
+ :param: scale: Scale to which to set this application
+ :param: total_timeout: Timeout for the entity to be active
+ """
+
+ model = None
+ controller = await self.get_controller()
+ try:
+ model = await self.get_model(controller, model_name)
+
+ self.log.debug(
+ "Scaling application {} in model {}".format(
+ application_name, model_name
+ )
+ )
+ application = self._get_application(model, application_name)
+ if application is None:
+ raise JujuApplicationNotFound("Cannot scale application")
+ await application.scale(scale=scale)
+ # Wait until application is scaled in model
+ self.log.debug(
+ "Waiting for application {} to be scaled in model {}...".format
+ (
+ application_name, model_name
+ )
+ )
+ if total_timeout is None:
+ total_timeout = 1800
+ end = time.time() + total_timeout
+ while time.time() < end:
+ application_scale = self._get_application_count(model, application_name)
+ # Before calling wait_for_model function,
+ # wait until application unit count and scale count are equal.
+ # Because there is a delay before scaling triggers in Juju model.
+ if application_scale == scale:
+ await JujuModelWatcher.wait_for_model(model=model, timeout=total_timeout)
+ self.log.debug(
+ "Application {} is scaled in model {}".format(
+ application_name, model_name
+ )
+ )
+ return
+ await asyncio.sleep(5)
+ raise Exception(
+ "Timeout waiting for application {} in model {} to be scaled".format(
+ application_name, model_name
+ )
+ )
+ finally:
+ if model:
+ await self.disconnect_model(model)
+ await self.disconnect_controller(controller)
+
+ def _get_application_count(self, model: Model, application_name: str) -> int:
+ """Get number of units of the application
+
+ :param: model: Model object
+ :param: application_name: Application name
+
+ :return: int (or None if application doesn't exist)
+ """
+ application = self._get_application(model, application_name)
+ if application is not None:
+ return len(application.units)
+
def _get_application(self, model: Model, application_name: str) -> Application:
"""Get application
from unittest.mock import Mock
from n2vc.k8s_juju_conn import K8sJujuConnector, RBAC_LABEL_KEY_NAME
from osm_common import fslocal
-from .utils import kubeconfig, FakeModel, FakeFileWrapper, AsyncMock
+from .utils import kubeconfig, FakeModel, FakeFileWrapper, AsyncMock, FakeApplication
from n2vc.exceptions import (
MethodNotImplemented,
K8sException,
self.k8s_juju_conn.libjuju.get_executed_actions.assert_not_called()
self.k8s_juju_conn.libjuju.get_actions.assert_not_called_once()
self.k8s_juju_conn.libjuju.get_application_configs.assert_not_called_once()
+
+
+class ScaleTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(ScaleTest, self).setUp()
+ self.application_name = "app"
+ self.kdu_name = "kdu-instance"
+ self._scale = 2
+ self.k8s_juju_conn.libjuju.scale_application = AsyncMock()
+
+ def test_success(
+ self
+ ):
+ self.loop.run_until_complete(
+ self.k8s_juju_conn.scale(
+ self.kdu_name,
+ self._scale,
+ self.application_name
+ )
+ )
+ self.k8s_juju_conn.libjuju.scale_application.assert_called_once()
+
+ def test_exception(self):
+ self.k8s_juju_conn.libjuju.scale_application.side_effect = Exception()
+ with self.assertRaises(Exception):
+ self.loop.run_until_complete(
+ self.k8s_juju_conn.scale(
+ self.kdu_name,
+ self._scale,
+ self.application_name
+ )
+ )
+ self.k8s_juju_conn.libjuju.scale_application.assert_called_once()
+
+
+class GetScaleCount(K8sJujuConnTestCase):
+ def setUp(self):
+ super(GetScaleCount, self).setUp()
+ self.k8s_juju_conn.libjuju.get_model_status = AsyncMock()
+
+ def test_success(self):
+ applications = {"app": FakeApplication()}
+ model = FakeModel(applications=applications)
+ self.k8s_juju_conn.libjuju.get_model_status.return_value = model
+ status = self.loop.run_until_complete(
+ self.k8s_juju_conn.get_scale_count("app", "kdu_instance")
+ )
+ self.assertEqual(status, 2)
+ self.k8s_juju_conn.libjuju.get_model_status.assert_called_once()
+
+ def test_exception(self):
+ self.k8s_juju_conn.libjuju.get_model_status.side_effect = Exception()
+ status = None
+ with self.assertRaises(Exception):
+ status = self.loop.run_until_complete(
+ self.k8s_juju_conn.status_kdu("app", "kdu_instance")
+ )
+ self.assertIsNone(status)
+ self.k8s_juju_conn.libjuju.get_model_status.assert_called_once()
"Cannot set both token and user/pass",
)
self.assertTrue(exception_raised)
+
+
+@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_controller")
+@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_model")
+@asynctest.mock.patch("n2vc.libjuju.Libjuju._get_application")
+@asynctest.mock.patch("n2vc.libjuju.Libjuju.disconnect_model")
+@asynctest.mock.patch("n2vc.libjuju.Libjuju.disconnect_controller")
+@asynctest.mock.patch("n2vc.juju_watcher.JujuModelWatcher.wait_for_model")
+class ScaleApplicationTest(LibjujuTestCase):
+ def setUp(self):
+ super(ScaleApplicationTest, self).setUp()
+
+ @asynctest.mock.patch("asyncio.sleep")
+ def test_scale_application(
+ self,
+ mock_sleep,
+ mock_wait_for_model,
+ mock_disconnect_controller,
+ mock_disconnect_model,
+ mock_get_application,
+ mock_get_model,
+ mock_get_controller,
+ ):
+ mock_get_model.return_value = juju.model.Model()
+ mock_get_application.return_value = FakeApplication()
+ self.loop.run_until_complete(
+ self.libjuju.scale_application(
+ "model",
+ "app",
+ 2
+ )
+ )
+ mock_wait_for_model.assert_called_once()
+ mock_disconnect_controller.assert_called_once()
+ mock_disconnect_model.assert_called_once()
+
+ def test_no_application(
+ self,
+ mock_wait_for,
+ mock_disconnect_controller,
+ mock_disconnect_model,
+ mock_get_application,
+ mock_get_model,
+ mock_get_controller,
+ ):
+ mock_get_application.return_value = None
+ mock_get_model.return_value = juju.model.Model()
+ with self.assertRaises(JujuApplicationNotFound):
+ self.loop.run_until_complete(
+ self.libjuju.scale_application(
+ "model",
+ "app",
+ 2
+ )
+ )
+ mock_disconnect_controller.assert_called()
+ mock_disconnect_model.assert_called()
+
+ def test_exception(
+ self,
+ mock_wait_for,
+ mock_disconnect_controller,
+ mock_disconnect_model,
+ mock_get_application,
+ mock_get_model,
+ mock_get_controller,
+ ):
+ mock_get_model.return_value = None
+ mock_get_application.return_value = FakeApplication()
+ with self.assertRaises(Exception):
+ self.loop.run_until_complete(
+ self.libjuju.scale_application(
+ "model",
+ "app",
+ 2,
+ total_timeout=0
+ )
+ )
+ mock_disconnect_controller.assert_called_once()
+
+
+@asynctest.mock.patch("n2vc.libjuju.Libjuju._get_application")
+class GetUnitNumberTest(LibjujuTestCase):
+ def setUp(self):
+ super(GetUnitNumberTest, self).setUp()
+
+ def test_succesful_get_unit_number(
+ self,
+ mock_get_applications,
+ ):
+ mock_get_applications.return_value = FakeApplication()
+ model = juju.model.Model()
+ result = self.libjuju._get_application_count(model, "app")
+ self.assertEqual(result, 2)
+
+ def test_non_existing_application(
+ self,
+ mock_get_applications,
+ ):
+ mock_get_applications.return_value = None
+ model = juju.model.Model()
+ result = self.libjuju._get_application_count(model, "app")
+ self.assertEqual(result, None)
async def is_leader_from_status(self):
return True
- async def run_action(self, action_name):
+ async def run_action(self, action_name, **kwargs):
return FakeAction()
async def get_config(self):
return ["app_config"]
+ async def scale(self, scale):
+ pass
+
units = [FakeUnit(), FakeUnit()]
# See the License for the specific language governing permissions and
# limitations under the License.
-juju==2.8.4
+juju==2.8.6
kubernetes==10.0.1
pyasn1
motor==1.3.1
# via kubernetes
idna==2.10
# via requests
-juju==2.8.4
+juju==2.8.6
# via -r requirements.in
jujubundlelib==0.5.6
# via theblues