3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
26 #Setting RIFT_VAR_ROOT if not already set for unit test execution
27 if "RIFT_VAR_ROOT" not in os
.environ
:
28 os
.environ
['RIFT_VAR_ROOT'] = os
.path
.join(os
.environ
['RIFT_INSTALL'], 'var/rift/unittest')
31 import rift
.tasklets
.rwnsmtasklet
.cloud
as cloud
32 import rift
.tasklets
.rwnsmtasklet
.rwnsmplugin
as rwnsmplugin
33 import rift
.tasklets
.rwnsmtasklet
.openmano_nsm
as openmano_nsm
34 from rift
.mano
.utils
.project
import ManoProject
38 gi
.require_version('RwDts', '1.0')
39 from gi
.repository
import (
40 RwRoAccountYang
as roaccountyang
,
42 RwProjectVnfdYang
as RwVnfdYang
,
45 RwProjectNsdYang
as RwNsdYang
,
50 class DescriptorPublisher(object):
51 def __init__(self
, log
, dts
, loop
):
55 self
._registrations
= []
58 def update(self
, xpath
, desc
):
59 self
._registrations
[-1].update_element(xpath
, desc
)
62 def delete(self
, xpath
):
63 self
._registrations
[-1].delete_element(xpath
)
66 def publish(self
, w_path
, path
, desc
):
67 ready_event
= asyncio
.Event(loop
=self
.loop
)
70 def on_ready(regh
, status
):
71 self
.log
.debug("Create element: %s, obj-type:%s obj:%s",
72 path
, type(desc
), desc
)
73 with self
.dts
.transaction() as xact
:
74 regh
.create_element(path
, desc
, xact
.xact
)
75 self
.log
.debug("Created element: %s, obj:%s", path
, desc
)
78 handler
= rift
.tasklets
.DTS
.RegistrationHandler(
82 self
.log
.debug("Registering path: %s, obj:%s", w_path
, desc
)
84 reg
= yield from self
.dts
.register(
87 flags
=rwdts
.Flag
.PUBLISHER | rwdts
.Flag
.NO_PREP_READ
90 self
._registrations
.append(reg
)
91 self
.log
.debug("Registered path : %s", w_path
)
92 yield from ready_event
.wait()
96 def unpublish_all(self
):
97 self
.log
.debug("Deregistering all published descriptors")
98 for reg
in self
._registrations
:
101 class RoAccountDtsTestCase(rift
.test
.dts
.AbstractDTSTest
):
103 def configure_schema(cls
):
104 return roaccountyang
.get_schema()
107 def configure_timeout(cls
):
110 def configure_test(self
, loop
, test_id
):
111 self
.log
.debug("STARTING - %s", test_id
)
112 self
.tinfo
= self
.new_tinfo(str(test_id
))
113 self
.dts
= rift
.tasklets
.DTS(self
.tinfo
, self
.schema
, self
.loop
)
114 self
.project
= ManoProject(self
.log
)
116 self
.tinfo_sub
= self
.new_tinfo(str(test_id
) + "_sub")
117 self
.dts_sub
= rift
.tasklets
.DTS(self
.tinfo_sub
, self
.schema
, self
.loop
)
119 self
.publisher
= DescriptorPublisher(self
.log
, self
.dts
, self
.loop
)
124 @rift.test
.dts
.async_test
125 def test_orch_account_create(self
):
126 ro_cfg_sub
= cloud
.ROAccountConfigSubscriber(self
.dts
, self
.log
, self
.loop
, self
.project
, None)
127 yield from ro_cfg_sub
.register()
129 ro_plugin
= ro_cfg_sub
.get_ro_plugin(account_name
=None)
130 # Test if we have a default plugin in case no RO is specified.
131 assert type(ro_plugin
) is rwnsmplugin
.RwNsPlugin
133 # Test rift-ro plugin CREATE
134 w_xpath
= self
.project
.add_project("C,/rw-ro-account:ro-account/rw-ro-account:account")
135 xpath
= w_xpath
+ "[rw-ro-account:name='openmano']"
137 # Test Openmano plugin CREATE
138 mock_orch_acc
= roaccountyang
.YangData_RwProject_Project_RoAccount_Account
.from_dict(
140 'ro_account_type': 'openmano',
141 'openmano': {'tenant_id': "abc",
143 "host": "10.64.11.77"}})
145 yield from self
.publisher
.publish(w_xpath
, xpath
, mock_orch_acc
)
146 yield from asyncio
.sleep(5, loop
=self
.loop
)
148 ro_plugin
= ro_cfg_sub
.get_ro_plugin(account_name
='openmano')
149 assert type(ro_plugin
) is openmano_nsm
.OpenmanoNsPlugin
152 mock_orch_acc
.openmano
.port
= 9789
153 mock_orch_acc
.openmano
.host
= "10.64.11.78"
154 yield from self
.publisher
.update(xpath
, mock_orch_acc
)
155 yield from asyncio
.sleep(5, loop
=self
.loop
)
157 #Since update means delete followed by a insert get the new ro_plugin.
158 ro_plugin
= ro_cfg_sub
.get_ro_plugin(account_name
='openmano')
159 assert ro_plugin
._cli
_api
._port
== mock_orch_acc
.openmano
.port
160 assert ro_plugin
._cli
_api
._host
== mock_orch_acc
.openmano
.host
162 # Test delete to be implemented. right now facing some dts issues.
163 # Use DescriptorPublisher delete for deletion
165 def main(argv
=sys
.argv
[1:]):
167 # The unittest framework requires a program name, so use the name of this
168 # file instead (we do not want to have to pass a fake program name to main
169 # when this is called from the interpreter).
171 argv
=[__file__
] + argv
,
172 testRunner
=xmlrunner
.XMLTestRunner(output
=os
.environ
["RIFT_MODULE_TEST"])
175 if __name__
== '__main__':