8d1fdaddbfe8dd60e4825b1c06e3247caaa2449a
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17 from juju.controller import Controller
18 from juju.client import client
19 import time
20
21 from juju.errors import JujuAPIError
22 from juju.model import Model
23 from juju.machine import Machine
24 from juju.application import Application
25 from juju.unit import Unit
26 from juju.client._definitions import (
27 FullStatus,
28 QueryApplicationOffersResults,
29 Cloud,
30 CloudCredential,
31 )
32 from n2vc.juju_watcher import JujuModelWatcher
33 from n2vc.provisioner import AsyncSSHProvisioner
34 from n2vc.n2vc_conn import N2VCConnector
35 from n2vc.exceptions import (
36 JujuMachineNotFound,
37 JujuApplicationNotFound,
38 JujuLeaderUnitNotFound,
39 JujuActionNotFound,
40 JujuModelAlreadyExists,
41 JujuControllerFailedConnecting,
42 JujuApplicationExists,
43 JujuInvalidK8sConfiguration,
44 )
45 from n2vc.utils import DB_DATA
46 from osm_common.dbbase import DbException
47 from kubernetes.client.configuration import Configuration
48
49
50 class Libjuju:
def __init__(
    self,
    endpoint: str,
    api_proxy: str,
    username: str,
    password: str,
    cacert: str,
    loop: asyncio.AbstractEventLoop = None,
    log: logging.Logger = None,
    db: dict = None,
    n2vc: N2VCConnector = None,
    apt_mirror: str = None,
    enable_os_upgrade: bool = True,
):
    """
    Set up the Libjuju helper and schedule its periodic health check

    :param: endpoint:           Endpoint of the juju controller (host:port)
    :param: api_proxy:          Endpoint of the juju controller - Reachable from the VNFs
    :param: username:           Juju username
    :param: password:           Juju password
    :param: cacert:             Juju CA Certificate
    :param: loop:               Asyncio loop (defaults to asyncio.get_event_loop())
    :param: log:                Logger (defaults to the "Libjuju" logger)
    :param: db:                 DB object used to read/store the API endpoints
    :param: n2vc:               N2VC object
    :param: apt_mirror:         APT Mirror
    :param: enable_os_upgrade:  Enable OS Upgrade
    """
    # The logger and the DB handle go first: the endpoint lookup needs both.
    self.log = log if log else logging.getLogger("Libjuju")
    self.db = db

    # Endpoints stored in the DB win over the one given by the caller.
    stored_endpoints = self._get_api_endpoints_db()
    self.endpoints = stored_endpoints if stored_endpoints else [endpoint]
    if stored_endpoints is None:
        self._update_api_endpoints_db(self.endpoints)

    self.api_proxy = api_proxy
    self.username = username
    self.password = password
    self.cacert = cacert
    self.loop = loop if loop else asyncio.get_event_loop()
    self.n2vc = n2vc

    # Configuration applied to every model created through add_model()
    model_config = {"apt-mirror": apt_mirror} if apt_mirror else {}
    model_config.update(
        {
            "enable-os-refresh-update": enable_os_upgrade,
            "enable-os-upgrade": enable_os_upgrade,
        }
    )
    self.model_config = model_config

    self.loop.set_exception_handler(self.handle_exception)
    # Lock taken by add_model() while a model is being created
    self.creating_model = asyncio.Lock(loop=self.loop)

    # Names of the models created through this object
    self.models = set()
    self.log.debug("Libjuju initialized!")

    self.health_check_task = self.loop.create_task(self.health_check())
108
async def get_controller(self, timeout: float = 5.0) -> Controller:
    """
    Open a new connection to the Juju controller

    If the endpoints reported by the controller differ from the stored ones,
    both self.endpoints and the database entry are refreshed.

    :param: timeout: Time in seconds to wait for controller to connect

    :return: Connected juju.controller.Controller
    :raises JujuControllerFailedConnecting if anything but a cancellation fails
    """
    controller = None
    try:
        controller = Controller(loop=self.loop)
        connecting = controller.connect(
            endpoint=self.endpoints,
            username=self.username,
            password=self.password,
            cacert=self.cacert,
        )
        await asyncio.wait_for(connecting, timeout=timeout)

        reported_endpoints = await controller.api_endpoints
        if self.endpoints != reported_endpoints:
            self.endpoints = reported_endpoints
            self._update_api_endpoints_db(self.endpoints)
        return controller
    except asyncio.CancelledError:
        # Cancellations are propagated untouched
        raise
    except Exception as error:
        self.log.error(
            "Failed connecting to controller: {}...".format(self.endpoints)
        )
        # Do not leave a half-open connection behind
        if controller:
            await self.disconnect_controller(controller)
        raise JujuControllerFailedConnecting(error)
141
async def disconnect(self):
    """Cancel the periodic health-check task and log the disconnection"""
    health_task = self.health_check_task
    health_task.cancel()
    self.log.debug("Libjuju disconnected!")
147
async def disconnect_model(self, model: Model):
    """
    Close the connection held by a model object

    :param: model: Model that will be disconnected
    """
    closing = model.disconnect()
    await closing
155
async def disconnect_controller(self, controller: Controller):
    """
    Close the connection held by a controller object

    :param: controller: Controller that will be disconnected
    """
    closing = controller.disconnect()
    await closing
163
async def add_model(self, model_name: str, cloud_name: str, credential_name=None):
    """
    Create a model in the controller

    Nothing is returned: the model object obtained from the controller is
    disconnected before leaving.

    :param: model_name:         Model name
    :param: cloud_name:         Cloud name
    :param: credential_name:    Credential name to use for adding the model
                                If not specified, same name as the cloud will be used.

    :raises JujuModelAlreadyExists if the controller already lists the model
    """
    controller = await self.get_controller()
    created_model = None
    try:
        # The controller is the source of truth for already existing models
        already_there = await self.model_exists(model_name, controller=controller)
        if already_there:
            raise JujuModelAlreadyExists(
                "Model {} already exists.".format(model_name)
            )

        # Wait while another coroutine is in the middle of creating a model
        while self.creating_model.locked():
            await asyncio.sleep(0.1)

        # Somebody else created it while we were waiting: nothing left to do
        if model_name in self.models:
            return

        async with self.creating_model:
            self.log.debug("Creating model {}".format(model_name))
            created_model = await controller.add_model(
                model_name,
                config=self.model_config,
                cloud_name=cloud_name,
                credential_name=credential_name or cloud_name,
            )
            self.models.add(model_name)
    finally:
        if created_model:
            await self.disconnect_model(created_model)
        await self.disconnect_controller(controller)
206
async def get_model(
    self, controller: Controller, model_name: str, id=None
) -> Model:
    """
    Fetch a model object through an already connected controller

    :param: controller: Controller
    :param: model_name: Model name
    :param: id:         Not used by this method

    :return: Model: The Juju model object returned by the controller
    """
    model = await controller.get_model(model_name)
    return model
219
async def model_exists(
    self, model_name: str, controller: Controller = None
) -> bool:
    """
    Tell whether the controller lists a model with the given name

    :param: model_name: Model name
    :param: controller: Controller to query. When omitted, a temporary
                        connection is opened and closed by this method.

    :return bool
    """
    # Only close the connection when this method is the one that opened it
    owns_connection = not controller
    if owns_connection:
        controller = await self.get_controller()

    try:
        known_models = await controller.list_models()
        return model_name in known_models
    finally:
        if owns_connection:
            await self.disconnect_controller(controller)
244
async def models_exist(self, model_names: [str]) -> (bool, list):
    """
    Check if models exists

    :param: model_names: List of strings with model names

    :return (bool, list[str]): (True if all models exists, List of model names
                                that don't exist, without duplicates and in the
                                order they were given)

    :raises Exception if model_names is empty
    """
    if not model_names:
        raise Exception(
            "model_names must be a non-empty array. Given value: {}".format(
                model_names
            )
        )
    existing_models = set(await self.list_models())
    # dict.fromkeys() drops duplicates while keeping the caller's order, so
    # the result is deterministic (plain set arithmetic is not).
    non_existing_models = [
        name for name in dict.fromkeys(model_names) if name not in existing_models
    ]

    return (
        not non_existing_models,
        non_existing_models,
    )
268
async def get_model_status(self, model_name: str) -> FullStatus:
    """
    Get model status

    :param: model_name: Model name

    :return: Full status object
    """
    controller = await self.get_controller()
    model = None
    try:
        # Fetched inside the try block so the controller connection is
        # released even when the model cannot be obtained.
        model = await self.get_model(controller, model_name)
        return await model.get_status()
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
284
async def create_machine(
    self,
    model_name: str,
    machine_id: str = None,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    series: str = "xenial",
    wait: bool = True,
) -> (Machine, bool):
    """
    Create machine

    When machine_id is given, the existing machine with that id is returned
    instead of creating a new one.

    :param: model_name:         Model name
    :param: machine_id:         Machine id
    :param: db_dict:            Dictionary with data of the DB to write the updates
    :param: progress_timeout:   Maximum time between two updates in the model
    :param: total_timeout:      Timeout for the entity to be active
    :param: series:             Series of the machine (xenial, bionic, focal, ...)
    :param: wait:               Wait until machine is ready

    :return: (juju.machine.Machine, bool):  Machine object and a boolean saying
                                            if the machine is new or it already existed

    :raises JujuMachineNotFound if machine_id is given but is not in the model
    """
    new = False
    machine = None

    self.log.debug(
        "Creating machine (id={}) in model: {}".format(machine_id, model_name)
    )

    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model. Done inside the try block so the controller connection
        # is released even when the model cannot be obtained.
        model = await self.get_model(controller, model_name)

        if machine_id is not None:
            self.log.debug(
                "Searching machine (id={}) in model {}".format(
                    machine_id, model_name
                )
            )

            # Get machines from model and get the machine with machine_id if exists
            machines = await model.get_machines()
            if machine_id not in machines:
                raise JujuMachineNotFound("Machine {} not found".format(machine_id))
            self.log.debug(
                "Machine (id={}) found in model {}".format(machine_id, model_name)
            )
            machine = machines[machine_id]

        if machine is None:
            self.log.debug("Creating a new machine in model {}".format(model_name))

            # Create machine
            machine = await model.add_machine(
                spec=None, constraints=None, disks=None, series=series
            )
            new = True

            # Wait until the machine is ready
            self.log.debug(
                "Wait until machine {} is ready in model {}".format(
                    machine.entity_id, model_name
                )
            )
            if wait:
                await JujuModelWatcher.wait_for(
                    model=model,
                    entity=machine,
                    progress_timeout=progress_timeout,
                    total_timeout=total_timeout,
                    db_dict=db_dict,
                    n2vc=self.n2vc,
                )
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    self.log.debug(
        "Machine {} ready at {} in model {}".format(
            machine.entity_id, machine.dns_name, model_name
        )
    )
    return machine, new
375
async def provision_machine(
    self,
    model_name: str,
    hostname: str,
    username: str,
    private_key_path: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
) -> str:
    """
    Manually provisioning of a machine

    :param: model_name:         Model name
    :param: hostname:           IP to access the machine
    :param: username:           Username to login to the machine
    :param: private_key_path:   Local path for the private key
    :param: db_dict:            Dictionary with data of the DB to write the updates
    :param: progress_timeout:   Maximum time between two updates in the model
    :param: total_timeout:      Timeout for the entity to be active

    :return: (str): Machine id

    :raises ValueError if the controller reports an error adding the machine
    :raises JujuMachineNotFound if the new machine never shows up in the model
    """
    self.log.debug(
        "Provisioning machine. model: {}, hostname: {}, username: {}".format(
            model_name, hostname, username
        )
    )

    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model. Done inside the try block so the controller connection
        # is released even when the model cannot be obtained.
        model = await self.get_model(controller, model_name)

        # Get provisioner
        provisioner = AsyncSSHProvisioner(
            host=hostname,
            user=username,
            private_key_path=private_key_path,
            log=self.log,
        )

        # Provision machine
        params = await provisioner.provision_machine()

        params.jobs = ["JobHostUnits"]

        self.log.debug("Adding machine to model")
        connection = model.connection()
        client_facade = client.ClientFacade.from_connection(connection)

        results = await client_facade.AddMachines(params=[params])
        error = results.machines[0].error

        if error:
            msg = "Error adding machine: {}".format(error.message)
            self.log.error(msg=msg)
            raise ValueError(msg)

        machine_id = results.machines[0].machine

        self.log.debug("Installing Juju agent into machine {}".format(machine_id))
        # The agent installation is scheduled in the background; it is not
        # awaited here.
        asyncio.ensure_future(
            provisioner.install_agent(
                connection=connection,
                nonce=params.nonce,
                machine_id=machine_id,
                proxy=self.api_proxy,
            )
        )

        # Poll (up to 10 times, 2 seconds apart) until the model lists the machine
        machine = None
        for _ in range(10):
            machine_list = await model.get_machines()
            if machine_id in machine_list:
                self.log.debug("Machine {} found in model!".format(machine_id))
                machine = model.machines.get(machine_id)
                break
            await asyncio.sleep(2)

        if machine is None:
            msg = "Machine {} not found in model".format(machine_id)
            self.log.error(msg=msg)
            raise JujuMachineNotFound(msg)

        self.log.debug(
            "Wait until machine {} is ready in model {}".format(
                machine.entity_id, model_name
            )
        )
        await JujuModelWatcher.wait_for(
            model=model,
            entity=machine,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
        )
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    self.log.debug(
        "Machine provisioned {} in model {}".format(machine_id, model_name)
    )

    return machine_id
487
async def deploy_charm(
    self,
    application_name: str,
    path: str,
    model_name: str,
    machine_id: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    config: dict = None,
    series: str = None,
    num_units: int = 1,
):
    """Deploy charm

    :param: application_name:   Application name
    :param: path:               Local path to the charm
    :param: model_name:         Model name
    :param: machine_id          ID of the machine
    :param: db_dict:            Dictionary with data of the DB to write the updates
    :param: progress_timeout:   Maximum time between two updates in the model
    :param: total_timeout:      Timeout for the entity to be active
    :param: config:             Config for the charm
    :param: series:             Series of the charm (replaced by the series of
                                the machine when machine_id is given)
    :param: num_units:          Number of units

    :return: (juju.application.Application): Juju application

    :raises JujuApplicationExists if the application is already in the model
    :raises JujuMachineNotFound if machine_id is given but is not in the model
    """
    self.log.debug(
        "Deploying charm {} to machine {} in model ~{}".format(
            application_name, machine_id, model_name
        )
    )
    self.log.debug("charm: {}".format(path))

    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model. Done inside the try block so the controller connection
        # is released even when the model cannot be obtained.
        model = await self.get_model(controller, model_name)

        if application_name in model.applications:
            raise JujuApplicationExists(
                "Application {} exists".format(application_name)
            )

        if machine_id is not None:
            if machine_id not in model.machines:
                msg = "Machine {} not found in model".format(machine_id)
                self.log.error(msg=msg)
                raise JujuMachineNotFound(msg)
            machine = model.machines[machine_id]
            series = machine.series

        # The first unit is deployed together with the application
        application = await model.deploy(
            entity_url=path,
            application_name=application_name,
            channel="stable",
            num_units=1,
            series=series,
            to=machine_id,
            config=config,
        )

        self.log.debug(
            "Wait until application {} is ready in model {}".format(
                application_name, model_name
            )
        )
        # Every additional unit gets a freshly created machine
        for _ in range(num_units - 1):
            unit_machine, _is_new = await self.create_machine(model_name, wait=False)
            await application.add_unit(to=unit_machine.entity_id)

        await JujuModelWatcher.wait_for(
            model=model,
            entity=application,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
        )
        self.log.debug(
            "Application {} is ready in model {}".format(
                application_name, model_name
            )
        )
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return application
583
def _get_application(self, model: Model, application_name: str) -> Application:
    """Look an application up in an already connected model

    :param: model:              Model object
    :param: application_name:   Application name

    :return: juju.application.Application (or None if it doesn't exist)
    """
    applications = model.applications
    if not applications or application_name not in applications:
        return None
    return applications[application_name]
594
async def execute_action(
    self,
    application_name: str,
    model_name: str,
    action_name: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    **kwargs
):
    """Execute action

    :param: application_name:   Application name
    :param: model_name:         Model name
    :param: action_name:        Name of the action
    :param: db_dict:            Dictionary with data of the DB to write the updates
    :param: progress_timeout:   Maximum time between two updates in the model
    :param: total_timeout:      Timeout for the entity to be active
    :param: kwargs:             Parameters passed to the action

    :return: (str, str): (output and status)

    :raises JujuApplicationNotFound if the application is not in the model
    :raises JujuLeaderUnitNotFound if no leader unit shows up after the retries
    :raises JujuActionNotFound if the application does not offer the action
    """
    self.log.debug(
        "Executing action {} using params {}".format(action_name, kwargs)
    )
    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model. Done inside the try block so the controller connection
        # is released even when the model cannot be obtained.
        model = await self.get_model(controller, model_name)

        # Get application
        application = self._get_application(
            model, application_name=application_name,
        )
        if application is None:
            raise JujuApplicationNotFound("Cannot execute action")

        # Get leader unit
        # Race condition:
        #   Occasionally, self._get_leader_unit() will return None
        #   because the leader elected hook has not been triggered yet.
        #   Therefore, we are doing some retries. If it happens again,
        #   re-open bug 1236
        attempts = 3
        time_between_retries = 10
        unit = None
        for _ in range(attempts):
            unit = await self._get_leader_unit(application)
            if unit is None:
                await asyncio.sleep(time_between_retries)
            else:
                break
        if unit is None:
            raise JujuLeaderUnitNotFound(
                "Cannot execute action: leader unit not found"
            )

        actions = await application.get_actions()

        if action_name not in actions:
            raise JujuActionNotFound(
                "Action {} not in available actions".format(action_name)
            )

        action = await unit.run_action(action_name, **kwargs)

        self.log.debug(
            "Wait until action {} is completed in application {} (model={})".format(
                action_name, application_name, model_name
            )
        )
        await JujuModelWatcher.wait_for(
            model=model,
            entity=action,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
        )

        output = await model.get_action_output(action_uuid=action.entity_id)
        status = await model.get_action_status(uuid_or_prefix=action.entity_id)
        # An action missing from the status report is considered failed
        status = (
            status[action.entity_id] if action.entity_id in status else "failed"
        )

        self.log.debug(
            "Action {} completed with status {} in application {} (model={})".format(
                action_name, action.status, application_name, model_name
            )
        )
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return output, status
692
async def get_actions(self, application_name: str, model_name: str) -> dict:
    """Get list of actions

    :param: application_name: Application name
    :param: model_name: Model name

    :return: Dict with this format
        {
            "action_name": "Description of the action",
            ...
        }

    :raises JujuApplicationNotFound if the application is not in the model
    """
    self.log.debug(
        "Getting list of actions for application {}".format(application_name)
    )

    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model. Done inside the try block so the controller connection
        # is released even when the model cannot be obtained.
        model = await self.get_model(controller, model_name)

        # Get application
        application = self._get_application(
            model, application_name=application_name,
        )
        if application is None:
            # Same error execute_action() raises, instead of an
            # AttributeError on None
            raise JujuApplicationNotFound(
                "Cannot get actions: application {} not found".format(
                    application_name
                )
            )

        # Return list of actions
        return await application.get_actions()

    finally:
        # Disconnect from model and controller
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
728
async def get_metrics(self, model_name: str, application_name: str) -> dict:
    """Get the metrics collected by the VCA.

    :param model_name: The name or unique id of the network service
    :param application_name: The name of the application

    :return: Whatever application.get_metrics() returns, or an empty dict
             when the application is not in the model

    :raises Exception if model_name or application_name is empty
    """
    if not model_name or not application_name:
        raise Exception("model_name and application_name must be non-empty strings")
    metrics = {}
    controller = await self.get_controller()
    model = None
    try:
        # Fetched inside the try block so the controller connection is
        # released even when the model cannot be obtained.
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)
        if application is not None:
            metrics = await application.get_metrics()
    finally:
        # The disconnect helpers are coroutines: without "await" they never
        # run and both connections stay open.
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return metrics
748
async def add_relation(
    self, model_name: str, endpoint_1: str, endpoint_2: str,
):
    """Add relation

    "not found" and "already exists" errors reported by the Juju API are
    logged as warnings and swallowed; any other error is raised.

    :param: model_name:     Model name
    :param: endpoint_1      First endpoint name
                            ("app:endpoint" format or directly the saas name)
    :param: endpoint_2:     Second endpoint name (^ same format)
    """

    self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))

    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model. Done inside the try block so the controller connection
        # is released even when the model cannot be obtained.
        model = await self.get_model(controller, model_name)

        # Add relation
        try:
            await model.add_relation(endpoint_1, endpoint_2)
        except JujuAPIError as e:
            if "not found" in e.message:
                self.log.warning("Relation not found: {}".format(e.message))
                return
            if "already exists" in e.message:
                self.log.warning("Relation already exists: {}".format(e.message))
                return
            # another exception, raise it
            raise
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
783
async def consume(
    self, offer_url: str, model_name: str,
):
    """
    Adds a remote offer to the model. Relations can be created later using "juju relate".

    :param: offer_url:      Offer Url
    :param: model_name:     Model name

    :raises ParseError if there's a problem parsing the offer_url
    :raises JujuError if remote offer includes an endpoint
    :raises JujuAPIError if the operation is not successful
    """
    controller = await self.get_controller()
    model = None
    try:
        # Fetched inside the try block (and through self.get_model(), like the
        # rest of the class) so the controller connection is released even
        # when the model cannot be obtained.
        model = await self.get_model(controller, model_name)
        await model.consume(offer_url)
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
805
async def destroy_model(self, model_name: str, total_timeout: float):
    """
    Destroy model

    :param: model_name:     Model name
    :param: total_timeout:  Seconds to wait for the model to disappear from
                            the controller (3600 when None)

    :raises Exception if the model is still listed after total_timeout
    """

    controller = await self.get_controller()
    model = None
    try:
        # Fetched inside the try block so the controller connection is
        # released even when the model cannot be obtained.
        model = await self.get_model(controller, model_name)
        self.log.debug("Destroying model {}".format(model_name))
        uuid = model.info.uuid

        # Destroy machines that are manually provisioned
        # and still are in pending state
        await self._destroy_pending_machines(model, only_manual=True)

        # Disconnect model before destroying it. The local name is cleared
        # first so the finally block does not disconnect it a second time.
        connected_model, model = model, None
        await self.disconnect_model(connected_model)

        # Destroy model
        if model_name in self.models:
            self.models.remove(model_name)

        await controller.destroy_model(uuid, force=True, max_wait=0)

        # Wait until model is destroyed
        self.log.debug("Waiting for model {} to be destroyed...".format(model_name))

        if total_timeout is None:
            total_timeout = 3600
        # Monotonic clock: the deadline is not affected by wall-clock changes
        end = time.monotonic() + total_timeout
        while time.monotonic() < end:
            models = await controller.list_models()
            if model_name not in models:
                self.log.debug(
                    "The model {} ({}) was destroyed".format(model_name, uuid)
                )
                return
            await asyncio.sleep(5)
        raise Exception(
            "Timeout waiting for model {} to be destroyed".format(model_name)
        )
    finally:
        # Only reached with a model when something failed before the
        # explicit disconnection above.
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
852
async def destroy_application(self, model: Model, application_name: str):
    """
    Remove an application from an already connected model

    A warning is logged (and nothing else happens) when the model has no
    application with that name.

    :param: model:              Model object
    :param: application_name:   Application name
    """
    self.log.debug(
        "Destroying application {} in model {}".format(
            application_name, model.info.name
        )
    )
    application = model.applications.get(application_name)
    if not application:
        self.log.warning("Application not found: {}".format(application_name))
        return
    await application.destroy()
870
async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
    """
    Destroy pending machines in a given model

    :param: model:          Model object (already connected)
    :param: only_manual:    Bool that indicates only manually provisioned
                            machines should be destroyed (if True), or that
                            all pending machines should be destroyed
    """
    status = await model.get_status()
    for machine_id, machine_status in status.machines.items():
        if machine_status.agent_status.status != "pending":
            continue
        if only_manual and not machine_status.instance_id.startswith("manual:"):
            # Skip this machine only: a "break" here would leave every
            # later manually provisioned pending machine untouched.
            continue
        machine = model.machines[machine_id]
        await machine.destroy(force=True)
887
888 # async def destroy_machine(
889 # self, model: Model, machine_id: str, total_timeout: float = 3600
890 # ):
891 # """
892 # Destroy machine
893
894 # :param: model: Model object
895 # :param: machine_id: Machine id
896 # :param: total_timeout: Timeout in seconds
897 # """
898 # machines = await model.get_machines()
899 # if machine_id in machines:
900 # machine = machines[machine_id]
901 # await machine.destroy(force=True)
902 # # max timeout
903 # end = time.time() + total_timeout
904
905 # # wait for machine removal
906 # machines = await model.get_machines()
907 # while machine_id in machines and time.time() < end:
908 # self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
909 # await asyncio.sleep(0.5)
910 # machines = await model.get_machines()
911 # self.log.debug("Machine destroyed: {}".format(machine_id))
912 # else:
913 # self.log.debug("Machine not found: {}".format(machine_id))
914
async def configure_application(
    self, model_name: str, application_name: str, config: dict = None
):
    """Configure application

    Nothing is done (no connection is even opened) when config is empty.

    :param: model_name:         Model name
    :param: application_name:   Application name
    :param: config:             Config to apply to the charm

    :raises JujuApplicationNotFound if the application is not in the model
    """
    self.log.debug("Configuring application {}".format(application_name))

    if not config:
        return

    # Connect outside the try block: if this fails there is nothing to
    # clean up, and the original error must reach the caller (it used to be
    # masked by an UnboundLocalError raised from the finally block).
    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(
            model, application_name=application_name,
        )
        if application is None:
            # Same error execute_action() raises, instead of an
            # AttributeError on None
            raise JujuApplicationNotFound(
                "Cannot configure application {}: not found".format(
                    application_name
                )
            )
        await application.set_config(config)
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
937
def _get_api_endpoints_db(self) -> [str]:
    """
    Read the controller API endpoints stored in the database

    :return: List of API endpoints, or None when the database has no entry
             (or the entry lacks the endpoints key)
    """
    self.log.debug("Getting endpoints from database")

    endpoints_ref = DB_DATA.api_endpoints
    record = self.db.get_one(
        endpoints_ref.table, q_filter=endpoints_ref.filter, fail_on_empty=False,
    )
    if not record or endpoints_ref.key not in record:
        return None
    return record[endpoints_ref.key]
953
def _update_api_endpoints_db(self, endpoints: [str]):
    """
    Store the controller API endpoints in the database

    The entry is created first when it does not exist yet.

    :param: endpoints: List of endpoints
    """
    self.log.debug("Saving endpoints {} in database".format(endpoints))

    endpoints_ref = DB_DATA.api_endpoints
    table = endpoints_ref.table
    entry_filter = endpoints_ref.filter

    record = self.db.get_one(table, q_filter=entry_filter, fail_on_empty=False)
    if not record:
        # No entry yet: create it before writing the endpoints
        try:
            self.db.create(table, entry_filter)
        except DbException:
            # Race condition: check if another N2VC worker has created it
            record = self.db.get_one(
                table, q_filter=entry_filter, fail_on_empty=False
            )
            if not record:
                raise
    self.db.set_one(table, entry_filter, {endpoints_ref.key: endpoints})
987
def handle_exception(self, loop, context):
    """
    Event loop exception handler (installed in the constructor)

    All unhandled exceptions by libjuju end up here. They are deliberately
    not propagated; they are only logged at debug level so that they leave
    a trace.

    :param: loop:       Event loop that reported the problem
    :param: context:    Dict describing the problem ("message" and,
                        optionally, "exception")
    """
    message = context.get("message")
    exception = context.get("exception")
    if exception is not None:
        self.log.debug(
            "Unhandled asyncio exception: {} ({!r})".format(message, exception)
        )
    else:
        self.log.debug("Unhandled asyncio exception: {}".format(message))
991
async def health_check(self, interval: float = 300.0):
    """
    Health check to make sure the controller connection is OK

    Runs forever: connects to the controller, disconnects, sleeps and starts
    over. Connection failures are logged and do not stop the loop.

    :param: interval: Time in seconds between checks
    """
    while True:
        # Reset on every round: a failed connection must not leave the name
        # unbound (first round) nor pointing at the previous, already
        # disconnected, controller.
        controller = None
        try:
            controller = await self.get_controller()
            # self.log.debug("VCA is alive")
        except asyncio.CancelledError:
            # Let the cancellation requested by disconnect() go through
            raise
        except Exception as e:
            self.log.error("Health check to VCA failed: {}".format(e))
        finally:
            if controller is not None:
                await self.disconnect_controller(controller)
        await asyncio.sleep(interval)
1007
async def list_models(self, contains: str = None) -> [str]:
    """List the models known by the controller

    :param: contains: When given, only the model names containing this
                      string are returned

    :return: [models] Returns list of model names
    """
    controller = await self.get_controller()
    try:
        model_names = await controller.list_models()
        if not contains:
            return model_names
        return [name for name in model_names if contains in name]
    finally:
        await self.disconnect_controller(controller)
1024
async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
    """List the application offers of a model

    :param: model_name: Model name

    :return: Returns list of offers
    """
    controller = await self.get_controller()
    try:
        offers = await controller.list_offers(model_name)
        return offers
    finally:
        await self.disconnect_controller(controller)
1038
async def add_k8s(
    self,
    name: str,
    configuration: Configuration,
    storage_class: str,
    credential_name: str = None,
):
    """
    Add a Kubernetes cloud to the controller

    Similar to the `juju add-k8s` command in the CLI

    :param: name:               Name for the K8s cloud
    :param: configuration:      Kubernetes configuration object
    :param: storage_class:      Storage Class to use in the cloud
    :param: credential_name:    Name for the credential added with the cloud
                                (passed through to add_cloud())

    :return: Whatever add_cloud() returns
    :raises Exception if storage_class, name or configuration is empty
    """
    # Checked in this order; the first empty argument is the one reported
    required_arguments = (
        (storage_class, "storage_class must be a non-empty string"),
        (name, "name must be a non-empty string"),
        (configuration, "configuration must be provided"),
    )
    for value, problem in required_arguments:
        if not value:
            raise Exception(problem)

    credential = self.get_k8s_cloud_credential(configuration)
    if "ClientCertificateData" in credential.attrs:
        ca_certificates = [credential.attrs["ClientCertificateData"]]
    else:
        ca_certificates = []

    # Operator and workload storage both use the same storage class
    cloud = client.Cloud(
        type_="kubernetes",
        auth_types=[credential.auth_type],
        endpoint=configuration.host,
        ca_certificates=ca_certificates,
        config={
            "operator-storage": storage_class,
            "workload-storage": storage_class,
        },
    )

    return await self.add_cloud(
        name, cloud, credential, credential_name=credential_name
    )
1085
def get_k8s_cloud_credential(
    self, configuration: Configuration,
) -> client.CloudCredential:
    """
    Build the Juju cloud credential matching a Kubernetes configuration

    The auth type is chosen as follows:
        - key file present:             "oauth2" (a token is mandatory)
        - username present:             "userpasswithcert" when a certificate
                                        is also present, "userpass" otherwise
        - certificate and token:        "certificate"

    :param: configuration: Kubernetes configuration object

    :return: client.CloudCredential with the auth type and its attributes
    :raises JujuInvalidK8sConfiguration if the configuration cannot be mapped
            to one of the auth types above
    """
    attrs = {}
    ca_cert = configuration.ssl_ca_cert or configuration.cert_file
    key = configuration.key_file
    api_key = configuration.api_key
    token = None
    username = configuration.username
    password = configuration.password

    if "authorization" in api_key:
        authorization = api_key["authorization"]
        if "Bearer " in authorization:
            # Expected format: "Bearer <token>"
            bearer_list = authorization.split(" ")
            if len(bearer_list) != 2:
                raise JujuInvalidK8sConfiguration("unknown format of api_key")
            token = bearer_list[1]
        else:
            token = authorization
    # Context managers make sure the files are closed once read
    if ca_cert:
        with open(ca_cert, "r") as cert_file:
            attrs["ClientCertificateData"] = cert_file.read()
    if key:
        with open(key, "r") as key_file:
            attrs["ClientKeyData"] = key_file.read()
    if token:
        if username or password:
            raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
        attrs["Token"] = token

    auth_type = None
    if key:
        auth_type = "oauth2"
        if not token:
            raise JujuInvalidK8sConfiguration(
                "missing token for auth type {}".format(auth_type)
            )
    elif username:
        if not password:
            self.log.debug(
                "credential for user {} has empty password".format(username)
            )
        attrs["username"] = username
        attrs["password"] = password
        if ca_cert:
            auth_type = "userpasswithcert"
        else:
            auth_type = "userpass"
    elif ca_cert and token:
        auth_type = "certificate"
    else:
        raise JujuInvalidK8sConfiguration("authentication method not supported")
    return client.CloudCredential(auth_type=auth_type, attrs=attrs,)
1139
async def add_cloud(
    self,
    name: str,
    cloud: Cloud,
    credential: CloudCredential = None,
    credential_name: str = None,
) -> Cloud:
    """
    Register a cloud (and optionally a credential for it) in the controller

    :param: name:               Name of the cloud to be added
    :param: cloud:              Cloud object
    :param: credential:         CloudCredentials object for the cloud
    :param: credential_name:    Credential name.
                                If not defined, the name of the cloud will be used.

    :return: The cloud object received as argument
    """
    controller = await self.get_controller()
    try:
        await controller.add_cloud(name, cloud)
        if credential:
            effective_name = credential_name or name
            await controller.add_credential(
                effective_name, credential=credential, cloud=name
            )
        # The object returned by controller.add_cloud() should be returned
        # instead; the original value is handed back until this bug is fixed:
        #   https://github.com/juju/python-libjuju/issues/443
        return cloud
    finally:
        await self.disconnect_controller(controller)
1169
async def remove_cloud(self, name: str):
    """
    Remove a cloud from the controller

    :param: name: Name of the cloud to be removed
    """
    controller = await self.get_controller()
    try:
        removal = controller.remove_cloud(name)
        await removal
    finally:
        await self.disconnect_controller(controller)
1181
async def _get_leader_unit(self, application: Application) -> Unit:
    """
    Find the leader unit of an application

    :param: application: Application object

    :return: The first unit reporting itself as leader, or None if no unit does
    """
    for candidate in application.units:
        if await candidate.is_leader_from_status():
            return candidate
    return None