1 # A simple test to exercise the libraries' functionality
9 from n2vc
.vnf
import N2VC
14 - id: multicharmvdu-ns
15 name: multicharmvdu-ns
16 short-name: multicharmvdu-ns
17 description: NS with 1 VNF
21 - vnfd-id-ref: multicharmvdu-vnf
28 vnfd-connection-point-ref:
29 - vnfd-id-ref: multicharmvdu-vnf
30 member-vnf-index-ref: '1'
31 vnfd-connection-point-ref: vnf-data
37 - id: multicharmvdu-vnf
38 name: multicharmvdu-vnf
39 short-name: multicharmvdu-vnf
41 description: A VNF consisting of 1 VDUs w/charm
55 internal-connection-point:
56 - id-ref: dataVM-internal
72 internal-connection-point-ref: dataVM-internal
78 external-connection-point-ref: vnf-data
79 internal-connection-point:
82 short-name: dataVM-internal
87 initial-config-primitive:
105 value: '/home/ubuntu/first-touch-dataVM'
117 default-value: '/home/ubuntu/touched'
120 class PythonTest(unittest
.TestCase
):
125 self
.log
= logging
.getLogger()
126 self
.log
.level
= logging
.DEBUG
128 self
.loop
= asyncio
.get_event_loop()
130 # self.loop = asyncio.new_event_loop()
131 # asyncio.set_event_loop(None)
133 # Extract parameters from the environment in order to run our test
134 vca_host
= os
.getenv('VCA_HOST', '127.0.0.1')
135 vca_port
= os
.getenv('VCA_PORT', 17070)
136 vca_user
= os
.getenv('VCA_USER', 'admin')
137 vca_charms
= os
.getenv('VCA_CHARMS', None)
138 vca_secret
= os
.getenv('VCA_SECRET', None)
145 artifacts
=vca_charms
,
149 self
.loop
.run_until_complete(self
.n2vc
.logout())
151 def get_descriptor(self
, descriptor
):
154 tmp
= yaml
.load(descriptor
)
156 # Remove the envelope
157 root
= list(tmp
.keys())[0]
158 if root
== "nsd:nsd-catalog":
159 desc
= tmp
['nsd:nsd-catalog']['nsd'][0]
160 elif root
== "vnfd:vnfd-catalog":
161 desc
= tmp
['vnfd:vnfd-catalog']['vnfd'][0]
166 def n2vc_callback(self
, model_name
, application_name
, workload_status
, task
=None):
167 """We pass the vnfd when setting up the callback, so expect it to be
168 returned as a tuple."""
169 if workload_status
and not task
:
170 self
.log
.debug("Callback: workload status \"{}\"".format(workload_status
))
172 if workload_status
in ["blocked"]:
173 task
= asyncio
.ensure_future(
174 self
.n2vc
.ExecutePrimitive(
180 'ssh-hostname': '10.195.8.78',
181 'ssh-username': 'ubuntu',
182 'ssh-password': 'ubuntu'
186 task
.add_done_callback(functools
.partial(self
.n2vc_callback
, None, None, None))
188 elif workload_status
in ["active"]:
189 self
.log
.debug("Removing charm")
190 task
= asyncio
.ensure_future(
191 self
.n2vc
.RemoveCharms(model_name
, application_name
, self
.n2vc_callback
)
193 task
.add_done_callback(functools
.partial(self
.n2vc_callback
, None, None, None))
195 def test_deploy_application(self
):
196 stream_handler
= logging
.StreamHandler(sys
.stdout
)
197 self
.log
.addHandler(stream_handler
)
199 self
.log
.info("Log handler installed")
200 nsd
= self
.get_descriptor(NSD_YAML
)
201 vnfd
= self
.get_descriptor(VNFD_YAML
)
205 vca_charms
= os
.getenv('VCA_CHARMS', None)
211 """An inner function to do the deployment of a charm from
214 charm_dir
= "{}/{}".format(vca_charms
, charm
)
216 # Setting this to an IP that will fail the initial config.
217 # This will be detected in the callback, which will execute
218 # the "config" primitive with the right IP address.
219 params
['rw_mgmt_ip'] = '10.195.8.78'
221 # self.loop.run_until_complete(n.CreateNetworkService(nsd))
224 vnf_name
= self
.n2vc
.FormatApplicationName(
230 self
.loop
.run_until_complete(
231 self
.n2vc
.DeployCharms(
242 # Check if the VDUs in this VNF have a charm
243 for vdu
in vnfd
['vdu']:
244 vdu_config
= vdu
.get('vdu-configuration')
246 juju
= vdu_config
['juju']
247 self
.assertIsNotNone(juju
)
249 charm
= juju
['charm']
250 self
.assertIsNotNone(charm
)
252 params
['initial-config-primitive'] = vdu_config
['initial-config-primitive']
257 # Check if this VNF has a charm
258 vnf_config
= vnfd
.get("vnf-configuration")
260 juju
= vnf_config
['juju']
261 self
.assertIsNotNone(juju
)
263 charm
= juju
['charm']
264 self
.assertIsNotNone(charm
)
266 params
['initial-config-primitive'] = vnf_config
['initial-config-primitive']
271 self
.loop
.run_forever()
273 # self.loop.run_until_complete(n.GetMetrics(vnfd, nsd=nsd))
276 # ExecutePrimitive(self, nsd, vnfd, vnf_member_index, primitive, callback, *callback_args, **params):
278 # self.loop.run_until_complete(n.DestroyNetworkService(nsd))
280 # self.loop.run_until_complete(self.n2vc.logout())
282 self
.log
.removeHandler(stream_handler
)