2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
18 import concurrent
.futures
21 from gi
.repository
import (
24 RwProjectVnfdYang
as RwVnfdYang
,
32 class NsrOpDataDtsHandler(object):
33 """ The network service op data DTS handler """
34 XPATH
= "D,/nsr:ns-instance-opdata/nsr:nsr"
36 def __init__(self
, dts
, log
, loop
, project
):
40 self
._project
= project
45 """ Return the registration handle"""
50 """ Register for Nsr op data publisher registration"""
54 xpath
= self
._project
.add_project(NsrOpDataDtsHandler
.XPATH
)
55 self
._log
.debug("Registering Nsr op data path {} as publisher".
58 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
59 with self
._dts
.group_create() as group
:
60 self
._regh
= group
.register(xpath
=xpath
,
62 flags
=rwdts
.Flag
.PUBLISHER | rwdts
.Flag
.NO_PREP_READ
)
65 def create(self
, xact
, xpath
, msg
):
67 Create an NS record in DTS with the path and message
69 path
= self
._project
.add_project(xpath
)
70 self
._log
.debug("Creating NSR xact = %s, %s:%s", xact
, path
, msg
)
71 self
.regh
.create_element(path
, msg
, xact
=xact
)
72 self
._log
.debug("Created NSR xact = %s, %s:%s", xact
, path
, msg
)
75 def update(self
, xact
, xpath
, msg
, flags
=rwdts
.XactFlag
.REPLACE
):
77 Update an NS record in DTS with the path and message
79 path
= self
._project
.add_project(xpath
)
80 self
._log
.debug("Updating NSR xact = %s, %s:%s regh = %s", xact
, path
, msg
, self
.regh
)
81 self
.regh
.update_element(path
, msg
, flags
, xact
=xact
)
82 self
._log
.debug("Updated NSR xact = %s, %s:%s", xact
, path
, msg
)
85 def delete(self
, xact
, xpath
):
87 Update an NS record in DTS with the path and message
89 path
= self
._project
.add_project(xpath
)
90 self
._log
.debug("Deleting NSR xact:%s, path:%s", xact
, path
)
91 self
.regh
.delete_element(path
, xact
=xact
)
92 self
._log
.debug("Deleted NSR xact:%s, path:%s", xact
, path
)
96 self
._regh
.deregister()
99 class VnfrPublisherDtsHandler(object):
100 """ Registers 'D,/rw-project:project/vnfr:vnfr-catalog/vnfr:vnfr' DTS"""
101 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
103 def __init__(self
, dts
, log
, loop
, project
):
107 self
._project
= project
113 """ Return registration handle"""
118 """ Register for Vvnfr create/update/delete/read requests from dts """
123 def on_prepare(xact_info
, action
, ks_path
, msg
):
124 """ prepare callback from dts """
126 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
127 xact_info
, action
, msg
129 raise NotImplementedError(
130 "%s action on VirtualNetworkFunctionRecord not supported",
133 xpath
= self
._project
.add_project(VnfrPublisherDtsHandler
.XPATH
)
134 self
._log
.debug("Registering for VNFR using xpath: {}".
137 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
138 with self
._dts
.group_create() as group
:
139 self
._regh
= group
.register(xpath
=xpath
,
141 flags
=(rwdts
.Flag
.PUBLISHER |
143 rwdts
.Flag
.NO_PREP_READ |
146 def deregister(self
):
148 self
._regh
.deregister()
152 def create(self
, xact
, path
, msg
):
154 Create a VNFR record in DTS with path and message
156 self
._log
.debug("Creating VNFR xact = %s, %s:%s",
158 self
.regh
.create_element(path
, msg
, xact
=xact
)
159 self
._log
.debug("Created VNFR xact = %s, %s:%s",
163 def update(self
, xact
, path
, msg
):
165 Update a VNFR record in DTS with path and message
167 self
._log
.debug("Updating VNFR xact = %s, %s:%s",
169 self
.regh
.update_element(path
, msg
, xact
=xact
)
170 self
._log
.debug("Updated VNFR xact = %s, %s:%s",
174 def delete(self
, xact
, path
):
176 Delete a VNFR record in DTS with path and message
178 self
._log
.debug("Deleting VNFR xact = %s, %s", xact
, path
)
179 self
.regh
.delete_element(path
, xact
=xact
)
180 self
._log
.debug("Deleted VNFR xact = %s, %s", xact
, path
)
183 class VlrPublisherDtsHandler(object):
184 """ registers 'D,/rw-project:project/vlr:vlr-catalog/vlr:vlr """
185 XPATH
= "D,/vlr:vlr-catalog/vlr:vlr"
187 def __init__(self
, dts
, log
, loop
, project
):
191 self
._project
= project
197 """ Return registration handle"""
202 """ Register for vlr create/update/delete/read requests from dts """
208 def on_prepare(xact_info
, action
, ks_path
, msg
):
209 """ prepare callback from dts """
211 "Got vlr on_prepare callback (xact_info: %s, action: %s): %s",
212 xact_info
, action
, msg
214 raise NotImplementedError(
215 "%s action on VirtualLinkRecord not supported",
218 xpath
= self
._project
.add_project(VlrPublisherDtsHandler
.XPATH
)
219 self
._log
.debug("Registering for VLR using xpath: {}".
222 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
223 with self
._dts
.group_create() as group
:
224 self
._regh
= group
.register(xpath
=xpath
,
226 flags
=(rwdts
.Flag
.PUBLISHER |
227 rwdts
.Flag
.NO_PREP_READ |
230 def deregister(self
):
232 self
._regh
.deregister()
236 def create(self
, xact
, path
, msg
):
238 Create a VLR record in DTS with path and message
240 self
._log
.debug("Creating VLR xact = %s, %s:%s",
242 self
.regh
.create_element(path
, msg
, xact
=xact
)
243 self
._log
.debug("Created VLR xact = %s, %s:%s",
247 def update(self
, xact
, path
, msg
):
249 Update a VLR record in DTS with path and message
251 self
._log
.debug("Updating VLR xact = %s, %s:%s",
253 self
.regh
.update_element(path
, msg
, xact
=xact
)
254 self
._log
.debug("Updated VLR xact = %s, %s:%s",
258 def delete(self
, xact
, path
):
260 Delete a VLR record in DTS with path and message
262 self
._log
.debug("Deleting VLR xact = %s, %s", xact
, path
)
263 self
.regh
.delete_element(path
, xact
=xact
)
264 self
._log
.debug("Deleted VLR xact = %s, %s", xact
, path
)
267 class VnfdPublisher(object):
268 AUTH
= ('@rift', 'rift')
269 HEADERS
= {"content-type": "application/vnd.yang.data+json"}
272 def __init__(self
, use_ssl
, ssl_cert
, ssl_key
, loop
, project
):
273 self
.use_ssl
= use_ssl
274 self
.ssl_cert
= ssl_cert
275 self
.ssl_key
= ssl_key
276 self
._project
= project
277 self
.executor
= concurrent
.futures
.ThreadPoolExecutor(max_workers
=1)
281 def update(self
, vnfd
):
284 Update VNFD record using rest API, as the config data is handled
285 by uAgent and stored in CDB
288 scheme
= "https" if self
.use_ssl
else "http"
290 url
= "{}://127.0.0.1:8008/api/config/project/{}/vnfd-catalog/vnfd/{}"
292 model
= RwYang
.Model
.create_libyang()
293 model
.load_module("rw-project-vnfd")
294 model
.load_module("project-vnfd")
296 data
= vnfd
.to_json(model
)
298 key
= "project-vnfd:vnfd-catalog"
299 newdict
= json
.loads(data
)
301 data
= json
.dumps(newdict
[key
])
303 options
= {"data": data
,
304 "headers": VnfdPublisher
.HEADERS
,
305 "auth": VnfdPublisher
.AUTH
}
308 options
["verify"] = False
309 options
["cert"] = (self
.ssl_cert
, self
.ssl_key
)
311 response
= requests
.put(
312 url
.format(scheme
, self
._project
.name
, vnfd
.id),
316 status
= yield from self
.loop
.run_in_executor(