6def40eb23d21bc0ceeb429b2fd9063d28db1c1b
[osm/SO.git] / rwlaunchpad / plugins / rwnsm / rift / tasklets / rwnsmtasklet / publisher.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import concurrent.futures
19 import json
20
21 from gi.repository import (
22 RwDts as rwdts,
23 RwTypes,
24 RwProjectVnfdYang as RwVnfdYang,
25 RwYang
26 )
27 import rift.tasklets
28
29 import requests
30
31
class NsrOpDataDtsHandler(object):
    """ DTS publisher handler for network service record (NSR) op data """
    XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr"

    def __init__(self, dts, log, loop, project):
        # No registration handle exists until register() has run.
        self._regh = None
        self._project = project
        self._loop = loop
        self._log = log
        self._dts = dts

    @property
    def regh(self):
        """ The DTS registration handle, or None when not registered """
        return self._regh

    @asyncio.coroutine
    def register(self):
        """ Register XPATH (project-qualified) as a DTS publisher.

        Does nothing when a registration handle is already held.
        """
        if not self._regh:
            project_xpath = self._project.add_project(
                NsrOpDataDtsHandler.XPATH)
            message = "Registering Nsr op data path {} as publisher".format(
                project_xpath)
            self._log.debug(message)

            handler = rift.tasklets.DTS.RegistrationHandler()
            publisher_flags = rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ
            with self._dts.group_create() as reg_group:
                self._regh = reg_group.register(
                    xpath=project_xpath,
                    handler=handler,
                    flags=publisher_flags,
                )

    @asyncio.coroutine
    def create(self, xact, xpath, msg):
        """ Create the NS record `msg` in DTS at the project-qualified `xpath` """
        project_path = self._project.add_project(xpath)
        log_args = (xact, project_path, msg)
        self._log.debug("Creating NSR xact = %s, %s:%s", *log_args)
        self.regh.create_element(project_path, msg, xact=xact)
        self._log.debug("Created NSR xact = %s, %s:%s", *log_args)

    @asyncio.coroutine
    def update(self, xact, xpath, msg, flags=rwdts.XactFlag.REPLACE):
        """ Update the NS record at the project-qualified `xpath` with `msg`.

        `flags` is handed to the registration handle's update_element call.
        """
        project_path = self._project.add_project(xpath)
        handle = self.regh
        self._log.debug("Updating NSR xact = %s, %s:%s regh = %s",
                        xact, project_path, msg, handle)
        handle.update_element(project_path, msg, flags, xact=xact)
        self._log.debug("Updated NSR xact = %s, %s:%s",
                        xact, project_path, msg)

    @asyncio.coroutine
    def delete(self, xact, xpath):
        """ Delete the NS record at the project-qualified `xpath` from DTS """
        project_path = self._project.add_project(xpath)
        log_args = (xact, project_path)
        self._log.debug("Deleting NSR xact:%s, path:%s", *log_args)
        self.regh.delete_element(project_path, xact=xact)
        self._log.debug("Deleted NSR xact:%s, path:%s", *log_args)

    def deregister(self):
        """ Drop the DTS registration, if one is held """
        handle = self._regh
        if not handle:
            return
        handle.deregister()
        self._regh = None
98
class VnfrPublisherDtsHandler(object):
    """ Registers 'D,/rw-project:project/vnfr:vnfr-catalog/vnfr:vnfr' DTS"""
    XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"

    def __init__(self, dts, log, loop, project):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._project = project

        # Set by register(), cleared by deregister().
        self._regh = None

    @property
    def regh(self):
        """ Return registration handle"""
        return self._regh

    @asyncio.coroutine
    def register(self):
        """ Register as publisher of VNFR records in DTS.

        Does nothing when a registration handle is already held.
        """
        if self._regh:
            return

        xpath = self._project.add_project(VnfrPublisherDtsHandler.XPATH)
        self._log.debug("Registering for VNFR using xpath: {}".
                        format(xpath))

        # The handler is created without any callbacks: this class only
        # publishes records through the registration handle.
        hdl = rift.tasklets.DTS.RegistrationHandler()
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=xpath,
                                        handler=hdl,
                                        flags=(rwdts.Flag.PUBLISHER |
                                               rwdts.Flag.SHARED |
                                               rwdts.Flag.NO_PREP_READ |
                                               rwdts.Flag.CACHE),)

    def deregister(self):
        """ Drop the DTS registration, if one is held """
        if self._regh:
            self._regh.deregister()
            self._regh = None

    @asyncio.coroutine
    def create(self, xact, path, msg):
        """
        Create a VNFR record in DTS with path and message

        `path` is passed to the registration handle unchanged.
        """
        self._log.debug("Creating VNFR xact = %s, %s:%s",
                        xact, path, msg)
        self.regh.create_element(path, msg, xact=xact)
        self._log.debug("Created VNFR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def update(self, xact, path, msg):
        """
        Update a VNFR record in DTS with path and message

        `path` is passed to the registration handle unchanged.
        """
        self._log.debug("Updating VNFR xact = %s, %s:%s",
                        xact, path, msg)
        self.regh.update_element(path, msg, xact=xact)
        self._log.debug("Updated VNFR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def delete(self, xact, path):
        """
        Delete the VNFR record at path from DTS

        `path` is passed to the registration handle unchanged.
        """
        self._log.debug("Deleting VNFR xact = %s, %s", xact, path)
        self.regh.delete_element(path, xact=xact)
        self._log.debug("Deleted VNFR xact = %s, %s", xact, path)
181
182
class VlrPublisherDtsHandler(object):
    """ registers 'D,/rw-project:project/vlr:vlr-catalog/vlr:vlr """
    XPATH = "D,/vlr:vlr-catalog/vlr:vlr"

    def __init__(self, dts, log, loop, project):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._project = project

        # Set by register(), cleared by deregister().
        self._regh = None

    @property
    def regh(self):
        """ Return registration handle"""
        return self._regh

    @asyncio.coroutine
    def register(self):
        """ Register as publisher of VLR records in DTS.

        Does nothing when a registration handle is already held.
        """
        if self._regh:
            return

        xpath = self._project.add_project(VlrPublisherDtsHandler.XPATH)
        self._log.debug("Registering for VLR using xpath: {}".
                        format(xpath))

        # The handler is created without any callbacks: this class only
        # publishes records through the registration handle.
        hdl = rift.tasklets.DTS.RegistrationHandler()
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=xpath,
                                        handler=hdl,
                                        flags=(rwdts.Flag.PUBLISHER |
                                               rwdts.Flag.NO_PREP_READ |
                                               rwdts.Flag.CACHE),)

    def deregister(self):
        """ Drop the DTS registration, if one is held """
        if self._regh:
            self._regh.deregister()
            self._regh = None

    @asyncio.coroutine
    def create(self, xact, path, msg):
        """
        Create a VLR record in DTS with path and message

        `path` is passed to the registration handle unchanged.
        """
        self._log.debug("Creating VLR xact = %s, %s:%s",
                        xact, path, msg)
        self.regh.create_element(path, msg, xact=xact)
        self._log.debug("Created VLR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def update(self, xact, path, msg):
        """
        Update a VLR record in DTS with path and message

        `path` is passed to the registration handle unchanged.
        """
        self._log.debug("Updating VLR xact = %s, %s:%s",
                        xact, path, msg)
        self.regh.update_element(path, msg, xact=xact)
        self._log.debug("Updated VLR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def delete(self, xact, path):
        """
        Delete the VLR record at path from DTS

        `path` is passed to the registration handle unchanged.
        """
        self._log.debug("Deleting VLR xact = %s, %s", xact, path)
        self.regh.delete_element(path, xact=xact)
        self._log.debug("Deleted VLR xact = %s, %s", xact, path)
265
266
class VnfdPublisher(object):
    """ Publishes VNFD updates through the local REST config API """
    # NOTE(review): credentials are hard-coded in source; consider moving
    # them to configuration.
    AUTH = ('@rift', 'rift')
    HEADERS = {"content-type": "application/vnd.yang.data+json"}

    def __init__(self, use_ssl, ssl_cert, ssl_key, loop, project):
        self.use_ssl = use_ssl
        self.ssl_cert = ssl_cert
        self.ssl_key = ssl_key
        self._project = project
        # NOTE(review): update() passes None to run_in_executor, so this
        # executor is not used by the code in this class -- confirm whether
        # it was meant to be passed there instead.
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        self.loop = loop

    @asyncio.coroutine
    def update(self, vnfd):
        """ PUT the VNFD to the REST config API without blocking the loop.

        The blocking HTTP call runs in an executor thread via
        loop.run_in_executor.

        Arguments:
            vnfd - VNFD message; must provide to_json(model) and an id

        Returns:
            The response object returned by requests.put. The HTTP status
            is not checked here; callers may inspect the response.
        """
        def _put_vnfd(vnfd):
            """
            Update VNFD record using rest API, as the config data is handled
            by uAgent and stored in CDB
            """
            scheme = "https" if self.use_ssl else "http"

            url = "{}://127.0.0.1:8008/api/config/project/{}/vnfd-catalog/vnfd/{}"

            model = RwYang.Model.create_libyang()
            model.load_module("rw-project-vnfd")
            model.load_module("project-vnfd")

            data = vnfd.to_json(model)

            # When the serialized JSON is wrapped in the catalog container,
            # send only the container's content as the request body.
            key = "project-vnfd:vnfd-catalog"
            newdict = json.loads(data)
            if key in newdict:
                data = json.dumps(newdict[key])

            options = {"data": data,
                       "headers": VnfdPublisher.HEADERS,
                       "auth": VnfdPublisher.AUTH}

            if self.use_ssl:
                # NOTE(review): verify=False disables TLS server certificate
                # verification for this request.
                options["verify"] = False
                options["cert"] = (self.ssl_cert, self.ssl_key)

            # Return the response so the caller can inspect the outcome;
            # previously it was dropped and the awaited result was None.
            return requests.put(
                url.format(scheme, self._project.name, vnfd.id),
                **options
            )

        status = yield from self.loop.run_in_executor(
            None,
            _put_vnfd,
            vnfd
        )
        return status