1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from juju
.controller
import Controller
18 from juju
.client
import client
21 from juju
.errors
import JujuAPIError
22 from juju
.model
import Model
23 from juju
.machine
import Machine
24 from juju
.application
import Application
25 from juju
.unit
import Unit
26 from juju
.client
._definitions
import (
28 QueryApplicationOffersResults
,
32 from n2vc
.juju_watcher
import JujuModelWatcher
33 from n2vc
.provisioner
import AsyncSSHProvisioner
34 from n2vc
.n2vc_conn
import N2VCConnector
35 from n2vc
.exceptions
import (
37 JujuApplicationNotFound
,
38 JujuLeaderUnitNotFound
,
40 JujuModelAlreadyExists
,
41 JujuControllerFailedConnecting
,
42 JujuApplicationExists
,
43 JujuInvalidK8sConfiguration
,
45 from n2vc
.utils
import DB_DATA
46 from osm_common
.dbbase
import DbException
47 from kubernetes
.client
.configuration
import Configuration
58 loop
: asyncio
.AbstractEventLoop
= None,
59 log
: logging
.Logger
= None,
61 n2vc
: N2VCConnector
= None,
62 apt_mirror
: str = None,
63 enable_os_upgrade
: bool = True,
68 :param: endpoint: Endpoint of the juju controller (host:port)
69 :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
70 :param: username: Juju username
71 :param: password: Juju password
72 :param: cacert: Juju CA Certificate
73 :param: loop: Asyncio loop
76 :param: n2vc: N2VC object
77 :param: apt_mirror: APT Mirror
78 :param: enable_os_upgrade: Enable OS Upgrade
81 self
.log
= log
or logging
.getLogger("Libjuju")
83 db_endpoints
= self
._get
_api
_endpoints
_db
()
84 self
.endpoints
= db_endpoints
or [endpoint
]
85 if db_endpoints
is None:
86 self
._update
_api
_endpoints
_db
(self
.endpoints
)
87 self
.api_proxy
= api_proxy
88 self
.username
= username
89 self
.password
= password
91 self
.loop
= loop
or asyncio
.get_event_loop()
94 # Generate config for models
95 self
.model_config
= {}
97 self
.model_config
["apt-mirror"] = apt_mirror
98 self
.model_config
["enable-os-refresh-update"] = enable_os_upgrade
99 self
.model_config
["enable-os-upgrade"] = enable_os_upgrade
101 self
.loop
.set_exception_handler(self
.handle_exception
)
102 self
.creating_model
= asyncio
.Lock(loop
=self
.loop
)
105 self
.log
.debug("Libjuju initialized!")
107 self
.health_check_task
= self
.loop
.create_task(self
.health_check())
109 async def get_controller(self
, timeout
: float = 5.0) -> Controller
:
113 :param: timeout: Time in seconds to wait for controller to connect
117 controller
= Controller(loop
=self
.loop
)
118 await asyncio
.wait_for(
120 endpoint
=self
.endpoints
,
121 username
=self
.username
,
122 password
=self
.password
,
127 endpoints
= await controller
.api_endpoints
128 if self
.endpoints
!= endpoints
:
129 self
.endpoints
= endpoints
130 self
._update
_api
_endpoints
_db
(self
.endpoints
)
132 except asyncio
.CancelledError
as e
:
134 except Exception as e
:
136 "Failed connecting to controller: {}...".format(self
.endpoints
)
139 await self
.disconnect_controller(controller
)
140 raise JujuControllerFailedConnecting(e
)
async def disconnect(self):
    """Cancel the background health check task and log the disconnection."""
    # The periodic health check must not keep running once we disconnect.
    health_task = self.health_check_task
    health_task.cancel()
    self.log.debug("Libjuju disconnected!")
async def disconnect_model(self, model: Model):
    """
    Close the connection held by a model object

    :param: model: Model that will be disconnected
    """
    closing = model.disconnect()
    await closing
async def disconnect_controller(self, controller: Controller):
    """
    Close the connection held by a controller object

    :param: controller: Controller that will be disconnected
    """
    closing = controller.disconnect()
    await closing
164 async def add_model(self
, model_name
: str, cloud_name
: str, credential_name
=None):
168 :param: model_name: Model name
169 :param: cloud_name: Cloud name
170 :param: credential_name: Credential name to use for adding the model
171 If not specified, same name as the cloud will be used.
175 controller
= await self
.get_controller()
178 # Raise exception if model already exists
179 if await self
.model_exists(model_name
, controller
=controller
):
180 raise JujuModelAlreadyExists(
181 "Model {} already exists.".format(model_name
)
184 # Block until other workers have finished model creation
185 while self
.creating_model
.locked():
186 await asyncio
.sleep(0.1)
188 # If the model exists, return it from the controller
189 if model_name
in self
.models
:
193 async with self
.creating_model
:
194 self
.log
.debug("Creating model {}".format(model_name
))
195 model
= await controller
.add_model(
197 config
=self
.model_config
,
198 cloud_name
=cloud_name
,
199 credential_name
=credential_name
or cloud_name
,
201 self
.models
.add(model_name
)
204 await self
.disconnect_model(model
)
205 await self
.disconnect_controller(controller
)
208 self
, controller
: Controller
, model_name
: str, id=None
211 Get model from controller
213 :param: controller: Controller
214 :param: model_name: Model name
216 :return: Model: The created Juju model object
218 return await controller
.get_model(model_name
)
220 async def model_exists(
221 self
, model_name
: str, controller
: Controller
= None
224 Check if model exists
226 :param: controller: Controller
227 :param: model_name: Model name
231 need_to_disconnect
= False
233 # Get controller if not passed
235 controller
= await self
.get_controller()
236 need_to_disconnect
= True
238 # Check if model exists
240 return model_name
in await controller
.list_models()
242 if need_to_disconnect
:
243 await self
.disconnect_controller(controller
)
245 async def models_exist(self
, model_names
: [str]) -> (bool, list):
247 Check if models exists
249 :param: model_names: List of strings with model names
251 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
255 "model_names must be a non-empty array. Given value: {}".format(
259 non_existing_models
= []
260 models
= await self
.list_models()
261 existing_models
= list(set(models
).intersection(model_names
))
262 non_existing_models
= list(set(model_names
) - set(existing_models
))
265 len(non_existing_models
) == 0,
async def get_model_status(self, model_name: str) -> FullStatus:
    """
    Get model status

    :param: model_name: Model name

    :return: Full status object
    """
    controller = await self.get_controller()
    try:
        # Opening the model is inside the outer try so that the controller
        # connection is released even when get_model() raises.
        model = await self.get_model(controller, model_name)
        try:
            return await model.get_status()
        finally:
            await self.disconnect_model(model)
    finally:
        await self.disconnect_controller(controller)
285 async def create_machine(
288 machine_id
: str = None,
289 db_dict
: dict = None,
290 progress_timeout
: float = None,
291 total_timeout
: float = None,
292 series
: str = "xenial",
294 ) -> (Machine
, bool):
298 :param: model_name: Model name
299 :param: machine_id: Machine id
300 :param: db_dict: Dictionary with data of the DB to write the updates
301 :param: progress_timeout: Maximum time between two updates in the model
302 :param: total_timeout: Timeout for the entity to be active
303 :param: series: Series of the machine (xenial, bionic, focal, ...)
304 :param: wait: Wait until machine is ready
306 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
307 if the machine is new or it already existed
313 "Creating machine (id={}) in model: {}".format(machine_id
, model_name
)
317 controller
= await self
.get_controller()
320 model
= await self
.get_model(controller
, model_name
)
322 if machine_id
is not None:
324 "Searching machine (id={}) in model {}".format(
325 machine_id
, model_name
329 # Get machines from model and get the machine with machine_id if exists
330 machines
= await model
.get_machines()
331 if machine_id
in machines
:
333 "Machine (id={}) found in model {}".format(
334 machine_id
, model_name
337 machine
= machines
[machine_id
]
339 raise JujuMachineNotFound("Machine {} not found".format(machine_id
))
342 self
.log
.debug("Creating a new machine in model {}".format(model_name
))
345 machine
= await model
.add_machine(
346 spec
=None, constraints
=None, disks
=None, series
=series
350 # Wait until the machine is ready
352 "Wait until machine {} is ready in model {}".format(
353 machine
.entity_id
, model_name
357 await JujuModelWatcher
.wait_for(
360 progress_timeout
=progress_timeout
,
361 total_timeout
=total_timeout
,
366 await self
.disconnect_model(model
)
367 await self
.disconnect_controller(controller
)
370 "Machine {} ready at {} in model {}".format(
371 machine
.entity_id
, machine
.dns_name
, model_name
376 async def provision_machine(
381 private_key_path
: str,
382 db_dict
: dict = None,
383 progress_timeout
: float = None,
384 total_timeout
: float = None,
387 Manually provisioning of a machine
389 :param: model_name: Model name
390 :param: hostname: IP to access the machine
391 :param: username: Username to login to the machine
392 :param: private_key_path: Local path for the private key
393 :param: db_dict: Dictionary with data of the DB to write the updates
394 :param: progress_timeout: Maximum time between two updates in the model
395 :param: total_timeout: Timeout for the entity to be active
397 :return: (Entity): Machine id
400 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
401 model_name
, hostname
, username
406 controller
= await self
.get_controller()
409 model
= await self
.get_model(controller
, model_name
)
413 provisioner
= AsyncSSHProvisioner(
416 private_key_path
=private_key_path
,
421 params
= await provisioner
.provision_machine()
423 params
.jobs
= ["JobHostUnits"]
425 self
.log
.debug("Adding machine to model")
426 connection
= model
.connection()
427 client_facade
= client
.ClientFacade
.from_connection(connection
)
429 results
= await client_facade
.AddMachines(params
=[params
])
430 error
= results
.machines
[0].error
433 msg
= "Error adding machine: {}".format(error
.message
)
434 self
.log
.error(msg
=msg
)
435 raise ValueError(msg
)
437 machine_id
= results
.machines
[0].machine
439 self
.log
.debug("Installing Juju agent into machine {}".format(machine_id
))
440 asyncio
.ensure_future(
441 provisioner
.install_agent(
442 connection
=connection
,
444 machine_id
=machine_id
,
445 proxy
=self
.api_proxy
,
451 machine_list
= await model
.get_machines()
452 if machine_id
in machine_list
:
453 self
.log
.debug("Machine {} found in model!".format(machine_id
))
454 machine
= model
.machines
.get(machine_id
)
456 await asyncio
.sleep(2)
459 msg
= "Machine {} not found in model".format(machine_id
)
460 self
.log
.error(msg
=msg
)
461 raise JujuMachineNotFound(msg
)
464 "Wait until machine {} is ready in model {}".format(
465 machine
.entity_id
, model_name
468 await JujuModelWatcher
.wait_for(
471 progress_timeout
=progress_timeout
,
472 total_timeout
=total_timeout
,
476 except Exception as e
:
479 await self
.disconnect_model(model
)
480 await self
.disconnect_controller(controller
)
483 "Machine provisioned {} in model {}".format(machine_id
, model_name
)
488 async def deploy_charm(
490 application_name
: str,
494 db_dict
: dict = None,
495 progress_timeout
: float = None,
496 total_timeout
: float = None,
503 :param: application_name: Application name
504 :param: path: Local path to the charm
505 :param: model_name: Model name
506 :param: machine_id ID of the machine
507 :param: db_dict: Dictionary with data of the DB to write the updates
508 :param: progress_timeout: Maximum time between two updates in the model
509 :param: total_timeout: Timeout for the entity to be active
510 :param: config: Config for the charm
511 :param: series: Series of the charm
512 :param: num_units: Number of units
514 :return: (juju.application.Application): Juju application
517 "Deploying charm {} to machine {} in model ~{}".format(
518 application_name
, machine_id
, model_name
521 self
.log
.debug("charm: {}".format(path
))
524 controller
= await self
.get_controller()
527 model
= await self
.get_model(controller
, model_name
)
531 if application_name
not in model
.applications
:
533 if machine_id
is not None:
534 if machine_id
not in model
.machines
:
535 msg
= "Machine {} not found in model".format(machine_id
)
536 self
.log
.error(msg
=msg
)
537 raise JujuMachineNotFound(msg
)
538 machine
= model
.machines
[machine_id
]
539 series
= machine
.series
541 application
= await model
.deploy(
543 application_name
=application_name
,
552 "Wait until application {} is ready in model {}".format(
553 application_name
, model_name
557 for _
in range(num_units
- 1):
558 m
, _
= await self
.create_machine(model_name
, wait
=False)
559 await application
.add_unit(to
=m
.entity_id
)
561 await JujuModelWatcher
.wait_for(
564 progress_timeout
=progress_timeout
,
565 total_timeout
=total_timeout
,
570 "Application {} is ready in model {}".format(
571 application_name
, model_name
575 raise JujuApplicationExists(
576 "Application {} exists".format(application_name
)
579 await self
.disconnect_model(model
)
580 await self
.disconnect_controller(controller
)
584 def _get_application(self
, model
: Model
, application_name
: str) -> Application
:
587 :param: model: Model object
588 :param: application_name: Application name
590 :return: juju.application.Application (or None if it doesn't exist)
592 if model
.applications
and application_name
in model
.applications
:
593 return model
.applications
[application_name
]
595 async def execute_action(
597 application_name
: str,
600 db_dict
: dict = None,
601 progress_timeout
: float = None,
602 total_timeout
: float = None,
607 :param: application_name: Application name
608 :param: model_name: Model name
609 :param: action_name: Name of the action
610 :param: db_dict: Dictionary with data of the DB to write the updates
611 :param: progress_timeout: Maximum time between two updates in the model
612 :param: total_timeout: Timeout for the entity to be active
614 :return: (str, str): (output and status)
617 "Executing action {} using params {}".format(action_name
, kwargs
)
620 controller
= await self
.get_controller()
623 model
= await self
.get_model(controller
, model_name
)
627 application
= self
._get
_application
(
628 model
, application_name
=application_name
,
630 if application
is None:
631 raise JujuApplicationNotFound("Cannot execute action")
635 # Ocassionally, self._get_leader_unit() will return None
636 # because the leader elected hook has not been triggered yet.
637 # Therefore, we are doing some retries. If it happens again,
640 time_between_retries
= 10
642 for _
in range(attempts
):
643 unit
= await self
._get
_leader
_unit
(application
)
645 await asyncio
.sleep(time_between_retries
)
649 raise JujuLeaderUnitNotFound(
650 "Cannot execute action: leader unit not found"
653 actions
= await application
.get_actions()
655 if action_name
not in actions
:
656 raise JujuActionNotFound(
657 "Action {} not in available actions".format(action_name
)
660 action
= await unit
.run_action(action_name
, **kwargs
)
663 "Wait until action {} is completed in application {} (model={})".format(
664 action_name
, application_name
, model_name
667 await JujuModelWatcher
.wait_for(
670 progress_timeout
=progress_timeout
,
671 total_timeout
=total_timeout
,
676 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
677 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
679 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
683 "Action {} completed with status {} in application {} (model={})".format(
684 action_name
, action
.status
, application_name
, model_name
688 await self
.disconnect_model(model
)
689 await self
.disconnect_controller(controller
)
691 return output
, status
693 async def get_actions(self
, application_name
: str, model_name
: str) -> dict:
694 """Get list of actions
696 :param: application_name: Application name
697 :param: model_name: Model name
699 :return: Dict with this format
701 "action_name": "Description of the action",
706 "Getting list of actions for application {}".format(application_name
)
710 controller
= await self
.get_controller()
713 model
= await self
.get_model(controller
, model_name
)
717 application
= self
._get
_application
(
718 model
, application_name
=application_name
,
721 # Return list of actions
722 return await application
.get_actions()
725 # Disconnect from model and controller
726 await self
.disconnect_model(model
)
727 await self
.disconnect_controller(controller
)
async def get_metrics(self, model_name: str, application_name: str) -> dict:
    """Get the metrics collected by the VCA.

    :param: model_name: The name or unique id of the network service
    :param: application_name: The name of the application

    :return: Metrics returned by the application, or an empty dict when the
             application is not found in the model
    :raises: Exception if model_name or application_name is empty
    """
    if not model_name or not application_name:
        raise Exception("model_name and application_name must be non-empty strings")

    metrics = {}
    controller = await self.get_controller()
    try:
        # Inside the outer try so the controller is released if this raises
        model = await self.get_model(controller, model_name)
        try:
            application = self._get_application(model, application_name)
            if application is not None:
                metrics = await application.get_metrics()
        finally:
            # disconnect_model/disconnect_controller are coroutines: calling
            # them without ``await`` never runs them and leaks the connection.
            await self.disconnect_model(model)
    finally:
        await self.disconnect_controller(controller)
    return metrics
async def add_relation(
    self, model_name: str, endpoint_1: str, endpoint_2: str,
):
    """Add relation

    :param: model_name: Model name
    :param: endpoint_1: First endpoint name
                        ("app:endpoint" format or directly the saas name)
    :param: endpoint_2: Second endpoint name (^ same format)

    :raises: JujuAPIError for any API failure other than "not found" or
             "already exists", which are only logged as warnings
    """
    self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))

    # Get controller
    controller = await self.get_controller()
    try:
        # Get model (inside the try so the controller is released on failure)
        model = await self.get_model(controller, model_name)
        try:
            await model.add_relation(endpoint_1, endpoint_2)
        except JujuAPIError as e:
            if "not found" in e.message:
                self.log.warning("Relation not found: {}".format(e.message))
                return
            if "already exists" in e.message:
                self.log.warning("Relation already exists: {}".format(e.message))
                return
            # another exception, raise it (bare raise keeps the traceback)
            raise
        finally:
            await self.disconnect_model(model)
    finally:
        await self.disconnect_controller(controller)
785 self
, offer_url
: str, model_name
: str,
788 Adds a remote offer to the model. Relations can be created later using "juju relate".
790 :param: offer_url: Offer Url
791 :param: model_name: Model name
793 :raises ParseError if there's a problem parsing the offer_url
794 :raises JujuError if remote offer includes and endpoint
795 :raises JujuAPIError if the operation is not successful
797 controller
= await self
.get_controller()
798 model
= await controller
.get_model(model_name
)
801 await model
.consume(offer_url
)
803 await self
.disconnect_model(model
)
804 await self
.disconnect_controller(controller
)
806 async def destroy_model(self
, model_name
: str, total_timeout
: float):
810 :param: model_name: Model name
811 :param: total_timeout: Timeout
814 controller
= await self
.get_controller()
815 model
= await self
.get_model(controller
, model_name
)
817 self
.log
.debug("Destroying model {}".format(model_name
))
818 uuid
= model
.info
.uuid
821 machines
= await model
.get_machines()
822 for machine_id
in machines
:
824 await self
.destroy_machine(
825 model
, machine_id
=machine_id
, total_timeout
=total_timeout
,
827 except asyncio
.CancelledError
:
833 await self
.disconnect_model(model
)
836 if model_name
in self
.models
:
837 self
.models
.remove(model_name
)
839 await controller
.destroy_model(uuid
)
841 # Wait until model is destroyed
842 self
.log
.debug("Waiting for model {} to be destroyed...".format(model_name
))
845 if total_timeout
is None:
847 end
= time
.time() + total_timeout
848 while time
.time() < end
:
850 models
= await controller
.list_models()
851 if model_name
not in models
:
853 "The model {} ({}) was destroyed".format(model_name
, uuid
)
856 except asyncio
.CancelledError
:
858 except Exception as e
:
860 await asyncio
.sleep(5)
862 "Timeout waiting for model {} to be destroyed {}".format(
863 model_name
, last_exception
867 await self
.disconnect_controller(controller
)
869 async def destroy_application(self
, model
: Model
, application_name
: str):
873 :param: model: Model object
874 :param: application_name: Application name
877 "Destroying application {} in model {}".format(
878 application_name
, model
.info
.name
881 application
= model
.applications
.get(application_name
)
883 await application
.destroy()
885 self
.log
.warning("Application not found: {}".format(application_name
))
async def destroy_machine(
    self, model: Model, machine_id: str, total_timeout: float = 3600
):
    """
    Destroy machine

    Forces the destruction of the machine and polls the model until the
    machine is gone or the timeout expires. A timeout is logged as a
    warning; no exception is raised.

    :param: model: Model object
    :param: machine_id: Machine id
    :param: total_timeout: Timeout in seconds
    """
    machines = await model.get_machines()
    if machine_id not in machines:
        self.log.debug("Machine not found: {}".format(machine_id))
        return

    machine = machines[machine_id]
    await machine.destroy(force=True)

    # max timeout
    end = time.time() + total_timeout

    # wait for machine removal
    machines = await model.get_machines()
    while machine_id in machines and time.time() < end:
        self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
        await asyncio.sleep(0.5)
        machines = await model.get_machines()

    # The loop also ends on timeout: only report success when the machine
    # has really disappeared from the model.
    if machine_id in machines:
        self.log.warning(
            "Timeout waiting for machine {} to be destroyed".format(machine_id)
        )
    else:
        self.log.debug("Machine destroyed: {}".format(machine_id))
914 async def configure_application(
915 self
, model_name
: str, application_name
: str, config
: dict = None
917 """Configure application
919 :param: model_name: Model name
920 :param: application_name: Application name
921 :param: config: Config to apply to the charm
923 self
.log
.debug("Configuring application {}".format(application_name
))
927 controller
= await self
.get_controller()
928 model
= await self
.get_model(controller
, model_name
)
929 application
= self
._get
_application
(
930 model
, application_name
=application_name
,
932 await application
.set_config(config
)
934 await self
.disconnect_model(model
)
935 await self
.disconnect_controller(controller
)
937 def _get_api_endpoints_db(self
) -> [str]:
939 Get API Endpoints from DB
941 :return: List of API endpoints
943 self
.log
.debug("Getting endpoints from database")
945 juju_info
= self
.db
.get_one(
946 DB_DATA
.api_endpoints
.table
,
947 q_filter
=DB_DATA
.api_endpoints
.filter,
950 if juju_info
and DB_DATA
.api_endpoints
.key
in juju_info
:
951 return juju_info
[DB_DATA
.api_endpoints
.key
]
953 def _update_api_endpoints_db(self
, endpoints
: [str]):
955 Update API endpoints in Database
957 :param: List of endpoints
959 self
.log
.debug("Saving endpoints {} in database".format(endpoints
))
961 juju_info
= self
.db
.get_one(
962 DB_DATA
.api_endpoints
.table
,
963 q_filter
=DB_DATA
.api_endpoints
.filter,
966 # If it doesn't, then create it
970 DB_DATA
.api_endpoints
.table
, DB_DATA
.api_endpoints
.filter,
972 except DbException
as e
:
973 # Racing condition: check if another N2VC worker has created it
974 juju_info
= self
.db
.get_one(
975 DB_DATA
.api_endpoints
.table
,
976 q_filter
=DB_DATA
.api_endpoints
.filter,
982 DB_DATA
.api_endpoints
.table
,
983 DB_DATA
.api_endpoints
.filter,
984 {DB_DATA
.api_endpoints
.key
: endpoints
},
987 def handle_exception(self
, loop
, context
):
988 # All unhandled exceptions by libjuju are handled here.
async def health_check(self, interval: float = 300.0):
    """
    Health check to make sure controller and controller_model connections are OK

    Runs until the task is cancelled. A failed connection attempt is logged
    and the check is retried after the interval.

    :param: interval: Time in seconds between checks
    """
    while True:
        # Reset on every pass: if get_controller() raises, the cleanup below
        # must not touch an unbound name or a controller from a previous pass.
        controller = None
        try:
            controller = await self.get_controller()
        except Exception as e:
            self.log.error("Health check to VCA failed: {}".format(e))
        finally:
            if controller is not None:
                await self.disconnect_controller(controller)
        await asyncio.sleep(interval)
async def list_models(self, contains: str = None) -> [str]:
    """List the models of the controller, optionally filtered by name

    :param: contains: String that is contained in model name

    :return: [models] Returns list of model names
    """
    controller = await self.get_controller()
    try:
        names = await controller.list_models()
        # No filter requested: hand back what the controller reported
        if not contains:
            return names
        return [name for name in names if contains in name]
    finally:
        await self.disconnect_controller(controller)
async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
    """List the offers of a model

    :param: model_name: Model name

    :return: Returns list of offers
    """
    controller = await self.get_controller()
    try:
        offers = await controller.list_offers(model_name)
        return offers
    finally:
        # Runs on success and on failure alike
        await self.disconnect_controller(controller)
1041 configuration
: Configuration
,
1043 credential_name
: str = None,
1046 Add a Kubernetes cloud to the controller
1048 Similar to the `juju add-k8s` command in the CLI
1050 :param: name: Name for the K8s cloud
1051 :param: configuration: Kubernetes configuration object
1052 :param: storage_class: Storage Class to use in the cloud
1053 :param: credential_name: Storage Class to use in the cloud
1056 if not storage_class
:
1057 raise Exception("storage_class must be a non-empty string")
1059 raise Exception("name must be a non-empty string")
1060 if not configuration
:
1061 raise Exception("configuration must be provided")
1063 endpoint
= configuration
.host
1064 credential
= self
.get_k8s_cloud_credential(configuration
)
1066 [credential
.attrs
["ClientCertificateData"]]
1067 if "ClientCertificateData" in credential
.attrs
1070 cloud
= client
.Cloud(
1072 auth_types
=[credential
.auth_type
],
1074 ca_certificates
=ca_certificates
,
1076 "operator-storage": storage_class
,
1077 "workload-storage": storage_class
,
1081 return await self
.add_cloud(
1082 name
, cloud
, credential
, credential_name
=credential_name
1085 def get_k8s_cloud_credential(
1086 self
, configuration
: Configuration
,
1087 ) -> client
.CloudCredential
:
1089 ca_cert
= configuration
.ssl_ca_cert
or configuration
.cert_file
1090 key
= configuration
.key_file
1091 api_key
= configuration
.api_key
1093 username
= configuration
.username
1094 password
= configuration
.password
1096 if "authorization" in api_key
:
1097 authorization
= api_key
["authorization"]
1098 if "Bearer " in authorization
:
1099 bearer_list
= authorization
.split(" ")
1100 if len(bearer_list
) == 2:
1101 [_
, token
] = bearer_list
1103 raise JujuInvalidK8sConfiguration("unknown format of api_key")
1105 token
= authorization
1107 attrs
["ClientCertificateData"] = open(ca_cert
, "r").read()
1109 attrs
["ClientKeyData"] = open(key
, "r").read()
1111 if username
or password
:
1112 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1113 attrs
["Token"] = token
1117 auth_type
= "oauth2"
1119 raise JujuInvalidK8sConfiguration(
1120 "missing token for auth type {}".format(auth_type
)
1125 "credential for user {} has empty password".format(username
)
1127 attrs
["username"] = username
1128 attrs
["password"] = password
1130 auth_type
= "userpasswithcert"
1132 auth_type
= "userpass"
1133 elif ca_cert
and token
:
1134 auth_type
= "certificate"
1136 raise JujuInvalidK8sConfiguration("authentication method not supported")
1137 return client
.CloudCredential(auth_type
=auth_type
, attrs
=attrs
,)
1139 async def add_cloud(
1143 credential
: CloudCredential
= None,
1144 credential_name
: str = None,
1147 Add cloud to the controller
1149 :param: name: Name of the cloud to be added
1150 :param: cloud: Cloud object
1151 :param: credential: CloudCredentials object for the cloud
1152 :param: credential_name: Credential name.
1153 If not defined, cloud of the name will be used.
1155 controller
= await self
.get_controller()
1157 _
= await controller
.add_cloud(name
, cloud
)
1159 await controller
.add_credential(
1160 credential_name
or name
, credential
=credential
, cloud
=name
1162 # Need to return the object returned by the controller.add_cloud() function
1163 # I'm returning the original value now until this bug is fixed:
1164 # https://github.com/juju/python-libjuju/issues/443
1167 await self
.disconnect_controller(controller
)
async def remove_cloud(self, name: str):
    """
    Remove a cloud from the controller

    :param: name: Name of the cloud to be removed
    """
    controller = await self.get_controller()
    try:
        removal = controller.remove_cloud(name)
        await removal
    finally:
        # Runs on success and on failure alike
        await self.disconnect_controller(controller)
1181 async def _get_leader_unit(self
, application
: Application
) -> Unit
:
1183 for u
in application
.units
:
1184 if await u
.is_leader_from_status():