Merge "Bug 502 improvements"
[osm/N2VC.git] / tests / test_primitive_no_parameter.py
1 # A simple test to exercise the libraries' functionality
2 import asyncio
3 import functools
4 import os
5 import sys
6 import logging
7 import unittest
8 import yaml
9 from n2vc.vnf import N2VC
10
11 NSD_YAML = """
12 nsd:nsd-catalog:
13 nsd:
14 - id: multicharmvdu-ns
15 name: multicharmvdu-ns
16 short-name: multicharmvdu-ns
17 description: NS with 1 VNF
18 version: '1.0'
19 logo: osm.png
20 constituent-vnfd:
21 - vnfd-id-ref: multicharmvdu-vnf
22 member-vnf-index: '1'
23 vld:
24 - id: datanet
25 name: datanet
26 short-name: datanet
27 type: ELAN
28 vnfd-connection-point-ref:
29 - vnfd-id-ref: multicharmvdu-vnf
30 member-vnf-index-ref: '1'
31 vnfd-connection-point-ref: vnf-data
32 """
33
34 VNFD_YAML = """
35 vnfd:vnfd-catalog:
36 vnfd:
37 - id: multicharmvdu-vnf
38 name: multicharmvdu-vnf
39 short-name: multicharmvdu-vnf
40 version: '1.0'
41 description: A VNF consisting of 1 VDUs w/charm
42 logo: osm.png
43 connection-point:
44 - id: vnf-data
45 name: vnf-data
46 short-name: vnf-data
47 type: VPORT
48 mgmt-interface:
49 cp: vnf-data
50 internal-vld:
51 - id: internal
52 name: internal
53 short-name: internal
54 type: ELAN
55 internal-connection-point:
56 - id-ref: dataVM-internal
57 vdu:
58 - id: dataVM
59 name: dataVM
60 image: xenial
61 count: '1'
62 vm-flavor:
63 vcpu-count: '1'
64 memory-mb: '1024'
65 storage-gb: '10'
66 interface:
67 - name: dataVM-eth0
68 position: '1'
69 type: INTERNAL
70 virtual-interface:
71 type: VIRTIO
72 internal-connection-point-ref: dataVM-internal
73 - name: dataVM-xe0
74 position: '2'
75 type: EXTERNAL
76 virtual-interface:
77 type: VIRTIO
78 external-connection-point-ref: vnf-data
79 internal-connection-point:
80 - id: dataVM-internal
81 name: dataVM-internal
82 short-name: dataVM-internal
83 type: VPORT
84 vdu-configuration:
85 juju:
86 charm: simple
87 initial-config-primitive:
88 - seq: '1'
89 name: config
90 parameter:
91 - name: ssh-hostname
92 value: <rw_mgmt_ip>
93 - name: ssh-username
94 value: ubuntu
95 - name: ssh-password
96 value: ubuntu
97 - seq: '2'
98 name: touch
99 parameter:
100 - name: filename
101 value: '/home/ubuntu/first-touch-dataVM'
102 - seq: '3'
103 name: start
104 config-primitive:
105 - name: touch
106 parameter:
107 - name: filename
108 data-type: STRING
109 default-value: '/home/ubuntu/touched'
110 """
111
112 class PythonTest(unittest.TestCase):
113 n2vc = None
114
115 def setUp(self):
116
117 self.log = logging.getLogger()
118 self.log.level = logging.DEBUG
119
120 self.loop = asyncio.get_event_loop()
121
122 # self.loop = asyncio.new_event_loop()
123 # asyncio.set_event_loop(None)
124
125 # Extract parameters from the environment in order to run our test
126 vca_host = os.getenv('VCA_HOST', '127.0.0.1')
127 vca_port = os.getenv('VCA_PORT', 17070)
128 vca_user = os.getenv('VCA_USER', 'admin')
129 vca_charms = os.getenv('VCA_CHARMS', None)
130 vca_secret = os.getenv('VCA_SECRET', None)
131 self.n2vc = N2VC(
132 log=self.log,
133 server=vca_host,
134 port=vca_port,
135 user=vca_user,
136 secret=vca_secret,
137 artifacts=vca_charms,
138 )
139
140 def tearDown(self):
141 self.loop.run_until_complete(self.n2vc.logout())
142
143 def get_descriptor(self, descriptor):
144 desc = None
145 try:
146 tmp = yaml.load(descriptor)
147
148 # Remove the envelope
149 root = list(tmp.keys())[0]
150 if root == "nsd:nsd-catalog":
151 desc = tmp['nsd:nsd-catalog']['nsd'][0]
152 elif root == "vnfd:vnfd-catalog":
153 desc = tmp['vnfd:vnfd-catalog']['vnfd'][0]
154 except ValueError:
155 assert False
156 return desc
157
158 def n2vc_callback(self, model_name, application_name, workload_status, task=None):
159 """We pass the vnfd when setting up the callback, so expect it to be
160 returned as a tuple."""
161 if workload_status and not task:
162 self.log.debug("Callback: workload status \"{}\"".format(workload_status))
163
164 if workload_status in ["blocked"]:
165 task = asyncio.ensure_future(
166 self.n2vc.ExecutePrimitive(
167 model_name,
168 application_name,
169 "config",
170 None,
171 params={
172 'ssh-hostname': '10.195.8.78',
173 'ssh-username': 'ubuntu',
174 'ssh-password': 'ubuntu'
175 }
176 )
177 )
178 task.add_done_callback(functools.partial(self.n2vc_callback, None, None, None))
179 pass
180 elif workload_status in ["active"]:
181 self.log.debug("Removing charm")
182 task = asyncio.ensure_future(
183 self.n2vc.RemoveCharms(model_name, application_name, self.n2vc_callback)
184 )
185 task.add_done_callback(functools.partial(self.n2vc_callback, None, None, None))
186
187 def test_deploy_application(self):
188 stream_handler = logging.StreamHandler(sys.stdout)
189 self.log.addHandler(stream_handler)
190 try:
191 self.log.info("Log handler installed")
192 nsd = self.get_descriptor(NSD_YAML)
193 vnfd = self.get_descriptor(VNFD_YAML)
194
195 if nsd and vnfd:
196
197 vca_charms = os.getenv('VCA_CHARMS', None)
198
199 params = {}
200 vnf_index = 0
201
202 def deploy():
203 """An inner function to do the deployment of a charm from
204 either a vdu or vnf.
205 """
206 charm_dir = "{}/{}".format(vca_charms, charm)
207
208 # Setting this to an IP that will fail the initial config.
209 # This will be detected in the callback, which will execute
210 # the "config" primitive with the right IP address.
211 params['rw_mgmt_ip'] = '10.195.8.78'
212
213 # self.loop.run_until_complete(n.CreateNetworkService(nsd))
214 ns_name = "default"
215
216 vnf_name = self.n2vc.FormatApplicationName(
217 ns_name,
218 vnfd['name'],
219 str(vnf_index),
220 )
221
222 self.loop.run_until_complete(
223 self.n2vc.DeployCharms(
224 ns_name,
225 vnf_name,
226 vnfd,
227 charm_dir,
228 params,
229 {},
230 self.n2vc_callback
231 )
232 )
233
234 # Check if the VDUs in this VNF have a charm
235 for vdu in vnfd['vdu']:
236 vdu_config = vdu.get('vdu-configuration')
237 if vdu_config:
238 juju = vdu_config['juju']
239 self.assertIsNotNone(juju)
240
241 charm = juju['charm']
242 self.assertIsNotNone(charm)
243
244 params['initial-config-primitive'] = vdu_config['initial-config-primitive']
245
246 deploy()
247 vnf_index += 1
248
249 # Check if this VNF has a charm
250 vnf_config = vnfd.get("vnf-configuration")
251 if vnf_config:
252 juju = vnf_config['juju']
253 self.assertIsNotNone(juju)
254
255 charm = juju['charm']
256 self.assertIsNotNone(charm)
257
258 params['initial-config-primitive'] = vnf_config['initial-config-primitive']
259
260 deploy()
261 vnf_index += 1
262
263 self.loop.run_forever()
264
265 # self.loop.run_until_complete(n.GetMetrics(vnfd, nsd=nsd))
266
267 # Test actions
268 # ExecutePrimitive(self, nsd, vnfd, vnf_member_index, primitive, callback, *callback_args, **params):
269
270 # self.loop.run_until_complete(n.DestroyNetworkService(nsd))
271
272 # self.loop.run_until_complete(self.n2vc.logout())
273 finally:
274 self.log.removeHandler(stream_handler)