update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / test / utest_scaling_rpc.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2016 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19 import argparse
20 import asyncio
21 import gi
22 import logging
23 import os
24 import sys
25 import time
26 import types
27 import unittest
28 import uuid
29 import xmlrunner
30
31 gi.require_version('RwCloudYang', '1.0')
32 gi.require_version('RwDts', '1.0')
33 gi.require_version('RwNsmYang', '1.0')
34 gi.require_version('RwLaunchpadYang', '1.0')
35 gi.require_version('RwResourceMgrYang', '1.0')
36 gi.require_version('RwcalYang', '1.0')
37 gi.require_version('RwNsrYang', '1.0')
38 gi.require_version('NsrYang', '1.0')
39 gi.require_version('RwlogMgmtYang', '1.0')
40
41 from gi.repository import (
42 RwCloudYang as rwcloudyang,
43 RwDts as rwdts,
44 RwLaunchpadYang as launchpadyang,
45 RwNsmYang as rwnsmyang,
46 RwNsrYang as rwnsryang,
47 NsrYang as nsryang,
48 RwResourceMgrYang as rmgryang,
49 RwcalYang as rwcalyang,
50 RwConfigAgentYang as rwcfg_agent,
51 RwlogMgmtYang
52 )
53 gi.require_version('RwKeyspec', '1.0')
54 from gi.repository.RwKeyspec import quoted_key
55
56 from gi.repository.RwTypes import RwStatus
57 import rift.mano.examples.ping_pong_nsd as ping_pong_nsd
58 import rift.tasklets
59 import rift.test.dts
60 import rw_peas
61
62
63
64
class ManoTestCase(rift.test.dts.AbstractDTSTest):
    """
    DTS GI interface unit tests.

    Note: each test uses a list of asyncio.Events for staging through the
    test.  These are required because every coroutine ("tasklet") is brought
    up at the same time and no retry mechanism is implemented.  For instance,
    this is used in numerous tests to make sure that a publisher is up and
    ready before the subscriber sends queries.  Such event lists should not
    be used in production software.

    NOTE(review): no asyncio.Event list is visible in this class (staging is
    done with a fixed sleep in wait_tasklets) -- the note above may have been
    carried over from a sibling test; confirm.
    """

    @classmethod
    def configure_suite(cls, rwmain):
        """Register the 'rwnsmtasklet' tasklet from the $NSM_DIR directory."""
        # os.environ.get() yields None when NSM_DIR is unset; main() exports
        # a default before the suite is started.
        rwmain.add_tasklet(os.environ.get('NSM_DIR'), 'rwnsmtasklet')

    @classmethod
    def configure_schema(cls):
        """Return the schema obtained from the RwNsmYang module."""
        return rwnsmyang.get_schema()

    @classmethod
    def configure_timeout(cls):
        """Return the timeout value (240) configured for this test case."""
        return 240

    @staticmethod
    def get_cal_account(account_type, account_name):
        """
        Create a cloud-account object
        (YangData_RwProject_Project_CloudAccounts_CloudAccountList).

        Arguments:
            account_type - 'mock', 'openstack_static' or 'openstack_dynamic';
                           for any other value the object is returned
                           without any field being set
            account_name - value stored in the account's name field

        Returns:
            The cloud-account object.
        """
        cal_account = rwcloudyang.YangData_RwProject_Project_CloudAccounts_CloudAccountList()

        if account_type == 'mock':
            cal_account.name = account_name
            cal_account.account_type = "mock"
            cal_account.mock.username = "mock_user"
        elif account_type in ('openstack_static', 'openstack_dynamic'):
            # NOTE(review): openstack_info is not defined anywhere in the
            # visible part of this file -- presumably a module-level dict of
            # OpenStack credentials; confirm before using these account types.
            cal_account.name = account_name
            cal_account.account_type = 'openstack'
            cal_account.openstack.key = openstack_info['username']
            cal_account.openstack.secret = openstack_info['password']
            cal_account.openstack.auth_url = openstack_info['auth_url']
            cal_account.openstack.tenant = openstack_info['project_name']
            cal_account.openstack.mgmt_network = openstack_info['mgmt_network']

        return cal_account

    @asyncio.coroutine
    def configure_cloud_account(self, dts, cloud_type, cloud_name="cloud1"):
        """
        Build a cloud account and issue a DTS create query for it.

        Arguments:
            dts        - DTS handle used to issue query_create()
            cloud_type - account type forwarded to get_cal_account()
            cloud_name - account name, also used as the key in the xpath
        """
        account = self.get_cal_account(cloud_type, cloud_name)
        name_key = quoted_key(cloud_name)
        account_xpath = "C,/rw-cloud:cloud/rw-cloud:account[rw-cloud:name={}]".format(name_key)

        self.log.info("Configuring cloud-account: %s", account)
        yield from dts.query_create(
            account_xpath,
            rwdts.XactFlag.ADVISE,
            account,
        )

    @asyncio.coroutine
    def wait_tasklets(self):
        """Sleep for five seconds on the test's event loop."""
        yield from asyncio.sleep(5, loop=self.loop)

    def configure_test(self, loop, test_id):
        """Per-test setup: create the tasklet info and the DTS handle."""
        test_name = self.id()
        self.log.debug("STARTING - %s", test_name)
        self.tinfo = self.new_tinfo(test_name)
        # The DTS handle is bound to self.loop (the 'loop' argument is not
        # used here).
        self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)

    def test_create_nsr_record(self):
        """
        Configure a mock cloud account, then issue the exec-scale-in RPC.

        Any exception raised by the scenario coroutine is logged and
        re-raised.
        """

        @asyncio.coroutine
        def scale_in_scenario():
            yield from self.wait_tasklets()

            yield from self.configure_cloud_account(self.dts, "mock", "mock_account")

            # Trigger an rpc
            rpc_fields = {
                'nsr_id_ref': '1',
                'instance_id': "1",
                'scaling_group_name_ref': "foo",
            }
            rpc_input = nsryang.YangInput_Nsr_ExecScaleIn.from_dict(rpc_fields)

            yield from self.dts.query_rpc("/nsr:exec-scale-in", 0, rpc_input)

        future = asyncio.ensure_future(scale_in_scenario(), loop=self.loop)
        self.run_until(future.done)

        failure = future.exception()
        if failure is not None:
            self.log.error("Caught exception during test")
            raise failure
152
153
def main():
    """
    Command-line entry point for this unit-test module.

    Options:
        -v / --verbose    run ManoTestCase with DEBUG logging (default WARN)
        -n / --no-runner  do not use the XML test runner; unittest falls
                          back to its default runner

    Every argument not recognised here is forwarded to unittest.main().

    Environment:
        NSM_DIR           directory of the rwnsm plugin; when unset, a
                          default is derived from this file's location
        RIFT_MODULE_TEST  output location for the XML runner; only read
                          when the XML runner is actually used

    Raises:
        KeyError - RIFT_MODULE_TEST is unset and --no-runner was not given.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true')
    parser.add_argument('-n', '--no-runner', action='store_true')
    args, unittest_args = parser.parse_known_args()

    if 'NSM_DIR' not in os.environ:
        # Everything in front of the first '/modules/core/' component is
        # taken as the top of the source tree.  partition() is used instead
        # of slicing with find(), which returns -1 when the marker is missing
        # and would silently drop the last character of the path.  If the
        # marker is absent the derived default is not meaningful; export
        # NSM_DIR explicitly in that case.
        top_dir, _, _ = __file__.partition('/modules/core/')
        launchpad_build_dir = os.path.join(top_dir, '.build/modules/core/mc/core_mc-build/rwlaunchpad')
        os.environ['NSM_DIR'] = os.path.join(launchpad_build_dir, 'plugins/rwnsm')

    # Build the XML runner only when it is going to be used, so that
    # --no-runner works without RIFT_MODULE_TEST being set.
    runner = None
    if not args.no_runner:
        runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])

    ManoTestCase.log_level = logging.DEBUG if args.verbose else logging.WARN

    unittest.main(testRunner=runner, argv=[sys.argv[0]] + unittest_args)
174
# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    main()
177
178 # vim: sw=4