Merge from OSM SO master
[osm/SO.git] / rwlaunchpad / plugins / rwlaunchpadtasklet / rift / tasklets / rwlaunchpad / tasklet.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19
20 import tornado
21 import tornado.httputil
22 import tornado.httpserver
23 import tornado.platform.asyncio
24
25 import tornadostreamform.multipart_streamer as multipart_streamer
26
27 import gi
28 gi.require_version('RwDts', '1.0')
29 gi.require_version('RwcalYang', '1.0')
30 gi.require_version('RwTypes', '1.0')
31 gi.require_version('RwLaunchpadYang', '1.0')
32
33 from gi.repository import (
34 RwDts as rwdts,
35 RwLaunchpadYang as rwlaunchpad,
36 RwcalYang as rwcal,
37 RwTypes,
38 )
39
40 import rift.tasklets
41 import rift.mano.cloud
42 import rift.mano.config_agent
43 from rift.mano.utils.project import (
44 ManoProject,
45 ProjectHandler,
46 get_add_delete_update_cfgs,
47 DEFAULT_PROJECT,
48 )
49 from rift.package import store
50
51 from . import uploader
52 from . import datacenters
53
# Binary size units; each one is 1024 times the previous.
MB = 1024 ** 2
GB = MB * 1024
TB = GB * 1024

# Upload limits picked up by LaunchpadTasklet.UPLOAD_MAX_* below.
MAX_BUFFER_SIZE = MB  # Max. size loaded into memory!
MAX_BODY_SIZE = MB  # Max. size loaded into memory!
60
61
class LaunchpadError(Exception):
    """Exception type defined by the launchpad tasklet module."""
64
class LpProjectNotFound(Exception):
    """Raised by LaunchpadTasklet._get_project for an unknown project name."""
67
class CatalogDtsHandler(object):
    """Common state for the per-project catalog DTS handlers.

    Holds the owning project, the application object handed in by the
    caller, and the DTS registration handle ``reg`` (``None`` until a
    subclass registers).
    """

    def __init__(self, project, app):
        self.project = project
        self.app = app
        self.reg = None

    @property
    def dts(self):
        """DTS handle of the owning project."""
        return self.project.dts

    @property
    def log(self):
        """Logger of the owning project."""
        return self.project.log
81
82
class NsdCatalogDtsHandler(CatalogDtsHandler):
    """Keep ``project.nsd_catalog`` in step with the NSD catalog configuration.

    The handler subscribes to the project-scoped NSD catalog xpath and
    applies each configuration change (delete, add, update) to the
    dictionary held by the owning project.
    """

    XPATH = "C,/project-nsd:nsd-catalog/project-nsd:nsd"

    def add_nsd(self, nsd):
        """Insert ``nsd`` into the catalog, keyed by ``nsd.id``.

        An id that is already present is left untouched and reported as an
        error.
        """
        self.log.debug('nsd-catalog-handler:add:{}'.format(nsd.id))
        if nsd.id not in self.project.nsd_catalog:
            self.project.nsd_catalog[nsd.id] = nsd
        else:
            self.log.error("nsd already in catalog: {}".format(nsd.id))

    def update_nsd(self, nsd):
        """Replace the catalog entry for ``nsd.id``.

        An id that is not in the catalog is reported as an error and not
        added.
        """
        self.log.debug('nsd-catalog-handler:update:{}'.format(nsd.id))
        if nsd.id in self.project.nsd_catalog:
            self.project.nsd_catalog[nsd.id] = nsd
        else:
            self.log.error("unrecognized NSD: {}".format(nsd.id))

    def delete_nsd(self, nsd_id):
        """Remove ``nsd_id`` from the catalog and from the NSD package store.

        The package-store delete is attempted even when the id was not in
        the catalog; a ``store.PackageStoreError`` is logged as a warning
        and not propagated.
        """
        self.log.debug('nsd-catalog-handler:delete:{}'.format(nsd_id))
        if nsd_id in self.project.nsd_catalog:
            del self.project.nsd_catalog[nsd_id]
        else:
            self.log.error("unrecognized NSD: {}".format(nsd_id))

        try:
            self.project.tasklet.nsd_package_store.delete_package(nsd_id)
        except store.PackageStoreError as e:
            self.log.warning("could not delete package from store: %s", str(e))

    @asyncio.coroutine
    def register(self):
        """Subscribe to the NSD catalog of the owning project.

        Creates an AppConf group whose ``on_apply`` callback replays the
        configuration changes onto the catalog, and stores the resulting
        registration in ``self.reg``.
        """
        def apply_config(dts, acg, xact, action, _):
            """Apply one configuration transaction to the NSD catalog."""
            if xact.xact is None:
                # When RIFT first comes up, an INSTALL is called with the current config
                # Since confd doesn't actually persist data this never has any data so
                # skip this for now.
                self.log.debug("No xact handle. Skipping apply config")
                return

            add_cfgs, delete_cfgs, update_cfgs = get_add_delete_update_cfgs(
                dts_member_reg=self.reg,
                xact=xact,
                key_name="id",
            )

            # Handle Deletes
            for cfg in delete_cfgs:
                self.delete_nsd(cfg.id)

            # Handle Adds
            for cfg in add_cfgs:
                self.add_nsd(cfg)

            # Handle Updates
            for cfg in update_cfgs:
                self.update_nsd(cfg)

        # The original message had no "{}" placeholder, so the project name
        # passed to format() never reached the log.
        self.log.debug("Registering for NSD catalog in project {}".
                       format(self.project.name))

        acg_handler = rift.tasklets.AppConfGroup.Handler(
            on_apply=apply_config,
        )

        with self.dts.appconf_group_create(acg_handler) as acg:
            xpath = self.project.add_project(NsdCatalogDtsHandler.XPATH)
            self.reg = acg.register(
                xpath=xpath,
                flags=rwdts.Flag.SUBSCRIBER,
            )

    def deregister(self):
        """Drop the DTS registration, if one exists."""
        if self.reg:
            self.reg.deregister()
            self.reg = None
158
159
class VnfdCatalogDtsHandler(CatalogDtsHandler):
    """Keep ``project.vnfd_catalog`` in step with the VNFD catalog configuration."""

    XPATH = "C,/project-vnfd:vnfd-catalog/project-vnfd:vnfd"

    def add_vnfd(self, vnfd):
        """Insert ``vnfd`` under ``vnfd.id``; an existing id is only logged."""
        self.log.debug('vnfd-catalog-handler:add:{}'.format(vnfd.id))
        if vnfd.id in self.project.vnfd_catalog:
            self.log.error("VNFD already in catalog: {}".format(vnfd.id))
            return

        self.project.vnfd_catalog[vnfd.id] = vnfd

    def update_vnfd(self, vnfd):
        """Replace the entry for ``vnfd.id``; an unknown id is only logged."""
        self.log.debug('vnfd-catalog-handler:update:{}'.format(vnfd.id))
        if vnfd.id not in self.project.vnfd_catalog:
            self.log.error("unrecognized VNFD: {}".format(vnfd.id))
            return

        self.project.vnfd_catalog[vnfd.id] = vnfd

    def delete_vnfd(self, vnfd_id):
        """Remove ``vnfd_id`` from the catalog and from the VNFD package store.

        The package-store delete runs whether or not the id was in the
        catalog; a ``store.PackageStoreError`` is logged as a warning.
        """
        self.log.debug('vnfd-catalog-handler:delete:{}'.format(vnfd_id))
        if vnfd_id not in self.project.vnfd_catalog:
            self.log.error("unrecognized VNFD: {}".format(vnfd_id))
        else:
            del self.project.vnfd_catalog[vnfd_id]

        package_store = self.project.tasklet.vnfd_package_store
        try:
            package_store.delete_package(vnfd_id)
        except store.PackageStoreError as e:
            self.log.warning("could not delete package from store: %s", str(e))

    @asyncio.coroutine
    def register(self):
        """Subscribe to the VNFD catalog of the owning project.

        The AppConf ``on_apply`` callback applies deletes first, then adds,
        then updates. The registration is kept in ``self.reg``.
        """
        def on_apply(dts, acg, xact, action, _):
            if xact.xact is None:
                # On first start-up an INSTALL arrives without a transaction
                # handle and without data, so there is nothing to apply.
                self.log.debug("No xact handle. Skipping apply config")
                return

            added, removed, changed = get_add_delete_update_cfgs(
                dts_member_reg=self.reg,
                xact=xact,
                key_name="id",
            )

            for gone in removed:
                self.delete_vnfd(gone.id)

            for new in added:
                self.add_vnfd(new)

            for modified in changed:
                self.update_vnfd(modified)

        self.log.debug("Registering for VNFD catalog in project {}".
                       format(self.project.name))

        handler = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)

        with self.dts.appconf_group_create(handler) as acg:
            project_xpath = self.project.add_project(VnfdCatalogDtsHandler.XPATH)
            self.reg = acg.register(
                xpath=project_xpath,
                flags=rwdts.Flag.SUBSCRIBER,
            )

    def deregister(self):
        """Drop the DTS registration, if one exists."""
        if not self.reg:
            return

        self.reg.deregister()
        self.reg = None
237
class CfgAgentAccountHandlers(object):
    """Pair of config-agent account handlers for one project.

    ``cfg_agent_cfg_handler`` receives account configuration changes and
    forwards them, through the two callbacks below, to
    ``cfg_agent_operdata_handler``.
    """

    def __init__(self, dts, log, log_hdl, loop, project):
        self._dts = dts
        self._log = log
        self._log_hdl = log_hdl
        self._loop = loop
        self._project = project

        self._log.debug("creating config agent account config handler")
        callbacks = rift.mano.config_agent.ConfigAgentCallbacks(
            on_add_apply=self.on_cfg_agent_account_added,
            on_delete_apply=self.on_cfg_agent_account_deleted,
        )
        self.cfg_agent_cfg_handler = rift.mano.config_agent.ConfigAgentSubscriber(
            self._dts, self._log, self._project, callbacks
        )

        self._log.debug("creating config agent account opdata handler")
        self.cfg_agent_operdata_handler = rift.mano.config_agent.CfgAgentDtsOperdataHandler(
            self._dts, self._log, self._loop, self._project
        )

    def on_cfg_agent_account_deleted(self, account):
        """Forward a deleted account, by name, to the operdata handler."""
        self._log.debug("config agent account deleted")
        operdata = self.cfg_agent_operdata_handler
        operdata.delete_cfg_agent_account(account.name)

    def on_cfg_agent_account_added(self, account):
        """Forward a newly added account to the operdata handler."""
        self._log.debug("config agent account added")
        operdata = self.cfg_agent_operdata_handler
        operdata.add_cfg_agent_account(account)

    @asyncio.coroutine
    def register(self):
        """Register the config handler, then the operdata handler."""
        self.cfg_agent_cfg_handler.register()
        yield from self.cfg_agent_operdata_handler.register()

    def deregister(self):
        """Deregister the operdata handler, then the config handler."""
        for handler in (self.cfg_agent_operdata_handler,
                        self.cfg_agent_cfg_handler):
            handler.deregister()
276
277
class CloudAccountHandlers(object):
    """Pair of cloud-account handlers for one project.

    ``cloud_cfg_handler`` receives account configuration changes. Each
    change refreshes the application's per-project account list and is
    forwarded to ``cloud_operdata_handler``.
    """

    def __init__(self, dts, log, log_hdl, loop, app, project):
        self._dts = dts
        self._log = log
        self._log_hdl = log_hdl
        self._loop = loop
        self._app = app
        self._project = project

        self._log.debug("Creating cloud account config handler for project {}".
                        format(project.name))
        callbacks = rift.mano.cloud.CloudAccountConfigCallbacks(
            on_add_apply=self.on_cloud_account_added,
            on_delete_apply=self.on_cloud_account_deleted,
        )
        self.cloud_cfg_handler = rift.mano.cloud.CloudAccountConfigSubscriber(
            self._dts, self._log, self._log_hdl, self._project, callbacks,
        )

        self._log.debug("creating cloud account opdata handler")
        self.cloud_operdata_handler = rift.mano.cloud.CloudAccountDtsOperdataHandler(
            self._dts, self._log, self._loop, self._project,
        )

    def _publish_accounts(self):
        """Store the current account list in the app under this project's name."""
        current = list(self.cloud_cfg_handler.accounts.values())
        self._app.accounts[self._project.name] = current

    def on_cloud_account_deleted(self, account_name):
        """Refresh the app's account list and drop the account's operdata."""
        self._log.debug("cloud account deleted")
        self._publish_accounts()
        self.cloud_operdata_handler.delete_cloud_account(account_name)

    def on_cloud_account_added(self, account):
        """Refresh the app's account list and add the account's operdata."""
        self._log.debug("cloud account added")
        self._publish_accounts()
        self._log.debug("accounts: %s", self._app.accounts)
        self.cloud_operdata_handler.add_cloud_account(account)

    @asyncio.coroutine
    def register(self):
        """Register the config handler, then the operdata handler."""
        for handler in (self.cloud_cfg_handler, self.cloud_operdata_handler):
            yield from handler.register()

    def deregister(self):
        """Deregister the config handler, then the operdata handler."""
        for handler in (self.cloud_cfg_handler, self.cloud_operdata_handler):
            handler.deregister()
323
324
class LaunchpadProject(ManoProject):
    """Per-project launchpad state: catalogs plus the DTS handlers feeding them."""

    def __init__(self, name, tasklet, **kw):
        """Create the project.

        Arguments
            name    - project name
            tasklet - owning tasklet; its ``log`` is passed to ManoProject
            kw      - must contain ``app``, the application object given to
                      the catalog and cloud-account handlers

        """
        super(LaunchpadProject, self).__init__(tasklet.log, name)
        # update() is inherited from ManoProject and not visible here;
        # presumably it copies dts/loop/log_hdl from the tasklet - confirm.
        self.update(tasklet)
        self._app = kw['app']

        # Handlers are created in register(); None means "not registered".
        self.config_handler = None
        self.nsd_catalog_handler = None
        self.vld_catalog_handler = None
        self.vnfd_catalog_handler = None
        self.cloud_handler = None
        self.datacenter_handler = None
        self.lp_config_handler = None
        self.account_handler = None

        # Catalogs keyed by descriptor id.
        self.nsd_catalog = dict()
        self.vld_catalog = dict()
        self.vnfd_catalog = dict()

    @property
    def dts(self):
        return self._dts

    @property
    def loop(self):
        return self._loop

    @asyncio.coroutine
    def register(self):
        """Create and register all handlers of this project.

        Order: NSD catalog, VNFD catalog, datacenter publisher, cloud
        accounts, config agent accounts.
        """
        self.log.debug("creating NSD catalog handler for project {}".format(self.name))
        self.nsd_catalog_handler = NsdCatalogDtsHandler(self, self._app)
        yield from self.nsd_catalog_handler.register()

        self.log.debug("creating VNFD catalog handler for project {}".format(self.name))
        self.vnfd_catalog_handler = VnfdCatalogDtsHandler(self, self._app)
        yield from self.vnfd_catalog_handler.register()

        self.log.debug("creating datacenter handler for project {}".format(self.name))
        self.datacenter_handler = datacenters.DataCenterPublisher(self.log, self.dts,
                                                                  self.loop, self)
        yield from self.datacenter_handler.register()

        self.log.debug("creating cloud account handler for project {}".format(self.name))
        self.cloud_handler = CloudAccountHandlers(self.dts, self.log, self.log_hdl,
                                                  self.loop, self._app, self)
        yield from self.cloud_handler.register()

        self.log.debug("creating config agent handler for project {}".format(self.name))
        self.config_handler = CfgAgentAccountHandlers(self.dts, self.log, self.log_hdl,
                                                      self.loop, self)
        yield from self.config_handler.register()

    def deregister(self):
        """Deregister the handlers in the reverse order of register().

        Handlers that were never created (register() not run, or aborted
        part-way) are skipped, so the remaining ones are still torn down
        instead of the call failing on the first None.
        """
        self.log.debug("De-register handlers for project: {}".format(self.name))
        handlers = (
            self.config_handler,
            self.cloud_handler,
            self.datacenter_handler,
            self.vnfd_catalog_handler,
            self.nsd_catalog_handler,
        )
        for handler in handlers:
            if handler is not None:
                handler.deregister()

    @asyncio.coroutine
    def delete_prepare(self):
        """Report that the project may be deleted; currently always True."""
        # TODO: Do we need this check
        # if self.nsd_catalog or self.vnfd_catalog or self.vld_catalog:
        #     return False
        return True

    @property
    def cloud_accounts(self):
        """List of configured cloud accounts; empty before register()."""
        if self.cloud_handler is None:
            return list()

        return list(self.cloud_handler.cloud_cfg_handler.accounts.values())
399
400
class LaunchpadTasklet(rift.tasklets.Tasklet):
    """Launchpad tasklet.

    Creates the DTS API object, the uploader application with its HTTP
    server, and the project handler that instantiates LaunchpadProject
    objects.
    """
    UPLOAD_MAX_BODY_SIZE = MAX_BODY_SIZE
    # Not referenced elsewhere in this class.
    UPLOAD_MAX_BUFFER_SIZE = MAX_BUFFER_SIZE
    UPLOAD_PORT = "4567"

    def __init__(self, *args, **kwargs):
        super(LaunchpadTasklet, self).__init__(*args, **kwargs)
        self.rwlog.set_category("rw-mano-log")
        self.rwlog.set_subcategory("launchpad")

        self.dts = None
        self.project_handler = None

        self.vnfd_package_store = store.VnfdPackageFilesystemStore(self.log)
        self.nsd_package_store = store.NsdPackageFilesystemStore(self.log)

        self.app = None
        self.server = None
        # Project name -> project object. Nothing in this class fills it;
        # presumably ProjectHandler does - confirm.
        self.projects = {}
        self.log.debug("LP Tasklet init")

    def _get_project(self, project=None):
        """Return the project object registered under ``project``.

        Arguments
            project - project name; None selects DEFAULT_PROJECT

        Raises
            LpProjectNotFound - no project of that name is known

        """
        if project is None:
            project = DEFAULT_PROJECT

        if project in self.projects:
            return self.projects[project]

        msg = "Project {} not found".format(project)
        self.log.error(msg)
        raise LpProjectNotFound(msg)

    def nsd_catalog_get(self, project=None):
        """Return the NSD catalog of ``project`` (default project if None)."""
        return self._get_project(project=project).nsd_catalog

    def vnfd_catalog_get(self, project=None):
        """Return the VNFD catalog of ``project`` (default project if None)."""
        return self._get_project(project=project).vnfd_catalog

    def get_cloud_accounts(self, project=None):
        """Return the cloud accounts of ``project`` (default project if None)."""
        return self._get_project(project=project).cloud_accounts

    def start(self):
        """Start the tasklet and create the DTS API object."""
        super(LaunchpadTasklet, self).start()
        self.log.info("Starting LaunchpadTasklet")

        self.log.debug("Registering with dts")
        self.dts = rift.tasklets.DTS(
            self.tasklet_info,
            rwlaunchpad.get_schema(),
            self.loop,
            self.on_dts_state_change
        )

        self.log.debug("Created DTS Api GI Object: %s", self.dts)

    def stop(self):
        """Stop the HTTP server and de-initialise DTS.

        Any exception is logged and re-raised.
        """
        try:
            self.server.stop()
            self.dts.deinit()
        except Exception:
            self.log.exception("Caught Exception in LP stop")
            raise

    def get_vnfd_catalog(self, project):
        """Return the VNFD catalog of ``project``; KeyError if unknown."""
        return self.projects[project].vnfd_catalog

    def get_nsd_catalog(self, project):
        """Return the NSD catalog of ``project``; KeyError if unknown."""
        return self.projects[project].nsd_catalog

    @asyncio.coroutine
    def init(self):
        """Create the uploader app, the HTTP server and the project handler.

        The server gets SSL options only when the manifest's rwsecurity
        section has ``use_ssl`` set. Any exception is logged and swallowed,
        so callers are not notified of a failed initialisation.
        """
        try:
            io_loop = rift.tasklets.tornado.TaskletAsyncIOLoop(asyncio_loop=self.loop)
            self.app = uploader.UploaderApplication.from_tasklet(self)
            yield from self.app.register()

            manifest = self.tasklet_info.get_pb_manifest()
            security = manifest.bootstrap_phase.rwsecurity

            server_kwargs = {
                "max_body_size": LaunchpadTasklet.UPLOAD_MAX_BODY_SIZE,
                "io_loop": io_loop,
            }
            if security.use_ssl:
                server_kwargs["ssl_options"] = {
                    "certfile": security.cert,
                    "keyfile": security.key,
                }

            self.server = tornado.httpserver.HTTPServer(self.app, **server_kwargs)

            self.log.debug("Registering project handler")
            self.project_handler = ProjectHandler(self, LaunchpadProject,
                                                  app=self.app)
            self.project_handler.register()

        except Exception as e:
            self.log.error("Exception : {}".format(e))
            self.log.exception(e)

    @asyncio.coroutine
    def run(self):
        """Start listening for uploads on UPLOAD_PORT."""
        self.server.listen(LaunchpadTasklet.UPLOAD_PORT)

    def on_instance_started(self):
        self.log.debug("Got instance started callback")

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Handle DTS state change

        Take action according to current DTS state to transition application
        into the corresponding application state

        Arguments
            state - current dts state

        """
        switch = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        handlers = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # Transition application to next state
        handler = handlers.get(state, None)
        if handler is not None:
            yield from handler()

        # Transition dts to next state
        next_state = switch.get(state, None)
        if next_state is not None:
            self.dts.handle.set_state(next_state)