1c32fc91c009edc5ea05681688565af2d21076f7
[osm/SO.git] / rwcm / plugins / rwconman / rift / tasklets / rwconmantasklet / jujuconf.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import re
19 import tempfile
20 import yaml
21 import os
22
23 import rift.mano.utils.juju_api as juju
24 from . import riftcm_config_plugin
25
26
27 # Charm service name accepts only a to z and -.
28 def get_vnf_unique_name(nsr_name, vnfr_short_name, member_vnf_index):
29 name = "{}-{}-{}".format(nsr_name, vnfr_short_name, member_vnf_index)
30 new_name = ''
31 for c in name:
32 if c.isdigit():
33 c = chr(97 + int(c))
34 elif not c.isalpha():
35 c = "-"
36 new_name += c
37 return new_name.lower()
38
39
40 class JujuConfigPlugin(riftcm_config_plugin.RiftCMConfigPluginBase):
41 """
42 Juju implementation of the riftcm_config_plugin.RiftCMConfigPluginBase
43 """
44 def __init__(self, dts, log, loop, account):
45 riftcm_config_plugin.RiftCMConfigPluginBase.__init__(self, dts, log, loop, account)
46 self._name = account.name
47 self._type = 'juju'
48 self._ip_address = account.juju.ip_address
49 self._port = account.juju.port
50 self._user = account.juju.user
51 self._secret = account.juju.secret
52 self._rift_install_dir = os.environ['RIFT_INSTALL']
53 self._rift_artif_dir = os.environ['RIFT_ARTIFACTS']
54
55 ############################################################
56 # This is wrongfully overloaded with 'juju' private data. #
57 # Really need to separate agent_vnfr from juju vnfr data. #
58 # Currently, this holds agent_vnfr, which has actual vnfr, #
59 # then this juju overloads actual vnfr with its own #
60 # dictionary elemetns (WRONG!!!) #
61 self._juju_vnfs = {}
62 ############################################################
63
64 self._tasks = {}
65 self._api = juju.JujuApi(log, loop,
66 self._ip_address, self._port,
67 self._user, self._secret)
68
69 @property
70 def name(self):
71 return self._name
72
73 @property
74 def agent_type(self):
75 return self._type
76
77 @property
78 def api(self):
79 return self._api
80
81 @property
82 def vnfr(self, vnfr_id):
83 try:
84 vnfr = self._juju_vnfs[vnfr_id].vnfr
85 except KeyError:
86 self._log.error("jujuCA: Did not find VNFR %s in juju plugin", vnfr_id)
87 return None
88
89 return vnfr
90
91 def juju_log(self, level, name, log_str, *args):
92 if name is not None:
93 g_log_str = 'jujuCA:({}) {}'.format(name, log_str)
94 else:
95 g_log_str = 'jujuCA: {}'.format(log_str)
96 getattr(self._log, level)(g_log_str, *args)
97
98 # TBD: Do a better, similar to config manager
99 def xlate(self, tag, tags):
100 # TBD
101 if tag is None:
102 return tag
103 val = tag
104 if re.search('<.*>', tag):
105 self._log.debug("jujuCA: Xlate value %s", tag)
106 try:
107 if tag == '<rw_mgmt_ip>':
108 val = tags['rw_mgmt_ip']
109 except KeyError as e:
110 self._log.info("jujuCA: Did not get a value for tag %s, e=%s",
111 tag, e)
112 return val
113
114 @asyncio.coroutine
115 def notify_create_vlr(self, agent_nsr, agent_vnfr, vld, vlr):
116 """
117 Notification of create VL record
118 """
119 return True
120
121 @asyncio.coroutine
122 def notify_create_vnfr(self, agent_nsr, agent_vnfr):
123 """
124 Notification of create Network VNF record
125 Returns True if configured using config_agent
126 """
127 # Deploy the charm if specified for the vnf
128 self._log.debug("jujuCA: create vnfr nsr=%s vnfr=%s",
129 agent_nsr.name, agent_vnfr.name)
130 self._log.debug("jujuCA: Config = %s",
131 agent_vnfr.vnf_configuration)
132 try:
133 vnf_config = agent_vnfr.vnfr_msg.vnf_configuration
134 self._log.debug("jujuCA: vnf_configuration = %s", vnf_config)
135 if not vnf_config.has_field('juju'):
136 return False
137 charm = vnf_config.juju.charm
138 self._log.debug("jujuCA: charm = %s", charm)
139 except Exception as e:
140 self._log.Error("jujuCA: vnf_configuration error for vnfr {}: {}".
141 format(agent_vnfr.name, e))
142 return False
143
144 # Prepare unique name for this VNF
145 vnf_unique_name = get_vnf_unique_name(agent_nsr.name,
146 agent_vnfr.name,
147 agent_vnfr.member_vnf_index)
148 if vnf_unique_name in self._tasks:
149 self._log.warn("jujuCA: Service %s already deployed",
150 vnf_unique_name)
151
152 vnfr_dict = agent_vnfr.vnfr
153 vnfr_dict.update({'vnf_juju_name': vnf_unique_name,
154 'charm': charm,
155 'nsr_id': agent_nsr.id,
156 'member_vnf_index': agent_vnfr.member_vnf_index,
157 'tags': {},
158 'active': False,
159 'config': vnf_config,
160 'vnfr_name' : agent_vnfr.name})
161 self._log.debug("jujuCA: Charm %s for vnf %s to be deployed as %s",
162 charm, agent_vnfr.name, vnf_unique_name)
163
164 # Find the charm directory
165 try:
166 path = os.path.join(self._rift_artif_dir,
167 'launchpad/libs',
168 agent_vnfr.vnfr_msg.vnfd_ref,
169 'charms/trusty',
170 charm)
171 self._log.debug("jujuCA: Charm dir is {}".format(path))
172 if not os.path.isdir(path):
173 self._log.error("jujuCA: Did not find the charm directory at {}".
174 format(path))
175 path = None
176 except Exception as e:
177 self.log.exception(e)
178 return False
179
180 if vnf_unique_name not in self._tasks:
181 self._tasks[vnf_unique_name] = {}
182
183 self._tasks[vnf_unique_name]['deploy'] = self.loop.create_task(
184 self.api.deploy_service(charm, vnf_unique_name, path=path))
185
186 self._log.debug("jujuCA: Deploying service %s",
187 vnf_unique_name)
188
189 return True
190
191 @asyncio.coroutine
192 def notify_instantiate_vnfr(self, agent_nsr, agent_vnfr):
193 """
194 Notification of Instantiate NSR with the passed nsr id
195 """
196 return True
197
198 @asyncio.coroutine
199 def notify_instantiate_vlr(self, agent_nsr, agent_vnfr, vlr):
200 """
201 Notification of Instantiate NSR with the passed nsr id
202 """
203 return True
204
205 @asyncio.coroutine
206 def notify_terminate_nsr(self, agent_nsr, agent_vnfr):
207 """
208 Notification of Terminate the network service
209 """
210 return True
211
212 @asyncio.coroutine
213 def notify_terminate_vnfr(self, agent_nsr, agent_vnfr):
214 """
215 Notification of Terminate the network service
216 """
217 self._log.debug("jujuCA: Terminate VNFr {}, current vnfrs={}".
218 format(agent_vnfr.name, self._juju_vnfs))
219 try:
220 vnfr = agent_vnfr.vnfr
221 service = vnfr['vnf_juju_name']
222
223 self._log.debug ("jujuCA: Terminating VNFr %s, %s",
224 agent_vnfr.name, service)
225 self._tasks[service]['destroy'] = self.loop.create_task(
226 self.api.destroy_service(service)
227 )
228
229 del self._juju_vnfs[agent_vnfr.id]
230 self._log.debug ("jujuCA: current vnfrs={}".
231 format(self._juju_vnfs))
232 if service in self._tasks:
233 tasks = []
234 for action in self._tasks[service].keys():
235 #if self.check_task_status(service, action):
236 tasks.append(action)
237 del tasks
238 except KeyError as e:
239 self._log.debug ("jujuCA: Termiating charm service for VNFr {}, e={}".
240 format(agent_vnfr.name, e))
241 except Exception as e:
242 self._log.error("jujuCA: Exception terminating charm service for VNFR {}: {}".
243 format(agent_vnfr.name, e))
244
245 return True
246
247 @asyncio.coroutine
248 def notify_terminate_vlr(self, agent_nsr, agent_vnfr, vlr):
249 """
250 Notification of Terminate the virtual link
251 """
252 return True
253
254 def check_task_status(self, service, action):
255 #self.log.debug("jujuCA: check task status for %s, %s" % (service, action))
256 try:
257 task = self._tasks[service][action]
258 if task.done():
259 self.log.debug("jujuCA: Task for %s, %s done" % (service, action))
260 e = task.exception()
261 if e:
262 self.log.error("jujuCA: Error in task for {} and {} : {}".
263 format(service, action, e))
264 raise Exception(e)
265 r= task.result()
266 if r:
267 self.log.debug("jujuCA: Task for {} and {}, returned {}".
268 format(service, action,r))
269 return True
270 else:
271 self.log.debug("jujuCA: task {}, {} not done".
272 format(service, action))
273 return False
274 except KeyError as e:
275 self.log.error("jujuCA: KeyError for task for {} and {}: {}".
276 format(service, action, e))
277 except Exception as e:
278 self.log.error("jujuCA: Error for task for {} and {}: {}".
279 format(service, action, e))
280 raise
281 return True
282
283 @asyncio.coroutine
284 def vnf_config_primitive(self, nsr_id, vnfr_id, primitive, output):
285 self._log.debug("jujuCA: VNF config primititve {} for nsr {}, vnfr_id {}".
286 format(primitive, nsr_id, vnfr_id))
287 output.execution_status = "failed"
288 output.execution_id = ''
289 output.execution_error_details = ''
290
291 try:
292 vnfr = self._juju_vnfs[vnfr_id].vnfr
293 except KeyError:
294 self._log.error("jujuCA: Did not find VNFR %s in juju plugin",
295 vnfr_id)
296 return
297
298 try:
299 service = vnfr['vnf_juju_name']
300 vnf_config = vnfr['config']
301 self._log.debug("VNF config %s", vnf_config)
302 configs = vnf_config.service_primitive
303 for config in configs:
304 if config.name == primitive.name:
305 self._log.debug("jujuCA: Found the config primitive %s",
306 config.name)
307 params = {}
308 for parameter in primitive.parameter:
309 if parameter.value:
310 val = self.xlate(parameter.value, vnfr['tags'])
311 # TBD do validation of the parameters
312 data_type = 'string'
313 found = False
314 for ca_param in config.parameter:
315 if ca_param.name == parameter.name:
316 data_type = ca_param.data_type
317 found = True
318 break
319 if data_type == 'integer':
320 val = int(parameter.value)
321 if not found:
322 self._log.warn("jujuCA: Did not find parameter {} for {}".
323 format(parameter, config.name))
324 params.update({parameter.name: val})
325
326 if config.name == 'config':
327 if len(params):
328 self._log.debug("jujuCA: applying config with params {} for service {}".
329 format(params, service))
330
331 rc = yield from self.api.apply_config(params, service=service)
332
333 if rc:
334 output.execution_status = "completed"
335 self._log.debug("jujuCA: applied config {} on {}".
336 format(params, service))
337 else:
338 output.execution_status = 'failed'
339 output.execution_error_Details = \
340 'Failed to apply config: {}'.format(params)
341 self._log.error("jujuCA: Error applying config {} on service {}".
342 format(params, service))
343 else:
344 self._log.warn("jujuCA: Did not find valid paramaters for config : {}".
345 format(primitive.parameter))
346 else:
347 self._log.debug("jujuCA: Execute action {} on service {} with params {}".
348 format(config.name, service, params))
349
350 resp = yield from self.api.execute_action(config.name,
351 params,
352 service=service)
353
354 if resp:
355 if 'error' in resp:
356 output.execution_error_details = resp['error']['Message']
357 else:
358 output.execution_id = resp['action']['tag']
359 output.execution_status = resp['status']
360 if output.execution_status == 'failed':
361 output.execution_error_details = resp['message']
362 self._log.debug("jujuCA: execute action {} on service {} returned {}".
363 format(config.name, service, output.execution_status))
364 else:
365 self._log.error("jujuCA: error executing action {} for {} with {}".
366 format(config.name, service, params))
367 output.execution_id = ''
368 output.execution_status = 'failed'
369 output.execution_error_details = "Failed to queue the action"
370 break
371
372 except KeyError as e:
373 self._log.info("VNF %s does not have config primititves, e=%s", vnfr_id, e)
374
375 @asyncio.coroutine
376 def apply_config(self, agent_nsr, agent_vnfr, config, rpc_ip):
377 """ Notification on configuration of an NSR """
378 pass
379
380 @asyncio.coroutine
381 def apply_ns_config(self, agent_nsr, agent_vnfrs, rpc_ip):
382 """
383
384 ###### TBD - This really does not belong here. Looks more like NS level script ####
385 ###### apply_config should be called for a particular VNF only here ###############
386
387 Hook: Runs the user defined script. Feeds all the necessary data
388 for the script thro' yaml file.
389
390 Args:
391 rpc_ip (YangInput_Nsr_ExecNsConfigPrimitive): The input data.
392 nsr (NetworkServiceRecord): Description
393 vnfrs (dict): VNFR ID => VirtualNetworkFunctionRecord
394
395 """
396 def get_meta(agent_nsr):
397 unit_names, initial_params, vnfr_index_map = {}, {}, {}
398
399 for vnfr_id in agent_nsr.vnfr_ids:
400 juju_vnf = self._juju_vnfs[vnfr_id].vnfr
401
402 # Vnfr -> index ref
403 vnfr_index_map[vnfr_id] = juju_vnf['member_vnf_index']
404
405 # Unit name
406 unit_names[vnfr_id] = juju_vnf['vnf_juju_name']
407
408 # Flatten the data for simplicity
409 param_data = {}
410 self._log.debug("Juju Config:%s", juju_vnf['config'])
411 for primitive in juju_vnf['config'].initial_config_primitive:
412 for parameter in primitive.parameter:
413 value = self.xlate(parameter.value, juju_vnf['tags'])
414 param_data[parameter.name] = value
415
416 initial_params[vnfr_id] = param_data
417
418
419 return unit_names, initial_params, vnfr_index_map
420
421 unit_names, init_data, vnfr_index_map = get_meta(agent_nsr)
422
423 # The data consists of 4 sections
424 # 1. Account data
425 # 2. The input passed.
426 # 3. Juju unit names (keyed by vnfr ID).
427 # 4. Initial config data (keyed by vnfr ID).
428 data = dict()
429 data['config_agent'] = dict(
430 name=self._name,
431 host=self._ip_address,
432 port=self._port,
433 user=self._user,
434 secret=self._secret
435 )
436 data["rpc_ip"] = rpc_ip.as_dict()
437 data["unit_names"] = unit_names
438 data["init_config"] = init_data
439 data["vnfr_index_map"] = vnfr_index_map
440
441 tmp_file = None
442 with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
443 tmp_file.write(yaml.dump(data, default_flow_style=True)
444 .encode("UTF-8"))
445
446 self._log.debug("jujuCA: Creating a temp file: {} with input data".format(
447 tmp_file.name))
448
449 # Get the full path to the script
450 script = ''
451 if rpc_ip.user_defined_script[0] == '/':
452 # The script has full path, use as is
453 script = rpc_ip.user_defined_script
454 else:
455 script = os.path.join(self._rift_artif_dir, 'launchpad/libs', agent_nsr.id, 'scripts',
456 rpc_ip.user_defined_script)
457 self.log.debug("jujuCA: Checking for script in %s", script)
458 if not os.path.exists(script):
459 script = os.path.join(self._rift_install_dir, 'usr/bin', rpc_ip.user_defined_script)
460
461 cmd = "{} {}".format(rpc_ip.user_defined_script, tmp_file.name)
462 self._log.debug("jujuCA: Running the CMD: {}".format(cmd))
463
464 coro = asyncio.create_subprocess_shell(cmd, loop=self._loop,
465 stderr=asyncio.subprocess.PIPE)
466 process = yield from coro
467 err = yield from process.stderr.read()
468 task = self._loop.create_task(process.wait())
469
470 return task, err
471
472 @asyncio.coroutine
473 def apply_initial_config(self, agent_nsr, agent_vnfr):
474 """
475 Apply the initial configuration
476 Expect config directives mostly, not actions
477 Actions in initial config may not work based on charm design
478 """
479
480 vnfr = agent_vnfr.vnfr
481 service = vnfr['vnf_juju_name']
482
483 rc = yield from self.api.is_service_up(service=service)
484 if not rc:
485 return False
486
487 action_ids = []
488 try:
489 vnf_cat = agent_vnfr.vnfr_msg
490 if vnf_cat and vnf_cat.mgmt_interface.ip_address:
491 vnfr['tags'].update({'rw_mgmt_ip': vnf_cat.mgmt_interface.ip_address})
492 self._log.debug("jujuCA:(%s) tags: %s", vnfr['vnf_juju_name'], vnfr['tags'])
493
494 config = {}
495 try:
496 for primitive in vnfr['config'].initial_config_primitive:
497 self._log.debug("jujuCA:(%s) Initial config primitive %s", vnfr['vnf_juju_name'], primitive)
498 if primitive.name == 'config':
499 for param in primitive.parameter:
500 if vnfr['tags']:
501 val = self.xlate(param.value, vnfr['tags'])
502 config.update({param.name: val})
503 except KeyError as e:
504 self._log.exception("jujuCA:(%s) Initial config error(%s): config=%s",
505 vnfr['vnf_juju_name'], str(e), config)
506 config = None
507 return False
508
509 if config:
510 self.juju_log('info', vnfr['vnf_juju_name'],
511 "Applying Initial config:%s",
512 config)
513
514 rc = yield from self.api.apply_config(config, service=service)
515 if rc is False:
516 self.log.error("Service {} is in error state".format(service))
517 return False
518
519
520 # Apply any actions specified as part of initial config
521 for primitive in vnfr['config'].initial_config_primitive:
522 if primitive.name != 'config':
523 self._log.debug("jujuCA:(%s) Initial config action primitive %s",
524 vnfr['vnf_juju_name'], primitive)
525 action = primitive.name
526 params = {}
527 for param in primitive.parameter:
528 val = self.xlate(param.value, vnfr['tags'])
529 params.update({param.name: val})
530
531 self._log.info("jujuCA:(%s) Action %s with params %s",
532 vnfr['vnf_juju_name'], action, params)
533
534 resp = yield from self.api.execute_actions(action, params,
535 service=service)
536 if 'error' in resp:
537 self._log.error("Applying initial config failed: {}".
538 format(resp))
539 return False
540
541 action_ids.append(resp['action']['tag'])
542
543 except KeyError as e:
544 self._log.info("Juju config agent(%s): VNFR %s not managed by Juju",
545 vnfr['vnf_juju_name'], agent_vnfr.id)
546 return False
547 except Exception as e:
548 self._log.exception("jujuCA:(%s) Exception juju apply_initial_config for VNFR {}: {}".
549 format(vnfr['vnf_juju_name'], agent_vnfr.id, e))
550 return False
551
552 # Check if all actions completed
553 pending = True
554 while pending:
555 pending = False
556 for act in action_ids:
557 resp = yield from self.api.get_action_status(act, service=service)
558 if 'error' in resp:
559 self._log.error("Initial config failed: {}".format(resp))
560 return False
561
562 if resp['status'] == 'failed':
563 self._log.error("Initial config action failed: {}".format(resp))
564 return False
565
566 if resp['status'] == 'pending':
567 pending = True
568
569 return True
570
571 def add_vnfr_managed(self, agent_vnfr):
572 if agent_vnfr.id not in self._juju_vnfs.keys():
573 self._log.info("juju config agent: add vnfr={}/{}".
574 format(agent_vnfr.name, agent_vnfr.id))
575 self._juju_vnfs[agent_vnfr.id] = agent_vnfr
576
577 def is_vnfr_managed(self, vnfr_id):
578 try:
579 if vnfr_id in self._juju_vnfs:
580 return True
581 except Exception as e:
582 self._log.debug("jujuCA: Is VNFR {} managed: {}".
583 format(vnfr_id, e))
584 return False
585
586 @asyncio.coroutine
587 def is_configured(self, vnfr_id):
588 try:
589 agent_vnfr = self._juju_vnfs[vnfr_id]
590 vnfr = agent_vnfr.vnfr
591 if vnfr['active']:
592 return True
593
594 vnfr = self._juju_vnfs[vnfr_id].vnfr
595 service = vnfr['vnf_juju_name']
596 resp = self.api.is_service_active(service=service)
597 self._juju_vnfs[vnfr_id]['active'] = resp
598 self._log.debug("jujuCA: Service state for {} is {}".
599 format(service, resp))
600 return resp
601
602 except KeyError:
603 self._log.debug("jujuCA: VNFR id {} not found in config agent".
604 format(vnfr_id))
605 return False
606 except Exception as e:
607 self._log.error("jujuCA: VNFR id {} is_configured: {}".
608 format(vnfr_id, e))
609 return False
610
611 @asyncio.coroutine
612 def get_config_status(self, agent_nsr, agent_vnfr):
613 """Get the configuration status for the VNF"""
614 rc = 'unknown'
615
616 try:
617 vnfr = agent_vnfr.vnfr
618 service = vnfr['vnf_juju_name']
619 except KeyError:
620 # This VNF is not managed by Juju
621 return rc
622
623 rc = 'configuring'
624
625 if not self.check_task_status(service, 'deploy'):
626 return rc
627
628 try:
629 resp = yield from self.api.get_service_status(service=service)
630 self._log.debug("jujuCA: Get service %s status? %s", service, resp)
631
632 if resp == 'error':
633 return 'error'
634 if resp == 'active':
635 return 'configured'
636 except KeyError:
637 self._log.error("jujuCA: Check unknown service %s status", service)
638 except Exception as e:
639 self._log.error("jujuCA: Caught exception when checking for service is active: %s", e)
640 self._log.exception(e)
641
642 return rc
643
644 def get_action_status(self, execution_id):
645 ''' Get the action status for an execution ID
646 *** Make sure this is NOT a asyncio coroutine function ***
647 '''
648
649 try:
650 return self.api._get_action_status(execution_id)
651 except Exception as e:
652 self._log.error("jujuCA: Error fetching execution status for %s",
653 execution_id)
654 self._log.exception(e)
655 raise e