Add ModelConfig
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17
18 import time
19
20 from juju.errors import JujuAPIError
21 from juju.model import Model
22 from juju.machine import Machine
23 from juju.application import Application
24 from juju.unit import Unit
25 from juju.client._definitions import (
26 FullStatus,
27 QueryApplicationOffersResults,
28 Cloud,
29 CloudCredential,
30 )
31 from juju.controller import Controller
32 from juju.client import client
33 from juju import tag
34
35 from n2vc.config import ModelConfig
36 from n2vc.juju_watcher import JujuModelWatcher
37 from n2vc.provisioner import AsyncSSHProvisioner
38 from n2vc.n2vc_conn import N2VCConnector
39 from n2vc.exceptions import (
40 JujuMachineNotFound,
41 JujuApplicationNotFound,
42 JujuLeaderUnitNotFound,
43 JujuActionNotFound,
44 JujuControllerFailedConnecting,
45 JujuApplicationExists,
46 JujuInvalidK8sConfiguration,
47 )
48 from n2vc.utils import DB_DATA
49 from osm_common.dbbase import DbException
50 from kubernetes.client.configuration import Configuration
51
52 RBAC_LABEL_KEY_NAME = "rbac-id"
53
54
55 class Libjuju:
56 def __init__(
57 self,
58 endpoint: str,
59 api_proxy: str,
60 username: str,
61 password: str,
62 cacert: str,
63 loop: asyncio.AbstractEventLoop = None,
64 log: logging.Logger = None,
65 db: dict = None,
66 n2vc: N2VCConnector = None,
67 model_config: ModelConfig = {},
68 ):
69 """
70 Constructor
71
72 :param: endpoint: Endpoint of the juju controller (host:port)
73 :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
74 :param: username: Juju username
75 :param: password: Juju password
76 :param: cacert: Juju CA Certificate
77 :param: loop: Asyncio loop
78 :param: log: Logger
79 :param: db: DB object
80 :param: n2vc: N2VC object
81 :param: apt_mirror: APT Mirror
82 :param: enable_os_upgrade: Enable OS Upgrade
83 """
84
85 self.log = log or logging.getLogger("Libjuju")
86 self.db = db
87 db_endpoints = self._get_api_endpoints_db()
88 self.endpoints = None
89 if (db_endpoints and endpoint not in db_endpoints) or not db_endpoints:
90 self.endpoints = [endpoint]
91 self._update_api_endpoints_db(self.endpoints)
92 else:
93 self.endpoints = db_endpoints
94 self.api_proxy = api_proxy
95 self.username = username
96 self.password = password
97 self.cacert = cacert
98 self.loop = loop or asyncio.get_event_loop()
99 self.n2vc = n2vc
100
101 # Generate config for models
102 self.model_config = model_config
103
104 self.loop.set_exception_handler(self.handle_exception)
105 self.creating_model = asyncio.Lock(loop=self.loop)
106
107 self.log.debug("Libjuju initialized!")
108
109 self.health_check_task = self._create_health_check_task()
110
111 def _create_health_check_task(self):
112 return self.loop.create_task(self.health_check())
113
114 async def get_controller(self, timeout: float = 15.0) -> Controller:
115 """
116 Get controller
117
118 :param: timeout: Time in seconds to wait for controller to connect
119 """
120 controller = None
121 try:
122 controller = Controller(loop=self.loop)
123 await asyncio.wait_for(
124 controller.connect(
125 endpoint=self.endpoints,
126 username=self.username,
127 password=self.password,
128 cacert=self.cacert,
129 ),
130 timeout=timeout,
131 )
132 endpoints = await controller.api_endpoints
133 if self.endpoints != endpoints:
134 self.endpoints = endpoints
135 self._update_api_endpoints_db(self.endpoints)
136 return controller
137 except asyncio.CancelledError as e:
138 raise e
139 except Exception as e:
140 self.log.error(
141 "Failed connecting to controller: {}...".format(self.endpoints)
142 )
143 if controller:
144 await self.disconnect_controller(controller)
145 raise JujuControllerFailedConnecting(e)
146
147 async def disconnect(self):
148 """Disconnect"""
149 # Cancel health check task
150 self.health_check_task.cancel()
151 self.log.debug("Libjuju disconnected!")
152
153 async def disconnect_model(self, model: Model):
154 """
155 Disconnect model
156
157 :param: model: Model that will be disconnected
158 """
159 await model.disconnect()
160
161 async def disconnect_controller(self, controller: Controller):
162 """
163 Disconnect controller
164
165 :param: controller: Controller that will be disconnected
166 """
167 if controller:
168 await controller.disconnect()
169
170 async def add_model(self, model_name: str, cloud_name: str, credential_name=None):
171 """
172 Create model
173
174 :param: model_name: Model name
175 :param: cloud_name: Cloud name
176 :param: credential_name: Credential name to use for adding the model
177 If not specified, same name as the cloud will be used.
178 """
179
180 # Get controller
181 controller = await self.get_controller()
182 model = None
183 try:
184 # Block until other workers have finished model creation
185 while self.creating_model.locked():
186 await asyncio.sleep(0.1)
187
188 # Create the model
189 async with self.creating_model:
190 if await self.model_exists(model_name, controller=controller):
191 return
192 self.log.debug("Creating model {}".format(model_name))
193 model = await controller.add_model(
194 model_name,
195 config=self.model_config,
196 cloud_name=cloud_name,
197 credential_name=credential_name or cloud_name,
198 )
199 finally:
200 if model:
201 await self.disconnect_model(model)
202 await self.disconnect_controller(controller)
203
204 async def get_model(
205 self, controller: Controller, model_name: str, id=None
206 ) -> Model:
207 """
208 Get model from controller
209
210 :param: controller: Controller
211 :param: model_name: Model name
212
213 :return: Model: The created Juju model object
214 """
215 return await controller.get_model(model_name)
216
217 async def model_exists(
218 self, model_name: str, controller: Controller = None
219 ) -> bool:
220 """
221 Check if model exists
222
223 :param: controller: Controller
224 :param: model_name: Model name
225
226 :return bool
227 """
228 need_to_disconnect = False
229
230 # Get controller if not passed
231 if not controller:
232 controller = await self.get_controller()
233 need_to_disconnect = True
234
235 # Check if model exists
236 try:
237 return model_name in await controller.list_models()
238 finally:
239 if need_to_disconnect:
240 await self.disconnect_controller(controller)
241
242 async def models_exist(self, model_names: [str]) -> (bool, list):
243 """
244 Check if models exists
245
246 :param: model_names: List of strings with model names
247
248 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
249 """
250 if not model_names:
251 raise Exception(
252 "model_names must be a non-empty array. Given value: {}".format(
253 model_names
254 )
255 )
256 non_existing_models = []
257 models = await self.list_models()
258 existing_models = list(set(models).intersection(model_names))
259 non_existing_models = list(set(model_names) - set(existing_models))
260
261 return (
262 len(non_existing_models) == 0,
263 non_existing_models,
264 )
265
266 async def get_model_status(self, model_name: str) -> FullStatus:
267 """
268 Get model status
269
270 :param: model_name: Model name
271
272 :return: Full status object
273 """
274 controller = await self.get_controller()
275 model = await self.get_model(controller, model_name)
276 try:
277 return await model.get_status()
278 finally:
279 await self.disconnect_model(model)
280 await self.disconnect_controller(controller)
281
282 async def create_machine(
283 self,
284 model_name: str,
285 machine_id: str = None,
286 db_dict: dict = None,
287 progress_timeout: float = None,
288 total_timeout: float = None,
289 series: str = "xenial",
290 wait: bool = True,
291 ) -> (Machine, bool):
292 """
293 Create machine
294
295 :param: model_name: Model name
296 :param: machine_id: Machine id
297 :param: db_dict: Dictionary with data of the DB to write the updates
298 :param: progress_timeout: Maximum time between two updates in the model
299 :param: total_timeout: Timeout for the entity to be active
300 :param: series: Series of the machine (xenial, bionic, focal, ...)
301 :param: wait: Wait until machine is ready
302
303 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
304 if the machine is new or it already existed
305 """
306 new = False
307 machine = None
308
309 self.log.debug(
310 "Creating machine (id={}) in model: {}".format(machine_id, model_name)
311 )
312
313 # Get controller
314 controller = await self.get_controller()
315
316 # Get model
317 model = await self.get_model(controller, model_name)
318 try:
319 if machine_id is not None:
320 self.log.debug(
321 "Searching machine (id={}) in model {}".format(
322 machine_id, model_name
323 )
324 )
325
326 # Get machines from model and get the machine with machine_id if exists
327 machines = await model.get_machines()
328 if machine_id in machines:
329 self.log.debug(
330 "Machine (id={}) found in model {}".format(
331 machine_id, model_name
332 )
333 )
334 machine = machines[machine_id]
335 else:
336 raise JujuMachineNotFound("Machine {} not found".format(machine_id))
337
338 if machine is None:
339 self.log.debug("Creating a new machine in model {}".format(model_name))
340
341 # Create machine
342 machine = await model.add_machine(
343 spec=None, constraints=None, disks=None, series=series
344 )
345 new = True
346
347 # Wait until the machine is ready
348 self.log.debug(
349 "Wait until machine {} is ready in model {}".format(
350 machine.entity_id, model_name
351 )
352 )
353 if wait:
354 await JujuModelWatcher.wait_for(
355 model=model,
356 entity=machine,
357 progress_timeout=progress_timeout,
358 total_timeout=total_timeout,
359 db_dict=db_dict,
360 n2vc=self.n2vc,
361 )
362 finally:
363 await self.disconnect_model(model)
364 await self.disconnect_controller(controller)
365
366 self.log.debug(
367 "Machine {} ready at {} in model {}".format(
368 machine.entity_id, machine.dns_name, model_name
369 )
370 )
371 return machine, new
372
373 async def provision_machine(
374 self,
375 model_name: str,
376 hostname: str,
377 username: str,
378 private_key_path: str,
379 db_dict: dict = None,
380 progress_timeout: float = None,
381 total_timeout: float = None,
382 ) -> str:
383 """
384 Manually provisioning of a machine
385
386 :param: model_name: Model name
387 :param: hostname: IP to access the machine
388 :param: username: Username to login to the machine
389 :param: private_key_path: Local path for the private key
390 :param: db_dict: Dictionary with data of the DB to write the updates
391 :param: progress_timeout: Maximum time between two updates in the model
392 :param: total_timeout: Timeout for the entity to be active
393
394 :return: (Entity): Machine id
395 """
396 self.log.debug(
397 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
398 model_name, hostname, username
399 )
400 )
401
402 # Get controller
403 controller = await self.get_controller()
404
405 # Get model
406 model = await self.get_model(controller, model_name)
407
408 try:
409 # Get provisioner
410 provisioner = AsyncSSHProvisioner(
411 host=hostname,
412 user=username,
413 private_key_path=private_key_path,
414 log=self.log,
415 )
416
417 # Provision machine
418 params = await provisioner.provision_machine()
419
420 params.jobs = ["JobHostUnits"]
421
422 self.log.debug("Adding machine to model")
423 connection = model.connection()
424 client_facade = client.ClientFacade.from_connection(connection)
425
426 results = await client_facade.AddMachines(params=[params])
427 error = results.machines[0].error
428
429 if error:
430 msg = "Error adding machine: {}".format(error.message)
431 self.log.error(msg=msg)
432 raise ValueError(msg)
433
434 machine_id = results.machines[0].machine
435
436 self.log.debug("Installing Juju agent into machine {}".format(machine_id))
437 asyncio.ensure_future(
438 provisioner.install_agent(
439 connection=connection,
440 nonce=params.nonce,
441 machine_id=machine_id,
442 proxy=self.api_proxy,
443 )
444 )
445
446 machine = None
447 for _ in range(10):
448 machine_list = await model.get_machines()
449 if machine_id in machine_list:
450 self.log.debug("Machine {} found in model!".format(machine_id))
451 machine = model.machines.get(machine_id)
452 break
453 await asyncio.sleep(2)
454
455 if machine is None:
456 msg = "Machine {} not found in model".format(machine_id)
457 self.log.error(msg=msg)
458 raise JujuMachineNotFound(msg)
459
460 self.log.debug(
461 "Wait until machine {} is ready in model {}".format(
462 machine.entity_id, model_name
463 )
464 )
465 await JujuModelWatcher.wait_for(
466 model=model,
467 entity=machine,
468 progress_timeout=progress_timeout,
469 total_timeout=total_timeout,
470 db_dict=db_dict,
471 n2vc=self.n2vc,
472 )
473 except Exception as e:
474 raise e
475 finally:
476 await self.disconnect_model(model)
477 await self.disconnect_controller(controller)
478
479 self.log.debug(
480 "Machine provisioned {} in model {}".format(machine_id, model_name)
481 )
482
483 return machine_id
484
485 async def deploy(
486 self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
487 ):
488 """
489 Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
490
491 :param: uri: Path or Charm Store uri in which the charm or bundle can be found
492 :param: model_name: Model name
493 :param: wait: Indicates whether to wait or not until all applications are active
494 :param: timeout: Time in seconds to wait until all applications are active
495 """
496 controller = await self.get_controller()
497 model = await self.get_model(controller, model_name)
498 try:
499 await model.deploy(uri)
500 if wait:
501 await JujuModelWatcher.wait_for_model(model, timeout=timeout)
502 self.log.debug("All units active in model {}".format(model_name))
503 finally:
504 await self.disconnect_model(model)
505 await self.disconnect_controller(controller)
506
507 async def deploy_charm(
508 self,
509 application_name: str,
510 path: str,
511 model_name: str,
512 machine_id: str,
513 db_dict: dict = None,
514 progress_timeout: float = None,
515 total_timeout: float = None,
516 config: dict = None,
517 series: str = None,
518 num_units: int = 1,
519 ):
520 """Deploy charm
521
522 :param: application_name: Application name
523 :param: path: Local path to the charm
524 :param: model_name: Model name
525 :param: machine_id ID of the machine
526 :param: db_dict: Dictionary with data of the DB to write the updates
527 :param: progress_timeout: Maximum time between two updates in the model
528 :param: total_timeout: Timeout for the entity to be active
529 :param: config: Config for the charm
530 :param: series: Series of the charm
531 :param: num_units: Number of units
532
533 :return: (juju.application.Application): Juju application
534 """
535 self.log.debug(
536 "Deploying charm {} to machine {} in model ~{}".format(
537 application_name, machine_id, model_name
538 )
539 )
540 self.log.debug("charm: {}".format(path))
541
542 # Get controller
543 controller = await self.get_controller()
544
545 # Get model
546 model = await self.get_model(controller, model_name)
547
548 try:
549 application = None
550 if application_name not in model.applications:
551
552 if machine_id is not None:
553 if machine_id not in model.machines:
554 msg = "Machine {} not found in model".format(machine_id)
555 self.log.error(msg=msg)
556 raise JujuMachineNotFound(msg)
557 machine = model.machines[machine_id]
558 series = machine.series
559
560 application = await model.deploy(
561 entity_url=path,
562 application_name=application_name,
563 channel="stable",
564 num_units=1,
565 series=series,
566 to=machine_id,
567 config=config,
568 )
569
570 self.log.debug(
571 "Wait until application {} is ready in model {}".format(
572 application_name, model_name
573 )
574 )
575 if num_units > 1:
576 for _ in range(num_units - 1):
577 m, _ = await self.create_machine(model_name, wait=False)
578 await application.add_unit(to=m.entity_id)
579
580 await JujuModelWatcher.wait_for(
581 model=model,
582 entity=application,
583 progress_timeout=progress_timeout,
584 total_timeout=total_timeout,
585 db_dict=db_dict,
586 n2vc=self.n2vc,
587 )
588 self.log.debug(
589 "Application {} is ready in model {}".format(
590 application_name, model_name
591 )
592 )
593 else:
594 raise JujuApplicationExists(
595 "Application {} exists".format(application_name)
596 )
597 finally:
598 await self.disconnect_model(model)
599 await self.disconnect_controller(controller)
600
601 return application
602
603 def _get_application(self, model: Model, application_name: str) -> Application:
604 """Get application
605
606 :param: model: Model object
607 :param: application_name: Application name
608
609 :return: juju.application.Application (or None if it doesn't exist)
610 """
611 if model.applications and application_name in model.applications:
612 return model.applications[application_name]
613
614 async def execute_action(
615 self,
616 application_name: str,
617 model_name: str,
618 action_name: str,
619 db_dict: dict = None,
620 progress_timeout: float = None,
621 total_timeout: float = None,
622 **kwargs
623 ):
624 """Execute action
625
626 :param: application_name: Application name
627 :param: model_name: Model name
628 :param: action_name: Name of the action
629 :param: db_dict: Dictionary with data of the DB to write the updates
630 :param: progress_timeout: Maximum time between two updates in the model
631 :param: total_timeout: Timeout for the entity to be active
632
633 :return: (str, str): (output and status)
634 """
635 self.log.debug(
636 "Executing action {} using params {}".format(action_name, kwargs)
637 )
638 # Get controller
639 controller = await self.get_controller()
640
641 # Get model
642 model = await self.get_model(controller, model_name)
643
644 try:
645 # Get application
646 application = self._get_application(
647 model,
648 application_name=application_name,
649 )
650 if application is None:
651 raise JujuApplicationNotFound("Cannot execute action")
652
653 # Get leader unit
654 # Racing condition:
655 # Ocassionally, self._get_leader_unit() will return None
656 # because the leader elected hook has not been triggered yet.
657 # Therefore, we are doing some retries. If it happens again,
658 # re-open bug 1236
659 attempts = 3
660 time_between_retries = 10
661 unit = None
662 for _ in range(attempts):
663 unit = await self._get_leader_unit(application)
664 if unit is None:
665 await asyncio.sleep(time_between_retries)
666 else:
667 break
668 if unit is None:
669 raise JujuLeaderUnitNotFound(
670 "Cannot execute action: leader unit not found"
671 )
672
673 actions = await application.get_actions()
674
675 if action_name not in actions:
676 raise JujuActionNotFound(
677 "Action {} not in available actions".format(action_name)
678 )
679
680 action = await unit.run_action(action_name, **kwargs)
681
682 self.log.debug(
683 "Wait until action {} is completed in application {} (model={})".format(
684 action_name, application_name, model_name
685 )
686 )
687 await JujuModelWatcher.wait_for(
688 model=model,
689 entity=action,
690 progress_timeout=progress_timeout,
691 total_timeout=total_timeout,
692 db_dict=db_dict,
693 n2vc=self.n2vc,
694 )
695
696 output = await model.get_action_output(action_uuid=action.entity_id)
697 status = await model.get_action_status(uuid_or_prefix=action.entity_id)
698 status = (
699 status[action.entity_id] if action.entity_id in status else "failed"
700 )
701
702 self.log.debug(
703 "Action {} completed with status {} in application {} (model={})".format(
704 action_name, action.status, application_name, model_name
705 )
706 )
707 finally:
708 await self.disconnect_model(model)
709 await self.disconnect_controller(controller)
710
711 return output, status
712
713 async def get_actions(self, application_name: str, model_name: str) -> dict:
714 """Get list of actions
715
716 :param: application_name: Application name
717 :param: model_name: Model name
718
719 :return: Dict with this format
720 {
721 "action_name": "Description of the action",
722 ...
723 }
724 """
725 self.log.debug(
726 "Getting list of actions for application {}".format(application_name)
727 )
728
729 # Get controller
730 controller = await self.get_controller()
731
732 # Get model
733 model = await self.get_model(controller, model_name)
734
735 try:
736 # Get application
737 application = self._get_application(
738 model,
739 application_name=application_name,
740 )
741
742 # Return list of actions
743 return await application.get_actions()
744
745 finally:
746 # Disconnect from model and controller
747 await self.disconnect_model(model)
748 await self.disconnect_controller(controller)
749
750 async def get_metrics(self, model_name: str, application_name: str) -> dict:
751 """Get the metrics collected by the VCA.
752
753 :param model_name The name or unique id of the network service
754 :param application_name The name of the application
755 """
756 if not model_name or not application_name:
757 raise Exception("model_name and application_name must be non-empty strings")
758 metrics = {}
759 controller = await self.get_controller()
760 model = await self.get_model(controller, model_name)
761 try:
762 application = self._get_application(model, application_name)
763 if application is not None:
764 metrics = await application.get_metrics()
765 finally:
766 self.disconnect_model(model)
767 self.disconnect_controller(controller)
768 return metrics
769
770 async def add_relation(
771 self,
772 model_name: str,
773 endpoint_1: str,
774 endpoint_2: str,
775 ):
776 """Add relation
777
778 :param: model_name: Model name
779 :param: endpoint_1 First endpoint name
780 ("app:endpoint" format or directly the saas name)
781 :param: endpoint_2: Second endpoint name (^ same format)
782 """
783
784 self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))
785
786 # Get controller
787 controller = await self.get_controller()
788
789 # Get model
790 model = await self.get_model(controller, model_name)
791
792 # Add relation
793 try:
794 await model.add_relation(endpoint_1, endpoint_2)
795 except JujuAPIError as e:
796 if "not found" in e.message:
797 self.log.warning("Relation not found: {}".format(e.message))
798 return
799 if "already exists" in e.message:
800 self.log.warning("Relation already exists: {}".format(e.message))
801 return
802 # another exception, raise it
803 raise e
804 finally:
805 await self.disconnect_model(model)
806 await self.disconnect_controller(controller)
807
808 async def consume(
809 self,
810 offer_url: str,
811 model_name: str,
812 ):
813 """
814 Adds a remote offer to the model. Relations can be created later using "juju relate".
815
816 :param: offer_url: Offer Url
817 :param: model_name: Model name
818
819 :raises ParseError if there's a problem parsing the offer_url
820 :raises JujuError if remote offer includes and endpoint
821 :raises JujuAPIError if the operation is not successful
822 """
823 controller = await self.get_controller()
824 model = await controller.get_model(model_name)
825
826 try:
827 await model.consume(offer_url)
828 finally:
829 await self.disconnect_model(model)
830 await self.disconnect_controller(controller)
831
832 async def destroy_model(self, model_name: str, total_timeout: float):
833 """
834 Destroy model
835
836 :param: model_name: Model name
837 :param: total_timeout: Timeout
838 """
839
840 controller = await self.get_controller()
841 model = None
842 try:
843 if not await self.model_exists(model_name, controller=controller):
844 return
845
846 model = await self.get_model(controller, model_name)
847 self.log.debug("Destroying model {}".format(model_name))
848 uuid = model.info.uuid
849
850 # Destroy machines that are manually provisioned
851 # and still are in pending state
852 await self._destroy_pending_machines(model, only_manual=True)
853
854 # Disconnect model
855 await self.disconnect_model(model)
856
857 await controller.destroy_model(uuid, force=True, max_wait=0)
858
859 # Wait until model is destroyed
860 self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
861
862 if total_timeout is None:
863 total_timeout = 3600
864 end = time.time() + total_timeout
865 while time.time() < end:
866 models = await controller.list_models()
867 if model_name not in models:
868 self.log.debug(
869 "The model {} ({}) was destroyed".format(model_name, uuid)
870 )
871 return
872 await asyncio.sleep(5)
873 raise Exception(
874 "Timeout waiting for model {} to be destroyed".format(model_name)
875 )
876 except Exception as e:
877 if model:
878 await self.disconnect_model(model)
879 raise e
880 finally:
881 await self.disconnect_controller(controller)
882
883 async def destroy_application(self, model: Model, application_name: str):
884 """
885 Destroy application
886
887 :param: model: Model object
888 :param: application_name: Application name
889 """
890 self.log.debug(
891 "Destroying application {} in model {}".format(
892 application_name, model.info.name
893 )
894 )
895 application = model.applications.get(application_name)
896 if application:
897 await application.destroy()
898 else:
899 self.log.warning("Application not found: {}".format(application_name))
900
901 async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
902 """
903 Destroy pending machines in a given model
904
905 :param: only_manual: Bool that indicates only manually provisioned
906 machines should be destroyed (if True), or that
907 all pending machines should be destroyed
908 """
909 status = await model.get_status()
910 for machine_id in status.machines:
911 machine_status = status.machines[machine_id]
912 if machine_status.agent_status.status == "pending":
913 if only_manual and not machine_status.instance_id.startswith("manual:"):
914 break
915 machine = model.machines[machine_id]
916 await machine.destroy(force=True)
917
918 # async def destroy_machine(
919 # self, model: Model, machine_id: str, total_timeout: float = 3600
920 # ):
921 # """
922 # Destroy machine
923
924 # :param: model: Model object
925 # :param: machine_id: Machine id
926 # :param: total_timeout: Timeout in seconds
927 # """
928 # machines = await model.get_machines()
929 # if machine_id in machines:
930 # machine = machines[machine_id]
931 # await machine.destroy(force=True)
932 # # max timeout
933 # end = time.time() + total_timeout
934
935 # # wait for machine removal
936 # machines = await model.get_machines()
937 # while machine_id in machines and time.time() < end:
938 # self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
939 # await asyncio.sleep(0.5)
940 # machines = await model.get_machines()
941 # self.log.debug("Machine destroyed: {}".format(machine_id))
942 # else:
943 # self.log.debug("Machine not found: {}".format(machine_id))
944
945 async def configure_application(
946 self, model_name: str, application_name: str, config: dict = None
947 ):
948 """Configure application
949
950 :param: model_name: Model name
951 :param: application_name: Application name
952 :param: config: Config to apply to the charm
953 """
954 self.log.debug("Configuring application {}".format(application_name))
955
956 if config:
957 controller = await self.get_controller()
958 model = None
959 try:
960 model = await self.get_model(controller, model_name)
961 application = self._get_application(
962 model,
963 application_name=application_name,
964 )
965 await application.set_config(config)
966 finally:
967 if model:
968 await self.disconnect_model(model)
969 await self.disconnect_controller(controller)
970
971 def _get_api_endpoints_db(self) -> [str]:
972 """
973 Get API Endpoints from DB
974
975 :return: List of API endpoints
976 """
977 self.log.debug("Getting endpoints from database")
978
979 juju_info = self.db.get_one(
980 DB_DATA.api_endpoints.table,
981 q_filter=DB_DATA.api_endpoints.filter,
982 fail_on_empty=False,
983 )
984 if juju_info and DB_DATA.api_endpoints.key in juju_info:
985 return juju_info[DB_DATA.api_endpoints.key]
986
987 def _update_api_endpoints_db(self, endpoints: [str]):
988 """
989 Update API endpoints in Database
990
991 :param: List of endpoints
992 """
993 self.log.debug("Saving endpoints {} in database".format(endpoints))
994
995 juju_info = self.db.get_one(
996 DB_DATA.api_endpoints.table,
997 q_filter=DB_DATA.api_endpoints.filter,
998 fail_on_empty=False,
999 )
1000 # If it doesn't, then create it
1001 if not juju_info:
1002 try:
1003 self.db.create(
1004 DB_DATA.api_endpoints.table,
1005 DB_DATA.api_endpoints.filter,
1006 )
1007 except DbException as e:
1008 # Racing condition: check if another N2VC worker has created it
1009 juju_info = self.db.get_one(
1010 DB_DATA.api_endpoints.table,
1011 q_filter=DB_DATA.api_endpoints.filter,
1012 fail_on_empty=False,
1013 )
1014 if not juju_info:
1015 raise e
1016 self.db.set_one(
1017 DB_DATA.api_endpoints.table,
1018 DB_DATA.api_endpoints.filter,
1019 {DB_DATA.api_endpoints.key: endpoints},
1020 )
1021
1022 def handle_exception(self, loop, context):
1023 # All unhandled exceptions by libjuju are handled here.
1024 pass
1025
1026 async def health_check(self, interval: float = 300.0):
1027 """
1028 Health check to make sure controller and controller_model connections are OK
1029
1030 :param: interval: Time in seconds between checks
1031 """
1032 controller = None
1033 while True:
1034 try:
1035 controller = await self.get_controller()
1036 # self.log.debug("VCA is alive")
1037 except Exception as e:
1038 self.log.error("Health check to VCA failed: {}".format(e))
1039 finally:
1040 await self.disconnect_controller(controller)
1041 await asyncio.sleep(interval)
1042
1043 async def list_models(self, contains: str = None) -> [str]:
1044 """List models with certain names
1045
1046 :param: contains: String that is contained in model name
1047
1048 :retur: [models] Returns list of model names
1049 """
1050
1051 controller = await self.get_controller()
1052 try:
1053 models = await controller.list_models()
1054 if contains:
1055 models = [model for model in models if contains in model]
1056 return models
1057 finally:
1058 await self.disconnect_controller(controller)
1059
1060 async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
1061 """List models with certain names
1062
1063 :param: model_name: Model name
1064
1065 :return: Returns list of offers
1066 """
1067
1068 controller = await self.get_controller()
1069 try:
1070 return await controller.list_offers(model_name)
1071 finally:
1072 await self.disconnect_controller(controller)
1073
1074 async def add_k8s(
1075 self,
1076 name: str,
1077 rbac_id: str,
1078 token: str,
1079 client_cert_data: str,
1080 configuration: Configuration,
1081 storage_class: str,
1082 credential_name: str = None,
1083 ):
1084 """
1085 Add a Kubernetes cloud to the controller
1086
1087 Similar to the `juju add-k8s` command in the CLI
1088
1089 :param: name: Name for the K8s cloud
1090 :param: configuration: Kubernetes configuration object
1091 :param: storage_class: Storage Class to use in the cloud
1092 :param: credential_name: Storage Class to use in the cloud
1093 """
1094
1095 if not storage_class:
1096 raise Exception("storage_class must be a non-empty string")
1097 if not name:
1098 raise Exception("name must be a non-empty string")
1099 if not configuration:
1100 raise Exception("configuration must be provided")
1101
1102 endpoint = configuration.host
1103 credential = self.get_k8s_cloud_credential(
1104 configuration,
1105 client_cert_data,
1106 token,
1107 )
1108 credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id
1109 cloud = client.Cloud(
1110 type_="kubernetes",
1111 auth_types=[credential.auth_type],
1112 endpoint=endpoint,
1113 ca_certificates=[client_cert_data],
1114 config={
1115 "operator-storage": storage_class,
1116 "workload-storage": storage_class,
1117 },
1118 )
1119
1120 return await self.add_cloud(
1121 name, cloud, credential, credential_name=credential_name
1122 )
1123
1124 def get_k8s_cloud_credential(
1125 self,
1126 configuration: Configuration,
1127 client_cert_data: str,
1128 token: str = None,
1129 ) -> client.CloudCredential:
1130 attrs = {}
1131 # TODO: Test with AKS
1132 key = None # open(configuration.key_file, "r").read()
1133 username = configuration.username
1134 password = configuration.password
1135
1136 if client_cert_data:
1137 attrs["ClientCertificateData"] = client_cert_data
1138 if key:
1139 attrs["ClientKeyData"] = key
1140 if token:
1141 if username or password:
1142 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1143 attrs["Token"] = token
1144
1145 auth_type = None
1146 if key:
1147 auth_type = "oauth2"
1148 if client_cert_data:
1149 auth_type = "oauth2withcert"
1150 if not token:
1151 raise JujuInvalidK8sConfiguration(
1152 "missing token for auth type {}".format(auth_type)
1153 )
1154 elif username:
1155 if not password:
1156 self.log.debug(
1157 "credential for user {} has empty password".format(username)
1158 )
1159 attrs["username"] = username
1160 attrs["password"] = password
1161 if client_cert_data:
1162 auth_type = "userpasswithcert"
1163 else:
1164 auth_type = "userpass"
1165 elif client_cert_data and token:
1166 auth_type = "certificate"
1167 else:
1168 raise JujuInvalidK8sConfiguration("authentication method not supported")
1169 return client.CloudCredential(auth_type=auth_type, attrs=attrs)
1170
1171 async def add_cloud(
1172 self,
1173 name: str,
1174 cloud: Cloud,
1175 credential: CloudCredential = None,
1176 credential_name: str = None,
1177 ) -> Cloud:
1178 """
1179 Add cloud to the controller
1180
1181 :param: name: Name of the cloud to be added
1182 :param: cloud: Cloud object
1183 :param: credential: CloudCredentials object for the cloud
1184 :param: credential_name: Credential name.
1185 If not defined, cloud of the name will be used.
1186 """
1187 controller = await self.get_controller()
1188 try:
1189 _ = await controller.add_cloud(name, cloud)
1190 if credential:
1191 await controller.add_credential(
1192 credential_name or name, credential=credential, cloud=name
1193 )
1194 # Need to return the object returned by the controller.add_cloud() function
1195 # I'm returning the original value now until this bug is fixed:
1196 # https://github.com/juju/python-libjuju/issues/443
1197 return cloud
1198 finally:
1199 await self.disconnect_controller(controller)
1200
1201 async def remove_cloud(self, name: str):
1202 """
1203 Remove cloud
1204
1205 :param: name: Name of the cloud to be removed
1206 """
1207 controller = await self.get_controller()
1208 try:
1209 await controller.remove_cloud(name)
1210 finally:
1211 await self.disconnect_controller(controller)
1212
1213 async def _get_leader_unit(self, application: Application) -> Unit:
1214 unit = None
1215 for u in application.units:
1216 if await u.is_leader_from_status():
1217 unit = u
1218 break
1219 return unit
1220
1221 async def get_cloud_credentials(self, cloud_name: str, credential_name: str):
1222 controller = await self.get_controller()
1223 try:
1224 facade = client.CloudFacade.from_connection(controller.connection())
1225 cloud_cred_tag = tag.credential(cloud_name, self.username, credential_name)
1226 params = [client.Entity(cloud_cred_tag)]
1227 return (await facade.Credential(params)).results
1228 finally:
1229 await self.disconnect_controller(controller)