3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
29 import rift
.package
.archive
30 import rift
.package
.checksums
31 import rift
.package
.package
32 import rift
.package
.store
33 import rift
.package
.image
40 gi
.require_version('NsdYang', '1.0')
41 gi
.require_version('VnfdYang', '1.0')
42 gi
.require_version('RwPkgMgmtYang', '1.0')
44 from gi
.repository
import (
48 import rift
.mano
.dts
as mano_dts
# Yang output model used to build the response for the package-export RPC
RPC_PACKAGE_EXPORT_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageExport
class ExportStart(message.StatusMessage):
    """ Status message logged when the export process begins """
    def __init__(self):
        super().__init__("export-started", "export process started")
class ExportSuccess(message.StatusMessage):
    """ Status message logged when the export process completes successfully """
    def __init__(self):
        super().__init__("export-success", "export process successfully completed")
class ExportFailure(message.StatusMessage):
    """ Status message logged when the export process fails """
    def __init__(self):
        super().__init__("export-failure", "export process failed")
class ExportError(message.ErrorMessage):
    """ Error message logged when the export process encounters an error """
    def __init__(self, msg):
        super().__init__("update-error", msg)
class ExportSingleDescriptorOnlyError(ExportError):
    """ Error raised when more than a single descriptor is requested for export """
    def __init__(self):
        super().__init__("Only a single descriptor can be exported")
class ArchiveExportError(Exception):
    """ Exported archive failed to be created """
class DescriptorPackageArchiveExporter(object):
    """ Exports a DescriptorPackage as a compressed tar archive """

    def __init__(self, log):
        """
        Arguments:
            log - A Logger instance
        """
        self._log = log

    def _create_archive_from_package(self, archive_hdl, package, open_fn, top_level_dir=None):
        """ Create a TarPackageArchive from a package, using a custom open function

        Arguments:
            archive_hdl - An open file handle with 'wb' permissions
            package - A DescriptorPackage instance
            open_fn - The open function to substitute for package.open while archiving
            top_level_dir - Optional directory name to root the archive contents under

        Returns:
            The created TarPackageArchive
        """
        orig_open = package.open
        try:
            package.open = open_fn
            archive = rift.package.archive.TarPackageArchive.from_package(
                    self._log, package, archive_hdl, top_level_dir
                    )
            return archive
        finally:
            # Always restore the original open function, even if archiving fails,
            # so the package instance remains usable by callers.
            package.open = orig_open

    def create_archive(self, archive_hdl, package, desc_json_str, serializer):
        """ Create a package archive from an existing package, descriptor messages,
        and a destination serializer.

        In order to stay flexible with the package directory structure and
        descriptor format, attempt to "augment" the onboarded package with the
        updated descriptor in the original format.  If the original package
        contained a checksum file, then recalculate the descriptor checksum.

        Arguments:
            archive_hdl - An open file handle with 'wb' permissions
            package - A DescriptorPackage instance
            desc_json_str - A descriptor (e.g. nsd, vnfd) json message string
            serializer - A destination serializer (e.g. VnfdSerializer)

        Returns:
            The created archive

        Raises:
            ArchiveExportError - The exported archive failed to create
        """
        new_desc_msg = serializer.from_file_hdl(io.BytesIO(desc_json_str.encode()), ".json")
        _, dest_ext = os.path.splitext(package.descriptor_file)
        new_desc_hdl = io.BytesIO(serializer.to_string(new_desc_msg, dest_ext).encode())
        descriptor_checksum = rift.package.checksums.checksum(new_desc_hdl)

        # The package may not contain a checksum file; in that case there is
        # nothing to rewrite and checksum_file stays None.
        checksum_file = None
        try:
            checksum_file = rift.package.package.PackageChecksumValidator.get_package_checksum_file(
                    package
                    )
        except FileNotFoundError:
            pass

        # Since we're going to intercept the open function to rewrite the descriptor
        # and checksum, save a handle to use below
        open_fn = package.open

        def create_checksum_file_hdl():
            # Rebuild the checksum file contents with the updated descriptor checksum
            with open_fn(checksum_file) as checksum_hdl:
                archive_checksums = rift.package.checksums.ArchiveChecksums.from_file_desc(
                        checksum_hdl
                        )

            archive_checksums[package.descriptor_file] = descriptor_checksum

            checksum_hdl = io.BytesIO(archive_checksums.to_string().encode())
            return checksum_hdl

        def open_wrapper(rel_path):
            """ Wraps the package open in order to rewrite the descriptor file and checksum """
            if rel_path == package.descriptor_file:
                return new_desc_hdl

            elif rel_path == checksum_file:
                return create_checksum_file_hdl()

            return open_fn(rel_path)

        archive = self._create_archive_from_package(archive_hdl, package, open_wrapper,
                                                    new_desc_msg.name)

        return archive

    def export_package(self, package, export_dir, file_id, json_desc_str, dest_serializer):
        """ Export package as an archive to the export directory

        Arguments:
            package - A DescriptorPackage instance
            export_dir - The directory to export the package archive to
            file_id - A unique file id to name the archive as (i.e. <file_id>.tar.gz)
            json_desc_str - A descriptor (e.g. nsd, vnfd) json message string
            dest_serializer - A destination serializer (e.g. VnfdSerializer)

        Returns:
            The created archive path

        Raises:
            ArchiveExportError - The exported archive failed to create
        """
        # exist_ok=True already tolerates a pre-existing directory, so the
        # previous try/except FileExistsError guard was redundant and removed.
        os.makedirs(export_dir, exist_ok=True)

        archive_path = os.path.join(export_dir, file_id + ".tar.gz")
        with open(archive_path, 'wb') as archive_hdl:
            try:
                self.create_archive(archive_hdl, package, json_desc_str, dest_serializer)
            except Exception as e:
                # Do not leave a partially written archive behind
                os.remove(archive_path)
                msg = "Failed to create exported archive"
                self._log.error(msg)
                raise ArchiveExportError(msg) from e

        return archive_path
class ExportRpcHandler(mano_dts.AbstractRpcHandler):
    """ RPC handler for the /rw-pkg-mgmt:package-export endpoint """

    def __init__(self, log, dts, loop, application, store_map, exporter, onboarder, catalog_map):
        """
        Args:
            application: UploaderApplication
            store_map: dict containing VnfdStore & NsdStore
            exporter : DescriptorPackageArchiveExporter
            onboarder: onboarder used to fetch the updated descriptor
            catalog_map: Dict containing Vnfds and Nsd onboarding.
        """
        super().__init__(log, dts, loop)

        self.application = application
        self.store_map = store_map
        self.exporter = exporter
        self.onboarder = onboarder
        self.catalog_map = catalog_map

    @property
    def xpath(self):
        # RPC endpoint this handler registers for
        return "/rw-pkg-mgmt:package-export"

    @asyncio.coroutine
    def callback(self, ks_path, msg):
        """ Run the export for the RPC request and build the RPC output message """
        transaction_id = str(uuid.uuid4())
        # NOTE(review): assumes AbstractRpcHandler's __init__ exposes self.log — confirm
        log = message.Logger(
                self.log,
                self.application.messages[transaction_id],
                )

        file_name = self.export(transaction_id, log, msg)

        rpc_out = RPC_PACKAGE_EXPORT_ENDPOINT.from_dict({
            'transaction_id': transaction_id,
            'filename': file_name})

        return rpc_out

    def export(self, transaction_id, log, msg):
        """ Export the requested descriptor package and return the archive filename

        Arguments:
            transaction_id - Unique id for this export transaction
            log - A message.Logger scoped to this transaction
            msg - The package-export RPC input message

        Raises:
            ValueError - Unknown package type or package id
        """
        log.message(ExportStart())
        desc_type = msg.package_type.lower()

        if desc_type not in self.catalog_map:
            raise ValueError("Invalid package type: {}".format(desc_type))

        # Parse the IDs
        desc_id = msg.package_id
        catalog = self.catalog_map[desc_type]

        if desc_id not in catalog:
            raise ValueError("Unable to find package ID: {}".format(desc_id))

        desc_msg = catalog[desc_id]

        # Get the schema for exporting
        schema = msg.export_schema.lower()

        # Get the grammar for exporting
        grammar = msg.export_grammar.lower()

        # Get the format for exporting
        format_ = msg.export_format.lower()

        # Initial value of the exported filename
        # NOTE(review): stored on self, so concurrent exports on one handler
        # instance would race on this attribute — confirm single-threaded use.
        self.filename = "{name}_{ver}".format(
                name=desc_msg.name,
                ver=desc_msg.version)

        if grammar == 'tosca':
            self.export_tosca(schema, format_, desc_type, desc_id, desc_msg, log, transaction_id)
            filename = "{}.zip".format(self.filename)
            log.message(message.FilenameMessage(filename))
        else:
            self.export_rift(schema, format_, desc_type, desc_id, desc_msg, log, transaction_id)
            filename = "{}.tar.gz".format(self.filename)
            log.message(message.FilenameMessage(filename))

        log.message(ExportSuccess())

        return filename

    def export_rift(self, schema, format_, desc_type, desc_id, desc_msg, log, transaction_id):
        """ Export the descriptor in the RIFT (tar.gz) grammar

        Raises:
            tornado.web.HTTPError - Unknown schema or descriptor type
        """
        convert = rift.package.convert
        schema_serializer_map = {
                "rift": {
                    "vnfd": convert.RwVnfdSerializer,
                    "nsd": convert.RwNsdSerializer,
                    },
                "mano": {
                    "vnfd": convert.RwVnfdSerializer,
                    "nsd": convert.RwNsdSerializer,
                    },
                }

        if schema not in schema_serializer_map:
            raise tornado.web.HTTPError(400, "unknown schema: {}".format(schema))

        if format_ != "yaml":
            log.warn("Only yaml format supported for export")

        if desc_type not in schema_serializer_map[schema]:
            raise tornado.web.HTTPError(400, "unknown descriptor type: {}".format(desc_type))

        # Use the rift superset schema as the source
        src_serializer = schema_serializer_map["rift"][desc_type]()

        dest_serializer = schema_serializer_map[schema][desc_type]()

        package_store = self.store_map[desc_type]

        # Attempt to get the package from the package store
        # If that fails, create a temporary package using the descriptor only
        try:
            package = package_store.get_package(desc_id)
        except rift.package.store.PackageNotFoundError:
            log.debug("stored package not found. creating package from descriptor config")

            desc_yaml_str = src_serializer.to_yaml_string(desc_msg)
            with io.BytesIO(desc_yaml_str.encode()) as hdl:
                hdl.name = "{}__{}.yaml".format(desc_msg.id, desc_type)
                package = rift.package.package.DescriptorPackage.from_descriptor_file_hdl(
                        self.log, hdl
                        )

        # Try to get the updated descriptor from the api endpoint so that we have
        # the updated descriptor file in the exported archive and the name of the archive
        # tar matches the name in the yaml descriptor file. Proceed with the current
        # file if there's an error
        try:
            json_desc_msg = src_serializer.to_json_string(desc_msg)
            desc_name, desc_version = desc_msg.name, desc_msg.version

            d = collections.defaultdict(dict)
            sub_dict = self.onboarder.get_updated_descriptor(desc_msg)
            root_key, sub_key = "{0}:{0}-catalog".format(desc_type), "{0}:{0}".format(desc_type)
            # root the dict under "vnfd:vnfd-catalog"
            d[root_key] = sub_dict

            json_desc_msg = json.dumps(d)
            desc_name, desc_version = sub_dict[sub_key]['name'], sub_dict[sub_key]['version']

        except Exception as e:
            # Best effort: fall back to the catalog copy of the descriptor
            msg = "Exception {} raised - {}".format(e.__class__.__name__, str(e))
            self.log.debug(msg)

        # exported filename based on the updated descriptor name
        self.filename = "{}_{}".format(desc_name, desc_version)

        self.exporter.export_package(
                package=package,
                export_dir=self.application.export_dir,
                file_id=self.filename,
                json_desc_str=json_desc_msg,
                dest_serializer=dest_serializer,
                )

    def export_tosca(self, schema, format_, desc_type, desc_id, desc_msg, log, transaction_id):
        """ Export the descriptor in the TOSCA (zip) grammar

        Note: the parameter order was fixed to (schema, format_, ...) to match
        the call site in export(); the previous (format_, schema, ...) order
        silently swapped the two values.

        Raises:
            tornado.web.HTTPError - Unknown VNFD referenced by the NSD
        """
        if format_ != "yaml":
            log.warn("Only yaml format supported for TOSCA export")

        def get_pkg_from_store(id_, type_):
            """ Best-effort fetch of a stored package; returns None when unavailable """
            package = None
            # Attempt to get the package from the package store
            try:
                package_store = self.store_map[type_]
                package = package_store.get_package(id_)
            except rift.package.store.PackageNotFoundError:
                log.debug("stored package not found for {}.".format(id_))
            except rift.package.store.PackageStoreError:
                log.debug("stored package error for {}.".format(id_))

            return package

        if desc_type == "nsd":
            pkg = tosca.ExportTosca()

            # Add NSD and related descriptors for exporting
            nsd_id = pkg.add_nsd(desc_msg, get_pkg_from_store(desc_id, "nsd"))

            catalog = self.catalog_map["vnfd"]
            for const_vnfd in desc_msg.constituent_vnfd:
                vnfd_id = const_vnfd.vnfd_id_ref
                if vnfd_id in catalog:
                    pkg.add_vnfd(nsd_id, catalog[vnfd_id],
                                 get_pkg_from_store(vnfd_id, "vnfd"))
                else:
                    raise tornado.web.HTTPError(
                        400,
                        "Unknown VNFD descriptor {} for NSD {}".
                        format(vnfd_id, nsd_id))

            # Create the archive.
            pkg.create_archive(transaction_id,
                               dest=self.application.export_dir)
        if desc_type == "vnfd":
            pkg = tosca.ExportTosca()
            vnfd_id = desc_msg.id
            # NOTE(review): middle argument reconstructed as desc_msg — confirm
            pkg.add_single_vnfd(vnfd_id, desc_msg,
                                get_pkg_from_store(vnfd_id, "vnfd"))

            # Create the archive.
            pkg.create_archive(transaction_id,
                               dest=self.application.export_dir)
class ExportStateHandler(state.StateHandler):
    """ Maps export lifecycle states to their status message classes """
    STARTED = ExportStart
    SUCCESS = ExportSuccess
    FAILURE = ExportFailure
def periodic_export_cleanup(log, loop, export_dir, period_secs=10 * 60, min_age_secs=30 * 60):
    """ Periodically cleanup old exported archives (.tar.gz files) in export_dir

    Arguments:
        log - A Logger instance
        loop - A asyncio event loop
        export_dir - The directory to cleanup old archives in
        period_secs - The number of seconds between clean ups
        min_age_secs - The minimum age of a archive to be eligible for cleanup

    NOTE(review): this is a generator-based asyncio coroutine (it uses
    ``yield from``); the caller is expected to schedule it on the event loop.
    """
    log.debug("Starting periodic export cleaning for export directory: %s", export_dir)

    # Create export dir if not created yet
    if not os.path.exists(export_dir):
        os.makedirs(export_dir)

    while True:
        yield from asyncio.sleep(period_secs, loop=loop)

        # The directory may have been removed between sleeps
        if not os.path.exists(export_dir):
            continue

        for file_name in os.listdir(export_dir):
            # Only exported archives are eligible for cleanup
            if not file_name.endswith(".tar.gz"):
                continue

            file_path = os.path.join(export_dir, file_name)

            try:
                file_stat = os.stat(file_path)
            except OSError as e:
                log.warning("Could not stat old exported archive: %s", str(e))
                continue

            file_age = time.time() - file_stat[stat.ST_MTIME]

            if file_age < min_age_secs:
                continue

            log.debug("Cleaning up old exported archive: %s", file_path)

            try:
                os.remove(file_path)
            except OSError as e:
                log.warning("Failed to remove old exported archive: %s", str(e))