6c4b1237c2d43b2a8bf35aef077de323446a5ef2
[osm/SO.git] / rwlaunchpad / plugins / rwnsm / rift / tasklets / rwnsmtasklet / publisher.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import concurrent.futures
19 import json
20
21 from gi.repository import (
22 RwDts as rwdts,
23 RwTypes,
24 RwVnfdYang,
25 RwYang
26 )
27 import rift.tasklets
28
29 import requests
30
31
class NsrOpDataDtsHandler(object):
    """Publisher-side DTS handler for network service record op data."""
    XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr"

    def __init__(self, dts, log, loop):
        # The registration handle stays None until register() has run.
        self._regh = None
        self._loop = loop
        self._log = log
        self._dts = dts

    @property
    def regh(self):
        """The DTS registration handle (None before register())."""
        return self._regh

    @asyncio.coroutine
    def register(self):
        """Register XPATH with DTS using the PUBLISHER and NO_PREP_READ flags.

        The handle returned by the registration group is stored and exposed
        through the ``regh`` property.
        """
        nsr_xpath = NsrOpDataDtsHandler.XPATH
        self._log.debug("Registering Nsr op data path %s as publisher",
                        nsr_xpath)

        handler = rift.tasklets.DTS.RegistrationHandler()
        with self._dts.group_create() as reg_group:
            self._regh = reg_group.register(
                xpath=nsr_xpath,
                handler=handler,
                flags=rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ,
            )

    @asyncio.coroutine
    def create(self, xact, path, msg):
        """Create the NS record ``msg`` at ``path`` via the registration handle.

        ``xact`` is only used in the debug log lines; it is not handed to the
        registration handle.
        """
        log_args = (xact, path, msg)
        self._log.debug("Creating NSR xact = %s, %s:%s", *log_args)
        handle = self.regh
        handle.create_element(path, msg)
        self._log.debug("Created NSR xact = %s, %s:%s", *log_args)

    @asyncio.coroutine
    def update(self, xact, path, msg, flags=rwdts.XactFlag.REPLACE):
        """Update the NS record at ``path`` with ``msg``.

        ``flags`` is forwarded to ``update_element`` and defaults to
        ``rwdts.XactFlag.REPLACE``. ``xact`` is only used for logging.
        """
        handle = self.regh
        self._log.debug("Updating NSR xact = %s, %s:%s regh = %s",
                        xact, path, msg, handle)
        handle.update_element(path, msg, flags)
        self._log.debug("Updated NSR xact = %s, %s:%s", xact, path, msg)

    @asyncio.coroutine
    def delete(self, xact, path):
        """Delete the NS record stored at ``path``.

        ``xact`` is only used for logging.
        """
        log_args = (xact, path)
        self._log.debug("Deleting NSR xact:%s, path:%s", *log_args)
        handle = self.regh
        handle.delete_element(path)
        self._log.debug("Deleted NSR xact:%s, path:%s", *log_args)
85
86
87
class VnfrPublisherDtsHandler(object):
    """ Registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' DTS"""
    XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"

    def __init__(self, dts, log, loop):
        self._dts = dts
        self._log = log
        self._loop = loop

        # Set by register(); None until then.
        self._regh = None

    @property
    def regh(self):
        """ Return registration handle"""
        return self._regh

    @asyncio.coroutine
    def register(self):
        """ Register as publisher of the VNFR catalog xpath with DTS

        The registration uses the PUBLISHER, NO_PREP_READ and CACHE flags and
        the resulting handle is stored for create/update/delete.
        """

        # NOTE(review): on_prepare is defined here but is not passed to the
        # RegistrationHandler created below - confirm whether it was meant
        # to be attached to the handler.
        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts """
            self._log.debug(
                "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
                xact_info, action, msg
                )
            # Exceptions do not interpolate logging-style arguments, so the
            # message has to be formatted explicitly.
            raise NotImplementedError(
                "%s action on VirtualNetworkFunctionRecord not supported"
                % (action,))

        self._log.debug("Registering for VNFR using xpath: %s",
                        VnfrPublisherDtsHandler.XPATH,)

        hdl = rift.tasklets.DTS.RegistrationHandler()
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=VnfrPublisherDtsHandler.XPATH,
                                        handler=hdl,
                                        flags=(rwdts.Flag.PUBLISHER |
                                               rwdts.Flag.NO_PREP_READ |
                                               rwdts.Flag.CACHE),)

    @asyncio.coroutine
    def create(self, xact, path, msg):
        """
        Create a VNFR record in DTS with path and message

        xact is only used in the debug log lines.
        """
        self._log.debug("Creating VNFR xact = %s, %s:%s",
                        xact, path, msg)
        self.regh.create_element(path, msg)
        self._log.debug("Created VNFR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def update(self, xact, path, msg):
        """
        Update a VNFR record in DTS with path and message

        xact is only used in the debug log lines.
        """
        self._log.debug("Updating VNFR xact = %s, %s:%s",
                        xact, path, msg)
        self.regh.update_element(path, msg)
        self._log.debug("Updated VNFR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def delete(self, xact, path):
        """
        Delete the VNFR record at path from DTS

        xact is only used in the debug log lines.
        """
        self._log.debug("Deleting VNFR xact = %s, %s", xact, path)
        self.regh.delete_element(path)
        self._log.debug("Deleted VNFR xact = %s, %s", xact, path)
160
161
class VlrPublisherDtsHandler(object):
    """ Registers 'D,/vlr:vlr-catalog/vlr:vlr' DTS"""
    XPATH = "D,/vlr:vlr-catalog/vlr:vlr"

    def __init__(self, dts, log, loop):
        self._dts = dts
        self._log = log
        self._loop = loop

        # Set by register(); None until then.
        self._regh = None

    @property
    def regh(self):
        """ Return registration handle"""
        return self._regh

    @asyncio.coroutine
    def register(self):
        """ Register as publisher of the VLR catalog xpath with DTS

        The registration uses the PUBLISHER, NO_PREP_READ and CACHE flags and
        the resulting handle is stored for create/update/delete.
        """

        # NOTE(review): on_prepare is defined here but is not passed to the
        # RegistrationHandler created below - confirm whether it was meant
        # to be attached to the handler.
        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts """
            self._log.debug(
                "Got vlr on_prepare callback (xact_info: %s, action: %s): %s",
                xact_info, action, msg
                )
            # Exceptions do not interpolate logging-style arguments, so the
            # message has to be formatted explicitly.
            raise NotImplementedError(
                "%s action on VirtualLinkRecord not supported"
                % (action,))

        self._log.debug("Registering for VLR using xpath: %s",
                        VlrPublisherDtsHandler.XPATH,)

        hdl = rift.tasklets.DTS.RegistrationHandler()
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=VlrPublisherDtsHandler.XPATH,
                                        handler=hdl,
                                        flags=(rwdts.Flag.PUBLISHER |
                                               rwdts.Flag.NO_PREP_READ |
                                               rwdts.Flag.CACHE),)

    @asyncio.coroutine
    def create(self, xact, path, msg):
        """
        Create a VLR record in DTS with path and message

        xact is only used in the debug log lines.
        """
        self._log.debug("Creating VLR xact = %s, %s:%s",
                        xact, path, msg)
        self.regh.create_element(path, msg)
        self._log.debug("Created VLR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def update(self, xact, path, msg):
        """
        Update a VLR record in DTS with path and message

        xact is only used in the debug log lines.
        """
        self._log.debug("Updating VLR xact = %s, %s:%s",
                        xact, path, msg)
        self.regh.update_element(path, msg)
        self._log.debug("Updated VLR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def delete(self, xact, path):
        """
        Delete the VLR record at path from DTS

        xact is only used in the debug log lines.
        """
        self._log.debug("Deleting VLR xact = %s, %s", xact, path)
        self.regh.delete_element(path)
        self._log.debug("Deleted VLR xact = %s, %s", xact, path)
234
235
class VnfdPublisher(object):
    """ Pushes VNFD records to the local REST config API with an HTTP PUT """
    # NOTE(review): credentials are hard-coded; confirm whether they should
    # come from configuration instead.
    AUTH = ('admin', 'admin')
    HEADERS = {"content-type": "application/vnd.yang.data+json"}

    def __init__(self, use_ssl, ssl_cert, ssl_key, loop):
        self.use_ssl = use_ssl
        self.ssl_cert = ssl_cert
        self.ssl_key = ssl_key
        # NOTE(review): this executor is created but update() below submits
        # to the loop's default executor (None) - confirm which is intended.
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        self.loop = loop

    @asyncio.coroutine
    def update(self, vnfd):
        """ Update the VNFD record through the REST API

        The blocking HTTP request runs in an executor thread so the event
        loop is not blocked.

        Arguments:
            vnfd - VNFD object; it must provide to_json(model) and an id

        Returns:
            The HTTP status code of the PUT response
        """
        def update(vnfd):
            """
            Update VNFD record using rest API, as the config data is handled
            by uAgent and stored in CDB
            """

            scheme = "https" if self.use_ssl else "http"

            url = "{}://127.0.0.1:8008/api/config/vnfd-catalog/vnfd/{}"

            model = RwYang.Model.create_libncx()
            model.load_module("rw-vnfd")
            model.load_module("vnfd")

            data = vnfd.to_json(model)

            # When the serialized JSON is wrapped in the catalog container,
            # send only the container's contents.
            key = "vnfd:vnfd-catalog"
            newdict = json.loads(data)
            if key in newdict:
                data = json.dumps(newdict[key])

            options = {"data": data,
                       "headers": VnfdPublisher.HEADERS,
                       "auth": VnfdPublisher.AUTH}

            if self.use_ssl:
                # NOTE(review): server certificate verification is disabled
                # for this request; confirm this is acceptable.
                options["verify"] = False
                options["cert"] = (self.ssl_cert, self.ssl_key)

            # NOTE(review): no timeout is passed, so a stalled server keeps
            # the worker thread waiting - consider adding one.
            response = requests.put(
                url.format(scheme, vnfd.id),
                **options
            )
            # Hand the outcome back instead of discarding the response.
            return response.status_code

        status = yield from self.loop.run_in_executor(
            None,
            update,
            vnfd
            )
        return status
289