1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
22 from juju
.model
import Model
23 from juju
.machine
import Machine
24 from juju
.application
import Application
25 from juju
.unit
import Unit
26 from juju
.client
._definitions
import (
28 QueryApplicationOffersResults
,
32 from juju
.controller
import Controller
33 from juju
.client
import client
36 from n2vc
.definitions
import Offer
, RelationEndpoint
37 from n2vc
.juju_watcher
import JujuModelWatcher
38 from n2vc
.provisioner
import AsyncSSHProvisioner
39 from n2vc
.n2vc_conn
import N2VCConnector
40 from n2vc
.exceptions
import (
42 JujuApplicationNotFound
,
43 JujuLeaderUnitNotFound
,
45 JujuControllerFailedConnecting
,
46 JujuApplicationExists
,
47 JujuInvalidK8sConfiguration
,
50 from n2vc
.vca
.cloud
import Cloud
as VcaCloud
51 from n2vc
.vca
.connection
import Connection
52 from kubernetes
.client
.configuration
import Configuration
53 from retrying_async
import retry
56 RBAC_LABEL_KEY_NAME
= "rbac-id"
62 vca_connection
: Connection
,
63 loop
: asyncio
.AbstractEventLoop
= None,
64 log
: logging
.Logger
= None,
65 n2vc
: N2VCConnector
= None,
70 :param: vca_connection: n2vc.vca.connection object
71 :param: loop: Asyncio loop
73 :param: n2vc: N2VC object
76 self
.log
= log
or logging
.getLogger("Libjuju")
78 self
.vca_connection
= vca_connection
80 self
.loop
= loop
or asyncio
.get_event_loop()
81 self
.loop
.set_exception_handler(self
.handle_exception
)
82 self
.creating_model
= asyncio
.Lock(loop
=self
.loop
)
84 if self
.vca_connection
.is_default
:
85 self
.health_check_task
= self
._create
_health
_check
_task
()
87 def _create_health_check_task(self
):
88 return self
.loop
.create_task(self
.health_check())
90 async def get_controller(self
, timeout
: float = 60.0) -> Controller
:
94 :param: timeout: Time in seconds to wait for controller to connect
98 controller
= Controller()
99 await asyncio
.wait_for(
101 endpoint
=self
.vca_connection
.data
.endpoints
,
102 username
=self
.vca_connection
.data
.user
,
103 password
=self
.vca_connection
.data
.secret
,
104 cacert
=self
.vca_connection
.data
.cacert
,
108 if self
.vca_connection
.is_default
:
109 endpoints
= await controller
.api_endpoints
111 endpoint
in self
.vca_connection
.endpoints
for endpoint
in endpoints
113 await self
.vca_connection
.update_endpoints(endpoints
)
115 except asyncio
.CancelledError
as e
:
117 except Exception as e
:
119 "Failed connecting to controller: {}... {}".format(
120 self
.vca_connection
.data
.endpoints
, e
124 await self
.disconnect_controller(controller
)
125 raise JujuControllerFailedConnecting(e
)
127 async def disconnect(self
):
129 # Cancel health check task
130 self
.health_check_task
.cancel()
131 self
.log
.debug("Libjuju disconnected!")
133 async def disconnect_model(self
, model
: Model
):
137 :param: model: Model that will be disconnected
139 await model
.disconnect()
141 async def disconnect_controller(self
, controller
: Controller
):
143 Disconnect controller
145 :param: controller: Controller that will be disconnected
148 await controller
.disconnect()
150 @retry(attempts
=3, delay
=5, timeout
=None)
151 async def add_model(self
, model_name
: str, cloud
: VcaCloud
):
155 :param: model_name: Model name
156 :param: cloud: Cloud object
160 controller
= await self
.get_controller()
163 # Block until other workers have finished model creation
164 while self
.creating_model
.locked():
165 await asyncio
.sleep(0.1)
168 async with self
.creating_model
:
169 if await self
.model_exists(model_name
, controller
=controller
):
171 self
.log
.debug("Creating model {}".format(model_name
))
172 model
= await controller
.add_model(
174 config
=self
.vca_connection
.data
.model_config
,
175 cloud_name
=cloud
.name
,
176 credential_name
=cloud
.credential_name
,
178 except juju
.errors
.JujuAPIError
as e
:
179 if "already exists" in e
.message
:
185 await self
.disconnect_model(model
)
186 await self
.disconnect_controller(controller
)
188 async def get_executed_actions(self
, model_name
: str) -> list:
190 Get executed/history of actions for a model.
192 :param: model_name: Model name, str.
193 :return: List of executed actions for a model.
196 executed_actions
= []
197 controller
= await self
.get_controller()
199 model
= await self
.get_model(controller
, model_name
)
200 # Get all unique action names
202 for application
in model
.applications
:
203 application_actions
= await self
.get_actions(application
, model_name
)
204 actions
.update(application_actions
)
205 # Get status of all actions
206 for application_action
in actions
:
207 app_action_status_list
= await model
.get_action_status(
208 name
=application_action
210 for action_id
, action_status
in app_action_status_list
.items():
213 "action": application_action
,
214 "status": action_status
,
216 # Get action output by id
217 action_status
= await model
.get_action_output(executed_action
["id"])
218 for k
, v
in action_status
.items():
219 executed_action
[k
] = v
220 executed_actions
.append(executed_action
)
221 except Exception as e
:
223 "Error in getting executed actions for model: {}. Error: {}".format(
229 await self
.disconnect_model(model
)
230 await self
.disconnect_controller(controller
)
231 return executed_actions
233 async def get_application_configs(
234 self
, model_name
: str, application_name
: str
237 Get available configs for an application.
239 :param: model_name: Model name, str.
240 :param: application_name: Application name, str.
242 :return: A dict which has key - action name, value - action description
245 application_configs
= {}
246 controller
= await self
.get_controller()
248 model
= await self
.get_model(controller
, model_name
)
249 application
= self
._get
_application
(
250 model
, application_name
=application_name
252 application_configs
= await application
.get_config()
253 except Exception as e
:
255 "Error in getting configs for application: {} in model: {}. Error: {}".format(
256 application_name
, model_name
, str(e
)
261 await self
.disconnect_model(model
)
262 await self
.disconnect_controller(controller
)
263 return application_configs
265 @retry(attempts
=3, delay
=5)
266 async def get_model(self
, controller
: Controller
, model_name
: str) -> Model
:
268 Get model from controller
270 :param: controller: Controller
271 :param: model_name: Model name
273 :return: Model: The created Juju model object
275 return await controller
.get_model(model_name
)
277 async def model_exists(
278 self
, model_name
: str, controller
: Controller
= None
281 Check if model exists
283 :param: controller: Controller
284 :param: model_name: Model name
288 need_to_disconnect
= False
290 # Get controller if not passed
292 controller
= await self
.get_controller()
293 need_to_disconnect
= True
295 # Check if model exists
297 return model_name
in await controller
.list_models()
299 if need_to_disconnect
:
300 await self
.disconnect_controller(controller
)
302 async def models_exist(self
, model_names
: [str]) -> (bool, list):
304 Check if models exists
306 :param: model_names: List of strings with model names
308 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
312 "model_names must be a non-empty array. Given value: {}".format(
316 non_existing_models
= []
317 models
= await self
.list_models()
318 existing_models
= list(set(models
).intersection(model_names
))
319 non_existing_models
= list(set(model_names
) - set(existing_models
))
322 len(non_existing_models
) == 0,
326 async def get_model_status(self
, model_name
: str) -> FullStatus
:
330 :param: model_name: Model name
332 :return: Full status object
334 controller
= await self
.get_controller()
335 model
= await self
.get_model(controller
, model_name
)
337 return await model
.get_status()
339 await self
.disconnect_model(model
)
340 await self
.disconnect_controller(controller
)
342 async def create_machine(
345 machine_id
: str = None,
346 db_dict
: dict = None,
347 progress_timeout
: float = None,
348 total_timeout
: float = None,
349 series
: str = "bionic",
351 ) -> (Machine
, bool):
355 :param: model_name: Model name
356 :param: machine_id: Machine id
357 :param: db_dict: Dictionary with data of the DB to write the updates
358 :param: progress_timeout: Maximum time between two updates in the model
359 :param: total_timeout: Timeout for the entity to be active
360 :param: series: Series of the machine (xenial, bionic, focal, ...)
361 :param: wait: Wait until machine is ready
363 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
364 if the machine is new or it already existed
370 "Creating machine (id={}) in model: {}".format(machine_id
, model_name
)
374 controller
= await self
.get_controller()
377 model
= await self
.get_model(controller
, model_name
)
379 if machine_id
is not None:
381 "Searching machine (id={}) in model {}".format(
382 machine_id
, model_name
386 # Get machines from model and get the machine with machine_id if exists
387 machines
= await model
.get_machines()
388 if machine_id
in machines
:
390 "Machine (id={}) found in model {}".format(
391 machine_id
, model_name
394 machine
= machines
[machine_id
]
396 raise JujuMachineNotFound("Machine {} not found".format(machine_id
))
399 self
.log
.debug("Creating a new machine in model {}".format(model_name
))
402 machine
= await model
.add_machine(
403 spec
=None, constraints
=None, disks
=None, series
=series
407 # Wait until the machine is ready
409 "Wait until machine {} is ready in model {}".format(
410 machine
.entity_id
, model_name
414 await JujuModelWatcher
.wait_for(
417 progress_timeout
=progress_timeout
,
418 total_timeout
=total_timeout
,
421 vca_id
=self
.vca_connection
._vca
_id
,
424 await self
.disconnect_model(model
)
425 await self
.disconnect_controller(controller
)
428 "Machine {} ready at {} in model {}".format(
429 machine
.entity_id
, machine
.dns_name
, model_name
434 async def provision_machine(
439 private_key_path
: str,
440 db_dict
: dict = None,
441 progress_timeout
: float = None,
442 total_timeout
: float = None,
445 Manually provisioning of a machine
447 :param: model_name: Model name
448 :param: hostname: IP to access the machine
449 :param: username: Username to login to the machine
450 :param: private_key_path: Local path for the private key
451 :param: db_dict: Dictionary with data of the DB to write the updates
452 :param: progress_timeout: Maximum time between two updates in the model
453 :param: total_timeout: Timeout for the entity to be active
455 :return: (Entity): Machine id
458 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
459 model_name
, hostname
, username
464 controller
= await self
.get_controller()
467 model
= await self
.get_model(controller
, model_name
)
471 provisioner
= AsyncSSHProvisioner(
474 private_key_path
=private_key_path
,
479 params
= await provisioner
.provision_machine()
481 params
.jobs
= ["JobHostUnits"]
483 self
.log
.debug("Adding machine to model")
484 connection
= model
.connection()
485 client_facade
= client
.ClientFacade
.from_connection(connection
)
487 results
= await client_facade
.AddMachines(params
=[params
])
488 error
= results
.machines
[0].error
491 msg
= "Error adding machine: {}".format(error
.message
)
492 self
.log
.error(msg
=msg
)
493 raise ValueError(msg
)
495 machine_id
= results
.machines
[0].machine
497 self
.log
.debug("Installing Juju agent into machine {}".format(machine_id
))
498 asyncio
.ensure_future(
499 provisioner
.install_agent(
500 connection
=connection
,
502 machine_id
=machine_id
,
503 proxy
=self
.vca_connection
.data
.api_proxy
,
504 series
=params
.series
,
510 machine_list
= await model
.get_machines()
511 if machine_id
in machine_list
:
512 self
.log
.debug("Machine {} found in model!".format(machine_id
))
513 machine
= model
.machines
.get(machine_id
)
515 await asyncio
.sleep(2)
518 msg
= "Machine {} not found in model".format(machine_id
)
519 self
.log
.error(msg
=msg
)
520 raise JujuMachineNotFound(msg
)
523 "Wait until machine {} is ready in model {}".format(
524 machine
.entity_id
, model_name
527 await JujuModelWatcher
.wait_for(
530 progress_timeout
=progress_timeout
,
531 total_timeout
=total_timeout
,
534 vca_id
=self
.vca_connection
._vca
_id
,
536 except Exception as e
:
539 await self
.disconnect_model(model
)
540 await self
.disconnect_controller(controller
)
543 "Machine provisioned {} in model {}".format(machine_id
, model_name
)
549 self
, uri
: str, model_name
: str, wait
: bool = True, timeout
: float = 3600
552 Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
554 :param: uri: Path or Charm Store uri in which the charm or bundle can be found
555 :param: model_name: Model name
556 :param: wait: Indicates whether to wait or not until all applications are active
557 :param: timeout: Time in seconds to wait until all applications are active
559 controller
= await self
.get_controller()
560 model
= await self
.get_model(controller
, model_name
)
562 await model
.deploy(uri
, trust
=True)
564 await JujuModelWatcher
.wait_for_model(model
, timeout
=timeout
)
565 self
.log
.debug("All units active in model {}".format(model_name
))
567 await self
.disconnect_model(model
)
568 await self
.disconnect_controller(controller
)
572 application_name
: str,
575 db_dict
: dict = None,
576 progress_timeout
: float = None,
577 total_timeout
: float = None,
581 :param: application_name: Application name
582 :param: model_name: Model name
583 :param: machine_id Machine id
584 :param: db_dict: Dictionary with data of the DB to write the updates
585 :param: progress_timeout: Maximum time between two updates in the model
586 :param: total_timeout: Timeout for the entity to be active
592 controller
= await self
.get_controller()
594 model
= await self
.get_model(controller
, model_name
)
595 application
= self
._get
_application
(model
, application_name
)
597 if application
is not None:
599 # Checks if the given machine id in the model,
600 # otherwise function raises an error
601 _machine
, _series
= self
._get
_machine
_info
(model
, machine_id
)
604 "Adding unit (machine {}) to application {} in model ~{}".format(
605 machine_id
, application_name
, model_name
609 await application
.add_unit(to
=machine_id
)
611 await JujuModelWatcher
.wait_for(
614 progress_timeout
=progress_timeout
,
615 total_timeout
=total_timeout
,
618 vca_id
=self
.vca_connection
._vca
_id
,
621 "Unit is added to application {} in model {}".format(
622 application_name
, model_name
626 raise JujuApplicationNotFound(
627 "Application {} not exists".format(application_name
)
631 await self
.disconnect_model(model
)
632 await self
.disconnect_controller(controller
)
634 async def destroy_unit(
636 application_name
: str,
639 total_timeout
: float = None,
643 :param: application_name: Application name
644 :param: model_name: Model name
645 :param: machine_id Machine id
646 :param: total_timeout: Timeout for the entity to be active
652 controller
= await self
.get_controller()
654 model
= await self
.get_model(controller
, model_name
)
655 application
= self
._get
_application
(model
, application_name
)
657 if application
is None:
658 raise JujuApplicationNotFound(
659 "Application not found: {} (model={})".format(
660 application_name
, model_name
664 unit
= self
._get
_unit
(application
, machine_id
)
667 "A unit with machine id {} not in available units".format(
672 unit_name
= unit
.name
675 "Destroying unit {} from application {} in model {}".format(
676 unit_name
, application_name
, model_name
679 await application
.destroy_unit(unit_name
)
682 "Waiting for unit {} to be destroyed in application {} (model={})...".format(
683 unit_name
, application_name
, model_name
687 # TODO: Add functionality in the Juju watcher to replace this kind of blocks
688 if total_timeout
is None:
690 end
= time
.time() + total_timeout
691 while time
.time() < end
:
692 if not self
._get
_unit
(application
, machine_id
):
694 "The unit {} was destroyed in application {} (model={}) ".format(
695 unit_name
, application_name
, model_name
699 await asyncio
.sleep(5)
701 "Unit {} is destroyed from application {} in model {}".format(
702 unit_name
, application_name
, model_name
707 await self
.disconnect_model(model
)
708 await self
.disconnect_controller(controller
)
710 async def deploy_charm(
712 application_name
: str,
716 db_dict
: dict = None,
717 progress_timeout
: float = None,
718 total_timeout
: float = None,
725 :param: application_name: Application name
726 :param: path: Local path to the charm
727 :param: model_name: Model name
728 :param: machine_id ID of the machine
729 :param: db_dict: Dictionary with data of the DB to write the updates
730 :param: progress_timeout: Maximum time between two updates in the model
731 :param: total_timeout: Timeout for the entity to be active
732 :param: config: Config for the charm
733 :param: series: Series of the charm
734 :param: num_units: Number of units
736 :return: (juju.application.Application): Juju application
739 "Deploying charm {} to machine {} in model ~{}".format(
740 application_name
, machine_id
, model_name
743 self
.log
.debug("charm: {}".format(path
))
746 controller
= await self
.get_controller()
749 model
= await self
.get_model(controller
, model_name
)
752 if application_name
not in model
.applications
:
754 if machine_id
is not None:
755 machine
, series
= self
._get
_machine
_info
(model
, machine_id
)
757 application
= await model
.deploy(
759 application_name
=application_name
,
768 "Wait until application {} is ready in model {}".format(
769 application_name
, model_name
773 for _
in range(num_units
- 1):
774 m
, _
= await self
.create_machine(model_name
, wait
=False)
775 await application
.add_unit(to
=m
.entity_id
)
777 await JujuModelWatcher
.wait_for(
780 progress_timeout
=progress_timeout
,
781 total_timeout
=total_timeout
,
784 vca_id
=self
.vca_connection
._vca
_id
,
787 "Application {} is ready in model {}".format(
788 application_name
, model_name
792 raise JujuApplicationExists(
793 "Application {} exists".format(application_name
)
795 except juju
.errors
.JujuError
as e
:
796 if "already exists" in e
.message
:
797 raise JujuApplicationExists(
798 "Application {} exists".format(application_name
)
803 await self
.disconnect_model(model
)
804 await self
.disconnect_controller(controller
)
808 async def upgrade_charm(
810 application_name
: str,
813 total_timeout
: float = None,
818 :param: application_name: Application name
819 :param: model_name: Model name
820 :param: path: Local path to the charm
821 :param: total_timeout: Timeout for the entity to be active
823 :return: (str, str): (output and status)
827 "Upgrading charm {} in model {} from path {}".format(
828 application_name
, model_name
, path
832 await self
.resolve_application(
833 model_name
=model_name
, application_name
=application_name
837 controller
= await self
.get_controller()
840 model
= await self
.get_model(controller
, model_name
)
844 application
= self
._get
_application
(
846 application_name
=application_name
,
848 if application
is None:
849 raise JujuApplicationNotFound(
850 "Cannot find application {} to upgrade".format(application_name
)
853 await application
.refresh(path
=path
)
856 "Wait until charm upgrade is completed for application {} (model={})".format(
857 application_name
, model_name
861 await JujuModelWatcher
.ensure_units_idle(
862 model
=model
, application
=application
865 if application
.status
== "error":
866 error_message
= "Unknown"
867 for unit
in application
.units
:
869 unit
.workload_status
== "error"
870 and unit
.workload_status_message
!= ""
872 error_message
= unit
.workload_status_message
874 message
= "Application {} failed update in {}: {}".format(
875 application_name
, model_name
, error_message
877 self
.log
.error(message
)
878 raise JujuError(message
=message
)
881 "Application {} is ready in model {}".format(
882 application_name
, model_name
887 await self
.disconnect_model(model
)
888 await self
.disconnect_controller(controller
)
892 async def resolve_application(self
, model_name
: str, application_name
: str):
894 controller
= await self
.get_controller()
895 model
= await self
.get_model(controller
, model_name
)
898 application
= self
._get
_application
(
900 application_name
=application_name
,
902 if application
is None:
903 raise JujuApplicationNotFound(
904 "Cannot find application {} to resolve".format(application_name
)
907 while application
.status
== "error":
908 for unit
in application
.units
:
909 if unit
.workload_status
== "error":
911 "Model {}, Application {}, Unit {} in error state, resolving".format(
912 model_name
, application_name
, unit
.entity_id
916 await unit
.resolved(retry
=False)
920 await asyncio
.sleep(1)
923 await self
.disconnect_model(model
)
924 await self
.disconnect_controller(controller
)
926 async def scale_application(
929 application_name
: str,
931 total_timeout
: float = None,
934 Scale application (K8s)
936 :param: model_name: Model name
937 :param: application_name: Application name
938 :param: scale: Scale to which to set this application
939 :param: total_timeout: Timeout for the entity to be active
943 controller
= await self
.get_controller()
945 model
= await self
.get_model(controller
, model_name
)
948 "Scaling application {} in model {}".format(
949 application_name
, model_name
952 application
= self
._get
_application
(model
, application_name
)
953 if application
is None:
954 raise JujuApplicationNotFound("Cannot scale application")
955 await application
.scale(scale
=scale
)
956 # Wait until application is scaled in model
958 "Waiting for application {} to be scaled in model {}...".format(
959 application_name
, model_name
962 if total_timeout
is None:
964 end
= time
.time() + total_timeout
965 while time
.time() < end
:
966 application_scale
= self
._get
_application
_count
(model
, application_name
)
967 # Before calling wait_for_model function,
968 # wait until application unit count and scale count are equal.
969 # Because there is a delay before scaling triggers in Juju model.
970 if application_scale
== scale
:
971 await JujuModelWatcher
.wait_for_model(
972 model
=model
, timeout
=total_timeout
975 "Application {} is scaled in model {}".format(
976 application_name
, model_name
980 await asyncio
.sleep(5)
982 "Timeout waiting for application {} in model {} to be scaled".format(
983 application_name
, model_name
988 await self
.disconnect_model(model
)
989 await self
.disconnect_controller(controller
)
991 def _get_application_count(self
, model
: Model
, application_name
: str) -> int:
992 """Get number of units of the application
994 :param: model: Model object
995 :param: application_name: Application name
997 :return: int (or None if application doesn't exist)
999 application
= self
._get
_application
(model
, application_name
)
1000 if application
is not None:
1001 return len(application
.units
)
1003 def _get_application(self
, model
: Model
, application_name
: str) -> Application
:
1006 :param: model: Model object
1007 :param: application_name: Application name
1009 :return: juju.application.Application (or None if it doesn't exist)
1011 if model
.applications
and application_name
in model
.applications
:
1012 return model
.applications
[application_name
]
1014 def _get_unit(self
, application
: Application
, machine_id
: str) -> Unit
:
1017 :param: application: Application object
1018 :param: machine_id: Machine id
1023 for u
in application
.units
:
1024 if u
.machine_id
== machine_id
:
1029 def _get_machine_info(
1036 :param: model: Model object
1037 :param: machine_id: Machine id
1039 :return: (str, str): (machine, series)
1041 if machine_id
not in model
.machines
:
1042 msg
= "Machine {} not found in model".format(machine_id
)
1043 self
.log
.error(msg
=msg
)
1044 raise JujuMachineNotFound(msg
)
1045 machine
= model
.machines
[machine_id
]
1046 return machine
, machine
.series
1048 async def execute_action(
1050 application_name
: str,
1053 db_dict
: dict = None,
1054 machine_id
: str = None,
1055 progress_timeout
: float = None,
1056 total_timeout
: float = None,
1061 :param: application_name: Application name
1062 :param: model_name: Model name
1063 :param: action_name: Name of the action
1064 :param: db_dict: Dictionary with data of the DB to write the updates
1065 :param: machine_id Machine id
1066 :param: progress_timeout: Maximum time between two updates in the model
1067 :param: total_timeout: Timeout for the entity to be active
1069 :return: (str, str): (output and status)
1072 "Executing action {} using params {}".format(action_name
, kwargs
)
1075 controller
= await self
.get_controller()
1078 model
= await self
.get_model(controller
, model_name
)
1082 application
= self
._get
_application
(
1084 application_name
=application_name
,
1086 if application
is None:
1087 raise JujuApplicationNotFound("Cannot execute action")
1089 # Ocassionally, self._get_leader_unit() will return None
1090 # because the leader elected hook has not been triggered yet.
1091 # Therefore, we are doing some retries. If it happens again,
1093 if machine_id
is None:
1094 unit
= await self
._get
_leader
_unit
(application
)
1096 "Action {} is being executed on the leader unit {}".format(
1097 action_name
, unit
.name
1101 unit
= self
._get
_unit
(application
, machine_id
)
1104 "A unit with machine id {} not in available units".format(
1109 "Action {} is being executed on {} unit".format(
1110 action_name
, unit
.name
1114 actions
= await application
.get_actions()
1116 if action_name
not in actions
:
1117 raise JujuActionNotFound(
1118 "Action {} not in available actions".format(action_name
)
1121 action
= await unit
.run_action(action_name
, **kwargs
)
1124 "Wait until action {} is completed in application {} (model={})".format(
1125 action_name
, application_name
, model_name
1128 await JujuModelWatcher
.wait_for(
1131 progress_timeout
=progress_timeout
,
1132 total_timeout
=total_timeout
,
1135 vca_id
=self
.vca_connection
._vca
_id
,
1138 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
1139 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
1141 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
1145 "Action {} completed with status {} in application {} (model={})".format(
1146 action_name
, action
.status
, application_name
, model_name
1150 await self
.disconnect_model(model
)
1151 await self
.disconnect_controller(controller
)
1153 return output
, status
1155 async def get_actions(self
, application_name
: str, model_name
: str) -> dict:
1156 """Get list of actions
1158 :param: application_name: Application name
1159 :param: model_name: Model name
1161 :return: Dict with this format
1163 "action_name": "Description of the action",
1168 "Getting list of actions for application {}".format(application_name
)
1172 controller
= await self
.get_controller()
1175 model
= await self
.get_model(controller
, model_name
)
1179 application
= self
._get
_application
(
1181 application_name
=application_name
,
1184 # Return list of actions
1185 return await application
.get_actions()
1188 # Disconnect from model and controller
1189 await self
.disconnect_model(model
)
1190 await self
.disconnect_controller(controller
)
1192 async def get_metrics(self
, model_name
: str, application_name
: str) -> dict:
1193 """Get the metrics collected by the VCA.
1195 :param model_name The name or unique id of the network service
1196 :param application_name The name of the application
1198 if not model_name
or not application_name
:
1199 raise Exception("model_name and application_name must be non-empty strings")
1201 controller
= await self
.get_controller()
1202 model
= await self
.get_model(controller
, model_name
)
1204 application
= self
._get
_application
(model
, application_name
)
1205 if application
is not None:
1206 metrics
= await application
.get_metrics()
1208 self
.disconnect_model(model
)
1209 self
.disconnect_controller(controller
)
1212 async def add_relation(
1220 :param: model_name: Model name
1221 :param: endpoint_1 First endpoint name
1222 ("app:endpoint" format or directly the saas name)
1223 :param: endpoint_2: Second endpoint name (^ same format)
1226 self
.log
.debug("Adding relation: {} -> {}".format(endpoint_1
, endpoint_2
))
1229 controller
= await self
.get_controller()
1232 model
= await self
.get_model(controller
, model_name
)
1236 await model
.add_relation(endpoint_1
, endpoint_2
)
1237 except juju
.errors
.JujuAPIError
as e
:
1238 if "not found" in e
.message
:
1239 self
.log
.warning("Relation not found: {}".format(e
.message
))
1241 if "already exists" in e
.message
:
1242 self
.log
.warning("Relation already exists: {}".format(e
.message
))
1244 # another exception, raise it
1247 await self
.disconnect_model(model
)
1248 await self
.disconnect_controller(controller
)
1250 async def offer(self
, endpoint
: RelationEndpoint
) -> Offer
:
1252 Create an offer from a RelationEndpoint
1254 :param: endpoint: Relation endpoint
1256 :return: Offer object
1258 model_name
= endpoint
.model_name
1259 offer_name
= f
"{endpoint.application_name}-{endpoint.endpoint_name}"
1260 controller
= await self
.get_controller()
1263 model
= await self
.get_model(controller
, model_name
)
1264 await model
.create_offer(endpoint
.endpoint
, offer_name
=offer_name
)
1265 offer_list
= await self
._list
_offers
(model_name
, offer_name
=offer_name
)
1267 return Offer(offer_list
[0].offer_url
)
1269 raise Exception("offer was not created")
1270 except juju
.errors
.JujuError
as e
:
1271 if "application offer already exists" not in e
.message
:
1275 self
.disconnect_model(model
)
1276 self
.disconnect_controller(controller
)
1282 provider_libjuju
: "Libjuju",
1285 Consumes a remote offer in the model. Relations can be created later using "juju relate".
1287 :param: model_name: Model name
1288 :param: offer: Offer object to consume
1289 :param: provider_libjuju: Libjuju object of the provider endpoint
1291 :raises ParseError if there's a problem parsing the offer_url
1292 :raises JujuError if remote offer includes and endpoint
1293 :raises JujuAPIError if the operation is not successful
1295 :returns: Saas name. It is the application name in the model that reference the remote application.
1297 saas_name
= f
'{offer.name}-{offer.model_name.replace("-", "")}'
1299 saas_name
= f
"{saas_name}-{offer.vca_id}"
1300 controller
= await self
.get_controller()
1302 provider_controller
= None
1304 model
= await controller
.get_model(model_name
)
1305 provider_controller
= await provider_libjuju
.get_controller()
1306 await model
.consume(
1307 offer
.url
, application_alias
=saas_name
, controller
=provider_controller
1312 await self
.disconnect_model(model
)
1313 if provider_controller
:
1314 await provider_libjuju
.disconnect_controller(provider_controller
)
1315 await self
.disconnect_controller(controller
)
1317 async def destroy_model(self
, model_name
: str, total_timeout
: float = 1800):
1321 :param: model_name: Model name
1322 :param: total_timeout: Timeout
1325 controller
= await self
.get_controller()
1328 if not await self
.model_exists(model_name
, controller
=controller
):
1331 self
.log
.debug("Destroying model {}".format(model_name
))
1333 model
= await self
.get_model(controller
, model_name
)
1334 # Destroy machines that are manually provisioned
1335 # and still are in pending state
1336 await self
._destroy
_pending
_machines
(model
, only_manual
=True)
1337 await self
.disconnect_model(model
)
1339 await self
._destroy
_model
(
1342 timeout
=total_timeout
,
1346 await self
.disconnect_model(model
)
1347 await self
.disconnect_controller(controller
)
1349 async def _destroy_model(
1350 self
, model_name
: str, controller
: Controller
, timeout
: float = 1800
1353 Destroy model from controller
1355 :param: model: Model name to be removed
1356 :param: controller: Controller object
1357 :param: timeout: Timeout in seconds
1360 async def _destroy_model_loop(model_name
: str, controller
: Controller
):
1361 while await self
.model_exists(model_name
, controller
=controller
):
1362 await controller
.destroy_model(
1363 model_name
, destroy_storage
=True, force
=True, max_wait
=0
1365 await asyncio
.sleep(5)
1368 await asyncio
.wait_for(
1369 _destroy_model_loop(model_name
, controller
), timeout
=timeout
1371 except asyncio
.TimeoutError
:
1373 "Timeout waiting for model {} to be destroyed".format(model_name
)
1375 except juju
.errors
.JujuError
as e
:
1376 if any("has been removed" in error
for error
in e
.errors
):
1380 async def destroy_application(
1381 self
, model_name
: str, application_name
: str, total_timeout
: float
1386 :param: model_name: Model name
1387 :param: application_name: Application name
1388 :param: total_timeout: Timeout
1391 controller
= await self
.get_controller()
1395 model
= await self
.get_model(controller
, model_name
)
1397 "Destroying application {} in model {}".format(
1398 application_name
, model_name
1401 application
= self
._get
_application
(model
, application_name
)
1403 await application
.destroy()
1405 self
.log
.warning("Application not found: {}".format(application_name
))
1408 "Waiting for application {} to be destroyed in model {}...".format(
1409 application_name
, model_name
1412 if total_timeout
is None:
1413 total_timeout
= 3600
1414 end
= time
.time() + total_timeout
1415 while time
.time() < end
:
1416 if not self
._get
_application
(model
, application_name
):
1418 "The application {} was destroyed in model {} ".format(
1419 application_name
, model_name
1423 await asyncio
.sleep(5)
1425 "Timeout waiting for application {} to be destroyed in model {}".format(
1426 application_name
, model_name
1430 if model
is not None:
1431 await self
.disconnect_model(model
)
1432 await self
.disconnect_controller(controller
)
1434 async def _destroy_pending_machines(self
, model
: Model
, only_manual
: bool = False):
1436 Destroy pending machines in a given model
1438 :param: only_manual: Bool that indicates only manually provisioned
1439 machines should be destroyed (if True), or that
1440 all pending machines should be destroyed
1442 status
= await model
.get_status()
1443 for machine_id
in status
.machines
:
1444 machine_status
= status
.machines
[machine_id
]
1445 if machine_status
.agent_status
.status
== "pending":
1446 if only_manual
and not machine_status
.instance_id
.startswith("manual:"):
1448 machine
= model
.machines
[machine_id
]
1449 await machine
.destroy(force
=True)
1451 async def configure_application(
1452 self
, model_name
: str, application_name
: str, config
: dict = None
1454 """Configure application
1456 :param: model_name: Model name
1457 :param: application_name: Application name
1458 :param: config: Config to apply to the charm
1460 self
.log
.debug("Configuring application {}".format(application_name
))
1463 controller
= await self
.get_controller()
1466 model
= await self
.get_model(controller
, model_name
)
1467 application
= self
._get
_application
(
1469 application_name
=application_name
,
1471 await application
.set_config(config
)
1474 await self
.disconnect_model(model
)
1475 await self
.disconnect_controller(controller
)
1477 def handle_exception(self
, loop
, context
):
1478 # All unhandled exceptions by libjuju are handled here.
1481 async def health_check(self
, interval
: float = 300.0):
1483 Health check to make sure controller and controller_model connections are OK
1485 :param: interval: Time in seconds between checks
1490 controller
= await self
.get_controller()
1491 # self.log.debug("VCA is alive")
1492 except Exception as e
:
1493 self
.log
.error("Health check to VCA failed: {}".format(e
))
1495 await self
.disconnect_controller(controller
)
1496 await asyncio
.sleep(interval
)
1498 async def list_models(self
, contains
: str = None) -> [str]:
1499 """List models with certain names
1501 :param: contains: String that is contained in model name
1503 :retur: [models] Returns list of model names
1506 controller
= await self
.get_controller()
1508 models
= await controller
.list_models()
1510 models
= [model
for model
in models
if contains
in model
]
1513 await self
.disconnect_controller(controller
)
1515 async def _list_offers(
1516 self
, model_name
: str, offer_name
: str = None
1517 ) -> QueryApplicationOffersResults
:
1519 List offers within a model
1521 :param: model_name: Model name
1522 :param: offer_name: Offer name to filter.
1524 :return: Returns application offers results in the model
1527 controller
= await self
.get_controller()
1529 offers
= (await controller
.list_offers(model_name
)).results
1532 for offer
in offers
:
1533 if offer
.offer_name
== offer_name
:
1534 matching_offer
.append(offer
)
1536 offers
= matching_offer
1539 await self
.disconnect_controller(controller
)
1546 client_cert_data
: str,
1547 configuration
: Configuration
,
1549 credential_name
: str = None,
1552 Add a Kubernetes cloud to the controller
1554 Similar to the `juju add-k8s` command in the CLI
1556 :param: name: Name for the K8s cloud
1557 :param: configuration: Kubernetes configuration object
1558 :param: storage_class: Storage Class to use in the cloud
1559 :param: credential_name: Storage Class to use in the cloud
1562 if not storage_class
:
1563 raise Exception("storage_class must be a non-empty string")
1565 raise Exception("name must be a non-empty string")
1566 if not configuration
:
1567 raise Exception("configuration must be provided")
1569 endpoint
= configuration
.host
1570 credential
= self
.get_k8s_cloud_credential(
1575 credential
.attrs
[RBAC_LABEL_KEY_NAME
] = rbac_id
1576 cloud
= client
.Cloud(
1578 auth_types
=[credential
.auth_type
],
1580 ca_certificates
=[client_cert_data
],
1582 "operator-storage": storage_class
,
1583 "workload-storage": storage_class
,
1587 return await self
.add_cloud(
1588 name
, cloud
, credential
, credential_name
=credential_name
1591 def get_k8s_cloud_credential(
1593 configuration
: Configuration
,
1594 client_cert_data
: str,
1596 ) -> client
.CloudCredential
:
1598 # TODO: Test with AKS
1599 key
= None # open(configuration.key_file, "r").read()
1600 username
= configuration
.username
1601 password
= configuration
.password
1603 if client_cert_data
:
1604 attrs
["ClientCertificateData"] = client_cert_data
1606 attrs
["ClientKeyData"] = key
1608 if username
or password
:
1609 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1610 attrs
["Token"] = token
1614 auth_type
= "oauth2"
1615 if client_cert_data
:
1616 auth_type
= "oauth2withcert"
1618 raise JujuInvalidK8sConfiguration(
1619 "missing token for auth type {}".format(auth_type
)
1624 "credential for user {} has empty password".format(username
)
1626 attrs
["username"] = username
1627 attrs
["password"] = password
1628 if client_cert_data
:
1629 auth_type
= "userpasswithcert"
1631 auth_type
= "userpass"
1632 elif client_cert_data
and token
:
1633 auth_type
= "certificate"
1635 raise JujuInvalidK8sConfiguration("authentication method not supported")
1636 return client
.CloudCredential(auth_type
=auth_type
, attrs
=attrs
)
1638 async def add_cloud(
1642 credential
: CloudCredential
= None,
1643 credential_name
: str = None,
1646 Add cloud to the controller
1648 :param: name: Name of the cloud to be added
1649 :param: cloud: Cloud object
1650 :param: credential: CloudCredentials object for the cloud
1651 :param: credential_name: Credential name.
1652 If not defined, cloud of the name will be used.
1654 controller
= await self
.get_controller()
1656 _
= await controller
.add_cloud(name
, cloud
)
1658 await controller
.add_credential(
1659 credential_name
or name
, credential
=credential
, cloud
=name
1661 # Need to return the object returned by the controller.add_cloud() function
1662 # I'm returning the original value now until this bug is fixed:
1663 # https://github.com/juju/python-libjuju/issues/443
1666 await self
.disconnect_controller(controller
)
1668 async def remove_cloud(self
, name
: str):
1672 :param: name: Name of the cloud to be removed
1674 controller
= await self
.get_controller()
1676 await controller
.remove_cloud(name
)
1677 except juju
.errors
.JujuError
as e
:
1678 if len(e
.errors
) == 1 and f
'cloud "{name}" not found' == e
.errors
[0]:
1679 self
.log
.warning(f
"Cloud {name} not found, so it could not be deleted.")
1683 await self
.disconnect_controller(controller
)
1685 @retry(attempts
=20, delay
=5, fallback
=JujuLeaderUnitNotFound())
1686 async def _get_leader_unit(self
, application
: Application
) -> Unit
:
1688 for u
in application
.units
:
1689 if await u
.is_leader_from_status():
1696 async def get_cloud_credentials(self
, cloud
: Cloud
) -> typing
.List
:
1698 Get cloud credentials
1700 :param: cloud: Cloud object. The returned credentials will be from this cloud.
1702 :return: List of credentials object associated to the specified cloud
1705 controller
= await self
.get_controller()
1707 facade
= client
.CloudFacade
.from_connection(controller
.connection())
1708 cloud_cred_tag
= tag
.credential(
1709 cloud
.name
, self
.vca_connection
.data
.user
, cloud
.credential_name
1711 params
= [client
.Entity(cloud_cred_tag
)]
1712 return (await facade
.Credential(params
)).results
1714 await self
.disconnect_controller(controller
)
1716 async def check_application_exists(self
, model_name
, application_name
) -> bool:
1717 """Check application exists
1719 :param: model_name: Model Name
1720 :param: application_name: Application Name
1726 controller
= await self
.get_controller()
1728 model
= await self
.get_model(controller
, model_name
)
1730 "Checking if application {} exists in model {}".format(
1731 application_name
, model_name
1734 return self
._get
_application
(model
, application_name
) is not None
1737 await self
.disconnect_model(model
)
1738 await self
.disconnect_controller(controller
)