Merge from OSM SO master
[osm/SO.git] / rwcm / plugins / rwconman / rift / tasklets / rwconmantasklet / jujuconf.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import os
19 import re
20 import tempfile
21 import time
22 import yaml
23
24 import rift.mano.utils.juju_api as juju
25 from . import riftcm_config_plugin
26
27
28 # Charm service name accepts only a to z and -.
def get_vnf_unique_name(nsr_name, vnfr_short_name, member_vnf_index):
    """Build a charm service name that is unique per VNF within an NS.

    The three parts are joined with '-' and then sanitised so that the
    result contains only the characters 'a'-'z' and '-':

    * ASCII digits are mapped to letters ('0' -> 'a', ..., '9' -> 'j'),
    * ASCII letters are lower-cased,
    * every other character (including non-ASCII letters and digits)
      becomes '-'.

    Args:
        nsr_name: Name of the network service record.
        vnfr_short_name: Short name of the VNF record.
        member_vnf_index: Index of the VNF within the network service.

    Returns:
        The sanitised, lower-case service name.
    """
    name = "{}-{}-{}".format(nsr_name, vnfr_short_name, member_vnf_index)

    def _sanitise(c):
        # Explicit ASCII ranges: str.isdigit()/isalpha() also accept
        # non-ASCII characters, which would either crash int() (e.g.
        # superscript digits) or leak characters outside a-z.
        if '0' <= c <= '9':
            return chr(ord('a') + int(c))
        if 'a' <= c <= 'z':
            return c
        if 'A' <= c <= 'Z':
            return c.lower()
        return '-'

    return ''.join(_sanitise(c) for c in name)
39
40
class JujuConfigPlugin(riftcm_config_plugin.RiftCMConfigPluginBase):
    """
    Juju implementation of the riftcm_config_plugin.RiftCMConfigPluginBase

    Deploys a charm per VNF through juju.JujuApi, applies configuration
    and actions on the resulting service and reports its status.
    """

    # Seconds to wait between two polls of pending initial-config actions.
    _ACTION_POLL_INTERVAL = 1

    def __init__(self, dts, log, loop, project, account):
        """Store the Juju account details and create the API client.

        Args:
            dts, log, loop, project: Passed through to the base class.
            account: Config agent account; its `name` and `juju`
                (ip_address, port, user, secret) fields are read here.

        Raises:
            KeyError: If RIFT_INSTALL or RIFT_ARTIFACTS is not set in the
                environment.
        """
        riftcm_config_plugin.RiftCMConfigPluginBase.__init__(self, dts, log, loop,
                                                             project, account)
        self._name = account.name
        self._type = 'juju'
        self._ip_address = account.juju.ip_address
        self._port = account.juju.port
        self._user = account.juju.user
        self._secret = account.juju.secret
        self._rift_install_dir = os.environ['RIFT_INSTALL']
        self._rift_artif_dir = os.environ['RIFT_ARTIFACTS']

        ############################################################
        # This is wrongfully overloaded with 'juju' private data.  #
        # Really need to separate agent_vnfr from juju vnfr data.  #
        # Currently, this holds agent_vnfr, which has actual vnfr, #
        # then this juju overloads actual vnfr with its own        #
        # dictionary elements (WRONG!!!)                           #
        self._juju_vnfs = {}
        ############################################################

        # service name -> {action name ('deploy'/'destroy') -> asyncio task}
        self._tasks = {}
        self._api = juju.JujuApi(log, loop,
                                 self._ip_address, self._port,
                                 self._user, self._secret)

    @property
    def name(self):
        """Name of the config agent account."""
        return self._name

    @property
    def agent_type(self):
        """Agent type string, always 'juju'."""
        return self._type

    @property
    def api(self):
        """The juju.JujuApi client created in __init__."""
        return self._api

    @property
    def agent_data(self):
        """Account details as a plain dict (includes the secret)."""
        return dict(
            type=self.agent_type,
            name=self.name,
            host=self._ip_address,
            port=self._port,
            user=self._user,
            secret=self._secret
        )

    def vnfr(self, vnfr_id):
        """Return the vnfr dict of a managed VNFR, or None if unknown."""
        try:
            vnfr = self._juju_vnfs[vnfr_id].vnfr
        except KeyError:
            self._log.error("jujuCA: Did not find VNFR %s in juju plugin", vnfr_id)
            return None

        return vnfr

    def get_service_name(self, vnfr_id):
        """Return the Juju service name of a managed VNFR, or None."""
        vnfr = self.vnfr(vnfr_id)
        if vnfr and 'vnf_juju_name' in vnfr:
            return vnfr['vnf_juju_name']
        return None

    def juju_log(self, level, name, log_str, *args):
        """Log `log_str` at `level` with a 'jujuCA' prefix.

        Args:
            level: Name of the logger method to call (e.g. 'info').
            name: Optional service name added to the prefix.
            log_str: %-style message.
            *args: Arguments for `log_str`.
        """
        if name is not None:
            g_log_str = 'jujuCA:({}) {}'.format(name, log_str)
        else:
            g_log_str = 'jujuCA: {}'.format(log_str)
        getattr(self._log, level)(g_log_str, *args)

    # TBD: Do a better, similar to config manager
    def xlate(self, tag, tags):
        """Translate a '<...>' placeholder into its value from `tags`.

        Only '<rw_mgmt_ip>' is currently translated; any other value,
        or a placeholder with no entry in `tags`, is returned unchanged.
        """
        # TBD
        if tag is None:
            return tag
        val = tag
        if re.search('<.*>', tag):
            self._log.debug("jujuCA: Xlate value %s", tag)
            try:
                if tag == '<rw_mgmt_ip>':
                    val = tags['rw_mgmt_ip']
            except KeyError as e:
                self._log.info("jujuCA: Did not get a value for tag %s, e=%s",
                               tag, e)
        return val

    @asyncio.coroutine
    def notify_create_vlr(self, agent_nsr, agent_vnfr, vld, vlr):
        """
        Notification of create VL record
        """
        return True

    @asyncio.coroutine
    def notify_create_vnfr(self, agent_nsr, agent_vnfr):
        """
        Notification of create Network VNF record
        Returns True if configured using config_agent

        Records the Juju bookkeeping in agent_vnfr.vnfr and schedules the
        charm deployment as a task under self._tasks[<service>]['deploy'].
        """
        # Deploy the charm if specified for the vnf
        self._log.debug("jujuCA: create vnfr nsr=%s vnfr=%s",
                        agent_nsr.name, agent_vnfr.name)
        self._log.debug("jujuCA: Config = %s",
                        agent_vnfr.vnf_configuration)
        try:
            vnf_config = agent_vnfr.vnfr_msg.vnf_configuration
            self._log.debug("jujuCA: vnf_configuration = %s", vnf_config)
            if not vnf_config.has_field('juju'):
                return False
            charm = vnf_config.juju.charm
            self._log.debug("jujuCA: charm = %s", charm)
        except Exception as e:
            self._log.error("jujuCA: vnf_configuration error for vnfr {}: {}".
                            format(agent_vnfr.name, e))
            return False

        # Prepare unique name for this VNF
        vnf_unique_name = get_vnf_unique_name(agent_nsr.name,
                                              agent_vnfr.name,
                                              agent_vnfr.member_vnf_index)
        if vnf_unique_name in self._tasks:
            self._log.warning("jujuCA: Service %s already deployed",
                              vnf_unique_name)

        vnfr_dict = agent_vnfr.vnfr
        vnfr_dict.update({'vnf_juju_name': vnf_unique_name,
                          'charm': charm,
                          'nsr_id': agent_nsr.id,
                          'member_vnf_index': agent_vnfr.member_vnf_index,
                          'tags': {},
                          'active': False,
                          'config': vnf_config,
                          'vnfr_name': agent_vnfr.name})
        self._log.debug("jujuCA: Charm %s for vnf %s to be deployed as %s",
                        charm, agent_vnfr.name, vnf_unique_name)

        # Find the charm directory
        try:
            path = os.path.join(self._rift_artif_dir,
                                'launchpad/libs',
                                agent_vnfr.vnfr_msg.vnfd.id,
                                'charms/trusty',
                                charm)
            self._log.debug("jujuCA: Charm dir is {}".format(path))
            if not os.path.isdir(path):
                self._log.error("jujuCA: Did not find the charm directory at {}".
                                format(path))
                # No local charm; deploy_service is called with path=None.
                path = None
        except Exception as e:
            self._log.exception(e)
            return False

        service_tasks = self._tasks.setdefault(vnf_unique_name, {})
        service_tasks['deploy'] = self._loop.create_task(
            self.api.deploy_service(charm, vnf_unique_name, path=path))

        self._log.debug("jujuCA: Deploying service %s",
                        vnf_unique_name)

        return True

    @asyncio.coroutine
    def notify_instantiate_vnfr(self, agent_nsr, agent_vnfr):
        """
        Notification of Instantiate NSR with the passed nsr id
        """
        return True

    @asyncio.coroutine
    def notify_instantiate_vlr(self, agent_nsr, agent_vnfr, vlr):
        """
        Notification of Instantiate NSR with the passed nsr id
        """
        return True

    @asyncio.coroutine
    def notify_terminate_nsr(self, agent_nsr, agent_vnfr):
        """
        Notification of Terminate the network service
        """
        return True

    @asyncio.coroutine
    def notify_terminate_vnfr(self, agent_nsr, agent_vnfr):
        """
        Notification of Terminate the network service

        Schedules destruction of the VNF's Juju service and stops
        tracking the VNFR. Always returns True; failures are only logged.
        """
        self._log.debug("jujuCA: Terminate VNFr {}, current vnfrs={}".
                        format(agent_vnfr.name, self._juju_vnfs))
        try:
            vnfr = agent_vnfr.vnfr
            service = vnfr['vnf_juju_name']

            self._log.debug("jujuCA: Terminating VNFr %s, %s",
                            agent_vnfr.name, service)
            # setdefault: the service may have no task entry yet; a plain
            # index would raise after the destroy task was already created,
            # leaving it untracked and the VNFR entry below in place.
            self._tasks.setdefault(service, {})['destroy'] = self._loop.create_task(
                self.api.destroy_service(service)
            )

            self._juju_vnfs.pop(agent_vnfr.id, None)
            self._log.debug("jujuCA: current vnfrs={}".
                            format(self._juju_vnfs))
        except KeyError as e:
            # vnfr has no 'vnf_juju_name': nothing was deployed for it.
            self._log.debug("jujuCA: Termiating charm service for VNFr {}, e={}".
                            format(agent_vnfr.name, e))
        except Exception as e:
            self._log.error("jujuCA: Exception terminating charm service for VNFR {}: {}".
                            format(agent_vnfr.name, e))

        return True

    @asyncio.coroutine
    def notify_terminate_vlr(self, agent_nsr, agent_vnfr, vlr):
        """
        Notification of Terminate the virtual link
        """
        return True

    def check_task_status(self, service, action):
        """Report whether the task recorded for (service, action) is done.

        Returns:
            True if the task finished successfully or no such task is
            recorded (an error is logged in that case); False if the task
            is still running.

        Raises:
            Exception: If the task finished with an exception, or its
                state could not be read.
        """
        try:
            task = self._tasks[service][action]
        except KeyError as e:
            self._log.error("jujuCA: KeyError for task for {} and {}: {}".
                            format(service, action, e))
            return True

        try:
            if not task.done():
                self._log.debug("jujuCA: task {}, {} not done".
                                format(service, action))
                return False

            self._log.debug("jujuCA: Task for %s, %s done" % (service, action))
            e = task.exception()
            if e:
                self._log.error("jujuCA: Error in task for {} and {} : {}".
                                format(service, action, e))
                raise Exception(e)
            r = task.result()
            if r:
                self._log.debug("jujuCA: Task for {} and {}, returned {}".
                                format(service, action, r))
            return True
        except Exception as e:
            self._log.error("jujuCA: Error for task for {} and {}: {}".
                            format(service, action, e))
            raise

    def _primitive_params(self, primitive, config, tags):
        """Build the parameter dict for a service primitive invocation.

        Only parameters with a truthy value are included. Values are
        translated with xlate() and converted to int when the matching
        descriptor parameter declares data type 'INTEGER'.
        """
        params = {}
        for parameter in primitive.parameter:
            if not parameter.value:
                continue

            val = self.xlate(parameter.value, tags)
            # TBD do validation of the parameters
            data_type = 'STRING'
            found = False
            for ca_param in config.parameter:
                if ca_param.name == parameter.name:
                    data_type = ca_param.data_type
                    found = True
                    break

            if data_type == 'INTEGER':
                try:
                    val = int(val)
                except (TypeError, ValueError) as e:
                    # Best effort: pass the untranslated value through.
                    self._log.warning("jujuCA: Could not convert parameter {} "
                                      "value {} to integer: {}".
                                      format(parameter.name, val, e))

            if not found:
                self._log.warning("jujuCA: Did not find parameter {} for {}".
                                  format(parameter, config.name))
            params.update({parameter.name: val})
        return params

    @asyncio.coroutine
    def vnf_config_primitive(self, nsr_id, vnfr_id, primitive, output):
        """Run a service primitive on the Juju service of a VNFR.

        A primitive named 'config' is applied as service configuration;
        any other name is executed as a charm action. The outcome is
        written to `output` (execution_status, execution_id,
        execution_error_details); nothing is returned.
        """
        self._log.debug("jujuCA: VNF config primititve {} for nsr {}, vnfr_id {}".
                        format(primitive, nsr_id, vnfr_id))
        try:
            vnfr = self._juju_vnfs[vnfr_id].vnfr
        except KeyError:
            self._log.error("jujuCA: Did not find VNFR %s in juju plugin",
                            vnfr_id)
            return

        output.execution_status = "failed"
        output.execution_id = ''
        output.execution_error_details = ''

        try:
            service = vnfr['vnf_juju_name']
            vnf_config = vnfr['config']
            self._log.debug("VNF config %s", vnf_config)
            configs = vnf_config.service_primitive
            for config in configs:
                if config.name != primitive.name:
                    continue

                self._log.debug("jujuCA: Found the config primitive %s",
                                config.name)
                params = self._primitive_params(primitive, config, vnfr['tags'])

                if config.name == 'config':
                    output.execution_id = 'config'
                    if params:
                        self._log.debug("jujuCA: applying config with params {} for service {}".
                                        format(params, service))

                        rc = yield from self.api.apply_config(params, service=service, wait=False)

                        if rc:
                            # Mark as pending and check later for the status
                            output.execution_status = "pending"
                            self._log.debug("jujuCA: applied config {} on {}".
                                            format(params, service))
                        else:
                            output.execution_status = 'failed'
                            output.execution_error_details = \
                                'Failed to apply config: {}'.format(params)
                            self._log.error("jujuCA: Error applying config {} on service {}".
                                            format(params, service))
                    else:
                        self._log.warning("jujuCA: Did not find valid parameters for config : {}".
                                          format(primitive.parameter))
                        output.execution_status = "completed"
                else:
                    self._log.debug("jujuCA: Execute action {} on service {} with params {}".
                                    format(config.name, service, params))

                    resp = yield from self.api.execute_action(config.name,
                                                              params,
                                                              service=service)

                    if resp:
                        if 'error' in resp:
                            output.execution_error_details = resp['error']['Message']
                        else:
                            output.execution_id = resp['action']['tag']
                            output.execution_status = resp['status']
                            if output.execution_status == 'failed':
                                output.execution_error_details = resp['message']
                        self._log.debug("jujuCA: execute action {} on service {} returned {}".
                                        format(config.name, service, output.execution_status))
                    else:
                        self._log.error("jujuCA: error executing action {} for {} with {}".
                                        format(config.name, service, params))
                        output.execution_id = ''
                        output.execution_status = 'failed'
                        output.execution_error_details = "Failed to queue the action"
                # Only the first matching primitive is run.
                break

        except KeyError as e:
            self._log.info("VNF %s does not have config primititves, e=%s", vnfr_id, e)

    @asyncio.coroutine
    def apply_config(self, agent_nsr, agent_vnfr, config, rpc_ip):
        """ Notification on configuration of an NSR """
        pass

    @asyncio.coroutine
    def apply_ns_config(self, agent_nsr, agent_vnfrs, rpc_ip):
        """

        ###### TBD - This really does not belong here. Looks more like NS level script ####
        ###### apply_config should be called for a particular VNF only here ###############

        Hook: Runs the user defined script. Feeds all the necessary data
        for the script thro' yaml file.

        Args:
            agent_nsr: NSR whose vnfr_ids are looked up in this plugin.
            agent_vnfrs: Not used by this implementation.
            rpc_ip (YangInput_Nsr_ExecNsConfigPrimitive): The input data.

        Returns:
            Tuple (task, err): a task waiting for the script to exit and
            the bytes read from the script's stderr.

        Raises:
            KeyError: If a VNFR of the NSR is not managed by this plugin.
        """
        def get_meta(agent_nsr):
            unit_names, initial_params, vnfr_index_map = {}, {}, {}

            for vnfr_id in agent_nsr.vnfr_ids:
                juju_vnf = self._juju_vnfs[vnfr_id].vnfr

                # Vnfr -> index ref
                vnfr_index_map[vnfr_id] = juju_vnf['member_vnf_index']

                # Unit name
                unit_names[vnfr_id] = juju_vnf['vnf_juju_name']

                # Flatten the data for simplicity
                param_data = {}
                self._log.debug("Juju Config:%s", juju_vnf['config'])
                for primitive in juju_vnf['config'].initial_config_primitive:
                    for parameter in primitive.parameter:
                        value = self.xlate(parameter.value, juju_vnf['tags'])
                        param_data[parameter.name] = value

                initial_params[vnfr_id] = param_data

            return unit_names, initial_params, vnfr_index_map

        unit_names, init_data, vnfr_index_map = get_meta(agent_nsr)

        # The data consists of 4 sections
        # 1. Account data
        # 2. The input passed.
        # 3. Juju unit names (keyed by vnfr ID).
        # 4. Initial config data (keyed by vnfr ID).
        data = dict()
        data['config_agent'] = dict(
            name=self._name,
            host=self._ip_address,
            port=self._port,
            user=self._user,
            secret=self._secret
        )
        data["rpc_ip"] = rpc_ip.as_dict()
        data["unit_names"] = unit_names
        data["init_config"] = init_data
        data["vnfr_index_map"] = vnfr_index_map

        # delete=False: the script reads this file after the handle is
        # closed, so it must outlive the with-block.
        with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
            tmp_file.write(yaml.dump(data, default_flow_style=True)
                           .encode("UTF-8"))

        self._log.debug("jujuCA: Creating a temp file: {} with input data".format(
            tmp_file.name))

        # Get the full path to the script
        if rpc_ip.user_defined_script.startswith('/'):
            # The script has full path, use as is
            script = rpc_ip.user_defined_script
        else:
            script = os.path.join(self._rift_artif_dir, 'launchpad/libs', agent_nsr.id, 'scripts',
                                  rpc_ip.user_defined_script)
            self._log.debug("jujuCA: Checking for script in %s", script)
            if not os.path.exists(script):
                script = os.path.join(self._rift_install_dir, 'usr/bin', rpc_ip.user_defined_script)

        # Run the resolved path, not the raw name from the RPC input.
        # NOTE(review): the command goes through a shell unquoted; confirm
        # that user_defined_script only ever comes from trusted input.
        cmd = "{} {}".format(script, tmp_file.name)
        self._log.debug("jujuCA: Running the CMD: {}".format(cmd))

        coro = asyncio.create_subprocess_shell(cmd, loop=self._loop,
                                               stderr=asyncio.subprocess.PIPE)
        process = yield from coro
        err = yield from process.stderr.read()
        task = self._loop.create_task(process.wait())

        return task, err

    @asyncio.coroutine
    def apply_initial_config(self, agent_nsr, agent_vnfr):
        """
        Apply the initial configuration
        Expect config directives mostly, not actions
        Actions in initial config may not work based on charm design

        Returns:
            True when the config was applied and every queued action left
            the 'pending' state without failing; False otherwise,
            including when the service is not up yet.

        Raises:
            KeyError: If agent_vnfr.vnfr has no 'vnf_juju_name'.
        """

        vnfr = agent_vnfr.vnfr
        service = vnfr['vnf_juju_name']

        rc = yield from self.api.is_service_up(service=service)
        if not rc:
            return False

        action_ids = []
        try:
            vnf_cat = agent_vnfr.vnfr_msg
            if vnf_cat and vnf_cat.mgmt_interface.ip_address:
                vnfr['tags'].update({'rw_mgmt_ip': vnf_cat.mgmt_interface.ip_address})
                self._log.debug("jujuCA:(%s) tags: %s", vnfr['vnf_juju_name'], vnfr['tags'])

            config = {}
            try:
                for primitive in vnfr['config'].initial_config_primitive:
                    self._log.debug("jujuCA:(%s) Initial config primitive %s",
                                    vnfr['vnf_juju_name'], primitive)
                    if primitive.name == 'config':
                        for param in primitive.parameter:
                            # NOTE(review): parameters are only collected
                            # when tags are available; confirm that static
                            # values should be skipped otherwise.
                            if vnfr['tags']:
                                val = self.xlate(param.value, vnfr['tags'])
                                config.update({param.name: val})
            except KeyError as e:
                self._log.exception("jujuCA:(%s) Initial config error(%s): config=%s",
                                    vnfr['vnf_juju_name'], str(e), config)
                return False

            if config:
                self.juju_log('info', vnfr['vnf_juju_name'],
                              "Applying Initial config:%s",
                              config)

                rc = yield from self.api.apply_config(config, service=service)
                if rc is False:
                    self._log.error("Service {} is in error state".format(service))
                    return False

            # Apply any actions specified as part of initial config
            for primitive in vnfr['config'].initial_config_primitive:
                if primitive.name == 'config':
                    continue

                self._log.debug("jujuCA:(%s) Initial config action primitive %s",
                                vnfr['vnf_juju_name'], primitive)
                action = primitive.name
                params = {}
                for param in primitive.parameter:
                    val = self.xlate(param.value, vnfr['tags'])
                    params.update({param.name: val})

                self._log.info("jujuCA:(%s) Action %s with params %s",
                               vnfr['vnf_juju_name'], action, params)

                resp = yield from self.api.execute_action(action, params,
                                                          service=service)
                # An empty response means the action could not be queued.
                if not resp or 'error' in resp:
                    self._log.error("Applying initial config on {} failed for {} with {}: {}".
                                    format(vnfr['vnf_juju_name'], action, params, resp))
                    return False

                action_ids.append(resp['action']['tag'])

        except KeyError as e:
            self._log.info("Juju config agent(%s): VNFR %s not managed by Juju",
                           vnfr['vnf_juju_name'], agent_vnfr.id)
            return False
        except Exception as e:
            self._log.exception("jujuCA:(%s) Exception juju apply_initial_config for VNFR %s: %s",
                                vnfr['vnf_juju_name'], agent_vnfr.id, e)
            return False

        # Check if all actions completed
        pending = True
        while pending:
            pending = False
            for act in action_ids:
                resp = yield from self.api.get_action_status(act)
                if 'error' in resp:
                    self._log.error("Initial config failed: {}".format(resp))
                    return False

                if resp['status'] == 'failed':
                    self._log.error("Initial config action failed: {}".format(resp))
                    return False

                if resp['status'] == 'pending':
                    pending = True

            if pending:
                # Back off between polls instead of querying in a tight loop.
                yield from asyncio.sleep(self._ACTION_POLL_INTERVAL,
                                         loop=self._loop)

        return True

    def add_vnfr_managed(self, agent_vnfr):
        """Start tracking `agent_vnfr` unless its id is already tracked."""
        if agent_vnfr.id not in self._juju_vnfs:
            self._log.info("juju config agent: add vnfr={}/{}".
                           format(agent_vnfr.name, agent_vnfr.id))
            self._juju_vnfs[agent_vnfr.id] = agent_vnfr

    def is_vnfr_managed(self, vnfr_id):
        """Return True if `vnfr_id` is tracked by this plugin."""
        try:
            return vnfr_id in self._juju_vnfs
        except Exception as e:
            self._log.debug("jujuCA: Is VNFR {} managed: {}".
                            format(vnfr_id, e))
        return False

    @asyncio.coroutine
    def is_configured(self, vnfr_id):
        """Return whether the VNFR's service is active.

        The result of the API query is stored in the vnfr dict under
        'active'; once that value is truthy it is returned without
        querying again. Returns False for unknown VNFRs or on any error.
        """
        try:
            vnfr = self._juju_vnfs[vnfr_id].vnfr
            if vnfr['active']:
                return True

            service = vnfr['vnf_juju_name']
            # NOTE(review): awaited like the other non-underscore JujuApi
            # calls in this class; confirm is_service_active is a coroutine.
            resp = yield from self.api.is_service_active(service=service)
            # The flag lives in the vnfr dict, not on the agent_vnfr object.
            vnfr['active'] = resp
            self._log.debug("jujuCA: Service state for {} is {}".
                            format(service, resp))
            return resp

        except KeyError:
            self._log.debug("jujuCA: VNFR id {} not found in config agent".
                            format(vnfr_id))
            return False
        except Exception as e:
            self._log.error("jujuCA: VNFR id {} is_configured: {}".
                            format(vnfr_id, e))
            return False

    @asyncio.coroutine
    def get_config_status(self, agent_nsr, agent_vnfr):
        """Get the configuration status for the VNF

        Returns:
            'unknown' if the VNF has no Juju service name, 'error' or
            'configured' when the service reports 'error' or 'active',
            and 'configuring' in every other case (deploy task not done,
            other service states, or a failed status query).

        Raises:
            Exception: Propagated from check_task_status() when the
                deploy task failed.
        """
        rc = 'unknown'

        try:
            vnfr = agent_vnfr.vnfr
            service = vnfr['vnf_juju_name']
        except KeyError:
            # This VNF is not managed by Juju
            return rc

        rc = 'configuring'

        if not self.check_task_status(service, 'deploy'):
            return rc

        try:
            resp = yield from self.api.get_service_status(service=service)
            self._log.debug("jujuCA: Get service %s status? %s", service, resp)

            if resp == 'error':
                return 'error'
            if resp == 'active':
                return 'configured'
        except KeyError:
            self._log.error("jujuCA: Check unknown service %s status", service)
        except Exception as e:
            self._log.error("jujuCA: Caught exception when checking for service is active: %s", e)
            self._log.exception(e)

        return rc

    def get_action_status(self, execution_id):
        ''' Get the action status for an execution ID
            *** Make sure this is NOT a asyncio coroutine function ***

        Logs and re-raises any exception from the API call.
        '''

        try:
            self._log.debug("jujuCA: Get action status for {}".format(execution_id))
            resp = self.api._get_action_status(execution_id)
            self._log.debug("jujuCA: Action status: {}".format(resp))
            return resp
        except Exception as e:
            self._log.error("jujuCA: Error fetching execution status for %s",
                            execution_id)
            self._log.exception(e)
            raise

    def get_service_status(self, vnfr_id):
        '''Get the service status, used by job status handle
           Make sure this is NOT a coroutine

        Returns None if the VNFR is not managed by this agent. Blocks
        the calling thread for 3 seconds before querying.
        '''
        service = self.get_service_name(vnfr_id)
        if service is None:
            self._log.error("jujuCA: VNFR {} not managed by this Juju agent".
                            format(vnfr_id))
            return None

        # Delay for 3 seconds before checking as config apply takes a
        # few seconds to transfer to the service
        time.sleep(3)
        return self.api._get_service_status(service=service)