Feature 9952: Distributed Proxy Charms
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17 from juju.controller import Controller
18 from juju.client import client
19 import time
20
21 from juju.errors import JujuAPIError
22 from juju.model import Model
23 from juju.machine import Machine
24 from juju.application import Application
25 from juju.unit import Unit
26 from juju.client._definitions import (
27 FullStatus,
28 QueryApplicationOffersResults,
29 Cloud,
30 CloudCredential,
31 )
32 from n2vc.juju_watcher import JujuModelWatcher
33 from n2vc.provisioner import AsyncSSHProvisioner
34 from n2vc.n2vc_conn import N2VCConnector
35 from n2vc.exceptions import (
36 JujuMachineNotFound,
37 JujuApplicationNotFound,
38 JujuLeaderUnitNotFound,
39 JujuActionNotFound,
40 JujuModelAlreadyExists,
41 JujuControllerFailedConnecting,
42 JujuApplicationExists,
43 JujuInvalidK8sConfiguration,
44 )
45 from n2vc.utils import DB_DATA
46 from osm_common.dbbase import DbException
47 from kubernetes.client.configuration import Configuration
48
49
50 class Libjuju:
51 def __init__(
52 self,
53 endpoint: str,
54 api_proxy: str,
55 username: str,
56 password: str,
57 cacert: str,
58 loop: asyncio.AbstractEventLoop = None,
59 log: logging.Logger = None,
60 db: dict = None,
61 n2vc: N2VCConnector = None,
62 apt_mirror: str = None,
63 enable_os_upgrade: bool = True,
64 ):
65 """
66 Constructor
67
68 :param: endpoint: Endpoint of the juju controller (host:port)
69 :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
70 :param: username: Juju username
71 :param: password: Juju password
72 :param: cacert: Juju CA Certificate
73 :param: loop: Asyncio loop
74 :param: log: Logger
75 :param: db: DB object
76 :param: n2vc: N2VC object
77 :param: apt_mirror: APT Mirror
78 :param: enable_os_upgrade: Enable OS Upgrade
79 """
80
81 self.log = log or logging.getLogger("Libjuju")
82 self.db = db
83 db_endpoints = self._get_api_endpoints_db()
84 self.endpoints = None
85 if (db_endpoints and endpoint not in db_endpoints) or not db_endpoints:
86 self.endpoints = [endpoint]
87 self._update_api_endpoints_db(self.endpoints)
88 else:
89 self.endpoints = db_endpoints
90 self.api_proxy = api_proxy
91 self.username = username
92 self.password = password
93 self.cacert = cacert
94 self.loop = loop or asyncio.get_event_loop()
95 self.n2vc = n2vc
96
97 # Generate config for models
98 self.model_config = {}
99 if apt_mirror:
100 self.model_config["apt-mirror"] = apt_mirror
101 self.model_config["enable-os-refresh-update"] = enable_os_upgrade
102 self.model_config["enable-os-upgrade"] = enable_os_upgrade
103
104 self.loop.set_exception_handler(self.handle_exception)
105 self.creating_model = asyncio.Lock(loop=self.loop)
106
107 self.models = set()
108 self.log.debug("Libjuju initialized!")
109
110 self.health_check_task = self._create_health_check_task()
111
112 def _create_health_check_task(self):
113 return self.loop.create_task(self.health_check())
114
115 async def get_controller(self, timeout: float = 5.0) -> Controller:
116 """
117 Get controller
118
119 :param: timeout: Time in seconds to wait for controller to connect
120 """
121 controller = None
122 try:
123 controller = Controller(loop=self.loop)
124 await asyncio.wait_for(
125 controller.connect(
126 endpoint=self.endpoints,
127 username=self.username,
128 password=self.password,
129 cacert=self.cacert,
130 ),
131 timeout=timeout,
132 )
133 endpoints = await controller.api_endpoints
134 if self.endpoints != endpoints:
135 self.endpoints = endpoints
136 self._update_api_endpoints_db(self.endpoints)
137 return controller
138 except asyncio.CancelledError as e:
139 raise e
140 except Exception as e:
141 self.log.error(
142 "Failed connecting to controller: {}...".format(self.endpoints)
143 )
144 if controller:
145 await self.disconnect_controller(controller)
146 raise JujuControllerFailedConnecting(e)
147
148 async def disconnect(self):
149 """Disconnect"""
150 # Cancel health check task
151 self.health_check_task.cancel()
152 self.log.debug("Libjuju disconnected!")
153
154 async def disconnect_model(self, model: Model):
155 """
156 Disconnect model
157
158 :param: model: Model that will be disconnected
159 """
160 await model.disconnect()
161
162 async def disconnect_controller(self, controller: Controller):
163 """
164 Disconnect controller
165
166 :param: controller: Controller that will be disconnected
167 """
168 if controller:
169 await controller.disconnect()
170
171 async def add_model(self, model_name: str, cloud_name: str, credential_name=None):
172 """
173 Create model
174
175 :param: model_name: Model name
176 :param: cloud_name: Cloud name
177 :param: credential_name: Credential name to use for adding the model
178 If not specified, same name as the cloud will be used.
179 """
180
181 # Get controller
182 controller = await self.get_controller()
183 model = None
184 try:
185 # Raise exception if model already exists
186 if await self.model_exists(model_name, controller=controller):
187 raise JujuModelAlreadyExists(
188 "Model {} already exists.".format(model_name)
189 )
190
191 # Block until other workers have finished model creation
192 while self.creating_model.locked():
193 await asyncio.sleep(0.1)
194
195 # If the model exists, return it from the controller
196 if model_name in self.models:
197 return
198
199 # Create the model
200 async with self.creating_model:
201 self.log.debug("Creating model {}".format(model_name))
202 model = await controller.add_model(
203 model_name,
204 config=self.model_config,
205 cloud_name=cloud_name,
206 credential_name=credential_name or cloud_name,
207 )
208 self.models.add(model_name)
209 finally:
210 if model:
211 await self.disconnect_model(model)
212 await self.disconnect_controller(controller)
213
214 async def get_model(
215 self, controller: Controller, model_name: str, id=None
216 ) -> Model:
217 """
218 Get model from controller
219
220 :param: controller: Controller
221 :param: model_name: Model name
222
223 :return: Model: The created Juju model object
224 """
225 return await controller.get_model(model_name)
226
227 async def model_exists(
228 self, model_name: str, controller: Controller = None
229 ) -> bool:
230 """
231 Check if model exists
232
233 :param: controller: Controller
234 :param: model_name: Model name
235
236 :return bool
237 """
238 need_to_disconnect = False
239
240 # Get controller if not passed
241 if not controller:
242 controller = await self.get_controller()
243 need_to_disconnect = True
244
245 # Check if model exists
246 try:
247 return model_name in await controller.list_models()
248 finally:
249 if need_to_disconnect:
250 await self.disconnect_controller(controller)
251
252 async def models_exist(self, model_names: [str]) -> (bool, list):
253 """
254 Check if models exists
255
256 :param: model_names: List of strings with model names
257
258 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
259 """
260 if not model_names:
261 raise Exception(
262 "model_names must be a non-empty array. Given value: {}".format(
263 model_names
264 )
265 )
266 non_existing_models = []
267 models = await self.list_models()
268 existing_models = list(set(models).intersection(model_names))
269 non_existing_models = list(set(model_names) - set(existing_models))
270
271 return (
272 len(non_existing_models) == 0,
273 non_existing_models,
274 )
275
276 async def get_model_status(self, model_name: str) -> FullStatus:
277 """
278 Get model status
279
280 :param: model_name: Model name
281
282 :return: Full status object
283 """
284 controller = await self.get_controller()
285 model = await self.get_model(controller, model_name)
286 try:
287 return await model.get_status()
288 finally:
289 await self.disconnect_model(model)
290 await self.disconnect_controller(controller)
291
292 async def create_machine(
293 self,
294 model_name: str,
295 machine_id: str = None,
296 db_dict: dict = None,
297 progress_timeout: float = None,
298 total_timeout: float = None,
299 series: str = "xenial",
300 wait: bool = True,
301 ) -> (Machine, bool):
302 """
303 Create machine
304
305 :param: model_name: Model name
306 :param: machine_id: Machine id
307 :param: db_dict: Dictionary with data of the DB to write the updates
308 :param: progress_timeout: Maximum time between two updates in the model
309 :param: total_timeout: Timeout for the entity to be active
310 :param: series: Series of the machine (xenial, bionic, focal, ...)
311 :param: wait: Wait until machine is ready
312
313 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
314 if the machine is new or it already existed
315 """
316 new = False
317 machine = None
318
319 self.log.debug(
320 "Creating machine (id={}) in model: {}".format(machine_id, model_name)
321 )
322
323 # Get controller
324 controller = await self.get_controller()
325
326 # Get model
327 model = await self.get_model(controller, model_name)
328 try:
329 if machine_id is not None:
330 self.log.debug(
331 "Searching machine (id={}) in model {}".format(
332 machine_id, model_name
333 )
334 )
335
336 # Get machines from model and get the machine with machine_id if exists
337 machines = await model.get_machines()
338 if machine_id in machines:
339 self.log.debug(
340 "Machine (id={}) found in model {}".format(
341 machine_id, model_name
342 )
343 )
344 machine = machines[machine_id]
345 else:
346 raise JujuMachineNotFound("Machine {} not found".format(machine_id))
347
348 if machine is None:
349 self.log.debug("Creating a new machine in model {}".format(model_name))
350
351 # Create machine
352 machine = await model.add_machine(
353 spec=None, constraints=None, disks=None, series=series
354 )
355 new = True
356
357 # Wait until the machine is ready
358 self.log.debug(
359 "Wait until machine {} is ready in model {}".format(
360 machine.entity_id, model_name
361 )
362 )
363 if wait:
364 await JujuModelWatcher.wait_for(
365 model=model,
366 entity=machine,
367 progress_timeout=progress_timeout,
368 total_timeout=total_timeout,
369 db_dict=db_dict,
370 n2vc=self.n2vc,
371 )
372 finally:
373 await self.disconnect_model(model)
374 await self.disconnect_controller(controller)
375
376 self.log.debug(
377 "Machine {} ready at {} in model {}".format(
378 machine.entity_id, machine.dns_name, model_name
379 )
380 )
381 return machine, new
382
383 async def provision_machine(
384 self,
385 model_name: str,
386 hostname: str,
387 username: str,
388 private_key_path: str,
389 db_dict: dict = None,
390 progress_timeout: float = None,
391 total_timeout: float = None,
392 ) -> str:
393 """
394 Manually provisioning of a machine
395
396 :param: model_name: Model name
397 :param: hostname: IP to access the machine
398 :param: username: Username to login to the machine
399 :param: private_key_path: Local path for the private key
400 :param: db_dict: Dictionary with data of the DB to write the updates
401 :param: progress_timeout: Maximum time between two updates in the model
402 :param: total_timeout: Timeout for the entity to be active
403
404 :return: (Entity): Machine id
405 """
406 self.log.debug(
407 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
408 model_name, hostname, username
409 )
410 )
411
412 # Get controller
413 controller = await self.get_controller()
414
415 # Get model
416 model = await self.get_model(controller, model_name)
417
418 try:
419 # Get provisioner
420 provisioner = AsyncSSHProvisioner(
421 host=hostname,
422 user=username,
423 private_key_path=private_key_path,
424 log=self.log,
425 )
426
427 # Provision machine
428 params = await provisioner.provision_machine()
429
430 params.jobs = ["JobHostUnits"]
431
432 self.log.debug("Adding machine to model")
433 connection = model.connection()
434 client_facade = client.ClientFacade.from_connection(connection)
435
436 results = await client_facade.AddMachines(params=[params])
437 error = results.machines[0].error
438
439 if error:
440 msg = "Error adding machine: {}".format(error.message)
441 self.log.error(msg=msg)
442 raise ValueError(msg)
443
444 machine_id = results.machines[0].machine
445
446 self.log.debug("Installing Juju agent into machine {}".format(machine_id))
447 asyncio.ensure_future(
448 provisioner.install_agent(
449 connection=connection,
450 nonce=params.nonce,
451 machine_id=machine_id,
452 proxy=self.api_proxy,
453 )
454 )
455
456 machine = None
457 for _ in range(10):
458 machine_list = await model.get_machines()
459 if machine_id in machine_list:
460 self.log.debug("Machine {} found in model!".format(machine_id))
461 machine = model.machines.get(machine_id)
462 break
463 await asyncio.sleep(2)
464
465 if machine is None:
466 msg = "Machine {} not found in model".format(machine_id)
467 self.log.error(msg=msg)
468 raise JujuMachineNotFound(msg)
469
470 self.log.debug(
471 "Wait until machine {} is ready in model {}".format(
472 machine.entity_id, model_name
473 )
474 )
475 await JujuModelWatcher.wait_for(
476 model=model,
477 entity=machine,
478 progress_timeout=progress_timeout,
479 total_timeout=total_timeout,
480 db_dict=db_dict,
481 n2vc=self.n2vc,
482 )
483 except Exception as e:
484 raise e
485 finally:
486 await self.disconnect_model(model)
487 await self.disconnect_controller(controller)
488
489 self.log.debug(
490 "Machine provisioned {} in model {}".format(machine_id, model_name)
491 )
492
493 return machine_id
494
495 async def deploy(
496 self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
497 ):
498 """
499 Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
500
501 :param: uri: Path or Charm Store uri in which the charm or bundle can be found
502 :param: model_name: Model name
503 :param: wait: Indicates whether to wait or not until all applications are active
504 :param: timeout: Time in seconds to wait until all applications are active
505 """
506 controller = await self.get_controller()
507 model = await self.get_model(controller, model_name)
508 try:
509 await model.deploy(uri)
510 if wait:
511 await JujuModelWatcher.wait_for_model(model, timeout=timeout)
512 self.log.debug("All units active in model {}".format(model_name))
513 finally:
514 await self.disconnect_model(model)
515 await self.disconnect_controller(controller)
516
517 async def deploy_charm(
518 self,
519 application_name: str,
520 path: str,
521 model_name: str,
522 machine_id: str,
523 db_dict: dict = None,
524 progress_timeout: float = None,
525 total_timeout: float = None,
526 config: dict = None,
527 series: str = None,
528 num_units: int = 1,
529 ):
530 """Deploy charm
531
532 :param: application_name: Application name
533 :param: path: Local path to the charm
534 :param: model_name: Model name
535 :param: machine_id ID of the machine
536 :param: db_dict: Dictionary with data of the DB to write the updates
537 :param: progress_timeout: Maximum time between two updates in the model
538 :param: total_timeout: Timeout for the entity to be active
539 :param: config: Config for the charm
540 :param: series: Series of the charm
541 :param: num_units: Number of units
542
543 :return: (juju.application.Application): Juju application
544 """
545 self.log.debug(
546 "Deploying charm {} to machine {} in model ~{}".format(
547 application_name, machine_id, model_name
548 )
549 )
550 self.log.debug("charm: {}".format(path))
551
552 # Get controller
553 controller = await self.get_controller()
554
555 # Get model
556 model = await self.get_model(controller, model_name)
557
558 try:
559 application = None
560 if application_name not in model.applications:
561
562 if machine_id is not None:
563 if machine_id not in model.machines:
564 msg = "Machine {} not found in model".format(machine_id)
565 self.log.error(msg=msg)
566 raise JujuMachineNotFound(msg)
567 machine = model.machines[machine_id]
568 series = machine.series
569
570 application = await model.deploy(
571 entity_url=path,
572 application_name=application_name,
573 channel="stable",
574 num_units=1,
575 series=series,
576 to=machine_id,
577 config=config,
578 )
579
580 self.log.debug(
581 "Wait until application {} is ready in model {}".format(
582 application_name, model_name
583 )
584 )
585 if num_units > 1:
586 for _ in range(num_units - 1):
587 m, _ = await self.create_machine(model_name, wait=False)
588 await application.add_unit(to=m.entity_id)
589
590 await JujuModelWatcher.wait_for(
591 model=model,
592 entity=application,
593 progress_timeout=progress_timeout,
594 total_timeout=total_timeout,
595 db_dict=db_dict,
596 n2vc=self.n2vc,
597 )
598 self.log.debug(
599 "Application {} is ready in model {}".format(
600 application_name, model_name
601 )
602 )
603 else:
604 raise JujuApplicationExists(
605 "Application {} exists".format(application_name)
606 )
607 finally:
608 await self.disconnect_model(model)
609 await self.disconnect_controller(controller)
610
611 return application
612
613 def _get_application(self, model: Model, application_name: str) -> Application:
614 """Get application
615
616 :param: model: Model object
617 :param: application_name: Application name
618
619 :return: juju.application.Application (or None if it doesn't exist)
620 """
621 if model.applications and application_name in model.applications:
622 return model.applications[application_name]
623
624 async def execute_action(
625 self,
626 application_name: str,
627 model_name: str,
628 action_name: str,
629 db_dict: dict = None,
630 progress_timeout: float = None,
631 total_timeout: float = None,
632 **kwargs
633 ):
634 """Execute action
635
636 :param: application_name: Application name
637 :param: model_name: Model name
638 :param: action_name: Name of the action
639 :param: db_dict: Dictionary with data of the DB to write the updates
640 :param: progress_timeout: Maximum time between two updates in the model
641 :param: total_timeout: Timeout for the entity to be active
642
643 :return: (str, str): (output and status)
644 """
645 self.log.debug(
646 "Executing action {} using params {}".format(action_name, kwargs)
647 )
648 # Get controller
649 controller = await self.get_controller()
650
651 # Get model
652 model = await self.get_model(controller, model_name)
653
654 try:
655 # Get application
656 application = self._get_application(
657 model, application_name=application_name,
658 )
659 if application is None:
660 raise JujuApplicationNotFound("Cannot execute action")
661
662 # Get leader unit
663 # Racing condition:
664 # Ocassionally, self._get_leader_unit() will return None
665 # because the leader elected hook has not been triggered yet.
666 # Therefore, we are doing some retries. If it happens again,
667 # re-open bug 1236
668 attempts = 3
669 time_between_retries = 10
670 unit = None
671 for _ in range(attempts):
672 unit = await self._get_leader_unit(application)
673 if unit is None:
674 await asyncio.sleep(time_between_retries)
675 else:
676 break
677 if unit is None:
678 raise JujuLeaderUnitNotFound(
679 "Cannot execute action: leader unit not found"
680 )
681
682 actions = await application.get_actions()
683
684 if action_name not in actions:
685 raise JujuActionNotFound(
686 "Action {} not in available actions".format(action_name)
687 )
688
689 action = await unit.run_action(action_name, **kwargs)
690
691 self.log.debug(
692 "Wait until action {} is completed in application {} (model={})".format(
693 action_name, application_name, model_name
694 )
695 )
696 await JujuModelWatcher.wait_for(
697 model=model,
698 entity=action,
699 progress_timeout=progress_timeout,
700 total_timeout=total_timeout,
701 db_dict=db_dict,
702 n2vc=self.n2vc,
703 )
704
705 output = await model.get_action_output(action_uuid=action.entity_id)
706 status = await model.get_action_status(uuid_or_prefix=action.entity_id)
707 status = (
708 status[action.entity_id] if action.entity_id in status else "failed"
709 )
710
711 self.log.debug(
712 "Action {} completed with status {} in application {} (model={})".format(
713 action_name, action.status, application_name, model_name
714 )
715 )
716 finally:
717 await self.disconnect_model(model)
718 await self.disconnect_controller(controller)
719
720 return output, status
721
722 async def get_actions(self, application_name: str, model_name: str) -> dict:
723 """Get list of actions
724
725 :param: application_name: Application name
726 :param: model_name: Model name
727
728 :return: Dict with this format
729 {
730 "action_name": "Description of the action",
731 ...
732 }
733 """
734 self.log.debug(
735 "Getting list of actions for application {}".format(application_name)
736 )
737
738 # Get controller
739 controller = await self.get_controller()
740
741 # Get model
742 model = await self.get_model(controller, model_name)
743
744 try:
745 # Get application
746 application = self._get_application(
747 model, application_name=application_name,
748 )
749
750 # Return list of actions
751 return await application.get_actions()
752
753 finally:
754 # Disconnect from model and controller
755 await self.disconnect_model(model)
756 await self.disconnect_controller(controller)
757
758 async def get_metrics(self, model_name: str, application_name: str) -> dict:
759 """Get the metrics collected by the VCA.
760
761 :param model_name The name or unique id of the network service
762 :param application_name The name of the application
763 """
764 if not model_name or not application_name:
765 raise Exception("model_name and application_name must be non-empty strings")
766 metrics = {}
767 controller = await self.get_controller()
768 model = await self.get_model(controller, model_name)
769 try:
770 application = self._get_application(model, application_name)
771 if application is not None:
772 metrics = await application.get_metrics()
773 finally:
774 self.disconnect_model(model)
775 self.disconnect_controller(controller)
776 return metrics
777
778 async def add_relation(
779 self, model_name: str, endpoint_1: str, endpoint_2: str,
780 ):
781 """Add relation
782
783 :param: model_name: Model name
784 :param: endpoint_1 First endpoint name
785 ("app:endpoint" format or directly the saas name)
786 :param: endpoint_2: Second endpoint name (^ same format)
787 """
788
789 self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))
790
791 # Get controller
792 controller = await self.get_controller()
793
794 # Get model
795 model = await self.get_model(controller, model_name)
796
797 # Add relation
798 try:
799 await model.add_relation(endpoint_1, endpoint_2)
800 except JujuAPIError as e:
801 if "not found" in e.message:
802 self.log.warning("Relation not found: {}".format(e.message))
803 return
804 if "already exists" in e.message:
805 self.log.warning("Relation already exists: {}".format(e.message))
806 return
807 # another exception, raise it
808 raise e
809 finally:
810 await self.disconnect_model(model)
811 await self.disconnect_controller(controller)
812
813 async def consume(
814 self, offer_url: str, model_name: str,
815 ):
816 """
817 Adds a remote offer to the model. Relations can be created later using "juju relate".
818
819 :param: offer_url: Offer Url
820 :param: model_name: Model name
821
822 :raises ParseError if there's a problem parsing the offer_url
823 :raises JujuError if remote offer includes and endpoint
824 :raises JujuAPIError if the operation is not successful
825 """
826 controller = await self.get_controller()
827 model = await controller.get_model(model_name)
828
829 try:
830 await model.consume(offer_url)
831 finally:
832 await self.disconnect_model(model)
833 await self.disconnect_controller(controller)
834
835 async def destroy_model(self, model_name: str, total_timeout: float):
836 """
837 Destroy model
838
839 :param: model_name: Model name
840 :param: total_timeout: Timeout
841 """
842
843 controller = await self.get_controller()
844 model = await self.get_model(controller, model_name)
845 try:
846 self.log.debug("Destroying model {}".format(model_name))
847 uuid = model.info.uuid
848
849 # Destroy machines that are manually provisioned
850 # and still are in pending state
851 await self._destroy_pending_machines(model, only_manual=True)
852
853 # Disconnect model
854 await self.disconnect_model(model)
855
856 # Destroy model
857 if model_name in self.models:
858 self.models.remove(model_name)
859
860 await controller.destroy_model(uuid, force=True, max_wait=0)
861
862 # Wait until model is destroyed
863 self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
864
865 if total_timeout is None:
866 total_timeout = 3600
867 end = time.time() + total_timeout
868 while time.time() < end:
869 models = await controller.list_models()
870 if model_name not in models:
871 self.log.debug(
872 "The model {} ({}) was destroyed".format(model_name, uuid)
873 )
874 return
875 await asyncio.sleep(5)
876 raise Exception(
877 "Timeout waiting for model {} to be destroyed".format(model_name)
878 )
879 finally:
880 await self.disconnect_controller(controller)
881
882 async def destroy_application(self, model: Model, application_name: str):
883 """
884 Destroy application
885
886 :param: model: Model object
887 :param: application_name: Application name
888 """
889 self.log.debug(
890 "Destroying application {} in model {}".format(
891 application_name, model.info.name
892 )
893 )
894 application = model.applications.get(application_name)
895 if application:
896 await application.destroy()
897 else:
898 self.log.warning("Application not found: {}".format(application_name))
899
900 async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
901 """
902 Destroy pending machines in a given model
903
904 :param: only_manual: Bool that indicates only manually provisioned
905 machines should be destroyed (if True), or that
906 all pending machines should be destroyed
907 """
908 status = await model.get_status()
909 for machine_id in status.machines:
910 machine_status = status.machines[machine_id]
911 if machine_status.agent_status.status == "pending":
912 if only_manual and not machine_status.instance_id.startswith("manual:"):
913 break
914 machine = model.machines[machine_id]
915 await machine.destroy(force=True)
916
917 # async def destroy_machine(
918 # self, model: Model, machine_id: str, total_timeout: float = 3600
919 # ):
920 # """
921 # Destroy machine
922
923 # :param: model: Model object
924 # :param: machine_id: Machine id
925 # :param: total_timeout: Timeout in seconds
926 # """
927 # machines = await model.get_machines()
928 # if machine_id in machines:
929 # machine = machines[machine_id]
930 # await machine.destroy(force=True)
931 # # max timeout
932 # end = time.time() + total_timeout
933
934 # # wait for machine removal
935 # machines = await model.get_machines()
936 # while machine_id in machines and time.time() < end:
937 # self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
938 # await asyncio.sleep(0.5)
939 # machines = await model.get_machines()
940 # self.log.debug("Machine destroyed: {}".format(machine_id))
941 # else:
942 # self.log.debug("Machine not found: {}".format(machine_id))
943
944 async def configure_application(
945 self, model_name: str, application_name: str, config: dict = None
946 ):
947 """Configure application
948
949 :param: model_name: Model name
950 :param: application_name: Application name
951 :param: config: Config to apply to the charm
952 """
953 self.log.debug("Configuring application {}".format(application_name))
954
955 if config:
956 try:
957 controller = await self.get_controller()
958 model = await self.get_model(controller, model_name)
959 application = self._get_application(
960 model, application_name=application_name,
961 )
962 await application.set_config(config)
963 finally:
964 await self.disconnect_model(model)
965 await self.disconnect_controller(controller)
966
967 def _get_api_endpoints_db(self) -> [str]:
968 """
969 Get API Endpoints from DB
970
971 :return: List of API endpoints
972 """
973 self.log.debug("Getting endpoints from database")
974
975 juju_info = self.db.get_one(
976 DB_DATA.api_endpoints.table,
977 q_filter=DB_DATA.api_endpoints.filter,
978 fail_on_empty=False,
979 )
980 if juju_info and DB_DATA.api_endpoints.key in juju_info:
981 return juju_info[DB_DATA.api_endpoints.key]
982
983 def _update_api_endpoints_db(self, endpoints: [str]):
984 """
985 Update API endpoints in Database
986
987 :param: List of endpoints
988 """
989 self.log.debug("Saving endpoints {} in database".format(endpoints))
990
991 juju_info = self.db.get_one(
992 DB_DATA.api_endpoints.table,
993 q_filter=DB_DATA.api_endpoints.filter,
994 fail_on_empty=False,
995 )
996 # If it doesn't, then create it
997 if not juju_info:
998 try:
999 self.db.create(
1000 DB_DATA.api_endpoints.table, DB_DATA.api_endpoints.filter,
1001 )
1002 except DbException as e:
1003 # Racing condition: check if another N2VC worker has created it
1004 juju_info = self.db.get_one(
1005 DB_DATA.api_endpoints.table,
1006 q_filter=DB_DATA.api_endpoints.filter,
1007 fail_on_empty=False,
1008 )
1009 if not juju_info:
1010 raise e
1011 self.db.set_one(
1012 DB_DATA.api_endpoints.table,
1013 DB_DATA.api_endpoints.filter,
1014 {DB_DATA.api_endpoints.key: endpoints},
1015 )
1016
1017 def handle_exception(self, loop, context):
1018 # All unhandled exceptions by libjuju are handled here.
1019 pass
1020
1021 async def health_check(self, interval: float = 300.0):
1022 """
1023 Health check to make sure controller and controller_model connections are OK
1024
1025 :param: interval: Time in seconds between checks
1026 """
1027 controller = None
1028 while True:
1029 try:
1030 controller = await self.get_controller()
1031 # self.log.debug("VCA is alive")
1032 except Exception as e:
1033 self.log.error("Health check to VCA failed: {}".format(e))
1034 finally:
1035 await self.disconnect_controller(controller)
1036 await asyncio.sleep(interval)
1037
1038 async def list_models(self, contains: str = None) -> [str]:
1039 """List models with certain names
1040
1041 :param: contains: String that is contained in model name
1042
1043 :retur: [models] Returns list of model names
1044 """
1045
1046 controller = await self.get_controller()
1047 try:
1048 models = await controller.list_models()
1049 if contains:
1050 models = [model for model in models if contains in model]
1051 return models
1052 finally:
1053 await self.disconnect_controller(controller)
1054
1055 async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
1056 """List models with certain names
1057
1058 :param: model_name: Model name
1059
1060 :return: Returns list of offers
1061 """
1062
1063 controller = await self.get_controller()
1064 try:
1065 return await controller.list_offers(model_name)
1066 finally:
1067 await self.disconnect_controller(controller)
1068
1069 async def add_k8s(
1070 self,
1071 name: str,
1072 configuration: Configuration,
1073 storage_class: str,
1074 credential_name: str = None,
1075 ):
1076 """
1077 Add a Kubernetes cloud to the controller
1078
1079 Similar to the `juju add-k8s` command in the CLI
1080
1081 :param: name: Name for the K8s cloud
1082 :param: configuration: Kubernetes configuration object
1083 :param: storage_class: Storage Class to use in the cloud
1084 :param: credential_name: Storage Class to use in the cloud
1085 """
1086
1087 if not storage_class:
1088 raise Exception("storage_class must be a non-empty string")
1089 if not name:
1090 raise Exception("name must be a non-empty string")
1091 if not configuration:
1092 raise Exception("configuration must be provided")
1093
1094 endpoint = configuration.host
1095 credential = self.get_k8s_cloud_credential(configuration)
1096 ca_certificates = (
1097 [credential.attrs["ClientCertificateData"]]
1098 if "ClientCertificateData" in credential.attrs
1099 else []
1100 )
1101 cloud = client.Cloud(
1102 type_="kubernetes",
1103 auth_types=[credential.auth_type],
1104 endpoint=endpoint,
1105 ca_certificates=ca_certificates,
1106 config={
1107 "operator-storage": storage_class,
1108 "workload-storage": storage_class,
1109 },
1110 )
1111
1112 return await self.add_cloud(
1113 name, cloud, credential, credential_name=credential_name
1114 )
1115
1116 def get_k8s_cloud_credential(
1117 self, configuration: Configuration,
1118 ) -> client.CloudCredential:
1119 attrs = {}
1120 ca_cert = configuration.ssl_ca_cert or configuration.cert_file
1121 key = configuration.key_file
1122 api_key = configuration.api_key
1123 token = None
1124 username = configuration.username
1125 password = configuration.password
1126
1127 if "authorization" in api_key:
1128 authorization = api_key["authorization"]
1129 if "Bearer " in authorization:
1130 bearer_list = authorization.split(" ")
1131 if len(bearer_list) == 2:
1132 [_, token] = bearer_list
1133 else:
1134 raise JujuInvalidK8sConfiguration("unknown format of api_key")
1135 else:
1136 token = authorization
1137 if ca_cert:
1138 attrs["ClientCertificateData"] = open(ca_cert, "r").read()
1139 if key:
1140 attrs["ClientKeyData"] = open(key, "r").read()
1141 if token:
1142 if username or password:
1143 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1144 attrs["Token"] = token
1145
1146 auth_type = None
1147 if key:
1148 auth_type = "oauth2"
1149 if not token:
1150 raise JujuInvalidK8sConfiguration(
1151 "missing token for auth type {}".format(auth_type)
1152 )
1153 elif username:
1154 if not password:
1155 self.log.debug(
1156 "credential for user {} has empty password".format(username)
1157 )
1158 attrs["username"] = username
1159 attrs["password"] = password
1160 if ca_cert:
1161 auth_type = "userpasswithcert"
1162 else:
1163 auth_type = "userpass"
1164 elif ca_cert and token:
1165 auth_type = "certificate"
1166 else:
1167 raise JujuInvalidK8sConfiguration("authentication method not supported")
1168 return client.CloudCredential(auth_type=auth_type, attrs=attrs)
1169
1170 async def add_cloud(
1171 self,
1172 name: str,
1173 cloud: Cloud,
1174 credential: CloudCredential = None,
1175 credential_name: str = None,
1176 ) -> Cloud:
1177 """
1178 Add cloud to the controller
1179
1180 :param: name: Name of the cloud to be added
1181 :param: cloud: Cloud object
1182 :param: credential: CloudCredentials object for the cloud
1183 :param: credential_name: Credential name.
1184 If not defined, cloud of the name will be used.
1185 """
1186 controller = await self.get_controller()
1187 try:
1188 _ = await controller.add_cloud(name, cloud)
1189 if credential:
1190 await controller.add_credential(
1191 credential_name or name, credential=credential, cloud=name
1192 )
1193 # Need to return the object returned by the controller.add_cloud() function
1194 # I'm returning the original value now until this bug is fixed:
1195 # https://github.com/juju/python-libjuju/issues/443
1196 return cloud
1197 finally:
1198 await self.disconnect_controller(controller)
1199
1200 async def remove_cloud(self, name: str):
1201 """
1202 Remove cloud
1203
1204 :param: name: Name of the cloud to be removed
1205 """
1206 controller = await self.get_controller()
1207 try:
1208 await controller.remove_cloud(name)
1209 finally:
1210 await self.disconnect_controller(controller)
1211
1212 async def _get_leader_unit(self, application: Application) -> Unit:
1213 unit = None
1214 for u in application.units:
1215 if await u.is_leader_from_status():
1216 unit = u
1217 break
1218 return unit