1 # Copyright 2019 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from .exceptions
import NotImplemented
21 # from juju.bundle import BundleHandler
22 from juju
.controller
import Controller
23 from juju
.model
import Model
24 from juju
.errors
import JujuAPIError
, JujuError
28 from n2vc
.k8s_conn
import K8sConnector
33 # from .vnf import N2VC
39 class K8sJujuConnector(K8sConnector
):
45 kubectl_command
: str = '/usr/bin/kubectl',
46 juju_command
: str = '/usr/bin/juju',
52 :param kubectl_command: path to kubectl executable
53 :param helm_command: path to helm executable
54 :param fs: file system for kubernetes and helm configuration
59 K8sConnector
.__init
__(
63 on_update_db
=on_update_db
,
67 self
.info('Initializing K8S Juju connector')
69 self
.authenticated
= False
71 self
.log
= logging
.getLogger(__name__
)
73 self
.juju_command
= juju_command
76 self
.info('K8S Juju connector initialized')
82 namespace
: str = 'kube-system',
83 reuse_cluster_uuid
: str = None,
85 """Initialize a Kubernetes environment
87 :param k8s_creds dict: A dictionary containing the Kubernetes cluster
89 :param namespace str: The Kubernetes namespace to initialize
91 :return: UUID of the k8s context or raises an exception
96 Bootstrapping cannot be done, by design, through the API. We need to
103 1. Has the environment already been bootstrapped?
104 - Check the database to see if we have a record for this env
106 2. If this is a new env, create it
107 - Add the k8s cloud to Juju
109 - Record it in the database
111 3. Connect to the Juju controller for this cloud
114 # cluster_uuid = reuse_cluster_uuid
115 # if not cluster_uuid:
116 # cluster_uuid = str(uuid4())
118 ##################################################
119 # TODO: Pull info from db based on the namespace #
120 ##################################################
122 if not reuse_cluster_uuid
:
123 # This is a new cluster, so bootstrap it
125 cluster_uuid
= str(uuid
.uuid4())
127 # Is a local k8s cluster?
128 localk8s
= self
.is_local_k8s(k8s_creds
)
130 # If the k8s is external, the juju controller needs a loadbalancer
131 loadbalancer
= False if localk8s
else True
133 # Name the new k8s cloud
134 k8s_cloud
= "{}-k8s".format(namespace
)
136 print("Adding k8s cloud {}".format(k8s_cloud
))
137 await self
.add_k8s(k8s_cloud
, k8s_creds
)
139 # Bootstrap Juju controller
140 print("Bootstrapping...")
141 await self
.bootstrap(k8s_cloud
, cluster_uuid
, loadbalancer
)
142 print("Bootstrap done.")
144 # Get the controller information
146 # Parse ~/.local/share/juju/controllers.yaml
147 # controllers.testing.api-endpoints|ca-cert|uuid
148 print("Getting controller endpoints")
149 with
open(os
.path
.expanduser(
150 "~/.local/share/juju/controllers.yaml"
152 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
153 controller
= controllers
['controllers'][cluster_uuid
]
154 endpoints
= controller
['api-endpoints']
155 self
.juju_endpoint
= endpoints
[0]
156 self
.juju_ca_cert
= controller
['ca-cert']
158 # Parse ~/.local/share/juju/accounts
159 # controllers.testing.user|password
160 print("Getting accounts")
161 with
open(os
.path
.expanduser(
162 "~/.local/share/juju/accounts.yaml"
164 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
165 controller
= controllers
['controllers'][cluster_uuid
]
167 self
.juju_user
= controller
['user']
168 self
.juju_secret
= controller
['password']
170 print("user: {}".format(self
.juju_user
))
171 print("secret: {}".format(self
.juju_secret
))
172 print("endpoint: {}".format(self
.juju_endpoint
))
173 print("ca-cert: {}".format(self
.juju_ca_cert
))
175 # raise Exception("EOL")
177 self
.juju_public_key
= None
180 'endpoint': self
.juju_endpoint
,
181 'username': self
.juju_user
,
182 'secret': self
.juju_secret
,
183 'cacert': self
.juju_ca_cert
,
184 'namespace': namespace
,
185 'loadbalancer': loadbalancer
,
188 # Store the cluster configuration so it
189 # can be used for subsequent calls
190 print("Setting config")
191 await self
.set_config(cluster_uuid
, config
)
194 # This is an existing cluster, so get its config
195 cluster_uuid
= reuse_cluster_uuid
197 config
= self
.get_config(cluster_uuid
)
199 self
.juju_endpoint
= config
['endpoint']
200 self
.juju_user
= config
['username']
201 self
.juju_secret
= config
['secret']
202 self
.juju_ca_cert
= config
['cacert']
203 self
.juju_public_key
= None
205 # Login to the k8s cluster
206 if not self
.authenticated
:
207 await self
.login(cluster_uuid
)
209 # We're creating a new cluster
210 print("Getting model {}".format(self
.get_namespace(cluster_uuid
), cluster_uuid
=cluster_uuid
))
211 model
= await self
.get_model(
212 self
.get_namespace(cluster_uuid
),
213 cluster_uuid
=cluster_uuid
216 # Disconnect from the model
217 if model
and model
.is_connected():
218 await model
.disconnect()
220 return cluster_uuid
, True
222 """Repo Management"""
229 raise NotImplemented()
231 async def repo_list(self
):
232 raise NotImplemented()
234 async def repo_remove(
238 raise NotImplemented()
245 uninstall_sw
: bool = False
249 Resets the Kubernetes cluster by removing the model that represents it.
251 :param cluster_uuid str: The UUID of the cluster to reset
252 :return: Returns True if successful or raises an exception.
256 if not self
.authenticated
:
257 await self
.login(cluster_uuid
)
259 if self
.controller
.is_connected():
261 namespace
= self
.get_namespace(cluster_uuid
)
262 if await self
.has_model(namespace
):
263 print("[reset] Destroying model")
264 await self
.controller
.destroy_model(
269 # Disconnect from the controller
270 print("[reset] Disconnecting controller")
271 await self
.controller
.disconnect()
273 # Destroy the controller (via CLI)
274 print("[reset] Destroying controller")
275 await self
.destroy_controller(cluster_uuid
)
277 print("[reset] Removing k8s cloud")
278 namespace
= self
.get_namespace(cluster_uuid
)
279 k8s_cloud
= "{}-k8s".format(namespace
)
280 await self
.remove_cloud(k8s_cloud
)
282 except Exception as ex
:
283 print("Caught exception during reset: {}".format(ex
))
292 timeout
: float = 300,
298 :param cluster_uuid str: The UUID of the cluster to install to
299 :param kdu_model str: The name or path of a bundle to install
300 :param atomic bool: If set, waits until the model is active and resets
301 the cluster on failure.
302 :param timeout int: The time, in seconds, to wait for the install
304 :param params dict: Key-value pairs of instantiation parameters
306 :return: If successful, returns ?
309 if not self
.authenticated
:
310 print("[install] Logging in to the controller")
311 await self
.login(cluster_uuid
)
314 # Get or create the model, based on the NS
316 model_name
= db_dict
["filter"]["_id"]
318 self
.log
.debug("Checking for model named {}".format(model_name
))
319 model
= await self
.get_model(model_name
, cluster_uuid
=cluster_uuid
)
321 # Create the new model
322 self
.log
.debug("Adding model: {}".format(model_name
))
323 model
= await self
.add_model(model_name
, cluster_uuid
=cluster_uuid
)
326 # TODO: Instantiation parameters
329 "Juju bundle that models the KDU, in any of the following ways:
330 - <juju-repo>/<juju-bundle>
331 - <juju-bundle folder under k8s_models folder in the package>
332 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder in the package>
333 - <URL_where_to_fetch_juju_bundle>
337 if kdu_model
.startswith("cs:"):
339 elif kdu_model
.startswith("http"):
345 # if kdu_model.endswith(".tar.gz") or kdu_model.endswith(".tgz")
346 # Uncompress temporarily
347 # bundle = <uncompressed file>
351 # Raise named exception that the bundle could not be found
354 print("[install] deploying {}".format(bundle
))
355 await model
.deploy(bundle
)
357 # Get the application
359 # applications = model.applications
360 print("[install] Applications: {}".format(model
.applications
))
361 for name
in model
.applications
:
362 print("[install] Waiting for {} to settle".format(name
))
363 application
= model
.applications
[name
]
365 # It's not enough to wait for all units to be active;
366 # the application status needs to be active as well.
367 print("Waiting for all units to be active...")
368 await model
.block_until(
370 unit
.agent_status
== 'idle'
371 and application
.status
in ['active', 'unknown']
372 and unit
.workload_status
in [
374 ] for unit
in application
.units
378 print("All units active.")
380 except concurrent
.futures
._base
.TimeoutError
:
381 print("[install] Timeout exceeded; resetting cluster")
382 await self
.reset(cluster_uuid
)
385 # Wait for the application to be active
386 if model
.is_connected():
387 print("[install] Disconnecting model")
388 await model
.disconnect()
391 raise Exception("Unable to install")
393 async def instances_list(
398 returns a list of deployed releases in a cluster
400 :param cluster_uuid: the cluster
409 kdu_model
: str = None,
414 :param cluster_uuid str: The UUID of the cluster to upgrade
415 :param kdu_instance str: The unique name of the KDU instance
416 :param kdu_model str: The name or path of the bundle to upgrade to
417 :param params dict: Key-value pairs of instantiation parameters
419 :return: If successful, reference to the new revision number of the
423 # TODO: Loop through the bundle and upgrade each charm individually
426 The API doesn't have a concept of bundle upgrades, because there are
427 many possible changes: charm revision, disk, number of units, etc.
429 As such, we are only supporting a limited subset of upgrades. We'll
430 upgrade the charm revision but leave storage and scale untouched.
432 Scale changes should happen through OSM constructs, and changes to
433 storage would require a redeployment of the service, at least in this
436 namespace
= self
.get_namespace(cluster_uuid
)
437 model
= await self
.get_model(namespace
, cluster_uuid
=cluster_uuid
)
439 with
open(kdu_model
, 'r') as f
:
440 bundle
= yaml
.safe_load(f
)
444 'description': 'Test bundle',
445 'bundle': 'kubernetes',
448 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
451 'password': 'manopw',
452 'root_password': 'osm4u',
455 'series': 'kubernetes'
460 # TODO: This should be returned in an agreed-upon format
461 for name
in bundle
['applications']:
462 print(model
.applications
)
463 application
= model
.applications
[name
]
466 path
= bundle
['applications'][name
]['charm']
469 await application
.upgrade_charm(switch
=path
)
470 except juju
.errors
.JujuError
as ex
:
471 if 'already running charm' in str(ex
):
472 # We're already running this version
475 await model
.disconnect()
478 raise NotImplemented()
489 :param cluster_uuid str: The UUID of the cluster to rollback
490 :param kdu_instance str: The unique name of the KDU instance
491 :param revision int: The revision to revert to. If omitted, rolls back
492 the previous upgrade.
494 :return: If successful, returns the revision of active KDU instance,
495 or raises an exception
497 raise NotImplemented()
505 """Uninstall a KDU instance
507 :param cluster_uuid str: The UUID of the cluster to uninstall
508 :param kdu_instance str: The unique name of the KDU instance
510 :return: Returns True if successful, or raises an exception
514 # Remove an application from the model
515 model
= await self
.get_model(self
.get_namespace(cluster_uuid
), cluster_uuid
=cluster_uuid
)
518 # Get the application
519 if kdu_instance
not in model
.applications
:
520 # TODO: Raise a named exception
521 raise Exception("Application not found.")
523 application
= model
.applications
[kdu_instance
]
525 # Destroy the application
526 await application
.destroy()
528 # TODO: Verify removal
534 async def inspect_kdu(
540 Inspects a bundle and returns a dictionary of config parameters and
541 their default values.
543 :param kdu_model str: The name or path of the bundle to inspect.
545 :return: If successful, returns a dictionary of available parameters
546 and their default values.
550 with
open(kdu_model
, 'r') as f
:
551 bundle
= yaml
.safe_load(f
)
555 'description': 'Test bundle',
556 'bundle': 'kubernetes',
559 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
562 'password': 'manopw',
563 'root_password': 'osm4u',
566 'series': 'kubernetes'
571 # TODO: This should be returned in an agreed-upon format
572 kdu
= bundle
['applications']
582 If available, returns the README of the bundle.
584 :param kdu_model str: The name or path of a bundle
586 :return: If found, returns the contents of the README.
590 files
= ['README', 'README.txt', 'README.md']
591 path
= os
.path
.dirname(kdu_model
)
592 for file in os
.listdir(path
):
594 with
open(file, 'r') as f
:
600 async def status_kdu(
605 """Get the status of the KDU
607 Get the current status of the KDU instance.
609 :param cluster_uuid str: The UUID of the cluster
610 :param kdu_instance str: The unique id of the KDU instance
612 :return: Returns a dictionary containing namespace, state, resources,
617 model
= await self
.get_model(self
.get_namespace(cluster_uuid
), cluster_uuid
=cluster_uuid
)
619 # model = await self.get_model_by_uuid(cluster_uuid)
621 model_status
= await model
.get_status()
622 status
= model_status
.applications
624 for name
in model_status
.applications
:
625 application
= model_status
.applications
[name
]
627 'status': application
['status']['status']
630 if model
.is_connected():
631 await model
.disconnect()
641 """Add a k8s cloud to Juju
643 Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
646 :param cloud_name str: The name of the cloud to add.
647 :param credentials dict: A dictionary representing the output of
648 `kubectl config view --raw`.
650 :returns: True if successful, otherwise raises an exception.
653 cmd
= [self
.juju_command
, "add-k8s", "--local", cloud_name
]
656 process
= await asyncio
.create_subprocess_exec(
658 stdout
=asyncio
.subprocess
.PIPE
,
659 stderr
=asyncio
.subprocess
.PIPE
,
660 stdin
=asyncio
.subprocess
.PIPE
,
663 # Feed the process the credentials
664 process
.stdin
.write(credentials
.encode("utf-8"))
665 await process
.stdin
.drain()
666 process
.stdin
.close()
668 stdout
, stderr
= await process
.communicate()
670 return_code
= process
.returncode
672 print("add-k8s return code: {}".format(return_code
))
675 raise Exception(stderr
)
683 ) -> juju
.model
.Model
:
684 """Adds a model to the controller
686 Adds a new model to the Juju controller
688 :param model_name str: The name of the model to add.
689 :returns: The juju.model.Model object of the new model upon success or
692 if not self
.authenticated
:
693 await self
.login(cluster_uuid
)
695 self
.log
.debug("Adding model '{}' to cluster_uuid '{}'".format(model_name
, cluster_uuid
))
696 model
= await self
.controller
.add_model(
698 config
={'authorized-keys': self
.juju_public_key
}
708 """Bootstrap a Kubernetes controller
710 Bootstrap a Juju controller inside the Kubernetes cluster
712 :param cloud_name str: The name of the cloud.
713 :param cluster_uuid str: The UUID of the cluster to bootstrap.
714 :param loadbalancer bool: If the controller should use loadbalancer or not.
715 :returns: True upon success or raises an exception.
719 cmd
= [self
.juju_command
, "bootstrap", cloud_name
, cluster_uuid
]
722 For public clusters, specify that the controller service is using a LoadBalancer.
724 cmd
= [self
.juju_command
, "bootstrap", cloud_name
, cluster_uuid
, "--config", "controller-service-type=loadbalancer"]
726 print("Bootstrapping controller {} in cloud {}".format(
727 cluster_uuid
, cloud_name
730 process
= await asyncio
.create_subprocess_exec(
732 stdout
=asyncio
.subprocess
.PIPE
,
733 stderr
=asyncio
.subprocess
.PIPE
,
736 stdout
, stderr
= await process
.communicate()
738 return_code
= process
.returncode
742 if b
'already exists' not in stderr
:
743 raise Exception(stderr
)
747 async def destroy_controller(
751 """Destroy a Kubernetes controller
753 Destroy an existing Kubernetes controller.
755 :param cluster_uuid str: The UUID of the cluster to bootstrap.
756 :returns: True upon success or raises an exception.
760 "destroy-controller",
761 "--destroy-all-models",
767 process
= await asyncio
.create_subprocess_exec(
769 stdout
=asyncio
.subprocess
.PIPE
,
770 stderr
=asyncio
.subprocess
.PIPE
,
773 stdout
, stderr
= await process
.communicate()
775 return_code
= process
.returncode
779 if 'already exists' not in stderr
:
780 raise Exception(stderr
)
786 """Get the cluster configuration
788 Gets the configuration of the cluster
790 :param cluster_uuid str: The UUID of the cluster.
791 :return: A dict upon success, or raises an exception.
793 cluster_config
= "{}/{}.yaml".format(self
.fs
.path
, cluster_uuid
)
794 if os
.path
.exists(cluster_config
):
795 with
open(cluster_config
, 'r') as f
:
796 config
= yaml
.safe_load(f
.read())
800 "Unable to locate configuration for cluster {}".format(
809 ) -> juju
.model
.Model
:
810 """Get a model from the Juju Controller.
812 Note: Model objects returned must call disconnected() before it goes
815 :param model_name str: The name of the model to get
816 :return The juju.model.Model object if found, or None.
818 if not self
.authenticated
:
819 await self
.login(cluster_uuid
)
822 models
= await self
.controller
.list_models()
823 self
.log
.debug(models
)
824 if model_name
in models
:
825 self
.log
.debug("Found model: {}".format(model_name
))
826 model
= await self
.controller
.get_model(
835 """Get the namespace UUID
836 Gets the namespace's unique name
838 :param cluster_uuid str: The UUID of the cluster
839 :returns: The namespace UUID, or raises an exception
841 config
= self
.get_config(cluster_uuid
)
843 # Make sure the name is in the config
844 if 'namespace' not in config
:
845 raise Exception("Namespace not found.")
847 # TODO: We want to make sure this is unique to the cluster, in case
848 # the cluster is being reused.
849 # Consider pre/appending the cluster id to the namespace string
850 return config
['namespace']
856 """Check if a model exists in the controller
858 Checks to see if a model exists in the connected Juju controller.
860 :param model_name str: The name of the model
861 :return: A boolean indicating if the model exists
863 models
= await self
.controller
.list_models()
865 if model_name
in models
:
873 """Check if a cluster is local
875 Checks if a cluster is running in the local host
877 :param credentials dict: A dictionary containing the k8s credentials
878 :returns: A boolean if the cluster is running locally
880 creds
= yaml
.safe_load(credentials
)
881 if os
.getenv("OSMLCM_VCA_APIPROXY"):
882 host_ip
= os
.getenv("OSMLCM_VCA_APIPROXY")
884 if creds
and host_ip
:
885 for cluster
in creds
['clusters']:
886 if 'server' in cluster
['cluster']:
887 if host_ip
in cluster
['cluster']['server']:
892 async def login(self
, cluster_uuid
):
893 """Login to the Juju controller."""
895 if self
.authenticated
:
898 self
.connecting
= True
900 # Test: Make sure we have the credentials loaded
901 config
= self
.get_config(cluster_uuid
)
903 self
.juju_endpoint
= config
['endpoint']
904 self
.juju_user
= config
['username']
905 self
.juju_secret
= config
['secret']
906 self
.juju_ca_cert
= config
['cacert']
907 self
.juju_public_key
= None
909 self
.controller
= Controller()
913 "Connecting to controller... ws://{} as {}/{}".format(
920 await self
.controller
.connect(
921 endpoint
=self
.juju_endpoint
,
922 username
=self
.juju_user
,
923 password
=self
.juju_secret
,
924 cacert
=self
.juju_ca_cert
,
926 self
.authenticated
= True
927 self
.log
.debug("JujuApi: Logged into controller")
928 except Exception as ex
:
930 self
.log
.debug("Caught exception: {}".format(ex
))
933 self
.log
.fatal("VCA credentials not configured.")
934 self
.authenticated
= False
936 async def logout(self
):
937 """Logout of the Juju controller."""
939 if not self
.authenticated
:
942 for model
in self
.models
:
943 print("Logging out of model {}".format(model
))
944 await self
.models
[model
].disconnect()
947 self
.log
.debug("Disconnecting controller {}".format(
950 await self
.controller
.disconnect()
951 self
.controller
= None
953 self
.authenticated
= False
955 async def remove_cloud(
959 """Remove a k8s cloud from Juju
961 Removes a Kubernetes cloud from Juju.
963 :param cloud_name str: The name of the cloud to add.
965 :returns: True if successful, otherwise raises an exception.
968 # Remove the bootstrapped controller
969 cmd
= [self
.juju_command
, "remove-k8s", "--client", cloud_name
]
970 process
= await asyncio
.create_subprocess_exec(
972 stdout
=asyncio
.subprocess
.PIPE
,
973 stderr
=asyncio
.subprocess
.PIPE
,
976 stdout
, stderr
= await process
.communicate()
978 return_code
= process
.returncode
981 raise Exception(stderr
)
983 # Remove the cloud from the local config
984 cmd
= [self
.juju_command
, "remove-cloud", "--client", cloud_name
]
985 process
= await asyncio
.create_subprocess_exec(
987 stdout
=asyncio
.subprocess
.PIPE
,
988 stderr
=asyncio
.subprocess
.PIPE
,
991 stdout
, stderr
= await process
.communicate()
993 return_code
= process
.returncode
996 raise Exception(stderr
)
1000 async def set_config(
1005 """Save the cluster configuration
1007 Saves the cluster information to the file store
1009 :param cluster_uuid str: The UUID of the cluster
1010 :param config dict: A dictionary containing the cluster configuration
1011 :returns: Boolean upon success or raises an exception.
1014 cluster_config
= "{}/{}.yaml".format(self
.fs
.path
, cluster_uuid
)
1015 if not os
.path
.exists(cluster_config
):
1016 print("Writing config to {}".format(cluster_config
))
1017 with
open(cluster_config
, 'w') as f
:
1018 f
.write(yaml
.dump(config
, Dumper
=yaml
.Dumper
))