Merge "[RIFT 16413, 16414] Unittest failures fixed for utest_package, utest_publisher...
[osm/SO.git] / rwlaunchpad / test / utest_nsr_handler.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2016 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19 import argparse
20 import asyncio
21 import logging
22 import os
23 import sys
24 import time
25 import unittest
26 import uuid
27
28 import xmlrunner
29
30 import gi.repository.RwDts as rwdts
31 import gi.repository.RwNsmYang as rwnsmyang
32 import gi.repository.NsrYang as NsrYang
33 import gi.repository.RwNsrYang as RwNsrYang
34 import gi.repository.RwTypes as RwTypes
35 import gi.repository.ProtobufC as ProtobufC
36 import gi.repository.RwResourceMgrYang as RwResourceMgrYang
37 import gi.repository.RwLaunchpadYang as launchpadyang
38 import rift.tasklets
39 import rift.test.dts
40
41 import mano_ut
42
43
# asyncio.ensure_future() was added in Python 3.4.4; older interpreters expose
# the same function under the name "async".  Because "async" is a reserved
# keyword from Python 3.7 onwards, writing ``asyncio.async`` literally makes
# this whole module a SyntaxError there, so the legacy name is looked up with
# getattr() instead.
if sys.version_info < (3, 4, 4):
    asyncio.ensure_future = getattr(asyncio, "async")
46
47
class NetworkServiceRecordError(Exception):
    """ Raised when an NSR config message fails validation """


class NsrDtsHandler(object):
    """ The network service DTS handler

    Subscribes to the NSR config xpath and to the scaling-group instance
    xpath.  The calls into the NS manager are commented out in this file, so
    the apply/prepare callbacks only log what they receive.
    """
    NSR_XPATH = "C,/nsr:ns-instance-config/nsr:nsr"
    SCALE_INSTANCE_XPATH = "C,/nsr:ns-instance-config/nsr:nsr/nsr:scaling-group/nsr:instance"

    def __init__(self, dts, log, loop, nsm):
        """ Store the collaborators; the registrations are made in register()

        Arguments:
            dts  - DTS API handle used to create the appconf group
            log  - logger
            loop - asyncio event loop
            nsm  - NS manager instance (exposed through the nsm property)
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._nsm = nsm

        # Registration handles, populated by register()
        self._nsr_regh = None
        self._scale_regh = None

    @property
    def nsm(self):
        """ Return the NS manager instance """
        return self._nsm

    @staticmethod
    def _nsr_id_from_keyspec(ks):
        """ Return the NSR id held in the NSR path entry of keyspec ks """
        nsr_path_entry = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr.schema().keyspec_to_entry(ks)
        return nsr_path_entry.key00.id

    @staticmethod
    def _group_name_from_keyspec(ks):
        """ Return the scaling group name held in the scaling-group path entry of keyspec ks """
        group_path_entry = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup.schema().keyspec_to_entry(ks)
        return group_path_entry.key00.scaling_group_name_ref

    def _matching_instance_ids(self, nsr_id, group_name, elements):
        """ Return the set of instance ids in elements that belong to the group

        Arguments:
            nsr_id     - NSR id the instance keyspec must carry
            group_name - scaling group name the instance keyspec must carry
            elements   - iterable of (instance_cfg, keyspec) pairs
        """
        instance_ids = set()
        for instance_cfg, keyspec in elements:
            # The group name is only decoded once the NSR id has matched
            if self._nsr_id_from_keyspec(keyspec) != nsr_id:
                continue

            if self._group_name_from_keyspec(keyspec) != group_name:
                continue

            instance_ids.add(instance_cfg.id)

        return instance_ids

    def get_scale_group_instances(self, nsr_id, group_name):
        """ Return the set of instance ids currently held by the scale
        registration for scaling group group_name of NSR nsr_id

        register() must have completed first, since this reads the scale
        registration handle.
        """
        return self._matching_instance_ids(
            nsr_id,
            group_name,
            self._scale_regh.get_xact_elements(include_keyspec=True),
        )

    @asyncio.coroutine
    def register(self):
        """ Register for Nsr create/update/delete/read requests from dts """

        def is_instance_in_reg_elements(nsr_id, group_name, instance_id):
            """ Return boolean indicating if scaling group instance was already commited previously.

            By looking at the existing elements in this registration handle (elements not part
            of this current xact), we can tell if the instance was configured previously without
            keeping any application state.
            """
            # NOTE(review): nothing in this file calls this helper.  It walks
            # the NSR registration while decoding scaling-group keyspecs;
            # confirm whether the scale registration was intended.
            for instance_cfg, keyspec in self._nsr_regh.get_xact_elements(include_keyspec=True):
                elem_nsr_id = self._nsr_id_from_keyspec(keyspec)
                elem_group_name = self._group_name_from_keyspec(keyspec)

                if elem_nsr_id != nsr_id or group_name != elem_group_name:
                    continue

                if instance_cfg.id == instance_id:
                    return True

            return False

        def get_scale_group_instance_delta(nsr_id, group_name, xact):
            """ Return {"added": ids, "deleted": ids} for one scaling group

            "added" holds instance ids present in xact but not in the current
            elements; "deleted" holds ids present only in the current elements.
            """
            xact_ids = self._matching_instance_ids(
                nsr_id,
                group_name,
                self._scale_regh.get_xact_elements(xact, include_keyspec=True),
            )
            current_ids = self.get_scale_group_instances(nsr_id, group_name)

            return {
                "added": xact_ids - current_ids,
                "deleted": current_ids - xact_ids,
            }

        def get_add_delete_update_cfgs(dts_member_reg, xact, key_name):
            """ Return (added, deleted, updated) config lists for this xact

            Configs are matched between the xact and the current elements by
            the attribute named key_name.
            """
            # Unforunately, it is currently difficult to figure out what has exactly
            # changed in this xact without Pbdelta support (RIFT-4916)
            # As a workaround, we can fetch the pre and post xact elements and
            # perform a comparison to figure out adds/deletes/updates
            xact_cfgs = list(dts_member_reg.get_xact_elements(xact))
            curr_cfgs = list(dts_member_reg.elements)

            xact_key_map = {getattr(cfg, key_name): cfg for cfg in xact_cfgs}
            curr_key_map = {getattr(cfg, key_name): cfg for cfg in curr_cfgs}

            xact_keys = set(xact_key_map)
            curr_keys = set(curr_key_map)

            # Find Adds
            added_cfgs = [xact_key_map[key] for key in xact_keys - curr_keys]

            # Find Deletes
            deleted_cfgs = [curr_key_map[key] for key in curr_keys - xact_keys]

            # Find Updates: present on both sides, but with differing content
            updated_cfgs = [
                xact_key_map[key]
                for key in curr_keys & xact_keys
                if xact_key_map[key] != curr_key_map[key]
            ]

            return added_cfgs, deleted_cfgs, updated_cfgs

        def on_apply(dts, acg, xact, action, scratch):
            """Apply the configuration"""
            def handle_create_nsr(msg):
                """ Validate a create-NSR request

                Raises NetworkServiceRecordError when msg has no nsd_ref.
                Returns None because the NS manager call is disabled below.
                """
                # Do some validations
                if not msg.has_field("nsd_ref"):
                    err = "NSD reference not provided"
                    self._log.error(err)
                    raise NetworkServiceRecordError(err)

                self._log.info("Creating NetworkServiceRecord %s from nsd_id %s",
                               msg.id, msg.nsd_ref)

                #nsr = self.nsm.create_nsr(msg)
                # No record is created while the call above is disabled
                return None

            def handle_delete_nsr(msg):
                """ Log a delete-NSR request; the termination calls are disabled """
                @asyncio.coroutine
                def delete_instantiation(ns_id):
                    """ Delete instantiation """
                    pass
                    #with self._dts.transaction() as xact:
                        #yield from self._nsm.terminate_ns(ns_id, xact)

                # Handle delete NSR requests
                self._log.info("Delete req for NSR Id: %s received", msg.id)
                # Terminate the NSR instance
                #nsr = self._nsm.get_ns_by_nsr_id(msg.id)

                #nsr.set_state(NetworkServiceRecordState.TERMINATE_RCVD)
                #event_descr = "Terminate rcvd for NS Id:%s" % msg.id
                #nsr.record_event("terminate-rcvd", event_descr)

                #self._loop.create_task(delete_instantiation(msg.id))

            @asyncio.coroutine
            def begin_instantiation(nsr):
                """ Begin instantiation (the NS manager call is disabled) """
                pass
                #self._log.info("Beginning NS instantiation: %s", nsr.id)
                #yield from self._nsm.instantiate_ns(nsr.id, xact)

            self._log.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
                            xact, action, scratch)

            if action == rwdts.AppconfAction.INSTALL and xact.id is None:
                # Despite the message, processing continues with xact=None
                self._log.debug("No xact handle. Skipping apply config")
                xact = None

            (added_msgs, deleted_msgs, updated_msgs) = get_add_delete_update_cfgs(self._nsr_regh, xact, "id")

            for msg in added_msgs:
                self._log.info("Create NSR received in on_apply to instantiate NS:%s", msg.id)
                #if msg.id not in self._nsm.nsrs:
                #    self._log.info("Create NSR received in on_apply to instantiate NS:%s", msg.id)
                #    nsr = handle_create_nsr(msg)
                #    self._loop.create_task(begin_instantiation(nsr))

            for msg in deleted_msgs:
                self._log.info("Delete NSR received in on_apply to terminate NS:%s", msg.id)
                try:
                    handle_delete_nsr(msg)
                except Exception:
                    # One failed delete must not stop the remaining messages
                    self._log.exception("Failed to terminate NS:%s", msg.id)

            for msg in updated_msgs:
                self._log.info("Update NSR received in on_apply to change scaling groups in NS:%s", msg.id)

                for group in msg.scaling_group:
                    instance_delta = get_scale_group_instance_delta(msg.id, group.scaling_group_name_ref, xact)
                    self._log.debug("Got NSR:%s scale group instance delta: %s", msg.id, instance_delta)

                    #for instance_id in instance_delta["added"]:
                    #    self._nsm.scale_nsr_out(msg.id, group.scaling_group_name_ref, instance_id, xact)

                    #for instance_id in instance_delta["deleted"]:
                    #    self._nsm.scale_nsr_in(msg.id, group.scaling_group_name_ref, instance_id)

            return RwTypes.RwStatus.SUCCESS

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ Prepare calllback from DTS for NSR """

            xpath = ks_path.to_xpath(NsrYang.get_schema())
            action = xact_info.query_action
            self._log.debug(
                "Got Nsr prepare callback (xact: %s) (action: %s) (info: %s), %s:%s)",
                xact, action, xact_info, xpath, msg
            )

            # NOTE(review): fref is not read again in this callback; kept
            # as-is in case the calls themselves matter - confirm.
            fref = ProtobufC.FieldReference.alloc()
            fref.goto_whole_message(msg.to_pbcm())

            if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
                pass
                # Ensure the Cloud account has been specified if this is an NSR create
                #if msg.id not in self._nsm.nsrs:
                #    if not msg.has_field("cloud_account"):
                #        raise NsrInstantiationFailed("Cloud account not specified in NSR")

                # We do not allow scaling actions to occur if the NS is not in running state
                #elif msg.has_field("scaling_group"):
                #    nsr = self._nsm.nsrs[msg.id]
                #    if nsr.state != NetworkServiceRecordState.RUNNING:
                #        raise ScalingOperationError("Unable to perform scaling action when NS is not in running state")

                #    if len(msg.scaling_group) > 1:
                #        raise ScalingOperationError("Only a single scaling group can be configured at a time")

                #    for group_msg in msg.scaling_group:
                #        num_new_group_instances = len(group_msg.instance)
                #        if num_new_group_instances > 1:
                #            raise ScalingOperationError("Only a single scaling instance can be created at a time")

                #        elif num_new_group_instances == 1:
                #            scale_group = nsr.scaling_groups[group_msg.scaling_group_name_ref]
                #            if len(scale_group.instances) == scale_group.max_instance_count:
                #                raise ScalingOperationError("Max instances for %s reached" % scale_group)

            acg.handle.prepare_complete_ok(xact_info.handle)

        self._log.debug("Registering for NSR config using xpath: %s",
                        NsrDtsHandler.NSR_XPATH)

        acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
        with self._dts.appconf_group_create(handler=acg_hdl) as acg:
            self._nsr_regh = acg.register(xpath=NsrDtsHandler.NSR_XPATH,
                                          flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
                                          on_prepare=on_prepare)

            # The scale instance registration has no on_prepare callback
            self._scale_regh = acg.register(
                xpath=NsrDtsHandler.SCALE_INSTANCE_XPATH,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
            )
325
326
class XPaths(object):
    """ Builders for the config xpath strings used by the tests in this file """

    @staticmethod
    def nsr_config(nsr_id=None):
        """ Return the NSR config xpath, keyed on nsr_id when one is given """
        xpath = "C,/nsr:ns-instance-config/nsr:nsr"
        if nsr_id is not None:
            xpath += "[nsr:id='{}']".format(nsr_id)
        return xpath

    # Declared static like nsr_config so that it can also be called on an
    # XPaths instance, not only on the class.
    @staticmethod
    def scaling_group_instance(nsr_id, group_name, instance_id):
        """ Return the xpath of one instance of a scaling group of an NSR """
        return ("C,/nsr:ns-instance-config/nsr:nsr" +
                "[nsr:id='{}']".format(nsr_id) +
                "/nsr:scaling-group" +
                "[nsr:scaling-group-name-ref='{}']".format(group_name) +
                "/nsr:instance" +
                "[nsr:id='{}']".format(instance_id)
                )
341
342
class NsrHandlerTestCase(rift.test.dts.AbstractDTSTest):
    """
    DTS GI interface unittests
    """
    @classmethod
    def configure_schema(cls):
        """ Return the yang schema the test uses (the NsrYang schema) """
        return NsrYang.get_schema()

    @classmethod
    def configure_timeout(cls):
        """ Return the test timeout value (240) """
        return 240

    def configure_test(self, loop, test_id):
        """ Create the handler under test and a separate client DTS handle """
        self.log.debug("STARTING - %s", self.id())
        self.tinfo = self.new_tinfo(self.id())
        self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)
        self.handler = NsrDtsHandler(self.dts, self.log, self.loop, None)

        # The client handle is the one that issues the config transactions
        self.tinfo_c = self.new_tinfo(self.id() + "_client")
        self.dts_c = rift.tasklets.DTS(self.tinfo_c, self.schema, self.loop)

    @asyncio.coroutine
    def _settle(self, seconds=.5):
        """ Sleep on the test loop before the next step of the test """
        yield from asyncio.sleep(seconds, loop=self.loop)

    @asyncio.coroutine
    def _send_query(self, action, xpath, msg=None):
        """ Execute one single-query transaction on the client DTS handle

        Arguments:
            action - "update", "create" or "delete"
            xpath  - config xpath the query targets
            msg    - message for update/create; not used for delete
        """
        flags = rwdts.XactFlag.ADVISE | rwdts.XactFlag.TRACE
        with self.dts_c.transaction() as xact:
            block = xact.block_create()
            if action == "update":
                block.add_query_update(xpath, msg, flags=flags)
            elif action == "create":
                block.add_query_create(xpath, msg, flags=flags)
            elif action == "delete":
                block.add_query_delete(xpath, flags=flags)
            else:
                raise ValueError("Unknown query action: %s" % action)
            yield from block.execute(now=True)

    def _check_group_ids(self, nsr1_uuid, nsr2_uuid, when):
        """ Log the "group" instance ids of both NSRs and check that nsr1
        still holds exactly instance 12345
        """
        group_ids = self.handler.get_scale_group_instances(nsr2_uuid, "group")
        self.log.debug("Got group ids in nsr2 %s: %s", when, group_ids)
        group_ids = self.handler.get_scale_group_instances(nsr1_uuid, "group")
        self.log.debug("Got group ids in nsr1 %s: %s", when, group_ids)
        self.assertEqual(group_ids, {12345})

    @rift.test.dts.async_test
    def test_add_delete_ns(self):
        """ Add and delete scaling instances on one NSR while a second NSR is
        added and deleted, checking the first NSR's instance ids throughout
        """
        nsr1_uuid = "nsr1_uuid" # str(uuid.uuid4())
        nsr2_uuid = "nsr2_uuid" # str(uuid.uuid4())

        self.assertNotEqual(nsr1_uuid, nsr2_uuid)

        yield from self.handler.register()
        yield from self._settle()

        self.log.debug("Creating NSR")
        yield from self._send_query(
            "update",
            XPaths.nsr_config(nsr1_uuid),
            NsrYang.YangData_Nsr_NsInstanceConfig_Nsr(id=nsr1_uuid, name="fu"),
        )
        yield from self._settle()

        # Add instance 1234 to nsr1 and remove it again
        yield from self._send_query(
            "update",
            XPaths.scaling_group_instance(nsr1_uuid, "group", 1234),
            NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup_Instance(id=1234),
        )
        yield from self._settle()

        yield from self._send_query(
            "delete",
            XPaths.scaling_group_instance(nsr1_uuid, "group", 1234),
        )
        yield from self._settle()

        # Instance 12345 is the one expected to remain in nsr1 below
        yield from self._send_query(
            "create",
            XPaths.scaling_group_instance(nsr1_uuid, "group", 12345),
            NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup_Instance(id=12345),
        )
        yield from self._settle()

        self._check_group_ids(nsr1_uuid, nsr2_uuid, "after adding 12345 to nsr1")

        self.log.debug("\n\nADD A COMPLETELY DIFFERENT NSR\n")
        yield from self._send_query(
            "update",
            XPaths.nsr_config(nsr2_uuid),
            NsrYang.YangData_Nsr_NsInstanceConfig_Nsr(id=nsr2_uuid, name="fu2"),
        )
        yield from self._settle()

        self._check_group_ids(nsr1_uuid, nsr2_uuid, "after adding new nsr")

        self.log.debug("\n\nDELETE A COMPLETELY DIFFERENT NSR\n")
        yield from self._send_query("delete", XPaths.nsr_config(nsr2_uuid))
        yield from self._settle()

        self._check_group_ids(nsr1_uuid, nsr2_uuid, "after deleting nsr2")

        yield from self._send_query(
            "delete",
            XPaths.scaling_group_instance(nsr1_uuid, "group", 12345),
        )
        yield from self._settle(2)
469
def main():
    """ Parse the command line and run the tests in this module

    -v/--verbose selects DEBUG logging for the test case (WARN otherwise);
    -n/--no-runner runs with unittest's default runner instead of the XML
    runner.  Unrecognised arguments are passed through to unittest.

    Raises KeyError when the XML runner is requested and the
    RIFT_MODULE_TEST environment variable is not set.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true')
    parser.add_argument('-n', '--no-runner', action='store_true')
    args, unittest_args = parser.parse_known_args()

    # Build the XML runner only when it is going to be used, so that
    # --no-runner does not require RIFT_MODULE_TEST to be set.
    runner = None
    if not args.no_runner:
        runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])

    NsrHandlerTestCase.log_level = logging.DEBUG if args.verbose else logging.WARN

    unittest.main(testRunner=runner, argv=[sys.argv[0]] + unittest_args)


if __name__ == '__main__':
    main()