1 # A simple test to exercise the libraries' functionality
9 from n2vc
.vnf
import N2VC
14 - id: multicharmvdu-ns
15 name: multicharmvdu-ns
16 short-name: multicharmvdu-ns
17 description: NS with 1 VNF
21 - vnfd-id-ref: multicharmvdu-vnf
28 vnfd-connection-point-ref:
29 - vnfd-id-ref: multicharmvdu-vnf
30 member-vnf-index-ref: '1'
31 vnfd-connection-point-ref: vnf-data
37 - id: multicharmvdu-vnf
38 name: multicharmvdu-vnf
39 short-name: multicharmvdu-vnf
41 description: A VNF consisting of 1 VDUs w/charm
55 internal-connection-point:
56 - id-ref: dataVM-internal
72 internal-connection-point-ref: dataVM-internal
78 external-connection-point-ref: vnf-data
79 internal-connection-point:
82 short-name: dataVM-internal
87 initial-config-primitive:
101 value: '/home/ubuntu/first-touch-dataVM'
112 default-value: '/home/ubuntu/touched'
115 class PythonTest(unittest
.TestCase
):
120 self
.log
= logging
.getLogger()
121 self
.log
.level
= logging
.DEBUG
123 self
.loop
= asyncio
.get_event_loop()
125 # self.loop = asyncio.new_event_loop()
126 # asyncio.set_event_loop(None)
128 # Extract parameters from the environment in order to run our test
129 vca_host
= os
.getenv('VCA_HOST', '127.0.0.1')
130 vca_port
= os
.getenv('VCA_PORT', 17070)
131 vca_user
= os
.getenv('VCA_USER', 'admin')
132 vca_charms
= os
.getenv('VCA_CHARMS', None)
133 vca_secret
= os
.getenv('VCA_SECRET', None)
140 artifacts
=vca_charms
,
144 self
.loop
.run_until_complete(self
.n2vc
.logout())
def get_descriptor(self, descriptor):
    """Parse a YAML descriptor and strip its catalog envelope.

    Args:
        descriptor: YAML text whose single top-level key is either
            ``nsd:nsd-catalog`` or ``vnfd:vnfd-catalog``.

    Returns:
        The first entry of the ``nsd`` (respectively ``vnfd``) list found
        inside the catalog envelope.

    Raises:
        ValueError: If the document is empty, is not a mapping, or its
            top-level key is not one of the two recognised envelopes.
    """
    # safe_load only builds plain Python types, and unlike a bare
    # yaml.load() it does not need an explicit Loader argument (which
    # newer PyYAML releases make mandatory).
    tmp = yaml.safe_load(descriptor)
    if not isinstance(tmp, dict) or not tmp:
        raise ValueError("Descriptor is empty or is not a YAML mapping")

    # Remove the envelope: map each known catalog key to the name of the
    # list it wraps, then return that list's first element.
    envelopes = {
        "nsd:nsd-catalog": "nsd",
        "vnfd:vnfd-catalog": "vnfd",
    }
    root = next(iter(tmp))
    if root not in envelopes:
        # Previously an unknown root left the result unbound; fail with a
        # message that names the offending key instead.
        raise ValueError(
            "Unsupported descriptor envelope: {!r}".format(root)
        )
    return tmp[root][envelopes[root]][0]
161 def n2vc_callback(self
, model_name
, application_name
, workload_status
, task
=None):
162 """We pass the vnfd when setting up the callback, so expect it to be
163 returned as a tuple."""
164 if workload_status
and not task
:
165 self
.log
.debug("Callback: workload status \"{}\"".format(workload_status
))
167 if workload_status
in ["blocked"]:
168 task
= asyncio
.ensure_future(
169 self
.n2vc
.ExecutePrimitive(
175 'ssh-hostname': '10.195.8.78',
176 'ssh-username': 'ubuntu',
177 'ssh-password': 'ubuntu'
181 task
.add_done_callback(functools
.partial(self
.n2vc_callback
, None, None, None))
183 elif workload_status
in ["active"]:
184 self
.log
.debug("Removing charm")
185 task
= asyncio
.ensure_future(
186 self
.n2vc
.RemoveCharms(model_name
, application_name
, self
.n2vc_callback
)
188 task
.add_done_callback(functools
.partial(self
.n2vc_callback
, None, None, None))
190 def test_deploy_application(self
):
191 stream_handler
= logging
.StreamHandler(sys
.stdout
)
192 self
.log
.addHandler(stream_handler
)
194 self
.log
.info("Log handler installed")
195 nsd
= self
.get_descriptor(NSD_YAML
)
196 vnfd
= self
.get_descriptor(VNFD_YAML
)
200 vca_charms
= os
.getenv('VCA_CHARMS', None)
206 """An inner function to do the deployment of a charm from
209 charm_dir
= "{}/{}".format(vca_charms
, charm
)
211 # Setting this to an IP that will fail the initial config.
212 # This will be detected in the callback, which will execute
213 # the "config" primitive with the right IP address.
214 params
['rw_mgmt_ip'] = '10.195.8.78'
216 # self.loop.run_until_complete(n.CreateNetworkService(nsd))
219 vnf_name
= self
.n2vc
.FormatApplicationName(
225 self
.loop
.run_until_complete(
226 self
.n2vc
.DeployCharms(
237 # Check if the VDUs in this VNF have a charm
238 for vdu
in vnfd
['vdu']:
239 vdu_config
= vdu
.get('vdu-configuration')
241 juju
= vdu_config
['juju']
242 self
.assertIsNotNone(juju
)
244 charm
= juju
['charm']
245 self
.assertIsNotNone(charm
)
247 params
['initial-config-primitive'] = vdu_config
['initial-config-primitive']
252 # Check if this VNF has a charm
253 vnf_config
= vnfd
.get("vnf-configuration")
255 juju
= vnf_config
['juju']
256 self
.assertIsNotNone(juju
)
258 charm
= juju
['charm']
259 self
.assertIsNotNone(charm
)
261 params
['initial-config-primitive'] = vnf_config
['initial-config-primitive']
266 self
.loop
.run_forever()
268 # self.loop.run_until_complete(n.GetMetrics(vnfd, nsd=nsd))
271 # ExecutePrimitive(self, nsd, vnfd, vnf_member_index, primitive, callback, *callback_args, **params):
273 # self.loop.run_until_complete(n.DestroyNetworkService(nsd))
275 # self.loop.run_until_complete(self.n2vc.logout())
277 self
.log
.removeHandler(stream_handler
)