Add new kubectl.py functions, modify some libjuju.py functions, add unit tests
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17 from juju.controller import Controller
18 from juju.client import client
19 import time
20
21 from juju.errors import JujuAPIError
22 from juju.model import Model
23 from juju.machine import Machine
24 from juju.application import Application
25 from juju.client._definitions import (
26 FullStatus,
27 QueryApplicationOffersResults,
28 Cloud,
29 CloudCredential,
30 )
31 from n2vc.juju_watcher import JujuModelWatcher
32 from n2vc.provisioner import AsyncSSHProvisioner
33 from n2vc.n2vc_conn import N2VCConnector
34 from n2vc.exceptions import (
35 JujuMachineNotFound,
36 JujuApplicationNotFound,
37 JujuLeaderUnitNotFound,
38 JujuActionNotFound,
39 JujuModelAlreadyExists,
40 JujuControllerFailedConnecting,
41 JujuApplicationExists,
42 JujuInvalidK8sConfiguration,
43 )
44 from n2vc.utils import DB_DATA
45 from osm_common.dbbase import DbException
46 from kubernetes.client.configuration import Configuration
47
48
49 class Libjuju:
50 def __init__(
51 self,
52 endpoint: str,
53 api_proxy: str,
54 username: str,
55 password: str,
56 cacert: str,
57 loop: asyncio.AbstractEventLoop = None,
58 log: logging.Logger = None,
59 db: dict = None,
60 n2vc: N2VCConnector = None,
61 apt_mirror: str = None,
62 enable_os_upgrade: bool = True,
63 ):
64 """
65 Constructor
66
67 :param: endpoint: Endpoint of the juju controller (host:port)
68 :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
69 :param: username: Juju username
70 :param: password: Juju password
71 :param: cacert: Juju CA Certificate
72 :param: loop: Asyncio loop
73 :param: log: Logger
74 :param: db: DB object
75 :param: n2vc: N2VC object
76 :param: apt_mirror: APT Mirror
77 :param: enable_os_upgrade: Enable OS Upgrade
78 """
79
80 self.log = log or logging.getLogger("Libjuju")
81 self.db = db
82 db_endpoints = self._get_api_endpoints_db()
83 self.endpoints = db_endpoints or [endpoint]
84 if db_endpoints is None:
85 self._update_api_endpoints_db(self.endpoints)
86 self.api_proxy = api_proxy
87 self.username = username
88 self.password = password
89 self.cacert = cacert
90 self.loop = loop or asyncio.get_event_loop()
91 self.n2vc = n2vc
92
93 # Generate config for models
94 self.model_config = {}
95 if apt_mirror:
96 self.model_config["apt-mirror"] = apt_mirror
97 self.model_config["enable-os-refresh-update"] = enable_os_upgrade
98 self.model_config["enable-os-upgrade"] = enable_os_upgrade
99
100 self.loop.set_exception_handler(self.handle_exception)
101 self.creating_model = asyncio.Lock(loop=self.loop)
102
103 self.models = set()
104 self.log.debug("Libjuju initialized!")
105
106 self.health_check_task = self.loop.create_task(self.health_check())
107
108 async def get_controller(self, timeout: float = 5.0) -> Controller:
109 """
110 Get controller
111
112 :param: timeout: Time in seconds to wait for controller to connect
113 """
114 controller = None
115 try:
116 controller = Controller(loop=self.loop)
117 await asyncio.wait_for(
118 controller.connect(
119 endpoint=self.endpoints,
120 username=self.username,
121 password=self.password,
122 cacert=self.cacert,
123 ),
124 timeout=timeout,
125 )
126 endpoints = await controller.api_endpoints
127 if self.endpoints != endpoints:
128 self.endpoints = endpoints
129 self._update_api_endpoints_db(self.endpoints)
130 return controller
131 except asyncio.CancelledError as e:
132 raise e
133 except Exception as e:
134 self.log.error(
135 "Failed connecting to controller: {}...".format(self.endpoints)
136 )
137 if controller:
138 await self.disconnect_controller(controller)
139 raise JujuControllerFailedConnecting(e)
140
141 async def disconnect(self):
142 """Disconnect"""
143 # Cancel health check task
144 self.health_check_task.cancel()
145 self.log.debug("Libjuju disconnected!")
146
147 async def disconnect_model(self, model: Model):
148 """
149 Disconnect model
150
151 :param: model: Model that will be disconnected
152 """
153 await model.disconnect()
154
155 async def disconnect_controller(self, controller: Controller):
156 """
157 Disconnect controller
158
159 :param: controller: Controller that will be disconnected
160 """
161 await controller.disconnect()
162
163 async def add_model(self, model_name: str, cloud_name: str):
164 """
165 Create model
166
167 :param: model_name: Model name
168 :param: cloud_name: Cloud name
169 """
170
171 # Get controller
172 controller = await self.get_controller()
173 model = None
174 try:
175 # Raise exception if model already exists
176 if await self.model_exists(model_name, controller=controller):
177 raise JujuModelAlreadyExists(
178 "Model {} already exists.".format(model_name)
179 )
180
181 # Block until other workers have finished model creation
182 while self.creating_model.locked():
183 await asyncio.sleep(0.1)
184
185 # If the model exists, return it from the controller
186 if model_name in self.models:
187 return
188
189 # Create the model
190 async with self.creating_model:
191 self.log.debug("Creating model {}".format(model_name))
192 model = await controller.add_model(
193 model_name,
194 config=self.model_config,
195 cloud_name=cloud_name,
196 credential_name=cloud_name,
197 )
198 self.models.add(model_name)
199 finally:
200 if model:
201 await self.disconnect_model(model)
202 await self.disconnect_controller(controller)
203
204 async def get_model(
205 self, controller: Controller, model_name: str, id=None
206 ) -> Model:
207 """
208 Get model from controller
209
210 :param: controller: Controller
211 :param: model_name: Model name
212
213 :return: Model: The created Juju model object
214 """
215 return await controller.get_model(model_name)
216
217 async def model_exists(
218 self, model_name: str, controller: Controller = None
219 ) -> bool:
220 """
221 Check if model exists
222
223 :param: controller: Controller
224 :param: model_name: Model name
225
226 :return bool
227 """
228 need_to_disconnect = False
229
230 # Get controller if not passed
231 if not controller:
232 controller = await self.get_controller()
233 need_to_disconnect = True
234
235 # Check if model exists
236 try:
237 return model_name in await controller.list_models()
238 finally:
239 if need_to_disconnect:
240 await self.disconnect_controller(controller)
241
242 async def models_exist(self, model_names: [str]) -> (bool, list):
243 """
244 Check if models exists
245
246 :param: model_names: List of strings with model names
247
248 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
249 """
250 if not model_names:
251 raise Exception(
252 "model_names must be a non-empty array. Given value: {}".format(
253 model_names
254 )
255 )
256 non_existing_models = []
257 models = await self.list_models()
258 existing_models = list(set(models).intersection(model_names))
259 non_existing_models = list(set(model_names) - set(existing_models))
260
261 return (
262 len(non_existing_models) == 0,
263 non_existing_models,
264 )
265
266 async def get_model_status(self, model_name: str) -> FullStatus:
267 """
268 Get model status
269
270 :param: model_name: Model name
271
272 :return: Full status object
273 """
274 controller = await self.get_controller()
275 model = await self.get_model(controller, model_name)
276 try:
277 return await model.get_status()
278 finally:
279 await self.disconnect_model(model)
280 await self.disconnect_controller(controller)
281
282 async def create_machine(
283 self,
284 model_name: str,
285 machine_id: str = None,
286 db_dict: dict = None,
287 progress_timeout: float = None,
288 total_timeout: float = None,
289 series: str = "xenial",
290 wait: bool = True,
291 ) -> (Machine, bool):
292 """
293 Create machine
294
295 :param: model_name: Model name
296 :param: machine_id: Machine id
297 :param: db_dict: Dictionary with data of the DB to write the updates
298 :param: progress_timeout: Maximum time between two updates in the model
299 :param: total_timeout: Timeout for the entity to be active
300 :param: series: Series of the machine (xenial, bionic, focal, ...)
301 :param: wait: Wait until machine is ready
302
303 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
304 if the machine is new or it already existed
305 """
306 new = False
307 machine = None
308
309 self.log.debug(
310 "Creating machine (id={}) in model: {}".format(machine_id, model_name)
311 )
312
313 # Get controller
314 controller = await self.get_controller()
315
316 # Get model
317 model = await self.get_model(controller, model_name)
318 try:
319 if machine_id is not None:
320 self.log.debug(
321 "Searching machine (id={}) in model {}".format(
322 machine_id, model_name
323 )
324 )
325
326 # Get machines from model and get the machine with machine_id if exists
327 machines = await model.get_machines()
328 if machine_id in machines:
329 self.log.debug(
330 "Machine (id={}) found in model {}".format(
331 machine_id, model_name
332 )
333 )
334 machine = machines[machine_id]
335 else:
336 raise JujuMachineNotFound("Machine {} not found".format(machine_id))
337
338 if machine is None:
339 self.log.debug("Creating a new machine in model {}".format(model_name))
340
341 # Create machine
342 machine = await model.add_machine(
343 spec=None, constraints=None, disks=None, series=series
344 )
345 new = True
346
347 # Wait until the machine is ready
348 self.log.debug(
349 "Wait until machine {} is ready in model {}".format(
350 machine.entity_id, model_name
351 )
352 )
353 if wait:
354 await JujuModelWatcher.wait_for(
355 model=model,
356 entity=machine,
357 progress_timeout=progress_timeout,
358 total_timeout=total_timeout,
359 db_dict=db_dict,
360 n2vc=self.n2vc,
361 )
362 finally:
363 await self.disconnect_model(model)
364 await self.disconnect_controller(controller)
365
366 self.log.debug(
367 "Machine {} ready at {} in model {}".format(
368 machine.entity_id, machine.dns_name, model_name
369 )
370 )
371 return machine, new
372
373 async def provision_machine(
374 self,
375 model_name: str,
376 hostname: str,
377 username: str,
378 private_key_path: str,
379 db_dict: dict = None,
380 progress_timeout: float = None,
381 total_timeout: float = None,
382 ) -> str:
383 """
384 Manually provisioning of a machine
385
386 :param: model_name: Model name
387 :param: hostname: IP to access the machine
388 :param: username: Username to login to the machine
389 :param: private_key_path: Local path for the private key
390 :param: db_dict: Dictionary with data of the DB to write the updates
391 :param: progress_timeout: Maximum time between two updates in the model
392 :param: total_timeout: Timeout for the entity to be active
393
394 :return: (Entity): Machine id
395 """
396 self.log.debug(
397 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
398 model_name, hostname, username
399 )
400 )
401
402 # Get controller
403 controller = await self.get_controller()
404
405 # Get model
406 model = await self.get_model(controller, model_name)
407
408 try:
409 # Get provisioner
410 provisioner = AsyncSSHProvisioner(
411 host=hostname,
412 user=username,
413 private_key_path=private_key_path,
414 log=self.log,
415 )
416
417 # Provision machine
418 params = await provisioner.provision_machine()
419
420 params.jobs = ["JobHostUnits"]
421
422 self.log.debug("Adding machine to model")
423 connection = model.connection()
424 client_facade = client.ClientFacade.from_connection(connection)
425
426 results = await client_facade.AddMachines(params=[params])
427 error = results.machines[0].error
428
429 if error:
430 msg = "Error adding machine: {}".format(error.message)
431 self.log.error(msg=msg)
432 raise ValueError(msg)
433
434 machine_id = results.machines[0].machine
435
436 self.log.debug("Installing Juju agent into machine {}".format(machine_id))
437 asyncio.ensure_future(
438 provisioner.install_agent(
439 connection=connection,
440 nonce=params.nonce,
441 machine_id=machine_id,
442 proxy=self.api_proxy,
443 )
444 )
445
446 machine = None
447 for _ in range(10):
448 machine_list = await model.get_machines()
449 if machine_id in machine_list:
450 self.log.debug("Machine {} found in model!".format(machine_id))
451 machine = model.machines.get(machine_id)
452 break
453 await asyncio.sleep(2)
454
455 if machine is None:
456 msg = "Machine {} not found in model".format(machine_id)
457 self.log.error(msg=msg)
458 raise JujuMachineNotFound(msg)
459
460 self.log.debug(
461 "Wait until machine {} is ready in model {}".format(
462 machine.entity_id, model_name
463 )
464 )
465 await JujuModelWatcher.wait_for(
466 model=model,
467 entity=machine,
468 progress_timeout=progress_timeout,
469 total_timeout=total_timeout,
470 db_dict=db_dict,
471 n2vc=self.n2vc,
472 )
473 except Exception as e:
474 raise e
475 finally:
476 await self.disconnect_model(model)
477 await self.disconnect_controller(controller)
478
479 self.log.debug(
480 "Machine provisioned {} in model {}".format(machine_id, model_name)
481 )
482
483 return machine_id
484
485 async def deploy_charm(
486 self,
487 application_name: str,
488 path: str,
489 model_name: str,
490 machine_id: str,
491 db_dict: dict = None,
492 progress_timeout: float = None,
493 total_timeout: float = None,
494 config: dict = None,
495 series: str = None,
496 num_units: int = 1,
497 ):
498 """Deploy charm
499
500 :param: application_name: Application name
501 :param: path: Local path to the charm
502 :param: model_name: Model name
503 :param: machine_id ID of the machine
504 :param: db_dict: Dictionary with data of the DB to write the updates
505 :param: progress_timeout: Maximum time between two updates in the model
506 :param: total_timeout: Timeout for the entity to be active
507 :param: config: Config for the charm
508 :param: series: Series of the charm
509 :param: num_units: Number of units
510
511 :return: (juju.application.Application): Juju application
512 """
513 self.log.debug(
514 "Deploying charm {} to machine {} in model ~{}".format(
515 application_name, machine_id, model_name
516 )
517 )
518 self.log.debug("charm: {}".format(path))
519
520 # Get controller
521 controller = await self.get_controller()
522
523 # Get model
524 model = await self.get_model(controller, model_name)
525
526 try:
527 application = None
528 if application_name not in model.applications:
529
530 if machine_id is not None:
531 if machine_id not in model.machines:
532 msg = "Machine {} not found in model".format(machine_id)
533 self.log.error(msg=msg)
534 raise JujuMachineNotFound(msg)
535 machine = model.machines[machine_id]
536 series = machine.series
537
538 application = await model.deploy(
539 entity_url=path,
540 application_name=application_name,
541 channel="stable",
542 num_units=1,
543 series=series,
544 to=machine_id,
545 config=config,
546 )
547
548 self.log.debug(
549 "Wait until application {} is ready in model {}".format(
550 application_name, model_name
551 )
552 )
553 if num_units > 1:
554 for _ in range(num_units - 1):
555 m, _ = await self.create_machine(model_name, wait=False)
556 await application.add_unit(to=m.entity_id)
557
558 await JujuModelWatcher.wait_for(
559 model=model,
560 entity=application,
561 progress_timeout=progress_timeout,
562 total_timeout=total_timeout,
563 db_dict=db_dict,
564 n2vc=self.n2vc,
565 )
566 self.log.debug(
567 "Application {} is ready in model {}".format(
568 application_name, model_name
569 )
570 )
571 else:
572 raise JujuApplicationExists(
573 "Application {} exists".format(application_name)
574 )
575 finally:
576 await self.disconnect_model(model)
577 await self.disconnect_controller(controller)
578
579 return application
580
581 def _get_application(self, model: Model, application_name: str) -> Application:
582 """Get application
583
584 :param: model: Model object
585 :param: application_name: Application name
586
587 :return: juju.application.Application (or None if it doesn't exist)
588 """
589 if model.applications and application_name in model.applications:
590 return model.applications[application_name]
591
592 async def execute_action(
593 self,
594 application_name: str,
595 model_name: str,
596 action_name: str,
597 db_dict: dict = None,
598 progress_timeout: float = None,
599 total_timeout: float = None,
600 **kwargs
601 ):
602 """Execute action
603
604 :param: application_name: Application name
605 :param: model_name: Model name
606 :param: action_name: Name of the action
607 :param: db_dict: Dictionary with data of the DB to write the updates
608 :param: progress_timeout: Maximum time between two updates in the model
609 :param: total_timeout: Timeout for the entity to be active
610
611 :return: (str, str): (output and status)
612 """
613 self.log.debug(
614 "Executing action {} using params {}".format(action_name, kwargs)
615 )
616 # Get controller
617 controller = await self.get_controller()
618
619 # Get model
620 model = await self.get_model(controller, model_name)
621
622 try:
623 # Get application
624 application = self._get_application(
625 model, application_name=application_name,
626 )
627 if application is None:
628 raise JujuApplicationNotFound("Cannot execute action")
629
630 # Get unit
631 unit = None
632 for u in application.units:
633 if await u.is_leader_from_status():
634 unit = u
635 if unit is None:
636 raise JujuLeaderUnitNotFound(
637 "Cannot execute action: leader unit not found"
638 )
639
640 actions = await application.get_actions()
641
642 if action_name not in actions:
643 raise JujuActionNotFound(
644 "Action {} not in available actions".format(action_name)
645 )
646
647 action = await unit.run_action(action_name, **kwargs)
648
649 self.log.debug(
650 "Wait until action {} is completed in application {} (model={})".format(
651 action_name, application_name, model_name
652 )
653 )
654 await JujuModelWatcher.wait_for(
655 model=model,
656 entity=action,
657 progress_timeout=progress_timeout,
658 total_timeout=total_timeout,
659 db_dict=db_dict,
660 n2vc=self.n2vc,
661 )
662
663 output = await model.get_action_output(action_uuid=action.entity_id)
664 status = await model.get_action_status(uuid_or_prefix=action.entity_id)
665 status = (
666 status[action.entity_id] if action.entity_id in status else "failed"
667 )
668
669 self.log.debug(
670 "Action {} completed with status {} in application {} (model={})".format(
671 action_name, action.status, application_name, model_name
672 )
673 )
674 finally:
675 await self.disconnect_model(model)
676 await self.disconnect_controller(controller)
677
678 return output, status
679
680 async def get_actions(self, application_name: str, model_name: str) -> dict:
681 """Get list of actions
682
683 :param: application_name: Application name
684 :param: model_name: Model name
685
686 :return: Dict with this format
687 {
688 "action_name": "Description of the action",
689 ...
690 }
691 """
692 self.log.debug(
693 "Getting list of actions for application {}".format(application_name)
694 )
695
696 # Get controller
697 controller = await self.get_controller()
698
699 # Get model
700 model = await self.get_model(controller, model_name)
701
702 try:
703 # Get application
704 application = self._get_application(
705 model, application_name=application_name,
706 )
707
708 # Return list of actions
709 return await application.get_actions()
710
711 finally:
712 # Disconnect from model and controller
713 await self.disconnect_model(model)
714 await self.disconnect_controller(controller)
715
716 async def get_metrics(self, model_name: str, application_name: str) -> dict:
717 """Get the metrics collected by the VCA.
718
719 :param model_name The name or unique id of the network service
720 :param application_name The name of the application
721 """
722 if not model_name or not application_name:
723 raise Exception("model_name and application_name must be non-empty strings")
724 metrics = {}
725 controller = await self.get_controller()
726 model = await self.get_model(controller, model_name)
727 try:
728 application = self._get_application(model, application_name)
729 if application is not None:
730 metrics = await application.get_metrics()
731 finally:
732 self.disconnect_model(model)
733 self.disconnect_controller(controller)
734 return metrics
735
736 async def add_relation(
737 self, model_name: str, endpoint_1: str, endpoint_2: str,
738 ):
739 """Add relation
740
741 :param: model_name: Model name
742 :param: endpoint_1 First endpoint name
743 ("app:endpoint" format or directly the saas name)
744 :param: endpoint_2: Second endpoint name (^ same format)
745 """
746
747 self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))
748
749 # Get controller
750 controller = await self.get_controller()
751
752 # Get model
753 model = await self.get_model(controller, model_name)
754
755 # Add relation
756 try:
757 await model.add_relation(endpoint_1, endpoint_2)
758 except JujuAPIError as e:
759 if "not found" in e.message:
760 self.log.warning("Relation not found: {}".format(e.message))
761 return
762 if "already exists" in e.message:
763 self.log.warning("Relation already exists: {}".format(e.message))
764 return
765 # another exception, raise it
766 raise e
767 finally:
768 await self.disconnect_model(model)
769 await self.disconnect_controller(controller)
770
771 async def consume(
772 self, offer_url: str, model_name: str,
773 ):
774 """
775 Adds a remote offer to the model. Relations can be created later using "juju relate".
776
777 :param: offer_url: Offer Url
778 :param: model_name: Model name
779
780 :raises ParseError if there's a problem parsing the offer_url
781 :raises JujuError if remote offer includes and endpoint
782 :raises JujuAPIError if the operation is not successful
783 """
784 controller = await self.get_controller()
785 model = await controller.get_model(model_name)
786
787 try:
788 await model.consume(offer_url)
789 finally:
790 await self.disconnect_model(model)
791 await self.disconnect_controller(controller)
792
793 async def destroy_model(self, model_name: str, total_timeout: float):
794 """
795 Destroy model
796
797 :param: model_name: Model name
798 :param: total_timeout: Timeout
799 """
800
801 controller = await self.get_controller()
802 model = await self.get_model(controller, model_name)
803 try:
804 self.log.debug("Destroying model {}".format(model_name))
805 uuid = model.info.uuid
806
807 # Destroy machines
808 machines = await model.get_machines()
809 for machine_id in machines:
810 try:
811 await self.destroy_machine(
812 model, machine_id=machine_id, total_timeout=total_timeout,
813 )
814 except asyncio.CancelledError:
815 raise
816 except Exception:
817 pass
818
819 # Disconnect model
820 await self.disconnect_model(model)
821
822 # Destroy model
823 if model_name in self.models:
824 self.models.remove(model_name)
825
826 await controller.destroy_model(uuid)
827
828 # Wait until model is destroyed
829 self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
830 last_exception = ""
831
832 if total_timeout is None:
833 total_timeout = 3600
834 end = time.time() + total_timeout
835 while time.time() < end:
836 try:
837 models = await controller.list_models()
838 if model_name not in models:
839 self.log.debug(
840 "The model {} ({}) was destroyed".format(model_name, uuid)
841 )
842 return
843 except asyncio.CancelledError:
844 raise
845 except Exception as e:
846 last_exception = e
847 await asyncio.sleep(5)
848 raise Exception(
849 "Timeout waiting for model {} to be destroyed {}".format(
850 model_name, last_exception
851 )
852 )
853 finally:
854 await self.disconnect_controller(controller)
855
856 async def destroy_application(self, model: Model, application_name: str):
857 """
858 Destroy application
859
860 :param: model: Model object
861 :param: application_name: Application name
862 """
863 self.log.debug(
864 "Destroying application {} in model {}".format(
865 application_name, model.info.name
866 )
867 )
868 application = model.applications.get(application_name)
869 if application:
870 await application.destroy()
871 else:
872 self.log.warning("Application not found: {}".format(application_name))
873
874 async def destroy_machine(
875 self, model: Model, machine_id: str, total_timeout: float = 3600
876 ):
877 """
878 Destroy machine
879
880 :param: model: Model object
881 :param: machine_id: Machine id
882 :param: total_timeout: Timeout in seconds
883 """
884 machines = await model.get_machines()
885 if machine_id in machines:
886 machine = machines[machine_id]
887 await machine.destroy(force=True)
888 # max timeout
889 end = time.time() + total_timeout
890
891 # wait for machine removal
892 machines = await model.get_machines()
893 while machine_id in machines and time.time() < end:
894 self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
895 await asyncio.sleep(0.5)
896 machines = await model.get_machines()
897 self.log.debug("Machine destroyed: {}".format(machine_id))
898 else:
899 self.log.debug("Machine not found: {}".format(machine_id))
900
901 async def configure_application(
902 self, model_name: str, application_name: str, config: dict = None
903 ):
904 """Configure application
905
906 :param: model_name: Model name
907 :param: application_name: Application name
908 :param: config: Config to apply to the charm
909 """
910 self.log.debug("Configuring application {}".format(application_name))
911
912 if config:
913 try:
914 controller = await self.get_controller()
915 model = await self.get_model(controller, model_name)
916 application = self._get_application(
917 model, application_name=application_name,
918 )
919 await application.set_config(config)
920 finally:
921 await self.disconnect_model(model)
922 await self.disconnect_controller(controller)
923
924 def _get_api_endpoints_db(self) -> [str]:
925 """
926 Get API Endpoints from DB
927
928 :return: List of API endpoints
929 """
930 self.log.debug("Getting endpoints from database")
931
932 juju_info = self.db.get_one(
933 DB_DATA.api_endpoints.table,
934 q_filter=DB_DATA.api_endpoints.filter,
935 fail_on_empty=False,
936 )
937 if juju_info and DB_DATA.api_endpoints.key in juju_info:
938 return juju_info[DB_DATA.api_endpoints.key]
939
940 def _update_api_endpoints_db(self, endpoints: [str]):
941 """
942 Update API endpoints in Database
943
944 :param: List of endpoints
945 """
946 self.log.debug("Saving endpoints {} in database".format(endpoints))
947
948 juju_info = self.db.get_one(
949 DB_DATA.api_endpoints.table,
950 q_filter=DB_DATA.api_endpoints.filter,
951 fail_on_empty=False,
952 )
953 # If it doesn't, then create it
954 if not juju_info:
955 try:
956 self.db.create(
957 DB_DATA.api_endpoints.table, DB_DATA.api_endpoints.filter,
958 )
959 except DbException as e:
960 # Racing condition: check if another N2VC worker has created it
961 juju_info = self.db.get_one(
962 DB_DATA.api_endpoints.table,
963 q_filter=DB_DATA.api_endpoints.filter,
964 fail_on_empty=False,
965 )
966 if not juju_info:
967 raise e
968 self.db.set_one(
969 DB_DATA.api_endpoints.table,
970 DB_DATA.api_endpoints.filter,
971 {DB_DATA.api_endpoints.key: endpoints},
972 )
973
974 def handle_exception(self, loop, context):
975 # All unhandled exceptions by libjuju are handled here.
976 pass
977
978 async def health_check(self, interval: float = 300.0):
979 """
980 Health check to make sure controller and controller_model connections are OK
981
982 :param: interval: Time in seconds between checks
983 """
984 while True:
985 try:
986 controller = await self.get_controller()
987 # self.log.debug("VCA is alive")
988 except Exception as e:
989 self.log.error("Health check to VCA failed: {}".format(e))
990 finally:
991 await self.disconnect_controller(controller)
992 await asyncio.sleep(interval)
993
994 async def list_models(self, contains: str = None) -> [str]:
995 """List models with certain names
996
997 :param: contains: String that is contained in model name
998
999 :retur: [models] Returns list of model names
1000 """
1001
1002 controller = await self.get_controller()
1003 try:
1004 models = await controller.list_models()
1005 if contains:
1006 models = [model for model in models if contains in model]
1007 return models
1008 finally:
1009 await self.disconnect_controller(controller)
1010
1011 async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
1012 """List models with certain names
1013
1014 :param: model_name: Model name
1015
1016 :return: Returns list of offers
1017 """
1018
1019 controller = await self.get_controller()
1020 try:
1021 return await controller.list_offers(model_name)
1022 finally:
1023 await self.disconnect_controller(controller)
1024
1025 async def add_k8s(
1026 self, name: str, configuration: Configuration, storage_class: str
1027 ):
1028 """
1029 Add a Kubernetes cloud to the controller
1030
1031 Similar to the `juju add-k8s` command in the CLI
1032
1033 :param: name: Name for the K8s cloud
1034 :param: configuration: Kubernetes configuration object
1035 :param: storage_class: Storage Class to use in the cloud
1036 """
1037
1038 if not storage_class:
1039 raise Exception("storage_class must be a non-empty string")
1040 if not name:
1041 raise Exception("name must be a non-empty string")
1042 if not configuration:
1043 raise Exception("configuration must be provided")
1044
1045 endpoint = configuration.host
1046 credential = self.get_k8s_cloud_credential(configuration)
1047 ca_certificates = (
1048 [credential.attrs["ClientCertificateData"]]
1049 if "ClientCertificateData" in credential.attrs
1050 else []
1051 )
1052 cloud = client.Cloud(
1053 type_="kubernetes",
1054 auth_types=[credential.auth_type],
1055 endpoint=endpoint,
1056 ca_certificates=ca_certificates,
1057 config={
1058 "operator-storage": storage_class,
1059 "workload-storage": storage_class,
1060 },
1061 )
1062
1063 return await self.add_cloud(name, cloud, credential)
1064
1065 def get_k8s_cloud_credential(
1066 self, configuration: Configuration,
1067 ) -> client.CloudCredential:
1068 attrs = {}
1069 ca_cert = configuration.ssl_ca_cert or configuration.cert_file
1070 key = configuration.key_file
1071 api_key = configuration.api_key
1072 token = None
1073 username = configuration.username
1074 password = configuration.password
1075
1076 if "authorization" in api_key:
1077 authorization = api_key["authorization"]
1078 if "Bearer " in authorization:
1079 bearer_list = authorization.split(" ")
1080 if len(bearer_list) == 2:
1081 [_, token] = bearer_list
1082 else:
1083 raise JujuInvalidK8sConfiguration("unknown format of api_key")
1084 else:
1085 token = authorization
1086 if ca_cert:
1087 attrs["ClientCertificateData"] = open(ca_cert, "r").read()
1088 if key:
1089 attrs["ClientKeyData"] = open(key, "r").read()
1090 if token:
1091 if username or password:
1092 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1093 attrs["Token"] = token
1094
1095 auth_type = None
1096 if key:
1097 auth_type = "oauth2"
1098 if not token:
1099 raise JujuInvalidK8sConfiguration(
1100 "missing token for auth type {}".format(auth_type)
1101 )
1102 elif username:
1103 if not password:
1104 self.log.debug(
1105 "credential for user {} has empty password".format(username)
1106 )
1107 attrs["username"] = username
1108 attrs["password"] = password
1109 if ca_cert:
1110 auth_type = "userpasswithcert"
1111 else:
1112 auth_type = "userpass"
1113 elif ca_cert and token:
1114 auth_type = "certificate"
1115 else:
1116 raise JujuInvalidK8sConfiguration("authentication method not supported")
1117 return client.CloudCredential(auth_type=auth_type, attrs=attrs,)
1118
1119 async def add_cloud(
1120 self, name: str, cloud: Cloud, credential: CloudCredential = None
1121 ) -> Cloud:
1122 """
1123 Add cloud to the controller
1124
1125 :param: name: Name of the cloud to be added
1126 :param: cloud: Cloud object
1127 :param: credential: CloudCredentials object for the cloud
1128 """
1129 controller = await self.get_controller()
1130 try:
1131 _ = await controller.add_cloud(name, cloud)
1132 if credential:
1133 await controller.add_credential(name, credential=credential, cloud=name)
1134 # Need to return the object returned by the controller.add_cloud() function
1135 # I'm returning the original value now until this bug is fixed:
1136 # https://github.com/juju/python-libjuju/issues/443
1137 return cloud
1138 finally:
1139 await self.disconnect_controller(controller)
1140
1141 async def remove_cloud(self, name: str):
1142 """
1143 Remove cloud
1144
1145 :param: name: Name of the cloud to be removed
1146 """
1147 controller = await self.get_controller()
1148 try:
1149 await controller.remove_cloud(name)
1150 finally:
1151 await self.disconnect_controller(controller)