1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
24 from juju
.bundle
import BundleHandler
25 from juju
.model
import Model
26 from juju
.machine
import Machine
27 from juju
.application
import Application
28 from juju
.unit
import Unit
29 from juju
.url
import URL
30 from juju
.version
import DEFAULT_ARCHITECTURE
31 from juju
.client
._definitions
import (
33 QueryApplicationOffersResults
,
37 from juju
.controller
import Controller
38 from juju
.client
import client
41 from n2vc
.definitions
import Offer
, RelationEndpoint
42 from n2vc
.juju_watcher
import JujuModelWatcher
43 from n2vc
.provisioner
import AsyncSSHProvisioner
44 from n2vc
.n2vc_conn
import N2VCConnector
45 from n2vc
.exceptions
import (
47 JujuApplicationNotFound
,
48 JujuLeaderUnitNotFound
,
50 JujuControllerFailedConnecting
,
51 JujuApplicationExists
,
52 JujuInvalidK8sConfiguration
,
55 from n2vc
.vca
.cloud
import Cloud
as VcaCloud
56 from n2vc
.vca
.connection
import Connection
57 from kubernetes
.client
.configuration
import Configuration
58 from retrying_async
import retry
61 RBAC_LABEL_KEY_NAME
= "rbac-id"
65 def retry_callback(attempt
, exc
, args
, kwargs
, delay
=0.5, *, loop
):
66 # Specifically overridden from upstream implementation so it can
67 # continue to work with Python 3.10
68 yield from asyncio
.sleep(attempt
* delay
)
75 vca_connection
: Connection
,
76 log
: logging
.Logger
= None,
77 n2vc
: N2VCConnector
= None,
82 :param: vca_connection: n2vc.vca.connection object
84 :param: n2vc: N2VC object
87 self
.log
= log
or logging
.getLogger("Libjuju")
89 self
.vca_connection
= vca_connection
91 self
.creating_model
= asyncio
.Lock()
93 if self
.vca_connection
.is_default
:
94 self
.health_check_task
= self
._create
_health
_check
_task
()
96 def _create_health_check_task(self
):
97 return asyncio
.get_event_loop().create_task(self
.health_check())
99 async def get_controller(self
, timeout
: float = 60.0) -> Controller
:
103 :param: timeout: Time in seconds to wait for controller to connect
107 controller
= Controller()
108 await asyncio
.wait_for(
110 endpoint
=self
.vca_connection
.data
.endpoints
,
111 username
=self
.vca_connection
.data
.user
,
112 password
=self
.vca_connection
.data
.secret
,
113 cacert
=self
.vca_connection
.data
.cacert
,
117 if self
.vca_connection
.is_default
:
118 endpoints
= await controller
.api_endpoints
120 endpoint
in self
.vca_connection
.endpoints
for endpoint
in endpoints
122 await self
.vca_connection
.update_endpoints(endpoints
)
124 except asyncio
.CancelledError
as e
:
126 except Exception as e
:
128 "Failed connecting to controller: {}... {}".format(
129 self
.vca_connection
.data
.endpoints
, e
133 await self
.disconnect_controller(controller
)
135 raise JujuControllerFailedConnecting(
136 f
"Error connecting to Juju controller: {e}"
139 async def disconnect(self
):
141 # Cancel health check task
142 self
.health_check_task
.cancel()
143 self
.log
.debug("Libjuju disconnected!")
145 async def disconnect_model(self
, model
: Model
):
149 :param: model: Model that will be disconnected
151 await model
.disconnect()
153 async def disconnect_controller(self
, controller
: Controller
):
155 Disconnect controller
157 :param: controller: Controller that will be disconnected
160 await controller
.disconnect()
162 @retry(attempts
=3, delay
=5, timeout
=None, callback
=retry_callback
)
163 async def add_model(self
, model_name
: str, cloud
: VcaCloud
):
167 :param: model_name: Model name
168 :param: cloud: Cloud object
172 controller
= await self
.get_controller()
175 # Block until other workers have finished model creation
176 while self
.creating_model
.locked():
177 await asyncio
.sleep(0.1)
180 async with self
.creating_model
:
181 if await self
.model_exists(model_name
, controller
=controller
):
183 self
.log
.debug("Creating model {}".format(model_name
))
184 model
= await controller
.add_model(
186 config
=self
.vca_connection
.data
.model_config
,
187 cloud_name
=cloud
.name
,
188 credential_name
=cloud
.credential_name
,
190 except juju
.errors
.JujuAPIError
as e
:
191 if "already exists" in e
.message
:
197 await self
.disconnect_model(model
)
198 await self
.disconnect_controller(controller
)
200 async def get_executed_actions(self
, model_name
: str) -> list:
202 Get executed/history of actions for a model.
204 :param: model_name: Model name, str.
205 :return: List of executed actions for a model.
208 executed_actions
= []
209 controller
= await self
.get_controller()
211 model
= await self
.get_model(controller
, model_name
)
212 # Get all unique action names
214 for application
in model
.applications
:
215 application_actions
= await self
.get_actions(application
, model_name
)
216 actions
.update(application_actions
)
217 # Get status of all actions
218 for application_action
in actions
:
219 app_action_status_list
= await model
.get_action_status(
220 name
=application_action
222 for action_id
, action_status
in app_action_status_list
.items():
225 "action": application_action
,
226 "status": action_status
,
228 # Get action output by id
229 action_status
= await model
.get_action_output(executed_action
["id"])
230 for k
, v
in action_status
.items():
231 executed_action
[k
] = v
232 executed_actions
.append(executed_action
)
233 except Exception as e
:
235 "Error in getting executed actions for model: {}. Error: {}".format(
241 await self
.disconnect_model(model
)
242 await self
.disconnect_controller(controller
)
243 return executed_actions
245 async def get_application_configs(
246 self
, model_name
: str, application_name
: str
249 Get available configs for an application.
251 :param: model_name: Model name, str.
252 :param: application_name: Application name, str.
254 :return: A dict which has key - action name, value - action description
257 application_configs
= {}
258 controller
= await self
.get_controller()
260 model
= await self
.get_model(controller
, model_name
)
261 application
= self
._get
_application
(
262 model
, application_name
=application_name
264 application_configs
= await application
.get_config()
265 except Exception as e
:
267 "Error in getting configs for application: {} in model: {}. Error: {}".format(
268 application_name
, model_name
, str(e
)
273 await self
.disconnect_model(model
)
274 await self
.disconnect_controller(controller
)
275 return application_configs
277 @retry(attempts
=3, delay
=5, callback
=retry_callback
)
278 async def get_model(self
, controller
: Controller
, model_name
: str) -> Model
:
280 Get model from controller
282 :param: controller: Controller
283 :param: model_name: Model name
285 :return: Model: The created Juju model object
287 return await controller
.get_model(model_name
)
289 async def model_exists(
290 self
, model_name
: str, controller
: Controller
= None
293 Check if model exists
295 :param: controller: Controller
296 :param: model_name: Model name
300 need_to_disconnect
= False
302 # Get controller if not passed
304 controller
= await self
.get_controller()
305 need_to_disconnect
= True
307 # Check if model exists
309 return model_name
in await controller
.list_models()
311 if need_to_disconnect
:
312 await self
.disconnect_controller(controller
)
314 async def models_exist(self
, model_names
: [str]) -> (bool, list):
316 Check if models exists
318 :param: model_names: List of strings with model names
320 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
324 "model_names must be a non-empty array. Given value: {}".format(
328 non_existing_models
= []
329 models
= await self
.list_models()
330 existing_models
= list(set(models
).intersection(model_names
))
331 non_existing_models
= list(set(model_names
) - set(existing_models
))
334 len(non_existing_models
) == 0,
338 async def get_model_status(self
, model_name
: str) -> FullStatus
:
342 :param: model_name: Model name
344 :return: Full status object
346 controller
= await self
.get_controller()
347 model
= await self
.get_model(controller
, model_name
)
349 return await model
.get_status()
351 await self
.disconnect_model(model
)
352 await self
.disconnect_controller(controller
)
354 async def create_machine(
357 machine_id
: str = None,
358 db_dict
: dict = None,
359 progress_timeout
: float = None,
360 total_timeout
: float = None,
361 series
: str = "bionic",
363 ) -> (Machine
, bool):
367 :param: model_name: Model name
368 :param: machine_id: Machine id
369 :param: db_dict: Dictionary with data of the DB to write the updates
370 :param: progress_timeout: Maximum time between two updates in the model
371 :param: total_timeout: Timeout for the entity to be active
372 :param: series: Series of the machine (xenial, bionic, focal, ...)
373 :param: wait: Wait until machine is ready
375 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
376 if the machine is new or it already existed
382 "Creating machine (id={}) in model: {}".format(machine_id
, model_name
)
386 controller
= await self
.get_controller()
389 model
= await self
.get_model(controller
, model_name
)
391 if machine_id
is not None:
393 "Searching machine (id={}) in model {}".format(
394 machine_id
, model_name
398 # Get machines from model and get the machine with machine_id if exists
399 machines
= await model
.get_machines()
400 if machine_id
in machines
:
402 "Machine (id={}) found in model {}".format(
403 machine_id
, model_name
406 machine
= machines
[machine_id
]
408 raise JujuMachineNotFound("Machine {} not found".format(machine_id
))
411 self
.log
.debug("Creating a new machine in model {}".format(model_name
))
414 machine
= await model
.add_machine(
415 spec
=None, constraints
=None, disks
=None, series
=series
419 # Wait until the machine is ready
421 "Wait until machine {} is ready in model {}".format(
422 machine
.entity_id
, model_name
426 await JujuModelWatcher
.wait_for(
429 progress_timeout
=progress_timeout
,
430 total_timeout
=total_timeout
,
433 vca_id
=self
.vca_connection
._vca
_id
,
436 await self
.disconnect_model(model
)
437 await self
.disconnect_controller(controller
)
440 "Machine {} ready at {} in model {}".format(
441 machine
.entity_id
, machine
.dns_name
, model_name
446 async def provision_machine(
451 private_key_path
: str,
452 db_dict
: dict = None,
453 progress_timeout
: float = None,
454 total_timeout
: float = None,
457 Manually provisioning of a machine
459 :param: model_name: Model name
460 :param: hostname: IP to access the machine
461 :param: username: Username to login to the machine
462 :param: private_key_path: Local path for the private key
463 :param: db_dict: Dictionary with data of the DB to write the updates
464 :param: progress_timeout: Maximum time between two updates in the model
465 :param: total_timeout: Timeout for the entity to be active
467 :return: (Entity): Machine id
470 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
471 model_name
, hostname
, username
476 controller
= await self
.get_controller()
479 model
= await self
.get_model(controller
, model_name
)
483 provisioner
= AsyncSSHProvisioner(
486 private_key_path
=private_key_path
,
491 params
= await provisioner
.provision_machine()
493 params
.jobs
= ["JobHostUnits"]
495 self
.log
.debug("Adding machine to model")
496 connection
= model
.connection()
497 client_facade
= client
.ClientFacade
.from_connection(connection
)
499 results
= await client_facade
.AddMachines(params
=[params
])
500 error
= results
.machines
[0].error
503 msg
= "Error adding machine: {}".format(error
.message
)
504 self
.log
.error(msg
=msg
)
505 raise ValueError(msg
)
507 machine_id
= results
.machines
[0].machine
509 self
.log
.debug("Installing Juju agent into machine {}".format(machine_id
))
510 asyncio
.ensure_future(
511 provisioner
.install_agent(
512 connection
=connection
,
514 machine_id
=machine_id
,
515 proxy
=self
.vca_connection
.data
.api_proxy
,
516 series
=params
.series
,
522 machine_list
= await model
.get_machines()
523 if machine_id
in machine_list
:
524 self
.log
.debug("Machine {} found in model!".format(machine_id
))
525 machine
= model
.machines
.get(machine_id
)
527 await asyncio
.sleep(2)
530 msg
= "Machine {} not found in model".format(machine_id
)
531 self
.log
.error(msg
=msg
)
532 raise JujuMachineNotFound(msg
)
535 "Wait until machine {} is ready in model {}".format(
536 machine
.entity_id
, model_name
539 await JujuModelWatcher
.wait_for(
542 progress_timeout
=progress_timeout
,
543 total_timeout
=total_timeout
,
546 vca_id
=self
.vca_connection
._vca
_id
,
548 except Exception as e
:
551 await self
.disconnect_model(model
)
552 await self
.disconnect_controller(controller
)
555 "Machine provisioned {} in model {}".format(machine_id
, model_name
)
565 timeout
: float = 3600,
566 instantiation_params
: dict = None,
569 Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
571 :param uri: Path or Charm Store uri in which the charm or bundle can be found
572 :param model_name: Model name
573 :param wait: Indicates whether to wait or not until all applications are active
574 :param timeout: Time in seconds to wait until all applications are active
575 :param instantiation_params: To be applied as overlay bundle over primary bundle.
577 controller
= await self
.get_controller()
578 model
= await self
.get_model(controller
, model_name
)
581 await self
._validate
_instantiation
_params
(uri
, model
, instantiation_params
)
582 overlays
= self
._get
_overlays
(model_name
, instantiation_params
)
583 await model
.deploy(uri
, trust
=True, overlays
=overlays
)
585 await JujuModelWatcher
.wait_for_model(model
, timeout
=timeout
)
586 self
.log
.debug("All units active in model {}".format(model_name
))
588 self
._remove
_overlay
_file
(overlays
)
589 await self
.disconnect_model(model
)
590 await self
.disconnect_controller(controller
)
592 async def _validate_instantiation_params(
593 self
, uri
: str, model
, instantiation_params
: dict
595 """Checks if all the applications in instantiation_params
596 exist ins the original bundle.
599 JujuApplicationNotFound if there is an invalid app in
600 the instantiation params.
602 overlay_apps
= self
._get
_apps
_in
_instantiation
_params
(instantiation_params
)
605 original_apps
= await self
._get
_apps
_in
_original
_bundle
(uri
, model
)
606 if not all(app
in original_apps
for app
in overlay_apps
):
607 raise JujuApplicationNotFound(
608 "Cannot find application {} in original bundle {}".format(
609 overlay_apps
, original_apps
613 async def _get_apps_in_original_bundle(self
, uri
: str, model
) -> set:
614 """Bundle is downloaded in BundleHandler.fetch_plan.
615 That method takes care of opening and exception handling.
617 Resolve method gets all the information regarding the channel,
618 track, revision, type, source.
621 Set with the names of the applications in original bundle.
624 architecture
= DEFAULT_ARCHITECTURE
# only AMD64 is allowed
625 res
= await model
.deploy_types
[str(url
.schema
)].resolve(
626 url
, architecture
, entity_url
=uri
628 handler
= BundleHandler(model
, trusted
=True, forced
=False)
629 await handler
.fetch_plan(url
, res
.origin
)
630 return handler
.applications
632 def _get_apps_in_instantiation_params(self
, instantiation_params
: dict) -> list:
633 """Extract applications key in instantiation params.
636 List with the names of the applications in instantiation params.
639 JujuError if applications key is not found.
641 if not instantiation_params
:
644 return [key
for key
in instantiation_params
.get("applications")]
645 except Exception as e
:
646 raise JujuError("Invalid overlay format. {}".format(str(e
)))
648 def _get_overlays(self
, model_name
: str, instantiation_params
: dict) -> list:
649 """Creates a temporary overlay file which includes the instantiation params.
650 Only one overlay file is created.
653 List with one overlay filename. Empty list if there are no instantiation params.
655 if not instantiation_params
:
657 file_name
= model_name
+ "-overlay.yaml"
658 self
._write
_overlay
_file
(file_name
, instantiation_params
)
661 def _write_overlay_file(self
, file_name
: str, instantiation_params
: dict) -> None:
662 with
open(file_name
, "w") as file:
663 yaml
.dump(instantiation_params
, file)
665 def _remove_overlay_file(self
, overlay
: list) -> None:
666 """Overlay contains either one or zero file names."""
670 filename
= overlay
[0]
674 "Overlay file {} could not be removed: {}".format(filename
, e
)
679 application_name
: str,
682 db_dict
: dict = None,
683 progress_timeout
: float = None,
684 total_timeout
: float = None,
688 :param: application_name: Application name
689 :param: model_name: Model name
690 :param: machine_id Machine id
691 :param: db_dict: Dictionary with data of the DB to write the updates
692 :param: progress_timeout: Maximum time between two updates in the model
693 :param: total_timeout: Timeout for the entity to be active
699 controller
= await self
.get_controller()
701 model
= await self
.get_model(controller
, model_name
)
702 application
= self
._get
_application
(model
, application_name
)
704 if application
is not None:
705 # Checks if the given machine id in the model,
706 # otherwise function raises an error
707 _machine
, _series
= self
._get
_machine
_info
(model
, machine_id
)
710 "Adding unit (machine {}) to application {} in model ~{}".format(
711 machine_id
, application_name
, model_name
715 await application
.add_unit(to
=machine_id
)
717 await JujuModelWatcher
.wait_for(
720 progress_timeout
=progress_timeout
,
721 total_timeout
=total_timeout
,
724 vca_id
=self
.vca_connection
._vca
_id
,
727 "Unit is added to application {} in model {}".format(
728 application_name
, model_name
732 raise JujuApplicationNotFound(
733 "Application {} not exists".format(application_name
)
737 await self
.disconnect_model(model
)
738 await self
.disconnect_controller(controller
)
740 async def destroy_unit(
742 application_name
: str,
745 total_timeout
: float = None,
749 :param: application_name: Application name
750 :param: model_name: Model name
751 :param: machine_id Machine id
752 :param: total_timeout: Timeout for the entity to be active
758 controller
= await self
.get_controller()
760 model
= await self
.get_model(controller
, model_name
)
761 application
= self
._get
_application
(model
, application_name
)
763 if application
is None:
764 raise JujuApplicationNotFound(
765 "Application not found: {} (model={})".format(
766 application_name
, model_name
770 unit
= self
._get
_unit
(application
, machine_id
)
773 "A unit with machine id {} not in available units".format(
778 unit_name
= unit
.name
781 "Destroying unit {} from application {} in model {}".format(
782 unit_name
, application_name
, model_name
785 await application
.destroy_unit(unit_name
)
788 "Waiting for unit {} to be destroyed in application {} (model={})...".format(
789 unit_name
, application_name
, model_name
793 # TODO: Add functionality in the Juju watcher to replace this kind of blocks
794 if total_timeout
is None:
796 end
= time
.time() + total_timeout
797 while time
.time() < end
:
798 if not self
._get
_unit
(application
, machine_id
):
800 "The unit {} was destroyed in application {} (model={}) ".format(
801 unit_name
, application_name
, model_name
805 await asyncio
.sleep(5)
807 "Unit {} is destroyed from application {} in model {}".format(
808 unit_name
, application_name
, model_name
813 await self
.disconnect_model(model
)
814 await self
.disconnect_controller(controller
)
816 async def deploy_charm(
818 application_name
: str,
822 db_dict
: dict = None,
823 progress_timeout
: float = None,
824 total_timeout
: float = None,
831 :param: application_name: Application name
832 :param: path: Local path to the charm
833 :param: model_name: Model name
834 :param: machine_id ID of the machine
835 :param: db_dict: Dictionary with data of the DB to write the updates
836 :param: progress_timeout: Maximum time between two updates in the model
837 :param: total_timeout: Timeout for the entity to be active
838 :param: config: Config for the charm
839 :param: series: Series of the charm
840 :param: num_units: Number of units
842 :return: (juju.application.Application): Juju application
845 "Deploying charm {} to machine {} in model ~{}".format(
846 application_name
, machine_id
, model_name
849 self
.log
.debug("charm: {}".format(path
))
852 controller
= await self
.get_controller()
855 model
= await self
.get_model(controller
, model_name
)
858 if application_name
not in model
.applications
:
859 if machine_id
is not None:
860 machine
, series
= self
._get
_machine
_info
(model
, machine_id
)
862 application
= await model
.deploy(
864 application_name
=application_name
,
873 "Wait until application {} is ready in model {}".format(
874 application_name
, model_name
878 for _
in range(num_units
- 1):
879 m
, _
= await self
.create_machine(model_name
, wait
=False)
880 await application
.add_unit(to
=m
.entity_id
)
882 await JujuModelWatcher
.wait_for(
885 progress_timeout
=progress_timeout
,
886 total_timeout
=total_timeout
,
889 vca_id
=self
.vca_connection
._vca
_id
,
892 "Application {} is ready in model {}".format(
893 application_name
, model_name
897 raise JujuApplicationExists(
898 "Application {} exists".format(application_name
)
900 except juju
.errors
.JujuError
as e
:
901 if "already exists" in e
.message
:
902 raise JujuApplicationExists(
903 "Application {} exists".format(application_name
)
908 await self
.disconnect_model(model
)
909 await self
.disconnect_controller(controller
)
913 async def upgrade_charm(
915 application_name
: str,
918 total_timeout
: float = None,
923 :param: application_name: Application name
924 :param: model_name: Model name
925 :param: path: Local path to the charm
926 :param: total_timeout: Timeout for the entity to be active
928 :return: (str, str): (output and status)
932 "Upgrading charm {} in model {} from path {}".format(
933 application_name
, model_name
, path
937 await self
.resolve_application(
938 model_name
=model_name
, application_name
=application_name
942 controller
= await self
.get_controller()
945 model
= await self
.get_model(controller
, model_name
)
949 application
= self
._get
_application
(
951 application_name
=application_name
,
953 if application
is None:
954 raise JujuApplicationNotFound(
955 "Cannot find application {} to upgrade".format(application_name
)
958 await application
.refresh(path
=path
)
961 "Wait until charm upgrade is completed for application {} (model={})".format(
962 application_name
, model_name
966 await JujuModelWatcher
.ensure_units_idle(
967 model
=model
, application
=application
970 if application
.status
== "error":
971 error_message
= "Unknown"
972 for unit
in application
.units
:
974 unit
.workload_status
== "error"
975 and unit
.workload_status_message
!= ""
977 error_message
= unit
.workload_status_message
979 message
= "Application {} failed update in {}: {}".format(
980 application_name
, model_name
, error_message
982 self
.log
.error(message
)
983 raise JujuError(message
=message
)
986 "Application {} is ready in model {}".format(
987 application_name
, model_name
992 await self
.disconnect_model(model
)
993 await self
.disconnect_controller(controller
)
997 async def resolve_application(self
, model_name
: str, application_name
: str):
998 controller
= await self
.get_controller()
999 model
= await self
.get_model(controller
, model_name
)
1002 application
= self
._get
_application
(
1004 application_name
=application_name
,
1006 if application
is None:
1007 raise JujuApplicationNotFound(
1008 "Cannot find application {} to resolve".format(application_name
)
1011 while application
.status
== "error":
1012 for unit
in application
.units
:
1013 if unit
.workload_status
== "error":
1015 "Model {}, Application {}, Unit {} in error state, resolving".format(
1016 model_name
, application_name
, unit
.entity_id
1020 await unit
.resolved(retry
=False)
1024 await asyncio
.sleep(1)
1027 await self
.disconnect_model(model
)
1028 await self
.disconnect_controller(controller
)
1030 async def resolve(self
, model_name
: str):
1031 controller
= await self
.get_controller()
1032 model
= await self
.get_model(controller
, model_name
)
1033 all_units_active
= False
1035 while not all_units_active
:
1036 all_units_active
= True
1037 for application_name
, application
in model
.applications
.items():
1038 if application
.status
== "error":
1039 for unit
in application
.units
:
1040 if unit
.workload_status
== "error":
1042 "Model {}, Application {}, Unit {} in error state, resolving".format(
1043 model_name
, application_name
, unit
.entity_id
1047 await unit
.resolved(retry
=False)
1048 all_units_active
= False
1052 if not all_units_active
:
1053 await asyncio
.sleep(5)
1055 await self
.disconnect_model(model
)
1056 await self
.disconnect_controller(controller
)
1058 async def scale_application(
1061 application_name
: str,
1063 total_timeout
: float = None,
1066 Scale application (K8s)
1068 :param: model_name: Model name
1069 :param: application_name: Application name
1070 :param: scale: Scale to which to set this application
1071 :param: total_timeout: Timeout for the entity to be active
1075 controller
= await self
.get_controller()
1077 model
= await self
.get_model(controller
, model_name
)
1080 "Scaling application {} in model {}".format(
1081 application_name
, model_name
1084 application
= self
._get
_application
(model
, application_name
)
1085 if application
is None:
1086 raise JujuApplicationNotFound("Cannot scale application")
1087 await application
.scale(scale
=scale
)
1088 # Wait until application is scaled in model
1090 "Waiting for application {} to be scaled in model {}...".format(
1091 application_name
, model_name
1094 if total_timeout
is None:
1095 total_timeout
= 1800
1096 end
= time
.time() + total_timeout
1097 while time
.time() < end
:
1098 application_scale
= self
._get
_application
_count
(model
, application_name
)
1099 # Before calling wait_for_model function,
1100 # wait until application unit count and scale count are equal.
1101 # Because there is a delay before scaling triggers in Juju model.
1102 if application_scale
== scale
:
1103 await JujuModelWatcher
.wait_for_model(
1104 model
=model
, timeout
=total_timeout
1107 "Application {} is scaled in model {}".format(
1108 application_name
, model_name
1112 await asyncio
.sleep(5)
1114 "Timeout waiting for application {} in model {} to be scaled".format(
1115 application_name
, model_name
1120 await self
.disconnect_model(model
)
1121 await self
.disconnect_controller(controller
)
1123 def _get_application_count(self
, model
: Model
, application_name
: str) -> int:
1124 """Get number of units of the application
1126 :param: model: Model object
1127 :param: application_name: Application name
1129 :return: int (or None if application doesn't exist)
1131 application
= self
._get
_application
(model
, application_name
)
1132 if application
is not None:
1133 return len(application
.units
)
1135 def _get_application(self
, model
: Model
, application_name
: str) -> Application
:
1138 :param: model: Model object
1139 :param: application_name: Application name
1141 :return: juju.application.Application (or None if it doesn't exist)
1143 if model
.applications
and application_name
in model
.applications
:
1144 return model
.applications
[application_name
]
1146 def _get_unit(self
, application
: Application
, machine_id
: str) -> Unit
:
1149 :param: application: Application object
1150 :param: machine_id: Machine id
1155 for u
in application
.units
:
1156 if u
.machine_id
== machine_id
:
1161 def _get_machine_info(
1168 :param: model: Model object
1169 :param: machine_id: Machine id
1171 :return: (str, str): (machine, series)
1173 if machine_id
not in model
.machines
:
1174 msg
= "Machine {} not found in model".format(machine_id
)
1175 self
.log
.error(msg
=msg
)
1176 raise JujuMachineNotFound(msg
)
1177 machine
= model
.machines
[machine_id
]
1178 return machine
, machine
.series
1180 async def execute_action(
1182 application_name
: str,
1185 db_dict
: dict = None,
1186 machine_id
: str = None,
1187 progress_timeout
: float = None,
1188 total_timeout
: float = None,
1193 :param: application_name: Application name
1194 :param: model_name: Model name
1195 :param: action_name: Name of the action
1196 :param: db_dict: Dictionary with data of the DB to write the updates
1197 :param: machine_id Machine id
1198 :param: progress_timeout: Maximum time between two updates in the model
1199 :param: total_timeout: Timeout for the entity to be active
1201 :return: (str, str): (output and status)
1204 "Executing action {} using params {}".format(action_name
, kwargs
)
1207 controller
= await self
.get_controller()
1210 model
= await self
.get_model(controller
, model_name
)
1214 application
= self
._get
_application
(
1216 application_name
=application_name
,
1218 if application
is None:
1219 raise JujuApplicationNotFound("Cannot execute action")
1221 # Ocassionally, self._get_leader_unit() will return None
1222 # because the leader elected hook has not been triggered yet.
1223 # Therefore, we are doing some retries. If it happens again,
1225 if machine_id
is None:
1226 unit
= await self
._get
_leader
_unit
(application
)
1228 "Action {} is being executed on the leader unit {}".format(
1229 action_name
, unit
.name
1233 unit
= self
._get
_unit
(application
, machine_id
)
1236 "A unit with machine id {} not in available units".format(
1241 "Action {} is being executed on {} unit".format(
1242 action_name
, unit
.name
1246 actions
= await application
.get_actions()
1248 if action_name
not in actions
:
1249 raise JujuActionNotFound(
1250 "Action {} not in available actions".format(action_name
)
1253 action
= await unit
.run_action(action_name
, **kwargs
)
1256 "Wait until action {} is completed in application {} (model={})".format(
1257 action_name
, application_name
, model_name
1260 await JujuModelWatcher
.wait_for(
1263 progress_timeout
=progress_timeout
,
1264 total_timeout
=total_timeout
,
1267 vca_id
=self
.vca_connection
._vca
_id
,
1270 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
1271 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
1273 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
1277 "Action {} completed with status {} in application {} (model={})".format(
1278 action_name
, action
.status
, application_name
, model_name
1282 await self
.disconnect_model(model
)
1283 await self
.disconnect_controller(controller
)
1285 return output
, status
1287 async def get_actions(self
, application_name
: str, model_name
: str) -> dict:
1288 """Get list of actions
1290 :param: application_name: Application name
1291 :param: model_name: Model name
1293 :return: Dict with this format
1295 "action_name": "Description of the action",
1300 "Getting list of actions for application {}".format(application_name
)
1304 controller
= await self
.get_controller()
1307 model
= await self
.get_model(controller
, model_name
)
1311 application
= self
._get
_application
(
1313 application_name
=application_name
,
1316 # Return list of actions
1317 return await application
.get_actions()
1320 # Disconnect from model and controller
1321 await self
.disconnect_model(model
)
1322 await self
.disconnect_controller(controller
)
1324 async def get_metrics(self
, model_name
: str, application_name
: str) -> dict:
1325 """Get the metrics collected by the VCA.
1327 :param model_name The name or unique id of the network service
1328 :param application_name The name of the application
1330 if not model_name
or not application_name
:
1331 raise Exception("model_name and application_name must be non-empty strings")
1333 controller
= await self
.get_controller()
1334 model
= await self
.get_model(controller
, model_name
)
1336 application
= self
._get
_application
(model
, application_name
)
1337 if application
is not None:
1338 metrics
= await application
.get_metrics()
1340 self
.disconnect_model(model
)
1341 self
.disconnect_controller(controller
)
1344 async def add_relation(
1352 :param: model_name: Model name
1353 :param: endpoint_1 First endpoint name
1354 ("app:endpoint" format or directly the saas name)
1355 :param: endpoint_2: Second endpoint name (^ same format)
1358 self
.log
.debug("Adding relation: {} -> {}".format(endpoint_1
, endpoint_2
))
1361 controller
= await self
.get_controller()
1364 model
= await self
.get_model(controller
, model_name
)
1368 await model
.add_relation(endpoint_1
, endpoint_2
)
1369 except juju
.errors
.JujuAPIError
as e
:
1370 if self
._relation
_is
_not
_found
(e
):
1371 self
.log
.warning("Relation not found: {}".format(e
.message
))
1373 if self
._relation
_already
_exist
(e
):
1374 self
.log
.warning("Relation already exists: {}".format(e
.message
))
1376 # another exception, raise it
1379 await self
.disconnect_model(model
)
1380 await self
.disconnect_controller(controller
)
1382 def _relation_is_not_found(self
, juju_error
):
1384 return (text
in juju_error
.message
) or (
1385 juju_error
.error_code
and text
in juju_error
.error_code
1388 def _relation_already_exist(self
, juju_error
):
1389 text
= "already exists"
1390 return (text
in juju_error
.message
) or (
1391 juju_error
.error_code
and text
in juju_error
.error_code
1394 async def offer(self
, endpoint
: RelationEndpoint
) -> Offer
:
1396 Create an offer from a RelationEndpoint
1398 :param: endpoint: Relation endpoint
1400 :return: Offer object
1402 model_name
= endpoint
.model_name
1403 offer_name
= f
"{endpoint.application_name}-{endpoint.endpoint_name}"
1404 controller
= await self
.get_controller()
1407 model
= await self
.get_model(controller
, model_name
)
1408 await model
.create_offer(endpoint
.endpoint
, offer_name
=offer_name
)
1409 offer_list
= await self
._list
_offers
(model_name
, offer_name
=offer_name
)
1411 return Offer(offer_list
[0].offer_url
)
1413 raise Exception("offer was not created")
1414 except juju
.errors
.JujuError
as e
:
1415 if "application offer already exists" not in e
.message
:
1419 self
.disconnect_model(model
)
1420 self
.disconnect_controller(controller
)
1426 provider_libjuju
: "Libjuju",
1429 Consumes a remote offer in the model. Relations can be created later using "juju relate".
1431 :param: model_name: Model name
1432 :param: offer: Offer object to consume
1433 :param: provider_libjuju: Libjuju object of the provider endpoint
1435 :raises ParseError if there's a problem parsing the offer_url
1436 :raises JujuError if remote offer includes and endpoint
1437 :raises JujuAPIError if the operation is not successful
1439 :returns: Saas name. It is the application name in the model that reference the remote application.
1441 saas_name
= f
'{offer.name}-{offer.model_name.replace("-", "")}'
1443 saas_name
= f
"{saas_name}-{offer.vca_id}"
1444 controller
= await self
.get_controller()
1446 provider_controller
= None
1448 model
= await controller
.get_model(model_name
)
1449 provider_controller
= await provider_libjuju
.get_controller()
1450 await model
.consume(
1451 offer
.url
, application_alias
=saas_name
, controller
=provider_controller
1456 await self
.disconnect_model(model
)
1457 if provider_controller
:
1458 await provider_libjuju
.disconnect_controller(provider_controller
)
1459 await self
.disconnect_controller(controller
)
1461 async def destroy_model(self
, model_name
: str, total_timeout
: float = 1800):
1465 :param: model_name: Model name
1466 :param: total_timeout: Timeout
1469 controller
= await self
.get_controller()
1472 if not await self
.model_exists(model_name
, controller
=controller
):
1473 self
.log
.warn(f
"Model {model_name} doesn't exist")
1476 self
.log
.debug(f
"Getting model {model_name} to be destroyed")
1477 model
= await self
.get_model(controller
, model_name
)
1478 self
.log
.debug(f
"Destroying manual machines in model {model_name}")
1479 # Destroy machines that are manually provisioned
1480 # and still are in pending state
1481 await self
._destroy
_pending
_machines
(model
, only_manual
=True)
1482 await self
.disconnect_model(model
)
1484 await asyncio
.wait_for(
1485 self
._destroy
_model
(model_name
, controller
),
1486 timeout
=total_timeout
,
1488 except Exception as e
:
1489 if not await self
.model_exists(model_name
, controller
=controller
):
1491 f
"Failed deleting model {model_name}: model doesn't exist"
1494 self
.log
.warn(f
"Failed deleting model {model_name}: {e}")
1498 await self
.disconnect_model(model
)
1499 await self
.disconnect_controller(controller
)
1501 async def _destroy_model(
1504 controller
: Controller
,
1507 Destroy model from controller
1509 :param: model: Model name to be removed
1510 :param: controller: Controller object
1511 :param: timeout: Timeout in seconds
1513 self
.log
.debug(f
"Destroying model {model_name}")
1515 async def _destroy_model_gracefully(model_name
: str, controller
: Controller
):
1516 self
.log
.info(f
"Gracefully deleting model {model_name}")
1518 while model_name
in await controller
.list_models():
1520 await self
.resolve(model_name
)
1522 await controller
.destroy_model(model_name
, destroy_storage
=True)
1524 await asyncio
.sleep(5)
1525 self
.log
.info(f
"Model {model_name} deleted gracefully")
1527 async def _destroy_model_forcefully(model_name
: str, controller
: Controller
):
1528 self
.log
.info(f
"Forcefully deleting model {model_name}")
1529 while model_name
in await controller
.list_models():
1530 await controller
.destroy_model(
1531 model_name
, destroy_storage
=True, force
=True, max_wait
=60
1533 await asyncio
.sleep(5)
1534 self
.log
.info(f
"Model {model_name} deleted forcefully")
1538 await asyncio
.wait_for(
1539 _destroy_model_gracefully(model_name
, controller
), timeout
=120
1541 except asyncio
.TimeoutError
:
1542 await _destroy_model_forcefully(model_name
, controller
)
1543 except juju
.errors
.JujuError
as e
:
1544 if any("has been removed" in error
for error
in e
.errors
):
1546 if any("model not found" in error
for error
in e
.errors
):
1550 async def destroy_application(
1551 self
, model_name
: str, application_name
: str, total_timeout
: float
1556 :param: model_name: Model name
1557 :param: application_name: Application name
1558 :param: total_timeout: Timeout
1561 controller
= await self
.get_controller()
1565 model
= await self
.get_model(controller
, model_name
)
1567 "Destroying application {} in model {}".format(
1568 application_name
, model_name
1571 application
= self
._get
_application
(model
, application_name
)
1573 await application
.destroy()
1575 self
.log
.warning("Application not found: {}".format(application_name
))
1578 "Waiting for application {} to be destroyed in model {}...".format(
1579 application_name
, model_name
1582 if total_timeout
is None:
1583 total_timeout
= 3600
1584 end
= time
.time() + total_timeout
1585 while time
.time() < end
:
1586 if not self
._get
_application
(model
, application_name
):
1588 "The application {} was destroyed in model {} ".format(
1589 application_name
, model_name
1593 await asyncio
.sleep(5)
1595 "Timeout waiting for application {} to be destroyed in model {}".format(
1596 application_name
, model_name
1600 if model
is not None:
1601 await self
.disconnect_model(model
)
1602 await self
.disconnect_controller(controller
)
1604 async def _destroy_pending_machines(self
, model
: Model
, only_manual
: bool = False):
1606 Destroy pending machines in a given model
1608 :param: only_manual: Bool that indicates only manually provisioned
1609 machines should be destroyed (if True), or that
1610 all pending machines should be destroyed
1612 status
= await model
.get_status()
1613 for machine_id
in status
.machines
:
1614 machine_status
= status
.machines
[machine_id
]
1615 if machine_status
.agent_status
.status
== "pending":
1616 if only_manual
and not machine_status
.instance_id
.startswith("manual:"):
1618 machine
= model
.machines
[machine_id
]
1619 await machine
.destroy(force
=True)
1621 async def configure_application(
1622 self
, model_name
: str, application_name
: str, config
: dict = None
1624 """Configure application
1626 :param: model_name: Model name
1627 :param: application_name: Application name
1628 :param: config: Config to apply to the charm
1630 self
.log
.debug("Configuring application {}".format(application_name
))
1633 controller
= await self
.get_controller()
1636 model
= await self
.get_model(controller
, model_name
)
1637 application
= self
._get
_application
(
1639 application_name
=application_name
,
1641 await application
.set_config(config
)
1644 await self
.disconnect_model(model
)
1645 await self
.disconnect_controller(controller
)
1647 async def health_check(self
, interval
: float = 300.0):
1649 Health check to make sure controller and controller_model connections are OK
1651 :param: interval: Time in seconds between checks
1656 controller
= await self
.get_controller()
1657 # self.log.debug("VCA is alive")
1658 except Exception as e
:
1659 self
.log
.error("Health check to VCA failed: {}".format(e
))
1661 await self
.disconnect_controller(controller
)
1662 await asyncio
.sleep(interval
)
1664 async def list_models(self
, contains
: str = None) -> [str]:
1665 """List models with certain names
1667 :param: contains: String that is contained in model name
1669 :retur: [models] Returns list of model names
1672 controller
= await self
.get_controller()
1674 models
= await controller
.list_models()
1676 models
= [model
for model
in models
if contains
in model
]
1679 await self
.disconnect_controller(controller
)
1681 async def _list_offers(
1682 self
, model_name
: str, offer_name
: str = None
1683 ) -> QueryApplicationOffersResults
:
1685 List offers within a model
1687 :param: model_name: Model name
1688 :param: offer_name: Offer name to filter.
1690 :return: Returns application offers results in the model
1693 controller
= await self
.get_controller()
1695 offers
= (await controller
.list_offers(model_name
)).results
1698 for offer
in offers
:
1699 if offer
.offer_name
== offer_name
:
1700 matching_offer
.append(offer
)
1702 offers
= matching_offer
1705 await self
.disconnect_controller(controller
)
1712 client_cert_data
: str,
1713 configuration
: Configuration
,
1715 credential_name
: str = None,
1718 Add a Kubernetes cloud to the controller
1720 Similar to the `juju add-k8s` command in the CLI
1722 :param: name: Name for the K8s cloud
1723 :param: configuration: Kubernetes configuration object
1724 :param: storage_class: Storage Class to use in the cloud
1725 :param: credential_name: Storage Class to use in the cloud
1728 if not storage_class
:
1729 raise Exception("storage_class must be a non-empty string")
1731 raise Exception("name must be a non-empty string")
1732 if not configuration
:
1733 raise Exception("configuration must be provided")
1735 endpoint
= configuration
.host
1736 credential
= self
.get_k8s_cloud_credential(
1741 credential
.attrs
[RBAC_LABEL_KEY_NAME
] = rbac_id
1742 cloud
= client
.Cloud(
1744 auth_types
=[credential
.auth_type
],
1746 ca_certificates
=[client_cert_data
],
1748 "operator-storage": storage_class
,
1749 "workload-storage": storage_class
,
1753 return await self
.add_cloud(
1754 name
, cloud
, credential
, credential_name
=credential_name
1757 def get_k8s_cloud_credential(
1759 configuration
: Configuration
,
1760 client_cert_data
: str,
1762 ) -> client
.CloudCredential
:
1764 # TODO: Test with AKS
1765 key
= None # open(configuration.key_file, "r").read()
1766 username
= configuration
.username
1767 password
= configuration
.password
1769 if client_cert_data
:
1770 attrs
["ClientCertificateData"] = client_cert_data
1772 attrs
["ClientKeyData"] = key
1774 if username
or password
:
1775 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1776 attrs
["Token"] = token
1780 auth_type
= "oauth2"
1781 if client_cert_data
:
1782 auth_type
= "oauth2withcert"
1784 raise JujuInvalidK8sConfiguration(
1785 "missing token for auth type {}".format(auth_type
)
1790 "credential for user {} has empty password".format(username
)
1792 attrs
["username"] = username
1793 attrs
["password"] = password
1794 if client_cert_data
:
1795 auth_type
= "userpasswithcert"
1797 auth_type
= "userpass"
1798 elif client_cert_data
and token
:
1799 auth_type
= "certificate"
1801 raise JujuInvalidK8sConfiguration("authentication method not supported")
1802 return client
.CloudCredential(auth_type
=auth_type
, attrs
=attrs
)
1804 async def add_cloud(
1808 credential
: CloudCredential
= None,
1809 credential_name
: str = None,
1812 Add cloud to the controller
1814 :param: name: Name of the cloud to be added
1815 :param: cloud: Cloud object
1816 :param: credential: CloudCredentials object for the cloud
1817 :param: credential_name: Credential name.
1818 If not defined, cloud of the name will be used.
1820 controller
= await self
.get_controller()
1822 _
= await controller
.add_cloud(name
, cloud
)
1824 await controller
.add_credential(
1825 credential_name
or name
, credential
=credential
, cloud
=name
1827 # Need to return the object returned by the controller.add_cloud() function
1828 # I'm returning the original value now until this bug is fixed:
1829 # https://github.com/juju/python-libjuju/issues/443
1832 await self
.disconnect_controller(controller
)
1834 async def remove_cloud(self
, name
: str):
1838 :param: name: Name of the cloud to be removed
1840 controller
= await self
.get_controller()
1842 await controller
.remove_cloud(name
)
1843 except juju
.errors
.JujuError
as e
:
1844 if len(e
.errors
) == 1 and f
'cloud "{name}" not found' == e
.errors
[0]:
1845 self
.log
.warning(f
"Cloud {name} not found, so it could not be deleted.")
1849 await self
.disconnect_controller(controller
)
1852 attempts
=20, delay
=5, fallback
=JujuLeaderUnitNotFound(), callback
=retry_callback
1854 async def _get_leader_unit(self
, application
: Application
) -> Unit
:
1856 for u
in application
.units
:
1857 if await u
.is_leader_from_status():
1864 async def get_cloud_credentials(self
, cloud
: Cloud
) -> typing
.List
:
1866 Get cloud credentials
1868 :param: cloud: Cloud object. The returned credentials will be from this cloud.
1870 :return: List of credentials object associated to the specified cloud
1873 controller
= await self
.get_controller()
1875 facade
= client
.CloudFacade
.from_connection(controller
.connection())
1876 cloud_cred_tag
= tag
.credential(
1877 cloud
.name
, self
.vca_connection
.data
.user
, cloud
.credential_name
1879 params
= [client
.Entity(cloud_cred_tag
)]
1880 return (await facade
.Credential(params
)).results
1882 await self
.disconnect_controller(controller
)
1884 async def check_application_exists(self
, model_name
, application_name
) -> bool:
1885 """Check application exists
1887 :param: model_name: Model Name
1888 :param: application_name: Application Name
1894 controller
= await self
.get_controller()
1896 model
= await self
.get_model(controller
, model_name
)
1898 "Checking if application {} exists in model {}".format(
1899 application_name
, model_name
1902 return self
._get
_application
(model
, application_name
) is not None
1905 await self
.disconnect_model(model
)
1906 await self
.disconnect_controller(controller
)