Minor improvements to libjuju.py
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17
18 import time
19
20 from juju.errors import JujuAPIError
21 from juju.model import Model
22 from juju.machine import Machine
23 from juju.application import Application
24 from juju.unit import Unit
25 from juju.client._definitions import (
26 FullStatus,
27 QueryApplicationOffersResults,
28 Cloud,
29 CloudCredential,
30 )
31 from juju.controller import Controller
32 from juju.client import client
33 from juju import tag
34
35 from n2vc.juju_watcher import JujuModelWatcher
36 from n2vc.provisioner import AsyncSSHProvisioner
37 from n2vc.n2vc_conn import N2VCConnector
38 from n2vc.exceptions import (
39 JujuMachineNotFound,
40 JujuApplicationNotFound,
41 JujuLeaderUnitNotFound,
42 JujuActionNotFound,
43 JujuModelAlreadyExists,
44 JujuControllerFailedConnecting,
45 JujuApplicationExists,
46 JujuInvalidK8sConfiguration,
47 )
48 from n2vc.utils import DB_DATA
49 from osm_common.dbbase import DbException
50 from kubernetes.client.configuration import Configuration
51
52 RBAC_LABEL_KEY_NAME = "rbac-id"
53
54
55 class Libjuju:
56 def __init__(
57 self,
58 endpoint: str,
59 api_proxy: str,
60 username: str,
61 password: str,
62 cacert: str,
63 loop: asyncio.AbstractEventLoop = None,
64 log: logging.Logger = None,
65 db: dict = None,
66 n2vc: N2VCConnector = None,
67 apt_mirror: str = None,
68 enable_os_upgrade: bool = True,
69 ):
70 """
71 Constructor
72
73 :param: endpoint: Endpoint of the juju controller (host:port)
74 :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
75 :param: username: Juju username
76 :param: password: Juju password
77 :param: cacert: Juju CA Certificate
78 :param: loop: Asyncio loop
79 :param: log: Logger
80 :param: db: DB object
81 :param: n2vc: N2VC object
82 :param: apt_mirror: APT Mirror
83 :param: enable_os_upgrade: Enable OS Upgrade
84 """
85
86 self.log = log or logging.getLogger("Libjuju")
87 self.db = db
88 db_endpoints = self._get_api_endpoints_db()
89 self.endpoints = None
90 if (db_endpoints and endpoint not in db_endpoints) or not db_endpoints:
91 self.endpoints = [endpoint]
92 self._update_api_endpoints_db(self.endpoints)
93 else:
94 self.endpoints = db_endpoints
95 self.api_proxy = api_proxy
96 self.username = username
97 self.password = password
98 self.cacert = cacert
99 self.loop = loop or asyncio.get_event_loop()
100 self.n2vc = n2vc
101
102 # Generate config for models
103 self.model_config = {}
104 if apt_mirror:
105 self.model_config["apt-mirror"] = apt_mirror
106 self.model_config["enable-os-refresh-update"] = enable_os_upgrade
107 self.model_config["enable-os-upgrade"] = enable_os_upgrade
108
109 self.loop.set_exception_handler(self.handle_exception)
110 self.creating_model = asyncio.Lock(loop=self.loop)
111
112 self.models = set()
113 self.log.debug("Libjuju initialized!")
114
115 self.health_check_task = self._create_health_check_task()
116
117 def _create_health_check_task(self):
118 return self.loop.create_task(self.health_check())
119
120 async def get_controller(self, timeout: float = 15.0) -> Controller:
121 """
122 Get controller
123
124 :param: timeout: Time in seconds to wait for controller to connect
125 """
126 controller = None
127 try:
128 controller = Controller(loop=self.loop)
129 await asyncio.wait_for(
130 controller.connect(
131 endpoint=self.endpoints,
132 username=self.username,
133 password=self.password,
134 cacert=self.cacert,
135 ),
136 timeout=timeout,
137 )
138 endpoints = await controller.api_endpoints
139 if self.endpoints != endpoints:
140 self.endpoints = endpoints
141 self._update_api_endpoints_db(self.endpoints)
142 return controller
143 except asyncio.CancelledError as e:
144 raise e
145 except Exception as e:
146 self.log.error(
147 "Failed connecting to controller: {}...".format(self.endpoints)
148 )
149 if controller:
150 await self.disconnect_controller(controller)
151 raise JujuControllerFailedConnecting(e)
152
153 async def disconnect(self):
154 """Disconnect"""
155 # Cancel health check task
156 self.health_check_task.cancel()
157 self.log.debug("Libjuju disconnected!")
158
159 async def disconnect_model(self, model: Model):
160 """
161 Disconnect model
162
163 :param: model: Model that will be disconnected
164 """
165 await model.disconnect()
166
167 async def disconnect_controller(self, controller: Controller):
168 """
169 Disconnect controller
170
171 :param: controller: Controller that will be disconnected
172 """
173 if controller:
174 await controller.disconnect()
175
176 async def add_model(self, model_name: str, cloud_name: str, credential_name=None):
177 """
178 Create model
179
180 :param: model_name: Model name
181 :param: cloud_name: Cloud name
182 :param: credential_name: Credential name to use for adding the model
183 If not specified, same name as the cloud will be used.
184 """
185
186 # Get controller
187 controller = await self.get_controller()
188 model = None
189 try:
190 # Raise exception if model already exists
191 if await self.model_exists(model_name, controller=controller):
192 raise JujuModelAlreadyExists(
193 "Model {} already exists.".format(model_name)
194 )
195
196 # Block until other workers have finished model creation
197 while self.creating_model.locked():
198 await asyncio.sleep(0.1)
199
200 # If the model exists, return it from the controller
201 if model_name in self.models:
202 return
203
204 # Create the model
205 async with self.creating_model:
206 self.log.debug("Creating model {}".format(model_name))
207 model = await controller.add_model(
208 model_name,
209 config=self.model_config,
210 cloud_name=cloud_name,
211 credential_name=credential_name or cloud_name,
212 )
213 self.models.add(model_name)
214 finally:
215 if model:
216 await self.disconnect_model(model)
217 await self.disconnect_controller(controller)
218
219 async def get_model(
220 self, controller: Controller, model_name: str, id=None
221 ) -> Model:
222 """
223 Get model from controller
224
225 :param: controller: Controller
226 :param: model_name: Model name
227
228 :return: Model: The created Juju model object
229 """
230 return await controller.get_model(model_name)
231
232 async def model_exists(
233 self, model_name: str, controller: Controller = None
234 ) -> bool:
235 """
236 Check if model exists
237
238 :param: controller: Controller
239 :param: model_name: Model name
240
241 :return bool
242 """
243 need_to_disconnect = False
244
245 # Get controller if not passed
246 if not controller:
247 controller = await self.get_controller()
248 need_to_disconnect = True
249
250 # Check if model exists
251 try:
252 return model_name in await controller.list_models()
253 finally:
254 if need_to_disconnect:
255 await self.disconnect_controller(controller)
256
257 async def models_exist(self, model_names: [str]) -> (bool, list):
258 """
259 Check if models exists
260
261 :param: model_names: List of strings with model names
262
263 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
264 """
265 if not model_names:
266 raise Exception(
267 "model_names must be a non-empty array. Given value: {}".format(
268 model_names
269 )
270 )
271 non_existing_models = []
272 models = await self.list_models()
273 existing_models = list(set(models).intersection(model_names))
274 non_existing_models = list(set(model_names) - set(existing_models))
275
276 return (
277 len(non_existing_models) == 0,
278 non_existing_models,
279 )
280
281 async def get_model_status(self, model_name: str) -> FullStatus:
282 """
283 Get model status
284
285 :param: model_name: Model name
286
287 :return: Full status object
288 """
289 controller = await self.get_controller()
290 model = await self.get_model(controller, model_name)
291 try:
292 return await model.get_status()
293 finally:
294 await self.disconnect_model(model)
295 await self.disconnect_controller(controller)
296
297 async def create_machine(
298 self,
299 model_name: str,
300 machine_id: str = None,
301 db_dict: dict = None,
302 progress_timeout: float = None,
303 total_timeout: float = None,
304 series: str = "xenial",
305 wait: bool = True,
306 ) -> (Machine, bool):
307 """
308 Create machine
309
310 :param: model_name: Model name
311 :param: machine_id: Machine id
312 :param: db_dict: Dictionary with data of the DB to write the updates
313 :param: progress_timeout: Maximum time between two updates in the model
314 :param: total_timeout: Timeout for the entity to be active
315 :param: series: Series of the machine (xenial, bionic, focal, ...)
316 :param: wait: Wait until machine is ready
317
318 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
319 if the machine is new or it already existed
320 """
321 new = False
322 machine = None
323
324 self.log.debug(
325 "Creating machine (id={}) in model: {}".format(machine_id, model_name)
326 )
327
328 # Get controller
329 controller = await self.get_controller()
330
331 # Get model
332 model = await self.get_model(controller, model_name)
333 try:
334 if machine_id is not None:
335 self.log.debug(
336 "Searching machine (id={}) in model {}".format(
337 machine_id, model_name
338 )
339 )
340
341 # Get machines from model and get the machine with machine_id if exists
342 machines = await model.get_machines()
343 if machine_id in machines:
344 self.log.debug(
345 "Machine (id={}) found in model {}".format(
346 machine_id, model_name
347 )
348 )
349 machine = machines[machine_id]
350 else:
351 raise JujuMachineNotFound("Machine {} not found".format(machine_id))
352
353 if machine is None:
354 self.log.debug("Creating a new machine in model {}".format(model_name))
355
356 # Create machine
357 machine = await model.add_machine(
358 spec=None, constraints=None, disks=None, series=series
359 )
360 new = True
361
362 # Wait until the machine is ready
363 self.log.debug(
364 "Wait until machine {} is ready in model {}".format(
365 machine.entity_id, model_name
366 )
367 )
368 if wait:
369 await JujuModelWatcher.wait_for(
370 model=model,
371 entity=machine,
372 progress_timeout=progress_timeout,
373 total_timeout=total_timeout,
374 db_dict=db_dict,
375 n2vc=self.n2vc,
376 )
377 finally:
378 await self.disconnect_model(model)
379 await self.disconnect_controller(controller)
380
381 self.log.debug(
382 "Machine {} ready at {} in model {}".format(
383 machine.entity_id, machine.dns_name, model_name
384 )
385 )
386 return machine, new
387
388 async def provision_machine(
389 self,
390 model_name: str,
391 hostname: str,
392 username: str,
393 private_key_path: str,
394 db_dict: dict = None,
395 progress_timeout: float = None,
396 total_timeout: float = None,
397 ) -> str:
398 """
399 Manually provisioning of a machine
400
401 :param: model_name: Model name
402 :param: hostname: IP to access the machine
403 :param: username: Username to login to the machine
404 :param: private_key_path: Local path for the private key
405 :param: db_dict: Dictionary with data of the DB to write the updates
406 :param: progress_timeout: Maximum time between two updates in the model
407 :param: total_timeout: Timeout for the entity to be active
408
409 :return: (Entity): Machine id
410 """
411 self.log.debug(
412 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
413 model_name, hostname, username
414 )
415 )
416
417 # Get controller
418 controller = await self.get_controller()
419
420 # Get model
421 model = await self.get_model(controller, model_name)
422
423 try:
424 # Get provisioner
425 provisioner = AsyncSSHProvisioner(
426 host=hostname,
427 user=username,
428 private_key_path=private_key_path,
429 log=self.log,
430 )
431
432 # Provision machine
433 params = await provisioner.provision_machine()
434
435 params.jobs = ["JobHostUnits"]
436
437 self.log.debug("Adding machine to model")
438 connection = model.connection()
439 client_facade = client.ClientFacade.from_connection(connection)
440
441 results = await client_facade.AddMachines(params=[params])
442 error = results.machines[0].error
443
444 if error:
445 msg = "Error adding machine: {}".format(error.message)
446 self.log.error(msg=msg)
447 raise ValueError(msg)
448
449 machine_id = results.machines[0].machine
450
451 self.log.debug("Installing Juju agent into machine {}".format(machine_id))
452 asyncio.ensure_future(
453 provisioner.install_agent(
454 connection=connection,
455 nonce=params.nonce,
456 machine_id=machine_id,
457 proxy=self.api_proxy,
458 )
459 )
460
461 machine = None
462 for _ in range(10):
463 machine_list = await model.get_machines()
464 if machine_id in machine_list:
465 self.log.debug("Machine {} found in model!".format(machine_id))
466 machine = model.machines.get(machine_id)
467 break
468 await asyncio.sleep(2)
469
470 if machine is None:
471 msg = "Machine {} not found in model".format(machine_id)
472 self.log.error(msg=msg)
473 raise JujuMachineNotFound(msg)
474
475 self.log.debug(
476 "Wait until machine {} is ready in model {}".format(
477 machine.entity_id, model_name
478 )
479 )
480 await JujuModelWatcher.wait_for(
481 model=model,
482 entity=machine,
483 progress_timeout=progress_timeout,
484 total_timeout=total_timeout,
485 db_dict=db_dict,
486 n2vc=self.n2vc,
487 )
488 except Exception as e:
489 raise e
490 finally:
491 await self.disconnect_model(model)
492 await self.disconnect_controller(controller)
493
494 self.log.debug(
495 "Machine provisioned {} in model {}".format(machine_id, model_name)
496 )
497
498 return machine_id
499
500 async def deploy(
501 self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
502 ):
503 """
504 Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
505
506 :param: uri: Path or Charm Store uri in which the charm or bundle can be found
507 :param: model_name: Model name
508 :param: wait: Indicates whether to wait or not until all applications are active
509 :param: timeout: Time in seconds to wait until all applications are active
510 """
511 controller = await self.get_controller()
512 model = await self.get_model(controller, model_name)
513 try:
514 await model.deploy(uri)
515 if wait:
516 await JujuModelWatcher.wait_for_model(model, timeout=timeout)
517 self.log.debug("All units active in model {}".format(model_name))
518 finally:
519 await self.disconnect_model(model)
520 await self.disconnect_controller(controller)
521
522 async def deploy_charm(
523 self,
524 application_name: str,
525 path: str,
526 model_name: str,
527 machine_id: str,
528 db_dict: dict = None,
529 progress_timeout: float = None,
530 total_timeout: float = None,
531 config: dict = None,
532 series: str = None,
533 num_units: int = 1,
534 ):
535 """Deploy charm
536
537 :param: application_name: Application name
538 :param: path: Local path to the charm
539 :param: model_name: Model name
540 :param: machine_id ID of the machine
541 :param: db_dict: Dictionary with data of the DB to write the updates
542 :param: progress_timeout: Maximum time between two updates in the model
543 :param: total_timeout: Timeout for the entity to be active
544 :param: config: Config for the charm
545 :param: series: Series of the charm
546 :param: num_units: Number of units
547
548 :return: (juju.application.Application): Juju application
549 """
550 self.log.debug(
551 "Deploying charm {} to machine {} in model ~{}".format(
552 application_name, machine_id, model_name
553 )
554 )
555 self.log.debug("charm: {}".format(path))
556
557 # Get controller
558 controller = await self.get_controller()
559
560 # Get model
561 model = await self.get_model(controller, model_name)
562
563 try:
564 application = None
565 if application_name not in model.applications:
566
567 if machine_id is not None:
568 if machine_id not in model.machines:
569 msg = "Machine {} not found in model".format(machine_id)
570 self.log.error(msg=msg)
571 raise JujuMachineNotFound(msg)
572 machine = model.machines[machine_id]
573 series = machine.series
574
575 application = await model.deploy(
576 entity_url=path,
577 application_name=application_name,
578 channel="stable",
579 num_units=1,
580 series=series,
581 to=machine_id,
582 config=config,
583 )
584
585 self.log.debug(
586 "Wait until application {} is ready in model {}".format(
587 application_name, model_name
588 )
589 )
590 if num_units > 1:
591 for _ in range(num_units - 1):
592 m, _ = await self.create_machine(model_name, wait=False)
593 await application.add_unit(to=m.entity_id)
594
595 await JujuModelWatcher.wait_for(
596 model=model,
597 entity=application,
598 progress_timeout=progress_timeout,
599 total_timeout=total_timeout,
600 db_dict=db_dict,
601 n2vc=self.n2vc,
602 )
603 self.log.debug(
604 "Application {} is ready in model {}".format(
605 application_name, model_name
606 )
607 )
608 else:
609 raise JujuApplicationExists(
610 "Application {} exists".format(application_name)
611 )
612 finally:
613 await self.disconnect_model(model)
614 await self.disconnect_controller(controller)
615
616 return application
617
618 def _get_application(self, model: Model, application_name: str) -> Application:
619 """Get application
620
621 :param: model: Model object
622 :param: application_name: Application name
623
624 :return: juju.application.Application (or None if it doesn't exist)
625 """
626 if model.applications and application_name in model.applications:
627 return model.applications[application_name]
628
629 async def execute_action(
630 self,
631 application_name: str,
632 model_name: str,
633 action_name: str,
634 db_dict: dict = None,
635 progress_timeout: float = None,
636 total_timeout: float = None,
637 **kwargs
638 ):
639 """Execute action
640
641 :param: application_name: Application name
642 :param: model_name: Model name
643 :param: action_name: Name of the action
644 :param: db_dict: Dictionary with data of the DB to write the updates
645 :param: progress_timeout: Maximum time between two updates in the model
646 :param: total_timeout: Timeout for the entity to be active
647
648 :return: (str, str): (output and status)
649 """
650 self.log.debug(
651 "Executing action {} using params {}".format(action_name, kwargs)
652 )
653 # Get controller
654 controller = await self.get_controller()
655
656 # Get model
657 model = await self.get_model(controller, model_name)
658
659 try:
660 # Get application
661 application = self._get_application(
662 model,
663 application_name=application_name,
664 )
665 if application is None:
666 raise JujuApplicationNotFound("Cannot execute action")
667
668 # Get leader unit
669 # Racing condition:
670 # Ocassionally, self._get_leader_unit() will return None
671 # because the leader elected hook has not been triggered yet.
672 # Therefore, we are doing some retries. If it happens again,
673 # re-open bug 1236
674 attempts = 3
675 time_between_retries = 10
676 unit = None
677 for _ in range(attempts):
678 unit = await self._get_leader_unit(application)
679 if unit is None:
680 await asyncio.sleep(time_between_retries)
681 else:
682 break
683 if unit is None:
684 raise JujuLeaderUnitNotFound(
685 "Cannot execute action: leader unit not found"
686 )
687
688 actions = await application.get_actions()
689
690 if action_name not in actions:
691 raise JujuActionNotFound(
692 "Action {} not in available actions".format(action_name)
693 )
694
695 action = await unit.run_action(action_name, **kwargs)
696
697 self.log.debug(
698 "Wait until action {} is completed in application {} (model={})".format(
699 action_name, application_name, model_name
700 )
701 )
702 await JujuModelWatcher.wait_for(
703 model=model,
704 entity=action,
705 progress_timeout=progress_timeout,
706 total_timeout=total_timeout,
707 db_dict=db_dict,
708 n2vc=self.n2vc,
709 )
710
711 output = await model.get_action_output(action_uuid=action.entity_id)
712 status = await model.get_action_status(uuid_or_prefix=action.entity_id)
713 status = (
714 status[action.entity_id] if action.entity_id in status else "failed"
715 )
716
717 self.log.debug(
718 "Action {} completed with status {} in application {} (model={})".format(
719 action_name, action.status, application_name, model_name
720 )
721 )
722 finally:
723 await self.disconnect_model(model)
724 await self.disconnect_controller(controller)
725
726 return output, status
727
728 async def get_actions(self, application_name: str, model_name: str) -> dict:
729 """Get list of actions
730
731 :param: application_name: Application name
732 :param: model_name: Model name
733
734 :return: Dict with this format
735 {
736 "action_name": "Description of the action",
737 ...
738 }
739 """
740 self.log.debug(
741 "Getting list of actions for application {}".format(application_name)
742 )
743
744 # Get controller
745 controller = await self.get_controller()
746
747 # Get model
748 model = await self.get_model(controller, model_name)
749
750 try:
751 # Get application
752 application = self._get_application(
753 model,
754 application_name=application_name,
755 )
756
757 # Return list of actions
758 return await application.get_actions()
759
760 finally:
761 # Disconnect from model and controller
762 await self.disconnect_model(model)
763 await self.disconnect_controller(controller)
764
765 async def get_metrics(self, model_name: str, application_name: str) -> dict:
766 """Get the metrics collected by the VCA.
767
768 :param model_name The name or unique id of the network service
769 :param application_name The name of the application
770 """
771 if not model_name or not application_name:
772 raise Exception("model_name and application_name must be non-empty strings")
773 metrics = {}
774 controller = await self.get_controller()
775 model = await self.get_model(controller, model_name)
776 try:
777 application = self._get_application(model, application_name)
778 if application is not None:
779 metrics = await application.get_metrics()
780 finally:
781 self.disconnect_model(model)
782 self.disconnect_controller(controller)
783 return metrics
784
785 async def add_relation(
786 self,
787 model_name: str,
788 endpoint_1: str,
789 endpoint_2: str,
790 ):
791 """Add relation
792
793 :param: model_name: Model name
794 :param: endpoint_1 First endpoint name
795 ("app:endpoint" format or directly the saas name)
796 :param: endpoint_2: Second endpoint name (^ same format)
797 """
798
799 self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))
800
801 # Get controller
802 controller = await self.get_controller()
803
804 # Get model
805 model = await self.get_model(controller, model_name)
806
807 # Add relation
808 try:
809 await model.add_relation(endpoint_1, endpoint_2)
810 except JujuAPIError as e:
811 if "not found" in e.message:
812 self.log.warning("Relation not found: {}".format(e.message))
813 return
814 if "already exists" in e.message:
815 self.log.warning("Relation already exists: {}".format(e.message))
816 return
817 # another exception, raise it
818 raise e
819 finally:
820 await self.disconnect_model(model)
821 await self.disconnect_controller(controller)
822
823 async def consume(
824 self,
825 offer_url: str,
826 model_name: str,
827 ):
828 """
829 Adds a remote offer to the model. Relations can be created later using "juju relate".
830
831 :param: offer_url: Offer Url
832 :param: model_name: Model name
833
834 :raises ParseError if there's a problem parsing the offer_url
835 :raises JujuError if remote offer includes and endpoint
836 :raises JujuAPIError if the operation is not successful
837 """
838 controller = await self.get_controller()
839 model = await controller.get_model(model_name)
840
841 try:
842 await model.consume(offer_url)
843 finally:
844 await self.disconnect_model(model)
845 await self.disconnect_controller(controller)
846
847 async def destroy_model(self, model_name: str, total_timeout: float):
848 """
849 Destroy model
850
851 :param: model_name: Model name
852 :param: total_timeout: Timeout
853 """
854
855 controller = await self.get_controller()
856 model = None
857 try:
858 model = await self.get_model(controller, model_name)
859 self.log.debug("Destroying model {}".format(model_name))
860 uuid = model.info.uuid
861
862 # Destroy machines that are manually provisioned
863 # and still are in pending state
864 await self._destroy_pending_machines(model, only_manual=True)
865
866 # Disconnect model
867 await self.disconnect_model(model)
868
869 # Destroy model
870 if model_name in self.models:
871 self.models.remove(model_name)
872
873 await controller.destroy_model(uuid, force=True, max_wait=0)
874
875 # Wait until model is destroyed
876 self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
877
878 if total_timeout is None:
879 total_timeout = 3600
880 end = time.time() + total_timeout
881 while time.time() < end:
882 models = await controller.list_models()
883 if model_name not in models:
884 self.log.debug(
885 "The model {} ({}) was destroyed".format(model_name, uuid)
886 )
887 return
888 await asyncio.sleep(5)
889 raise Exception(
890 "Timeout waiting for model {} to be destroyed".format(model_name)
891 )
892 except Exception as e:
893 if model:
894 await self.disconnect_model(model)
895 raise e
896 finally:
897 await self.disconnect_controller(controller)
898
899 async def destroy_application(self, model: Model, application_name: str):
900 """
901 Destroy application
902
903 :param: model: Model object
904 :param: application_name: Application name
905 """
906 self.log.debug(
907 "Destroying application {} in model {}".format(
908 application_name, model.info.name
909 )
910 )
911 application = model.applications.get(application_name)
912 if application:
913 await application.destroy()
914 else:
915 self.log.warning("Application not found: {}".format(application_name))
916
917 async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
918 """
919 Destroy pending machines in a given model
920
921 :param: only_manual: Bool that indicates only manually provisioned
922 machines should be destroyed (if True), or that
923 all pending machines should be destroyed
924 """
925 status = await model.get_status()
926 for machine_id in status.machines:
927 machine_status = status.machines[machine_id]
928 if machine_status.agent_status.status == "pending":
929 if only_manual and not machine_status.instance_id.startswith("manual:"):
930 break
931 machine = model.machines[machine_id]
932 await machine.destroy(force=True)
933
934 # async def destroy_machine(
935 # self, model: Model, machine_id: str, total_timeout: float = 3600
936 # ):
937 # """
938 # Destroy machine
939
940 # :param: model: Model object
941 # :param: machine_id: Machine id
942 # :param: total_timeout: Timeout in seconds
943 # """
944 # machines = await model.get_machines()
945 # if machine_id in machines:
946 # machine = machines[machine_id]
947 # await machine.destroy(force=True)
948 # # max timeout
949 # end = time.time() + total_timeout
950
951 # # wait for machine removal
952 # machines = await model.get_machines()
953 # while machine_id in machines and time.time() < end:
954 # self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
955 # await asyncio.sleep(0.5)
956 # machines = await model.get_machines()
957 # self.log.debug("Machine destroyed: {}".format(machine_id))
958 # else:
959 # self.log.debug("Machine not found: {}".format(machine_id))
960
961 async def configure_application(
962 self, model_name: str, application_name: str, config: dict = None
963 ):
964 """Configure application
965
966 :param: model_name: Model name
967 :param: application_name: Application name
968 :param: config: Config to apply to the charm
969 """
970 self.log.debug("Configuring application {}".format(application_name))
971
972 if config:
973 controller = await self.get_controller()
974 model = None
975 try:
976 model = await self.get_model(controller, model_name)
977 application = self._get_application(
978 model,
979 application_name=application_name,
980 )
981 await application.set_config(config)
982 finally:
983 if model:
984 await self.disconnect_model(model)
985 await self.disconnect_controller(controller)
986
987 def _get_api_endpoints_db(self) -> [str]:
988 """
989 Get API Endpoints from DB
990
991 :return: List of API endpoints
992 """
993 self.log.debug("Getting endpoints from database")
994
995 juju_info = self.db.get_one(
996 DB_DATA.api_endpoints.table,
997 q_filter=DB_DATA.api_endpoints.filter,
998 fail_on_empty=False,
999 )
1000 if juju_info and DB_DATA.api_endpoints.key in juju_info:
1001 return juju_info[DB_DATA.api_endpoints.key]
1002
1003 def _update_api_endpoints_db(self, endpoints: [str]):
1004 """
1005 Update API endpoints in Database
1006
1007 :param: List of endpoints
1008 """
1009 self.log.debug("Saving endpoints {} in database".format(endpoints))
1010
1011 juju_info = self.db.get_one(
1012 DB_DATA.api_endpoints.table,
1013 q_filter=DB_DATA.api_endpoints.filter,
1014 fail_on_empty=False,
1015 )
1016 # If it doesn't, then create it
1017 if not juju_info:
1018 try:
1019 self.db.create(
1020 DB_DATA.api_endpoints.table,
1021 DB_DATA.api_endpoints.filter,
1022 )
1023 except DbException as e:
1024 # Racing condition: check if another N2VC worker has created it
1025 juju_info = self.db.get_one(
1026 DB_DATA.api_endpoints.table,
1027 q_filter=DB_DATA.api_endpoints.filter,
1028 fail_on_empty=False,
1029 )
1030 if not juju_info:
1031 raise e
1032 self.db.set_one(
1033 DB_DATA.api_endpoints.table,
1034 DB_DATA.api_endpoints.filter,
1035 {DB_DATA.api_endpoints.key: endpoints},
1036 )
1037
1038 def handle_exception(self, loop, context):
1039 # All unhandled exceptions by libjuju are handled here.
1040 pass
1041
1042 async def health_check(self, interval: float = 300.0):
1043 """
1044 Health check to make sure controller and controller_model connections are OK
1045
1046 :param: interval: Time in seconds between checks
1047 """
1048 controller = None
1049 while True:
1050 try:
1051 controller = await self.get_controller()
1052 # self.log.debug("VCA is alive")
1053 except Exception as e:
1054 self.log.error("Health check to VCA failed: {}".format(e))
1055 finally:
1056 await self.disconnect_controller(controller)
1057 await asyncio.sleep(interval)
1058
1059 async def list_models(self, contains: str = None) -> [str]:
1060 """List models with certain names
1061
1062 :param: contains: String that is contained in model name
1063
1064 :retur: [models] Returns list of model names
1065 """
1066
1067 controller = await self.get_controller()
1068 try:
1069 models = await controller.list_models()
1070 if contains:
1071 models = [model for model in models if contains in model]
1072 return models
1073 finally:
1074 await self.disconnect_controller(controller)
1075
1076 async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
1077 """List models with certain names
1078
1079 :param: model_name: Model name
1080
1081 :return: Returns list of offers
1082 """
1083
1084 controller = await self.get_controller()
1085 try:
1086 return await controller.list_offers(model_name)
1087 finally:
1088 await self.disconnect_controller(controller)
1089
1090 async def add_k8s(
1091 self,
1092 name: str,
1093 rbac_id: str,
1094 token: str,
1095 client_cert_data: str,
1096 configuration: Configuration,
1097 storage_class: str,
1098 credential_name: str = None,
1099 ):
1100 """
1101 Add a Kubernetes cloud to the controller
1102
1103 Similar to the `juju add-k8s` command in the CLI
1104
1105 :param: name: Name for the K8s cloud
1106 :param: configuration: Kubernetes configuration object
1107 :param: storage_class: Storage Class to use in the cloud
1108 :param: credential_name: Storage Class to use in the cloud
1109 """
1110
1111 if not storage_class:
1112 raise Exception("storage_class must be a non-empty string")
1113 if not name:
1114 raise Exception("name must be a non-empty string")
1115 if not configuration:
1116 raise Exception("configuration must be provided")
1117
1118 endpoint = configuration.host
1119 credential = self.get_k8s_cloud_credential(
1120 configuration,
1121 client_cert_data,
1122 token,
1123 )
1124 credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id
1125 cloud = client.Cloud(
1126 type_="kubernetes",
1127 auth_types=[credential.auth_type],
1128 endpoint=endpoint,
1129 ca_certificates=[client_cert_data],
1130 config={
1131 "operator-storage": storage_class,
1132 "workload-storage": storage_class,
1133 },
1134 )
1135
1136 return await self.add_cloud(
1137 name, cloud, credential, credential_name=credential_name
1138 )
1139
1140 def get_k8s_cloud_credential(
1141 self,
1142 configuration: Configuration,
1143 client_cert_data: str,
1144 token: str = None,
1145 ) -> client.CloudCredential:
1146 attrs = {}
1147 # TODO: Test with AKS
1148 key = None # open(configuration.key_file, "r").read()
1149 username = configuration.username
1150 password = configuration.password
1151
1152 if client_cert_data:
1153 attrs["ClientCertificateData"] = client_cert_data
1154 if key:
1155 attrs["ClientKeyData"] = key
1156 if token:
1157 if username or password:
1158 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1159 attrs["Token"] = token
1160
1161 auth_type = None
1162 if key:
1163 auth_type = "oauth2"
1164 if client_cert_data:
1165 auth_type = "oauth2withcert"
1166 if not token:
1167 raise JujuInvalidK8sConfiguration(
1168 "missing token for auth type {}".format(auth_type)
1169 )
1170 elif username:
1171 if not password:
1172 self.log.debug(
1173 "credential for user {} has empty password".format(username)
1174 )
1175 attrs["username"] = username
1176 attrs["password"] = password
1177 if client_cert_data:
1178 auth_type = "userpasswithcert"
1179 else:
1180 auth_type = "userpass"
1181 elif client_cert_data and token:
1182 auth_type = "certificate"
1183 else:
1184 raise JujuInvalidK8sConfiguration("authentication method not supported")
1185 return client.CloudCredential(auth_type=auth_type, attrs=attrs)
1186
1187 async def add_cloud(
1188 self,
1189 name: str,
1190 cloud: Cloud,
1191 credential: CloudCredential = None,
1192 credential_name: str = None,
1193 ) -> Cloud:
1194 """
1195 Add cloud to the controller
1196
1197 :param: name: Name of the cloud to be added
1198 :param: cloud: Cloud object
1199 :param: credential: CloudCredentials object for the cloud
1200 :param: credential_name: Credential name.
1201 If not defined, cloud of the name will be used.
1202 """
1203 controller = await self.get_controller()
1204 try:
1205 _ = await controller.add_cloud(name, cloud)
1206 if credential:
1207 await controller.add_credential(
1208 credential_name or name, credential=credential, cloud=name
1209 )
1210 # Need to return the object returned by the controller.add_cloud() function
1211 # I'm returning the original value now until this bug is fixed:
1212 # https://github.com/juju/python-libjuju/issues/443
1213 return cloud
1214 finally:
1215 await self.disconnect_controller(controller)
1216
1217 async def remove_cloud(self, name: str):
1218 """
1219 Remove cloud
1220
1221 :param: name: Name of the cloud to be removed
1222 """
1223 controller = await self.get_controller()
1224 try:
1225 await controller.remove_cloud(name)
1226 finally:
1227 await self.disconnect_controller(controller)
1228
1229 async def _get_leader_unit(self, application: Application) -> Unit:
1230 unit = None
1231 for u in application.units:
1232 if await u.is_leader_from_status():
1233 unit = u
1234 break
1235 return unit
1236
1237 async def get_cloud_credentials(self, cloud_name: str, credential_name: str):
1238 controller = await self.get_controller()
1239 try:
1240 facade = client.CloudFacade.from_connection(controller.connection())
1241 cloud_cred_tag = tag.credential(cloud_name, self.username, credential_name)
1242 params = [client.Entity(cloud_cred_tag)]
1243 return (await facade.Credential(params)).results
1244 finally:
1245 await self.disconnect_controller(controller)