3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
30 import tornado
.httputil
31 import tornadostreamform
.multipart_streamer
as multipart_streamer
35 # disable unsigned certificate warning
36 from requests
.packages
.urllib3
.exceptions
import InsecureRequestWarning
37 requests
.packages
.urllib3
.disable_warnings(InsecureRequestWarning
)
40 gi
.require_version('RwLaunchpadYang', '1.0')
41 gi
.require_version('NsdYang', '1.0')
42 gi
.require_version('VnfdYang', '1.0')
44 from gi
.repository
import (
48 import rift
.mano
.cloud
50 import rift
.package
.charm
51 import rift
.package
.checksums
52 import rift
.package
.config
53 import rift
.package
.convert
54 import rift
.package
.handler
as pkg_handler
55 import rift
.package
.icon
56 import rift
.package
.package
57 import rift
.package
.script
58 import rift
.package
.store
60 from gi
.repository
import (
63 import rift
.downloader
as downloader
64 import rift
.mano
.dts
as mano_dts
76 from .message
import (
79 # Onboard Error Messages
80 OnboardChecksumMismatch
,
81 OnboardDescriptorError
,
82 OnboardDescriptorExistsError
,
83 OnboardDescriptorFormatError
,
85 OnboardExtractionError
,
86 OnboardImageUploadError
,
87 OnboardMissingContentBoundary
,
88 OnboardMissingContentType
,
89 OnboardMissingTerminalBoundary
,
90 OnboardUnreadableHeaders
,
91 OnboardUnreadablePackage
,
92 OnboardUnsupportedMediaType
,
94 # Onboard Status Messages
95 OnboardDescriptorOnboard
,
99 OnboardPackageValidation
,
106 # Update Error Messages
107 UpdateChecksumMismatch
,
108 UpdateDescriptorError
,
109 UpdateDescriptorFormatError
,
111 UpdateExtractionError
,
112 UpdateImageUploadError
,
113 UpdateMissingContentBoundary
,
114 UpdateMissingContentType
,
115 UpdatePackageNotFoundError
,
116 UpdateUnreadableHeaders
,
117 UpdateUnreadablePackage
,
118 UpdateUnsupportedMediaType
,
120 # Update Status Messages
121 UpdateDescriptorUpdate
,
122 UpdateDescriptorUpdated
,
129 from .tosca
import ExportTosca
134 MAX_STREAMED_SIZE
= 5 * GB
137 RPC_PACKAGE_CREATE_ENDPOINT
= RwPkgMgmtYang
.YangOutput_RwPkgMgmt_PackageCreate
138 RPC_PACKAGE_UPDATE_ENDPOINT
= RwPkgMgmtYang
.YangOutput_RwPkgMgmt_PackageUpdate
142 class HttpMessageError(Exception):
143 def __init__(self
, code
, msg
):
148 class UploadRpcHandler(mano_dts
.AbstractRpcHandler
):
149 def __init__(self
, log
, dts
, loop
, application
):
152 application: UploaderApplication
154 super().__init
__(log
, dts
, loop
)
155 self
.application
= application
159 return "/rw-pkg-mgmt:package-create"
162 def callback(self
, ks_path
, msg
):
163 transaction_id
= str(uuid
.uuid4())
164 log
= self
.application
.get_logger(transaction_id
)
165 log
.message(OnboardStart())
169 if msg
.username
is not None:
170 auth
= (msg
.username
, msg
.password
)
172 self
.application
.onboard(
178 rpc_op
= RPC_PACKAGE_CREATE_ENDPOINT
.from_dict({
179 "transaction_id": transaction_id
})
184 class UpdateRpcHandler(mano_dts
.AbstractRpcHandler
):
185 def __init__(self
, log
, dts
, loop
, application
):
188 application: UploaderApplication
190 super().__init
__(log
, dts
, loop
)
191 self
.application
= application
195 return "/rw-pkg-mgmt:package-update"
198 def callback(self
, ks_path
, msg
):
200 transaction_id
= str(uuid
.uuid4())
201 log
= self
.application
.get_logger(transaction_id
)
202 log
.message(UpdateStart())
205 if msg
.username
is not None:
206 auth
= (msg
.username
, msg
.password
)
208 self
.application
.update(
214 rpc_op
= RPC_PACKAGE_UPDATE_ENDPOINT
.from_dict({
215 "transaction_id": transaction_id
})
220 class UploadStateHandler(state
.StateHandler
):
221 STARTED
= OnboardStart
222 SUCCESS
= OnboardSuccess
223 FAILURE
= OnboardFailure
226 class UpdateStateHandler(state
.StateHandler
):
227 STARTED
= UpdateStart
228 SUCCESS
= UpdateSuccess
229 FAILURE
= UpdateFailure
232 class UpdatePackage(downloader
.DownloaderProtocol
):
234 def __init__(self
, log
, loop
, url
, auth
,
235 onboarder
, uploader
, package_store_map
):
241 self
.onboarder
= onboarder
242 self
.uploader
= uploader
243 self
.package_store_map
= package_store_map
246 def _update_package(self
, packages
):
248 # Extract package could return multiple packages if
249 # the package is converted
251 with pkg
as temp_package
:
252 package_checksums
= self
.validate_package(temp_package
)
253 stored_package
= self
.update_package(temp_package
)
256 self
.extract_charms(temp_package
)
257 self
.extract_scripts(temp_package
)
258 self
.extract_configs(temp_package
)
259 self
.extract_icons(temp_package
)
261 self
.update_descriptors(temp_package
)
264 self
.delete_stored_package(stored_package
)
268 self
.upload_images(temp_package
, package_checksums
)
270 def extract(self
, packages
):
272 self
._update
_package
(packages
)
273 self
.log
.message(UpdateSuccess())
275 except MessageException
as e
:
276 self
.log
.message(e
.msg
)
277 self
.log
.message(UpdateFailure())
279 except Exception as e
:
280 self
.log
.exception(e
)
282 self
.log
.message(UpdateError(str(e
)))
283 self
.log
.message(UpdateFailure())
285 def on_download_succeeded(self
, job
):
286 self
.log
.message(DownloadSuccess("Package downloaded."))
288 extractor
= extract
.UploadPackageExtractor(self
.log
)
289 file_backed_packages
= extractor
.create_packages_from_upload(
290 job
.filename
, job
.filepath
293 self
.extract(file_backed_packages
)
295 def on_download_failed(self
, job
):
296 self
.log
.error(job
.detail
)
297 self
.log
.message(DownloadError("Package download failed. {}".format(job
.detail
)))
298 self
.log
.message(UpdateFailure())
300 def download_package(self
):
302 _
, filename
= tempfile
.mkstemp()
303 url_downloader
= downloader
.UrlDownloader(
307 decompress_on_fly
=True,
309 url_downloader
.delegate
= self
310 url_downloader
.download()
312 def get_package_store(self
, package
):
313 return self
.package_store_map
[package
.descriptor_type
]
315 def update_package(self
, package
):
316 store
= self
.get_package_store(package
)
319 store
.update_package(package
)
320 except rift
.package
.store
.PackageNotFoundError
as e
:
321 # If the package doesn't exist, then it is possible the descriptor was onboarded
322 # out of band. In that case, just store the package as is
323 self
.log
.warning("Package not found, storing new package instead.")
324 store
.store_package(package
)
326 stored_package
= store
.get_package(package
.descriptor_id
)
328 return stored_package
330 def delete_stored_package(self
, package
):
331 self
.log
.info("Deleting stored package: %s", package
)
332 store
= self
.get_package_store(package
)
334 store
.delete_package(package
.descriptor_id
)
335 except Exception as e
:
336 self
.log
.warning("Failed to delete package from store: %s", str(e
))
338 def upload_images(self
, package
, package_checksums
):
339 image_file_map
= rift
.package
.image
.get_package_image_files(package
)
340 name_hdl_map
= {name
: package
.open(image_file_map
[name
]) for name
in image_file_map
}
341 if not image_file_map
:
345 for image_name
, image_hdl
in name_hdl_map
.items():
346 image_file
= image_file_map
[image_name
]
347 if image_file
in package_checksums
:
348 image_checksum
= package_checksums
[image_file
]
350 self
.log
.warning("checksum not provided for image %s. Calculating checksum",
352 image_checksum
= rift
.package
.checksums
.checksum(
353 package
.open(image_file_map
[image_name
])
356 self
.uploader
.upload_image(image_name
, image_checksum
, image_hdl
)
357 self
.uploader
.upload_image_to_cloud_accounts(image_name
, image_checksum
)
359 except image
.ImageUploadError
as e
:
360 self
.log
.exception("Failed to upload image: %s", image_name
)
361 raise MessageException(OnboardImageUploadError(str(e
))) from e
364 _
= [image_hdl
.close() for image_hdl
in name_hdl_map
.values()]
366 def extract_charms(self
, package
):
368 charm_extractor
= rift
.package
.charm
.PackageCharmExtractor(self
.log
)
369 charm_extractor
.extract_charms(package
)
370 except rift
.package
.charm
.CharmExtractionError
as e
:
371 raise MessageException(UpdateExtractionError()) from e
373 def extract_scripts(self
, package
):
375 script_extractor
= rift
.package
.script
.PackageScriptExtractor(self
.log
)
376 script_extractor
.extract_scripts(package
)
377 except rift
.package
.script
.ScriptExtractionError
as e
:
378 raise MessageException(UpdateExtractionError()) from e
380 def extract_configs(self
, package
):
382 config_extractor
= rift
.package
.config
.PackageConfigExtractor(self
.log
)
383 config_extractor
.extract_configs(package
)
384 except rift
.package
.config
.ConfigExtractionError
as e
:
385 raise MessageException(UpdateExtractionError()) from e
387 def extract_icons(self
, package
):
389 icon_extractor
= rift
.package
.icon
.PackageIconExtractor(self
.log
)
390 icon_extractor
.extract_icons(package
)
391 except rift
.package
.icon
.IconExtractionError
as e
:
392 raise MessageException(UpdateExtractionError()) from e
394 def validate_package(self
, package
):
395 checksum_validator
= rift
.package
.package
.PackageChecksumValidator(self
.log
)
398 file_checksums
= checksum_validator
.validate(package
)
399 except rift
.package
.package
.PackageFileChecksumError
as e
:
400 raise MessageException(UpdateChecksumMismatch(e
.filename
)) from e
401 except rift
.package
.package
.PackageValidationError
as e
:
402 raise MessageException(UpdateUnreadablePackage()) from e
404 return file_checksums
406 def update_descriptors(self
, package
):
407 descriptor_msg
= package
.descriptor_msg
409 self
.log
.message(UpdateDescriptorUpdate())
412 self
.onboarder
.update(descriptor_msg
)
413 except onboard
.UpdateError
as e
:
414 raise MessageException(UpdateDescriptorError(package
.descriptor_file
)) from e
417 class OnboardPackage(downloader
.DownloaderProtocol
):
419 def __init__(self
, log
, loop
, url
, auth
,
420 onboarder
, uploader
, package_store_map
):
425 self
.onboarder
= onboarder
426 self
.uploader
= uploader
427 self
.package_store_map
= package_store_map
429 def _onboard_package(self
, packages
):
430 # Extract package could return multiple packages if
431 # the package is converted
433 with pkg
as temp_package
:
434 package_checksums
= self
.validate_package(temp_package
)
435 stored_package
= self
.store_package(temp_package
)
438 self
.extract_charms(temp_package
)
439 self
.extract_scripts(temp_package
)
440 self
.extract_configs(temp_package
)
441 self
.extract_icons(temp_package
)
443 self
.onboard_descriptors(temp_package
)
446 self
.delete_stored_package(stored_package
)
450 self
.upload_images(temp_package
, package_checksums
)
452 def extract(self
, packages
):
454 self
._onboard
_package
(packages
)
455 self
.log
.message(OnboardSuccess())
457 except MessageException
as e
:
458 self
.log
.message(e
.msg
)
459 self
.log
.message(OnboardFailure())
461 except Exception as e
:
462 self
.log
.exception(e
)
464 self
.log
.message(OnboardError(str(e
)))
465 self
.log
.message(OnboardFailure())
467 def on_download_succeeded(self
, job
):
468 self
.log
.message(DownloadSuccess("Package downloaded."))
470 extractor
= extract
.UploadPackageExtractor(self
.log
)
471 file_backed_packages
= extractor
.create_packages_from_upload(
472 job
.filename
, job
.filepath
475 self
.extract(file_backed_packages
)
477 def on_download_failed(self
, job
):
478 self
.log
.error(job
.detail
)
479 self
.log
.message(DownloadError("Package download failed. {}".format(job
.detail
)))
480 self
.log
.message(OnboardFailure())
482 def download_package(self
):
484 _
, filename
= tempfile
.mkstemp()
485 url_downloader
= downloader
.UrlDownloader(
489 decompress_on_fly
=True,
491 url_downloader
.delegate
= self
492 url_downloader
.download()
494 def get_package_store(self
, package
):
495 return self
.package_store_map
[package
.descriptor_type
]
497 def store_package(self
, package
):
498 store
= self
.get_package_store(package
)
501 store
.store_package(package
)
502 except rift
.package
.store
.PackageExistsError
as e
:
503 store
.update_package(package
)
505 stored_package
= store
.get_package(package
.descriptor_id
)
507 return stored_package
509 def delete_stored_package(self
, package
):
510 self
.log
.info("Deleting stored package: %s", package
)
511 store
= self
.get_package_store(package
)
513 store
.delete_package(package
.descriptor_id
)
514 except Exception as e
:
515 self
.log
.warning("Failed to delete package from store: %s", str(e
))
517 def upload_images(self
, package
, package_checksums
):
518 image_file_map
= rift
.package
.image
.get_package_image_files(package
)
519 if not image_file_map
:
522 name_hdl_map
= {name
: package
.open(image_file_map
[name
]) for name
in image_file_map
}
524 for image_name
, image_hdl
in name_hdl_map
.items():
525 image_file
= image_file_map
[image_name
]
526 if image_file
in package_checksums
:
527 image_checksum
= package_checksums
[image_file
]
529 self
.log
.warning("checksum not provided for image %s. Calculating checksum",
531 image_checksum
= rift
.package
.checksums
.checksum(
532 package
.open(image_file_map
[image_name
])
535 self
.uploader
.upload_image(image_name
, image_checksum
, image_hdl
)
536 self
.uploader
.upload_image_to_cloud_accounts(image_name
, image_checksum
)
538 except image
.ImageUploadError
as e
:
539 raise MessageException(OnboardImageUploadError()) from e
542 _
= [image_hdl
.close() for image_hdl
in name_hdl_map
.values()]
544 def extract_charms(self
, package
):
546 charm_extractor
= rift
.package
.charm
.PackageCharmExtractor(self
.log
)
547 charm_extractor
.extract_charms(package
)
548 except rift
.package
.charm
.CharmExtractionError
as e
:
549 raise MessageException(OnboardExtractionError()) from e
551 def extract_scripts(self
, package
):
553 script_extractor
= rift
.package
.script
.PackageScriptExtractor(self
.log
)
554 script_extractor
.extract_scripts(package
)
555 except rift
.package
.script
.ScriptExtractionError
as e
:
556 raise MessageException(OnboardExtractionError()) from e
558 def extract_configs(self
, package
):
560 config_extractor
= rift
.package
.config
.PackageConfigExtractor(self
.log
)
561 config_extractor
.extract_configs(package
)
562 except rift
.package
.config
.ConfigExtractionError
as e
:
563 raise MessageException(OnboardExtractionError()) from e
565 def extract_icons(self
, package
):
567 icon_extractor
= rift
.package
.icon
.PackageIconExtractor(self
.log
)
568 icon_extractor
.extract_icons(package
)
569 except rift
.package
.icon
.IconExtractionError
as e
:
570 raise MessageException(OnboardExtractionError()) from e
572 def validate_package(self
, package
):
573 checksum_validator
= rift
.package
.package
.PackageChecksumValidator(self
.log
)
576 file_checksums
= checksum_validator
.validate(package
)
577 except rift
.package
.package
.PackageFileChecksumError
as e
:
578 raise MessageException(OnboardChecksumMismatch(e
.filename
)) from e
579 except rift
.package
.package
.PackageValidationError
as e
:
580 raise MessageException(OnboardUnreadablePackage()) from e
582 return file_checksums
584 def onboard_descriptors(self
, package
):
585 descriptor_msg
= package
.descriptor_msg
587 self
.log
.message(OnboardDescriptorOnboard())
590 self
.onboarder
.onboard(descriptor_msg
)
591 except onboard
.OnboardError
as e
:
592 raise MessageException(OnboardDescriptorError(package
.descriptor_file
)) from e
595 class UploaderApplication(tornado
.web
.Application
):
598 def from_tasklet(cls
, tasklet
):
599 manifest
= tasklet
.tasklet_info
.get_pb_manifest()
600 use_ssl
= manifest
.bootstrap_phase
.rwsecurity
.use_ssl
601 ssl_cert
= manifest
.bootstrap_phase
.rwsecurity
.cert
602 ssl_key
= manifest
.bootstrap_phase
.rwsecurity
.key
607 ssl
=(ssl_cert
, ssl_key
),
608 vnfd_store
=tasklet
.vnfd_package_store
,
609 nsd_store
=tasklet
.nsd_package_store
,
610 vnfd_catalog
=tasklet
.vnfd_catalog
,
611 nsd_catalog
=tasklet
.nsd_catalog
)
629 self
.ssl_cert
, self
.ssl_key
= None, None
632 self
.ssl_cert
, self
.ssl_key
= ssl
635 vnfd_store
= rift
.package
.store
.VnfdPackageFilesystemStore(self
.log
)
638 nsd_store
= rift
.package
.store
.NsdPackageFilesystemStore(self
.log
)
641 self
.messages
= collections
.defaultdict(list)
642 self
.export_dir
= os
.path
.join(os
.environ
['RIFT_ARTIFACTS'], 'launchpad/exports')
644 self
.uploader
= image
.ImageUploader(self
.log
, self
.loop
, self
.dts
)
645 self
.onboarder
= onboard
.DescriptorOnboarder(
646 self
.log
, "127.0.0.1", 8008, self
.use_ssl
, self
.ssl_cert
, self
.ssl_key
648 self
.package_store_map
= {
653 self
.exporter
= export
.DescriptorPackageArchiveExporter(self
.log
)
654 self
.loop
.create_task(export
.periodic_export_cleanup(self
.log
, self
.loop
, self
.export_dir
))
656 self
.vnfd_catalog
= vnfd_catalog
657 self
.nsd_catalog
= nsd_catalog
659 "vnfd": self
.vnfd_catalog
,
660 "nsd": self
.nsd_catalog
663 self
.upload_handler
= UploadRpcHandler(self
.log
, self
.dts
, self
.loop
, self
)
664 self
.update_handler
= UpdateRpcHandler(self
.log
, self
.dts
, self
.loop
, self
)
665 self
.export_handler
= export
.ExportRpcHandler(
670 store_map
=self
.package_store_map
,
671 exporter
=self
.exporter
,
672 catalog_map
=catalog_map
675 attrs
= dict(log
=self
.log
, loop
=self
.loop
)
677 super(UploaderApplication
, self
).__init
__([
678 (r
"/api/package/vnfd/(.*)", pkg_handler
.FileRestApiHandler
, {
679 'path': vnfd_store
.root_dir
}),
680 (r
"/api/package/nsd/(.*)", pkg_handler
.FileRestApiHandler
, {
681 'path': nsd_store
.root_dir
}),
683 (r
"/api/upload/([^/]+)/state", UploadStateHandler
, attrs
),
684 (r
"/api/update/([^/]+)/state", UpdateStateHandler
, attrs
),
685 (r
"/api/export/([^/]+)/state", export
.ExportStateHandler
, attrs
),
687 (r
"/api/export/([^/]+.tar.gz)", tornado
.web
.StaticFileHandler
, {
688 "path": self
.export_dir
,
690 (r
"/api/export/([^/]+.zip)", tornado
.web
.StaticFileHandler
, {
691 "path": self
.export_dir
,
697 yield from self
.upload_handler
.register()
698 yield from self
.update_handler
.register()
699 yield from self
.export_handler
.register()
701 def get_logger(self
, transaction_id
):
702 return message
.Logger(self
.log
, self
.messages
[transaction_id
])
704 def onboard(self
, url
, transaction_id
, auth
=None):
705 log
= message
.Logger(self
.log
, self
.messages
[transaction_id
])
707 onboard_package
= OnboardPackage(
714 self
.package_store_map
,
717 self
.loop
.run_in_executor(None, onboard_package
.download_package
)
719 def update(self
, url
, transaction_id
, auth
=None):
720 log
= message
.Logger(self
.log
, self
.messages
[transaction_id
])
722 update_package
= UpdatePackage(
729 self
.package_store_map
,
732 self
.loop
.run_in_executor(None, update_package
.download_package
)