RIFT OSM R1 Initial Submission

Signed-off-by: Jeremy Mordkoff <jeremy.mordkoff@riftio.com>
diff --git a/common/python/rift/mano/cloud/__init__.py b/common/python/rift/mano/cloud/__init__.py
new file mode 100644
index 0000000..4317d51
--- /dev/null
+++ b/common/python/rift/mano/cloud/__init__.py
@@ -0,0 +1,30 @@
+
+# 
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+from .accounts import (
+    CloudAccount,
+    CloudAccountCalError,
+    )
+
+from .config import (
+    CloudAccountConfigSubscriber,
+    CloudAccountConfigCallbacks
+    )
+
+from .operdata import (
+     CloudAccountDtsOperdataHandler,
+)
diff --git a/common/python/rift/mano/cloud/accounts.py b/common/python/rift/mano/cloud/accounts.py
new file mode 100644
index 0000000..57ca55f
--- /dev/null
+++ b/common/python/rift/mano/cloud/accounts.py
@@ -0,0 +1,181 @@
+
+# 
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+import sys
+import asyncio
+from gi import require_version
+require_version('RwcalYang', '1.0')
+require_version('RwTypes', '1.0')
+require_version('RwCloudYang', '1.0')
+
+from gi.repository import (
+        RwTypes,
+        RwcalYang,
+        RwCloudYang,
+        )
+import rw_peas
+
+if sys.version_info < (3, 4, 4):
+    asyncio.ensure_future = asyncio.async
+
+
+class PluginLoadingError(Exception):
+    pass
+
+
+class CloudAccountCalError(Exception):
+    pass
+
+
+class CloudAccount(object):
+    def __init__(self, log, rwlog_hdl, account_msg):
+        self._log = log
+        self._account_msg = account_msg.deep_copy()
+
+        self._cal_plugin = None
+        self._engine = None
+
+        self._cal = self.plugin.get_interface("Cloud")
+        self._cal.init(rwlog_hdl)
+
+        self._status = RwCloudYang.CloudAccount_ConnectionStatus(
+                status="unknown",
+                details="Connection status lookup not started"
+                )
+
+        self._validate_task = None
+
+    @property
+    def plugin(self):
+        if self._cal_plugin is None:
+            try:
+                self._cal_plugin = rw_peas.PeasPlugin(
+                        getattr(self._account_msg, self.account_type).plugin_name,
+                        'RwCal-1.0',
+                        )
+
+            except AttributeError as e:
+                raise PluginLoadingError(str(e))
+
+            self._engine, _, _ = self._cal_plugin()
+
+        return self._cal_plugin
+
+    def _wrap_status_fn(self, fn, *args, **kwargs):
+        ret = fn(*args, **kwargs)
+        rw_status = ret[0]
+        if rw_status != RwTypes.RwStatus.SUCCESS:
+            msg = "%s returned %s" % (fn.__name__, str(rw_status))
+            self._log.error(msg)
+            raise CloudAccountCalError(msg)
+
+        # If there was only one other return value besides rw_status, then just
+        # return that element.  Otherwise return the rest of the return values
+        # as a list.
+        return ret[1] if len(ret) == 2 else ret[1:]
+
+    @property
+    def cal(self):
+        return self._cal
+
+    @property
+    def name(self):
+        return self._account_msg.name
+
+    @property
+    def account_msg(self):
+        return self._account_msg
+
+    @property
+    def cal_account_msg(self):
+        return RwcalYang.CloudAccount.from_dict(
+                self.account_msg.as_dict(),
+                ignore_missing_keys=True,
+                )
+
+    def cloud_account_msg(self, account_dict):
+        self._account_msg = RwCloudYang.CloudAccount.from_dict(account_dict)
+
+    @property
+    def account_type(self):
+        return self._account_msg.account_type
+
+    @property
+    def connection_status(self):
+        return self._status
+
+    def update_from_cfg(self, cfg):
+        self._log.debug("Updating parent CloudAccount to %s", cfg)
+
+        # Hack to catch updates triggered from apply_callback when a sdn-account is removed
+        # from a cloud-account. To be fixed properly when updates are handled
+        if (self.account_msg.name == cfg.name
+                and self.account_msg.account_type == cfg.account_type):
+            return
+
+        if cfg.has_field("sdn_account"):
+            self.account_msg.sdn_account = cfg.sdn_account
+        else:
+            raise NotImplementedError("Update cloud account not yet supported")
+
+    def create_image(self, image_info_msg):
+        image_id = self._wrap_status_fn(
+                self.cal.create_image, self.cal_account_msg, image_info_msg
+                )
+
+        return image_id
+
+    def get_image_list(self):
+        self._log.debug("Getting image list from account: %s", self.name)
+        resources = self._wrap_status_fn(
+                self.cal.get_image_list, self.cal_account_msg
+                )
+
+        return resources.imageinfo_list
+
+    @asyncio.coroutine
+    def validate_cloud_account_credentials(self, loop):
+        self._log.debug("Validating Cloud Account credentials %s", self._account_msg)
+        self._status = RwCloudYang.CloudAccount_ConnectionStatus(
+                status="validating",
+                details="Cloud account connection validation in progress"
+                )
+        rwstatus, status = yield from loop.run_in_executor(
+                None,
+                self._cal.validate_cloud_creds,
+                self.cal_account_msg,
+                )
+        if rwstatus == RwTypes.RwStatus.SUCCESS:
+            self._status = RwCloudYang.CloudAccount_ConnectionStatus.from_dict(status.as_dict())
+        else:
+            self._status = RwCloudYang.CloudAccount_ConnectionStatus(
+                    status="failure",
+                    details="Error when calling CAL validate cloud creds"
+                    )
+
+        self._log.info("Got cloud account validation response: %s", self._status)
+
+    def start_validate_credentials(self, loop):
+        if self._validate_task is not None:
+            self._validate_task.cancel()
+            self._validate_task = None
+
+        self._validate_task = asyncio.ensure_future(
+                self.validate_cloud_account_credentials(loop),
+                loop=loop
+                )
+
diff --git a/common/python/rift/mano/cloud/config.py b/common/python/rift/mano/cloud/config.py
new file mode 100644
index 0000000..1b1847c
--- /dev/null
+++ b/common/python/rift/mano/cloud/config.py
@@ -0,0 +1,249 @@
+
+# 
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+import asyncio
+import rw_peas
+
+import gi
+gi.require_version('RwDts', '1.0')
+import rift.tasklets
+
+from gi.repository import (
+    RwcalYang as rwcal,
+    RwDts as rwdts,
+    ProtobufC,
+    )
+
+from . import accounts
+
+class CloudAccountNotFound(Exception):
+    pass
+
+
+class CloudAccountError(Exception):
+    pass
+
+
+def get_add_delete_update_cfgs(dts_member_reg, xact, key_name):
+    # Unforunately, it is currently difficult to figure out what has exactly
+    # changed in this xact without Pbdelta support (RIFT-4916)
+    # As a workaround, we can fetch the pre and post xact elements and
+    # perform a comparison to figure out adds/deletes/updates
+    xact_cfgs = list(dts_member_reg.get_xact_elements(xact))
+    curr_cfgs = list(dts_member_reg.elements)
+
+    xact_key_map = {getattr(cfg, key_name): cfg for cfg in xact_cfgs}
+    curr_key_map = {getattr(cfg, key_name): cfg for cfg in curr_cfgs}
+
+    # Find Adds
+    added_keys = set(xact_key_map) - set(curr_key_map)
+    added_cfgs = [xact_key_map[key] for key in added_keys]
+
+    # Find Deletes
+    deleted_keys = set(curr_key_map) - set(xact_key_map)
+    deleted_cfgs = [curr_key_map[key] for key in deleted_keys]
+
+    # Find Updates
+    updated_keys = set(curr_key_map) & set(xact_key_map)
+    updated_cfgs = [xact_key_map[key] for key in updated_keys if xact_key_map[key] != curr_key_map[key]]
+
+    return added_cfgs, deleted_cfgs, updated_cfgs
+
+
+class CloudAccountConfigCallbacks(object):
+    def __init__(self,
+                 on_add_apply=None, on_add_prepare=None,
+                 on_delete_apply=None, on_delete_prepare=None):
+
+        @asyncio.coroutine
+        def prepare_noop(*args, **kwargs):
+            pass
+
+        def apply_noop(*args, **kwargs):
+            pass
+
+        self.on_add_apply = on_add_apply
+        self.on_add_prepare = on_add_prepare
+        self.on_delete_apply = on_delete_apply
+        self.on_delete_prepare = on_delete_prepare
+
+        for f in ('on_add_apply', 'on_delete_apply'):
+            ref = getattr(self, f)
+            if ref is None:
+                setattr(self, f, apply_noop)
+                continue
+
+            if asyncio.iscoroutinefunction(ref):
+                raise ValueError('%s cannot be a coroutine' % (f,))
+
+        for f in ('on_add_prepare', 'on_delete_prepare'):
+            ref = getattr(self, f)
+            if ref is None:
+                setattr(self, f, prepare_noop)
+                continue
+
+            if not asyncio.iscoroutinefunction(ref):
+                raise ValueError("%s must be a coroutine" % f)
+
+
+class CloudAccountConfigSubscriber(object):
+    XPATH = "C,/rw-cloud:cloud/rw-cloud:account"
+
+    def __init__(self, dts, log, rwlog_hdl, cloud_callbacks):
+        self._dts = dts
+        self._log = log
+        self._rwlog_hdl = rwlog_hdl
+        self._reg = None
+
+        self.accounts = {}
+
+        self._cloud_callbacks = cloud_callbacks
+
+    def add_account(self, account_msg):
+        self._log.info("adding cloud account: {}".format(account_msg))
+
+        account = accounts.CloudAccount(self._log, self._rwlog_hdl, account_msg)
+        self.accounts[account.name] = account
+
+        self._cloud_callbacks.on_add_apply(account)
+
+    def delete_account(self, account_name):
+        self._log.info("deleting cloud account: {}".format(account_name))
+        del self.accounts[account_name]
+
+        self._cloud_callbacks.on_delete_apply(account_name)
+
+    def update_account(self, account_msg):
+        """ Update an existing cloud account
+
+        In order to simplify update, turn an update into a delete followed by
+        an add.  The drawback to this approach is that we will not support
+        updates of an "in-use" cloud account, but this seems like a
+        reasonable trade-off.
+
+
+        Arguments:
+            account_msg - The cloud account config message
+        """
+        self._log.info("updating cloud account: {}".format(account_msg))
+
+        self.delete_account(account_msg.name)
+        self.add_account(account_msg)
+
+    def register(self):
+        @asyncio.coroutine
+        def apply_config(dts, acg, xact, action, _):
+            self._log.debug("Got cloud account apply config (xact: %s) (action: %s)", xact, action)
+
+            if xact.xact is None:
+                if action == rwdts.AppconfAction.INSTALL:
+                    curr_cfg = self._reg.elements
+                    for cfg in curr_cfg:
+                        self._log.debug("Cloud account being re-added after restart.")
+                        if not cfg.has_field('account_type'):
+                            raise CloudAccountError("New cloud account must contain account_type field.")
+                        self.add_account(cfg)
+                else:
+                    # When RIFT first comes up, an INSTALL is called with the current config
+                    # Since confd doesn't actally persist data this never has any data so
+                    # skip this for now.
+                    self._log.debug("No xact handle.  Skipping apply config")
+
+                return
+
+            add_cfgs, delete_cfgs, update_cfgs = get_add_delete_update_cfgs(
+                    dts_member_reg=self._reg,
+                    xact=xact,
+                    key_name="name",
+                    )
+
+            # Handle Deletes
+            for cfg in delete_cfgs:
+                self.delete_account(cfg.name)
+
+            # Handle Adds
+            for cfg in add_cfgs:
+                self.add_account(cfg)
+
+            # Handle Updates
+            for cfg in update_cfgs:
+                self.update_account(cfg)
+
+        @asyncio.coroutine
+        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
+            """ Prepare callback from DTS for Cloud Account """
+
+            action = xact_info.query_action
+            self._log.debug("Cloud account on_prepare config received (action: %s): %s",
+                            xact_info.query_action, msg)
+
+            if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
+                if msg.name in self.accounts:
+                    self._log.debug("Cloud account already exists. Invoking update request")
+
+                    # Since updates are handled by a delete followed by an add, invoke the
+                    # delete prepare callbacks to give clients an opportunity to reject.
+                    yield from self._cloud_callbacks.on_delete_prepare(msg.name)
+
+                else:
+                    self._log.debug("Cloud account does not already exist. Invoking on_prepare add request")
+                    if not msg.has_field('account_type'):
+                        raise CloudAccountError("New cloud account must contain account_type field.")
+
+                    account = accounts.CloudAccount(self._log, self._rwlog_hdl, msg)
+                    yield from self._cloud_callbacks.on_add_prepare(account)
+
+            elif action == rwdts.QueryAction.DELETE:
+                # Check if the entire cloud account got deleted
+                fref = ProtobufC.FieldReference.alloc()
+                fref.goto_whole_message(msg.to_pbcm())
+                if fref.is_field_deleted():
+                    yield from self._cloud_callbacks.on_delete_prepare(msg.name)
+
+                else:
+                    fref.goto_proto_name(msg.to_pbcm(), "sdn_account")
+                    if fref.is_field_deleted():
+                        # SDN account disassociated from cloud account
+                        account = self.accounts[msg.name]
+                        dict_account = account.account_msg.as_dict()
+                        del dict_account["sdn_account"]
+                        account.cloud_account_msg(dict_account)
+                    else:
+                        self._log.error("Deleting individual fields for cloud account not supported")
+                        xact_info.respond_xpath(rwdts.XactRspCode.NACK)
+                        return
+
+            else:
+                self._log.error("Action (%s) NOT SUPPORTED", action)
+                xact_info.respond_xpath(rwdts.XactRspCode.NACK)
+
+            xact_info.respond_xpath(rwdts.XactRspCode.ACK)
+
+        self._log.debug("Registering for Cloud Account config using xpath: %s",
+                        CloudAccountConfigSubscriber.XPATH,
+                        )
+
+        acg_handler = rift.tasklets.AppConfGroup.Handler(
+                        on_apply=apply_config,
+                        )
+
+        with self._dts.appconf_group_create(acg_handler) as acg:
+            self._reg = acg.register(
+                    xpath=CloudAccountConfigSubscriber.XPATH,
+                    flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
+                    on_prepare=on_prepare,
+                    )
diff --git a/common/python/rift/mano/cloud/operdata.py b/common/python/rift/mano/cloud/operdata.py
new file mode 100644
index 0000000..4878691
--- /dev/null
+++ b/common/python/rift/mano/cloud/operdata.py
@@ -0,0 +1,140 @@
+
+# 
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+import asyncio
+import rift.tasklets
+
+from gi.repository import(
+        RwCloudYang,
+        RwDts as rwdts,
+        )
+
+class CloudAccountNotFound(Exception):
+    pass
+
+
+class CloudAccountDtsOperdataHandler(object):
+    def __init__(self, dts, log, loop):
+        self._dts = dts
+        self._log = log
+        self._loop = loop
+
+        self.cloud_accounts = {}
+
+    def add_cloud_account(self, account):
+        self.cloud_accounts[account.name] = account
+        account.start_validate_credentials(self._loop)
+
+    def delete_cloud_account(self, account_name):
+        del self.cloud_accounts[account_name]
+
+    def get_saved_cloud_accounts(self, cloud_account_name):
+        ''' Get Cloud Account corresponding to passed name, or all saved accounts if name is None'''
+        saved_cloud_accounts = []
+
+        if cloud_account_name is None or cloud_account_name == "":
+            cloud_accounts = list(self.cloud_accounts.values())
+            saved_cloud_accounts.extend(cloud_accounts)
+        elif cloud_account_name in self.cloud_accounts:
+            account = self.cloud_accounts[cloud_account_name]
+            saved_cloud_accounts.append(account)
+        else:
+            errstr = "Cloud account {} does not exist".format(cloud_account_name)
+            raise KeyError(errstr)
+
+        return saved_cloud_accounts
+
+    @asyncio.coroutine
+    def create_notification(self, account):
+        xpath = "N,/rw-cloud:cloud-notif"
+        ac_status = RwCloudYang.YangNotif_RwCloud_CloudNotif()
+        ac_status.name = account.name
+        ac_status.message = account.connection_status.details
+
+        yield from self._dts.query_create(xpath, rwdts.XactFlag.ADVISE, ac_status)
+        self._log.info("Notification called by creating dts query: %s", ac_status)
+
+
+    def _register_show_status(self):
+        def get_xpath(cloud_name=None):
+            return "D,/rw-cloud:cloud/account{}/connection-status".format(
+                    "[name='%s']" % cloud_name if cloud_name is not None else ''
+                    )
+
+        @asyncio.coroutine
+        def on_prepare(xact_info, action, ks_path, msg):
+            path_entry = RwCloudYang.CloudAccount.schema().keyspec_to_entry(ks_path)
+            cloud_account_name = path_entry.key00.name
+            self._log.debug("Got show cloud connection status request: %s", ks_path.create_string())
+
+            try:
+                saved_accounts = self.get_saved_cloud_accounts(cloud_account_name)
+                for account in saved_accounts:
+                    connection_status = account.connection_status
+                    self._log.debug("Responding to cloud connection status request: %s", connection_status)
+                    xact_info.respond_xpath(
+                            rwdts.XactRspCode.MORE,
+                            xpath=get_xpath(account.name),
+                            msg=account.connection_status,
+                            )
+            except KeyError as e:
+                self._log.warning(str(e))
+                xact_info.respond_xpath(rwdts.XactRspCode.NA)
+                return
+
+            xact_info.respond_xpath(rwdts.XactRspCode.ACK)
+
+        yield from self._dts.register(
+                xpath=get_xpath(),
+                handler=rift.tasklets.DTS.RegistrationHandler(
+                    on_prepare=on_prepare),
+                flags=rwdts.Flag.PUBLISHER,
+                )
+
+    def _register_validate_rpc(self):
+        def get_xpath():
+            return "/rw-cloud:update-cloud-status"
+
+        @asyncio.coroutine
+        def on_prepare(xact_info, action, ks_path, msg):
+            if not msg.has_field("cloud_account"):
+                raise CloudAccountNotFound("Cloud account name not provided")
+
+            cloud_account_name = msg.cloud_account
+            try:
+                account = self.cloud_accounts[cloud_account_name]
+            except KeyError:
+                raise CloudAccountNotFound("Cloud account name %s not found" % cloud_account_name)
+
+            account.start_validate_credentials(self._loop)
+
+            yield from self.create_notification(account)
+
+            xact_info.respond_xpath(rwdts.XactRspCode.ACK)
+
+        yield from self._dts.register(
+                xpath=get_xpath(),
+                handler=rift.tasklets.DTS.RegistrationHandler(
+                    on_prepare=on_prepare
+                    ),
+                flags=rwdts.Flag.PUBLISHER,
+                )
+
+    @asyncio.coroutine
+    def register(self):
+        yield from self._register_show_status()
+        yield from self._register_validate_rpc()