3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
25 import rift
.tasklets
.rwnsmtasklet
.cloud
as cloud
26 import rift
.tasklets
.rwnsmtasklet
.openmano_nsm
as openmano_nsm
30 gi
.require_version('RwDtsYang', '1.0')
31 from gi
.repository
import (
32 RwLaunchpadYang
as launchpadyang
,
42 class DescriptorPublisher(object):
43 def __init__(self
, log
, dts
, loop
):
48 self
._registrations
= []
51 def publish(self
, w_path
, path
, desc
):
52 ready_event
= asyncio
.Event(loop
=self
.loop
)
55 def on_ready(regh
, status
):
56 self
.log
.debug("Create element: %s, obj-type:%s obj:%s",
57 path
, type(desc
), desc
)
58 with self
.dts
.transaction() as xact
:
59 regh
.create_element(path
, desc
, xact
.xact
)
60 self
.log
.debug("Created element: %s, obj:%s", path
, desc
)
63 handler
= rift
.tasklets
.DTS
.RegistrationHandler(
67 self
.log
.debug("Registering path: %s, obj:%s", w_path
, desc
)
68 reg
= yield from self
.dts
.register(
71 flags
=rwdts
.Flag
.PUBLISHER | rwdts
.Flag
.NO_PREP_READ
73 self
._registrations
.append(reg
)
74 self
.log
.debug("Registered path : %s", w_path
)
75 yield from ready_event
.wait()
79 def unpublish_all(self
):
80 self
.log
.debug("Deregistering all published descriptors")
81 for reg
in self
._registrations
:
84 class RoAccountDtsTestCase(rift
.test
.dts
.AbstractDTSTest
):
86 def configure_schema(cls
):
87 return launchpadyang
.get_schema()
90 def configure_timeout(cls
):
93 def configure_test(self
, loop
, test_id
):
94 self
.log
.debug("STARTING - %s", test_id
)
95 self
.tinfo
= self
.new_tinfo(str(test_id
))
96 self
.dts
= rift
.tasklets
.DTS(self
.tinfo
, self
.schema
, self
.loop
)
98 self
.tinfo_sub
= self
.new_tinfo(str(test_id
) + "_sub")
99 self
.dts_sub
= rift
.tasklets
.DTS(self
.tinfo_sub
, self
.schema
, self
.loop
)
101 self
.publisher
= DescriptorPublisher(self
.log
, self
.dts
, self
.loop
)
106 @rift.test
.dts
.async_test
107 def test_orch_account_create(self
):
108 orch
= cloud
.ROAccountPluginSelector(self
.dts
, self
.log
, self
.loop
, None)
110 yield from orch
.register()
112 # Test if we have a default plugin in case no RO is specified.
113 assert type(orch
.ro_plugin
) is cloud
.RwNsPlugin
114 mock_orch_acc
= launchpadyang
.ResourceOrchestrator
.from_dict(
115 {'name': 'rift-ro', 'account_type': 'rift_ro', 'rift_ro': {'rift_ro': True}})
117 # Test rift-ro plugin CREATE
118 w_xpath
= "C,/rw-launchpad:resource-orchestrator"
120 yield from self
.publisher
.publish(w_xpath
, xpath
, mock_orch_acc
)
121 yield from asyncio
.sleep(5, loop
=self
.loop
)
123 assert type(orch
.ro_plugin
) is cloud
.RwNsPlugin
125 # Test Openmano plugin CREATE
126 mock_orch_acc
= launchpadyang
.ResourceOrchestrator
.from_dict(
128 'account_type': 'openmano',
129 'openmano': {'tenant_id': "abc",
131 "host": "10.64.11.77"}})
132 yield from self
.publisher
.publish(w_xpath
, xpath
, mock_orch_acc
)
133 yield from asyncio
.sleep(5, loop
=self
.loop
)
135 assert type(orch
.ro_plugin
) is openmano_nsm
.OpenmanoNsPlugin
136 assert orch
.ro_plugin
._cli
_api
._port
== mock_orch_acc
.openmano
.port
137 assert orch
.ro_plugin
._cli
_api
._host
== mock_orch_acc
.openmano
.host
140 mock_orch_acc
.openmano
.port
= 9789
141 mock_orch_acc
.openmano
.host
= "10.64.11.78"
142 yield from self
.dts
.query_update("C,/rw-launchpad:resource-orchestrator",
143 rwdts
.XactFlag
.ADVISE
, mock_orch_acc
)
144 assert orch
.ro_plugin
._cli
_api
._port
== mock_orch_acc
.openmano
.port
145 assert orch
.ro_plugin
._cli
_api
._host
== mock_orch_acc
.openmano
.host
147 # Test update when a live instance exists
148 # Exception should be thrown
149 orch
.handle_nsr(None, rwdts
.QueryAction
.CREATE
)
150 mock_orch_acc
.openmano
.port
= 9788
152 with self
.assertRaises(Exception):
153 yield from self
.dts
.query_update("C,/rw-launchpad:resource-orchestrator",
154 rwdts
.XactFlag
.ADVISE
, mock_orch_acc
)
157 yield from self
.dts
.query_delete("C,/rw-launchpad:resource-orchestrator",
158 flags
=rwdts
.XactFlag
.ADVISE
)
159 assert orch
.ro_plugin
== None
162 def main(argv
=sys
.argv
[1:]):
164 # The unittest framework requires a program name, so use the name of this
165 # file instead (we do not want to have to pass a fake program name to main
166 # when this is called from the interpreter).
168 argv
=[__file__
] + argv
,
169 testRunner
=None#xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
172 if __name__
== '__main__':