1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
20 from juju
.errors
import JujuAPIError
21 from juju
.model
import Model
22 from juju
.machine
import Machine
23 from juju
.application
import Application
24 from juju
.unit
import Unit
25 from juju
.client
._definitions
import (
27 QueryApplicationOffersResults
,
31 from juju
.controller
import Controller
32 from juju
.client
import client
35 from n2vc
.config
import ModelConfig
36 from n2vc
.juju_watcher
import JujuModelWatcher
37 from n2vc
.provisioner
import AsyncSSHProvisioner
38 from n2vc
.n2vc_conn
import N2VCConnector
39 from n2vc
.exceptions
import (
41 JujuApplicationNotFound
,
42 JujuLeaderUnitNotFound
,
44 JujuControllerFailedConnecting
,
45 JujuApplicationExists
,
46 JujuInvalidK8sConfiguration
,
49 from n2vc
.utils
import DB_DATA
50 from osm_common
.dbbase
import DbException
51 from kubernetes
.client
.configuration
import Configuration
53 RBAC_LABEL_KEY_NAME
= "rbac-id"
64 loop
: asyncio
.AbstractEventLoop
= None,
65 log
: logging
.Logger
= None,
67 n2vc
: N2VCConnector
= None,
68 model_config
: ModelConfig
= {},
73 :param: endpoint: Endpoint of the juju controller (host:port)
74 :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
75 :param: username: Juju username
76 :param: password: Juju password
77 :param: cacert: Juju CA Certificate
78 :param: loop: Asyncio loop
81 :param: n2vc: N2VC object
82 :param: apt_mirror: APT Mirror
83 :param: enable_os_upgrade: Enable OS Upgrade
86 self
.log
= log
or logging
.getLogger("Libjuju")
88 db_endpoints
= self
._get
_api
_endpoints
_db
()
90 if (db_endpoints
and endpoint
not in db_endpoints
) or not db_endpoints
:
91 self
.endpoints
= [endpoint
]
92 self
._update
_api
_endpoints
_db
(self
.endpoints
)
94 self
.endpoints
= db_endpoints
95 self
.api_proxy
= api_proxy
96 self
.username
= username
97 self
.password
= password
99 self
.loop
= loop
or asyncio
.get_event_loop()
102 # Generate config for models
103 self
.model_config
= model_config
105 self
.loop
.set_exception_handler(self
.handle_exception
)
106 self
.creating_model
= asyncio
.Lock(loop
=self
.loop
)
108 self
.log
.debug("Libjuju initialized!")
110 self
.health_check_task
= self
._create
_health
_check
_task
()
112 def _create_health_check_task(self
):
113 return self
.loop
.create_task(self
.health_check())
115 async def get_controller(self
, timeout
: float = 15.0) -> Controller
:
119 :param: timeout: Time in seconds to wait for controller to connect
123 controller
= Controller(loop
=self
.loop
)
124 await asyncio
.wait_for(
126 endpoint
=self
.endpoints
,
127 username
=self
.username
,
128 password
=self
.password
,
133 endpoints
= await controller
.api_endpoints
134 if self
.endpoints
!= endpoints
:
135 self
.endpoints
= endpoints
136 self
._update
_api
_endpoints
_db
(self
.endpoints
)
138 except asyncio
.CancelledError
as e
:
140 except Exception as e
:
142 "Failed connecting to controller: {}...".format(self
.endpoints
)
145 await self
.disconnect_controller(controller
)
146 raise JujuControllerFailedConnecting(e
)
async def disconnect(self):
    """
    Disconnect

    Cancels the background health check task and logs the disconnection.
    """
    # Cancel health check task
    background_check = self.health_check_task
    background_check.cancel()
    self.log.debug("Libjuju disconnected!")
async def disconnect_model(self, model: Model):
    """
    Disconnect model

    :param: model: Model that will be disconnected.
                   ``None`` is accepted and ignored, so cleanup code can call
                   this method even when the model was never obtained.
    """
    if model is not None:
        await model.disconnect()
async def disconnect_controller(self, controller: Controller):
    """
    Disconnect controller

    :param: controller: Controller that will be disconnected.
                        ``None`` is accepted and ignored, so cleanup code can
                        call this method even when the connection failed.
    """
    if controller is not None:
        await controller.disconnect()
async def add_model(self, model_name: str, cloud_name: str, credential_name=None):
    """
    Create model

    Does nothing if a model with that name already exists in the controller.

    :param: model_name: Model name
    :param: cloud_name: Cloud name
    :param: credential_name: Credential name to use for adding the model
                             If not specified, same name as the cloud will be used.
    """
    # Get controller
    controller = await self.get_controller()
    model = None
    try:
        # Model creation is serialised between workers by this lock.
        # "async with" already suspends until the current holder releases it,
        # so no polling of locked() is needed beforehand.
        async with self.creating_model:
            if await self.model_exists(model_name, controller=controller):
                return
            self.log.debug("Creating model {}".format(model_name))
            model = await controller.add_model(
                model_name,
                config=self.model_config,
                cloud_name=cloud_name,
                credential_name=credential_name or cloud_name,
            )
    finally:
        # model stays None when it already existed or the creation failed
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
async def get_executed_actions(self, model_name: str) -> list:
    """
    Get executed/history of actions for a model.

    :param: model_name: Model name, str.
    :return: List of executed actions for a model. Each item is a dict with
             the keys "id", "action" and "status", extended with every
             key/value of the action output.
    :raises JujuError: if anything fails while collecting the actions
                       (the original exception is chained as the cause)
    """
    model = None
    executed_actions = []
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        # Get all unique action names
        actions = {}
        for application in model.applications:
            application_actions = await self.get_actions(application, model_name)
            actions.update(application_actions)
        # Get status of all actions
        for application_action in actions:
            app_action_status_list = await model.get_action_status(
                name=application_action
            )
            for action_id, action_status in app_action_status_list.items():
                executed_action = {
                    "id": action_id,
                    "action": application_action,
                    "status": action_status,
                }
                # Get action output by id and merge it into the entry
                action_output = await model.get_action_output(action_id)
                executed_action.update(action_output.items())
                executed_actions.append(executed_action)
    except Exception as e:
        # Chain the cause so the traceback of the real failure is kept
        raise JujuError(
            "Error in getting executed actions for model: {}. Error: {}".format(
                model_name, str(e)
            )
        ) from e
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return executed_actions
async def get_application_configs(self, model_name: str, application_name: str) -> dict:
    """
    Get available configs for an application.

    :param: model_name: Model name, str.
    :param: application_name: Application name, str.

    :return: The value returned by the application's ``get_config()``
    :raises JujuError: if the configuration cannot be retrieved
                       (the original exception is chained as the cause)
    """
    model = None
    application_configs = {}
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name=application_name)
        application_configs = await application.get_config()
    except Exception as e:
        # Chain the cause so the traceback of the real failure is kept
        raise JujuError(
            "Error in getting configs for application: {} in model: {}. Error: {}".format(
                application_name, model_name, str(e)
            )
        ) from e
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return application_configs
268 self
, controller
: Controller
, model_name
: str, id=None
271 Get model from controller
273 :param: controller: Controller
274 :param: model_name: Model name
276 :return: Model: The created Juju model object
278 return await controller
.get_model(model_name
)
async def model_exists(
    self, model_name: str, controller: Controller = None
) -> bool:
    """
    Check if model exists

    :param: model_name: Model name
    :param: controller: Controller to query. If not passed, a controller
                        connection is opened for the check and closed afterwards.

    :return: True if model_name is among the controller's models
    """
    # Only a connection opened here has to be closed here
    opened_here = not controller
    if opened_here:
        controller = await self.get_controller()

    # Check if model exists
    try:
        known_models = await controller.list_models()
        return model_name in known_models
    finally:
        if opened_here:
            await self.disconnect_controller(controller)
async def models_exist(self, model_names: [str]) -> (bool, list):
    """
    Check if models exists

    :param: model_names: List of strings with model names

    :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
                               The missing names keep the order in which they
                               were given, without duplicates.
    :raises Exception: if model_names is empty or None
    """
    if not model_names:
        raise Exception(
            "model_names must be a non-empty array. Given value: {}".format(
                model_names
            )
        )
    existing_models = set(await self.list_models())
    # dict.fromkeys() removes duplicates while keeping the caller's order,
    # so the result is deterministic (a set difference is not)
    non_existing_models = [
        name for name in dict.fromkeys(model_names) if name not in existing_models
    ]
    return (
        not non_existing_models,
        non_existing_models,
    )
async def get_model_status(self, model_name: str) -> FullStatus:
    """
    Get model status

    :param: model_name: Model name

    :return: Full status object
    """
    controller = await self.get_controller()
    model = None
    try:
        # Getting the model is inside the try block so that the controller
        # connection is released even if the model cannot be obtained
        model = await self.get_model(controller, model_name)
        return await model.get_status()
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
345 async def create_machine(
348 machine_id
: str = None,
349 db_dict
: dict = None,
350 progress_timeout
: float = None,
351 total_timeout
: float = None,
352 series
: str = "xenial",
354 ) -> (Machine
, bool):
358 :param: model_name: Model name
359 :param: machine_id: Machine id
360 :param: db_dict: Dictionary with data of the DB to write the updates
361 :param: progress_timeout: Maximum time between two updates in the model
362 :param: total_timeout: Timeout for the entity to be active
363 :param: series: Series of the machine (xenial, bionic, focal, ...)
364 :param: wait: Wait until machine is ready
366 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
367 if the machine is new or it already existed
373 "Creating machine (id={}) in model: {}".format(machine_id
, model_name
)
377 controller
= await self
.get_controller()
380 model
= await self
.get_model(controller
, model_name
)
382 if machine_id
is not None:
384 "Searching machine (id={}) in model {}".format(
385 machine_id
, model_name
389 # Get machines from model and get the machine with machine_id if exists
390 machines
= await model
.get_machines()
391 if machine_id
in machines
:
393 "Machine (id={}) found in model {}".format(
394 machine_id
, model_name
397 machine
= machines
[machine_id
]
399 raise JujuMachineNotFound("Machine {} not found".format(machine_id
))
402 self
.log
.debug("Creating a new machine in model {}".format(model_name
))
405 machine
= await model
.add_machine(
406 spec
=None, constraints
=None, disks
=None, series
=series
410 # Wait until the machine is ready
412 "Wait until machine {} is ready in model {}".format(
413 machine
.entity_id
, model_name
417 await JujuModelWatcher
.wait_for(
420 progress_timeout
=progress_timeout
,
421 total_timeout
=total_timeout
,
426 await self
.disconnect_model(model
)
427 await self
.disconnect_controller(controller
)
430 "Machine {} ready at {} in model {}".format(
431 machine
.entity_id
, machine
.dns_name
, model_name
436 async def provision_machine(
441 private_key_path
: str,
442 db_dict
: dict = None,
443 progress_timeout
: float = None,
444 total_timeout
: float = None,
447 Manually provisioning of a machine
449 :param: model_name: Model name
450 :param: hostname: IP to access the machine
451 :param: username: Username to login to the machine
452 :param: private_key_path: Local path for the private key
453 :param: db_dict: Dictionary with data of the DB to write the updates
454 :param: progress_timeout: Maximum time between two updates in the model
455 :param: total_timeout: Timeout for the entity to be active
457 :return: (Entity): Machine id
460 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
461 model_name
, hostname
, username
466 controller
= await self
.get_controller()
469 model
= await self
.get_model(controller
, model_name
)
473 provisioner
= AsyncSSHProvisioner(
476 private_key_path
=private_key_path
,
481 params
= await provisioner
.provision_machine()
483 params
.jobs
= ["JobHostUnits"]
485 self
.log
.debug("Adding machine to model")
486 connection
= model
.connection()
487 client_facade
= client
.ClientFacade
.from_connection(connection
)
489 results
= await client_facade
.AddMachines(params
=[params
])
490 error
= results
.machines
[0].error
493 msg
= "Error adding machine: {}".format(error
.message
)
494 self
.log
.error(msg
=msg
)
495 raise ValueError(msg
)
497 machine_id
= results
.machines
[0].machine
499 self
.log
.debug("Installing Juju agent into machine {}".format(machine_id
))
500 asyncio
.ensure_future(
501 provisioner
.install_agent(
502 connection
=connection
,
504 machine_id
=machine_id
,
505 proxy
=self
.api_proxy
,
506 series
=params
.series
,
512 machine_list
= await model
.get_machines()
513 if machine_id
in machine_list
:
514 self
.log
.debug("Machine {} found in model!".format(machine_id
))
515 machine
= model
.machines
.get(machine_id
)
517 await asyncio
.sleep(2)
520 msg
= "Machine {} not found in model".format(machine_id
)
521 self
.log
.error(msg
=msg
)
522 raise JujuMachineNotFound(msg
)
525 "Wait until machine {} is ready in model {}".format(
526 machine
.entity_id
, model_name
529 await JujuModelWatcher
.wait_for(
532 progress_timeout
=progress_timeout
,
533 total_timeout
=total_timeout
,
537 except Exception as e
:
540 await self
.disconnect_model(model
)
541 await self
.disconnect_controller(controller
)
544 "Machine provisioned {} in model {}".format(machine_id
, model_name
)
550 self
, uri
: str, model_name
: str, wait
: bool = True, timeout
: float = 3600
553 Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
555 :param: uri: Path or Charm Store uri in which the charm or bundle can be found
556 :param: model_name: Model name
557 :param: wait: Indicates whether to wait or not until all applications are active
558 :param: timeout: Time in seconds to wait until all applications are active
560 controller
= await self
.get_controller()
561 model
= await self
.get_model(controller
, model_name
)
563 await model
.deploy(uri
)
565 await JujuModelWatcher
.wait_for_model(model
, timeout
=timeout
)
566 self
.log
.debug("All units active in model {}".format(model_name
))
568 await self
.disconnect_model(model
)
569 await self
.disconnect_controller(controller
)
571 async def deploy_charm(
573 application_name
: str,
577 db_dict
: dict = None,
578 progress_timeout
: float = None,
579 total_timeout
: float = None,
586 :param: application_name: Application name
587 :param: path: Local path to the charm
588 :param: model_name: Model name
589 :param: machine_id ID of the machine
590 :param: db_dict: Dictionary with data of the DB to write the updates
591 :param: progress_timeout: Maximum time between two updates in the model
592 :param: total_timeout: Timeout for the entity to be active
593 :param: config: Config for the charm
594 :param: series: Series of the charm
595 :param: num_units: Number of units
597 :return: (juju.application.Application): Juju application
600 "Deploying charm {} to machine {} in model ~{}".format(
601 application_name
, machine_id
, model_name
604 self
.log
.debug("charm: {}".format(path
))
607 controller
= await self
.get_controller()
610 model
= await self
.get_model(controller
, model_name
)
614 if application_name
not in model
.applications
:
616 if machine_id
is not None:
617 if machine_id
not in model
.machines
:
618 msg
= "Machine {} not found in model".format(machine_id
)
619 self
.log
.error(msg
=msg
)
620 raise JujuMachineNotFound(msg
)
621 machine
= model
.machines
[machine_id
]
622 series
= machine
.series
624 application
= await model
.deploy(
626 application_name
=application_name
,
635 "Wait until application {} is ready in model {}".format(
636 application_name
, model_name
640 for _
in range(num_units
- 1):
641 m
, _
= await self
.create_machine(model_name
, wait
=False)
642 await application
.add_unit(to
=m
.entity_id
)
644 await JujuModelWatcher
.wait_for(
647 progress_timeout
=progress_timeout
,
648 total_timeout
=total_timeout
,
653 "Application {} is ready in model {}".format(
654 application_name
, model_name
658 raise JujuApplicationExists(
659 "Application {} exists".format(application_name
)
662 await self
.disconnect_model(model
)
663 await self
.disconnect_controller(controller
)
667 def _get_application(self
, model
: Model
, application_name
: str) -> Application
:
670 :param: model: Model object
671 :param: application_name: Application name
673 :return: juju.application.Application (or None if it doesn't exist)
675 if model
.applications
and application_name
in model
.applications
:
676 return model
.applications
[application_name
]
678 async def execute_action(
680 application_name
: str,
683 db_dict
: dict = None,
684 progress_timeout
: float = None,
685 total_timeout
: float = None,
690 :param: application_name: Application name
691 :param: model_name: Model name
692 :param: action_name: Name of the action
693 :param: db_dict: Dictionary with data of the DB to write the updates
694 :param: progress_timeout: Maximum time between two updates in the model
695 :param: total_timeout: Timeout for the entity to be active
697 :return: (str, str): (output and status)
700 "Executing action {} using params {}".format(action_name
, kwargs
)
703 controller
= await self
.get_controller()
706 model
= await self
.get_model(controller
, model_name
)
710 application
= self
._get
_application
(
712 application_name
=application_name
,
714 if application
is None:
715 raise JujuApplicationNotFound("Cannot execute action")
719 # Ocassionally, self._get_leader_unit() will return None
720 # because the leader elected hook has not been triggered yet.
721 # Therefore, we are doing some retries. If it happens again,
724 time_between_retries
= 10
726 for _
in range(attempts
):
727 unit
= await self
._get
_leader
_unit
(application
)
729 await asyncio
.sleep(time_between_retries
)
733 raise JujuLeaderUnitNotFound(
734 "Cannot execute action: leader unit not found"
737 actions
= await application
.get_actions()
739 if action_name
not in actions
:
740 raise JujuActionNotFound(
741 "Action {} not in available actions".format(action_name
)
744 action
= await unit
.run_action(action_name
, **kwargs
)
747 "Wait until action {} is completed in application {} (model={})".format(
748 action_name
, application_name
, model_name
751 await JujuModelWatcher
.wait_for(
754 progress_timeout
=progress_timeout
,
755 total_timeout
=total_timeout
,
760 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
761 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
763 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
767 "Action {} completed with status {} in application {} (model={})".format(
768 action_name
, action
.status
, application_name
, model_name
772 await self
.disconnect_model(model
)
773 await self
.disconnect_controller(controller
)
775 return output
, status
async def get_actions(self, application_name: str, model_name: str) -> dict:
    """Get the actions of an application

    :param: application_name: Application name
    :param: model_name: Model name

    :return: Dict with this format
        {
            "action_name": "Description of the action",
            ...
        }
    """
    debug_message = "Getting list of actions for application {}".format(
        application_name
    )
    self.log.debug(debug_message)

    # Connect to the controller first, then to the model
    controller = await self.get_controller()
    model = await self.get_model(controller, model_name)

    try:
        application = self._get_application(model, application_name=application_name)
        available_actions = await application.get_actions()
        return available_actions
    finally:
        # Disconnect from model and controller
        await self.disconnect_model(model)
        await self.disconnect_controller(controller)
async def get_metrics(self, model_name: str, application_name: str) -> dict:
    """Get the metrics collected by the VCA.

    :param model_name The name or unique id of the network service
    :param application_name The name of the application

    :return: Metrics returned by the application's ``get_metrics()``,
             or an empty dict if the application is not in the model
    :raises Exception: if model_name or application_name is empty
    """
    if not model_name or not application_name:
        raise Exception("model_name and application_name must be non-empty strings")
    metrics = {}
    controller = await self.get_controller()
    model = None
    try:
        # Inside the try block so the controller is released on failure too
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)
        if application is not None:
            metrics = await application.get_metrics()
    finally:
        # Both disconnect methods are coroutines: without "await" they are
        # never executed and the connections stay open
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return metrics
async def add_relation(
    self,
    model_name: str,
    endpoint_1: str,
    endpoint_2: str,
):
    """Relate two endpoints in a model

    A JujuAPIError whose message contains "not found" or "already exists"
    is only logged as a warning; any other error is raised to the caller.

    :param: model_name: Model name
    :param: endpoint_1 First endpoint name
                       ("app:endpoint" format or directly the saas name)
    :param: endpoint_2: Second endpoint name (^ same format)
    """
    self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))

    # Connect to the controller first, then to the model
    controller = await self.get_controller()
    model = await self.get_model(controller, model_name)

    # (text searched in the error message, warning logged when it is found)
    tolerated_errors = (
        ("not found", "Relation not found: {}"),
        ("already exists", "Relation already exists: {}"),
    )
    try:
        await model.add_relation(endpoint_1, endpoint_2)
    except JujuAPIError as e:
        for marker, warning in tolerated_errors:
            if marker in e.message:
                self.log.warning(warning.format(e.message))
                return
        # another exception, raise it
        raise
    finally:
        await self.disconnect_model(model)
        await self.disconnect_controller(controller)
878 Adds a remote offer to the model. Relations can be created later using "juju relate".
880 :param: offer_url: Offer Url
881 :param: model_name: Model name
883 :raises ParseError if there's a problem parsing the offer_url
884 :raises JujuError if remote offer includes and endpoint
885 :raises JujuAPIError if the operation is not successful
887 controller
= await self
.get_controller()
888 model
= await controller
.get_model(model_name
)
891 await model
.consume(offer_url
)
893 await self
.disconnect_model(model
)
894 await self
.disconnect_controller(controller
)
896 async def destroy_model(self
, model_name
: str, total_timeout
: float):
900 :param: model_name: Model name
901 :param: total_timeout: Timeout
904 controller
= await self
.get_controller()
907 if not await self
.model_exists(model_name
, controller
=controller
):
910 model
= await self
.get_model(controller
, model_name
)
911 self
.log
.debug("Destroying model {}".format(model_name
))
912 uuid
= model
.info
.uuid
914 # Destroy machines that are manually provisioned
915 # and still are in pending state
916 await self
._destroy
_pending
_machines
(model
, only_manual
=True)
919 await self
.disconnect_model(model
)
921 await controller
.destroy_model(uuid
, force
=True, max_wait
=0)
923 # Wait until model is destroyed
924 self
.log
.debug("Waiting for model {} to be destroyed...".format(model_name
))
926 if total_timeout
is None:
928 end
= time
.time() + total_timeout
929 while time
.time() < end
:
930 models
= await controller
.list_models()
931 if model_name
not in models
:
933 "The model {} ({}) was destroyed".format(model_name
, uuid
)
936 await asyncio
.sleep(5)
938 "Timeout waiting for model {} to be destroyed".format(model_name
)
940 except Exception as e
:
942 await self
.disconnect_model(model
)
945 await self
.disconnect_controller(controller
)
947 async def destroy_application(
948 self
, model_name
: str, application_name
: str, total_timeout
: float
953 :param: model_name: Model name
954 :param: application_name: Application name
955 :param: total_timeout: Timeout
958 controller
= await self
.get_controller()
962 model
= await self
.get_model(controller
, model_name
)
964 "Destroying application {} in model {}".format(
965 application_name
, model_name
968 application
= self
._get
_application
(model
, application_name
)
970 await application
.destroy()
972 self
.log
.warning("Application not found: {}".format(application_name
))
975 "Waiting for application {} to be destroyed in model {}...".format(
976 application_name
, model_name
979 if total_timeout
is None:
981 end
= time
.time() + total_timeout
982 while time
.time() < end
:
983 if not self
._get
_application
(model
, application_name
):
985 "The application {} was destroyed in model {} ".format(
986 application_name
, model_name
990 await asyncio
.sleep(5)
992 "Timeout waiting for application {} to be destroyed in model {}".format(
993 application_name
, model_name
997 if model
is not None:
998 await self
.disconnect_model(model
)
999 await self
.disconnect_controller(controller
)
1001 async def _destroy_pending_machines(self
, model
: Model
, only_manual
: bool = False):
1003 Destroy pending machines in a given model
1005 :param: only_manual: Bool that indicates only manually provisioned
1006 machines should be destroyed (if True), or that
1007 all pending machines should be destroyed
1009 status
= await model
.get_status()
1010 for machine_id
in status
.machines
:
1011 machine_status
= status
.machines
[machine_id
]
1012 if machine_status
.agent_status
.status
== "pending":
1013 if only_manual
and not machine_status
.instance_id
.startswith("manual:"):
1015 machine
= model
.machines
[machine_id
]
1016 await machine
.destroy(force
=True)
async def configure_application(
    self, model_name: str, application_name: str, config: dict = None
):
    """Apply a configuration to an application

    :param: model_name: Model name
    :param: application_name: Application name
    :param: config: Config to apply to the charm
    """
    self.log.debug("Configuring application {}".format(application_name))

    # Connect to the controller; the model is obtained inside the try block
    # so that the controller is always released
    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        target = self._get_application(model, application_name=application_name)
        await target.set_config(config)
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
def _get_api_endpoints_db(self) -> [str]:
    """
    Read the stored API endpoints from the database

    :return: List of API endpoints (None if the record or its key is missing)
    """
    self.log.debug("Getting endpoints from database")

    # Table, filter and key of the record all come from DB_DATA.api_endpoints
    juju_info = self.db.get_one(
        DB_DATA.api_endpoints.table,
        q_filter=DB_DATA.api_endpoints.filter,
        fail_on_empty=False,
    )
    if not juju_info or DB_DATA.api_endpoints.key not in juju_info:
        return None
    return juju_info[DB_DATA.api_endpoints.key]
1060 def _update_api_endpoints_db(self
, endpoints
: [str]):
1062 Update API endpoints in Database
1064 :param: List of endpoints
1066 self
.log
.debug("Saving endpoints {} in database".format(endpoints
))
1068 juju_info
= self
.db
.get_one(
1069 DB_DATA
.api_endpoints
.table
,
1070 q_filter
=DB_DATA
.api_endpoints
.filter,
1071 fail_on_empty
=False,
1073 # If it doesn't, then create it
1077 DB_DATA
.api_endpoints
.table
,
1078 DB_DATA
.api_endpoints
.filter,
1080 except DbException
as e
:
1081 # Racing condition: check if another N2VC worker has created it
1082 juju_info
= self
.db
.get_one(
1083 DB_DATA
.api_endpoints
.table
,
1084 q_filter
=DB_DATA
.api_endpoints
.filter,
1085 fail_on_empty
=False,
1090 DB_DATA
.api_endpoints
.table
,
1091 DB_DATA
.api_endpoints
.filter,
1092 {DB_DATA
.api_endpoints
.key
: endpoints
},
def handle_exception(self, loop, context):
    """
    Exception handler for the event loop

    All unhandled exceptions by libjuju are handled here.
    The handler deliberately does nothing with them.

    :param: loop: Event loop that reports the exception
    :param: context: Context passed by the event loop with the exception details
    """
    return None
async def health_check(self, interval: float = 300.0):
    """
    Health check to make sure controller and controller_model connections are OK

    Runs forever: on every round it connects to the controller, logs an error
    if the connection fails, disconnects again and sleeps.

    :param: interval: Time in seconds between checks
    """
    while True:
        # Reset on every round: after a failed attempt there is no connection
        # from this round to close, and a controller closed in an earlier
        # round must not be disconnected again
        controller = None
        try:
            controller = await self.get_controller()
        except Exception as e:
            self.log.error("Health check to VCA failed: {}".format(e))
        finally:
            if controller is not None:
                await self.disconnect_controller(controller)
        await asyncio.sleep(interval)
async def list_models(self, contains: str = None) -> [str]:
    """List the models of the controller, optionally filtered by name

    :param: contains: String that is contained in model name.
                      If not given, all models are returned.

    :return: [models] Returns list of model names
    """
    controller = await self.get_controller()
    try:
        model_names = await controller.list_models()
        if not contains:
            return model_names
        return [name for name in model_names if contains in name]
    finally:
        await self.disconnect_controller(controller)
async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
    """List the offers of a model

    :param: model_name: Model name

    :return: Returns list of offers
    """
    controller = await self.get_controller()
    try:
        offers = await controller.list_offers(model_name)
        return offers
    finally:
        await self.disconnect_controller(controller)
1152 client_cert_data
: str,
1153 configuration
: Configuration
,
1155 credential_name
: str = None,
1158 Add a Kubernetes cloud to the controller
1160 Similar to the `juju add-k8s` command in the CLI
1162 :param: name: Name for the K8s cloud
1163 :param: configuration: Kubernetes configuration object
1164 :param: storage_class: Storage Class to use in the cloud
1165 :param: credential_name: Storage Class to use in the cloud
1168 if not storage_class
:
1169 raise Exception("storage_class must be a non-empty string")
1171 raise Exception("name must be a non-empty string")
1172 if not configuration
:
1173 raise Exception("configuration must be provided")
1175 endpoint
= configuration
.host
1176 credential
= self
.get_k8s_cloud_credential(
1181 credential
.attrs
[RBAC_LABEL_KEY_NAME
] = rbac_id
1182 cloud
= client
.Cloud(
1184 auth_types
=[credential
.auth_type
],
1186 ca_certificates
=[client_cert_data
],
1188 "operator-storage": storage_class
,
1189 "workload-storage": storage_class
,
1193 return await self
.add_cloud(
1194 name
, cloud
, credential
, credential_name
=credential_name
1197 def get_k8s_cloud_credential(
1199 configuration
: Configuration
,
1200 client_cert_data
: str,
1202 ) -> client
.CloudCredential
:
1204 # TODO: Test with AKS
1205 key
= None # open(configuration.key_file, "r").read()
1206 username
= configuration
.username
1207 password
= configuration
.password
1209 if client_cert_data
:
1210 attrs
["ClientCertificateData"] = client_cert_data
1212 attrs
["ClientKeyData"] = key
1214 if username
or password
:
1215 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1216 attrs
["Token"] = token
1220 auth_type
= "oauth2"
1221 if client_cert_data
:
1222 auth_type
= "oauth2withcert"
1224 raise JujuInvalidK8sConfiguration(
1225 "missing token for auth type {}".format(auth_type
)
1230 "credential for user {} has empty password".format(username
)
1232 attrs
["username"] = username
1233 attrs
["password"] = password
1234 if client_cert_data
:
1235 auth_type
= "userpasswithcert"
1237 auth_type
= "userpass"
1238 elif client_cert_data
and token
:
1239 auth_type
= "certificate"
1241 raise JujuInvalidK8sConfiguration("authentication method not supported")
1242 return client
.CloudCredential(auth_type
=auth_type
, attrs
=attrs
)
async def add_cloud(
    self,
    name: str,
    cloud: Cloud,
    credential: CloudCredential = None,
    credential_name: str = None,
) -> Cloud:
    """
    Add cloud to the controller

    :param: name: Name of the cloud to be added
    :param: cloud: Cloud object
    :param: credential: CloudCredentials object for the cloud
    :param: credential_name: Credential name.
                             If not defined, the name of the cloud will be used.

    :return: The cloud object that was passed in
    """
    controller = await self.get_controller()
    try:
        await controller.add_cloud(name, cloud)
        if credential:
            effective_credential_name = credential_name or name
            await controller.add_credential(
                effective_credential_name, credential=credential, cloud=name
            )
        # Need to return the object returned by the controller.add_cloud() function
        # I'm returning the original value now until this bug is fixed:
        # https://github.com/juju/python-libjuju/issues/443
        return cloud
    finally:
        await self.disconnect_controller(controller)
async def remove_cloud(self, name: str):
    """
    Remove cloud from the controller

    :param: name: Name of the cloud to be removed
    """
    controller = await self.get_controller()
    try:
        removal = controller.remove_cloud(name)
        await removal
    finally:
        await self.disconnect_controller(controller)
1286 async def _get_leader_unit(self
, application
: Application
) -> Unit
:
1288 for u
in application
.units
:
1289 if await u
.is_leader_from_status():
async def get_cloud_credentials(self, cloud_name: str, credential_name: str):
    """
    Get a cloud credential from the controller

    The credential tag is built from the cloud name, ``self.username`` and
    the credential name.

    :param: cloud_name: Cloud name
    :param: credential_name: Credential name

    :return: ``results`` of the CloudFacade ``Credential`` call
    """
    controller = await self.get_controller()
    try:
        cloud_facade = client.CloudFacade.from_connection(controller.connection())
        credential_tag = tag.credential(cloud_name, self.username, credential_name)
        response = await cloud_facade.Credential([client.Entity(credential_tag)])
        return response.results
    finally:
        await self.disconnect_controller(controller)