1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
22 from juju
.model
import Model
23 from juju
.machine
import Machine
24 from juju
.application
import Application
25 from juju
.unit
import Unit
26 from juju
.client
._definitions
import (
28 QueryApplicationOffersResults
,
32 from juju
.controller
import Controller
33 from juju
.client
import client
36 from n2vc
.definitions
import Offer
, RelationEndpoint
37 from n2vc
.juju_watcher
import JujuModelWatcher
38 from n2vc
.provisioner
import AsyncSSHProvisioner
39 from n2vc
.n2vc_conn
import N2VCConnector
40 from n2vc
.exceptions
import (
42 JujuApplicationNotFound
,
43 JujuLeaderUnitNotFound
,
45 JujuControllerFailedConnecting
,
46 JujuApplicationExists
,
47 JujuInvalidK8sConfiguration
,
50 from n2vc
.vca
.cloud
import Cloud
as VcaCloud
51 from n2vc
.vca
.connection
import Connection
52 from kubernetes
.client
.configuration
import Configuration
53 from retrying_async
import retry
56 RBAC_LABEL_KEY_NAME
= "rbac-id"
62 vca_connection
: Connection
,
63 loop
: asyncio
.AbstractEventLoop
= None,
64 log
: logging
.Logger
= None,
65 n2vc
: N2VCConnector
= None,
70 :param: vca_connection: n2vc.vca.connection object
71 :param: loop: Asyncio loop
73 :param: n2vc: N2VC object
76 self
.log
= log
or logging
.getLogger("Libjuju")
78 self
.vca_connection
= vca_connection
80 self
.loop
= loop
or asyncio
.get_event_loop()
81 self
.loop
.set_exception_handler(self
.handle_exception
)
82 self
.creating_model
= asyncio
.Lock(loop
=self
.loop
)
84 if self
.vca_connection
.is_default
:
85 self
.health_check_task
= self
._create
_health
_check
_task
()
87 def _create_health_check_task(self
):
88 return self
.loop
.create_task(self
.health_check())
90 async def get_controller(self
, timeout
: float = 60.0) -> Controller
:
94 :param: timeout: Time in seconds to wait for controller to connect
98 controller
= Controller()
99 await asyncio
.wait_for(
101 endpoint
=self
.vca_connection
.data
.endpoints
,
102 username
=self
.vca_connection
.data
.user
,
103 password
=self
.vca_connection
.data
.secret
,
104 cacert
=self
.vca_connection
.data
.cacert
,
108 if self
.vca_connection
.is_default
:
109 endpoints
= await controller
.api_endpoints
111 endpoint
in self
.vca_connection
.endpoints
for endpoint
in endpoints
113 await self
.vca_connection
.update_endpoints(endpoints
)
115 except asyncio
.CancelledError
as e
:
117 except Exception as e
:
119 "Failed connecting to controller: {}... {}".format(
120 self
.vca_connection
.data
.endpoints
, e
124 await self
.disconnect_controller(controller
)
126 raise JujuControllerFailedConnecting(
127 f
"Error connecting to Juju controller: {e}"
130 async def disconnect(self
):
132 # Cancel health check task
133 self
.health_check_task
.cancel()
134 self
.log
.debug("Libjuju disconnected!")
136 async def disconnect_model(self
, model
: Model
):
140 :param: model: Model that will be disconnected
142 await model
.disconnect()
144 async def disconnect_controller(self
, controller
: Controller
):
146 Disconnect controller
148 :param: controller: Controller that will be disconnected
151 await controller
.disconnect()
153 @retry(attempts
=3, delay
=5, timeout
=None)
154 async def add_model(self
, model_name
: str, cloud
: VcaCloud
):
158 :param: model_name: Model name
159 :param: cloud: Cloud object
163 controller
= await self
.get_controller()
166 # Block until other workers have finished model creation
167 while self
.creating_model
.locked():
168 await asyncio
.sleep(0.1)
171 async with self
.creating_model
:
172 if await self
.model_exists(model_name
, controller
=controller
):
174 self
.log
.debug("Creating model {}".format(model_name
))
175 model
= await controller
.add_model(
177 config
=self
.vca_connection
.data
.model_config
,
178 cloud_name
=cloud
.name
,
179 credential_name
=cloud
.credential_name
,
181 except juju
.errors
.JujuAPIError
as e
:
182 if "already exists" in e
.message
:
188 await self
.disconnect_model(model
)
189 await self
.disconnect_controller(controller
)
191 async def get_executed_actions(self
, model_name
: str) -> list:
193 Get executed/history of actions for a model.
195 :param: model_name: Model name, str.
196 :return: List of executed actions for a model.
199 executed_actions
= []
200 controller
= await self
.get_controller()
202 model
= await self
.get_model(controller
, model_name
)
203 # Get all unique action names
205 for application
in model
.applications
:
206 application_actions
= await self
.get_actions(application
, model_name
)
207 actions
.update(application_actions
)
208 # Get status of all actions
209 for application_action
in actions
:
210 app_action_status_list
= await model
.get_action_status(
211 name
=application_action
213 for action_id
, action_status
in app_action_status_list
.items():
216 "action": application_action
,
217 "status": action_status
,
219 # Get action output by id
220 action_status
= await model
.get_action_output(executed_action
["id"])
221 for k
, v
in action_status
.items():
222 executed_action
[k
] = v
223 executed_actions
.append(executed_action
)
224 except Exception as e
:
226 "Error in getting executed actions for model: {}. Error: {}".format(
232 await self
.disconnect_model(model
)
233 await self
.disconnect_controller(controller
)
234 return executed_actions
236 async def get_application_configs(
237 self
, model_name
: str, application_name
: str
240 Get available configs for an application.
242 :param: model_name: Model name, str.
243 :param: application_name: Application name, str.
245 :return: A dict which has key - action name, value - action description
248 application_configs
= {}
249 controller
= await self
.get_controller()
251 model
= await self
.get_model(controller
, model_name
)
252 application
= self
._get
_application
(
253 model
, application_name
=application_name
255 application_configs
= await application
.get_config()
256 except Exception as e
:
258 "Error in getting configs for application: {} in model: {}. Error: {}".format(
259 application_name
, model_name
, str(e
)
264 await self
.disconnect_model(model
)
265 await self
.disconnect_controller(controller
)
266 return application_configs
268 @retry(attempts
=3, delay
=5)
269 async def get_model(self
, controller
: Controller
, model_name
: str) -> Model
:
271 Get model from controller
273 :param: controller: Controller
274 :param: model_name: Model name
276 :return: Model: The created Juju model object
278 return await controller
.get_model(model_name
)
280 async def model_exists(
281 self
, model_name
: str, controller
: Controller
= None
284 Check if model exists
286 :param: controller: Controller
287 :param: model_name: Model name
291 need_to_disconnect
= False
293 # Get controller if not passed
295 controller
= await self
.get_controller()
296 need_to_disconnect
= True
298 # Check if model exists
300 return model_name
in await controller
.list_models()
302 if need_to_disconnect
:
303 await self
.disconnect_controller(controller
)
305 async def models_exist(self
, model_names
: [str]) -> (bool, list):
307 Check if models exists
309 :param: model_names: List of strings with model names
311 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
315 "model_names must be a non-empty array. Given value: {}".format(
319 non_existing_models
= []
320 models
= await self
.list_models()
321 existing_models
= list(set(models
).intersection(model_names
))
322 non_existing_models
= list(set(model_names
) - set(existing_models
))
325 len(non_existing_models
) == 0,
329 async def get_model_status(self
, model_name
: str) -> FullStatus
:
333 :param: model_name: Model name
335 :return: Full status object
337 controller
= await self
.get_controller()
338 model
= await self
.get_model(controller
, model_name
)
340 return await model
.get_status()
342 await self
.disconnect_model(model
)
343 await self
.disconnect_controller(controller
)
345 async def create_machine(
348 machine_id
: str = None,
349 db_dict
: dict = None,
350 progress_timeout
: float = None,
351 total_timeout
: float = None,
352 series
: str = "bionic",
354 ) -> (Machine
, bool):
358 :param: model_name: Model name
359 :param: machine_id: Machine id
360 :param: db_dict: Dictionary with data of the DB to write the updates
361 :param: progress_timeout: Maximum time between two updates in the model
362 :param: total_timeout: Timeout for the entity to be active
363 :param: series: Series of the machine (xenial, bionic, focal, ...)
364 :param: wait: Wait until machine is ready
366 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
367 if the machine is new or it already existed
373 "Creating machine (id={}) in model: {}".format(machine_id
, model_name
)
377 controller
= await self
.get_controller()
380 model
= await self
.get_model(controller
, model_name
)
382 if machine_id
is not None:
384 "Searching machine (id={}) in model {}".format(
385 machine_id
, model_name
389 # Get machines from model and get the machine with machine_id if exists
390 machines
= await model
.get_machines()
391 if machine_id
in machines
:
393 "Machine (id={}) found in model {}".format(
394 machine_id
, model_name
397 machine
= machines
[machine_id
]
399 raise JujuMachineNotFound("Machine {} not found".format(machine_id
))
402 self
.log
.debug("Creating a new machine in model {}".format(model_name
))
405 machine
= await model
.add_machine(
406 spec
=None, constraints
=None, disks
=None, series
=series
410 # Wait until the machine is ready
412 "Wait until machine {} is ready in model {}".format(
413 machine
.entity_id
, model_name
417 await JujuModelWatcher
.wait_for(
420 progress_timeout
=progress_timeout
,
421 total_timeout
=total_timeout
,
424 vca_id
=self
.vca_connection
._vca
_id
,
427 await self
.disconnect_model(model
)
428 await self
.disconnect_controller(controller
)
431 "Machine {} ready at {} in model {}".format(
432 machine
.entity_id
, machine
.dns_name
, model_name
437 async def provision_machine(
442 private_key_path
: str,
443 db_dict
: dict = None,
444 progress_timeout
: float = None,
445 total_timeout
: float = None,
448 Manually provisioning of a machine
450 :param: model_name: Model name
451 :param: hostname: IP to access the machine
452 :param: username: Username to login to the machine
453 :param: private_key_path: Local path for the private key
454 :param: db_dict: Dictionary with data of the DB to write the updates
455 :param: progress_timeout: Maximum time between two updates in the model
456 :param: total_timeout: Timeout for the entity to be active
458 :return: (Entity): Machine id
461 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
462 model_name
, hostname
, username
467 controller
= await self
.get_controller()
470 model
= await self
.get_model(controller
, model_name
)
474 provisioner
= AsyncSSHProvisioner(
477 private_key_path
=private_key_path
,
482 params
= await provisioner
.provision_machine()
484 params
.jobs
= ["JobHostUnits"]
486 self
.log
.debug("Adding machine to model")
487 connection
= model
.connection()
488 client_facade
= client
.ClientFacade
.from_connection(connection
)
490 results
= await client_facade
.AddMachines(params
=[params
])
491 error
= results
.machines
[0].error
494 msg
= "Error adding machine: {}".format(error
.message
)
495 self
.log
.error(msg
=msg
)
496 raise ValueError(msg
)
498 machine_id
= results
.machines
[0].machine
500 self
.log
.debug("Installing Juju agent into machine {}".format(machine_id
))
501 asyncio
.ensure_future(
502 provisioner
.install_agent(
503 connection
=connection
,
505 machine_id
=machine_id
,
506 proxy
=self
.vca_connection
.data
.api_proxy
,
507 series
=params
.series
,
513 machine_list
= await model
.get_machines()
514 if machine_id
in machine_list
:
515 self
.log
.debug("Machine {} found in model!".format(machine_id
))
516 machine
= model
.machines
.get(machine_id
)
518 await asyncio
.sleep(2)
521 msg
= "Machine {} not found in model".format(machine_id
)
522 self
.log
.error(msg
=msg
)
523 raise JujuMachineNotFound(msg
)
526 "Wait until machine {} is ready in model {}".format(
527 machine
.entity_id
, model_name
530 await JujuModelWatcher
.wait_for(
533 progress_timeout
=progress_timeout
,
534 total_timeout
=total_timeout
,
537 vca_id
=self
.vca_connection
._vca
_id
,
539 except Exception as e
:
542 await self
.disconnect_model(model
)
543 await self
.disconnect_controller(controller
)
546 "Machine provisioned {} in model {}".format(machine_id
, model_name
)
552 self
, uri
: str, model_name
: str, wait
: bool = True, timeout
: float = 3600
555 Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
557 :param: uri: Path or Charm Store uri in which the charm or bundle can be found
558 :param: model_name: Model name
559 :param: wait: Indicates whether to wait or not until all applications are active
560 :param: timeout: Time in seconds to wait until all applications are active
562 controller
= await self
.get_controller()
563 model
= await self
.get_model(controller
, model_name
)
565 await model
.deploy(uri
, trust
=True)
567 await JujuModelWatcher
.wait_for_model(model
, timeout
=timeout
)
568 self
.log
.debug("All units active in model {}".format(model_name
))
570 await self
.disconnect_model(model
)
571 await self
.disconnect_controller(controller
)
575 application_name
: str,
578 db_dict
: dict = None,
579 progress_timeout
: float = None,
580 total_timeout
: float = None,
584 :param: application_name: Application name
585 :param: model_name: Model name
586 :param: machine_id Machine id
587 :param: db_dict: Dictionary with data of the DB to write the updates
588 :param: progress_timeout: Maximum time between two updates in the model
589 :param: total_timeout: Timeout for the entity to be active
595 controller
= await self
.get_controller()
597 model
= await self
.get_model(controller
, model_name
)
598 application
= self
._get
_application
(model
, application_name
)
600 if application
is not None:
602 # Checks if the given machine id in the model,
603 # otherwise function raises an error
604 _machine
, _series
= self
._get
_machine
_info
(model
, machine_id
)
607 "Adding unit (machine {}) to application {} in model ~{}".format(
608 machine_id
, application_name
, model_name
612 await application
.add_unit(to
=machine_id
)
614 await JujuModelWatcher
.wait_for(
617 progress_timeout
=progress_timeout
,
618 total_timeout
=total_timeout
,
621 vca_id
=self
.vca_connection
._vca
_id
,
624 "Unit is added to application {} in model {}".format(
625 application_name
, model_name
629 raise JujuApplicationNotFound(
630 "Application {} not exists".format(application_name
)
634 await self
.disconnect_model(model
)
635 await self
.disconnect_controller(controller
)
637 async def destroy_unit(
639 application_name
: str,
642 total_timeout
: float = None,
646 :param: application_name: Application name
647 :param: model_name: Model name
648 :param: machine_id Machine id
649 :param: total_timeout: Timeout for the entity to be active
655 controller
= await self
.get_controller()
657 model
= await self
.get_model(controller
, model_name
)
658 application
= self
._get
_application
(model
, application_name
)
660 if application
is None:
661 raise JujuApplicationNotFound(
662 "Application not found: {} (model={})".format(
663 application_name
, model_name
667 unit
= self
._get
_unit
(application
, machine_id
)
670 "A unit with machine id {} not in available units".format(
675 unit_name
= unit
.name
678 "Destroying unit {} from application {} in model {}".format(
679 unit_name
, application_name
, model_name
682 await application
.destroy_unit(unit_name
)
685 "Waiting for unit {} to be destroyed in application {} (model={})...".format(
686 unit_name
, application_name
, model_name
690 # TODO: Add functionality in the Juju watcher to replace this kind of blocks
691 if total_timeout
is None:
693 end
= time
.time() + total_timeout
694 while time
.time() < end
:
695 if not self
._get
_unit
(application
, machine_id
):
697 "The unit {} was destroyed in application {} (model={}) ".format(
698 unit_name
, application_name
, model_name
702 await asyncio
.sleep(5)
704 "Unit {} is destroyed from application {} in model {}".format(
705 unit_name
, application_name
, model_name
710 await self
.disconnect_model(model
)
711 await self
.disconnect_controller(controller
)
713 async def deploy_charm(
715 application_name
: str,
719 db_dict
: dict = None,
720 progress_timeout
: float = None,
721 total_timeout
: float = None,
728 :param: application_name: Application name
729 :param: path: Local path to the charm
730 :param: model_name: Model name
731 :param: machine_id ID of the machine
732 :param: db_dict: Dictionary with data of the DB to write the updates
733 :param: progress_timeout: Maximum time between two updates in the model
734 :param: total_timeout: Timeout for the entity to be active
735 :param: config: Config for the charm
736 :param: series: Series of the charm
737 :param: num_units: Number of units
739 :return: (juju.application.Application): Juju application
742 "Deploying charm {} to machine {} in model ~{}".format(
743 application_name
, machine_id
, model_name
746 self
.log
.debug("charm: {}".format(path
))
749 controller
= await self
.get_controller()
752 model
= await self
.get_model(controller
, model_name
)
755 if application_name
not in model
.applications
:
757 if machine_id
is not None:
758 machine
, series
= self
._get
_machine
_info
(model
, machine_id
)
760 application
= await model
.deploy(
762 application_name
=application_name
,
771 "Wait until application {} is ready in model {}".format(
772 application_name
, model_name
776 for _
in range(num_units
- 1):
777 m
, _
= await self
.create_machine(model_name
, wait
=False)
778 await application
.add_unit(to
=m
.entity_id
)
780 await JujuModelWatcher
.wait_for(
783 progress_timeout
=progress_timeout
,
784 total_timeout
=total_timeout
,
787 vca_id
=self
.vca_connection
._vca
_id
,
790 "Application {} is ready in model {}".format(
791 application_name
, model_name
795 raise JujuApplicationExists(
796 "Application {} exists".format(application_name
)
798 except juju
.errors
.JujuError
as e
:
799 if "already exists" in e
.message
:
800 raise JujuApplicationExists(
801 "Application {} exists".format(application_name
)
806 await self
.disconnect_model(model
)
807 await self
.disconnect_controller(controller
)
811 async def upgrade_charm(
813 application_name
: str,
816 total_timeout
: float = None,
821 :param: application_name: Application name
822 :param: model_name: Model name
823 :param: path: Local path to the charm
824 :param: total_timeout: Timeout for the entity to be active
826 :return: (str, str): (output and status)
830 "Upgrading charm {} in model {} from path {}".format(
831 application_name
, model_name
, path
835 await self
.resolve_application(
836 model_name
=model_name
, application_name
=application_name
840 controller
= await self
.get_controller()
843 model
= await self
.get_model(controller
, model_name
)
847 application
= self
._get
_application
(
849 application_name
=application_name
,
851 if application
is None:
852 raise JujuApplicationNotFound(
853 "Cannot find application {} to upgrade".format(application_name
)
856 await application
.refresh(path
=path
)
859 "Wait until charm upgrade is completed for application {} (model={})".format(
860 application_name
, model_name
864 await JujuModelWatcher
.ensure_units_idle(
865 model
=model
, application
=application
868 if application
.status
== "error":
869 error_message
= "Unknown"
870 for unit
in application
.units
:
872 unit
.workload_status
== "error"
873 and unit
.workload_status_message
!= ""
875 error_message
= unit
.workload_status_message
877 message
= "Application {} failed update in {}: {}".format(
878 application_name
, model_name
, error_message
880 self
.log
.error(message
)
881 raise JujuError(message
=message
)
884 "Application {} is ready in model {}".format(
885 application_name
, model_name
890 await self
.disconnect_model(model
)
891 await self
.disconnect_controller(controller
)
895 async def resolve_application(self
, model_name
: str, application_name
: str):
897 controller
= await self
.get_controller()
898 model
= await self
.get_model(controller
, model_name
)
901 application
= self
._get
_application
(
903 application_name
=application_name
,
905 if application
is None:
906 raise JujuApplicationNotFound(
907 "Cannot find application {} to resolve".format(application_name
)
910 while application
.status
== "error":
911 for unit
in application
.units
:
912 if unit
.workload_status
== "error":
914 "Model {}, Application {}, Unit {} in error state, resolving".format(
915 model_name
, application_name
, unit
.entity_id
919 await unit
.resolved(retry
=False)
923 await asyncio
.sleep(1)
926 await self
.disconnect_model(model
)
927 await self
.disconnect_controller(controller
)
929 async def resolve(self
, model_name
: str):
931 controller
= await self
.get_controller()
932 model
= await self
.get_model(controller
, model_name
)
933 all_units_active
= False
935 while not all_units_active
:
936 all_units_active
= True
937 for application_name
, application
in model
.applications
.items():
938 if application
.status
== "error":
939 for unit
in application
.units
:
940 if unit
.workload_status
== "error":
942 "Model {}, Application {}, Unit {} in error state, resolving".format(
943 model_name
, application_name
, unit
.entity_id
947 await unit
.resolved(retry
=False)
948 all_units_active
= False
952 if not all_units_active
:
953 await asyncio
.sleep(5)
955 await self
.disconnect_model(model
)
956 await self
.disconnect_controller(controller
)
958 async def scale_application(
961 application_name
: str,
963 total_timeout
: float = None,
966 Scale application (K8s)
968 :param: model_name: Model name
969 :param: application_name: Application name
970 :param: scale: Scale to which to set this application
971 :param: total_timeout: Timeout for the entity to be active
975 controller
= await self
.get_controller()
977 model
= await self
.get_model(controller
, model_name
)
980 "Scaling application {} in model {}".format(
981 application_name
, model_name
984 application
= self
._get
_application
(model
, application_name
)
985 if application
is None:
986 raise JujuApplicationNotFound("Cannot scale application")
987 await application
.scale(scale
=scale
)
988 # Wait until application is scaled in model
990 "Waiting for application {} to be scaled in model {}...".format(
991 application_name
, model_name
994 if total_timeout
is None:
996 end
= time
.time() + total_timeout
997 while time
.time() < end
:
998 application_scale
= self
._get
_application
_count
(model
, application_name
)
999 # Before calling wait_for_model function,
1000 # wait until application unit count and scale count are equal.
1001 # Because there is a delay before scaling triggers in Juju model.
1002 if application_scale
== scale
:
1003 await JujuModelWatcher
.wait_for_model(
1004 model
=model
, timeout
=total_timeout
1007 "Application {} is scaled in model {}".format(
1008 application_name
, model_name
1012 await asyncio
.sleep(5)
1014 "Timeout waiting for application {} in model {} to be scaled".format(
1015 application_name
, model_name
1020 await self
.disconnect_model(model
)
1021 await self
.disconnect_controller(controller
)
1023 def _get_application_count(self
, model
: Model
, application_name
: str) -> int:
1024 """Get number of units of the application
1026 :param: model: Model object
1027 :param: application_name: Application name
1029 :return: int (or None if application doesn't exist)
1031 application
= self
._get
_application
(model
, application_name
)
1032 if application
is not None:
1033 return len(application
.units
)
1035 def _get_application(self
, model
: Model
, application_name
: str) -> Application
:
1038 :param: model: Model object
1039 :param: application_name: Application name
1041 :return: juju.application.Application (or None if it doesn't exist)
1043 if model
.applications
and application_name
in model
.applications
:
1044 return model
.applications
[application_name
]
1046 def _get_unit(self
, application
: Application
, machine_id
: str) -> Unit
:
1049 :param: application: Application object
1050 :param: machine_id: Machine id
1055 for u
in application
.units
:
1056 if u
.machine_id
== machine_id
:
1061 def _get_machine_info(
1068 :param: model: Model object
1069 :param: machine_id: Machine id
1071 :return: (str, str): (machine, series)
1073 if machine_id
not in model
.machines
:
1074 msg
= "Machine {} not found in model".format(machine_id
)
1075 self
.log
.error(msg
=msg
)
1076 raise JujuMachineNotFound(msg
)
1077 machine
= model
.machines
[machine_id
]
1078 return machine
, machine
.series
1080 async def execute_action(
1082 application_name
: str,
1085 db_dict
: dict = None,
1086 machine_id
: str = None,
1087 progress_timeout
: float = None,
1088 total_timeout
: float = None,
1093 :param: application_name: Application name
1094 :param: model_name: Model name
1095 :param: action_name: Name of the action
1096 :param: db_dict: Dictionary with data of the DB to write the updates
1097 :param: machine_id Machine id
1098 :param: progress_timeout: Maximum time between two updates in the model
1099 :param: total_timeout: Timeout for the entity to be active
1101 :return: (str, str): (output and status)
1104 "Executing action {} using params {}".format(action_name
, kwargs
)
1107 controller
= await self
.get_controller()
1110 model
= await self
.get_model(controller
, model_name
)
1114 application
= self
._get
_application
(
1116 application_name
=application_name
,
1118 if application
is None:
1119 raise JujuApplicationNotFound("Cannot execute action")
1121 # Ocassionally, self._get_leader_unit() will return None
1122 # because the leader elected hook has not been triggered yet.
1123 # Therefore, we are doing some retries. If it happens again,
1125 if machine_id
is None:
1126 unit
= await self
._get
_leader
_unit
(application
)
1128 "Action {} is being executed on the leader unit {}".format(
1129 action_name
, unit
.name
1133 unit
= self
._get
_unit
(application
, machine_id
)
1136 "A unit with machine id {} not in available units".format(
1141 "Action {} is being executed on {} unit".format(
1142 action_name
, unit
.name
1146 actions
= await application
.get_actions()
1148 if action_name
not in actions
:
1149 raise JujuActionNotFound(
1150 "Action {} not in available actions".format(action_name
)
1153 action
= await unit
.run_action(action_name
, **kwargs
)
1156 "Wait until action {} is completed in application {} (model={})".format(
1157 action_name
, application_name
, model_name
1160 await JujuModelWatcher
.wait_for(
1163 progress_timeout
=progress_timeout
,
1164 total_timeout
=total_timeout
,
1167 vca_id
=self
.vca_connection
._vca
_id
,
1170 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
1171 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
1173 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
1177 "Action {} completed with status {} in application {} (model={})".format(
1178 action_name
, action
.status
, application_name
, model_name
1182 await self
.disconnect_model(model
)
1183 await self
.disconnect_controller(controller
)
1185 return output
, status
1187 async def get_actions(self
, application_name
: str, model_name
: str) -> dict:
1188 """Get list of actions
1190 :param: application_name: Application name
1191 :param: model_name: Model name
1193 :return: Dict with this format
1195 "action_name": "Description of the action",
1200 "Getting list of actions for application {}".format(application_name
)
1204 controller
= await self
.get_controller()
1207 model
= await self
.get_model(controller
, model_name
)
1211 application
= self
._get
_application
(
1213 application_name
=application_name
,
1216 # Return list of actions
1217 return await application
.get_actions()
1220 # Disconnect from model and controller
1221 await self
.disconnect_model(model
)
1222 await self
.disconnect_controller(controller
)
1224 async def get_metrics(self
, model_name
: str, application_name
: str) -> dict:
1225 """Get the metrics collected by the VCA.
1227 :param model_name The name or unique id of the network service
1228 :param application_name The name of the application
1230 if not model_name
or not application_name
:
1231 raise Exception("model_name and application_name must be non-empty strings")
1233 controller
= await self
.get_controller()
1234 model
= await self
.get_model(controller
, model_name
)
1236 application
= self
._get
_application
(model
, application_name
)
1237 if application
is not None:
1238 metrics
= await application
.get_metrics()
1240 self
.disconnect_model(model
)
1241 self
.disconnect_controller(controller
)
1244 async def add_relation(
1252 :param: model_name: Model name
1253 :param: endpoint_1 First endpoint name
1254 ("app:endpoint" format or directly the saas name)
1255 :param: endpoint_2: Second endpoint name (^ same format)
1258 self
.log
.debug("Adding relation: {} -> {}".format(endpoint_1
, endpoint_2
))
1261 controller
= await self
.get_controller()
1264 model
= await self
.get_model(controller
, model_name
)
1268 await model
.add_relation(endpoint_1
, endpoint_2
)
1269 except juju
.errors
.JujuAPIError
as e
:
1270 if self
._relation
_is
_not
_found
(e
):
1271 self
.log
.warning("Relation not found: {}".format(e
.message
))
1273 if self
._relation
_already
_exist
(e
):
1274 self
.log
.warning("Relation already exists: {}".format(e
.message
))
1276 # another exception, raise it
1279 await self
.disconnect_model(model
)
1280 await self
.disconnect_controller(controller
)
1282 def _relation_is_not_found(self
, juju_error
):
1284 return (text
in juju_error
.message
) or (
1285 juju_error
.error_code
and text
in juju_error
.error_code
1288 def _relation_already_exist(self
, juju_error
):
1289 text
= "already exists"
1290 return (text
in juju_error
.message
) or (
1291 juju_error
.error_code
and text
in juju_error
.error_code
1294 async def offer(self
, endpoint
: RelationEndpoint
) -> Offer
:
1296 Create an offer from a RelationEndpoint
1298 :param: endpoint: Relation endpoint
1300 :return: Offer object
1302 model_name
= endpoint
.model_name
1303 offer_name
= f
"{endpoint.application_name}-{endpoint.endpoint_name}"
1304 controller
= await self
.get_controller()
1307 model
= await self
.get_model(controller
, model_name
)
1308 await model
.create_offer(endpoint
.endpoint
, offer_name
=offer_name
)
1309 offer_list
= await self
._list
_offers
(model_name
, offer_name
=offer_name
)
1311 return Offer(offer_list
[0].offer_url
)
1313 raise Exception("offer was not created")
1314 except juju
.errors
.JujuError
as e
:
1315 if "application offer already exists" not in e
.message
:
1319 self
.disconnect_model(model
)
1320 self
.disconnect_controller(controller
)
1326 provider_libjuju
: "Libjuju",
1329 Consumes a remote offer in the model. Relations can be created later using "juju relate".
1331 :param: model_name: Model name
1332 :param: offer: Offer object to consume
1333 :param: provider_libjuju: Libjuju object of the provider endpoint
1335 :raises ParseError if there's a problem parsing the offer_url
1336 :raises JujuError if remote offer includes and endpoint
1337 :raises JujuAPIError if the operation is not successful
1339 :returns: Saas name. It is the application name in the model that reference the remote application.
1341 saas_name
= f
'{offer.name}-{offer.model_name.replace("-", "")}'
1343 saas_name
= f
"{saas_name}-{offer.vca_id}"
1344 controller
= await self
.get_controller()
1346 provider_controller
= None
1348 model
= await controller
.get_model(model_name
)
1349 provider_controller
= await provider_libjuju
.get_controller()
1350 await model
.consume(
1351 offer
.url
, application_alias
=saas_name
, controller
=provider_controller
1356 await self
.disconnect_model(model
)
1357 if provider_controller
:
1358 await provider_libjuju
.disconnect_controller(provider_controller
)
1359 await self
.disconnect_controller(controller
)
1361 async def destroy_model(self
, model_name
: str, total_timeout
: float = 1800):
1365 :param: model_name: Model name
1366 :param: total_timeout: Timeout
1369 controller
= await self
.get_controller()
1372 if not await self
.model_exists(model_name
, controller
=controller
):
1373 self
.log
.warn(f
"Model {model_name} doesn't exist")
1376 self
.log
.debug(f
"Getting model {model_name} to be destroyed")
1377 model
= await self
.get_model(controller
, model_name
)
1378 self
.log
.debug(f
"Destroying manual machines in model {model_name}")
1379 # Destroy machines that are manually provisioned
1380 # and still are in pending state
1381 await self
._destroy
_pending
_machines
(model
, only_manual
=True)
1382 await self
.disconnect_model(model
)
1384 await asyncio
.wait_for(
1385 self
._destroy
_model
(model_name
, controller
),
1386 timeout
=total_timeout
,
1388 except Exception as e
:
1389 if not await self
.model_exists(model_name
, controller
=controller
):
1391 f
"Failed deleting model {model_name}: model doesn't exist"
1394 self
.log
.warn(f
"Failed deleting model {model_name}: {e}")
1398 await self
.disconnect_model(model
)
1399 await self
.disconnect_controller(controller
)
1401 async def _destroy_model(
1404 controller
: Controller
,
1407 Destroy model from controller
1409 :param: model: Model name to be removed
1410 :param: controller: Controller object
1411 :param: timeout: Timeout in seconds
1413 self
.log
.debug(f
"Destroying model {model_name}")
1415 async def _destroy_model_gracefully(model_name
: str, controller
: Controller
):
1416 self
.log
.info(f
"Gracefully deleting model {model_name}")
1418 while model_name
in await controller
.list_models():
1420 await self
.resolve(model_name
)
1422 await controller
.destroy_model(model_name
, destroy_storage
=True)
1424 await asyncio
.sleep(5)
1425 self
.log
.info(f
"Model {model_name} deleted gracefully")
1427 async def _destroy_model_forcefully(model_name
: str, controller
: Controller
):
1428 self
.log
.info(f
"Forcefully deleting model {model_name}")
1429 while model_name
in await controller
.list_models():
1430 await controller
.destroy_model(
1431 model_name
, destroy_storage
=True, force
=True, max_wait
=60
1433 await asyncio
.sleep(5)
1434 self
.log
.info(f
"Model {model_name} deleted forcefully")
1438 await asyncio
.wait_for(
1439 _destroy_model_gracefully(model_name
, controller
), timeout
=120
1441 except asyncio
.TimeoutError
:
1442 await _destroy_model_forcefully(model_name
, controller
)
1443 except juju
.errors
.JujuError
as e
:
1444 if any("has been removed" in error
for error
in e
.errors
):
1446 if any("model not found" in error
for error
in e
.errors
):
1450 async def destroy_application(
1451 self
, model_name
: str, application_name
: str, total_timeout
: float
1456 :param: model_name: Model name
1457 :param: application_name: Application name
1458 :param: total_timeout: Timeout
1461 controller
= await self
.get_controller()
1465 model
= await self
.get_model(controller
, model_name
)
1467 "Destroying application {} in model {}".format(
1468 application_name
, model_name
1471 application
= self
._get
_application
(model
, application_name
)
1473 await application
.destroy()
1475 self
.log
.warning("Application not found: {}".format(application_name
))
1478 "Waiting for application {} to be destroyed in model {}...".format(
1479 application_name
, model_name
1482 if total_timeout
is None:
1483 total_timeout
= 3600
1484 end
= time
.time() + total_timeout
1485 while time
.time() < end
:
1486 if not self
._get
_application
(model
, application_name
):
1488 "The application {} was destroyed in model {} ".format(
1489 application_name
, model_name
1493 await asyncio
.sleep(5)
1495 "Timeout waiting for application {} to be destroyed in model {}".format(
1496 application_name
, model_name
1500 if model
is not None:
1501 await self
.disconnect_model(model
)
1502 await self
.disconnect_controller(controller
)
1504 async def _destroy_pending_machines(self
, model
: Model
, only_manual
: bool = False):
1506 Destroy pending machines in a given model
1508 :param: only_manual: Bool that indicates only manually provisioned
1509 machines should be destroyed (if True), or that
1510 all pending machines should be destroyed
1512 status
= await model
.get_status()
1513 for machine_id
in status
.machines
:
1514 machine_status
= status
.machines
[machine_id
]
1515 if machine_status
.agent_status
.status
== "pending":
1516 if only_manual
and not machine_status
.instance_id
.startswith("manual:"):
1518 machine
= model
.machines
[machine_id
]
1519 await machine
.destroy(force
=True)
1521 async def configure_application(
1522 self
, model_name
: str, application_name
: str, config
: dict = None
1524 """Configure application
1526 :param: model_name: Model name
1527 :param: application_name: Application name
1528 :param: config: Config to apply to the charm
1530 self
.log
.debug("Configuring application {}".format(application_name
))
1533 controller
= await self
.get_controller()
1536 model
= await self
.get_model(controller
, model_name
)
1537 application
= self
._get
_application
(
1539 application_name
=application_name
,
1541 await application
.set_config(config
)
1544 await self
.disconnect_model(model
)
1545 await self
.disconnect_controller(controller
)
1547 def handle_exception(self
, loop
, context
):
1548 # All unhandled exceptions by libjuju are handled here.
1551 async def health_check(self
, interval
: float = 300.0):
1553 Health check to make sure controller and controller_model connections are OK
1555 :param: interval: Time in seconds between checks
1560 controller
= await self
.get_controller()
1561 # self.log.debug("VCA is alive")
1562 except Exception as e
:
1563 self
.log
.error("Health check to VCA failed: {}".format(e
))
1565 await self
.disconnect_controller(controller
)
1566 await asyncio
.sleep(interval
)
1568 async def list_models(self
, contains
: str = None) -> [str]:
1569 """List models with certain names
1571 :param: contains: String that is contained in model name
1573 :retur: [models] Returns list of model names
1576 controller
= await self
.get_controller()
1578 models
= await controller
.list_models()
1580 models
= [model
for model
in models
if contains
in model
]
1583 await self
.disconnect_controller(controller
)
1585 async def _list_offers(
1586 self
, model_name
: str, offer_name
: str = None
1587 ) -> QueryApplicationOffersResults
:
1589 List offers within a model
1591 :param: model_name: Model name
1592 :param: offer_name: Offer name to filter.
1594 :return: Returns application offers results in the model
1597 controller
= await self
.get_controller()
1599 offers
= (await controller
.list_offers(model_name
)).results
1602 for offer
in offers
:
1603 if offer
.offer_name
== offer_name
:
1604 matching_offer
.append(offer
)
1606 offers
= matching_offer
1609 await self
.disconnect_controller(controller
)
1616 client_cert_data
: str,
1617 configuration
: Configuration
,
1619 credential_name
: str = None,
1622 Add a Kubernetes cloud to the controller
1624 Similar to the `juju add-k8s` command in the CLI
1626 :param: name: Name for the K8s cloud
1627 :param: configuration: Kubernetes configuration object
1628 :param: storage_class: Storage Class to use in the cloud
1629 :param: credential_name: Storage Class to use in the cloud
1632 if not storage_class
:
1633 raise Exception("storage_class must be a non-empty string")
1635 raise Exception("name must be a non-empty string")
1636 if not configuration
:
1637 raise Exception("configuration must be provided")
1639 endpoint
= configuration
.host
1640 credential
= self
.get_k8s_cloud_credential(
1645 credential
.attrs
[RBAC_LABEL_KEY_NAME
] = rbac_id
1646 cloud
= client
.Cloud(
1648 auth_types
=[credential
.auth_type
],
1650 ca_certificates
=[client_cert_data
],
1652 "operator-storage": storage_class
,
1653 "workload-storage": storage_class
,
1657 return await self
.add_cloud(
1658 name
, cloud
, credential
, credential_name
=credential_name
1661 def get_k8s_cloud_credential(
1663 configuration
: Configuration
,
1664 client_cert_data
: str,
1666 ) -> client
.CloudCredential
:
1668 # TODO: Test with AKS
1669 key
= None # open(configuration.key_file, "r").read()
1670 username
= configuration
.username
1671 password
= configuration
.password
1673 if client_cert_data
:
1674 attrs
["ClientCertificateData"] = client_cert_data
1676 attrs
["ClientKeyData"] = key
1678 if username
or password
:
1679 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1680 attrs
["Token"] = token
1684 auth_type
= "oauth2"
1685 if client_cert_data
:
1686 auth_type
= "oauth2withcert"
1688 raise JujuInvalidK8sConfiguration(
1689 "missing token for auth type {}".format(auth_type
)
1694 "credential for user {} has empty password".format(username
)
1696 attrs
["username"] = username
1697 attrs
["password"] = password
1698 if client_cert_data
:
1699 auth_type
= "userpasswithcert"
1701 auth_type
= "userpass"
1702 elif client_cert_data
and token
:
1703 auth_type
= "certificate"
1705 raise JujuInvalidK8sConfiguration("authentication method not supported")
1706 return client
.CloudCredential(auth_type
=auth_type
, attrs
=attrs
)
1708 async def add_cloud(
1712 credential
: CloudCredential
= None,
1713 credential_name
: str = None,
1716 Add cloud to the controller
1718 :param: name: Name of the cloud to be added
1719 :param: cloud: Cloud object
1720 :param: credential: CloudCredentials object for the cloud
1721 :param: credential_name: Credential name.
1722 If not defined, cloud of the name will be used.
1724 controller
= await self
.get_controller()
1726 _
= await controller
.add_cloud(name
, cloud
)
1728 await controller
.add_credential(
1729 credential_name
or name
, credential
=credential
, cloud
=name
1731 # Need to return the object returned by the controller.add_cloud() function
1732 # I'm returning the original value now until this bug is fixed:
1733 # https://github.com/juju/python-libjuju/issues/443
1736 await self
.disconnect_controller(controller
)
1738 async def remove_cloud(self
, name
: str):
1742 :param: name: Name of the cloud to be removed
1744 controller
= await self
.get_controller()
1746 await controller
.remove_cloud(name
)
1747 except juju
.errors
.JujuError
as e
:
1748 if len(e
.errors
) == 1 and f
'cloud "{name}" not found' == e
.errors
[0]:
1749 self
.log
.warning(f
"Cloud {name} not found, so it could not be deleted.")
1753 await self
.disconnect_controller(controller
)
1755 @retry(attempts
=20, delay
=5, fallback
=JujuLeaderUnitNotFound())
1756 async def _get_leader_unit(self
, application
: Application
) -> Unit
:
1758 for u
in application
.units
:
1759 if await u
.is_leader_from_status():
1766 async def get_cloud_credentials(self
, cloud
: Cloud
) -> typing
.List
:
1768 Get cloud credentials
1770 :param: cloud: Cloud object. The returned credentials will be from this cloud.
1772 :return: List of credentials object associated to the specified cloud
1775 controller
= await self
.get_controller()
1777 facade
= client
.CloudFacade
.from_connection(controller
.connection())
1778 cloud_cred_tag
= tag
.credential(
1779 cloud
.name
, self
.vca_connection
.data
.user
, cloud
.credential_name
1781 params
= [client
.Entity(cloud_cred_tag
)]
1782 return (await facade
.Credential(params
)).results
1784 await self
.disconnect_controller(controller
)
1786 async def check_application_exists(self
, model_name
, application_name
) -> bool:
1787 """Check application exists
1789 :param: model_name: Model Name
1790 :param: application_name: Application Name
1796 controller
= await self
.get_controller()
1798 model
= await self
.get_model(controller
, model_name
)
1800 "Checking if application {} exists in model {}".format(
1801 application_name
, model_name
1804 return self
._get
_application
(model
, application_name
) is not None
1807 await self
.disconnect_model(model
)
1808 await self
.disconnect_controller(controller
)