minor fix in yaml load with Loader
[osm/N2VC.git] / tests / test_k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6
7 # http://www.apache.org/licenses/LICENSE-2.0
8
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import argparse
16 import asyncio
17 import logging
18 import n2vc.k8s_juju_conn
19 from base import get_juju_public_key
20 import os
21 from osm_common.fslocal import FsLocal
22 import subprocess
23 import yaml
24
25
26 def get_args():
27 parser = argparse.ArgumentParser()
28 parser.add_argument("--cluster_uuid", help='The UUID of an existing cluster to use', default=None)
29 parser.add_argument("--reset", action="store_true")
30 return parser.parse_args()
31
32 async def main():
33
34 args = get_args()
35
36 reuse_cluster_uuid = args.cluster_uuid
37
38 log = logging.getLogger()
39 log.level = logging.DEBUG
40
41 # Extract parameters from the environment in order to run our tests
42 vca_host = os.getenv('VCA_HOST', '127.0.0.1')
43 vca_port = os.getenv('VCA_PORT', 17070)
44 vca_user = os.getenv('VCA_USER', 'admin')
45 vca_charms = os.getenv('VCA_CHARMS', None)
46 vca_secret = os.getenv('VCA_SECRET', None)
47 vca_ca_cert = os.getenv('VCA_CACERT', None)
48
49 # Get the Juju Public key
50 juju_public_key = get_juju_public_key()
51 if juju_public_key:
52 with open(juju_public_key, 'r') as f:
53 juju_public_key = f.read()
54 else:
55 raise Exception("No Juju Public Key found")
56
57 storage = {
58 'driver': 'local',
59 'path': '/srv/app/storage'
60 }
61 fs = FsLocal()
62 fs.fs_connect(storage)
63
64 client = n2vc.k8s_juju_conn.K8sJujuConnector(
65 kubectl_command = '/bin/true',
66 fs = fs,
67 )
68
69 # kubectl config view --raw
70 # microk8s.config
71
72 # if microk8s then
73 kubecfg = subprocess.getoutput('microk8s.config')
74 # else
75 # kubecfg.subprocess.getoutput('kubectl config view --raw')
76
77 k8screds = yaml.load(kubecfg, Loader=yaml.FullLoader)
78 namespace = 'testing'
79 kdu_model = "./tests/bundles/k8s-zookeeper.yaml"
80
81 """init_env"""
82 cluster_uuid = await client.init_env(k8screds, namespace, reuse_cluster_uuid=reuse_cluster_uuid)
83 print(cluster_uuid)
84
85 if not reuse_cluster_uuid:
86 # This is a new cluster, so install to it
87
88 """install"""
89 # async def install(self, cluster_uuid, kdu_model, atomic=True, timeout=None, params=None):
90 # TODO: Re-add storage declaration to bundle. The API doesn't support the storage option yet. David is investigating.
91
92 # Deploy the bundle
93 kdu_instance = await client.install(cluster_uuid, kdu_model, atomic=True, timeout=600)
94
95 if kdu_instance:
96 # Inspect
97 print("Getting status")
98 status = await client.status_kdu(cluster_uuid, kdu_instance)
99 print(status)
100
101 # Inspect the bundle
102 config = await client.inspect_kdu(kdu_model)
103 print(config)
104
105 readme = await client.help_kdu(kdu_model)
106 # print(readme)
107
108
109 """upgrade
110 Upgrade to a newer version of the bundle
111 """
112 kdu_model_upgrade = "./tests/bundles/k8s-zookeeper-upgrade.yaml"
113 upgraded = await client.upgrade(cluster_uuid, namespace, kdu_model=kdu_model_upgrade)
114
115 kdu_model_upgrade = "./tests/bundles/k8s-zookeeper-downgrade.yaml"
116 upgraded = await client.upgrade(cluster_uuid, namespace, kdu_model=kdu_model_upgrade)
117
118 """uninstall"""
119
120 """reset"""
121 if args.reset:
122 await client.reset(cluster_uuid)
123
124 await client.logout()
125
126 print("Done")
127
128 if __name__ == "__main__":
129 asyncio.run(main())