Fix bug 1236: Retry if leader unit is not obtained
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17 from juju.controller import Controller
18 from juju.client import client
19 import time
20
21 from juju.errors import JujuAPIError
22 from juju.model import Model
23 from juju.machine import Machine
24 from juju.application import Application
25 from juju.unit import Unit
26 from juju.client._definitions import (
27 FullStatus,
28 QueryApplicationOffersResults,
29 Cloud,
30 CloudCredential,
31 )
32 from n2vc.juju_watcher import JujuModelWatcher
33 from n2vc.provisioner import AsyncSSHProvisioner
34 from n2vc.n2vc_conn import N2VCConnector
35 from n2vc.exceptions import (
36 JujuMachineNotFound,
37 JujuApplicationNotFound,
38 JujuLeaderUnitNotFound,
39 JujuActionNotFound,
40 JujuModelAlreadyExists,
41 JujuControllerFailedConnecting,
42 JujuApplicationExists,
43 JujuInvalidK8sConfiguration,
44 )
45 from n2vc.utils import DB_DATA
46 from osm_common.dbbase import DbException
47 from kubernetes.client.configuration import Configuration
48
49
50 class Libjuju:
def __init__(
    self,
    endpoint: str,
    api_proxy: str,
    username: str,
    password: str,
    cacert: str,
    loop: asyncio.AbstractEventLoop = None,
    log: logging.Logger = None,
    db: dict = None,
    n2vc: N2VCConnector = None,
    apt_mirror: str = None,
    enable_os_upgrade: bool = True,
):
    """
    Constructor

    Stores the connection settings, loads (or seeds) the API endpoints kept
    in the database, builds the default model configuration and schedules
    the periodic health check on the event loop.

    :param: endpoint: Endpoint of the juju controller (host:port)
    :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
    :param: username: Juju username
    :param: password: Juju password
    :param: cacert: Juju CA Certificate
    :param: loop: Asyncio loop (defaults to asyncio.get_event_loop())
    :param: log: Logger (defaults to the "Libjuju" logger)
    :param: db: DB object
    :param: n2vc: N2VC object
    :param: apt_mirror: APT Mirror
    :param: enable_os_upgrade: Enable OS Upgrade
    """

    # self.log and self.db must be set first: the DB helpers below use both.
    self.log = log or logging.getLogger("Libjuju")
    # NOTE(review): annotated as dict, but it is used through get_one(),
    # create() and set_one() - presumably an osm_common DB object; confirm.
    self.db = db
    # Endpoints already stored in the DB take precedence over `endpoint`.
    db_endpoints = self._get_api_endpoints_db()
    self.endpoints = db_endpoints or [endpoint]
    if db_endpoints is None:
        # Nothing stored yet: seed the DB with the endpoint given by the caller.
        self._update_api_endpoints_db(self.endpoints)
    self.api_proxy = api_proxy
    self.username = username
    self.password = password
    self.cacert = cacert
    self.loop = loop or asyncio.get_event_loop()
    self.n2vc = n2vc

    # Generate config for models
    self.model_config = {}
    if apt_mirror:
        self.model_config["apt-mirror"] = apt_mirror
    self.model_config["enable-os-refresh-update"] = enable_os_upgrade
    self.model_config["enable-os-upgrade"] = enable_os_upgrade

    # Route exceptions reported by the event loop to handle_exception().
    self.loop.set_exception_handler(self.handle_exception)
    # Lock used by add_model() around model creation.
    self.creating_model = asyncio.Lock(loop=self.loop)

    # Names of the models created through add_model().
    self.models = set()
    self.log.debug("Libjuju initialized!")

    # Background task; cancelled in disconnect().
    self.health_check_task = self.loop.create_task(self.health_check())
108
async def get_controller(self, timeout: float = 5.0) -> Controller:
    """
    Connect to the Juju controller and return the connected object

    If the controller reports API endpoints different from the ones known
    locally, the new list is stored in memory and in the database.

    :param: timeout: Time in seconds to wait for controller to connect

    :return: Connected juju.controller.Controller
    :raises JujuControllerFailedConnecting: on any failure except cancellation
    """
    controller = None
    try:
        controller = Controller(loop=self.loop)
        connecting = controller.connect(
            endpoint=self.endpoints,
            username=self.username,
            password=self.password,
            cacert=self.cacert,
        )
        await asyncio.wait_for(connecting, timeout=timeout)

        reported_endpoints = await controller.api_endpoints
        if self.endpoints != reported_endpoints:
            self.endpoints = reported_endpoints
            self._update_api_endpoints_db(self.endpoints)
        return controller
    except asyncio.CancelledError:
        # Cancellation is propagated untouched
        raise
    except Exception as error:
        self.log.error(
            "Failed connecting to controller: {}...".format(self.endpoints)
        )
        # Do not leave a half-open connection behind
        if controller:
            await self.disconnect_controller(controller)
        raise JujuControllerFailedConnecting(error)
141
async def disconnect(self):
    """Cancel the background health check task and log the disconnection"""
    health_check = self.health_check_task
    health_check.cancel()
    self.log.debug("Libjuju disconnected!")
147
async def disconnect_model(self, model: Model):
    """
    Close the connection held by a model object

    :param: model: Model that will be disconnected
    """
    await model.disconnect()
155
async def disconnect_controller(self, controller: Controller):
    """
    Close the connection held by a controller object

    :param: controller: Controller that will be disconnected
    """
    await controller.disconnect()
163
async def add_model(self, model_name: str, cloud_name: str, credential_name=None):
    """
    Create model

    :param: model_name: Model name
    :param: cloud_name: Cloud name
    :param: credential_name: Credential name to use for adding the model
                             If not specified, same name as the cloud will be used.

    :raises JujuModelAlreadyExists: if the controller already lists the model
    """

    # Get controller
    controller = await self.get_controller()
    model = None
    try:
        # Raise exception if model already exists
        if await self.model_exists(model_name, controller=controller):
            raise JujuModelAlreadyExists(
                "Model {} already exists.".format(model_name)
            )

        # Model creation is serialized. The "already created" check is done
        # while holding the lock, so two tasks that were waiting for the same
        # model cannot both get past the check and create it twice.
        async with self.creating_model:
            # Another task created it while this one was waiting
            if model_name in self.models:
                return

            self.log.debug("Creating model {}".format(model_name))
            model = await controller.add_model(
                model_name,
                config=self.model_config,
                cloud_name=cloud_name,
                credential_name=credential_name or cloud_name,
            )
            self.models.add(model_name)
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
206
async def get_model(
    self, controller: Controller, model_name: str, id=None
) -> Model:
    """
    Ask a connected controller for one of its models

    :param: controller: Controller
    :param: model_name: Model name
    :param: id: Not used by this method

    :return: Model: The Juju model object returned by the controller
    """
    model = await controller.get_model(model_name)
    return model
219
async def model_exists(
    self, model_name: str, controller: Controller = None
) -> bool:
    """
    Tell whether the controller lists a model with the given name

    :param: model_name: Model name
    :param: controller: Controller. If not given, a temporary connection
                        is opened and closed by this method.

    :return bool
    """
    # Only a controller opened here is closed here
    owns_controller = not controller
    if owns_controller:
        controller = await self.get_controller()

    try:
        known_models = await controller.list_models()
        return model_name in known_models
    finally:
        if owns_controller:
            await self.disconnect_controller(controller)
244
async def models_exist(self, model_names: [str]) -> (bool, list):
    """
    Check if models exists

    :param: model_names: List of strings with model names

    :return (bool, list[str]): (True if all models exists, List of model names
                                that don't exist, without duplicates and in the
                                order they appear in model_names)
    :raises Exception: if model_names is empty
    """
    if not model_names:
        raise Exception(
            "model_names must be a non-empty array. Given value: {}".format(
                model_names
            )
        )
    existing_models = set(await self.list_models())
    # dict.fromkeys() removes duplicates and keeps the caller's order, so the
    # result is deterministic (set arithmetic gives an arbitrary order).
    non_existing_models = [
        name for name in dict.fromkeys(model_names) if name not in existing_models
    ]

    return (
        not non_existing_models,
        non_existing_models,
    )
268
async def get_model_status(self, model_name: str) -> FullStatus:
    """
    Get model status

    :param: model_name: Model name

    :return: Full status object
    """
    controller = await self.get_controller()
    # The controller is released even when the model cannot be obtained
    try:
        model = await self.get_model(controller, model_name)
        try:
            return await model.get_status()
        finally:
            await self.disconnect_model(model)
    finally:
        await self.disconnect_controller(controller)
284
async def create_machine(
    self,
    model_name: str,
    machine_id: str = None,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    series: str = "xenial",
    wait: bool = True,
) -> (Machine, bool):
    """
    Get an existing machine of the model, or add a new one

    When machine_id is given the machine is looked up in the model;
    otherwise a new machine is added and, if requested, waited for.

    :param: model_name: Model name
    :param: machine_id: Machine id
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: series: Series of the machine (xenial, bionic, focal, ...)
    :param: wait: Wait until machine is ready

    :return: (juju.machine.Machine, bool): Machine object and a boolean saying
                                           if the machine is new or it already existed
    :raises JujuMachineNotFound: if machine_id is given but is not in the model
    """
    self.log.debug(
        "Creating machine (id={}) in model: {}".format(machine_id, model_name)
    )

    controller = await self.get_controller()
    model = await self.get_model(controller, model_name)

    machine = None
    is_new = False
    try:
        if machine_id is not None:
            self.log.debug(
                "Searching machine (id={}) in model {}".format(
                    machine_id, model_name
                )
            )

            # Look the requested machine up among the machines of the model
            model_machines = await model.get_machines()
            if machine_id not in model_machines:
                raise JujuMachineNotFound("Machine {} not found".format(machine_id))
            self.log.debug(
                "Machine (id={}) found in model {}".format(machine_id, model_name)
            )
            machine = model_machines[machine_id]

        if machine is None:
            self.log.debug("Creating a new machine in model {}".format(model_name))

            machine = await model.add_machine(
                spec=None, constraints=None, disks=None, series=series
            )
            is_new = True

            self.log.debug(
                "Wait until machine {} is ready in model {}".format(
                    machine.entity_id, model_name
                )
            )
            if wait:
                await JujuModelWatcher.wait_for(
                    model=model,
                    entity=machine,
                    progress_timeout=progress_timeout,
                    total_timeout=total_timeout,
                    db_dict=db_dict,
                    n2vc=self.n2vc,
                )
    finally:
        await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    self.log.debug(
        "Machine {} ready at {} in model {}".format(
            machine.entity_id, machine.dns_name, model_name
        )
    )
    return machine, is_new
375
async def provision_machine(
    self,
    model_name: str,
    hostname: str,
    username: str,
    private_key_path: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
) -> str:
    """
    Manually provisioning of a machine

    :param: model_name: Model name
    :param: hostname: IP to access the machine
    :param: username: Username to login to the machine
    :param: private_key_path: Local path for the private key
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active

    :return: (Entity): Machine id
    :raises ValueError: if the controller reports an error adding the machine
    :raises JujuMachineNotFound: if the machine does not show up in the model
    """
    self.log.debug(
        "Provisioning machine. model: {}, hostname: {}, username: {}".format(
            model_name, hostname, username
        )
    )

    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model (inside the try, so the controller is released on failure)
        model = await self.get_model(controller, model_name)

        # Get provisioner
        provisioner = AsyncSSHProvisioner(
            host=hostname,
            user=username,
            private_key_path=private_key_path,
            log=self.log,
        )

        # Provision machine
        params = await provisioner.provision_machine()

        params.jobs = ["JobHostUnits"]

        self.log.debug("Adding machine to model")
        connection = model.connection()
        client_facade = client.ClientFacade.from_connection(connection)

        results = await client_facade.AddMachines(params=[params])
        error = results.machines[0].error

        if error:
            msg = "Error adding machine: {}".format(error.message)
            self.log.error(msg=msg)
            raise ValueError(msg)

        machine_id = results.machines[0].machine

        self.log.debug("Installing Juju agent into machine {}".format(machine_id))
        # Scheduled on the event loop; it is not awaited here
        asyncio.ensure_future(
            provisioner.install_agent(
                connection=connection,
                nonce=params.nonce,
                machine_id=machine_id,
                proxy=self.api_proxy,
            )
        )

        # Poll (10 attempts, 2 seconds apart) until the model lists the machine
        machine = None
        for _ in range(10):
            machine_list = await model.get_machines()
            if machine_id in machine_list:
                self.log.debug("Machine {} found in model!".format(machine_id))
                machine = model.machines.get(machine_id)
                break
            await asyncio.sleep(2)

        if machine is None:
            msg = "Machine {} not found in model".format(machine_id)
            self.log.error(msg=msg)
            raise JujuMachineNotFound(msg)

        self.log.debug(
            "Wait until machine {} is ready in model {}".format(
                machine.entity_id, model_name
            )
        )
        await JujuModelWatcher.wait_for(
            model=model,
            entity=machine,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
        )
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    self.log.debug(
        "Machine provisioned {} in model {}".format(machine_id, model_name)
    )

    return machine_id
487
async def deploy_charm(
    self,
    application_name: str,
    path: str,
    model_name: str,
    machine_id: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    config: dict = None,
    series: str = None,
    num_units: int = 1,
):
    """Deploy a charm as a new application and wait until it is ready

    The first unit is placed on machine_id. For every additional unit a
    new machine is created and the unit is added to it.

    :param: application_name: Application name
    :param: path: Local path to the charm
    :param: model_name: Model name
    :param: machine_id ID of the machine
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: config: Config for the charm
    :param: series: Series of the charm (replaced by the series of the
                    machine when machine_id is given)
    :param: num_units: Number of units

    :return: (juju.application.Application): Juju application
    :raises JujuApplicationExists: if the application is already in the model
    :raises JujuMachineNotFound: if machine_id is not in the model
    """
    self.log.debug(
        "Deploying charm {} to machine {} in model ~{}".format(
            application_name, machine_id, model_name
        )
    )
    self.log.debug("charm: {}".format(path))

    controller = await self.get_controller()
    model = await self.get_model(controller, model_name)

    try:
        application = None
        # Nothing to deploy when the application is already there
        if application_name in model.applications:
            raise JujuApplicationExists(
                "Application {} exists".format(application_name)
            )

        if machine_id is not None:
            if machine_id not in model.machines:
                msg = "Machine {} not found in model".format(machine_id)
                self.log.error(msg=msg)
                raise JujuMachineNotFound(msg)
            # The charm is deployed with the series of the target machine
            target_machine = model.machines[machine_id]
            series = target_machine.series

        application = await model.deploy(
            entity_url=path,
            application_name=application_name,
            channel="stable",
            num_units=1,
            series=series,
            to=machine_id,
            config=config,
        )

        self.log.debug(
            "Wait until application {} is ready in model {}".format(
                application_name, model_name
            )
        )
        if num_units > 1:
            # One new machine per additional unit
            for _ in range(num_units - 1):
                extra_machine, _ = await self.create_machine(model_name, wait=False)
                await application.add_unit(to=extra_machine.entity_id)

        await JujuModelWatcher.wait_for(
            model=model,
            entity=application,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
        )
        self.log.debug(
            "Application {} is ready in model {}".format(
                application_name, model_name
            )
        )
    finally:
        await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return application
583
def _get_application(self, model: Model, application_name: str) -> Application:
    """Look an application up by name in a model

    :param: model: Model object
    :param: application_name: Application name

    :return: juju.application.Application (or None if it doesn't exist)
    """
    if not model.applications or application_name not in model.applications:
        return None
    return model.applications[application_name]
594
async def execute_action(
    self,
    application_name: str,
    model_name: str,
    action_name: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    **kwargs
):
    """Execute action

    The action is run on the leader unit of the application.

    :param: application_name: Application name
    :param: model_name: Model name
    :param: action_name: Name of the action
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: kwargs: Parameters passed to the action

    :return: (str, str): (output and status)
    :raises JujuApplicationNotFound: if the application is not in the model
    :raises JujuLeaderUnitNotFound: if no leader unit is found after retrying
    :raises JujuActionNotFound: if the application does not offer the action
    """
    self.log.debug(
        "Executing action {} using params {}".format(action_name, kwargs)
    )
    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model (inside the try, so the controller is released on failure)
        model = await self.get_model(controller, model_name)

        # Get application
        application = self._get_application(
            model, application_name=application_name,
        )
        if application is None:
            raise JujuApplicationNotFound("Cannot execute action")

        # Get leader unit
        # Race condition:
        # Occasionally, self._get_leader_unit() will return None
        # because the leader elected hook has not been triggered yet.
        # Therefore, we are doing some retries. If it happens again,
        # re-open bug 1236
        attempts = 3
        time_between_retries = 10
        unit = None
        for attempt in range(attempts):
            unit = await self._get_leader_unit(application)
            if unit is not None:
                break
            # No point in waiting after the last attempt
            if attempt < attempts - 1:
                await asyncio.sleep(time_between_retries)
        if unit is None:
            raise JujuLeaderUnitNotFound(
                "Cannot execute action: leader unit not found"
            )

        actions = await application.get_actions()

        if action_name not in actions:
            raise JujuActionNotFound(
                "Action {} not in available actions".format(action_name)
            )

        action = await unit.run_action(action_name, **kwargs)

        self.log.debug(
            "Wait until action {} is completed in application {} (model={})".format(
                action_name, application_name, model_name
            )
        )
        await JujuModelWatcher.wait_for(
            model=model,
            entity=action,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
        )

        output = await model.get_action_output(action_uuid=action.entity_id)
        status = await model.get_action_status(uuid_or_prefix=action.entity_id)
        # An action missing from the status report is reported as failed
        status = (
            status[action.entity_id] if action.entity_id in status else "failed"
        )

        self.log.debug(
            "Action {} completed with status {} in application {} (model={})".format(
                action_name, action.status, application_name, model_name
            )
        )
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return output, status
692
async def get_actions(self, application_name: str, model_name: str) -> dict:
    """Get list of actions

    :param: application_name: Application name
    :param: model_name: Model name

    :return: Dict with this format
        {
            "action_name": "Description of the action",
            ...
        }
    :raises JujuApplicationNotFound: if the application is not in the model
    """
    self.log.debug(
        "Getting list of actions for application {}".format(application_name)
    )

    # Get controller
    controller = await self.get_controller()

    # The controller is released even when the model cannot be obtained
    try:
        # Get model
        model = await self.get_model(controller, model_name)
        try:
            # Get application
            application = self._get_application(
                model, application_name=application_name,
            )
            if application is None:
                # Explicit error instead of an AttributeError on None
                raise JujuApplicationNotFound(
                    "Cannot get actions: application {} not found".format(
                        application_name
                    )
                )

            # Return list of actions
            return await application.get_actions()
        finally:
            await self.disconnect_model(model)
    finally:
        await self.disconnect_controller(controller)
728
async def get_metrics(self, model_name: str, application_name: str) -> dict:
    """Get the metrics collected by the VCA.

    :param model_name The name or unique id of the network service
    :param application_name The name of the application

    :return: Metrics of the application, or an empty dict if the
             application is not in the model
    :raises Exception: if model_name or application_name is empty
    """
    if not model_name or not application_name:
        raise Exception("model_name and application_name must be non-empty strings")
    metrics = {}
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        try:
            application = self._get_application(model, application_name)
            if application is not None:
                metrics = await application.get_metrics()
        finally:
            # Both disconnects are coroutines: they only run when awaited
            await self.disconnect_model(model)
    finally:
        await self.disconnect_controller(controller)
    return metrics
748
async def add_relation(
    self, model_name: str, endpoint_1: str, endpoint_2: str,
):
    """Relate two endpoints of a model

    API errors whose message contains "not found" or "already exists"
    are logged as warnings and not raised.

    :param: model_name: Model name
    :param: endpoint_1 First endpoint name
                       ("app:endpoint" format or directly the saas name)
    :param: endpoint_2: Second endpoint name (^ same format)

    :raises JujuAPIError: for any other API error
    """

    self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))

    controller = await self.get_controller()
    model = await self.get_model(controller, model_name)

    # (text searched in the error message, warning written to the log)
    tolerated_errors = (
        ("not found", "Relation not found: {}"),
        ("already exists", "Relation already exists: {}"),
    )
    try:
        await model.add_relation(endpoint_1, endpoint_2)
    except JujuAPIError as error:
        for marker, warning in tolerated_errors:
            if marker in error.message:
                self.log.warning(warning.format(error.message))
                return
        # another exception, raise it
        raise error
    finally:
        await self.disconnect_model(model)
        await self.disconnect_controller(controller)
783
async def consume(
    self, offer_url: str, model_name: str,
):
    """
    Make a remote offer available in a model, so that relations to it can
    be created later (as with "juju relate").

    :param: offer_url: Offer Url
    :param: model_name: Model name

    :raises ParseError if there's a problem parsing the offer_url
    :raises JujuError if remote offer includes and endpoint
    :raises JujuAPIError if the operation is not successful
    """
    controller = await self.get_controller()
    target_model = await controller.get_model(model_name)

    try:
        await target_model.consume(offer_url)
    finally:
        await self.disconnect_model(target_model)
        await self.disconnect_controller(controller)
805
async def destroy_model(self, model_name: str, total_timeout: float):
    """
    Destroy model

    The machines of the model are destroyed first (best effort), then the
    model itself, and finally the controller is polled until the model is
    no longer listed.

    :param: model_name: Model name
    :param: total_timeout: Timeout in seconds (3600 is used if None)

    :raises Exception: if the model is still listed when the timeout expires
    """
    # Applied up front so that machine destruction gets a usable timeout too
    if total_timeout is None:
        total_timeout = 3600

    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        self.log.debug("Destroying model {}".format(model_name))
        uuid = model.info.uuid

        # Destroy machines
        machines = await model.get_machines()
        for machine_id in machines:
            try:
                await self.destroy_machine(
                    model, machine_id=machine_id, total_timeout=total_timeout,
                )
            except asyncio.CancelledError:
                raise
            except Exception as e:
                # Best effort: a machine that cannot be destroyed must not
                # stop the model from being destroyed, but leave a trace.
                self.log.warning(
                    "Error destroying machine {} in model {}: {}".format(
                        machine_id, model_name, e
                    )
                )

        # Disconnect model (cleared first, so the finally does not repeat it)
        connected_model, model = model, None
        await self.disconnect_model(connected_model)

        # Destroy model
        if model_name in self.models:
            self.models.remove(model_name)

        await controller.destroy_model(uuid)

        # Wait until model is destroyed
        self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
        last_exception = ""

        end = time.time() + total_timeout
        while time.time() < end:
            try:
                models = await controller.list_models()
                if model_name not in models:
                    self.log.debug(
                        "The model {} ({}) was destroyed".format(model_name, uuid)
                    )
                    return
            except asyncio.CancelledError:
                raise
            except Exception as e:
                # Kept to be reported if the timeout expires
                last_exception = e
            await asyncio.sleep(5)
        raise Exception(
            "Timeout waiting for model {} to be destroyed {}".format(
                model_name, last_exception
            )
        )
    finally:
        # Only reached with a model when something failed before the
        # explicit disconnection above
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
868
async def destroy_application(self, model: Model, application_name: str):
    """
    Destroy an application of a model, if the model has it

    When the application is not found only a warning is logged.

    :param: model: Model object
    :param: application_name: Application name
    """
    self.log.debug(
        "Destroying application {} in model {}".format(
            application_name, model.info.name
        )
    )
    application = model.applications.get(application_name)
    if not application:
        self.log.warning("Application not found: {}".format(application_name))
        return
    await application.destroy()
886
async def destroy_machine(
    self, model: Model, machine_id: str, total_timeout: float = 3600
):
    """
    Destroy machine

    The destruction is forced, and the model is polled every 0.5 seconds
    until the machine is no longer listed or the timeout expires.

    :param: model: Model object
    :param: machine_id: Machine id
    :param: total_timeout: Timeout in seconds (3600 is used if None)
    """
    machines = await model.get_machines()
    if machine_id not in machines:
        self.log.debug("Machine not found: {}".format(machine_id))
        return

    # An explicit None would break the deadline computation below, after
    # the destruction had already been requested.
    if total_timeout is None:
        total_timeout = 3600

    machine = machines[machine_id]
    await machine.destroy(force=True)
    # max timeout
    end = time.time() + total_timeout

    # wait for machine removal
    machines = await model.get_machines()
    while machine_id in machines and time.time() < end:
        self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
        await asyncio.sleep(0.5)
        machines = await model.get_machines()

    if machine_id in machines:
        # The timeout expired: do not report a destruction that did not happen
        self.log.warning(
            "Timeout waiting for machine {} to be destroyed".format(machine_id)
        )
    else:
        self.log.debug("Machine destroyed: {}".format(machine_id))
913
async def configure_application(
    self, model_name: str, application_name: str, config: dict = None
):
    """Configure application

    Nothing is done (no connection is opened) when config is empty.

    :param: model_name: Model name
    :param: application_name: Application name
    :param: config: Config to apply to the charm

    :raises JujuApplicationNotFound: if the application is not in the model
    """
    self.log.debug("Configuring application {}".format(application_name))

    if not config:
        return

    # Each resource is released only if it was actually obtained, so a
    # connection failure is reported as such and not hidden by the cleanup.
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        try:
            application = self._get_application(
                model, application_name=application_name,
            )
            if application is None:
                raise JujuApplicationNotFound(
                    "Cannot configure application: {} not found".format(
                        application_name
                    )
                )
            await application.set_config(config)
        finally:
            await self.disconnect_model(model)
    finally:
        await self.disconnect_controller(controller)
936
def _get_api_endpoints_db(self) -> [str]:
    """
    Read the API endpoints stored in the database

    :return: List of API endpoints, or None if nothing is stored
    """
    self.log.debug("Getting endpoints from database")

    juju_info = self.db.get_one(
        DB_DATA.api_endpoints.table,
        q_filter=DB_DATA.api_endpoints.filter,
        fail_on_empty=False,
    )
    if not juju_info or DB_DATA.api_endpoints.key not in juju_info:
        return None
    return juju_info[DB_DATA.api_endpoints.key]
952
def _update_api_endpoints_db(self, endpoints: [str]):
    """
    Store the API endpoints in the database, creating the record if needed

    :param: endpoints: List of endpoints
    """
    self.log.debug("Saving endpoints {} in database".format(endpoints))

    def read_record():
        # Current record, or a falsy value when there is none
        return self.db.get_one(
            DB_DATA.api_endpoints.table,
            q_filter=DB_DATA.api_endpoints.filter,
            fail_on_empty=False,
        )

    # If the record doesn't exist, then create it
    if not read_record():
        try:
            self.db.create(
                DB_DATA.api_endpoints.table, DB_DATA.api_endpoints.filter,
            )
        except DbException as error:
            # Race condition: check if another N2VC worker has created it
            if not read_record():
                raise error
    self.db.set_one(
        DB_DATA.api_endpoints.table,
        DB_DATA.api_endpoints.filter,
        {DB_DATA.api_endpoints.key: endpoints},
    )
986
def handle_exception(self, loop, context):
    """
    Event loop exception handler (installed with loop.set_exception_handler)

    Exceptions reported by the loop are not propagated; they are written to
    the log at debug level so that they can still be diagnosed.

    :param: loop: Event loop reporting the exception
    :param: context: Dict describing the error ("message" and, optionally,
                     "exception" among other keys)
    """
    # All unhandled exceptions by libjuju are handled here.
    self.log.debug(
        "Unhandled exception in event loop: {} ({!r})".format(
            context.get("message"), context.get("exception")
        )
    )
990
async def health_check(self, interval: float = 300.0):
    """
    Health check to make sure the connection to the controller is OK

    Runs forever: connects to the controller, disconnects, and sleeps.
    Connection failures are logged and do not stop the loop.

    :param: interval: Time in seconds between checks
    """
    while True:
        # Reset on every iteration: after a failed connection there is
        # nothing to disconnect (and no controller from a previous
        # iteration must be disconnected again).
        controller = None
        try:
            controller = await self.get_controller()
            # self.log.debug("VCA is alive")
        except Exception as e:
            self.log.error("Health check to VCA failed: {}".format(e))
        finally:
            if controller is not None:
                await self.disconnect_controller(controller)
        await asyncio.sleep(interval)
1006
async def list_models(self, contains: str = None) -> [str]:
    """List the models of the controller, optionally filtered by name

    :param: contains: String that is contained in model name

    :return: [models] Returns list of model names
    """

    controller = await self.get_controller()
    try:
        model_names = await controller.list_models()
        if not contains:
            return model_names
        return [name for name in model_names if contains in name]
    finally:
        await self.disconnect_controller(controller)
1023
async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
    """List the application offers of a model

    :param: model_name: Model name

    :return: Returns list of offers
    """

    controller = await self.get_controller()
    try:
        offers = await controller.list_offers(model_name)
        return offers
    finally:
        await self.disconnect_controller(controller)
1037
async def add_k8s(
    self,
    name: str,
    configuration: Configuration,
    storage_class: str,
    credential_name: str = None,
):
    """
    Register a Kubernetes cluster as a cloud in the controller

    Similar to the `juju add-k8s` command in the CLI

    :param: name: Name for the K8s cloud
    :param: configuration: Kubernetes configuration object
    :param: storage_class: Storage Class to use in the cloud
    :param: credential_name: Name for the credential added with the cloud
                             (passed through to add_cloud)

    :return: Result of add_cloud()
    :raises Exception: if storage_class, name or configuration is empty
    """

    # Validated in this order; the first empty value is reported
    required_arguments = (
        (storage_class, "storage_class must be a non-empty string"),
        (name, "name must be a non-empty string"),
        (configuration, "configuration must be provided"),
    )
    for value, message in required_arguments:
        if not value:
            raise Exception(message)

    endpoint = configuration.host
    credential = self.get_k8s_cloud_credential(configuration)

    ca_certificates = []
    if "ClientCertificateData" in credential.attrs:
        ca_certificates.append(credential.attrs["ClientCertificateData"])

    # The same storage class is used for operator and workload storage
    storage_config = {
        "operator-storage": storage_class,
        "workload-storage": storage_class,
    }
    cloud = client.Cloud(
        type_="kubernetes",
        auth_types=[credential.auth_type],
        endpoint=endpoint,
        ca_certificates=ca_certificates,
        config=storage_config,
    )

    return await self.add_cloud(
        name, cloud, credential, credential_name=credential_name
    )
1084
def get_k8s_cloud_credential(
    self, configuration: Configuration,
) -> client.CloudCredential:
    """
    Build a Juju cloud credential from a Kubernetes configuration

    :param: configuration: Kubernetes configuration object

    :return: client.CloudCredential with the auth type and attributes
             derived from the configuration
    :raises JujuInvalidK8sConfiguration: if the configuration is inconsistent
                                         or the authentication method is
                                         not supported
    """
    attrs = {}
    ca_cert = configuration.ssl_ca_cert or configuration.cert_file
    key = configuration.key_file
    api_key = configuration.api_key
    token = None
    username = configuration.username
    password = configuration.password

    if "authorization" in api_key:
        authorization = api_key["authorization"]
        if "Bearer " in authorization:
            # Expected format: "Bearer <token>"
            bearer_list = authorization.split(" ")
            if len(bearer_list) == 2:
                [_, token] = bearer_list
            else:
                raise JujuInvalidK8sConfiguration("unknown format of api_key")
        else:
            token = authorization
    # Files are read inside "with" blocks so the handles are always closed
    if ca_cert:
        with open(ca_cert, "r") as ca_cert_file:
            attrs["ClientCertificateData"] = ca_cert_file.read()
    if key:
        with open(key, "r") as key_file:
            attrs["ClientKeyData"] = key_file.read()
    if token:
        if username or password:
            raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
        attrs["Token"] = token

    auth_type = None
    if key:
        auth_type = "oauth2"
        if not token:
            raise JujuInvalidK8sConfiguration(
                "missing token for auth type {}".format(auth_type)
            )
    elif username:
        if not password:
            self.log.debug(
                "credential for user {} has empty password".format(username)
            )
        attrs["username"] = username
        attrs["password"] = password
        if ca_cert:
            auth_type = "userpasswithcert"
        else:
            auth_type = "userpass"
    elif ca_cert and token:
        auth_type = "certificate"
    else:
        raise JujuInvalidK8sConfiguration("authentication method not supported")
    return client.CloudCredential(auth_type=auth_type, attrs=attrs,)
1138
async def add_cloud(
    self,
    name: str,
    cloud: Cloud,
    credential: CloudCredential = None,
    credential_name: str = None,
) -> Cloud:
    """
    Register a cloud (and, optionally, a credential for it) in the controller

    :param: name: Name of the cloud to be added
    :param: cloud: Cloud object
    :param: credential: CloudCredentials object for the cloud
    :param: credential_name: Credential name.
                             If not defined, the name of the cloud will be used.

    :return: The cloud object received as argument
    """
    controller = await self.get_controller()
    try:
        await controller.add_cloud(name, cloud)
        if credential:
            credential_id = credential_name or name
            await controller.add_credential(
                credential_id, credential=credential, cloud=name
            )
        # Need to return the object returned by the controller.add_cloud() function
        # I'm returning the original value now until this bug is fixed:
        #   https://github.com/juju/python-libjuju/issues/443
        return cloud
    finally:
        await self.disconnect_controller(controller)
1168
async def remove_cloud(self, name: str):
    """
    Remove a cloud from the controller

    :param: name: Name of the cloud to be removed
    """
    juju_controller = await self.get_controller()
    try:
        await juju_controller.remove_cloud(name)
    finally:
        await self.disconnect_controller(juju_controller)
1180
async def _get_leader_unit(self, application: Application) -> Unit:
    """
    Find the leader among the units of an application

    :param: application: Application object

    :return: The first unit reporting itself as leader, or None if none does
    """
    for candidate in application.units:
        if await candidate.is_leader_from_status():
            return candidate
    return None