Merge "Revert "Functional spec for cloud-init support""
[osm/SO.git] / rwlaunchpad / plugins / rwlaunchpadtasklet / rift / tasklets / rwlaunchpad / uploader.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import collections
19 import os
20 import threading
21 import uuid
22 import zlib
23
24 import tornado
25 import tornado.escape
26 import tornado.ioloop
27 import tornado.web
28 import tornado.httputil
29 import tornadostreamform.multipart_streamer as multipart_streamer
30
31 import requests
32
33 # disable unsigned certificate warning
34 from requests.packages.urllib3.exceptions import InsecureRequestWarning
35 requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
36
37 import gi
38 gi.require_version('RwLaunchpadYang', '1.0')
39 gi.require_version('NsdYang', '1.0')
40 gi.require_version('VnfdYang', '1.0')
41
42 from gi.repository import (
43 NsdYang,
44 VnfdYang,
45 )
46 import rift.mano.cloud
47
48 import rift.package.charm
49 import rift.package.checksums
50 import rift.package.config
51 import rift.package.convert
52 import rift.package.icon
53 import rift.package.package
54 import rift.package.script
55 import rift.package.store
56
57 from . import (
58 export,
59 extract,
60 image,
61 message,
62 onboard,
63 state,
64 )
65
66 from .message import (
67 MessageException,
68
69 # Onboard Error Messages
70 OnboardChecksumMismatch,
71 OnboardDescriptorError,
72 OnboardDescriptorExistsError,
73 OnboardDescriptorFormatError,
74 OnboardError,
75 OnboardExtractionError,
76 OnboardImageUploadError,
77 OnboardMissingContentBoundary,
78 OnboardMissingContentType,
79 OnboardMissingTerminalBoundary,
80 OnboardUnreadableHeaders,
81 OnboardUnreadablePackage,
82 OnboardUnsupportedMediaType,
83
84 # Onboard Status Messages
85 OnboardDescriptorOnboard,
86 OnboardFailure,
87 OnboardImageUpload,
88 OnboardPackageUpload,
89 OnboardPackageValidation,
90 OnboardStart,
91 OnboardSuccess,
92
93
94 # Update Error Messages
95 UpdateChecksumMismatch,
96 UpdateDescriptorError,
97 UpdateDescriptorFormatError,
98 UpdateError,
99 UpdateExtractionError,
100 UpdateImageUploadError,
101 UpdateMissingContentBoundary,
102 UpdateMissingContentType,
103 UpdatePackageNotFoundError,
104 UpdateUnreadableHeaders,
105 UpdateUnreadablePackage,
106 UpdateUnsupportedMediaType,
107
108 # Update Status Messages
109 UpdateDescriptorUpdate,
110 UpdateDescriptorUpdated,
111 UpdatePackageUpload,
112 UpdateStart,
113 UpdateSuccess,
114 UpdateFailure,
115 )
116
117 from .tosca import ExportTosca
118
119 MB = 1024 * 1024
120 GB = 1024 * MB
121
122 MAX_STREAMED_SIZE = 5 * GB
123
124
125 class HttpMessageError(Exception):
126 def __init__(self, code, msg):
127 self.code = code
128 self.msg = msg
129
130
131 class GzipTemporaryFileStreamedPart(multipart_streamer.TemporaryFileStreamedPart):
132 def __init__(self, *args, **kwargs):
133 super().__init__(*args, **kwargs)
134
135 # Create a decompressor for gzip data to decompress on the fly during upload
136 # http://stackoverflow.com/questions/2423866/python-decompressing-gzip-chunk-by-chunk
137 self._decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
138
139 def feed(self, data):
140 decompressed_data = self._decompressor.decompress(data)
141 if decompressed_data:
142 super().feed(decompressed_data)
143
144 def finalize(self):
145 # All data has arrived, flush the decompressor to get any last decompressed data
146 decompressed_data = self._decompressor.flush()
147 super().feed(decompressed_data)
148 super().finalize()
149
150
151 class GzipMultiPartStreamer(multipart_streamer.MultiPartStreamer):
152 """ This Multipart Streamer decompresses gzip files on the fly during multipart upload """
153
154 @staticmethod
155 def _get_descriptor_name_from_headers(headers):
156 descriptor_filename = None
157
158 for entry in headers:
159 if entry["value"] != "form-data":
160 continue
161
162 form_data_params = entry["params"]
163 if "name" in form_data_params:
164 if form_data_params["name"] != "descriptor":
165 continue
166
167 if "filename" not in form_data_params:
168 continue
169
170 descriptor_filename = form_data_params["filename"]
171
172 return descriptor_filename
173
174 def create_part(self, headers):
175 """ Create the StreamedPart subclass depending on the descriptor filename
176
177 For gzipped descriptor packages, create a GzipTemporaryFileStreamedPart which
178 can decompress the gzip while it's being streamed into the launchpad directely
179 into a file.
180
181 Returns:
182 The descriptor filename
183 """
184 filename = GzipMultiPartStreamer._get_descriptor_name_from_headers(headers)
185 if filename is None or not filename.endswith(".gz"):
186 return multipart_streamer.TemporaryFileStreamedPart(self, headers)
187
188 return GzipTemporaryFileStreamedPart(self, headers)
189
190
191 class RequestHandler(tornado.web.RequestHandler):
192 def options(self, *args, **kargs):
193 pass
194
195 def set_default_headers(self):
196 self.set_header('Access-Control-Allow-Origin', '*')
197 self.set_header('Access-Control-Allow-Headers',
198 'Content-Type, Cache-Control, Accept, X-Requested-With, Authorization')
199 self.set_header('Access-Control-Allow-Methods', 'POST, GET, PUT, DELETE')
200
201
202 @tornado.web.stream_request_body
203 class StreamingUploadHandler(RequestHandler):
204 def initialize(self, log, loop):
205 """Initialize the handler
206
207 Arguments:
208 log - the logger that this handler should use
209 loop - the tasklets ioloop
210
211 """
212 self.transaction_id = str(uuid.uuid4())
213
214 self.loop = loop
215 self.log = self.application.get_logger(self.transaction_id)
216
217 self.part_streamer = None
218
219 self.log.debug('created handler (transaction_id = {})'.format(self.transaction_id))
220
221 def msg_missing_content_type(self):
222 raise NotImplementedError()
223
224 def msg_unsupported_media_type(self):
225 raise NotImplementedError()
226
227 def msg_missing_content_boundary(self):
228 raise NotImplementedError()
229
230 def msg_start(self):
231 raise NotImplementedError()
232
233 def msg_success(self):
234 raise NotImplementedError()
235
236 def msg_failure(self):
237 raise NotImplementedError()
238
239 def msg_package_upload(self):
240 raise NotImplementedError()
241
242 @tornado.gen.coroutine
243 def prepare(self):
244 """Prepare the handler for a request
245
246 The prepare function is the first part of a request transaction. It
247 creates a temporary file that uploaded data can be written to.
248
249 """
250 if self.request.method != "POST":
251 return
252
253 self.request.connection.set_max_body_size(MAX_STREAMED_SIZE)
254
255 self.log.message(self.msg_start())
256
257 try:
258 # Retrieve the content type and parameters from the request
259 content_type = self.request.headers.get('content-type', None)
260 if content_type is None:
261 raise HttpMessageError(400, self.msg_missing_content_type())
262
263 content_type, params = tornado.httputil._parse_header(content_type)
264
265 if "multipart/form-data" != content_type.lower():
266 raise HttpMessageError(415, self.msg_unsupported_media_type())
267
268 if "boundary" not in params:
269 raise HttpMessageError(400, self.msg_missing_content_boundary())
270
271 # You can get the total request size from the headers.
272 try:
273 total = int(self.request.headers.get("Content-Length", "0"))
274 except KeyError:
275 self.log.warning("Content length header not found")
276 # For any well formed browser request, Content-Length should have a value.
277 total = 0
278
279 # And here you create a streamer that will accept incoming data
280 self.part_streamer = GzipMultiPartStreamer(total)
281
282 except HttpMessageError as e:
283 self.log.message(e.msg)
284 self.log.message(self.msg_failure())
285
286 raise tornado.web.HTTPError(e.code, e.msg.name)
287
288 except Exception as e:
289 self.log.exception(e)
290 self.log.message(self.msg_failure())
291
292 @tornado.gen.coroutine
293 def data_received(self, chunk):
294 """Write data to the current file
295
296 Arguments:
297 data - a chunk of data to write to file
298
299 """
300
301 """When a chunk of data is received, we forward it to the multipart streamer."""
302 self.part_streamer.data_received(chunk)
303
304 def post(self):
305 """Handle a post request
306
307 The function is called after any data associated with the body of the
308 request has been received.
309
310 """
311 # You MUST call this to close the incoming stream.
312 self.part_streamer.data_complete()
313
314 desc_parts = self.part_streamer.get_parts_by_name("descriptor")
315 if len(desc_parts) != 1:
316 raise HttpMessageError(400, OnboardError("Descriptor option not found"))
317
318 self.log.message(self.msg_package_upload())
319
320
321 class UploadHandler(StreamingUploadHandler):
322 """
323 This handler is used to upload archives that contain VNFDs, NSDs, and PNFDs
324 to the launchpad. This is a streaming handler that writes uploaded archives
325 to disk without loading them all into memory.
326 """
327
328 def msg_missing_content_type(self):
329 return OnboardMissingContentType()
330
331 def msg_unsupported_media_type(self):
332 return OnboardUnsupportedMediaType()
333
334 def msg_missing_content_boundary(self):
335 return OnboardMissingContentBoundary()
336
337 def msg_start(self):
338 return OnboardStart()
339
340 def msg_success(self):
341 return OnboardSuccess()
342
343 def msg_failure(self):
344 return OnboardFailure()
345
346 def msg_package_upload(self):
347 return OnboardPackageUpload()
348
349 def post(self):
350 """Handle a post request
351
352 The function is called after any data associated with the body of the
353 request has been received.
354
355 """
356 try:
357 super().post()
358 self.application.onboard(
359 self.part_streamer,
360 self.transaction_id,
361 auth=self.request.headers.get('authorization', None),
362 )
363
364 self.set_status(200)
365 self.write(tornado.escape.json_encode({
366 "transaction_id": self.transaction_id,
367 }))
368
369 except Exception:
370 self.log.exception("Upload POST failed")
371 self.part_streamer.release_parts()
372 raise
373
374
375 class UpdateHandler(StreamingUploadHandler):
376 def msg_missing_content_type(self):
377 return UpdateMissingContentType()
378
379 def msg_unsupported_media_type(self):
380 return UpdateUnsupportedMediaType()
381
382 def msg_missing_content_boundary(self):
383 return UpdateMissingContentBoundary()
384
385 def msg_start(self):
386 return UpdateStart()
387
388 def msg_success(self):
389 return UpdateSuccess()
390
391 def msg_failure(self):
392 return UpdateFailure()
393
394 def msg_package_upload(self):
395 return UpdatePackageUpload()
396
397 def post(self):
398 """Handle a post request
399
400 The function is called after any data associated with the body of the
401 request has been received.
402
403 """
404 try:
405 super().post()
406
407 self.application.update(
408 self.part_streamer,
409 self.transaction_id,
410 auth=self.request.headers.get('authorization', None),
411 )
412
413 self.set_status(200)
414 self.write(tornado.escape.json_encode({
415 "transaction_id": self.transaction_id,
416 }))
417 except Exception:
418 self.log.exception("Upload POST failed")
419 self.part_streamer.release_parts()
420 raise
421
422
423 class UploadStateHandler(state.StateHandler):
424 STARTED = OnboardStart
425 SUCCESS = OnboardSuccess
426 FAILURE = OnboardFailure
427
428
429 class UpdateStateHandler(state.StateHandler):
430 STARTED = UpdateStart
431 SUCCESS = UpdateSuccess
432 FAILURE = UpdateFailure
433
434
435 class UpdatePackage(threading.Thread):
436 def __init__(self, log, loop, part_streamer, auth,
437 onboarder, uploader, package_store_map):
438 super().__init__()
439 self.log = log
440 self.loop = loop
441 self.part_streamer = part_streamer
442 self.auth = auth
443 self.onboarder = onboarder
444 self.uploader = uploader
445 self.package_store_map = package_store_map
446
447 self.io_loop = tornado.ioloop.IOLoop.current()
448
449 def _update_package(self):
450 # Extract package could return multiple packages if
451 # the package is converted
452 for pkg in self.extract_package():
453 with pkg as temp_package:
454 package_checksums = self.validate_package(temp_package)
455 stored_package = self.update_package(temp_package)
456
457 try:
458 self.extract_charms(temp_package)
459 self.extract_scripts(temp_package)
460 self.extract_configs(temp_package)
461 self.extract_icons(temp_package)
462
463 self.update_descriptors(temp_package)
464
465 except Exception:
466 self.delete_stored_package(stored_package)
467 raise
468
469 else:
470 self.upload_images(temp_package, package_checksums)
471
472 def run(self):
473 try:
474 self._update_package()
475 self.log.message(UpdateSuccess())
476
477 except MessageException as e:
478 self.log.message(e.msg)
479 self.log.message(UpdateFailure())
480
481 except Exception as e:
482 self.log.exception(e)
483 if str(e):
484 self.log.message(UpdateError(str(e)))
485 self.log.message(UpdateFailure())
486
487 def extract_package(self):
488 """Extract multipart message from tarball"""
489 desc_part = self.part_streamer.get_parts_by_name("descriptor")[0]
490
491 # Invoke the move API to prevent the part streamer from attempting
492 # to clean up (the file backed package will do that itself)
493 desc_part.move(desc_part.f_out.name)
494
495 package_name = desc_part.get_filename()
496 package_path = desc_part.f_out.name
497
498 extractor = extract.UploadPackageExtractor(self.log)
499 file_backed_packages = extractor.create_packages_from_upload(
500 package_name, package_path
501 )
502
503 return file_backed_packages
504
505 def get_package_store(self, package):
506 return self.package_store_map[package.descriptor_type]
507
508 def update_package(self, package):
509 store = self.get_package_store(package)
510
511 try:
512 store.update_package(package)
513 except rift.package.store.PackageNotFoundError as e:
514 # If the package doesn't exist, then it is possible the descriptor was onboarded
515 # out of band. In that case, just store the package as is
516 self.log.warning("Package not found, storing new package instead.")
517 store.store_package(package)
518
519 stored_package = store.get_package(package.descriptor_id)
520
521 return stored_package
522
523 def delete_stored_package(self, package):
524 self.log.info("Deleting stored package: %s", package)
525 store = self.get_package_store(package)
526 try:
527 store.delete_package(package.descriptor_id)
528 except Exception as e:
529 self.log.warning("Failed to delete package from store: %s", str(e))
530
531 def upload_images(self, package, package_checksums):
532 image_file_map = rift.package.image.get_package_image_files(package)
533 name_hdl_map = {name: package.open(image_file_map[name]) for name in image_file_map}
534 if not image_file_map:
535 return
536
537 try:
538 for image_name, image_hdl in name_hdl_map.items():
539 image_file = image_file_map[image_name]
540 if image_file in package_checksums:
541 image_checksum = package_checksums[image_file]
542 else:
543 self.log.warning("checksum not provided for image %s. Calculating checksum",
544 image_file)
545 image_checksum = rift.package.checksums.checksum(
546 package.open(image_file_map[image_name])
547 )
548 try:
549 self.uploader.upload_image(image_name, image_checksum, image_hdl)
550 self.uploader.upload_image_to_cloud_accounts(image_name, image_checksum)
551
552 except image.ImageUploadError as e:
553 self.log.exception("Failed to upload image: %s", image_name)
554 raise MessageException(OnboardImageUploadError(str(e))) from e
555
556 finally:
557 _ = [image_hdl.close() for image_hdl in name_hdl_map.values()]
558
559
560 def extract_charms(self, package):
561 try:
562 charm_extractor = rift.package.charm.PackageCharmExtractor(self.log)
563 charm_extractor.extract_charms(package)
564 except rift.package.charm.CharmExtractionError as e:
565 raise MessageException(UpdateExtractionError()) from e
566
567 def extract_scripts(self, package):
568 try:
569 script_extractor = rift.package.script.PackageScriptExtractor(self.log)
570 script_extractor.extract_scripts(package)
571 except rift.package.script.ScriptExtractionError as e:
572 raise MessageException(UpdateExtractionError()) from e
573
574 def extract_configs(self, package):
575 try:
576 config_extractor = rift.package.config.PackageConfigExtractor(self.log)
577 config_extractor.extract_configs(package)
578 except rift.package.config.ConfigExtractionError as e:
579 raise MessageException(UpdateExtractionError()) from e
580
581 def extract_icons(self, package):
582 try:
583 icon_extractor = rift.package.icon.PackageIconExtractor(self.log)
584 icon_extractor.extract_icons(package)
585 except rift.package.icon.IconExtractionError as e:
586 raise MessageException(UpdateExtractionError()) from e
587
588 def validate_package(self, package):
589 checksum_validator = rift.package.package.PackageChecksumValidator(self.log)
590
591 try:
592 file_checksums = checksum_validator.validate(package)
593 except rift.package.package.PackageFileChecksumError as e:
594 raise MessageException(UpdateChecksumMismatch(e.filename)) from e
595 except rift.package.package.PackageValidationError as e:
596 raise MessageException(UpdateUnreadablePackage()) from e
597
598 return file_checksums
599
600 def update_descriptors(self, package):
601 descriptor_msg = package.descriptor_msg
602
603 self.log.message(UpdateDescriptorUpdate())
604
605 try:
606 self.onboarder.update(descriptor_msg)
607 except onboard.UpdateError as e:
608 raise MessageException(UpdateDescriptorError(package.descriptor_file)) from e
609
610
611 class OnboardPackage(threading.Thread):
612 def __init__(self, log, loop, part_streamer, auth,
613 onboarder, uploader, package_store_map):
614 super().__init__()
615 self.log = log
616 self.loop = loop
617 self.part_streamer = part_streamer
618 self.auth = auth
619 self.onboarder = onboarder
620 self.uploader = uploader
621 self.package_store_map = package_store_map
622
623 self.io_loop = tornado.ioloop.IOLoop.current()
624
625 def _onboard_package(self):
626 # Extract package could return multiple packages if
627 # the package is converted
628 for pkg in self.extract_package():
629 with pkg as temp_package:
630 package_checksums = self.validate_package(temp_package)
631 stored_package = self.store_package(temp_package)
632
633 try:
634 self.extract_charms(temp_package)
635 self.extract_scripts(temp_package)
636 self.extract_configs(temp_package)
637 self.extract_icons(temp_package)
638
639 self.onboard_descriptors(temp_package)
640
641 except Exception:
642 self.delete_stored_package(stored_package)
643 raise
644
645 else:
646 self.upload_images(temp_package, package_checksums)
647
648 def run(self):
649 try:
650 self._onboard_package()
651 self.log.message(OnboardSuccess())
652
653 except MessageException as e:
654 self.log.message(e.msg)
655 self.log.message(OnboardFailure())
656
657 except Exception as e:
658 self.log.exception(e)
659 if str(e):
660 self.log.message(OnboardError(str(e)))
661 self.log.message(OnboardFailure())
662
663 finally:
664 self.part_streamer.release_parts()
665
666 def extract_package(self):
667 """Extract multipart message from tarball"""
668 desc_part = self.part_streamer.get_parts_by_name("descriptor")[0]
669
670 # Invoke the move API to prevent the part streamer from attempting
671 # to clean up (the file backed package will do that itself)
672 desc_part.move(desc_part.f_out.name)
673
674 package_name = desc_part.get_filename()
675 package_path = desc_part.f_out.name
676
677 extractor = extract.UploadPackageExtractor(self.log)
678 file_backed_packages = extractor.create_packages_from_upload(
679 package_name, package_path
680 )
681
682 return file_backed_packages
683
684 def get_package_store(self, package):
685 return self.package_store_map[package.descriptor_type]
686
687 def store_package(self, package):
688 store = self.get_package_store(package)
689
690 try:
691 store.store_package(package)
692 except rift.package.store.PackageExistsError as e:
693 store.update_package(package)
694
695 stored_package = store.get_package(package.descriptor_id)
696
697 return stored_package
698
699 def delete_stored_package(self, package):
700 self.log.info("Deleting stored package: %s", package)
701 store = self.get_package_store(package)
702 try:
703 store.delete_package(package.descriptor_id)
704 except Exception as e:
705 self.log.warning("Failed to delete package from store: %s", str(e))
706
707 def upload_images(self, package, package_checksums):
708 image_file_map = rift.package.image.get_package_image_files(package)
709 if not image_file_map:
710 return
711
712 name_hdl_map = {name: package.open(image_file_map[name]) for name in image_file_map}
713 try:
714 for image_name, image_hdl in name_hdl_map.items():
715 image_file = image_file_map[image_name]
716 if image_file in package_checksums:
717 image_checksum = package_checksums[image_file]
718 else:
719 self.log.warning("checksum not provided for image %s. Calculating checksum",
720 image_file)
721 image_checksum = rift.package.checksums.checksum(
722 package.open(image_file_map[image_name])
723 )
724 try:
725 self.uploader.upload_image(image_name, image_checksum, image_hdl)
726 self.uploader.upload_image_to_cloud_accounts(image_name, image_checksum)
727
728 except image.ImageUploadError as e:
729 raise MessageException(OnboardImageUploadError()) from e
730
731 finally:
732 _ = [image_hdl.close() for image_hdl in name_hdl_map.values()]
733
734 def extract_charms(self, package):
735 try:
736 charm_extractor = rift.package.charm.PackageCharmExtractor(self.log)
737 charm_extractor.extract_charms(package)
738 except rift.package.charm.CharmExtractionError as e:
739 raise MessageException(OnboardExtractionError()) from e
740
741 def extract_scripts(self, package):
742 try:
743 script_extractor = rift.package.script.PackageScriptExtractor(self.log)
744 script_extractor.extract_scripts(package)
745 except rift.package.script.ScriptExtractionError as e:
746 raise MessageException(OnboardExtractionError()) from e
747
748 def extract_configs(self, package):
749 try:
750 config_extractor = rift.package.config.PackageConfigExtractor(self.log)
751 config_extractor.extract_configs(package)
752 except rift.package.config.ConfigExtractionError as e:
753 raise MessageException(OnboardExtractionError()) from e
754
755 def extract_icons(self, package):
756 try:
757 icon_extractor = rift.package.icon.PackageIconExtractor(self.log)
758 icon_extractor.extract_icons(package)
759 except rift.package.icon.IconExtractionError as e:
760 raise MessageException(OnboardExtractionError()) from e
761
762 def validate_package(self, package):
763 checksum_validator = rift.package.package.PackageChecksumValidator(self.log)
764
765 try:
766 file_checksums = checksum_validator.validate(package)
767 except rift.package.package.PackageFileChecksumError as e:
768 raise MessageException(OnboardChecksumMismatch(e.filename)) from e
769 except rift.package.package.PackageValidationError as e:
770 raise MessageException(OnboardUnreadablePackage()) from e
771
772 return file_checksums
773
774 def onboard_descriptors(self, package):
775 descriptor_msg = package.descriptor_msg
776
777 self.log.message(OnboardDescriptorOnboard())
778
779 try:
780 self.onboarder.onboard(descriptor_msg)
781 except onboard.OnboardError as e:
782 raise MessageException(OnboardDescriptorError(package.descriptor_file)) from e
783
784
785 class UploaderApplication(tornado.web.Application):
786 def __init__(self, tasklet):
787 self.tasklet = tasklet
788 self.accounts = []
789 self.messages = collections.defaultdict(list)
790 self.export_dir = os.path.join(os.environ['RIFT_ARTIFACTS'], 'launchpad/exports')
791
792 manifest = tasklet.tasklet_info.get_pb_manifest()
793 self.use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl
794 self.ssl_cert = manifest.bootstrap_phase.rwsecurity.cert
795 self.ssl_key = manifest.bootstrap_phase.rwsecurity.key
796
797 self.uploader = image.ImageUploader(self.log, self.loop, tasklet.dts)
798 self.onboarder = onboard.DescriptorOnboarder(
799 self.log, "127.0.0.1", 8008, self.use_ssl, self.ssl_cert, self.ssl_key
800 )
801 self.package_store_map = {
802 "vnfd": self.tasklet.vnfd_package_store,
803 "nsd": self.tasklet.nsd_package_store,
804 }
805
806 self.exporter = export.DescriptorPackageArchiveExporter(self.log)
807 self.loop.create_task(export.periodic_export_cleanup(self.log, self.loop, self.export_dir))
808
809 attrs = dict(log=self.log, loop=self.loop)
810
811 export_attrs = attrs.copy()
812 export_attrs.update({
813 "store_map": self.package_store_map,
814 "exporter": self.exporter,
815 "catalog_map": {
816 "vnfd": self.vnfd_catalog,
817 "nsd": self.nsd_catalog
818 }
819 })
820
821 super(UploaderApplication, self).__init__([
822 (r"/api/update", UpdateHandler, attrs),
823 (r"/api/upload", UploadHandler, attrs),
824
825 (r"/api/upload/([^/]+)/state", UploadStateHandler, attrs),
826 (r"/api/update/([^/]+)/state", UpdateStateHandler, attrs),
827 (r"/api/export/([^/]+)/state", export.ExportStateHandler, attrs),
828
829 (r"/api/export/(nsd|vnfd)$", export.ExportHandler, export_attrs),
830 (r"/api/export/([^/]+.tar.gz)", tornado.web.StaticFileHandler, {
831 "path": self.export_dir,
832 }),
833 (r"/api/export/([^/]+.zip)", tornado.web.StaticFileHandler, {
834 "path": self.export_dir,
835 }),
836 ])
837
838 @property
839 def log(self):
840 return self.tasklet.log
841
842 @property
843 def loop(self):
844 return self.tasklet.loop
845
846 def get_logger(self, transaction_id):
847 return message.Logger(self.log, self.messages[transaction_id])
848
849 def onboard(self, part_streamer, transaction_id, auth=None):
850 log = message.Logger(self.log, self.messages[transaction_id])
851
852 OnboardPackage(
853 log,
854 self.loop,
855 part_streamer,
856 auth,
857 self.onboarder,
858 self.uploader,
859 self.package_store_map,
860 ).start()
861
862 def update(self, part_streamer, transaction_id, auth=None):
863 log = message.Logger(self.log, self.messages[transaction_id])
864
865 UpdatePackage(
866 log,
867 self.loop,
868 part_streamer,
869 auth,
870 self.onboarder,
871 self.uploader,
872 self.package_store_map,
873 ).start()
874
875 @property
876 def vnfd_catalog(self):
877 return self.tasklet.vnfd_catalog
878
879 @property
880 def nsd_catalog(self):
881 return self.tasklet.nsd_catalog