Merge from OSM SO master
[osm/SO.git] / rwlaunchpad / plugins / rwautoscaler / rift / tasklets / rwautoscaler / rwautoscaler.py
1 """
2 #
3 # Copyright 2016-2017 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 @file rwautoscaler.py
19 @author Varun Prasad (varun.prasad@riftio.com)
20 @date 01-Jul-2016
21
22 """
23 import asyncio
24 import collections
25
26 from . import engine
27 from . import subscribers as monp_subscriber
28
29 import gi
30 gi.require_version('RwDts', '1.0')
31 gi.require_version('RwLaunchpadYang', '1.0')
32 gi.require_version('NsrYang', '1.0')
33
34 from gi.repository import (
35 RwDts as rwdts,
36 NsrYang,
37 RwLaunchpadYang,
38 ProtobufC)
39 import rift.mano.cloud
40 import rift.mano.dts as subscriber
41 import rift.tasklets
42 from rift.mano.utils.project import (
43 ManoProject,
44 ProjectHandler,
45 )
46
47
class AutoScalerProject(ManoProject, engine.ScalingPolicy.Delegate):
    """Per-project auto-scaling state.

    Subscribes to the project's NSR catalog, creates an
    ``engine.ScalingPolicy`` for every scaling policy of each NSR that reaches
    the "configured" state, and acts as the delegate of those policies by
    issuing the ``exec-scale-in`` / ``exec-scale-out`` RPCs.
    """

    def __init__(self, name, tasklet, **kw):
        """Create the project and its (not yet registered) subscribers.

        Args:
            name (str): Project name
            tasklet: Owning tasklet; supplies the logger and is passed to
                ``self.update``
            **kw: Accepted for signature compatibility and ignored
        """
        super(AutoScalerProject, self).__init__(tasklet.log, name)
        self.update(tasklet)

        self.store = None
        self.monparam_store = None
        self.nsr_sub = None
        # NSR id -> list of ScalingPolicy objects created for that NSR.
        self.nsr_monp_subscribers = {}
        # (scaling_group_name, nsr_id) -> instance ids returned by scale-out,
        # consumed (last in, first out) by scale-in.
        self.instance_id_store = collections.defaultdict(list)

        self.store = subscriber.SubscriberStore.from_project(self)
        self.nsr_sub = subscriber.NsrCatalogSubscriber(
            self.log, self.dts, self.loop, self, self.handle_nsr)

    def deregister(self):
        """Tear down the project's subscribers and its scaling policies."""
        self.log.debug("De-register project {}".format(self.name))
        self.nsr_sub.deregister()
        self.store.deregister()

        # Policies are otherwise only deregistered when their NSR is deleted;
        # release the remaining ones so they do not outlive the project.
        for policies in self.nsr_monp_subscribers.values():
            for policy in policies:
                policy.deregister()
        self.nsr_monp_subscribers.clear()

    @asyncio.coroutine
    def register(self):
        """Register the subscriber store, then the NSR catalog subscriber."""
        self.log.debug("creating vnfr subscriber")
        yield from self.store.register()
        yield from self.nsr_sub.register()

    def scale_in(self, scaling_group_name, nsr_id):
        """Delegate callback for scale in requests

        Schedules a task that removes the most recently recorded instance of
        the scaling group through the ``/nsr:exec-scale-in`` RPC. If no
        instance was recorded for the group, the request is logged and
        dropped.

        Args:
            scaling_group_name (str): Scaling group name to be scaled in
            nsr_id (str): NSR id
        """
        self.log.info("Sending a scaling-in request for {} in NSR: {}".format(
            scaling_group_name,
            nsr_id))

        key = (scaling_group_name, nsr_id)

        @asyncio.coroutine
        def _scale_in():
            # .get() neither raises nor inserts an empty list for an
            # unknown key, unlike indexing the defaultdict.
            instance_ids = self.instance_id_store.get(key)
            if not instance_ids:
                self.log.warning(
                    "No scaled-out instance recorded for {} in NSR: {}; "
                    "ignoring scale-in request".format(
                        scaling_group_name,
                        nsr_id))
                return

            instance_id = instance_ids.pop()

            # Trigger an rpc
            rpc_ip = NsrYang.YangInput_Nsr_ExecScaleIn.from_dict({
                'project_name': self.name,
                'nsr_id_ref': nsr_id,
                'instance_id': instance_id,
                'scaling_group_name_ref': scaling_group_name})

            yield from self.dts.query_rpc(
                "/nsr:exec-scale-in",
                0,
                rpc_ip)

        self.loop.create_task(_scale_in())

    def scale_out(self, scaling_group_name, nsr_id):
        """Delegate callback for scale out requests

        Schedules a task that invokes the ``/nsr:exec-scale-out`` RPC and
        records every returned instance id so a later scale-in can use it.

        Args:
            scaling_group_name (str): Scaling group name
            nsr_id (str): NSR ID
        """
        self.log.info("Sending a scaling-out request for {} in NSR: {}".format(
            scaling_group_name,
            nsr_id))

        @asyncio.coroutine
        def _scale_out():
            # Trigger an rpc
            rpc_ip = NsrYang.YangInput_Nsr_ExecScaleOut.from_dict({
                'project_name': self.name,
                'nsr_id_ref': nsr_id,
                'scaling_group_name_ref': scaling_group_name})

            itr = yield from self.dts.query_rpc("/nsr:exec-scale-out", 0, rpc_ip)

            key = (scaling_group_name, nsr_id)
            for res in itr:
                result = yield from res
                rpc_out = result.result
                self.instance_id_store[key].append(rpc_out.instance_id)

                self.log.info("Created new scaling group {} with instance id {}".format(
                    scaling_group_name,
                    rpc_out.instance_id))

        self.loop.create_task(_scale_out())

    def handle_nsr(self, nsr, action):
        """Callback for NSR opdata changes.

        On CREATE/UPDATE, builds and registers a ScalingPolicy for every
        scaling policy of an NSR that is "configured" and not yet tracked.
        On DELETE, deregisters and forgets the NSR's policies.

        Args:
            nsr (RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr): Ns Opdata
            action (rwdts.QueryAction): Action type of the change.
        """
        def nsr_create():
            nsr_id = nsr.ns_instance_config_ref
            if nsr.config_status != "configured":
                return
            if nsr_id in self.nsr_monp_subscribers:
                # Policies already exist for this NSR (e.g. repeated UPDATE).
                return

            self.nsr_monp_subscribers[nsr_id] = []
            nsd = self.store.get_nsd(nsr.nsd_ref)

            @asyncio.coroutine
            def task():
                for scaling_group in nsd.scaling_group_descriptor:
                    for policy_cfg in scaling_group.scaling_policy:
                        policy = engine.ScalingPolicy(
                            self.log, self.dts, self.loop, self,
                            nsr.ns_instance_config_ref,
                            nsr.nsd_ref,
                            scaling_group.name,
                            policy_cfg,
                            self.store,
                            delegate=self)
                        self.nsr_monp_subscribers[nsr_id].append(policy)
                        yield from policy.register()

            self.loop.create_task(task())

        def nsr_delete():
            nsr_id = nsr.ns_instance_config_ref
            if nsr_id in self.nsr_monp_subscribers:
                for policy in self.nsr_monp_subscribers[nsr_id]:
                    policy.deregister()
                del self.nsr_monp_subscribers[nsr_id]

        if action in (rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE):
            nsr_create()
        elif action == rwdts.QueryAction.DELETE:
            nsr_delete()
182
183
class AutoScalerTasklet(rift.tasklets.Tasklet):
    """The main task of this Tasklet is to listen for NSR changes and once the
    NSR is configured, ScalingPolicy is created.
    """
    def __init__(self, *args, **kwargs):
        """Initialise the base tasklet and the project bookkeeping attributes.

        Any exception raised here is logged rather than propagated.
        """
        try:
            super().__init__(*args, **kwargs)

            # Legacy private name, kept so existing references keep working.
            self._project_handler = None
            # Public name that init() populates; defined up front so it can
            # be read safely before the DTS INIT state has been handled.
            self.project_handler = None
            self.projects = {}

        except Exception as e:
            self.log.exception(e)

    def start(self):
        """Start the tasklet and create its DTS API handle."""
        super().start()

        self.log.debug("Registering with dts")

        self.dts = rift.tasklets.DTS(
            self.tasklet_info,
            RwLaunchpadYang.get_schema(),
            self.loop,
            self.on_dts_state_change
        )

        self.log.debug("Created DTS Api GI Object: %s", self.dts)

    def stop(self):
        """De-initialise the DTS handle; failures are logged, not raised."""
        try:
            self.dts.deinit()
        except Exception as e:
            self.log.exception(e)

    @asyncio.coroutine
    def init(self):
        """Create the project handler for AutoScalerProject and register it."""
        self.log.debug("creating project handler")
        self.project_handler = ProjectHandler(self, AutoScalerProject)
        self.project_handler.register()

    @asyncio.coroutine
    def run(self):
        """Nothing to do in the RUN state."""
        pass

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Handle DTS state change

        Take action according to current DTS state to transition application
        into the corresponding application state

        Arguments
            state - current dts state

        """
        # DTS state to advance to once the handler for `state` has run.
        switch = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        # Application coroutine to run on entering `state`.
        handlers = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # Transition application to next state
        handler = handlers.get(state)
        if handler is not None:
            yield from handler()

        # Transition dts to next state
        next_state = switch.get(state)
        if next_state is not None:
            self.dts.handle.set_state(next_state)
259