1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from juju
.controller
import Controller
18 from juju
.client
import client
21 from juju
.errors
import JujuAPIError
22 from juju
.model
import Model
23 from juju
.machine
import Machine
24 from juju
.application
import Application
25 from juju
.unit
import Unit
26 from juju
.client
._definitions
import (
28 QueryApplicationOffersResults
,
32 from n2vc
.juju_watcher
import JujuModelWatcher
33 from n2vc
.provisioner
import AsyncSSHProvisioner
34 from n2vc
.n2vc_conn
import N2VCConnector
35 from n2vc
.exceptions
import (
37 JujuApplicationNotFound
,
38 JujuLeaderUnitNotFound
,
40 JujuModelAlreadyExists
,
41 JujuControllerFailedConnecting
,
42 JujuApplicationExists
,
43 JujuInvalidK8sConfiguration
,
45 from n2vc
.utils
import DB_DATA
46 from osm_common
.dbbase
import DbException
47 from kubernetes
.client
.configuration
import Configuration
58 loop
: asyncio
.AbstractEventLoop
= None,
59 log
: logging
.Logger
= None,
61 n2vc
: N2VCConnector
= None,
62 apt_mirror
: str = None,
63 enable_os_upgrade
: bool = True,
68 :param: endpoint: Endpoint of the juju controller (host:port)
69 :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
70 :param: username: Juju username
71 :param: password: Juju password
72 :param: cacert: Juju CA Certificate
73 :param: loop: Asyncio loop
76 :param: n2vc: N2VC object
77 :param: apt_mirror: APT Mirror
78 :param: enable_os_upgrade: Enable OS Upgrade
81 self
.log
= log
or logging
.getLogger("Libjuju")
83 db_endpoints
= self
._get
_api
_endpoints
_db
()
85 if (db_endpoints
and endpoint
not in db_endpoints
) or not db_endpoints
:
86 self
.endpoints
= [endpoint
]
87 self
._update
_api
_endpoints
_db
(self
.endpoints
)
89 self
.endpoints
= db_endpoints
90 self
.api_proxy
= api_proxy
91 self
.username
= username
92 self
.password
= password
94 self
.loop
= loop
or asyncio
.get_event_loop()
97 # Generate config for models
98 self
.model_config
= {}
100 self
.model_config
["apt-mirror"] = apt_mirror
101 self
.model_config
["enable-os-refresh-update"] = enable_os_upgrade
102 self
.model_config
["enable-os-upgrade"] = enable_os_upgrade
104 self
.loop
.set_exception_handler(self
.handle_exception
)
105 self
.creating_model
= asyncio
.Lock(loop
=self
.loop
)
108 self
.log
.debug("Libjuju initialized!")
110 self
.health_check_task
= self
._create
_health
_check
_task
()
112 def _create_health_check_task(self
):
113 return self
.loop
.create_task(self
.health_check())
115 async def get_controller(self
, timeout
: float = 5.0) -> Controller
:
119 :param: timeout: Time in seconds to wait for controller to connect
123 controller
= Controller(loop
=self
.loop
)
124 await asyncio
.wait_for(
126 endpoint
=self
.endpoints
,
127 username
=self
.username
,
128 password
=self
.password
,
133 endpoints
= await controller
.api_endpoints
134 if self
.endpoints
!= endpoints
:
135 self
.endpoints
= endpoints
136 self
._update
_api
_endpoints
_db
(self
.endpoints
)
138 except asyncio
.CancelledError
as e
:
140 except Exception as e
:
142 "Failed connecting to controller: {}...".format(self
.endpoints
)
145 await self
.disconnect_controller(controller
)
146 raise JujuControllerFailedConnecting(e
)
148 async def disconnect(self
):
150 # Cancel health check task
151 self
.health_check_task
.cancel()
152 self
.log
.debug("Libjuju disconnected!")
154 async def disconnect_model(self
, model
: Model
):
158 :param: model: Model that will be disconnected
160 await model
.disconnect()
162 async def disconnect_controller(self
, controller
: Controller
):
164 Disconnect controller
166 :param: controller: Controller that will be disconnected
169 await controller
.disconnect()
171 async def add_model(self
, model_name
: str, cloud_name
: str, credential_name
=None):
175 :param: model_name: Model name
176 :param: cloud_name: Cloud name
177 :param: credential_name: Credential name to use for adding the model
178 If not specified, same name as the cloud will be used.
182 controller
= await self
.get_controller()
185 # Raise exception if model already exists
186 if await self
.model_exists(model_name
, controller
=controller
):
187 raise JujuModelAlreadyExists(
188 "Model {} already exists.".format(model_name
)
191 # Block until other workers have finished model creation
192 while self
.creating_model
.locked():
193 await asyncio
.sleep(0.1)
195 # If the model exists, return it from the controller
196 if model_name
in self
.models
:
200 async with self
.creating_model
:
201 self
.log
.debug("Creating model {}".format(model_name
))
202 model
= await controller
.add_model(
204 config
=self
.model_config
,
205 cloud_name
=cloud_name
,
206 credential_name
=credential_name
or cloud_name
,
208 self
.models
.add(model_name
)
211 await self
.disconnect_model(model
)
212 await self
.disconnect_controller(controller
)
215 self
, controller
: Controller
, model_name
: str, id=None
218 Get model from controller
220 :param: controller: Controller
221 :param: model_name: Model name
223 :return: Model: The created Juju model object
225 return await controller
.get_model(model_name
)
227 async def model_exists(
228 self
, model_name
: str, controller
: Controller
= None
231 Check if model exists
233 :param: controller: Controller
234 :param: model_name: Model name
238 need_to_disconnect
= False
240 # Get controller if not passed
242 controller
= await self
.get_controller()
243 need_to_disconnect
= True
245 # Check if model exists
247 return model_name
in await controller
.list_models()
249 if need_to_disconnect
:
250 await self
.disconnect_controller(controller
)
252 async def models_exist(self
, model_names
: [str]) -> (bool, list):
254 Check if models exists
256 :param: model_names: List of strings with model names
258 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
262 "model_names must be a non-empty array. Given value: {}".format(
266 non_existing_models
= []
267 models
= await self
.list_models()
268 existing_models
= list(set(models
).intersection(model_names
))
269 non_existing_models
= list(set(model_names
) - set(existing_models
))
272 len(non_existing_models
) == 0,
276 async def get_model_status(self
, model_name
: str) -> FullStatus
:
280 :param: model_name: Model name
282 :return: Full status object
284 controller
= await self
.get_controller()
285 model
= await self
.get_model(controller
, model_name
)
287 return await model
.get_status()
289 await self
.disconnect_model(model
)
290 await self
.disconnect_controller(controller
)
292 async def create_machine(
295 machine_id
: str = None,
296 db_dict
: dict = None,
297 progress_timeout
: float = None,
298 total_timeout
: float = None,
299 series
: str = "xenial",
301 ) -> (Machine
, bool):
305 :param: model_name: Model name
306 :param: machine_id: Machine id
307 :param: db_dict: Dictionary with data of the DB to write the updates
308 :param: progress_timeout: Maximum time between two updates in the model
309 :param: total_timeout: Timeout for the entity to be active
310 :param: series: Series of the machine (xenial, bionic, focal, ...)
311 :param: wait: Wait until machine is ready
313 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
314 if the machine is new or it already existed
320 "Creating machine (id={}) in model: {}".format(machine_id
, model_name
)
324 controller
= await self
.get_controller()
327 model
= await self
.get_model(controller
, model_name
)
329 if machine_id
is not None:
331 "Searching machine (id={}) in model {}".format(
332 machine_id
, model_name
336 # Get machines from model and get the machine with machine_id if exists
337 machines
= await model
.get_machines()
338 if machine_id
in machines
:
340 "Machine (id={}) found in model {}".format(
341 machine_id
, model_name
344 machine
= machines
[machine_id
]
346 raise JujuMachineNotFound("Machine {} not found".format(machine_id
))
349 self
.log
.debug("Creating a new machine in model {}".format(model_name
))
352 machine
= await model
.add_machine(
353 spec
=None, constraints
=None, disks
=None, series
=series
357 # Wait until the machine is ready
359 "Wait until machine {} is ready in model {}".format(
360 machine
.entity_id
, model_name
364 await JujuModelWatcher
.wait_for(
367 progress_timeout
=progress_timeout
,
368 total_timeout
=total_timeout
,
373 await self
.disconnect_model(model
)
374 await self
.disconnect_controller(controller
)
377 "Machine {} ready at {} in model {}".format(
378 machine
.entity_id
, machine
.dns_name
, model_name
383 async def provision_machine(
388 private_key_path
: str,
389 db_dict
: dict = None,
390 progress_timeout
: float = None,
391 total_timeout
: float = None,
394 Manually provisioning of a machine
396 :param: model_name: Model name
397 :param: hostname: IP to access the machine
398 :param: username: Username to login to the machine
399 :param: private_key_path: Local path for the private key
400 :param: db_dict: Dictionary with data of the DB to write the updates
401 :param: progress_timeout: Maximum time between two updates in the model
402 :param: total_timeout: Timeout for the entity to be active
404 :return: (Entity): Machine id
407 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
408 model_name
, hostname
, username
413 controller
= await self
.get_controller()
416 model
= await self
.get_model(controller
, model_name
)
420 provisioner
= AsyncSSHProvisioner(
423 private_key_path
=private_key_path
,
428 params
= await provisioner
.provision_machine()
430 params
.jobs
= ["JobHostUnits"]
432 self
.log
.debug("Adding machine to model")
433 connection
= model
.connection()
434 client_facade
= client
.ClientFacade
.from_connection(connection
)
436 results
= await client_facade
.AddMachines(params
=[params
])
437 error
= results
.machines
[0].error
440 msg
= "Error adding machine: {}".format(error
.message
)
441 self
.log
.error(msg
=msg
)
442 raise ValueError(msg
)
444 machine_id
= results
.machines
[0].machine
446 self
.log
.debug("Installing Juju agent into machine {}".format(machine_id
))
447 asyncio
.ensure_future(
448 provisioner
.install_agent(
449 connection
=connection
,
451 machine_id
=machine_id
,
452 proxy
=self
.api_proxy
,
458 machine_list
= await model
.get_machines()
459 if machine_id
in machine_list
:
460 self
.log
.debug("Machine {} found in model!".format(machine_id
))
461 machine
= model
.machines
.get(machine_id
)
463 await asyncio
.sleep(2)
466 msg
= "Machine {} not found in model".format(machine_id
)
467 self
.log
.error(msg
=msg
)
468 raise JujuMachineNotFound(msg
)
471 "Wait until machine {} is ready in model {}".format(
472 machine
.entity_id
, model_name
475 await JujuModelWatcher
.wait_for(
478 progress_timeout
=progress_timeout
,
479 total_timeout
=total_timeout
,
483 except Exception as e
:
486 await self
.disconnect_model(model
)
487 await self
.disconnect_controller(controller
)
490 "Machine provisioned {} in model {}".format(machine_id
, model_name
)
496 self
, uri
: str, model_name
: str, wait
: bool = True, timeout
: float = 3600
499 Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
501 :param: uri: Path or Charm Store uri in which the charm or bundle can be found
502 :param: model_name: Model name
503 :param: wait: Indicates whether to wait or not until all applications are active
504 :param: timeout: Time in seconds to wait until all applications are active
506 controller
= await self
.get_controller()
507 model
= await self
.get_model(controller
, model_name
)
509 await model
.deploy(uri
)
511 await JujuModelWatcher
.wait_for_model(model
, timeout
=timeout
)
512 self
.log
.debug("All units active in model {}".format(model_name
))
514 await self
.disconnect_model(model
)
515 await self
.disconnect_controller(controller
)
517 async def deploy_charm(
519 application_name
: str,
523 db_dict
: dict = None,
524 progress_timeout
: float = None,
525 total_timeout
: float = None,
532 :param: application_name: Application name
533 :param: path: Local path to the charm
534 :param: model_name: Model name
535 :param: machine_id ID of the machine
536 :param: db_dict: Dictionary with data of the DB to write the updates
537 :param: progress_timeout: Maximum time between two updates in the model
538 :param: total_timeout: Timeout for the entity to be active
539 :param: config: Config for the charm
540 :param: series: Series of the charm
541 :param: num_units: Number of units
543 :return: (juju.application.Application): Juju application
546 "Deploying charm {} to machine {} in model ~{}".format(
547 application_name
, machine_id
, model_name
550 self
.log
.debug("charm: {}".format(path
))
553 controller
= await self
.get_controller()
556 model
= await self
.get_model(controller
, model_name
)
560 if application_name
not in model
.applications
:
562 if machine_id
is not None:
563 if machine_id
not in model
.machines
:
564 msg
= "Machine {} not found in model".format(machine_id
)
565 self
.log
.error(msg
=msg
)
566 raise JujuMachineNotFound(msg
)
567 machine
= model
.machines
[machine_id
]
568 series
= machine
.series
570 application
= await model
.deploy(
572 application_name
=application_name
,
581 "Wait until application {} is ready in model {}".format(
582 application_name
, model_name
586 for _
in range(num_units
- 1):
587 m
, _
= await self
.create_machine(model_name
, wait
=False)
588 await application
.add_unit(to
=m
.entity_id
)
590 await JujuModelWatcher
.wait_for(
593 progress_timeout
=progress_timeout
,
594 total_timeout
=total_timeout
,
599 "Application {} is ready in model {}".format(
600 application_name
, model_name
604 raise JujuApplicationExists(
605 "Application {} exists".format(application_name
)
608 await self
.disconnect_model(model
)
609 await self
.disconnect_controller(controller
)
613 def _get_application(self
, model
: Model
, application_name
: str) -> Application
:
616 :param: model: Model object
617 :param: application_name: Application name
619 :return: juju.application.Application (or None if it doesn't exist)
621 if model
.applications
and application_name
in model
.applications
:
622 return model
.applications
[application_name
]
624 async def execute_action(
626 application_name
: str,
629 db_dict
: dict = None,
630 progress_timeout
: float = None,
631 total_timeout
: float = None,
636 :param: application_name: Application name
637 :param: model_name: Model name
638 :param: action_name: Name of the action
639 :param: db_dict: Dictionary with data of the DB to write the updates
640 :param: progress_timeout: Maximum time between two updates in the model
641 :param: total_timeout: Timeout for the entity to be active
643 :return: (str, str): (output and status)
646 "Executing action {} using params {}".format(action_name
, kwargs
)
649 controller
= await self
.get_controller()
652 model
= await self
.get_model(controller
, model_name
)
656 application
= self
._get
_application
(
657 model
, application_name
=application_name
,
659 if application
is None:
660 raise JujuApplicationNotFound("Cannot execute action")
664 # Ocassionally, self._get_leader_unit() will return None
665 # because the leader elected hook has not been triggered yet.
666 # Therefore, we are doing some retries. If it happens again,
669 time_between_retries
= 10
671 for _
in range(attempts
):
672 unit
= await self
._get
_leader
_unit
(application
)
674 await asyncio
.sleep(time_between_retries
)
678 raise JujuLeaderUnitNotFound(
679 "Cannot execute action: leader unit not found"
682 actions
= await application
.get_actions()
684 if action_name
not in actions
:
685 raise JujuActionNotFound(
686 "Action {} not in available actions".format(action_name
)
689 action
= await unit
.run_action(action_name
, **kwargs
)
692 "Wait until action {} is completed in application {} (model={})".format(
693 action_name
, application_name
, model_name
696 await JujuModelWatcher
.wait_for(
699 progress_timeout
=progress_timeout
,
700 total_timeout
=total_timeout
,
705 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
706 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
708 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
712 "Action {} completed with status {} in application {} (model={})".format(
713 action_name
, action
.status
, application_name
, model_name
717 await self
.disconnect_model(model
)
718 await self
.disconnect_controller(controller
)
720 return output
, status
722 async def get_actions(self
, application_name
: str, model_name
: str) -> dict:
723 """Get list of actions
725 :param: application_name: Application name
726 :param: model_name: Model name
728 :return: Dict with this format
730 "action_name": "Description of the action",
735 "Getting list of actions for application {}".format(application_name
)
739 controller
= await self
.get_controller()
742 model
= await self
.get_model(controller
, model_name
)
746 application
= self
._get
_application
(
747 model
, application_name
=application_name
,
750 # Return list of actions
751 return await application
.get_actions()
754 # Disconnect from model and controller
755 await self
.disconnect_model(model
)
756 await self
.disconnect_controller(controller
)
758 async def get_metrics(self
, model_name
: str, application_name
: str) -> dict:
759 """Get the metrics collected by the VCA.
761 :param model_name The name or unique id of the network service
762 :param application_name The name of the application
764 if not model_name
or not application_name
:
765 raise Exception("model_name and application_name must be non-empty strings")
767 controller
= await self
.get_controller()
768 model
= await self
.get_model(controller
, model_name
)
770 application
= self
._get
_application
(model
, application_name
)
771 if application
is not None:
772 metrics
= await application
.get_metrics()
774 self
.disconnect_model(model
)
775 self
.disconnect_controller(controller
)
778 async def add_relation(
779 self
, model_name
: str, endpoint_1
: str, endpoint_2
: str,
783 :param: model_name: Model name
784 :param: endpoint_1 First endpoint name
785 ("app:endpoint" format or directly the saas name)
786 :param: endpoint_2: Second endpoint name (^ same format)
789 self
.log
.debug("Adding relation: {} -> {}".format(endpoint_1
, endpoint_2
))
792 controller
= await self
.get_controller()
795 model
= await self
.get_model(controller
, model_name
)
799 await model
.add_relation(endpoint_1
, endpoint_2
)
800 except JujuAPIError
as e
:
801 if "not found" in e
.message
:
802 self
.log
.warning("Relation not found: {}".format(e
.message
))
804 if "already exists" in e
.message
:
805 self
.log
.warning("Relation already exists: {}".format(e
.message
))
807 # another exception, raise it
810 await self
.disconnect_model(model
)
811 await self
.disconnect_controller(controller
)
814 self
, offer_url
: str, model_name
: str,
817 Adds a remote offer to the model. Relations can be created later using "juju relate".
819 :param: offer_url: Offer Url
820 :param: model_name: Model name
822 :raises ParseError if there's a problem parsing the offer_url
823 :raises JujuError if remote offer includes and endpoint
824 :raises JujuAPIError if the operation is not successful
826 controller
= await self
.get_controller()
827 model
= await controller
.get_model(model_name
)
830 await model
.consume(offer_url
)
832 await self
.disconnect_model(model
)
833 await self
.disconnect_controller(controller
)
835 async def destroy_model(self
, model_name
: str, total_timeout
: float):
839 :param: model_name: Model name
840 :param: total_timeout: Timeout
843 controller
= await self
.get_controller()
844 model
= await self
.get_model(controller
, model_name
)
846 self
.log
.debug("Destroying model {}".format(model_name
))
847 uuid
= model
.info
.uuid
849 # Destroy machines that are manually provisioned
850 # and still are in pending state
851 await self
._destroy
_pending
_machines
(model
, only_manual
=True)
854 await self
.disconnect_model(model
)
857 if model_name
in self
.models
:
858 self
.models
.remove(model_name
)
860 await controller
.destroy_model(uuid
, force
=True, max_wait
=0)
862 # Wait until model is destroyed
863 self
.log
.debug("Waiting for model {} to be destroyed...".format(model_name
))
865 if total_timeout
is None:
867 end
= time
.time() + total_timeout
868 while time
.time() < end
:
869 models
= await controller
.list_models()
870 if model_name
not in models
:
872 "The model {} ({}) was destroyed".format(model_name
, uuid
)
875 await asyncio
.sleep(5)
877 "Timeout waiting for model {} to be destroyed".format(model_name
)
880 await self
.disconnect_controller(controller
)
882 async def destroy_application(self
, model
: Model
, application_name
: str):
886 :param: model: Model object
887 :param: application_name: Application name
890 "Destroying application {} in model {}".format(
891 application_name
, model
.info
.name
894 application
= model
.applications
.get(application_name
)
896 await application
.destroy()
898 self
.log
.warning("Application not found: {}".format(application_name
))
900 async def _destroy_pending_machines(self
, model
: Model
, only_manual
: bool = False):
902 Destroy pending machines in a given model
904 :param: only_manual: Bool that indicates only manually provisioned
905 machines should be destroyed (if True), or that
906 all pending machines should be destroyed
908 status
= await model
.get_status()
909 for machine_id
in status
.machines
:
910 machine_status
= status
.machines
[machine_id
]
911 if machine_status
.agent_status
.status
== "pending":
912 if only_manual
and not machine_status
.instance_id
.startswith("manual:"):
914 machine
= model
.machines
[machine_id
]
915 await machine
.destroy(force
=True)
917 # async def destroy_machine(
918 # self, model: Model, machine_id: str, total_timeout: float = 3600
923 # :param: model: Model object
924 # :param: machine_id: Machine id
925 # :param: total_timeout: Timeout in seconds
927 # machines = await model.get_machines()
928 # if machine_id in machines:
929 # machine = machines[machine_id]
930 # await machine.destroy(force=True)
932 # end = time.time() + total_timeout
934 # # wait for machine removal
935 # machines = await model.get_machines()
936 # while machine_id in machines and time.time() < end:
937 # self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
938 # await asyncio.sleep(0.5)
939 # machines = await model.get_machines()
940 # self.log.debug("Machine destroyed: {}".format(machine_id))
942 # self.log.debug("Machine not found: {}".format(machine_id))
944 async def configure_application(
945 self
, model_name
: str, application_name
: str, config
: dict = None
947 """Configure application
949 :param: model_name: Model name
950 :param: application_name: Application name
951 :param: config: Config to apply to the charm
953 self
.log
.debug("Configuring application {}".format(application_name
))
956 controller
= await self
.get_controller()
959 model
= await self
.get_model(controller
, model_name
)
960 application
= self
._get
_application
(
961 model
, application_name
=application_name
,
963 await application
.set_config(config
)
966 await self
.disconnect_model(model
)
967 await self
.disconnect_controller(controller
)
969 def _get_api_endpoints_db(self
) -> [str]:
971 Get API Endpoints from DB
973 :return: List of API endpoints
975 self
.log
.debug("Getting endpoints from database")
977 juju_info
= self
.db
.get_one(
978 DB_DATA
.api_endpoints
.table
,
979 q_filter
=DB_DATA
.api_endpoints
.filter,
982 if juju_info
and DB_DATA
.api_endpoints
.key
in juju_info
:
983 return juju_info
[DB_DATA
.api_endpoints
.key
]
985 def _update_api_endpoints_db(self
, endpoints
: [str]):
987 Update API endpoints in Database
989 :param: List of endpoints
991 self
.log
.debug("Saving endpoints {} in database".format(endpoints
))
993 juju_info
= self
.db
.get_one(
994 DB_DATA
.api_endpoints
.table
,
995 q_filter
=DB_DATA
.api_endpoints
.filter,
998 # If it doesn't, then create it
1002 DB_DATA
.api_endpoints
.table
, DB_DATA
.api_endpoints
.filter,
1004 except DbException
as e
:
1005 # Racing condition: check if another N2VC worker has created it
1006 juju_info
= self
.db
.get_one(
1007 DB_DATA
.api_endpoints
.table
,
1008 q_filter
=DB_DATA
.api_endpoints
.filter,
1009 fail_on_empty
=False,
1014 DB_DATA
.api_endpoints
.table
,
1015 DB_DATA
.api_endpoints
.filter,
1016 {DB_DATA
.api_endpoints
.key
: endpoints
},
1019 def handle_exception(self
, loop
, context
):
1020 # All unhandled exceptions by libjuju are handled here.
1023 async def health_check(self
, interval
: float = 300.0):
1025 Health check to make sure controller and controller_model connections are OK
1027 :param: interval: Time in seconds between checks
1032 controller
= await self
.get_controller()
1033 # self.log.debug("VCA is alive")
1034 except Exception as e
:
1035 self
.log
.error("Health check to VCA failed: {}".format(e
))
1037 await self
.disconnect_controller(controller
)
1038 await asyncio
.sleep(interval
)
1040 async def list_models(self
, contains
: str = None) -> [str]:
1041 """List models with certain names
1043 :param: contains: String that is contained in model name
1045 :retur: [models] Returns list of model names
1048 controller
= await self
.get_controller()
1050 models
= await controller
.list_models()
1052 models
= [model
for model
in models
if contains
in model
]
1055 await self
.disconnect_controller(controller
)
1057 async def list_offers(self
, model_name
: str) -> QueryApplicationOffersResults
:
1058 """List models with certain names
1060 :param: model_name: Model name
1062 :return: Returns list of offers
1065 controller
= await self
.get_controller()
1067 return await controller
.list_offers(model_name
)
1069 await self
.disconnect_controller(controller
)
1074 configuration
: Configuration
,
1076 credential_name
: str = None,
1079 Add a Kubernetes cloud to the controller
1081 Similar to the `juju add-k8s` command in the CLI
1083 :param: name: Name for the K8s cloud
1084 :param: configuration: Kubernetes configuration object
1085 :param: storage_class: Storage Class to use in the cloud
1086 :param: credential_name: Storage Class to use in the cloud
1089 if not storage_class
:
1090 raise Exception("storage_class must be a non-empty string")
1092 raise Exception("name must be a non-empty string")
1093 if not configuration
:
1094 raise Exception("configuration must be provided")
1096 endpoint
= configuration
.host
1097 credential
= self
.get_k8s_cloud_credential(configuration
)
1099 [credential
.attrs
["ClientCertificateData"]]
1100 if "ClientCertificateData" in credential
.attrs
1103 cloud
= client
.Cloud(
1105 auth_types
=[credential
.auth_type
],
1107 ca_certificates
=ca_certificates
,
1109 "operator-storage": storage_class
,
1110 "workload-storage": storage_class
,
1114 return await self
.add_cloud(
1115 name
, cloud
, credential
, credential_name
=credential_name
1118 def get_k8s_cloud_credential(
1119 self
, configuration
: Configuration
,
1120 ) -> client
.CloudCredential
:
1122 ca_cert
= configuration
.ssl_ca_cert
or configuration
.cert_file
1123 key
= configuration
.key_file
1124 api_key
= configuration
.api_key
1126 username
= configuration
.username
1127 password
= configuration
.password
1129 if "authorization" in api_key
:
1130 authorization
= api_key
["authorization"]
1131 if "Bearer " in authorization
:
1132 bearer_list
= authorization
.split(" ")
1133 if len(bearer_list
) == 2:
1134 [_
, token
] = bearer_list
1136 raise JujuInvalidK8sConfiguration("unknown format of api_key")
1138 token
= authorization
1140 attrs
["ClientCertificateData"] = open(ca_cert
, "r").read()
1142 attrs
["ClientKeyData"] = open(key
, "r").read()
1144 if username
or password
:
1145 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1146 attrs
["Token"] = token
1150 auth_type
= "oauth2"
1152 raise JujuInvalidK8sConfiguration(
1153 "missing token for auth type {}".format(auth_type
)
1158 "credential for user {} has empty password".format(username
)
1160 attrs
["username"] = username
1161 attrs
["password"] = password
1163 auth_type
= "userpasswithcert"
1165 auth_type
= "userpass"
1166 elif ca_cert
and token
:
1167 auth_type
= "certificate"
1169 raise JujuInvalidK8sConfiguration("authentication method not supported")
1170 return client
.CloudCredential(auth_type
=auth_type
, attrs
=attrs
)
1172 async def add_cloud(
1176 credential
: CloudCredential
= None,
1177 credential_name
: str = None,
1180 Add cloud to the controller
1182 :param: name: Name of the cloud to be added
1183 :param: cloud: Cloud object
1184 :param: credential: CloudCredentials object for the cloud
1185 :param: credential_name: Credential name.
1186 If not defined, cloud of the name will be used.
1188 controller
= await self
.get_controller()
1190 _
= await controller
.add_cloud(name
, cloud
)
1192 await controller
.add_credential(
1193 credential_name
or name
, credential
=credential
, cloud
=name
1195 # Need to return the object returned by the controller.add_cloud() function
1196 # I'm returning the original value now until this bug is fixed:
1197 # https://github.com/juju/python-libjuju/issues/443
1200 await self
.disconnect_controller(controller
)
1202 async def remove_cloud(self
, name
: str):
1206 :param: name: Name of the cloud to be removed
1208 controller
= await self
.get_controller()
1210 await controller
.remove_cloud(name
)
1212 await self
.disconnect_controller(controller
)
1214 async def _get_leader_unit(self
, application
: Application
) -> Unit
:
1216 for u
in application
.units
:
1217 if await u
.is_leader_from_status():