| #!/usr/bin/env python3 |
| |
| # |
| # Copyright 2016 RIFT.IO Inc |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| import argparse |
| import asyncio |
| import gi |
| import logging |
| import os |
| import sys |
| import time |
| import unittest |
| import uuid |
| import xmlrunner |
| |
| import gi.repository.RwDts as rwdts |
| import gi.repository.RwNsmYang as rwnsmyang |
| import gi.repository.NsrYang as NsrYang |
| import gi.repository.RwNsrYang as RwNsrYang |
| import gi.repository.RwTypes as RwTypes |
| import gi.repository.ProtobufC as ProtobufC |
| import gi.repository.RwResourceMgrYang as RwResourceMgrYang |
| import gi.repository.RwLaunchpadYang as launchpadyang |
| import rift.tasklets |
| import rift.test.dts |
| |
| gi.require_version('RwKeyspec', '1.0') |
| from gi.repository.RwKeyspec import quoted_key |
| |
| import mano_ut |
| |
| |
# Interpreters older than 3.4.4 only provide the function under the name
# ``async``. That name became a reserved keyword in Python 3.7, so it must be
# looked up with getattr(): writing ``asyncio.async`` literally is a
# SyntaxError on newer interpreters and stops the whole module from loading,
# even though this branch would never execute there.
if sys.version_info < (3, 4, 4):
    asyncio.ensure_future = getattr(asyncio, "async")
| |
| |
class NsrDtsHandler(object):
    """The network service DTS handler.

    Subscribes to NSR configuration and to scaling-group instance
    configuration, and logs the NSR creates, deletes and scaling-group
    changes it derives from each applied transaction. The NS manager calls
    that would act on those events are disabled in this handler.
    """
    NSR_XPATH = "C,/rw-project:project/nsr:ns-instance-config/nsr:nsr"
    SCALE_INSTANCE_XPATH = "C,/rw-project:project/nsr:ns-instance-config/nsr:nsr/nsr:scaling-group/nsr:instance"

    def __init__(self, dts, log, loop, nsm):
        """Store the collaborators.

        Args:
            dts: DTS API handle used to create the appconf group.
            log: logger used for all messages.
            loop: event loop (stored only).
            nsm: NS manager instance, exposed through the ``nsm`` property.
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._nsm = nsm

        # Both registration handles are populated by register().
        self._nsr_regh = None
        self._scale_regh = None

    @property
    def nsm(self):
        """ Return the NS manager instance """
        return self._nsm

    @staticmethod
    def _nsr_id_from_keyspec(ks):
        """Return the NSR id held in the NSR path entry of keyspec ``ks``."""
        nsr_path_entry = NsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr.schema().keyspec_to_entry(ks)
        return nsr_path_entry.key00.id

    @staticmethod
    def _group_name_from_keyspec(ks):
        """Return the scaling group name held in the scaling-group path entry of ``ks``."""
        group_path_entry = NsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr_ScalingGroup.schema().keyspec_to_entry(ks)
        return group_path_entry.key00.scaling_group_name_ref

    def _collect_instance_ids(self, nsr_id, group_name, *xact_args):
        """Return the ids of the scaling instances belonging to one group.

        Args:
            nsr_id: NSR id the instance keyspec must carry.
            group_name: scaling group name the instance keyspec must carry.
            *xact_args: forwarded unchanged to ``get_xact_elements`` so the
                caller decides whether a transaction is passed at all.

        Returns:
            A set with the ``id`` of every matching instance config.
        """
        elements = self._scale_regh.get_xact_elements(*xact_args, include_keyspec=True)
        # The group name is only parsed when the NSR id already matches.
        return {
            instance_cfg.id
            for instance_cfg, keyspec in elements
            if self._nsr_id_from_keyspec(keyspec) == nsr_id
            and self._group_name_from_keyspec(keyspec) == group_name
        }

    @staticmethod
    def _get_add_delete_update_cfgs(dts_member_reg, xact, key_name):
        """Split a registration's configs into added, deleted and updated.

        Args:
            dts_member_reg: registration handle to inspect.
            xact: transaction whose elements are compared (may be None).
            key_name: attribute that identifies a config element.

        Returns:
            Tuple ``(added_cfgs, deleted_cfgs, updated_cfgs)``; updated
            entries are the transaction's version of the element.
        """
        # Unfortunately, it is currently difficult to figure out what has exactly
        # changed in this xact without Pbdelta support (RIFT-4916).
        # As a workaround, we can fetch the pre and post xact elements and
        # perform a comparison to figure out adds/deletes/updates.
        xact_cfgs = list(dts_member_reg.get_xact_elements(xact))
        curr_cfgs = list(dts_member_reg.elements)

        xact_key_map = {getattr(cfg, key_name): cfg for cfg in xact_cfgs}
        curr_key_map = {getattr(cfg, key_name): cfg for cfg in curr_cfgs}

        xact_keys = set(xact_key_map)
        curr_keys = set(curr_key_map)

        added_cfgs = [xact_key_map[key] for key in xact_keys - curr_keys]
        deleted_cfgs = [curr_key_map[key] for key in curr_keys - xact_keys]
        updated_cfgs = [
            xact_key_map[key]
            for key in curr_keys & xact_keys
            if xact_key_map[key] != curr_key_map[key]
        ]

        return added_cfgs, deleted_cfgs, updated_cfgs

    def get_scale_group_instances(self, nsr_id, group_name):
        """Return the set of instance ids registered for a scaling group.

        Args:
            nsr_id: id of the NSR that owns the scaling group.
            group_name: scaling group name reference.

        Returns:
            Set of instance ids found in the scaling-instance registration
            (queried without a transaction).
        """
        return self._collect_instance_ids(nsr_id, group_name)

    @asyncio.coroutine
    def register(self):
        """ Register for Nsr create/update/delete/read requests from dts """

        def get_scale_group_instance_delta(nsr_id, group_name, xact):
            """Return {"added": ids, "deleted": ids} for one scaling group.

            "added" holds ids seen in ``xact`` but not in the current
            elements; "deleted" holds ids only present in the current elements.
            """
            xact_ids = self._collect_instance_ids(nsr_id, group_name, xact)
            current_ids = self._collect_instance_ids(nsr_id, group_name)

            return {
                "added": xact_ids - current_ids,
                "deleted": current_ids - xact_ids
            }

        def on_apply(dts, acg, xact, action, scratch):
            """Apply the configuration"""

            def handle_delete_nsr(msg):
                # Terminating the NS through the NS manager is disabled in
                # this handler; the request is only logged.
                self._log.info("Delete req for NSR Id: %s received", msg.id)

            self._log.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
                            xact, action, scratch)

            if action == rwdts.AppconfAction.INSTALL and xact.id is None:
                self._log.debug("No xact handle. Skipping apply config")
                xact = None

            added_msgs, deleted_msgs, updated_msgs = self._get_add_delete_update_cfgs(
                self._nsr_regh, xact, "id")

            for msg in added_msgs:
                # The create/instantiate path that used the NS manager is
                # disabled; only the event is logged.
                self._log.info("Create NSR received in on_apply to instantiate NS:%s", msg.id)

            for msg in deleted_msgs:
                self._log.info("Delete NSR received in on_apply to terminate NS:%s", msg.id)
                try:
                    handle_delete_nsr(msg)
                except Exception:
                    self._log.exception("Failed to terminate NS:%s", msg.id)

            for msg in updated_msgs:
                self._log.info("Update NSR received in on_apply to change scaling groups in NS:%s", msg.id)

                for group in msg.scaling_group:
                    # The scale-out / scale-in calls on the NS manager are
                    # disabled; the computed delta is only logged.
                    instance_delta = get_scale_group_instance_delta(
                        msg.id, group.scaling_group_name_ref, xact)
                    self._log.debug("Got NSR:%s scale group instance delta: %s", msg.id, instance_delta)

            return RwTypes.RwStatus.SUCCESS

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ Prepare callback from DTS for NSR """

            xpath = ks_path.to_xpath(NsrYang.get_schema())
            action = xact_info.query_action
            self._log.debug(
                "Got Nsr prepare callback (xact: %s) (action: %s) (info: %s), %s:%s)",
                xact, action, xact_info, xpath, msg
            )

            # NOTE(review): fref is never read afterwards; the calls are kept
            # in case building the reference has side effects - confirm
            # before removing.
            fref = ProtobufC.FieldReference.alloc()
            fref.goto_whole_message(msg.to_pbcm())

            # Create/update validation (cloud account present, scaling only
            # while the NS is running, a single scaling group and instance
            # per request, max instance count) is disabled in this handler,
            # so every query is accepted.
            acg.handle.prepare_complete_ok(xact_info.handle)

        self._log.debug("Registering for NSR config using xpath: %s",
                        NsrDtsHandler.NSR_XPATH)

        subscriber_flags = rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE
        acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
        with self._dts.appconf_group_create(handler=acg_hdl) as acg:
            self._nsr_regh = acg.register(xpath=NsrDtsHandler.NSR_XPATH,
                                          flags=subscriber_flags,
                                          on_prepare=on_prepare)

            self._scale_regh = acg.register(
                xpath=NsrDtsHandler.SCALE_INSTANCE_XPATH,
                flags=subscriber_flags,
            )
| |
| |
class XPaths(object):
    """Builders for the NSR configuration xpaths queried by the test case."""

    @staticmethod
    def nsr_config(nsr_id=None):
        """Return the NSR config xpath, keyed by ``nsr_id`` when one is given.

        With ``nsr_id`` left as None the un-keyed list xpath is returned.
        """
        return ("C,/rw-project:project/nsr:ns-instance-config/nsr:nsr" +
                ("[nsr:id={}]".format(quoted_key(nsr_id)) if nsr_id is not None else ""))

    # Declared static like nsr_config: without the decorator a call made
    # through an instance would bind the instance itself as ``nsr_id``.
    @staticmethod
    def scaling_group_instance(nsr_id, group_name, instance_id):
        """Return the xpath of one scaling-group instance of an NSR.

        Args:
            nsr_id: key of the NSR.
            group_name: scaling group name reference.
            instance_id: key of the instance inside the scaling group.
        """
        return ("C,/rw-project:project/nsr:ns-instance-config/nsr:nsr" +
                "[nsr:id={}]".format(quoted_key(nsr_id)) +
                "/nsr:scaling-group" +
                "[nsr:scaling-group-name-ref={}]".format(quoted_key(group_name)) +
                "/nsr:instance" +
                "[nsr:id={}]".format(quoted_key(instance_id))
                )
| |
| |
class NsrHandlerTestCase(rift.test.dts.AbstractDTSTest):
    """
    DTS GI interface unittests

    Drives NsrDtsHandler by issuing NSR and scaling-group instance
    configuration queries from a second (client) DTS handle.
    """
    @classmethod
    def configure_schema(cls):
        """Return the schema the test DTS instances are created with."""
        return NsrYang.get_schema()

    @classmethod
    def configure_timeout(cls):
        """Return the test timeout value (240)."""
        return 240

    def configure_test(self, loop, test_id):
        """Create the handler under test and a separate client DTS handle."""
        self.log.debug("STARTING - %s", self.id())
        self.tinfo = self.new_tinfo(self.id())
        self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)
        self.handler = NsrDtsHandler(self.dts, self.log, self.loop, None)

        self.tinfo_c = self.new_tinfo(self.id() + "_client")
        self.dts_c = rift.tasklets.DTS(self.tinfo_c, self.schema, self.loop)

    @asyncio.coroutine
    def _run_client_query(self, add_query, xpath, *msg, settle=.5):
        """Execute one query in its own client transaction, then sleep.

        Args:
            add_query: name of the block method that queues the query
                ("add_query_update", "add_query_create" or "add_query_delete").
            xpath: xpath the query targets.
            *msg: the config message for create/update; omitted for delete.
            settle: seconds to sleep after the transaction block exits.
        """
        with self.dts_c.transaction() as xact:
            block = xact.block_create()
            getattr(block, add_query)(
                xpath, *msg, flags=rwdts.XactFlag.ADVISE | rwdts.XactFlag.TRACE
            )
            yield from block.execute(now=True)

        yield from asyncio.sleep(settle, loop=self.loop)

    @rift.test.dts.async_test
    def test_add_delete_ns(self):
        # Adds and removes scaling instances on nsr1 and checks that adding
        # and deleting an unrelated NSR leaves nsr1's instance set unchanged.
        nsr_cls = NsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr
        instance_cls = NsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr_ScalingGroup_Instance

        nsr1_uuid = "nsr1_uuid"  # str(uuid.uuid4())
        nsr2_uuid = "nsr2_uuid"  # str(uuid.uuid4())

        # unittest assertions are used instead of bare ``assert`` so the
        # checks still run under ``python -O``.
        self.assertNotEqual(nsr1_uuid, nsr2_uuid)

        yield from self.handler.register()
        yield from asyncio.sleep(.5, loop=self.loop)

        self.log.debug("Creating NSR")
        yield from self._run_client_query(
            "add_query_update",
            XPaths.nsr_config(nsr1_uuid),
            nsr_cls(id=nsr1_uuid, name="fu"),
        )

        yield from self._run_client_query(
            "add_query_update",
            XPaths.scaling_group_instance(nsr1_uuid, "group", 1234),
            instance_cls(id=1234),
        )

        yield from self._run_client_query(
            "add_query_delete",
            XPaths.scaling_group_instance(nsr1_uuid, "group", 1234),
        )

        yield from self._run_client_query(
            "add_query_create",
            XPaths.scaling_group_instance(nsr1_uuid, "group", 12345),
            instance_cls(id=12345),
        )

        group_ids = self.handler.get_scale_group_instances(nsr2_uuid, "group")
        self.log.debug("Got group ids in nsr2 after adding 12345 to nsr1: %s", group_ids)
        group_ids = self.handler.get_scale_group_instances(nsr1_uuid, "group")
        self.log.debug("Got group ids in nsr1 after adding 12345 to nsr1: %s", group_ids)
        self.assertEqual(group_ids, {12345})

        self.log.debug("\n\nADD A COMPLETELY DIFFERENT NSR\n")
        yield from self._run_client_query(
            "add_query_update",
            XPaths.nsr_config(nsr2_uuid),
            nsr_cls(id=nsr2_uuid, name="fu2"),
        )

        group_ids = self.handler.get_scale_group_instances(nsr2_uuid, "group")
        self.log.debug("Got group ids in nsr2 after adding new nsr: %s", group_ids)
        group_ids = self.handler.get_scale_group_instances(nsr1_uuid, "group")
        self.log.debug("Got group ids in nsr1 after adding new nsr: %s", group_ids)
        self.assertEqual(group_ids, {12345})

        self.log.debug("\n\nDELETE A COMPLETELY DIFFERENT NSR\n")
        yield from self._run_client_query(
            "add_query_delete",
            XPaths.nsr_config(nsr2_uuid),
        )

        group_ids = self.handler.get_scale_group_instances(nsr2_uuid, "group")
        self.log.debug("Got group ids in nsr2 after deleting nsr2: %s", group_ids)
        group_ids = self.handler.get_scale_group_instances(nsr1_uuid, "group")
        self.log.debug("Got group ids in nsr1 after deleting nsr2: %s", group_ids)
        self.assertEqual(group_ids, {12345})

        # The final delete is followed by a longer (2 second) sleep.
        yield from self._run_client_query(
            "add_query_delete",
            XPaths.scaling_group_instance(nsr1_uuid, "group", 12345),
            settle=2,
        )
| |
def main():
    """Parse the command line and run the unit tests.

    Options:
        -v / --verbose: log at DEBUG instead of WARN.
        -n / --no-runner: use unittest's default runner instead of the XML
            runner.

    Unrecognised arguments are passed through to ``unittest.main``.

    Raises:
        KeyError: if the XML runner is requested and the RIFT_MODULE_TEST
            environment variable (its output location) is not set.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true')
    parser.add_argument('-n', '--no-runner', action='store_true')
    args, unittest_args = parser.parse_known_args()

    # Build the XML runner only when it is going to be used, so that
    # --no-runner works without RIFT_MODULE_TEST being set.
    runner = None
    if not args.no_runner:
        runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])

    NsrHandlerTestCase.log_level = logging.DEBUG if args.verbose else logging.WARN

    unittest.main(testRunner=runner, argv=[sys.argv[0]] + unittest_args)


if __name__ == '__main__':
    main()