Skip exception if model doesn't exist after delete failure
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17 import typing
18
19 import time
20
21 import juju.errors
22 from juju.model import Model
23 from juju.machine import Machine
24 from juju.application import Application
25 from juju.unit import Unit
26 from juju.client._definitions import (
27 FullStatus,
28 QueryApplicationOffersResults,
29 Cloud,
30 CloudCredential,
31 )
32 from juju.controller import Controller
33 from juju.client import client
34 from juju import tag
35
36 from n2vc.juju_watcher import JujuModelWatcher
37 from n2vc.provisioner import AsyncSSHProvisioner
38 from n2vc.n2vc_conn import N2VCConnector
39 from n2vc.exceptions import (
40 JujuMachineNotFound,
41 JujuApplicationNotFound,
42 JujuLeaderUnitNotFound,
43 JujuActionNotFound,
44 JujuControllerFailedConnecting,
45 JujuApplicationExists,
46 JujuInvalidK8sConfiguration,
47 JujuError,
48 )
49 from n2vc.vca.cloud import Cloud as VcaCloud
50 from n2vc.vca.connection import Connection
51 from kubernetes.client.configuration import Configuration
52 from retrying_async import retry
53
54
# Label key used to tag RBAC ids. Not referenced in the visible part of this
# module; presumably consumed by K8s/RBAC helpers elsewhere (confirm).
RBAC_LABEL_KEY_NAME: str = "rbac-id"
56
57
58 class Libjuju:
def __init__(
    self,
    vca_connection: Connection,
    loop: asyncio.AbstractEventLoop = None,
    log: logging.Logger = None,
    n2vc: N2VCConnector = None,
):
    """
    Build a Libjuju helper bound to a single VCA connection.

    :param: vca_connection: n2vc.vca.connection object
    :param: loop: Asyncio loop (asyncio.get_event_loop() when omitted)
    :param: log: Logger (a logger named "Libjuju" when omitted)
    :param: n2vc: N2VC object, forwarded to JujuModelWatcher.wait_for
    """
    self.vca_connection = vca_connection
    self.n2vc = n2vc
    self.log = log if log else logging.getLogger("Libjuju")

    self.loop = loop if loop else asyncio.get_event_loop()
    # Unhandled exceptions raised in this loop go to self.handle_exception
    self.loop.set_exception_handler(self.handle_exception)
    # Serializes model creation inside add_model()
    self.creating_model = asyncio.Lock(loop=self.loop)

    # Only the default VCA gets a background health-check task
    if self.vca_connection.is_default:
        self.health_check_task = self._create_health_check_task()
85
def _create_health_check_task(self):
    """Schedule self.health_check() on self.loop and return the resulting task."""
    health_check_coroutine = self.health_check()
    return self.loop.create_task(health_check_coroutine)
88
async def get_controller(self, timeout: float = 60.0) -> Controller:
    """
    Get a connected controller for the VCA.

    :param: timeout: Time in seconds to wait for controller to connect

    :return: Connected juju.controller.Controller
    :raises JujuControllerFailedConnecting: when the connection fails; the
        original error is kept as the exception's __cause__
    """
    controller = None
    try:
        controller = Controller()
        await asyncio.wait_for(
            controller.connect(
                endpoint=self.vca_connection.data.endpoints,
                username=self.vca_connection.data.user,
                password=self.vca_connection.data.secret,
                cacert=self.vca_connection.data.cacert,
            ),
            timeout=timeout,
        )
        if self.vca_connection.is_default:
            # Store the controller's endpoints if it reports any we don't know
            endpoints = await controller.api_endpoints
            if not all(
                endpoint in self.vca_connection.endpoints for endpoint in endpoints
            ):
                await self.vca_connection.update_endpoints(endpoints)
        return controller
    except asyncio.CancelledError:
        # Propagate cancellation untouched instead of reporting it as a
        # connection failure
        raise
    except Exception as e:
        self.log.error(
            "Failed connecting to controller: {}... {}".format(
                self.vca_connection.data.endpoints, e
            )
        )
        if controller:
            await self.disconnect_controller(controller)
        raise JujuControllerFailedConnecting(e) from e
125
async def disconnect(self):
    """
    Disconnect: cancel the background health-check task, if there is one.

    The task is only created for the default VCA connection, so the
    attribute may be missing; that case must not raise AttributeError.
    """
    health_check_task = getattr(self, "health_check_task", None)
    if health_check_task is not None:
        health_check_task.cancel()
    self.log.debug("Libjuju disconnected!")
131
async def disconnect_model(self, model: Model):
    """
    Close the connection held by a model object.

    :param: model: Model whose connection is closed
    """
    disconnect_coroutine = model.disconnect()
    await disconnect_coroutine
139
async def disconnect_controller(self, controller: Controller):
    """
    Close a controller connection; a falsy controller (e.g. None) is ignored.

    :param: controller: Controller to disconnect
    """
    if not controller:
        return
    await controller.disconnect()
148
@retry(attempts=3, delay=5, timeout=None)
async def add_model(self, model_name: str, cloud: VcaCloud):
    """
    Create a model unless it already exists.

    Creation is serialized through self.creating_model, so concurrent calls
    on this instance do not try to create the same model at once. A Juju
    "already exists" API error is treated as success. The whole call is
    retried by the decorator (attempts=3, delay=5).

    :param: model_name: Model name
    :param: cloud: Cloud object (its name and credential_name are used)
    """
    controller = await self.get_controller()
    model = None
    try:
        # Acquiring the lock already waits for any in-flight creation, so
        # there is no need to poll `locked()` before entering.
        async with self.creating_model:
            if await self.model_exists(model_name, controller=controller):
                return
            self.log.debug("Creating model {}".format(model_name))
            model = await controller.add_model(
                model_name,
                config=self.vca_connection.data.model_config,
                cloud_name=cloud.name,
                credential_name=cloud.credential_name,
            )
    except juju.errors.JujuAPIError as e:
        if "already exists" not in e.message:
            raise
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
186
async def get_executed_actions(self, model_name: str) -> list:
    """
    Get executed/history of actions for a model.

    Every action name exposed by any application of the model is looked up;
    each execution found is returned with its output merged in.

    :param: model_name: Model name, str.
    :return: List of dicts with keys "id", "action" and "status", plus every
        key of the action output.
    :raises JujuError: if anything fails while collecting the actions
    """
    model = None
    executed_actions = []
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        # Gather the unique action names of all applications in the model
        actions = {}
        for application in model.applications:
            actions.update(await self.get_actions(application, model_name))
        for action_name in actions:
            status_by_id = await model.get_action_status(name=action_name)
            for action_id, action_status in status_by_id.items():
                executed_action = {
                    "id": action_id,
                    "action": action_name,
                    "status": action_status,
                }
                # Merge the output of this execution (looked up by id)
                action_output = await model.get_action_output(action_id)
                for key, value in action_output.items():
                    executed_action[key] = value
                executed_actions.append(executed_action)
    except Exception as e:
        raise JujuError(
            "Error in getting executed actions for model: {}. Error: {}".format(
                model_name, str(e)
            )
        ) from e
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return executed_actions
231
async def get_application_configs(
    self, model_name: str, application_name: str
) -> dict:
    """
    Get the configuration of an application.

    :param: model_name: Model name, str.
    :param: application_name: Application name, str.

    :return: The dict returned by the application's get_config()
    :raises JujuError: if the application does not exist or its config
        cannot be fetched
    """
    model = None
    application_configs = {}
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(
            model, application_name=application_name
        )
        if application is None:
            # Explicit error instead of a NoneType AttributeError message
            raise JujuApplicationNotFound(
                "Application {} not found".format(application_name)
            )
        application_configs = await application.get_config()
    except Exception as e:
        raise JujuError(
            "Error in getting configs for application: {} in model: {}. Error: {}".format(
                application_name, model_name, str(e)
            )
        ) from e
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return application_configs
263
@retry(attempts=3, delay=5)
async def get_model(self, controller: Controller, model_name: str) -> Model:
    """
    Look a model up through the given controller.

    Failures are retried by the decorator (attempts=3, delay=5).

    :param: controller: Controller used for the lookup
    :param: model_name: Model name

    :return: Model: the Juju model object
    """
    model = await controller.get_model(model_name)
    return model
275
async def model_exists(
    self, model_name: str, controller: Controller = None
) -> bool:
    """
    Tell whether a model with the given name exists.

    :param: model_name: Model name
    :param: controller: Controller to query; when not given, a temporary
        connection is opened and closed again before returning

    :return bool
    """
    owns_controller = not controller
    if owns_controller:
        controller = await self.get_controller()

    try:
        existing_models = await controller.list_models()
        return model_name in existing_models
    finally:
        if owns_controller:
            await self.disconnect_controller(controller)
300
async def models_exist(self, model_names: [str]) -> (bool, list):
    """
    Check if models exist

    :param: model_names: Non-empty list of model names

    :return (bool, list[str]): (True if all models exist, names of the models
        that don't exist, de-duplicated and in the order they were given)
    :raises Exception: if model_names is empty
    """
    if not model_names:
        raise Exception(
            "model_names must be a non-empty array. Given value: {}".format(
                model_names
            )
        )
    existing_models = set(await self.list_models())
    # dict.fromkeys de-duplicates while keeping the caller's order, so the
    # result is deterministic (a set difference has arbitrary order)
    non_existing_models = list(
        dict.fromkeys(name for name in model_names if name not in existing_models)
    )

    return (
        len(non_existing_models) == 0,
        non_existing_models,
    )
324
async def get_model_status(self, model_name: str) -> FullStatus:
    """
    Get model status

    :param: model_name: Model name

    :return: Full status object
    """
    model = None
    controller = await self.get_controller()
    try:
        # Fetched inside the try so the controller is released even when
        # the model cannot be obtained
        model = await self.get_model(controller, model_name)
        return await model.get_status()
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
340
async def create_machine(
    self,
    model_name: str,
    machine_id: str = None,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    series: str = "bionic",
    wait: bool = True,
) -> (Machine, bool):
    """
    Create machine

    When machine_id is given the existing machine is returned; otherwise a
    new machine is added (and, if wait is True, waited for).

    :param: model_name: Model name
    :param: machine_id: Machine id
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: series: Series of the machine (xenial, bionic, focal, ...)
    :param: wait: Wait until machine is ready

    :return: (juju.machine.Machine, bool): Machine object and a boolean saying
             if the machine is new or it already existed
    :raises JujuMachineNotFound: if machine_id is given but not in the model
    """
    new = False
    machine = None

    self.log.debug(
        "Creating machine (id={}) in model: {}".format(machine_id, model_name)
    )

    controller = await self.get_controller()
    model = None
    try:
        # Fetched inside the try so the controller is always released
        model = await self.get_model(controller, model_name)

        if machine_id is not None:
            self.log.debug(
                "Searching machine (id={}) in model {}".format(
                    machine_id, model_name
                )
            )

            # Get machines from model and get the machine with machine_id if exists
            machines = await model.get_machines()
            if machine_id not in machines:
                raise JujuMachineNotFound("Machine {} not found".format(machine_id))
            self.log.debug(
                "Machine (id={}) found in model {}".format(machine_id, model_name)
            )
            machine = machines[machine_id]

        if machine is None:
            self.log.debug("Creating a new machine in model {}".format(model_name))

            machine = await model.add_machine(
                spec=None, constraints=None, disks=None, series=series
            )
            new = True

            if wait:
                # Only announce the wait when we actually wait
                self.log.debug(
                    "Wait until machine {} is ready in model {}".format(
                        machine.entity_id, model_name
                    )
                )
                await JujuModelWatcher.wait_for(
                    model=model,
                    entity=machine,
                    progress_timeout=progress_timeout,
                    total_timeout=total_timeout,
                    db_dict=db_dict,
                    n2vc=self.n2vc,
                    vca_id=self.vca_connection._vca_id,
                )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    self.log.debug(
        "Machine {} ready at {} in model {}".format(
            machine.entity_id, machine.dns_name, model_name
        )
    )
    return machine, new
432
async def provision_machine(
    self,
    model_name: str,
    hostname: str,
    username: str,
    private_key_path: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
) -> str:
    """
    Manually provisioning of a machine

    The host is prepared over SSH, registered in the model, the Juju agent
    installation is started in the background, and the call then waits
    until the machine shows up in the model and becomes ready.

    :param: model_name: Model name
    :param: hostname: IP to access the machine
    :param: username: Username to login to the machine
    :param: private_key_path: Local path for the private key
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active

    :return: (Entity): Machine id
    :raises ValueError: if Juju reports an error adding the machine
    :raises JujuMachineNotFound: if the machine never appears in the model
    """
    self.log.debug(
        "Provisioning machine. model: {}, hostname: {}, username: {}".format(
            model_name, hostname, username
        )
    )

    controller = await self.get_controller()
    model = None
    try:
        # Fetched inside the try so the controller is always released
        model = await self.get_model(controller, model_name)

        provisioner = AsyncSSHProvisioner(
            host=hostname,
            user=username,
            private_key_path=private_key_path,
            log=self.log,
        )

        params = await provisioner.provision_machine()
        params.jobs = ["JobHostUnits"]

        self.log.debug("Adding machine to model")
        connection = model.connection()
        client_facade = client.ClientFacade.from_connection(connection)

        results = await client_facade.AddMachines(params=[params])
        error = results.machines[0].error
        if error:
            msg = "Error adding machine: {}".format(error.message)
            self.log.error(msg=msg)
            raise ValueError(msg)

        machine_id = results.machines[0].machine

        self.log.debug("Installing Juju agent into machine {}".format(machine_id))
        # Runs in the background; readiness is observed through wait_for below
        asyncio.ensure_future(
            provisioner.install_agent(
                connection=connection,
                nonce=params.nonce,
                machine_id=machine_id,
                proxy=self.vca_connection.data.api_proxy,
                series=params.series,
            )
        )

        # Poll (10 tries, 2 s apart) until the machine is visible in the model
        machine = None
        for _ in range(10):
            machine_list = await model.get_machines()
            if machine_id in machine_list:
                self.log.debug("Machine {} found in model!".format(machine_id))
                machine = model.machines.get(machine_id)
                break
            await asyncio.sleep(2)

        if machine is None:
            msg = "Machine {} not found in model".format(machine_id)
            self.log.error(msg=msg)
            raise JujuMachineNotFound(msg)

        self.log.debug(
            "Wait until machine {} is ready in model {}".format(
                machine.entity_id, model_name
            )
        )
        await JujuModelWatcher.wait_for(
            model=model,
            entity=machine,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    self.log.debug(
        "Machine provisioned {} in model {}".format(machine_id, model_name)
    )

    return machine_id
546
async def deploy(
    self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
):
    """
    Deploy bundle or charm: Similar to the juju CLI command `juju deploy`

    The deployment is done with trust=True.

    :param: uri: Path or Charm Store uri in which the charm or bundle can be found
    :param: model_name: Model name
    :param: wait: Indicates whether to wait or not until all applications are active
    :param: timeout: Time in seconds to wait until all applications are active
    """
    controller = await self.get_controller()
    model = None
    try:
        # Fetched inside the try so the controller is always released
        model = await self.get_model(controller, model_name)
        await model.deploy(uri, trust=True)
        if wait:
            await JujuModelWatcher.wait_for_model(model, timeout=timeout)
            self.log.debug("All units active in model {}".format(model_name))
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
568
async def add_unit(
    self,
    application_name: str,
    model_name: str,
    machine_id: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
):
    """
    Add a unit of an existing application on a given machine and wait for it.

    :param: application_name: Application name
    :param: model_name: Model name
    :param: machine_id Machine id that will host the unit
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active

    :return: None
    :raises JujuApplicationNotFound: if the application is not in the model
    :raises JujuMachineNotFound: if machine_id is not in the model
    """
    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)
        if application is None:
            raise JujuApplicationNotFound(
                "Application {} not exists".format(application_name)
            )

        # Called for its validation only: it raises when the machine is not
        # part of the model
        _machine, _series = self._get_machine_info(model, machine_id)

        self.log.debug(
            "Adding unit (machine {}) to application {} in model ~{}".format(
                machine_id, application_name, model_name
            )
        )
        await application.add_unit(to=machine_id)

        await JujuModelWatcher.wait_for(
            model=model,
            entity=application,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )
        self.log.debug(
            "Unit is added to application {} in model {}".format(
                application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
632
async def destroy_unit(
    self,
    application_name: str,
    model_name: str,
    machine_id: str,
    total_timeout: float = None,
):
    """Destroy unit

    Destroys the unit of the application placed on machine_id, then polls
    every 5 seconds until the unit is gone or total_timeout expires. On
    timeout a warning is logged (no exception is raised).

    :param: application_name: Application name
    :param: model_name: Model name
    :param: machine_id Machine id
    :param: total_timeout: Timeout in seconds to wait for the unit to
        disappear (3600 when None)

    :return: None
    :raises JujuApplicationNotFound: if the application is not in the model
    :raises JujuError: if no unit of the application is on machine_id
    """

    model = None
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)

        if application is None:
            raise JujuApplicationNotFound(
                "Application not found: {} (model={})".format(
                    application_name, model_name
                )
            )

        unit = self._get_unit(application, machine_id)
        if not unit:
            raise JujuError(
                "A unit with machine id {} not in available units".format(
                    machine_id
                )
            )

        unit_name = unit.name

        self.log.debug(
            "Destroying unit {} from application {} in model {}".format(
                unit_name, application_name, model_name
            )
        )
        await application.destroy_unit(unit_name)

        self.log.debug(
            "Waiting for unit {} to be destroyed in application {} (model={})...".format(
                unit_name, application_name, model_name
            )
        )

        # TODO: Add functionality in the Juju watcher to replace this kind of blocks
        if total_timeout is None:
            total_timeout = 3600
        end = time.time() + total_timeout
        while time.time() < end:
            if not self._get_unit(application, machine_id):
                self.log.debug(
                    "The unit {} was destroyed in application {} (model={}) ".format(
                        unit_name, application_name, model_name
                    )
                )
                return
            await asyncio.sleep(5)
        # Only reached when the unit is still present after total_timeout;
        # previously this path logged a (false) success message.
        self.log.warning(
            "Timeout waiting for unit {} to be destroyed from application {} in model {}".format(
                unit_name, application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
708
async def deploy_charm(
    self,
    application_name: str,
    path: str,
    model_name: str,
    machine_id: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    config: dict = None,
    series: str = None,
    num_units: int = 1,
):
    """Deploy charm

    The first unit goes to machine_id; every extra unit (num_units > 1) is
    placed on a newly created machine. When machine_id is given, the series
    of that machine overrides the series argument.

    :param: application_name: Application name
    :param: path: Local path to the charm
    :param: model_name: Model name
    :param: machine_id ID of the machine
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: config: Config for the charm
    :param: series: Series of the charm
    :param: num_units: Number of units

    :return: (juju.application.Application): Juju application
    :raises JujuApplicationExists: if the application is already deployed
    """
    self.log.debug(
        "Deploying charm {} to machine {} in model ~{}".format(
            application_name, machine_id, model_name
        )
    )
    self.log.debug("charm: {}".format(path))

    controller = await self.get_controller()
    model = None
    try:
        # Fetched inside the try so the controller is always released
        model = await self.get_model(controller, model_name)

        if application_name in model.applications:
            raise JujuApplicationExists(
                "Application {} exists".format(application_name)
            )

        if machine_id is not None:
            _machine, series = self._get_machine_info(model, machine_id)

        application = await model.deploy(
            entity_url=path,
            application_name=application_name,
            channel="stable",
            num_units=1,
            series=series,
            to=machine_id,
            config=config,
        )

        self.log.debug(
            "Wait until application {} is ready in model {}".format(
                application_name, model_name
            )
        )
        if num_units > 1:
            for _ in range(num_units - 1):
                m, _ = await self.create_machine(model_name, wait=False)
                await application.add_unit(to=m.entity_id)

        await JujuModelWatcher.wait_for(
            model=model,
            entity=application,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )
        self.log.debug(
            "Application {} is ready in model {}".format(
                application_name, model_name
            )
        )
    except juju.errors.JujuError as e:
        if "already exists" in e.message:
            raise JujuApplicationExists(
                "Application {} exists".format(application_name)
            ) from e
        raise
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return application
806
async def scale_application(
    self,
    model_name: str,
    application_name: str,
    scale: int = 1,
    total_timeout: float = None,
):
    """
    Scale application (K8s)

    Requests the new scale, polls every 5 seconds until the unit count
    matches, then waits for the model to settle.

    :param: model_name: Model name
    :param: application_name: Application name
    :param: scale: Scale to which to set this application
    :param: total_timeout: Timeout for the entity to be active (1800 if None)
    :raises JujuApplicationNotFound: if the application is not in the model
    :raises Exception: if the unit count does not reach scale in time
    """
    model = None
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)

        self.log.debug(
            "Scaling application {} in model {}".format(application_name, model_name)
        )
        application = self._get_application(model, application_name)
        if application is None:
            raise JujuApplicationNotFound("Cannot scale application")
        await application.scale(scale=scale)

        self.log.debug(
            "Waiting for application {} to be scaled in model {}...".format(
                application_name, model_name
            )
        )
        timeout = 1800 if total_timeout is None else total_timeout
        deadline = time.time() + timeout
        while time.time() < deadline:
            # Juju applies the new scale with some delay, so only hand over to
            # wait_for_model once the unit count equals the requested scale.
            if self._get_application_count(model, application_name) == scale:
                await JujuModelWatcher.wait_for_model(model=model, timeout=timeout)
                self.log.debug(
                    "Application {} is scaled in model {}".format(
                        application_name, model_name
                    )
                )
                return
            await asyncio.sleep(5)

        raise Exception(
            "Timeout waiting for application {} in model {} to be scaled".format(
                application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
871
def _get_application_count(self, model: Model, application_name: str) -> int:
    """Count the units of an application.

    :param: model: Model object
    :param: application_name: Application name

    :return: int, or None when the application is not in the model
    """
    application = self._get_application(model, application_name)
    if application is None:
        return None
    return len(application.units)
883
def _get_application(self, model: Model, application_name: str) -> Application:
    """Look an application up in a model.

    :param: model: Model object
    :param: application_name: Application name

    :return: juju.application.Application, or None if the model has no
        applications or none with that name
    """
    applications = model.applications
    if not applications or application_name not in applications:
        return None
    return applications[application_name]
894
def _get_unit(self, application: Application, machine_id: str) -> Unit:
    """Find the unit of an application placed on a given machine.

    :param: application: Application object
    :param: machine_id: Machine id

    :return: The first Unit whose machine_id matches, or None
    """
    return next(
        (candidate for candidate in application.units if candidate.machine_id == machine_id),
        None,
    )
909
def _get_machine_info(
    self,
    model,
    machine_id: str,
) -> (str, str):
    """Return a machine of the model together with its series.

    :param: model: Model object
    :param: machine_id: Machine id

    :return: (str, str): (machine, series)
    :raises JujuMachineNotFound: if machine_id is not in the model
    """
    machines = model.machines
    if machine_id in machines:
        machine = machines[machine_id]
        return machine, machine.series
    msg = "Machine {} not found in model".format(machine_id)
    self.log.error(msg=msg)
    raise JujuMachineNotFound(msg)
928
async def execute_action(
    self,
    application_name: str,
    model_name: str,
    action_name: str,
    db_dict: dict = None,
    machine_id: str = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    **kwargs,
):
    """Execute action

    Runs the action on the leader unit (machine_id is None) or on the unit
    placed on machine_id, waits for it and returns its output and status.

    :param: application_name: Application name
    :param: model_name: Model name
    :param: action_name: Name of the action
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: machine_id Machine id
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: kwargs: Parameters passed to the action

    :return: (str, str): (output and status); status is "failed" when Juju
        reports no status for the action
    :raises JujuApplicationNotFound: if the application is not in the model
    :raises JujuError: if no unit is placed on machine_id
    :raises JujuActionNotFound: if the application has no such action
    """
    self.log.debug(
        "Executing action {} using params {}".format(action_name, kwargs)
    )
    controller = await self.get_controller()
    model = None
    try:
        # Fetched inside the try so the controller is always released
        model = await self.get_model(controller, model_name)

        application = self._get_application(
            model,
            application_name=application_name,
        )
        if application is None:
            raise JujuApplicationNotFound("Cannot execute action")
        # Race condition:
        # Occasionally, self._get_leader_unit() will return None
        # because the leader elected hook has not been triggered yet.
        # Therefore, we are doing some retries. If it happens again,
        # re-open bug 1236
        if machine_id is None:
            unit = await self._get_leader_unit(application)
            self.log.debug(
                "Action {} is being executed on the leader unit {}".format(
                    action_name, unit.name
                )
            )
        else:
            unit = self._get_unit(application, machine_id)
            if not unit:
                raise JujuError(
                    "A unit with machine id {} not in available units".format(
                        machine_id
                    )
                )
            self.log.debug(
                "Action {} is being executed on {} unit".format(
                    action_name, unit.name
                )
            )

        actions = await application.get_actions()

        if action_name not in actions:
            raise JujuActionNotFound(
                "Action {} not in available actions".format(action_name)
            )

        action = await unit.run_action(action_name, **kwargs)

        self.log.debug(
            "Wait until action {} is completed in application {} (model={})".format(
                action_name, application_name, model_name
            )
        )
        await JujuModelWatcher.wait_for(
            model=model,
            entity=action,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )

        output = await model.get_action_output(action_uuid=action.entity_id)
        status = await model.get_action_status(uuid_or_prefix=action.entity_id)
        status = (
            status[action.entity_id] if action.entity_id in status else "failed"
        )

        # Log the status that is actually returned to the caller
        self.log.debug(
            "Action {} completed with status {} in application {} (model={})".format(
                action_name, status, application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return output, status
1035
async def get_actions(self, application_name: str, model_name: str) -> dict:
    """Get list of actions

    :param: application_name: Application name
    :param: model_name: Model name

    :return: Dict with this format
        {
            "action_name": "Description of the action",
            ...
        }
    :raises JujuApplicationNotFound: if the application is not in the model
    """
    self.log.debug(
        "Getting list of actions for application {}".format(application_name)
    )

    controller = await self.get_controller()
    model = None
    try:
        # Fetched inside the try so the controller is always released
        model = await self.get_model(controller, model_name)

        application = self._get_application(
            model,
            application_name=application_name,
        )
        if application is None:
            # Explicit error instead of a NoneType AttributeError
            raise JujuApplicationNotFound(
                "Application {} not found in model {}".format(
                    application_name, model_name
                )
            )

        return await application.get_actions()

    finally:
        # Disconnect from model and controller
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1072
async def get_metrics(self, model_name: str, application_name: str) -> dict:
    """Get the metrics collected by the VCA.

    :param model_name The name or unique id of the network service
    :param application_name The name of the application

    :return: dict of metrics; empty when the application is not in the model
    :raises Exception: if model_name or application_name is empty
    """
    if not model_name or not application_name:
        raise Exception("model_name and application_name must be non-empty strings")
    metrics = {}
    controller = await self.get_controller()
    model = None
    try:
        # Fetched inside the try so the controller is always released
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)
        if application is not None:
            metrics = await application.get_metrics()
    finally:
        # Both helpers are coroutines: without `await` they never run and
        # the connections are leaked.
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return metrics
1092
async def add_relation(
    self,
    model_name: str,
    endpoint_1: str,
    endpoint_2: str,
):
    """Add relation

    "not found" and "already exists" API errors are logged as warnings and
    ignored; any other API error is re-raised.

    :param: model_name: Model name
    :param: endpoint_1 First endpoint name
                       ("app:endpoint" format or directly the saas name)
    :param: endpoint_2: Second endpoint name (^ same format)
    """

    self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))

    controller = await self.get_controller()
    model = None
    try:
        # Fetched inside the outer try so the controller is always released;
        # kept outside the inner try so model lookup errors are not mistaken
        # for relation errors.
        model = await self.get_model(controller, model_name)
        try:
            await model.add_relation(endpoint_1, endpoint_2)
        except juju.errors.JujuAPIError as e:
            if "not found" in e.message:
                self.log.warning("Relation not found: {}".format(e.message))
                return
            if "already exists" in e.message:
                self.log.warning("Relation already exists: {}".format(e.message))
                return
            # another exception, raise it
            raise
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1130
async def consume(
    self,
    offer_url: str,
    model_name: str,
):
    """
    Adds a remote offer to the model. Relations can be created later using "juju relate".

    :param: offer_url: Offer Url
    :param: model_name: Model name

    :raises ParseError if there's a problem parsing the offer_url
    :raises JujuError if remote offer includes and endpoint
    :raises JujuAPIError if the operation is not successful
    """
    controller = await self.get_controller()
    model = None
    try:
        # Fetched inside the try (and through self.get_model, like the rest
        # of the class) so the controller is released even if this fails.
        model = await self.get_model(controller, model_name)
        await model.consume(offer_url)
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1154
async def destroy_model(self, model_name: str, total_timeout: float = 1800):
    """
    Destroy model

    Does nothing if the model does not exist. Any failure during destruction
    is ignored if, afterwards, the model no longer exists.

    :param: model_name: Model name
    :param: total_timeout: Timeout
    """

    controller = await self.get_controller()
    model = None
    try:
        if not await self.model_exists(model_name, controller=controller):
            return

        self.log.debug("Destroying model {}".format(model_name))

        model = await self.get_model(controller, model_name)
        # Destroy machines that are manually provisioned
        # and still are in pending state
        await self._destroy_pending_machines(model, only_manual=True)
        # The model connection is not needed for the destroy loop: release it
        # now and clear the reference so `finally` doesn't disconnect it again.
        await self.disconnect_model(model)
        model = None

        await self._destroy_model(
            model_name,
            controller,
            timeout=total_timeout,
        )
    except Exception:
        # The failure is irrelevant if the model is gone anyway
        if not await self.model_exists(model_name, controller=controller):
            return
        raise
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1190
async def _destroy_model(
    self, model_name: str, controller: Controller, timeout: float = 1800
):
    """
    Destroy a model through the controller and wait until it is gone

    destroy_model (with destroy_storage and force) is re-issued every 5
    seconds for as long as the model still exists.

    :param: model_name: Name of the model to be removed
    :param: controller: Controller object
    :param: timeout: Timeout in seconds

    :raises Exception: if the model still exists after `timeout` seconds
    :raises JujuError: on controller errors, except those reporting that the
                       model "has been removed"
    """

    async def _keep_destroying():
        while await self.model_exists(model_name, controller=controller):
            await controller.destroy_model(
                model_name, destroy_storage=True, force=True, max_wait=0
            )
            await asyncio.sleep(5)

    try:
        await asyncio.wait_for(_keep_destroying(), timeout=timeout)
    except asyncio.TimeoutError:
        raise Exception(
            "Timeout waiting for model {} to be destroyed".format(model_name)
        )
    except juju.errors.JujuError as err:
        # An error saying the model was already removed means we are done
        already_removed = any("has been removed" in msg for msg in err.errors)
        if not already_removed:
            raise err
1221
async def destroy_application(
    self, model_name: str, application_name: str, total_timeout: float
):
    """
    Destroy application

    Requests destruction of the application (a warning is logged if it is not
    found) and then polls every 5 seconds until it is no longer in the model.

    :param: model_name: Model name
    :param: application_name: Application name
    :param: total_timeout: Timeout in seconds; None means 3600

    :raises Exception: if the application still exists after total_timeout
    """

    controller = await self.get_controller()
    model = None

    try:
        model = await self.get_model(controller, model_name)
        self.log.debug(
            "Destroying application {} in model {}".format(
                application_name, model_name
            )
        )
        application = self._get_application(model, application_name)
        if application:
            await application.destroy()
        else:
            self.log.warning("Application not found: {}".format(application_name))

        self.log.debug(
            "Waiting for application {} to be destroyed in model {}...".format(
                application_name, model_name
            )
        )
        if total_timeout is None:
            total_timeout = 3600
        # Monotonic clock: the deadline is not affected by wall-clock changes
        end = time.monotonic() + total_timeout
        while time.monotonic() < end:
            if not self._get_application(model, application_name):
                self.log.debug(
                    "The application {} was destroyed in model {} ".format(
                        application_name, model_name
                    )
                )
                return
            await asyncio.sleep(5)
        raise Exception(
            "Timeout waiting for application {} to be destroyed in model {}".format(
                application_name, model_name
            )
        )
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1275
async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
    """
    Destroy pending machines in a given model

    :param: model: Model whose pending machines are destroyed (with force=True)
    :param: only_manual: Bool that indicates only manually provisioned
                         machines should be destroyed (if True), or that
                         all pending machines should be destroyed
    """
    status = await model.get_status()
    for machine_id in status.machines:
        machine_status = status.machines[machine_id]
        if machine_status.agent_status.status != "pending":
            continue
        # Manually provisioned machines are recognised by a "manual:" instance
        # id prefix. Non-manual machines are skipped with `continue` (not
        # `break`) so pending manual machines listed after them are still
        # destroyed. `or ""` guards against a missing instance id.
        instance_id = machine_status.instance_id or ""
        if only_manual and not instance_id.startswith("manual:"):
            continue
        machine = model.machines[machine_id]
        await machine.destroy(force=True)
1292
async def configure_application(
    self, model_name: str, application_name: str, config: dict = None
):
    """Configure application

    Nothing is done (no connection is opened) when config is empty or None.

    :param: model_name: Model name
    :param: application_name: Application name
    :param: config: Config to apply to the charm

    :raises JujuApplicationNotFound: if the application is not in the model
    """
    self.log.debug("Configuring application {}".format(application_name))

    if not config:
        return

    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(
            model,
            application_name=application_name,
        )
        # _get_application returns None for unknown applications; fail with a
        # meaningful error instead of an AttributeError on None.set_config
        if application is None:
            raise JujuApplicationNotFound(
                "Cannot configure application {}".format(application_name)
            )
        await application.set_config(config)
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1318
def handle_exception(self, loop, context):
    """Handle exceptions left unhandled by libjuju.

    Deliberately a no-op: both arguments are ignored. The (loop, context)
    signature presumably matches an asyncio loop exception handler.

    :param: loop: Event loop where the exception happened
    :param: context: Exception context
    """
    return None
1322
async def health_check(self, interval: float = 300.0):
    """
    Health check to make sure controller and controller_model connections are OK

    Runs forever: every `interval` seconds a controller connection is opened
    and closed again. Failures are logged and do not stop the loop.

    :param: interval: Time in seconds between checks
    """
    while True:
        # Reset on every iteration: if get_controller fails, the finally
        # clause must not disconnect the previous iteration's controller again
        controller = None
        try:
            controller = await self.get_controller()
        except Exception as e:
            self.log.error("Health check to VCA failed: {}".format(e))
        finally:
            await self.disconnect_controller(controller)
        await asyncio.sleep(interval)
1339
async def list_models(self, contains: str = None) -> [str]:
    """List the names of the models in the controller

    :param: contains: If given (non-empty), only model names containing
                      this substring are returned

    :return: [models] List of model names
    """
    controller = await self.get_controller()
    try:
        names = await controller.list_models()
        if not contains:
            return names
        return list(filter(lambda name: contains in name, names))
    finally:
        await self.disconnect_controller(controller)
1356
async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
    """List the application offers of a model

    :param: model_name: Model name

    :return: Result of controller.list_offers(model_name)
    """
    controller = await self.get_controller()
    try:
        offers = await controller.list_offers(model_name)
    finally:
        await self.disconnect_controller(controller)
    return offers
1370
async def add_k8s(
    self,
    name: str,
    rbac_id: str,
    token: str,
    client_cert_data: str,
    configuration: Configuration,
    storage_class: str,
    credential_name: str = None,
):
    """
    Add a Kubernetes cloud to the controller

    Similar to the `juju add-k8s` command in the CLI

    :param: name: Name for the K8s cloud
    :param: rbac_id: Value stored in the credential attrs under RBAC_LABEL_KEY_NAME
    :param: token: Token used to build the cloud credential
    :param: client_cert_data: Client certificate data, used for the credential
                              and as the cloud CA certificate
    :param: configuration: Kubernetes configuration object (its host is the
                           cloud endpoint)
    :param: storage_class: Storage class used for operator and workload storage
    :param: credential_name: Credential name; the cloud name is used if not set

    :raises Exception: if storage_class, name or configuration are empty

    :return: Result of self.add_cloud()
    """
    mandatory_args = (
        (storage_class, "storage_class must be a non-empty string"),
        (name, "name must be a non-empty string"),
        (configuration, "configuration must be provided"),
    )
    for value, error_message in mandatory_args:
        if not value:
            raise Exception(error_message)

    cloud_endpoint = configuration.host
    k8s_credential = self.get_k8s_cloud_credential(
        configuration, client_cert_data, token
    )
    k8s_credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id

    storage_config = {
        "operator-storage": storage_class,
        "workload-storage": storage_class,
    }
    k8s_cloud = client.Cloud(
        type_="kubernetes",
        auth_types=[k8s_credential.auth_type],
        endpoint=cloud_endpoint,
        ca_certificates=[client_cert_data],
        config=storage_config,
    )

    return await self.add_cloud(
        name, k8s_cloud, k8s_credential, credential_name=credential_name
    )
1420
def get_k8s_cloud_credential(
    self,
    configuration: Configuration,
    client_cert_data: str,
    token: str = None,
) -> client.CloudCredential:
    """
    Build the Juju cloud credential for a Kubernetes cloud

    The auth type is chosen from the data provided:
      - key set: "oauth2", or "oauth2withcert" if client_cert_data is set;
        a token is then mandatory (currently unreachable: key is None)
      - configuration.username set: "userpasswithcert" if client_cert_data
        is set, otherwise "userpass"
      - client_cert_data and token set: "certificate"

    :param: configuration: Kubernetes configuration object; its username and
                           password attributes are read
    :param: client_cert_data: Client certificate data (stored under
                              "ClientCertificateData")
    :param: token: Token (stored under "Token")

    :raises JujuInvalidK8sConfiguration: if token is combined with
        username/password, if no supported authentication method matches,
        or (key-based auth) if the token is missing

    :return: client.CloudCredential with the selected auth_type and attrs
    """
    attrs = {}
    # TODO: Test with AKS
    key = None  # open(configuration.key_file, "r").read()
    username = configuration.username
    password = configuration.password

    if client_cert_data:
        attrs["ClientCertificateData"] = client_cert_data
    if key:
        attrs["ClientKeyData"] = key
    if token:
        # Token auth and user/password auth are mutually exclusive
        if username or password:
            raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
        attrs["Token"] = token

    auth_type = None
    # NOTE: key is hard-coded to None above, so this oauth2 branch is
    # currently never taken.
    if key:
        auth_type = "oauth2"
        if client_cert_data:
            auth_type = "oauth2withcert"
        if not token:
            raise JujuInvalidK8sConfiguration(
                "missing token for auth type {}".format(auth_type)
            )
    elif username:
        # An empty password is allowed, only logged
        if not password:
            self.log.debug(
                "credential for user {} has empty password".format(username)
            )
        attrs["username"] = username
        attrs["password"] = password
        if client_cert_data:
            auth_type = "userpasswithcert"
        else:
            auth_type = "userpass"
    elif client_cert_data and token:
        auth_type = "certificate"
    else:
        raise JujuInvalidK8sConfiguration("authentication method not supported")
    return client.CloudCredential(auth_type=auth_type, attrs=attrs)
1467
async def add_cloud(
    self,
    name: str,
    cloud: Cloud,
    credential: CloudCredential = None,
    credential_name: str = None,
) -> Cloud:
    """
    Add a cloud (and optionally its credential) to the controller

    :param: name: Name of the cloud to be added
    :param: cloud: Cloud object
    :param: credential: CloudCredentials object for the cloud
    :param: credential_name: Credential name.
                             If not defined, the cloud name will be used.

    :return: The `cloud` argument itself
    """
    controller = await self.get_controller()
    try:
        await controller.add_cloud(name, cloud)
        if credential:
            await controller.add_credential(
                credential_name or name, credential=credential, cloud=name
            )
    finally:
        await self.disconnect_controller(controller)
    # The object returned by controller.add_cloud() should be returned, but the
    # original value is used until this bug is fixed:
    # https://github.com/juju/python-libjuju/issues/443
    return cloud
1497
async def remove_cloud(self, name: str):
    """
    Remove a cloud from the controller

    A single 'cloud "<name>" not found' error is only logged as a warning.

    :param: name: Name of the cloud to be removed

    :raises JujuError: for any other error returned by the controller
    """
    controller = await self.get_controller()
    try:
        await controller.remove_cloud(name)
    except juju.errors.JujuError as err:
        cloud_missing = (
            len(err.errors) == 1 and err.errors[0] == f'cloud "{name}" not found'
        )
        if not cloud_missing:
            raise err
        self.log.warning(f"Cloud {name} not found, so it could not be deleted.")
    finally:
        await self.disconnect_controller(controller)
1514
@retry(attempts=20, delay=5, fallback=JujuLeaderUnitNotFound())
async def _get_leader_unit(self, application: Application) -> Unit:
    """
    Find the leader unit of an application

    The units are checked in order with is_leader_from_status() and the first
    one reporting leadership is returned. If none does, a bare Exception is
    raised so that the @retry decorator (attempts=20, delay=5) calls this
    again. Once the attempts are exhausted, the decorator's fallback (a
    JujuLeaderUnitNotFound instance) applies; presumably it is raised to the
    caller. TODO confirm against retrying_async.

    :param: application: Application whose leader unit is searched

    :return: Unit: the leader unit
    """
    unit = None
    for u in application.units:
        if await u.is_leader_from_status():
            unit = u
            break
    if not unit:
        # No leader found (yet): raise to trigger another retry attempt
        raise Exception()
    return unit
1525
async def get_cloud_credentials(self, cloud: Cloud) -> typing.List:
    """
    Get the credentials of a cloud

    The credential tag is built from the cloud name, the VCA connection user
    and the cloud's credential name, and queried through the Cloud facade.

    :param: cloud: Cloud object. The returned credentials will be from this cloud.

    :return: List of credentials object associated to the specified cloud
    """
    controller = await self.get_controller()
    try:
        cloud_facade = client.CloudFacade.from_connection(controller.connection())
        credential_tag = tag.credential(
            cloud.name, self.vca_connection.data.user, cloud.credential_name
        )
        response = await cloud_facade.Credential([client.Entity(credential_tag)])
        return response.results
    finally:
        await self.disconnect_controller(controller)
1545
async def check_application_exists(self, model_name, application_name) -> bool:
    """Tell whether an application is present in a model

    :param: model_name: Model Name
    :param: application_name: Application Name

    :return: True if the application is found in the model, False otherwise
    """
    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        self.log.debug(
            "Checking if application {} exists in model {}".format(
                application_name, model_name
            )
        )
        found = self._get_application(model, application_name)
        return found is not None
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)