2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 # Author(s): Varun Prasad
17 # Creation Date: 09/28/2016
20 import tornado
.httpclient
22 import tornadostreamform
.multipart_streamer
as multipart_streamer
29 MAX_STREAMED_SIZE
= 5 * GB
31 class HttpMessageError(Exception):
32 def __init__(self
, code
, msg
):
37 class RequestHandler(tornado
.web
.RequestHandler
):
38 def options(self
, *args
, **kargs
):
41 def set_default_headers(self
):
42 self
.set_header('Access-Control-Allow-Origin', '*')
43 self
.set_header('Access-Control-Allow-Headers',
44 'Content-Type, Cache-Control, Accept, X-Requested-With, Authorization')
45 self
.set_header('Access-Control-Allow-Methods', 'POST, GET, PUT, DELETE')
48 class StoreStreamerPart(multipart_streamer
.MultiPartStreamer
):
50 Create a Part streamer with a custom temp directory. Using the default
51 tmp directory and trying to move the file to $RIFT_VAR_ROOT occasionally
52 causes link errors. So create a temp directory within the staging area.
54 def __init__(self
, store
, *args
, **kwargs
):
55 super().__init
__(*args
, **kwargs
)
58 def create_part(self
, headers
):
59 #RIFT-18071: tmp directory was not getting created - throwing an error in the system test cases in HA failover.
60 if not os
.path
.exists(self
.store
.tmp_dir
):
61 os
.makedirs(self
.store
.tmp_dir
)
62 return multipart_streamer
.TemporaryFileStreamedPart(self
, headers
, tmp_dir
=self
.store
.tmp_dir
)
65 @tornado.web
.stream_request_body
66 class UploadStagingHandler(RequestHandler
):
67 def initialize(self
, store
):
68 """Initialize the handler
71 log - the logger that this handler should use
72 loop - the tasklets ioloop
75 self
.log
= logging
.getLogger()
78 self
.part_streamer
= None
80 @tornado.gen
.coroutine
82 """Prepare the handler for a request
84 The prepare function is the first part of a request transaction. It
85 creates a temporary file that uploaded data can be written to.
88 if self
.request
.method
!= "POST":
91 self
.request
.connection
.set_max_body_size(MAX_STREAMED_SIZE
)
93 # Retrieve the content type and parameters from the request
94 content_type
= self
.request
.headers
.get('content-type', None)
95 if content_type
is None:
96 raise tornado
.httpclient
.HTTPError(400, "No content type set")
98 content_type
, params
= tornado
.httputil
._parse
_header
(content_type
)
100 if "multipart/form-data" != content_type
.lower():
101 raise tornado
.httpclient
.HTTPError(415, "Invalid content type")
103 # You can get the total request size from the headers.
105 total
= int(self
.request
.headers
.get("Content-Length", "0"))
107 self
.log
.warning("Content length header not found")
108 # For any well formed browser request, Content-Length should have a value.
111 # And here you create a streamer that will accept incoming data
112 self
.part_streamer
= StoreStreamerPart(self
.store
, total
)
115 @tornado.gen
.coroutine
116 def data_received(self
, chunk
):
117 """When a chunk of data is received, we forward it to the multipart streamer."""
118 self
.part_streamer
.data_received(chunk
)
120 def post(self
, staging_id
):
121 """Handle a post request
123 The function is called after any data associated with the body of the
124 request has been received.
128 # You MUST call this to close the incoming stream.
129 self
.part_streamer
.data_complete()
130 desc_parts
= self
.part_streamer
.get_parts_by_name("file")
131 if len(desc_parts
) != 1:
132 raise tornado
.httpclient
.HTTPError(400, "File option not found")
134 binary_data
= desc_parts
[0]
135 staging_area
= self
.store
.get_staging_area(staging_id
)
136 filename
= binary_data
.get_filename()
137 staging_area
.model
.name
= filename
138 staging_area
.model
.size
= binary_data
.get_size()
140 dest_file
= os
.path
.join(staging_area
.model
.path
, filename
)
141 binary_data
.move(dest_file
)
144 self
.write(tornado
.escape
.json_encode({
145 "path": "/api/download/{}/{}".format(staging_id
, filename
)
149 self
.part_streamer
.release_parts()