3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
27 import rift
.package
.archive
28 import rift
.package
.checksums
29 import rift
.package
.package
30 import rift
.package
.store
31 import rift
.package
.image
38 gi
.require_version('NsdYang', '1.0')
39 gi
.require_version('VnfdYang', '1.0')
40 gi
.require_version('RwPkgMgmtYang', '1.0')
42 from gi
.repository
import (
46 import rift
.mano
.dts
as mano_dts
# RPC output message type for /rw-pkg-mgmt:package-export responses
# (carries the transaction id and generated archive filename).
RPC_PACKAGE_EXPORT_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageExport
class ExportStart(message.StatusMessage):
    """ Status message recorded when the export process begins """
    def __init__(self):
        super().__init__("export-started", "export process started")
class ExportSuccess(message.StatusMessage):
    """ Status message recorded when the export process completes """
    def __init__(self):
        super().__init__("export-success", "export process successfully completed")
class ExportFailure(message.StatusMessage):
    """ Status message recorded when the export process fails """
    def __init__(self):
        super().__init__("export-failure", "export process failed")
class ExportError(message.ErrorMessage):
    """ Error message carrying the reason an export failed """
    def __init__(self, msg):
        super().__init__("update-error", msg)
class ExportSingleDescriptorOnlyError(ExportError):
    """ Error raised when more than one descriptor is requested for export """
    def __init__(self):
        super().__init__("Only a single descriptor can be exported")
class ArchiveExportError(Exception):
    """ Raised when a package archive fails to be created """
    pass
class DescriptorPackageArchiveExporter(object):
    """ Creates package archives (.tar.gz) from DescriptorPackage instances """

    def __init__(self, log):
        self._log = log

    def _create_archive_from_package(self, archive_hdl, package, open_fn):
        """ Build a TarPackageArchive from a package using a custom open function.

        Arguments:
            archive_hdl - An open file handle with 'wb' permissions
            package - A DescriptorPackage instance
            open_fn - The open function to substitute while archiving

        Returns:
            The created TarPackageArchive
        """
        saved_open = package.open
        try:
            # Swap in the provided open function so the archive picks up any
            # rewritten file contents instead of the originals.
            package.open = open_fn
            archive = rift.package.archive.TarPackageArchive.from_package(
                    self._log, package, archive_hdl
                    )
            return archive
        finally:
            # Always restore the package's original open function.
            package.open = saved_open

    def create_archive(self, archive_hdl, package, desc_json_str, serializer):
        """ Create a package archive from an existing package, descriptor messages,
        and a destination serializer.

        In order to stay flexible with the package directory structure and
        descriptor format, attempt to "augment" the onboarded package with the
        updated descriptor in the original format. If the original package
        contained a checksum file, then recalculate the descriptor checksum.

        Arguments:
            archive_hdl - An open file handle with 'wb' permissions
            package - A DescriptorPackage instance
            desc_json_str - A descriptor (e.g. nsd, vnfd) protobuf message
            serializer - A destination serializer (e.g. VnfdSerializer)

        Raises:
            ArchiveExportError - The exported archive failed to create
        """
        new_desc_msg = serializer.from_file_hdl(io.BytesIO(desc_json_str.encode()), ".json")
        _, dest_ext = os.path.splitext(package.descriptor_file)
        new_desc_hdl = io.BytesIO(serializer.to_string(new_desc_msg, dest_ext).encode())
        descriptor_checksum = rift.package.checksums.checksum(new_desc_hdl)

        # Locate the package's checksum file (if any) so the rewritten
        # descriptor's checksum can be substituted on the fly.
        try:
            checksum_file = rift.package.package.PackageChecksumValidator.get_package_checksum_file(
                    package
                    )
        except FileNotFoundError:
            checksum_file = None

        # Since we're going to intercept the open function to rewrite the descriptor
        # and checksum, save a handle to use below
        open_fn = package.open

        def create_checksum_file_hdl():
            # Read the stored checksums, then replace the descriptor's entry
            # with the checksum of the rewritten descriptor.
            with open_fn(checksum_file) as checksum_hdl:
                archive_checksums = rift.package.checksums.ArchiveChecksums.from_file_desc(
                        checksum_hdl
                        )

            archive_checksums[package.descriptor_file] = descriptor_checksum

            return io.BytesIO(archive_checksums.to_string().encode())

        def open_wrapper(rel_path):
            """ Wraps the package open in order to rewrite the descriptor file and checksum """
            if rel_path == package.descriptor_file:
                return new_desc_hdl

            elif rel_path == checksum_file:
                return create_checksum_file_hdl()

            return open_fn(rel_path)

        return self._create_archive_from_package(archive_hdl, package, open_wrapper)

    def export_package(self, package, export_dir, file_id, json_desc_str, dest_serializer):
        """ Export package as an archive to the export directory

        Arguments:
            package - A DescriptorPackage instance
            export_dir - The directory to export the package archive to
            file_id - A unique file id to name the archive as (i.e. <file_id>.tar.gz)
            json_desc_str - A descriptor (e.g. nsd, vnfd) json message string
            dest_serializer - A destination serializer (e.g. VnfdSerializer)

        Returns:
            The created archive path

        Raises:
            ArchiveExportError - The exported archive failed to create
        """
        try:
            os.makedirs(export_dir, exist_ok=True)
        except FileExistsError:
            pass

        archive_path = os.path.join(export_dir, file_id + ".tar.gz")
        with open(archive_path, 'wb') as archive_hdl:
            try:
                self.create_archive(
                    archive_hdl, package, json_desc_str, dest_serializer
                    )
            except Exception as e:
                # Don't leave a partially-written archive behind on failure.
                os.remove(archive_path)
                msg = "Failed to create exported archive"
                self._log.error(msg)
                raise ArchiveExportError(msg) from e

        return archive_path
class ExportRpcHandler(mano_dts.AbstractRpcHandler):
    """ RPC handler for /rw-pkg-mgmt:package-export.

    Exports an onboarded descriptor package in either RIFT grammar (tar.gz)
    or TOSCA grammar (zip) into the application's export directory.
    """

    def __init__(self, log, dts, loop, application, store_map, exporter, catalog_map):
        """
        Args:
            application: UploaderApplication
            store_map: dict containing VnfdStore & NsdStore
            exporter : DescriptorPackageArchiveExporter
            catalog_map: Dict containing Vnfds and Nsd onboarding.
        """
        super().__init__(log, dts, loop)

        self.application = application
        self.store_map = store_map
        self.exporter = exporter
        self.catalog_map = catalog_map

    @property
    def xpath(self):
        # DTS registration point for this RPC.
        return "/rw-pkg-mgmt:package-export"

    @asyncio.coroutine
    def callback(self, ks_path, msg):
        """ Handle a package-export RPC request.

        Returns:
            RPC output containing the transaction id and archive filename.
        """
        transaction_id = str(uuid.uuid4())
        log = message.Logger(
                self.log,
                self.application.messages[transaction_id],
                )

        file_name = self.export(transaction_id, log, msg)

        rpc_out = RPC_PACKAGE_EXPORT_ENDPOINT.from_dict({
            'transaction_id': transaction_id,
            'filename': file_name})

        return rpc_out

    def export(self, transaction_id, log, msg):
        """ Validate the request and dispatch to the grammar-specific exporter.

        Arguments:
            transaction_id - Unique id used to name the exported archive
            log - A message.Logger recording export progress
            msg - The package-export RPC input message

        Returns:
            The generated archive filename

        Raises:
            ValueError - Unknown package type or package id
        """
        log.message(ExportStart())
        desc_type = msg.package_type.lower()

        if desc_type not in self.catalog_map:
            raise ValueError("Invalid package type: {}".format(desc_type))

        desc_id = msg.package_id
        catalog = self.catalog_map[desc_type]

        if desc_id not in catalog:
            raise ValueError("Unable to find package ID: {}".format(desc_id))

        desc_msg = catalog[desc_id]

        # Get the schema for exporting
        schema = msg.export_schema.lower()

        # Get the grammar for exporting
        grammar = msg.export_grammar.lower()

        # Get the format for exporting
        format_ = msg.export_format.lower()

        if grammar == 'tosca':
            filename = "{}.zip".format(transaction_id)
            self.export_tosca(schema, format_, desc_type, desc_id, desc_msg, log, transaction_id)
            log.message(message.FilenameMessage(filename))
        else:
            filename = "{}.tar.gz".format(transaction_id)
            self.export_rift(schema, format_, desc_type, desc_id, desc_msg, log, transaction_id)
            log.message(message.FilenameMessage(filename))

        log.message(ExportSuccess())

        return filename

    def export_rift(self, schema, format_, desc_type, desc_id, desc_msg, log, transaction_id):
        """ Export the descriptor in RIFT grammar as a .tar.gz archive.

        Raises:
            tornado.web.HTTPError - Unknown schema or descriptor type
        """
        convert = rift.package.convert
        schema_serializer_map = {
                "rift": {
                    "vnfd": convert.RwVnfdSerializer,
                    "nsd": convert.RwNsdSerializer,
                    },
                "mano": {
                    "vnfd": convert.VnfdSerializer,
                    "nsd": convert.NsdSerializer,
                    },
                }

        if schema not in schema_serializer_map:
            raise tornado.web.HTTPError(400, "unknown schema: {}".format(schema))

        if format_ != "yaml":
            log.warn("Only yaml format supported for export")

        if desc_type not in schema_serializer_map[schema]:
            raise tornado.web.HTTPError(400, "unknown descriptor type: {}".format(desc_type))

        # Use the rift superset schema as the source
        src_serializer = schema_serializer_map["rift"][desc_type]()

        dest_serializer = schema_serializer_map[schema][desc_type]()

        package_store = self.store_map[desc_type]

        # Attempt to get the package from the package store
        # If that fails, create a temporary package using the descriptor only
        try:
            package = package_store.get_package(desc_id)
        except rift.package.store.PackageNotFoundError:
            log.debug("stored package not found. creating package from descriptor config")

            desc_yaml_str = src_serializer.to_yaml_string(desc_msg)
            with io.BytesIO(desc_yaml_str.encode()) as hdl:
                hdl.name = "{}__{}.yaml".format(desc_msg.id, desc_type)
                package = rift.package.package.DescriptorPackage.from_descriptor_file_hdl(
                        self.log, hdl
                        )

        self.exporter.export_package(
                package=package,
                export_dir=self.application.export_dir,
                file_id=transaction_id,
                json_desc_str=src_serializer.to_json_string(desc_msg),
                dest_serializer=dest_serializer,
                )

    def export_tosca(self, schema, format_, desc_type, desc_id, desc_msg, log, transaction_id):
        """ Export the NSD and its constituent VNFDs in TOSCA grammar.

        NOTE(review): the original declared this as (format_, schema, ...) while
        the call site in export() passes (schema, format_, ...), so the yaml
        check was testing the schema value. The parameter order now matches
        the call site.

        Raises:
            tornado.web.HTTPError - desc_type is not "nsd", or a constituent
                VNFD is missing from the catalog
        """
        if format_ != "yaml":
            log.warn("Only yaml format supported for TOSCA export")

        if desc_type != "nsd":
            raise tornado.web.HTTPError(
                400,
                "NSD need to passed to generate TOSCA: {}".format(desc_type))

        def get_pkg_from_store(id_, type_):
            """ Best-effort fetch of a stored package; returns None on error """
            package = None
            # Attempt to get the package from the package store
            try:
                package_store = self.store_map[type_]
                package = package_store.get_package(id_)
            except rift.package.store.PackageNotFoundError:
                log.debug("stored package not found for {}.".format(id_))
            except rift.package.store.PackageStoreError:
                log.debug("stored package error for {}.".format(id_))

            return package

        pkg = tosca.ExportTosca()

        # Add NSD and related descriptors for exporting
        nsd_id = pkg.add_nsd(desc_msg, get_pkg_from_store(desc_id, "nsd"))

        catalog = self.catalog_map["vnfd"]
        for const_vnfd in desc_msg.constituent_vnfd:
            vnfd_id = const_vnfd.vnfd_id_ref
            if vnfd_id in catalog:
                pkg.add_vnfd(nsd_id, catalog[vnfd_id],
                             get_pkg_from_store(vnfd_id, "vnfd"))
            else:
                raise tornado.web.HTTPError(
                    400,
                    "Unknown VNFD descriptor {} for NSD {}".
                    format(vnfd_id, nsd_id))

        # Create the archive.
        pkg.create_archive(transaction_id,
                           dest=self.application.export_dir)
class ExportStateHandler(state.StateHandler):
    """ Maps export lifecycle states to their status message classes """
    STARTED = ExportStart
    SUCCESS = ExportSuccess
    FAILURE = ExportFailure
def periodic_export_cleanup(log, loop, export_dir, period_secs=10 * 60, min_age_secs=30 * 60):
    """ Periodically cleanup old exported archives (.tar.gz files) in export_dir

    Arguments:
        log - A Logger instance
        loop - A asyncio event loop
        export_dir - The directory to cleanup old archives in
        period_secs - The number of seconds between clean ups
        min_age_secs - The minimum age of a archive to be eligible for cleanup
    """
    log.debug("Starting periodic export cleaning for export directory: %s", export_dir)

    # Create export dir if not created yet.  exist_ok avoids a crash if the
    # directory appears between the exists() check and makedirs() (and matches
    # how export_package creates it).
    if not os.path.exists(export_dir):
        os.makedirs(export_dir, exist_ok=True)

    while True:
        yield from asyncio.sleep(period_secs, loop=loop)

        # The directory may have been removed since the last pass.
        if not os.path.exists(export_dir):
            continue

        for file_name in os.listdir(export_dir):
            # Only exported archives are eligible for cleanup.
            if not file_name.endswith(".tar.gz"):
                continue

            file_path = os.path.join(export_dir, file_name)

            try:
                file_stat = os.stat(file_path)
            except OSError as e:
                log.warning("Could not stat old exported archive: %s", str(e))
                continue

            file_age = time.time() - file_stat[stat.ST_MTIME]

            # Too young to remove; a client may still be downloading it.
            if file_age < min_age_secs:
                continue

            log.debug("Cleaning up old exported archive: %s", file_path)

            try:
                os.remove(file_path)
            except OSError as e:
                log.warning("Failed to remove old exported archive: %s", str(e))