3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
30 import tornado
.httputil
31 import tornadostreamform
.multipart_streamer
as multipart_streamer
35 # disable unsigned certificate warning
36 from requests
.packages
.urllib3
.exceptions
import InsecureRequestWarning
37 requests
.packages
.urllib3
.disable_warnings(InsecureRequestWarning
)
40 gi
.require_version('RwLaunchpadYang', '1.0')
41 gi
.require_version('NsdYang', '1.0')
42 gi
.require_version('VnfdYang', '1.0')
44 from gi
.repository
import (
48 import rift
.mano
.cloud
50 import rift
.package
.charm
51 import rift
.package
.checksums
52 import rift
.package
.config
53 import rift
.package
.convert
54 import rift
.package
.handler
as pkg_handler
55 import rift
.package
.icon
56 import rift
.package
.package
57 import rift
.package
.script
58 import rift
.package
.store
60 from gi
.repository
import (
63 import rift
.downloader
as downloader
64 import rift
.mano
.dts
as mano_dts
76 from .message
import (
79 # Onboard Error Messages
80 OnboardChecksumMismatch
,
81 OnboardDescriptorError
,
82 OnboardDescriptorExistsError
,
83 OnboardDescriptorFormatError
,
85 OnboardExtractionError
,
86 OnboardImageUploadError
,
87 OnboardMissingContentBoundary
,
88 OnboardMissingContentType
,
89 OnboardMissingTerminalBoundary
,
90 OnboardUnreadableHeaders
,
91 OnboardUnreadablePackage
,
92 OnboardUnsupportedMediaType
,
94 # Onboard Status Messages
95 OnboardDescriptorOnboard
,
99 OnboardPackageValidation
,
106 # Update Error Messages
107 UpdateChecksumMismatch
,
108 UpdateDescriptorError
,
109 UpdateDescriptorFormatError
,
111 UpdateExtractionError
,
112 UpdateImageUploadError
,
113 UpdateMissingContentBoundary
,
114 UpdateMissingContentType
,
115 UpdatePackageNotFoundError
,
116 UpdateUnreadableHeaders
,
117 UpdateUnreadablePackage
,
118 UpdateUnsupportedMediaType
,
120 # Update Status Messages
121 UpdateDescriptorUpdate
,
122 UpdateDescriptorUpdated
,
129 from .tosca
import ExportTosca
# Five times the ``GB`` unit (``GB`` is defined outside this span).
# NOTE(review): presumably the cap on a streamed upload -- confirm at the use site.
MAX_STREAMED_SIZE = 5 * GB

# Output message classes of the package-create / package-update RPCs.  The
# RPC handlers in this module build their replies from them via ``from_dict``.
RPC_PACKAGE_CREATE_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageCreate
RPC_PACKAGE_UPDATE_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageUpdate
142 class HttpMessageError(Exception):
143 def __init__(self
, code
, msg
):
148 class UploadRpcHandler(mano_dts
.AbstractRpcHandler
):
149 def __init__(self
, log
, dts
, loop
, application
):
152 application: UploaderApplication
154 super().__init
__(log
, dts
, loop
)
155 self
.application
= application
159 return "/rw-pkg-mgmt:package-create"
162 def callback(self
, ks_path
, msg
):
163 transaction_id
= str(uuid
.uuid4())
164 log
= self
.application
.get_logger(transaction_id
)
165 log
.message(OnboardStart())
169 if msg
.username
is not None:
170 auth
= (msg
.username
, msg
.password
)
172 self
.application
.onboard(
178 rpc_op
= RPC_PACKAGE_CREATE_ENDPOINT
.from_dict({
179 "transaction_id": transaction_id
})
184 class UpdateRpcHandler(mano_dts
.AbstractRpcHandler
):
185 def __init__(self
, log
, dts
, loop
, application
):
188 application: UploaderApplication
190 super().__init
__(log
, dts
, loop
)
191 self
.application
= application
195 return "/rw-pkg-mgmt:package-update"
198 def callback(self
, ks_path
, msg
):
200 transaction_id
= str(uuid
.uuid4())
201 log
= self
.application
.get_logger(transaction_id
)
202 log
.message(UpdateStart())
205 if msg
.username
is not None:
206 auth
= (msg
.username
, msg
.password
)
208 self
.application
.update(
214 rpc_op
= RPC_PACKAGE_UPDATE_ENDPOINT
.from_dict({
215 "transaction_id": transaction_id
})
class UploadStateHandler(state.StateHandler):
    """State handler wired to the onboard (upload) message classes.

    The class only sets three attributes naming the messages that mark the
    start, success and failure of an onboard transaction.
    NOTE(review): presumably read by ``state.StateHandler`` -- confirm there.
    """

    STARTED, SUCCESS, FAILURE = OnboardStart, OnboardSuccess, OnboardFailure
class UpdateStateHandler(state.StateHandler):
    """State handler wired to the update message classes.

    The class only sets three attributes naming the messages that mark the
    start, success and failure of an update transaction.
    NOTE(review): presumably read by ``state.StateHandler`` -- confirm there.
    """

    STARTED, SUCCESS, FAILURE = UpdateStart, UpdateSuccess, UpdateFailure
232 class UpdatePackage(downloader
.DownloaderProtocol
):
234 def __init__(self
, log
, loop
, url
, auth
,
235 onboarder
, uploader
, package_store_map
):
241 self
.onboarder
= onboarder
242 self
.uploader
= uploader
243 self
.package_store_map
= package_store_map
246 def _update_package(self
, packages
):
248 # Extract package could return multiple packages if
249 # the package is converted
251 with pkg
as temp_package
:
252 package_checksums
= self
.validate_package(temp_package
)
253 stored_package
= self
.update_package(temp_package
)
254 self
.validate_vnfd_fields(temp_package
)
257 self
.extract_charms(temp_package
)
258 self
.extract_scripts(temp_package
)
259 self
.extract_configs(temp_package
)
260 self
.extract_icons(temp_package
)
262 self
.update_descriptors(temp_package
)
265 self
.delete_stored_package(stored_package
)
269 self
.upload_images(temp_package
, package_checksums
)
271 def extract(self
, packages
):
273 self
._update
_package
(packages
)
274 self
.log
.message(UpdateSuccess())
276 except MessageException
as e
:
277 self
.log
.message(e
.msg
)
278 self
.log
.message(UpdateFailure())
280 except Exception as e
:
281 self
.log
.exception(e
)
283 self
.log
.message(UpdateError(str(e
)))
284 self
.log
.message(UpdateFailure())
def on_download_succeeded(self, job):
    """Delegate hook: unpack the downloaded file and run the update.

    Logs a ``DownloadSuccess`` message, builds package objects from the
    file identified by ``job.filename`` / ``job.filepath`` and passes them
    to :meth:`extract`.

    Args:
        job: download job; only ``filename`` and ``filepath`` are read.
    """
    self.log.message(DownloadSuccess("Package downloaded."))

    upload_extractor = extract.UploadPackageExtractor(self.log)
    packages = upload_extractor.create_packages_from_upload(
        job.filename,
        job.filepath,
    )

    self.extract(packages)
def on_download_failed(self, job):
    """Delegate hook: report a failed download on the transaction log.

    Writes ``job.detail`` as an error, then emits a ``DownloadError``
    message carrying the same detail, followed by ``UpdateFailure``.

    Args:
        job: download job; only ``detail`` is read.
    """
    reason = job.detail
    self.log.error(reason)

    failure_text = "Package download failed. {}".format(reason)
    self.log.message(DownloadError(failure_text))
    self.log.message(UpdateFailure())
301 def download_package(self
):
303 _
, filename
= tempfile
.mkstemp()
304 url_downloader
= downloader
.UrlDownloader(
308 decompress_on_fly
=True,
310 url_downloader
.delegate
= self
311 url_downloader
.download()
def get_package_store(self, package):
    """Return the store registered for ``package.descriptor_type``.

    Raises:
        KeyError: if ``self.package_store_map`` has no entry for that type.
    """
    descriptor_type = package.descriptor_type
    return self.package_store_map[descriptor_type]
def update_package(self, package):
    """Update ``package`` in its store and return the stored copy.

    When the store raises ``PackageNotFoundError`` the package is stored as
    a new one instead.

    Returns:
        Whatever ``store.get_package(package.descriptor_id)`` returns after
        the update or store.
    """
    pkg_store = self.get_package_store(package)

    try:
        pkg_store.update_package(package)
    except rift.package.store.PackageNotFoundError:
        # If the package doesn't exist, then it is possible the descriptor
        # was onboarded out of band. In that case, just store the package
        # as is.
        self.log.warning("Package not found, storing new package instead.")
        pkg_store.store_package(package)

    return pkg_store.get_package(package.descriptor_id)
def delete_stored_package(self, package):
    """Best-effort removal of ``package`` from its store.

    Any exception raised by the store's ``delete_package`` is logged as a
    warning and swallowed, so this method does not propagate delete
    failures.
    """
    self.log.info("Deleting stored package: %s", package)
    pkg_store = self.get_package_store(package)

    try:
        pkg_store.delete_package(package.descriptor_id)
    except Exception as err:
        # Deliberately broad: a failed delete must not mask the caller's error.
        self.log.warning("Failed to delete package from store: %s", str(err))
339 def upload_images(self
, package
, package_checksums
):
340 image_file_map
= rift
.package
.image
.get_package_image_files(package
)
341 name_hdl_map
= {name
: package
.open(image_file_map
[name
]) for name
in image_file_map
}
342 if not image_file_map
:
346 for image_name
, image_hdl
in name_hdl_map
.items():
347 image_file
= image_file_map
[image_name
]
348 if image_file
in package_checksums
:
349 image_checksum
= package_checksums
[image_file
]
351 self
.log
.warning("checksum not provided for image %s. Calculating checksum",
353 image_checksum
= rift
.package
.checksums
.checksum(
354 package
.open(image_file_map
[image_name
])
357 self
.uploader
.upload_image(image_name
, image_checksum
, image_hdl
)
358 self
.uploader
.upload_image_to_cloud_accounts(image_name
, image_checksum
)
360 except image
.ImageUploadError
as e
:
361 self
.log
.exception("Failed to upload image: %s", image_name
)
362 raise MessageException(OnboardImageUploadError(str(e
))) from e
365 _
= [image_hdl
.close() for image_hdl
in name_hdl_map
.values()]
def extract_charms(self, package):
    """Run the package charm extractor over ``package``.

    Raises:
        MessageException: wrapping ``UpdateExtractionError`` when the
            extractor raises ``CharmExtractionError``.
    """
    try:
        rift.package.charm.PackageCharmExtractor(self.log).extract_charms(package)
    except rift.package.charm.CharmExtractionError as err:
        raise MessageException(UpdateExtractionError()) from err
def extract_scripts(self, package):
    """Run the package script extractor over ``package``.

    Raises:
        MessageException: wrapping ``UpdateExtractionError`` when the
            extractor raises ``ScriptExtractionError``.
    """
    try:
        rift.package.script.PackageScriptExtractor(self.log).extract_scripts(package)
    except rift.package.script.ScriptExtractionError as err:
        raise MessageException(UpdateExtractionError()) from err
def extract_configs(self, package):
    """Run the package config extractor over ``package``.

    Raises:
        MessageException: wrapping ``UpdateExtractionError`` when the
            extractor raises ``ConfigExtractionError``.
    """
    try:
        rift.package.config.PackageConfigExtractor(self.log).extract_configs(package)
    except rift.package.config.ConfigExtractionError as err:
        raise MessageException(UpdateExtractionError()) from err
def extract_icons(self, package):
    """Run the package icon extractor over ``package``.

    Raises:
        MessageException: wrapping ``UpdateExtractionError`` when the
            extractor raises ``IconExtractionError``.
    """
    try:
        rift.package.icon.PackageIconExtractor(self.log).extract_icons(package)
    except rift.package.icon.IconExtractionError as err:
        raise MessageException(UpdateExtractionError()) from err
def validate_vnfd_fields(self, package):
    """Run descriptor field validations on ``package``.

    Does nothing when ``package.descriptor_msg`` is ``None``; otherwise
    delegates to :meth:`validate_cloud_init_file`.
    """
    if package.descriptor_msg is None:
        return

    # More VNFD validations can be added here; cloud-init is the only one today.
    self.validate_cloud_init_file(package)
400 def validate_cloud_init_file(self
, package
):
401 """ This validation is for VNFDs with associated VDUs. """
402 if 'vdu' in package
.descriptor_msg
.as_dict():
403 for vdu
in package
.descriptor_msg
.as_dict()['vdu']:
404 if 'cloud_init_file' in vdu
:
405 cloud_init_file
= vdu
['cloud_init_file']
406 for file in package
.files
:
407 if file.endswith('/' + cloud_init_file
) is True:
409 raise MessageException(
410 OnboardError("Cloud-Init file reference in VNFD does not match with cloud-init file"))
def validate_package(self, package):
    """Validate ``package`` with the checksum validator.

    Returns:
        The value returned by ``PackageChecksumValidator.validate``; it is
        later handed to :meth:`upload_images` as the package checksums.

    Raises:
        MessageException: wrapping ``UpdateChecksumMismatch`` (with the
            offending filename) on ``PackageFileChecksumError``, or
            ``UpdateUnreadablePackage`` on ``PackageValidationError``.
    """
    validator = rift.package.package.PackageChecksumValidator(self.log)

    try:
        return validator.validate(package)
    except rift.package.package.PackageFileChecksumError as err:
        raise MessageException(UpdateChecksumMismatch(err.filename)) from err
    except rift.package.package.PackageValidationError as err:
        raise MessageException(UpdateUnreadablePackage()) from err
def update_descriptors(self, package):
    """Send the package's descriptor message to the onboarder as an update.

    Emits ``UpdateDescriptorUpdate`` on the transaction log before calling
    ``self.onboarder.update``.

    Raises:
        MessageException: wrapping ``UpdateDescriptorError`` (with
            ``package.descriptor_file``) when the onboarder raises
            ``onboard.UpdateError``.
    """
    msg = package.descriptor_msg

    self.log.message(UpdateDescriptorUpdate())

    try:
        self.onboarder.update(msg)
    except onboard.UpdateError as err:
        raise MessageException(
            UpdateDescriptorError(package.descriptor_file)
        ) from err
435 class OnboardPackage(downloader
.DownloaderProtocol
):
437 def __init__(self
, log
, loop
, url
, auth
,
438 onboarder
, uploader
, package_store_map
):
443 self
.onboarder
= onboarder
444 self
.uploader
= uploader
445 self
.package_store_map
= package_store_map
447 def _onboard_package(self
, packages
):
448 # Extract package could return multiple packages if
449 # the package is converted
451 with pkg
as temp_package
:
452 package_checksums
= self
.validate_package(temp_package
)
453 stored_package
= self
.store_package(temp_package
)
454 self
.validate_vnfd_fields(temp_package
)
457 self
.extract_charms(temp_package
)
458 self
.extract_scripts(temp_package
)
459 self
.extract_configs(temp_package
)
460 self
.extract_icons(temp_package
)
462 self
.onboard_descriptors(temp_package
)
465 self
.delete_stored_package(stored_package
)
469 self
.upload_images(temp_package
, package_checksums
)
471 def extract(self
, packages
):
473 self
._onboard
_package
(packages
)
474 self
.log
.message(OnboardSuccess())
476 except MessageException
as e
:
477 self
.log
.message(e
.msg
)
478 self
.log
.message(OnboardFailure())
480 except Exception as e
:
481 self
.log
.exception(e
)
483 self
.log
.message(OnboardError(str(e
)))
484 self
.log
.message(OnboardFailure())
def on_download_succeeded(self, job):
    """Delegate hook: unpack the downloaded file and run the onboarding.

    Logs a ``DownloadSuccess`` message, builds package objects from the
    file identified by ``job.filename`` / ``job.filepath`` and passes them
    to :meth:`extract`.

    Args:
        job: download job; only ``filename`` and ``filepath`` are read.
    """
    self.log.message(DownloadSuccess("Package downloaded."))

    upload_extractor = extract.UploadPackageExtractor(self.log)
    packages = upload_extractor.create_packages_from_upload(
        job.filename,
        job.filepath,
    )

    self.extract(packages)
def on_download_failed(self, job):
    """Delegate hook: report a failed download on the transaction log.

    Writes ``job.detail`` as an error, then emits a ``DownloadError``
    message carrying the same detail, followed by ``OnboardFailure``.

    Args:
        job: download job; only ``detail`` is read.
    """
    reason = job.detail
    self.log.error(reason)

    failure_text = "Package download failed. {}".format(reason)
    self.log.message(DownloadError(failure_text))
    self.log.message(OnboardFailure())
501 def download_package(self
):
503 _
, filename
= tempfile
.mkstemp()
504 url_downloader
= downloader
.UrlDownloader(
508 decompress_on_fly
=True,
510 url_downloader
.delegate
= self
511 url_downloader
.download()
def get_package_store(self, package):
    """Return the store registered for ``package.descriptor_type``.

    Raises:
        KeyError: if ``self.package_store_map`` has no entry for that type.
    """
    descriptor_type = package.descriptor_type
    return self.package_store_map[descriptor_type]
def store_package(self, package):
    """Store ``package`` in its store and return the stored copy.

    When the store raises ``PackageExistsError`` the existing entry is
    updated with ``package`` instead.

    Returns:
        Whatever ``store.get_package(package.descriptor_id)`` returns after
        the store or update.
    """
    pkg_store = self.get_package_store(package)

    try:
        pkg_store.store_package(package)
    except rift.package.store.PackageExistsError:
        # Already present: overwrite the stored copy rather than failing.
        pkg_store.update_package(package)

    return pkg_store.get_package(package.descriptor_id)
def delete_stored_package(self, package):
    """Best-effort removal of ``package`` from its store.

    Any exception raised by the store's ``delete_package`` is logged as a
    warning and swallowed, so this method does not propagate delete
    failures.
    """
    self.log.info("Deleting stored package: %s", package)
    pkg_store = self.get_package_store(package)

    try:
        pkg_store.delete_package(package.descriptor_id)
    except Exception as err:
        # Deliberately broad: a failed delete must not mask the caller's error.
        self.log.warning("Failed to delete package from store: %s", str(err))
536 def upload_images(self
, package
, package_checksums
):
537 image_file_map
= rift
.package
.image
.get_package_image_files(package
)
538 if not image_file_map
:
541 name_hdl_map
= {name
: package
.open(image_file_map
[name
]) for name
in image_file_map
}
543 for image_name
, image_hdl
in name_hdl_map
.items():
544 image_file
= image_file_map
[image_name
]
545 if image_file
in package_checksums
:
546 image_checksum
= package_checksums
[image_file
]
548 self
.log
.warning("checksum not provided for image %s. Calculating checksum",
550 image_checksum
= rift
.package
.checksums
.checksum(
551 package
.open(image_file_map
[image_name
])
554 self
.uploader
.upload_image(image_name
, image_checksum
, image_hdl
)
555 self
.uploader
.upload_image_to_cloud_accounts(image_name
, image_checksum
)
557 except image
.ImageUploadError
as e
:
558 raise MessageException(OnboardImageUploadError()) from e
561 _
= [image_hdl
.close() for image_hdl
in name_hdl_map
.values()]
def extract_charms(self, package):
    """Run the package charm extractor over ``package``.

    Raises:
        MessageException: wrapping ``OnboardExtractionError`` when the
            extractor raises ``CharmExtractionError``.
    """
    try:
        rift.package.charm.PackageCharmExtractor(self.log).extract_charms(package)
    except rift.package.charm.CharmExtractionError as err:
        raise MessageException(OnboardExtractionError()) from err
def extract_scripts(self, package):
    """Run the package script extractor over ``package``.

    Raises:
        MessageException: wrapping ``OnboardExtractionError`` when the
            extractor raises ``ScriptExtractionError``.
    """
    try:
        rift.package.script.PackageScriptExtractor(self.log).extract_scripts(package)
    except rift.package.script.ScriptExtractionError as err:
        raise MessageException(OnboardExtractionError()) from err
def extract_configs(self, package):
    """Run the package config extractor over ``package``.

    Raises:
        MessageException: wrapping ``OnboardExtractionError`` when the
            extractor raises ``ConfigExtractionError``.
    """
    try:
        rift.package.config.PackageConfigExtractor(self.log).extract_configs(package)
    except rift.package.config.ConfigExtractionError as err:
        raise MessageException(OnboardExtractionError()) from err
def extract_icons(self, package):
    """Run the package icon extractor over ``package``.

    Raises:
        MessageException: wrapping ``OnboardExtractionError`` when the
            extractor raises ``IconExtractionError``.
    """
    try:
        rift.package.icon.PackageIconExtractor(self.log).extract_icons(package)
    except rift.package.icon.IconExtractionError as err:
        raise MessageException(OnboardExtractionError()) from err
def validate_vnfd_fields(self, package):
    """Run descriptor field validations on ``package``.

    Does nothing when ``package.descriptor_msg`` is ``None``; otherwise
    delegates to :meth:`validate_cloud_init_file`.
    """
    if package.descriptor_msg is None:
        return

    # More VNFD validations can be added here; cloud-init is the only one today.
    self.validate_cloud_init_file(package)
596 def validate_cloud_init_file(self
, package
):
597 """ This validation is for VNFDs with associated VDUs. """
598 if 'vdu' in package
.descriptor_msg
.as_dict():
599 for vdu
in package
.descriptor_msg
.as_dict()['vdu']:
600 if 'cloud_init_file' in vdu
:
601 cloud_init_file
= vdu
['cloud_init_file']
602 for file in package
.files
:
603 if file.endswith('/' + cloud_init_file
) is True:
605 raise MessageException(
606 OnboardError("Cloud-Init file reference in VNFD does not match with cloud-init file"))
def validate_package(self, package):
    """Run the checksum and construct validators over ``package``.

    The validators run in that order; the first failure aborts validation.

    Returns:
        The ``checksums`` attribute of the checksum validator after both
        validators have run.

    Raises:
        MessageException: wrapping ``OnboardChecksumMismatch`` (with the
            offending filename) on ``PackageFileChecksumError``, or
            ``OnboardUnreadablePackage`` on ``PackageValidationError``.
    """
    checksum_validator = rift.package.package.PackageChecksumValidator(self.log)
    construct_validator = rift.package.package.PackageConstructValidator(self.log)

    # Run the validators for checksum and package construction for imported pkgs
    try:
        for check in (checksum_validator, construct_validator):
            check.validate(package)
    except rift.package.package.PackageFileChecksumError as err:
        raise MessageException(OnboardChecksumMismatch(err.filename)) from err
    except rift.package.package.PackageValidationError as err:
        raise MessageException(OnboardUnreadablePackage()) from err

    return checksum_validator.checksums
def onboard_descriptors(self, package):
    """Send the package's descriptor message to the onboarder.

    Emits ``OnboardDescriptorOnboard`` on the transaction log before
    calling ``self.onboarder.onboard``.

    Raises:
        MessageException: wrapping ``OnboardDescriptorError`` (with
            ``package.descriptor_file``) when the onboarder raises
            ``onboard.OnboardError``.
    """
    msg = package.descriptor_msg

    self.log.message(OnboardDescriptorOnboard())

    try:
        self.onboarder.onboard(msg)
    except onboard.OnboardError as err:
        raise MessageException(
            OnboardDescriptorError(package.descriptor_file)
        ) from err
637 class UploaderApplication(tornado
.web
.Application
):
640 def from_tasklet(cls
, tasklet
):
641 manifest
= tasklet
.tasklet_info
.get_pb_manifest()
642 use_ssl
= manifest
.bootstrap_phase
.rwsecurity
.use_ssl
643 ssl_cert
= manifest
.bootstrap_phase
.rwsecurity
.cert
644 ssl_key
= manifest
.bootstrap_phase
.rwsecurity
.key
649 ssl
=(ssl_cert
, ssl_key
),
650 vnfd_store
=tasklet
.vnfd_package_store
,
651 nsd_store
=tasklet
.nsd_package_store
,
652 vnfd_catalog
=tasklet
.vnfd_catalog
,
653 nsd_catalog
=tasklet
.nsd_catalog
)
671 self
.ssl_cert
, self
.ssl_key
= None, None
674 self
.ssl_cert
, self
.ssl_key
= ssl
677 vnfd_store
= rift
.package
.store
.VnfdPackageFilesystemStore(self
.log
)
680 nsd_store
= rift
.package
.store
.NsdPackageFilesystemStore(self
.log
)
683 self
.messages
= collections
.defaultdict(list)
684 self
.export_dir
= os
.path
.join(os
.environ
['RIFT_ARTIFACTS'], 'launchpad/exports')
686 self
.uploader
= image
.ImageUploader(self
.log
, self
.loop
, self
.dts
)
687 self
.onboarder
= onboard
.DescriptorOnboarder(
688 self
.log
, "127.0.0.1", 8008, self
.use_ssl
, self
.ssl_cert
, self
.ssl_key
690 self
.package_store_map
= {
695 self
.exporter
= export
.DescriptorPackageArchiveExporter(self
.log
)
696 self
.loop
.create_task(export
.periodic_export_cleanup(self
.log
, self
.loop
, self
.export_dir
))
698 self
.vnfd_catalog
= vnfd_catalog
699 self
.nsd_catalog
= nsd_catalog
701 "vnfd": self
.vnfd_catalog
,
702 "nsd": self
.nsd_catalog
705 self
.upload_handler
= UploadRpcHandler(self
.log
, self
.dts
, self
.loop
, self
)
706 self
.update_handler
= UpdateRpcHandler(self
.log
, self
.dts
, self
.loop
, self
)
707 self
.export_handler
= export
.ExportRpcHandler(
712 store_map
=self
.package_store_map
,
713 exporter
=self
.exporter
,
714 onboarder
=self
.onboarder
,
715 catalog_map
=catalog_map
718 attrs
= dict(log
=self
.log
, loop
=self
.loop
)
720 super(UploaderApplication
, self
).__init
__([
721 (r
"/api/package/vnfd/(.*)", pkg_handler
.FileRestApiHandler
, {
722 'path': vnfd_store
.root_dir
}),
723 (r
"/api/package/nsd/(.*)", pkg_handler
.FileRestApiHandler
, {
724 'path': nsd_store
.root_dir
}),
726 (r
"/api/upload/([^/]+)/state", UploadStateHandler
, attrs
),
727 (r
"/api/update/([^/]+)/state", UpdateStateHandler
, attrs
),
728 (r
"/api/export/([^/]+)/state", export
.ExportStateHandler
, attrs
),
730 (r
"/api/export/([^/]+.tar.gz)", tornado
.web
.StaticFileHandler
, {
731 "path": self
.export_dir
,
733 (r
"/api/export/([^/]+.zip)", tornado
.web
.StaticFileHandler
, {
734 "path": self
.export_dir
,
740 yield from self
.upload_handler
.register()
741 yield from self
.update_handler
.register()
742 yield from self
.export_handler
.register()
def get_logger(self, transaction_id):
    """Return a ``message.Logger`` bound to one transaction's message list.

    ``self.messages`` is a ``defaultdict(list)``, so looking up an unseen
    ``transaction_id`` creates its (empty) list as a side effect.
    """
    transaction_messages = self.messages[transaction_id]
    return message.Logger(self.log, transaction_messages)
747 def onboard(self
, url
, transaction_id
, auth
=None):
748 log
= message
.Logger(self
.log
, self
.messages
[transaction_id
])
750 onboard_package
= OnboardPackage(
757 self
.package_store_map
,
760 self
.loop
.run_in_executor(None, onboard_package
.download_package
)
762 def update(self
, url
, transaction_id
, auth
=None):
763 log
= message
.Logger(self
.log
, self
.messages
[transaction_id
])
765 update_package
= UpdatePackage(
772 self
.package_store_map
,
775 self
.loop
.run_in_executor(None, update_package
.download_package
)