#
# Copyright 2016 RIFT.IO Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import asyncio
import io
import os.path
import stat
import time
import uuid

import tornado.web

import rift.package.archive
import rift.package.checksums
import rift.package.convert
import rift.package.package
import rift.package.store
import rift.package.image

from . import state
from . import message
from . import tosca
import gi
gi.require_version('NsdYang', '1.0')
gi.require_version('VnfdYang', '1.0')
gi.require_version('RwPkgMgmtYang', '1.0')

from gi.repository import (
    NsdYang,
    VnfdYang,
    RwPkgMgmtYang)
import rift.mano.dts as mano_dts


RPC_PACKAGE_EXPORT_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageExport


class ExportStart(message.StatusMessage):
    def __init__(self):
        super().__init__("export-started", "export process started")


class ExportSuccess(message.StatusMessage):
    def __init__(self):
        super().__init__("export-success", "export process successfully completed")


class ExportFailure(message.StatusMessage):
    def __init__(self):
        super().__init__("export-failure", "export process failed")

class ExportError(message.ErrorMessage):
    def __init__(self, msg):
        super().__init__("export-error", msg)


class ExportSingleDescriptorOnlyError(ExportError):
    def __init__(self):
        super().__init__("Only a single descriptor can be exported")


class ArchiveExportError(Exception):
    pass


class DescriptorPackageArchiveExporter(object):
    def __init__(self, log):
        self._log = log

    def _create_archive_from_package(self, archive_hdl, package, open_fn):
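        """ Build a TarPackageArchive from package, reading file contents
        through open_fn rather than the package's own open method.

        package.open is temporarily swapped out for open_fn while the archive
        is constructed, and always restored afterwards.
        """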
        orig_open = package.open
        try:
            package.open = open_fn
            archive = rift.package.archive.TarPackageArchive.from_package(
                self._log, package, archive_hdl
            )
            return archive
        finally:
            package.open = orig_open

    def create_archive(self, archive_hdl, package, desc_json_str, serializer):
        """ Create a package archive from an existing package, a descriptor
        JSON string, and a destination serializer.

        In order to stay flexible with the package directory structure and
        descriptor format, attempt to "augment" the onboarded package with the
        updated descriptor in the original format. If the original package
        contained a checksum file, then recalculate the descriptor checksum.

        Arguments:
            archive_hdl - An open file handle with 'wb' permissions
            package - A DescriptorPackage instance
            desc_json_str - A descriptor (e.g. nsd, vnfd) JSON string
            serializer - A destination serializer (e.g. VnfdSerializer)

        Returns:
            A TarPackageArchive

        Raises:
            ArchiveExportError - Failed to create the exported archive

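        Example (illustrative only; ``package`` and ``json_str`` stand in for
        an onboarded DescriptorPackage and its updated descriptor JSON):

            exporter = DescriptorPackageArchiveExporter(log)
            with open("/tmp/export.tar.gz", "wb") as hdl:
                archive = exporter.create_archive(
                    hdl, package, json_str, rift.package.convert.VnfdSerializer()
                )
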
        """
        new_desc_msg = serializer.from_file_hdl(io.BytesIO(desc_json_str.encode()), ".json")
        _, dest_ext = os.path.splitext(package.descriptor_file)
        new_desc_hdl = io.BytesIO(serializer.to_string(new_desc_msg, dest_ext).encode())
        descriptor_checksum = rift.package.checksums.checksum(new_desc_hdl)

        checksum_file = None
        try:
            checksum_file = rift.package.package.PackageChecksumValidator.get_package_checksum_file(
                package
            )

        except FileNotFoundError:
            pass

        # Since we're going to intercept the open function to rewrite the descriptor
        # and checksum, save a handle to use below
        open_fn = package.open

        def create_checksum_file_hdl():
            with open_fn(checksum_file) as checksum_hdl:
                archive_checksums = rift.package.checksums.ArchiveChecksums.from_file_desc(
                    checksum_hdl
                )

            archive_checksums[package.descriptor_file] = descriptor_checksum

            checksum_hdl = io.BytesIO(archive_checksums.to_string().encode())
            return checksum_hdl

        def open_wrapper(rel_path):
            """ Wraps the package open in order to rewrite the descriptor file and checksum """
            if rel_path == package.descriptor_file:
                return new_desc_hdl

            elif rel_path == checksum_file:
                return create_checksum_file_hdl()

            return open_fn(rel_path)

        archive = self._create_archive_from_package(archive_hdl, package, open_wrapper)

        return archive

    def export_package(self, package, export_dir, file_id, json_desc_str, dest_serializer):
        """ Export a package as an archive to the export directory

        Arguments:
            package - A DescriptorPackage instance
            export_dir - The directory to export the package archive to
            file_id - A unique file id to name the archive as (i.e. <file_id>.tar.gz)
            json_desc_str - A descriptor (e.g. nsd, vnfd) JSON message string
            dest_serializer - A destination serializer (e.g. VnfdSerializer)

        Returns:
            The created archive path

        Raises:
            ArchiveExportError - Failed to create the exported archive
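
        Example (illustrative; ``package`` stands in for an onboarded
        DescriptorPackage):

            exporter = DescriptorPackageArchiveExporter(log)
            path = exporter.export_package(
                package, "/tmp/exports", "my-file-id",
                json_desc_str, rift.package.convert.VnfdSerializer())
            # path == "/tmp/exports/my-file-id.tar.gz"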
        """
        try:
            os.makedirs(export_dir, exist_ok=True)
        except FileExistsError:
            pass

        archive_path = os.path.join(export_dir, file_id + ".tar.gz")
        with open(archive_path, 'wb') as archive_hdl:
            try:
                self.create_archive(
                    archive_hdl, package, json_desc_str, dest_serializer
                )
            except Exception as e:
                os.remove(archive_path)
                msg = "Failed to create exported archive"
                self._log.error(msg)
                raise ArchiveExportError(msg) from e

        return archive_path


class ExportRpcHandler(mano_dts.AbstractRpcHandler):
    def __init__(self, log, dts, loop, application, store_map, exporter, catalog_map):
        """
        Args:
            application: UploaderApplication
            store_map: dict containing the VnfdStore & NsdStore
            exporter: DescriptorPackageArchiveExporter
            catalog_map: dict containing the onboarded Vnfd and Nsd catalogs
        """
        super().__init__(log, dts, loop)

        self.application = application
        self.store_map = store_map
        self.exporter = exporter
        self.catalog_map = catalog_map
        self.log = log

    @property
    def xpath(self):
        return "/rw-pkg-mgmt:package-export"

    @asyncio.coroutine
    def callback(self, ks_path, msg):
        transaction_id = str(uuid.uuid4())
        log = message.Logger(
            self.log,
            self.application.messages[transaction_id],
        )

        file_name = self.export(transaction_id, log, msg)

        rpc_out = RPC_PACKAGE_EXPORT_ENDPOINT.from_dict({
            'transaction_id': transaction_id,
            'filename': file_name})

        return rpc_out

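    # Fields read from the incoming rw-pkg-mgmt:package-export RPC message
    # (summarized here for reference; see export() below):
    #   package_type   - "vnfd" or "nsd"
    #   package_id     - id of the onboarded descriptor to export
    #   export_schema  - "rift" or "mano"
    #   export_grammar - "tosca" selects the TOSCA exporter; any other value
    #                    falls through to the RIFT export path
    #   export_format  - only "yaml" is supported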
    def export(self, transaction_id, log, msg):
        log.message(ExportStart())
        desc_type = msg.package_type.lower()

        if desc_type not in self.catalog_map:
            raise ValueError("Invalid package type: {}".format(desc_type))

        # Parse the IDs
        desc_id = msg.package_id
        catalog = self.catalog_map[desc_type]

        if desc_id not in catalog:
            raise ValueError("Unable to find package ID: {}".format(desc_id))

        desc_msg = catalog[desc_id]

        # Get the schema for exporting
        schema = msg.export_schema.lower()

        # Get the grammar for exporting
        grammar = msg.export_grammar.lower()

        # Get the format for exporting
        format_ = msg.export_format.lower()

        filename = None

        if grammar == 'tosca':
            filename = "{}.zip".format(transaction_id)
            self.export_tosca(schema, format_, desc_type, desc_id, desc_msg, log, transaction_id)
            log.message(message.FilenameMessage(filename))
        else:
            filename = "{}.tar.gz".format(transaction_id)
            self.export_rift(schema, format_, desc_type, desc_id, desc_msg, log, transaction_id)
            log.message(message.FilenameMessage(filename))

        log.message(ExportSuccess())

        return filename

    def export_rift(self, schema, format_, desc_type, desc_id, desc_msg, log, transaction_id):
        convert = rift.package.convert
        schema_serializer_map = {
            "rift": {
                "vnfd": convert.RwVnfdSerializer,
                "nsd": convert.RwNsdSerializer,
            },
            "mano": {
                "vnfd": convert.VnfdSerializer,
                "nsd": convert.NsdSerializer,
            }
        }

        if schema not in schema_serializer_map:
            raise tornado.web.HTTPError(400, "unknown schema: {}".format(schema))

        if format_ != "yaml":
            log.warn("Only yaml format supported for export")

        if desc_type not in schema_serializer_map[schema]:
            raise tornado.web.HTTPError(400, "unknown descriptor type: {}".format(desc_type))

        # Use the rift superset schema as the source
        src_serializer = schema_serializer_map["rift"][desc_type]()

        dest_serializer = schema_serializer_map[schema][desc_type]()

        package_store = self.store_map[desc_type]

        # Attempt to get the package from the package store
        # If that fails, create a temporary package using the descriptor only
        try:
            package = package_store.get_package(desc_id)
        except rift.package.store.PackageNotFoundError:
            log.debug("stored package not found. creating package from descriptor config")

            desc_yaml_str = src_serializer.to_yaml_string(desc_msg)
            with io.BytesIO(desc_yaml_str.encode()) as hdl:
                hdl.name = "{}__{}.yaml".format(desc_msg.id, desc_type)
                package = rift.package.package.DescriptorPackage.from_descriptor_file_hdl(
                    log, hdl
                )

        self.exporter.export_package(
            package=package,
            export_dir=self.application.export_dir,
            file_id=transaction_id,
            json_desc_str=src_serializer.to_json_string(desc_msg),
            dest_serializer=dest_serializer,
        )

    def export_tosca(self, schema, format_, desc_type, desc_id, desc_msg, log, transaction_id):
        if format_ != "yaml":
            log.warn("Only yaml format supported for TOSCA export")

        def get_pkg_from_store(id_, type_):
            package = None
            # Attempt to get the package from the package store
            try:
                package_store = self.store_map[type_]
                package = package_store.get_package(id_)

            except rift.package.store.PackageNotFoundError:
                log.debug("stored package not found for {}.".format(id_))
            except rift.package.store.PackageStoreError:
                log.debug("stored package error for {}.".format(id_))

            return package

        if desc_type == "nsd":
            pkg = tosca.ExportTosca()

            # Add NSD and related descriptors for exporting
            nsd_id = pkg.add_nsd(desc_msg, get_pkg_from_store(desc_id, "nsd"))

            catalog = self.catalog_map["vnfd"]
            for const_vnfd in desc_msg.constituent_vnfd:
                vnfd_id = const_vnfd.vnfd_id_ref
                if vnfd_id in catalog:
                    pkg.add_vnfd(nsd_id,
                                 catalog[vnfd_id],
                                 get_pkg_from_store(vnfd_id, "vnfd"))
                else:
                    raise tornado.web.HTTPError(
                        400,
                        "Unknown VNFD descriptor {} for NSD {}".
                        format(vnfd_id, nsd_id))

            # Create the archive.
            pkg.create_archive(transaction_id,
                               dest=self.application.export_dir)

        elif desc_type == "vnfd":
            pkg = tosca.ExportTosca()
            vnfd_id = desc_msg.id
            pkg.add_single_vnfd(vnfd_id,
                                desc_msg,
                                get_pkg_from_store(vnfd_id, "vnfd"))

            # Create the archive.
            pkg.create_archive(transaction_id,
                               dest=self.application.export_dir)


class ExportStateHandler(state.StateHandler):
    STARTED = ExportStart
    SUCCESS = ExportSuccess
    FAILURE = ExportFailure


@asyncio.coroutine
def periodic_export_cleanup(log, loop, export_dir, period_secs=10 * 60, min_age_secs=30 * 60):
    """ Periodically clean up old exported archives (.tar.gz files) in export_dir

    Arguments:
        log - A Logger instance
        loop - An asyncio event loop
        export_dir - The directory in which to clean up old archives
        period_secs - The number of seconds between clean-ups
        min_age_secs - The minimum age of an archive to be eligible for cleanup

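    Example (illustrative; typically scheduled once at tasklet startup):

        loop.create_task(periodic_export_cleanup(log, loop, export_dir))
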
    """
    log.debug("Starting periodic export cleaning for export directory: %s", export_dir)

    # Create export dir if not created yet
    if not os.path.exists(export_dir):
        os.makedirs(export_dir)

    while True:
        yield from asyncio.sleep(period_secs, loop=loop)

        if not os.path.exists(export_dir):
            continue

        for file_name in os.listdir(export_dir):
            if not file_name.endswith(".tar.gz"):
                continue

            file_path = os.path.join(export_dir, file_name)

            try:
                file_stat = os.stat(file_path)
            except OSError as e:
                log.warning("Could not stat old exported archive: %s", str(e))
                continue

            file_age = time.time() - file_stat[stat.ST_MTIME]

            if file_age < min_age_secs:
                continue

            log.debug("Cleaning up old exported archive: %s", file_path)

            try:
                os.remove(file_path)
            except OSError as e:
                log.warning("Failed to remove old exported archive: %s", str(e))