Merge from OSM SO master
[osm/SO.git] / rwlaunchpad / plugins / rwnsm / rift / tasklets / rwnsmtasklet / publisher.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import concurrent.futures
19 import json
20
21 from gi.repository import (
22 RwDts as rwdts,
23 RwTypes,
24 RwProjectVnfdYang as RwVnfdYang,
25 RwYang
26 )
27 import rift.tasklets
28
29 import requests
30
31
class NsrOpDataDtsHandler(object):
    """Publisher-side DTS handler for network service op data records.

    Every xpath handed to this class is passed through
    ``project.add_project`` before it is used against DTS.
    """
    XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr"

    def __init__(self, dts, log, loop, project):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._project = project
        # Populated by register(); None until then.
        self._regh = None

    @property
    def regh(self):
        """The DTS registration handle, or None before register() has run."""
        return self._regh

    @asyncio.coroutine
    def register(self):
        """Register this handler as publisher of the NSR op data path.

        Does nothing when a registration handle is already held.
        """
        if self._regh:
            return

        project_xpath = self._project.add_project(NsrOpDataDtsHandler.XPATH)
        self._log.debug(
            "Registering Nsr op data path {} as publisher".format(project_xpath)
        )

        handler = rift.tasklets.DTS.RegistrationHandler()
        with self._dts.group_create() as group:
            publisher_flags = rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ
            self._regh = group.register(
                xpath=project_xpath,
                handler=handler,
                flags=publisher_flags,
            )

    @asyncio.coroutine
    def create(self, xact, xpath, msg):
        """Create the NS record ``msg`` at ``xpath`` through the handle.

        ``xact`` is only used in the log output.
        """
        record_path = self._project.add_project(xpath)
        self._log.debug("Creating NSR xact = %s, %s:%s", xact, record_path, msg)
        self.regh.create_element(record_path, msg)
        self._log.debug("Created NSR xact = %s, %s:%s", xact, record_path, msg)

    @asyncio.coroutine
    def update(self, xact, xpath, msg, flags=rwdts.XactFlag.REPLACE):
        """Update the NS record at ``xpath`` with ``msg`` through the handle.

        ``flags`` is forwarded unchanged to ``update_element``; ``xact`` is
        only used in the log output.
        """
        record_path = self._project.add_project(xpath)
        self._log.debug(
            "Updating NSR xact = %s, %s:%s regh = %s",
            xact, record_path, msg, self.regh,
        )
        self.regh.update_element(record_path, msg, flags)
        self._log.debug("Updated NSR xact = %s, %s:%s", xact, record_path, msg)

    @asyncio.coroutine
    def delete(self, xact, xpath):
        """Delete the NS record at ``xpath`` through the handle.

        ``xact`` is only used in the log output.
        """
        record_path = self._project.add_project(xpath)
        self._log.debug("Deleting NSR xact:%s, path:%s", xact, record_path)
        self.regh.delete_element(record_path)
        self._log.debug("Deleted NSR xact:%s, path:%s", xact, record_path)
93
94
95
class VnfrPublisherDtsHandler(object):
    """Publisher-side DTS handler for VNF records.

    Registers 'D,/rw-project:project/vnfr:vnfr-catalog/vnfr:vnfr' with DTS;
    the project prefix is added to ``XPATH`` by ``project.add_project``.
    """
    XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"

    def __init__(self, dts, log, loop, project):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._project = project

        # Populated by register(); None until then.
        self._regh = None

    @property
    def regh(self):
        """ Return registration handle (None before register() has run) """
        return self._regh

    @asyncio.coroutine
    def register(self):
        """ Register as the publisher of the VNFR catalog path with dts

        Does nothing when a registration handle is already held. The
        registration uses a plain RegistrationHandler with no callbacks.
        """
        if self._regh:
            return

        xpath = self._project.add_project(VnfrPublisherDtsHandler.XPATH)
        self._log.debug("Registering for VNFR using xpath: %s", xpath)

        hdl = rift.tasklets.DTS.RegistrationHandler()
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=xpath,
                                        handler=hdl,
                                        flags=(rwdts.Flag.PUBLISHER |
                                               rwdts.Flag.NO_PREP_READ |
                                               rwdts.Flag.CACHE),)

    @asyncio.coroutine
    def create(self, xact, path, msg):
        """
        Create a VNFR record in DTS with path and message

        ``path`` is used as given (no project prefix is added here);
        ``xact`` is only used in the log output.
        """
        self._log.debug("Creating VNFR xact = %s, %s:%s",
                        xact, path, msg)
        self.regh.create_element(path, msg)
        self._log.debug("Created VNFR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def update(self, xact, path, msg):
        """
        Update a VNFR record in DTS with path and message

        ``path`` is used as given (no project prefix is added here);
        ``xact`` is only used in the log output.
        """
        self._log.debug("Updating VNFR xact = %s, %s:%s",
                        xact, path, msg)
        self.regh.update_element(path, msg)
        self._log.debug("Updated VNFR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def delete(self, xact, path):
        """
        Delete the VNFR record at path from DTS

        ``path`` is used as given (no project prefix is added here);
        ``xact`` is only used in the log output.
        """
        self._log.debug("Deleting VNFR xact = %s, %s", xact, path)
        self.regh.delete_element(path)
        self._log.debug("Deleted VNFR xact = %s, %s", xact, path)
172
173
class VlrPublisherDtsHandler(object):
    """Publisher-side DTS handler for virtual link records.

    Registers 'D,/rw-project:project/vlr:vlr-catalog/vlr:vlr' with DTS;
    the project prefix is added to ``XPATH`` by ``project.add_project``.
    """
    XPATH = "D,/vlr:vlr-catalog/vlr:vlr"

    def __init__(self, dts, log, loop, project):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._project = project

        # Populated by register(); None until then.
        self._regh = None

    @property
    def regh(self):
        """ Return registration handle (None before register() has run) """
        return self._regh

    @asyncio.coroutine
    def register(self):
        """ Register as the publisher of the VLR catalog path with dts

        Does nothing when a registration handle is already held. The
        registration uses a plain RegistrationHandler with no callbacks.
        """
        if self._regh:
            return

        xpath = self._project.add_project(VlrPublisherDtsHandler.XPATH)
        self._log.debug("Registering for VLR using xpath: %s", xpath)

        hdl = rift.tasklets.DTS.RegistrationHandler()
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=xpath,
                                        handler=hdl,
                                        flags=(rwdts.Flag.PUBLISHER |
                                               rwdts.Flag.NO_PREP_READ |
                                               rwdts.Flag.CACHE),)

    @asyncio.coroutine
    def create(self, xact, path, msg):
        """
        Create a VLR record in DTS with path and message

        ``path`` is used as given (no project prefix is added here);
        ``xact`` is only used in the log output.
        """
        self._log.debug("Creating VLR xact = %s, %s:%s",
                        xact, path, msg)
        self.regh.create_element(path, msg)
        self._log.debug("Created VLR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def update(self, xact, path, msg):
        """
        Update a VLR record in DTS with path and message

        ``path`` is used as given (no project prefix is added here);
        ``xact`` is only used in the log output.
        """
        self._log.debug("Updating VLR xact = %s, %s:%s",
                        xact, path, msg)
        self.regh.update_element(path, msg)
        self._log.debug("Updated VLR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def delete(self, xact, path):
        """
        Delete the VLR record at path from DTS

        ``path`` is used as given (no project prefix is added here);
        ``xact`` is only used in the log output.
        """
        self._log.debug("Deleting VLR xact = %s, %s", xact, path)
        self.regh.delete_element(path)
        self._log.debug("Deleted VLR xact = %s, %s", xact, path)
251
252
class VnfdPublisher(object):
    """Pushes VNFD updates to the local REST config API.

    The HTTP request is blocking, so it is run on this object's own
    single-worker thread pool rather than on the event loop.
    """
    # NOTE(review): hard-coded default credentials; confirm whether these
    # should come from configuration instead.
    AUTH = ('admin', 'admin')
    HEADERS = {"content-type": "application/vnd.yang.data+json"}

    def __init__(self, use_ssl, ssl_cert, ssl_key, loop, project):
        self.use_ssl = use_ssl
        self.ssl_cert = ssl_cert
        self.ssl_key = ssl_key
        self._project = project
        # One worker: PUTs issued through update() run one at a time.
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        self.loop = loop

    @asyncio.coroutine
    def update(self, vnfd):
        """Update a VNFD record through the REST API.

        The REST API is used because the config data is handled by uAgent
        and stored in CDB.

        Arguments:
            vnfd - the VNFD message; must provide ``to_json(model)`` and ``id``

        Returns:
            The HTTP status code of the PUT response. No exception is raised
            for HTTP error statuses; callers that care must check the code.
        """
        def _put_vnfd(vnfd):
            """ Blocking PUT of the serialized VNFD; runs on the executor """
            scheme = "https" if self.use_ssl else "http"

            url = "{}://127.0.0.1:8008/api/config/project/{}/vnfd-catalog/vnfd/{}"

            model = RwYang.Model.create_libncx()
            model.load_module("rw-vnfd")
            model.load_module("vnfd")

            data = vnfd.to_json(model)

            # When the serialized form is wrapped in the catalog container,
            # send only the container's content.
            key = "project-vnfd:vnfd-catalog"
            newdict = json.loads(data)
            if key in newdict:
                data = json.dumps(newdict[key])

            options = {"data": data,
                       "headers": VnfdPublisher.HEADERS,
                       "auth": VnfdPublisher.AUTH}

            if self.use_ssl:
                # NOTE(review): verify=False turns off server certificate
                # verification. The target is 127.0.0.1, but confirm this is
                # intentional.
                options["verify"] = False
                options["cert"] = (self.ssl_cert, self.ssl_key)

            response = requests.put(
                url.format(scheme, self._project.name, vnfd.id),
                **options
            )
            return response.status_code

        # Use the dedicated executor created in __init__ (previously the
        # loop's default executor was used and self.executor sat idle).
        status = yield from self.loop.run_in_executor(
            self.executor,
            _put_vnfd,
            vnfd
        )
        return status
307