[RIFT 16087] Backend changes to decouple storage semantics from user interface. Chang...
[osm/SO.git] rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/export.py
#
# Copyright 2016 RIFT.IO Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import asyncio
import collections
import io
import json
import os.path
import stat
import time
import uuid

import tornado.web

import rift.package.archive
import rift.package.checksums
import rift.package.convert
import rift.package.image
import rift.package.package
import rift.package.store

from . import state
from . import message
from . import tosca

import gi
gi.require_version('NsdYang', '1.0')
gi.require_version('VnfdYang', '1.0')
gi.require_version('RwPkgMgmtYang', '1.0')

from gi.repository import (
    NsdYang,
    VnfdYang,
    RwPkgMgmtYang,
)
import rift.mano.dts as mano_dts


RPC_PACKAGE_EXPORT_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageExport

class ExportStart(message.StatusMessage):
    def __init__(self):
        super().__init__("export-started", "export process started")


class ExportSuccess(message.StatusMessage):
    def __init__(self):
        super().__init__("export-success", "export process successfully completed")


class ExportFailure(message.StatusMessage):
    def __init__(self):
        super().__init__("export-failure", "export process failed")

class ExportError(message.ErrorMessage):
    def __init__(self, msg):
        super().__init__("export-error", msg)


class ExportSingleDescriptorOnlyError(ExportError):
    def __init__(self):
        super().__init__("Only a single descriptor can be exported")


class ArchiveExportError(Exception):
    pass

class DescriptorPackageArchiveExporter(object):
    def __init__(self, log):
        self._log = log

    def _create_archive_from_package(self, archive_hdl, package, open_fn, top_level_dir=None):
        # Temporarily swap in the provided open function so the archive is
        # built from the (possibly rewritten) file handles, then restore it.
        orig_open = package.open
        try:
            package.open = open_fn
            archive = rift.package.archive.TarPackageArchive.from_package(
                self._log, package, archive_hdl, top_level_dir
            )
            return archive
        finally:
            package.open = orig_open
    def create_archive(self, archive_hdl, package, desc_json_str, serializer):
        """ Create a package archive from an existing package, descriptor messages,
        and a destination serializer.

        In order to stay flexible with the package directory structure and
        descriptor format, attempt to "augment" the onboarded package with the
        updated descriptor in the original format. If the original package
        contained a checksum file, then recalculate the descriptor checksum.

        Arguments:
            archive_hdl - An open file handle with 'wb' permissions
            package - A DescriptorPackage instance
            desc_json_str - A descriptor (e.g. nsd, vnfd) JSON string
            serializer - A destination serializer (e.g. VnfdSerializer)

        Returns:
            A TarPackageArchive

        Raises:
            ArchiveExportError - Failed to create the exported archive

        """
        new_desc_msg = serializer.from_file_hdl(io.BytesIO(desc_json_str.encode()), ".json")
        _, dest_ext = os.path.splitext(package.descriptor_file)
        new_desc_hdl = io.BytesIO(serializer.to_string(new_desc_msg, dest_ext).encode())
        descriptor_checksum = rift.package.checksums.checksum(new_desc_hdl)

        checksum_file = None
        try:
            checksum_file = rift.package.package.PackageChecksumValidator.get_package_checksum_file(
                package
            )
        except FileNotFoundError:
            pass

        # Since we're going to intercept the open function to rewrite the descriptor
        # and checksum, save a handle to use below
        open_fn = package.open

        def create_checksum_file_hdl():
            with open_fn(checksum_file) as checksum_hdl:
                archive_checksums = rift.package.checksums.ArchiveChecksums.from_file_desc(
                    checksum_hdl
                )

            archive_checksums[package.descriptor_file] = descriptor_checksum

            checksum_hdl = io.BytesIO(archive_checksums.to_string().encode())
            return checksum_hdl

        def open_wrapper(rel_path):
            """ Wraps the package open in order to rewrite the descriptor file and checksum """
            if rel_path == package.descriptor_file:
                return new_desc_hdl

            elif rel_path == checksum_file:
                return create_checksum_file_hdl()

            return open_fn(rel_path)

        archive = self._create_archive_from_package(archive_hdl, package, open_wrapper, new_desc_msg.name)

        return archive

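    # A hedged usage sketch (not in the original source): create_archive() is
    # normally driven by export_package() below, but it can be called directly
    # with any open 'wb' handle, e.g.:
    #
    #     exporter = DescriptorPackageArchiveExporter(log)
    #     with open("/tmp/pkg.tar.gz", "wb") as hdl:
    #         exporter.create_archive(hdl, package, json_desc_str,
    #                                 rift.package.convert.RwVnfdSerializer())
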
    def export_package(self, package, export_dir, file_id, json_desc_str, dest_serializer):
        """ Export package as an archive to the export directory

        Arguments:
            package - A DescriptorPackage instance
            export_dir - The directory to export the package archive to
            file_id - A unique file id to name the archive as (i.e. <file_id>.tar.gz)
            json_desc_str - A descriptor (e.g. nsd, vnfd) JSON message string
            dest_serializer - A destination serializer (e.g. VnfdSerializer)

        Returns:
            The created archive path

        Raises:
            ArchiveExportError - Failed to create the exported archive
        """
        os.makedirs(export_dir, exist_ok=True)

        archive_path = os.path.join(export_dir, file_id + ".tar.gz")
        with open(archive_path, 'wb') as archive_hdl:
            try:
                self.create_archive(
                    archive_hdl, package, json_desc_str, dest_serializer
                )
            except Exception as e:
                os.remove(archive_path)
                msg = "Failed to create exported archive"
                self._log.error(msg)
                raise ArchiveExportError(msg) from e

        return archive_path

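
# A hedged usage sketch (illustration only, not part of the original module):
# how a caller might drive DescriptorPackageArchiveExporter end to end. The
# log, package_store, desc_msg, and export_dir arguments are assumptions
# standing in for objects the surrounding tasklet already holds.
def _example_export_package(log, package_store, desc_msg, export_dir):
    serializer = rift.package.convert.RwVnfdSerializer()
    package = package_store.get_package(desc_msg.id)
    exporter = DescriptorPackageArchiveExporter(log)
    # Returns the path of the created <file_id>.tar.gz archive
    return exporter.export_package(
        package=package,
        export_dir=export_dir,
        file_id="{}_{}".format(desc_msg.name, desc_msg.version),
        json_desc_str=serializer.to_json_string(desc_msg),
        dest_serializer=serializer,
    )
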
class ExportRpcHandler(mano_dts.AbstractRpcHandler):
    def __init__(self, log, dts, loop, application, store_map, exporter, onboarder, catalog_map):
        """
        Args:
            application: UploaderApplication
            store_map: dict containing the VnfdStore & NsdStore
            exporter: DescriptorPackageArchiveExporter
            catalog_map: dict containing the onboarded Vnfd and Nsd catalogs
        """
        super().__init__(log, dts, loop)

        self.application = application
        self.store_map = store_map
        self.exporter = exporter
        self.onboarder = onboarder
        self.catalog_map = catalog_map
        self.log = log

    @property
    def xpath(self):
        return "/rw-pkg-mgmt:package-export"

    @asyncio.coroutine
    def callback(self, ks_path, msg):
        transaction_id = str(uuid.uuid4())
        log = message.Logger(
            self.log,
            self.application.messages[transaction_id],
        )

        file_name = self.export(transaction_id, log, msg)

        rpc_out = RPC_PACKAGE_EXPORT_ENDPOINT.from_dict({
            'transaction_id': transaction_id,
            'filename': file_name})

        return rpc_out

    def export(self, transaction_id, log, msg):
        log.message(ExportStart())
        desc_type = msg.package_type.lower()

        if desc_type not in self.catalog_map:
            raise ValueError("Invalid package type: {}".format(desc_type))

        # Parse the IDs
        desc_id = msg.package_id
        catalog = self.catalog_map[desc_type]

        if desc_id not in catalog:
            raise ValueError("Unable to find package ID: {}".format(desc_id))

        desc_msg = catalog[desc_id]

        # Get the schema for exporting
        schema = msg.export_schema.lower()

        # Get the grammar for exporting
        grammar = msg.export_grammar.lower()

        # Get the format for exporting
        format_ = msg.export_format.lower()

        # Initial value of the exported filename
        self.filename = "{name}_{ver}".format(
            name=desc_msg.name,
            ver=desc_msg.version)

        if grammar == 'tosca':
            self.export_tosca(schema, format_, desc_type, desc_id, desc_msg, log, transaction_id)
            filename = "{}.zip".format(self.filename)
        else:
            self.export_rift(schema, format_, desc_type, desc_id, desc_msg, log, transaction_id)
            filename = "{}.tar.gz".format(self.filename)

        log.message(message.FilenameMessage(filename))
        log.message(ExportSuccess())

        return filename

    def export_rift(self, schema, format_, desc_type, desc_id, desc_msg, log, transaction_id):
        convert = rift.package.convert
        schema_serializer_map = {
            "rift": {
                "vnfd": convert.RwVnfdSerializer,
                "nsd": convert.RwNsdSerializer,
            },
            "mano": {
                "vnfd": convert.RwVnfdSerializer,
                "nsd": convert.RwNsdSerializer,
            }
        }

        if schema not in schema_serializer_map:
            raise tornado.web.HTTPError(400, "unknown schema: {}".format(schema))

        if format_ != "yaml":
            log.warn("Only yaml format supported for export")

        if desc_type not in schema_serializer_map[schema]:
            raise tornado.web.HTTPError(400, "unknown descriptor type: {}".format(desc_type))

        # Use the rift superset schema as the source
        src_serializer = schema_serializer_map["rift"][desc_type]()

        dest_serializer = schema_serializer_map[schema][desc_type]()

        package_store = self.store_map[desc_type]

        # Attempt to get the package from the package store.
        # If that fails, create a temporary package using the descriptor only.
        try:
            package = package_store.get_package(desc_id)
        except rift.package.store.PackageNotFoundError:
            log.debug("stored package not found. creating package from descriptor config")

            desc_yaml_str = src_serializer.to_yaml_string(desc_msg)
            with io.BytesIO(desc_yaml_str.encode()) as hdl:
                hdl.name = "{}__{}.yaml".format(desc_msg.id, desc_type)
                package = rift.package.package.DescriptorPackage.from_descriptor_file_hdl(
                    log, hdl
                )

        # Try to get the updated descriptor from the api endpoint so that we have
        # the updated descriptor file in the exported archive and the name of the
        # archive tar matches the name in the yaml descriptor file. Proceed with
        # the current file if there's an error.
        json_desc_msg = src_serializer.to_json_string(desc_msg)
        desc_name, desc_version = desc_msg.name, desc_msg.version
        try:
            d = collections.defaultdict(dict)
            sub_dict = self.onboarder.get_updated_descriptor(desc_msg)
            root_key, sub_key = "{0}:{0}-catalog".format(desc_type), "{0}:{0}".format(desc_type)
            # root the dict under "vnfd:vnfd-catalog"
            d[root_key] = sub_dict

            json_desc_msg = json.dumps(d)
            desc_name, desc_version = sub_dict[sub_key]['name'], sub_dict[sub_key]['version']

        except Exception as e:
            msg = "Exception {} raised - {}".format(e.__class__.__name__, str(e))
            self.log.debug(msg)

        # exported filename based on the updated descriptor name
        self.filename = "{}_{}".format(desc_name, desc_version)

        self.exporter.export_package(
            package=package,
            export_dir=self.application.export_dir,
            file_id=self.filename,
            json_desc_str=json_desc_msg,
            dest_serializer=dest_serializer,
        )

    def export_tosca(self, schema, format_, desc_type, desc_id, desc_msg, log, transaction_id):
        if format_ != "yaml":
            log.warn("Only yaml format supported for TOSCA export")

        def get_pkg_from_store(id_, type_):
            package = None
            # Attempt to get the package from the package store
            try:
                package_store = self.store_map[type_]
                package = package_store.get_package(id_)

            except rift.package.store.PackageNotFoundError:
                log.debug("stored package not found for {}.".format(id_))
            except rift.package.store.PackageStoreError:
                log.debug("stored package error for {}.".format(id_))

            return package

        if desc_type == "nsd":
            pkg = tosca.ExportTosca()

            # Add the NSD and related descriptors for exporting
            nsd_id = pkg.add_nsd(desc_msg, get_pkg_from_store(desc_id, "nsd"))

            catalog = self.catalog_map["vnfd"]
            for const_vnfd in desc_msg.constituent_vnfd:
                vnfd_id = const_vnfd.vnfd_id_ref
                if vnfd_id in catalog:
                    pkg.add_vnfd(nsd_id,
                                 catalog[vnfd_id],
                                 get_pkg_from_store(vnfd_id, "vnfd"))
                else:
                    raise tornado.web.HTTPError(
                        400,
                        "Unknown VNFD descriptor {} for NSD {}".
                        format(vnfd_id, nsd_id))

            # Create the archive.
            pkg.create_archive(transaction_id,
                               dest=self.application.export_dir)

        elif desc_type == "vnfd":
            pkg = tosca.ExportTosca()
            vnfd_id = desc_msg.id
            pkg.add_single_vnfd(vnfd_id,
                                desc_msg,
                                get_pkg_from_store(vnfd_id, "vnfd"))

            # Create the archive.
            pkg.create_archive(transaction_id,
                               dest=self.application.export_dir)

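
# A hedged sketch (assumption, not in the original source) of the RPC input
# consumed at /rw-pkg-mgmt:package-export. The YangInput_* type name follows
# the usual GI naming convention for this module, and the field values are
# purely illustrative; export() lower-cases each field before dispatching.
def _example_package_export_input():
    return RwPkgMgmtYang.YangInput_RwPkgMgmt_PackageExport.from_dict({
        "package_type": "VNFD",          # matched against catalog_map keys
        "package_id": str(uuid.uuid4()),
        "export_schema": "RIFT",         # or "MANO" (see schema_serializer_map)
        "export_grammar": "OSM",         # anything but "tosca" takes the rift path
        "export_format": "YAML",         # only yaml is supported today
    })
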

class ExportStateHandler(state.StateHandler):
    STARTED = ExportStart
    SUCCESS = ExportSuccess
    FAILURE = ExportFailure


@asyncio.coroutine
def periodic_export_cleanup(log, loop, export_dir, period_secs=10 * 60, min_age_secs=30 * 60):
    """ Periodically clean up old exported archives (.tar.gz files) in export_dir

    Arguments:
        log - A Logger instance
        loop - An asyncio event loop
        export_dir - The directory to clean up old archives in
        period_secs - The number of seconds between clean-ups
        min_age_secs - The minimum age of an archive to be eligible for cleanup

    """
    log.debug("Starting periodic export cleaning for export directory: %s", export_dir)

    # Create the export dir if it does not exist yet
    if not os.path.exists(export_dir):
        os.makedirs(export_dir)

    while True:
        yield from asyncio.sleep(period_secs, loop=loop)

        if not os.path.exists(export_dir):
            continue

        for file_name in os.listdir(export_dir):
            if not file_name.endswith(".tar.gz"):
                continue

            file_path = os.path.join(export_dir, file_name)

            try:
                file_stat = os.stat(file_path)
            except OSError as e:
                log.warning("Could not stat old exported archive: %s", str(e))
                continue

            file_age = time.time() - file_stat[stat.ST_MTIME]

            if file_age < min_age_secs:
                continue

            log.debug("Cleaning up old exported archive: %s", file_path)

            try:
                os.remove(file_path)
            except OSError as e:
                log.warning("Failed to remove old exported archive: %s", str(e))
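

# A hedged usage sketch (illustration only): scheduling the cleanup coroutine
# on the tasklet's event loop. The log, loop, and export_dir arguments are
# assumptions standing in for the application's actual wiring.
def _example_schedule_cleanup(log, loop, export_dir):
    # ensure_future wraps the coroutine in a Task that runs until cancelled
    return asyncio.ensure_future(
        periodic_export_cleanup(log, loop, export_dir),
        loop=loop,
    )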