Merge from OSM SO master
[osm/SO.git] / rwlaunchpad / plugins / rwmonparam / rift / tasklets / rwmonparam / rwmonparam.py
1 """
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 @file rwmonparam.py
19 @author Varun Prasad (varun.prasad@riftio.com)
20 @date 01-Jul-2016
21
22 """
23
24 import asyncio
25
26 import gi
27 gi.require_version('RwDts', '1.0')
28 gi.require_version('RwLaunchpadYang', '1.0')
29
30 from gi.repository import (
31 RwDts as rwdts,
32 RwLaunchpadYang,
33 ProtobufC)
34 import rift.mano.cloud
35 import rift.mano.dts as subscriber
36 import rift.tasklets
37 from rift.mano.utils.project import (
38 ManoProject,
39 ProjectHandler,
40 )
41
42 from . import vnfr_core
43 from . import nsr_core
44
45
46 class MonParamProject(ManoProject):
47
48 def __init__(self, name, tasklet, **kw):
49 super(MonParamProject, self).__init__(tasklet.log, name)
50 self.update(tasklet)
51
52 self.vnfr_subscriber = None
53 self.store = None
54
55 self.vnfr_monitors = {}
56 self.nsr_monitors = {}
57
58 # Needs to be moved to store once the DTS bug is resolved
59 self.vnfrs = {}
60
61 self.vnfr_subscriber = subscriber.VnfrCatalogSubscriber.from_project(
62 self,
63 callback=self.handle_vnfr)
64 self.nsr_subsriber = subscriber.NsrCatalogSubscriber.from_project(
65 self,
66 callback=self.handle_nsr)
67
68 self.store = subscriber.SubscriberStore.from_project(self)
69
70 self.log.debug("Created DTS Api GI Object: %s", self.dts)
71
72 @asyncio.coroutine
73 def register (self):
74 self.log.debug("creating vnfr subscriber")
75 yield from self.store.register()
76 yield from self.vnfr_subscriber.register()
77 yield from self.nsr_subsriber.register()
78
79 def deregister(self):
80 self.log.debug("De-register vnfr project {}".format(self.name))
81 #TODO:
82
83 def handle_vnfr(self, vnfr, action):
84 """Starts a monitoring parameter job for every VNFR that reaches
85 running state
86
87 Args:
88 vnfr (GiOBject): VNFR Gi object message from DTS
89 delete_mode (bool, optional): if set, stops and removes the monitor.
90 """
91
92 def vnfr_create():
93 if vnfr.config_status == "configured" and vnfr.id not in self.vnfr_monitors:
94
95 vnf_mon = vnfr_core.VnfMonitorDtsHandler.from_vnf_data(
96 self,
97 vnfr,
98 self.store.get_vnfd(vnfr.vnfd.id))
99
100 self.vnfr_monitors[vnfr.id] = vnf_mon
101 self.vnfrs[vnfr.id] = vnfr
102
103 @asyncio.coroutine
104 def task():
105 yield from vnf_mon.register()
106 vnf_mon.start()
107
108 self.loop.create_task(task())
109
110
111 def vnfr_delete():
112 if vnfr.id in self.vnfr_monitors:
113 self.log.debug("VNFR %s deleted: Stopping vnfr monitoring", vnfr.id)
114 vnf_mon = self.vnfr_monitors.pop(vnfr.id)
115 vnf_mon.stop()
116 self.vnfrs.pop(vnfr.id)
117
118 if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
119 vnfr_create()
120 elif action == rwdts.QueryAction.DELETE:
121 vnfr_delete()
122
123
124 def handle_nsr(self, nsr, action):
125 """Callback for NSR opdata changes. Creates a publisher for every
126 NS that moves to config state.
127
128 Args:
129 nsr (RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr): Ns Opdata
130 action (rwdts.QueryAction): Action type of the change.
131 """
132 def nsr_create():
133 # if nsr.operational_status == "running" and nsr.ns_instance_config_ref not in self.nsr_monitors:
134 if nsr.config_status == "configured" and nsr.ns_instance_config_ref not in self.nsr_monitors:
135 nsr_mon = nsr_core.NsrMonitorDtsHandler(
136 self.log,
137 self.dts,
138 self.loop,
139 self,
140 nsr,
141 list(self.vnfrs.values()),
142 self.store
143 )
144
145 self.nsr_monitors[nsr.ns_instance_config_ref] = nsr_mon
146
147 @asyncio.coroutine
148 def task():
149 try:
150 yield from nsr_mon.register()
151 yield from nsr_mon.start()
152 except Exception as e:
153 self.log.exception("NSR {} monparam task failed: {}".
154 format(nsr.name_ref, e))
155
156 self.loop.create_task(task())
157
158
159
160 def nsr_delete():
161 if nsr.ns_instance_config_ref in self.nsr_monitors:
162 # if vnfr.operational_status == "running" and vnfr.id in self.vnfr_monitors:
163 nsr_mon = self.nsr_monitors.pop(nsr.ns_instance_config_ref)
164 nsr_mon.stop()
165
166 if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
167 nsr_create()
168 elif action == rwdts.QueryAction.DELETE:
169 nsr_delete()
170
171
172 class MonitoringParameterTasklet(rift.tasklets.Tasklet):
173 """The main task of this Tasklet is to listen for VNFR changes and once the
174 VNFR hits the running state, triggers the monitor.
175 """
176 def __init__(self, *args, **kwargs):
177 try:
178 super().__init__(*args, **kwargs)
179 self.rwlog.set_category("rw-monitor-log")
180 except Exception as e:
181 self.log.exception(e)
182
183 self._project_handler = None
184 self.projects = {}
185
186 def start(self):
187 super().start()
188
189 self.log.info("Starting MonitoringParameterTasklet")
190 self.log.debug("Registering with dts")
191
192 self.dts = rift.tasklets.DTS(
193 self.tasklet_info,
194 RwLaunchpadYang.get_schema(),
195 self.loop,
196 self.on_dts_state_change
197 )
198
199 def stop(self):
200 try:
201 self.dts.deinit()
202 except Exception as e:
203 self.log.exception(e)
204
205 @asyncio.coroutine
206 def init(self):
207 self.log.debug("creating project handler")
208 self.project_handler = ProjectHandler(self, MonParamProject)
209 self.project_handler.register()
210
211 @asyncio.coroutine
212 def run(self):
213 pass
214
215 @asyncio.coroutine
216 def on_dts_state_change(self, state):
217 """Handle DTS state change
218
219 Take action according to current DTS state to transition application
220 into the corresponding application state
221
222 Arguments
223 state - current dts state
224
225 """
226 switch = {
227 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
228 rwdts.State.CONFIG: rwdts.State.RUN,
229 }
230
231 handlers = {
232 rwdts.State.INIT: self.init,
233 rwdts.State.RUN: self.run,
234 }
235
236 # Transition application to next state
237 handler = handlers.get(state, None)
238 if handler is not None:
239 yield from handler()
240
241 # Transition dts to next state
242 next_state = switch.get(state, None)
243 if next_state is not None:
244 self.dts.handle.set_state(next_state)