Merge from OSM SO master
[osm/SO.git] / rwlaunchpad / plugins / rwlaunchpadtasklet / rift / tasklets / rwlaunchpad / export.py
1
2 #
3 # Copyright 2016-2017 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import io
20 import os.path
21 import stat
22 import time
23 import uuid
24
25 import tornado.web
26
27 import rift.package.archive
28 import rift.package.checksums
29 import rift.package.package
30 import rift.package.store
31 import rift.package.image
32
33 from . import state
34 from . import message
35 from . import tosca
36
37 import gi
38 gi.require_version('RwPkgMgmtYang', '1.0')
39
40 from gi.repository import (
41 RwPkgMgmtYang)
42 import rift.mano.dts as mano_dts
43
44
# Output message class of the package-export RPC. ExportRpcHandler.callback
# builds its reply through this class's from_dict() constructor.
RPC_PACKAGE_EXPORT_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageExport
46
47
class ExportStart(message.StatusMessage):
    """ Status message reporting that the export process has started """

    def __init__(self):
        name, text = "export-started", "export process started"
        super().__init__(name, text)
51
52
class ExportSuccess(message.StatusMessage):
    """ Status message reporting that the export process completed successfully """

    def __init__(self):
        name, text = "export-success", "export process successfully completed"
        super().__init__(name, text)
56
57
class ExportFailure(message.StatusMessage):
    """ Status message reporting that the export process failed """

    def __init__(self):
        name, text = "export-failure", "export process failed"
        super().__init__(name, text)
61
62
class ExportError(message.ErrorMessage):
    """ Error message carrying a caller-supplied description of an export problem

    Arguments:
        msg - The error text forwarded to message.ErrorMessage
    """

    def __init__(self, msg):
        # NOTE(review): the tag is "update-error" although this is the export
        # module; it looks copied from the update handler. Left unchanged
        # because consumers of the tag are not visible here - confirm before
        # renaming it to "export-error".
        name = "update-error"
        super().__init__(name, msg)
66
67
class ExportSingleDescriptorOnlyError(ExportError):
    """ ExportError with a fixed text for requests naming more than one descriptor """

    def __init__(self):
        text = "Only a single descriptor can be exported"
        super().__init__(text)
71
72
class ArchiveExportError(Exception):
    """ Raised when an exported package archive could not be created """
75
76
class DescriptorPackageArchiveExporter(object):
    """ Exports a descriptor package as a tar archive with a rewritten descriptor

    The archive is built from an existing package, but the descriptor file
    (and the checksum file entry for it, when the package has a checksum
    file) is replaced on the fly with a freshly serialized descriptor.
    """

    def __init__(self, log):
        """
        Arguments:
            log - Logger used for error reporting; also handed to the
                  archive builder
        """
        self._log = log

    def _create_archive_from_package(self, archive_hdl, package, open_fn):
        """ Build a TarPackageArchive while package.open is temporarily replaced

        Arguments:
            archive_hdl - An open file handle the archive is written to
            package - The package to archive
            open_fn - Replacement for package.open used while archiving

        Returns:
            The archive returned by TarPackageArchive.from_package
        """
        orig_open = package.open
        try:
            package.open = open_fn
            archive = rift.package.archive.TarPackageArchive.from_package(
                self._log, package, archive_hdl
            )
            return archive
        finally:
            # Always restore the real open(), even if archiving failed
            package.open = orig_open

    def create_archive(self, archive_hdl, package, desc_json_str, serializer):
        """ Create a package archive from an existing package, a descriptor
        json string, and a destination serializer.

        In order to stay flexible with the package directory structure and
        descriptor format, attempt to "augment" the onboarded package with the
        updated descriptor in the original format. If the original package
        contained a checksum file, then recalculate the descriptor checksum.

        Arguments:
            archive_hdl - An open file handle with 'wb' permissions
            package - A DescriptorPackage instance
            desc_json_str - A descriptor (e.g. nsd, vnfd) json message string
            serializer - A destination serializer (e.g. VnfdSerializer)

        Returns:
            A TarPackageArchive

        Raises:
            Whatever the serializer, the checksum helpers or the archive
            builder raise; nothing is wrapped here (export_package wraps
            failures in ArchiveExportError).
        """
        new_desc_msg = serializer.from_file_hdl(io.BytesIO(desc_json_str.encode()), ".json")
        _, dest_ext = os.path.splitext(package.descriptor_file)

        # Keep the serialized descriptor as bytes and wrap it in a fresh
        # BytesIO for every consumer. Sharing one handle between the checksum
        # calculation and the archive builder would depend on the read
        # position left behind and would break if the handle was closed or
        # opened more than once.
        new_desc_bytes = serializer.to_string(new_desc_msg, dest_ext).encode()
        descriptor_checksum = rift.package.checksums.checksum(io.BytesIO(new_desc_bytes))

        checksum_file = None
        try:
            checksum_file = rift.package.package.PackageChecksumValidator.get_package_checksum_file(
                package
            )

        except FileNotFoundError:
            # Package has no checksum file: nothing to rewrite
            pass

        # Since we're going to intercept the open function to rewrite the descriptor
        # and checksum, save a handle to use below
        open_fn = package.open

        def create_checksum_file_hdl():
            """ Return the checksum file contents with the new descriptor checksum """
            with open_fn(checksum_file) as checksum_hdl:
                archive_checksums = rift.package.checksums.ArchiveChecksums.from_file_desc(
                    checksum_hdl
                )

            archive_checksums[package.descriptor_file] = descriptor_checksum

            return io.BytesIO(archive_checksums.to_string().encode())

        def open_wrapper(rel_path):
            """ Wraps the package open in order to rewrite the descriptor file and checksum """
            if rel_path == package.descriptor_file:
                return io.BytesIO(new_desc_bytes)

            elif rel_path == checksum_file:
                return create_checksum_file_hdl()

            return open_fn(rel_path)

        return self._create_archive_from_package(archive_hdl, package, open_wrapper)

    def export_package(self, package, export_dir, file_id, json_desc_str, dest_serializer):
        """ Export package as an archive to the export directory

        Arguments:
            package - A DescriptorPackage instance
            export_dir - The directory to export the package archive to
            file_id - A unique file id to name the archive as (i.e. <file_id>.tar.gz)
            json_desc_str - A descriptor (e.g. nsd, vnfd) json message string
            dest_serializer - A destination serializer (e.g. VnfdSerializer)

        Returns:
            The created archive path

        Raises:
            ArchiveExportError - The exported archive failed to create
        """
        try:
            os.makedirs(export_dir, exist_ok=True)
        except FileExistsError:
            # Only raised when export_dir exists but is not a directory; the
            # open() below then reports the problem.
            pass

        archive_path = os.path.join(export_dir, file_id + ".tar.gz")

        export_error = None
        with open(archive_path, 'wb') as archive_hdl:
            try:
                self.create_archive(
                    archive_hdl, package, json_desc_str, dest_serializer
                )
            except Exception as e:
                export_error = e

        if export_error is not None:
            # The handle is closed by now, so the partial archive can be
            # removed safely. A failed removal must not hide the real error.
            try:
                os.remove(archive_path)
            except OSError as remove_error:
                self._log.error("Failed to remove partial archive %s: %s",
                                archive_path, str(remove_error))

            msg = "Failed to create exported archive"
            self._log.error(msg)
            raise ArchiveExportError(msg) from export_error

        return archive_path
191
192
class ExportRpcHandler(mano_dts.AbstractRpcHandler):
    """ RPC handler for /rw-pkg-mgmt:package-export

    Looks up the requested descriptor in the catalog and writes an export
    archive (TOSCA .zip or RIFT .tar.gz) into the application's export
    directory.
    """

    def __init__(self, application, catalog_map):
        """
        Args:
            application: UploaderApplication
            catalog_map: Dict keyed by descriptor type ("vnfd", "nsd"). Each
                entry is called with project=<name> and returns the catalog
                for that project (see export()).
        """
        super().__init__(application.log, application.dts, application.loop)

        self.application = application
        self.store_map = application.package_store_map
        self.exporter = application.exporter
        self.catalog_map = catalog_map

    @property
    def xpath(self):
        """ The xpath of the RPC served by this handler """
        return "/rw-pkg-mgmt:package-export"

    @asyncio.coroutine
    def callback(self, ks_path, msg):
        """ Run the export for an RPC request and build the RPC output

        Arguments:
            ks_path - Not used by this handler
            msg - The RPC input message (see export() for the fields read)

        Returns:
            An RPC_PACKAGE_EXPORT_ENDPOINT message holding the generated
            transaction id and the exported file name
        """
        transaction_id = str(uuid.uuid4())
        log = message.Logger(
            self.log,
            self.application.messages[transaction_id],
        )

        file_name = self.export(transaction_id, log, msg)

        rpc_out = RPC_PACKAGE_EXPORT_ENDPOINT.from_dict({
            'transaction_id': transaction_id,
            'filename': file_name})

        return rpc_out

    def export(self, transaction_id, log, msg):
        """ Export the descriptor named by msg and return the archive file name

        Arguments:
            transaction_id - Id used to name the exported archive
            log - A message.Logger collecting the status messages
            msg - Request carrying package_type, package_id, project_name,
                  export_schema, export_grammar and export_format

        Returns:
            "<transaction_id>.zip" for the tosca grammar, otherwise
            "<transaction_id>.tar.gz"

        Raises:
            ValueError - Unknown package type or package id
        """
        log.message(ExportStart())
        desc_type = msg.package_type.lower()

        if desc_type not in self.catalog_map:
            raise ValueError("Invalid package type: {}".format(desc_type))

        # Parse the IDs
        desc_id = msg.package_id
        project = msg.project_name
        catalog = self.catalog_map[desc_type](project=project)

        if desc_id not in catalog:
            raise ValueError("Unable to find package ID: {}".format(desc_id))

        desc_msg = catalog[desc_id]

        # Get the schema, grammar and format for exporting
        schema = msg.export_schema.lower()
        grammar = msg.export_grammar.lower()
        format_ = msg.export_format.lower()

        if grammar == 'tosca':
            filename = "{}.zip".format(transaction_id)
            # export_tosca takes (format_, schema, ...) while export_rift
            # takes (schema, format_, ...); pass by keyword so the two values
            # cannot be swapped.
            self.export_tosca(
                format_=format_,
                schema=schema,
                desc_type=desc_type,
                desc_id=desc_id,
                desc_msg=desc_msg,
                log=log,
                transaction_id=transaction_id,
                project=project,
            )
        else:
            filename = "{}.tar.gz".format(transaction_id)
            self.export_rift(schema, format_, desc_type, desc_id, desc_msg, log, transaction_id)

        log.message(message.FilenameMessage(filename))
        log.message(ExportSuccess())

        return filename

    def export_rift(self, schema, format_, desc_type, desc_id, desc_msg, log, transaction_id):
        """ Export a descriptor as a RIFT .tar.gz package archive

        Arguments:
            schema - Destination schema, "rift" or "mano"
            format_ - Requested format; anything but "yaml" only logs a warning
            desc_type - Descriptor type, "vnfd" or "nsd"
            desc_id - Id of the descriptor, used to find the stored package
            desc_msg - The descriptor message from the catalog
            log - Logger for status and debug messages
            transaction_id - Used as the file id of the exported archive

        Raises:
            tornado.web.HTTPError - Unknown schema or descriptor type
        """
        # Import explicitly: the module header imports other rift.package
        # submodules but not convert, so attribute access alone would rely on
        # some other module having imported it.
        from rift.package import convert

        schema_serializer_map = {
            "rift": {
                "vnfd": convert.RwVnfdSerializer,
                "nsd": convert.RwNsdSerializer,
            },
            "mano": {
                "vnfd": convert.VnfdSerializer,
                "nsd": convert.NsdSerializer,
            }
        }

        if schema not in schema_serializer_map:
            raise tornado.web.HTTPError(400, "unknown schema: {}".format(schema))

        if format_ != "yaml":
            log.warn("Only yaml format supported for export")

        if desc_type not in schema_serializer_map[schema]:
            raise tornado.web.HTTPError(400, "unknown descriptor type: {}".format(desc_type))

        # Use the rift superset schema as the source
        src_serializer = schema_serializer_map["rift"][desc_type]()

        dest_serializer = schema_serializer_map[schema][desc_type]()

        package_store = self.store_map[desc_type]

        # Attempt to get the package from the package store
        # If that fails, create a temporary package using the descriptor only
        try:
            package = package_store.get_package(desc_id)
        except rift.package.store.PackageNotFoundError:
            log.debug("stored package not found. creating package from descriptor config")

            desc_yaml_str = src_serializer.to_yaml_string(desc_msg)
            with io.BytesIO(desc_yaml_str.encode()) as hdl:
                hdl.name = "{}__{}.yaml".format(desc_msg.id, desc_type)
                package = rift.package.package.DescriptorPackage.from_descriptor_file_hdl(
                    log, hdl
                )

        self.exporter.export_package(
            package=package,
            export_dir=self.application.export_dir,
            file_id=transaction_id,
            json_desc_str=src_serializer.to_json_string(desc_msg),
            dest_serializer=dest_serializer,
        )

    def export_tosca(self, format_, schema, desc_type, desc_id, desc_msg, log, transaction_id,
                     project=None):
        """ Export a descriptor as a TOSCA archive in the export directory

        Note the parameter order (format_, schema) differs from export_rift.

        Arguments:
            format_ - Requested format; anything but "yaml" only logs a warning
            schema - Not used by the TOSCA export
            desc_type - "nsd" or "vnfd"; any other value exports nothing
            desc_id - Id of the descriptor, used to find the stored NSD package
            desc_msg - The descriptor message from the catalog
            log - Logger for status and debug messages
            transaction_id - Passed to the TOSCA archive creation
            project - Project whose VNFD catalog resolves the constituent
                      VNFDs of an NSD

        Raises:
            tornado.web.HTTPError - An NSD references a VNFD missing from the
                                    catalog
        """
        if format_ != "yaml":
            log.warn("Only yaml format supported for TOSCA export")

        def get_pkg_from_store(id_, type_):
            """ Return the stored package for id_, or None if unavailable """
            package = None
            # Attempt to get the package from the package store
            try:
                package_store = self.store_map[type_]
                package = package_store.get_package(id_)

            except rift.package.store.PackageNotFoundError:
                log.debug("stored package not found for {}.".format(id_))
            except rift.package.store.PackageStoreError:
                log.debug("stored package error for {}.".format(id_))

            return package

        if desc_type not in ("nsd", "vnfd"):
            return

        pkg = tosca.ExportTosca()

        if desc_type == "nsd":
            # Add NSD and related descriptors for exporting
            nsd_id = pkg.add_nsd(desc_msg, get_pkg_from_store(desc_id, "nsd"))

            # export() obtains a catalog by calling the map entry with the
            # project name; do the same here instead of treating the entry
            # itself as the catalog. Non-callable entries are used as-is.
            catalog = self.catalog_map["vnfd"]
            if callable(catalog):
                catalog = catalog(project=project)

            for const_vnfd in desc_msg.constituent_vnfd:
                vnfd_id = const_vnfd.vnfd_id_ref
                if vnfd_id not in catalog:
                    raise tornado.web.HTTPError(
                        400,
                        "Unknown VNFD descriptor {} for NSD {}".
                        format(vnfd_id, nsd_id))

                pkg.add_vnfd(nsd_id,
                             catalog[vnfd_id],
                             get_pkg_from_store(vnfd_id, "vnfd"))
        else:
            vnfd_id = desc_msg.id
            pkg.add_single_vnfd(vnfd_id,
                                desc_msg,
                                get_pkg_from_store(vnfd_id, "vnfd"))

        # Create the archive.
        pkg.create_archive(transaction_id,
                           dest=self.application.export_dir)
368
369
class ExportStateHandler(state.StateHandler):
    """ State handler bound to the export status message classes """

    # Message classes for the started / success / failure states; how the
    # base class uses them is defined in state.StateHandler.
    STARTED, SUCCESS, FAILURE = ExportStart, ExportSuccess, ExportFailure
374
375
@asyncio.coroutine
def periodic_export_cleanup(log, loop, export_dir, period_secs=10 * 60, min_age_secs=30 * 60,
                            archive_suffixes=(".tar.gz", ".zip")):
    """ Periodically cleanup old exported archives in export_dir

    Runs forever: sleeps period_secs, then removes every file in export_dir
    whose name ends with one of archive_suffixes and whose modification time
    is at least min_age_secs in the past. Failures on individual files are
    logged and skipped so the task keeps running.

    Arguments:
        log - A Logger instance
        loop - A asyncio event loop
        export_dir - The directory to cleanup old archives in
        period_secs - The number of seconds between clean ups
        min_age_secs - The minimum age of a archive to be eligible for cleanup
        archive_suffixes - File name suffix (or iterable of suffixes) marking
                           exported archives. Defaults to both .tar.gz (RIFT
                           export) and .zip (TOSCA export), since both are
                           written to the export directory.

    """
    log.debug("Starting periodic export cleaning for export directory: %s", export_dir)

    if isinstance(archive_suffixes, str):
        archive_suffixes = (archive_suffixes,)
    # str.endswith() only accepts a str or a tuple of str
    suffixes = tuple(archive_suffixes)

    # Create export dir if not created yet (no exists() pre-check, which
    # would race with a concurrent creation)
    os.makedirs(export_dir, exist_ok=True)

    while True:
        yield from asyncio.sleep(period_secs, loop=loop)

        try:
            file_names = os.listdir(export_dir)
        except FileNotFoundError:
            # Directory was removed meanwhile; try again next period
            continue
        except OSError as e:
            # Do not let a transient error end the cleanup task for good
            log.warning("Could not list export directory: %s", str(e))
            continue

        for file_name in file_names:
            if not file_name.endswith(suffixes):
                continue

            file_path = os.path.join(export_dir, file_name)

            try:
                file_stat = os.stat(file_path)
            except OSError as e:
                log.warning("Could not stat old exported archive: %s", str(e))
                continue

            file_age = time.time() - file_stat.st_mtime

            if file_age < min_age_secs:
                continue

            log.debug("Cleaning up old exported archive: %s", file_path)

            try:
                os.remove(file_path)
            except OSError as e:
                log.warning("Failed to remove old exported archive: %s", str(e))