RIFT OSM R1 Initial Submission
[osm/SO.git] / rwcm / plugins / rwconman / rift / tasklets / rwconmantasklet / RiftCM_rpc.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import time
20
21 import rift.mano.config_agent
22 import gi
23 gi.require_version('RwDts', '1.0')
24 gi.require_version('RwNsrYang', '1.0')
25 from gi.repository import (
26 RwDts as rwdts,
27 NsrYang,
28 )
29
class RiftCMRPCHandler(object):
    """DTS RPC handler for NS-level service primitives.

    Once :meth:`register` has run, two RPCs are served:

    * ``exec-ns-service-primitive`` -- executes a service primitive, either
      through the NSD's user defined script or VNF by VNF through the
      config agent plugins, and records the result as a job.
    * ``get-ns-service-primitive-values`` -- reports the next unused value
      of every parameter pool referenced by a service primitive.
    """
    EXEC_NS_CONF_XPATH = "I,/nsr:exec-ns-service-primitive"
    EXEC_NS_CONF_O_XPATH = "O,/nsr:exec-ns-service-primitive"

    GET_NS_CONF_XPATH = "I,/nsr:get-ns-service-primitive-values"
    GET_NS_CONF_O_XPATH = "O,/nsr:get-ns-service-primitive-values"

    def __init__(self, dts, log, loop, nsm):
        """Store the collaborators and create the config agent job manager.

        Args:
            dts: DTS handle used to register the RPC endpoints.
            log: Logger-like object (``debug``, ``error``, ``exception``).
            loop: Event loop; stored and handed to the job manager.
            nsm: NS manager. This class reads ``nsrs``, ``get_nsd``,
                ``get_vnfr_msg`` and ``config_agent_plugins`` from it.
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._nsm = nsm

        # Registration handles; populated by register().
        self._ns_regh = None
        # Never assigned anywhere in this class; kept so that `reghs`
        # keeps returning a three element tuple.
        self._vnf_regh = None
        self._get_ns_conf_regh = None

        self.job_manager = rift.mano.config_agent.ConfigAgentJobManager(dts, log, loop, nsm)

    @property
    def reghs(self):
        """ Return registration handles """
        return (self._ns_regh, self._vnf_regh, self._get_ns_conf_regh)

    @property
    def nsm(self):
        """ Return the NS manager instance """
        return self._nsm

    @staticmethod
    def _get_param_pool(nsr, pool_name):
        """Return the parameter pool called *pool_name* from *nsr*.

        Raises:
            ValueError: if ``nsr.param_pools`` has no such pool.
        """
        try:
            return nsr.param_pools[pool_name]
        except KeyError as err:
            raise ValueError("Parameter pool %s does not exist in nsr" % pool_name) from err

    def prepare_meta(self, rpc_ip):
        """Resolve the NSR referenced by an RPC input and index its VNFRs.

        Args:
            rpc_ip: RPC input message; only ``nsr_id_ref`` is read.

        Returns:
            A ``(nsr, vnfrs)`` tuple where ``vnfrs`` maps each ``vnfr.id``
            to its VNFR object.

        Raises:
            ValueError: if a ``KeyError`` is raised while resolving the
                records (for instance an unknown NSR id).
        """
        try:
            nsr = self._nsm.nsrs[rpc_ip.nsr_id_ref]
            vnfrs = {vnfr.id: vnfr for vnfr in nsr.vnfrs}
        except KeyError as err:
            raise ValueError("Record not found", str(err)) from err

        return nsr, vnfrs

    @asyncio.coroutine
    def _get_ns_cfg_primitive(self, nsr_id, ns_cfg_name):
        """Return a deep copy of the NSD service primitive *ns_cfg_name*.

        The NSD is obtained from the NS manager for *nsr_id*.

        Returns:
            The copied primitive message, or ``None`` when the NSD has no
            service primitive with that name.
        """
        nsd_msg = yield from self._nsm.get_nsd(nsr_id)

        for ns_cfg_prim in nsd_msg.service_primitive:
            if ns_cfg_prim.name == ns_cfg_name:
                # Hand out a copy so callers cannot modify the NSD message.
                return ns_cfg_prim.deep_copy()

        return None

    @asyncio.coroutine
    def _get_vnf_primitive(self, vnfr_id, nsr_id, primitive_name):
        """Return the service primitive *primitive_name* of a VNFR.

        Args:
            vnfr_id: Identifier passed to ``nsm.get_vnfr_msg`` first.
            nsr_id: Identifier passed to ``nsm.get_vnfr_msg`` second.
            primitive_name: Name of the primitive to look up in the VNF
                configuration.

        Raises:
            ValueError: if the VNFR message is missing or has no primitive
                with that name.
        """
        vnf = self._nsm.get_vnfr_msg(vnfr_id, nsr_id)
        self._log.debug("vnfr_msg:%s", vnf)
        if vnf:
            # Lazy %-style arguments throughout: mixing "{}" placeholders
            # with a single %s argument never printed the identifiers.
            self._log.debug("nsr/vnf %s/%s, vnf_configuration: %s",
                            nsr_id, vnfr_id, vnf.vnf_configuration)
            for primitive in vnf.vnf_configuration.service_primitive:
                if primitive.name == primitive_name:
                    return primitive

        raise ValueError("Could not find nsr/vnf {}/{} primitive {}"
                         .format(nsr_id, vnfr_id, primitive_name))

    @asyncio.coroutine
    def register(self):
        """Register the job manager and both RPC endpoints with DTS.

        The two prepare callbacks are closures over ``self``. Each one
        answers its transaction with ACK plus the RPC output on success,
        or logs the exception and answers NACK on any failure.
        """
        yield from self.job_manager.register()

        @asyncio.coroutine
        def on_ns_config_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts exec-ns-service-primitive"""
            assert action == rwdts.QueryAction.RPC
            rpc_ip = msg
            # Echo the caller's parameters back in the output and stamp it
            # with the creation time.
            rpc_op = NsrYang.YangOutput_Nsr_ExecNsServicePrimitive.from_dict({
                "triggered_by": rpc_ip.triggered_by,
                "create_time": int(time.time()),
                "parameter": [param.as_dict() for param in rpc_ip.parameter],
                "parameter_group": [pg.as_dict() for pg in rpc_ip.parameter_group]
            })

            try:
                ns_cfg_prim_name = rpc_ip.name
                nsr_id = rpc_ip.nsr_id_ref
                # Fail fast on an unknown NSR before the NSD is queried.
                nsr = self._nsm.nsrs[nsr_id]

                nsd_cfg_prim_msg = yield from self._get_ns_cfg_primitive(nsr_id, ns_cfg_prim_name)

                rpc_op.nsr_id_ref = nsr_id
                rpc_op.name = ns_cfg_prim_name

                nsr, vnfrs = self.prepare_meta(rpc_ip)
                rpc_op.job_id = nsr.job_id

                # Give preference to user defined script.
                if nsd_cfg_prim_msg and nsd_cfg_prim_msg.has_field("user_defined_script"):
                    rpc_ip.user_defined_script = nsd_cfg_prim_msg.user_defined_script

                    tasks = []
                    for config_plugin in self.nsm.config_agent_plugins:
                        task, err = yield from config_plugin.apply_ns_config(
                            nsr,
                            vnfrs,
                            rpc_ip)
                        tasks.append(task)
                        if err:
                            # presumably err is bytes (it is decoded here);
                            # the last plugin reporting an error wins.
                            rpc_op.job_status_details = err.decode()

                    self.job_manager.add_job(rpc_op, tasks)
                else:
                    # Otherwise create VNF primitives.
                    for vnf in rpc_ip.vnf_list:
                        vnfr_id = vnf.vnfr_id_ref
                        vnf_op = rpc_op.vnf_out_list.add()
                        vnf_op.vnfr_id_ref = vnfr_id
                        vnf_op.member_vnf_index_ref = vnf.member_vnf_index_ref

                        for primitive in vnf.vnf_primitive:
                            op_primitive = vnf_op.vnf_out_primitive.add()
                            op_primitive.name = primitive.name
                            op_primitive.execution_id = ''
                            op_primitive.execution_status = 'completed'
                            op_primitive.execution_error_details = ''

                            # Copy over the VNF primitive's input parameters
                            for param in primitive.parameter:
                                output_param = op_primitive.parameter.add()
                                output_param.name = param.name
                                output_param.value = param.value

                            self._log.debug("%s:%s Got primitive %s:%s",
                                            nsr_id, vnf.member_vnf_index_ref,
                                            primitive.name, primitive.parameter)

                            nsd_vnf_primitive = yield from self._get_vnf_primitive(
                                vnfr_id,
                                nsr_id,
                                primitive.name
                            )
                            # Mark pool backed values as used.
                            # NOTE(review): the value recorded is the one on
                            # the VNF configuration's parameter, not the
                            # value supplied in the RPC input -- confirm
                            # that this is intended.
                            for param in nsd_vnf_primitive.parameter:
                                if not param.has_field("parameter_pool"):
                                    continue

                                nsr_param_pool = self._get_param_pool(nsr, param.parameter_pool)
                                nsr_param_pool.add_used_value(param.value)

                            for config_plugin in self.nsm.config_agent_plugins:
                                yield from config_plugin.vnf_config_primitive(nsr_id,
                                                                              vnfr_id,
                                                                              primitive,
                                                                              op_primitive)

                    self.job_manager.add_job(rpc_op)

                self._log.debug("RPC output: {}".format(rpc_op))
                xact_info.respond_xpath(rwdts.XactRspCode.ACK,
                                        RiftCMRPCHandler.EXEC_NS_CONF_O_XPATH,
                                        rpc_op)
            except Exception as e:
                self._log.error("Exception processing the "
                                "exec-ns-service-primitive: {}".format(e))
                self._log.exception(e)
                xact_info.respond_xpath(rwdts.XactRspCode.NACK,
                                        RiftCMRPCHandler.EXEC_NS_CONF_O_XPATH)

        @asyncio.coroutine
        def on_get_ns_config_values_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts get-ns-service-primitive-values"""
            assert action == rwdts.QueryAction.RPC
            nsr_id = msg.nsr_id_ref
            cfg_prim_name = msg.name
            try:
                nsr = self._nsm.nsrs[nsr_id]

                rpc_op = NsrYang.YangOutput_Nsr_GetNsServicePrimitiveValues()

                ns_cfg_prim_msg = yield from self._get_ns_cfg_primitive(nsr_id, cfg_prim_name)
                if ns_cfg_prim_msg is None:
                    # Report the real cause instead of an AttributeError on
                    # None further down; the caller still receives a NACK.
                    raise ValueError("Could not find service primitive {} for nsr {}"
                                     .format(cfg_prim_name, nsr_id))

                # Get pool values for NS-level parameters
                for ns_param in ns_cfg_prim_msg.parameter:
                    if not ns_param.has_field("parameter_pool"):
                        continue

                    nsr_param_pool = self._get_param_pool(nsr, ns_param.parameter_pool)

                    new_ns_param = rpc_op.ns_parameter.add()
                    new_ns_param.name = ns_param.name
                    new_ns_param.value = str(nsr_param_pool.get_next_unused_value())

                # Get pool values for VNF-level parameters
                for vnf_prim_group in ns_cfg_prim_msg.vnf_primitive_group:
                    rsp_prim_group = rpc_op.vnf_primitive_group.add()
                    rsp_prim_group.member_vnf_index_ref = vnf_prim_group.member_vnf_index_ref
                    if vnf_prim_group.has_field("vnfd_id_ref"):
                        rsp_prim_group.vnfd_id_ref = vnf_prim_group.vnfd_id_ref

                    for index, vnf_prim in enumerate(vnf_prim_group.primitive):
                        rsp_prim = rsp_prim_group.primitive.add()
                        rsp_prim.name = vnf_prim.name
                        rsp_prim.index = index
                        # NOTE(review): vnfd_id_ref is passed in the slot
                        # where _get_vnf_primitive expects a vnfr id --
                        # confirm that nsm.get_vnfr_msg accepts it.
                        vnf_primitive = yield from self._get_vnf_primitive(
                            vnf_prim_group.vnfd_id_ref,
                            nsr_id,
                            vnf_prim.name
                        )
                        for param in vnf_primitive.parameter:
                            if not param.has_field("parameter_pool"):
                                continue

                            nsr_param_pool = self._get_param_pool(nsr, param.parameter_pool)

                            vnf_param = rsp_prim.parameter.add()
                            vnf_param.name = param.name
                            vnf_param.value = str(nsr_param_pool.get_next_unused_value())

                self._log.debug("RPC output: {}".format(rpc_op))
                xact_info.respond_xpath(rwdts.XactRspCode.ACK,
                                        RiftCMRPCHandler.GET_NS_CONF_O_XPATH, rpc_op)
            except Exception as e:
                self._log.error("Exception processing the "
                                "get-ns-service-primitive-values: {}".format(e))
                self._log.exception(e)
                xact_info.respond_xpath(rwdts.XactRspCode.NACK,
                                        RiftCMRPCHandler.GET_NS_CONF_O_XPATH)

        # NOTE(review): rift.tasklets is not among the imports visible at
        # the top of this file; presumably it is already loaded by the
        # hosting tasklet -- confirm, or import it explicitly.
        hdl_ns = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_ns_config_prepare,)
        hdl_ns_get = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_get_ns_config_values_prepare,)

        with self._dts.group_create() as group:
            self._ns_regh = group.register(xpath=RiftCMRPCHandler.EXEC_NS_CONF_XPATH,
                                           handler=hdl_ns,
                                           flags=rwdts.Flag.PUBLISHER,
                                           )
            self._get_ns_conf_regh = group.register(xpath=RiftCMRPCHandler.GET_NS_CONF_XPATH,
                                                    handler=hdl_ns_get,
                                                    flags=rwdts.Flag.PUBLISHER,
                                                    )
349
350