5a3ddbbb237fd1ea1436a775358d13ec204305e1
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17 from juju.controller import Controller
18 from juju.client import client
19 import time
20
21 from juju.errors import JujuAPIError
22 from juju.model import Model
23 from juju.machine import Machine
24 from juju.application import Application
25 from juju.unit import Unit
26 from juju.client._definitions import (
27 FullStatus,
28 QueryApplicationOffersResults,
29 Cloud,
30 CloudCredential,
31 )
32 from n2vc.juju_watcher import JujuModelWatcher
33 from n2vc.provisioner import AsyncSSHProvisioner
34 from n2vc.n2vc_conn import N2VCConnector
35 from n2vc.exceptions import (
36 JujuMachineNotFound,
37 JujuApplicationNotFound,
38 JujuLeaderUnitNotFound,
39 JujuActionNotFound,
40 JujuModelAlreadyExists,
41 JujuControllerFailedConnecting,
42 JujuApplicationExists,
43 JujuInvalidK8sConfiguration,
44 )
45 from n2vc.utils import DB_DATA
46 from osm_common.dbbase import DbException
47 from kubernetes.client.configuration import Configuration
48
49
50 class Libjuju:
51 def __init__(
52 self,
53 endpoint: str,
54 api_proxy: str,
55 username: str,
56 password: str,
57 cacert: str,
58 loop: asyncio.AbstractEventLoop = None,
59 log: logging.Logger = None,
60 db: dict = None,
61 n2vc: N2VCConnector = None,
62 apt_mirror: str = None,
63 enable_os_upgrade: bool = True,
64 ):
65 """
66 Constructor
67
68 :param: endpoint: Endpoint of the juju controller (host:port)
69 :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
70 :param: username: Juju username
71 :param: password: Juju password
72 :param: cacert: Juju CA Certificate
73 :param: loop: Asyncio loop
74 :param: log: Logger
75 :param: db: DB object
76 :param: n2vc: N2VC object
77 :param: apt_mirror: APT Mirror
78 :param: enable_os_upgrade: Enable OS Upgrade
79 """
80
81 self.log = log or logging.getLogger("Libjuju")
82 self.db = db
83 db_endpoints = self._get_api_endpoints_db()
84 self.endpoints = None
85 if (db_endpoints and endpoint not in db_endpoints) or not db_endpoints:
86 self.endpoints = [endpoint]
87 self._update_api_endpoints_db(self.endpoints)
88 else:
89 self.endpoints = db_endpoints
90 self.api_proxy = api_proxy
91 self.username = username
92 self.password = password
93 self.cacert = cacert
94 self.loop = loop or asyncio.get_event_loop()
95 self.n2vc = n2vc
96
97 # Generate config for models
98 self.model_config = {}
99 if apt_mirror:
100 self.model_config["apt-mirror"] = apt_mirror
101 self.model_config["enable-os-refresh-update"] = enable_os_upgrade
102 self.model_config["enable-os-upgrade"] = enable_os_upgrade
103
104 self.loop.set_exception_handler(self.handle_exception)
105 self.creating_model = asyncio.Lock(loop=self.loop)
106
107 self.models = set()
108 self.log.debug("Libjuju initialized!")
109
110 self.health_check_task = self._create_health_check_task()
111
112 def _create_health_check_task(self):
113 return self.loop.create_task(self.health_check())
114
115 async def get_controller(self, timeout: float = 5.0) -> Controller:
116 """
117 Get controller
118
119 :param: timeout: Time in seconds to wait for controller to connect
120 """
121 controller = None
122 try:
123 controller = Controller(loop=self.loop)
124 await asyncio.wait_for(
125 controller.connect(
126 endpoint=self.endpoints,
127 username=self.username,
128 password=self.password,
129 cacert=self.cacert,
130 ),
131 timeout=timeout,
132 )
133 endpoints = await controller.api_endpoints
134 if self.endpoints != endpoints:
135 self.endpoints = endpoints
136 self._update_api_endpoints_db(self.endpoints)
137 return controller
138 except asyncio.CancelledError as e:
139 raise e
140 except Exception as e:
141 self.log.error(
142 "Failed connecting to controller: {}...".format(self.endpoints)
143 )
144 if controller:
145 await self.disconnect_controller(controller)
146 raise JujuControllerFailedConnecting(e)
147
148 async def disconnect(self):
149 """Disconnect"""
150 # Cancel health check task
151 self.health_check_task.cancel()
152 self.log.debug("Libjuju disconnected!")
153
154 async def disconnect_model(self, model: Model):
155 """
156 Disconnect model
157
158 :param: model: Model that will be disconnected
159 """
160 await model.disconnect()
161
162 async def disconnect_controller(self, controller: Controller):
163 """
164 Disconnect controller
165
166 :param: controller: Controller that will be disconnected
167 """
168 if controller:
169 await controller.disconnect()
170
171 async def add_model(self, model_name: str, cloud_name: str, credential_name=None):
172 """
173 Create model
174
175 :param: model_name: Model name
176 :param: cloud_name: Cloud name
177 :param: credential_name: Credential name to use for adding the model
178 If not specified, same name as the cloud will be used.
179 """
180
181 # Get controller
182 controller = await self.get_controller()
183 model = None
184 try:
185 # Raise exception if model already exists
186 if await self.model_exists(model_name, controller=controller):
187 raise JujuModelAlreadyExists(
188 "Model {} already exists.".format(model_name)
189 )
190
191 # Block until other workers have finished model creation
192 while self.creating_model.locked():
193 await asyncio.sleep(0.1)
194
195 # If the model exists, return it from the controller
196 if model_name in self.models:
197 return
198
199 # Create the model
200 async with self.creating_model:
201 self.log.debug("Creating model {}".format(model_name))
202 model = await controller.add_model(
203 model_name,
204 config=self.model_config,
205 cloud_name=cloud_name,
206 credential_name=credential_name or cloud_name,
207 )
208 self.models.add(model_name)
209 finally:
210 if model:
211 await self.disconnect_model(model)
212 await self.disconnect_controller(controller)
213
214 async def get_model(
215 self, controller: Controller, model_name: str, id=None
216 ) -> Model:
217 """
218 Get model from controller
219
220 :param: controller: Controller
221 :param: model_name: Model name
222
223 :return: Model: The created Juju model object
224 """
225 return await controller.get_model(model_name)
226
227 async def model_exists(
228 self, model_name: str, controller: Controller = None
229 ) -> bool:
230 """
231 Check if model exists
232
233 :param: controller: Controller
234 :param: model_name: Model name
235
236 :return bool
237 """
238 need_to_disconnect = False
239
240 # Get controller if not passed
241 if not controller:
242 controller = await self.get_controller()
243 need_to_disconnect = True
244
245 # Check if model exists
246 try:
247 return model_name in await controller.list_models()
248 finally:
249 if need_to_disconnect:
250 await self.disconnect_controller(controller)
251
252 async def models_exist(self, model_names: [str]) -> (bool, list):
253 """
254 Check if models exists
255
256 :param: model_names: List of strings with model names
257
258 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
259 """
260 if not model_names:
261 raise Exception(
262 "model_names must be a non-empty array. Given value: {}".format(
263 model_names
264 )
265 )
266 non_existing_models = []
267 models = await self.list_models()
268 existing_models = list(set(models).intersection(model_names))
269 non_existing_models = list(set(model_names) - set(existing_models))
270
271 return (
272 len(non_existing_models) == 0,
273 non_existing_models,
274 )
275
276 async def get_model_status(self, model_name: str) -> FullStatus:
277 """
278 Get model status
279
280 :param: model_name: Model name
281
282 :return: Full status object
283 """
284 controller = await self.get_controller()
285 model = await self.get_model(controller, model_name)
286 try:
287 return await model.get_status()
288 finally:
289 await self.disconnect_model(model)
290 await self.disconnect_controller(controller)
291
292 async def create_machine(
293 self,
294 model_name: str,
295 machine_id: str = None,
296 db_dict: dict = None,
297 progress_timeout: float = None,
298 total_timeout: float = None,
299 series: str = "xenial",
300 wait: bool = True,
301 ) -> (Machine, bool):
302 """
303 Create machine
304
305 :param: model_name: Model name
306 :param: machine_id: Machine id
307 :param: db_dict: Dictionary with data of the DB to write the updates
308 :param: progress_timeout: Maximum time between two updates in the model
309 :param: total_timeout: Timeout for the entity to be active
310 :param: series: Series of the machine (xenial, bionic, focal, ...)
311 :param: wait: Wait until machine is ready
312
313 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
314 if the machine is new or it already existed
315 """
316 new = False
317 machine = None
318
319 self.log.debug(
320 "Creating machine (id={}) in model: {}".format(machine_id, model_name)
321 )
322
323 # Get controller
324 controller = await self.get_controller()
325
326 # Get model
327 model = await self.get_model(controller, model_name)
328 try:
329 if machine_id is not None:
330 self.log.debug(
331 "Searching machine (id={}) in model {}".format(
332 machine_id, model_name
333 )
334 )
335
336 # Get machines from model and get the machine with machine_id if exists
337 machines = await model.get_machines()
338 if machine_id in machines:
339 self.log.debug(
340 "Machine (id={}) found in model {}".format(
341 machine_id, model_name
342 )
343 )
344 machine = machines[machine_id]
345 else:
346 raise JujuMachineNotFound("Machine {} not found".format(machine_id))
347
348 if machine is None:
349 self.log.debug("Creating a new machine in model {}".format(model_name))
350
351 # Create machine
352 machine = await model.add_machine(
353 spec=None, constraints=None, disks=None, series=series
354 )
355 new = True
356
357 # Wait until the machine is ready
358 self.log.debug(
359 "Wait until machine {} is ready in model {}".format(
360 machine.entity_id, model_name
361 )
362 )
363 if wait:
364 await JujuModelWatcher.wait_for(
365 model=model,
366 entity=machine,
367 progress_timeout=progress_timeout,
368 total_timeout=total_timeout,
369 db_dict=db_dict,
370 n2vc=self.n2vc,
371 )
372 finally:
373 await self.disconnect_model(model)
374 await self.disconnect_controller(controller)
375
376 self.log.debug(
377 "Machine {} ready at {} in model {}".format(
378 machine.entity_id, machine.dns_name, model_name
379 )
380 )
381 return machine, new
382
383 async def provision_machine(
384 self,
385 model_name: str,
386 hostname: str,
387 username: str,
388 private_key_path: str,
389 db_dict: dict = None,
390 progress_timeout: float = None,
391 total_timeout: float = None,
392 ) -> str:
393 """
394 Manually provisioning of a machine
395
396 :param: model_name: Model name
397 :param: hostname: IP to access the machine
398 :param: username: Username to login to the machine
399 :param: private_key_path: Local path for the private key
400 :param: db_dict: Dictionary with data of the DB to write the updates
401 :param: progress_timeout: Maximum time between two updates in the model
402 :param: total_timeout: Timeout for the entity to be active
403
404 :return: (Entity): Machine id
405 """
406 self.log.debug(
407 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
408 model_name, hostname, username
409 )
410 )
411
412 # Get controller
413 controller = await self.get_controller()
414
415 # Get model
416 model = await self.get_model(controller, model_name)
417
418 try:
419 # Get provisioner
420 provisioner = AsyncSSHProvisioner(
421 host=hostname,
422 user=username,
423 private_key_path=private_key_path,
424 log=self.log,
425 )
426
427 # Provision machine
428 params = await provisioner.provision_machine()
429
430 params.jobs = ["JobHostUnits"]
431
432 self.log.debug("Adding machine to model")
433 connection = model.connection()
434 client_facade = client.ClientFacade.from_connection(connection)
435
436 results = await client_facade.AddMachines(params=[params])
437 error = results.machines[0].error
438
439 if error:
440 msg = "Error adding machine: {}".format(error.message)
441 self.log.error(msg=msg)
442 raise ValueError(msg)
443
444 machine_id = results.machines[0].machine
445
446 self.log.debug("Installing Juju agent into machine {}".format(machine_id))
447 asyncio.ensure_future(
448 provisioner.install_agent(
449 connection=connection,
450 nonce=params.nonce,
451 machine_id=machine_id,
452 proxy=self.api_proxy,
453 )
454 )
455
456 machine = None
457 for _ in range(10):
458 machine_list = await model.get_machines()
459 if machine_id in machine_list:
460 self.log.debug("Machine {} found in model!".format(machine_id))
461 machine = model.machines.get(machine_id)
462 break
463 await asyncio.sleep(2)
464
465 if machine is None:
466 msg = "Machine {} not found in model".format(machine_id)
467 self.log.error(msg=msg)
468 raise JujuMachineNotFound(msg)
469
470 self.log.debug(
471 "Wait until machine {} is ready in model {}".format(
472 machine.entity_id, model_name
473 )
474 )
475 await JujuModelWatcher.wait_for(
476 model=model,
477 entity=machine,
478 progress_timeout=progress_timeout,
479 total_timeout=total_timeout,
480 db_dict=db_dict,
481 n2vc=self.n2vc,
482 )
483 except Exception as e:
484 raise e
485 finally:
486 await self.disconnect_model(model)
487 await self.disconnect_controller(controller)
488
489 self.log.debug(
490 "Machine provisioned {} in model {}".format(machine_id, model_name)
491 )
492
493 return machine_id
494
495 async def deploy(
496 self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
497 ):
498 """
499 Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
500
501 :param: uri: Path or Charm Store uri in which the charm or bundle can be found
502 :param: model_name: Model name
503 :param: wait: Indicates whether to wait or not until all applications are active
504 :param: timeout: Time in seconds to wait until all applications are active
505 """
506 controller = await self.get_controller()
507 model = await self.get_model(controller, model_name)
508 try:
509 await model.deploy(uri)
510 if wait:
511 await JujuModelWatcher.wait_for_model(model, timeout=timeout)
512 self.log.debug("All units active in model {}".format(model_name))
513 finally:
514 await self.disconnect_model(model)
515 await self.disconnect_controller(controller)
516
517 async def deploy_charm(
518 self,
519 application_name: str,
520 path: str,
521 model_name: str,
522 machine_id: str,
523 db_dict: dict = None,
524 progress_timeout: float = None,
525 total_timeout: float = None,
526 config: dict = None,
527 series: str = None,
528 num_units: int = 1,
529 ):
530 """Deploy charm
531
532 :param: application_name: Application name
533 :param: path: Local path to the charm
534 :param: model_name: Model name
535 :param: machine_id ID of the machine
536 :param: db_dict: Dictionary with data of the DB to write the updates
537 :param: progress_timeout: Maximum time between two updates in the model
538 :param: total_timeout: Timeout for the entity to be active
539 :param: config: Config for the charm
540 :param: series: Series of the charm
541 :param: num_units: Number of units
542
543 :return: (juju.application.Application): Juju application
544 """
545 self.log.debug(
546 "Deploying charm {} to machine {} in model ~{}".format(
547 application_name, machine_id, model_name
548 )
549 )
550 self.log.debug("charm: {}".format(path))
551
552 # Get controller
553 controller = await self.get_controller()
554
555 # Get model
556 model = await self.get_model(controller, model_name)
557
558 try:
559 application = None
560 if application_name not in model.applications:
561
562 if machine_id is not None:
563 if machine_id not in model.machines:
564 msg = "Machine {} not found in model".format(machine_id)
565 self.log.error(msg=msg)
566 raise JujuMachineNotFound(msg)
567 machine = model.machines[machine_id]
568 series = machine.series
569
570 application = await model.deploy(
571 entity_url=path,
572 application_name=application_name,
573 channel="stable",
574 num_units=1,
575 series=series,
576 to=machine_id,
577 config=config,
578 )
579
580 self.log.debug(
581 "Wait until application {} is ready in model {}".format(
582 application_name, model_name
583 )
584 )
585 if num_units > 1:
586 for _ in range(num_units - 1):
587 m, _ = await self.create_machine(model_name, wait=False)
588 await application.add_unit(to=m.entity_id)
589
590 await JujuModelWatcher.wait_for(
591 model=model,
592 entity=application,
593 progress_timeout=progress_timeout,
594 total_timeout=total_timeout,
595 db_dict=db_dict,
596 n2vc=self.n2vc,
597 )
598 self.log.debug(
599 "Application {} is ready in model {}".format(
600 application_name, model_name
601 )
602 )
603 else:
604 raise JujuApplicationExists(
605 "Application {} exists".format(application_name)
606 )
607 finally:
608 await self.disconnect_model(model)
609 await self.disconnect_controller(controller)
610
611 return application
612
613 def _get_application(self, model: Model, application_name: str) -> Application:
614 """Get application
615
616 :param: model: Model object
617 :param: application_name: Application name
618
619 :return: juju.application.Application (or None if it doesn't exist)
620 """
621 if model.applications and application_name in model.applications:
622 return model.applications[application_name]
623
624 async def execute_action(
625 self,
626 application_name: str,
627 model_name: str,
628 action_name: str,
629 db_dict: dict = None,
630 progress_timeout: float = None,
631 total_timeout: float = None,
632 **kwargs
633 ):
634 """Execute action
635
636 :param: application_name: Application name
637 :param: model_name: Model name
638 :param: action_name: Name of the action
639 :param: db_dict: Dictionary with data of the DB to write the updates
640 :param: progress_timeout: Maximum time between two updates in the model
641 :param: total_timeout: Timeout for the entity to be active
642
643 :return: (str, str): (output and status)
644 """
645 self.log.debug(
646 "Executing action {} using params {}".format(action_name, kwargs)
647 )
648 # Get controller
649 controller = await self.get_controller()
650
651 # Get model
652 model = await self.get_model(controller, model_name)
653
654 try:
655 # Get application
656 application = self._get_application(
657 model, application_name=application_name,
658 )
659 if application is None:
660 raise JujuApplicationNotFound("Cannot execute action")
661
662 # Get leader unit
663 # Racing condition:
664 # Ocassionally, self._get_leader_unit() will return None
665 # because the leader elected hook has not been triggered yet.
666 # Therefore, we are doing some retries. If it happens again,
667 # re-open bug 1236
668 attempts = 3
669 time_between_retries = 10
670 unit = None
671 for _ in range(attempts):
672 unit = await self._get_leader_unit(application)
673 if unit is None:
674 await asyncio.sleep(time_between_retries)
675 else:
676 break
677 if unit is None:
678 raise JujuLeaderUnitNotFound(
679 "Cannot execute action: leader unit not found"
680 )
681
682 actions = await application.get_actions()
683
684 if action_name not in actions:
685 raise JujuActionNotFound(
686 "Action {} not in available actions".format(action_name)
687 )
688
689 action = await unit.run_action(action_name, **kwargs)
690
691 self.log.debug(
692 "Wait until action {} is completed in application {} (model={})".format(
693 action_name, application_name, model_name
694 )
695 )
696 await JujuModelWatcher.wait_for(
697 model=model,
698 entity=action,
699 progress_timeout=progress_timeout,
700 total_timeout=total_timeout,
701 db_dict=db_dict,
702 n2vc=self.n2vc,
703 )
704
705 output = await model.get_action_output(action_uuid=action.entity_id)
706 status = await model.get_action_status(uuid_or_prefix=action.entity_id)
707 status = (
708 status[action.entity_id] if action.entity_id in status else "failed"
709 )
710
711 self.log.debug(
712 "Action {} completed with status {} in application {} (model={})".format(
713 action_name, action.status, application_name, model_name
714 )
715 )
716 finally:
717 await self.disconnect_model(model)
718 await self.disconnect_controller(controller)
719
720 return output, status
721
722 async def get_actions(self, application_name: str, model_name: str) -> dict:
723 """Get list of actions
724
725 :param: application_name: Application name
726 :param: model_name: Model name
727
728 :return: Dict with this format
729 {
730 "action_name": "Description of the action",
731 ...
732 }
733 """
734 self.log.debug(
735 "Getting list of actions for application {}".format(application_name)
736 )
737
738 # Get controller
739 controller = await self.get_controller()
740
741 # Get model
742 model = await self.get_model(controller, model_name)
743
744 try:
745 # Get application
746 application = self._get_application(
747 model, application_name=application_name,
748 )
749
750 # Return list of actions
751 return await application.get_actions()
752
753 finally:
754 # Disconnect from model and controller
755 await self.disconnect_model(model)
756 await self.disconnect_controller(controller)
757
758 async def get_metrics(self, model_name: str, application_name: str) -> dict:
759 """Get the metrics collected by the VCA.
760
761 :param model_name The name or unique id of the network service
762 :param application_name The name of the application
763 """
764 if not model_name or not application_name:
765 raise Exception("model_name and application_name must be non-empty strings")
766 metrics = {}
767 controller = await self.get_controller()
768 model = await self.get_model(controller, model_name)
769 try:
770 application = self._get_application(model, application_name)
771 if application is not None:
772 metrics = await application.get_metrics()
773 finally:
774 self.disconnect_model(model)
775 self.disconnect_controller(controller)
776 return metrics
777
778 async def add_relation(
779 self, model_name: str, endpoint_1: str, endpoint_2: str,
780 ):
781 """Add relation
782
783 :param: model_name: Model name
784 :param: endpoint_1 First endpoint name
785 ("app:endpoint" format or directly the saas name)
786 :param: endpoint_2: Second endpoint name (^ same format)
787 """
788
789 self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))
790
791 # Get controller
792 controller = await self.get_controller()
793
794 # Get model
795 model = await self.get_model(controller, model_name)
796
797 # Add relation
798 try:
799 await model.add_relation(endpoint_1, endpoint_2)
800 except JujuAPIError as e:
801 if "not found" in e.message:
802 self.log.warning("Relation not found: {}".format(e.message))
803 return
804 if "already exists" in e.message:
805 self.log.warning("Relation already exists: {}".format(e.message))
806 return
807 # another exception, raise it
808 raise e
809 finally:
810 await self.disconnect_model(model)
811 await self.disconnect_controller(controller)
812
813 async def consume(
814 self, offer_url: str, model_name: str,
815 ):
816 """
817 Adds a remote offer to the model. Relations can be created later using "juju relate".
818
819 :param: offer_url: Offer Url
820 :param: model_name: Model name
821
822 :raises ParseError if there's a problem parsing the offer_url
823 :raises JujuError if remote offer includes and endpoint
824 :raises JujuAPIError if the operation is not successful
825 """
826 controller = await self.get_controller()
827 model = await controller.get_model(model_name)
828
829 try:
830 await model.consume(offer_url)
831 finally:
832 await self.disconnect_model(model)
833 await self.disconnect_controller(controller)
834
835 async def destroy_model(self, model_name: str, total_timeout: float):
836 """
837 Destroy model
838
839 :param: model_name: Model name
840 :param: total_timeout: Timeout
841 """
842
843 controller = await self.get_controller()
844 model = await self.get_model(controller, model_name)
845 try:
846 self.log.debug("Destroying model {}".format(model_name))
847 uuid = model.info.uuid
848
849 # Destroy machines that are manually provisioned
850 # and still are in pending state
851 await self._destroy_pending_machines(model, only_manual=True)
852
853 # Disconnect model
854 await self.disconnect_model(model)
855
856 # Destroy model
857 if model_name in self.models:
858 self.models.remove(model_name)
859
860 await controller.destroy_model(uuid, force=True, max_wait=0)
861
862 # Wait until model is destroyed
863 self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
864
865 if total_timeout is None:
866 total_timeout = 3600
867 end = time.time() + total_timeout
868 while time.time() < end:
869 models = await controller.list_models()
870 if model_name not in models:
871 self.log.debug(
872 "The model {} ({}) was destroyed".format(model_name, uuid)
873 )
874 return
875 await asyncio.sleep(5)
876 raise Exception(
877 "Timeout waiting for model {} to be destroyed".format(model_name)
878 )
879 finally:
880 await self.disconnect_controller(controller)
881
882 async def destroy_application(self, model: Model, application_name: str):
883 """
884 Destroy application
885
886 :param: model: Model object
887 :param: application_name: Application name
888 """
889 self.log.debug(
890 "Destroying application {} in model {}".format(
891 application_name, model.info.name
892 )
893 )
894 application = model.applications.get(application_name)
895 if application:
896 await application.destroy()
897 else:
898 self.log.warning("Application not found: {}".format(application_name))
899
900 async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
901 """
902 Destroy pending machines in a given model
903
904 :param: only_manual: Bool that indicates only manually provisioned
905 machines should be destroyed (if True), or that
906 all pending machines should be destroyed
907 """
908 status = await model.get_status()
909 for machine_id in status.machines:
910 machine_status = status.machines[machine_id]
911 if machine_status.agent_status.status == "pending":
912 if only_manual and not machine_status.instance_id.startswith("manual:"):
913 break
914 machine = model.machines[machine_id]
915 await machine.destroy(force=True)
916
917 # async def destroy_machine(
918 # self, model: Model, machine_id: str, total_timeout: float = 3600
919 # ):
920 # """
921 # Destroy machine
922
923 # :param: model: Model object
924 # :param: machine_id: Machine id
925 # :param: total_timeout: Timeout in seconds
926 # """
927 # machines = await model.get_machines()
928 # if machine_id in machines:
929 # machine = machines[machine_id]
930 # await machine.destroy(force=True)
931 # # max timeout
932 # end = time.time() + total_timeout
933
934 # # wait for machine removal
935 # machines = await model.get_machines()
936 # while machine_id in machines and time.time() < end:
937 # self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
938 # await asyncio.sleep(0.5)
939 # machines = await model.get_machines()
940 # self.log.debug("Machine destroyed: {}".format(machine_id))
941 # else:
942 # self.log.debug("Machine not found: {}".format(machine_id))
943
944 async def configure_application(
945 self, model_name: str, application_name: str, config: dict = None
946 ):
947 """Configure application
948
949 :param: model_name: Model name
950 :param: application_name: Application name
951 :param: config: Config to apply to the charm
952 """
953 self.log.debug("Configuring application {}".format(application_name))
954
955 if config:
956 controller = await self.get_controller()
957 model = None
958 try:
959 model = await self.get_model(controller, model_name)
960 application = self._get_application(
961 model, application_name=application_name,
962 )
963 await application.set_config(config)
964 finally:
965 if model:
966 await self.disconnect_model(model)
967 await self.disconnect_controller(controller)
968
969 def _get_api_endpoints_db(self) -> [str]:
970 """
971 Get API Endpoints from DB
972
973 :return: List of API endpoints
974 """
975 self.log.debug("Getting endpoints from database")
976
977 juju_info = self.db.get_one(
978 DB_DATA.api_endpoints.table,
979 q_filter=DB_DATA.api_endpoints.filter,
980 fail_on_empty=False,
981 )
982 if juju_info and DB_DATA.api_endpoints.key in juju_info:
983 return juju_info[DB_DATA.api_endpoints.key]
984
985 def _update_api_endpoints_db(self, endpoints: [str]):
986 """
987 Update API endpoints in Database
988
989 :param: List of endpoints
990 """
991 self.log.debug("Saving endpoints {} in database".format(endpoints))
992
993 juju_info = self.db.get_one(
994 DB_DATA.api_endpoints.table,
995 q_filter=DB_DATA.api_endpoints.filter,
996 fail_on_empty=False,
997 )
998 # If it doesn't, then create it
999 if not juju_info:
1000 try:
1001 self.db.create(
1002 DB_DATA.api_endpoints.table, DB_DATA.api_endpoints.filter,
1003 )
1004 except DbException as e:
1005 # Racing condition: check if another N2VC worker has created it
1006 juju_info = self.db.get_one(
1007 DB_DATA.api_endpoints.table,
1008 q_filter=DB_DATA.api_endpoints.filter,
1009 fail_on_empty=False,
1010 )
1011 if not juju_info:
1012 raise e
1013 self.db.set_one(
1014 DB_DATA.api_endpoints.table,
1015 DB_DATA.api_endpoints.filter,
1016 {DB_DATA.api_endpoints.key: endpoints},
1017 )
1018
1019 def handle_exception(self, loop, context):
1020 # All unhandled exceptions by libjuju are handled here.
1021 pass
1022
1023 async def health_check(self, interval: float = 300.0):
1024 """
1025 Health check to make sure controller and controller_model connections are OK
1026
1027 :param: interval: Time in seconds between checks
1028 """
1029 controller = None
1030 while True:
1031 try:
1032 controller = await self.get_controller()
1033 # self.log.debug("VCA is alive")
1034 except Exception as e:
1035 self.log.error("Health check to VCA failed: {}".format(e))
1036 finally:
1037 await self.disconnect_controller(controller)
1038 await asyncio.sleep(interval)
1039
1040 async def list_models(self, contains: str = None) -> [str]:
1041 """List models with certain names
1042
1043 :param: contains: String that is contained in model name
1044
1045 :retur: [models] Returns list of model names
1046 """
1047
1048 controller = await self.get_controller()
1049 try:
1050 models = await controller.list_models()
1051 if contains:
1052 models = [model for model in models if contains in model]
1053 return models
1054 finally:
1055 await self.disconnect_controller(controller)
1056
1057 async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
1058 """List models with certain names
1059
1060 :param: model_name: Model name
1061
1062 :return: Returns list of offers
1063 """
1064
1065 controller = await self.get_controller()
1066 try:
1067 return await controller.list_offers(model_name)
1068 finally:
1069 await self.disconnect_controller(controller)
1070
1071 async def add_k8s(
1072 self,
1073 name: str,
1074 configuration: Configuration,
1075 storage_class: str,
1076 credential_name: str = None,
1077 ):
1078 """
1079 Add a Kubernetes cloud to the controller
1080
1081 Similar to the `juju add-k8s` command in the CLI
1082
1083 :param: name: Name for the K8s cloud
1084 :param: configuration: Kubernetes configuration object
1085 :param: storage_class: Storage Class to use in the cloud
1086 :param: credential_name: Storage Class to use in the cloud
1087 """
1088
1089 if not storage_class:
1090 raise Exception("storage_class must be a non-empty string")
1091 if not name:
1092 raise Exception("name must be a non-empty string")
1093 if not configuration:
1094 raise Exception("configuration must be provided")
1095
1096 endpoint = configuration.host
1097 credential = self.get_k8s_cloud_credential(configuration)
1098 ca_certificates = (
1099 [credential.attrs["ClientCertificateData"]]
1100 if "ClientCertificateData" in credential.attrs
1101 else []
1102 )
1103 cloud = client.Cloud(
1104 type_="kubernetes",
1105 auth_types=[credential.auth_type],
1106 endpoint=endpoint,
1107 ca_certificates=ca_certificates,
1108 config={
1109 "operator-storage": storage_class,
1110 "workload-storage": storage_class,
1111 },
1112 )
1113
1114 return await self.add_cloud(
1115 name, cloud, credential, credential_name=credential_name
1116 )
1117
1118 def get_k8s_cloud_credential(
1119 self, configuration: Configuration,
1120 ) -> client.CloudCredential:
1121 attrs = {}
1122 ca_cert = configuration.ssl_ca_cert or configuration.cert_file
1123 key = configuration.key_file
1124 api_key = configuration.api_key
1125 token = None
1126 username = configuration.username
1127 password = configuration.password
1128
1129 if "authorization" in api_key:
1130 authorization = api_key["authorization"]
1131 if "Bearer " in authorization:
1132 bearer_list = authorization.split(" ")
1133 if len(bearer_list) == 2:
1134 [_, token] = bearer_list
1135 else:
1136 raise JujuInvalidK8sConfiguration("unknown format of api_key")
1137 else:
1138 token = authorization
1139 if ca_cert:
1140 attrs["ClientCertificateData"] = open(ca_cert, "r").read()
1141 if key:
1142 attrs["ClientKeyData"] = open(key, "r").read()
1143 if token:
1144 if username or password:
1145 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1146 attrs["Token"] = token
1147
1148 auth_type = None
1149 if key:
1150 auth_type = "oauth2"
1151 if not token:
1152 raise JujuInvalidK8sConfiguration(
1153 "missing token for auth type {}".format(auth_type)
1154 )
1155 elif username:
1156 if not password:
1157 self.log.debug(
1158 "credential for user {} has empty password".format(username)
1159 )
1160 attrs["username"] = username
1161 attrs["password"] = password
1162 if ca_cert:
1163 auth_type = "userpasswithcert"
1164 else:
1165 auth_type = "userpass"
1166 elif ca_cert and token:
1167 auth_type = "certificate"
1168 else:
1169 raise JujuInvalidK8sConfiguration("authentication method not supported")
1170 return client.CloudCredential(auth_type=auth_type, attrs=attrs)
1171
1172 async def add_cloud(
1173 self,
1174 name: str,
1175 cloud: Cloud,
1176 credential: CloudCredential = None,
1177 credential_name: str = None,
1178 ) -> Cloud:
1179 """
1180 Add cloud to the controller
1181
1182 :param: name: Name of the cloud to be added
1183 :param: cloud: Cloud object
1184 :param: credential: CloudCredentials object for the cloud
1185 :param: credential_name: Credential name.
1186 If not defined, cloud of the name will be used.
1187 """
1188 controller = await self.get_controller()
1189 try:
1190 _ = await controller.add_cloud(name, cloud)
1191 if credential:
1192 await controller.add_credential(
1193 credential_name or name, credential=credential, cloud=name
1194 )
1195 # Need to return the object returned by the controller.add_cloud() function
1196 # I'm returning the original value now until this bug is fixed:
1197 # https://github.com/juju/python-libjuju/issues/443
1198 return cloud
1199 finally:
1200 await self.disconnect_controller(controller)
1201
1202 async def remove_cloud(self, name: str):
1203 """
1204 Remove cloud
1205
1206 :param: name: Name of the cloud to be removed
1207 """
1208 controller = await self.get_controller()
1209 try:
1210 await controller.remove_cloud(name)
1211 finally:
1212 await self.disconnect_controller(controller)
1213
1214 async def _get_leader_unit(self, application: Application) -> Unit:
1215 unit = None
1216 for u in application.units:
1217 if await u.is_leader_from_status():
1218 unit = u
1219 break
1220 return unit