1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
24 from juju
.bundle
import BundleHandler
25 from juju
.model
import Model
26 from juju
.machine
import Machine
27 from juju
.application
import Application
28 from juju
.unit
import Unit
29 from juju
.url
import URL
30 from juju
.version
import DEFAULT_ARCHITECTURE
31 from juju
.client
._definitions
import (
33 QueryApplicationOffersResults
,
37 from juju
.controller
import Controller
38 from juju
.client
import client
41 from n2vc
.definitions
import Offer
, RelationEndpoint
42 from n2vc
.juju_watcher
import JujuModelWatcher
43 from n2vc
.provisioner
import AsyncSSHProvisioner
44 from n2vc
.n2vc_conn
import N2VCConnector
45 from n2vc
.exceptions
import (
47 JujuApplicationNotFound
,
48 JujuLeaderUnitNotFound
,
50 JujuControllerFailedConnecting
,
51 JujuApplicationExists
,
52 JujuInvalidK8sConfiguration
,
55 from n2vc
.vca
.cloud
import Cloud
as VcaCloud
56 from n2vc
.vca
.connection
import Connection
57 from kubernetes
.client
.configuration
import Configuration
58 from retrying_async
import retry
61 RBAC_LABEL_KEY_NAME
= "rbac-id"
67 vca_connection
: Connection
,
68 log
: logging
.Logger
= None,
69 n2vc
: N2VCConnector
= None,
74 :param: vca_connection: n2vc.vca.connection object
76 :param: n2vc: N2VC object
79 self
.log
= log
or logging
.getLogger("Libjuju")
81 self
.vca_connection
= vca_connection
83 self
.creating_model
= asyncio
.Lock()
85 if self
.vca_connection
.is_default
:
86 self
.health_check_task
= self
._create
_health
_check
_task
()
88 def _create_health_check_task(self
):
89 return asyncio
.get_event_loop().create_task(self
.health_check())
91 async def get_controller(self
, timeout
: float = 60.0) -> Controller
:
95 :param: timeout: Time in seconds to wait for controller to connect
99 controller
= Controller()
100 await asyncio
.wait_for(
102 endpoint
=self
.vca_connection
.data
.endpoints
,
103 username
=self
.vca_connection
.data
.user
,
104 password
=self
.vca_connection
.data
.secret
,
105 cacert
=self
.vca_connection
.data
.cacert
,
109 if self
.vca_connection
.is_default
:
110 endpoints
= await controller
.api_endpoints
112 endpoint
in self
.vca_connection
.endpoints
for endpoint
in endpoints
114 await self
.vca_connection
.update_endpoints(endpoints
)
116 except asyncio
.CancelledError
as e
:
118 except Exception as e
:
120 "Failed connecting to controller: {}... {}".format(
121 self
.vca_connection
.data
.endpoints
, e
125 await self
.disconnect_controller(controller
)
127 raise JujuControllerFailedConnecting(
128 f
"Error connecting to Juju controller: {e}"
async def disconnect(self):
    """
    Disconnect

    Cancels the health check task, if there is one, and logs the
    disconnection. `health_check_task` is only assigned when
    `vca_connection.is_default` is true, so a missing attribute is
    tolerated instead of raising AttributeError.
    """
    # Cancel health check task
    health_check_task = getattr(self, "health_check_task", None)
    if health_check_task is not None:
        health_check_task.cancel()
    self.log.debug("Libjuju disconnected!")
async def disconnect_model(self, model: Model):
    """
    Disconnect model

    :param: model: Model that will be disconnected
    """
    pending_disconnect = model.disconnect()
    await pending_disconnect
async def disconnect_controller(self, controller: Controller):
    """
    Disconnect controller

    :param: controller: Controller that will be disconnected
    """
    pending_disconnect = controller.disconnect()
    await pending_disconnect
154 @retry(attempts
=3, delay
=5, timeout
=None)
155 async def add_model(self
, model_name
: str, cloud
: VcaCloud
):
159 :param: model_name: Model name
160 :param: cloud: Cloud object
164 controller
= await self
.get_controller()
167 # Block until other workers have finished model creation
168 while self
.creating_model
.locked():
169 await asyncio
.sleep(0.1)
172 async with self
.creating_model
:
173 if await self
.model_exists(model_name
, controller
=controller
):
175 self
.log
.debug("Creating model {}".format(model_name
))
176 model
= await controller
.add_model(
178 config
=self
.vca_connection
.data
.model_config
,
179 cloud_name
=cloud
.name
,
180 credential_name
=cloud
.credential_name
,
182 except juju
.errors
.JujuAPIError
as e
:
183 if "already exists" in e
.message
:
189 await self
.disconnect_model(model
)
190 await self
.disconnect_controller(controller
)
192 async def get_executed_actions(self
, model_name
: str) -> list:
194 Get executed/history of actions for a model.
196 :param: model_name: Model name, str.
197 :return: List of executed actions for a model.
200 executed_actions
= []
201 controller
= await self
.get_controller()
203 model
= await self
.get_model(controller
, model_name
)
204 # Get all unique action names
206 for application
in model
.applications
:
207 application_actions
= await self
.get_actions(application
, model_name
)
208 actions
.update(application_actions
)
209 # Get status of all actions
210 for application_action
in actions
:
211 app_action_status_list
= await model
.get_action_status(
212 name
=application_action
214 for action_id
, action_status
in app_action_status_list
.items():
217 "action": application_action
,
218 "status": action_status
,
220 # Get action output by id
221 action_status
= await model
.get_action_output(executed_action
["id"])
222 for k
, v
in action_status
.items():
223 executed_action
[k
] = v
224 executed_actions
.append(executed_action
)
225 except Exception as e
:
227 "Error in getting executed actions for model: {}. Error: {}".format(
233 await self
.disconnect_model(model
)
234 await self
.disconnect_controller(controller
)
235 return executed_actions
237 async def get_application_configs(
238 self
, model_name
: str, application_name
: str
241 Get available configs for an application.
243 :param: model_name: Model name, str.
244 :param: application_name: Application name, str.
246 :return: A dict which has key - action name, value - action description
249 application_configs
= {}
250 controller
= await self
.get_controller()
252 model
= await self
.get_model(controller
, model_name
)
253 application
= self
._get
_application
(
254 model
, application_name
=application_name
256 application_configs
= await application
.get_config()
257 except Exception as e
:
259 "Error in getting configs for application: {} in model: {}. Error: {}".format(
260 application_name
, model_name
, str(e
)
265 await self
.disconnect_model(model
)
266 await self
.disconnect_controller(controller
)
267 return application_configs
@retry(attempts=3, delay=5)
async def get_model(self, controller: Controller, model_name: str) -> Model:
    """
    Get model from controller

    The call is wrapped by `retry(attempts=3, delay=5)`.

    :param: controller: Controller
    :param: model_name: Model name

    :return: Model: The created Juju model object
    """
    model = await controller.get_model(model_name)
    return model
async def model_exists(
    self, model_name: str, controller: Controller = None
) -> bool:
    """
    Check if model exists

    :param: controller: Controller
    :param: model_name: Model name

    :return: True if `model_name` is among the models listed by the controller
    """
    # Get controller if not passed; only a controller obtained here is
    # disconnected afterwards, a caller-supplied one is left connected.
    owns_controller = not controller
    if owns_controller:
        controller = await self.get_controller()

    # Check if model exists
    try:
        known_models = await controller.list_models()
        return model_name in known_models
    finally:
        if owns_controller:
            await self.disconnect_controller(controller)
306 async def models_exist(self
, model_names
: [str]) -> (bool, list):
308 Check if models exists
310 :param: model_names: List of strings with model names
312 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
316 "model_names must be a non-empty array. Given value: {}".format(
320 non_existing_models
= []
321 models
= await self
.list_models()
322 existing_models
= list(set(models
).intersection(model_names
))
323 non_existing_models
= list(set(model_names
) - set(existing_models
))
326 len(non_existing_models
) == 0,
async def get_model_status(self, model_name: str) -> FullStatus:
    """
    Get model status

    :param: model_name: Model name

    :return: Full status object
    """
    controller = await self.get_controller()
    model = None
    try:
        # Fetch the model inside the try block so the controller is still
        # disconnected when get_model() fails.
        model = await self.get_model(controller, model_name)
        return await model.get_status()
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
346 async def create_machine(
349 machine_id
: str = None,
350 db_dict
: dict = None,
351 progress_timeout
: float = None,
352 total_timeout
: float = None,
353 series
: str = "bionic",
355 ) -> (Machine
, bool):
359 :param: model_name: Model name
360 :param: machine_id: Machine id
361 :param: db_dict: Dictionary with data of the DB to write the updates
362 :param: progress_timeout: Maximum time between two updates in the model
363 :param: total_timeout: Timeout for the entity to be active
364 :param: series: Series of the machine (xenial, bionic, focal, ...)
365 :param: wait: Wait until machine is ready
367 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
368 if the machine is new or it already existed
374 "Creating machine (id={}) in model: {}".format(machine_id
, model_name
)
378 controller
= await self
.get_controller()
381 model
= await self
.get_model(controller
, model_name
)
383 if machine_id
is not None:
385 "Searching machine (id={}) in model {}".format(
386 machine_id
, model_name
390 # Get machines from model and get the machine with machine_id if exists
391 machines
= await model
.get_machines()
392 if machine_id
in machines
:
394 "Machine (id={}) found in model {}".format(
395 machine_id
, model_name
398 machine
= machines
[machine_id
]
400 raise JujuMachineNotFound("Machine {} not found".format(machine_id
))
403 self
.log
.debug("Creating a new machine in model {}".format(model_name
))
406 machine
= await model
.add_machine(
407 spec
=None, constraints
=None, disks
=None, series
=series
411 # Wait until the machine is ready
413 "Wait until machine {} is ready in model {}".format(
414 machine
.entity_id
, model_name
418 await JujuModelWatcher
.wait_for(
421 progress_timeout
=progress_timeout
,
422 total_timeout
=total_timeout
,
425 vca_id
=self
.vca_connection
._vca
_id
,
428 await self
.disconnect_model(model
)
429 await self
.disconnect_controller(controller
)
432 "Machine {} ready at {} in model {}".format(
433 machine
.entity_id
, machine
.dns_name
, model_name
438 async def provision_machine(
443 private_key_path
: str,
444 db_dict
: dict = None,
445 progress_timeout
: float = None,
446 total_timeout
: float = None,
449 Manually provisioning of a machine
451 :param: model_name: Model name
452 :param: hostname: IP to access the machine
453 :param: username: Username to login to the machine
454 :param: private_key_path: Local path for the private key
455 :param: db_dict: Dictionary with data of the DB to write the updates
456 :param: progress_timeout: Maximum time between two updates in the model
457 :param: total_timeout: Timeout for the entity to be active
459 :return: (Entity): Machine id
462 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
463 model_name
, hostname
, username
468 controller
= await self
.get_controller()
471 model
= await self
.get_model(controller
, model_name
)
475 provisioner
= AsyncSSHProvisioner(
478 private_key_path
=private_key_path
,
483 params
= await provisioner
.provision_machine()
485 params
.jobs
= ["JobHostUnits"]
487 self
.log
.debug("Adding machine to model")
488 connection
= model
.connection()
489 client_facade
= client
.ClientFacade
.from_connection(connection
)
491 results
= await client_facade
.AddMachines(params
=[params
])
492 error
= results
.machines
[0].error
495 msg
= "Error adding machine: {}".format(error
.message
)
496 self
.log
.error(msg
=msg
)
497 raise ValueError(msg
)
499 machine_id
= results
.machines
[0].machine
501 self
.log
.debug("Installing Juju agent into machine {}".format(machine_id
))
502 asyncio
.ensure_future(
503 provisioner
.install_agent(
504 connection
=connection
,
506 machine_id
=machine_id
,
507 proxy
=self
.vca_connection
.data
.api_proxy
,
508 series
=params
.series
,
514 machine_list
= await model
.get_machines()
515 if machine_id
in machine_list
:
516 self
.log
.debug("Machine {} found in model!".format(machine_id
))
517 machine
= model
.machines
.get(machine_id
)
519 await asyncio
.sleep(2)
522 msg
= "Machine {} not found in model".format(machine_id
)
523 self
.log
.error(msg
=msg
)
524 raise JujuMachineNotFound(msg
)
527 "Wait until machine {} is ready in model {}".format(
528 machine
.entity_id
, model_name
531 await JujuModelWatcher
.wait_for(
534 progress_timeout
=progress_timeout
,
535 total_timeout
=total_timeout
,
538 vca_id
=self
.vca_connection
._vca
_id
,
540 except Exception as e
:
543 await self
.disconnect_model(model
)
544 await self
.disconnect_controller(controller
)
547 "Machine provisioned {} in model {}".format(machine_id
, model_name
)
557 timeout
: float = 3600,
558 instantiation_params
: dict = None,
561 Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
563 :param uri: Path or Charm Store uri in which the charm or bundle can be found
564 :param model_name: Model name
565 :param wait: Indicates whether to wait or not until all applications are active
566 :param timeout: Time in seconds to wait until all applications are active
567 :param instantiation_params: To be applied as overlay bundle over primary bundle.
569 controller
= await self
.get_controller()
570 model
= await self
.get_model(controller
, model_name
)
573 await self
._validate
_instantiation
_params
(uri
, model
, instantiation_params
)
574 overlays
= self
._get
_overlays
(model_name
, instantiation_params
)
575 await model
.deploy(uri
, trust
=True, overlays
=overlays
)
577 await JujuModelWatcher
.wait_for_model(model
, timeout
=timeout
)
578 self
.log
.debug("All units active in model {}".format(model_name
))
580 self
._remove
_overlay
_file
(overlays
)
581 await self
.disconnect_model(model
)
582 await self
.disconnect_controller(controller
)
584 async def _validate_instantiation_params(
585 self
, uri
: str, model
, instantiation_params
: dict
587 """Checks if all the applications in instantiation_params
588 exist in the original bundle.
591 JujuApplicationNotFound if there is an invalid app in
592 the instantiation params.
594 overlay_apps
= self
._get
_apps
_in
_instantiation
_params
(instantiation_params
)
597 original_apps
= await self
._get
_apps
_in
_original
_bundle
(uri
, model
)
598 if not all(app
in original_apps
for app
in overlay_apps
):
599 raise JujuApplicationNotFound(
600 "Cannot find application {} in original bundle {}".format(
601 overlay_apps
, original_apps
605 async def _get_apps_in_original_bundle(self
, uri
: str, model
) -> set:
606 """Bundle is downloaded in BundleHandler.fetch_plan.
607 That method takes care of opening and exception handling.
609 Resolve method gets all the information regarding the channel,
610 track, revision, type, source.
613 Set with the names of the applications in original bundle.
616 architecture
= DEFAULT_ARCHITECTURE
# only AMD64 is allowed
617 res
= await model
.deploy_types
[str(url
.schema
)].resolve(
618 url
, architecture
, entity_url
=uri
620 handler
= BundleHandler(model
, trusted
=True, forced
=False)
621 await handler
.fetch_plan(url
, res
.origin
)
622 return handler
.applications
624 def _get_apps_in_instantiation_params(self
, instantiation_params
: dict) -> list:
625 """Extract applications key in instantiation params.
628 List with the names of the applications in instantiation params.
631 JujuError if applications key is not found.
633 if not instantiation_params
:
636 return [key
for key
in instantiation_params
.get("applications")]
637 except Exception as e
:
638 raise JujuError("Invalid overlay format. {}".format(str(e
)))
640 def _get_overlays(self
, model_name
: str, instantiation_params
: dict) -> list:
641 """Creates a temporary overlay file which includes the instantiation params.
642 Only one overlay file is created.
645 List with one overlay filename. Empty list if there are no instantiation params.
647 if not instantiation_params
:
649 file_name
= model_name
+ "-overlay.yaml"
650 self
._write
_overlay
_file
(file_name
, instantiation_params
)
def _write_overlay_file(self, file_name: str, instantiation_params: dict) -> None:
    """Dump the instantiation params as YAML into `file_name`.

    The file is opened in "w" mode, so existing content is replaced.
    """
    with open(file_name, "w") as overlay_stream:
        yaml.dump(instantiation_params, stream=overlay_stream)
657 def _remove_overlay_file(self
, overlay
: list) -> None:
658 """Overlay contains either one or zero file names."""
662 filename
= overlay
[0]
666 "Overlay file {} could not be removed: {}".format(filename
, e
)
671 application_name
: str,
674 db_dict
: dict = None,
675 progress_timeout
: float = None,
676 total_timeout
: float = None,
680 :param: application_name: Application name
681 :param: model_name: Model name
682 :param: machine_id Machine id
683 :param: db_dict: Dictionary with data of the DB to write the updates
684 :param: progress_timeout: Maximum time between two updates in the model
685 :param: total_timeout: Timeout for the entity to be active
691 controller
= await self
.get_controller()
693 model
= await self
.get_model(controller
, model_name
)
694 application
= self
._get
_application
(model
, application_name
)
696 if application
is not None:
697 # Checks if the given machine id in the model,
698 # otherwise function raises an error
699 _machine
, _series
= self
._get
_machine
_info
(model
, machine_id
)
702 "Adding unit (machine {}) to application {} in model ~{}".format(
703 machine_id
, application_name
, model_name
707 await application
.add_unit(to
=machine_id
)
709 await JujuModelWatcher
.wait_for(
712 progress_timeout
=progress_timeout
,
713 total_timeout
=total_timeout
,
716 vca_id
=self
.vca_connection
._vca
_id
,
719 "Unit is added to application {} in model {}".format(
720 application_name
, model_name
724 raise JujuApplicationNotFound(
725 "Application {} not exists".format(application_name
)
729 await self
.disconnect_model(model
)
730 await self
.disconnect_controller(controller
)
732 async def destroy_unit(
734 application_name
: str,
737 total_timeout
: float = None,
741 :param: application_name: Application name
742 :param: model_name: Model name
743 :param: machine_id Machine id
744 :param: total_timeout: Timeout for the entity to be active
750 controller
= await self
.get_controller()
752 model
= await self
.get_model(controller
, model_name
)
753 application
= self
._get
_application
(model
, application_name
)
755 if application
is None:
756 raise JujuApplicationNotFound(
757 "Application not found: {} (model={})".format(
758 application_name
, model_name
762 unit
= self
._get
_unit
(application
, machine_id
)
765 "A unit with machine id {} not in available units".format(
770 unit_name
= unit
.name
773 "Destroying unit {} from application {} in model {}".format(
774 unit_name
, application_name
, model_name
777 await application
.destroy_unit(unit_name
)
780 "Waiting for unit {} to be destroyed in application {} (model={})...".format(
781 unit_name
, application_name
, model_name
785 # TODO: Add functionality in the Juju watcher to replace this kind of blocks
786 if total_timeout
is None:
788 end
= time
.time() + total_timeout
789 while time
.time() < end
:
790 if not self
._get
_unit
(application
, machine_id
):
792 "The unit {} was destroyed in application {} (model={}) ".format(
793 unit_name
, application_name
, model_name
797 await asyncio
.sleep(5)
799 "Unit {} is destroyed from application {} in model {}".format(
800 unit_name
, application_name
, model_name
805 await self
.disconnect_model(model
)
806 await self
.disconnect_controller(controller
)
808 async def deploy_charm(
810 application_name
: str,
814 db_dict
: dict = None,
815 progress_timeout
: float = None,
816 total_timeout
: float = None,
823 :param: application_name: Application name
824 :param: path: Local path to the charm
825 :param: model_name: Model name
826 :param: machine_id ID of the machine
827 :param: db_dict: Dictionary with data of the DB to write the updates
828 :param: progress_timeout: Maximum time between two updates in the model
829 :param: total_timeout: Timeout for the entity to be active
830 :param: config: Config for the charm
831 :param: series: Series of the charm
832 :param: num_units: Number of units
834 :return: (juju.application.Application): Juju application
837 "Deploying charm {} to machine {} in model ~{}".format(
838 application_name
, machine_id
, model_name
841 self
.log
.debug("charm: {}".format(path
))
844 controller
= await self
.get_controller()
847 model
= await self
.get_model(controller
, model_name
)
850 if application_name
not in model
.applications
:
851 if machine_id
is not None:
852 machine
, series
= self
._get
_machine
_info
(model
, machine_id
)
854 application
= await model
.deploy(
856 application_name
=application_name
,
865 "Wait until application {} is ready in model {}".format(
866 application_name
, model_name
870 for _
in range(num_units
- 1):
871 m
, _
= await self
.create_machine(model_name
, wait
=False)
872 await application
.add_unit(to
=m
.entity_id
)
874 await JujuModelWatcher
.wait_for(
877 progress_timeout
=progress_timeout
,
878 total_timeout
=total_timeout
,
881 vca_id
=self
.vca_connection
._vca
_id
,
884 "Application {} is ready in model {}".format(
885 application_name
, model_name
889 raise JujuApplicationExists(
890 "Application {} exists".format(application_name
)
892 except juju
.errors
.JujuError
as e
:
893 if "already exists" in e
.message
:
894 raise JujuApplicationExists(
895 "Application {} exists".format(application_name
)
900 await self
.disconnect_model(model
)
901 await self
.disconnect_controller(controller
)
905 async def upgrade_charm(
907 application_name
: str,
910 total_timeout
: float = None,
915 :param: application_name: Application name
916 :param: model_name: Model name
917 :param: path: Local path to the charm
918 :param: total_timeout: Timeout for the entity to be active
920 :return: (str, str): (output and status)
924 "Upgrading charm {} in model {} from path {}".format(
925 application_name
, model_name
, path
929 await self
.resolve_application(
930 model_name
=model_name
, application_name
=application_name
934 controller
= await self
.get_controller()
937 model
= await self
.get_model(controller
, model_name
)
941 application
= self
._get
_application
(
943 application_name
=application_name
,
945 if application
is None:
946 raise JujuApplicationNotFound(
947 "Cannot find application {} to upgrade".format(application_name
)
950 await application
.refresh(path
=path
)
953 "Wait until charm upgrade is completed for application {} (model={})".format(
954 application_name
, model_name
958 await JujuModelWatcher
.ensure_units_idle(
959 model
=model
, application
=application
962 if application
.status
== "error":
963 error_message
= "Unknown"
964 for unit
in application
.units
:
966 unit
.workload_status
== "error"
967 and unit
.workload_status_message
!= ""
969 error_message
= unit
.workload_status_message
971 message
= "Application {} failed update in {}: {}".format(
972 application_name
, model_name
, error_message
974 self
.log
.error(message
)
975 raise JujuError(message
=message
)
978 "Application {} is ready in model {}".format(
979 application_name
, model_name
984 await self
.disconnect_model(model
)
985 await self
.disconnect_controller(controller
)
989 async def resolve_application(self
, model_name
: str, application_name
: str):
990 controller
= await self
.get_controller()
991 model
= await self
.get_model(controller
, model_name
)
994 application
= self
._get
_application
(
996 application_name
=application_name
,
998 if application
is None:
999 raise JujuApplicationNotFound(
1000 "Cannot find application {} to resolve".format(application_name
)
1003 while application
.status
== "error":
1004 for unit
in application
.units
:
1005 if unit
.workload_status
== "error":
1007 "Model {}, Application {}, Unit {} in error state, resolving".format(
1008 model_name
, application_name
, unit
.entity_id
1012 await unit
.resolved(retry
=False)
1016 await asyncio
.sleep(1)
1019 await self
.disconnect_model(model
)
1020 await self
.disconnect_controller(controller
)
1022 async def resolve(self
, model_name
: str):
1023 controller
= await self
.get_controller()
1024 model
= await self
.get_model(controller
, model_name
)
1025 all_units_active
= False
1027 while not all_units_active
:
1028 all_units_active
= True
1029 for application_name
, application
in model
.applications
.items():
1030 if application
.status
== "error":
1031 for unit
in application
.units
:
1032 if unit
.workload_status
== "error":
1034 "Model {}, Application {}, Unit {} in error state, resolving".format(
1035 model_name
, application_name
, unit
.entity_id
1039 await unit
.resolved(retry
=False)
1040 all_units_active
= False
1044 if not all_units_active
:
1045 await asyncio
.sleep(5)
1047 await self
.disconnect_model(model
)
1048 await self
.disconnect_controller(controller
)
1050 async def scale_application(
1053 application_name
: str,
1055 total_timeout
: float = None,
1058 Scale application (K8s)
1060 :param: model_name: Model name
1061 :param: application_name: Application name
1062 :param: scale: Scale to which to set this application
1063 :param: total_timeout: Timeout for the entity to be active
1067 controller
= await self
.get_controller()
1069 model
= await self
.get_model(controller
, model_name
)
1072 "Scaling application {} in model {}".format(
1073 application_name
, model_name
1076 application
= self
._get
_application
(model
, application_name
)
1077 if application
is None:
1078 raise JujuApplicationNotFound("Cannot scale application")
1079 await application
.scale(scale
=scale
)
1080 # Wait until application is scaled in model
1082 "Waiting for application {} to be scaled in model {}...".format(
1083 application_name
, model_name
1086 if total_timeout
is None:
1087 total_timeout
= 1800
1088 end
= time
.time() + total_timeout
1089 while time
.time() < end
:
1090 application_scale
= self
._get
_application
_count
(model
, application_name
)
1091 # Before calling wait_for_model function,
1092 # wait until application unit count and scale count are equal.
1093 # Because there is a delay before scaling triggers in Juju model.
1094 if application_scale
== scale
:
1095 await JujuModelWatcher
.wait_for_model(
1096 model
=model
, timeout
=total_timeout
1099 "Application {} is scaled in model {}".format(
1100 application_name
, model_name
1104 await asyncio
.sleep(5)
1106 "Timeout waiting for application {} in model {} to be scaled".format(
1107 application_name
, model_name
1112 await self
.disconnect_model(model
)
1113 await self
.disconnect_controller(controller
)
1115 def _get_application_count(self
, model
: Model
, application_name
: str) -> int:
1116 """Get number of units of the application
1118 :param: model: Model object
1119 :param: application_name: Application name
1121 :return: int (or None if application doesn't exist)
1123 application
= self
._get
_application
(model
, application_name
)
1124 if application
is not None:
1125 return len(application
.units
)
1127 def _get_application(self
, model
: Model
, application_name
: str) -> Application
:
1130 :param: model: Model object
1131 :param: application_name: Application name
1133 :return: juju.application.Application (or None if it doesn't exist)
1135 if model
.applications
and application_name
in model
.applications
:
1136 return model
.applications
[application_name
]
1138 def _get_unit(self
, application
: Application
, machine_id
: str) -> Unit
:
1141 :param: application: Application object
1142 :param: machine_id: Machine id
1147 for u
in application
.units
:
1148 if u
.machine_id
== machine_id
:
1153 def _get_machine_info(
1160 :param: model: Model object
1161 :param: machine_id: Machine id
1163 :return: (str, str): (machine, series)
1165 if machine_id
not in model
.machines
:
1166 msg
= "Machine {} not found in model".format(machine_id
)
1167 self
.log
.error(msg
=msg
)
1168 raise JujuMachineNotFound(msg
)
1169 machine
= model
.machines
[machine_id
]
1170 return machine
, machine
.series
1172 async def execute_action(
1174 application_name
: str,
1177 db_dict
: dict = None,
1178 machine_id
: str = None,
1179 progress_timeout
: float = None,
1180 total_timeout
: float = None,
1185 :param: application_name: Application name
1186 :param: model_name: Model name
1187 :param: action_name: Name of the action
1188 :param: db_dict: Dictionary with data of the DB to write the updates
1189 :param: machine_id Machine id
1190 :param: progress_timeout: Maximum time between two updates in the model
1191 :param: total_timeout: Timeout for the entity to be active
1193 :return: (str, str): (output and status)
1196 "Executing action {} using params {}".format(action_name
, kwargs
)
1199 controller
= await self
.get_controller()
1202 model
= await self
.get_model(controller
, model_name
)
1206 application
= self
._get
_application
(
1208 application_name
=application_name
,
1210 if application
is None:
1211 raise JujuApplicationNotFound("Cannot execute action")
1213 # Occasionally, self._get_leader_unit() will return None
1214 # because the leader elected hook has not been triggered yet.
1215 # Therefore, we are doing some retries. If it happens again,
1217 if machine_id
is None:
1218 unit
= await self
._get
_leader
_unit
(application
)
1220 "Action {} is being executed on the leader unit {}".format(
1221 action_name
, unit
.name
1225 unit
= self
._get
_unit
(application
, machine_id
)
1228 "A unit with machine id {} not in available units".format(
1233 "Action {} is being executed on {} unit".format(
1234 action_name
, unit
.name
1238 actions
= await application
.get_actions()
1240 if action_name
not in actions
:
1241 raise JujuActionNotFound(
1242 "Action {} not in available actions".format(action_name
)
1245 action
= await unit
.run_action(action_name
, **kwargs
)
1248 "Wait until action {} is completed in application {} (model={})".format(
1249 action_name
, application_name
, model_name
1252 await JujuModelWatcher
.wait_for(
1255 progress_timeout
=progress_timeout
,
1256 total_timeout
=total_timeout
,
1259 vca_id
=self
.vca_connection
._vca
_id
,
1262 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
1263 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
1265 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
1269 "Action {} completed with status {} in application {} (model={})".format(
1270 action_name
, action
.status
, application_name
, model_name
1274 await self
.disconnect_model(model
)
1275 await self
.disconnect_controller(controller
)
1277 return output
, status
1279 async def get_actions(self
, application_name
: str, model_name
: str) -> dict:
1280 """Get list of actions
1282 :param: application_name: Application name
1283 :param: model_name: Model name
1285 :return: Dict with this format
1287 "action_name": "Description of the action",
1292 "Getting list of actions for application {}".format(application_name
)
1296 controller
= await self
.get_controller()
1299 model
= await self
.get_model(controller
, model_name
)
1303 application
= self
._get
_application
(
1305 application_name
=application_name
,
1308 # Return list of actions
1309 return await application
.get_actions()
1312 # Disconnect from model and controller
1313 await self
.disconnect_model(model
)
1314 await self
.disconnect_controller(controller
)
async def get_metrics(self, model_name: str, application_name: str) -> dict:
    """Get the metrics collected by the VCA.

    :param model_name The name or unique id of the network service
    :param application_name The name of the application

    :return: Result of the application's `get_metrics()`; an empty dict when
             the application is not found in the model
    :raises: Exception if `model_name` or `application_name` is empty
    """
    if not model_name or not application_name:
        raise Exception("model_name and application_name must be non-empty strings")
    metrics = {}
    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)
        if application is not None:
            metrics = await application.get_metrics()
    finally:
        # Both disconnect helpers are coroutines: they must be awaited,
        # otherwise they never run and the connections stay open.
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return metrics
async def add_relation(
    self,
    model_name: str,
    endpoint_1: str,
    endpoint_2: str,
):
    """Relate two endpoints inside a model

    "Relation not found" and "relation already exists" API errors are
    logged as warnings and swallowed; any other API error propagates.

    :param: model_name:     Model name
    :param: endpoint_1      First endpoint name
                            ("app:endpoint" format or directly the saas name)
    :param: endpoint_2:     Second endpoint name (^ same format)
    """
    self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))

    controller = await self.get_controller()
    model = await self.get_model(controller, model_name)

    try:
        await model.add_relation(endpoint_1, endpoint_2)
    except juju.errors.JujuAPIError as e:
        if self._relation_is_not_found(e):
            self.log.warning("Relation not found: {}".format(e.message))
        elif self._relation_already_exist(e):
            self.log.warning("Relation already exists: {}".format(e.message))
        else:
            # another exception, raise it
            raise
    finally:
        await self.disconnect_model(model)
        await self.disconnect_controller(controller)
def _relation_is_not_found(self, juju_error):
    """Tell whether a Juju API error reports a missing relation

    :param: juju_error: Error object exposing ``message`` and ``error_code``

    :return: Truthy when "not found" appears in the message or, failing
             that, in a non-empty error code
    """
    needle = "not found"
    if needle in juju_error.message:
        return True
    code = juju_error.error_code
    # An empty/None error code is returned as-is (falsy), like the original
    # short-circuit expression did.
    return code and needle in code
def _relation_already_exist(self, juju_error):
    """Tell whether a Juju API error reports an already existing relation

    :param: juju_error: Error object exposing ``message`` and ``error_code``

    :return: Truthy when "already exists" appears in the message or, failing
             that, in a non-empty error code
    """
    needle = "already exists"
    if needle in juju_error.message:
        return True
    code = juju_error.error_code
    # An empty/None error code is returned as-is (falsy), like the original
    # short-circuit expression did.
    return code and needle in code
async def offer(self, endpoint: RelationEndpoint) -> Offer:
    """
    Create an offer from a RelationEndpoint

    The offer is named "<application_name>-<endpoint_name>".

    :param: endpoint: Relation endpoint

    :return: Offer object. When Juju reports that the application offer
             already exists, the error is swallowed and None is returned.
    :raises Exception: if the offer cannot be listed after being created
    :raises JujuError: for any Juju error other than "already exists"
    """
    model_name = endpoint.model_name
    offer_name = f"{endpoint.application_name}-{endpoint.endpoint_name}"
    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        await model.create_offer(endpoint.endpoint, offer_name=offer_name)
        offer_list = await self._list_offers(model_name, offer_name=offer_name)
        if offer_list:
            return Offer(offer_list[0].offer_url)
        raise Exception("offer was not created")
    except juju.errors.JujuError as e:
        if "application offer already exists" not in e.message:
            raise
    finally:
        # The disconnect helpers are awaited everywhere else in this class;
        # without "await" the coroutines are created but never executed and
        # the connections are left open.
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
async def consume(
    self,
    model_name: str,
    offer: Offer,
    provider_libjuju: "Libjuju",
) -> str:
    """
    Consumes a remote offer in the model. Relations can be created later using "juju relate".

    The saas name is "<offer name>-<offer model name without dashes>",
    suffixed with "-<vca_id>" when the offer carries a VCA id.

    :param: model_name:             Model name
    :param: offer:                  Offer object to consume
    :param: provider_libjuju:       Libjuju object of the provider endpoint

    :raises ParseError if there's a problem parsing the offer_url
    :raises JujuError if remote offer includes and endpoint
    :raises JujuAPIError if the operation is not successful

    :returns: Saas name. It is the application name in the model that reference the remote application.
    """
    saas_name = f'{offer.name}-{offer.model_name.replace("-", "")}'
    if offer.vca_id:
        saas_name = f"{saas_name}-{offer.vca_id}"

    controller = await self.get_controller()
    # Pre-set to None so the cleanup can tell which connections were opened
    model = provider_controller = None
    try:
        model = await controller.get_model(model_name)
        provider_controller = await provider_libjuju.get_controller()
        await model.consume(
            offer.url,
            application_alias=saas_name,
            controller=provider_controller,
        )
        return saas_name
    finally:
        if model:
            await self.disconnect_model(model)
        if provider_controller:
            await provider_libjuju.disconnect_controller(provider_controller)
        await self.disconnect_controller(controller)
async def destroy_model(self, model_name: str, total_timeout: float = 1800):
    """
    Destroy model

    Does nothing (besides logging a warning) when the model does not exist.
    Before the model is destroyed, manually provisioned machines that are
    still pending are destroyed.

    :param: model_name:     Model name
    :param: total_timeout:  Timeout, passed to asyncio.wait_for for the
                            model destruction step

    :raises Exception: re-raises the failure if the model still exists
                       after a failed destruction attempt
    """
    controller = await self.get_controller()
    model = None
    try:
        if not await self.model_exists(model_name, controller=controller):
            # Logger.warn is a deprecated alias; use Logger.warning like the
            # rest of this class does.
            self.log.warning(f"Model {model_name} doesn't exist")
            return

        self.log.debug(f"Getting model {model_name} to be destroyed")
        model = await self.get_model(controller, model_name)
        self.log.debug(f"Destroying manual machines in model {model_name}")
        # Destroy machines that are manually provisioned
        # and still are in pending state
        await self._destroy_pending_machines(model, only_manual=True)
        await self.disconnect_model(model)

        await asyncio.wait_for(
            self._destroy_model(model_name, controller),
            timeout=total_timeout,
        )
    except Exception as e:
        # A failure is harmless when the model is gone anyway
        if not await self.model_exists(model_name, controller=controller):
            self.log.warning(
                f"Failed deleting model {model_name}: model doesn't exist"
            )
            return
        self.log.warning(f"Failed deleting model {model_name}: {e}")
        raise
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
async def _destroy_model(
    self,
    model_name: str,
    controller: Controller,
):
    """
    Destroy model from controller

    A graceful destruction is attempted for up to 120 seconds; when that
    times out, the model is destroyed with force. Juju errors stating that
    the model "has been removed" or "model not found" are ignored.

    :param: model_name: Model name to be removed
    :param: controller: Controller object
    """
    self.log.debug(f"Destroying model {model_name}")

    async def graceful():
        self.log.info(f"Gracefully deleting model {model_name}")
        units_resolved = False
        while model_name in await controller.list_models():
            # Resolve only once, on the first pass
            if not units_resolved:
                await self.resolve(model_name)
                units_resolved = True
            await controller.destroy_model(model_name, destroy_storage=True)

            await asyncio.sleep(5)
        self.log.info(f"Model {model_name} deleted gracefully")

    async def forceful():
        self.log.info(f"Forcefully deleting model {model_name}")
        while model_name in await controller.list_models():
            await controller.destroy_model(
                model_name, destroy_storage=True, force=True, max_wait=60
            )
            await asyncio.sleep(5)
        self.log.info(f"Model {model_name} deleted forcefully")

    ignorable = ("has been removed", "model not found")
    try:
        # The inner try stays nested so that Juju errors raised by the
        # forceful fallback are filtered by the outer handler too.
        try:
            await asyncio.wait_for(graceful(), timeout=120)
        except asyncio.TimeoutError:
            await forceful()
    except juju.errors.JujuError as e:
        if any(marker in error for error in e.errors for marker in ignorable):
            return
        raise e
async def destroy_application(
    self, model_name: str, application_name: str, total_timeout: float
):
    """
    Destroy application and wait until it is gone from the model

    :param: model_name:         Model name
    :param: application_name:   Application name
    :param: total_timeout:      Timeout in seconds (3600 when None)

    :raises Exception: if the application is still present after the timeout
    """
    controller = await self.get_controller()
    model = None

    try:
        model = await self.get_model(controller, model_name)
        self.log.debug(
            "Destroying application {} in model {}".format(
                application_name, model_name
            )
        )
        application = self._get_application(model, application_name)
        if not application:
            self.log.warning("Application not found: {}".format(application_name))
        else:
            await application.destroy()

        self.log.debug(
            "Waiting for application {} to be destroyed in model {}...".format(
                application_name, model_name
            )
        )
        if total_timeout is None:
            total_timeout = 3600
        deadline = time.time() + total_timeout
        # Poll every 5 seconds until the application lookup comes back empty
        while time.time() < deadline:
            if self._get_application(model, application_name):
                await asyncio.sleep(5)
                continue
            self.log.debug(
                "The application {} was destroyed in model {} ".format(
                    application_name, model_name
                )
            )
            return
        raise Exception(
            "Timeout waiting for application {} to be destroyed in model {}".format(
                application_name, model_name
            )
        )
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
    """
    Destroy pending machines in a given model

    :param: model:          Model whose status is inspected and whose
                            machines are destroyed
    :param: only_manual:    Bool that indicates only manually provisioned
                            machines should be destroyed (if True), or that
                            all pending machines should be destroyed
    """
    status = await model.get_status()
    for machine_id, machine_status in status.machines.items():
        if machine_status.agent_status.status != "pending":
            continue
        if only_manual and not machine_status.instance_id.startswith("manual:"):
            # Skip just this machine: stopping the scan here would leave
            # later manually provisioned pending machines undestroyed.
            continue
        machine = model.machines[machine_id]
        await machine.destroy(force=True)
async def configure_application(
    self, model_name: str, application_name: str, config: dict = None
):
    """Apply a configuration to an application

    Nothing is sent to Juju when config is empty or None.

    :param: model_name:         Model name
    :param: application_name:   Application name
    :param: config:             Config to apply to the charm
    """
    self.log.debug("Configuring application {}".format(application_name))

    if not config:
        return

    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        target = self._get_application(model, application_name=application_name)
        await target.set_config(config)
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
async def health_check(self, interval: float = 300.0):
    """
    Health check to make sure controller and controller_model connections are OK

    Runs forever: each round connects to the controller, logs an error if
    that fails, disconnects and sleeps.

    :param: interval: Time in seconds between checks
    """
    while True:
        # Reset on every round: otherwise a failed check would hand the
        # previous round's (already disconnected) controller to
        # disconnect_controller again. A failed first round already passed
        # None there, so None is not a new input for that helper.
        controller = None
        try:
            controller = await self.get_controller()
            # self.log.debug("VCA is alive")
        except Exception as e:
            self.log.error("Health check to VCA failed: {}".format(e))
        finally:
            await self.disconnect_controller(controller)
            await asyncio.sleep(interval)
async def list_models(self, contains: str = None) -> [str]:
    """List models, optionally filtered by a substring of their name

    :param: contains:   String that is contained in model name

    :return: [models] Returns list of model names
    """
    controller = await self.get_controller()
    try:
        names = await controller.list_models()
        if not contains:
            return names
        return [name for name in names if contains in name]
    finally:
        await self.disconnect_controller(controller)
async def _list_offers(
    self, model_name: str, offer_name: str = None
) -> QueryApplicationOffersResults:
    """
    List offers within a model

    :param: model_name: Model name
    :param: offer_name: Offer name to filter.

    :return: Returns application offers results in the model. When
             offer_name is given, a list holding at most the first offer
             with that name.
    """
    controller = await self.get_controller()
    try:
        offers = (await controller.list_offers(model_name)).results
        if not offer_name:
            return offers
        # Only the first match is kept
        return [found for found in offers if found.offer_name == offer_name][:1]
    finally:
        await self.disconnect_controller(controller)
async def add_k8s(
    self,
    name: str,
    rbac_id: str,
    token: str,
    client_cert_data: str,
    configuration: Configuration,
    storage_class: str,
    credential_name: str = None,
):
    """
    Add a Kubernetes cloud to the controller

    Similar to the `juju add-k8s` command in the CLI

    :param: name:               Name for the K8s cloud
    :param: rbac_id:            Value stored in the credential attributes
                                under the RBAC label key
    :param: token:              Token handed to the credential builder
    :param: client_cert_data:   Certificate data, used both for the
                                credential and as the cloud CA certificate
    :param: configuration:      Kubernetes configuration object
    :param: storage_class:      Storage Class to use in the cloud
    :param: credential_name:    Name to register the credential under
                                (forwarded to add_cloud)
    """
    # Validation order matters: callers see the first failing message
    required = (
        (storage_class, "storage_class must be a non-empty string"),
        (name, "name must be a non-empty string"),
        (configuration, "configuration must be provided"),
    )
    for value, message in required:
        if not value:
            raise Exception(message)

    endpoint = configuration.host
    credential = self.get_k8s_cloud_credential(
        configuration,
        client_cert_data,
        token,
    )
    credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id
    storage_config = {
        "operator-storage": storage_class,
        "workload-storage": storage_class,
    }
    cloud = client.Cloud(
        type_="kubernetes",
        auth_types=[credential.auth_type],
        endpoint=endpoint,
        ca_certificates=[client_cert_data],
        config=storage_config,
    )

    return await self.add_cloud(
        name, cloud, credential, credential_name=credential_name
    )
def get_k8s_cloud_credential(
    self,
    configuration: Configuration,
    client_cert_data: str,
    token: str = None,
) -> client.CloudCredential:
    """Build the Juju cloud credential for a Kubernetes cluster

    The auth type is derived from which of key, username/password,
    certificate and token are available.

    :param: configuration:      Kubernetes configuration object; its
                                username and password are read
    :param: client_cert_data:   Client certificate data
    :param: token:              Token

    :raises JujuInvalidK8sConfiguration: if a token is combined with
            user/pass, a required token is missing, or no supported
            authentication method can be derived

    :return: CloudCredential with the selected auth type and attributes
    """
    # TODO: Test with AKS
    key = None  # open(configuration.key_file, "r").read()
    username = configuration.username
    password = configuration.password

    if token and (username or password):
        raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")

    attrs = {}
    if client_cert_data:
        attrs["ClientCertificateData"] = client_cert_data
    if key:
        attrs["ClientKeyData"] = key
    if token:
        attrs["Token"] = token

    auth_type = None
    if key:
        auth_type = "oauth2withcert" if client_cert_data else "oauth2"
        if not token:
            raise JujuInvalidK8sConfiguration(
                "missing token for auth type {}".format(auth_type)
            )
    elif username:
        if not password:
            self.log.debug(
                "credential for user {} has empty password".format(username)
            )
        attrs["username"] = username
        attrs["password"] = password
        auth_type = "userpasswithcert" if client_cert_data else "userpass"
    elif client_cert_data and token:
        auth_type = "certificate"
    else:
        raise JujuInvalidK8sConfiguration("authentication method not supported")
    return client.CloudCredential(auth_type=auth_type, attrs=attrs)
async def add_cloud(
    self,
    name: str,
    cloud: Cloud,
    credential: CloudCredential = None,
    credential_name: str = None,
) -> Cloud:
    """
    Add cloud to the controller

    :param: name:               Name of the cloud to be added
    :param: cloud:              Cloud object
    :param: credential:         CloudCredentials object for the cloud
    :param: credential_name:    Credential name.
                                If not defined, cloud of the name will be used.

    :return: The cloud object received as argument
    """
    controller = await self.get_controller()
    try:
        await controller.add_cloud(name, cloud)
        if credential:
            registered_as = credential_name or name
            await controller.add_credential(
                registered_as, credential=credential, cloud=name
            )
        # Need to return the object returned by the controller.add_cloud() function
        # I'm returning the original value now until this bug is fixed:
        #   https://github.com/juju/python-libjuju/issues/443
        return cloud
    finally:
        await self.disconnect_controller(controller)
async def remove_cloud(self, name: str):
    """
    Remove cloud from the controller

    A Juju error whose only message is 'cloud "<name>" not found' is logged
    as a warning and ignored; any other Juju error propagates.

    :param: name:   Name of the cloud to be removed
    """
    controller = await self.get_controller()
    try:
        await controller.remove_cloud(name)
    except juju.errors.JujuError as e:
        missing = len(e.errors) == 1 and f'cloud "{name}" not found' == e.errors[0]
        if not missing:
            raise e
        self.log.warning(f"Cloud {name} not found, so it could not be deleted.")
    finally:
        await self.disconnect_controller(controller)
@retry(attempts=20, delay=5, fallback=JujuLeaderUnitNotFound())
async def _get_leader_unit(self, application: Application) -> Unit:
    """Return the first unit of the application that reports being leader

    :param: application: Application whose units are inspected

    :raises Exception: if no unit reports being the leader; the retry
                       decorator above is configured with 20 attempts, a
                       delay of 5 and a JujuLeaderUnitNotFound fallback

    :return: The leader unit
    """
    for candidate in application.units:
        if await candidate.is_leader_from_status():
            leader = candidate
            break
    else:
        leader = None
    if not leader:
        raise Exception()
    return leader
async def get_cloud_credentials(self, cloud: Cloud) -> typing.List:
    """
    Get cloud credentials

    The credential is looked up by a tag built from the cloud name, the
    user of this VCA connection and the cloud's credential name.

    :param: cloud: Cloud object. The returned credentials will be from this cloud.

    :return: List of credentials object associated to the specified cloud
    """
    controller = await self.get_controller()
    try:
        user = self.vca_connection.data.user
        credential_tag = tag.credential(cloud.name, user, cloud.credential_name)
        cloud_facade = client.CloudFacade.from_connection(controller.connection())
        response = await cloud_facade.Credential([client.Entity(credential_tag)])
        return response.results
    finally:
        await self.disconnect_controller(controller)
async def check_application_exists(self, model_name, application_name) -> bool:
    """Check whether an application exists in a model

    :param: model_name: Model Name
    :param: application_name: Application Name

    :return: Boolean
    """
    model = None
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        self.log.debug(
            "Checking if application {} exists in model {}".format(
                application_name, model_name
            )
        )
        found = self._get_application(model, application_name)
        return found is not None
    finally:
        # The model is only disconnected if it was actually opened
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)