update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwmonparam / rift / tasklets / rwmonparam / rwmonparam.py
1 """
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 @file rwmonparam.py
19 @author Varun Prasad (varun.prasad@riftio.com)
20 @date 01-Jul-2016
21
22 """
23
24 import asyncio
25
26 import gi
27 gi.require_version('RwDts', '1.0')
28 gi.require_version('RwLaunchpadYang', '1.0')
29
30 from gi.repository import (
31 RwDts as rwdts,
32 RwLaunchpadYang,
33 NsrYang,
34 ProtobufC)
35 import rift.mano.cloud
36 import rift.mano.dts as subscriber
37 import rift.tasklets
38 import concurrent.futures
39 from rift.mano.utils.project import (
40 ManoProject,
41 ProjectHandler,
42 )
43 from . import vnfr_core
44 from . import nsr_core
45
46
47 class MonParamProject(ManoProject):
48
49 def __init__(self, name, tasklet, **kw):
50 super(MonParamProject, self).__init__(tasklet.log, name)
51 self.update(tasklet)
52
53 self.vnfr_subscriber = None
54
55 self.vnfr_monitors = {}
56 self.nsr_monitors = {}
57 self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
58
59 # Needs to be moved to store once the DTS bug is resolved
60 # Gather all VNFRs
61 self.vnfrs = {}
62
63 self.vnfr_subscriber = subscriber.VnfrCatalogSubscriber.from_project(
64 self,
65 callback=self.handle_vnfr)
66 self.nsr_subsriber = subscriber.NsrCatalogSubscriber.from_project(
67 self,
68 callback=self.handle_nsr)
69
70 self._nsd_subscriber = subscriber.NsdCatalogSubscriber.from_project(self)
71 self._vnfd_subscriber = subscriber.VnfdCatalogSubscriber.from_project(self)
72
73 self.log.debug("Created DTS Api GI Object: %s", self.dts)
74
75 @asyncio.coroutine
76 def register (self):
77 self.log.debug("creating vnfr subscriber")
78 yield from self._nsd_subscriber.register()
79 yield from self._vnfd_subscriber.register()
80 yield from self.vnfr_subscriber.register()
81 yield from self.nsr_subsriber.register()
82
83
84 def deregister(self):
85 self.log.debug("De-register vnfr project {}".format(self.name))
86 self._nsd_subscriber.deregister()
87 self._vnfd_subscriber.deregister()
88 self.vnfr_subscriber.deregister()
89 self.nsr_subsriber.deregister()
90
91 def _unwrap(self, values, id_name):
92 try:
93 return values[0]
94 except KeyError:
95 self.log.exception("Unable to find the object with the given "
96 "ID {}".format(id_name))
97
98 def get_vnfd(self, vnfd_id):
99 values = [vnfd for vnfd in list(self._vnfd_subscriber.reg.get_xact_elements()) if vnfd.id == vnfd_id]
100 return self._unwrap(values, vnfd_id)
101
102 def get_nsd(self, nsd_id):
103 values = [nsd for nsd in list(self._nsd_subscriber.reg.get_xact_elements()) if nsd.id == nsd_id]
104 return self._unwrap(values, nsd_id)
105
106
107 def handle_vnfr(self, vnfr, action):
108 """Starts a monitoring parameter job for every VNFR that reaches
109 running state
110
111 Args:
112 vnfr (GiOBject): VNFR Gi object message from DTS
113 delete_mode (bool, optional): if set, stops and removes the monitor.
114 """
115
116 def vnfr_create():
117 # if vnfr.operational_status == "running" and vnfr.id not in self.vnfr_monitors:
118 vnfr_status = (vnfr.operational_status == "running" and
119 vnfr.config_status in ["configured", "config_not_needed"])
120
121 if vnfr_status and vnfr.id not in self.vnfr_monitors:
122
123 vnf_mon = vnfr_core.VnfMonitorDtsHandler.from_vnf_data(
124 self,
125 vnfr,
126 self.get_vnfd(vnfr.vnfd.id))
127
128 self.vnfr_monitors[vnfr.id] = vnf_mon
129 self.vnfrs[vnfr.id] = vnfr
130
131 @asyncio.coroutine
132 def task():
133 yield from vnf_mon.register()
134 if vnfr.nsr_id_ref in self.nsr_monitors:
135 vnf_mon.update_nsr_mon(self.nsr_monitors[vnfr.nsr_id_ref])
136 vnf_mon.start()
137 #self.update_nsrs(vnfr, action)
138
139 self.loop.create_task(task())
140
141
142 def vnfr_delete():
143 if vnfr.id in self.vnfr_monitors:
144 self.log.debug("VNFR %s deleted: Stopping vnfr monitoring", vnfr.id)
145 vnf_mon = self.vnfr_monitors.pop(vnfr.id)
146 vnf_mon.stop()
147 self.vnfrs.pop(vnfr.id)
148 #self.update_nsrs(vnfr, action)
149
150 if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
151 vnfr_create()
152 elif action == rwdts.QueryAction.DELETE:
153 vnfr_delete()
154
155 def update_nsrs(self, vnfr, action):
156 if vnfr.nsr_id_ref not in self.nsr_monitors:
157 return
158
159 monitor = self.nsr_monitors[vnfr.nsr_id_ref]
160
161 if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
162 @asyncio.coroutine
163 def update_vnfr():
164 yield from monitor.update([vnfr])
165
166 self.loop.create_task(update_vnfr())
167 elif action == rwdts.QueryAction.DELETE:
168 @asyncio.coroutine
169 def delete_vnfr():
170 try:
171 yield from monitor.delete([vnfr])
172 except Exception as e:
173 self.log.exception(str(e))
174
175 self.loop.create_task(delete_vnfr())
176
177
178
179 def handle_nsr(self, nsr, action):
180 """Callback for NSR opdata changes. Creates a publisher for every
181 NS that moves to config state.
182
183 Args:
184 nsr (RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr): Ns Opdata
185 action (rwdts.QueryAction): Action type of the change.
186 """
187
188 def nsr_create():
189 # TODO clean up the if-else mess, exception
190
191 success_state = (nsr.operational_status == "running" and
192 nsr.config_status == "configured")
193
194 if not success_state:
195 return
196
197 if nsr.ns_instance_config_ref in self.nsr_monitors:
198 return
199
200 constituent_vnfrs = []
201
202 for vnfr_id in nsr.constituent_vnfr_ref:
203 if (vnfr_id.vnfr_id in self.vnfrs):
204 vnfr_obj = self.vnfrs[vnfr_id.vnfr_id]
205 constituent_vnfrs.append(vnfr_obj)
206 else:
207 pass
208
209 nsr_mon = nsr_core.NsrMonitorDtsHandler(
210 self.log,
211 self.dts,
212 self.loop,
213 self,
214 nsr,
215 constituent_vnfrs
216 )
217 for vnfr_id in nsr.constituent_vnfr_ref:
218 if vnfr_id.vnfr_id in self.vnfr_monitors:
219 self.vnfr_monitors[vnfr_id.vnfr_id].update_nsr_mon(nsr_mon)
220
221 self.nsr_monitors[nsr.ns_instance_config_ref] = nsr_mon
222
223
224 @asyncio.coroutine
225 def task():
226 try:
227 yield from nsr_mon.register()
228 yield from nsr_mon.start()
229 except Exception as e:
230 self.log.exception(e)
231
232 self.loop.create_task(task())
233
234 def nsr_delete():
235 if nsr.ns_instance_config_ref in self.nsr_monitors:
236 nsr_mon = self.nsr_monitors.pop(nsr.ns_instance_config_ref)
237 nsr_mon.stop()
238
239 if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
240 nsr_create()
241 elif action == rwdts.QueryAction.DELETE:
242 nsr_delete()
243
244
245 class MonitoringParameterTasklet(rift.tasklets.Tasklet):
246 """The main task of this Tasklet is to listen for VNFR changes and once the
247 VNFR hits the running state, triggers the monitor.
248 """
249 def __init__(self, *args, **kwargs):
250 try:
251 super().__init__(*args, **kwargs)
252 self.rwlog.set_category("rw-monitor-log")
253 except Exception as e:
254 self.log.exception(e)
255
256 self._project_handler = None
257 self.projects = {}
258
259 def start(self):
260 super().start()
261
262 self.log.info("Starting MonitoringParameterTasklet")
263 self.log.debug("Registering with dts")
264
265 self.dts = rift.tasklets.DTS(
266 self.tasklet_info,
267 NsrYang.get_schema(),
268 self.loop,
269 self.on_dts_state_change
270 )
271
272 def stop(self):
273 try:
274 self.dts.deinit()
275 except Exception as e:
276 self.log.exception(e)
277
278 @asyncio.coroutine
279 def init(self):
280 self.log.debug("creating project handler")
281 self.project_handler = ProjectHandler(self, MonParamProject)
282 self.project_handler.register()
283
284 @asyncio.coroutine
285 def run(self):
286 pass
287
288 @asyncio.coroutine
289 def on_dts_state_change(self, state):
290 """Handle DTS state change
291
292 Take action according to current DTS state to transition application
293 into the corresponding application state
294
295 Arguments
296 state - current dts state
297
298 """
299 switch = {
300 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
301 rwdts.State.CONFIG: rwdts.State.RUN,
302 }
303
304 handlers = {
305 rwdts.State.INIT: self.init,
306 rwdts.State.RUN: self.run,
307 }
308
309 # Transition application to next state
310 handler = handlers.get(state, None)
311 if handler is not None:
312 yield from handler()
313
314 # Transition dts to next state
315 next_state = switch.get(state, None)
316 if next_state is not None:
317 self.dts.handle.set_state(next_state)