#
# Copyright 2016-2017 RIFT.IO Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import asyncio
import io
import os.path
import stat
import time
import uuid
import collections
import json

import tornado.web

import rift.package.archive
import rift.package.checksums
import rift.package.convert
import rift.package.package
import rift.package.store
import rift.package.image

from . import state
from . import message
from . import tosca

import gi
gi.require_version('RwPkgMgmtYang', '1.0')

from gi.repository import (
    RwPkgMgmtYang,
    RwVnfdYang,
    RwProjectVnfdYang,
    RwNsdYang,
    RwProjectNsdYang
)
import rift.mano.dts as mano_dts


RPC_PACKAGE_EXPORT_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageExport


class ExportStart(message.StatusMessage):
    def __init__(self):
        super().__init__("export-started", "export process started")


class ExportSuccess(message.StatusMessage):
    def __init__(self):
        super().__init__("export-success", "export process successfully completed")


class ExportFailure(message.StatusMessage):
    def __init__(self):
        super().__init__("export-failure", "export process failed")


class ExportError(message.ErrorMessage):
    def __init__(self, msg):
        super().__init__("export-error", msg)


class ExportSingleDescriptorOnlyError(ExportError):
    def __init__(self):
        super().__init__("Only a single descriptor can be exported")


class ArchiveExportError(Exception):
    pass


class DescriptorPackageArchiveExporter(object):
    def __init__(self, log):
        self._log = log

    def _create_archive_from_package(self, archive_hdl, package, open_fn, top_level_dir=None):
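        # Temporarily swap out the package's open function so the archive is
        # built from the rewritten descriptor/checksum handles rather than the
        # files on disk, restoring the original open() on the way out.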
        orig_open = package.open
        try:
            package.open = open_fn
            archive = rift.package.archive.TarPackageArchive.from_package(
                self._log, package, archive_hdl, top_level_dir
            )
            return archive
        finally:
            package.open = orig_open

    def create_archive(self, archive_hdl, package, desc_json_str, serializer, project=None):
        """ Create a package archive from an existing package, descriptor messages,
        and a destination serializer.

        In order to stay flexible with the package directory structure and
        descriptor format, attempt to "augment" the onboarded package with the
        updated descriptor in the original format.  If the original package
        contained a checksum file, then recalculate the descriptor checksum.

        Arguments:
            archive_hdl - An open file handle with 'wb' permissions
            package - A DescriptorPackage instance
            desc_json_str - A descriptor (e.g. nsd, vnfd) JSON string
            serializer - A destination serializer (e.g. VnfdSerializer)

        Returns:
            A TarPackageArchive

        Raises:
            ArchiveExportError - Failed to create the exported archive

        """
        new_desc_msg = serializer.from_file_hdl(io.BytesIO(desc_json_str.encode()), ".json", project)
        _, dest_ext = os.path.splitext(package.descriptor_file)
        new_desc_hdl = io.BytesIO(serializer.to_string(new_desc_msg, dest_ext).encode())
        descriptor_checksum = rift.package.checksums.checksum(new_desc_hdl)

        checksum_file = None
        try:
            checksum_file = rift.package.package.PackageChecksumValidator.get_package_checksum_file(
                package
            )
        except FileNotFoundError:
            pass

        # Since we're going to intercept the open function to rewrite the descriptor
        # and checksum, save a handle to use below
        open_fn = package.open

        def create_checksum_file_hdl():
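            # Called lazily from open_wrapper() when the archive builder requests
            # the checksum file: reload the stored checksums, replace the
            # descriptor entry with the recalculated checksum, and serve the result.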
            with open_fn(checksum_file) as checksum_hdl:
                archive_checksums = rift.package.checksums.ArchiveChecksums.from_file_desc(
                    checksum_hdl
                )

            # Get the name of the descriptor file without the prefix
            # (which is what is stored in the checksum file)
            desc_file_no_prefix = os.path.relpath(package.descriptor_file, package.prefix)
            archive_checksums[desc_file_no_prefix] = descriptor_checksum

            checksum_hdl = io.BytesIO(archive_checksums.to_string().encode())
            return checksum_hdl

        def open_wrapper(rel_path):
            """ Wraps the package open in order to rewrite the descriptor file and checksum """
            if rel_path == package.descriptor_file:
                return new_desc_hdl

            elif rel_path == checksum_file:
                return create_checksum_file_hdl()

            return open_fn(rel_path)

        archive = self._create_archive_from_package(archive_hdl, package, open_wrapper, new_desc_msg.name)

        return archive

    def export_package(self, package, export_dir, file_id, json_desc_str, dest_serializer, project=None):
        """ Export package as an archive to the export directory

        Arguments:
            package - A DescriptorPackage instance
            export_dir - The directory to export the package archive to
            file_id - A unique file id to name the archive as (i.e. <file_id>.tar.gz)
            json_desc_str - A descriptor (e.g. nsd, vnfd) JSON message string
            dest_serializer - A destination serializer (e.g. VnfdSerializer)

        Returns:
            The created archive path

        Raises:
            ArchiveExportError - Failed to create the exported archive
        """
        # exist_ok=True makes this a no-op if the directory already exists
        os.makedirs(export_dir, exist_ok=True)

        archive_path = os.path.join(export_dir, file_id + ".tar.gz")
        with open(archive_path, 'wb') as archive_hdl:
            try:
                self.create_archive(
                    archive_hdl, package, json_desc_str, dest_serializer, project
                )
            except Exception as e:
                os.remove(archive_path)
                msg = "Failed to create exported archive"
                self._log.error(msg)
                raise ArchiveExportError(msg) from e

        return archive_path

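# Example (illustrative sketch, not part of the original module): driving the
# exporter above end to end. The package store, descriptor JSON string, and
# export directory below are assumptions for illustration only.
#
#   exporter = DescriptorPackageArchiveExporter(log)
#   package = package_store.get_package(desc_id)           # hypothetical store
#   archive_path = exporter.export_package(
#       package=package,
#       export_dir="/tmp/exports",                         # assumed directory
#       file_id=str(uuid.uuid4()),
#       json_desc_str=json_desc_str,                       # descriptor as JSON
#       dest_serializer=rift.package.convert.RwVnfdSerializer(),
#   )
#   # => "/tmp/exports/<file_id>.tar.gz"
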
class ExportRpcHandler(mano_dts.AbstractRpcHandler):
    def __init__(self, application, catalog_map):
        """
        Args:
            application: UploaderApplication
            catalog_map: Dict mapping descriptor type ("vnfd"/"nsd") to its
                onboarded catalog.
        """
        super().__init__(application.log, application.dts, application.loop)

        self.application = application
        self.exporter = application.exporter
        self.onboarder = application.onboarder
        self.catalog_map = catalog_map

    @property
    def xpath(self):
        return "/rw-pkg-mgmt:package-export"

    @asyncio.coroutine
    def callback(self, ks_path, msg):
        transaction_id = str(uuid.uuid4())
        log = message.Logger(
            self.log,
            self.application.messages[transaction_id],
        )

        file_name = self.export(transaction_id, log, msg)

        rpc_out = RPC_PACKAGE_EXPORT_ENDPOINT.from_dict({
            'transaction_id': transaction_id,
            'filename': file_name})

        return rpc_out

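    # The RPC input message is consumed field-by-field in export() below
    # (msg.package_type, msg.package_id, msg.project_name, msg.export_schema,
    # msg.export_grammar, msg.export_format); the output carries the
    # transaction id and the generated archive filename.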
    def export(self, transaction_id, log, msg):
        DESC_TYPE_PB_MAP = {
            "vnfd": RwProjectVnfdYang.YangData_RwProject_Project_VnfdCatalog_Vnfd,
            "nsd": RwProjectNsdYang.YangData_RwProject_Project_NsdCatalog_Nsd,
        }

        log.message(ExportStart())
        desc_type = msg.package_type.lower()

        if desc_type not in self.catalog_map:
            raise ValueError("Invalid package type: {}".format(desc_type))

        # Parse the IDs
        desc_id = msg.package_id
        catalog = self.catalog_map[desc_type](project=msg.project_name)

        # TODO: The descriptor may be unavailable from the catalog info passed in
        # by the launchpad tasklet. If so, create a filler descriptor object,
        # which will be updated via a GET call to config.
        if desc_id in catalog:
            desc_msg = catalog[desc_id]
        else:
            log.warn("Unable to find package ID in catalog: {}".format(desc_id))
            desc_msg = DESC_TYPE_PB_MAP[desc_type](id=desc_id)

        self.store_map = self.application.build_store_map(project=msg.project_name)
        self.project_name = msg.project_name if msg.has_field('project_name') else None

        # Get the schema for exporting
        schema = msg.export_schema.lower()

        # Get the grammar for exporting
        grammar = msg.export_grammar.lower()

        # Get the format for exporting
        format_ = msg.export_format.lower()

        # Initial value of the exported filename
        self.filename = "{name}_{ver}".format(
            name=desc_msg.name,
            ver=desc_msg.version)

        if grammar == 'tosca':
            self.export_tosca(schema, format_, desc_type, desc_id, desc_msg, log, transaction_id)
            filename = "{}.zip".format(self.filename)
        else:
            self.export_rift(schema, format_, desc_type, desc_id, desc_msg, log, transaction_id)
            filename = "{}.tar.gz".format(self.filename)

        log.message(message.FilenameMessage(filename))
        log.message(ExportSuccess())

        return filename

    def export_rift(self, schema, format_, desc_type, desc_id, desc_msg, log, transaction_id):
        convert = rift.package.convert
        schema_serializer_map = {
            "rift": {
                "vnfd": convert.RwVnfdSerializer,
                "nsd": convert.RwNsdSerializer,
            },
            "mano": {
                "vnfd": convert.RwVnfdSerializer,
                "nsd": convert.RwNsdSerializer,
            },
        }
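        # Note: both "rift" and "mano" currently map to the same RIFT
        # serializers, so "mano" is effectively an alias for "rift" here.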

        if schema not in schema_serializer_map:
            raise tornado.web.HTTPError(400, "unknown schema: {}".format(schema))

        if format_ != "yaml":
            log.warn("Only yaml format supported for export")

        if desc_type not in schema_serializer_map[schema]:
            raise tornado.web.HTTPError(400, "unknown descriptor type: {}".format(desc_type))

        # Use the rift superset schema as the source
        src_serializer = schema_serializer_map["rift"][desc_type]()

        dest_serializer = schema_serializer_map[schema][desc_type]()

        package_store = self.store_map[desc_type]

        # Attempt to get the package from the package store.
        # If that fails, create a temporary package using the descriptor only.
        try:
            package = package_store.get_package(desc_id)
            # Remove any image files from the package while exporting.
            # Iterate over a copy to avoid mutating the collection while looping.
            for file_name in list(package.files):
                if rift.package.image.is_image_file(file_name):
                    package.remove_file(file_name)

        except rift.package.store.PackageNotFoundError:
            log.debug("stored package not found. creating package from descriptor config")

            desc_yaml_str = src_serializer.to_yaml_string(desc_msg)
            with io.BytesIO(desc_yaml_str.encode()) as hdl:
                hdl.name = "{}__{}.yaml".format(desc_msg.id, desc_type)
                package = rift.package.package.DescriptorPackage.from_descriptor_file_hdl(
                    log, hdl
                )

        # Get the updated descriptor from the API endpoint to pick up any changes
        # made to the catalog; desc_msg may not be fully populated yet.

        try:
            # Merge the descriptor content: for RBAC, everything needs to be
            # project-rooted, with the project name.
            merged = collections.defaultdict(dict)
            sub_dict = self.onboarder.get_updated_descriptor(desc_msg, self.project_name)

            if self.project_name:
                merged["project"] = dict(name=self.project_name)
                root_key, sub_key = "project-{0}:{0}-catalog".format(desc_type), "project-{0}:{0}".format(desc_type)
                merged["project"].update({root_key: sub_dict})
            else:
                root_key, sub_key = "{0}:{0}-catalog".format(desc_type), "{0}:{0}".format(desc_type)
                merged[root_key] = sub_dict

            json_desc_msg = json.dumps(merged)
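            # Illustrative shape of the merged JSON for a project-scoped vnfd
            # (descriptor fields elided):
            #   {"project": {"name": "<project>",
            #                "project-vnfd:vnfd-catalog": {"project-vnfd:vnfd": {...}}}}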
            desc_name, desc_version = sub_dict[sub_key]['name'], sub_dict[sub_key].get('version', '')

        except Exception as e:
            msg = "Exception {} raised - {}".format(e.__class__.__name__, str(e))
            self.log.error(msg)
            raise ArchiveExportError(msg) from e

        # Exported filename based on the updated descriptor name
        self.filename = "{}_{}".format(desc_name, desc_version)
        self.log.debug("JSON string for descriptor: {}".format(json_desc_msg))

        self.exporter.export_package(
            package=package,
            export_dir=self.application.export_dir,
            file_id=self.filename,
            json_desc_str=json_desc_msg,
            dest_serializer=dest_serializer,
            project=self.project_name,
        )

    def export_tosca(self, schema, format_, desc_type, desc_id, desc_msg, log, transaction_id):
        # Note: parameter order fixed to (schema, format_) to match the call
        # in export() and the signature of export_rift().
        if format_ != "yaml":
            log.warn("Only yaml format supported for TOSCA export")

        def get_pkg_from_store(id_, type_):
            package = None
            # Attempt to get the package from the package store
            try:
                package_store = self.store_map[type_]
                package = package_store.get_package(id_)

            except rift.package.store.PackageNotFoundError:
                log.debug("stored package not found for {}.".format(id_))
            except rift.package.store.PackageStoreError:
                log.debug("stored package error for {}.".format(id_))

            return package

        if desc_type == "nsd":
            pkg = tosca.ExportTosca()

            # Add the NSD and related descriptors for exporting
            nsd_id = pkg.add_nsd(desc_msg, get_pkg_from_store(desc_id, "nsd"))

            catalog = self.catalog_map["vnfd"]
            for const_vnfd in desc_msg.constituent_vnfd:
                vnfd_id = const_vnfd.vnfd_id_ref
                if vnfd_id in catalog:
                    pkg.add_vnfd(nsd_id,
                                 catalog[vnfd_id],
                                 get_pkg_from_store(vnfd_id, "vnfd"))
                else:
                    raise tornado.web.HTTPError(
                        400,
                        "Unknown VNFD descriptor {} for NSD {}".format(vnfd_id, nsd_id))

            # Create the archive.
            pkg.create_archive(transaction_id,
                               dest=self.application.export_dir)

        elif desc_type == "vnfd":
            pkg = tosca.ExportTosca()
            vnfd_id = desc_msg.id
            pkg.add_single_vnfd(vnfd_id,
                                desc_msg,
                                get_pkg_from_store(vnfd_id, "vnfd"))

            # Create the archive.
            pkg.create_archive(transaction_id,
                               dest=self.application.export_dir)


class ExportStateHandler(state.StateHandler):
    STARTED = ExportStart
    SUCCESS = ExportSuccess
    FAILURE = ExportFailure


@asyncio.coroutine
def periodic_export_cleanup(log, loop, export_dir, period_secs=10 * 60, min_age_secs=30 * 60):
    """ Periodically clean up old exported archives (.tar.gz files) in export_dir

    Arguments:
        log - A Logger instance
        loop - An asyncio event loop
        export_dir - The directory in which to clean up old archives
        period_secs - The number of seconds between clean-ups
        min_age_secs - The minimum age of an archive to be eligible for cleanup

    """
    log.debug("Starting periodic export cleaning for export directory: %s", export_dir)

    # Create the export dir if it has not been created yet
    if not os.path.exists(export_dir):
        os.makedirs(export_dir)

    while True:
        yield from asyncio.sleep(period_secs, loop=loop)

        if not os.path.exists(export_dir):
            continue

        for file_name in os.listdir(export_dir):
            if not file_name.endswith(".tar.gz"):
                continue

            file_path = os.path.join(export_dir, file_name)

            try:
                file_stat = os.stat(file_path)
            except OSError as e:
                log.warning("Could not stat old exported archive: %s", str(e))
                continue

            file_age = time.time() - file_stat[stat.ST_MTIME]

            if file_age < min_age_secs:
                continue

            log.debug("Cleaning up old exported archive: %s", file_path)

            try:
                os.remove(file_path)
            except OSError as e:
                log.warning("Failed to remove old exported archive: %s", str(e))
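

# Example (illustrative sketch, not part of the original module): scheduling the
# cleanup coroutine on the tasklet's event loop. The log, loop, and export_dir
# objects here are assumptions for illustration only.
#
#   cleanup_task = loop.create_task(
#       periodic_export_cleanup(log, loop, export_dir)
#   )
#   # ... later, on shutdown:
#   cleanup_task.cancel()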