Add consume to libjuju.py
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17 from juju.controller import Controller
18 from juju.client import client
19 import time
20
21 from juju.errors import JujuAPIError
22 from juju.model import Model
23 from juju.machine import Machine
24 from juju.application import Application
25 from juju.client._definitions import FullStatus, QueryApplicationOffersResults
26 from n2vc.juju_watcher import JujuModelWatcher
27 from n2vc.provisioner import AsyncSSHProvisioner
28 from n2vc.n2vc_conn import N2VCConnector
29 from n2vc.exceptions import (
30 JujuMachineNotFound,
31 JujuApplicationNotFound,
32 JujuLeaderUnitNotFound,
33 JujuActionNotFound,
34 JujuModelAlreadyExists,
35 JujuControllerFailedConnecting,
36 JujuApplicationExists,
37 )
38 from n2vc.utils import DB_DATA
39 from osm_common.dbbase import DbException
40
41
42 class Libjuju:
43 def __init__(
44 self,
45 endpoint: str,
46 api_proxy: str,
47 username: str,
48 password: str,
49 cacert: str,
50 loop: asyncio.AbstractEventLoop = None,
51 log: logging.Logger = None,
52 db: dict = None,
53 n2vc: N2VCConnector = None,
54 apt_mirror: str = None,
55 enable_os_upgrade: bool = True,
56 ):
57 """
58 Constructor
59
60 :param: endpoint: Endpoint of the juju controller (host:port)
61 :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
62 :param: username: Juju username
63 :param: password: Juju password
64 :param: cacert: Juju CA Certificate
65 :param: loop: Asyncio loop
66 :param: log: Logger
67 :param: db: DB object
68 :param: n2vc: N2VC object
69 :param: apt_mirror: APT Mirror
70 :param: enable_os_upgrade: Enable OS Upgrade
71 """
72
73 self.log = log or logging.getLogger("Libjuju")
74 self.db = db
75 db_endpoints = self._get_api_endpoints_db()
76 self.endpoints = db_endpoints or [endpoint]
77 if db_endpoints is None:
78 self._update_api_endpoints_db(self.endpoints)
79 self.api_proxy = api_proxy
80 self.username = username
81 self.password = password
82 self.cacert = cacert
83 self.loop = loop or asyncio.get_event_loop()
84 self.n2vc = n2vc
85
86 # Generate config for models
87 self.model_config = {}
88 if apt_mirror:
89 self.model_config["apt-mirror"] = apt_mirror
90 self.model_config["enable-os-refresh-update"] = enable_os_upgrade
91 self.model_config["enable-os-upgrade"] = enable_os_upgrade
92
93 self.loop.set_exception_handler(self.handle_exception)
94 self.creating_model = asyncio.Lock(loop=self.loop)
95
96 self.models = set()
97 self.log.debug("Libjuju initialized!")
98
99 self.health_check_task = self.loop.create_task(self.health_check())
100
101 async def get_controller(self, timeout: float = 5.0) -> Controller:
102 """
103 Get controller
104
105 :param: timeout: Time in seconds to wait for controller to connect
106 """
107 controller = None
108 try:
109 controller = Controller(loop=self.loop)
110 await asyncio.wait_for(
111 controller.connect(
112 endpoint=self.endpoints,
113 username=self.username,
114 password=self.password,
115 cacert=self.cacert,
116 ),
117 timeout=timeout,
118 )
119 endpoints = await controller.api_endpoints
120 if self.endpoints != endpoints:
121 self.endpoints = endpoints
122 self._update_api_endpoints_db(self.endpoints)
123 return controller
124 except asyncio.CancelledError as e:
125 raise e
126 except Exception as e:
127 self.log.error(
128 "Failed connecting to controller: {}...".format(self.endpoints)
129 )
130 if controller:
131 await self.disconnect_controller(controller)
132 raise JujuControllerFailedConnecting(e)
133
134 async def disconnect(self):
135 """Disconnect"""
136 # Cancel health check task
137 self.health_check_task.cancel()
138 self.log.debug("Libjuju disconnected!")
139
140 async def disconnect_model(self, model: Model):
141 """
142 Disconnect model
143
144 :param: model: Model that will be disconnected
145 """
146 await model.disconnect()
147
148 async def disconnect_controller(self, controller: Controller):
149 """
150 Disconnect controller
151
152 :param: controller: Controller that will be disconnected
153 """
154 await controller.disconnect()
155
156 async def add_model(self, model_name: str, cloud_name: str):
157 """
158 Create model
159
160 :param: model_name: Model name
161 :param: cloud_name: Cloud name
162 """
163
164 # Get controller
165 controller = await self.get_controller()
166 model = None
167 try:
168 # Raise exception if model already exists
169 if await self.model_exists(model_name, controller=controller):
170 raise JujuModelAlreadyExists(
171 "Model {} already exists.".format(model_name)
172 )
173
174 # Block until other workers have finished model creation
175 while self.creating_model.locked():
176 await asyncio.sleep(0.1)
177
178 # If the model exists, return it from the controller
179 if model_name in self.models:
180 return
181
182 # Create the model
183 async with self.creating_model:
184 self.log.debug("Creating model {}".format(model_name))
185 model = await controller.add_model(
186 model_name,
187 config=self.model_config,
188 cloud_name=cloud_name,
189 credential_name=cloud_name,
190 )
191 self.models.add(model_name)
192 finally:
193 if model:
194 await self.disconnect_model(model)
195 await self.disconnect_controller(controller)
196
197 async def get_model(
198 self, controller: Controller, model_name: str, id=None
199 ) -> Model:
200 """
201 Get model from controller
202
203 :param: controller: Controller
204 :param: model_name: Model name
205
206 :return: Model: The created Juju model object
207 """
208 return await controller.get_model(model_name)
209
210 async def model_exists(
211 self, model_name: str, controller: Controller = None
212 ) -> bool:
213 """
214 Check if model exists
215
216 :param: controller: Controller
217 :param: model_name: Model name
218
219 :return bool
220 """
221 need_to_disconnect = False
222
223 # Get controller if not passed
224 if not controller:
225 controller = await self.get_controller()
226 need_to_disconnect = True
227
228 # Check if model exists
229 try:
230 return model_name in await controller.list_models()
231 finally:
232 if need_to_disconnect:
233 await self.disconnect_controller(controller)
234
235 async def models_exist(self, model_names: [str]) -> (bool, list):
236 """
237 Check if models exists
238
239 :param: model_names: List of strings with model names
240
241 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
242 """
243 if not model_names:
244 raise Exception(
245 "model_names must be a non-empty array. Given value: {}".format(model_names)
246 )
247 non_existing_models = []
248 models = await self.list_models()
249 existing_models = list(set(models).intersection(model_names))
250 non_existing_models = list(set(model_names) - set(existing_models))
251
252 return (
253 len(non_existing_models) == 0,
254 non_existing_models,
255 )
256
257 async def get_model_status(self, model_name: str) -> FullStatus:
258 """
259 Get model status
260
261 :param: model_name: Model name
262
263 :return: Full status object
264 """
265 controller = await self.get_controller()
266 model = await self.get_model(controller, model_name)
267 try:
268 return await model.get_status()
269 finally:
270 await self.disconnect_model(model)
271 await self.disconnect_controller(controller)
272
273 async def create_machine(
274 self,
275 model_name: str,
276 machine_id: str = None,
277 db_dict: dict = None,
278 progress_timeout: float = None,
279 total_timeout: float = None,
280 series: str = "xenial",
281 wait: bool = True,
282 ) -> (Machine, bool):
283 """
284 Create machine
285
286 :param: model_name: Model name
287 :param: machine_id: Machine id
288 :param: db_dict: Dictionary with data of the DB to write the updates
289 :param: progress_timeout: Maximum time between two updates in the model
290 :param: total_timeout: Timeout for the entity to be active
291 :param: series: Series of the machine (xenial, bionic, focal, ...)
292 :param: wait: Wait until machine is ready
293
294 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
295 if the machine is new or it already existed
296 """
297 new = False
298 machine = None
299
300 self.log.debug(
301 "Creating machine (id={}) in model: {}".format(machine_id, model_name)
302 )
303
304 # Get controller
305 controller = await self.get_controller()
306
307 # Get model
308 model = await self.get_model(controller, model_name)
309 try:
310 if machine_id is not None:
311 self.log.debug(
312 "Searching machine (id={}) in model {}".format(
313 machine_id, model_name
314 )
315 )
316
317 # Get machines from model and get the machine with machine_id if exists
318 machines = await model.get_machines()
319 if machine_id in machines:
320 self.log.debug(
321 "Machine (id={}) found in model {}".format(
322 machine_id, model_name
323 )
324 )
325 machine = machines[machine_id]
326 else:
327 raise JujuMachineNotFound("Machine {} not found".format(machine_id))
328
329 if machine is None:
330 self.log.debug("Creating a new machine in model {}".format(model_name))
331
332 # Create machine
333 machine = await model.add_machine(
334 spec=None, constraints=None, disks=None, series=series
335 )
336 new = True
337
338 # Wait until the machine is ready
339 self.log.debug(
340 "Wait until machine {} is ready in model {}".format(
341 machine.entity_id, model_name
342 )
343 )
344 if wait:
345 await JujuModelWatcher.wait_for(
346 model=model,
347 entity=machine,
348 progress_timeout=progress_timeout,
349 total_timeout=total_timeout,
350 db_dict=db_dict,
351 n2vc=self.n2vc,
352 )
353 finally:
354 await self.disconnect_model(model)
355 await self.disconnect_controller(controller)
356
357 self.log.debug(
358 "Machine {} ready at {} in model {}".format(
359 machine.entity_id, machine.dns_name, model_name
360 )
361 )
362 return machine, new
363
364 async def provision_machine(
365 self,
366 model_name: str,
367 hostname: str,
368 username: str,
369 private_key_path: str,
370 db_dict: dict = None,
371 progress_timeout: float = None,
372 total_timeout: float = None,
373 ) -> str:
374 """
375 Manually provisioning of a machine
376
377 :param: model_name: Model name
378 :param: hostname: IP to access the machine
379 :param: username: Username to login to the machine
380 :param: private_key_path: Local path for the private key
381 :param: db_dict: Dictionary with data of the DB to write the updates
382 :param: progress_timeout: Maximum time between two updates in the model
383 :param: total_timeout: Timeout for the entity to be active
384
385 :return: (Entity): Machine id
386 """
387 self.log.debug(
388 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
389 model_name, hostname, username
390 )
391 )
392
393 # Get controller
394 controller = await self.get_controller()
395
396 # Get model
397 model = await self.get_model(controller, model_name)
398
399 try:
400 # Get provisioner
401 provisioner = AsyncSSHProvisioner(
402 host=hostname,
403 user=username,
404 private_key_path=private_key_path,
405 log=self.log,
406 )
407
408 # Provision machine
409 params = await provisioner.provision_machine()
410
411 params.jobs = ["JobHostUnits"]
412
413 self.log.debug("Adding machine to model")
414 connection = model.connection()
415 client_facade = client.ClientFacade.from_connection(connection)
416
417 results = await client_facade.AddMachines(params=[params])
418 error = results.machines[0].error
419
420 if error:
421 msg = "Error adding machine: {}".format(error.message)
422 self.log.error(msg=msg)
423 raise ValueError(msg)
424
425 machine_id = results.machines[0].machine
426
427 self.log.debug("Installing Juju agent into machine {}".format(machine_id))
428 asyncio.ensure_future(
429 provisioner.install_agent(
430 connection=connection,
431 nonce=params.nonce,
432 machine_id=machine_id,
433 proxy=self.api_proxy,
434 )
435 )
436
437 machine = None
438 for _ in range(10):
439 machine_list = await model.get_machines()
440 if machine_id in machine_list:
441 self.log.debug("Machine {} found in model!".format(machine_id))
442 machine = model.machines.get(machine_id)
443 break
444 await asyncio.sleep(2)
445
446 if machine is None:
447 msg = "Machine {} not found in model".format(machine_id)
448 self.log.error(msg=msg)
449 raise JujuMachineNotFound(msg)
450
451 self.log.debug(
452 "Wait until machine {} is ready in model {}".format(
453 machine.entity_id, model_name
454 )
455 )
456 await JujuModelWatcher.wait_for(
457 model=model,
458 entity=machine,
459 progress_timeout=progress_timeout,
460 total_timeout=total_timeout,
461 db_dict=db_dict,
462 n2vc=self.n2vc,
463 )
464 except Exception as e:
465 raise e
466 finally:
467 await self.disconnect_model(model)
468 await self.disconnect_controller(controller)
469
470 self.log.debug(
471 "Machine provisioned {} in model {}".format(machine_id, model_name)
472 )
473
474 return machine_id
475
476 async def deploy_charm(
477 self,
478 application_name: str,
479 path: str,
480 model_name: str,
481 machine_id: str,
482 db_dict: dict = None,
483 progress_timeout: float = None,
484 total_timeout: float = None,
485 config: dict = None,
486 series: str = None,
487 num_units: int = 1,
488 ):
489 """Deploy charm
490
491 :param: application_name: Application name
492 :param: path: Local path to the charm
493 :param: model_name: Model name
494 :param: machine_id ID of the machine
495 :param: db_dict: Dictionary with data of the DB to write the updates
496 :param: progress_timeout: Maximum time between two updates in the model
497 :param: total_timeout: Timeout for the entity to be active
498 :param: config: Config for the charm
499 :param: series: Series of the charm
500 :param: num_units: Number of units
501
502 :return: (juju.application.Application): Juju application
503 """
504 self.log.debug(
505 "Deploying charm {} to machine {} in model ~{}".format(
506 application_name, machine_id, model_name
507 )
508 )
509 self.log.debug("charm: {}".format(path))
510
511 # Get controller
512 controller = await self.get_controller()
513
514 # Get model
515 model = await self.get_model(controller, model_name)
516
517 try:
518 application = None
519 if application_name not in model.applications:
520
521 if machine_id is not None:
522 if machine_id not in model.machines:
523 msg = "Machine {} not found in model".format(machine_id)
524 self.log.error(msg=msg)
525 raise JujuMachineNotFound(msg)
526 machine = model.machines[machine_id]
527 series = machine.series
528
529 application = await model.deploy(
530 entity_url=path,
531 application_name=application_name,
532 channel="stable",
533 num_units=1,
534 series=series,
535 to=machine_id,
536 config=config,
537 )
538
539 self.log.debug(
540 "Wait until application {} is ready in model {}".format(
541 application_name, model_name
542 )
543 )
544 if num_units > 1:
545 for _ in range(num_units - 1):
546 m, _ = await self.create_machine(model_name, wait=False)
547 await application.add_unit(to=m.entity_id)
548
549 await JujuModelWatcher.wait_for(
550 model=model,
551 entity=application,
552 progress_timeout=progress_timeout,
553 total_timeout=total_timeout,
554 db_dict=db_dict,
555 n2vc=self.n2vc,
556 )
557 self.log.debug(
558 "Application {} is ready in model {}".format(
559 application_name, model_name
560 )
561 )
562 else:
563 raise JujuApplicationExists(
564 "Application {} exists".format(application_name)
565 )
566 finally:
567 await self.disconnect_model(model)
568 await self.disconnect_controller(controller)
569
570 return application
571
572 def _get_application(self, model: Model, application_name: str) -> Application:
573 """Get application
574
575 :param: model: Model object
576 :param: application_name: Application name
577
578 :return: juju.application.Application (or None if it doesn't exist)
579 """
580 if model.applications and application_name in model.applications:
581 return model.applications[application_name]
582
583 async def execute_action(
584 self,
585 application_name: str,
586 model_name: str,
587 action_name: str,
588 db_dict: dict = None,
589 progress_timeout: float = None,
590 total_timeout: float = None,
591 **kwargs
592 ):
593 """Execute action
594
595 :param: application_name: Application name
596 :param: model_name: Model name
597 :param: action_name: Name of the action
598 :param: db_dict: Dictionary with data of the DB to write the updates
599 :param: progress_timeout: Maximum time between two updates in the model
600 :param: total_timeout: Timeout for the entity to be active
601
602 :return: (str, str): (output and status)
603 """
604 self.log.debug(
605 "Executing action {} using params {}".format(action_name, kwargs)
606 )
607 # Get controller
608 controller = await self.get_controller()
609
610 # Get model
611 model = await self.get_model(controller, model_name)
612
613 try:
614 # Get application
615 application = self._get_application(
616 model, application_name=application_name,
617 )
618 if application is None:
619 raise JujuApplicationNotFound("Cannot execute action")
620
621 # Get unit
622 unit = None
623 for u in application.units:
624 if await u.is_leader_from_status():
625 unit = u
626 if unit is None:
627 raise JujuLeaderUnitNotFound("Cannot execute action: leader unit not found")
628
629 actions = await application.get_actions()
630
631 if action_name not in actions:
632 raise JujuActionNotFound(
633 "Action {} not in available actions".format(action_name)
634 )
635
636 action = await unit.run_action(action_name, **kwargs)
637
638 self.log.debug(
639 "Wait until action {} is completed in application {} (model={})".format(
640 action_name, application_name, model_name
641 )
642 )
643 await JujuModelWatcher.wait_for(
644 model=model,
645 entity=action,
646 progress_timeout=progress_timeout,
647 total_timeout=total_timeout,
648 db_dict=db_dict,
649 n2vc=self.n2vc,
650 )
651
652 output = await model.get_action_output(action_uuid=action.entity_id)
653 status = await model.get_action_status(uuid_or_prefix=action.entity_id)
654 status = (
655 status[action.entity_id] if action.entity_id in status else "failed"
656 )
657
658 self.log.debug(
659 "Action {} completed with status {} in application {} (model={})".format(
660 action_name, action.status, application_name, model_name
661 )
662 )
663 finally:
664 await self.disconnect_model(model)
665 await self.disconnect_controller(controller)
666
667 return output, status
668
669 async def get_actions(self, application_name: str, model_name: str) -> dict:
670 """Get list of actions
671
672 :param: application_name: Application name
673 :param: model_name: Model name
674
675 :return: Dict with this format
676 {
677 "action_name": "Description of the action",
678 ...
679 }
680 """
681 self.log.debug(
682 "Getting list of actions for application {}".format(application_name)
683 )
684
685 # Get controller
686 controller = await self.get_controller()
687
688 # Get model
689 model = await self.get_model(controller, model_name)
690
691 try:
692 # Get application
693 application = self._get_application(
694 model, application_name=application_name,
695 )
696
697 # Return list of actions
698 return await application.get_actions()
699
700 finally:
701 # Disconnect from model and controller
702 await self.disconnect_model(model)
703 await self.disconnect_controller(controller)
704
705 async def add_relation(
706 self,
707 model_name: str,
708 endpoint_1: str,
709 endpoint_2: str,
710 ):
711 """Add relation
712
713 :param: model_name: Model name
714 :param: endpoint_1 First endpoint name
715 ("app:endpoint" format or directly the saas name)
716 :param: endpoint_2: Second endpoint name (^ same format)
717 """
718
719 self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))
720
721 # Get controller
722 controller = await self.get_controller()
723
724 # Get model
725 model = await self.get_model(controller, model_name)
726
727 # Add relation
728 try:
729 await model.add_relation(endpoint_1, endpoint_2)
730 except JujuAPIError as e:
731 if "not found" in e.message:
732 self.log.warning("Relation not found: {}".format(e.message))
733 return
734 if "already exists" in e.message:
735 self.log.warning("Relation already exists: {}".format(e.message))
736 return
737 # another exception, raise it
738 raise e
739 finally:
740 await self.disconnect_model(model)
741 await self.disconnect_controller(controller)
742
743 async def consume(
744 self, offer_url: str, model_name: str,
745 ):
746 """
747 Adds a remote offer to the model. Relations can be created later using "juju relate".
748
749 :param: offer_url: Offer Url
750 :param: model_name: Model name
751
752 :raises ParseError if there's a problem parsing the offer_url
753 :raises JujuError if remote offer includes and endpoint
754 :raises JujuAPIError if the operation is not successful
755 """
756 controller = await self.get_controller()
757 model = await controller.get_model(model_name)
758
759 try:
760 await model.consume(offer_url)
761 finally:
762 await self.disconnect_model(model)
763 await self.disconnect_controller(controller)
764
765 async def destroy_model(self, model_name: str, total_timeout: float):
766 """
767 Destroy model
768
769 :param: model_name: Model name
770 :param: total_timeout: Timeout
771 """
772
773 controller = await self.get_controller()
774 model = await self.get_model(controller, model_name)
775 try:
776 self.log.debug("Destroying model {}".format(model_name))
777 uuid = model.info.uuid
778
779 # Destroy machines
780 machines = await model.get_machines()
781 for machine_id in machines:
782 try:
783 await self.destroy_machine(
784 model, machine_id=machine_id, total_timeout=total_timeout,
785 )
786 except asyncio.CancelledError:
787 raise
788 except Exception:
789 pass
790
791 # Disconnect model
792 await self.disconnect_model(model)
793
794 # Destroy model
795 if model_name in self.models:
796 self.models.remove(model_name)
797
798 await controller.destroy_model(uuid)
799
800 # Wait until model is destroyed
801 self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
802 last_exception = ""
803
804 if total_timeout is None:
805 total_timeout = 3600
806 end = time.time() + total_timeout
807 while time.time() < end:
808 try:
809 models = await controller.list_models()
810 if model_name not in models:
811 self.log.debug(
812 "The model {} ({}) was destroyed".format(model_name, uuid)
813 )
814 return
815 except asyncio.CancelledError:
816 raise
817 except Exception as e:
818 last_exception = e
819 await asyncio.sleep(5)
820 raise Exception(
821 "Timeout waiting for model {} to be destroyed {}".format(
822 model_name, last_exception
823 )
824 )
825 finally:
826 await self.disconnect_controller(controller)
827
828 async def destroy_application(self, model: Model, application_name: str):
829 """
830 Destroy application
831
832 :param: model: Model object
833 :param: application_name: Application name
834 """
835 self.log.debug(
836 "Destroying application {} in model {}".format(
837 application_name, model.info.name
838 )
839 )
840 application = model.applications.get(application_name)
841 if application:
842 await application.destroy()
843 else:
844 self.log.warning("Application not found: {}".format(application_name))
845
846 async def destroy_machine(
847 self, model: Model, machine_id: str, total_timeout: float = 3600
848 ):
849 """
850 Destroy machine
851
852 :param: model: Model object
853 :param: machine_id: Machine id
854 :param: total_timeout: Timeout in seconds
855 """
856 machines = await model.get_machines()
857 if machine_id in machines:
858 machine = machines[machine_id]
859 await machine.destroy(force=True)
860 # max timeout
861 end = time.time() + total_timeout
862
863 # wait for machine removal
864 machines = await model.get_machines()
865 while machine_id in machines and time.time() < end:
866 self.log.debug(
867 "Waiting for machine {} is destroyed".format(machine_id)
868 )
869 await asyncio.sleep(0.5)
870 machines = await model.get_machines()
871 self.log.debug("Machine destroyed: {}".format(machine_id))
872 else:
873 self.log.debug("Machine not found: {}".format(machine_id))
874
875 async def configure_application(
876 self, model_name: str, application_name: str, config: dict = None
877 ):
878 """Configure application
879
880 :param: model_name: Model name
881 :param: application_name: Application name
882 :param: config: Config to apply to the charm
883 """
884 self.log.debug("Configuring application {}".format(application_name))
885
886 if config:
887 try:
888 controller = await self.get_controller()
889 model = await self.get_model(controller, model_name)
890 application = self._get_application(
891 model, application_name=application_name,
892 )
893 await application.set_config(config)
894 finally:
895 await self.disconnect_model(model)
896 await self.disconnect_controller(controller)
897
898 def _get_api_endpoints_db(self) -> [str]:
899 """
900 Get API Endpoints from DB
901
902 :return: List of API endpoints
903 """
904 self.log.debug("Getting endpoints from database")
905
906 juju_info = self.db.get_one(
907 DB_DATA.api_endpoints.table,
908 q_filter=DB_DATA.api_endpoints.filter,
909 fail_on_empty=False,
910 )
911 if juju_info and DB_DATA.api_endpoints.key in juju_info:
912 return juju_info[DB_DATA.api_endpoints.key]
913
914 def _update_api_endpoints_db(self, endpoints: [str]):
915 """
916 Update API endpoints in Database
917
918 :param: List of endpoints
919 """
920 self.log.debug("Saving endpoints {} in database".format(endpoints))
921
922 juju_info = self.db.get_one(
923 DB_DATA.api_endpoints.table,
924 q_filter=DB_DATA.api_endpoints.filter,
925 fail_on_empty=False,
926 )
927 # If it doesn't, then create it
928 if not juju_info:
929 try:
930 self.db.create(
931 DB_DATA.api_endpoints.table, DB_DATA.api_endpoints.filter,
932 )
933 except DbException as e:
934 # Racing condition: check if another N2VC worker has created it
935 juju_info = self.db.get_one(
936 DB_DATA.api_endpoints.table,
937 q_filter=DB_DATA.api_endpoints.filter,
938 fail_on_empty=False,
939 )
940 if not juju_info:
941 raise e
942 self.db.set_one(
943 DB_DATA.api_endpoints.table,
944 DB_DATA.api_endpoints.filter,
945 {DB_DATA.api_endpoints.key: endpoints},
946 )
947
948 def handle_exception(self, loop, context):
949 # All unhandled exceptions by libjuju are handled here.
950 pass
951
952 async def health_check(self, interval: float = 300.0):
953 """
954 Health check to make sure controller and controller_model connections are OK
955
956 :param: interval: Time in seconds between checks
957 """
958 while True:
959 try:
960 controller = await self.get_controller()
961 # self.log.debug("VCA is alive")
962 except Exception as e:
963 self.log.error("Health check to VCA failed: {}".format(e))
964 finally:
965 await self.disconnect_controller(controller)
966 await asyncio.sleep(interval)
967
968 async def list_models(self, contains: str = None) -> [str]:
969 """List models with certain names
970
971 :param: contains: String that is contained in model name
972
973 :retur: [models] Returns list of model names
974 """
975
976 controller = await self.get_controller()
977 try:
978 models = await controller.list_models()
979 if contains:
980 models = [model for model in models if contains in model]
981 return models
982 finally:
983 await self.disconnect_controller(controller)
984
985 async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
986 """List models with certain names
987
988 :param: model_name: Model name
989
990 :return: Returns list of offers
991 """
992
993 controller = await self.get_controller()
994 try:
995 return await controller.list_offers(model_name)
996 finally:
997 await self.disconnect_controller(controller)