1 # Copyright 2019 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 from .exceptions
import NotImplemented
19 # from juju.bundle import BundleHandler
20 from juju
.controller
import Controller
21 from juju
.model
import Model
22 from juju
.errors
import JujuAPIError
, JujuError
26 from n2vc
.k8s_conn
import K8sConnector
32 # from .vnf import N2VC
38 class K8sJujuConnector(K8sConnector
):
44 kubectl_command
: str = '/usr/bin/kubectl',
45 juju_command
: str = '/usr/bin/juju',
51 :param kubectl_command: path to kubectl executable
52 :param juju_command: path to juju executable
53 :param fs: file system for kubernetes and helm configuration
58 K8sConnector
.__init
__(
62 on_update_db
=on_update_db
,
66 self
.info('Initializing K8S Juju connector')
68 self
.authenticated
= False
70 self
.log
= logging
.getLogger(__name__
)
72 self
.juju_command
= juju_command
75 self
.info('K8S Juju connector initialized')
81 namespace
: str = 'kube-system',
82 reuse_cluster_uuid
: str = None,
84 """Initialize a Kubernetes environment
86 :param k8s_creds dict: A dictionary containing the Kubernetes cluster
88 :param namespace str: The Kubernetes namespace to initialize
90 :return: UUID of the k8s context or raises an exception
95 Bootstrapping cannot be done, by design, through the API. We need to
102 1. Has the environment already been bootstrapped?
103 - Check the database to see if we have a record for this env
105 2. If this is a new env, create it
106 - Add the k8s cloud to Juju
108 - Record it in the database
110 3. Connect to the Juju controller for this cloud
113 # cluster_uuid = reuse_cluster_uuid
114 # if not cluster_uuid:
115 # cluster_uuid = str(uuid4())
117 ##################################################
118 # TODO: Pull info from db based on the namespace #
119 ##################################################
121 if not reuse_cluster_uuid
:
122 # This is a new cluster, so bootstrap it
124 cluster_uuid
= str(uuid
.uuid4())
126 # Add k8s cloud to Juju (unless it's microk8s)
128 # Does the kubeconfig contain microk8s?
129 microk8s
= self
.is_microk8s_by_credentials(k8s_creds
)
131 # Name the new k8s cloud
132 k8s_cloud
= "{}-k8s".format(namespace
)
134 print("Adding k8s cloud {}".format(k8s_cloud
))
135 await self
.add_k8s(k8s_cloud
, k8s_creds
)
137 # Bootstrap Juju controller
138 print("Bootstrapping...")
139 await self
.bootstrap(k8s_cloud
, cluster_uuid
, microk8s
)
140 print("Bootstrap done.")
142 # Get the controller information
144 # Parse ~/.local/share/juju/controllers.yaml
145 # controllers.testing.api-endpoints|ca-cert|uuid
146 print("Getting controller endpoints")
147 with
open(os
.path
.expanduser(
148 "~/.local/share/juju/controllers.yaml"
150 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
151 controller
= controllers
['controllers'][cluster_uuid
]
152 endpoints
= controller
['api-endpoints']
153 self
.juju_endpoint
= endpoints
[0]
154 self
.juju_ca_cert
= controller
['ca-cert']
156 # Parse ~/.local/share/juju/accounts.yaml
157 # controllers.testing.user|password
158 print("Getting accounts")
159 with
open(os
.path
.expanduser(
160 "~/.local/share/juju/accounts.yaml"
162 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
163 controller
= controllers
['controllers'][cluster_uuid
]
165 self
.juju_user
= controller
['user']
166 self
.juju_secret
= controller
['password']
168 print("user: {}".format(self
.juju_user
))
169 print("secret: {}".format(self
.juju_secret
))
170 print("endpoint: {}".format(self
.juju_endpoint
))
171 print("ca-cert: {}".format(self
.juju_ca_cert
))
173 # raise Exception("EOL")
175 self
.juju_public_key
= None
178 'endpoint': self
.juju_endpoint
,
179 'username': self
.juju_user
,
180 'secret': self
.juju_secret
,
181 'cacert': self
.juju_ca_cert
,
182 'namespace': namespace
,
183 'microk8s': microk8s
,
186 # Store the cluster configuration so it
187 # can be used for subsequent calls
188 print("Setting config")
189 await self
.set_config(cluster_uuid
, config
)
192 # This is an existing cluster, so get its config
193 cluster_uuid
= reuse_cluster_uuid
195 config
= self
.get_config(cluster_uuid
)
197 self
.juju_endpoint
= config
['endpoint']
198 self
.juju_user
= config
['username']
199 self
.juju_secret
= config
['secret']
200 self
.juju_ca_cert
= config
['cacert']
201 self
.juju_public_key
= None
203 # Login to the k8s cluster
204 if not self
.authenticated
:
205 await self
.login(cluster_uuid
)
207 # We're creating a new cluster
208 print("Getting model {}".format(self
.get_namespace(cluster_uuid
), cluster_uuid
=cluster_uuid
))
209 model
= await self
.get_model(
210 self
.get_namespace(cluster_uuid
),
211 cluster_uuid
=cluster_uuid
214 # Disconnect from the model
215 if model
and model
.is_connected():
216 await model
.disconnect()
218 return cluster_uuid
, True
220 """Repo Management"""
227 raise NotImplemented()
async def repo_list(self):
    """Placeholder for listing repositories; not implemented here.

    :raises NotImplemented: always. This is the exception class imported
        from ``.exceptions`` at the top of the file, which shadows the
        builtin ``NotImplemented`` constant within this module.
    """
    raise NotImplemented()
232 async def repo_remove(
236 raise NotImplemented()
243 uninstall_sw
: bool = False
247 Resets the Kubernetes cluster by removing the model that represents it.
249 :param cluster_uuid str: The UUID of the cluster to reset
250 :return: Returns True if successful or raises an exception.
254 if not self
.authenticated
:
255 await self
.login(cluster_uuid
)
257 if self
.controller
.is_connected():
259 namespace
= self
.get_namespace(cluster_uuid
)
260 if await self
.has_model(namespace
):
261 print("[reset] Destroying model")
262 await self
.controller
.destroy_model(
267 # Disconnect from the controller
268 print("[reset] Disconnecting controller")
269 await self
.controller
.disconnect()
271 # Destroy the controller (via CLI)
272 print("[reset] Destroying controller")
273 await self
.destroy_controller(cluster_uuid
)
275 """Remove the k8s cloud
277 Only remove the k8s cloud if it's not a microk8s cloud,
278 since microk8s is a built-in cloud type.
280 # microk8s = self.is_microk8s_by_cluster_uuid(cluster_uuid)
282 print("[reset] Removing k8s cloud")
283 namespace
= self
.get_namespace(cluster_uuid
)
284 k8s_cloud
= "{}-k8s".format(namespace
)
285 await self
.remove_cloud(k8s_cloud
)
287 except Exception as ex
:
288 print("Caught exception during reset: {}".format(ex
))
297 timeout
: float = 300,
303 :param cluster_uuid str: The UUID of the cluster to install to
304 :param kdu_model str: The name or path of a bundle to install
305 :param atomic bool: If set, waits until the model is active and resets
306 the cluster on failure.
307 :param timeout float: The time, in seconds, to wait for the install
309 :param params dict: Key-value pairs of instantiation parameters
311 :return: If successful, returns ?
314 if not self
.authenticated
:
315 print("[install] Logging in to the controller")
316 await self
.login(cluster_uuid
)
319 # Get or create the model, based on the NS
321 model_name
= db_dict
["filter"]["_id"]
323 self
.log
.debug("Checking for model named {}".format(model_name
))
324 model
= await self
.get_model(model_name
, cluster_uuid
=cluster_uuid
)
326 # Create the new model
327 self
.log
.debug("Adding model: {}".format(model_name
))
328 model
= await self
.add_model(model_name
, cluster_uuid
=cluster_uuid
)
331 # TODO: Instantiation parameters
334 "Juju bundle that models the KDU, in any of the following ways:
335 - <juju-repo>/<juju-bundle>
336 - <juju-bundle folder under k8s_models folder in the package>
337 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder in the package>
338 - <URL_where_to_fetch_juju_bundle>
342 if kdu_model
.startswith("cs:"):
344 elif kdu_model
.startswith("http"):
350 # if kdu_model.endswith(".tar.gz") or kdu_model.endswith(".tgz")
351 # Uncompress temporarily
352 # bundle = <uncompressed file>
356 # Raise named exception that the bundle could not be found
359 print("[install] deploying {}".format(bundle
))
360 await model
.deploy(bundle
)
362 # Get the application
364 # applications = model.applications
365 print("[install] Applications: {}".format(model
.applications
))
366 for name
in model
.applications
:
367 print("[install] Waiting for {} to settle".format(name
))
368 application
= model
.applications
[name
]
370 # It's not enough to wait for all units to be active;
371 # the application status needs to be active as well.
372 print("Waiting for all units to be active...")
373 await model
.block_until(
375 unit
.agent_status
== 'idle'
376 and application
.status
in ['active', 'unknown']
377 and unit
.workload_status
in [
379 ] for unit
in application
.units
383 print("All units active.")
385 except concurrent
.futures
._base
.TimeoutError
:
386 print("[install] Timeout exceeded; resetting cluster")
387 await self
.reset(cluster_uuid
)
390 # Wait for the application to be active
391 if model
.is_connected():
392 print("[install] Disconnecting model")
393 await model
.disconnect()
396 raise Exception("Unable to install")
398 async def instances_list(
403 returns a list of deployed releases in a cluster
405 :param cluster_uuid: the cluster
414 kdu_model
: str = None,
419 :param cluster_uuid str: The UUID of the cluster to upgrade
420 :param kdu_instance str: The unique name of the KDU instance
421 :param kdu_model str: The name or path of the bundle to upgrade to
422 :param params dict: Key-value pairs of instantiation parameters
424 :return: If successful, reference to the new revision number of the
428 # TODO: Loop through the bundle and upgrade each charm individually
431 The API doesn't have a concept of bundle upgrades, because there are
432 many possible changes: charm revision, disk, number of units, etc.
434 As such, we are only supporting a limited subset of upgrades. We'll
435 upgrade the charm revision but leave storage and scale untouched.
437 Scale changes should happen through OSM constructs, and changes to
438 storage would require a redeployment of the service, at least in this
441 namespace
= self
.get_namespace(cluster_uuid
)
442 model
= await self
.get_model(namespace
, cluster_uuid
=cluster_uuid
)
444 with
open(kdu_model
, 'r') as f
:
445 bundle
= yaml
.safe_load(f
)
449 'description': 'Test bundle',
450 'bundle': 'kubernetes',
453 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
456 'password': 'manopw',
457 'root_password': 'osm4u',
460 'series': 'kubernetes'
465 # TODO: This should be returned in an agreed-upon format
466 for name
in bundle
['applications']:
467 print(model
.applications
)
468 application
= model
.applications
[name
]
471 path
= bundle
['applications'][name
]['charm']
474 await application
.upgrade_charm(switch
=path
)
475 except juju
.errors
.JujuError
as ex
:
476 if 'already running charm' in str(ex
):
477 # We're already running this version
480 await model
.disconnect()
483 raise NotImplemented()
494 :param cluster_uuid str: The UUID of the cluster to rollback
495 :param kdu_instance str: The unique name of the KDU instance
496 :param revision int: The revision to revert to. If omitted, rolls back
497 the previous upgrade.
499 :return: If successful, returns the revision of active KDU instance,
500 or raises an exception
502 raise NotImplemented()
510 """Uninstall a KDU instance
512 :param cluster_uuid str: The UUID of the cluster to uninstall
513 :param kdu_instance str: The unique name of the KDU instance
515 :return: Returns True if successful, or raises an exception
519 # Remove an application from the model
520 model
= await self
.get_model(self
.get_namespace(cluster_uuid
), cluster_uuid
=cluster_uuid
)
523 # Get the application
524 if kdu_instance
not in model
.applications
:
525 # TODO: Raise a named exception
526 raise Exception("Application not found.")
528 application
= model
.applications
[kdu_instance
]
530 # Destroy the application
531 await application
.destroy()
533 # TODO: Verify removal
539 async def inspect_kdu(
545 Inspects a bundle and returns a dictionary of config parameters and
546 their default values.
548 :param kdu_model str: The name or path of the bundle to inspect.
550 :return: If successful, returns a dictionary of available parameters
551 and their default values.
555 with
open(kdu_model
, 'r') as f
:
556 bundle
= yaml
.safe_load(f
)
560 'description': 'Test bundle',
561 'bundle': 'kubernetes',
564 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
567 'password': 'manopw',
568 'root_password': 'osm4u',
571 'series': 'kubernetes'
576 # TODO: This should be returned in an agreed-upon format
577 kdu
= bundle
['applications']
587 If available, returns the README of the bundle.
589 :param kdu_model str: The name or path of a bundle
591 :return: If found, returns the contents of the README.
595 files
= ['README', 'README.txt', 'README.md']
596 path
= os
.path
.dirname(kdu_model
)
597 for file in os
.listdir(path
):
599 with
open(file, 'r') as f
:
605 async def status_kdu(
610 """Get the status of the KDU
612 Get the current status of the KDU instance.
614 :param cluster_uuid str: The UUID of the cluster
615 :param kdu_instance str: The unique id of the KDU instance
617 :return: Returns a dictionary containing namespace, state, resources,
622 model
= await self
.get_model(self
.get_namespace(cluster_uuid
), cluster_uuid
=cluster_uuid
)
624 # model = await self.get_model_by_uuid(cluster_uuid)
626 model_status
= await model
.get_status()
627 status
= model_status
.applications
629 for name
in model_status
.applications
:
630 application
= model_status
.applications
[name
]
632 'status': application
['status']['status']
635 if model
.is_connected():
636 await model
.disconnect()
646 """Add a k8s cloud to Juju
648 Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
651 :param cloud_name str: The name of the cloud to add.
652 :param credentials dict: A dictionary representing the output of
653 `kubectl config view --raw`.
655 :returns: True if successful, otherwise raises an exception.
658 cmd
= [self
.juju_command
, "add-k8s", "--local", cloud_name
]
662 stdout
=subprocess
.PIPE
,
663 stderr
=subprocess
.PIPE
,
664 # input=yaml.dump(credentials, Dumper=yaml.Dumper).encode("utf-8"),
665 input=credentials
.encode("utf-8"),
668 retcode
= p
.returncode
669 print("add-k8s return code: {}".format(retcode
))
672 raise Exception(p
.stderr
)
679 ) -> juju
.model
.Model
:
680 """Adds a model to the controller
682 Adds a new model to the Juju controller
684 :param model_name str: The name of the model to add.
685 :returns: The juju.model.Model object of the new model upon success or
688 if not self
.authenticated
:
689 await self
.login(cluster_uuid
)
691 self
.log
.debug("Adding model '{}' to cluster_uuid '{}'".format(model_name
, cluster_uuid
))
692 model
= await self
.controller
.add_model(
694 config
={'authorized-keys': self
.juju_public_key
}
704 """Bootstrap a Kubernetes controller
706 Bootstrap a Juju controller inside the Kubernetes cluster
708 :param cloud_name str: The name of the cloud.
709 :param cluster_uuid str: The UUID of the cluster to bootstrap.
710 :param microk8s bool: If this is a microk8s cluster.
711 :returns: True upon success or raises an exception.
715 cmd
= [self
.juju_command
, "bootstrap", cloud_name
, cluster_uuid
]
718 For non-microk8s clusters, specify that the controller service is using a LoadBalancer.
720 cmd
= [self
.juju_command
, "bootstrap", cloud_name
, cluster_uuid
, "--config", "controller-service-type=loadbalancer"]
722 print("Bootstrapping controller {} in cloud {}".format(
723 cluster_uuid
, cloud_name
728 stdout
=subprocess
.PIPE
,
729 stderr
=subprocess
.PIPE
,
732 retcode
= p
.returncode
736 if b
'already exists' not in p
.stderr
:
737 raise Exception(p
.stderr
)
741 async def destroy_controller(
745 """Destroy a Kubernetes controller
747 Destroy an existing Kubernetes controller.
749 :param cluster_uuid str: The UUID of the cluster whose controller is destroyed.
750 :returns: True upon success or raises an exception.
754 "destroy-controller",
755 "--destroy-all-models",
763 stdout
=subprocess
.PIPE
,
764 stderr
=subprocess
.PIPE
,
767 retcode
= p
.returncode
771 if 'already exists' not in p
.stderr
:
772 raise Exception(p
.stderr
)
778 """Get the cluster configuration
780 Gets the configuration of the cluster
782 :param cluster_uuid str: The UUID of the cluster.
783 :return: A dict upon success, or raises an exception.
785 cluster_config
= "{}/{}.yaml".format(self
.fs
.path
, cluster_uuid
)
786 if os
.path
.exists(cluster_config
):
787 with
open(cluster_config
, 'r') as f
:
788 config
= yaml
.safe_load(f
.read())
792 "Unable to locate configuration for cluster {}".format(
801 ) -> juju
.model
.Model
:
802 """Get a model from the Juju Controller.
804 Note: Model objects returned must call disconnect() before it goes
807 :param model_name str: The name of the model to get
808 :return The juju.model.Model object if found, or None.
810 if not self
.authenticated
:
811 await self
.login(cluster_uuid
)
814 models
= await self
.controller
.list_models()
815 self
.log
.debug(models
)
816 if model_name
in models
:
817 self
.log
.debug("Found model: {}".format(model_name
))
818 model
= await self
.controller
.get_model(
827 """Get the namespace UUID
828 Gets the namespace's unique name
830 :param cluster_uuid str: The UUID of the cluster
831 :returns: The namespace UUID, or raises an exception
833 config
= self
.get_config(cluster_uuid
)
835 # Make sure the name is in the config
836 if 'namespace' not in config
:
837 raise Exception("Namespace not found.")
839 # TODO: We want to make sure this is unique to the cluster, in case
840 # the cluster is being reused.
841 # Consider pre/appending the cluster id to the namespace string
842 return config
['namespace']
848 """Check if a model exists in the controller
850 Checks to see if a model exists in the connected Juju controller.
852 :param model_name str: The name of the model
853 :return: A boolean indicating if the model exists
855 models
= await self
.controller
.list_models()
857 if model_name
in models
:
861 def is_microk8s_by_cluster_uuid(
865 """Check if a cluster is microk8s
867 Checks if a cluster is running microk8s
869 :param cluster_uuid str: The UUID of the cluster
870 :returns: A boolean if the cluster is running microk8s
872 config
= self
.get_config(cluster_uuid
)
873 return config
['microk8s']
875 def is_microk8s_by_credentials(
879 """Check if a cluster is microk8s
881 Checks if a cluster is running microk8s
883 :param credentials dict: A dictionary containing the k8s credentials
884 :returns: A boolean if the cluster is running microk8s
886 creds
= yaml
.safe_load(credentials
)
888 for context
in creds
['contexts']:
889 if 'microk8s' in context
['name']:
894 async def login(self
, cluster_uuid
):
895 """Login to the Juju controller."""
897 if self
.authenticated
:
900 self
.connecting
= True
902 # Test: Make sure we have the credentials loaded
903 config
= self
.get_config(cluster_uuid
)
905 self
.juju_endpoint
= config
['endpoint']
906 self
.juju_user
= config
['username']
907 self
.juju_secret
= config
['secret']
908 self
.juju_ca_cert
= config
['cacert']
909 self
.juju_public_key
= None
911 self
.controller
= Controller()
915 "Connecting to controller... ws://{} as {}/{}".format(
922 await self
.controller
.connect(
923 endpoint
=self
.juju_endpoint
,
924 username
=self
.juju_user
,
925 password
=self
.juju_secret
,
926 cacert
=self
.juju_ca_cert
,
928 self
.authenticated
= True
929 self
.log
.debug("JujuApi: Logged into controller")
930 except Exception as ex
:
932 self
.log
.debug("Caught exception: {}".format(ex
))
935 self
.log
.fatal("VCA credentials not configured.")
936 self
.authenticated
= False
938 async def logout(self
):
939 """Logout of the Juju controller."""
941 if not self
.authenticated
:
944 for model
in self
.models
:
945 print("Logging out of model {}".format(model
))
946 await self
.models
[model
].disconnect()
949 self
.log
.debug("Disconnecting controller {}".format(
952 await self
.controller
.disconnect()
953 self
.controller
= None
955 self
.authenticated
= False
957 async def remove_cloud(
961 """Remove a k8s cloud from Juju
963 Removes a Kubernetes cloud from Juju.
965 :param cloud_name str: The name of the cloud to remove.
967 :returns: True if successful, otherwise raises an exception.
970 # Remove the k8s cloud from the Juju client
971 cmd
= [self
.juju_command
, "remove-k8s", "--client", cloud_name
]
974 stdout
=subprocess
.PIPE
,
975 stderr
=subprocess
.PIPE
,
978 retcode
= p
.returncode
981 raise Exception(p
.stderr
)
983 # Remove the cloud from the local config
984 cmd
= [self
.juju_command
, "remove-cloud", "--client", cloud_name
]
987 stdout
=subprocess
.PIPE
,
988 stderr
=subprocess
.PIPE
,
991 retcode
= p
.returncode
994 raise Exception(p
.stderr
)
999 async def set_config(
1004 """Save the cluster configuration
1006 Saves the cluster information to the file store
1008 :param cluster_uuid str: The UUID of the cluster
1009 :param config dict: A dictionary containing the cluster configuration
1010 :returns: Boolean upon success or raises an exception.
1013 cluster_config
= "{}/{}.yaml".format(self
.fs
.path
, cluster_uuid
)
1014 if not os
.path
.exists(cluster_config
):
1015 print("Writing config to {}".format(cluster_config
))
1016 with
open(cluster_config
, 'w') as f
:
1017 f
.write(yaml
.dump(config
, Dumper
=yaml
.Dumper
))