import rift.tasklets.rwlaunchpad.uploader as uploader
import rift.tasklets.rwlaunchpad.message as message
import rift.tasklets.rwlaunchpad.export as export
+from rift.mano.utils.project import DEFAULT_PROJECT
import rift.test.dts
+
import mock
TEST_STRING = "foobar"
})
mock_vnfd_catalog = {self.uid: mock_vnfd}
- self.app = uploader.UploaderApplication(
- self.log,
- self.dts,
- self.loop,
- vnfd_catalog=mock_vnfd_catalog)
+ class MockTasklet:
+ def __init__(cls):
+ def get_vnfd_catalog(project=DEFAULT_PROJECT):
+ return mock_vnfd_catalog
+
+ cls.log = self.log
+ cls.loop = self.loop
+ cls.dts = self.dts
+ cls.get_vnfd_catalog = get_vnfd_catalog
+ cls.get_nsd_catalog = None
+
+ self.app = uploader.UploaderApplication(MockTasklet())
AsyncIOMainLoop().install()
self.server = tornado.httpserver.HTTPServer(
yield from self.app.register()
ip = RwPkgMgmtYang.YangInput_RwPkgMgmt_PackageCreate.from_dict({
"package_type": "VNFD",
- "external_url": "http://repo.riftio.com/releases/open.riftio.com/4.2.1/VNFS/ping_vnfd.tar.gz"
+ "external_url": "http://repo.riftio.com/releases/open.riftio.com/4.2.1/VNFS/ping_vnfd.tar.gz",
+ "project_name": DEFAULT_PROJECT
})
rpc_out = yield from self.dts.query_rpc(
# Update
ip = RwPkgMgmtYang.YangInput_RwPkgMgmt_PackageUpdate.from_dict({
"package_type": "VNFD",
- "external_url": "http://repo.riftio.com/releases/open.riftio.com/4.2.1/VNFS/ping_vnfd.tar.gz"
+ "external_url": "http://repo.riftio.com/releases/open.riftio.com/4.2.1/VNFS/ping_vnfd.tar.gz",
+ "project_name": DEFAULT_PROJECT
})
rpc_out = yield from self.dts.query_rpc(
"I,/rw-pkg-mgmt:package-update",