affa579b398e5425d4020930433e149c8838fbd5
[osm/SO.git] / rwlaunchpad / plugins / rwautoscaler / rift / tasklets / rwautoscaler / rwautoscaler.py
1 """
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 @file rwautoscaler.py
19 @author Varun Prasad (varun.prasad@riftio.com)
20 @date 01-Jul-2016
21
22 """
23 import asyncio
24 import collections
25
26 from . import engine
27 from . import subscribers as monp_subscriber
28
29 import gi
30 gi.require_version('RwDts', '1.0')
31 gi.require_version('RwLaunchpadYang', '1.0')
32 gi.require_version('NsrYang', '1.0')
33
34 from gi.repository import (
35 RwDts as rwdts,
36 NsrYang,
37 RwLaunchpadYang,
38 ProtobufC)
39 import rift.mano.cloud
40 import rift.mano.dts as subscriber
41 import rift.tasklets
42
43
44
45 class AutoScalerTasklet(rift.tasklets.Tasklet, engine.ScalingPolicy.Delegate):
46 """The main task of this Tasklet is to listen for NSR changes and once the
47 NSR is configured, ScalingPolicy is created.
48 """
49 def __init__(self, *args, **kwargs):
50
51 try:
52 super().__init__(*args, **kwargs)
53 self.store = None
54 self.monparam_store = None
55
56 self.nsr_sub = None
57 self.nsr_monp_subscribers = {}
58 self.instance_id_store = collections.defaultdict(list)
59
60 except Exception as e:
61 self.log.exception(e)
62
63 def start(self):
64 super().start()
65
66 self.log.debug("Registering with dts")
67
68 self.dts = rift.tasklets.DTS(
69 self.tasklet_info,
70 RwLaunchpadYang.get_schema(),
71 self.loop,
72 self.on_dts_state_change
73 )
74
75 self.store = subscriber.SubscriberStore.from_tasklet(self)
76 self.nsr_sub = subscriber.NsrCatalogSubscriber(self.log, self.dts, self.loop, self.handle_nsr)
77
78 self.log.debug("Created DTS Api GI Object: %s", self.dts)
79
80 def stop(self):
81 try:
82 self.dts.deinit()
83 except Exception as e:
84 self.log.exception(e)
85
86 @asyncio.coroutine
87 def init(self):
88 self.log.debug("creating vnfr subscriber")
89 yield from self.store.register()
90 yield from self.nsr_sub.register()
91
92 @asyncio.coroutine
93 def run(self):
94 pass
95
96 @asyncio.coroutine
97 def on_dts_state_change(self, state):
98 """Handle DTS state change
99
100 Take action according to current DTS state to transition application
101 into the corresponding application state
102
103 Arguments
104 state - current dts state
105
106 """
107 switch = {
108 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
109 rwdts.State.CONFIG: rwdts.State.RUN,
110 }
111
112 handlers = {
113 rwdts.State.INIT: self.init,
114 rwdts.State.RUN: self.run,
115 }
116
117 # Transition application to next state
118 handler = handlers.get(state, None)
119 if handler is not None:
120 yield from handler()
121
122 # Transition dts to next state
123 next_state = switch.get(state, None)
124 if next_state is not None:
125 self.dts.handle.set_state(next_state)
126
127 def scale_in(self, scaling_group_name, nsr_id):
128 """Delegate callback
129
130 Args:
131 scaling_group_name (str): Scaling group name to be scaled in
132 nsr_id (str): NSR id
133
134 """
135 self.log.info("Sending a scaling-in request for {} in NSR: {}".format(
136 scaling_group_name,
137 nsr_id))
138
139 @asyncio.coroutine
140 def _scale_in():
141 instance_id = self.instance_id_store[(scaling_group_name, nsr_id)].pop()
142
143 # Trigger an rpc
144 rpc_ip = NsrYang.YangInput_Nsr_ExecScaleIn.from_dict({
145 'nsr_id_ref': nsr_id,
146 'instance_id': instance_id,
147 'scaling_group_name_ref': scaling_group_name})
148
149 rpc_out = yield from self.dts.query_rpc(
150 "/nsr:exec-scale-in",
151 0,
152 rpc_ip)
153
154 self.loop.create_task(_scale_in())
155
156 def scale_out(self, scaling_group_name, nsr_id):
157 """Delegate callback for scale out requests
158
159 Args:
160 scaling_group_name (str): Scaling group name
161 nsr_id (str): NSR ID
162 """
163 self.log.info("Sending a scaling-out request for {} in NSR: {}".format(
164 scaling_group_name,
165 nsr_id))
166
167 @asyncio.coroutine
168 def _scale_out():
169 # Trigger an rpc
170 rpc_ip = NsrYang.YangInput_Nsr_ExecScaleOut.from_dict({
171 'nsr_id_ref': nsr_id ,
172 'scaling_group_name_ref': scaling_group_name})
173
174 itr = yield from self.dts.query_rpc("/nsr:exec-scale-out", 0, rpc_ip)
175
176 key = (scaling_group_name, nsr_id)
177 for res in itr:
178 result = yield from res
179 rpc_out = result.result
180 self.instance_id_store[key].append(rpc_out.instance_id)
181
182 self.log.info("Created new scaling group {} with instance id {}".format(
183 scaling_group_name,
184 rpc_out.instance_id))
185
186 self.loop.create_task(_scale_out())
187
188
189 def handle_nsr(self, nsr, action):
190 """Callback for NSR opdata changes. Creates a publisher for every
191 NS that moves to config state.
192
193 Args:
194 nsr (RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr): Ns Opdata
195 action (rwdts.QueryAction): Action type of the change.
196 """
197 def nsr_create():
198 if nsr.config_status == "configured" and nsr.ns_instance_config_ref not in self.nsr_monp_subscribers:
199 nsr_id = nsr.ns_instance_config_ref
200 self.nsr_monp_subscribers[nsr_id] = []
201 nsd = self.store.get_nsd(nsr.nsd_ref)
202 @asyncio.coroutine
203 def task():
204 for scaling_group in nsd.scaling_group_descriptor:
205 for policy_cfg in scaling_group.scaling_policy:
206 policy = engine.ScalingPolicy(
207 self.log, self.dts, self.loop,
208 nsr.ns_instance_config_ref,
209 nsr.nsd_ref,
210 scaling_group.name,
211 policy_cfg,
212 self.store,
213 delegate=self)
214 self.nsr_monp_subscribers[nsr_id].append(policy)
215 yield from policy.register()
216
217 self.loop.create_task(task())
218
219
220 def nsr_delete():
221 if nsr.ns_instance_config_ref in self.nsr_monp_subscribers:
222 policies = self.nsr_monp_subscribers[nsr.ns_instance_config_ref]
223 for policy in policies:
224 policy.deregister()
225 del self.nsr_monp_subscribers[nsr.ns_instance_config_ref]
226
227 if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
228 nsr_create()
229 elif action == rwdts.QueryAction.DELETE:
230 nsr_delete()