161370bd1126a88ae3066a2f2a5cea847e23f2b9
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17
18 import time
19
20 from juju.errors import JujuAPIError
21 from juju.model import Model
22 from juju.machine import Machine
23 from juju.application import Application
24 from juju.unit import Unit
25 from juju.client._definitions import (
26 FullStatus,
27 QueryApplicationOffersResults,
28 Cloud,
29 CloudCredential,
30 )
31 from juju.controller import Controller
32 from juju.client import client
33 from juju import tag
34
35 from n2vc.juju_watcher import JujuModelWatcher
36 from n2vc.provisioner import AsyncSSHProvisioner
37 from n2vc.n2vc_conn import N2VCConnector
38 from n2vc.exceptions import (
39 JujuMachineNotFound,
40 JujuApplicationNotFound,
41 JujuLeaderUnitNotFound,
42 JujuActionNotFound,
43 JujuControllerFailedConnecting,
44 JujuApplicationExists,
45 JujuInvalidK8sConfiguration,
46 )
47 from n2vc.utils import DB_DATA
48 from osm_common.dbbase import DbException
49 from kubernetes.client.configuration import Configuration
50
51 RBAC_LABEL_KEY_NAME = "rbac-id"
52
53
54 class Libjuju:
55 def __init__(
56 self,
57 endpoint: str,
58 api_proxy: str,
59 username: str,
60 password: str,
61 cacert: str,
62 loop: asyncio.AbstractEventLoop = None,
63 log: logging.Logger = None,
64 db: dict = None,
65 n2vc: N2VCConnector = None,
66 apt_mirror: str = None,
67 enable_os_upgrade: bool = True,
68 ):
69 """
70 Constructor
71
72 :param: endpoint: Endpoint of the juju controller (host:port)
73 :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
74 :param: username: Juju username
75 :param: password: Juju password
76 :param: cacert: Juju CA Certificate
77 :param: loop: Asyncio loop
78 :param: log: Logger
79 :param: db: DB object
80 :param: n2vc: N2VC object
81 :param: apt_mirror: APT Mirror
82 :param: enable_os_upgrade: Enable OS Upgrade
83 """
84
85 self.log = log or logging.getLogger("Libjuju")
86 self.db = db
87 db_endpoints = self._get_api_endpoints_db()
88 self.endpoints = None
89 if (db_endpoints and endpoint not in db_endpoints) or not db_endpoints:
90 self.endpoints = [endpoint]
91 self._update_api_endpoints_db(self.endpoints)
92 else:
93 self.endpoints = db_endpoints
94 self.api_proxy = api_proxy
95 self.username = username
96 self.password = password
97 self.cacert = cacert
98 self.loop = loop or asyncio.get_event_loop()
99 self.n2vc = n2vc
100
101 # Generate config for models
102 self.model_config = {}
103 if apt_mirror:
104 self.model_config["apt-mirror"] = apt_mirror
105 self.model_config["enable-os-refresh-update"] = enable_os_upgrade
106 self.model_config["enable-os-upgrade"] = enable_os_upgrade
107
108 self.loop.set_exception_handler(self.handle_exception)
109 self.creating_model = asyncio.Lock(loop=self.loop)
110
111 self.log.debug("Libjuju initialized!")
112
113 self.health_check_task = self._create_health_check_task()
114
115 def _create_health_check_task(self):
116 return self.loop.create_task(self.health_check())
117
118 async def get_controller(self, timeout: float = 15.0) -> Controller:
119 """
120 Get controller
121
122 :param: timeout: Time in seconds to wait for controller to connect
123 """
124 controller = None
125 try:
126 controller = Controller(loop=self.loop)
127 await asyncio.wait_for(
128 controller.connect(
129 endpoint=self.endpoints,
130 username=self.username,
131 password=self.password,
132 cacert=self.cacert,
133 ),
134 timeout=timeout,
135 )
136 endpoints = await controller.api_endpoints
137 if self.endpoints != endpoints:
138 self.endpoints = endpoints
139 self._update_api_endpoints_db(self.endpoints)
140 return controller
141 except asyncio.CancelledError as e:
142 raise e
143 except Exception as e:
144 self.log.error(
145 "Failed connecting to controller: {}...".format(self.endpoints)
146 )
147 if controller:
148 await self.disconnect_controller(controller)
149 raise JujuControllerFailedConnecting(e)
150
151 async def disconnect(self):
152 """Disconnect"""
153 # Cancel health check task
154 self.health_check_task.cancel()
155 self.log.debug("Libjuju disconnected!")
156
157 async def disconnect_model(self, model: Model):
158 """
159 Disconnect model
160
161 :param: model: Model that will be disconnected
162 """
163 await model.disconnect()
164
165 async def disconnect_controller(self, controller: Controller):
166 """
167 Disconnect controller
168
169 :param: controller: Controller that will be disconnected
170 """
171 if controller:
172 await controller.disconnect()
173
174 async def add_model(self, model_name: str, cloud_name: str, credential_name=None):
175 """
176 Create model
177
178 :param: model_name: Model name
179 :param: cloud_name: Cloud name
180 :param: credential_name: Credential name to use for adding the model
181 If not specified, same name as the cloud will be used.
182 """
183
184 # Get controller
185 controller = await self.get_controller()
186 model = None
187 try:
188 # Block until other workers have finished model creation
189 while self.creating_model.locked():
190 await asyncio.sleep(0.1)
191
192 # Create the model
193 async with self.creating_model:
194 if await self.model_exists(model_name, controller=controller):
195 return
196 self.log.debug("Creating model {}".format(model_name))
197 model = await controller.add_model(
198 model_name,
199 config=self.model_config,
200 cloud_name=cloud_name,
201 credential_name=credential_name or cloud_name,
202 )
203 finally:
204 if model:
205 await self.disconnect_model(model)
206 await self.disconnect_controller(controller)
207
208 async def get_model(
209 self, controller: Controller, model_name: str, id=None
210 ) -> Model:
211 """
212 Get model from controller
213
214 :param: controller: Controller
215 :param: model_name: Model name
216
217 :return: Model: The created Juju model object
218 """
219 return await controller.get_model(model_name)
220
221 async def model_exists(
222 self, model_name: str, controller: Controller = None
223 ) -> bool:
224 """
225 Check if model exists
226
227 :param: controller: Controller
228 :param: model_name: Model name
229
230 :return bool
231 """
232 need_to_disconnect = False
233
234 # Get controller if not passed
235 if not controller:
236 controller = await self.get_controller()
237 need_to_disconnect = True
238
239 # Check if model exists
240 try:
241 return model_name in await controller.list_models()
242 finally:
243 if need_to_disconnect:
244 await self.disconnect_controller(controller)
245
246 async def models_exist(self, model_names: [str]) -> (bool, list):
247 """
248 Check if models exists
249
250 :param: model_names: List of strings with model names
251
252 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
253 """
254 if not model_names:
255 raise Exception(
256 "model_names must be a non-empty array. Given value: {}".format(
257 model_names
258 )
259 )
260 non_existing_models = []
261 models = await self.list_models()
262 existing_models = list(set(models).intersection(model_names))
263 non_existing_models = list(set(model_names) - set(existing_models))
264
265 return (
266 len(non_existing_models) == 0,
267 non_existing_models,
268 )
269
270 async def get_model_status(self, model_name: str) -> FullStatus:
271 """
272 Get model status
273
274 :param: model_name: Model name
275
276 :return: Full status object
277 """
278 controller = await self.get_controller()
279 model = await self.get_model(controller, model_name)
280 try:
281 return await model.get_status()
282 finally:
283 await self.disconnect_model(model)
284 await self.disconnect_controller(controller)
285
286 async def create_machine(
287 self,
288 model_name: str,
289 machine_id: str = None,
290 db_dict: dict = None,
291 progress_timeout: float = None,
292 total_timeout: float = None,
293 series: str = "xenial",
294 wait: bool = True,
295 ) -> (Machine, bool):
296 """
297 Create machine
298
299 :param: model_name: Model name
300 :param: machine_id: Machine id
301 :param: db_dict: Dictionary with data of the DB to write the updates
302 :param: progress_timeout: Maximum time between two updates in the model
303 :param: total_timeout: Timeout for the entity to be active
304 :param: series: Series of the machine (xenial, bionic, focal, ...)
305 :param: wait: Wait until machine is ready
306
307 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
308 if the machine is new or it already existed
309 """
310 new = False
311 machine = None
312
313 self.log.debug(
314 "Creating machine (id={}) in model: {}".format(machine_id, model_name)
315 )
316
317 # Get controller
318 controller = await self.get_controller()
319
320 # Get model
321 model = await self.get_model(controller, model_name)
322 try:
323 if machine_id is not None:
324 self.log.debug(
325 "Searching machine (id={}) in model {}".format(
326 machine_id, model_name
327 )
328 )
329
330 # Get machines from model and get the machine with machine_id if exists
331 machines = await model.get_machines()
332 if machine_id in machines:
333 self.log.debug(
334 "Machine (id={}) found in model {}".format(
335 machine_id, model_name
336 )
337 )
338 machine = machines[machine_id]
339 else:
340 raise JujuMachineNotFound("Machine {} not found".format(machine_id))
341
342 if machine is None:
343 self.log.debug("Creating a new machine in model {}".format(model_name))
344
345 # Create machine
346 machine = await model.add_machine(
347 spec=None, constraints=None, disks=None, series=series
348 )
349 new = True
350
351 # Wait until the machine is ready
352 self.log.debug(
353 "Wait until machine {} is ready in model {}".format(
354 machine.entity_id, model_name
355 )
356 )
357 if wait:
358 await JujuModelWatcher.wait_for(
359 model=model,
360 entity=machine,
361 progress_timeout=progress_timeout,
362 total_timeout=total_timeout,
363 db_dict=db_dict,
364 n2vc=self.n2vc,
365 )
366 finally:
367 await self.disconnect_model(model)
368 await self.disconnect_controller(controller)
369
370 self.log.debug(
371 "Machine {} ready at {} in model {}".format(
372 machine.entity_id, machine.dns_name, model_name
373 )
374 )
375 return machine, new
376
377 async def provision_machine(
378 self,
379 model_name: str,
380 hostname: str,
381 username: str,
382 private_key_path: str,
383 db_dict: dict = None,
384 progress_timeout: float = None,
385 total_timeout: float = None,
386 ) -> str:
387 """
388 Manually provisioning of a machine
389
390 :param: model_name: Model name
391 :param: hostname: IP to access the machine
392 :param: username: Username to login to the machine
393 :param: private_key_path: Local path for the private key
394 :param: db_dict: Dictionary with data of the DB to write the updates
395 :param: progress_timeout: Maximum time between two updates in the model
396 :param: total_timeout: Timeout for the entity to be active
397
398 :return: (Entity): Machine id
399 """
400 self.log.debug(
401 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
402 model_name, hostname, username
403 )
404 )
405
406 # Get controller
407 controller = await self.get_controller()
408
409 # Get model
410 model = await self.get_model(controller, model_name)
411
412 try:
413 # Get provisioner
414 provisioner = AsyncSSHProvisioner(
415 host=hostname,
416 user=username,
417 private_key_path=private_key_path,
418 log=self.log,
419 )
420
421 # Provision machine
422 params = await provisioner.provision_machine()
423
424 params.jobs = ["JobHostUnits"]
425
426 self.log.debug("Adding machine to model")
427 connection = model.connection()
428 client_facade = client.ClientFacade.from_connection(connection)
429
430 results = await client_facade.AddMachines(params=[params])
431 error = results.machines[0].error
432
433 if error:
434 msg = "Error adding machine: {}".format(error.message)
435 self.log.error(msg=msg)
436 raise ValueError(msg)
437
438 machine_id = results.machines[0].machine
439
440 self.log.debug("Installing Juju agent into machine {}".format(machine_id))
441 asyncio.ensure_future(
442 provisioner.install_agent(
443 connection=connection,
444 nonce=params.nonce,
445 machine_id=machine_id,
446 proxy=self.api_proxy,
447 )
448 )
449
450 machine = None
451 for _ in range(10):
452 machine_list = await model.get_machines()
453 if machine_id in machine_list:
454 self.log.debug("Machine {} found in model!".format(machine_id))
455 machine = model.machines.get(machine_id)
456 break
457 await asyncio.sleep(2)
458
459 if machine is None:
460 msg = "Machine {} not found in model".format(machine_id)
461 self.log.error(msg=msg)
462 raise JujuMachineNotFound(msg)
463
464 self.log.debug(
465 "Wait until machine {} is ready in model {}".format(
466 machine.entity_id, model_name
467 )
468 )
469 await JujuModelWatcher.wait_for(
470 model=model,
471 entity=machine,
472 progress_timeout=progress_timeout,
473 total_timeout=total_timeout,
474 db_dict=db_dict,
475 n2vc=self.n2vc,
476 )
477 except Exception as e:
478 raise e
479 finally:
480 await self.disconnect_model(model)
481 await self.disconnect_controller(controller)
482
483 self.log.debug(
484 "Machine provisioned {} in model {}".format(machine_id, model_name)
485 )
486
487 return machine_id
488
489 async def deploy(
490 self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
491 ):
492 """
493 Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
494
495 :param: uri: Path or Charm Store uri in which the charm or bundle can be found
496 :param: model_name: Model name
497 :param: wait: Indicates whether to wait or not until all applications are active
498 :param: timeout: Time in seconds to wait until all applications are active
499 """
500 controller = await self.get_controller()
501 model = await self.get_model(controller, model_name)
502 try:
503 await model.deploy(uri)
504 if wait:
505 await JujuModelWatcher.wait_for_model(model, timeout=timeout)
506 self.log.debug("All units active in model {}".format(model_name))
507 finally:
508 await self.disconnect_model(model)
509 await self.disconnect_controller(controller)
510
511 async def deploy_charm(
512 self,
513 application_name: str,
514 path: str,
515 model_name: str,
516 machine_id: str,
517 db_dict: dict = None,
518 progress_timeout: float = None,
519 total_timeout: float = None,
520 config: dict = None,
521 series: str = None,
522 num_units: int = 1,
523 ):
524 """Deploy charm
525
526 :param: application_name: Application name
527 :param: path: Local path to the charm
528 :param: model_name: Model name
529 :param: machine_id ID of the machine
530 :param: db_dict: Dictionary with data of the DB to write the updates
531 :param: progress_timeout: Maximum time between two updates in the model
532 :param: total_timeout: Timeout for the entity to be active
533 :param: config: Config for the charm
534 :param: series: Series of the charm
535 :param: num_units: Number of units
536
537 :return: (juju.application.Application): Juju application
538 """
539 self.log.debug(
540 "Deploying charm {} to machine {} in model ~{}".format(
541 application_name, machine_id, model_name
542 )
543 )
544 self.log.debug("charm: {}".format(path))
545
546 # Get controller
547 controller = await self.get_controller()
548
549 # Get model
550 model = await self.get_model(controller, model_name)
551
552 try:
553 application = None
554 if application_name not in model.applications:
555
556 if machine_id is not None:
557 if machine_id not in model.machines:
558 msg = "Machine {} not found in model".format(machine_id)
559 self.log.error(msg=msg)
560 raise JujuMachineNotFound(msg)
561 machine = model.machines[machine_id]
562 series = machine.series
563
564 application = await model.deploy(
565 entity_url=path,
566 application_name=application_name,
567 channel="stable",
568 num_units=1,
569 series=series,
570 to=machine_id,
571 config=config,
572 )
573
574 self.log.debug(
575 "Wait until application {} is ready in model {}".format(
576 application_name, model_name
577 )
578 )
579 if num_units > 1:
580 for _ in range(num_units - 1):
581 m, _ = await self.create_machine(model_name, wait=False)
582 await application.add_unit(to=m.entity_id)
583
584 await JujuModelWatcher.wait_for(
585 model=model,
586 entity=application,
587 progress_timeout=progress_timeout,
588 total_timeout=total_timeout,
589 db_dict=db_dict,
590 n2vc=self.n2vc,
591 )
592 self.log.debug(
593 "Application {} is ready in model {}".format(
594 application_name, model_name
595 )
596 )
597 else:
598 raise JujuApplicationExists(
599 "Application {} exists".format(application_name)
600 )
601 finally:
602 await self.disconnect_model(model)
603 await self.disconnect_controller(controller)
604
605 return application
606
607 def _get_application(self, model: Model, application_name: str) -> Application:
608 """Get application
609
610 :param: model: Model object
611 :param: application_name: Application name
612
613 :return: juju.application.Application (or None if it doesn't exist)
614 """
615 if model.applications and application_name in model.applications:
616 return model.applications[application_name]
617
618 async def execute_action(
619 self,
620 application_name: str,
621 model_name: str,
622 action_name: str,
623 db_dict: dict = None,
624 progress_timeout: float = None,
625 total_timeout: float = None,
626 **kwargs
627 ):
628 """Execute action
629
630 :param: application_name: Application name
631 :param: model_name: Model name
632 :param: action_name: Name of the action
633 :param: db_dict: Dictionary with data of the DB to write the updates
634 :param: progress_timeout: Maximum time between two updates in the model
635 :param: total_timeout: Timeout for the entity to be active
636
637 :return: (str, str): (output and status)
638 """
639 self.log.debug(
640 "Executing action {} using params {}".format(action_name, kwargs)
641 )
642 # Get controller
643 controller = await self.get_controller()
644
645 # Get model
646 model = await self.get_model(controller, model_name)
647
648 try:
649 # Get application
650 application = self._get_application(
651 model,
652 application_name=application_name,
653 )
654 if application is None:
655 raise JujuApplicationNotFound("Cannot execute action")
656
657 # Get leader unit
658 # Racing condition:
659 # Ocassionally, self._get_leader_unit() will return None
660 # because the leader elected hook has not been triggered yet.
661 # Therefore, we are doing some retries. If it happens again,
662 # re-open bug 1236
663 attempts = 3
664 time_between_retries = 10
665 unit = None
666 for _ in range(attempts):
667 unit = await self._get_leader_unit(application)
668 if unit is None:
669 await asyncio.sleep(time_between_retries)
670 else:
671 break
672 if unit is None:
673 raise JujuLeaderUnitNotFound(
674 "Cannot execute action: leader unit not found"
675 )
676
677 actions = await application.get_actions()
678
679 if action_name not in actions:
680 raise JujuActionNotFound(
681 "Action {} not in available actions".format(action_name)
682 )
683
684 action = await unit.run_action(action_name, **kwargs)
685
686 self.log.debug(
687 "Wait until action {} is completed in application {} (model={})".format(
688 action_name, application_name, model_name
689 )
690 )
691 await JujuModelWatcher.wait_for(
692 model=model,
693 entity=action,
694 progress_timeout=progress_timeout,
695 total_timeout=total_timeout,
696 db_dict=db_dict,
697 n2vc=self.n2vc,
698 )
699
700 output = await model.get_action_output(action_uuid=action.entity_id)
701 status = await model.get_action_status(uuid_or_prefix=action.entity_id)
702 status = (
703 status[action.entity_id] if action.entity_id in status else "failed"
704 )
705
706 self.log.debug(
707 "Action {} completed with status {} in application {} (model={})".format(
708 action_name, action.status, application_name, model_name
709 )
710 )
711 finally:
712 await self.disconnect_model(model)
713 await self.disconnect_controller(controller)
714
715 return output, status
716
717 async def get_actions(self, application_name: str, model_name: str) -> dict:
718 """Get list of actions
719
720 :param: application_name: Application name
721 :param: model_name: Model name
722
723 :return: Dict with this format
724 {
725 "action_name": "Description of the action",
726 ...
727 }
728 """
729 self.log.debug(
730 "Getting list of actions for application {}".format(application_name)
731 )
732
733 # Get controller
734 controller = await self.get_controller()
735
736 # Get model
737 model = await self.get_model(controller, model_name)
738
739 try:
740 # Get application
741 application = self._get_application(
742 model,
743 application_name=application_name,
744 )
745
746 # Return list of actions
747 return await application.get_actions()
748
749 finally:
750 # Disconnect from model and controller
751 await self.disconnect_model(model)
752 await self.disconnect_controller(controller)
753
754 async def get_metrics(self, model_name: str, application_name: str) -> dict:
755 """Get the metrics collected by the VCA.
756
757 :param model_name The name or unique id of the network service
758 :param application_name The name of the application
759 """
760 if not model_name or not application_name:
761 raise Exception("model_name and application_name must be non-empty strings")
762 metrics = {}
763 controller = await self.get_controller()
764 model = await self.get_model(controller, model_name)
765 try:
766 application = self._get_application(model, application_name)
767 if application is not None:
768 metrics = await application.get_metrics()
769 finally:
770 self.disconnect_model(model)
771 self.disconnect_controller(controller)
772 return metrics
773
774 async def add_relation(
775 self,
776 model_name: str,
777 endpoint_1: str,
778 endpoint_2: str,
779 ):
780 """Add relation
781
782 :param: model_name: Model name
783 :param: endpoint_1 First endpoint name
784 ("app:endpoint" format or directly the saas name)
785 :param: endpoint_2: Second endpoint name (^ same format)
786 """
787
788 self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))
789
790 # Get controller
791 controller = await self.get_controller()
792
793 # Get model
794 model = await self.get_model(controller, model_name)
795
796 # Add relation
797 try:
798 await model.add_relation(endpoint_1, endpoint_2)
799 except JujuAPIError as e:
800 if "not found" in e.message:
801 self.log.warning("Relation not found: {}".format(e.message))
802 return
803 if "already exists" in e.message:
804 self.log.warning("Relation already exists: {}".format(e.message))
805 return
806 # another exception, raise it
807 raise e
808 finally:
809 await self.disconnect_model(model)
810 await self.disconnect_controller(controller)
811
812 async def consume(
813 self,
814 offer_url: str,
815 model_name: str,
816 ):
817 """
818 Adds a remote offer to the model. Relations can be created later using "juju relate".
819
820 :param: offer_url: Offer Url
821 :param: model_name: Model name
822
823 :raises ParseError if there's a problem parsing the offer_url
824 :raises JujuError if remote offer includes and endpoint
825 :raises JujuAPIError if the operation is not successful
826 """
827 controller = await self.get_controller()
828 model = await controller.get_model(model_name)
829
830 try:
831 await model.consume(offer_url)
832 finally:
833 await self.disconnect_model(model)
834 await self.disconnect_controller(controller)
835
836 async def destroy_model(self, model_name: str, total_timeout: float):
837 """
838 Destroy model
839
840 :param: model_name: Model name
841 :param: total_timeout: Timeout
842 """
843
844 controller = await self.get_controller()
845 model = None
846 try:
847 if not await self.model_exists(model_name, controller=controller):
848 return
849
850 model = await self.get_model(controller, model_name)
851 self.log.debug("Destroying model {}".format(model_name))
852 uuid = model.info.uuid
853
854 # Destroy machines that are manually provisioned
855 # and still are in pending state
856 await self._destroy_pending_machines(model, only_manual=True)
857
858 # Disconnect model
859 await self.disconnect_model(model)
860
861 await controller.destroy_model(uuid, force=True, max_wait=0)
862
863 # Wait until model is destroyed
864 self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
865
866 if total_timeout is None:
867 total_timeout = 3600
868 end = time.time() + total_timeout
869 while time.time() < end:
870 models = await controller.list_models()
871 if model_name not in models:
872 self.log.debug(
873 "The model {} ({}) was destroyed".format(model_name, uuid)
874 )
875 return
876 await asyncio.sleep(5)
877 raise Exception(
878 "Timeout waiting for model {} to be destroyed".format(model_name)
879 )
880 except Exception as e:
881 if model:
882 await self.disconnect_model(model)
883 raise e
884 finally:
885 await self.disconnect_controller(controller)
886
887 async def destroy_application(self, model: Model, application_name: str):
888 """
889 Destroy application
890
891 :param: model: Model object
892 :param: application_name: Application name
893 """
894 self.log.debug(
895 "Destroying application {} in model {}".format(
896 application_name, model.info.name
897 )
898 )
899 application = model.applications.get(application_name)
900 if application:
901 await application.destroy()
902 else:
903 self.log.warning("Application not found: {}".format(application_name))
904
905 async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
906 """
907 Destroy pending machines in a given model
908
909 :param: only_manual: Bool that indicates only manually provisioned
910 machines should be destroyed (if True), or that
911 all pending machines should be destroyed
912 """
913 status = await model.get_status()
914 for machine_id in status.machines:
915 machine_status = status.machines[machine_id]
916 if machine_status.agent_status.status == "pending":
917 if only_manual and not machine_status.instance_id.startswith("manual:"):
918 break
919 machine = model.machines[machine_id]
920 await machine.destroy(force=True)
921
922 # async def destroy_machine(
923 # self, model: Model, machine_id: str, total_timeout: float = 3600
924 # ):
925 # """
926 # Destroy machine
927
928 # :param: model: Model object
929 # :param: machine_id: Machine id
930 # :param: total_timeout: Timeout in seconds
931 # """
932 # machines = await model.get_machines()
933 # if machine_id in machines:
934 # machine = machines[machine_id]
935 # await machine.destroy(force=True)
936 # # max timeout
937 # end = time.time() + total_timeout
938
939 # # wait for machine removal
940 # machines = await model.get_machines()
941 # while machine_id in machines and time.time() < end:
942 # self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
943 # await asyncio.sleep(0.5)
944 # machines = await model.get_machines()
945 # self.log.debug("Machine destroyed: {}".format(machine_id))
946 # else:
947 # self.log.debug("Machine not found: {}".format(machine_id))
948
949 async def configure_application(
950 self, model_name: str, application_name: str, config: dict = None
951 ):
952 """Configure application
953
954 :param: model_name: Model name
955 :param: application_name: Application name
956 :param: config: Config to apply to the charm
957 """
958 self.log.debug("Configuring application {}".format(application_name))
959
960 if config:
961 controller = await self.get_controller()
962 model = None
963 try:
964 model = await self.get_model(controller, model_name)
965 application = self._get_application(
966 model,
967 application_name=application_name,
968 )
969 await application.set_config(config)
970 finally:
971 if model:
972 await self.disconnect_model(model)
973 await self.disconnect_controller(controller)
974
975 def _get_api_endpoints_db(self) -> [str]:
976 """
977 Get API Endpoints from DB
978
979 :return: List of API endpoints
980 """
981 self.log.debug("Getting endpoints from database")
982
983 juju_info = self.db.get_one(
984 DB_DATA.api_endpoints.table,
985 q_filter=DB_DATA.api_endpoints.filter,
986 fail_on_empty=False,
987 )
988 if juju_info and DB_DATA.api_endpoints.key in juju_info:
989 return juju_info[DB_DATA.api_endpoints.key]
990
991 def _update_api_endpoints_db(self, endpoints: [str]):
992 """
993 Update API endpoints in Database
994
995 :param: List of endpoints
996 """
997 self.log.debug("Saving endpoints {} in database".format(endpoints))
998
999 juju_info = self.db.get_one(
1000 DB_DATA.api_endpoints.table,
1001 q_filter=DB_DATA.api_endpoints.filter,
1002 fail_on_empty=False,
1003 )
1004 # If it doesn't, then create it
1005 if not juju_info:
1006 try:
1007 self.db.create(
1008 DB_DATA.api_endpoints.table,
1009 DB_DATA.api_endpoints.filter,
1010 )
1011 except DbException as e:
1012 # Racing condition: check if another N2VC worker has created it
1013 juju_info = self.db.get_one(
1014 DB_DATA.api_endpoints.table,
1015 q_filter=DB_DATA.api_endpoints.filter,
1016 fail_on_empty=False,
1017 )
1018 if not juju_info:
1019 raise e
1020 self.db.set_one(
1021 DB_DATA.api_endpoints.table,
1022 DB_DATA.api_endpoints.filter,
1023 {DB_DATA.api_endpoints.key: endpoints},
1024 )
1025
1026 def handle_exception(self, loop, context):
1027 # All unhandled exceptions by libjuju are handled here.
1028 pass
1029
1030 async def health_check(self, interval: float = 300.0):
1031 """
1032 Health check to make sure controller and controller_model connections are OK
1033
1034 :param: interval: Time in seconds between checks
1035 """
1036 controller = None
1037 while True:
1038 try:
1039 controller = await self.get_controller()
1040 # self.log.debug("VCA is alive")
1041 except Exception as e:
1042 self.log.error("Health check to VCA failed: {}".format(e))
1043 finally:
1044 await self.disconnect_controller(controller)
1045 await asyncio.sleep(interval)
1046
1047 async def list_models(self, contains: str = None) -> [str]:
1048 """List models with certain names
1049
1050 :param: contains: String that is contained in model name
1051
1052 :retur: [models] Returns list of model names
1053 """
1054
1055 controller = await self.get_controller()
1056 try:
1057 models = await controller.list_models()
1058 if contains:
1059 models = [model for model in models if contains in model]
1060 return models
1061 finally:
1062 await self.disconnect_controller(controller)
1063
1064 async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
1065 """List models with certain names
1066
1067 :param: model_name: Model name
1068
1069 :return: Returns list of offers
1070 """
1071
1072 controller = await self.get_controller()
1073 try:
1074 return await controller.list_offers(model_name)
1075 finally:
1076 await self.disconnect_controller(controller)
1077
1078 async def add_k8s(
1079 self,
1080 name: str,
1081 rbac_id: str,
1082 token: str,
1083 client_cert_data: str,
1084 configuration: Configuration,
1085 storage_class: str,
1086 credential_name: str = None,
1087 ):
1088 """
1089 Add a Kubernetes cloud to the controller
1090
1091 Similar to the `juju add-k8s` command in the CLI
1092
1093 :param: name: Name for the K8s cloud
1094 :param: configuration: Kubernetes configuration object
1095 :param: storage_class: Storage Class to use in the cloud
1096 :param: credential_name: Storage Class to use in the cloud
1097 """
1098
1099 if not storage_class:
1100 raise Exception("storage_class must be a non-empty string")
1101 if not name:
1102 raise Exception("name must be a non-empty string")
1103 if not configuration:
1104 raise Exception("configuration must be provided")
1105
1106 endpoint = configuration.host
1107 credential = self.get_k8s_cloud_credential(
1108 configuration,
1109 client_cert_data,
1110 token,
1111 )
1112 credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id
1113 cloud = client.Cloud(
1114 type_="kubernetes",
1115 auth_types=[credential.auth_type],
1116 endpoint=endpoint,
1117 ca_certificates=[client_cert_data],
1118 config={
1119 "operator-storage": storage_class,
1120 "workload-storage": storage_class,
1121 },
1122 )
1123
1124 return await self.add_cloud(
1125 name, cloud, credential, credential_name=credential_name
1126 )
1127
1128 def get_k8s_cloud_credential(
1129 self,
1130 configuration: Configuration,
1131 client_cert_data: str,
1132 token: str = None,
1133 ) -> client.CloudCredential:
1134 attrs = {}
1135 # TODO: Test with AKS
1136 key = None # open(configuration.key_file, "r").read()
1137 username = configuration.username
1138 password = configuration.password
1139
1140 if client_cert_data:
1141 attrs["ClientCertificateData"] = client_cert_data
1142 if key:
1143 attrs["ClientKeyData"] = key
1144 if token:
1145 if username or password:
1146 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1147 attrs["Token"] = token
1148
1149 auth_type = None
1150 if key:
1151 auth_type = "oauth2"
1152 if client_cert_data:
1153 auth_type = "oauth2withcert"
1154 if not token:
1155 raise JujuInvalidK8sConfiguration(
1156 "missing token for auth type {}".format(auth_type)
1157 )
1158 elif username:
1159 if not password:
1160 self.log.debug(
1161 "credential for user {} has empty password".format(username)
1162 )
1163 attrs["username"] = username
1164 attrs["password"] = password
1165 if client_cert_data:
1166 auth_type = "userpasswithcert"
1167 else:
1168 auth_type = "userpass"
1169 elif client_cert_data and token:
1170 auth_type = "certificate"
1171 else:
1172 raise JujuInvalidK8sConfiguration("authentication method not supported")
1173 return client.CloudCredential(auth_type=auth_type, attrs=attrs)
1174
1175 async def add_cloud(
1176 self,
1177 name: str,
1178 cloud: Cloud,
1179 credential: CloudCredential = None,
1180 credential_name: str = None,
1181 ) -> Cloud:
1182 """
1183 Add cloud to the controller
1184
1185 :param: name: Name of the cloud to be added
1186 :param: cloud: Cloud object
1187 :param: credential: CloudCredentials object for the cloud
1188 :param: credential_name: Credential name.
1189 If not defined, cloud of the name will be used.
1190 """
1191 controller = await self.get_controller()
1192 try:
1193 _ = await controller.add_cloud(name, cloud)
1194 if credential:
1195 await controller.add_credential(
1196 credential_name or name, credential=credential, cloud=name
1197 )
1198 # Need to return the object returned by the controller.add_cloud() function
1199 # I'm returning the original value now until this bug is fixed:
1200 # https://github.com/juju/python-libjuju/issues/443
1201 return cloud
1202 finally:
1203 await self.disconnect_controller(controller)
1204
1205 async def remove_cloud(self, name: str):
1206 """
1207 Remove cloud
1208
1209 :param: name: Name of the cloud to be removed
1210 """
1211 controller = await self.get_controller()
1212 try:
1213 await controller.remove_cloud(name)
1214 finally:
1215 await self.disconnect_controller(controller)
1216
1217 async def _get_leader_unit(self, application: Application) -> Unit:
1218 unit = None
1219 for u in application.units:
1220 if await u.is_leader_from_status():
1221 unit = u
1222 break
1223 return unit
1224
1225 async def get_cloud_credentials(self, cloud_name: str, credential_name: str):
1226 controller = await self.get_controller()
1227 try:
1228 facade = client.CloudFacade.from_connection(controller.connection())
1229 cloud_cred_tag = tag.credential(cloud_name, self.username, credential_name)
1230 params = [client.Entity(cloud_cred_tag)]
1231 return (await facade.Credential(params)).results
1232 finally:
1233 await self.disconnect_controller(controller)