0eff6160e82270f5f090367c5c4bafe436c8d614
[osm/SO.git] / rwlaunchpad / plugins / rwlaunchpadtasklet / rift / tasklets / rwlaunchpad / tasklet.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19
20 import tornado
21 import tornado.httputil
22 import tornado.httpserver
23 import tornado.platform.asyncio
24
25 import tornadostreamform.multipart_streamer as multipart_streamer
26
27 import gi
28 gi.require_version('RwDts', '1.0')
29 gi.require_version('RwcalYang', '1.0')
30 gi.require_version('RwTypes', '1.0')
31 gi.require_version('RwLaunchpadYang', '1.0')
32
33 from gi.repository import (
34 RwDts as rwdts,
35 RwLaunchpadYang as rwlaunchpad,
36 RwcalYang as rwcal,
37 RwTypes,
38 )
39
40 import rift.tasklets
41 import rift.mano.cloud
42 import rift.mano.config_agent
43 from rift.package import store
44
45 from . import uploader
46 from . import datacenters
47
# Binary size units, expressed in bytes.
MB = 2 ** 20
GB = MB * 1024
TB = GB * 1024

# Upload limits in bytes; LaunchpadTasklet re-exports them as its
# UPLOAD_MAX_BUFFER_SIZE / UPLOAD_MAX_BODY_SIZE class attributes.
MAX_BUFFER_SIZE = MB  # Max. size loaded into memory!
MAX_BODY_SIZE = MB  # Max. size loaded into memory!
54
55
def get_add_delete_update_cfgs(dts_member_reg, xact, key_name):
    """Classify the configs touched by a transaction as adds/deletes/updates.

    Unfortunately, it is currently difficult to figure out what exactly has
    changed in this xact without Pbdelta support (RIFT-4916).  As a
    workaround, the pre- and post-xact elements are fetched and compared
    by key.

    Arguments:
        dts_member_reg - registration object exposing ``elements`` (the
                         current configs) and ``get_xact_elements(xact)``
                         (the configs as seen by the transaction)
        xact           - transaction handle, passed through unchanged to
                         ``get_xact_elements``
        key_name       - name of the attribute that identifies a config

    Returns:
        An ``(added_cfgs, deleted_cfgs, updated_cfgs)`` tuple of lists.
        Added and updated entries are the transaction's objects; deleted
        entries are the current objects.  Each list follows the order in
        which the registration yielded the elements (on interpreters
        whose dicts preserve insertion order, i.e. Python 3.7+), instead
        of an arbitrary set iteration order.
    """
    # When several elements share a key, the last one wins in both maps.
    xact_key_map = {
        getattr(cfg, key_name): cfg
        for cfg in dts_member_reg.get_xact_elements(xact)
    }
    curr_key_map = {
        getattr(cfg, key_name): cfg
        for cfg in dts_member_reg.elements
    }

    # Adds: keys that only the transaction knows about.
    added_cfgs = [
        cfg for key, cfg in xact_key_map.items()
        if key not in curr_key_map
    ]

    # Deletes: keys that disappear once the transaction is applied.
    deleted_cfgs = [
        cfg for key, cfg in curr_key_map.items()
        if key not in xact_key_map
    ]

    # Updates: keys present on both sides whose contents differ.
    updated_cfgs = [
        cfg for key, cfg in xact_key_map.items()
        if key in curr_key_map and cfg != curr_key_map[key]
    ]

    return added_cfgs, deleted_cfgs, updated_cfgs
80
81
class CatalogDtsHandler(object):
    """Common base for the catalog DTS handlers.

    Holds on to the owning tasklet and the application object, and
    forwards logger and DTS lookups to the tasklet.
    """

    def __init__(self, tasklet, app):
        self.tasklet = tasklet
        self.app = app
        # Stays None until a subclass stores its DTS registration here.
        self.reg = None

    @property
    def dts(self):
        """DTS handle of the owning tasklet."""
        return self.tasklet.dts

    @property
    def log(self):
        """Logger of the owning tasklet."""
        return self.tasklet.log
95
96
class NsdCatalogDtsHandler(CatalogDtsHandler):
    """Keeps the tasklet's ``nsd_catalog`` dict in step with NSD config."""

    XPATH = "C,/nsd:nsd-catalog/nsd:nsd"

    def add_nsd(self, nsd):
        """Store a new NSD; log an error if its id is already catalogued."""
        self.log.debug('nsd-catalog-handler:add:{}'.format(nsd.id))
        catalog = self.tasklet.nsd_catalog
        if nsd.id in catalog:
            self.log.error("nsd already in catalog: {}".format(nsd.id))
            return

        catalog[nsd.id] = nsd

    def update_nsd(self, nsd):
        """Replace a catalogued NSD; log an error if its id is unknown."""
        self.log.debug('nsd-catalog-handler:update:{}'.format(nsd.id))
        catalog = self.tasklet.nsd_catalog
        if nsd.id not in catalog:
            self.log.error("unrecognized NSD: {}".format(nsd.id))
            return

        catalog[nsd.id] = nsd

    def delete_nsd(self, nsd_id):
        """Drop an NSD from the catalog and remove its stored package."""
        self.log.debug('nsd-catalog-handler:delete:{}'.format(nsd_id))
        catalog = self.tasklet.nsd_catalog
        if nsd_id not in catalog:
            self.log.error("unrecognized NSD: {}".format(nsd_id))
        else:
            del catalog[nsd_id]

        # The package store is purged whether or not the catalog knew the
        # id; a failure there is only worth a warning.
        try:
            self.tasklet.nsd_package_store.delete_package(nsd_id)
        except store.PackageStoreError as e:
            self.log.warning("could not delete package from store: %s", str(e))

    @asyncio.coroutine
    def register(self):
        """Subscribe to the NSD catalog xpath through an appconf group.

        The group's on_apply callback diffs the transaction against the
        current registration contents and replays the differences as
        delete/add/update calls on this handler.
        """
        def apply_config(dts, acg, xact, action, _):
            if xact.xact is None:
                # When RIFT first comes up, an INSTALL is called with the
                # current config.  Since confd doesn't actually persist
                # data this never has any data, so skip this for now.
                self.log.debug("No xact handle. Skipping apply config")
                return

            added, deleted, updated = get_add_delete_update_cfgs(
                dts_member_reg=self.reg,
                xact=xact,
                key_name="id",
            )

            # Deletes go first, then adds, then updates.
            for removed_cfg in deleted:
                self.delete_nsd(removed_cfg.id)

            for new_cfg in added:
                self.add_nsd(new_cfg)

            for changed_cfg in updated:
                self.update_nsd(changed_cfg)

        self.log.debug("Registering for NSD catalog")

        handler = rift.tasklets.AppConfGroup.Handler(on_apply=apply_config)

        with self.dts.appconf_group_create(handler) as acg:
            self.reg = acg.register(
                xpath=NsdCatalogDtsHandler.XPATH,
                flags=rwdts.Flag.SUBSCRIBER,
            )
165
166
class VnfdCatalogDtsHandler(CatalogDtsHandler):
    """Keeps the tasklet's ``vnfd_catalog`` dict in step with VNFD config."""

    XPATH = "C,/vnfd:vnfd-catalog/vnfd:vnfd"

    def add_vnfd(self, vnfd):
        """Store a new VNFD; log an error if its id is already catalogued."""
        self.log.debug('vnfd-catalog-handler:add:{}'.format(vnfd.id))
        catalog = self.tasklet.vnfd_catalog
        if vnfd.id in catalog:
            self.log.error("VNFD already in catalog: {}".format(vnfd.id))
            return

        catalog[vnfd.id] = vnfd

    def update_vnfd(self, vnfd):
        """Replace a catalogued VNFD; log an error if its id is unknown."""
        self.log.debug('vnfd-catalog-handler:update:{}'.format(vnfd.id))
        catalog = self.tasklet.vnfd_catalog
        if vnfd.id not in catalog:
            self.log.error("unrecognized VNFD: {}".format(vnfd.id))
            return

        catalog[vnfd.id] = vnfd

    def delete_vnfd(self, vnfd_id):
        """Drop a VNFD from the catalog and remove its stored package."""
        self.log.debug('vnfd-catalog-handler:delete:{}'.format(vnfd_id))
        catalog = self.tasklet.vnfd_catalog
        if vnfd_id not in catalog:
            self.log.error("unrecognized VNFD: {}".format(vnfd_id))
        else:
            del catalog[vnfd_id]

        # The package store is purged whether or not the catalog knew the
        # id; a failure there is only worth a warning.
        try:
            self.tasklet.vnfd_package_store.delete_package(vnfd_id)
        except store.PackageStoreError as e:
            self.log.warning("could not delete package from store: %s", str(e))

    @asyncio.coroutine
    def register(self):
        """Subscribe to the VNFD catalog xpath through an appconf group.

        The group's on_apply callback diffs the transaction against the
        current registration contents and replays the differences as
        delete/add/update calls on this handler.
        """
        def apply_config(dts, acg, xact, action, _):
            if xact.xact is None:
                # When RIFT first comes up, an INSTALL is called with the
                # current config.  Since confd doesn't actually persist
                # data this never has any data, so skip this for now.
                self.log.debug("No xact handle. Skipping apply config")
                return

            added, deleted, updated = get_add_delete_update_cfgs(
                dts_member_reg=self.reg,
                xact=xact,
                key_name="id",
            )

            # Deletes go first, then adds, then updates.
            for removed_cfg in deleted:
                self.delete_vnfd(removed_cfg.id)

            for new_cfg in added:
                self.add_vnfd(new_cfg)

            for changed_cfg in updated:
                self.update_vnfd(changed_cfg)

        self.log.debug("Registering for VNFD catalog")

        handler = rift.tasklets.AppConfGroup.Handler(on_apply=apply_config)

        with self.dts.appconf_group_create(handler) as acg:
            self.reg = acg.register(
                xpath=VnfdCatalogDtsHandler.XPATH,
                flags=rwdts.Flag.SUBSCRIBER,
            )
237
class CfgAgentAccountHandlers(object):
    """Ties the config-agent account subscriber to its operdata handler.

    Account add/delete notifications from the config subscriber are
    relayed to the operational-data handler.
    """

    def __init__(self, dts, log, log_hdl, loop):
        self._dts, self._log = dts, log
        self._log_hdl, self._loop = log_hdl, loop

        self._log.debug("creating config agent account config handler")
        callbacks = rift.mano.config_agent.ConfigAgentCallbacks(
            on_add_apply=self.on_cfg_agent_account_added,
            on_delete_apply=self.on_cfg_agent_account_deleted,
        )
        self.cfg_agent_cfg_handler = rift.mano.config_agent.ConfigAgentSubscriber(
            self._dts, self._log, callbacks,
        )

        self._log.debug("creating config agent account opdata handler")
        self.cfg_agent_operdata_handler = (
            rift.mano.config_agent.CfgAgentDtsOperdataHandler(
                self._dts, self._log, self._loop,
            )
        )

    def on_cfg_agent_account_deleted(self, account):
        """Forward the deleted account's name to the operdata handler."""
        self._log.debug("config agent account deleted")
        operdata = self.cfg_agent_operdata_handler
        operdata.delete_cfg_agent_account(account.name)

    def on_cfg_agent_account_added(self, account):
        """Forward the newly added account to the operdata handler."""
        self._log.debug("config agent account added")
        operdata = self.cfg_agent_operdata_handler
        operdata.add_cfg_agent_account(account)

    @asyncio.coroutine
    def register(self):
        """Register the config subscriber, then the operdata handler."""
        self.cfg_agent_cfg_handler.register()
        yield from self.cfg_agent_operdata_handler.register()
271
class CloudAccountHandlers(object):
    """Ties the cloud account subscriber to its operdata handler.

    On every account add/delete the application's ``accounts`` list is
    refreshed in place from the config subscriber, and the change is
    relayed to the operational-data handler.
    """

    def __init__(self, dts, log, log_hdl, loop, app):
        self._dts, self._log = dts, log
        self._log_hdl, self._loop = log_hdl, loop
        self._app = app

        self._log.debug("creating cloud account config handler")
        callbacks = rift.mano.cloud.CloudAccountConfigCallbacks(
            on_add_apply=self.on_cloud_account_added,
            on_delete_apply=self.on_cloud_account_deleted,
        )
        self.cloud_cfg_handler = rift.mano.cloud.CloudAccountConfigSubscriber(
            self._dts, self._log, self._log_hdl, callbacks,
        )

        self._log.debug("creating cloud account opdata handler")
        self.cloud_operdata_handler = (
            rift.mano.cloud.CloudAccountDtsOperdataHandler(
                self._dts, self._log, self._loop,
            )
        )

    def _refresh_app_accounts(self):
        """Reload the app's account list in place from the subscriber."""
        app_accounts = self._app.accounts
        app_accounts.clear()
        app_accounts.extend(list(self.cloud_cfg_handler.accounts.values()))

    def on_cloud_account_deleted(self, account_name):
        """Refresh the app's accounts and drop the account's operdata."""
        self._log.debug("cloud account deleted")
        self._refresh_app_accounts()
        self.cloud_operdata_handler.delete_cloud_account(account_name)

    def on_cloud_account_added(self, account):
        """Refresh the app's accounts and publish the new account."""
        self._log.debug("cloud account added")
        self._refresh_app_accounts()
        self._log.debug("accounts: %s", self._app.accounts)
        self.cloud_operdata_handler.add_cloud_account(account)

    @asyncio.coroutine
    def register(self):
        """Register the config subscriber, then the operdata handler."""
        self.cloud_cfg_handler.register()
        yield from self.cloud_operdata_handler.register()
311
312
class LaunchpadTasklet(rift.tasklets.Tasklet):
    """Launchpad tasklet: upload HTTP server plus catalog/account handlers.

    The tasklet is driven by DTS state changes (see on_dts_state_change):
    on INIT it builds the uploader application, the HTTP server and the
    DTS handlers; on RUN it starts listening on UPLOAD_PORT.
    """

    UPLOAD_MAX_BODY_SIZE = MAX_BODY_SIZE
    # NOTE(review): not passed to the HTTPServer anywhere in this class;
    # confirm whether another module relies on it.
    UPLOAD_MAX_BUFFER_SIZE = MAX_BUFFER_SIZE
    UPLOAD_PORT = "4567"

    def __init__(self, *args, **kwargs):
        super(LaunchpadTasklet, self).__init__(*args, **kwargs)
        self.rwlog.set_category("rw-mano-log")
        self.rwlog.set_subcategory("launchpad")

        # Both are created in init(); they stay None until then.
        self.app = None
        self.server = None

        self.account_handler = None
        self.config_handler = None
        self.nsd_catalog_handler = None
        self.vld_catalog_handler = None
        self.vnfd_catalog_handler = None
        self.cloud_handler = None
        self.datacenter_handler = None
        self.lp_config_handler = None

        self.vnfd_package_store = store.VnfdPackageFilesystemStore(self.log)
        self.nsd_package_store = store.NsdPackageFilesystemStore(self.log)

        # Descriptor catalogs; the NSD/VNFD catalog handlers key them by id.
        self.nsd_catalog = dict()
        self.vld_catalog = dict()
        self.vnfd_catalog = dict()

    @property
    def cloud_accounts(self):
        """List of configured cloud accounts (empty before init() ran)."""
        if self.cloud_handler is None:
            return list()

        return list(self.cloud_handler.cloud_cfg_handler.accounts.values())

    def start(self):
        """Start the tasklet and create its DTS API object."""
        super(LaunchpadTasklet, self).start()
        self.log.info("Starting LaunchpadTasklet")

        self.log.debug("Registering with dts")
        self.dts = rift.tasklets.DTS(
            self.tasklet_info,
            rwlaunchpad.get_schema(),
            self.loop,
            self.on_dts_state_change
        )

        self.log.debug("Created DTS Api GI Object: %s", self.dts)

    def stop(self):
        """Stop the HTTP server (if one was created) and deinit DTS.

        Any exception is logged and re-raised.
        """
        try:
            # The server only exists once init() has run; skipping it
            # here lets an early stop() still release the DTS handle
            # instead of failing on None.
            if self.server is not None:
                self.server.stop()
            self.dts.deinit()
        except Exception:
            self.log.exception("Caught Exception in LP stop")
            raise

    @asyncio.coroutine
    def init(self):
        """Create the uploader app, the HTTP server and all DTS handlers."""
        io_loop = rift.tasklets.tornado.TaskletAsyncIOLoop(asyncio_loop=self.loop)
        self.app = uploader.UploaderApplication.from_tasklet(self)
        yield from self.app.register()

        manifest = self.tasklet_info.get_pb_manifest()
        ssl_cert = manifest.bootstrap_phase.rwsecurity.cert
        ssl_key = manifest.bootstrap_phase.rwsecurity.key
        ssl_options = {
            "certfile": ssl_cert,
            "keyfile": ssl_key,
        }

        server_kwargs = {
            "max_body_size": LaunchpadTasklet.UPLOAD_MAX_BODY_SIZE,
            "io_loop": io_loop,
        }
        # TLS options are handed to the server only when the manifest
        # asks for SSL.
        if manifest.bootstrap_phase.rwsecurity.use_ssl:
            server_kwargs["ssl_options"] = ssl_options

        self.server = tornado.httpserver.HTTPServer(self.app, **server_kwargs)

        self.log.debug("creating NSD catalog handler")
        self.nsd_catalog_handler = NsdCatalogDtsHandler(self, self.app)
        yield from self.nsd_catalog_handler.register()

        self.log.debug("creating VNFD catalog handler")
        self.vnfd_catalog_handler = VnfdCatalogDtsHandler(self, self.app)
        yield from self.vnfd_catalog_handler.register()

        self.log.debug("creating datacenter handler")
        self.datacenter_handler = datacenters.DataCenterPublisher(self.log, self.dts, self.loop)
        yield from self.datacenter_handler.register()

        self.log.debug("creating cloud account handler")
        self.cloud_handler = CloudAccountHandlers(
            self.dts, self.log, self.log_hdl, self.loop, self.app
        )
        yield from self.cloud_handler.register()

        self.log.debug("creating config agent handler")
        self.config_handler = CfgAgentAccountHandlers(self.dts, self.log, self.log_hdl, self.loop)
        yield from self.config_handler.register()

    @asyncio.coroutine
    def run(self):
        """Start accepting upload connections on UPLOAD_PORT."""
        self.server.listen(LaunchpadTasklet.UPLOAD_PORT)

    def on_instance_started(self):
        """Log that the instance-started callback fired."""
        self.log.debug("Got instance started callback")

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Handle DTS state change

        Take action according to current DTS state to transition application
        into the corresponding application state

        Arguments
            state - current dts state

        """
        switch = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        handlers = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # Transition application to next state
        handler = handlers.get(state, None)
        if handler is not None:
            yield from handler()

        # Transition dts to next state
        next_state = switch.get(state, None)
        if next_state is not None:
            self.dts.handle.set_state(next_state)