ac7f6b4a0a902ea2b9420b97f43b77275012b262
[osm/SO.git] / rwlaunchpad / plugins / rwmonitor / test / reprotesttasklet-python.py
1 #!/usr/bin/env python3
2
3 import argparse
4 import asyncio
5 import concurrent.futures
6 import gi
7 import logging
8 import os
9 import rwlogger
10 import sys
11 import time
12 import unittest
13 import xmlrunner
14
15 gi.require_version("RwDts", "1.0")
16 from gi.repository import (
17 RwDts as rwdts,
18 RwDtsYang,
19 )
20 import rift.tasklets
21 import rift.test.dts
22
23 gi.require_version('RwLog', '1.0')
24
25 import rift.tasklets.rwmonitor.core as core
26 import rift.mano.cloud as cloud
27
28 from gi.repository import RwCloudYang, RwLog, RwVnfrYang
29 import rw_peas
30
31 from repro import update
32
33 gi.require_version('RwKeyspec', '1.0')
34 from gi.repository.RwKeyspec import quoted_key
35
36
37 class DtsHandler(object):
38 def __init__(self, tasklet):
39 self.reg = None
40 self.tasklet = tasklet
41
42 @property
43 def log(self):
44 return self.tasklet.log
45
46 @property
47 def log_hdl(self):
48 return self.tasklet.log_hdl
49
50 @property
51 def dts(self):
52 return self.tasklet.dts
53
54 @property
55 def loop(self):
56 return self.tasklet.loop
57
58 @property
59 def classname(self):
60 return self.__class__.__name__
61
62
63 class VdurNfviMetricsPublisher(DtsHandler):
64 """
65 A VdurNfviMetricsPublisher is responsible for publishing the NFVI metrics
66 from a single VDU.
67 """
68
69 XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id={}]/vnfr:vdur[vnfr:id={}]/rw-vnfr:nfvi-metrics"
70
71 # This timeout defines the length of time the publisher will wait for a
72 # request to a data source to complete. If the request cannot be completed
73 # before timing out, the current data will be published instead.
74 TIMEOUT = 2.0
75
76 def __init__(self, tasklet, vnfr_id, vdur_id):
77 """Create an instance of VdurNvfiPublisher
78
79 Arguments:
80 tasklet - the tasklet
81 vdur - the VDUR of the VDU whose metrics are published
82
83 """
84 super().__init__(tasklet)
85 self._vnfr_id = vnfr_id
86 self._vdur_id = vdur_id
87
88 self._handle = None
89 self._xpath = VdurNfviMetricsPublisher.XPATH.format(quoted_key(vnfr_id), quoted_key(vdur_id))
90
91 self._deregistered = asyncio.Event(loop=self.loop)
92
93 @property
94 def xpath(self):
95 """The XPATH that the metrics are published on"""
96 return self._xpath
97
98 @asyncio.coroutine
99 def dts_on_prepare(self, xact_info, action, ks_path, msg):
100 """Handles the DTS on_prepare callback"""
101 self.log.debug("{}:dts_on_prepare".format(self.classname))
102
103 if action == rwdts.QueryAction.READ:
104 # If the publisher has been deregistered, the xpath element has
105 # been deleted. So we do not want to publish the metrics and
106 # re-created the element.
107 if not self._deregistered.is_set():
108 metrics = self.tasklet.on_retrieve_nfvi_metrics(self._vdur_id)
109 xact_info.respond_xpath(
110 rwdts.XactRspCode.MORE,
111 self.xpath,
112 metrics,
113 )
114
115 xact_info.respond_xpath(rwdts.XactRspCode.ACK, self.xpath)
116
117 @asyncio.coroutine
118 def register(self):
119 """Register the publisher with DTS"""
120 self._handle = yield from self.dts.register(
121 xpath=self.xpath,
122 handler=rift.tasklets.DTS.RegistrationHandler(
123 on_prepare=self.dts_on_prepare,
124 ),
125 flags=rwdts.Flag.PUBLISHER,
126 )
127
128 def deregister(self):
129 """Deregister the publisher from DTS"""
130 # Mark the publisher for deregistration. This prevents the publisher
131 # from creating an element after it has been deleted.
132 self._deregistered.set()
133
134 # Now that we are done with the registration handle, delete the element
135 # and tell DTS to deregister it
136 self._handle.delete_element(self.xpath)
137 self._handle.deregister()
138 self._handle = None
139
140
141 class RwLogTestTasklet(rift.tasklets.Tasklet):
142 """ A tasklet to test Python rwlog interactions """
143 def __init__(self, *args, **kwargs):
144 super(RwLogTestTasklet, self).__init__(*args, **kwargs)
145 self._dts = None
146 self.rwlog.set_category("rw-logtest-log")
147 self._metrics = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_NfviMetrics()
148
149 def start(self):
150 """ The task start callback """
151 super(RwLogTestTasklet, self).start()
152
153 self._dts = rift.tasklets.DTS(self.tasklet_info,
154 RwVnfrYang.get_schema(),
155 self.loop,
156 self.on_dts_state_change)
157 @property
158 def dts(self):
159 return self._dts
160
161 @asyncio.coroutine
162 def init(self):
163 pass
164
165 def on_retrieve_nfvi_metrics(self, vdur_id):
166 return self._metrics
167
168 @asyncio.coroutine
169 def run(self):
170 def go():
171 account_msg = RwCloudYang.YangData_RwProject_Project_CloudAccounts_CloudAccountList.from_dict({
172 "account_type": "openstack",
173 "openstack": {
174 "key": "admin",
175 "secret": "mypasswd",
176 "auth_url": 'http://10.66.4.18:5000/v3/',
177 "tenant": "demo",
178 "mgmt_network": "private"
179 }
180 })
181
182 account = cloud.CloudAccount(
183 self.log,
184 RwLog.Ctx.new(__file__), account_msg
185 )
186
187 vim_id = "a7f30def-0942-4425-8454-1ffe02b7db1e"
188 instances = 20
189
190 executor = concurrent.futures.ThreadPoolExecutor(10)
191 plugin = rw_peas.PeasPlugin("rwmon_ceilometer", 'RwMon-1.0')
192 impl = plugin.get_interface("Monitoring")
193 while True:
194 tasks = []
195 for _ in range(instances):
196 task = update(self.loop, self.log, executor, account.cal_account_msg, impl, vim_id)
197 tasks.append(task)
198
199 self.log.debug("Running %s update tasks", instances)
200 #self.loop.run_until_complete(asyncio.wait(tasks, loop=self.loop, timeout=20))
201 done, pending = yield from asyncio.wait(tasks, loop=self.loop, timeout=20)
202 self._metrics = done.pop().result()
203
204 self._publisher = VdurNfviMetricsPublisher(self, "a7f30def-0942-4425-8454-1ffe02b7db1e", "a7f30def-0942-4425-8454-1ffe02b7db1e")
205 yield from self._publisher.register()
206 self.loop.create_task(go())
207
208 @asyncio.coroutine
209 def on_dts_state_change(self, state):
210 switch = {
211 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
212 rwdts.State.CONFIG: rwdts.State.RUN,
213 }
214
215 handlers = {
216 rwdts.State.INIT: self.init,
217 rwdts.State.RUN: self.run,
218 }
219
220 # Transition application to next state
221 handler = handlers.get(state, None)
222 if handler is not None:
223 yield from handler()
224
225 # Transition dts to next state
226 next_state = switch.get(state, None)
227 if next_state is not None:
228 self.log.debug("Changing state to %s", next_state)
229 self._dts.handle.set_state(next_state)