1741a58469b81059295f3f456b21d72d8fd54e15
[osm/SO.git] / rwlaunchpad / plugins / rwautoscaler / rift / tasklets / rwautoscaler / rwautoscaler.py
1 """
2 #
3 # Copyright 2016-2017 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 @file rwautoscaler.py
19 @author Varun Prasad (varun.prasad@riftio.com)
20 @date 01-Jul-2016
21
22 """
23 import asyncio
24 import collections
25
26 from . import engine
27 from . import subscribers as monp_subscriber
28
29 import gi
30 gi.require_version('RwDts', '1.0')
31 gi.require_version('RwLaunchpadYang', '1.0')
32 gi.require_version('NsrYang', '1.0')
33
34 from gi.repository import (
35 RwDts as rwdts,
36 NsrYang,
37 RwLaunchpadYang,
38 ProtobufC)
39 import rift.mano.cloud
40 import rift.mano.dts as subscriber
41 import rift.tasklets
42 from rift.mano.utils.project import (
43 ManoProject,
44 ProjectHandler,
45 )
46
47 class AutoScalerProject(ManoProject, engine.ScalingPolicy.Delegate):
48
49 def __init__(self, name, tasklet, **kw):
50 super(AutoScalerProject, self).__init__(tasklet.log, name)
51 self.update(tasklet)
52
53 self.store = None
54 self.monparam_store = None
55 self.nsr_sub = None
56 self.nsr_monp_subscribers = {}
57 self.instance_id_store = collections.defaultdict(list)
58
59 self.store = subscriber.SubscriberStore.from_project(self)
60 self.nsr_sub = subscriber.NsrCatalogSubscriber(self.log, self.dts, self.loop,
61 self, self.handle_nsr)
62
63 def deregister(self):
64 self.log.debug("De-register project {}".format(self.name))
65 self.nsr_sub.deregister()
66 self.store.deregister()
67
68
69 @asyncio.coroutine
70 def register (self):
71 self.log.debug("creating vnfr subscriber")
72 yield from self.store.register()
73 yield from self.nsr_sub.register()
74
75 def scale_in(self, scaling_group_name, nsr_id, instance_id):
76 """Delegate callback
77
78 Args:
79 scaling_group_name (str): Scaling group name to be scaled in
80 nsr_id (str): NSR id
81 instance_id (str): Instance id of the scaling group
82
83 """
84 self.log.info("Sending a scaling-in request for {} in NSR: {}".format(
85 scaling_group_name,
86 nsr_id))
87
88 @asyncio.coroutine
89 def _scale_in():
90
91 # Purposely ignore passed instance_id
92 instance_id_ = self.instance_id_store[(scaling_group_name, nsr_id)].pop()
93 # Trigger an rpc
94 rpc_ip = NsrYang.YangInput_Nsr_ExecScaleIn.from_dict({
95 'project_name': self.name,
96 'nsr_id_ref': nsr_id,
97 'instance_id': instance_id_,
98 'scaling_group_name_ref': scaling_group_name})
99
100 rpc_out = yield from self.dts.query_rpc(
101 "/nsr:exec-scale-in",
102 0,
103 rpc_ip)
104
105 # Check for existing scaled-out VNFs if any.
106 if len(self.instance_id_store):
107 self.loop.create_task(_scale_in())
108
109 def scale_out(self, scaling_group_name, nsr_id):
110 """Delegate callback for scale out requests
111
112 Args:
113 scaling_group_name (str): Scaling group name
114 nsr_id (str): NSR ID
115 """
116 self.log.info("Sending a scaling-out request for {} in NSR: {}".format(
117 scaling_group_name,
118 nsr_id))
119
120 @asyncio.coroutine
121 def _scale_out():
122 # Trigger an rpc
123 rpc_ip = NsrYang.YangInput_Nsr_ExecScaleOut.from_dict({
124 'project_name': self.name,
125 'nsr_id_ref': nsr_id ,
126 'scaling_group_name_ref': scaling_group_name})
127
128 itr = yield from self.dts.query_rpc("/nsr:exec-scale-out", 0, rpc_ip)
129
130 key = (scaling_group_name, nsr_id)
131 for res in itr:
132 result = yield from res
133 rpc_out = result.result
134 self.instance_id_store[key].append(rpc_out.instance_id)
135
136 self.log.info("Created new scaling group {} with instance id {}".format(
137 scaling_group_name,
138 rpc_out.instance_id))
139
140 self.loop.create_task(_scale_out())
141
142
143 def handle_nsr(self, nsr, action):
144 """Callback for NSR opdata changes. Creates a publisher for every
145 NS that moves to config state.
146
147 Args:
148 nsr (RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr): Ns Opdata
149 action (rwdts.QueryAction): Action type of the change.
150 """
151 def nsr_create():
152 if nsr.config_status == "configured" and nsr.ns_instance_config_ref not in self.nsr_monp_subscribers:
153 nsr_id = nsr.ns_instance_config_ref
154 self.nsr_monp_subscribers[nsr_id] = []
155 nsd = self.store.get_nsd(nsr.nsd_ref)
156 self.log.debug ("Creating a scaling policy monitor for NSR: {}".format(
157 nsr_id))
158
159 @asyncio.coroutine
160 def task():
161 for scaling_group in nsd.scaling_group_descriptor:
162 for policy_cfg in scaling_group.scaling_policy:
163 policy = engine.ScalingPolicy(
164 self.log, self.dts, self.loop, self,
165 nsr.ns_instance_config_ref,
166 nsr.nsd_ref,
167 scaling_group.name,
168 policy_cfg,
169 self.store,
170 delegate=self)
171 self.nsr_monp_subscribers[nsr_id].append(policy)
172 yield from policy.register()
173 self.log.debug ("Started a scaling policy monitor for NSR: {}".format(
174 nsr_id))
175
176
177 self.loop.create_task(task())
178
179
180 def nsr_delete():
181 if nsr.ns_instance_config_ref in self.nsr_monp_subscribers:
182 policies = self.nsr_monp_subscribers[nsr.ns_instance_config_ref]
183 for policy in policies:
184 policy.deregister()
185 del self.nsr_monp_subscribers[nsr.ns_instance_config_ref]
186 self.log.debug ("Deleted the scaling policy monitor for NSD: {}".format(
187 nsr.ns_instance_config_ref))
188
189
190 if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
191 nsr_create()
192 elif action == rwdts.QueryAction.DELETE:
193 nsr_delete()
194
195
196 class AutoScalerTasklet(rift.tasklets.Tasklet):
197 """The main task of this Tasklet is to listen for NSR changes and once the
198 NSR is configured, ScalingPolicy is created.
199 """
200 def __init__(self, *args, **kwargs):
201
202 try:
203 super().__init__(*args, **kwargs)
204 self.rwlog.set_category("rw-mano-log")
205
206 self._project_handler = None
207 self.projects = {}
208
209 except Exception as e:
210 self.log.exception(e)
211
212 def start(self):
213 super().start()
214
215 self.log.debug("Registering with dts")
216
217 self.dts = rift.tasklets.DTS(
218 self.tasklet_info,
219 RwLaunchpadYang.get_schema(),
220 self.loop,
221 self.on_dts_state_change
222 )
223
224 self.log.debug("Created DTS Api GI Object: %s", self.dts)
225
226 def stop(self):
227 try:
228 self.dts.deinit()
229 except Exception as e:
230 self.log.exception(e)
231
232 @asyncio.coroutine
233 def init(self):
234 self.log.debug("creating project handler")
235 self.project_handler = ProjectHandler(self, AutoScalerProject)
236 self.project_handler.register()
237
238 @asyncio.coroutine
239 def run(self):
240 pass
241
242 @asyncio.coroutine
243 def on_dts_state_change(self, state):
244 """Handle DTS state change
245
246 Take action according to current DTS state to transition application
247 into the corresponding application state
248
249 Arguments
250 state - current dts state
251
252 """
253 switch = {
254 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
255 rwdts.State.CONFIG: rwdts.State.RUN,
256 }
257
258 handlers = {
259 rwdts.State.INIT: self.init,
260 rwdts.State.RUN: self.run,
261 }
262
263 # Transition application to next state
264 handler = handlers.get(state, None)
265 if handler is not None:
266 yield from handler()
267
268 # Transition dts to next state
269 next_state = switch.get(state, None)
270 if next_state is not None:
271 self.dts.handle.set_state(next_state)
272