4 # Copyright 2016 RIFT.IO Inc
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
30 import gi
.repository
.RwDts
as rwdts
31 import gi
.repository
.RwNsmYang
as rwnsmyang
32 import gi
.repository
.NsrYang
as NsrYang
33 import gi
.repository
.RwNsrYang
as RwNsrYang
34 import gi
.repository
.RwTypes
as RwTypes
35 import gi
.repository
.ProtobufC
as ProtobufC
36 import gi
.repository
.RwResourceMgrYang
as RwResourceMgrYang
37 import gi
.repository
.RwLaunchpadYang
as launchpadyang
41 gi
.require_version('RwKeyspec', '1.0')
42 from gi
.repository
.RwKeyspec
import quoted_key
47 if sys
.version_info
< (3, 4, 4):
48 asyncio
.ensure_future
= asyncio
.async
51 class NsrDtsHandler(object):
52 """ The network service DTS handler """
53 NSR_XPATH
= "C,/rw-project:project/nsr:ns-instance-config/nsr:nsr"
54 SCALE_INSTANCE_XPATH
= "C,/rw-project:project/nsr:ns-instance-config/nsr:nsr/nsr:scaling-group/nsr:instance"
56 def __init__(self
, dts
, log
, loop
, nsm
):
63 self
._scale
_regh
= None
67 """ Return the NS manager instance """
70 def get_scale_group_instances(self
, nsr_id
, group_name
):
71 def nsr_id_from_keyspec(ks
):
72 nsr_path_entry
= NsrYang
.YangData_RwProject_Project_NsInstanceConfig_Nsr
.schema().keyspec_to_entry(ks
)
73 nsr_id
= nsr_path_entry
.key00
.id
76 def group_name_from_keyspec(ks
):
77 group_path_entry
= NsrYang
.YangData_RwProject_Project_NsInstanceConfig_Nsr_ScalingGroup
.schema().keyspec_to_entry(ks
)
78 group_name
= group_path_entry
.key00
.scaling_group_name_ref
83 for instance_cfg
, keyspec
in self
._scale
_regh
.get_xact_elements(include_keyspec
=True):
84 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
85 if elem_nsr_id
!= nsr_id
:
88 elem_group_name
= group_name_from_keyspec(keyspec
)
89 if elem_group_name
!= group_name
:
92 xact_ids
.add(instance_cfg
.id)
98 """ Register for Nsr create/update/delete/read requests from dts """
100 def nsr_id_from_keyspec(ks
):
101 nsr_path_entry
= NsrYang
.YangData_RwProject_Project_NsInstanceConfig_Nsr
.schema().keyspec_to_entry(ks
)
102 nsr_id
= nsr_path_entry
.key00
.id
105 def group_name_from_keyspec(ks
):
106 group_path_entry
= NsrYang
.YangData_RwProject_Project_NsInstanceConfig_Nsr_ScalingGroup
.schema().keyspec_to_entry(ks
)
107 group_name
= group_path_entry
.key00
.scaling_group_name_ref
110 def is_instance_in_reg_elements(nsr_id
, group_name
, instance_id
):
111 """ Return boolean indicating if scaling group instance was already committed previously.
113 By looking at the existing elements in this registration handle (elements not part
114 of this current xact), we can tell if the instance was configured previously without
115 keeping any application state.
117 for instance_cfg
, keyspec
in self
._nsr
_regh
.get_xact_elements(include_keyspec
=True):
118 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
119 elem_group_name
= group_name_from_keyspec(keyspec
)
121 if elem_nsr_id
!= nsr_id
or group_name
!= elem_group_name
:
124 if instance_cfg
.id == instance_id
:
129 def get_scale_group_instance_delta(nsr_id
, group_name
, xact
):
131 #1. Find all elements in the transaction add to the "added"
132 #2. Find matching elements in current elements, remove from "added".
133 #3. Find elements only in current, add to "deleted"
136 for instance_cfg
, keyspec
in self
._scale
_regh
.get_xact_elements(xact
, include_keyspec
=True):
137 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
138 if elem_nsr_id
!= nsr_id
:
141 elem_group_name
= group_name_from_keyspec(keyspec
)
142 if elem_group_name
!= group_name
:
145 xact_ids
.add(instance_cfg
.id)
148 for instance_cfg
, keyspec
in self
._scale
_regh
.get_xact_elements(include_keyspec
=True):
149 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
150 if elem_nsr_id
!= nsr_id
:
153 elem_group_name
= group_name_from_keyspec(keyspec
)
154 if elem_group_name
!= group_name
:
157 current_ids
.add(instance_cfg
.id)
160 "added": xact_ids
- current_ids
,
161 "deleted": current_ids
- xact_ids
def get_add_delete_update_cfgs(dts_member_reg, xact, key_name):
    """Classify the configs seen by ``xact`` as added, deleted or updated.

    Unfortunately, it is currently difficult to figure out what exactly
    changed in a transaction without Pbdelta support (RIFT-4916).  As a
    workaround, the elements of the registration as seen by the transaction
    are compared with the registration's current elements, matching the two
    sides on the ``key_name`` attribute.

    Args:
        dts_member_reg: registration handle providing
            ``get_xact_elements(xact)`` and an ``elements`` iterable.
        xact: transaction whose view is compared with the current elements.
        key_name: name of the attribute that identifies a config on both
            sides.  If several configs on one side share a key, the last one
            wins.

    Returns:
        A tuple ``(added_cfgs, deleted_cfgs, updated_cfgs)`` of lists.
        ``added_cfgs`` and ``updated_cfgs`` hold the transaction-side
        objects, ``deleted_cfgs`` holds the current-side objects.  Each list
        follows the iteration order of the side it was taken from, so the
        result no longer depends on set/hash ordering.
    """
    # Imported locally so the block stays self-contained.  OrderedDict is
    # used instead of a plain dict because this file still caters for
    # interpreters that predate insertion-ordered dicts.
    from collections import OrderedDict

    xact_key_map = OrderedDict(
        (getattr(cfg, key_name), cfg)
        for cfg in dts_member_reg.get_xact_elements(xact)
    )
    curr_key_map = OrderedDict(
        (getattr(cfg, key_name), cfg)
        for cfg in dts_member_reg.elements
    )

    # Find adds: keys that only exist in the transaction view.
    added_cfgs = [cfg for key, cfg in xact_key_map.items()
                  if key not in curr_key_map]

    # Find deletes: keys that only exist in the current elements.
    deleted_cfgs = [cfg for key, cfg in curr_key_map.items()
                    if key not in xact_key_map]

    # Find updates: keys on both sides whose config compares unequal.
    updated_cfgs = [cfg for key, cfg in xact_key_map.items()
                    if key in curr_key_map and cfg != curr_key_map[key]]

    return added_cfgs, deleted_cfgs, updated_cfgs
190 def on_apply(dts
, acg
, xact
, action
, scratch
):
191 """Apply the configuration"""
192 def handle_create_nsr(msg
):
193 # Handle create nsr requests
194 # Do some validations
195 if not msg
.has_field("nsd_ref"):
196 err
= "NSD reference not provided"
198 raise NetworkServiceRecordError(err
)
200 self
._log
.info("Creating NetworkServiceRecord %s from nsd_id %s",
203 #nsr = self.nsm.create_nsr(msg)
206 def handle_delete_nsr(msg
):
208 def delete_instantiation(ns_id
):
209 """ Delete instantiation """
211 #with self._dts.transaction() as xact:
212 #yield from self._nsm.terminate_ns(ns_id, xact)
214 # Handle delete NSR requests
215 self
._log
.info("Delete req for NSR Id: %s received", msg
.id)
216 # Terminate the NSR instance
217 #nsr = self._nsm.get_ns_by_nsr_id(msg.id)
219 #nsr.set_state(NetworkServiceRecordState.TERMINATE_RCVD)
220 #event_descr = "Terminate rcvd for NS Id:%s" % msg.id
221 #nsr.record_event("terminate-rcvd", event_descr)
223 #self._loop.create_task(delete_instantiation(msg.id))
226 def begin_instantiation(nsr
):
227 # Begin instantiation
229 #self._log.info("Beginning NS instantiation: %s", nsr.id)
230 #yield from self._nsm.instantiate_ns(nsr.id, xact)
232 self
._log
.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
233 xact
, action
, scratch
)
235 if action
== rwdts
.AppconfAction
.INSTALL
and xact
.id is None:
236 self
._log
.debug("No xact handle. Skipping apply config")
239 (added_msgs
, deleted_msgs
, updated_msgs
) = get_add_delete_update_cfgs(self
._nsr
_regh
, xact
, "id")
241 for msg
in added_msgs
:
242 self
._log
.info("Create NSR received in on_apply to instantiate NS:%s", msg
.id)
243 #if msg.id not in self._nsm.nsrs:
244 # self._log.info("Create NSR received in on_apply to instantiate NS:%s", msg.id)
245 # nsr = handle_create_nsr(msg)
246 # self._loop.create_task(begin_instantiation(nsr))
248 for msg
in deleted_msgs
:
249 self
._log
.info("Delete NSR received in on_apply to terminate NS:%s", msg
.id)
251 handle_delete_nsr(msg
)
253 self
._log
.exception("Failed to terminate NS:%s", msg
.id)
255 for msg
in updated_msgs
:
256 self
._log
.info("Update NSR received in on_apply to change scaling groups in NS:%s", msg
.id)
258 for group
in msg
.scaling_group
:
259 instance_delta
= get_scale_group_instance_delta(msg
.id, group
.scaling_group_name_ref
, xact
)
260 self
._log
.debug("Got NSR:%s scale group instance delta: %s", msg
.id, instance_delta
)
262 #for instance_id in instance_delta["added"]:
263 # self._nsm.scale_nsr_out(msg.id, group.scaling_group_name_ref, instance_id, xact)
265 #for instance_id in instance_delta["deleted"]:
266 # self._nsm.scale_nsr_in(msg.id, group.scaling_group_name_ref, instance_id)
269 return RwTypes
.RwStatus
.SUCCESS
272 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
273 """ Prepare callback from DTS for NSR """
275 xpath
= ks_path
.to_xpath(NsrYang
.get_schema())
276 action
= xact_info
.query_action
278 "Got Nsr prepare callback (xact: %s) (action: %s) (info: %s), %s:%s)",
279 xact
, action
, xact_info
, xpath
, msg
282 fref
= ProtobufC
.FieldReference
.alloc()
283 fref
.goto_whole_message(msg
.to_pbcm())
285 if action
in [rwdts
.QueryAction
.CREATE
, rwdts
.QueryAction
.UPDATE
]:
287 # Ensure the Cloud account has been specified if this is an NSR create
288 #if msg.id not in self._nsm.nsrs:
289 # if not msg.has_field("cloud_account"):
290 # raise NsrInstantiationFailed("Cloud account not specified in NSR")
292 # We do not allow scaling actions to occur if the NS is not in running state
293 #elif msg.has_field("scaling_group"):
294 # nsr = self._nsm.nsrs[msg.id]
295 # if nsr.state != NetworkServiceRecordState.RUNNING:
296 # raise ScalingOperationError("Unable to perform scaling action when NS is not in running state")
298 # if len(msg.scaling_group) > 1:
299 # raise ScalingOperationError("Only a single scaling group can be configured at a time")
301 # for group_msg in msg.scaling_group:
302 # num_new_group_instances = len(group_msg.instance)
303 # if num_new_group_instances > 1:
304 # raise ScalingOperationError("Only a single scaling instance can be created at a time")
306 # elif num_new_group_instances == 1:
307 # scale_group = nsr.scaling_groups[group_msg.scaling_group_name_ref]
308 # if len(scale_group.instances) == scale_group.max_instance_count:
309 # raise ScalingOperationError("Max instances for %s reached" % scale_group)
312 acg
.handle
.prepare_complete_ok(xact_info
.handle
)
315 self
._log
.debug("Registering for NSR config using xpath: %s",
316 NsrDtsHandler
.NSR_XPATH
)
318 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
319 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
320 self
._nsr
_regh
= acg
.register(xpath
=NsrDtsHandler
.NSR_XPATH
,
321 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY | rwdts
.Flag
.CACHE
,
322 on_prepare
=on_prepare
)
324 self
._scale
_regh
= acg
.register(
325 xpath
=NsrDtsHandler
.SCALE_INSTANCE_XPATH
,
326 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY | rwdts
.Flag
.CACHE
,
330 class XPaths(object):
332 def nsr_config(nsr_id
=None):
333 return ("C,/rw-project:project/nsr:ns-instance-config/nsr:nsr" +
334 ("[nsr:id={}]".format(quoted_key(nsr_id
)) if nsr_id
is not None else ""))
336 def scaling_group_instance(nsr_id
, group_name
, instance_id
):
337 return ("C,/rw-project:project/nsr:ns-instance-config/nsr:nsr" +
338 "[nsr:id={}]".format(quoted_key(nsr_id
)) +
339 "/nsr:scaling-group" +
340 "[nsr:scaling-group-name-ref={}]".format(quoted_key(group_name
)) +
342 "[nsr:id={}]".format(quoted_key(instance_id
))
346 class NsrHandlerTestCase(rift
.test
.dts
.AbstractDTSTest
):
348 DTS GI interface unittests
351 def configure_schema(cls
):
352 return NsrYang
.get_schema()
355 def configure_timeout(cls
):
358 def configure_test(self
, loop
, test_id
):
359 self
.log
.debug("STARTING - %s", self
.id())
360 self
.tinfo
= self
.new_tinfo(self
.id())
361 self
.dts
= rift
.tasklets
.DTS(self
.tinfo
, self
.schema
, self
.loop
)
362 self
.handler
= NsrDtsHandler(self
.dts
, self
.log
, self
.loop
, None)
364 self
.tinfo_c
= self
.new_tinfo(self
.id() + "_client")
365 self
.dts_c
= rift
.tasklets
.DTS(self
.tinfo_c
, self
.schema
, self
.loop
)
367 @rift.test
.dts
.async_test
368 def test_add_delete_ns(self
):
370 nsr1_uuid
= "nsr1_uuid" # str(uuid.uuid4())
371 nsr2_uuid
= "nsr2_uuid" # str(uuid.uuid4())
373 assert nsr1_uuid
!= nsr2_uuid
375 yield from self
.handler
.register()
376 yield from asyncio
.sleep(.5, loop
=self
.loop
)
378 self
.log
.debug("Creating NSR")
379 with self
.dts_c
.transaction() as xact
:
380 block
= xact
.block_create()
381 block
.add_query_update(
382 XPaths
.nsr_config(nsr1_uuid
),
383 NsrYang
.YangData_RwProject_Project_NsInstanceConfig_Nsr(id=nsr1_uuid
, name
="fu"),
384 flags
=rwdts
.XactFlag
.ADVISE | rwdts
.XactFlag
.TRACE
,
386 yield from block
.execute(now
=True)
388 yield from asyncio
.sleep(.5, loop
=self
.loop
)
390 with self
.dts_c
.transaction() as xact
:
391 block
= xact
.block_create()
392 block
.add_query_update(
393 XPaths
.scaling_group_instance(nsr1_uuid
, "group", 1234),
394 NsrYang
.YangData_RwProject_Project_NsInstanceConfig_Nsr_ScalingGroup_Instance(id=1234),
395 flags
=rwdts
.XactFlag
.ADVISE | rwdts
.XactFlag
.TRACE
,
397 yield from block
.execute(now
=True)
399 yield from asyncio
.sleep(.5, loop
=self
.loop
)
401 with self
.dts_c
.transaction() as xact
:
402 block
= xact
.block_create()
403 block
.add_query_delete(
404 XPaths
.scaling_group_instance(nsr1_uuid
, "group", 1234),
405 flags
=rwdts
.XactFlag
.ADVISE | rwdts
.XactFlag
.TRACE
,
407 yield from block
.execute(now
=True)
409 yield from asyncio
.sleep(.5, loop
=self
.loop
)
411 with self
.dts_c
.transaction() as xact
:
412 block
= xact
.block_create()
413 block
.add_query_create(
414 XPaths
.scaling_group_instance(nsr1_uuid
, "group", 12345),
415 NsrYang
.YangData_RwProject_Project_NsInstanceConfig_Nsr_ScalingGroup_Instance(id=12345),
416 flags
=rwdts
.XactFlag
.ADVISE | rwdts
.XactFlag
.TRACE
,
418 yield from block
.execute(now
=True)
420 yield from asyncio
.sleep(.5, loop
=self
.loop
)
422 group_ids
= self
.handler
.get_scale_group_instances(nsr2_uuid
, "group")
423 self
.log
.debug("Got group ids in nsr2 after adding 12345 to nsr1: %s", group_ids
)
424 group_ids
= self
.handler
.get_scale_group_instances(nsr1_uuid
, "group")
425 self
.log
.debug("Got group ids in nsr1 after adding 12345 to nsr1: %s", group_ids
)
426 assert group_ids
== {12345}
428 self
.log
.debug("\n\nADD A COMPLETELY DIFFERENT NSR\n")
429 with self
.dts_c
.transaction() as xact
:
430 block
= xact
.block_create()
431 block
.add_query_update(
432 XPaths
.nsr_config(nsr2_uuid
),
433 NsrYang
.YangData_RwProject_Project_NsInstanceConfig_Nsr(id=nsr2_uuid
, name
="fu2"),
434 flags
=rwdts
.XactFlag
.ADVISE | rwdts
.XactFlag
.TRACE
,
436 yield from block
.execute(now
=True)
438 yield from asyncio
.sleep(.5, loop
=self
.loop
)
440 group_ids
= self
.handler
.get_scale_group_instances(nsr2_uuid
, "group")
441 self
.log
.debug("Got group ids in nsr2 after adding new nsr: %s", group_ids
)
442 group_ids
= self
.handler
.get_scale_group_instances(nsr1_uuid
, "group")
443 self
.log
.debug("Got group ids in nsr1 after adding new nsr: %s", group_ids
)
444 assert group_ids
== {12345}
446 self
.log
.debug("\n\nDELETE A COMPLETELY DIFFERENT NSR\n")
447 with self
.dts_c
.transaction() as xact
:
448 block
= xact
.block_create()
449 block
.add_query_delete(
450 XPaths
.nsr_config(nsr2_uuid
),
451 flags
=rwdts
.XactFlag
.ADVISE | rwdts
.XactFlag
.TRACE
,
453 yield from block
.execute(now
=True)
455 yield from asyncio
.sleep(.5, loop
=self
.loop
)
457 group_ids
= self
.handler
.get_scale_group_instances(nsr2_uuid
, "group")
458 self
.log
.debug("Got group ids in nsr2 after deleting nsr2: %s", group_ids
)
459 group_ids
= self
.handler
.get_scale_group_instances(nsr1_uuid
, "group")
460 self
.log
.debug("Got group ids in nsr1 after deleting nsr2: %s", group_ids
)
461 assert group_ids
== {12345}
463 with self
.dts_c
.transaction() as xact
:
464 block
= xact
.block_create()
465 block
.add_query_delete(
466 XPaths
.scaling_group_instance(nsr1_uuid
, "group", 12345),
467 flags
=rwdts
.XactFlag
.ADVISE | rwdts
.XactFlag
.TRACE
,
469 yield from block
.execute(now
=True)
471 yield from asyncio
.sleep(2, loop
=self
.loop
)
474 runner
= xmlrunner
.XMLTestRunner(output
=os
.environ
["RIFT_MODULE_TEST"])
476 parser
= argparse
.ArgumentParser()
477 parser
.add_argument('-v', '--verbose', action
='store_true')
478 parser
.add_argument('-n', '--no-runner', action
='store_true')
479 args
, unittest_args
= parser
.parse_known_args()
483 NsrHandlerTestCase
.log_level
= logging
.DEBUG
if args
.verbose
else logging
.WARN
485 unittest
.main(testRunner
=runner
, argv
=[sys
.argv
[0]] + unittest_args
)
487 if __name__
== '__main__':