0cb3e94ee651879a6b41f13d8368e4bf82ba2941
3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
19 @author Varun Prasad (varun.prasad@riftio.com)
27 gi
.require_version('RwDts', '1.0')
28 gi
.require_version('RwLaunchpadYang', '1.0')
30 from gi
.repository
import (
35 import rift
.mano
.cloud
36 import rift
.mano
.dts
as subscriber
38 import concurrent
.futures
39 from rift
.mano
.utils
.project
import (
43 from . import vnfr_core
44 from . import nsr_core
47 class MonParamProject(ManoProject
):
49 def __init__(self
, name
, tasklet
, **kw
):
50 super(MonParamProject
, self
).__init
__(tasklet
.log
, name
)
53 self
.vnfr_subscriber
= None
55 self
.vnfr_monitors
= {}
56 self
.nsr_monitors
= {}
57 self
.executor
= concurrent
.futures
.ThreadPoolExecutor(max_workers
=1)
59 # Needs to be moved to store once the DTS bug is resolved
63 self
.vnfr_subscriber
= subscriber
.VnfrCatalogSubscriber
.from_project(
65 callback
=self
.handle_vnfr
)
66 self
.nsr_subsriber
= subscriber
.NsrCatalogSubscriber
.from_project(
68 callback
=self
.handle_nsr
)
70 self
._nsd
_subscriber
= subscriber
.NsdCatalogSubscriber
.from_project(self
)
71 self
._vnfd
_subscriber
= subscriber
.VnfdCatalogSubscriber
.from_project(self
)
73 self
.log
.debug("Created DTS Api GI Object: %s", self
.dts
)
77 self
.log
.debug("creating vnfr subscriber")
78 yield from self
._nsd
_subscriber
.register()
79 yield from self
._vnfd
_subscriber
.register()
80 yield from self
.vnfr_subscriber
.register()
81 yield from self
.nsr_subsriber
.register()
85 self
.log
.debug("De-register vnfr project {}".format(self
.name
))
86 self
._nsd
_subscriber
.deregister()
87 self
._vnfd
_subscriber
.deregister()
88 self
.vnfr_subscriber
.deregister()
89 self
.nsr_subsriber
.deregister()
91 def _unwrap(self
, values
, id_name
):
95 self
.log
.exception("Unable to find the object with the given "
96 "ID {}".format(id_name
))
98 def get_vnfd(self
, vnfd_id
):
99 values
= [vnfd
for vnfd
in list(self
._vnfd
_subscriber
.reg
.get_xact_elements()) if vnfd
.id == vnfd_id
]
100 return self
._unwrap
(values
, vnfd_id
)
102 def get_nsd(self
, nsd_id
):
103 values
= [nsd
for nsd
in list(self
._nsd
_subscriber
.reg
.get_xact_elements()) if nsd
.id == nsd_id
]
104 return self
._unwrap
(values
, nsd_id
)
107 def handle_vnfr(self
, vnfr
, action
):
108 """Starts a monitoring parameter job for every VNFR that reaches
112 vnfr (GiOBject): VNFR Gi object message from DTS
113 delete_mode (bool, optional): if set, stops and removes the monitor.
117 # if vnfr.operational_status == "running" and vnfr.id not in self.vnfr_monitors:
118 vnfr_status
= (vnfr
.operational_status
== "running" and
119 vnfr
.config_status
in ["configured", "config_not_needed"])
121 if vnfr_status
and vnfr
.id not in self
.vnfr_monitors
:
123 vnf_mon
= vnfr_core
.VnfMonitorDtsHandler
.from_vnf_data(
126 self
.get_vnfd(vnfr
.vnfd
.id))
128 self
.vnfr_monitors
[vnfr
.id] = vnf_mon
129 self
.vnfrs
[vnfr
.id] = vnfr
133 yield from vnf_mon
.register()
134 if vnfr
.nsr_id_ref
in self
.nsr_monitors
:
135 vnf_mon
.update_nsr_mon(self
.nsr_monitors
[vnfr
.nsr_id_ref
])
137 #self.update_nsrs(vnfr, action)
139 self
.loop
.create_task(task())
143 if vnfr
.id in self
.vnfr_monitors
:
144 self
.log
.debug("VNFR %s deleted: Stopping vnfr monitoring", vnfr
.id)
145 vnf_mon
= self
.vnfr_monitors
.pop(vnfr
.id)
147 self
.vnfrs
.pop(vnfr
.id)
148 #self.update_nsrs(vnfr, action)
150 if action
in [rwdts
.QueryAction
.CREATE
, rwdts
.QueryAction
.UPDATE
]:
152 elif action
== rwdts
.QueryAction
.DELETE
:
155 def update_nsrs(self
, vnfr
, action
):
156 if vnfr
.nsr_id_ref
not in self
.nsr_monitors
:
159 monitor
= self
.nsr_monitors
[vnfr
.nsr_id_ref
]
161 if action
in [rwdts
.QueryAction
.CREATE
, rwdts
.QueryAction
.UPDATE
]:
164 yield from monitor
.update([vnfr
])
166 self
.loop
.create_task(update_vnfr())
167 elif action
== rwdts
.QueryAction
.DELETE
:
171 yield from monitor
.delete([vnfr
])
172 except Exception as e
:
173 self
.log
.exception(str(e
))
175 self
.loop
.create_task(delete_vnfr())
179 def handle_nsr(self
, nsr
, action
):
180 """Callback for NSR opdata changes. Creates a publisher for every
181 NS that moves to config state.
184 nsr (RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr): Ns Opdata
185 action (rwdts.QueryAction): Action type of the change.
189 # TODO clean up the if-else mess, exception
191 success_state
= (nsr
.operational_status
== "running" and
192 nsr
.config_status
== "configured")
194 if not success_state
:
197 if nsr
.ns_instance_config_ref
in self
.nsr_monitors
:
200 constituent_vnfrs
= []
202 for vnfr_id
in nsr
.constituent_vnfr_ref
:
203 if (vnfr_id
.vnfr_id
in self
.vnfrs
):
204 vnfr_obj
= self
.vnfrs
[vnfr_id
.vnfr_id
]
205 constituent_vnfrs
.append(vnfr_obj
)
209 nsr_mon
= nsr_core
.NsrMonitorDtsHandler(
217 for vnfr_id
in nsr
.constituent_vnfr_ref
:
218 if vnfr_id
.vnfr_id
in self
.vnfr_monitors
:
219 self
.vnfr_monitors
[vnfr_id
.vnfr_id
].update_nsr_mon(nsr_mon
)
221 self
.nsr_monitors
[nsr
.ns_instance_config_ref
] = nsr_mon
227 yield from nsr_mon
.register()
228 yield from nsr_mon
.start()
229 except Exception as e
:
230 self
.log
.exception(e
)
232 self
.loop
.create_task(task())
235 if nsr
.ns_instance_config_ref
in self
.nsr_monitors
:
236 nsr_mon
= self
.nsr_monitors
.pop(nsr
.ns_instance_config_ref
)
239 if action
in [rwdts
.QueryAction
.CREATE
, rwdts
.QueryAction
.UPDATE
]:
241 elif action
== rwdts
.QueryAction
.DELETE
:
245 class MonitoringParameterTasklet(rift
.tasklets
.Tasklet
):
246 """The main task of this Tasklet is to listen for VNFR changes and once the
247 VNFR hits the running state, triggers the monitor.
249 def __init__(self
, *args
, **kwargs
):
251 super().__init
__(*args
, **kwargs
)
252 self
.rwlog
.set_category("rw-monitor-log")
253 except Exception as e
:
254 self
.log
.exception(e
)
256 self
._project
_handler
= None
262 self
.log
.info("Starting MonitoringParameterTasklet")
263 self
.log
.debug("Registering with dts")
265 self
.dts
= rift
.tasklets
.DTS(
267 NsrYang
.get_schema(),
269 self
.on_dts_state_change
275 except Exception as e
:
276 self
.log
.exception(e
)
280 self
.log
.debug("creating project handler")
281 self
.project_handler
= ProjectHandler(self
, MonParamProject
)
282 self
.project_handler
.register()
289 def on_dts_state_change(self
, state
):
290 """Handle DTS state change
292 Take action according to current DTS state to transition application
293 into the corresponding application state
296 state - current dts state
300 rwdts
.State
.INIT
: rwdts
.State
.REGN_COMPLETE
,
301 rwdts
.State
.CONFIG
: rwdts
.State
.RUN
,
305 rwdts
.State
.INIT
: self
.init
,
306 rwdts
.State
.RUN
: self
.run
,
309 # Transition application to next state
310 handler
= handlers
.get(state
, None)
311 if handler
is not None:
314 # Transition dts to next state
315 next_state
= switch
.get(state
, None)
316 if next_state
is not None:
317 self
.dts
.handle
.set_state(next_state
)