Bug 1609 fix
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17 import typing
18
19 import time
20
21 import juju.errors
22 from juju.model import Model
23 from juju.machine import Machine
24 from juju.application import Application
25 from juju.unit import Unit
26 from juju.client._definitions import (
27 FullStatus,
28 QueryApplicationOffersResults,
29 Cloud,
30 CloudCredential,
31 )
32 from juju.controller import Controller
33 from juju.client import client
34 from juju import tag
35
36 from n2vc.juju_watcher import JujuModelWatcher
37 from n2vc.provisioner import AsyncSSHProvisioner
38 from n2vc.n2vc_conn import N2VCConnector
39 from n2vc.exceptions import (
40 JujuMachineNotFound,
41 JujuApplicationNotFound,
42 JujuLeaderUnitNotFound,
43 JujuActionNotFound,
44 JujuControllerFailedConnecting,
45 JujuApplicationExists,
46 JujuInvalidK8sConfiguration,
47 JujuError,
48 )
49 from n2vc.vca.cloud import Cloud as VcaCloud
50 from n2vc.vca.connection import Connection
51 from kubernetes.client.configuration import Configuration
52 from retrying_async import retry
53
54
# Label key under which an RBAC identifier is stored.
# NOTE(review): the consumers of this constant are outside this view --
# presumably labels of Kubernetes RBAC resources; confirm against the callers.
RBAC_LABEL_KEY_NAME = "rbac-id"
56
57
58 class Libjuju:
def __init__(
    self,
    vca_connection: Connection,
    loop: asyncio.AbstractEventLoop = None,
    log: logging.Logger = None,
    n2vc: N2VCConnector = None,
):
    """
    Constructor

    :param: vca_connection: n2vc.vca.connection object
    :param: loop: Asyncio loop (asyncio.get_event_loop() when omitted)
    :param: log: Logger (the "Libjuju" logger when omitted)
    :param: n2vc: N2VC object
    """

    self.log = log or logging.getLogger("Libjuju")
    self.n2vc = n2vc
    self.vca_connection = vca_connection

    self.loop = loop or asyncio.get_event_loop()
    self.loop.set_exception_handler(self.handle_exception)
    # Lock held by add_model() while a model is being created
    self.creating_model = asyncio.Lock(loop=self.loop)

    # Always define the attribute, so that code reading it does not depend
    # on whether a health check was started. The health check only runs for
    # the default VCA connection.
    self.health_check_task = None
    if self.vca_connection.is_default:
        self.health_check_task = self._create_health_check_task()
85
86 def _create_health_check_task(self):
87 return self.loop.create_task(self.health_check())
88
async def get_controller(self, timeout: float = 60.0) -> Controller:
    """
    Connect to the controller described by the VCA connection

    :param: timeout: Time in seconds to wait for controller to connect

    :return: Connected controller
    :raises JujuControllerFailedConnecting: on any failure other than the
        cancellation of the task, which is propagated untouched
    """
    controller = None
    try:
        controller = Controller(loop=self.loop)
        connect_kwargs = {
            "endpoint": self.vca_connection.data.endpoints,
            "username": self.vca_connection.data.user,
            "password": self.vca_connection.data.secret,
            "cacert": self.vca_connection.data.cacert,
        }
        await asyncio.wait_for(
            controller.connect(**connect_kwargs),
            timeout=timeout,
        )
        if self.vca_connection.is_default:
            # Store the endpoints again as soon as the controller reports
            # one that the VCA connection does not know yet
            endpoints = await controller.api_endpoints
            if any(
                endpoint not in self.vca_connection.endpoints
                for endpoint in endpoints
            ):
                await self.vca_connection.update_endpoints(endpoints)
        return controller
    except asyncio.CancelledError:
        raise
    except Exception as e:
        error_msg = "Failed connecting to controller: {}... {}".format(
            self.vca_connection.data.endpoints, e
        )
        self.log.error(error_msg)
        # Do not leave a half-open connection behind
        if controller:
            await self.disconnect_controller(controller)
        raise JujuControllerFailedConnecting(e)
125
async def disconnect(self):
    """
    Disconnect

    Cancels the health check task if there is one. The constructor only
    starts that task for the default VCA connection, so a missing task is
    not treated as an error.
    """
    # getattr: the attribute is not set at all when no task was started
    health_check_task = getattr(self, "health_check_task", None)
    if health_check_task is not None:
        health_check_task.cancel()
    self.log.debug("Libjuju disconnected!")
131
async def disconnect_model(self, model: Model):
    """
    Close the connection of a model

    :param: model: Model whose connection will be closed
    """
    disconnection = model.disconnect()
    await disconnection
139
async def disconnect_controller(self, controller: Controller):
    """
    Close the connection of a controller

    Nothing is done when the given controller is falsy (e.g. None).

    :param: controller: Controller whose connection will be closed
    """
    if not controller:
        return
    await controller.disconnect()
148
@retry(attempts=3, delay=5, timeout=None)
async def add_model(self, model_name: str, cloud: VcaCloud):
    """
    Create a model, unless a model with that name already exists

    :param: model_name: Model name
    :param: cloud: Cloud object (its name and credential_name are used)
    """

    controller = await self.get_controller()
    new_model = None
    try:
        # Wait for any other worker that is creating a model right now
        while self.creating_model.locked():
            await asyncio.sleep(0.1)

        async with self.creating_model:
            already_there = await self.model_exists(
                model_name, controller=controller
            )
            if already_there:
                return
            self.log.debug("Creating model {}".format(model_name))
            new_model = await controller.add_model(
                model_name,
                config=self.vca_connection.data.model_config,
                cloud_name=cloud.name,
                credential_name=cloud.credential_name,
            )
    except juju.errors.JujuAPIError as api_error:
        # An "already exists" answer from the API is tolerated
        if "already exists" not in api_error.message:
            raise api_error
    finally:
        if new_model:
            await self.disconnect_model(new_model)
        await self.disconnect_controller(controller)
186
async def get_executed_actions(self, model_name: str) -> list:
    """
    Get the history of executed actions of a model.

    :param: model_name: Model name, str.
    :return: List of dicts, one per executed action, holding the keys "id",
        "action" and "status" plus every key of the action output.
    :raises JujuError: if anything fails while collecting the actions.
    """
    model = None
    history = []
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        # Merge the actions of every application, keyed by action name
        action_names = {}
        for application in model.applications:
            action_names.update(await self.get_actions(application, model_name))
        # Collect status and output of every execution of each action
        for action_name in action_names:
            statuses = await model.get_action_status(name=action_name)
            for action_id, action_status in statuses.items():
                entry = {
                    "id": action_id,
                    "action": action_name,
                    "status": action_status,
                }
                output = await model.get_action_output(entry["id"])
                # Keys of the output overwrite the base keys set above
                entry.update(output.items())
                history.append(entry)
    except Exception as e:
        error_msg = "Error in getting executed actions for model: {}. Error: {}".format(
            model_name, str(e)
        )
        raise JujuError(error_msg)
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return history
231
async def get_application_configs(
    self, model_name: str, application_name: str
) -> dict:
    """
    Get the configuration of an application.

    :param: model_name: Model name, str.
    :param: application_name: Application name, str.

    :return: The value returned by the application's get_config()
    :raises JujuError: if anything fails while reading the configuration
    """
    model = None
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(
            model, application_name=application_name
        )
        configs = await application.get_config()
    except Exception as e:
        error_msg = (
            "Error in getting configs for application: {} in model: {}. Error: {}"
        ).format(application_name, model_name, str(e))
        raise JujuError(error_msg)
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return configs
263
@retry(attempts=3, delay=5)
async def get_model(self, controller: Controller, model_name: str) -> Model:
    """
    Get a model from the controller

    :param: controller: Controller
    :param: model_name: Model name

    :return: Model: Juju model object returned by the controller
    """
    model = await controller.get_model(model_name)
    return model
275
async def model_exists(
    self, model_name: str, controller: Controller = None
) -> bool:
    """
    Check if a model exists

    :param: model_name: Model name
    :param: controller: Controller to ask. When omitted, a connection is
        opened for this call and closed again before returning.

    :return bool
    """
    # Only a controller opened here is closed here
    owns_controller = not controller
    if owns_controller:
        controller = await self.get_controller()

    try:
        known_models = await controller.list_models()
        return model_name in known_models
    finally:
        if owns_controller:
            await self.disconnect_controller(controller)
300
async def models_exist(self, model_names: [str]) -> (bool, list):
    """
    Check if models exist

    :param: model_names: List of strings with model names

    :return (bool, list[str]): (True if all models exist, List of model names
        that don't exist, without duplicates and in no particular order)
    :raises Exception: if model_names is empty or None
    """
    if not model_names:
        error_msg = "model_names must be a non-empty array. Given value: {}".format(
            model_names
        )
        raise Exception(error_msg)
    models = await self.list_models()
    found = set(models).intersection(model_names)
    missing = list(set(model_names) - found)
    return (not missing, missing)
324
async def get_model_status(self, model_name: str) -> FullStatus:
    """
    Get model status

    :param: model_name: Model name

    :return: Full status object
    """
    controller = await self.get_controller()
    model = None
    try:
        # The model is requested inside the try block so that the controller
        # is disconnected even when the model cannot be retrieved
        model = await self.get_model(controller, model_name)
        return await model.get_status()
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
340
async def create_machine(
    self,
    model_name: str,
    machine_id: str = None,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    series: str = "bionic",
    wait: bool = True,
) -> (Machine, bool):
    """
    Create machine

    When machine_id is given, the existing machine with that id is looked up
    instead of creating a new one.

    :param: model_name: Model name
    :param: machine_id: Machine id
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: series: Series of the machine (xenial, bionic, focal, ...)
    :param: wait: Wait until machine is ready

    :return: (juju.machine.Machine, bool): Machine object and a boolean saying
             if the machine is new or it already existed
    :raises JujuMachineNotFound: if machine_id is given and is not in the model
    """
    new = False
    machine = None

    self.log.debug(
        "Creating machine (id={}) in model: {}".format(machine_id, model_name)
    )

    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model. Done inside the try block so that the controller is
        # disconnected even when the model cannot be retrieved.
        model = await self.get_model(controller, model_name)
        if machine_id is not None:
            self.log.debug(
                "Searching machine (id={}) in model {}".format(
                    machine_id, model_name
                )
            )

            # get_machines() is only used for the membership test; the
            # entity itself is read from model.machines, which is what
            # provision_machine() and _get_machine_info() do as well.
            # NOTE(review): python-libjuju documents get_machines() as
            # returning a list of machine ids, which cannot be indexed by id.
            machine_ids = await model.get_machines()
            if machine_id not in machine_ids:
                raise JujuMachineNotFound("Machine {} not found".format(machine_id))
            self.log.debug(
                "Machine (id={}) found in model {}".format(machine_id, model_name)
            )
            machine = model.machines[machine_id]

        if machine is None:
            self.log.debug("Creating a new machine in model {}".format(model_name))

            # Create machine
            machine = await model.add_machine(
                spec=None, constraints=None, disks=None, series=series
            )
            new = True

        if wait:
            # Wait until the machine is ready
            self.log.debug(
                "Wait until machine {} is ready in model {}".format(
                    machine.entity_id, model_name
                )
            )
            await JujuModelWatcher.wait_for(
                model=model,
                entity=machine,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout,
                db_dict=db_dict,
                n2vc=self.n2vc,
                vca_id=self.vca_connection._vca_id,
            )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    self.log.debug(
        "Machine {} ready at {} in model {}".format(
            machine.entity_id, machine.dns_name, model_name
        )
    )
    return machine, new
432
async def provision_machine(
    self,
    model_name: str,
    hostname: str,
    username: str,
    private_key_path: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
) -> str:
    """
    Manually provisioning of a machine

    :param: model_name: Model name
    :param: hostname: IP to access the machine
    :param: username: Username to login to the machine
    :param: private_key_path: Local path for the private key
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active

    :return: (Entity): Machine id
    :raises ValueError: if the controller reports an error adding the machine
    :raises JujuMachineNotFound: if the machine does not show up in the model
    """
    self.log.debug(
        "Provisioning machine. model: {}, hostname: {}, username: {}".format(
            model_name, hostname, username
        )
    )

    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model. Done inside the try block so that the controller is
        # disconnected even when the model cannot be retrieved.
        model = await self.get_model(controller, model_name)

        # Get provisioner
        provisioner = AsyncSSHProvisioner(
            host=hostname,
            user=username,
            private_key_path=private_key_path,
            log=self.log,
        )

        # Provision machine
        params = await provisioner.provision_machine()

        params.jobs = ["JobHostUnits"]

        self.log.debug("Adding machine to model")
        connection = model.connection()
        client_facade = client.ClientFacade.from_connection(connection)

        results = await client_facade.AddMachines(params=[params])
        error = results.machines[0].error

        if error:
            msg = "Error adding machine: {}".format(error.message)
            self.log.error(msg=msg)
            raise ValueError(msg)

        machine_id = results.machines[0].machine

        self.log.debug("Installing Juju agent into machine {}".format(machine_id))
        # The agent installation is scheduled and not awaited here
        asyncio.ensure_future(
            provisioner.install_agent(
                connection=connection,
                nonce=params.nonce,
                machine_id=machine_id,
                proxy=self.vca_connection.data.api_proxy,
                series=params.series,
            )
        )

        # Poll up to 10 times, 2 seconds apart, until the machine is listed
        machine = None
        for _ in range(10):
            machine_list = await model.get_machines()
            if machine_id in machine_list:
                self.log.debug("Machine {} found in model!".format(machine_id))
                machine = model.machines.get(machine_id)
                break
            await asyncio.sleep(2)

        if machine is None:
            msg = "Machine {} not found in model".format(machine_id)
            self.log.error(msg=msg)
            raise JujuMachineNotFound(msg)

        self.log.debug(
            "Wait until machine {} is ready in model {}".format(
                machine.entity_id, model_name
            )
        )
        await JujuModelWatcher.wait_for(
            model=model,
            entity=machine,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    self.log.debug(
        "Machine provisioned {} in model {}".format(machine_id, model_name)
    )

    return machine_id
546
async def deploy(
    self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
):
    """
    Deploy bundle or charm: Similar to the juju CLI command `juju deploy`

    :param: uri: Path or Charm Store uri in which the charm or bundle can be found
    :param: model_name: Model name
    :param: wait: Indicates whether to wait or not until all applications are active
    :param: timeout: Time in seconds to wait until all applications are active
    """
    controller = await self.get_controller()
    model = None
    try:
        # The model is requested inside the try block so that the controller
        # is disconnected even when the model cannot be retrieved
        model = await self.get_model(controller, model_name)
        await model.deploy(uri)
        if wait:
            await JujuModelWatcher.wait_for_model(model, timeout=timeout)
            self.log.debug("All units active in model {}".format(model_name))
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
568
async def add_unit(
    self,
    application_name: str,
    model_name: str,
    machine_id: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
):
    """Add a unit of an application to an existing machine

    :param: application_name: Application name
    :param: model_name: Model name
    :param: machine_id Machine id
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active

    :return: None
    :raises JujuApplicationNotFound: if the application is not in the model
    """

    model = None
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)
        if application is None:
            raise JujuApplicationNotFound(
                "Application {} not exists".format(application_name)
            )

        # Called only for its check: it raises when the machine id is not
        # in the model
        self._get_machine_info(model, machine_id)

        self.log.debug(
            "Adding unit (machine {}) to application {} in model ~{}".format(
                machine_id, application_name, model_name
            )
        )

        await application.add_unit(to=machine_id)

        watcher_kwargs = {
            "model": model,
            "entity": application,
            "progress_timeout": progress_timeout,
            "total_timeout": total_timeout,
            "db_dict": db_dict,
            "n2vc": self.n2vc,
            "vca_id": self.vca_connection._vca_id,
        }
        await JujuModelWatcher.wait_for(**watcher_kwargs)
        self.log.debug(
            "Unit is added to application {} in model {}".format(
                application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
632
async def destroy_unit(
    self,
    application_name: str,
    model_name: str,
    machine_id: str,
    total_timeout: float = None,
):
    """Destroy unit

    :param: application_name: Application name
    :param: model_name: Model name
    :param: machine_id Machine id
    :param: total_timeout: Seconds to wait for the unit to disappear
                           (3600 when None)

    :return: None
    :raises JujuApplicationNotFound: if the application is not in the model
    :raises JujuError: if the application has no unit on the given machine
    """

    model = None
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)

        if application is None:
            raise JujuApplicationNotFound(
                "Application not found: {} (model={})".format(
                    application_name, model_name
                )
            )

        unit = self._get_unit(application, machine_id)
        if not unit:
            raise JujuError(
                "A unit with machine id {} not in available units".format(
                    machine_id
                )
            )

        unit_name = unit.name

        self.log.debug(
            "Destroying unit {} from application {} in model {}".format(
                unit_name, application_name, model_name
            )
        )
        await application.destroy_unit(unit_name)

        self.log.debug(
            "Waiting for unit {} to be destroyed in application {} (model={})...".format(
                unit_name, application_name, model_name
            )
        )

        # TODO: Add functionality in the Juju watcher to replace this kind of blocks
        if total_timeout is None:
            total_timeout = 3600
        end = time.time() + total_timeout
        while time.time() < end:
            if not self._get_unit(application, machine_id):
                self.log.debug(
                    "The unit {} was destroyed in application {} (model={}) ".format(
                        unit_name, application_name, model_name
                    )
                )
                return
            await asyncio.sleep(5)
        # The deadline expired while the unit was still listed: report the
        # timeout instead of claiming that the unit is destroyed
        self.log.warning(
            "Timeout waiting for unit {} to be destroyed in application {} (model={})".format(
                unit_name, application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
709
async def deploy_charm(
    self,
    application_name: str,
    path: str,
    model_name: str,
    machine_id: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    config: dict = None,
    series: str = None,
    num_units: int = 1,
):
    """Deploy charm

    The application is deployed with a single unit. When num_units is
    greater than one, each additional unit is added on a machine created
    for it.

    :param: application_name: Application name
    :param: path: Local path to the charm
    :param: model_name: Model name
    :param: machine_id ID of the machine
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: config: Config for the charm
    :param: series: Series of the charm (replaced by the series of the
                    machine when machine_id is given)
    :param: num_units: Number of units

    :return: (juju.application.Application): Juju application
    :raises JujuApplicationExists: if the application is already in the model
    """
    self.log.debug(
        "Deploying charm {} to machine {} in model ~{}".format(
            application_name, machine_id, model_name
        )
    )
    self.log.debug("charm: {}".format(path))

    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model. Done inside the try block so that the controller is
        # disconnected even when the model cannot be retrieved.
        model = await self.get_model(controller, model_name)

        if application_name in model.applications:
            raise JujuApplicationExists(
                "Application {} exists".format(application_name)
            )

        if machine_id is not None:
            machine, series = self._get_machine_info(model, machine_id)

        application = await model.deploy(
            entity_url=path,
            application_name=application_name,
            channel="stable",
            num_units=1,
            series=series,
            to=machine_id,
            config=config,
        )

        self.log.debug(
            "Wait until application {} is ready in model {}".format(
                application_name, model_name
            )
        )
        if num_units > 1:
            for _ in range(num_units - 1):
                m, _ = await self.create_machine(model_name, wait=False)
                await application.add_unit(to=m.entity_id)

        await JujuModelWatcher.wait_for(
            model=model,
            entity=application,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )
        self.log.debug(
            "Application {} is ready in model {}".format(
                application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return application
800
async def scale_application(
    self,
    model_name: str,
    application_name: str,
    scale: int = 1,
    total_timeout: float = None,
):
    """
    Scale application (K8s)

    :param: model_name: Model name
    :param: application_name: Application name
    :param: scale: Scale to which to set this application
    :param: total_timeout: Timeout for the entity to be active (1800 seconds
                           when None)

    :raises JujuApplicationNotFound: if the application is not in the model
    :raises Exception: if the unit count does not reach the scale in time
    """

    model = None
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)

        self.log.debug(
            "Scaling application {} in model {}".format(
                application_name, model_name
            )
        )
        application = self._get_application(model, application_name)
        if application is None:
            raise JujuApplicationNotFound("Cannot scale application")
        await application.scale(scale=scale)

        self.log.debug(
            "Waiting for application {} to be scaled in model {}...".format(
                application_name, model_name
            )
        )
        if total_timeout is None:
            total_timeout = 1800
        deadline = time.time() + total_timeout
        while time.time() < deadline:
            # Scaling is not triggered immediately in the Juju model, so the
            # model is only watched once the unit count matches the scale
            current_units = self._get_application_count(model, application_name)
            if current_units != scale:
                await asyncio.sleep(5)
                continue
            await JujuModelWatcher.wait_for_model(
                model=model, timeout=total_timeout
            )
            self.log.debug(
                "Application {} is scaled in model {}".format(
                    application_name, model_name
                )
            )
            return
        raise Exception(
            "Timeout waiting for application {} in model {} to be scaled".format(
                application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
865
def _get_application_count(self, model: Model, application_name: str) -> int:
    """Count the units of an application

    :param: model: Model object
    :param: application_name: Application name

    :return: int (or None if application doesn't exist)
    """
    application = self._get_application(model, application_name)
    if application is None:
        return None
    return len(application.units)
877
def _get_application(self, model: Model, application_name: str) -> Application:
    """Look up an application of a model by name

    :param: model: Model object
    :param: application_name: Application name

    :return: juju.application.Application (or None if it doesn't exist)
    """
    applications = model.applications
    if not applications or application_name not in applications:
        return None
    return applications[application_name]
888
def _get_unit(self, application: Application, machine_id: str) -> Unit:
    """Find the unit of an application that runs on a given machine

    :param: application: Application object
    :param: machine_id: Machine id

    :return: First unit whose machine_id matches (or None if there is none)
    """
    matching_units = (u for u in application.units if u.machine_id == machine_id)
    return next(matching_units, None)
903
904 def _get_machine_info(
905 self,
906 model,
907 machine_id: str,
908 ) -> (str, str):
909 """Get machine info
910
911 :param: model: Model object
912 :param: machine_id: Machine id
913
914 :return: (str, str): (machine, series)
915 """
916 if machine_id not in model.machines:
917 msg = "Machine {} not found in model".format(machine_id)
918 self.log.error(msg=msg)
919 raise JujuMachineNotFound(msg)
920 machine = model.machines[machine_id]
921 return machine, machine.series
922
async def execute_action(
    self,
    application_name: str,
    model_name: str,
    action_name: str,
    db_dict: dict = None,
    machine_id: str = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    **kwargs,
):
    """Execute action

    The action runs on the unit placed on machine_id or, when machine_id is
    None, on the leader unit of the application.

    :param: application_name: Application name
    :param: model_name: Model name
    :param: action_name: Name of the action
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: machine_id Machine id
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: kwargs: Parameters passed to the action

    :return: (str, str): (output and status)
    :raises JujuApplicationNotFound: if the application is not in the model
    :raises JujuError: if the application has no unit on the given machine
    :raises JujuActionNotFound: if the application does not offer the action
    """
    self.log.debug(
        "Executing action {} using params {}".format(action_name, kwargs)
    )
    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model. Done inside the try block so that the controller is
        # disconnected even when the model cannot be retrieved.
        model = await self.get_model(controller, model_name)

        # Get application
        application = self._get_application(
            model,
            application_name=application_name,
        )
        if application is None:
            raise JujuApplicationNotFound("Cannot execute action")
        # Racing condition:
        #   Occasionally, self._get_leader_unit() will return None
        #   because the leader elected hook has not been triggered yet.
        #   Therefore, we are doing some retries. If it happens again,
        #   re-open bug 1236
        if machine_id is None:
            unit = await self._get_leader_unit(application)
            self.log.debug(
                "Action {} is being executed on the leader unit {}".format(
                    action_name, unit.name
                )
            )
        else:
            unit = self._get_unit(application, machine_id)
            if not unit:
                raise JujuError(
                    "A unit with machine id {} not in available units".format(
                        machine_id
                    )
                )
            self.log.debug(
                "Action {} is being executed on {} unit".format(
                    action_name, unit.name
                )
            )

        actions = await application.get_actions()

        if action_name not in actions:
            raise JujuActionNotFound(
                "Action {} not in available actions".format(action_name)
            )

        action = await unit.run_action(action_name, **kwargs)

        self.log.debug(
            "Wait until action {} is completed in application {} (model={})".format(
                action_name, application_name, model_name
            )
        )
        await JujuModelWatcher.wait_for(
            model=model,
            entity=action,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )

        output = await model.get_action_output(action_uuid=action.entity_id)
        status = await model.get_action_status(uuid_or_prefix=action.entity_id)
        # An action missing from the status answer is reported as failed
        status = (
            status[action.entity_id] if action.entity_id in status else "failed"
        )

        self.log.debug(
            "Action {} completed with status {} in application {} (model={})".format(
                action_name, action.status, application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return output, status
1029
async def get_actions(self, application_name: str, model_name: str) -> dict:
    """Get list of actions

    :param: application_name: Application name
    :param: model_name: Model name

    :return: Dict with this format
        {
            "action_name": "Description of the action",
            ...
        }
    :raises JujuApplicationNotFound: if the application is not in the model
    """
    self.log.debug(
        "Getting list of actions for application {}".format(application_name)
    )

    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model. Done inside the try block so that the controller is
        # disconnected even when the model cannot be retrieved.
        model = await self.get_model(controller, model_name)

        # Get application
        application = self._get_application(
            model,
            application_name=application_name,
        )
        if application is None:
            # Explicit error instead of an AttributeError on None
            raise JujuApplicationNotFound(
                "Application not found: {} (model={})".format(
                    application_name, model_name
                )
            )

        # Return list of actions
        return await application.get_actions()

    finally:
        # Disconnect from model and controller
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1066
async def get_metrics(self, model_name: str, application_name: str) -> dict:
    """Get the metrics collected by the VCA.

    :param model_name The name or unique id of the network service
    :param application_name The name of the application

    :return: Metrics returned by the application, or an empty dict when the
             application is not in the model
    :raises Exception: if model_name or application_name is empty
    """
    if not model_name or not application_name:
        raise Exception("model_name and application_name must be non-empty strings")
    metrics = {}
    controller = await self.get_controller()
    model = None
    try:
        # The model is requested inside the try block so that the controller
        # is disconnected even when the model cannot be retrieved
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)
        if application is not None:
            metrics = await application.get_metrics()
    finally:
        # Both disconnections are coroutines: they only run when awaited
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return metrics
1086
async def add_relation(
    self,
    model_name: str,
    endpoint_1: str,
    endpoint_2: str,
):
    """Add relation

    API errors whose message contains "not found" or "already exists" are
    logged as warnings and not raised.

    :param: model_name: Model name
    :param: endpoint_1 First endpoint name
                       ("app:endpoint" format or directly the saas name)
    :param: endpoint_2: Second endpoint name (^ same format)
    """

    self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))

    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model. Done inside the outer try block so that the controller
        # is disconnected even when the model cannot be retrieved, but
        # outside the inner one so that its API errors are never taken for
        # relation errors.
        model = await self.get_model(controller, model_name)

        # Add relation
        try:
            await model.add_relation(endpoint_1, endpoint_2)
        except juju.errors.JujuAPIError as e:
            if "not found" in e.message:
                self.log.warning("Relation not found: {}".format(e.message))
                return
            if "already exists" in e.message:
                self.log.warning("Relation already exists: {}".format(e.message))
                return
            # another exception, raise it
            raise e
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1124
async def consume(
    self,
    offer_url: str,
    model_name: str,
):
    """
    Adds a remote offer to the model. Relations can be created later using "juju relate".

    :param: offer_url: Offer Url
    :param: model_name: Model name

    :raises ParseError if there's a problem parsing the offer_url
    :raises JujuError if remote offer includes an endpoint
    :raises JujuAPIError if the operation is not successful
    """
    controller = await self.get_controller()
    model = None

    try:
        # Obtain the model inside the try so that the controller connection
        # is released even when the model cannot be obtained.
        model = await controller.get_model(model_name)
        await model.consume(offer_url)
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1148
async def destroy_model(self, model_name: str, total_timeout: float = 1800):
    """
    Destroy model

    Does nothing if the model does not exist in the controller. Otherwise
    the pending, manually provisioned machines are destroyed first and then
    the model itself is removed.

    :param: model_name: Model name
    :param: total_timeout: Timeout (passed as ``timeout`` to ``_destroy_model``)
    """

    controller = await self.get_controller()
    model = None
    try:
        if not await self.model_exists(model_name, controller=controller):
            return

        self.log.debug("Destroying model {}".format(model_name))

        model = await self.get_model(controller, model_name)
        # Destroy machines that are manually provisioned
        # and still are in pending state
        await self._destroy_pending_machines(model, only_manual=True)
        await self.disconnect_model(model)
        # The model connection is already closed at this point: drop the
        # reference so the finally clause does not disconnect it again.
        model = None

        await self._destroy_model(
            model_name,
            controller,
            timeout=total_timeout,
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1180
1181 async def _destroy_model(
1182 self, model_name: str, controller: Controller, timeout: float = 1800
1183 ):
1184 """
1185 Destroy model from controller
1186
1187 :param: model: Model name to be removed
1188 :param: controller: Controller object
1189 :param: timeout: Timeout in seconds
1190 """
1191
1192 async def _destroy_model_loop(model_name: str, controller: Controller):
1193 while await self.model_exists(model_name, controller=controller):
1194 await controller.destroy_model(
1195 model_name, destroy_storage=True, force=True, max_wait=0
1196 )
1197 await asyncio.sleep(5)
1198
1199 try:
1200 await asyncio.wait_for(
1201 _destroy_model_loop(model_name, controller), timeout=timeout
1202 )
1203 except asyncio.TimeoutError:
1204 raise Exception(
1205 "Timeout waiting for model {} to be destroyed".format(model_name)
1206 )
1207
async def destroy_application(
    self, model_name: str, application_name: str, total_timeout: float
):
    """
    Destroy application

    Requests the destruction of the application (a warning is logged if it
    is not found) and then polls every 5 seconds until the application is
    no longer found in the model.

    :param: model_name: Model name
    :param: application_name: Application name
    :param: total_timeout: Timeout in seconds (3600 is used when None)

    :raises Exception if the application is still found when the timeout expires
    """

    controller = await self.get_controller()
    model = None

    try:
        model = await self.get_model(controller, model_name)
        self.log.debug(
            "Destroying application {} in model {}".format(
                application_name, model_name
            )
        )
        target = self._get_application(model, application_name)
        if not target:
            self.log.warning("Application not found: {}".format(application_name))
        else:
            await target.destroy()

        self.log.debug(
            "Waiting for application {} to be destroyed in model {}...".format(
                application_name, model_name
            )
        )
        wait_seconds = 3600 if total_timeout is None else total_timeout
        deadline = time.time() + wait_seconds
        while time.time() < deadline:
            remaining = self._get_application(model, application_name)
            if remaining:
                await asyncio.sleep(5)
                continue
            self.log.debug(
                "The application {} was destroyed in model {} ".format(
                    application_name, model_name
                )
            )
            return
        raise Exception(
            "Timeout waiting for application {} to be destroyed in model {}".format(
                application_name, model_name
            )
        )
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1261
1262 async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
1263 """
1264 Destroy pending machines in a given model
1265
1266 :param: only_manual: Bool that indicates only manually provisioned
1267 machines should be destroyed (if True), or that
1268 all pending machines should be destroyed
1269 """
1270 status = await model.get_status()
1271 for machine_id in status.machines:
1272 machine_status = status.machines[machine_id]
1273 if machine_status.agent_status.status == "pending":
1274 if only_manual and not machine_status.instance_id.startswith("manual:"):
1275 break
1276 machine = model.machines[machine_id]
1277 await machine.destroy(force=True)
1278
async def configure_application(
    self, model_name: str, application_name: str, config: dict = None
):
    """Configure application

    Nothing is done (no connection is opened) when config is empty or None.

    :param: model_name: Model name
    :param: application_name: Application name
    :param: config: Config to apply to the charm

    :raises JujuApplicationNotFound if the application is not in the model
    """
    self.log.debug("Configuring application {}".format(application_name))

    if not config:
        return

    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(
            model,
            application_name=application_name,
        )
        if not application:
            # _get_application may return None; fail with an explicit error
            # instead of an AttributeError on None.
            raise JujuApplicationNotFound(
                "Cannot configure application {} in model {}".format(
                    application_name, model_name
                )
            )
        await application.set_config(config)
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1304
def handle_exception(self, loop, context):
    """Deliberately ignore an unhandled exception.

    All unhandled exceptions by libjuju are handled here: both arguments
    are discarded and nothing is logged or re-raised.

    NOTE(review): the (loop, context) signature matches an asyncio loop
    exception handler; where it is installed is not visible here.

    :param: loop: Unused
    :param: context: Unused
    """
    return None
1308
async def health_check(self, interval: float = 300.0):
    """
    Health check to make sure controller and controller_model connections are OK

    Runs forever: each iteration obtains a controller, disconnects it again
    and sleeps. A failed check is logged as an error and does not stop the loop.

    :param: interval: Time in seconds between checks
    """
    while True:
        # Reset on every iteration, so that a failed check never disconnects
        # the (already disconnected) controller of a previous iteration.
        controller = None
        try:
            controller = await self.get_controller()
        except Exception as e:
            self.log.error("Health check to VCA failed: {}".format(e))
        finally:
            if controller is not None:
                await self.disconnect_controller(controller)
        await asyncio.sleep(interval)
1325
async def list_models(self, contains: str = None) -> [str]:
    """List models, optionally filtered by name

    :param: contains: String that must be contained in the model name.
                      When empty or None, every model is returned.

    :return: [models] Returns list of model names
    """

    controller = await self.get_controller()
    try:
        all_names = await controller.list_models()
        if not contains:
            return all_names
        return [name for name in all_names if contains in name]
    finally:
        await self.disconnect_controller(controller)
1342
async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
    """List the offers of a model

    :param: model_name: Model name

    :return: Returns list of offers
    """

    controller = await self.get_controller()
    try:
        offers = await controller.list_offers(model_name)
    finally:
        await self.disconnect_controller(controller)
    return offers
1356
async def add_k8s(
    self,
    name: str,
    rbac_id: str,
    token: str,
    client_cert_data: str,
    configuration: Configuration,
    storage_class: str,
    credential_name: str = None,
):
    """
    Add a Kubernetes cloud to the controller

    Similar to the `juju add-k8s` command in the CLI

    :param: name: Name for the K8s cloud
    :param: rbac_id: Value stored in the credential under the RBAC label key
    :param: token: Token passed on to get_k8s_cloud_credential()
    :param: client_cert_data: Certificate data, used for the credential and
                              as the cloud CA certificate
    :param: configuration: Kubernetes configuration object
    :param: storage_class: Storage Class to use in the cloud
                           (both operator and workload storage)
    :param: credential_name: Credential name, passed on to add_cloud()

    :return: Result of add_cloud()

    :raises Exception if storage_class, name or configuration is empty
    """

    # Checked in this order; the first empty value decides the error raised.
    required = (
        (storage_class, "storage_class must be a non-empty string"),
        (name, "name must be a non-empty string"),
        (configuration, "configuration must be provided"),
    )
    for value, complaint in required:
        if not value:
            raise Exception(complaint)

    k8s_endpoint = configuration.host
    k8s_credential = self.get_k8s_cloud_credential(
        configuration,
        client_cert_data,
        token,
    )
    k8s_credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id

    storage_config = {
        "operator-storage": storage_class,
        "workload-storage": storage_class,
    }
    k8s_cloud = client.Cloud(
        type_="kubernetes",
        auth_types=[k8s_credential.auth_type],
        endpoint=k8s_endpoint,
        ca_certificates=[client_cert_data],
        config=storage_config,
    )

    return await self.add_cloud(
        name, k8s_cloud, k8s_credential, credential_name=credential_name
    )
1406
def get_k8s_cloud_credential(
    self,
    configuration: Configuration,
    client_cert_data: str,
    token: str = None,
) -> client.CloudCredential:
    """
    Build the cloud credential for a Kubernetes cloud

    The auth type is chosen as follows:
      - a username in the configuration: "userpass", or "userpasswithcert"
        when client_cert_data is also given
      - otherwise, client_cert_data together with a token: "certificate"
    The key-based types ("oauth2", "oauth2withcert") cannot be selected at
    the moment, because the key is always None.

    :param: configuration: Kubernetes configuration object
                           (username and password are read from it)
    :param: client_cert_data: Client certificate data
    :param: token: Token

    :return: CloudCredential with the selected auth type and attributes

    :raises JujuInvalidK8sConfiguration if a token is combined with a
            username/password, or if no supported combination is provided
    """
    # TODO: Test with AKS
    key = None  # open(configuration.key_file, "r").read()
    username = configuration.username
    password = configuration.password

    # A token is mutually exclusive with user/password credentials
    if token and (username or password):
        raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")

    attrs = {}
    if client_cert_data:
        attrs["ClientCertificateData"] = client_cert_data
    if key:
        attrs["ClientKeyData"] = key
    if token:
        attrs["Token"] = token

    if key:
        auth_type = "oauth2withcert" if client_cert_data else "oauth2"
        if not token:
            raise JujuInvalidK8sConfiguration(
                "missing token for auth type {}".format(auth_type)
            )
    elif username:
        if not password:
            self.log.debug(
                "credential for user {} has empty password".format(username)
            )
        attrs["username"] = username
        attrs["password"] = password
        auth_type = "userpasswithcert" if client_cert_data else "userpass"
    elif client_cert_data and token:
        auth_type = "certificate"
    else:
        raise JujuInvalidK8sConfiguration("authentication method not supported")

    return client.CloudCredential(auth_type=auth_type, attrs=attrs)
1453
async def add_cloud(
    self,
    name: str,
    cloud: Cloud,
    credential: CloudCredential = None,
    credential_name: str = None,
) -> Cloud:
    """
    Add cloud to the controller

    :param: name: Name of the cloud to be added
    :param: cloud: Cloud object
    :param: credential: CloudCredentials object for the cloud. When given,
                        it is added to the controller for this cloud.
    :param: credential_name: Credential name.
                             If not defined, the name of the cloud is used.

    :return: The cloud object received as parameter
    """
    controller = await self.get_controller()
    try:
        await controller.add_cloud(name, cloud)
        if credential:
            effective_name = credential_name or name
            await controller.add_credential(
                effective_name, credential=credential, cloud=name
            )
        # The object returned by controller.add_cloud() should be returned
        # here. The original value is returned until this bug is fixed:
        # https://github.com/juju/python-libjuju/issues/443
        return cloud
    finally:
        await self.disconnect_controller(controller)
1483
async def remove_cloud(self, name: str):
    """
    Remove cloud

    If the controller reports, as its only error, that the cloud was not
    found, a warning is logged instead of raising.

    :param: name: Name of the cloud to be removed

    :raises JujuError for any other error reported while removing the cloud
    """
    controller = await self.get_controller()
    try:
        await controller.remove_cloud(name)
    except juju.errors.JujuError as e:
        only_not_found = (
            len(e.errors) == 1 and f'cloud "{name}" not found' == e.errors[0]
        )
        if not only_not_found:
            raise e
        self.log.warning(f"Cloud {name} not found, so it could not be deleted.")
    finally:
        await self.disconnect_controller(controller)
1500
@retry(attempts=20, delay=5, fallback=JujuLeaderUnitNotFound())
async def _get_leader_unit(self, application: Application) -> Unit:
    """
    Get the leader unit of an application

    NOTE(review): the retry decorator presumably re-runs this coroutine
    when it raises and resorts to JujuLeaderUnitNotFound once the attempts
    are exhausted - confirm against retrying_async.

    :param: application: Application whose units are inspected

    :return: First unit that reports itself as leader

    :raises Exception if no unit reports itself as leader
    """
    leader = None
    for candidate in application.units:
        is_leader = await candidate.is_leader_from_status()
        if is_leader:
            leader = candidate
            break
    if leader:
        return leader
    raise Exception()
1511
async def get_cloud_credentials(self, cloud: Cloud) -> typing.List:
    """
    Get cloud credentials

    The credential is looked up with a tag built from the cloud name, the
    user of the VCA connection and the credential name of the cloud.

    :param: cloud: Cloud object. The returned credentials will be from this cloud.
                   Its name and credential_name attributes are read.

    :return: List of credentials object associated to the specified cloud

    """
    controller = await self.get_controller()
    try:
        cloud_facade = client.CloudFacade.from_connection(controller.connection())
        credential_tag = tag.credential(
            cloud.name, self.vca_connection.data.user, cloud.credential_name
        )
        response = await cloud_facade.Credential([client.Entity(credential_tag)])
        return response.results
    finally:
        await self.disconnect_controller(controller)
1531
async def check_application_exists(self, model_name, application_name) -> bool:
    """Check application exists

    :param: model_name: Model Name
    :param: application_name: Application Name

    :return: True if the application is found in the model, False otherwise
    """

    model = None
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        debug_message = "Checking if application {} exists in model {}".format(
            application_name, model_name
        )
        self.log.debug(debug_message)
        found = self._get_application(model, application_name)
        return found is not None
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)