3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
21 import tornado
.httputil
22 import tornado
.httpserver
23 import tornado
.platform
.asyncio
25 import tornadostreamform
.multipart_streamer
as multipart_streamer
28 gi
.require_version('RwDts', '1.0')
29 gi
.require_version('RwcalYang', '1.0')
30 gi
.require_version('RwTypes', '1.0')
31 gi
.require_version('RwLaunchpadYang', '1.0')
33 from gi
.repository
import (
35 RwLaunchpadYang
as rwlaunchpad
,
41 import rift
.mano
.cloud
42 import rift
.mano
.config_agent
43 from rift
.mano
.utils
.project
import (
46 get_add_delete_update_cfgs
,
49 from rift
.package
import store
51 from . import uploader
52 from . import datacenters
# Max. size loaded into memory (buffer limit).
MAX_BUFFER_SIZE = 1 * MB
# Max. size loaded into memory (request body limit).
MAX_BODY_SIZE = 1 * MB
class LaunchpadError(Exception):
    """Exception type for launchpad failures.

    NOTE(review): no raise site is visible in this part of the file.
    """
class LpProjectNotFound(Exception):
    """Raised when a project name is not among the tasklet's projects."""
68 class CatalogDtsHandler(object):
69 def __init__(self
, project
, app
):
72 self
.project
= project
76 return self
.project
.log
80 return self
.project
.dts
83 class NsdCatalogDtsHandler(CatalogDtsHandler
):
84 XPATH
= "C,/project-nsd:nsd-catalog/project-nsd:nsd"
def add_nsd(self, nsd):
    """Add a newly configured NSD to the project's NSD catalog.

    Arguments:
        nsd - descriptor to store; it is keyed by ``nsd.id``

    If the id is already present the catalog is left untouched and an
    error is logged instead.
    """
    nsd_id = nsd.id
    self.log.debug('nsd-catalog-handler:add:{}'.format(nsd_id))

    catalog = self.project.nsd_catalog
    if nsd_id in catalog:
        # Adding must never overwrite an existing entry.
        self.log.error("nsd already in catalog: {}".format(nsd_id))
        return

    catalog[nsd_id] = nsd
def update_nsd(self, nsd):
    """Replace the catalog entry of an NSD that is already known.

    Arguments:
        nsd - updated descriptor; looked up by ``nsd.id``

    An id that is not in the catalog is not inserted; an error is
    logged instead.
    """
    nsd_id = nsd.id
    self.log.debug('nsd-catalog-handler:update:{}'.format(nsd_id))

    catalog = self.project.nsd_catalog
    if nsd_id not in catalog:
        self.log.error("unrecognized NSD: {}".format(nsd_id))
        return

    catalog[nsd_id] = nsd
def delete_nsd(self, nsd_id):
    """Remove an NSD from the project's catalog and from the package store.

    Arguments:
        nsd_id - id of the descriptor to remove

    An unknown id is logged as an error.  The package-store deletion is
    attempted in either case; a PackageStoreError from the store is
    logged as a warning and not propagated.
    """
    self.log.debug('nsd-catalog-handler:delete:{}'.format(nsd_id))

    catalog = self.project.nsd_catalog
    if nsd_id not in catalog:
        self.log.error("unrecognized NSD: {}".format(nsd_id))
    else:
        del catalog[nsd_id]

    try:
        self.project.tasklet.nsd_package_store.delete_package(nsd_id)
    except store.PackageStoreError as err:
        self.log.warning("could not delete package from store: %s", str(err))
114 def apply_config(dts
, acg
, xact
, action
, _
):
115 if xact
.xact
is None:
116 # When RIFT first comes up, an INSTALL is called with the current config
# Since confd doesn't actually persist data this never has any data so
119 self
.log
.debug("No xact handle. Skipping apply config")
122 add_cfgs
, delete_cfgs
, update_cfgs
= get_add_delete_update_cfgs(
123 dts_member_reg
=self
.reg
,
129 for cfg
in delete_cfgs
:
130 self
.delete_nsd(cfg
.id)
137 for cfg
in update_cfgs
:
# The "{}" placeholder is required; without it format() discards the
# project name and the message ends abruptly after "project".
self.log.debug("Registering for NSD catalog in project {}".
               format(self.project.name))
143 acg_handler
= rift
.tasklets
.AppConfGroup
.Handler(
144 on_apply
=apply_config
,
147 with self
.dts
.appconf_group_create(acg_handler
) as acg
:
148 xpath
= self
.project
.add_project(NsdCatalogDtsHandler
.XPATH
)
149 self
.reg
= acg
.register(
151 flags
=rwdts
.Flag
.SUBSCRIBER
,
def deregister(self):
    """Drop the DTS registration held in ``self.reg``, if there is one.

    Safe to call when no registration was ever stored and safe to call
    repeatedly: the handle is cleared after it has been deregistered, so
    a second call is a no-op.
    """
    reg = getattr(self, "reg", None)
    if reg is None:
        return

    reg.deregister()
    # Forget the handle so it cannot be deregistered twice.
    self.reg = None
160 class VnfdCatalogDtsHandler(CatalogDtsHandler
):
161 XPATH
= "C,/project-vnfd:vnfd-catalog/project-vnfd:vnfd"
def add_vnfd(self, vnfd):
    """Add a newly configured VNFD to the project's VNFD catalog.

    Arguments:
        vnfd - descriptor to store; it is keyed by ``vnfd.id``

    If the id is already present the catalog is left untouched and an
    error is logged instead.
    """
    vnfd_id = vnfd.id
    self.log.debug('vnfd-catalog-handler:add:{}'.format(vnfd_id))

    catalog = self.project.vnfd_catalog
    if vnfd_id in catalog:
        # Adding must never overwrite an existing entry.
        self.log.error("VNFD already in catalog: {}".format(vnfd_id))
        return

    catalog[vnfd_id] = vnfd
def update_vnfd(self, vnfd):
    """Replace the catalog entry of a VNFD that is already known.

    Arguments:
        vnfd - updated descriptor; looked up by ``vnfd.id``

    An id that is not in the catalog is not inserted; an error is
    logged instead.
    """
    vnfd_id = vnfd.id
    self.log.debug('vnfd-catalog-handler:update:{}'.format(vnfd_id))

    catalog = self.project.vnfd_catalog
    if vnfd_id not in catalog:
        self.log.error("unrecognized VNFD: {}".format(vnfd_id))
        return

    catalog[vnfd_id] = vnfd
def delete_vnfd(self, vnfd_id):
    """Remove a VNFD from the project's catalog and from the package store.

    Arguments:
        vnfd_id - id of the descriptor to remove

    An unknown id is logged as an error.  The package-store deletion is
    attempted in either case; a PackageStoreError from the store is
    logged as a warning and not propagated.
    """
    self.log.debug('vnfd-catalog-handler:delete:{}'.format(vnfd_id))

    catalog = self.project.vnfd_catalog
    if vnfd_id not in catalog:
        self.log.error("unrecognized VNFD: {}".format(vnfd_id))
    else:
        del catalog[vnfd_id]

    try:
        self.project.tasklet.vnfd_package_store.delete_package(vnfd_id)
    except store.PackageStoreError as err:
        self.log.warning("could not delete package from store: %s", str(err))
193 def apply_config(dts
, acg
, xact
, action
, _
):
194 if xact
.xact
is None:
195 # When RIFT first comes up, an INSTALL is called with the current config
# Since confd doesn't actually persist data this never has any data so
198 self
.log
.debug("No xact handle. Skipping apply config")
201 add_cfgs
, delete_cfgs
, update_cfgs
= get_add_delete_update_cfgs(
202 dts_member_reg
=self
.reg
,
208 for cfg
in delete_cfgs
:
209 self
.delete_vnfd(cfg
.id)
216 for cfg
in update_cfgs
:
217 self
.update_vnfd(cfg
)
219 self
.log
.debug("Registering for VNFD catalog in project {}".
220 format(self
.project
.name
))
222 acg_handler
= rift
.tasklets
.AppConfGroup
.Handler(
223 on_apply
=apply_config
,
226 with self
.dts
.appconf_group_create(acg_handler
) as acg
:
227 xpath
= self
.project
.add_project(VnfdCatalogDtsHandler
.XPATH
)
228 self
.reg
= acg
.register(
230 flags
=rwdts
.Flag
.SUBSCRIBER
,
def deregister(self):
    """Drop the DTS registration held in ``self.reg``, if there is one.

    Safe to call when no registration was ever stored and safe to call
    repeatedly: the handle is cleared after it has been deregistered, so
    a second call is a no-op.
    """
    reg = getattr(self, "reg", None)
    if reg is None:
        return

    reg.deregister()
    # Forget the handle so it cannot be deregistered twice.
    self.reg = None
238 class CfgAgentAccountHandlers(object):
239 def __init__(self
, dts
, log
, log_hdl
, loop
, project
):
242 self
._log
_hdl
= log_hdl
244 self
._project
= project
246 self
._log
.debug("creating config agent account config handler")
247 self
.cfg_agent_cfg_handler
= rift
.mano
.config_agent
.ConfigAgentSubscriber(
248 self
._dts
, self
._log
, self
._project
,
249 rift
.mano
.config_agent
.ConfigAgentCallbacks(
250 on_add_apply
=self
.on_cfg_agent_account_added
,
251 on_delete_apply
=self
.on_cfg_agent_account_deleted
,
255 self
._log
.debug("creating config agent account opdata handler")
256 self
.cfg_agent_operdata_handler
= rift
.mano
.config_agent
.CfgAgentDtsOperdataHandler(
257 self
._dts
, self
._log
, self
._loop
, self
._project
def on_cfg_agent_account_deleted(self, account):
    """Delete-apply callback for a config agent account.

    Arguments:
        account - the deleted account; only ``account.name`` is used

    Logs the event and removes the account, by name, from the
    operational-data handler.
    """
    self._log.debug("config agent account deleted")
    operdata = self.cfg_agent_operdata_handler
    operdata.delete_cfg_agent_account(account.name)
def on_cfg_agent_account_added(self, account):
    """Add-apply callback for a config agent account.

    Arguments:
        account - the added account, passed on unchanged

    Logs the event and hands the account to the operational-data handler.
    """
    self._log.debug("config agent account added")
    operdata = self.cfg_agent_operdata_handler
    operdata.add_cfg_agent_account(account)
270 self
.cfg_agent_cfg_handler
.register()
271 yield from self
.cfg_agent_operdata_handler
.register()
def deregister(self):
    """Deregister the operdata handler, then the config handler."""
    # Order matters: operational data first, configuration second.
    for handler_name in ("cfg_agent_operdata_handler", "cfg_agent_cfg_handler"):
        getattr(self, handler_name).deregister()
278 class CloudAccountHandlers(object):
279 def __init__(self
, dts
, log
, log_hdl
, loop
, app
, project
):
281 self
._log
_hdl
= log_hdl
285 self
._project
= project
287 self
._log
.debug("Creating cloud account config handler for project {}".
288 format(project
.name
))
289 self
.cloud_cfg_handler
= rift
.mano
.cloud
.CloudAccountConfigSubscriber(
290 self
._dts
, self
._log
, self
._log
_hdl
, self
._project
,
291 rift
.mano
.cloud
.CloudAccountConfigCallbacks(
292 on_add_apply
=self
.on_cloud_account_added
,
293 on_delete_apply
=self
.on_cloud_account_deleted
,
297 self
._log
.debug("creating cloud account opdata handler")
298 self
.cloud_operdata_handler
= rift
.mano
.cloud
.CloudAccountDtsOperdataHandler(
299 self
._dts
, self
._log
, self
._loop
, self
._project
,
def on_cloud_account_deleted(self, account_name):
    """Delete-apply callback for a cloud account.

    Arguments:
        account_name - name of the account that was deleted

    Refreshes the application's account list for this project from the
    config handler's current accounts, then removes the account from the
    operational-data handler.
    """
    self._log.debug("cloud account deleted")

    remaining = list(self.cloud_cfg_handler.accounts.values())
    self._app.accounts[self._project.name] = remaining

    self.cloud_operdata_handler.delete_cloud_account(account_name)
def on_cloud_account_added(self, account):
    """Add-apply callback for a cloud account.

    Arguments:
        account - the added account, passed on unchanged

    Refreshes the application's account list for this project from the
    config handler's current accounts, logs the resulting mapping, then
    hands the account to the operational-data handler.
    """
    self._log.debug("cloud account added")

    current = list(self.cloud_cfg_handler.accounts.values())
    app_accounts = self._app.accounts
    app_accounts[self._project.name] = current
    self._log.debug("accounts: %s", app_accounts)

    self.cloud_operdata_handler.add_cloud_account(account)
317 yield from self
.cloud_cfg_handler
.register()
318 yield from self
.cloud_operdata_handler
.register()
def deregister(self):
    """Deregister the config handler, then the operdata handler."""
    # Order matters: configuration first, operational data second.
    for handler_name in ("cloud_cfg_handler", "cloud_operdata_handler"):
        getattr(self, handler_name).deregister()
325 class LaunchpadProject(ManoProject
):
327 def __init__(self
, name
, tasklet
, **kw
):
328 super(LaunchpadProject
, self
).__init
__(tasklet
.log
, name
)
330 self
._app
= kw
['app']
332 self
.config_handler
= None
333 self
.nsd_catalog_handler
= None
334 self
.vld_catalog_handler
= None
335 self
.vnfd_catalog_handler
= None
336 self
.cloud_handler
= None
337 self
.datacenter_handler
= None
338 self
.lp_config_handler
= None
339 self
.account_handler
= None
341 self
.nsd_catalog
= dict()
342 self
.vld_catalog
= dict()
343 self
.vnfd_catalog
= dict()
355 self
.log
.debug("creating NSD catalog handler for project {}".format(self
.name
))
356 self
.nsd_catalog_handler
= NsdCatalogDtsHandler(self
, self
._app
)
357 yield from self
.nsd_catalog_handler
.register()
359 self
.log
.debug("creating VNFD catalog handler for project {}".format(self
.name
))
360 self
.vnfd_catalog_handler
= VnfdCatalogDtsHandler(self
, self
._app
)
361 yield from self
.vnfd_catalog_handler
.register()
363 self
.log
.debug("creating datacenter handler for project {}".format(self
.name
))
364 self
.datacenter_handler
= datacenters
.DataCenterPublisher(self
.log
, self
.dts
,
366 yield from self
.datacenter_handler
.register()
368 self
.log
.debug("creating cloud account handler for project {}".format(self
.name
))
369 self
.cloud_handler
= CloudAccountHandlers(self
.dts
, self
.log
, self
.log_hdl
,
370 self
.loop
, self
._app
, self
)
371 yield from self
.cloud_handler
.register()
373 self
.log
.debug("creating config agent handler for project {}".format(self
.name
))
374 self
.config_handler
= CfgAgentAccountHandlers(self
.dts
, self
.log
, self
.log_hdl
,
376 yield from self
.config_handler
.register()
def deregister(self):
    """Deregister every per-project handler.

    Handlers are torn down in this fixed order: config agent, cloud
    account, datacenter, VNFD catalog, NSD catalog.
    """
    self.log.debug("De-register handlers for project: {}".format(self.name))

    teardown_order = (
        "config_handler",
        "cloud_handler",
        "datacenter_handler",
        "vnfd_catalog_handler",
        "nsd_catalog_handler",
    )
    for handler_name in teardown_order:
        getattr(self, handler_name).deregister()
387 def delete_prepare(self
):
388 # TODO: Do we need this check
389 # if self.nsd_catalog or self.vnfd_catalog or self.vld_catalog:
394 def cloud_accounts(self
):
395 if self
.cloud_handler
is None:
398 return list(self
.cloud_handler
.cloud_cfg_handler
.accounts
.values())
401 class LaunchpadTasklet(rift
.tasklets
.Tasklet
):
402 UPLOAD_MAX_BODY_SIZE
= MAX_BODY_SIZE
403 UPLOAD_MAX_BUFFER_SIZE
= MAX_BUFFER_SIZE
406 def __init__(self
, *args
, **kwargs
):
407 super(LaunchpadTasklet
, self
).__init
__(*args
, **kwargs
)
408 self
.rwlog
.set_category("rw-mano-log")
409 self
.rwlog
.set_subcategory("launchpad")
412 self
.project_handler
= None
414 self
.vnfd_package_store
= store
.VnfdPackageFilesystemStore(self
.log
)
415 self
.nsd_package_store
= store
.NsdPackageFilesystemStore(self
.log
)
420 print("LP Tasklet init")
422 def _get_project(project
=None):
424 project
= DEFAULT_PROJECT
426 if project
in self
.projects
:
427 return self
.projects
[project
]
429 msg
= "Project {} not found".format(project
)
431 raise LpProjectNotFound(msg
)
def nsd_catalog_get(self, project=None):
    """Return the NSD catalog of a project.

    Arguments:
        project - project name, forwarded to _get_project()
    """
    lp_project = self._get_project(project=project)
    return lp_project.nsd_catalog
def vnfd_catalog_get(self, project=None):
    """Return the VNFD catalog of a project.

    Arguments:
        project - project name, forwarded to _get_project()
    """
    lp_project = self._get_project(project=project)
    return lp_project.vnfd_catalog
def get_cloud_accounts(self, project=None):
    """Return the ``cloud_accounts`` attribute of a project.

    Arguments:
        project - project name, forwarded to _get_project()
    """
    lp_project = self._get_project(project=project)
    return lp_project.cloud_accounts
443 super(LaunchpadTasklet
, self
).start()
444 self
.log
.info("Starting LaunchpadTasklet")
446 self
.log
.debug("Registering with dts")
447 self
.dts
= rift
.tasklets
.DTS(
449 rwlaunchpad
.get_schema(),
451 self
.on_dts_state_change
454 self
.log
.debug("Created DTS Api GI Object: %s", self
.dts
)
461 self
.log
.exception("Caught Exception in LP stop")
def get_vnfd_catalog(self, project):
    """Return the VNFD catalog of the project stored under ``project``.

    Arguments:
        project - key into ``self.projects``; a missing key raises KeyError
    """
    lp_project = self.projects[project]
    return lp_project.vnfd_catalog
def get_nsd_catalog(self, project):
    """Return the NSD catalog of the project stored under ``project``.

    Arguments:
        project - key into ``self.projects``; a missing key raises KeyError
    """
    lp_project = self.projects[project]
    return lp_project.nsd_catalog
473 io_loop
= rift
.tasklets
.tornado
.TaskletAsyncIOLoop(asyncio_loop
=self
.loop
)
474 self
.app
= uploader
.UploaderApplication
.from_tasklet(self
)
475 yield from self
.app
.register()
477 manifest
= self
.tasklet_info
.get_pb_manifest()
478 ssl_cert
= manifest
.bootstrap_phase
.rwsecurity
.cert
479 ssl_key
= manifest
.bootstrap_phase
.rwsecurity
.key
481 "certfile": ssl_cert
,
485 if manifest
.bootstrap_phase
.rwsecurity
.use_ssl
:
486 self
.server
= tornado
.httpserver
.HTTPServer(
488 max_body_size
=LaunchpadTasklet
.UPLOAD_MAX_BODY_SIZE
,
490 ssl_options
=ssl_options
,
494 self
.server
= tornado
.httpserver
.HTTPServer(
496 max_body_size
=LaunchpadTasklet
.UPLOAD_MAX_BODY_SIZE
,
500 self
.log
.debug("Registering project handler")
501 print("PJ: Registering project handler")
502 self
.project_handler
= ProjectHandler(self
, LaunchpadProject
,
504 self
.project_handler
.register()
506 except Exception as e
:
507 self
.log
.error("Exception : {}".format(e
))
508 self
.log
.exception(e
)
512 self
.server
.listen(LaunchpadTasklet
.UPLOAD_PORT
)
def on_instance_started(self):
    """Log that the instance-started callback was received."""
    self.log.debug("Got instance started callback")
518 def on_dts_state_change(self
, state
):
519 """Handle DTS state change
521 Take action according to current DTS state to transition application
522 into the corresponding application state
525 state - current dts state
529 rwdts
.State
.INIT
: rwdts
.State
.REGN_COMPLETE
,
530 rwdts
.State
.CONFIG
: rwdts
.State
.RUN
,
534 rwdts
.State
.INIT
: self
.init
,
535 rwdts
.State
.RUN
: self
.run
,
538 # Transition application to next state
539 handler
= handlers
.get(state
, None)
540 if handler
is not None:
543 # Transition dts to next state
544 next_state
= switch
.get(state
, None)
545 if next_state
is not None:
546 self
.dts
.handle
.set_state(next_state
)