3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
30 import tornado
.httputil
31 import tornadostreamform
.multipart_streamer
as multipart_streamer
35 # disable unsigned certificate warning
36 from requests
.packages
.urllib3
.exceptions
import InsecureRequestWarning
37 requests
.packages
.urllib3
.disable_warnings(InsecureRequestWarning
)
40 gi
.require_version('RwLaunchpadYang', '1.0')
41 gi
.require_version('ProjectNsdYang', '1.0')
42 gi
.require_version('ProjectVnfdYang', '1.0')
44 from gi
.repository
import (
45 ProjectNsdYang
as NsdYang
,
46 ProjectVnfdYang
as VnfdYang
,
48 import rift
.mano
.cloud
50 import rift
.package
.charm
51 import rift
.package
.checksums
52 import rift
.package
.config
53 import rift
.package
.convert
54 import rift
.package
.handler
as pkg_handler
55 import rift
.package
.icon
56 import rift
.package
.package
57 import rift
.package
.script
58 import rift
.package
.store
60 from gi
.repository
import (
63 import rift
.downloader
as downloader
64 import rift
.mano
.dts
as mano_dts
76 from .message
import (
79 # Onboard Error Messages
80 OnboardChecksumMismatch
,
81 OnboardDescriptorError
,
82 OnboardDescriptorExistsError
,
83 OnboardDescriptorFormatError
,
85 OnboardExtractionError
,
86 OnboardImageUploadError
,
87 OnboardMissingContentBoundary
,
88 OnboardMissingContentType
,
89 OnboardMissingTerminalBoundary
,
90 OnboardUnreadableHeaders
,
91 OnboardUnreadablePackage
,
92 OnboardUnsupportedMediaType
,
94 # Onboard Status Messages
95 OnboardDescriptorOnboard
,
99 OnboardPackageValidation
,
106 # Update Error Messages
107 UpdateChecksumMismatch
,
108 UpdateDescriptorError
,
109 UpdateDescriptorFormatError
,
111 UpdateExtractionError
,
112 UpdateImageUploadError
,
113 UpdateMissingContentBoundary
,
114 UpdateMissingContentType
,
115 UpdatePackageNotFoundError
,
116 UpdateUnreadableHeaders
,
117 UpdateUnreadablePackage
,
118 UpdateUnsupportedMediaType
,
120 # Update Status Messages
121 UpdateDescriptorUpdate
,
122 UpdateDescriptorUpdated
,
129 from .tosca
import ExportTosca
134 MAX_STREAMED_SIZE
= 5 * GB
137 RPC_PACKAGE_CREATE_ENDPOINT
= RwPkgMgmtYang
.YangOutput_RwPkgMgmt_PackageCreate
138 RPC_PACKAGE_UPDATE_ENDPOINT
= RwPkgMgmtYang
.YangOutput_RwPkgMgmt_PackageUpdate
142 class HttpMessageError(Exception):
143 def __init__(self
, code
, msg
):
148 class UploadRpcHandler(mano_dts
.AbstractRpcHandler
):
149 def __init__(self
, application
):
152 application: UploaderApplication
154 super().__init
__(application
.log
, application
.dts
, application
.loop
)
155 self
.application
= application
159 return "/rw-pkg-mgmt:package-create"
162 def callback(self
, ks_path
, msg
):
163 transaction_id
= str(uuid
.uuid4())
164 log
= self
.application
.get_logger(transaction_id
)
165 log
.message(OnboardStart())
167 self
.log
.debug("Package create RPC: {}".format(msg
))
170 if msg
.username
is not None:
171 auth
= (msg
.username
, msg
.password
)
174 project
= msg
.project_name
175 except AttributeError as e
:
176 self
._log
.warning("Did not get project name in RPC: {}".
177 format(msg
.as_dict()))
178 project
= rift
.mano
.utils
.project
.DEFAULT_PROJECT
180 self
.application
.onboard(
187 rpc_op
= RPC_PACKAGE_CREATE_ENDPOINT
.from_dict({
188 "transaction_id": transaction_id
,
189 "project_name": project
,
195 class UpdateRpcHandler(mano_dts
.AbstractRpcHandler
):
196 def __init__(self
, application
):
199 application: UploaderApplication
201 super().__init
__(application
.log
, application
.dts
, application
.loop
)
202 self
.application
= application
206 return "/rw-pkg-mgmt:package-update"
209 def callback(self
, ks_path
, msg
):
211 transaction_id
= str(uuid
.uuid4())
212 log
= self
.application
.get_logger(transaction_id
)
213 log
.message(UpdateStart())
216 if msg
.username
is not None:
217 auth
= (msg
.username
, msg
.password
)
219 self
.application
.update(
223 project
=msg
.project_name
,
226 rpc_op
= RPC_PACKAGE_UPDATE_ENDPOINT
.from_dict({
227 "transaction_id": transaction_id
,
228 "project_name": msg
.project_name
,
234 class UploadStateHandler(state
.StateHandler
):
235 STARTED
= OnboardStart
236 SUCCESS
= OnboardSuccess
237 FAILURE
= OnboardFailure
240 class UpdateStateHandler(state
.StateHandler
):
241 STARTED
= UpdateStart
242 SUCCESS
= UpdateSuccess
243 FAILURE
= UpdateFailure
246 class UpdatePackage(downloader
.DownloaderProtocol
):
248 def __init__(self
, log
, loop
, project
, url
, auth
,
249 onboarder
, uploader
, package_store_map
):
253 self
.project
= project
256 self
.onboarder
= onboarder
257 self
.uploader
= uploader
258 self
.package_store_map
= package_store_map
261 def _update_package(self
, packages
):
263 # Extract package could return multiple packages if
264 # the package is converted
266 with pkg
as temp_package
:
267 package_checksums
= self
.validate_package(temp_package
)
268 stored_package
= self
.update_package(temp_package
)
269 self
.validate_vnfd_fields(temp_package
)
272 self
.extract_charms(temp_package
)
273 self
.extract_scripts(temp_package
)
274 self
.extract_configs(temp_package
)
275 self
.extract_icons(temp_package
)
277 self
.update_descriptors(temp_package
)
280 self
.delete_stored_package(stored_package
)
284 self
.upload_images(temp_package
, package_checksums
)
286 def extract(self
, packages
):
288 self
._update
_package
(packages
)
289 self
.log
.message(UpdateSuccess())
291 except MessageException
as e
:
292 self
.log
.message(e
.msg
)
293 self
.log
.message(UpdateFailure())
295 except Exception as e
:
296 self
.log
.exception(e
)
298 self
.log
.message(UpdateError(str(e
)))
299 self
.log
.message(UpdateFailure())
301 def on_download_succeeded(self
, job
):
302 self
.log
.message(DownloadSuccess("Package downloaded."))
304 extractor
= extract
.UploadPackageExtractor(self
.log
)
305 file_backed_packages
= extractor
.create_packages_from_upload(
306 job
.filename
, job
.filepath
309 self
.extract(file_backed_packages
)
311 def on_download_failed(self
, job
):
312 self
.log
.error(job
.detail
)
313 self
.log
.message(DownloadError("Package download failed. {}".format(job
.detail
)))
314 self
.log
.message(UpdateFailure())
316 def download_package(self
):
318 _
, filename
= tempfile
.mkstemp()
319 url_downloader
= downloader
.UrlDownloader(
323 decompress_on_fly
=True,
325 url_downloader
.delegate
= self
326 url_downloader
.download()
328 def get_package_store(self
, package
):
329 return self
.package_store_map
[package
.descriptor_type
]
331 def update_package(self
, package
):
332 store
= self
.get_package_store(package
)
335 store
.update_package(package
)
336 except rift
.package
.store
.PackageNotFoundError
as e
:
337 # If the package doesn't exist, then it is possible the descriptor was onboarded
338 # out of band. In that case, just store the package as is
339 self
.log
.warning("Package not found, storing new package instead.")
340 store
.store_package(package
)
342 stored_package
= store
.get_package(package
.descriptor_id
)
344 return stored_package
346 def delete_stored_package(self
, package
):
347 self
.log
.info("Deleting stored package: %s", package
)
348 store
= self
.get_package_store(package
)
350 store
.delete_package(package
.descriptor_id
)
351 except Exception as e
:
352 self
.log
.warning("Failed to delete package from store: %s", str(e
))
354 def upload_images(self
, package
, package_checksums
):
355 image_file_map
= rift
.package
.image
.get_package_image_files(package
)
356 name_hdl_map
= {name
: package
.open(image_file_map
[name
]) for name
in image_file_map
}
357 if not image_file_map
:
361 for image_name
, image_hdl
in name_hdl_map
.items():
362 image_file
= image_file_map
[image_name
]
363 if image_file
in package_checksums
:
364 image_checksum
= package_checksums
[image_file
]
366 self
.log
.warning("checksum not provided for image %s. Calculating checksum",
368 image_checksum
= rift
.package
.checksums
.checksum(
369 package
.open(image_file_map
[image_name
])
372 self
.uploader
.upload_image(image_name
, image_checksum
, image_hdl
)
373 self
.uploader
.upload_image_to_cloud_accounts(image_name
, image_checksum
, self
.project
)
375 except image
.ImageUploadError
as e
:
376 self
.log
.exception("Failed to upload image: %s", image_name
)
377 raise MessageException(OnboardImageUploadError(str(e
))) from e
380 _
= [image_hdl
.close() for image_hdl
in name_hdl_map
.values()]
382 def extract_charms(self
, package
):
384 charm_extractor
= rift
.package
.charm
.PackageCharmExtractor(self
.log
)
385 charm_extractor
.extract_charms(package
)
386 except rift
.package
.charm
.CharmExtractionError
as e
:
387 raise MessageException(UpdateExtractionError()) from e
389 def extract_scripts(self
, package
):
391 script_extractor
= rift
.package
.script
.PackageScriptExtractor(self
.log
)
392 script_extractor
.extract_scripts(package
)
393 except rift
.package
.script
.ScriptExtractionError
as e
:
394 raise MessageException(UpdateExtractionError()) from e
396 def extract_configs(self
, package
):
398 config_extractor
= rift
.package
.config
.PackageConfigExtractor(self
.log
)
399 config_extractor
.extract_configs(package
)
400 except rift
.package
.config
.ConfigExtractionError
as e
:
401 raise MessageException(UpdateExtractionError()) from e
403 def extract_icons(self
, package
):
405 icon_extractor
= rift
.package
.icon
.PackageIconExtractor(self
.log
)
406 icon_extractor
.extract_icons(package
)
407 except rift
.package
.icon
.IconExtractionError
as e
:
408 raise MessageException(UpdateExtractionError()) from e
410 def validate_vnfd_fields(self
, package
):
411 # We can add more VNFD validations here. Currently we are validating only cloud-init
412 if package
.descriptor_msg
is not None:
413 self
.validate_cloud_init_file(package
)
415 def validate_cloud_init_file(self
, package
):
416 """ This validation is for VNFDs with associated VDUs. """
417 if 'vdu' in package
.descriptor_msg
.as_dict():
418 for vdu
in package
.descriptor_msg
.as_dict()['vdu']:
419 if 'cloud_init_file' in vdu
:
420 cloud_init_file
= vdu
['cloud_init_file']
421 for file in package
.files
:
422 if file.endswith('/' + cloud_init_file
) is True:
424 raise MessageException(
425 OnboardError("Cloud-Init file reference in VNFD does not match with cloud-init file"))
427 def validate_package(self
, package
):
428 checksum_validator
= rift
.package
.package
.PackageChecksumValidator(self
.log
)
431 file_checksums
= checksum_validator
.validate(package
)
432 except rift
.package
.package
.PackageFileChecksumError
as e
:
433 raise MessageException(UpdateChecksumMismatch(e
.filename
)) from e
434 except rift
.package
.package
.PackageValidationError
as e
:
435 raise MessageException(UpdateUnreadablePackage()) from e
437 return file_checksums
439 def update_descriptors(self
, package
):
440 descriptor_msg
= package
.descriptor_msg
442 self
.log
.message(UpdateDescriptorUpdate())
445 self
.onboarder
.update(descriptor_msg
, project
=self
.project
)
446 except onboard
.UpdateError
as e
:
447 raise MessageException(UpdateDescriptorError(package
.descriptor_file
)) from e
450 class OnboardPackage(downloader
.DownloaderProtocol
):
452 def __init__(self
, log
, loop
, project
, url
, auth
,
453 onboarder
, uploader
, package_store_map
):
456 self
.project
= project
459 self
.onboarder
= onboarder
460 self
.uploader
= uploader
461 self
.package_store_map
= package_store_map
462 self
.project
= project
464 def _onboard_package(self
, packages
):
465 # Extract package could return multiple packages if
466 # the package is converted
468 with pkg
as temp_package
:
469 package_checksums
= self
.validate_package(temp_package
)
470 stored_package
= self
.store_package(temp_package
)
471 self
.validate_vnfd_fields(temp_package
)
474 self
.extract_charms(temp_package
)
475 self
.extract_scripts(temp_package
)
476 self
.extract_configs(temp_package
)
477 self
.extract_icons(temp_package
)
479 self
.onboard_descriptors(temp_package
)
482 self
.delete_stored_package(stored_package
)
486 self
.upload_images(temp_package
, package_checksums
)
488 def extract(self
, packages
):
490 self
._onboard
_package
(packages
)
491 self
.log
.message(OnboardSuccess())
493 except MessageException
as e
:
494 self
.log
.message(e
.msg
)
495 self
.log
.message(OnboardFailure())
497 except Exception as e
:
498 self
.log
.exception(e
)
500 self
.log
.message(OnboardError(str(e
)))
501 self
.log
.message(OnboardFailure())
503 def on_download_succeeded(self
, job
):
504 self
.log
.message(DownloadSuccess("Package downloaded."))
506 extractor
= extract
.UploadPackageExtractor(self
.log
)
507 file_backed_packages
= extractor
.create_packages_from_upload(
508 job
.filename
, job
.filepath
511 self
.extract(file_backed_packages
)
513 def on_download_failed(self
, job
):
514 self
.log
.error(job
.detail
)
515 self
.log
.message(DownloadError("Package download failed. {}".format(job
.detail
)))
516 self
.log
.message(OnboardFailure())
518 def download_package(self
):
520 _
, filename
= tempfile
.mkstemp()
521 url_downloader
= downloader
.UrlDownloader(
525 decompress_on_fly
=True,
527 url_downloader
.delegate
= self
528 url_downloader
.download()
530 def get_package_store(self
, package
):
531 return self
.package_store_map
[package
.descriptor_type
]
533 def store_package(self
, package
):
534 store
= self
.get_package_store(package
)
537 store
.store_package(package
)
538 except rift
.package
.store
.PackageExistsError
as e
:
539 store
.update_package(package
)
541 stored_package
= store
.get_package(package
.descriptor_id
)
543 return stored_package
545 def delete_stored_package(self
, package
):
546 self
.log
.info("Deleting stored package: %s", package
)
547 store
= self
.get_package_store(package
)
549 store
.delete_package(package
.descriptor_id
)
550 except Exception as e
:
551 self
.log
.warning("Failed to delete package from store: %s", str(e
))
553 def upload_images(self
, package
, package_checksums
):
554 image_file_map
= rift
.package
.image
.get_package_image_files(package
)
555 if not image_file_map
:
558 name_hdl_map
= {name
: package
.open(image_file_map
[name
]) for name
in image_file_map
}
560 for image_name
, image_hdl
in name_hdl_map
.items():
561 image_file
= image_file_map
[image_name
]
562 if image_file
in package_checksums
:
563 image_checksum
= package_checksums
[image_file
]
565 self
.log
.warning("checksum not provided for image %s. Calculating checksum",
567 image_checksum
= rift
.package
.checksums
.checksum(
568 package
.open(image_file_map
[image_name
])
571 self
.uploader
.upload_image(image_name
, image_checksum
, image_hdl
)
572 self
.uploader
.upload_image_to_cloud_accounts(image_name
, image_checksum
)
574 except image
.ImageUploadError
as e
:
575 raise MessageException(OnboardImageUploadError()) from e
578 _
= [image_hdl
.close() for image_hdl
in name_hdl_map
.values()]
580 def extract_charms(self
, package
):
582 charm_extractor
= rift
.package
.charm
.PackageCharmExtractor(self
.log
)
583 charm_extractor
.extract_charms(package
)
584 except rift
.package
.charm
.CharmExtractionError
as e
:
585 raise MessageException(OnboardExtractionError()) from e
587 def extract_scripts(self
, package
):
589 script_extractor
= rift
.package
.script
.PackageScriptExtractor(self
.log
)
590 script_extractor
.extract_scripts(package
)
591 except rift
.package
.script
.ScriptExtractionError
as e
:
592 raise MessageException(OnboardExtractionError()) from e
594 def extract_configs(self
, package
):
596 config_extractor
= rift
.package
.config
.PackageConfigExtractor(self
.log
)
597 config_extractor
.extract_configs(package
)
598 except rift
.package
.config
.ConfigExtractionError
as e
:
599 raise MessageException(OnboardExtractionError()) from e
601 def extract_icons(self
, package
):
603 icon_extractor
= rift
.package
.icon
.PackageIconExtractor(self
.log
)
604 icon_extractor
.extract_icons(package
)
605 except rift
.package
.icon
.IconExtractionError
as e
:
606 raise MessageException(OnboardExtractionError()) from e
608 def validate_vnfd_fields(self
, package
):
609 # We can add more VNFD validations here. Currently we are validating only cloud-init
610 if package
.descriptor_msg
is not None:
611 self
.validate_cloud_init_file(package
)
613 def validate_cloud_init_file(self
, package
):
614 """ This validation is for VNFDs with associated VDUs. """
615 if 'vdu' in package
.descriptor_msg
.as_dict():
616 for vdu
in package
.descriptor_msg
.as_dict()['vdu']:
617 if 'cloud_init_file' in vdu
:
618 cloud_init_file
= vdu
['cloud_init_file']
619 for file in package
.files
:
620 if file.endswith('/' + cloud_init_file
) is True:
622 raise MessageException(
623 OnboardError("Cloud-Init file reference in VNFD does not match with cloud-init file"))
625 def validate_package(self
, package
):
626 checksum_validator
= rift
.package
.package
.PackageChecksumValidator(self
.log
)
629 file_checksums
= checksum_validator
.validate(package
)
630 except rift
.package
.package
.PackageFileChecksumError
as e
:
631 raise MessageException(OnboardChecksumMismatch(e
.filename
)) from e
632 except rift
.package
.package
.PackageValidationError
as e
:
633 raise MessageException(OnboardUnreadablePackage()) from e
635 return file_checksums
637 def onboard_descriptors(self
, package
):
638 descriptor_msg
= package
.descriptor_msg
640 self
.log
.message(OnboardDescriptorOnboard())
643 self
.onboarder
.onboard(descriptor_msg
, project
=self
.project
)
644 except onboard
.OnboardError
as e
:
645 raise MessageException(OnboardDescriptorError(package
.descriptor_file
)) from e
648 class UploaderApplication(tornado
.web
.Application
):
651 def from_tasklet(cls
, tasklet
):
652 manifest
= tasklet
.tasklet_info
.get_pb_manifest()
653 use_ssl
= manifest
.bootstrap_phase
.rwsecurity
.use_ssl
654 ssl_cert
= manifest
.bootstrap_phase
.rwsecurity
.cert
655 ssl_key
= manifest
.bootstrap_phase
.rwsecurity
.key
658 ssl
=(ssl_cert
, ssl_key
),
659 vnfd_store
=tasklet
.vnfd_package_store
,
660 nsd_store
=tasklet
.nsd_package_store
)
669 self
.log
= tasklet
.log
670 self
.loop
= tasklet
.loop
671 self
.dts
= tasklet
.dts
676 self
.ssl_cert
, self
.ssl_key
= None, None
679 self
.ssl_cert
, self
.ssl_key
= ssl
682 vnfd_store
= rift
.package
.store
.VnfdPackageFilesystemStore(self
.log
)
685 nsd_store
= rift
.package
.store
.NsdPackageFilesystemStore(self
.log
)
687 self
.messages
= collections
.defaultdict(list)
688 self
.export_dir
= os
.path
.join(os
.environ
['RIFT_ARTIFACTS'], 'launchpad/exports')
690 self
.uploader
= image
.ImageUploader(self
.log
, self
.loop
, self
.dts
)
691 self
.onboarder
= onboard
.DescriptorOnboarder(
692 self
.log
, "127.0.0.1", 8008, self
.use_ssl
, self
.ssl_cert
, self
.ssl_key
694 self
.package_store_map
= {
699 self
.exporter
= export
.DescriptorPackageArchiveExporter(self
.log
)
700 self
.loop
.create_task(export
.periodic_export_cleanup(self
.log
, self
.loop
, self
.export_dir
))
702 self
.get_vnfd_catalog
= tasklet
.get_vnfd_catalog
703 self
.get_nsd_catalog
= tasklet
.get_nsd_catalog
705 "vnfd": self
.get_vnfd_catalog
,
706 "nsd": self
.get_nsd_catalog
709 self
.upload_handler
= UploadRpcHandler(self
)
710 self
.update_handler
= UpdateRpcHandler(self
)
711 self
.export_handler
= export
.ExportRpcHandler(self
, catalog_map
)
713 attrs
= dict(log
=self
.log
, loop
=self
.loop
)
715 super(UploaderApplication
, self
).__init
__([
716 (r
"/api/package/vnfd/(.*)", pkg_handler
.FileRestApiHandler
, {
717 'path': vnfd_store
.root_dir
}),
718 (r
"/api/package/nsd/(.*)", pkg_handler
.FileRestApiHandler
, {
719 'path': nsd_store
.root_dir
}),
721 (r
"/api/upload/([^/]+)/state", UploadStateHandler
, attrs
),
722 (r
"/api/update/([^/]+)/state", UpdateStateHandler
, attrs
),
723 (r
"/api/export/([^/]+)/state", export
.ExportStateHandler
, attrs
),
725 (r
"/api/export/([^/]+.tar.gz)", tornado
.web
.StaticFileHandler
, {
726 "path": self
.export_dir
,
728 (r
"/api/export/([^/]+.zip)", tornado
.web
.StaticFileHandler
, {
729 "path": self
.export_dir
,
735 yield from self
.upload_handler
.register()
736 yield from self
.update_handler
.register()
737 yield from self
.export_handler
.register()
739 def get_logger(self
, transaction_id
):
740 return message
.Logger(self
.log
, self
.messages
[transaction_id
])
742 def onboard(self
, url
, transaction_id
, auth
=None, project
=None):
743 log
= message
.Logger(self
.log
, self
.messages
[transaction_id
])
745 onboard_package
= OnboardPackage(
753 self
.package_store_map
,
756 self
.loop
.run_in_executor(None, onboard_package
.download_package
)
758 def update(self
, url
, transaction_id
, auth
=None, project
=None):
759 log
= message
.Logger(self
.log
, self
.messages
[transaction_id
])
761 update_package
= UpdatePackage(
769 self
.package_store_map
,
772 self
.loop
.run_in_executor(None, update_package
.download_package
)