1 # Copyright 2019 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 from .exceptions
import NotImplemented
19 # from juju.bundle import BundleHandler
20 from juju
.controller
import Controller
21 from juju
.model
import Model
22 from juju
.errors
import JujuAPIError
, JujuError
26 from n2vc
.k8s_conn
import K8sConnector
32 # from .vnf import N2VC
38 class K8sJujuConnector(K8sConnector
):
44 kubectl_command
: str = '/usr/bin/kubectl',
45 juju_command
: str = '/usr/bin/juju',
51 :param kubectl_command: path to kubectl executable
52 :param juju_command: path to juju executable
53 :param fs: file system for kubernetes and helm configuration
58 K8sConnector
.__init
__(
62 on_update_db
=on_update_db
,
66 self
.info('Initializing K8S Juju connector')
68 self
.authenticated
= False
70 self
.log
= logging
.getLogger(__name__
)
72 self
.juju_command
= juju_command
75 self
.info('K8S Juju connector initialized')
81 namespace
: str = 'kube-system',
82 reuse_cluster_uuid
: str = None,
84 """Initialize a Kubernetes environment
86 :param k8s_creds dict: A dictionary containing the Kubernetes cluster
88 :param namespace str: The Kubernetes namespace to initialize
90 :return: A tuple of (cluster UUID, True), or raises an exception
95 Bootstrapping cannot be done, by design, through the API. We need to
102 1. Has the environment already been bootstrapped?
103 - Check the database to see if we have a record for this env
105 2. If this is a new env, create it
106 - Add the k8s cloud to Juju
108 - Record it in the database
110 3. Connect to the Juju controller for this cloud
113 # cluster_uuid = reuse_cluster_uuid
114 # if not cluster_uuid:
115 # cluster_uuid = str(uuid4())
117 ##################################################
118 # TODO: Pull info from db based on the namespace #
119 ##################################################
121 if not reuse_cluster_uuid
:
122 # This is a new cluster, so bootstrap it
124 cluster_uuid
= str(uuid
.uuid4())
126 # Is a local k8s cluster?
127 localk8s
= self
.is_local_k8s(k8s_creds
)
129 # If the k8s is external, the juju controller needs a loadbalancer
130 loadbalancer
= False if localk8s
else True
132 # Name the new k8s cloud
133 k8s_cloud
= "{}-k8s".format(namespace
)
135 print("Adding k8s cloud {}".format(k8s_cloud
))
136 await self
.add_k8s(k8s_cloud
, k8s_creds
)
138 # Bootstrap Juju controller
139 print("Bootstrapping...")
140 await self
.bootstrap(k8s_cloud
, cluster_uuid
, loadbalancer
)
141 print("Bootstrap done.")
143 # Get the controller information
145 # Parse ~/.local/share/juju/controllers.yaml
146 # controllers.testing.api-endpoints|ca-cert|uuid
147 print("Getting controller endpoints")
148 with
open(os
.path
.expanduser(
149 "~/.local/share/juju/controllers.yaml"
151 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
152 controller
= controllers
['controllers'][cluster_uuid
]
153 endpoints
= controller
['api-endpoints']
154 self
.juju_endpoint
= endpoints
[0]
155 self
.juju_ca_cert
= controller
['ca-cert']
157 # Parse ~/.local/share/juju/accounts
158 # controllers.testing.user|password
159 print("Getting accounts")
160 with
open(os
.path
.expanduser(
161 "~/.local/share/juju/accounts.yaml"
163 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
164 controller
= controllers
['controllers'][cluster_uuid
]
166 self
.juju_user
= controller
['user']
167 self
.juju_secret
= controller
['password']
169 print("user: {}".format(self
.juju_user
))
170 print("secret: {}".format(self
.juju_secret
))
171 print("endpoint: {}".format(self
.juju_endpoint
))
172 print("ca-cert: {}".format(self
.juju_ca_cert
))
174 # raise Exception("EOL")
176 self
.juju_public_key
= None
179 'endpoint': self
.juju_endpoint
,
180 'username': self
.juju_user
,
181 'secret': self
.juju_secret
,
182 'cacert': self
.juju_ca_cert
,
183 'namespace': namespace
,
184 'loadbalancer': loadbalancer
,
187 # Store the cluster configuration so it
188 # can be used for subsequent calls
189 print("Setting config")
190 await self
.set_config(cluster_uuid
, config
)
193 # This is an existing cluster, so get its config
194 cluster_uuid
= reuse_cluster_uuid
196 config
= self
.get_config(cluster_uuid
)
198 self
.juju_endpoint
= config
['endpoint']
199 self
.juju_user
= config
['username']
200 self
.juju_secret
= config
['secret']
201 self
.juju_ca_cert
= config
['cacert']
202 self
.juju_public_key
= None
204 # Login to the k8s cluster
205 if not self
.authenticated
:
206 await self
.login(cluster_uuid
)
208 # We're creating a new cluster
209 print("Getting model {}".format(self
.get_namespace(cluster_uuid
), cluster_uuid
=cluster_uuid
))
210 model
= await self
.get_model(
211 self
.get_namespace(cluster_uuid
),
212 cluster_uuid
=cluster_uuid
215 # Disconnect from the model
216 if model
and model
.is_connected():
217 await model
.disconnect()
219 return cluster_uuid
, True
221 """Repo Management"""
228 raise NotImplemented()
async def repo_list(self):
    """Repo-management placeholder; always raises.

    No work is performed: the coroutine unconditionally raises the
    ``NotImplemented`` exception that this module imports from
    ``.exceptions`` (that import shadows the built-in constant of the
    same name).

    :raises NotImplemented: always
    """
    raise NotImplemented()
233 async def repo_remove(
237 raise NotImplemented()
244 uninstall_sw
: bool = False
248 Resets the Kubernetes cluster by removing the model that represents it.
250 :param cluster_uuid str: The UUID of the cluster to reset
251 :return: Returns True if successful or raises an exception.
255 if not self
.authenticated
:
256 await self
.login(cluster_uuid
)
258 if self
.controller
.is_connected():
260 namespace
= self
.get_namespace(cluster_uuid
)
261 if await self
.has_model(namespace
):
262 print("[reset] Destroying model")
263 await self
.controller
.destroy_model(
268 # Disconnect from the controller
269 print("[reset] Disconnecting controller")
270 await self
.controller
.disconnect()
272 # Destroy the controller (via CLI)
273 print("[reset] Destroying controller")
274 await self
.destroy_controller(cluster_uuid
)
276 print("[reset] Removing k8s cloud")
277 namespace
= self
.get_namespace(cluster_uuid
)
278 k8s_cloud
= "{}-k8s".format(namespace
)
279 await self
.remove_cloud(k8s_cloud
)
281 except Exception as ex
:
282 print("Caught exception during reset: {}".format(ex
))
291 timeout
: float = 300,
297 :param cluster_uuid str: The UUID of the cluster to install to
298 :param kdu_model str: The name or path of a bundle to install
299 :param atomic bool: If set, waits until the model is active and resets
300 the cluster on failure.
301 :param timeout int: The time, in seconds, to wait for the install
303 :param params dict: Key-value pairs of instantiation parameters
305 :return: If successful, returns ?
308 if not self
.authenticated
:
309 print("[install] Logging in to the controller")
310 await self
.login(cluster_uuid
)
313 # Get or create the model, based on the NS
315 model_name
= db_dict
["filter"]["_id"]
317 self
.log
.debug("Checking for model named {}".format(model_name
))
318 model
= await self
.get_model(model_name
, cluster_uuid
=cluster_uuid
)
320 # Create the new model
321 self
.log
.debug("Adding model: {}".format(model_name
))
322 model
= await self
.add_model(model_name
, cluster_uuid
=cluster_uuid
)
325 # TODO: Instantiation parameters
328 "Juju bundle that models the KDU, in any of the following ways:
329 - <juju-repo>/<juju-bundle>
330 - <juju-bundle folder under k8s_models folder in the package>
331 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder in the package>
332 - <URL_where_to_fetch_juju_bundle>
336 if kdu_model
.startswith("cs:"):
338 elif kdu_model
.startswith("http"):
344 # if kdu_model.endswith(".tar.gz") or kdu_model.endswith(".tgz")
345 # Uncompress temporarily
346 # bundle = <uncompressed file>
350 # Raise named exception that the bundle could not be found
353 print("[install] deploying {}".format(bundle
))
354 await model
.deploy(bundle
)
356 # Get the application
358 # applications = model.applications
359 print("[install] Applications: {}".format(model
.applications
))
360 for name
in model
.applications
:
361 print("[install] Waiting for {} to settle".format(name
))
362 application
= model
.applications
[name
]
364 # It's not enough to wait for all units to be active;
365 # the application status needs to be active as well.
366 print("Waiting for all units to be active...")
367 await model
.block_until(
369 unit
.agent_status
== 'idle'
370 and application
.status
in ['active', 'unknown']
371 and unit
.workload_status
in [
373 ] for unit
in application
.units
377 print("All units active.")
379 except concurrent
.futures
._base
.TimeoutError
:
380 print("[install] Timeout exceeded; resetting cluster")
381 await self
.reset(cluster_uuid
)
384 # Wait for the application to be active
385 if model
.is_connected():
386 print("[install] Disconnecting model")
387 await model
.disconnect()
390 raise Exception("Unable to install")
392 async def instances_list(
397 returns a list of deployed releases in a cluster
399 :param cluster_uuid: the cluster
408 kdu_model
: str = None,
413 :param cluster_uuid str: The UUID of the cluster to upgrade
414 :param kdu_instance str: The unique name of the KDU instance
415 :param kdu_model str: The name or path of the bundle to upgrade to
416 :param params dict: Key-value pairs of instantiation parameters
418 :return: If successful, reference to the new revision number of the
422 # TODO: Loop through the bundle and upgrade each charm individually
425 The API doesn't have a concept of bundle upgrades, because there are
426 many possible changes: charm revision, disk, number of units, etc.
428 As such, we are only supporting a limited subset of upgrades. We'll
429 upgrade the charm revision but leave storage and scale untouched.
431 Scale changes should happen through OSM constructs, and changes to
432 storage would require a redeployment of the service, at least in this
435 namespace
= self
.get_namespace(cluster_uuid
)
436 model
= await self
.get_model(namespace
, cluster_uuid
=cluster_uuid
)
438 with
open(kdu_model
, 'r') as f
:
439 bundle
= yaml
.safe_load(f
)
443 'description': 'Test bundle',
444 'bundle': 'kubernetes',
447 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
450 'password': 'manopw',
451 'root_password': 'osm4u',
454 'series': 'kubernetes'
459 # TODO: This should be returned in an agreed-upon format
460 for name
in bundle
['applications']:
461 print(model
.applications
)
462 application
= model
.applications
[name
]
465 path
= bundle
['applications'][name
]['charm']
468 await application
.upgrade_charm(switch
=path
)
469 except juju
.errors
.JujuError
as ex
:
470 if 'already running charm' in str(ex
):
471 # We're already running this version
474 await model
.disconnect()
477 raise NotImplemented()
488 :param cluster_uuid str: The UUID of the cluster to rollback
489 :param kdu_instance str: The unique name of the KDU instance
490 :param revision int: The revision to revert to. If omitted, rolls back
491 the previous upgrade.
493 :return: If successful, returns the revision of active KDU instance,
494 or raises an exception
496 raise NotImplemented()
504 """Uninstall a KDU instance
506 :param cluster_uuid str: The UUID of the cluster to uninstall
507 :param kdu_instance str: The unique name of the KDU instance
509 :return: Returns True if successful, or raises an exception
513 # Remove an application from the model
514 model
= await self
.get_model(self
.get_namespace(cluster_uuid
), cluster_uuid
=cluster_uuid
)
517 # Get the application
518 if kdu_instance
not in model
.applications
:
519 # TODO: Raise a named exception
520 raise Exception("Application not found.")
522 application
= model
.applications
[kdu_instance
]
524 # Destroy the application
525 await application
.destroy()
527 # TODO: Verify removal
533 async def inspect_kdu(
539 Inspects a bundle and returns a dictionary of config parameters and
540 their default values.
542 :param kdu_model str: The name or path of the bundle to inspect.
544 :return: If successful, returns a dictionary of available parameters
545 and their default values.
549 with
open(kdu_model
, 'r') as f
:
550 bundle
= yaml
.safe_load(f
)
554 'description': 'Test bundle',
555 'bundle': 'kubernetes',
558 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
561 'password': 'manopw',
562 'root_password': 'osm4u',
565 'series': 'kubernetes'
570 # TODO: This should be returned in an agreed-upon format
571 kdu
= bundle
['applications']
581 If available, returns the README of the bundle.
583 :param kdu_model str: The name or path of a bundle
585 :return: If found, returns the contents of the README.
589 files
= ['README', 'README.txt', 'README.md']
590 path
= os
.path
.dirname(kdu_model
)
591 for file in os
.listdir(path
):
593 with
open(file, 'r') as f
:
599 async def status_kdu(
604 """Get the status of the KDU
606 Get the current status of the KDU instance.
608 :param cluster_uuid str: The UUID of the cluster
609 :param kdu_instance str: The unique id of the KDU instance
611 :return: Returns a dictionary containing namespace, state, resources,
616 model
= await self
.get_model(self
.get_namespace(cluster_uuid
), cluster_uuid
=cluster_uuid
)
618 # model = await self.get_model_by_uuid(cluster_uuid)
620 model_status
= await model
.get_status()
621 status
= model_status
.applications
623 for name
in model_status
.applications
:
624 application
= model_status
.applications
[name
]
626 'status': application
['status']['status']
629 if model
.is_connected():
630 await model
.disconnect()
640 """Add a k8s cloud to Juju
642 Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
645 :param cloud_name str: The name of the cloud to add.
646 :param credentials dict: A dictionary representing the output of
647 `kubectl config view --raw`.
649 :returns: True if successful, otherwise raises an exception.
652 cmd
= [self
.juju_command
, "add-k8s", "--local", cloud_name
]
656 stdout
=subprocess
.PIPE
,
657 stderr
=subprocess
.PIPE
,
658 # input=yaml.dump(credentials, Dumper=yaml.Dumper).encode("utf-8"),
659 input=credentials
.encode("utf-8"),
662 retcode
= p
.returncode
663 print("add-k8s return code: {}".format(retcode
))
666 raise Exception(p
.stderr
)
673 ) -> juju
.model
.Model
:
674 """Adds a model to the controller
676 Adds a new model to the Juju controller
678 :param model_name str: The name of the model to add.
679 :returns: The juju.model.Model object of the new model upon success or
682 if not self
.authenticated
:
683 await self
.login(cluster_uuid
)
685 self
.log
.debug("Adding model '{}' to cluster_uuid '{}'".format(model_name
, cluster_uuid
))
686 model
= await self
.controller
.add_model(
688 config
={'authorized-keys': self
.juju_public_key
}
698 """Bootstrap a Kubernetes controller
700 Bootstrap a Juju controller inside the Kubernetes cluster
702 :param cloud_name str: The name of the cloud.
703 :param cluster_uuid str: The UUID of the cluster to bootstrap.
704 :param loadbalancer bool: If the controller should use loadbalancer or not.
705 :returns: True upon success or raises an exception.
709 cmd
= [self
.juju_command
, "bootstrap", cloud_name
, cluster_uuid
]
712 For public clusters, specify that the controller service is using a LoadBalancer.
714 cmd
= [self
.juju_command
, "bootstrap", cloud_name
, cluster_uuid
, "--config", "controller-service-type=loadbalancer"]
716 print("Bootstrapping controller {} in cloud {}".format(
717 cluster_uuid
, cloud_name
722 stdout
=subprocess
.PIPE
,
723 stderr
=subprocess
.PIPE
,
726 retcode
= p
.returncode
730 if b
'already exists' not in p
.stderr
:
731 raise Exception(p
.stderr
)
735 async def destroy_controller(
739 """Destroy a Kubernetes controller
741 Destroy an existing Kubernetes controller.
743 :param cluster_uuid str: The UUID of the cluster whose controller is destroyed.
744 :returns: True upon success or raises an exception.
748 "destroy-controller",
749 "--destroy-all-models",
757 stdout
=subprocess
.PIPE
,
758 stderr
=subprocess
.PIPE
,
761 retcode
= p
.returncode
765 if 'already exists' not in p
.stderr
:
766 raise Exception(p
.stderr
)
772 """Get the cluster configuration
774 Gets the configuration of the cluster
776 :param cluster_uuid str: The UUID of the cluster.
777 :return: A dict upon success, or raises an exception.
779 cluster_config
= "{}/{}.yaml".format(self
.fs
.path
, cluster_uuid
)
780 if os
.path
.exists(cluster_config
):
781 with
open(cluster_config
, 'r') as f
:
782 config
= yaml
.safe_load(f
.read())
786 "Unable to locate configuration for cluster {}".format(
795 ) -> juju
.model
.Model
:
796 """Get a model from the Juju Controller.
798 Note: Model objects returned must call disconnect() before it goes
801 :param model_name str: The name of the model to get
802 :return: The juju.model.Model object if found, or None.
804 if not self
.authenticated
:
805 await self
.login(cluster_uuid
)
808 models
= await self
.controller
.list_models()
809 self
.log
.debug(models
)
810 if model_name
in models
:
811 self
.log
.debug("Found model: {}".format(model_name
))
812 model
= await self
.controller
.get_model(
821 """Get the namespace UUID
822 Gets the namespace's unique name
824 :param cluster_uuid str: The UUID of the cluster
825 :returns: The namespace UUID, or raises an exception
827 config
= self
.get_config(cluster_uuid
)
829 # Make sure the name is in the config
830 if 'namespace' not in config
:
831 raise Exception("Namespace not found.")
833 # TODO: We want to make sure this is unique to the cluster, in case
834 # the cluster is being reused.
835 # Consider pre/appending the cluster id to the namespace string
836 return config
['namespace']
842 """Check if a model exists in the controller
844 Checks to see if a model exists in the connected Juju controller.
846 :param model_name str: The name of the model
847 :return: A boolean indicating if the model exists
849 models
= await self
.controller
.list_models()
851 if model_name
in models
:
859 """Check if a cluster is local
861 Checks if a cluster is running in the local host
863 :param credentials dict: A dictionary containing the k8s credentials
864 :returns: A boolean if the cluster is running locally
866 creds
= yaml
.safe_load(credentials
)
867 if os
.getenv("OSMLCM_VCA_APIPROXY"):
868 host_ip
= os
.getenv("OSMLCM_VCA_APIPROXY")
870 if creds
and host_ip
:
871 for cluster
in creds
['clusters']:
872 if 'server' in cluster
['cluster']:
873 if host_ip
in cluster
['cluster']['server']:
878 async def login(self
, cluster_uuid
):
879 """Login to the Juju controller."""
881 if self
.authenticated
:
884 self
.connecting
= True
886 # Test: Make sure we have the credentials loaded
887 config
= self
.get_config(cluster_uuid
)
889 self
.juju_endpoint
= config
['endpoint']
890 self
.juju_user
= config
['username']
891 self
.juju_secret
= config
['secret']
892 self
.juju_ca_cert
= config
['cacert']
893 self
.juju_public_key
= None
895 self
.controller
= Controller()
899 "Connecting to controller... ws://{} as {}/{}".format(
906 await self
.controller
.connect(
907 endpoint
=self
.juju_endpoint
,
908 username
=self
.juju_user
,
909 password
=self
.juju_secret
,
910 cacert
=self
.juju_ca_cert
,
912 self
.authenticated
= True
913 self
.log
.debug("JujuApi: Logged into controller")
914 except Exception as ex
:
916 self
.log
.debug("Caught exception: {}".format(ex
))
919 self
.log
.fatal("VCA credentials not configured.")
920 self
.authenticated
= False
922 async def logout(self
):
923 """Logout of the Juju controller."""
925 if not self
.authenticated
:
928 for model
in self
.models
:
929 print("Logging out of model {}".format(model
))
930 await self
.models
[model
].disconnect()
933 self
.log
.debug("Disconnecting controller {}".format(
936 await self
.controller
.disconnect()
937 self
.controller
= None
939 self
.authenticated
= False
941 async def remove_cloud(
945 """Remove a k8s cloud from Juju
947 Removes a Kubernetes cloud from Juju.
949 :param cloud_name str: The name of the cloud to remove.
951 :returns: True if successful, otherwise raises an exception.
954 # Remove the k8s cloud from the Juju client
955 cmd
= [self
.juju_command
, "remove-k8s", "--client", cloud_name
]
958 stdout
=subprocess
.PIPE
,
959 stderr
=subprocess
.PIPE
,
962 retcode
= p
.returncode
965 raise Exception(p
.stderr
)
967 # Remove the cloud from the local config
968 cmd
= [self
.juju_command
, "remove-cloud", "--client", cloud_name
]
971 stdout
=subprocess
.PIPE
,
972 stderr
=subprocess
.PIPE
,
975 retcode
= p
.returncode
978 raise Exception(p
.stderr
)
982 async def set_config(
987 """Save the cluster configuration
989 Saves the cluster information to the file store
991 :param cluster_uuid str: The UUID of the cluster
992 :param config dict: A dictionary containing the cluster configuration
993 :returns: Boolean upon success or raises an exception.
996 cluster_config
= "{}/{}.yaml".format(self
.fs
.path
, cluster_uuid
)
997 if not os
.path
.exists(cluster_config
):
998 print("Writing config to {}".format(cluster_config
))
999 with
open(cluster_config
, 'w') as f
:
1000 f
.write(yaml
.dump(config
, Dumper
=yaml
.Dumper
))