python常用小脚本
特立独行的猫a 人气:0前言
日常生活中常会遇到一些小任务,如果人工处理会很麻烦。
用python做些小脚本处理,能够提高不少效率。或者可以把python当工具使用,辅助提高一下办公效率。(比如我常拿python当计算器,计算和字符转换用)
以下总结下个人用到的一些python小脚本留作备忘。
打印16进制字符串
用途:通信报文中的hex数据不好看,可以打印为16进制的字符串显示出来。
#coding=utf-8 #name: myutil.py def print_hex1(s,prev='0x'): for c in s: print '%s%02x' %(prev,ord(c)), print def print_hex(s): for c in s: print '%02x' %(ord(c)), print print 'myutil' def print_hex3(s,prev='0x'): i = 0 for c in s: print '%s%s,' %(prev,s[i:i+2]), i += 2 print
文件合并
之前搞单片机时生成的hex应用程序文件不能直接刷到单片机里,还需要把iap程序合并成一个文件才能烧写到单片机。每次打包麻烦,做个脚本处理:
#path='C:\\Users\\test\\IAP_CZ_v204w.hex' #file=open(path,'r') #for ll in file.readlines() # print ll #coding=gb18030 import time import os def prr(): print 'file combination begin..' path0=os.getcwd() print path0 path=path0 #path1=path0 path2=path0 path+='\\IAP_CZ_v204w.hex' #path1+='\\NC_armStaSystem.hex' path2+='\\' print path s=raw_input('enter file path:') path1=s #path1+='\\NC_armStaSystem.hex' print path1 s=raw_input('enter file name:') path2+=s path2+=time.strftime('_%y%m%d%H%M%S') path2+='.hex' print path2 prr() try: f1=open(path,'r') count=0 for l in f1.readlines(): # print l count+=1 #print count f1.close() f1=open(path,'r') f2=open(path1,'r') f3=open(path2,'w') while(count>1): l=f1.readline() # print l f3.write(l) count-=1 # print count f3.flush() for l in f2.readlines(): f3.write(l) f3.flush() f3.close() print 'combination success!' except Exception,ex: print 'excettion occured!' print ex s=raw_input('press any key to continue...') finally: f1.close() f2.close() s=raw_input('press any key to continue...')
多线程下载图集
网上好看的动漫图集,如果手工下载太费时了。简单分析下网页地址规律,写个多线程脚本搞定。
#!/usr/bin/python # -*- coding: utf-8 -*- # filename: paxel.py '''It is a multi-thread downloading tool It was developed follow axel. Author: volans E-mail: volansw [at] gmail.com ''' import sys import os import time import urllib from threading import Thread local_proxies = {'http': 'http://131.139.58.200:8080'} class AxelPython(Thread, urllib.FancyURLopener): '''Multi-thread downloading class. run() is a vitural method of Thread. ''' def __init__(self, threadname, url, filename, ranges=0, proxies={}): Thread.__init__(self, name=threadname) urllib.FancyURLopener.__init__(self, proxies) self.name = threadname self.url = url self.filename = filename self.ranges = ranges self.downloaded = 0 def run(self): '''vertual function in Thread''' try: self.downloaded = os.path.getsize( self.filename ) except OSError: #print 'never downloaded' self.downloaded = 0 # rebuild start poind self.startpoint = self.ranges[0] + self.downloaded # This part is completed if self.startpoint >= self.ranges[1]: print 'Part %s has been downloaded over.' % self.filename return self.oneTimeSize = 16384 #16kByte/time print 'task %s will download from %d to %d' % (self.name, self.startpoint, self.ranges[1]) self.addheader("Range", "bytes=%d-%d" % (self.startpoint, self.ranges[1])) self.urlhandle = self.open( self.url ) data = self.urlhandle.read( self.oneTimeSize ) while data: filehandle = open( self.filename, 'ab+' ) filehandle.write( data ) filehandle.close() self.downloaded += len( data ) #print "%s" % (self.name) #progress = u'\r...' data = self.urlhandle.read( self.oneTimeSize ) def GetUrlFileSize(url, proxies={}): urlHandler = urllib.urlopen( url, proxies=proxies ) headers = urlHandler.info().headers length = 0 for header in headers: if header.find('Length') != -1: length = header.split(':')[-1].strip() length = int(length) return length def SpliteBlocks(totalsize, blocknumber): blocksize = totalsize/blocknumber ranges = [] for i in range(0, blocknumber-1): ranges.append((i*blocksize, i*blocksize +blocksize - 1)) ranges.append(( blocksize*(blocknumber-1), totalsize -1 )) return ranges def islive(tasks): for task in tasks: if task.isAlive(): return True return False def paxel(url, output, blocks=6, proxies=local_proxies): ''' paxel ''' size = GetUrlFileSize( url, proxies ) ranges = SpliteBlocks( size, blocks ) threadname = [ "thread_%d" % i for i in range(0, blocks) ] filename = [ "tmpfile_%d" % i for i in range(0, blocks) ] tasks = [] for i in range(0,blocks): task = AxelPython( threadname[i], url, filename[i], ranges[i] ) task.setDaemon( True ) task.start() tasks.append( task ) time.sleep( 2 ) while islive(tasks): downloaded = sum( [task.downloaded for task in tasks] ) process = downloaded/float(size)*100 show = u'\rFilesize:%d Downloaded:%d Completed:%.2f%%' % (size, downloaded, process) sys.stdout.write(show) sys.stdout.flush() time.sleep( 0.5 ) filehandle = open( output, 'wb+' ) for i in filename: f = open( i, 'rb' ) filehandle.write( f.read() ) f.close() try: os.remove(i) pass except: pass filehandle.close() if __name__ == '__main__': url = "http://xz1.mm667.com/xz84/images/001.jpg" output = '001.jpg' paxel( url, output, blocks=4, proxies={} )
多线程下载图片
多线程下载图片并存储到指定目录中,若目录不存在则自动创建。
# -*- coding: UTF-8 -*- ''' import re import urllib urls='http://xz5.mm667.com/xz82/images/01.jpg' def getHtml(url): page = urllib.urlopen(url) html = page.read() return html def getImg(html): reg = r'src="(.+?\.jpg)" pic_ext' imgre = re.compile(reg) imglist = imgre.findall(html) x = 0 for imgurl in imglist: urllib.urlretrieve(imgurl,'%s.jpg' % x) x = x + 1 html = getHtml("http://tieba.baidu.com/p/2460150866") getImg(html) ''' import re import urllib import threading import time import socket socket.setdefaulttimeout(30) urls=[] j=0 for i in xrange(1,81): if (i-1)%4 == 0: j += 1 if ((j-1)%5) == 0 : j=1 site='http://xz%d.mm667.com/xz%02d/images/' %(j,i) urls.append(site) print urls[i-1] #print urls ''' urls.append('http://xz1.mm667.com/xz01/images/') urls.append('http://xz1.mm667.com/xz02/images/') urls.append('http://xz1.mm667.com/xz03/images/') urls.append('http://xz1.mm667.com/xz04/images/') urls.append('http://xz1.mm667.com/xz84/images/') urls.append('http://xz2.mm667.com/xz85/images/') urls.append('http://xz3.mm667.com/xz86/images/') urls.append('http://xz1.mm667.com/s/') urls.append('http://xz1.mm667.com/p/') ''' def mkdir(path): # 引入模块 import os # 去除首位空格 path=path.strip() # 去除尾部 \ 符号 path=path.rstrip("\\") # 判断路径是否存在 # 存在 True # 不存在 False isExists=os.path.exists(path) # 判断结果 if not isExists: # 如果不存在则创建目录 print path+u' 创建成功' # 创建目录操作函数 os.makedirs(path) return True else: # 如果目录存在则不创建,并提示目录已存在 print path+u' 目录已存在' return False def cbk(a,b,c): '''''回调函数 @a: 已经下载的数据块 @b: 数据块的大小 @c: 远程文件的大小 ''' per = 100.0 * a * b / c if per > 100: per = 100 print '%.2f%%' % per #url = 'http://www.sina.com.cn' local = 'd:\\mysite\\pic1\\' d=0 mutex = threading.Lock() # mutex1 = threading.Lock() class MyThread(threading.Thread): def __init__(self, url, name): threading.Thread.__init__(self) self.url=url self.name=name def run(self): mutex.acquire() print print 'down from %s' % self.url time.sleep(1) mutex.release() try: urllib.urlretrieve(self.url, self.name) except Exception,e: print e time.sleep(1) urllib.urlretrieve(self.url, self.name) threads=[] for u in urls[84:]: d += 1 local = 'd:\\mysite\\pic1\\%d\\' %d mkdir(local) print 'download begin...' for i in xrange(40): lcal = local url=u url += '%03d.jpg' %i lcal += '%03d.jpg' %i th = MyThread(url,lcal) threads.append(th) th.start() # for t in threads: # t.join() print 'over! download finished'
爬虫抓取信息
#!/usr/bin/env python # -*- coding:utf-8 -*- """ Python爬虫,抓取一卡通相关企业信息 Anthor: yangyongzhen Version: 0.0.2 Date: 2014-12-14 Language: Python2.7.5 Editor: Sublime Text2 """ import urllib2, re, string import threading, Queue, time import sys import os from bs4 import BeautifulSoup #from pprint import pprint reload(sys) sys.setdefaultencoding('utf8') _DATA = [] FILE_LOCK = threading.Lock() SHARE_Q = Queue.Queue() #构造一个不限制大小的的队列 _WORKER_THREAD_NUM = 3 #设置线程的个数 _Num = 0 #总条数 class MyThread(threading.Thread) : def __init__(self, func,num) : super(MyThread, self).__init__() #调用父类的构造函数 self.func = func #传入线程函数逻辑 self.thread_num = num def run(self) : self.func() #print u'线程ID:',self.thread_num def worker() : global SHARE_Q while not SHARE_Q.empty(): url = SHARE_Q.get() #获得任务 my_page = get_page(url) find_data(my_page) #获得当前页面的数据 #write_into_file(temp_data) time.sleep(1) SHARE_Q.task_done() def get_page(url) : """ 根据所给的url爬取网页HTML Args: url: 表示当前要爬取页面的url Returns: 返回抓取到整个页面的HTML(unicode编码) Raises: URLError:url引发的异常 """ try : html = urllib2.urlopen(url).read() my_page = html.decode("gbk",'ignore') #my_page = unicode(html,'utf-8','ignore').encode('utf-8','ignore') #my_page = urllib2.urlopen(url).read().decode("utf8") except urllib2.URLError, e : if hasattr(e, "code"): print "The server couldn't fulfill the request." print "Error code: %s" % e.code elif hasattr(e, "reason"): print "We failed to reach a server. Please check your url and read the Reason" print "Reason: %s" % e.reason return my_page def find_data(my_page) : """ 通过返回的整个网页HTML, 正则匹配名称 Args: my_page: 传入页面的HTML文本用于正则匹配 """ global _Num temp_data = [] items = BeautifulSoup(my_page).find_all("div", style="width:96%;margin:10px;border-bottom:1px #CCC dashed;padding-bottom:10px;") for index, item in enumerate(items) : #print item #print item.h1 #print h.group() #temp_data.append(item) #print item.find(re.compile("^a")) href = item.find(re.compile("^a")) #soup = BeautifulSoup(item) #公司名称 if item.a: data = item.a.string.encode("gbk","ignore") print data temp_data.append(data) goods = item.find_all("div", style="font-size:12px;") #经营产品与联系方式 for i in goods: data = i.get_text().encode("gbk","ignore") temp_data.append(data) print data #b = item.find_all("b") #print b #链接地址 pat = re.compile(r'href="([^"]*)"') h = pat.search(str(item)) if h: #print h.group(0) href = h.group(1) print href temp_data.append(h.group(1)) _Num += 1 #b = item.find_all(text=re.compile("Dormouse")) #pprint(goods) #print href #pat = re.compile(r'title="([^"]*)"') #h = pat.search(str(href)) #if h: #print h.group(1) #temp_data.append(h.group(1)) _DATA.append(temp_data) #headers = {'User-Agent':"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1"}##浏览器请求头(大部分网站没有这个请求头会报错、请务必加上哦) #all_url = 'http://www.mzitu.com/all' ##开始的URL地址 #start_html = requests.get(all_url, headers=headers) ##使用requests中的get方法来获取all_url(就是:http://www.mzitu.com/all这个地址)的内容 headers为上面设置的请求头、请务必参考requests官方文档解释 #print(start_html.text) ##打印出start_html (请注意,concent是二进制的数据,一般用于下载图片、视频、音频、等多媒体内容是才使用concent, 对于打印网页内容请使用text) def main() : global SHARE_Q threads = [] start = time.clock() douban_url = "http://company.yktworld.com/comapny_search.asp?page={page}" #向队列中放入任务, 真正使用时, 应该设置为可持续的放入任务 for index in xrange(20) : SHARE_Q.put(douban_url.format(page = index * 1)) for i in xrange(_WORKER_THREAD_NUM) : thread = MyThread(worker,i) thread.start() #线程开始处理任务 threads.append(thread) for thread in threads : thread.join() SHARE_Q.join() i = 0 with open("down.txt", "w+") as my_file : for page in _DATA : i += 1 for name in page: my_file.write(name + "\n") print "Spider Successful!!!" end = time.clock() print u'抓取完成!' print u'总页数:',i print u'总条数:',_Num print u'一共用时:',end-start,u'秒' if __name__ == '__main__': main()
爬虫多线程下载电影名称
#!/usr/bin/env python # -*- coding:utf-8 -*- """ Python爬虫 Anthor: yangyongzhen Version: 0.0.2 Date: 2014-12-14 Language: Python2.7.8 Editor: Sublime Text2 """ import urllib2, re, string import threading, Queue, time import sys import os from bs4 import BeautifulSoup reload(sys) sys.setdefaultencoding('utf8') _DATA = [] FILE_LOCK = threading.Lock() SHARE_Q = Queue.Queue() #构造一个不限制大小的的队列 _WORKER_THREAD_NUM = 3 #设置线程的个数 rootpath = os.getcwd()+u'/抓取的内容/' def makedir(path): if not os.path.isdir(path): os.makedirs(path) #创建抓取的根目录 #makedir(rootpath) #显示下载进度 def Schedule(a,b,c): ''''' a:已经下载的数据块 b:数据块的大小 c:远程文件的大小 ''' per = 100.0 * a * b / c if per > 100 : per = 100 print '%.2f%%' % per class MyThread(threading.Thread) : def __init__(self, func) : super(MyThread, self).__init__() #调用父类的构造函数 self.func = func #传入线程函数逻辑 def run(self) : self.func() def worker() : print 'work thread start...\n' global SHARE_Q while not SHARE_Q.empty(): url = SHARE_Q.get() #获得任务 my_page = get_page(url) find_title(my_page) #获得当前页面的电影名 #write_into_file(temp_data) time.sleep(1) SHARE_Q.task_done() def get_page(url) : """ 根据所给的url爬取网页HTML Args: url: 表示当前要爬取页面的url Returns: 返回抓取到整个页面的HTML(unicode编码) Raises: URLError:url引发的异常 """ try : html = urllib2.urlopen(url).read() my_page = html.decode("utf8") #my_page = unicode(html,'utf-8','ignore').encode('utf-8','ignore') #my_page = urllib2.urlopen(url).read().decode("utf8") except urllib2.URLError, e : if hasattr(e, "code"): print "The server couldn't fulfill the request." print "Error code: %s" % e.code elif hasattr(e, "reason"): print "We failed to reach a server. Please check your url and read the Reason" print "Reason: %s" % e.reason return my_page def find_title(my_page) : """ 通过返回的整个网页HTML, 正则匹配前100的电影名称 Args: my_page: 传入页面的HTML文本用于正则匹配 """ temp_data = [] movie_items = BeautifulSoup(my_page).findAll('h1') for index, item in enumerate(movie_items) : #print item #print item.h1 pat = re.compile(r'href="([^"]*)"') h = pat.search(str(item)) if h: #print h.group(0) href = h.group(1) print href temp_data.append(h.group(1)) #print h.group() #temp_data.append(item) #print item.find(re.compile("^a")) href = item.find(re.compile("^a")) #soup = BeautifulSoup(item) if item.a: #print item.a.string temp_data.append(item.a.string) #print href #pat = re.compile(r'title="([^"]*)"') #h = pat.search(str(href)) #if h: #print h.group(1) #temp_data.append(h.group(1)) _DATA.append(temp_data) def main() : global SHARE_Q threads = [] start = time.clock() douban_url = "http://movie.misszm.com/page/{page}" #向队列中放入任务, 真正使用时, 应该设置为可持续的放入任务 for index in xrange(5) : SHARE_Q.put(douban_url.format(page = index * 1)) for i in xrange(_WORKER_THREAD_NUM) : thread = MyThread(worker) thread.start() #线程开始处理任务 threads.append(thread) for thread in threads : thread.join() SHARE_Q.join() with open("movie.txt", "w+") as my_file : for page in _DATA : for movie_name in page: my_file.write(movie_name + "\n") print "Spider Successful!!!" end = time.clock() print u'抓取完成!' print u'一共用时:',end-start,u'秒' if __name__ == '__main__': main()
串口转tcp工具
#coding=utf-8 #author:yangyongzhen #QQ:534117529 #'CardTest TcpServer - Simple Test Card Tool 1.00' import sys,threading,time; import serial; import binascii,encodings; import re; import os; from socket import * from struct import *; #from myutil import *; #name: myutil.py mylock = threading.RLock() Server_IP = '' Srever_Port = '' def print_hex1(s,prev='0x'): for c in s: print '%s%02x' %(prev,ord(c)), print def print_hex(s): for c in s: print '%02x' %(ord(c)), print def hexto_str(s): r ='' for c in s: r += '%02x' %(ord(c)) return r def strto_hex(s): r = s.decode('hex') return r #''代表服务器为localhost #在一个非保留端口号上进行监听 class ComThread: def __init__(self, Port=0): self.l_serial = None; self.alive = False; self.waitEnd = None; self.port = Port; #TCP部分 #self.sockobj = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.connection = None #数据 self.snddata = '' self.rcvdata = '' def waiting(self): if not self.waitEnd is None: self.waitEnd.wait(); def SetStopEvent(self): if not self.waitEnd is None: self.waitEnd.set(); self.alive = False; self.stop(); def start(self): self.l_serial = serial.Serial(); self.l_serial.port = self.port; self.l_serial.baudrate = 115200; self.l_serial.timeout = 2; #秒 self.l_serial.open(); if self.l_serial.isOpen(): self.waitEnd = threading.Event(); self.alive = True; print 'open serial port %d ok!\n' %(self.port+1) print 'baudrate:115200 \n' self.thread_read = None; self.thread_read = threading.Thread(target=self.FirstReader); self.thread_read.setDaemon(1); self.thread_read.start(); self.thread_write = None; self.thread_write = threading.Thread(target=self.FirstWriter); self.thread_write.setDaemon(1); self.thread_write.start(); #TCP部分 self.thread_TcpClient = None; self.thread_TcpClient = threading.Thread(target=self.TcpClient); self.thread_TcpClient.setDaemon(1); self.thread_TcpClient.start(); self.thread_TcpSend = None; self.thread_TcpSend = threading.Thread(target=self.TcpSend); self.thread_TcpSend.setDaemon(1); self.thread_TcpSend.start(); return True; else: return False; def FirstReader(self): while self.alive: # 接收间隔 time.sleep(0.1); try: data = ''; n = self.l_serial.inWaiting(); if n: data = data+self.l_serial.read(n); #for l in xrange(len(data)): #print '%02X' % ord(data[l]), # 发送数据 print u'->请求:' print data; mylock.acquire() self.snddata = data mylock.release() #print_hex(data); # 判断结束 except Exception, ex: print str(ex); self.waitEnd.set(); self.alive = False; def FirstWriter(self): while self.alive: # 接收间隔 time.sleep(0.1); try: #snddata = raw_input('\nenter data send:\n') if self.rcvdata!='': self.l_serial.write(self.rcvdata); print u'-<应答:' print self.rcvdata; mylock.acquire() self.rcvdata = ''; mylock.release() #print_hex(snddata); except Exception, ex: print str(ex); self.waitEnd.set(); self.alive = False; def TcpClient(self): while True: # 接收间隔 time.sleep(0.1); self.connection = socket(AF_INET, SOCK_STREAM); self.connection.connect((Server_IP, int(Server_Port))); print 'Connect to Server OK!'; self.snddata = '' self.rcvdata = '' while True: #读取客户端套接字的下一行 data = self.connection.recv(1024) #如果没有数量的话,那么跳出循环 if not data: break #发送一个回复至客户端 mylock.acquire() self.snddata = '' self.rcvdata = data mylock.release() #connection.send('Echo=>' + data) self.connection.close() self.waitEnd.set(); self.alive = False; def TcpSend(self): while True: # 接收间隔 time.sleep(0.1); while True: time.sleep(0.1); try: if not self.connection is None: if self.snddata != '': self.connection.send(self.snddata) mylock.acquire() self.rcvdata = '' self.snddata = '' mylock.release() except Exception, ex: pass def stop(self): self.alive = False; self.thread_read.join(); if self.l_serial.isOpen(): self.l_serial.close(); #测试用部分 if __name__ == '__main__': print 'Serial to Tcp Tool 1.00\n' print 'Author:yangyongzhen\n' print 'QQ:534117529\n' print 'Copyright (c) **cap 2015-2016.\n' Server_IP = raw_input('please enter ServerIP:') print 'Server_IP: %s' %(Server_IP) Server_Port = raw_input('please enter ServerPort:') print 'Server_Port: %s' %(Server_Port) com =raw_input('please enter com port(1-9):') rt = ComThread(int(com)-1); try: if rt.start(): rt.waiting(); rt.stop(); else: pass; except Exception,se: print str(se); if rt.alive: rt.stop(); os.system("pause") print ''; print 'End OK .'; del rt;
远程读卡器server端
很早之前做过一个远程读卡器工具,原理就是在现场客服电脑上装个python做的tcpserver服务端,操控现场的读卡器。在公司内部做个客户端连接过去,这样实现在公司调试现场的卡片业务。
这个就是服务端工具的实现:
#coding=utf-8 #author:yangyongzhen #QQ:534117529 #'CardTest TcpServer - Simple Test Card Tool 1.00' import sys,threading,time; import serial; import binascii,encodings; import re; import os; from socket import * from struct import *; #from myutil import *; #name: myutil.py mylock = threading.RLock() def print_hex1(s,prev='0x'): for c in s: print '%s%02x' %(prev,ord(c)), print def print_hex(s): for c in s: print '%02x' %(ord(c)), print def hexto_str(s): r ='' for c in s: r += '%02x' %(ord(c)) return r def strto_hex(s): r = s.decode('hex') return r #''代表服务器为localhost #在一个非保留端口号上进行监听 class ComThread: def __init__(self, Port=0): self.l_serial = None; self.alive = False; self.waitEnd = None; self.port = Port; #TCP部分 self.myHost = '' self.myPort = 5050 self.sockobj = socket(AF_INET, SOCK_STREAM) self.connection = None #数据 self.snddata = '' self.rcvdata = '' def waiting(self): if not self.waitEnd is None: self.waitEnd.wait(); def SetStopEvent(self): if not self.waitEnd is None: self.waitEnd.set(); self.alive = False; self.stop(); def start(self): self.l_serial = serial.Serial(); self.l_serial.port = self.port; self.l_serial.baudrate = 115200; self.l_serial.timeout = 2; #秒 self.l_serial.open(); if self.l_serial.isOpen(): self.waitEnd = threading.Event(); self.alive = True; print 'open serial port %d ok!\n' %(self.port+1) print 'baudrate:115200 \n' self.thread_read = None; self.thread_read = threading.Thread(target=self.FirstReader); self.thread_read.setDaemon(1); self.thread_read.start(); self.thread_write = None; self.thread_write = threading.Thread(target=self.FirstWriter); self.thread_write.setDaemon(1); self.thread_write.start(); #TCP部分 self.thread_TcpServer = None; self.thread_TcpServer = threading.Thread(target=self.TcpServer); self.thread_TcpServer.setDaemon(1); self.thread_TcpServer.start(); self.thread_TcpSend = None; self.thread_TcpSend = threading.Thread(target=self.TcpSend); self.thread_TcpSend.setDaemon(1); self.thread_TcpSend.start(); return True; else: return False; def FirstReader(self): while self.alive: # 接收间隔 time.sleep(0.1); try: data = ''; n = self.l_serial.inWaiting(); if n: data = data+self.l_serial.read(n); #for l in xrange(len(data)): #print '%02X' % ord(data[l]), # 发送数据 print 'serial recv:' print data; mylock.acquire() self.snddata = data mylock.release() #print_hex(data); # 判断结束 except Exception, ex: print str(ex); self.waitEnd.set(); self.alive = False; def FirstWriter(self): while self.alive: # 接收间隔 time.sleep(0.1); try: #snddata = raw_input('\nenter data send:\n') if self.rcvdata!='': self.l_serial.write(self.rcvdata); print 'serial send:' print self.rcvdata; mylock.acquire() self.rcvdata = ''; mylock.release() #print_hex(snddata); except Exception, ex: print str(ex); self.waitEnd.set(); self.alive = False; def TcpServer(self): self.sockobj.bind((self.myHost, self.myPort)) self.sockobj.listen(10) print 'TcpServer listen at 5050 oK!\n' print 'Waiting for connect...\n' while True: # 接收间隔 time.sleep(0.1); self.connection, address = self.sockobj.accept() print 'Server connected by', address self.snddata = '' self.rcvdata = '' try: while True: #读取客户端套接字的下一行 data = self.connection.recv(1024) #如果没有数量的话,那么跳出循环 if not data: break #发送一个回复至客户端 mylock.acquire() self.snddata = '' self.rcvdata = data mylock.release() #connection.send('Echo=>' + data) self.connection.close() except Exception, ex: self.connection.close() self.waitEnd.set(); self.alive = False; def TcpSend(self): while True: # 接收间隔 time.sleep(0.1); while True: time.sleep(0.1); try: if not self.connection is None: if self.snddata != '': self.connection.send(self.snddata) mylock.acquire() self.rcvdata = '' self.snddata = '' mylock.release() except Exception, ex: pass def stop(self): self.alive = False; self.thread_read.join(); if self.l_serial.isOpen(): self.l_serial.close(); #测试用部分 if __name__ == '__main__': print 'CardTest TcpServer - Simple Test Card Tool 1.00\n' print 'Author:yangyongzhen\n' print 'QQ:534117529\n' print 'Copyright (c) **** 2015-2016.\n' com =raw_input('please enter com port(1-9):') rt = ComThread(int(com)-1); try: if rt.start(): rt.waiting(); rt.stop(); else: pass; except Exception,se: print str(se); if rt.alive: rt.stop(); os.system("pause") print ''; print 'End OK .'; del rt;
黑客rtcp反向链接
# -*- coding: utf-8 -*- ''' filename:rtcp.py @desc: 利用python的socket端口转发,用于远程维护 如果连接不到远程,会sleep 36s,最多尝试200(即两小时) @usage: ./rtcp.py stream1 stream2 stream为:l:port或c:host:port l:port表示监听指定的本地端口 c:host:port表示监听远程指定的端口 @author: watercloud, zd, knownsec team @web: www.knownsec.com, blog.knownsec.com @date: 2009-7 ''' import socket import sys import threading import time streams = [None, None] # 存放需要进行数据转发的两个数据流(都是SocketObj对象) debug = 1 # 调试状态 0 or 1 def print_hex(s): for c in s: print '%02x' %(ord(c)), print def _usage(): print 'Usage: ./rtcp.py stream1 stream2\nstream : L:port or C:host:port' def _get_another_stream(num): ''' 从streams获取另外一个流对象,如果当前为空,则等待 ''' if num == 0: num = 1 elif num == 1: num = 0 else: raise "ERROR" while True: if streams[num] == 'quit': print("can't connect to the target, quit now!") sys.exit(1) if streams[num] != None: return streams[num] else: time.sleep(1) def _xstream(num, s1, s2): ''' 交换两个流的数据 num为当前流编号,主要用于调试目的,区分两个回路状态用。 ''' try: while True: #注意,recv函数会阻塞,直到对端完全关闭(close后还需要一定时间才能关闭,最快关闭方法是shutdow) buff = s1.recv(1024) if debug > 0: print num,"recv" if len(buff) == 0: #对端关闭连接,读不到数据 print num,"one closed" break s2.sendall(buff) if debug > 0: print num,"sendall" print_hex(buff) except : print num,"one connect closed." try: s1.shutdown(socket.SHUT_RDWR) s1.close() except: pass try: s2.shutdown(socket.SHUT_RDWR) s2.close() except: pass streams[0] = None streams[1] = None print num, "CLOSED" def _server(port, num): ''' 处理服务情况,num为流编号(第0号还是第1号) ''' srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) srv.bind(('0.0.0.0', port)) srv.listen(1) #print 'local listening at port %d' (%(port)) while True: conn, addr = srv.accept() print "connected from:", addr streams[num] = conn # 放入本端流对象 s2 = _get_another_stream(num) # 获取另一端流对象 _xstream(num, conn, s2) def _connect(host, port, num): ''' 处理连接,num为流编号(第0号还是第1号) @note: 如果连接不到远程,会sleep 36s,最多尝试200(即两小时) ''' not_connet_time = 0 wait_time = 36 try_cnt = 199 while True: if not_connet_time > try_cnt: streams[num] = 'quit' print('not connected') return None conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: conn.connect((host, port)) except Exception, e: print ('can not connect %s:%s!' % (host, port)) not_connet_time += 1 time.sleep(wait_time) continue print "connected to %s:%i" % (host, port) streams[num] = conn #放入本端流对象 s2 = _get_another_stream(num) #获取另一端流对象 _xstream(num, conn, s2) if __name__ == '__main__': print 'Tcp to Tcp Tool 1.00\n' print 'Author:yangyongzhen\n' print 'QQ:534117529\n' print 'Copyright (c) Newcapec 2015-2016.\n' Server_IP = raw_input('please enter Server IP:') print 'Server_IP: %s' %(Server_IP) Server_Port = raw_input('please enter Server Port:') print 'Server_Port: %s' %(Server_Port) com =raw_input('please enter Local Port:') tlist = [] # 线程列表,最终存放两个线程对象 #targv = [sys.argv[1], sys.argv[2] ] t = threading.Thread(target=_server, args=(int(com), 0)) tlist.append(t) t = threading.Thread(target=_connect, args=(Server_IP, int(Server_Port), 1)) tlist.append(t) for t in tlist: t.start() for t in tlist: t.join() sys.exit(0)
调用c的动态库示例
# -*- coding:utf8 -*- from ctypes import * from binascii import unhexlify as unhex import os dll = cdll.LoadLibrary('mydll.dll'); print 'begin load mydll..' #key #str1='\x9B\xED\x98\x89\x15\x80\xC3\xB2' str1=unhex('0000556677222238') #data str2=unhex('002d2000000100015566772222383CD881604D0D286A556677222238000020141214181427') #output str3='\x12\x34\x56\x78\x12\x34\x56\x78' pstr1=c_char_p() pstr2=c_char_p() pstr3=c_char_p() pstr1.value=str1 pstr2.value=str2 pstr3.value=str3 dll.CurCalc_DES_MAC64(805306481,pstr1,0,pstr2,13,pstr3) print pstr1 print pstr2 print pstr3 stro= pstr3.value print stro strtemp='' for c in stro: print "%02x" % (ord(c)) strtemp+="{0:02x}".format(ord(c)) print strtemp os.execlp("E:\\RSA.exe",'') s=raw_input('press any key to continue...')
tcp的socket连接报文测试工具
# -*- coding: utf-8 -*- import socket from myutil import * from binascii import unhexlify as unhex from ctypes import * dll = cdll.LoadLibrary('mydll.dll') print 'begin load mydll..' HOST, PORT = "192.168.51.28", 5800 sd ="1234567812345678" # Create a socket (SOCK_STREAM means a TCP socket) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: # Connect to server and send data sock.connect((HOST, int(PORT)) print "Sent1 OK:" print sd # Receive data from the server and shut down received = sock.recv(1024) print "Received:" print_hex(received) print 'received len is 0x%02x' %(len(received)) print 'received data analysis...' re1=received[0:4] print_hex(re1) re1=received[4:6] print_hex(re1) re1=received[6:10] print_hex(re1) re1=received[10:16] print_hex(re1) #pack2 send sock.send(sd2.decode('hex')) print "Sent2 OK:" print sd2 # Receive data from the server and shut down received1 = sock.recv(1024) print "Received1:" print_hex(received1) print 'received1 len is 0x%02x' %(len(received1)) finally: sock.close() s=raw_input('press any key to continue...')
报文拼接与加解密测试
# -*- coding: gb2312 -*- import socket from myutil import * from binascii import unhexlify as unhex from ctypes import * dll = cdll.LoadLibrary('mydll.dll') print 'begin load mydll..' #key key='\xF1\xE2\xD3\xC4\xF1\xE2\xD3\xC4' #output MAC mac='\x00'*8 data='\x00'*8 pkey=c_char_p() pdata=c_char_p() pmac=c_char_p() pkey.value=key pdata.value=data pmac.value=mac #pack1 class pack: pass pk=pack() pk.len='00000032' pk.ID='0001' pk.slnum='00000004' pk.poscode='123456781234' pk.rand='1122334455667788' pk.psam='313233343536' pk.kind='0000' pk.ver='000001' pk.time='20140805135601' pk.mac='06cc571e6d96e12d' data=unhex(pk.len+pk.ID+pk.slnum+pk.poscode+pk.rand+pk.psam+pk.kind+pk.ver+pk.time) #print_hex(data) pdata.value=data #cacl MAC dll.CurCalc_DES_MAC64(805306481,pkey,0,pdata,42,pmac) stro= pmac.value strtemp='' for c in stro: strtemp+="{0:02x}".format(ord(c)) #print strtemp pk.mac=strtemp #data to send sd=pk.len+pk.ID+pk.slnum+pk.poscode+pk.rand+pk.psam+pk.kind+pk.ver+pk.time+pk.mac print 'send1 len is 0x%02x' %(len(sd)/2) print sd #pack2 class pack2: pass pk2=pack2() pk2.len='0000006E' pk2.ID='0012' pk2.slnum='00000005' pk2.fatCode='00' pk2.cardASN='0000000000000000' pk2.cardType='00' pk2.userNO= '0000000000000000' pk2.fileName1='00000000000000000000000000000015' pk2.dataLen1='00' pk2.dataArea1='00000000000000319999990800FB2014080620240806FFFFFFFFFFFFFFFFFFFF' pk2.fileName2='00000000000000000000000000000016' pk2.dataLen2='00' pk2.dataArea2='000003E800FFFF16' pk2.mac='06cc571e6d96e12d' data2=unhex(pk2.len+pk2.ID+pk2.slnum+pk2.fatCode+pk2.cardASN+pk2.cardType+pk2.userNO+pk2.fileName1+pk2.dataLen1+pk2.dataArea1+pk2.fileName2+pk2.dataLen2+pk2.dataArea2) pdata.value=data2 #cacl MAC dll.CurCalc_DES_MAC64(805306481,pkey,0,pdata,102,pmac) stro= pmac.value strtemp='' for c in stro: strtemp+="{0:02x}".format(ord(c)) #print strtemp pk2.mac=strtemp #data to send sd2=pk2.len+pk2.ID+pk2.slnum+pk2.fatCode+pk2.cardASN+pk2.cardType+pk2.userNO+pk2.fileName1+pk2.dataLen1+pk2.dataArea1+pk2.fileName2+pk2.dataLen2+pk2.dataArea2+pk2.mac print 'send2 len is 0x%02x' %(len(sd2)/2) print sd2 #PORT="192.168.60.37" #PORT="localhost" HOST, PORT = "192.168.51.28", 5800 # Create a socket (SOCK_STREAM means a TCP socket) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: # Connect to server and send data sock.connect((HOST, int(PORT)) #data= "123456789" #s = struct.pack('bbb',1,2,3) sock.send(sd.decode('hex')) print "Sent1 OK:" print sd # Receive data from the server and shut down received = sock.recv(1024) print "Received:" print_hex(received) print 'received len is 0x%02x' %(len(received)) print 'received data analysis...' re1=received[0:4] print_hex(re1) re1=received[4:6] print_hex(re1) re1=received[6:10] print_hex(re1) re1=received[10:16] print_hex(re1) #pack2 send sock.send(sd2.decode('hex')) print "Sent2 OK:" print sd2 # Receive data from the server and shut down received1 = sock.recv(1024) print "Received1:" print_hex(received1) print 'received1 len is 0x%02x' %(len(received1)) finally: sock.close() s=raw_input('press any key to continue...')
二进制文件解析工具
# -*- coding: utf-8 -*- from myutil import * from binascii import unhexlify as unhex import os path=os.getcwd() path+='\\rec04.bin' #print path print "begin ans......" f1=open(path,'rb') for i in range(1,35): s=f1.read(280) print "data:",i print_hex(s) print 'read data is:' print_hex(s) recstatadd = 187 print "终端编号:" print_hex(s[recstatadd:recstatadd+10]) print "卡号长度:" print_hex(s[10]) print "卡号: " print_hex(s[11:11+10]) print "持卡序号1+所属地城市代码2+交易地城市代码2" print_hex(s[recstatadd+22:recstatadd+22+5]) print "应用交易计数器" print_hex(s[92:92+2]) print "交易前余额4,交易金额3" print_hex(s[recstatadd+29:recstatadd+29+7]) print "交易日期:" print_hex(s[99:99+3]) print "交易时间:" print_hex(s[44:44+3]) print "终端编号" print_hex(s[21:21+8]) print "商户编号" print_hex(s[21+8:21+8+15]) print "批次号" print_hex(s[5:5+3]) print "应用密文" print_hex(s[47:47+8]) print "授权金额" print_hex(s[103:103+6]) print "其他金额" print_hex(s[115:115+6]) print "终端验证结果" print_hex(s[94:5+94]) print "应用交易计数器" print_hex(s[92:92+4]) print "卡片验证结果" print_hex(s[56:56+32]) print "卡片序列号:" print_hex(s[131]) f1.close()
抓取动漫图片
# -*- coding:utf8 -*- # 2013.12.36 19:41 # 抓取dbmei.com的图片。 from bs4 import BeautifulSoup import os, sys, urllib2,time,random # 创建文件夹 path = os.getcwd() # 获取此脚本所在目录 new_path = os.path.join(path,u'暴走漫画') if not os.path.isdir(new_path): os.mkdir(new_path) def page_loop(page=1): url = 'http://baozoumanhua.com/all/hot/page/%s?sv=1389537379' % page content = urllib2.urlopen(url) soup = BeautifulSoup(content) my_girl = soup.find_all('div',class_='img-wrap') for girl in my_girl: jokes = girl.find('img') link = jokes.get('src') flink = link print flink content2 = urllib2.urlopen(flink).read() #with open(u'暴走漫画'+'/'+time.strftime('%H-%M-%S')+random.choice('qwertyuiopasdfghjklzxcvbnm')+flink[-5:],'wb') as code: #在OSC上现学的 with open(u'暴走漫画'+'/'+flink[-11:],'wb') as code: code.write(content2) page = int(page) + 1 print u'开始抓取下一页' print 'the %s page' % page page_loop(page) page_loop()
抓取网站模板
#!/usr/bin/env python # -*- coding: utf-8 -*- # by yangyongzhen # 2016-12-06 from bs4 import BeautifulSoup import urllib,urllib2,os,time import re rootpath = os.getcwd()+u'/抓取的模板/' def makedir(path): if not os.path.isdir(path): os.makedirs(path) #创建抓取的根目录 makedir(rootpath) #显示下载进度 def Schedule(a,b,c): ''''' a:已经下载的数据块 b:数据块的大小 c:远程文件的大小 ''' per = 100.0 * a * b / c if per > 100 : per = 100 print '%.2f%%' % per def grabHref(url,listhref,localfile): html = urllib2.urlopen(url).read() html = unicode(html,'gb2312','ignore').encode('utf-8','ignore') content = BeautifulSoup(html).findAll('link') myfile = open(localfile,'w') pat = re.compile(r'href="([^"]*)"') pat2 = re.compile(r'http') for item in content: h = pat.search(str(item)) href = h.group(1) if pat2.search(href): ans = href else: ans = url+href listhref.append(ans) myfile.write(ans) myfile.write('\r\n') print ans content = BeautifulSoup(html).findAll('script') pat = re.compile(r'src="([^"]*)"') pat2 = re.compile(r'http') for item in content: h = pat.search(str(item)) if h: href = h.group(1) if pat2.search(href): ans = href else: ans = url+href listhref.append(ans) myfile.write(ans) myfile.write('\r\n') print ans content = BeautifulSoup(html).findAll('a') pat = re.compile(r'href="([^"]*)"') pat2 = re.compile(r'http') for item in content: h = pat.search(str(item)) if h: href = h.group(1) if pat2.search(href): ans = href else: ans = url+href listhref.append(ans) myfile.write(ans) myfile.write('\r\n') print ans myfile.close() def main(): url = "http://192.168.72.140/qdkj/" #采集网页的地址 listhref =[] #链接地址 localfile = 'ahref.txt' #保存链接地址为本地文件,文件名 grabHref(url,listhref,localfile) listhref = list(set(listhref)) #去除链接中的重复地址 curpath = rootpath start = time.clock() for item in listhref: curpath = rootpath name = item.split('/')[-1] fdir = item.split('/')[3:-1] for i in fdir: curpath += i curpath += '/' print curpath makedir(curpath) local = curpath+name urllib.urlretrieve(item, local,Schedule) # 远程保存函数 end = time.clock() print u'模板抓取完成!' print u'一共用时:',end-start,u'秒' if __name__=="__main__": main()
总结
加载全部内容