550ad123a0c89bf0233eca5c84791ca53a87750b
1 # Copyright 2019 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from .exceptions
import NotImplemented
21 # from juju.bundle import BundleHandler
22 from juju
.controller
import Controller
23 from juju
.model
import Model
24 from juju
.errors
import JujuAPIError
, JujuError
26 from n2vc
.k8s_conn
import K8sConnector
31 # from .vnf import N2VC
37 class K8sJujuConnector(K8sConnector
):
43 kubectl_command
: str = '/usr/bin/kubectl',
44 juju_command
: str = '/usr/bin/juju',
50 :param kubectl_command: path to kubectl executable
51 :param juju_command: path to juju executable
52 :param fs: file system for kubernetes and juju configuration
57 K8sConnector
.__init
__(
61 on_update_db
=on_update_db
,
65 self
.log
.debug('Initializing K8S Juju connector')
67 self
.authenticated
= False
70 self
.juju_command
= juju_command
73 self
.log
.debug('K8S Juju connector initialized')
79 namespace
: str = 'kube-system',
80 reuse_cluster_uuid
: str = None,
83 It prepares a given K8s cluster environment to run Juju bundles.
85 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config'
86 :param namespace: optional namespace to be used for juju. By default, 'kube-system' will be used
87 :param reuse_cluster_uuid: existing cluster uuid for reuse
88 :return: uuid of the K8s cluster and True if connector has installed some software in the cluster
89 (on error, an exception will be raised)
94 Bootstrapping cannot be done, by design, through the API. We need to
101 1. Has the environment already been bootstrapped?
102 - Check the database to see if we have a record for this env
104 2. If this is a new env, create it
105 - Add the k8s cloud to Juju
107 - Record it in the database
109 3. Connect to the Juju controller for this cloud
112 # cluster_uuid = reuse_cluster_uuid
113 # if not cluster_uuid:
114 # cluster_uuid = str(uuid4())
116 ##################################################
117 # TODO: Pull info from db based on the namespace #
118 ##################################################
120 ###################################################
121 # TODO: Make it idempotent, calling add-k8s and #
122 # bootstrap whenever reuse_cluster_uuid is passed #
124 # `init_env` is called to initialize the K8s #
125 # cluster for juju. If this initialization fails, #
126 # it can be called again by LCM with the param #
127 # reuse_cluster_uuid, e.g. to try to fix it. #
128 ###################################################
130 if not reuse_cluster_uuid
:
131 # This is a new cluster, so bootstrap it
133 cluster_uuid
= str(uuid
.uuid4())
135 # Is a local k8s cluster?
136 localk8s
= self
.is_local_k8s(k8s_creds
)
138 # If the k8s is external, the juju controller needs a loadbalancer
139 loadbalancer
= False if localk8s
else True
141 # Name the new k8s cloud
142 k8s_cloud
= "k8s-{}".format(cluster_uuid
)
144 self
.log
.debug("Adding k8s cloud {}".format(k8s_cloud
))
145 await self
.add_k8s(k8s_cloud
, k8s_creds
)
147 # Bootstrap Juju controller
148 self
.log
.debug("Bootstrapping...")
149 await self
.bootstrap(k8s_cloud
, cluster_uuid
, loadbalancer
)
150 self
.log
.debug("Bootstrap done.")
152 # Get the controller information
154 # Parse ~/.local/share/juju/controllers.yaml
155 # controllers.testing.api-endpoints|ca-cert|uuid
156 self
.log
.debug("Getting controller endpoints")
157 with
open(os
.path
.expanduser(
158 "~/.local/share/juju/controllers.yaml"
160 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
161 controller
= controllers
['controllers'][cluster_uuid
]
162 endpoints
= controller
['api-endpoints']
163 self
.juju_endpoint
= endpoints
[0]
164 self
.juju_ca_cert
= controller
['ca-cert']
166 # Parse ~/.local/share/juju/accounts.yaml
167 # controllers.testing.user|password
168 self
.log
.debug("Getting accounts")
169 with
open(os
.path
.expanduser(
170 "~/.local/share/juju/accounts.yaml"
172 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
173 controller
= controllers
['controllers'][cluster_uuid
]
175 self
.juju_user
= controller
['user']
176 self
.juju_secret
= controller
['password']
178 # raise Exception("EOL")
180 self
.juju_public_key
= None
183 'endpoint': self
.juju_endpoint
,
184 'username': self
.juju_user
,
185 'secret': self
.juju_secret
,
186 'cacert': self
.juju_ca_cert
,
187 'namespace': namespace
,
188 'loadbalancer': loadbalancer
,
191 # Store the cluster configuration so it
192 # can be used for subsequent calls
193 self
.log
.debug("Setting config")
194 await self
.set_config(cluster_uuid
, config
)
197 # This is an existing cluster, so get its config
198 cluster_uuid
= reuse_cluster_uuid
200 config
= self
.get_config(cluster_uuid
)
202 self
.juju_endpoint
= config
['endpoint']
203 self
.juju_user
= config
['username']
204 self
.juju_secret
= config
['secret']
205 self
.juju_ca_cert
= config
['cacert']
206 self
.juju_public_key
= None
208 # Login to the Juju controller
209 if not self
.authenticated
:
210 await self
.login(cluster_uuid
)
212 # We're creating a new cluster
213 #print("Getting model {}".format(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid))
214 #model = await self.get_model(
215 # self.get_namespace(cluster_uuid),
216 # cluster_uuid=cluster_uuid
219 ## Disconnect from the model
220 #if model and model.is_connected():
221 # await model.disconnect()
223 return cluster_uuid
, True
225 """Repo Management"""
232 raise NotImplemented()
async def repo_list(self):
    """Repository listing entry point (presumably lists repos; not implemented).

    The connector provides no implementation: every call raises the
    package's ``NotImplemented`` exception (imported from ``.exceptions``).

    :raises NotImplemented: always.
    """
    unsupported = NotImplemented()
    raise unsupported
237 async def repo_remove(
241 raise NotImplemented()
243 async def synchronize_repos(
249 Returns None as currently add_repo is not implemented
258 uninstall_sw
: bool = False
262 Resets the Kubernetes cluster by removing the model that represents it.
264 :param cluster_uuid str: The UUID of the cluster to reset
265 :return: Returns True if successful or raises an exception.
269 if not self
.authenticated
:
270 await self
.login(cluster_uuid
)
272 if self
.controller
.is_connected():
274 namespace
= self
.get_namespace(cluster_uuid
)
275 if await self
.has_model(namespace
):
276 self
.log
.debug("[reset] Destroying model")
277 await self
.controller
.destroy_model(
282 # Disconnect from the controller
283 self
.log
.debug("[reset] Disconnecting controller")
286 # Destroy the controller (via CLI)
287 self
.log
.debug("[reset] Destroying controller")
288 await self
.destroy_controller(cluster_uuid
)
290 self
.log
.debug("[reset] Removing k8s cloud")
291 k8s_cloud
= "k8s-{}".format(cluster_uuid
)
292 await self
.remove_cloud(k8s_cloud
)
294 except Exception as ex
:
295 self
.log
.debug("Caught exception during reset: {}".format(ex
))
306 timeout
: float = 300,
308 db_dict
: dict = None,
309 kdu_name
: str = None,
310 namespace
: str = None
314 :param cluster_uuid str: The UUID of the cluster to install to
315 :param kdu_model str: The name or path of a bundle to install
316 :param atomic bool: If set, waits until the model is active and resets
317 the cluster on failure.
318 :param timeout float: The time, in seconds, to wait for the install
320 :param params dict: Key-value pairs of instantiation parameters
321 :param kdu_name: Name of the KDU instance to be installed
322 :param namespace: K8s namespace to use for the KDU instance
324 :return: If successful, returns ?
327 if not self
.authenticated
:
328 self
.log
.debug("[install] Logging in to the controller")
329 await self
.login(cluster_uuid
)
332 # Get or create the model, based on the NS
335 kdu_instance
= "{}-{}".format(kdu_name
, db_dict
["filter"]["_id"])
337 kdu_instance
= db_dict
["filter"]["_id"]
339 self
.log
.debug("Checking for model named {}".format(kdu_instance
))
341 # Create the new model
342 self
.log
.debug("Adding model: {}".format(kdu_instance
))
343 model
= await self
.add_model(kdu_instance
, cluster_uuid
=cluster_uuid
)
346 # TODO: Instantiation parameters
349 "Juju bundle that models the KDU, in any of the following ways:
350 - <juju-repo>/<juju-bundle>
351 - <juju-bundle folder under k8s_models folder in the package>
352 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder in the package>
353 - <URL_where_to_fetch_juju_bundle>
356 previous_workdir
= os
.getcwd()
359 if kdu_model
.startswith("cs:"):
361 elif kdu_model
.startswith("http"):
365 new_workdir
= kdu_model
.strip(kdu_model
.split("/")[-1])
367 os
.chdir(new_workdir
)
369 bundle
= "local:{}".format(kdu_model
)
372 # Raise named exception that the bundle could not be found
375 self
.log
.debug("[install] deploying {}".format(bundle
))
376 await model
.deploy(bundle
)
378 # Get the application
380 # applications = model.applications
381 self
.log
.debug("[install] Applications: {}".format(model
.applications
))
382 for name
in model
.applications
:
383 self
.log
.debug("[install] Waiting for {} to settle".format(name
))
384 application
= model
.applications
[name
]
386 # It's not enough to wait for all units to be active;
387 # the application status needs to be active as well.
388 self
.log
.debug("Waiting for all units to be active...")
389 await model
.block_until(
391 unit
.agent_status
== 'idle'
392 and application
.status
in ['active', 'unknown']
393 and unit
.workload_status
in [
395 ] for unit
in application
.units
399 self
.log
.debug("All units active.")
401 except concurrent
.futures
._base
.TimeoutError
: # TODO use asyncio.TimeoutError
402 os
.chdir(previous_workdir
)
403 self
.log
.debug("[install] Timeout exceeded; resetting cluster")
404 await self
.reset(cluster_uuid
)
407 # Disconnect from the model if it is still connected
408 if model
.is_connected():
409 self
.log
.debug("[install] Disconnecting model")
410 await model
.disconnect()
412 os
.chdir(previous_workdir
)
415 raise Exception("Unable to install")
417 async def instances_list(
422 returns a list of deployed releases in a cluster
424 :param cluster_uuid: the cluster
433 kdu_model
: str = None,
438 :param cluster_uuid str: The UUID of the cluster to upgrade
439 :param kdu_instance str: The unique name of the KDU instance
440 :param kdu_model str: The name or path of the bundle to upgrade to
441 :param params dict: Key-value pairs of instantiation parameters
443 :return: If successful, reference to the new revision number of the
447 # TODO: Loop through the bundle and upgrade each charm individually
450 The API doesn't have a concept of bundle upgrades, because there are
451 many possible changes: charm revision, disk, number of units, etc.
453 As such, we are only supporting a limited subset of upgrades. We'll
454 upgrade the charm revision but leave storage and scale untouched.
456 Scale changes should happen through OSM constructs, and changes to
457 storage would require a redeployment of the service, at least in this
460 namespace
= self
.get_namespace(cluster_uuid
)
461 model
= await self
.get_model(namespace
, cluster_uuid
=cluster_uuid
)
463 with
open(kdu_model
, 'r') as f
:
464 bundle
= yaml
.safe_load(f
)
468 'description': 'Test bundle',
469 'bundle': 'kubernetes',
472 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
475 'password': 'manopw',
476 'root_password': 'osm4u',
479 'series': 'kubernetes'
484 # TODO: This should be returned in an agreed-upon format
485 for name
in bundle
['applications']:
486 self
.log
.debug(model
.applications
)
487 application
= model
.applications
[name
]
488 self
.log
.debug(application
)
490 path
= bundle
['applications'][name
]['charm']
493 await application
.upgrade_charm(switch
=path
)
494 except juju
.errors
.JujuError
as ex
:
495 if 'already running charm' in str(ex
):
496 # We're already running this version
499 await model
.disconnect()
502 raise NotImplemented()
513 :param cluster_uuid str: The UUID of the cluster to rollback
514 :param kdu_instance str: The unique name of the KDU instance
515 :param revision int: The revision to revert to. If omitted, rolls back
516 the previous upgrade.
518 :return: If successful, returns the revision of active KDU instance,
519 or raises an exception
521 raise NotImplemented()
529 """Uninstall a KDU instance
531 :param cluster_uuid str: The UUID of the cluster
532 :param kdu_instance str: The unique name of the KDU instance
534 :return: Returns True if successful, or raises an exception
536 if not self
.authenticated
:
537 self
.log
.debug("[uninstall] Connecting to controller")
538 await self
.login(cluster_uuid
)
540 self
.log
.debug("[uninstall] Destroying model")
542 await self
.controller
.destroy_models(kdu_instance
)
544 self
.log
.debug("[uninstall] Model destroyed and disconnecting")
550 async def inspect_kdu(
556 Inspects a bundle and returns a dictionary of config parameters and
557 their default values.
559 :param kdu_model str: The name or path of the bundle to inspect.
561 :return: If successful, returns a dictionary of available parameters
562 and their default values.
566 with
open(kdu_model
, 'r') as f
:
567 bundle
= yaml
.safe_load(f
)
571 'description': 'Test bundle',
572 'bundle': 'kubernetes',
575 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
578 'password': 'manopw',
579 'root_password': 'osm4u',
582 'series': 'kubernetes'
587 # TODO: This should be returned in an agreed-upon format
588 kdu
= bundle
['applications']
598 If available, returns the README of the bundle.
600 :param kdu_model str: The name or path of a bundle
602 :return: If found, returns the contents of the README.
606 files
= ['README', 'README.txt', 'README.md']
607 path
= os
.path
.dirname(kdu_model
)
608 for file in os
.listdir(path
):
610 with
open(file, 'r') as f
:
616 async def status_kdu(
621 """Get the status of the KDU
623 Get the current status of the KDU instance.
625 :param cluster_uuid str: The UUID of the cluster
626 :param kdu_instance str: The unique id of the KDU instance
628 :return: Returns a dictionary containing namespace, state, resources,
633 model
= await self
.get_model(self
.get_namespace(cluster_uuid
), cluster_uuid
=cluster_uuid
)
635 # model = await self.get_model_by_uuid(cluster_uuid)
637 model_status
= await model
.get_status()
638 status
= model_status
.applications
640 for name
in model_status
.applications
:
641 application
= model_status
.applications
[name
]
643 'status': application
['status']['status']
646 if model
.is_connected():
647 await model
.disconnect()
657 """Add a k8s cloud to Juju
659 Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
662 :param cloud_name str: The name of the cloud to add.
663 :param credentials str: A string containing the output of
664 `kubectl config view --raw`.
666 :returns: True if successful, otherwise raises an exception.
669 cmd
= [self
.juju_command
, "add-k8s", "--local", cloud_name
]
672 process
= await asyncio
.create_subprocess_exec(
674 stdout
=asyncio
.subprocess
.PIPE
,
675 stderr
=asyncio
.subprocess
.PIPE
,
676 stdin
=asyncio
.subprocess
.PIPE
,
679 # Feed the process the credentials
680 process
.stdin
.write(credentials
.encode("utf-8"))
681 await process
.stdin
.drain()
682 process
.stdin
.close()
684 stdout
, stderr
= await process
.communicate()
686 return_code
= process
.returncode
688 self
.log
.debug("add-k8s return code: {}".format(return_code
))
691 raise Exception(stderr
)
699 ) -> juju
.model
.Model
:
700 """Adds a model to the controller
702 Adds a new model to the Juju controller
704 :param model_name str: The name of the model to add.
705 :returns: The juju.model.Model object of the new model upon success or
708 if not self
.authenticated
:
709 await self
.login(cluster_uuid
)
711 self
.log
.debug("Adding model '{}' to cluster_uuid '{}'".format(model_name
, cluster_uuid
))
713 model
= await self
.controller
.add_model(
715 config
={'authorized-keys': self
.juju_public_key
}
717 except Exception as ex
:
719 self
.log
.debug("Caught exception: {}".format(ex
))
730 """Bootstrap a Kubernetes controller
732 Bootstrap a Juju controller inside the Kubernetes cluster
734 :param cloud_name str: The name of the cloud.
735 :param cluster_uuid str: The UUID of the cluster to bootstrap.
736 :param loadbalancer bool: If the controller should use loadbalancer or not.
737 :returns: True upon success or raises an exception.
741 cmd
= [self
.juju_command
, "bootstrap", cloud_name
, cluster_uuid
]
744 For public clusters, specify that the controller service is using a LoadBalancer.
746 cmd
= [self
.juju_command
, "bootstrap", cloud_name
, cluster_uuid
, "--config", "controller-service-type=loadbalancer"]
748 self
.log
.debug("Bootstrapping controller {} in cloud {}".format(
749 cluster_uuid
, cloud_name
752 process
= await asyncio
.create_subprocess_exec(
754 stdout
=asyncio
.subprocess
.PIPE
,
755 stderr
=asyncio
.subprocess
.PIPE
,
758 stdout
, stderr
= await process
.communicate()
760 return_code
= process
.returncode
764 if b
'already exists' not in stderr
:
765 raise Exception(stderr
)
769 async def destroy_controller(
773 """Destroy a Kubernetes controller
775 Destroy an existing Kubernetes controller.
777 :param cluster_uuid str: The UUID of the cluster whose controller is destroyed.
778 :returns: True upon success or raises an exception.
782 "destroy-controller",
783 "--destroy-all-models",
789 process
= await asyncio
.create_subprocess_exec(
791 stdout
=asyncio
.subprocess
.PIPE
,
792 stderr
=asyncio
.subprocess
.PIPE
,
795 stdout
, stderr
= await process
.communicate()
797 return_code
= process
.returncode
801 if 'already exists' not in stderr
:
802 raise Exception(stderr
)
808 """Get the cluster configuration
810 Gets the configuration of the cluster
812 :param cluster_uuid str: The UUID of the cluster.
813 :return: A dict upon success, or raises an exception.
815 cluster_config
= "{}/{}.yaml".format(self
.fs
.path
, cluster_uuid
)
816 if os
.path
.exists(cluster_config
):
817 with
open(cluster_config
, 'r') as f
:
818 config
= yaml
.safe_load(f
.read())
822 "Unable to locate configuration for cluster {}".format(
831 ) -> juju
.model
.Model
:
832 """Get a model from the Juju Controller.
834 Note: Model objects returned must call disconnect() before it goes
837 :param model_name str: The name of the model to get
838 :return: The juju.model.Model object if found, or None.
840 if not self
.authenticated
:
841 await self
.login(cluster_uuid
)
844 models
= await self
.controller
.list_models()
845 self
.log
.debug(models
)
846 if model_name
in models
:
847 self
.log
.debug("Found model: {}".format(model_name
))
848 model
= await self
.controller
.get_model(
857 """Get the namespace UUID
858 Gets the namespace's unique name
860 :param cluster_uuid str: The UUID of the cluster
861 :returns: The namespace stored in the cluster configuration, or raises an exception
863 config
= self
.get_config(cluster_uuid
)
865 # Make sure the name is in the config
866 if 'namespace' not in config
:
867 raise Exception("Namespace not found.")
869 # TODO: We want to make sure this is unique to the cluster, in case
870 # the cluster is being reused.
871 # Consider pre/appending the cluster id to the namespace string
872 return config
['namespace']
878 """Check if a model exists in the controller
880 Checks to see if a model exists in the connected Juju controller.
882 :param model_name str: The name of the model
883 :return: A boolean indicating if the model exists
885 models
= await self
.controller
.list_models()
887 if model_name
in models
:
895 """Check if a cluster is local
897 Checks if a cluster is running in the local host
899 :param credentials str: The k8s credentials as a YAML document (e.g. a kubeconfig)
900 :returns: A boolean if the cluster is running locally
902 creds
= yaml
.safe_load(credentials
)
903 if os
.getenv("OSMLCM_VCA_APIPROXY"):
904 host_ip
= os
.getenv("OSMLCM_VCA_APIPROXY")
906 if creds
and host_ip
:
907 for cluster
in creds
['clusters']:
908 if 'server' in cluster
['cluster']:
909 if host_ip
in cluster
['cluster']['server']:
914 async def login(self
, cluster_uuid
):
915 """Login to the Juju controller."""
917 if self
.authenticated
:
920 self
.connecting
= True
922 # Test: Make sure we have the credentials loaded
923 config
= self
.get_config(cluster_uuid
)
925 self
.juju_endpoint
= config
['endpoint']
926 self
.juju_user
= config
['username']
927 self
.juju_secret
= config
['secret']
928 self
.juju_ca_cert
= config
['cacert']
929 self
.juju_public_key
= None
931 self
.controller
= Controller()
935 "Connecting to controller... ws://{} as {}/{}".format(
942 await self
.controller
.connect(
943 endpoint
=self
.juju_endpoint
,
944 username
=self
.juju_user
,
945 password
=self
.juju_secret
,
946 cacert
=self
.juju_ca_cert
,
948 self
.authenticated
= True
949 self
.log
.debug("JujuApi: Logged into controller")
950 except Exception as ex
:
952 self
.log
.debug("Caught exception: {}".format(ex
))
955 self
.log
.fatal("VCA credentials not configured.")
956 self
.authenticated
= False
958 async def logout(self
):
959 """Logout of the Juju controller."""
960 self
.log
.debug("[logout]")
961 if not self
.authenticated
:
964 for model
in self
.models
:
965 self
.log
.debug("Logging out of model {}".format(model
))
966 await self
.models
[model
].disconnect()
969 self
.log
.debug("Disconnecting controller {}".format(
972 await self
.controller
.disconnect()
973 self
.controller
= None
975 self
.authenticated
= False
977 async def remove_cloud(
981 """Remove a k8s cloud from Juju
983 Removes a Kubernetes cloud from Juju.
985 :param cloud_name str: The name of the cloud to remove.
987 :returns: True if successful, otherwise raises an exception.
990 # Remove the k8s cloud from the Juju client
991 cmd
= [self
.juju_command
, "remove-k8s", "--client", cloud_name
]
992 process
= await asyncio
.create_subprocess_exec(
994 stdout
=asyncio
.subprocess
.PIPE
,
995 stderr
=asyncio
.subprocess
.PIPE
,
998 stdout
, stderr
= await process
.communicate()
1000 return_code
= process
.returncode
1003 raise Exception(stderr
)
1005 # Remove the cloud from the local config
1006 cmd
= [self
.juju_command
, "remove-cloud", "--client", cloud_name
]
1007 process
= await asyncio
.create_subprocess_exec(
1009 stdout
=asyncio
.subprocess
.PIPE
,
1010 stderr
=asyncio
.subprocess
.PIPE
,
1013 stdout
, stderr
= await process
.communicate()
1015 return_code
= process
.returncode
1018 raise Exception(stderr
)
1022 async def set_config(
1027 """Save the cluster configuration
1029 Saves the cluster information to the file store
1031 :param cluster_uuid str: The UUID of the cluster
1032 :param config dict: A dictionary containing the cluster configuration
1033 :returns: Boolean upon success or raises an exception.
1036 cluster_config
= "{}/{}.yaml".format(self
.fs
.path
, cluster_uuid
)
1037 if not os
.path
.exists(cluster_config
):
1038 self
.log
.debug("Writing config to {}".format(cluster_config
))
1039 with
open(cluster_config
, 'w') as f
:
1040 f
.write(yaml
.dump(config
, Dumper
=yaml
.Dumper
))