1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from juju
.controller
import Controller
18 from juju
.client
import client
21 from juju
.errors
import JujuAPIError
22 from juju
.model
import Model
23 from juju
.machine
import Machine
24 from juju
.application
import Application
25 from juju
.unit
import Unit
26 from juju
.client
._definitions
import (
28 QueryApplicationOffersResults
,
32 from n2vc
.juju_watcher
import JujuModelWatcher
33 from n2vc
.provisioner
import AsyncSSHProvisioner
34 from n2vc
.n2vc_conn
import N2VCConnector
35 from n2vc
.exceptions
import (
37 JujuApplicationNotFound
,
38 JujuLeaderUnitNotFound
,
40 JujuModelAlreadyExists
,
41 JujuControllerFailedConnecting
,
42 JujuApplicationExists
,
43 JujuInvalidK8sConfiguration
,
45 from n2vc
.utils
import DB_DATA
46 from osm_common
.dbbase
import DbException
47 from kubernetes
.client
.configuration
import Configuration
58 loop
: asyncio
.AbstractEventLoop
= None,
59 log
: logging
.Logger
= None,
61 n2vc
: N2VCConnector
= None,
62 apt_mirror
: str = None,
63 enable_os_upgrade
: bool = True,
68 :param: endpoint: Endpoint of the juju controller (host:port)
69 :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
70 :param: username: Juju username
71 :param: password: Juju password
72 :param: cacert: Juju CA Certificate
73 :param: loop: Asyncio loop
76 :param: n2vc: N2VC object
77 :param: apt_mirror: APT Mirror
78 :param: enable_os_upgrade: Enable OS Upgrade
81 self
.log
= log
or logging
.getLogger("Libjuju")
83 db_endpoints
= self
._get
_api
_endpoints
_db
()
84 self
.endpoints
= db_endpoints
or [endpoint
]
85 if db_endpoints
is None:
86 self
._update
_api
_endpoints
_db
(self
.endpoints
)
87 self
.api_proxy
= api_proxy
88 self
.username
= username
89 self
.password
= password
91 self
.loop
= loop
or asyncio
.get_event_loop()
94 # Generate config for models
95 self
.model_config
= {}
97 self
.model_config
["apt-mirror"] = apt_mirror
98 self
.model_config
["enable-os-refresh-update"] = enable_os_upgrade
99 self
.model_config
["enable-os-upgrade"] = enable_os_upgrade
101 self
.loop
.set_exception_handler(self
.handle_exception
)
102 self
.creating_model
= asyncio
.Lock(loop
=self
.loop
)
105 self
.log
.debug("Libjuju initialized!")
107 self
.health_check_task
= self
.loop
.create_task(self
.health_check())
109 async def get_controller(self
, timeout
: float = 5.0) -> Controller
:
113 :param: timeout: Time in seconds to wait for controller to connect
117 controller
= Controller(loop
=self
.loop
)
118 await asyncio
.wait_for(
120 endpoint
=self
.endpoints
,
121 username
=self
.username
,
122 password
=self
.password
,
127 endpoints
= await controller
.api_endpoints
128 if self
.endpoints
!= endpoints
:
129 self
.endpoints
= endpoints
130 self
._update
_api
_endpoints
_db
(self
.endpoints
)
132 except asyncio
.CancelledError
as e
:
134 except Exception as e
:
136 "Failed connecting to controller: {}...".format(self
.endpoints
)
139 await self
.disconnect_controller(controller
)
140 raise JujuControllerFailedConnecting(e
)
142 async def disconnect(self
):
144 # Cancel health check task
145 self
.health_check_task
.cancel()
146 self
.log
.debug("Libjuju disconnected!")
148 async def disconnect_model(self
, model
: Model
):
152 :param: model: Model that will be disconnected
154 await model
.disconnect()
156 async def disconnect_controller(self
, controller
: Controller
):
158 Disconnect controller
160 :param: controller: Controller that will be disconnected
162 await controller
.disconnect()
164 async def add_model(self
, model_name
: str, cloud_name
: str, credential_name
=None):
168 :param: model_name: Model name
169 :param: cloud_name: Cloud name
170 :param: credential_name: Credential name to use for adding the model
171 If not specified, same name as the cloud will be used.
175 controller
= await self
.get_controller()
178 # Raise exception if model already exists
179 if await self
.model_exists(model_name
, controller
=controller
):
180 raise JujuModelAlreadyExists(
181 "Model {} already exists.".format(model_name
)
184 # Block until other workers have finished model creation
185 while self
.creating_model
.locked():
186 await asyncio
.sleep(0.1)
188 # If the model exists, return it from the controller
189 if model_name
in self
.models
:
193 async with self
.creating_model
:
194 self
.log
.debug("Creating model {}".format(model_name
))
195 model
= await controller
.add_model(
197 config
=self
.model_config
,
198 cloud_name
=cloud_name
,
199 credential_name
=credential_name
or cloud_name
,
201 self
.models
.add(model_name
)
204 await self
.disconnect_model(model
)
205 await self
.disconnect_controller(controller
)
208 self
, controller
: Controller
, model_name
: str, id=None
211 Get model from controller
213 :param: controller: Controller
214 :param: model_name: Model name
216 :return: Model: The created Juju model object
218 return await controller
.get_model(model_name
)
220 async def model_exists(
221 self
, model_name
: str, controller
: Controller
= None
224 Check if model exists
226 :param: controller: Controller
227 :param: model_name: Model name
231 need_to_disconnect
= False
233 # Get controller if not passed
235 controller
= await self
.get_controller()
236 need_to_disconnect
= True
238 # Check if model exists
240 return model_name
in await controller
.list_models()
242 if need_to_disconnect
:
243 await self
.disconnect_controller(controller
)
245 async def models_exist(self
, model_names
: [str]) -> (bool, list):
247 Check if models exists
249 :param: model_names: List of strings with model names
251 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
255 "model_names must be a non-empty array. Given value: {}".format(
259 non_existing_models
= []
260 models
= await self
.list_models()
261 existing_models
= list(set(models
).intersection(model_names
))
262 non_existing_models
= list(set(model_names
) - set(existing_models
))
265 len(non_existing_models
) == 0,
269 async def get_model_status(self
, model_name
: str) -> FullStatus
:
273 :param: model_name: Model name
275 :return: Full status object
277 controller
= await self
.get_controller()
278 model
= await self
.get_model(controller
, model_name
)
280 return await model
.get_status()
282 await self
.disconnect_model(model
)
283 await self
.disconnect_controller(controller
)
285 async def create_machine(
288 machine_id
: str = None,
289 db_dict
: dict = None,
290 progress_timeout
: float = None,
291 total_timeout
: float = None,
292 series
: str = "xenial",
294 ) -> (Machine
, bool):
298 :param: model_name: Model name
299 :param: machine_id: Machine id
300 :param: db_dict: Dictionary with data of the DB to write the updates
301 :param: progress_timeout: Maximum time between two updates in the model
302 :param: total_timeout: Timeout for the entity to be active
303 :param: series: Series of the machine (xenial, bionic, focal, ...)
304 :param: wait: Wait until machine is ready
306 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
307 if the machine is new or it already existed
313 "Creating machine (id={}) in model: {}".format(machine_id
, model_name
)
317 controller
= await self
.get_controller()
320 model
= await self
.get_model(controller
, model_name
)
322 if machine_id
is not None:
324 "Searching machine (id={}) in model {}".format(
325 machine_id
, model_name
329 # Get machines from model and get the machine with machine_id if exists
330 machines
= await model
.get_machines()
331 if machine_id
in machines
:
333 "Machine (id={}) found in model {}".format(
334 machine_id
, model_name
337 machine
= machines
[machine_id
]
339 raise JujuMachineNotFound("Machine {} not found".format(machine_id
))
342 self
.log
.debug("Creating a new machine in model {}".format(model_name
))
345 machine
= await model
.add_machine(
346 spec
=None, constraints
=None, disks
=None, series
=series
350 # Wait until the machine is ready
352 "Wait until machine {} is ready in model {}".format(
353 machine
.entity_id
, model_name
357 await JujuModelWatcher
.wait_for(
360 progress_timeout
=progress_timeout
,
361 total_timeout
=total_timeout
,
366 await self
.disconnect_model(model
)
367 await self
.disconnect_controller(controller
)
370 "Machine {} ready at {} in model {}".format(
371 machine
.entity_id
, machine
.dns_name
, model_name
376 async def provision_machine(
381 private_key_path
: str,
382 db_dict
: dict = None,
383 progress_timeout
: float = None,
384 total_timeout
: float = None,
387 Manually provisioning of a machine
389 :param: model_name: Model name
390 :param: hostname: IP to access the machine
391 :param: username: Username to login to the machine
392 :param: private_key_path: Local path for the private key
393 :param: db_dict: Dictionary with data of the DB to write the updates
394 :param: progress_timeout: Maximum time between two updates in the model
395 :param: total_timeout: Timeout for the entity to be active
397 :return: (Entity): Machine id
400 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
401 model_name
, hostname
, username
406 controller
= await self
.get_controller()
409 model
= await self
.get_model(controller
, model_name
)
413 provisioner
= AsyncSSHProvisioner(
416 private_key_path
=private_key_path
,
421 params
= await provisioner
.provision_machine()
423 params
.jobs
= ["JobHostUnits"]
425 self
.log
.debug("Adding machine to model")
426 connection
= model
.connection()
427 client_facade
= client
.ClientFacade
.from_connection(connection
)
429 results
= await client_facade
.AddMachines(params
=[params
])
430 error
= results
.machines
[0].error
433 msg
= "Error adding machine: {}".format(error
.message
)
434 self
.log
.error(msg
=msg
)
435 raise ValueError(msg
)
437 machine_id
= results
.machines
[0].machine
439 self
.log
.debug("Installing Juju agent into machine {}".format(machine_id
))
440 asyncio
.ensure_future(
441 provisioner
.install_agent(
442 connection
=connection
,
444 machine_id
=machine_id
,
445 proxy
=self
.api_proxy
,
451 machine_list
= await model
.get_machines()
452 if machine_id
in machine_list
:
453 self
.log
.debug("Machine {} found in model!".format(machine_id
))
454 machine
= model
.machines
.get(machine_id
)
456 await asyncio
.sleep(2)
459 msg
= "Machine {} not found in model".format(machine_id
)
460 self
.log
.error(msg
=msg
)
461 raise JujuMachineNotFound(msg
)
464 "Wait until machine {} is ready in model {}".format(
465 machine
.entity_id
, model_name
468 await JujuModelWatcher
.wait_for(
471 progress_timeout
=progress_timeout
,
472 total_timeout
=total_timeout
,
476 except Exception as e
:
479 await self
.disconnect_model(model
)
480 await self
.disconnect_controller(controller
)
483 "Machine provisioned {} in model {}".format(machine_id
, model_name
)
488 async def deploy_charm(
490 application_name
: str,
494 db_dict
: dict = None,
495 progress_timeout
: float = None,
496 total_timeout
: float = None,
503 :param: application_name: Application name
504 :param: path: Local path to the charm
505 :param: model_name: Model name
506 :param: machine_id ID of the machine
507 :param: db_dict: Dictionary with data of the DB to write the updates
508 :param: progress_timeout: Maximum time between two updates in the model
509 :param: total_timeout: Timeout for the entity to be active
510 :param: config: Config for the charm
511 :param: series: Series of the charm
512 :param: num_units: Number of units
514 :return: (juju.application.Application): Juju application
517 "Deploying charm {} to machine {} in model ~{}".format(
518 application_name
, machine_id
, model_name
521 self
.log
.debug("charm: {}".format(path
))
524 controller
= await self
.get_controller()
527 model
= await self
.get_model(controller
, model_name
)
531 if application_name
not in model
.applications
:
533 if machine_id
is not None:
534 if machine_id
not in model
.machines
:
535 msg
= "Machine {} not found in model".format(machine_id
)
536 self
.log
.error(msg
=msg
)
537 raise JujuMachineNotFound(msg
)
538 machine
= model
.machines
[machine_id
]
539 series
= machine
.series
541 application
= await model
.deploy(
543 application_name
=application_name
,
552 "Wait until application {} is ready in model {}".format(
553 application_name
, model_name
557 for _
in range(num_units
- 1):
558 m
, _
= await self
.create_machine(model_name
, wait
=False)
559 await application
.add_unit(to
=m
.entity_id
)
561 await JujuModelWatcher
.wait_for(
564 progress_timeout
=progress_timeout
,
565 total_timeout
=total_timeout
,
570 "Application {} is ready in model {}".format(
571 application_name
, model_name
575 raise JujuApplicationExists(
576 "Application {} exists".format(application_name
)
579 await self
.disconnect_model(model
)
580 await self
.disconnect_controller(controller
)
584 def _get_application(self
, model
: Model
, application_name
: str) -> Application
:
587 :param: model: Model object
588 :param: application_name: Application name
590 :return: juju.application.Application (or None if it doesn't exist)
592 if model
.applications
and application_name
in model
.applications
:
593 return model
.applications
[application_name
]
595 async def execute_action(
597 application_name
: str,
600 db_dict
: dict = None,
601 progress_timeout
: float = None,
602 total_timeout
: float = None,
607 :param: application_name: Application name
608 :param: model_name: Model name
609 :param: action_name: Name of the action
610 :param: db_dict: Dictionary with data of the DB to write the updates
611 :param: progress_timeout: Maximum time between two updates in the model
612 :param: total_timeout: Timeout for the entity to be active
614 :return: (str, str): (output and status)
617 "Executing action {} using params {}".format(action_name
, kwargs
)
620 controller
= await self
.get_controller()
623 model
= await self
.get_model(controller
, model_name
)
627 application
= self
._get
_application
(
628 model
, application_name
=application_name
,
630 if application
is None:
631 raise JujuApplicationNotFound("Cannot execute action")
635 # Ocassionally, self._get_leader_unit() will return None
636 # because the leader elected hook has not been triggered yet.
637 # Therefore, we are doing some retries. If it happens again,
640 time_between_retries
= 10
642 for _
in range(attempts
):
643 unit
= await self
._get
_leader
_unit
(application
)
645 await asyncio
.sleep(time_between_retries
)
649 raise JujuLeaderUnitNotFound(
650 "Cannot execute action: leader unit not found"
653 actions
= await application
.get_actions()
655 if action_name
not in actions
:
656 raise JujuActionNotFound(
657 "Action {} not in available actions".format(action_name
)
660 action
= await unit
.run_action(action_name
, **kwargs
)
663 "Wait until action {} is completed in application {} (model={})".format(
664 action_name
, application_name
, model_name
667 await JujuModelWatcher
.wait_for(
670 progress_timeout
=progress_timeout
,
671 total_timeout
=total_timeout
,
676 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
677 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
679 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
683 "Action {} completed with status {} in application {} (model={})".format(
684 action_name
, action
.status
, application_name
, model_name
688 await self
.disconnect_model(model
)
689 await self
.disconnect_controller(controller
)
691 return output
, status
693 async def get_actions(self
, application_name
: str, model_name
: str) -> dict:
694 """Get list of actions
696 :param: application_name: Application name
697 :param: model_name: Model name
699 :return: Dict with this format
701 "action_name": "Description of the action",
706 "Getting list of actions for application {}".format(application_name
)
710 controller
= await self
.get_controller()
713 model
= await self
.get_model(controller
, model_name
)
717 application
= self
._get
_application
(
718 model
, application_name
=application_name
,
721 # Return list of actions
722 return await application
.get_actions()
725 # Disconnect from model and controller
726 await self
.disconnect_model(model
)
727 await self
.disconnect_controller(controller
)
729 async def get_metrics(self
, model_name
: str, application_name
: str) -> dict:
730 """Get the metrics collected by the VCA.
732 :param model_name The name or unique id of the network service
733 :param application_name The name of the application
735 if not model_name
or not application_name
:
736 raise Exception("model_name and application_name must be non-empty strings")
738 controller
= await self
.get_controller()
739 model
= await self
.get_model(controller
, model_name
)
741 application
= self
._get
_application
(model
, application_name
)
742 if application
is not None:
743 metrics
= await application
.get_metrics()
745 self
.disconnect_model(model
)
746 self
.disconnect_controller(controller
)
749 async def add_relation(
750 self
, model_name
: str, endpoint_1
: str, endpoint_2
: str,
754 :param: model_name: Model name
755 :param: endpoint_1 First endpoint name
756 ("app:endpoint" format or directly the saas name)
757 :param: endpoint_2: Second endpoint name (^ same format)
760 self
.log
.debug("Adding relation: {} -> {}".format(endpoint_1
, endpoint_2
))
763 controller
= await self
.get_controller()
766 model
= await self
.get_model(controller
, model_name
)
770 await model
.add_relation(endpoint_1
, endpoint_2
)
771 except JujuAPIError
as e
:
772 if "not found" in e
.message
:
773 self
.log
.warning("Relation not found: {}".format(e
.message
))
775 if "already exists" in e
.message
:
776 self
.log
.warning("Relation already exists: {}".format(e
.message
))
778 # another exception, raise it
781 await self
.disconnect_model(model
)
782 await self
.disconnect_controller(controller
)
785 self
, offer_url
: str, model_name
: str,
788 Adds a remote offer to the model. Relations can be created later using "juju relate".
790 :param: offer_url: Offer Url
791 :param: model_name: Model name
793 :raises ParseError if there's a problem parsing the offer_url
794 :raises JujuError if remote offer includes and endpoint
795 :raises JujuAPIError if the operation is not successful
797 controller
= await self
.get_controller()
798 model
= await controller
.get_model(model_name
)
801 await model
.consume(offer_url
)
803 await self
.disconnect_model(model
)
804 await self
.disconnect_controller(controller
)
806 async def destroy_model(self
, model_name
: str, total_timeout
: float):
810 :param: model_name: Model name
811 :param: total_timeout: Timeout
814 controller
= await self
.get_controller()
815 model
= await self
.get_model(controller
, model_name
)
817 self
.log
.debug("Destroying model {}".format(model_name
))
818 uuid
= model
.info
.uuid
821 await self
.disconnect_model(model
)
824 if model_name
in self
.models
:
825 self
.models
.remove(model_name
)
827 await controller
.destroy_model(uuid
, force
=True, max_wait
=0)
829 # Wait until model is destroyed
830 self
.log
.debug("Waiting for model {} to be destroyed...".format(model_name
))
832 if total_timeout
is None:
834 end
= time
.time() + total_timeout
835 while time
.time() < end
:
836 models
= await controller
.list_models()
837 if model_name
not in models
:
839 "The model {} ({}) was destroyed".format(model_name
, uuid
)
842 await asyncio
.sleep(5)
844 "Timeout waiting for model {} to be destroyed".format(model_name
)
847 await self
.disconnect_controller(controller
)
849 async def destroy_application(self
, model
: Model
, application_name
: str):
853 :param: model: Model object
854 :param: application_name: Application name
857 "Destroying application {} in model {}".format(
858 application_name
, model
.info
.name
861 application
= model
.applications
.get(application_name
)
863 await application
.destroy()
865 self
.log
.warning("Application not found: {}".format(application_name
))
867 # async def destroy_machine(
868 # self, model: Model, machine_id: str, total_timeout: float = 3600
873 # :param: model: Model object
874 # :param: machine_id: Machine id
875 # :param: total_timeout: Timeout in seconds
877 # machines = await model.get_machines()
878 # if machine_id in machines:
879 # machine = machines[machine_id]
880 # await machine.destroy(force=True)
882 # end = time.time() + total_timeout
884 # # wait for machine removal
885 # machines = await model.get_machines()
886 # while machine_id in machines and time.time() < end:
887 # self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
888 # await asyncio.sleep(0.5)
889 # machines = await model.get_machines()
890 # self.log.debug("Machine destroyed: {}".format(machine_id))
892 # self.log.debug("Machine not found: {}".format(machine_id))
894 async def configure_application(
895 self
, model_name
: str, application_name
: str, config
: dict = None
897 """Configure application
899 :param: model_name: Model name
900 :param: application_name: Application name
901 :param: config: Config to apply to the charm
903 self
.log
.debug("Configuring application {}".format(application_name
))
907 controller
= await self
.get_controller()
908 model
= await self
.get_model(controller
, model_name
)
909 application
= self
._get
_application
(
910 model
, application_name
=application_name
,
912 await application
.set_config(config
)
914 await self
.disconnect_model(model
)
915 await self
.disconnect_controller(controller
)
917 def _get_api_endpoints_db(self
) -> [str]:
919 Get API Endpoints from DB
921 :return: List of API endpoints
923 self
.log
.debug("Getting endpoints from database")
925 juju_info
= self
.db
.get_one(
926 DB_DATA
.api_endpoints
.table
,
927 q_filter
=DB_DATA
.api_endpoints
.filter,
930 if juju_info
and DB_DATA
.api_endpoints
.key
in juju_info
:
931 return juju_info
[DB_DATA
.api_endpoints
.key
]
933 def _update_api_endpoints_db(self
, endpoints
: [str]):
935 Update API endpoints in Database
937 :param: List of endpoints
939 self
.log
.debug("Saving endpoints {} in database".format(endpoints
))
941 juju_info
= self
.db
.get_one(
942 DB_DATA
.api_endpoints
.table
,
943 q_filter
=DB_DATA
.api_endpoints
.filter,
946 # If it doesn't, then create it
950 DB_DATA
.api_endpoints
.table
, DB_DATA
.api_endpoints
.filter,
952 except DbException
as e
:
953 # Racing condition: check if another N2VC worker has created it
954 juju_info
= self
.db
.get_one(
955 DB_DATA
.api_endpoints
.table
,
956 q_filter
=DB_DATA
.api_endpoints
.filter,
962 DB_DATA
.api_endpoints
.table
,
963 DB_DATA
.api_endpoints
.filter,
964 {DB_DATA
.api_endpoints
.key
: endpoints
},
967 def handle_exception(self
, loop
, context
):
968 # All unhandled exceptions by libjuju are handled here.
971 async def health_check(self
, interval
: float = 300.0):
973 Health check to make sure controller and controller_model connections are OK
975 :param: interval: Time in seconds between checks
979 controller
= await self
.get_controller()
980 # self.log.debug("VCA is alive")
981 except Exception as e
:
982 self
.log
.error("Health check to VCA failed: {}".format(e
))
984 await self
.disconnect_controller(controller
)
985 await asyncio
.sleep(interval
)
987 async def list_models(self
, contains
: str = None) -> [str]:
988 """List models with certain names
990 :param: contains: String that is contained in model name
992 :retur: [models] Returns list of model names
995 controller
= await self
.get_controller()
997 models
= await controller
.list_models()
999 models
= [model
for model
in models
if contains
in model
]
1002 await self
.disconnect_controller(controller
)
1004 async def list_offers(self
, model_name
: str) -> QueryApplicationOffersResults
:
1005 """List models with certain names
1007 :param: model_name: Model name
1009 :return: Returns list of offers
1012 controller
= await self
.get_controller()
1014 return await controller
.list_offers(model_name
)
1016 await self
.disconnect_controller(controller
)
1021 configuration
: Configuration
,
1023 credential_name
: str = None,
1026 Add a Kubernetes cloud to the controller
1028 Similar to the `juju add-k8s` command in the CLI
1030 :param: name: Name for the K8s cloud
1031 :param: configuration: Kubernetes configuration object
1032 :param: storage_class: Storage Class to use in the cloud
1033 :param: credential_name: Storage Class to use in the cloud
1036 if not storage_class
:
1037 raise Exception("storage_class must be a non-empty string")
1039 raise Exception("name must be a non-empty string")
1040 if not configuration
:
1041 raise Exception("configuration must be provided")
1043 endpoint
= configuration
.host
1044 credential
= self
.get_k8s_cloud_credential(configuration
)
1046 [credential
.attrs
["ClientCertificateData"]]
1047 if "ClientCertificateData" in credential
.attrs
1050 cloud
= client
.Cloud(
1052 auth_types
=[credential
.auth_type
],
1054 ca_certificates
=ca_certificates
,
1056 "operator-storage": storage_class
,
1057 "workload-storage": storage_class
,
1061 return await self
.add_cloud(
1062 name
, cloud
, credential
, credential_name
=credential_name
1065 def get_k8s_cloud_credential(
1066 self
, configuration
: Configuration
,
1067 ) -> client
.CloudCredential
:
1069 ca_cert
= configuration
.ssl_ca_cert
or configuration
.cert_file
1070 key
= configuration
.key_file
1071 api_key
= configuration
.api_key
1073 username
= configuration
.username
1074 password
= configuration
.password
1076 if "authorization" in api_key
:
1077 authorization
= api_key
["authorization"]
1078 if "Bearer " in authorization
:
1079 bearer_list
= authorization
.split(" ")
1080 if len(bearer_list
) == 2:
1081 [_
, token
] = bearer_list
1083 raise JujuInvalidK8sConfiguration("unknown format of api_key")
1085 token
= authorization
1087 attrs
["ClientCertificateData"] = open(ca_cert
, "r").read()
1089 attrs
["ClientKeyData"] = open(key
, "r").read()
1091 if username
or password
:
1092 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1093 attrs
["Token"] = token
1097 auth_type
= "oauth2"
1099 raise JujuInvalidK8sConfiguration(
1100 "missing token for auth type {}".format(auth_type
)
1105 "credential for user {} has empty password".format(username
)
1107 attrs
["username"] = username
1108 attrs
["password"] = password
1110 auth_type
= "userpasswithcert"
1112 auth_type
= "userpass"
1113 elif ca_cert
and token
:
1114 auth_type
= "certificate"
1116 raise JujuInvalidK8sConfiguration("authentication method not supported")
1117 return client
.CloudCredential(auth_type
=auth_type
, attrs
=attrs
,)
1119 async def add_cloud(
1123 credential
: CloudCredential
= None,
1124 credential_name
: str = None,
1127 Add cloud to the controller
1129 :param: name: Name of the cloud to be added
1130 :param: cloud: Cloud object
1131 :param: credential: CloudCredentials object for the cloud
1132 :param: credential_name: Credential name.
1133 If not defined, cloud of the name will be used.
1135 controller
= await self
.get_controller()
1137 _
= await controller
.add_cloud(name
, cloud
)
1139 await controller
.add_credential(
1140 credential_name
or name
, credential
=credential
, cloud
=name
1142 # Need to return the object returned by the controller.add_cloud() function
1143 # I'm returning the original value now until this bug is fixed:
1144 # https://github.com/juju/python-libjuju/issues/443
1147 await self
.disconnect_controller(controller
)
1149 async def remove_cloud(self
, name
: str):
1153 :param: name: Name of the cloud to be removed
1155 controller
= await self
.get_controller()
1157 await controller
.remove_cloud(name
)
1159 await self
.disconnect_controller(controller
)
1161 async def _get_leader_unit(self
, application
: Application
) -> Unit
:
1163 for u
in application
.units
:
1164 if await u
.is_leader_from_status():