3 # Copyright 2016-2017 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
29 import rift
.package
.archive
30 import rift
.package
.checksums
31 import rift
.package
.package
32 import rift
.package
.store
33 import rift
.package
.image
40 gi
.require_version('RwPkgMgmtYang', '1.0')
42 from gi
.repository
import (
49 import rift
.mano
.dts
as mano_dts
# Shorthand for the yang output message type produced by the
# /rw-pkg-mgmt:package-export RPC (see ExportRpcHandler.callback).
RPC_PACKAGE_EXPORT_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageExport
class ExportStart(message.StatusMessage):
    """Status message recorded when an export transaction begins."""
    def __init__(self):
        super().__init__("export-started", "export process started")
class ExportSuccess(message.StatusMessage):
    """Status message recorded when an export transaction completes."""
    def __init__(self):
        super().__init__("export-success", "export process successfully completed")
class ExportFailure(message.StatusMessage):
    """Status message recorded when an export transaction fails."""
    def __init__(self):
        super().__init__("export-failure", "export process failed")
class ExportError(message.ErrorMessage):
    """Error message carrying a caller-supplied description of an export failure."""
    def __init__(self, msg):
        # NOTE(review): the status name is "update-error", not "export-error" —
        # presumably copied from an update handler. Left unchanged because
        # consumers may key on this exact string; confirm before renaming.
        super().__init__("update-error", msg)
class ExportSingleDescriptorOnlyError(ExportError):
    """Error reported when more than a single descriptor is requested for export."""
    def __init__(self):
        super().__init__("Only a single descriptor can be exported")
class ArchiveExportError(Exception):
    """Raised when creating an exported package archive fails."""
class DescriptorPackageArchiveExporter(object):
    """Creates exported .tar.gz archives from onboarded descriptor packages.

    The exporter rewrites the package's descriptor file (and, when present,
    its checksum file) on the fly while streaming the package contents into
    a tar archive.
    """

    def __init__(self, log):
        """
        Arguments:
            log - A Logger instance used for error reporting during export
        """
        self._log = log

    def _create_archive_from_package(self, archive_hdl, package, open_fn, top_level_dir=None):
        """ Create a TarPackageArchive from package, reading file data via open_fn.

        The package's open method is temporarily replaced with open_fn so
        that selected files (descriptor, checksum file) can be rewritten as
        they are archived; the original open is always restored.

        Arguments:
            archive_hdl - An open file handle with 'wb' permissions
            package - A DescriptorPackage instance
            open_fn - Callable used in place of package.open during archiving
            top_level_dir - Optional top-level directory name inside the archive
        """
        orig_open = package.open
        try:
            package.open = open_fn
            archive = rift.package.archive.TarPackageArchive.from_package(
                    self._log, package, archive_hdl, top_level_dir
                    )
            return archive
        finally:
            # Restore the real open even if archive creation raised.
            package.open = orig_open

    def create_archive(self, archive_hdl, package, desc_json_str, serializer, project=None):
        """ Create a package archive from an existing package, descriptor messages,
        and a destination serializer.

        In order to stay flexible with the package directory structure and
        descriptor format, attempt to "augment" the onboarded package with the
        updated descriptor in the original format. If the original package
        contained a checksum file, then recalculate the descriptor checksum.

        Arguments:
            archive_hdl - An open file handle with 'wb' permissions
            package - A DescriptorPackage instance
            desc_json_str - A descriptor (e.g. nsd, vnfd) protobuf message
            serializer - A destination serializer (e.g. VnfdSerializer)
            project - Optional project name passed to the serializer

        Raises:
            ArchiveExportError - The exported archive failed to create
        """
        # Parse the updated descriptor JSON, then re-serialize it in the same
        # format (extension) as the package's original descriptor file.
        new_desc_msg = serializer.from_file_hdl(io.BytesIO(desc_json_str.encode()), ".json", project)
        _, dest_ext = os.path.splitext(package.descriptor_file)
        new_desc_hdl = io.BytesIO(serializer.to_string(new_desc_msg, dest_ext).encode())
        descriptor_checksum = rift.package.checksums.checksum(new_desc_hdl)

        # Find the package's checksum file, if it has one.
        try:
            checksum_file = rift.package.package.PackageChecksumValidator.get_package_checksum_file(
                    package
                    )
        except FileNotFoundError:
            checksum_file = None

        # Since we're going to intercept the open function to rewrite the descriptor
        # and checksum, save a handle to use below
        open_fn = package.open

        def create_checksum_file_hdl():
            # Rebuild the checksum file contents with the new descriptor
            # checksum patched in.
            with open_fn(checksum_file) as checksum_hdl:
                archive_checksums = rift.package.checksums.ArchiveChecksums.from_file_desc(
                        checksum_hdl
                        )

            # Get the name of the descriptor file without the prefix
            # (which is what is stored in the checksum file)
            desc_file_no_prefix = os.path.relpath(package.descriptor_file, package.prefix)
            archive_checksums[desc_file_no_prefix] = descriptor_checksum

            checksum_hdl = io.BytesIO(archive_checksums.to_string().encode())
            return checksum_hdl

        def open_wrapper(rel_path):
            """ Wraps the package open in order to rewrite the descriptor file and checksum """
            if rel_path == package.descriptor_file:
                return new_desc_hdl

            elif rel_path == checksum_file:
                return create_checksum_file_hdl()

            return open_fn(rel_path)

        archive = self._create_archive_from_package(archive_hdl, package, open_wrapper, new_desc_msg.name)
        return archive

    def export_package(self, package, export_dir, file_id, json_desc_str, dest_serializer, project=None):
        """ Export package as an archive to the export directory

        Arguments:
            package - A DescriptorPackage instance
            export_dir - The directory to export the package archive to
            file_id - A unique file id to name the archive as (i.e. <file_id>.tar.gz)
            json_desc_str - A descriptor (e.g. nsd, vnfd) json message string
            dest_serializer - A destination serializer (e.g. VnfdSerializer)
            project - Optional project name passed through to create_archive

        Returns:
            The created archive path

        Raises:
            ArchiveExportError - The exported archive failed to create
        """
        try:
            os.makedirs(export_dir, exist_ok=True)
        except FileExistsError:
            pass

        archive_path = os.path.join(export_dir, file_id + ".tar.gz")
        with open(archive_path, 'wb') as archive_hdl:
            try:
                self.create_archive(
                    archive_hdl, package, json_desc_str, dest_serializer, project
                    )
            except Exception as e:
                # Don't leave a half-written archive behind on failure.
                os.remove(archive_path)
                msg = "Failed to create exported archive"
                self._log.error(msg)
                raise ArchiveExportError(msg) from e

        return archive_path
class ExportRpcHandler(mano_dts.AbstractRpcHandler):
    """ RPC handler for the /rw-pkg-mgmt:package-export endpoint.

    Exports an onboarded descriptor package (vnfd or nsd) as an archive in
    either the RIFT grammar (.tar.gz, via DescriptorPackageArchiveExporter)
    or the TOSCA grammar (.zip, via tosca.ExportTosca).
    """

    def __init__(self, application, catalog_map):
        """
        Args:
            application: UploaderApplication
            catalog_map: Dict containing Vnfds and Nsd onboarding.
        """
        super().__init__(application.log, application.dts, application.loop)

        self.application = application
        self.exporter = application.exporter
        self.onboarder = application.onboarder
        self.catalog_map = catalog_map

    @property
    def xpath(self):
        # RPC endpoint this handler is registered against.
        return "/rw-pkg-mgmt:package-export"

    @asyncio.coroutine
    def callback(self, ks_path, msg):
        """ Handle a package-export RPC request.

        Runs the export and returns the RPC output carrying the transaction
        id and the generated archive filename.
        """
        transaction_id = str(uuid.uuid4())
        log = message.Logger(
                self.log,
                self.application.messages[transaction_id],
                )

        file_name = self.export(transaction_id, log, msg)

        rpc_out = RPC_PACKAGE_EXPORT_ENDPOINT.from_dict({
            'transaction_id': transaction_id,
            'filename': file_name})

        return rpc_out

    def export(self, transaction_id, log, msg):
        """ Run the export described by msg and return the archive filename.

        Arguments:
            transaction_id - Unique id for this export transaction
            log - A message.Logger recording transaction status messages
            msg - The package-export RPC input message

        Raises:
            ValueError - msg.package_type is not a known descriptor type
        """
        # Filler descriptor types, used when the descriptor is not found in
        # the catalog (see TODO below).
        DESC_TYPE_PB_MAP = {
            "vnfd": RwProjectVnfdYang.YangData_RwProject_Project_VnfdCatalog_Vnfd,
            "nsd": RwProjectNsdYang.YangData_RwProject_Project_NsdCatalog_Nsd
        }

        log.message(ExportStart())
        desc_type = msg.package_type.lower()

        if desc_type not in self.catalog_map:
            raise ValueError("Invalid package type: {}".format(desc_type))

        # Parse the IDs
        desc_id = msg.package_id
        catalog = self.catalog_map[desc_type](project=msg.project_name)

        # TODO: Descriptor isn't available from catalog info passed in from launchpad tasklet.
        # If unavailable, create a filler descriptor object, which will be updated
        # via GET call to config.
        if desc_id in catalog:
            desc_msg = catalog[desc_id]
        else:
            log.warn("Unable to find package ID in catalog: {}".format(desc_id))
            desc_msg = DESC_TYPE_PB_MAP[desc_type](id=desc_id)

        self.store_map = self.application.build_store_map(project=msg.project_name)
        self.project_name = msg.project_name if msg.has_field('project_name') else None

        # Get the schema for exporting
        schema = msg.export_schema.lower()

        # Get the grammar for exporting
        grammar = msg.export_grammar.lower()

        # Get the format for exporting
        format_ = msg.export_format.lower()

        # Initial value of the exported filename
        self.filename = "{name}_{ver}".format(
                name=desc_msg.name,
                ver=desc_msg.version)

        if grammar == 'tosca':
            # BUGFIX: export_tosca declares its parameters as
            # (format_, schema, ...) while export_rift declares
            # (schema, format_, ...).  The previous positional call swapped
            # the two values, so export_tosca's yaml-format check actually
            # inspected the schema.  Use keywords so each value reaches the
            # right parameter without changing either signature.
            self.export_tosca(format_=format_, schema=schema,
                              desc_type=desc_type, desc_id=desc_id,
                              desc_msg=desc_msg, log=log,
                              transaction_id=transaction_id)
            filename = "{}.zip".format(self.filename)
            log.message(message.FilenameMessage(filename))
        else:
            self.export_rift(schema, format_, desc_type, desc_id, desc_msg, log, transaction_id)
            filename = "{}.tar.gz".format(self.filename)
            log.message(message.FilenameMessage(filename))

        log.message(ExportSuccess())

        return filename

    def export_rift(self, schema, format_, desc_type, desc_id, desc_msg, log, transaction_id):
        """ Export the package in the RIFT grammar as a .tar.gz archive.

        Raises:
            tornado.web.HTTPError - unknown schema or descriptor type
            ArchiveExportError - descriptor merge/update failed
        """
        convert = rift.package.convert
        # NOTE(review): key names reconstructed from the surrounding code
        # ("rift" is referenced explicitly below) — confirm "mano" against the
        # project history.
        schema_serializer_map = {
            "rift": {
                "vnfd": convert.RwVnfdSerializer,
                "nsd": convert.RwNsdSerializer,
            },
            "mano": {
                "vnfd": convert.RwVnfdSerializer,
                "nsd": convert.RwNsdSerializer,
            }
        }

        if schema not in schema_serializer_map:
            raise tornado.web.HTTPError(400, "unknown schema: {}".format(schema))

        if format_ != "yaml":
            log.warn("Only yaml format supported for export")

        if desc_type not in schema_serializer_map[schema]:
            raise tornado.web.HTTPError(400, "unknown descriptor type: {}".format(desc_type))

        # Use the rift superset schema as the source
        src_serializer = schema_serializer_map["rift"][desc_type]()

        dest_serializer = schema_serializer_map[schema][desc_type]()

        package_store = self.store_map[desc_type]

        # Attempt to get the package from the package store
        # If that fails, create a temporary package using the descriptor only
        try:
            package = package_store.get_package(desc_id)
            # Remove the image file from the package while exporting
            # (renamed loop variable: "file" shadowed the builtin)
            for pkg_file in package.files:
                if rift.package.image.is_image_file(pkg_file):
                    package.remove_file(pkg_file)

        except rift.package.store.PackageNotFoundError:
            log.debug("stored package not found. creating package from descriptor config")

            desc_yaml_str = src_serializer.to_yaml_string(desc_msg)
            with io.BytesIO(desc_yaml_str.encode()) as hdl:
                hdl.name = "{}__{}.yaml".format(desc_msg.id, desc_type)
                package = rift.package.package.DescriptorPackage.from_descriptor_file_hdl(
                    self.log, hdl
                    )

        # Get the updated descriptor from the api endpoint to get any updates
        # made to the catalog. Also desc_msg may not be populated correctly as yet.
        try:
            # merge the descriptor content: for rbac everything needs to be project rooted, with project name.
            D = collections.defaultdict(dict)
            sub_dict = self.onboarder.get_updated_descriptor(desc_msg, self.project_name)

            if self.project_name:
                D["project"] = dict(name=self.project_name)
                root_key, sub_key = "project-{0}:{0}-catalog".format(desc_type), "project-{0}:{0}".format(desc_type)
                D["project"].update({root_key: sub_dict})
            else:
                root_key, sub_key = "{0}:{0}-catalog".format(desc_type), "{0}:{0}".format(desc_type)
                D[root_key] = sub_dict

            json_desc_msg = json.dumps(D)
            desc_name, desc_version = sub_dict[sub_key]['name'], sub_dict[sub_key].get('version', '')

        except Exception as e:
            msg = "Exception {} raised - {}".format(e.__class__.__name__, str(e))
            self.log.error(msg)
            raise ArchiveExportError(msg) from e

        # exported filename based on the updated descriptor name
        self.filename = "{}_{}".format(desc_name, desc_version)
        self.log.debug("JSON string for descriptor: {}".format(json_desc_msg))

        self.exporter.export_package(
                package=package,
                export_dir=self.application.export_dir,
                file_id=self.filename,
                json_desc_str=json_desc_msg,
                dest_serializer=dest_serializer,
                project=self.project_name,
                )

    def export_tosca(self, format_, schema, desc_type, desc_id, desc_msg, log, transaction_id):
        """ Export the package in the TOSCA grammar as a .zip archive.

        Note the parameter order (format_, schema) differs from export_rift;
        export() calls this method with keyword arguments for that reason.

        Raises:
            tornado.web.HTTPError - NSD references an unknown VNFD
        """
        if format_ != "yaml":
            log.warn("Only yaml format supported for TOSCA export")

        def get_pkg_from_store(id_, type_):
            # Best-effort package lookup; returns None when unavailable.
            package = None
            # Attempt to get the package from the package store
            try:
                package_store = self.store_map[type_]
                package = package_store.get_package(id_)

            except rift.package.store.PackageNotFoundError:
                log.debug("stored package not found for {}.".format(id_))
            except rift.package.store.PackageStoreError:
                log.debug("stored package error for {}.".format(id_))

            return package

        if desc_type == "nsd":
            pkg = tosca.ExportTosca()

            # Add NSD and related descriptors for exporting
            nsd_id = pkg.add_nsd(desc_msg, get_pkg_from_store(desc_id, "nsd"))

            catalog = self.catalog_map["vnfd"]
            for const_vnfd in desc_msg.constituent_vnfd:
                vnfd_id = const_vnfd.vnfd_id_ref
                if vnfd_id in catalog:
                    pkg.add_vnfd(nsd_id, catalog[vnfd_id],
                                 get_pkg_from_store(vnfd_id, "vnfd"))
                else:
                    raise tornado.web.HTTPError(
                        400,
                        "Unknown VNFD descriptor {} for NSD {}".
                        format(vnfd_id, nsd_id))

            # Create the archive.
            pkg.create_archive(transaction_id,
                               dest=self.application.export_dir)

        if desc_type == "vnfd":
            pkg = tosca.ExportTosca()
            vnfd_id = desc_msg.id
            pkg.add_single_vnfd(vnfd_id,
                                desc_msg,
                                get_pkg_from_store(vnfd_id, "vnfd"))

            # Create the archive.
            pkg.create_archive(transaction_id,
                               dest=self.application.export_dir)
class ExportStateHandler(state.StateHandler):
    """Maps the generic state-handler states onto the export status messages."""
    STARTED = ExportStart
    SUCCESS = ExportSuccess
    FAILURE = ExportFailure
@asyncio.coroutine
def periodic_export_cleanup(log, loop, export_dir, period_secs=10 * 60, min_age_secs=30 * 60):
    """ Periodically cleanup old exported archives (.tar.gz files) in export_dir

    Runs forever; schedule it as a task on the event loop.

    Arguments:
        log - A Logger instance
        loop - A asyncio event loop
        export_dir - The directory to cleanup old archives in
        period_secs - The number of seconds between clean ups
        min_age_secs - The minimum age of a archive to be eligible for cleanup
    """
    log.debug("Starting periodic export cleaning for export directory: %s", export_dir)

    # Create export dir if not created yet
    if not os.path.exists(export_dir):
        os.makedirs(export_dir)

    while True:
        yield from asyncio.sleep(period_secs, loop=loop)

        # The directory may have been removed between sleeps; skip this round.
        if not os.path.exists(export_dir):
            continue

        for file_name in os.listdir(export_dir):
            # Only exported archives are eligible for cleanup.
            if not file_name.endswith(".tar.gz"):
                continue

            file_path = os.path.join(export_dir, file_name)

            try:
                file_stat = os.stat(file_path)
            except OSError as e:
                # File may have been removed concurrently; log and move on.
                log.warning("Could not stat old exported archive: %s", str(e))
                continue

            file_age = time.time() - file_stat[stat.ST_MTIME]

            # Keep archives younger than the minimum age so in-progress
            # downloads are not deleted out from under clients.
            if file_age < min_age_secs:
                continue

            log.debug("Cleaning up old exported archive: %s", file_path)

            try:
                os.remove(file_path)
            except OSError as e:
                log.warning("Failed to remove old exported archive: %s", str(e))