Bug 196 SO+UI container memory leak
[osm/SO.git] / rwlaunchpad / plugins / rwnsm / rift / tasklets / rwnsmtasklet / publisher.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import concurrent.futures
19 import json
20
21 from gi.repository import (
22 RwDts as rwdts,
23 RwTypes,
24 RwVnfdYang,
25 RwYang
26 )
27 import rift.tasklets
28
29 import requests
30
31
32 class NsrOpDataDtsHandler(object):
33 """ The network service op data DTS handler """
34 XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr"
35
36 def __init__(self, dts, log, loop):
37 self._dts = dts
38 self._log = log
39 self._loop = loop
40 self._regh = None
41
42 @property
43 def regh(self):
44 """ Return the registration handle"""
45 return self._regh
46
47 @asyncio.coroutine
48 def register(self):
49 """ Register for Nsr op data publisher registration"""
50 self._log.debug("Registering Nsr op data path %s as publisher",
51 NsrOpDataDtsHandler.XPATH)
52
53 hdl = rift.tasklets.DTS.RegistrationHandler()
54 with self._dts.group_create() as group:
55 self._regh = group.register(xpath=NsrOpDataDtsHandler.XPATH,
56 handler=hdl,
57 flags=rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ)
58
59 @asyncio.coroutine
60 def create(self, xact, path, msg):
61 """
62 Create an NS record in DTS with the path and message
63 """
64 self._log.debug("Creating NSR xact = %s, %s:%s", xact, path, msg)
65 self.regh.create_element(path, msg, xact=xact)
66 self._log.debug("Created NSR xact = %s, %s:%s", xact, path, msg)
67
68 @asyncio.coroutine
69 def update(self, xact, path, msg, flags=rwdts.XactFlag.REPLACE):
70 """
71 Update an NS record in DTS with the path and message
72 """
73 self._log.debug("Updating NSR xact = %s, %s:%s regh = %s", xact, path, msg, self.regh)
74 self.regh.update_element(path, msg, flags, xact=xact)
75 self._log.debug("Updated NSR xact = %s, %s:%s", xact, path, msg)
76
77 @asyncio.coroutine
78 def delete(self, xact, path):
79 """
80 Update an NS record in DTS with the path and message
81 """
82 self._log.debug("Deleting NSR xact:%s, path:%s", xact, path)
83 self.regh.delete_element(path, xact=xact)
84 self._log.debug("Deleted NSR xact:%s, path:%s", xact, path)
85
86
87 class VnfrPublisherDtsHandler(object):
88 """ Registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' DTS"""
89 XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"
90
91 def __init__(self, dts, log, loop):
92 self._dts = dts
93 self._log = log
94 self._loop = loop
95
96 self._regh = None
97
98 @property
99 def regh(self):
100 """ Return registration handle"""
101 return self._regh
102
103 @asyncio.coroutine
104 def register(self):
105 """ Register for Vvnfr create/update/delete/read requests from dts """
106
107 @asyncio.coroutine
108 def on_prepare(xact_info, action, ks_path, msg):
109 """ prepare callback from dts """
110 self._log.debug(
111 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
112 xact_info, action, msg
113 )
114 raise NotImplementedError(
115 "%s action on VirtualNetworkFunctionRecord not supported",
116 action)
117
118 self._log.debug("Registering for VNFR using xpath: %s",
119 VnfrPublisherDtsHandler.XPATH,)
120
121 hdl = rift.tasklets.DTS.RegistrationHandler()
122 with self._dts.group_create() as group:
123 self._regh = group.register(xpath=VnfrPublisherDtsHandler.XPATH,
124 handler=hdl,
125 flags=(rwdts.Flag.PUBLISHER |
126 rwdts.Flag.NO_PREP_READ |
127 rwdts.Flag.CACHE),)
128
129 @asyncio.coroutine
130 def create(self, xact, path, msg):
131 """
132 Create a VNFR record in DTS with path and message
133 """
134 self._log.debug("Creating VNFR xact = %s, %s:%s",
135 xact, path, msg)
136 self.regh.create_element(path, msg, xact=xact)
137 self._log.debug("Created VNFR xact = %s, %s:%s",
138 xact, path, msg)
139
140 @asyncio.coroutine
141 def update(self, xact, path, msg):
142 """
143 Update a VNFR record in DTS with path and message
144 """
145 self._log.debug("Updating VNFR xact = %s, %s:%s",
146 xact, path, msg)
147 self.regh.update_element(path, msg, xact=xact)
148 self._log.debug("Updated VNFR xact = %s, %s:%s",
149 xact, path, msg)
150
151 @asyncio.coroutine
152 def delete(self, xact, path):
153 """
154 Delete a VNFR record in DTS with path and message
155 """
156 self._log.debug("Deleting VNFR xact = %s, %s", xact, path)
157 self.regh.delete_element(path, xact=xact)
158 self._log.debug("Deleted VNFR xact = %s, %s", xact, path)
159
160
161 class VlrPublisherDtsHandler(object):
162 """ registers 'D,/vlr:vlr-catalog/vlr:vlr """
163 XPATH = "D,/vlr:vlr-catalog/vlr:vlr"
164
165 def __init__(self, dts, log, loop):
166 self._dts = dts
167 self._log = log
168 self._loop = loop
169
170 self._regh = None
171
172 @property
173 def regh(self):
174 """ Return registration handle"""
175 return self._regh
176
177 @asyncio.coroutine
178 def register(self):
179 """ Register for vlr create/update/delete/read requests from dts """
180
181 @asyncio.coroutine
182 def on_prepare(xact_info, action, ks_path, msg):
183 """ prepare callback from dts """
184 self._log.debug(
185 "Got vlr on_prepare callback (xact_info: %s, action: %s): %s",
186 xact_info, action, msg
187 )
188 raise NotImplementedError(
189 "%s action on VirtualLinkRecord not supported",
190 action)
191
192 self._log.debug("Registering for VLR using xpath: %s",
193 VlrPublisherDtsHandler.XPATH,)
194
195 hdl = rift.tasklets.DTS.RegistrationHandler()
196 with self._dts.group_create() as group:
197 self._regh = group.register(xpath=VlrPublisherDtsHandler.XPATH,
198 handler=hdl,
199 flags=(rwdts.Flag.PUBLISHER |
200 rwdts.Flag.NO_PREP_READ |
201 rwdts.Flag.CACHE),)
202
203 @asyncio.coroutine
204 def create(self, xact, path, msg):
205 """
206 Create a VLR record in DTS with path and message
207 """
208 self._log.debug("Creating VLR xact = %s, %s:%s",
209 xact, path, msg)
210 self.regh.create_element(path, msg, xact=xact)
211 self._log.debug("Created VLR xact = %s, %s:%s",
212 xact, path, msg)
213
214 @asyncio.coroutine
215 def update(self, xact, path, msg):
216 """
217 Update a VLR record in DTS with path and message
218 """
219 self._log.debug("Updating VLR xact = %s, %s:%s",
220 xact, path, msg)
221 self.regh.update_element(path, msg, xact=xact)
222 self._log.debug("Updated VLR xact = %s, %s:%s",
223 xact, path, msg)
224
225 @asyncio.coroutine
226 def delete(self, xact, path):
227 """
228 Delete a VLR record in DTS with path and message
229 """
230 self._log.debug("Deleting VLR xact = %s, %s", xact, path)
231 self.regh.delete_element(path, xact=xact)
232 self._log.debug("Deleted VLR xact = %s, %s", xact, path)
233
234
235 class VnfdPublisher(object):
236 AUTH = ('admin', 'admin')
237 HEADERS = {"content-type": "application/vnd.yang.data+json"}
238
239
240 def __init__(self, use_ssl, ssl_cert, ssl_key, loop):
241 self.use_ssl = use_ssl
242 self.ssl_cert = ssl_cert
243 self.ssl_key = ssl_key
244 self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
245 self.loop = loop
246
247 @asyncio.coroutine
248 def update(self, vnfd):
249 def update(vnfd):
250 """
251 Update VNFD record using rest API, as the config data is handled
252 by uAgent and stored in CDB
253 """
254
255 scheme = "https" if self.use_ssl else "http"
256
257 url = "{}://127.0.0.1:8008/api/config/vnfd-catalog/vnfd/{}"
258
259 model = RwYang.Model.create_libncx()
260 model.load_module("rw-vnfd")
261 model.load_module("vnfd")
262
263 data = vnfd.to_json(model)
264
265 key = "vnfd:vnfd-catalog"
266 newdict = json.loads(data)
267 if key in newdict:
268 data = json.dumps(newdict[key])
269
270 options = {"data": data,
271 "headers": VnfdPublisher.HEADERS,
272 "auth": VnfdPublisher.AUTH}
273
274 if self.use_ssl:
275 options["verify"] = False
276 options["cert"] = (self.ssl_cert, self.ssl_key)
277
278 response = requests.put(
279 url.format(scheme, vnfd.id),
280 **options
281 )
282
283 status = yield from self.loop.run_in_executor(
284 None,
285 update,
286 vnfd
287 )