Bug 2054 fixed: raising JujuControllerFailedConnecting with error message
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17 import typing
18
19 import time
20
21 import juju.errors
22 from juju.model import Model
23 from juju.machine import Machine
24 from juju.application import Application
25 from juju.unit import Unit
26 from juju.client._definitions import (
27 FullStatus,
28 QueryApplicationOffersResults,
29 Cloud,
30 CloudCredential,
31 )
32 from juju.controller import Controller
33 from juju.client import client
34 from juju import tag
35
36 from n2vc.definitions import Offer, RelationEndpoint
37 from n2vc.juju_watcher import JujuModelWatcher
38 from n2vc.provisioner import AsyncSSHProvisioner
39 from n2vc.n2vc_conn import N2VCConnector
40 from n2vc.exceptions import (
41 JujuMachineNotFound,
42 JujuApplicationNotFound,
43 JujuLeaderUnitNotFound,
44 JujuActionNotFound,
45 JujuControllerFailedConnecting,
46 JujuApplicationExists,
47 JujuInvalidK8sConfiguration,
48 JujuError,
49 )
50 from n2vc.vca.cloud import Cloud as VcaCloud
51 from n2vc.vca.connection import Connection
52 from kubernetes.client.configuration import Configuration
53 from retrying_async import retry
54
55
# Label key name "rbac-id". It is not referenced in the visible part of this
# file; presumably used to tag RBAC-related resources elsewhere — TODO confirm.
RBAC_LABEL_KEY_NAME = "rbac-id"
57
58
59 class Libjuju:
def __init__(
    self,
    vca_connection: Connection,
    loop: asyncio.AbstractEventLoop = None,
    log: logging.Logger = None,
    n2vc: N2VCConnector = None,
):
    """
    Constructor

    Installs ``self.handle_exception`` as the exception handler of the event
    loop and, for the default VCA connection, schedules the health check task.

    :param: vca_connection: n2vc.vca.connection object
    :param: loop: Asyncio loop (defaults to asyncio.get_event_loop())
    :param: log: Logger (defaults to the "Libjuju" logger)
    :param: n2vc: N2VC object
    """

    self.log = log or logging.getLogger("Libjuju")
    self.n2vc = n2vc
    self.vca_connection = vca_connection

    self.loop = loop or asyncio.get_event_loop()
    self.loop.set_exception_handler(self.handle_exception)
    # Serializes model creation in add_model().
    # NOTE(review): the ``loop`` argument of asyncio.Lock is deprecated since
    # Python 3.8 and removed in 3.10; revisit when upgrading Python.
    self.creating_model = asyncio.Lock(loop=self.loop)

    # Defined unconditionally so the attribute always exists; it stays None
    # when this is not the default VCA connection (no health check then).
    self.health_check_task = None
    if self.vca_connection.is_default:
        self.health_check_task = self._create_health_check_task()
86
def _create_health_check_task(self):
    """Schedule ``self.health_check()`` on ``self.loop`` and return the task."""
    health_check_coro = self.health_check()
    task = self.loop.create_task(health_check_coro)
    return task
89
async def get_controller(self, timeout: float = 60.0) -> Controller:
    """
    Get a connected controller

    Connects with the endpoints and credentials in ``self.vca_connection.data``.
    For the default VCA connection, the stored endpoints are updated when the
    controller reports API endpoints that are not known yet.

    :param: timeout: Time in seconds to wait for controller to connect

    :return: Connected juju.controller.Controller; the caller must disconnect it
    :raises: JujuControllerFailedConnecting on any failure except cancellation
    """
    controller = None
    try:
        controller = Controller()
        await asyncio.wait_for(
            controller.connect(
                endpoint=self.vca_connection.data.endpoints,
                username=self.vca_connection.data.user,
                password=self.vca_connection.data.secret,
                cacert=self.vca_connection.data.cacert,
            ),
            timeout=timeout,
        )
        if self.vca_connection.is_default:
            endpoints = await controller.api_endpoints
            # NOTE(review): compares against vca_connection.endpoints while the
            # connect call above uses vca_connection.data.endpoints — confirm
            # both refer to the same list.
            if not all(
                endpoint in self.vca_connection.endpoints for endpoint in endpoints
            ):
                await self.vca_connection.update_endpoints(endpoints)
        return controller
    except asyncio.CancelledError:
        # Never turn a cancellation into a connection error
        raise
    except Exception as e:
        # str() of asyncio.TimeoutError is empty; fall back to the exception
        # class name so the reported error message is never blank.
        reason = str(e) or type(e).__name__
        self.log.error(
            "Failed connecting to controller: {}... {}".format(
                self.vca_connection.data.endpoints, reason
            )
        )
        if controller:
            await self.disconnect_controller(controller)

        raise JujuControllerFailedConnecting(
            f"Error connecting to Juju controller: {reason}"
        ) from e
129
async def disconnect(self):
    """
    Disconnect

    Cancels the health check task if there is one. That task is only created
    for the default VCA connection, so it may be missing or None.
    """
    # Cancel health check task
    health_check_task = getattr(self, "health_check_task", None)
    if health_check_task is not None:
        health_check_task.cancel()
    self.log.debug("Libjuju disconnected!")
135
async def disconnect_model(self, model: Model):
    """
    Close the connection held by a model.

    :param: model: Model to disconnect
    """
    disconnecting = model.disconnect()
    await disconnecting
143
async def disconnect_controller(self, controller: Controller):
    """
    Close the connection held by a controller, if any.

    :param: controller: Controller to disconnect; falsy values are ignored
    """
    if not controller:
        return
    await controller.disconnect()
152
@retry(attempts=3, delay=5, timeout=None)
async def add_model(self, model_name: str, cloud: VcaCloud):
    """
    Create model

    Does nothing if a model named ``model_name`` already exists.

    :param: model_name: Model name
    :param: cloud: Cloud object (its name and credential_name are used)
    """

    # Get controller
    controller = await self.get_controller()
    model = None
    try:
        # Serialize model creation through this object's lock. ``async with``
        # already suspends until the lock is free, so no polling is needed.
        async with self.creating_model:
            if await self.model_exists(model_name, controller=controller):
                return
            self.log.debug("Creating model {}".format(model_name))
            model = await controller.add_model(
                model_name,
                config=self.vca_connection.data.model_config,
                cloud_name=cloud.name,
                credential_name=cloud.credential_name,
            )
    except juju.errors.JujuAPIError as e:
        # The model may have been created concurrently; that is not an error
        if "already exists" not in e.message:
            raise
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
190
async def get_executed_actions(self, model_name: str) -> list:
    """
    Get executed/history of actions for a model.

    :param: model_name: Model name, str.
    :return: List of executed actions for a model. Each entry is a dict with
             "id", "action" and "status" keys, plus the keys of the action
             output returned by Model.get_action_output (which may override
             those three).
    :raises: JujuError wrapping any error raised while collecting the data
    """
    model = None
    executed_actions = []
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        # Get all unique action names (get_actions opens its own connection)
        actions = {}
        for application in model.applications:
            application_actions = await self.get_actions(application, model_name)
            actions.update(application_actions)
        # Get status of all actions
        for application_action in actions:
            app_action_status_list = await model.get_action_status(
                name=application_action
            )
            for action_id, action_status in app_action_status_list.items():
                executed_action = {
                    "id": action_id,
                    "action": application_action,
                    "status": action_status,
                }
                # Merge the action output (looked up by id) into the entry.
                # A dedicated name avoids shadowing the loop's action_status.
                action_output = await model.get_action_output(action_id)
                for key, value in action_output.items():
                    executed_action[key] = value
                executed_actions.append(executed_action)
    except Exception as e:
        raise JujuError(
            "Error in getting executed actions for model: {}. Error: {}".format(
                model_name, str(e)
            )
        ) from e
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return executed_actions
235
async def get_application_configs(
    self, model_name: str, application_name: str
) -> dict:
    """
    Get available configs for an application.

    :param: model_name: Model name, str.
    :param: application_name: Application name, str.

    :return: The config dict returned by Application.get_config()
    :raises: JujuError if the application does not exist or its configs
             cannot be retrieved
    """
    model = None
    application_configs = {}
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(
            model, application_name=application_name
        )
        # Explicit check: otherwise the error would be an opaque
        # "'NoneType' object has no attribute 'get_config'"
        if application is None:
            raise JujuApplicationNotFound(
                "Application {} not found".format(application_name)
            )
        application_configs = await application.get_config()
    except Exception as e:
        raise JujuError(
            "Error in getting configs for application: {} in model: {}. Error: {}".format(
                application_name, model_name, str(e)
            )
        ) from e
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return application_configs
267
@retry(attempts=3, delay=5)
async def get_model(self, controller: Controller, model_name: str) -> Model:
    """
    Fetch a connected model from a controller.

    The call is wrapped by ``retry(attempts=3, delay=5)``.

    :param: controller: Controller to ask for the model
    :param: model_name: Model name

    :return: Model: the juju.model.Model returned by the controller
    """
    fetched_model = await controller.get_model(model_name)
    return fetched_model
279
async def model_exists(
    self, model_name: str, controller: Controller = None
) -> bool:
    """
    Tell whether a model with the given name exists.

    :param: model_name: Model name
    :param: controller: Controller to query. When omitted, a controller is
            connected for this call and disconnected afterwards.

    :return bool
    """
    owns_controller = not controller
    if owns_controller:
        controller = await self.get_controller()

    try:
        known_models = await controller.list_models()
        return model_name in known_models
    finally:
        # Only release a controller this method created itself
        if owns_controller:
            await self.disconnect_controller(controller)
304
async def models_exist(self, model_names: [str]) -> (bool, list):
    """
    Check if models exist

    :param: model_names: Non-empty list of strings with model names

    :return (bool, list[str]): (True if all models exist, list of the given
            model names that don't exist, without duplicates and in the order
            in which they were given)
    :raises: Exception if model_names is empty
    """
    if not model_names:
        raise Exception(
            "model_names must be a non-empty array. Given value: {}".format(
                model_names
            )
        )
    existing_models = set(await self.list_models())
    # dict.fromkeys de-duplicates while preserving input order, so the result
    # no longer depends on (hash-seed dependent) set iteration order.
    non_existing_models = [
        name for name in dict.fromkeys(model_names) if name not in existing_models
    ]

    return (
        len(non_existing_models) == 0,
        non_existing_models,
    )
328
async def get_model_status(self, model_name: str) -> FullStatus:
    """
    Get model status

    :param: model_name: Model name

    :return: Full status object
    """
    controller = await self.get_controller()
    model = None
    try:
        # Fetched inside the try so the controller is disconnected even if
        # get_model fails
        model = await self.get_model(controller, model_name)
        return await model.get_status()
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
344
async def create_machine(
    self,
    model_name: str,
    machine_id: str = None,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    series: str = "bionic",
    wait: bool = True,
) -> (Machine, bool):
    """
    Create machine

    If ``machine_id`` is given, the existing machine with that id is used.
    Otherwise a new machine is added to the model.

    :param: model_name: Model name
    :param: machine_id: Machine id
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: series: Series of the machine (xenial, bionic, focal, ...)
    :param: wait: Wait until machine is ready

    :return: (juju.machine.Machine, bool): Machine object and a boolean saying
            if the machine is new or it already existed
    :raises: JujuMachineNotFound if machine_id is given but not in the model
    """
    new = False
    machine = None

    self.log.debug(
        "Creating machine (id={}) in model: {}".format(machine_id, model_name)
    )

    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model (inside the try so the controller is always disconnected)
        model = await self.get_model(controller, model_name)

        if machine_id is not None:
            self.log.debug(
                "Searching machine (id={}) in model {}".format(
                    machine_id, model_name
                )
            )

            # Get machines from model and get the machine with machine_id if exists
            machines = await model.get_machines()
            if machine_id in machines:
                self.log.debug(
                    "Machine (id={}) found in model {}".format(
                        machine_id, model_name
                    )
                )
                machine = machines[machine_id]
            else:
                raise JujuMachineNotFound("Machine {} not found".format(machine_id))

        if machine is None:
            self.log.debug("Creating a new machine in model {}".format(model_name))

            # Create machine
            machine = await model.add_machine(
                spec=None, constraints=None, disks=None, series=series
            )
            new = True

        if wait:
            # Wait until the machine is ready (only logged when actually waiting)
            self.log.debug(
                "Wait until machine {} is ready in model {}".format(
                    machine.entity_id, model_name
                )
            )
            await JujuModelWatcher.wait_for(
                model=model,
                entity=machine,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout,
                db_dict=db_dict,
                n2vc=self.n2vc,
                vca_id=self.vca_connection._vca_id,
            )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    self.log.debug(
        "Machine {} ready at {} in model {}".format(
            machine.entity_id, machine.dns_name, model_name
        )
    )
    return machine, new
436
async def provision_machine(
    self,
    model_name: str,
    hostname: str,
    username: str,
    private_key_path: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
) -> str:
    """
    Manually provisioning of a machine

    Provisions the host over SSH, adds it to the model, starts the Juju agent
    installation in the background and waits until the machine is ready.

    :param: model_name: Model name
    :param: hostname: IP to access the machine
    :param: username: Username to login to the machine
    :param: private_key_path: Local path for the private key
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active

    :return: (Entity): Machine id
    :raises: ValueError if Juju reports an error adding the machine,
             JujuMachineNotFound if the machine does not appear in the model
    """
    self.log.debug(
        "Provisioning machine. model: {}, hostname: {}, username: {}".format(
            model_name, hostname, username
        )
    )

    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model (inside the try so the controller is always disconnected)
        model = await self.get_model(controller, model_name)

        # Get provisioner
        provisioner = AsyncSSHProvisioner(
            host=hostname,
            user=username,
            private_key_path=private_key_path,
            log=self.log,
        )

        # Provision machine
        params = await provisioner.provision_machine()

        params.jobs = ["JobHostUnits"]

        self.log.debug("Adding machine to model")
        connection = model.connection()
        client_facade = client.ClientFacade.from_connection(connection)

        results = await client_facade.AddMachines(params=[params])
        error = results.machines[0].error

        if error:
            msg = "Error adding machine: {}".format(error.message)
            self.log.error(msg=msg)
            raise ValueError(msg)

        machine_id = results.machines[0].machine

        self.log.debug("Installing Juju agent into machine {}".format(machine_id))
        # Scheduled in the background and not awaited here; the wait_for below
        # tracks the machine becoming ready.
        # NOTE(review): exceptions raised by install_agent are not observed by
        # this method.
        asyncio.ensure_future(
            provisioner.install_agent(
                connection=connection,
                nonce=params.nonce,
                machine_id=machine_id,
                proxy=self.vca_connection.data.api_proxy,
                series=params.series,
            )
        )

        # Poll up to 10 times, 2 seconds apart, for the machine to show up
        machine = None
        for _ in range(10):
            machine_list = await model.get_machines()
            if machine_id in machine_list:
                self.log.debug("Machine {} found in model!".format(machine_id))
                machine = model.machines.get(machine_id)
                break
            await asyncio.sleep(2)

        if machine is None:
            msg = "Machine {} not found in model".format(machine_id)
            self.log.error(msg=msg)
            raise JujuMachineNotFound(msg)

        self.log.debug(
            "Wait until machine {} is ready in model {}".format(
                machine.entity_id, model_name
            )
        )
        await JujuModelWatcher.wait_for(
            model=model,
            entity=machine,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    self.log.debug(
        "Machine provisioned {} in model {}".format(machine_id, model_name)
    )

    return machine_id
550
async def deploy(
    self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
):
    """
    Deploy bundle or charm: Similar to the juju CLI command `juju deploy`

    The deployment is done with ``trust=True``.

    :param: uri: Path or Charm Store uri in which the charm or bundle can be found
    :param: model_name: Model name
    :param: wait: Indicates whether to wait or not until all applications are active
    :param: timeout: Time in seconds to wait until all applications are active
    """
    controller = await self.get_controller()
    model = None
    try:
        # Fetched inside the try so the controller is disconnected even if
        # get_model fails
        model = await self.get_model(controller, model_name)
        await model.deploy(uri, trust=True)
        if wait:
            await JujuModelWatcher.wait_for_model(model, timeout=timeout)
            self.log.debug("All units active in model {}".format(model_name))
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
572
async def add_unit(
    self,
    application_name: str,
    model_name: str,
    machine_id: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
):
    """Add a unit of an existing application on a given machine.

    :param: application_name: Application name
    :param: model_name: Model name
    :param: machine_id Machine id where the new unit is placed
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active

    :return: None
    :raises: JujuApplicationNotFound if the application is not in the model,
             JujuMachineNotFound if the machine is not in the model
    """

    model = None
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)

        if application is None:
            raise JujuApplicationNotFound(
                "Application {} not exists".format(application_name)
            )

        # Validation only: raises JujuMachineNotFound when machine_id is not
        # part of the model. The returned machine and series are not needed.
        self._get_machine_info(model, machine_id)

        self.log.debug(
            "Adding unit (machine {}) to application {} in model ~{}".format(
                machine_id, application_name, model_name
            )
        )
        await application.add_unit(to=machine_id)

        await JujuModelWatcher.wait_for(
            model=model,
            entity=application,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )
        self.log.debug(
            "Unit is added to application {} in model {}".format(
                application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
636
async def destroy_unit(
    self,
    application_name: str,
    model_name: str,
    machine_id: str,
    total_timeout: float = None,
):
    """Destroy unit

    Destroys the unit of the application placed on ``machine_id`` and polls
    (every 5 seconds) until it disappears or the timeout expires.

    :param: application_name: Application name
    :param: model_name: Model name
    :param: machine_id Machine id
    :param: total_timeout: Seconds to wait for the unit to disappear
            (3600 when None)

    :return: None
    :raises: JujuApplicationNotFound, JujuError (no unit on machine_id)
    """

    model = None
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)

        if application is None:
            raise JujuApplicationNotFound(
                "Application not found: {} (model={})".format(
                    application_name, model_name
                )
            )

        unit = self._get_unit(application, machine_id)
        if not unit:
            raise JujuError(
                "A unit with machine id {} not in available units".format(
                    machine_id
                )
            )

        unit_name = unit.name

        self.log.debug(
            "Destroying unit {} from application {} in model {}".format(
                unit_name, application_name, model_name
            )
        )
        await application.destroy_unit(unit_name)

        self.log.debug(
            "Waiting for unit {} to be destroyed in application {} (model={})...".format(
                unit_name, application_name, model_name
            )
        )

        # TODO: Add functionality in the Juju watcher to replace this kind of blocks
        if total_timeout is None:
            total_timeout = 3600
        end = time.time() + total_timeout
        while time.time() < end:
            if not self._get_unit(application, machine_id):
                self.log.debug(
                    "The unit {} was destroyed in application {} (model={}) ".format(
                        unit_name, application_name, model_name
                    )
                )
                return
            await asyncio.sleep(5)
        # Only reached when the timeout expired and the unit still exists.
        # Previously this was logged as a success. No exception is raised, so
        # callers keep their current behavior.
        self.log.warning(
            "Timeout waiting for unit {} to be destroyed in application {} (model={})".format(
                unit_name, application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
712
async def deploy_charm(
    self,
    application_name: str,
    path: str,
    model_name: str,
    machine_id: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    config: dict = None,
    series: str = None,
    num_units: int = 1,
):
    """Deploy charm

    :param: application_name: Application name
    :param: path: Local path to the charm
    :param: model_name: Model name
    :param: machine_id ID of the machine (its series overrides ``series``)
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: config: Config for the charm
    :param: series: Series of the charm
    :param: num_units: Number of units

    :return: (juju.application.Application): Juju application
    :raises: JujuApplicationExists if the application is already deployed
    """
    self.log.debug(
        "Deploying charm {} to machine {} in model ~{}".format(
            application_name, machine_id, model_name
        )
    )
    self.log.debug("charm: {}".format(path))

    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model: inside the outer try so the controller is always released,
        # but outside the inner one so its errors are not mapped to
        # JujuApplicationExists.
        model = await self.get_model(controller, model_name)

        try:
            if application_name in model.applications:
                raise JujuApplicationExists(
                    "Application {} exists".format(application_name)
                )

            if machine_id is not None:
                _machine, series = self._get_machine_info(model, machine_id)

            # Deploy a single unit first, placed on machine_id; the remaining
            # num_units - 1 units are added on new machines below.
            application = await model.deploy(
                entity_url=path,
                application_name=application_name,
                channel="stable",
                num_units=1,
                series=series,
                to=machine_id,
                config=config,
            )

            self.log.debug(
                "Wait until application {} is ready in model {}".format(
                    application_name, model_name
                )
            )
            if num_units > 1:
                for _ in range(num_units - 1):
                    m, _ = await self.create_machine(model_name, wait=False)
                    await application.add_unit(to=m.entity_id)

            await JujuModelWatcher.wait_for(
                model=model,
                entity=application,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout,
                db_dict=db_dict,
                n2vc=self.n2vc,
                vca_id=self.vca_connection._vca_id,
            )
            self.log.debug(
                "Application {} is ready in model {}".format(
                    application_name, model_name
                )
            )
        except juju.errors.JujuError as e:
            if "already exists" in e.message:
                raise JujuApplicationExists(
                    "Application {} exists".format(application_name)
                )
            else:
                raise
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return application
810
async def scale_application(
    self,
    model_name: str,
    application_name: str,
    scale: int = 1,
    total_timeout: float = None,
):
    """
    Scale an application (K8s) and wait until the model settles.

    :param: model_name: Model name
    :param: application_name: Application name
    :param: scale: Target number of units
    :param: total_timeout: Seconds to wait for the unit count to reach
            ``scale`` (1800 when None). The same value is then handed to
            wait_for_model.
            NOTE(review): that means the whole call can take longer than
            total_timeout — confirm this is intended.
    :raises: JujuApplicationNotFound if the application is not in the model,
             Exception if the unit count does not reach ``scale`` in time
    """

    model = None
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)

        self.log.debug(
            "Scaling application {} in model {}".format(
                application_name, model_name
            )
        )
        target = self._get_application(model, application_name)
        if target is None:
            raise JujuApplicationNotFound("Cannot scale application")
        await target.scale(scale=scale)

        self.log.debug(
            "Waiting for application {} to be scaled in model {}...".format(
                application_name, model_name
            )
        )
        timeout = 1800 if total_timeout is None else total_timeout
        deadline = time.time() + timeout
        while time.time() < deadline:
            # Juju applies the new scale with some delay, so only wait for
            # the model once the unit count matches the requested scale.
            if self._get_application_count(model, application_name) != scale:
                await asyncio.sleep(5)
                continue
            await JujuModelWatcher.wait_for_model(model=model, timeout=timeout)
            self.log.debug(
                "Application {} is scaled in model {}".format(
                    application_name, model_name
                )
            )
            return
        raise Exception(
            "Timeout waiting for application {} in model {} to be scaled".format(
                application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
875
def _get_application_count(self, model: Model, application_name: str) -> int:
    """Return how many units an application currently has.

    :param: model: Model object
    :param: application_name: Application name

    :return: int, or None when the application is not in the model
    """
    found = self._get_application(model, application_name)
    return None if found is None else len(found.units)
887
def _get_application(self, model: Model, application_name: str) -> Application:
    """Look up an application of a model by name.

    :param: model: Model object
    :param: application_name: Application name

    :return: juju.application.Application, or None if the model has no
             applications or none with that name
    """
    applications = model.applications
    if not applications or application_name not in applications:
        return None
    return applications[application_name]
898
def _get_unit(self, application: Application, machine_id: str) -> Unit:
    """Return the first unit of an application placed on a given machine.

    :param: application: Application object
    :param: machine_id: Machine id

    :return: Unit, or None when no unit runs on that machine
    """
    return next(
        (candidate for candidate in application.units
         if candidate.machine_id == machine_id),
        None,
    )
913
def _get_machine_info(
    self,
    model,
    machine_id: str,
) -> (str, str):
    """Return a machine of the model together with its series.

    :param: model: Model object
    :param: machine_id: Machine id

    :return: (machine, series): the machine object from ``model.machines``
             and its ``series`` attribute
    :raises: JujuMachineNotFound if machine_id is not in the model
    """
    if machine_id in model.machines:
        found_machine = model.machines[machine_id]
        return found_machine, found_machine.series
    msg = "Machine {} not found in model".format(machine_id)
    self.log.error(msg=msg)
    raise JujuMachineNotFound(msg)
932
async def execute_action(
    self,
    application_name: str,
    model_name: str,
    action_name: str,
    db_dict: dict = None,
    machine_id: str = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    **kwargs,
):
    """Execute action

    Runs the action on the leader unit when ``machine_id`` is None, otherwise
    on the unit placed on ``machine_id``, and waits for it to complete.

    :param: application_name: Application name
    :param: model_name: Model name
    :param: action_name: Name of the action
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: machine_id Machine id
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: kwargs: Parameters passed to the action

    :return: (str, str): (output and status). Status is "failed" when the
             action id is missing from the status reported by the model.
    :raises: JujuApplicationNotFound, JujuError (no unit on machine_id),
             JujuActionNotFound
    """
    self.log.debug(
        "Executing action {} using params {}".format(action_name, kwargs)
    )
    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model (inside the try so the controller is always disconnected)
        model = await self.get_model(controller, model_name)

        # Get application
        application = self._get_application(
            model,
            application_name=application_name,
        )
        if application is None:
            raise JujuApplicationNotFound("Cannot execute action")
        # Race condition:
        # Occasionally, self._get_leader_unit() will return None
        # because the leader elected hook has not been triggered yet.
        # Therefore, we are doing some retries. If it happens again,
        # re-open bug 1236
        if machine_id is None:
            unit = await self._get_leader_unit(application)
            self.log.debug(
                "Action {} is being executed on the leader unit {}".format(
                    action_name, unit.name
                )
            )
        else:
            unit = self._get_unit(application, machine_id)
            if not unit:
                raise JujuError(
                    "A unit with machine id {} not in available units".format(
                        machine_id
                    )
                )
            self.log.debug(
                "Action {} is being executed on {} unit".format(
                    action_name, unit.name
                )
            )

        actions = await application.get_actions()

        if action_name not in actions:
            raise JujuActionNotFound(
                "Action {} not in available actions".format(action_name)
            )

        action = await unit.run_action(action_name, **kwargs)

        self.log.debug(
            "Wait until action {} is completed in application {} (model={})".format(
                action_name, application_name, model_name
            )
        )
        await JujuModelWatcher.wait_for(
            model=model,
            entity=action,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )

        output = await model.get_action_output(action_uuid=action.entity_id)
        status = await model.get_action_status(uuid_or_prefix=action.entity_id)
        status = (
            status[action.entity_id] if action.entity_id in status else "failed"
        )

        self.log.debug(
            "Action {} completed with status {} in application {} (model={})".format(
                action_name, action.status, application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return output, status
1039
async def get_actions(self, application_name: str, model_name: str) -> dict:
    """Get list of actions

    :param: application_name: Application name
    :param: model_name: Model name

    :return: Dict with this format
        {
            "action_name": "Description of the action",
            ...
        }
    :raises: JujuApplicationNotFound if the application is not in the model
    """
    self.log.debug(
        "Getting list of actions for application {}".format(application_name)
    )

    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model (inside the try so the controller is always disconnected)
        model = await self.get_model(controller, model_name)

        # Get application
        application = self._get_application(
            model,
            application_name=application_name,
        )
        # Explicit error instead of an AttributeError on None
        if application is None:
            raise JujuApplicationNotFound(
                "Application {} not found (model={})".format(
                    application_name, model_name
                )
            )

        # Return list of actions
        return await application.get_actions()

    finally:
        # Disconnect from model and controller
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1076
async def get_metrics(self, model_name: str, application_name: str) -> dict:
    """Get the metrics collected by the VCA.

    :param model_name The name or unique id of the network service
    :param application_name The name of the application

    :return: Metrics returned by Application.get_metrics(), or an empty dict
             if the application is not in the model
    :raises: Exception if model_name or application_name is empty
    """
    if not model_name or not application_name:
        raise Exception("model_name and application_name must be non-empty strings")
    metrics = {}
    controller = await self.get_controller()
    model = None
    try:
        # Fetched inside the try so the controller is disconnected even if
        # get_model fails
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)
        if application is not None:
            metrics = await application.get_metrics()
    finally:
        # The disconnect helpers are coroutines and must be awaited; the
        # previous un-awaited calls never closed the connections.
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return metrics
1096
async def add_relation(
    self,
    model_name: str,
    endpoint_1: str,
    endpoint_2: str,
):
    """Add relation

    Juju API errors saying the relation is "not found" or "already exists"
    are logged as warnings and not raised.

    :param: model_name: Model name
    :param: endpoint_1 First endpoint name
                       ("app:endpoint" format or directly the saas name)
    :param: endpoint_2: Second endpoint name (^ same format)
    """

    self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))

    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model: inside the outer try so the controller is always released,
        # but outside the inner one so a failing model lookup is not swallowed
        # by the "not found" handling below.
        model = await self.get_model(controller, model_name)

        # Add relation
        try:
            await model.add_relation(endpoint_1, endpoint_2)
        except juju.errors.JujuAPIError as e:
            if "not found" in e.message:
                self.log.warning("Relation not found: {}".format(e.message))
                return
            if "already exists" in e.message:
                self.log.warning("Relation already exists: {}".format(e.message))
                return
            # another exception, raise it
            raise
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1134
1135 async def offer(self, endpoint: RelationEndpoint) -> Offer:
1136 """
1137 Create an offer from a RelationEndpoint
1138
1139 :param: endpoint: Relation endpoint
1140
1141 :return: Offer object
1142 """
1143 model_name = endpoint.model_name
1144 offer_name = f"{endpoint.application_name}-{endpoint.endpoint_name}"
1145 controller = await self.get_controller()
1146 model = None
1147 try:
1148 model = await self.get_model(controller, model_name)
1149 await model.create_offer(endpoint.endpoint, offer_name=offer_name)
1150 offer_list = await self._list_offers(model_name, offer_name=offer_name)
1151 if offer_list:
1152 return Offer(offer_list[0].offer_url)
1153 else:
1154 raise Exception("offer was not created")
1155 except juju.errors.JujuError as e:
1156 if "application offer already exists" not in e.message:
1157 raise e
1158 finally:
1159 if model:
1160 self.disconnect_model(model)
1161 self.disconnect_controller(controller)
1162
1163 async def consume(
1164 self,
1165 model_name: str,
1166 offer: Offer,
1167 provider_libjuju: "Libjuju",
1168 ) -> str:
1169 """
1170 Consumes a remote offer in the model. Relations can be created later using "juju relate".
1171
1172 :param: model_name: Model name
1173 :param: offer: Offer object to consume
1174 :param: provider_libjuju: Libjuju object of the provider endpoint
1175
1176 :raises ParseError if there's a problem parsing the offer_url
1177 :raises JujuError if remote offer includes and endpoint
1178 :raises JujuAPIError if the operation is not successful
1179
1180 :returns: Saas name. It is the application name in the model that reference the remote application.
1181 """
1182 saas_name = f'{offer.name}-{offer.model_name.replace("-", "")}'
1183 if offer.vca_id:
1184 saas_name = f"{saas_name}-{offer.vca_id}"
1185 controller = await self.get_controller()
1186 model = None
1187 provider_controller = None
1188 try:
1189 model = await controller.get_model(model_name)
1190 provider_controller = await provider_libjuju.get_controller()
1191 await model.consume(
1192 offer.url, application_alias=saas_name, controller=provider_controller
1193 )
1194 return saas_name
1195 finally:
1196 if model:
1197 await self.disconnect_model(model)
1198 if provider_controller:
1199 await provider_libjuju.disconnect_controller(provider_controller)
1200 await self.disconnect_controller(controller)
1201
1202 async def destroy_model(self, model_name: str, total_timeout: float = 1800):
1203 """
1204 Destroy model
1205
1206 :param: model_name: Model name
1207 :param: total_timeout: Timeout
1208 """
1209
1210 controller = await self.get_controller()
1211 model = None
1212 try:
1213 if not await self.model_exists(model_name, controller=controller):
1214 return
1215
1216 self.log.debug("Destroying model {}".format(model_name))
1217
1218 model = await self.get_model(controller, model_name)
1219 # Destroy machines that are manually provisioned
1220 # and still are in pending state
1221 await self._destroy_pending_machines(model, only_manual=True)
1222 await self.disconnect_model(model)
1223
1224 await self._destroy_model(
1225 model_name,
1226 controller,
1227 timeout=total_timeout,
1228 )
1229 except Exception as e:
1230 if not await self.model_exists(model_name, controller=controller):
1231 return
1232 raise e
1233 finally:
1234 if model:
1235 await self.disconnect_model(model)
1236 await self.disconnect_controller(controller)
1237
1238 async def _destroy_model(
1239 self, model_name: str, controller: Controller, timeout: float = 1800
1240 ):
1241 """
1242 Destroy model from controller
1243
1244 :param: model: Model name to be removed
1245 :param: controller: Controller object
1246 :param: timeout: Timeout in seconds
1247 """
1248
1249 async def _destroy_model_loop(model_name: str, controller: Controller):
1250 while await self.model_exists(model_name, controller=controller):
1251 await controller.destroy_model(
1252 model_name, destroy_storage=True, force=True, max_wait=0
1253 )
1254 await asyncio.sleep(5)
1255
1256 try:
1257 await asyncio.wait_for(
1258 _destroy_model_loop(model_name, controller), timeout=timeout
1259 )
1260 except asyncio.TimeoutError:
1261 raise Exception(
1262 "Timeout waiting for model {} to be destroyed".format(model_name)
1263 )
1264 except juju.errors.JujuError as e:
1265 if any("has been removed" in error for error in e.errors):
1266 return
1267 raise e
1268
1269 async def destroy_application(
1270 self, model_name: str, application_name: str, total_timeout: float
1271 ):
1272 """
1273 Destroy application
1274
1275 :param: model_name: Model name
1276 :param: application_name: Application name
1277 :param: total_timeout: Timeout
1278 """
1279
1280 controller = await self.get_controller()
1281 model = None
1282
1283 try:
1284 model = await self.get_model(controller, model_name)
1285 self.log.debug(
1286 "Destroying application {} in model {}".format(
1287 application_name, model_name
1288 )
1289 )
1290 application = self._get_application(model, application_name)
1291 if application:
1292 await application.destroy()
1293 else:
1294 self.log.warning("Application not found: {}".format(application_name))
1295
1296 self.log.debug(
1297 "Waiting for application {} to be destroyed in model {}...".format(
1298 application_name, model_name
1299 )
1300 )
1301 if total_timeout is None:
1302 total_timeout = 3600
1303 end = time.time() + total_timeout
1304 while time.time() < end:
1305 if not self._get_application(model, application_name):
1306 self.log.debug(
1307 "The application {} was destroyed in model {} ".format(
1308 application_name, model_name
1309 )
1310 )
1311 return
1312 await asyncio.sleep(5)
1313 raise Exception(
1314 "Timeout waiting for application {} to be destroyed in model {}".format(
1315 application_name, model_name
1316 )
1317 )
1318 finally:
1319 if model is not None:
1320 await self.disconnect_model(model)
1321 await self.disconnect_controller(controller)
1322
1323 async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
1324 """
1325 Destroy pending machines in a given model
1326
1327 :param: only_manual: Bool that indicates only manually provisioned
1328 machines should be destroyed (if True), or that
1329 all pending machines should be destroyed
1330 """
1331 status = await model.get_status()
1332 for machine_id in status.machines:
1333 machine_status = status.machines[machine_id]
1334 if machine_status.agent_status.status == "pending":
1335 if only_manual and not machine_status.instance_id.startswith("manual:"):
1336 break
1337 machine = model.machines[machine_id]
1338 await machine.destroy(force=True)
1339
1340 async def configure_application(
1341 self, model_name: str, application_name: str, config: dict = None
1342 ):
1343 """Configure application
1344
1345 :param: model_name: Model name
1346 :param: application_name: Application name
1347 :param: config: Config to apply to the charm
1348 """
1349 self.log.debug("Configuring application {}".format(application_name))
1350
1351 if config:
1352 controller = await self.get_controller()
1353 model = None
1354 try:
1355 model = await self.get_model(controller, model_name)
1356 application = self._get_application(
1357 model,
1358 application_name=application_name,
1359 )
1360 await application.set_config(config)
1361 finally:
1362 if model:
1363 await self.disconnect_model(model)
1364 await self.disconnect_controller(controller)
1365
1366 def handle_exception(self, loop, context):
1367 # All unhandled exceptions by libjuju are handled here.
1368 pass
1369
1370 async def health_check(self, interval: float = 300.0):
1371 """
1372 Health check to make sure controller and controller_model connections are OK
1373
1374 :param: interval: Time in seconds between checks
1375 """
1376 controller = None
1377 while True:
1378 try:
1379 controller = await self.get_controller()
1380 # self.log.debug("VCA is alive")
1381 except Exception as e:
1382 self.log.error("Health check to VCA failed: {}".format(e))
1383 finally:
1384 await self.disconnect_controller(controller)
1385 await asyncio.sleep(interval)
1386
1387 async def list_models(self, contains: str = None) -> [str]:
1388 """List models with certain names
1389
1390 :param: contains: String that is contained in model name
1391
1392 :retur: [models] Returns list of model names
1393 """
1394
1395 controller = await self.get_controller()
1396 try:
1397 models = await controller.list_models()
1398 if contains:
1399 models = [model for model in models if contains in model]
1400 return models
1401 finally:
1402 await self.disconnect_controller(controller)
1403
1404 async def _list_offers(
1405 self, model_name: str, offer_name: str = None
1406 ) -> QueryApplicationOffersResults:
1407 """
1408 List offers within a model
1409
1410 :param: model_name: Model name
1411 :param: offer_name: Offer name to filter.
1412
1413 :return: Returns application offers results in the model
1414 """
1415
1416 controller = await self.get_controller()
1417 try:
1418 offers = (await controller.list_offers(model_name)).results
1419 if offer_name:
1420 matching_offer = []
1421 for offer in offers:
1422 if offer.offer_name == offer_name:
1423 matching_offer.append(offer)
1424 break
1425 offers = matching_offer
1426 return offers
1427 finally:
1428 await self.disconnect_controller(controller)
1429
1430 async def add_k8s(
1431 self,
1432 name: str,
1433 rbac_id: str,
1434 token: str,
1435 client_cert_data: str,
1436 configuration: Configuration,
1437 storage_class: str,
1438 credential_name: str = None,
1439 ):
1440 """
1441 Add a Kubernetes cloud to the controller
1442
1443 Similar to the `juju add-k8s` command in the CLI
1444
1445 :param: name: Name for the K8s cloud
1446 :param: configuration: Kubernetes configuration object
1447 :param: storage_class: Storage Class to use in the cloud
1448 :param: credential_name: Storage Class to use in the cloud
1449 """
1450
1451 if not storage_class:
1452 raise Exception("storage_class must be a non-empty string")
1453 if not name:
1454 raise Exception("name must be a non-empty string")
1455 if not configuration:
1456 raise Exception("configuration must be provided")
1457
1458 endpoint = configuration.host
1459 credential = self.get_k8s_cloud_credential(
1460 configuration,
1461 client_cert_data,
1462 token,
1463 )
1464 credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id
1465 cloud = client.Cloud(
1466 type_="kubernetes",
1467 auth_types=[credential.auth_type],
1468 endpoint=endpoint,
1469 ca_certificates=[client_cert_data],
1470 config={
1471 "operator-storage": storage_class,
1472 "workload-storage": storage_class,
1473 },
1474 )
1475
1476 return await self.add_cloud(
1477 name, cloud, credential, credential_name=credential_name
1478 )
1479
1480 def get_k8s_cloud_credential(
1481 self,
1482 configuration: Configuration,
1483 client_cert_data: str,
1484 token: str = None,
1485 ) -> client.CloudCredential:
1486 attrs = {}
1487 # TODO: Test with AKS
1488 key = None # open(configuration.key_file, "r").read()
1489 username = configuration.username
1490 password = configuration.password
1491
1492 if client_cert_data:
1493 attrs["ClientCertificateData"] = client_cert_data
1494 if key:
1495 attrs["ClientKeyData"] = key
1496 if token:
1497 if username or password:
1498 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1499 attrs["Token"] = token
1500
1501 auth_type = None
1502 if key:
1503 auth_type = "oauth2"
1504 if client_cert_data:
1505 auth_type = "oauth2withcert"
1506 if not token:
1507 raise JujuInvalidK8sConfiguration(
1508 "missing token for auth type {}".format(auth_type)
1509 )
1510 elif username:
1511 if not password:
1512 self.log.debug(
1513 "credential for user {} has empty password".format(username)
1514 )
1515 attrs["username"] = username
1516 attrs["password"] = password
1517 if client_cert_data:
1518 auth_type = "userpasswithcert"
1519 else:
1520 auth_type = "userpass"
1521 elif client_cert_data and token:
1522 auth_type = "certificate"
1523 else:
1524 raise JujuInvalidK8sConfiguration("authentication method not supported")
1525 return client.CloudCredential(auth_type=auth_type, attrs=attrs)
1526
1527 async def add_cloud(
1528 self,
1529 name: str,
1530 cloud: Cloud,
1531 credential: CloudCredential = None,
1532 credential_name: str = None,
1533 ) -> Cloud:
1534 """
1535 Add cloud to the controller
1536
1537 :param: name: Name of the cloud to be added
1538 :param: cloud: Cloud object
1539 :param: credential: CloudCredentials object for the cloud
1540 :param: credential_name: Credential name.
1541 If not defined, cloud of the name will be used.
1542 """
1543 controller = await self.get_controller()
1544 try:
1545 _ = await controller.add_cloud(name, cloud)
1546 if credential:
1547 await controller.add_credential(
1548 credential_name or name, credential=credential, cloud=name
1549 )
1550 # Need to return the object returned by the controller.add_cloud() function
1551 # I'm returning the original value now until this bug is fixed:
1552 # https://github.com/juju/python-libjuju/issues/443
1553 return cloud
1554 finally:
1555 await self.disconnect_controller(controller)
1556
1557 async def remove_cloud(self, name: str):
1558 """
1559 Remove cloud
1560
1561 :param: name: Name of the cloud to be removed
1562 """
1563 controller = await self.get_controller()
1564 try:
1565 await controller.remove_cloud(name)
1566 except juju.errors.JujuError as e:
1567 if len(e.errors) == 1 and f'cloud "{name}" not found' == e.errors[0]:
1568 self.log.warning(f"Cloud {name} not found, so it could not be deleted.")
1569 else:
1570 raise e
1571 finally:
1572 await self.disconnect_controller(controller)
1573
1574 @retry(attempts=20, delay=5, fallback=JujuLeaderUnitNotFound())
1575 async def _get_leader_unit(self, application: Application) -> Unit:
1576 unit = None
1577 for u in application.units:
1578 if await u.is_leader_from_status():
1579 unit = u
1580 break
1581 if not unit:
1582 raise Exception()
1583 return unit
1584
1585 async def get_cloud_credentials(self, cloud: Cloud) -> typing.List:
1586 """
1587 Get cloud credentials
1588
1589 :param: cloud: Cloud object. The returned credentials will be from this cloud.
1590
1591 :return: List of credentials object associated to the specified cloud
1592
1593 """
1594 controller = await self.get_controller()
1595 try:
1596 facade = client.CloudFacade.from_connection(controller.connection())
1597 cloud_cred_tag = tag.credential(
1598 cloud.name, self.vca_connection.data.user, cloud.credential_name
1599 )
1600 params = [client.Entity(cloud_cred_tag)]
1601 return (await facade.Credential(params)).results
1602 finally:
1603 await self.disconnect_controller(controller)
1604
1605 async def check_application_exists(self, model_name, application_name) -> bool:
1606 """Check application exists
1607
1608 :param: model_name: Model Name
1609 :param: application_name: Application Name
1610
1611 :return: Boolean
1612 """
1613
1614 model = None
1615 controller = await self.get_controller()
1616 try:
1617 model = await self.get_model(controller, model_name)
1618 self.log.debug(
1619 "Checking if application {} exists in model {}".format(
1620 application_name, model_name
1621 )
1622 )
1623 return self._get_application(model, application_name) is not None
1624 finally:
1625 if model:
1626 await self.disconnect_model(model)
1627 await self.disconnect_controller(controller)