Merge branch 'v1.0'
[osm/SO.git] / rwlaunchpad / plugins / rwmonparam / rift / tasklets / rwmonparam / rwmonparam.py
1 """
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 @file rwmonparam.py
19 @author Varun Prasad (varun.prasad@riftio.com)
20 @date 01-Jul-2016
21
22 """
23
24 import asyncio
25
26 import gi
27 gi.require_version('RwDts', '1.0')
28 gi.require_version('RwLaunchpadYang', '1.0')
29
30 from gi.repository import (
31 RwDts as rwdts,
32 RwLaunchpadYang,
33 ProtobufC)
34 import rift.mano.cloud
35 import rift.mano.dts as subscriber
36 import rift.tasklets
37
38 from . import vnfr_core
39 from . import nsr_core
40
41
class MonitoringParameterTasklet(rift.tasklets.Tasklet):
    """Tasklet that creates and removes monitoring-parameter handlers.

    The tasklet subscribes to the VNFR and NSR catalogs.  When a VNFR (or
    NSR) is reported with ``config_status == "configured"`` and no handler
    exists for it yet, a handler is created, registered and started.  When
    the record is deleted the handler is stopped and dropped.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the tasklet and its (still empty) bookkeeping state.

        All positional and keyword arguments are forwarded unchanged to the
        base ``Tasklet`` constructor.
        """
        try:
            super().__init__(*args, **kwargs)
            self.rwlog.set_category("rw-monitor-log")
        except Exception as e:
            # Best effort: a failure here is logged and construction goes on.
            # NOTE(review): if super().__init__() itself failed, ``self.log``
            # may not exist yet -- confirm against rift.tasklets.Tasklet.
            self.log.exception(e)

        # Subscribers and the store are created in start().
        self.vnfr_subscriber = None
        # The attribute name is misspelled ("subsriber"); it is kept as-is so
        # that any code outside this class that reads it keeps working.
        self.nsr_subsriber = None
        self.store = None

        # Active monitor handlers, keyed by vnfr.id and by
        # nsr.ns_instance_config_ref respectively.
        self.vnfr_monitors = {}
        self.nsr_monitors = {}

        # Needs to be moved to store once the DTS bug is resolved
        self.vnfrs = {}

    def start(self):
        """Create the DTS handle, the catalog subscribers and the store."""
        super().start()

        self.log.info("Starting MonitoringParameterTasklet")
        self.log.debug("Registering with dts")

        self.dts = rift.tasklets.DTS(
            self.tasklet_info,
            RwLaunchpadYang.get_schema(),
            self.loop,
            self.on_dts_state_change
        )

        self.vnfr_subscriber = subscriber.VnfrCatalogSubscriber.from_tasklet(
            self,
            callback=self.handle_vnfr)
        self.nsr_subsriber = subscriber.NsrCatalogSubscriber.from_tasklet(
            self,
            callback=self.handle_nsr)

        self.store = subscriber.SubscriberStore.from_tasklet(self)

        self.log.debug("Created DTS Api GI Object: %s", self.dts)

    def stop(self):
        """Tear down the DTS handle; any failure is logged, not raised."""
        try:
            self.dts.deinit()
        except Exception as e:
            self.log.exception(e)

    @asyncio.coroutine
    def init(self):
        """Register the store and both catalog subscribers, in that order."""
        self.log.debug("creating vnfr subscriber")
        yield from self.store.register()
        yield from self.vnfr_subscriber.register()
        yield from self.nsr_subsriber.register()

    @asyncio.coroutine
    def run(self):
        """Handler for the DTS RUN state; nothing to do."""
        pass

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Handle DTS state change

        Take action according to current DTS state to transition application
        into the corresponding application state

        Arguments
            state - current dts state

        """
        # DTS state to request once the current state has been handled.
        switch = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        # Application-side coroutine to run for the current state.
        handlers = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # Transition application to next state
        handler = handlers.get(state, None)
        if handler is not None:
            yield from handler()

        # Transition dts to next state
        next_state = switch.get(state, None)
        if next_state is not None:
            self.dts.handle.set_state(next_state)

    def handle_vnfr(self, vnfr, action):
        """Start or stop the monitoring-parameter handler of a VNFR.

        Args:
            vnfr (GiObject): VNFR Gi object message from DTS
            action (rwdts.QueryAction): CREATE and UPDATE start a monitor
                when the VNFR is configured and has none yet; DELETE stops
                and removes the monitor.  Any other action is ignored.
        """

        def vnfr_create():
            # Alternative trigger, currently disabled:
            # if vnfr.operational_status == "running" and vnfr.id not in self.vnfr_monitors:
            if vnfr.config_status == "configured" and vnfr.id not in self.vnfr_monitors:

                vnf_mon = vnfr_core.VnfMonitorDtsHandler.from_vnf_data(
                    self,
                    vnfr,
                    self.store.get_vnfd(vnfr.vnfd.id))

                # Recorded before registration completes, so a second
                # CREATE/UPDATE for the same VNFR does not build another one.
                self.vnfr_monitors[vnfr.id] = vnf_mon
                self.vnfrs[vnfr.id] = vnfr

                @asyncio.coroutine
                def task():
                    yield from vnf_mon.register()
                    vnf_mon.start()

                self.loop.create_task(task())

        def vnfr_delete():
            if vnfr.id in self.vnfr_monitors:
                self.log.debug("VNFR %s deleted: Stopping vnfr monitoring", vnfr.id)
                vnf_mon = self.vnfr_monitors.pop(vnfr.id)
                vnf_mon.stop()
                # Default avoids a KeyError should the two dicts ever diverge.
                self.vnfrs.pop(vnfr.id, None)

        if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
            vnfr_create()
        elif action == rwdts.QueryAction.DELETE:
            vnfr_delete()

    def handle_nsr(self, nsr, action):
        """Callback for NSR opdata changes.

        Creates an ``NsrMonitorDtsHandler`` for every NSR that is reported as
        configured and stops it again when the NSR is deleted.

        Args:
            nsr (RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr): Ns Opdata
            action (rwdts.QueryAction): Action type of the change.  CREATE
                and UPDATE may start a monitor, DELETE stops it; any other
                action is ignored.
        """

        def nsr_create():
            # Alternative trigger, currently disabled:
            # if nsr.operational_status == "running" and nsr.ns_instance_config_ref not in self.nsr_monitors:
            if nsr.config_status == "configured" and nsr.ns_instance_config_ref not in self.nsr_monitors:
                nsr_mon = nsr_core.NsrMonitorDtsHandler(
                    self.log,
                    self.dts,
                    self.loop,
                    nsr,
                    # Snapshot of the VNFRs known at this moment.
                    list(self.vnfrs.values()),
                    self.store
                )

                self.nsr_monitors[nsr.ns_instance_config_ref] = nsr_mon

                @asyncio.coroutine
                def task():
                    yield from nsr_mon.register()
                    yield from nsr_mon.start()

                self.loop.create_task(task())

        def nsr_delete():
            if nsr.ns_instance_config_ref in self.nsr_monitors:
                nsr_mon = self.nsr_monitors.pop(nsr.ns_instance_config_ref)
                nsr_mon.stop()

        if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
            nsr_create()
        elif action == rwdts.QueryAction.DELETE:
            nsr_delete()