1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
20 from juju
.errors
import JujuAPIError
21 from juju
.model
import Model
22 from juju
.machine
import Machine
23 from juju
.application
import Application
24 from juju
.unit
import Unit
25 from juju
.client
._definitions
import (
27 QueryApplicationOffersResults
,
31 from juju
.controller
import Controller
32 from juju
.client
import client
35 from n2vc
.config
import ModelConfig
36 from n2vc
.juju_watcher
import JujuModelWatcher
37 from n2vc
.provisioner
import AsyncSSHProvisioner
38 from n2vc
.n2vc_conn
import N2VCConnector
39 from n2vc
.exceptions
import (
41 JujuApplicationNotFound
,
42 JujuLeaderUnitNotFound
,
44 JujuControllerFailedConnecting
,
45 JujuApplicationExists
,
46 JujuInvalidK8sConfiguration
,
49 from n2vc
.utils
import DB_DATA
50 from osm_common
.dbbase
import DbException
51 from kubernetes
.client
.configuration
import Configuration
53 RBAC_LABEL_KEY_NAME
= "rbac-id"
64 loop
: asyncio
.AbstractEventLoop
= None,
65 log
: logging
.Logger
= None,
67 n2vc
: N2VCConnector
= None,
68 model_config
: ModelConfig
= {},
73 :param: endpoint: Endpoint of the juju controller (host:port)
74 :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
75 :param: username: Juju username
76 :param: password: Juju password
77 :param: cacert: Juju CA Certificate
78 :param: loop: Asyncio loop
81 :param: n2vc: N2VC object
82 :param: apt_mirror: APT Mirror
83 :param: enable_os_upgrade: Enable OS Upgrade
86 self
.log
= log
or logging
.getLogger("Libjuju")
88 db_endpoints
= self
._get
_api
_endpoints
_db
()
90 if (db_endpoints
and endpoint
not in db_endpoints
) or not db_endpoints
:
91 self
.endpoints
= [endpoint
]
92 self
._update
_api
_endpoints
_db
(self
.endpoints
)
94 self
.endpoints
= db_endpoints
95 self
.api_proxy
= api_proxy
96 self
.username
= username
97 self
.password
= password
99 self
.loop
= loop
or asyncio
.get_event_loop()
102 # Generate config for models
103 self
.model_config
= model_config
105 self
.loop
.set_exception_handler(self
.handle_exception
)
106 self
.creating_model
= asyncio
.Lock(loop
=self
.loop
)
108 self
.log
.debug("Libjuju initialized!")
110 self
.health_check_task
= self
._create
_health
_check
_task
()
112 def _create_health_check_task(self
):
113 return self
.loop
.create_task(self
.health_check())
115 async def get_controller(self
, timeout
: float = 15.0) -> Controller
:
119 :param: timeout: Time in seconds to wait for controller to connect
123 controller
= Controller(loop
=self
.loop
)
124 await asyncio
.wait_for(
126 endpoint
=self
.endpoints
,
127 username
=self
.username
,
128 password
=self
.password
,
133 endpoints
= await controller
.api_endpoints
134 if self
.endpoints
!= endpoints
:
135 self
.endpoints
= endpoints
136 self
._update
_api
_endpoints
_db
(self
.endpoints
)
138 except asyncio
.CancelledError
as e
:
140 except Exception as e
:
142 "Failed connecting to controller: {}...".format(self
.endpoints
)
145 await self
.disconnect_controller(controller
)
146 raise JujuControllerFailedConnecting(e
)
148 async def disconnect(self
):
150 # Cancel health check task
151 self
.health_check_task
.cancel()
152 self
.log
.debug("Libjuju disconnected!")
154 async def disconnect_model(self
, model
: Model
):
158 :param: model: Model that will be disconnected
160 await model
.disconnect()
162 async def disconnect_controller(self
, controller
: Controller
):
164 Disconnect controller
166 :param: controller: Controller that will be disconnected
169 await controller
.disconnect()
171 async def add_model(self
, model_name
: str, cloud_name
: str, credential_name
=None):
175 :param: model_name: Model name
176 :param: cloud_name: Cloud name
177 :param: credential_name: Credential name to use for adding the model
178 If not specified, same name as the cloud will be used.
182 controller
= await self
.get_controller()
185 # Block until other workers have finished model creation
186 while self
.creating_model
.locked():
187 await asyncio
.sleep(0.1)
190 async with self
.creating_model
:
191 if await self
.model_exists(model_name
, controller
=controller
):
193 self
.log
.debug("Creating model {}".format(model_name
))
194 model
= await controller
.add_model(
196 config
=self
.model_config
,
197 cloud_name
=cloud_name
,
198 credential_name
=credential_name
or cloud_name
,
202 await self
.disconnect_model(model
)
203 await self
.disconnect_controller(controller
)
205 async def get_executed_actions(self
, model_name
: str) -> list:
207 Get executed/history of actions for a model.
209 :param: model_name: Model name, str.
210 :return: List of executed actions for a model.
213 executed_actions
= []
214 controller
= await self
.get_controller()
216 model
= await self
.get_model(controller
, model_name
)
217 # Get all unique action names
219 for application
in model
.applications
:
220 application_actions
= await self
.get_actions(application
, model_name
)
221 actions
.update(application_actions
)
222 # Get status of all actions
223 for application_action
in actions
:
224 app_action_status_list
= await model
.get_action_status(name
=application_action
)
225 for action_id
, action_status
in app_action_status_list
.items():
226 executed_action
= {"id": action_id
, "action": application_action
,
227 "status": action_status
}
228 # Get action output by id
229 action_status
= await model
.get_action_output(executed_action
["id"])
230 for k
, v
in action_status
.items():
231 executed_action
[k
] = v
232 executed_actions
.append(executed_action
)
233 except Exception as e
:
234 raise JujuError("Error in getting executed actions for model: {}. Error: {}"
235 .format(model_name
, str(e
)))
238 await self
.disconnect_model(model
)
239 await self
.disconnect_controller(controller
)
240 return executed_actions
242 async def get_application_configs(self
, model_name
: str, application_name
: str) -> dict:
244 Get available configs for an application.
246 :param: model_name: Model name, str.
247 :param: application_name: Application name, str.
249 :return: A dict which has key - action name, value - action description
252 application_configs
= {}
253 controller
= await self
.get_controller()
255 model
= await self
.get_model(controller
, model_name
)
256 application
= self
._get
_application
(model
, application_name
=application_name
)
257 application_configs
= await application
.get_config()
258 except Exception as e
:
259 raise JujuError("Error in getting configs for application: {} in model: {}. Error: {}"
260 .format(application_name
, model_name
, str(e
)))
263 await self
.disconnect_model(model
)
264 await self
.disconnect_controller(controller
)
265 return application_configs
268 self
, controller
: Controller
, model_name
: str, id=None
271 Get model from controller
273 :param: controller: Controller
274 :param: model_name: Model name
276 :return: Model: The created Juju model object
278 return await controller
.get_model(model_name
)
280 async def model_exists(
281 self
, model_name
: str, controller
: Controller
= None
284 Check if model exists
286 :param: controller: Controller
287 :param: model_name: Model name
291 need_to_disconnect
= False
293 # Get controller if not passed
295 controller
= await self
.get_controller()
296 need_to_disconnect
= True
298 # Check if model exists
300 return model_name
in await controller
.list_models()
302 if need_to_disconnect
:
303 await self
.disconnect_controller(controller
)
305 async def models_exist(self
, model_names
: [str]) -> (bool, list):
307 Check if models exists
309 :param: model_names: List of strings with model names
311 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
315 "model_names must be a non-empty array. Given value: {}".format(
319 non_existing_models
= []
320 models
= await self
.list_models()
321 existing_models
= list(set(models
).intersection(model_names
))
322 non_existing_models
= list(set(model_names
) - set(existing_models
))
325 len(non_existing_models
) == 0,
329 async def get_model_status(self
, model_name
: str) -> FullStatus
:
333 :param: model_name: Model name
335 :return: Full status object
337 controller
= await self
.get_controller()
338 model
= await self
.get_model(controller
, model_name
)
340 return await model
.get_status()
342 await self
.disconnect_model(model
)
343 await self
.disconnect_controller(controller
)
345 async def create_machine(
348 machine_id
: str = None,
349 db_dict
: dict = None,
350 progress_timeout
: float = None,
351 total_timeout
: float = None,
352 series
: str = "xenial",
354 ) -> (Machine
, bool):
358 :param: model_name: Model name
359 :param: machine_id: Machine id
360 :param: db_dict: Dictionary with data of the DB to write the updates
361 :param: progress_timeout: Maximum time between two updates in the model
362 :param: total_timeout: Timeout for the entity to be active
363 :param: series: Series of the machine (xenial, bionic, focal, ...)
364 :param: wait: Wait until machine is ready
366 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
367 if the machine is new or it already existed
373 "Creating machine (id={}) in model: {}".format(machine_id
, model_name
)
377 controller
= await self
.get_controller()
380 model
= await self
.get_model(controller
, model_name
)
382 if machine_id
is not None:
384 "Searching machine (id={}) in model {}".format(
385 machine_id
, model_name
389 # Get machines from model and get the machine with machine_id if exists
390 machines
= await model
.get_machines()
391 if machine_id
in machines
:
393 "Machine (id={}) found in model {}".format(
394 machine_id
, model_name
397 machine
= machines
[machine_id
]
399 raise JujuMachineNotFound("Machine {} not found".format(machine_id
))
402 self
.log
.debug("Creating a new machine in model {}".format(model_name
))
405 machine
= await model
.add_machine(
406 spec
=None, constraints
=None, disks
=None, series
=series
410 # Wait until the machine is ready
412 "Wait until machine {} is ready in model {}".format(
413 machine
.entity_id
, model_name
417 await JujuModelWatcher
.wait_for(
420 progress_timeout
=progress_timeout
,
421 total_timeout
=total_timeout
,
426 await self
.disconnect_model(model
)
427 await self
.disconnect_controller(controller
)
430 "Machine {} ready at {} in model {}".format(
431 machine
.entity_id
, machine
.dns_name
, model_name
436 async def provision_machine(
441 private_key_path
: str,
442 db_dict
: dict = None,
443 progress_timeout
: float = None,
444 total_timeout
: float = None,
447 Manually provisioning of a machine
449 :param: model_name: Model name
450 :param: hostname: IP to access the machine
451 :param: username: Username to login to the machine
452 :param: private_key_path: Local path for the private key
453 :param: db_dict: Dictionary with data of the DB to write the updates
454 :param: progress_timeout: Maximum time between two updates in the model
455 :param: total_timeout: Timeout for the entity to be active
457 :return: (Entity): Machine id
460 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
461 model_name
, hostname
, username
466 controller
= await self
.get_controller()
469 model
= await self
.get_model(controller
, model_name
)
473 provisioner
= AsyncSSHProvisioner(
476 private_key_path
=private_key_path
,
481 params
= await provisioner
.provision_machine()
483 params
.jobs
= ["JobHostUnits"]
485 self
.log
.debug("Adding machine to model")
486 connection
= model
.connection()
487 client_facade
= client
.ClientFacade
.from_connection(connection
)
489 results
= await client_facade
.AddMachines(params
=[params
])
490 error
= results
.machines
[0].error
493 msg
= "Error adding machine: {}".format(error
.message
)
494 self
.log
.error(msg
=msg
)
495 raise ValueError(msg
)
497 machine_id
= results
.machines
[0].machine
499 self
.log
.debug("Installing Juju agent into machine {}".format(machine_id
))
500 asyncio
.ensure_future(
501 provisioner
.install_agent(
502 connection
=connection
,
504 machine_id
=machine_id
,
505 proxy
=self
.api_proxy
,
506 series
=params
.series
,
512 machine_list
= await model
.get_machines()
513 if machine_id
in machine_list
:
514 self
.log
.debug("Machine {} found in model!".format(machine_id
))
515 machine
= model
.machines
.get(machine_id
)
517 await asyncio
.sleep(2)
520 msg
= "Machine {} not found in model".format(machine_id
)
521 self
.log
.error(msg
=msg
)
522 raise JujuMachineNotFound(msg
)
525 "Wait until machine {} is ready in model {}".format(
526 machine
.entity_id
, model_name
529 await JujuModelWatcher
.wait_for(
532 progress_timeout
=progress_timeout
,
533 total_timeout
=total_timeout
,
537 except Exception as e
:
540 await self
.disconnect_model(model
)
541 await self
.disconnect_controller(controller
)
544 "Machine provisioned {} in model {}".format(machine_id
, model_name
)
550 self
, uri
: str, model_name
: str, wait
: bool = True, timeout
: float = 3600
553 Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
555 :param: uri: Path or Charm Store uri in which the charm or bundle can be found
556 :param: model_name: Model name
557 :param: wait: Indicates whether to wait or not until all applications are active
558 :param: timeout: Time in seconds to wait until all applications are active
560 controller
= await self
.get_controller()
561 model
= await self
.get_model(controller
, model_name
)
563 await model
.deploy(uri
)
565 await JujuModelWatcher
.wait_for_model(model
, timeout
=timeout
)
566 self
.log
.debug("All units active in model {}".format(model_name
))
568 await self
.disconnect_model(model
)
569 await self
.disconnect_controller(controller
)
571 async def deploy_charm(
573 application_name
: str,
577 db_dict
: dict = None,
578 progress_timeout
: float = None,
579 total_timeout
: float = None,
586 :param: application_name: Application name
587 :param: path: Local path to the charm
588 :param: model_name: Model name
589 :param: machine_id ID of the machine
590 :param: db_dict: Dictionary with data of the DB to write the updates
591 :param: progress_timeout: Maximum time between two updates in the model
592 :param: total_timeout: Timeout for the entity to be active
593 :param: config: Config for the charm
594 :param: series: Series of the charm
595 :param: num_units: Number of units
597 :return: (juju.application.Application): Juju application
600 "Deploying charm {} to machine {} in model ~{}".format(
601 application_name
, machine_id
, model_name
604 self
.log
.debug("charm: {}".format(path
))
607 controller
= await self
.get_controller()
610 model
= await self
.get_model(controller
, model_name
)
614 if application_name
not in model
.applications
:
616 if machine_id
is not None:
617 if machine_id
not in model
.machines
:
618 msg
= "Machine {} not found in model".format(machine_id
)
619 self
.log
.error(msg
=msg
)
620 raise JujuMachineNotFound(msg
)
621 machine
= model
.machines
[machine_id
]
622 series
= machine
.series
624 application
= await model
.deploy(
626 application_name
=application_name
,
635 "Wait until application {} is ready in model {}".format(
636 application_name
, model_name
640 for _
in range(num_units
- 1):
641 m
, _
= await self
.create_machine(model_name
, wait
=False)
642 await application
.add_unit(to
=m
.entity_id
)
644 await JujuModelWatcher
.wait_for(
647 progress_timeout
=progress_timeout
,
648 total_timeout
=total_timeout
,
653 "Application {} is ready in model {}".format(
654 application_name
, model_name
658 raise JujuApplicationExists(
659 "Application {} exists".format(application_name
)
662 await self
.disconnect_model(model
)
663 await self
.disconnect_controller(controller
)
667 def _get_application(self
, model
: Model
, application_name
: str) -> Application
:
670 :param: model: Model object
671 :param: application_name: Application name
673 :return: juju.application.Application (or None if it doesn't exist)
675 if model
.applications
and application_name
in model
.applications
:
676 return model
.applications
[application_name
]
678 async def execute_action(
680 application_name
: str,
683 db_dict
: dict = None,
684 progress_timeout
: float = None,
685 total_timeout
: float = None,
690 :param: application_name: Application name
691 :param: model_name: Model name
692 :param: action_name: Name of the action
693 :param: db_dict: Dictionary with data of the DB to write the updates
694 :param: progress_timeout: Maximum time between two updates in the model
695 :param: total_timeout: Timeout for the entity to be active
697 :return: (str, str): (output and status)
700 "Executing action {} using params {}".format(action_name
, kwargs
)
703 controller
= await self
.get_controller()
706 model
= await self
.get_model(controller
, model_name
)
710 application
= self
._get
_application
(
712 application_name
=application_name
,
714 if application
is None:
715 raise JujuApplicationNotFound("Cannot execute action")
719 # Ocassionally, self._get_leader_unit() will return None
720 # because the leader elected hook has not been triggered yet.
721 # Therefore, we are doing some retries. If it happens again,
724 time_between_retries
= 10
726 for _
in range(attempts
):
727 unit
= await self
._get
_leader
_unit
(application
)
729 await asyncio
.sleep(time_between_retries
)
733 raise JujuLeaderUnitNotFound(
734 "Cannot execute action: leader unit not found"
737 actions
= await application
.get_actions()
739 if action_name
not in actions
:
740 raise JujuActionNotFound(
741 "Action {} not in available actions".format(action_name
)
744 action
= await unit
.run_action(action_name
, **kwargs
)
747 "Wait until action {} is completed in application {} (model={})".format(
748 action_name
, application_name
, model_name
751 await JujuModelWatcher
.wait_for(
754 progress_timeout
=progress_timeout
,
755 total_timeout
=total_timeout
,
760 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
761 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
763 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
767 "Action {} completed with status {} in application {} (model={})".format(
768 action_name
, action
.status
, application_name
, model_name
772 await self
.disconnect_model(model
)
773 await self
.disconnect_controller(controller
)
775 return output
, status
777 async def get_actions(self
, application_name
: str, model_name
: str) -> dict:
778 """Get list of actions
780 :param: application_name: Application name
781 :param: model_name: Model name
783 :return: Dict with this format
785 "action_name": "Description of the action",
790 "Getting list of actions for application {}".format(application_name
)
794 controller
= await self
.get_controller()
797 model
= await self
.get_model(controller
, model_name
)
801 application
= self
._get
_application
(
803 application_name
=application_name
,
806 # Return list of actions
807 return await application
.get_actions()
810 # Disconnect from model and controller
811 await self
.disconnect_model(model
)
812 await self
.disconnect_controller(controller
)
814 async def get_metrics(self
, model_name
: str, application_name
: str) -> dict:
815 """Get the metrics collected by the VCA.
817 :param model_name The name or unique id of the network service
818 :param application_name The name of the application
820 if not model_name
or not application_name
:
821 raise Exception("model_name and application_name must be non-empty strings")
823 controller
= await self
.get_controller()
824 model
= await self
.get_model(controller
, model_name
)
826 application
= self
._get
_application
(model
, application_name
)
827 if application
is not None:
828 metrics
= await application
.get_metrics()
830 self
.disconnect_model(model
)
831 self
.disconnect_controller(controller
)
834 async def add_relation(
842 :param: model_name: Model name
843 :param: endpoint_1 First endpoint name
844 ("app:endpoint" format or directly the saas name)
845 :param: endpoint_2: Second endpoint name (^ same format)
848 self
.log
.debug("Adding relation: {} -> {}".format(endpoint_1
, endpoint_2
))
851 controller
= await self
.get_controller()
854 model
= await self
.get_model(controller
, model_name
)
858 await model
.add_relation(endpoint_1
, endpoint_2
)
859 except JujuAPIError
as e
:
860 if "not found" in e
.message
:
861 self
.log
.warning("Relation not found: {}".format(e
.message
))
863 if "already exists" in e
.message
:
864 self
.log
.warning("Relation already exists: {}".format(e
.message
))
866 # another exception, raise it
869 await self
.disconnect_model(model
)
870 await self
.disconnect_controller(controller
)
878 Adds a remote offer to the model. Relations can be created later using "juju relate".
880 :param: offer_url: Offer Url
881 :param: model_name: Model name
883 :raises ParseError if there's a problem parsing the offer_url
884 :raises JujuError if remote offer includes and endpoint
885 :raises JujuAPIError if the operation is not successful
887 controller
= await self
.get_controller()
888 model
= await controller
.get_model(model_name
)
891 await model
.consume(offer_url
)
893 await self
.disconnect_model(model
)
894 await self
.disconnect_controller(controller
)
896 async def destroy_model(self
, model_name
: str, total_timeout
: float):
900 :param: model_name: Model name
901 :param: total_timeout: Timeout
904 controller
= await self
.get_controller()
907 if not await self
.model_exists(model_name
, controller
=controller
):
910 model
= await self
.get_model(controller
, model_name
)
911 self
.log
.debug("Destroying model {}".format(model_name
))
912 uuid
= model
.info
.uuid
914 # Destroy machines that are manually provisioned
915 # and still are in pending state
916 await self
._destroy
_pending
_machines
(model
, only_manual
=True)
919 await self
.disconnect_model(model
)
921 await controller
.destroy_model(uuid
, force
=True, max_wait
=0)
923 # Wait until model is destroyed
924 self
.log
.debug("Waiting for model {} to be destroyed...".format(model_name
))
926 if total_timeout
is None:
928 end
= time
.time() + total_timeout
929 while time
.time() < end
:
930 models
= await controller
.list_models()
931 if model_name
not in models
:
933 "The model {} ({}) was destroyed".format(model_name
, uuid
)
936 await asyncio
.sleep(5)
938 "Timeout waiting for model {} to be destroyed".format(model_name
)
940 except Exception as e
:
942 await self
.disconnect_model(model
)
945 await self
.disconnect_controller(controller
)
947 async def destroy_application(self
, model
: Model
, application_name
: str):
951 :param: model: Model object
952 :param: application_name: Application name
955 "Destroying application {} in model {}".format(
956 application_name
, model
.info
.name
959 application
= model
.applications
.get(application_name
)
961 await application
.destroy()
963 self
.log
.warning("Application not found: {}".format(application_name
))
965 async def _destroy_pending_machines(self
, model
: Model
, only_manual
: bool = False):
967 Destroy pending machines in a given model
969 :param: only_manual: Bool that indicates only manually provisioned
970 machines should be destroyed (if True), or that
971 all pending machines should be destroyed
973 status
= await model
.get_status()
974 for machine_id
in status
.machines
:
975 machine_status
= status
.machines
[machine_id
]
976 if machine_status
.agent_status
.status
== "pending":
977 if only_manual
and not machine_status
.instance_id
.startswith("manual:"):
979 machine
= model
.machines
[machine_id
]
980 await machine
.destroy(force
=True)
982 async def configure_application(
983 self
, model_name
: str, application_name
: str, config
: dict = None
985 """Configure application
987 :param: model_name: Model name
988 :param: application_name: Application name
989 :param: config: Config to apply to the charm
991 self
.log
.debug("Configuring application {}".format(application_name
))
994 controller
= await self
.get_controller()
997 model
= await self
.get_model(controller
, model_name
)
998 application
= self
._get
_application
(
1000 application_name
=application_name
,
1002 await application
.set_config(config
)
1005 await self
.disconnect_model(model
)
1006 await self
.disconnect_controller(controller
)
1008 def _get_api_endpoints_db(self
) -> [str]:
1010 Get API Endpoints from DB
1012 :return: List of API endpoints
1014 self
.log
.debug("Getting endpoints from database")
1016 juju_info
= self
.db
.get_one(
1017 DB_DATA
.api_endpoints
.table
,
1018 q_filter
=DB_DATA
.api_endpoints
.filter,
1019 fail_on_empty
=False,
1021 if juju_info
and DB_DATA
.api_endpoints
.key
in juju_info
:
1022 return juju_info
[DB_DATA
.api_endpoints
.key
]
1024 def _update_api_endpoints_db(self
, endpoints
: [str]):
1026 Update API endpoints in Database
1028 :param: List of endpoints
1030 self
.log
.debug("Saving endpoints {} in database".format(endpoints
))
1032 juju_info
= self
.db
.get_one(
1033 DB_DATA
.api_endpoints
.table
,
1034 q_filter
=DB_DATA
.api_endpoints
.filter,
1035 fail_on_empty
=False,
1037 # If it doesn't, then create it
1041 DB_DATA
.api_endpoints
.table
,
1042 DB_DATA
.api_endpoints
.filter,
1044 except DbException
as e
:
1045 # Racing condition: check if another N2VC worker has created it
1046 juju_info
= self
.db
.get_one(
1047 DB_DATA
.api_endpoints
.table
,
1048 q_filter
=DB_DATA
.api_endpoints
.filter,
1049 fail_on_empty
=False,
1054 DB_DATA
.api_endpoints
.table
,
1055 DB_DATA
.api_endpoints
.filter,
1056 {DB_DATA
.api_endpoints
.key
: endpoints
},
1059 def handle_exception(self
, loop
, context
):
1060 # All unhandled exceptions by libjuju are handled here.
1063 async def health_check(self
, interval
: float = 300.0):
1065 Health check to make sure controller and controller_model connections are OK
1067 :param: interval: Time in seconds between checks
1072 controller
= await self
.get_controller()
1073 # self.log.debug("VCA is alive")
1074 except Exception as e
:
1075 self
.log
.error("Health check to VCA failed: {}".format(e
))
1077 await self
.disconnect_controller(controller
)
1078 await asyncio
.sleep(interval
)
1080 async def list_models(self
, contains
: str = None) -> [str]:
1081 """List models with certain names
1083 :param: contains: String that is contained in model name
1085 :retur: [models] Returns list of model names
1088 controller
= await self
.get_controller()
1090 models
= await controller
.list_models()
1092 models
= [model
for model
in models
if contains
in model
]
1095 await self
.disconnect_controller(controller
)
1097 async def list_offers(self
, model_name
: str) -> QueryApplicationOffersResults
:
1098 """List models with certain names
1100 :param: model_name: Model name
1102 :return: Returns list of offers
1105 controller
= await self
.get_controller()
1107 return await controller
.list_offers(model_name
)
1109 await self
.disconnect_controller(controller
)
1116 client_cert_data
: str,
1117 configuration
: Configuration
,
1119 credential_name
: str = None,
1122 Add a Kubernetes cloud to the controller
1124 Similar to the `juju add-k8s` command in the CLI
1126 :param: name: Name for the K8s cloud
1127 :param: configuration: Kubernetes configuration object
1128 :param: storage_class: Storage Class to use in the cloud
1129 :param: credential_name: Storage Class to use in the cloud
1132 if not storage_class
:
1133 raise Exception("storage_class must be a non-empty string")
1135 raise Exception("name must be a non-empty string")
1136 if not configuration
:
1137 raise Exception("configuration must be provided")
1139 endpoint
= configuration
.host
1140 credential
= self
.get_k8s_cloud_credential(
1145 credential
.attrs
[RBAC_LABEL_KEY_NAME
] = rbac_id
1146 cloud
= client
.Cloud(
1148 auth_types
=[credential
.auth_type
],
1150 ca_certificates
=[client_cert_data
],
1152 "operator-storage": storage_class
,
1153 "workload-storage": storage_class
,
1157 return await self
.add_cloud(
1158 name
, cloud
, credential
, credential_name
=credential_name
1161 def get_k8s_cloud_credential(
1163 configuration
: Configuration
,
1164 client_cert_data
: str,
1166 ) -> client
.CloudCredential
:
1168 # TODO: Test with AKS
1169 key
= None # open(configuration.key_file, "r").read()
1170 username
= configuration
.username
1171 password
= configuration
.password
1173 if client_cert_data
:
1174 attrs
["ClientCertificateData"] = client_cert_data
1176 attrs
["ClientKeyData"] = key
1178 if username
or password
:
1179 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1180 attrs
["Token"] = token
1184 auth_type
= "oauth2"
1185 if client_cert_data
:
1186 auth_type
= "oauth2withcert"
1188 raise JujuInvalidK8sConfiguration(
1189 "missing token for auth type {}".format(auth_type
)
1194 "credential for user {} has empty password".format(username
)
1196 attrs
["username"] = username
1197 attrs
["password"] = password
1198 if client_cert_data
:
1199 auth_type
= "userpasswithcert"
1201 auth_type
= "userpass"
1202 elif client_cert_data
and token
:
1203 auth_type
= "certificate"
1205 raise JujuInvalidK8sConfiguration("authentication method not supported")
1206 return client
.CloudCredential(auth_type
=auth_type
, attrs
=attrs
)
1208 async def add_cloud(
1212 credential
: CloudCredential
= None,
1213 credential_name
: str = None,
1216 Add cloud to the controller
1218 :param: name: Name of the cloud to be added
1219 :param: cloud: Cloud object
1220 :param: credential: CloudCredentials object for the cloud
1221 :param: credential_name: Credential name.
1222 If not defined, cloud of the name will be used.
1224 controller
= await self
.get_controller()
1226 _
= await controller
.add_cloud(name
, cloud
)
1228 await controller
.add_credential(
1229 credential_name
or name
, credential
=credential
, cloud
=name
1231 # Need to return the object returned by the controller.add_cloud() function
1232 # I'm returning the original value now until this bug is fixed:
1233 # https://github.com/juju/python-libjuju/issues/443
1236 await self
.disconnect_controller(controller
)
1238 async def remove_cloud(self
, name
: str):
1242 :param: name: Name of the cloud to be removed
1244 controller
= await self
.get_controller()
1246 await controller
.remove_cloud(name
)
1248 await self
.disconnect_controller(controller
)
1250 async def _get_leader_unit(self
, application
: Application
) -> Unit
:
1252 for u
in application
.units
:
1253 if await u
.is_leader_from_status():
1258 async def get_cloud_credentials(self
, cloud_name
: str, credential_name
: str):
1259 controller
= await self
.get_controller()
1261 facade
= client
.CloudFacade
.from_connection(controller
.connection())
1262 cloud_cred_tag
= tag
.credential(cloud_name
, self
.username
, credential_name
)
1263 params
= [client
.Entity(cloud_cred_tag
)]
1264 return (await facade
.Credential(params
)).results
1266 await self
.disconnect_controller(controller
)