Fix bug 2060: Add logic to delete models gracefully
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17 import typing
18
19 import time
20
21 import juju.errors
22 from juju.model import Model
23 from juju.machine import Machine
24 from juju.application import Application
25 from juju.unit import Unit
26 from juju.client._definitions import (
27 FullStatus,
28 QueryApplicationOffersResults,
29 Cloud,
30 CloudCredential,
31 )
32 from juju.controller import Controller
33 from juju.client import client
34 from juju import tag
35
36 from n2vc.definitions import Offer, RelationEndpoint
37 from n2vc.juju_watcher import JujuModelWatcher
38 from n2vc.provisioner import AsyncSSHProvisioner
39 from n2vc.n2vc_conn import N2VCConnector
40 from n2vc.exceptions import (
41 JujuMachineNotFound,
42 JujuApplicationNotFound,
43 JujuLeaderUnitNotFound,
44 JujuActionNotFound,
45 JujuControllerFailedConnecting,
46 JujuApplicationExists,
47 JujuInvalidK8sConfiguration,
48 JujuError,
49 )
50 from n2vc.vca.cloud import Cloud as VcaCloud
51 from n2vc.vca.connection import Connection
52 from kubernetes.client.configuration import Configuration
53 from retrying_async import retry
54
55
# Label key constant. It is not referenced in the visible part of this module;
# presumably it names the label holding an RBAC identifier -- TODO confirm.
RBAC_LABEL_KEY_NAME = "rbac-id"
57
58
59 class Libjuju:
def __init__(
    self,
    vca_connection: Connection,
    loop: asyncio.AbstractEventLoop = None,
    log: logging.Logger = None,
    n2vc: N2VCConnector = None,
):
    """
    Constructor

    :param: vca_connection: n2vc.vca.connection object
    :param: loop: Asyncio loop (``asyncio.get_event_loop()`` if not given)
    :param: log: Logger (the "Libjuju" logger if not given)
    :param: n2vc: N2VC object
    """

    self.log = log or logging.getLogger("Libjuju")
    self.n2vc = n2vc
    self.vca_connection = vca_connection

    self.loop = loop or asyncio.get_event_loop()
    self.loop.set_exception_handler(self.handle_exception)
    try:
        self.creating_model = asyncio.Lock(loop=self.loop)
    except TypeError:
        # Python >= 3.10 removed the ``loop`` argument from asyncio.Lock;
        # the lock binds to the running loop on first use instead.
        self.creating_model = asyncio.Lock()

    # The health check task is only started for the default VCA connection
    if self.vca_connection.is_default:
        self.health_check_task = self._create_health_check_task()
86
87 def _create_health_check_task(self):
88 return self.loop.create_task(self.health_check())
89
async def get_controller(self, timeout: float = 60.0) -> Controller:
    """
    Connect to the Juju controller described by the VCA connection

    :param: timeout: Time in seconds to wait for controller to connect

    :return: Controller: The connected controller
    :raises: JujuControllerFailedConnecting: if connecting fails for any
             reason other than the task being cancelled
    """
    controller = None
    try:
        controller = Controller()
        connect_kwargs = {
            "endpoint": self.vca_connection.data.endpoints,
            "username": self.vca_connection.data.user,
            "password": self.vca_connection.data.secret,
            "cacert": self.vca_connection.data.cacert,
        }
        await asyncio.wait_for(
            controller.connect(**connect_kwargs),
            timeout=timeout,
        )
        if self.vca_connection.is_default:
            reported_endpoints = await controller.api_endpoints
            # Store the endpoints again as soon as one of them is unknown
            if any(
                endpoint not in self.vca_connection.endpoints
                for endpoint in reported_endpoints
            ):
                await self.vca_connection.update_endpoints(reported_endpoints)
        return controller
    except asyncio.CancelledError:
        # Cancellation must reach the caller untouched
        raise
    except Exception as error:
        message = "Failed connecting to controller: {}... {}".format(
            self.vca_connection.data.endpoints, error
        )
        self.log.error(message)
        if controller:
            await self.disconnect_controller(controller)

        raise JujuControllerFailedConnecting(
            f"Error connecting to Juju controller: {error}"
        )
129
130 async def disconnect(self):
131 """Disconnect"""
132 # Cancel health check task
133 self.health_check_task.cancel()
134 self.log.debug("Libjuju disconnected!")
135
136 async def disconnect_model(self, model: Model):
137 """
138 Disconnect model
139
140 :param: model: Model that will be disconnected
141 """
142 await model.disconnect()
143
144 async def disconnect_controller(self, controller: Controller):
145 """
146 Disconnect controller
147
148 :param: controller: Controller that will be disconnected
149 """
150 if controller:
151 await controller.disconnect()
152
@retry(attempts=3, delay=5, timeout=None)
async def add_model(self, model_name: str, cloud: VcaCloud):
    """
    Create a model in the controller unless it is already there

    :param: model_name: Model name
    :param: cloud: Cloud object (its name and credential name are used)
    """

    controller = await self.get_controller()
    model = None
    try:
        # Let other workers finish their model creation first
        while self.creating_model.locked():
            await asyncio.sleep(0.1)

        async with self.creating_model:
            already_there = await self.model_exists(
                model_name, controller=controller
            )
            if already_there:
                return
            self.log.debug("Creating model {}".format(model_name))
            model = await controller.add_model(
                model_name,
                config=self.vca_connection.data.model_config,
                cloud_name=cloud.name,
                credential_name=cloud.credential_name,
            )
    except juju.errors.JujuAPIError as api_error:
        # A model created concurrently is not a failure
        if "already exists" not in api_error.message:
            raise api_error
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
190
async def get_executed_actions(self, model_name: str) -> list:
    """
    Collect the history of executed actions of a model.

    :param: model_name: Model name, str.
    :return: List of dicts, one per executed action, holding "id", "action",
             "status" plus every key of the action output.
    """
    model = None
    history = []
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        # Union of the action names of every application in the model
        action_names = {}
        for application_name in model.applications:
            action_names.update(
                await self.get_actions(application_name, model_name)
            )
        # Status and output of every execution of each action
        for action_name in action_names:
            status_by_id = await model.get_action_status(name=action_name)
            for action_id, status in status_by_id.items():
                entry = {
                    "id": action_id,
                    "action": action_name,
                    "status": status,
                }
                # Output keys are merged on top of the basic fields
                output = await model.get_action_output(entry["id"])
                entry.update(output.items())
                history.append(entry)
    except Exception as error:
        raise JujuError(
            "Error in getting executed actions for model: {}. Error: {}".format(
                model_name, str(error)
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return history
235
async def get_application_configs(
    self, model_name: str, application_name: str
) -> dict:
    """
    Get the configuration of an application.

    :param: model_name: Model name, str.
    :param: application_name: Application name, str.

    :return: The value returned by the application's ``get_config()``
    :raises: JujuError: wrapping any error raised while getting the config
    """
    model = None
    configs = {}
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        target = self._get_application(model, application_name=application_name)
        configs = await target.get_config()
    except Exception as error:
        raise JujuError(
            "Error in getting configs for application: {} in model: {}. Error: {}".format(
                application_name, model_name, str(error)
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return configs
267
@retry(attempts=3, delay=5)
async def get_model(self, controller: Controller, model_name: str) -> Model:
    """
    Fetch a model by name through the given controller

    :param: controller: Controller
    :param: model_name: Model name

    :return: Model: The Juju model object returned by the controller
    """
    model = await controller.get_model(model_name)
    return model
279
280 async def model_exists(
281 self, model_name: str, controller: Controller = None
282 ) -> bool:
283 """
284 Check if model exists
285
286 :param: controller: Controller
287 :param: model_name: Model name
288
289 :return bool
290 """
291 need_to_disconnect = False
292
293 # Get controller if not passed
294 if not controller:
295 controller = await self.get_controller()
296 need_to_disconnect = True
297
298 # Check if model exists
299 try:
300 return model_name in await controller.list_models()
301 finally:
302 if need_to_disconnect:
303 await self.disconnect_controller(controller)
304
305 async def models_exist(self, model_names: [str]) -> (bool, list):
306 """
307 Check if models exists
308
309 :param: model_names: List of strings with model names
310
311 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
312 """
313 if not model_names:
314 raise Exception(
315 "model_names must be a non-empty array. Given value: {}".format(
316 model_names
317 )
318 )
319 non_existing_models = []
320 models = await self.list_models()
321 existing_models = list(set(models).intersection(model_names))
322 non_existing_models = list(set(model_names) - set(existing_models))
323
324 return (
325 len(non_existing_models) == 0,
326 non_existing_models,
327 )
328
329 async def get_model_status(self, model_name: str) -> FullStatus:
330 """
331 Get model status
332
333 :param: model_name: Model name
334
335 :return: Full status object
336 """
337 controller = await self.get_controller()
338 model = await self.get_model(controller, model_name)
339 try:
340 return await model.get_status()
341 finally:
342 await self.disconnect_model(model)
343 await self.disconnect_controller(controller)
344
345 async def create_machine(
346 self,
347 model_name: str,
348 machine_id: str = None,
349 db_dict: dict = None,
350 progress_timeout: float = None,
351 total_timeout: float = None,
352 series: str = "bionic",
353 wait: bool = True,
354 ) -> (Machine, bool):
355 """
356 Create machine
357
358 :param: model_name: Model name
359 :param: machine_id: Machine id
360 :param: db_dict: Dictionary with data of the DB to write the updates
361 :param: progress_timeout: Maximum time between two updates in the model
362 :param: total_timeout: Timeout for the entity to be active
363 :param: series: Series of the machine (xenial, bionic, focal, ...)
364 :param: wait: Wait until machine is ready
365
366 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
367 if the machine is new or it already existed
368 """
369 new = False
370 machine = None
371
372 self.log.debug(
373 "Creating machine (id={}) in model: {}".format(machine_id, model_name)
374 )
375
376 # Get controller
377 controller = await self.get_controller()
378
379 # Get model
380 model = await self.get_model(controller, model_name)
381 try:
382 if machine_id is not None:
383 self.log.debug(
384 "Searching machine (id={}) in model {}".format(
385 machine_id, model_name
386 )
387 )
388
389 # Get machines from model and get the machine with machine_id if exists
390 machines = await model.get_machines()
391 if machine_id in machines:
392 self.log.debug(
393 "Machine (id={}) found in model {}".format(
394 machine_id, model_name
395 )
396 )
397 machine = machines[machine_id]
398 else:
399 raise JujuMachineNotFound("Machine {} not found".format(machine_id))
400
401 if machine is None:
402 self.log.debug("Creating a new machine in model {}".format(model_name))
403
404 # Create machine
405 machine = await model.add_machine(
406 spec=None, constraints=None, disks=None, series=series
407 )
408 new = True
409
410 # Wait until the machine is ready
411 self.log.debug(
412 "Wait until machine {} is ready in model {}".format(
413 machine.entity_id, model_name
414 )
415 )
416 if wait:
417 await JujuModelWatcher.wait_for(
418 model=model,
419 entity=machine,
420 progress_timeout=progress_timeout,
421 total_timeout=total_timeout,
422 db_dict=db_dict,
423 n2vc=self.n2vc,
424 vca_id=self.vca_connection._vca_id,
425 )
426 finally:
427 await self.disconnect_model(model)
428 await self.disconnect_controller(controller)
429
430 self.log.debug(
431 "Machine {} ready at {} in model {}".format(
432 machine.entity_id, machine.dns_name, model_name
433 )
434 )
435 return machine, new
436
437 async def provision_machine(
438 self,
439 model_name: str,
440 hostname: str,
441 username: str,
442 private_key_path: str,
443 db_dict: dict = None,
444 progress_timeout: float = None,
445 total_timeout: float = None,
446 ) -> str:
447 """
448 Manually provisioning of a machine
449
450 :param: model_name: Model name
451 :param: hostname: IP to access the machine
452 :param: username: Username to login to the machine
453 :param: private_key_path: Local path for the private key
454 :param: db_dict: Dictionary with data of the DB to write the updates
455 :param: progress_timeout: Maximum time between two updates in the model
456 :param: total_timeout: Timeout for the entity to be active
457
458 :return: (Entity): Machine id
459 """
460 self.log.debug(
461 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
462 model_name, hostname, username
463 )
464 )
465
466 # Get controller
467 controller = await self.get_controller()
468
469 # Get model
470 model = await self.get_model(controller, model_name)
471
472 try:
473 # Get provisioner
474 provisioner = AsyncSSHProvisioner(
475 host=hostname,
476 user=username,
477 private_key_path=private_key_path,
478 log=self.log,
479 )
480
481 # Provision machine
482 params = await provisioner.provision_machine()
483
484 params.jobs = ["JobHostUnits"]
485
486 self.log.debug("Adding machine to model")
487 connection = model.connection()
488 client_facade = client.ClientFacade.from_connection(connection)
489
490 results = await client_facade.AddMachines(params=[params])
491 error = results.machines[0].error
492
493 if error:
494 msg = "Error adding machine: {}".format(error.message)
495 self.log.error(msg=msg)
496 raise ValueError(msg)
497
498 machine_id = results.machines[0].machine
499
500 self.log.debug("Installing Juju agent into machine {}".format(machine_id))
501 asyncio.ensure_future(
502 provisioner.install_agent(
503 connection=connection,
504 nonce=params.nonce,
505 machine_id=machine_id,
506 proxy=self.vca_connection.data.api_proxy,
507 series=params.series,
508 )
509 )
510
511 machine = None
512 for _ in range(10):
513 machine_list = await model.get_machines()
514 if machine_id in machine_list:
515 self.log.debug("Machine {} found in model!".format(machine_id))
516 machine = model.machines.get(machine_id)
517 break
518 await asyncio.sleep(2)
519
520 if machine is None:
521 msg = "Machine {} not found in model".format(machine_id)
522 self.log.error(msg=msg)
523 raise JujuMachineNotFound(msg)
524
525 self.log.debug(
526 "Wait until machine {} is ready in model {}".format(
527 machine.entity_id, model_name
528 )
529 )
530 await JujuModelWatcher.wait_for(
531 model=model,
532 entity=machine,
533 progress_timeout=progress_timeout,
534 total_timeout=total_timeout,
535 db_dict=db_dict,
536 n2vc=self.n2vc,
537 vca_id=self.vca_connection._vca_id,
538 )
539 except Exception as e:
540 raise e
541 finally:
542 await self.disconnect_model(model)
543 await self.disconnect_controller(controller)
544
545 self.log.debug(
546 "Machine provisioned {} in model {}".format(machine_id, model_name)
547 )
548
549 return machine_id
550
551 async def deploy(
552 self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
553 ):
554 """
555 Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
556
557 :param: uri: Path or Charm Store uri in which the charm or bundle can be found
558 :param: model_name: Model name
559 :param: wait: Indicates whether to wait or not until all applications are active
560 :param: timeout: Time in seconds to wait until all applications are active
561 """
562 controller = await self.get_controller()
563 model = await self.get_model(controller, model_name)
564 try:
565 await model.deploy(uri, trust=True)
566 if wait:
567 await JujuModelWatcher.wait_for_model(model, timeout=timeout)
568 self.log.debug("All units active in model {}".format(model_name))
569 finally:
570 await self.disconnect_model(model)
571 await self.disconnect_controller(controller)
572
async def add_unit(
    self,
    application_name: str,
    model_name: str,
    machine_id: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
):
    """Add a unit of an application on an existing machine

    :param: application_name: Application name
    :param: model_name: Model name
    :param: machine_id Machine id
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active

    :return: None
    :raises: JujuApplicationNotFound: if the application is not in the model
    """

    model = None
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)

        if application is None:
            raise JujuApplicationNotFound(
                "Application {} not exists".format(application_name)
            )

        # Called only for its validation: it raises when the machine id
        # is not part of the model
        self._get_machine_info(model, machine_id)

        self.log.debug(
            "Adding unit (machine {}) to application {} in model ~{}".format(
                machine_id, application_name, model_name
            )
        )

        await application.add_unit(to=machine_id)

        watcher_kwargs = dict(
            model=model,
            entity=application,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )
        await JujuModelWatcher.wait_for(**watcher_kwargs)
        self.log.debug(
            "Unit is added to application {} in model {}".format(
                application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
636
637 async def destroy_unit(
638 self,
639 application_name: str,
640 model_name: str,
641 machine_id: str,
642 total_timeout: float = None,
643 ):
644 """Destroy unit
645
646 :param: application_name: Application name
647 :param: model_name: Model name
648 :param: machine_id Machine id
649 :param: total_timeout: Timeout for the entity to be active
650
651 :return: None
652 """
653
654 model = None
655 controller = await self.get_controller()
656 try:
657 model = await self.get_model(controller, model_name)
658 application = self._get_application(model, application_name)
659
660 if application is None:
661 raise JujuApplicationNotFound(
662 "Application not found: {} (model={})".format(
663 application_name, model_name
664 )
665 )
666
667 unit = self._get_unit(application, machine_id)
668 if not unit:
669 raise JujuError(
670 "A unit with machine id {} not in available units".format(
671 machine_id
672 )
673 )
674
675 unit_name = unit.name
676
677 self.log.debug(
678 "Destroying unit {} from application {} in model {}".format(
679 unit_name, application_name, model_name
680 )
681 )
682 await application.destroy_unit(unit_name)
683
684 self.log.debug(
685 "Waiting for unit {} to be destroyed in application {} (model={})...".format(
686 unit_name, application_name, model_name
687 )
688 )
689
690 # TODO: Add functionality in the Juju watcher to replace this kind of blocks
691 if total_timeout is None:
692 total_timeout = 3600
693 end = time.time() + total_timeout
694 while time.time() < end:
695 if not self._get_unit(application, machine_id):
696 self.log.debug(
697 "The unit {} was destroyed in application {} (model={}) ".format(
698 unit_name, application_name, model_name
699 )
700 )
701 return
702 await asyncio.sleep(5)
703 self.log.debug(
704 "Unit {} is destroyed from application {} in model {}".format(
705 unit_name, application_name, model_name
706 )
707 )
708 finally:
709 if model:
710 await self.disconnect_model(model)
711 await self.disconnect_controller(controller)
712
async def deploy_charm(
    self,
    application_name: str,
    path: str,
    model_name: str,
    machine_id: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    config: dict = None,
    series: str = None,
    num_units: int = 1,
):
    """Deploy charm

    :param: application_name: Application name
    :param: path: Local path to the charm
    :param: model_name: Model name
    :param: machine_id ID of the machine
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: config: Config for the charm
    :param: series: Series of the charm; replaced by the series of the
            machine when machine_id is given
    :param: num_units: Number of units

    :return: (juju.application.Application): Juju application
    :raises: JujuApplicationExists: if the application is already in the model
    """
    self.log.debug(
        "Deploying charm {} to machine {} in model ~{}".format(
            application_name, machine_id, model_name
        )
    )
    self.log.debug("charm: {}".format(path))

    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model (inside the try block so the controller is released
        # if the model cannot be obtained)
        model = await self.get_model(controller, model_name)

        if application_name not in model.applications:

            if machine_id is not None:
                machine, series = self._get_machine_info(model, machine_id)

            # The first unit is placed by the deployment itself
            application = await model.deploy(
                entity_url=path,
                application_name=application_name,
                channel="stable",
                num_units=1,
                series=series,
                to=machine_id,
                config=config,
            )

            self.log.debug(
                "Wait until application {} is ready in model {}".format(
                    application_name, model_name
                )
            )
            # Every additional unit gets a newly created machine
            if num_units > 1:
                for _ in range(num_units - 1):
                    m, _ = await self.create_machine(model_name, wait=False)
                    await application.add_unit(to=m.entity_id)

            await JujuModelWatcher.wait_for(
                model=model,
                entity=application,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout,
                db_dict=db_dict,
                n2vc=self.n2vc,
                vca_id=self.vca_connection._vca_id,
            )
            self.log.debug(
                "Application {} is ready in model {}".format(
                    application_name, model_name
                )
            )
        else:
            raise JujuApplicationExists(
                "Application {} exists".format(application_name)
            )
    except juju.errors.JujuError as e:
        if "already exists" in e.message:
            raise JujuApplicationExists(
                "Application {} exists".format(application_name)
            )
        else:
            raise e
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return application
810
811 async def resolve(self, model_name: str):
812
813 controller = await self.get_controller()
814 model = await self.get_model(controller, model_name)
815 all_units_active = False
816 try:
817 while not all_units_active:
818 all_units_active = True
819 for application_name, application in model.applications.items():
820 if application.status == "error":
821 for unit in application.units:
822 if unit.workload_status == "error":
823 self.log.debug(
824 "Model {}, Application {}, Unit {} in error state, resolving".format(
825 model_name, application_name, unit.entity_id
826 )
827 )
828 try:
829 await unit.resolved(retry=False)
830 all_units_active = False
831 except Exception:
832 pass
833
834 if not all_units_active:
835 await asyncio.sleep(5)
836 finally:
837 await self.disconnect_model(model)
838 await self.disconnect_controller(controller)
839
async def scale_application(
    self,
    model_name: str,
    application_name: str,
    scale: int = 1,
    total_timeout: float = None,
):
    """
    Scale application (K8s)

    :param: model_name: Model name
    :param: application_name: Application name
    :param: scale: Scale to which to set this application
    :param: total_timeout: Timeout for the entity to be active
            (1800 seconds if not given)

    :raises: JujuApplicationNotFound: if the application is not in the model
    :raises: Exception: if the unit count does not reach scale in time
    """

    model = None
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)

        self.log.debug(
            "Scaling application {} in model {}".format(
                application_name, model_name
            )
        )
        application = self._get_application(model, application_name)
        if application is None:
            raise JujuApplicationNotFound("Cannot scale application")
        await application.scale(scale=scale)

        self.log.debug(
            "Waiting for application {} to be scaled in model {}...".format(
                application_name, model_name
            )
        )
        timeout_seconds = 1800 if total_timeout is None else total_timeout
        deadline = time.time() + timeout_seconds
        while time.time() < deadline:
            unit_count = self._get_application_count(model, application_name)
            # Scaling is triggered with some delay in the Juju model, so the
            # model is only awaited once the unit count matches the scale
            if unit_count != scale:
                await asyncio.sleep(5)
                continue
            await JujuModelWatcher.wait_for_model(
                model=model, timeout=timeout_seconds
            )
            self.log.debug(
                "Application {} is scaled in model {}".format(
                    application_name, model_name
                )
            )
            return
        raise Exception(
            "Timeout waiting for application {} in model {} to be scaled".format(
                application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
904
905 def _get_application_count(self, model: Model, application_name: str) -> int:
906 """Get number of units of the application
907
908 :param: model: Model object
909 :param: application_name: Application name
910
911 :return: int (or None if application doesn't exist)
912 """
913 application = self._get_application(model, application_name)
914 if application is not None:
915 return len(application.units)
916
917 def _get_application(self, model: Model, application_name: str) -> Application:
918 """Get application
919
920 :param: model: Model object
921 :param: application_name: Application name
922
923 :return: juju.application.Application (or None if it doesn't exist)
924 """
925 if model.applications and application_name in model.applications:
926 return model.applications[application_name]
927
928 def _get_unit(self, application: Application, machine_id: str) -> Unit:
929 """Get unit
930
931 :param: application: Application object
932 :param: machine_id: Machine id
933
934 :return: Unit
935 """
936 unit = None
937 for u in application.units:
938 if u.machine_id == machine_id:
939 unit = u
940 break
941 return unit
942
943 def _get_machine_info(
944 self,
945 model,
946 machine_id: str,
947 ) -> (str, str):
948 """Get machine info
949
950 :param: model: Model object
951 :param: machine_id: Machine id
952
953 :return: (str, str): (machine, series)
954 """
955 if machine_id not in model.machines:
956 msg = "Machine {} not found in model".format(machine_id)
957 self.log.error(msg=msg)
958 raise JujuMachineNotFound(msg)
959 machine = model.machines[machine_id]
960 return machine, machine.series
961
962 async def execute_action(
963 self,
964 application_name: str,
965 model_name: str,
966 action_name: str,
967 db_dict: dict = None,
968 machine_id: str = None,
969 progress_timeout: float = None,
970 total_timeout: float = None,
971 **kwargs,
972 ):
973 """Execute action
974
975 :param: application_name: Application name
976 :param: model_name: Model name
977 :param: action_name: Name of the action
978 :param: db_dict: Dictionary with data of the DB to write the updates
979 :param: machine_id Machine id
980 :param: progress_timeout: Maximum time between two updates in the model
981 :param: total_timeout: Timeout for the entity to be active
982
983 :return: (str, str): (output and status)
984 """
985 self.log.debug(
986 "Executing action {} using params {}".format(action_name, kwargs)
987 )
988 # Get controller
989 controller = await self.get_controller()
990
991 # Get model
992 model = await self.get_model(controller, model_name)
993
994 try:
995 # Get application
996 application = self._get_application(
997 model,
998 application_name=application_name,
999 )
1000 if application is None:
1001 raise JujuApplicationNotFound("Cannot execute action")
1002 # Racing condition:
1003 # Ocassionally, self._get_leader_unit() will return None
1004 # because the leader elected hook has not been triggered yet.
1005 # Therefore, we are doing some retries. If it happens again,
1006 # re-open bug 1236
1007 if machine_id is None:
1008 unit = await self._get_leader_unit(application)
1009 self.log.debug(
1010 "Action {} is being executed on the leader unit {}".format(
1011 action_name, unit.name
1012 )
1013 )
1014 else:
1015 unit = self._get_unit(application, machine_id)
1016 if not unit:
1017 raise JujuError(
1018 "A unit with machine id {} not in available units".format(
1019 machine_id
1020 )
1021 )
1022 self.log.debug(
1023 "Action {} is being executed on {} unit".format(
1024 action_name, unit.name
1025 )
1026 )
1027
1028 actions = await application.get_actions()
1029
1030 if action_name not in actions:
1031 raise JujuActionNotFound(
1032 "Action {} not in available actions".format(action_name)
1033 )
1034
1035 action = await unit.run_action(action_name, **kwargs)
1036
1037 self.log.debug(
1038 "Wait until action {} is completed in application {} (model={})".format(
1039 action_name, application_name, model_name
1040 )
1041 )
1042 await JujuModelWatcher.wait_for(
1043 model=model,
1044 entity=action,
1045 progress_timeout=progress_timeout,
1046 total_timeout=total_timeout,
1047 db_dict=db_dict,
1048 n2vc=self.n2vc,
1049 vca_id=self.vca_connection._vca_id,
1050 )
1051
1052 output = await model.get_action_output(action_uuid=action.entity_id)
1053 status = await model.get_action_status(uuid_or_prefix=action.entity_id)
1054 status = (
1055 status[action.entity_id] if action.entity_id in status else "failed"
1056 )
1057
1058 self.log.debug(
1059 "Action {} completed with status {} in application {} (model={})".format(
1060 action_name, action.status, application_name, model_name
1061 )
1062 )
1063 finally:
1064 await self.disconnect_model(model)
1065 await self.disconnect_controller(controller)
1066
1067 return output, status
1068
1069 async def get_actions(self, application_name: str, model_name: str) -> dict:
1070 """Get list of actions
1071
1072 :param: application_name: Application name
1073 :param: model_name: Model name
1074
1075 :return: Dict with this format
1076 {
1077 "action_name": "Description of the action",
1078 ...
1079 }
1080 """
1081 self.log.debug(
1082 "Getting list of actions for application {}".format(application_name)
1083 )
1084
1085 # Get controller
1086 controller = await self.get_controller()
1087
1088 # Get model
1089 model = await self.get_model(controller, model_name)
1090
1091 try:
1092 # Get application
1093 application = self._get_application(
1094 model,
1095 application_name=application_name,
1096 )
1097
1098 # Return list of actions
1099 return await application.get_actions()
1100
1101 finally:
1102 # Disconnect from model and controller
1103 await self.disconnect_model(model)
1104 await self.disconnect_controller(controller)
1105
async def get_metrics(self, model_name: str, application_name: str) -> dict:
    """Get the metrics collected by the VCA.

    :param: model_name: The name or unique id of the network service
    :param: application_name: The name of the application

    :return: Whatever the application's get_metrics() returns, or an empty
             dict when the application is not found in the model
    :raises: Exception if model_name or application_name is empty
    """
    if not model_name or not application_name:
        raise Exception("model_name and application_name must be non-empty strings")
    metrics = {}
    controller = await self.get_controller()
    model = None
    try:
        # Obtained inside the try block so the controller is always released
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)
        if application is not None:
            metrics = await application.get_metrics()
    finally:
        # Both disconnect helpers are coroutines: without "await" they would
        # never run and the connections would stay open.
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return metrics
1125
async def add_relation(
    self,
    model_name: str,
    endpoint_1: str,
    endpoint_2: str,
):
    """Add relation

    A relation that cannot be found or that already exists is only logged as
    a warning; any other Juju API error is propagated.

    :param: model_name: Model name
    :param: endpoint_1 First endpoint name
                       ("app:endpoint" format or directly the saas name)
    :param: endpoint_2: Second endpoint name (^ same format)
    """

    self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))

    # Get controller
    controller = await self.get_controller()
    model = None

    try:
        # Get model. Done inside the outer try block so that the controller
        # connection is released even when the model cannot be retrieved.
        model = await self.get_model(controller, model_name)

        # Add relation. The tolerant error handling only covers this call,
        # so errors raised while getting the model are never swallowed.
        try:
            await model.add_relation(endpoint_1, endpoint_2)
        except juju.errors.JujuAPIError as e:
            if "not found" in e.message:
                self.log.warning("Relation not found: {}".format(e.message))
                return
            if "already exists" in e.message:
                self.log.warning("Relation already exists: {}".format(e.message))
                return
            # another exception, raise it
            raise
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1163
async def offer(self, endpoint: RelationEndpoint) -> Offer:
    """
    Create an offer from a RelationEndpoint

    The offer is named "<application_name>-<endpoint_name>".

    :param: endpoint: Relation endpoint

    :return: Offer object. When Juju reports that the application offer
             already exists, the error is ignored and None is returned.
    :raises: Exception if the offer is not listed after being created
    :raises: JujuError for any Juju error other than "already exists"
    """
    model_name = endpoint.model_name
    offer_name = f"{endpoint.application_name}-{endpoint.endpoint_name}"
    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        await model.create_offer(endpoint.endpoint, offer_name=offer_name)
        offer_list = await self._list_offers(model_name, offer_name=offer_name)
        if offer_list:
            return Offer(offer_list[0].offer_url)
        else:
            raise Exception("offer was not created")
    except juju.errors.JujuError as e:
        if "application offer already exists" not in e.message:
            raise
    finally:
        # Both disconnect helpers are coroutines: without "await" they would
        # never run and the connections would stay open.
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1191
async def consume(
    self,
    model_name: str,
    offer: Offer,
    provider_libjuju: "Libjuju",
) -> str:
    """
    Consume a remote offer in the given model, so that relations to it can be
    created afterwards (as with "juju relate").

    The saas name is "<offer name>-<offer model name without dashes>", with
    "-<vca_id>" appended when the offer carries a vca_id.

    :param: model_name: Model name
    :param: offer: Offer object to consume
    :param: provider_libjuju: Libjuju object of the provider endpoint

    :raises ParseError if there's a problem parsing the offer_url
    :raises JujuError if remote offer includes an endpoint
    :raises JujuAPIError if the operation is not successful

    :returns: Saas name. It is the application name in the model that
              references the remote application.
    """
    alias = "{}-{}".format(offer.name, offer.model_name.replace("-", ""))
    vca_id = offer.vca_id
    if vca_id:
        alias = "{}-{}".format(alias, vca_id)

    controller = await self.get_controller()
    consumer_model = None
    remote_controller = None
    try:
        consumer_model = await controller.get_model(model_name)
        remote_controller = await provider_libjuju.get_controller()
        await consumer_model.consume(
            offer.url, application_alias=alias, controller=remote_controller
        )
        return alias
    finally:
        # Release only what was actually acquired, in the same order as
        # before: model, provider controller, own controller
        if consumer_model:
            await self.disconnect_model(consumer_model)
        if remote_controller:
            await provider_libjuju.disconnect_controller(remote_controller)
        await self.disconnect_controller(controller)
1230
async def destroy_model(self, model_name: str, total_timeout: float = 1800):
    """
    Destroy model

    Nothing is done if the model does not exist. If an error happens but the
    model no longer exists afterwards, the error is ignored.

    :param: model_name: Model name
    :param: total_timeout: Maximum time, in seconds, to wait for the model
                           to be removed from the controller
    """

    controller = await self.get_controller()
    model = None
    try:
        if not await self.model_exists(model_name, controller=controller):
            return

        self.log.debug("Destroying model {}".format(model_name))

        model = await self.get_model(controller, model_name)
        # Destroy machines that are manually provisioned
        # and still are in pending state
        await self._destroy_pending_machines(model, only_manual=True)
        await self.disconnect_model(model)
        # Already disconnected: forget it so that the finally block does not
        # disconnect the same model a second time.
        model = None

        await asyncio.wait_for(
            self._destroy_model(model_name, controller),
            timeout=total_timeout,
        )
    except Exception:
        # The goal is reached if the model is gone, whatever went wrong
        if not await self.model_exists(model_name, controller=controller):
            return
        raise
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1265
async def _destroy_model(
    self,
    model_name: str,
    controller: Controller,
):
    """
    Destroy model from controller

    A graceful removal is attempted first. If the model is still listed by
    the controller after 120 seconds, it is removed forcefully. The forceful
    phase has no time limit of its own; destroy_model() runs this coroutine
    under asyncio.wait_for.

    :param: model_name: Name of the model to be removed
    :param: controller: Controller object
    """

    async def _destroy_model_gracefully(model_name: str, controller: Controller):
        self.log.info(f"Gracefully deleting model {model_name}")
        resolved = False
        while model_name in await controller.list_models():
            if not resolved:
                # Done only once, before the first destroy request.
                # NOTE(review): self.resolve is defined elsewhere; presumably
                # it resolves units in error state - confirm.
                await self.resolve(model_name)
                resolved = True
            await controller.destroy_model(model_name, destroy_storage=True)

            await asyncio.sleep(5)
        self.log.info(f"Model {model_name} deleted gracefully")

    async def _destroy_model_forcefully(model_name: str, controller: Controller):
        self.log.info(f"Forcefully deleting model {model_name}")
        while model_name in await controller.list_models():
            await controller.destroy_model(
                model_name, destroy_storage=True, force=True, max_wait=60
            )
            await asyncio.sleep(5)
        self.log.info(f"Model {model_name} deleted forcefully")

    try:
        try:
            await asyncio.wait_for(
                _destroy_model_gracefully(model_name, controller), timeout=120
            )
        except asyncio.TimeoutError:
            await _destroy_model_forcefully(model_name, controller)
    except juju.errors.JujuError as e:
        # The model may disappear between listing it and asking for its
        # destruction, in either phase: that means the job is done.
        if any("has been removed" in error for error in e.errors):
            return
        raise
1310
async def destroy_application(
    self, model_name: str, application_name: str, total_timeout: float
):
    """
    Destroy an application and wait until it is gone from the model

    The model is polled every 5 seconds. A missing application is only
    logged as a warning.

    :param: model_name: Model name
    :param: application_name: Application name
    :param: total_timeout: Maximum waiting time in seconds
                           (3600 is used when None is given)

    :raises: Exception if the application is still present after the timeout
    """

    controller = await self.get_controller()
    model = None

    try:
        model = await self.get_model(controller, model_name)
        self.log.debug(
            "Destroying application {} in model {}".format(
                application_name, model_name
            )
        )
        target = self._get_application(model, application_name)
        if not target:
            self.log.warning("Application not found: {}".format(application_name))
        else:
            await target.destroy()

        self.log.debug(
            "Waiting for application {} to be destroyed in model {}...".format(
                application_name, model_name
            )
        )
        wait_seconds = 3600 if total_timeout is None else total_timeout
        deadline = time.time() + wait_seconds
        while time.time() < deadline:
            if self._get_application(model, application_name):
                # Still there: check again later
                await asyncio.sleep(5)
                continue
            self.log.debug(
                "The application {} was destroyed in model {} ".format(
                    application_name, model_name
                )
            )
            return
        raise Exception(
            "Timeout waiting for application {} to be destroyed in model {}".format(
                application_name, model_name
            )
        )
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1364
async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
    """
    Destroy pending machines in a given model

    :param: model: Model whose machines are checked
    :param: only_manual: Bool that indicates only manually provisioned
                         machines should be destroyed (if True), or that
                         all pending machines should be destroyed
    """
    status = await model.get_status()
    for machine_id in status.machines:
        machine_status = status.machines[machine_id]
        if machine_status.agent_status.status != "pending":
            continue
        if only_manual and not machine_status.instance_id.startswith("manual:"):
            # Skip just this machine and keep checking the remaining ones;
            # leaving the loop here would leave later manual machines pending.
            continue
        machine = model.machines[machine_id]
        await machine.destroy(force=True)
1381
async def configure_application(
    self, model_name: str, application_name: str, config: dict = None
):
    """Apply configuration to an application

    Nothing is sent to the controller when config is empty or None.

    :param: model_name: Model name
    :param: application_name: Application name
    :param: config: Config to apply to the charm
    """
    self.log.debug("Configuring application {}".format(application_name))

    if not config:
        # Nothing to apply: no connection is opened at all
        return

    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        target = self._get_application(model, application_name=application_name)
        await target.set_config(config)
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1407
def handle_exception(self, loop, context):
    """Swallow exceptions that libjuju left unhandled

    Intentionally does nothing with them.
    NOTE(review): the (loop, context) signature suggests this is meant to be
    installed as an asyncio event loop exception handler - confirm.

    :param: loop: Unused
    :param: context: Unused
    """
    return None
1411
async def health_check(self, interval: float = 300.0):
    """
    Health check to make sure controller and controller_model connections are OK

    Runs forever: on each round it connects to the controller and
    disconnects again. A failed connection is only logged as an error.

    :param: interval: Time in seconds between checks
    """
    while True:
        # Reset on every round: after a failed connection there is nothing
        # to disconnect, and the controller of a previous round has already
        # been disconnected.
        controller = None
        try:
            controller = await self.get_controller()
        except Exception as e:
            self.log.error("Health check to VCA failed: {}".format(e))
        finally:
            if controller is not None:
                await self.disconnect_controller(controller)
        await asyncio.sleep(interval)
1428
async def list_models(self, contains: str = None) -> [str]:
    """List the models of the controller, optionally filtered by name

    :param: contains: String that must be contained in the model name.
                      All models are returned when it is empty or None.

    :return: [models] Returns list of model names
    """

    controller = await self.get_controller()
    try:
        names = await controller.list_models()
        if not contains:
            return names
        return [name for name in names if contains in name]
    finally:
        await self.disconnect_controller(controller)
1445
async def _list_offers(
    self, model_name: str, offer_name: str = None
) -> QueryApplicationOffersResults:
    """
    List offers within a model

    :param: model_name: Model name
    :param: offer_name: Offer name to filter. When given, the result holds
                        at most one offer: the first one with that name.

    :return: Returns application offers results in the model
    """

    controller = await self.get_controller()
    try:
        listing = await controller.list_offers(model_name)
        offers = listing.results
        if not offer_name:
            return offers
        # Only the first offer with the requested name is kept
        first_match = next(
            (candidate for candidate in offers if candidate.offer_name == offer_name),
            None,
        )
        return [] if first_match is None else [first_match]
    finally:
        await self.disconnect_controller(controller)
1471
async def add_k8s(
    self,
    name: str,
    rbac_id: str,
    token: str,
    client_cert_data: str,
    configuration: Configuration,
    storage_class: str,
    credential_name: str = None,
):
    """
    Add a Kubernetes cloud to the controller

    Similar to the `juju add-k8s` command in the CLI

    :param: name: Name for the K8s cloud
    :param: rbac_id: Value stored in the credential under the "rbac-id" attribute
    :param: token: Token used to build the cloud credential
    :param: client_cert_data: Certificate data, used both for the credential
                              and as CA certificate of the cloud
    :param: configuration: Kubernetes configuration object
    :param: storage_class: Storage Class to use in the cloud
                           (for both operator and workload storage)
    :param: credential_name: Name for the credential (passed to add_cloud)

    :return: The value returned by add_cloud()
    """

    # Same validation order as always: storage class, name, configuration
    required = (
        (storage_class, "storage_class must be a non-empty string"),
        (name, "name must be a non-empty string"),
        (configuration, "configuration must be provided"),
    )
    for value, message in required:
        if not value:
            raise Exception(message)

    api_endpoint = configuration.host
    credential = self.get_k8s_cloud_credential(
        configuration,
        client_cert_data,
        token,
    )
    credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id

    storage_config = {
        "operator-storage": storage_class,
        "workload-storage": storage_class,
    }
    k8s_cloud = client.Cloud(
        type_="kubernetes",
        auth_types=[credential.auth_type],
        endpoint=api_endpoint,
        ca_certificates=[client_cert_data],
        config=storage_config,
    )

    return await self.add_cloud(
        name, k8s_cloud, credential, credential_name=credential_name
    )
1521
def get_k8s_cloud_credential(
    self,
    configuration: Configuration,
    client_cert_data: str,
    token: str = None,
) -> client.CloudCredential:
    """
    Build the cloud credential for a Kubernetes cluster

    The auth type is chosen as follows:
      - username set: "userpasswithcert" with certificate data, else "userpass"
      - certificate data and token set (no username): "certificate"
    The "oauth2"/"oauth2withcert" types depend on a client key, which is
    currently never loaded (see TODO below).

    :param: configuration: Kubernetes configuration object
                           (username and password are read from it)
    :param: client_cert_data: Client certificate data
    :param: token: Token. Cannot be combined with username/password.

    :return: CloudCredential with the auth type and its attributes
    :raises: JujuInvalidK8sConfiguration if token and user/pass are both set,
             or if no supported authentication method can be determined
    """
    # TODO: Test with AKS
    key = None  # open(configuration.key_file, "r").read()
    username = configuration.username
    password = configuration.password

    if token and (username or password):
        raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")

    attrs = {}
    if client_cert_data:
        attrs["ClientCertificateData"] = client_cert_data
    if key:
        attrs["ClientKeyData"] = key
    if token:
        attrs["Token"] = token

    if key:
        auth_type = "oauth2withcert" if client_cert_data else "oauth2"
        if not token:
            raise JujuInvalidK8sConfiguration(
                "missing token for auth type {}".format(auth_type)
            )
    elif username:
        if not password:
            self.log.debug(
                "credential for user {} has empty password".format(username)
            )
        attrs["username"] = username
        attrs["password"] = password
        auth_type = "userpasswithcert" if client_cert_data else "userpass"
    elif client_cert_data and token:
        auth_type = "certificate"
    else:
        raise JujuInvalidK8sConfiguration("authentication method not supported")

    return client.CloudCredential(auth_type=auth_type, attrs=attrs)
1568
async def add_cloud(
    self,
    name: str,
    cloud: Cloud,
    credential: CloudCredential = None,
    credential_name: str = None,
) -> Cloud:
    """
    Add cloud to the controller, and optionally a credential for it

    :param: name: Name of the cloud to be added
    :param: cloud: Cloud object
    :param: credential: CloudCredentials object for the cloud
    :param: credential_name: Credential name.
                             If not defined, the name of the cloud will be used.

    :return: The same cloud object that was passed in
    """
    controller = await self.get_controller()
    try:
        await controller.add_cloud(name, cloud)
        if credential:
            effective_credential_name = credential_name or name
            await controller.add_credential(
                effective_credential_name, credential=credential, cloud=name
            )
        # The object returned by controller.add_cloud() should be returned
        # here. The original value is returned instead until this bug is fixed:
        # https://github.com/juju/python-libjuju/issues/443
        return cloud
    finally:
        await self.disconnect_controller(controller)
1598
async def remove_cloud(self, name: str):
    """
    Remove cloud from the controller

    A cloud that does not exist is only logged as a warning.

    :param: name: Name of the cloud to be removed
    """
    controller = await self.get_controller()
    try:
        await controller.remove_cloud(name)
    except juju.errors.JujuError as e:
        not_found_message = f'cloud "{name}" not found'
        # Anything other than exactly one "not found" error is propagated
        if len(e.errors) != 1 or not_found_message != e.errors[0]:
            raise e
        self.log.warning(f"Cloud {name} not found, so it could not be deleted.")
    finally:
        await self.disconnect_controller(controller)
1615
@retry(attempts=20, delay=5, fallback=JujuLeaderUnitNotFound())
async def _get_leader_unit(self, application: Application) -> Unit:
    """
    Get the leader unit of an application

    :param: application: Application whose units are checked

    :return: The first unit that reports itself as leader
    :raises: Exception if no unit is the leader. The retry decorator is
             configured with 20 attempts, a delay of 5 and
             JujuLeaderUnitNotFound as fallback.
    """
    leader = None
    for candidate in application.units:
        if await candidate.is_leader_from_status():
            leader = candidate
            break
    if leader:
        return leader
    raise Exception()
1626
async def get_cloud_credentials(self, cloud: Cloud) -> typing.List:
    """
    Get cloud credentials

    The credential is looked up by the cloud name, the user of the VCA
    connection and the credential name of the cloud.

    :param: cloud: Cloud object. The returned credentials will be from this cloud.

    :return: List of credentials object associated to the specified cloud

    """
    controller = await self.get_controller()
    try:
        cloud_facade = client.CloudFacade.from_connection(controller.connection())
        credential_tag = tag.credential(
            cloud.name, self.vca_connection.data.user, cloud.credential_name
        )
        response = await cloud_facade.Credential([client.Entity(credential_tag)])
        return response.results
    finally:
        await self.disconnect_controller(controller)
1646
async def check_application_exists(self, model_name, application_name) -> bool:
    """Check whether an application exists in a model

    :param: model_name: Model Name
    :param: application_name: Application Name

    :return: Boolean, True if the application is found in the model
    """

    model = None
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        self.log.debug(
            "Checking if application {} exists in model {}".format(
                application_name, model_name
            )
        )
        found = self._get_application(model, application_name)
        return found is not None
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)