1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
24 from juju
.bundle
import BundleHandler
25 from juju
.model
import Model
26 from juju
.machine
import Machine
27 from juju
.application
import Application
28 from juju
.unit
import Unit
29 from juju
.url
import URL
30 from juju
.version
import DEFAULT_ARCHITECTURE
31 from juju
.client
._definitions
import (
33 QueryApplicationOffersResults
,
37 from juju
.controller
import Controller
38 from juju
.client
import client
41 from n2vc
.definitions
import Offer
, RelationEndpoint
42 from n2vc
.juju_watcher
import JujuModelWatcher
43 from n2vc
.provisioner
import AsyncSSHProvisioner
44 from n2vc
.n2vc_conn
import N2VCConnector
45 from n2vc
.exceptions
import (
47 JujuApplicationNotFound
,
48 JujuLeaderUnitNotFound
,
50 JujuControllerFailedConnecting
,
51 JujuApplicationExists
,
52 JujuInvalidK8sConfiguration
,
55 from n2vc
.vca
.cloud
import Cloud
as VcaCloud
56 from n2vc
.vca
.connection
import Connection
57 from kubernetes
.client
.configuration
import Configuration
58 from retrying_async
import retry
61 RBAC_LABEL_KEY_NAME
= "rbac-id"
67 vca_connection
: Connection
,
68 loop
: asyncio
.AbstractEventLoop
= None,
69 log
: logging
.Logger
= None,
70 n2vc
: N2VCConnector
= None,
75 :param: vca_connection: n2vc.vca.connection object
76 :param: loop: Asyncio loop
78 :param: n2vc: N2VC object
81 self
.log
= log
or logging
.getLogger("Libjuju")
83 self
.vca_connection
= vca_connection
85 self
.loop
= loop
or asyncio
.get_event_loop()
86 self
.loop
.set_exception_handler(self
.handle_exception
)
87 self
.creating_model
= asyncio
.Lock(loop
=self
.loop
)
89 if self
.vca_connection
.is_default
:
90 self
.health_check_task
= self
._create
_health
_check
_task
()
def _create_health_check_task(self):
    """Wrap ``self.health_check()`` in a task created on ``self.loop``.

    :return: Whatever ``self.loop.create_task`` returns for the health check
    """
    schedule = self.loop.create_task
    return schedule(self.health_check())
95 async def get_controller(self
, timeout
: float = 60.0) -> Controller
:
99 :param: timeout: Time in seconds to wait for controller to connect
103 controller
= Controller()
104 await asyncio
.wait_for(
106 endpoint
=self
.vca_connection
.data
.endpoints
,
107 username
=self
.vca_connection
.data
.user
,
108 password
=self
.vca_connection
.data
.secret
,
109 cacert
=self
.vca_connection
.data
.cacert
,
113 if self
.vca_connection
.is_default
:
114 endpoints
= await controller
.api_endpoints
116 endpoint
in self
.vca_connection
.endpoints
for endpoint
in endpoints
118 await self
.vca_connection
.update_endpoints(endpoints
)
120 except asyncio
.CancelledError
as e
:
122 except Exception as e
:
124 "Failed connecting to controller: {}... {}".format(
125 self
.vca_connection
.data
.endpoints
, e
129 await self
.disconnect_controller(controller
)
131 raise JujuControllerFailedConnecting(
132 f
"Error connecting to Juju controller: {e}"
async def disconnect(self):
    """Disconnect

    Cancels the health check task when there is one and logs the
    disconnection. ``health_check_task`` is only assigned when
    ``vca_connection.is_default`` is true, so the attribute may be missing;
    in that case there is nothing to cancel.
    """
    # Cancel health check task (absent for non-default VCA connections)
    health_check_task = getattr(self, "health_check_task", None)
    if health_check_task is not None:
        health_check_task.cancel()
    self.log.debug("Libjuju disconnected!")
async def disconnect_model(self, model: Model):
    """
    Close the connection held by a model object

    :param: model: Model that will be disconnected
    """
    pending_disconnect = model.disconnect()
    await pending_disconnect
149 async def disconnect_controller(self
, controller
: Controller
):
151 Disconnect controller
153 :param: controller: Controller that will be disconnected
156 await controller
.disconnect()
158 @retry(attempts
=3, delay
=5, timeout
=None)
159 async def add_model(self
, model_name
: str, cloud
: VcaCloud
):
163 :param: model_name: Model name
164 :param: cloud: Cloud object
168 controller
= await self
.get_controller()
171 # Block until other workers have finished model creation
172 while self
.creating_model
.locked():
173 await asyncio
.sleep(0.1)
176 async with self
.creating_model
:
177 if await self
.model_exists(model_name
, controller
=controller
):
179 self
.log
.debug("Creating model {}".format(model_name
))
180 model
= await controller
.add_model(
182 config
=self
.vca_connection
.data
.model_config
,
183 cloud_name
=cloud
.name
,
184 credential_name
=cloud
.credential_name
,
186 except juju
.errors
.JujuAPIError
as e
:
187 if "already exists" in e
.message
:
193 await self
.disconnect_model(model
)
194 await self
.disconnect_controller(controller
)
196 async def get_executed_actions(self
, model_name
: str) -> list:
198 Get executed/history of actions for a model.
200 :param: model_name: Model name, str.
201 :return: List of executed actions for a model.
204 executed_actions
= []
205 controller
= await self
.get_controller()
207 model
= await self
.get_model(controller
, model_name
)
208 # Get all unique action names
210 for application
in model
.applications
:
211 application_actions
= await self
.get_actions(application
, model_name
)
212 actions
.update(application_actions
)
213 # Get status of all actions
214 for application_action
in actions
:
215 app_action_status_list
= await model
.get_action_status(
216 name
=application_action
218 for action_id
, action_status
in app_action_status_list
.items():
221 "action": application_action
,
222 "status": action_status
,
224 # Get action output by id
225 action_status
= await model
.get_action_output(executed_action
["id"])
226 for k
, v
in action_status
.items():
227 executed_action
[k
] = v
228 executed_actions
.append(executed_action
)
229 except Exception as e
:
231 "Error in getting executed actions for model: {}. Error: {}".format(
237 await self
.disconnect_model(model
)
238 await self
.disconnect_controller(controller
)
239 return executed_actions
241 async def get_application_configs(
242 self
, model_name
: str, application_name
: str
245 Get available configs for an application.
247 :param: model_name: Model name, str.
248 :param: application_name: Application name, str.
250 :return: A dict which has key - action name, value - action description
253 application_configs
= {}
254 controller
= await self
.get_controller()
256 model
= await self
.get_model(controller
, model_name
)
257 application
= self
._get
_application
(
258 model
, application_name
=application_name
260 application_configs
= await application
.get_config()
261 except Exception as e
:
263 "Error in getting configs for application: {} in model: {}. Error: {}".format(
264 application_name
, model_name
, str(e
)
269 await self
.disconnect_model(model
)
270 await self
.disconnect_controller(controller
)
271 return application_configs
@retry(attempts=3, delay=5)
async def get_model(self, controller: Controller, model_name: str) -> Model:
    """
    Ask a controller for one of its models

    :param: controller: Controller
    :param: model_name: Model name

    :return: Model: The object returned by ``controller.get_model``
    """
    model = await controller.get_model(model_name)
    return model
async def model_exists(
    self, model_name: str, controller: Controller = None
) -> bool:
    """
    Tell whether a model name is listed by the controller

    :param: model_name: Model name
    :param: controller: Controller to query. When it is not given, one is
                        obtained with get_controller() and disconnected
                        again before returning.

    :return bool
    """
    # Only a controller opened by this call is closed by this call
    opened_here = not controller
    if opened_here:
        controller = await self.get_controller()

    try:
        known_models = await controller.list_models()
        return model_name in known_models
    finally:
        if opened_here:
            await self.disconnect_controller(controller)
async def models_exist(self, model_names: [str]) -> (bool, list):
    """
    Check if models exists

    :param: model_names: List of strings with model names

    :return (bool, list[str]): (True if all models exists, List of model names
                               that don't exist, without duplicates and in the
                               order they were given)
    :raises Exception: if model_names is empty or None
    """
    if not model_names:
        raise Exception(
            "model_names must be a non-empty array. Given value: {}".format(
                model_names
            )
        )
    existing_models = set(await self.list_models())
    # dict.fromkeys removes duplicates while keeping the caller's order, so the
    # returned list is deterministic (iterating a set difference is not).
    non_existing_models = [
        name for name in dict.fromkeys(model_names) if name not in existing_models
    ]
    return (
        not non_existing_models,
        non_existing_models,
    )
async def get_model_status(self, model_name: str) -> FullStatus:
    """
    Fetch the status reported by a model

    :param: model_name: Model name

    :return: Full status object
    """
    controller = await self.get_controller()
    model = await self.get_model(controller, model_name)
    try:
        status = await model.get_status()
    finally:
        # Model first, then controller, whether or not the query succeeded
        await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return status
350 async def create_machine(
353 machine_id
: str = None,
354 db_dict
: dict = None,
355 progress_timeout
: float = None,
356 total_timeout
: float = None,
357 series
: str = "bionic",
359 ) -> (Machine
, bool):
363 :param: model_name: Model name
364 :param: machine_id: Machine id
365 :param: db_dict: Dictionary with data of the DB to write the updates
366 :param: progress_timeout: Maximum time between two updates in the model
367 :param: total_timeout: Timeout for the entity to be active
368 :param: series: Series of the machine (xenial, bionic, focal, ...)
369 :param: wait: Wait until machine is ready
371 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
372 if the machine is new or it already existed
378 "Creating machine (id={}) in model: {}".format(machine_id
, model_name
)
382 controller
= await self
.get_controller()
385 model
= await self
.get_model(controller
, model_name
)
387 if machine_id
is not None:
389 "Searching machine (id={}) in model {}".format(
390 machine_id
, model_name
394 # Get machines from model and get the machine with machine_id if exists
395 machines
= await model
.get_machines()
396 if machine_id
in machines
:
398 "Machine (id={}) found in model {}".format(
399 machine_id
, model_name
402 machine
= machines
[machine_id
]
404 raise JujuMachineNotFound("Machine {} not found".format(machine_id
))
407 self
.log
.debug("Creating a new machine in model {}".format(model_name
))
410 machine
= await model
.add_machine(
411 spec
=None, constraints
=None, disks
=None, series
=series
415 # Wait until the machine is ready
417 "Wait until machine {} is ready in model {}".format(
418 machine
.entity_id
, model_name
422 await JujuModelWatcher
.wait_for(
425 progress_timeout
=progress_timeout
,
426 total_timeout
=total_timeout
,
429 vca_id
=self
.vca_connection
._vca
_id
,
432 await self
.disconnect_model(model
)
433 await self
.disconnect_controller(controller
)
436 "Machine {} ready at {} in model {}".format(
437 machine
.entity_id
, machine
.dns_name
, model_name
442 async def provision_machine(
447 private_key_path
: str,
448 db_dict
: dict = None,
449 progress_timeout
: float = None,
450 total_timeout
: float = None,
453 Manually provisioning of a machine
455 :param: model_name: Model name
456 :param: hostname: IP to access the machine
457 :param: username: Username to login to the machine
458 :param: private_key_path: Local path for the private key
459 :param: db_dict: Dictionary with data of the DB to write the updates
460 :param: progress_timeout: Maximum time between two updates in the model
461 :param: total_timeout: Timeout for the entity to be active
463 :return: (Entity): Machine id
466 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
467 model_name
, hostname
, username
472 controller
= await self
.get_controller()
475 model
= await self
.get_model(controller
, model_name
)
479 provisioner
= AsyncSSHProvisioner(
482 private_key_path
=private_key_path
,
487 params
= await provisioner
.provision_machine()
489 params
.jobs
= ["JobHostUnits"]
491 self
.log
.debug("Adding machine to model")
492 connection
= model
.connection()
493 client_facade
= client
.ClientFacade
.from_connection(connection
)
495 results
= await client_facade
.AddMachines(params
=[params
])
496 error
= results
.machines
[0].error
499 msg
= "Error adding machine: {}".format(error
.message
)
500 self
.log
.error(msg
=msg
)
501 raise ValueError(msg
)
503 machine_id
= results
.machines
[0].machine
505 self
.log
.debug("Installing Juju agent into machine {}".format(machine_id
))
506 asyncio
.ensure_future(
507 provisioner
.install_agent(
508 connection
=connection
,
510 machine_id
=machine_id
,
511 proxy
=self
.vca_connection
.data
.api_proxy
,
512 series
=params
.series
,
518 machine_list
= await model
.get_machines()
519 if machine_id
in machine_list
:
520 self
.log
.debug("Machine {} found in model!".format(machine_id
))
521 machine
= model
.machines
.get(machine_id
)
523 await asyncio
.sleep(2)
526 msg
= "Machine {} not found in model".format(machine_id
)
527 self
.log
.error(msg
=msg
)
528 raise JujuMachineNotFound(msg
)
531 "Wait until machine {} is ready in model {}".format(
532 machine
.entity_id
, model_name
535 await JujuModelWatcher
.wait_for(
538 progress_timeout
=progress_timeout
,
539 total_timeout
=total_timeout
,
542 vca_id
=self
.vca_connection
._vca
_id
,
544 except Exception as e
:
547 await self
.disconnect_model(model
)
548 await self
.disconnect_controller(controller
)
551 "Machine provisioned {} in model {}".format(machine_id
, model_name
)
561 timeout
: float = 3600,
562 instantiation_params
: dict = None,
565 Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
567 :param uri: Path or Charm Store uri in which the charm or bundle can be found
568 :param model_name: Model name
569 :param wait: Indicates whether to wait or not until all applications are active
570 :param timeout: Time in seconds to wait until all applications are active
571 :param instantiation_params: To be applied as overlay bundle over primary bundle.
573 controller
= await self
.get_controller()
574 model
= await self
.get_model(controller
, model_name
)
577 await self
._validate
_instantiation
_params
(uri
, model
, instantiation_params
)
578 overlays
= self
._get
_overlays
(model_name
, instantiation_params
)
579 await model
.deploy(uri
, trust
=True, overlays
=overlays
)
581 await JujuModelWatcher
.wait_for_model(model
, timeout
=timeout
)
582 self
.log
.debug("All units active in model {}".format(model_name
))
584 self
._remove
_overlay
_file
(overlays
)
585 await self
.disconnect_model(model
)
586 await self
.disconnect_controller(controller
)
588 async def _validate_instantiation_params(
589 self
, uri
: str, model
, instantiation_params
: dict
591 """Checks if all the applications in instantiation_params
592 exist in the original bundle.
595 JujuApplicationNotFound if there is an invalid app in
596 the instantiation params.
598 overlay_apps
= self
._get
_apps
_in
_instantiation
_params
(instantiation_params
)
601 original_apps
= await self
._get
_apps
_in
_original
_bundle
(uri
, model
)
602 if not all(app
in original_apps
for app
in overlay_apps
):
603 raise JujuApplicationNotFound(
604 "Cannot find application {} in original bundle {}".format(
605 overlay_apps
, original_apps
609 async def _get_apps_in_original_bundle(self
, uri
: str, model
) -> set:
610 """Bundle is downloaded in BundleHandler.fetch_plan.
611 That method takes care of opening and exception handling.
613 Resolve method gets all the information regarding the channel,
614 track, revision, type, source.
617 Set with the names of the applications in original bundle.
620 architecture
= DEFAULT_ARCHITECTURE
# only AMD64 is allowed
621 res
= await model
.deploy_types
[str(url
.schema
)].resolve(
622 url
, architecture
, entity_url
=uri
624 handler
= BundleHandler(model
, trusted
=True, forced
=False)
625 await handler
.fetch_plan(url
, res
.origin
)
626 return handler
.applications
def _get_apps_in_instantiation_params(self, instantiation_params: dict) -> list:
    """List the application names declared in the instantiation params.

    Returns:
        Keys found under "applications". Empty list when there are no
        instantiation params at all.

    Raises:
        JujuError if the "applications" entry cannot be read or iterated.
    """
    if instantiation_params:
        try:
            return list(instantiation_params.get("applications"))
        except Exception as e:
            raise JujuError("Invalid overlay format. {}".format(str(e)))
    return []
def _get_overlays(self, model_name: str, instantiation_params: dict) -> list:
    """Write the instantiation params to a single overlay file.

    The file is named "<model_name>-overlay.yaml" and is written through
    self._write_overlay_file.

    Returns:
        List holding that one file name, or an empty list when there are no
        instantiation params (nothing is written in that case).
    """
    if instantiation_params:
        overlay_name = model_name + "-overlay.yaml"
        self._write_overlay_file(overlay_name, instantiation_params)
        return [overlay_name]
    return []
def _write_overlay_file(self, file_name: str, instantiation_params: dict) -> None:
    """Dump the instantiation params as YAML into file_name.

    The file is opened in "w" mode, so existing content is replaced.
    """
    with open(file_name, "w") as overlay_stream:
        yaml.dump(instantiation_params, overlay_stream)
661 def _remove_overlay_file(self
, overlay
: list) -> None:
662 """Overlay contains either one or zero file names."""
666 filename
= overlay
[0]
670 "Overlay file {} could not be removed: {}".format(filename
, e
)
675 application_name
: str,
678 db_dict
: dict = None,
679 progress_timeout
: float = None,
680 total_timeout
: float = None,
684 :param: application_name: Application name
685 :param: model_name: Model name
686 :param: machine_id Machine id
687 :param: db_dict: Dictionary with data of the DB to write the updates
688 :param: progress_timeout: Maximum time between two updates in the model
689 :param: total_timeout: Timeout for the entity to be active
695 controller
= await self
.get_controller()
697 model
= await self
.get_model(controller
, model_name
)
698 application
= self
._get
_application
(model
, application_name
)
700 if application
is not None:
701 # Checks if the given machine id in the model,
702 # otherwise function raises an error
703 _machine
, _series
= self
._get
_machine
_info
(model
, machine_id
)
706 "Adding unit (machine {}) to application {} in model ~{}".format(
707 machine_id
, application_name
, model_name
711 await application
.add_unit(to
=machine_id
)
713 await JujuModelWatcher
.wait_for(
716 progress_timeout
=progress_timeout
,
717 total_timeout
=total_timeout
,
720 vca_id
=self
.vca_connection
._vca
_id
,
723 "Unit is added to application {} in model {}".format(
724 application_name
, model_name
728 raise JujuApplicationNotFound(
729 "Application {} not exists".format(application_name
)
733 await self
.disconnect_model(model
)
734 await self
.disconnect_controller(controller
)
736 async def destroy_unit(
738 application_name
: str,
741 total_timeout
: float = None,
745 :param: application_name: Application name
746 :param: model_name: Model name
747 :param: machine_id Machine id
748 :param: total_timeout: Timeout for the entity to be active
754 controller
= await self
.get_controller()
756 model
= await self
.get_model(controller
, model_name
)
757 application
= self
._get
_application
(model
, application_name
)
759 if application
is None:
760 raise JujuApplicationNotFound(
761 "Application not found: {} (model={})".format(
762 application_name
, model_name
766 unit
= self
._get
_unit
(application
, machine_id
)
769 "A unit with machine id {} not in available units".format(
774 unit_name
= unit
.name
777 "Destroying unit {} from application {} in model {}".format(
778 unit_name
, application_name
, model_name
781 await application
.destroy_unit(unit_name
)
784 "Waiting for unit {} to be destroyed in application {} (model={})...".format(
785 unit_name
, application_name
, model_name
789 # TODO: Add functionality in the Juju watcher to replace this kind of blocks
790 if total_timeout
is None:
792 end
= time
.time() + total_timeout
793 while time
.time() < end
:
794 if not self
._get
_unit
(application
, machine_id
):
796 "The unit {} was destroyed in application {} (model={}) ".format(
797 unit_name
, application_name
, model_name
801 await asyncio
.sleep(5)
803 "Unit {} is destroyed from application {} in model {}".format(
804 unit_name
, application_name
, model_name
809 await self
.disconnect_model(model
)
810 await self
.disconnect_controller(controller
)
812 async def deploy_charm(
814 application_name
: str,
818 db_dict
: dict = None,
819 progress_timeout
: float = None,
820 total_timeout
: float = None,
827 :param: application_name: Application name
828 :param: path: Local path to the charm
829 :param: model_name: Model name
830 :param: machine_id ID of the machine
831 :param: db_dict: Dictionary with data of the DB to write the updates
832 :param: progress_timeout: Maximum time between two updates in the model
833 :param: total_timeout: Timeout for the entity to be active
834 :param: config: Config for the charm
835 :param: series: Series of the charm
836 :param: num_units: Number of units
838 :return: (juju.application.Application): Juju application
841 "Deploying charm {} to machine {} in model ~{}".format(
842 application_name
, machine_id
, model_name
845 self
.log
.debug("charm: {}".format(path
))
848 controller
= await self
.get_controller()
851 model
= await self
.get_model(controller
, model_name
)
854 if application_name
not in model
.applications
:
855 if machine_id
is not None:
856 machine
, series
= self
._get
_machine
_info
(model
, machine_id
)
858 application
= await model
.deploy(
860 application_name
=application_name
,
869 "Wait until application {} is ready in model {}".format(
870 application_name
, model_name
874 for _
in range(num_units
- 1):
875 m
, _
= await self
.create_machine(model_name
, wait
=False)
876 await application
.add_unit(to
=m
.entity_id
)
878 await JujuModelWatcher
.wait_for(
881 progress_timeout
=progress_timeout
,
882 total_timeout
=total_timeout
,
885 vca_id
=self
.vca_connection
._vca
_id
,
888 "Application {} is ready in model {}".format(
889 application_name
, model_name
893 raise JujuApplicationExists(
894 "Application {} exists".format(application_name
)
896 except juju
.errors
.JujuError
as e
:
897 if "already exists" in e
.message
:
898 raise JujuApplicationExists(
899 "Application {} exists".format(application_name
)
904 await self
.disconnect_model(model
)
905 await self
.disconnect_controller(controller
)
909 async def upgrade_charm(
911 application_name
: str,
914 total_timeout
: float = None,
919 :param: application_name: Application name
920 :param: model_name: Model name
921 :param: path: Local path to the charm
922 :param: total_timeout: Timeout for the entity to be active
924 :return: (str, str): (output and status)
928 "Upgrading charm {} in model {} from path {}".format(
929 application_name
, model_name
, path
933 await self
.resolve_application(
934 model_name
=model_name
, application_name
=application_name
938 controller
= await self
.get_controller()
941 model
= await self
.get_model(controller
, model_name
)
945 application
= self
._get
_application
(
947 application_name
=application_name
,
949 if application
is None:
950 raise JujuApplicationNotFound(
951 "Cannot find application {} to upgrade".format(application_name
)
954 await application
.refresh(path
=path
)
957 "Wait until charm upgrade is completed for application {} (model={})".format(
958 application_name
, model_name
962 await JujuModelWatcher
.ensure_units_idle(
963 model
=model
, application
=application
966 if application
.status
== "error":
967 error_message
= "Unknown"
968 for unit
in application
.units
:
970 unit
.workload_status
== "error"
971 and unit
.workload_status_message
!= ""
973 error_message
= unit
.workload_status_message
975 message
= "Application {} failed update in {}: {}".format(
976 application_name
, model_name
, error_message
978 self
.log
.error(message
)
979 raise JujuError(message
=message
)
982 "Application {} is ready in model {}".format(
983 application_name
, model_name
988 await self
.disconnect_model(model
)
989 await self
.disconnect_controller(controller
)
993 async def resolve_application(self
, model_name
: str, application_name
: str):
994 controller
= await self
.get_controller()
995 model
= await self
.get_model(controller
, model_name
)
998 application
= self
._get
_application
(
1000 application_name
=application_name
,
1002 if application
is None:
1003 raise JujuApplicationNotFound(
1004 "Cannot find application {} to resolve".format(application_name
)
1007 while application
.status
== "error":
1008 for unit
in application
.units
:
1009 if unit
.workload_status
== "error":
1011 "Model {}, Application {}, Unit {} in error state, resolving".format(
1012 model_name
, application_name
, unit
.entity_id
1016 await unit
.resolved(retry
=False)
1020 await asyncio
.sleep(1)
1023 await self
.disconnect_model(model
)
1024 await self
.disconnect_controller(controller
)
1026 async def resolve(self
, model_name
: str):
1027 controller
= await self
.get_controller()
1028 model
= await self
.get_model(controller
, model_name
)
1029 all_units_active
= False
1031 while not all_units_active
:
1032 all_units_active
= True
1033 for application_name
, application
in model
.applications
.items():
1034 if application
.status
== "error":
1035 for unit
in application
.units
:
1036 if unit
.workload_status
== "error":
1038 "Model {}, Application {}, Unit {} in error state, resolving".format(
1039 model_name
, application_name
, unit
.entity_id
1043 await unit
.resolved(retry
=False)
1044 all_units_active
= False
1048 if not all_units_active
:
1049 await asyncio
.sleep(5)
1051 await self
.disconnect_model(model
)
1052 await self
.disconnect_controller(controller
)
1054 async def scale_application(
1057 application_name
: str,
1059 total_timeout
: float = None,
1062 Scale application (K8s)
1064 :param: model_name: Model name
1065 :param: application_name: Application name
1066 :param: scale: Scale to which to set this application
1067 :param: total_timeout: Timeout for the entity to be active
1071 controller
= await self
.get_controller()
1073 model
= await self
.get_model(controller
, model_name
)
1076 "Scaling application {} in model {}".format(
1077 application_name
, model_name
1080 application
= self
._get
_application
(model
, application_name
)
1081 if application
is None:
1082 raise JujuApplicationNotFound("Cannot scale application")
1083 await application
.scale(scale
=scale
)
1084 # Wait until application is scaled in model
1086 "Waiting for application {} to be scaled in model {}...".format(
1087 application_name
, model_name
1090 if total_timeout
is None:
1091 total_timeout
= 1800
1092 end
= time
.time() + total_timeout
1093 while time
.time() < end
:
1094 application_scale
= self
._get
_application
_count
(model
, application_name
)
1095 # Before calling wait_for_model function,
1096 # wait until application unit count and scale count are equal.
1097 # Because there is a delay before scaling triggers in Juju model.
1098 if application_scale
== scale
:
1099 await JujuModelWatcher
.wait_for_model(
1100 model
=model
, timeout
=total_timeout
1103 "Application {} is scaled in model {}".format(
1104 application_name
, model_name
1108 await asyncio
.sleep(5)
1110 "Timeout waiting for application {} in model {} to be scaled".format(
1111 application_name
, model_name
1116 await self
.disconnect_model(model
)
1117 await self
.disconnect_controller(controller
)
def _get_application_count(self, model: Model, application_name: str) -> int:
    """Count the units of an application

    :param: model: Model object
    :param: application_name: Application name

    :return: int: length of the application's ``units``
             (None when self._get_application finds no such application)
    """
    application = self._get_application(model, application_name)
    return None if application is None else len(application.units)
def _get_application(self, model: Model, application_name: str) -> Application:
    """Look an application up by name in a model

    :param: model: Model object
    :param: application_name: Application name

    :return: juju.application.Application (or None if it doesn't exist)
    """
    if not model.applications:
        return None
    if application_name not in model.applications:
        return None
    return model.applications[application_name]
def _get_unit(self, application: Application, machine_id: str) -> Unit:
    """Find the unit of an application that runs on a given machine

    :param: application: Application object
    :param: machine_id: Machine id

    :return: Unit: first unit whose ``machine_id`` equals the given one
             (None when no unit matches)
    """
    matching_units = (u for u in application.units if u.machine_id == machine_id)
    return next(matching_units, None)
def _get_machine_info(
    self,
    model,
    machine_id: str,
) -> (str, str):
    """Return a machine of the model together with its series

    :param: model: Model object
    :param: machine_id: Machine id

    :return: (str, str): (machine, series)
    :raises JujuMachineNotFound: if machine_id is not in model.machines
                                 (the error is logged first)
    """
    machines = model.machines
    if machine_id in machines:
        machine = machines[machine_id]
        return machine, machine.series

    msg = "Machine {} not found in model".format(machine_id)
    self.log.error(msg=msg)
    raise JujuMachineNotFound(msg)
1176 async def execute_action(
1178 application_name
: str,
1181 db_dict
: dict = None,
1182 machine_id
: str = None,
1183 progress_timeout
: float = None,
1184 total_timeout
: float = None,
1189 :param: application_name: Application name
1190 :param: model_name: Model name
1191 :param: action_name: Name of the action
1192 :param: db_dict: Dictionary with data of the DB to write the updates
1193 :param: machine_id Machine id
1194 :param: progress_timeout: Maximum time between two updates in the model
1195 :param: total_timeout: Timeout for the entity to be active
1197 :return: (str, str): (output and status)
1200 "Executing action {} using params {}".format(action_name
, kwargs
)
1203 controller
= await self
.get_controller()
1206 model
= await self
.get_model(controller
, model_name
)
1210 application
= self
._get
_application
(
1212 application_name
=application_name
,
1214 if application
is None:
1215 raise JujuApplicationNotFound("Cannot execute action")
1217 # Occasionally, self._get_leader_unit() will return None
1218 # because the leader elected hook has not been triggered yet.
1219 # Therefore, we are doing some retries. If it happens again,
1221 if machine_id
is None:
1222 unit
= await self
._get
_leader
_unit
(application
)
1224 "Action {} is being executed on the leader unit {}".format(
1225 action_name
, unit
.name
1229 unit
= self
._get
_unit
(application
, machine_id
)
1232 "A unit with machine id {} not in available units".format(
1237 "Action {} is being executed on {} unit".format(
1238 action_name
, unit
.name
1242 actions
= await application
.get_actions()
1244 if action_name
not in actions
:
1245 raise JujuActionNotFound(
1246 "Action {} not in available actions".format(action_name
)
1249 action
= await unit
.run_action(action_name
, **kwargs
)
1252 "Wait until action {} is completed in application {} (model={})".format(
1253 action_name
, application_name
, model_name
1256 await JujuModelWatcher
.wait_for(
1259 progress_timeout
=progress_timeout
,
1260 total_timeout
=total_timeout
,
1263 vca_id
=self
.vca_connection
._vca
_id
,
1266 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
1267 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
1269 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
1273 "Action {} completed with status {} in application {} (model={})".format(
1274 action_name
, action
.status
, application_name
, model_name
1278 await self
.disconnect_model(model
)
1279 await self
.disconnect_controller(controller
)
1281 return output
, status
async def get_actions(self, application_name: str, model_name: str) -> dict:
    """Get list of actions

    :param: application_name: Application name
    :param: model_name: Model name

    :return: Dict with this format
        {
            "action_name": "Description of the action",
            ...
        }
    :raises JujuApplicationNotFound: if the application is not in the model
    """
    self.log.debug(
        "Getting list of actions for application {}".format(application_name)
    )

    # Get controller
    controller = await self.get_controller()

    # Get model
    model = await self.get_model(controller, model_name)

    try:
        # Get application
        application = self._get_application(
            model,
            application_name=application_name,
        )
        if application is None:
            # _get_application returns None for unknown applications; fail
            # with the same exception the other lookups in this class raise
            # rather than with an AttributeError on None.
            raise JujuApplicationNotFound(
                "Cannot get actions: application {} not found (model={})".format(
                    application_name, model_name
                )
            )

        # Return list of actions
        return await application.get_actions()

    finally:
        # Disconnect from model and controller
        await self.disconnect_model(model)
        await self.disconnect_controller(controller)
async def get_metrics(self, model_name: str, application_name: str) -> dict:
    """Get the metrics collected by the VCA.

    :param model_name The name or unique id of the network service
    :param application_name The name of the application

    :return: Result of the application's ``get_metrics()``; an empty dict
             when the application is not found in the model
    :raises Exception: if model_name or application_name is empty
    """
    if not model_name or not application_name:
        raise Exception("model_name and application_name must be non-empty strings")
    metrics = {}
    controller = await self.get_controller()
    model = await self.get_model(controller, model_name)
    try:
        application = self._get_application(model, application_name)
        if application is not None:
            metrics = await application.get_metrics()
    finally:
        # Both disconnect helpers are coroutines: they must be awaited,
        # otherwise they never run and the connections stay open.
        await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return metrics
async def add_relation(
    self,
    model_name: str,
    endpoint_1: str,
    endpoint_2: str,
):
    """Relate two endpoints inside a model

    A JujuAPIError classified by ``_relation_is_not_found`` or
    ``_relation_already_exist`` is only logged as a warning; any other
    JujuAPIError is re-raised.

    :param: model_name: Model name
    :param: endpoint_1: First endpoint name
                        ("app:endpoint" format or directly the saas name)
    :param: endpoint_2: Second endpoint name (^ same format)
    """

    self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))

    controller = await self.get_controller()
    model = await self.get_model(controller, model_name)

    try:
        await model.add_relation(endpoint_1, endpoint_2)
    except juju.errors.JujuAPIError as e:
        if self._relation_is_not_found(e):
            self.log.warning("Relation not found: {}".format(e.message))
        elif self._relation_already_exist(e):
            self.log.warning("Relation already exists: {}".format(e.message))
        else:
            # Neither of the tolerated cases: propagate to the caller
            raise e
    finally:
        await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1378 def _relation_is_not_found(self
, juju_error
):
1380 return (text
in juju_error
.message
) or (
1381 juju_error
.error_code
and text
in juju_error
.error_code
1384 def _relation_already_exist(self
, juju_error
):
1385 text
= "already exists"
1386 return (text
in juju_error
.message
) or (
1387 juju_error
.error_code
and text
in juju_error
.error_code
async def offer(self, endpoint: RelationEndpoint) -> Offer:
    """
    Create an offer from a RelationEndpoint

    :param: endpoint: Relation endpoint

    :return: Offer object built from the URL of the first listed offer.
             Nothing (None) is returned when Juju reports that the
             application offer already exists.
    :raises Exception: if no offer is listed after creation
    """
    model_name = endpoint.model_name
    offer_name = f"{endpoint.application_name}-{endpoint.endpoint_name}"
    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        await model.create_offer(endpoint.endpoint, offer_name=offer_name)
        offer_list = await self._list_offers(model_name, offer_name=offer_name)
        if offer_list:
            return Offer(offer_list[0].offer_url)
        raise Exception("offer was not created")
    except juju.errors.JujuError as e:
        # An already existing offer is tolerated; anything else propagates
        if "application offer already exists" not in e.message:
            raise e
    finally:
        # The disconnect helpers are coroutines: they must be awaited,
        # otherwise they never run and the connections stay open.
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1422 provider_libjuju
: "Libjuju",
1425 Consumes a remote offer in the model. Relations can be created later using "juju relate".
1427 :param: model_name: Model name
1428 :param: offer: Offer object to consume
1429 :param: provider_libjuju: Libjuju object of the provider endpoint
1431 :raises ParseError if there's a problem parsing the offer_url
1432 :raises JujuError if remote offer includes and endpoint
1433 :raises JujuAPIError if the operation is not successful
1435 :returns: Saas name. It is the application name in the model that reference the remote application.
1437 saas_name
= f
'{offer.name}-{offer.model_name.replace("-", "")}'
1439 saas_name
= f
"{saas_name}-{offer.vca_id}"
1440 controller
= await self
.get_controller()
1442 provider_controller
= None
1444 model
= await controller
.get_model(model_name
)
1445 provider_controller
= await provider_libjuju
.get_controller()
1446 await model
.consume(
1447 offer
.url
, application_alias
=saas_name
, controller
=provider_controller
1452 await self
.disconnect_model(model
)
1453 if provider_controller
:
1454 await provider_libjuju
.disconnect_controller(provider_controller
)
1455 await self
.disconnect_controller(controller
)
async def destroy_model(self, model_name: str, total_timeout: float = 1800):
    """
    Destroy model

    Pending, manually provisioned machines are destroyed first, then the
    model itself is removed within ``total_timeout`` seconds. A model that
    does not exist (before or after a failure) is only logged as a warning.

    :param: model_name: Model name
    :param: total_timeout: Timeout in seconds for the model removal
    """

    controller = await self.get_controller()
    model = None
    try:
        if not await self.model_exists(model_name, controller=controller):
            self.log.warning(f"Model {model_name} doesn't exist")
            return

        self.log.debug(f"Getting model {model_name} to be destroyed")
        model = await self.get_model(controller, model_name)
        self.log.debug(f"Destroying manual machines in model {model_name}")
        # Destroy machines that are manually provisioned
        # and still are in pending state
        await self._destroy_pending_machines(model, only_manual=True)
        await self.disconnect_model(model)

        await asyncio.wait_for(
            self._destroy_model(model_name, controller),
            timeout=total_timeout,
        )
    except Exception as e:
        if not await self.model_exists(model_name, controller=controller):
            self.log.warning(
                f"Failed deleting model {model_name}: model doesn't exist"
            )
            return
        self.log.warning(f"Failed deleting model {model_name}: {e}")
        raise e
    finally:
        # NOTE(review): the model may already have been disconnected above;
        # this assumes disconnect_model tolerates a second call - confirm.
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1497 async def _destroy_model(
1500 controller
: Controller
,
1503 Destroy model from controller
1505 :param: model: Model name to be removed
1506 :param: controller: Controller object
1507 :param: timeout: Timeout in seconds
1509 self
.log
.debug(f
"Destroying model {model_name}")
1511 async def _destroy_model_gracefully(model_name
: str, controller
: Controller
):
1512 self
.log
.info(f
"Gracefully deleting model {model_name}")
1514 while model_name
in await controller
.list_models():
1516 await self
.resolve(model_name
)
1518 await controller
.destroy_model(model_name
, destroy_storage
=True)
1520 await asyncio
.sleep(5)
1521 self
.log
.info(f
"Model {model_name} deleted gracefully")
1523 async def _destroy_model_forcefully(model_name
: str, controller
: Controller
):
1524 self
.log
.info(f
"Forcefully deleting model {model_name}")
1525 while model_name
in await controller
.list_models():
1526 await controller
.destroy_model(
1527 model_name
, destroy_storage
=True, force
=True, max_wait
=60
1529 await asyncio
.sleep(5)
1530 self
.log
.info(f
"Model {model_name} deleted forcefully")
1534 await asyncio
.wait_for(
1535 _destroy_model_gracefully(model_name
, controller
), timeout
=120
1537 except asyncio
.TimeoutError
:
1538 await _destroy_model_forcefully(model_name
, controller
)
1539 except juju
.errors
.JujuError
as e
:
1540 if any("has been removed" in error
for error
in e
.errors
):
1542 if any("model not found" in error
for error
in e
.errors
):
async def destroy_application(
    self, model_name: str, application_name: str, total_timeout: float
):
    """
    Destroy application

    After requesting the removal, the model is polled every 5 seconds until
    the application can no longer be found.

    :param: model_name: Model name
    :param: application_name: Application name
    :param: total_timeout: Timeout in seconds (3600 is used when None)

    :raises Exception: if the application is still present at the timeout
    """

    controller = await self.get_controller()
    model = None

    try:
        model = await self.get_model(controller, model_name)
        self.log.debug(
            "Destroying application {} in model {}".format(
                application_name, model_name
            )
        )
        application = self._get_application(model, application_name)
        if not application:
            self.log.warning("Application not found: {}".format(application_name))
        else:
            await application.destroy()

        self.log.debug(
            "Waiting for application {} to be destroyed in model {}...".format(
                application_name, model_name
            )
        )
        if total_timeout is None:
            total_timeout = 3600
        deadline = time.time() + total_timeout
        while time.time() < deadline:
            if self._get_application(model, application_name):
                # Still there: wait and look again
                await asyncio.sleep(5)
                continue
            self.log.debug(
                "The application {} was destroyed in model {} ".format(
                    application_name, model_name
                )
            )
            return
        raise Exception(
            "Timeout waiting for application {} to be destroyed in model {}".format(
                application_name, model_name
            )
        )
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1600 async def _destroy_pending_machines(self
, model
: Model
, only_manual
: bool = False):
1602 Destroy pending machines in a given model
1604 :param: only_manual: Bool that indicates only manually provisioned
1605 machines should be destroyed (if True), or that
1606 all pending machines should be destroyed
1608 status
= await model
.get_status()
1609 for machine_id
in status
.machines
:
1610 machine_status
= status
.machines
[machine_id
]
1611 if machine_status
.agent_status
.status
== "pending":
1612 if only_manual
and not machine_status
.instance_id
.startswith("manual:"):
1614 machine
= model
.machines
[machine_id
]
1615 await machine
.destroy(force
=True)
async def configure_application(
    self, model_name: str, application_name: str, config: dict = None
):
    """Apply a configuration to an application

    Nothing is done (no connection is opened) when ``config`` is empty.

    :param: model_name: Model name
    :param: application_name: Application name
    :param: config: Config to apply to the charm
    """
    self.log.debug("Configuring application {}".format(application_name))

    if not config:
        return

    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        target = self._get_application(
            model,
            application_name=application_name,
        )
        await target.set_config(config)
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
def handle_exception(self, loop, context):
    """Handler for exceptions not handled by libjuju; currently a no-op

    :param: loop: Not used
    :param: context: Not used
    """
    # All unhandled exceptions by libjuju are handled here.
    return None
async def health_check(self, interval: float = 300.0):
    """
    Health check to make sure controller and controller_model connections are OK

    Runs forever: each round obtains a controller, releases it again and
    then sleeps. A failure is logged as an error and the loop continues.

    :param: interval: Time in seconds between checks
    """
    while True:
        # Reset on every round so that a failed round never disconnects
        # the (already released) controller of a previous round.
        controller = None
        try:
            controller = await self.get_controller()
            # self.log.debug("VCA is alive")
        except Exception as e:
            self.log.error("Health check to VCA failed: {}".format(e))
        finally:
            if controller is not None:
                await self.disconnect_controller(controller)
        await asyncio.sleep(interval)
async def list_models(self, contains: str = None) -> [str]:
    """List the models of the controller, optionally filtered by name

    :param: contains: String that must be contained in the model name.
                      When empty, every model is returned.

    :return: [models] Returns list of model names
    """

    controller = await self.get_controller()
    try:
        names = await controller.list_models()
        if not contains:
            return names
        return [name for name in names if contains in name]
    finally:
        await self.disconnect_controller(controller)
1681 async def _list_offers(
1682 self
, model_name
: str, offer_name
: str = None
1683 ) -> QueryApplicationOffersResults
:
1685 List offers within a model
1687 :param: model_name: Model name
1688 :param: offer_name: Offer name to filter.
1690 :return: Returns application offers results in the model
1693 controller
= await self
.get_controller()
1695 offers
= (await controller
.list_offers(model_name
)).results
1698 for offer
in offers
:
1699 if offer
.offer_name
== offer_name
:
1700 matching_offer
.append(offer
)
1702 offers
= matching_offer
1705 await self
.disconnect_controller(controller
)
1712 client_cert_data
: str,
1713 configuration
: Configuration
,
1715 credential_name
: str = None,
1718 Add a Kubernetes cloud to the controller
1720 Similar to the `juju add-k8s` command in the CLI
1722 :param: name: Name for the K8s cloud
1723 :param: configuration: Kubernetes configuration object
1724 :param: storage_class: Storage Class to use in the cloud
1725 :param: credential_name: Storage Class to use in the cloud
1728 if not storage_class
:
1729 raise Exception("storage_class must be a non-empty string")
1731 raise Exception("name must be a non-empty string")
1732 if not configuration
:
1733 raise Exception("configuration must be provided")
1735 endpoint
= configuration
.host
1736 credential
= self
.get_k8s_cloud_credential(
1741 credential
.attrs
[RBAC_LABEL_KEY_NAME
] = rbac_id
1742 cloud
= client
.Cloud(
1744 auth_types
=[credential
.auth_type
],
1746 ca_certificates
=[client_cert_data
],
1748 "operator-storage": storage_class
,
1749 "workload-storage": storage_class
,
1753 return await self
.add_cloud(
1754 name
, cloud
, credential
, credential_name
=credential_name
def get_k8s_cloud_credential(
    self,
    configuration: Configuration,
    client_cert_data: str,
    token: str = None,
) -> client.CloudCredential:
    """Build the Juju credential for a Kubernetes cloud

    The auth type is derived from what is available: user/password from the
    configuration (optionally with certificate), or certificate plus token.

    :param: configuration: Kubernetes configuration object (username and
                           password are read from it)
    :param: client_cert_data: Client certificate data
    :param: token: Token; cannot be combined with username/password

    :return: CloudCredential with the selected auth type and attributes
    :raises JujuInvalidK8sConfiguration: if token and user/pass are both
            set, or no supported authentication method can be selected
    """
    # TODO: Test with AKS
    key = None  # open(configuration.key_file, "r").read()
    username, password = configuration.username, configuration.password

    attrs = {}
    if client_cert_data:
        attrs["ClientCertificateData"] = client_cert_data
    if key:
        attrs["ClientKeyData"] = key
    if token:
        if username or password:
            raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
        attrs["Token"] = token

    if key:
        auth_type = "oauth2withcert" if client_cert_data else "oauth2"
        if not token:
            raise JujuInvalidK8sConfiguration(
                "missing token for auth type {}".format(auth_type)
            )
    elif username:
        if not password:
            # An empty password is accepted, only reported
            self.log.debug(
                "credential for user {} has empty password".format(username)
            )
        attrs["username"] = username
        attrs["password"] = password
        auth_type = "userpasswithcert" if client_cert_data else "userpass"
    elif client_cert_data and token:
        auth_type = "certificate"
    else:
        raise JujuInvalidK8sConfiguration("authentication method not supported")
    return client.CloudCredential(auth_type=auth_type, attrs=attrs)
async def add_cloud(
    self,
    name: str,
    cloud: Cloud,
    credential: CloudCredential = None,
    credential_name: str = None,
) -> Cloud:
    """
    Register a cloud (and optionally a credential) in the controller

    :param: name: Name of the cloud to be added
    :param: cloud: Cloud object
    :param: credential: CloudCredentials object for the cloud
    :param: credential_name: Credential name.
                             If not defined, the name of the cloud will be used.

    :return: The ``cloud`` object received as argument
    """
    controller = await self.get_controller()
    try:
        await controller.add_cloud(name, cloud)
        if credential:
            effective_name = credential_name or name
            await controller.add_credential(
                effective_name, credential=credential, cloud=name
            )
        # Need to return the object returned by the controller.add_cloud() function
        # I'm returning the original value now until this bug is fixed:
        #   https://github.com/juju/python-libjuju/issues/443
        return cloud
    finally:
        await self.disconnect_controller(controller)
async def remove_cloud(self, name: str):
    """
    Remove a cloud from the controller

    A JujuError whose single error is exactly 'cloud "<name>" not found' is
    only logged as a warning; any other JujuError is re-raised.

    :param: name: Name of the cloud to be removed
    """
    controller = await self.get_controller()
    try:
        await controller.remove_cloud(name)
    except juju.errors.JujuError as e:
        not_found = len(e.errors) == 1 and f'cloud "{name}" not found' == e.errors[0]
        if not not_found:
            raise e
        self.log.warning(f"Cloud {name} not found, so it could not be deleted.")
    finally:
        await self.disconnect_controller(controller)
@retry(attempts=20, delay=5, fallback=JujuLeaderUnitNotFound())
async def _get_leader_unit(self, application: Application) -> Unit:
    """Return the first unit of the application that reports being leader

    :param: application: Application whose units are inspected

    :return: The leader unit
    :raises Exception: when no unit reports leadership; the ``retry``
            decorator wraps this call with attempts=20, delay=5 and a
            JujuLeaderUnitNotFound fallback
    """
    leader = None
    for candidate in application.units:
        if await candidate.is_leader_from_status():
            leader = candidate
            break
    if leader:
        return leader
    raise Exception()
async def get_cloud_credentials(self, cloud: Cloud) -> typing.List:
    """
    Fetch the credentials registered for a cloud

    The credential tag is built from the cloud name, the user of the VCA
    connection and the credential name of the cloud.

    :param: cloud: Cloud object. The returned credentials will be from this cloud.

    :return: List of credentials object associated to the specified cloud
    """
    controller = await self.get_controller()
    try:
        user = self.vca_connection.data.user
        credential_tag = tag.credential(cloud.name, user, cloud.credential_name)
        cloud_facade = client.CloudFacade.from_connection(controller.connection())
        response = await cloud_facade.Credential([client.Entity(credential_tag)])
        return response.results
    finally:
        await self.disconnect_controller(controller)
async def check_application_exists(self, model_name, application_name) -> bool:
    """Tell whether an application is present in a model

    :param: model_name: Model Name
    :param: application_name: Application Name

    :return: True if ``_get_application`` finds the application, else False
    """

    model = None
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        self.log.debug(
            "Checking if application {} exists in model {}".format(
                application_name, model_name
            )
        )
        found = self._get_application(model, application_name)
        return found is not None
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)