Merge "Revert "Functional spec for cloud-init support""
rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/export.py
#
# Copyright 2016 RIFT.IO Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import asyncio
import io
import os.path
import stat
import time
import uuid

import tornado.escape
import tornado.web

import rift.package.archive
import rift.package.checksums
import rift.package.convert
import rift.package.image
import rift.package.package
import rift.package.store

from . import state
from . import message
from . import tosca

import gi
gi.require_version('NsdYang', '1.0')
gi.require_version('VnfdYang', '1.0')

from gi.repository import (
    NsdYang,
    VnfdYang,
)


class ExportStart(message.StatusMessage):
    def __init__(self):
        super().__init__("export-started", "export process started")


class ExportSuccess(message.StatusMessage):
    def __init__(self):
        super().__init__("export-success", "export process successfully completed")


class ExportFailure(message.StatusMessage):
    def __init__(self):
        super().__init__("export-failure", "export process failed")

class ExportError(message.ErrorMessage):
    def __init__(self, msg):
        super().__init__("export-error", msg)


class ExportSingleDescriptorOnlyError(ExportError):
    def __init__(self):
        super().__init__("Only a single descriptor can be exported")


class ArchiveExportError(Exception):
    pass


class DescriptorPackageArchiveExporter(object):
    def __init__(self, log):
        self._log = log

    def _create_archive_from_package(self, archive_hdl, package, open_fn):
        orig_open = package.open
        try:
            package.open = open_fn
            archive = rift.package.archive.TarPackageArchive.from_package(
                self._log, package, archive_hdl
            )
            return archive
        finally:
            package.open = orig_open

    def create_archive(self, archive_hdl, package, desc_json_str, serializer):
        """ Create a package archive from an existing package, descriptor messages,
        and a destination serializer.

        In order to stay flexible with the package directory structure and
        descriptor format, attempt to "augment" the onboarded package with the
        updated descriptor in the original format.  If the original package
        contained a checksum file, then recalculate the descriptor checksum.

        Arguments:
            archive_hdl - An open file handle with 'wb' permissions
            package - A DescriptorPackage instance
            desc_json_str - A descriptor (e.g. nsd, vnfd) JSON message string
            serializer - A destination serializer (e.g. VnfdSerializer)

        Returns:
            A TarPackageArchive

        Raises:
            ArchiveExportError - Raised if the archive could not be created

        """
        new_desc_msg = serializer.from_file_hdl(io.BytesIO(desc_json_str.encode()), ".json")
        _, dest_ext = os.path.splitext(package.descriptor_file)
        new_desc_hdl = io.BytesIO(serializer.to_string(new_desc_msg, dest_ext).encode())
        descriptor_checksum = rift.package.checksums.checksum(new_desc_hdl)

        checksum_file = None
        try:
            checksum_file = rift.package.package.PackageChecksumValidator.get_package_checksum_file(
                package
            )
        except FileNotFoundError:
            pass

        # Since we're going to intercept the open function to rewrite the descriptor
        # and checksum, save a handle to use below
        open_fn = package.open

        def create_checksum_file_hdl():
            with open_fn(checksum_file) as checksum_hdl:
                archive_checksums = rift.package.checksums.ArchiveChecksums.from_file_desc(
                    checksum_hdl
                )

            archive_checksums[package.descriptor_file] = descriptor_checksum

            checksum_hdl = io.BytesIO(archive_checksums.to_string().encode())
            return checksum_hdl

        def open_wrapper(rel_path):
            """ Wraps the package open in order to rewrite the descriptor file and checksum """
            if rel_path == package.descriptor_file:
                return new_desc_hdl

            elif rel_path == checksum_file:
                return create_checksum_file_hdl()

            return open_fn(rel_path)

        archive = self._create_archive_from_package(archive_hdl, package, open_wrapper)

        return archive

    def export_package(self, package, export_dir, file_id, json_desc_str, dest_serializer):
        """ Export package as an archive to the export directory

        Arguments:
            package - A DescriptorPackage instance
            export_dir - The directory to export the package archive to
            file_id - A unique file id to name the archive as (i.e. <file_id>.tar.gz)
            json_desc_str - A descriptor (e.g. nsd, vnfd) JSON message string
            dest_serializer - A destination serializer (e.g. VnfdSerializer)

        Returns:
            The created archive path

        Raises:
            ArchiveExportError - Raised if the archive could not be created
        """
        os.makedirs(export_dir, exist_ok=True)

        archive_path = os.path.join(export_dir, file_id + ".tar.gz")
        with open(archive_path, 'wb') as archive_hdl:
            try:
                self.create_archive(
                    archive_hdl, package, json_desc_str, dest_serializer
                )
            except Exception as e:
                os.remove(archive_path)
                msg = "Failed to create exported archive"
                self._log.error(msg)
                raise ArchiveExportError(msg) from e

        return archive_path


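# The following is an illustrative, minimal sketch of driving the exporter
# directly, outside of the HTTP handler below.  It is not called anywhere;
# the logger, package store, and VNFD message arguments are stand-ins for
# whatever the tasklet actually wires in, not part of this module's API.
def _example_export_usage(log, package_store, vnfd_msg, export_dir):
    # Serialize with the rift superset schema, as export_rift() does
    serializer = rift.package.convert.RwVnfdSerializer()
    exporter = DescriptorPackageArchiveExporter(log)
    package = package_store.get_package(vnfd_msg.id)
    # Returns the path of the created <file_id>.tar.gz archive
    return exporter.export_package(
        package=package,
        export_dir=export_dir,
        file_id=str(uuid.uuid4()),
        json_desc_str=serializer.to_json_string(vnfd_msg),
        dest_serializer=serializer,
    )

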
class ExportHandler(tornado.web.RequestHandler):
    def options(self, *args, **kwargs):
        pass

    def set_default_headers(self):
        self.set_header('Access-Control-Allow-Origin', '*')
        self.set_header('Access-Control-Allow-Headers',
                        'Content-Type, Cache-Control, Accept, X-Requested-With, Authorization')
        self.set_header('Access-Control-Allow-Methods', 'POST, GET, PUT, DELETE')

    def initialize(self, log, loop, store_map, exporter, catalog_map):
        self.loop = loop
        self.transaction_id = str(uuid.uuid4())
        self.log = message.Logger(
            log,
            self.application.messages[self.transaction_id],
        )
        self.store_map = store_map
        self.exporter = exporter
        self.catalog_map = catalog_map

    def get(self, desc_type):
        if desc_type not in self.catalog_map:
            raise tornado.web.HTTPError(400, "unknown descriptor type: {}".format(desc_type))

        self.log.message(ExportStart())

        # Parse the IDs
        ids_query = self.get_query_argument("ids")
        ids = [id_.strip() for id_ in ids_query.split(',')]
        if len(ids) != 1:
            raise message.MessageException(ExportSingleDescriptorOnlyError())
        desc_id = ids[0]

        catalog = self.catalog_map[desc_type]

        if desc_id not in catalog:
            raise tornado.web.HTTPError(400, "unknown descriptor id: {}".format(desc_id))

        desc_msg = catalog[desc_id]

        # Get the schema for exporting
        schema = self.get_argument("schema", default="rift")

        # Get the grammar for exporting
        grammar = self.get_argument("grammar", default="osm")

        # Get the format for exporting
        format_ = self.get_argument("format", default="yaml")

        if grammar == 'tosca':
            filename = "{}.zip".format(self.transaction_id)
            self.export_tosca(schema, format_, desc_type, desc_id, desc_msg)
        else:
            filename = "{}.tar.gz".format(self.transaction_id)
            self.export_rift(schema, format_, desc_type, desc_id, desc_msg)

        self.log.message(message.FilenameMessage(filename))
        self.log.message(ExportSuccess())

        self.write(tornado.escape.json_encode({
            "transaction_id": self.transaction_id,
            "filename": filename,
        }))

    def export_rift(self, schema, format_, desc_type, desc_id, desc_msg):
        convert = rift.package.convert
        schema_serializer_map = {
            "rift": {
                "vnfd": convert.RwVnfdSerializer,
                "nsd": convert.RwNsdSerializer,
            },
            "mano": {
                "vnfd": convert.VnfdSerializer,
                "nsd": convert.NsdSerializer,
            },
        }

        if schema not in schema_serializer_map:
            raise tornado.web.HTTPError(400, "unknown schema: {}".format(schema))

        if format_ != "yaml":
            self.log.warn("Only yaml format supported for export")

        if desc_type not in schema_serializer_map[schema]:
            raise tornado.web.HTTPError(400, "unknown descriptor type: {}".format(desc_type))

        # Use the rift superset schema as the source
        src_serializer = schema_serializer_map["rift"][desc_type]()

        dest_serializer = schema_serializer_map[schema][desc_type]()

        package_store = self.store_map[desc_type]

        # Attempt to get the package from the package store.
        # If that fails, create a temporary package using the descriptor only.
        try:
            package = package_store.get_package(desc_id)
        except rift.package.store.PackageNotFoundError:
            self.log.debug("stored package not found. creating package from descriptor config")

            desc_yaml_str = src_serializer.to_yaml_string(desc_msg)
            with io.BytesIO(desc_yaml_str.encode()) as hdl:
                hdl.name = "{}__{}.yaml".format(desc_msg.id, desc_type)
                package = rift.package.package.DescriptorPackage.from_descriptor_file_hdl(
                    self.log, hdl
                )

        self.exporter.export_package(
            package=package,
            export_dir=self.application.export_dir,
            file_id=self.transaction_id,
            json_desc_str=src_serializer.to_json_string(desc_msg),
            dest_serializer=dest_serializer,
        )

    def export_tosca(self, schema, format_, desc_type, desc_id, desc_msg):
        if format_ != "yaml":
            self.log.warn("Only yaml format supported for TOSCA export")

        if desc_type != "nsd":
            raise tornado.web.HTTPError(
                400,
                "An NSD must be passed to generate TOSCA, got: {}".format(desc_type))

        def get_pkg_from_store(id_, type_):
            package = None
            # Attempt to get the package from the package store
            try:
                package_store = self.store_map[type_]
                package = package_store.get_package(id_)
            except rift.package.store.PackageNotFoundError:
                self.log.debug("stored package not found for {}.".format(id_))
            except rift.package.store.PackageStoreError:
                self.log.debug("stored package error for {}.".format(id_))

            return package

        pkg = tosca.ExportTosca()

        # Add the NSD and related descriptors for exporting
        nsd_id = pkg.add_nsd(desc_msg, get_pkg_from_store(desc_id, "nsd"))

        catalog = self.catalog_map["vnfd"]
        for const_vnfd in desc_msg.constituent_vnfd:
            vnfd_id = const_vnfd.vnfd_id_ref
            if vnfd_id in catalog:
                pkg.add_vnfd(nsd_id,
                             catalog[vnfd_id],
                             get_pkg_from_store(vnfd_id, "vnfd"))
            else:
                raise tornado.web.HTTPError(
                    400,
                    "Unknown VNFD descriptor {} for NSD {}".format(vnfd_id, nsd_id))

        # Create the archive.
        pkg.create_archive(self.transaction_id,
                           dest=self.application.export_dir)


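# Illustrative client sketch for the handler above: it expects a GET with an
# "ids" query argument (exactly one id) plus optional "schema", "grammar",
# and "format" arguments, and replies with a JSON body carrying the
# transaction id and the archive filename.  The URL mount point, host, and
# port below are assumptions; the route is configured elsewhere.
def _example_export_request(desc_id):
    import json
    import urllib.request

    url = ("http://localhost:8008/api/export/nsd"
           "?ids={}&schema=rift&grammar=osm&format=yaml".format(desc_id))
    with urllib.request.urlopen(url) as resp:
        reply = json.loads(resp.read().decode())
    # reply["filename"] is "<transaction_id>.tar.gz" for the default grammar,
    # or "<transaction_id>.zip" when grammar=tosca
    return reply

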
class ExportStateHandler(state.StateHandler):
    STARTED = ExportStart
    SUCCESS = ExportSuccess
    FAILURE = ExportFailure


@asyncio.coroutine
def periodic_export_cleanup(log, loop, export_dir, period_secs=10 * 60, min_age_secs=30 * 60):
    """ Periodically clean up old exported archives (.tar.gz files) in export_dir

    Arguments:
        log - A Logger instance
        loop - An asyncio event loop
        export_dir - The directory to clean up old archives in
        period_secs - The number of seconds between cleanups
        min_age_secs - The minimum age of an archive to be eligible for cleanup

    """
    log.debug("Starting periodic export cleaning for export directory: %s", export_dir)

    # Create the export dir if it does not exist yet
    if not os.path.exists(export_dir):
        os.makedirs(export_dir)

    while True:
        yield from asyncio.sleep(period_secs, loop=loop)

        if not os.path.exists(export_dir):
            continue

        for file_name in os.listdir(export_dir):
            if not file_name.endswith(".tar.gz"):
                continue

            file_path = os.path.join(export_dir, file_name)

            try:
                file_stat = os.stat(file_path)
            except OSError as e:
                log.warning("Could not stat old exported archive: %s", str(e))
                continue

            file_age = time.time() - file_stat[stat.ST_MTIME]

            if file_age < min_age_secs:
                continue

            log.debug("Cleaning up old exported archive: %s", file_path)

            try:
                os.remove(file_path)
            except OSError as e:
                log.warning("Failed to remove old exported archive: %s", str(e))
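

# Illustrative wiring sketch (not called anywhere): scheduling the cleanup
# coroutine above on the tasklet's event loop.  The log, loop, and
# export_dir arguments are stand-ins for whatever the application provides.
def _example_schedule_cleanup(log, loop, export_dir):
    return loop.create_task(
        periodic_export_cleanup(log, loop, export_dir)
    )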