e3ae5cf1765ced65ce7d8addcd61f04fc47d2b0f
2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
18 import concurrent
.futures
21 from gi
.repository
import (
32 class NsrOpDataDtsHandler(object):
33 """ The network service op data DTS handler """
34 XPATH
= "D,/nsr:ns-instance-opdata/nsr:nsr"
36 def __init__(self
, dts
, log
, loop
, project
):
40 self
._project
= project
45 """ Return the registration handle"""
50 """ Register for Nsr op data publisher registration"""
54 xpath
= self
._project
.add_project(NsrOpDataDtsHandler
.XPATH
)
55 self
._log
.debug("Registering Nsr op data path {} as publisher".
58 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
59 with self
._dts
.group_create() as group
:
60 self
._regh
= group
.register(xpath
=xpath
,
62 flags
=rwdts
.Flag
.PUBLISHER | rwdts
.Flag
.NO_PREP_READ
)
65 def create(self
, xact
, xpath
, msg
):
67 Create an NS record in DTS with the path and message
69 path
= self
._project
.add_project(xpath
)
70 self
._log
.debug("Creating NSR xact = %s, %s:%s", xact
, path
, msg
)
71 self
.regh
.create_element(path
, msg
)
72 self
._log
.debug("Created NSR xact = %s, %s:%s", xact
, path
, msg
)
75 def update(self
, xact
, xpath
, msg
, flags
=rwdts
.XactFlag
.REPLACE
):
77 Update an NS record in DTS with the path and message
79 path
= self
._project
.add_project(xpath
)
80 self
._log
.debug("Updating NSR xact = %s, %s:%s regh = %s", xact
, path
, msg
, self
.regh
)
81 self
.regh
.update_element(path
, msg
, flags
)
82 self
._log
.debug("Updated NSR xact = %s, %s:%s", xact
, path
, msg
)
85 def delete(self
, xact
, xpath
):
87 Update an NS record in DTS with the path and message
89 path
= self
._project
.add_project(xpath
)
90 self
._log
.debug("Deleting NSR xact:%s, path:%s", xact
, path
)
91 self
.regh
.delete_element(path
)
92 self
._log
.debug("Deleted NSR xact:%s, path:%s", xact
, path
)
96 class VnfrPublisherDtsHandler(object):
97 """ Registers 'D,/rw-project:project/vnfr:vnfr-catalog/vnfr:vnfr' DTS"""
98 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
100 def __init__(self
, dts
, log
, loop
, project
):
104 self
._project
= project
110 """ Return registration handle"""
115 """ Register for Vvnfr create/update/delete/read requests from dts """
120 def on_prepare(xact_info
, action
, ks_path
, msg
):
121 """ prepare callback from dts """
123 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
124 xact_info
, action
, msg
126 raise NotImplementedError(
127 "%s action on VirtualNetworkFunctionRecord not supported",
130 xpath
= self
._project
.add_project(VnfrPublisherDtsHandler
.XPATH
)
131 self
._log
.debug("Registering for VNFR using xpath: {}".
134 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
135 with self
._dts
.group_create() as group
:
136 self
._regh
= group
.register(xpath
=xpath
,
138 flags
=(rwdts
.Flag
.PUBLISHER |
139 rwdts
.Flag
.NO_PREP_READ |
143 def create(self
, xact
, path
, msg
):
145 Create a VNFR record in DTS with path and message
147 self
._log
.debug("Creating VNFR xact = %s, %s:%s",
149 self
.regh
.create_element(path
, msg
)
150 self
._log
.debug("Created VNFR xact = %s, %s:%s",
154 def update(self
, xact
, path
, msg
):
156 Update a VNFR record in DTS with path and message
158 self
._log
.debug("Updating VNFR xact = %s, %s:%s",
160 self
.regh
.update_element(path
, msg
)
161 self
._log
.debug("Updated VNFR xact = %s, %s:%s",
165 def delete(self
, xact
, path
):
167 Delete a VNFR record in DTS with path and message
169 self
._log
.debug("Deleting VNFR xact = %s, %s", xact
, path
)
170 self
.regh
.delete_element(path
)
171 self
._log
.debug("Deleted VNFR xact = %s, %s", xact
, path
)
174 class VlrPublisherDtsHandler(object):
175 """ registers 'D,/rw-project:project/vlr:vlr-catalog/vlr:vlr """
176 XPATH
= "D,/vlr:vlr-catalog/vlr:vlr"
178 def __init__(self
, dts
, log
, loop
, project
):
182 self
._project
= project
188 """ Return registration handle"""
193 """ Register for vlr create/update/delete/read requests from dts """
199 def on_prepare(xact_info
, action
, ks_path
, msg
):
200 """ prepare callback from dts """
202 "Got vlr on_prepare callback (xact_info: %s, action: %s): %s",
203 xact_info
, action
, msg
205 raise NotImplementedError(
206 "%s action on VirtualLinkRecord not supported",
209 xpath
= self
._project
.add_project(VlrPublisherDtsHandler
.XPATH
)
210 self
._log
.debug("Registering for VLR using xpath: {}".
213 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
214 with self
._dts
.group_create() as group
:
215 self
._regh
= group
.register(xpath
=xpath
,
217 flags
=(rwdts
.Flag
.PUBLISHER |
218 rwdts
.Flag
.NO_PREP_READ |
222 def create(self
, xact
, path
, msg
):
224 Create a VLR record in DTS with path and message
226 self
._log
.debug("Creating VLR xact = %s, %s:%s",
228 self
.regh
.create_element(path
, msg
)
229 self
._log
.debug("Created VLR xact = %s, %s:%s",
233 def update(self
, xact
, path
, msg
):
235 Update a VLR record in DTS with path and message
237 self
._log
.debug("Updating VLR xact = %s, %s:%s",
239 self
.regh
.update_element(path
, msg
)
240 self
._log
.debug("Updated VLR xact = %s, %s:%s",
244 def delete(self
, xact
, path
):
246 Delete a VLR record in DTS with path and message
248 self
._log
.debug("Deleting VLR xact = %s, %s", xact
, path
)
249 self
.regh
.delete_element(path
)
250 self
._log
.debug("Deleted VLR xact = %s, %s", xact
, path
)
253 class VnfdPublisher(object):
254 AUTH
= ('admin', 'admin')
255 HEADERS
= {"content-type": "application/vnd.yang.data+json"}
258 def __init__(self
, use_ssl
, ssl_cert
, ssl_key
, loop
, project
):
259 self
.use_ssl
= use_ssl
260 self
.ssl_cert
= ssl_cert
261 self
.ssl_key
= ssl_key
262 self
._project
= project
263 self
.executor
= concurrent
.futures
.ThreadPoolExecutor(max_workers
=1)
267 def update(self
, vnfd
):
270 Update VNFD record using rest API, as the config data is handled
271 by uAgent and stored in CDB
274 scheme
= "https" if self
.use_ssl
else "http"
276 url
= "{}://127.0.0.1:8008/api/config/project/{}/vnfd-catalog/vnfd/{}"
278 model
= RwYang
.Model
.create_libncx()
279 model
.load_module("rw-vnfd")
280 model
.load_module("vnfd")
282 data
= vnfd
.to_json(model
)
284 key
= "vnfd:vnfd-catalog"
285 newdict
= json
.loads(data
)
287 data
= json
.dumps(newdict
[key
])
289 options
= {"data": data
,
290 "headers": VnfdPublisher
.HEADERS
,
291 "auth": VnfdPublisher
.AUTH
}
294 options
["verify"] = False
295 options
["cert"] = (self
.ssl_cert
, self
.ssl_key
)
297 response
= requests
.put(
298 url
.format(scheme
, self
._project
.name
, vnfd
.id),
302 status
= yield from self
.loop
.run_in_executor(