# Copyright 2016-2017 RIFT.IO Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import asyncio
import io
import os
import stat
import time
import uuid

import tornado.web

import rift.package.archive
import rift.package.checksums
import rift.package.convert
import rift.package.package
import rift.package.store
import rift.package.image

from . import message
from . import state
from . import tosca

import gi
gi.require_version('RwPkgMgmtYang', '1.0')

from gi.repository import (
    RwPkgMgmtYang,
)

import rift.mano.dts as mano_dts


RPC_PACKAGE_EXPORT_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageExport

class ExportStart(message.StatusMessage):
    def __init__(self):
        super().__init__("export-started", "export process started")


class ExportSuccess(message.StatusMessage):
    def __init__(self):
        super().__init__("export-success", "export process successfully completed")


class ExportFailure(message.StatusMessage):
    def __init__(self):
        super().__init__("export-failure", "export process failed")


class ExportError(message.ErrorMessage):
    def __init__(self, msg):
        super().__init__("update-error", msg)


class ExportSingleDescriptorOnlyError(ExportError):
    def __init__(self):
        super().__init__("Only a single descriptor can be exported")

class ArchiveExportError(Exception):
    pass

class DescriptorPackageArchiveExporter(object):
    def __init__(self, log):
        self._log = log

    def _create_archive_from_package(self, archive_hdl, package, open_fn):
        # Temporarily swap in the provided open function so the archive is
        # built from the (possibly rewritten) file handles, then restore it.
        orig_open = package.open
        try:
            package.open = open_fn
            archive = rift.package.archive.TarPackageArchive.from_package(
                    self._log, package, archive_hdl
                    )
            return archive
        finally:
            package.open = orig_open

    def create_archive(self, archive_hdl, package, desc_json_str, serializer):
        """ Create a package archive from an existing package, descriptor message,
        and a destination serializer.

        In order to stay flexible with the package directory structure and
        descriptor format, attempt to "augment" the onboarded package with the
        updated descriptor in the original format.  If the original package
        contained a checksum file, then recalculate the descriptor checksum.

        Arguments:
            archive_hdl - An open file handle with 'wb' permissions
            package - A DescriptorPackage instance
            desc_json_str - A descriptor (e.g. nsd, vnfd) JSON message string
            serializer - A destination serializer (e.g. VnfdSerializer)

        Raises:
            ArchiveExportError - The exported archive failed to create

        """
        new_desc_msg = serializer.from_file_hdl(io.BytesIO(desc_json_str.encode()), ".json")
        _, dest_ext = os.path.splitext(package.descriptor_file)
        new_desc_hdl = io.BytesIO(serializer.to_string(new_desc_msg, dest_ext).encode())
        descriptor_checksum = rift.package.checksums.checksum(new_desc_hdl)

        # Find the package's checksum file, if it has one
        checksum_file = None
        try:
            checksum_file = rift.package.package.PackageChecksumValidator.get_package_checksum_file(
                    package
                    )
        except FileNotFoundError:
            pass

        # Since we're going to intercept the open function to rewrite the descriptor
        # and checksum, save a handle to use below
        open_fn = package.open

        def create_checksum_file_hdl():
            # Rewrite the existing checksum file with the recalculated
            # descriptor checksum
            with open_fn(checksum_file) as checksum_hdl:
                archive_checksums = rift.package.checksums.ArchiveChecksums.from_file_desc(
                        checksum_hdl
                        )

            archive_checksums[package.descriptor_file] = descriptor_checksum

            checksum_hdl = io.BytesIO(archive_checksums.to_string().encode())
            return checksum_hdl

        def open_wrapper(rel_path):
            """ Wraps the package open in order to rewrite the descriptor file and checksum """
            if rel_path == package.descriptor_file:
                return new_desc_hdl

            elif rel_path == checksum_file:
                return create_checksum_file_hdl()

            return open_fn(rel_path)

        archive = self._create_archive_from_package(archive_hdl, package, open_wrapper)

        return archive

    def export_package(self, package, export_dir, file_id, json_desc_str, dest_serializer):
        """ Export package as an archive to the export directory

        Arguments:
            package - A DescriptorPackage instance
            export_dir - The directory to export the package archive to
            file_id - A unique file id to name the archive as (i.e. <file_id>.tar.gz)
            json_desc_str - A descriptor (e.g. nsd, vnfd) json message string
            dest_serializer - A destination serializer (e.g. VnfdSerializer)

        Returns:
            The created archive path

        Raises:
            ArchiveExportError - The exported archive failed to create

        """
        try:
            os.makedirs(export_dir, exist_ok=True)
        except FileExistsError:
            pass

        archive_path = os.path.join(export_dir, file_id + ".tar.gz")
        with open(archive_path, 'wb') as archive_hdl:
            try:
                self.create_archive(
                        archive_hdl, package, json_desc_str, dest_serializer
                        )
            except Exception as e:
                # Don't leave a partially written archive behind
                os.remove(archive_path)
                msg = "Failed to create exported archive"
                self._log.error(msg)
                raise ArchiveExportError(msg) from e

        return archive_path
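
# A minimal usage sketch for DescriptorPackageArchiveExporter, assuming
# hypothetical "logger", "pkg" (a DescriptorPackage), and "json_str" objects;
# ExportRpcHandler.export_rift below drives it the same way:
#
#   exporter = DescriptorPackageArchiveExporter(logger)
#   archive_path = exporter.export_package(
#           package=pkg,
#           export_dir="/tmp/launchpad/exports",
#           file_id=str(uuid.uuid4()),
#           json_desc_str=json_str,
#           dest_serializer=rift.package.convert.VnfdSerializer(),
#           )
#   # archive_path -> "/tmp/launchpad/exports/<file_id>.tar.gz"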

class ExportRpcHandler(mano_dts.AbstractRpcHandler):
    def __init__(self, application, catalog_map):
        """
        Args:
            application: UploaderApplication
            catalog_map: Dict containing Vnfds and Nsd onboarding.
        """
        super().__init__(application.log, application.dts, application.loop)

        self.application = application
        self.store_map = application.package_store_map
        self.exporter = application.exporter
        self.catalog_map = catalog_map

    @property
    def xpath(self):
        return "/rw-pkg-mgmt:package-export"
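
    # For reference, the RPC input fields consumed by export() below (names
    # inferred from this handler's usage of the rw-pkg-mgmt package-export
    # message; see the yang model for the authoritative definition):
    #
    #   package_type   - "vnfd" or "nsd" (a key of catalog_map)
    #   package_id     - descriptor id to look up in the catalog
    #   export_schema  - serializer schema, e.g. "rift"
    #   export_grammar - "tosca" produces a .zip, otherwise a rift .tar.gz
    #   export_format  - only "yaml" is fully supported
    #   project_name   - project used to select the catalog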

    @asyncio.coroutine
    def callback(self, ks_path, msg):
        transaction_id = str(uuid.uuid4())
        log = message.Logger(
                self.log,
                self.application.messages[transaction_id],
                )

        file_name = self.export(transaction_id, log, msg)

        rpc_out = RPC_PACKAGE_EXPORT_ENDPOINT.from_dict({
            'transaction_id': transaction_id,
            'filename': file_name})

        return rpc_out

    def export(self, transaction_id, log, msg):
        log.message(ExportStart())
        desc_type = msg.package_type.lower()

        if desc_type not in self.catalog_map:
            raise ValueError("Invalid package type: {}".format(desc_type))

        desc_id = msg.package_id
        catalog = self.catalog_map[desc_type](project=msg.project_name)

        if desc_id not in catalog:
            raise ValueError("Unable to find package ID: {}".format(desc_id))

        desc_msg = catalog[desc_id]

        # Get the schema for exporting
        schema = msg.export_schema.lower()

        # Get the grammar for exporting
        grammar = msg.export_grammar.lower()

        # Get the format for exporting
        format_ = msg.export_format.lower()

        if grammar == 'tosca':
            filename = "{}.zip".format(transaction_id)
            self.export_tosca(schema, format_, desc_type, desc_id, desc_msg, log, transaction_id)
            log.message(message.FilenameMessage(filename))
        else:
            filename = "{}.tar.gz".format(transaction_id)
            self.export_rift(schema, format_, desc_type, desc_id, desc_msg, log, transaction_id)
            log.message(message.FilenameMessage(filename))

        log.message(ExportSuccess())

        return filename

    def export_rift(self, schema, format_, desc_type, desc_id, desc_msg, log, transaction_id):
        convert = rift.package.convert
        schema_serializer_map = {
                "rift": {
                    "vnfd": convert.RwVnfdSerializer,
                    "nsd": convert.RwNsdSerializer,
                    },
                "mano": {
                    "vnfd": convert.VnfdSerializer,
                    "nsd": convert.NsdSerializer,
                    },
                }

        if schema not in schema_serializer_map:
            raise tornado.web.HTTPError(400, "unknown schema: {}".format(schema))

        if format_ != "yaml":
            log.warn("Only yaml format supported for export")

        if desc_type not in schema_serializer_map[schema]:
            raise tornado.web.HTTPError(400, "unknown descriptor type: {}".format(desc_type))

        # Use the rift superset schema as the source
        src_serializer = schema_serializer_map["rift"][desc_type]()

        dest_serializer = schema_serializer_map[schema][desc_type]()

        package_store = self.store_map[desc_type]

        # Attempt to get the package from the package store.
        # If that fails, create a temporary package using the descriptor only.
        try:
            package = package_store.get_package(desc_id)
        except rift.package.store.PackageNotFoundError:
            log.debug("stored package not found. creating package from descriptor config")

            desc_yaml_str = src_serializer.to_yaml_string(desc_msg)
            with io.BytesIO(desc_yaml_str.encode()) as hdl:
                hdl.name = "{}__{}.yaml".format(desc_msg.id, desc_type)
                package = rift.package.package.DescriptorPackage.from_descriptor_file_hdl(
                        self.log, hdl
                        )

        self.exporter.export_package(
                package=package,
                export_dir=self.application.export_dir,
                file_id=transaction_id,
                json_desc_str=src_serializer.to_json_string(desc_msg),
                dest_serializer=dest_serializer,
                )
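
    # A minimal sketch of the schema conversion export_rift performs, using
    # the serializer calls seen above (hypothetical "desc_msg"):
    #
    #   src = rift.package.convert.RwVnfdSerializer()
    #   dest = rift.package.convert.VnfdSerializer()
    #   json_str = src.to_json_string(desc_msg)
    #   new_msg = dest.from_file_hdl(io.BytesIO(json_str.encode()), ".json")
    #   yaml_str = dest.to_string(new_msg, ".yaml")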

    def export_tosca(self, schema, format_, desc_type, desc_id, desc_msg, log, transaction_id):
        if format_ != "yaml":
            log.warn("Only yaml format supported for TOSCA export")

        def get_pkg_from_store(id_, type_):
            package = None
            # Attempt to get the package from the package store
            try:
                package_store = self.store_map[type_]
                package = package_store.get_package(id_)
            except rift.package.store.PackageNotFoundError:
                log.debug("stored package not found for {}.".format(id_))
            except rift.package.store.PackageStoreError:
                log.debug("stored package error for {}.".format(id_))

            return package

        if desc_type == "nsd":
            pkg = tosca.ExportTosca()

            # Add NSD and related descriptors for exporting
            nsd_id = pkg.add_nsd(desc_msg, get_pkg_from_store(desc_id, "nsd"))

            catalog = self.catalog_map["vnfd"]
            for const_vnfd in desc_msg.constituent_vnfd:
                vnfd_id = const_vnfd.vnfd_id_ref
                if vnfd_id in catalog:
                    pkg.add_vnfd(nsd_id,
                                 catalog[vnfd_id],
                                 get_pkg_from_store(vnfd_id, "vnfd"))
                else:
                    raise tornado.web.HTTPError(
                        400,
                        "Unknown VNFD descriptor {} for NSD {}".
                        format(vnfd_id, nsd_id))

            # Create the archive.
            pkg.create_archive(transaction_id,
                               dest=self.application.export_dir)

        if desc_type == "vnfd":
            pkg = tosca.ExportTosca()
            vnfd_id = desc_msg.id
            pkg.add_single_vnfd(vnfd_id,
                                desc_msg,
                                get_pkg_from_store(vnfd_id, "vnfd"))

            # Create the archive.
            pkg.create_archive(transaction_id,
                               dest=self.application.export_dir)

class ExportStateHandler(state.StateHandler):
    STARTED = ExportStart
    SUCCESS = ExportSuccess
    FAILURE = ExportFailure

@asyncio.coroutine
def periodic_export_cleanup(log, loop, export_dir, period_secs=10 * 60, min_age_secs=30 * 60):
    """ Periodically clean up old exported archives (.tar.gz files) in export_dir

    Arguments:
        log - A Logger instance
        loop - An asyncio event loop
        export_dir - The directory to clean up old archives in
        period_secs - The number of seconds between clean-ups
        min_age_secs - The minimum age of an archive to be eligible for cleanup

    """
    log.debug("Starting periodic export cleaning for export directory: %s", export_dir)

    # Create export dir if not created yet
    if not os.path.exists(export_dir):
        os.makedirs(export_dir)

    while True:
        yield from asyncio.sleep(period_secs, loop=loop)

        if not os.path.exists(export_dir):
            continue

        for file_name in os.listdir(export_dir):
            if not file_name.endswith(".tar.gz"):
                continue

            file_path = os.path.join(export_dir, file_name)

            try:
                file_stat = os.stat(file_path)
            except OSError as e:
                log.warning("Could not stat old exported archive: %s", str(e))
                continue

            file_age = time.time() - file_stat[stat.ST_MTIME]

            if file_age < min_age_secs:
                continue

            log.debug("Cleaning up old exported archive: %s", file_path)

            try:
                os.remove(file_path)
            except OSError as e:
                log.warning("Failed to remove old exported archive: %s", str(e))
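
# A minimal sketch of scheduling the cleanup coroutine on a tasklet's event
# loop (hypothetical "log" and "loop" objects):
#
#   cleanup_task = loop.create_task(
#           periodic_export_cleanup(log, loop, export_dir="/tmp/launchpad/exports")
#           )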