Feature 10239: Distributed VCA
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17 import typing
18
19 import time
20
21 from juju.errors import JujuAPIError
22 from juju.model import Model
23 from juju.machine import Machine
24 from juju.application import Application
25 from juju.unit import Unit
26 from juju.client._definitions import (
27 FullStatus,
28 QueryApplicationOffersResults,
29 Cloud,
30 CloudCredential,
31 )
32 from juju.controller import Controller
33 from juju.client import client
34 from juju import tag
35
36 from n2vc.juju_watcher import JujuModelWatcher
37 from n2vc.provisioner import AsyncSSHProvisioner
38 from n2vc.n2vc_conn import N2VCConnector
39 from n2vc.exceptions import (
40 JujuMachineNotFound,
41 JujuApplicationNotFound,
42 JujuLeaderUnitNotFound,
43 JujuActionNotFound,
44 JujuControllerFailedConnecting,
45 JujuApplicationExists,
46 JujuInvalidK8sConfiguration,
47 JujuError,
48 )
49 from n2vc.vca.cloud import Cloud as VcaCloud
50 from n2vc.vca.connection import Connection
51 from kubernetes.client.configuration import Configuration
52 from retrying_async import retry
53
54
55 RBAC_LABEL_KEY_NAME = "rbac-id"
56
57
58 class Libjuju:
59 def __init__(
60 self,
61 vca_connection: Connection,
62 loop: asyncio.AbstractEventLoop = None,
63 log: logging.Logger = None,
64 n2vc: N2VCConnector = None,
65 ):
66 """
67 Constructor
68
69 :param: vca_connection: n2vc.vca.connection object
70 :param: loop: Asyncio loop
71 :param: log: Logger
72 :param: n2vc: N2VC object
73 """
74
75 self.log = log or logging.getLogger("Libjuju")
76 self.n2vc = n2vc
77 self.vca_connection = vca_connection
78
79 self.loop = loop or asyncio.get_event_loop()
80 self.loop.set_exception_handler(self.handle_exception)
81 self.creating_model = asyncio.Lock(loop=self.loop)
82
83 if self.vca_connection.is_default:
84 self.health_check_task = self._create_health_check_task()
85
86 def _create_health_check_task(self):
87 return self.loop.create_task(self.health_check())
88
89 async def get_controller(self, timeout: float = 60.0) -> Controller:
90 """
91 Get controller
92
93 :param: timeout: Time in seconds to wait for controller to connect
94 """
95 controller = None
96 try:
97 controller = Controller(loop=self.loop)
98 await asyncio.wait_for(
99 controller.connect(
100 endpoint=self.vca_connection.data.endpoints,
101 username=self.vca_connection.data.user,
102 password=self.vca_connection.data.secret,
103 cacert=self.vca_connection.data.cacert,
104 ),
105 timeout=timeout,
106 )
107 if self.vca_connection.is_default:
108 endpoints = await controller.api_endpoints
109 if not all(
110 endpoint in self.vca_connection.endpoints for endpoint in endpoints
111 ):
112 await self.vca_connection.update_endpoints(endpoints)
113 return controller
114 except asyncio.CancelledError as e:
115 raise e
116 except Exception as e:
117 self.log.error(
118 "Failed connecting to controller: {}... {}".format(
119 self.vca_connection.data.endpoints, e
120 )
121 )
122 if controller:
123 await self.disconnect_controller(controller)
124 raise JujuControllerFailedConnecting(e)
125
126 async def disconnect(self):
127 """Disconnect"""
128 # Cancel health check task
129 self.health_check_task.cancel()
130 self.log.debug("Libjuju disconnected!")
131
132 async def disconnect_model(self, model: Model):
133 """
134 Disconnect model
135
136 :param: model: Model that will be disconnected
137 """
138 await model.disconnect()
139
140 async def disconnect_controller(self, controller: Controller):
141 """
142 Disconnect controller
143
144 :param: controller: Controller that will be disconnected
145 """
146 if controller:
147 await controller.disconnect()
148
149 @retry(attempts=3, delay=5, timeout=None)
150 async def add_model(self, model_name: str, cloud: VcaCloud):
151 """
152 Create model
153
154 :param: model_name: Model name
155 :param: cloud: Cloud object
156 """
157
158 # Get controller
159 controller = await self.get_controller()
160 model = None
161 try:
162 # Block until other workers have finished model creation
163 while self.creating_model.locked():
164 await asyncio.sleep(0.1)
165
166 # Create the model
167 async with self.creating_model:
168 if await self.model_exists(model_name, controller=controller):
169 return
170 self.log.debug("Creating model {}".format(model_name))
171 model = await controller.add_model(
172 model_name,
173 config=self.vca_connection.data.model_config,
174 cloud_name=cloud.name,
175 credential_name=cloud.credential_name,
176 )
177 except JujuAPIError as e:
178 if "already exists" in e.message:
179 pass
180 else:
181 raise e
182 finally:
183 if model:
184 await self.disconnect_model(model)
185 await self.disconnect_controller(controller)
186
187 async def get_executed_actions(self, model_name: str) -> list:
188 """
189 Get executed/history of actions for a model.
190
191 :param: model_name: Model name, str.
192 :return: List of executed actions for a model.
193 """
194 model = None
195 executed_actions = []
196 controller = await self.get_controller()
197 try:
198 model = await self.get_model(controller, model_name)
199 # Get all unique action names
200 actions = {}
201 for application in model.applications:
202 application_actions = await self.get_actions(application, model_name)
203 actions.update(application_actions)
204 # Get status of all actions
205 for application_action in actions:
206 app_action_status_list = await model.get_action_status(
207 name=application_action
208 )
209 for action_id, action_status in app_action_status_list.items():
210 executed_action = {
211 "id": action_id,
212 "action": application_action,
213 "status": action_status,
214 }
215 # Get action output by id
216 action_status = await model.get_action_output(executed_action["id"])
217 for k, v in action_status.items():
218 executed_action[k] = v
219 executed_actions.append(executed_action)
220 except Exception as e:
221 raise JujuError(
222 "Error in getting executed actions for model: {}. Error: {}".format(
223 model_name, str(e)
224 )
225 )
226 finally:
227 if model:
228 await self.disconnect_model(model)
229 await self.disconnect_controller(controller)
230 return executed_actions
231
232 async def get_application_configs(
233 self, model_name: str, application_name: str
234 ) -> dict:
235 """
236 Get available configs for an application.
237
238 :param: model_name: Model name, str.
239 :param: application_name: Application name, str.
240
241 :return: A dict which has key - action name, value - action description
242 """
243 model = None
244 application_configs = {}
245 controller = await self.get_controller()
246 try:
247 model = await self.get_model(controller, model_name)
248 application = self._get_application(
249 model, application_name=application_name
250 )
251 application_configs = await application.get_config()
252 except Exception as e:
253 raise JujuError(
254 "Error in getting configs for application: {} in model: {}. Error: {}".format(
255 application_name, model_name, str(e)
256 )
257 )
258 finally:
259 if model:
260 await self.disconnect_model(model)
261 await self.disconnect_controller(controller)
262 return application_configs
263
264 @retry(attempts=3, delay=5)
265 async def get_model(self, controller: Controller, model_name: str) -> Model:
266 """
267 Get model from controller
268
269 :param: controller: Controller
270 :param: model_name: Model name
271
272 :return: Model: The created Juju model object
273 """
274 return await controller.get_model(model_name)
275
276 async def model_exists(self, model_name: str, controller: Controller = None) -> bool:
277 """
278 Check if model exists
279
280 :param: controller: Controller
281 :param: model_name: Model name
282
283 :return bool
284 """
285 need_to_disconnect = False
286
287 # Get controller if not passed
288 if not controller:
289 controller = await self.get_controller()
290 need_to_disconnect = True
291
292 # Check if model exists
293 try:
294 return model_name in await controller.list_models()
295 finally:
296 if need_to_disconnect:
297 await self.disconnect_controller(controller)
298
299 async def models_exist(self, model_names: [str]) -> (bool, list):
300 """
301 Check if models exists
302
303 :param: model_names: List of strings with model names
304
305 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
306 """
307 if not model_names:
308 raise Exception(
309 "model_names must be a non-empty array. Given value: {}".format(
310 model_names
311 )
312 )
313 non_existing_models = []
314 models = await self.list_models()
315 existing_models = list(set(models).intersection(model_names))
316 non_existing_models = list(set(model_names) - set(existing_models))
317
318 return (
319 len(non_existing_models) == 0,
320 non_existing_models,
321 )
322
323 async def get_model_status(self, model_name: str) -> FullStatus:
324 """
325 Get model status
326
327 :param: model_name: Model name
328
329 :return: Full status object
330 """
331 controller = await self.get_controller()
332 model = await self.get_model(controller, model_name)
333 try:
334 return await model.get_status()
335 finally:
336 await self.disconnect_model(model)
337 await self.disconnect_controller(controller)
338
339 async def create_machine(
340 self,
341 model_name: str,
342 machine_id: str = None,
343 db_dict: dict = None,
344 progress_timeout: float = None,
345 total_timeout: float = None,
346 series: str = "xenial",
347 wait: bool = True,
348 ) -> (Machine, bool):
349 """
350 Create machine
351
352 :param: model_name: Model name
353 :param: machine_id: Machine id
354 :param: db_dict: Dictionary with data of the DB to write the updates
355 :param: progress_timeout: Maximum time between two updates in the model
356 :param: total_timeout: Timeout for the entity to be active
357 :param: series: Series of the machine (xenial, bionic, focal, ...)
358 :param: wait: Wait until machine is ready
359
360 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
361 if the machine is new or it already existed
362 """
363 new = False
364 machine = None
365
366 self.log.debug(
367 "Creating machine (id={}) in model: {}".format(machine_id, model_name)
368 )
369
370 # Get controller
371 controller = await self.get_controller()
372
373 # Get model
374 model = await self.get_model(controller, model_name)
375 try:
376 if machine_id is not None:
377 self.log.debug(
378 "Searching machine (id={}) in model {}".format(
379 machine_id, model_name
380 )
381 )
382
383 # Get machines from model and get the machine with machine_id if exists
384 machines = await model.get_machines()
385 if machine_id in machines:
386 self.log.debug(
387 "Machine (id={}) found in model {}".format(
388 machine_id, model_name
389 )
390 )
391 machine = machines[machine_id]
392 else:
393 raise JujuMachineNotFound("Machine {} not found".format(machine_id))
394
395 if machine is None:
396 self.log.debug("Creating a new machine in model {}".format(model_name))
397
398 # Create machine
399 machine = await model.add_machine(
400 spec=None, constraints=None, disks=None, series=series
401 )
402 new = True
403
404 # Wait until the machine is ready
405 self.log.debug(
406 "Wait until machine {} is ready in model {}".format(
407 machine.entity_id, model_name
408 )
409 )
410 if wait:
411 await JujuModelWatcher.wait_for(
412 model=model,
413 entity=machine,
414 progress_timeout=progress_timeout,
415 total_timeout=total_timeout,
416 db_dict=db_dict,
417 n2vc=self.n2vc,
418 vca_id=self.vca_connection._vca_id,
419 )
420 finally:
421 await self.disconnect_model(model)
422 await self.disconnect_controller(controller)
423
424 self.log.debug(
425 "Machine {} ready at {} in model {}".format(
426 machine.entity_id, machine.dns_name, model_name
427 )
428 )
429 return machine, new
430
431 async def provision_machine(
432 self,
433 model_name: str,
434 hostname: str,
435 username: str,
436 private_key_path: str,
437 db_dict: dict = None,
438 progress_timeout: float = None,
439 total_timeout: float = None,
440 ) -> str:
441 """
442 Manually provisioning of a machine
443
444 :param: model_name: Model name
445 :param: hostname: IP to access the machine
446 :param: username: Username to login to the machine
447 :param: private_key_path: Local path for the private key
448 :param: db_dict: Dictionary with data of the DB to write the updates
449 :param: progress_timeout: Maximum time between two updates in the model
450 :param: total_timeout: Timeout for the entity to be active
451
452 :return: (Entity): Machine id
453 """
454 self.log.debug(
455 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
456 model_name, hostname, username
457 )
458 )
459
460 # Get controller
461 controller = await self.get_controller()
462
463 # Get model
464 model = await self.get_model(controller, model_name)
465
466 try:
467 # Get provisioner
468 provisioner = AsyncSSHProvisioner(
469 host=hostname,
470 user=username,
471 private_key_path=private_key_path,
472 log=self.log,
473 )
474
475 # Provision machine
476 params = await provisioner.provision_machine()
477
478 params.jobs = ["JobHostUnits"]
479
480 self.log.debug("Adding machine to model")
481 connection = model.connection()
482 client_facade = client.ClientFacade.from_connection(connection)
483
484 results = await client_facade.AddMachines(params=[params])
485 error = results.machines[0].error
486
487 if error:
488 msg = "Error adding machine: {}".format(error.message)
489 self.log.error(msg=msg)
490 raise ValueError(msg)
491
492 machine_id = results.machines[0].machine
493
494 self.log.debug("Installing Juju agent into machine {}".format(machine_id))
495 asyncio.ensure_future(
496 provisioner.install_agent(
497 connection=connection,
498 nonce=params.nonce,
499 machine_id=machine_id,
500 proxy=self.vca_connection.data.api_proxy,
501 series=params.series,
502 )
503 )
504
505 machine = None
506 for _ in range(10):
507 machine_list = await model.get_machines()
508 if machine_id in machine_list:
509 self.log.debug("Machine {} found in model!".format(machine_id))
510 machine = model.machines.get(machine_id)
511 break
512 await asyncio.sleep(2)
513
514 if machine is None:
515 msg = "Machine {} not found in model".format(machine_id)
516 self.log.error(msg=msg)
517 raise JujuMachineNotFound(msg)
518
519 self.log.debug(
520 "Wait until machine {} is ready in model {}".format(
521 machine.entity_id, model_name
522 )
523 )
524 await JujuModelWatcher.wait_for(
525 model=model,
526 entity=machine,
527 progress_timeout=progress_timeout,
528 total_timeout=total_timeout,
529 db_dict=db_dict,
530 n2vc=self.n2vc,
531 vca_id=self.vca_connection._vca_id,
532 )
533 except Exception as e:
534 raise e
535 finally:
536 await self.disconnect_model(model)
537 await self.disconnect_controller(controller)
538
539 self.log.debug(
540 "Machine provisioned {} in model {}".format(machine_id, model_name)
541 )
542
543 return machine_id
544
545 async def deploy(
546 self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
547 ):
548 """
549 Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
550
551 :param: uri: Path or Charm Store uri in which the charm or bundle can be found
552 :param: model_name: Model name
553 :param: wait: Indicates whether to wait or not until all applications are active
554 :param: timeout: Time in seconds to wait until all applications are active
555 """
556 controller = await self.get_controller()
557 model = await self.get_model(controller, model_name)
558 try:
559 await model.deploy(uri)
560 if wait:
561 await JujuModelWatcher.wait_for_model(model, timeout=timeout)
562 self.log.debug("All units active in model {}".format(model_name))
563 finally:
564 await self.disconnect_model(model)
565 await self.disconnect_controller(controller)
566
567 async def deploy_charm(
568 self,
569 application_name: str,
570 path: str,
571 model_name: str,
572 machine_id: str,
573 db_dict: dict = None,
574 progress_timeout: float = None,
575 total_timeout: float = None,
576 config: dict = None,
577 series: str = None,
578 num_units: int = 1,
579 ):
580 """Deploy charm
581
582 :param: application_name: Application name
583 :param: path: Local path to the charm
584 :param: model_name: Model name
585 :param: machine_id ID of the machine
586 :param: db_dict: Dictionary with data of the DB to write the updates
587 :param: progress_timeout: Maximum time between two updates in the model
588 :param: total_timeout: Timeout for the entity to be active
589 :param: config: Config for the charm
590 :param: series: Series of the charm
591 :param: num_units: Number of units
592
593 :return: (juju.application.Application): Juju application
594 """
595 self.log.debug(
596 "Deploying charm {} to machine {} in model ~{}".format(
597 application_name, machine_id, model_name
598 )
599 )
600 self.log.debug("charm: {}".format(path))
601
602 # Get controller
603 controller = await self.get_controller()
604
605 # Get model
606 model = await self.get_model(controller, model_name)
607
608 try:
609 application = None
610 if application_name not in model.applications:
611
612 if machine_id is not None:
613 if machine_id not in model.machines:
614 msg = "Machine {} not found in model".format(machine_id)
615 self.log.error(msg=msg)
616 raise JujuMachineNotFound(msg)
617 machine = model.machines[machine_id]
618 series = machine.series
619
620 application = await model.deploy(
621 entity_url=path,
622 application_name=application_name,
623 channel="stable",
624 num_units=1,
625 series=series,
626 to=machine_id,
627 config=config,
628 )
629
630 self.log.debug(
631 "Wait until application {} is ready in model {}".format(
632 application_name, model_name
633 )
634 )
635 if num_units > 1:
636 for _ in range(num_units - 1):
637 m, _ = await self.create_machine(model_name, wait=False)
638 await application.add_unit(to=m.entity_id)
639
640 await JujuModelWatcher.wait_for(
641 model=model,
642 entity=application,
643 progress_timeout=progress_timeout,
644 total_timeout=total_timeout,
645 db_dict=db_dict,
646 n2vc=self.n2vc,
647 vca_id=self.vca_connection._vca_id,
648 )
649 self.log.debug(
650 "Application {} is ready in model {}".format(
651 application_name, model_name
652 )
653 )
654 else:
655 raise JujuApplicationExists(
656 "Application {} exists".format(application_name)
657 )
658 finally:
659 await self.disconnect_model(model)
660 await self.disconnect_controller(controller)
661
662 return application
663
664 def _get_application(self, model: Model, application_name: str) -> Application:
665 """Get application
666
667 :param: model: Model object
668 :param: application_name: Application name
669
670 :return: juju.application.Application (or None if it doesn't exist)
671 """
672 if model.applications and application_name in model.applications:
673 return model.applications[application_name]
674
675 async def execute_action(
676 self,
677 application_name: str,
678 model_name: str,
679 action_name: str,
680 db_dict: dict = None,
681 progress_timeout: float = None,
682 total_timeout: float = None,
683 **kwargs
684 ):
685 """Execute action
686
687 :param: application_name: Application name
688 :param: model_name: Model name
689 :param: action_name: Name of the action
690 :param: db_dict: Dictionary with data of the DB to write the updates
691 :param: progress_timeout: Maximum time between two updates in the model
692 :param: total_timeout: Timeout for the entity to be active
693
694 :return: (str, str): (output and status)
695 """
696 self.log.debug(
697 "Executing action {} using params {}".format(action_name, kwargs)
698 )
699 # Get controller
700 controller = await self.get_controller()
701
702 # Get model
703 model = await self.get_model(controller, model_name)
704
705 try:
706 # Get application
707 application = self._get_application(
708 model,
709 application_name=application_name,
710 )
711 if application is None:
712 raise JujuApplicationNotFound("Cannot execute action")
713
714 # Get leader unit
715 # Racing condition:
716 # Ocassionally, self._get_leader_unit() will return None
717 # because the leader elected hook has not been triggered yet.
718 # Therefore, we are doing some retries. If it happens again,
719 # re-open bug 1236
720 unit = await self._get_leader_unit(application)
721
722 actions = await application.get_actions()
723
724 if action_name not in actions:
725 raise JujuActionNotFound(
726 "Action {} not in available actions".format(action_name)
727 )
728
729 action = await unit.run_action(action_name, **kwargs)
730
731 self.log.debug(
732 "Wait until action {} is completed in application {} (model={})".format(
733 action_name, application_name, model_name
734 )
735 )
736 await JujuModelWatcher.wait_for(
737 model=model,
738 entity=action,
739 progress_timeout=progress_timeout,
740 total_timeout=total_timeout,
741 db_dict=db_dict,
742 n2vc=self.n2vc,
743 vca_id=self.vca_connection._vca_id,
744 )
745
746 output = await model.get_action_output(action_uuid=action.entity_id)
747 status = await model.get_action_status(uuid_or_prefix=action.entity_id)
748 status = (
749 status[action.entity_id] if action.entity_id in status else "failed"
750 )
751
752 self.log.debug(
753 "Action {} completed with status {} in application {} (model={})".format(
754 action_name, action.status, application_name, model_name
755 )
756 )
757 finally:
758 await self.disconnect_model(model)
759 await self.disconnect_controller(controller)
760
761 return output, status
762
763 async def get_actions(self, application_name: str, model_name: str) -> dict:
764 """Get list of actions
765
766 :param: application_name: Application name
767 :param: model_name: Model name
768
769 :return: Dict with this format
770 {
771 "action_name": "Description of the action",
772 ...
773 }
774 """
775 self.log.debug(
776 "Getting list of actions for application {}".format(application_name)
777 )
778
779 # Get controller
780 controller = await self.get_controller()
781
782 # Get model
783 model = await self.get_model(controller, model_name)
784
785 try:
786 # Get application
787 application = self._get_application(
788 model,
789 application_name=application_name,
790 )
791
792 # Return list of actions
793 return await application.get_actions()
794
795 finally:
796 # Disconnect from model and controller
797 await self.disconnect_model(model)
798 await self.disconnect_controller(controller)
799
800 async def get_metrics(self, model_name: str, application_name: str) -> dict:
801 """Get the metrics collected by the VCA.
802
803 :param model_name The name or unique id of the network service
804 :param application_name The name of the application
805 """
806 if not model_name or not application_name:
807 raise Exception("model_name and application_name must be non-empty strings")
808 metrics = {}
809 controller = await self.get_controller()
810 model = await self.get_model(controller, model_name)
811 try:
812 application = self._get_application(model, application_name)
813 if application is not None:
814 metrics = await application.get_metrics()
815 finally:
816 self.disconnect_model(model)
817 self.disconnect_controller(controller)
818 return metrics
819
820 async def add_relation(
821 self,
822 model_name: str,
823 endpoint_1: str,
824 endpoint_2: str,
825 ):
826 """Add relation
827
828 :param: model_name: Model name
829 :param: endpoint_1 First endpoint name
830 ("app:endpoint" format or directly the saas name)
831 :param: endpoint_2: Second endpoint name (^ same format)
832 """
833
834 self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))
835
836 # Get controller
837 controller = await self.get_controller()
838
839 # Get model
840 model = await self.get_model(controller, model_name)
841
842 # Add relation
843 try:
844 await model.add_relation(endpoint_1, endpoint_2)
845 except JujuAPIError as e:
846 if "not found" in e.message:
847 self.log.warning("Relation not found: {}".format(e.message))
848 return
849 if "already exists" in e.message:
850 self.log.warning("Relation already exists: {}".format(e.message))
851 return
852 # another exception, raise it
853 raise e
854 finally:
855 await self.disconnect_model(model)
856 await self.disconnect_controller(controller)
857
858 async def consume(
859 self,
860 offer_url: str,
861 model_name: str,
862 ):
863 """
864 Adds a remote offer to the model. Relations can be created later using "juju relate".
865
866 :param: offer_url: Offer Url
867 :param: model_name: Model name
868
869 :raises ParseError if there's a problem parsing the offer_url
870 :raises JujuError if remote offer includes and endpoint
871 :raises JujuAPIError if the operation is not successful
872 """
873 controller = await self.get_controller()
874 model = await controller.get_model(model_name)
875
876 try:
877 await model.consume(offer_url)
878 finally:
879 await self.disconnect_model(model)
880 await self.disconnect_controller(controller)
881
882 async def destroy_model(self, model_name: str, total_timeout: float):
883 """
884 Destroy model
885
886 :param: model_name: Model name
887 :param: total_timeout: Timeout
888 """
889
890 controller = await self.get_controller()
891 model = None
892 try:
893 if not await self.model_exists(model_name, controller=controller):
894 return
895
896 model = await self.get_model(controller, model_name)
897 self.log.debug("Destroying model {}".format(model_name))
898 uuid = model.info.uuid
899
900 # Destroy machines that are manually provisioned
901 # and still are in pending state
902 await self._destroy_pending_machines(model, only_manual=True)
903
904 # Disconnect model
905 await self.disconnect_model(model)
906
907 await controller.destroy_model(uuid, force=True, max_wait=0)
908
909 # Wait until model is destroyed
910 self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
911
912 if total_timeout is None:
913 total_timeout = 3600
914 end = time.time() + total_timeout
915 while time.time() < end:
916 models = await controller.list_models()
917 if model_name not in models:
918 self.log.debug(
919 "The model {} ({}) was destroyed".format(model_name, uuid)
920 )
921 return
922 await asyncio.sleep(5)
923 raise Exception(
924 "Timeout waiting for model {} to be destroyed".format(model_name)
925 )
926 except Exception as e:
927 if model:
928 await self.disconnect_model(model)
929 raise e
930 finally:
931 await self.disconnect_controller(controller)
932
933 async def destroy_application(
934 self, model_name: str, application_name: str, total_timeout: float
935 ):
936 """
937 Destroy application
938
939 :param: model_name: Model name
940 :param: application_name: Application name
941 :param: total_timeout: Timeout
942 """
943
944 controller = await self.get_controller()
945 model = None
946
947 try:
948 model = await self.get_model(controller, model_name)
949 self.log.debug(
950 "Destroying application {} in model {}".format(
951 application_name, model_name
952 )
953 )
954 application = self._get_application(model, application_name)
955 if application:
956 await application.destroy()
957 else:
958 self.log.warning("Application not found: {}".format(application_name))
959
960 self.log.debug(
961 "Waiting for application {} to be destroyed in model {}...".format(
962 application_name, model_name
963 )
964 )
965 if total_timeout is None:
966 total_timeout = 3600
967 end = time.time() + total_timeout
968 while time.time() < end:
969 if not self._get_application(model, application_name):
970 self.log.debug(
971 "The application {} was destroyed in model {} ".format(
972 application_name, model_name
973 )
974 )
975 return
976 await asyncio.sleep(5)
977 raise Exception(
978 "Timeout waiting for application {} to be destroyed in model {}".format(
979 application_name, model_name
980 )
981 )
982 finally:
983 if model is not None:
984 await self.disconnect_model(model)
985 await self.disconnect_controller(controller)
986
987 async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
988 """
989 Destroy pending machines in a given model
990
991 :param: only_manual: Bool that indicates only manually provisioned
992 machines should be destroyed (if True), or that
993 all pending machines should be destroyed
994 """
995 status = await model.get_status()
996 for machine_id in status.machines:
997 machine_status = status.machines[machine_id]
998 if machine_status.agent_status.status == "pending":
999 if only_manual and not machine_status.instance_id.startswith("manual:"):
1000 break
1001 machine = model.machines[machine_id]
1002 await machine.destroy(force=True)
1003
1004 async def configure_application(
1005 self, model_name: str, application_name: str, config: dict = None
1006 ):
1007 """Configure application
1008
1009 :param: model_name: Model name
1010 :param: application_name: Application name
1011 :param: config: Config to apply to the charm
1012 """
1013 self.log.debug("Configuring application {}".format(application_name))
1014
1015 if config:
1016 controller = await self.get_controller()
1017 model = None
1018 try:
1019 model = await self.get_model(controller, model_name)
1020 application = self._get_application(
1021 model,
1022 application_name=application_name,
1023 )
1024 await application.set_config(config)
1025 finally:
1026 if model:
1027 await self.disconnect_model(model)
1028 await self.disconnect_controller(controller)
1029
1030 def handle_exception(self, loop, context):
1031 # All unhandled exceptions by libjuju are handled here.
1032 pass
1033
1034 async def health_check(self, interval: float = 300.0):
1035 """
1036 Health check to make sure controller and controller_model connections are OK
1037
1038 :param: interval: Time in seconds between checks
1039 """
1040 controller = None
1041 while True:
1042 try:
1043 controller = await self.get_controller()
1044 # self.log.debug("VCA is alive")
1045 except Exception as e:
1046 self.log.error("Health check to VCA failed: {}".format(e))
1047 finally:
1048 await self.disconnect_controller(controller)
1049 await asyncio.sleep(interval)
1050
1051 async def list_models(self, contains: str = None) -> [str]:
1052 """List models with certain names
1053
1054 :param: contains: String that is contained in model name
1055
1056 :retur: [models] Returns list of model names
1057 """
1058
1059 controller = await self.get_controller()
1060 try:
1061 models = await controller.list_models()
1062 if contains:
1063 models = [model for model in models if contains in model]
1064 return models
1065 finally:
1066 await self.disconnect_controller(controller)
1067
1068 async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
1069 """List models with certain names
1070
1071 :param: model_name: Model name
1072
1073 :return: Returns list of offers
1074 """
1075
1076 controller = await self.get_controller()
1077 try:
1078 return await controller.list_offers(model_name)
1079 finally:
1080 await self.disconnect_controller(controller)
1081
1082 async def add_k8s(
1083 self,
1084 name: str,
1085 rbac_id: str,
1086 token: str,
1087 client_cert_data: str,
1088 configuration: Configuration,
1089 storage_class: str,
1090 credential_name: str = None,
1091 ):
1092 """
1093 Add a Kubernetes cloud to the controller
1094
1095 Similar to the `juju add-k8s` command in the CLI
1096
1097 :param: name: Name for the K8s cloud
1098 :param: configuration: Kubernetes configuration object
1099 :param: storage_class: Storage Class to use in the cloud
1100 :param: credential_name: Storage Class to use in the cloud
1101 """
1102
1103 if not storage_class:
1104 raise Exception("storage_class must be a non-empty string")
1105 if not name:
1106 raise Exception("name must be a non-empty string")
1107 if not configuration:
1108 raise Exception("configuration must be provided")
1109
1110 endpoint = configuration.host
1111 credential = self.get_k8s_cloud_credential(
1112 configuration,
1113 client_cert_data,
1114 token,
1115 )
1116 credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id
1117 cloud = client.Cloud(
1118 type_="kubernetes",
1119 auth_types=[credential.auth_type],
1120 endpoint=endpoint,
1121 ca_certificates=[client_cert_data],
1122 config={
1123 "operator-storage": storage_class,
1124 "workload-storage": storage_class,
1125 },
1126 )
1127
1128 return await self.add_cloud(
1129 name, cloud, credential, credential_name=credential_name
1130 )
1131
1132 def get_k8s_cloud_credential(
1133 self,
1134 configuration: Configuration,
1135 client_cert_data: str,
1136 token: str = None,
1137 ) -> client.CloudCredential:
1138 attrs = {}
1139 # TODO: Test with AKS
1140 key = None # open(configuration.key_file, "r").read()
1141 username = configuration.username
1142 password = configuration.password
1143
1144 if client_cert_data:
1145 attrs["ClientCertificateData"] = client_cert_data
1146 if key:
1147 attrs["ClientKeyData"] = key
1148 if token:
1149 if username or password:
1150 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1151 attrs["Token"] = token
1152
1153 auth_type = None
1154 if key:
1155 auth_type = "oauth2"
1156 if client_cert_data:
1157 auth_type = "oauth2withcert"
1158 if not token:
1159 raise JujuInvalidK8sConfiguration(
1160 "missing token for auth type {}".format(auth_type)
1161 )
1162 elif username:
1163 if not password:
1164 self.log.debug(
1165 "credential for user {} has empty password".format(username)
1166 )
1167 attrs["username"] = username
1168 attrs["password"] = password
1169 if client_cert_data:
1170 auth_type = "userpasswithcert"
1171 else:
1172 auth_type = "userpass"
1173 elif client_cert_data and token:
1174 auth_type = "certificate"
1175 else:
1176 raise JujuInvalidK8sConfiguration("authentication method not supported")
1177 return client.CloudCredential(auth_type=auth_type, attrs=attrs)
1178
1179 async def add_cloud(
1180 self,
1181 name: str,
1182 cloud: Cloud,
1183 credential: CloudCredential = None,
1184 credential_name: str = None,
1185 ) -> Cloud:
1186 """
1187 Add cloud to the controller
1188
1189 :param: name: Name of the cloud to be added
1190 :param: cloud: Cloud object
1191 :param: credential: CloudCredentials object for the cloud
1192 :param: credential_name: Credential name.
1193 If not defined, cloud of the name will be used.
1194 """
1195 controller = await self.get_controller()
1196 try:
1197 _ = await controller.add_cloud(name, cloud)
1198 if credential:
1199 await controller.add_credential(
1200 credential_name or name, credential=credential, cloud=name
1201 )
1202 # Need to return the object returned by the controller.add_cloud() function
1203 # I'm returning the original value now until this bug is fixed:
1204 # https://github.com/juju/python-libjuju/issues/443
1205 return cloud
1206 finally:
1207 await self.disconnect_controller(controller)
1208
1209 async def remove_cloud(self, name: str):
1210 """
1211 Remove cloud
1212
1213 :param: name: Name of the cloud to be removed
1214 """
1215 controller = await self.get_controller()
1216 try:
1217 await controller.remove_cloud(name)
1218 finally:
1219 await self.disconnect_controller(controller)
1220
1221 @retry(attempts=20, delay=5, fallback=JujuLeaderUnitNotFound())
1222 async def _get_leader_unit(self, application: Application) -> Unit:
1223 unit = None
1224 for u in application.units:
1225 if await u.is_leader_from_status():
1226 unit = u
1227 break
1228 if not unit:
1229 raise Exception()
1230 return unit
1231
1232 async def get_cloud_credentials(self, cloud: Cloud) -> typing.List:
1233 """
1234 Get cloud credentials
1235
1236 :param: cloud: Cloud object. The returned credentials will be from this cloud.
1237
1238 :return: List of credentials object associated to the specified cloud
1239
1240 """
1241 controller = await self.get_controller()
1242 try:
1243 facade = client.CloudFacade.from_connection(controller.connection())
1244 cloud_cred_tag = tag.credential(
1245 cloud.name, self.vca_connection.data.user, cloud.credential_name
1246 )
1247 params = [client.Entity(cloud_cred_tag)]
1248 return (await facade.Credential(params)).results
1249 finally:
1250 await self.disconnect_controller(controller)