Fix 1462
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17
18 import time
19
20 from juju.errors import JujuAPIError
21 from juju.model import Model
22 from juju.machine import Machine
23 from juju.application import Application
24 from juju.unit import Unit
25 from juju.client._definitions import (
26 FullStatus,
27 QueryApplicationOffersResults,
28 Cloud,
29 CloudCredential,
30 )
31 from juju.controller import Controller
32 from juju.client import client
33 from juju import tag
34
35 from n2vc.config import ModelConfig
36 from n2vc.juju_watcher import JujuModelWatcher
37 from n2vc.provisioner import AsyncSSHProvisioner
38 from n2vc.n2vc_conn import N2VCConnector
39 from n2vc.exceptions import (
40 JujuMachineNotFound,
41 JujuApplicationNotFound,
42 JujuLeaderUnitNotFound,
43 JujuActionNotFound,
44 JujuControllerFailedConnecting,
45 JujuApplicationExists,
46 JujuInvalidK8sConfiguration,
47 JujuError
48 )
49 from n2vc.utils import DB_DATA
50 from osm_common.dbbase import DbException
51 from kubernetes.client.configuration import Configuration
52
53 RBAC_LABEL_KEY_NAME = "rbac-id"
54
55
56 class Libjuju:
57 def __init__(
58 self,
59 endpoint: str,
60 api_proxy: str,
61 username: str,
62 password: str,
63 cacert: str,
64 loop: asyncio.AbstractEventLoop = None,
65 log: logging.Logger = None,
66 db: dict = None,
67 n2vc: N2VCConnector = None,
68 model_config: ModelConfig = {},
69 ):
70 """
71 Constructor
72
73 :param: endpoint: Endpoint of the juju controller (host:port)
74 :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
75 :param: username: Juju username
76 :param: password: Juju password
77 :param: cacert: Juju CA Certificate
78 :param: loop: Asyncio loop
79 :param: log: Logger
80 :param: db: DB object
81 :param: n2vc: N2VC object
82 :param: apt_mirror: APT Mirror
83 :param: enable_os_upgrade: Enable OS Upgrade
84 """
85
86 self.log = log or logging.getLogger("Libjuju")
87 self.db = db
88 db_endpoints = self._get_api_endpoints_db()
89 self.endpoints = None
90 if (db_endpoints and endpoint not in db_endpoints) or not db_endpoints:
91 self.endpoints = [endpoint]
92 self._update_api_endpoints_db(self.endpoints)
93 else:
94 self.endpoints = db_endpoints
95 self.api_proxy = api_proxy
96 self.username = username
97 self.password = password
98 self.cacert = cacert
99 self.loop = loop or asyncio.get_event_loop()
100 self.n2vc = n2vc
101
102 # Generate config for models
103 self.model_config = model_config
104
105 self.loop.set_exception_handler(self.handle_exception)
106 self.creating_model = asyncio.Lock(loop=self.loop)
107
108 self.log.debug("Libjuju initialized!")
109
110 self.health_check_task = self._create_health_check_task()
111
112 def _create_health_check_task(self):
113 return self.loop.create_task(self.health_check())
114
115 async def get_controller(self, timeout: float = 15.0) -> Controller:
116 """
117 Get controller
118
119 :param: timeout: Time in seconds to wait for controller to connect
120 """
121 controller = None
122 try:
123 controller = Controller(loop=self.loop)
124 await asyncio.wait_for(
125 controller.connect(
126 endpoint=self.endpoints,
127 username=self.username,
128 password=self.password,
129 cacert=self.cacert,
130 ),
131 timeout=timeout,
132 )
133 endpoints = await controller.api_endpoints
134 if self.endpoints != endpoints:
135 self.endpoints = endpoints
136 self._update_api_endpoints_db(self.endpoints)
137 return controller
138 except asyncio.CancelledError as e:
139 raise e
140 except Exception as e:
141 self.log.error(
142 "Failed connecting to controller: {}...".format(self.endpoints)
143 )
144 if controller:
145 await self.disconnect_controller(controller)
146 raise JujuControllerFailedConnecting(e)
147
148 async def disconnect(self):
149 """Disconnect"""
150 # Cancel health check task
151 self.health_check_task.cancel()
152 self.log.debug("Libjuju disconnected!")
153
154 async def disconnect_model(self, model: Model):
155 """
156 Disconnect model
157
158 :param: model: Model that will be disconnected
159 """
160 await model.disconnect()
161
162 async def disconnect_controller(self, controller: Controller):
163 """
164 Disconnect controller
165
166 :param: controller: Controller that will be disconnected
167 """
168 if controller:
169 await controller.disconnect()
170
171 async def add_model(self, model_name: str, cloud_name: str, credential_name=None):
172 """
173 Create model
174
175 :param: model_name: Model name
176 :param: cloud_name: Cloud name
177 :param: credential_name: Credential name to use for adding the model
178 If not specified, same name as the cloud will be used.
179 """
180
181 # Get controller
182 controller = await self.get_controller()
183 model = None
184 try:
185 # Block until other workers have finished model creation
186 while self.creating_model.locked():
187 await asyncio.sleep(0.1)
188
189 # Create the model
190 async with self.creating_model:
191 if await self.model_exists(model_name, controller=controller):
192 return
193 self.log.debug("Creating model {}".format(model_name))
194 model = await controller.add_model(
195 model_name,
196 config=self.model_config,
197 cloud_name=cloud_name,
198 credential_name=credential_name or cloud_name,
199 )
200 finally:
201 if model:
202 await self.disconnect_model(model)
203 await self.disconnect_controller(controller)
204
205 async def get_executed_actions(self, model_name: str) -> list:
206 """
207 Get executed/history of actions for a model.
208
209 :param: model_name: Model name, str.
210 :return: List of executed actions for a model.
211 """
212 model = None
213 executed_actions = []
214 controller = await self.get_controller()
215 try:
216 model = await self.get_model(controller, model_name)
217 # Get all unique action names
218 actions = {}
219 for application in model.applications:
220 application_actions = await self.get_actions(application, model_name)
221 actions.update(application_actions)
222 # Get status of all actions
223 for application_action in actions:
224 app_action_status_list = await model.get_action_status(name=application_action)
225 for action_id, action_status in app_action_status_list.items():
226 executed_action = {"id": action_id, "action": application_action,
227 "status": action_status}
228 # Get action output by id
229 action_status = await model.get_action_output(executed_action["id"])
230 for k, v in action_status.items():
231 executed_action[k] = v
232 executed_actions.append(executed_action)
233 except Exception as e:
234 raise JujuError("Error in getting executed actions for model: {}. Error: {}"
235 .format(model_name, str(e)))
236 finally:
237 if model:
238 await self.disconnect_model(model)
239 await self.disconnect_controller(controller)
240 return executed_actions
241
242 async def get_application_configs(self, model_name: str, application_name: str) -> dict:
243 """
244 Get available configs for an application.
245
246 :param: model_name: Model name, str.
247 :param: application_name: Application name, str.
248
249 :return: A dict which has key - action name, value - action description
250 """
251 model = None
252 application_configs = {}
253 controller = await self.get_controller()
254 try:
255 model = await self.get_model(controller, model_name)
256 application = self._get_application(model, application_name=application_name)
257 application_configs = await application.get_config()
258 except Exception as e:
259 raise JujuError("Error in getting configs for application: {} in model: {}. Error: {}"
260 .format(application_name, model_name, str(e)))
261 finally:
262 if model:
263 await self.disconnect_model(model)
264 await self.disconnect_controller(controller)
265 return application_configs
266
267 async def get_model(
268 self, controller: Controller, model_name: str, id=None
269 ) -> Model:
270 """
271 Get model from controller
272
273 :param: controller: Controller
274 :param: model_name: Model name
275
276 :return: Model: The created Juju model object
277 """
278 return await controller.get_model(model_name)
279
280 async def model_exists(
281 self, model_name: str, controller: Controller = None
282 ) -> bool:
283 """
284 Check if model exists
285
286 :param: controller: Controller
287 :param: model_name: Model name
288
289 :return bool
290 """
291 need_to_disconnect = False
292
293 # Get controller if not passed
294 if not controller:
295 controller = await self.get_controller()
296 need_to_disconnect = True
297
298 # Check if model exists
299 try:
300 return model_name in await controller.list_models()
301 finally:
302 if need_to_disconnect:
303 await self.disconnect_controller(controller)
304
305 async def models_exist(self, model_names: [str]) -> (bool, list):
306 """
307 Check if models exists
308
309 :param: model_names: List of strings with model names
310
311 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
312 """
313 if not model_names:
314 raise Exception(
315 "model_names must be a non-empty array. Given value: {}".format(
316 model_names
317 )
318 )
319 non_existing_models = []
320 models = await self.list_models()
321 existing_models = list(set(models).intersection(model_names))
322 non_existing_models = list(set(model_names) - set(existing_models))
323
324 return (
325 len(non_existing_models) == 0,
326 non_existing_models,
327 )
328
329 async def get_model_status(self, model_name: str) -> FullStatus:
330 """
331 Get model status
332
333 :param: model_name: Model name
334
335 :return: Full status object
336 """
337 controller = await self.get_controller()
338 model = await self.get_model(controller, model_name)
339 try:
340 return await model.get_status()
341 finally:
342 await self.disconnect_model(model)
343 await self.disconnect_controller(controller)
344
345 async def create_machine(
346 self,
347 model_name: str,
348 machine_id: str = None,
349 db_dict: dict = None,
350 progress_timeout: float = None,
351 total_timeout: float = None,
352 series: str = "xenial",
353 wait: bool = True,
354 ) -> (Machine, bool):
355 """
356 Create machine
357
358 :param: model_name: Model name
359 :param: machine_id: Machine id
360 :param: db_dict: Dictionary with data of the DB to write the updates
361 :param: progress_timeout: Maximum time between two updates in the model
362 :param: total_timeout: Timeout for the entity to be active
363 :param: series: Series of the machine (xenial, bionic, focal, ...)
364 :param: wait: Wait until machine is ready
365
366 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
367 if the machine is new or it already existed
368 """
369 new = False
370 machine = None
371
372 self.log.debug(
373 "Creating machine (id={}) in model: {}".format(machine_id, model_name)
374 )
375
376 # Get controller
377 controller = await self.get_controller()
378
379 # Get model
380 model = await self.get_model(controller, model_name)
381 try:
382 if machine_id is not None:
383 self.log.debug(
384 "Searching machine (id={}) in model {}".format(
385 machine_id, model_name
386 )
387 )
388
389 # Get machines from model and get the machine with machine_id if exists
390 machines = await model.get_machines()
391 if machine_id in machines:
392 self.log.debug(
393 "Machine (id={}) found in model {}".format(
394 machine_id, model_name
395 )
396 )
397 machine = machines[machine_id]
398 else:
399 raise JujuMachineNotFound("Machine {} not found".format(machine_id))
400
401 if machine is None:
402 self.log.debug("Creating a new machine in model {}".format(model_name))
403
404 # Create machine
405 machine = await model.add_machine(
406 spec=None, constraints=None, disks=None, series=series
407 )
408 new = True
409
410 # Wait until the machine is ready
411 self.log.debug(
412 "Wait until machine {} is ready in model {}".format(
413 machine.entity_id, model_name
414 )
415 )
416 if wait:
417 await JujuModelWatcher.wait_for(
418 model=model,
419 entity=machine,
420 progress_timeout=progress_timeout,
421 total_timeout=total_timeout,
422 db_dict=db_dict,
423 n2vc=self.n2vc,
424 )
425 finally:
426 await self.disconnect_model(model)
427 await self.disconnect_controller(controller)
428
429 self.log.debug(
430 "Machine {} ready at {} in model {}".format(
431 machine.entity_id, machine.dns_name, model_name
432 )
433 )
434 return machine, new
435
436 async def provision_machine(
437 self,
438 model_name: str,
439 hostname: str,
440 username: str,
441 private_key_path: str,
442 db_dict: dict = None,
443 progress_timeout: float = None,
444 total_timeout: float = None,
445 ) -> str:
446 """
447 Manually provisioning of a machine
448
449 :param: model_name: Model name
450 :param: hostname: IP to access the machine
451 :param: username: Username to login to the machine
452 :param: private_key_path: Local path for the private key
453 :param: db_dict: Dictionary with data of the DB to write the updates
454 :param: progress_timeout: Maximum time between two updates in the model
455 :param: total_timeout: Timeout for the entity to be active
456
457 :return: (Entity): Machine id
458 """
459 self.log.debug(
460 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
461 model_name, hostname, username
462 )
463 )
464
465 # Get controller
466 controller = await self.get_controller()
467
468 # Get model
469 model = await self.get_model(controller, model_name)
470
471 try:
472 # Get provisioner
473 provisioner = AsyncSSHProvisioner(
474 host=hostname,
475 user=username,
476 private_key_path=private_key_path,
477 log=self.log,
478 )
479
480 # Provision machine
481 params = await provisioner.provision_machine()
482
483 params.jobs = ["JobHostUnits"]
484
485 self.log.debug("Adding machine to model")
486 connection = model.connection()
487 client_facade = client.ClientFacade.from_connection(connection)
488
489 results = await client_facade.AddMachines(params=[params])
490 error = results.machines[0].error
491
492 if error:
493 msg = "Error adding machine: {}".format(error.message)
494 self.log.error(msg=msg)
495 raise ValueError(msg)
496
497 machine_id = results.machines[0].machine
498
499 self.log.debug("Installing Juju agent into machine {}".format(machine_id))
500 asyncio.ensure_future(
501 provisioner.install_agent(
502 connection=connection,
503 nonce=params.nonce,
504 machine_id=machine_id,
505 proxy=self.api_proxy,
506 series=params.series,
507 )
508 )
509
510 machine = None
511 for _ in range(10):
512 machine_list = await model.get_machines()
513 if machine_id in machine_list:
514 self.log.debug("Machine {} found in model!".format(machine_id))
515 machine = model.machines.get(machine_id)
516 break
517 await asyncio.sleep(2)
518
519 if machine is None:
520 msg = "Machine {} not found in model".format(machine_id)
521 self.log.error(msg=msg)
522 raise JujuMachineNotFound(msg)
523
524 self.log.debug(
525 "Wait until machine {} is ready in model {}".format(
526 machine.entity_id, model_name
527 )
528 )
529 await JujuModelWatcher.wait_for(
530 model=model,
531 entity=machine,
532 progress_timeout=progress_timeout,
533 total_timeout=total_timeout,
534 db_dict=db_dict,
535 n2vc=self.n2vc,
536 )
537 except Exception as e:
538 raise e
539 finally:
540 await self.disconnect_model(model)
541 await self.disconnect_controller(controller)
542
543 self.log.debug(
544 "Machine provisioned {} in model {}".format(machine_id, model_name)
545 )
546
547 return machine_id
548
549 async def deploy(
550 self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
551 ):
552 """
553 Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
554
555 :param: uri: Path or Charm Store uri in which the charm or bundle can be found
556 :param: model_name: Model name
557 :param: wait: Indicates whether to wait or not until all applications are active
558 :param: timeout: Time in seconds to wait until all applications are active
559 """
560 controller = await self.get_controller()
561 model = await self.get_model(controller, model_name)
562 try:
563 await model.deploy(uri)
564 if wait:
565 await JujuModelWatcher.wait_for_model(model, timeout=timeout)
566 self.log.debug("All units active in model {}".format(model_name))
567 finally:
568 await self.disconnect_model(model)
569 await self.disconnect_controller(controller)
570
571 async def deploy_charm(
572 self,
573 application_name: str,
574 path: str,
575 model_name: str,
576 machine_id: str,
577 db_dict: dict = None,
578 progress_timeout: float = None,
579 total_timeout: float = None,
580 config: dict = None,
581 series: str = None,
582 num_units: int = 1,
583 ):
584 """Deploy charm
585
586 :param: application_name: Application name
587 :param: path: Local path to the charm
588 :param: model_name: Model name
589 :param: machine_id ID of the machine
590 :param: db_dict: Dictionary with data of the DB to write the updates
591 :param: progress_timeout: Maximum time between two updates in the model
592 :param: total_timeout: Timeout for the entity to be active
593 :param: config: Config for the charm
594 :param: series: Series of the charm
595 :param: num_units: Number of units
596
597 :return: (juju.application.Application): Juju application
598 """
599 self.log.debug(
600 "Deploying charm {} to machine {} in model ~{}".format(
601 application_name, machine_id, model_name
602 )
603 )
604 self.log.debug("charm: {}".format(path))
605
606 # Get controller
607 controller = await self.get_controller()
608
609 # Get model
610 model = await self.get_model(controller, model_name)
611
612 try:
613 application = None
614 if application_name not in model.applications:
615
616 if machine_id is not None:
617 if machine_id not in model.machines:
618 msg = "Machine {} not found in model".format(machine_id)
619 self.log.error(msg=msg)
620 raise JujuMachineNotFound(msg)
621 machine = model.machines[machine_id]
622 series = machine.series
623
624 application = await model.deploy(
625 entity_url=path,
626 application_name=application_name,
627 channel="stable",
628 num_units=1,
629 series=series,
630 to=machine_id,
631 config=config,
632 )
633
634 self.log.debug(
635 "Wait until application {} is ready in model {}".format(
636 application_name, model_name
637 )
638 )
639 if num_units > 1:
640 for _ in range(num_units - 1):
641 m, _ = await self.create_machine(model_name, wait=False)
642 await application.add_unit(to=m.entity_id)
643
644 await JujuModelWatcher.wait_for(
645 model=model,
646 entity=application,
647 progress_timeout=progress_timeout,
648 total_timeout=total_timeout,
649 db_dict=db_dict,
650 n2vc=self.n2vc,
651 )
652 self.log.debug(
653 "Application {} is ready in model {}".format(
654 application_name, model_name
655 )
656 )
657 else:
658 raise JujuApplicationExists(
659 "Application {} exists".format(application_name)
660 )
661 finally:
662 await self.disconnect_model(model)
663 await self.disconnect_controller(controller)
664
665 return application
666
667 def _get_application(self, model: Model, application_name: str) -> Application:
668 """Get application
669
670 :param: model: Model object
671 :param: application_name: Application name
672
673 :return: juju.application.Application (or None if it doesn't exist)
674 """
675 if model.applications and application_name in model.applications:
676 return model.applications[application_name]
677
678 async def execute_action(
679 self,
680 application_name: str,
681 model_name: str,
682 action_name: str,
683 db_dict: dict = None,
684 progress_timeout: float = None,
685 total_timeout: float = None,
686 **kwargs
687 ):
688 """Execute action
689
690 :param: application_name: Application name
691 :param: model_name: Model name
692 :param: action_name: Name of the action
693 :param: db_dict: Dictionary with data of the DB to write the updates
694 :param: progress_timeout: Maximum time between two updates in the model
695 :param: total_timeout: Timeout for the entity to be active
696
697 :return: (str, str): (output and status)
698 """
699 self.log.debug(
700 "Executing action {} using params {}".format(action_name, kwargs)
701 )
702 # Get controller
703 controller = await self.get_controller()
704
705 # Get model
706 model = await self.get_model(controller, model_name)
707
708 try:
709 # Get application
710 application = self._get_application(
711 model,
712 application_name=application_name,
713 )
714 if application is None:
715 raise JujuApplicationNotFound("Cannot execute action")
716
717 # Get leader unit
718 # Racing condition:
719 # Ocassionally, self._get_leader_unit() will return None
720 # because the leader elected hook has not been triggered yet.
721 # Therefore, we are doing some retries. If it happens again,
722 # re-open bug 1236
723 attempts = 3
724 time_between_retries = 10
725 unit = None
726 for _ in range(attempts):
727 unit = await self._get_leader_unit(application)
728 if unit is None:
729 await asyncio.sleep(time_between_retries)
730 else:
731 break
732 if unit is None:
733 raise JujuLeaderUnitNotFound(
734 "Cannot execute action: leader unit not found"
735 )
736
737 actions = await application.get_actions()
738
739 if action_name not in actions:
740 raise JujuActionNotFound(
741 "Action {} not in available actions".format(action_name)
742 )
743
744 action = await unit.run_action(action_name, **kwargs)
745
746 self.log.debug(
747 "Wait until action {} is completed in application {} (model={})".format(
748 action_name, application_name, model_name
749 )
750 )
751 await JujuModelWatcher.wait_for(
752 model=model,
753 entity=action,
754 progress_timeout=progress_timeout,
755 total_timeout=total_timeout,
756 db_dict=db_dict,
757 n2vc=self.n2vc,
758 )
759
760 output = await model.get_action_output(action_uuid=action.entity_id)
761 status = await model.get_action_status(uuid_or_prefix=action.entity_id)
762 status = (
763 status[action.entity_id] if action.entity_id in status else "failed"
764 )
765
766 self.log.debug(
767 "Action {} completed with status {} in application {} (model={})".format(
768 action_name, action.status, application_name, model_name
769 )
770 )
771 finally:
772 await self.disconnect_model(model)
773 await self.disconnect_controller(controller)
774
775 return output, status
776
777 async def get_actions(self, application_name: str, model_name: str) -> dict:
778 """Get list of actions
779
780 :param: application_name: Application name
781 :param: model_name: Model name
782
783 :return: Dict with this format
784 {
785 "action_name": "Description of the action",
786 ...
787 }
788 """
789 self.log.debug(
790 "Getting list of actions for application {}".format(application_name)
791 )
792
793 # Get controller
794 controller = await self.get_controller()
795
796 # Get model
797 model = await self.get_model(controller, model_name)
798
799 try:
800 # Get application
801 application = self._get_application(
802 model,
803 application_name=application_name,
804 )
805
806 # Return list of actions
807 return await application.get_actions()
808
809 finally:
810 # Disconnect from model and controller
811 await self.disconnect_model(model)
812 await self.disconnect_controller(controller)
813
814 async def get_metrics(self, model_name: str, application_name: str) -> dict:
815 """Get the metrics collected by the VCA.
816
817 :param model_name The name or unique id of the network service
818 :param application_name The name of the application
819 """
820 if not model_name or not application_name:
821 raise Exception("model_name and application_name must be non-empty strings")
822 metrics = {}
823 controller = await self.get_controller()
824 model = await self.get_model(controller, model_name)
825 try:
826 application = self._get_application(model, application_name)
827 if application is not None:
828 metrics = await application.get_metrics()
829 finally:
830 self.disconnect_model(model)
831 self.disconnect_controller(controller)
832 return metrics
833
834 async def add_relation(
835 self,
836 model_name: str,
837 endpoint_1: str,
838 endpoint_2: str,
839 ):
840 """Add relation
841
842 :param: model_name: Model name
843 :param: endpoint_1 First endpoint name
844 ("app:endpoint" format or directly the saas name)
845 :param: endpoint_2: Second endpoint name (^ same format)
846 """
847
848 self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))
849
850 # Get controller
851 controller = await self.get_controller()
852
853 # Get model
854 model = await self.get_model(controller, model_name)
855
856 # Add relation
857 try:
858 await model.add_relation(endpoint_1, endpoint_2)
859 except JujuAPIError as e:
860 if "not found" in e.message:
861 self.log.warning("Relation not found: {}".format(e.message))
862 return
863 if "already exists" in e.message:
864 self.log.warning("Relation already exists: {}".format(e.message))
865 return
866 # another exception, raise it
867 raise e
868 finally:
869 await self.disconnect_model(model)
870 await self.disconnect_controller(controller)
871
872 async def consume(
873 self,
874 offer_url: str,
875 model_name: str,
876 ):
877 """
878 Adds a remote offer to the model. Relations can be created later using "juju relate".
879
880 :param: offer_url: Offer Url
881 :param: model_name: Model name
882
883 :raises ParseError if there's a problem parsing the offer_url
884 :raises JujuError if remote offer includes and endpoint
885 :raises JujuAPIError if the operation is not successful
886 """
887 controller = await self.get_controller()
888 model = await controller.get_model(model_name)
889
890 try:
891 await model.consume(offer_url)
892 finally:
893 await self.disconnect_model(model)
894 await self.disconnect_controller(controller)
895
896 async def destroy_model(self, model_name: str, total_timeout: float):
897 """
898 Destroy model
899
900 :param: model_name: Model name
901 :param: total_timeout: Timeout
902 """
903
904 controller = await self.get_controller()
905 model = None
906 try:
907 if not await self.model_exists(model_name, controller=controller):
908 return
909
910 model = await self.get_model(controller, model_name)
911 self.log.debug("Destroying model {}".format(model_name))
912 uuid = model.info.uuid
913
914 # Destroy machines that are manually provisioned
915 # and still are in pending state
916 await self._destroy_pending_machines(model, only_manual=True)
917
918 # Disconnect model
919 await self.disconnect_model(model)
920
921 await controller.destroy_model(uuid, force=True, max_wait=0)
922
923 # Wait until model is destroyed
924 self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
925
926 if total_timeout is None:
927 total_timeout = 3600
928 end = time.time() + total_timeout
929 while time.time() < end:
930 models = await controller.list_models()
931 if model_name not in models:
932 self.log.debug(
933 "The model {} ({}) was destroyed".format(model_name, uuid)
934 )
935 return
936 await asyncio.sleep(5)
937 raise Exception(
938 "Timeout waiting for model {} to be destroyed".format(model_name)
939 )
940 except Exception as e:
941 if model:
942 await self.disconnect_model(model)
943 raise e
944 finally:
945 await self.disconnect_controller(controller)
946
947 async def destroy_application(
948 self, model_name: str, application_name: str, total_timeout: float
949 ):
950 """
951 Destroy application
952
953 :param: model_name: Model name
954 :param: application_name: Application name
955 :param: total_timeout: Timeout
956 """
957
958 controller = await self.get_controller()
959 model = None
960
961 try:
962 model = await self.get_model(controller, model_name)
963 self.log.debug(
964 "Destroying application {} in model {}".format(
965 application_name, model_name
966 )
967 )
968 application = self._get_application(model, application_name)
969 if application:
970 await application.destroy()
971 else:
972 self.log.warning("Application not found: {}".format(application_name))
973
974 self.log.debug(
975 "Waiting for application {} to be destroyed in model {}...".format(
976 application_name, model_name
977 )
978 )
979 if total_timeout is None:
980 total_timeout = 3600
981 end = time.time() + total_timeout
982 while time.time() < end:
983 if not self._get_application(model, application_name):
984 self.log.debug(
985 "The application {} was destroyed in model {} ".format(
986 application_name, model_name
987 )
988 )
989 return
990 await asyncio.sleep(5)
991 raise Exception(
992 "Timeout waiting for application {} to be destroyed in model {}".format(
993 application_name, model_name
994 )
995 )
996 finally:
997 if model is not None:
998 await self.disconnect_model(model)
999 await self.disconnect_controller(controller)
1000
1001 async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
1002 """
1003 Destroy pending machines in a given model
1004
1005 :param: only_manual: Bool that indicates only manually provisioned
1006 machines should be destroyed (if True), or that
1007 all pending machines should be destroyed
1008 """
1009 status = await model.get_status()
1010 for machine_id in status.machines:
1011 machine_status = status.machines[machine_id]
1012 if machine_status.agent_status.status == "pending":
1013 if only_manual and not machine_status.instance_id.startswith("manual:"):
1014 break
1015 machine = model.machines[machine_id]
1016 await machine.destroy(force=True)
1017
1018 async def configure_application(
1019 self, model_name: str, application_name: str, config: dict = None
1020 ):
1021 """Configure application
1022
1023 :param: model_name: Model name
1024 :param: application_name: Application name
1025 :param: config: Config to apply to the charm
1026 """
1027 self.log.debug("Configuring application {}".format(application_name))
1028
1029 if config:
1030 controller = await self.get_controller()
1031 model = None
1032 try:
1033 model = await self.get_model(controller, model_name)
1034 application = self._get_application(
1035 model,
1036 application_name=application_name,
1037 )
1038 await application.set_config(config)
1039 finally:
1040 if model:
1041 await self.disconnect_model(model)
1042 await self.disconnect_controller(controller)
1043
1044 def _get_api_endpoints_db(self) -> [str]:
1045 """
1046 Get API Endpoints from DB
1047
1048 :return: List of API endpoints
1049 """
1050 self.log.debug("Getting endpoints from database")
1051
1052 juju_info = self.db.get_one(
1053 DB_DATA.api_endpoints.table,
1054 q_filter=DB_DATA.api_endpoints.filter,
1055 fail_on_empty=False,
1056 )
1057 if juju_info and DB_DATA.api_endpoints.key in juju_info:
1058 return juju_info[DB_DATA.api_endpoints.key]
1059
1060 def _update_api_endpoints_db(self, endpoints: [str]):
1061 """
1062 Update API endpoints in Database
1063
1064 :param: List of endpoints
1065 """
1066 self.log.debug("Saving endpoints {} in database".format(endpoints))
1067
1068 juju_info = self.db.get_one(
1069 DB_DATA.api_endpoints.table,
1070 q_filter=DB_DATA.api_endpoints.filter,
1071 fail_on_empty=False,
1072 )
1073 # If it doesn't, then create it
1074 if not juju_info:
1075 try:
1076 self.db.create(
1077 DB_DATA.api_endpoints.table,
1078 DB_DATA.api_endpoints.filter,
1079 )
1080 except DbException as e:
1081 # Racing condition: check if another N2VC worker has created it
1082 juju_info = self.db.get_one(
1083 DB_DATA.api_endpoints.table,
1084 q_filter=DB_DATA.api_endpoints.filter,
1085 fail_on_empty=False,
1086 )
1087 if not juju_info:
1088 raise e
1089 self.db.set_one(
1090 DB_DATA.api_endpoints.table,
1091 DB_DATA.api_endpoints.filter,
1092 {DB_DATA.api_endpoints.key: endpoints},
1093 )
1094
1095 def handle_exception(self, loop, context):
1096 # All unhandled exceptions by libjuju are handled here.
1097 pass
1098
1099 async def health_check(self, interval: float = 300.0):
1100 """
1101 Health check to make sure controller and controller_model connections are OK
1102
1103 :param: interval: Time in seconds between checks
1104 """
1105 controller = None
1106 while True:
1107 try:
1108 controller = await self.get_controller()
1109 # self.log.debug("VCA is alive")
1110 except Exception as e:
1111 self.log.error("Health check to VCA failed: {}".format(e))
1112 finally:
1113 await self.disconnect_controller(controller)
1114 await asyncio.sleep(interval)
1115
1116 async def list_models(self, contains: str = None) -> [str]:
1117 """List models with certain names
1118
1119 :param: contains: String that is contained in model name
1120
1121 :retur: [models] Returns list of model names
1122 """
1123
1124 controller = await self.get_controller()
1125 try:
1126 models = await controller.list_models()
1127 if contains:
1128 models = [model for model in models if contains in model]
1129 return models
1130 finally:
1131 await self.disconnect_controller(controller)
1132
1133 async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
1134 """List models with certain names
1135
1136 :param: model_name: Model name
1137
1138 :return: Returns list of offers
1139 """
1140
1141 controller = await self.get_controller()
1142 try:
1143 return await controller.list_offers(model_name)
1144 finally:
1145 await self.disconnect_controller(controller)
1146
1147 async def add_k8s(
1148 self,
1149 name: str,
1150 rbac_id: str,
1151 token: str,
1152 client_cert_data: str,
1153 configuration: Configuration,
1154 storage_class: str,
1155 credential_name: str = None,
1156 ):
1157 """
1158 Add a Kubernetes cloud to the controller
1159
1160 Similar to the `juju add-k8s` command in the CLI
1161
1162 :param: name: Name for the K8s cloud
1163 :param: configuration: Kubernetes configuration object
1164 :param: storage_class: Storage Class to use in the cloud
1165 :param: credential_name: Storage Class to use in the cloud
1166 """
1167
1168 if not storage_class:
1169 raise Exception("storage_class must be a non-empty string")
1170 if not name:
1171 raise Exception("name must be a non-empty string")
1172 if not configuration:
1173 raise Exception("configuration must be provided")
1174
1175 endpoint = configuration.host
1176 credential = self.get_k8s_cloud_credential(
1177 configuration,
1178 client_cert_data,
1179 token,
1180 )
1181 credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id
1182 cloud = client.Cloud(
1183 type_="kubernetes",
1184 auth_types=[credential.auth_type],
1185 endpoint=endpoint,
1186 ca_certificates=[client_cert_data],
1187 config={
1188 "operator-storage": storage_class,
1189 "workload-storage": storage_class,
1190 },
1191 )
1192
1193 return await self.add_cloud(
1194 name, cloud, credential, credential_name=credential_name
1195 )
1196
1197 def get_k8s_cloud_credential(
1198 self,
1199 configuration: Configuration,
1200 client_cert_data: str,
1201 token: str = None,
1202 ) -> client.CloudCredential:
1203 attrs = {}
1204 # TODO: Test with AKS
1205 key = None # open(configuration.key_file, "r").read()
1206 username = configuration.username
1207 password = configuration.password
1208
1209 if client_cert_data:
1210 attrs["ClientCertificateData"] = client_cert_data
1211 if key:
1212 attrs["ClientKeyData"] = key
1213 if token:
1214 if username or password:
1215 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1216 attrs["Token"] = token
1217
1218 auth_type = None
1219 if key:
1220 auth_type = "oauth2"
1221 if client_cert_data:
1222 auth_type = "oauth2withcert"
1223 if not token:
1224 raise JujuInvalidK8sConfiguration(
1225 "missing token for auth type {}".format(auth_type)
1226 )
1227 elif username:
1228 if not password:
1229 self.log.debug(
1230 "credential for user {} has empty password".format(username)
1231 )
1232 attrs["username"] = username
1233 attrs["password"] = password
1234 if client_cert_data:
1235 auth_type = "userpasswithcert"
1236 else:
1237 auth_type = "userpass"
1238 elif client_cert_data and token:
1239 auth_type = "certificate"
1240 else:
1241 raise JujuInvalidK8sConfiguration("authentication method not supported")
1242 return client.CloudCredential(auth_type=auth_type, attrs=attrs)
1243
1244 async def add_cloud(
1245 self,
1246 name: str,
1247 cloud: Cloud,
1248 credential: CloudCredential = None,
1249 credential_name: str = None,
1250 ) -> Cloud:
1251 """
1252 Add cloud to the controller
1253
1254 :param: name: Name of the cloud to be added
1255 :param: cloud: Cloud object
1256 :param: credential: CloudCredentials object for the cloud
1257 :param: credential_name: Credential name.
1258 If not defined, cloud of the name will be used.
1259 """
1260 controller = await self.get_controller()
1261 try:
1262 _ = await controller.add_cloud(name, cloud)
1263 if credential:
1264 await controller.add_credential(
1265 credential_name or name, credential=credential, cloud=name
1266 )
1267 # Need to return the object returned by the controller.add_cloud() function
1268 # I'm returning the original value now until this bug is fixed:
1269 # https://github.com/juju/python-libjuju/issues/443
1270 return cloud
1271 finally:
1272 await self.disconnect_controller(controller)
1273
1274 async def remove_cloud(self, name: str):
1275 """
1276 Remove cloud
1277
1278 :param: name: Name of the cloud to be removed
1279 """
1280 controller = await self.get_controller()
1281 try:
1282 await controller.remove_cloud(name)
1283 finally:
1284 await self.disconnect_controller(controller)
1285
1286 async def _get_leader_unit(self, application: Application) -> Unit:
1287 unit = None
1288 for u in application.units:
1289 if await u.is_leader_from_status():
1290 unit = u
1291 break
1292 return unit
1293
1294 async def get_cloud_credentials(self, cloud_name: str, credential_name: str):
1295 controller = await self.get_controller()
1296 try:
1297 facade = client.CloudFacade.from_connection(controller.connection())
1298 cloud_cred_tag = tag.credential(cloud_name, self.username, credential_name)
1299 params = [client.Entity(cloud_cred_tag)]
1300 return (await facade.Credential(params)).results
1301 finally:
1302 await self.disconnect_controller(controller)