cb2e0ba925d248f39a53bdb45e06c034a9e0b5cc
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17 import typing
18
19 import time
20
21 from juju.errors import JujuAPIError
22 from juju.model import Model
23 from juju.machine import Machine
24 from juju.application import Application
25 from juju.unit import Unit
26 from juju.client._definitions import (
27 FullStatus,
28 QueryApplicationOffersResults,
29 Cloud,
30 CloudCredential,
31 )
32 from juju.controller import Controller
33 from juju.client import client
34 from juju import tag
35
36 from n2vc.juju_watcher import JujuModelWatcher
37 from n2vc.provisioner import AsyncSSHProvisioner
38 from n2vc.n2vc_conn import N2VCConnector
39 from n2vc.exceptions import (
40 JujuMachineNotFound,
41 JujuApplicationNotFound,
42 JujuLeaderUnitNotFound,
43 JujuActionNotFound,
44 JujuControllerFailedConnecting,
45 JujuApplicationExists,
46 JujuInvalidK8sConfiguration,
47 JujuError,
48 )
49 from n2vc.vca.cloud import Cloud as VcaCloud
50 from n2vc.vca.connection import Connection
51 from kubernetes.client.configuration import Configuration
52 from retrying_async import retry
53
54
# Key name for an RBAC identifier.
# NOTE(review): not referenced by the code visible in this chunk; presumably
# used as a (Kubernetes) label key by helpers later in this module — confirm.
RBAC_LABEL_KEY_NAME = "rbac-id"
56
57
58 class Libjuju:
59 def __init__(
60 self,
61 vca_connection: Connection,
62 loop: asyncio.AbstractEventLoop = None,
63 log: logging.Logger = None,
64 n2vc: N2VCConnector = None,
65 ):
66 """
67 Constructor
68
69 :param: vca_connection: n2vc.vca.connection object
70 :param: loop: Asyncio loop
71 :param: log: Logger
72 :param: n2vc: N2VC object
73 """
74
75 self.log = log or logging.getLogger("Libjuju")
76 self.n2vc = n2vc
77 self.vca_connection = vca_connection
78
79 self.loop = loop or asyncio.get_event_loop()
80 self.loop.set_exception_handler(self.handle_exception)
81 self.creating_model = asyncio.Lock(loop=self.loop)
82
83 if self.vca_connection.is_default:
84 self.health_check_task = self._create_health_check_task()
85
86 def _create_health_check_task(self):
87 return self.loop.create_task(self.health_check())
88
89 async def get_controller(self, timeout: float = 60.0) -> Controller:
90 """
91 Get controller
92
93 :param: timeout: Time in seconds to wait for controller to connect
94 """
95 controller = None
96 try:
97 controller = Controller(loop=self.loop)
98 await asyncio.wait_for(
99 controller.connect(
100 endpoint=self.vca_connection.data.endpoints,
101 username=self.vca_connection.data.user,
102 password=self.vca_connection.data.secret,
103 cacert=self.vca_connection.data.cacert,
104 ),
105 timeout=timeout,
106 )
107 if self.vca_connection.is_default:
108 endpoints = await controller.api_endpoints
109 if not all(
110 endpoint in self.vca_connection.endpoints for endpoint in endpoints
111 ):
112 await self.vca_connection.update_endpoints(endpoints)
113 return controller
114 except asyncio.CancelledError as e:
115 raise e
116 except Exception as e:
117 self.log.error(
118 "Failed connecting to controller: {}... {}".format(
119 self.vca_connection.data.endpoints, e
120 )
121 )
122 if controller:
123 await self.disconnect_controller(controller)
124 raise JujuControllerFailedConnecting(e)
125
126 async def disconnect(self):
127 """Disconnect"""
128 # Cancel health check task
129 self.health_check_task.cancel()
130 self.log.debug("Libjuju disconnected!")
131
132 async def disconnect_model(self, model: Model):
133 """
134 Disconnect model
135
136 :param: model: Model that will be disconnected
137 """
138 await model.disconnect()
139
140 async def disconnect_controller(self, controller: Controller):
141 """
142 Disconnect controller
143
144 :param: controller: Controller that will be disconnected
145 """
146 if controller:
147 await controller.disconnect()
148
149 @retry(attempts=3, delay=5, timeout=None)
150 async def add_model(self, model_name: str, cloud: VcaCloud):
151 """
152 Create model
153
154 :param: model_name: Model name
155 :param: cloud: Cloud object
156 """
157
158 # Get controller
159 controller = await self.get_controller()
160 model = None
161 try:
162 # Block until other workers have finished model creation
163 while self.creating_model.locked():
164 await asyncio.sleep(0.1)
165
166 # Create the model
167 async with self.creating_model:
168 if await self.model_exists(model_name, controller=controller):
169 return
170 self.log.debug("Creating model {}".format(model_name))
171 model = await controller.add_model(
172 model_name,
173 config=self.vca_connection.data.model_config,
174 cloud_name=cloud.name,
175 credential_name=cloud.credential_name,
176 )
177 except JujuAPIError as e:
178 if "already exists" in e.message:
179 pass
180 else:
181 raise e
182 finally:
183 if model:
184 await self.disconnect_model(model)
185 await self.disconnect_controller(controller)
186
187 async def get_executed_actions(self, model_name: str) -> list:
188 """
189 Get executed/history of actions for a model.
190
191 :param: model_name: Model name, str.
192 :return: List of executed actions for a model.
193 """
194 model = None
195 executed_actions = []
196 controller = await self.get_controller()
197 try:
198 model = await self.get_model(controller, model_name)
199 # Get all unique action names
200 actions = {}
201 for application in model.applications:
202 application_actions = await self.get_actions(application, model_name)
203 actions.update(application_actions)
204 # Get status of all actions
205 for application_action in actions:
206 app_action_status_list = await model.get_action_status(
207 name=application_action
208 )
209 for action_id, action_status in app_action_status_list.items():
210 executed_action = {
211 "id": action_id,
212 "action": application_action,
213 "status": action_status,
214 }
215 # Get action output by id
216 action_status = await model.get_action_output(executed_action["id"])
217 for k, v in action_status.items():
218 executed_action[k] = v
219 executed_actions.append(executed_action)
220 except Exception as e:
221 raise JujuError(
222 "Error in getting executed actions for model: {}. Error: {}".format(
223 model_name, str(e)
224 )
225 )
226 finally:
227 if model:
228 await self.disconnect_model(model)
229 await self.disconnect_controller(controller)
230 return executed_actions
231
232 async def get_application_configs(
233 self, model_name: str, application_name: str
234 ) -> dict:
235 """
236 Get available configs for an application.
237
238 :param: model_name: Model name, str.
239 :param: application_name: Application name, str.
240
241 :return: A dict which has key - action name, value - action description
242 """
243 model = None
244 application_configs = {}
245 controller = await self.get_controller()
246 try:
247 model = await self.get_model(controller, model_name)
248 application = self._get_application(
249 model, application_name=application_name
250 )
251 application_configs = await application.get_config()
252 except Exception as e:
253 raise JujuError(
254 "Error in getting configs for application: {} in model: {}. Error: {}".format(
255 application_name, model_name, str(e)
256 )
257 )
258 finally:
259 if model:
260 await self.disconnect_model(model)
261 await self.disconnect_controller(controller)
262 return application_configs
263
264 @retry(attempts=3, delay=5)
265 async def get_model(self, controller: Controller, model_name: str) -> Model:
266 """
267 Get model from controller
268
269 :param: controller: Controller
270 :param: model_name: Model name
271
272 :return: Model: The created Juju model object
273 """
274 return await controller.get_model(model_name)
275
276 async def model_exists(self, model_name: str, controller: Controller = None) -> bool:
277 """
278 Check if model exists
279
280 :param: controller: Controller
281 :param: model_name: Model name
282
283 :return bool
284 """
285 need_to_disconnect = False
286
287 # Get controller if not passed
288 if not controller:
289 controller = await self.get_controller()
290 need_to_disconnect = True
291
292 # Check if model exists
293 try:
294 return model_name in await controller.list_models()
295 finally:
296 if need_to_disconnect:
297 await self.disconnect_controller(controller)
298
299 async def models_exist(self, model_names: [str]) -> (bool, list):
300 """
301 Check if models exists
302
303 :param: model_names: List of strings with model names
304
305 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
306 """
307 if not model_names:
308 raise Exception(
309 "model_names must be a non-empty array. Given value: {}".format(
310 model_names
311 )
312 )
313 non_existing_models = []
314 models = await self.list_models()
315 existing_models = list(set(models).intersection(model_names))
316 non_existing_models = list(set(model_names) - set(existing_models))
317
318 return (
319 len(non_existing_models) == 0,
320 non_existing_models,
321 )
322
323 async def get_model_status(self, model_name: str) -> FullStatus:
324 """
325 Get model status
326
327 :param: model_name: Model name
328
329 :return: Full status object
330 """
331 controller = await self.get_controller()
332 model = await self.get_model(controller, model_name)
333 try:
334 return await model.get_status()
335 finally:
336 await self.disconnect_model(model)
337 await self.disconnect_controller(controller)
338
339 async def create_machine(
340 self,
341 model_name: str,
342 machine_id: str = None,
343 db_dict: dict = None,
344 progress_timeout: float = None,
345 total_timeout: float = None,
346 series: str = "xenial",
347 wait: bool = True,
348 ) -> (Machine, bool):
349 """
350 Create machine
351
352 :param: model_name: Model name
353 :param: machine_id: Machine id
354 :param: db_dict: Dictionary with data of the DB to write the updates
355 :param: progress_timeout: Maximum time between two updates in the model
356 :param: total_timeout: Timeout for the entity to be active
357 :param: series: Series of the machine (xenial, bionic, focal, ...)
358 :param: wait: Wait until machine is ready
359
360 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
361 if the machine is new or it already existed
362 """
363 new = False
364 machine = None
365
366 self.log.debug(
367 "Creating machine (id={}) in model: {}".format(machine_id, model_name)
368 )
369
370 # Get controller
371 controller = await self.get_controller()
372
373 # Get model
374 model = await self.get_model(controller, model_name)
375 try:
376 if machine_id is not None:
377 self.log.debug(
378 "Searching machine (id={}) in model {}".format(
379 machine_id, model_name
380 )
381 )
382
383 # Get machines from model and get the machine with machine_id if exists
384 machines = await model.get_machines()
385 if machine_id in machines:
386 self.log.debug(
387 "Machine (id={}) found in model {}".format(
388 machine_id, model_name
389 )
390 )
391 machine = machines[machine_id]
392 else:
393 raise JujuMachineNotFound("Machine {} not found".format(machine_id))
394
395 if machine is None:
396 self.log.debug("Creating a new machine in model {}".format(model_name))
397
398 # Create machine
399 machine = await model.add_machine(
400 spec=None, constraints=None, disks=None, series=series
401 )
402 new = True
403
404 # Wait until the machine is ready
405 self.log.debug(
406 "Wait until machine {} is ready in model {}".format(
407 machine.entity_id, model_name
408 )
409 )
410 if wait:
411 await JujuModelWatcher.wait_for(
412 model=model,
413 entity=machine,
414 progress_timeout=progress_timeout,
415 total_timeout=total_timeout,
416 db_dict=db_dict,
417 n2vc=self.n2vc,
418 vca_id=self.vca_connection._vca_id,
419 )
420 finally:
421 await self.disconnect_model(model)
422 await self.disconnect_controller(controller)
423
424 self.log.debug(
425 "Machine {} ready at {} in model {}".format(
426 machine.entity_id, machine.dns_name, model_name
427 )
428 )
429 return machine, new
430
431 async def provision_machine(
432 self,
433 model_name: str,
434 hostname: str,
435 username: str,
436 private_key_path: str,
437 db_dict: dict = None,
438 progress_timeout: float = None,
439 total_timeout: float = None,
440 ) -> str:
441 """
442 Manually provisioning of a machine
443
444 :param: model_name: Model name
445 :param: hostname: IP to access the machine
446 :param: username: Username to login to the machine
447 :param: private_key_path: Local path for the private key
448 :param: db_dict: Dictionary with data of the DB to write the updates
449 :param: progress_timeout: Maximum time between two updates in the model
450 :param: total_timeout: Timeout for the entity to be active
451
452 :return: (Entity): Machine id
453 """
454 self.log.debug(
455 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
456 model_name, hostname, username
457 )
458 )
459
460 # Get controller
461 controller = await self.get_controller()
462
463 # Get model
464 model = await self.get_model(controller, model_name)
465
466 try:
467 # Get provisioner
468 provisioner = AsyncSSHProvisioner(
469 host=hostname,
470 user=username,
471 private_key_path=private_key_path,
472 log=self.log,
473 )
474
475 # Provision machine
476 params = await provisioner.provision_machine()
477
478 params.jobs = ["JobHostUnits"]
479
480 self.log.debug("Adding machine to model")
481 connection = model.connection()
482 client_facade = client.ClientFacade.from_connection(connection)
483
484 results = await client_facade.AddMachines(params=[params])
485 error = results.machines[0].error
486
487 if error:
488 msg = "Error adding machine: {}".format(error.message)
489 self.log.error(msg=msg)
490 raise ValueError(msg)
491
492 machine_id = results.machines[0].machine
493
494 self.log.debug("Installing Juju agent into machine {}".format(machine_id))
495 asyncio.ensure_future(
496 provisioner.install_agent(
497 connection=connection,
498 nonce=params.nonce,
499 machine_id=machine_id,
500 proxy=self.vca_connection.data.api_proxy,
501 series=params.series,
502 )
503 )
504
505 machine = None
506 for _ in range(10):
507 machine_list = await model.get_machines()
508 if machine_id in machine_list:
509 self.log.debug("Machine {} found in model!".format(machine_id))
510 machine = model.machines.get(machine_id)
511 break
512 await asyncio.sleep(2)
513
514 if machine is None:
515 msg = "Machine {} not found in model".format(machine_id)
516 self.log.error(msg=msg)
517 raise JujuMachineNotFound(msg)
518
519 self.log.debug(
520 "Wait until machine {} is ready in model {}".format(
521 machine.entity_id, model_name
522 )
523 )
524 await JujuModelWatcher.wait_for(
525 model=model,
526 entity=machine,
527 progress_timeout=progress_timeout,
528 total_timeout=total_timeout,
529 db_dict=db_dict,
530 n2vc=self.n2vc,
531 vca_id=self.vca_connection._vca_id,
532 )
533 except Exception as e:
534 raise e
535 finally:
536 await self.disconnect_model(model)
537 await self.disconnect_controller(controller)
538
539 self.log.debug(
540 "Machine provisioned {} in model {}".format(machine_id, model_name)
541 )
542
543 return machine_id
544
545 async def deploy(
546 self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
547 ):
548 """
549 Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
550
551 :param: uri: Path or Charm Store uri in which the charm or bundle can be found
552 :param: model_name: Model name
553 :param: wait: Indicates whether to wait or not until all applications are active
554 :param: timeout: Time in seconds to wait until all applications are active
555 """
556 controller = await self.get_controller()
557 model = await self.get_model(controller, model_name)
558 try:
559 await model.deploy(uri)
560 if wait:
561 await JujuModelWatcher.wait_for_model(model, timeout=timeout)
562 self.log.debug("All units active in model {}".format(model_name))
563 finally:
564 await self.disconnect_model(model)
565 await self.disconnect_controller(controller)
566
567 async def deploy_charm(
568 self,
569 application_name: str,
570 path: str,
571 model_name: str,
572 machine_id: str,
573 db_dict: dict = None,
574 progress_timeout: float = None,
575 total_timeout: float = None,
576 config: dict = None,
577 series: str = None,
578 num_units: int = 1,
579 ):
580 """Deploy charm
581
582 :param: application_name: Application name
583 :param: path: Local path to the charm
584 :param: model_name: Model name
585 :param: machine_id ID of the machine
586 :param: db_dict: Dictionary with data of the DB to write the updates
587 :param: progress_timeout: Maximum time between two updates in the model
588 :param: total_timeout: Timeout for the entity to be active
589 :param: config: Config for the charm
590 :param: series: Series of the charm
591 :param: num_units: Number of units
592
593 :return: (juju.application.Application): Juju application
594 """
595 self.log.debug(
596 "Deploying charm {} to machine {} in model ~{}".format(
597 application_name, machine_id, model_name
598 )
599 )
600 self.log.debug("charm: {}".format(path))
601
602 # Get controller
603 controller = await self.get_controller()
604
605 # Get model
606 model = await self.get_model(controller, model_name)
607
608 try:
609 application = None
610 if application_name not in model.applications:
611
612 if machine_id is not None:
613 if machine_id not in model.machines:
614 msg = "Machine {} not found in model".format(machine_id)
615 self.log.error(msg=msg)
616 raise JujuMachineNotFound(msg)
617 machine = model.machines[machine_id]
618 series = machine.series
619
620 application = await model.deploy(
621 entity_url=path,
622 application_name=application_name,
623 channel="stable",
624 num_units=1,
625 series=series,
626 to=machine_id,
627 config=config,
628 )
629
630 self.log.debug(
631 "Wait until application {} is ready in model {}".format(
632 application_name, model_name
633 )
634 )
635 if num_units > 1:
636 for _ in range(num_units - 1):
637 m, _ = await self.create_machine(model_name, wait=False)
638 await application.add_unit(to=m.entity_id)
639
640 await JujuModelWatcher.wait_for(
641 model=model,
642 entity=application,
643 progress_timeout=progress_timeout,
644 total_timeout=total_timeout,
645 db_dict=db_dict,
646 n2vc=self.n2vc,
647 vca_id=self.vca_connection._vca_id,
648 )
649 self.log.debug(
650 "Application {} is ready in model {}".format(
651 application_name, model_name
652 )
653 )
654 else:
655 raise JujuApplicationExists(
656 "Application {} exists".format(application_name)
657 )
658 finally:
659 await self.disconnect_model(model)
660 await self.disconnect_controller(controller)
661
662 return application
663
664 async def scale_application(
665 self,
666 model_name: str,
667 application_name: str,
668 scale: int = 1,
669 total_timeout: float = None,
670 ):
671 """
672 Scale application (K8s)
673
674 :param: model_name: Model name
675 :param: application_name: Application name
676 :param: scale: Scale to which to set this application
677 :param: total_timeout: Timeout for the entity to be active
678 """
679
680 model = None
681 controller = await self.get_controller()
682 try:
683 model = await self.get_model(controller, model_name)
684
685 self.log.debug(
686 "Scaling application {} in model {}".format(
687 application_name, model_name
688 )
689 )
690 application = self._get_application(model, application_name)
691 if application is None:
692 raise JujuApplicationNotFound("Cannot scale application")
693 await application.scale(scale=scale)
694 # Wait until application is scaled in model
695 self.log.debug(
696 "Waiting for application {} to be scaled in model {}...".format
697 (
698 application_name, model_name
699 )
700 )
701 if total_timeout is None:
702 total_timeout = 1800
703 end = time.time() + total_timeout
704 while time.time() < end:
705 application_scale = self._get_application_count(model, application_name)
706 # Before calling wait_for_model function,
707 # wait until application unit count and scale count are equal.
708 # Because there is a delay before scaling triggers in Juju model.
709 if application_scale == scale:
710 await JujuModelWatcher.wait_for_model(model=model, timeout=total_timeout)
711 self.log.debug(
712 "Application {} is scaled in model {}".format(
713 application_name, model_name
714 )
715 )
716 return
717 await asyncio.sleep(5)
718 raise Exception(
719 "Timeout waiting for application {} in model {} to be scaled".format(
720 application_name, model_name
721 )
722 )
723 finally:
724 if model:
725 await self.disconnect_model(model)
726 await self.disconnect_controller(controller)
727
728 def _get_application_count(self, model: Model, application_name: str) -> int:
729 """Get number of units of the application
730
731 :param: model: Model object
732 :param: application_name: Application name
733
734 :return: int (or None if application doesn't exist)
735 """
736 application = self._get_application(model, application_name)
737 if application is not None:
738 return len(application.units)
739
740 def _get_application(self, model: Model, application_name: str) -> Application:
741 """Get application
742
743 :param: model: Model object
744 :param: application_name: Application name
745
746 :return: juju.application.Application (or None if it doesn't exist)
747 """
748 if model.applications and application_name in model.applications:
749 return model.applications[application_name]
750
751 async def execute_action(
752 self,
753 application_name: str,
754 model_name: str,
755 action_name: str,
756 db_dict: dict = None,
757 progress_timeout: float = None,
758 total_timeout: float = None,
759 **kwargs
760 ):
761 """Execute action
762
763 :param: application_name: Application name
764 :param: model_name: Model name
765 :param: action_name: Name of the action
766 :param: db_dict: Dictionary with data of the DB to write the updates
767 :param: progress_timeout: Maximum time between two updates in the model
768 :param: total_timeout: Timeout for the entity to be active
769
770 :return: (str, str): (output and status)
771 """
772 self.log.debug(
773 "Executing action {} using params {}".format(action_name, kwargs)
774 )
775 # Get controller
776 controller = await self.get_controller()
777
778 # Get model
779 model = await self.get_model(controller, model_name)
780
781 try:
782 # Get application
783 application = self._get_application(
784 model,
785 application_name=application_name,
786 )
787 if application is None:
788 raise JujuApplicationNotFound("Cannot execute action")
789
790 # Get leader unit
791 # Racing condition:
792 # Ocassionally, self._get_leader_unit() will return None
793 # because the leader elected hook has not been triggered yet.
794 # Therefore, we are doing some retries. If it happens again,
795 # re-open bug 1236
796 unit = await self._get_leader_unit(application)
797
798 actions = await application.get_actions()
799
800 if action_name not in actions:
801 raise JujuActionNotFound(
802 "Action {} not in available actions".format(action_name)
803 )
804
805 action = await unit.run_action(action_name, **kwargs)
806
807 self.log.debug(
808 "Wait until action {} is completed in application {} (model={})".format(
809 action_name, application_name, model_name
810 )
811 )
812 await JujuModelWatcher.wait_for(
813 model=model,
814 entity=action,
815 progress_timeout=progress_timeout,
816 total_timeout=total_timeout,
817 db_dict=db_dict,
818 n2vc=self.n2vc,
819 vca_id=self.vca_connection._vca_id,
820 )
821
822 output = await model.get_action_output(action_uuid=action.entity_id)
823 status = await model.get_action_status(uuid_or_prefix=action.entity_id)
824 status = (
825 status[action.entity_id] if action.entity_id in status else "failed"
826 )
827
828 self.log.debug(
829 "Action {} completed with status {} in application {} (model={})".format(
830 action_name, action.status, application_name, model_name
831 )
832 )
833 finally:
834 await self.disconnect_model(model)
835 await self.disconnect_controller(controller)
836
837 return output, status
838
839 async def get_actions(self, application_name: str, model_name: str) -> dict:
840 """Get list of actions
841
842 :param: application_name: Application name
843 :param: model_name: Model name
844
845 :return: Dict with this format
846 {
847 "action_name": "Description of the action",
848 ...
849 }
850 """
851 self.log.debug(
852 "Getting list of actions for application {}".format(application_name)
853 )
854
855 # Get controller
856 controller = await self.get_controller()
857
858 # Get model
859 model = await self.get_model(controller, model_name)
860
861 try:
862 # Get application
863 application = self._get_application(
864 model,
865 application_name=application_name,
866 )
867
868 # Return list of actions
869 return await application.get_actions()
870
871 finally:
872 # Disconnect from model and controller
873 await self.disconnect_model(model)
874 await self.disconnect_controller(controller)
875
876 async def get_metrics(self, model_name: str, application_name: str) -> dict:
877 """Get the metrics collected by the VCA.
878
879 :param model_name The name or unique id of the network service
880 :param application_name The name of the application
881 """
882 if not model_name or not application_name:
883 raise Exception("model_name and application_name must be non-empty strings")
884 metrics = {}
885 controller = await self.get_controller()
886 model = await self.get_model(controller, model_name)
887 try:
888 application = self._get_application(model, application_name)
889 if application is not None:
890 metrics = await application.get_metrics()
891 finally:
892 self.disconnect_model(model)
893 self.disconnect_controller(controller)
894 return metrics
895
896 async def add_relation(
897 self,
898 model_name: str,
899 endpoint_1: str,
900 endpoint_2: str,
901 ):
902 """Add relation
903
904 :param: model_name: Model name
905 :param: endpoint_1 First endpoint name
906 ("app:endpoint" format or directly the saas name)
907 :param: endpoint_2: Second endpoint name (^ same format)
908 """
909
910 self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))
911
912 # Get controller
913 controller = await self.get_controller()
914
915 # Get model
916 model = await self.get_model(controller, model_name)
917
918 # Add relation
919 try:
920 await model.add_relation(endpoint_1, endpoint_2)
921 except JujuAPIError as e:
922 if "not found" in e.message:
923 self.log.warning("Relation not found: {}".format(e.message))
924 return
925 if "already exists" in e.message:
926 self.log.warning("Relation already exists: {}".format(e.message))
927 return
928 # another exception, raise it
929 raise e
930 finally:
931 await self.disconnect_model(model)
932 await self.disconnect_controller(controller)
933
934 async def consume(
935 self,
936 offer_url: str,
937 model_name: str,
938 ):
939 """
940 Adds a remote offer to the model. Relations can be created later using "juju relate".
941
942 :param: offer_url: Offer Url
943 :param: model_name: Model name
944
945 :raises ParseError if there's a problem parsing the offer_url
946 :raises JujuError if remote offer includes and endpoint
947 :raises JujuAPIError if the operation is not successful
948 """
949 controller = await self.get_controller()
950 model = await controller.get_model(model_name)
951
952 try:
953 await model.consume(offer_url)
954 finally:
955 await self.disconnect_model(model)
956 await self.disconnect_controller(controller)
957
958 async def destroy_model(self, model_name: str, total_timeout: float):
959 """
960 Destroy model
961
962 :param: model_name: Model name
963 :param: total_timeout: Timeout
964 """
965
966 controller = await self.get_controller()
967 model = None
968 try:
969 if not await self.model_exists(model_name, controller=controller):
970 return
971
972 model = await self.get_model(controller, model_name)
973 self.log.debug("Destroying model {}".format(model_name))
974 uuid = model.info.uuid
975
976 # Destroy machines that are manually provisioned
977 # and still are in pending state
978 await self._destroy_pending_machines(model, only_manual=True)
979
980 # Disconnect model
981 await self.disconnect_model(model)
982
983 await controller.destroy_model(uuid, force=True, max_wait=0)
984
985 # Wait until model is destroyed
986 self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
987
988 if total_timeout is None:
989 total_timeout = 3600
990 end = time.time() + total_timeout
991 while time.time() < end:
992 models = await controller.list_models()
993 if model_name not in models:
994 self.log.debug(
995 "The model {} ({}) was destroyed".format(model_name, uuid)
996 )
997 return
998 await asyncio.sleep(5)
999 raise Exception(
1000 "Timeout waiting for model {} to be destroyed".format(model_name)
1001 )
1002 except Exception as e:
1003 if model:
1004 await self.disconnect_model(model)
1005 raise e
1006 finally:
1007 await self.disconnect_controller(controller)
1008
1009 async def destroy_application(
1010 self, model_name: str, application_name: str, total_timeout: float
1011 ):
1012 """
1013 Destroy application
1014
1015 :param: model_name: Model name
1016 :param: application_name: Application name
1017 :param: total_timeout: Timeout
1018 """
1019
1020 controller = await self.get_controller()
1021 model = None
1022
1023 try:
1024 model = await self.get_model(controller, model_name)
1025 self.log.debug(
1026 "Destroying application {} in model {}".format(
1027 application_name, model_name
1028 )
1029 )
1030 application = self._get_application(model, application_name)
1031 if application:
1032 await application.destroy()
1033 else:
1034 self.log.warning("Application not found: {}".format(application_name))
1035
1036 self.log.debug(
1037 "Waiting for application {} to be destroyed in model {}...".format(
1038 application_name, model_name
1039 )
1040 )
1041 if total_timeout is None:
1042 total_timeout = 3600
1043 end = time.time() + total_timeout
1044 while time.time() < end:
1045 if not self._get_application(model, application_name):
1046 self.log.debug(
1047 "The application {} was destroyed in model {} ".format(
1048 application_name, model_name
1049 )
1050 )
1051 return
1052 await asyncio.sleep(5)
1053 raise Exception(
1054 "Timeout waiting for application {} to be destroyed in model {}".format(
1055 application_name, model_name
1056 )
1057 )
1058 finally:
1059 if model is not None:
1060 await self.disconnect_model(model)
1061 await self.disconnect_controller(controller)
1062
1063 async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
1064 """
1065 Destroy pending machines in a given model
1066
1067 :param: only_manual: Bool that indicates only manually provisioned
1068 machines should be destroyed (if True), or that
1069 all pending machines should be destroyed
1070 """
1071 status = await model.get_status()
1072 for machine_id in status.machines:
1073 machine_status = status.machines[machine_id]
1074 if machine_status.agent_status.status == "pending":
1075 if only_manual and not machine_status.instance_id.startswith("manual:"):
1076 break
1077 machine = model.machines[machine_id]
1078 await machine.destroy(force=True)
1079
1080 async def configure_application(
1081 self, model_name: str, application_name: str, config: dict = None
1082 ):
1083 """Configure application
1084
1085 :param: model_name: Model name
1086 :param: application_name: Application name
1087 :param: config: Config to apply to the charm
1088 """
1089 self.log.debug("Configuring application {}".format(application_name))
1090
1091 if config:
1092 controller = await self.get_controller()
1093 model = None
1094 try:
1095 model = await self.get_model(controller, model_name)
1096 application = self._get_application(
1097 model,
1098 application_name=application_name,
1099 )
1100 await application.set_config(config)
1101 finally:
1102 if model:
1103 await self.disconnect_model(model)
1104 await self.disconnect_controller(controller)
1105
1106 def handle_exception(self, loop, context):
1107 # All unhandled exceptions by libjuju are handled here.
1108 pass
1109
1110 async def health_check(self, interval: float = 300.0):
1111 """
1112 Health check to make sure controller and controller_model connections are OK
1113
1114 :param: interval: Time in seconds between checks
1115 """
1116 controller = None
1117 while True:
1118 try:
1119 controller = await self.get_controller()
1120 # self.log.debug("VCA is alive")
1121 except Exception as e:
1122 self.log.error("Health check to VCA failed: {}".format(e))
1123 finally:
1124 await self.disconnect_controller(controller)
1125 await asyncio.sleep(interval)
1126
1127 async def list_models(self, contains: str = None) -> [str]:
1128 """List models with certain names
1129
1130 :param: contains: String that is contained in model name
1131
1132 :retur: [models] Returns list of model names
1133 """
1134
1135 controller = await self.get_controller()
1136 try:
1137 models = await controller.list_models()
1138 if contains:
1139 models = [model for model in models if contains in model]
1140 return models
1141 finally:
1142 await self.disconnect_controller(controller)
1143
1144 async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
1145 """List models with certain names
1146
1147 :param: model_name: Model name
1148
1149 :return: Returns list of offers
1150 """
1151
1152 controller = await self.get_controller()
1153 try:
1154 return await controller.list_offers(model_name)
1155 finally:
1156 await self.disconnect_controller(controller)
1157
1158 async def add_k8s(
1159 self,
1160 name: str,
1161 rbac_id: str,
1162 token: str,
1163 client_cert_data: str,
1164 configuration: Configuration,
1165 storage_class: str,
1166 credential_name: str = None,
1167 ):
1168 """
1169 Add a Kubernetes cloud to the controller
1170
1171 Similar to the `juju add-k8s` command in the CLI
1172
1173 :param: name: Name for the K8s cloud
1174 :param: configuration: Kubernetes configuration object
1175 :param: storage_class: Storage Class to use in the cloud
1176 :param: credential_name: Storage Class to use in the cloud
1177 """
1178
1179 if not storage_class:
1180 raise Exception("storage_class must be a non-empty string")
1181 if not name:
1182 raise Exception("name must be a non-empty string")
1183 if not configuration:
1184 raise Exception("configuration must be provided")
1185
1186 endpoint = configuration.host
1187 credential = self.get_k8s_cloud_credential(
1188 configuration,
1189 client_cert_data,
1190 token,
1191 )
1192 credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id
1193 cloud = client.Cloud(
1194 type_="kubernetes",
1195 auth_types=[credential.auth_type],
1196 endpoint=endpoint,
1197 ca_certificates=[client_cert_data],
1198 config={
1199 "operator-storage": storage_class,
1200 "workload-storage": storage_class,
1201 },
1202 )
1203
1204 return await self.add_cloud(
1205 name, cloud, credential, credential_name=credential_name
1206 )
1207
1208 def get_k8s_cloud_credential(
1209 self,
1210 configuration: Configuration,
1211 client_cert_data: str,
1212 token: str = None,
1213 ) -> client.CloudCredential:
1214 attrs = {}
1215 # TODO: Test with AKS
1216 key = None # open(configuration.key_file, "r").read()
1217 username = configuration.username
1218 password = configuration.password
1219
1220 if client_cert_data:
1221 attrs["ClientCertificateData"] = client_cert_data
1222 if key:
1223 attrs["ClientKeyData"] = key
1224 if token:
1225 if username or password:
1226 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1227 attrs["Token"] = token
1228
1229 auth_type = None
1230 if key:
1231 auth_type = "oauth2"
1232 if client_cert_data:
1233 auth_type = "oauth2withcert"
1234 if not token:
1235 raise JujuInvalidK8sConfiguration(
1236 "missing token for auth type {}".format(auth_type)
1237 )
1238 elif username:
1239 if not password:
1240 self.log.debug(
1241 "credential for user {} has empty password".format(username)
1242 )
1243 attrs["username"] = username
1244 attrs["password"] = password
1245 if client_cert_data:
1246 auth_type = "userpasswithcert"
1247 else:
1248 auth_type = "userpass"
1249 elif client_cert_data and token:
1250 auth_type = "certificate"
1251 else:
1252 raise JujuInvalidK8sConfiguration("authentication method not supported")
1253 return client.CloudCredential(auth_type=auth_type, attrs=attrs)
1254
1255 async def add_cloud(
1256 self,
1257 name: str,
1258 cloud: Cloud,
1259 credential: CloudCredential = None,
1260 credential_name: str = None,
1261 ) -> Cloud:
1262 """
1263 Add cloud to the controller
1264
1265 :param: name: Name of the cloud to be added
1266 :param: cloud: Cloud object
1267 :param: credential: CloudCredentials object for the cloud
1268 :param: credential_name: Credential name.
1269 If not defined, cloud of the name will be used.
1270 """
1271 controller = await self.get_controller()
1272 try:
1273 _ = await controller.add_cloud(name, cloud)
1274 if credential:
1275 await controller.add_credential(
1276 credential_name or name, credential=credential, cloud=name
1277 )
1278 # Need to return the object returned by the controller.add_cloud() function
1279 # I'm returning the original value now until this bug is fixed:
1280 # https://github.com/juju/python-libjuju/issues/443
1281 return cloud
1282 finally:
1283 await self.disconnect_controller(controller)
1284
1285 async def remove_cloud(self, name: str):
1286 """
1287 Remove cloud
1288
1289 :param: name: Name of the cloud to be removed
1290 """
1291 controller = await self.get_controller()
1292 try:
1293 await controller.remove_cloud(name)
1294 finally:
1295 await self.disconnect_controller(controller)
1296
1297 @retry(attempts=20, delay=5, fallback=JujuLeaderUnitNotFound())
1298 async def _get_leader_unit(self, application: Application) -> Unit:
1299 unit = None
1300 for u in application.units:
1301 if await u.is_leader_from_status():
1302 unit = u
1303 break
1304 if not unit:
1305 raise Exception()
1306 return unit
1307
1308 async def get_cloud_credentials(self, cloud: Cloud) -> typing.List:
1309 """
1310 Get cloud credentials
1311
1312 :param: cloud: Cloud object. The returned credentials will be from this cloud.
1313
1314 :return: List of credentials object associated to the specified cloud
1315
1316 """
1317 controller = await self.get_controller()
1318 try:
1319 facade = client.CloudFacade.from_connection(controller.connection())
1320 cloud_cred_tag = tag.credential(
1321 cloud.name, self.vca_connection.data.user, cloud.credential_name
1322 )
1323 params = [client.Entity(cloud_cred_tag)]
1324 return (await facade.Credential(params)).results
1325 finally:
1326 await self.disconnect_controller(controller)