2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
18 import concurrent
.futures
21 from gi
.repository
import (
32 class NsrOpDataDtsHandler(object):
33 """ The network service op data DTS handler """
34 XPATH
= "D,/nsr:ns-instance-opdata/nsr:nsr"
36 def __init__(self
, dts
, log
, loop
):
44 """ Return the registration handle"""
49 """ Register for Nsr op data publisher registration"""
50 self
._log
.debug("Registering Nsr op data path %s as publisher",
51 NsrOpDataDtsHandler
.XPATH
)
53 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
54 with self
._dts
.group_create() as group
:
55 self
._regh
= group
.register(xpath
=NsrOpDataDtsHandler
.XPATH
,
57 flags
=rwdts
.Flag
.PUBLISHER | rwdts
.Flag
.NO_PREP_READ
)
60 def create(self
, xact
, path
, msg
):
62 Create an NS record in DTS with the path and message
64 self
._log
.debug("Creating NSR xact = %s, %s:%s", xact
, path
, msg
)
65 self
.regh
.create_element(path
, msg
, xact
=xact
)
66 self
._log
.debug("Created NSR xact = %s, %s:%s", xact
, path
, msg
)
69 def update(self
, xact
, path
, msg
, flags
=rwdts
.XactFlag
.REPLACE
):
71 Update an NS record in DTS with the path and message
73 self
._log
.debug("Updating NSR xact = %s, %s:%s regh = %s", xact
, path
, msg
, self
.regh
)
74 self
.regh
.update_element(path
, msg
, flags
, xact
=xact
)
75 self
._log
.debug("Updated NSR xact = %s, %s:%s", xact
, path
, msg
)
78 def delete(self
, xact
, path
):
80 Update an NS record in DTS with the path and message
82 self
._log
.debug("Deleting NSR xact:%s, path:%s", xact
, path
)
83 self
.regh
.delete_element(path
, xact
=xact
)
84 self
._log
.debug("Deleted NSR xact:%s, path:%s", xact
, path
)
87 class VnfrPublisherDtsHandler(object):
88 """ Registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' DTS"""
89 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
91 def __init__(self
, dts
, log
, loop
):
100 """ Return registration handle"""
105 """ Register for Vvnfr create/update/delete/read requests from dts """
108 def on_prepare(xact_info
, action
, ks_path
, msg
):
109 """ prepare callback from dts """
111 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
112 xact_info
, action
, msg
114 raise NotImplementedError(
115 "%s action on VirtualNetworkFunctionRecord not supported",
118 self
._log
.debug("Registering for VNFR using xpath: %s",
119 VnfrPublisherDtsHandler
.XPATH
,)
121 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
122 with self
._dts
.group_create() as group
:
123 self
._regh
= group
.register(xpath
=VnfrPublisherDtsHandler
.XPATH
,
125 flags
=(rwdts
.Flag
.PUBLISHER |
126 rwdts
.Flag
.NO_PREP_READ |
130 def create(self
, xact
, path
, msg
):
132 Create a VNFR record in DTS with path and message
134 self
._log
.debug("Creating VNFR xact = %s, %s:%s",
136 self
.regh
.create_element(path
, msg
, xact
=xact
)
137 self
._log
.debug("Created VNFR xact = %s, %s:%s",
141 def update(self
, xact
, path
, msg
):
143 Update a VNFR record in DTS with path and message
145 self
._log
.debug("Updating VNFR xact = %s, %s:%s",
147 self
.regh
.update_element(path
, msg
, xact
=xact
)
148 self
._log
.debug("Updated VNFR xact = %s, %s:%s",
152 def delete(self
, xact
, path
):
154 Delete a VNFR record in DTS with path and message
156 self
._log
.debug("Deleting VNFR xact = %s, %s", xact
, path
)
157 self
.regh
.delete_element(path
, xact
=xact
)
158 self
._log
.debug("Deleted VNFR xact = %s, %s", xact
, path
)
161 class VlrPublisherDtsHandler(object):
162 """ registers 'D,/vlr:vlr-catalog/vlr:vlr """
163 XPATH
= "D,/vlr:vlr-catalog/vlr:vlr"
165 def __init__(self
, dts
, log
, loop
):
174 """ Return registration handle"""
179 """ Register for vlr create/update/delete/read requests from dts """
182 def on_prepare(xact_info
, action
, ks_path
, msg
):
183 """ prepare callback from dts """
185 "Got vlr on_prepare callback (xact_info: %s, action: %s): %s",
186 xact_info
, action
, msg
188 raise NotImplementedError(
189 "%s action on VirtualLinkRecord not supported",
192 self
._log
.debug("Registering for VLR using xpath: %s",
193 VlrPublisherDtsHandler
.XPATH
,)
195 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
196 with self
._dts
.group_create() as group
:
197 self
._regh
= group
.register(xpath
=VlrPublisherDtsHandler
.XPATH
,
199 flags
=(rwdts
.Flag
.PUBLISHER |
200 rwdts
.Flag
.NO_PREP_READ |
204 def create(self
, xact
, path
, msg
):
206 Create a VLR record in DTS with path and message
208 self
._log
.debug("Creating VLR xact = %s, %s:%s",
210 self
.regh
.create_element(path
, msg
, xact
=xact
)
211 self
._log
.debug("Created VLR xact = %s, %s:%s",
215 def update(self
, xact
, path
, msg
):
217 Update a VLR record in DTS with path and message
219 self
._log
.debug("Updating VLR xact = %s, %s:%s",
221 self
.regh
.update_element(path
, msg
, xact
=xact
)
222 self
._log
.debug("Updated VLR xact = %s, %s:%s",
226 def delete(self
, xact
, path
):
228 Delete a VLR record in DTS with path and message
230 self
._log
.debug("Deleting VLR xact = %s, %s", xact
, path
)
231 self
.regh
.delete_element(path
, xact
=xact
)
232 self
._log
.debug("Deleted VLR xact = %s, %s", xact
, path
)
235 class VnfdPublisher(object):
236 AUTH
= ('admin', 'admin')
237 HEADERS
= {"content-type": "application/vnd.yang.data+json"}
240 def __init__(self
, use_ssl
, ssl_cert
, ssl_key
, loop
):
241 self
.use_ssl
= use_ssl
242 self
.ssl_cert
= ssl_cert
243 self
.ssl_key
= ssl_key
244 self
.executor
= concurrent
.futures
.ThreadPoolExecutor(max_workers
=1)
248 def update(self
, vnfd
):
251 Update VNFD record using rest API, as the config data is handled
252 by uAgent and stored in CDB
255 scheme
= "https" if self
.use_ssl
else "http"
257 url
= "{}://127.0.0.1:8008/api/config/vnfd-catalog/vnfd/{}"
259 model
= RwYang
.Model
.create_libncx()
260 model
.load_module("rw-vnfd")
261 model
.load_module("vnfd")
263 data
= vnfd
.to_json(model
)
265 key
= "vnfd:vnfd-catalog"
266 newdict
= json
.loads(data
)
268 data
= json
.dumps(newdict
[key
])
270 options
= {"data": data
,
271 "headers": VnfdPublisher
.HEADERS
,
272 "auth": VnfdPublisher
.AUTH
}
275 options
["verify"] = False
276 options
["cert"] = (self
.ssl_cert
, self
.ssl_key
)
278 response
= requests
.put(
279 url
.format(scheme
, vnfd
.id),
283 status
= yield from self
.loop
.run_in_executor(