Fix black issues
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17 import typing
18
19 import time
20
21 import juju.errors
22 from juju.model import Model
23 from juju.machine import Machine
24 from juju.application import Application
25 from juju.unit import Unit
26 from juju.client._definitions import (
27 FullStatus,
28 QueryApplicationOffersResults,
29 Cloud,
30 CloudCredential,
31 )
32 from juju.controller import Controller
33 from juju.client import client
34 from juju import tag
35
36 from n2vc.juju_watcher import JujuModelWatcher
37 from n2vc.provisioner import AsyncSSHProvisioner
38 from n2vc.n2vc_conn import N2VCConnector
39 from n2vc.exceptions import (
40 JujuMachineNotFound,
41 JujuApplicationNotFound,
42 JujuLeaderUnitNotFound,
43 JujuActionNotFound,
44 JujuControllerFailedConnecting,
45 JujuApplicationExists,
46 JujuInvalidK8sConfiguration,
47 JujuError,
48 )
49 from n2vc.vca.cloud import Cloud as VcaCloud
50 from n2vc.vca.connection import Connection
51 from kubernetes.client.configuration import Configuration
52 from retrying_async import retry
53
54
55 RBAC_LABEL_KEY_NAME = "rbac-id"
56
57
58 class Libjuju:
def __init__(
    self,
    vca_connection: Connection,
    loop: asyncio.AbstractEventLoop = None,
    log: logging.Logger = None,
    n2vc: N2VCConnector = None,
):
    """
    Constructor

    :param: vca_connection: n2vc.vca.connection object
    :param: loop: Asyncio loop (``asyncio.get_event_loop()`` when omitted)
    :param: log: Logger (the "Libjuju" logger when omitted)
    :param: n2vc: N2VC object
    """

    self.log = log or logging.getLogger("Libjuju")
    self.n2vc = n2vc
    self.vca_connection = vca_connection

    self.loop = loop or asyncio.get_event_loop()
    self.loop.set_exception_handler(self.handle_exception)
    try:
        self.creating_model = asyncio.Lock(loop=self.loop)
    except TypeError:
        # Python >= 3.10 removed the ``loop`` argument of asyncio.Lock
        self.creating_model = asyncio.Lock()

    # Always define the attribute so that code reading it does not depend
    # on whether the connection is the default one
    self.health_check_task = None
    if self.vca_connection.is_default:
        self.health_check_task = self._create_health_check_task()
85
86 def _create_health_check_task(self):
87 return self.loop.create_task(self.health_check())
88
async def get_controller(self, timeout: float = 60.0) -> Controller:
    """
    Connect to the controller described by the VCA connection data

    :param: timeout: Time in seconds to wait for controller to connect

    :return: The connected juju.controller.Controller
    :raises: JujuControllerFailedConnecting when the connection fails for
             any reason other than a cancellation
    """
    controller = None
    try:
        controller = Controller()
        connect_coro = controller.connect(
            endpoint=self.vca_connection.data.endpoints,
            username=self.vca_connection.data.user,
            password=self.vca_connection.data.secret,
            cacert=self.vca_connection.data.cacert,
        )
        await asyncio.wait_for(connect_coro, timeout=timeout)
        if self.vca_connection.is_default:
            # Hand the endpoints to the VCA connection as soon as the
            # controller reports one that the connection does not list
            endpoints = await controller.api_endpoints
            if any(
                endpoint not in self.vca_connection.endpoints
                for endpoint in endpoints
            ):
                await self.vca_connection.update_endpoints(endpoints)
        return controller
    except asyncio.CancelledError as e:
        # Cancellations are passed on untouched
        raise e
    except Exception as e:
        error_msg = "Failed connecting to controller: {}... {}".format(
            self.vca_connection.data.endpoints, e
        )
        self.log.error(error_msg)
        if controller:
            await self.disconnect_controller(controller)

        raise JujuControllerFailedConnecting(
            f"Error connecting to Juju controller: {e}"
        )
128
async def disconnect(self):
    """Disconnect

    Cancels the health check task when there is one. The task is only
    created for default VCA connections, so it may be missing.
    """
    # Cancel health check task (if it was ever started)
    health_check_task = getattr(self, "health_check_task", None)
    if health_check_task is not None:
        health_check_task.cancel()
    self.log.debug("Libjuju disconnected!")
134
async def disconnect_model(self, model: Model):
    """
    Disconnect model

    :param: model: Model that will be disconnected (None is ignored, like
                   disconnect_controller does for controllers)
    """
    if model is not None:
        await model.disconnect()
142
async def disconnect_controller(self, controller: Controller):
    """
    Close the connection of a controller

    :param: controller: Controller that will be disconnected; a falsy value
                        is ignored
    """
    if not controller:
        return
    await controller.disconnect()
151
@retry(attempts=3, delay=5, timeout=None)
async def add_model(self, model_name: str, cloud: VcaCloud):
    """
    Create a model in the controller unless it is already there

    :param: model_name: Model name
    :param: cloud: Cloud object (its name and credential_name are used)
    """

    controller = await self.get_controller()
    created_model = None
    try:
        # Block until other workers have finished model creation
        while self.creating_model.locked():
            await asyncio.sleep(0.1)

        async with self.creating_model:
            already_there = await self.model_exists(
                model_name, controller=controller
            )
            if already_there:
                return
            self.log.debug("Creating model {}".format(model_name))
            created_model = await controller.add_model(
                model_name,
                config=self.vca_connection.data.model_config,
                cloud_name=cloud.name,
                credential_name=cloud.credential_name,
            )
    except juju.errors.JujuAPIError as e:
        # An "already exists" answer from the API is not an error here
        if "already exists" not in e.message:
            raise e
    finally:
        if created_model:
            await self.disconnect_model(created_model)
        await self.disconnect_controller(controller)
189
async def get_executed_actions(self, model_name: str) -> list:
    """
    Get executed/history of actions for a model.

    :param: model_name: Model name, str.
    :return: List of executed actions for a model. Each item is a dict with
             the keys "id", "action" and "status", extended with every
             key/value of the action output.
    :raises: JujuError wrapping any error found while collecting the data
    """
    model = None
    executed_actions = []
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        # Get all unique action names
        actions = {}
        for application in model.applications:
            application_actions = await self.get_actions(application, model_name)
            actions.update(application_actions)
        # Get status of all actions
        for application_action in actions:
            app_action_status_list = await model.get_action_status(
                name=application_action
            )
            for action_id, action_status in app_action_status_list.items():
                executed_action = {
                    "id": action_id,
                    "action": application_action,
                    "status": action_status,
                }
                # Get action output by id
                action_output = await model.get_action_output(executed_action["id"])
                executed_action.update(action_output)
                executed_actions.append(executed_action)
    except Exception as e:
        raise JujuError(
            "Error in getting executed actions for model: {}. Error: {}".format(
                model_name, str(e)
            )
        ) from e
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return executed_actions
234
async def get_application_configs(
    self, model_name: str, application_name: str
) -> dict:
    """
    Get available configs for an application.

    :param: model_name: Model name, str.
    :param: application_name: Application name, str.

    :return: The value returned by the application's ``get_config()``
    :raises: JujuError wrapping any error found while getting the configs
             (including the application not being in the model)
    """
    model = None
    application_configs = {}
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(
            model, application_name=application_name
        )
        application_configs = await application.get_config()
    except Exception as e:
        raise JujuError(
            "Error in getting configs for application: {} in model: {}. Error: {}".format(
                application_name, model_name, str(e)
            )
        ) from e
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return application_configs
266
@retry(attempts=3, delay=5)
async def get_model(self, controller: Controller, model_name: str) -> Model:
    """
    Fetch a model through an already connected controller

    :param: controller: Controller
    :param: model_name: Model name

    :return: Model: The Juju model object returned by the controller
    """
    model = await controller.get_model(model_name)
    return model
278
async def model_exists(
    self, model_name: str, controller: Controller = None
) -> bool:
    """
    Tell whether the controller lists a model with the given name

    :param: model_name: Model name
    :param: controller: Controller; when omitted a new one is obtained and
                        disconnected before returning

    :return bool
    """
    # Only a controller obtained here is disconnected here
    owns_controller = not controller
    if owns_controller:
        controller = await self.get_controller()

    try:
        known_models = await controller.list_models()
        return model_name in known_models
    finally:
        if owns_controller:
            await self.disconnect_controller(controller)
303
async def models_exist(
    self, model_names: typing.List[str]
) -> typing.Tuple[bool, typing.List[str]]:
    """
    Check if models exists

    :param: model_names: List of strings with model names

    :return (bool, list[str]): (True if all models exists, List of model names
                                that don't exist, without duplicates and in
                                the order they were given)
    :raises: Exception if model_names is empty
    """
    if not model_names:
        raise Exception(
            "model_names must be a non-empty array. Given value: {}".format(
                model_names
            )
        )
    existing_models = set(await self.list_models())
    # dict.fromkeys drops duplicates while keeping the caller's order
    non_existing_models = [
        name for name in dict.fromkeys(model_names) if name not in existing_models
    ]

    return (
        not non_existing_models,
        non_existing_models,
    )
327
async def get_model_status(self, model_name: str) -> FullStatus:
    """
    Get model status

    :param: model_name: Model name

    :return: Full status object
    """
    controller = await self.get_controller()
    model = None
    try:
        # Inside the try so that the controller is released even when the
        # model cannot be obtained
        model = await self.get_model(controller, model_name)
        return await model.get_status()
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
343
async def create_machine(
    self,
    model_name: str,
    machine_id: str = None,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    series: str = "bionic",
    wait: bool = True,
) -> (Machine, bool):
    """
    Create machine

    :param: model_name: Model name
    :param: machine_id: Machine id. When given, the machine must already be
                        in the model and no new machine is created
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: series: Series of the machine (xenial, bionic, focal, ...)
    :param: wait: Wait until machine is ready

    :return: (juju.machine.Machine, bool): Machine object and a boolean saying
             if the machine is new or it already existed
    :raises: JujuMachineNotFound if machine_id is given but not in the model
    """
    new = False
    machine = None

    self.log.debug(
        "Creating machine (id={}) in model: {}".format(machine_id, model_name)
    )

    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model (inside the try so that the controller is released even
        # when the model cannot be obtained)
        model = await self.get_model(controller, model_name)

        if machine_id is not None:
            self.log.debug(
                "Searching machine (id={}) in model {}".format(
                    machine_id, model_name
                )
            )

            # Get machines from model and get the machine with machine_id if exists
            machines = await model.get_machines()
            if machine_id in machines:
                self.log.debug(
                    "Machine (id={}) found in model {}".format(
                        machine_id, model_name
                    )
                )
                machine = machines[machine_id]
            else:
                raise JujuMachineNotFound("Machine {} not found".format(machine_id))

        if machine is None:
            self.log.debug("Creating a new machine in model {}".format(model_name))

            # Create machine
            machine = await model.add_machine(
                spec=None, constraints=None, disks=None, series=series
            )
            new = True

        # Wait until the machine is ready
        self.log.debug(
            "Wait until machine {} is ready in model {}".format(
                machine.entity_id, model_name
            )
        )
        if wait:
            await JujuModelWatcher.wait_for(
                model=model,
                entity=machine,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout,
                db_dict=db_dict,
                n2vc=self.n2vc,
                vca_id=self.vca_connection._vca_id,
            )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    self.log.debug(
        "Machine {} ready at {} in model {}".format(
            machine.entity_id, machine.dns_name, model_name
        )
    )
    return machine, new
435
async def provision_machine(
    self,
    model_name: str,
    hostname: str,
    username: str,
    private_key_path: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
) -> str:
    """
    Manually provisioning of a machine

    :param: model_name: Model name
    :param: hostname: IP to access the machine
    :param: username: Username to login to the machine
    :param: private_key_path: Local path for the private key
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active

    :return: (Entity): Machine id
    :raises: ValueError if the controller reports an error adding the machine
    :raises: JujuMachineNotFound if the machine does not show up in the model
    """
    self.log.debug(
        "Provisioning machine. model: {}, hostname: {}, username: {}".format(
            model_name, hostname, username
        )
    )

    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model (inside the try so that the controller is released even
        # when the model cannot be obtained)
        model = await self.get_model(controller, model_name)

        # Get provisioner
        provisioner = AsyncSSHProvisioner(
            host=hostname,
            user=username,
            private_key_path=private_key_path,
            log=self.log,
        )

        # Provision machine
        params = await provisioner.provision_machine()

        params.jobs = ["JobHostUnits"]

        self.log.debug("Adding machine to model")
        connection = model.connection()
        client_facade = client.ClientFacade.from_connection(connection)

        results = await client_facade.AddMachines(params=[params])
        error = results.machines[0].error

        if error:
            msg = "Error adding machine: {}".format(error.message)
            self.log.error(msg=msg)
            raise ValueError(msg)

        machine_id = results.machines[0].machine

        self.log.debug("Installing Juju agent into machine {}".format(machine_id))
        # The agent installation is scheduled, not awaited
        asyncio.ensure_future(
            provisioner.install_agent(
                connection=connection,
                nonce=params.nonce,
                machine_id=machine_id,
                proxy=self.vca_connection.data.api_proxy,
                series=params.series,
            )
        )

        # Poll the model (10 attempts, 2 seconds apart) until the machine shows up
        machine = None
        for _ in range(10):
            machine_list = await model.get_machines()
            if machine_id in machine_list:
                self.log.debug("Machine {} found in model!".format(machine_id))
                machine = model.machines.get(machine_id)
                break
            await asyncio.sleep(2)

        if machine is None:
            msg = "Machine {} not found in model".format(machine_id)
            self.log.error(msg=msg)
            raise JujuMachineNotFound(msg)

        self.log.debug(
            "Wait until machine {} is ready in model {}".format(
                machine.entity_id, model_name
            )
        )
        await JujuModelWatcher.wait_for(
            model=model,
            entity=machine,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    self.log.debug(
        "Machine provisioned {} in model {}".format(machine_id, model_name)
    )

    return machine_id
549
async def deploy(
    self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
):
    """
    Deploy bundle or charm: Similar to the juju CLI command `juju deploy`

    :param: uri: Path or Charm Store uri in which the charm or bundle can be found
    :param: model_name: Model name
    :param: wait: Indicates whether to wait or not until all applications are active
    :param: timeout: Time in seconds to wait until all applications are active
    """
    controller = await self.get_controller()
    model = None
    try:
        # Inside the try so that the controller is released even when the
        # model cannot be obtained
        model = await self.get_model(controller, model_name)
        await model.deploy(uri, trust=True)
        if wait:
            await JujuModelWatcher.wait_for_model(model, timeout=timeout)
            self.log.debug("All units active in model {}".format(model_name))
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
571
async def add_unit(
    self,
    application_name: str,
    model_name: str,
    machine_id: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
):
    """Add a unit of an application to a machine and wait for the application

    :param: application_name: Application name
    :param: model_name: Model name
    :param: machine_id Machine id
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active

    :return: None
    :raises: JujuApplicationNotFound if the application is not in the model
    """

    model = None
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)

        if application is None:
            raise JujuApplicationNotFound(
                "Application {} not exists".format(application_name)
            )

        # Only called for its check: it raises when the machine id is not
        # in the model
        _machine, _series = self._get_machine_info(model, machine_id)

        adding_msg = "Adding unit (machine {}) to application {} in model ~{}".format(
            machine_id, application_name, model_name
        )
        self.log.debug(adding_msg)

        await application.add_unit(to=machine_id)

        watcher_kwargs = dict(
            model=model,
            entity=application,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )
        await JujuModelWatcher.wait_for(**watcher_kwargs)

        added_msg = "Unit is added to application {} in model {}".format(
            application_name, model_name
        )
        self.log.debug(added_msg)
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
634
async def destroy_unit(
    self,
    application_name: str,
    model_name: str,
    machine_id: str,
    total_timeout: float = None,
):
    """Destroy unit

    :param: application_name: Application name
    :param: model_name: Model name
    :param: machine_id Machine id
    :param: total_timeout: Seconds to wait for the unit to disappear from the
                           application (3600 when None)

    :return: None (also when the wait times out; a warning is logged then)
    :raises: JujuApplicationNotFound if the application is not in the model
    :raises: JujuError if the application has no unit on that machine
    """

    model = None
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)

        if application is None:
            raise JujuApplicationNotFound(
                "Application not found: {} (model={})".format(
                    application_name, model_name
                )
            )

        unit = self._get_unit(application, machine_id)
        if not unit:
            raise JujuError(
                "A unit with machine id {} not in available units".format(
                    machine_id
                )
            )

        unit_name = unit.name

        self.log.debug(
            "Destroying unit {} from application {} in model {}".format(
                unit_name, application_name, model_name
            )
        )
        await application.destroy_unit(unit_name)

        self.log.debug(
            "Waiting for unit {} to be destroyed in application {} (model={})...".format(
                unit_name, application_name, model_name
            )
        )

        # TODO: Add functionality in the Juju watcher to replace this kind of blocks
        if total_timeout is None:
            total_timeout = 3600
        end = time.time() + total_timeout
        while time.time() < end:
            if not self._get_unit(application, machine_id):
                self.log.debug(
                    "The unit {} was destroyed in application {} (model={}) ".format(
                        unit_name, application_name, model_name
                    )
                )
                return
            await asyncio.sleep(5)
        # Getting here means the unit was still present when the time ran out
        self.log.warning(
            "Timeout waiting for unit {} to be destroyed from application {} in model {}".format(
                unit_name, application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
710
async def deploy_charm(
    self,
    application_name: str,
    path: str,
    model_name: str,
    machine_id: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    config: dict = None,
    series: str = None,
    num_units: int = 1,
):
    """Deploy charm

    :param: application_name: Application name
    :param: path: Local path to the charm
    :param: model_name: Model name
    :param: machine_id ID of the machine. When not None, the series of that
                       machine replaces the ``series`` argument
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: config: Config for the charm
    :param: series: Series of the charm
    :param: num_units: Number of units. Units beyond the first one are added
                       to machines created for them

    :return: (juju.application.Application): Juju application
    :raises: JujuApplicationExists if the application is already in the model
    """
    self.log.debug(
        "Deploying charm {} to machine {} in model ~{}".format(
            application_name, machine_id, model_name
        )
    )
    self.log.debug("charm: {}".format(path))

    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model (inside the outer try so that the controller is released
        # even when the model cannot be obtained)
        model = await self.get_model(controller, model_name)

        try:
            if application_name in model.applications:
                raise JujuApplicationExists(
                    "Application {} exists".format(application_name)
                )

            if machine_id is not None:
                machine, series = self._get_machine_info(model, machine_id)

            application = await model.deploy(
                entity_url=path,
                application_name=application_name,
                channel="stable",
                num_units=1,
                series=series,
                to=machine_id,
                config=config,
            )

            self.log.debug(
                "Wait until application {} is ready in model {}".format(
                    application_name, model_name
                )
            )
            if num_units > 1:
                for _ in range(num_units - 1):
                    m, _ = await self.create_machine(model_name, wait=False)
                    await application.add_unit(to=m.entity_id)

            await JujuModelWatcher.wait_for(
                model=model,
                entity=application,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout,
                db_dict=db_dict,
                n2vc=self.n2vc,
                vca_id=self.vca_connection._vca_id,
            )
            self.log.debug(
                "Application {} is ready in model {}".format(
                    application_name, model_name
                )
            )
        except juju.errors.JujuError as e:
            if "already exists" in e.message:
                raise JujuApplicationExists(
                    "Application {} exists".format(application_name)
                ) from e
            else:
                raise e
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return application
807
async def resolve(self, model_name: str):
    """Mark the units in error state as resolved

    Every unit whose workload status is "error" (in applications whose
    status is "error") is marked as resolved without retrying the hook.
    The check is repeated every 5 seconds for as long as some unit could
    be marked in the previous pass.

    :param: model_name: Model name
    """
    controller = await self.get_controller()
    model = None
    all_units_active = False
    try:
        # Inside the try so that the controller is released even when the
        # model cannot be obtained
        model = await self.get_model(controller, model_name)
        while not all_units_active:
            all_units_active = True
            for application_name, application in model.applications.items():
                if application.status == "error":
                    for unit in application.units:
                        if unit.workload_status == "error":
                            self.log.debug(
                                "Model {}, Application {}, Unit {} in error state, resolving".format(
                                    model_name, application_name, unit.entity_id
                                )
                            )
                            try:
                                await unit.resolved(retry=False)
                                all_units_active = False
                            except Exception as e:
                                # Best effort: a unit that cannot be resolved
                                # is reported and skipped
                                self.log.warning(
                                    "Could not resolve unit {} in model {}: {}".format(
                                        unit.entity_id, model_name, e
                                    )
                                )

            if not all_units_active:
                await asyncio.sleep(5)
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
835
async def scale_application(
    self,
    model_name: str,
    application_name: str,
    scale: int = 1,
    total_timeout: float = None,
):
    """
    Scale application (K8s) and wait until the model reflects it

    :param: model_name: Model name
    :param: application_name: Application name
    :param: scale: Scale to which to set this application
    :param: total_timeout: Timeout for the entity to be active (1800 seconds
                           when None)

    :raises: JujuApplicationNotFound if the application is not in the model
    :raises: Exception if the unit count does not reach ``scale`` in time
    """

    model = None
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)

        scaling_msg = "Scaling application {} in model {}".format(
            application_name, model_name
        )
        self.log.debug(scaling_msg)
        application = self._get_application(model, application_name)
        if application is None:
            raise JujuApplicationNotFound("Cannot scale application")
        await application.scale(scale=scale)
        # Wait until application is scaled in model
        waiting_msg = "Waiting for application {} to be scaled in model {}...".format(
            application_name, model_name
        )
        self.log.debug(waiting_msg)
        total_timeout = 1800 if total_timeout is None else total_timeout
        deadline = time.time() + total_timeout
        while time.time() < deadline:
            current_units = self._get_application_count(model, application_name)
            # There is a delay before scaling triggers in the Juju model, so
            # the model is only watched once the unit count equals the scale
            if current_units != scale:
                await asyncio.sleep(5)
                continue
            await JujuModelWatcher.wait_for_model(
                model=model, timeout=total_timeout
            )
            scaled_msg = "Application {} is scaled in model {}".format(
                application_name, model_name
            )
            self.log.debug(scaled_msg)
            return
        raise Exception(
            "Timeout waiting for application {} in model {} to be scaled".format(
                application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
900
901 def _get_application_count(self, model: Model, application_name: str) -> int:
902 """Get number of units of the application
903
904 :param: model: Model object
905 :param: application_name: Application name
906
907 :return: int (or None if application doesn't exist)
908 """
909 application = self._get_application(model, application_name)
910 if application is not None:
911 return len(application.units)
912
913 def _get_application(self, model: Model, application_name: str) -> Application:
914 """Get application
915
916 :param: model: Model object
917 :param: application_name: Application name
918
919 :return: juju.application.Application (or None if it doesn't exist)
920 """
921 if model.applications and application_name in model.applications:
922 return model.applications[application_name]
923
924 def _get_unit(self, application: Application, machine_id: str) -> Unit:
925 """Get unit
926
927 :param: application: Application object
928 :param: machine_id: Machine id
929
930 :return: Unit
931 """
932 unit = None
933 for u in application.units:
934 if u.machine_id == machine_id:
935 unit = u
936 break
937 return unit
938
939 def _get_machine_info(
940 self,
941 model,
942 machine_id: str,
943 ) -> (str, str):
944 """Get machine info
945
946 :param: model: Model object
947 :param: machine_id: Machine id
948
949 :return: (str, str): (machine, series)
950 """
951 if machine_id not in model.machines:
952 msg = "Machine {} not found in model".format(machine_id)
953 self.log.error(msg=msg)
954 raise JujuMachineNotFound(msg)
955 machine = model.machines[machine_id]
956 return machine, machine.series
957
async def execute_action(
    self,
    application_name: str,
    model_name: str,
    action_name: str,
    db_dict: dict = None,
    machine_id: str = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    **kwargs,
):
    """Execute action

    :param: application_name: Application name
    :param: model_name: Model name
    :param: action_name: Name of the action
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: machine_id Machine id. When None, the action is run on the
                       leader unit of the application
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: kwargs: Parameters passed to the action

    :return: (str, str): (output and status)
    :raises: JujuApplicationNotFound if the application is not in the model
    :raises: JujuError if machine_id is given and no unit is on that machine
    :raises: JujuActionNotFound if the application does not have the action
    """
    self.log.debug(
        "Executing action {} using params {}".format(action_name, kwargs)
    )
    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model (inside the try so that the controller is released even
        # when the model cannot be obtained)
        model = await self.get_model(controller, model_name)

        # Get application
        application = self._get_application(
            model,
            application_name=application_name,
        )
        if application is None:
            raise JujuApplicationNotFound("Cannot execute action")
        # Racing condition:
        #   Ocassionally, self._get_leader_unit() will return None
        #   because the leader elected hook has not been triggered yet.
        #   Therefore, we are doing some retries. If it happens again,
        #   re-open bug 1236
        if machine_id is None:
            unit = await self._get_leader_unit(application)
            self.log.debug(
                "Action {} is being executed on the leader unit {}".format(
                    action_name, unit.name
                )
            )
        else:
            unit = self._get_unit(application, machine_id)
            if not unit:
                raise JujuError(
                    "A unit with machine id {} not in available units".format(
                        machine_id
                    )
                )
            self.log.debug(
                "Action {} is being executed on {} unit".format(
                    action_name, unit.name
                )
            )

        actions = await application.get_actions()

        if action_name not in actions:
            raise JujuActionNotFound(
                "Action {} not in available actions".format(action_name)
            )

        action = await unit.run_action(action_name, **kwargs)

        self.log.debug(
            "Wait until action {} is completed in application {} (model={})".format(
                action_name, application_name, model_name
            )
        )
        await JujuModelWatcher.wait_for(
            model=model,
            entity=action,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )

        output = await model.get_action_output(action_uuid=action.entity_id)
        status = await model.get_action_status(uuid_or_prefix=action.entity_id)
        # An action missing from the status answer is reported as failed
        status = (
            status[action.entity_id] if action.entity_id in status else "failed"
        )

        self.log.debug(
            "Action {} completed with status {} in application {} (model={})".format(
                action_name, action.status, application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return output, status
1064
async def get_actions(self, application_name: str, model_name: str) -> dict:
    """Get list of actions

    :param: application_name: Application name
    :param: model_name: Model name

    :return: Dict with this format
        {
            "action_name": "Description of the action",
            ...
        }
    """
    self.log.debug(
        "Getting list of actions for application {}".format(application_name)
    )

    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model (inside the try so that the controller is released even
        # when the model cannot be obtained)
        model = await self.get_model(controller, model_name)

        # Get application
        application = self._get_application(
            model,
            application_name=application_name,
        )

        # Return list of actions
        return await application.get_actions()

    finally:
        # Disconnect from model and controller
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1101
async def get_metrics(self, model_name: str, application_name: str) -> dict:
    """Get the metrics collected by the VCA.

    :param model_name The name or unique id of the network service
    :param application_name The name of the application

    :return: The metrics of the application (empty dict if the application
             is not in the model)
    :raises: Exception if model_name or application_name is empty
    """
    if not model_name or not application_name:
        raise Exception("model_name and application_name must be non-empty strings")
    metrics = {}
    controller = await self.get_controller()
    model = None
    try:
        # Inside the try so that the controller is released even when the
        # model cannot be obtained
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)
        if application is not None:
            metrics = await application.get_metrics()
    finally:
        # Both helpers are coroutines: without ``await`` nothing is closed
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return metrics
1121
async def add_relation(
    self,
    model_name: str,
    endpoint_1: str,
    endpoint_2: str,
):
    """Add relation

    API errors whose message contains "not found" or "already exists" are
    logged as warnings and ignored; any other API error is re-raised.

    :param: model_name:     Model name
    :param: endpoint_1      First endpoint name
                            ("app:endpoint" format or directly the saas name)
    :param: endpoint_2:     Second endpoint name (^ same format)

    :raises JujuAPIError: for API errors other than the two ignored above
    """

    self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))

    # Get controller
    controller = await self.get_controller()
    model = None

    try:
        # Get model. Done inside the outer try so the controller connection
        # is released if the lookup fails, but outside the inner try so a
        # failing lookup is never mistaken for a relation error.
        model = await self.get_model(controller, model_name)

        # Add relation
        try:
            await model.add_relation(endpoint_1, endpoint_2)
        except juju.errors.JujuAPIError as e:
            if "not found" in e.message:
                self.log.warning("Relation not found: {}".format(e.message))
                return
            if "already exists" in e.message:
                self.log.warning("Relation already exists: {}".format(e.message))
                return
            # another exception, raise it
            raise
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1159
async def consume(
    self,
    offer_url: str,
    model_name: str,
):
    """
    Adds a remote offer to the model. Relations can be created later using "juju relate".

    :param: offer_url: Offer Url
    :param: model_name: Model name

    :raises ParseError if there's a problem parsing the offer_url
    :raises JujuError if remote offer includes an endpoint
    :raises JujuAPIError if the operation is not successful
    """
    controller = await self.get_controller()
    model = None

    try:
        # Use the same model accessor as the rest of the class, and acquire
        # the model inside the try block so the controller connection is
        # released even when the lookup fails.
        model = await self.get_model(controller, model_name)
        await model.consume(offer_url)
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1183
async def destroy_model(self, model_name: str, total_timeout: float = 1800):
    """
    Destroy model

    Does nothing when the model does not exist. If an error happens while
    destroying the model but the model no longer exists afterwards, the
    error is ignored.

    :param: model_name: Model name
    :param: total_timeout: Timeout in seconds for the model removal
    """

    controller = await self.get_controller()
    model = None
    try:
        if not await self.model_exists(model_name, controller=controller):
            return

        self.log.debug("Destroying model {}".format(model_name))

        model = await self.get_model(controller, model_name)
        # Destroy machines that are manually provisioned
        # and still are in pending state
        await self._destroy_pending_machines(model, only_manual=True)
        await self.disconnect_model(model)
        # Already disconnected: clear the reference so the finally block
        # does not disconnect the same model a second time.
        model = None

        await asyncio.wait_for(
            self._destroy_model(model_name, controller),
            timeout=total_timeout,
        )
    except Exception:
        # The goal is reached if the model is gone, whatever went wrong.
        if not await self.model_exists(model_name, controller=controller):
            return
        raise
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1218
async def _destroy_model(
    self,
    model_name: str,
    controller: Controller,
):
    """
    Destroy model from controller

    A graceful removal is attempted first. If it has not finished after
    120 seconds, the model is destroyed forcefully. Juju errors reporting
    that the model "has been removed" or "model not found" are ignored.

    :param: model_name: Name of the model to be removed
    :param: controller: Controller object
    """

    async def _graceful(model_name: str, controller: Controller):
        self.log.info(f"Gracefully deleting model {model_name}")
        # Units in error are resolved only once, on the first iteration
        pending_resolve = True
        while model_name in await controller.list_models():
            if pending_resolve:
                await self.resolve(model_name)
                pending_resolve = False
            await controller.destroy_model(model_name, destroy_storage=True)

            await asyncio.sleep(5)
        self.log.info(f"Model {model_name} deleted gracefully")

    async def _forceful(model_name: str, controller: Controller):
        self.log.info(f"Forcefully deleting model {model_name}")
        while model_name in await controller.list_models():
            await controller.destroy_model(
                model_name, destroy_storage=True, force=True, max_wait=60
            )
            await asyncio.sleep(5)
        self.log.info(f"Model {model_name} deleted forcefully")

    # Error fragments meaning the model is already gone
    ignorable = ("has been removed", "model not found")

    try:
        try:
            await asyncio.wait_for(_graceful(model_name, controller), timeout=120)
        except asyncio.TimeoutError:
            await _forceful(model_name, controller)
    except juju.errors.JujuError as e:
        if any(fragment in error for fragment in ignorable for error in e.errors):
            return
        raise e
1266
async def destroy_application(
    self, model_name: str, application_name: str, total_timeout: float
):
    """
    Destroy an application and wait until it is no longer in the model

    :param: model_name: Model name
    :param: application_name: Application name
    :param: total_timeout: Maximum time in seconds to wait for the removal
                           (3600 is used when None is given)

    :raises Exception: if the application is still present after the timeout
    """

    controller = await self.get_controller()
    model = None

    try:
        model = await self.get_model(controller, model_name)
        self.log.debug(
            "Destroying application {} in model {}".format(
                application_name, model_name
            )
        )
        application = self._get_application(model, application_name)
        if not application:
            self.log.warning("Application not found: {}".format(application_name))
        else:
            await application.destroy()

        self.log.debug(
            "Waiting for application {} to be destroyed in model {}...".format(
                application_name, model_name
            )
        )
        # Poll the model every 5 seconds until the deadline is reached
        wait_seconds = 3600 if total_timeout is None else total_timeout
        deadline = time.time() + wait_seconds
        while time.time() < deadline:
            if self._get_application(model, application_name):
                await asyncio.sleep(5)
                continue
            self.log.debug(
                "The application {} was destroyed in model {} ".format(
                    application_name, model_name
                )
            )
            return
        raise Exception(
            "Timeout waiting for application {} to be destroyed in model {}".format(
                application_name, model_name
            )
        )
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1320
async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
    """
    Destroy pending machines in a given model

    :param: model: Model whose pending machines will be destroyed
    :param: only_manual:    Bool that indicates only manually provisioned
                            machines should be destroyed (if True), or that
                            all pending machines should be destroyed
    """
    status = await model.get_status()
    for machine_id in status.machines:
        machine_status = status.machines[machine_id]
        if machine_status.agent_status.status != "pending":
            continue
        if only_manual and not machine_status.instance_id.startswith("manual:"):
            # Skip just this machine. Stopping the loop here would leave
            # any later manually provisioned pending machines undestroyed.
            continue
        machine = model.machines[machine_id]
        await machine.destroy(force=True)
1337
async def configure_application(
    self, model_name: str, application_name: str, config: dict = None
):
    """Configure application

    Nothing is done (no connection is opened) when config is empty.

    :param: model_name: Model name
    :param: application_name: Application name
    :param: config: Config to apply to the charm
    """
    self.log.debug("Configuring application {}".format(application_name))

    # Nothing to apply
    if not config:
        return

    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        target = self._get_application(model, application_name=application_name)
        await target.set_config(config)
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1363
def handle_exception(self, loop, context):
    """
    Handler for all exceptions left unhandled by libjuju

    The exception is deliberately ignored: nothing is logged or raised.

    :param: loop: Unused
    :param: context: Unused
    """
    return None
1367
async def health_check(self, interval: float = 300.0):
    """
    Health check to make sure controller and controller_model connections are OK

    Runs forever: on every iteration a controller connection is opened and
    closed again. Failures are logged and do not stop the loop.

    :param: interval: Time in seconds between checks
    """
    while True:
        # Reset on every iteration so that a failed check never
        # re-disconnects the controller obtained by a previous check.
        controller = None
        try:
            controller = await self.get_controller()
            # self.log.debug("VCA is alive")
        except Exception as e:
            self.log.error("Health check to VCA failed: {}".format(e))
        finally:
            if controller is not None:
                await self.disconnect_controller(controller)
        await asyncio.sleep(interval)
1384
async def list_models(self, contains: str = None) -> [str]:
    """List the models of the controller, optionally filtered by name

    :param: contains: When given, only model names containing this string
                      are returned

    :return: [models] Returns list of model names
    """

    controller = await self.get_controller()
    try:
        names = await controller.list_models()
        if not contains:
            return names
        return [name for name in names if contains in name]
    finally:
        await self.disconnect_controller(controller)
1401
async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
    """List the offers of a model

    :param: model_name: Model name

    :return: Returns list of offers
    """

    controller = await self.get_controller()
    try:
        offers = await controller.list_offers(model_name)
        return offers
    finally:
        await self.disconnect_controller(controller)
1415
async def add_k8s(
    self,
    name: str,
    rbac_id: str,
    token: str,
    client_cert_data: str,
    configuration: Configuration,
    storage_class: str,
    credential_name: str = None,
):
    """
    Add a Kubernetes cloud to the controller

    Similar to the `juju add-k8s` command in the CLI

    :param: name: Name for the K8s cloud
    :param: rbac_id: Value stored in the credential attributes under
                     the RBAC_LABEL_KEY_NAME key
    :param: token: Token used to build the cloud credential
    :param: client_cert_data: Certificate data used for the credential and
                              as CA certificate of the cloud
    :param: configuration: Kubernetes configuration object
    :param: storage_class: Storage Class to use in the cloud
    :param: credential_name: Name for the credential (passed to add_cloud)

    :raises Exception: if storage_class, name or configuration is empty
    """

    # Checked in this order; the first empty value raises
    required = (
        (storage_class, "storage_class must be a non-empty string"),
        (name, "name must be a non-empty string"),
        (configuration, "configuration must be provided"),
    )
    for value, message in required:
        if not value:
            raise Exception(message)

    host = configuration.host
    k8s_credential = self.get_k8s_cloud_credential(
        configuration,
        client_cert_data,
        token,
    )
    k8s_credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id

    # The same storage class is used for operator and workload storage
    storage_config = {
        "operator-storage": storage_class,
        "workload-storage": storage_class,
    }
    k8s_cloud = client.Cloud(
        type_="kubernetes",
        auth_types=[k8s_credential.auth_type],
        endpoint=host,
        ca_certificates=[client_cert_data],
        config=storage_config,
    )

    return await self.add_cloud(
        name, k8s_cloud, k8s_credential, credential_name=credential_name
    )
1465
def get_k8s_cloud_credential(
    self,
    configuration: Configuration,
    client_cert_data: str,
    token: str = None,
) -> client.CloudCredential:
    """
    Build the cloud credential for a Kubernetes cloud

    The auth type is "userpass"/"userpasswithcert" when the configuration
    has a username, or "certificate" when both client_cert_data and token
    are given.

    :param: configuration: Kubernetes configuration object (its username
                           and password are read)
    :param: client_cert_data: Client certificate data
    :param: token: Token. Cannot be combined with username/password.

    :return: CloudCredential with the auth type and attributes

    :raises JujuInvalidK8sConfiguration: if token and user/pass are both
            set, or if no supported authentication method can be derived
    """
    attrs = {}
    # TODO: Test with AKS
    key = None  # open(configuration.key_file, "r").read()
    username, password = configuration.username, configuration.password

    # Collect the credential attributes
    if client_cert_data:
        attrs["ClientCertificateData"] = client_cert_data
    if key:
        attrs["ClientKeyData"] = key
    if token:
        if username or password:
            raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
        attrs["Token"] = token

    # Select the auth type
    auth_type = None
    if key:
        auth_type = "oauth2withcert" if client_cert_data else "oauth2"
        if not token:
            raise JujuInvalidK8sConfiguration(
                "missing token for auth type {}".format(auth_type)
            )
    elif username:
        if not password:
            self.log.debug(
                "credential for user {} has empty password".format(username)
            )
        attrs["username"] = username
        attrs["password"] = password
        auth_type = "userpasswithcert" if client_cert_data else "userpass"
    elif client_cert_data and token:
        auth_type = "certificate"
    else:
        raise JujuInvalidK8sConfiguration("authentication method not supported")

    return client.CloudCredential(auth_type=auth_type, attrs=attrs)
1512
async def add_cloud(
    self,
    name: str,
    cloud: Cloud,
    credential: CloudCredential = None,
    credential_name: str = None,
) -> Cloud:
    """
    Add cloud to the controller

    :param: name: Name of the cloud to be added
    :param: cloud: Cloud object
    :param: credential: CloudCredentials object for the cloud
    :param: credential_name: Credential name.
                             If not defined, the name of the cloud will be used.

    :return: The cloud object received as parameter
    """
    controller = await self.get_controller()
    try:
        await controller.add_cloud(name, cloud)
        if credential:
            name_for_credential = credential_name or name
            await controller.add_credential(
                name_for_credential, credential=credential, cloud=name
            )
        # Need to return the object returned by the controller.add_cloud() function
        # I'm returning the original value now until this bug is fixed:
        #   https://github.com/juju/python-libjuju/issues/443
        return cloud
    finally:
        await self.disconnect_controller(controller)
1542
async def remove_cloud(self, name: str):
    """
    Remove cloud

    If the controller reports that the cloud is not found, a warning is
    logged and no error is raised.

    :param: name: Name of the cloud to be removed

    :raises JujuError: for any other error
    """
    controller = await self.get_controller()
    try:
        await controller.remove_cloud(name)
    except juju.errors.JujuError as e:
        # Only a single "not found" error is tolerated
        cloud_missing = len(e.errors) == 1 and f'cloud "{name}" not found' == e.errors[0]
        if not cloud_missing:
            raise e
        self.log.warning(f"Cloud {name} not found, so it could not be deleted.")
    finally:
        await self.disconnect_controller(controller)
1559
@retry(attempts=20, delay=5, fallback=JujuLeaderUnitNotFound())
async def _get_leader_unit(self, application: Application) -> Unit:
    """
    Get the leader unit of an application

    :param: application: Application whose units are inspected

    :return: The first unit that reports itself as leader

    :raises Exception: if no unit is the leader (handled by the retry decorator)
    """
    leader = None
    for candidate in application.units:
        if await candidate.is_leader_from_status():
            leader = candidate
            break
    if leader:
        return leader
    raise Exception()
1570
async def get_cloud_credentials(self, cloud: Cloud) -> typing.List:
    """
    Get cloud credentials

    :param: cloud: Cloud object. Its name and credential_name, together with
                   the user of the VCA connection, identify the credential.

    :return: List of credentials object associated to the specified cloud
    """
    controller = await self.get_controller()
    try:
        cloud_facade = client.CloudFacade.from_connection(controller.connection())
        credential_tag = tag.credential(
            cloud.name, self.vca_connection.data.user, cloud.credential_name
        )
        entities = [client.Entity(credential_tag)]
        response = await cloud_facade.Credential(entities)
        return response.results
    finally:
        await self.disconnect_controller(controller)
1590
async def check_application_exists(self, model_name, application_name) -> bool:
    """Check whether an application exists in a model

    :param: model_name: Model Name
    :param: application_name: Application Name

    :return: True if the application lookup returns something other than None
    """

    model = None
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        self.log.debug(
            "Checking if application {} exists in model {}".format(
                application_name, model_name
            )
        )
        found = self._get_application(model, application_name)
        return found is not None
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)