1 # Copyright 2019 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 from .exceptions
import NotImplemented
19 # from juju.bundle import BundleHandler
20 from juju
.controller
import Controller
21 from juju
.model
import Model
22 from juju
.errors
import JujuAPIError
, JujuError
26 from n2vc
.k8s_conn
import K8sConnector
32 # from .vnf import N2VC
38 class K8sJujuConnector(K8sConnector
):
44 kubectl_command
: str = '/usr/bin/kubectl',
45 juju_command
: str = '/usr/bin/juju',
51 :param kubectl_command: path to kubectl executable
52 :param helm_command: path to helm executable
53 :param fs: file system for kubernetes and helm configuration
58 K8sConnector
.__init
__(
62 on_update_db
=on_update_db
,
66 self
.info('Initializing K8S Juju connector')
68 self
.authenticated
= False
70 self
.log
= logging
.getLogger(__name__
)
74 self
.info('K8S Juju connector initialized')
80 namespace
: str = 'kube-system',
81 reuse_cluster_uuid
: str = None,
83 """Initialize a Kubernetes environment
85 :param k8s_creds dict: A dictionary containing the Kubernetes cluster
87 :param namespace str: The Kubernetes namespace to initialize
89 :return: UUID of the k8s context or raises an exception
94 Bootstrapping cannot be done, by design, through the API. We need to
97 # TODO: The path may change
98 jujudir
= "/usr/local/bin"
100 self
.k8scli
= "{}/juju".format(jujudir
)
105 1. Has the environment already been bootstrapped?
106 - Check the database to see if we have a record for this env
108 2. If this is a new env, create it
109 - Add the k8s cloud to Juju
111 - Record it in the database
113 3. Connect to the Juju controller for this cloud
116 # cluster_uuid = reuse_cluster_uuid
117 # if not cluster_uuid:
118 # cluster_uuid = str(uuid4())
120 ##################################################
121 # TODO: Pull info from db based on the namespace #
122 ##################################################
124 if not reuse_cluster_uuid
:
125 # This is a new cluster, so bootstrap it
127 cluster_uuid
= str(uuid
.uuid4())
129 # Add k8s cloud to Juju (unless it's microk8s)
132 # k8s_creds = yaml.safe_load(k8s_creds)
134 # Does the kubeconfig contain microk8s?
135 microk8s
= self
.is_microk8s_by_credentials(k8s_creds
)
138 # Name the new k8s cloud
139 k8s_cloud
= "{}-k8s".format(namespace
)
141 print("Adding k8s cloud {}".format(k8s_cloud
))
142 await self
.add_k8s(k8s_cloud
, k8s_creds
)
144 # Bootstrap Juju controller
145 print("Bootstrapping...")
146 await self
.bootstrap(k8s_cloud
, cluster_uuid
)
147 print("Bootstrap done.")
149 # k8s_cloud = 'microk8s-test'
150 k8s_cloud
= "{}-k8s".format(namespace
)
152 await self
.add_k8s(k8s_cloud
, k8s_creds
)
154 await self
.bootstrap(k8s_cloud
, cluster_uuid
)
156 # Get the controller information
158 # Parse ~/.local/share/juju/controllers.yaml
159 # controllers.testing.api-endpoints|ca-cert|uuid
160 print("Getting controller endpoints")
161 with
open(os
.path
.expanduser(
162 "~/.local/share/juju/controllers.yaml"
164 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
165 controller
= controllers
['controllers'][cluster_uuid
]
166 endpoints
= controller
['api-endpoints']
167 self
.juju_endpoint
= endpoints
[0]
168 self
.juju_ca_cert
= controller
['ca-cert']
170 # Parse ~/.local/share/juju/accounts
171 # controllers.testing.user|password
172 print("Getting accounts")
173 with
open(os
.path
.expanduser(
174 "~/.local/share/juju/accounts.yaml"
176 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
177 controller
= controllers
['controllers'][cluster_uuid
]
179 self
.juju_user
= controller
['user']
180 self
.juju_secret
= controller
['password']
182 print("user: {}".format(self
.juju_user
))
183 print("secret: {}".format(self
.juju_secret
))
184 print("endpoint: {}".format(self
.juju_endpoint
))
185 print("ca-cert: {}".format(self
.juju_ca_cert
))
187 # raise Exception("EOL")
189 self
.juju_public_key
= None
192 'endpoint': self
.juju_endpoint
,
193 'username': self
.juju_user
,
194 'secret': self
.juju_secret
,
195 'cacert': self
.juju_ca_cert
,
196 'namespace': namespace
,
197 'microk8s': microk8s
,
200 # Store the cluster configuration so it
201 # can be used for subsequent calls
202 print("Setting config")
203 await self
.set_config(cluster_uuid
, config
)
206 # This is an existing cluster, so get its config
207 cluster_uuid
= reuse_cluster_uuid
209 config
= self
.get_config(cluster_uuid
)
211 self
.juju_endpoint
= config
['endpoint']
212 self
.juju_user
= config
['username']
213 self
.juju_secret
= config
['secret']
214 self
.juju_ca_cert
= config
['cacert']
215 self
.juju_public_key
= None
217 # Login to the k8s cluster
218 if not self
.authenticated
:
219 await self
.login(cluster_uuid
)
221 # We're creating a new cluster
222 print("Getting model {}".format(self
.get_namespace(cluster_uuid
), cluster_uuid
=cluster_uuid
))
223 model
= await self
.get_model(
224 self
.get_namespace(cluster_uuid
),
225 cluster_uuid
=cluster_uuid
228 # Disconnect from the model
229 if model
and model
.is_connected():
230 await model
.disconnect()
232 return cluster_uuid
, True
234 """Repo Management"""
241 raise NotImplemented()
243 async def repo_list(self
):
244 raise NotImplemented()
246 async def repo_remove(
250 raise NotImplemented()
257 uninstall_sw
: bool = False
261 Resets the Kubernetes cluster by removing the model that represents it.
263 :param cluster_uuid str: The UUID of the cluster to reset
264 :return: Returns True if successful or raises an exception.
268 if not self
.authenticated
:
269 await self
.login(cluster_uuid
)
271 if self
.controller
.is_connected():
273 namespace
= self
.get_namespace(cluster_uuid
)
274 if await self
.has_model(namespace
):
275 print("[reset] Destroying model")
276 await self
.controller
.destroy_model(
281 # Disconnect from the controller
282 print("[reset] Disconnecting controller")
283 await self
.controller
.disconnect()
285 # Destroy the controller (via CLI)
286 print("[reset] Destroying controller")
287 await self
.destroy_controller(cluster_uuid
)
289 """Remove the k8s cloud
291 Only remove the k8s cloud if it's not a microk8s cloud,
292 since microk8s is a built-in cloud type.
294 # microk8s = self.is_microk8s_by_cluster_uuid(cluster_uuid)
296 print("[reset] Removing k8s cloud")
297 namespace
= self
.get_namespace(cluster_uuid
)
298 k8s_cloud
= "{}-k8s".format(namespace
)
299 await self
.remove_cloud(k8s_cloud
)
301 except Exception as ex
:
302 print("Caught exception during reset: {}".format(ex
))
311 timeout
: float = 300,
317 :param cluster_uuid str: The UUID of the cluster to install to
318 :param kdu_model str: The name or path of a bundle to install
319 :param atomic bool: If set, waits until the model is active and resets
320 the cluster on failure.
321 :param timeout int: The time, in seconds, to wait for the install
323 :param params dict: Key-value pairs of instantiation parameters
325 :return: If successful, returns ?
328 if not self
.authenticated
:
329 print("[install] Logging in to the controller")
330 await self
.login(cluster_uuid
)
333 # Get or create the model, based on the namespace the cluster was
335 namespace
= self
.get_namespace(cluster_uuid
)
336 namespace
= "gitlab-demo"
337 self
.log
.debug("Checking for model named {}".format(namespace
))
338 model
= await self
.get_model(namespace
, cluster_uuid
=cluster_uuid
)
340 # Create the new model
341 self
.log
.debug("Adding model: {}".format(namespace
))
342 model
= await self
.add_model(namespace
, cluster_uuid
=cluster_uuid
)
345 # TODO: Instantiation parameters
348 "Juju bundle that models the KDU, in any of the following ways:
349 - <juju-repo>/<juju-bundle>
350 - <juju-bundle folder under k8s_models folder in the package>
351 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder in the package>
352 - <URL_where_to_fetch_juju_bundle>
356 if kdu_model
.startswith("cs:"):
358 elif kdu_model
.startswith("http"):
364 # if kdu_model.endswith(".tar.gz") or kdu_model.endswith(".tgz")
365 # Uncompress temporarily
366 # bundle = <uncompressed file>
370 # Raise named exception that the bundle could not be found
373 print("[install] deploying {}".format(bundle
))
374 await model
.deploy(bundle
)
376 # Get the application
378 # applications = model.applications
379 print("[install] Applications: {}".format(model
.applications
))
380 for name
in model
.applications
:
381 print("[install] Waiting for {} to settle".format(name
))
382 application
= model
.applications
[name
]
384 # It's not enough to wait for all units to be active;
385 # the application status needs to be active as well.
386 print("Waiting for all units to be active...")
387 await model
.block_until(
389 unit
.agent_status
== 'idle'
390 and application
.status
in ['active', 'unknown']
391 and unit
.workload_status
in [
393 ] for unit
in application
.units
397 print("All units active.")
399 except concurrent
.futures
._base
.TimeoutError
:
400 print("[install] Timeout exceeded; resetting cluster")
401 await self
.reset(cluster_uuid
)
404 # Wait for the application to be active
405 if model
.is_connected():
406 print("[install] Disconnecting model")
407 await model
.disconnect()
410 raise Exception("Unable to install")
412 async def instances_list(
417 returns a list of deployed releases in a cluster
419 :param cluster_uuid: the cluster
428 kdu_model
: str = None,
433 :param cluster_uuid str: The UUID of the cluster to upgrade
434 :param kdu_instance str: The unique name of the KDU instance
435 :param kdu_model str: The name or path of the bundle to upgrade to
436 :param params dict: Key-value pairs of instantiation parameters
438 :return: If successful, reference to the new revision number of the
442 # TODO: Loop through the bundle and upgrade each charm individually
445 The API doesn't have a concept of bundle upgrades, because there are
446 many possible changes: charm revision, disk, number of units, etc.
448 As such, we are only supporting a limited subset of upgrades. We'll
449 upgrade the charm revision but leave storage and scale untouched.
451 Scale changes should happen through OSM constructs, and changes to
452 storage would require a redeployment of the service, at least in this
455 namespace
= self
.get_namespace(cluster_uuid
)
456 model
= await self
.get_model(namespace
, cluster_uuid
=cluster_uuid
)
458 with
open(kdu_model
, 'r') as f
:
459 bundle
= yaml
.safe_load(f
)
463 'description': 'Test bundle',
464 'bundle': 'kubernetes',
467 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
470 'password': 'manopw',
471 'root_password': 'osm4u',
474 'series': 'kubernetes'
479 # TODO: This should be returned in an agreed-upon format
480 for name
in bundle
['applications']:
481 print(model
.applications
)
482 application
= model
.applications
[name
]
485 path
= bundle
['applications'][name
]['charm']
488 await application
.upgrade_charm(switch
=path
)
489 except juju
.errors
.JujuError
as ex
:
490 if 'already running charm' in str(ex
):
491 # We're already running this version
494 await model
.disconnect()
497 raise NotImplemented()
508 :param cluster_uuid str: The UUID of the cluster to rollback
509 :param kdu_instance str: The unique name of the KDU instance
510 :param revision int: The revision to revert to. If omitted, rolls back
511 the previous upgrade.
513 :return: If successful, returns the revision of active KDU instance,
514 or raises an exception
516 raise NotImplemented()
524 """Uninstall a KDU instance
526 :param cluster_uuid str: The UUID of the cluster to uninstall
527 :param kdu_instance str: The unique name of the KDU instance
529 :return: Returns True if successful, or raises an exception
533 # Remove an application from the model
534 model
= await self
.get_model(self
.get_namespace(cluster_uuid
), cluster_uuid
=cluster_uuid
)
537 # Get the application
538 if kdu_instance
not in model
.applications
:
539 # TODO: Raise a named exception
540 raise Exception("Application not found.")
542 application
= model
.applications
[kdu_instance
]
544 # Destroy the application
545 await application
.destroy()
547 # TODO: Verify removal
553 async def inspect_kdu(
559 Inspects a bundle and returns a dictionary of config parameters and
560 their default values.
562 :param kdu_model str: The name or path of the bundle to inspect.
564 :return: If successful, returns a dictionary of available parameters
565 and their default values.
569 with
open(kdu_model
, 'r') as f
:
570 bundle
= yaml
.safe_load(f
)
574 'description': 'Test bundle',
575 'bundle': 'kubernetes',
578 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
581 'password': 'manopw',
582 'root_password': 'osm4u',
585 'series': 'kubernetes'
590 # TODO: This should be returned in an agreed-upon format
591 kdu
= bundle
['applications']
601 If available, returns the README of the bundle.
603 :param kdu_model str: The name or path of a bundle
605 :return: If found, returns the contents of the README.
609 files
= ['README', 'README.txt', 'README.md']
610 path
= os
.path
.dirname(kdu_model
)
611 for file in os
.listdir(path
):
613 with
open(file, 'r') as f
:
619 async def status_kdu(
624 """Get the status of the KDU
626 Get the current status of the KDU instance.
628 :param cluster_uuid str: The UUID of the cluster
629 :param kdu_instance str: The unique id of the KDU instance
631 :return: Returns a dictionary containing namespace, state, resources,
636 model
= await self
.get_model(self
.get_namespace(cluster_uuid
), cluster_uuid
=cluster_uuid
)
638 # model = await self.get_model_by_uuid(cluster_uuid)
640 model_status
= await model
.get_status()
641 status
= model_status
.applications
643 for name
in model_status
.applications
:
644 application
= model_status
.applications
[name
]
646 'status': application
['status']['status']
649 if model
.is_connected():
650 await model
.disconnect()
660 """Add a k8s cloud to Juju
662 Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
665 :param cloud_name str: The name of the cloud to add.
666 :param credentials dict: A dictionary representing the output of
667 `kubectl config view --raw`.
669 :returns: True if successful, otherwise raises an exception.
671 cmd
= [self
.k8scli
, "add-k8s", "--local", cloud_name
]
675 stdout
=subprocess
.PIPE
,
676 stderr
=subprocess
.PIPE
,
677 # input=yaml.dump(credentials, Dumper=yaml.Dumper).encode("utf-8"),
678 input=credentials
.encode("utf-8"),
681 retcode
= p
.returncode
682 print("add-k8s return code: {}".format(retcode
))
685 raise Exception(p
.stderr
)
692 ) -> juju
.model
.Model
:
693 """Adds a model to the controller
695 Adds a new model to the Juju controller
697 :param model_name str: The name of the model to add.
698 :returns: The juju.model.Model object of the new model upon success or
701 if not self
.authenticated
:
702 await self
.login(cluster_uuid
)
704 self
.log
.debug("Adding model '{}' to cluster_uuid '{}'".format(model_name
, cluster_uuid
))
705 model
= await self
.controller
.add_model(
707 config
={'authorized-keys': self
.juju_public_key
}
716 """Bootstrap a Kubernetes controller
718 Bootstrap a Juju controller inside the Kubernetes cluster
720 :param cloud_name str: The name of the cloud.
721 :param cluster_uuid str: The UUID of the cluster to bootstrap.
722 :returns: True upon success or raises an exception.
724 cmd
= [self
.k8scli
, "bootstrap", cloud_name
, cluster_uuid
]
725 print("Bootstrapping controller {} in cloud {}".format(
726 cluster_uuid
, cloud_name
731 stdout
=subprocess
.PIPE
,
732 stderr
=subprocess
.PIPE
,
735 retcode
= p
.returncode
739 if b
'already exists' not in p
.stderr
:
740 raise Exception(p
.stderr
)
744 async def destroy_controller(
748 """Destroy a Kubernetes controller
750 Destroy an existing Kubernetes controller.
752 :param cluster_uuid str: The UUID of the cluster to bootstrap.
753 :returns: True upon success or raises an exception.
757 "destroy-controller",
758 "--destroy-all-models",
766 stdout
=subprocess
.PIPE
,
767 stderr
=subprocess
.PIPE
,
770 retcode
= p
.returncode
774 if 'already exists' not in p
.stderr
:
775 raise Exception(p
.stderr
)
781 """Get the cluster configuration
783 Gets the configuration of the cluster
785 :param cluster_uuid str: The UUID of the cluster.
786 :return: A dict upon success, or raises an exception.
788 cluster_config
= "{}/{}.yaml".format(self
.fs
.path
, cluster_uuid
)
789 if os
.path
.exists(cluster_config
):
790 with
open(cluster_config
, 'r') as f
:
791 config
= yaml
.safe_load(f
.read())
795 "Unable to locate configuration for cluster {}".format(
804 ) -> juju
.model
.Model
:
805 """Get a model from the Juju Controller.
807 Note: Model objects returned must call disconnected() before it goes
810 :param model_name str: The name of the model to get
811 :return The juju.model.Model object if found, or None.
813 if not self
.authenticated
:
814 await self
.login(cluster_uuid
)
817 models
= await self
.controller
.list_models()
818 self
.log
.debug(models
)
819 if model_name
in models
:
820 self
.log
.debug("Found model: {}".format(model_name
))
821 model
= await self
.controller
.get_model(
830 """Get the namespace UUID
831 Gets the namespace's unique name
833 :param cluster_uuid str: The UUID of the cluster
834 :returns: The namespace UUID, or raises an exception
836 config
= self
.get_config(cluster_uuid
)
838 # Make sure the name is in the config
839 if 'namespace' not in config
:
840 raise Exception("Namespace not found.")
842 # TODO: We want to make sure this is unique to the cluster, in case
843 # the cluster is being reused.
844 # Consider pre/appending the cluster id to the namespace string
845 return config
['namespace']
851 """Check if a model exists in the controller
853 Checks to see if a model exists in the connected Juju controller.
855 :param model_name str: The name of the model
856 :return: A boolean indicating if the model exists
858 models
= await self
.controller
.list_models()
860 if model_name
in models
:
864 def is_microk8s_by_cluster_uuid(
868 """Check if a cluster is micro8s
870 Checks if a cluster is running microk8s
872 :param cluster_uuid str: The UUID of the cluster
873 :returns: A boolean if the cluster is running microk8s
875 config
= self
.get_config(cluster_uuid
)
876 return config
['microk8s']
878 def is_microk8s_by_credentials(
882 """Check if a cluster is micro8s
884 Checks if a cluster is running microk8s
886 :param credentials dict: A dictionary containing the k8s credentials
887 :returns: A boolean if the cluster is running microk8s
889 creds
= yaml
.safe_load(credentials
)
891 for context
in creds
['contexts']:
892 if 'microk8s' in context
['name']:
897 async def login(self
, cluster_uuid
):
898 """Login to the Juju controller."""
900 if self
.authenticated
:
903 self
.connecting
= True
905 # Test: Make sure we have the credentials loaded
906 config
= self
.get_config(cluster_uuid
)
908 self
.juju_endpoint
= config
['endpoint']
909 self
.juju_user
= config
['username']
910 self
.juju_secret
= config
['secret']
911 self
.juju_ca_cert
= config
['cacert']
912 self
.juju_public_key
= None
914 self
.controller
= Controller()
918 "Connecting to controller... ws://{} as {}/{}".format(
925 await self
.controller
.connect(
926 endpoint
=self
.juju_endpoint
,
927 username
=self
.juju_user
,
928 password
=self
.juju_secret
,
929 cacert
=self
.juju_ca_cert
,
931 self
.authenticated
= True
932 self
.log
.debug("JujuApi: Logged into controller")
933 except Exception as ex
:
935 self
.log
.debug("Caught exception: {}".format(ex
))
938 self
.log
.fatal("VCA credentials not configured.")
939 self
.authenticated
= False
941 async def logout(self
):
942 """Logout of the Juju controller."""
944 if not self
.authenticated
:
947 for model
in self
.models
:
948 print("Logging out of model {}".format(model
))
949 await self
.models
[model
].disconnect()
952 self
.log
.debug("Disconnecting controller {}".format(
955 await self
.controller
.disconnect()
956 self
.controller
= None
958 self
.authenticated
= False
960 async def remove_cloud(
964 """Remove a k8s cloud from Juju
966 Removes a Kubernetes cloud from Juju.
968 :param cloud_name str: The name of the cloud to add.
970 :returns: True if successful, otherwise raises an exception.
973 # Remove the bootstrapped controller
974 cmd
= [self
.k8scli
, "remove-k8s", "--client", cloud_name
]
977 stdout
=subprocess
.PIPE
,
978 stderr
=subprocess
.PIPE
,
981 retcode
= p
.returncode
984 raise Exception(p
.stderr
)
986 # Remove the cloud from the local config
987 cmd
= [self
.k8scli
, "remove-cloud", "--client", cloud_name
]
990 stdout
=subprocess
.PIPE
,
991 stderr
=subprocess
.PIPE
,
994 retcode
= p
.returncode
997 raise Exception(p
.stderr
)
1002 async def set_config(
1007 """Save the cluster configuration
1009 Saves the cluster information to the file store
1011 :param cluster_uuid str: The UUID of the cluster
1012 :param config dict: A dictionary containing the cluster configuration
1013 :returns: Boolean upon success or raises an exception.
1016 cluster_config
= "{}/{}.yaml".format(self
.fs
.path
, cluster_uuid
)
1017 if not os
.path
.exists(cluster_config
):
1018 print("Writing config to {}".format(cluster_config
))
1019 with
open(cluster_config
, 'w') as f
:
1020 f
.write(yaml
.dump(config
, Dumper
=yaml
.Dumper
))