a457309cf61de5c4235cb5760e77f237ca5478f8
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17 from juju.controller import Controller
18 from juju.client import client
19 import time
20
21 from juju.errors import JujuAPIError
22 from juju.model import Model
23 from juju.machine import Machine
24 from juju.application import Application
25 from juju.unit import Unit
26 from juju.client._definitions import (
27 FullStatus,
28 QueryApplicationOffersResults,
29 Cloud,
30 CloudCredential,
31 )
32 from n2vc.juju_watcher import JujuModelWatcher
33 from n2vc.provisioner import AsyncSSHProvisioner
34 from n2vc.n2vc_conn import N2VCConnector
35 from n2vc.exceptions import (
36 JujuMachineNotFound,
37 JujuApplicationNotFound,
38 JujuLeaderUnitNotFound,
39 JujuActionNotFound,
40 JujuModelAlreadyExists,
41 JujuControllerFailedConnecting,
42 JujuApplicationExists,
43 JujuInvalidK8sConfiguration,
44 )
45 from n2vc.utils import DB_DATA
46 from osm_common.dbbase import DbException
47 from kubernetes.client.configuration import Configuration
48
49
50 class Libjuju:
51 def __init__(
52 self,
53 endpoint: str,
54 api_proxy: str,
55 username: str,
56 password: str,
57 cacert: str,
58 loop: asyncio.AbstractEventLoop = None,
59 log: logging.Logger = None,
60 db: dict = None,
61 n2vc: N2VCConnector = None,
62 apt_mirror: str = None,
63 enable_os_upgrade: bool = True,
64 ):
65 """
66 Constructor
67
68 :param: endpoint: Endpoint of the juju controller (host:port)
69 :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
70 :param: username: Juju username
71 :param: password: Juju password
72 :param: cacert: Juju CA Certificate
73 :param: loop: Asyncio loop
74 :param: log: Logger
75 :param: db: DB object
76 :param: n2vc: N2VC object
77 :param: apt_mirror: APT Mirror
78 :param: enable_os_upgrade: Enable OS Upgrade
79 """
80
81 self.log = log or logging.getLogger("Libjuju")
82 self.db = db
83 db_endpoints = self._get_api_endpoints_db()
84 self.endpoints = db_endpoints or [endpoint]
85 if db_endpoints is None:
86 self._update_api_endpoints_db(self.endpoints)
87 self.api_proxy = api_proxy
88 self.username = username
89 self.password = password
90 self.cacert = cacert
91 self.loop = loop or asyncio.get_event_loop()
92 self.n2vc = n2vc
93
94 # Generate config for models
95 self.model_config = {}
96 if apt_mirror:
97 self.model_config["apt-mirror"] = apt_mirror
98 self.model_config["enable-os-refresh-update"] = enable_os_upgrade
99 self.model_config["enable-os-upgrade"] = enable_os_upgrade
100
101 self.loop.set_exception_handler(self.handle_exception)
102 self.creating_model = asyncio.Lock(loop=self.loop)
103
104 self.models = set()
105 self.log.debug("Libjuju initialized!")
106
107 self.health_check_task = self.loop.create_task(self.health_check())
108
109 async def get_controller(self, timeout: float = 5.0) -> Controller:
110 """
111 Get controller
112
113 :param: timeout: Time in seconds to wait for controller to connect
114 """
115 controller = None
116 try:
117 controller = Controller(loop=self.loop)
118 await asyncio.wait_for(
119 controller.connect(
120 endpoint=self.endpoints,
121 username=self.username,
122 password=self.password,
123 cacert=self.cacert,
124 ),
125 timeout=timeout,
126 )
127 endpoints = await controller.api_endpoints
128 if self.endpoints != endpoints:
129 self.endpoints = endpoints
130 self._update_api_endpoints_db(self.endpoints)
131 return controller
132 except asyncio.CancelledError as e:
133 raise e
134 except Exception as e:
135 self.log.error(
136 "Failed connecting to controller: {}...".format(self.endpoints)
137 )
138 if controller:
139 await self.disconnect_controller(controller)
140 raise JujuControllerFailedConnecting(e)
141
142 async def disconnect(self):
143 """Disconnect"""
144 # Cancel health check task
145 self.health_check_task.cancel()
146 self.log.debug("Libjuju disconnected!")
147
148 async def disconnect_model(self, model: Model):
149 """
150 Disconnect model
151
152 :param: model: Model that will be disconnected
153 """
154 await model.disconnect()
155
156 async def disconnect_controller(self, controller: Controller):
157 """
158 Disconnect controller
159
160 :param: controller: Controller that will be disconnected
161 """
162 await controller.disconnect()
163
164 async def add_model(self, model_name: str, cloud_name: str, credential_name=None):
165 """
166 Create model
167
168 :param: model_name: Model name
169 :param: cloud_name: Cloud name
170 :param: credential_name: Credential name to use for adding the model
171 If not specified, same name as the cloud will be used.
172 """
173
174 # Get controller
175 controller = await self.get_controller()
176 model = None
177 try:
178 # Raise exception if model already exists
179 if await self.model_exists(model_name, controller=controller):
180 raise JujuModelAlreadyExists(
181 "Model {} already exists.".format(model_name)
182 )
183
184 # Block until other workers have finished model creation
185 while self.creating_model.locked():
186 await asyncio.sleep(0.1)
187
188 # If the model exists, return it from the controller
189 if model_name in self.models:
190 return
191
192 # Create the model
193 async with self.creating_model:
194 self.log.debug("Creating model {}".format(model_name))
195 model = await controller.add_model(
196 model_name,
197 config=self.model_config,
198 cloud_name=cloud_name,
199 credential_name=credential_name or cloud_name,
200 )
201 self.models.add(model_name)
202 finally:
203 if model:
204 await self.disconnect_model(model)
205 await self.disconnect_controller(controller)
206
207 async def get_model(
208 self, controller: Controller, model_name: str, id=None
209 ) -> Model:
210 """
211 Get model from controller
212
213 :param: controller: Controller
214 :param: model_name: Model name
215
216 :return: Model: The created Juju model object
217 """
218 return await controller.get_model(model_name)
219
220 async def model_exists(
221 self, model_name: str, controller: Controller = None
222 ) -> bool:
223 """
224 Check if model exists
225
226 :param: controller: Controller
227 :param: model_name: Model name
228
229 :return bool
230 """
231 need_to_disconnect = False
232
233 # Get controller if not passed
234 if not controller:
235 controller = await self.get_controller()
236 need_to_disconnect = True
237
238 # Check if model exists
239 try:
240 return model_name in await controller.list_models()
241 finally:
242 if need_to_disconnect:
243 await self.disconnect_controller(controller)
244
245 async def models_exist(self, model_names: [str]) -> (bool, list):
246 """
247 Check if models exists
248
249 :param: model_names: List of strings with model names
250
251 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
252 """
253 if not model_names:
254 raise Exception(
255 "model_names must be a non-empty array. Given value: {}".format(
256 model_names
257 )
258 )
259 non_existing_models = []
260 models = await self.list_models()
261 existing_models = list(set(models).intersection(model_names))
262 non_existing_models = list(set(model_names) - set(existing_models))
263
264 return (
265 len(non_existing_models) == 0,
266 non_existing_models,
267 )
268
269 async def get_model_status(self, model_name: str) -> FullStatus:
270 """
271 Get model status
272
273 :param: model_name: Model name
274
275 :return: Full status object
276 """
277 controller = await self.get_controller()
278 model = await self.get_model(controller, model_name)
279 try:
280 return await model.get_status()
281 finally:
282 await self.disconnect_model(model)
283 await self.disconnect_controller(controller)
284
285 async def create_machine(
286 self,
287 model_name: str,
288 machine_id: str = None,
289 db_dict: dict = None,
290 progress_timeout: float = None,
291 total_timeout: float = None,
292 series: str = "xenial",
293 wait: bool = True,
294 ) -> (Machine, bool):
295 """
296 Create machine
297
298 :param: model_name: Model name
299 :param: machine_id: Machine id
300 :param: db_dict: Dictionary with data of the DB to write the updates
301 :param: progress_timeout: Maximum time between two updates in the model
302 :param: total_timeout: Timeout for the entity to be active
303 :param: series: Series of the machine (xenial, bionic, focal, ...)
304 :param: wait: Wait until machine is ready
305
306 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
307 if the machine is new or it already existed
308 """
309 new = False
310 machine = None
311
312 self.log.debug(
313 "Creating machine (id={}) in model: {}".format(machine_id, model_name)
314 )
315
316 # Get controller
317 controller = await self.get_controller()
318
319 # Get model
320 model = await self.get_model(controller, model_name)
321 try:
322 if machine_id is not None:
323 self.log.debug(
324 "Searching machine (id={}) in model {}".format(
325 machine_id, model_name
326 )
327 )
328
329 # Get machines from model and get the machine with machine_id if exists
330 machines = await model.get_machines()
331 if machine_id in machines:
332 self.log.debug(
333 "Machine (id={}) found in model {}".format(
334 machine_id, model_name
335 )
336 )
337 machine = machines[machine_id]
338 else:
339 raise JujuMachineNotFound("Machine {} not found".format(machine_id))
340
341 if machine is None:
342 self.log.debug("Creating a new machine in model {}".format(model_name))
343
344 # Create machine
345 machine = await model.add_machine(
346 spec=None, constraints=None, disks=None, series=series
347 )
348 new = True
349
350 # Wait until the machine is ready
351 self.log.debug(
352 "Wait until machine {} is ready in model {}".format(
353 machine.entity_id, model_name
354 )
355 )
356 if wait:
357 await JujuModelWatcher.wait_for(
358 model=model,
359 entity=machine,
360 progress_timeout=progress_timeout,
361 total_timeout=total_timeout,
362 db_dict=db_dict,
363 n2vc=self.n2vc,
364 )
365 finally:
366 await self.disconnect_model(model)
367 await self.disconnect_controller(controller)
368
369 self.log.debug(
370 "Machine {} ready at {} in model {}".format(
371 machine.entity_id, machine.dns_name, model_name
372 )
373 )
374 return machine, new
375
376 async def provision_machine(
377 self,
378 model_name: str,
379 hostname: str,
380 username: str,
381 private_key_path: str,
382 db_dict: dict = None,
383 progress_timeout: float = None,
384 total_timeout: float = None,
385 ) -> str:
386 """
387 Manually provisioning of a machine
388
389 :param: model_name: Model name
390 :param: hostname: IP to access the machine
391 :param: username: Username to login to the machine
392 :param: private_key_path: Local path for the private key
393 :param: db_dict: Dictionary with data of the DB to write the updates
394 :param: progress_timeout: Maximum time between two updates in the model
395 :param: total_timeout: Timeout for the entity to be active
396
397 :return: (Entity): Machine id
398 """
399 self.log.debug(
400 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
401 model_name, hostname, username
402 )
403 )
404
405 # Get controller
406 controller = await self.get_controller()
407
408 # Get model
409 model = await self.get_model(controller, model_name)
410
411 try:
412 # Get provisioner
413 provisioner = AsyncSSHProvisioner(
414 host=hostname,
415 user=username,
416 private_key_path=private_key_path,
417 log=self.log,
418 )
419
420 # Provision machine
421 params = await provisioner.provision_machine()
422
423 params.jobs = ["JobHostUnits"]
424
425 self.log.debug("Adding machine to model")
426 connection = model.connection()
427 client_facade = client.ClientFacade.from_connection(connection)
428
429 results = await client_facade.AddMachines(params=[params])
430 error = results.machines[0].error
431
432 if error:
433 msg = "Error adding machine: {}".format(error.message)
434 self.log.error(msg=msg)
435 raise ValueError(msg)
436
437 machine_id = results.machines[0].machine
438
439 self.log.debug("Installing Juju agent into machine {}".format(machine_id))
440 asyncio.ensure_future(
441 provisioner.install_agent(
442 connection=connection,
443 nonce=params.nonce,
444 machine_id=machine_id,
445 proxy=self.api_proxy,
446 )
447 )
448
449 machine = None
450 for _ in range(10):
451 machine_list = await model.get_machines()
452 if machine_id in machine_list:
453 self.log.debug("Machine {} found in model!".format(machine_id))
454 machine = model.machines.get(machine_id)
455 break
456 await asyncio.sleep(2)
457
458 if machine is None:
459 msg = "Machine {} not found in model".format(machine_id)
460 self.log.error(msg=msg)
461 raise JujuMachineNotFound(msg)
462
463 self.log.debug(
464 "Wait until machine {} is ready in model {}".format(
465 machine.entity_id, model_name
466 )
467 )
468 await JujuModelWatcher.wait_for(
469 model=model,
470 entity=machine,
471 progress_timeout=progress_timeout,
472 total_timeout=total_timeout,
473 db_dict=db_dict,
474 n2vc=self.n2vc,
475 )
476 except Exception as e:
477 raise e
478 finally:
479 await self.disconnect_model(model)
480 await self.disconnect_controller(controller)
481
482 self.log.debug(
483 "Machine provisioned {} in model {}".format(machine_id, model_name)
484 )
485
486 return machine_id
487
488 async def deploy_charm(
489 self,
490 application_name: str,
491 path: str,
492 model_name: str,
493 machine_id: str,
494 db_dict: dict = None,
495 progress_timeout: float = None,
496 total_timeout: float = None,
497 config: dict = None,
498 series: str = None,
499 num_units: int = 1,
500 ):
501 """Deploy charm
502
503 :param: application_name: Application name
504 :param: path: Local path to the charm
505 :param: model_name: Model name
506 :param: machine_id ID of the machine
507 :param: db_dict: Dictionary with data of the DB to write the updates
508 :param: progress_timeout: Maximum time between two updates in the model
509 :param: total_timeout: Timeout for the entity to be active
510 :param: config: Config for the charm
511 :param: series: Series of the charm
512 :param: num_units: Number of units
513
514 :return: (juju.application.Application): Juju application
515 """
516 self.log.debug(
517 "Deploying charm {} to machine {} in model ~{}".format(
518 application_name, machine_id, model_name
519 )
520 )
521 self.log.debug("charm: {}".format(path))
522
523 # Get controller
524 controller = await self.get_controller()
525
526 # Get model
527 model = await self.get_model(controller, model_name)
528
529 try:
530 application = None
531 if application_name not in model.applications:
532
533 if machine_id is not None:
534 if machine_id not in model.machines:
535 msg = "Machine {} not found in model".format(machine_id)
536 self.log.error(msg=msg)
537 raise JujuMachineNotFound(msg)
538 machine = model.machines[machine_id]
539 series = machine.series
540
541 application = await model.deploy(
542 entity_url=path,
543 application_name=application_name,
544 channel="stable",
545 num_units=1,
546 series=series,
547 to=machine_id,
548 config=config,
549 )
550
551 self.log.debug(
552 "Wait until application {} is ready in model {}".format(
553 application_name, model_name
554 )
555 )
556 if num_units > 1:
557 for _ in range(num_units - 1):
558 m, _ = await self.create_machine(model_name, wait=False)
559 await application.add_unit(to=m.entity_id)
560
561 await JujuModelWatcher.wait_for(
562 model=model,
563 entity=application,
564 progress_timeout=progress_timeout,
565 total_timeout=total_timeout,
566 db_dict=db_dict,
567 n2vc=self.n2vc,
568 )
569 self.log.debug(
570 "Application {} is ready in model {}".format(
571 application_name, model_name
572 )
573 )
574 else:
575 raise JujuApplicationExists(
576 "Application {} exists".format(application_name)
577 )
578 finally:
579 await self.disconnect_model(model)
580 await self.disconnect_controller(controller)
581
582 return application
583
584 def _get_application(self, model: Model, application_name: str) -> Application:
585 """Get application
586
587 :param: model: Model object
588 :param: application_name: Application name
589
590 :return: juju.application.Application (or None if it doesn't exist)
591 """
592 if model.applications and application_name in model.applications:
593 return model.applications[application_name]
594
595 async def execute_action(
596 self,
597 application_name: str,
598 model_name: str,
599 action_name: str,
600 db_dict: dict = None,
601 progress_timeout: float = None,
602 total_timeout: float = None,
603 **kwargs
604 ):
605 """Execute action
606
607 :param: application_name: Application name
608 :param: model_name: Model name
609 :param: action_name: Name of the action
610 :param: db_dict: Dictionary with data of the DB to write the updates
611 :param: progress_timeout: Maximum time between two updates in the model
612 :param: total_timeout: Timeout for the entity to be active
613
614 :return: (str, str): (output and status)
615 """
616 self.log.debug(
617 "Executing action {} using params {}".format(action_name, kwargs)
618 )
619 # Get controller
620 controller = await self.get_controller()
621
622 # Get model
623 model = await self.get_model(controller, model_name)
624
625 try:
626 # Get application
627 application = self._get_application(
628 model, application_name=application_name,
629 )
630 if application is None:
631 raise JujuApplicationNotFound("Cannot execute action")
632
633 # Get leader unit
634 # Racing condition:
635 # Ocassionally, self._get_leader_unit() will return None
636 # because the leader elected hook has not been triggered yet.
637 # Therefore, we are doing some retries. If it happens again,
638 # re-open bug 1236
639 attempts = 3
640 time_between_retries = 10
641 unit = None
642 for _ in range(attempts):
643 unit = await self._get_leader_unit(application)
644 if unit is None:
645 await asyncio.sleep(time_between_retries)
646 else:
647 break
648 if unit is None:
649 raise JujuLeaderUnitNotFound(
650 "Cannot execute action: leader unit not found"
651 )
652
653 actions = await application.get_actions()
654
655 if action_name not in actions:
656 raise JujuActionNotFound(
657 "Action {} not in available actions".format(action_name)
658 )
659
660 action = await unit.run_action(action_name, **kwargs)
661
662 self.log.debug(
663 "Wait until action {} is completed in application {} (model={})".format(
664 action_name, application_name, model_name
665 )
666 )
667 await JujuModelWatcher.wait_for(
668 model=model,
669 entity=action,
670 progress_timeout=progress_timeout,
671 total_timeout=total_timeout,
672 db_dict=db_dict,
673 n2vc=self.n2vc,
674 )
675
676 output = await model.get_action_output(action_uuid=action.entity_id)
677 status = await model.get_action_status(uuid_or_prefix=action.entity_id)
678 status = (
679 status[action.entity_id] if action.entity_id in status else "failed"
680 )
681
682 self.log.debug(
683 "Action {} completed with status {} in application {} (model={})".format(
684 action_name, action.status, application_name, model_name
685 )
686 )
687 finally:
688 await self.disconnect_model(model)
689 await self.disconnect_controller(controller)
690
691 return output, status
692
693 async def get_actions(self, application_name: str, model_name: str) -> dict:
694 """Get list of actions
695
696 :param: application_name: Application name
697 :param: model_name: Model name
698
699 :return: Dict with this format
700 {
701 "action_name": "Description of the action",
702 ...
703 }
704 """
705 self.log.debug(
706 "Getting list of actions for application {}".format(application_name)
707 )
708
709 # Get controller
710 controller = await self.get_controller()
711
712 # Get model
713 model = await self.get_model(controller, model_name)
714
715 try:
716 # Get application
717 application = self._get_application(
718 model, application_name=application_name,
719 )
720
721 # Return list of actions
722 return await application.get_actions()
723
724 finally:
725 # Disconnect from model and controller
726 await self.disconnect_model(model)
727 await self.disconnect_controller(controller)
728
729 async def get_metrics(self, model_name: str, application_name: str) -> dict:
730 """Get the metrics collected by the VCA.
731
732 :param model_name The name or unique id of the network service
733 :param application_name The name of the application
734 """
735 if not model_name or not application_name:
736 raise Exception("model_name and application_name must be non-empty strings")
737 metrics = {}
738 controller = await self.get_controller()
739 model = await self.get_model(controller, model_name)
740 try:
741 application = self._get_application(model, application_name)
742 if application is not None:
743 metrics = await application.get_metrics()
744 finally:
745 self.disconnect_model(model)
746 self.disconnect_controller(controller)
747 return metrics
748
749 async def add_relation(
750 self, model_name: str, endpoint_1: str, endpoint_2: str,
751 ):
752 """Add relation
753
754 :param: model_name: Model name
755 :param: endpoint_1 First endpoint name
756 ("app:endpoint" format or directly the saas name)
757 :param: endpoint_2: Second endpoint name (^ same format)
758 """
759
760 self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))
761
762 # Get controller
763 controller = await self.get_controller()
764
765 # Get model
766 model = await self.get_model(controller, model_name)
767
768 # Add relation
769 try:
770 await model.add_relation(endpoint_1, endpoint_2)
771 except JujuAPIError as e:
772 if "not found" in e.message:
773 self.log.warning("Relation not found: {}".format(e.message))
774 return
775 if "already exists" in e.message:
776 self.log.warning("Relation already exists: {}".format(e.message))
777 return
778 # another exception, raise it
779 raise e
780 finally:
781 await self.disconnect_model(model)
782 await self.disconnect_controller(controller)
783
784 async def consume(
785 self, offer_url: str, model_name: str,
786 ):
787 """
788 Adds a remote offer to the model. Relations can be created later using "juju relate".
789
790 :param: offer_url: Offer Url
791 :param: model_name: Model name
792
793 :raises ParseError if there's a problem parsing the offer_url
794 :raises JujuError if remote offer includes and endpoint
795 :raises JujuAPIError if the operation is not successful
796 """
797 controller = await self.get_controller()
798 model = await controller.get_model(model_name)
799
800 try:
801 await model.consume(offer_url)
802 finally:
803 await self.disconnect_model(model)
804 await self.disconnect_controller(controller)
805
806 async def destroy_model(self, model_name: str, total_timeout: float):
807 """
808 Destroy model
809
810 :param: model_name: Model name
811 :param: total_timeout: Timeout
812 """
813
814 controller = await self.get_controller()
815 model = await self.get_model(controller, model_name)
816 try:
817 self.log.debug("Destroying model {}".format(model_name))
818 uuid = model.info.uuid
819
820 # Disconnect model
821 await self.disconnect_model(model)
822
823 # Destroy model
824 if model_name in self.models:
825 self.models.remove(model_name)
826
827 await controller.destroy_model(uuid, force=True, max_wait=0)
828
829 # Wait until model is destroyed
830 self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
831
832 if total_timeout is None:
833 total_timeout = 3600
834 end = time.time() + total_timeout
835 while time.time() < end:
836 models = await controller.list_models()
837 if model_name not in models:
838 self.log.debug(
839 "The model {} ({}) was destroyed".format(model_name, uuid)
840 )
841 return
842 await asyncio.sleep(5)
843 raise Exception(
844 "Timeout waiting for model {} to be destroyed".format(model_name)
845 )
846 finally:
847 await self.disconnect_controller(controller)
848
849 async def destroy_application(self, model: Model, application_name: str):
850 """
851 Destroy application
852
853 :param: model: Model object
854 :param: application_name: Application name
855 """
856 self.log.debug(
857 "Destroying application {} in model {}".format(
858 application_name, model.info.name
859 )
860 )
861 application = model.applications.get(application_name)
862 if application:
863 await application.destroy()
864 else:
865 self.log.warning("Application not found: {}".format(application_name))
866
867 # async def destroy_machine(
868 # self, model: Model, machine_id: str, total_timeout: float = 3600
869 # ):
870 # """
871 # Destroy machine
872
873 # :param: model: Model object
874 # :param: machine_id: Machine id
875 # :param: total_timeout: Timeout in seconds
876 # """
877 # machines = await model.get_machines()
878 # if machine_id in machines:
879 # machine = machines[machine_id]
880 # await machine.destroy(force=True)
881 # # max timeout
882 # end = time.time() + total_timeout
883
884 # # wait for machine removal
885 # machines = await model.get_machines()
886 # while machine_id in machines and time.time() < end:
887 # self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
888 # await asyncio.sleep(0.5)
889 # machines = await model.get_machines()
890 # self.log.debug("Machine destroyed: {}".format(machine_id))
891 # else:
892 # self.log.debug("Machine not found: {}".format(machine_id))
893
894 async def configure_application(
895 self, model_name: str, application_name: str, config: dict = None
896 ):
897 """Configure application
898
899 :param: model_name: Model name
900 :param: application_name: Application name
901 :param: config: Config to apply to the charm
902 """
903 self.log.debug("Configuring application {}".format(application_name))
904
905 if config:
906 try:
907 controller = await self.get_controller()
908 model = await self.get_model(controller, model_name)
909 application = self._get_application(
910 model, application_name=application_name,
911 )
912 await application.set_config(config)
913 finally:
914 await self.disconnect_model(model)
915 await self.disconnect_controller(controller)
916
917 def _get_api_endpoints_db(self) -> [str]:
918 """
919 Get API Endpoints from DB
920
921 :return: List of API endpoints
922 """
923 self.log.debug("Getting endpoints from database")
924
925 juju_info = self.db.get_one(
926 DB_DATA.api_endpoints.table,
927 q_filter=DB_DATA.api_endpoints.filter,
928 fail_on_empty=False,
929 )
930 if juju_info and DB_DATA.api_endpoints.key in juju_info:
931 return juju_info[DB_DATA.api_endpoints.key]
932
933 def _update_api_endpoints_db(self, endpoints: [str]):
934 """
935 Update API endpoints in Database
936
937 :param: List of endpoints
938 """
939 self.log.debug("Saving endpoints {} in database".format(endpoints))
940
941 juju_info = self.db.get_one(
942 DB_DATA.api_endpoints.table,
943 q_filter=DB_DATA.api_endpoints.filter,
944 fail_on_empty=False,
945 )
946 # If it doesn't, then create it
947 if not juju_info:
948 try:
949 self.db.create(
950 DB_DATA.api_endpoints.table, DB_DATA.api_endpoints.filter,
951 )
952 except DbException as e:
953 # Racing condition: check if another N2VC worker has created it
954 juju_info = self.db.get_one(
955 DB_DATA.api_endpoints.table,
956 q_filter=DB_DATA.api_endpoints.filter,
957 fail_on_empty=False,
958 )
959 if not juju_info:
960 raise e
961 self.db.set_one(
962 DB_DATA.api_endpoints.table,
963 DB_DATA.api_endpoints.filter,
964 {DB_DATA.api_endpoints.key: endpoints},
965 )
966
967 def handle_exception(self, loop, context):
968 # All unhandled exceptions by libjuju are handled here.
969 pass
970
971 async def health_check(self, interval: float = 300.0):
972 """
973 Health check to make sure controller and controller_model connections are OK
974
975 :param: interval: Time in seconds between checks
976 """
977 while True:
978 try:
979 controller = await self.get_controller()
980 # self.log.debug("VCA is alive")
981 except Exception as e:
982 self.log.error("Health check to VCA failed: {}".format(e))
983 finally:
984 await self.disconnect_controller(controller)
985 await asyncio.sleep(interval)
986
987 async def list_models(self, contains: str = None) -> [str]:
988 """List models with certain names
989
990 :param: contains: String that is contained in model name
991
992 :retur: [models] Returns list of model names
993 """
994
995 controller = await self.get_controller()
996 try:
997 models = await controller.list_models()
998 if contains:
999 models = [model for model in models if contains in model]
1000 return models
1001 finally:
1002 await self.disconnect_controller(controller)
1003
1004 async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
1005 """List models with certain names
1006
1007 :param: model_name: Model name
1008
1009 :return: Returns list of offers
1010 """
1011
1012 controller = await self.get_controller()
1013 try:
1014 return await controller.list_offers(model_name)
1015 finally:
1016 await self.disconnect_controller(controller)
1017
1018 async def add_k8s(
1019 self,
1020 name: str,
1021 configuration: Configuration,
1022 storage_class: str,
1023 credential_name: str = None,
1024 ):
1025 """
1026 Add a Kubernetes cloud to the controller
1027
1028 Similar to the `juju add-k8s` command in the CLI
1029
1030 :param: name: Name for the K8s cloud
1031 :param: configuration: Kubernetes configuration object
1032 :param: storage_class: Storage Class to use in the cloud
1033 :param: credential_name: Storage Class to use in the cloud
1034 """
1035
1036 if not storage_class:
1037 raise Exception("storage_class must be a non-empty string")
1038 if not name:
1039 raise Exception("name must be a non-empty string")
1040 if not configuration:
1041 raise Exception("configuration must be provided")
1042
1043 endpoint = configuration.host
1044 credential = self.get_k8s_cloud_credential(configuration)
1045 ca_certificates = (
1046 [credential.attrs["ClientCertificateData"]]
1047 if "ClientCertificateData" in credential.attrs
1048 else []
1049 )
1050 cloud = client.Cloud(
1051 type_="kubernetes",
1052 auth_types=[credential.auth_type],
1053 endpoint=endpoint,
1054 ca_certificates=ca_certificates,
1055 config={
1056 "operator-storage": storage_class,
1057 "workload-storage": storage_class,
1058 },
1059 )
1060
1061 return await self.add_cloud(
1062 name, cloud, credential, credential_name=credential_name
1063 )
1064
1065 def get_k8s_cloud_credential(
1066 self, configuration: Configuration,
1067 ) -> client.CloudCredential:
1068 attrs = {}
1069 ca_cert = configuration.ssl_ca_cert or configuration.cert_file
1070 key = configuration.key_file
1071 api_key = configuration.api_key
1072 token = None
1073 username = configuration.username
1074 password = configuration.password
1075
1076 if "authorization" in api_key:
1077 authorization = api_key["authorization"]
1078 if "Bearer " in authorization:
1079 bearer_list = authorization.split(" ")
1080 if len(bearer_list) == 2:
1081 [_, token] = bearer_list
1082 else:
1083 raise JujuInvalidK8sConfiguration("unknown format of api_key")
1084 else:
1085 token = authorization
1086 if ca_cert:
1087 attrs["ClientCertificateData"] = open(ca_cert, "r").read()
1088 if key:
1089 attrs["ClientKeyData"] = open(key, "r").read()
1090 if token:
1091 if username or password:
1092 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1093 attrs["Token"] = token
1094
1095 auth_type = None
1096 if key:
1097 auth_type = "oauth2"
1098 if not token:
1099 raise JujuInvalidK8sConfiguration(
1100 "missing token for auth type {}".format(auth_type)
1101 )
1102 elif username:
1103 if not password:
1104 self.log.debug(
1105 "credential for user {} has empty password".format(username)
1106 )
1107 attrs["username"] = username
1108 attrs["password"] = password
1109 if ca_cert:
1110 auth_type = "userpasswithcert"
1111 else:
1112 auth_type = "userpass"
1113 elif ca_cert and token:
1114 auth_type = "certificate"
1115 else:
1116 raise JujuInvalidK8sConfiguration("authentication method not supported")
1117 return client.CloudCredential(auth_type=auth_type, attrs=attrs,)
1118
1119 async def add_cloud(
1120 self,
1121 name: str,
1122 cloud: Cloud,
1123 credential: CloudCredential = None,
1124 credential_name: str = None,
1125 ) -> Cloud:
1126 """
1127 Add cloud to the controller
1128
1129 :param: name: Name of the cloud to be added
1130 :param: cloud: Cloud object
1131 :param: credential: CloudCredentials object for the cloud
1132 :param: credential_name: Credential name.
1133 If not defined, cloud of the name will be used.
1134 """
1135 controller = await self.get_controller()
1136 try:
1137 _ = await controller.add_cloud(name, cloud)
1138 if credential:
1139 await controller.add_credential(
1140 credential_name or name, credential=credential, cloud=name
1141 )
1142 # Need to return the object returned by the controller.add_cloud() function
1143 # I'm returning the original value now until this bug is fixed:
1144 # https://github.com/juju/python-libjuju/issues/443
1145 return cloud
1146 finally:
1147 await self.disconnect_controller(controller)
1148
1149 async def remove_cloud(self, name: str):
1150 """
1151 Remove cloud
1152
1153 :param: name: Name of the cloud to be removed
1154 """
1155 controller = await self.get_controller()
1156 try:
1157 await controller.remove_cloud(name)
1158 finally:
1159 await self.disconnect_controller(controller)
1160
1161 async def _get_leader_unit(self, application: Application) -> Unit:
1162 unit = None
1163 for u in application.units:
1164 if await u.is_leader_from_status():
1165 unit = u
1166 break
1167 return unit