Fix bug 1263
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17 from juju.controller import Controller
18 from juju.client import client
19 import time
20
21 from juju.errors import JujuAPIError
22 from juju.model import Model
23 from juju.machine import Machine
24 from juju.application import Application
25 from juju.unit import Unit
26 from juju.client._definitions import (
27 FullStatus,
28 QueryApplicationOffersResults,
29 Cloud,
30 CloudCredential,
31 )
32 from n2vc.juju_watcher import JujuModelWatcher
33 from n2vc.provisioner import AsyncSSHProvisioner
34 from n2vc.n2vc_conn import N2VCConnector
35 from n2vc.exceptions import (
36 JujuMachineNotFound,
37 JujuApplicationNotFound,
38 JujuLeaderUnitNotFound,
39 JujuActionNotFound,
40 JujuModelAlreadyExists,
41 JujuControllerFailedConnecting,
42 JujuApplicationExists,
43 JujuInvalidK8sConfiguration,
44 )
45 from n2vc.utils import DB_DATA
46 from osm_common.dbbase import DbException
47 from kubernetes.client.configuration import Configuration
48
49
50 class Libjuju:
51 def __init__(
52 self,
53 endpoint: str,
54 api_proxy: str,
55 username: str,
56 password: str,
57 cacert: str,
58 loop: asyncio.AbstractEventLoop = None,
59 log: logging.Logger = None,
60 db: dict = None,
61 n2vc: N2VCConnector = None,
62 apt_mirror: str = None,
63 enable_os_upgrade: bool = True,
64 ):
65 """
66 Constructor
67
68 :param: endpoint: Endpoint of the juju controller (host:port)
69 :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
70 :param: username: Juju username
71 :param: password: Juju password
72 :param: cacert: Juju CA Certificate
73 :param: loop: Asyncio loop
74 :param: log: Logger
75 :param: db: DB object
76 :param: n2vc: N2VC object
77 :param: apt_mirror: APT Mirror
78 :param: enable_os_upgrade: Enable OS Upgrade
79 """
80
81 self.log = log or logging.getLogger("Libjuju")
82 self.db = db
83 db_endpoints = self._get_api_endpoints_db()
84 self.endpoints = None
85 if (db_endpoints and endpoint not in db_endpoints) or not db_endpoints:
86 self.endpoints = [endpoint]
87 self._update_api_endpoints_db(self.endpoints)
88 else:
89 self.endpoints = db_endpoints
90 self.api_proxy = api_proxy
91 self.username = username
92 self.password = password
93 self.cacert = cacert
94 self.loop = loop or asyncio.get_event_loop()
95 self.n2vc = n2vc
96
97 # Generate config for models
98 self.model_config = {}
99 if apt_mirror:
100 self.model_config["apt-mirror"] = apt_mirror
101 self.model_config["enable-os-refresh-update"] = enable_os_upgrade
102 self.model_config["enable-os-upgrade"] = enable_os_upgrade
103
104 self.loop.set_exception_handler(self.handle_exception)
105 self.creating_model = asyncio.Lock(loop=self.loop)
106
107 self.models = set()
108 self.log.debug("Libjuju initialized!")
109
110 self.health_check_task = self._create_health_check_task()
111
112 def _create_health_check_task(self):
113 return self.loop.create_task(self.health_check())
114
115 async def get_controller(self, timeout: float = 5.0) -> Controller:
116 """
117 Get controller
118
119 :param: timeout: Time in seconds to wait for controller to connect
120 """
121 controller = None
122 try:
123 controller = Controller(loop=self.loop)
124 await asyncio.wait_for(
125 controller.connect(
126 endpoint=self.endpoints,
127 username=self.username,
128 password=self.password,
129 cacert=self.cacert,
130 ),
131 timeout=timeout,
132 )
133 endpoints = await controller.api_endpoints
134 if self.endpoints != endpoints:
135 self.endpoints = endpoints
136 self._update_api_endpoints_db(self.endpoints)
137 return controller
138 except asyncio.CancelledError as e:
139 raise e
140 except Exception as e:
141 self.log.error(
142 "Failed connecting to controller: {}...".format(self.endpoints)
143 )
144 if controller:
145 await self.disconnect_controller(controller)
146 raise JujuControllerFailedConnecting(e)
147
148 async def disconnect(self):
149 """Disconnect"""
150 # Cancel health check task
151 self.health_check_task.cancel()
152 self.log.debug("Libjuju disconnected!")
153
154 async def disconnect_model(self, model: Model):
155 """
156 Disconnect model
157
158 :param: model: Model that will be disconnected
159 """
160 await model.disconnect()
161
162 async def disconnect_controller(self, controller: Controller):
163 """
164 Disconnect controller
165
166 :param: controller: Controller that will be disconnected
167 """
168 await controller.disconnect()
169
170 async def add_model(self, model_name: str, cloud_name: str, credential_name=None):
171 """
172 Create model
173
174 :param: model_name: Model name
175 :param: cloud_name: Cloud name
176 :param: credential_name: Credential name to use for adding the model
177 If not specified, same name as the cloud will be used.
178 """
179
180 # Get controller
181 controller = await self.get_controller()
182 model = None
183 try:
184 # Raise exception if model already exists
185 if await self.model_exists(model_name, controller=controller):
186 raise JujuModelAlreadyExists(
187 "Model {} already exists.".format(model_name)
188 )
189
190 # Block until other workers have finished model creation
191 while self.creating_model.locked():
192 await asyncio.sleep(0.1)
193
194 # If the model exists, return it from the controller
195 if model_name in self.models:
196 return
197
198 # Create the model
199 async with self.creating_model:
200 self.log.debug("Creating model {}".format(model_name))
201 model = await controller.add_model(
202 model_name,
203 config=self.model_config,
204 cloud_name=cloud_name,
205 credential_name=credential_name or cloud_name,
206 )
207 self.models.add(model_name)
208 finally:
209 if model:
210 await self.disconnect_model(model)
211 await self.disconnect_controller(controller)
212
213 async def get_model(
214 self, controller: Controller, model_name: str, id=None
215 ) -> Model:
216 """
217 Get model from controller
218
219 :param: controller: Controller
220 :param: model_name: Model name
221
222 :return: Model: The created Juju model object
223 """
224 return await controller.get_model(model_name)
225
226 async def model_exists(
227 self, model_name: str, controller: Controller = None
228 ) -> bool:
229 """
230 Check if model exists
231
232 :param: controller: Controller
233 :param: model_name: Model name
234
235 :return bool
236 """
237 need_to_disconnect = False
238
239 # Get controller if not passed
240 if not controller:
241 controller = await self.get_controller()
242 need_to_disconnect = True
243
244 # Check if model exists
245 try:
246 return model_name in await controller.list_models()
247 finally:
248 if need_to_disconnect:
249 await self.disconnect_controller(controller)
250
251 async def models_exist(self, model_names: [str]) -> (bool, list):
252 """
253 Check if models exists
254
255 :param: model_names: List of strings with model names
256
257 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
258 """
259 if not model_names:
260 raise Exception(
261 "model_names must be a non-empty array. Given value: {}".format(
262 model_names
263 )
264 )
265 non_existing_models = []
266 models = await self.list_models()
267 existing_models = list(set(models).intersection(model_names))
268 non_existing_models = list(set(model_names) - set(existing_models))
269
270 return (
271 len(non_existing_models) == 0,
272 non_existing_models,
273 )
274
275 async def get_model_status(self, model_name: str) -> FullStatus:
276 """
277 Get model status
278
279 :param: model_name: Model name
280
281 :return: Full status object
282 """
283 controller = await self.get_controller()
284 model = await self.get_model(controller, model_name)
285 try:
286 return await model.get_status()
287 finally:
288 await self.disconnect_model(model)
289 await self.disconnect_controller(controller)
290
291 async def create_machine(
292 self,
293 model_name: str,
294 machine_id: str = None,
295 db_dict: dict = None,
296 progress_timeout: float = None,
297 total_timeout: float = None,
298 series: str = "xenial",
299 wait: bool = True,
300 ) -> (Machine, bool):
301 """
302 Create machine
303
304 :param: model_name: Model name
305 :param: machine_id: Machine id
306 :param: db_dict: Dictionary with data of the DB to write the updates
307 :param: progress_timeout: Maximum time between two updates in the model
308 :param: total_timeout: Timeout for the entity to be active
309 :param: series: Series of the machine (xenial, bionic, focal, ...)
310 :param: wait: Wait until machine is ready
311
312 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
313 if the machine is new or it already existed
314 """
315 new = False
316 machine = None
317
318 self.log.debug(
319 "Creating machine (id={}) in model: {}".format(machine_id, model_name)
320 )
321
322 # Get controller
323 controller = await self.get_controller()
324
325 # Get model
326 model = await self.get_model(controller, model_name)
327 try:
328 if machine_id is not None:
329 self.log.debug(
330 "Searching machine (id={}) in model {}".format(
331 machine_id, model_name
332 )
333 )
334
335 # Get machines from model and get the machine with machine_id if exists
336 machines = await model.get_machines()
337 if machine_id in machines:
338 self.log.debug(
339 "Machine (id={}) found in model {}".format(
340 machine_id, model_name
341 )
342 )
343 machine = machines[machine_id]
344 else:
345 raise JujuMachineNotFound("Machine {} not found".format(machine_id))
346
347 if machine is None:
348 self.log.debug("Creating a new machine in model {}".format(model_name))
349
350 # Create machine
351 machine = await model.add_machine(
352 spec=None, constraints=None, disks=None, series=series
353 )
354 new = True
355
356 # Wait until the machine is ready
357 self.log.debug(
358 "Wait until machine {} is ready in model {}".format(
359 machine.entity_id, model_name
360 )
361 )
362 if wait:
363 await JujuModelWatcher.wait_for(
364 model=model,
365 entity=machine,
366 progress_timeout=progress_timeout,
367 total_timeout=total_timeout,
368 db_dict=db_dict,
369 n2vc=self.n2vc,
370 )
371 finally:
372 await self.disconnect_model(model)
373 await self.disconnect_controller(controller)
374
375 self.log.debug(
376 "Machine {} ready at {} in model {}".format(
377 machine.entity_id, machine.dns_name, model_name
378 )
379 )
380 return machine, new
381
382 async def provision_machine(
383 self,
384 model_name: str,
385 hostname: str,
386 username: str,
387 private_key_path: str,
388 db_dict: dict = None,
389 progress_timeout: float = None,
390 total_timeout: float = None,
391 ) -> str:
392 """
393 Manually provisioning of a machine
394
395 :param: model_name: Model name
396 :param: hostname: IP to access the machine
397 :param: username: Username to login to the machine
398 :param: private_key_path: Local path for the private key
399 :param: db_dict: Dictionary with data of the DB to write the updates
400 :param: progress_timeout: Maximum time between two updates in the model
401 :param: total_timeout: Timeout for the entity to be active
402
403 :return: (Entity): Machine id
404 """
405 self.log.debug(
406 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
407 model_name, hostname, username
408 )
409 )
410
411 # Get controller
412 controller = await self.get_controller()
413
414 # Get model
415 model = await self.get_model(controller, model_name)
416
417 try:
418 # Get provisioner
419 provisioner = AsyncSSHProvisioner(
420 host=hostname,
421 user=username,
422 private_key_path=private_key_path,
423 log=self.log,
424 )
425
426 # Provision machine
427 params = await provisioner.provision_machine()
428
429 params.jobs = ["JobHostUnits"]
430
431 self.log.debug("Adding machine to model")
432 connection = model.connection()
433 client_facade = client.ClientFacade.from_connection(connection)
434
435 results = await client_facade.AddMachines(params=[params])
436 error = results.machines[0].error
437
438 if error:
439 msg = "Error adding machine: {}".format(error.message)
440 self.log.error(msg=msg)
441 raise ValueError(msg)
442
443 machine_id = results.machines[0].machine
444
445 self.log.debug("Installing Juju agent into machine {}".format(machine_id))
446 asyncio.ensure_future(
447 provisioner.install_agent(
448 connection=connection,
449 nonce=params.nonce,
450 machine_id=machine_id,
451 proxy=self.api_proxy,
452 )
453 )
454
455 machine = None
456 for _ in range(10):
457 machine_list = await model.get_machines()
458 if machine_id in machine_list:
459 self.log.debug("Machine {} found in model!".format(machine_id))
460 machine = model.machines.get(machine_id)
461 break
462 await asyncio.sleep(2)
463
464 if machine is None:
465 msg = "Machine {} not found in model".format(machine_id)
466 self.log.error(msg=msg)
467 raise JujuMachineNotFound(msg)
468
469 self.log.debug(
470 "Wait until machine {} is ready in model {}".format(
471 machine.entity_id, model_name
472 )
473 )
474 await JujuModelWatcher.wait_for(
475 model=model,
476 entity=machine,
477 progress_timeout=progress_timeout,
478 total_timeout=total_timeout,
479 db_dict=db_dict,
480 n2vc=self.n2vc,
481 )
482 except Exception as e:
483 raise e
484 finally:
485 await self.disconnect_model(model)
486 await self.disconnect_controller(controller)
487
488 self.log.debug(
489 "Machine provisioned {} in model {}".format(machine_id, model_name)
490 )
491
492 return machine_id
493
494 async def deploy_charm(
495 self,
496 application_name: str,
497 path: str,
498 model_name: str,
499 machine_id: str,
500 db_dict: dict = None,
501 progress_timeout: float = None,
502 total_timeout: float = None,
503 config: dict = None,
504 series: str = None,
505 num_units: int = 1,
506 ):
507 """Deploy charm
508
509 :param: application_name: Application name
510 :param: path: Local path to the charm
511 :param: model_name: Model name
512 :param: machine_id ID of the machine
513 :param: db_dict: Dictionary with data of the DB to write the updates
514 :param: progress_timeout: Maximum time between two updates in the model
515 :param: total_timeout: Timeout for the entity to be active
516 :param: config: Config for the charm
517 :param: series: Series of the charm
518 :param: num_units: Number of units
519
520 :return: (juju.application.Application): Juju application
521 """
522 self.log.debug(
523 "Deploying charm {} to machine {} in model ~{}".format(
524 application_name, machine_id, model_name
525 )
526 )
527 self.log.debug("charm: {}".format(path))
528
529 # Get controller
530 controller = await self.get_controller()
531
532 # Get model
533 model = await self.get_model(controller, model_name)
534
535 try:
536 application = None
537 if application_name not in model.applications:
538
539 if machine_id is not None:
540 if machine_id not in model.machines:
541 msg = "Machine {} not found in model".format(machine_id)
542 self.log.error(msg=msg)
543 raise JujuMachineNotFound(msg)
544 machine = model.machines[machine_id]
545 series = machine.series
546
547 application = await model.deploy(
548 entity_url=path,
549 application_name=application_name,
550 channel="stable",
551 num_units=1,
552 series=series,
553 to=machine_id,
554 config=config,
555 )
556
557 self.log.debug(
558 "Wait until application {} is ready in model {}".format(
559 application_name, model_name
560 )
561 )
562 if num_units > 1:
563 for _ in range(num_units - 1):
564 m, _ = await self.create_machine(model_name, wait=False)
565 await application.add_unit(to=m.entity_id)
566
567 await JujuModelWatcher.wait_for(
568 model=model,
569 entity=application,
570 progress_timeout=progress_timeout,
571 total_timeout=total_timeout,
572 db_dict=db_dict,
573 n2vc=self.n2vc,
574 )
575 self.log.debug(
576 "Application {} is ready in model {}".format(
577 application_name, model_name
578 )
579 )
580 else:
581 raise JujuApplicationExists(
582 "Application {} exists".format(application_name)
583 )
584 finally:
585 await self.disconnect_model(model)
586 await self.disconnect_controller(controller)
587
588 return application
589
590 def _get_application(self, model: Model, application_name: str) -> Application:
591 """Get application
592
593 :param: model: Model object
594 :param: application_name: Application name
595
596 :return: juju.application.Application (or None if it doesn't exist)
597 """
598 if model.applications and application_name in model.applications:
599 return model.applications[application_name]
600
601 async def execute_action(
602 self,
603 application_name: str,
604 model_name: str,
605 action_name: str,
606 db_dict: dict = None,
607 progress_timeout: float = None,
608 total_timeout: float = None,
609 **kwargs
610 ):
611 """Execute action
612
613 :param: application_name: Application name
614 :param: model_name: Model name
615 :param: action_name: Name of the action
616 :param: db_dict: Dictionary with data of the DB to write the updates
617 :param: progress_timeout: Maximum time between two updates in the model
618 :param: total_timeout: Timeout for the entity to be active
619
620 :return: (str, str): (output and status)
621 """
622 self.log.debug(
623 "Executing action {} using params {}".format(action_name, kwargs)
624 )
625 # Get controller
626 controller = await self.get_controller()
627
628 # Get model
629 model = await self.get_model(controller, model_name)
630
631 try:
632 # Get application
633 application = self._get_application(
634 model, application_name=application_name,
635 )
636 if application is None:
637 raise JujuApplicationNotFound("Cannot execute action")
638
639 # Get leader unit
640 # Racing condition:
641 # Ocassionally, self._get_leader_unit() will return None
642 # because the leader elected hook has not been triggered yet.
643 # Therefore, we are doing some retries. If it happens again,
644 # re-open bug 1236
645 attempts = 3
646 time_between_retries = 10
647 unit = None
648 for _ in range(attempts):
649 unit = await self._get_leader_unit(application)
650 if unit is None:
651 await asyncio.sleep(time_between_retries)
652 else:
653 break
654 if unit is None:
655 raise JujuLeaderUnitNotFound(
656 "Cannot execute action: leader unit not found"
657 )
658
659 actions = await application.get_actions()
660
661 if action_name not in actions:
662 raise JujuActionNotFound(
663 "Action {} not in available actions".format(action_name)
664 )
665
666 action = await unit.run_action(action_name, **kwargs)
667
668 self.log.debug(
669 "Wait until action {} is completed in application {} (model={})".format(
670 action_name, application_name, model_name
671 )
672 )
673 await JujuModelWatcher.wait_for(
674 model=model,
675 entity=action,
676 progress_timeout=progress_timeout,
677 total_timeout=total_timeout,
678 db_dict=db_dict,
679 n2vc=self.n2vc,
680 )
681
682 output = await model.get_action_output(action_uuid=action.entity_id)
683 status = await model.get_action_status(uuid_or_prefix=action.entity_id)
684 status = (
685 status[action.entity_id] if action.entity_id in status else "failed"
686 )
687
688 self.log.debug(
689 "Action {} completed with status {} in application {} (model={})".format(
690 action_name, action.status, application_name, model_name
691 )
692 )
693 finally:
694 await self.disconnect_model(model)
695 await self.disconnect_controller(controller)
696
697 return output, status
698
699 async def get_actions(self, application_name: str, model_name: str) -> dict:
700 """Get list of actions
701
702 :param: application_name: Application name
703 :param: model_name: Model name
704
705 :return: Dict with this format
706 {
707 "action_name": "Description of the action",
708 ...
709 }
710 """
711 self.log.debug(
712 "Getting list of actions for application {}".format(application_name)
713 )
714
715 # Get controller
716 controller = await self.get_controller()
717
718 # Get model
719 model = await self.get_model(controller, model_name)
720
721 try:
722 # Get application
723 application = self._get_application(
724 model, application_name=application_name,
725 )
726
727 # Return list of actions
728 return await application.get_actions()
729
730 finally:
731 # Disconnect from model and controller
732 await self.disconnect_model(model)
733 await self.disconnect_controller(controller)
734
735 async def get_metrics(self, model_name: str, application_name: str) -> dict:
736 """Get the metrics collected by the VCA.
737
738 :param model_name The name or unique id of the network service
739 :param application_name The name of the application
740 """
741 if not model_name or not application_name:
742 raise Exception("model_name and application_name must be non-empty strings")
743 metrics = {}
744 controller = await self.get_controller()
745 model = await self.get_model(controller, model_name)
746 try:
747 application = self._get_application(model, application_name)
748 if application is not None:
749 metrics = await application.get_metrics()
750 finally:
751 self.disconnect_model(model)
752 self.disconnect_controller(controller)
753 return metrics
754
755 async def add_relation(
756 self, model_name: str, endpoint_1: str, endpoint_2: str,
757 ):
758 """Add relation
759
760 :param: model_name: Model name
761 :param: endpoint_1 First endpoint name
762 ("app:endpoint" format or directly the saas name)
763 :param: endpoint_2: Second endpoint name (^ same format)
764 """
765
766 self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))
767
768 # Get controller
769 controller = await self.get_controller()
770
771 # Get model
772 model = await self.get_model(controller, model_name)
773
774 # Add relation
775 try:
776 await model.add_relation(endpoint_1, endpoint_2)
777 except JujuAPIError as e:
778 if "not found" in e.message:
779 self.log.warning("Relation not found: {}".format(e.message))
780 return
781 if "already exists" in e.message:
782 self.log.warning("Relation already exists: {}".format(e.message))
783 return
784 # another exception, raise it
785 raise e
786 finally:
787 await self.disconnect_model(model)
788 await self.disconnect_controller(controller)
789
790 async def consume(
791 self, offer_url: str, model_name: str,
792 ):
793 """
794 Adds a remote offer to the model. Relations can be created later using "juju relate".
795
796 :param: offer_url: Offer Url
797 :param: model_name: Model name
798
799 :raises ParseError if there's a problem parsing the offer_url
800 :raises JujuError if remote offer includes and endpoint
801 :raises JujuAPIError if the operation is not successful
802 """
803 controller = await self.get_controller()
804 model = await controller.get_model(model_name)
805
806 try:
807 await model.consume(offer_url)
808 finally:
809 await self.disconnect_model(model)
810 await self.disconnect_controller(controller)
811
812 async def destroy_model(self, model_name: str, total_timeout: float):
813 """
814 Destroy model
815
816 :param: model_name: Model name
817 :param: total_timeout: Timeout
818 """
819
820 controller = await self.get_controller()
821 model = await self.get_model(controller, model_name)
822 try:
823 self.log.debug("Destroying model {}".format(model_name))
824 uuid = model.info.uuid
825
826 # Destroy machines that are manually provisioned
827 # and still are in pending state
828 await self._destroy_pending_machines(model, only_manual=True)
829
830 # Disconnect model
831 await self.disconnect_model(model)
832
833 # Destroy model
834 if model_name in self.models:
835 self.models.remove(model_name)
836
837 await controller.destroy_model(uuid, force=True, max_wait=0)
838
839 # Wait until model is destroyed
840 self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
841
842 if total_timeout is None:
843 total_timeout = 3600
844 end = time.time() + total_timeout
845 while time.time() < end:
846 models = await controller.list_models()
847 if model_name not in models:
848 self.log.debug(
849 "The model {} ({}) was destroyed".format(model_name, uuid)
850 )
851 return
852 await asyncio.sleep(5)
853 raise Exception(
854 "Timeout waiting for model {} to be destroyed".format(model_name)
855 )
856 finally:
857 await self.disconnect_controller(controller)
858
859 async def destroy_application(self, model: Model, application_name: str):
860 """
861 Destroy application
862
863 :param: model: Model object
864 :param: application_name: Application name
865 """
866 self.log.debug(
867 "Destroying application {} in model {}".format(
868 application_name, model.info.name
869 )
870 )
871 application = model.applications.get(application_name)
872 if application:
873 await application.destroy()
874 else:
875 self.log.warning("Application not found: {}".format(application_name))
876
877 async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
878 """
879 Destroy pending machines in a given model
880
881 :param: only_manual: Bool that indicates only manually provisioned
882 machines should be destroyed (if True), or that
883 all pending machines should be destroyed
884 """
885 status = await model.get_status()
886 for machine_id in status.machines:
887 machine_status = status.machines[machine_id]
888 if machine_status.agent_status.status == "pending":
889 if only_manual and not machine_status.instance_id.startswith("manual:"):
890 break
891 machine = model.machines[machine_id]
892 await machine.destroy(force=True)
893
894 # async def destroy_machine(
895 # self, model: Model, machine_id: str, total_timeout: float = 3600
896 # ):
897 # """
898 # Destroy machine
899
900 # :param: model: Model object
901 # :param: machine_id: Machine id
902 # :param: total_timeout: Timeout in seconds
903 # """
904 # machines = await model.get_machines()
905 # if machine_id in machines:
906 # machine = machines[machine_id]
907 # await machine.destroy(force=True)
908 # # max timeout
909 # end = time.time() + total_timeout
910
911 # # wait for machine removal
912 # machines = await model.get_machines()
913 # while machine_id in machines and time.time() < end:
914 # self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
915 # await asyncio.sleep(0.5)
916 # machines = await model.get_machines()
917 # self.log.debug("Machine destroyed: {}".format(machine_id))
918 # else:
919 # self.log.debug("Machine not found: {}".format(machine_id))
920
921 async def configure_application(
922 self, model_name: str, application_name: str, config: dict = None
923 ):
924 """Configure application
925
926 :param: model_name: Model name
927 :param: application_name: Application name
928 :param: config: Config to apply to the charm
929 """
930 self.log.debug("Configuring application {}".format(application_name))
931
932 if config:
933 try:
934 controller = await self.get_controller()
935 model = await self.get_model(controller, model_name)
936 application = self._get_application(
937 model, application_name=application_name,
938 )
939 await application.set_config(config)
940 finally:
941 await self.disconnect_model(model)
942 await self.disconnect_controller(controller)
943
944 def _get_api_endpoints_db(self) -> [str]:
945 """
946 Get API Endpoints from DB
947
948 :return: List of API endpoints
949 """
950 self.log.debug("Getting endpoints from database")
951
952 juju_info = self.db.get_one(
953 DB_DATA.api_endpoints.table,
954 q_filter=DB_DATA.api_endpoints.filter,
955 fail_on_empty=False,
956 )
957 if juju_info and DB_DATA.api_endpoints.key in juju_info:
958 return juju_info[DB_DATA.api_endpoints.key]
959
960 def _update_api_endpoints_db(self, endpoints: [str]):
961 """
962 Update API endpoints in Database
963
964 :param: List of endpoints
965 """
966 self.log.debug("Saving endpoints {} in database".format(endpoints))
967
968 juju_info = self.db.get_one(
969 DB_DATA.api_endpoints.table,
970 q_filter=DB_DATA.api_endpoints.filter,
971 fail_on_empty=False,
972 )
973 # If it doesn't, then create it
974 if not juju_info:
975 try:
976 self.db.create(
977 DB_DATA.api_endpoints.table, DB_DATA.api_endpoints.filter,
978 )
979 except DbException as e:
980 # Racing condition: check if another N2VC worker has created it
981 juju_info = self.db.get_one(
982 DB_DATA.api_endpoints.table,
983 q_filter=DB_DATA.api_endpoints.filter,
984 fail_on_empty=False,
985 )
986 if not juju_info:
987 raise e
988 self.db.set_one(
989 DB_DATA.api_endpoints.table,
990 DB_DATA.api_endpoints.filter,
991 {DB_DATA.api_endpoints.key: endpoints},
992 )
993
994 def handle_exception(self, loop, context):
995 # All unhandled exceptions by libjuju are handled here.
996 pass
997
998 async def health_check(self, interval: float = 300.0):
999 """
1000 Health check to make sure controller and controller_model connections are OK
1001
1002 :param: interval: Time in seconds between checks
1003 """
1004 while True:
1005 try:
1006 controller = await self.get_controller()
1007 # self.log.debug("VCA is alive")
1008 except Exception as e:
1009 self.log.error("Health check to VCA failed: {}".format(e))
1010 finally:
1011 await self.disconnect_controller(controller)
1012 await asyncio.sleep(interval)
1013
1014 async def list_models(self, contains: str = None) -> [str]:
1015 """List models with certain names
1016
1017 :param: contains: String that is contained in model name
1018
1019 :retur: [models] Returns list of model names
1020 """
1021
1022 controller = await self.get_controller()
1023 try:
1024 models = await controller.list_models()
1025 if contains:
1026 models = [model for model in models if contains in model]
1027 return models
1028 finally:
1029 await self.disconnect_controller(controller)
1030
1031 async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
1032 """List models with certain names
1033
1034 :param: model_name: Model name
1035
1036 :return: Returns list of offers
1037 """
1038
1039 controller = await self.get_controller()
1040 try:
1041 return await controller.list_offers(model_name)
1042 finally:
1043 await self.disconnect_controller(controller)
1044
1045 async def add_k8s(
1046 self,
1047 name: str,
1048 configuration: Configuration,
1049 storage_class: str,
1050 credential_name: str = None,
1051 ):
1052 """
1053 Add a Kubernetes cloud to the controller
1054
1055 Similar to the `juju add-k8s` command in the CLI
1056
1057 :param: name: Name for the K8s cloud
1058 :param: configuration: Kubernetes configuration object
1059 :param: storage_class: Storage Class to use in the cloud
1060 :param: credential_name: Storage Class to use in the cloud
1061 """
1062
1063 if not storage_class:
1064 raise Exception("storage_class must be a non-empty string")
1065 if not name:
1066 raise Exception("name must be a non-empty string")
1067 if not configuration:
1068 raise Exception("configuration must be provided")
1069
1070 endpoint = configuration.host
1071 credential = self.get_k8s_cloud_credential(configuration)
1072 ca_certificates = (
1073 [credential.attrs["ClientCertificateData"]]
1074 if "ClientCertificateData" in credential.attrs
1075 else []
1076 )
1077 cloud = client.Cloud(
1078 type_="kubernetes",
1079 auth_types=[credential.auth_type],
1080 endpoint=endpoint,
1081 ca_certificates=ca_certificates,
1082 config={
1083 "operator-storage": storage_class,
1084 "workload-storage": storage_class,
1085 },
1086 )
1087
1088 return await self.add_cloud(
1089 name, cloud, credential, credential_name=credential_name
1090 )
1091
1092 def get_k8s_cloud_credential(
1093 self, configuration: Configuration,
1094 ) -> client.CloudCredential:
1095 attrs = {}
1096 ca_cert = configuration.ssl_ca_cert or configuration.cert_file
1097 key = configuration.key_file
1098 api_key = configuration.api_key
1099 token = None
1100 username = configuration.username
1101 password = configuration.password
1102
1103 if "authorization" in api_key:
1104 authorization = api_key["authorization"]
1105 if "Bearer " in authorization:
1106 bearer_list = authorization.split(" ")
1107 if len(bearer_list) == 2:
1108 [_, token] = bearer_list
1109 else:
1110 raise JujuInvalidK8sConfiguration("unknown format of api_key")
1111 else:
1112 token = authorization
1113 if ca_cert:
1114 attrs["ClientCertificateData"] = open(ca_cert, "r").read()
1115 if key:
1116 attrs["ClientKeyData"] = open(key, "r").read()
1117 if token:
1118 if username or password:
1119 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1120 attrs["Token"] = token
1121
1122 auth_type = None
1123 if key:
1124 auth_type = "oauth2"
1125 if not token:
1126 raise JujuInvalidK8sConfiguration(
1127 "missing token for auth type {}".format(auth_type)
1128 )
1129 elif username:
1130 if not password:
1131 self.log.debug(
1132 "credential for user {} has empty password".format(username)
1133 )
1134 attrs["username"] = username
1135 attrs["password"] = password
1136 if ca_cert:
1137 auth_type = "userpasswithcert"
1138 else:
1139 auth_type = "userpass"
1140 elif ca_cert and token:
1141 auth_type = "certificate"
1142 else:
1143 raise JujuInvalidK8sConfiguration("authentication method not supported")
1144 return client.CloudCredential(auth_type=auth_type, attrs=attrs,)
1145
1146 async def add_cloud(
1147 self,
1148 name: str,
1149 cloud: Cloud,
1150 credential: CloudCredential = None,
1151 credential_name: str = None,
1152 ) -> Cloud:
1153 """
1154 Add cloud to the controller
1155
1156 :param: name: Name of the cloud to be added
1157 :param: cloud: Cloud object
1158 :param: credential: CloudCredentials object for the cloud
1159 :param: credential_name: Credential name.
1160 If not defined, cloud of the name will be used.
1161 """
1162 controller = await self.get_controller()
1163 try:
1164 _ = await controller.add_cloud(name, cloud)
1165 if credential:
1166 await controller.add_credential(
1167 credential_name or name, credential=credential, cloud=name
1168 )
1169 # Need to return the object returned by the controller.add_cloud() function
1170 # I'm returning the original value now until this bug is fixed:
1171 # https://github.com/juju/python-libjuju/issues/443
1172 return cloud
1173 finally:
1174 await self.disconnect_controller(controller)
1175
1176 async def remove_cloud(self, name: str):
1177 """
1178 Remove cloud
1179
1180 :param: name: Name of the cloud to be removed
1181 """
1182 controller = await self.get_controller()
1183 try:
1184 await controller.remove_cloud(name)
1185 finally:
1186 await self.disconnect_controller(controller)
1187
1188 async def _get_leader_unit(self, application: Application) -> Unit:
1189 unit = None
1190 for u in application.units:
1191 if await u.is_leader_from_status():
1192 unit = u
1193 break
1194 return unit