博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
TR.py
阅读量:7006 次
发布时间:2019-06-27

本文共 10800 字,大约阅读时间需要 36 分钟。

hot3.png

import os
import re
import sys

try:
    import pexpect
except ImportError:
    # The ssh/scp steps need pexpect; the file-parsing helpers do not.
    # TR.__init__ raises a clear ImportError when it is missing.
    pexpect = None


class TR(object):
    """Drive the ``jacs`` program on a remote host to get/set device parameters.

    The flow implemented here is: write a jacs command file locally, ``scp``
    it to the WAN host, run ``./jacs -f <file>`` there over ``ssh`` while
    logging the session to a local output file, then scan that file.
    (jacs itself is not part of this file; presumably it is a CWMP/TR-069
    test client -- confirm.)

    Settings are read from environment variables: ``G_HOST_IP1``, ``user``,
    ``pwd``, ``ACS_ConnectionRequestURL``, ``acs_user`` and ``acs_pwd``.

    Example::

        tr = TR()
        tr.do_gpv('InternetGatewayDevice.DeviceInfo.SoftwareVersion',
                  'output=/tmp/22')

    The code is written to be valid on both Python 2.6+ and Python 3.
    """

    nodes = None
    wan_ip = None
    wan_user = None
    wan_pwd = None
    acs_url = None
    acs_user = None
    acs_pwd = None
    prompt = None
    cfg_file = None
    output = None

    # NOTE(review): the published copy of getGPVResult lost part of its
    # grep/awk pipeline; '<Value' is the assumed marker of the line that
    # carries a parameter's value -- confirm against real jacs output.
    value_marker = '<Value'

    _GPV_NAME = re.compile(r'^ *[\w\.]+ *$')
    _GPV_PAIR = re.compile(r'^ *([\w\.]+) *= *([\w\.\,/]+) *$')
    _SPV_FORMAT = re.compile(r'^ *([\w\.]+) *= *([\w\.\,/]+) +([\w]+) *$')
    # "output=<path>", optionally followed by a (ignored) type token.
    _OUTPUT_ARG = re.compile(r'^ *output *= *([\w\.\,/]+)(?: +\w+)? *$')

    def __init__(self):
        self._load_settings()

    def _load_settings(self):
        """(Re)read every setting from the environment."""
        if pexpect is None:
            raise ImportError('pexpect is required by TR for ssh/scp')
        self.wan_ip = os.getenv('G_HOST_IP1')
        print('wan_ip:')
        print(self.wan_ip)
        self.wan_user = os.getenv('user', 'root')
        self.wan_pwd = os.getenv('pwd', '123qaz')
        self.acs_url = os.getenv('ACS_ConnectionRequestURL')
        print('acs connection request url')
        print(self.acs_url)
        self.acs_user = os.getenv('acs_user', 'actiontec')
        self.acs_pwd = os.getenv('acs_pwd', 'actiontec')
        # Indexes 0 and 1 are shell prompts; 2 is a rejected login.
        self.prompt = [r'\]$', r'\]# ', 'Permission denied,',
                       pexpect.EOF, pexpect.TIMEOUT]
        self.cfg_file = '/tmp/jacs_config_file'
        self.output = '/tmp/jacs_output'

    # ------------------------------------------------------------------
    # GPV
    # ------------------------------------------------------------------
    def _parse_gpv_args(self, nodes, output):
        """Split GPV arguments into names to query, expectations and output.

        Each argument is ``name``, ``name=expected`` or ``output=path``.
        Returns ``(names, expected_by_name, output)`` or ``None`` when an
        argument matches none of these forms.
        """
        gpv_node = []
        check_dict = {}
        for key in nodes:
            print(key)
            if self._GPV_NAME.match(key):
                gpv_node.append(key.strip())
                continue
            pair = self._GPV_PAIR.match(key)
            if pair is None:
                print('AT_ERROR : GPV parameters Format Error!')
                return None
            name, value = pair.groups()
            # Compare the parsed name so leading spaces are tolerated and a
            # parameter merely starting with "output" is not swallowed.
            if name == 'output':
                output = value
            else:
                check_dict[name] = value
                gpv_node.append(name)
        return gpv_node, check_dict, output

    def do_gpv(self, *nkw):
        """Query parameters and optionally verify their values.

        Arguments are strings of the form ``name``, ``name=expected`` or
        ``output=path`` (session log location, default ``self.output``).
        Returns True when jacs ran and every expectation matched.
        """
        self._load_settings()
        print('=' * 100)
        print("Begin GPV")
        print(nkw)
        self.nodes = list(nkw)

        parsed = self._parse_gpv_args(self.nodes, self.output)
        if parsed is None:
            return False
        gpv_node, check_dict, output = parsed
        print(gpv_node)
        print(check_dict)

        if not self.Create_GPV_Config_File(gpv_node):
            print('AT_ERROR : Create Config File ERROR!')
            return False
        if not self.copyCfg(self.cfg_file):
            return False

        rc = self.ExcuteJacs(output)
        self.readfile(output)
        if rc and check_dict:
            rc = self.checkGPV(check_dict, output)
        if rc:
            print('AT_INFO : GPV PASS PASS PASS!')
            return True
        print('AT_ERROR : GPV FAIL FAIL FAIL!')
        return False

    def _extract_value(self, needle, lines):
        """Find the value reported for *needle* in the session log lines.

        Mirrors ``grep needle -A1 | grep <marker> | awk -F'>' '{print $2}'
        | awk -F'<' '{print $1}' | head -1``, except that *needle* is
        matched literally rather than as a grep pattern.
        """
        candidates = []
        for index, line in enumerate(lines):
            if needle in line:
                candidates.append(line)
                if index + 1 < len(lines):
                    candidates.append(lines[index + 1])
        if not candidates:
            return 'NodeNOTExist'
        for line in candidates:
            if self.value_marker in line:
                fields = line.split('>')
                inner = fields[1] if len(fields) > 1 else ''
                return inner.split('<')[0].strip()
        return ''

    def getGPVResult(self, item, output):
        """Return the value logged for parameter *item* in file *output*.

        Falls back to ``self.output`` when *output* is empty. Returns
        ``'NodeNOTExist'`` when *item* does not appear in the file,
        ``'UNKnown'`` when the file cannot be read, and ``''`` when the
        name is present but no value line follows it.
        """
        if not output:
            output = self.output
        try:
            with open(output, 'r') as handle:
                lines = handle.readlines()
        except (IOError, OSError) as err:
            print(err)
            data = 'UNKnown'
        else:
            data = self._extract_value(str(item), lines)
        print(data)
        return data

    def checkGPV(self, dict, output):
        """Compare every expected value in *dict* with the logged one.

        All entries are checked (no early exit) so each mismatch is
        reported. Returns True only when every value matches.
        """
        expected = dict
        rc = True
        for item in expected:
            value = self.getGPVResult(item, output)
            if value != expected[item]:
                print('AT_ERROR : ' + item + ' Check FAIL FAIL FAIL!')
                print('AT_ERROR : Actual value:' + value +
                      ',Expect value:' + expected[item])
                rc = False
        if rc:
            print('AT_INFO : GPV Result Check PASS!')
        return rc

    # ------------------------------------------------------------------
    # SPV
    # ------------------------------------------------------------------
    def _parse_spv_args(self, nodes, output):
        """Split SPV arguments into set-commands and the output path.

        Each argument is ``name=value type`` or ``output=path``. Returns
        ``(commands, output)`` or ``None`` on a malformed argument.
        """
        spv_node = []
        for key in nodes:
            print(key)
            # Checked first: "output=path" has no type token, so it can
            # never satisfy the SPV pattern.
            out = self._OUTPUT_ARG.match(key)
            if out:
                output = out.group(1)
                continue
            m = self._SPV_FORMAT.match(key)
            if m is None:
                print('AT_ERROR : SPV parameters Format Error!')
                return None
            spv_node.append(m.group(1) + '=' + m.group(2) + ' ' + m.group(3))
        return spv_node, output

    def do_spv(self, *nkw):
        """Set parameters and check the logged result.

        Arguments are strings of the form ``name=value type`` or
        ``output=path``. Returns True when the session log contains no
        ``FaultCode`` and does contain ``>0``.
        """
        self._load_settings()
        print('=' * 100)
        print("Begin SPV")
        print(nkw)
        self.nodes = list(nkw)

        parsed = self._parse_spv_args(self.nodes, self.output)
        if parsed is None:
            return False
        spv_node, output = parsed

        if not self.Create_SPV_Config_File(spv_node):
            print('AT_ERROR : Create Config File ERROR!')
            return False
        if not self.copyCfg(self.cfg_file):
            return False

        self.ExcuteJacs(output)
        if self.readfile(output, filter='FaultCode'):
            print('AT_ERROR : SPV FAIL,Find "FaultCode"!')
            return False
        if self.readfile(output, filter=r'>0'):
            print('AT_INFO : SPV PASS PASS PASS PASS PASS!')
            return True
        # NOTE(review): the original message was damaged in the published
        # copy ("NOT find / 0 / !"); it now names the pattern searched for.
        print('AT_ERROR : SPV FAIL ,NOT find ">0"!')
        return False

    # ------------------------------------------------------------------
    # Remote session helpers
    # ------------------------------------------------------------------
    def _send_password(self, child, no_prompt_msg):
        """Answer the host-key question if asked, then send the password.

        Returns True once the password has been sent; prints a message and
        returns False when no password prompt shows up.
        """
        index = child.expect(['password:', r'\(yes\/no\)\?',
                              pexpect.EOF, pexpect.TIMEOUT])
        if index not in (0, 1):
            print(no_prompt_msg)
            return False
        if index == 1:
            child.sendline('yes')
            if child.expect(['password:', pexpect.EOF, pexpect.TIMEOUT]) != 0:
                print('AT_ERROR : Login wan pc Fail Fail!')
                return False
        child.sendline(self.wan_pwd)
        return True

    def _close_child(self, child):
        """Detach the log file and terminate the spawned process."""
        child.logfile = None
        try:
            child.close()
        except pexpect.ExceptionPexpect as err:
            print(err)

    def _run_jacs_session(self, child):
        """Log in on *child* and run jacs; True when every step saw a prompt."""
        if not self._send_password(child, 'AT_ERROR : TimeOut'):
            return False
        index = child.expect(self.prompt)
        if index == 2:
            print('AT_ERROR : Login wan pc Fail!')
            return False
        if index not in (0, 1):
            print('AT_ERROR : TimeOut')
            return False
        # Stop any previous instance before starting a new one.
        for command in ('killall jacs', './jacs -f ' + self.cfg_file):
            child.sendline(command)
            if child.expect(self.prompt) not in (0, 1):
                print('AT_ERROR : Run jacs FAIL!')
                return False
        child.sendline('exit')
        return True

    def ExcuteJacs(self, output):
        """Run jacs on the WAN host over ssh, logging the session to *output*.

        Any existing *output* file is removed first. Returns True when
        login succeeded and a shell prompt came back after the jacs
        command, False otherwise.
        """
        if os.path.exists(output):
            os.remove(output)
        if self.wan_ip is None:
            print('AT_ERROR : G_HOST_IP1 is not set!')
            return False
        start_cmd = 'ssh -l ' + self.wan_user + ' ' + self.wan_ip
        # Open the log before spawning so a bad path leaves no stray ssh.
        try:
            fout = open(output, 'wb')
        except (IOError, OSError) as err:
            print(err)
            return False
        try:
            child = pexpect.spawn(start_cmd, timeout=60)
            child.logfile = fout
            try:
                return self._run_jacs_session(child)
            finally:
                self._close_child(child)
        finally:
            fout.close()

    def copyCfg(self, file):
        """Copy *file* to the same path on the WAN host with scp.

        Logs in as ``wan_user`` (the account ``wan_pwd`` belongs to and the
        one ExcuteJacs uses). Returns True when scp reported ``100%``.
        """
        if self.wan_ip is None:
            print('AT_ERROR : G_HOST_IP1 is not set!')
            return False
        start_cmd = ('scp ' + file + ' ' + self.wan_user + '@' +
                     self.wan_ip + ':' + file)
        print(start_cmd)
        child = pexpect.spawn(start_cmd, timeout=60)
        # pexpect logs bytes; on Python 3 those must go to the byte stream.
        child.logfile = getattr(sys.stdout, 'buffer', sys.stdout)
        try:
            if not self._send_password(child,
                                       'AT_ERROR : SCP Config File Fail!'):
                return False
            if child.expect(['100%', pexpect.EOF, pexpect.TIMEOUT]) != 0:
                print('AT_ERROR : SCP Config File Fail!')
                return False
            # Let scp exit on its own so the remote file is complete.
            child.expect([pexpect.EOF, pexpect.TIMEOUT])
            return True
        finally:
            self._close_child(child)

    # ------------------------------------------------------------------
    # Local files
    # ------------------------------------------------------------------
    def _write_config(self, command, node):
        """Write the jacs command file for *command* over the items in *node*.

        Returns False (after printing the reason) when the ACS URL is
        missing or the file cannot be written; otherwise echoes the file
        and returns True.
        """
        if self.acs_url is None:
            print('AT_ERROR : ACS_ConnectionRequestURL is not set!')
            return False
        # NOTE(review): the literal lines below are reproduced verbatim
        # from the published copy, including 'rpc0' -- confirm against the
        # jacs command syntax.
        lines = [
            'listen 1234',
            'connect ' + self.acs_url + ' ' + self.acs_user + ' ' +
            self.acs_pwd + ' NONE',
            'wait',
            'rpc cwmp:InformResponse MaxEnvelopes=1',
            'wait',
            # Every item is followed by one space, as in the original.
            command + ' ' + ''.join(p + ' ' for p in node),
            'wait',
            'rpc0',
            'quit',
        ]
        try:
            with open(self.cfg_file, 'w') as handle:
                handle.write(''.join(line + '\n' for line in lines))
        except (IOError, OSError) as err:
            print(err)
            return False
        self.readfile(self.cfg_file)
        return True

    def Create_GPV_Config_File(self, node):
        """Write a command file that issues ``get_params`` for *node*."""
        return self._write_config('get_params', node)

    def Create_SPV_Config_File(self, node):
        """Write a command file that issues ``set_params`` for *node*."""
        return self._write_config('set_params', node)

    def readfile(self, file, filter=''):
        """Echo *file* to stdout and report whether *filter* matched a line.

        *filter* is a regular expression; when it is empty the file is
        only echoed. Returns False when the file cannot be opened.
        """
        pattern = re.compile(filter) if filter else None
        match_flag = False
        try:
            handle = open(file, 'r')
        except (IOError, OSError) as err:
            print(err)
            return False
        with handle:
            for line in handle:
                sys.stdout.write(line)
                if pattern is not None and pattern.search(line):
                    match_flag = True
        return match_flag

转载于:https://my.oschina.net/xxjbs001/blog/304056

你可能感兴趣的文章
Zabbix 中文显示(学习笔记四)
查看>>
财报显示阿里云“可怕”之处 和AWS等全球头部云厂商还差多少?
查看>>
【对讲机的那点事】你知道吗?对讲机天线决定了对讲机的通信效果
查看>>
Android GreenDao常用注解
查看>>
二十分钟教你如何将区块链应用与函数计算相结合
查看>>
Bootstrap wysiwyg,将富文本数据保存到mysql
查看>>
Linux cron crontab用法(转载)
查看>>
Window 编译Openssl
查看>>
SAMBA 生产环境应用实例
查看>>
WPF - Group分组对ListBox等列表样式的约束
查看>>
Android 位置服务——BaiduLocation的使用
查看>>
源码专题之spring设计模式:单例模式
查看>>
WPF 将图片进行灰度处理
查看>>
用Intellij idea搭建solr调试环境
查看>>
你用过不写代码就能完成一个简单模块的组件么?
查看>>
js循环POST提交添加辅助单位
查看>>
solr6.6教程-从mysql数据库中导入数据(三)
查看>>
[OpenGL] glTranslatef(); glScaled(); glRotatef(); 函数介绍
查看>>
node 模块开发的一个小坑
查看>>
HTML5+CSS3 为 input 添加一键删除按钮
查看>>