RIFT OSM R1 Initial Submission
[osm/SO.git] / rwlaunchpad / plugins / rwnsm / rift / tasklets / rwnsmtasklet / publisher.py
diff --git a/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/publisher.py b/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/publisher.py
new file mode 100644 (file)
index 0000000..6c4b123
--- /dev/null
@@ -0,0 +1,289 @@
+# 
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+import asyncio
+import concurrent.futures
+import json
+
+from gi.repository import (
+    RwDts as rwdts,
+    RwTypes,
+    RwVnfdYang,
+    RwYang
+    )
+import rift.tasklets
+
+import requests
+
+
+class NsrOpDataDtsHandler(object):
+    """ The network service op data DTS handler """
+    XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr"
+
+    def __init__(self, dts, log, loop):
+        self._dts = dts
+        self._log = log
+        self._loop = loop
+        self._regh = None
+
+    @property
+    def regh(self):
+        """ Return the registration handle"""
+        return self._regh
+
+    @asyncio.coroutine
+    def register(self):
+        """ Register for Nsr op data publisher registration"""
+        self._log.debug("Registering Nsr op data path %s as publisher",
+                        NsrOpDataDtsHandler.XPATH)
+
+        hdl = rift.tasklets.DTS.RegistrationHandler()
+        with self._dts.group_create() as group:
+            self._regh = group.register(xpath=NsrOpDataDtsHandler.XPATH,
+                                        handler=hdl,
+                                        flags=rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ)
+
+    @asyncio.coroutine
+    def create(self, xact, path, msg):
+        """
+        Create an NS record in DTS with the path and message
+        """
+        self._log.debug("Creating NSR xact = %s, %s:%s", xact, path, msg)
+        self.regh.create_element(path, msg)
+        self._log.debug("Created NSR xact = %s, %s:%s", xact, path, msg)
+
+    @asyncio.coroutine
+    def update(self, xact, path, msg, flags=rwdts.XactFlag.REPLACE):
+        """
+        Update an NS record in DTS with the path and message
+        """
+        self._log.debug("Updating NSR xact = %s, %s:%s regh = %s", xact, path, msg, self.regh)
+        self.regh.update_element(path, msg, flags)
+        self._log.debug("Updated NSR xact = %s, %s:%s", xact, path, msg)
+
+    @asyncio.coroutine
+    def delete(self, xact, path):
+        """
+        Update an NS record in DTS with the path and message
+        """
+        self._log.debug("Deleting NSR xact:%s, path:%s", xact, path)
+        self.regh.delete_element(path)
+        self._log.debug("Deleted NSR xact:%s, path:%s", xact, path)
+
+
+
+class VnfrPublisherDtsHandler(object):
+    """ Registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' DTS"""
+    XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"
+
+    def __init__(self, dts, log, loop):
+        self._dts = dts
+        self._log = log
+        self._loop = loop
+
+        self._regh = None
+
+    @property
+    def regh(self):
+        """ Return registration handle"""
+        return self._regh
+
+    @asyncio.coroutine
+    def register(self):
+        """ Register for Vvnfr create/update/delete/read requests from dts """
+
+        @asyncio.coroutine
+        def on_prepare(xact_info, action, ks_path, msg):
+            """ prepare callback from dts """
+            self._log.debug(
+                "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
+                xact_info, action, msg
+                )
+            raise NotImplementedError(
+                "%s action on VirtualNetworkFunctionRecord not supported",
+                action)
+
+        self._log.debug("Registering for VNFR using xpath: %s",
+                        VnfrPublisherDtsHandler.XPATH,)
+
+        hdl = rift.tasklets.DTS.RegistrationHandler()
+        with self._dts.group_create() as group:
+            self._regh = group.register(xpath=VnfrPublisherDtsHandler.XPATH,
+                                        handler=hdl,
+                                        flags=(rwdts.Flag.PUBLISHER |
+                                               rwdts.Flag.NO_PREP_READ |
+                                               rwdts.Flag.CACHE),)
+
+    @asyncio.coroutine
+    def create(self, xact, path, msg):
+        """
+        Create a VNFR record in DTS with path and message
+        """
+        self._log.debug("Creating VNFR xact = %s, %s:%s",
+                        xact, path, msg)
+        self.regh.create_element(path, msg)
+        self._log.debug("Created VNFR xact = %s, %s:%s",
+                        xact, path, msg)
+
+    @asyncio.coroutine
+    def update(self, xact, path, msg):
+        """
+        Update a VNFR record in DTS with path and message
+        """
+        self._log.debug("Updating VNFR xact = %s, %s:%s",
+                        xact, path, msg)
+        self.regh.update_element(path, msg)
+        self._log.debug("Updated VNFR xact = %s, %s:%s",
+                        xact, path, msg)
+
+    @asyncio.coroutine
+    def delete(self, xact, path):
+        """
+        Delete a VNFR record in DTS with path and message
+        """
+        self._log.debug("Deleting VNFR xact = %s, %s", xact, path)
+        self.regh.delete_element(path)
+        self._log.debug("Deleted VNFR xact = %s, %s", xact, path)
+
+
+class VlrPublisherDtsHandler(object):
+    """ registers 'D,/vlr:vlr-catalog/vlr:vlr """
+    XPATH = "D,/vlr:vlr-catalog/vlr:vlr"
+
+    def __init__(self, dts, log, loop):
+        self._dts = dts
+        self._log = log
+        self._loop = loop
+
+        self._regh = None
+
+    @property
+    def regh(self):
+        """ Return registration handle"""
+        return self._regh
+
+    @asyncio.coroutine
+    def register(self):
+        """ Register for vlr create/update/delete/read requests from dts """
+
+        @asyncio.coroutine
+        def on_prepare(xact_info, action, ks_path, msg):
+            """ prepare callback from dts """
+            self._log.debug(
+                "Got vlr on_prepare callback (xact_info: %s, action: %s): %s",
+                xact_info, action, msg
+                )
+            raise NotImplementedError(
+                "%s action on VirtualLinkRecord not supported",
+                action)
+
+        self._log.debug("Registering for VLR using xpath: %s",
+                        VlrPublisherDtsHandler.XPATH,)
+
+        hdl = rift.tasklets.DTS.RegistrationHandler()
+        with self._dts.group_create() as group:
+            self._regh = group.register(xpath=VlrPublisherDtsHandler.XPATH,
+                                        handler=hdl,
+                                        flags=(rwdts.Flag.PUBLISHER |
+                                               rwdts.Flag.NO_PREP_READ |
+                                               rwdts.Flag.CACHE),)
+
+    @asyncio.coroutine
+    def create(self, xact, path, msg):
+        """
+        Create a VLR record in DTS with path and message
+        """
+        self._log.debug("Creating VLR xact = %s, %s:%s",
+                        xact, path, msg)
+        self.regh.create_element(path, msg)
+        self._log.debug("Created VLR xact = %s, %s:%s",
+                        xact, path, msg)
+
+    @asyncio.coroutine
+    def update(self, xact, path, msg):
+        """
+        Update a VLR record in DTS with path and message
+        """
+        self._log.debug("Updating VLR xact = %s, %s:%s",
+                        xact, path, msg)
+        self.regh.update_element(path, msg)
+        self._log.debug("Updated VLR xact = %s, %s:%s",
+                        xact, path, msg)
+
+    @asyncio.coroutine
+    def delete(self, xact, path):
+        """
+        Delete a VLR record in DTS with path and message
+        """
+        self._log.debug("Deleting VLR xact = %s, %s", xact, path)
+        self.regh.delete_element(path)
+        self._log.debug("Deleted VLR xact = %s, %s", xact, path)
+
+
+class VnfdPublisher(object):
+    AUTH = ('admin', 'admin')
+    HEADERS = {"content-type": "application/vnd.yang.data+json"}
+
+
+    def __init__(self, use_ssl, ssl_cert, ssl_key, loop):
+        self.use_ssl = use_ssl
+        self.ssl_cert = ssl_cert
+        self.ssl_key = ssl_key
+        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
+        self.loop = loop
+
+    @asyncio.coroutine
+    def update(self, vnfd):
+        def update(vnfd):
+            """
+            Update VNFD record using rest API, as the config data is handled
+            by uAgent and stored in CDB
+            """
+
+            scheme = "https" if self.use_ssl else "http"
+
+            url = "{}://127.0.0.1:8008/api/config/vnfd-catalog/vnfd/{}"
+
+            model = RwYang.Model.create_libncx()
+            model.load_module("rw-vnfd")
+            model.load_module("vnfd")
+
+            data = vnfd.to_json(model)
+
+            key = "vnfd:vnfd-catalog"
+            newdict = json.loads(data)
+            if key in newdict:
+                data = json.dumps(newdict[key])
+
+            options = {"data": data,
+                       "headers": VnfdPublisher.HEADERS,
+                       "auth": VnfdPublisher.AUTH}
+
+            if self.use_ssl:
+                options["verify"] = False
+                options["cert"] = (self.ssl_cert, self.ssl_key)
+
+            response = requests.put(
+                url.format(scheme, vnfd.id),
+                **options
+            )
+
+        status = yield from self.loop.run_in_executor(
+            None,
+            update,
+            vnfd
+            )
+