Minor improvements to libjuju.py
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17
18 import time
19
20 from juju.errors import JujuAPIError
21 from juju.model import Model
22 from juju.machine import Machine
23 from juju.application import Application
24 from juju.unit import Unit
25 from juju.client._definitions import (
26 FullStatus,
27 QueryApplicationOffersResults,
28 Cloud,
29 CloudCredential,
30 )
31 from juju.controller import Controller
32 from juju.client import client
33 from juju import tag
34
35 from n2vc.juju_watcher import JujuModelWatcher
36 from n2vc.provisioner import AsyncSSHProvisioner
37 from n2vc.n2vc_conn import N2VCConnector
38 from n2vc.exceptions import (
39 JujuMachineNotFound,
40 JujuApplicationNotFound,
41 JujuLeaderUnitNotFound,
42 JujuActionNotFound,
43 JujuModelAlreadyExists,
44 JujuControllerFailedConnecting,
45 JujuApplicationExists,
46 JujuInvalidK8sConfiguration,
47 )
48 from n2vc.utils import DB_DATA
49 from osm_common.dbbase import DbException
50 from kubernetes.client.configuration import Configuration
51
# Key under which add_k8s() stores the caller-supplied RBAC id inside the
# attributes of the Kubernetes cloud credential.
RBAC_LABEL_KEY_NAME = "rbac-id"
53
54
55 class Libjuju:
def __init__(
    self,
    endpoint: str,
    api_proxy: str,
    username: str,
    password: str,
    cacert: str,
    loop: asyncio.AbstractEventLoop = None,
    log: logging.Logger = None,
    db: dict = None,
    n2vc: N2VCConnector = None,
    apt_mirror: str = None,
    enable_os_upgrade: bool = True,
):
    """
    Store the connection settings and start the periodic health check

    :param: endpoint: Endpoint of the juju controller (host:port)
    :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
    :param: username: Juju username
    :param: password: Juju password
    :param: cacert: Juju CA Certificate
    :param: loop: Asyncio loop (the current event loop is used when omitted)
    :param: log: Logger (a logger named "Libjuju" is used when omitted)
    :param: db: DB object; it is queried right away for the stored API endpoints
    :param: n2vc: N2VC object
    :param: apt_mirror: APT Mirror
    :param: enable_os_upgrade: Enable OS Upgrade
    """

    self.log = log or logging.getLogger("Libjuju")
    self.db = db

    # The endpoints saved in the database are trusted only when they include
    # the endpoint given by the caller; otherwise the list is reset to that
    # single endpoint and written back to the database.
    stored_endpoints = self._get_api_endpoints_db()
    self.endpoints = None
    if stored_endpoints and endpoint in stored_endpoints:
        self.endpoints = stored_endpoints
    else:
        self.endpoints = [endpoint]
        self._update_api_endpoints_db(self.endpoints)

    self.api_proxy = api_proxy
    self.username = username
    self.password = password
    self.cacert = cacert
    self.loop = loop or asyncio.get_event_loop()
    self.n2vc = n2vc

    # Configuration applied to every model created by add_model()
    model_config = {}
    if apt_mirror:
        model_config["apt-mirror"] = apt_mirror
    model_config.update(
        {
            "enable-os-refresh-update": enable_os_upgrade,
            "enable-os-upgrade": enable_os_upgrade,
        }
    )
    self.model_config = model_config

    # handle_exception() becomes the handler for exceptions nobody caught
    # in this loop
    self.loop.set_exception_handler(self.handle_exception)
    self.creating_model = asyncio.Lock(loop=self.loop)

    # Names of the models created through this object
    self.models = set()
    self.log.debug("Libjuju initialized!")

    self.health_check_task = self._create_health_check_task()
116
117 def _create_health_check_task(self):
118 return self.loop.create_task(self.health_check())
119
async def get_controller(self, timeout: float = 15.0) -> Controller:
    """
    Connect to the Juju controller and return the connected object

    After connecting, the API endpoints reported by the controller are
    compared with the ones held by this object; when they differ the new list
    is kept and saved in the database.

    :param: timeout: Time in seconds to wait for controller to connect

    :return: Connected controller (the caller has to disconnect it)

    :raises: JujuControllerFailedConnecting: for any failure other than a
             cancellation, which is propagated untouched
    """
    controller = None
    try:
        controller = Controller(loop=self.loop)
        connecting = controller.connect(
            endpoint=self.endpoints,
            username=self.username,
            password=self.password,
            cacert=self.cacert,
        )
        await asyncio.wait_for(connecting, timeout=timeout)

        reported_endpoints = await controller.api_endpoints
        if self.endpoints != reported_endpoints:
            self.endpoints = reported_endpoints
            self._update_api_endpoints_db(self.endpoints)
        return controller
    except asyncio.CancelledError:
        # Cancellation must not be reported as a connection failure
        raise
    except Exception as e:
        self.log.error(
            "Failed connecting to controller: {}...".format(self.endpoints)
        )
        # Do not leave a half-open connection behind
        if controller:
            await self.disconnect_controller(controller)
        raise JujuControllerFailedConnecting(e)
152
async def disconnect(self):
    """Cancel the background health check task"""
    health_check_task = self.health_check_task
    health_check_task.cancel()
    self.log.debug("Libjuju disconnected!")
158
async def disconnect_model(self, model: Model):
    """
    Close the connection held by a model object

    :param: model: Model that will be disconnected
    """
    disconnecting = model.disconnect()
    await disconnecting
166
async def disconnect_controller(self, controller: Controller):
    """
    Close the connection held by a controller object

    :param: controller: Controller that will be disconnected.
                        A falsy value (e.g. None) is accepted and ignored.
    """
    if not controller:
        return
    await controller.disconnect()
175
async def add_model(self, model_name: str, cloud_name: str, credential_name=None):
    """
    Create model

    Connects to the controller, refuses to continue when the controller
    already lists a model with this name, and otherwise asks the controller
    to add it using the model configuration prepared in the constructor.
    The name of a newly created model is remembered in ``self.models``.

    :param: model_name: Model name
    :param: cloud_name: Cloud name
    :param: credential_name: Credential name to use for adding the model
                             If not specified, same name as the cloud will be used.

    :raises: JujuModelAlreadyExists: if the controller already has the model
    """

    # Get controller
    controller = await self.get_controller()
    model = None
    try:
        # Raise exception if model already exists
        if await self.model_exists(model_name, controller=controller):
            raise JujuModelAlreadyExists(
                "Model {} already exists.".format(model_name)
            )

        # Block until other workers have finished model creation
        # (polls the lock every 0.1 seconds instead of queueing on it)
        while self.creating_model.locked():
            await asyncio.sleep(0.1)

        # The name is already registered in self.models (presumably by a
        # creation that finished while we were waiting): nothing to do.
        # Nothing is returned in this case.
        if model_name in self.models:
            return

        # Create the model
        async with self.creating_model:
            self.log.debug("Creating model {}".format(model_name))
            model = await controller.add_model(
                model_name,
                config=self.model_config,
                cloud_name=cloud_name,
                credential_name=credential_name or cloud_name,
            )
            self.models.add(model_name)
    finally:
        # The model object is only needed for the creation itself
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
218
async def get_model(
    self, controller: Controller, model_name: str, id=None
) -> Model:
    """
    Ask a connected controller for one of its models

    :param: controller: Controller
    :param: model_name: Model name
    :param: id: Not used by this method

    :return: Model: The Juju model object returned by the controller
    """
    model = await controller.get_model(model_name)
    return model
231
async def model_exists(
    self, model_name: str, controller: Controller = None
) -> bool:
    """
    Tell whether the controller lists a model with the given name

    :param: model_name: Model name
    :param: controller: Controller to query. When omitted, a connection is
                        opened for the check and closed afterwards.

    :return bool
    """
    # Only a controller opened here is closed here
    owns_controller = not controller
    if owns_controller:
        controller = await self.get_controller()

    try:
        known_models = await controller.list_models()
        return model_name in known_models
    finally:
        if owns_controller:
            await self.disconnect_controller(controller)
256
async def models_exist(self, model_names: [str]) -> (bool, list):
    """
    Check which of the given models are known to the controller

    :param: model_names: List of strings with model names

    :return (bool, list[str]): (True if all models exists, List of model names that don't exist)

    :raises: Exception: if model_names is empty
    """
    if not model_names:
        raise Exception(
            "model_names must be a non-empty array. Given value: {}".format(
                model_names
            )
        )
    controller_models = await self.list_models()
    found = list(set(controller_models).intersection(model_names))
    missing = list(set(model_names) - set(found))
    return not missing, missing
280
async def get_model_status(self, model_name: str) -> FullStatus:
    """
    Fetch the status of a model

    A controller and a model connection are opened for the query and both are
    closed before returning.

    :param: model_name: Model name

    :return: Full status object
    """
    controller = await self.get_controller()
    model = await self.get_model(controller, model_name)
    try:
        full_status = await model.get_status()
        return full_status
    finally:
        await self.disconnect_model(model)
        await self.disconnect_controller(controller)
296
async def create_machine(
    self,
    model_name: str,
    machine_id: str = None,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    series: str = "xenial",
    wait: bool = True,
) -> (Machine, bool):
    """
    Create machine

    When machine_id is given the machine must already exist in the model and
    is returned as it is. Otherwise a new machine is added to the model and,
    if requested, the call waits until it is ready.

    :param: model_name: Model name
    :param: machine_id: Machine id
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: series: Series of the machine (xenial, bionic, focal, ...)
    :param: wait: Wait until machine is ready

    :return: (juju.machine.Machine, bool): Machine object and a boolean saying
             if the machine is new or it already existed

    :raises: JujuMachineNotFound: if machine_id is given but is not in the model
    """
    new = False
    machine = None

    self.log.debug(
        "Creating machine (id={}) in model: {}".format(machine_id, model_name)
    )

    # Get controller
    controller = await self.get_controller()

    # Get model
    model = await self.get_model(controller, model_name)
    try:
        if machine_id is not None:
            self.log.debug(
                "Searching machine (id={}) in model {}".format(
                    machine_id, model_name
                )
            )

            # get_machines() is only used to check that the id exists; the
            # machine object itself is taken from model.machines, as the
            # other methods of this class do.
            machines = await model.get_machines()
            if machine_id in machines:
                self.log.debug(
                    "Machine (id={}) found in model {}".format(
                        machine_id, model_name
                    )
                )
                machine = model.machines[machine_id]
            else:
                raise JujuMachineNotFound("Machine {} not found".format(machine_id))

        if machine is None:
            self.log.debug("Creating a new machine in model {}".format(model_name))

            # Create machine
            machine = await model.add_machine(
                spec=None, constraints=None, disks=None, series=series
            )
            new = True

            # Wait until the machine is ready
            self.log.debug(
                "Wait until machine {} is ready in model {}".format(
                    machine.entity_id, model_name
                )
            )
            if wait:
                await JujuModelWatcher.wait_for(
                    model=model,
                    entity=machine,
                    progress_timeout=progress_timeout,
                    total_timeout=total_timeout,
                    db_dict=db_dict,
                    n2vc=self.n2vc,
                )
    finally:
        await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    self.log.debug(
        "Machine {} ready at {} in model {}".format(
            machine.entity_id, machine.dns_name, model_name
        )
    )
    return machine, new
387
async def provision_machine(
    self,
    model_name: str,
    hostname: str,
    username: str,
    private_key_path: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
) -> str:
    """
    Manually provisioning of a machine

    The host is prepared through an SSH provisioner, registered in the model
    with the AddMachines call of the client facade, and the installation of
    the Juju agent is scheduled. The call then waits for the machine to show
    up in the model and to become ready.

    :param: model_name: Model name
    :param: hostname: IP to access the machine
    :param: username: Username to login to the machine
    :param: private_key_path: Local path for the private key
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active

    :return: (Entity): Machine id

    :raises: ValueError: if the controller reports an error adding the machine
    :raises: JujuMachineNotFound: if the machine does not appear in the model
    """
    self.log.debug(
        "Provisioning machine. model: {}, hostname: {}, username: {}".format(
            model_name, hostname, username
        )
    )

    # Get controller
    controller = await self.get_controller()

    # Get model
    model = await self.get_model(controller, model_name)

    try:
        # Get provisioner
        provisioner = AsyncSSHProvisioner(
            host=hostname,
            user=username,
            private_key_path=private_key_path,
            log=self.log,
        )

        # Provision machine
        params = await provisioner.provision_machine()

        params.jobs = ["JobHostUnits"]

        self.log.debug("Adding machine to model")
        connection = model.connection()
        client_facade = client.ClientFacade.from_connection(connection)

        # A single machine is requested, so only the first result is checked
        results = await client_facade.AddMachines(params=[params])
        error = results.machines[0].error

        if error:
            msg = "Error adding machine: {}".format(error.message)
            self.log.error(msg=msg)
            raise ValueError(msg)

        machine_id = results.machines[0].machine

        self.log.debug("Installing Juju agent into machine {}".format(machine_id))
        # The installation is scheduled and not awaited here: its outcome is
        # only observed indirectly through the waits below.
        asyncio.ensure_future(
            provisioner.install_agent(
                connection=connection,
                nonce=params.nonce,
                machine_id=machine_id,
                proxy=self.api_proxy,
                series=params.series,
            )
        )

        # Poll the model for the new machine: up to 10 attempts, 2 seconds apart
        machine = None
        for _ in range(10):
            machine_list = await model.get_machines()
            if machine_id in machine_list:
                self.log.debug("Machine {} found in model!".format(machine_id))
                machine = model.machines.get(machine_id)
                break
            await asyncio.sleep(2)

        if machine is None:
            msg = "Machine {} not found in model".format(machine_id)
            self.log.error(msg=msg)
            raise JujuMachineNotFound(msg)

        self.log.debug(
            "Wait until machine {} is ready in model {}".format(
                machine.entity_id, model_name
            )
        )
        await JujuModelWatcher.wait_for(
            model=model,
            entity=machine,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
        )
    except Exception as e:
        # Errors are passed on unchanged; cleanup happens in the finally clause
        raise e
    finally:
        await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    self.log.debug(
        "Machine provisioned {} in model {}".format(machine_id, model_name)
    )

    return machine_id
500
async def deploy(
    self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
):
    """
    Deploy bundle or charm: Similar to the juju CLI command `juju deploy`

    :param: uri: Path or Charm Store uri in which the charm or bundle can be found
    :param: model_name: Model name
    :param: wait: Indicates whether to wait or not until all applications are active
    :param: timeout: Time in seconds to wait until all applications are active
    """
    controller = await self.get_controller()
    model = await self.get_model(controller, model_name)
    try:
        await model.deploy(uri)
        if not wait:
            # Fire and forget: the caller does not want to block
            return
        await JujuModelWatcher.wait_for_model(model, timeout=timeout)
        self.log.debug("All units active in model {}".format(model_name))
    finally:
        await self.disconnect_model(model)
        await self.disconnect_controller(controller)
522
async def deploy_charm(
    self,
    application_name: str,
    path: str,
    model_name: str,
    machine_id: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    config: dict = None,
    series: str = None,
    num_units: int = 1,
):
    """Deploy charm

    The application is deployed with a single unit on the given machine.
    Every additional unit requested through num_units is placed on a machine
    created for it with create_machine().

    :param: application_name: Application name
    :param: path: Local path to the charm
    :param: model_name: Model name
    :param: machine_id ID of the machine
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: config: Config for the charm
    :param: series: Series of the charm. Overridden by the series of the
                    machine when machine_id is given.
    :param: num_units: Number of units

    :return: (juju.application.Application): Juju application

    :raises: JujuMachineNotFound: if machine_id is given but is not in the model
    :raises: JujuApplicationExists: if the application is already in the model
    """
    self.log.debug(
        "Deploying charm {} to machine {} in model {}".format(
            application_name, machine_id, model_name
        )
    )
    self.log.debug("charm: {}".format(path))

    # Get controller
    controller = await self.get_controller()

    # Get model
    model = await self.get_model(controller, model_name)

    try:
        application = None
        if application_name not in model.applications:

            if machine_id is not None:
                if machine_id not in model.machines:
                    msg = "Machine {} not found in model".format(machine_id)
                    self.log.error(msg=msg)
                    raise JujuMachineNotFound(msg)
                # The charm has to match the series of the target machine
                machine = model.machines[machine_id]
                series = machine.series

            application = await model.deploy(
                entity_url=path,
                application_name=application_name,
                channel="stable",
                num_units=1,
                series=series,
                to=machine_id,
                config=config,
            )

            self.log.debug(
                "Wait until application {} is ready in model {}".format(
                    application_name, model_name
                )
            )
            if num_units > 1:
                # One new machine per extra unit; the machines are not
                # awaited here because the wait below covers the application
                for _ in range(num_units - 1):
                    extra_machine, _is_new = await self.create_machine(
                        model_name, wait=False
                    )
                    await application.add_unit(to=extra_machine.entity_id)

            await JujuModelWatcher.wait_for(
                model=model,
                entity=application,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout,
                db_dict=db_dict,
                n2vc=self.n2vc,
            )
            self.log.debug(
                "Application {} is ready in model {}".format(
                    application_name, model_name
                )
            )
        else:
            raise JujuApplicationExists(
                "Application {} exists".format(application_name)
            )
    finally:
        await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return application
618
def _get_application(self, model: Model, application_name: str) -> Application:
    """Look up an application in a connected model

    :param: model: Model object
    :param: application_name: Application name

    :return: juju.application.Application (or None if it doesn't exist)
    """
    applications = model.applications
    if not applications or application_name not in applications:
        return None
    return applications[application_name]
629
async def execute_action(
    self,
    application_name: str,
    model_name: str,
    action_name: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    **kwargs
):
    """Execute action

    The action is run on the leader unit of the application and the call
    waits until the action has finished.

    :param: application_name: Application name
    :param: model_name: Model name
    :param: action_name: Name of the action
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: kwargs: Parameters passed to the action

    :return: (str, str): (output and status)

    :raises: JujuApplicationNotFound: if the application is not in the model
    :raises: JujuLeaderUnitNotFound: if no leader unit is found after the retries
    :raises: JujuActionNotFound: if the application does not offer the action
    """
    self.log.debug(
        "Executing action {} using params {}".format(action_name, kwargs)
    )
    # Get controller
    controller = await self.get_controller()

    # Get model
    model = await self.get_model(controller, model_name)

    try:
        # Get application
        application = self._get_application(
            model,
            application_name=application_name,
        )
        if application is None:
            raise JujuApplicationNotFound("Cannot execute action")

        # Get leader unit
        # Race condition:
        # Occasionally, self._get_leader_unit() will return None
        # because the leader elected hook has not been triggered yet.
        # Therefore, we are doing some retries. If it happens again,
        # re-open bug 1236
        attempts = 3
        time_between_retries = 10
        unit = None
        for _ in range(attempts):
            unit = await self._get_leader_unit(application)
            if unit is None:
                await asyncio.sleep(time_between_retries)
            else:
                break
        if unit is None:
            raise JujuLeaderUnitNotFound(
                "Cannot execute action: leader unit not found"
            )

        actions = await application.get_actions()

        if action_name not in actions:
            raise JujuActionNotFound(
                "Action {} not in available actions".format(action_name)
            )

        action = await unit.run_action(action_name, **kwargs)

        self.log.debug(
            "Wait until action {} is completed in application {} (model={})".format(
                action_name, application_name, model_name
            )
        )
        await JujuModelWatcher.wait_for(
            model=model,
            entity=action,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
        )

        output = await model.get_action_output(action_uuid=action.entity_id)
        status = await model.get_action_status(uuid_or_prefix=action.entity_id)
        # The status query returns a mapping; an action missing from it is
        # reported as failed
        status = (
            status[action.entity_id] if action.entity_id in status else "failed"
        )

        # NOTE(review): the message prints action.status, not the `status`
        # value computed above and returned to the caller
        self.log.debug(
            "Action {} completed with status {} in application {} (model={})".format(
                action_name, action.status, application_name, model_name
            )
        )
    finally:
        await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return output, status
728
async def get_actions(self, application_name: str, model_name: str) -> dict:
    """Get list of actions

    :param: application_name: Application name
    :param: model_name: Model name

    :return: Dict with this format
        {
            "action_name": "Description of the action",
            ...
        }

    :raises: JujuApplicationNotFound: if the application is not in the model
    """
    self.log.debug(
        "Getting list of actions for application {}".format(application_name)
    )

    # Get controller
    controller = await self.get_controller()

    # Get model
    model = await self.get_model(controller, model_name)

    try:
        # Get application
        application = self._get_application(
            model,
            application_name=application_name,
        )
        # Fail with a meaningful error instead of an AttributeError on None
        if application is None:
            raise JujuApplicationNotFound(
                "Cannot get actions: application {} not found in model {}".format(
                    application_name, model_name
                )
            )

        # Return list of actions
        return await application.get_actions()

    finally:
        # Disconnect from model and controller
        await self.disconnect_model(model)
        await self.disconnect_controller(controller)
765
async def get_metrics(self, model_name: str, application_name: str) -> dict:
    """Get the metrics collected by the VCA.

    :param model_name The name or unique id of the network service
    :param application_name The name of the application

    :return: Metrics returned by the application, or an empty dict when the
             application is not in the model

    :raises: Exception: if model_name or application_name is empty
    """
    if not model_name or not application_name:
        raise Exception("model_name and application_name must be non-empty strings")
    metrics = {}
    controller = await self.get_controller()
    model = await self.get_model(controller, model_name)
    try:
        application = self._get_application(model, application_name)
        if application is not None:
            metrics = await application.get_metrics()
    finally:
        # Both helpers are coroutines: without awaiting them nothing is
        # disconnected and the connections are leaked
        await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return metrics
785
async def add_relation(
    self,
    model_name: str,
    endpoint_1: str,
    endpoint_2: str,
):
    """Relate two endpoints of a model

    API errors whose message says that something was not found, or that the
    relation already exists, are logged as warnings and not raised.

    :param: model_name: Model name
    :param: endpoint_1 First endpoint name
                       ("app:endpoint" format or directly the saas name)
    :param: endpoint_2: Second endpoint name (^ same format)
    """

    self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))

    controller = await self.get_controller()
    model = await self.get_model(controller, model_name)

    # (text looked for in the error message, warning logged when it is there)
    tolerated_errors = (
        ("not found", "Relation not found: {}"),
        ("already exists", "Relation already exists: {}"),
    )

    try:
        await model.add_relation(endpoint_1, endpoint_2)
    except JujuAPIError as e:
        for needle, warning in tolerated_errors:
            if needle in e.message:
                self.log.warning(warning.format(e.message))
                return
        # another exception, raise it
        raise
    finally:
        await self.disconnect_model(model)
        await self.disconnect_controller(controller)
823
async def consume(
    self,
    offer_url: str,
    model_name: str,
):
    """
    Make a remote offer available in a model. Relations can be created later using "juju relate".

    :param: offer_url: Offer Url
    :param: model_name: Model name

    :raises ParseError if there's a problem parsing the offer_url
    :raises JujuError if remote offer includes and endpoint
    :raises JujuAPIError if the operation is not successful
    """
    controller = await self.get_controller()
    # The model is requested from the controller object directly
    model = await controller.get_model(model_name)

    try:
        consuming = model.consume(offer_url)
        await consuming
    finally:
        await self.disconnect_model(model)
        await self.disconnect_controller(controller)
847
async def destroy_model(self, model_name: str, total_timeout: float):
    """
    Destroy model

    Pending machines that were manually provisioned are destroyed first, then
    the controller is asked to destroy the model, and the call polls the
    controller until the model is no longer listed.

    :param: model_name: Model name
    :param: total_timeout: Timeout in seconds to wait for the model to
                           disappear (3600 when None)

    :raises: Exception: if the model is still listed when the timeout expires
    """

    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        self.log.debug("Destroying model {}".format(model_name))
        # Keep the uuid: it is needed after the model is disconnected
        uuid = model.info.uuid

        # Destroy machines that are manually provisioned
        # and still are in pending state
        await self._destroy_pending_machines(model, only_manual=True)

        # Disconnect model
        await self.disconnect_model(model)

        # Destroy model
        if model_name in self.models:
            self.models.remove(model_name)

        await controller.destroy_model(uuid, force=True, max_wait=0)

        # Wait until model is destroyed
        self.log.debug("Waiting for model {} to be destroyed...".format(model_name))

        if total_timeout is None:
            total_timeout = 3600
        end = time.time() + total_timeout
        # Check every 5 seconds whether the controller still lists the model
        while time.time() < end:
            models = await controller.list_models()
            if model_name not in models:
                self.log.debug(
                    "The model {} ({}) was destroyed".format(model_name, uuid)
                )
                return
            await asyncio.sleep(5)
        raise Exception(
            "Timeout waiting for model {} to be destroyed".format(model_name)
        )
    except Exception as e:
        # NOTE(review): when the failure happens after the explicit
        # disconnect above, the model is disconnected a second time here
        if model:
            await self.disconnect_model(model)
        raise e
    finally:
        await self.disconnect_controller(controller)
899
async def destroy_application(self, model: Model, application_name: str):
    """
    Remove an application from an already connected model

    An application that is not in the model only produces a warning.

    :param: model: Model object
    :param: application_name: Application name
    """
    self.log.debug(
        "Destroying application {} in model {}".format(
            application_name, model.info.name
        )
    )
    application = model.applications.get(application_name)
    if not application:
        self.log.warning("Application not found: {}".format(application_name))
        return
    await application.destroy()
917
async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
    """
    Destroy pending machines in a given model

    :param: model: Model object (already connected)
    :param: only_manual: Bool that indicates only manually provisioned
                         machines should be destroyed (if True), or that
                         all pending machines should be destroyed
    """
    status = await model.get_status()
    for machine_id in status.machines:
        machine_status = status.machines[machine_id]
        if machine_status.agent_status.status != "pending":
            continue
        # Skip (rather than stop at) machines that were not manually
        # provisioned, so that the manual ones listed after them are
        # still destroyed
        if only_manual and not machine_status.instance_id.startswith("manual:"):
            continue
        machine = model.machines[machine_id]
        await machine.destroy(force=True)
934
async def configure_application(
    self, model_name: str, application_name: str, config: dict = None
):
    """Configure application

    Nothing is done (no connection is opened) when config is empty.

    :param: model_name: Model name
    :param: application_name: Application name
    :param: config: Config to apply to the charm

    :raises: JujuApplicationNotFound: if the application is not in the model
    """
    self.log.debug("Configuring application {}".format(application_name))

    if not config:
        return

    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(
            model,
            application_name=application_name,
        )
        # Fail with a meaningful error instead of an AttributeError on None
        if application is None:
            raise JujuApplicationNotFound(
                "Cannot configure application: {} not found in model {}".format(
                    application_name, model_name
                )
            )
        await application.set_config(config)
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
960
def _get_api_endpoints_db(self) -> [str]:
    """
    Read the controller API endpoints saved in the database

    :return: List of API endpoints, or None when nothing is stored
    """
    self.log.debug("Getting endpoints from database")

    endpoints_ref = DB_DATA.api_endpoints
    juju_info = self.db.get_one(
        endpoints_ref.table,
        q_filter=endpoints_ref.filter,
        fail_on_empty=False,
    )
    if not juju_info or endpoints_ref.key not in juju_info:
        return None
    return juju_info[endpoints_ref.key]
976
def _update_api_endpoints_db(self, endpoints: [str]):
    """
    Save the controller API endpoints in the database

    The record that holds them is created first when it does not exist yet.

    :param: endpoints: List of endpoints
    """
    self.log.debug("Saving endpoints {} in database".format(endpoints))

    endpoints_ref = DB_DATA.api_endpoints

    def fetch_record():
        return self.db.get_one(
            endpoints_ref.table,
            q_filter=endpoints_ref.filter,
            fail_on_empty=False,
        )

    if not fetch_record():
        try:
            self.db.create(
                endpoints_ref.table,
                endpoints_ref.filter,
            )
        except DbException:
            # Race condition: the creation is allowed to fail as long as the
            # record exists afterwards (another N2VC worker has created it)
            if not fetch_record():
                raise
    self.db.set_one(
        endpoints_ref.table,
        endpoints_ref.filter,
        {endpoints_ref.key: endpoints},
    )
1011
def handle_exception(self, loop, context):
    """
    Exception handler installed on the event loop by the constructor

    All unhandled exceptions by libjuju are handled here: they are
    deliberately ignored, nothing is logged or raised.

    :param: loop: Event loop that reports the exception
    :param: context: Details of the exception
    """
    return None
1015
async def health_check(self, interval: float = 300.0):
    """
    Health check to make sure controller and controller_model connections are OK

    Runs forever: connects to the controller, disconnects again and sleeps.
    Connection failures are logged and do not stop the loop.

    :param: interval: Time in seconds between checks
    """
    while True:
        # Start every round without a controller, so that a failed attempt
        # does not disconnect the controller of the previous round again
        controller = None
        try:
            controller = await self.get_controller()
            # self.log.debug("VCA is alive")
        except asyncio.CancelledError:
            # Being cancelled (see disconnect()) is not a failed check
            raise
        except Exception as e:
            self.log.error("Health check to VCA failed: {}".format(e))
        finally:
            await self.disconnect_controller(controller)
        await asyncio.sleep(interval)
1032
async def list_models(self, contains: str = None) -> [str]:
    """List the models of the controller, optionally filtered by name

    :param: contains: String that is contained in model name

    :return: [models] Returns list of model names
    """

    controller = await self.get_controller()
    try:
        model_names = await controller.list_models()
        if not contains:
            return model_names
        return [name for name in model_names if contains in name]
    finally:
        await self.disconnect_controller(controller)
1049
async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
    """List the offers of a model

    :param: model_name: Model name

    :return: Returns list of offers
    """

    controller = await self.get_controller()
    try:
        offers = await controller.list_offers(model_name)
        return offers
    finally:
        await self.disconnect_controller(controller)
1063
async def add_k8s(
    self,
    name: str,
    rbac_id: str,
    token: str,
    client_cert_data: str,
    configuration: Configuration,
    storage_class: str,
    credential_name: str = None,
):
    """
    Add a Kubernetes cloud to the controller

    Similar to the `juju add-k8s` command in the CLI

    :param: name: Name for the K8s cloud
    :param: rbac_id: Value stored in the credential attributes under
                     RBAC_LABEL_KEY_NAME
    :param: token: Token used to build the credential
    :param: client_cert_data: Certificate data, used for the credential and
                              as CA certificate of the cloud
    :param: configuration: Kubernetes configuration object
    :param: storage_class: Storage Class to use in the cloud
    :param: credential_name: Name for the credential (passed on to add_cloud)

    :return: The value returned by add_cloud()
    """

    # Same checks, same order and same messages for the mandatory arguments
    mandatory = (
        (storage_class, "storage_class must be a non-empty string"),
        (name, "name must be a non-empty string"),
        (configuration, "configuration must be provided"),
    )
    for value, message in mandatory:
        if not value:
            raise Exception(message)

    endpoint = configuration.host
    credential = self.get_k8s_cloud_credential(
        configuration,
        client_cert_data,
        token,
    )
    credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id

    # The same storage class serves operators and workloads
    storage_config = {
        "operator-storage": storage_class,
        "workload-storage": storage_class,
    }
    cloud = client.Cloud(
        type_="kubernetes",
        auth_types=[credential.auth_type],
        endpoint=endpoint,
        ca_certificates=[client_cert_data],
        config=storage_config,
    )

    return await self.add_cloud(
        name, cloud, credential, credential_name=credential_name
    )
1113
def get_k8s_cloud_credential(
    self,
    configuration: Configuration,
    client_cert_data: str,
    token: str = None,
) -> client.CloudCredential:
    """
    Build the cloud credential for a Kubernetes cluster

    The authentication type is chosen from the data available: user and
    password of the configuration (with or without certificate), or
    certificate plus token.

    :param: configuration: Kubernetes configuration object (username and
                           password are read from it)
    :param: client_cert_data: Client certificate data
    :param: token: Token

    :return: client.CloudCredential with the authentication type and attributes

    :raises: JujuInvalidK8sConfiguration: if a token is combined with a user
             or password, or if no supported authentication method fits
    """
    attrs = {}
    # TODO: Test with AKS
    # The client key is not read for now: every `if key` branch below is
    # therefore never taken
    key = None  # open(configuration.key_file, "r").read()
    username = configuration.username
    password = configuration.password

    if client_cert_data:
        attrs["ClientCertificateData"] = client_cert_data
    if key:
        attrs["ClientKeyData"] = key
    if token:
        if username or password:
            raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
        attrs["Token"] = token

    auth_type = None
    if key:
        auth_type = "oauth2"
        if client_cert_data:
            auth_type = "oauth2withcert"
        if not token:
            raise JujuInvalidK8sConfiguration(
                "missing token for auth type {}".format(auth_type)
            )
    elif username:
        # An empty password is accepted, it is only logged
        if not password:
            self.log.debug(
                "credential for user {} has empty password".format(username)
            )
        attrs["username"] = username
        attrs["password"] = password
        if client_cert_data:
            auth_type = "userpasswithcert"
        else:
            auth_type = "userpass"
    elif client_cert_data and token:
        auth_type = "certificate"
    else:
        raise JujuInvalidK8sConfiguration("authentication method not supported")
    return client.CloudCredential(auth_type=auth_type, attrs=attrs)
1160
async def add_cloud(
    self,
    name: str,
    cloud: Cloud,
    credential: CloudCredential = None,
    credential_name: str = None,
) -> Cloud:
    """
    Register a cloud, and optionally a credential for it, in the controller

    :param: name: Name of the cloud to be added
    :param: cloud: Cloud object
    :param: credential: CloudCredentials object for the cloud
    :param: credential_name: Credential name.
                             If not defined, the name of the cloud will be used.

    :return: The cloud object received as argument
    """
    controller = await self.get_controller()
    try:
        await controller.add_cloud(name, cloud)
        if credential:
            credential_label = credential_name or name
            await controller.add_credential(
                credential_label, credential=credential, cloud=name
            )
        # Need to return the object returned by the controller.add_cloud() function
        # I'm returning the original value now until this bug is fixed:
        #   https://github.com/juju/python-libjuju/issues/443
        return cloud
    finally:
        await self.disconnect_controller(controller)
1190
async def remove_cloud(self, name: str):
    """
    Remove a cloud from the controller

    :param: name: Name of the cloud to be removed
    """
    controller = await self.get_controller()
    try:
        removal = controller.remove_cloud(name)
        await removal
    finally:
        await self.disconnect_controller(controller)
1202
async def _get_leader_unit(self, application: Application) -> Unit:
    """
    Find the leader among the units of an application

    :param: application: Application object

    :return: The first unit that reports itself as leader, or None
    """
    for candidate in application.units:
        if await candidate.is_leader_from_status():
            return candidate
    return None
1210
async def get_cloud_credentials(self, cloud_name: str, credential_name: str):
    """
    Query the controller for a credential of a cloud

    The credential is looked up for the user this object connects with.

    :param: cloud_name: Cloud name
    :param: credential_name: Credential name

    :return: The results of the Credential call of the cloud facade
    """
    controller = await self.get_controller()
    try:
        cloud_facade = client.CloudFacade.from_connection(controller.connection())
        credential_tag = tag.credential(cloud_name, self.username, credential_name)
        response = await cloud_facade.Credential([client.Entity(credential_tag)])
        return response.results
    finally:
        await self.disconnect_controller(controller)