update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwstagingmgr / test / utest_rpc_dts.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2016 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19 import asyncio
20 import argparse
21 import os
22 import shutil
23 import sys
24 import unittest
25 import uuid
26 import xmlrunner
27 import mock
28 import uuid
29
30 import gi
31 gi.require_version('RwDts', '1.0')
32 gi.require_version('RwStagingMgmtYang', '1.0')
33 from gi.repository import (
34 RwDts as rwdts,
35 RwStagingMgmtYang,
36 )
37
38
39 import rift.tasklets.rwstagingmgr.rpc as rpc
40 import rift.test.dts
41
42
43 class TestCase(rift.test.dts.AbstractDTSTest):
44 @classmethod
45 def configure_schema(cls):
46 return RwStagingMgmtYang.get_schema()
47
48 @classmethod
49 def configure_timeout(cls):
50 return 240
51
52 def configure_test(self, loop, test_id):
53 self.log.debug("STARTING - %s", test_id)
54 self.tinfo = self.new_tinfo(str(test_id))
55 self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)
56
57 def tearDown(self):
58 super().tearDown()
59
60 @rift.test.dts.async_test
61 def test_staging_area_create(self):
62 """
63 Verifies the following:
64 The endpoint RPC returns a URL
65 """
66 uid = str(uuid.uuid4())
67 mock_model = mock.MagicMock()
68 mock_model.model.area_id = uid
69 mock_store = mock.MagicMock()
70 mock_store.create_staging_area.return_value = mock_model
71
72 endpoint = rpc.StagingAreaCreateRpcHandler(self.log, self.dts, self.loop, mock_store)
73 yield from endpoint.register()
74
75 yield from asyncio.sleep(2, loop=self.loop)
76
77 ip = RwStagingMgmtYang.YangInput_RwStagingMgmt_CreateStagingArea.from_dict({
78 "package_type": "VNFD"})
79
80 rpc_out = yield from self.dts.query_rpc(
81 "I,/rw-staging-mgmt:create-staging-area",
82 rwdts.XactFlag.TRACE,
83 ip)
84
85 for itr in rpc_out:
86 result = yield from itr
87 print (result)
88 assert uid in result.result.endpoint
89
90 def main():
91 runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
92
93 parser = argparse.ArgumentParser()
94 parser.add_argument('-v', '--verbose', action='store_true')
95 parser.add_argument('-n', '--no-runner', action='store_true')
96 args, unittest_args = parser.parse_known_args()
97 if args.no_runner:
98 runner = None
99
100
101 unittest.main(testRunner=runner, argv=[sys.argv[0]] + unittest_args)
102
103 if __name__ == '__main__':
104 main()