#
#   Copyright 2016 RIFT.IO Inc
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
#
# @author Varun Prasad (varun.prasad@riftio.com)
27 from . import subscribers
as monp_subscriber
30 gi
.require_version('RwDts', '1.0')
31 gi
.require_version('RwLaunchpadYang', '1.0')
32 gi
.require_version('NsrYang', '1.0')
34 from gi
.repository
import (
39 import rift
.mano
.cloud
40 import rift
.mano
.dts
as subscriber
45 class AutoScalerTasklet(rift
.tasklets
.Tasklet
, engine
.ScalingPolicy
.Delegate
):
46 """The main task of this Tasklet is to listen for NSR changes and once the
47 NSR is configured, ScalingPolicy is created.
49 def __init__(self
, *args
, **kwargs
):
52 super().__init
__(*args
, **kwargs
)
54 self
.monparam_store
= None
57 self
.nsr_monp_subscribers
= {}
58 self
.instance_id_store
= collections
.defaultdict(list)
60 except Exception as e
:
66 self
.log
.debug("Registering with dts")
68 self
.dts
= rift
.tasklets
.DTS(
70 RwLaunchpadYang
.get_schema(),
72 self
.on_dts_state_change
75 self
.store
= subscriber
.SubscriberStore
.from_tasklet(self
)
76 self
.nsr_sub
= subscriber
.NsrCatalogSubscriber(self
.log
, self
.dts
, self
.loop
, self
.handle_nsr
)
78 self
.log
.debug("Created DTS Api GI Object: %s", self
.dts
)
83 except Exception as e
:
88 self
.log
.debug("creating vnfr subscriber")
89 yield from self
.store
.register()
90 yield from self
.nsr_sub
.register()
97 def on_dts_state_change(self
, state
):
98 """Handle DTS state change
100 Take action according to current DTS state to transition application
101 into the corresponding application state
104 state - current dts state
108 rwdts
.State
.INIT
: rwdts
.State
.REGN_COMPLETE
,
109 rwdts
.State
.CONFIG
: rwdts
.State
.RUN
,
113 rwdts
.State
.INIT
: self
.init
,
114 rwdts
.State
.RUN
: self
.run
,
117 # Transition application to next state
118 handler
= handlers
.get(state
, None)
119 if handler
is not None:
122 # Transition dts to next state
123 next_state
= switch
.get(state
, None)
124 if next_state
is not None:
125 self
.dts
.handle
.set_state(next_state
)
def scale_in(self, scaling_group_name, nsr_id):
    """Delegate callback for scale in requests.

    Logs the request and schedules a task on ``self.loop``. The task takes
    the most recently stored instance id for
    ``(scaling_group_name, nsr_id)`` and issues the ``/nsr:exec-scale-in``
    RPC for it. This method returns as soon as the task is scheduled.

    If no instance id is stored for the pair, the task logs that fact and
    finishes without sending an RPC, instead of failing with
    ``IndexError`` on an empty list.

    Args:
        scaling_group_name (str): Scaling group name to be scaled in
        nsr_id: Id of the NSR; sent to the RPC as ``nsr_id_ref``
    """
    self.log.info("Sending a scaling-in request for {} in NSR: {}".format(
            scaling_group_name,
            nsr_id))

    # NOTE(review): the nested coroutine header was dropped from the
    # listing this block was recovered from; it is restored from the
    # create_task(_scale_in()) call and the `yield from` below -- confirm.
    @asyncio.coroutine
    def _scale_in():
        # .get() rather than indexing, so that a miss does not add an
        # empty entry to the defaultdict-backed store.
        instance_ids = self.instance_id_store.get((scaling_group_name, nsr_id))
        if not instance_ids:
            self.log.info(
                "No scaled-out instance recorded for {} in NSR: {}; "
                "skipping scale-in".format(scaling_group_name, nsr_id))
            return

        instance_id = instance_ids.pop()

        # Trigger an rpc
        rpc_ip = NsrYang.YangInput_Nsr_ExecScaleIn.from_dict({
            'nsr_id_ref': nsr_id,
            'instance_id': instance_id,
            'scaling_group_name_ref': scaling_group_name})

        # NOTE(review): the trailing query_rpc arguments were lost in the
        # listing; `0, rpc_ip` mirrors the exec-scale-out call -- confirm.
        yield from self.dts.query_rpc(
            "/nsr:exec-scale-in",
            0,
            rpc_ip)

    self.loop.create_task(_scale_in())
def scale_out(self, scaling_group_name, nsr_id):
    """Delegate callback for scale out requests.

    Logs the request and schedules a task on ``self.loop``. The task
    issues the ``/nsr:exec-scale-out`` RPC and, for every result it
    yields, appends the returned ``instance_id`` to
    ``self.instance_id_store`` under the key
    ``(scaling_group_name, nsr_id)``. This method returns as soon as the
    task is scheduled.

    Args:
        scaling_group_name (str): Scaling group name
        nsr_id: Id of the NSR; sent to the RPC as ``nsr_id_ref``
    """
    self.log.info("Sending a scaling-out request for {} in NSR: {}".format(
            scaling_group_name,
            nsr_id))

    # NOTE(review): the nested coroutine header and the loop header were
    # dropped from the listing this block was recovered from; both are
    # restored from the names the surviving statements use
    # (_scale_out, itr, res) -- confirm.
    @asyncio.coroutine
    def _scale_out():
        # Trigger an rpc
        rpc_ip = NsrYang.YangInput_Nsr_ExecScaleOut.from_dict({
            'nsr_id_ref': nsr_id,
            'scaling_group_name_ref': scaling_group_name})

        itr = yield from self.dts.query_rpc("/nsr:exec-scale-out", 0, rpc_ip)

        key = (scaling_group_name, nsr_id)
        for res in itr:
            result = yield from res
            rpc_out = result.result
            # Remember the instance so a later scale-in can pop it.
            self.instance_id_store[key].append(rpc_out.instance_id)

            # NOTE(review): the first format argument was lost in the
            # listing; the group name matches the message text -- confirm.
            self.log.info("Created new scaling group {} with instance id {}".format(
                    scaling_group_name,
                    rpc_out.instance_id))

    self.loop.create_task(_scale_out())
189 def handle_nsr(self
, nsr
, action
):
190 """Callback for NSR opdata changes. Creates a publisher for every
191 NS that moves to config state.
194 nsr (RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr): Ns Opdata
195 action (rwdts.QueryAction): Action type of the change.
198 if nsr
.config_status
== "configured" and nsr
.ns_instance_config_ref
not in self
.nsr_monp_subscribers
:
199 nsr_id
= nsr
.ns_instance_config_ref
200 self
.nsr_monp_subscribers
[nsr_id
] = []
201 nsd
= self
.store
.get_nsd(nsr
.nsd_ref
)
204 for scaling_group
in nsd
.scaling_group_descriptor
:
205 for policy_cfg
in scaling_group
.scaling_policy
:
206 policy
= engine
.ScalingPolicy(
207 self
.log
, self
.dts
, self
.loop
,
208 nsr
.ns_instance_config_ref
,
214 self
.nsr_monp_subscribers
[nsr_id
].append(policy
)
215 yield from policy
.register()
217 self
.loop
.create_task(task())
221 if nsr
.ns_instance_config_ref
in self
.nsr_monp_subscribers
:
222 policies
= self
.nsr_monp_subscribers
[nsr
.ns_instance_config_ref
]
223 for policy
in policies
:
225 del self
.nsr_monp_subscribers
[nsr
.ns_instance_config_ref
]
227 if action
in [rwdts
.QueryAction
.CREATE
, rwdts
.QueryAction
.UPDATE
]:
229 elif action
== rwdts
.QueryAction
.DELETE
: