1 # A simple test to exercise the libraries' functionality
9 from n2vc
.vnf
import N2VC
14 - id: multicharmvdu-ns
15 name: multicharmvdu-ns
16 short-name: multicharmvdu-ns
17 description: NS with 1 VNF
21 - vnfd-id-ref: multicharmvdu-vnf
28 vnfd-connection-point-ref:
29 - vnfd-id-ref: multicharmvdu-vnf
30 member-vnf-index-ref: '1'
31 vnfd-connection-point-ref: vnf-data
37 - id: multicharmvdu-vnf
38 name: multicharmvdu-vnf
39 short-name: multicharmvdu-vnf
41 description: A VNF consisting of 1 VDUs w/charm
55 internal-connection-point:
56 - id-ref: dataVM-internal
72 internal-connection-point-ref: dataVM-internal
78 external-connection-point-ref: vnf-data
79 internal-connection-point:
82 short-name: dataVM-internal
87 initial-config-primitive:
101 value: '/home/ubuntu/first-touch-dataVM'
109 default-value: '/home/ubuntu/touched'
112 class PythonTest(unittest
.TestCase
):
117 self
.log
= logging
.getLogger()
118 self
.log
.level
= logging
.DEBUG
120 self
.loop
= asyncio
.get_event_loop()
122 # self.loop = asyncio.new_event_loop()
123 # asyncio.set_event_loop(None)
125 # Extract parameters from the environment in order to run our test
126 vca_host
= os
.getenv('VCA_HOST', '127.0.0.1')
127 vca_port
= os
.getenv('VCA_PORT', 17070)
128 vca_user
= os
.getenv('VCA_USER', 'admin')
129 vca_charms
= os
.getenv('VCA_CHARMS', None)
130 vca_secret
= os
.getenv('VCA_SECRET', None)
137 artifacts
=vca_charms
,
141 self
.loop
.run_until_complete(self
.n2vc
.logout())
143 def get_descriptor(self
, descriptor
):
146 tmp
= yaml
.load(descriptor
)
148 # Remove the envelope
149 root
= list(tmp
.keys())[0]
150 if root
== "nsd:nsd-catalog":
151 desc
= tmp
['nsd:nsd-catalog']['nsd'][0]
152 elif root
== "vnfd:vnfd-catalog":
153 desc
= tmp
['vnfd:vnfd-catalog']['vnfd'][0]
158 def n2vc_callback(self
, model_name
, application_name
, workload_status
, task
=None):
159 """We pass the vnfd when setting up the callback, so expect it to be
160 returned as a tuple."""
161 if workload_status
and not task
:
162 self
.log
.debug("Callback: workload status \"{}\"".format(workload_status
))
164 if workload_status
in ["blocked"]:
165 task
= asyncio
.ensure_future(
166 self
.n2vc
.ExecutePrimitive(
172 'ssh-hostname': '10.195.8.78',
173 'ssh-username': 'ubuntu',
174 'ssh-password': 'ubuntu'
178 task
.add_done_callback(functools
.partial(self
.n2vc_callback
, None, None, None))
180 elif workload_status
in ["active"]:
181 self
.log
.debug("Removing charm")
182 task
= asyncio
.ensure_future(
183 self
.n2vc
.RemoveCharms(model_name
, application_name
, self
.n2vc_callback
)
185 task
.add_done_callback(functools
.partial(self
.n2vc_callback
, None, None, None))
187 def test_deploy_application(self
):
188 stream_handler
= logging
.StreamHandler(sys
.stdout
)
189 self
.log
.addHandler(stream_handler
)
191 self
.log
.info("Log handler installed")
192 nsd
= self
.get_descriptor(NSD_YAML
)
193 vnfd
= self
.get_descriptor(VNFD_YAML
)
197 vca_charms
= os
.getenv('VCA_CHARMS', None)
203 """An inner function to do the deployment of a charm from
206 charm_dir
= "{}/{}".format(vca_charms
, charm
)
208 # Setting this to an IP that will fail the initial config.
209 # This will be detected in the callback, which will execute
210 # the "config" primitive with the right IP address.
211 params
['rw_mgmt_ip'] = '10.195.8.78'
213 # self.loop.run_until_complete(n.CreateNetworkService(nsd))
216 vnf_name
= self
.n2vc
.FormatApplicationName(
222 self
.loop
.run_until_complete(
223 self
.n2vc
.DeployCharms(
234 # Check if the VDUs in this VNF have a charm
235 for vdu
in vnfd
['vdu']:
236 vdu_config
= vdu
.get('vdu-configuration')
238 juju
= vdu_config
['juju']
239 self
.assertIsNotNone(juju
)
241 charm
= juju
['charm']
242 self
.assertIsNotNone(charm
)
244 params
['initial-config-primitive'] = vdu_config
['initial-config-primitive']
249 # Check if this VNF has a charm
250 vnf_config
= vnfd
.get("vnf-configuration")
252 juju
= vnf_config
['juju']
253 self
.assertIsNotNone(juju
)
255 charm
= juju
['charm']
256 self
.assertIsNotNone(charm
)
258 params
['initial-config-primitive'] = vnf_config
['initial-config-primitive']
263 self
.loop
.run_forever()
265 # self.loop.run_until_complete(n.GetMetrics(vnfd, nsd=nsd))
268 # ExecutePrimitive(self, nsd, vnfd, vnf_member_index, primitive, callback, *callback_args, **params):
270 # self.loop.run_until_complete(n.DestroyNetworkService(nsd))
272 # self.loop.run_until_complete(self.n2vc.logout())
274 self
.log
.removeHandler(stream_handler
)