147b5993bb72b058cc47c8af1ec531a6fe92efbf
1 # Copyright 2019 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from .exceptions
import NotImplemented
21 # from juju.bundle import BundleHandler
22 from juju
.controller
import Controller
23 from juju
.model
import Model
24 from juju
.errors
import JujuAPIError
, JujuError
28 from n2vc
.k8s_conn
import K8sConnector
33 # from .vnf import N2VC
39 class K8sJujuConnector(K8sConnector
):
45 kubectl_command
: str = '/usr/bin/kubectl',
46 juju_command
: str = '/usr/bin/juju',
52 :param kubectl_command: path to kubectl executable
53 :param juju_command: path to juju executable
54 :param fs: file system for kubernetes and juju configuration
59 K8sConnector
.__init
__(
63 on_update_db
=on_update_db
,
67 self
.info('Initializing K8S Juju connector')
69 self
.authenticated
= False
71 self
.log
= logging
.getLogger(__name__
)
73 self
.juju_command
= juju_command
76 self
.info('K8S Juju connector initialized')
82 namespace
: str = 'kube-system',
83 reuse_cluster_uuid
: str = None,
86 It prepares a given K8s cluster environment to run Juju bundles.
88 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config'
89 :param namespace: optional namespace to be used for juju. By default, 'kube-system' will be used
90 :param reuse_cluster_uuid: existing cluster uuid for reuse
91 :return: uuid of the K8s cluster and True if connector has installed some software in the cluster
92 (on error, an exception will be raised)
97 Bootstrapping cannot be done, by design, through the API. We need to
104 1. Has the environment already been bootstrapped?
105 - Check the database to see if we have a record for this env
107 2. If this is a new env, create it
108 - Add the k8s cloud to Juju
110 - Record it in the database
112 3. Connect to the Juju controller for this cloud
115 # cluster_uuid = reuse_cluster_uuid
116 # if not cluster_uuid:
117 # cluster_uuid = str(uuid4())
119 ##################################################
120 # TODO: Pull info from db based on the namespace #
121 ##################################################
123 ###################################################
124 # TODO: Make it idempotent, calling add-k8s and #
125 # bootstrap whenever reuse_cluster_uuid is passed #
127 # `init_env` is called to initialize the K8s #
128 # cluster for juju. If this initialization fails, #
129 # it can be called again by LCM with the param #
130 # reuse_cluster_uuid, e.g. to try to fix it. #
131 ###################################################
133 if not reuse_cluster_uuid
:
134 # This is a new cluster, so bootstrap it
136 cluster_uuid
= str(uuid
.uuid4())
138 # Is a local k8s cluster?
139 localk8s
= self
.is_local_k8s(k8s_creds
)
141 # If the k8s is external, the juju controller needs a loadbalancer
142 loadbalancer
= False if localk8s
else True
144 # Name the new k8s cloud
145 k8s_cloud
= "k8s-{}".format(cluster_uuid
)
147 print("Adding k8s cloud {}".format(k8s_cloud
))
148 await self
.add_k8s(k8s_cloud
, k8s_creds
)
150 # Bootstrap Juju controller
151 print("Bootstrapping...")
152 await self
.bootstrap(k8s_cloud
, cluster_uuid
, loadbalancer
)
153 print("Bootstrap done.")
155 # Get the controller information
157 # Parse ~/.local/share/juju/controllers.yaml
158 # controllers.testing.api-endpoints|ca-cert|uuid
159 print("Getting controller endpoints")
160 with
open(os
.path
.expanduser(
161 "~/.local/share/juju/controllers.yaml"
163 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
164 controller
= controllers
['controllers'][cluster_uuid
]
165 endpoints
= controller
['api-endpoints']
166 self
.juju_endpoint
= endpoints
[0]
167 self
.juju_ca_cert
= controller
['ca-cert']
169 # Parse ~/.local/share/juju/accounts.yaml
170 # controllers.testing.user|password
171 print("Getting accounts")
172 with
open(os
.path
.expanduser(
173 "~/.local/share/juju/accounts.yaml"
175 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
176 controller
= controllers
['controllers'][cluster_uuid
]
178 self
.juju_user
= controller
['user']
179 self
.juju_secret
= controller
['password']
181 print("user: {}".format(self
.juju_user
))
182 print("secret: {}".format(self
.juju_secret
))
183 print("endpoint: {}".format(self
.juju_endpoint
))
184 print("ca-cert: {}".format(self
.juju_ca_cert
))
186 # raise Exception("EOL")
188 self
.juju_public_key
= None
191 'endpoint': self
.juju_endpoint
,
192 'username': self
.juju_user
,
193 'secret': self
.juju_secret
,
194 'cacert': self
.juju_ca_cert
,
195 'namespace': namespace
,
196 'loadbalancer': loadbalancer
,
199 # Store the cluster configuration so it
200 # can be used for subsequent calls
201 print("Setting config")
202 await self
.set_config(cluster_uuid
, config
)
205 # This is an existing cluster, so get its config
206 cluster_uuid
= reuse_cluster_uuid
208 config
= self
.get_config(cluster_uuid
)
210 self
.juju_endpoint
= config
['endpoint']
211 self
.juju_user
= config
['username']
212 self
.juju_secret
= config
['secret']
213 self
.juju_ca_cert
= config
['cacert']
214 self
.juju_public_key
= None
216 # Login to the k8s cluster
217 if not self
.authenticated
:
218 await self
.login(cluster_uuid
)
220 # We're creating a new cluster
221 #print("Getting model {}".format(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid))
222 #model = await self.get_model(
223 # self.get_namespace(cluster_uuid),
224 # cluster_uuid=cluster_uuid
227 ## Disconnect from the model
228 #if model and model.is_connected():
229 # await model.disconnect()
231 return cluster_uuid
, True
233 """Repo Management"""
240 raise NotImplemented()
async def repo_list(self):
    """Placeholder for listing repositories.

    This connector does not implement repository listing; calling it
    always raises the ``NotImplemented`` exception that this module
    imports from ``.exceptions``.

    :raises NotImplemented: unconditionally.
    """
    raise NotImplemented()
245 async def repo_remove(
249 raise NotImplemented()
251 async def synchronize_repos(
257 Returns None as currently add_repo is not implemented
266 uninstall_sw
: bool = False
270 Resets the Kubernetes cluster by removing the model that represents it.
272 :param cluster_uuid str: The UUID of the cluster to reset
273 :return: Returns True if successful or raises an exception.
277 if not self
.authenticated
:
278 await self
.login(cluster_uuid
)
280 if self
.controller
.is_connected():
282 namespace
= self
.get_namespace(cluster_uuid
)
283 if await self
.has_model(namespace
):
284 print("[reset] Destroying model")
285 await self
.controller
.destroy_model(
290 # Disconnect from the controller
291 print("[reset] Disconnecting controller")
294 # Destroy the controller (via CLI)
295 print("[reset] Destroying controller")
296 await self
.destroy_controller(cluster_uuid
)
298 print("[reset] Removing k8s cloud")
299 k8s_cloud
= "k8s-{}".format(cluster_uuid
)
300 await self
.remove_cloud(k8s_cloud
)
302 except Exception as ex
:
303 print("Caught exception during reset: {}".format(ex
))
314 timeout
: float = 300,
316 db_dict
: dict = None,
321 :param cluster_uuid str: The UUID of the cluster to install to
322 :param kdu_model str: The name or path of a bundle to install
323 :param atomic bool: If set, waits until the model is active and resets
324 the cluster on failure.
325 :param timeout float: The time, in seconds, to wait for the install
327 :param params dict: Key-value pairs of instantiation parameters
328 :param kdu_name: Name of the KDU instance to be installed
330 :return: If successful, returns ?
333 if not self
.authenticated
:
334 print("[install] Logging in to the controller")
335 await self
.login(cluster_uuid
)
338 # Get or create the model, based on the NS
341 kdu_instance
= "{}-{}".format(kdu_name
, db_dict
["filter"]["_id"])
343 kdu_instance
= db_dict
["filter"]["_id"]
345 self
.log
.debug("Checking for model named {}".format(kdu_instance
))
347 # Create the new model
348 self
.log
.debug("Adding model: {}".format(kdu_instance
))
349 model
= await self
.add_model(kdu_instance
, cluster_uuid
=cluster_uuid
)
352 # TODO: Instantiation parameters
355 "Juju bundle that models the KDU, in any of the following ways:
356 - <juju-repo>/<juju-bundle>
357 - <juju-bundle folder under k8s_models folder in the package>
358 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder in the package>
359 - <URL_where_to_fetch_juju_bundle>
363 if kdu_model
.startswith("cs:"):
365 elif kdu_model
.startswith("http"):
371 # if kdu_model.endswith(".tar.gz") or kdu_model.endswith(".tgz")
372 # Uncompress temporarily
373 # bundle = <uncompressed file>
377 # Raise named exception that the bundle could not be found
380 print("[install] deploying {}".format(bundle
))
381 await model
.deploy(bundle
)
383 # Get the application
385 # applications = model.applications
386 print("[install] Applications: {}".format(model
.applications
))
387 for name
in model
.applications
:
388 print("[install] Waiting for {} to settle".format(name
))
389 application
= model
.applications
[name
]
391 # It's not enough to wait for all units to be active;
392 # the application status needs to be active as well.
393 print("Waiting for all units to be active...")
394 await model
.block_until(
396 unit
.agent_status
== 'idle'
397 and application
.status
in ['active', 'unknown']
398 and unit
.workload_status
in [
400 ] for unit
in application
.units
404 print("All units active.")
406 except concurrent
.futures
._base
.TimeoutError
:
407 print("[install] Timeout exceeded; resetting cluster")
408 await self
.reset(cluster_uuid
)
411 # Wait for the application to be active
412 if model
.is_connected():
413 print("[install] Disconnecting model")
414 await model
.disconnect()
417 raise Exception("Unable to install")
419 async def instances_list(
424 returns a list of deployed releases in a cluster
426 :param cluster_uuid: the cluster
435 kdu_model
: str = None,
440 :param cluster_uuid str: The UUID of the cluster to upgrade
441 :param kdu_instance str: The unique name of the KDU instance
442 :param kdu_model str: The name or path of the bundle to upgrade to
443 :param params dict: Key-value pairs of instantiation parameters
445 :return: If successful, reference to the new revision number of the
449 # TODO: Loop through the bundle and upgrade each charm individually
452 The API doesn't have a concept of bundle upgrades, because there are
453 many possible changes: charm revision, disk, number of units, etc.
455 As such, we are only supporting a limited subset of upgrades. We'll
456 upgrade the charm revision but leave storage and scale untouched.
458 Scale changes should happen through OSM constructs, and changes to
459 storage would require a redeployment of the service, at least in this
462 namespace
= self
.get_namespace(cluster_uuid
)
463 model
= await self
.get_model(namespace
, cluster_uuid
=cluster_uuid
)
465 with
open(kdu_model
, 'r') as f
:
466 bundle
= yaml
.safe_load(f
)
470 'description': 'Test bundle',
471 'bundle': 'kubernetes',
474 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
477 'password': 'manopw',
478 'root_password': 'osm4u',
481 'series': 'kubernetes'
486 # TODO: This should be returned in an agreed-upon format
487 for name
in bundle
['applications']:
488 print(model
.applications
)
489 application
= model
.applications
[name
]
492 path
= bundle
['applications'][name
]['charm']
495 await application
.upgrade_charm(switch
=path
)
496 except juju
.errors
.JujuError
as ex
:
497 if 'already running charm' in str(ex
):
498 # We're already running this version
501 await model
.disconnect()
504 raise NotImplemented()
515 :param cluster_uuid str: The UUID of the cluster to rollback
516 :param kdu_instance str: The unique name of the KDU instance
517 :param revision int: The revision to revert to. If omitted, rolls back
518 the previous upgrade.
520 :return: If successful, returns the revision of active KDU instance,
521 or raises an exception
523 raise NotImplemented()
531 """Uninstall a KDU instance
533 :param cluster_uuid str: The UUID of the cluster
534 :param kdu_instance str: The unique name of the KDU instance
536 :return: Returns True if successful, or raises an exception
538 if not self
.authenticated
:
539 self
.log
.debug("[uninstall] Connecting to controller")
540 await self
.login(cluster_uuid
)
542 self
.log
.debug("[uninstall] Destroying model")
544 await self
.controller
.destroy_models(kdu_instance
)
546 self
.log
.debug("[uninstall] Model destroyed and disconnecting")
552 async def inspect_kdu(
558 Inspects a bundle and returns a dictionary of config parameters and
559 their default values.
561 :param kdu_model str: The name or path of the bundle to inspect.
563 :return: If successful, returns a dictionary of available parameters
564 and their default values.
568 with
open(kdu_model
, 'r') as f
:
569 bundle
= yaml
.safe_load(f
)
573 'description': 'Test bundle',
574 'bundle': 'kubernetes',
577 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
580 'password': 'manopw',
581 'root_password': 'osm4u',
584 'series': 'kubernetes'
589 # TODO: This should be returned in an agreed-upon format
590 kdu
= bundle
['applications']
600 If available, returns the README of the bundle.
602 :param kdu_model str: The name or path of a bundle
604 :return: If found, returns the contents of the README.
608 files
= ['README', 'README.txt', 'README.md']
609 path
= os
.path
.dirname(kdu_model
)
610 for file in os
.listdir(path
):
612 with
open(file, 'r') as f
:
618 async def status_kdu(
623 """Get the status of the KDU
625 Get the current status of the KDU instance.
627 :param cluster_uuid str: The UUID of the cluster
628 :param kdu_instance str: The unique id of the KDU instance
630 :return: Returns a dictionary containing namespace, state, resources,
635 model
= await self
.get_model(self
.get_namespace(cluster_uuid
), cluster_uuid
=cluster_uuid
)
637 # model = await self.get_model_by_uuid(cluster_uuid)
639 model_status
= await model
.get_status()
640 status
= model_status
.applications
642 for name
in model_status
.applications
:
643 application
= model_status
.applications
[name
]
645 'status': application
['status']['status']
648 if model
.is_connected():
649 await model
.disconnect()
659 """Add a k8s cloud to Juju
661 Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
664 :param cloud_name str: The name of the cloud to add.
665 :param credentials dict: A dictionary representing the output of
666 `kubectl config view --raw`.
668 :returns: True if successful, otherwise raises an exception.
671 cmd
= [self
.juju_command
, "add-k8s", "--local", cloud_name
]
674 process
= await asyncio
.create_subprocess_exec(
676 stdout
=asyncio
.subprocess
.PIPE
,
677 stderr
=asyncio
.subprocess
.PIPE
,
678 stdin
=asyncio
.subprocess
.PIPE
,
681 # Feed the process the credentials
682 process
.stdin
.write(credentials
.encode("utf-8"))
683 await process
.stdin
.drain()
684 process
.stdin
.close()
686 stdout
, stderr
= await process
.communicate()
688 return_code
= process
.returncode
690 print("add-k8s return code: {}".format(return_code
))
693 raise Exception(stderr
)
701 ) -> juju
.model
.Model
:
702 """Adds a model to the controller
704 Adds a new model to the Juju controller
706 :param model_name str: The name of the model to add.
707 :returns: The juju.model.Model object of the new model upon success or
710 if not self
.authenticated
:
711 await self
.login(cluster_uuid
)
713 self
.log
.debug("Adding model '{}' to cluster_uuid '{}'".format(model_name
, cluster_uuid
))
715 model
= await self
.controller
.add_model(
717 config
={'authorized-keys': self
.juju_public_key
}
719 except Exception as ex
:
721 self
.log
.debug("Caught exception: {}".format(ex
))
732 """Bootstrap a Kubernetes controller
734 Bootstrap a Juju controller inside the Kubernetes cluster
736 :param cloud_name str: The name of the cloud.
737 :param cluster_uuid str: The UUID of the cluster to bootstrap.
738 :param loadbalancer bool: Whether the controller service should use a load balancer.
739 :returns: True upon success or raises an exception.
743 cmd
= [self
.juju_command
, "bootstrap", cloud_name
, cluster_uuid
]
746 For public clusters, specify that the controller service is using a LoadBalancer.
748 cmd
= [self
.juju_command
, "bootstrap", cloud_name
, cluster_uuid
, "--config", "controller-service-type=loadbalancer"]
750 print("Bootstrapping controller {} in cloud {}".format(
751 cluster_uuid
, cloud_name
754 process
= await asyncio
.create_subprocess_exec(
756 stdout
=asyncio
.subprocess
.PIPE
,
757 stderr
=asyncio
.subprocess
.PIPE
,
760 stdout
, stderr
= await process
.communicate()
762 return_code
= process
.returncode
766 if b
'already exists' not in stderr
:
767 raise Exception(stderr
)
771 async def destroy_controller(
775 """Destroy a Kubernetes controller
777 Destroy an existing Kubernetes controller.
779 :param cluster_uuid str: The UUID of the cluster whose controller is destroyed.
780 :returns: True upon success or raises an exception.
784 "destroy-controller",
785 "--destroy-all-models",
791 process
= await asyncio
.create_subprocess_exec(
793 stdout
=asyncio
.subprocess
.PIPE
,
794 stderr
=asyncio
.subprocess
.PIPE
,
797 stdout
, stderr
= await process
.communicate()
799 return_code
= process
.returncode
803 if 'already exists' not in stderr
:
804 raise Exception(stderr
)
810 """Get the cluster configuration
812 Gets the configuration of the cluster
814 :param cluster_uuid str: The UUID of the cluster.
815 :return: A dict upon success, or raises an exception.
817 cluster_config
= "{}/{}.yaml".format(self
.fs
.path
, cluster_uuid
)
818 if os
.path
.exists(cluster_config
):
819 with
open(cluster_config
, 'r') as f
:
820 config
= yaml
.safe_load(f
.read())
824 "Unable to locate configuration for cluster {}".format(
833 ) -> juju
.model
.Model
:
834 """Get a model from the Juju Controller.
836 Note: disconnect() must be called on the returned Model object before it goes
839 :param model_name str: The name of the model to get
840 :return: The juju.model.Model object if found, or None.
842 if not self
.authenticated
:
843 await self
.login(cluster_uuid
)
846 models
= await self
.controller
.list_models()
847 self
.log
.debug(models
)
848 if model_name
in models
:
849 self
.log
.debug("Found model: {}".format(model_name
))
850 model
= await self
.controller
.get_model(
859 """Get the namespace UUID
860 Gets the namespace's unique name
862 :param cluster_uuid str: The UUID of the cluster
863 :returns: The namespace UUID, or raises an exception
865 config
= self
.get_config(cluster_uuid
)
867 # Make sure the name is in the config
868 if 'namespace' not in config
:
869 raise Exception("Namespace not found.")
871 # TODO: We want to make sure this is unique to the cluster, in case
872 # the cluster is being reused.
873 # Consider pre/appending the cluster id to the namespace string
874 return config
['namespace']
880 """Check if a model exists in the controller
882 Checks to see if a model exists in the connected Juju controller.
884 :param model_name str: The name of the model
885 :return: A boolean indicating if the model exists
887 models
= await self
.controller
.list_models()
889 if model_name
in models
:
897 """Check if a cluster is local
899 Checks if a cluster is running in the local host
901 :param credentials dict: A dictionary containing the k8s credentials
902 :returns: A boolean indicating whether the cluster is running locally
904 creds
= yaml
.safe_load(credentials
)
905 if os
.getenv("OSMLCM_VCA_APIPROXY"):
906 host_ip
= os
.getenv("OSMLCM_VCA_APIPROXY")
908 if creds
and host_ip
:
909 for cluster
in creds
['clusters']:
910 if 'server' in cluster
['cluster']:
911 if host_ip
in cluster
['cluster']['server']:
916 async def login(self
, cluster_uuid
):
917 """Login to the Juju controller."""
919 if self
.authenticated
:
922 self
.connecting
= True
924 # Test: Make sure we have the credentials loaded
925 config
= self
.get_config(cluster_uuid
)
927 self
.juju_endpoint
= config
['endpoint']
928 self
.juju_user
= config
['username']
929 self
.juju_secret
= config
['secret']
930 self
.juju_ca_cert
= config
['cacert']
931 self
.juju_public_key
= None
933 self
.controller
= Controller()
937 "Connecting to controller... ws://{} as {}/{}".format(
944 await self
.controller
.connect(
945 endpoint
=self
.juju_endpoint
,
946 username
=self
.juju_user
,
947 password
=self
.juju_secret
,
948 cacert
=self
.juju_ca_cert
,
950 self
.authenticated
= True
951 self
.log
.debug("JujuApi: Logged into controller")
952 except Exception as ex
:
954 self
.log
.debug("Caught exception: {}".format(ex
))
957 self
.log
.fatal("VCA credentials not configured.")
958 self
.authenticated
= False
960 async def logout(self
):
961 """Logout of the Juju controller."""
963 if not self
.authenticated
:
966 for model
in self
.models
:
967 print("Logging out of model {}".format(model
))
968 await self
.models
[model
].disconnect()
971 self
.log
.debug("Disconnecting controller {}".format(
974 await self
.controller
.disconnect()
975 self
.controller
= None
977 self
.authenticated
= False
979 async def remove_cloud(
983 """Remove a k8s cloud from Juju
985 Removes a Kubernetes cloud from Juju.
987 :param cloud_name str: The name of the cloud to remove.
989 :returns: True if successful, otherwise raises an exception.
992 # Remove the k8s cloud definition from the Juju client
993 cmd
= [self
.juju_command
, "remove-k8s", "--client", cloud_name
]
994 process
= await asyncio
.create_subprocess_exec(
996 stdout
=asyncio
.subprocess
.PIPE
,
997 stderr
=asyncio
.subprocess
.PIPE
,
1000 stdout
, stderr
= await process
.communicate()
1002 return_code
= process
.returncode
1005 raise Exception(stderr
)
1007 # Remove the cloud from the local config
1008 cmd
= [self
.juju_command
, "remove-cloud", "--client", cloud_name
]
1009 process
= await asyncio
.create_subprocess_exec(
1011 stdout
=asyncio
.subprocess
.PIPE
,
1012 stderr
=asyncio
.subprocess
.PIPE
,
1015 stdout
, stderr
= await process
.communicate()
1017 return_code
= process
.returncode
1020 raise Exception(stderr
)
1024 async def set_config(
1029 """Save the cluster configuration
1031 Saves the cluster information to the file store
1033 :param cluster_uuid str: The UUID of the cluster
1034 :param config dict: A dictionary containing the cluster configuration
1035 :returns: Boolean upon success or raises an exception.
1038 cluster_config
= "{}/{}.yaml".format(self
.fs
.path
, cluster_uuid
)
1039 if not os
.path
.exists(cluster_config
):
1040 print("Writing config to {}".format(cluster_config
))
1041 with
open(cluster_config
, 'w') as f
:
1042 f
.write(yaml
.dump(config
, Dumper
=yaml
.Dumper
))