RIFT 16526 Copy the icon image also to the UI logos location for UI update.
[osm/SO.git] / rwlaunchpad / test / utest_scaling_rpc.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2016 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19
20 import asyncio
21 import os
22 import sys
23 import unittest
24 import uuid
25 import xmlrunner
26 import argparse
27 import logging
28 import time
29 import types
30
31 import gi
32 gi.require_version('RwCloudYang', '1.0')
33 gi.require_version('RwDts', '1.0')
34 gi.require_version('RwNsmYang', '1.0')
35 gi.require_version('RwLaunchpadYang', '1.0')
36 gi.require_version('RwResourceMgrYang', '1.0')
37 gi.require_version('RwcalYang', '1.0')
38 gi.require_version('RwNsrYang', '1.0')
39 gi.require_version('NsrYang', '1.0')
40 gi.require_version('RwlogMgmtYang', '1.0')
41
42 from gi.repository import (
43 RwCloudYang as rwcloudyang,
44 RwDts as rwdts,
45 RwLaunchpadYang as launchpadyang,
46 RwNsmYang as rwnsmyang,
47 RwNsrYang as rwnsryang,
48 NsrYang as nsryang,
49 RwResourceMgrYang as rmgryang,
50 RwcalYang as rwcalyang,
51 RwConfigAgentYang as rwcfg_agent,
52 RwlogMgmtYang
53 )
54
55 from gi.repository.RwTypes import RwStatus
56 import rift.mano.examples.ping_pong_nsd as ping_pong_nsd
57 import rift.tasklets
58 import rift.test.dts
59 import rw_peas
60
61
62
63
64 class ManoTestCase(rift.test.dts.AbstractDTSTest):
65 """
66 DTS GI interface unittests
67
68 Note: Each tests uses a list of asyncio.Events for staging through the
69 test. These are required here because we are bring up each coroutine
70 ("tasklet") at the same time and are not implementing any re-try
71 mechanisms. For instance, this is used in numerous tests to make sure that
72 a publisher is up and ready before the subscriber sends queries. Such
73 event lists should not be used in production software.
74 """
75
76 @classmethod
77 def configure_suite(cls, rwmain):
78 nsm_dir = os.environ.get('NSM_DIR')
79
80 rwmain.add_tasklet(nsm_dir, 'rwnsmtasklet')
81
82 @classmethod
83 def configure_schema(cls):
84 return rwnsmyang.get_schema()
85
86 @classmethod
87 def configure_timeout(cls):
88 return 240
89
90 @staticmethod
91 def get_cal_account(account_type, account_name):
92 """
93 Creates an object for class RwcalYang.Clo
94 """
95 account = rwcloudyang.CloudAccount()
96 if account_type == 'mock':
97 account.name = account_name
98 account.account_type = "mock"
99 account.mock.username = "mock_user"
100 elif ((account_type == 'openstack_static') or (account_type == 'openstack_dynamic')):
101 account.name = account_name
102 account.account_type = 'openstack'
103 account.openstack.key = openstack_info['username']
104 account.openstack.secret = openstack_info['password']
105 account.openstack.auth_url = openstack_info['auth_url']
106 account.openstack.tenant = openstack_info['project_name']
107 account.openstack.mgmt_network = openstack_info['mgmt_network']
108 return account
109
110 @asyncio.coroutine
111 def configure_cloud_account(self, dts, cloud_type, cloud_name="cloud1"):
112 account = self.get_cal_account(cloud_type, cloud_name)
113 account_xpath = "C,/rw-cloud:cloud/rw-cloud:account[rw-cloud:name='{}']".format(cloud_name)
114 self.log.info("Configuring cloud-account: %s", account)
115 yield from dts.query_create(account_xpath,
116 rwdts.XactFlag.ADVISE,
117 account)
118
119 @asyncio.coroutine
120 def wait_tasklets(self):
121 yield from asyncio.sleep(5, loop=self.loop)
122
123 def configure_test(self, loop, test_id):
124 self.log.debug("STARTING - %s", self.id())
125 self.tinfo = self.new_tinfo(self.id())
126 self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)
127
128 def test_create_nsr_record(self):
129
130 @asyncio.coroutine
131 def run_test():
132 yield from self.wait_tasklets()
133
134 cloud_type = "mock"
135 yield from self.configure_cloud_account(self.dts, cloud_type, "mock_account")
136
137
138 # Trigger an rpc
139 rpc_ip = nsryang.YangInput_Nsr_ExecScaleIn.from_dict({
140 'nsr_id_ref': '1',
141 'instance_id': "1",
142 'scaling_group_name_ref': "foo"})
143
144 yield from self.dts.query_rpc("/nsr:exec-scale-in", 0, rpc_ip)
145
146 future = asyncio.ensure_future(run_test(), loop=self.loop)
147 self.run_until(future.done)
148 if future.exception() is not None:
149 self.log.error("Caught exception during test")
150 raise future.exception()
151
152
153 def main():
154 top_dir = __file__[:__file__.find('/modules/core/')]
155 build_dir = os.path.join(top_dir, '.build/modules/core/rwvx/src/core_rwvx-build')
156 launchpad_build_dir = os.path.join(top_dir, '.build/modules/core/mc/core_mc-build/rwlaunchpad')
157
158 if 'NSM_DIR' not in os.environ:
159 os.environ['NSM_DIR'] = os.path.join(launchpad_build_dir, 'plugins/rwnsm')
160
161 runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
162
163 parser = argparse.ArgumentParser()
164 parser.add_argument('-v', '--verbose', action='store_true')
165 parser.add_argument('-n', '--no-runner', action='store_true')
166 args, unittest_args = parser.parse_known_args()
167 if args.no_runner:
168 runner = None
169
170 ManoTestCase.log_level = logging.DEBUG if args.verbose else logging.WARN
171
172 unittest.main(testRunner=runner, argv=[sys.argv[0]] + unittest_args)
173
174 if __name__ == '__main__':
175 main()
176
177 # vim: sw=4