0c0180f30021e1b0799caa597df3ede84cf97f10
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17 import typing
18
19 import time
20
21 import juju.errors
22 from juju.model import Model
23 from juju.machine import Machine
24 from juju.application import Application
25 from juju.unit import Unit
26 from juju.client._definitions import (
27 FullStatus,
28 QueryApplicationOffersResults,
29 Cloud,
30 CloudCredential,
31 )
32 from juju.controller import Controller
33 from juju.client import client
34 from juju import tag
35
36 from n2vc.definitions import Offer, RelationEndpoint
37 from n2vc.juju_watcher import JujuModelWatcher
38 from n2vc.provisioner import AsyncSSHProvisioner
39 from n2vc.n2vc_conn import N2VCConnector
40 from n2vc.exceptions import (
41 JujuMachineNotFound,
42 JujuApplicationNotFound,
43 JujuLeaderUnitNotFound,
44 JujuActionNotFound,
45 JujuControllerFailedConnecting,
46 JujuApplicationExists,
47 JujuInvalidK8sConfiguration,
48 JujuError,
49 )
50 from n2vc.vca.cloud import Cloud as VcaCloud
51 from n2vc.vca.connection import Connection
52 from kubernetes.client.configuration import Configuration
53 from retrying_async import retry
54
55
# Key name of the "rbac-id" label.
# NOTE(review): presumably used to label RBAC resources in Kubernetes; the
# code that uses it is outside this view - confirm.
RBAC_LABEL_KEY_NAME = "rbac-id"
57
58
59 class Libjuju:
def __init__(
    self,
    vca_connection: Connection,
    loop: asyncio.AbstractEventLoop = None,
    log: logging.Logger = None,
    n2vc: N2VCConnector = None,
):
    """
    Build a Libjuju helper bound to a single VCA connection

    :param: vca_connection: n2vc.vca.connection object
    :param: loop: Asyncio loop (the current event loop is used when omitted)
    :param: log: Logger (a logger named "Libjuju" is used when omitted)
    :param: n2vc: N2VC object
    """
    self.vca_connection = vca_connection
    self.n2vc = n2vc
    self.log = log if log else logging.getLogger("Libjuju")

    event_loop = loop if loop else asyncio.get_event_loop()
    self.loop = event_loop
    # Route exceptions raised inside the loop to this object's handler
    event_loop.set_exception_handler(self.handle_exception)
    # Lock that serializes model creation (see add_model)
    self.creating_model = asyncio.Lock(loop=event_loop)

    # The health check task is only started for the default VCA
    if self.vca_connection.is_default:
        self.health_check_task = self._create_health_check_task()
86
87 def _create_health_check_task(self):
88 return self.loop.create_task(self.health_check())
89
async def get_controller(self, timeout: float = 60.0) -> Controller:
    """
    Connect to the Juju controller described by the VCA connection

    For the default VCA, the stored endpoints are updated whenever the
    controller reports an endpoint that is not among the known ones.

    :param: timeout: Time in seconds to wait for controller to connect

    :return: Connected Controller
    :raises JujuControllerFailedConnecting: on any failure other than a
        cancellation
    """
    controller = None
    try:
        controller = Controller()
        connecting = controller.connect(
            endpoint=self.vca_connection.data.endpoints,
            username=self.vca_connection.data.user,
            password=self.vca_connection.data.secret,
            cacert=self.vca_connection.data.cacert,
        )
        await asyncio.wait_for(connecting, timeout=timeout)
        if self.vca_connection.is_default:
            reported_endpoints = await controller.api_endpoints
            has_unknown_endpoint = any(
                reported not in self.vca_connection.endpoints
                for reported in reported_endpoints
            )
            if has_unknown_endpoint:
                await self.vca_connection.update_endpoints(reported_endpoints)
        return controller
    except asyncio.CancelledError:
        # Cancellation is propagated untouched
        raise
    except Exception as e:
        self.log.error(
            "Failed connecting to controller: {}... {}".format(
                self.vca_connection.data.endpoints, e
            )
        )
        # Release a half-open connection before reporting the failure
        if controller:
            await self.disconnect_controller(controller)
        raise JujuControllerFailedConnecting(e)
126
async def disconnect(self):
    """
    Disconnect: cancel the background health check task, if there is one

    The constructor only creates the task when the VCA connection is the
    default one, so the attribute may not exist on this object.
    """
    health_check_task = getattr(self, "health_check_task", None)
    if health_check_task is not None:
        # Cancel health check task
        health_check_task.cancel()
    self.log.debug("Libjuju disconnected!")
132
133 async def disconnect_model(self, model: Model):
134 """
135 Disconnect model
136
137 :param: model: Model that will be disconnected
138 """
139 await model.disconnect()
140
141 async def disconnect_controller(self, controller: Controller):
142 """
143 Disconnect controller
144
145 :param: controller: Controller that will be disconnected
146 """
147 if controller:
148 await controller.disconnect()
149
@retry(attempts=3, delay=5, timeout=None)
async def add_model(self, model_name: str, cloud: VcaCloud):
    """
    Create a model in the controller, unless it already exists

    :param: model_name: Model name
    :param: cloud: Cloud object
    """
    controller = await self.get_controller()
    created_model = None
    try:
        # Poll until no other worker holds the model-creation lock
        while self.creating_model.locked():
            await asyncio.sleep(0.1)

        async with self.creating_model:
            already_created = await self.model_exists(
                model_name, controller=controller
            )
            if already_created:
                return
            self.log.debug("Creating model {}".format(model_name))
            created_model = await controller.add_model(
                model_name,
                config=self.vca_connection.data.model_config,
                cloud_name=cloud.name,
                credential_name=cloud.credential_name,
            )
    except juju.errors.JujuAPIError as e:
        # An "already exists" answer from the API is not treated as an error
        if "already exists" not in e.message:
            raise e
    finally:
        if created_model:
            await self.disconnect_model(created_model)
        await self.disconnect_controller(controller)
187
async def get_executed_actions(self, model_name: str) -> list:
    """
    Get the history of executed actions for a model.

    :param: model_name: Model name, str.
    :return: List of dicts, one per executed action, holding "id", "action",
             "status" plus every key of the action output.
    :raises JujuError: if anything fails while collecting the actions
    """
    model = None
    history = []
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        # Merge the actions of every application; dict keys keep names unique
        known_actions = {}
        for app_name in model.applications:
            known_actions.update(await self.get_actions(app_name, model_name))
        # Collect status and output of each run of each action
        for action_name in known_actions:
            statuses = await model.get_action_status(name=action_name)
            for action_id, action_status in statuses.items():
                entry = {
                    "id": action_id,
                    "action": action_name,
                    "status": action_status,
                }
                # Output keys are copied on top of the base entry
                output = await model.get_action_output(entry["id"])
                for key, value in output.items():
                    entry[key] = value
                history.append(entry)
    except Exception as e:
        raise JujuError(
            "Error in getting executed actions for model: {}. Error: {}".format(
                model_name, str(e)
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return history
232
async def get_application_configs(
    self, model_name: str, application_name: str
) -> dict:
    """
    Get the configuration of an application.

    :param: model_name: Model name, str.
    :param: application_name: Application name, str.

    :return: Whatever the application's get_config() returns
    :raises JujuError: if anything fails while reading the configuration
    """
    model = None
    configs = {}
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        target_app = self._get_application(
            model, application_name=application_name
        )
        configs = await target_app.get_config()
    except Exception as e:
        raise JujuError(
            "Error in getting configs for application: {} in model: {}. Error: {}".format(
                application_name, model_name, str(e)
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return configs
264
@retry(attempts=3, delay=5)
async def get_model(self, controller: Controller, model_name: str) -> Model:
    """
    Fetch a model from the controller by name

    :param: controller: Controller
    :param: model_name: Model name

    :return: Model: The Juju model object returned by the controller
    """
    model = await controller.get_model(model_name)
    return model
276
277 async def model_exists(
278 self, model_name: str, controller: Controller = None
279 ) -> bool:
280 """
281 Check if model exists
282
283 :param: controller: Controller
284 :param: model_name: Model name
285
286 :return bool
287 """
288 need_to_disconnect = False
289
290 # Get controller if not passed
291 if not controller:
292 controller = await self.get_controller()
293 need_to_disconnect = True
294
295 # Check if model exists
296 try:
297 return model_name in await controller.list_models()
298 finally:
299 if need_to_disconnect:
300 await self.disconnect_controller(controller)
301
async def models_exist(self, model_names: [str]) -> (bool, list):
    """
    Check whether every given model exists

    :param: model_names: List of strings with model names

    :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
    """
    if not model_names:
        raise Exception(
            "model_names must be a non-empty array. Given value: {}".format(
                model_names
            )
        )
    known_models = await self.list_models()
    found = list(set(known_models).intersection(model_names))
    missing = list(set(model_names) - set(found))

    all_present = len(missing) == 0
    return (all_present, missing)
325
326 async def get_model_status(self, model_name: str) -> FullStatus:
327 """
328 Get model status
329
330 :param: model_name: Model name
331
332 :return: Full status object
333 """
334 controller = await self.get_controller()
335 model = await self.get_model(controller, model_name)
336 try:
337 return await model.get_status()
338 finally:
339 await self.disconnect_model(model)
340 await self.disconnect_controller(controller)
341
342 async def create_machine(
343 self,
344 model_name: str,
345 machine_id: str = None,
346 db_dict: dict = None,
347 progress_timeout: float = None,
348 total_timeout: float = None,
349 series: str = "bionic",
350 wait: bool = True,
351 ) -> (Machine, bool):
352 """
353 Create machine
354
355 :param: model_name: Model name
356 :param: machine_id: Machine id
357 :param: db_dict: Dictionary with data of the DB to write the updates
358 :param: progress_timeout: Maximum time between two updates in the model
359 :param: total_timeout: Timeout for the entity to be active
360 :param: series: Series of the machine (xenial, bionic, focal, ...)
361 :param: wait: Wait until machine is ready
362
363 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
364 if the machine is new or it already existed
365 """
366 new = False
367 machine = None
368
369 self.log.debug(
370 "Creating machine (id={}) in model: {}".format(machine_id, model_name)
371 )
372
373 # Get controller
374 controller = await self.get_controller()
375
376 # Get model
377 model = await self.get_model(controller, model_name)
378 try:
379 if machine_id is not None:
380 self.log.debug(
381 "Searching machine (id={}) in model {}".format(
382 machine_id, model_name
383 )
384 )
385
386 # Get machines from model and get the machine with machine_id if exists
387 machines = await model.get_machines()
388 if machine_id in machines:
389 self.log.debug(
390 "Machine (id={}) found in model {}".format(
391 machine_id, model_name
392 )
393 )
394 machine = machines[machine_id]
395 else:
396 raise JujuMachineNotFound("Machine {} not found".format(machine_id))
397
398 if machine is None:
399 self.log.debug("Creating a new machine in model {}".format(model_name))
400
401 # Create machine
402 machine = await model.add_machine(
403 spec=None, constraints=None, disks=None, series=series
404 )
405 new = True
406
407 # Wait until the machine is ready
408 self.log.debug(
409 "Wait until machine {} is ready in model {}".format(
410 machine.entity_id, model_name
411 )
412 )
413 if wait:
414 await JujuModelWatcher.wait_for(
415 model=model,
416 entity=machine,
417 progress_timeout=progress_timeout,
418 total_timeout=total_timeout,
419 db_dict=db_dict,
420 n2vc=self.n2vc,
421 vca_id=self.vca_connection._vca_id,
422 )
423 finally:
424 await self.disconnect_model(model)
425 await self.disconnect_controller(controller)
426
427 self.log.debug(
428 "Machine {} ready at {} in model {}".format(
429 machine.entity_id, machine.dns_name, model_name
430 )
431 )
432 return machine, new
433
async def provision_machine(
    self,
    model_name: str,
    hostname: str,
    username: str,
    private_key_path: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
) -> str:
    """
    Manually provisioning of a machine

    :param: model_name: Model name
    :param: hostname: IP to access the machine
    :param: username: Username to login to the machine
    :param: private_key_path: Local path for the private key
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active

    :return: (Entity): Machine id
    :raises ValueError: if the AddMachines call reports an error
    :raises JujuMachineNotFound: if the added machine does not show up in
        the model after the polling attempts
    """
    self.log.debug(
        "Provisioning machine. model: {}, hostname: {}, username: {}".format(
            model_name, hostname, username
        )
    )

    model = None

    # Get controller
    controller = await self.get_controller()

    try:
        # Get model (inside the try block so the controller connection is
        # released even when the model cannot be retrieved)
        model = await self.get_model(controller, model_name)

        # Get provisioner
        provisioner = AsyncSSHProvisioner(
            host=hostname,
            user=username,
            private_key_path=private_key_path,
            log=self.log,
        )

        # Provision machine
        params = await provisioner.provision_machine()

        params.jobs = ["JobHostUnits"]

        self.log.debug("Adding machine to model")
        connection = model.connection()
        client_facade = client.ClientFacade.from_connection(connection)

        results = await client_facade.AddMachines(params=[params])
        error = results.machines[0].error

        if error:
            msg = "Error adding machine: {}".format(error.message)
            self.log.error(msg=msg)
            raise ValueError(msg)

        machine_id = results.machines[0].machine

        self.log.debug("Installing Juju agent into machine {}".format(machine_id))
        # The agent installation is scheduled in the background and is not
        # awaited here
        asyncio.ensure_future(
            provisioner.install_agent(
                connection=connection,
                nonce=params.nonce,
                machine_id=machine_id,
                proxy=self.vca_connection.data.api_proxy,
                series=params.series,
            )
        )

        # Poll the model (up to 10 times, 2 seconds apart) until the new
        # machine shows up
        machine = None
        for _ in range(10):
            machine_list = await model.get_machines()
            if machine_id in machine_list:
                self.log.debug("Machine {} found in model!".format(machine_id))
                machine = model.machines.get(machine_id)
                break
            await asyncio.sleep(2)

        if machine is None:
            msg = "Machine {} not found in model".format(machine_id)
            self.log.error(msg=msg)
            raise JujuMachineNotFound(msg)

        self.log.debug(
            "Wait until machine {} is ready in model {}".format(
                machine.entity_id, model_name
            )
        )
        await JujuModelWatcher.wait_for(
            model=model,
            entity=machine,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    self.log.debug(
        "Machine provisioned {} in model {}".format(machine_id, model_name)
    )

    return machine_id
547
async def deploy(
    self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
):
    """
    Deploy bundle or charm: Similar to the juju CLI command `juju deploy`

    :param: uri: Path or Charm Store uri in which the charm or bundle can be found
    :param: model_name: Model name
    :param: wait: Indicates whether to wait or not until all applications are active
    :param: timeout: Time in seconds to wait until all applications are active
    """
    model = None
    controller = await self.get_controller()
    try:
        # The model is fetched inside the try block so that the controller
        # connection is released even when the model cannot be retrieved
        model = await self.get_model(controller, model_name)
        await model.deploy(uri, trust=True)
        if wait:
            await JujuModelWatcher.wait_for_model(model, timeout=timeout)
            self.log.debug("All units active in model {}".format(model_name))
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
569
async def add_unit(
    self,
    application_name: str,
    model_name: str,
    machine_id: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
):
    """Add a unit of an application on an existing machine

    :param: application_name: Application name
    :param: model_name: Model name
    :param: machine_id Machine id
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active

    :return: None
    :raises JujuApplicationNotFound: if the application is not in the model
    """

    model = None
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)

        if application is None:
            raise JujuApplicationNotFound(
                "Application {} not exists".format(application_name)
            )

        # Called only for its check: it raises when the machine id is not
        # in the model
        self._get_machine_info(model, machine_id)

        self.log.debug(
            "Adding unit (machine {}) to application {} in model ~{}".format(
                machine_id, application_name, model_name
            )
        )

        await application.add_unit(to=machine_id)

        watcher_args = dict(
            model=model,
            entity=application,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )
        await JujuModelWatcher.wait_for(**watcher_args)
        self.log.debug(
            "Unit is added to application {} in model {}".format(
                application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
633
async def destroy_unit(
    self,
    application_name: str,
    model_name: str,
    machine_id: str,
    total_timeout: float = None,
):
    """Destroy unit

    Requests the destruction of the unit placed on the given machine and
    polls every 5 seconds until the unit is gone or the timeout expires.
    When the timeout expires a warning is logged and the method returns
    without raising.

    :param: application_name: Application name
    :param: model_name: Model name
    :param: machine_id Machine id
    :param: total_timeout: Seconds to wait for the unit to disappear
                           (3600 when None)

    :return: None
    :raises JujuApplicationNotFound: if the application is not in the model
    :raises JujuError: if no unit of the application is on the machine
    """

    model = None
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)

        if application is None:
            raise JujuApplicationNotFound(
                "Application not found: {} (model={})".format(
                    application_name, model_name
                )
            )

        unit = self._get_unit(application, machine_id)
        if not unit:
            raise JujuError(
                "A unit with machine id {} not in available units".format(
                    machine_id
                )
            )

        unit_name = unit.name

        self.log.debug(
            "Destroying unit {} from application {} in model {}".format(
                unit_name, application_name, model_name
            )
        )
        await application.destroy_unit(unit_name)

        self.log.debug(
            "Waiting for unit {} to be destroyed in application {} (model={})...".format(
                unit_name, application_name, model_name
            )
        )

        # TODO: Add functionality in the Juju watcher to replace this kind of blocks
        if total_timeout is None:
            total_timeout = 3600
        end = time.time() + total_timeout
        while time.time() < end:
            if not self._get_unit(application, machine_id):
                self.log.debug(
                    "The unit {} was destroyed in application {} (model={}) ".format(
                        unit_name, application_name, model_name
                    )
                )
                return
            await asyncio.sleep(5)
        # Reaching this point means the unit was still present when the
        # timeout expired, so success must not be reported
        self.log.warning(
            "Timeout waiting for unit {} to be destroyed in application {} (model={})".format(
                unit_name, application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
709
async def deploy_charm(
    self,
    application_name: str,
    path: str,
    model_name: str,
    machine_id: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    config: dict = None,
    series: str = None,
    num_units: int = 1,
):
    """Deploy charm

    :param: application_name: Application name
    :param: path: Local path to the charm
    :param: model_name: Model name
    :param: machine_id ID of the machine
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: config: Config for the charm
    :param: series: Series of the charm (replaced by the machine's series
                    when machine_id is given)
    :param: num_units: Number of units

    :return: (juju.application.Application): Juju application
    :raises JujuApplicationExists: if the application is already in the model
    """
    self.log.debug(
        "Deploying charm {} to machine {} in model ~{}".format(
            application_name, machine_id, model_name
        )
    )
    self.log.debug("charm: {}".format(path))

    model = None

    # Get controller
    controller = await self.get_controller()

    try:
        # Get model (inside the try block so the controller connection is
        # released even when the model cannot be retrieved)
        model = await self.get_model(controller, model_name)

        if application_name not in model.applications:

            if machine_id is not None:
                machine, series = self._get_machine_info(model, machine_id)

            # The first unit is deployed here; extra units are added below
            application = await model.deploy(
                entity_url=path,
                application_name=application_name,
                channel="stable",
                num_units=1,
                series=series,
                to=machine_id,
                config=config,
            )

            self.log.debug(
                "Wait until application {} is ready in model {}".format(
                    application_name, model_name
                )
            )
            if num_units > 1:
                # Each additional unit gets its own newly created machine
                for _ in range(num_units - 1):
                    m, _ = await self.create_machine(model_name, wait=False)
                    await application.add_unit(to=m.entity_id)

            await JujuModelWatcher.wait_for(
                model=model,
                entity=application,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout,
                db_dict=db_dict,
                n2vc=self.n2vc,
                vca_id=self.vca_connection._vca_id,
            )
            self.log.debug(
                "Application {} is ready in model {}".format(
                    application_name, model_name
                )
            )
        else:
            raise JujuApplicationExists(
                "Application {} exists".format(application_name)
            )
    except juju.errors.JujuError as e:
        if "already exists" in e.message:
            raise JujuApplicationExists(
                "Application {} exists".format(application_name)
            )
        else:
            raise e
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return application
807
async def scale_application(
    self,
    model_name: str,
    application_name: str,
    scale: int = 1,
    total_timeout: float = None,
):
    """
    Scale application (K8s)

    :param: model_name: Model name
    :param: application_name: Application name
    :param: scale: Scale to which to set this application
    :param: total_timeout: Timeout for the entity to be active
                           (1800 seconds when None)

    :raises JujuApplicationNotFound: if the application is not in the model
    :raises Exception: if the unit count does not reach the scale in time
    """

    model = None
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)

        self.log.debug(
            "Scaling application {} in model {}".format(
                application_name, model_name
            )
        )
        application = self._get_application(model, application_name)
        if application is None:
            raise JujuApplicationNotFound("Cannot scale application")
        await application.scale(scale=scale)
        self.log.debug(
            "Waiting for application {} to be scaled in model {}...".format(
                application_name, model_name
            )
        )
        timeout = 1800 if total_timeout is None else total_timeout
        deadline = time.time() + timeout
        while time.time() < deadline:
            unit_count = self._get_application_count(model, application_name)
            # Scaling is triggered with some delay in the Juju model, so keep
            # polling until the unit count matches the requested scale and
            # only then wait for the model
            if unit_count != scale:
                await asyncio.sleep(5)
                continue
            await JujuModelWatcher.wait_for_model(model=model, timeout=timeout)
            self.log.debug(
                "Application {} is scaled in model {}".format(
                    application_name, model_name
                )
            )
            return
        raise Exception(
            "Timeout waiting for application {} in model {} to be scaled".format(
                application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
872
873 def _get_application_count(self, model: Model, application_name: str) -> int:
874 """Get number of units of the application
875
876 :param: model: Model object
877 :param: application_name: Application name
878
879 :return: int (or None if application doesn't exist)
880 """
881 application = self._get_application(model, application_name)
882 if application is not None:
883 return len(application.units)
884
885 def _get_application(self, model: Model, application_name: str) -> Application:
886 """Get application
887
888 :param: model: Model object
889 :param: application_name: Application name
890
891 :return: juju.application.Application (or None if it doesn't exist)
892 """
893 if model.applications and application_name in model.applications:
894 return model.applications[application_name]
895
896 def _get_unit(self, application: Application, machine_id: str) -> Unit:
897 """Get unit
898
899 :param: application: Application object
900 :param: machine_id: Machine id
901
902 :return: Unit
903 """
904 unit = None
905 for u in application.units:
906 if u.machine_id == machine_id:
907 unit = u
908 break
909 return unit
910
911 def _get_machine_info(
912 self,
913 model,
914 machine_id: str,
915 ) -> (str, str):
916 """Get machine info
917
918 :param: model: Model object
919 :param: machine_id: Machine id
920
921 :return: (str, str): (machine, series)
922 """
923 if machine_id not in model.machines:
924 msg = "Machine {} not found in model".format(machine_id)
925 self.log.error(msg=msg)
926 raise JujuMachineNotFound(msg)
927 machine = model.machines[machine_id]
928 return machine, machine.series
929
async def execute_action(
    self,
    application_name: str,
    model_name: str,
    action_name: str,
    db_dict: dict = None,
    machine_id: str = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    **kwargs,
):
    """Execute action

    The action runs on the leader unit, or on the unit placed on machine_id
    when one is given.

    :param: application_name: Application name
    :param: model_name: Model name
    :param: action_name: Name of the action
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: machine_id Machine id
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: kwargs: Parameters passed to the action

    :return: (str, str): (output and status)
    :raises JujuApplicationNotFound: if the application is not in the model
    :raises JujuError: if machine_id is given and no unit is on that machine
    :raises JujuActionNotFound: if the application does not offer the action
    """
    self.log.debug(
        "Executing action {} using params {}".format(action_name, kwargs)
    )
    model = None

    # Get controller
    controller = await self.get_controller()

    try:
        # Get model (inside the try block so the controller connection is
        # released even when the model cannot be retrieved)
        model = await self.get_model(controller, model_name)

        # Get application
        application = self._get_application(
            model,
            application_name=application_name,
        )
        if application is None:
            raise JujuApplicationNotFound("Cannot execute action")
        # Racing condition:
        #   Occasionally, self._get_leader_unit() will return None
        #   because the leader elected hook has not been triggered yet.
        #   Therefore, we are doing some retries. If it happens again,
        #   re-open bug 1236
        if machine_id is None:
            unit = await self._get_leader_unit(application)
            self.log.debug(
                "Action {} is being executed on the leader unit {}".format(
                    action_name, unit.name
                )
            )
        else:
            unit = self._get_unit(application, machine_id)
            if not unit:
                raise JujuError(
                    "A unit with machine id {} not in available units".format(
                        machine_id
                    )
                )
            self.log.debug(
                "Action {} is being executed on {} unit".format(
                    action_name, unit.name
                )
            )

        actions = await application.get_actions()

        if action_name not in actions:
            raise JujuActionNotFound(
                "Action {} not in available actions".format(action_name)
            )

        action = await unit.run_action(action_name, **kwargs)

        self.log.debug(
            "Wait until action {} is completed in application {} (model={})".format(
                action_name, application_name, model_name
            )
        )
        await JujuModelWatcher.wait_for(
            model=model,
            entity=action,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )

        output = await model.get_action_output(action_uuid=action.entity_id)
        status = await model.get_action_status(uuid_or_prefix=action.entity_id)
        # An action missing from the status map is reported as "failed"
        status = (
            status[action.entity_id] if action.entity_id in status else "failed"
        )

        self.log.debug(
            "Action {} completed with status {} in application {} (model={})".format(
                action_name, action.status, application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return output, status
1036
async def get_actions(self, application_name: str, model_name: str) -> dict:
    """Get list of actions

    :param: application_name: Application name
    :param: model_name: Model name

    :return: Dict with this format
        {
            "action_name": "Description of the action",
            ...
        }
    """
    self.log.debug(
        "Getting list of actions for application {}".format(application_name)
    )

    model = None

    # Get controller
    controller = await self.get_controller()

    try:
        # Get model (inside the try block so the controller connection is
        # released even when the model cannot be retrieved)
        model = await self.get_model(controller, model_name)

        # Get application
        # NOTE(review): _get_application returns None for an unknown
        # application, which makes the call below raise AttributeError
        application = self._get_application(
            model,
            application_name=application_name,
        )

        # Return list of actions
        return await application.get_actions()

    finally:
        # Disconnect from model and controller
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1073
async def get_metrics(self, model_name: str, application_name: str) -> dict:
    """Get the metrics collected by the VCA.

    :param model_name The name or unique id of the network service
    :param application_name The name of the application

    :return: Metrics returned by the application, or an empty dict when the
             application is not in the model
    :raises Exception: if model_name or application_name is empty
    """
    if not model_name or not application_name:
        raise Exception("model_name and application_name must be non-empty strings")
    metrics = {}
    model = None
    controller = await self.get_controller()
    try:
        # The model is fetched inside the try block so that the controller
        # connection is released even when the model cannot be retrieved
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)
        if application is not None:
            metrics = await application.get_metrics()
    finally:
        # Both disconnect helpers are coroutines: they must be awaited,
        # otherwise nothing is actually disconnected
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return metrics
1093
async def add_relation(
    self,
    model_name: str,
    endpoint_1: str,
    endpoint_2: str,
):
    """Add relation

    API errors whose message contains "not found" or "already exists" are
    logged as warnings and not raised.

    :param: model_name: Model name
    :param: endpoint_1 First endpoint name
                       ("app:endpoint" format or directly the saas name)
    :param: endpoint_2: Second endpoint name (^ same format)
    """

    self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))

    model = None

    # Get controller
    controller = await self.get_controller()

    try:
        # Get model (inside the try block so the controller connection is
        # released even when the model cannot be retrieved)
        model = await self.get_model(controller, model_name)

        # Add relation
        await model.add_relation(endpoint_1, endpoint_2)
    except juju.errors.JujuAPIError as e:
        if "not found" in e.message:
            self.log.warning("Relation not found: {}".format(e.message))
            return
        if "already exists" in e.message:
            self.log.warning("Relation already exists: {}".format(e.message))
            return
        # another exception, raise it
        raise e
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1131
async def offer(self, endpoint: RelationEndpoint) -> Offer:
    """
    Create an offer from a RelationEndpoint

    The offer is named "<application_name>-<endpoint_name>".

    :param: endpoint: Relation endpoint

    :return: Offer object. When the offer already exists the error is
             deliberately ignored and None is returned.

    :raises: Exception if the offer is not listed after being created
    :raises: juju.errors.JujuError for any other failure reported by juju
    """
    model_name = endpoint.model_name
    offer_name = f"{endpoint.application_name}-{endpoint.endpoint_name}"
    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        await model.create_offer(endpoint.endpoint, offer_name=offer_name)
        offer_list = await self._list_offers(model_name, offer_name=offer_name)
        if not offer_list:
            raise Exception("offer was not created")
        return Offer(offer_list[0].offer_url)
    except juju.errors.JujuError as e:
        if "application offer already exists" not in e.message:
            raise
    finally:
        # The disconnect helpers are coroutines: they must be awaited,
        # otherwise the connections are never closed.
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1159
async def consume(
    self,
    model_name: str,
    offer: Offer,
    provider_libjuju: "Libjuju",
) -> str:
    """
    Consume a remote offer in the model. Relations can be created later using "juju relate".

    The saas name is built as "<offer name>-<offer model name without dashes>",
    with "-<vca_id>" appended when the offer carries a vca_id.

    :param: model_name: Model name
    :param: offer: Offer object to consume
    :param: provider_libjuju: Libjuju object of the provider endpoint

    :raises ParseError if there's a problem parsing the offer_url
    :raises JujuError if remote offer includes an endpoint
    :raises JujuAPIError if the operation is not successful

    :returns: Saas name. It is the application name in the model that references the remote application.
    """
    alias = "{}-{}".format(offer.name, offer.model_name.replace("-", ""))
    if offer.vca_id:
        alias = "{}-{}".format(alias, offer.vca_id)

    controller = await self.get_controller()
    model = provider_controller = None
    try:
        model = await controller.get_model(model_name)
        provider_controller = await provider_libjuju.get_controller()
        await model.consume(
            offer.url,
            application_alias=alias,
            controller=provider_controller,
        )
        return alias
    finally:
        # Release whatever was acquired: consumer model, provider
        # controller and finally the consumer controller.
        if model:
            await self.disconnect_model(model)
        if provider_controller:
            await provider_libjuju.disconnect_controller(provider_controller)
        await self.disconnect_controller(controller)
1198
async def destroy_model(self, model_name: str, total_timeout: float = 1800):
    """
    Destroy model

    Nothing is done when the model does not exist in the controller.

    :param: model_name: Model name
    :param: total_timeout: Timeout (passed to the model destruction step)
    """

    controller = await self.get_controller()
    try:
        if not await self.model_exists(model_name, controller=controller):
            return

        self.log.debug("Destroying model {}".format(model_name))

        model = await self.get_model(controller, model_name)
        try:
            # Destroy machines that are manually provisioned
            # and still are in pending state
            await self._destroy_pending_machines(model, only_manual=True)
        finally:
            # The model connection is closed exactly once, before the model
            # itself is destroyed, whether or not the clean-up succeeded.
            await self.disconnect_model(model)

        await self._destroy_model(
            model_name,
            controller,
            timeout=total_timeout,
        )
    finally:
        await self.disconnect_controller(controller)
1230
async def _destroy_model(
    self, model_name: str, controller: Controller, timeout: float = 1800
):
    """
    Destroy model from controller

    The destroy request (forced, including storage) is re-issued every
    5 seconds for as long as the model still exists.

    :param: model_name: Model name to be removed
    :param: controller: Controller object
    :param: timeout: Timeout in seconds

    :raises: Exception if the model still exists when the timeout expires
    """

    async def _retry_until_gone():
        while True:
            still_there = await self.model_exists(model_name, controller=controller)
            if not still_there:
                return
            await controller.destroy_model(
                model_name, destroy_storage=True, force=True, max_wait=0
            )
            await asyncio.sleep(5)

    try:
        await asyncio.wait_for(_retry_until_gone(), timeout=timeout)
    except asyncio.TimeoutError:
        timeout_message = "Timeout waiting for model {} to be destroyed".format(
            model_name
        )
        raise Exception(timeout_message)
1257
async def destroy_application(
    self, model_name: str, application_name: str, total_timeout: float
):
    """
    Destroy application

    After requesting the destruction, the model is polled every 5 seconds
    until the application is no longer found.

    :param: model_name: Model name
    :param: application_name: Application name
    :param: total_timeout: Timeout in seconds (3600 is used when None)

    :raises: Exception if the application is still present at the timeout
    """

    controller = await self.get_controller()
    model = None

    try:
        model = await self.get_model(controller, model_name)
        self.log.debug(
            "Destroying application {} in model {}".format(
                application_name, model_name
            )
        )
        target = self._get_application(model, application_name)
        if not target:
            self.log.warning("Application not found: {}".format(application_name))
        else:
            await target.destroy()

        self.log.debug(
            "Waiting for application {} to be destroyed in model {}...".format(
                application_name, model_name
            )
        )
        if total_timeout is None:
            total_timeout = 3600
        deadline = time.time() + total_timeout
        while time.time() < deadline:
            still_present = self._get_application(model, application_name)
            if not still_present:
                self.log.debug(
                    "The application {} was destroyed in model {} ".format(
                        application_name, model_name
                    )
                )
                return
            await asyncio.sleep(5)

        timeout_message = (
            "Timeout waiting for application {} to be destroyed in model {}".format(
                application_name, model_name
            )
        )
        raise Exception(timeout_message)
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1311
async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
    """
    Destroy pending machines in a given model

    :param: model: Model whose machines are inspected
    :param: only_manual: Bool that indicates only manually provisioned
                         machines should be destroyed (if True), or that
                         all pending machines should be destroyed
    """
    status = await model.get_status()
    for machine_id in status.machines:
        machine_status = status.machines[machine_id]
        if machine_status.agent_status.status != "pending":
            continue
        if only_manual and not machine_status.instance_id.startswith("manual:"):
            # Skip this machine only. Stopping the loop here would leave
            # the remaining manual pending machines undestroyed.
            continue
        machine = model.machines[machine_id]
        await machine.destroy(force=True)
1328
async def configure_application(
    self, model_name: str, application_name: str, config: dict = None
):
    """Configure application

    Nothing is sent to the controller when config is empty or None.

    :param: model_name: Model name
    :param: application_name: Application name
    :param: config: Config to apply to the charm

    :raises: JujuApplicationNotFound if the application is not in the model
    """
    self.log.debug("Configuring application {}".format(application_name))

    if not config:
        return

    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(
            model,
            application_name=application_name,
        )
        if application is None:
            # Fail with an explicit error instead of an AttributeError on None
            raise JujuApplicationNotFound(
                "Application {} not found in model {}".format(
                    application_name, model_name
                )
            )
        await application.set_config(config)
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1354
def handle_exception(self, loop, context):
    """Handle the exceptions that libjuju leaves unhandled.

    Currently a no-op: both arguments are ignored.
    NOTE(review): the (loop, context) signature looks like an asyncio
    exception handler - confirm where it is registered.
    """
    return None
1358
async def health_check(self, interval: float = 300.0):
    """
    Health check to make sure controller and controller_model connections are OK

    Runs forever: on every iteration a controller connection is obtained and
    released again; failures are logged and the check is retried after the
    interval.

    :param: interval: Time in seconds between checks
    """
    while True:
        # Reset on every iteration so that a failed check never tries to
        # disconnect None or the controller of a previous iteration.
        controller = None
        try:
            controller = await self.get_controller()
        except Exception as e:
            self.log.error("Health check to VCA failed: {}".format(e))
        finally:
            if controller is not None:
                await self.disconnect_controller(controller)
        await asyncio.sleep(interval)
1375
async def list_models(self, contains: str = None) -> [str]:
    """List models with certain names

    :param: contains: String that is contained in model name.
                      When empty or None, every model is returned.

    :return: [models] Returns list of model names
    """

    controller = await self.get_controller()
    try:
        names = await controller.list_models()
        if not contains:
            return names
        return [name for name in names if contains in name]
    finally:
        await self.disconnect_controller(controller)
1392
async def _list_offers(
    self, model_name: str, offer_name: str = None
) -> QueryApplicationOffersResults:
    """
    List offers within a model

    :param: model_name: Model name
    :param: offer_name: Offer name to filter.

    :return: Returns the application offers in the model. When offer_name
             is given, a list holding only the first offer with that name
             (or an empty list if none matches).
    """

    controller = await self.get_controller()
    try:
        listed = (await controller.list_offers(model_name)).results
        if not offer_name:
            return listed
        # Only the first offer with the requested name is kept
        first_match = next(
            (
                candidate
                for candidate in listed
                if candidate.offer_name == offer_name
            ),
            None,
        )
        return [] if first_match is None else [first_match]
    finally:
        await self.disconnect_controller(controller)
1418
async def add_k8s(
    self,
    name: str,
    rbac_id: str,
    token: str,
    client_cert_data: str,
    configuration: Configuration,
    storage_class: str,
    credential_name: str = None,
):
    """
    Add a Kubernetes cloud to the controller

    Similar to the `juju add-k8s` command in the CLI

    :param: name: Name for the K8s cloud
    :param: rbac_id: Value stored in the credential attributes under
                     the RBAC label key
    :param: token: Token used to build the cloud credential
    :param: client_cert_data: Certificate data used for the credential and
                              as CA certificate of the cloud
    :param: configuration: Kubernetes configuration object
    :param: storage_class: Storage Class to use in the cloud
    :param: credential_name: Name for the credential (passed to add_cloud)

    :return: The value returned by add_cloud
    :raises: Exception if storage_class, name or configuration is empty
    """

    # Checked in this order; the first missing value is reported
    required = (
        (storage_class, "storage_class must be a non-empty string"),
        (name, "name must be a non-empty string"),
        (configuration, "configuration must be provided"),
    )
    for value, error_message in required:
        if not value:
            raise Exception(error_message)

    endpoint = configuration.host
    credential = self.get_k8s_cloud_credential(
        configuration,
        client_cert_data,
        token,
    )
    credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id

    # The same storage class backs both operator and workload storage
    storage_config = {
        "operator-storage": storage_class,
        "workload-storage": storage_class,
    }
    cloud = client.Cloud(
        type_="kubernetes",
        auth_types=[credential.auth_type],
        endpoint=endpoint,
        ca_certificates=[client_cert_data],
        config=storage_config,
    )

    return await self.add_cloud(
        name, cloud, credential, credential_name=credential_name
    )
1468
def get_k8s_cloud_credential(
    self,
    configuration: Configuration,
    client_cert_data: str,
    token: str = None,
) -> client.CloudCredential:
    """
    Build the cloud credential for a Kubernetes cloud

    The auth type is derived from what is available: user/password
    (optionally with certificate), or certificate plus token.

    :param: configuration: Kubernetes configuration object
                           (username and password are read from it)
    :param: client_cert_data: Client certificate data
    :param: token: Token. It cannot be combined with user/password.

    :return: CloudCredential with the selected auth type and attributes
    :raises: JujuInvalidK8sConfiguration if the combination is not supported
    """
    # TODO: Test with AKS
    # The client key is not read yet, so the key-based branches below
    # are currently never taken.
    key = None  # open(configuration.key_file, "r").read()
    username = configuration.username
    password = configuration.password

    attrs = {}
    if client_cert_data:
        attrs["ClientCertificateData"] = client_cert_data
    if key:
        attrs["ClientKeyData"] = key
    if token:
        if username or password:
            raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
        attrs["Token"] = token

    auth_type = None
    if key:
        auth_type = "oauth2withcert" if client_cert_data else "oauth2"
        if not token:
            raise JujuInvalidK8sConfiguration(
                "missing token for auth type {}".format(auth_type)
            )
    elif username:
        if not password:
            self.log.debug(
                "credential for user {} has empty password".format(username)
            )
        attrs["username"] = username
        attrs["password"] = password
        auth_type = "userpasswithcert" if client_cert_data else "userpass"
    elif client_cert_data and token:
        auth_type = "certificate"
    else:
        raise JujuInvalidK8sConfiguration("authentication method not supported")

    return client.CloudCredential(auth_type=auth_type, attrs=attrs)
1515
async def add_cloud(
    self,
    name: str,
    cloud: Cloud,
    credential: CloudCredential = None,
    credential_name: str = None,
) -> Cloud:
    """
    Add cloud to the controller

    :param: name: Name of the cloud to be added
    :param: cloud: Cloud object
    :param: credential: CloudCredentials object for the cloud
    :param: credential_name: Credential name.
                             If not defined, the name of the cloud will be used.

    :return: The cloud object received as argument
    """
    controller = await self.get_controller()
    try:
        await controller.add_cloud(name, cloud)
        if credential:
            effective_name = credential_name or name
            await controller.add_credential(
                effective_name, credential=credential, cloud=name
            )
        # The object returned by controller.add_cloud() should be returned
        # here. The original argument is returned until this bug is fixed:
        # https://github.com/juju/python-libjuju/issues/443
        return cloud
    finally:
        await self.disconnect_controller(controller)
1545
async def remove_cloud(self, name: str):
    """
    Remove cloud

    A cloud that is not found is only reported with a warning.

    :param: name: Name of the cloud to be removed

    :raises: juju.errors.JujuError for any other error
    """
    controller = await self.get_controller()
    try:
        await controller.remove_cloud(name)
    except juju.errors.JujuError as e:
        # Only the single "not found" error is tolerated
        if len(e.errors) != 1 or f'cloud "{name}" not found' != e.errors[0]:
            raise e
        self.log.warning(f"Cloud {name} not found, so it could not be deleted.")
    finally:
        await self.disconnect_controller(controller)
1562
@retry(attempts=20, delay=5, fallback=JujuLeaderUnitNotFound())
async def _get_leader_unit(self, application: Application) -> Unit:
    """
    Get the leader unit of an application

    The first unit that reports itself as leader is returned. When there is
    none, an exception is raised so that the retry decorator can act on it
    (configured with 20 attempts, delay 5 and a JujuLeaderUnitNotFound
    fallback).

    :param: application: Application object

    :return: Leader unit
    """
    leader = None
    for candidate in application.units:
        if await candidate.is_leader_from_status():
            leader = candidate
            break
    if leader:
        return leader
    raise Exception()
1573
async def get_cloud_credentials(self, cloud: Cloud) -> typing.List:
    """
    Get cloud credentials

    The credential is looked up by the tag built from the cloud name, the
    user of the VCA connection and the credential name of the cloud.

    :param: cloud: Cloud object. The returned credentials will be from this cloud.

    :return: List of credentials object associated to the specified cloud

    """
    controller = await self.get_controller()
    try:
        cloud_facade = client.CloudFacade.from_connection(controller.connection())
        credential_tag = tag.credential(
            cloud.name, self.vca_connection.data.user, cloud.credential_name
        )
        response = await cloud_facade.Credential([client.Entity(credential_tag)])
        return response.results
    finally:
        await self.disconnect_controller(controller)
1593
async def check_application_exists(self, model_name, application_name) -> bool:
    """Check application exists

    :param: model_name: Model Name
    :param: application_name: Application Name

    :return: Boolean. True if the application is found in the model.
    """

    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        self.log.debug(
            "Checking if application {} exists in model {}".format(
                application_name, model_name
            )
        )
        found = self._get_application(model, application_name)
        return found is not None
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)