Add get_metrics command to n2vc_juju_conn and libjuju
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17 from juju.controller import Controller
18 from juju.client import client
19 import time
20
21 from juju.errors import JujuAPIError
22 from juju.model import Model
23 from juju.machine import Machine
24 from juju.application import Application
25 from juju.client._definitions import (
26 FullStatus,
27 QueryApplicationOffersResults,
28 Cloud,
29 CloudCredential,
30 )
31 from n2vc.juju_watcher import JujuModelWatcher
32 from n2vc.provisioner import AsyncSSHProvisioner
33 from n2vc.n2vc_conn import N2VCConnector
34 from n2vc.exceptions import (
35 JujuMachineNotFound,
36 JujuApplicationNotFound,
37 JujuLeaderUnitNotFound,
38 JujuActionNotFound,
39 JujuModelAlreadyExists,
40 JujuControllerFailedConnecting,
41 JujuApplicationExists,
42 )
43 from n2vc.utils import DB_DATA
44 from osm_common.dbbase import DbException
45
46
class Libjuju:
    """
    Asynchronous helper around python-libjuju used by N2VC.

    Each public coroutine opens its own controller connection (and model
    connection when one is needed) and disconnects it in a ``finally`` clause,
    so no Juju connection is kept open between calls.
    """

    def __init__(
        self,
        endpoint: str,
        api_proxy: str,
        username: str,
        password: str,
        cacert: str,
        loop: asyncio.AbstractEventLoop = None,
        log: logging.Logger = None,
        db: dict = None,
        n2vc: N2VCConnector = None,
        apt_mirror: str = None,
        enable_os_upgrade: bool = True,
    ):
        """
        Constructor

        :param: endpoint:           Endpoint of the juju controller (host:port)
        :param: api_proxy:          Endpoint of the juju controller - Reachable from the VNFs
        :param: username:           Juju username
        :param: password:           Juju password
        :param: cacert:             Juju CA Certificate
        :param: loop:               Asyncio loop
        :param: log:                Logger
        :param: db:                 DB object
        :param: n2vc:               N2VC object
        :param: apt_mirror:         APT Mirror
        :param: enable_os_upgrade:  Enable OS Upgrade
        """

        self.log = log or logging.getLogger("Libjuju")
        self.db = db

        # Endpoints stored in the database take precedence over the one given
        # here; when nothing is stored yet, the given endpoint is persisted.
        db_endpoints = self._get_api_endpoints_db()
        self.endpoints = db_endpoints or [endpoint]
        if db_endpoints is None:
            self._update_api_endpoints_db(self.endpoints)

        self.api_proxy = api_proxy
        self.username = username
        self.password = password
        self.cacert = cacert
        self.loop = loop or asyncio.get_event_loop()
        self.n2vc = n2vc

        # Generate config for models
        self.model_config = {}
        if apt_mirror:
            self.model_config["apt-mirror"] = apt_mirror
        self.model_config["enable-os-refresh-update"] = enable_os_upgrade
        self.model_config["enable-os-upgrade"] = enable_os_upgrade

        self.loop.set_exception_handler(self.handle_exception)
        self.creating_model = asyncio.Lock(loop=self.loop)

        # Names of the models created through this object
        self.models = set()
        self.log.debug("Libjuju initialized!")

        self.health_check_task = self.loop.create_task(self.health_check())

    async def get_controller(self, timeout: float = 5.0) -> Controller:
        """
        Get controller

        :param: timeout: Time in seconds to wait for controller to connect

        :return: Connected controller. The caller must disconnect it.

        :raises JujuControllerFailedConnecting if the connection fails
        """
        controller = None
        try:
            controller = Controller(loop=self.loop)
            await asyncio.wait_for(
                controller.connect(
                    endpoint=self.endpoints,
                    username=self.username,
                    password=self.password,
                    cacert=self.cacert,
                ),
                timeout=timeout,
            )
            # Keep the stored endpoints in sync with what the controller reports
            endpoints = await controller.api_endpoints
            if self.endpoints != endpoints:
                self.endpoints = endpoints
                self._update_api_endpoints_db(self.endpoints)
            return controller
        except asyncio.CancelledError:
            raise
        except Exception as e:
            self.log.error(
                "Failed connecting to controller: {}...".format(self.endpoints)
            )
            if controller:
                await self.disconnect_controller(controller)
            raise JujuControllerFailedConnecting(e) from e

    async def disconnect(self):
        """Disconnect"""
        # Cancel health check task
        self.health_check_task.cancel()
        self.log.debug("Libjuju disconnected!")

    async def disconnect_model(self, model: Model):
        """
        Disconnect model

        :param: model: Model that will be disconnected
        """
        await model.disconnect()

    async def disconnect_controller(self, controller: Controller):
        """
        Disconnect controller

        :param: controller: Controller that will be disconnected
        """
        await controller.disconnect()

    async def add_model(self, model_name: str, cloud_name: str):
        """
        Create model

        :param: model_name: Model name
        :param: cloud_name: Cloud name (also used as the credential name)

        :raises JujuModelAlreadyExists if the controller already has the model
        """

        # Get controller
        controller = await self.get_controller()
        model = None
        try:
            # Raise exception if model already exists
            if await self.model_exists(model_name, controller=controller):
                raise JujuModelAlreadyExists(
                    "Model {} already exists.".format(model_name)
                )

            # Block until other workers have finished model creation
            while self.creating_model.locked():
                await asyncio.sleep(0.1)

            # Nothing to do if the model was created through this object
            if model_name in self.models:
                return

            # Create the model
            async with self.creating_model:
                self.log.debug("Creating model {}".format(model_name))
                model = await controller.add_model(
                    model_name,
                    config=self.model_config,
                    cloud_name=cloud_name,
                    credential_name=cloud_name,
                )
                self.models.add(model_name)
        finally:
            if model:
                await self.disconnect_model(model)
            await self.disconnect_controller(controller)

    async def get_model(
        self, controller: Controller, model_name: str, id=None
    ) -> Model:
        """
        Get model from controller

        :param: controller: Controller
        :param: model_name: Model name
        :param: id:         Not used; kept so existing callers keep working

        :return: Model: The created Juju model object
        """
        return await controller.get_model(model_name)

    async def model_exists(
        self, model_name: str, controller: Controller = None
    ) -> bool:
        """
        Check if model exists

        :param: model_name: Model name
        :param: controller: Controller. When omitted, a temporary connection
                            is opened and closed by this method.

        :return bool
        """
        need_to_disconnect = False

        # Get controller if not passed
        if not controller:
            controller = await self.get_controller()
            need_to_disconnect = True

        # Check if model exists
        try:
            return model_name in await controller.list_models()
        finally:
            if need_to_disconnect:
                await self.disconnect_controller(controller)

    async def models_exist(self, model_names: [str]) -> (bool, list):
        """
        Check if models exists

        :param: model_names: List of strings with model names

        :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
        """
        if not model_names:
            raise Exception(
                "model_names must be a non-empty array. Given value: {}".format(
                    model_names
                )
            )
        models = await self.list_models()
        non_existing_models = list(set(model_names) - set(models))

        return (
            len(non_existing_models) == 0,
            non_existing_models,
        )

    async def get_model_status(self, model_name: str) -> FullStatus:
        """
        Get model status

        :param: model_name: Model name

        :return: Full status object
        """
        controller = await self.get_controller()
        model = await self.get_model(controller, model_name)
        try:
            return await model.get_status()
        finally:
            await self.disconnect_model(model)
            await self.disconnect_controller(controller)

    async def create_machine(
        self,
        model_name: str,
        machine_id: str = None,
        db_dict: dict = None,
        progress_timeout: float = None,
        total_timeout: float = None,
        series: str = "xenial",
        wait: bool = True,
    ) -> (Machine, bool):
        """
        Create machine

        :param: model_name:         Model name
        :param: machine_id:         Machine id
        :param: db_dict:            Dictionary with data of the DB to write the updates
        :param: progress_timeout:   Maximum time between two updates in the model
        :param: total_timeout:      Timeout for the entity to be active
        :param: series:             Series of the machine (xenial, bionic, focal, ...)
        :param: wait:               Wait until machine is ready

        :return: (juju.machine.Machine, bool):  Machine object and a boolean saying
                                                if the machine is new or it already existed

        :raises JujuMachineNotFound if machine_id is given but is not in the model
        """
        new = False
        machine = None

        self.log.debug(
            "Creating machine (id={}) in model: {}".format(machine_id, model_name)
        )

        # Get controller
        controller = await self.get_controller()

        # Get model
        model = await self.get_model(controller, model_name)
        try:
            if machine_id is not None:
                self.log.debug(
                    "Searching machine (id={}) in model {}".format(
                        machine_id, model_name
                    )
                )

                # Get machines from model and get the machine with machine_id if exists
                machines = await model.get_machines()
                if machine_id not in machines:
                    raise JujuMachineNotFound("Machine {} not found".format(machine_id))
                self.log.debug(
                    "Machine (id={}) found in model {}".format(machine_id, model_name)
                )
                # Fetch the Machine object through model.machines, the same
                # way provision_machine and deploy_charm do.
                machine = model.machines[machine_id]

            if machine is None:
                self.log.debug("Creating a new machine in model {}".format(model_name))

                # Create machine
                machine = await model.add_machine(
                    spec=None, constraints=None, disks=None, series=series
                )
                new = True

                # Wait until the machine is ready
                self.log.debug(
                    "Wait until machine {} is ready in model {}".format(
                        machine.entity_id, model_name
                    )
                )
                if wait:
                    await JujuModelWatcher.wait_for(
                        model=model,
                        entity=machine,
                        progress_timeout=progress_timeout,
                        total_timeout=total_timeout,
                        db_dict=db_dict,
                        n2vc=self.n2vc,
                    )
        finally:
            await self.disconnect_model(model)
            await self.disconnect_controller(controller)

        self.log.debug(
            "Machine {} ready at {} in model {}".format(
                machine.entity_id, machine.dns_name, model_name
            )
        )
        return machine, new

    async def provision_machine(
        self,
        model_name: str,
        hostname: str,
        username: str,
        private_key_path: str,
        db_dict: dict = None,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> str:
        """
        Manually provisioning of a machine

        :param: model_name:         Model name
        :param: hostname:           IP to access the machine
        :param: username:           Username to login to the machine
        :param: private_key_path:   Local path for the private key
        :param: db_dict:            Dictionary with data of the DB to write the updates
        :param: progress_timeout:   Maximum time between two updates in the model
        :param: total_timeout:      Timeout for the entity to be active

        :return: (Entity): Machine id

        :raises ValueError if the controller reports an error adding the machine
        :raises JujuMachineNotFound if the machine does not show up in the model
        """
        self.log.debug(
            "Provisioning machine. model: {}, hostname: {}, username: {}".format(
                model_name, hostname, username
            )
        )

        # Get controller
        controller = await self.get_controller()

        # Get model
        model = await self.get_model(controller, model_name)

        try:
            # Get provisioner
            provisioner = AsyncSSHProvisioner(
                host=hostname,
                user=username,
                private_key_path=private_key_path,
                log=self.log,
            )

            # Provision machine
            params = await provisioner.provision_machine()

            params.jobs = ["JobHostUnits"]

            self.log.debug("Adding machine to model")
            connection = model.connection()
            client_facade = client.ClientFacade.from_connection(connection)

            results = await client_facade.AddMachines(params=[params])
            error = results.machines[0].error

            if error:
                msg = "Error adding machine: {}".format(error.message)
                self.log.error(msg=msg)
                raise ValueError(msg)

            machine_id = results.machines[0].machine

            self.log.debug("Installing Juju agent into machine {}".format(machine_id))
            # The agent installation is scheduled in the background; this
            # coroutine does not await its completion.
            asyncio.ensure_future(
                provisioner.install_agent(
                    connection=connection,
                    nonce=params.nonce,
                    machine_id=machine_id,
                    proxy=self.api_proxy,
                )
            )

            # Poll (10 attempts, 2 seconds apart) until the machine is listed
            machine = None
            for _ in range(10):
                machine_list = await model.get_machines()
                if machine_id in machine_list:
                    self.log.debug("Machine {} found in model!".format(machine_id))
                    machine = model.machines.get(machine_id)
                    break
                await asyncio.sleep(2)

            if machine is None:
                msg = "Machine {} not found in model".format(machine_id)
                self.log.error(msg=msg)
                raise JujuMachineNotFound(msg)

            self.log.debug(
                "Wait until machine {} is ready in model {}".format(
                    machine.entity_id, model_name
                )
            )
            await JujuModelWatcher.wait_for(
                model=model,
                entity=machine,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout,
                db_dict=db_dict,
                n2vc=self.n2vc,
            )
        finally:
            await self.disconnect_model(model)
            await self.disconnect_controller(controller)

        self.log.debug(
            "Machine provisioned {} in model {}".format(machine_id, model_name)
        )

        return machine_id

    async def deploy_charm(
        self,
        application_name: str,
        path: str,
        model_name: str,
        machine_id: str,
        db_dict: dict = None,
        progress_timeout: float = None,
        total_timeout: float = None,
        config: dict = None,
        series: str = None,
        num_units: int = 1,
    ):
        """Deploy charm

        :param: application_name:   Application name
        :param: path:               Local path to the charm
        :param: model_name:         Model name
        :param: machine_id          ID of the machine
        :param: db_dict:            Dictionary with data of the DB to write the updates
        :param: progress_timeout:   Maximum time between two updates in the model
        :param: total_timeout:      Timeout for the entity to be active
        :param: config:             Config for the charm
        :param: series:             Series of the charm
        :param: num_units:          Number of units

        :return: (juju.application.Application): Juju application

        :raises JujuApplicationExists if the application is already in the model
        :raises JujuMachineNotFound if machine_id is given but is not in the model
        """
        self.log.debug(
            "Deploying charm {} to machine {} in model ~{}".format(
                application_name, machine_id, model_name
            )
        )
        self.log.debug("charm: {}".format(path))

        # Get controller
        controller = await self.get_controller()

        # Get model
        model = await self.get_model(controller, model_name)

        try:
            if application_name in model.applications:
                raise JujuApplicationExists(
                    "Application {} exists".format(application_name)
                )

            if machine_id is not None:
                if machine_id not in model.machines:
                    msg = "Machine {} not found in model".format(machine_id)
                    self.log.error(msg=msg)
                    raise JujuMachineNotFound(msg)
                # The series of the target machine overrides the given one
                machine = model.machines[machine_id]
                series = machine.series

            application = await model.deploy(
                entity_url=path,
                application_name=application_name,
                channel="stable",
                num_units=1,
                series=series,
                to=machine_id,
                config=config,
            )

            self.log.debug(
                "Wait until application {} is ready in model {}".format(
                    application_name, model_name
                )
            )
            # The first unit is created by deploy(); every additional unit
            # gets its own freshly created machine.
            if num_units > 1:
                for _ in range(num_units - 1):
                    m, _ = await self.create_machine(model_name, wait=False)
                    await application.add_unit(to=m.entity_id)

            await JujuModelWatcher.wait_for(
                model=model,
                entity=application,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout,
                db_dict=db_dict,
                n2vc=self.n2vc,
            )
            self.log.debug(
                "Application {} is ready in model {}".format(
                    application_name, model_name
                )
            )
        finally:
            await self.disconnect_model(model)
            await self.disconnect_controller(controller)

        return application

    def _get_application(self, model: Model, application_name: str) -> Application:
        """Get application

        :param: model:              Model object
        :param: application_name:   Application name

        :return: juju.application.Application (or None if it doesn't exist)
        """
        if model.applications and application_name in model.applications:
            return model.applications[application_name]
        return None

    async def execute_action(
        self,
        application_name: str,
        model_name: str,
        action_name: str,
        db_dict: dict = None,
        progress_timeout: float = None,
        total_timeout: float = None,
        **kwargs
    ):
        """Execute action

        :param: application_name:   Application name
        :param: model_name:         Model name
        :param: action_name:        Name of the action
        :param: db_dict:            Dictionary with data of the DB to write the updates
        :param: progress_timeout:   Maximum time between two updates in the model
        :param: total_timeout:      Timeout for the entity to be active
        :param: kwargs:             Parameters passed to the action

        :return: (str, str): (output and status)

        :raises JujuApplicationNotFound if the application is not in the model
        :raises JujuLeaderUnitNotFound if no unit reports itself as leader
        :raises JujuActionNotFound if the application does not define the action
        """
        self.log.debug(
            "Executing action {} using params {}".format(action_name, kwargs)
        )
        # Get controller
        controller = await self.get_controller()

        # Get model
        model = await self.get_model(controller, model_name)

        try:
            # Get application
            application = self._get_application(
                model, application_name=application_name,
            )
            if application is None:
                raise JujuApplicationNotFound("Cannot execute action")

            # Get unit: the action is run on the leader, so stop at the first
            # unit that reports leadership.
            unit = None
            for u in application.units:
                if await u.is_leader_from_status():
                    unit = u
                    break
            if unit is None:
                raise JujuLeaderUnitNotFound(
                    "Cannot execute action: leader unit not found"
                )

            actions = await application.get_actions()

            if action_name not in actions:
                raise JujuActionNotFound(
                    "Action {} not in available actions".format(action_name)
                )

            action = await unit.run_action(action_name, **kwargs)

            self.log.debug(
                "Wait until action {} is completed in application {} (model={})".format(
                    action_name, application_name, model_name
                )
            )
            await JujuModelWatcher.wait_for(
                model=model,
                entity=action,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout,
                db_dict=db_dict,
                n2vc=self.n2vc,
            )

            output = await model.get_action_output(action_uuid=action.entity_id)
            status = await model.get_action_status(uuid_or_prefix=action.entity_id)
            # An action missing from the status result is reported as failed
            status = (
                status[action.entity_id] if action.entity_id in status else "failed"
            )

            self.log.debug(
                "Action {} completed with status {} in application {} (model={})".format(
                    action_name, action.status, application_name, model_name
                )
            )
        finally:
            await self.disconnect_model(model)
            await self.disconnect_controller(controller)

        return output, status

    async def get_actions(self, application_name: str, model_name: str) -> dict:
        """Get list of actions

        :param: application_name: Application name
        :param: model_name: Model name

        :return: Dict with this format
            {
                "action_name": "Description of the action",
                ...
            }

        :raises JujuApplicationNotFound if the application is not in the model
        """
        self.log.debug(
            "Getting list of actions for application {}".format(application_name)
        )

        # Get controller
        controller = await self.get_controller()

        # Get model
        model = await self.get_model(controller, model_name)

        try:
            # Get application
            application = self._get_application(
                model, application_name=application_name,
            )
            if application is None:
                raise JujuApplicationNotFound(
                    "Application {} not found in model {}".format(
                        application_name, model_name
                    )
                )

            # Return list of actions
            return await application.get_actions()

        finally:
            # Disconnect from model and controller
            await self.disconnect_model(model)
            await self.disconnect_controller(controller)

    async def get_metrics(self, model_name: str, application_name: str) -> dict:
        """Get the metrics collected by the VCA.

        :param: model_name:         The name or unique id of the network service
        :param: application_name:   The name of the application

        :return: Metrics returned by the application, or an empty dict when
                 the application is not in the model
        """
        if not model_name or not application_name:
            raise Exception("model_name and application_name must be non-empty strings")
        metrics = {}
        controller = await self.get_controller()
        model = await self.get_model(controller, model_name)
        try:
            application = self._get_application(model, application_name)
            if application is not None:
                metrics = await application.get_metrics()
        finally:
            # Both helpers are coroutines: without "await" they would never
            # run and the connections would stay open.
            await self.disconnect_model(model)
            await self.disconnect_controller(controller)
        return metrics

    async def add_relation(
        self, model_name: str, endpoint_1: str, endpoint_2: str,
    ):
        """Add relation

        :param: model_name:     Model name
        :param: endpoint_1      First endpoint name
                                ("app:endpoint" format or directly the saas name)
        :param: endpoint_2:     Second endpoint name (^ same format)
        """

        self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))

        # Get controller
        controller = await self.get_controller()

        # Get model
        model = await self.get_model(controller, model_name)

        # Add relation
        try:
            await model.add_relation(endpoint_1, endpoint_2)
        except JujuAPIError as e:
            # "not found" and "already exists" are only logged, not raised
            if "not found" in e.message:
                self.log.warning("Relation not found: {}".format(e.message))
                return
            if "already exists" in e.message:
                self.log.warning("Relation already exists: {}".format(e.message))
                return
            # another exception, raise it
            raise
        finally:
            await self.disconnect_model(model)
            await self.disconnect_controller(controller)

    async def consume(
        self, offer_url: str, model_name: str,
    ):
        """
        Adds a remote offer to the model. Relations can be created later using "juju relate".

        :param: offer_url: Offer Url
        :param: model_name: Model name

        :raises ParseError if there's a problem parsing the offer_url
        :raises JujuError if remote offer includes and endpoint
        :raises JujuAPIError if the operation is not successful
        """
        controller = await self.get_controller()
        model = await self.get_model(controller, model_name)

        try:
            await model.consume(offer_url)
        finally:
            await self.disconnect_model(model)
            await self.disconnect_controller(controller)

    async def destroy_model(self, model_name: str, total_timeout: float):
        """
        Destroy model

        :param: model_name:     Model name
        :param: total_timeout:  Timeout in seconds (3600 when None)

        :raises Exception if the model is still listed when the timeout expires
        """
        # Apply the default before the machines are destroyed, so that
        # destroy_machine never receives None as its timeout.
        if total_timeout is None:
            total_timeout = 3600

        controller = await self.get_controller()
        model = await self.get_model(controller, model_name)
        try:
            try:
                self.log.debug("Destroying model {}".format(model_name))
                uuid = model.info.uuid

                # Destroy machines (best effort: a failure on one machine
                # does not stop the model from being destroyed)
                machines = await model.get_machines()
                for machine_id in machines:
                    try:
                        await self.destroy_machine(
                            model, machine_id=machine_id, total_timeout=total_timeout,
                        )
                    except asyncio.CancelledError:
                        raise
                    except Exception as e:
                        self.log.debug(
                            "Error destroying machine {}: {}".format(machine_id, e)
                        )
            finally:
                # Disconnect model, also when something above failed
                await self.disconnect_model(model)

            # Destroy model
            self.models.discard(model_name)

            await controller.destroy_model(uuid)

            # Wait until model is destroyed
            self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
            last_exception = ""

            end = time.time() + total_timeout
            while time.time() < end:
                try:
                    models = await controller.list_models()
                    if model_name not in models:
                        self.log.debug(
                            "The model {} ({}) was destroyed".format(model_name, uuid)
                        )
                        return
                except asyncio.CancelledError:
                    raise
                except Exception as e:
                    last_exception = e
                await asyncio.sleep(5)
            raise Exception(
                "Timeout waiting for model {} to be destroyed {}".format(
                    model_name, last_exception
                )
            )
        finally:
            await self.disconnect_controller(controller)

    async def destroy_application(self, model: Model, application_name: str):
        """
        Destroy application

        :param: model:              Model object
        :param: application_name:   Application name
        """
        self.log.debug(
            "Destroying application {} in model {}".format(
                application_name, model.info.name
            )
        )
        application = model.applications.get(application_name)
        if application:
            await application.destroy()
        else:
            self.log.warning("Application not found: {}".format(application_name))

    async def destroy_machine(
        self, model: Model, machine_id: str, total_timeout: float = 3600
    ):
        """
        Destroy machine

        :param: model:          Model object
        :param: machine_id:     Machine id
        :param: total_timeout:  Timeout in seconds (3600 when None)
        """
        if total_timeout is None:
            total_timeout = 3600

        machines = await model.get_machines()
        if machine_id not in machines:
            self.log.debug("Machine not found: {}".format(machine_id))
            return

        # Fetch the Machine object through model.machines, the same way
        # provision_machine and deploy_charm do.
        machine = model.machines[machine_id]
        await machine.destroy(force=True)
        # max timeout
        end = time.time() + total_timeout

        # wait for machine removal
        machines = await model.get_machines()
        while machine_id in machines and time.time() < end:
            self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
            await asyncio.sleep(0.5)
            machines = await model.get_machines()

        if machine_id in machines:
            # The loop ended because of the timeout, not because of removal
            self.log.warning(
                "Timeout waiting for machine {} to be destroyed".format(machine_id)
            )
        else:
            self.log.debug("Machine destroyed: {}".format(machine_id))

    async def configure_application(
        self, model_name: str, application_name: str, config: dict = None
    ):
        """Configure application

        :param: model_name:         Model name
        :param: application_name:   Application name
        :param: config:             Config to apply to the charm. Nothing is
                                    done when it is empty or None.

        :raises JujuApplicationNotFound if the application is not in the model
        """
        self.log.debug("Configuring application {}".format(application_name))

        if not config:
            return

        # Connect outside the try block and pre-bind "model", so the cleanup
        # below never touches an unbound name when a connection fails.
        controller = await self.get_controller()
        model = None
        try:
            model = await self.get_model(controller, model_name)
            application = self._get_application(
                model, application_name=application_name,
            )
            if application is None:
                raise JujuApplicationNotFound(
                    "Application {} not found in model {}".format(
                        application_name, model_name
                    )
                )
            await application.set_config(config)
        finally:
            if model is not None:
                await self.disconnect_model(model)
            await self.disconnect_controller(controller)

    def _get_api_endpoints_db(self) -> [str]:
        """
        Get API Endpoints from DB

        :return: List of API endpoints (None when nothing is stored)
        """
        self.log.debug("Getting endpoints from database")

        juju_info = self.db.get_one(
            DB_DATA.api_endpoints.table,
            q_filter=DB_DATA.api_endpoints.filter,
            fail_on_empty=False,
        )
        if juju_info and DB_DATA.api_endpoints.key in juju_info:
            return juju_info[DB_DATA.api_endpoints.key]
        return None

    def _update_api_endpoints_db(self, endpoints: [str]):
        """
        Update API endpoints in Database

        :param: List of endpoints
        """
        self.log.debug("Saving endpoints {} in database".format(endpoints))

        juju_info = self.db.get_one(
            DB_DATA.api_endpoints.table,
            q_filter=DB_DATA.api_endpoints.filter,
            fail_on_empty=False,
        )
        # If it doesn't, then create it
        if not juju_info:
            try:
                self.db.create(
                    DB_DATA.api_endpoints.table, DB_DATA.api_endpoints.filter,
                )
            except DbException as e:
                # Racing condition: check if another N2VC worker has created it
                juju_info = self.db.get_one(
                    DB_DATA.api_endpoints.table,
                    q_filter=DB_DATA.api_endpoints.filter,
                    fail_on_empty=False,
                )
                if not juju_info:
                    raise e
        self.db.set_one(
            DB_DATA.api_endpoints.table,
            DB_DATA.api_endpoints.filter,
            {DB_DATA.api_endpoints.key: endpoints},
        )

    def handle_exception(self, loop, context):
        """
        Exception handler installed on the event loop by the constructor.

        :param: loop:       Event loop
        :param: context:    Context passed by the loop to the handler
        """
        # All unhandled exceptions by libjuju are handled here.
        pass

    async def health_check(self, interval: float = 300.0):
        """
        Health check to make sure controller and controller_model connections are OK

        Runs until the task is cancelled (see disconnect()).

        :param: interval: Time in seconds between checks
        """
        while True:
            # Reset on every iteration: when get_controller() fails there is
            # nothing to disconnect (it cleans up after itself), and the
            # controller of a previous iteration must not be reused.
            controller = None
            try:
                controller = await self.get_controller()
                # self.log.debug("VCA is alive")
            except asyncio.CancelledError:
                raise
            except Exception as e:
                self.log.error("Health check to VCA failed: {}".format(e))
            finally:
                if controller is not None:
                    await self.disconnect_controller(controller)
            await asyncio.sleep(interval)

    async def list_models(self, contains: str = None) -> [str]:
        """List models with certain names

        :param: contains:   String that is contained in model name

        :return: [models] Returns list of model names
        """

        controller = await self.get_controller()
        try:
            models = await controller.list_models()
            if contains:
                models = [model for model in models if contains in model]
            return models
        finally:
            await self.disconnect_controller(controller)

    async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
        """List the offers of a model

        :param: model_name: Model name

        :return: Returns list of offers
        """

        controller = await self.get_controller()
        try:
            return await controller.list_offers(model_name)
        finally:
            await self.disconnect_controller(controller)

    async def add_k8s(self, name: str, auth_data: dict, storage_class: str):
        """
        Add a Kubernetes cloud to the controller

        Similar to the `juju add-k8s` command in the CLI

        :param: name:           Name for the K8s cloud
        :param: auth_data:      Dictionary with needed credentials. Format:
                                    {
                                        "server": "192.168.0.21:16443",
                                        "cacert": "-----BEGIN CERTIFI...",
                                        "token": "clhkRExRem5Xd1dCdnFEVXdvRGt...",

                                    }
        :param: storage_class:  Storage Class to use in the cloud

        :raises Exception if auth_data lacks a required key, or if name or
                          storage_class is empty
        """

        required_auth_data_keys = ["server", "cacert", "token"]
        missing_keys = [k for k in required_auth_data_keys if k not in auth_data]
        if missing_keys:
            raise Exception(
                "missing keys in auth_data: {}".format(",".join(missing_keys))
            )
        if not storage_class:
            raise Exception("storage_class must be a non-empty string")
        if not name:
            raise Exception("name must be a non-empty string")

        endpoint = auth_data["server"]
        cacert = auth_data["cacert"]
        token = auth_data["token"]
        region_name = "{}-region".format(name)

        cloud = client.Cloud(
            auth_types=["certificate"],
            ca_certificates=[cacert],
            endpoint=endpoint,
            config={
                "operator-storage": storage_class,
                "workload-storage": storage_class,
            },
            regions=[client.CloudRegion(endpoint=endpoint, name=region_name)],
            type_="kubernetes",
        )

        cred = client.CloudCredential(
            auth_type="certificate",
            attrs={"ClientCertificateData": cacert, "Token": token},
        )
        return await self.add_cloud(name, cloud, cred)

    async def add_cloud(
        self, name: str, cloud: Cloud, credential: CloudCredential = None
    ) -> Cloud:
        """
        Add cloud to the controller

        :param: name:       Name of the cloud to be added
        :param: cloud:      Cloud object
        :param: credential: CloudCredentials object for the cloud

        :return: The cloud object received as argument (see note below)
        """
        controller = await self.get_controller()
        try:
            _ = await controller.add_cloud(name, cloud)
            if credential:
                await controller.add_credential(name, credential=credential, cloud=name)
            # Need to return the object returned by the controller.add_cloud() function
            # I'm returning the original value now until this bug is fixed:
            #   https://github.com/juju/python-libjuju/issues/443
            return cloud
        finally:
            await self.disconnect_controller(controller)

    async def remove_cloud(self, name: str):
        """
        Remove cloud

        :param: name:   Name of the cloud to be removed
        """
        controller = await self.get_controller()
        try:
            await controller.remove_cloud(name)
        finally:
            await self.disconnect_controller(controller)