Fix project delete failures
osm/SO.git: rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/export.py
#
# Copyright 2016-2017 RIFT.IO Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import asyncio
import io
import os
import os.path
import stat
import time
import uuid

import tornado.web

import rift.package.archive
import rift.package.checksums
import rift.package.convert
import rift.package.package
import rift.package.store
import rift.package.image

from . import state
from . import message
from . import tosca

import gi
gi.require_version('RwPkgMgmtYang', '1.0')

from gi.repository import (
    RwPkgMgmtYang)
import rift.mano.dts as mano_dts


RPC_PACKAGE_EXPORT_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageExport


class ExportStart(message.StatusMessage):
    def __init__(self):
        super().__init__("export-started", "export process started")


class ExportSuccess(message.StatusMessage):
    def __init__(self):
        super().__init__("export-success", "export process successfully completed")


class ExportFailure(message.StatusMessage):
    def __init__(self):
        super().__init__("export-failure", "export process failed")


class ExportError(message.ErrorMessage):
    def __init__(self, msg):
        super().__init__("export-error", msg)


class ExportSingleDescriptorOnlyError(ExportError):
    def __init__(self):
        super().__init__("Only a single descriptor can be exported")


class ArchiveExportError(Exception):
    pass


class DescriptorPackageArchiveExporter(object):
    def __init__(self, log):
        self._log = log

    def _create_archive_from_package(self, archive_hdl, package, open_fn):
        """ Create a TarPackageArchive from package, temporarily substituting
        open_fn for the package's open function so that rewritten files are
        picked up while the archive is built.
        """
        orig_open = package.open
        try:
            package.open = open_fn
            archive = rift.package.archive.TarPackageArchive.from_package(
                self._log, package, archive_hdl
            )
            return archive
        finally:
            package.open = orig_open

    def create_archive(self, archive_hdl, package, desc_json_str, serializer):
        """ Create a package archive from an existing package, a descriptor
        message, and a destination serializer.

        In order to stay flexible with the package directory structure and
        descriptor format, attempt to "augment" the onboarded package with the
        updated descriptor in the original format.  If the original package
        contained a checksum file, then recalculate the descriptor checksum.

        Arguments:
            archive_hdl - An open file handle with 'wb' permissions
            package - A DescriptorPackage instance
            desc_json_str - A descriptor (e.g. nsd, vnfd) JSON message string
            serializer - A destination serializer (e.g. VnfdSerializer)

        Returns:
            A TarPackageArchive

        Raises:
            ArchiveExportError - Raised if the archive could not be created
        """
        new_desc_msg = serializer.from_file_hdl(io.BytesIO(desc_json_str.encode()), ".json")
        _, dest_ext = os.path.splitext(package.descriptor_file)
        new_desc_hdl = io.BytesIO(serializer.to_string(new_desc_msg, dest_ext).encode())
        descriptor_checksum = rift.package.checksums.checksum(new_desc_hdl)

        checksum_file = None
        try:
            checksum_file = rift.package.package.PackageChecksumValidator.get_package_checksum_file(
                package
            )
        except FileNotFoundError:
            pass

        # Since we're going to intercept the open function to rewrite the descriptor
        # and checksum, save a handle to use below
        open_fn = package.open

        def create_checksum_file_hdl():
            # Re-read the original checksum file, then overwrite the entry for
            # the descriptor with the recalculated checksum
            with open_fn(checksum_file) as checksum_hdl:
                archive_checksums = rift.package.checksums.ArchiveChecksums.from_file_desc(
                    checksum_hdl
                )

            archive_checksums[package.descriptor_file] = descriptor_checksum

            return io.BytesIO(archive_checksums.to_string().encode())

        def open_wrapper(rel_path):
            """ Wraps the package open in order to rewrite the descriptor file and checksum """
            if rel_path == package.descriptor_file:
                return new_desc_hdl

            elif rel_path == checksum_file:
                return create_checksum_file_hdl()

            return open_fn(rel_path)

        archive = self._create_archive_from_package(archive_hdl, package, open_wrapper)

        return archive
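
    # Illustrative summary of the open_wrapper dispatch above (comments only;
    # the actual file names depend on the onboarded package):
    #
    #   rel_path == package.descriptor_file -> rewritten descriptor handle
    #   rel_path == checksum_file           -> regenerated checksum handle
    #   any other path                      -> original package.open(rel_path)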

    def export_package(self, package, export_dir, file_id, json_desc_str, dest_serializer):
        """ Export package as an archive to the export directory

        Arguments:
            package - A DescriptorPackage instance
            export_dir - The directory to export the package archive to
            file_id - A unique file id to name the archive with (i.e. <file_id>.tar.gz)
            json_desc_str - A descriptor (e.g. nsd, vnfd) JSON message string
            dest_serializer - A destination serializer (e.g. VnfdSerializer)

        Returns:
            The created archive path

        Raises:
            ArchiveExportError - Raised if the archive could not be created
        """
        os.makedirs(export_dir, exist_ok=True)

        archive_path = os.path.join(export_dir, file_id + ".tar.gz")
        with open(archive_path, 'wb') as archive_hdl:
            try:
                self.create_archive(
                    archive_hdl, package, json_desc_str, dest_serializer
                )
            except Exception as e:
                os.remove(archive_path)
                msg = "Failed to create exported archive"
                self._log.error(msg)
                raise ArchiveExportError(msg) from e

        return archive_path
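
# A minimal usage sketch for DescriptorPackageArchiveExporter (hypothetical
# values; the real caller is ExportRpcHandler.export_rift below, which
# supplies an onboarded DescriptorPackage and serializer):
#
#   exporter = DescriptorPackageArchiveExporter(log)
#   path = exporter.export_package(
#       package=package,                 # DescriptorPackage instance
#       export_dir="/tmp/exports",       # assumed scratch directory
#       file_id="1234",                  # archive is written as 1234.tar.gz
#       json_desc_str=json_desc_str,     # descriptor serialized as JSON
#       dest_serializer=rift.package.convert.VnfdSerializer(),
#   )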


class ExportRpcHandler(mano_dts.AbstractRpcHandler):
    def __init__(self, application, catalog_map):
        """
        Args:
            application: UploaderApplication
            catalog_map: Dict mapping descriptor types to their onboarded catalogs
        """
        super().__init__(application.log, application.dts, application.loop)

        self.application = application
        self.store_map = application.package_store_map
        self.exporter = application.exporter
        self.catalog_map = catalog_map

    @property
    def xpath(self):
        return "/rw-pkg-mgmt:package-export"

    @asyncio.coroutine
    def callback(self, ks_path, msg):
        transaction_id = str(uuid.uuid4())
        log = message.Logger(
            self.log,
            self.application.messages[transaction_id],
        )

        file_name = self.export(transaction_id, log, msg)

        rpc_out = RPC_PACKAGE_EXPORT_ENDPOINT.from_dict({
            'transaction_id': transaction_id,
            'filename': file_name})

        return rpc_out
    def export(self, transaction_id, log, msg):
        log.message(ExportStart())
        desc_type = msg.package_type.lower()

        if desc_type not in self.catalog_map:
            raise ValueError("Invalid package type: {}".format(desc_type))

        # Look up the descriptor in the project's catalog
        desc_id = msg.package_id
        catalog = self.catalog_map[desc_type](project=msg.project_name)

        if desc_id not in catalog:
            raise ValueError("Unable to find package ID: {}".format(desc_id))

        desc_msg = catalog[desc_id]

        # Get the schema, grammar and format for exporting
        schema = msg.export_schema.lower()
        grammar = msg.export_grammar.lower()
        format_ = msg.export_format.lower()

        if grammar == 'tosca':
            filename = "{}.zip".format(transaction_id)
            self.export_tosca(schema, format_, desc_type, desc_id, desc_msg, log, transaction_id)
        else:
            filename = "{}.tar.gz".format(transaction_id)
            self.export_rift(schema, format_, desc_type, desc_id, desc_msg, log, transaction_id)

        log.message(message.FilenameMessage(filename))
        log.message(ExportSuccess())

        return filename
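
    # Illustrative shape of the package-export RPC input handled above, based
    # on the fields read from msg in export() (values are examples only):
    #
    #   package_type:   "VNFD" or "NSD"
    #   package_id:     UUID of the descriptor to look up in the catalog
    #   export_schema:  "rift" or "mano"
    #   export_grammar: "tosca" selects the TOSCA path; anything else, RIFT
    #   export_format:  only "yaml" is supported
    #   project_name:   project whose catalog is consulted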

    def export_rift(self, schema, format_, desc_type, desc_id, desc_msg, log, transaction_id):
        convert = rift.package.convert
        schema_serializer_map = {
            "rift": {
                "vnfd": convert.RwVnfdSerializer,
                "nsd": convert.RwNsdSerializer,
            },
            "mano": {
                "vnfd": convert.VnfdSerializer,
                "nsd": convert.NsdSerializer,
            }
        }

        if schema not in schema_serializer_map:
            raise tornado.web.HTTPError(400, "unknown schema: {}".format(schema))

        if format_ != "yaml":
            log.warn("Only yaml format supported for export")

        if desc_type not in schema_serializer_map[schema]:
            raise tornado.web.HTTPError(400, "unknown descriptor type: {}".format(desc_type))

        # Use the rift superset schema as the source
        src_serializer = schema_serializer_map["rift"][desc_type]()

        dest_serializer = schema_serializer_map[schema][desc_type]()

        package_store = self.store_map[desc_type]

        # Attempt to get the package from the package store.
        # If that fails, create a temporary package using the descriptor only.
        try:
            package = package_store.get_package(desc_id)
        except rift.package.store.PackageNotFoundError:
            log.debug("stored package not found. creating package from descriptor config")

            desc_yaml_str = src_serializer.to_yaml_string(desc_msg)
            with io.BytesIO(desc_yaml_str.encode()) as hdl:
                hdl.name = "{}__{}.yaml".format(desc_msg.id, desc_type)
                package = rift.package.package.DescriptorPackage.from_descriptor_file_hdl(
                    log, hdl
                )

        self.exporter.export_package(
            package=package,
            export_dir=self.application.export_dir,
            file_id=transaction_id,
            json_desc_str=src_serializer.to_json_string(desc_msg),
            dest_serializer=dest_serializer,
        )

    def export_tosca(self, schema, format_, desc_type, desc_id, desc_msg, log, transaction_id):
        if format_ != "yaml":
            log.warn("Only yaml format supported for TOSCA export")

        if desc_type != "nsd":
            raise tornado.web.HTTPError(
                400,
                "Only an NSD can be passed to generate TOSCA, got: {}".format(desc_type))

        def get_pkg_from_store(id_, type_):
            package = None
            # Attempt to get the package from the package store
            try:
                package_store = self.store_map[type_]
                package = package_store.get_package(id_)

            except rift.package.store.PackageNotFoundError:
                log.debug("stored package not found for {}.".format(id_))
            except rift.package.store.PackageStoreError:
                log.debug("stored package error for {}.".format(id_))

            return package

        pkg = tosca.ExportTosca()

        # Add the NSD and its constituent VNFDs for exporting
        nsd_id = pkg.add_nsd(desc_msg, get_pkg_from_store(desc_id, "nsd"))

        catalog = self.catalog_map["vnfd"]
        for const_vnfd in desc_msg.constituent_vnfd:
            vnfd_id = const_vnfd.vnfd_id_ref
            if vnfd_id in catalog:
                pkg.add_vnfd(nsd_id,
                             catalog[vnfd_id],
                             get_pkg_from_store(vnfd_id, "vnfd"))
            else:
                raise tornado.web.HTTPError(
                    400,
                    "Unknown VNFD descriptor {} for NSD {}".format(vnfd_id, nsd_id))

        # Create the archive.
        pkg.create_archive(transaction_id,
                           dest=self.application.export_dir)


class ExportStateHandler(state.StateHandler):
    STARTED = ExportStart
    SUCCESS = ExportSuccess
    FAILURE = ExportFailure


@asyncio.coroutine
def periodic_export_cleanup(log, loop, export_dir, period_secs=10 * 60, min_age_secs=30 * 60):
    """ Periodically clean up old exported archives (.tar.gz files) in export_dir

    Arguments:
        log - A Logger instance
        loop - An asyncio event loop
        export_dir - The directory to clean up old archives in
        period_secs - The number of seconds between clean ups
        min_age_secs - The minimum age of an archive to be eligible for cleanup

    """
    log.debug("Starting periodic export cleaning for export directory: %s", export_dir)

    # Create the export dir if it does not exist yet
    if not os.path.exists(export_dir):
        os.makedirs(export_dir)

    while True:
        yield from asyncio.sleep(period_secs, loop=loop)

        if not os.path.exists(export_dir):
            continue

        for file_name in os.listdir(export_dir):
            if not file_name.endswith(".tar.gz"):
                continue

            file_path = os.path.join(export_dir, file_name)

            try:
                file_stat = os.stat(file_path)
            except OSError as e:
                log.warning("Could not stat old exported archive: %s", str(e))
                continue

            file_age = time.time() - file_stat[stat.ST_MTIME]

            if file_age < min_age_secs:
                continue

            log.debug("Cleaning up old exported archive: %s", file_path)

            try:
                os.remove(file_path)
            except OSError as e:
                log.warning("Failed to remove old exported archive: %s", str(e))
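

# A minimal scheduling sketch for the cleanup coroutine (hypothetical wiring;
# in practice the launchpad tasklet owns the event loop and export directory):
#
#   loop = asyncio.get_event_loop()
#   loop.create_task(
#       periodic_export_cleanup(log, loop, export_dir="/tmp/exports"))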