7e58deb33e5af150b3525bdb2b8593e6ee6d916c
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import os
17 import uuid
18 import yaml
19 import tempfile
20
21 from n2vc.exceptions import K8sException, N2VCBadArgumentsException
22 from n2vc.k8s_conn import K8sConnector
23 from n2vc.kubectl import Kubectl
24 from .exceptions import MethodNotImplemented
25 from n2vc.utils import base64_to_cacert
26 from n2vc.libjuju import Libjuju
27
28
29 # from juju.bundle import BundleHandler
30 # import re
31 # import ssl
32 # from .vnf import N2VC
33 class K8sJujuConnector(K8sConnector):
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    juju_command: str = "/usr/bin/juju",
    log: object = None,
    loop: object = None,
    on_update_db=None,
    vca_config: dict = None,
):
    """
    Set up the connector and the Libjuju client it delegates to.

    :param fs: file system for kubernetes and helm configuration
    :param db: Database object
    :param kubectl_command: path to kubectl executable (not read by this method)
    :param juju_command: path to juju executable (not read by this method)
    :param log: logger
    :param loop: Asyncio loop; asyncio.get_event_loop() is used when omitted
    :param on_update_db: forwarded unchanged to K8sConnector.__init__
    :param vca_config: controller settings. "host", "user", "secret" and
        "ca_cert" are mandatory; "port" (default 17070),
        "enable_os_upgrade" (default True) and "apt_mirror" (default None)
        are optional.
    :raises N2VCBadArgumentsException: if vca_config is empty or lacks one
        of the mandatory keys
    """

    # Parent class initialization (self.log / self.db are used right after
    # this call, so they are presumably set there - confirm in K8sConnector).
    K8sConnector.__init__(self, db, log=log, on_update_db=on_update_db)

    self.fs = fs
    self.loop = loop or asyncio.get_event_loop()
    self.log.debug("Initializing K8S Juju connector")

    mandatory_keys = ["host", "user", "secret", "ca_cert"]
    config_is_complete = bool(vca_config) and all(
        key in vca_config for key in mandatory_keys
    )
    if not config_is_complete:
        raise N2VCBadArgumentsException(
            message="Missing arguments in vca_config: {}".format(vca_config),
            bad_args=mandatory_keys,
        )

    endpoint = "{}:{}".format(vca_config["host"], vca_config.get("port", 17070))

    self.libjuju = Libjuju(
        endpoint=endpoint,
        api_proxy=None,  # Not needed for k8s charms
        enable_os_upgrade=vca_config.get("enable_os_upgrade", True),
        apt_mirror=vca_config.get("apt_mirror", None),
        username=vca_config["user"],
        password=vca_config["secret"],
        cacert=base64_to_cacert(vca_config["ca_cert"]),
        loop=self.loop,
        log=self.log,
        db=self.db,
    )
    self.log.debug("K8S Juju connector initialized")
102
103 """Initialization"""
104
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = "kube-system",
    reuse_cluster_uuid: str = None,
) -> (str, bool):
    """
    It prepares a given K8s cluster environment to run Juju bundles.

    The kubeconfig is written to a temporary file, read back through
    Kubectl, and the cluster is registered in Juju as a k8s cloud named
    after the cluster uuid.

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
        '.kube/config'
    :param namespace: optional namespace to be used for juju. By default,
        'kube-system' will be used (currently not read by this method)
    :param reuse_cluster_uuid: existing cluster uuid for reuse; a new uuid4
        is generated when omitted
    :return: uuid of the K8s cluster and True if connector has installed some
        software in the cluster
        (on error, an exception will be raised)
    """

    # TODO: Pull info from db based on the namespace

    # TODO: Make it idempotent, calling add-k8s and bootstrap whenever
    # reuse_cluster_uuid is passed as parameter.
    # `init_env` is called to initialize the K8s cluster for juju. If this
    # initialization fails, it can be called again by LCM with the param
    # reuse_cluster_uuid, e.g. to try to fix it.

    cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())

    # The context manager guarantees the file holding the cluster
    # credentials is closed and removed as soon as we are done, even if
    # one of the calls below raises.
    with tempfile.NamedTemporaryFile(mode="w") as kubecfg:
        kubecfg.write(k8s_creds)
        # Kubectl reads the file by name, so the content must hit the disk.
        kubecfg.flush()

        kubectl = Kubectl(config_file=kubecfg.name)
        configuration = kubectl.get_configuration()
        default_storage_class = kubectl.get_default_storage_class()
        await self.libjuju.add_k8s(
            name=cluster_uuid,
            configuration=configuration,
            storage_class=default_storage_class,
            credential_name=self._get_credential_name(cluster_uuid),
        )

    return cluster_uuid, True
255
256 """Repo Management"""
257
async def repo_add(
    self,
    name: str,
    url: str,
    _type: str = "charm",
):
    """
    Add a repository. Not supported by this connector.

    :param name: repository name (unused)
    :param url: repository url (unused)
    :param _type: repository type (unused)
    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
265
async def repo_list(self):
    """
    List repositories. Not supported by this connector.

    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
268
async def repo_remove(
    self,
    name: str,
):
    """
    Remove a repository. Not supported by this connector.

    :param name: repository name (unused)
    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
274
async def synchronize_repos(self, cluster_uuid: str, name: str):
    """
    No-op: repositories cannot be added with this connector, so there is
    nothing to synchronize.

    :param cluster_uuid: the cluster (unused)
    :param name: repository name (unused)
    :return: always None
    """
    return
280
281 """Reset"""
282
async def reset(
    self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
) -> bool:
    """Reset a cluster

    Removes from Juju the k8s cloud registered under the cluster uuid.

    :param cluster_uuid str: The UUID of the cluster to reset
    :param force bool: Not read by this method
    :param uninstall_sw bool: Not read by this method
    :return: Returns True if successful or raises an exception.
    """
    try:
        self.log.debug("[reset] Removing k8s cloud")
        await self.libjuju.remove_cloud(cluster_uuid)
    except Exception as error:
        # Leave a trace in the log, then let the caller deal with it.
        self.log.debug("Caught exception during reset: {}".format(error))
        raise
    else:
        return True
337
338 """Deployment"""
339
async def install(
    self,
    cluster_uuid: str,
    kdu_model: str,
    atomic: bool = True,
    timeout: float = 1800,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
) -> bool:
    """Install a bundle

    The bundle is deployed into a new Juju model named after the KDU
    instance. For local bundles the working directory is temporarily
    switched to the folder holding the bundle and always restored
    afterwards, also when the deployment fails.

    :param cluster_uuid str: The UUID of the cluster to install to
    :param kdu_model str: The name or path of a bundle to install, in any
                          of the following ways:
                          - <juju-repo>/<juju-bundle>
                          - <juju-bundle folder under k8s_models folder in
                            the package>
                          - <juju-bundle tgz file (w/ or w/o extension)
                            under k8s_models folder in the package>
                          - <URL_where_to_fetch_juju_bundle>
    :param atomic bool: Passed to libjuju.deploy as its `wait` argument
    :param timeout int: The time, in seconds, to wait for the install
                        to finish
    :param params dict: Key-value pairs of instantiation parameters
                        (currently not used)
    :param db_dict dict: Mandatory; db_dict["filter"]["_id"] is used to
                         build the KDU instance name
    :param kdu_name: Name of the KDU instance to be installed
    :param namespace: K8s namespace to use for the KDU instance
                      (currently not used)

    :return: The KDU instance (= model) name: "<kdu_name>-<_id>" when
             kdu_name is given, "<_id>" otherwise. NOTE: this is a str
             despite the `bool` annotation.
    :raises K8sException: if db_dict or kdu_model is empty
    """
    bundle = kdu_model

    if not db_dict:
        raise K8sException("db_dict must be set")
    if not bundle:
        raise K8sException("bundle must be set")

    # Remember the working directory BEFORE any chdir, otherwise there is
    # nothing meaningful to go back to once the deployment is over.
    try:
        previous_workdir = os.getcwd()
    except FileNotFoundError:
        # The current directory has been deleted from under us
        previous_workdir = "/app/storage"

    if kdu_name:
        kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
    else:
        kdu_instance = db_dict["filter"]["_id"]

    try:
        # "cs:" (charm store) and "http" (URL) bundles are handed to
        # libjuju unchanged; anything else is treated as a local path.
        if not bundle.startswith(("cs:", "http")):
            # dirname() instead of str.strip(): strip() removes a *set of
            # characters* from both ends, which mangles some paths and
            # yields "" for a bare file name.
            new_workdir = os.path.dirname(kdu_model)
            if new_workdir:
                os.chdir(new_workdir)
            bundle = "local:{}".format(kdu_model)

        self.log.debug("Checking for model named {}".format(kdu_instance))

        # Create the new model
        self.log.debug("Adding model: {}".format(kdu_instance))
        await self.libjuju.add_model(
            model_name=kdu_instance,
            cloud_name=cluster_uuid,
            credential_name=self._get_credential_name(cluster_uuid),
        )

        # TODO: Instantiation parameters

        self.log.debug("[install] deploying {}".format(bundle))
        await self.libjuju.deploy(
            bundle, model_name=kdu_instance, wait=atomic, timeout=timeout
        )
    finally:
        # Restore the working directory whatever happened above
        os.chdir(previous_workdir)

    return kdu_instance
461
async def instances_list(self, cluster_uuid: str) -> list:
    """
    Return the deployed releases of a cluster.

    Listing is not implemented for this connector: the result is always an
    empty list.

    :param cluster_uuid: the cluster (unused)
    :return: a new empty list
    """
    return list()
470
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    params: dict = None,
) -> str:
    """Upgrade a model

    Not implemented yet: calling it always raises.

    :param cluster_uuid str: The UUID of the cluster to upgrade
    :param kdu_instance str: The unique name of the KDU instance
    :param kdu_model str: The name or path of the bundle to upgrade to
    :param params dict: Key-value pairs of instantiation parameters

    :return: Once implemented, reference to the new revision number of the
             KDU instance.
    :raises MethodNotImplemented: always
    """

    # TODO: Loop through the bundle and upgrade each charm individually
    #
    # The API doesn't have a concept of bundle upgrades, because there are
    # many possible changes: charm revision, disk, number of units, etc.
    #
    # As such, we are only supporting a limited subset of upgrades. We'll
    # upgrade the charm revision but leave storage and scale untouched.
    #
    # Scale changes should happen through OSM constructs, and changes to
    # storage would require a redeployment of the service, at least in this
    # initial release.
    raise MethodNotImplemented()
556
557 """Rollback"""
558
async def rollback(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    revision: int = 0,
) -> str:
    """Rollback a model

    Not implemented yet: calling it always raises.

    :param cluster_uuid str: The UUID of the cluster to rollback
    :param kdu_instance str: The unique name of the KDU instance
    :param revision int: The revision to revert to. If omitted, rolls back
                         the previous upgrade.

    :return: Once implemented, the revision of the active KDU instance
    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
576
577 """Deletion"""
578
async def uninstall(self, cluster_uuid: str, kdu_instance: str) -> bool:
    """Uninstall a KDU instance

    Destroys the Juju model named after the KDU instance, allowing up to
    3600 seconds for the operation.

    :param cluster_uuid str: The UUID of the cluster (unused)
    :param kdu_instance str: The unique name of the KDU instance

    :return: Returns True if successful, or raises an exception
    """
    destroy_timeout = 3600  # seconds

    self.log.debug("[uninstall] Destroying model")
    await self.libjuju.destroy_model(kdu_instance, total_timeout=destroy_timeout)

    return True
602
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
) -> str:
    """Exec primitive (Juju action)

    :param cluster_uuid str: The UUID of the cluster (unused)
    :param kdu_instance str: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution (currently not forwarded)
    :param params: Dictionary of all the parameters needed for the action.
                   Must contain "application-name"; the whole dictionary is
                   forwarded as keyword arguments to the action.
    :param db_dict: Dictionary for any additional data (unused)

    :return: Returns the output of the action
    :raises K8sException: if "application-name" is missing from params, the
                          primitive does not exist, the action does not end
                          with status "completed", or libjuju fails
    """

    if not params or "application-name" not in params:
        # Adjacent literals instead of a backslash continuation, which
        # would embed the source indentation in the message.
        raise K8sException(
            "Missing application-name argument, "
            "argument needed for K8s actions"
        )
    try:
        self.log.debug(
            "[exec_primitive] Getting model "
            "kdu_instance: {}".format(kdu_instance)
        )
        application_name = params["application-name"]
        actions = await self.libjuju.get_actions(application_name, kdu_instance)
        if primitive_name not in actions:
            raise K8sException("Primitive {} not found".format(primitive_name))
        output, status = await self.libjuju.execute_action(
            application_name, kdu_instance, primitive_name, **params
        )

        if status != "completed":
            raise K8sException(
                "status is not completed: {} output: {}".format(status, output)
            )

        return output

    except Exception as e:
        # Every failure (including the K8sExceptions raised above) is
        # reported with a uniform message; the cause is kept for debugging.
        error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
        self.log.error(error_msg)
        raise K8sException(message=error_msg) from e
688
689 """Introspection"""
690
async def inspect_kdu(
    self,
    kdu_model: str,
) -> dict:
    """Inspect a KDU

    Loads the bundle YAML file and returns its "applications" section, e.g.
    for a bundle such as:

        description: Test bundle
        bundle: kubernetes
        applications:
          mariadb-k8s:
            charm: 'cs:~charmed-osm/mariadb-k8s-20'
            scale: 1
            options:
              password: manopw
              root_password: osm4u
              user: mano
            series: kubernetes

    the value under "applications" is returned.

    :param kdu_model str: The path of the bundle file to inspect.

    :return: The content of the "applications" key of the bundle.
    :raises K8sException: if kdu_model does not exist
    """
    bundle_missing = not os.path.exists(kdu_model)
    if bundle_missing:
        raise K8sException("file {} not found".format(kdu_model))

    with open(kdu_model, "r") as bundle_file:
        raw_bundle = bundle_file.read()
    bundle = yaml.safe_load(raw_bundle)

    # TODO: This should be returned in an agreed-upon format
    return bundle["applications"]
735
async def help_kdu(
    self,
    kdu_model: str,
) -> str:
    """View the README

    Looks in the folder that contains kdu_model for a file named README,
    README.txt or README.md and returns the content of the first one found
    (in directory listing order).

    :param kdu_model str: The name or path of a bundle

    :return: If found, returns the contents of the README, otherwise None.
    """
    readme_names = ("README", "README.txt", "README.md")

    # A bare file name has no directory part: look in the current folder
    # instead of calling os.listdir(""), which fails.
    bundle_dir = os.path.dirname(kdu_model) or os.curdir

    for entry in os.listdir(bundle_dir):
        if entry in readme_names:
            # listdir() yields bare names: they must be joined with the
            # folder they were found in, not resolved against the cwd.
            with open(os.path.join(bundle_dir, entry), "r") as f:
                return f.read()

    return None
759
async def status_kdu(
    self,
    cluster_uuid: str,
    kdu_instance: str,
) -> dict:
    """Get the status of the KDU

    Queries the status of the model named after the KDU instance and keeps,
    for each application, only its status string.

    :param cluster_uuid str: The UUID of the cluster (unused)
    :param kdu_instance str: The unique id of the KDU instance

    :return: Returns a dictionary of the form
             {<application name>: {"status": <application status>}}
    """
    model_status = await self.libjuju.get_model_status(kdu_instance)
    applications = model_status.applications

    return {
        app_name: {"status": applications[app_name]["status"]["status"]}
        for app_name in applications
    }
790
async def get_services(
    self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
    """Return a list of services of a kdu_instance

    The services are looked up in the K8s namespace named after the KDU
    instance.

    :param cluster_uuid: The UUID of the cluster, used to fetch its kubeconfig
    :param kdu_instance: The unique name of the KDU instance; used as the
                         namespace of the field selector
    :param namespace: Not read by this method

    :return: The result of Kubectl.get_services for that namespace
    """

    credentials = self.get_credentials(cluster_uuid=cluster_uuid)

    # The context manager guarantees the file holding the cluster
    # credentials is closed and removed once the query is done, even if
    # one of the calls below raises.
    with tempfile.NamedTemporaryFile(mode="w") as kubecfg:
        kubecfg.write(credentials)
        # Kubectl reads the file by name, so the content must hit the disk.
        kubecfg.flush()

        kubectl = Kubectl(config_file=kubecfg.name)
        return kubectl.get_services(
            field_selector="metadata.namespace={}".format(kdu_instance)
        )
814
async def get_service(
    self, cluster_uuid: str, service_name: str, namespace: str
) -> object:
    """Return data for a specific service inside a namespace

    :param cluster_uuid: The UUID of the cluster, used to fetch its kubeconfig
    :param service_name: Name of the service to look up
    :param namespace: K8s namespace the service lives in

    :return: The first element of the list returned by Kubectl.get_services
             for that name and namespace
    :raises IndexError: if no service matches
    """

    credentials = self.get_credentials(cluster_uuid=cluster_uuid)

    # The context manager guarantees the file holding the cluster
    # credentials is closed and removed once the query is done, even if
    # one of the calls below raises.
    with tempfile.NamedTemporaryFile(mode="w") as kubecfg:
        kubecfg.write(credentials)
        # Kubectl reads the file by name, so the content must hit the disk.
        kubecfg.flush()

        kubectl = Kubectl(config_file=kubecfg.name)
        return kubectl.get_services(
            field_selector="metadata.name={},metadata.namespace={}".format(
                service_name, namespace
            )
        )[0]
840
841 # Private methods
842 # async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
843 # """Add a k8s cloud to Juju
844
845 # Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
846 # Juju Controller.
847
848 # :param cloud_name str: The name of the cloud to add.
849 # :param credentials dict: A dictionary representing the output of
850 # `kubectl config view --raw`.
851
852 # :returns: True if successful, otherwise raises an exception.
853 # """
854
855 # cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
856 # self.log.debug(cmd)
857
858 # process = await asyncio.create_subprocess_exec(
859 # *cmd,
860 # stdout=asyncio.subprocess.PIPE,
861 # stderr=asyncio.subprocess.PIPE,
862 # stdin=asyncio.subprocess.PIPE,
863 # )
864
865 # # Feed the process the credentials
866 # process.stdin.write(credentials.encode("utf-8"))
867 # await process.stdin.drain()
868 # process.stdin.close()
869
870 # _stdout, stderr = await process.communicate()
871
872 # return_code = process.returncode
873
874 # self.log.debug("add-k8s return code: {}".format(return_code))
875
876 # if return_code > 0:
877 # raise Exception(stderr)
878
879 # return True
880
881 # async def add_model(
882 # self, model_name: str, cluster_uuid: str, controller: Controller
883 # ) -> Model:
884 # """Adds a model to the controller
885
886 # Adds a new model to the Juju controller
887
888 # :param model_name str: The name of the model to add.
889 # :param cluster_uuid str: ID of the cluster.
890 # :param controller: Controller object in which the model will be added
891 # :returns: The juju.model.Model object of the new model upon success or
892 # raises an exception.
893 # """
894
895 # self.log.debug(
896 # "Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid)
897 # )
898 # model = None
899 # try:
900 # if self.juju_public_key is not None:
901 # model = await controller.add_model(
902 # model_name, config={"authorized-keys": self.juju_public_key}
903 # )
904 # else:
905 # model = await controller.add_model(model_name)
906 # except Exception as ex:
907 # self.log.debug(ex)
908 # self.log.debug("Caught exception: {}".format(ex))
909 # pass
910
911 # return model
912
913 # async def bootstrap(
914 # self, cloud_name: str, cluster_uuid: str, loadbalancer: bool
915 # ) -> bool:
916 # """Bootstrap a Kubernetes controller
917
918 # Bootstrap a Juju controller inside the Kubernetes cluster
919
920 # :param cloud_name str: The name of the cloud.
921 # :param cluster_uuid str: The UUID of the cluster to bootstrap.
922 # :param loadbalancer bool: If the controller should use loadbalancer or not.
923 # :returns: True upon success or raises an exception.
924 # """
925
926 # if not loadbalancer:
927 # cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
928 # else:
929 # """
930 # For public clusters, specify that the controller service is using a
931 # LoadBalancer.
932 # """
933 # cmd = [
934 # self.juju_command,
935 # "bootstrap",
936 # cloud_name,
937 # cluster_uuid,
938 # "--config",
939 # "controller-service-type=loadbalancer",
940 # ]
941
942 # self.log.debug(
943 # "Bootstrapping controller {} in cloud {}".format(cluster_uuid, cloud_name)
944 # )
945
946 # process = await asyncio.create_subprocess_exec(
947 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
948 # )
949
950 # _stdout, stderr = await process.communicate()
951
952 # return_code = process.returncode
953
954 # if return_code > 0:
955 # #
956 # if b"already exists" not in stderr:
957 # raise Exception(stderr)
958
959 # return True
960
961 # async def destroy_controller(self, cluster_uuid: str) -> bool:
962 # """Destroy a Kubernetes controller
963
964 # Destroy an existing Kubernetes controller.
965
966 # :param cluster_uuid str: The UUID of the cluster to bootstrap.
967 # :returns: True upon success or raises an exception.
968 # """
969 # cmd = [
970 # self.juju_command,
971 # "destroy-controller",
972 # "--destroy-all-models",
973 # "--destroy-storage",
974 # "-y",
975 # cluster_uuid,
976 # ]
977
978 # process = await asyncio.create_subprocess_exec(
979 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
980 # )
981
982 # _stdout, stderr = await process.communicate()
983
984 # return_code = process.returncode
985
986 # if return_code > 0:
987 # #
988 # if "already exists" not in stderr:
989 # raise Exception(stderr)
990
def get_credentials(self, cluster_uuid: str) -> str:
    """
    Get Cluster Kubeconfig

    Reads the cluster record from the "k8sclusters" table, decrypts the
    "password" and "secret" fields of its credentials in place and dumps
    the credentials as YAML.

    :param cluster_uuid: The UUID (_id) of the cluster in the database

    :return: The cluster credentials serialized as a YAML string
    :raises K8sException: if there is no record for cluster_uuid
    """
    k8scluster = self.db.get_one(
        "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
    )
    # With fail_on_empty=False a missing record is reported by an empty
    # result rather than an exception: fail with a clear message instead
    # of an AttributeError on the lookups below.
    if not k8scluster:
        raise K8sException(
            "k8s cluster {} not found in database".format(cluster_uuid)
        )

    credentials = k8scluster.get("credentials")
    self.db.encrypt_decrypt_fields(
        credentials,
        "decrypt",
        ["password", "secret"],
        schema_version=k8scluster["schema_version"],
        salt=k8scluster["_id"],
    )

    return yaml.safe_dump(credentials)
1008
1009 def _get_credential_name(self, cluster_uuid: str) -> str:
1010 """
1011 Get credential name for a k8s cloud
1012
1013 We cannot use the cluster_uuid for the credential name directly,
1014 because it cannot start with a number, it must start with a letter.
1015 Therefore, the k8s cloud credential name will be "cred-" followed
1016 by the cluster uuid.
1017
1018 :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)
1019
1020 :return: Name to use for the credential name.
1021 """
1022 return "cred-{}".format(cluster_uuid)
1023
1024 # def get_config(self, cluster_uuid: str,) -> dict:
1025 # """Get the cluster configuration
1026
1027 # Gets the configuration of the cluster
1028
1029 # :param cluster_uuid str: The UUID of the cluster.
1030 # :return: A dict upon success, or raises an exception.
1031 # """
1032
1033 # juju_db = self.db.get_one("admin", {"_id": "juju"})
1034 # config = None
1035 # for k in juju_db["k8sclusters"]:
1036 # if k["_id"] == cluster_uuid:
1037 # config = k["config"]
1038 # self.db.encrypt_decrypt_fields(
1039 # config,
1040 # "decrypt",
1041 # ["secret", "cacert"],
1042 # schema_version="1.1",
1043 # salt=k["_id"],
1044 # )
1045 # break
1046 # if not config:
1047 # raise Exception(
1048 # "Unable to locate configuration for cluster {}".format(cluster_uuid)
1049 # )
1050 # return config
1051
1052 # async def get_model(self, model_name: str, controller: Controller) -> Model:
1053 # """Get a model from the Juju Controller.
1054
1055 # Note: Model objects returned must call disconnected() before it goes
1056 # out of scope.
1057
1058 # :param model_name str: The name of the model to get
1059 # :param controller Controller: Controller object
1060 # :return The juju.model.Model object if found, or None.
1061 # """
1062
1063 # models = await controller.list_models()
1064 # if model_name not in models:
1065 # raise N2VCNotFound("Model {} not found".format(model_name))
1066 # self.log.debug("Found model: {}".format(model_name))
1067 # return await controller.get_model(model_name)
1068
def get_namespace(self, cluster_uuid: str) -> str:
    """Get the namespace UUID

    Placeholder for returning the namespace's unique name. No lookup is
    implemented at the moment: cluster_uuid is not used and the method
    returns None.

    :param cluster_uuid str: The UUID of the cluster
    :returns: currently always None
    """
    # The earlier config-based lookup is disabled:
    # config = self.get_config(cluster_uuid)
    #
    # Make sure the name is in the config
    # if "namespace" not in config:
    #     raise Exception("Namespace not found.")

    # TODO: We want to make sure this is unique to the cluster, in case
    # the cluster is being reused.
    # Consider pre/appending the cluster id to the namespace string
    return None
1089
1090 # TODO: Remove these lines of code
1091 # async def has_model(self, model_name: str) -> bool:
1092 # """Check if a model exists in the controller
1093
1094 # Checks to see if a model exists in the connected Juju controller.
1095
1096 # :param model_name str: The name of the model
1097 # :return: A boolean indicating if the model exists
1098 # """
1099 # models = await self.controller.list_models()
1100
1101 # if model_name in models:
1102 # return True
1103 # return False
1104
1105 # def is_local_k8s(self, credentials: str,) -> bool:
1106 # """Check if a cluster is local
1107
1108 # Checks if a cluster is running in the local host
1109
1110 # :param credentials dict: A dictionary containing the k8s credentials
1111 # :returns: A boolean if the cluster is running locally
1112 # """
1113
1114 # creds = yaml.safe_load(credentials)
1115
1116 # if creds and os.getenv("OSMLCM_VCA_APIPROXY"):
1117 # for cluster in creds["clusters"]:
1118 # if "server" in cluster["cluster"]:
1119 # if os.getenv("OSMLCM_VCA_APIPROXY") in cluster["cluster"]["server"]:
1120 # return True
1121
1122 # return False
1123
1124 # async def get_controller(self, cluster_uuid):
1125 # """Login to the Juju controller."""
1126
1127 # config = self.get_config(cluster_uuid)
1128
1129 # juju_endpoint = config["endpoint"]
1130 # juju_user = config["username"]
1131 # juju_secret = config["secret"]
1132 # juju_ca_cert = config["cacert"]
1133
1134 # controller = Controller()
1135
1136 # if juju_secret:
1137 # self.log.debug(
1138 # "Connecting to controller... ws://{} as {}".format(
1139 # juju_endpoint, juju_user,
1140 # )
1141 # )
1142 # try:
1143 # await controller.connect(
1144 # endpoint=juju_endpoint,
1145 # username=juju_user,
1146 # password=juju_secret,
1147 # cacert=juju_ca_cert,
1148 # )
1149 # self.log.debug("JujuApi: Logged into controller")
1150 # return controller
1151 # except Exception as ex:
1152 # self.log.debug(ex)
1153 # self.log.debug("Caught exception: {}".format(ex))
1154 # else:
1155 # self.log.fatal("VCA credentials not configured.")
1156
1157 # TODO: Remove these commented lines
1158 # self.authenticated = False
1159 # if self.authenticated:
1160 # return
1161
1162 # self.connecting = True
1163 # juju_public_key = None
1164 # self.authenticated = True
1165 # Test: Make sure we have the credentials loaded
1166 # async def logout(self):
1167 # """Logout of the Juju controller."""
1168 # self.log.debug("[logout]")
1169 # if not self.authenticated:
1170 # return False
1171
1172 # for model in self.models:
1173 # self.log.debug("Logging out of model {}".format(model))
1174 # await self.models[model].disconnect()
1175
1176 # if self.controller:
1177 # self.log.debug("Disconnecting controller {}".format(self.controller))
1178 # await self.controller.disconnect()
1179 # self.controller = None
1180
1181 # self.authenticated = False
1182
1183 # async def remove_cloud(self, cloud_name: str,) -> bool:
1184 # """Remove a k8s cloud from Juju
1185
1186 # Removes a Kubernetes cloud from Juju.
1187
# :param cloud_name str: The name of the cloud to remove.
1189
1190 # :returns: True if successful, otherwise raises an exception.
1191 # """
1192
1193 # # Remove the bootstrapped controller
1194 # cmd = [self.juju_command, "remove-k8s", "--client", cloud_name]
1195 # process = await asyncio.create_subprocess_exec(
1196 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1197 # )
1198
1199 # _stdout, stderr = await process.communicate()
1200
1201 # return_code = process.returncode
1202
1203 # if return_code > 0:
1204 # raise Exception(stderr)
1205
1206 # # Remove the cloud from the local config
1207 # cmd = [self.juju_command, "remove-cloud", "--client", cloud_name]
1208 # process = await asyncio.create_subprocess_exec(
1209 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1210 # )
1211
1212 # _stdout, stderr = await process.communicate()
1213
1214 # return_code = process.returncode
1215
1216 # if return_code > 0:
1217 # raise Exception(stderr)
1218
1219 # return True
1220
1221 # async def set_config(self, cluster_uuid: str, config: dict,) -> bool:
1222 # """Save the cluster configuration
1223
1224 # Saves the cluster information to the Mongo database
1225
1226 # :param cluster_uuid str: The UUID of the cluster
1227 # :param config dict: A dictionary containing the cluster configuration
1228 # """
1229
1230 # juju_db = self.db.get_one("admin", {"_id": "juju"})
1231
1232 # k8sclusters = juju_db["k8sclusters"] if "k8sclusters" in juju_db else []
1233 # self.db.encrypt_decrypt_fields(
1234 # config,
1235 # "encrypt",
1236 # ["secret", "cacert"],
1237 # schema_version="1.1",
1238 # salt=cluster_uuid,
1239 # )
1240 # k8sclusters.append({"_id": cluster_uuid, "config": config})
1241 # self.db.set_one(
1242 # table="admin",
1243 # q_filter={"_id": "juju"},
1244 # update_dict={"k8sclusters": k8sclusters},
1245 # )