+ SCALE_OUT_INPUT_XPATH = "I,/nsr:exec-scale-out"
+ SCALE_OUT_OUTPUT_XPATH = "O,/nsr:exec-scale-out"
+
+ ACTION = Enum('ACTION', 'SCALE_IN SCALE_OUT')
+
+ def __init__(self, log, dts, loop, nsm, callback=None):
+ super().__init__(log, dts, loop, nsm._project)
+ self._nsm = nsm
+ self.callback = callback
+ self.last_instance_id = defaultdict(int)
+
+ self._reg_in = None
+ self._reg_out = None
+
+ @asyncio.coroutine
+ def register(self):
+
+ def send_err_msg(err_msg, xact_info, ks_path, e=False):
+ xpath = ks_path.to_xpath(NsrYang.get_schema())
+ if e:
+ self._log.exception(err_msg)
+ else:
+ self._log.error(err_msg)
+ xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE,
+ xpath,
+ err_msg)
+ xact_info.respond_xpath(rwdts.XactRspCode.NACK)
+
+ @asyncio.coroutine
+ def on_scale_in_prepare(xact_info, action, ks_path, msg):
+ assert action == rwdts.QueryAction.RPC
+
+ self._log.debug("Scale in called: {}".format(msg.as_dict()))
+ if not self.project.rpc_check(msg, xact_info):
+ return
+
+ try:
+ rpc_op = NsrYang.YangOutput_Nsr_ExecScaleIn.from_dict({
+ "instance_id": msg.instance_id})
+
+ nsr = self._nsm.nsrs[msg.nsr_id_ref]
+ if nsr.state != NetworkServiceRecordState.RUNNING:
+ errmsg = ("Unable to perform scaling action when NS {}({}) not in running state".
+ format(nsr.name, nsr.id))
+ send_err_msg(errmsg, xact_info, ks_path)
+ return
+
+ xact_info.respond_xpath(
+ rwdts.XactRspCode.ACK,
+ self.__class__.SCALE_IN_OUTPUT_XPATH,
+ rpc_op)
+
+ if self.callback:
+ self.callback(xact_info.xact, msg, self.ACTION.SCALE_IN)
+
+ except Exception as e:
+ errmsg = ("Exception doing scale in using {}: {}".
+ format(msg, e))
+ send_err_msg(errmsg, xact_info, ks_path, e=True)
+
+ @asyncio.coroutine
+ def on_scale_out_prepare(xact_info, action, ks_path, msg):
+ assert action == rwdts.QueryAction.RPC
+
+ self._log.debug("Scale out called: {}".format(msg.as_dict()))
+ if not self.project.rpc_check(msg, xact_info):
+ return
+
+ try:
+ scaling_group = msg.scaling_group_name_ref
+ if not msg.instance_id:
+ last_instance_id = self.last_instance_id[scale_group]
+ msg.instance_id = last_instance_id + 1
+ self.last_instance_id[scale_group] += 1
+
+ nsr = self._nsm.nsrs[msg.nsr_id_ref]
+ if nsr.state != NetworkServiceRecordState.RUNNING:
+ errmsg = ("Unable to perform scaling action when NS {}({}) not in running state".
+ format(nsr.name, nsr.id))
+ send_err_msg(errmsg, xact_info, ks_path)
+ return
+
+ rpc_op = NsrYang.YangOutput_Nsr_ExecScaleOut.from_dict({
+ "instance_id": msg.instance_id})
+
+ xact_info.respond_xpath(
+ rwdts.XactRspCode.ACK,
+ self.__class__.SCALE_OUT_OUTPUT_XPATH,
+ rpc_op)
+
+ if self.callback:
+ self.callback(xact_info.xact, msg, self.ACTION.SCALE_OUT)
+
+ except Exception as e:
+ errmsg = ("Exception doing scale in using {}: {}".
+ format(msg, e))
+ send_err_msg(errmsg, xact_info, ks_path, e=True)
+
+ self._reg_in = yield from self.dts.register(
+ xpath=self.__class__.SCALE_IN_INPUT_XPATH,
+ handler=rift.tasklets.DTS.RegistrationHandler(
+ on_prepare=on_scale_in_prepare),
+ flags=rwdts.Flag.PUBLISHER)
+
+ self._reg_out = yield from self.dts.register(
+ xpath=self.__class__.SCALE_OUT_INPUT_XPATH,
+ handler=rift.tasklets.DTS.RegistrationHandler(
+ on_prepare=on_scale_out_prepare),
+ flags=rwdts.Flag.PUBLISHER)
+
+ def deregister(self):
+ if self._reg_in:
+ self._reg_in.deregister()
+ self._reg_in = None
+
+ if self._reg_out:
+ self._reg_out.deregister()
+ self._reg_out = None
+
+
+class NsmProject(ManoProject):
+
+ def __init__(self, name, tasklet, **kw):
+ super(NsmProject, self).__init__(tasklet.log, name)
+ self.update(tasklet)