a00fa58aaa0bc1b2535bc5aa16707fb59e594f67
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17 from juju.controller import Controller
18 from juju.client import client
19 import time
20
21 from juju.errors import JujuAPIError
22 from juju.model import Model
23 from juju.machine import Machine
24 from juju.application import Application
25 from juju.client._definitions import (
26 FullStatus,
27 QueryApplicationOffersResults,
28 Cloud,
29 CloudCredential,
30 )
31 from n2vc.juju_watcher import JujuModelWatcher
32 from n2vc.provisioner import AsyncSSHProvisioner
33 from n2vc.n2vc_conn import N2VCConnector
34 from n2vc.exceptions import (
35 JujuMachineNotFound,
36 JujuApplicationNotFound,
37 JujuLeaderUnitNotFound,
38 JujuActionNotFound,
39 JujuModelAlreadyExists,
40 JujuControllerFailedConnecting,
41 JujuApplicationExists,
42 JujuInvalidK8sConfiguration,
43 )
44 from n2vc.utils import DB_DATA
45 from osm_common.dbbase import DbException
46 from kubernetes.client.configuration import Configuration
47
48
49 class Libjuju:
50 def __init__(
51 self,
52 endpoint: str,
53 api_proxy: str,
54 username: str,
55 password: str,
56 cacert: str,
57 loop: asyncio.AbstractEventLoop = None,
58 log: logging.Logger = None,
59 db: dict = None,
60 n2vc: N2VCConnector = None,
61 apt_mirror: str = None,
62 enable_os_upgrade: bool = True,
63 ):
64 """
65 Constructor
66
67 :param: endpoint: Endpoint of the juju controller (host:port)
68 :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
69 :param: username: Juju username
70 :param: password: Juju password
71 :param: cacert: Juju CA Certificate
72 :param: loop: Asyncio loop
73 :param: log: Logger
74 :param: db: DB object
75 :param: n2vc: N2VC object
76 :param: apt_mirror: APT Mirror
77 :param: enable_os_upgrade: Enable OS Upgrade
78 """
79
80 self.log = log or logging.getLogger("Libjuju")
81 self.db = db
82 db_endpoints = self._get_api_endpoints_db()
83 self.endpoints = db_endpoints or [endpoint]
84 if db_endpoints is None:
85 self._update_api_endpoints_db(self.endpoints)
86 self.api_proxy = api_proxy
87 self.username = username
88 self.password = password
89 self.cacert = cacert
90 self.loop = loop or asyncio.get_event_loop()
91 self.n2vc = n2vc
92
93 # Generate config for models
94 self.model_config = {}
95 if apt_mirror:
96 self.model_config["apt-mirror"] = apt_mirror
97 self.model_config["enable-os-refresh-update"] = enable_os_upgrade
98 self.model_config["enable-os-upgrade"] = enable_os_upgrade
99
100 self.loop.set_exception_handler(self.handle_exception)
101 self.creating_model = asyncio.Lock(loop=self.loop)
102
103 self.models = set()
104 self.log.debug("Libjuju initialized!")
105
106 self.health_check_task = self.loop.create_task(self.health_check())
107
108 async def get_controller(self, timeout: float = 5.0) -> Controller:
109 """
110 Get controller
111
112 :param: timeout: Time in seconds to wait for controller to connect
113 """
114 controller = None
115 try:
116 controller = Controller(loop=self.loop)
117 await asyncio.wait_for(
118 controller.connect(
119 endpoint=self.endpoints,
120 username=self.username,
121 password=self.password,
122 cacert=self.cacert,
123 ),
124 timeout=timeout,
125 )
126 endpoints = await controller.api_endpoints
127 if self.endpoints != endpoints:
128 self.endpoints = endpoints
129 self._update_api_endpoints_db(self.endpoints)
130 return controller
131 except asyncio.CancelledError as e:
132 raise e
133 except Exception as e:
134 self.log.error(
135 "Failed connecting to controller: {}...".format(self.endpoints)
136 )
137 if controller:
138 await self.disconnect_controller(controller)
139 raise JujuControllerFailedConnecting(e)
140
141 async def disconnect(self):
142 """Disconnect"""
143 # Cancel health check task
144 self.health_check_task.cancel()
145 self.log.debug("Libjuju disconnected!")
146
147 async def disconnect_model(self, model: Model):
148 """
149 Disconnect model
150
151 :param: model: Model that will be disconnected
152 """
153 await model.disconnect()
154
155 async def disconnect_controller(self, controller: Controller):
156 """
157 Disconnect controller
158
159 :param: controller: Controller that will be disconnected
160 """
161 await controller.disconnect()
162
163 async def add_model(self, model_name: str, cloud_name: str, credential_name=None):
164 """
165 Create model
166
167 :param: model_name: Model name
168 :param: cloud_name: Cloud name
169 :param: credential_name: Credential name to use for adding the model
170 If not specified, same name as the cloud will be used.
171 """
172
173 # Get controller
174 controller = await self.get_controller()
175 model = None
176 try:
177 # Raise exception if model already exists
178 if await self.model_exists(model_name, controller=controller):
179 raise JujuModelAlreadyExists(
180 "Model {} already exists.".format(model_name)
181 )
182
183 # Block until other workers have finished model creation
184 while self.creating_model.locked():
185 await asyncio.sleep(0.1)
186
187 # If the model exists, return it from the controller
188 if model_name in self.models:
189 return
190
191 # Create the model
192 async with self.creating_model:
193 self.log.debug("Creating model {}".format(model_name))
194 model = await controller.add_model(
195 model_name,
196 config=self.model_config,
197 cloud_name=cloud_name,
198 credential_name=credential_name or cloud_name,
199 )
200 self.models.add(model_name)
201 finally:
202 if model:
203 await self.disconnect_model(model)
204 await self.disconnect_controller(controller)
205
206 async def get_model(
207 self, controller: Controller, model_name: str, id=None
208 ) -> Model:
209 """
210 Get model from controller
211
212 :param: controller: Controller
213 :param: model_name: Model name
214
215 :return: Model: The created Juju model object
216 """
217 return await controller.get_model(model_name)
218
219 async def model_exists(
220 self, model_name: str, controller: Controller = None
221 ) -> bool:
222 """
223 Check if model exists
224
225 :param: controller: Controller
226 :param: model_name: Model name
227
228 :return bool
229 """
230 need_to_disconnect = False
231
232 # Get controller if not passed
233 if not controller:
234 controller = await self.get_controller()
235 need_to_disconnect = True
236
237 # Check if model exists
238 try:
239 return model_name in await controller.list_models()
240 finally:
241 if need_to_disconnect:
242 await self.disconnect_controller(controller)
243
244 async def models_exist(self, model_names: [str]) -> (bool, list):
245 """
246 Check if models exists
247
248 :param: model_names: List of strings with model names
249
250 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
251 """
252 if not model_names:
253 raise Exception(
254 "model_names must be a non-empty array. Given value: {}".format(
255 model_names
256 )
257 )
258 non_existing_models = []
259 models = await self.list_models()
260 existing_models = list(set(models).intersection(model_names))
261 non_existing_models = list(set(model_names) - set(existing_models))
262
263 return (
264 len(non_existing_models) == 0,
265 non_existing_models,
266 )
267
268 async def get_model_status(self, model_name: str) -> FullStatus:
269 """
270 Get model status
271
272 :param: model_name: Model name
273
274 :return: Full status object
275 """
276 controller = await self.get_controller()
277 model = await self.get_model(controller, model_name)
278 try:
279 return await model.get_status()
280 finally:
281 await self.disconnect_model(model)
282 await self.disconnect_controller(controller)
283
284 async def create_machine(
285 self,
286 model_name: str,
287 machine_id: str = None,
288 db_dict: dict = None,
289 progress_timeout: float = None,
290 total_timeout: float = None,
291 series: str = "xenial",
292 wait: bool = True,
293 ) -> (Machine, bool):
294 """
295 Create machine
296
297 :param: model_name: Model name
298 :param: machine_id: Machine id
299 :param: db_dict: Dictionary with data of the DB to write the updates
300 :param: progress_timeout: Maximum time between two updates in the model
301 :param: total_timeout: Timeout for the entity to be active
302 :param: series: Series of the machine (xenial, bionic, focal, ...)
303 :param: wait: Wait until machine is ready
304
305 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
306 if the machine is new or it already existed
307 """
308 new = False
309 machine = None
310
311 self.log.debug(
312 "Creating machine (id={}) in model: {}".format(machine_id, model_name)
313 )
314
315 # Get controller
316 controller = await self.get_controller()
317
318 # Get model
319 model = await self.get_model(controller, model_name)
320 try:
321 if machine_id is not None:
322 self.log.debug(
323 "Searching machine (id={}) in model {}".format(
324 machine_id, model_name
325 )
326 )
327
328 # Get machines from model and get the machine with machine_id if exists
329 machines = await model.get_machines()
330 if machine_id in machines:
331 self.log.debug(
332 "Machine (id={}) found in model {}".format(
333 machine_id, model_name
334 )
335 )
336 machine = machines[machine_id]
337 else:
338 raise JujuMachineNotFound("Machine {} not found".format(machine_id))
339
340 if machine is None:
341 self.log.debug("Creating a new machine in model {}".format(model_name))
342
343 # Create machine
344 machine = await model.add_machine(
345 spec=None, constraints=None, disks=None, series=series
346 )
347 new = True
348
349 # Wait until the machine is ready
350 self.log.debug(
351 "Wait until machine {} is ready in model {}".format(
352 machine.entity_id, model_name
353 )
354 )
355 if wait:
356 await JujuModelWatcher.wait_for(
357 model=model,
358 entity=machine,
359 progress_timeout=progress_timeout,
360 total_timeout=total_timeout,
361 db_dict=db_dict,
362 n2vc=self.n2vc,
363 )
364 finally:
365 await self.disconnect_model(model)
366 await self.disconnect_controller(controller)
367
368 self.log.debug(
369 "Machine {} ready at {} in model {}".format(
370 machine.entity_id, machine.dns_name, model_name
371 )
372 )
373 return machine, new
374
375 async def provision_machine(
376 self,
377 model_name: str,
378 hostname: str,
379 username: str,
380 private_key_path: str,
381 db_dict: dict = None,
382 progress_timeout: float = None,
383 total_timeout: float = None,
384 ) -> str:
385 """
386 Manually provisioning of a machine
387
388 :param: model_name: Model name
389 :param: hostname: IP to access the machine
390 :param: username: Username to login to the machine
391 :param: private_key_path: Local path for the private key
392 :param: db_dict: Dictionary with data of the DB to write the updates
393 :param: progress_timeout: Maximum time between two updates in the model
394 :param: total_timeout: Timeout for the entity to be active
395
396 :return: (Entity): Machine id
397 """
398 self.log.debug(
399 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
400 model_name, hostname, username
401 )
402 )
403
404 # Get controller
405 controller = await self.get_controller()
406
407 # Get model
408 model = await self.get_model(controller, model_name)
409
410 try:
411 # Get provisioner
412 provisioner = AsyncSSHProvisioner(
413 host=hostname,
414 user=username,
415 private_key_path=private_key_path,
416 log=self.log,
417 )
418
419 # Provision machine
420 params = await provisioner.provision_machine()
421
422 params.jobs = ["JobHostUnits"]
423
424 self.log.debug("Adding machine to model")
425 connection = model.connection()
426 client_facade = client.ClientFacade.from_connection(connection)
427
428 results = await client_facade.AddMachines(params=[params])
429 error = results.machines[0].error
430
431 if error:
432 msg = "Error adding machine: {}".format(error.message)
433 self.log.error(msg=msg)
434 raise ValueError(msg)
435
436 machine_id = results.machines[0].machine
437
438 self.log.debug("Installing Juju agent into machine {}".format(machine_id))
439 asyncio.ensure_future(
440 provisioner.install_agent(
441 connection=connection,
442 nonce=params.nonce,
443 machine_id=machine_id,
444 proxy=self.api_proxy,
445 )
446 )
447
448 machine = None
449 for _ in range(10):
450 machine_list = await model.get_machines()
451 if machine_id in machine_list:
452 self.log.debug("Machine {} found in model!".format(machine_id))
453 machine = model.machines.get(machine_id)
454 break
455 await asyncio.sleep(2)
456
457 if machine is None:
458 msg = "Machine {} not found in model".format(machine_id)
459 self.log.error(msg=msg)
460 raise JujuMachineNotFound(msg)
461
462 self.log.debug(
463 "Wait until machine {} is ready in model {}".format(
464 machine.entity_id, model_name
465 )
466 )
467 await JujuModelWatcher.wait_for(
468 model=model,
469 entity=machine,
470 progress_timeout=progress_timeout,
471 total_timeout=total_timeout,
472 db_dict=db_dict,
473 n2vc=self.n2vc,
474 )
475 except Exception as e:
476 raise e
477 finally:
478 await self.disconnect_model(model)
479 await self.disconnect_controller(controller)
480
481 self.log.debug(
482 "Machine provisioned {} in model {}".format(machine_id, model_name)
483 )
484
485 return machine_id
486
487 async def deploy_charm(
488 self,
489 application_name: str,
490 path: str,
491 model_name: str,
492 machine_id: str,
493 db_dict: dict = None,
494 progress_timeout: float = None,
495 total_timeout: float = None,
496 config: dict = None,
497 series: str = None,
498 num_units: int = 1,
499 ):
500 """Deploy charm
501
502 :param: application_name: Application name
503 :param: path: Local path to the charm
504 :param: model_name: Model name
505 :param: machine_id ID of the machine
506 :param: db_dict: Dictionary with data of the DB to write the updates
507 :param: progress_timeout: Maximum time between two updates in the model
508 :param: total_timeout: Timeout for the entity to be active
509 :param: config: Config for the charm
510 :param: series: Series of the charm
511 :param: num_units: Number of units
512
513 :return: (juju.application.Application): Juju application
514 """
515 self.log.debug(
516 "Deploying charm {} to machine {} in model ~{}".format(
517 application_name, machine_id, model_name
518 )
519 )
520 self.log.debug("charm: {}".format(path))
521
522 # Get controller
523 controller = await self.get_controller()
524
525 # Get model
526 model = await self.get_model(controller, model_name)
527
528 try:
529 application = None
530 if application_name not in model.applications:
531
532 if machine_id is not None:
533 if machine_id not in model.machines:
534 msg = "Machine {} not found in model".format(machine_id)
535 self.log.error(msg=msg)
536 raise JujuMachineNotFound(msg)
537 machine = model.machines[machine_id]
538 series = machine.series
539
540 application = await model.deploy(
541 entity_url=path,
542 application_name=application_name,
543 channel="stable",
544 num_units=1,
545 series=series,
546 to=machine_id,
547 config=config,
548 )
549
550 self.log.debug(
551 "Wait until application {} is ready in model {}".format(
552 application_name, model_name
553 )
554 )
555 if num_units > 1:
556 for _ in range(num_units - 1):
557 m, _ = await self.create_machine(model_name, wait=False)
558 await application.add_unit(to=m.entity_id)
559
560 await JujuModelWatcher.wait_for(
561 model=model,
562 entity=application,
563 progress_timeout=progress_timeout,
564 total_timeout=total_timeout,
565 db_dict=db_dict,
566 n2vc=self.n2vc,
567 )
568 self.log.debug(
569 "Application {} is ready in model {}".format(
570 application_name, model_name
571 )
572 )
573 else:
574 raise JujuApplicationExists(
575 "Application {} exists".format(application_name)
576 )
577 finally:
578 await self.disconnect_model(model)
579 await self.disconnect_controller(controller)
580
581 return application
582
583 def _get_application(self, model: Model, application_name: str) -> Application:
584 """Get application
585
586 :param: model: Model object
587 :param: application_name: Application name
588
589 :return: juju.application.Application (or None if it doesn't exist)
590 """
591 if model.applications and application_name in model.applications:
592 return model.applications[application_name]
593
594 async def execute_action(
595 self,
596 application_name: str,
597 model_name: str,
598 action_name: str,
599 db_dict: dict = None,
600 progress_timeout: float = None,
601 total_timeout: float = None,
602 **kwargs
603 ):
604 """Execute action
605
606 :param: application_name: Application name
607 :param: model_name: Model name
608 :param: action_name: Name of the action
609 :param: db_dict: Dictionary with data of the DB to write the updates
610 :param: progress_timeout: Maximum time between two updates in the model
611 :param: total_timeout: Timeout for the entity to be active
612
613 :return: (str, str): (output and status)
614 """
615 self.log.debug(
616 "Executing action {} using params {}".format(action_name, kwargs)
617 )
618 # Get controller
619 controller = await self.get_controller()
620
621 # Get model
622 model = await self.get_model(controller, model_name)
623
624 try:
625 # Get application
626 application = self._get_application(
627 model, application_name=application_name,
628 )
629 if application is None:
630 raise JujuApplicationNotFound("Cannot execute action")
631
632 # Get unit
633 unit = None
634 for u in application.units:
635 if await u.is_leader_from_status():
636 unit = u
637 if unit is None:
638 raise JujuLeaderUnitNotFound(
639 "Cannot execute action: leader unit not found"
640 )
641
642 actions = await application.get_actions()
643
644 if action_name not in actions:
645 raise JujuActionNotFound(
646 "Action {} not in available actions".format(action_name)
647 )
648
649 action = await unit.run_action(action_name, **kwargs)
650
651 self.log.debug(
652 "Wait until action {} is completed in application {} (model={})".format(
653 action_name, application_name, model_name
654 )
655 )
656 await JujuModelWatcher.wait_for(
657 model=model,
658 entity=action,
659 progress_timeout=progress_timeout,
660 total_timeout=total_timeout,
661 db_dict=db_dict,
662 n2vc=self.n2vc,
663 )
664
665 output = await model.get_action_output(action_uuid=action.entity_id)
666 status = await model.get_action_status(uuid_or_prefix=action.entity_id)
667 status = (
668 status[action.entity_id] if action.entity_id in status else "failed"
669 )
670
671 self.log.debug(
672 "Action {} completed with status {} in application {} (model={})".format(
673 action_name, action.status, application_name, model_name
674 )
675 )
676 finally:
677 await self.disconnect_model(model)
678 await self.disconnect_controller(controller)
679
680 return output, status
681
682 async def get_actions(self, application_name: str, model_name: str) -> dict:
683 """Get list of actions
684
685 :param: application_name: Application name
686 :param: model_name: Model name
687
688 :return: Dict with this format
689 {
690 "action_name": "Description of the action",
691 ...
692 }
693 """
694 self.log.debug(
695 "Getting list of actions for application {}".format(application_name)
696 )
697
698 # Get controller
699 controller = await self.get_controller()
700
701 # Get model
702 model = await self.get_model(controller, model_name)
703
704 try:
705 # Get application
706 application = self._get_application(
707 model, application_name=application_name,
708 )
709
710 # Return list of actions
711 return await application.get_actions()
712
713 finally:
714 # Disconnect from model and controller
715 await self.disconnect_model(model)
716 await self.disconnect_controller(controller)
717
718 async def get_metrics(self, model_name: str, application_name: str) -> dict:
719 """Get the metrics collected by the VCA.
720
721 :param model_name The name or unique id of the network service
722 :param application_name The name of the application
723 """
724 if not model_name or not application_name:
725 raise Exception("model_name and application_name must be non-empty strings")
726 metrics = {}
727 controller = await self.get_controller()
728 model = await self.get_model(controller, model_name)
729 try:
730 application = self._get_application(model, application_name)
731 if application is not None:
732 metrics = await application.get_metrics()
733 finally:
734 self.disconnect_model(model)
735 self.disconnect_controller(controller)
736 return metrics
737
738 async def add_relation(
739 self, model_name: str, endpoint_1: str, endpoint_2: str,
740 ):
741 """Add relation
742
743 :param: model_name: Model name
744 :param: endpoint_1 First endpoint name
745 ("app:endpoint" format or directly the saas name)
746 :param: endpoint_2: Second endpoint name (^ same format)
747 """
748
749 self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))
750
751 # Get controller
752 controller = await self.get_controller()
753
754 # Get model
755 model = await self.get_model(controller, model_name)
756
757 # Add relation
758 try:
759 await model.add_relation(endpoint_1, endpoint_2)
760 except JujuAPIError as e:
761 if "not found" in e.message:
762 self.log.warning("Relation not found: {}".format(e.message))
763 return
764 if "already exists" in e.message:
765 self.log.warning("Relation already exists: {}".format(e.message))
766 return
767 # another exception, raise it
768 raise e
769 finally:
770 await self.disconnect_model(model)
771 await self.disconnect_controller(controller)
772
773 async def consume(
774 self, offer_url: str, model_name: str,
775 ):
776 """
777 Adds a remote offer to the model. Relations can be created later using "juju relate".
778
779 :param: offer_url: Offer Url
780 :param: model_name: Model name
781
782 :raises ParseError if there's a problem parsing the offer_url
783 :raises JujuError if remote offer includes and endpoint
784 :raises JujuAPIError if the operation is not successful
785 """
786 controller = await self.get_controller()
787 model = await controller.get_model(model_name)
788
789 try:
790 await model.consume(offer_url)
791 finally:
792 await self.disconnect_model(model)
793 await self.disconnect_controller(controller)
794
795 async def destroy_model(self, model_name: str, total_timeout: float):
796 """
797 Destroy model
798
799 :param: model_name: Model name
800 :param: total_timeout: Timeout
801 """
802
803 controller = await self.get_controller()
804 model = await self.get_model(controller, model_name)
805 try:
806 self.log.debug("Destroying model {}".format(model_name))
807 uuid = model.info.uuid
808
809 # Destroy machines
810 machines = await model.get_machines()
811 for machine_id in machines:
812 try:
813 await self.destroy_machine(
814 model, machine_id=machine_id, total_timeout=total_timeout,
815 )
816 except asyncio.CancelledError:
817 raise
818 except Exception:
819 pass
820
821 # Disconnect model
822 await self.disconnect_model(model)
823
824 # Destroy model
825 if model_name in self.models:
826 self.models.remove(model_name)
827
828 await controller.destroy_model(uuid)
829
830 # Wait until model is destroyed
831 self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
832 last_exception = ""
833
834 if total_timeout is None:
835 total_timeout = 3600
836 end = time.time() + total_timeout
837 while time.time() < end:
838 try:
839 models = await controller.list_models()
840 if model_name not in models:
841 self.log.debug(
842 "The model {} ({}) was destroyed".format(model_name, uuid)
843 )
844 return
845 except asyncio.CancelledError:
846 raise
847 except Exception as e:
848 last_exception = e
849 await asyncio.sleep(5)
850 raise Exception(
851 "Timeout waiting for model {} to be destroyed {}".format(
852 model_name, last_exception
853 )
854 )
855 finally:
856 await self.disconnect_controller(controller)
857
858 async def destroy_application(self, model: Model, application_name: str):
859 """
860 Destroy application
861
862 :param: model: Model object
863 :param: application_name: Application name
864 """
865 self.log.debug(
866 "Destroying application {} in model {}".format(
867 application_name, model.info.name
868 )
869 )
870 application = model.applications.get(application_name)
871 if application:
872 await application.destroy()
873 else:
874 self.log.warning("Application not found: {}".format(application_name))
875
876 async def destroy_machine(
877 self, model: Model, machine_id: str, total_timeout: float = 3600
878 ):
879 """
880 Destroy machine
881
882 :param: model: Model object
883 :param: machine_id: Machine id
884 :param: total_timeout: Timeout in seconds
885 """
886 machines = await model.get_machines()
887 if machine_id in machines:
888 machine = machines[machine_id]
889 await machine.destroy(force=True)
890 # max timeout
891 end = time.time() + total_timeout
892
893 # wait for machine removal
894 machines = await model.get_machines()
895 while machine_id in machines and time.time() < end:
896 self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
897 await asyncio.sleep(0.5)
898 machines = await model.get_machines()
899 self.log.debug("Machine destroyed: {}".format(machine_id))
900 else:
901 self.log.debug("Machine not found: {}".format(machine_id))
902
903 async def configure_application(
904 self, model_name: str, application_name: str, config: dict = None
905 ):
906 """Configure application
907
908 :param: model_name: Model name
909 :param: application_name: Application name
910 :param: config: Config to apply to the charm
911 """
912 self.log.debug("Configuring application {}".format(application_name))
913
914 if config:
915 try:
916 controller = await self.get_controller()
917 model = await self.get_model(controller, model_name)
918 application = self._get_application(
919 model, application_name=application_name,
920 )
921 await application.set_config(config)
922 finally:
923 await self.disconnect_model(model)
924 await self.disconnect_controller(controller)
925
926 def _get_api_endpoints_db(self) -> [str]:
927 """
928 Get API Endpoints from DB
929
930 :return: List of API endpoints
931 """
932 self.log.debug("Getting endpoints from database")
933
934 juju_info = self.db.get_one(
935 DB_DATA.api_endpoints.table,
936 q_filter=DB_DATA.api_endpoints.filter,
937 fail_on_empty=False,
938 )
939 if juju_info and DB_DATA.api_endpoints.key in juju_info:
940 return juju_info[DB_DATA.api_endpoints.key]
941
942 def _update_api_endpoints_db(self, endpoints: [str]):
943 """
944 Update API endpoints in Database
945
946 :param: List of endpoints
947 """
948 self.log.debug("Saving endpoints {} in database".format(endpoints))
949
950 juju_info = self.db.get_one(
951 DB_DATA.api_endpoints.table,
952 q_filter=DB_DATA.api_endpoints.filter,
953 fail_on_empty=False,
954 )
955 # If it doesn't, then create it
956 if not juju_info:
957 try:
958 self.db.create(
959 DB_DATA.api_endpoints.table, DB_DATA.api_endpoints.filter,
960 )
961 except DbException as e:
962 # Racing condition: check if another N2VC worker has created it
963 juju_info = self.db.get_one(
964 DB_DATA.api_endpoints.table,
965 q_filter=DB_DATA.api_endpoints.filter,
966 fail_on_empty=False,
967 )
968 if not juju_info:
969 raise e
970 self.db.set_one(
971 DB_DATA.api_endpoints.table,
972 DB_DATA.api_endpoints.filter,
973 {DB_DATA.api_endpoints.key: endpoints},
974 )
975
976 def handle_exception(self, loop, context):
977 # All unhandled exceptions by libjuju are handled here.
978 pass
979
980 async def health_check(self, interval: float = 300.0):
981 """
982 Health check to make sure controller and controller_model connections are OK
983
984 :param: interval: Time in seconds between checks
985 """
986 while True:
987 try:
988 controller = await self.get_controller()
989 # self.log.debug("VCA is alive")
990 except Exception as e:
991 self.log.error("Health check to VCA failed: {}".format(e))
992 finally:
993 await self.disconnect_controller(controller)
994 await asyncio.sleep(interval)
995
996 async def list_models(self, contains: str = None) -> [str]:
997 """List models with certain names
998
999 :param: contains: String that is contained in model name
1000
1001 :retur: [models] Returns list of model names
1002 """
1003
1004 controller = await self.get_controller()
1005 try:
1006 models = await controller.list_models()
1007 if contains:
1008 models = [model for model in models if contains in model]
1009 return models
1010 finally:
1011 await self.disconnect_controller(controller)
1012
1013 async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
1014 """List models with certain names
1015
1016 :param: model_name: Model name
1017
1018 :return: Returns list of offers
1019 """
1020
1021 controller = await self.get_controller()
1022 try:
1023 return await controller.list_offers(model_name)
1024 finally:
1025 await self.disconnect_controller(controller)
1026
1027 async def add_k8s(
1028 self,
1029 name: str,
1030 configuration: Configuration,
1031 storage_class: str,
1032 credential_name: str = None,
1033 ):
1034 """
1035 Add a Kubernetes cloud to the controller
1036
1037 Similar to the `juju add-k8s` command in the CLI
1038
1039 :param: name: Name for the K8s cloud
1040 :param: configuration: Kubernetes configuration object
1041 :param: storage_class: Storage Class to use in the cloud
1042 :param: credential_name: Storage Class to use in the cloud
1043 """
1044
1045 if not storage_class:
1046 raise Exception("storage_class must be a non-empty string")
1047 if not name:
1048 raise Exception("name must be a non-empty string")
1049 if not configuration:
1050 raise Exception("configuration must be provided")
1051
1052 endpoint = configuration.host
1053 credential = self.get_k8s_cloud_credential(configuration)
1054 ca_certificates = (
1055 [credential.attrs["ClientCertificateData"]]
1056 if "ClientCertificateData" in credential.attrs
1057 else []
1058 )
1059 cloud = client.Cloud(
1060 type_="kubernetes",
1061 auth_types=[credential.auth_type],
1062 endpoint=endpoint,
1063 ca_certificates=ca_certificates,
1064 config={
1065 "operator-storage": storage_class,
1066 "workload-storage": storage_class,
1067 },
1068 )
1069
1070 return await self.add_cloud(
1071 name, cloud, credential, credential_name=credential_name
1072 )
1073
1074 def get_k8s_cloud_credential(
1075 self, configuration: Configuration,
1076 ) -> client.CloudCredential:
1077 attrs = {}
1078 ca_cert = configuration.ssl_ca_cert or configuration.cert_file
1079 key = configuration.key_file
1080 api_key = configuration.api_key
1081 token = None
1082 username = configuration.username
1083 password = configuration.password
1084
1085 if "authorization" in api_key:
1086 authorization = api_key["authorization"]
1087 if "Bearer " in authorization:
1088 bearer_list = authorization.split(" ")
1089 if len(bearer_list) == 2:
1090 [_, token] = bearer_list
1091 else:
1092 raise JujuInvalidK8sConfiguration("unknown format of api_key")
1093 else:
1094 token = authorization
1095 if ca_cert:
1096 attrs["ClientCertificateData"] = open(ca_cert, "r").read()
1097 if key:
1098 attrs["ClientKeyData"] = open(key, "r").read()
1099 if token:
1100 if username or password:
1101 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1102 attrs["Token"] = token
1103
1104 auth_type = None
1105 if key:
1106 auth_type = "oauth2"
1107 if not token:
1108 raise JujuInvalidK8sConfiguration(
1109 "missing token for auth type {}".format(auth_type)
1110 )
1111 elif username:
1112 if not password:
1113 self.log.debug(
1114 "credential for user {} has empty password".format(username)
1115 )
1116 attrs["username"] = username
1117 attrs["password"] = password
1118 if ca_cert:
1119 auth_type = "userpasswithcert"
1120 else:
1121 auth_type = "userpass"
1122 elif ca_cert and token:
1123 auth_type = "certificate"
1124 else:
1125 raise JujuInvalidK8sConfiguration("authentication method not supported")
1126 return client.CloudCredential(auth_type=auth_type, attrs=attrs,)
1127
1128 async def add_cloud(
1129 self,
1130 name: str,
1131 cloud: Cloud,
1132 credential: CloudCredential = None,
1133 credential_name: str = None,
1134 ) -> Cloud:
1135 """
1136 Add cloud to the controller
1137
1138 :param: name: Name of the cloud to be added
1139 :param: cloud: Cloud object
1140 :param: credential: CloudCredentials object for the cloud
1141 :param: credential_name: Credential name.
1142 If not defined, cloud of the name will be used.
1143 """
1144 controller = await self.get_controller()
1145 try:
1146 _ = await controller.add_cloud(name, cloud)
1147 if credential:
1148 await controller.add_credential(
1149 credential_name or name, credential=credential, cloud=name
1150 )
1151 # Need to return the object returned by the controller.add_cloud() function
1152 # I'm returning the original value now until this bug is fixed:
1153 # https://github.com/juju/python-libjuju/issues/443
1154 return cloud
1155 finally:
1156 await self.disconnect_controller(controller)
1157
1158 async def remove_cloud(self, name: str):
1159 """
1160 Remove cloud
1161
1162 :param: name: Name of the cloud to be removed
1163 """
1164 controller = await self.get_controller()
1165 try:
1166 await controller.remove_cloud(name)
1167 finally:
1168 await self.disconnect_controller(controller)