1 # Copyright 2019 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 from .exceptions
import NotImplemented
19 # from juju.bundle import BundleHandler
20 from juju
.controller
import Controller
21 from juju
.model
import Model
22 from juju
.errors
import JujuAPIError
, JujuError
26 from n2vc
.k8s_conn
import K8sConnector
32 # from .vnf import N2VC
38 class K8sJujuConnector(K8sConnector
):
42 kubectl_command
='/usr/bin/kubectl',
47 :param kubectl_command: path to kubectl executable
48 :param helm_command: path to helm executable
49 :param fs: file system for kubernetes and helm configuration
54 K8sConnector
.__init
__(
56 kubectl_command
=kubectl_command
,
61 self
.info('Initializing K8S Juju connector')
63 self
.authenticated
= False
65 self
.log
= logging
.getLogger(__name__
)
66 self
.info('K8S Juju connector initialized')
72 namespace
: str = 'kube-system',
73 reuse_cluster_uuid
: str = None,
75 """Initialize a Kubernetes environment
77 :param k8s_creds dict: A dictionary containing the Kubernetes cluster
79 :param namespace str: The Kubernetes namespace to initialize
81 :return: UUID of the k8s context or raises an exception
86 Bootstrapping cannot be done, by design, through the API. We need to
89 # TODO: The path may change
92 self
.k8scli
= "{}/juju".format(jujudir
)
97 1. Has the environment already been bootstrapped?
98 - Check the database to see if we have a record for this env
100 2. If this is a new env, create it
101 - Add the k8s cloud to Juju
103 - Record it in the database
105 3. Connect to the Juju controller for this cloud
108 # cluster_uuid = reuse_cluster_uuid
109 # if not cluster_uuid:
110 # cluster_uuid = str(uuid4())
112 ##################################################
113 # TODO: Pull info from db based on the namespace #
114 ##################################################
116 if not reuse_cluster_uuid
:
117 # This is a new cluster, so bootstrap it
119 cluster_uuid
= str(uuid
.uuid4())
121 # Add k8s cloud to Juju (unless it's microk8s)
123 # Does the kubeconfig contain microk8s?
124 microk8s
= self
.is_microk8s_by_credentials(k8s_creds
)
127 # Name the new k8s cloud
128 k8s_cloud
= "{}-k8s".format(namespace
)
130 await self
.add_k8s(k8s_cloud
, k8s_creds
)
132 # Bootstrap Juju controller
133 self
.bootstrap(k8s_cloud
, cluster_uuid
)
135 # k8s_cloud = 'microk8s-test'
136 k8s_cloud
= "{}-k8s".format(namespace
)
138 await self
.add_k8s(k8s_cloud
, k8s_creds
)
140 await self
.bootstrap(k8s_cloud
, cluster_uuid
)
142 # Get the controller information
144 # Parse ~/.local/share/juju/controllers.yaml
145 # controllers.testing.api-endpoints|ca-cert|uuid
146 with
open(os
.path
.expanduser(
147 "~/.local/share/juju/controllers.yaml"
149 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
150 controller
= controllers
['controllers'][cluster_uuid
]
151 endpoints
= controller
['api-endpoints']
152 self
.juju_endpoint
= endpoints
[0]
153 self
.juju_ca_cert
= controller
['ca-cert']
155 # Parse ~/.local/share/juju/accounts
156 # controllers.testing.user|password
157 with
open(os
.path
.expanduser(
158 "~/.local/share/juju/accounts.yaml"
160 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
161 controller
= controllers
['controllers'][cluster_uuid
]
163 self
.juju_user
= controller
['user']
164 self
.juju_secret
= controller
['password']
166 print("user: {}".format(self
.juju_user
))
167 print("secret: {}".format(self
.juju_secret
))
168 print("endpoint: {}".format(self
.juju_endpoint
))
169 print("ca-cert: {}".format(self
.juju_ca_cert
))
171 # raise Exception("EOL")
173 self
.juju_public_key
= None
176 'endpoint': self
.juju_endpoint
,
177 'username': self
.juju_user
,
178 'secret': self
.juju_secret
,
179 'cacert': self
.juju_ca_cert
,
180 'namespace': namespace
,
181 'microk8s': microk8s
,
184 # Store the cluster configuration so it
185 # can be used for subsequent calls
186 await self
.set_config(cluster_uuid
, config
)
189 # This is an existing cluster, so get its config
190 cluster_uuid
= reuse_cluster_uuid
192 config
= self
.get_config(cluster_uuid
)
194 self
.juju_endpoint
= config
['endpoint']
195 self
.juju_user
= config
['username']
196 self
.juju_secret
= config
['secret']
197 self
.juju_ca_cert
= config
['cacert']
198 self
.juju_public_key
= None
200 # Login to the k8s cluster
201 if not self
.authenticated
:
204 # We're creating a new cluster
205 print("Getting model {}".format(self
.get_namespace(cluster_uuid
)))
206 model
= await self
.get_model(self
.get_namespace(cluster_uuid
))
208 # Disconnect from the model
209 if model
and model
.is_connected():
210 await model
.disconnect()
214 """Repo Management"""
221 raise NotImplemented()
async def repo_list(self):
    """List the repositories known to this connector.

    This connector does not implement repository listing, so the call
    always raises.

    :raises NotImplemented: unconditionally.
    """
    raise NotImplemented()
226 async def repo_remove(
230 raise NotImplemented()
239 Resets the Kubernetes cluster by removing the model that represents it.
241 :param cluster_uuid str: The UUID of the cluster to reset
242 :return: Returns True if successful or raises an exception.
246 if not self
.authenticated
:
249 if self
.controller
.is_connected():
251 namespace
= self
.get_namespace(cluster_uuid
)
252 if await self
.has_model(namespace
):
253 print("[reset] Destroying model")
254 await self
.controller
.destroy_model(
259 # Disconnect from the controller
260 print("[reset] Disconnecting controller")
261 await self
.controller
.disconnect()
263 # Destroy the controller (via CLI)
264 print("[reset] Destroying controller")
265 await self
.destroy_controller(cluster_uuid
)
267 """Remove the k8s cloud
269 Only remove the k8s cloud if it's not a microk8s cloud,
270 since microk8s is a built-in cloud type.
272 # microk8s = self.is_microk8s_by_cluster_uuid(cluster_uuid)
274 print("[reset] Removing k8s cloud")
275 namespace
= self
.get_namespace(cluster_uuid
)
276 k8s_cloud
= "{}-k8s".format(namespace
)
277 await self
.remove_cloud(k8s_cloud
)
279 except Exception as ex
:
280 print("Caught exception during reset: {}".format(ex
))
293 :param cluster_uuid str: The UUID of the cluster to install to
294 :param kdu_model str: The name or path of a bundle to install
295 :param atomic bool: If set, waits until the model is active and resets
296 the cluster on failure.
297 :param timeout int: The time, in seconds, to wait for the install
299 :param params dict: Key-value pairs of instantiation parameters
301 :return: If successful, returns ?
304 if not self
.authenticated
:
305 print("[install] Logging in to the controller")
309 # Get or create the model, based on the namespace the cluster was
311 namespace
= self
.get_namespace(cluster_uuid
)
312 model
= await self
.get_model(namespace
)
314 # Create the new model
315 model
= await self
.add_model(namespace
)
318 # TODO: Instantiation parameters
320 print("[install] deploying {}".format(kdu_model
))
321 await model
.deploy(kdu_model
)
323 # Get the application
325 # applications = model.applications
326 print("[install] Applications: {}".format(model
.applications
))
327 for name
in model
.applications
:
328 print("[install] Waiting for {} to settle".format(name
))
329 application
= model
.applications
[name
]
331 # It's not enough to wait for all units to be active;
332 # the application status needs to be active as well.
333 print("Waiting for all units to be active...")
334 await model
.block_until(
336 unit
.agent_status
== 'idle'
337 and application
.status
in ['active', 'unknown']
338 and unit
.workload_status
in [
340 ] for unit
in application
.units
344 print("All units active.")
346 except concurrent
.futures
._base
.TimeoutError
:
347 print("[install] Timeout exceeded; resetting cluster")
348 await self
.reset(cluster_uuid
)
351 # Wait for the application to be active
352 if model
.is_connected():
353 print("[install] Disconnecting model")
354 await model
.disconnect()
357 raise Exception("Unable to install")
359 async def instances_list(
364 returns a list of deployed releases in a cluster
366 :param cluster_uuid: the cluster
375 kdu_model
: str = None,
380 :param cluster_uuid str: The UUID of the cluster to upgrade
381 :param kdu_instance str: The unique name of the KDU instance
382 :param kdu_model str: The name or path of the bundle to upgrade to
383 :param params dict: Key-value pairs of instantiation parameters
385 :return: If successful, reference to the new revision number of the
389 # TODO: Loop through the bundle and upgrade each charm individually
392 The API doesn't have a concept of bundle upgrades, because there are
393 many possible changes: charm revision, disk, number of units, etc.
395 As such, we are only supporting a limited subset of upgrades. We'll
396 upgrade the charm revision but leave storage and scale untouched.
398 Scale changes should happen through OSM constructs, and changes to
399 storage would require a redeployment of the service, at least in this
402 namespace
= self
.get_namespace(cluster_uuid
)
403 model
= await self
.get_model(namespace
)
405 with
open(kdu_model
, 'r') as f
:
406 bundle
= yaml
.load(f
, Loader
=yaml
.FullLoader
)
410 'description': 'Test bundle',
411 'bundle': 'kubernetes',
414 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
417 'password': 'manopw',
418 'root_password': 'osm4u',
421 'series': 'kubernetes'
426 # TODO: This should be returned in an agreed-upon format
427 for name
in bundle
['applications']:
428 print(model
.applications
)
429 application
= model
.applications
[name
]
432 path
= bundle
['applications'][name
]['charm']
435 await application
.upgrade_charm(switch
=path
)
436 except juju
.errors
.JujuError
as ex
:
437 if 'already running charm' in str(ex
):
438 # We're already running this version
441 await model
.disconnect()
444 raise NotImplemented()
455 :param cluster_uuid str: The UUID of the cluster to rollback
456 :param kdu_instance str: The unique name of the KDU instance
457 :param revision int: The revision to revert to. If omitted, rolls back
458 the previous upgrade.
460 :return: If successful, returns the revision of active KDU instance,
461 or raises an exception
463 raise NotImplemented()
471 """Uninstall a KDU instance
473 :param cluster_uuid str: The UUID of the cluster to uninstall
474 :param kdu_instance str: The unique name of the KDU instance
476 :return: Returns True if successful, or raises an exception
480 # Remove an application from the model
481 model
= await self
.get_model(self
.get_namespace(cluster_uuid
))
484 # Get the application
485 if kdu_instance
not in model
.applications
:
486 # TODO: Raise a named exception
487 raise Exception("Application not found.")
489 application
= model
.applications
[kdu_instance
]
491 # Destroy the application
492 await application
.destroy()
494 # TODO: Verify removal
500 async def inspect_kdu(
506 Inspects a bundle and returns a dictionary of config parameters and
507 their default values.
509 :param kdu_model str: The name or path of the bundle to inspect.
511 :return: If successful, returns a dictionary of available parameters
512 and their default values.
516 with
open(kdu_model
, 'r') as f
:
517 bundle
= yaml
.load(f
, Loader
=yaml
.FullLoader
)
521 'description': 'Test bundle',
522 'bundle': 'kubernetes',
525 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
528 'password': 'manopw',
529 'root_password': 'osm4u',
532 'series': 'kubernetes'
537 # TODO: This should be returned in an agreed-upon format
538 kdu
= bundle
['applications']
548 If available, returns the README of the bundle.
550 :param kdu_model str: The name or path of a bundle
552 :return: If found, returns the contents of the README.
556 files
= ['README', 'README.txt', 'README.md']
557 path
= os
.path
.dirname(kdu_model
)
558 for file in os
.listdir(path
):
560 with
open(file, 'r') as f
:
566 async def status_kdu(
571 """Get the status of the KDU
573 Get the current status of the KDU instance.
575 :param cluster_uuid str: The UUID of the cluster
576 :param kdu_instance str: The unique id of the KDU instance
578 :return: Returns a dictionary containing namespace, state, resources,
583 model
= await self
.get_model(self
.get_namespace(cluster_uuid
))
585 # model = await self.get_model_by_uuid(cluster_uuid)
587 model_status
= await model
.get_status()
588 status
= model_status
.applications
590 for name
in model_status
.applications
:
591 application
= model_status
.applications
[name
]
593 'status': application
['status']['status']
596 if model
.is_connected():
597 await model
.disconnect()
607 """Add a k8s cloud to Juju
609 Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
612 :param cloud_name str: The name of the cloud to add.
613 :param credentials dict: A dictionary representing the output of
614 `kubectl config view --raw`.
616 :returns: True if successful, otherwise raises an exception.
618 cmd
= [self
.k8scli
, "add-k8s", "--local", cloud_name
]
622 stdout
=subprocess
.PIPE
,
623 stderr
=subprocess
.PIPE
,
624 input=yaml
.dump(credentials
, Dumper
=yaml
.Dumper
),
627 retcode
= p
.returncode
630 raise Exception(p
.stderr
)
636 ) -> juju
.model
.Model
:
637 """Adds a model to the controller
639 Adds a new model to the Juju controller
641 :param model_name str: The name of the model to add.
642 :returns: The juju.model.Model object of the new model upon success or
645 if not self
.authenticated
:
648 model
= await self
.controller
.add_model(
650 config
={'authorized-keys': self
.juju_public_key
}
659 """Bootstrap a Kubernetes controller
661 Bootstrap a Juju controller inside the Kubernetes cluster
663 :param cloud_name str: The name of the cloud.
664 :param cluster_uuid str: The UUID of the cluster to bootstrap.
665 :returns: True upon success or raises an exception.
667 cmd
= [self
.k8scli
, "bootstrap", cloud_name
, cluster_uuid
]
668 print("Bootstrapping controller {} in cloud {}".format(
669 cluster_uuid
, cloud_name
674 stdout
=subprocess
.PIPE
,
675 stderr
=subprocess
.PIPE
,
678 retcode
= p
.returncode
682 if 'already exists' not in p
.stderr
:
683 raise Exception(p
.stderr
)
687 async def destroy_controller(
691 """Destroy a Kubernetes controller
693 Destroy an existing Kubernetes controller.
695 :param cluster_uuid str: The UUID of the cluster whose controller will be destroyed.
696 :returns: True upon success or raises an exception.
700 "destroy-controller",
701 "--destroy-all-models",
709 stdout
=subprocess
.PIPE
,
710 stderr
=subprocess
.PIPE
,
713 retcode
= p
.returncode
717 if 'already exists' not in p
.stderr
:
718 raise Exception(p
.stderr
)
724 """Get the cluster configuration
726 Gets the configuration of the cluster
728 :param cluster_uuid str: The UUID of the cluster.
729 :return: A dict upon success, or raises an exception.
731 cluster_config
= "{}/{}.yaml".format(self
.fs
.path
, cluster_uuid
)
732 if os
.path
.exists(cluster_config
):
733 with
open(cluster_config
, 'r') as f
:
734 config
= yaml
.load(f
.read(), Loader
=yaml
.FullLoader
)
738 "Unable to locate configuration for cluster {}".format(
746 ) -> juju
.model
.Model
:
747 """Get a model from the Juju Controller.
749 Note: Model objects returned must call disconnect() before it goes
752 :param model_name str: The name of the model to get
753 :return: The juju.model.Model object if found, or None.
755 if not self
.authenticated
:
759 models
= await self
.controller
.list_models()
761 if model_name
in models
:
762 model
= await self
.controller
.get_model(
771 """Get the namespace UUID
772 Gets the namespace's unique name
774 :param cluster_uuid str: The UUID of the cluster
775 :returns: The namespace UUID, or raises an exception
777 config
= self
.get_config(cluster_uuid
)
779 # Make sure the name is in the config
780 if 'namespace' not in config
:
781 raise Exception("Namespace not found.")
783 # TODO: We want to make sure this is unique to the cluster, in case
784 # the cluster is being reused.
785 # Consider pre/appending the cluster id to the namespace string
786 return config
['namespace']
792 """Check if a model exists in the controller
794 Checks to see if a model exists in the connected Juju controller.
796 :param model_name str: The name of the model
797 :return: A boolean indicating if the model exists
799 models
= await self
.controller
.list_models()
801 if model_name
in models
:
805 def is_microk8s_by_cluster_uuid(
809 """Check if a cluster is microk8s
811 Checks if a cluster is running microk8s
813 :param cluster_uuid str: The UUID of the cluster
814 :returns: A boolean if the cluster is running microk8s
816 config
= self
.get_config(cluster_uuid
)
817 return config
['microk8s']
819 def is_microk8s_by_credentials(
823 """Check if a cluster is microk8s
825 Checks if a cluster is running microk8s
827 :param credentials dict: A dictionary containing the k8s credentials
828 :returns: A boolean if the cluster is running microk8s
830 for context
in credentials
['contexts']:
831 if 'microk8s' in context
['name']:
836 async def login(self
):
837 """Login to the Juju controller."""
839 if self
.authenticated
:
842 self
.connecting
= True
844 self
.controller
= Controller()
848 "Connecting to controller... ws://{} as {}/{}".format(
855 await self
.controller
.connect(
856 endpoint
=self
.juju_endpoint
,
857 username
=self
.juju_user
,
858 password
=self
.juju_secret
,
859 cacert
=self
.juju_ca_cert
,
861 self
.authenticated
= True
862 self
.log
.debug("JujuApi: Logged into controller")
863 except Exception as ex
:
865 self
.log
.debug("Caught exception: {}".format(ex
))
868 self
.log
.fatal("VCA credentials not configured.")
869 self
.authenticated
= False
871 async def logout(self
):
872 """Logout of the Juju controller."""
874 if not self
.authenticated
:
877 for model
in self
.models
:
878 print("Logging out of model {}".format(model
))
879 await self
.models
[model
].disconnect()
882 self
.log
.debug("Disconnecting controller {}".format(
885 await self
.controller
.disconnect()
886 self
.controller
= None
888 self
.authenticated
= False
890 async def remove_cloud(
894 """Remove a k8s cloud from Juju
896 Removes a Kubernetes cloud from Juju.
898 :param cloud_name str: The name of the cloud to remove.
900 :returns: True if successful, otherwise raises an exception.
903 # Remove the k8s cloud definition from the Juju client
904 cmd
= [self
.k8scli
, "remove-k8s", "--client", cloud_name
]
907 stdout
=subprocess
.PIPE
,
908 stderr
=subprocess
.PIPE
,
911 retcode
= p
.returncode
914 raise Exception(p
.stderr
)
916 # Remove the cloud from the local config
917 cmd
= [self
.k8scli
, "remove-cloud", "--client", cloud_name
]
920 stdout
=subprocess
.PIPE
,
921 stderr
=subprocess
.PIPE
,
924 retcode
= p
.returncode
927 raise Exception(p
.stderr
)
932 async def set_config(
937 """Save the cluster configuration
939 Saves the cluster information to the file store
941 :param cluster_uuid str: The UUID of the cluster
942 :param config dict: A dictionary containing the cluster configuration
943 :returns: Boolean upon success or raises an exception.
946 cluster_config
= "{}/{}.yaml".format(self
.fs
.path
, cluster_uuid
)
947 if not os
.path
.exists(cluster_config
):
948 print("Writing config to {}".format(cluster_config
))
949 with
open(cluster_config
, 'w') as f
:
950 f
.write(yaml
.dump(config
, Dumper
=yaml
.Dumper
))