update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / test / utest_nsr_handler.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2016 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19 import argparse
20 import asyncio
21 import gi
22 import logging
23 import os
24 import sys
25 import time
26 import unittest
27 import uuid
28 import xmlrunner
29
30 import gi.repository.RwDts as rwdts
31 import gi.repository.RwNsmYang as rwnsmyang
32 import gi.repository.NsrYang as NsrYang
33 import gi.repository.RwNsrYang as RwNsrYang
34 import gi.repository.RwTypes as RwTypes
35 import gi.repository.ProtobufC as ProtobufC
36 import gi.repository.RwResourceMgrYang as RwResourceMgrYang
37 import gi.repository.RwLaunchpadYang as launchpadyang
38 import rift.tasklets
39 import rift.test.dts
40
41 gi.require_version('RwKeyspec', '1.0')
42 from gi.repository.RwKeyspec import quoted_key
43
44 import mano_ut
45
46
47 if sys.version_info < (3, 4, 4):
48 asyncio.ensure_future = asyncio.async
49
50
51 class NsrDtsHandler(object):
52 """ The network service DTS handler """
53 NSR_XPATH = "C,/rw-project:project/nsr:ns-instance-config/nsr:nsr"
54 SCALE_INSTANCE_XPATH = "C,/rw-project:project/nsr:ns-instance-config/nsr:nsr/nsr:scaling-group/nsr:instance"
55
56 def __init__(self, dts, log, loop, nsm):
57 self._dts = dts
58 self._log = log
59 self._loop = loop
60 self._nsm = nsm
61
62 self._nsr_regh = None
63 self._scale_regh = None
64
65 @property
66 def nsm(self):
67 """ Return the NS manager instance """
68 return self._nsm
69
70 def get_scale_group_instances(self, nsr_id, group_name):
71 def nsr_id_from_keyspec(ks):
72 nsr_path_entry = NsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr.schema().keyspec_to_entry(ks)
73 nsr_id = nsr_path_entry.key00.id
74 return nsr_id
75
76 def group_name_from_keyspec(ks):
77 group_path_entry = NsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr_ScalingGroup.schema().keyspec_to_entry(ks)
78 group_name = group_path_entry.key00.scaling_group_name_ref
79 return group_name
80
81
82 xact_ids = set()
83 for instance_cfg, keyspec in self._scale_regh.get_xact_elements(include_keyspec=True):
84 elem_nsr_id = nsr_id_from_keyspec(keyspec)
85 if elem_nsr_id != nsr_id:
86 continue
87
88 elem_group_name = group_name_from_keyspec(keyspec)
89 if elem_group_name != group_name:
90 continue
91
92 xact_ids.add(instance_cfg.id)
93
94 return xact_ids
95
96 @asyncio.coroutine
97 def register(self):
98 """ Register for Nsr create/update/delete/read requests from dts """
99
100 def nsr_id_from_keyspec(ks):
101 nsr_path_entry = NsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr.schema().keyspec_to_entry(ks)
102 nsr_id = nsr_path_entry.key00.id
103 return nsr_id
104
105 def group_name_from_keyspec(ks):
106 group_path_entry = NsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr_ScalingGroup.schema().keyspec_to_entry(ks)
107 group_name = group_path_entry.key00.scaling_group_name_ref
108 return group_name
109
110 def is_instance_in_reg_elements(nsr_id, group_name, instance_id):
111 """ Return boolean indicating if scaling group instance was already commited previously.
112
113 By looking at the existing elements in this registration handle (elements not part
114 of this current xact), we can tell if the instance was configured previously without
115 keeping any application state.
116 """
117 for instance_cfg, keyspec in self._nsr_regh.get_xact_elements(include_keyspec=True):
118 elem_nsr_id = nsr_id_from_keyspec(keyspec)
119 elem_group_name = group_name_from_keyspec(keyspec)
120
121 if elem_nsr_id != nsr_id or group_name != elem_group_name:
122 continue
123
124 if instance_cfg.id == instance_id:
125 return True
126
127 return False
128
129 def get_scale_group_instance_delta(nsr_id, group_name, xact):
130
131 #1. Find all elements in the transaction add to the "added"
132 #2. Find matching elements in current elements, remove from "added".
133 #3. Find elements only in current, add to "deleted"
134
135 xact_ids = set()
136 for instance_cfg, keyspec in self._scale_regh.get_xact_elements(xact, include_keyspec=True):
137 elem_nsr_id = nsr_id_from_keyspec(keyspec)
138 if elem_nsr_id != nsr_id:
139 continue
140
141 elem_group_name = group_name_from_keyspec(keyspec)
142 if elem_group_name != group_name:
143 continue
144
145 xact_ids.add(instance_cfg.id)
146
147 current_ids = set()
148 for instance_cfg, keyspec in self._scale_regh.get_xact_elements(include_keyspec=True):
149 elem_nsr_id = nsr_id_from_keyspec(keyspec)
150 if elem_nsr_id != nsr_id:
151 continue
152
153 elem_group_name = group_name_from_keyspec(keyspec)
154 if elem_group_name != group_name:
155 continue
156
157 current_ids.add(instance_cfg.id)
158
159 delta = {
160 "added": xact_ids - current_ids,
161 "deleted": current_ids - xact_ids
162 }
163 return delta
164
165 def get_add_delete_update_cfgs(dts_member_reg, xact, key_name):
166 # Unforunately, it is currently difficult to figure out what has exactly
167 # changed in this xact without Pbdelta support (RIFT-4916)
168 # As a workaround, we can fetch the pre and post xact elements and
169 # perform a comparison to figure out adds/deletes/updates
170 xact_cfgs = list(dts_member_reg.get_xact_elements(xact))
171 curr_cfgs = list(dts_member_reg.elements)
172
173 xact_key_map = {getattr(cfg, key_name): cfg for cfg in xact_cfgs}
174 curr_key_map = {getattr(cfg, key_name): cfg for cfg in curr_cfgs}
175
176 # Find Adds
177 added_keys = set(xact_key_map) - set(curr_key_map)
178 added_cfgs = [xact_key_map[key] for key in added_keys]
179
180 # Find Deletes
181 deleted_keys = set(curr_key_map) - set(xact_key_map)
182 deleted_cfgs = [curr_key_map[key] for key in deleted_keys]
183
184 # Find Updates
185 updated_keys = set(curr_key_map) & set(xact_key_map)
186 updated_cfgs = [xact_key_map[key] for key in updated_keys if xact_key_map[key] != curr_key_map[key]]
187
188 return added_cfgs, deleted_cfgs, updated_cfgs
189
190 def on_apply(dts, acg, xact, action, scratch):
191 """Apply the configuration"""
192 def handle_create_nsr(msg):
193 # Handle create nsr requests """
194 # Do some validations
195 if not msg.has_field("nsd_ref"):
196 err = "NSD reference not provided"
197 self._log.error(err)
198 raise NetworkServiceRecordError(err)
199
200 self._log.info("Creating NetworkServiceRecord %s from nsd_id %s",
201 msg.id, msg.nsd_ref)
202
203 #nsr = self.nsm.create_nsr(msg)
204 return nsr
205
206 def handle_delete_nsr(msg):
207 @asyncio.coroutine
208 def delete_instantiation(ns_id):
209 """ Delete instantiation """
210 pass
211 #with self._dts.transaction() as xact:
212 #yield from self._nsm.terminate_ns(ns_id, xact)
213
214 # Handle delete NSR requests
215 self._log.info("Delete req for NSR Id: %s received", msg.id)
216 # Terminate the NSR instance
217 #nsr = self._nsm.get_ns_by_nsr_id(msg.id)
218
219 #nsr.set_state(NetworkServiceRecordState.TERMINATE_RCVD)
220 #event_descr = "Terminate rcvd for NS Id:%s" % msg.id
221 #nsr.record_event("terminate-rcvd", event_descr)
222
223 #self._loop.create_task(delete_instantiation(msg.id))
224
225 @asyncio.coroutine
226 def begin_instantiation(nsr):
227 # Begin instantiation
228 pass
229 #self._log.info("Beginning NS instantiation: %s", nsr.id)
230 #yield from self._nsm.instantiate_ns(nsr.id, xact)
231
232 self._log.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
233 xact, action, scratch)
234
235 if action == rwdts.AppconfAction.INSTALL and xact.id is None:
236 self._log.debug("No xact handle. Skipping apply config")
237 xact = None
238
239 (added_msgs, deleted_msgs, updated_msgs) = get_add_delete_update_cfgs(self._nsr_regh, xact, "id")
240
241 for msg in added_msgs:
242 self._log.info("Create NSR received in on_apply to instantiate NS:%s", msg.id)
243 #if msg.id not in self._nsm.nsrs:
244 # self._log.info("Create NSR received in on_apply to instantiate NS:%s", msg.id)
245 # nsr = handle_create_nsr(msg)
246 # self._loop.create_task(begin_instantiation(nsr))
247
248 for msg in deleted_msgs:
249 self._log.info("Delete NSR received in on_apply to terminate NS:%s", msg.id)
250 try:
251 handle_delete_nsr(msg)
252 except Exception:
253 self._log.exception("Failed to terminate NS:%s", msg.id)
254
255 for msg in updated_msgs:
256 self._log.info("Update NSR received in on_apply to change scaling groups in NS:%s", msg.id)
257
258 for group in msg.scaling_group:
259 instance_delta = get_scale_group_instance_delta(msg.id, group.scaling_group_name_ref, xact)
260 self._log.debug("Got NSR:%s scale group instance delta: %s", msg.id, instance_delta)
261
262 #for instance_id in instance_delta["added"]:
263 # self._nsm.scale_nsr_out(msg.id, group.scaling_group_name_ref, instance_id, xact)
264
265 #for instance_id in instance_delta["deleted"]:
266 # self._nsm.scale_nsr_in(msg.id, group.scaling_group_name_ref, instance_id)
267
268
269 return RwTypes.RwStatus.SUCCESS
270
271 @asyncio.coroutine
272 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
273 """ Prepare calllback from DTS for NSR """
274
275 xpath = ks_path.to_xpath(NsrYang.get_schema())
276 action = xact_info.query_action
277 self._log.debug(
278 "Got Nsr prepare callback (xact: %s) (action: %s) (info: %s), %s:%s)",
279 xact, action, xact_info, xpath, msg
280 )
281
282 fref = ProtobufC.FieldReference.alloc()
283 fref.goto_whole_message(msg.to_pbcm())
284
285 if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
286 pass
287 # Ensure the Cloud account has been specified if this is an NSR create
288 #if msg.id not in self._nsm.nsrs:
289 # if not msg.has_field("cloud_account"):
290 # raise NsrInstantiationFailed("Cloud account not specified in NSR")
291
292 # We do not allow scaling actions to occur if the NS is not in running state
293 #elif msg.has_field("scaling_group"):
294 # nsr = self._nsm.nsrs[msg.id]
295 # if nsr.state != NetworkServiceRecordState.RUNNING:
296 # raise ScalingOperationError("Unable to perform scaling action when NS is not in running state")
297
298 # if len(msg.scaling_group) > 1:
299 # raise ScalingOperationError("Only a single scaling group can be configured at a time")
300
301 # for group_msg in msg.scaling_group:
302 # num_new_group_instances = len(group_msg.instance)
303 # if num_new_group_instances > 1:
304 # raise ScalingOperationError("Only a single scaling instance can be created at a time")
305
306 # elif num_new_group_instances == 1:
307 # scale_group = nsr.scaling_groups[group_msg.scaling_group_name_ref]
308 # if len(scale_group.instances) == scale_group.max_instance_count:
309 # raise ScalingOperationError("Max instances for %s reached" % scale_group)
310
311
312 acg.handle.prepare_complete_ok(xact_info.handle)
313
314
315 self._log.debug("Registering for NSR config using xpath: %s",
316 NsrDtsHandler.NSR_XPATH)
317
318 acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
319 with self._dts.appconf_group_create(handler=acg_hdl) as acg:
320 self._nsr_regh = acg.register(xpath=NsrDtsHandler.NSR_XPATH,
321 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
322 on_prepare=on_prepare)
323
324 self._scale_regh = acg.register(
325 xpath=NsrDtsHandler.SCALE_INSTANCE_XPATH,
326 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
327 )
328
329
330 class XPaths(object):
331 @staticmethod
332 def nsr_config(nsr_id=None):
333 return ("C,/rw-project:project/nsr:ns-instance-config/nsr:nsr" +
334 ("[nsr:id={}]".format(quoted_key(nsr_id)) if nsr_id is not None else ""))
335
336 def scaling_group_instance(nsr_id, group_name, instance_id):
337 return ("C,/rw-project:project/nsr:ns-instance-config/nsr:nsr" +
338 "[nsr:id={}]".format(quoted_key(nsr_id)) +
339 "/nsr:scaling-group" +
340 "[nsr:scaling-group-name-ref={}]".format(quoted_key(group_name)) +
341 "/nsr:instance" +
342 "[nsr:id={}]".format(quoted_key(instance_id))
343 )
344
345
346 class NsrHandlerTestCase(rift.test.dts.AbstractDTSTest):
347 """
348 DTS GI interface unittests
349 """
350 @classmethod
351 def configure_schema(cls):
352 return NsrYang.get_schema()
353
354 @classmethod
355 def configure_timeout(cls):
356 return 240
357
358 def configure_test(self, loop, test_id):
359 self.log.debug("STARTING - %s", self.id())
360 self.tinfo = self.new_tinfo(self.id())
361 self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)
362 self.handler = NsrDtsHandler(self.dts, self.log, self.loop, None)
363
364 self.tinfo_c = self.new_tinfo(self.id() + "_client")
365 self.dts_c = rift.tasklets.DTS(self.tinfo_c, self.schema, self.loop)
366
367 @rift.test.dts.async_test
368 def test_add_delete_ns(self):
369
370 nsr1_uuid = "nsr1_uuid" # str(uuid.uuid4())
371 nsr2_uuid = "nsr2_uuid" # str(uuid.uuid4())
372
373 assert nsr1_uuid != nsr2_uuid
374
375 yield from self.handler.register()
376 yield from asyncio.sleep(.5, loop=self.loop)
377
378 self.log.debug("Creating NSR")
379 with self.dts_c.transaction() as xact:
380 block = xact.block_create()
381 block.add_query_update(
382 XPaths.nsr_config(nsr1_uuid),
383 NsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr(id=nsr1_uuid, name="fu"),
384 flags=rwdts.XactFlag.ADVISE | rwdts.XactFlag.TRACE,
385 )
386 yield from block.execute(now=True)
387
388 yield from asyncio.sleep(.5, loop=self.loop)
389
390 with self.dts_c.transaction() as xact:
391 block = xact.block_create()
392 block.add_query_update(
393 XPaths.scaling_group_instance(nsr1_uuid, "group", 1234),
394 NsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr_ScalingGroup_Instance(id=1234),
395 flags=rwdts.XactFlag.ADVISE | rwdts.XactFlag.TRACE,
396 )
397 yield from block.execute(now=True)
398
399 yield from asyncio.sleep(.5, loop=self.loop)
400
401 with self.dts_c.transaction() as xact:
402 block = xact.block_create()
403 block.add_query_delete(
404 XPaths.scaling_group_instance(nsr1_uuid, "group", 1234),
405 flags=rwdts.XactFlag.ADVISE | rwdts.XactFlag.TRACE,
406 )
407 yield from block.execute(now=True)
408
409 yield from asyncio.sleep(.5, loop=self.loop)
410
411 with self.dts_c.transaction() as xact:
412 block = xact.block_create()
413 block.add_query_create(
414 XPaths.scaling_group_instance(nsr1_uuid, "group", 12345),
415 NsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr_ScalingGroup_Instance(id=12345),
416 flags=rwdts.XactFlag.ADVISE | rwdts.XactFlag.TRACE,
417 )
418 yield from block.execute(now=True)
419
420 yield from asyncio.sleep(.5, loop=self.loop)
421
422 group_ids = self.handler.get_scale_group_instances(nsr2_uuid, "group")
423 self.log.debug("Got group ids in nsr2 after adding 12345 to nsr1: %s", group_ids)
424 group_ids = self.handler.get_scale_group_instances(nsr1_uuid, "group")
425 self.log.debug("Got group ids in nsr1 after adding 12345 to nsr1: %s", group_ids)
426 assert group_ids == {12345}
427
428 self.log.debug("\n\nADD A COMPLETELY DIFFERENT NSR\n")
429 with self.dts_c.transaction() as xact:
430 block = xact.block_create()
431 block.add_query_update(
432 XPaths.nsr_config(nsr2_uuid),
433 NsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr(id=nsr2_uuid, name="fu2"),
434 flags=rwdts.XactFlag.ADVISE | rwdts.XactFlag.TRACE,
435 )
436 yield from block.execute(now=True)
437
438 yield from asyncio.sleep(.5, loop=self.loop)
439
440 group_ids = self.handler.get_scale_group_instances(nsr2_uuid, "group")
441 self.log.debug("Got group ids in nsr2 after adding new nsr: %s", group_ids)
442 group_ids = self.handler.get_scale_group_instances(nsr1_uuid, "group")
443 self.log.debug("Got group ids in nsr1 after adding new nsr: %s", group_ids)
444 assert group_ids == {12345}
445
446 self.log.debug("\n\nDELETE A COMPLETELY DIFFERENT NSR\n")
447 with self.dts_c.transaction() as xact:
448 block = xact.block_create()
449 block.add_query_delete(
450 XPaths.nsr_config(nsr2_uuid),
451 flags=rwdts.XactFlag.ADVISE | rwdts.XactFlag.TRACE,
452 )
453 yield from block.execute(now=True)
454
455 yield from asyncio.sleep(.5, loop=self.loop)
456
457 group_ids = self.handler.get_scale_group_instances(nsr2_uuid, "group")
458 self.log.debug("Got group ids in nsr2 after deleting nsr2: %s", group_ids)
459 group_ids = self.handler.get_scale_group_instances(nsr1_uuid, "group")
460 self.log.debug("Got group ids in nsr1 after deleting nsr2: %s", group_ids)
461 assert group_ids == {12345}
462
463 with self.dts_c.transaction() as xact:
464 block = xact.block_create()
465 block.add_query_delete(
466 XPaths.scaling_group_instance(nsr1_uuid, "group", 12345),
467 flags=rwdts.XactFlag.ADVISE | rwdts.XactFlag.TRACE,
468 )
469 yield from block.execute(now=True)
470
471 yield from asyncio.sleep(2, loop=self.loop)
472
473 def main():
474 runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
475
476 parser = argparse.ArgumentParser()
477 parser.add_argument('-v', '--verbose', action='store_true')
478 parser.add_argument('-n', '--no-runner', action='store_true')
479 args, unittest_args = parser.parse_known_args()
480 if args.no_runner:
481 runner = None
482
483 NsrHandlerTestCase.log_level = logging.DEBUG if args.verbose else logging.WARN
484
485 unittest.main(testRunner=runner, argv=[sys.argv[0]] + unittest_args)
486
487 if __name__ == '__main__':
488 main()