3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
21 import tornado
.httputil
22 import tornado
.httpserver
23 import tornado
.platform
.asyncio
25 import tornadostreamform
.multipart_streamer
as multipart_streamer
28 gi
.require_version('RwDts', '1.0')
29 gi
.require_version('RwcalYang', '1.0')
30 gi
.require_version('RwTypes', '1.0')
31 gi
.require_version('RwLaunchpadYang', '1.0')
33 from gi
.repository
import (
35 RwLaunchpadYang
as rwlaunchpad
,
41 import rift
.mano
.cloud
42 import rift
.mano
.config_agent
43 from rift
.package
import store
45 from . import uploader
46 from . import datacenters
52 MAX_BUFFER_SIZE
= 1 * MB
# Max. size loaded into memory!
53 MAX_BODY_SIZE
= 1 * MB
# Max. size loaded into memory!
def get_add_delete_update_cfgs(dts_member_reg, xact, key_name):
    """Classify a registration's config elements as added, deleted or updated.

    The elements returned by ``dts_member_reg.get_xact_elements(xact)`` are
    compared against ``dts_member_reg.elements``; elements are matched by the
    value of their ``key_name`` attribute.

    Arguments:
        dts_member_reg - registration object exposing ``get_xact_elements()``
                         and ``elements``
        xact           - transaction handle passed to ``get_xact_elements()``
        key_name       - name of the attribute that identifies an element

    Returns:
        A tuple ``(added_cfgs, deleted_cfgs, updated_cfgs)``:
          added_cfgs   - xact elements whose key is not among the current ones
          deleted_cfgs - current elements whose key is absent from the xact
          updated_cfgs - xact elements whose key exists on both sides but
                         which compare unequal to the current element

    If several elements on one side share a key, the last one wins.  On
    interpreters with insertion-ordered dicts, each list follows the order
    in which the elements were produced rather than set iteration order
    (which is hash-randomized for string keys).
    """
    # Unfortunately, it is currently difficult to figure out what has exactly
    # changed in this xact without Pbdelta support (RIFT-4916)
    # As a workaround, we can fetch the pre and post xact elements and
    # perform a comparison to figure out adds/deletes/updates
    xact_key_map = {
        getattr(cfg, key_name): cfg
        for cfg in dts_member_reg.get_xact_elements(xact)
    }
    curr_key_map = {
        getattr(cfg, key_name): cfg
        for cfg in dts_member_reg.elements
    }

    # Find added elements: present in the xact, unknown to the current set
    added_cfgs = [
        cfg for key, cfg in xact_key_map.items()
        if key not in curr_key_map
    ]

    # Find deleted elements: currently present, missing from the xact
    deleted_cfgs = [
        cfg for key, cfg in curr_key_map.items()
        if key not in xact_key_map
    ]

    # Find updated elements: same key on both sides, different content
    updated_cfgs = [
        cfg for key, cfg in xact_key_map.items()
        if key in curr_key_map and cfg != curr_key_map[key]
    ]

    return added_cfgs, deleted_cfgs, updated_cfgs
82 class CatalogDtsHandler(object):
83 def __init__(self
, tasklet
, app
):
86 self
.tasklet
= tasklet
90 return self
.tasklet
.log
94 return self
.tasklet
.dts
97 class NsdCatalogDtsHandler(CatalogDtsHandler
):
98 XPATH
= "C,/nsd:nsd-catalog/nsd:nsd"
100 def add_nsd(self
, nsd
):
101 self
.log
.debug('nsd-catalog-handler:add:{}'.format(nsd
.id))
102 if nsd
.id not in self
.tasklet
.nsd_catalog
:
103 self
.tasklet
.nsd_catalog
[nsd
.id] = nsd
105 self
.log
.error("nsd already in catalog: {}".format(nsd
.id))
107 def update_nsd(self
, nsd
):
108 self
.log
.debug('nsd-catalog-handler:update:{}'.format(nsd
.id))
109 if nsd
.id in self
.tasklet
.nsd_catalog
:
110 self
.tasklet
.nsd_catalog
[nsd
.id] = nsd
112 self
.log
.error("unrecognized NSD: {}".format(nsd
.id))
114 def delete_nsd(self
, nsd_id
):
115 self
.log
.debug('nsd-catalog-handler:delete:{}'.format(nsd_id
))
116 if nsd_id
in self
.tasklet
.nsd_catalog
:
117 del self
.tasklet
.nsd_catalog
[nsd_id
]
119 self
.log
.error("unrecognized NSD: {}".format(nsd_id
))
122 self
.tasklet
.nsd_package_store
.delete_package(nsd_id
)
123 except store
.PackageStoreError
as e
:
124 self
.log
.warning("could not delete package from store: %s", str(e
))
128 def apply_config(dts
, acg
, xact
, action
, _
):
129 if xact
.xact
is None:
130 # When RIFT first comes up, an INSTALL is called with the current config
                # Since confd doesn't actually persist data this never has any data so
133 self
.log
.debug("No xact handle. Skipping apply config")
136 add_cfgs
, delete_cfgs
, update_cfgs
= get_add_delete_update_cfgs(
137 dts_member_reg
=self
.reg
,
143 for cfg
in delete_cfgs
:
144 self
.delete_nsd(cfg
.id)
151 for cfg
in update_cfgs
:
154 self
.log
.debug("Registering for NSD catalog")
156 acg_handler
= rift
.tasklets
.AppConfGroup
.Handler(
157 on_apply
=apply_config
,
160 with self
.dts
.appconf_group_create(acg_handler
) as acg
:
161 self
.reg
= acg
.register(
162 xpath
=NsdCatalogDtsHandler
.XPATH
,
163 flags
=rwdts
.Flag
.SUBSCRIBER
,
167 class VnfdCatalogDtsHandler(CatalogDtsHandler
):
168 XPATH
= "C,/vnfd:vnfd-catalog/vnfd:vnfd"
170 def add_vnfd(self
, vnfd
):
171 self
.log
.debug('vnfd-catalog-handler:add:{}'.format(vnfd
.id))
172 if vnfd
.id not in self
.tasklet
.vnfd_catalog
:
173 self
.tasklet
.vnfd_catalog
[vnfd
.id] = vnfd
176 self
.log
.error("VNFD already in catalog: {}".format(vnfd
.id))
178 def update_vnfd(self
, vnfd
):
179 self
.log
.debug('vnfd-catalog-handler:update:{}'.format(vnfd
.id))
180 if vnfd
.id in self
.tasklet
.vnfd_catalog
:
181 self
.tasklet
.vnfd_catalog
[vnfd
.id] = vnfd
184 self
.log
.error("unrecognized VNFD: {}".format(vnfd
.id))
186 def delete_vnfd(self
, vnfd_id
):
187 self
.log
.debug('vnfd-catalog-handler:delete:{}'.format(vnfd_id
))
188 if vnfd_id
in self
.tasklet
.vnfd_catalog
:
189 del self
.tasklet
.vnfd_catalog
[vnfd_id
]
191 self
.log
.error("unrecognized VNFD: {}".format(vnfd_id
))
194 self
.tasklet
.vnfd_package_store
.delete_package(vnfd_id
)
195 except store
.PackageStoreError
as e
:
196 self
.log
.warning("could not delete package from store: %s", str(e
))
200 def apply_config(dts
, acg
, xact
, action
, _
):
201 if xact
.xact
is None:
202 # When RIFT first comes up, an INSTALL is called with the current config
                # Since confd doesn't actually persist data this never has any data so
205 self
.log
.debug("No xact handle. Skipping apply config")
208 add_cfgs
, delete_cfgs
, update_cfgs
= get_add_delete_update_cfgs(
209 dts_member_reg
=self
.reg
,
215 for cfg
in delete_cfgs
:
216 self
.delete_vnfd(cfg
.id)
223 for cfg
in update_cfgs
:
224 self
.update_vnfd(cfg
)
226 self
.log
.debug("Registering for VNFD catalog")
228 acg_handler
= rift
.tasklets
.AppConfGroup
.Handler(
229 on_apply
=apply_config
,
232 with self
.dts
.appconf_group_create(acg_handler
) as acg
:
233 self
.reg
= acg
.register(
234 xpath
=VnfdCatalogDtsHandler
.XPATH
,
235 flags
=rwdts
.Flag
.SUBSCRIBER
,
238 class CfgAgentAccountHandlers(object):
239 def __init__(self
, dts
, log
, log_hdl
, loop
):
242 self
._log
_hdl
= log_hdl
245 self
._log
.debug("creating config agent account config handler")
246 self
.cfg_agent_cfg_handler
= rift
.mano
.config_agent
.ConfigAgentSubscriber(
247 self
._dts
, self
._log
,
248 rift
.mano
.config_agent
.ConfigAgentCallbacks(
249 on_add_apply
=self
.on_cfg_agent_account_added
,
250 on_delete_apply
=self
.on_cfg_agent_account_deleted
,
254 self
._log
.debug("creating config agent account opdata handler")
255 self
.cfg_agent_operdata_handler
= rift
.mano
.config_agent
.CfgAgentDtsOperdataHandler(
256 self
._dts
, self
._log
, self
._loop
,
259 def on_cfg_agent_account_deleted(self
, account
):
260 self
._log
.debug("config agent account deleted")
261 self
.cfg_agent_operdata_handler
.delete_cfg_agent_account(account
.name
)
263 def on_cfg_agent_account_added(self
, account
):
264 self
._log
.debug("config agent account added")
265 self
.cfg_agent_operdata_handler
.add_cfg_agent_account(account
)
269 self
.cfg_agent_cfg_handler
.register()
270 yield from self
.cfg_agent_operdata_handler
.register()
272 class CloudAccountHandlers(object):
273 def __init__(self
, dts
, log
, log_hdl
, loop
, app
):
275 self
._log
_hdl
= log_hdl
280 self
._log
.debug("creating cloud account config handler")
281 self
.cloud_cfg_handler
= rift
.mano
.cloud
.CloudAccountConfigSubscriber(
282 self
._dts
, self
._log
, self
._log
_hdl
,
283 rift
.mano
.cloud
.CloudAccountConfigCallbacks(
284 on_add_apply
=self
.on_cloud_account_added
,
285 on_delete_apply
=self
.on_cloud_account_deleted
,
289 self
._log
.debug("creating cloud account opdata handler")
290 self
.cloud_operdata_handler
= rift
.mano
.cloud
.CloudAccountDtsOperdataHandler(
291 self
._dts
, self
._log
, self
._loop
,
294 def on_cloud_account_deleted(self
, account_name
):
295 self
._log
.debug("cloud account deleted")
296 self
._app
.accounts
.clear()
297 self
._app
.accounts
.extend(list(self
.cloud_cfg_handler
.accounts
.values()))
298 self
.cloud_operdata_handler
.delete_cloud_account(account_name
)
300 def on_cloud_account_added(self
, account
):
301 self
._log
.debug("cloud account added")
302 self
._app
.accounts
.clear()
303 self
._app
.accounts
.extend(list(self
.cloud_cfg_handler
.accounts
.values()))
304 self
._log
.debug("accounts: %s", self
._app
.accounts
)
305 self
.cloud_operdata_handler
.add_cloud_account(account
)
309 self
.cloud_cfg_handler
.register()
310 yield from self
.cloud_operdata_handler
.register()
313 class LaunchpadTasklet(rift
.tasklets
.Tasklet
):
314 UPLOAD_MAX_BODY_SIZE
= MAX_BODY_SIZE
315 UPLOAD_MAX_BUFFER_SIZE
= MAX_BUFFER_SIZE
318 def __init__(self
, *args
, **kwargs
):
319 super(LaunchpadTasklet
, self
).__init
__(*args
, **kwargs
)
320 self
.rwlog
.set_category("rw-mano-log")
321 self
.rwlog
.set_subcategory("launchpad")
326 self
.account_handler
= None
327 self
.config_handler
= None
328 self
.nsd_catalog_handler
= None
329 self
.vld_catalog_handler
= None
330 self
.vnfd_catalog_handler
= None
331 self
.cloud_handler
= None
332 self
.datacenter_handler
= None
333 self
.lp_config_handler
= None
335 self
.vnfd_package_store
= store
.VnfdPackageFilesystemStore(self
.log
)
336 self
.nsd_package_store
= store
.NsdPackageFilesystemStore(self
.log
)
338 self
.nsd_catalog
= dict()
339 self
.vld_catalog
= dict()
340 self
.vnfd_catalog
= dict()
343 def cloud_accounts(self
):
344 if self
.cloud_handler
is None:
347 return list(self
.cloud_handler
.cloud_cfg_handler
.accounts
.values())
350 super(LaunchpadTasklet
, self
).start()
351 self
.log
.info("Starting LaunchpadTasklet")
353 self
.log
.debug("Registering with dts")
354 self
.dts
= rift
.tasklets
.DTS(
356 rwlaunchpad
.get_schema(),
358 self
.on_dts_state_change
361 self
.log
.debug("Created DTS Api GI Object: %s", self
.dts
)
368 self
.log
.exception("Caught Exception in LP stop")
373 io_loop
= rift
.tasklets
.tornado
.TaskletAsyncIOLoop(asyncio_loop
=self
.loop
)
374 self
.app
= uploader
.UploaderApplication
.from_tasklet(self
)
375 yield from self
.app
.register()
377 manifest
= self
.tasklet_info
.get_pb_manifest()
378 ssl_cert
= manifest
.bootstrap_phase
.rwsecurity
.cert
379 ssl_key
= manifest
.bootstrap_phase
.rwsecurity
.key
381 "certfile": ssl_cert
,
385 if manifest
.bootstrap_phase
.rwsecurity
.use_ssl
:
386 self
.server
= tornado
.httpserver
.HTTPServer(
388 max_body_size
=LaunchpadTasklet
.UPLOAD_MAX_BODY_SIZE
,
390 ssl_options
=ssl_options
,
394 self
.server
= tornado
.httpserver
.HTTPServer(
396 max_body_size
=LaunchpadTasklet
.UPLOAD_MAX_BODY_SIZE
,
400 self
.log
.debug("creating NSD catalog handler")
401 self
.nsd_catalog_handler
= NsdCatalogDtsHandler(self
, self
.app
)
402 yield from self
.nsd_catalog_handler
.register()
404 self
.log
.debug("creating VNFD catalog handler")
405 self
.vnfd_catalog_handler
= VnfdCatalogDtsHandler(self
, self
.app
)
406 yield from self
.vnfd_catalog_handler
.register()
408 self
.log
.debug("creating datacenter handler")
409 self
.datacenter_handler
= datacenters
.DataCenterPublisher(self
.log
, self
.dts
, self
.loop
)
410 yield from self
.datacenter_handler
.register()
412 self
.log
.debug("creating cloud account handler")
413 self
.cloud_handler
= CloudAccountHandlers(
414 self
.dts
, self
.log
, self
.log_hdl
, self
.loop
, self
.app
416 yield from self
.cloud_handler
.register()
418 self
.log
.debug("creating config agent handler")
419 self
.config_handler
= CfgAgentAccountHandlers(self
.dts
, self
.log
, self
.log_hdl
, self
.loop
)
420 yield from self
.config_handler
.register()
424 self
.server
.listen(LaunchpadTasklet
.UPLOAD_PORT
)
426 def on_instance_started(self
):
427 self
.log
.debug("Got instance started callback")
430 def on_dts_state_change(self
, state
):
431 """Handle DTS state change
433 Take action according to current DTS state to transition application
434 into the corresponding application state
437 state - current dts state
441 rwdts
.State
.INIT
: rwdts
.State
.REGN_COMPLETE
,
442 rwdts
.State
.CONFIG
: rwdts
.State
.RUN
,
446 rwdts
.State
.INIT
: self
.init
,
447 rwdts
.State
.RUN
: self
.run
,
450 # Transition application to next state
451 handler
= handlers
.get(state
, None)
452 if handler
is not None:
455 # Transition dts to next state
456 next_state
= switch
.get(state
, None)
457 if next_state
is not None:
458 self
.dts
.handle
.set_state(next_state
)