update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / common / python / rift / mano / dts / subscriber / store.py
1 """
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 @file store.py
19 @author Varun Prasad (varun.prasad@riftio.com)
20 @date 09-Jul-2016
21
22 """
23
24 import asyncio
25 import enum
26
27 from gi.repository import RwDts as rwdts
28 from . import core, ns_subscriber, vnf_subscriber
29
30
class SubscriberStore(core.SubscriberDtsHandler):
    """A convenience class that holds all the VNF and NS related config and Opdata.

    The store owns four catalog subscribers (NSR, NSD, VNFR, VNFD).  NSR and
    VNFR objects are cached locally in dictionaries that are kept current by
    the ``on_nsr_change`` / ``on_vnfr_change`` callbacks; NSD and VNFD objects
    are read from the corresponding subscriber's registration on every access.
    """
    KEY = enum.Enum('KEY', 'NSR NSD VNFD VNFR')

    def __init__(self, log, dts, loop, project, callback=None):
        """Create the store and its four catalog subscribers.

        Args:
            log: logger, forwarded to the base handler.
            dts: DTS handle, forwarded to the base handler.
            loop: event loop, forwarded to the base handler.
            project: project object, forwarded to the base handler.
            callback: accepted for signature compatibility; this constructor
                does not reference it.
        """
        super().__init__(log, dts, loop, project)

        # Every subscriber is built from the same handles the base class exposes.
        params = (self.log, self.dts, self.loop, self.project)

        self._nsr_sub = ns_subscriber.NsrCatalogSubscriber(*params, callback=self.on_nsr_change)
        self._nsrs = {}
        self._nsd_sub = ns_subscriber.NsdCatalogSubscriber(*params)

        self._vnfr_sub = vnf_subscriber.VnfrCatalogSubscriber(*params, callback=self.on_vnfr_change)
        self._vnfrs = {}
        self._vnfd_sub = vnf_subscriber.VnfdCatalogSubscriber(*params)

    @property
    def vnfd(self):
        """List of the elements returned by the VNFD subscriber's registration."""
        return list(self._vnfd_sub.reg.get_xact_elements())

    @property
    def nsd(self):
        """List of the elements returned by the NSD subscriber's registration."""
        return list(self._nsd_sub.reg.get_xact_elements())

    @property
    def vnfr(self):
        """List (a copy) of the locally cached VNFR objects."""
        return list(self._vnfrs.values())

    @property
    def nsr(self):
        """List (a copy) of the locally cached NSR objects."""
        return list(self._nsrs.values())

    def _unwrap(self, values, id_name):
        """Return the first entry of ``values``, or ``None`` if there is none.

        Args:
            values: sequence of candidate objects (the ``get_*`` methods pass
                a list that may be empty).
            id_name: identifier that was searched for; used only in the log
                message.

        Returns:
            ``values[0]`` when available; otherwise the failure is logged and
            ``None`` is returned.
        """
        try:
            return values[0]
        except (IndexError, KeyError):
            # An empty list raises IndexError, not KeyError; KeyError is kept
            # so that anything previously caught here is still caught.
            self.log.exception("Unable to find the object with the given "
                               "ID {}".format(id_name))
            return None

    def get_nsr(self, nsr_id):
        """Return the cached NSR whose ``ns_instance_config_ref`` equals ``nsr_id``, else ``None``."""
        values = [nsr for nsr in self.nsr if nsr.ns_instance_config_ref == nsr_id]
        return self._unwrap(values, nsr_id)

    def get_nsd(self, nsd_id):
        """Return the NSD whose ``id`` equals ``nsd_id``, else ``None``."""
        values = [nsd for nsd in self.nsd if nsd.id == nsd_id]
        return self._unwrap(values, nsd_id)

    def get_vnfr(self, vnfr_id):
        """Return the cached VNFR whose ``id`` equals ``vnfr_id``, else ``None``."""
        values = [vnfr for vnfr in self.vnfr if vnfr.id == vnfr_id]
        return self._unwrap(values, vnfr_id)

    def get_vnfd(self, vnfd_id):
        """Return the VNFD whose ``id`` equals ``vnfd_id``, else ``None``."""
        values = [vnfd for vnfd in self.vnfd if vnfd.id == vnfd_id]
        return self._unwrap(values, vnfd_id)

    @asyncio.coroutine
    def register(self):
        """Register the four subscribers, one after another (VNFD, NSD, VNFR, NSR)."""
        yield from self._vnfd_sub.register()
        yield from self._nsd_sub.register()
        yield from self._vnfr_sub.register()
        yield from self._nsr_sub.register()

    def deregister(self):
        """Deregister the four subscribers in the same order they were registered."""
        self._log.debug("De-register store for project {}".
                        format(self._project))
        self._vnfd_sub.deregister()
        self._nsd_sub.deregister()
        self._vnfr_sub.deregister()
        self._nsr_sub.deregister()

    @asyncio.coroutine
    def refresh_store(self, subsriber, store):
        """Rebuild ``store`` from a fresh read of the subscriber's xpath.

        Args:
            subsriber: subscriber providing ``get_xpath()`` (what to read) and
                ``key_name()`` (the attribute of each result used as the key).
            store: dictionary to repopulate in place.

        Note:
            ``store`` is emptied before the individual results are awaited, so
            it can be observed partially filled while the refresh is running.
        """
        itr = yield from self.dts.query_read(subsriber.get_xpath())

        store.clear()
        for res in itr:
            result = yield from res
            result = result.result
            store[getattr(result, subsriber.key_name())] = result

    def on_nsr_change(self, msg, action):
        """NSR subscriber callback: drop the entry on DELETE, otherwise refresh the cache."""
        if action == rwdts.QueryAction.DELETE:
            # Removing a key that is not cached is a no-op.
            self._nsrs.pop(msg.ns_instance_config_ref, None)
            return

        self.loop.create_task(self.refresh_store(self._nsr_sub, self._nsrs))

    def on_vnfr_change(self, msg, action):
        """VNFR subscriber callback: drop the entry on DELETE, otherwise refresh the cache."""
        if action == rwdts.QueryAction.DELETE:
            # Removing a key that is not cached is a no-op.
            self._vnfrs.pop(msg.id, None)
            return

        self.loop.create_task(self.refresh_store(self._vnfr_sub, self._vnfrs))