Feature-9904: Enhancing NG-UI to enable Juju operational view dashboard
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17
18 import time
19
20 from juju.errors import JujuAPIError
21 from juju.model import Model
22 from juju.machine import Machine
23 from juju.application import Application
24 from juju.unit import Unit
25 from juju.client._definitions import (
26 FullStatus,
27 QueryApplicationOffersResults,
28 Cloud,
29 CloudCredential,
30 )
31 from juju.controller import Controller
32 from juju.client import client
33 from juju import tag
34
35 from n2vc.config import ModelConfig
36 from n2vc.juju_watcher import JujuModelWatcher
37 from n2vc.provisioner import AsyncSSHProvisioner
38 from n2vc.n2vc_conn import N2VCConnector
39 from n2vc.exceptions import (
40 JujuMachineNotFound,
41 JujuApplicationNotFound,
42 JujuLeaderUnitNotFound,
43 JujuActionNotFound,
44 JujuControllerFailedConnecting,
45 JujuApplicationExists,
46 JujuInvalidK8sConfiguration,
47 JujuError
48 )
49 from n2vc.utils import DB_DATA
50 from osm_common.dbbase import DbException
51 from kubernetes.client.configuration import Configuration
52
# Key of the credential attribute in which add_k8s() stores the RBAC id
RBAC_LABEL_KEY_NAME = "rbac-id"
54
55
56 class Libjuju:
def __init__(
    self,
    endpoint: str,
    api_proxy: str,
    username: str,
    password: str,
    cacert: str,
    loop: asyncio.AbstractEventLoop = None,
    log: logging.Logger = None,
    db: dict = None,
    n2vc: N2VCConnector = None,
    model_config: ModelConfig = None,
):
    """
    Constructor

    :param: endpoint: Endpoint of the juju controller (host:port)
    :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
    :param: username: Juju username
    :param: password: Juju password
    :param: cacert: Juju CA Certificate
    :param: loop: Asyncio loop
    :param: log: Logger
    :param: db: DB object
    :param: n2vc: N2VC object
    :param: model_config: Configuration passed to the controller when a model
                          is added. An empty configuration is used if omitted.
    """

    self.log = log or logging.getLogger("Libjuju")
    self.db = db

    # Prefer the endpoints stored in the database, unless the given endpoint
    # is not one of them (or nothing has been stored yet).
    db_endpoints = self._get_api_endpoints_db()
    if not db_endpoints or endpoint not in db_endpoints:
        self.endpoints = [endpoint]
        self._update_api_endpoints_db(self.endpoints)
    else:
        self.endpoints = db_endpoints

    self.api_proxy = api_proxy
    self.username = username
    self.password = password
    self.cacert = cacert
    self.loop = loop or asyncio.get_event_loop()
    self.n2vc = n2vc

    # Generate config for models.
    # A fresh dict is created per instance: a mutable default argument
    # would be shared by every Libjuju object.
    self.model_config = {} if model_config is None else model_config

    self.loop.set_exception_handler(self.handle_exception)
    self.creating_model = asyncio.Lock(loop=self.loop)

    self.log.debug("Libjuju initialized!")

    self.health_check_task = self._create_health_check_task()
111
112 def _create_health_check_task(self):
113 return self.loop.create_task(self.health_check())
114
async def get_controller(self, timeout: float = 15.0) -> Controller:
    """
    Connect to the Juju controller

    If the controller reports API endpoints that differ from the known ones,
    they are stored in this object and saved in the database.

    :param: timeout: Time in seconds to wait for controller to connect

    :return: Controller: The connected controller

    :raises: JujuControllerFailedConnecting if the connection fails
    """
    controller = None
    try:
        controller = Controller(loop=self.loop)
        connection_attempt = controller.connect(
            endpoint=self.endpoints,
            username=self.username,
            password=self.password,
            cacert=self.cacert,
        )
        await asyncio.wait_for(connection_attempt, timeout=timeout)
        reported_endpoints = await controller.api_endpoints
        if self.endpoints != reported_endpoints:
            self.endpoints = reported_endpoints
            self._update_api_endpoints_db(self.endpoints)
        return controller
    except asyncio.CancelledError:
        # Cancellation is propagated untouched
        raise
    except Exception as error:
        self.log.error(
            "Failed connecting to controller: {}...".format(self.endpoints)
        )
        if controller:
            await self.disconnect_controller(controller)
        raise JujuControllerFailedConnecting(error)
147
async def disconnect(self):
    """Cancel the health check task of this object"""
    health_check_task = self.health_check_task
    health_check_task.cancel()
    self.log.debug("Libjuju disconnected!")
153
async def disconnect_model(self, model: Model):
    """
    Close the connection of a model

    :param: model: Model whose connection will be closed
    """
    disconnection = model.disconnect()
    await disconnection
161
async def disconnect_controller(self, controller: Controller):
    """
    Close the connection of a controller

    Nothing is done when no controller is given.

    :param: controller: Controller whose connection will be closed
    """
    if not controller:
        return
    await controller.disconnect()
170
async def add_model(self, model_name: str, cloud_name: str, credential_name=None):
    """
    Create model

    Nothing is done if a model with the same name already exists.

    :param: model_name: Model name
    :param: cloud_name: Cloud name
    :param: credential_name: Credential name to use for adding the model
                             If not specified, same name as the cloud will be used.
    """

    # Get controller
    controller = await self.get_controller()
    model = None
    try:
        # The lock serialises model creation: "async with" already waits
        # until other workers have finished, so no polling is needed.
        async with self.creating_model:
            if await self.model_exists(model_name, controller=controller):
                return
            self.log.debug("Creating model {}".format(model_name))
            model = await controller.add_model(
                model_name,
                config=self.model_config,
                cloud_name=cloud_name,
                credential_name=credential_name or cloud_name,
            )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
204
async def get_executed_actions(self, model_name: str) -> list:
    """
    Get the history of the actions executed in a model.

    Every entry of the result is a dict with the keys "id", "action" and
    "status", extended with the items of the action output.

    :param: model_name: Model name, str.
    :return: List of executed actions for a model.

    :raises: JujuError if anything fails while collecting the actions
    """
    model = None
    history = []
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)

        # Collect the unique action names of all the applications
        known_actions = {}
        for app_name in model.applications:
            known_actions.update(await self.get_actions(app_name, model_name))

        # Collect status and output of every execution of each action
        for action_name in known_actions:
            status_by_id = await model.get_action_status(name=action_name)
            for action_id, status in status_by_id.items():
                entry = {
                    "id": action_id,
                    "action": action_name,
                    "status": status,
                }
                output = await model.get_action_output(entry["id"])
                for key, value in output.items():
                    entry[key] = value
                history.append(entry)
    except Exception as e:
        raise JujuError("Error in getting executed actions for model: {}. Error: {}"
                        .format(model_name, str(e)))
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return history
241
async def get_application_configs(self, model_name: str, application_name: str) -> dict:
    """
    Get the configuration of an application.

    :param: model_name: Model name, str.
    :param: application_name: Application name, str.

    :return: The value returned by the application's get_config()

    :raises: JujuError if anything fails while getting the configuration
    """
    model = None
    configs = {}
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        target = self._get_application(model, application_name=application_name)
        configs = await target.get_config()
    except Exception as e:
        raise JujuError("Error in getting configs for application: {} in model: {}. Error: {}"
                        .format(application_name, model_name, str(e)))
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return configs
266
async def get_model(
    self, controller: Controller, model_name: str, id=None
) -> Model:
    """
    Get a model through an already connected controller

    :param: controller: Controller
    :param: model_name: Model name
    :param: id: Not used by this method

    :return: Model: The Juju model object returned by the controller
    """
    model = await controller.get_model(model_name)
    return model
279
async def model_exists(
    self, model_name: str, controller: Controller = None
) -> bool:
    """
    Tell whether a model is known by the controller

    :param: model_name: Model name
    :param: controller: Controller. If it is not given, a connection is
                        opened for the check and closed afterwards.

    :return bool
    """
    # A controller opened here must also be closed here
    opened_here = not controller
    if opened_here:
        controller = await self.get_controller()

    try:
        known_models = await controller.list_models()
        return model_name in known_models
    finally:
        if opened_here:
            await self.disconnect_controller(controller)
304
async def models_exist(self, model_names: [str]) -> (bool, list):
    """
    Check which of the given models exist

    :param: model_names: List of strings with model names

    :return (bool, list[str]): (True if all models exists, List of model names that don't exist)

    :raises: Exception if model_names is empty
    """
    if not model_names:
        raise Exception(
            "model_names must be a non-empty array. Given value: {}".format(
                model_names
            )
        )
    controller_models = await self.list_models()
    requested = set(model_names)
    found = set(controller_models).intersection(model_names)
    missing = list(requested - found)

    return not missing, missing
328
async def get_model_status(self, model_name: str) -> FullStatus:
    """
    Get model status

    :param: model_name: Model name

    :return: Full status object
    """
    controller = await self.get_controller()
    model = None
    try:
        # Inside the try, so that the controller is disconnected even
        # if the model cannot be obtained
        model = await self.get_model(controller, model_name)
        return await model.get_status()
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
344
async def create_machine(
    self,
    model_name: str,
    machine_id: str = None,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    series: str = "xenial",
    wait: bool = True,
) -> (Machine, bool):
    """
    Create machine

    If machine_id is given, the existing machine with that id is returned
    and no machine is created.

    :param: model_name: Model name
    :param: machine_id: Machine id
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: series: Series of the machine (xenial, bionic, focal, ...)
    :param: wait: Wait until machine is ready

    :return: (juju.machine.Machine, bool): Machine object and a boolean saying
                                           if the machine is new or it already existed

    :raises: JujuMachineNotFound if machine_id is given but is not in the model
    """
    new = False
    machine = None

    self.log.debug(
        "Creating machine (id={}) in model: {}".format(machine_id, model_name)
    )

    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model (inside the try, so that the controller is disconnected
        # even if the model cannot be obtained)
        model = await self.get_model(controller, model_name)

        if machine_id is not None:
            self.log.debug(
                "Searching machine (id={}) in model {}".format(
                    machine_id, model_name
                )
            )

            # Get machines from model and get the machine with machine_id if exists
            machines = await model.get_machines()
            if machine_id in machines:
                self.log.debug(
                    "Machine (id={}) found in model {}".format(
                        machine_id, model_name
                    )
                )
                machine = machines[machine_id]
            else:
                raise JujuMachineNotFound("Machine {} not found".format(machine_id))

        if machine is None:
            self.log.debug("Creating a new machine in model {}".format(model_name))

            # Create machine
            machine = await model.add_machine(
                spec=None, constraints=None, disks=None, series=series
            )
            new = True

            if wait:
                # Wait until the machine is ready
                self.log.debug(
                    "Wait until machine {} is ready in model {}".format(
                        machine.entity_id, model_name
                    )
                )
                await JujuModelWatcher.wait_for(
                    model=model,
                    entity=machine,
                    progress_timeout=progress_timeout,
                    total_timeout=total_timeout,
                    db_dict=db_dict,
                    n2vc=self.n2vc,
                )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    self.log.debug(
        "Machine {} ready at {} in model {}".format(
            machine.entity_id, machine.dns_name, model_name
        )
    )
    return machine, new
435
async def provision_machine(
    self,
    model_name: str,
    hostname: str,
    username: str,
    private_key_path: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
) -> str:
    """
    Manually provisioning of a machine

    :param: model_name: Model name
    :param: hostname: IP to access the machine
    :param: username: Username to login to the machine
    :param: private_key_path: Local path for the private key
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active

    :return: (Entity): Machine id

    :raises: ValueError if the controller reports an error adding the machine
    :raises: JujuMachineNotFound if the machine does not show up in the model
    """
    self.log.debug(
        "Provisioning machine. model: {}, hostname: {}, username: {}".format(
            model_name, hostname, username
        )
    )

    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model (inside the try, so that the controller is disconnected
        # even if the model cannot be obtained)
        model = await self.get_model(controller, model_name)

        # Get provisioner
        provisioner = AsyncSSHProvisioner(
            host=hostname,
            user=username,
            private_key_path=private_key_path,
            log=self.log,
        )

        # Provision machine
        params = await provisioner.provision_machine()

        params.jobs = ["JobHostUnits"]

        self.log.debug("Adding machine to model")
        connection = model.connection()
        client_facade = client.ClientFacade.from_connection(connection)

        results = await client_facade.AddMachines(params=[params])
        error = results.machines[0].error

        if error:
            msg = "Error adding machine: {}".format(error.message)
            self.log.error(msg=msg)
            raise ValueError(msg)

        machine_id = results.machines[0].machine

        # The agent installation runs in the background; it is not awaited here
        self.log.debug("Installing Juju agent into machine {}".format(machine_id))
        asyncio.ensure_future(
            provisioner.install_agent(
                connection=connection,
                nonce=params.nonce,
                machine_id=machine_id,
                proxy=self.api_proxy,
                series=params.series,
            )
        )

        # Poll the model (up to 10 times, 2 seconds apart) for the new machine
        machine = None
        for _ in range(10):
            machine_list = await model.get_machines()
            if machine_id in machine_list:
                self.log.debug("Machine {} found in model!".format(machine_id))
                machine = model.machines.get(machine_id)
                break
            await asyncio.sleep(2)

        if machine is None:
            msg = "Machine {} not found in model".format(machine_id)
            self.log.error(msg=msg)
            raise JujuMachineNotFound(msg)

        self.log.debug(
            "Wait until machine {} is ready in model {}".format(
                machine.entity_id, model_name
            )
        )
        await JujuModelWatcher.wait_for(
            model=model,
            entity=machine,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    self.log.debug(
        "Machine provisioned {} in model {}".format(machine_id, model_name)
    )

    return machine_id
548
async def deploy(
    self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
):
    """
    Deploy bundle or charm: Similar to the juju CLI command `juju deploy`

    :param: uri: Path or Charm Store uri in which the charm or bundle can be found
    :param: model_name: Model name
    :param: wait: Indicates whether to wait or not until all applications are active
    :param: timeout: Time in seconds to wait until all applications are active
    """
    controller = await self.get_controller()
    model = None
    try:
        # Inside the try, so that the controller is disconnected even
        # if the model cannot be obtained
        model = await self.get_model(controller, model_name)
        await model.deploy(uri)
        if wait:
            await JujuModelWatcher.wait_for_model(model, timeout=timeout)
            self.log.debug("All units active in model {}".format(model_name))
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
570
async def deploy_charm(
    self,
    application_name: str,
    path: str,
    model_name: str,
    machine_id: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    config: dict = None,
    series: str = None,
    num_units: int = 1,
):
    """Deploy charm

    :param: application_name: Application name
    :param: path: Local path to the charm
    :param: model_name: Model name
    :param: machine_id ID of the machine
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: config: Config for the charm
    :param: series: Series of the charm
    :param: num_units: Number of units

    :return: (juju.application.Application): Juju application

    :raises: JujuApplicationExists if the application is already in the model
    :raises: JujuMachineNotFound if machine_id is given but is not in the model
    """
    self.log.debug(
        "Deploying charm {} to machine {} in model ~{}".format(
            application_name, machine_id, model_name
        )
    )
    self.log.debug("charm: {}".format(path))

    # Get controller
    controller = await self.get_controller()

    application = None
    model = None
    try:
        # Get model (inside the try, so that the controller is disconnected
        # even if the model cannot be obtained)
        model = await self.get_model(controller, model_name)

        if application_name in model.applications:
            raise JujuApplicationExists(
                "Application {} exists".format(application_name)
            )

        if machine_id is not None:
            if machine_id not in model.machines:
                msg = "Machine {} not found in model".format(machine_id)
                self.log.error(msg=msg)
                raise JujuMachineNotFound(msg)
            machine = model.machines[machine_id]
            # The series of the target machine overrides the given one
            series = machine.series

        # The first unit is deployed to the given machine
        application = await model.deploy(
            entity_url=path,
            application_name=application_name,
            channel="stable",
            num_units=1,
            series=series,
            to=machine_id,
            config=config,
        )

        self.log.debug(
            "Wait until application {} is ready in model {}".format(
                application_name, model_name
            )
        )
        # Each additional unit gets its own newly created machine
        if num_units > 1:
            for _ in range(num_units - 1):
                m, _ = await self.create_machine(model_name, wait=False)
                await application.add_unit(to=m.entity_id)

        await JujuModelWatcher.wait_for(
            model=model,
            entity=application,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
        )
        self.log.debug(
            "Application {} is ready in model {}".format(
                application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return application
666
def _get_application(self, model: Model, application_name: str) -> Application:
    """Look up an application of a model by name

    :param: model: Model object
    :param: application_name: Application name

    :return: juju.application.Application (or None if it doesn't exist)
    """
    applications = model.applications
    if not applications or application_name not in applications:
        return None
    return applications[application_name]
677
async def execute_action(
    self,
    application_name: str,
    model_name: str,
    action_name: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    **kwargs
):
    """Execute action

    The action is run on the leader unit of the application.

    :param: application_name: Application name
    :param: model_name: Model name
    :param: action_name: Name of the action
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: kwargs: Parameters passed to the action

    :return: (str, str): (output and status)

    :raises: JujuApplicationNotFound if the application is not in the model
    :raises: JujuLeaderUnitNotFound if no leader unit is found after the retries
    :raises: JujuActionNotFound if the application does not have the action
    """
    self.log.debug(
        "Executing action {} using params {}".format(action_name, kwargs)
    )
    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model (inside the try, so that the controller is disconnected
        # even if the model cannot be obtained)
        model = await self.get_model(controller, model_name)

        # Get application
        application = self._get_application(
            model,
            application_name=application_name,
        )
        if application is None:
            raise JujuApplicationNotFound("Cannot execute action")

        # Get leader unit
        # Racing condition:
        #   Ocassionally, self._get_leader_unit() will return None
        #   because the leader elected hook has not been triggered yet.
        #   Therefore, we are doing some retries. If it happens again,
        #   re-open bug 1236
        attempts = 3
        time_between_retries = 10
        unit = None
        for _ in range(attempts):
            unit = await self._get_leader_unit(application)
            if unit is None:
                await asyncio.sleep(time_between_retries)
            else:
                break
        if unit is None:
            raise JujuLeaderUnitNotFound(
                "Cannot execute action: leader unit not found"
            )

        actions = await application.get_actions()

        if action_name not in actions:
            raise JujuActionNotFound(
                "Action {} not in available actions".format(action_name)
            )

        action = await unit.run_action(action_name, **kwargs)

        self.log.debug(
            "Wait until action {} is completed in application {} (model={})".format(
                action_name, application_name, model_name
            )
        )
        await JujuModelWatcher.wait_for(
            model=model,
            entity=action,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
        )

        output = await model.get_action_output(action_uuid=action.entity_id)
        status = await model.get_action_status(uuid_or_prefix=action.entity_id)
        # An action missing from the status report is considered failed
        status = (
            status[action.entity_id] if action.entity_id in status else "failed"
        )

        self.log.debug(
            "Action {} completed with status {} in application {} (model={})".format(
                action_name, action.status, application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return output, status
776
async def get_actions(self, application_name: str, model_name: str) -> dict:
    """Get list of actions

    :param: application_name: Application name
    :param: model_name: Model name

    :return: Dict with this format
        {
            "action_name": "Description of the action",
            ...
        }
    """
    self.log.debug(
        "Getting list of actions for application {}".format(application_name)
    )

    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model (inside the try, so that the controller is disconnected
        # even if the model cannot be obtained)
        model = await self.get_model(controller, model_name)

        # Get application
        application = self._get_application(
            model,
            application_name=application_name,
        )

        # Return list of actions
        return await application.get_actions()

    finally:
        # Disconnect from model and controller
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
813
async def get_metrics(self, model_name: str, application_name: str) -> dict:
    """Get the metrics collected by the VCA.

    :param model_name The name or unique id of the network service
    :param application_name The name of the application

    :return: Metrics of the application (empty dict if the application
             is not in the model)

    :raises: Exception if model_name or application_name is empty
    """
    if not model_name or not application_name:
        raise Exception("model_name and application_name must be non-empty strings")
    metrics = {}
    controller = await self.get_controller()
    model = None
    try:
        # Inside the try, so that the controller is disconnected even
        # if the model cannot be obtained
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)
        if application is not None:
            metrics = await application.get_metrics()
    finally:
        # Both disconnections are coroutines: they only run when awaited
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return metrics
833
async def add_relation(
    self,
    model_name: str,
    endpoint_1: str,
    endpoint_2: str,
):
    """Add relation

    API errors saying that the relation was not found or that it already
    exists are logged as warnings and not raised.

    :param: model_name: Model name
    :param: endpoint_1 First endpoint name
                       ("app:endpoint" format or directly the saas name)
    :param: endpoint_2: Second endpoint name (^ same format)

    :raises: JujuAPIError for any other error reported by the API
    """

    self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))

    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model (inside the outer try, so that the controller is
        # disconnected even if the model cannot be obtained)
        model = await self.get_model(controller, model_name)

        # Add relation
        # Only errors raised by add_relation itself are filtered here
        try:
            await model.add_relation(endpoint_1, endpoint_2)
        except JujuAPIError as e:
            if "not found" in e.message:
                self.log.warning("Relation not found: {}".format(e.message))
                return
            if "already exists" in e.message:
                self.log.warning("Relation already exists: {}".format(e.message))
                return
            # another exception, raise it
            raise e
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
871
async def consume(
    self,
    offer_url: str,
    model_name: str,
):
    """
    Adds a remote offer to the model. Relations can be created later using "juju relate".

    :param: offer_url: Offer Url
    :param: model_name: Model name

    :raises ParseError if there's a problem parsing the offer_url
    :raises JujuError if remote offer includes and endpoint
    :raises JujuAPIError if the operation is not successful
    """
    controller = await self.get_controller()
    model = None
    try:
        # Inside the try, so that the controller is disconnected even
        # if the model cannot be obtained
        model = await self.get_model(controller, model_name)
        await model.consume(offer_url)
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
895
async def destroy_model(self, model_name: str, total_timeout: float):
    """
    Destroy model

    Nothing is done if the model does not exist.

    :param: model_name: Model name
    :param: total_timeout: Time in seconds to wait for the model to be
                           destroyed (3600 if None)

    :raises: Exception if the model is still listed when the timeout expires
    """

    controller = await self.get_controller()
    model = None
    try:
        if not await self.model_exists(model_name, controller=controller):
            return

        model = await self.get_model(controller, model_name)
        self.log.debug("Destroying model {}".format(model_name))
        uuid = model.info.uuid

        # Destroy machines that are manually provisioned
        # and still are in pending state
        await self._destroy_pending_machines(model, only_manual=True)

        # Disconnect model
        # The reference is cleared first, so that the cleanup below
        # never disconnects the same model a second time
        connected_model, model = model, None
        await self.disconnect_model(connected_model)

        await controller.destroy_model(uuid, force=True, max_wait=0)

        # Wait until model is destroyed
        self.log.debug("Waiting for model {} to be destroyed...".format(model_name))

        if total_timeout is None:
            total_timeout = 3600
        end = time.time() + total_timeout
        while time.time() < end:
            models = await controller.list_models()
            if model_name not in models:
                self.log.debug(
                    "The model {} ({}) was destroyed".format(model_name, uuid)
                )
                return
            await asyncio.sleep(5)
        raise Exception(
            "Timeout waiting for model {} to be destroyed".format(model_name)
        )
    finally:
        # Only reached with a model when something failed before the
        # explicit disconnection above
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
946
async def destroy_application(self, model: Model, application_name: str):
    """
    Destroy an application of a model

    A warning is logged if the application is not in the model.

    :param: model: Model object
    :param: application_name: Application name
    """
    self.log.debug(
        "Destroying application {} in model {}".format(
            application_name, model.info.name
        )
    )
    target = model.applications.get(application_name)
    if not target:
        self.log.warning("Application not found: {}".format(application_name))
        return
    await target.destroy()
964
async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
    """
    Destroy pending machines in a given model

    :param: model: Model object
    :param: only_manual: Bool that indicates only manually provisioned
                         machines should be destroyed (if True), or that
                         all pending machines should be destroyed
    """
    status = await model.get_status()
    for machine_id in status.machines:
        machine_status = status.machines[machine_id]
        if machine_status.agent_status.status != "pending":
            continue
        if only_manual and not machine_status.instance_id.startswith("manual:"):
            # Skip this machine only: the remaining machines still
            # have to be checked
            continue
        machine = model.machines[machine_id]
        await machine.destroy(force=True)
981
async def configure_application(
    self, model_name: str, application_name: str, config: dict = None
):
    """Apply a configuration to an application

    Nothing is done (apart from logging) if no config is given.

    :param: model_name: Model name
    :param: application_name: Application name
    :param: config: Config to apply to the charm
    """
    self.log.debug("Configuring application {}".format(application_name))

    if not config:
        return

    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        target = self._get_application(
            model,
            application_name=application_name,
        )
        await target.set_config(config)
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1007
def _get_api_endpoints_db(self) -> [str]:
    """
    Read the API endpoints stored in the database

    :return: List of API endpoints (None if there is nothing stored)
    """
    self.log.debug("Getting endpoints from database")

    stored_info = self.db.get_one(
        DB_DATA.api_endpoints.table,
        q_filter=DB_DATA.api_endpoints.filter,
        fail_on_empty=False,
    )
    if not stored_info or DB_DATA.api_endpoints.key not in stored_info:
        return None
    return stored_info[DB_DATA.api_endpoints.key]
1023
def _update_api_endpoints_db(self, endpoints: [str]):
    """
    Save the API endpoints in the database

    The database entry is created first if it does not exist yet.

    :param: endpoints: List of endpoints
    """
    self.log.debug("Saving endpoints {} in database".format(endpoints))

    endpoints_data = DB_DATA.api_endpoints
    stored_info = self.db.get_one(
        endpoints_data.table,
        q_filter=endpoints_data.filter,
        fail_on_empty=False,
    )
    # Create the entry when it is missing
    if not stored_info:
        try:
            self.db.create(
                endpoints_data.table,
                endpoints_data.filter,
            )
        except DbException as error:
            # Racing condition: check if another N2VC worker has created it
            stored_info = self.db.get_one(
                endpoints_data.table,
                q_filter=endpoints_data.filter,
                fail_on_empty=False,
            )
            if not stored_info:
                raise error
    self.db.set_one(
        endpoints_data.table,
        endpoints_data.filter,
        {endpoints_data.key: endpoints},
    )
1058
def handle_exception(self, loop, context):
    """
    Exception handler installed in the event loop by the constructor

    All unhandled exceptions by libjuju are handled here. They are not
    raised; they are only logged at debug level so they can be diagnosed.

    :param: loop: Event loop that reports the exception
    :param: context: Dictionary describing the exception
    """
    self.log.debug(
        "Unhandled exception in event loop: {} ({})".format(
            context.get("message"), context.get("exception")
        )
    )
1062
async def health_check(self, interval: float = 300.0):
    """
    Health check to make sure controller and controller_model connections are OK

    Runs forever: connection failures are logged and the check is retried
    after the interval.

    :param: interval: Time in seconds between checks
    """
    while True:
        # Reset on every iteration: otherwise a failed connection would
        # disconnect the controller of a previous iteration again
        controller = None
        try:
            controller = await self.get_controller()
            # self.log.debug("VCA is alive")
        except Exception as e:
            self.log.error("Health check to VCA failed: {}".format(e))
        finally:
            await self.disconnect_controller(controller)
        await asyncio.sleep(interval)
1079
async def list_models(self, contains: str = None) -> [str]:
    """List the models of the controller

    :param: contains: If given, only the models whose name contains
                      this string are returned

    :return: [models] Returns list of model names
    """

    controller = await self.get_controller()
    try:
        model_names = await controller.list_models()
        if not contains:
            return model_names
        return [name for name in model_names if contains in name]
    finally:
        await self.disconnect_controller(controller)
1096
async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
    """
    List the offers of a model

    :param: model_name: Model name

    :return: Returns the offers reported by the controller for that model
    """
    vca = await self.get_controller()
    try:
        offers = await vca.list_offers(model_name)
    finally:
        # The controller is released even if the query fails
        await self.disconnect_controller(vca)
    return offers
1110
async def add_k8s(
    self,
    name: str,
    rbac_id: str,
    token: str,
    client_cert_data: str,
    configuration: Configuration,
    storage_class: str,
    credential_name: str = None,
):
    """
    Add a Kubernetes cloud to the controller

    Similar to the `juju add-k8s` command in the CLI

    :param: name: Name for the K8s cloud
    :param: rbac_id: Value stored in the credential attributes
                     under the "rbac-id" key
    :param: token: Token handed to get_k8s_cloud_credential()
    :param: client_cert_data: Certificate data, used both for the credential
                              and as the CA certificate of the cloud
    :param: configuration: Kubernetes configuration object.
                           Its host is used as the cloud endpoint.
    :param: storage_class: Storage Class to use in the cloud
                           (for both operator and workload storage)
    :param: credential_name: Credential name, forwarded to add_cloud()

    :raises: Exception: If storage_class, name or configuration is empty

    :return: Whatever add_cloud() returns
    """
    # Checked in this order; the first empty argument is the one reported
    required_arguments = (
        (storage_class, "storage_class must be a non-empty string"),
        (name, "name must be a non-empty string"),
        (configuration, "configuration must be provided"),
    )
    for value, error_message in required_arguments:
        if not value:
            raise Exception(error_message)

    endpoint = configuration.host
    credential = self.get_k8s_cloud_credential(configuration, client_cert_data, token)
    credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id

    storage_config = {
        "operator-storage": storage_class,
        "workload-storage": storage_class,
    }
    cloud = client.Cloud(
        type_="kubernetes",
        auth_types=[credential.auth_type],
        endpoint=endpoint,
        ca_certificates=[client_cert_data],
        config=storage_config,
    )

    return await self.add_cloud(
        name, cloud, credential, credential_name=credential_name
    )
1160
def get_k8s_cloud_credential(
    self,
    configuration: Configuration,
    client_cert_data: str,
    token: str = None,
) -> client.CloudCredential:
    """
    Build the cloud credential for a Kubernetes cloud

    The auth type is chosen from what is available:
    - username (with or without certificate): "userpasswithcert" / "userpass"
    - certificate and token, without username: "certificate"
    The "oauth2" / "oauth2withcert" types depend on a client key, which is
    currently never loaded (key is always None).

    :param: configuration: Kubernetes configuration object.
                           Its username and password are read.
    :param: client_cert_data: Client certificate data
    :param: token: Token. Cannot be combined with username/password.

    :raises: JujuInvalidK8sConfiguration: If token and user/pass are both set,
                                          or no supported auth type applies

    :return: CloudCredential with the selected auth type and attributes
    """
    # TODO: Test with AKS
    key = None  # open(configuration.key_file, "r").read()
    username = configuration.username
    password = configuration.password

    # Attributes that do not depend on the selected auth type
    attrs = {}
    if client_cert_data:
        attrs["ClientCertificateData"] = client_cert_data
    if key:
        attrs["ClientKeyData"] = key
    if token:
        if username or password:
            raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
        attrs["Token"] = token

    # Auth type selection
    if key:
        auth_type = "oauth2withcert" if client_cert_data else "oauth2"
        if not token:
            raise JujuInvalidK8sConfiguration(
                "missing token for auth type {}".format(auth_type)
            )
    elif username:
        if not password:
            self.log.debug(
                "credential for user {} has empty password".format(username)
            )
        attrs["username"] = username
        attrs["password"] = password
        auth_type = "userpasswithcert" if client_cert_data else "userpass"
    elif client_cert_data and token:
        auth_type = "certificate"
    else:
        raise JujuInvalidK8sConfiguration("authentication method not supported")

    return client.CloudCredential(auth_type=auth_type, attrs=attrs)
1207
async def add_cloud(
    self,
    name: str,
    cloud: Cloud,
    credential: CloudCredential = None,
    credential_name: str = None,
) -> Cloud:
    """
    Add cloud to the controller

    :param: name: Name of the cloud to be added
    :param: cloud: Cloud object
    :param: credential: CloudCredentials object for the cloud.
                        When not provided, no credential is added.
    :param: credential_name: Credential name.
                             If not defined, the name of the cloud will be used.

    :return: The same Cloud object received in the cloud parameter
    """
    vca = await self.get_controller()
    try:
        await vca.add_cloud(name, cloud)
        if credential:
            effective_credential_name = credential_name or name
            await vca.add_credential(
                effective_credential_name, credential=credential, cloud=name
            )
        # Need to return the object returned by the controller.add_cloud() function
        # I'm returning the original value now until this bug is fixed:
        # https://github.com/juju/python-libjuju/issues/443
        return cloud
    finally:
        await self.disconnect_controller(vca)
1237
async def remove_cloud(self, name: str):
    """
    Remove a cloud from the controller

    :param: name: Name of the cloud to be removed
    """
    vca = await self.get_controller()
    try:
        await vca.remove_cloud(name)
    finally:
        # The controller is released even if the removal fails
        await self.disconnect_controller(vca)
1249
async def _get_leader_unit(self, application: Application) -> Unit:
    """
    Get the leader unit of an application

    The units are asked one by one, in order, and the first one that reports
    itself as leader (is_leader_from_status()) is returned.

    :param: application: Application whose units are inspected

    :return: The leader unit, or None if no unit is the leader
    """
    for candidate in application.units:
        if await candidate.is_leader_from_status():
            return candidate
    return None
1257
async def get_cloud_credentials(self, cloud_name: str, credential_name: str):
    """
    Get a cloud credential from the controller

    The credential is identified by the cloud name, the username of this
    object (self.username) and the credential name.

    :param: cloud_name: Name of the cloud
    :param: credential_name: Name of the credential

    :return: The results of the Credential query made through the cloud facade
    """
    vca = await self.get_controller()
    try:
        cloud_facade = client.CloudFacade.from_connection(vca.connection())
        credential_tag = tag.credential(cloud_name, self.username, credential_name)
        response = await cloud_facade.Credential([client.Entity(credential_tag)])
        return response.results
    finally:
        await self.disconnect_controller(vca)