1 # Copyright 2019 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from .exceptions
import NotImplemented
21 # from juju.bundle import BundleHandler
22 from juju
.controller
import Controller
23 from juju
.model
import Model
24 from juju
.errors
import JujuAPIError
, JujuError
28 from n2vc
.k8s_conn
import K8sConnector
33 # from .vnf import N2VC
39 class K8sJujuConnector(K8sConnector
):
45 kubectl_command
: str = '/usr/bin/kubectl',
46 juju_command
: str = '/usr/bin/juju',
52 :param kubectl_command: path to kubectl executable
:param juju_command: path to juju executable
54 :param fs: file system for kubernetes and helm configuration
59 K8sConnector
.__init
__(
63 on_update_db
=on_update_db
,
67 self
.info('Initializing K8S Juju connector')
69 self
.authenticated
= False
71 self
.log
= logging
.getLogger(__name__
)
73 self
.juju_command
= juju_command
76 self
.info('K8S Juju connector initialized')
82 namespace
: str = 'kube-system',
83 reuse_cluster_uuid
: str = None,
86 It prepares a given K8s cluster environment to run Juju bundles.
88 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config'
89 :param namespace: optional namespace to be used for juju. By default, 'kube-system' will be used
90 :param reuse_cluster_uuid: existing cluster uuid for reuse
91 :return: uuid of the K8s cluster and True if connector has installed some software in the cluster
92 (on error, an exception will be raised)
97 Bootstrapping cannot be done, by design, through the API. We need to
104 1. Has the environment already been bootstrapped?
105 - Check the database to see if we have a record for this env
107 2. If this is a new env, create it
108 - Add the k8s cloud to Juju
110 - Record it in the database
112 3. Connect to the Juju controller for this cloud
115 # cluster_uuid = reuse_cluster_uuid
116 # if not cluster_uuid:
117 # cluster_uuid = str(uuid4())
119 ##################################################
120 # TODO: Pull info from db based on the namespace #
121 ##################################################
123 ###################################################
124 # TODO: Make it idempotent, calling add-k8s and #
125 # bootstrap whenever reuse_cluster_uuid is passed #
127 # `init_env` is called to initialize the K8s #
128 # cluster for juju. If this initialization fails, #
129 # it can be called again by LCM with the param #
130 # reuse_cluster_uuid, e.g. to try to fix it. #
131 ###################################################
133 if not reuse_cluster_uuid
:
134 # This is a new cluster, so bootstrap it
136 cluster_uuid
= str(uuid
.uuid4())
138 # Is a local k8s cluster?
139 localk8s
= self
.is_local_k8s(k8s_creds
)
141 # If the k8s is external, the juju controller needs a loadbalancer
142 loadbalancer
= False if localk8s
else True
144 # Name the new k8s cloud
145 k8s_cloud
= "k8s-{}".format(cluster_uuid
)
147 print("Adding k8s cloud {}".format(k8s_cloud
))
148 await self
.add_k8s(k8s_cloud
, k8s_creds
)
150 # Bootstrap Juju controller
151 print("Bootstrapping...")
152 await self
.bootstrap(k8s_cloud
, cluster_uuid
, loadbalancer
)
153 print("Bootstrap done.")
155 # Get the controller information
157 # Parse ~/.local/share/juju/controllers.yaml
158 # controllers.testing.api-endpoints|ca-cert|uuid
159 print("Getting controller endpoints")
160 with
open(os
.path
.expanduser(
161 "~/.local/share/juju/controllers.yaml"
163 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
164 controller
= controllers
['controllers'][cluster_uuid
]
165 endpoints
= controller
['api-endpoints']
166 self
.juju_endpoint
= endpoints
[0]
167 self
.juju_ca_cert
= controller
['ca-cert']
# Parse ~/.local/share/juju/accounts.yaml
170 # controllers.testing.user|password
171 print("Getting accounts")
172 with
open(os
.path
.expanduser(
173 "~/.local/share/juju/accounts.yaml"
175 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
176 controller
= controllers
['controllers'][cluster_uuid
]
178 self
.juju_user
= controller
['user']
179 self
.juju_secret
= controller
['password']
181 print("user: {}".format(self
.juju_user
))
182 print("secret: {}".format(self
.juju_secret
))
183 print("endpoint: {}".format(self
.juju_endpoint
))
184 print("ca-cert: {}".format(self
.juju_ca_cert
))
186 # raise Exception("EOL")
188 self
.juju_public_key
= None
191 'endpoint': self
.juju_endpoint
,
192 'username': self
.juju_user
,
193 'secret': self
.juju_secret
,
194 'cacert': self
.juju_ca_cert
,
195 'namespace': namespace
,
196 'loadbalancer': loadbalancer
,
199 # Store the cluster configuration so it
200 # can be used for subsequent calls
201 print("Setting config")
202 await self
.set_config(cluster_uuid
, config
)
205 # This is an existing cluster, so get its config
206 cluster_uuid
= reuse_cluster_uuid
208 config
= self
.get_config(cluster_uuid
)
210 self
.juju_endpoint
= config
['endpoint']
211 self
.juju_user
= config
['username']
212 self
.juju_secret
= config
['secret']
213 self
.juju_ca_cert
= config
['cacert']
214 self
.juju_public_key
= None
216 # Login to the k8s cluster
217 if not self
.authenticated
:
218 await self
.login(cluster_uuid
)
220 # We're creating a new cluster
221 #print("Getting model {}".format(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid))
222 #model = await self.get_model(
223 # self.get_namespace(cluster_uuid),
224 # cluster_uuid=cluster_uuid
227 ## Disconnect from the model
228 #if model and model.is_connected():
229 # await model.disconnect()
231 return cluster_uuid
, True
233 """Repo Management"""
240 raise NotImplemented()
async def repo_list(self):
    """List the repositories known to this connector.

    Repository listing is not implemented by this connector, so calling
    this coroutine always fails.

    :raises NotImplemented: always.
    """
    # NotImplemented is the exception class imported from .exceptions at
    # the top of this module; it shadows the builtin constant of that name.
    raise NotImplemented()
245 async def repo_remove(
249 raise NotImplemented()
256 uninstall_sw
: bool = False
260 Resets the Kubernetes cluster by removing the model that represents it.
262 :param cluster_uuid str: The UUID of the cluster to reset
263 :return: Returns True if successful or raises an exception.
267 if not self
.authenticated
:
268 await self
.login(cluster_uuid
)
270 if self
.controller
.is_connected():
272 namespace
= self
.get_namespace(cluster_uuid
)
273 if await self
.has_model(namespace
):
274 print("[reset] Destroying model")
275 await self
.controller
.destroy_model(
280 # Disconnect from the controller
281 print("[reset] Disconnecting controller")
284 # Destroy the controller (via CLI)
285 print("[reset] Destroying controller")
286 await self
.destroy_controller(cluster_uuid
)
288 print("[reset] Removing k8s cloud")
289 k8s_cloud
= "k8s-{}".format(cluster_uuid
)
290 await self
.remove_cloud(k8s_cloud
)
292 except Exception as ex
:
293 print("Caught exception during reset: {}".format(ex
))
304 timeout
: float = 300,
306 db_dict
: dict = None,
311 :param cluster_uuid str: The UUID of the cluster to install to
312 :param kdu_model str: The name or path of a bundle to install
313 :param atomic bool: If set, waits until the model is active and resets
314 the cluster on failure.
:param timeout float: The time, in seconds, to wait for the install
317 :param params dict: Key-value pairs of instantiation parameters
318 :param kdu_name: Name of the KDU instance to be installed
320 :return: If successful, returns ?
323 if not self
.authenticated
:
324 print("[install] Logging in to the controller")
325 await self
.login(cluster_uuid
)
328 # Get or create the model, based on the NS
331 kdu_instance
= "{}-{}".format(kdu_name
, db_dict
["filter"]["_id"])
333 kdu_instance
= db_dict
["filter"]["_id"]
335 self
.log
.debug("Checking for model named {}".format(kdu_instance
))
337 # Create the new model
338 self
.log
.debug("Adding model: {}".format(kdu_instance
))
339 model
= await self
.add_model(kdu_instance
, cluster_uuid
=cluster_uuid
)
342 # TODO: Instantiation parameters
345 "Juju bundle that models the KDU, in any of the following ways:
346 - <juju-repo>/<juju-bundle>
347 - <juju-bundle folder under k8s_models folder in the package>
348 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder in the package>
349 - <URL_where_to_fetch_juju_bundle>
353 if kdu_model
.startswith("cs:"):
355 elif kdu_model
.startswith("http"):
361 # if kdu_model.endswith(".tar.gz") or kdu_model.endswith(".tgz")
362 # Uncompress temporarily
363 # bundle = <uncompressed file>
367 # Raise named exception that the bundle could not be found
370 print("[install] deploying {}".format(bundle
))
371 await model
.deploy(bundle
)
373 # Get the application
375 # applications = model.applications
376 print("[install] Applications: {}".format(model
.applications
))
377 for name
in model
.applications
:
378 print("[install] Waiting for {} to settle".format(name
))
379 application
= model
.applications
[name
]
381 # It's not enough to wait for all units to be active;
382 # the application status needs to be active as well.
383 print("Waiting for all units to be active...")
384 await model
.block_until(
386 unit
.agent_status
== 'idle'
387 and application
.status
in ['active', 'unknown']
388 and unit
.workload_status
in [
390 ] for unit
in application
.units
394 print("All units active.")
396 except concurrent
.futures
._base
.TimeoutError
:
397 print("[install] Timeout exceeded; resetting cluster")
398 await self
.reset(cluster_uuid
)
401 # Wait for the application to be active
402 if model
.is_connected():
403 print("[install] Disconnecting model")
404 await model
.disconnect()
407 raise Exception("Unable to install")
409 async def instances_list(
414 returns a list of deployed releases in a cluster
416 :param cluster_uuid: the cluster
425 kdu_model
: str = None,
430 :param cluster_uuid str: The UUID of the cluster to upgrade
431 :param kdu_instance str: The unique name of the KDU instance
432 :param kdu_model str: The name or path of the bundle to upgrade to
433 :param params dict: Key-value pairs of instantiation parameters
435 :return: If successful, reference to the new revision number of the
439 # TODO: Loop through the bundle and upgrade each charm individually
442 The API doesn't have a concept of bundle upgrades, because there are
443 many possible changes: charm revision, disk, number of units, etc.
445 As such, we are only supporting a limited subset of upgrades. We'll
446 upgrade the charm revision but leave storage and scale untouched.
448 Scale changes should happen through OSM constructs, and changes to
449 storage would require a redeployment of the service, at least in this
452 namespace
= self
.get_namespace(cluster_uuid
)
453 model
= await self
.get_model(namespace
, cluster_uuid
=cluster_uuid
)
455 with
open(kdu_model
, 'r') as f
:
456 bundle
= yaml
.safe_load(f
)
460 'description': 'Test bundle',
461 'bundle': 'kubernetes',
464 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
467 'password': 'manopw',
468 'root_password': 'osm4u',
471 'series': 'kubernetes'
476 # TODO: This should be returned in an agreed-upon format
477 for name
in bundle
['applications']:
478 print(model
.applications
)
479 application
= model
.applications
[name
]
482 path
= bundle
['applications'][name
]['charm']
485 await application
.upgrade_charm(switch
=path
)
486 except juju
.errors
.JujuError
as ex
:
487 if 'already running charm' in str(ex
):
488 # We're already running this version
491 await model
.disconnect()
494 raise NotImplemented()
505 :param cluster_uuid str: The UUID of the cluster to rollback
506 :param kdu_instance str: The unique name of the KDU instance
507 :param revision int: The revision to revert to. If omitted, rolls back
508 the previous upgrade.
510 :return: If successful, returns the revision of active KDU instance,
511 or raises an exception
513 raise NotImplemented()
521 """Uninstall a KDU instance
523 :param cluster_uuid str: The UUID of the cluster
524 :param kdu_instance str: The unique name of the KDU instance
526 :return: Returns True if successful, or raises an exception
528 if not self
.authenticated
:
529 self
.log
.debug("[uninstall] Connecting to controller")
530 await self
.login(cluster_uuid
)
532 self
.log
.debug("[uninstall] Destroying model")
534 await self
.controller
.destroy_models(kdu_instance
)
536 self
.log
.debug("[uninstall] Model destroyed and disconnecting")
542 async def inspect_kdu(
548 Inspects a bundle and returns a dictionary of config parameters and
549 their default values.
551 :param kdu_model str: The name or path of the bundle to inspect.
553 :return: If successful, returns a dictionary of available parameters
554 and their default values.
558 with
open(kdu_model
, 'r') as f
:
559 bundle
= yaml
.safe_load(f
)
563 'description': 'Test bundle',
564 'bundle': 'kubernetes',
567 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
570 'password': 'manopw',
571 'root_password': 'osm4u',
574 'series': 'kubernetes'
579 # TODO: This should be returned in an agreed-upon format
580 kdu
= bundle
['applications']
590 If available, returns the README of the bundle.
592 :param kdu_model str: The name or path of a bundle
594 :return: If found, returns the contents of the README.
598 files
= ['README', 'README.txt', 'README.md']
599 path
= os
.path
.dirname(kdu_model
)
600 for file in os
.listdir(path
):
602 with
open(file, 'r') as f
:
608 async def status_kdu(
613 """Get the status of the KDU
615 Get the current status of the KDU instance.
617 :param cluster_uuid str: The UUID of the cluster
618 :param kdu_instance str: The unique id of the KDU instance
620 :return: Returns a dictionary containing namespace, state, resources,
625 model
= await self
.get_model(self
.get_namespace(cluster_uuid
), cluster_uuid
=cluster_uuid
)
627 # model = await self.get_model_by_uuid(cluster_uuid)
629 model_status
= await model
.get_status()
630 status
= model_status
.applications
632 for name
in model_status
.applications
:
633 application
= model_status
.applications
[name
]
635 'status': application
['status']['status']
638 if model
.is_connected():
639 await model
.disconnect()
649 """Add a k8s cloud to Juju
651 Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
654 :param cloud_name str: The name of the cloud to add.
:param credentials str: A string containing the output of
656 `kubectl config view --raw`.
658 :returns: True if successful, otherwise raises an exception.
661 cmd
= [self
.juju_command
, "add-k8s", "--local", cloud_name
]
664 process
= await asyncio
.create_subprocess_exec(
666 stdout
=asyncio
.subprocess
.PIPE
,
667 stderr
=asyncio
.subprocess
.PIPE
,
668 stdin
=asyncio
.subprocess
.PIPE
,
671 # Feed the process the credentials
672 process
.stdin
.write(credentials
.encode("utf-8"))
673 await process
.stdin
.drain()
674 process
.stdin
.close()
676 stdout
, stderr
= await process
.communicate()
678 return_code
= process
.returncode
680 print("add-k8s return code: {}".format(return_code
))
683 raise Exception(stderr
)
691 ) -> juju
.model
.Model
:
692 """Adds a model to the controller
694 Adds a new model to the Juju controller
696 :param model_name str: The name of the model to add.
697 :returns: The juju.model.Model object of the new model upon success or
700 if not self
.authenticated
:
701 await self
.login(cluster_uuid
)
703 self
.log
.debug("Adding model '{}' to cluster_uuid '{}'".format(model_name
, cluster_uuid
))
705 model
= await self
.controller
.add_model(
707 config
={'authorized-keys': self
.juju_public_key
}
709 except Exception as ex
:
711 self
.log
.debug("Caught exception: {}".format(ex
))
722 """Bootstrap a Kubernetes controller
724 Bootstrap a Juju controller inside the Kubernetes cluster
726 :param cloud_name str: The name of the cloud.
727 :param cluster_uuid str: The UUID of the cluster to bootstrap.
728 :param loadbalancer bool: If the controller should use loadbalancer or not.
729 :returns: True upon success or raises an exception.
733 cmd
= [self
.juju_command
, "bootstrap", cloud_name
, cluster_uuid
]
736 For public clusters, specify that the controller service is using a LoadBalancer.
738 cmd
= [self
.juju_command
, "bootstrap", cloud_name
, cluster_uuid
, "--config", "controller-service-type=loadbalancer"]
740 print("Bootstrapping controller {} in cloud {}".format(
741 cluster_uuid
, cloud_name
744 process
= await asyncio
.create_subprocess_exec(
746 stdout
=asyncio
.subprocess
.PIPE
,
747 stderr
=asyncio
.subprocess
.PIPE
,
750 stdout
, stderr
= await process
.communicate()
752 return_code
= process
.returncode
756 if b
'already exists' not in stderr
:
757 raise Exception(stderr
)
761 async def destroy_controller(
765 """Destroy a Kubernetes controller
767 Destroy an existing Kubernetes controller.
:param cluster_uuid str: The UUID of the cluster whose controller should be destroyed.
770 :returns: True upon success or raises an exception.
774 "destroy-controller",
775 "--destroy-all-models",
781 process
= await asyncio
.create_subprocess_exec(
783 stdout
=asyncio
.subprocess
.PIPE
,
784 stderr
=asyncio
.subprocess
.PIPE
,
787 stdout
, stderr
= await process
.communicate()
789 return_code
= process
.returncode
793 if 'already exists' not in stderr
:
794 raise Exception(stderr
)
800 """Get the cluster configuration
802 Gets the configuration of the cluster
804 :param cluster_uuid str: The UUID of the cluster.
805 :return: A dict upon success, or raises an exception.
807 cluster_config
= "{}/{}.yaml".format(self
.fs
.path
, cluster_uuid
)
808 if os
.path
.exists(cluster_config
):
809 with
open(cluster_config
, 'r') as f
:
810 config
= yaml
.safe_load(f
.read())
814 "Unable to locate configuration for cluster {}".format(
823 ) -> juju
.model
.Model
:
824 """Get a model from the Juju Controller.
Note: A returned Model object must call disconnect() before it goes
829 :param model_name str: The name of the model to get
:return: The juju.model.Model object if found, or None.
832 if not self
.authenticated
:
833 await self
.login(cluster_uuid
)
836 models
= await self
.controller
.list_models()
837 self
.log
.debug(models
)
838 if model_name
in models
:
839 self
.log
.debug("Found model: {}".format(model_name
))
840 model
= await self
.controller
.get_model(
849 """Get the namespace UUID
850 Gets the namespace's unique name
852 :param cluster_uuid str: The UUID of the cluster
853 :returns: The namespace UUID, or raises an exception
855 config
= self
.get_config(cluster_uuid
)
857 # Make sure the name is in the config
858 if 'namespace' not in config
:
859 raise Exception("Namespace not found.")
861 # TODO: We want to make sure this is unique to the cluster, in case
862 # the cluster is being reused.
863 # Consider pre/appending the cluster id to the namespace string
864 return config
['namespace']
870 """Check if a model exists in the controller
872 Checks to see if a model exists in the connected Juju controller.
874 :param model_name str: The name of the model
875 :return: A boolean indicating if the model exists
877 models
= await self
.controller
.list_models()
879 if model_name
in models
:
887 """Check if a cluster is local
889 Checks if a cluster is running in the local host
:param credentials str: YAML text containing the k8s credentials
:returns: A boolean indicating whether the cluster is running locally
894 creds
= yaml
.safe_load(credentials
)
895 if os
.getenv("OSMLCM_VCA_APIPROXY"):
896 host_ip
= os
.getenv("OSMLCM_VCA_APIPROXY")
898 if creds
and host_ip
:
899 for cluster
in creds
['clusters']:
900 if 'server' in cluster
['cluster']:
901 if host_ip
in cluster
['cluster']['server']:
906 async def login(self
, cluster_uuid
):
907 """Login to the Juju controller."""
909 if self
.authenticated
:
912 self
.connecting
= True
914 # Test: Make sure we have the credentials loaded
915 config
= self
.get_config(cluster_uuid
)
917 self
.juju_endpoint
= config
['endpoint']
918 self
.juju_user
= config
['username']
919 self
.juju_secret
= config
['secret']
920 self
.juju_ca_cert
= config
['cacert']
921 self
.juju_public_key
= None
923 self
.controller
= Controller()
927 "Connecting to controller... ws://{} as {}/{}".format(
934 await self
.controller
.connect(
935 endpoint
=self
.juju_endpoint
,
936 username
=self
.juju_user
,
937 password
=self
.juju_secret
,
938 cacert
=self
.juju_ca_cert
,
940 self
.authenticated
= True
941 self
.log
.debug("JujuApi: Logged into controller")
942 except Exception as ex
:
944 self
.log
.debug("Caught exception: {}".format(ex
))
947 self
.log
.fatal("VCA credentials not configured.")
948 self
.authenticated
= False
950 async def logout(self
):
951 """Logout of the Juju controller."""
953 if not self
.authenticated
:
956 for model
in self
.models
:
957 print("Logging out of model {}".format(model
))
958 await self
.models
[model
].disconnect()
961 self
.log
.debug("Disconnecting controller {}".format(
964 await self
.controller
.disconnect()
965 self
.controller
= None
967 self
.authenticated
= False
969 async def remove_cloud(
973 """Remove a k8s cloud from Juju
975 Removes a Kubernetes cloud from Juju.
:param cloud_name str: The name of the cloud to remove.
979 :returns: True if successful, otherwise raises an exception.
# Remove the k8s cloud from the Juju client
983 cmd
= [self
.juju_command
, "remove-k8s", "--client", cloud_name
]
984 process
= await asyncio
.create_subprocess_exec(
986 stdout
=asyncio
.subprocess
.PIPE
,
987 stderr
=asyncio
.subprocess
.PIPE
,
990 stdout
, stderr
= await process
.communicate()
992 return_code
= process
.returncode
995 raise Exception(stderr
)
997 # Remove the cloud from the local config
998 cmd
= [self
.juju_command
, "remove-cloud", "--client", cloud_name
]
999 process
= await asyncio
.create_subprocess_exec(
1001 stdout
=asyncio
.subprocess
.PIPE
,
1002 stderr
=asyncio
.subprocess
.PIPE
,
1005 stdout
, stderr
= await process
.communicate()
1007 return_code
= process
.returncode
1010 raise Exception(stderr
)
1014 async def set_config(
1019 """Save the cluster configuration
1021 Saves the cluster information to the file store
1023 :param cluster_uuid str: The UUID of the cluster
1024 :param config dict: A dictionary containing the cluster configuration
1025 :returns: Boolean upon success or raises an exception.
1028 cluster_config
= "{}/{}.yaml".format(self
.fs
.path
, cluster_uuid
)
1029 if not os
.path
.exists(cluster_config
):
1030 print("Writing config to {}".format(cluster_config
))
1031 with
open(cluster_config
, 'w') as f
:
1032 f
.write(yaml
.dump(config
, Dumper
=yaml
.Dumper
))