4 # Copyright 2016 RIFT.IO Inc
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
30 import gi
.repository
.RwDts
as rwdts
31 import gi
.repository
.RwNsmYang
as rwnsmyang
32 import gi
.repository
.NsrYang
as NsrYang
33 import gi
.repository
.RwNsrYang
as RwNsrYang
34 import gi
.repository
.RwTypes
as RwTypes
35 import gi
.repository
.ProtobufC
as ProtobufC
36 import gi
.repository
.RwResourceMgrYang
as RwResourceMgrYang
37 import gi
.repository
.RwLaunchpadYang
as launchpadyang
# Compatibility shim: asyncio.ensure_future() first appeared in Python 3.4.4;
# older interpreters expose the same function under the name "async".
# The legacy name is fetched with getattr() because "async" has been a
# reserved keyword since Python 3.7, so writing the attribute access literally
# (asyncio.<async>) would stop this whole module from compiling on modern
# interpreters even though the branch is never taken there.
if sys.version_info < (3, 4, 4):
    asyncio.ensure_future = getattr(asyncio, "async")
48 class NsrDtsHandler(object):
49 """ The network service DTS handler """
50 NSR_XPATH
= "C,/rw-project:project/nsr:ns-instance-config/nsr:nsr"
51 SCALE_INSTANCE_XPATH
= "C,/rw-project:project/nsr:ns-instance-config/nsr:nsr/nsr:scaling-group/nsr:instance"
53 def __init__(self
, dts
, log
, loop
, nsm
):
60 self
._scale
_regh
= None
64 """ Return the NS manager instance """
def get_scale_group_instances(self, nsr_id, group_name):
    """ Return the ids of the scaling group instances known to the scale registration

    Every element of the scaling instance registration (self._scale_regh) is
    fetched together with its keyspec; an element is kept only when its keyspec
    resolves to the requested NSR id and scaling group name.

    Arguments:
        nsr_id - id of the network service record to match
        group_name - scaling group name reference to match

    Returns:
        A set holding the id of every matching instance.  The set is empty
        when nothing matches or when the registration handle has not been
        created yet (it is None until registration has run).
    """
    # NOTE(review): the accumulator set, the `continue` guards and the return
    # statements are restored from how callers use the result (it is compared
    # against a set of ids) -- confirm against the upstream file.

    def nsr_id_from_keyspec(ks):
        """ Resolve the NSR id encoded in an element keyspec """
        nsr_path_entry = NsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr.schema().keyspec_to_entry(ks)
        return nsr_path_entry.key00.id

    def group_name_from_keyspec(ks):
        """ Resolve the scaling group name reference encoded in an element keyspec """
        group_path_entry = NsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr_ScalingGroup.schema().keyspec_to_entry(ks)
        return group_path_entry.key00.scaling_group_name_ref

    instance_ids = set()

    # Nothing has been registered yet, so there is nothing to report.
    if self._scale_regh is None:
        return instance_ids

    for instance_cfg, keyspec in self._scale_regh.get_xact_elements(include_keyspec=True):
        # Skip instances that belong to another NSR ...
        if nsr_id_from_keyspec(keyspec) != nsr_id:
            continue

        # ... or to another scaling group of the same NSR.
        if group_name_from_keyspec(keyspec) != group_name:
            continue

        instance_ids.add(instance_cfg.id)

    return instance_ids
95 """ Register for Nsr create/update/delete/read requests from dts """
97 def nsr_id_from_keyspec(ks
):
98 nsr_path_entry
= NsrYang
.YangData_RwProject_Project_NsInstanceConfig_Nsr
.schema().keyspec_to_entry(ks
)
99 nsr_id
= nsr_path_entry
.key00
.id
102 def group_name_from_keyspec(ks
):
103 group_path_entry
= NsrYang
.YangData_RwProject_Project_NsInstanceConfig_Nsr_ScalingGroup
.schema().keyspec_to_entry(ks
)
104 group_name
= group_path_entry
.key00
.scaling_group_name_ref
107 def is_instance_in_reg_elements(nsr_id
, group_name
, instance_id
):
108 """ Return boolean indicating if scaling group instance was already commited previously.
110 By looking at the existing elements in this registration handle (elements not part
111 of this current xact), we can tell if the instance was configured previously without
112 keeping any application state.
114 for instance_cfg
, keyspec
in self
._nsr
_regh
.get_xact_elements(include_keyspec
=True):
115 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
116 elem_group_name
= group_name_from_keyspec(keyspec
)
118 if elem_nsr_id
!= nsr_id
or group_name
!= elem_group_name
:
121 if instance_cfg
.id == instance_id
:
126 def get_scale_group_instance_delta(nsr_id
, group_name
, xact
):
128 #1. Find all elements in the transaction add to the "added"
129 #2. Find matching elements in current elements, remove from "added".
130 #3. Find elements only in current, add to "deleted"
133 for instance_cfg
, keyspec
in self
._scale
_regh
.get_xact_elements(xact
, include_keyspec
=True):
134 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
135 if elem_nsr_id
!= nsr_id
:
138 elem_group_name
= group_name_from_keyspec(keyspec
)
139 if elem_group_name
!= group_name
:
142 xact_ids
.add(instance_cfg
.id)
145 for instance_cfg
, keyspec
in self
._scale
_regh
.get_xact_elements(include_keyspec
=True):
146 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
147 if elem_nsr_id
!= nsr_id
:
150 elem_group_name
= group_name_from_keyspec(keyspec
)
151 if elem_group_name
!= group_name
:
154 current_ids
.add(instance_cfg
.id)
157 "added": xact_ids
- current_ids
,
158 "deleted": current_ids
- xact_ids
def get_add_delete_update_cfgs(dts_member_reg, xact, key_name):
    """ Split a registration's configs into added, deleted and updated ones

    Unfortunately, it is currently difficult to figure out what exactly has
    changed in this xact without Pbdelta support (RIFT-4916).  As a
    workaround, the pre and post xact elements are fetched and compared
    by key.

    Arguments:
        dts_member_reg - registration handle; get_xact_elements(xact) supplies
                         the elements as of the xact and the `elements`
                         attribute supplies the current elements
        xact - transaction handed to get_xact_elements()
        key_name - name of the attribute that identifies a config

    Returns:
        A tuple (added_cfgs, deleted_cfgs, updated_cfgs) of lists:
          added_cfgs   - xact configs whose key is not among the current ones
          deleted_cfgs - current configs whose key is not in the xact
          updated_cfgs - xact configs whose key exists on both sides but
                         which compare unequal to the current config
        Each list follows the iteration order of the key maps built below
        (insertion order on Python 3.7+) instead of set iteration order,
        which for string keys changes from run to run.
    """
    # Index both views by key; when a key repeats, the last element wins.
    xact_key_map = {getattr(cfg, key_name): cfg
                    for cfg in dts_member_reg.get_xact_elements(xact)}
    curr_key_map = {getattr(cfg, key_name): cfg
                    for cfg in dts_member_reg.elements}

    # Present in the xact only
    added_cfgs = [cfg for key, cfg in xact_key_map.items()
                  if key not in curr_key_map]

    # Present in the current elements only
    deleted_cfgs = [cfg for key, cfg in curr_key_map.items()
                    if key not in xact_key_map]

    # Present on both sides, but with different content
    updated_cfgs = [cfg for key, cfg in xact_key_map.items()
                    if key in curr_key_map and cfg != curr_key_map[key]]

    return added_cfgs, deleted_cfgs, updated_cfgs
187 def on_apply(dts
, acg
, xact
, action
, scratch
):
188 """Apply the configuration"""
189 def handle_create_nsr(msg
):
190 # Handle create nsr requests """
191 # Do some validations
192 if not msg
.has_field("nsd_ref"):
193 err
= "NSD reference not provided"
195 raise NetworkServiceRecordError(err
)
197 self
._log
.info("Creating NetworkServiceRecord %s from nsd_id %s",
200 #nsr = self.nsm.create_nsr(msg)
203 def handle_delete_nsr(msg
):
205 def delete_instantiation(ns_id
):
206 """ Delete instantiation """
208 #with self._dts.transaction() as xact:
209 #yield from self._nsm.terminate_ns(ns_id, xact)
211 # Handle delete NSR requests
212 self
._log
.info("Delete req for NSR Id: %s received", msg
.id)
213 # Terminate the NSR instance
214 #nsr = self._nsm.get_ns_by_nsr_id(msg.id)
216 #nsr.set_state(NetworkServiceRecordState.TERMINATE_RCVD)
217 #event_descr = "Terminate rcvd for NS Id:%s" % msg.id
218 #nsr.record_event("terminate-rcvd", event_descr)
220 #self._loop.create_task(delete_instantiation(msg.id))
223 def begin_instantiation(nsr
):
224 # Begin instantiation
226 #self._log.info("Beginning NS instantiation: %s", nsr.id)
227 #yield from self._nsm.instantiate_ns(nsr.id, xact)
229 self
._log
.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
230 xact
, action
, scratch
)
232 if action
== rwdts
.AppconfAction
.INSTALL
and xact
.id is None:
233 self
._log
.debug("No xact handle. Skipping apply config")
236 (added_msgs
, deleted_msgs
, updated_msgs
) = get_add_delete_update_cfgs(self
._nsr
_regh
, xact
, "id")
238 for msg
in added_msgs
:
239 self
._log
.info("Create NSR received in on_apply to instantiate NS:%s", msg
.id)
240 #if msg.id not in self._nsm.nsrs:
241 # self._log.info("Create NSR received in on_apply to instantiate NS:%s", msg.id)
242 # nsr = handle_create_nsr(msg)
243 # self._loop.create_task(begin_instantiation(nsr))
245 for msg
in deleted_msgs
:
246 self
._log
.info("Delete NSR received in on_apply to terminate NS:%s", msg
.id)
248 handle_delete_nsr(msg
)
250 self
._log
.exception("Failed to terminate NS:%s", msg
.id)
252 for msg
in updated_msgs
:
253 self
._log
.info("Update NSR received in on_apply to change scaling groups in NS:%s", msg
.id)
255 for group
in msg
.scaling_group
:
256 instance_delta
= get_scale_group_instance_delta(msg
.id, group
.scaling_group_name_ref
, xact
)
257 self
._log
.debug("Got NSR:%s scale group instance delta: %s", msg
.id, instance_delta
)
259 #for instance_id in instance_delta["added"]:
260 # self._nsm.scale_nsr_out(msg.id, group.scaling_group_name_ref, instance_id, xact)
262 #for instance_id in instance_delta["deleted"]:
263 # self._nsm.scale_nsr_in(msg.id, group.scaling_group_name_ref, instance_id)
266 return RwTypes
.RwStatus
.SUCCESS
269 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
270 """ Prepare calllback from DTS for NSR """
272 xpath
= ks_path
.to_xpath(NsrYang
.get_schema())
273 action
= xact_info
.query_action
275 "Got Nsr prepare callback (xact: %s) (action: %s) (info: %s), %s:%s)",
276 xact
, action
, xact_info
, xpath
, msg
279 fref
= ProtobufC
.FieldReference
.alloc()
280 fref
.goto_whole_message(msg
.to_pbcm())
282 if action
in [rwdts
.QueryAction
.CREATE
, rwdts
.QueryAction
.UPDATE
]:
284 # Ensure the Cloud account has been specified if this is an NSR create
285 #if msg.id not in self._nsm.nsrs:
286 # if not msg.has_field("cloud_account"):
287 # raise NsrInstantiationFailed("Cloud account not specified in NSR")
289 # We do not allow scaling actions to occur if the NS is not in running state
290 #elif msg.has_field("scaling_group"):
291 # nsr = self._nsm.nsrs[msg.id]
292 # if nsr.state != NetworkServiceRecordState.RUNNING:
293 # raise ScalingOperationError("Unable to perform scaling action when NS is not in running state")
295 # if len(msg.scaling_group) > 1:
296 # raise ScalingOperationError("Only a single scaling group can be configured at a time")
298 # for group_msg in msg.scaling_group:
299 # num_new_group_instances = len(group_msg.instance)
300 # if num_new_group_instances > 1:
301 # raise ScalingOperationError("Only a single scaling instance can be created at a time")
303 # elif num_new_group_instances == 1:
304 # scale_group = nsr.scaling_groups[group_msg.scaling_group_name_ref]
305 # if len(scale_group.instances) == scale_group.max_instance_count:
306 # raise ScalingOperationError("Max instances for %s reached" % scale_group)
309 acg
.handle
.prepare_complete_ok(xact_info
.handle
)
312 self
._log
.debug("Registering for NSR config using xpath: %s",
313 NsrDtsHandler
.NSR_XPATH
)
315 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
316 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
317 self
._nsr
_regh
= acg
.register(xpath
=NsrDtsHandler
.NSR_XPATH
,
318 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY | rwdts
.Flag
.CACHE
,
319 on_prepare
=on_prepare
)
321 self
._scale
_regh
= acg
.register(
322 xpath
=NsrDtsHandler
.SCALE_INSTANCE_XPATH
,
323 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY | rwdts
.Flag
.CACHE
,
class XPaths(object):
    """ Builders for the NSR configuration xpaths used by the test case in this file """

    @staticmethod
    def nsr_config(nsr_id=None):
        """ Return the config xpath of the NSR list, or of a single NSR

        Arguments:
            nsr_id - id of the NSR to address; when None the xpath carries no
                     key predicate

        Returns:
            The xpath string
        """
        xpath = "C,/rw-project:project/nsr:ns-instance-config/nsr:nsr"
        if nsr_id is not None:
            xpath += "[nsr:id='{}']".format(nsr_id)
        return xpath

    @staticmethod
    def scaling_group_instance(nsr_id, group_name, instance_id):
        """ Return the config xpath of one scaling group instance of an NSR

        Arguments:
            nsr_id - id of the NSR owning the scaling group
            group_name - scaling group name reference
            instance_id - id of the instance within the scaling group

        Returns:
            The xpath string
        """
        return ("C,/rw-project:project/nsr:ns-instance-config/nsr:nsr" +
                "[nsr:id='{}']".format(nsr_id) +
                "/nsr:scaling-group" +
                "[nsr:scaling-group-name-ref='{}']".format(group_name) +
                # The instance step keeps this path in line with the
                # scaling instance path the DTS handler registers for.
                "/nsr:instance" +
                "[nsr:id='{}']".format(instance_id))
343 class NsrHandlerTestCase(rift
.test
.dts
.AbstractDTSTest
):
345 DTS GI interface unittests
348 def configure_schema(cls
):
349 return NsrYang
.get_schema()
352 def configure_timeout(cls
):
355 def configure_test(self
, loop
, test_id
):
356 self
.log
.debug("STARTING - %s", self
.id())
357 self
.tinfo
= self
.new_tinfo(self
.id())
358 self
.dts
= rift
.tasklets
.DTS(self
.tinfo
, self
.schema
, self
.loop
)
359 self
.handler
= NsrDtsHandler(self
.dts
, self
.log
, self
.loop
, None)
361 self
.tinfo_c
= self
.new_tinfo(self
.id() + "_client")
362 self
.dts_c
= rift
.tasklets
.DTS(self
.tinfo_c
, self
.schema
, self
.loop
)
364 @rift.test
.dts
.async_test
365 def test_add_delete_ns(self
):
367 nsr1_uuid
= "nsr1_uuid" # str(uuid.uuid4())
368 nsr2_uuid
= "nsr2_uuid" # str(uuid.uuid4())
370 assert nsr1_uuid
!= nsr2_uuid
372 yield from self
.handler
.register()
373 yield from asyncio
.sleep(.5, loop
=self
.loop
)
375 self
.log
.debug("Creating NSR")
376 with self
.dts_c
.transaction() as xact
:
377 block
= xact
.block_create()
378 block
.add_query_update(
379 XPaths
.nsr_config(nsr1_uuid
),
380 NsrYang
.YangData_RwProject_Project_NsInstanceConfig_Nsr(id=nsr1_uuid
, name
="fu"),
381 flags
=rwdts
.XactFlag
.ADVISE | rwdts
.XactFlag
.TRACE
,
383 yield from block
.execute(now
=True)
385 yield from asyncio
.sleep(.5, loop
=self
.loop
)
387 with self
.dts_c
.transaction() as xact
:
388 block
= xact
.block_create()
389 block
.add_query_update(
390 XPaths
.scaling_group_instance(nsr1_uuid
, "group", 1234),
391 NsrYang
.YangData_RwProject_Project_NsInstanceConfig_Nsr_ScalingGroup_Instance(id=1234),
392 flags
=rwdts
.XactFlag
.ADVISE | rwdts
.XactFlag
.TRACE
,
394 yield from block
.execute(now
=True)
396 yield from asyncio
.sleep(.5, loop
=self
.loop
)
398 with self
.dts_c
.transaction() as xact
:
399 block
= xact
.block_create()
400 block
.add_query_delete(
401 XPaths
.scaling_group_instance(nsr1_uuid
, "group", 1234),
402 flags
=rwdts
.XactFlag
.ADVISE | rwdts
.XactFlag
.TRACE
,
404 yield from block
.execute(now
=True)
406 yield from asyncio
.sleep(.5, loop
=self
.loop
)
408 with self
.dts_c
.transaction() as xact
:
409 block
= xact
.block_create()
410 block
.add_query_create(
411 XPaths
.scaling_group_instance(nsr1_uuid
, "group", 12345),
412 NsrYang
.YangData_RwProject_Project_NsInstanceConfig_Nsr_ScalingGroup_Instance(id=12345),
413 flags
=rwdts
.XactFlag
.ADVISE | rwdts
.XactFlag
.TRACE
,
415 yield from block
.execute(now
=True)
417 yield from asyncio
.sleep(.5, loop
=self
.loop
)
419 group_ids
= self
.handler
.get_scale_group_instances(nsr2_uuid
, "group")
420 self
.log
.debug("Got group ids in nsr2 after adding 12345 to nsr1: %s", group_ids
)
421 group_ids
= self
.handler
.get_scale_group_instances(nsr1_uuid
, "group")
422 self
.log
.debug("Got group ids in nsr1 after adding 12345 to nsr1: %s", group_ids
)
423 assert group_ids
== {12345}
425 self
.log
.debug("\n\nADD A COMPLETELY DIFFERENT NSR\n")
426 with self
.dts_c
.transaction() as xact
:
427 block
= xact
.block_create()
428 block
.add_query_update(
429 XPaths
.nsr_config(nsr2_uuid
),
430 NsrYang
.YangData_RwProject_Project_NsInstanceConfig_Nsr(id=nsr2_uuid
, name
="fu2"),
431 flags
=rwdts
.XactFlag
.ADVISE | rwdts
.XactFlag
.TRACE
,
433 yield from block
.execute(now
=True)
435 yield from asyncio
.sleep(.5, loop
=self
.loop
)
437 group_ids
= self
.handler
.get_scale_group_instances(nsr2_uuid
, "group")
438 self
.log
.debug("Got group ids in nsr2 after adding new nsr: %s", group_ids
)
439 group_ids
= self
.handler
.get_scale_group_instances(nsr1_uuid
, "group")
440 self
.log
.debug("Got group ids in nsr1 after adding new nsr: %s", group_ids
)
441 assert group_ids
== {12345}
443 self
.log
.debug("\n\nDELETE A COMPLETELY DIFFERENT NSR\n")
444 with self
.dts_c
.transaction() as xact
:
445 block
= xact
.block_create()
446 block
.add_query_delete(
447 XPaths
.nsr_config(nsr2_uuid
),
448 flags
=rwdts
.XactFlag
.ADVISE | rwdts
.XactFlag
.TRACE
,
450 yield from block
.execute(now
=True)
452 yield from asyncio
.sleep(.5, loop
=self
.loop
)
454 group_ids
= self
.handler
.get_scale_group_instances(nsr2_uuid
, "group")
455 self
.log
.debug("Got group ids in nsr2 after deleting nsr2: %s", group_ids
)
456 group_ids
= self
.handler
.get_scale_group_instances(nsr1_uuid
, "group")
457 self
.log
.debug("Got group ids in nsr1 after deleting nsr2: %s", group_ids
)
458 assert group_ids
== {12345}
460 with self
.dts_c
.transaction() as xact
:
461 block
= xact
.block_create()
462 block
.add_query_delete(
463 XPaths
.scaling_group_instance(nsr1_uuid
, "group", 12345),
464 flags
=rwdts
.XactFlag
.ADVISE | rwdts
.XactFlag
.TRACE
,
466 yield from block
.execute(now
=True)
468 yield from asyncio
.sleep(2, loop
=self
.loop
)
471 runner
= xmlrunner
.XMLTestRunner(output
=os
.environ
["RIFT_MODULE_TEST"])
473 parser
= argparse
.ArgumentParser()
474 parser
.add_argument('-v', '--verbose', action
='store_true')
475 parser
.add_argument('-n', '--no-runner', action
='store_true')
476 args
, unittest_args
= parser
.parse_known_args()
480 NsrHandlerTestCase
.log_level
= logging
.DEBUG
if args
.verbose
else logging
.WARN
482 unittest
.main(testRunner
=runner
, argv
=[sys
.argv
[0]] + unittest_args
)
484 if __name__
== '__main__':