c8fc3fc5fadeded0ef376f46de57bef7fde4a462
[osm/SO.git] / rwlaunchpad / plugins / rwlaunchpadtasklet / rift / tasklets / rwlaunchpad / uploader.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17 import abc
18 import asyncio
19 import collections
20 import os
21 import tempfile
22 import threading
23 import uuid
24 import zlib
25
26 import tornado
27 import tornado.escape
28 import tornado.ioloop
29 import tornado.web
30 import tornado.httputil
31 import tornadostreamform.multipart_streamer as multipart_streamer
32
33 import requests
34
35 # disable unsigned certificate warning
36 from requests.packages.urllib3.exceptions import InsecureRequestWarning
37 requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
38
39 import gi
40 gi.require_version('RwLaunchpadYang', '1.0')
41 gi.require_version('NsdYang', '1.0')
42 gi.require_version('VnfdYang', '1.0')
43
44 from gi.repository import (
45 NsdYang,
46 VnfdYang,
47 )
48 import rift.mano.cloud
49
50 import rift.package.charm
51 import rift.package.checksums
52 import rift.package.config
53 import rift.package.convert
54 import rift.package.handler as pkg_handler
55 import rift.package.icon
56 import rift.package.package
57 import rift.package.script
58 import rift.package.store
59
60 from gi.repository import (
61 RwDts as rwdts,
62 RwPkgMgmtYang)
63 import rift.downloader as downloader
64 import rift.mano.dts as mano_dts
65 import rift.tasklets
66
67 from . import (
68 export,
69 extract,
70 image,
71 message,
72 onboard,
73 state,
74 )
75
76 from .message import (
77 MessageException,
78
79 # Onboard Error Messages
80 OnboardChecksumMismatch,
81 OnboardDescriptorError,
82 OnboardDescriptorExistsError,
83 OnboardDescriptorFormatError,
84 OnboardError,
85 OnboardExtractionError,
86 OnboardImageUploadError,
87 OnboardMissingContentBoundary,
88 OnboardMissingContentType,
89 OnboardMissingTerminalBoundary,
90 OnboardUnreadableHeaders,
91 OnboardUnreadablePackage,
92 OnboardUnsupportedMediaType,
93
94 # Onboard Status Messages
95 OnboardDescriptorOnboard,
96 OnboardFailure,
97 OnboardImageUpload,
98 OnboardPackageUpload,
99 OnboardPackageValidation,
100 OnboardStart,
101 OnboardSuccess,
102
103 DownloadError,
104 DownloadSuccess,
105
106 # Update Error Messages
107 UpdateChecksumMismatch,
108 UpdateDescriptorError,
109 UpdateDescriptorFormatError,
110 UpdateError,
111 UpdateExtractionError,
112 UpdateImageUploadError,
113 UpdateMissingContentBoundary,
114 UpdateMissingContentType,
115 UpdatePackageNotFoundError,
116 UpdateUnreadableHeaders,
117 UpdateUnreadablePackage,
118 UpdateUnsupportedMediaType,
119
120 # Update Status Messages
121 UpdateDescriptorUpdate,
122 UpdateDescriptorUpdated,
123 UpdatePackageUpload,
124 UpdateStart,
125 UpdateSuccess,
126 UpdateFailure,
127 )
128
129 from .tosca import ExportTosca
130
# Binary size units, expressed in bytes.
MB = 1 << 20
GB = MB << 10

# 5 GiB; presumably the cap on streamed upload size (not referenced in the
# visible part of this module -- confirm against callers).
MAX_STREAMED_SIZE = GB * 5
135
# Shortcuts: short aliases for the RwPkgMgmtYang output message classes that
# the package-create / package-update RPC callbacks in this module build
# their replies from (via ``from_dict``).
RPC_PACKAGE_UPDATE_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageUpdate
RPC_PACKAGE_CREATE_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageCreate

139
140
141
class HttpMessageError(Exception):
    """Exception that carries a status code together with a message.

    Attributes are plain instance fields:
        code: the code supplied by the raiser
        msg: the message supplied by the raiser
    """

    def __init__(self, code, msg):
        # Store both values verbatim; no validation or formatting is done.
        self.code, self.msg = code, msg
146
147
class UploadRpcHandler(mano_dts.AbstractRpcHandler):
    """RPC handler for ``/rw-pkg-mgmt:package-create``.

    Every invocation allocates a fresh transaction id, asks the application
    to onboard the package found at the requested URL, and answers with that
    transaction id.
    """

    def __init__(self, log, dts, loop, application):
        """
        Args:
            log: passed through to the base handler
            dts: passed through to the base handler
            loop: passed through to the base handler
            application: UploaderApplication
        """
        super().__init__(log, dts, loop)
        self.application = application

    @property
    def xpath(self):
        """XPath of the RPC this handler serves."""
        return "/rw-pkg-mgmt:package-create"

    @asyncio.coroutine
    def callback(self, ks_path, msg):
        """Start an onboard job and reply with its transaction id.

        Args:
            ks_path: not used by this callback
            msg: RPC input; ``username``, ``password`` and ``external_url``
                are read from it

        Returns:
            An RPC_PACKAGE_CREATE_ENDPOINT message holding ``transaction_id``.
        """
        txn_id = str(uuid.uuid4())
        self.application.get_logger(txn_id).message(OnboardStart())

        # Credentials are only forwarded when a username was supplied.
        credentials = None if msg.username is None else (msg.username, msg.password)

        self.application.onboard(msg.external_url, txn_id, auth=credentials)

        return RPC_PACKAGE_CREATE_ENDPOINT.from_dict({"transaction_id": txn_id})

182
183
class UpdateRpcHandler(mano_dts.AbstractRpcHandler):
    """RPC handler for ``/rw-pkg-mgmt:package-update``.

    Every invocation allocates a fresh transaction id, asks the application
    to update from the package found at the requested URL, and answers with
    that transaction id.
    """

    def __init__(self, log, dts, loop, application):
        """
        Args:
            log: passed through to the base handler
            dts: passed through to the base handler
            loop: passed through to the base handler
            application: UploaderApplication
        """
        super().__init__(log, dts, loop)
        self.application = application

    @property
    def xpath(self):
        """XPath of the RPC this handler serves."""
        return "/rw-pkg-mgmt:package-update"

    @asyncio.coroutine
    def callback(self, ks_path, msg):
        """Start an update job and reply with its transaction id.

        Args:
            ks_path: not used by this callback
            msg: RPC input; ``username``, ``password`` and ``external_url``
                are read from it

        Returns:
            An RPC_PACKAGE_UPDATE_ENDPOINT message holding ``transaction_id``.
        """
        txn_id = str(uuid.uuid4())
        self.application.get_logger(txn_id).message(UpdateStart())

        # Credentials are only forwarded when a username was supplied.
        credentials = None if msg.username is None else (msg.username, msg.password)

        self.application.update(msg.external_url, txn_id, auth=credentials)

        return RPC_PACKAGE_UPDATE_ENDPOINT.from_dict({"transaction_id": txn_id})

218
219
class UploadStateHandler(state.StateHandler):
    """State endpoint for onboard (upload) transactions.

    Binds the start / success / failure marker messages of the onboard flow;
    how they are interpreted is defined by ``state.StateHandler``.
    """
    FAILURE = OnboardFailure
    SUCCESS = OnboardSuccess
    STARTED = OnboardStart

224
225
class UpdateStateHandler(state.StateHandler):
    """State endpoint for update transactions.

    Binds the start / success / failure marker messages of the update flow;
    how they are interpreted is defined by ``state.StateHandler``.
    """
    FAILURE = UpdateFailure
    SUCCESS = UpdateSuccess
    STARTED = UpdateStart

230
231
class UpdatePackage(downloader.DownloaderProtocol):
    """Download a descriptor package from a URL and apply it as an update.

    ``download_package`` starts the download with this object registered as
    the downloader's delegate; ``on_download_succeeded`` / ``on_download_failed``
    are the delegate callbacks (presumably invoked by the downloader when the
    job finishes -- confirm against rift.downloader). Progress and errors are
    reported through ``log.message``.
    """

    def __init__(self, log, loop, url, auth,
                 onboarder, uploader, package_store_map):
        """
        Args:
            log: transaction logger; must provide ``message`` in addition to
                the usual logging methods
            loop: event loop (stored, not used by the visible code)
            url: location of the package to download
            auth: ``(username, password)`` tuple or None
            onboarder: object whose ``update(descriptor_msg)`` pushes the descriptor
            uploader: object providing ``upload_image`` and
                ``upload_image_to_cloud_accounts``
            package_store_map: mapping of descriptor type -> package store
        """
        super().__init__()
        self.log = log
        self.loop = loop
        self.url = url
        self.auth = auth
        self.onboarder = onboarder
        self.uploader = uploader
        self.package_store_map = package_store_map

    def _update_package(self, packages):
        """Validate, store, extract and update every package in *packages*.

        If any extraction step or the descriptor update fails, the package
        that was just stored is deleted again and the error is re-raised.
        Images are uploaded only after all of those steps succeeded.
        """
        # Extract package could return multiple packages if
        # the package is converted
        for pkg in packages:
            with pkg as temp_package:
                package_checksums = self.validate_package(temp_package)
                stored_package = self.update_package(temp_package)

                try:
                    self.extract_charms(temp_package)
                    self.extract_scripts(temp_package)
                    self.extract_configs(temp_package)
                    self.extract_icons(temp_package)

                    self.update_descriptors(temp_package)

                except Exception:
                    self.delete_stored_package(stored_package)
                    raise

                else:
                    self.upload_images(temp_package, package_checksums)

    def extract(self, packages):
        """Run the update and translate the outcome into status messages.

        Never raises: a MessageException is reported through its carried
        message, any other exception is logged and reported as UpdateError
        (when it has a non-empty text); both end with UpdateFailure.
        """
        try:
            self._update_package(packages)
            self.log.message(UpdateSuccess())

        except MessageException as e:
            self.log.message(e.msg)
            self.log.message(UpdateFailure())

        except Exception as e:
            self.log.exception(e)
            if str(e):
                self.log.message(UpdateError(str(e)))
            self.log.message(UpdateFailure())

    def on_download_succeeded(self, job):
        """Delegate callback: build packages from the downloaded file and update."""
        self.log.message(DownloadSuccess("Package downloaded."))

        extractor = extract.UploadPackageExtractor(self.log)
        file_backed_packages = extractor.create_packages_from_upload(
            job.filename, job.filepath
        )

        self.extract(file_backed_packages)

    def on_download_failed(self, job):
        """Delegate callback: report the download failure and fail the update."""
        self.log.error(job.detail)
        self.log.message(DownloadError("Package download failed. {}".format(job.detail)))
        self.log.message(UpdateFailure())

    def download_package(self):
        """Start downloading ``self.url`` into a fresh temporary file."""
        # mkstemp() returns an *open* OS-level descriptor together with the
        # path. Only the path is handed to the downloader, so close the
        # descriptor right away instead of leaking one per download.
        # NOTE(review): the temporary file itself is not removed here;
        # presumably the downloader/extractor owns its cleanup -- confirm.
        fd, filename = tempfile.mkstemp()
        os.close(fd)

        url_downloader = downloader.UrlDownloader(
            self.url,
            auth=self.auth,
            file_obj=filename,
            decompress_on_fly=True,
            log=self.log)
        url_downloader.delegate = self
        url_downloader.download()

    def get_package_store(self, package):
        """Return the store registered for the package's descriptor type."""
        return self.package_store_map[package.descriptor_type]

    def update_package(self, package):
        """Update *package* in its store and return the stored package.

        Falls back to storing it as a new package when the store does not
        know it yet.
        """
        store = self.get_package_store(package)

        try:
            store.update_package(package)
        except rift.package.store.PackageNotFoundError:
            # If the package doesn't exist, then it is possible the descriptor was onboarded
            # out of band. In that case, just store the package as is
            self.log.warning("Package not found, storing new package instead.")
            store.store_package(package)

        return store.get_package(package.descriptor_id)

    def delete_stored_package(self, package):
        """Best-effort removal of *package* from its store (failures are only logged)."""
        self.log.info("Deleting stored package: %s", package)
        store = self.get_package_store(package)
        try:
            store.delete_package(package.descriptor_id)
        except Exception as e:
            self.log.warning("Failed to delete package from store: %s", str(e))

    def upload_images(self, package, package_checksums):
        """Upload every image file contained in *package*.

        Args:
            package: package to read the image files from
            package_checksums: mapping of package file -> checksum; images
                missing from it get their checksum calculated here

        Raises:
            MessageException: when an image upload fails
        """
        image_file_map = rift.package.image.get_package_image_files(package)
        if not image_file_map:
            return

        name_hdl_map = {}
        try:
            # Open inside the try block so that handles opened before a
            # failing open() are still closed by the finally clause.
            for image_name, image_file in image_file_map.items():
                name_hdl_map[image_name] = package.open(image_file)

            for image_name, image_hdl in name_hdl_map.items():
                image_file = image_file_map[image_name]
                if image_file in package_checksums:
                    image_checksum = package_checksums[image_file]
                else:
                    self.log.warning("checksum not provided for image %s.  Calculating checksum",
                                     image_file)
                    # Use a dedicated handle for the checksum so the upload
                    # handle is untouched, and close it once done.
                    checksum_hdl = package.open(image_file)
                    try:
                        image_checksum = rift.package.checksums.checksum(checksum_hdl)
                    finally:
                        checksum_hdl.close()
                try:
                    self.uploader.upload_image(image_name, image_checksum, image_hdl)
                    self.uploader.upload_image_to_cloud_accounts(image_name, image_checksum)

                except image.ImageUploadError as e:
                    self.log.exception("Failed to upload image: %s", image_name)
                    # NOTE(review): the update path reports the onboard-flavoured
                    # error. UpdateImageUploadError is imported but its constructor
                    # signature is not visible here -- confirm before switching.
                    raise MessageException(OnboardImageUploadError(str(e))) from e

        finally:
            for opened_hdl in name_hdl_map.values():
                opened_hdl.close()

    def extract_charms(self, package):
        """Extract the package's charms; failures become UpdateExtractionError."""
        try:
            charm_extractor = rift.package.charm.PackageCharmExtractor(self.log)
            charm_extractor.extract_charms(package)
        except rift.package.charm.CharmExtractionError as e:
            raise MessageException(UpdateExtractionError()) from e

    def extract_scripts(self, package):
        """Extract the package's scripts; failures become UpdateExtractionError."""
        try:
            script_extractor = rift.package.script.PackageScriptExtractor(self.log)
            script_extractor.extract_scripts(package)
        except rift.package.script.ScriptExtractionError as e:
            raise MessageException(UpdateExtractionError()) from e

    def extract_configs(self, package):
        """Extract the package's configs; failures become UpdateExtractionError."""
        try:
            config_extractor = rift.package.config.PackageConfigExtractor(self.log)
            config_extractor.extract_configs(package)
        except rift.package.config.ConfigExtractionError as e:
            raise MessageException(UpdateExtractionError()) from e

    def extract_icons(self, package):
        """Extract the package's icons; failures become UpdateExtractionError."""
        try:
            icon_extractor = rift.package.icon.PackageIconExtractor(self.log)
            icon_extractor.extract_icons(package)
        except rift.package.icon.IconExtractionError as e:
            raise MessageException(UpdateExtractionError()) from e

    def validate_package(self, package):
        """Validate the package checksums and return the file -> checksum result.

        Raises:
            MessageException: UpdateChecksumMismatch for a bad file checksum,
                UpdateUnreadablePackage for any other validation error
        """
        checksum_validator = rift.package.package.PackageChecksumValidator(self.log)

        try:
            file_checksums = checksum_validator.validate(package)
        except rift.package.package.PackageFileChecksumError as e:
            raise MessageException(UpdateChecksumMismatch(e.filename)) from e
        except rift.package.package.PackageValidationError as e:
            raise MessageException(UpdateUnreadablePackage()) from e

        return file_checksums

    def update_descriptors(self, package):
        """Push the package's descriptor message through the onboarder.

        Raises:
            MessageException: UpdateDescriptorError when the onboarder fails
        """
        descriptor_msg = package.descriptor_msg

        self.log.message(UpdateDescriptorUpdate())

        try:
            self.onboarder.update(descriptor_msg)
        except onboard.UpdateError as e:
            raise MessageException(UpdateDescriptorError(package.descriptor_file)) from e

415
416
class OnboardPackage(downloader.DownloaderProtocol):
    """Download a descriptor package from a URL and onboard it.

    ``download_package`` starts the download with this object registered as
    the downloader's delegate; ``on_download_succeeded`` / ``on_download_failed``
    are the delegate callbacks (presumably invoked by the downloader when the
    job finishes -- confirm against rift.downloader). Progress and errors are
    reported through ``log.message``.
    """

    def __init__(self, log, loop, url, auth,
                 onboarder, uploader, package_store_map):
        """
        Args:
            log: transaction logger; must provide ``message`` in addition to
                the usual logging methods
            loop: event loop (stored, not used by the visible code)
            url: location of the package to download
            auth: ``(username, password)`` tuple or None
            onboarder: object whose ``onboard(descriptor_msg)`` pushes the descriptor
            uploader: object providing ``upload_image`` and
                ``upload_image_to_cloud_accounts``
            package_store_map: mapping of descriptor type -> package store
        """
        # Initialise the protocol base class, as the sibling UpdatePackage does.
        super().__init__()
        self.log = log
        self.loop = loop
        self.url = url
        self.auth = auth
        self.onboarder = onboarder
        self.uploader = uploader
        self.package_store_map = package_store_map

    def _onboard_package(self, packages):
        """Validate, store, extract and onboard every package in *packages*.

        If any extraction step or the descriptor onboarding fails, the package
        that was just stored is deleted again and the error is re-raised.
        Images are uploaded only after all of those steps succeeded.
        """
        # Extract package could return multiple packages if
        # the package is converted
        for pkg in packages:
            with pkg as temp_package:
                package_checksums = self.validate_package(temp_package)
                stored_package = self.store_package(temp_package)

                try:
                    self.extract_charms(temp_package)
                    self.extract_scripts(temp_package)
                    self.extract_configs(temp_package)
                    self.extract_icons(temp_package)

                    self.onboard_descriptors(temp_package)

                except Exception:
                    self.delete_stored_package(stored_package)
                    raise

                else:
                    self.upload_images(temp_package, package_checksums)

    def extract(self, packages):
        """Run the onboarding and translate the outcome into status messages.

        Never raises: a MessageException is reported through its carried
        message, any other exception is logged and reported as OnboardError
        (when it has a non-empty text); both end with OnboardFailure.
        """
        try:
            self._onboard_package(packages)
            self.log.message(OnboardSuccess())

        except MessageException as e:
            self.log.message(e.msg)
            self.log.message(OnboardFailure())

        except Exception as e:
            self.log.exception(e)
            if str(e):
                self.log.message(OnboardError(str(e)))
            self.log.message(OnboardFailure())

    def on_download_succeeded(self, job):
        """Delegate callback: build packages from the downloaded file and onboard."""
        self.log.message(DownloadSuccess("Package downloaded."))

        extractor = extract.UploadPackageExtractor(self.log)
        file_backed_packages = extractor.create_packages_from_upload(
            job.filename, job.filepath
        )

        self.extract(file_backed_packages)

    def on_download_failed(self, job):
        """Delegate callback: report the download failure and fail the onboarding."""
        self.log.error(job.detail)
        self.log.message(DownloadError("Package download failed. {}".format(job.detail)))
        self.log.message(OnboardFailure())

    def download_package(self):
        """Start downloading ``self.url`` into a fresh temporary file."""
        # mkstemp() returns an *open* OS-level descriptor together with the
        # path. Only the path is handed to the downloader, so close the
        # descriptor right away instead of leaking one per download.
        # NOTE(review): the temporary file itself is not removed here;
        # presumably the downloader/extractor owns its cleanup -- confirm.
        fd, filename = tempfile.mkstemp()
        os.close(fd)

        url_downloader = downloader.UrlDownloader(
            self.url,
            auth=self.auth,
            file_obj=filename,
            decompress_on_fly=True,
            log=self.log)
        url_downloader.delegate = self
        url_downloader.download()

    def get_package_store(self, package):
        """Return the store registered for the package's descriptor type."""
        return self.package_store_map[package.descriptor_type]

    def store_package(self, package):
        """Store *package* and return the stored package.

        Falls back to updating the existing entry when the store already
        holds this package.
        """
        store = self.get_package_store(package)

        try:
            store.store_package(package)
        except rift.package.store.PackageExistsError:
            store.update_package(package)

        return store.get_package(package.descriptor_id)

    def delete_stored_package(self, package):
        """Best-effort removal of *package* from its store (failures are only logged)."""
        self.log.info("Deleting stored package: %s", package)
        store = self.get_package_store(package)
        try:
            store.delete_package(package.descriptor_id)
        except Exception as e:
            self.log.warning("Failed to delete package from store: %s", str(e))

    def upload_images(self, package, package_checksums):
        """Upload every image file contained in *package*.

        Args:
            package: package to read the image files from
            package_checksums: mapping of package file -> checksum; images
                missing from it get their checksum calculated here

        Raises:
            MessageException: OnboardImageUploadError when an image upload fails
        """
        image_file_map = rift.package.image.get_package_image_files(package)
        if not image_file_map:
            return

        name_hdl_map = {}
        try:
            # Open inside the try block so that handles opened before a
            # failing open() are still closed by the finally clause.
            for image_name, image_file in image_file_map.items():
                name_hdl_map[image_name] = package.open(image_file)

            for image_name, image_hdl in name_hdl_map.items():
                image_file = image_file_map[image_name]
                if image_file in package_checksums:
                    image_checksum = package_checksums[image_file]
                else:
                    self.log.warning("checksum not provided for image %s.  Calculating checksum",
                                     image_file)
                    # Use a dedicated handle for the checksum so the upload
                    # handle is untouched, and close it once done.
                    checksum_hdl = package.open(image_file)
                    try:
                        image_checksum = rift.package.checksums.checksum(checksum_hdl)
                    finally:
                        checksum_hdl.close()
                try:
                    self.uploader.upload_image(image_name, image_checksum, image_hdl)
                    self.uploader.upload_image_to_cloud_accounts(image_name, image_checksum)

                except image.ImageUploadError as e:
                    # extract() only reports the carried message, so record
                    # the underlying failure here (as UpdatePackage does).
                    self.log.exception("Failed to upload image: %s", image_name)
                    raise MessageException(OnboardImageUploadError()) from e

        finally:
            for opened_hdl in name_hdl_map.values():
                opened_hdl.close()

    def extract_charms(self, package):
        """Extract the package's charms; failures become OnboardExtractionError."""
        try:
            charm_extractor = rift.package.charm.PackageCharmExtractor(self.log)
            charm_extractor.extract_charms(package)
        except rift.package.charm.CharmExtractionError as e:
            raise MessageException(OnboardExtractionError()) from e

    def extract_scripts(self, package):
        """Extract the package's scripts; failures become OnboardExtractionError."""
        try:
            script_extractor = rift.package.script.PackageScriptExtractor(self.log)
            script_extractor.extract_scripts(package)
        except rift.package.script.ScriptExtractionError as e:
            raise MessageException(OnboardExtractionError()) from e

    def extract_configs(self, package):
        """Extract the package's configs; failures become OnboardExtractionError."""
        try:
            config_extractor = rift.package.config.PackageConfigExtractor(self.log)
            config_extractor.extract_configs(package)
        except rift.package.config.ConfigExtractionError as e:
            raise MessageException(OnboardExtractionError()) from e

    def extract_icons(self, package):
        """Extract the package's icons; failures become OnboardExtractionError."""
        try:
            icon_extractor = rift.package.icon.PackageIconExtractor(self.log)
            icon_extractor.extract_icons(package)
        except rift.package.icon.IconExtractionError as e:
            raise MessageException(OnboardExtractionError()) from e

    def validate_package(self, package):
        """Validate the package checksums and return the file -> checksum result.

        Raises:
            MessageException: OnboardChecksumMismatch for a bad file checksum,
                OnboardUnreadablePackage for any other validation error
        """
        checksum_validator = rift.package.package.PackageChecksumValidator(self.log)

        try:
            file_checksums = checksum_validator.validate(package)
        except rift.package.package.PackageFileChecksumError as e:
            raise MessageException(OnboardChecksumMismatch(e.filename)) from e
        except rift.package.package.PackageValidationError as e:
            raise MessageException(OnboardUnreadablePackage()) from e

        return file_checksums

    def onboard_descriptors(self, package):
        """Push the package's descriptor message through the onboarder.

        Raises:
            MessageException: OnboardDescriptorError when the onboarder fails
        """
        descriptor_msg = package.descriptor_msg

        self.log.message(OnboardDescriptorOnboard())

        try:
            self.onboarder.onboard(descriptor_msg)
        except onboard.OnboardError as e:
            raise MessageException(OnboardDescriptorError(package.descriptor_file)) from e

593
594
class UploaderApplication(tornado.web.Application):
    """Tornado application exposing the package REST endpoints.

    Besides the HTTP routes it owns the package-create / package-update /
    export RPC handlers, the per-transaction message lists, and the helper
    objects (image uploader, descriptor onboarder, package stores, exporter)
    that the onboard and update jobs use.
    """

    @classmethod
    def from_tasklet(cls, tasklet):
        """Build the application from a tasklet's manifest and attributes."""
        manifest = tasklet.tasklet_info.get_pb_manifest()
        security = manifest.bootstrap_phase.rwsecurity

        # Honour the manifest's use_ssl flag. Passing the (cert, key) tuple
        # unconditionally would always enable SSL, because __init__ treats
        # any non-empty ``ssl`` value as "use SSL".
        ssl = (security.cert, security.key) if security.use_ssl else None

        return cls(
            tasklet.log,
            tasklet.dts,
            tasklet.loop,
            ssl=ssl,
            vnfd_store=tasklet.vnfd_package_store,
            nsd_store=tasklet.nsd_package_store,
            vnfd_catalog=tasklet.vnfd_catalog,
            nsd_catalog=tasklet.nsd_catalog)

    def __init__(
            self,
            log,
            dts,
            loop,
            ssl=None,
            vnfd_store=None,
            nsd_store=None,
            vnfd_catalog=None,
            nsd_catalog=None):
        """
        Args:
            log: logger shared by all helpers
            dts: DTS handle passed to the RPC handlers and the image uploader
            loop: asyncio event loop
            ssl: ``(cert, key)`` tuple to enable SSL, or None to disable it
            vnfd_store: VNFD package store; a filesystem store is created when omitted
            nsd_store: NSD package store; a filesystem store is created when omitted
            vnfd_catalog: VNFD catalog handed to the export RPC handler
            nsd_catalog: NSD catalog handed to the export RPC handler

        Raises:
            KeyError: when the RIFT_ARTIFACTS environment variable is not set
        """
        self.log = log
        self.loop = loop
        self.dts = dts

        self.use_ssl = False
        self.ssl_cert, self.ssl_key = None, None
        if ssl:
            self.use_ssl = True
            self.ssl_cert, self.ssl_key = ssl

        if not vnfd_store:
            vnfd_store = rift.package.store.VnfdPackageFilesystemStore(self.log)

        if not nsd_store:
            nsd_store = rift.package.store.NsdPackageFilesystemStore(self.log)

        self.accounts = []
        # transaction id -> list of messages logged for that transaction
        self.messages = collections.defaultdict(list)
        self.export_dir = os.path.join(os.environ['RIFT_ARTIFACTS'], 'launchpad/exports')

        self.uploader = image.ImageUploader(self.log, self.loop, self.dts)
        self.onboarder = onboard.DescriptorOnboarder(
            self.log, "127.0.0.1", 8008, self.use_ssl, self.ssl_cert, self.ssl_key
        )
        self.package_store_map = {
            "vnfd": vnfd_store,
            "nsd": nsd_store
        }

        self.exporter = export.DescriptorPackageArchiveExporter(self.log)
        self.loop.create_task(export.periodic_export_cleanup(self.log, self.loop, self.export_dir))

        self.vnfd_catalog = vnfd_catalog
        self.nsd_catalog = nsd_catalog
        catalog_map = {
            "vnfd": self.vnfd_catalog,
            "nsd": self.nsd_catalog
        }

        self.upload_handler = UploadRpcHandler(self.log, self.dts, self.loop, self)
        self.update_handler = UpdateRpcHandler(self.log, self.dts, self.loop, self)
        self.export_handler = export.ExportRpcHandler(
            self.log,
            self.dts,
            self.loop,
            self,
            store_map=self.package_store_map,
            exporter=self.exporter,
            catalog_map=catalog_map
        )

        attrs = dict(log=self.log, loop=self.loop)

        routes = [
            (r"/api/package/vnfd/(.*)", pkg_handler.FileRestApiHandler, {
                'path': vnfd_store.root_dir}),
            (r"/api/package/nsd/(.*)", pkg_handler.FileRestApiHandler, {
                'path': nsd_store.root_dir}),

            (r"/api/upload/([^/]+)/state", UploadStateHandler, attrs),
            (r"/api/update/([^/]+)/state", UpdateStateHandler, attrs),
            (r"/api/export/([^/]+)/state", export.ExportStateHandler, attrs),

            # The dots are escaped so only real ".tar.gz" / ".zip" file
            # names are served from the export directory.
            (r"/api/export/([^/]+\.tar\.gz)", tornado.web.StaticFileHandler, {
                "path": self.export_dir,
            }),
            (r"/api/export/([^/]+\.zip)", tornado.web.StaticFileHandler, {
                "path": self.export_dir,
            }),
        ]

        super(UploaderApplication, self).__init__(routes)

    @asyncio.coroutine
    def register(self):
        """Register the upload, update and export RPC handlers, in that order."""
        yield from self.upload_handler.register()
        yield from self.update_handler.register()
        yield from self.export_handler.register()

    def get_logger(self, transaction_id):
        """Return a message logger bound to the transaction's message list."""
        return message.Logger(self.log, self.messages[transaction_id])

    def onboard(self, url, transaction_id, auth=None):
        """Start onboarding the package at *url* in the loop's default executor.

        Args:
            url: location of the package to download
            transaction_id: id under which progress messages are collected
            auth: optional ``(username, password)`` tuple for the download
        """
        onboard_package = OnboardPackage(
            self.get_logger(transaction_id),
            self.loop,
            url,
            auth,
            self.onboarder,
            self.uploader,
            self.package_store_map,
        )

        # The download runs off the event loop thread; the outcome is
        # reported through the transaction logger, not a return value.
        self.loop.run_in_executor(None, onboard_package.download_package)

    def update(self, url, transaction_id, auth=None):
        """Start updating from the package at *url* in the loop's default executor.

        Args:
            url: location of the package to download
            transaction_id: id under which progress messages are collected
            auth: optional ``(username, password)`` tuple for the download
        """
        update_package = UpdatePackage(
            self.get_logger(transaction_id),
            self.loop,
            url,
            auth,
            self.onboarder,
            self.uploader,
            self.package_store_map,
        )

        # The download runs off the event loop thread; the outcome is
        # reported through the transaction logger, not a return value.
        self.loop.run_in_executor(None, update_package.download_package)
733