New feature: Code changes for project support
Signed-off-by: Philip Joseph <philip.joseph@riftio.com>
diff --git a/common/python/rift/downloader/local_file.py b/common/python/rift/downloader/local_file.py
new file mode 100644
index 0000000..c6e9c5e
--- /dev/null
+++ b/common/python/rift/downloader/local_file.py
@@ -0,0 +1,84 @@
+#
+# Copyright 2016 RIFT.IO Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Taken from http://stackoverflow.com/a/27786580
+
+
+import logging
+import requests
+import os
+from urllib.parse import urlparse
+
+
+class LocalFileAdapter(requests.adapters.BaseAdapter):
+    """Transport adapter that lets Requests answer file:// URLs from disk.
+
+    Only GET and HEAD are served; other methods get an error status.
+
+    @todo: Properly handle non-empty hostname portions.
+    """
+
+    @staticmethod
+    def _chkpath(method, path):
+        """Return the (status code, reason) pair for method on path."""
+        verb = method.lower()
+        if verb in ('put', 'delete'):
+            return 501, "Not Implemented"  # TODO
+        if verb not in ('get', 'head'):
+            return 405, "Method Not Allowed"
+        if os.path.isdir(path):
+            return 400, "Path Not A File"
+        if not os.path.isfile(path):
+            return 404, "File Not Found"
+        if not os.access(path, os.R_OK):
+            return 403, "Access Denied"
+        return 200, "OK"
+
+    def send(self, req, **kwargs):  # pylint: disable=unused-argument
+        """Return a Response for the local file named by the request URL.
+
+        The URL path is percent-decoded before use. For a non-HEAD 200,
+        response.raw is the file opened in binary mode; if opening fails
+        the status becomes 500 with the error text as the reason.
+
+        @type req: C{PreparedRequest}
+        @todo: Should I bother filling `response.headers` and processing
+               If-Modified-Since and friends using `os.stat`?
+        """
+        from urllib.request import url2pathname  # not imported at file top
+
+        log = logging.getLogger('rw-mano-log')
+        log.debug("Request: {}".format(req))
+        url = urlparse(req.path_url)
+        path = os.path.normcase(os.path.normpath(url2pathname(url.path)))
+        response = requests.Response()
+        response.status_code, response.reason = self._chkpath(req.method, path)
+        log.debug("Response {}: {}".format(response.status_code, response.reason))
+        if response.status_code == 200 and req.method.lower() != 'head':
+            try:
+                response.raw = open(path, 'rb')
+            except (OSError, IOError) as err:
+                response.status_code = 500
+                response.reason = str(err)
+
+        is_bytes = isinstance(req.url, bytes)
+        response.url = req.url.decode('utf-8') if is_bytes else req.url
+        response.request = req
+        response.connection = self
+        log.debug("Response {}: {}".format(response.status_code, response))
+        return response
+
+    def close(self):
+        """No-op: the adapter itself holds no resources to release."""
diff --git a/common/python/rift/downloader/url.py b/common/python/rift/downloader/url.py
index 2768894..0cdd8d4 100644
--- a/common/python/rift/downloader/url.py
+++ b/common/python/rift/downloader/url.py
@@ -39,6 +39,7 @@
from gi.repository import RwPkgMgmtYang
from . import base
+from .local_file import LocalFileAdapter as LocalFileAdapter
class UrlDownloader(base.AbstractDownloader):
@@ -109,6 +110,7 @@
retries = Retry(total=5, backoff_factor=1)
session.mount("http://", HTTPAdapter(max_retries=retries))
session.mount("https://", HTTPAdapter(max_retries=retries))
+ session.mount("file://", LocalFileAdapter())
return session
diff --git a/common/python/rift/mano/cloud/accounts.py b/common/python/rift/mano/cloud/accounts.py
index d3aa860..6563512 100644
--- a/common/python/rift/mano/cloud/accounts.py
+++ b/common/python/rift/mano/cloud/accounts.py
@@ -53,7 +53,7 @@
self._cal = self.plugin.get_interface("Cloud")
self._cal.init(rwlog_hdl)
- self._status = RwCloudYang.CloudAccount_ConnectionStatus(
+ self._status = RwCloudYang.CloudAcc_ConnectionStatus(
status="unknown",
details="Connection status lookup not started"
)
@@ -151,7 +151,7 @@
@asyncio.coroutine
def validate_cloud_account_credentials(self, loop):
self._log.debug("Validating Cloud Account credentials %s", self._account_msg)
- self._status = RwCloudYang.CloudAccount_ConnectionStatus(
+ self._status = RwCloudYang.CloudAcc_ConnectionStatus(
status="validating",
details="Cloud account connection validation in progress"
)
@@ -161,9 +161,9 @@
self.cal_account_msg,
)
if rwstatus == RwTypes.RwStatus.SUCCESS:
- self._status = RwCloudYang.CloudAccount_ConnectionStatus.from_dict(status.as_dict())
+ self._status = RwCloudYang.CloudAcc_ConnectionStatus.from_dict(status.as_dict())
else:
- self._status = RwCloudYang.CloudAccount_ConnectionStatus(
+ self._status = RwCloudYang.CloudAcc_ConnectionStatus(
status="failure",
details="Error when calling CAL validate cloud creds"
)
diff --git a/common/python/rift/mano/cloud/config.py b/common/python/rift/mano/cloud/config.py
index c62f129..f7906fd 100644
--- a/common/python/rift/mano/cloud/config.py
+++ b/common/python/rift/mano/cloud/config.py
@@ -21,11 +21,13 @@
import gi
gi.require_version('RwDts', '1.0')
import rift.tasklets
+from rift.mano.utils.project import get_add_delete_update_cfgs
from gi.repository import (
RwcalYang as rwcal,
RwDts as rwdts,
ProtobufC,
+ RwCloudYang,
)
from . import accounts
@@ -38,32 +40,6 @@
pass
-def get_add_delete_update_cfgs(dts_member_reg, xact, key_name):
- # Unforunately, it is currently difficult to figure out what has exactly
- # changed in this xact without Pbdelta support (RIFT-4916)
- # As a workaround, we can fetch the pre and post xact elements and
- # perform a comparison to figure out adds/deletes/updates
- xact_cfgs = list(dts_member_reg.get_xact_elements(xact))
- curr_cfgs = list(dts_member_reg.elements)
-
- xact_key_map = {getattr(cfg, key_name): cfg for cfg in xact_cfgs}
- curr_key_map = {getattr(cfg, key_name): cfg for cfg in curr_cfgs}
-
- # Find Adds
- added_keys = set(xact_key_map) - set(curr_key_map)
- added_cfgs = [xact_key_map[key] for key in added_keys]
-
- # Find Deletes
- deleted_keys = set(curr_key_map) - set(xact_key_map)
- deleted_cfgs = [curr_key_map[key] for key in deleted_keys]
-
- # Find Updates
- updated_keys = set(curr_key_map) & set(xact_key_map)
- updated_cfgs = [xact_key_map[key] for key in updated_keys if xact_key_map[key] != curr_key_map[key]]
-
- return added_cfgs, deleted_cfgs, updated_cfgs
-
-
class CloudAccountConfigCallbacks(object):
def __init__(self,
on_add_apply=None, on_add_prepare=None,
@@ -101,12 +77,13 @@
class CloudAccountConfigSubscriber(object):
- XPATH = "C,/rw-project:project/rw-cloud:cloud/rw-cloud:account"
+ XPATH = "C,/rw-cloud:cloud/rw-cloud:account"
- def __init__(self, dts, log, rwlog_hdl, cloud_callbacks):
+ def __init__(self, dts, log, rwlog_hdl, project, cloud_callbacks):
self._dts = dts
self._log = log
self._rwlog_hdl = rwlog_hdl
+ self._project = project
self._reg = None
self.accounts = {}
@@ -144,9 +121,16 @@
self.delete_account(account_msg.name)
self.add_account(account_msg)
+    def deregister(self):
+        """Deregister the cloud account config subscription, if any.
+
+        Logs the project first; when a registration handle is held it is
+        deregistered and then cleared.
+        """
+        message = "Project {}: De-register cloud account handler".format(
+            self._project)
+        self._log.debug(message)
+        if not self._reg:
+            return
+        self._reg.deregister()
+        self._reg = None
+
def register(self):
@asyncio.coroutine
- def apply_config(dts, acg, xact, action, _):
+ def apply_config(dts, acg, xact, action, scratch):
self._log.debug("Got cloud account apply config (xact: %s) (action: %s)", xact, action)
if xact.xact is None:
@@ -188,12 +172,16 @@
""" Prepare callback from DTS for Cloud Account """
action = xact_info.query_action
+
+ xpath = ks_path.to_xpath(RwCloudYang.get_schema())
+
self._log.debug("Cloud account on_prepare config received (action: %s): %s",
xact_info.query_action, msg)
if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
if msg.name in self.accounts:
- self._log.debug("Cloud account already exists. Invoking update request")
+ self._log.debug("Cloud account {} already exists. " \
+ "Invoking update request".format(msg.name))
# Since updates are handled by a delete followed by an add, invoke the
# delete prepare callbacks to give clients an opportunity to reject.
@@ -241,9 +229,10 @@
on_apply=apply_config,
)
+ xpath = self._project.add_project(CloudAccountConfigSubscriber.XPATH)
with self._dts.appconf_group_create(acg_handler) as acg:
self._reg = acg.register(
- xpath=CloudAccountConfigSubscriber.XPATH,
+ xpath=xpath,
flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
on_prepare=on_prepare,
)
diff --git a/common/python/rift/mano/cloud/operdata.py b/common/python/rift/mano/cloud/operdata.py
index 12ab801..a86f638 100644
--- a/common/python/rift/mano/cloud/operdata.py
+++ b/common/python/rift/mano/cloud/operdata.py
@@ -21,6 +21,7 @@
from gi.repository import(
RwCloudYang,
RwDts as rwdts,
+ RwTypes,
)
class CloudAccountNotFound(Exception):
@@ -28,11 +29,14 @@
class CloudAccountDtsOperdataHandler(object):
- def __init__(self, dts, log, loop):
+ def __init__(self, dts, log, loop, project):
self._dts = dts
self._log = log
self._loop = loop
+ self._project = project
+ self._regh = None
+ self._rpc = None
self.cloud_accounts = {}
def add_cloud_account(self, account):
@@ -71,13 +75,13 @@
def _register_show_status(self):
def get_xpath(cloud_name=None):
- return "D,/rw-project:project/rw-cloud:cloud/account{}/connection-status".format(
- "[name='%s']" % cloud_name if cloud_name is not None else ''
- )
+ return "D,/rw-cloud:cloud/account{}/connection-status".format(
+ "[name='%s']" % cloud_name if cloud_name is not None else ''
+ )
@asyncio.coroutine
def on_prepare(xact_info, action, ks_path, msg):
- path_entry = RwCloudYang.CloudAccount.schema().keyspec_to_entry(ks_path)
+ path_entry = RwCloudYang.CloudAcc.schema().keyspec_to_entry(ks_path)
cloud_account_name = path_entry.key00.name
self._log.debug("Got show cloud connection status request: %s", ks_path.create_string())
@@ -86,9 +90,10 @@
for account in saved_accounts:
connection_status = account.connection_status
self._log.debug("Responding to cloud connection status request: %s", connection_status)
+ xpath = self._project.add_project(get_xpath(account.name))
xact_info.respond_xpath(
rwdts.XactRspCode.MORE,
- xpath=get_xpath(account.name),
+ xpath=xpath,
msg=account.connection_status,
)
except KeyError as e:
@@ -98,8 +103,9 @@
xact_info.respond_xpath(rwdts.XactRspCode.ACK)
- yield from self._dts.register(
- xpath=get_xpath(),
+ xpath = self._project.add_project(get_xpath())
+ self._regh = yield from self._dts.register(
+ xpath=xpath,
handler=rift.tasklets.DTS.RegistrationHandler(
on_prepare=on_prepare),
flags=rwdts.Flag.PUBLISHER,
@@ -113,12 +119,20 @@
def on_prepare(xact_info, action, ks_path, msg):
if not msg.has_field("cloud_account"):
raise CloudAccountNotFound("Cloud account name not provided")
-
cloud_account_name = msg.cloud_account
+
+ if not self._project.rpc_check(msg, xact_info=xact_info):
+ return
+
try:
account = self.cloud_accounts[cloud_account_name]
except KeyError:
- raise CloudAccountNotFound("Cloud account name %s not found" % cloud_account_name)
+ errmsg = "Cloud account name {} not found in project {}". \
+ format(cloud_account_name, self._project.name)
+ xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE,
+ get_xpath(),
+ errmsg)
+ raise CloudAccountNotFound(errmsg)
account.start_validate_credentials(self._loop)
@@ -126,7 +140,7 @@
xact_info.respond_xpath(rwdts.XactRspCode.ACK)
- yield from self._dts.register(
+ self._rpc = yield from self._dts.register(
xpath=get_xpath(),
handler=rift.tasklets.DTS.RegistrationHandler(
on_prepare=on_prepare
@@ -138,3 +152,7 @@
def register(self):
yield from self._register_show_status()
yield from self._register_validate_rpc()
+
+    def deregister(self):
+        """Deregister the validate RPC and the show-status registration.
+
+        A handle that was never registered (still None) is skipped, and
+        each handle is cleared once it has been deregistered.
+
+        NOTE(review): assumes the registration handle's deregister() is a
+        plain, non-coroutine method - confirm against rift.tasklets.
+        """
+        if self._rpc:
+            self._rpc.deregister()
+            self._rpc = None
+        if self._regh:
+            self._regh.deregister()
+            self._regh = None
diff --git a/common/python/rift/mano/config_agent/config.py b/common/python/rift/mano/config_agent/config.py
index 7500bac..daee792 100644
--- a/common/python/rift/mano/config_agent/config.py
+++ b/common/python/rift/mano/config_agent/config.py
@@ -21,6 +21,7 @@
import gi
gi.require_version('RwDts', '1.0')
import rift.tasklets
+from rift.mano.utils.project import get_add_delete_update_cfgs
from gi.repository import (
RwcalYang as rwcal,
@@ -36,32 +37,6 @@
pass
-def get_add_delete_update_cfgs(dts_member_reg, xact, key_name):
- # Unforunately, it is currently difficult to figure out what has exactly
- # changed in this xact without Pbdelta support (RIFT-4916)
- # As a workaround, we can fetch the pre and post xact elements and
- # perform a comparison to figure out adds/deletes/updates
- xact_cfgs = list(dts_member_reg.get_xact_elements(xact))
- curr_cfgs = list(dts_member_reg.elements)
-
- xact_key_map = {getattr(cfg, key_name): cfg for cfg in xact_cfgs}
- curr_key_map = {getattr(cfg, key_name): cfg for cfg in curr_cfgs}
-
- # Find Adds
- added_keys = set(xact_key_map) - set(curr_key_map)
- added_cfgs = [xact_key_map[key] for key in added_keys]
-
- # Find Deletes
- deleted_keys = set(curr_key_map) - set(xact_key_map)
- deleted_cfgs = [curr_key_map[key] for key in deleted_keys]
-
- # Find Updates
- updated_keys = set(curr_key_map) & set(xact_key_map)
- updated_cfgs = [xact_key_map[key] for key in updated_keys if xact_key_map[key] != curr_key_map[key]]
-
- return added_cfgs, deleted_cfgs, updated_cfgs
-
-
class ConfigAgentCallbacks(object):
def __init__(self,
on_add_apply=None, on_add_prepare=None,
@@ -101,9 +76,10 @@
class ConfigAgentSubscriber(object):
XPATH = "C,/rw-config-agent:config-agent/account"
- def __init__(self, dts, log, config_callbacks):
+ def __init__(self, dts, log, project, config_callbacks):
self._dts = dts
self._log = log
+ self._project = project
self._reg = None
self.accounts = {}
@@ -139,6 +115,13 @@
self.delete_account(account_msg)
self.add_account(account_msg)
+    def deregister(self):
+        """Deregister the config agent account subscription, if any.
+
+        Logs the project name first; when a registration handle is held it
+        is deregistered and then cleared.
+        """
+        message = "De-register config agent handler for project {}".format(
+            self._project.name)
+        self._log.debug(message)
+        if not self._reg:
+            return
+        self._reg.deregister()
+        self._reg = None
+
def register(self):
def apply_config(dts, acg, xact, action, _):
self._log.debug("Got config account apply config (xact: %s) (action: %s)", xact, action)
@@ -212,17 +195,17 @@
xact_info.respond_xpath(rwdts.XactRspCode.ACK)
- self._log.debug("Registering for Config Account config using xpath: %s",
- ConfigAgentSubscriber.XPATH,
- )
acg_handler = rift.tasklets.AppConfGroup.Handler(
on_apply=apply_config,
)
with self._dts.appconf_group_create(acg_handler) as acg:
+ xpath = self._project.add_project(ConfigAgentSubscriber.XPATH)
+ self._log.debug("Registering for Config Account config using xpath: %s",
+ xpath)
self._reg = acg.register(
- xpath=ConfigAgentSubscriber.XPATH,
+ xpath=xpath,
flags=rwdts.Flag.SUBSCRIBER,
on_prepare=on_prepare,
)
diff --git a/common/python/rift/mano/config_agent/operdata.py b/common/python/rift/mano/config_agent/operdata.py
index 729a1f1..61ae5f8 100644
--- a/common/python/rift/mano/config_agent/operdata.py
+++ b/common/python/rift/mano/config_agent/operdata.py
@@ -27,7 +27,6 @@
RwDts as rwdts)
import rift.tasklets
-
import rift.mano.utils.juju_api as juju
@@ -153,12 +152,15 @@
)
class CfgAgentDtsOperdataHandler(object):
- def __init__(self, dts, log, loop):
+ def __init__(self, dts, log, loop, project):
self._dts = dts
self._log = log
self._loop = loop
+ self._project = project
self.cfg_agent_accounts = {}
+ self._show_reg = None
+ self._rpc_reg = None
def add_cfg_agent_account(self, account_msg):
account = ConfigAgentAccount(self._log, account_msg)
@@ -205,9 +207,10 @@
for account in saved_accounts:
connection_status = account.connection_status
self._log.debug("Responding to config agent connection status request: %s", connection_status)
+ xpath = self._project.add_project(get_xpath(account.name))
xact_info.respond_xpath(
rwdts.XactRspCode.MORE,
- xpath=get_xpath(account.name),
+ xpath=xpath,
msg=account.connection_status,
)
except KeyError as e:
@@ -217,12 +220,13 @@
xact_info.respond_xpath(rwdts.XactRspCode.ACK)
- yield from self._dts.register(
- xpath=get_xpath(),
- handler=rift.tasklets.DTS.RegistrationHandler(
- on_prepare=on_prepare),
- flags=rwdts.Flag.PUBLISHER,
- )
+ xpath = self._project.add_project(get_xpath())
+ self._show_reg = yield from self._dts.register(
+ xpath=xpath,
+ handler=rift.tasklets.DTS.RegistrationHandler(
+ on_prepare=on_prepare),
+ flags=rwdts.Flag.PUBLISHER,
+ )
def _register_validate_rpc(self):
def get_xpath():
@@ -234,6 +238,10 @@
raise ConfigAgentAccountNotFound("Config Agent account name not provided")
cfg_agent_account_name = msg.cfg_agent_account
+
+ if not self._project.rpc_check(msg, xact_info=xact_info):
+ return
+
try:
account = self.cfg_agent_accounts[cfg_agent_account_name]
except KeyError:
@@ -243,24 +251,29 @@
xact_info.respond_xpath(rwdts.XactRspCode.ACK)
- yield from self._dts.register(
- xpath=get_xpath(),
- handler=rift.tasklets.DTS.RegistrationHandler(
- on_prepare=on_prepare
- ),
- flags=rwdts.Flag.PUBLISHER,
- )
+ self._rpc_reg = yield from self._dts.register(
+ xpath=get_xpath(),
+ handler=rift.tasklets.DTS.RegistrationHandler(
+ on_prepare=on_prepare
+ ),
+ flags=rwdts.Flag.PUBLISHER,
+ )
@asyncio.coroutine
def register(self):
yield from self._register_show_status()
yield from self._register_validate_rpc()
+    def deregister(self):
+        """Deregister the show-status and validate-RPC registrations.
+
+        Both handles start out as None, so each is checked before use and
+        cleared afterwards; calling this before register() or calling it
+        twice does nothing.
+        """
+        if self._show_reg:
+            self._show_reg.deregister()
+            self._show_reg = None
+        if self._rpc_reg:
+            self._rpc_reg.deregister()
+            self._rpc_reg = None
+
+
class ConfigAgentJob(object):
"""A wrapper over the config agent job object, providing some
convenience functions.
- YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob contains
+ YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob contains
||
==> VNFRS
||
@@ -274,17 +287,19 @@
"running" : "pending",
"failed" : "failure"}
- def __init__(self, nsr_id, job, tasks=None):
+ def __init__(self, nsr_id, job, project, tasks=None):
"""
Args:
nsr_id (uuid): ID of NSR record
- job (YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob): Gi object
+ job (YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob): Gi object
tasks: List of asyncio.tasks. If provided the job monitor will
use it to monitor the tasks instead of the execution IDs
"""
self._job = job
self.nsr_id = nsr_id
self.tasks = tasks
+ self._project = project
+
self._regh = None
@property
@@ -315,10 +330,10 @@
@property
def xpath(self):
"""Xpath of the job"""
- return ("D,/nsr:ns-instance-opdata" +
+ return self._project.add_project(("D,/nsr:ns-instance-opdata" +
"/nsr:nsr[nsr:ns-instance-config-ref='{}']" +
"/nsr:config-agent-job[nsr:job-id='{}']"
- ).format(self.nsr_id, self.id)
+ ).format(self.nsr_id, self.id))
@property
def regh(self):
@@ -333,7 +348,7 @@
@staticmethod
def convert_rpc_input_to_job(nsr_id, rpc_output, tasks):
"""A helper function to convert the YangOutput_Nsr_ExecNsConfigPrimitive
- to YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob (NsrYang)
+ to YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob (NsrYang)
Args:
nsr_id (uuid): NSR ID
@@ -344,10 +359,10 @@
ConfigAgentJob
"""
# Shortcuts to prevent the HUUGE names.
- CfgAgentJob = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob
- CfgAgentVnfr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr
- CfgAgentPrimitive = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive
- CfgAgentPrimitiveParam = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive_Parameter
+ CfgAgentJob = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob
+ CfgAgentVnfr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr
+ CfgAgentPrimitive = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive
+ CfgAgentPrimitiveParam = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive_Parameter
job = CfgAgentJob.from_dict({
"job_id": rpc_output.job_id,
@@ -385,7 +400,7 @@
job.vnfr.append(vnfr_job)
- return ConfigAgentJob(nsr_id, job, tasks)
+ return ConfigAgentJob(nsr_id, job, project, tasks)
class ConfigAgentJobMonitor(object):
@@ -666,6 +681,7 @@
self._regh = None
self._nsr_regh = None
+ self._project = cfgm.project
@property
def regh(self):
@@ -684,9 +700,9 @@
- @staticmethod
- def cfg_job_xpath(nsr_id, job_id):
- return ("D,/nsr:ns-instance-opdata" +
+    def cfg_job_xpath(self, nsr_id, job_id):
+        """Return the project-scoped xpath of one config-agent job.
+
+        This is an instance method because the project prefix is added
+        through self._project, which a staticmethod cannot reach.
+
+        Args:
+            nsr_id: value matched against ns-instance-config-ref
+            job_id: value matched against job-id
+        """
+        return self._project.add_project(("D,/nsr:ns-instance-opdata" +
 "/nsr:nsr[nsr:ns-instance-config-ref = '{}']" +
- "/nsr:config-agent-job[nsr:job-id='{}']").format(nsr_id, job_id)
+                "/nsr:config-agent-job[nsr:job-id='{}']").format(nsr_id, job_id))
@asyncio.coroutine
def register(self):
@@ -697,7 +713,7 @@
""" prepare callback from dts """
xpath = ks_path.to_xpath(RwNsrYang.get_schema())
if action == rwdts.QueryAction.READ:
- schema = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr.schema()
+ schema = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr.schema()
path_entry = schema.keyspec_to_entry(ks_path)
try:
nsr_id = path_entry.key00.ns_instance_config_ref
@@ -728,7 +744,8 @@
hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
with self._dts.group_create() as group:
- self._regh = group.register(xpath=CfgAgentJobDtsHandler.XPATH,
+ self._regh = group.register(xpath=self._project.add_project(
+ CfgAgentJobDtsHandler.XPATH),
handler=hdl,
flags=rwdts.Flag.PUBLISHER,
)
@@ -749,7 +766,7 @@
@property
def nsr_xpath(self):
- return "D,/nsr:ns-instance-opdata/nsr:nsr"
+ return self._project.add_project("D,/nsr:ns-instance-opdata/nsr:nsr")
@asyncio.coroutine
def register_for_nsr(self):
@@ -784,6 +801,17 @@
except Exception as e:
self._log.error("Failed to register for NSR changes as %s", str(e))
+    def deregister(self):
+        """Deregister the job and NSR DTS registrations of this handler.
+
+        Each handle is deregistered only when it is set and is cleared
+        afterwards, so a second call does nothing.
+        """
+        # The "{}" placeholder is required; without it format() drops the
+        # project name from the message.
+        self._log.debug("De-register config agent job for project {}".
+                        format(self._project.name))
+        if self._regh:
+            self._regh.deregister()
+            self._regh = None
+
+        if self._nsr_regh:
+            self._nsr_regh.deregister()
+            self._nsr_regh = None
+
class ConfigAgentJobManager(object):
"""A central class that manager all the Config Agent related data,
@@ -791,7 +819,7 @@
TODO: Needs to support multiple config agents.
"""
- def __init__(self, dts, log, loop, nsm):
+ def __init__(self, dts, log, loop, project, nsm):
"""
Args:
dts : Dts handle
@@ -804,11 +832,12 @@
self.log = log
self.loop = loop
self.nsm = nsm
+ self.project = project
self.handler = CfgAgentJobDtsHandler(dts, log, loop, nsm, self)
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
def add_job(self, rpc_output, tasks=None):
- """Once an RPC is trigger add a now job
+ """Once an RPC is triggered, add a new job
Args:
rpc_output (YangOutput_Nsr_ExecNsConfigPrimitive): Rpc output
@@ -818,7 +847,8 @@
"""
nsr_id = rpc_output.nsr_id_ref
- job = ConfigAgentJob.convert_rpc_input_to_job(nsr_id, rpc_output, tasks)
+ job = ConfigAgentJob.convert_rpc_input_to_job(nsr_id, rpc_output,
+ tasks, self.project)
self.log.debug("Creating a job monitor for Job id: {}".format(
rpc_output.job_id))
@@ -864,3 +894,6 @@
def register(self):
yield from self.handler.register()
yield from self.handler.register_for_nsr()
+
+    def deregister(self):
+        """Deregister the DTS registrations held by the job handler.
+
+        The handler's deregister() is a plain method that returns None, so
+        it is called directly. Delegating to it with "yield from" would
+        make this method a generator whose body only runs when iterated.
+        """
+        self.handler.deregister()
diff --git a/common/python/rift/mano/config_data/test/test_converter.py b/common/python/rift/mano/config_data/test/test_converter.py
index 1bfd7d7..6169d73 100644
--- a/common/python/rift/mano/config_data/test/test_converter.py
+++ b/common/python/rift/mano/config_data/test/test_converter.py
@@ -23,14 +23,14 @@
@pytest.fixture(scope="function")
def nsd():
- catalog = NsdYang.YangData_Nsd_NsdCatalog()
+ catalog = NsdYang.YangData_RwProject_Project_NsdCatalog()
nsd = catalog.nsd.add()
nsd.id = str(uuid.uuid1())
return nsd
@pytest.fixture(scope="function")
def vnfd():
- catalog = VnfdYang.YangData_Vnfd_VnfdCatalog()
+ catalog = VnfdYang.YangData_RwProject_Project_VnfdCatalog()
vnfd = catalog.vnfd.add()
vnfd.id = str(uuid.uuid1())
return vnfd
@@ -287,7 +287,7 @@
cidr: 10.10.10.2/30
"""
- catalog = VnfdYang.YangData_Vnfd_VnfdCatalog()
+ catalog = VnfdYang.YangData_RwProject_Project_VnfdCatalog()
expected_vnfd = catalog.vnfd.add()
vnf_config = expected_vnfd.vnf_configuration
expected_vnfd.id = vnfd.id
@@ -374,7 +374,7 @@
Vlan ID: '3000'
"""
- catalog = NsdYang.YangData_Nsd_NsdCatalog()
+ catalog = NsdYang.YangData_RwProject_Project_NsdCatalog()
expected_nsd = catalog.nsd.add()
expected_nsd.id = nsd.id
expected_nsd.service_primitive.add().from_dict(
diff --git a/common/python/rift/mano/dts/core.py b/common/python/rift/mano/dts/core.py
index 4894e16..3a04945 100644
--- a/common/python/rift/mano/dts/core.py
+++ b/common/python/rift/mano/dts/core.py
@@ -25,7 +25,7 @@
"""A common class to hold the barebone objects to build a publisher or
subscriber
"""
- def __init__(self, log, dts, loop):
+ def __init__(self, log, dts, loop, project):
"""Constructor
Args:
@@ -34,7 +34,39 @@
loop : Asyncio event loop.
"""
# Reg handle
- self.reg = None
- self.log = log
- self.dts = dts
- self.loop = loop
+ self._reg = None
+ self._log = log
+ self._dts = dts
+ self._loop = loop
+ self._project = project
+
+ @property
+ def reg(self):
+ """Return the current registration handle (None when there is none)."""
+ return self._reg
+
+ @reg.setter
+ def reg(self, val):
+ """Store val as the registration handle."""
+ self._reg = val
+
+ @property
+ def log(self):
+ """Return the log object given to the constructor (read-only)."""
+ return self._log
+
+ @property
+ def dts(self):
+ """Return the DTS handle given to the constructor (read-only)."""
+ return self._dts
+
+ @property
+ def loop(self):
+ """Return the event loop given to the constructor (read-only)."""
+ return self._loop
+
+ @property
+ def project(self):
+ """Return the project given to the constructor (read-only)."""
+ return self._project
+
+    def deregister(self):
+        """Deregister this handler's DTS registration, if one is held.
+
+        Logs the handler class name and the project first; a held handle
+        is deregistered and then cleared.
+        """
+        handler_name = self.__class__.__name__
+        message = "De-registering DTS handler ({}) for project {}".format(
+            handler_name, self._project)
+        self._log.debug(message)
+        if not self._reg:
+            return
+        self._reg.deregister()
+        self._reg = None
diff --git a/common/python/rift/mano/dts/rpc/core.py b/common/python/rift/mano/dts/rpc/core.py
index dfa08bb..72016f1 100644
--- a/common/python/rift/mano/dts/rpc/core.py
+++ b/common/python/rift/mano/dts/rpc/core.py
@@ -36,8 +36,8 @@
class AbstractRpcHandler(DtsHandler):
"""Base class to simplify RPC implementation
"""
- def __init__(self, log, dts, loop):
- super().__init__(log, dts, loop)
+ def __init__(self, log, dts, loop, project=None):
+ super().__init__(log, dts, loop, project)
if not asyncio.iscoroutinefunction(self.callback):
raise ValueError('%s has to be a coroutine' % (self.callback))
@@ -61,6 +61,9 @@
def on_prepare(self, xact_info, action, ks_path, msg):
assert action == rwdts.QueryAction.RPC
+ if self.project and not self.project.rpc_check(msg, xact_info=xact_info):
+ return
+
try:
rpc_op = yield from self.callback(ks_path, msg)
xact_info.respond_xpath(
@@ -76,6 +79,11 @@
@asyncio.coroutine
def register(self):
+ if self.reg:
+ self._log.warning("RPC already registered for project {}".
+ format(self._project.name))
+ return
+
reg_event = asyncio.Event(loop=self.loop)
@asyncio.coroutine
@@ -94,6 +102,10 @@
yield from reg_event.wait()
+    def deregister(self):
+        """Deregister the RPC registration, if there is one.
+
+        The handle is None until register() has run, so it is checked
+        before use; it is cleared afterwards so register() can run again.
+        """
+        if self.reg:
+            self.reg.deregister()
+            self.reg = None
+
@abc.abstractmethod
@asyncio.coroutine
def callback(self, ks_path, msg):
diff --git a/common/python/rift/mano/dts/subscriber/core.py b/common/python/rift/mano/dts/subscriber/core.py
index dd2513e..ebd38f9 100644
--- a/common/python/rift/mano/dts/subscriber/core.py
+++ b/common/python/rift/mano/dts/subscriber/core.py
@@ -1,6 +1,6 @@
"""
-#
-# Copyright 2016 RIFT.IO Inc
+#
+# Copyright 2016-2017 RIFT.IO Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -27,6 +27,9 @@
from gi.repository import (RwDts as rwdts, ProtobufC)
import rift.tasklets
+from rift.mano.utils.project import (
+ get_add_delete_update_cfgs,
+ )
from ..core import DtsHandler
@@ -35,11 +38,11 @@
"""A common class for all subscribers.
"""
@classmethod
- def from_tasklet(cls, tasklet, callback=None):
+ def from_project(cls, proj, callback=None):
"""Convenience method to build the object from tasklet
Args:
- tasklet (rift.tasklets.Tasklet): Tasklet
+ proj (rift.mano.utils.project.ManoProject): Project
callback (None, optional): Callable, which will be invoked on
subscriber changes.
@@ -48,15 +51,15 @@
msg: The Gi Object msg from DTS
action(rwdts.QueryAction): Action type
"""
- return cls(tasklet.log, tasklet.dts, tasklet.loop, callback=callback)
+ return cls(proj.log, proj.dts, proj.loop, proj, callback=callback)
- def __init__(self, log, dts, loop, callback=None):
- super().__init__(log, dts, loop)
+ def __init__(self, log, dts, loop, project, callback=None):
+ super().__init__(log, dts, loop, project)
self.callback = callback
def get_reg_flags(self):
"""Default set of REG flags, can be over-ridden by sub classes.
-
+
Returns:
Set of rwdts.Flag types.
"""
@@ -70,7 +73,7 @@
Opdata subscriber can be created in one step by subclassing and implementing
the MANDATORY get_xpath() method
-
+
"""
@abc.abstractmethod
def get_xpath(self):
@@ -84,6 +87,12 @@
def register(self):
"""Triggers the registration
"""
+
+ if self.reg:
+ self._log.warning("RPC already registered for project {}".
+ format(self._project.name))
+ return
+
xacts = {}
def on_commit(xact_info):
@@ -121,7 +130,7 @@
)
self.reg = yield from self.dts.register(
- xpath=self.get_xpath(),
+ xpath=self.project.add_project(self.get_xpath()),
flags=self.get_reg_flags(),
handler=handler)
@@ -129,9 +138,6 @@
assert self.reg is not None
- def deregister(self):
- self.reg.deregister()
-
class AbstractConfigSubscriber(SubscriberDtsHandler):
"""Abstract class that simplifies the process of creating subscribers
@@ -139,7 +145,7 @@
Config subscriber can be created in one step by subclassing and implementing
the MANDATORY get_xpath() method
-
+
"""
KEY = "msgs"
@@ -151,41 +157,21 @@
def key_name(self):
pass
- def get_add_delete_update_cfgs(self, dts_member_reg, xact, key_name):
- # Unforunately, it is currently difficult to figure out what has exactly
- # changed in this xact without Pbdelta support (RIFT-4916)
- # As a workaround, we can fetch the pre and post xact elements and
- # perform a comparison to figure out adds/deletes/updates
- xact_cfgs = list(dts_member_reg.get_xact_elements(xact))
- curr_cfgs = list(dts_member_reg.elements)
-
- xact_key_map = {getattr(cfg, key_name): cfg for cfg in xact_cfgs}
- curr_key_map = {getattr(cfg, key_name): cfg for cfg in curr_cfgs}
-
- # Find Adds
- added_keys = set(xact_key_map) - set(curr_key_map)
- added_cfgs = [xact_key_map[key] for key in added_keys]
-
- # Find Deletes
- deleted_keys = set(curr_key_map) - set(xact_key_map)
- deleted_cfgs = [curr_key_map[key] for key in deleted_keys]
-
- # Find Updates
- updated_keys = set(curr_key_map) & set(xact_key_map)
- updated_cfgs = [xact_key_map[key] for key in updated_keys if xact_key_map[key] != curr_key_map[key]]
-
- return added_cfgs, deleted_cfgs, updated_cfgs
-
@asyncio.coroutine
def register(self):
""" Register for VNFD configuration"""
+ if self.reg:
+ self._log.warning("RPC already registered for project {}".
+ format(self._project.name))
+ return
+
def on_apply(dts, acg, xact, action, scratch):
"""Apply the configuration"""
is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL
- add_cfgs, delete_cfgs, update_cfgs = self.get_add_delete_update_cfgs(
+ add_cfgs, delete_cfgs, update_cfgs = get_add_delete_update_cfgs(
dts_member_reg=self.reg,
xact=xact,
key_name=self.key_name())
@@ -207,9 +193,6 @@
acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
with self.dts.appconf_group_create(handler=acg_hdl) as acg:
self.reg = acg.register(
- xpath=self.get_xpath(),
+ xpath=self.project.add_project(self.get_xpath()),
flags=self.get_reg_flags(),
on_prepare=on_prepare)
-
- def deregister(self):
- self.reg.deregister()
diff --git a/common/python/rift/mano/dts/subscriber/ns_subscriber.py b/common/python/rift/mano/dts/subscriber/ns_subscriber.py
index c16f771..a379b9a 100644
--- a/common/python/rift/mano/dts/subscriber/ns_subscriber.py
+++ b/common/python/rift/mano/dts/subscriber/ns_subscriber.py
@@ -39,7 +39,7 @@
return rwdts.Flag.SUBSCRIBER|rwdts.Flag.DELTA_READY
def get_xpath(self):
- return "D,/nsr:ns-instance-opdata/nsr:nsr"
+ return self._project.add_project("D,/nsr:ns-instance-opdata/nsr:nsr")
class NsdCatalogSubscriber(core.AbstractConfigSubscriber):
@@ -49,7 +49,7 @@
return "id"
def get_xpath(self):
- return "C,/nsd:nsd-catalog/nsd:nsd"
+ return self._project.add_project("C,/nsd:nsd-catalog/nsd:nsd")
class NsInstanceConfigSubscriber(core.AbstractConfigSubscriber):
@@ -59,4 +59,4 @@
return "id"
def get_xpath(self):
+ """Return the NS instance config xpath after passing it through the project's add_project()."""
- return "C,/nsr:ns-instance-config/nsr:nsr"
+ return self._project.add_project("C,/nsr:ns-instance-config/nsr:nsr")
diff --git a/common/python/rift/mano/dts/subscriber/ro_account.py b/common/python/rift/mano/dts/subscriber/ro_account.py
index 575d649..3091d15 100644
--- a/common/python/rift/mano/dts/subscriber/ro_account.py
+++ b/common/python/rift/mano/dts/subscriber/ro_account.py
@@ -33,4 +33,4 @@
return "name"
def get_xpath(self):
+ """Return the resource-orchestrator xpath after passing it through the project's add_project()."""
- return("C,/rw-launchpad:resource-orchestrator")
\ No newline at end of file
+ return self._project.add_project("C,/rw-launchpad:resource-orchestrator")
diff --git a/common/python/rift/mano/dts/subscriber/store.py b/common/python/rift/mano/dts/subscriber/store.py
index 88cb79a..222d444 100644
--- a/common/python/rift/mano/dts/subscriber/store.py
+++ b/common/python/rift/mano/dts/subscriber/store.py
@@ -33,10 +33,10 @@
"""
KEY = enum.Enum('KEY', 'NSR NSD VNFD VNFR')
- def __init__(self, log, dts, loop, callback=None):
- super().__init__(log, dts, loop)
+ def __init__(self, log, dts, loop, project, callback=None):
+ super().__init__(log, dts, loop, project)
- params = (self.log, self.dts, self.loop)
+ params = (self.log, self.dts, self.loop, self.project)
self._nsr_sub = ns_subscriber.NsrCatalogSubscriber(*params, callback=self.on_nsr_change)
self._nsrs = {}
@@ -92,6 +92,14 @@
yield from self._vnfr_sub.register()
yield from self._nsr_sub.register()
+ def deregister(self):
+     """Deregister the store's four catalog subscribers.
+
+     They are released in the order VNFD, NSD, VNFR, NSR.
+     """
+     message = "De-register store for project {}".format(self._project)
+     self._log.debug(message)
+
+     # Attributes are looked up one at a time so that a failure on a later
+     # subscriber still leaves the earlier ones deregistered.
+     for attr in ('_vnfd_sub', '_nsd_sub', '_vnfr_sub', '_nsr_sub'):
+         getattr(self, attr).deregister()
+
@asyncio.coroutine
def refresh_store(self, subsriber, store):
itr = yield from self.dts.query_read(subsriber.get_xpath())
diff --git a/common/python/rift/mano/dts/subscriber/test/utest_subscriber_dts.py b/common/python/rift/mano/dts/subscriber/test/utest_subscriber_dts.py
index a69a00f..bf7ae68 100644
--- a/common/python/rift/mano/dts/subscriber/test/utest_subscriber_dts.py
+++ b/common/python/rift/mano/dts/subscriber/test/utest_subscriber_dts.py
@@ -107,10 +107,10 @@
def test_vnfd_handler(self):
yield from self.store.register()
- mock_vnfd = RwVnfdYang.YangData_Vnfd_VnfdCatalog_Vnfd()
+ mock_vnfd = RwVnfdYang.YangData_RwProject_Project_VnfdCatalog_Vnfd()
mock_vnfd.id = str(uuid.uuid1())
- w_xpath = "C,/vnfd:vnfd-catalog/vnfd:vnfd"
+ w_xpath = "C,/rw-project:project/vnfd:vnfd-catalog/vnfd:vnfd"
xpath = "{}[vnfd:id='{}']".format(w_xpath, mock_vnfd.id)
yield from self.publisher.publish(w_xpath, xpath, mock_vnfd)
@@ -128,10 +128,10 @@
def test_vnfr_handler(self):
yield from self.store.register()
- mock_vnfr = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr()
+ mock_vnfr = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr()
mock_vnfr.id = str(uuid.uuid1())
- w_xpath = "D,/vnfr:vnfr-catalog/vnfr:vnfr"
+ w_xpath = "D,/rw-project:project/vnfr:vnfr-catalog/vnfr:vnfr"
xpath = "{}[vnfr:id='{}']".format(w_xpath, mock_vnfr.id)
yield from self.publisher.publish(w_xpath, xpath, mock_vnfr)
@@ -151,11 +151,11 @@
def test_nsr_handler(self):
yield from self.store.register()
- mock_nsr = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr()
+ mock_nsr = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr()
mock_nsr.ns_instance_config_ref = str(uuid.uuid1())
mock_nsr.name_ref = "Foo"
- w_xpath = "D,/nsr:ns-instance-opdata/nsr:nsr"
+ w_xpath = "D,/rw-project:project/nsr:ns-instance-opdata/nsr:nsr"
xpath = "{}[nsr:ns-instance-config-ref='{}']".format(w_xpath, mock_nsr.ns_instance_config_ref)
yield from self.publisher.publish(w_xpath, xpath, mock_nsr)
@@ -175,10 +175,10 @@
def test_nsd_handler(self):
yield from self.store.register()
- mock_nsd = RwNsdYang.YangData_Nsd_NsdCatalog_Nsd()
+ mock_nsd = RwNsdYang.YangData_RwProject_Project_NsdCatalog_Nsd()
mock_nsd.id = str(uuid.uuid1())
- w_xpath = "C,/nsd:nsd-catalog/nsd:nsd"
+ w_xpath = "C,/rw-project:project/nsd:nsd-catalog/nsd:nsd"
xpath = "{}[nsd:id='{}']".format(w_xpath, mock_nsd.id)
yield from self.publisher.publish(w_xpath, xpath, mock_nsd)
@@ -206,22 +206,22 @@
# publish
yield from vnf_handler.register()
- mock_vnfr = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr()
+ mock_vnfr = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr()
mock_vnfr.id = str(uuid.uuid1())
def mon_xpath(param_id=None):
""" Monitoring params xpath """
- return("D,/vnfr:vnfr-catalog" +
+ return("D,/rw-project:project/vnfr:vnfr-catalog" +
"/vnfr:vnfr[vnfr:id='{}']".format(mock_vnfr.id) +
"/vnfr:monitoring-param" +
("[vnfr:id='{}']".format(param_id) if param_id else ""))
- w_xpath = "D,/vnfr:vnfr-catalog/vnfr:vnfr"
+ w_xpath = "D,/rw-project:project/vnfr:vnfr-catalog/vnfr:vnfr"
xpath = "{}[vnfr:id='{}']".format(w_xpath, mock_vnfr.id)
yield from self.publisher.publish(w_xpath, xpath, mock_vnfr)
- mock_param = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam.from_dict({
+ mock_param = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_MonitoringParam.from_dict({
"id": "1"
})
mock_vnfr.monitoring_param.append(mock_param)
diff --git a/common/python/rift/mano/dts/subscriber/vnf_subscriber.py b/common/python/rift/mano/dts/subscriber/vnf_subscriber.py
index 76a58ab..85ec589 100644
--- a/common/python/rift/mano/dts/subscriber/vnf_subscriber.py
+++ b/common/python/rift/mano/dts/subscriber/vnf_subscriber.py
@@ -38,7 +38,7 @@
return rwdts.Flag.SUBSCRIBER|rwdts.Flag.DELTA_READY
def get_xpath(self):
+ """Return the VNFR catalog xpath after passing it through the project's add_project()."""
- return "D,/vnfr:vnfr-catalog/vnfr:vnfr"
+ return self.project.add_project("D,/vnfr:vnfr-catalog/vnfr:vnfr")
class VnfdCatalogSubscriber(core.AbstractConfigSubscriber):
@@ -48,4 +48,4 @@
return "id"
def get_xpath(self):
+ """Return the VNFD catalog xpath after passing it through the project's add_project()."""
- return "C,/vnfd:vnfd-catalog/vnfd:vnfd"
+ return self.project.add_project("C,/vnfd:vnfd-catalog/vnfd:vnfd")
diff --git a/common/python/rift/mano/tosca_translator/rwmano/syntax/mano_parameter.py b/common/python/rift/mano/tosca_translator/rwmano/syntax/mano_parameter.py
index aa6b83b..7df584f 100644
--- a/common/python/rift/mano/tosca_translator/rwmano/syntax/mano_parameter.py
+++ b/common/python/rift/mano/tosca_translator/rwmano/syntax/mano_parameter.py
@@ -42,7 +42,7 @@
# TODO(Philip): Harcoding for now, need to make this generic
def get_xpath(self):
+ """Return the NSD xpath for this parameter: the fixed catalog path, now under the project node, plus self.name."""
- xpath = '/nsd:nsd-catalog/nsd:nsd/nsd:' + self.name
+ xpath = '/rw-project:project/nsd:nsd-catalog/nsd:nsd/nsd:' + self.name
return xpath
def get_dict_output(self):
diff --git a/common/python/rift/mano/tosca_translator/rwmano/syntax/mano_template.py b/common/python/rift/mano/tosca_translator/rwmano/syntax/mano_template.py
index d263e6f..edcbef6 100644
--- a/common/python/rift/mano/tosca_translator/rwmano/syntax/mano_template.py
+++ b/common/python/rift/mano/tosca_translator/rwmano/syntax/mano_template.py
@@ -61,7 +61,7 @@
if use_gi:
try:
- nsd_cat = RwNsdYang.YangData_Nsd_NsdCatalog()
+ nsd_cat = RwNsdYang.YangData_RwProject_Project_NsdCatalog()
nsd = nsd_cat.nsd.add()
nsd.id = nsd_id
nsd.name = self.metadata['name']
@@ -111,7 +111,7 @@
if use_gi:
for param in self.parameters:
nsd.input_parameter_xpath.append(
- NsdYang.YangData_Nsd_NsdCatalog_Nsd_InputParameterXpath(
+ NsdYang.YangData_RwProject_Project_NsdCatalog_Nsd_InputParameterXpath(
xpath=param.get_xpath(),
)
)
diff --git a/common/python/rift/mano/tosca_translator/rwmano/tosca/tosca_nfv_vnf.py b/common/python/rift/mano/tosca_translator/rwmano/tosca/tosca_nfv_vnf.py
index 3e52967..6356601 100644
--- a/common/python/rift/mano/tosca_translator/rwmano/tosca/tosca_nfv_vnf.py
+++ b/common/python/rift/mano/tosca_translator/rwmano/tosca/tosca_nfv_vnf.py
@@ -222,7 +222,7 @@
format(self.name, self.properties))
def generate_yang_model_gi(self, nsd, vnfds):
- vnfd_cat = RwVnfdYang.YangData_Vnfd_VnfdCatalog()
+ vnfd_cat = RwVnfdYang.YangData_RwProject_Project_VnfdCatalog()
vnfd = vnfd_cat.vnfd.add()
props = convert_keys_to_python(self.properties)
try:
diff --git a/common/python/rift/mano/utils/project.py b/common/python/rift/mano/utils/project.py
index 9c303f9..637c0ca 100644
--- a/common/python/rift/mano/utils/project.py
+++ b/common/python/rift/mano/utils/project.py
@@ -16,6 +16,21 @@
# limitations under the License.
#
+import abc
+import asyncio
+import logging
+
+import gi
+gi.require_version('RwProjectYang', '1.0')
+gi.require_version('RwDtsYang', '1.0')
+from gi.repository import (
+ RwProjectYang,
+ RwDts as rwdts,
+ ProtobufC,
+)
+
+import rift.tasklets
+
class ManoProjectError(Exception):
pass
@@ -33,19 +48,37 @@
pass
+class ManoProjXpathNotRootErr(ManoProjectError):
+    """Raised when an xpath to be project-prefixed does not start at the root '/'."""
+
+
+class ManoProjXpathPresentErr(ManoProjectError):
+    """Raised when an xpath already names a project other than the requested one."""
+
+
+# Namespace prefix and node name that together form the project container
+NS = 'rw-project'
+PROJECT = 'project'
+NS_PROJECT = '{}:{}'.format(NS, PROJECT)
+# Root xpath of the project container, i.e. '/rw-project:project'
+XPATH = '/{}'.format(NS_PROJECT)
+XPATH_LEN = len(XPATH)
+
+# Key leaf of a project, bare and namespace-qualified
+NAME = 'name'
+NAME_LEN = len(NAME)
+NS_NAME = '{}:{}'.format(NS, NAME)
+
+# Project name used when none is supplied, and its keyed xpath prefix:
+# /rw-project:project[rw-project:name='default']
+DEFAULT_PROJECT = 'default'
+DEFAULT_PREFIX = "{}[{}='{}']".format(XPATH,
+ NS_NAME,
+ DEFAULT_PROJECT)
+
+
class ManoProject(object):
'''Class to handle the project name'''
- NS = 'rw-project'
- XPATH = '/{}:project'.format(NS)
- XPATH_LEN = len(XPATH)
-
- NAME = 'name'
- NAME_LEN = len(NAME)
- NS_NAME = '{}:{}'.format(NS, NAME)
+ log = None
@classmethod
- def create_from_xpath(cls, xpath, log):
+ def instance_from_xpath(cls, xpath, log):
name = cls.from_xpath(xpath, log)
if name is None:
return None
@@ -57,24 +90,23 @@
def from_xpath(cls, xpath, log):
log.debug("Get project name from {}".format(xpath));
- if cls.XPATH in xpath:
- idx = xpath.find(cls.XPATH) + cls.XPATH_LEN
+ if XPATH in xpath:
+ idx = xpath.find(XPATH)
if idx == -1:
msg = "Project not found in XPATH: {}".format(xpath)
log.error(msg)
raise ManoProjXpathNoProjErr(msg)
- sub = xpath[idx:]
- sub = sub.strip()
- if (len(sub) < cls.NAME_LEN) or (sub[0] != '['):
+ sub = xpath[idx+XPATH_LEN:].strip()
+ if (len(sub) < NAME_LEN) or (sub[0] != '['):
msg = "Project name not found in XPath: {}".format(xpath)
log.error(msg)
raise ManoProjXpathKeyErr(msg)
sub = sub[1:].strip()
- idx = sub.find(cls.NS_NAME)
+ idx = sub.find(NS_NAME)
if idx == -1:
- idx = sub.find(cls.NAME)
+ idx = sub.find(NAME)
if idx != 0:
msg = "Project name not found in XPath: {}".format(xpath)
log.error(msg)
@@ -86,10 +118,11 @@
log.error(msg)
raise ManoProjXpathKeyErr(msg)
- sub = sub[:idx-1].strip()
+ sub = sub[:idx].strip()
try:
+ log.debug("Key and value found: {}".format(sub))
k, n = sub.split("=", 2)
- name = n.strip()
+ name = n.strip(' \'"')
if name is None:
msg = "Project name is empty in XPath".format(xpath)
log.error(msg)
@@ -104,23 +137,497 @@
.format(xpath, e)
log.exception(msg)
raise ManoProjXpathKeyErr(msg)
+ else:
+ msg = "Project not found in XPATH: {}".format(xpath)
+ log.error(msg)
+ raise ManoProjXpathNoProjErr(msg)
- def __init__(self, log, name=None):
+ @classmethod
+ def get_log(cls):
+     """Return the class-wide fallback logger, creating it on first use.
+
+     The logger is named 'rw-mano-log.rw-project' and is set to ERROR level
+     when it is first created. Later calls return the same cached object.
+     """
+     # cls.log cannot serve as the cache: this class also defines a `log`
+     # property, which replaces the `log = None` class attribute, so
+     # `cls.log` is always a (truthy) property object and the logger was
+     # never created. A private class attribute is used instead.
+     logger = getattr(cls, '_class_log', None)
+     if logger is None:
+         logger = logging.getLogger('rw-mano-log.rw-project')
+         logger.setLevel(logging.ERROR)
+         cls._class_log = logger
+
+     # The original fell off the end and returned None to its callers.
+     return logger
+
+ @classmethod
+ def prefix_project(cls, xpath, project=None, log=None):
+     """Return xpath with the keyed project node inserted at its root.
+
+     Arguments:
+         xpath - Rooted xpath, optionally starting with a 'C,' or 'D,'
+                 marker, which is kept in front of the result
+         project - Project name; DEFAULT_PROJECT is used when None
+         log - Logger; the class fallback logger is used when None
+
+     Returns:
+         The xpath with /rw-project:project[rw-project:name='<project>']
+         inserted after the optional marker, or xpath unchanged when it
+         already starts with that same project.
+
+     Raises:
+         ManoProjXpathNotRootErr - xpath is empty or not rooted at '/'
+         ManoProjXpathPresentErr - xpath already names another project
+     """
+     if log is None:
+         log = cls.get_log()
+         if log is None:
+             # get_log() may hand back nothing; use the same named logger
+             # rather than crash on log.debug() below.
+             log = logging.getLogger('rw-mano-log.rw-project')
+
+     if project is None:
+         project = DEFAULT_PROJECT
+         proj_prefix = DEFAULT_PREFIX
+     else:
+         proj_prefix = "{}[{}='{}']".format(XPATH, NS_NAME, project)
+
+     log.debug("Add project {} to {}".format(project, xpath))
+
+     # Only a marker at the very start counts. Searching the whole string
+     # (as before) mis-split xpaths that merely contained 'C,/' or 'D,/'
+     # further in, e.g. inside a key value.
+     if xpath.startswith(('C,/', 'D,/')):
+         prefix, suffix = xpath[:2], xpath[2:]
+     else:
+         prefix, suffix = '', xpath
+
+     # startswith() also rejects the empty string, which used to raise
+     # IndexError on suffix[0].
+     if not suffix.startswith('/'):
+         msg = "Non-rooted xpath provided: {}".format(xpath)
+         log.error(msg)
+         raise ManoProjXpathNotRootErr(msg)
+
+     if suffix.startswith(XPATH):
+         name = cls.from_xpath(xpath, log)
+         if name == project:
+             log.warning("Project already in the XPATH: {}".format(xpath))
+             return xpath
+
+         msg = "Different project {} already in XPATH {}". \
+             format(name, xpath)
+         log.error(msg)
+         raise ManoProjXpathPresentErr(msg)
+
+     ret = prefix + proj_prefix + suffix
+     log.debug("XPath with project: {}".format(ret))
+     return ret
+
+
+ def __init__(self, log, name=None, tasklet=None):
+ """Create a project object, optionally naming it right away.
+
+ Arguments:
+ log - Logger used by this project
+ name - Project name; when truthy it is applied through the name setter
+ tasklet - Accepted but not stored here (self._tasklet starts as None)
+
+ NOTE(review): presumably callers are expected to call update(tasklet)
+ afterwards to record the tasklet - confirm against the subclasses.
+ """
self._log = log
- self._name = name
+ self._name = None
+ self._prefix = None
+ self._pbcm = None
+ self._tasklet = None
+ self._dts = None
+ self._loop = None
+ self._log_hdl = None
+
+ # Track if the apply config was received
+ self._apply = False
+
+ # Go through the setter so that _prefix and _pbcm are populated as well
+ if name:
+ self.name = name
+
+ def update(self, tasklet):
+     """Record the tasklet and cache its log_hdl, dts and loop attributes."""
+     self._tasklet = tasklet
+     # Copy the commonly used tasklet properties onto the project
+     for attr in ('log_hdl', 'dts', 'loop'):
+         setattr(self, '_' + attr, getattr(tasklet, attr))
@property
def name(self):
+ """Project name, or None until one has been set."""
return self._name
+ @property
+ def log(self):
+ """Logger given to the constructor.
+
+ NOTE(review): this property reuses the name of the class attribute
+ `log = None` declared at the top of ManoProject and replaces it in the
+ class namespace - confirm get_log(), which tests cls.log, still works.
+ """
+ return self._log
+
+ @property
+ def prefix(self):
+ """Keyed project xpath prefix built by the name setter; None until a name is set."""
+ return self._prefix
+
+ @property
+ def pbcm(self):
+ """RwProjectYang project message created by the name setter; None until a name is set."""
+ return self._pbcm
+
+ @property
+ def config(self):
+ """The project_config attribute of the project message.
+
+ Raises AttributeError while no name has been set, because the message
+ (self._pbcm) is still None at that point.
+ """
+ return self._pbcm.project_config
+
+ @property
+ def tasklet(self):
+ """Tasklet recorded by update(), or None before that."""
+ return self._tasklet
+
+ @property
+ def log_hdl(self):
+ """The tasklet's log_hdl as cached by update(), or None before that."""
+ return self._log_hdl
+
+ @property
+ def dts(self):
+ """The tasklet's dts as cached by update(), or None before that."""
+ return self._dts
+
+ @property
+ def loop(self):
+ """The tasklet's loop as cached by update(), or None before that."""
+ return self._loop
+
@name.setter
def name(self, value):
+ """Set the project name once.
+
+ The first assignment also builds the keyed xpath prefix and the project
+ message. Assigning the same name again is only logged; assigning a
+ different name raises ManoProjNameSetErr.
+ """
if self._name is None:
self._name = value
+ # e.g. /rw-project:project[rw-project:name='<value>']
+ self._prefix = "{}[{}='{}']".format(XPATH,
+ NS_NAME,
+ self._name)
+ self._pbcm = RwProjectYang.YangData_RwProject_Project(
+ name=self._name)
+
+ elif self._name == value:
+ self._log.debug("Setting the same name again for project {}".
+ format(value))
else:
msg = "Project name already set to {}".format(self._name)
self._log.error(msg)
raise ManoProjNameSetErr(msg)
def set_from_xpath(self, xpath):
+ """Set the project name to the one from_xpath() extracts from xpath."""
- self.name = ManoProject.get_from_xpath(xpath, self._log)
+ self.name = ManoProject.from_xpath(xpath, self._log)
+
+ def add_project(self, xpath):
+ """Return xpath prefixed with this project, via ManoProject.prefix_project()."""
+ return ManoProject.prefix_project(xpath, log=self._log, project=self._name)
+
+ @abc.abstractmethod
+ @asyncio.coroutine
+ def delete_prepare(self):
+ """Coroutine hook run before a project is deleted; this default logs and returns True.
+
+ NOTE(review): abc.abstractmethod only prevents instantiation when the
+ class uses ABCMeta; ManoProject derives from plain object - confirm the
+ decorator is meant as documentation only.
+ """
+ self._log.debug("Delete prepare for project {}".format(self._name))
+ return True
+
+ @abc.abstractmethod
+ @asyncio.coroutine
+ def register(self):
+     """Placeholder coroutine: log an error and raise NotImplementedError."""
+     error_text = "Register not implemented for project type {}".format(
+         type(self).__name__)
+     self._log.error(error_text)
+     raise NotImplementedError(error_text)
+
+ @abc.abstractmethod
+ def deregister(self):
+     """Placeholder: log an error and raise NotImplementedError."""
+     error_text = "De-register not implemented for project type {}".format(
+         type(self).__name__)
+     self._log.error(error_text)
+     raise NotImplementedError(error_text)
+
+ def rpc_check(self, msg, xact_info=None):
+     '''Tell whether an RPC message is addressed to this project.
+
+     The target project is msg.project_name, or DEFAULT_PROJECT when the
+     message has no such attribute. On a mismatch the transaction, if one
+     was supplied, is acknowledged and False is returned.
+     '''
+     try:
+         target = msg.project_name
+     except AttributeError:
+         target = DEFAULT_PROJECT
+
+     if target == self.name:
+         return True
+
+     self._log.debug("Project {}: RPC is for different project {}".
+                     format(self.name, target))
+     if xact_info:
+         xact_info.respond_xpath(rwdts.XactRspCode.ACK)
+     return False
+
+ @asyncio.coroutine
+ def create_project(self, dts):
+ """Coroutine: issue a DTS create for this project's project-config.
+
+ Arguments:
+ dts - DTS handle whose query_create() is used
+
+ The xpath is 'C,<prefix>/project-config' and the payload is self.config,
+ so the project name must have been set beforehand.
+ """
+ proj_xpath = "C,{}/project-config".format(self.prefix)
+ self._log.info("Creating project: {} with {}".
+ format(proj_xpath, self.config.as_dict()))
+
+ yield from dts.query_create(proj_xpath,
+ rwdts.XactFlag.ADVISE,
+ self.config)
+
+
+def get_add_delete_update_cfgs(dts_member_reg, xact, key_name):
+    """Split a transaction's configs into added, deleted and updated lists.
+
+    Without Pbdelta support (RIFT-4916) it is hard to tell what exactly
+    changed in a xact. As a workaround the elements seen through the xact
+    are compared with the currently held elements, matched on key_name.
+
+    Arguments:
+        dts_member_reg - Registration providing get_xact_elements(xact)
+                         and elements
+        xact - Transaction whose view is compared with the current one
+        key_name - Attribute name used as the key of each config
+
+    Returns:
+        Tuple (added_cfgs, deleted_cfgs, updated_cfgs); three empty lists
+        when dts_member_reg is falsy.
+    """
+    # TODO: Check why this is getting called during project delete
+    if not dts_member_reg:
+        return [], [], []
+
+    pending_cfgs = list(dts_member_reg.get_xact_elements(xact))
+    current_cfgs = list(dts_member_reg.elements)
+
+    pending = {getattr(cfg, key_name): cfg for cfg in pending_cfgs}
+    current = {getattr(cfg, key_name): cfg for cfg in current_cfgs}
+
+    # Present only in the xact view -> added
+    added_cfgs = [pending[key] for key in set(pending) - set(current)]
+
+    # Present only in the current view -> deleted
+    deleted_cfgs = [current[key] for key in set(current) - set(pending)]
+
+    # Present in both but no longer equal -> updated (xact version returned)
+    updated_cfgs = []
+    for key in set(current) & set(pending):
+        if pending[key] != current[key]:
+            updated_cfgs.append(pending[key])
+
+    return added_cfgs, deleted_cfgs, updated_cfgs
+
+
+class ProjectConfigCallbacks(object):
+    """Holder for the four project add/delete callbacks.
+
+    The apply callbacks must be plain callables and the prepare callbacks
+    must be coroutine functions. Any callback left as None is replaced by
+    a no-op of the matching kind.
+    """
+
+    def __init__(self,
+                 on_add_apply=None, on_add_prepare=None,
+                 on_delete_apply=None, on_delete_prepare=None):
+        """Store the callbacks, filling gaps with no-ops.
+
+        Raises:
+            ValueError - an apply callback is a coroutine function, or a
+                         prepare callback is not one
+        """
+
+        @asyncio.coroutine
+        def prepare_noop(*args, **kwargs):
+            pass
+
+        def apply_noop(*args, **kwargs):
+            pass
+
+        self.on_add_apply = on_add_apply
+        self.on_add_prepare = on_add_prepare
+        self.on_delete_apply = on_delete_apply
+        self.on_delete_prepare = on_delete_prepare
+
+        def settle(attr, fallback, must_be_coroutine):
+            # Swap in the fallback for a missing callback, else check its kind
+            callback = getattr(self, attr)
+            if callback is None:
+                setattr(self, attr, fallback)
+                return
+
+            is_coroutine = asyncio.iscoroutinefunction(callback)
+            if must_be_coroutine and not is_coroutine:
+                raise ValueError("%s must be a coroutine" % attr)
+            if is_coroutine and not must_be_coroutine:
+                raise ValueError('%s cannot be a coroutine' % (attr,))
+
+        # Apply callbacks are validated before the prepare callbacks
+        for attr in ('on_add_apply', 'on_delete_apply'):
+            settle(attr, apply_noop, False)
+
+        for attr in ('on_add_prepare', 'on_delete_prepare'):
+            settle(attr, prepare_noop, True)
+
+
+class ProjectDtsHandler(object):
+    """Subscribe to project configuration in DTS and relay project changes.
+
+    Project adds and deletes are forwarded to the supplied callbacks; the
+    names of the projects currently known are kept in self.projects.
+    """
+    XPATH = "C,{}/project-config".format(XPATH)
+
+    def __init__(self, dts, log, callbacks):
+        """Store the handles; nothing is registered until register() runs.
+
+        Arguments:
+            dts - DTS handle used to create the appconf group
+            log - Logger
+            callbacks - Object providing on_add_apply, on_add_prepare,
+                        on_delete_apply and on_delete_prepare
+        """
+        self._dts = dts
+        self._log = log
+        self._callbacks = callbacks
+
+        # register() and the apply callback work on _reg, which used to be
+        # left undefined here; `reg` is kept as a public alias and both are
+        # set together.
+        self._reg = None
+        self.reg = None
+        self.projects = []
+
+    @property
+    def log(self):
+        """Logger given to the constructor."""
+        return self._log
+
+    @property
+    def dts(self):
+        """DTS handle given to the constructor."""
+        return self._dts
+
+    def add_project(self, name):
+        """Run the add-apply callback for name and remember the project.
+
+        A name that is already known is only logged as an error.
+        """
+        self.log.info("Adding project: {}".format(name))
+
+        if name not in self.projects:
+            self._callbacks.on_add_apply(name)
+            self.projects.append(name)
+        else:
+            self.log.error("Project already present: {}".
+                           format(name))
+
+    def delete_project(self, name):
+        """Run the delete-apply callback for name and forget the project.
+
+        An unknown name is only logged as an error.
+        """
+        self._log.info("Deleting project: {}".format(name))
+        if name in self.projects:
+            self._callbacks.on_delete_apply(name)
+            self.projects.remove(name)
+        else:
+            self.log.error("Unrecognized project: {}".
+                           format(name))
+
+    def update_project(self, name):
+        """ Update an existing project
+
+        Currently, we do not take any action on MANO for this,
+        so no callbacks are defined
+
+        Arguments:
+            name - The name of the project being updated
+        """
+        self._log.info("Updating project: {}".format(name))
+        if name in self.projects:
+            pass
+        else:
+            self.log.error("Unrecognized project: {}".
+                           format(name))
+
+    def register(self):
+        """Register with DTS as a subscriber for the project-config xpath."""
+
+        @asyncio.coroutine
+        def apply_config(dts, acg, xact, action, scratch):
+            """Apply callback: replay the stored config or diff the xact."""
+            self._log.debug("Got project apply config (xact: %s) (action: %s)", xact, action)
+
+            if xact.xact is None:
+                if action == rwdts.AppconfAction.INSTALL:
+                    for cfg in self._reg.elements:
+                        self._log.debug("Project being re-added after restart.")
+                        # Projects are tracked by name everywhere else, so
+                        # pass the name rather than the config object.
+                        self.add_project(cfg.name_ref)
+                else:
+                    # When RIFT first comes up, an INSTALL is called with the current config
+                    # Since confd doesn't actually persist data this never has any data so
+                    # skip this for now.
+                    self._log.debug("No xact handle. Skipping apply config")
+
+                return
+
+            add_cfgs, delete_cfgs, update_cfgs = get_add_delete_update_cfgs(
+                dts_member_reg=self._reg,
+                xact=xact,
+                key_name="name_ref",
+            )
+
+            # Handle Deletes
+            for cfg in delete_cfgs:
+                self.delete_project(cfg.name_ref)
+
+            # Handle Adds
+            for cfg in add_cfgs:
+                self.add_project(cfg.name_ref)
+
+            # Handle Updates
+            for cfg in update_cfgs:
+                self.update_project(cfg.name_ref)
+
+        @asyncio.coroutine
+        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
+            """ Prepare callback from DTS for Project """
+
+            action = xact_info.query_action
+            name = msg.name_ref
+            self._log.debug("Project %s on_prepare config received (action: %s): %s",
+                            name, xact_info.query_action, msg)
+
+            if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
+                if name in self.projects:
+                    self._log.debug("Project {} already exists. Ignore request".
+                                    format(name))
+
+                else:
+                    self._log.debug("Project {}: Invoking on_prepare add request".
+                                    format(name))
+                    yield from self._callbacks.on_add_prepare(name)
+
+            elif action == rwdts.QueryAction.DELETE:
+                # Check if the entire project got deleted
+                fref = ProtobufC.FieldReference.alloc()
+                fref.goto_whole_message(msg.to_pbcm())
+                if fref.is_field_deleted():
+                    if name in self.projects:
+                        rc = yield from self._callbacks.on_delete_prepare(name)
+                        if not rc:
+                            self._log.error("Project {} should not be deleted".
+                                            format(name))
+                            xact_info.respond_xpath(rwdts.XactRspCode.NACK)
+                            # Stop here: falling through used to send an
+                            # ACK right after this NACK.
+                            return
+                    else:
+                        self._log.warning("Delete on unknown project: {}".
+                                          format(name))
+
+            else:
+                self._log.error("Action (%s) NOT SUPPORTED", action)
+                xact_info.respond_xpath(rwdts.XactRspCode.NACK)
+                return
+
+            xact_info.respond_xpath(rwdts.XactRspCode.ACK)
+
+        self._log.debug("Registering for project config using xpath: %s",
+                        ProjectDtsHandler.XPATH,
+                        )
+
+        acg_handler = rift.tasklets.AppConfGroup.Handler(
+            on_apply=apply_config,
+        )
+
+        with self._dts.appconf_group_create(acg_handler) as acg:
+            self._reg = acg.register(
+                xpath=ProjectDtsHandler.XPATH,
+                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
+                on_prepare=on_prepare,
+            )
+            self.reg = self._reg
+
+class ProjectHandler(object):
+    """Create, track and tear down a tasklet's per-project objects.
+
+    Project objects are instances of project_class and are stored by name
+    in tasklet.projects. The work is driven by a ProjectDtsHandler that
+    calls back into this object.
+    """
+
+    def __init__(self, tasklet, project_class, **kw):
+        """Wire up the project config handler for a tasklet.
+
+        Arguments:
+            tasklet - Tasklet providing log, log_hdl, dts, loop and the
+                      projects dictionary
+            project_class - Class instantiated as
+                            project_class(name, tasklet, **kw)
+            kw - Extra keyword arguments passed to project_class
+        """
+        self._tasklet = tasklet
+        self._log = tasklet.log
+        self._log_hdl = tasklet.log_hdl
+        self._dts = tasklet.dts
+        self._loop = tasklet.loop
+        self._class = project_class
+        self._kw = kw
+
+        self._log.debug("Creating project config handler")
+        self.project_cfg_handler = ProjectDtsHandler(
+            self._dts, self._log,
+            ProjectConfigCallbacks(
+                on_add_apply=self.on_project_added,
+                on_add_prepare=self.on_add_prepare,
+                on_delete_apply=self.on_project_deleted,
+                on_delete_prepare=self.on_delete_prepare,
+            )
+        )
+
+    def _get_tasklet_name(self):
+        """Return the tasklet's instance name, used in log messages."""
+        return self._tasklet.tasklet_info.instance_name
+
+    def _get_project(self, name):
+        """Return the project object stored under name.
+
+        A failed lookup is logged and the exception is re-raised.
+        """
+        try:
+            proj = self._tasklet.projects[name]
+        except Exception as e:
+            self._log.exception("Project {} ({})not found for tasklet {}: {}".
+                                format(name, list(self._tasklet.projects.keys()),
+                                       self._get_tasklet_name(), e))
+            raise
+
+        return proj
+
+    def on_project_deleted(self, name):
+        """Apply callback for a delete: deregister and drop the project.
+
+        Both steps are best effort; failures are logged, not raised.
+        """
+        self._log.debug("Project {} deleted".format(name))
+        try:
+            self._get_project(name).deregister()
+        except Exception as e:
+            self._log.exception("Project {} deregister for {} failed: {}".
+                                format(name, self._get_tasklet_name(), e))
+
+        try:
+            self._tasklet.projects.pop(name)
+        except Exception as e:
+            self._log.exception("Project {} delete for {} failed: {}".
+                                format(name, self._get_tasklet_name(), e))
+
+    def on_project_added(self, name):
+        """Apply callback for an add: mark the project as applied."""
+        self._log.debug("Project {} added to tasklet {}".
+                        format(name, self._get_tasklet_name()))
+        self._get_project(name)._apply = True
+
+    @asyncio.coroutine
+    def on_add_prepare(self, name):
+        """Prepare callback for an add: create the project and register it.
+
+        Failures are logged, not raised.
+        """
+        self._log.debug("Project {} to be added to {}".
+                        format(name, self._get_tasklet_name()))
+
+        try:
+            self._tasklet.projects[name] = \
+                self._class(name, self._tasklet, **(self._kw))
+        except Exception as e:
+            # This used to read `formatname, ..., e())`, which raised
+            # NameError from inside the handler and hid the real failure.
+            self._log.exception("Project {} create for {} failed: {}".
+                                format(name, self._get_tasklet_name(), e))
+            # There is no project object to register
+            return
+
+        try:
+            yield from self._get_project(name).register()
+        except Exception as e:
+            self._log.exception("Project {} register for tasklet {} failed: {}".
+                                format(name, self._get_tasklet_name(), e))
+
+    @asyncio.coroutine
+    def on_delete_prepare(self, name):
+        """Prepare callback for a delete: return the project's delete_prepare() result."""
+        self._log.debug("Project {} being deleted for tasklet {}".
+                        format(name, self._get_tasklet_name()))
+        rc = yield from self._get_project(name).delete_prepare()
+        return rc
+
+    def register(self):
+        """Register the underlying project config handler with DTS."""
+        self.project_cfg_handler.register()
diff --git a/common/python/rift/mano/yang_translator/rwmano/yang/yang_nsd.py b/common/python/rift/mano/yang_translator/rwmano/yang/yang_nsd.py
index 491bd86..41845dc 100644
--- a/common/python/rift/mano/yang_translator/rwmano/yang/yang_nsd.py
+++ b/common/python/rift/mano/yang_translator/rwmano/yang/yang_nsd.py
@@ -73,7 +73,7 @@
self.inputs.append({
self.NAME:
self.map_yang_name_to_tosca(
- val.replace('/nsd:nsd-catalog/nsd:nsd/nsd:', ''))})
+ val.replace('/rw-project:project/nsd:nsd-catalog/nsd:nsd/nsd:', ''))})
if len(param):
self.log.warn(_("{0}, Did not process the following for "
"input param {1}: {2}").