1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from juju
.controller
import Controller
18 from juju
.client
import client
21 from juju
.errors
import JujuAPIError
22 from juju
.model
import Model
23 from juju
.machine
import Machine
24 from juju
.application
import Application
25 from juju
.unit
import Unit
26 from juju
.client
._definitions
import (
28 QueryApplicationOffersResults
,
32 from n2vc
.juju_watcher
import JujuModelWatcher
33 from n2vc
.provisioner
import AsyncSSHProvisioner
34 from n2vc
.n2vc_conn
import N2VCConnector
35 from n2vc
.exceptions
import (
37 JujuApplicationNotFound
,
38 JujuLeaderUnitNotFound
,
40 JujuModelAlreadyExists
,
41 JujuControllerFailedConnecting
,
42 JujuApplicationExists
,
43 JujuInvalidK8sConfiguration
,
45 from n2vc
.utils
import DB_DATA
46 from osm_common
.dbbase
import DbException
47 from kubernetes
.client
.configuration
import Configuration
58 loop
: asyncio
.AbstractEventLoop
= None,
59 log
: logging
.Logger
= None,
61 n2vc
: N2VCConnector
= None,
62 apt_mirror
: str = None,
63 enable_os_upgrade
: bool = True,
68 :param: endpoint: Endpoint of the juju controller (host:port)
69 :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
70 :param: username: Juju username
71 :param: password: Juju password
72 :param: cacert: Juju CA Certificate
73 :param: loop: Asyncio loop
76 :param: n2vc: N2VC object
77 :param: apt_mirror: APT Mirror
78 :param: enable_os_upgrade: Enable OS Upgrade
81 self
.log
= log
or logging
.getLogger("Libjuju")
83 db_endpoints
= self
._get
_api
_endpoints
_db
()
85 if (db_endpoints
and endpoint
not in db_endpoints
) or not db_endpoints
:
86 self
.endpoints
= [endpoint
]
87 self
._update
_api
_endpoints
_db
(self
.endpoints
)
89 self
.endpoints
= db_endpoints
90 self
.api_proxy
= api_proxy
91 self
.username
= username
92 self
.password
= password
94 self
.loop
= loop
or asyncio
.get_event_loop()
97 # Generate config for models
98 self
.model_config
= {}
100 self
.model_config
["apt-mirror"] = apt_mirror
101 self
.model_config
["enable-os-refresh-update"] = enable_os_upgrade
102 self
.model_config
["enable-os-upgrade"] = enable_os_upgrade
104 self
.loop
.set_exception_handler(self
.handle_exception
)
105 self
.creating_model
= asyncio
.Lock(loop
=self
.loop
)
108 self
.log
.debug("Libjuju initialized!")
110 self
.health_check_task
= self
._create
_health
_check
_task
()
112 def _create_health_check_task(self
):
113 return self
.loop
.create_task(self
.health_check())
115 async def get_controller(self
, timeout
: float = 5.0) -> Controller
:
119 :param: timeout: Time in seconds to wait for controller to connect
123 controller
= Controller(loop
=self
.loop
)
124 await asyncio
.wait_for(
126 endpoint
=self
.endpoints
,
127 username
=self
.username
,
128 password
=self
.password
,
133 endpoints
= await controller
.api_endpoints
134 if self
.endpoints
!= endpoints
:
135 self
.endpoints
= endpoints
136 self
._update
_api
_endpoints
_db
(self
.endpoints
)
138 except asyncio
.CancelledError
as e
:
140 except Exception as e
:
142 "Failed connecting to controller: {}...".format(self
.endpoints
)
145 await self
.disconnect_controller(controller
)
146 raise JujuControllerFailedConnecting(e
)
148 async def disconnect(self
):
150 # Cancel health check task
151 self
.health_check_task
.cancel()
152 self
.log
.debug("Libjuju disconnected!")
154 async def disconnect_model(self
, model
: Model
):
158 :param: model: Model that will be disconnected
160 await model
.disconnect()
162 async def disconnect_controller(self
, controller
: Controller
):
164 Disconnect controller
166 :param: controller: Controller that will be disconnected
168 await controller
.disconnect()
170 async def add_model(self
, model_name
: str, cloud_name
: str, credential_name
=None):
174 :param: model_name: Model name
175 :param: cloud_name: Cloud name
176 :param: credential_name: Credential name to use for adding the model
177 If not specified, same name as the cloud will be used.
181 controller
= await self
.get_controller()
184 # Raise exception if model already exists
185 if await self
.model_exists(model_name
, controller
=controller
):
186 raise JujuModelAlreadyExists(
187 "Model {} already exists.".format(model_name
)
190 # Block until other workers have finished model creation
191 while self
.creating_model
.locked():
192 await asyncio
.sleep(0.1)
194 # If the model exists, return it from the controller
195 if model_name
in self
.models
:
199 async with self
.creating_model
:
200 self
.log
.debug("Creating model {}".format(model_name
))
201 model
= await controller
.add_model(
203 config
=self
.model_config
,
204 cloud_name
=cloud_name
,
205 credential_name
=credential_name
or cloud_name
,
207 self
.models
.add(model_name
)
210 await self
.disconnect_model(model
)
211 await self
.disconnect_controller(controller
)
214 self
, controller
: Controller
, model_name
: str, id=None
217 Get model from controller
219 :param: controller: Controller
220 :param: model_name: Model name
222 :return: Model: The created Juju model object
224 return await controller
.get_model(model_name
)
226 async def model_exists(
227 self
, model_name
: str, controller
: Controller
= None
230 Check if model exists
232 :param: controller: Controller
233 :param: model_name: Model name
237 need_to_disconnect
= False
239 # Get controller if not passed
241 controller
= await self
.get_controller()
242 need_to_disconnect
= True
244 # Check if model exists
246 return model_name
in await controller
.list_models()
248 if need_to_disconnect
:
249 await self
.disconnect_controller(controller
)
251 async def models_exist(self
, model_names
: [str]) -> (bool, list):
253 Check if models exists
255 :param: model_names: List of strings with model names
257 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
261 "model_names must be a non-empty array. Given value: {}".format(
265 non_existing_models
= []
266 models
= await self
.list_models()
267 existing_models
= list(set(models
).intersection(model_names
))
268 non_existing_models
= list(set(model_names
) - set(existing_models
))
271 len(non_existing_models
) == 0,
275 async def get_model_status(self
, model_name
: str) -> FullStatus
:
279 :param: model_name: Model name
281 :return: Full status object
283 controller
= await self
.get_controller()
284 model
= await self
.get_model(controller
, model_name
)
286 return await model
.get_status()
288 await self
.disconnect_model(model
)
289 await self
.disconnect_controller(controller
)
291 async def create_machine(
294 machine_id
: str = None,
295 db_dict
: dict = None,
296 progress_timeout
: float = None,
297 total_timeout
: float = None,
298 series
: str = "xenial",
300 ) -> (Machine
, bool):
304 :param: model_name: Model name
305 :param: machine_id: Machine id
306 :param: db_dict: Dictionary with data of the DB to write the updates
307 :param: progress_timeout: Maximum time between two updates in the model
308 :param: total_timeout: Timeout for the entity to be active
309 :param: series: Series of the machine (xenial, bionic, focal, ...)
310 :param: wait: Wait until machine is ready
312 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
313 if the machine is new or it already existed
319 "Creating machine (id={}) in model: {}".format(machine_id
, model_name
)
323 controller
= await self
.get_controller()
326 model
= await self
.get_model(controller
, model_name
)
328 if machine_id
is not None:
330 "Searching machine (id={}) in model {}".format(
331 machine_id
, model_name
335 # Get machines from model and get the machine with machine_id if exists
336 machines
= await model
.get_machines()
337 if machine_id
in machines
:
339 "Machine (id={}) found in model {}".format(
340 machine_id
, model_name
343 machine
= machines
[machine_id
]
345 raise JujuMachineNotFound("Machine {} not found".format(machine_id
))
348 self
.log
.debug("Creating a new machine in model {}".format(model_name
))
351 machine
= await model
.add_machine(
352 spec
=None, constraints
=None, disks
=None, series
=series
356 # Wait until the machine is ready
358 "Wait until machine {} is ready in model {}".format(
359 machine
.entity_id
, model_name
363 await JujuModelWatcher
.wait_for(
366 progress_timeout
=progress_timeout
,
367 total_timeout
=total_timeout
,
372 await self
.disconnect_model(model
)
373 await self
.disconnect_controller(controller
)
376 "Machine {} ready at {} in model {}".format(
377 machine
.entity_id
, machine
.dns_name
, model_name
382 async def provision_machine(
387 private_key_path
: str,
388 db_dict
: dict = None,
389 progress_timeout
: float = None,
390 total_timeout
: float = None,
393 Manually provisioning of a machine
395 :param: model_name: Model name
396 :param: hostname: IP to access the machine
397 :param: username: Username to login to the machine
398 :param: private_key_path: Local path for the private key
399 :param: db_dict: Dictionary with data of the DB to write the updates
400 :param: progress_timeout: Maximum time between two updates in the model
401 :param: total_timeout: Timeout for the entity to be active
403 :return: (Entity): Machine id
406 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
407 model_name
, hostname
, username
412 controller
= await self
.get_controller()
415 model
= await self
.get_model(controller
, model_name
)
419 provisioner
= AsyncSSHProvisioner(
422 private_key_path
=private_key_path
,
427 params
= await provisioner
.provision_machine()
429 params
.jobs
= ["JobHostUnits"]
431 self
.log
.debug("Adding machine to model")
432 connection
= model
.connection()
433 client_facade
= client
.ClientFacade
.from_connection(connection
)
435 results
= await client_facade
.AddMachines(params
=[params
])
436 error
= results
.machines
[0].error
439 msg
= "Error adding machine: {}".format(error
.message
)
440 self
.log
.error(msg
=msg
)
441 raise ValueError(msg
)
443 machine_id
= results
.machines
[0].machine
445 self
.log
.debug("Installing Juju agent into machine {}".format(machine_id
))
446 asyncio
.ensure_future(
447 provisioner
.install_agent(
448 connection
=connection
,
450 machine_id
=machine_id
,
451 proxy
=self
.api_proxy
,
457 machine_list
= await model
.get_machines()
458 if machine_id
in machine_list
:
459 self
.log
.debug("Machine {} found in model!".format(machine_id
))
460 machine
= model
.machines
.get(machine_id
)
462 await asyncio
.sleep(2)
465 msg
= "Machine {} not found in model".format(machine_id
)
466 self
.log
.error(msg
=msg
)
467 raise JujuMachineNotFound(msg
)
470 "Wait until machine {} is ready in model {}".format(
471 machine
.entity_id
, model_name
474 await JujuModelWatcher
.wait_for(
477 progress_timeout
=progress_timeout
,
478 total_timeout
=total_timeout
,
482 except Exception as e
:
485 await self
.disconnect_model(model
)
486 await self
.disconnect_controller(controller
)
489 "Machine provisioned {} in model {}".format(machine_id
, model_name
)
494 async def deploy_charm(
496 application_name
: str,
500 db_dict
: dict = None,
501 progress_timeout
: float = None,
502 total_timeout
: float = None,
509 :param: application_name: Application name
510 :param: path: Local path to the charm
511 :param: model_name: Model name
512 :param: machine_id ID of the machine
513 :param: db_dict: Dictionary with data of the DB to write the updates
514 :param: progress_timeout: Maximum time between two updates in the model
515 :param: total_timeout: Timeout for the entity to be active
516 :param: config: Config for the charm
517 :param: series: Series of the charm
518 :param: num_units: Number of units
520 :return: (juju.application.Application): Juju application
523 "Deploying charm {} to machine {} in model ~{}".format(
524 application_name
, machine_id
, model_name
527 self
.log
.debug("charm: {}".format(path
))
530 controller
= await self
.get_controller()
533 model
= await self
.get_model(controller
, model_name
)
537 if application_name
not in model
.applications
:
539 if machine_id
is not None:
540 if machine_id
not in model
.machines
:
541 msg
= "Machine {} not found in model".format(machine_id
)
542 self
.log
.error(msg
=msg
)
543 raise JujuMachineNotFound(msg
)
544 machine
= model
.machines
[machine_id
]
545 series
= machine
.series
547 application
= await model
.deploy(
549 application_name
=application_name
,
558 "Wait until application {} is ready in model {}".format(
559 application_name
, model_name
563 for _
in range(num_units
- 1):
564 m
, _
= await self
.create_machine(model_name
, wait
=False)
565 await application
.add_unit(to
=m
.entity_id
)
567 await JujuModelWatcher
.wait_for(
570 progress_timeout
=progress_timeout
,
571 total_timeout
=total_timeout
,
576 "Application {} is ready in model {}".format(
577 application_name
, model_name
581 raise JujuApplicationExists(
582 "Application {} exists".format(application_name
)
585 await self
.disconnect_model(model
)
586 await self
.disconnect_controller(controller
)
590 def _get_application(self
, model
: Model
, application_name
: str) -> Application
:
593 :param: model: Model object
594 :param: application_name: Application name
596 :return: juju.application.Application (or None if it doesn't exist)
598 if model
.applications
and application_name
in model
.applications
:
599 return model
.applications
[application_name
]
601 async def execute_action(
603 application_name
: str,
606 db_dict
: dict = None,
607 progress_timeout
: float = None,
608 total_timeout
: float = None,
613 :param: application_name: Application name
614 :param: model_name: Model name
615 :param: action_name: Name of the action
616 :param: db_dict: Dictionary with data of the DB to write the updates
617 :param: progress_timeout: Maximum time between two updates in the model
618 :param: total_timeout: Timeout for the entity to be active
620 :return: (str, str): (output and status)
623 "Executing action {} using params {}".format(action_name
, kwargs
)
626 controller
= await self
.get_controller()
629 model
= await self
.get_model(controller
, model_name
)
633 application
= self
._get
_application
(
634 model
, application_name
=application_name
,
636 if application
is None:
637 raise JujuApplicationNotFound("Cannot execute action")
641 # Ocassionally, self._get_leader_unit() will return None
642 # because the leader elected hook has not been triggered yet.
643 # Therefore, we are doing some retries. If it happens again,
646 time_between_retries
= 10
648 for _
in range(attempts
):
649 unit
= await self
._get
_leader
_unit
(application
)
651 await asyncio
.sleep(time_between_retries
)
655 raise JujuLeaderUnitNotFound(
656 "Cannot execute action: leader unit not found"
659 actions
= await application
.get_actions()
661 if action_name
not in actions
:
662 raise JujuActionNotFound(
663 "Action {} not in available actions".format(action_name
)
666 action
= await unit
.run_action(action_name
, **kwargs
)
669 "Wait until action {} is completed in application {} (model={})".format(
670 action_name
, application_name
, model_name
673 await JujuModelWatcher
.wait_for(
676 progress_timeout
=progress_timeout
,
677 total_timeout
=total_timeout
,
682 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
683 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
685 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
689 "Action {} completed with status {} in application {} (model={})".format(
690 action_name
, action
.status
, application_name
, model_name
694 await self
.disconnect_model(model
)
695 await self
.disconnect_controller(controller
)
697 return output
, status
699 async def get_actions(self
, application_name
: str, model_name
: str) -> dict:
700 """Get list of actions
702 :param: application_name: Application name
703 :param: model_name: Model name
705 :return: Dict with this format
707 "action_name": "Description of the action",
712 "Getting list of actions for application {}".format(application_name
)
716 controller
= await self
.get_controller()
719 model
= await self
.get_model(controller
, model_name
)
723 application
= self
._get
_application
(
724 model
, application_name
=application_name
,
727 # Return list of actions
728 return await application
.get_actions()
731 # Disconnect from model and controller
732 await self
.disconnect_model(model
)
733 await self
.disconnect_controller(controller
)
735 async def get_metrics(self
, model_name
: str, application_name
: str) -> dict:
736 """Get the metrics collected by the VCA.
738 :param model_name The name or unique id of the network service
739 :param application_name The name of the application
741 if not model_name
or not application_name
:
742 raise Exception("model_name and application_name must be non-empty strings")
744 controller
= await self
.get_controller()
745 model
= await self
.get_model(controller
, model_name
)
747 application
= self
._get
_application
(model
, application_name
)
748 if application
is not None:
749 metrics
= await application
.get_metrics()
751 self
.disconnect_model(model
)
752 self
.disconnect_controller(controller
)
755 async def add_relation(
756 self
, model_name
: str, endpoint_1
: str, endpoint_2
: str,
760 :param: model_name: Model name
761 :param: endpoint_1 First endpoint name
762 ("app:endpoint" format or directly the saas name)
763 :param: endpoint_2: Second endpoint name (^ same format)
766 self
.log
.debug("Adding relation: {} -> {}".format(endpoint_1
, endpoint_2
))
769 controller
= await self
.get_controller()
772 model
= await self
.get_model(controller
, model_name
)
776 await model
.add_relation(endpoint_1
, endpoint_2
)
777 except JujuAPIError
as e
:
778 if "not found" in e
.message
:
779 self
.log
.warning("Relation not found: {}".format(e
.message
))
781 if "already exists" in e
.message
:
782 self
.log
.warning("Relation already exists: {}".format(e
.message
))
784 # another exception, raise it
787 await self
.disconnect_model(model
)
788 await self
.disconnect_controller(controller
)
791 self
, offer_url
: str, model_name
: str,
794 Adds a remote offer to the model. Relations can be created later using "juju relate".
796 :param: offer_url: Offer Url
797 :param: model_name: Model name
799 :raises ParseError if there's a problem parsing the offer_url
800 :raises JujuError if remote offer includes and endpoint
801 :raises JujuAPIError if the operation is not successful
803 controller
= await self
.get_controller()
804 model
= await controller
.get_model(model_name
)
807 await model
.consume(offer_url
)
809 await self
.disconnect_model(model
)
810 await self
.disconnect_controller(controller
)
812 async def destroy_model(self
, model_name
: str, total_timeout
: float):
816 :param: model_name: Model name
817 :param: total_timeout: Timeout
820 controller
= await self
.get_controller()
821 model
= await self
.get_model(controller
, model_name
)
823 self
.log
.debug("Destroying model {}".format(model_name
))
824 uuid
= model
.info
.uuid
826 # Destroy machines that are manually provisioned
827 # and still are in pending state
828 await self
._destroy
_pending
_machines
(model
, only_manual
=True)
831 await self
.disconnect_model(model
)
834 if model_name
in self
.models
:
835 self
.models
.remove(model_name
)
837 await controller
.destroy_model(uuid
, force
=True, max_wait
=0)
839 # Wait until model is destroyed
840 self
.log
.debug("Waiting for model {} to be destroyed...".format(model_name
))
842 if total_timeout
is None:
844 end
= time
.time() + total_timeout
845 while time
.time() < end
:
846 models
= await controller
.list_models()
847 if model_name
not in models
:
849 "The model {} ({}) was destroyed".format(model_name
, uuid
)
852 await asyncio
.sleep(5)
854 "Timeout waiting for model {} to be destroyed".format(model_name
)
857 await self
.disconnect_controller(controller
)
859 async def destroy_application(self
, model
: Model
, application_name
: str):
863 :param: model: Model object
864 :param: application_name: Application name
867 "Destroying application {} in model {}".format(
868 application_name
, model
.info
.name
871 application
= model
.applications
.get(application_name
)
873 await application
.destroy()
875 self
.log
.warning("Application not found: {}".format(application_name
))
877 async def _destroy_pending_machines(self
, model
: Model
, only_manual
: bool = False):
879 Destroy pending machines in a given model
881 :param: only_manual: Bool that indicates only manually provisioned
882 machines should be destroyed (if True), or that
883 all pending machines should be destroyed
885 status
= await model
.get_status()
886 for machine_id
in status
.machines
:
887 machine_status
= status
.machines
[machine_id
]
888 if machine_status
.agent_status
.status
== "pending":
889 if only_manual
and not machine_status
.instance_id
.startswith("manual:"):
891 machine
= model
.machines
[machine_id
]
892 await machine
.destroy(force
=True)
894 # async def destroy_machine(
895 # self, model: Model, machine_id: str, total_timeout: float = 3600
900 # :param: model: Model object
901 # :param: machine_id: Machine id
902 # :param: total_timeout: Timeout in seconds
904 # machines = await model.get_machines()
905 # if machine_id in machines:
906 # machine = machines[machine_id]
907 # await machine.destroy(force=True)
909 # end = time.time() + total_timeout
911 # # wait for machine removal
912 # machines = await model.get_machines()
913 # while machine_id in machines and time.time() < end:
914 # self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
915 # await asyncio.sleep(0.5)
916 # machines = await model.get_machines()
917 # self.log.debug("Machine destroyed: {}".format(machine_id))
919 # self.log.debug("Machine not found: {}".format(machine_id))
921 async def configure_application(
922 self
, model_name
: str, application_name
: str, config
: dict = None
924 """Configure application
926 :param: model_name: Model name
927 :param: application_name: Application name
928 :param: config: Config to apply to the charm
930 self
.log
.debug("Configuring application {}".format(application_name
))
934 controller
= await self
.get_controller()
935 model
= await self
.get_model(controller
, model_name
)
936 application
= self
._get
_application
(
937 model
, application_name
=application_name
,
939 await application
.set_config(config
)
941 await self
.disconnect_model(model
)
942 await self
.disconnect_controller(controller
)
944 def _get_api_endpoints_db(self
) -> [str]:
946 Get API Endpoints from DB
948 :return: List of API endpoints
950 self
.log
.debug("Getting endpoints from database")
952 juju_info
= self
.db
.get_one(
953 DB_DATA
.api_endpoints
.table
,
954 q_filter
=DB_DATA
.api_endpoints
.filter,
957 if juju_info
and DB_DATA
.api_endpoints
.key
in juju_info
:
958 return juju_info
[DB_DATA
.api_endpoints
.key
]
960 def _update_api_endpoints_db(self
, endpoints
: [str]):
962 Update API endpoints in Database
964 :param: List of endpoints
966 self
.log
.debug("Saving endpoints {} in database".format(endpoints
))
968 juju_info
= self
.db
.get_one(
969 DB_DATA
.api_endpoints
.table
,
970 q_filter
=DB_DATA
.api_endpoints
.filter,
973 # If it doesn't, then create it
977 DB_DATA
.api_endpoints
.table
, DB_DATA
.api_endpoints
.filter,
979 except DbException
as e
:
980 # Racing condition: check if another N2VC worker has created it
981 juju_info
= self
.db
.get_one(
982 DB_DATA
.api_endpoints
.table
,
983 q_filter
=DB_DATA
.api_endpoints
.filter,
989 DB_DATA
.api_endpoints
.table
,
990 DB_DATA
.api_endpoints
.filter,
991 {DB_DATA
.api_endpoints
.key
: endpoints
},
994 def handle_exception(self
, loop
, context
):
995 # All unhandled exceptions by libjuju are handled here.
998 async def health_check(self
, interval
: float = 300.0):
1000 Health check to make sure controller and controller_model connections are OK
1002 :param: interval: Time in seconds between checks
1006 controller
= await self
.get_controller()
1007 # self.log.debug("VCA is alive")
1008 except Exception as e
:
1009 self
.log
.error("Health check to VCA failed: {}".format(e
))
1011 await self
.disconnect_controller(controller
)
1012 await asyncio
.sleep(interval
)
1014 async def list_models(self
, contains
: str = None) -> [str]:
1015 """List models with certain names
1017 :param: contains: String that is contained in model name
1019 :retur: [models] Returns list of model names
1022 controller
= await self
.get_controller()
1024 models
= await controller
.list_models()
1026 models
= [model
for model
in models
if contains
in model
]
1029 await self
.disconnect_controller(controller
)
1031 async def list_offers(self
, model_name
: str) -> QueryApplicationOffersResults
:
1032 """List models with certain names
1034 :param: model_name: Model name
1036 :return: Returns list of offers
1039 controller
= await self
.get_controller()
1041 return await controller
.list_offers(model_name
)
1043 await self
.disconnect_controller(controller
)
1048 configuration
: Configuration
,
1050 credential_name
: str = None,
1053 Add a Kubernetes cloud to the controller
1055 Similar to the `juju add-k8s` command in the CLI
1057 :param: name: Name for the K8s cloud
1058 :param: configuration: Kubernetes configuration object
1059 :param: storage_class: Storage Class to use in the cloud
1060 :param: credential_name: Storage Class to use in the cloud
1063 if not storage_class
:
1064 raise Exception("storage_class must be a non-empty string")
1066 raise Exception("name must be a non-empty string")
1067 if not configuration
:
1068 raise Exception("configuration must be provided")
1070 endpoint
= configuration
.host
1071 credential
= self
.get_k8s_cloud_credential(configuration
)
1073 [credential
.attrs
["ClientCertificateData"]]
1074 if "ClientCertificateData" in credential
.attrs
1077 cloud
= client
.Cloud(
1079 auth_types
=[credential
.auth_type
],
1081 ca_certificates
=ca_certificates
,
1083 "operator-storage": storage_class
,
1084 "workload-storage": storage_class
,
1088 return await self
.add_cloud(
1089 name
, cloud
, credential
, credential_name
=credential_name
1092 def get_k8s_cloud_credential(
1093 self
, configuration
: Configuration
,
1094 ) -> client
.CloudCredential
:
1096 ca_cert
= configuration
.ssl_ca_cert
or configuration
.cert_file
1097 key
= configuration
.key_file
1098 api_key
= configuration
.api_key
1100 username
= configuration
.username
1101 password
= configuration
.password
1103 if "authorization" in api_key
:
1104 authorization
= api_key
["authorization"]
1105 if "Bearer " in authorization
:
1106 bearer_list
= authorization
.split(" ")
1107 if len(bearer_list
) == 2:
1108 [_
, token
] = bearer_list
1110 raise JujuInvalidK8sConfiguration("unknown format of api_key")
1112 token
= authorization
1114 attrs
["ClientCertificateData"] = open(ca_cert
, "r").read()
1116 attrs
["ClientKeyData"] = open(key
, "r").read()
1118 if username
or password
:
1119 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1120 attrs
["Token"] = token
1124 auth_type
= "oauth2"
1126 raise JujuInvalidK8sConfiguration(
1127 "missing token for auth type {}".format(auth_type
)
1132 "credential for user {} has empty password".format(username
)
1134 attrs
["username"] = username
1135 attrs
["password"] = password
1137 auth_type
= "userpasswithcert"
1139 auth_type
= "userpass"
1140 elif ca_cert
and token
:
1141 auth_type
= "certificate"
1143 raise JujuInvalidK8sConfiguration("authentication method not supported")
1144 return client
.CloudCredential(auth_type
=auth_type
, attrs
=attrs
,)
1146 async def add_cloud(
1150 credential
: CloudCredential
= None,
1151 credential_name
: str = None,
1154 Add cloud to the controller
1156 :param: name: Name of the cloud to be added
1157 :param: cloud: Cloud object
1158 :param: credential: CloudCredentials object for the cloud
1159 :param: credential_name: Credential name.
1160 If not defined, cloud of the name will be used.
1162 controller
= await self
.get_controller()
1164 _
= await controller
.add_cloud(name
, cloud
)
1166 await controller
.add_credential(
1167 credential_name
or name
, credential
=credential
, cloud
=name
1169 # Need to return the object returned by the controller.add_cloud() function
1170 # I'm returning the original value now until this bug is fixed:
1171 # https://github.com/juju/python-libjuju/issues/443
1174 await self
.disconnect_controller(controller
)
1176 async def remove_cloud(self
, name
: str):
1180 :param: name: Name of the cloud to be removed
1182 controller
= await self
.get_controller()
1184 await controller
.remove_cloud(name
)
1186 await self
.disconnect_controller(controller
)
1188 async def _get_leader_unit(self
, application
: Application
) -> Unit
:
1190 for u
in application
.units
:
1191 if await u
.is_leader_from_status():