02cc96885632b2dfeebfb5756be4685fc7a36a3e
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17
18 import time
19
20 from juju.errors import JujuAPIError
21 from juju.model import Model
22 from juju.machine import Machine
23 from juju.application import Application
24 from juju.unit import Unit
25 from juju.client._definitions import (
26 FullStatus,
27 QueryApplicationOffersResults,
28 Cloud,
29 CloudCredential,
30 )
31 from juju.controller import Controller
32 from juju.client import client
33 from juju import tag
34
35 from n2vc.config import ModelConfig
36 from n2vc.juju_watcher import JujuModelWatcher
37 from n2vc.provisioner import AsyncSSHProvisioner
38 from n2vc.n2vc_conn import N2VCConnector
39 from n2vc.exceptions import (
40 JujuMachineNotFound,
41 JujuApplicationNotFound,
42 JujuLeaderUnitNotFound,
43 JujuActionNotFound,
44 JujuControllerFailedConnecting,
45 JujuApplicationExists,
46 JujuInvalidK8sConfiguration,
47 )
48 from n2vc.utils import DB_DATA
49 from osm_common.dbbase import DbException
50 from kubernetes.client.configuration import Configuration
51
52 RBAC_LABEL_KEY_NAME = "rbac-id"
53
54
55 class Libjuju:
56 def __init__(
57 self,
58 endpoint: str,
59 api_proxy: str,
60 username: str,
61 password: str,
62 cacert: str,
63 loop: asyncio.AbstractEventLoop = None,
64 log: logging.Logger = None,
65 db: dict = None,
66 n2vc: N2VCConnector = None,
67 model_config: ModelConfig = {},
68 ):
69 """
70 Constructor
71
72 :param: endpoint: Endpoint of the juju controller (host:port)
73 :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
74 :param: username: Juju username
75 :param: password: Juju password
76 :param: cacert: Juju CA Certificate
77 :param: loop: Asyncio loop
78 :param: log: Logger
79 :param: db: DB object
80 :param: n2vc: N2VC object
81 :param: apt_mirror: APT Mirror
82 :param: enable_os_upgrade: Enable OS Upgrade
83 """
84
85 self.log = log or logging.getLogger("Libjuju")
86 self.db = db
87 db_endpoints = self._get_api_endpoints_db()
88 self.endpoints = None
89 if (db_endpoints and endpoint not in db_endpoints) or not db_endpoints:
90 self.endpoints = [endpoint]
91 self._update_api_endpoints_db(self.endpoints)
92 else:
93 self.endpoints = db_endpoints
94 self.api_proxy = api_proxy
95 self.username = username
96 self.password = password
97 self.cacert = cacert
98 self.loop = loop or asyncio.get_event_loop()
99 self.n2vc = n2vc
100
101 # Generate config for models
102 self.model_config = model_config
103
104 self.loop.set_exception_handler(self.handle_exception)
105 self.creating_model = asyncio.Lock(loop=self.loop)
106
107 self.log.debug("Libjuju initialized!")
108
109 self.health_check_task = self._create_health_check_task()
110
111 def _create_health_check_task(self):
112 return self.loop.create_task(self.health_check())
113
114 async def get_controller(self, timeout: float = 15.0) -> Controller:
115 """
116 Get controller
117
118 :param: timeout: Time in seconds to wait for controller to connect
119 """
120 controller = None
121 try:
122 controller = Controller(loop=self.loop)
123 await asyncio.wait_for(
124 controller.connect(
125 endpoint=self.endpoints,
126 username=self.username,
127 password=self.password,
128 cacert=self.cacert,
129 ),
130 timeout=timeout,
131 )
132 endpoints = await controller.api_endpoints
133 if self.endpoints != endpoints:
134 self.endpoints = endpoints
135 self._update_api_endpoints_db(self.endpoints)
136 return controller
137 except asyncio.CancelledError as e:
138 raise e
139 except Exception as e:
140 self.log.error(
141 "Failed connecting to controller: {}...".format(self.endpoints)
142 )
143 if controller:
144 await self.disconnect_controller(controller)
145 raise JujuControllerFailedConnecting(e)
146
147 async def disconnect(self):
148 """Disconnect"""
149 # Cancel health check task
150 self.health_check_task.cancel()
151 self.log.debug("Libjuju disconnected!")
152
153 async def disconnect_model(self, model: Model):
154 """
155 Disconnect model
156
157 :param: model: Model that will be disconnected
158 """
159 await model.disconnect()
160
161 async def disconnect_controller(self, controller: Controller):
162 """
163 Disconnect controller
164
165 :param: controller: Controller that will be disconnected
166 """
167 if controller:
168 await controller.disconnect()
169
170 async def add_model(self, model_name: str, cloud_name: str, credential_name=None):
171 """
172 Create model
173
174 :param: model_name: Model name
175 :param: cloud_name: Cloud name
176 :param: credential_name: Credential name to use for adding the model
177 If not specified, same name as the cloud will be used.
178 """
179
180 # Get controller
181 controller = await self.get_controller()
182 model = None
183 try:
184 # Block until other workers have finished model creation
185 while self.creating_model.locked():
186 await asyncio.sleep(0.1)
187
188 # Create the model
189 async with self.creating_model:
190 if await self.model_exists(model_name, controller=controller):
191 return
192 self.log.debug("Creating model {}".format(model_name))
193 model = await controller.add_model(
194 model_name,
195 config=self.model_config,
196 cloud_name=cloud_name,
197 credential_name=credential_name or cloud_name,
198 )
199 finally:
200 if model:
201 await self.disconnect_model(model)
202 await self.disconnect_controller(controller)
203
204 async def get_model(
205 self, controller: Controller, model_name: str, id=None
206 ) -> Model:
207 """
208 Get model from controller
209
210 :param: controller: Controller
211 :param: model_name: Model name
212
213 :return: Model: The created Juju model object
214 """
215 return await controller.get_model(model_name)
216
217 async def model_exists(
218 self, model_name: str, controller: Controller = None
219 ) -> bool:
220 """
221 Check if model exists
222
223 :param: controller: Controller
224 :param: model_name: Model name
225
226 :return bool
227 """
228 need_to_disconnect = False
229
230 # Get controller if not passed
231 if not controller:
232 controller = await self.get_controller()
233 need_to_disconnect = True
234
235 # Check if model exists
236 try:
237 return model_name in await controller.list_models()
238 finally:
239 if need_to_disconnect:
240 await self.disconnect_controller(controller)
241
242 async def models_exist(self, model_names: [str]) -> (bool, list):
243 """
244 Check if models exists
245
246 :param: model_names: List of strings with model names
247
248 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
249 """
250 if not model_names:
251 raise Exception(
252 "model_names must be a non-empty array. Given value: {}".format(
253 model_names
254 )
255 )
256 non_existing_models = []
257 models = await self.list_models()
258 existing_models = list(set(models).intersection(model_names))
259 non_existing_models = list(set(model_names) - set(existing_models))
260
261 return (
262 len(non_existing_models) == 0,
263 non_existing_models,
264 )
265
266 async def get_model_status(self, model_name: str) -> FullStatus:
267 """
268 Get model status
269
270 :param: model_name: Model name
271
272 :return: Full status object
273 """
274 controller = await self.get_controller()
275 model = await self.get_model(controller, model_name)
276 try:
277 return await model.get_status()
278 finally:
279 await self.disconnect_model(model)
280 await self.disconnect_controller(controller)
281
282 async def create_machine(
283 self,
284 model_name: str,
285 machine_id: str = None,
286 db_dict: dict = None,
287 progress_timeout: float = None,
288 total_timeout: float = None,
289 series: str = "xenial",
290 wait: bool = True,
291 ) -> (Machine, bool):
292 """
293 Create machine
294
295 :param: model_name: Model name
296 :param: machine_id: Machine id
297 :param: db_dict: Dictionary with data of the DB to write the updates
298 :param: progress_timeout: Maximum time between two updates in the model
299 :param: total_timeout: Timeout for the entity to be active
300 :param: series: Series of the machine (xenial, bionic, focal, ...)
301 :param: wait: Wait until machine is ready
302
303 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
304 if the machine is new or it already existed
305 """
306 new = False
307 machine = None
308
309 self.log.debug(
310 "Creating machine (id={}) in model: {}".format(machine_id, model_name)
311 )
312
313 # Get controller
314 controller = await self.get_controller()
315
316 # Get model
317 model = await self.get_model(controller, model_name)
318 try:
319 if machine_id is not None:
320 self.log.debug(
321 "Searching machine (id={}) in model {}".format(
322 machine_id, model_name
323 )
324 )
325
326 # Get machines from model and get the machine with machine_id if exists
327 machines = await model.get_machines()
328 if machine_id in machines:
329 self.log.debug(
330 "Machine (id={}) found in model {}".format(
331 machine_id, model_name
332 )
333 )
334 machine = machines[machine_id]
335 else:
336 raise JujuMachineNotFound("Machine {} not found".format(machine_id))
337
338 if machine is None:
339 self.log.debug("Creating a new machine in model {}".format(model_name))
340
341 # Create machine
342 machine = await model.add_machine(
343 spec=None, constraints=None, disks=None, series=series
344 )
345 new = True
346
347 # Wait until the machine is ready
348 self.log.debug(
349 "Wait until machine {} is ready in model {}".format(
350 machine.entity_id, model_name
351 )
352 )
353 if wait:
354 await JujuModelWatcher.wait_for(
355 model=model,
356 entity=machine,
357 progress_timeout=progress_timeout,
358 total_timeout=total_timeout,
359 db_dict=db_dict,
360 n2vc=self.n2vc,
361 )
362 finally:
363 await self.disconnect_model(model)
364 await self.disconnect_controller(controller)
365
366 self.log.debug(
367 "Machine {} ready at {} in model {}".format(
368 machine.entity_id, machine.dns_name, model_name
369 )
370 )
371 return machine, new
372
373 async def provision_machine(
374 self,
375 model_name: str,
376 hostname: str,
377 username: str,
378 private_key_path: str,
379 db_dict: dict = None,
380 progress_timeout: float = None,
381 total_timeout: float = None,
382 ) -> str:
383 """
384 Manually provisioning of a machine
385
386 :param: model_name: Model name
387 :param: hostname: IP to access the machine
388 :param: username: Username to login to the machine
389 :param: private_key_path: Local path for the private key
390 :param: db_dict: Dictionary with data of the DB to write the updates
391 :param: progress_timeout: Maximum time between two updates in the model
392 :param: total_timeout: Timeout for the entity to be active
393
394 :return: (Entity): Machine id
395 """
396 self.log.debug(
397 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
398 model_name, hostname, username
399 )
400 )
401
402 # Get controller
403 controller = await self.get_controller()
404
405 # Get model
406 model = await self.get_model(controller, model_name)
407
408 try:
409 # Get provisioner
410 provisioner = AsyncSSHProvisioner(
411 host=hostname,
412 user=username,
413 private_key_path=private_key_path,
414 log=self.log,
415 )
416
417 # Provision machine
418 params = await provisioner.provision_machine()
419
420 params.jobs = ["JobHostUnits"]
421
422 self.log.debug("Adding machine to model")
423 connection = model.connection()
424 client_facade = client.ClientFacade.from_connection(connection)
425
426 results = await client_facade.AddMachines(params=[params])
427 error = results.machines[0].error
428
429 if error:
430 msg = "Error adding machine: {}".format(error.message)
431 self.log.error(msg=msg)
432 raise ValueError(msg)
433
434 machine_id = results.machines[0].machine
435
436 self.log.debug("Installing Juju agent into machine {}".format(machine_id))
437 asyncio.ensure_future(
438 provisioner.install_agent(
439 connection=connection,
440 nonce=params.nonce,
441 machine_id=machine_id,
442 proxy=self.api_proxy,
443 series=params.series,
444 )
445 )
446
447 machine = None
448 for _ in range(10):
449 machine_list = await model.get_machines()
450 if machine_id in machine_list:
451 self.log.debug("Machine {} found in model!".format(machine_id))
452 machine = model.machines.get(machine_id)
453 break
454 await asyncio.sleep(2)
455
456 if machine is None:
457 msg = "Machine {} not found in model".format(machine_id)
458 self.log.error(msg=msg)
459 raise JujuMachineNotFound(msg)
460
461 self.log.debug(
462 "Wait until machine {} is ready in model {}".format(
463 machine.entity_id, model_name
464 )
465 )
466 await JujuModelWatcher.wait_for(
467 model=model,
468 entity=machine,
469 progress_timeout=progress_timeout,
470 total_timeout=total_timeout,
471 db_dict=db_dict,
472 n2vc=self.n2vc,
473 )
474 except Exception as e:
475 raise e
476 finally:
477 await self.disconnect_model(model)
478 await self.disconnect_controller(controller)
479
480 self.log.debug(
481 "Machine provisioned {} in model {}".format(machine_id, model_name)
482 )
483
484 return machine_id
485
486 async def deploy(
487 self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
488 ):
489 """
490 Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
491
492 :param: uri: Path or Charm Store uri in which the charm or bundle can be found
493 :param: model_name: Model name
494 :param: wait: Indicates whether to wait or not until all applications are active
495 :param: timeout: Time in seconds to wait until all applications are active
496 """
497 controller = await self.get_controller()
498 model = await self.get_model(controller, model_name)
499 try:
500 await model.deploy(uri)
501 if wait:
502 await JujuModelWatcher.wait_for_model(model, timeout=timeout)
503 self.log.debug("All units active in model {}".format(model_name))
504 finally:
505 await self.disconnect_model(model)
506 await self.disconnect_controller(controller)
507
508 async def deploy_charm(
509 self,
510 application_name: str,
511 path: str,
512 model_name: str,
513 machine_id: str,
514 db_dict: dict = None,
515 progress_timeout: float = None,
516 total_timeout: float = None,
517 config: dict = None,
518 series: str = None,
519 num_units: int = 1,
520 ):
521 """Deploy charm
522
523 :param: application_name: Application name
524 :param: path: Local path to the charm
525 :param: model_name: Model name
526 :param: machine_id ID of the machine
527 :param: db_dict: Dictionary with data of the DB to write the updates
528 :param: progress_timeout: Maximum time between two updates in the model
529 :param: total_timeout: Timeout for the entity to be active
530 :param: config: Config for the charm
531 :param: series: Series of the charm
532 :param: num_units: Number of units
533
534 :return: (juju.application.Application): Juju application
535 """
536 self.log.debug(
537 "Deploying charm {} to machine {} in model ~{}".format(
538 application_name, machine_id, model_name
539 )
540 )
541 self.log.debug("charm: {}".format(path))
542
543 # Get controller
544 controller = await self.get_controller()
545
546 # Get model
547 model = await self.get_model(controller, model_name)
548
549 try:
550 application = None
551 if application_name not in model.applications:
552
553 if machine_id is not None:
554 if machine_id not in model.machines:
555 msg = "Machine {} not found in model".format(machine_id)
556 self.log.error(msg=msg)
557 raise JujuMachineNotFound(msg)
558 machine = model.machines[machine_id]
559 series = machine.series
560
561 application = await model.deploy(
562 entity_url=path,
563 application_name=application_name,
564 channel="stable",
565 num_units=1,
566 series=series,
567 to=machine_id,
568 config=config,
569 )
570
571 self.log.debug(
572 "Wait until application {} is ready in model {}".format(
573 application_name, model_name
574 )
575 )
576 if num_units > 1:
577 for _ in range(num_units - 1):
578 m, _ = await self.create_machine(model_name, wait=False)
579 await application.add_unit(to=m.entity_id)
580
581 await JujuModelWatcher.wait_for(
582 model=model,
583 entity=application,
584 progress_timeout=progress_timeout,
585 total_timeout=total_timeout,
586 db_dict=db_dict,
587 n2vc=self.n2vc,
588 )
589 self.log.debug(
590 "Application {} is ready in model {}".format(
591 application_name, model_name
592 )
593 )
594 else:
595 raise JujuApplicationExists(
596 "Application {} exists".format(application_name)
597 )
598 finally:
599 await self.disconnect_model(model)
600 await self.disconnect_controller(controller)
601
602 return application
603
604 def _get_application(self, model: Model, application_name: str) -> Application:
605 """Get application
606
607 :param: model: Model object
608 :param: application_name: Application name
609
610 :return: juju.application.Application (or None if it doesn't exist)
611 """
612 if model.applications and application_name in model.applications:
613 return model.applications[application_name]
614
615 async def execute_action(
616 self,
617 application_name: str,
618 model_name: str,
619 action_name: str,
620 db_dict: dict = None,
621 progress_timeout: float = None,
622 total_timeout: float = None,
623 **kwargs
624 ):
625 """Execute action
626
627 :param: application_name: Application name
628 :param: model_name: Model name
629 :param: action_name: Name of the action
630 :param: db_dict: Dictionary with data of the DB to write the updates
631 :param: progress_timeout: Maximum time between two updates in the model
632 :param: total_timeout: Timeout for the entity to be active
633
634 :return: (str, str): (output and status)
635 """
636 self.log.debug(
637 "Executing action {} using params {}".format(action_name, kwargs)
638 )
639 # Get controller
640 controller = await self.get_controller()
641
642 # Get model
643 model = await self.get_model(controller, model_name)
644
645 try:
646 # Get application
647 application = self._get_application(
648 model,
649 application_name=application_name,
650 )
651 if application is None:
652 raise JujuApplicationNotFound("Cannot execute action")
653
654 # Get leader unit
655 # Racing condition:
656 # Ocassionally, self._get_leader_unit() will return None
657 # because the leader elected hook has not been triggered yet.
658 # Therefore, we are doing some retries. If it happens again,
659 # re-open bug 1236
660 attempts = 3
661 time_between_retries = 10
662 unit = None
663 for _ in range(attempts):
664 unit = await self._get_leader_unit(application)
665 if unit is None:
666 await asyncio.sleep(time_between_retries)
667 else:
668 break
669 if unit is None:
670 raise JujuLeaderUnitNotFound(
671 "Cannot execute action: leader unit not found"
672 )
673
674 actions = await application.get_actions()
675
676 if action_name not in actions:
677 raise JujuActionNotFound(
678 "Action {} not in available actions".format(action_name)
679 )
680
681 action = await unit.run_action(action_name, **kwargs)
682
683 self.log.debug(
684 "Wait until action {} is completed in application {} (model={})".format(
685 action_name, application_name, model_name
686 )
687 )
688 await JujuModelWatcher.wait_for(
689 model=model,
690 entity=action,
691 progress_timeout=progress_timeout,
692 total_timeout=total_timeout,
693 db_dict=db_dict,
694 n2vc=self.n2vc,
695 )
696
697 output = await model.get_action_output(action_uuid=action.entity_id)
698 status = await model.get_action_status(uuid_or_prefix=action.entity_id)
699 status = (
700 status[action.entity_id] if action.entity_id in status else "failed"
701 )
702
703 self.log.debug(
704 "Action {} completed with status {} in application {} (model={})".format(
705 action_name, action.status, application_name, model_name
706 )
707 )
708 finally:
709 await self.disconnect_model(model)
710 await self.disconnect_controller(controller)
711
712 return output, status
713
714 async def get_actions(self, application_name: str, model_name: str) -> dict:
715 """Get list of actions
716
717 :param: application_name: Application name
718 :param: model_name: Model name
719
720 :return: Dict with this format
721 {
722 "action_name": "Description of the action",
723 ...
724 }
725 """
726 self.log.debug(
727 "Getting list of actions for application {}".format(application_name)
728 )
729
730 # Get controller
731 controller = await self.get_controller()
732
733 # Get model
734 model = await self.get_model(controller, model_name)
735
736 try:
737 # Get application
738 application = self._get_application(
739 model,
740 application_name=application_name,
741 )
742
743 # Return list of actions
744 return await application.get_actions()
745
746 finally:
747 # Disconnect from model and controller
748 await self.disconnect_model(model)
749 await self.disconnect_controller(controller)
750
751 async def get_metrics(self, model_name: str, application_name: str) -> dict:
752 """Get the metrics collected by the VCA.
753
754 :param model_name The name or unique id of the network service
755 :param application_name The name of the application
756 """
757 if not model_name or not application_name:
758 raise Exception("model_name and application_name must be non-empty strings")
759 metrics = {}
760 controller = await self.get_controller()
761 model = await self.get_model(controller, model_name)
762 try:
763 application = self._get_application(model, application_name)
764 if application is not None:
765 metrics = await application.get_metrics()
766 finally:
767 self.disconnect_model(model)
768 self.disconnect_controller(controller)
769 return metrics
770
771 async def add_relation(
772 self,
773 model_name: str,
774 endpoint_1: str,
775 endpoint_2: str,
776 ):
777 """Add relation
778
779 :param: model_name: Model name
780 :param: endpoint_1 First endpoint name
781 ("app:endpoint" format or directly the saas name)
782 :param: endpoint_2: Second endpoint name (^ same format)
783 """
784
785 self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))
786
787 # Get controller
788 controller = await self.get_controller()
789
790 # Get model
791 model = await self.get_model(controller, model_name)
792
793 # Add relation
794 try:
795 await model.add_relation(endpoint_1, endpoint_2)
796 except JujuAPIError as e:
797 if "not found" in e.message:
798 self.log.warning("Relation not found: {}".format(e.message))
799 return
800 if "already exists" in e.message:
801 self.log.warning("Relation already exists: {}".format(e.message))
802 return
803 # another exception, raise it
804 raise e
805 finally:
806 await self.disconnect_model(model)
807 await self.disconnect_controller(controller)
808
809 async def consume(
810 self,
811 offer_url: str,
812 model_name: str,
813 ):
814 """
815 Adds a remote offer to the model. Relations can be created later using "juju relate".
816
817 :param: offer_url: Offer Url
818 :param: model_name: Model name
819
820 :raises ParseError if there's a problem parsing the offer_url
821 :raises JujuError if remote offer includes and endpoint
822 :raises JujuAPIError if the operation is not successful
823 """
824 controller = await self.get_controller()
825 model = await controller.get_model(model_name)
826
827 try:
828 await model.consume(offer_url)
829 finally:
830 await self.disconnect_model(model)
831 await self.disconnect_controller(controller)
832
833 async def destroy_model(self, model_name: str, total_timeout: float):
834 """
835 Destroy model
836
837 :param: model_name: Model name
838 :param: total_timeout: Timeout
839 """
840
841 controller = await self.get_controller()
842 model = None
843 try:
844 if not await self.model_exists(model_name, controller=controller):
845 return
846
847 model = await self.get_model(controller, model_name)
848 self.log.debug("Destroying model {}".format(model_name))
849 uuid = model.info.uuid
850
851 # Destroy machines that are manually provisioned
852 # and still are in pending state
853 await self._destroy_pending_machines(model, only_manual=True)
854
855 # Disconnect model
856 await self.disconnect_model(model)
857
858 await controller.destroy_model(uuid, force=True, max_wait=0)
859
860 # Wait until model is destroyed
861 self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
862
863 if total_timeout is None:
864 total_timeout = 3600
865 end = time.time() + total_timeout
866 while time.time() < end:
867 models = await controller.list_models()
868 if model_name not in models:
869 self.log.debug(
870 "The model {} ({}) was destroyed".format(model_name, uuid)
871 )
872 return
873 await asyncio.sleep(5)
874 raise Exception(
875 "Timeout waiting for model {} to be destroyed".format(model_name)
876 )
877 except Exception as e:
878 if model:
879 await self.disconnect_model(model)
880 raise e
881 finally:
882 await self.disconnect_controller(controller)
883
884 async def destroy_application(
885 self, model_name: str, application_name: str, total_timeout: float
886 ):
887 """
888 Destroy application
889
890 :param: model_name: Model name
891 :param: application_name: Application name
892 :param: total_timeout: Timeout
893 """
894
895 controller = await self.get_controller()
896 model = None
897
898 try:
899 model = await self.get_model(controller, model_name)
900 self.log.debug(
901 "Destroying application {} in model {}".format(
902 application_name, model_name
903 )
904 )
905 application = self._get_application(model, application_name)
906 if application:
907 await application.destroy()
908 else:
909 self.log.warning("Application not found: {}".format(application_name))
910
911 self.log.debug(
912 "Waiting for application {} to be destroyed in model {}...".format(
913 application_name, model_name
914 )
915 )
916 if total_timeout is None:
917 total_timeout = 3600
918 end = time.time() + total_timeout
919 while time.time() < end:
920 if not self._get_application(model, application_name):
921 self.log.debug(
922 "The application {} was destroyed in model {} ".format(
923 application_name, model_name
924 )
925 )
926 return
927 await asyncio.sleep(5)
928 raise Exception(
929 "Timeout waiting for application {} to be destroyed in model {}".format(
930 application_name, model_name
931 )
932 )
933 finally:
934 if model is not None:
935 await self.disconnect_model(model)
936 await self.disconnect_controller(controller)
937
938 async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
939 """
940 Destroy pending machines in a given model
941
942 :param: only_manual: Bool that indicates only manually provisioned
943 machines should be destroyed (if True), or that
944 all pending machines should be destroyed
945 """
946 status = await model.get_status()
947 for machine_id in status.machines:
948 machine_status = status.machines[machine_id]
949 if machine_status.agent_status.status == "pending":
950 if only_manual and not machine_status.instance_id.startswith("manual:"):
951 break
952 machine = model.machines[machine_id]
953 await machine.destroy(force=True)
954
955 # async def destroy_machine(
956 # self, model: Model, machine_id: str, total_timeout: float = 3600
957 # ):
958 # """
959 # Destroy machine
960
961 # :param: model: Model object
962 # :param: machine_id: Machine id
963 # :param: total_timeout: Timeout in seconds
964 # """
965 # machines = await model.get_machines()
966 # if machine_id in machines:
967 # machine = machines[machine_id]
968 # await machine.destroy(force=True)
969 # # max timeout
970 # end = time.time() + total_timeout
971
972 # # wait for machine removal
973 # machines = await model.get_machines()
974 # while machine_id in machines and time.time() < end:
975 # self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
976 # await asyncio.sleep(0.5)
977 # machines = await model.get_machines()
978 # self.log.debug("Machine destroyed: {}".format(machine_id))
979 # else:
980 # self.log.debug("Machine not found: {}".format(machine_id))
981
982 async def configure_application(
983 self, model_name: str, application_name: str, config: dict = None
984 ):
985 """Configure application
986
987 :param: model_name: Model name
988 :param: application_name: Application name
989 :param: config: Config to apply to the charm
990 """
991 self.log.debug("Configuring application {}".format(application_name))
992
993 if config:
994 controller = await self.get_controller()
995 model = None
996 try:
997 model = await self.get_model(controller, model_name)
998 application = self._get_application(
999 model,
1000 application_name=application_name,
1001 )
1002 await application.set_config(config)
1003 finally:
1004 if model:
1005 await self.disconnect_model(model)
1006 await self.disconnect_controller(controller)
1007
1008 def _get_api_endpoints_db(self) -> [str]:
1009 """
1010 Get API Endpoints from DB
1011
1012 :return: List of API endpoints
1013 """
1014 self.log.debug("Getting endpoints from database")
1015
1016 juju_info = self.db.get_one(
1017 DB_DATA.api_endpoints.table,
1018 q_filter=DB_DATA.api_endpoints.filter,
1019 fail_on_empty=False,
1020 )
1021 if juju_info and DB_DATA.api_endpoints.key in juju_info:
1022 return juju_info[DB_DATA.api_endpoints.key]
1023
1024 def _update_api_endpoints_db(self, endpoints: [str]):
1025 """
1026 Update API endpoints in Database
1027
1028 :param: List of endpoints
1029 """
1030 self.log.debug("Saving endpoints {} in database".format(endpoints))
1031
1032 juju_info = self.db.get_one(
1033 DB_DATA.api_endpoints.table,
1034 q_filter=DB_DATA.api_endpoints.filter,
1035 fail_on_empty=False,
1036 )
1037 # If it doesn't, then create it
1038 if not juju_info:
1039 try:
1040 self.db.create(
1041 DB_DATA.api_endpoints.table,
1042 DB_DATA.api_endpoints.filter,
1043 )
1044 except DbException as e:
1045 # Racing condition: check if another N2VC worker has created it
1046 juju_info = self.db.get_one(
1047 DB_DATA.api_endpoints.table,
1048 q_filter=DB_DATA.api_endpoints.filter,
1049 fail_on_empty=False,
1050 )
1051 if not juju_info:
1052 raise e
1053 self.db.set_one(
1054 DB_DATA.api_endpoints.table,
1055 DB_DATA.api_endpoints.filter,
1056 {DB_DATA.api_endpoints.key: endpoints},
1057 )
1058
1059 def handle_exception(self, loop, context):
1060 # All unhandled exceptions by libjuju are handled here.
1061 pass
1062
1063 async def health_check(self, interval: float = 300.0):
1064 """
1065 Health check to make sure controller and controller_model connections are OK
1066
1067 :param: interval: Time in seconds between checks
1068 """
1069 controller = None
1070 while True:
1071 try:
1072 controller = await self.get_controller()
1073 # self.log.debug("VCA is alive")
1074 except Exception as e:
1075 self.log.error("Health check to VCA failed: {}".format(e))
1076 finally:
1077 await self.disconnect_controller(controller)
1078 await asyncio.sleep(interval)
1079
1080 async def list_models(self, contains: str = None) -> [str]:
1081 """List models with certain names
1082
1083 :param: contains: String that is contained in model name
1084
1085 :retur: [models] Returns list of model names
1086 """
1087
1088 controller = await self.get_controller()
1089 try:
1090 models = await controller.list_models()
1091 if contains:
1092 models = [model for model in models if contains in model]
1093 return models
1094 finally:
1095 await self.disconnect_controller(controller)
1096
1097 async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
1098 """List models with certain names
1099
1100 :param: model_name: Model name
1101
1102 :return: Returns list of offers
1103 """
1104
1105 controller = await self.get_controller()
1106 try:
1107 return await controller.list_offers(model_name)
1108 finally:
1109 await self.disconnect_controller(controller)
1110
1111 async def add_k8s(
1112 self,
1113 name: str,
1114 rbac_id: str,
1115 token: str,
1116 client_cert_data: str,
1117 configuration: Configuration,
1118 storage_class: str,
1119 credential_name: str = None,
1120 ):
1121 """
1122 Add a Kubernetes cloud to the controller
1123
1124 Similar to the `juju add-k8s` command in the CLI
1125
1126 :param: name: Name for the K8s cloud
1127 :param: configuration: Kubernetes configuration object
1128 :param: storage_class: Storage Class to use in the cloud
1129 :param: credential_name: Storage Class to use in the cloud
1130 """
1131
1132 if not storage_class:
1133 raise Exception("storage_class must be a non-empty string")
1134 if not name:
1135 raise Exception("name must be a non-empty string")
1136 if not configuration:
1137 raise Exception("configuration must be provided")
1138
1139 endpoint = configuration.host
1140 credential = self.get_k8s_cloud_credential(
1141 configuration,
1142 client_cert_data,
1143 token,
1144 )
1145 credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id
1146 cloud = client.Cloud(
1147 type_="kubernetes",
1148 auth_types=[credential.auth_type],
1149 endpoint=endpoint,
1150 ca_certificates=[client_cert_data],
1151 config={
1152 "operator-storage": storage_class,
1153 "workload-storage": storage_class,
1154 },
1155 )
1156
1157 return await self.add_cloud(
1158 name, cloud, credential, credential_name=credential_name
1159 )
1160
1161 def get_k8s_cloud_credential(
1162 self,
1163 configuration: Configuration,
1164 client_cert_data: str,
1165 token: str = None,
1166 ) -> client.CloudCredential:
1167 attrs = {}
1168 # TODO: Test with AKS
1169 key = None # open(configuration.key_file, "r").read()
1170 username = configuration.username
1171 password = configuration.password
1172
1173 if client_cert_data:
1174 attrs["ClientCertificateData"] = client_cert_data
1175 if key:
1176 attrs["ClientKeyData"] = key
1177 if token:
1178 if username or password:
1179 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1180 attrs["Token"] = token
1181
1182 auth_type = None
1183 if key:
1184 auth_type = "oauth2"
1185 if client_cert_data:
1186 auth_type = "oauth2withcert"
1187 if not token:
1188 raise JujuInvalidK8sConfiguration(
1189 "missing token for auth type {}".format(auth_type)
1190 )
1191 elif username:
1192 if not password:
1193 self.log.debug(
1194 "credential for user {} has empty password".format(username)
1195 )
1196 attrs["username"] = username
1197 attrs["password"] = password
1198 if client_cert_data:
1199 auth_type = "userpasswithcert"
1200 else:
1201 auth_type = "userpass"
1202 elif client_cert_data and token:
1203 auth_type = "certificate"
1204 else:
1205 raise JujuInvalidK8sConfiguration("authentication method not supported")
1206 return client.CloudCredential(auth_type=auth_type, attrs=attrs)
1207
1208 async def add_cloud(
1209 self,
1210 name: str,
1211 cloud: Cloud,
1212 credential: CloudCredential = None,
1213 credential_name: str = None,
1214 ) -> Cloud:
1215 """
1216 Add cloud to the controller
1217
1218 :param: name: Name of the cloud to be added
1219 :param: cloud: Cloud object
1220 :param: credential: CloudCredentials object for the cloud
1221 :param: credential_name: Credential name.
1222 If not defined, cloud of the name will be used.
1223 """
1224 controller = await self.get_controller()
1225 try:
1226 _ = await controller.add_cloud(name, cloud)
1227 if credential:
1228 await controller.add_credential(
1229 credential_name or name, credential=credential, cloud=name
1230 )
1231 # Need to return the object returned by the controller.add_cloud() function
1232 # I'm returning the original value now until this bug is fixed:
1233 # https://github.com/juju/python-libjuju/issues/443
1234 return cloud
1235 finally:
1236 await self.disconnect_controller(controller)
1237
1238 async def remove_cloud(self, name: str):
1239 """
1240 Remove cloud
1241
1242 :param: name: Name of the cloud to be removed
1243 """
1244 controller = await self.get_controller()
1245 try:
1246 await controller.remove_cloud(name)
1247 finally:
1248 await self.disconnect_controller(controller)
1249
1250 async def _get_leader_unit(self, application: Application) -> Unit:
1251 unit = None
1252 for u in application.units:
1253 if await u.is_leader_from_status():
1254 unit = u
1255 break
1256 return unit
1257
1258 async def get_cloud_credentials(self, cloud_name: str, credential_name: str):
1259 controller = await self.get_controller()
1260 try:
1261 facade = client.CloudFacade.from_connection(controller.connection())
1262 cloud_cred_tag = tag.credential(cloud_name, self.username, credential_name)
1263 params = [client.Entity(cloud_cred_tag)]
1264 return (await facade.Credential(params)).results
1265 finally:
1266 await self.disconnect_controller(controller)