7a730335553392c5041706ce28e6303155bff94f
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17
18 import time
19
20 from juju.errors import JujuAPIError
21 from juju.model import Model
22 from juju.machine import Machine
23 from juju.application import Application
24 from juju.unit import Unit
25 from juju.client._definitions import (
26 FullStatus,
27 QueryApplicationOffersResults,
28 Cloud,
29 CloudCredential,
30 )
31 from juju.controller import Controller
32 from juju.client import client
33 from juju import tag
34
35 from n2vc.juju_watcher import JujuModelWatcher
36 from n2vc.provisioner import AsyncSSHProvisioner
37 from n2vc.n2vc_conn import N2VCConnector
38 from n2vc.exceptions import (
39 JujuMachineNotFound,
40 JujuApplicationNotFound,
41 JujuLeaderUnitNotFound,
42 JujuActionNotFound,
43 JujuControllerFailedConnecting,
44 JujuApplicationExists,
45 JujuInvalidK8sConfiguration,
46 )
47 from n2vc.utils import DB_DATA
48 from osm_common.dbbase import DbException
49 from kubernetes.client.configuration import Configuration
50
# Key under which Libjuju.add_k8s() stores the RBAC id in the attributes of
# the Kubernetes cloud credential.
RBAC_LABEL_KEY_NAME = "rbac-id"
52
53
54 class Libjuju:
def __init__(
    self,
    endpoint: str,
    api_proxy: str,
    username: str,
    password: str,
    cacert: str,
    loop: asyncio.AbstractEventLoop = None,
    log: logging.Logger = None,
    db: dict = None,
    n2vc: N2VCConnector = None,
    apt_mirror: str = None,
    enable_os_upgrade: bool = True,
):
    """
    Constructor

    Stores the connection data, synchronizes the list of API endpoints with
    the database, builds the default model configuration and schedules the
    periodic health check task on the event loop.

    :param: endpoint: Endpoint of the juju controller (host:port)
    :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
    :param: username: Juju username
    :param: password: Juju password
    :param: cacert: Juju CA Certificate
    :param: loop: Asyncio loop (defaults to asyncio.get_event_loop())
    :param: log: Logger (defaults to the "Libjuju" logger)
    :param: db: DB object
    :param: n2vc: N2VC object
    :param: apt_mirror: APT Mirror
    :param: enable_os_upgrade: Enable OS Upgrade
    """

    self.log = log or logging.getLogger("Libjuju")
    self.db = db

    # Use the endpoints stored in the database unless there are none, or the
    # given endpoint is not one of them; in that case the given endpoint wins
    # and is persisted.
    db_endpoints = self._get_api_endpoints_db()
    if not db_endpoints or endpoint not in db_endpoints:
        self.endpoints = [endpoint]
        self._update_api_endpoints_db(self.endpoints)
    else:
        self.endpoints = db_endpoints

    self.api_proxy = api_proxy
    self.username = username
    self.password = password
    self.cacert = cacert
    self.loop = loop or asyncio.get_event_loop()
    self.n2vc = n2vc

    # Generate config for models
    self.model_config = {}
    if apt_mirror:
        self.model_config["apt-mirror"] = apt_mirror
    self.model_config["enable-os-refresh-update"] = enable_os_upgrade
    self.model_config["enable-os-upgrade"] = enable_os_upgrade

    self.loop.set_exception_handler(self.handle_exception)

    # Lock used by add_model() to serialize model creation.
    # The ``loop`` argument was removed from asyncio.Lock in Python 3.10
    # (passing it raises TypeError), so fall back to the loop-less form.
    try:
        self.creating_model = asyncio.Lock(loop=self.loop)
    except TypeError:
        self.creating_model = asyncio.Lock()

    self.log.debug("Libjuju initialized!")

    self.health_check_task = self._create_health_check_task()
114
def _create_health_check_task(self):
    """
    Schedule the health check coroutine on this object's event loop

    :return: The object returned by loop.create_task()
    """
    scheduler = self.loop.create_task
    return scheduler(self.health_check())
117
async def get_controller(self, timeout: float = 15.0) -> Controller:
    """
    Get controller

    Connects a new Controller object to the known endpoints. If the
    controller reports a different list of API endpoints, the new list is
    stored in this object and in the database.

    :param: timeout: Time in seconds to wait for controller to connect

    :return: Connected controller
    :raises JujuControllerFailedConnecting: if anything but a cancellation
                                            goes wrong while connecting
    """
    controller = None
    try:
        controller = Controller(loop=self.loop)
        connecting = controller.connect(
            endpoint=self.endpoints,
            username=self.username,
            password=self.password,
            cacert=self.cacert,
        )
        await asyncio.wait_for(connecting, timeout=timeout)

        reported_endpoints = await controller.api_endpoints
        if self.endpoints != reported_endpoints:
            self.endpoints = reported_endpoints
            self._update_api_endpoints_db(self.endpoints)
        return controller
    except asyncio.CancelledError:
        # Cancellations are propagated untouched
        raise
    except Exception as e:
        self.log.error(
            "Failed connecting to controller: {}...".format(self.endpoints)
        )
        # Do not leave a half-open connection behind
        if controller:
            await self.disconnect_controller(controller)
        raise JujuControllerFailedConnecting(e)
150
async def disconnect(self):
    """Disconnect: cancel the periodic health check task"""
    health_task = self.health_check_task
    health_task.cancel()
    self.log.debug("Libjuju disconnected!")
156
async def disconnect_model(self, model: Model):
    """
    Close the connection of a model

    :param: model: Model whose disconnect() coroutine is awaited
    """
    closing = model.disconnect()
    await closing
164
async def disconnect_controller(self, controller: Controller):
    """
    Close the connection of a controller

    :param: controller: Controller that will be disconnected.
                        Falsy values (e.g. None) are ignored.
    """
    if not controller:
        return
    await controller.disconnect()
173
async def add_model(self, model_name: str, cloud_name: str, credential_name=None):
    """
    Create model

    Nothing is created if a model with the same name already exists in the
    controller.

    :param: model_name: Model name
    :param: cloud_name: Cloud name
    :param: credential_name: Credential name to use for adding the model
                             If not specified, same name as the cloud will be used.
    """

    # Get controller
    controller = await self.get_controller()
    model = None
    try:
        # Entering the lock already waits until other workers have finished
        # model creation, so no separate polling on locked() is needed.
        async with self.creating_model:
            if await self.model_exists(model_name, controller=controller):
                return
            self.log.debug("Creating model {}".format(model_name))
            model = await controller.add_model(
                model_name,
                config=self.model_config,
                cloud_name=cloud_name,
                credential_name=credential_name or cloud_name,
            )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
207
async def get_model(
    self, controller: Controller, model_name: str, id=None
) -> Model:
    """
    Get model from controller

    :param: controller: Controller
    :param: model_name: Model name
    :param: id: Not used by this method

    :return: Model: The object returned by controller.get_model()
    """
    model = await controller.get_model(model_name)
    return model
220
async def model_exists(
    self, model_name: str, controller: Controller = None
) -> bool:
    """
    Check if model exists

    :param: model_name: Model name
    :param: controller: Controller to use. If not given, a temporary
                        connection is opened and closed by this method.

    :return bool
    """
    # Only a controller opened here is closed here
    opened_here = not controller
    if opened_here:
        controller = await self.get_controller()

    try:
        known_models = await controller.list_models()
        return model_name in known_models
    finally:
        if opened_here:
            await self.disconnect_controller(controller)
245
async def models_exist(self, model_names: [str]) -> (bool, list):
    """
    Check if models exists

    :param: model_names: List of strings with model names

    :return (bool, list[str]): (True if all models exists, List of model names
                                that don't exist, without duplicates and in
                                the order they were given)
    :raises Exception: if model_names is empty
    """
    if not model_names:
        raise Exception(
            "model_names must be a non-empty array. Given value: {}".format(
                model_names
            )
        )
    existing_models = set(await self.list_models())
    # dict.fromkeys() removes duplicates while keeping the input order, so the
    # result is deterministic (plain set arithmetic yields an arbitrary order)
    non_existing_models = [
        name for name in dict.fromkeys(model_names) if name not in existing_models
    ]

    return (
        not non_existing_models,
        non_existing_models,
    )
269
async def get_model_status(self, model_name: str) -> FullStatus:
    """
    Get model status

    :param: model_name: Model name

    :return: Full status object
    """
    controller = await self.get_controller()
    model = None
    try:
        # Inside the try block so the controller is disconnected even when
        # the model cannot be obtained
        model = await self.get_model(controller, model_name)
        return await model.get_status()
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
285
async def create_machine(
    self,
    model_name: str,
    machine_id: str = None,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    series: str = "xenial",
    wait: bool = True,
) -> (Machine, bool):
    """
    Create machine

    If machine_id is given, the machine is looked up in the model and
    returned; otherwise a new machine is added to the model.

    :param: model_name: Model name
    :param: machine_id: Machine id
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: series: Series of the machine (xenial, bionic, focal, ...)
    :param: wait: Wait until machine is ready

    :return: (juju.machine.Machine, bool): Machine object and a boolean saying
                                           if the machine is new or it already existed
    :raises JujuMachineNotFound: if machine_id is given but not in the model
    """
    new = False
    machine = None

    self.log.debug(
        "Creating machine (id={}) in model: {}".format(machine_id, model_name)
    )

    # Get controller
    controller = await self.get_controller()
    model = None
    try:
        # Get model (inside the try block so the controller is disconnected
        # even when the model cannot be obtained)
        model = await self.get_model(controller, model_name)

        if machine_id is not None:
            self.log.debug(
                "Searching machine (id={}) in model {}".format(
                    machine_id, model_name
                )
            )

            # Get machines from model and get the machine with machine_id if exists
            machines = await model.get_machines()
            if machine_id not in machines:
                raise JujuMachineNotFound("Machine {} not found".format(machine_id))
            self.log.debug(
                "Machine (id={}) found in model {}".format(machine_id, model_name)
            )
            # Fetch the Machine object from the model's machine mapping, as
            # provision_machine() and deploy_charm() do, instead of indexing
            # the get_machines() result with the machine id.
            machine = model.machines[machine_id]

        if machine is None:
            self.log.debug("Creating a new machine in model {}".format(model_name))

            # Create machine
            machine = await model.add_machine(
                spec=None, constraints=None, disks=None, series=series
            )
            new = True

            # Wait until the machine is ready
            self.log.debug(
                "Wait until machine {} is ready in model {}".format(
                    machine.entity_id, model_name
                )
            )
            if wait:
                await JujuModelWatcher.wait_for(
                    model=model,
                    entity=machine,
                    progress_timeout=progress_timeout,
                    total_timeout=total_timeout,
                    db_dict=db_dict,
                    n2vc=self.n2vc,
                )
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    self.log.debug(
        "Machine {} ready at {} in model {}".format(
            machine.entity_id, machine.dns_name, model_name
        )
    )
    return machine, new
376
async def provision_machine(
    self,
    model_name: str,
    hostname: str,
    username: str,
    private_key_path: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
) -> str:
    """
    Manually provisioning of a machine

    :param: model_name: Model name
    :param: hostname: IP to access the machine
    :param: username: Username to login to the machine
    :param: private_key_path: Local path for the private key
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active

    :return: (Entity): Machine id
    :raises ValueError: if the controller reports an error adding the machine
    :raises JujuMachineNotFound: if the new machine does not show up in the model
    """
    self.log.debug(
        "Provisioning machine. model: {}, hostname: {}, username: {}".format(
            model_name, hostname, username
        )
    )

    # Get controller
    controller = await self.get_controller()
    model = None

    try:
        # Get model (inside the try block so the controller is disconnected
        # even when the model cannot be obtained)
        model = await self.get_model(controller, model_name)

        # Get provisioner
        provisioner = AsyncSSHProvisioner(
            host=hostname,
            user=username,
            private_key_path=private_key_path,
            log=self.log,
        )

        # Provision machine
        params = await provisioner.provision_machine()

        params.jobs = ["JobHostUnits"]

        self.log.debug("Adding machine to model")
        connection = model.connection()
        client_facade = client.ClientFacade.from_connection(connection)

        results = await client_facade.AddMachines(params=[params])
        error = results.machines[0].error

        if error:
            msg = "Error adding machine: {}".format(error.message)
            self.log.error(msg=msg)
            raise ValueError(msg)

        machine_id = results.machines[0].machine

        self.log.debug("Installing Juju agent into machine {}".format(machine_id))
        # The agent installation is scheduled in the background and is not
        # awaited here
        asyncio.ensure_future(
            provisioner.install_agent(
                connection=connection,
                nonce=params.nonce,
                machine_id=machine_id,
                proxy=self.api_proxy,
                series=params.series,
            )
        )

        # Poll the model (up to 10 times, 2 seconds apart) until the new
        # machine shows up
        machine = None
        for _ in range(10):
            machine_list = await model.get_machines()
            if machine_id in machine_list:
                self.log.debug("Machine {} found in model!".format(machine_id))
                machine = model.machines.get(machine_id)
                break
            await asyncio.sleep(2)

        if machine is None:
            msg = "Machine {} not found in model".format(machine_id)
            self.log.error(msg=msg)
            raise JujuMachineNotFound(msg)

        self.log.debug(
            "Wait until machine {} is ready in model {}".format(
                machine.entity_id, model_name
            )
        )
        await JujuModelWatcher.wait_for(
            model=model,
            entity=machine,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
        )
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    self.log.debug(
        "Machine provisioned {} in model {}".format(machine_id, model_name)
    )

    return machine_id
489
async def deploy(
    self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
):
    """
    Deploy bundle or charm: Similar to the juju CLI command `juju deploy`

    :param: uri: Path or Charm Store uri in which the charm or bundle can be found
    :param: model_name: Model name
    :param: wait: Indicates whether to wait or not until all applications are active
    :param: timeout: Time in seconds to wait until all applications are active
    """
    controller = await self.get_controller()
    model = None
    try:
        # Inside the try block so the controller is disconnected even when
        # the model cannot be obtained
        model = await self.get_model(controller, model_name)
        await model.deploy(uri)
        if wait:
            await JujuModelWatcher.wait_for_model(model, timeout=timeout)
            self.log.debug("All units active in model {}".format(model_name))
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
511
async def deploy_charm(
    self,
    application_name: str,
    path: str,
    model_name: str,
    machine_id: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    config: dict = None,
    series: str = None,
    num_units: int = 1,
):
    """Deploy charm

    :param: application_name: Application name
    :param: path: Local path to the charm
    :param: model_name: Model name
    :param: machine_id ID of the machine
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: config: Config for the charm
    :param: series: Series of the charm. Overridden by the series of the
                    machine when machine_id is given.
    :param: num_units: Number of units

    :return: (juju.application.Application): Juju application
    :raises JujuMachineNotFound: if machine_id is given but not in the model
    :raises JujuApplicationExists: if the application is already in the model
    """
    self.log.debug(
        "Deploying charm {} to machine {} in model ~{}".format(
            application_name, machine_id, model_name
        )
    )
    self.log.debug("charm: {}".format(path))

    # Get controller
    controller = await self.get_controller()
    model = None
    application = None

    try:
        # Get model (inside the try block so the controller is disconnected
        # even when the model cannot be obtained)
        model = await self.get_model(controller, model_name)

        if application_name in model.applications:
            raise JujuApplicationExists(
                "Application {} exists".format(application_name)
            )

        if machine_id is not None:
            if machine_id not in model.machines:
                msg = "Machine {} not found in model".format(machine_id)
                self.log.error(msg=msg)
                raise JujuMachineNotFound(msg)
            machine = model.machines[machine_id]
            series = machine.series

        # The first unit is deployed together with the application
        application = await model.deploy(
            entity_url=path,
            application_name=application_name,
            channel="stable",
            num_units=1,
            series=series,
            to=machine_id,
            config=config,
        )

        self.log.debug(
            "Wait until application {} is ready in model {}".format(
                application_name, model_name
            )
        )
        # Every additional unit is placed in a machine of its own.
        # NOTE(review): these machines are created with create_machine()'s
        # default series, not with `series` - confirm this is intended.
        for _ in range(num_units - 1):
            extra_machine, _new = await self.create_machine(model_name, wait=False)
            await application.add_unit(to=extra_machine.entity_id)

        await JujuModelWatcher.wait_for(
            model=model,
            entity=application,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
        )
        self.log.debug(
            "Application {} is ready in model {}".format(
                application_name, model_name
            )
        )
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return application
607
def _get_application(self, model: Model, application_name: str) -> Application:
    """Look up an application in a model

    :param: model: Model object
    :param: application_name: Application name

    :return: juju.application.Application (or None if it doesn't exist)
    """
    if not model.applications or application_name not in model.applications:
        return None
    return model.applications[application_name]
618
async def execute_action(
    self,
    application_name: str,
    model_name: str,
    action_name: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    **kwargs
):
    """Execute action

    The action is run in the leader unit of the application.

    :param: application_name: Application name
    :param: model_name: Model name
    :param: action_name: Name of the action
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: kwargs: Parameters passed to the action

    :return: (str, str): (output and status)
    :raises JujuApplicationNotFound: if the application is not in the model
    :raises JujuLeaderUnitNotFound: if no leader unit is found after retrying
    :raises JujuActionNotFound: if the application does not offer the action
    """
    self.log.debug(
        "Executing action {} using params {}".format(action_name, kwargs)
    )
    # Get controller
    controller = await self.get_controller()
    model = None

    try:
        # Get model (inside the try block so the controller is disconnected
        # even when the model cannot be obtained)
        model = await self.get_model(controller, model_name)

        # Get application
        application = self._get_application(
            model,
            application_name=application_name,
        )
        if application is None:
            raise JujuApplicationNotFound("Cannot execute action")

        # Get leader unit
        # Racing condition:
        #   Occasionally, self._get_leader_unit() will return None
        #   because the leader elected hook has not been triggered yet.
        #   Therefore, we are doing some retries. If it happens again,
        #   re-open bug 1236
        attempts = 3
        time_between_retries = 10
        unit = None
        for attempt in range(attempts):
            unit = await self._get_leader_unit(application)
            if unit is not None:
                break
            # No point in sleeping after the last attempt
            if attempt < attempts - 1:
                await asyncio.sleep(time_between_retries)
        if unit is None:
            raise JujuLeaderUnitNotFound(
                "Cannot execute action: leader unit not found"
            )

        actions = await application.get_actions()

        if action_name not in actions:
            raise JujuActionNotFound(
                "Action {} not in available actions".format(action_name)
            )

        action = await unit.run_action(action_name, **kwargs)

        self.log.debug(
            "Wait until action {} is completed in application {} (model={})".format(
                action_name, application_name, model_name
            )
        )
        await JujuModelWatcher.wait_for(
            model=model,
            entity=action,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
        )

        output = await model.get_action_output(action_uuid=action.entity_id)
        status = await model.get_action_status(uuid_or_prefix=action.entity_id)
        # An action missing from the status report is considered failed
        status = (
            status[action.entity_id] if action.entity_id in status else "failed"
        )

        self.log.debug(
            "Action {} completed with status {} in application {} (model={})".format(
                action_name, action.status, application_name, model_name
            )
        )
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return output, status
717
async def get_actions(self, application_name: str, model_name: str) -> dict:
    """Get list of actions

    :param: application_name: Application name
    :param: model_name: Model name

    :return: Dict with this format
        {
            "action_name": "Description of the action",
            ...
        }
    """
    self.log.debug(
        "Getting list of actions for application {}".format(application_name)
    )

    # Get controller
    controller = await self.get_controller()
    model = None

    try:
        # Get model (inside the try block so the controller is disconnected
        # even when the model cannot be obtained)
        model = await self.get_model(controller, model_name)

        # Get application
        application = self._get_application(
            model,
            application_name=application_name,
        )

        # Return list of actions
        return await application.get_actions()

    finally:
        # Disconnect from model and controller
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
754
async def get_metrics(self, model_name: str, application_name: str) -> dict:
    """Get the metrics collected by the VCA.

    :param: model_name: The name or unique id of the network service
    :param: application_name: The name of the application

    :return: Metrics returned by the application, or an empty dict if the
             application is not found in the model
    :raises Exception: if model_name or application_name is empty
    """
    if not model_name or not application_name:
        raise Exception("model_name and application_name must be non-empty strings")
    metrics = {}
    controller = await self.get_controller()
    model = None
    try:
        # Inside the try block so the controller is disconnected even when
        # the model cannot be obtained
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)
        if application is not None:
            metrics = await application.get_metrics()
    finally:
        # The disconnect helpers are coroutines: without `await` they would
        # never run and the connections would stay open
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return metrics
774
async def add_relation(
    self,
    model_name: str,
    endpoint_1: str,
    endpoint_2: str,
):
    """Add relation

    API errors whose message contains "not found" or "already exists" are
    logged as warnings and not raised.

    :param: model_name: Model name
    :param: endpoint_1 First endpoint name
                       ("app:endpoint" format or directly the saas name)
    :param: endpoint_2: Second endpoint name (^ same format)

    :raises JujuAPIError: for any other API error
    """

    self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))

    # Get controller
    controller = await self.get_controller()
    model = None

    try:
        # Get model (inside the try block so the controller is disconnected
        # even when the model cannot be obtained)
        model = await self.get_model(controller, model_name)

        # Add relation
        try:
            await model.add_relation(endpoint_1, endpoint_2)
        except JujuAPIError as e:
            if "not found" in e.message:
                self.log.warning("Relation not found: {}".format(e.message))
                return
            if "already exists" in e.message:
                self.log.warning("Relation already exists: {}".format(e.message))
                return
            # another exception, raise it
            raise
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
812
async def consume(
    self,
    offer_url: str,
    model_name: str,
):
    """
    Adds a remote offer to the model. Relations can be created later using "juju relate".

    :param: offer_url: Offer Url
    :param: model_name: Model name

    :raises ParseError if there's a problem parsing the offer_url
    :raises JujuError if remote offer includes an endpoint
    :raises JujuAPIError if the operation is not successful
    """
    controller = await self.get_controller()
    model = None

    try:
        # Inside the try block so the controller is disconnected even when
        # the model cannot be obtained. Uses self.get_model() like the rest
        # of the methods of this class.
        model = await self.get_model(controller, model_name)
        await model.consume(offer_url)
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
836
async def destroy_model(self, model_name: str, total_timeout: float):
    """
    Destroy model

    Returns immediately if the model does not exist. Otherwise the model is
    destroyed and the controller is polled every 5 seconds until the model
    disappears from its list of models.

    :param: model_name: Model name
    :param: total_timeout: Timeout in seconds (3600 is used if None)

    :raises Exception: if the model still exists when the timeout expires
    """

    controller = await self.get_controller()
    model = None
    try:
        if not await self.model_exists(model_name, controller=controller):
            return

        model = await self.get_model(controller, model_name)
        self.log.debug("Destroying model {}".format(model_name))
        uuid = model.info.uuid

        # Destroy machines that are manually provisioned
        # and still are in pending state
        await self._destroy_pending_machines(model, only_manual=True)

        # Disconnect model
        await self.disconnect_model(model)
        # Already disconnected: the cleanup below must not do it again
        model = None

        await controller.destroy_model(uuid, force=True, max_wait=0)

        # Wait until model is destroyed
        self.log.debug("Waiting for model {} to be destroyed...".format(model_name))

        if total_timeout is None:
            total_timeout = 3600
        # Monotonic clock: not affected by system clock adjustments
        end = time.monotonic() + total_timeout
        while time.monotonic() < end:
            models = await controller.list_models()
            if model_name not in models:
                self.log.debug(
                    "The model {} ({}) was destroyed".format(model_name, uuid)
                )
                return
            await asyncio.sleep(5)
        raise Exception(
            "Timeout waiting for model {} to be destroyed".format(model_name)
        )
    finally:
        # Runs on errors and on cancellation as well
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
887
async def destroy_application(self, model: Model, application_name: str):
    """
    Destroy application

    A warning is logged if the application is not found in the model.

    :param: model: Model object
    :param: application_name: Application name
    """
    debug_message = "Destroying application {} in model {}".format(
        application_name, model.info.name
    )
    self.log.debug(debug_message)

    application = model.applications.get(application_name)
    if not application:
        self.log.warning("Application not found: {}".format(application_name))
        return
    await application.destroy()
905
async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
    """
    Destroy pending machines in a given model

    :param: model: Model object
    :param: only_manual: Bool that indicates only manually provisioned
                         machines should be destroyed (if True), or that
                         all pending machines should be destroyed
    """
    status = await model.get_status()
    for machine_id, machine_status in status.machines.items():
        if machine_status.agent_status.status != "pending":
            continue
        if only_manual and not machine_status.instance_id.startswith("manual:"):
            # Skip this machine only: the remaining machines still have to
            # be checked
            continue
        machine = model.machines[machine_id]
        await machine.destroy(force=True)
922
async def configure_application(
    self, model_name: str, application_name: str, config: dict = None
):
    """Configure application

    Nothing is done (besides logging) when config is empty or None.

    :param: model_name: Model name
    :param: application_name: Application name
    :param: config: Config to apply to the charm
    """
    self.log.debug("Configuring application {}".format(application_name))

    if not config:
        return

    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        target = self._get_application(model, application_name=application_name)
        await target.set_config(config)
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
948
def _get_api_endpoints_db(self) -> [str]:
    """
    Read the API endpoints stored in the database

    :return: List of API endpoints, or None if no record is found or the
             record has no endpoints key
    """
    self.log.debug("Getting endpoints from database")

    spec = DB_DATA.api_endpoints
    juju_info = self.db.get_one(
        spec.table,
        q_filter=spec.filter,
        fail_on_empty=False,
    )
    if not juju_info or spec.key not in juju_info:
        return None
    return juju_info[spec.key]
964
def _update_api_endpoints_db(self, endpoints: [str]):
    """
    Store the API endpoints in the database

    The record is created first if it does not exist yet.

    :param: endpoints: List of endpoints
    """
    self.log.debug("Saving endpoints {} in database".format(endpoints))

    spec = DB_DATA.api_endpoints

    def read_record():
        return self.db.get_one(
            spec.table,
            q_filter=spec.filter,
            fail_on_empty=False,
        )

    # If the record doesn't exist, then create it
    if not read_record():
        try:
            self.db.create(spec.table, spec.filter)
        except DbException:
            # Racing condition: check if another N2VC worker has created it
            if not read_record():
                raise

    self.db.set_one(
        spec.table,
        spec.filter,
        {spec.key: endpoints},
    )
999
def handle_exception(self, loop, context):
    """
    Event loop exception handler (installed by the constructor)

    All unhandled exceptions by libjuju are handled here: they are
    intentionally ignored.

    :param: loop: Event loop
    :param: context: Exception context given by the loop
    """
    return None
1003
async def health_check(self, interval: float = 300.0):
    """
    Health check to make sure controller and controller_model connections are OK

    Runs forever (until cancelled): connects to the controller, logs an
    error if that fails, disconnects and sleeps.

    :param: interval: Time in seconds between checks
    """
    while True:
        # Reset on every iteration, so a controller of a previous iteration
        # is never disconnected again after a failed connection
        controller = None
        try:
            controller = await self.get_controller()
            # self.log.debug("VCA is alive")
        except Exception as e:
            self.log.error("Health check to VCA failed: {}".format(e))
        finally:
            await self.disconnect_controller(controller)
        await asyncio.sleep(interval)
1020
async def list_models(self, contains: str = None) -> [str]:
    """List models, optionally filtered by name

    :param: contains: String that is contained in model name.
                      If empty or None, all models are returned.

    :return: [models] Returns list of model names
    """

    controller = await self.get_controller()
    try:
        all_models = await controller.list_models()
        if not contains:
            return all_models
        return [name for name in all_models if contains in name]
    finally:
        await self.disconnect_controller(controller)
1037
async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
    """List the offers of a model

    :param: model_name: Model name

    :return: Returns the result of controller.list_offers() for the model
    """

    controller = await self.get_controller()
    try:
        offers = await controller.list_offers(model_name)
        return offers
    finally:
        await self.disconnect_controller(controller)
1049 finally:
1050 await self.disconnect_controller(controller)
1051
async def add_k8s(
    self,
    name: str,
    rbac_id: str,
    token: str,
    client_cert_data: str,
    configuration: Configuration,
    storage_class: str,
    credential_name: str = None,
):
    """
    Add a Kubernetes cloud to the controller

    Similar to the `juju add-k8s` command in the CLI

    :param: name: Name for the K8s cloud
    :param: rbac_id: Value stored in the credential attributes under
                     RBAC_LABEL_KEY_NAME
    :param: token: Token used to build the cloud credential
    :param: client_cert_data: Certificate data, used for the credential and
                              as CA certificate of the cloud
    :param: configuration: Kubernetes configuration object
    :param: storage_class: Storage Class to use in the cloud
    :param: credential_name: Name for the credential
                             (see add_cloud() for the default)

    :return: The value returned by add_cloud()
    :raises Exception: if storage_class, name or configuration is empty
    """

    # Checked in this order; the first failing check raises
    required = (
        (storage_class, "storage_class must be a non-empty string"),
        (name, "name must be a non-empty string"),
        (configuration, "configuration must be provided"),
    )
    for value, message in required:
        if not value:
            raise Exception(message)

    endpoint = configuration.host
    credential = self.get_k8s_cloud_credential(
        configuration,
        client_cert_data,
        token,
    )
    credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id

    # The same storage class is used for operators and workloads
    storage_config = {
        "operator-storage": storage_class,
        "workload-storage": storage_class,
    }
    cloud = client.Cloud(
        type_="kubernetes",
        auth_types=[credential.auth_type],
        endpoint=endpoint,
        ca_certificates=[client_cert_data],
        config=storage_config,
    )

    return await self.add_cloud(
        name, cloud, credential, credential_name=credential_name
    )
1101
def get_k8s_cloud_credential(
    self,
    configuration: Configuration,
    client_cert_data: str,
    token: str = None,
) -> client.CloudCredential:
    """
    Build the cloud credential for a Kubernetes cluster

    :param: configuration: Kubernetes configuration object
                           (username and password are read from it)
    :param: client_cert_data: Client certificate data
    :param: token: Token

    :return: CloudCredential with the auth type and attributes that
             correspond to the data provided
    :raises JujuInvalidK8sConfiguration: if a token is given together with
                                         user/pass, or if no supported
                                         authentication method matches
    """
    # TODO: Test with AKS
    # The client key is not read for now, so the key-based branches below
    # are never taken
    key = None  # open(configuration.key_file, "r").read()
    username, password = configuration.username, configuration.password

    attrs = {}
    if client_cert_data:
        attrs["ClientCertificateData"] = client_cert_data
    if key:
        attrs["ClientKeyData"] = key
    if token:
        if username or password:
            raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
        attrs["Token"] = token

    if key:
        auth_type = "oauth2withcert" if client_cert_data else "oauth2"
        if not token:
            raise JujuInvalidK8sConfiguration(
                "missing token for auth type {}".format(auth_type)
            )
    elif username:
        if not password:
            self.log.debug(
                "credential for user {} has empty password".format(username)
            )
        attrs["username"] = username
        attrs["password"] = password
        auth_type = "userpasswithcert" if client_cert_data else "userpass"
    elif client_cert_data and token:
        auth_type = "certificate"
    else:
        raise JujuInvalidK8sConfiguration("authentication method not supported")

    return client.CloudCredential(auth_type=auth_type, attrs=attrs)
1148
async def add_cloud(
    self,
    name: str,
    cloud: Cloud,
    credential: CloudCredential = None,
    credential_name: str = None,
) -> Cloud:
    """
    Add cloud to the controller

    :param: name: Name of the cloud to be added
    :param: cloud: Cloud object
    :param: credential: CloudCredentials object for the cloud
    :param: credential_name: Credential name.
                             If not defined, the name of the cloud will be used.

    :return: The cloud object that was passed in
    """
    controller = await self.get_controller()
    try:
        await controller.add_cloud(name, cloud)
        if credential:
            effective_name = credential_name or name
            await controller.add_credential(
                effective_name, credential=credential, cloud=name
            )
        # Need to return the object returned by the controller.add_cloud() function
        # I'm returning the original value now until this bug is fixed:
        #   https://github.com/juju/python-libjuju/issues/443
        return cloud
    finally:
        await self.disconnect_controller(controller)
1178
async def remove_cloud(self, name: str):
    """
    Remove a cloud from the controller

    :param: name: Name of the cloud to be removed
    """
    controller = await self.get_controller()
    try:
        removal = controller.remove_cloud(name)
        await removal
    finally:
        await self.disconnect_controller(controller)
1190
async def _get_leader_unit(self, application: Application) -> Unit:
    """
    Find the leader unit of an application

    :param: application: Application object

    :return: First unit whose is_leader_from_status() is true,
             or None if there is none
    """
    for candidate in application.units:
        if await candidate.is_leader_from_status():
            return candidate
    return None
1198
async def get_cloud_credentials(self, cloud_name: str, credential_name: str):
    """
    Get a credential of a cloud, owned by the user of this object

    :param: cloud_name: Cloud name
    :param: credential_name: Credential name

    :return: The results of the Credential() call of the cloud facade
    """
    controller = await self.get_controller()
    try:
        cloud_facade = client.CloudFacade.from_connection(controller.connection())
        credential_tag = tag.credential(cloud_name, self.username, credential_name)
        response = await cloud_facade.Credential([client.Entity(credential_tag)])
        return response.results
    finally:
        await self.disconnect_controller(controller)