1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
22 from juju
.model
import Model
23 from juju
.machine
import Machine
24 from juju
.application
import Application
25 from juju
.unit
import Unit
26 from juju
.client
._definitions
import (
28 QueryApplicationOffersResults
,
32 from juju
.controller
import Controller
33 from juju
.client
import client
36 from n2vc
.juju_watcher
import JujuModelWatcher
37 from n2vc
.provisioner
import AsyncSSHProvisioner
38 from n2vc
.n2vc_conn
import N2VCConnector
39 from n2vc
.exceptions
import (
41 JujuApplicationNotFound
,
42 JujuLeaderUnitNotFound
,
44 JujuControllerFailedConnecting
,
45 JujuApplicationExists
,
46 JujuInvalidK8sConfiguration
,
49 from n2vc
.vca
.cloud
import Cloud
as VcaCloud
50 from n2vc
.vca
.connection
import Connection
51 from kubernetes
.client
.configuration
import Configuration
52 from retrying_async
import retry
55 RBAC_LABEL_KEY_NAME
= "rbac-id"
61 vca_connection
: Connection
,
62 loop
: asyncio
.AbstractEventLoop
= None,
63 log
: logging
.Logger
= None,
64 n2vc
: N2VCConnector
= None,
69 :param: vca_connection: n2vc.vca.connection object
70 :param: loop: Asyncio loop
72 :param: n2vc: N2VC object
75 self
.log
= log
or logging
.getLogger("Libjuju")
77 self
.vca_connection
= vca_connection
79 self
.loop
= loop
or asyncio
.get_event_loop()
80 self
.loop
.set_exception_handler(self
.handle_exception
)
81 self
.creating_model
= asyncio
.Lock(loop
=self
.loop
)
83 if self
.vca_connection
.is_default
:
84 self
.health_check_task
= self
._create
_health
_check
_task
()
86 def _create_health_check_task(self
):
87 return self
.loop
.create_task(self
.health_check())
89 async def get_controller(self
, timeout
: float = 60.0) -> Controller
:
93 :param: timeout: Time in seconds to wait for controller to connect
97 controller
= Controller()
98 await asyncio
.wait_for(
100 endpoint
=self
.vca_connection
.data
.endpoints
,
101 username
=self
.vca_connection
.data
.user
,
102 password
=self
.vca_connection
.data
.secret
,
103 cacert
=self
.vca_connection
.data
.cacert
,
107 if self
.vca_connection
.is_default
:
108 endpoints
= await controller
.api_endpoints
110 endpoint
in self
.vca_connection
.endpoints
for endpoint
in endpoints
112 await self
.vca_connection
.update_endpoints(endpoints
)
114 except asyncio
.CancelledError
as e
:
116 except Exception as e
:
118 "Failed connecting to controller: {}... {}".format(
119 self
.vca_connection
.data
.endpoints
, e
123 await self
.disconnect_controller(controller
)
125 raise JujuControllerFailedConnecting(
126 f
"Error connecting to Juju controller: {e}"
129 async def disconnect(self
):
131 # Cancel health check task
132 self
.health_check_task
.cancel()
133 self
.log
.debug("Libjuju disconnected!")
135 async def disconnect_model(self
, model
: Model
):
139 :param: model: Model that will be disconnected
141 await model
.disconnect()
143 async def disconnect_controller(self
, controller
: Controller
):
145 Disconnect controller
147 :param: controller: Controller that will be disconnected
150 await controller
.disconnect()
152 @retry(attempts
=3, delay
=5, timeout
=None)
153 async def add_model(self
, model_name
: str, cloud
: VcaCloud
):
157 :param: model_name: Model name
158 :param: cloud: Cloud object
162 controller
= await self
.get_controller()
165 # Block until other workers have finished model creation
166 while self
.creating_model
.locked():
167 await asyncio
.sleep(0.1)
170 async with self
.creating_model
:
171 if await self
.model_exists(model_name
, controller
=controller
):
173 self
.log
.debug("Creating model {}".format(model_name
))
174 model
= await controller
.add_model(
176 config
=self
.vca_connection
.data
.model_config
,
177 cloud_name
=cloud
.name
,
178 credential_name
=cloud
.credential_name
,
180 except juju
.errors
.JujuAPIError
as e
:
181 if "already exists" in e
.message
:
187 await self
.disconnect_model(model
)
188 await self
.disconnect_controller(controller
)
190 async def get_executed_actions(self
, model_name
: str) -> list:
192 Get executed/history of actions for a model.
194 :param: model_name: Model name, str.
195 :return: List of executed actions for a model.
198 executed_actions
= []
199 controller
= await self
.get_controller()
201 model
= await self
.get_model(controller
, model_name
)
202 # Get all unique action names
204 for application
in model
.applications
:
205 application_actions
= await self
.get_actions(application
, model_name
)
206 actions
.update(application_actions
)
207 # Get status of all actions
208 for application_action
in actions
:
209 app_action_status_list
= await model
.get_action_status(
210 name
=application_action
212 for action_id
, action_status
in app_action_status_list
.items():
215 "action": application_action
,
216 "status": action_status
,
218 # Get action output by id
219 action_status
= await model
.get_action_output(executed_action
["id"])
220 for k
, v
in action_status
.items():
221 executed_action
[k
] = v
222 executed_actions
.append(executed_action
)
223 except Exception as e
:
225 "Error in getting executed actions for model: {}. Error: {}".format(
231 await self
.disconnect_model(model
)
232 await self
.disconnect_controller(controller
)
233 return executed_actions
235 async def get_application_configs(
236 self
, model_name
: str, application_name
: str
239 Get available configs for an application.
241 :param: model_name: Model name, str.
242 :param: application_name: Application name, str.
244 :return: A dict which has key - action name, value - action description
247 application_configs
= {}
248 controller
= await self
.get_controller()
250 model
= await self
.get_model(controller
, model_name
)
251 application
= self
._get
_application
(
252 model
, application_name
=application_name
254 application_configs
= await application
.get_config()
255 except Exception as e
:
257 "Error in getting configs for application: {} in model: {}. Error: {}".format(
258 application_name
, model_name
, str(e
)
263 await self
.disconnect_model(model
)
264 await self
.disconnect_controller(controller
)
265 return application_configs
267 @retry(attempts
=3, delay
=5)
268 async def get_model(self
, controller
: Controller
, model_name
: str) -> Model
:
270 Get model from controller
272 :param: controller: Controller
273 :param: model_name: Model name
275 :return: Model: The created Juju model object
277 return await controller
.get_model(model_name
)
279 async def model_exists(
280 self
, model_name
: str, controller
: Controller
= None
283 Check if model exists
285 :param: controller: Controller
286 :param: model_name: Model name
290 need_to_disconnect
= False
292 # Get controller if not passed
294 controller
= await self
.get_controller()
295 need_to_disconnect
= True
297 # Check if model exists
299 return model_name
in await controller
.list_models()
301 if need_to_disconnect
:
302 await self
.disconnect_controller(controller
)
304 async def models_exist(self
, model_names
: [str]) -> (bool, list):
306 Check if models exists
308 :param: model_names: List of strings with model names
310 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
314 "model_names must be a non-empty array. Given value: {}".format(
318 non_existing_models
= []
319 models
= await self
.list_models()
320 existing_models
= list(set(models
).intersection(model_names
))
321 non_existing_models
= list(set(model_names
) - set(existing_models
))
324 len(non_existing_models
) == 0,
328 async def get_model_status(self
, model_name
: str) -> FullStatus
:
332 :param: model_name: Model name
334 :return: Full status object
336 controller
= await self
.get_controller()
337 model
= await self
.get_model(controller
, model_name
)
339 return await model
.get_status()
341 await self
.disconnect_model(model
)
342 await self
.disconnect_controller(controller
)
344 async def create_machine(
347 machine_id
: str = None,
348 db_dict
: dict = None,
349 progress_timeout
: float = None,
350 total_timeout
: float = None,
351 series
: str = "bionic",
353 ) -> (Machine
, bool):
357 :param: model_name: Model name
358 :param: machine_id: Machine id
359 :param: db_dict: Dictionary with data of the DB to write the updates
360 :param: progress_timeout: Maximum time between two updates in the model
361 :param: total_timeout: Timeout for the entity to be active
362 :param: series: Series of the machine (xenial, bionic, focal, ...)
363 :param: wait: Wait until machine is ready
365 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
366 if the machine is new or it already existed
372 "Creating machine (id={}) in model: {}".format(machine_id
, model_name
)
376 controller
= await self
.get_controller()
379 model
= await self
.get_model(controller
, model_name
)
381 if machine_id
is not None:
383 "Searching machine (id={}) in model {}".format(
384 machine_id
, model_name
388 # Get machines from model and get the machine with machine_id if exists
389 machines
= await model
.get_machines()
390 if machine_id
in machines
:
392 "Machine (id={}) found in model {}".format(
393 machine_id
, model_name
396 machine
= machines
[machine_id
]
398 raise JujuMachineNotFound("Machine {} not found".format(machine_id
))
401 self
.log
.debug("Creating a new machine in model {}".format(model_name
))
404 machine
= await model
.add_machine(
405 spec
=None, constraints
=None, disks
=None, series
=series
409 # Wait until the machine is ready
411 "Wait until machine {} is ready in model {}".format(
412 machine
.entity_id
, model_name
416 await JujuModelWatcher
.wait_for(
419 progress_timeout
=progress_timeout
,
420 total_timeout
=total_timeout
,
423 vca_id
=self
.vca_connection
._vca
_id
,
426 await self
.disconnect_model(model
)
427 await self
.disconnect_controller(controller
)
430 "Machine {} ready at {} in model {}".format(
431 machine
.entity_id
, machine
.dns_name
, model_name
436 async def provision_machine(
441 private_key_path
: str,
442 db_dict
: dict = None,
443 progress_timeout
: float = None,
444 total_timeout
: float = None,
447 Manually provisioning of a machine
449 :param: model_name: Model name
450 :param: hostname: IP to access the machine
451 :param: username: Username to login to the machine
452 :param: private_key_path: Local path for the private key
453 :param: db_dict: Dictionary with data of the DB to write the updates
454 :param: progress_timeout: Maximum time between two updates in the model
455 :param: total_timeout: Timeout for the entity to be active
457 :return: (Entity): Machine id
460 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
461 model_name
, hostname
, username
466 controller
= await self
.get_controller()
469 model
= await self
.get_model(controller
, model_name
)
473 provisioner
= AsyncSSHProvisioner(
476 private_key_path
=private_key_path
,
481 params
= await provisioner
.provision_machine()
483 params
.jobs
= ["JobHostUnits"]
485 self
.log
.debug("Adding machine to model")
486 connection
= model
.connection()
487 client_facade
= client
.ClientFacade
.from_connection(connection
)
489 results
= await client_facade
.AddMachines(params
=[params
])
490 error
= results
.machines
[0].error
493 msg
= "Error adding machine: {}".format(error
.message
)
494 self
.log
.error(msg
=msg
)
495 raise ValueError(msg
)
497 machine_id
= results
.machines
[0].machine
499 self
.log
.debug("Installing Juju agent into machine {}".format(machine_id
))
500 asyncio
.ensure_future(
501 provisioner
.install_agent(
502 connection
=connection
,
504 machine_id
=machine_id
,
505 proxy
=self
.vca_connection
.data
.api_proxy
,
506 series
=params
.series
,
512 machine_list
= await model
.get_machines()
513 if machine_id
in machine_list
:
514 self
.log
.debug("Machine {} found in model!".format(machine_id
))
515 machine
= model
.machines
.get(machine_id
)
517 await asyncio
.sleep(2)
520 msg
= "Machine {} not found in model".format(machine_id
)
521 self
.log
.error(msg
=msg
)
522 raise JujuMachineNotFound(msg
)
525 "Wait until machine {} is ready in model {}".format(
526 machine
.entity_id
, model_name
529 await JujuModelWatcher
.wait_for(
532 progress_timeout
=progress_timeout
,
533 total_timeout
=total_timeout
,
536 vca_id
=self
.vca_connection
._vca
_id
,
538 except Exception as e
:
541 await self
.disconnect_model(model
)
542 await self
.disconnect_controller(controller
)
545 "Machine provisioned {} in model {}".format(machine_id
, model_name
)
551 self
, uri
: str, model_name
: str, wait
: bool = True, timeout
: float = 3600
554 Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
556 :param: uri: Path or Charm Store uri in which the charm or bundle can be found
557 :param: model_name: Model name
558 :param: wait: Indicates whether to wait or not until all applications are active
559 :param: timeout: Time in seconds to wait until all applications are active
561 controller
= await self
.get_controller()
562 model
= await self
.get_model(controller
, model_name
)
564 await model
.deploy(uri
, trust
=True)
566 await JujuModelWatcher
.wait_for_model(model
, timeout
=timeout
)
567 self
.log
.debug("All units active in model {}".format(model_name
))
569 await self
.disconnect_model(model
)
570 await self
.disconnect_controller(controller
)
574 application_name
: str,
577 db_dict
: dict = None,
578 progress_timeout
: float = None,
579 total_timeout
: float = None,
583 :param: application_name: Application name
584 :param: model_name: Model name
585 :param: machine_id Machine id
586 :param: db_dict: Dictionary with data of the DB to write the updates
587 :param: progress_timeout: Maximum time between two updates in the model
588 :param: total_timeout: Timeout for the entity to be active
594 controller
= await self
.get_controller()
596 model
= await self
.get_model(controller
, model_name
)
597 application
= self
._get
_application
(model
, application_name
)
599 if application
is not None:
601 # Checks if the given machine id in the model,
602 # otherwise function raises an error
603 _machine
, _series
= self
._get
_machine
_info
(model
, machine_id
)
606 "Adding unit (machine {}) to application {} in model ~{}".format(
607 machine_id
, application_name
, model_name
611 await application
.add_unit(to
=machine_id
)
613 await JujuModelWatcher
.wait_for(
616 progress_timeout
=progress_timeout
,
617 total_timeout
=total_timeout
,
620 vca_id
=self
.vca_connection
._vca
_id
,
623 "Unit is added to application {} in model {}".format(
624 application_name
, model_name
628 raise JujuApplicationNotFound(
629 "Application {} not exists".format(application_name
)
633 await self
.disconnect_model(model
)
634 await self
.disconnect_controller(controller
)
636 async def destroy_unit(
638 application_name
: str,
641 total_timeout
: float = None,
645 :param: application_name: Application name
646 :param: model_name: Model name
647 :param: machine_id Machine id
648 :param: total_timeout: Timeout for the entity to be active
654 controller
= await self
.get_controller()
656 model
= await self
.get_model(controller
, model_name
)
657 application
= self
._get
_application
(model
, application_name
)
659 if application
is None:
660 raise JujuApplicationNotFound(
661 "Application not found: {} (model={})".format(
662 application_name
, model_name
666 unit
= self
._get
_unit
(application
, machine_id
)
669 "A unit with machine id {} not in available units".format(
674 unit_name
= unit
.name
677 "Destroying unit {} from application {} in model {}".format(
678 unit_name
, application_name
, model_name
681 await application
.destroy_unit(unit_name
)
684 "Waiting for unit {} to be destroyed in application {} (model={})...".format(
685 unit_name
, application_name
, model_name
689 # TODO: Add functionality in the Juju watcher to replace this kind of blocks
690 if total_timeout
is None:
692 end
= time
.time() + total_timeout
693 while time
.time() < end
:
694 if not self
._get
_unit
(application
, machine_id
):
696 "The unit {} was destroyed in application {} (model={}) ".format(
697 unit_name
, application_name
, model_name
701 await asyncio
.sleep(5)
703 "Unit {} is destroyed from application {} in model {}".format(
704 unit_name
, application_name
, model_name
709 await self
.disconnect_model(model
)
710 await self
.disconnect_controller(controller
)
712 async def deploy_charm(
714 application_name
: str,
718 db_dict
: dict = None,
719 progress_timeout
: float = None,
720 total_timeout
: float = None,
727 :param: application_name: Application name
728 :param: path: Local path to the charm
729 :param: model_name: Model name
730 :param: machine_id ID of the machine
731 :param: db_dict: Dictionary with data of the DB to write the updates
732 :param: progress_timeout: Maximum time between two updates in the model
733 :param: total_timeout: Timeout for the entity to be active
734 :param: config: Config for the charm
735 :param: series: Series of the charm
736 :param: num_units: Number of units
738 :return: (juju.application.Application): Juju application
741 "Deploying charm {} to machine {} in model ~{}".format(
742 application_name
, machine_id
, model_name
745 self
.log
.debug("charm: {}".format(path
))
748 controller
= await self
.get_controller()
751 model
= await self
.get_model(controller
, model_name
)
754 if application_name
not in model
.applications
:
756 if machine_id
is not None:
757 machine
, series
= self
._get
_machine
_info
(model
, machine_id
)
759 application
= await model
.deploy(
761 application_name
=application_name
,
770 "Wait until application {} is ready in model {}".format(
771 application_name
, model_name
775 for _
in range(num_units
- 1):
776 m
, _
= await self
.create_machine(model_name
, wait
=False)
777 await application
.add_unit(to
=m
.entity_id
)
779 await JujuModelWatcher
.wait_for(
782 progress_timeout
=progress_timeout
,
783 total_timeout
=total_timeout
,
786 vca_id
=self
.vca_connection
._vca
_id
,
789 "Application {} is ready in model {}".format(
790 application_name
, model_name
794 raise JujuApplicationExists(
795 "Application {} exists".format(application_name
)
797 except juju
.errors
.JujuError
as e
:
798 if "already exists" in e
.message
:
799 raise JujuApplicationExists(
800 "Application {} exists".format(application_name
)
805 await self
.disconnect_model(model
)
806 await self
.disconnect_controller(controller
)
810 async def resolve(self
, model_name
: str):
812 controller
= await self
.get_controller()
813 model
= await self
.get_model(controller
, model_name
)
814 all_units_active
= False
816 while not all_units_active
:
817 all_units_active
= True
818 for application_name
, application
in model
.applications
.items():
819 if application
.status
== "error":
820 for unit
in application
.units
:
821 if unit
.workload_status
== "error":
823 "Model {}, Application {}, Unit {} in error state, resolving".format(
824 model_name
, application_name
, unit
.entity_id
828 await unit
.resolved(retry
=False)
829 all_units_active
= False
833 if not all_units_active
:
834 await asyncio
.sleep(5)
836 await self
.disconnect_model(model
)
837 await self
.disconnect_controller(controller
)
839 async def scale_application(
842 application_name
: str,
844 total_timeout
: float = None,
847 Scale application (K8s)
849 :param: model_name: Model name
850 :param: application_name: Application name
851 :param: scale: Scale to which to set this application
852 :param: total_timeout: Timeout for the entity to be active
856 controller
= await self
.get_controller()
858 model
= await self
.get_model(controller
, model_name
)
861 "Scaling application {} in model {}".format(
862 application_name
, model_name
865 application
= self
._get
_application
(model
, application_name
)
866 if application
is None:
867 raise JujuApplicationNotFound("Cannot scale application")
868 await application
.scale(scale
=scale
)
869 # Wait until application is scaled in model
871 "Waiting for application {} to be scaled in model {}...".format(
872 application_name
, model_name
875 if total_timeout
is None:
877 end
= time
.time() + total_timeout
878 while time
.time() < end
:
879 application_scale
= self
._get
_application
_count
(model
, application_name
)
880 # Before calling wait_for_model function,
881 # wait until application unit count and scale count are equal.
882 # Because there is a delay before scaling triggers in Juju model.
883 if application_scale
== scale
:
884 await JujuModelWatcher
.wait_for_model(
885 model
=model
, timeout
=total_timeout
888 "Application {} is scaled in model {}".format(
889 application_name
, model_name
893 await asyncio
.sleep(5)
895 "Timeout waiting for application {} in model {} to be scaled".format(
896 application_name
, model_name
901 await self
.disconnect_model(model
)
902 await self
.disconnect_controller(controller
)
904 def _get_application_count(self
, model
: Model
, application_name
: str) -> int:
905 """Get number of units of the application
907 :param: model: Model object
908 :param: application_name: Application name
910 :return: int (or None if application doesn't exist)
912 application
= self
._get
_application
(model
, application_name
)
913 if application
is not None:
914 return len(application
.units
)
916 def _get_application(self
, model
: Model
, application_name
: str) -> Application
:
919 :param: model: Model object
920 :param: application_name: Application name
922 :return: juju.application.Application (or None if it doesn't exist)
924 if model
.applications
and application_name
in model
.applications
:
925 return model
.applications
[application_name
]
927 def _get_unit(self
, application
: Application
, machine_id
: str) -> Unit
:
930 :param: application: Application object
931 :param: machine_id: Machine id
936 for u
in application
.units
:
937 if u
.machine_id
== machine_id
:
942 def _get_machine_info(
949 :param: model: Model object
950 :param: machine_id: Machine id
952 :return: (str, str): (machine, series)
954 if machine_id
not in model
.machines
:
955 msg
= "Machine {} not found in model".format(machine_id
)
956 self
.log
.error(msg
=msg
)
957 raise JujuMachineNotFound(msg
)
958 machine
= model
.machines
[machine_id
]
959 return machine
, machine
.series
961 async def execute_action(
963 application_name
: str,
966 db_dict
: dict = None,
967 machine_id
: str = None,
968 progress_timeout
: float = None,
969 total_timeout
: float = None,
974 :param: application_name: Application name
975 :param: model_name: Model name
976 :param: action_name: Name of the action
977 :param: db_dict: Dictionary with data of the DB to write the updates
978 :param: machine_id Machine id
979 :param: progress_timeout: Maximum time between two updates in the model
980 :param: total_timeout: Timeout for the entity to be active
982 :return: (str, str): (output and status)
985 "Executing action {} using params {}".format(action_name
, kwargs
)
988 controller
= await self
.get_controller()
991 model
= await self
.get_model(controller
, model_name
)
995 application
= self
._get
_application
(
997 application_name
=application_name
,
999 if application
is None:
1000 raise JujuApplicationNotFound("Cannot execute action")
1002 # Ocassionally, self._get_leader_unit() will return None
1003 # because the leader elected hook has not been triggered yet.
1004 # Therefore, we are doing some retries. If it happens again,
1006 if machine_id
is None:
1007 unit
= await self
._get
_leader
_unit
(application
)
1009 "Action {} is being executed on the leader unit {}".format(
1010 action_name
, unit
.name
1014 unit
= self
._get
_unit
(application
, machine_id
)
1017 "A unit with machine id {} not in available units".format(
1022 "Action {} is being executed on {} unit".format(
1023 action_name
, unit
.name
1027 actions
= await application
.get_actions()
1029 if action_name
not in actions
:
1030 raise JujuActionNotFound(
1031 "Action {} not in available actions".format(action_name
)
1034 action
= await unit
.run_action(action_name
, **kwargs
)
1037 "Wait until action {} is completed in application {} (model={})".format(
1038 action_name
, application_name
, model_name
1041 await JujuModelWatcher
.wait_for(
1044 progress_timeout
=progress_timeout
,
1045 total_timeout
=total_timeout
,
1048 vca_id
=self
.vca_connection
._vca
_id
,
1051 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
1052 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
1054 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
1058 "Action {} completed with status {} in application {} (model={})".format(
1059 action_name
, action
.status
, application_name
, model_name
1063 await self
.disconnect_model(model
)
1064 await self
.disconnect_controller(controller
)
1066 return output
, status
1068 async def get_actions(self
, application_name
: str, model_name
: str) -> dict:
1069 """Get list of actions
1071 :param: application_name: Application name
1072 :param: model_name: Model name
1074 :return: Dict with this format
1076 "action_name": "Description of the action",
1081 "Getting list of actions for application {}".format(application_name
)
1085 controller
= await self
.get_controller()
1088 model
= await self
.get_model(controller
, model_name
)
1092 application
= self
._get
_application
(
1094 application_name
=application_name
,
1097 # Return list of actions
1098 return await application
.get_actions()
1101 # Disconnect from model and controller
1102 await self
.disconnect_model(model
)
1103 await self
.disconnect_controller(controller
)
1105 async def get_metrics(self
, model_name
: str, application_name
: str) -> dict:
1106 """Get the metrics collected by the VCA.
1108 :param model_name The name or unique id of the network service
1109 :param application_name The name of the application
1111 if not model_name
or not application_name
:
1112 raise Exception("model_name and application_name must be non-empty strings")
1114 controller
= await self
.get_controller()
1115 model
= await self
.get_model(controller
, model_name
)
1117 application
= self
._get
_application
(model
, application_name
)
1118 if application
is not None:
1119 metrics
= await application
.get_metrics()
1121 self
.disconnect_model(model
)
1122 self
.disconnect_controller(controller
)
1125 async def add_relation(
1133 :param: model_name: Model name
1134 :param: endpoint_1 First endpoint name
1135 ("app:endpoint" format or directly the saas name)
1136 :param: endpoint_2: Second endpoint name (^ same format)
1139 self
.log
.debug("Adding relation: {} -> {}".format(endpoint_1
, endpoint_2
))
1142 controller
= await self
.get_controller()
1145 model
= await self
.get_model(controller
, model_name
)
1149 await model
.add_relation(endpoint_1
, endpoint_2
)
1150 except juju
.errors
.JujuAPIError
as e
:
1151 if "not found" in e
.message
:
1152 self
.log
.warning("Relation not found: {}".format(e
.message
))
1154 if "already exists" in e
.message
:
1155 self
.log
.warning("Relation already exists: {}".format(e
.message
))
1157 # another exception, raise it
1160 await self
.disconnect_model(model
)
1161 await self
.disconnect_controller(controller
)
1169 Adds a remote offer to the model. Relations can be created later using "juju relate".
1171 :param: offer_url: Offer Url
1172 :param: model_name: Model name
1174 :raises ParseError if there's a problem parsing the offer_url
1175 :raises JujuError if remote offer includes and endpoint
1176 :raises JujuAPIError if the operation is not successful
1178 controller
= await self
.get_controller()
1179 model
= await controller
.get_model(model_name
)
1182 await model
.consume(offer_url
)
1184 await self
.disconnect_model(model
)
1185 await self
.disconnect_controller(controller
)
1187 async def destroy_model(self
, model_name
: str, total_timeout
: float = 1800):
1191 :param: model_name: Model name
1192 :param: total_timeout: Timeout
1195 controller
= await self
.get_controller()
1198 if not await self
.model_exists(model_name
, controller
=controller
):
1201 self
.log
.debug("Destroying model {}".format(model_name
))
1203 model
= await self
.get_model(controller
, model_name
)
1204 # Destroy machines that are manually provisioned
1205 # and still are in pending state
1206 await self
._destroy
_pending
_machines
(model
, only_manual
=True)
1207 await self
.disconnect_model(model
)
1209 await asyncio
.wait_for(
1210 self
._destroy
_model
(model_name
, controller
),
1211 timeout
=total_timeout
,
1213 except Exception as e
:
1214 if not await self
.model_exists(model_name
, controller
=controller
):
1219 await self
.disconnect_model(model
)
1220 await self
.disconnect_controller(controller
)
1222 async def _destroy_model(
1225 controller
: Controller
,
1228 Destroy model from controller
1230 :param: model: Model name to be removed
1231 :param: controller: Controller object
1232 :param: timeout: Timeout in seconds
1235 async def _destroy_model_gracefully(model_name
: str, controller
: Controller
):
1236 self
.log
.info(f
"Gracefully deleting model {model_name}")
1238 while model_name
in await controller
.list_models():
1240 await self
.resolve(model_name
)
1242 await controller
.destroy_model(model_name
, destroy_storage
=True)
1244 await asyncio
.sleep(5)
1245 self
.log
.info(f
"Model {model_name} deleted gracefully")
1247 async def _destroy_model_forcefully(model_name
: str, controller
: Controller
):
1248 self
.log
.info(f
"Forcefully deleting model {model_name}")
1249 while model_name
in await controller
.list_models():
1250 await controller
.destroy_model(
1251 model_name
, destroy_storage
=True, force
=True, max_wait
=60
1253 await asyncio
.sleep(5)
1254 self
.log
.info(f
"Model {model_name} deleted forcefully")
1258 await asyncio
.wait_for(
1259 _destroy_model_gracefully(model_name
, controller
), timeout
=120
1261 except asyncio
.TimeoutError
:
1262 await _destroy_model_forcefully(model_name
, controller
)
1263 except juju
.errors
.JujuError
as e
:
1264 if any("has been removed" in error
for error
in e
.errors
):
1266 if any("model not found" in error
for error
in e
.errors
):
1270 async def destroy_application(
1271 self
, model_name
: str, application_name
: str, total_timeout
: float
1276 :param: model_name: Model name
1277 :param: application_name: Application name
1278 :param: total_timeout: Timeout
1281 controller
= await self
.get_controller()
1285 model
= await self
.get_model(controller
, model_name
)
1287 "Destroying application {} in model {}".format(
1288 application_name
, model_name
1291 application
= self
._get
_application
(model
, application_name
)
1293 await application
.destroy()
1295 self
.log
.warning("Application not found: {}".format(application_name
))
1298 "Waiting for application {} to be destroyed in model {}...".format(
1299 application_name
, model_name
1302 if total_timeout
is None:
1303 total_timeout
= 3600
1304 end
= time
.time() + total_timeout
1305 while time
.time() < end
:
1306 if not self
._get
_application
(model
, application_name
):
1308 "The application {} was destroyed in model {} ".format(
1309 application_name
, model_name
1313 await asyncio
.sleep(5)
1315 "Timeout waiting for application {} to be destroyed in model {}".format(
1316 application_name
, model_name
1320 if model
is not None:
1321 await self
.disconnect_model(model
)
1322 await self
.disconnect_controller(controller
)
1324 async def _destroy_pending_machines(self
, model
: Model
, only_manual
: bool = False):
1326 Destroy pending machines in a given model
1328 :param: only_manual: Bool that indicates only manually provisioned
1329 machines should be destroyed (if True), or that
1330 all pending machines should be destroyed
1332 status
= await model
.get_status()
1333 for machine_id
in status
.machines
:
1334 machine_status
= status
.machines
[machine_id
]
1335 if machine_status
.agent_status
.status
== "pending":
1336 if only_manual
and not machine_status
.instance_id
.startswith("manual:"):
1338 machine
= model
.machines
[machine_id
]
1339 await machine
.destroy(force
=True)
1341 async def configure_application(
1342 self
, model_name
: str, application_name
: str, config
: dict = None
1344 """Configure application
1346 :param: model_name: Model name
1347 :param: application_name: Application name
1348 :param: config: Config to apply to the charm
1350 self
.log
.debug("Configuring application {}".format(application_name
))
1353 controller
= await self
.get_controller()
1356 model
= await self
.get_model(controller
, model_name
)
1357 application
= self
._get
_application
(
1359 application_name
=application_name
,
1361 await application
.set_config(config
)
1364 await self
.disconnect_model(model
)
1365 await self
.disconnect_controller(controller
)
1367 def handle_exception(self
, loop
, context
):
1368 # All unhandled exceptions by libjuju are handled here.
1371 async def health_check(self
, interval
: float = 300.0):
1373 Health check to make sure controller and controller_model connections are OK
1375 :param: interval: Time in seconds between checks
1380 controller
= await self
.get_controller()
1381 # self.log.debug("VCA is alive")
1382 except Exception as e
:
1383 self
.log
.error("Health check to VCA failed: {}".format(e
))
1385 await self
.disconnect_controller(controller
)
1386 await asyncio
.sleep(interval
)
1388 async def list_models(self
, contains
: str = None) -> [str]:
1389 """List models with certain names
1391 :param: contains: String that is contained in model name
1393 :retur: [models] Returns list of model names
1396 controller
= await self
.get_controller()
1398 models
= await controller
.list_models()
1400 models
= [model
for model
in models
if contains
in model
]
1403 await self
.disconnect_controller(controller
)
1405 async def list_offers(self
, model_name
: str) -> QueryApplicationOffersResults
:
1406 """List models with certain names
1408 :param: model_name: Model name
1410 :return: Returns list of offers
1413 controller
= await self
.get_controller()
1415 return await controller
.list_offers(model_name
)
1417 await self
.disconnect_controller(controller
)
1424 client_cert_data
: str,
1425 configuration
: Configuration
,
1427 credential_name
: str = None,
1430 Add a Kubernetes cloud to the controller
1432 Similar to the `juju add-k8s` command in the CLI
1434 :param: name: Name for the K8s cloud
1435 :param: configuration: Kubernetes configuration object
1436 :param: storage_class: Storage Class to use in the cloud
1437 :param: credential_name: Storage Class to use in the cloud
1440 if not storage_class
:
1441 raise Exception("storage_class must be a non-empty string")
1443 raise Exception("name must be a non-empty string")
1444 if not configuration
:
1445 raise Exception("configuration must be provided")
1447 endpoint
= configuration
.host
1448 credential
= self
.get_k8s_cloud_credential(
1453 credential
.attrs
[RBAC_LABEL_KEY_NAME
] = rbac_id
1454 cloud
= client
.Cloud(
1456 auth_types
=[credential
.auth_type
],
1458 ca_certificates
=[client_cert_data
],
1460 "operator-storage": storage_class
,
1461 "workload-storage": storage_class
,
1465 return await self
.add_cloud(
1466 name
, cloud
, credential
, credential_name
=credential_name
1469 def get_k8s_cloud_credential(
1471 configuration
: Configuration
,
1472 client_cert_data
: str,
1474 ) -> client
.CloudCredential
:
1476 # TODO: Test with AKS
1477 key
= None # open(configuration.key_file, "r").read()
1478 username
= configuration
.username
1479 password
= configuration
.password
1481 if client_cert_data
:
1482 attrs
["ClientCertificateData"] = client_cert_data
1484 attrs
["ClientKeyData"] = key
1486 if username
or password
:
1487 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1488 attrs
["Token"] = token
1492 auth_type
= "oauth2"
1493 if client_cert_data
:
1494 auth_type
= "oauth2withcert"
1496 raise JujuInvalidK8sConfiguration(
1497 "missing token for auth type {}".format(auth_type
)
1502 "credential for user {} has empty password".format(username
)
1504 attrs
["username"] = username
1505 attrs
["password"] = password
1506 if client_cert_data
:
1507 auth_type
= "userpasswithcert"
1509 auth_type
= "userpass"
1510 elif client_cert_data
and token
:
1511 auth_type
= "certificate"
1513 raise JujuInvalidK8sConfiguration("authentication method not supported")
1514 return client
.CloudCredential(auth_type
=auth_type
, attrs
=attrs
)
1516 async def add_cloud(
1520 credential
: CloudCredential
= None,
1521 credential_name
: str = None,
1524 Add cloud to the controller
1526 :param: name: Name of the cloud to be added
1527 :param: cloud: Cloud object
1528 :param: credential: CloudCredentials object for the cloud
1529 :param: credential_name: Credential name.
1530 If not defined, cloud of the name will be used.
1532 controller
= await self
.get_controller()
1534 _
= await controller
.add_cloud(name
, cloud
)
1536 await controller
.add_credential(
1537 credential_name
or name
, credential
=credential
, cloud
=name
1539 # Need to return the object returned by the controller.add_cloud() function
1540 # I'm returning the original value now until this bug is fixed:
1541 # https://github.com/juju/python-libjuju/issues/443
1544 await self
.disconnect_controller(controller
)
1546 async def remove_cloud(self
, name
: str):
1550 :param: name: Name of the cloud to be removed
1552 controller
= await self
.get_controller()
1554 await controller
.remove_cloud(name
)
1555 except juju
.errors
.JujuError
as e
:
1556 if len(e
.errors
) == 1 and f
'cloud "{name}" not found' == e
.errors
[0]:
1557 self
.log
.warning(f
"Cloud {name} not found, so it could not be deleted.")
1561 await self
.disconnect_controller(controller
)
1563 @retry(attempts
=20, delay
=5, fallback
=JujuLeaderUnitNotFound())
1564 async def _get_leader_unit(self
, application
: Application
) -> Unit
:
1566 for u
in application
.units
:
1567 if await u
.is_leader_from_status():
1574 async def get_cloud_credentials(self
, cloud
: Cloud
) -> typing
.List
:
1576 Get cloud credentials
1578 :param: cloud: Cloud object. The returned credentials will be from this cloud.
1580 :return: List of credentials object associated to the specified cloud
1583 controller
= await self
.get_controller()
1585 facade
= client
.CloudFacade
.from_connection(controller
.connection())
1586 cloud_cred_tag
= tag
.credential(
1587 cloud
.name
, self
.vca_connection
.data
.user
, cloud
.credential_name
1589 params
= [client
.Entity(cloud_cred_tag
)]
1590 return (await facade
.Credential(params
)).results
1592 await self
.disconnect_controller(controller
)
1594 async def check_application_exists(self
, model_name
, application_name
) -> bool:
1595 """Check application exists
1597 :param: model_name: Model Name
1598 :param: application_name: Application Name
1604 controller
= await self
.get_controller()
1606 model
= await self
.get_model(controller
, model_name
)
1608 "Checking if application {} exists in model {}".format(
1609 application_name
, model_name
1612 return self
._get
_application
(model
, application_name
) is not None
1615 await self
.disconnect_model(model
)
1616 await self
.disconnect_controller(controller
)