12730fde98bb795d51de9011fde8c066f1062e57
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17 from juju.controller import Controller
18 from juju.client import client
19 import time
20
21 from juju.errors import JujuAPIError
22 from juju.model import Model
23 from juju.machine import Machine
24 from juju.application import Application
25 from juju.client._definitions import (
26 FullStatus,
27 QueryApplicationOffersResults,
28 Cloud,
29 CloudCredential,
30 )
31 from n2vc.juju_watcher import JujuModelWatcher
32 from n2vc.provisioner import AsyncSSHProvisioner
33 from n2vc.n2vc_conn import N2VCConnector
34 from n2vc.exceptions import (
35 JujuMachineNotFound,
36 JujuApplicationNotFound,
37 JujuLeaderUnitNotFound,
38 JujuActionNotFound,
39 JujuModelAlreadyExists,
40 JujuControllerFailedConnecting,
41 JujuApplicationExists,
42 )
43 from n2vc.utils import DB_DATA
44 from osm_common.dbbase import DbException
45
46
class Libjuju:
    """
    Asynchronous helper around python-libjuju

    Every operation opens its own controller (and, when needed, model)
    connection and closes it before returning. The state kept between calls is
    the list of API endpoints, the names of the models created through this
    object and the periodic health-check task.
    """

    def __init__(
        self,
        endpoint: str,
        api_proxy: str,
        username: str,
        password: str,
        cacert: str,
        loop: asyncio.AbstractEventLoop = None,
        log: logging.Logger = None,
        db: dict = None,
        n2vc: N2VCConnector = None,
        apt_mirror: str = None,
        enable_os_upgrade: bool = True,
    ):
        """
        Constructor

        :param: endpoint:           Endpoint of the juju controller (host:port)
        :param: api_proxy:          Endpoint of the juju controller - Reachable from the VNFs
        :param: username:           Juju username
        :param: password:           Juju password
        :param: cacert:             Juju CA Certificate
        :param: loop:               Asyncio loop
        :param: log:                Logger
        :param: db:                 DB object
        :param: n2vc:               N2VC object
        :param: apt_mirror:         APT Mirror
        :param: enable_os_upgrade:  Enable OS Upgrade
        """

        self.log = log or logging.getLogger("Libjuju")
        self.db = db
        db_endpoints = self._get_api_endpoints_db()
        self.endpoints = db_endpoints or [endpoint]
        # Store the endpoint whenever the database had nothing usable
        # (missing entry or empty list), not only when the entry was missing.
        if not db_endpoints:
            self._update_api_endpoints_db(self.endpoints)
        self.api_proxy = api_proxy
        self.username = username
        self.password = password
        self.cacert = cacert
        self.loop = loop or asyncio.get_event_loop()
        self.n2vc = n2vc

        # Generate config for models
        self.model_config = {}
        if apt_mirror:
            self.model_config["apt-mirror"] = apt_mirror
        self.model_config["enable-os-refresh-update"] = enable_os_upgrade
        self.model_config["enable-os-upgrade"] = enable_os_upgrade

        self.loop.set_exception_handler(self.handle_exception)
        try:
            self.creating_model = asyncio.Lock(loop=self.loop)
        except TypeError:
            # The "loop" argument was removed from asyncio primitives in
            # Python 3.10; there the lock binds to the running loop on first use.
            self.creating_model = asyncio.Lock()

        self.models = set()
        self.log.debug("Libjuju initialized!")

        self.health_check_task = self.loop.create_task(self.health_check())

    async def get_controller(self, timeout: float = 5.0) -> Controller:
        """
        Get controller

        Connects to the known endpoints and, if the controller reports a
        different endpoint list, stores the new list in memory and database.

        :param: timeout: Time in seconds to wait for controller to connect

        :return: Connected controller. The caller must disconnect it.
        :raises JujuControllerFailedConnecting: if the connection fails
        """
        controller = None
        try:
            controller = Controller(loop=self.loop)
            await asyncio.wait_for(
                controller.connect(
                    endpoint=self.endpoints,
                    username=self.username,
                    password=self.password,
                    cacert=self.cacert,
                ),
                timeout=timeout,
            )
            endpoints = await controller.api_endpoints
            if self.endpoints != endpoints:
                self.endpoints = endpoints
                self._update_api_endpoints_db(self.endpoints)
            return controller
        except asyncio.CancelledError:
            raise
        except Exception as e:
            self.log.error(
                "Failed connecting to controller: {}...".format(self.endpoints)
            )
            if controller:
                await self.disconnect_controller(controller)
            # Keep the original error attached as the cause
            raise JujuControllerFailedConnecting(e) from e

    async def disconnect(self):
        """Disconnect"""
        # Cancel health check task
        self.health_check_task.cancel()
        self.log.debug("Libjuju disconnected!")

    async def disconnect_model(self, model: Model):
        """
        Disconnect model

        :param: model: Model that will be disconnected
        """
        await model.disconnect()

    async def disconnect_controller(self, controller: Controller):
        """
        Disconnect controller

        :param: controller: Controller that will be disconnected
        """
        await controller.disconnect()

    async def _close_connections(self, controller: Controller, model: Model = None):
        """
        Disconnect a model (if any) and then its controller

        The controller is disconnected even if disconnecting the model fails.

        :param: controller: Controller to disconnect (ignored if None)
        :param: model:      Model to disconnect (ignored if None)
        """
        try:
            if model is not None:
                await self.disconnect_model(model)
        finally:
            if controller is not None:
                await self.disconnect_controller(controller)

    async def add_model(self, model_name: str, cloud_name: str):
        """
        Create model

        :param: model_name: Model name
        :param: cloud_name: Cloud name (also used as credential name)

        :raises JujuModelAlreadyExists: if the controller already has the model
        """

        # Get controller
        controller = await self.get_controller()
        model = None
        try:
            # Raise exception if model already exists
            if await self.model_exists(model_name, controller=controller):
                raise JujuModelAlreadyExists(
                    "Model {} already exists.".format(model_name)
                )

            # Wait on the lock until other workers have finished model creation
            async with self.creating_model:
                # Nothing to do if it was created through this object meanwhile
                if model_name in self.models:
                    return

                # Create the model
                self.log.debug("Creating model {}".format(model_name))
                model = await controller.add_model(
                    model_name,
                    config=self.model_config,
                    cloud_name=cloud_name,
                    credential_name=cloud_name,
                )
                self.models.add(model_name)
        finally:
            await self._close_connections(controller, model)

    async def get_model(
        self, controller: Controller, model_name: str, id=None
    ) -> Model:
        """
        Get model from controller

        :param: controller: Controller
        :param: model_name: Model name
        :param: id:         Accepted for compatibility; not used

        :return: Model: The created Juju model object
        """
        return await controller.get_model(model_name)

    async def model_exists(
        self, model_name: str, controller: Controller = None
    ) -> bool:
        """
        Check if model exists

        :param: model_name: Model name
        :param: controller: Controller. If not given, a temporary connection
                            is opened and closed by this method.

        :return bool
        """
        need_to_disconnect = False

        # Get controller if not passed
        if not controller:
            controller = await self.get_controller()
            need_to_disconnect = True

        # Check if model exists
        try:
            return model_name in await controller.list_models()
        finally:
            if need_to_disconnect:
                await self.disconnect_controller(controller)

    async def models_exist(self, model_names: [str]) -> (bool, list):
        """
        Check if models exists

        :param: model_names: List of strings with model names

        :return (bool, list[str]): (True if all models exists, List of model names
                                    that don't exist, in the order given and
                                    without duplicates)
        :raises Exception: if model_names is empty
        """
        if not model_names:
            raise Exception(
                "model_names must be a non-empty array. Given value: {}".format(
                    model_names
                )
            )
        existing_models = set(await self.list_models())
        # dict.fromkeys drops duplicates while keeping the caller's order
        non_existing_models = [
            name for name in dict.fromkeys(model_names) if name not in existing_models
        ]

        return (
            not non_existing_models,
            non_existing_models,
        )

    async def get_model_status(self, model_name: str) -> FullStatus:
        """
        Get model status

        :param: model_name: Model name

        :return: Full status object
        """
        controller = await self.get_controller()
        model = None
        try:
            # Inside the try so the controller is released if this fails
            model = await self.get_model(controller, model_name)
            return await model.get_status()
        finally:
            await self._close_connections(controller, model)

    async def create_machine(
        self,
        model_name: str,
        machine_id: str = None,
        db_dict: dict = None,
        progress_timeout: float = None,
        total_timeout: float = None,
        series: str = "xenial",
        wait: bool = True,
    ) -> (Machine, bool):
        """
        Create machine

        :param: model_name:         Model name
        :param: machine_id:         Machine id. If given, the existing machine
                                    is returned instead of creating a new one.
        :param: db_dict:            Dictionary with data of the DB to write the updates
        :param: progress_timeout:   Maximum time between two updates in the model
        :param: total_timeout:      Timeout for the entity to be active
        :param: series:             Series of the machine (xenial, bionic, focal, ...)
        :param: wait:               Wait until machine is ready

        :return: (juju.machine.Machine, bool):  Machine object and a boolean saying
                                                if the machine is new or it already existed
        :raises JujuMachineNotFound: if machine_id is given but not in the model
        """
        new = False
        machine = None

        self.log.debug(
            "Creating machine (id={}) in model: {}".format(machine_id, model_name)
        )

        # Get controller
        controller = await self.get_controller()
        model = None
        try:
            # Get model
            model = await self.get_model(controller, model_name)

            if machine_id is not None:
                self.log.debug(
                    "Searching machine (id={}) in model {}".format(
                        machine_id, model_name
                    )
                )

                # Get machines from model and get the machine with machine_id if exists
                machines = await model.get_machines()
                if machine_id not in machines:
                    raise JujuMachineNotFound("Machine {} not found".format(machine_id))
                self.log.debug(
                    "Machine (id={}) found in model {}".format(machine_id, model_name)
                )
                # Look the entity up in model.machines, as deploy_charm and
                # provision_machine do, instead of indexing get_machines()'s result
                machine = model.machines[machine_id]

            if machine is None:
                self.log.debug("Creating a new machine in model {}".format(model_name))

                # Create machine
                machine = await model.add_machine(
                    spec=None, constraints=None, disks=None, series=series
                )
                new = True

            if wait:
                # Wait until the machine is ready
                self.log.debug(
                    "Wait until machine {} is ready in model {}".format(
                        machine.entity_id, model_name
                    )
                )
                await JujuModelWatcher.wait_for(
                    model=model,
                    entity=machine,
                    progress_timeout=progress_timeout,
                    total_timeout=total_timeout,
                    db_dict=db_dict,
                    n2vc=self.n2vc,
                )
        finally:
            await self._close_connections(controller, model)

        self.log.debug(
            "Machine {} ready at {} in model {}".format(
                machine.entity_id, machine.dns_name, model_name
            )
        )
        return machine, new

    async def provision_machine(
        self,
        model_name: str,
        hostname: str,
        username: str,
        private_key_path: str,
        db_dict: dict = None,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> str:
        """
        Manually provisioning of a machine

        :param: model_name:         Model name
        :param: hostname:           IP to access the machine
        :param: username:           Username to login to the machine
        :param: private_key_path:   Local path for the private key
        :param: db_dict:            Dictionary with data of the DB to write the updates
        :param: progress_timeout:   Maximum time between two updates in the model
        :param: total_timeout:      Timeout for the entity to be active

        :return: (Entity): Machine id
        :raises ValueError: if the controller rejects the new machine
        :raises JujuMachineNotFound: if the machine does not show up in the model
        """
        self.log.debug(
            "Provisioning machine. model: {}, hostname: {}, username: {}".format(
                model_name, hostname, username
            )
        )

        # Get controller
        controller = await self.get_controller()
        model = None
        try:
            # Get model
            model = await self.get_model(controller, model_name)

            # Get provisioner
            provisioner = AsyncSSHProvisioner(
                host=hostname,
                user=username,
                private_key_path=private_key_path,
                log=self.log,
            )

            # Provision machine
            params = await provisioner.provision_machine()

            params.jobs = ["JobHostUnits"]

            self.log.debug("Adding machine to model")
            connection = model.connection()
            client_facade = client.ClientFacade.from_connection(connection)

            results = await client_facade.AddMachines(params=[params])
            error = results.machines[0].error

            if error:
                msg = "Error adding machine: {}".format(error.message)
                self.log.error(msg=msg)
                raise ValueError(msg)

            machine_id = results.machines[0].machine

            self.log.debug("Installing Juju agent into machine {}".format(machine_id))
            # NOTE(review): the agent installation is scheduled in the background
            # and never awaited here; presumably the wait below detects when the
            # machine becomes ready - confirm.
            asyncio.ensure_future(
                provisioner.install_agent(
                    connection=connection,
                    nonce=params.nonce,
                    machine_id=machine_id,
                    proxy=self.api_proxy,
                )
            )

            # Poll (up to 10 times, 2 seconds apart) until the model knows the machine
            machine = None
            for _ in range(10):
                machine_list = await model.get_machines()
                if machine_id in machine_list:
                    self.log.debug("Machine {} found in model!".format(machine_id))
                    machine = model.machines.get(machine_id)
                    break
                await asyncio.sleep(2)

            if machine is None:
                msg = "Machine {} not found in model".format(machine_id)
                self.log.error(msg=msg)
                raise JujuMachineNotFound(msg)

            self.log.debug(
                "Wait until machine {} is ready in model {}".format(
                    machine.entity_id, model_name
                )
            )
            await JujuModelWatcher.wait_for(
                model=model,
                entity=machine,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout,
                db_dict=db_dict,
                n2vc=self.n2vc,
            )
        finally:
            await self._close_connections(controller, model)

        self.log.debug(
            "Machine provisioned {} in model {}".format(machine_id, model_name)
        )

        return machine_id

    async def deploy_charm(
        self,
        application_name: str,
        path: str,
        model_name: str,
        machine_id: str,
        db_dict: dict = None,
        progress_timeout: float = None,
        total_timeout: float = None,
        config: dict = None,
        series: str = None,
        num_units: int = 1,
    ):
        """Deploy charm

        :param: application_name:   Application name
        :param: path:               Local path to the charm
        :param: model_name:         Model name
        :param: machine_id          ID of the machine
        :param: db_dict:            Dictionary with data of the DB to write the updates
        :param: progress_timeout:   Maximum time between two updates in the model
        :param: total_timeout:      Timeout for the entity to be active
        :param: config:             Config for the charm
        :param: series:             Series of the charm (overridden by the
                                    machine's series when machine_id is given)
        :param: num_units:          Number of units

        :return: (juju.application.Application): Juju application
        :raises JujuApplicationExists: if the application is already in the model
        :raises JujuMachineNotFound: if machine_id is not in the model
        """
        self.log.debug(
            "Deploying charm {} to machine {} in model ~{}".format(
                application_name, machine_id, model_name
            )
        )
        self.log.debug("charm: {}".format(path))

        # Get controller
        controller = await self.get_controller()
        model = None
        try:
            # Get model
            model = await self.get_model(controller, model_name)

            if application_name in model.applications:
                raise JujuApplicationExists(
                    "Application {} exists".format(application_name)
                )

            if machine_id is not None:
                if machine_id not in model.machines:
                    msg = "Machine {} not found in model".format(machine_id)
                    self.log.error(msg=msg)
                    raise JujuMachineNotFound(msg)
                machine = model.machines[machine_id]
                series = machine.series

            # The first unit is deployed here; the others are added below
            application = await model.deploy(
                entity_url=path,
                application_name=application_name,
                channel="stable",
                num_units=1,
                series=series,
                to=machine_id,
                config=config,
            )

            self.log.debug(
                "Wait until application {} is ready in model {}".format(
                    application_name, model_name
                )
            )
            if num_units > 1:
                # Create the extra machines with the application's series when
                # it is known, instead of create_machine's default series, so
                # the units are not placed on a different OS release.
                machine_kwargs = {"series": series} if series else {}
                for _ in range(num_units - 1):
                    extra_machine, _is_new = await self.create_machine(
                        model_name, wait=False, **machine_kwargs
                    )
                    await application.add_unit(to=extra_machine.entity_id)

            await JujuModelWatcher.wait_for(
                model=model,
                entity=application,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout,
                db_dict=db_dict,
                n2vc=self.n2vc,
            )
            self.log.debug(
                "Application {} is ready in model {}".format(
                    application_name, model_name
                )
            )
        finally:
            await self._close_connections(controller, model)

        return application

    def _get_application(self, model: Model, application_name: str) -> Application:
        """Get application

        :param: model:              Model object
        :param: application_name:   Application name

        :return: juju.application.Application (or None if it doesn't exist)
        """
        if model.applications and application_name in model.applications:
            return model.applications[application_name]
        return None

    async def execute_action(
        self,
        application_name: str,
        model_name: str,
        action_name: str,
        db_dict: dict = None,
        progress_timeout: float = None,
        total_timeout: float = None,
        **kwargs
    ):
        """Execute action

        The action is run on the leader unit of the application.

        :param: application_name:   Application name
        :param: model_name:         Model name
        :param: action_name:        Name of the action
        :param: db_dict:            Dictionary with data of the DB to write the updates
        :param: progress_timeout:   Maximum time between two updates in the model
        :param: total_timeout:      Timeout for the entity to be active
        :param: kwargs:             Parameters passed to the action

        :return: (str, str): (output and status)
        :raises JujuApplicationNotFound: if the application is not in the model
        :raises JujuLeaderUnitNotFound: if no unit reports being the leader
        :raises JujuActionNotFound: if the application has no such action
        """
        self.log.debug(
            "Executing action {} using params {}".format(action_name, kwargs)
        )
        # Get controller
        controller = await self.get_controller()
        model = None
        try:
            # Get model
            model = await self.get_model(controller, model_name)

            # Get application
            application = self._get_application(
                model, application_name=application_name,
            )
            if application is None:
                raise JujuApplicationNotFound("Cannot execute action")

            # Get unit
            unit = None
            for u in application.units:
                if await u.is_leader_from_status():
                    unit = u
                    # No need to query the remaining units
                    break
            if unit is None:
                raise JujuLeaderUnitNotFound(
                    "Cannot execute action: leader unit not found"
                )

            actions = await application.get_actions()

            if action_name not in actions:
                raise JujuActionNotFound(
                    "Action {} not in available actions".format(action_name)
                )

            action = await unit.run_action(action_name, **kwargs)

            self.log.debug(
                "Wait until action {} is completed in application {} (model={})".format(
                    action_name, application_name, model_name
                )
            )
            await JujuModelWatcher.wait_for(
                model=model,
                entity=action,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout,
                db_dict=db_dict,
                n2vc=self.n2vc,
            )

            output = await model.get_action_output(action_uuid=action.entity_id)
            status = await model.get_action_status(uuid_or_prefix=action.entity_id)
            # An action missing from the status report is treated as failed
            status = (
                status[action.entity_id] if action.entity_id in status else "failed"
            )

            self.log.debug(
                "Action {} completed with status {} in application {} (model={})".format(
                    action_name, action.status, application_name, model_name
                )
            )
        finally:
            await self._close_connections(controller, model)

        return output, status

    async def get_actions(self, application_name: str, model_name: str) -> dict:
        """Get list of actions

        :param: application_name: Application name
        :param: model_name: Model name

        :return: Dict with this format
            {
                "action_name": "Description of the action",
                ...
            }
        :raises JujuApplicationNotFound: if the application is not in the model
        """
        self.log.debug(
            "Getting list of actions for application {}".format(application_name)
        )

        # Get controller
        controller = await self.get_controller()
        model = None
        try:
            # Get model
            model = await self.get_model(controller, model_name)

            # Get application
            application = self._get_application(
                model, application_name=application_name,
            )
            if application is None:
                raise JujuApplicationNotFound(
                    "Cannot get actions: application {} not found".format(
                        application_name
                    )
                )

            # Return list of actions
            return await application.get_actions()

        finally:
            # Disconnect from model and controller
            await self._close_connections(controller, model)

    async def add_relation(
        self, model_name: str, endpoint_1: str, endpoint_2: str,
    ):
        """Add relation

        API errors whose message contains "not found" or "already exists" are
        logged as warnings and not raised.

        :param: model_name:     Model name
        :param: endpoint_1      First endpoint name
                                ("app:endpoint" format or directly the saas name)
        :param: endpoint_2:     Second endpoint name (^ same format)

        :raises JujuAPIError: for any other API error
        """

        self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))

        # Get controller
        controller = await self.get_controller()
        model = None
        try:
            # Get model (outside the JujuAPIError handler below, so that a
            # model error is not reported as a relation warning)
            model = await self.get_model(controller, model_name)

            # Add relation
            try:
                await model.add_relation(endpoint_1, endpoint_2)
            except JujuAPIError as e:
                if "not found" in e.message:
                    self.log.warning("Relation not found: {}".format(e.message))
                    return
                if "already exists" in e.message:
                    self.log.warning("Relation already exists: {}".format(e.message))
                    return
                # another exception, raise it
                raise
        finally:
            await self._close_connections(controller, model)

    async def consume(
        self, offer_url: str, model_name: str,
    ):
        """
        Adds a remote offer to the model. Relations can be created later using "juju relate".

        :param: offer_url:      Offer Url
        :param: model_name:     Model name

        :raises ParseError if there's a problem parsing the offer_url
        :raises JujuError if remote offer includes and endpoint
        :raises JujuAPIError if the operation is not successful
        """
        controller = await self.get_controller()
        model = None
        try:
            model = await self.get_model(controller, model_name)
            await model.consume(offer_url)
        finally:
            await self._close_connections(controller, model)

    async def destroy_model(self, model_name: str, total_timeout: float):
        """
        Destroy model

        Machines are destroyed first (best effort), then the model itself, and
        finally the controller is polled until the model is no longer listed.

        :param: model_name:     Model name
        :param: total_timeout:  Timeout in seconds (3600 if None). It is applied
                                to each machine and to the model removal.

        :raises Exception: if the model is still listed after the timeout
        """

        controller = await self.get_controller()
        model = None
        try:
            model = await self.get_model(controller, model_name)
            self.log.debug("Destroying model {}".format(model_name))
            uuid = model.info.uuid

            # Resolve the default before it is handed to destroy_machine
            if total_timeout is None:
                total_timeout = 3600

            # Destroy machines
            machines = await model.get_machines()
            for machine_id in machines:
                try:
                    await self.destroy_machine(
                        model, machine_id=machine_id, total_timeout=total_timeout,
                    )
                except asyncio.CancelledError:
                    raise
                except Exception as e:
                    # Best effort: keep going, but leave a trace of the failure
                    self.log.warning(
                        "Error destroying machine {} in model {}: {}".format(
                            machine_id, model_name, e
                        )
                    )

            # Disconnect model (cleared first so it is not disconnected twice)
            connected_model, model = model, None
            await self.disconnect_model(connected_model)

            # Destroy model
            if model_name in self.models:
                self.models.remove(model_name)

            await controller.destroy_model(uuid)

            # Wait until model is destroyed
            self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
            last_exception = ""

            end = time.time() + total_timeout
            while time.time() < end:
                try:
                    models = await controller.list_models()
                    if model_name not in models:
                        self.log.debug(
                            "The model {} ({}) was destroyed".format(model_name, uuid)
                        )
                        return
                except asyncio.CancelledError:
                    raise
                except Exception as e:
                    last_exception = e
                await asyncio.sleep(5)
            raise Exception(
                "Timeout waiting for model {} to be destroyed {}".format(
                    model_name, last_exception
                )
            )
        finally:
            await self._close_connections(controller, model)

    async def destroy_application(self, model: Model, application_name: str):
        """
        Destroy application

        :param: model:              Model object
        :param: application_name:   Application name
        """
        self.log.debug(
            "Destroying application {} in model {}".format(
                application_name, model.info.name
            )
        )
        application = model.applications.get(application_name)
        if application:
            await application.destroy()
        else:
            self.log.warning("Application not found: {}".format(application_name))

    async def destroy_machine(
        self, model: Model, machine_id: str, total_timeout: float = 3600
    ):
        """
        Destroy machine

        :param: model:          Model object
        :param: machine_id:     Machine id
        :param: total_timeout:  Timeout in seconds (3600 if None)
        """
        if total_timeout is None:
            total_timeout = 3600

        machines = await model.get_machines()
        if machine_id not in machines:
            self.log.debug("Machine not found: {}".format(machine_id))
            return

        # Look the entity up in model.machines, as deploy_charm and
        # provision_machine do, instead of indexing get_machines()'s result
        machine = model.machines[machine_id]
        await machine.destroy(force=True)
        # max timeout
        end = time.time() + total_timeout

        # wait for machine removal
        machines = await model.get_machines()
        while machine_id in machines and time.time() < end:
            self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
            await asyncio.sleep(0.5)
            machines = await model.get_machines()

        if machine_id in machines:
            self.log.warning(
                "Timeout waiting for machine {} to be destroyed".format(machine_id)
            )
        else:
            self.log.debug("Machine destroyed: {}".format(machine_id))

    async def configure_application(
        self, model_name: str, application_name: str, config: dict = None
    ):
        """Configure application

        Nothing is done (no connection is opened) when config is empty.

        :param: model_name:         Model name
        :param: application_name:   Application name
        :param: config:             Config to apply to the charm

        :raises JujuApplicationNotFound: if the application is not in the model
        """
        self.log.debug("Configuring application {}".format(application_name))

        if not config:
            return

        controller = await self.get_controller()
        model = None
        try:
            model = await self.get_model(controller, model_name)
            application = self._get_application(
                model, application_name=application_name,
            )
            if application is None:
                raise JujuApplicationNotFound(
                    "Cannot configure application: {} not found".format(
                        application_name
                    )
                )
            await application.set_config(config)
        finally:
            await self._close_connections(controller, model)

    def _get_api_endpoints_db(self) -> [str]:
        """
        Get API Endpoints from DB

        :return: List of API endpoints (None if they are not stored)
        """
        self.log.debug("Getting endpoints from database")

        juju_info = self.db.get_one(
            DB_DATA.api_endpoints.table,
            q_filter=DB_DATA.api_endpoints.filter,
            fail_on_empty=False,
        )
        if juju_info and DB_DATA.api_endpoints.key in juju_info:
            return juju_info[DB_DATA.api_endpoints.key]
        return None

    def _update_api_endpoints_db(self, endpoints: [str]):
        """
        Update API endpoints in Database

        :param: List of endpoints
        """
        self.log.debug("Saving endpoints {} in database".format(endpoints))

        juju_info = self.db.get_one(
            DB_DATA.api_endpoints.table,
            q_filter=DB_DATA.api_endpoints.filter,
            fail_on_empty=False,
        )
        # If it doesn't, then create it
        if not juju_info:
            try:
                self.db.create(
                    DB_DATA.api_endpoints.table, DB_DATA.api_endpoints.filter,
                )
            except DbException as e:
                # Racing condition: check if another N2VC worker has created it
                juju_info = self.db.get_one(
                    DB_DATA.api_endpoints.table,
                    q_filter=DB_DATA.api_endpoints.filter,
                    fail_on_empty=False,
                )
                if not juju_info:
                    raise e
        self.db.set_one(
            DB_DATA.api_endpoints.table,
            DB_DATA.api_endpoints.filter,
            {DB_DATA.api_endpoints.key: endpoints},
        )

    def handle_exception(self, loop, context):
        """
        Event loop exception handler (installed in the constructor)

        :param: loop:       Event loop that reports the exception
        :param: context:    Dictionary describing the error
        """
        # All unhandled exceptions by libjuju are handled here. They are not
        # propagated, only traced at debug level.
        self.log.debug(
            "Unhandled exception in event loop: {}".format(context.get("message"))
        )

    async def health_check(self, interval: float = 300.0):
        """
        Health check to make sure controller and controller_model connections are OK

        Runs until cancelled. A failed check is logged and retried after the
        interval.

        :param: interval: Time in seconds between checks
        """
        while True:
            # Reset on every round: there is nothing to disconnect if connecting fails
            controller = None
            try:
                controller = await self.get_controller()
                # self.log.debug("VCA is alive")
            except asyncio.CancelledError:
                # Needed where CancelledError is still a subclass of Exception
                raise
            except Exception as e:
                self.log.error("Health check to VCA failed: {}".format(e))
            finally:
                if controller is not None:
                    await self.disconnect_controller(controller)
            await asyncio.sleep(interval)

    async def list_models(self, contains: str = None) -> [str]:
        """List models with certain names

        :param: contains:   String that is contained in model name

        :return: [models] Returns list of model names
        """

        controller = await self.get_controller()
        try:
            models = await controller.list_models()
            if contains:
                models = [model for model in models if contains in model]
            return models
        finally:
            await self.disconnect_controller(controller)

    async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
        """List the offers of a model

        :param: model_name: Model name

        :return: Returns list of offers
        """

        controller = await self.get_controller()
        try:
            return await controller.list_offers(model_name)
        finally:
            await self.disconnect_controller(controller)

    async def add_k8s(self, name: str, auth_data: dict, storage_class: str):
        """
        Add a Kubernetes cloud to the controller

        Similar to the `juju add-k8s` command in the CLI

        :param: name:           Name for the K8s cloud
        :param: auth_data:      Dictionary with needed credentials. Format:
                                    {
                                        "server": "192.168.0.21:16443",
                                        "cacert": "-----BEGIN CERTIFI...",
                                        "token": "clhkRExRem5Xd1dCdnFEVXdvRGt...",

                                    }
        :param: storage_class:  Storage Class to use in the cloud

        :raises Exception: if auth_data lacks a key, or name/storage_class is empty
        """

        required_auth_data_keys = ["server", "cacert", "token"]
        missing_keys = [k for k in required_auth_data_keys if k not in auth_data]
        if missing_keys:
            raise Exception(
                "missing keys in auth_data: {}".format(",".join(missing_keys))
            )
        if not storage_class:
            raise Exception("storage_class must be a non-empty string")
        if not name:
            raise Exception("name must be a non-empty string")

        endpoint = auth_data["server"]
        cacert = auth_data["cacert"]
        token = auth_data["token"]
        region_name = "{}-region".format(name)

        cloud = client.Cloud(
            auth_types=["certificate"],
            ca_certificates=[cacert],
            endpoint=endpoint,
            config={
                "operator-storage": storage_class,
                "workload-storage": storage_class,
            },
            regions=[client.CloudRegion(endpoint=endpoint, name=region_name)],
            type_="kubernetes",
        )

        cred = client.CloudCredential(
            auth_type="certificate",
            attrs={"ClientCertificateData": cacert, "Token": token},
        )
        return await self.add_cloud(name, cloud, cred)

    async def add_cloud(
        self, name: str, cloud: Cloud, credential: CloudCredential = None
    ) -> Cloud:
        """
        Add cloud to the controller

        :param: name:       Name of the cloud to be added
        :param: cloud:      Cloud object
        :param: credential: CloudCredentials object for the cloud

        :return: The cloud object received as argument
        """
        controller = await self.get_controller()
        try:
            await controller.add_cloud(name, cloud)
            if credential:
                await controller.add_credential(name, credential=credential, cloud=name)
            # Need to return the object returned by the controller.add_cloud() function
            # I'm returning the original value now until this bug is fixed:
            #   https://github.com/juju/python-libjuju/issues/443
            return cloud
        finally:
            await self.disconnect_controller(controller)

    async def remove_cloud(self, name: str):
        """
        Remove cloud

        :param: name:   Name of the cloud to be removed
        """
        controller = await self.get_controller()
        try:
            await controller.remove_cloud(name)
        finally:
            await self.disconnect_controller(controller)