Remove unsecure and unused function
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17 import typing
18
19 import time
20
21 import juju.errors
22 from juju.model import Model
23 from juju.machine import Machine
24 from juju.application import Application
25 from juju.unit import Unit
26 from juju.client._definitions import (
27 FullStatus,
28 QueryApplicationOffersResults,
29 Cloud,
30 CloudCredential,
31 )
32 from juju.controller import Controller
33 from juju.client import client
34 from juju import tag
35
36 from n2vc.juju_watcher import JujuModelWatcher
37 from n2vc.provisioner import AsyncSSHProvisioner
38 from n2vc.n2vc_conn import N2VCConnector
39 from n2vc.exceptions import (
40 JujuMachineNotFound,
41 JujuApplicationNotFound,
42 JujuLeaderUnitNotFound,
43 JujuActionNotFound,
44 JujuControllerFailedConnecting,
45 JujuApplicationExists,
46 JujuInvalidK8sConfiguration,
47 JujuError,
48 )
49 from n2vc.vca.cloud import Cloud as VcaCloud
50 from n2vc.vca.connection import Connection
51 from kubernetes.client.configuration import Configuration
52 from retrying_async import retry
53
54
55 RBAC_LABEL_KEY_NAME = "rbac-id"
56
57
58 class Libjuju:
59 def __init__(
60 self,
61 vca_connection: Connection,
62 loop: asyncio.AbstractEventLoop = None,
63 log: logging.Logger = None,
64 n2vc: N2VCConnector = None,
65 ):
66 """
67 Constructor
68
69 :param: vca_connection: n2vc.vca.connection object
70 :param: loop: Asyncio loop
71 :param: log: Logger
72 :param: n2vc: N2VC object
73 """
74
75 self.log = log or logging.getLogger("Libjuju")
76 self.n2vc = n2vc
77 self.vca_connection = vca_connection
78
79 self.loop = loop or asyncio.get_event_loop()
80 self.loop.set_exception_handler(self.handle_exception)
81 self.creating_model = asyncio.Lock(loop=self.loop)
82
83 if self.vca_connection.is_default:
84 self.health_check_task = self._create_health_check_task()
85
86 def _create_health_check_task(self):
87 return self.loop.create_task(self.health_check())
88
async def get_controller(self, timeout: float = 60.0) -> Controller:
    """
    Connect to the controller described by the VCA connection data

    For the default VCA connection, the stored endpoints are updated when
    the controller reports an endpoint that is not known yet.

    :param: timeout: Time in seconds to wait for controller to connect

    :return: Controller: The connected controller
    :raises JujuControllerFailedConnecting: on any connection failure
    """
    controller = None
    try:
        controller = Controller(loop=self.loop)
        connecting = controller.connect(
            endpoint=self.vca_connection.data.endpoints,
            username=self.vca_connection.data.user,
            password=self.vca_connection.data.secret,
            cacert=self.vca_connection.data.cacert,
        )
        await asyncio.wait_for(connecting, timeout=timeout)
        if self.vca_connection.is_default:
            endpoints = await controller.api_endpoints
            has_unknown_endpoint = any(
                endpoint not in self.vca_connection.endpoints
                for endpoint in endpoints
            )
            if has_unknown_endpoint:
                await self.vca_connection.update_endpoints(endpoints)
        return controller
    except asyncio.CancelledError:
        # Cancellation must propagate untouched
        raise
    except Exception as e:
        failure_message = "Failed connecting to controller: {}... {}".format(
            self.vca_connection.data.endpoints, e
        )
        self.log.error(failure_message)
        if controller:
            await self.disconnect_controller(controller)
        raise JujuControllerFailedConnecting(e)
125
126 async def disconnect(self):
127 """Disconnect"""
128 # Cancel health check task
129 self.health_check_task.cancel()
130 self.log.debug("Libjuju disconnected!")
131
132 async def disconnect_model(self, model: Model):
133 """
134 Disconnect model
135
136 :param: model: Model that will be disconnected
137 """
138 await model.disconnect()
139
140 async def disconnect_controller(self, controller: Controller):
141 """
142 Disconnect controller
143
144 :param: controller: Controller that will be disconnected
145 """
146 if controller:
147 await controller.disconnect()
148
@retry(attempts=3, delay=5, timeout=None)
async def add_model(self, model_name: str, cloud: VcaCloud):
    """
    Create a model, unless a model with that name already exists

    :param: model_name: Model name
    :param: cloud: Cloud object
    """

    controller = await self.get_controller()
    created_model = None
    try:
        # Poll until no other worker is in the middle of a model creation
        while self.creating_model.locked():
            await asyncio.sleep(0.1)

        async with self.creating_model:
            already_exists = await self.model_exists(
                model_name, controller=controller
            )
            if already_exists:
                return
            self.log.debug("Creating model {}".format(model_name))
            created_model = await controller.add_model(
                model_name,
                config=self.vca_connection.data.model_config,
                cloud_name=cloud.name,
                credential_name=cloud.credential_name,
            )
    except juju.errors.JujuAPIError as e:
        # A concurrent creation of the same model is not an error
        if "already exists" not in e.message:
            raise e
    finally:
        if created_model:
            await self.disconnect_model(created_model)
        await self.disconnect_controller(controller)
186
async def get_executed_actions(self, model_name: str) -> list:
    """
    Get the history of executed actions of a model

    :param: model_name: Model name, str.
    :return: List of dicts, one per executed action, holding the keys "id",
             "action" and "status" plus every item of the action output.
    :raises JujuError: if anything fails while collecting the actions
    """
    model = None
    history = []
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        # Collect the distinct action names over all the applications
        known_actions = {}
        for application in model.applications:
            known_actions.update(await self.get_actions(application, model_name))
        # Look up the executions recorded for each action name
        for action_name in known_actions:
            status_by_id = await model.get_action_status(name=action_name)
            for action_id, action_status in status_by_id.items():
                entry = {
                    "id": action_id,
                    "action": action_name,
                    "status": action_status,
                }
                # Merge the action output into the entry
                action_output = await model.get_action_output(entry["id"])
                entry.update(action_output.items())
                history.append(entry)
    except Exception as e:
        raise JujuError(
            "Error in getting executed actions for model: {}. Error: {}".format(
                model_name, str(e)
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return history
231
232 async def get_application_configs(
233 self, model_name: str, application_name: str
234 ) -> dict:
235 """
236 Get available configs for an application.
237
238 :param: model_name: Model name, str.
239 :param: application_name: Application name, str.
240
241 :return: A dict which has key - action name, value - action description
242 """
243 model = None
244 application_configs = {}
245 controller = await self.get_controller()
246 try:
247 model = await self.get_model(controller, model_name)
248 application = self._get_application(
249 model, application_name=application_name
250 )
251 application_configs = await application.get_config()
252 except Exception as e:
253 raise JujuError(
254 "Error in getting configs for application: {} in model: {}. Error: {}".format(
255 application_name, model_name, str(e)
256 )
257 )
258 finally:
259 if model:
260 await self.disconnect_model(model)
261 await self.disconnect_controller(controller)
262 return application_configs
263
@retry(attempts=3, delay=5)
async def get_model(self, controller: Controller, model_name: str) -> Model:
    """
    Fetch a model from a controller

    :param: controller: Controller
    :param: model_name: Model name

    :return: Model: The Juju model object returned by the controller
    """
    model = await controller.get_model(model_name)
    return model
275
276 async def model_exists(
277 self, model_name: str, controller: Controller = None
278 ) -> bool:
279 """
280 Check if model exists
281
282 :param: controller: Controller
283 :param: model_name: Model name
284
285 :return bool
286 """
287 need_to_disconnect = False
288
289 # Get controller if not passed
290 if not controller:
291 controller = await self.get_controller()
292 need_to_disconnect = True
293
294 # Check if model exists
295 try:
296 return model_name in await controller.list_models()
297 finally:
298 if need_to_disconnect:
299 await self.disconnect_controller(controller)
300
301 async def models_exist(self, model_names: [str]) -> (bool, list):
302 """
303 Check if models exists
304
305 :param: model_names: List of strings with model names
306
307 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
308 """
309 if not model_names:
310 raise Exception(
311 "model_names must be a non-empty array. Given value: {}".format(
312 model_names
313 )
314 )
315 non_existing_models = []
316 models = await self.list_models()
317 existing_models = list(set(models).intersection(model_names))
318 non_existing_models = list(set(model_names) - set(existing_models))
319
320 return (
321 len(non_existing_models) == 0,
322 non_existing_models,
323 )
324
325 async def get_model_status(self, model_name: str) -> FullStatus:
326 """
327 Get model status
328
329 :param: model_name: Model name
330
331 :return: Full status object
332 """
333 controller = await self.get_controller()
334 model = await self.get_model(controller, model_name)
335 try:
336 return await model.get_status()
337 finally:
338 await self.disconnect_model(model)
339 await self.disconnect_controller(controller)
340
341 async def create_machine(
342 self,
343 model_name: str,
344 machine_id: str = None,
345 db_dict: dict = None,
346 progress_timeout: float = None,
347 total_timeout: float = None,
348 series: str = "bionic",
349 wait: bool = True,
350 ) -> (Machine, bool):
351 """
352 Create machine
353
354 :param: model_name: Model name
355 :param: machine_id: Machine id
356 :param: db_dict: Dictionary with data of the DB to write the updates
357 :param: progress_timeout: Maximum time between two updates in the model
358 :param: total_timeout: Timeout for the entity to be active
359 :param: series: Series of the machine (xenial, bionic, focal, ...)
360 :param: wait: Wait until machine is ready
361
362 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
363 if the machine is new or it already existed
364 """
365 new = False
366 machine = None
367
368 self.log.debug(
369 "Creating machine (id={}) in model: {}".format(machine_id, model_name)
370 )
371
372 # Get controller
373 controller = await self.get_controller()
374
375 # Get model
376 model = await self.get_model(controller, model_name)
377 try:
378 if machine_id is not None:
379 self.log.debug(
380 "Searching machine (id={}) in model {}".format(
381 machine_id, model_name
382 )
383 )
384
385 # Get machines from model and get the machine with machine_id if exists
386 machines = await model.get_machines()
387 if machine_id in machines:
388 self.log.debug(
389 "Machine (id={}) found in model {}".format(
390 machine_id, model_name
391 )
392 )
393 machine = machines[machine_id]
394 else:
395 raise JujuMachineNotFound("Machine {} not found".format(machine_id))
396
397 if machine is None:
398 self.log.debug("Creating a new machine in model {}".format(model_name))
399
400 # Create machine
401 machine = await model.add_machine(
402 spec=None, constraints=None, disks=None, series=series
403 )
404 new = True
405
406 # Wait until the machine is ready
407 self.log.debug(
408 "Wait until machine {} is ready in model {}".format(
409 machine.entity_id, model_name
410 )
411 )
412 if wait:
413 await JujuModelWatcher.wait_for(
414 model=model,
415 entity=machine,
416 progress_timeout=progress_timeout,
417 total_timeout=total_timeout,
418 db_dict=db_dict,
419 n2vc=self.n2vc,
420 vca_id=self.vca_connection._vca_id,
421 )
422 finally:
423 await self.disconnect_model(model)
424 await self.disconnect_controller(controller)
425
426 self.log.debug(
427 "Machine {} ready at {} in model {}".format(
428 machine.entity_id, machine.dns_name, model_name
429 )
430 )
431 return machine, new
432
async def provision_machine(
    self,
    model_name: str,
    hostname: str,
    username: str,
    private_key_path: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
) -> str:
    """
    Manually provisioning of a machine

    :param: model_name: Model name
    :param: hostname: IP to access the machine
    :param: username: Username to login to the machine
    :param: private_key_path: Local path for the private key
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active

    :return: (Entity): Machine id
    :raises ValueError: if the machine cannot be added to the model
    :raises JujuMachineNotFound: if the added machine does not show up in the
                                 model after the polling attempts
    """
    self.log.debug(
        "Provisioning machine. model: {}, hostname: {}, username: {}".format(
            model_name, hostname, username
        )
    )

    # Get controller
    controller = await self.get_controller()
    model = None
    try:
        # Get model (inside the try block so that the controller is
        # released even when the model cannot be obtained)
        model = await self.get_model(controller, model_name)

        # Get provisioner
        provisioner = AsyncSSHProvisioner(
            host=hostname,
            user=username,
            private_key_path=private_key_path,
            log=self.log,
        )

        # Provision machine
        params = await provisioner.provision_machine()

        params.jobs = ["JobHostUnits"]

        self.log.debug("Adding machine to model")
        connection = model.connection()
        client_facade = client.ClientFacade.from_connection(connection)

        results = await client_facade.AddMachines(params=[params])
        error = results.machines[0].error

        if error:
            msg = "Error adding machine: {}".format(error.message)
            self.log.error(msg=msg)
            raise ValueError(msg)

        machine_id = results.machines[0].machine

        self.log.debug("Installing Juju agent into machine {}".format(machine_id))
        # The agent installation is scheduled, not awaited here
        asyncio.ensure_future(
            provisioner.install_agent(
                connection=connection,
                nonce=params.nonce,
                machine_id=machine_id,
                proxy=self.vca_connection.data.api_proxy,
                series=params.series,
            )
        )

        # Poll (up to 10 times, 2 seconds apart) until the model lists the machine
        machine = None
        for _ in range(10):
            machine_list = await model.get_machines()
            if machine_id in machine_list:
                self.log.debug("Machine {} found in model!".format(machine_id))
                machine = model.machines.get(machine_id)
                break
            await asyncio.sleep(2)

        if machine is None:
            msg = "Machine {} not found in model".format(machine_id)
            self.log.error(msg=msg)
            raise JujuMachineNotFound(msg)

        self.log.debug(
            "Wait until machine {} is ready in model {}".format(
                machine.entity_id, model_name
            )
        )
        await JujuModelWatcher.wait_for(
            model=model,
            entity=machine,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    self.log.debug(
        "Machine provisioned {} in model {}".format(machine_id, model_name)
    )

    return machine_id
546
547 async def deploy(
548 self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
549 ):
550 """
551 Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
552
553 :param: uri: Path or Charm Store uri in which the charm or bundle can be found
554 :param: model_name: Model name
555 :param: wait: Indicates whether to wait or not until all applications are active
556 :param: timeout: Time in seconds to wait until all applications are active
557 """
558 controller = await self.get_controller()
559 model = await self.get_model(controller, model_name)
560 try:
561 await model.deploy(uri)
562 if wait:
563 await JujuModelWatcher.wait_for_model(model, timeout=timeout)
564 self.log.debug("All units active in model {}".format(model_name))
565 finally:
566 await self.disconnect_model(model)
567 await self.disconnect_controller(controller)
568
async def deploy_charm(
    self,
    application_name: str,
    path: str,
    model_name: str,
    machine_id: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    config: dict = None,
    series: str = None,
    num_units: int = 1,
):
    """Deploy charm

    :param: application_name: Application name
    :param: path: Local path to the charm
    :param: model_name: Model name
    :param: machine_id ID of the machine
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: config: Config for the charm
    :param: series: Series of the charm (replaced by the series of the
                    machine when machine_id is given)
    :param: num_units: Number of units

    :return: (juju.application.Application): Juju application
    :raises JujuApplicationExists: if the application is already in the model
    :raises JujuMachineNotFound: if machine_id is given but is not in the model
    """
    self.log.debug(
        "Deploying charm {} to machine {} in model ~{}".format(
            application_name, machine_id, model_name
        )
    )
    self.log.debug("charm: {}".format(path))

    # Get controller
    controller = await self.get_controller()
    model = None
    try:
        # Get model (inside the try block so that the controller is
        # released even when the model cannot be obtained)
        model = await self.get_model(controller, model_name)

        if application_name in model.applications:
            raise JujuApplicationExists(
                "Application {} exists".format(application_name)
            )

        if machine_id is not None:
            if machine_id not in model.machines:
                msg = "Machine {} not found in model".format(machine_id)
                self.log.error(msg=msg)
                raise JujuMachineNotFound(msg)
            machine = model.machines[machine_id]
            series = machine.series

        # The first unit is deployed with the application itself
        application = await model.deploy(
            entity_url=path,
            application_name=application_name,
            channel="stable",
            num_units=1,
            series=series,
            to=machine_id,
            config=config,
        )

        self.log.debug(
            "Wait until application {} is ready in model {}".format(
                application_name, model_name
            )
        )
        # Every additional unit is placed on a machine created for it
        for _ in range(num_units - 1):
            extra_machine, _is_new = await self.create_machine(
                model_name, wait=False
            )
            await application.add_unit(to=extra_machine.entity_id)

        await JujuModelWatcher.wait_for(
            model=model,
            entity=application,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )
        self.log.debug(
            "Application {} is ready in model {}".format(
                application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return application
665
async def scale_application(
    self,
    model_name: str,
    application_name: str,
    scale: int = 1,
    total_timeout: float = None,
):
    """
    Scale application (K8s)

    :param: model_name: Model name
    :param: application_name: Application name
    :param: scale: Scale to which to set this application
    :param: total_timeout: Timeout for the entity to be active
                           (1800 seconds when omitted)

    :raises JujuApplicationNotFound: if the application is not in the model
    :raises Exception: if the unit count does not reach the scale in time
    """

    model = None
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)

        self.log.debug(
            "Scaling application {} in model {}".format(
                application_name, model_name
            )
        )
        target_application = self._get_application(model, application_name)
        if target_application is None:
            raise JujuApplicationNotFound("Cannot scale application")
        await target_application.scale(scale=scale)

        self.log.debug(
            "Waiting for application {} to be scaled in model {}...".format(
                application_name, model_name
            )
        )
        if total_timeout is None:
            total_timeout = 1800
        deadline = time.time() + total_timeout
        while time.time() < deadline:
            # There is a delay before the scaling shows up in the Juju model,
            # so the model is only watched once the unit count matches.
            current_units = self._get_application_count(model, application_name)
            if current_units != scale:
                await asyncio.sleep(5)
                continue
            await JujuModelWatcher.wait_for_model(
                model=model, timeout=total_timeout
            )
            self.log.debug(
                "Application {} is scaled in model {}".format(
                    application_name, model_name
                )
            )
            return
        raise Exception(
            "Timeout waiting for application {} in model {} to be scaled".format(
                application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
730
731 def _get_application_count(self, model: Model, application_name: str) -> int:
732 """Get number of units of the application
733
734 :param: model: Model object
735 :param: application_name: Application name
736
737 :return: int (or None if application doesn't exist)
738 """
739 application = self._get_application(model, application_name)
740 if application is not None:
741 return len(application.units)
742
743 def _get_application(self, model: Model, application_name: str) -> Application:
744 """Get application
745
746 :param: model: Model object
747 :param: application_name: Application name
748
749 :return: juju.application.Application (or None if it doesn't exist)
750 """
751 if model.applications and application_name in model.applications:
752 return model.applications[application_name]
753
async def execute_action(
    self,
    application_name: str,
    model_name: str,
    action_name: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    **kwargs,
):
    """Execute action

    :param: application_name: Application name
    :param: model_name: Model name
    :param: action_name: Name of the action
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: kwargs: Parameters passed to the action

    :return: (str, str): (output and status)
    :raises JujuApplicationNotFound: if the application is not in the model
    :raises JujuActionNotFound: if the application does not offer the action
    """
    self.log.debug(
        "Executing action {} using params {}".format(action_name, kwargs)
    )
    # Get controller
    controller = await self.get_controller()
    model = None
    try:
        # Get model (inside the try block so that the controller is
        # released even when the model cannot be obtained)
        model = await self.get_model(controller, model_name)

        # Get application
        application = self._get_application(
            model,
            application_name=application_name,
        )
        if application is None:
            raise JujuApplicationNotFound("Cannot execute action")

        # Get leader unit
        # Racing condition:
        #   Occasionally, self._get_leader_unit() will return None
        #   because the leader elected hook has not been triggered yet.
        #   Therefore, we are doing some retries. If it happens again,
        #   re-open bug 1236
        unit = await self._get_leader_unit(application)

        actions = await application.get_actions()

        if action_name not in actions:
            raise JujuActionNotFound(
                "Action {} not in available actions".format(action_name)
            )

        action = await unit.run_action(action_name, **kwargs)

        self.log.debug(
            "Wait until action {} is completed in application {} (model={})".format(
                action_name, application_name, model_name
            )
        )
        await JujuModelWatcher.wait_for(
            model=model,
            entity=action,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )

        output = await model.get_action_output(action_uuid=action.entity_id)
        status = await model.get_action_status(uuid_or_prefix=action.entity_id)
        # An action missing from the status report is reported as failed
        status = (
            status[action.entity_id] if action.entity_id in status else "failed"
        )

        self.log.debug(
            "Action {} completed with status {} in application {} (model={})".format(
                action_name, action.status, application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return output, status
841
842 async def get_actions(self, application_name: str, model_name: str) -> dict:
843 """Get list of actions
844
845 :param: application_name: Application name
846 :param: model_name: Model name
847
848 :return: Dict with this format
849 {
850 "action_name": "Description of the action",
851 ...
852 }
853 """
854 self.log.debug(
855 "Getting list of actions for application {}".format(application_name)
856 )
857
858 # Get controller
859 controller = await self.get_controller()
860
861 # Get model
862 model = await self.get_model(controller, model_name)
863
864 try:
865 # Get application
866 application = self._get_application(
867 model,
868 application_name=application_name,
869 )
870
871 # Return list of actions
872 return await application.get_actions()
873
874 finally:
875 # Disconnect from model and controller
876 await self.disconnect_model(model)
877 await self.disconnect_controller(controller)
878
879 async def get_metrics(self, model_name: str, application_name: str) -> dict:
880 """Get the metrics collected by the VCA.
881
882 :param model_name The name or unique id of the network service
883 :param application_name The name of the application
884 """
885 if not model_name or not application_name:
886 raise Exception("model_name and application_name must be non-empty strings")
887 metrics = {}
888 controller = await self.get_controller()
889 model = await self.get_model(controller, model_name)
890 try:
891 application = self._get_application(model, application_name)
892 if application is not None:
893 metrics = await application.get_metrics()
894 finally:
895 self.disconnect_model(model)
896 self.disconnect_controller(controller)
897 return metrics
898
899 async def add_relation(
900 self,
901 model_name: str,
902 endpoint_1: str,
903 endpoint_2: str,
904 ):
905 """Add relation
906
907 :param: model_name: Model name
908 :param: endpoint_1 First endpoint name
909 ("app:endpoint" format or directly the saas name)
910 :param: endpoint_2: Second endpoint name (^ same format)
911 """
912
913 self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))
914
915 # Get controller
916 controller = await self.get_controller()
917
918 # Get model
919 model = await self.get_model(controller, model_name)
920
921 # Add relation
922 try:
923 await model.add_relation(endpoint_1, endpoint_2)
924 except juju.errors.JujuAPIError as e:
925 if "not found" in e.message:
926 self.log.warning("Relation not found: {}".format(e.message))
927 return
928 if "already exists" in e.message:
929 self.log.warning("Relation already exists: {}".format(e.message))
930 return
931 # another exception, raise it
932 raise e
933 finally:
934 await self.disconnect_model(model)
935 await self.disconnect_controller(controller)
936
937 async def consume(
938 self,
939 offer_url: str,
940 model_name: str,
941 ):
942 """
943 Adds a remote offer to the model. Relations can be created later using "juju relate".
944
945 :param: offer_url: Offer Url
946 :param: model_name: Model name
947
948 :raises ParseError if there's a problem parsing the offer_url
949 :raises JujuError if remote offer includes and endpoint
950 :raises JujuAPIError if the operation is not successful
951 """
952 controller = await self.get_controller()
953 model = await controller.get_model(model_name)
954
955 try:
956 await model.consume(offer_url)
957 finally:
958 await self.disconnect_model(model)
959 await self.disconnect_controller(controller)
960
961 async def destroy_model(self, model_name: str, total_timeout: float = 1800):
962 """
963 Destroy model
964
965 :param: model_name: Model name
966 :param: total_timeout: Timeout
967 """
968
969 controller = await self.get_controller()
970 model = None
971 try:
972 if not await self.model_exists(model_name, controller=controller):
973 return
974
975 self.log.debug("Destroying model {}".format(model_name))
976
977 model = await self.get_model(controller, model_name)
978 # Destroy machines that are manually provisioned
979 # and still are in pending state
980 await self._destroy_pending_machines(model, only_manual=True)
981 await self.disconnect_model(model)
982
983 await self._destroy_model(
984 model_name,
985 controller,
986 timeout=total_timeout,
987 )
988 finally:
989 if model:
990 await self.disconnect_model(model)
991 await self.disconnect_controller(controller)
992
993 async def _destroy_model(
994 self, model_name: str, controller: Controller, timeout: float = 1800
995 ):
996 """
997 Destroy model from controller
998
999 :param: model: Model name to be removed
1000 :param: controller: Controller object
1001 :param: timeout: Timeout in seconds
1002 """
1003
1004 async def _destroy_model_loop(model_name: str, controller: Controller):
1005 while await self.model_exists(model_name, controller=controller):
1006 await controller.destroy_model(
1007 model_name, destroy_storage=True, force=True, max_wait=0
1008 )
1009 await asyncio.sleep(5)
1010
1011 try:
1012 await asyncio.wait_for(
1013 _destroy_model_loop(model_name, controller), timeout=timeout
1014 )
1015 except asyncio.TimeoutError:
1016 raise Exception(
1017 "Timeout waiting for model {} to be destroyed".format(model_name)
1018 )
1019
1020 async def destroy_application(
1021 self, model_name: str, application_name: str, total_timeout: float
1022 ):
1023 """
1024 Destroy application
1025
1026 :param: model_name: Model name
1027 :param: application_name: Application name
1028 :param: total_timeout: Timeout
1029 """
1030
1031 controller = await self.get_controller()
1032 model = None
1033
1034 try:
1035 model = await self.get_model(controller, model_name)
1036 self.log.debug(
1037 "Destroying application {} in model {}".format(
1038 application_name, model_name
1039 )
1040 )
1041 application = self._get_application(model, application_name)
1042 if application:
1043 await application.destroy()
1044 else:
1045 self.log.warning("Application not found: {}".format(application_name))
1046
1047 self.log.debug(
1048 "Waiting for application {} to be destroyed in model {}...".format(
1049 application_name, model_name
1050 )
1051 )
1052 if total_timeout is None:
1053 total_timeout = 3600
1054 end = time.time() + total_timeout
1055 while time.time() < end:
1056 if not self._get_application(model, application_name):
1057 self.log.debug(
1058 "The application {} was destroyed in model {} ".format(
1059 application_name, model_name
1060 )
1061 )
1062 return
1063 await asyncio.sleep(5)
1064 raise Exception(
1065 "Timeout waiting for application {} to be destroyed in model {}".format(
1066 application_name, model_name
1067 )
1068 )
1069 finally:
1070 if model is not None:
1071 await self.disconnect_model(model)
1072 await self.disconnect_controller(controller)
1073
1074 async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
1075 """
1076 Destroy pending machines in a given model
1077
1078 :param: only_manual: Bool that indicates only manually provisioned
1079 machines should be destroyed (if True), or that
1080 all pending machines should be destroyed
1081 """
1082 status = await model.get_status()
1083 for machine_id in status.machines:
1084 machine_status = status.machines[machine_id]
1085 if machine_status.agent_status.status == "pending":
1086 if only_manual and not machine_status.instance_id.startswith("manual:"):
1087 break
1088 machine = model.machines[machine_id]
1089 await machine.destroy(force=True)
1090
async def configure_application(
    self, model_name: str, application_name: str, config: dict = None
):
    """Configure application

    Nothing is sent to the controller when config is empty or None.

    :param: model_name: Model name
    :param: application_name: Application name
    :param: config: Config to apply to the charm

    :raises JujuApplicationNotFound: if the application is not in the model
    """
    self.log.debug("Configuring application {}".format(application_name))

    if not config:
        return

    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(
            model,
            application_name=application_name,
        )
        if not application:
            # Fail with a meaningful error instead of dereferencing None.
            raise JujuApplicationNotFound(
                "Application {} not found in model {}".format(
                    application_name, model_name
                )
            )
        await application.set_config(config)
    finally:
        # The model is only disconnected if it was actually obtained.
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1116
def handle_exception(self, loop, context):
    """
    Sink for the exceptions that libjuju leaves unhandled

    The arguments are deliberately ignored and nothing is done with them.
    NOTE(review): the (loop, context) signature looks like an asyncio loop
    exception handler - confirm where this is registered.

    :param: loop: Not used
    :param: context: Not used
    """
    return None
1120
async def health_check(self, interval: float = 300.0):
    """
    Periodically check that a connection to the controller can be obtained

    Loops indefinitely: each pass gets a controller, disconnects it again and
    then sleeps. A failure to connect is logged as an error and does not stop
    the loop.

    :param: interval: Time in seconds between checks
    """
    while True:
        # Reset on every pass so that a failed attempt never disconnects the
        # (already disconnected) controller left over from an earlier pass.
        controller = None
        try:
            controller = await self.get_controller()
        except Exception as e:
            self.log.error("Health check to VCA failed: {}".format(e))
        finally:
            if controller is not None:
                await self.disconnect_controller(controller)
        await asyncio.sleep(interval)
1137
async def list_models(self, contains: str = None) -> [str]:
    """List the models of the controller, optionally filtered by name

    :param: contains: String that must be contained in the model name.
                      When empty or None, no filtering is applied.

    :return: [models] Returns list of model names
    """
    controller = await self.get_controller()
    try:
        names = await controller.list_models()
        if not contains:
            return names
        return list(filter(lambda name: contains in name, names))
    finally:
        # Runs on both the success and the error path.
        await self.disconnect_controller(controller)
1154
async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
    """List the offers of a model

    :param: model_name: Model name

    :return: Returns whatever the controller reports as offers for the model
    """
    controller = await self.get_controller()
    try:
        offers = await controller.list_offers(model_name)
    finally:
        # Runs on both the success and the error path.
        await self.disconnect_controller(controller)
    return offers
1168
async def add_k8s(
    self,
    name: str,
    rbac_id: str,
    token: str,
    client_cert_data: str,
    configuration: Configuration,
    storage_class: str,
    credential_name: str = None,
):
    """
    Add a Kubernetes cloud to the controller

    Similar to the `juju add-k8s` command in the CLI

    :param: name: Name for the K8s cloud
    :param: rbac_id: Value stored in the credential attributes under the
                     RBAC label key
    :param: token: Token used to build the cloud credential
    :param: client_cert_data: Client certificate data; used for the credential
                              and as the CA certificate of the cloud
    :param: configuration: Kubernetes configuration object
    :param: storage_class: Storage Class to use in the cloud (both for
                           operator and workload storage)
    :param: credential_name: Credential name handed over to add_cloud()

    :return: Result of add_cloud()
    """
    # Validation order matters: the first failing check decides the message.
    required = (
        (storage_class, "storage_class must be a non-empty string"),
        (name, "name must be a non-empty string"),
        (configuration, "configuration must be provided"),
    )
    for value, message in required:
        if not value:
            raise Exception(message)

    host = configuration.host
    credential = self.get_k8s_cloud_credential(configuration, client_cert_data, token)
    credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id

    storage_config = {
        "operator-storage": storage_class,
        "workload-storage": storage_class,
    }
    k8s_cloud = client.Cloud(
        type_="kubernetes",
        auth_types=[credential.auth_type],
        endpoint=host,
        ca_certificates=[client_cert_data],
        config=storage_config,
    )

    return await self.add_cloud(
        name, k8s_cloud, credential, credential_name=credential_name
    )
1218
def get_k8s_cloud_credential(
    self,
    configuration: Configuration,
    client_cert_data: str,
    token: str = None,
) -> client.CloudCredential:
    """
    Build the cloud credential for a Kubernetes cloud

    The auth type is derived from what is provided:
    username (with or without certificate) gives "userpass" or
    "userpasswithcert"; certificate plus token gives "certificate".
    The client-key based types ("oauth2", "oauth2withcert") are currently
    unreachable because the key is hard-wired to None.

    :param: configuration: Kubernetes configuration object; its username
                           and password are read
    :param: client_cert_data: Client certificate data
    :param: token: Token

    :raises JujuInvalidK8sConfiguration: if a token is combined with
            user/pass, or if no supported authentication method matches

    :return: client.CloudCredential with the auth type and attributes
    """
    # TODO: Test with AKS
    key = None  # open(configuration.key_file, "r").read()
    username, password = configuration.username, configuration.password

    attrs = {}
    if client_cert_data:
        attrs["ClientCertificateData"] = client_cert_data
    if key:
        attrs["ClientKeyData"] = key
    if token and (username or password):
        raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
    if token:
        attrs["Token"] = token

    auth_type = None
    if key:
        auth_type = "oauth2withcert" if client_cert_data else "oauth2"
        if not token:
            raise JujuInvalidK8sConfiguration(
                "missing token for auth type {}".format(auth_type)
            )
    elif username:
        if not password:
            self.log.debug(
                "credential for user {} has empty password".format(username)
            )
        attrs["username"] = username
        attrs["password"] = password
        auth_type = "userpasswithcert" if client_cert_data else "userpass"
    elif not (client_cert_data and token):
        raise JujuInvalidK8sConfiguration("authentication method not supported")
    else:
        auth_type = "certificate"

    return client.CloudCredential(auth_type=auth_type, attrs=attrs)
1265
async def add_cloud(
    self,
    name: str,
    cloud: Cloud,
    credential: CloudCredential = None,
    credential_name: str = None,
) -> Cloud:
    """
    Add cloud to the controller

    :param: name: Name of the cloud to be added
    :param: cloud: Cloud object
    :param: credential: CloudCredentials object for the cloud. When given,
                        it is added to the controller right after the cloud.
    :param: credential_name: Credential name.
                             If not defined, the name of the cloud will be used.

    :return: The cloud object that was passed in
    """
    controller = await self.get_controller()
    try:
        await controller.add_cloud(name, cloud)
        if credential:
            await controller.add_credential(
                credential_name or name, credential=credential, cloud=name
            )
    finally:
        await self.disconnect_controller(controller)
    # The object returned by controller.add_cloud() should be returned here.
    # The original value is returned instead until this bug is fixed:
    # https://github.com/juju/python-libjuju/issues/443
    return cloud
1295
async def remove_cloud(self, name: str):
    """
    Remove cloud

    A JujuError whose only error says that the cloud was not found is logged
    as a warning and swallowed; any other JujuError is re-raised.

    :param: name: Name of the cloud to be removed
    """
    controller = await self.get_controller()
    try:
        await controller.remove_cloud(name)
    except juju.errors.JujuError as e:
        only_not_found = (
            len(e.errors) == 1 and e.errors[0] == f'cloud "{name}" not found'
        )
        if not only_not_found:
            raise
        self.log.warning(f"Cloud {name} not found, so it could not be deleted.")
    finally:
        await self.disconnect_controller(controller)
1312
@retry(attempts=20, delay=5, fallback=JujuLeaderUnitNotFound())
async def _get_leader_unit(self, application: Application) -> Unit:
    """
    Get the leader unit of an application

    The units are checked in order and the first one that reports itself as
    leader is returned. A bare Exception is raised when there is none.
    NOTE(review): presumably that exception makes the retry decorator try
    again and finally fall back to JujuLeaderUnitNotFound - confirm against
    retrying_async.

    :param: application: Application whose units are inspected

    :return: The leader unit
    """
    leader = None
    for candidate in application.units:
        if await candidate.is_leader_from_status():
            leader = candidate
            break
    if leader:
        return leader
    raise Exception()
1323
async def get_cloud_credentials(self, cloud: Cloud) -> typing.List:
    """
    Get cloud credentials

    The credential is looked up through the cloud facade, using a tag built
    from the cloud name, the user of the VCA connection and the credential
    name of the cloud.

    :param: cloud: Cloud object. The returned credentials will be from this cloud.

    :return: List of credentials object associated to the specified cloud

    """
    controller = await self.get_controller()
    try:
        cloud_facade = client.CloudFacade.from_connection(controller.connection())
        credential_tag = tag.credential(
            cloud.name, self.vca_connection.data.user, cloud.credential_name
        )
        response = await cloud_facade.Credential([client.Entity(credential_tag)])
        return response.results
    finally:
        await self.disconnect_controller(controller)