Bug 48: SO crash due to service primitive
[osm/SO.git] / rwcm / plugins / rwconman / rift / tasklets / rwconmantasklet / RiftCM_rpc.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import time
20
21 import rift.mano.config_agent
22 import gi
23 gi.require_version('RwDts', '1.0')
24 gi.require_version('RwNsrYang', '1.0')
25 from gi.repository import (
26 RwDts as rwdts,
27 NsrYang,
28 )
29
30 class RiftCMRPCHandler(object):
31 """ The Network service Monitor DTS handler """
32 EXEC_NS_CONF_XPATH = "I,/nsr:exec-ns-service-primitive"
33 EXEC_NS_CONF_O_XPATH = "O,/nsr:exec-ns-service-primitive"
34
35 GET_NS_CONF_XPATH = "I,/nsr:get-ns-service-primitive-values"
36 GET_NS_CONF_O_XPATH = "O,/nsr:get-ns-service-primitive-values"
37
38 def __init__(self, dts, log, loop, nsm):
39 self._dts = dts
40 self._log = log
41 self._loop = loop
42 self._nsm = nsm
43
44 self._ns_regh = None
45 self._vnf_regh = None
46 self._get_ns_conf_regh = None
47
48 self.job_manager = rift.mano.config_agent.ConfigAgentJobManager(dts, log, loop, nsm)
49
50 @property
51 def reghs(self):
52 """ Return registration handles """
53 return (self._ns_regh, self._vnf_regh, self._get_ns_conf_regh)
54
55 @property
56 def nsm(self):
57 """ Return the NS manager instance """
58 return self._nsm
59
60 def prepare_meta(self, rpc_ip):
61
62 try:
63 nsr_id = rpc_ip.nsr_id_ref
64 nsr = self._nsm.nsrs[nsr_id]
65 vnfrs = {}
66 for vnfr in nsr.vnfrs:
67 vnfr_id = vnfr.id
68 # vnfr is a dict containing all attributes
69 vnfrs[vnfr_id] = vnfr
70
71 return nsr, vnfrs
72 except KeyError as e:
73 raise ValueError("Record not found", str(e))
74
75 @asyncio.coroutine
76 def _get_ns_cfg_primitive(self, nsr_id, ns_cfg_name):
77 nsd_msg = yield from self._nsm.get_nsd(nsr_id)
78
79 def get_nsd_cfg_prim(name):
80 for ns_cfg_prim in nsd_msg.service_primitive:
81 if ns_cfg_prim.name == name:
82 return ns_cfg_prim
83 return None
84
85 ns_cfg_prim_msg = get_nsd_cfg_prim(ns_cfg_name)
86 if ns_cfg_prim_msg is not None:
87 ret_cfg_prim_msg = ns_cfg_prim_msg.deep_copy()
88 return ret_cfg_prim_msg
89 return None
90
91 @asyncio.coroutine
92 def _get_vnf_primitive(self, vnfr_id, nsr_id, primitive_name):
93 vnf = self._nsm.get_vnfr_msg(vnfr_id, nsr_id)
94 self._log.debug("vnfr_msg:%s", vnf)
95 if vnf:
96 self._log.debug("nsr/vnf {}/{}, vnf_configuration: %s",
97 vnf.vnf_configuration)
98 for primitive in vnf.vnf_configuration.service_primitive:
99 if primitive.name == primitive_name:
100 return primitive
101
102 raise ValueError("Could not find nsr/vnf {}/{} primitive {}"
103 .format(nsr_id, vnfr_id, primitive_name))
104
105 @asyncio.coroutine
106 def register(self):
107 """ Register for NS monitoring read from dts """
108 yield from self.job_manager.register()
109
110 @asyncio.coroutine
111 def on_ns_config_prepare(xact_info, action, ks_path, msg):
112 """ prepare callback from dts exec-ns-service-primitive"""
113 assert action == rwdts.QueryAction.RPC
114 rpc_ip = msg
115 rpc_op = NsrYang.YangOutput_Nsr_ExecNsServicePrimitive.from_dict({
116 "triggered_by": rpc_ip.triggered_by,
117 "create_time": int(time.time()),
118 "parameter": [param.as_dict() for param in rpc_ip.parameter],
119 "parameter_group": [pg.as_dict() for pg in rpc_ip.parameter_group]
120 })
121
122 try:
123 ns_cfg_prim_name = rpc_ip.name
124 nsr_id = rpc_ip.nsr_id_ref
125 nsr = self._nsm.nsrs[nsr_id]
126
127 nsd_cfg_prim_msg = yield from self._get_ns_cfg_primitive(nsr_id, ns_cfg_prim_name)
128
129 def find_nsd_vnf_prim_param_pool(vnf_index, vnf_prim_name, param_name):
130 for vnf_prim_group in nsd_cfg_prim_msg.vnf_primitive_group:
131 if vnf_prim_group.member_vnf_index_ref != vnf_index:
132 continue
133
134 for vnf_prim in vnf_prim_group.primitive:
135 if vnf_prim.name != vnf_prim_name:
136 continue
137
138 try:
139 nsr_param_pool = nsr.param_pools[pool_param.parameter_pool]
140 except KeyError:
141 raise ValueError("Parameter pool %s does not exist in nsr" % vnf_prim.parameter_pool)
142
143 self._log.debug("Found parameter pool %s for vnf index(%s), vnf_prim_name(%s), param_name(%s)",
144 nsr_param_pool, vnf_index, vnf_prim_name, param_name)
145 return nsr_param_pool
146
147 self._log.debug("Could not find parameter pool for vnf index(%s), vnf_prim_name(%s), param_name(%s)",
148 vnf_index, vnf_prim_name, param_name)
149 return None
150
151 rpc_op.nsr_id_ref = nsr_id
152 rpc_op.name = ns_cfg_prim_name
153
154 nsr, vnfrs = self.prepare_meta(rpc_ip)
155 rpc_op.job_id = nsr.job_id
156
157 # Copy over the NS level Parameters
158
159 # Give preference to user defined script.
160 if nsd_cfg_prim_msg and nsd_cfg_prim_msg.has_field("user_defined_script"):
161 rpc_ip.user_defined_script = nsd_cfg_prim_msg.user_defined_script
162
163 tasks = []
164 for config_plugin in self.nsm.config_agent_plugins:
165 task, err = yield from config_plugin.apply_ns_config(
166 nsr,
167 vnfrs,
168 rpc_ip)
169 tasks.append(task)
170 if err:
171 rpc_op.job_status_details = err.decode()
172
173 self.job_manager.add_job(rpc_op, tasks)
174 else:
175 # Otherwise create VNF primitives.
176 for vnf in rpc_ip.vnf_list:
177 vnf_op = rpc_op.vnf_out_list.add()
178 vnf_member_idx = vnf.member_vnf_index_ref
179 vnfr_id = vnf.vnfr_id_ref
180 vnf_op.vnfr_id_ref = vnfr_id
181 vnf_op.member_vnf_index_ref = vnf_member_idx
182
183 idx = 0
184 for primitive in vnf.vnf_primitive:
185 op_primitive = vnf_op.vnf_out_primitive.add()
186 op_primitive.index = idx
187 idx += 1
188 op_primitive.name = primitive.name
189 op_primitive.execution_id = ''
190 op_primitive.execution_status = 'completed'
191 op_primitive.execution_error_details = ''
192
193 # Copy over the VNF pimitive's input parameters
194 for param in primitive.parameter:
195 output_param = op_primitive.parameter.add()
196 output_param.name = param.name
197 output_param.value = param.value
198
199 self._log.debug("%s:%s Got primitive %s:%s",
200 nsr_id, vnf.member_vnf_index_ref, primitive.name, primitive.parameter)
201
202 nsd_vnf_primitive = yield from self._get_vnf_primitive(
203 vnfr_id,
204 nsr_id,
205 primitive.name
206 )
207 for param in nsd_vnf_primitive.parameter:
208 if not param.has_field("parameter_pool"):
209 continue
210
211 try:
212 nsr_param_pool = nsr.param_pools[param.parameter_pool]
213 except KeyError:
214 raise ValueError("Parameter pool %s does not exist in nsr" % param.parameter_pool)
215 nsr_param_pool.add_used_value(param.value)
216
217 for config_plugin in self.nsm.config_agent_plugins:
218 yield from config_plugin.vnf_config_primitive(nsr_id,
219 vnfr_id,
220 primitive,
221 op_primitive)
222
223 self.job_manager.add_job(rpc_op)
224
225 # Get NSD
226 # Find Config Primitive
227 # For each vnf-primitive with parameter pool
228 # Find parameter pool
229 # Add used value to the pool
230 self._log.debug("RPC output: {}".format(rpc_op))
231 xact_info.respond_xpath(rwdts.XactRspCode.ACK,
232 RiftCMRPCHandler.EXEC_NS_CONF_O_XPATH,
233 rpc_op)
234 except Exception as e:
235 self._log.error("Exception processing the "
236 "exec-ns-service-primitive: {}".format(e))
237 self._log.exception(e)
238 xact_info.respond_xpath(rwdts.XactRspCode.NACK,
239 RiftCMRPCHandler.EXEC_NS_CONF_O_XPATH)
240
241 @asyncio.coroutine
242 def on_get_ns_config_values_prepare(xact_info, action, ks_path, msg):
243 assert action == rwdts.QueryAction.RPC
244 nsr_id = msg.nsr_id_ref
245 cfg_prim_name = msg.name
246 try:
247 nsr = self._nsm.nsrs[nsr_id]
248
249 rpc_op = NsrYang.YangOutput_Nsr_GetNsServicePrimitiveValues()
250
251 ns_cfg_prim_msg = yield from self._get_ns_cfg_primitive(nsr_id, cfg_prim_name)
252
253 # Get pool values for NS-level parameters
254 for ns_param in ns_cfg_prim_msg.parameter:
255 if not ns_param.has_field("parameter_pool"):
256 continue
257
258 try:
259 nsr_param_pool = nsr.param_pools[ns_param.parameter_pool]
260 except KeyError:
261 raise ValueError("Parameter pool %s does not exist in nsr" % ns_param.parameter_pool)
262
263 new_ns_param = rpc_op.ns_parameter.add()
264 new_ns_param.name = ns_param.name
265 new_ns_param.value = str(nsr_param_pool.get_next_unused_value())
266
267 # Get pool values for NS-level parameters
268 for vnf_prim_group in ns_cfg_prim_msg.vnf_primitive_group:
269 rsp_prim_group = rpc_op.vnf_primitive_group.add()
270 rsp_prim_group.member_vnf_index_ref = vnf_prim_group.member_vnf_index_ref
271 if vnf_prim_group.has_field("vnfd_id_ref"):
272 rsp_prim_group.vnfd_id_ref = vnf_prim_group.vnfd_id_ref
273
274 for index, vnf_prim in enumerate(vnf_prim_group.primitive):
275 rsp_prim = rsp_prim_group.primitive.add()
276 rsp_prim.name = vnf_prim.name
277 rsp_prim.index = index
278 vnf_primitive = yield from self._get_vnf_primitive(
279 vnf_prim_group.vnfd_id_ref,
280 nsr_id,
281 vnf_prim.name
282 )
283 for param in vnf_primitive.parameter:
284 if not param.has_field("parameter_pool"):
285 continue
286
287 # Get pool values for NS-level parameters
288 for ns_param in ns_cfg_prim_msg.parameter:
289 if not ns_param.has_field("parameter_pool"):
290 continue
291
292 try:
293 nsr_param_pool = nsr.param_pools[ns_param.parameter_pool]
294 except KeyError:
295 raise ValueError("Parameter pool %s does not exist in nsr" % ns_param.parameter_pool)
296
297 new_ns_param = rpc_op.ns_parameter.add()
298 new_ns_param.name = ns_param.name
299 new_ns_param.value = str(nsr_param_pool.get_next_unused_value())
300
301 # Get pool values for NS-level parameters
302 for vnf_prim_group in ns_cfg_prim_msg.vnf_primitive_group:
303 rsp_prim_group = rpc_op.vnf_primitive_group.add()
304 rsp_prim_group.member_vnf_index_ref = vnf_prim_group.member_vnf_index_ref
305 if vnf_prim_group.has_field("vnfd_id_ref"):
306 rsp_prim_group.vnfd_id_ref = vnf_prim_group.vnfd_id_ref
307
308 for index, vnf_prim in enumerate(vnf_prim_group.primitive):
309 rsp_prim = rsp_prim_group.primitive.add()
310 rsp_prim.name = vnf_prim.name
311 rsp_prim.index = index
312 vnf_primitive = yield from self._get_vnf_primitive(
313 nsr_id,
314 vnf_prim_group.member_vnf_index_ref,
315 vnf_prim.name
316 )
317 for param in vnf_primitive.parameter:
318 if not param.has_field("parameter_pool"):
319 continue
320
321 try:
322 nsr_param_pool = nsr.param_pools[param.parameter_pool]
323 except KeyError:
324 raise ValueError("Parameter pool %s does not exist in nsr" % vnf_prim.parameter_pool)
325
326 vnf_param = rsp_prim.parameter.add()
327 vnf_param.name = param.name
328 vnf_param.value = str(nsr_param_pool.get_next_unused_value())
329
330 self._log.debug("RPC output: {}".format(rpc_op))
331 xact_info.respond_xpath(rwdts.XactRspCode.ACK,
332 RiftCMRPCHandler.GET_NS_CONF_O_XPATH, rpc_op)
333 except Exception as e:
334 self._log.error("Exception processing the "
335 "get-ns-service-primitive-values: {}".format(e))
336 self._log.exception(e)
337 xact_info.respond_xpath(rwdts.XactRspCode.NACK,
338 RiftCMRPCHandler.GET_NS_CONF_O_XPATH)
339
340 hdl_ns = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_ns_config_prepare,)
341 hdl_ns_get = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_get_ns_config_values_prepare,)
342
343 with self._dts.group_create() as group:
344 self._ns_regh = group.register(xpath=RiftCMRPCHandler.EXEC_NS_CONF_XPATH,
345 handler=hdl_ns,
346 flags=rwdts.Flag.PUBLISHER,
347 )
348 self._get_ns_conf_regh = group.register(xpath=RiftCMRPCHandler.GET_NS_CONF_XPATH,
349 handler=hdl_ns_get,
350 flags=rwdts.Flag.PUBLISHER,
351 )
352
353