2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
18 import concurrent
.futures
21 from gi
.repository
import (
32 class NsrOpDataDtsHandler(object):
33 """ The network service op data DTS handler """
34 XPATH
= "D,/nsr:ns-instance-opdata/nsr:nsr"
36 def __init__(self
, dts
, log
, loop
):
44 """ Return the registration handle"""
49 """ Register for Nsr op data publisher registration"""
50 self
._log
.debug("Registering Nsr op data path %s as publisher",
51 NsrOpDataDtsHandler
.XPATH
)
53 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
54 with self
._dts
.group_create() as group
:
55 self
._regh
= group
.register(xpath
=NsrOpDataDtsHandler
.XPATH
,
57 flags
=rwdts
.Flag
.PUBLISHER | rwdts
.Flag
.NO_PREP_READ
)
60 def create(self
, xact
, path
, msg
):
62 Create an NS record in DTS with the path and message
64 self
._log
.debug("Creating NSR xact = %s, %s:%s", xact
, path
, msg
)
65 self
.regh
.create_element(path
, msg
)
66 self
._log
.debug("Created NSR xact = %s, %s:%s", xact
, path
, msg
)
69 def update(self
, xact
, path
, msg
, flags
=rwdts
.XactFlag
.REPLACE
):
71 Update an NS record in DTS with the path and message
73 self
._log
.debug("Updating NSR xact = %s, %s:%s regh = %s", xact
, path
, msg
, self
.regh
)
74 self
.regh
.update_element(path
, msg
, flags
)
75 self
._log
.debug("Updated NSR xact = %s, %s:%s", xact
, path
, msg
)
78 def delete(self
, xact
, path
):
80 Update an NS record in DTS with the path and message
82 self
._log
.debug("Deleting NSR xact:%s, path:%s", xact
, path
)
83 self
.regh
.delete_element(path
)
84 self
._log
.debug("Deleted NSR xact:%s, path:%s", xact
, path
)
88 class VnfrPublisherDtsHandler(object):
89 """ Registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' DTS"""
90 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
92 def __init__(self
, dts
, log
, loop
):
101 """ Return registration handle"""
106 """ Register for Vvnfr create/update/delete/read requests from dts """
109 def on_prepare(xact_info
, action
, ks_path
, msg
):
110 """ prepare callback from dts """
112 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
113 xact_info
, action
, msg
115 raise NotImplementedError(
116 "%s action on VirtualNetworkFunctionRecord not supported",
119 self
._log
.debug("Registering for VNFR using xpath: %s",
120 VnfrPublisherDtsHandler
.XPATH
,)
122 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
123 with self
._dts
.group_create() as group
:
124 self
._regh
= group
.register(xpath
=VnfrPublisherDtsHandler
.XPATH
,
126 flags
=(rwdts
.Flag
.PUBLISHER |
127 rwdts
.Flag
.NO_PREP_READ |
131 def create(self
, xact
, path
, msg
):
133 Create a VNFR record in DTS with path and message
135 self
._log
.debug("Creating VNFR xact = %s, %s:%s",
137 self
.regh
.create_element(path
, msg
)
138 self
._log
.debug("Created VNFR xact = %s, %s:%s",
142 def update(self
, xact
, path
, msg
):
144 Update a VNFR record in DTS with path and message
146 self
._log
.debug("Updating VNFR xact = %s, %s:%s",
148 self
.regh
.update_element(path
, msg
)
149 self
._log
.debug("Updated VNFR xact = %s, %s:%s",
153 def delete(self
, xact
, path
):
155 Delete a VNFR record in DTS with path and message
157 self
._log
.debug("Deleting VNFR xact = %s, %s", xact
, path
)
158 self
.regh
.delete_element(path
)
159 self
._log
.debug("Deleted VNFR xact = %s, %s", xact
, path
)
162 class VlrPublisherDtsHandler(object):
163 """ registers 'D,/vlr:vlr-catalog/vlr:vlr """
164 XPATH
= "D,/vlr:vlr-catalog/vlr:vlr"
166 def __init__(self
, dts
, log
, loop
):
175 """ Return registration handle"""
180 """ Register for vlr create/update/delete/read requests from dts """
183 def on_prepare(xact_info
, action
, ks_path
, msg
):
184 """ prepare callback from dts """
186 "Got vlr on_prepare callback (xact_info: %s, action: %s): %s",
187 xact_info
, action
, msg
189 raise NotImplementedError(
190 "%s action on VirtualLinkRecord not supported",
193 self
._log
.debug("Registering for VLR using xpath: %s",
194 VlrPublisherDtsHandler
.XPATH
,)
196 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
197 with self
._dts
.group_create() as group
:
198 self
._regh
= group
.register(xpath
=VlrPublisherDtsHandler
.XPATH
,
200 flags
=(rwdts
.Flag
.PUBLISHER |
201 rwdts
.Flag
.NO_PREP_READ |
205 def create(self
, xact
, path
, msg
):
207 Create a VLR record in DTS with path and message
209 self
._log
.debug("Creating VLR xact = %s, %s:%s",
211 self
.regh
.create_element(path
, msg
)
212 self
._log
.debug("Created VLR xact = %s, %s:%s",
216 def update(self
, xact
, path
, msg
):
218 Update a VLR record in DTS with path and message
220 self
._log
.debug("Updating VLR xact = %s, %s:%s",
222 self
.regh
.update_element(path
, msg
)
223 self
._log
.debug("Updated VLR xact = %s, %s:%s",
227 def delete(self
, xact
, path
):
229 Delete a VLR record in DTS with path and message
231 self
._log
.debug("Deleting VLR xact = %s, %s", xact
, path
)
232 self
.regh
.delete_element(path
)
233 self
._log
.debug("Deleted VLR xact = %s, %s", xact
, path
)
236 class VnfdPublisher(object):
237 AUTH
= ('admin', 'admin')
238 HEADERS
= {"content-type": "application/vnd.yang.data+json"}
241 def __init__(self
, use_ssl
, ssl_cert
, ssl_key
, loop
):
242 self
.use_ssl
= use_ssl
243 self
.ssl_cert
= ssl_cert
244 self
.ssl_key
= ssl_key
245 self
.executor
= concurrent
.futures
.ThreadPoolExecutor(max_workers
=1)
249 def update(self
, vnfd
):
252 Update VNFD record using rest API, as the config data is handled
253 by uAgent and stored in CDB
256 scheme
= "https" if self
.use_ssl
else "http"
258 url
= "{}://127.0.0.1:8008/api/config/vnfd-catalog/vnfd/{}"
260 model
= RwYang
.Model
.create_libncx()
261 model
.load_module("rw-vnfd")
262 model
.load_module("vnfd")
264 data
= vnfd
.to_json(model
)
266 key
= "vnfd:vnfd-catalog"
267 newdict
= json
.loads(data
)
269 data
= json
.dumps(newdict
[key
])
271 options
= {"data": data
,
272 "headers": VnfdPublisher
.HEADERS
,
273 "auth": VnfdPublisher
.AUTH
}
276 options
["verify"] = False
277 options
["cert"] = (self
.ssl_cert
, self
.ssl_key
)
279 response
= requests
.put(
280 url
.format(scheme
, vnfd
.id),
284 status
= yield from self
.loop
.run_in_executor(