5e4bef350cf8b6ec77b4bf9302a65a5dd44d3ddb
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17 import typing
18
19 import time
20
21 import juju.errors
22 from juju.model import Model
23 from juju.machine import Machine
24 from juju.application import Application
25 from juju.unit import Unit
26 from juju.client._definitions import (
27 FullStatus,
28 QueryApplicationOffersResults,
29 Cloud,
30 CloudCredential,
31 )
32 from juju.controller import Controller
33 from juju.client import client
34 from juju import tag
35
36 from n2vc.juju_watcher import JujuModelWatcher
37 from n2vc.provisioner import AsyncSSHProvisioner
38 from n2vc.n2vc_conn import N2VCConnector
39 from n2vc.exceptions import (
40 JujuMachineNotFound,
41 JujuApplicationNotFound,
42 JujuLeaderUnitNotFound,
43 JujuActionNotFound,
44 JujuControllerFailedConnecting,
45 JujuApplicationExists,
46 JujuInvalidK8sConfiguration,
47 JujuError,
48 )
49 from n2vc.vca.cloud import Cloud as VcaCloud
50 from n2vc.vca.connection import Connection
51 from kubernetes.client.configuration import Configuration
52 from retrying_async import retry
53
54
# Label key "rbac-id"; presumably used to tag RBAC-related resources
# (TODO confirm against its users elsewhere in the module).
RBAC_LABEL_KEY_NAME = "rbac-id"
56
57
58 class Libjuju:
def __init__(
    self,
    vca_connection: Connection,
    loop: asyncio.AbstractEventLoop = None,
    log: logging.Logger = None,
    n2vc: N2VCConnector = None,
):
    """
    Build a Libjuju client bound to a single VCA connection.

    :param: vca_connection: n2vc.vca.connection object
    :param: loop: Asyncio loop (defaults to ``asyncio.get_event_loop()``)
    :param: log: Logger (defaults to the "Libjuju" logger)
    :param: n2vc: N2VC object, forwarded to the model watcher
    """
    self.log = log if log else logging.getLogger("Libjuju")
    self.n2vc = n2vc
    self.vca_connection = vca_connection

    self.loop = loop if loop else asyncio.get_event_loop()
    # Unhandled exceptions raised in this loop go to our own handler
    self.loop.set_exception_handler(self.handle_exception)
    # Held by add_model() while a model is being created
    self.creating_model = asyncio.Lock(loop=self.loop)

    # Only the default VCA gets a background health-check task
    if self.vca_connection.is_default:
        self.health_check_task = self._create_health_check_task()
85
86 def _create_health_check_task(self):
87 return self.loop.create_task(self.health_check())
88
async def get_controller(self, timeout: float = 60.0) -> Controller:
    """
    Connect to the Juju controller of this VCA.

    For the default VCA, the API endpoints reported by the controller are
    compared with the stored ones, and the stored list is refreshed when any
    reported endpoint is missing from it.

    :param: timeout: Time in seconds to wait for controller to connect

    :return: Connected Controller object
    :raises JujuControllerFailedConnecting: on any failure other than
        cancellation (the partially connected controller is disconnected)
    """
    controller = None
    try:
        controller = Controller()
        connect_coroutine = controller.connect(
            endpoint=self.vca_connection.data.endpoints,
            username=self.vca_connection.data.user,
            password=self.vca_connection.data.secret,
            cacert=self.vca_connection.data.cacert,
        )
        await asyncio.wait_for(connect_coroutine, timeout=timeout)

        if self.vca_connection.is_default:
            reported_endpoints = await controller.api_endpoints
            has_unknown_endpoint = any(
                endpoint not in self.vca_connection.endpoints
                for endpoint in reported_endpoints
            )
            if has_unknown_endpoint:
                await self.vca_connection.update_endpoints(reported_endpoints)
        return controller
    except asyncio.CancelledError as e:
        # Never turn a cancellation into a connection error
        raise e
    except Exception as e:
        self.log.error(
            "Failed connecting to controller: {}... {}".format(
                self.vca_connection.data.endpoints, e
            )
        )
        if controller:
            await self.disconnect_controller(controller)

        raise JujuControllerFailedConnecting(
            f"Error connecting to Juju controller: {e}"
        )
128
async def disconnect(self):
    """Disconnect: cancel the background health-check task, if any.

    ``__init__`` only creates ``health_check_task`` when the VCA connection
    is the default one, so its absence is tolerated instead of raising
    AttributeError.
    """
    health_check_task = getattr(self, "health_check_task", None)
    if health_check_task is not None:
        health_check_task.cancel()
    self.log.debug("Libjuju disconnected!")
134
async def disconnect_model(self, model: Model):
    """
    Disconnect model

    A falsy ``model`` (e.g. None) is ignored, the same way
    ``disconnect_controller`` ignores a falsy controller, so cleanup code can
    call this unconditionally.

    :param: model: Model that will be disconnected
    """
    if model:
        await model.disconnect()
142
async def disconnect_controller(self, controller: Controller):
    """
    Close the connection to a controller.

    A falsy ``controller`` (e.g. None) is silently ignored.

    :param: controller: Controller that will be disconnected
    """
    if not controller:
        return
    await controller.disconnect()
151
@retry(attempts=3, delay=5, timeout=None)
async def add_model(self, model_name: str, cloud: VcaCloud):
    """
    Create a model unless it already exists.

    Wrapped in ``retry(attempts=3, delay=5, timeout=None)``. A Juju API
    error whose message contains "already exists" is treated as success.

    :param: model_name: Model name
    :param: cloud: Cloud object (its ``name`` and ``credential_name`` are used)
    """
    controller = await self.get_controller()
    new_model = None
    try:
        # Wait for any model creation already in progress to finish
        while self.creating_model.locked():
            await asyncio.sleep(0.1)

        async with self.creating_model:
            already_present = await self.model_exists(
                model_name, controller=controller
            )
            if not already_present:
                self.log.debug("Creating model {}".format(model_name))
                new_model = await controller.add_model(
                    model_name,
                    config=self.vca_connection.data.model_config,
                    cloud_name=cloud.name,
                    credential_name=cloud.credential_name,
                )
    except juju.errors.JujuAPIError as e:
        if "already exists" not in e.message:
            raise e
    finally:
        if new_model:
            await self.disconnect_model(new_model)
        await self.disconnect_controller(controller)
189
async def get_executed_actions(self, model_name: str) -> list:
    """
    Get executed/history of actions for a model.

    Every action name declared by any application in the model is looked
    up, and each execution of it is returned as a dict with "id", "action"
    and "status" keys, extended with the keys of its action output.

    :param: model_name: Model name, str.
    :return: List of executed actions for a model.
    :raises JujuError: wrapping any error raised while collecting the data
    """
    model = None
    history = []
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)

        # Union of action names across applications (dict keys deduplicate)
        action_names = {}
        for app_name in model.applications:
            action_names.update(await self.get_actions(app_name, model_name))

        for action_name in action_names:
            statuses = await model.get_action_status(name=action_name)
            for action_id, status in statuses.items():
                entry = {
                    "id": action_id,
                    "action": action_name,
                    "status": status,
                }
                action_output = await model.get_action_output(action_id)
                for key, value in action_output.items():
                    entry[key] = value
                history.append(entry)
    except Exception as e:
        raise JujuError(
            "Error in getting executed actions for model: {}. Error: {}".format(
                model_name, str(e)
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return history
234
async def get_application_configs(
    self, model_name: str, application_name: str
) -> dict:
    """
    Get the configuration of an application.

    :param: model_name: Model name, str.
    :param: application_name: Application name, str.

    :return: The dict returned by the application's ``get_config()``
    :raises JujuError: wrapping any error, including a missing application
    """
    controller = await self.get_controller()
    model = None
    configs = {}
    try:
        model = await self.get_model(controller, model_name)
        app = self._get_application(model, application_name=application_name)
        configs = await app.get_config()
    except Exception as e:
        raise JujuError(
            "Error in getting configs for application: {} in model: {}. Error: {}".format(
                application_name, model_name, str(e)
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return configs
266
@retry(attempts=3, delay=5)
async def get_model(self, controller: Controller, model_name: str) -> Model:
    """
    Fetch an existing model from a connected controller.

    Wrapped in ``retry(attempts=3, delay=5)``.

    :param: controller: Controller
    :param: model_name: Model name

    :return: Model: the Juju model object returned by ``controller.get_model``
    """
    model = await controller.get_model(model_name)
    return model
278
async def model_exists(
    self, model_name: str, controller: Controller = None
) -> bool:
    """
    Check if model exists

    When no controller is given, one is connected for the check and
    disconnected afterwards; a caller-supplied controller is left open.

    :param: controller: Controller
    :param: model_name: Model name

    :return bool
    """
    owns_controller = not controller
    if owns_controller:
        controller = await self.get_controller()

    try:
        known_models = await controller.list_models()
        return model_name in known_models
    finally:
        if owns_controller:
            await self.disconnect_controller(controller)
303
async def models_exist(self, model_names: [str]) -> (bool, list):
    """
    Check if models exists

    :param: model_names: List of strings with model names

    :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
    :raises Exception: if ``model_names`` is empty
    """
    if not model_names:
        raise Exception(
            "model_names must be a non-empty array. Given value: {}".format(
                model_names
            )
        )
    available = await self.list_models()
    found = list(set(available).intersection(model_names))
    missing = list(set(model_names) - set(found))
    all_present = not missing
    return all_present, missing
327
async def get_model_status(self, model_name: str) -> FullStatus:
    """
    Get model status

    :param: model_name: Model name

    :return: Full status object
    """
    controller = await self.get_controller()
    model = None
    try:
        # Fetch the model inside the try so the controller connection is
        # released even when getting the model fails
        model = await self.get_model(controller, model_name)
        return await model.get_status()
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
343
async def create_machine(
    self,
    model_name: str,
    machine_id: str = None,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    series: str = "bionic",
    wait: bool = True,
) -> (Machine, bool):
    """
    Create machine

    If ``machine_id`` is given, the existing machine is looked up (and must
    exist); otherwise a new machine is added to the model.

    :param: model_name: Model name
    :param: machine_id: Machine id
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: series: Series of the machine (xenial, bionic, focal, ...)
    :param: wait: Wait until machine is ready

    :return: (juju.machine.Machine, bool): Machine object and a boolean saying
             if the machine is new or it already existed
    :raises JujuMachineNotFound: if ``machine_id`` is given but not in the model
    """
    new = False
    machine = None

    self.log.debug(
        "Creating machine (id={}) in model: {}".format(machine_id, model_name)
    )

    # Get controller
    controller = await self.get_controller()
    model = None
    try:
        # Get model inside the try so the controller is released on failure
        model = await self.get_model(controller, model_name)

        if machine_id is not None:
            self.log.debug(
                "Searching machine (id={}) in model {}".format(
                    machine_id, model_name
                )
            )

            # Get machines from model and get the machine with machine_id if exists
            machines = await model.get_machines()
            if machine_id in machines:
                self.log.debug(
                    "Machine (id={}) found in model {}".format(
                        machine_id, model_name
                    )
                )
                machine = machines[machine_id]
            else:
                raise JujuMachineNotFound("Machine {} not found".format(machine_id))

        if machine is None:
            self.log.debug("Creating a new machine in model {}".format(model_name))

            # Create machine
            machine = await model.add_machine(
                spec=None, constraints=None, disks=None, series=series
            )
            new = True

        # Wait until the machine is ready (only logged when we actually wait)
        if wait:
            self.log.debug(
                "Wait until machine {} is ready in model {}".format(
                    machine.entity_id, model_name
                )
            )
            await JujuModelWatcher.wait_for(
                model=model,
                entity=machine,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout,
                db_dict=db_dict,
                n2vc=self.n2vc,
                vca_id=self.vca_connection._vca_id,
            )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    self.log.debug(
        "Machine {} ready at {} in model {}".format(
            machine.entity_id, machine.dns_name, model_name
        )
    )
    return machine, new
435
async def provision_machine(
    self,
    model_name: str,
    hostname: str,
    username: str,
    private_key_path: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
) -> str:
    """
    Manually provisioning of a machine

    Prepares the host over SSH, registers it in the model through the
    ``AddMachines`` client facade call, starts the Juju agent installation
    in the background and waits for the machine to become ready.

    :param: model_name: Model name
    :param: hostname: IP to access the machine
    :param: username: Username to login to the machine
    :param: private_key_path: Local path for the private key
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active

    :return: (str): Machine id
    :raises ValueError: if the controller reports an error adding the machine
    :raises JujuMachineNotFound: if the machine does not appear in the model
        after 10 polls, 2 seconds apart
    """
    self.log.debug(
        "Provisioning machine. model: {}, hostname: {}, username: {}".format(
            model_name, hostname, username
        )
    )

    # Get controller
    controller = await self.get_controller()
    model = None
    try:
        # Get model inside the try so the controller is released on failure
        model = await self.get_model(controller, model_name)

        # Get provisioner
        provisioner = AsyncSSHProvisioner(
            host=hostname,
            user=username,
            private_key_path=private_key_path,
            log=self.log,
        )

        # Provision machine
        params = await provisioner.provision_machine()

        params.jobs = ["JobHostUnits"]

        self.log.debug("Adding machine to model")
        connection = model.connection()
        client_facade = client.ClientFacade.from_connection(connection)

        results = await client_facade.AddMachines(params=[params])
        error = results.machines[0].error

        if error:
            msg = "Error adding machine: {}".format(error.message)
            self.log.error(msg=msg)
            raise ValueError(msg)

        machine_id = results.machines[0].machine

        self.log.debug("Installing Juju agent into machine {}".format(machine_id))
        # Not awaited: the agent installation runs in the background while
        # the model is polled for the new machine below
        asyncio.ensure_future(
            provisioner.install_agent(
                connection=connection,
                nonce=params.nonce,
                machine_id=machine_id,
                proxy=self.vca_connection.data.api_proxy,
                series=params.series,
            )
        )

        machine = None
        for _ in range(10):
            machine_list = await model.get_machines()
            if machine_id in machine_list:
                self.log.debug("Machine {} found in model!".format(machine_id))
                machine = model.machines.get(machine_id)
                break
            await asyncio.sleep(2)

        if machine is None:
            msg = "Machine {} not found in model".format(machine_id)
            self.log.error(msg=msg)
            raise JujuMachineNotFound(msg)

        self.log.debug(
            "Wait until machine {} is ready in model {}".format(
                machine.entity_id, model_name
            )
        )
        await JujuModelWatcher.wait_for(
            model=model,
            entity=machine,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    self.log.debug(
        "Machine provisioned {} in model {}".format(machine_id, model_name)
    )

    return machine_id
549
async def deploy(
    self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
):
    """
    Deploy bundle or charm: Similar to the juju CLI command `juju deploy`

    The deployment is done with ``trust=True``.

    :param: uri: Path or Charm Store uri in which the charm or bundle can be found
    :param: model_name: Model name
    :param: wait: Indicates whether to wait or not until all applications are active
    :param: timeout: Time in seconds to wait until all applications are active
    """
    controller = await self.get_controller()
    model = None
    try:
        # Get model inside the try so the controller is released on failure
        model = await self.get_model(controller, model_name)
        await model.deploy(uri, trust=True)
        if wait:
            await JujuModelWatcher.wait_for_model(model, timeout=timeout)
            self.log.debug("All units active in model {}".format(model_name))
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
571
async def add_unit(
    self,
    application_name: str,
    model_name: str,
    machine_id: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
):
    """Add a unit of an application on an existing machine and wait for it.

    :param: application_name: Application name
    :param: model_name: Model name
    :param: machine_id Machine id
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active

    :return: None
    :raises JujuApplicationNotFound: if the application is not in the model
    """
    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)

        if application is None:
            raise JujuApplicationNotFound(
                "Application {} not exists".format(application_name)
            )

        # Only a validation step: raises if machine_id is not in the model
        self._get_machine_info(model, machine_id)

        self.log.debug(
            "Adding unit (machine {}) to application {} in model ~{}".format(
                machine_id, application_name, model_name
            )
        )
        await application.add_unit(to=machine_id)

        await JujuModelWatcher.wait_for(
            model=model,
            entity=application,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )
        self.log.debug(
            "Unit is added to application {} in model {}".format(
                application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
635
async def destroy_unit(
    self,
    application_name: str,
    model_name: str,
    machine_id: str,
    total_timeout: float = None,
):
    """Destroy unit

    Destroy the unit of ``application_name`` placed on ``machine_id``, then
    poll every 5 seconds until the unit disappears from the application or
    the timeout expires. A timeout is logged as a warning; no exception is
    raised for it.

    :param: application_name: Application name
    :param: model_name: Model name
    :param: machine_id Machine id
    :param: total_timeout: Maximum seconds to wait for the unit to disappear
                           (defaults to 3600)

    :return: None
    :raises JujuApplicationNotFound: if the application is not in the model
    :raises JujuError: if no unit of the application is on ``machine_id``
    """

    model = None
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)

        if application is None:
            raise JujuApplicationNotFound(
                "Application not found: {} (model={})".format(
                    application_name, model_name
                )
            )

        unit = self._get_unit(application, machine_id)
        if not unit:
            raise JujuError(
                "A unit with machine id {} not in available units".format(
                    machine_id
                )
            )

        unit_name = unit.name

        self.log.debug(
            "Destroying unit {} from application {} in model {}".format(
                unit_name, application_name, model_name
            )
        )
        await application.destroy_unit(unit_name)

        self.log.debug(
            "Waiting for unit {} to be destroyed in application {} (model={})...".format(
                unit_name, application_name, model_name
            )
        )

        # TODO: Add functionality in the Juju watcher to replace this kind of blocks
        if total_timeout is None:
            total_timeout = 3600
        end = time.time() + total_timeout
        while time.time() < end:
            if not self._get_unit(application, machine_id):
                self.log.debug(
                    "The unit {} was destroyed in application {} (model={}) ".format(
                        unit_name, application_name, model_name
                    )
                )
                return
            await asyncio.sleep(5)
        # Deadline reached with the unit still present: report the timeout
        # instead of claiming the unit was destroyed
        self.log.warning(
            "Timeout waiting for unit {} to be destroyed in application {} (model={})".format(
                unit_name, application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
711
async def deploy_charm(
    self,
    application_name: str,
    path: str,
    model_name: str,
    machine_id: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    config: dict = None,
    series: str = None,
    num_units: int = 1,
):
    """Deploy charm

    Deploy one unit of the charm (on ``machine_id`` when given), add
    ``num_units - 1`` further units on newly created machines, and wait for
    the application to become ready.

    :param: application_name: Application name
    :param: path: Local path to the charm
    :param: model_name: Model name
    :param: machine_id ID of the machine (when given, its series replaces ``series``)
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: config: Config for the charm
    :param: series: Series of the charm
    :param: num_units: Number of units

    :return: (juju.application.Application): Juju application
    :raises JujuApplicationExists: if the application already exists
    """
    self.log.debug(
        "Deploying charm {} to machine {} in model ~{}".format(
            application_name, machine_id, model_name
        )
    )
    self.log.debug("charm: {}".format(path))

    # Get controller
    controller = await self.get_controller()
    model = None
    try:
        # Get model inside the outer try so the controller is released on failure
        model = await self.get_model(controller, model_name)

        try:
            if application_name not in model.applications:

                if machine_id is not None:
                    # Validates the machine and takes its series
                    _machine, series = self._get_machine_info(model, machine_id)

                application = await model.deploy(
                    entity_url=path,
                    application_name=application_name,
                    channel="stable",
                    num_units=1,
                    series=series,
                    to=machine_id,
                    config=config,
                )

                self.log.debug(
                    "Wait until application {} is ready in model {}".format(
                        application_name, model_name
                    )
                )
                if num_units > 1:
                    for _ in range(num_units - 1):
                        m, _ = await self.create_machine(model_name, wait=False)
                        await application.add_unit(to=m.entity_id)

                await JujuModelWatcher.wait_for(
                    model=model,
                    entity=application,
                    progress_timeout=progress_timeout,
                    total_timeout=total_timeout,
                    db_dict=db_dict,
                    n2vc=self.n2vc,
                    vca_id=self.vca_connection._vca_id,
                )
                self.log.debug(
                    "Application {} is ready in model {}".format(
                        application_name, model_name
                    )
                )
            else:
                raise JujuApplicationExists(
                    "Application {} exists".format(application_name)
                )
        except juju.errors.JujuError as e:
            if "already exists" in e.message:
                raise JujuApplicationExists(
                    "Application {} exists".format(application_name)
                )
            else:
                raise e
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return application
809
async def resolve(self, model_name: str):
    """
    Resolve units in error state.

    For every application whose status is "error", each unit with workload
    status "error" is marked as resolved with ``retry=False``. After a pass
    in which at least one unit was resolved, wait 5 seconds and scan again.
    Stop after a pass that resolves nothing.

    :param: model_name: Model name
    """
    controller = await self.get_controller()
    model = None
    try:
        # Get model inside the try so the controller is released on failure
        model = await self.get_model(controller, model_name)
        all_units_active = False
        while not all_units_active:
            all_units_active = True
            for application_name, application in model.applications.items():
                if application.status == "error":
                    for unit in application.units:
                        if unit.workload_status == "error":
                            self.log.debug(
                                "Model {}, Application {}, Unit {} in error state, resolving".format(
                                    model_name, application_name, unit.entity_id
                                )
                            )
                            try:
                                await unit.resolved(retry=False)
                                all_units_active = False
                            except Exception:
                                # Best effort: a unit that cannot be resolved
                                # does not trigger another pass
                                pass

            if not all_units_active:
                await asyncio.sleep(5)
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
838
async def scale_application(
    self,
    model_name: str,
    application_name: str,
    scale: int = 1,
    total_timeout: float = None,
):
    """
    Scale application (K8s)

    After requesting the new scale, poll every 5 seconds until the unit
    count matches ``scale``, then wait for the model to settle.

    :param: model_name: Model name
    :param: application_name: Application name
    :param: scale: Scale to which to set this application
    :param: total_timeout: Seconds to wait for the scaling (defaults to 1800)

    :raises JujuApplicationNotFound: if the application is not in the model
    :raises Exception: if the unit count does not reach ``scale`` in time
    """
    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)

        self.log.debug(
            "Scaling application {} in model {}".format(
                application_name, model_name
            )
        )
        application = self._get_application(model, application_name)
        if application is None:
            raise JujuApplicationNotFound("Cannot scale application")
        await application.scale(scale=scale)

        self.log.debug(
            "Waiting for application {} to be scaled in model {}...".format(
                application_name, model_name
            )
        )
        timeout = 1800 if total_timeout is None else total_timeout
        deadline = time.time() + timeout
        while time.time() < deadline:
            # Juju needs a moment before the new scale shows up in the
            # model, so wait for the unit count before waiting on the model
            if self._get_application_count(model, application_name) != scale:
                await asyncio.sleep(5)
                continue
            await JujuModelWatcher.wait_for_model(model=model, timeout=timeout)
            self.log.debug(
                "Application {} is scaled in model {}".format(
                    application_name, model_name
                )
            )
            return
        raise Exception(
            "Timeout waiting for application {} in model {} to be scaled".format(
                application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
903
def _get_application_count(self, model: Model, application_name: str) -> int:
    """Count the units of an application.

    :param: model: Model object
    :param: application_name: Application name

    :return: int (or None if application doesn't exist)
    """
    application = self._get_application(model, application_name)
    if application is None:
        return None
    return len(application.units)
915
def _get_application(self, model: Model, application_name: str) -> Application:
    """Look up an application of the model by name.

    :param: model: Model object
    :param: application_name: Application name

    :return: juju.application.Application (or None if it doesn't exist)
    """
    applications = model.applications
    if not applications or application_name not in applications:
        return None
    return applications[application_name]
926
def _get_unit(self, application: Application, machine_id: str) -> Unit:
    """Find the first unit of an application placed on a given machine.

    :param: application: Application object
    :param: machine_id: Machine id

    :return: Unit (or None if no unit runs on that machine)
    """
    return next(
        (unit for unit in application.units if unit.machine_id == machine_id),
        None,
    )
941
942 def _get_machine_info(
943 self,
944 model,
945 machine_id: str,
946 ) -> (str, str):
947 """Get machine info
948
949 :param: model: Model object
950 :param: machine_id: Machine id
951
952 :return: (str, str): (machine, series)
953 """
954 if machine_id not in model.machines:
955 msg = "Machine {} not found in model".format(machine_id)
956 self.log.error(msg=msg)
957 raise JujuMachineNotFound(msg)
958 machine = model.machines[machine_id]
959 return machine, machine.series
960
async def execute_action(
    self,
    application_name: str,
    model_name: str,
    action_name: str,
    db_dict: dict = None,
    machine_id: str = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    **kwargs,
):
    """Execute action

    Run ``action_name`` on the leader unit (when ``machine_id`` is None) or
    on the unit placed on ``machine_id``, wait for it to complete, then fetch
    its output and status.

    :param: application_name: Application name
    :param: model_name: Model name
    :param: action_name: Name of the action
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: machine_id Machine id
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: kwargs: Parameters passed to the action

    :return: (str, str): (output and status); status is "failed" when the
             action id is missing from the reported statuses
    :raises JujuApplicationNotFound: if the application is not in the model
    :raises JujuError: if no unit of the application is on ``machine_id``
    :raises JujuActionNotFound: if the application does not offer the action
    """
    self.log.debug(
        "Executing action {} using params {}".format(action_name, kwargs)
    )
    # Get controller
    controller = await self.get_controller()
    model = None
    try:
        # Get model inside the try so the controller is released on failure
        model = await self.get_model(controller, model_name)

        # Get application
        application = self._get_application(
            model,
            application_name=application_name,
        )
        if application is None:
            raise JujuApplicationNotFound("Cannot execute action")
        # Racing condition:
        # Occasionally, self._get_leader_unit() will return None
        # because the leader elected hook has not been triggered yet.
        # Therefore, we are doing some retries. If it happens again,
        # re-open bug 1236
        if machine_id is None:
            unit = await self._get_leader_unit(application)
            self.log.debug(
                "Action {} is being executed on the leader unit {}".format(
                    action_name, unit.name
                )
            )
        else:
            unit = self._get_unit(application, machine_id)
            if not unit:
                raise JujuError(
                    "A unit with machine id {} not in available units".format(
                        machine_id
                    )
                )
            self.log.debug(
                "Action {} is being executed on {} unit".format(
                    action_name, unit.name
                )
            )

        actions = await application.get_actions()

        if action_name not in actions:
            raise JujuActionNotFound(
                "Action {} not in available actions".format(action_name)
            )

        action = await unit.run_action(action_name, **kwargs)

        self.log.debug(
            "Wait until action {} is completed in application {} (model={})".format(
                action_name, application_name, model_name
            )
        )
        await JujuModelWatcher.wait_for(
            model=model,
            entity=action,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )

        output = await model.get_action_output(action_uuid=action.entity_id)
        status = await model.get_action_status(uuid_or_prefix=action.entity_id)
        status = (
            status[action.entity_id] if action.entity_id in status else "failed"
        )

        self.log.debug(
            "Action {} completed with status {} in application {} (model={})".format(
                action_name, action.status, application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return output, status
1067
async def get_actions(self, application_name: str, model_name: str) -> dict:
    """Get list of actions

    :param: application_name: Application name
    :param: model_name: Model name

    :return: Dict with this format
        {
            "action_name": "Description of the action",
            ...
        }
    :raises JujuApplicationNotFound: if the application is not in the model
    """
    self.log.debug(
        "Getting list of actions for application {}".format(application_name)
    )

    # Get controller
    controller = await self.get_controller()
    model = None
    try:
        # Get model inside the try so the controller is released on failure
        model = await self.get_model(controller, model_name)

        # Get application
        application = self._get_application(
            model,
            application_name=application_name,
        )
        if application is None:
            # Fail explicitly instead of with an AttributeError on None
            raise JujuApplicationNotFound("Cannot get actions")

        # Return list of actions
        return await application.get_actions()

    finally:
        # Disconnect from model and controller
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1104
async def get_metrics(self, model_name: str, application_name: str) -> dict:
    """Get the metrics collected by the VCA.

    :param model_name The name or unique id of the network service
    :param application_name The name of the application

    :return: the metrics returned by the application's ``get_metrics()``,
             or an empty dict when the application is not in the model
    :raises Exception: if model_name or application_name is empty
    """
    if not model_name or not application_name:
        raise Exception("model_name and application_name must be non-empty strings")
    metrics = {}
    controller = await self.get_controller()
    model = None
    try:
        # Get model inside the try so the controller is released on failure
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)
        if application is not None:
            metrics = await application.get_metrics()
    finally:
        # The disconnect helpers are coroutines: without ``await`` they never
        # run and both connections stay open
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return metrics
1124
async def add_relation(
    self,
    model_name: str,
    endpoint_1: str,
    endpoint_2: str,
):
    """Add relation

    A JujuAPIError whose message contains "not found" or "already exists"
    is logged as a warning and the call returns without raising; any other
    error is re-raised.

    :param: model_name: Model name
    :param: endpoint_1 First endpoint name
                       ("app:endpoint" format or directly the saas name)
    :param: endpoint_2: Second endpoint name (^ same format)
    """

    self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))

    controller = await self.get_controller()
    model = None
    try:
        # Fetched inside the outer try so the controller is released even if
        # the model lookup fails; errors from get_model are not subject to the
        # relation-specific handling below.
        model = await self.get_model(controller, model_name)

        try:
            await model.add_relation(endpoint_1, endpoint_2)
        except juju.errors.JujuAPIError as e:
            if "not found" in e.message:
                self.log.warning("Relation not found: {}".format(e.message))
                return
            if "already exists" in e.message:
                self.log.warning("Relation already exists: {}".format(e.message))
                return
            # another exception, raise it
            raise
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1162
async def consume(
    self,
    offer_url: str,
    model_name: str,
):
    """
    Adds a remote offer to the model. Relations can be created later using "juju relate".

    :param: offer_url: Offer Url
    :param: model_name: Model name

    :raises ParseError if there's a problem parsing the offer_url
    :raises JujuError if remote offer includes an endpoint
    :raises JujuAPIError if the operation is not successful
    """
    controller = await self.get_controller()
    model = None
    try:
        # Fetched inside the try so the controller is released even when the
        # model cannot be retrieved.
        model = await controller.get_model(model_name)
        await model.consume(offer_url)
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1186
async def destroy_model(self, model_name: str, total_timeout: float = 1800):
    """
    Destroy model

    Pending manually-provisioned machines are destroyed first, then the model
    is removed, bounded by ``total_timeout``. If an error occurs but the model
    no longer exists afterwards, the error is ignored.

    :param: model_name: Model name
    :param: total_timeout: Timeout in seconds for removing the model
    """

    controller = await self.get_controller()
    model = None
    try:
        if not await self.model_exists(model_name, controller=controller):
            return

        self.log.debug("Destroying model {}".format(model_name))

        model = await self.get_model(controller, model_name)
        # Destroy machines that are manually provisioned
        # and still are in pending state
        await self._destroy_pending_machines(model, only_manual=True)
        await self.disconnect_model(model)
        # Already disconnected: clear the reference so ``finally`` does not
        # disconnect the same model a second time.
        model = None

        await asyncio.wait_for(
            self._destroy_model(model_name, controller),
            timeout=total_timeout,
        )
    except Exception as e:
        if not await self.model_exists(model_name, controller=controller):
            return
        raise e
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1221
1222 async def _destroy_model(
1223 self,
1224 model_name: str,
1225 controller: Controller,
1226 ):
1227 """
1228 Destroy model from controller
1229
1230 :param: model: Model name to be removed
1231 :param: controller: Controller object
1232 :param: timeout: Timeout in seconds
1233 """
1234
1235 async def _destroy_model_gracefully(model_name: str, controller: Controller):
1236 self.log.info(f"Gracefully deleting model {model_name}")
1237 resolved = False
1238 while model_name in await controller.list_models():
1239 if not resolved:
1240 await self.resolve(model_name)
1241 resolved = True
1242 await controller.destroy_model(model_name, destroy_storage=True)
1243
1244 await asyncio.sleep(5)
1245 self.log.info(f"Model {model_name} deleted gracefully")
1246
1247 async def _destroy_model_forcefully(model_name: str, controller: Controller):
1248 self.log.info(f"Forcefully deleting model {model_name}")
1249 while model_name in await controller.list_models():
1250 await controller.destroy_model(
1251 model_name, destroy_storage=True, force=True, max_wait=60
1252 )
1253 await asyncio.sleep(5)
1254 self.log.info(f"Model {model_name} deleted forcefully")
1255
1256 try:
1257 try:
1258 await asyncio.wait_for(
1259 _destroy_model_gracefully(model_name, controller), timeout=120
1260 )
1261 except asyncio.TimeoutError:
1262 await _destroy_model_forcefully(model_name, controller)
1263 except juju.errors.JujuError as e:
1264 if any("has been removed" in error for error in e.errors):
1265 return
1266 if any("model not found" in error for error in e.errors):
1267 return
1268 raise e
1269
async def destroy_application(
    self, model_name: str, application_name: str, total_timeout: float
):
    """
    Destroy application

    Requests destruction of the application, then polls every 5 seconds
    until it disappears from the model.

    :param: model_name: Model name
    :param: application_name: Application name
    :param: total_timeout: Timeout in seconds (None means 3600)

    :raises Exception: if the application still exists when the timeout expires
    """

    controller = await self.get_controller()
    model = None

    try:
        model = await self.get_model(controller, model_name)
        self.log.debug(
            "Destroying application {} in model {}".format(
                application_name, model_name
            )
        )
        application = self._get_application(model, application_name)
        if application:
            await application.destroy()
        else:
            self.log.warning("Application not found: {}".format(application_name))

        self.log.debug(
            "Waiting for application {} to be destroyed in model {}...".format(
                application_name, model_name
            )
        )
        if total_timeout is None:
            total_timeout = 3600
        # Monotonic clock: the deadline is immune to wall-clock adjustments.
        deadline = time.monotonic() + total_timeout
        while True:
            # Check before testing the deadline so an application that is
            # already gone (or vanished during the last sleep) is never
            # reported as a timeout.
            if not self._get_application(model, application_name):
                self.log.debug(
                    "The application {} was destroyed in model {} ".format(
                        application_name, model_name
                    )
                )
                return
            if time.monotonic() >= deadline:
                break
            await asyncio.sleep(5)
        raise Exception(
            "Timeout waiting for application {} to be destroyed in model {}".format(
                application_name, model_name
            )
        )
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1323
1324 async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
1325 """
1326 Destroy pending machines in a given model
1327
1328 :param: only_manual: Bool that indicates only manually provisioned
1329 machines should be destroyed (if True), or that
1330 all pending machines should be destroyed
1331 """
1332 status = await model.get_status()
1333 for machine_id in status.machines:
1334 machine_status = status.machines[machine_id]
1335 if machine_status.agent_status.status == "pending":
1336 if only_manual and not machine_status.instance_id.startswith("manual:"):
1337 break
1338 machine = model.machines[machine_id]
1339 await machine.destroy(force=True)
1340
async def configure_application(
    self, model_name: str, application_name: str, config: dict = None
):
    """Apply a configuration to an application's charm

    Nothing is done (no controller connection is opened) when ``config``
    is empty or None.

    :param: model_name: Model name
    :param: application_name: Application name
    :param: config: Config to apply to the charm
    """
    self.log.debug("Configuring application {}".format(application_name))

    if not config:
        return

    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        target = self._get_application(model, application_name=application_name)
        await target.set_config(config)
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1366
def handle_exception(self, loop, context):
    """Swallow exceptions left unhandled by libjuju.

    Intentionally a no-op. The (loop, context) signature matches an asyncio
    event-loop exception handler; presumably it is installed as one
    elsewhere (NOTE(review): confirm).
    """
    return None
1370
async def health_check(self, interval: float = 300.0):
    """
    Periodically verify that a controller connection can be established

    Runs forever. On each round it opens a controller connection, logs an
    error if that fails, disconnects the connection if one was obtained, and
    then sleeps ``interval`` seconds.

    :param: interval: Time in seconds between checks
    """
    while True:
        # Reset every round so a failed attempt never disconnects (again) the
        # controller obtained in a previous round.
        controller = None
        try:
            controller = await self.get_controller()
        except Exception as e:
            self.log.error("Health check to VCA failed: {}".format(e))
        finally:
            if controller is not None:
                await self.disconnect_controller(controller)
        await asyncio.sleep(interval)
1387
async def list_models(self, contains: str = None) -> [str]:
    """List model names, optionally filtered by a substring

    :param: contains: String that is contained in model name; when empty or
                      None, no filtering is applied

    :return: [models] Returns list of model names
    """
    controller = await self.get_controller()
    try:
        names = await controller.list_models()
        return [n for n in names if contains in n] if contains else names
    finally:
        await self.disconnect_controller(controller)
1404
async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
    """List the application offers of a model

    :param: model_name: Model name

    :return: Returns list of offers
    """
    controller = await self.get_controller()
    try:
        offers = await controller.list_offers(model_name)
    finally:
        await self.disconnect_controller(controller)
    return offers
1418
async def add_k8s(
    self,
    name: str,
    rbac_id: str,
    token: str,
    client_cert_data: str,
    configuration: Configuration,
    storage_class: str,
    credential_name: str = None,
):
    """
    Add a Kubernetes cloud to the controller

    Similar to the `juju add-k8s` command in the CLI

    :param: name: Name for the K8s cloud
    :param: rbac_id: Value stored in the credential attributes under the
                     RBAC label key
    :param: token: Token passed to the credential builder
    :param: client_cert_data: Certificate data, used both as the cloud CA
                              certificate and in the credential
    :param: configuration: Kubernetes configuration object (its ``host`` is
                           the cloud endpoint)
    :param: storage_class: Storage Class used for operator and workload storage
    :param: credential_name: Name of the credential; forwarded to add_cloud

    :raises Exception: if storage_class or name is empty, or configuration
                       is not provided
    """

    # Validation order matters: the first failing check determines the message.
    for value, message in (
        (storage_class, "storage_class must be a non-empty string"),
        (name, "name must be a non-empty string"),
        (configuration, "configuration must be provided"),
    ):
        if not value:
            raise Exception(message)

    endpoint = configuration.host
    credential = self.get_k8s_cloud_credential(
        configuration,
        client_cert_data,
        token,
    )
    credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id

    storage_config = {
        "operator-storage": storage_class,
        "workload-storage": storage_class,
    }
    cloud = client.Cloud(
        type_="kubernetes",
        auth_types=[credential.auth_type],
        endpoint=endpoint,
        ca_certificates=[client_cert_data],
        config=storage_config,
    )

    return await self.add_cloud(
        name, cloud, credential, credential_name=credential_name
    )
1468
def get_k8s_cloud_credential(
    self,
    configuration: Configuration,
    client_cert_data: str,
    token: str = None,
) -> client.CloudCredential:
    """Build a Juju CloudCredential for a Kubernetes cluster

    The auth type is chosen as follows:
      - a username is configured: "userpasswithcert" with client cert data,
        otherwise "userpass";
      - client cert data and a token: "certificate";
      - anything else raises JujuInvalidK8sConfiguration.
    The key-based "oauth2"/"oauth2withcert" types are currently never
    selected, because no client key is read (see TODO below).

    :param: configuration: Kubernetes configuration (username/password are read)
    :param: client_cert_data: Client certificate data
    :param: token: Bearer token

    :raises JujuInvalidK8sConfiguration: if a token is combined with
        username/password, or no supported authentication method applies
    """
    # TODO: Test with AKS. The client key is not loaded yet
    # (it would come from configuration.key_file), so it is always None.
    client_key = None
    user = configuration.username
    secret = configuration.password

    attrs = {}
    if client_cert_data:
        attrs["ClientCertificateData"] = client_cert_data
    if client_key:
        attrs["ClientKeyData"] = client_key
    if token:
        if user or secret:
            raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
        attrs["Token"] = token

    if client_key:
        auth_type = "oauth2withcert" if client_cert_data else "oauth2"
        if not token:
            raise JujuInvalidK8sConfiguration(
                "missing token for auth type {}".format(auth_type)
            )
    elif user:
        if not secret:
            self.log.debug("credential for user {} has empty password".format(user))
        attrs["username"] = user
        attrs["password"] = secret
        auth_type = "userpasswithcert" if client_cert_data else "userpass"
    elif client_cert_data and token:
        auth_type = "certificate"
    else:
        raise JujuInvalidK8sConfiguration("authentication method not supported")

    return client.CloudCredential(auth_type=auth_type, attrs=attrs)
1515
async def add_cloud(
    self,
    name: str,
    cloud: Cloud,
    credential: CloudCredential = None,
    credential_name: str = None,
) -> Cloud:
    """
    Register a cloud (and optionally its credential) in the controller

    :param: name: Name of the cloud to be added
    :param: cloud: Cloud object
    :param: credential: CloudCredentials object for the cloud
    :param: credential_name: Credential name.
                             If not defined, cloud of the name will be used.

    :return: The ``cloud`` argument itself (see note below)
    """
    controller = await self.get_controller()
    try:
        await controller.add_cloud(name, cloud)
        if credential:
            target_name = credential_name or name
            await controller.add_credential(
                target_name, credential=credential, cloud=name
            )
        # The object returned by controller.add_cloud() should be returned
        # instead; the input is returned until this bug is fixed:
        # https://github.com/juju/python-libjuju/issues/443
        return cloud
    finally:
        await self.disconnect_controller(controller)
1545
async def remove_cloud(self, name: str):
    """
    Remove a cloud from the controller

    A JujuError whose only message is 'cloud "<name>" not found' is logged as
    a warning instead of being raised.

    :param: name: Name of the cloud to be removed
    """
    controller = await self.get_controller()
    try:
        await controller.remove_cloud(name)
    except juju.errors.JujuError as e:
        not_found_message = f'cloud "{name}" not found'
        only_not_found = len(e.errors) == 1 and e.errors[0] == not_found_message
        if not only_not_found:
            raise e
        self.log.warning(f"Cloud {name} not found, so it could not be deleted.")
    finally:
        await self.disconnect_controller(controller)
1562
@retry(attempts=20, delay=5, fallback=JujuLeaderUnitNotFound())
async def _get_leader_unit(self, application: Application) -> Unit:
    """Return the leader unit of an application.

    When no unit reports leadership, a plain Exception is raised so that
    the ``retry`` decorator tries again (20 attempts, 5 s apart).
    NOTE(review): once the attempts are exhausted the decorator's
    ``fallback`` (JujuLeaderUnitNotFound) is used; confirm the exact
    fallback semantics in retrying_async.
    """
    for candidate in application.units:
        if await candidate.is_leader_from_status():
            return candidate
    raise Exception()
1573
async def get_cloud_credentials(self, cloud: Cloud) -> typing.List:
    """
    Get the credentials registered for a cloud

    The credential is looked up through the controller's Cloud facade, using
    the tag built from the cloud name, the VCA connection user and
    ``cloud.credential_name``.

    :param: cloud: Cloud object. The returned credentials will be from this cloud.

    :return: List of credentials object associated to the specified cloud
    """
    controller = await self.get_controller()
    try:
        cloud_facade = client.CloudFacade.from_connection(controller.connection())
        entities = [
            client.Entity(
                tag.credential(
                    cloud.name,
                    self.vca_connection.data.user,
                    cloud.credential_name,
                )
            )
        ]
        response = await cloud_facade.Credential(entities)
        return response.results
    finally:
        await self.disconnect_controller(controller)
1593
async def check_application_exists(self, model_name, application_name) -> bool:
    """Tell whether an application is present in a model

    :param: model_name: Model Name
    :param: application_name: Application Name

    :return: True if the application is found in the model, False otherwise
    """
    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        self.log.debug(
            "Checking if application {} exists in model {}".format(
                application_name, model_name
            )
        )
        found = self._get_application(model, application_name)
        return found is not None
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)