RIFT OSM R1 Initial Submission
Signed-off-by: Jeremy Mordkoff <jeremy.mordkoff@riftio.com>
diff --git a/rwlaunchpad/test/utest_nsr_handler.py b/rwlaunchpad/test/utest_nsr_handler.py
new file mode 100755
index 0000000..ffab929
--- /dev/null
+++ b/rwlaunchpad/test/utest_nsr_handler.py
@@ -0,0 +1,485 @@
+#!/usr/bin/env python3
+
+#
+# Copyright 2016 RIFT.IO Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import argparse
+import asyncio
+import logging
+import os
+import sys
+import time
+import unittest
+import uuid
+
+import xmlrunner
+
+import gi.repository.RwDts as rwdts
+import gi.repository.RwNsmYang as rwnsmyang
+import gi.repository.NsrYang as NsrYang
+import gi.repository.RwNsrYang as RwNsrYang
+import gi.repository.RwTypes as RwTypes
+import gi.repository.ProtobufC as ProtobufC
+import gi.repository.RwResourceMgrYang as RwResourceMgrYang
+import gi.repository.RwLaunchpadYang as launchpadyang
+import rift.tasklets
+import rift.test.dts
+
+import mano_ut
+
+
+if sys.version_info < (3, 4, 4):
+ asyncio.ensure_future = asyncio.async
+
+
+class NsrDtsHandler(object):
+ """ The network service DTS handler """
+ NSR_XPATH = "C,/nsr:ns-instance-config/nsr:nsr"
+ SCALE_INSTANCE_XPATH = "C,/nsr:ns-instance-config/nsr:nsr/nsr:scaling-group/nsr:instance"
+
+ def __init__(self, dts, log, loop, nsm):
+ self._dts = dts
+ self._log = log
+ self._loop = loop
+ self._nsm = nsm
+
+ self._nsr_regh = None
+ self._scale_regh = None
+
+ @property
+ def nsm(self):
+ """ Return the NS manager instance """
+ return self._nsm
+
+ def get_scale_group_instances(self, nsr_id, group_name):
+ def nsr_id_from_keyspec(ks):
+ nsr_path_entry = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr.schema().keyspec_to_entry(ks)
+ nsr_id = nsr_path_entry.key00.id
+ return nsr_id
+
+ def group_name_from_keyspec(ks):
+ group_path_entry = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup.schema().keyspec_to_entry(ks)
+ group_name = group_path_entry.key00.scaling_group_name_ref
+ return group_name
+
+
+ xact_ids = set()
+ for instance_cfg, keyspec in self._scale_regh.get_xact_elements(include_keyspec=True):
+ elem_nsr_id = nsr_id_from_keyspec(keyspec)
+ if elem_nsr_id != nsr_id:
+ continue
+
+ elem_group_name = group_name_from_keyspec(keyspec)
+ if elem_group_name != group_name:
+ continue
+
+ xact_ids.add(instance_cfg.id)
+
+ return xact_ids
+
+ @asyncio.coroutine
+ def register(self):
+ """ Register for Nsr create/update/delete/read requests from dts """
+
+ def nsr_id_from_keyspec(ks):
+ nsr_path_entry = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr.schema().keyspec_to_entry(ks)
+ nsr_id = nsr_path_entry.key00.id
+ return nsr_id
+
+ def group_name_from_keyspec(ks):
+ group_path_entry = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup.schema().keyspec_to_entry(ks)
+ group_name = group_path_entry.key00.scaling_group_name_ref
+ return group_name
+
+ def is_instance_in_reg_elements(nsr_id, group_name, instance_id):
+ """ Return boolean indicating if scaling group instance was already commited previously.
+
+ By looking at the existing elements in this registration handle (elements not part
+ of this current xact), we can tell if the instance was configured previously without
+ keeping any application state.
+ """
+ for instance_cfg, keyspec in self._nsr_regh.get_xact_elements(include_keyspec=True):
+ elem_nsr_id = nsr_id_from_keyspec(keyspec)
+ elem_group_name = group_name_from_keyspec(keyspec)
+
+ if elem_nsr_id != nsr_id or group_name != elem_group_name:
+ continue
+
+ if instance_cfg.id == instance_id:
+ return True
+
+ return False
+
+ def get_scale_group_instance_delta(nsr_id, group_name, xact):
+
+ #1. Find all elements in the transaction add to the "added"
+ #2. Find matching elements in current elements, remove from "added".
+ #3. Find elements only in current, add to "deleted"
+
+ xact_ids = set()
+ for instance_cfg, keyspec in self._scale_regh.get_xact_elements(xact, include_keyspec=True):
+ elem_nsr_id = nsr_id_from_keyspec(keyspec)
+ if elem_nsr_id != nsr_id:
+ continue
+
+ elem_group_name = group_name_from_keyspec(keyspec)
+ if elem_group_name != group_name:
+ continue
+
+ xact_ids.add(instance_cfg.id)
+
+ current_ids = set()
+ for instance_cfg, keyspec in self._scale_regh.get_xact_elements(include_keyspec=True):
+ elem_nsr_id = nsr_id_from_keyspec(keyspec)
+ if elem_nsr_id != nsr_id:
+ continue
+
+ elem_group_name = group_name_from_keyspec(keyspec)
+ if elem_group_name != group_name:
+ continue
+
+ current_ids.add(instance_cfg.id)
+
+ delta = {
+ "added": xact_ids - current_ids,
+ "deleted": current_ids - xact_ids
+ }
+ return delta
+
+ def get_add_delete_update_cfgs(dts_member_reg, xact, key_name):
+ # Unforunately, it is currently difficult to figure out what has exactly
+ # changed in this xact without Pbdelta support (RIFT-4916)
+ # As a workaround, we can fetch the pre and post xact elements and
+ # perform a comparison to figure out adds/deletes/updates
+ xact_cfgs = list(dts_member_reg.get_xact_elements(xact))
+ curr_cfgs = list(dts_member_reg.elements)
+
+ xact_key_map = {getattr(cfg, key_name): cfg for cfg in xact_cfgs}
+ curr_key_map = {getattr(cfg, key_name): cfg for cfg in curr_cfgs}
+
+ # Find Adds
+ added_keys = set(xact_key_map) - set(curr_key_map)
+ added_cfgs = [xact_key_map[key] for key in added_keys]
+
+ # Find Deletes
+ deleted_keys = set(curr_key_map) - set(xact_key_map)
+ deleted_cfgs = [curr_key_map[key] for key in deleted_keys]
+
+ # Find Updates
+ updated_keys = set(curr_key_map) & set(xact_key_map)
+ updated_cfgs = [xact_key_map[key] for key in updated_keys if xact_key_map[key] != curr_key_map[key]]
+
+ return added_cfgs, deleted_cfgs, updated_cfgs
+
+ def on_apply(dts, acg, xact, action, scratch):
+ """Apply the configuration"""
+ def handle_create_nsr(msg):
+ # Handle create nsr requests """
+ # Do some validations
+ if not msg.has_field("nsd_ref"):
+ err = "NSD reference not provided"
+ self._log.error(err)
+ raise NetworkServiceRecordError(err)
+
+ self._log.info("Creating NetworkServiceRecord %s from nsd_id %s",
+ msg.id, msg.nsd_ref)
+
+ #nsr = self.nsm.create_nsr(msg)
+ return nsr
+
+ def handle_delete_nsr(msg):
+ @asyncio.coroutine
+ def delete_instantiation(ns_id):
+ """ Delete instantiation """
+ pass
+ #with self._dts.transaction() as xact:
+ #yield from self._nsm.terminate_ns(ns_id, xact)
+
+ # Handle delete NSR requests
+ self._log.info("Delete req for NSR Id: %s received", msg.id)
+ # Terminate the NSR instance
+ #nsr = self._nsm.get_ns_by_nsr_id(msg.id)
+
+ #nsr.set_state(NetworkServiceRecordState.TERMINATE_RCVD)
+ #event_descr = "Terminate rcvd for NS Id:%s" % msg.id
+ #nsr.record_event("terminate-rcvd", event_descr)
+
+ #self._loop.create_task(delete_instantiation(msg.id))
+
+ @asyncio.coroutine
+ def begin_instantiation(nsr):
+ # Begin instantiation
+ pass
+ #self._log.info("Beginning NS instantiation: %s", nsr.id)
+ #yield from self._nsm.instantiate_ns(nsr.id, xact)
+
+ self._log.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
+ xact, action, scratch)
+
+ if action == rwdts.AppconfAction.INSTALL and xact.id is None:
+ self._log.debug("No xact handle. Skipping apply config")
+ xact = None
+
+ (added_msgs, deleted_msgs, updated_msgs) = get_add_delete_update_cfgs(self._nsr_regh, xact, "id")
+
+ for msg in added_msgs:
+ self._log.info("Create NSR received in on_apply to instantiate NS:%s", msg.id)
+ #if msg.id not in self._nsm.nsrs:
+ # self._log.info("Create NSR received in on_apply to instantiate NS:%s", msg.id)
+ # nsr = handle_create_nsr(msg)
+ # self._loop.create_task(begin_instantiation(nsr))
+
+ for msg in deleted_msgs:
+ self._log.info("Delete NSR received in on_apply to terminate NS:%s", msg.id)
+ try:
+ handle_delete_nsr(msg)
+ except Exception:
+ self._log.exception("Failed to terminate NS:%s", msg.id)
+
+ for msg in updated_msgs:
+ self._log.info("Update NSR received in on_apply to change scaling groups in NS:%s", msg.id)
+
+ for group in msg.scaling_group:
+ instance_delta = get_scale_group_instance_delta(msg.id, group.scaling_group_name_ref, xact)
+ self._log.debug("Got NSR:%s scale group instance delta: %s", msg.id, instance_delta)
+
+ #for instance_id in instance_delta["added"]:
+ # self._nsm.scale_nsr_out(msg.id, group.scaling_group_name_ref, instance_id, xact)
+
+ #for instance_id in instance_delta["deleted"]:
+ # self._nsm.scale_nsr_in(msg.id, group.scaling_group_name_ref, instance_id)
+
+
+ return RwTypes.RwStatus.SUCCESS
+
+ @asyncio.coroutine
+ def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
+ """ Prepare calllback from DTS for NSR """
+
+ xpath = ks_path.to_xpath(NsrYang.get_schema())
+ action = xact_info.query_action
+ self._log.debug(
+ "Got Nsr prepare callback (xact: %s) (action: %s) (info: %s), %s:%s)",
+ xact, action, xact_info, xpath, msg
+ )
+
+ fref = ProtobufC.FieldReference.alloc()
+ fref.goto_whole_message(msg.to_pbcm())
+
+ if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
+ pass
+ # Ensure the Cloud account has been specified if this is an NSR create
+ #if msg.id not in self._nsm.nsrs:
+ # if not msg.has_field("cloud_account"):
+ # raise NsrInstantiationFailed("Cloud account not specified in NSR")
+
+ # We do not allow scaling actions to occur if the NS is not in running state
+ #elif msg.has_field("scaling_group"):
+ # nsr = self._nsm.nsrs[msg.id]
+ # if nsr.state != NetworkServiceRecordState.RUNNING:
+ # raise ScalingOperationError("Unable to perform scaling action when NS is not in running state")
+
+ # if len(msg.scaling_group) > 1:
+ # raise ScalingOperationError("Only a single scaling group can be configured at a time")
+
+ # for group_msg in msg.scaling_group:
+ # num_new_group_instances = len(group_msg.instance)
+ # if num_new_group_instances > 1:
+ # raise ScalingOperationError("Only a single scaling instance can be created at a time")
+
+ # elif num_new_group_instances == 1:
+ # scale_group = nsr.scaling_groups[group_msg.scaling_group_name_ref]
+ # if len(scale_group.instances) == scale_group.max_instance_count:
+ # raise ScalingOperationError("Max instances for %s reached" % scale_group)
+
+
+ acg.handle.prepare_complete_ok(xact_info.handle)
+
+
+ self._log.debug("Registering for NSR config using xpath: %s",
+ NsrDtsHandler.NSR_XPATH)
+
+ acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
+ with self._dts.appconf_group_create(handler=acg_hdl) as acg:
+ self._nsr_regh = acg.register(xpath=NsrDtsHandler.NSR_XPATH,
+ flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
+ on_prepare=on_prepare)
+
+ self._scale_regh = acg.register(
+ xpath=NsrDtsHandler.SCALE_INSTANCE_XPATH,
+ flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
+ )
+
+
+class XPaths(object):
+ @staticmethod
+ def nsr_config(nsr_id=None):
+ return ("C,/nsr:ns-instance-config/nsr:nsr" +
+ ("[nsr:id='{}']".format(nsr_id) if nsr_id is not None else ""))
+
+ def scaling_group_instance(nsr_id, group_name, instance_id):
+ return ("C,/nsr:ns-instance-config/nsr:nsr" +
+ "[nsr:id='{}']".format(nsr_id) +
+ "/nsr:scaling-group" +
+ "[nsr:scaling-group-name-ref='{}']".format(group_name) +
+ "/nsr:instance" +
+ "[nsr:id='{}']".format(instance_id)
+ )
+
+
+class NsrHandlerTestCase(rift.test.dts.AbstractDTSTest):
+ """
+ Unit test for NsrDtsHandler: NSR and scaling group instance config is sent from a second DTS client
+ """
+ @classmethod
+ def configure_schema(cls):
+ return NsrYang.get_schema()
+
+ @classmethod
+ def configure_timeout(cls):
+ return 240  # presumably seconds -- TODO confirm against AbstractDTSTest
+
+ def configure_test(self, loop, test_id):
+ self.log.debug("STARTING - %s", self.id())
+ self.tinfo = self.new_tinfo(self.id())
+ self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)
+ self.handler = NsrDtsHandler(self.dts, self.log, self.loop, None)  # handler under test, built without an NS manager
+
+ self.tinfo_c = self.new_tinfo(self.id() + "_client")
+ self.dts_c = rift.tasklets.DTS(self.tinfo_c, self.schema, self.loop)  # separate DTS instance used to send the config
+
+ @rift.test.dts.async_test
+ def test_add_delete_ns(self):  # adds/deletes instances on nsr1 while an unrelated nsr2 comes and goes
+
+ nsr1_uuid = "nsr1_uuid" # str(uuid.uuid4())
+ nsr2_uuid = "nsr2_uuid" # str(uuid.uuid4())
+
+ assert nsr1_uuid != nsr2_uuid
+
+ yield from self.handler.register()
+ yield from asyncio.sleep(.5, loop=self.loop)  # presumably lets DTS settle before the next step -- TODO confirm
+
+ self.log.debug("Creating NSR")
+ with self.dts_c.transaction() as xact:  # step 1: create nsr1 through an update query
+ block = xact.block_create()
+ block.add_query_update(
+ XPaths.nsr_config(nsr1_uuid),
+ NsrYang.YangData_Nsr_NsInstanceConfig_Nsr(id=nsr1_uuid, name="fu"),
+ flags=rwdts.XactFlag.ADVISE | rwdts.XactFlag.TRACE,
+ )
+ yield from block.execute(now=True)
+
+ yield from asyncio.sleep(.5, loop=self.loop)
+
+ with self.dts_c.transaction() as xact:  # step 2: add instance 1234 to scaling group "group" of nsr1
+ block = xact.block_create()
+ block.add_query_update(
+ XPaths.scaling_group_instance(nsr1_uuid, "group", 1234),
+ NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup_Instance(id=1234),
+ flags=rwdts.XactFlag.ADVISE | rwdts.XactFlag.TRACE,
+ )
+ yield from block.execute(now=True)
+
+ yield from asyncio.sleep(.5, loop=self.loop)
+
+ with self.dts_c.transaction() as xact:  # step 3: delete instance 1234 again
+ block = xact.block_create()
+ block.add_query_delete(
+ XPaths.scaling_group_instance(nsr1_uuid, "group", 1234),
+ flags=rwdts.XactFlag.ADVISE | rwdts.XactFlag.TRACE,
+ )
+ yield from block.execute(now=True)
+
+ yield from asyncio.sleep(.5, loop=self.loop)
+
+ with self.dts_c.transaction() as xact:  # step 4: create instance 12345 in the same group
+ block = xact.block_create()
+ block.add_query_create(
+ XPaths.scaling_group_instance(nsr1_uuid, "group", 12345),
+ NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup_Instance(id=12345),
+ flags=rwdts.XactFlag.ADVISE | rwdts.XactFlag.TRACE,
+ )
+ yield from block.execute(now=True)
+
+ yield from asyncio.sleep(.5, loop=self.loop)
+
+ group_ids = self.handler.get_scale_group_instances(nsr2_uuid, "group")  # nsr2 result is only logged, never asserted
+ self.log.debug("Got group ids in nsr2 after adding 12345 to nsr1: %s", group_ids)
+ group_ids = self.handler.get_scale_group_instances(nsr1_uuid, "group")
+ self.log.debug("Got group ids in nsr1 after adding 12345 to nsr1: %s", group_ids)
+ assert group_ids == {12345}  # 1234 was deleted in step 3, so only 12345 is expected
+
+ self.log.debug("\n\nADD A COMPLETELY DIFFERENT NSR\n")
+ with self.dts_c.transaction() as xact:  # step 5: add the unrelated nsr2
+ block = xact.block_create()
+ block.add_query_update(
+ XPaths.nsr_config(nsr2_uuid),
+ NsrYang.YangData_Nsr_NsInstanceConfig_Nsr(id=nsr2_uuid, name="fu2"),
+ flags=rwdts.XactFlag.ADVISE | rwdts.XactFlag.TRACE,
+ )
+ yield from block.execute(now=True)
+
+ yield from asyncio.sleep(.5, loop=self.loop)
+
+ group_ids = self.handler.get_scale_group_instances(nsr2_uuid, "group")
+ self.log.debug("Got group ids in nsr2 after adding new nsr: %s", group_ids)
+ group_ids = self.handler.get_scale_group_instances(nsr1_uuid, "group")
+ self.log.debug("Got group ids in nsr1 after adding new nsr: %s", group_ids)
+ assert group_ids == {12345}  # adding nsr2 must leave nsr1's group untouched
+
+ self.log.debug("\n\nDELETE A COMPLETELY DIFFERENT NSR\n")
+ with self.dts_c.transaction() as xact:  # step 6: delete nsr2
+ block = xact.block_create()
+ block.add_query_delete(
+ XPaths.nsr_config(nsr2_uuid),
+ flags=rwdts.XactFlag.ADVISE | rwdts.XactFlag.TRACE,
+ )
+ yield from block.execute(now=True)
+
+ yield from asyncio.sleep(.5, loop=self.loop)
+
+ group_ids = self.handler.get_scale_group_instances(nsr2_uuid, "group")
+ self.log.debug("Got group ids in nsr2 after deleting nsr2: %s", group_ids)
+ group_ids = self.handler.get_scale_group_instances(nsr1_uuid, "group")
+ self.log.debug("Got group ids in nsr1 after deleting nsr2: %s", group_ids)
+ assert group_ids == {12345}  # deleting nsr2 must leave nsr1's group untouched
+
+ with self.dts_c.transaction() as xact:  # step 7: delete the remaining instance 12345 (outcome is not asserted)
+ block = xact.block_create()
+ block.add_query_delete(
+ XPaths.scaling_group_instance(nsr1_uuid, "group", 12345),
+ flags=rwdts.XactFlag.ADVISE | rwdts.XactFlag.TRACE,
+ )
+ yield from block.execute(now=True)
+
+ yield from asyncio.sleep(2, loop=self.loop)
+
+def main():
+ runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-v', '--verbose', action='store_true')
+ parser.add_argument('-n', '--no-runner', action='store_true')
+ args, unittest_args = parser.parse_known_args()
+ if args.no_runner:
+ runner = None
+
+ NsrHandlerTestCase.log_level = logging.DEBUG if args.verbose else logging.WARN
+
+ unittest.main(testRunner=runner, argv=[sys.argv[0]] + unittest_args)
+
+if __name__ == '__main__':
+ main()