7aafcb94298ab0ad51772916bc41b530423389b1
[osm/SO.git] / rwlaunchpad / plugins / rwlaunchpadtasklet / rift / tasklets / rwlaunchpad / uploader.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17 import abc
18 import asyncio
19 import collections
20 import os
21 import tempfile
22 import threading
23 import uuid
24 import zlib
25 import re
26
27 import tornado
28 import tornado.escape
29 import tornado.ioloop
30 import tornado.web
31 import tornado.httputil
32 import tornadostreamform.multipart_streamer as multipart_streamer
33
34 import requests
35
36 # disable unsigned certificate warning
37 from requests.packages.urllib3.exceptions import InsecureRequestWarning
38 requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
39
40 import gi
41 gi.require_version('RwLaunchpadYang', '1.0')
42 gi.require_version('ProjectNsdYang', '1.0')
43 gi.require_version('ProjectVnfdYang', '1.0')
44
45 from gi.repository import (
46 ProjectNsdYang as NsdYang,
47 ProjectVnfdYang as VnfdYang,
48 )
49 import rift.mano.cloud
50
51 import rift.package.checksums
52 import rift.package.convert
53 import rift.package.handler as pkg_handler
54 import rift.package.icon
55 import rift.package.package
56 import rift.package.script
57 import rift.package.store
58
59 from gi.repository import (
60 RwDts as rwdts,
61 RwPkgMgmtYang
62 )
63 import rift.downloader as downloader
64 import rift.mano.dts as mano_dts
65 import rift.tasklets
66
67 from . import (
68 export,
69 extract,
70 image,
71 message,
72 onboard,
73 state,
74 )
75
76 from .message import (
77 MessageException,
78
79 # Onboard Error Messages
80 OnboardChecksumMismatch,
81 OnboardDescriptorError,
82 OnboardDescriptorExistsError,
83 OnboardDescriptorFormatError,
84 OnboardError,
85 OnboardExtractionError,
86 OnboardImageUploadError,
87 OnboardMissingContentBoundary,
88 OnboardMissingContentType,
89 OnboardMissingTerminalBoundary,
90 OnboardUnreadableHeaders,
91 OnboardUnreadablePackage,
92 OnboardUnsupportedMediaType,
93
94 # Onboard Status Messages
95 OnboardDescriptorOnboard,
96 OnboardFailure,
97 OnboardImageUpload,
98 OnboardPackageUpload,
99 OnboardPackageValidation,
100 OnboardStart,
101 OnboardSuccess,
102
103 DownloadError,
104 DownloadSuccess,
105
106 # Update Error Messages
107 UpdateChecksumMismatch,
108 UpdateDescriptorError,
109 UpdateDescriptorFormatError,
110 UpdateError,
111 UpdateExtractionError,
112 UpdateImageUploadError,
113 UpdateMissingContentBoundary,
114 UpdateMissingContentType,
115 UpdatePackageNotFoundError,
116 UpdateUnreadableHeaders,
117 UpdateUnreadablePackage,
118 UpdateUnsupportedMediaType,
119
120 # Update Status Messages
121 UpdateDescriptorUpdate,
122 UpdateDescriptorUpdated,
123 UpdatePackageUpload,
124 UpdateStart,
125 UpdateSuccess,
126 UpdateFailure,
127 )
128
129 from .tosca import ExportTosca
130
131 from .onboard import OnboardError as OnboardException
132
133 MB = 1024 * 1024
134 GB = 1024 * MB
135
136 MAX_STREAMED_SIZE = 5 * GB
137
138 # Shortcuts
139 RPC_PACKAGE_CREATE_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageCreate
140 RPC_PACKAGE_UPDATE_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageUpdate
141
142 class HttpMessageError(Exception):
143 def __init__(self, code, msg):
144 self.code = code
145 self.msg = msg
146
147
148 class UploadRpcHandler(mano_dts.AbstractRpcHandler):
149 def __init__(self, application):
150 """
151 Args:
152 application: UploaderApplication
153 """
154 super().__init__(application.log, application.dts, application.loop)
155 self.application = application
156
157 @property
158 def xpath(self):
159 return "/rw-pkg-mgmt:package-create"
160
161 @asyncio.coroutine
162 def callback(self, ks_path, msg):
163 transaction_id = str(uuid.uuid4())
164 log = self.application.get_logger(transaction_id)
165 log.message(OnboardStart())
166
167 self.log.debug("Package create RPC: {}".format(msg))
168
169 auth = None
170 if msg.username is not None:
171 auth = (msg.username, msg.password)
172
173 try:
174 project = msg.project_name
175 except AttributeError as e:
176 self._log.warning("Did not get project name in RPC: {}".
177 format(msg.as_dict()))
178 project = rift.mano.utils.project.DEFAULT_PROJECT
179
180 self.application.onboard(
181 msg.external_url,
182 transaction_id,
183 auth=auth,
184 project=project,
185 )
186
187 rpc_op = RPC_PACKAGE_CREATE_ENDPOINT.from_dict({
188 "transaction_id": transaction_id,
189 "project_name": project,
190 })
191
192 return rpc_op
193
194
195 class UpdateRpcHandler(mano_dts.AbstractRpcHandler):
196 def __init__(self, application):
197 """
198 Args:
199 application: UploaderApplication
200 """
201 super().__init__(application.log, application.dts, application.loop)
202 self.application = application
203
204 @property
205 def xpath(self):
206 return "/rw-pkg-mgmt:package-update"
207
208 @asyncio.coroutine
209 def callback(self, ks_path, msg):
210
211 transaction_id = str(uuid.uuid4())
212 log = self.application.get_logger(transaction_id)
213 log.message(UpdateStart())
214
215 auth = None
216 if msg.username is not None:
217 auth = (msg.username, msg.password)
218
219 self.application.update(
220 msg.external_url,
221 transaction_id,
222 auth=auth,
223 project=msg.project_name,
224 )
225
226 rpc_op = RPC_PACKAGE_UPDATE_ENDPOINT.from_dict({
227 "transaction_id": transaction_id,
228 "project_name": msg.project_name,
229 })
230
231 return rpc_op
232
233
234 class UploadStateHandler(state.StateHandler):
235 STARTED = OnboardStart
236 SUCCESS = OnboardSuccess
237 FAILURE = OnboardFailure
238
239
240 class UpdateStateHandler(state.StateHandler):
241 STARTED = UpdateStart
242 SUCCESS = UpdateSuccess
243 FAILURE = UpdateFailure
244
245
246 class UpdatePackage(downloader.DownloaderProtocol):
247
248 def __init__(self, log, loop, project, url, auth,
249 onboarder, uploader, package_store_map, transaction_id):
250 super().__init__()
251 self.log = log
252 self.loop = loop
253 self.project = project
254 self.url = url
255 self.auth = auth
256 self.onboarder = onboarder
257 self.uploader = uploader
258 self.package_store_map = package_store_map
259 self.transaction_id = transaction_id
260
261
262 def _update_package(self, packages):
263
264 # Extract package could return multiple packages if
265 # the package is converted
266 for pkg in packages:
267 with pkg as temp_package:
268 package_checksums = self.validate_package(temp_package)
269 stored_package = self.update_package(temp_package)
270 self.validate_descriptor_fields(temp_package)
271
272 try:
273 self.extract_icons(temp_package)
274 self.update_descriptors(temp_package)
275
276 except Exception:
277 self.delete_stored_package(stored_package)
278 raise
279
280 else:
281 self.upload_images(temp_package, package_checksums)
282
283 def extract(self, packages):
284 try:
285 self._update_package(packages)
286 self.log.message(UpdateSuccess())
287
288 except MessageException as e:
289 self.log.message(e.msg)
290 self.log.message(UpdateFailure())
291 raise UpdateFailure(str(e))
292
293 except Exception as e:
294 self.log.exception(e)
295 if str(e):
296 self.log.message(UpdateError(str(e)))
297 self.log.message(UpdateFailure())
298
299 def on_download_succeeded(self, job):
300 self.log.message(DownloadSuccess("Package downloaded."))
301
302 extractor = extract.UploadPackageExtractor(self.log)
303 file_backed_packages = extractor.create_packages_from_upload(
304 job.filename, job.filepath
305 )
306 try:
307 self.extract(file_backed_packages)
308 except Exception as e:
309 raise Exception("Error in Package Update")
310
311 def on_download_finished(self, job):
312 self.log.debug("*** Download completed")
313 if hasattr(self.project, 'update_status_handler'):
314 self.project.update_status_handler.update_status(job, self.transaction_id)
315
316 def on_download_progress(self, job):
317 self.log.debug("*** Download in progress")
318 if hasattr(self.project, 'update_status_handler'):
319 self.project.update_status_handler.update_status(job, self.transaction_id)
320
321 def on_download_failed(self, job):
322 self.log.error(job.detail)
323 self.log.message(DownloadError("Package download failed. {}".format(job.detail)))
324 self.log.message(UpdateFailure())
325
326 def download_package(self):
327
328 _, filename = tempfile.mkstemp()
329 url_downloader = downloader.UrlDownloader(
330 self.url,
331 auth=self.auth,
332 file_obj=filename,
333 decompress_on_fly=True,
334 log=self.log)
335 url_downloader.delegate = self
336 url_downloader.download()
337
338 def get_package_store(self, package):
339 return self.package_store_map[package.descriptor_type]
340
341 def update_package(self, package):
342 store = self.get_package_store(package)
343
344 try:
345 store.update_package(package)
346 except rift.package.store.PackageNotFoundError as e:
347 # If the package doesn't exist, then it is possible the descriptor was onboarded
348 # out of band. In that case, just store the package as is
349 self.log.warning("Package not found, storing new package instead.")
350 store.store_package(package)
351
352 stored_package = store.get_package(package.descriptor_id)
353
354 return stored_package
355
356 def delete_stored_package(self, package):
357 self.log.info("Deleting stored package: %s", package)
358 store = self.get_package_store(package)
359 try:
360 store.delete_package(package.descriptor_id)
361 except Exception as e:
362 self.log.warning("Failed to delete package from store: %s", str(e))
363
364 def upload_images(self, package, package_checksums):
365 image_file_map = rift.package.image.get_package_image_files(package)
366 name_hdl_map = {name: package.open(image_file_map[name]) for name in image_file_map}
367 if not image_file_map:
368 return
369
370 try:
371 for image_name, image_hdl in name_hdl_map.items():
372 image_file = image_file_map[image_name]
373 if image_file in package_checksums:
374 image_checksum = package_checksums[image_file]
375 else:
376 self.log.warning("checksum not provided for image %s. Calculating checksum",
377 image_file)
378 image_checksum = rift.package.checksums.checksum(
379 package.open(image_file_map[image_name])
380 )
381 try:
382 self.uploader.upload_image(image_name, image_checksum, image_hdl)
383 self.uploader.upload_image_to_cloud_accounts(image_name, image_checksum, self.project)
384
385 except image.ImageUploadError as e:
386 self.log.exception("Failed to upload image: %s", image_name)
387 raise MessageException(OnboardImageUploadError(str(e))) from e
388
389 finally:
390 _ = [image_hdl.close() for image_hdl in name_hdl_map.values()]
391
392 def extract_icons(self, package):
393 try:
394 icon_extractor = rift.package.icon.PackageIconExtractor(self.log)
395 icon_extractor.extract_icons(package)
396 except rift.package.icon.IconExtractionError as e:
397 raise MessageException(UpdateExtractionError()) from e
398
399 def validate_descriptor_fields(self, package):
400 # We can add more VNFD validations here. Currently we are validating only cloud-init
401 if package.descriptor_msg is not None:
402 self.validate_cloud_init_file(package)
403
404 def validate_cloud_init_file(self, package):
405 """ This validation is for VNFDs with associated VDUs. """
406 if 'vdu' in package.descriptor_msg.as_dict():
407 for vdu in package.descriptor_msg.as_dict()['vdu']:
408 if 'cloud_init_file' in vdu:
409 cloud_init_file = vdu['cloud_init_file']
410 for file in package.files:
411 if file.endswith('/' + cloud_init_file) is True:
412 return
413 raise MessageException(
414 OnboardError("Cloud-Init file reference in VNFD does not match with cloud-init file"))
415
416 def validate_package(self, package):
417 checksum_validator = rift.package.package.PackageChecksumValidator(self.log)
418
419 try:
420 checksum_validator.validate(package)
421 except rift.package.package.PackageFileChecksumError as e:
422 raise MessageException(UpdateChecksumMismatch(e.filename)) from e
423 except rift.package.package.PackageValidationError as e:
424 raise MessageException(UpdateUnreadablePackage()) from e
425
426 return checksum_validator.checksums
427
428 def update_descriptors(self, package):
429 descriptor_msg = package.descriptor_msg
430
431 self.log.message(UpdateDescriptorUpdate())
432
433 try:
434 self.onboarder.update(descriptor_msg, project=self.project.name)
435 except onboard.UpdateError as e:
436 raise MessageException(UpdateDescriptorError(package.descriptor_file)) from e
437
438
439 class OnboardPackage(downloader.DownloaderProtocol):
440
441 def __init__(self, log, loop, project, url, auth,
442 onboarder, uploader, package_store_map, transaction_id):
443 self.log = log
444 self.loop = loop
445 self.project = project
446 self.url = url
447 self.auth = auth
448 self.onboarder = onboarder
449 self.uploader = uploader
450 self.package_store_map = package_store_map
451 self.transaction_id = transaction_id
452
453 def _onboard_package(self, packages):
454 # Extract package could return multiple packages if
455 # the package is converted
456 for pkg in packages:
457 with pkg as temp_package:
458 package_checksums = self.validate_package(temp_package)
459 stored_package = self.store_package(temp_package)
460 self.validate_descriptor_fields(temp_package)
461
462 try:
463 self.extract_icons(temp_package)
464 self.onboard_descriptors(temp_package)
465
466 except Exception as e:
467 if "data-exists" not in e.msg.text:
468 self.delete_stored_package(stored_package)
469 raise
470 else:
471 self.upload_images(temp_package, package_checksums)
472
473 def extract(self, packages):
474 try:
475 self._onboard_package(packages)
476 self.log.message(OnboardSuccess())
477
478 except MessageException as e:
479 self.log.message(e.msg)
480 self.log.message(OnboardFailure())
481 raise OnboardException(OnboardFailure())
482
483
484 except Exception as e:
485 self.log.exception(e)
486 if str(e):
487 self.log.message(OnboardError(str(e)))
488 self.log.message(OnboardFailure())
489
490 def on_download_succeeded(self, job):
491 self.log.message(DownloadSuccess("Package downloaded."))
492
493 extractor = extract.UploadPackageExtractor(self.log)
494 file_backed_packages = extractor.create_packages_from_upload(
495 job.filename, job.filepath
496 )
497 try:
498 self.extract(file_backed_packages)
499 except Exception as e:
500 raise Exception("Error in Onboarding Package")
501
502 def on_download_finished(self, job):
503 self.log.debug("*** Download completed")
504 if hasattr(self.project, 'upload_status_handler'):
505 self.project.upload_status_handler.upload_status(job, self.transaction_id)
506
507 def on_download_progress(self, job):
508 self.log.debug("*** Download in progress")
509 if hasattr(self.project, 'upload_status_handler'):
510 self.project.upload_status_handler.upload_status(job, self.transaction_id)
511
512 def on_download_failed(self, job):
513 self.log.error(job.detail)
514 self.log.message(DownloadError("Package download failed. {}".format(job.detail)))
515 self.log.message(OnboardFailure())
516
517 def download_package(self):
518
519 self.log.debug("Before pkg download, project = {}".format(self.project.name))
520 _, filename = tempfile.mkstemp()
521 url_downloader = downloader.UrlDownloader(
522 self.url,
523 auth=self.auth,
524 file_obj=filename,
525 decompress_on_fly=True,
526 log=self.log)
527 url_downloader.delegate = self
528 url_downloader.download()
529
530 def get_package_store(self, package):
531 return self.package_store_map[package.descriptor_type]
532
533 def store_package(self, package):
534 store = self.get_package_store(package)
535
536 try:
537 store.store_package(package)
538 except rift.package.store.PackageExistsError as e:
539 store.update_package(package)
540
541 stored_package = store.get_package(package.descriptor_id)
542
543 return stored_package
544
545 def delete_stored_package(self, package):
546 self.log.info("Deleting stored package: %s", package)
547 store = self.get_package_store(package)
548 try:
549 store.delete_package(package.descriptor_id)
550 except Exception as e:
551 self.log.warning("Failed to delete package from store: %s", str(e))
552
553 def upload_images(self, package, package_checksums):
554 image_file_map = rift.package.image.get_package_image_files(package)
555 if not image_file_map:
556 return
557
558 name_hdl_map = {name: package.open(image_file_map[name]) for name in image_file_map}
559 try:
560 for image_name, image_hdl in name_hdl_map.items():
561 image_file = image_file_map[image_name]
562 if image_file in package_checksums:
563 image_checksum = package_checksums[image_file]
564 else:
565 self.log.warning("checksum not provided for image %s. Calculating checksum",
566 image_file)
567 image_checksum = rift.package.checksums.checksum(
568 package.open(image_file_map[image_name])
569 )
570 try:
571 set_image_property = {}
572 self.uploader.upload_image(image_name, image_checksum, image_hdl, set_image_property)
573 self.uploader.upload_image_to_cloud_accounts(image_name, image_checksum, self.project.name)
574
575 except image.ImageUploadError as e:
576 raise MessageException(OnboardImageUploadError(str(e))) from e
577
578 finally:
579 _ = [image_hdl.close() for image_hdl in name_hdl_map.values()]
580
581 def extract_icons(self, package):
582 try:
583 icon_extractor = rift.package.icon.PackageIconExtractor(self.log)
584 icon_extractor.extract_icons(package)
585 except rift.package.icon.IconExtractionError as e:
586 raise MessageException(OnboardExtractionError()) from e
587
588 def validate_descriptor_fields(self, package):
589 # We can add more VNFD/NSD validations here.
590 if package.descriptor_msg is not None:
591 self.validate_cloud_init_file(package)
592 self.validate_vld_mgmt_network(package)
593
594 def validate_vld_mgmt_network(self, package):
595 """ This is validation at onboarding of NSD for atleast one of the VL's to have mgmt network true
596 and have minimum one connection point"""
597 if package.descriptor_type == 'nsd':
598 for vld in package.descriptor_msg.as_dict().get('vld',[]):
599 if vld.get('mgmt_network', False) is True and \
600 len(vld.get('vnfd_connection_point_ref',[])) > 0 :
601 break
602 else:
603 self.log.error(("AtLeast One of the VL's should have Management Network as True "
604 "and have minimum one connection point"))
605
606 def validate_cloud_init_file(self, package):
607 """ This validation is for VNFDs with associated VDUs. """
608 if 'vdu' in package.descriptor_msg.as_dict():
609 for vdu in package.descriptor_msg.as_dict()['vdu']:
610 if 'cloud_init_file' in vdu:
611 cloud_init_file = vdu['cloud_init_file']
612 for file in package.files:
613 if file.endswith('/' + cloud_init_file) is True:
614 return
615 raise MessageException(
616 OnboardError("Cloud-Init file reference in VNFD does not match with cloud-init file"))
617
618 def validate_package(self, package):
619 validators = (
620 rift.package.package.PackageChecksumValidator(self.log),
621 rift.package.package.PackageConstructValidator(self.log),
622 )
623
624 # Run the validators for checksum and package construction for imported pkgs
625 for validator in validators:
626 try:
627 validator.validate(package)
628
629 except rift.package.package.PackageFileChecksumError as e:
630 raise MessageException(OnboardChecksumMismatch(e.filename)) from e
631 except rift.package.package.PackageValidationError as e:
632 raise MessageException(OnboardUnreadablePackage()) from e
633
634 return validators[0].checksums
635
636 def onboard_descriptors(self, package):
637 def process_error_messsage(exception, package):
638 """
639 This method captures error reason. This needs to be enhanced with time.
640 """
641 exception_msg = str(exception)
642 match_duplicate = re.findall('<error-tag>(.*?)</error-tag>', exception_msg, re.DOTALL)
643
644 if len(match_duplicate) > 0:
645 error_message = str(match_duplicate[0])
646 return error_message
647
648 match = re.findall('<tailf:missing-element>(.*?)</tailf:missing-element>', exception_msg, re.DOTALL)
649 error_message = ""
650 if len(match) > 0:
651 for element in match:
652 element_message = "Missing element : {}".format(element)
653 error_message += element_message
654 else:
655 error_message = package.descriptor_file
656 return error_message
657
658 def process_exception(exception, package):
659 return OnboardDescriptorError(process_error_messsage(exception, package))
660
661 descriptor_msg = package.descriptor_msg
662 self.log.message(OnboardDescriptorOnboard())
663
664 try:
665 self.onboarder.onboard(descriptor_msg, project=self.project.name)
666 except onboard.OnboardError as e:
667 raise MessageException(process_exception(e, package)) from e
668
669
670 class UploaderApplication(tornado.web.Application):
671
672 @classmethod
673 def from_tasklet(cls, tasklet):
674 manifest = tasklet.tasklet_info.get_pb_manifest()
675 use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl
676 ssl_cert = manifest.bootstrap_phase.rwsecurity.cert
677 ssl_key = manifest.bootstrap_phase.rwsecurity.key
678 return cls(
679 tasklet,
680 ssl=(ssl_cert, ssl_key))
681
682 def __init__(
683 self,
684 tasklet,
685 ssl=None,
686 vnfd_store=None,
687 nsd_store=None):
688
689 self.log = tasklet.log
690 self.loop = tasklet.loop
691 self.dts = tasklet.dts
692
693 self.accounts = {}
694 self.ro_accounts = {}
695
696 self.use_ssl = False
697 self.ssl_cert, self.ssl_key = None, None
698 if ssl:
699 self.use_ssl = True
700 self.ssl_cert, self.ssl_key = ssl
701
702 self.messages = collections.defaultdict(list)
703 self.export_dir = os.path.join(os.environ['RIFT_VAR_ROOT'], 'launchpad/exports')
704
705 self.uploader = image.ImageUploader(self.log, self.loop, self.dts)
706 self.onboarder = onboard.DescriptorOnboarder(
707 self.log, "127.0.0.1", 8008, self.use_ssl, self.ssl_cert, self.ssl_key
708 )
709
710 self.exporter = export.DescriptorPackageArchiveExporter(self.log)
711 self.loop.create_task(export.periodic_export_cleanup(self.log, self.loop, self.export_dir))
712
713 self.tasklet = tasklet
714 self.get_vnfd_catalog = tasklet.get_vnfd_catalog
715 self.get_nsd_catalog = tasklet.get_nsd_catalog
716 catalog_map = {
717 "vnfd": self.get_vnfd_catalog,
718 "nsd": self.get_nsd_catalog
719 }
720
721 self.upload_handler = UploadRpcHandler(self)
722 self.update_handler = UpdateRpcHandler(self)
723 self.export_handler = export.ExportRpcHandler(self, catalog_map)
724
725 attrs = dict(log=self.log, loop=self.loop)
726
727 super(UploaderApplication, self).__init__([
728 (r"/api/package/vnfd/(.*)", pkg_handler.FileRestApiHandler, {
729 'path': rift.package.store.VnfdPackageFilesystemStore.DEFAULT_ROOT_DIR}),
730 (r"/api/package/nsd/(.*)", pkg_handler.FileRestApiHandler, {
731 'path': rift.package.store.NsdPackageFilesystemStore.DEFAULT_ROOT_DIR}),
732
733 (r"/api/upload/([^/]+)/state", UploadStateHandler, attrs),
734 (r"/api/update/([^/]+)/state", UpdateStateHandler, attrs),
735 (r"/api/export/([^/]+)/state", export.ExportStateHandler, attrs),
736
737 (r"/api/export/([^/]+.tar.gz)", tornado.web.StaticFileHandler, {
738 "path": self.export_dir,
739 }),
740 (r"/api/export/([^/]+.zip)", tornado.web.StaticFileHandler, {
741 "path": self.export_dir,
742 }),
743 ])
744
745 @asyncio.coroutine
746 def register(self):
747 yield from self.upload_handler.register()
748 yield from self.update_handler.register()
749 yield from self.export_handler.register()
750
751 def get_logger(self, transaction_id):
752 return message.Logger(self.log, self.messages[transaction_id])
753
754 def build_store_map(self, project=None):
755 ''' Use project information to build vnfd/nsd filesystem stores with appropriate
756 package directory root.
757 '''
758 vnfd_store = rift.package.store.VnfdPackageFilesystemStore(self.log) if not \
759 project else rift.package.store.VnfdPackageFilesystemStore(self.log, project=project)
760 nsd_store = rift.package.store.NsdPackageFilesystemStore(self.log) if not \
761 project else rift.package.store.NsdPackageFilesystemStore(self.log, project=project)
762
763 return dict(vnfd = vnfd_store, nsd = nsd_store)
764
765 def onboard(self, url, transaction_id, auth=None, project=None):
766 log = message.Logger(self.log, self.messages[transaction_id])
767
768 try:
769 self.project = self.tasklet._get_project(project)
770 except Exception as e:
771 self.log.error("Exception raised ...%s" % (str(e)))
772 self.log.exception(e)
773
774 self.package_store_map = self.build_store_map(project)
775 onboard_package = OnboardPackage(
776 log,
777 self.loop,
778 self.project,
779 url,
780 auth,
781 self.onboarder,
782 self.uploader,
783 self.package_store_map,
784 transaction_id
785 )
786
787 self.loop.run_in_executor(None, onboard_package.download_package)
788
789 def update(self, url, transaction_id, auth=None, project=None):
790 log = message.Logger(self.log, self.messages[transaction_id])
791
792 try:
793 self.project = self.tasklet._get_project(project)
794 except Exception as e:
795 self.log.error("Exception raised ...%s" % (str(e)))
796 self.log.exception(e)
797
798 self.package_store_map = self.build_store_map(project)
799 update_package = UpdatePackage(
800 log,
801 self.loop,
802 self.project,
803 url,
804 auth,
805 self.onboarder,
806 self.uploader,
807 self.package_store_map,
808 transaction_id
809 )
810
811 self.loop.run_in_executor(None, update_package.download_package)