ec8e1059ae59dfcf604026ea38711cbb20d98814
[osm/SO.git] / rwlaunchpad / plugins / rwstagingmgr / test / utest_tornado_app.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2016 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19
20 import argparse
21 import asyncio
22 import logging
23 import os
24 import sys
25 import tornado.testing
26 import tornado.web
27 import tempfile
28 import unittest
29 import json
30 import xmlrunner
31 import urllib.parse
32 from requests_toolbelt import MultipartEncoder
33 import mock
34 import uuid
35 import shutil
36 import requests
37 import filecmp
38 import yaml
39 import time
40 import shutil
41 from rift.rwlib.util import certs
42
43 from rift.package.handler import FileRestApiHandler
44 from rift.tasklets.rwstagingmgr.server.app import StagingApplication, CleanUpStaging
45 from rift.tasklets.rwstagingmgr.model import StagingArea
46
47 import gi
48 gi.require_version('RwStagingMgmtYang', '1.0')
49 from gi.repository import (
50 RwStagingMgmtYang,
51 )
52
53
class TestCase(tornado.testing.AsyncHTTPTestCase):
    """HTTP-level test of ``StagingApplication`` against a mocked store.

    The store is a ``MagicMock`` whose single staging area lives in a real
    temporary directory, so uploads and downloads touch the file system
    while store bookkeeping calls can be asserted on.
    """

    def setUp(self):
        """Record a logger and the asyncio loop, then start the test server."""
        self._log = logging.getLogger(__file__)
        self._loop = asyncio.get_event_loop()

        super().setUp()
        self._port = self.get_http_port()

    def get_new_ioloop(self):
        """Return an ``AsyncIOMainLoop`` to be used as this test's IOLoop."""
        # Import the submodule explicitly; the file-level imports only name
        # tornado.testing and tornado.web, which does not guarantee that
        # tornado.platform.asyncio has been loaded.
        import tornado.platform.asyncio

        return tornado.platform.asyncio.AsyncIOMainLoop()

    def create_mock_store(self):
        """Create a temporary staging area and a mock store that serves it.

        Sets ``staging_dir_tmp``, ``staging_id`` and ``staging_dir`` on the
        test case and writes the area's ``meta.yaml`` into ``staging_dir``.

        Returns:
            A ``(store, staging_area)`` tuple: the ``MagicMock`` store and the
            ``StagingArea`` its ``get_staging_area`` returns.
        """
        self.staging_dir_tmp = tempfile.mkdtemp()
        self.staging_id = str(uuid.uuid4())
        self.staging_dir = os.path.join(self.staging_dir_tmp, self.staging_id)
        os.makedirs(self.staging_dir)

        # The area is valid for five seconds from now.
        mock_model = RwStagingMgmtYang.StagingArea.from_dict({
            'path': self.staging_dir,
            "validity_time": int(time.time()) + 5
        })

        with open(os.path.join(self.staging_dir, "meta.yaml"), "w") as fh:
            yaml.dump(mock_model.as_dict(), fh, default_flow_style=True)

        mock_model = StagingArea(mock_model)
        store = mock.MagicMock()
        store.get_staging_area.return_value = mock_model
        store.root_dir = self.staging_dir_tmp
        store.tmp_dir = self.staging_dir_tmp
        store.META_YAML = "meta.yaml"
        store.remove_staging_area = mock.Mock(return_value=None)

        return store, mock_model

    @staticmethod
    def _remove_file(path):
        """Delete ``path``, tolerating a file that is already gone."""
        try:
            os.remove(path)
        except FileNotFoundError:
            pass

    def create_tmp_file(self):
        """Create a small temporary text file and return its path.

        The path is also stored on ``self.temp_file``; the file is removed
        again when the test finishes.
        """
        fd, self.temp_file = tempfile.mkstemp()
        self.addCleanup(self._remove_file, self.temp_file)

        # Reuse the descriptor returned by mkstemp so that it gets closed
        # instead of leaking for the rest of the test run.
        with os.fdopen(fd, "w") as fh:
            fh.write("Lorem Ipsum")

        return self.temp_file

    def get_app(self):
        """Build the ``StagingApplication`` under test on top of the mock store."""
        self.store, self.mock_model = self.create_mock_store()
        return StagingApplication(self.store, cleanup_interval=5)

    def test_file_upload_and_download(self):
        """Upload a file, download it again and check the staging clean-up.

        Asserts:
            1. The file upload succeeds and the file lands in the staging dir.
            2. The upload response mentions the staging id.
            3. The downloaded file is identical to the uploaded one.
            4. The store is asked to remove the staging area after expiry.
        """
        temp_file = self.create_tmp_file()

        # The encoder reads the file lazily, so the body has to be rendered
        # while the handle is still open.
        with open(temp_file, 'rb') as upload_fh:
            form = MultipartEncoder(fields={
                'file': (os.path.basename(temp_file), upload_fh, 'application/octet-stream')})
            body = form.to_string()

        # Upload
        # NOTE(review): this Content-Type carries no multipart boundary
        # (form.content_type would) - confirm the handler does not need it.
        response = self.fetch("/api/upload/{}".format(self.staging_id),
                              method="POST",
                              body=body,
                              headers={"Content-Type": "multipart/form-data"})

        assert response.code == 200
        assert os.path.isfile(os.path.join(
            self.staging_dir,
            os.path.basename(temp_file)))
        assert self.staging_id in response.body.decode("utf-8")

        response = response.body.decode("utf-8")
        response = json.loads(response)

        # Download
        response = self.fetch(response['path'])

        fd, downloaded_file = tempfile.mkstemp()
        self.addCleanup(self._remove_file, downloaded_file)
        with os.fdopen(fd, 'wb') as fh:
            fh.write(response.body)

        assert filecmp.cmp(temp_file, downloaded_file)

        print(self.get_url('/'))
        print(self.staging_dir)

        # Wait out the five-second validity/clean-up window configured in
        # create_mock_store() and get_app().
        time.sleep(5)
        self.store.remove_staging_area.assert_called_once_with(self.mock_model)

    def tearDown(self):
        """Shut down the test server, then delete the temporary staging tree."""
        try:
            # The base class stops the HTTP server and releases the IOLoop;
            # skipping it leaves them alive after the test.
            super().tearDown()
        finally:
            shutil.rmtree(self.staging_dir_tmp)
145
146
def main(argv=sys.argv[1:]):
    """Run this module's unit tests.

    Args:
        argv: Command-line arguments without the program name.
            ``-v/--verbose`` raises the global log level to DEBUG;
            ``-n/--no-runner`` uses unittest's default runner instead of the
            XML runner. Unrecognised arguments are forwarded to unittest.

    Raises:
        KeyError: If the XML runner is requested but the ``RIFT_MODULE_TEST``
            environment variable is not set.
    """
    logging.basicConfig(format='TEST %(message)s')

    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true')
    parser.add_argument('-n', '--no-runner', action='store_true')

    args, unknown = parser.parse_known_args(argv)

    # Only build the XML runner when it will be used, so that --no-runner
    # works without RIFT_MODULE_TEST being set in the environment.
    if args.no_runner:
        runner = None
    else:
        runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])

    # Set the global logging level
    logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.ERROR)

    # The unittest framework requires a program name, so use the name of this
    # file instead (we do not want to have to pass a fake program name to main
    # when this is called from the interpreter).
    unittest.main(argv=[__file__] + unknown + ["-v"], testRunner=runner)
166
# Script entry point: run the tests only when executed directly.
if __name__ == "__main__":
    main()