1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
22 from juju
.model
import Model
23 from juju
.machine
import Machine
24 from juju
.application
import Application
25 from juju
.unit
import Unit
26 from juju
.client
._definitions
import (
28 QueryApplicationOffersResults
,
32 from juju
.controller
import Controller
33 from juju
.client
import client
36 from n2vc
.definitions
import Offer
, RelationEndpoint
37 from n2vc
.juju_watcher
import JujuModelWatcher
38 from n2vc
.provisioner
import AsyncSSHProvisioner
39 from n2vc
.n2vc_conn
import N2VCConnector
40 from n2vc
.exceptions
import (
42 JujuApplicationNotFound
,
43 JujuLeaderUnitNotFound
,
45 JujuControllerFailedConnecting
,
46 JujuApplicationExists
,
47 JujuInvalidK8sConfiguration
,
50 from n2vc
.vca
.cloud
import Cloud
as VcaCloud
51 from n2vc
.vca
.connection
import Connection
52 from kubernetes
.client
.configuration
import Configuration
53 from retrying_async
import retry
56 RBAC_LABEL_KEY_NAME
= "rbac-id"
62 vca_connection
: Connection
,
63 loop
: asyncio
.AbstractEventLoop
= None,
64 log
: logging
.Logger
= None,
65 n2vc
: N2VCConnector
= None,
70 :param: vca_connection: n2vc.vca.connection object
71 :param: loop: Asyncio loop
73 :param: n2vc: N2VC object
76 self
.log
= log
or logging
.getLogger("Libjuju")
78 self
.vca_connection
= vca_connection
80 self
.loop
= loop
or asyncio
.get_event_loop()
81 self
.loop
.set_exception_handler(self
.handle_exception
)
82 self
.creating_model
= asyncio
.Lock(loop
=self
.loop
)
84 if self
.vca_connection
.is_default
:
85 self
.health_check_task
= self
._create
_health
_check
_task
()
87 def _create_health_check_task(self
):
88 return self
.loop
.create_task(self
.health_check())
90 async def get_controller(self
, timeout
: float = 60.0) -> Controller
:
94 :param: timeout: Time in seconds to wait for controller to connect
98 controller
= Controller()
99 await asyncio
.wait_for(
101 endpoint
=self
.vca_connection
.data
.endpoints
,
102 username
=self
.vca_connection
.data
.user
,
103 password
=self
.vca_connection
.data
.secret
,
104 cacert
=self
.vca_connection
.data
.cacert
,
108 if self
.vca_connection
.is_default
:
109 endpoints
= await controller
.api_endpoints
111 endpoint
in self
.vca_connection
.endpoints
for endpoint
in endpoints
113 await self
.vca_connection
.update_endpoints(endpoints
)
115 except asyncio
.CancelledError
as e
:
117 except Exception as e
:
119 "Failed connecting to controller: {}... {}".format(
120 self
.vca_connection
.data
.endpoints
, e
124 await self
.disconnect_controller(controller
)
126 raise JujuControllerFailedConnecting(
127 f
"Error connecting to Juju controller: {e}"
async def disconnect(self):
    """Disconnect: cancel the health check task (if any) and log it.

    The constructor only assigns ``self.health_check_task`` when the VCA
    connection is the default one, so the attribute may not exist. In that
    case there is nothing to cancel and the call must not fail.
    """
    # Cancel health check task
    health_check_task = getattr(self, "health_check_task", None)
    if health_check_task is not None:
        health_check_task.cancel()
    self.log.debug("Libjuju disconnected!")
async def disconnect_model(self, model: Model):
    """
    Close the connection held by a model object

    :param: model: Model that will be disconnected
    """
    pending_disconnect = model.disconnect()
    await pending_disconnect
async def disconnect_controller(self, controller: Controller):
    """
    Disconnect controller

    This is called from cleanup paths (``finally``/``except`` blocks), so a
    missing controller is ignored instead of raising AttributeError and
    masking the error that triggered the cleanup.

    :param: controller: Controller that will be disconnected (may be None)
    """
    if not controller:
        return
    await controller.disconnect()
153 @retry(attempts
=3, delay
=5, timeout
=None)
154 async def add_model(self
, model_name
: str, cloud
: VcaCloud
):
158 :param: model_name: Model name
159 :param: cloud: Cloud object
163 controller
= await self
.get_controller()
166 # Block until other workers have finished model creation
167 while self
.creating_model
.locked():
168 await asyncio
.sleep(0.1)
171 async with self
.creating_model
:
172 if await self
.model_exists(model_name
, controller
=controller
):
174 self
.log
.debug("Creating model {}".format(model_name
))
175 model
= await controller
.add_model(
177 config
=self
.vca_connection
.data
.model_config
,
178 cloud_name
=cloud
.name
,
179 credential_name
=cloud
.credential_name
,
181 except juju
.errors
.JujuAPIError
as e
:
182 if "already exists" in e
.message
:
188 await self
.disconnect_model(model
)
189 await self
.disconnect_controller(controller
)
191 async def get_executed_actions(self
, model_name
: str) -> list:
193 Get executed/history of actions for a model.
195 :param: model_name: Model name, str.
196 :return: List of executed actions for a model.
199 executed_actions
= []
200 controller
= await self
.get_controller()
202 model
= await self
.get_model(controller
, model_name
)
203 # Get all unique action names
205 for application
in model
.applications
:
206 application_actions
= await self
.get_actions(application
, model_name
)
207 actions
.update(application_actions
)
208 # Get status of all actions
209 for application_action
in actions
:
210 app_action_status_list
= await model
.get_action_status(
211 name
=application_action
213 for action_id
, action_status
in app_action_status_list
.items():
216 "action": application_action
,
217 "status": action_status
,
219 # Get action output by id
220 action_status
= await model
.get_action_output(executed_action
["id"])
221 for k
, v
in action_status
.items():
222 executed_action
[k
] = v
223 executed_actions
.append(executed_action
)
224 except Exception as e
:
226 "Error in getting executed actions for model: {}. Error: {}".format(
232 await self
.disconnect_model(model
)
233 await self
.disconnect_controller(controller
)
234 return executed_actions
236 async def get_application_configs(
237 self
, model_name
: str, application_name
: str
240 Get available configs for an application.
242 :param: model_name: Model name, str.
243 :param: application_name: Application name, str.
245 :return: A dict which has key - action name, value - action description
248 application_configs
= {}
249 controller
= await self
.get_controller()
251 model
= await self
.get_model(controller
, model_name
)
252 application
= self
._get
_application
(
253 model
, application_name
=application_name
255 application_configs
= await application
.get_config()
256 except Exception as e
:
258 "Error in getting configs for application: {} in model: {}. Error: {}".format(
259 application_name
, model_name
, str(e
)
264 await self
.disconnect_model(model
)
265 await self
.disconnect_controller(controller
)
266 return application_configs
@retry(attempts=3, delay=5)
async def get_model(self, controller: Controller, model_name: str) -> Model:
    """
    Fetch a model by name through the given controller

    :param: controller: Controller
    :param: model_name: Model name

    :return: Model: The object returned by controller.get_model(model_name)
    """
    model = await controller.get_model(model_name)
    return model
280 async def model_exists(
281 self
, model_name
: str, controller
: Controller
= None
284 Check if model exists
286 :param: controller: Controller
287 :param: model_name: Model name
291 need_to_disconnect
= False
293 # Get controller if not passed
295 controller
= await self
.get_controller()
296 need_to_disconnect
= True
298 # Check if model exists
300 return model_name
in await controller
.list_models()
302 if need_to_disconnect
:
303 await self
.disconnect_controller(controller
)
305 async def models_exist(self
, model_names
: [str]) -> (bool, list):
Check if models exist
309 :param: model_names: List of strings with model names
:return (bool, list[str]): (True if all models exist, List of model names that don't exist)
315 "model_names must be a non-empty array. Given value: {}".format(
319 non_existing_models
= []
320 models
= await self
.list_models()
321 existing_models
= list(set(models
).intersection(model_names
))
322 non_existing_models
= list(set(model_names
) - set(existing_models
))
325 len(non_existing_models
) == 0,
async def get_model_status(self, model_name: str) -> FullStatus:
    """
    Get model status

    Connects to the controller and the model, reads the status and always
    releases both connections afterwards.

    :param: model_name: Model name

    :return: Full status object
    """
    controller = await self.get_controller()
    model = None
    try:
        # Fetch the model inside the try block so that the controller is
        # still disconnected when get_model() fails.
        model = await self.get_model(controller, model_name)
        return await model.get_status()
    finally:
        # model stays None when get_model() raised
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
345 async def create_machine(
348 machine_id
: str = None,
349 db_dict
: dict = None,
350 progress_timeout
: float = None,
351 total_timeout
: float = None,
352 series
: str = "bionic",
354 ) -> (Machine
, bool):
358 :param: model_name: Model name
359 :param: machine_id: Machine id
360 :param: db_dict: Dictionary with data of the DB to write the updates
361 :param: progress_timeout: Maximum time between two updates in the model
362 :param: total_timeout: Timeout for the entity to be active
363 :param: series: Series of the machine (xenial, bionic, focal, ...)
364 :param: wait: Wait until machine is ready
366 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
367 if the machine is new or it already existed
373 "Creating machine (id={}) in model: {}".format(machine_id
, model_name
)
377 controller
= await self
.get_controller()
380 model
= await self
.get_model(controller
, model_name
)
382 if machine_id
is not None:
384 "Searching machine (id={}) in model {}".format(
385 machine_id
, model_name
389 # Get machines from model and get the machine with machine_id if exists
390 machines
= await model
.get_machines()
391 if machine_id
in machines
:
393 "Machine (id={}) found in model {}".format(
394 machine_id
, model_name
397 machine
= machines
[machine_id
]
399 raise JujuMachineNotFound("Machine {} not found".format(machine_id
))
402 self
.log
.debug("Creating a new machine in model {}".format(model_name
))
405 machine
= await model
.add_machine(
406 spec
=None, constraints
=None, disks
=None, series
=series
410 # Wait until the machine is ready
412 "Wait until machine {} is ready in model {}".format(
413 machine
.entity_id
, model_name
417 await JujuModelWatcher
.wait_for(
420 progress_timeout
=progress_timeout
,
421 total_timeout
=total_timeout
,
424 vca_id
=self
.vca_connection
._vca
_id
,
427 await self
.disconnect_model(model
)
428 await self
.disconnect_controller(controller
)
431 "Machine {} ready at {} in model {}".format(
432 machine
.entity_id
, machine
.dns_name
, model_name
437 async def provision_machine(
442 private_key_path
: str,
443 db_dict
: dict = None,
444 progress_timeout
: float = None,
445 total_timeout
: float = None,
448 Manually provisioning of a machine
450 :param: model_name: Model name
451 :param: hostname: IP to access the machine
452 :param: username: Username to login to the machine
453 :param: private_key_path: Local path for the private key
454 :param: db_dict: Dictionary with data of the DB to write the updates
455 :param: progress_timeout: Maximum time between two updates in the model
456 :param: total_timeout: Timeout for the entity to be active
458 :return: (Entity): Machine id
461 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
462 model_name
, hostname
, username
467 controller
= await self
.get_controller()
470 model
= await self
.get_model(controller
, model_name
)
474 provisioner
= AsyncSSHProvisioner(
477 private_key_path
=private_key_path
,
482 params
= await provisioner
.provision_machine()
484 params
.jobs
= ["JobHostUnits"]
486 self
.log
.debug("Adding machine to model")
487 connection
= model
.connection()
488 client_facade
= client
.ClientFacade
.from_connection(connection
)
490 results
= await client_facade
.AddMachines(params
=[params
])
491 error
= results
.machines
[0].error
494 msg
= "Error adding machine: {}".format(error
.message
)
495 self
.log
.error(msg
=msg
)
496 raise ValueError(msg
)
498 machine_id
= results
.machines
[0].machine
500 self
.log
.debug("Installing Juju agent into machine {}".format(machine_id
))
501 asyncio
.ensure_future(
502 provisioner
.install_agent(
503 connection
=connection
,
505 machine_id
=machine_id
,
506 proxy
=self
.vca_connection
.data
.api_proxy
,
507 series
=params
.series
,
513 machine_list
= await model
.get_machines()
514 if machine_id
in machine_list
:
515 self
.log
.debug("Machine {} found in model!".format(machine_id
))
516 machine
= model
.machines
.get(machine_id
)
518 await asyncio
.sleep(2)
521 msg
= "Machine {} not found in model".format(machine_id
)
522 self
.log
.error(msg
=msg
)
523 raise JujuMachineNotFound(msg
)
526 "Wait until machine {} is ready in model {}".format(
527 machine
.entity_id
, model_name
530 await JujuModelWatcher
.wait_for(
533 progress_timeout
=progress_timeout
,
534 total_timeout
=total_timeout
,
537 vca_id
=self
.vca_connection
._vca
_id
,
539 except Exception as e
:
542 await self
.disconnect_model(model
)
543 await self
.disconnect_controller(controller
)
546 "Machine provisioned {} in model {}".format(machine_id
, model_name
)
552 self
, uri
: str, model_name
: str, wait
: bool = True, timeout
: float = 3600
555 Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
557 :param: uri: Path or Charm Store uri in which the charm or bundle can be found
558 :param: model_name: Model name
559 :param: wait: Indicates whether to wait or not until all applications are active
560 :param: timeout: Time in seconds to wait until all applications are active
562 controller
= await self
.get_controller()
563 model
= await self
.get_model(controller
, model_name
)
565 await model
.deploy(uri
, trust
=True)
567 await JujuModelWatcher
.wait_for_model(model
, timeout
=timeout
)
568 self
.log
.debug("All units active in model {}".format(model_name
))
570 await self
.disconnect_model(model
)
571 await self
.disconnect_controller(controller
)
575 application_name
: str,
578 db_dict
: dict = None,
579 progress_timeout
: float = None,
580 total_timeout
: float = None,
584 :param: application_name: Application name
585 :param: model_name: Model name
586 :param: machine_id Machine id
587 :param: db_dict: Dictionary with data of the DB to write the updates
588 :param: progress_timeout: Maximum time between two updates in the model
589 :param: total_timeout: Timeout for the entity to be active
595 controller
= await self
.get_controller()
597 model
= await self
.get_model(controller
, model_name
)
598 application
= self
._get
_application
(model
, application_name
)
600 if application
is not None:
601 # Checks if the given machine id in the model,
602 # otherwise function raises an error
603 _machine
, _series
= self
._get
_machine
_info
(model
, machine_id
)
606 "Adding unit (machine {}) to application {} in model ~{}".format(
607 machine_id
, application_name
, model_name
611 await application
.add_unit(to
=machine_id
)
613 await JujuModelWatcher
.wait_for(
616 progress_timeout
=progress_timeout
,
617 total_timeout
=total_timeout
,
620 vca_id
=self
.vca_connection
._vca
_id
,
623 "Unit is added to application {} in model {}".format(
624 application_name
, model_name
628 raise JujuApplicationNotFound(
629 "Application {} not exists".format(application_name
)
633 await self
.disconnect_model(model
)
634 await self
.disconnect_controller(controller
)
636 async def destroy_unit(
638 application_name
: str,
641 total_timeout
: float = None,
645 :param: application_name: Application name
646 :param: model_name: Model name
647 :param: machine_id Machine id
648 :param: total_timeout: Timeout for the entity to be active
654 controller
= await self
.get_controller()
656 model
= await self
.get_model(controller
, model_name
)
657 application
= self
._get
_application
(model
, application_name
)
659 if application
is None:
660 raise JujuApplicationNotFound(
661 "Application not found: {} (model={})".format(
662 application_name
, model_name
666 unit
= self
._get
_unit
(application
, machine_id
)
669 "A unit with machine id {} not in available units".format(
674 unit_name
= unit
.name
677 "Destroying unit {} from application {} in model {}".format(
678 unit_name
, application_name
, model_name
681 await application
.destroy_unit(unit_name
)
684 "Waiting for unit {} to be destroyed in application {} (model={})...".format(
685 unit_name
, application_name
, model_name
689 # TODO: Add functionality in the Juju watcher to replace this kind of blocks
690 if total_timeout
is None:
692 end
= time
.time() + total_timeout
693 while time
.time() < end
:
694 if not self
._get
_unit
(application
, machine_id
):
696 "The unit {} was destroyed in application {} (model={}) ".format(
697 unit_name
, application_name
, model_name
701 await asyncio
.sleep(5)
703 "Unit {} is destroyed from application {} in model {}".format(
704 unit_name
, application_name
, model_name
709 await self
.disconnect_model(model
)
710 await self
.disconnect_controller(controller
)
712 async def deploy_charm(
714 application_name
: str,
718 db_dict
: dict = None,
719 progress_timeout
: float = None,
720 total_timeout
: float = None,
727 :param: application_name: Application name
728 :param: path: Local path to the charm
729 :param: model_name: Model name
730 :param: machine_id ID of the machine
731 :param: db_dict: Dictionary with data of the DB to write the updates
732 :param: progress_timeout: Maximum time between two updates in the model
733 :param: total_timeout: Timeout for the entity to be active
734 :param: config: Config for the charm
735 :param: series: Series of the charm
736 :param: num_units: Number of units
738 :return: (juju.application.Application): Juju application
741 "Deploying charm {} to machine {} in model ~{}".format(
742 application_name
, machine_id
, model_name
745 self
.log
.debug("charm: {}".format(path
))
748 controller
= await self
.get_controller()
751 model
= await self
.get_model(controller
, model_name
)
754 if application_name
not in model
.applications
:
755 if machine_id
is not None:
756 machine
, series
= self
._get
_machine
_info
(model
, machine_id
)
758 application
= await model
.deploy(
760 application_name
=application_name
,
769 "Wait until application {} is ready in model {}".format(
770 application_name
, model_name
774 for _
in range(num_units
- 1):
775 m
, _
= await self
.create_machine(model_name
, wait
=False)
776 await application
.add_unit(to
=m
.entity_id
)
778 await JujuModelWatcher
.wait_for(
781 progress_timeout
=progress_timeout
,
782 total_timeout
=total_timeout
,
785 vca_id
=self
.vca_connection
._vca
_id
,
788 "Application {} is ready in model {}".format(
789 application_name
, model_name
793 raise JujuApplicationExists(
794 "Application {} exists".format(application_name
)
796 except juju
.errors
.JujuError
as e
:
797 if "already exists" in e
.message
:
798 raise JujuApplicationExists(
799 "Application {} exists".format(application_name
)
804 await self
.disconnect_model(model
)
805 await self
.disconnect_controller(controller
)
809 async def upgrade_charm(
811 application_name
: str,
814 total_timeout
: float = None,
819 :param: application_name: Application name
820 :param: model_name: Model name
821 :param: path: Local path to the charm
822 :param: total_timeout: Timeout for the entity to be active
824 :return: (str, str): (output and status)
828 "Upgrading charm {} in model {} from path {}".format(
829 application_name
, model_name
, path
833 await self
.resolve_application(
834 model_name
=model_name
, application_name
=application_name
838 controller
= await self
.get_controller()
841 model
= await self
.get_model(controller
, model_name
)
845 application
= self
._get
_application
(
847 application_name
=application_name
,
849 if application
is None:
850 raise JujuApplicationNotFound(
851 "Cannot find application {} to upgrade".format(application_name
)
854 await application
.refresh(path
=path
)
857 "Wait until charm upgrade is completed for application {} (model={})".format(
858 application_name
, model_name
862 await JujuModelWatcher
.ensure_units_idle(
863 model
=model
, application
=application
866 if application
.status
== "error":
867 error_message
= "Unknown"
868 for unit
in application
.units
:
870 unit
.workload_status
== "error"
871 and unit
.workload_status_message
!= ""
873 error_message
= unit
.workload_status_message
875 message
= "Application {} failed update in {}: {}".format(
876 application_name
, model_name
, error_message
878 self
.log
.error(message
)
879 raise JujuError(message
=message
)
882 "Application {} is ready in model {}".format(
883 application_name
, model_name
888 await self
.disconnect_model(model
)
889 await self
.disconnect_controller(controller
)
893 async def resolve_application(self
, model_name
: str, application_name
: str):
894 controller
= await self
.get_controller()
895 model
= await self
.get_model(controller
, model_name
)
898 application
= self
._get
_application
(
900 application_name
=application_name
,
902 if application
is None:
903 raise JujuApplicationNotFound(
904 "Cannot find application {} to resolve".format(application_name
)
907 while application
.status
== "error":
908 for unit
in application
.units
:
909 if unit
.workload_status
== "error":
911 "Model {}, Application {}, Unit {} in error state, resolving".format(
912 model_name
, application_name
, unit
.entity_id
916 await unit
.resolved(retry
=False)
920 await asyncio
.sleep(1)
923 await self
.disconnect_model(model
)
924 await self
.disconnect_controller(controller
)
926 async def resolve(self
, model_name
: str):
927 controller
= await self
.get_controller()
928 model
= await self
.get_model(controller
, model_name
)
929 all_units_active
= False
931 while not all_units_active
:
932 all_units_active
= True
933 for application_name
, application
in model
.applications
.items():
934 if application
.status
== "error":
935 for unit
in application
.units
:
936 if unit
.workload_status
== "error":
938 "Model {}, Application {}, Unit {} in error state, resolving".format(
939 model_name
, application_name
, unit
.entity_id
943 await unit
.resolved(retry
=False)
944 all_units_active
= False
948 if not all_units_active
:
949 await asyncio
.sleep(5)
951 await self
.disconnect_model(model
)
952 await self
.disconnect_controller(controller
)
954 async def scale_application(
957 application_name
: str,
959 total_timeout
: float = None,
962 Scale application (K8s)
964 :param: model_name: Model name
965 :param: application_name: Application name
966 :param: scale: Scale to which to set this application
967 :param: total_timeout: Timeout for the entity to be active
971 controller
= await self
.get_controller()
973 model
= await self
.get_model(controller
, model_name
)
976 "Scaling application {} in model {}".format(
977 application_name
, model_name
980 application
= self
._get
_application
(model
, application_name
)
981 if application
is None:
982 raise JujuApplicationNotFound("Cannot scale application")
983 await application
.scale(scale
=scale
)
984 # Wait until application is scaled in model
986 "Waiting for application {} to be scaled in model {}...".format(
987 application_name
, model_name
990 if total_timeout
is None:
992 end
= time
.time() + total_timeout
993 while time
.time() < end
:
994 application_scale
= self
._get
_application
_count
(model
, application_name
)
995 # Before calling wait_for_model function,
996 # wait until application unit count and scale count are equal.
997 # Because there is a delay before scaling triggers in Juju model.
998 if application_scale
== scale
:
999 await JujuModelWatcher
.wait_for_model(
1000 model
=model
, timeout
=total_timeout
1003 "Application {} is scaled in model {}".format(
1004 application_name
, model_name
1008 await asyncio
.sleep(5)
1010 "Timeout waiting for application {} in model {} to be scaled".format(
1011 application_name
, model_name
1016 await self
.disconnect_model(model
)
1017 await self
.disconnect_controller(controller
)
def _get_application_count(self, model: Model, application_name: str) -> int:
    """Count the units of an application

    :param: model: Model object
    :param: application_name: Application name

    :return: int (or None if application doesn't exist)
    """
    application = self._get_application(model, application_name)
    if application is None:
        return None
    return len(application.units)
def _get_application(self, model: Model, application_name: str) -> Application:
    """Look up an application of a model by name

    :param: model: Model object
    :param: application_name: Application name

    :return: juju.application.Application (or None if it doesn't exist)
    """
    # Guard clauses: no applications at all, or no application by that name
    if not model.applications:
        return None
    if application_name not in model.applications:
        return None
    return model.applications[application_name]
1042 def _get_unit(self
, application
: Application
, machine_id
: str) -> Unit
:
1045 :param: application: Application object
1046 :param: machine_id: Machine id
1051 for u
in application
.units
:
1052 if u
.machine_id
== machine_id
:
1057 def _get_machine_info(
1064 :param: model: Model object
1065 :param: machine_id: Machine id
1067 :return: (str, str): (machine, series)
1069 if machine_id
not in model
.machines
:
1070 msg
= "Machine {} not found in model".format(machine_id
)
1071 self
.log
.error(msg
=msg
)
1072 raise JujuMachineNotFound(msg
)
1073 machine
= model
.machines
[machine_id
]
1074 return machine
, machine
.series
1076 async def execute_action(
1078 application_name
: str,
1081 db_dict
: dict = None,
1082 machine_id
: str = None,
1083 progress_timeout
: float = None,
1084 total_timeout
: float = None,
1089 :param: application_name: Application name
1090 :param: model_name: Model name
1091 :param: action_name: Name of the action
1092 :param: db_dict: Dictionary with data of the DB to write the updates
1093 :param: machine_id Machine id
1094 :param: progress_timeout: Maximum time between two updates in the model
1095 :param: total_timeout: Timeout for the entity to be active
1097 :return: (str, str): (output and status)
1100 "Executing action {} using params {}".format(action_name
, kwargs
)
1103 controller
= await self
.get_controller()
1106 model
= await self
.get_model(controller
, model_name
)
1110 application
= self
._get
_application
(
1112 application_name
=application_name
,
1114 if application
is None:
1115 raise JujuApplicationNotFound("Cannot execute action")
# Occasionally, self._get_leader_unit() will return None
1118 # because the leader elected hook has not been triggered yet.
1119 # Therefore, we are doing some retries. If it happens again,
1121 if machine_id
is None:
1122 unit
= await self
._get
_leader
_unit
(application
)
1124 "Action {} is being executed on the leader unit {}".format(
1125 action_name
, unit
.name
1129 unit
= self
._get
_unit
(application
, machine_id
)
1132 "A unit with machine id {} not in available units".format(
1137 "Action {} is being executed on {} unit".format(
1138 action_name
, unit
.name
1142 actions
= await application
.get_actions()
1144 if action_name
not in actions
:
1145 raise JujuActionNotFound(
1146 "Action {} not in available actions".format(action_name
)
1149 action
= await unit
.run_action(action_name
, **kwargs
)
1152 "Wait until action {} is completed in application {} (model={})".format(
1153 action_name
, application_name
, model_name
1156 await JujuModelWatcher
.wait_for(
1159 progress_timeout
=progress_timeout
,
1160 total_timeout
=total_timeout
,
1163 vca_id
=self
.vca_connection
._vca
_id
,
1166 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
1167 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
1169 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
1173 "Action {} completed with status {} in application {} (model={})".format(
1174 action_name
, action
.status
, application_name
, model_name
1178 await self
.disconnect_model(model
)
1179 await self
.disconnect_controller(controller
)
1181 return output
, status
1183 async def get_actions(self
, application_name
: str, model_name
: str) -> dict:
1184 """Get list of actions
1186 :param: application_name: Application name
1187 :param: model_name: Model name
1189 :return: Dict with this format
1191 "action_name": "Description of the action",
1196 "Getting list of actions for application {}".format(application_name
)
1200 controller
= await self
.get_controller()
1203 model
= await self
.get_model(controller
, model_name
)
1207 application
= self
._get
_application
(
1209 application_name
=application_name
,
1212 # Return list of actions
1213 return await application
.get_actions()
1216 # Disconnect from model and controller
1217 await self
.disconnect_model(model
)
1218 await self
.disconnect_controller(controller
)
async def get_metrics(self, model_name: str, application_name: str) -> dict:
    """Get the metrics collected by the VCA.

    :param model_name The name or unique id of the network service
    :param application_name The name of the application

    :return: The result of ``application.get_metrics()``, or an empty dict
             when the application is not found in the model
    :raises Exception: if model_name or application_name is empty
    """
    if not model_name or not application_name:
        raise Exception("model_name and application_name must be non-empty strings")
    metrics = {}
    controller = await self.get_controller()
    model = None
    try:
        # Fetch the model inside the try block so the controller is released
        # even when get_model() fails.
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)
        if application is not None:
            metrics = await application.get_metrics()
    finally:
        # disconnect_model/disconnect_controller are coroutines: without
        # "await" they never run and the connections stay open.
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return metrics
1240 async def add_relation(
1248 :param: model_name: Model name
1249 :param: endpoint_1 First endpoint name
1250 ("app:endpoint" format or directly the saas name)
1251 :param: endpoint_2: Second endpoint name (^ same format)
1254 self
.log
.debug("Adding relation: {} -> {}".format(endpoint_1
, endpoint_2
))
1257 controller
= await self
.get_controller()
1260 model
= await self
.get_model(controller
, model_name
)
1264 await model
.add_relation(endpoint_1
, endpoint_2
)
1265 except juju
.errors
.JujuAPIError
as e
:
1266 if "not found" in e
.message
:
1267 self
.log
.warning("Relation not found: {}".format(e
.message
))
1269 if "already exists" in e
.message
:
1270 self
.log
.warning("Relation already exists: {}".format(e
.message
))
1272 # another exception, raise it
1275 await self
.disconnect_model(model
)
1276 await self
.disconnect_controller(controller
)
async def offer(self, endpoint: RelationEndpoint) -> Offer:
    """
    Create an offer from a RelationEndpoint

    :param: endpoint: Relation endpoint

    :return: Offer object
    :raises Exception: if the offer cannot be listed after being created
    """
    model_name = endpoint.model_name
    offer_name = f"{endpoint.application_name}-{endpoint.endpoint_name}"
    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        await model.create_offer(endpoint.endpoint, offer_name=offer_name)
        offer_list = await self._list_offers(model_name, offer_name=offer_name)
        if offer_list:
            return Offer(offer_list[0].offer_url)
        raise Exception("offer was not created")
    except juju.errors.JujuError as e:
        # An already existing offer is tolerated; anything else propagates.
        # NOTE(review): in the tolerated case the function falls through and
        # returns None - confirm callers expect that.
        if "application offer already exists" not in e.message:
            raise
    finally:
        # disconnect_model/disconnect_controller are coroutines: without
        # "await" they never run and the connections stay open.
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1310 provider_libjuju
: "Libjuju",
1313 Consumes a remote offer in the model. Relations can be created later using "juju relate".
1315 :param: model_name: Model name
1316 :param: offer: Offer object to consume
1317 :param: provider_libjuju: Libjuju object of the provider endpoint
1319 :raises ParseError if there's a problem parsing the offer_url
:raises JujuError if remote offer includes an endpoint
1321 :raises JujuAPIError if the operation is not successful
1323 :returns: Saas name. It is the application name in the model that reference the remote application.
1325 saas_name
= f
'{offer.name}-{offer.model_name.replace("-", "")}'
1327 saas_name
= f
"{saas_name}-{offer.vca_id}"
1328 controller
= await self
.get_controller()
1330 provider_controller
= None
1332 model
= await controller
.get_model(model_name
)
1333 provider_controller
= await provider_libjuju
.get_controller()
1334 await model
.consume(
1335 offer
.url
, application_alias
=saas_name
, controller
=provider_controller
1340 await self
.disconnect_model(model
)
1341 if provider_controller
:
1342 await provider_libjuju
.disconnect_controller(provider_controller
)
1343 await self
.disconnect_controller(controller
)
async def destroy_model(self, model_name: str, total_timeout: float = 1800):
    """
    Destroy model

    Pending, manually provisioned machines are destroyed first, then the
    model itself is removed within ``total_timeout`` seconds. A model that
    does not exist (before or after a failure) is only logged as a warning.

    :param: model_name: Model name
    :param: total_timeout: Timeout

    :raises Exception: whatever made the removal fail (including
                       asyncio.TimeoutError) if the model still exists
    """
    controller = await self.get_controller()
    model = None
    try:
        if not await self.model_exists(model_name, controller=controller):
            self.log.warning(f"Model {model_name} doesn't exist")
            return

        self.log.debug(f"Getting model {model_name} to be destroyed")
        model = await self.get_model(controller, model_name)
        self.log.debug(f"Destroying manual machines in model {model_name}")
        # Destroy machines that are manually provisioned
        # and still are in pending state
        await self._destroy_pending_machines(model, only_manual=True)
        # The model connection is closed before the model is destroyed;
        # forget it so the cleanup below does not disconnect it twice.
        await self.disconnect_model(model)
        model = None

        await asyncio.wait_for(
            self._destroy_model(model_name, controller),
            timeout=total_timeout,
        )
    except Exception as e:
        if not await self.model_exists(model_name, controller=controller):
            self.log.warning(
                f"Failed deleting model {model_name}: model doesn't exist"
            )
            return
        self.log.warning(f"Failed deleting model {model_name}: {e}")
        raise
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
async def _destroy_model(
    self,
    model_name: str,
    controller: Controller,
):
    """
    Destroy model from controller

    A graceful removal is attempted for up to 120 seconds; if it times out
    the model is destroyed forcefully. Juju errors saying that the model
    "has been removed" or "model not found" are treated as success.

    :param: model_name: Model name to be removed
    :param: controller: Controller object
    """
    self.log.debug(f"Destroying model {model_name}")

    async def _graceful():
        self.log.info(f"Gracefully deleting model {model_name}")
        resolved = False
        while model_name in await controller.list_models():
            # Units in error are resolved once, before the first attempt
            if not resolved:
                await self.resolve(model_name)
                resolved = True
            await controller.destroy_model(model_name, destroy_storage=True)
            await asyncio.sleep(5)
        self.log.info(f"Model {model_name} deleted gracefully")

    async def _forced():
        self.log.info(f"Forcefully deleting model {model_name}")
        while model_name in await controller.list_models():
            await controller.destroy_model(
                model_name, destroy_storage=True, force=True, max_wait=60
            )
            await asyncio.sleep(5)
        self.log.info(f"Model {model_name} deleted forcefully")

    try:
        try:
            await asyncio.wait_for(_graceful(), timeout=120)
        except asyncio.TimeoutError:
            await _forced()
    except juju.errors.JujuError as e:
        already_gone = any(
            "has been removed" in error or "model not found" in error
            for error in e.errors
        )
        if not already_gone:
            raise
async def destroy_application(
    self, model_name: str, application_name: str, total_timeout: float
):
    """
    Destroy application and wait until it is gone from the model

    :param: model_name: Model name
    :param: application_name: Application name
    :param: total_timeout: Timeout in seconds (3600 is used when None)

    :raises Exception: if the application is still present after the timeout
    """
    controller = await self.get_controller()
    model = None

    try:
        model = await self.get_model(controller, model_name)
        self.log.debug(
            "Destroying application {} in model {}".format(
                application_name, model_name
            )
        )
        application = self._get_application(model, application_name)
        if not application:
            self.log.warning("Application not found: {}".format(application_name))
        else:
            await application.destroy()

        self.log.debug(
            "Waiting for application {} to be destroyed in model {}...".format(
                application_name, model_name
            )
        )
        timeout = 3600 if total_timeout is None else total_timeout
        deadline = time.time() + timeout
        # Poll every 5 seconds until the application is no longer found
        while time.time() < deadline:
            if not self._get_application(model, application_name):
                self.log.debug(
                    "The application {} was destroyed in model {} ".format(
                        application_name, model_name
                    )
                )
                return
            await asyncio.sleep(5)
        raise Exception(
            "Timeout waiting for application {} to be destroyed in model {}".format(
                application_name, model_name
            )
        )
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
    """
    Destroy pending machines in a given model

    :param: model: Model whose machines are inspected
    :param: only_manual: Bool that indicates only manually provisioned
                         machines should be destroyed (if True), or that
                         all pending machines should be destroyed
    """
    status = await model.get_status()
    for machine_id, machine_status in status.machines.items():
        if machine_status.agent_status.status != "pending":
            continue
        if only_manual and not machine_status.instance_id.startswith("manual:"):
            # Skip this machine only: later machines in the status must
            # still be examined, so the loop must not stop here.
            continue
        machine = model.machines[machine_id]
        await machine.destroy(force=True)
async def configure_application(
    self, model_name: str, application_name: str, config: dict = None
):
    """Configure application

    Nothing is sent to the controller when ``config`` is empty or None.

    :param: model_name: Model name
    :param: application_name: Application name
    :param: config: Config to apply to the charm
    """
    self.log.debug("Configuring application {}".format(application_name))

    if not config:
        return

    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(
            model,
            application_name=application_name,
        )
        await application.set_config(config)
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
def handle_exception(self, loop, context):
    """
    Exception handler installed on the event loop.

    All unhandled exceptions by libjuju are handled here; they are
    deliberately ignored.

    :param: loop: Asyncio loop that reported the exception
    :param: context: Context passed by the loop
    """
async def health_check(self, interval: float = 300.0):
    """
    Health check to make sure controller and controller_model connections are OK

    Runs forever: every ``interval`` seconds a controller connection is
    opened and closed again; failures are logged and do not stop the loop.

    :param: interval: Time in seconds between checks
    """
    while True:
        # Reset on every round so a failed connection attempt never makes
        # the cleanup disconnect the controller of a previous iteration.
        controller = None
        try:
            controller = await self.get_controller()
            # self.log.debug("VCA is alive")
        except Exception as e:
            self.log.error("Health check to VCA failed: {}".format(e))
        finally:
            if controller is not None:
                await self.disconnect_controller(controller)
        await asyncio.sleep(interval)
async def list_models(self, contains: str = None) -> [str]:
    """List models with certain names

    :param: contains: String that is contained in model name.
                      When empty or None, every model is returned.

    :return: [models] Returns list of model names
    """
    controller = await self.get_controller()
    try:
        names = await controller.list_models()
        if not contains:
            return names
        return [name for name in names if contains in name]
    finally:
        await self.disconnect_controller(controller)
async def _list_offers(
    self, model_name: str, offer_name: str = None
) -> QueryApplicationOffersResults:
    """
    List offers within a model

    :param: model_name: Model name
    :param: offer_name: Offer name to filter. When given, the result holds
                        at most the first offer with that exact name.

    :return: Returns application offers results in the model
    """
    controller = await self.get_controller()
    try:
        offers = (await controller.list_offers(model_name)).results
        if not offer_name:
            return offers
        first_match = next(
            (offer for offer in offers if offer.offer_name == offer_name), None
        )
        return [] if first_match is None else [first_match]
    finally:
        await self.disconnect_controller(controller)
async def add_k8s(
    self,
    name: str,
    rbac_id: str,
    token: str,
    client_cert_data: str,
    configuration: Configuration,
    storage_class: str,
    credential_name: str = None,
):
    """
    Add a Kubernetes cloud to the controller

    Similar to the `juju add-k8s` command in the CLI

    :param: name: Name for the K8s cloud
    :param: rbac_id: Value stored in the credential under the RBAC label key
    :param: token: Token handed to get_k8s_cloud_credential()
    :param: client_cert_data: Certificate data, used both for the credential
                              and as CA certificate of the cloud
    :param: configuration: Kubernetes configuration object
    :param: storage_class: Storage Class to use in the cloud
    :param: credential_name: Name for the credential, forwarded to add_cloud()

    :raises Exception: if storage_class, name or configuration is empty
    """
    # Validate required arguments in the same order as before
    for is_valid, message in (
        (storage_class, "storage_class must be a non-empty string"),
        (name, "name must be a non-empty string"),
        (configuration, "configuration must be provided"),
    ):
        if not is_valid:
            raise Exception(message)

    endpoint = configuration.host
    credential = self.get_k8s_cloud_credential(
        configuration,
        client_cert_data,
        token,
    )
    credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id

    storage_config = {
        "operator-storage": storage_class,
        "workload-storage": storage_class,
    }
    cloud = client.Cloud(
        type_="kubernetes",
        auth_types=[credential.auth_type],
        endpoint=endpoint,
        ca_certificates=[client_cert_data],
        config=storage_config,
    )

    return await self.add_cloud(
        name, cloud, credential, credential_name=credential_name
    )
def get_k8s_cloud_credential(
    self,
    configuration: Configuration,
    client_cert_data: str,
    token: str = None,
) -> client.CloudCredential:
    """
    Build the Juju cloud credential for a Kubernetes cluster

    :param: configuration: Kubernetes configuration object
                           (username and password are read from it)
    :param: client_cert_data: Client certificate data
    :param: token: Token; cannot be combined with user/password

    :return: CloudCredential with the selected auth type and its attributes

    :raises JujuInvalidK8sConfiguration: if the combination of inputs does
                                         not map to a supported auth type
    """
    attrs = {}
    # TODO: Test with AKS
    key = None  # open(configuration.key_file, "r").read()
    username = configuration.username
    password = configuration.password

    # Collect the attributes that are present
    if client_cert_data:
        attrs["ClientCertificateData"] = client_cert_data
    if key:
        attrs["ClientKeyData"] = key
    if token:
        if username or password:
            raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
        attrs["Token"] = token

    # Pick the auth type from what was provided
    if key:
        auth_type = "oauth2withcert" if client_cert_data else "oauth2"
        if not token:
            raise JujuInvalidK8sConfiguration(
                "missing token for auth type {}".format(auth_type)
            )
    elif username:
        if not password:
            self.log.debug(
                "credential for user {} has empty password".format(username)
            )
        attrs["username"] = username
        attrs["password"] = password
        auth_type = "userpasswithcert" if client_cert_data else "userpass"
    elif client_cert_data and token:
        auth_type = "certificate"
    else:
        raise JujuInvalidK8sConfiguration("authentication method not supported")
    return client.CloudCredential(auth_type=auth_type, attrs=attrs)
async def add_cloud(
    self,
    name: str,
    cloud: Cloud,
    credential: CloudCredential = None,
    credential_name: str = None,
) -> Cloud:
    """
    Add cloud to the controller

    :param: name: Name of the cloud to be added
    :param: cloud: Cloud object
    :param: credential: CloudCredentials object for the cloud
    :param: credential_name: Credential name.
                             If not defined, the name of the cloud will be used.

    :return: The same Cloud object that was passed in
    """
    controller = await self.get_controller()
    try:
        await controller.add_cloud(name, cloud)
        if credential:
            cred_name = credential_name or name
            await controller.add_credential(
                cred_name, credential=credential, cloud=name
            )
        # Need to return the object returned by the controller.add_cloud() function
        # I'm returning the original value now until this bug is fixed:
        #   https://github.com/juju/python-libjuju/issues/443
        return cloud
    finally:
        await self.disconnect_controller(controller)
async def remove_cloud(self, name: str):
    """
    Remove cloud

    A cloud that the controller reports as not found is only logged.

    :param: name: Name of the cloud to be removed

    :raises juju.errors.JujuError: any other error reported by Juju
    """
    controller = await self.get_controller()
    try:
        await controller.remove_cloud(name)
    except juju.errors.JujuError as e:
        not_found = len(e.errors) == 1 and f'cloud "{name}" not found' == e.errors[0]
        if not not_found:
            raise
        self.log.warning(f"Cloud {name} not found, so it could not be deleted.")
    finally:
        await self.disconnect_controller(controller)
@retry(attempts=20, delay=5, fallback=JujuLeaderUnitNotFound())
async def _get_leader_unit(self, application: Application) -> Unit:
    """
    Return the leader unit of an application

    Raises when no unit reports itself as leader; the ``retry`` decorator
    is configured with 20 attempts, a delay of 5 and a
    JujuLeaderUnitNotFound fallback.

    :param: application: Application whose units are inspected

    :return: The first unit whose status says it is the leader
    """
    for candidate in application.units:
        if await candidate.is_leader_from_status():
            return candidate
    raise Exception()
async def get_cloud_credentials(self, cloud: Cloud) -> typing.List:
    """
    Get cloud credentials

    :param: cloud: Cloud object. The returned credentials will be from this cloud.

    :return: List of credentials object associated to the specified cloud
    """
    controller = await self.get_controller()
    try:
        # The credential tag is built from cloud name, VCA user and credential name
        credential_tag = tag.credential(
            cloud.name, self.vca_connection.data.user, cloud.credential_name
        )
        cloud_facade = client.CloudFacade.from_connection(controller.connection())
        response = await cloud_facade.Credential([client.Entity(credential_tag)])
        return response.results
    finally:
        await self.disconnect_controller(controller)
async def check_application_exists(self, model_name, application_name) -> bool:
    """Check application exists

    :param: model_name: Model Name
    :param: application_name: Application Name

    :return: True if the application is found in the model, False otherwise
    """
    model = None
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        self.log.debug(
            "Checking if application {} exists in model {}".format(
                application_name, model_name
            )
        )
        application = self._get_application(model, application_name)
        return application is not None
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)