update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwlaunchpadtasklet / rift / tasklets / rwlaunchpad / tasklet.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19
20 import tornado
21 import tornado.httputil
22 import tornado.httpserver
23 import tornado.platform.asyncio
24 import abc
25
26 import tornadostreamform.multipart_streamer as multipart_streamer
27
28 import gi
29 gi.require_version('RwDts', '1.0')
30 gi.require_version('RwcalYang', '1.0')
31 gi.require_version('RwTypes', '1.0')
32 gi.require_version('rwlib', '1.0')
33 gi.require_version('RwLaunchpadYang', '1.0')
34
35 from gi.repository import (
36 RwDts as rwdts,
37 RwLaunchpadYang as rwlaunchpad,
38 RwcalYang as rwcal,
39 RwTypes,
40 RwPkgMgmtYang
41 )
42 import gi.repository.rwlib as rwlib
43 from gi.repository.RwKeyspec import quoted_key
44
45 import rift.tasklets
46 import rift.mano.cloud
47 import rift.mano.ro_account
48 import rift.mano.config_agent
49 import rift.downloader as downloader
50 from rift.mano.utils.project import (
51 ManoProject,
52 ProjectHandler,
53 get_add_delete_update_cfgs,
54 DEFAULT_PROJECT,
55 )
56 from rift.package import store
57
58 from . import uploader
59
# Binary size units, expressed in bytes.
MB = 1024 ** 2
GB = MB * 1024
TB = GB * 1024

# Both limits are one megabyte.
MAX_BUFFER_SIZE = MB  # Max. size loaded into memory!
MAX_BODY_SIZE = MB  # Max. size loaded into memory!
66
# Module-level shorthand for the package-management task status type.
TaskStatus = RwPkgMgmtYang.TaskStatus
68
class LaunchpadError(Exception):
    """Launchpad-specific exception type."""
71
class LpProjectNotFound(Exception):
    """Raised when a project name is not known to the launchpad tasklet."""
74
class CatalogDtsHandler(object):
    """Shared state for the per-project catalog DTS handlers.

    Stores the owning project, the application object and the DTS
    registration handle (``reg``, initially ``None``).  Logging and DTS
    access are delegated to the project.
    """

    def __init__(self, project, app):
        self.project = project
        self.app = app
        self.reg = None

    @property
    def dts(self):
        """DTS handle taken from the owning project."""
        return self.project.dts

    @property
    def log(self):
        """Logger taken from the owning project."""
        return self.project.log
88
89
class NsdCatalogDtsHandler(CatalogDtsHandler):
    """Mirrors the project's NSD catalog configuration into ``project.nsd_catalog``."""

    XPATH = "C,/project-nsd:nsd-catalog/project-nsd:nsd"

    def add_nsd(self, nsd):
        """Add ``nsd`` to the project catalog; log an error if its id is already present."""
        self.log.debug('nsd-catalog-handler:add:{}'.format(nsd.id))
        if nsd.id not in self.project.nsd_catalog:
            self.project.nsd_catalog[nsd.id] = nsd
        else:
            self.log.error("nsd already in catalog: {}".format(nsd.id))

    def update_nsd(self, nsd):
        """Replace an existing catalog entry; log an error if the id is unknown."""
        self.log.debug('nsd-catalog-handler:update:{}'.format(nsd.id))
        if nsd.id in self.project.nsd_catalog:
            self.project.nsd_catalog[nsd.id] = nsd
        else:
            self.log.error("unrecognized NSD: {}".format(nsd.id))

    def delete_nsd(self, nsd_id):
        """Drop ``nsd_id`` from the catalog and remove its package from the store."""
        self.log.debug('nsd-catalog-handler:delete:{}'.format(nsd_id))
        if nsd_id in self.project.nsd_catalog:
            del self.project.nsd_catalog[nsd_id]
        else:
            self.log.error("unrecognized NSD: {}".format(nsd_id))

        # The package store is cleaned up even when the id was not in the
        # catalog; a store failure is only logged as a warning.
        try:
            self.project.nsd_package_store.delete_package(nsd_id)
        except store.PackageStoreError as e:
            self.log.warning("could not delete package from store: %s", str(e))

    @asyncio.coroutine
    def register(self):
        """Subscribe to the project's NSD catalog configuration in DTS."""

        def apply_config(dts, acg, xact, action, _):
            if xact.xact is None:
                # Without a transaction, only INSTALL replays the elements
                # already held by the registration.
                if action == rwdts.AppconfAction.INSTALL:
                    if self.reg:
                        for element in self.reg.elements:
                            self.log.debug("Add NSD on restart: {}".format(element.id))
                            self.add_nsd(element)
                    else:
                        self.log.error("DTS handle is null for project {}".
                                       format(self.project.name))
                else:
                    self.log.debug("No xact handle. Skipping apply config")
                return

            add_cfgs, delete_cfgs, update_cfgs = get_add_delete_update_cfgs(
                dts_member_reg=self.reg,
                xact=xact,
                key_name="id",
            )

            # Handle Deletes
            for cfg in delete_cfgs:
                self.delete_nsd(cfg.id)

            # Handle Adds
            for cfg in add_cfgs:
                self.add_nsd(cfg)

            # Handle Updates
            for cfg in update_cfgs:
                self.update_nsd(cfg)

        # The "{}" placeholder was missing, so the project name passed to
        # format() never reached the log line.
        self.log.debug("Registering for NSD catalog in project {}".
                       format(self.project.name))

        acg_handler = rift.tasklets.AppConfGroup.Handler(
            on_apply=apply_config,
        )

        with self.dts.appconf_group_create(acg_handler) as acg:
            xpath = self.project.add_project(NsdCatalogDtsHandler.XPATH)
            self.reg = acg.register(
                xpath=xpath,
                flags=rwdts.Flag.SUBSCRIBER,
            )

    def deregister(self):
        """Release the DTS registration, if one exists."""
        if self.reg:
            self.reg.deregister()
            self.reg = None
171
172
class VnfdCatalogDtsHandler(CatalogDtsHandler):
    """Mirrors the project's VNFD catalog configuration into ``project.vnfd_catalog``."""

    XPATH = "C,/project-vnfd:vnfd-catalog/project-vnfd:vnfd"

    def add_vnfd(self, vnfd):
        """Add ``vnfd`` to the project catalog; log an error if its id is already present."""
        self.log.debug('vnfd-catalog-handler:add:{}'.format(vnfd.id))
        if vnfd.id not in self.project.vnfd_catalog:
            self.project.vnfd_catalog[vnfd.id] = vnfd
        else:
            self.log.error("VNFD already in catalog: {}".format(vnfd.id))

    def update_vnfd(self, vnfd):
        """Replace an existing catalog entry; log an error if the id is unknown."""
        self.log.debug('vnfd-catalog-handler:update:{}'.format(vnfd.id))
        if vnfd.id in self.project.vnfd_catalog:
            self.project.vnfd_catalog[vnfd.id] = vnfd
        else:
            self.log.error("unrecognized VNFD: {}".format(vnfd.id))

    def delete_vnfd(self, vnfd_id):
        """Drop ``vnfd_id`` from the catalog and remove its package from the store."""
        self.log.debug('vnfd-catalog-handler:delete:{}'.format(vnfd_id))
        if vnfd_id in self.project.vnfd_catalog:
            del self.project.vnfd_catalog[vnfd_id]
        else:
            self.log.error("unrecognized VNFD: {}".format(vnfd_id))

        # The package store is cleaned up even when the id was not in the
        # catalog; a store failure is only logged as a warning.
        try:
            self.project.vnfd_package_store.delete_package(vnfd_id)
        except store.PackageStoreError as e:
            self.log.warning("could not delete package from store: %s", str(e))

    @asyncio.coroutine
    def register(self):
        """Subscribe to the project's VNFD catalog configuration in DTS."""

        def apply_config(dts, acg, xact, action, _):
            if xact.xact is None:
                # Without a transaction, only INSTALL replays the elements
                # already held by the registration.
                if action == rwdts.AppconfAction.INSTALL:
                    if self.reg:
                        for element in self.reg.elements:
                            # Routine replay, not a failure: logged at debug
                            # level like the NSD handler (was error level).
                            self.log.debug("Add VNFD on restart: {}".format(element.id))
                            self.add_vnfd(element)
                    else:
                        self.log.error("DTS handle is null for project {}".
                                       format(self.project.name))
                else:
                    self.log.debug("No xact handle. Skipping apply config")
                return

            add_cfgs, delete_cfgs, update_cfgs = get_add_delete_update_cfgs(
                dts_member_reg=self.reg,
                xact=xact,
                key_name="id",
            )

            # Handle Deletes
            for cfg in delete_cfgs:
                self.delete_vnfd(cfg.id)

            # Handle Adds
            for cfg in add_cfgs:
                self.add_vnfd(cfg)

            # Handle Updates
            for cfg in update_cfgs:
                self.update_vnfd(cfg)

        self.log.debug("Registering for VNFD catalog in project {}".
                       format(self.project.name))

        acg_handler = rift.tasklets.AppConfGroup.Handler(
            on_apply=apply_config,
        )

        with self.dts.appconf_group_create(acg_handler) as acg:
            xpath = self.project.add_project(VnfdCatalogDtsHandler.XPATH)
            self.reg = acg.register(
                xpath=xpath,
                flags=rwdts.Flag.SUBSCRIBER,
            )

    def deregister(self):
        """Release the DTS registration, if one exists."""
        if self.reg:
            self.reg.deregister()
            self.reg = None
257
class CfgAgentAccountHandlers(object):
    """Couples the config-agent account subscriber with its operdata handler.

    Account add/delete callbacks from the config subscriber are forwarded
    to the operdata handler.
    """

    def __init__(self, dts, log, log_hdl, loop, project):
        self._dts = dts
        self._log = log
        self._log_hdl = log_hdl
        self._loop = loop
        self._project = project

        self._log.debug("creating config agent account config handler")
        callbacks = rift.mano.config_agent.ConfigAgentCallbacks(
            on_add_apply=self.on_cfg_agent_account_added,
            on_delete_apply=self.on_cfg_agent_account_deleted,
        )
        self.cfg_agent_cfg_handler = rift.mano.config_agent.ConfigAgentSubscriber(
            self._dts, self._log, self._project, callbacks
        )

        self._log.debug("creating config agent account opdata handler")
        operdata_cls = rift.mano.config_agent.CfgAgentDtsOperdataHandler
        self.cfg_agent_operdata_handler = operdata_cls(
            self._dts, self._log, self._loop, self._project
        )

    def on_cfg_agent_account_deleted(self, account):
        """Forward an account deletion (by account name) to the operdata handler."""
        self._log.debug("config agent account deleted")
        operdata = self.cfg_agent_operdata_handler
        operdata.delete_cfg_agent_account(account.name)

    def on_cfg_agent_account_added(self, account):
        """Forward a newly added account to the operdata handler."""
        self._log.debug("config agent account added")
        operdata = self.cfg_agent_operdata_handler
        operdata.add_cfg_agent_account(account)

    @asyncio.coroutine
    def register(self):
        """Register the config subscriber, then the operdata handler."""
        self.cfg_agent_cfg_handler.register()
        yield from self.cfg_agent_operdata_handler.register()

    def deregister(self):
        """Deregister the operdata handler first, then the config subscriber."""
        self.cfg_agent_operdata_handler.deregister()
        self.cfg_agent_cfg_handler.deregister()
296
297
class CloudAccountHandlers(object):
    """Couples the cloud account config subscriber with its operdata handler.

    On every add/delete the application's per-project account list is
    refreshed from the subscriber before the operdata handler is told.
    """

    def __init__(self, dts, log, log_hdl, loop, app, project):
        self._log = log
        self._log_hdl = log_hdl
        self._dts = dts
        self._loop = loop
        self._app = app
        self._project = project

        self._log.debug("Creating cloud account config handler for project {}".
                        format(project.name))
        callbacks = rift.mano.cloud.CloudAccountConfigCallbacks(
            on_add_apply=self.on_cloud_account_added,
            on_delete_apply=self.on_cloud_account_deleted,
        )
        self.cloud_cfg_handler = rift.mano.cloud.CloudAccountConfigSubscriber(
            self._dts, self._log, self._log_hdl, self._project, callbacks,
        )

        self._log.debug("creating cloud account opdata handler")
        self.cloud_operdata_handler = rift.mano.cloud.CloudAccountDtsOperdataHandler(
            self._dts, self._log, self._loop, self._project,
        )

    def _sync_app_accounts(self):
        """Copy the subscriber's current accounts into the app, keyed by project name."""
        current = list(self.cloud_cfg_handler.accounts.values())
        self._app.accounts[self._project.name] = current

    def on_cloud_account_deleted(self, account_name):
        """Refresh the app's account list and drop the account's operdata."""
        self._log.debug("cloud account deleted")
        self._sync_app_accounts()
        self.cloud_operdata_handler.delete_cloud_account(account_name)

    def on_cloud_account_added(self, account):
        """Refresh the app's account list and publish operdata for ``account``."""
        self._log.debug("cloud account added")
        self._sync_app_accounts()
        self._log.debug("accounts: %s", self._app.accounts)
        self.cloud_operdata_handler.add_cloud_account(account)

    @asyncio.coroutine
    def register(self):
        """Register the config subscriber, then the operdata handler."""
        yield from self.cloud_cfg_handler.register()
        yield from self.cloud_operdata_handler.register()

    def deregister(self):
        """Deregister the config subscriber, then the operdata handler."""
        self.cloud_cfg_handler.deregister()
        self.cloud_operdata_handler.deregister()
343
class ROAccountHandlers(object):
    """Couples the RO account config subscriber with its operdata handler.

    On every add/delete the application's per-project RO account list is
    refreshed from the subscriber before the operdata handler is told.
    """

    def __init__(self, dts, log, loop, app, project):
        self._log = log
        self._dts = dts
        self._loop = loop
        self._app = app
        self._project = project

        self._log.debug("Creating RO account config handler for project {}".
                        format(project.name))
        callbacks = rift.mano.ro_account.ROAccountConfigCallbacks(
            on_add_apply=self.on_ro_account_added,
            on_delete_apply=self.on_ro_account_deleted,
        )
        self.ro_cfg_handler = rift.mano.ro_account.ROAccountConfigSubscriber(
            self._dts, self._log, self._loop, self._project, None, callbacks,
        )

        self._log.debug("Creating RO account opdata handler")
        self.ro_operdata_handler = rift.mano.ro_account.ROAccountDtsOperdataHandler(
            self._dts, self._log, self._loop, self._project
        )

    def _sync_app_ro_accounts(self):
        """Copy the subscriber's current RO accounts into the app, keyed by project name."""
        current = list(self.ro_cfg_handler.accounts.values())
        self._app.ro_accounts[self._project.name] = current

    def on_ro_account_deleted(self, account_name):
        """Refresh the app's RO account list and drop the account's operdata."""
        self._log.debug(" launchpad tasklet RO account deleted")
        self._sync_app_ro_accounts()
        self.ro_operdata_handler.delete_ro_account(account_name)

    def on_ro_account_added(self, account):
        """Refresh the app's RO account list and publish operdata for ``account``."""
        self._log.debug(" launchpad tasklet RO account added")
        self._sync_app_ro_accounts()
        self._log.debug("Accounts: %s", self._app.ro_accounts)
        self.ro_operdata_handler.add_ro_account(account)

    @asyncio.coroutine
    def register(self):
        """Register the config subscriber, then the operdata handler."""
        yield from self.ro_cfg_handler.register()
        yield from self.ro_operdata_handler.register()

    def deregister(self):
        """Deregister the config subscriber, then the operdata handler."""
        self.ro_cfg_handler.deregister()
        self.ro_operdata_handler.deregister()
388
class StatusHandlers(object):
    """Common base for the DTS publishers of package job status.

    Subclasses supply ``xpath()``; ``STATUS_MAP`` translates downloader
    states into the upper-cased task status names that get published.
    """

    STATUS_MAP = {
        downloader.DownloadStatus.STARTED: TaskStatus.QUEUED.value_nick.upper(),
        downloader.DownloadStatus.IN_PROGRESS: TaskStatus.IN_PROGRESS.value_nick.upper(),
        downloader.DownloadStatus.COMPLETED: TaskStatus.COMPLETED.value_nick.upper(),
        downloader.DownloadStatus.FAILED: TaskStatus.FAILED.value_nick.upper(),
        downloader.DownloadStatus.CANCELLED: TaskStatus.CANCELLED.value_nick.upper()
    }

    def __init__(self, dts, log, loop, app, project):
        self.log = log
        self.dts = dts
        self.loop = loop
        self.app = app
        self.project = project
        # Initialised here so deregister() is safe even when register() was
        # never called; previously that raised AttributeError.
        self.reg = None

    # NOTE: this class does not use abc.ABCMeta, so the decorator is not
    # enforced at instantiation time; it only marks intent.
    @abc.abstractmethod
    def xpath(self, transaction_id=None):
        """Return the DTS xpath to publish on; subclasses must override."""
        return

    @asyncio.coroutine
    def register(self):
        """Register as a caching publisher on ``self.xpath()``."""
        self.reg = yield from self.dts.register(xpath=self.xpath(),
                  flags=rwdts.Flag.PUBLISHER|rwdts.Flag.CACHE|rwdts.Flag.NO_PREP_READ)

        assert self.reg is not None

    def deregister(self):
        """Release the DTS registration, if one exists."""
        if self.reg:
            self.reg.deregister()
            self.reg = None
420
421
class UploadStatusHandlers(StatusHandlers):
    """Publishes the status of package onboarding (create) jobs.

    Job records are kept in ``transaction_to_job_map`` and served from the
    DTS prepare callback.
    """

    def __init__(self, dts, log, loop, app, project):
        super(UploadStatusHandlers, self).__init__(dts, log, loop, app, project)
        self.reg = None
        self.transaction_to_job_map = {}

    def xpath(self, transaction_id=None):
        """Project-scoped xpath of the job list, or of one job when an id is given."""
        key_suffix = ""
        if transaction_id:
            key_suffix = "[transaction-id={}]".format(quoted_key(transaction_id))
        return self.project.add_project(
            "D,/rw-pkg-mgmt:create-jobs/rw-pkg-mgmt:job" + key_suffix)

    def create_job_xpath(self):
        """Project-scoped xpath of the create-jobs container."""
        return self.project.add_project("D,/rw-pkg-mgmt:create-jobs")

    @asyncio.coroutine
    def register(self):
        """Register as publisher for create-job status with a prepare callback."""

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts """
            # NOTE(review): the nesting here was reconstructed from a listing
            # without indentation; the all-jobs branch is taken to be the
            # "no transaction id" case -- confirm against the repository.
            if action != rwdts.QueryAction.READ:
                xact_info.respond_xpath(rwdts.XactRspCode.ACK)
                return

            xpath = ks_path.to_xpath(RwPkgMgmtYang.get_schema())
            job_schema = RwPkgMgmtYang.YangData_RwProject_Project_CreateJobs_Job().schema()
            path_entry = job_schema.keyspec_to_entry(ks_path)
            transaction_id = path_entry.key00.transaction_id

            if not transaction_id:
                # No key in the query: answer with every known job.
                jobs = RwPkgMgmtYang.YangData_RwProject_Project_CreateJobs()
                for known_job in self.transaction_to_job_map.values():
                    entry = RwPkgMgmtYang.YangData_RwProject_Project_CreateJobs_Job.from_dict({
                        "transaction_id": known_job.transaction_id,
                        "status": known_job.status
                    })
                    jobs.job.append(entry)
                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK,
                                        xpath=self.create_job_xpath(),
                                        msg=jobs)
                return

            create_job_msg = msg.as_dict()
            if create_job_msg and transaction_id in self.transaction_to_job_map:
                job = self.transaction_to_job_map[transaction_id]
                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK,
                                        xpath=xpath,
                                        msg=job)
                return

            # Keyed read with nothing to report: plain ACK.
            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
        with self.dts.group_create() as group:
            self.reg = group.register(
                xpath=self.xpath(),
                handler=hdl,
                flags=rwdts.Flag.PUBLISHER,
            )

    def upload_status(self, job, trans_id):
        """Record the mapped status of ``job`` under ``trans_id``; failures are only logged."""
        try:
            fields = {
                "transaction_id": trans_id,
                "status": StatusHandlers.STATUS_MAP[job.status]
            }
            self.transaction_to_job_map[trans_id] = \
                RwPkgMgmtYang.YangData_RwProject_Project_CreateJobs_Job.from_dict(fields)
        except Exception as e:
            self.log.error("Exception : {}".format(e))
486
class UpdateStatusHandlers(StatusHandlers):
    """Publishes the status of package update jobs."""

    def __init__(self, dts, log, loop, app, project):
        super(UpdateStatusHandlers, self).__init__(dts, log, loop, app, project)

    def xpath(self, transaction_id=None):
        """Project-scoped xpath of the job list, or of one job when an id is given."""
        path = "D,/rw-pkg-mgmt:update-jobs/rw-pkg-mgmt:job"
        if transaction_id:
            path += "[transaction-id={}]".format(quoted_key(transaction_id))
        return self.project.add_project(path)

    @asyncio.coroutine
    def schedule_dts_work(self, job, transaction_id):
        """Write the mapped status of ``job`` to its keyed element in DTS."""
        # Publish the download state
        job_msg = RwPkgMgmtYang.YangData_RwProject_Project_UpdateJobs_Job.from_dict({
            "transaction_id": transaction_id,
            "status": StatusHandlers.STATUS_MAP[job.status]
        })
        job_xpath = self.xpath(transaction_id=transaction_id)
        self.reg.update_element(job_xpath, job_msg)

    def update_status(self, job, trans_id):
        """Schedule the DTS status write on this handler's event loop."""
        self.log.debug("Download completed, writing status of task")
        publish = self.schedule_dts_work(job, trans_id)
        asyncio.ensure_future(publish, loop=self.loop)
511
class LaunchpadProject(ManoProject):
    """Per-project launchpad state: catalogs, package stores and DTS handlers."""

    def __init__(self, name, tasklet, **kw):
        """Create the project.

        Arguments
            name    - project name
            tasklet - owning tasklet; its logger is used for the base class
                      and the package stores
            kw      - must contain 'app', the uploader application
        """
        super(LaunchpadProject, self).__init__(tasklet.log, name)
        self.update(tasklet)
        self._app = kw['app']

        # Handlers are created in register(); None means "not registered".
        self.config_handler = None
        self.nsd_catalog_handler = None
        self.vld_catalog_handler = None
        self.vnfd_catalog_handler = None
        self.cloud_handler = None
        self.ro_handler = None
        self.lp_config_handler = None
        self.account_handler = None
        self.upload_handlers = None
        self.update_handlers = None

        self.nsd_catalog = dict()
        self.vld_catalog = dict()
        self.vnfd_catalog = dict()
        self.nsd_package_store = rift.package.store.NsdPackageFilesystemStore(tasklet.log,
                                                                              project=name)
        self.vnfd_package_store = rift.package.store.VnfdPackageFilesystemStore(tasklet.log,
                                                                                project=name)

    @property
    def dts(self):
        return self._dts

    @property
    def loop(self):
        return self._loop

    @property
    def upload_status_handler(self):
        return self.upload_handlers

    @property
    def update_status_handler(self):
        return self.update_handlers

    @asyncio.coroutine
    def register(self):
        """Create and register every per-project handler, one after another."""
        self.log.debug("creating NSD catalog handler for project {}".format(self.name))
        self.nsd_catalog_handler = NsdCatalogDtsHandler(self, self._app)
        yield from self.nsd_catalog_handler.register()

        self.log.debug("creating VNFD catalog handler for project {}".format(self.name))
        self.vnfd_catalog_handler = VnfdCatalogDtsHandler(self, self._app)
        yield from self.vnfd_catalog_handler.register()

        self.log.debug("creating cloud account handler for project {}".format(self.name))
        self.cloud_handler = CloudAccountHandlers(self.dts, self.log, self.log_hdl,
                                                  self.loop, self._app, self)
        yield from self.cloud_handler.register()

        self.log.debug("creating RO account handler for project {}".format(self.name))
        self.ro_handler = ROAccountHandlers(self.dts, self.log, self.loop, self._app, self)
        yield from self.ro_handler.register()

        self.log.debug("creating config agent handler for project {}".format(self.name))
        self.config_handler = CfgAgentAccountHandlers(self.dts, self.log, self.log_hdl,
                                                      self.loop, self)
        yield from self.config_handler.register()

        self.log.debug("creating upload handler for project {}".format(self.name))
        self.upload_handlers = UploadStatusHandlers(self.dts, self.log, self.loop,
                                                    self._app, self)
        yield from self.upload_handlers.register()

        self.log.debug("creating update handler for project {}".format(self.name))
        self.update_handlers = UpdateStatusHandlers(self.dts, self.log, self.loop,
                                                    self._app, self)
        yield from self.update_handlers.register()

    def deregister(self):
        """Deregister every handler that register() managed to create."""
        self.log.debug("De-register handlers for project: {}".format(self.name))
        # Same order as before. Handlers still None (register() never ran or
        # failed part-way) are skipped instead of raising AttributeError.
        handlers = (
            self.config_handler,
            self.cloud_handler,
            self.ro_handler,
            self.vnfd_catalog_handler,
            self.nsd_catalog_handler,
            self.update_handlers,
            self.upload_handlers,
        )
        for handler in handlers:
            if handler is not None:
                handler.deregister()

    @property
    def cloud_accounts(self):
        """Current cloud accounts; empty list before the handler exists."""
        if self.cloud_handler is None:
            return list()

        return list(self.cloud_handler.cloud_cfg_handler.accounts.values())

    @property
    def ro_accounts(self):
        """Current RO accounts; empty list before the handler exists."""
        if self.ro_handler is None:
            return list()

        return list(self.ro_handler.ro_cfg_handler.accounts.values())
611
class LaunchpadTasklet(rift.tasklets.Tasklet):
    """Tasklet hosting the package upload HTTP server and per-project handlers."""

    UPLOAD_MAX_BODY_SIZE = MAX_BODY_SIZE
    UPLOAD_MAX_BUFFER_SIZE = MAX_BUFFER_SIZE
    UPLOAD_PORT = "4567"

    def __init__(self, *args, **kwargs):
        super(LaunchpadTasklet, self).__init__(*args, **kwargs)
        self.rwlog.set_category("rw-mano-log")
        self.rwlog.set_subcategory("launchpad")

        self.dts = None
        self.project_handler = None

        self.app = None
        self.server = None
        self.projects = {}

    def _get_project(self, project=None):
        """Return the named project (DEFAULT_PROJECT when None).

        Raises LpProjectNotFound if the project is unknown.
        """
        if project is None:
            project = DEFAULT_PROJECT

        if project in self.projects:
            return self.projects[project]

        msg = "Project {} not found".format(project)
        self._log.error(msg)
        raise LpProjectNotFound(msg)

    def nsd_catalog_get(self, project=None):
        return self._get_project(project=project).nsd_catalog

    def vnfd_catalog_get(self, project=None):
        return self._get_project(project=project).vnfd_catalog

    def get_cloud_accounts(self, project=None):
        return self._get_project(project=project).cloud_accounts

    def start(self):
        """Start the tasklet and create the DTS API object."""
        super(LaunchpadTasklet, self).start()
        self.log.info("Starting LaunchpadTasklet")

        self.log.debug("Registering with dts")
        self.dts = rift.tasklets.DTS(
            self.tasklet_info,
            rwlaunchpad.get_schema(),
            self.loop,
            self.on_dts_state_change
        )

        self.log.debug("Created DTS Api GI Object: %s", self.dts)

    def stop(self):
        """Stop the HTTP server and de-initialise DTS.

        Either object may still be None when stop() is called before
        start()/init() completed; those are skipped. Any other failure is
        logged and re-raised.
        """
        try:
            if self.server is not None:
                self.server.stop()
            if self.dts is not None:
                self.dts.deinit()
        except Exception:
            self.log.exception("Caught Exception in LP stop")
            raise

    def get_vnfd_catalog(self, project):
        # Direct lookup: unlike vnfd_catalog_get(), an unknown project
        # raises KeyError here.
        return self.projects[project].vnfd_catalog

    def get_nsd_catalog(self, project):
        # Direct lookup: unlike nsd_catalog_get(), an unknown project
        # raises KeyError here.
        return self.projects[project].nsd_catalog

    @asyncio.coroutine
    def init(self):
        """Create the uploader app, the HTTP server and the project handler.

        Any exception is logged and swallowed.
        """
        try:
            io_loop = rift.tasklets.tornado.TaskletAsyncIOLoop(asyncio_loop=self.loop)
            self.app = uploader.UploaderApplication.from_tasklet(self)
            yield from self.app.register()

            manifest = self.tasklet_info.get_pb_manifest()
            ssl_cert = manifest.bootstrap_phase.rwsecurity.cert
            ssl_key = manifest.bootstrap_phase.rwsecurity.key

            server_kwargs = {
                "max_body_size": LaunchpadTasklet.UPLOAD_MAX_BODY_SIZE,
                "io_loop": io_loop,
            }
            # TLS options are only passed when the manifest enables SSL.
            if manifest.bootstrap_phase.rwsecurity.use_ssl:
                server_kwargs["ssl_options"] = {
                    "certfile": ssl_cert,
                    "keyfile": ssl_key,
                }

            self.server = tornado.httpserver.HTTPServer(self.app, **server_kwargs)

            self.log.debug("Registering project handler")
            self.project_handler = ProjectHandler(self, LaunchpadProject,
                                                  app=self.app)
            self.project_handler.register()

        except Exception as e:
            self.log.error("Exception : {}".format(e))
            self.log.exception(e)

    @asyncio.coroutine
    def run(self):
        """Start listening on the upload port."""
        address = rwlib.getenv("RWVM_INTERNAL_IPADDR")
        if address is None:
            address = ""
        self.server.listen(LaunchpadTasklet.UPLOAD_PORT, address=address)
        # NOTE(review): when `address` is "" (all interfaces) or already
        # 127.0.0.1 this second bind presumably fails with "address in use";
        # confirm whether the variable is always set to a specific address.
        self.server.listen(LaunchpadTasklet.UPLOAD_PORT, address="127.0.0.1")

    def on_instance_started(self):
        self.log.debug("Got instance started callback")

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Handle DTS state change

        Take action according to current DTS state to transition application
        into the corresponding application state

        Arguments
            state - current dts state

        """
        switch = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        handlers = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # Transition application to next state
        handler = handlers.get(state, None)
        if handler is not None:
            yield from handler()

        # Transition dts to next state
        next_state = switch.get(state, None)
        if next_state is not None:
            self.dts.handle.set_state(next_state)
757