5 import concurrent
.futures
15 gi
.require_version("RwDts", "1.0")
16 from gi
.repository
import (
23 gi
.require_version('RwLog', '1.0')
25 import rift
.tasklets
.rwmonitor
.core
as core
26 import rift
.mano
.cloud
as cloud
28 from gi
.repository
import RwCloudYang
, RwLog
, RwVnfrYang
31 from repro
import update
33 gi
.require_version('RwKeyspec', '1.0')
34 from gi
.repository
.RwKeyspec
import quoted_key
class DtsHandler(object):
    """Base class for DTS handlers that are owned by a tasklet.

    The handler keeps a reference to its owning tasklet and forwards the
    tasklet's ``log``, ``log_hdl``, ``dts`` and ``loop`` attributes as
    read-only properties, so subclasses can write ``self.log`` instead of
    ``self.tasklet.log``.
    """

    def __init__(self, tasklet):
        """Create a handler bound to a tasklet

        Arguments:
            tasklet - the owning tasklet; it must provide the ``log``,
                      ``log_hdl``, ``dts`` and ``loop`` attributes that the
                      properties below forward

        """
        self.tasklet = tasklet

    @property
    def log(self):
        """The ``log`` attribute of the owning tasklet"""
        return self.tasklet.log

    # NOTE(review): the property name is taken from the tasklet attribute it
    # returns -- confirm that callers use `log_hdl`.
    @property
    def log_hdl(self):
        """The ``log_hdl`` attribute of the owning tasklet"""
        return self.tasklet.log_hdl

    @property
    def dts(self):
        """The ``dts`` attribute of the owning tasklet"""
        return self.tasklet.dts

    @property
    def loop(self):
        """The ``loop`` attribute of the owning tasklet"""
        return self.tasklet.loop

    @property
    def classname(self):
        """The name of the concrete handler class (used in log messages)"""
        return self.__class__.__name__
class VdurNfviMetricsPublisher(DtsHandler):
    """
    A VdurNfviMetricsPublisher is responsible for publishing the NFVI metrics
    of one VDUR at that VDUR's nfvi-metrics xpath.
    """

    XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id={}]/vnfr:vdur[vnfr:id={}]/rw-vnfr:nfvi-metrics"

    # This timeout defines the length of time the publisher will wait for a
    # request to a data source to complete. If the request cannot be completed
    # before timing out, the current data will be published instead.
    # NOTE(review): the value is a reconstruction and nothing in this class
    # reads it -- confirm the intended timeout (seconds assumed).
    TIMEOUT = 2.0

    def __init__(self, tasklet, vnfr_id, vdur_id):
        """Create an instance of VdurNfviMetricsPublisher

        Arguments:
            tasklet - the tasklet that owns this publisher; it is asked for
                      the metrics via on_retrieve_nfvi_metrics()
            vnfr_id - the ID of the VNFR, used to build the publish xpath
            vdur_id - the ID of the VDUR whose metrics are published

        """
        super().__init__(tasklet)
        self._vnfr_id = vnfr_id
        self._vdur_id = vdur_id

        # There is no registration handle until register() has completed;
        # deregister() relies on this to be safe to call at any time.
        self._handle = None
        self._xpath = VdurNfviMetricsPublisher.XPATH.format(
            quoted_key(vnfr_id),
            quoted_key(vdur_id),
        )

        self._deregistered = asyncio.Event(loop=self.loop)

    @property
    def xpath(self):
        """The XPATH that the metrics are published on"""
        return self._xpath

    # NOTE(review): the coroutine decorator is reconstructed -- confirm that
    # the DTS registration handler expects on_prepare to be a coroutine.
    @asyncio.coroutine
    def dts_on_prepare(self, xact_info, action, ks_path, msg):
        """Handles the DTS on_prepare callback

        For a READ query the current metrics are fetched from the tasklet and
        sent with a MORE response, unless the publisher has been
        deregistered. The transaction is then acknowledged.

        Arguments:
            xact_info - the transaction object used to send the responses
            action    - the query action; only READ produces metrics
            ks_path   - not used by this handler
            msg       - not used by this handler

        """
        self.log.debug("{}:dts_on_prepare".format(self.classname))

        if action == rwdts.QueryAction.READ:
            # If the publisher has been deregistered, the xpath element has
            # been deleted. So we do not want to publish the metrics and
            # re-create the element.
            if not self._deregistered.is_set():
                metrics = self.tasklet.on_retrieve_nfvi_metrics(self._vdur_id)
                # NOTE(review): the arguments after MORE are reconstructed
                # from the ACK call below and the otherwise unused `metrics`.
                xact_info.respond_xpath(
                    rwdts.XactRspCode.MORE,
                    self.xpath,
                    metrics,
                )

        # NOTE(review): assumed to run for every action, not only READ --
        # confirm the intended nesting.
        xact_info.respond_xpath(rwdts.XactRspCode.ACK, self.xpath)

    @asyncio.coroutine
    def register(self):
        """Register the publisher with DTS

        Stores the registration handle so that deregister() can later remove
        the published element.
        """
        self._handle = yield from self.dts.register(
            # NOTE(review): `xpath=` is reconstructed -- confirm the keyword
            # against the DTS register() signature.
            xpath=self.xpath,
            handler=rift.tasklets.DTS.RegistrationHandler(
                on_prepare=self.dts_on_prepare,
            ),
            flags=rwdts.Flag.PUBLISHER,
        )

    def deregister(self):
        """Deregister the publisher from DTS

        Safe to call before register() has completed and safe to call more
        than once: the element is deleted and the handle released only while
        a registration handle is held.
        """
        # Mark the publisher for deregistration. This prevents the publisher
        # from creating an element after it has been deleted.
        self._deregistered.set()

        # Take ownership of the handle so a repeated call becomes a no-op.
        handle, self._handle = self._handle, None
        if handle is None:
            return

        # Now that we are done with the registration handle, delete the element
        # and tell DTS to deregister it
        handle.delete_element(self.xpath)
        handle.deregister()
class RwLogTestTasklet(rift.tasklets.Tasklet):
    """ A tasklet to test Python rwlog interactions

    On DTS INIT the tasklet registers a VdurNfviMetricsPublisher and schedules
    a background job that runs a batch of `update` tasks against a ceilometer
    monitoring plugin; the result of one finished task becomes the metrics
    object handed to the publisher.
    """

    def __init__(self, *args, **kwargs):
        """Create the tasklet; all arguments are passed to the base class"""
        super().__init__(*args, **kwargs)

        self.rwlog.set_category("rw-logtest-log")
        # Start with an empty metrics object so that a read arriving before
        # the first collection completes still gets a valid message.
        self._metrics = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_NfviMetrics()

    def start(self):
        """ The task start callback """
        super().start()

        self._dts = rift.tasklets.DTS(
            self.tasklet_info,
            RwVnfrYang.get_schema(),
            # NOTE(review): the loop argument is reconstructed -- confirm
            # against the rift.tasklets.DTS constructor signature.
            self.loop,
            self.on_dts_state_change,
        )

    # NOTE(review): reconstructed accessor. DtsHandler subclasses read
    # `tasklet.dts`, and this class only stores `_dts` -- confirm the base
    # Tasklet does not already provide `dts`.
    @property
    def dts(self):
        """The DTS API handle created in start()"""
        return self._dts

    def on_retrieve_nfvi_metrics(self, vdur_id):
        """Return the metrics to publish for a VDUR

        Arguments:
            vdur_id - the ID of the VDUR being queried (not used to select
                      the metrics)

        Returns:
            the most recently stored metrics object

        """
        # NOTE(review): the body is reconstructed from how `_metrics` is
        # initialised and updated elsewhere in this class -- confirm.
        return self._metrics

    @asyncio.coroutine
    def init(self):
        """Handler for the DTS INIT state

        Registers the metrics publisher and then schedules the metrics
        collection as a separate task on the tasklet's loop.

        NOTE(review): the method header and the split between this method and
        the nested `go` coroutine are reconstructed -- confirm which
        statements belong to the background job.
        """

        @asyncio.coroutine
        def go():
            """Run the update tasks and store the first available result"""
            # SECURITY NOTE(review): a credential and an endpoint address are
            # hard-coded here; acceptable only for a throw-away test rig.
            # NOTE(review): only these keys are recoverable; the account
            # message may need further keys or nesting -- verify against the
            # RwCloudYang schema.
            account_msg = RwCloudYang.YangData_RwProject_Project_CloudAccounts_CloudAccountList.from_dict({
                "account_type": "openstack",
                "secret": "mypasswd",
                "auth_url": 'http://10.66.4.18:5000/v3/',
                "mgmt_network": "private"
                })

            account = cloud.CloudAccount(
                # NOTE(review): the logger argument is reconstructed --
                # confirm against the cloud.CloudAccount signature.
                self.log,
                RwLog.Ctx.new(__file__), account_msg
                )

            vim_id = "a7f30def-0942-4425-8454-1ffe02b7db1e"
            # NOTE(review): placeholder -- the number of concurrent update
            # tasks is not recoverable; set the intended value.
            instances = 1

            executor = concurrent.futures.ThreadPoolExecutor(10)
            plugin = rw_peas.PeasPlugin("rwmon_ceilometer", 'RwMon-1.0')
            impl = plugin.get_interface("Monitoring")

            tasks = [
                update(self.loop, self.log, executor, account.cal_account_msg, impl, vim_id)
                for _ in range(instances)
            ]

            self.log.debug("Running %s update tasks", instances)
            done, _pending = yield from asyncio.wait(tasks, loop=self.loop, timeout=20)

            # asyncio.wait() returns an empty `done` set when the timeout
            # expires first; keep the previous metrics instead of raising
            # KeyError from pop() on an empty set.
            if done:
                self._metrics = done.pop().result()
            else:
                self.log.warning("No update task completed before the timeout")

        self._publisher = VdurNfviMetricsPublisher(self, "a7f30def-0942-4425-8454-1ffe02b7db1e", "a7f30def-0942-4425-8454-1ffe02b7db1e")
        yield from self._publisher.register()
        self.loop.create_task(go())

    # NOTE(review): reconstructed no-op. on_dts_state_change() looks up
    # `self.run`, which is otherwise undefined in this class -- confirm it
    # does not shadow a base-class implementation.
    @asyncio.coroutine
    def run(self):
        """Handler for the DTS RUN state; nothing to do"""
        pass

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Take action according to the current DTS state

        Runs the application handler for `state`, if there is one, and then
        moves DTS on to the follow-up state, if one is defined.

        Arguments:
            state - the current DTS state (an rwdts.State value)

        """
        # DTS state to move to once the given state has been handled
        switch = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        # Application callback to run on entering the given state
        handlers = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # Transition application to next state
        handler = handlers.get(state, None)
        if handler is not None:
            # NOTE(review): the invocation is reconstructed; the handlers are
            # treated as coroutines and awaited -- confirm.
            yield from handler()

        # Transition dts to next state
        next_state = switch.get(state, None)
        if next_state is not None:
            self.log.debug("Changing state to %s", next_state)
            self._dts.handle.set_state(next_state)