免费查询微信好友还在不在了~

最近一周经常被一些人发一些检测好友的消息,目测这东西好像是有条件收费的,然后就想想python是不是能写个免费版的好友检测。


640?wx_fmt=jpeg


具体可以看看运行结果:

640?wx_fmt=png


编写脚本会用到以下几个模块

colorama
coloredlogs
humanfriendly
lxml
qrcode
requests
six
requests_toolbelt
pyqrcode
certifi


需要使用pip安装一下。


主要原理是通过request访问网络请求,使用qrcode生成访问返回登陆链接的二维码,然后使用post请求发送消息,当然先有了wxrobot后代码就更加方便了,使用itcaht模块获取所有的好友列表,然后对每个好友发送消息。

【发送消息的时候要注意一下,每个信息对发送时间不能一样,否则会被微信检测出来,然后就不能访问了,所以可以使用random设置间隔时间】


这里上代码,使用itchat的代码稍后会放在github上,大家有兴趣可以访问。


#!/usr/bin/env python
# coding: utf-8
import qrcode
from pyqrcode import QRCode
import urllib.request, urllib.parse, urllib.error
import urllib.request, urllib.error, urllib.parse
import http.cookiejar
import requests
import xml.dom.minidom
import json
import time
import ssl
import re
import sys
import os
import subprocess
import random
import multiprocessing
import platform
import logging
import http.client
from collections import defaultdict
from urllib.parse import urlparse
from lxml import html
from socket import timeout as timeout_error
#import pdb
# for media upload
import mimetypes
from requests_toolbelt.multipart.encoder import MultipartEncoder


def catchKeyboardInterrupt(fn):
def wrapper(*args):
try:
return fn(*args)
except KeyboardInterrupt:
print('\n[*] 强制退出程序')
logging.debug('[*] 强制退出程序')
return wrapper


def _decode_list(data):
rv = []
for item in data:
if isinstance(item, str):
item = item.encode('utf-8')
elif isinstance(item, list):
item = _decode_list(item)
elif isinstance(item, dict):
item = _decode_dict(item)
rv.append(item)
return rv


def _decode_dict(data):
rv = {}
for key, value in data.items():
if isinstance(key, str):
key = key.encode('utf-8')
if isinstance(value, str):
value = value.encode('utf-8')
elif isinstance(value, list):
value = _decode_list(value)
elif isinstance(value, dict):
value = _decode_dict(value)
rv[key] = value
return rv


class WebWeixin(object):

def __str__(self):
description = \
"=========================\n" + \
"[#] Web Weixin\n" + \
"[#] Debug Mode: " + str(self.DEBUG) + "\n" + \
"[#] Uuid: " + self.uuid + "\n" + \
"[#] Uin: " + str(self.uin) + "\n" + \
"[#] Sid: " + self.sid + "\n" + \
"[#] Skey: " + self.skey + "\n" + \
"[#] DeviceId: " + self.deviceId + "\n" + \
"[#] PassTicket: " + self.pass_ticket + "\n" + \
"========================="
return description

def __init__(self):
self.DEBUG = False
self.commandLineQRCode = False
self.uuid = ''
self.base_uri = ''
self.redirect_uri = ''
self.uin = ''
self.sid = ''
self.skey = ''
self.pass_ticket = ''
self.deviceId = 'e' + repr(random.random())[2:17]
self.BaseRequest = {}
self.synckey = ''
self.SyncKey = []
self.User = []
self.MemberList = []
self.ContactList = [] # 好友
self.GroupList = [] # 群
self.GroupMemeberList = [] # 群友
self.PublicUsersList = [] # 公众号/服务号
self.SpecialUsersList = [] # 特殊账号
self.autoReplyMode = False
self.syncHost = ''
self.user_agent = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.109 Safari/537.36'
self.interactive = False
self.autoOpen = False
self.saveFolder = os.path.join(os.getcwd(), 'saved')
self.saveSubFolders = {'webwxgeticon': 'icons', 'webwxgetheadimg': 'headimgs', 'webwxgetmsgimg': 'msgimgs',
'webwxgetvideo': 'videos', 'webwxgetvoice': 'voices', '_showQRCodeImg': 'qrcodes'}
self.appid = 'wx782c26e4c19acffb'
self.lang = 'zh_CN'
self.lastCheckTs = time.time()
self.memberCount = 0
self.SpecialUsers = ['newsapp', 'fmessage', 'filehelper', 'weibo', 'qqmail', 'fmessage', 'tmessage', 'qmessage', 'qqsync', 'floatbottle', 'lbsapp', 'shakeapp', 'medianote', 'qqfriend', 'readerapp', 'blogapp', 'facebookapp', 'masssendapp', 'meishiapp', 'feedsapp',
'voip', 'blogappweixin', 'weixin', 'brandsessionholder', 'weixinreminder', 'wxid_novlwrv3lqwv11', 'gh_22b87fa7cb3c', 'officialaccounts', 'notification_messages', 'wxid_novlwrv3lqwv11', 'gh_22b87fa7cb3c', 'wxitil', 'userexperience_alarm', 'notification_messages']
self.TimeOut = 20 # 同步最短时间间隔(单位:秒)
self.media_count = -1
self.cookie = http.cookiejar.CookieJar()
opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(self.cookie))
opener.addheaders = [('User-agent', self.user_agent)]
urllib.request.install_opener(opener)

def loadConfig(self, config):
if config['DEBUG']:
self.DEBUG = config['DEBUG']
if config['autoReplyMode']:
self.autoReplyMode = config['autoReplyMode']
if config['user_agent']:
self.user_agent = config['user_agent']
if config['interactive']:
self.interactive = config['interactive']
if config['autoOpen']:
self.autoOpen = config['autoOpen']

def getUUID(self):
url = 'https://login.weixin.qq.com/jslogin'
params = {
'appid': self.appid,
'fun': 'new',
'lang': self.lang,
'_': int(time.time()),
}
#r = requests.get(url=url, params=params)
#r.encoding = 'utf-8'
#data = r.text
data = self._post(url, params, False).decode("utf-8")
if data == '':
return False
regx = r'window.QRLogin.code = (\d+); window.QRLogin.uuid = "(\S+?)"'
pm = re.search(regx, data)
if pm:
code = pm.group(1)
self.uuid = pm.group(2)
return code == '200'
return False
def genQRCode(self):
#return self._showQRCodeImg()
if sys.platform.startswith('win'):
self._showQRCodeImg('win')
elif sys.platform.find('darwin') >= 0:
self._showQRCodeImg('macos')
else:
self._str2qr('https://login.weixin.qq.com/l/' + self.uuid)

def _showQRCodeImg(self, str):
if self.commandLineQRCode:
qrCode = QRCode('https://login.weixin.qq.com/l/' + self.uuid)
self._showCommandLineQRCode(qrCode.text(1))
else:
url = 'https://login.weixin.qq.com/qrcode/' + self.uuid
params = {
't': 'webwx',
'_': int(time.time())
}

data = self._post(url, params, False)
if data == '':
return
QRCODE_PATH = self._saveFile('qrcode.jpg', data, '_showQRCodeImg')
if str == 'win':
os.startfile(QRCODE_PATH)
elif str == 'macos':
subprocess.call(["open", QRCODE_PATH])
else:
return
def _showCommandLineQRCode(self, qr_data, enableCmdQR=2):
try:
b = u'\u2588'
sys.stdout.write(b + '\r')
sys.stdout.flush()
except UnicodeEncodeError:
white = 'MM'
else:
white = b
black = ' '
blockCount = int(enableCmdQR)
if abs(blockCount) == 0:
blockCount = 1
white *= abs(blockCount)
if blockCount < 0:
white, black = black, white
sys.stdout.write(' ' * 50 + '\r')
sys.stdout.flush()
qr = qr_data.replace('0', white).replace('1', black)
sys.stdout.write(qr)
sys.stdout.flush()

def waitForLogin(self, tip=1):
time.sleep(tip)
url = 'https://login.weixin.qq.com/cgi-bin/mmwebwx-bin/login?tip=%s&uuid=%s&_=%s' % (
tip, self.uuid, int(time.time()))
data = self._get(url)
if data == '':
return False
pm = re.search(r"window.code=(\d+);", data)
code = pm.group(1)

if code == '201':
return True
elif code == '200':
pm = re.search(r'window.redirect_uri="(\S+?)";', data)
r_uri = pm.group(1) + '&fun=new'
self.redirect_uri = r_uri
self.base_uri = r_uri[:r_uri.rfind('/')]
return True
elif code == '408':
self._echo('[登陆超时] \n')
else:
self._echo('[登陆异常] \n')
return False
def login(self):
data = self._get(self.redirect_uri)
if data == '':
return False
doc = xml.dom.minidom.parseString(data)
root = doc.documentElement

for node in root.childNodes:
if node.nodeName == 'skey':
self.skey = node.childNodes[0].data
elif node.nodeName == 'wxsid':
self.sid = node.childNodes[0].data
elif node.nodeName == 'wxuin':
self.uin = node.childNodes[0].data
elif node.nodeName == 'pass_ticket':
self.pass_ticket = node.childNodes[0].data

if '' in (self.skey, self.sid, self.uin, self.pass_ticket):
return False
self.BaseRequest = {
'Uin': int(self.uin),
'Sid': self.sid,
'Skey': self.skey,
'DeviceID': self.deviceId,
}
return True
def webwxinit(self):
url = self.base_uri + '/webwxinit?pass_ticket=%s&skey=%s&r=%s' % (
self.pass_ticket, self.skey, int(time.time()))
params = {
'BaseRequest': self.BaseRequest
}
dic = self._post(url, params)
if dic == '':
return False
self.SyncKey = dic['SyncKey']
self.User = dic['User']
# synckey for synccheck
self.synckey = '|'.join(
[str(keyVal['Key']) + '_' + str(keyVal['Val']) for keyVal in self.SyncKey['List']])

return dic['BaseResponse']['Ret'] == 0
def webwxstatusnotify(self):
url = self.base_uri + \
'/webwxstatusnotify?lang=zh_CN&pass_ticket=%s' % (self.pass_ticket)
params = {
'BaseRequest': self.BaseRequest,
"Code": 3,
"FromUserName": self.User['UserName'],
"ToUserName": self.User['UserName'],
"ClientMsgId": int(time.time())
}
dic = self._post(url, params)
if dic == '':
return False
return dic['BaseResponse']['Ret'] == 0
def webwxgetcontact(self):
SpecialUsers = self.SpecialUsers
url = self.base_uri + '/webwxgetcontact?pass_ticket=%s&skey=%s&r=%s' % (
self.pass_ticket, self.skey, int(time.time()))
dic = self._post(url, {})
if dic == '':
return False
self.MemberCount = dic['MemberCount']
self.MemberList = dic['MemberList']
ContactList = self.MemberList[:]
GroupList = self.GroupList[:]
PublicUsersList = self.PublicUsersList[:]
SpecialUsersList = self.SpecialUsersList[:]

for i in range(len(ContactList) - 1, -1, -1):
Contact = ContactList[i]
if Contact['VerifyFlag'] & 8 != 0: # 公众号/服务号
ContactList.remove(Contact)
self.PublicUsersList.append(Contact)
elif Contact['UserName'] in SpecialUsers: # 特殊账号
ContactList.remove(Contact)
self.SpecialUsersList.append(Contact)
elif '@@' in Contact['UserName']: # 群聊
ContactList.remove(Contact)
self.GroupList.append(Contact)
elif Contact['UserName'] == self.User['UserName']: # 自己
ContactList.remove(Contact)
self.ContactList = ContactList

return True
def webwxbatchgetcontact(self):
url = self.base_uri + \
'/webwxbatchgetcontact?type=ex&r=%s&pass_ticket=%s' % (
int(time.time()), self.pass_ticket)
params = {
'BaseRequest': self.BaseRequest,
"Count": len(self.GroupList),
"List": [{"UserName": g['UserName'], "EncryChatRoomId":""} for g in self.GroupList]
}
dic = self._post(url, params)
if dic == '':
return False
# blabla ...
ContactList = dic['ContactList']
ContactCount = dic['Count']
self.GroupList = ContactList

for i in range(len(ContactList) - 1, -1, -1):
Contact = ContactList[i]
MemberList = Contact['MemberList']
for member in MemberList:
self.GroupMemeberList.append(member)
return True
def getNameById(self, id):
url = self.base_uri + \
'/webwxbatchgetcontact?type=ex&r=%s&pass_ticket=%s' % (
int(time.time()), self.pass_ticket)
params = {
'BaseRequest': self.BaseRequest,
"Count": 1,
"List": [{"UserName": id, "EncryChatRoomId": ""}]
}
dic = self._post(url, params)
if dic == '':
return None
# blabla ...
return dic['ContactList']

def testsynccheck(self):
SyncHost = ['wx2.qq.com',
'webpush.wx2.qq.com',
'wx8.qq.com',
'webpush.wx8.qq.com',
'qq.com',
'webpush.wx.qq.com',
'web2.wechat.com',
'webpush.web2.wechat.com',
'wechat.com',
'webpush.web.wechat.com',
'webpush.weixin.qq.com',
'webpush.wechat.com',
'webpush1.wechat.com',
'webpush2.wechat.com',
'webpush.wx.qq.com',
'webpush2.wx.qq.com']
for host in SyncHost:
self.syncHost = host
[retcode, selector] = self.synccheck()
if retcode == '0':
return True
return False
def synccheck(self):
params = {
'r': int(time.time()),
'sid': self.sid,
'uin': self.uin,
'skey': self.skey,
'deviceid': self.deviceId,
'synckey': self.synckey,
'_': int(time.time()),
}
url = 'https://' + self.syncHost + '/cgi-bin/mmwebwx-bin/synccheck?' + urllib.parse.urlencode(params)
data = self._get(url, timeout=5)
if data == '':
return [-1,-1]

pm = re.search(
r'window.synccheck={retcode:"(\d+)",selector:"(\d+)"}', data)
retcode = pm.group(1)
selector = pm.group(2)
return [retcode, selector]

def webwxsync(self):
url = self.base_uri + \
'/webwxsync?sid=%s&skey=%s&pass_ticket=%s' % (
self.sid, self.skey, self.pass_ticket)
params = {
'BaseRequest': self.BaseRequest,
'SyncKey': self.SyncKey,
'rr': ~int(time.time())
}
dic = self._post(url, params)
if dic == '':
return None
if self.DEBUG:
print(json.dumps(dic, indent=4))
(json.dumps(dic, indent=4))

if dic['BaseResponse']['Ret'] == 0:
self.SyncKey = dic['SyncKey']
self.synckey = '|'.join(
[str(keyVal['Key']) + '_' + str(keyVal['Val']) for keyVal in self.SyncKey['List']])
return dic

def webwxsendmsg(self, word, to='filehelper'):
url = self.base_uri + \
'/webwxsendmsg?pass_ticket=%s' % (self.pass_ticket)
clientMsgId = str(int(time.time() * 1000)) + \
str(random.random())[:5].replace('.', '')
params = {
'BaseRequest': self.BaseRequest,
'Msg': {
"Type": 1,
"Content": self._transcoding(word),
"FromUserName": self.User['UserName'],
"ToUserName": to,
"LocalID": clientMsgId,
"ClientMsgId": clientMsgId
}
}
headers = {'content-type': 'application/json; charset=UTF-8'}
data = json.dumps(params, ensure_ascii=False).encode('utf8')
r = requests.post(url, data=data, headers=headers)
dic = r.json()
return dic['BaseResponse']['Ret'] == 0
def webwxuploadmedia(self, image_name):
url = 'https://file2.wx.qq.com/cgi-bin/mmwebwx-bin/webwxuploadmedia?f=json'
# 计数器
self.media_count = self.media_count + 1
# 文件名
file_name = image_name
# MIME格式
# mime_type = application/pdf, image/jpeg, image/png, etc.
mime_type = mimetypes.guess_type(image_name, strict=False)[0]
# 微信识别的文档格式,微信服务器应该只支持两种类型的格式。pic和doc
# pic格式,直接显示。doc格式则显示为文件。
media_type = 'pic' if mime_type.split('/')[0] == 'image' else 'doc'
# 上一次修改日期
lastModifieDate = 'Thu Mar 17 2016 00:55:10 GMT+0800 (CST)'
# 文件大小
file_size = os.path.getsize(file_name)
# PassTicket
pass_ticket = self.pass_ticket
# clientMediaId
client_media_id = str(int(time.time() * 1000)) + \
str(random.random())[:5].replace('.', '')
# webwx_data_ticket
webwx_data_ticket = ''
for item in self.cookie:
if item.name == 'webwx_data_ticket':
webwx_data_ticket = item.value
break
if (webwx_data_ticket == ''):
return "None Fuck Cookie"
uploadmediarequest = json.dumps({
"BaseRequest": self.BaseRequest,
"ClientMediaId": client_media_id,
"TotalLen": file_size,
"StartPos": 0,
"DataLen": file_size,
"MediaType": 4
}, ensure_ascii=False).encode('utf8')

multipart_encoder = MultipartEncoder(
fields={
'id': 'WU_FILE_' + str(self.media_count),
'name': file_name,
'type': mime_type,
'lastModifieDate': lastModifieDate,
'size': str(file_size),
'mediatype': media_type,
'uploadmediarequest': uploadmediarequest,
'webwx_data_ticket': webwx_data_ticket,
'pass_ticket': pass_ticket,
'filename': (file_name, open(file_name, 'rb'), mime_type.split('/')[1])
},
boundary='-----------------------------1575017231431605357584454111'
)

headers = {
'Host': 'file2.wx.qq.com',
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.10; rv:42.0) Gecko/20100101 Firefox/42.0',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate',
'Referer': 'https://wx2.qq.com/',
'Content-Type': multipart_encoder.content_type,
'Origin': 'https://wx2.qq.com',
'Connection': 'keep-alive',
'Pragma': 'no-cache',
'Cache-Control': 'no-cache'
}

r = requests.post(url, data=multipart_encoder, headers=headers)
response_json = r.json()
if response_json['BaseResponse']['Ret'] == 0:
return response_json
return None
def webwxsendmsgimg(self, user_id, media_id):
url = 'https://wx2.qq.com/cgi-bin/mmwebwx-bin/webwxsendmsgimg?fun=async&f=json&pass_ticket=%s' % self.pass_ticket
clientMsgId = str(int(time.time() * 1000)) + \
str(random.random())[:5].replace('.', '')
data_json = {
"BaseRequest": self.BaseRequest,
"Msg": {
"Type": 3,
"MediaId": media_id,
"FromUserName": self.User['UserName'],
"ToUserName": user_id,
"LocalID": clientMsgId,
"ClientMsgId": clientMsgId
}
}
headers = {'content-type': 'application/json; charset=UTF-8'}
data = json.dumps(data_json, ensure_ascii=False).encode('utf8')
r = requests.post(url, data=data, headers=headers)
dic = r.json()
return dic['BaseResponse']['Ret'] == 0
def webwxsendmsgemotion(self, user_id, media_id):
url = 'https://wx2.qq.com/cgi-bin/mmwebwx-bin/webwxsendemoticon?fun=sys&f=json&pass_ticket=%s' % self.pass_ticket
clientMsgId = str(int(time.time() * 1000)) + \
str(random.random())[:5].replace('.', '')
data_json = {
"BaseRequest": self.BaseRequest,
"Msg": {
"Type": 47,
"EmojiFlag": 2,
"MediaId": media_id,
"FromUserName": self.User['UserName'],
"ToUserName": user_id,
"LocalID": clientMsgId,
"ClientMsgId": clientMsgId
}
}
headers = {'content-type': 'application/json; charset=UTF-8'}
data = json.dumps(data_json, ensure_ascii=False).encode('utf8')
r = requests.post(url, data=data, headers=headers)
dic = r.json()
if self.DEBUG:
print(json.dumps(dic, indent=4))
logging.debug(json.dumps(dic, indent=4))
return dic['BaseResponse']['Ret'] == 0
def _saveFile(self, filename, data, api=None):
fn = filename
if self.saveSubFolders[api]:
dirName = os.path.join(self.saveFolder, self.saveSubFolders[api])
if not os.path.exists(dirName):
os.makedirs(dirName)
fn = os.path.join(dirName, filename)
logging.debug('Saved file: %s' % fn)
with open(fn, 'wb') as f:
f.write(data)
f.close()
return fn

def webwxgeticon(self, id):
url = self.base_uri + \
'/webwxgeticon?username=%s&skey=%s' % (id, self.skey)
data = self._get(url)
if data == '':
return ''
fn = 'img_' + id + '.jpg'
return self._saveFile(fn, data, 'webwxgeticon')

def webwxgetheadimg(self, id):
url = self.base_uri + \
'/webwxgetheadimg?username=%s&skey=%s' % (id, self.skey)
data = self._get(url)
if data == '':
return ''
fn = 'img_' + id + '.jpg'
return self._saveFile(fn, data, 'webwxgetheadimg')

def webwxgetmsgimg(self, msgid):
url = self.base_uri + \
'/webwxgetmsgimg?MsgID=%s&skey=%s' % (msgid, self.skey)
data = self._get(url)
if data == '':
return ''
fn = 'img_' + msgid + '.jpg'
return self._saveFile(fn, data, 'webwxgetmsgimg')

# Not work now for weixin haven't support this API
def webwxgetvideo(self, msgid):
url = self.base_uri + \
'/webwxgetvideo?msgid=%s&skey=%s' % (msgid, self.skey)
data = self._get(url, api='webwxgetvideo')
if data == '':
return ''
fn = 'video_' + msgid + '.mp4'
return self._saveFile(fn, data, 'webwxgetvideo')

def webwxgetvoice(self, msgid):
url = self.base_uri + \
'/webwxgetvoice?msgid=%s&skey=%s' % (msgid, self.skey)
data = self._get(url, api='webwxgetvoice')
if data == '':
return ''
fn = 'voice_' + msgid + '.mp3'
return self._saveFile(fn, data, 'webwxgetvoice')

def getGroupName(self, id):
name = '未知群'
for member in self.GroupList:
if member['UserName'] == id:
name = member['NickName']
if name == '未知群':
# 现有群里面查不到
GroupList = self.getNameById(id)
for group in GroupList:
self.GroupList.append(group)
if group['UserName'] == id:
name = group['NickName']
MemberList = group['MemberList']
for member in MemberList:
self.GroupMemeberList.append(member)
return name

def getUserRemarkName(self, id):
name = '未知群' if id[:2] == '@@' else '陌生人'
if id == self.User['UserName']:
return self.User['NickName'] # 自己
if id[:2] == '@@':
# 群
name = self.getGroupName(id)
else:
# 特殊账号
for member in self.SpecialUsersList:
if member['UserName'] == id:
name = member['RemarkName'] if member[
'RemarkName'] else member['NickName']

# 公众号或服务号
for member in self.PublicUsersList:
if member['UserName'] == id:
name = member['RemarkName'] if member[
'RemarkName'] else member['NickName']

# 直接联系人
for member in self.ContactList:
if member['UserName'] == id:
name = member['RemarkName'] if member[
'RemarkName'] else member['NickName']
# 群友
for member in self.GroupMemeberList:
if member['UserName'] == id:
name = member['DisplayName'] if member[
'DisplayName'] else member['NickName']

if name == '未知群' or name == '陌生人':
logging.debug(id)
return name

def getUSerID(self, name):
for member in self.MemberList:
if name == member['RemarkName'] or name == member['NickName']:
return member['UserName']
return None
def _showMsg(self, message):

srcName = None
dstName = None
groupName = None
content = None
msg = message
logging.debug(msg)

if msg['raw_msg']:
srcName = self.getUserRemarkName(msg['raw_msg']['FromUserName'])
dstName = self.getUserRemarkName(msg['raw_msg']['ToUserName'])
content = msg['raw_msg']['Content'].replace(
'&lt;', '<').replace('&gt;', '>')
message_id = msg['raw_msg']['MsgId']

if content.find('http://weixin.qq.com/cgi-bin/redirectforward?args=') != -1:
# 地理位置消息
data = self._get(content)
if data == '':
return
data.decode('gbk').encode('utf-8')
pos = self._searchContent('title', data, 'xml')
temp = self._get(content)
if temp == '':
return
tree = html.fromstring(temp)
url = tree.xpath('//html/body/div/img')[0].attrib['src']

for item in urlparse(url).query.split('&'):
if item.split('=')[0] == 'center':
loc = item.split('=')[-1:]

content = '%s 发送了一个 位置消息 - 我在 [%s](%s) @ %s]' % (
srcName, pos, url, loc)

if msg['raw_msg']['ToUserName'] == 'filehelper':
# 文件传输助手
dstName = '文件传输助手'
if msg['raw_msg']['FromUserName'][:2] == '@@':
# 接收到来自群的消息
if ":<br/>" in content:
[people, content] = content.split(':<br/>', 1)
groupName = srcName
srcName = self.getUserRemarkName(people)
dstName = 'GROUP'
else:
groupName = srcName
srcName = 'SYSTEM'
elif msg['raw_msg']['ToUserName'][:2] == '@@':
# 自己发给群的消息
groupName = dstName
dstName = 'GROUP'
# 收到了红包
if content == '收到红包,请在手机上查看':
msg['message'] = content

# 指定了消息内容
if 'message' in list(msg.keys()):
content = msg['message']

if groupName != None:
print('%s |%s| %s -> %s: %s' % (message_id, groupName.strip(), srcName.strip(), dstName.strip(), content.replace('<br/>', '\n')))
logging.info('%s |%s| %s -> %s: %s' % (message_id, groupName.strip(),
srcName.strip(), dstName.strip(), content.replace('<br/>', '\n')))
else:
print('%s %s -> %s: %s' % (message_id, srcName.strip(), dstName.strip(), content.replace('<br/>', '\n')))
logging.info('%s %s -> %s: %s' % (message_id, srcName.strip(),
dstName.strip(), content.replace('<br/>', '\n')))

def handleMsg(self, r):
for msg in r['AddMsgList']:
print('[*] 你有新的消息,请注意查收')
logging.debug('[*] 你有新的消息,请注意查收')

if self.DEBUG:
fn = 'msg' + str(int(random.random() * 1000)) + '.json'
with open(fn, 'w') as f:
f.write(json.dumps(msg))
print('[*] 该消息已储存到文件: ' + fn)
logging.debug('[*] 该消息已储存到文件: %s' % (fn))

msgType = msg['MsgType']
name = self.getUserRemarkName(msg['FromUserName'])
content = msg['Content'].replace('&lt;', '<').replace('&gt;', '>')
msgid = msg['MsgId']

if msgType == 1:
raw_msg = {'raw_msg': msg}
self._showMsg(raw_msg)
#自己加的代码-------------------------------------------#
#if self.autoReplyRevokeMode:
# store
#自己加的代码-------------------------------------------#
if self.autoReplyMode:
ans = self._xiaodoubi(content) + '\n[微信机器人自动回复]'
if self.webwxsendmsg(ans, msg['FromUserName']):
print('自动回复: ' + ans)
logging.info('自动回复: ' + ans)
else:
print('自动回复失败')
logging.info('自动回复失败')
elif msgType == 3:
image = self.webwxgetmsgimg(msgid)
raw_msg = {'raw_msg': msg,
'message': '%s 发送了一张图片: %s' % (name, image)}
self._showMsg(raw_msg)
self._safe_open(image)
elif msgType == 34:
voice = self.webwxgetvoice(msgid)
raw_msg = {'raw_msg': msg,
'message': '%s 发了一段语音: %s' % (name, voice)}
self._showMsg(raw_msg)
self._safe_open(voice)
elif msgType == 42:
info = msg['RecommendInfo']
print('%s 发送了一张名片:' % name)
print('=========================')
print('= 昵称: %s' % info['NickName'])
print('= 微信号: %s' % info['Alias'])
print('= 地区: %s %s' % (info['Province'], info['City']))
print('= 性别: %s' % ['未知', '男', '女'][info['Sex']])
print('=========================')
raw_msg = {'raw_msg': msg, 'message': '%s 发送了一张名片: %s' % (
name.strip(), json.dumps(info))}
self._showMsg(raw_msg)
elif msgType == 47:
url = self._searchContent('cdnurl', content)
raw_msg = {'raw_msg': msg,
'message': '%s 发了一个动画表情,点击下面链接查看: %s' % (name, url)}
self._showMsg(raw_msg)
self._safe_open(url)
elif msgType == 49:
appMsgType = defaultdict(lambda: "")
appMsgType.update({5: '链接', 3: '音乐', 7: '微博'})
print('%s 分享了一个%s:' % (name, appMsgType[msg['AppMsgType']]))
print('=========================')
print('= 标题: %s' % msg['FileName'])
print('= 描述: %s' % self._searchContent('des', content, 'xml'))
print('= 链接: %s' % msg['Url'])
print('= 来自: %s' % self._searchContent('appname', content, 'xml'))
print('=========================')
card = {
'title': msg['FileName'],
'description': self._searchContent('des', content, 'xml'),
'url': msg['Url'],
'appname': self._searchContent('appname', content, 'xml')
}
raw_msg = {'raw_msg': msg, 'message': '%s 分享了一个%s: %s' % (
name, appMsgType[msg['AppMsgType']], json.dumps(card))}
self._showMsg(raw_msg)
elif msgType == 51:
raw_msg = {'raw_msg': msg, 'message': '[*] 成功获取联系人信息'}
self._showMsg(raw_msg)
elif msgType == 62:
video = self.webwxgetvideo(msgid)
raw_msg = {'raw_msg': msg,
'message': '%s 发了一段小视频: %s' % (name, video)}
self._showMsg(raw_msg)
self._safe_open(video)
elif msgType == 10002:
raw_msg = {'raw_msg': msg, 'message': '%s 撤回了一条消息' % name}
self._showMsg(raw_msg)
else:
logging.debug('[*] 该消息类型为: %d,可能是表情,图片, 链接或红包: %s' %
(msg['MsgType'], json.dumps(msg)))
raw_msg = {
'raw_msg': msg, 'message': '[*] 该消息类型为: %d,可能是表情,图片, 链接或红包' % msg['MsgType']}
self._showMsg(raw_msg)

def listenMsgMode(self):
print('[*] 进入消息监听模式 ... 成功')
logging.debug('[*] 进入消息监听模式 ... 成功')
self._run('[*] 进行同步线路测试 ... ', self.testsynccheck)
playWeChat = 0
redEnvelope = 0
while True:
self.lastCheckTs = time.time()
[retcode, selector] = self.synccheck()
if self.DEBUG:
print('retcode: %s, selector: %s' % (retcode, selector))
logging.debug('retcode: %s, selector: %s' % (retcode, selector))
if retcode == '1100':
print('[*] 你在手机上登出了微信,债见')
logging.debug('[*] 你在手机上登出了微信,债见')
break
if retcode == '1101':
print('[*] 你在其他地方登录了 WEB 版微信,债见')
logging.debug('[*] 你在其他地方登录了 WEB 版微信,债见')
break
elif retcode == '0':
if selector == '2':
r = self.webwxsync()
if r is not None:
self.handleMsg(r)
elif selector == '6':
# TODO
redEnvelope += 1
print('[*] 收到疑似红包消息 %d 次' % redEnvelope)
logging.debug('[*] 收到疑似红包消息 %d 次' % redEnvelope)
elif selector == '7':
playWeChat += 1
print('[*] 你在手机上玩微信被我发现了 %d 次' % playWeChat)
logging.debug('[*] 你在手机上玩微信被我发现了 %d 次' % playWeChat)
r = self.webwxsync()
elif selector == '0':
time.sleep(1)
if (time.time() - self.lastCheckTs) <= 20:
time.sleep(time.time() - self.lastCheckTs)

def sendMsg(self, name, word, isfile=False):
id = self.getUSerID(name)
if id:
if isfile:
with open(word, 'r') as f:
for line in f.readlines():
line = line.replace('\n', '')
self._echo('-> ' + name + ': ' + line)
if self.webwxsendmsg(line, id):
print(' [成功]')
else:
print(' [失败]')
time.sleep(1)
else:
if self.webwxsendmsg(word, id):
print('[*] 消息发送成功')
logging.debug('[*] 消息发送成功')
else:
print('[*] 消息发送失败')
logging.debug('[*] 消息发送失败')
else:
print('[*] 此用户不存在')
logging.debug('[*] 此用户不存在')

def sendMsgToAll(self, word):
for contact in self.ContactList:
name = contact['RemarkName'] if contact[
'RemarkName'] else contact['NickName']
id = contact['UserName']
self._echo('-> ' + name + ': ' + word)
if self.webwxsendmsg(word, id):
print(' [成功]')
else:
print(' [失败]')
time.sleep(1)

def sendImg(self, name, file_name):
response = self.webwxuploadmedia(file_name)
media_id = ""
if response is not None:
media_id = response['MediaId']
user_id = self.getUSerID(name)
response = self.webwxsendmsgimg(user_id, media_id)

def sendEmotion(self, name, file_name):
response = self.webwxuploadmedia(file_name)
media_id = ""
if response is not None:
media_id = response['MediaId']
user_id = self.getUSerID(name)
response = self.webwxsendmsgemotion(user_id, media_id)

@catchKeyboardInterrupt
def start(self):
print("---------------------------------------------------------------------------------------------\n")
print("本工具主要用于测试微信好友是否被删除,通过发送隐藏的代码好友,同时不会打扰好友\n")
print("运行过程中如果长时间没有反应,请按Enter/重启程序\n")
print("结果会保存在当前目录下的weixin_result.txt文件中,请确认程序权限\n")
print("注意:开发对工具使用者不负任何责任,使用工具发生任何问题与开发者无关,如果不同意请勿运行此工具\n")
print("---------------------------------------------------------------------------------------------\n")
input("请按任意键开始运行程序:")

self._echo('[*] 微信网页版 ... 开动')
print()
logging.debug('[*] 微信网页版 ... 开动')
while True:
self._run('[*] 正在获取 uuid ... ', self.getUUID)
self._echo('[*] 正在获取二维码 ... 成功')
print()
logging.debug('[*] 微信网页版 ... 开动')
self.genQRCode()
print('[*] 请使用微信扫描二维码以登录 ... ')
if not self.waitForLogin():
continue
print('[*] 请在手机上点击确认以登录 ... ')
if not self.waitForLogin(0):
continue
break
self._run('[*] 正在登录 ... ', self.login)
self._run('[*] 微信初始化 ... ', self.webwxinit)
self._run('[*] 开启状态通知 ... ', self.webwxstatusnotify)
self._run('[*] 获取联系人 ... ', self.webwxgetcontact)
self._echo('[*] 应有 %s 个联系人,读取到联系人 %d 个' %
(self.MemberCount, len(self.MemberList)))
print()
self._echo('[*] 共有 %d 个群 | %d 个直接联系人 | %d 个特殊账号 | %d 公众号或服务号\n' % (len(self.GroupList),
len(self.ContactList), len(self.SpecialUsersList), len(self.PublicUsersList)))
test_result = []
for i in self.ContactList:
try:
message = self.webwxsendmsg("好久不见జ్ఞ ా", i['UserName'])
print("正在测试[{}],发送消息:[{}]".format(i.get('RemarkName') if i.get('RemarkName') else i.get('NickName'), message))
if not message:
print("发送消息太频繁,等10秒钟\n")
time.sleep(10)
send_result = self.webwxsync()
if send_result and send_result.get("AddMsgList"):
for i in send_result["AddMsgList"]:
if "你还不是他(她)朋友" in i.get("Content", ""):
print(i.get("Content", ""))
test_result.append(i.get("Content", ""))
with open("weixin_result.txt", "a", encoding="utf-8") as f:
f.write("{}\r\n".format(i.get("Content", "")))
except Exception as e:
print("测试[{}]失败,message:[{}]".format(i.get('RemarkName') if i.get('RemarkName') else i.get('NickName'), e))
time.sleep(random.randrange(2, 5))









def _safe_open(self, path):
if self.autoOpen:
if platform.system() == "Linux":
os.system("xdg-open %s &" % path)
else:
os.system('open %s &' % path)

def _run(self, str, func, *args):
self._echo(str)
if func(*args):
print('成功')
logging.debug('%s... 成功' % (str))
else:
print('失败\n[*] 退出程序')
logging.debug('%s... 失败' % (str))
logging.debug('[*] 退出程序')
exit()

def _echo(self, str):
sys.stdout.write(str)
sys.stdout.flush()

def _printQR(self, mat):
for i in mat:
BLACK = '\033[40m \033[0m'
WHITE = '\033[47m \033[0m'
print(''.join([BLACK if j else WHITE for j in i]))

def _str2qr(self, str):
print(str)
qr = qrcode.QRCode()
qr.border = 1
qr.add_data(str)
qr.make()
# img = qr.make_image()
# img.save("qrcode.png")
#mat = qr.get_matrix()
#self._printQR(mat) # qr.print_tty() or qr.print_ascii()
qr.print_ascii(invert=True)

def _transcoding(self, data):
if not data:
return data
result = None
if type(data) == str:
result = data
elif type(data) == str:
result = data.decode('utf-8')
return result

def _get(self, url: object, api: object = None, timeout: object = None) -> object:
request = urllib.request.Request(url=url)
request.add_header('Referer', 'https://wx.qq.com/')
if api == 'webwxgetvoice':
request.add_header('Range', 'bytes=0-')
if api == 'webwxgetvideo':
request.add_header('Range', 'bytes=0-')
try:
response = urllib.request.urlopen(request, timeout=timeout) if timeout else urllib.request.urlopen(request)
if api == 'webwxgetvoice' or api == 'webwxgetvideo':
data = response.read()
else:
data = response.read().decode('utf-8')
logging.debug(url)
return data
except urllib.error.HTTPError as e:
logging.error('HTTPError = ' + str(e.code))
except urllib.error.URLError as e:
logging.error('URLError = ' + str(e.reason))
except http.client.HTTPException as e:
logging.error('HTTPException')
except timeout_error as e:
pass
except ssl.CertificateError as e:
pass
except Exception:
import traceback
logging.error('generic exception: ' + traceback.format_exc())
return ''
def _post(self, url: object, params: object, jsonfmt: object = True) -> object:
if jsonfmt:
data = (json.dumps(params)).encode()

request = urllib.request.Request(url=url, data=data)
request.add_header(
'ContentType', 'application/json; charset=UTF-8')
else:
request = urllib.request.Request(url=url, data=urllib.parse.urlencode(params).encode(encoding='utf-8'))


try:
response = urllib.request.urlopen(request)
data = response.read()
if jsonfmt:
return json.loads(data.decode('utf-8') )#object_hook=_decode_dict)
return data
except urllib.error.HTTPError as e:
logging.error('HTTPError = ' + str(e.code))
except urllib.error.URLError as e:
logging.error('URLError = ' + str(e.reason))
except http.client.HTTPException as e:
logging.error('HTTPException')
except Exception:
import traceback
logging.error('generic exception: ' + traceback.format_exc())

return ''
def _xiaodoubi(self, word):
url = 'http://www.xiaodoubi.com/bot/chat.php'
try:
r = requests.post(url, data={'chat': word})
return r.content
except:
return "让我一个人静静 T_T..."
def _simsimi(self, word):
key = ''
url = 'http://sandbox.api.simsimi.com/request.p?key=%s&lc=ch&ft=0.0&text=%s' % (
key, word)
r = requests.get(url)
ans = r.json()
if ans['result'] == '100':
return ans['response']
else:
return '你在说什么,风太大听不清列'
def _searchContent(self, key, content, fmat='attr'):
if fmat == 'attr':
pm = re.search(key + '\s?=\s?"([^"<]+)"', content)
if pm:
return pm.group(1)
elif fmat == 'xml':
pm = re.search('<{0}>([^<]+)</{0}>'.format(key), content)
if not pm:
pm = re.search(
'<{0}><\!\[CDATA\[(.*?)\]\]></{0}>'.format(key), content)
if pm:
return pm.group(1)
return '未知'
class UnicodeStreamFilter:

def __init__(self, target):
self.target = target
self.encoding = 'utf-8'
self.errors = 'replace'
self.encode_to = self.target.encoding

def write(self, s):
if type(s) == str:
s = s.encode().decode('utf-8')
s = s.encode(self.encode_to, self.errors).decode(self.encode_to)
self.target.write(s)

def flush(self):
self.target.flush()

if sys.stdout.encoding == 'cp936':
sys.stdout = UnicodeStreamFilter(sys.stdout)


if __name__ == '__main__':
logger = logging.getLogger(__name__)
if not sys.platform.startswith('win'):
import coloredlogs
coloredlogs.install(level='DEBUG')

webwx = WebWeixin()
webwx.start()



之后就可以运行扫码访问了,亲测有效

640?wx_fmt=png

640?wx_fmt=png



640?wx_fmt=gif



本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/254384.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

如何找回删除的微信好友?

如何找回删除的微信好友&#xff1f; 本篇主要是说朋友圈无互动过&#xff0c;也没有任何标签和其他特殊标记&#xff0c;不知道微信号&#xff0c;手机号的微信好友&#xff0c;误删之后如何通过恢复数据方式恢复好友的办法。百度之后主要有如下几种方法&#xff1a;1.朋友圈有…

在Centos Stream 9上Docker的实操教程(四) - Docker腾讯云远程仓库和本地私有仓库

在Centos Stream 9上Docker的实操教程 - Docker腾讯云远程仓库和本地私有仓库 本地镜像发布到腾讯云注册开通腾讯云初始化个人版服务创建仓库推送拉取镜像 私有仓库结语 本地镜像发布到腾讯云 由于官方的docker hub访问由于网络原因&#xff0c;可能会比较慢&#xff0c;博主推…

html输入框 多出的字隐藏,input 输入框获得/失去焦点时隐藏/显示文字(jquery版)

input 输入框获得和失去焦点时隐藏或者显示文字我们先看下效果图 输入框默认状态: 输入框获取焦点状态: 大家可以看效果图的搜索输入框&#xff0c;默认显示着“用户名/Email”的提示&#xff0c;当这个 input 输入框获得焦点时&#xff0c;就自动清空等待用户输入&#xff0c…

【C++】函数高级 - 默认参数,占位参数,函数重载基本语法,函数重载注意事项

No.Contents1【C】基础知识 - HelloWorld&#xff0c;注释&#xff0c;变量&#xff0c;常量&#xff0c;关键字&#xff0c;标识符2【C】数据类型 - 整型&#xff0c;sizeof&#xff0c;实型&#xff0c;字符型&#xff0c;转义字符&#xff0c;字符串类型&#xff0c;布尔类型…

LeetCode 560 和为 K 的子数组

LeetCode 560 和为 K 的子数组 来源&#xff1a;力扣&#xff08;LeetCode&#xff09; 链接&#xff1a;https://leetcode.cn/problems/subarray-sum-equals-k/description 博主Github&#xff1a;https://github.com/GDUT-Rp/LeetCode 题目&#xff1a; 给你一个整数数组 …

chatgpt赋能python:Python中的画图——创建漂亮的可视化图像

Python中的画图——创建漂亮的可视化图像 Python是一个高度可编程的语言&#xff0c;因此它非常适合用于创建各种类型的可视化。 在本文中&#xff0c;我们将介绍Python中的画图。我们将讨论如何使用Python和一些流行的数据可视化库来创建漂亮的可视化图像。我们还将探讨如何…

【Linux】Linux文件权限的理解

目录 一、Shell是什么&#xff1f; 1、Shell承担用户和内核间的翻译工作 2、拒绝用户非法请求&#xff0c;保护内核 3、派生子进程执行用户指令 二、用户切换与提权 1、普通用户与root用户的切换 2、普通用户指令短暂提权 三、文件权限的理解 1、文件权限角色的权限文…

Linux文件的rwx含义,Linux文件权限rwx简单了解

Ⅰ 了解Linux下的文件权限 如上图所示,ll命令详细展示当前目录下的文件或者子目录信息 红框标注的即为此文件或者目录的权限 【第一行文件10.c的权限以-开头,用来说明这是一个文件;第四行code目录的权限以字母d开头,用来标注code是目录】 关于文件权限,我们要先了解有那些…

Linux文件权限管理命令

今天继续给大家介绍Linux基本知识&#xff0c;本文主要内容是介绍Linux文件权限相关命令。 一、chown命令修改文件属主 在Linux系统中&#xff0c;chown命令可以更改文件的属主和属组&#xff0c;chown命令使用示例如下&#xff1a; chown user.user target chown user:user…

Linux文件权限修改

Linux文件权限修改 一.文件属性查看 使用root创建一个1.txt文件进行实例 touch /opt/1.txt查看文件属性 ls -l /opt/1.txt可以看到图中红色框框内列出了文件的信息属性&#xff0c;从左到有进行性详解 1. -rw-r--r-- 参考下图进行理解 共有10个字符&#xff0c;如果第一个…

Linux文件权限的设置

本文章主要介绍了对Linux文件的权限以及如何设置权限。 一、查看文件的权限与属性 ls -l 或者 ll查看文件属性 二、可以列出如下图所示的一些信息 -rw-r--r-- 第一位代表文件类型 d 表示目录 l 表示链接文件 - 表示普通文件 b 表示快设备文…

Linux文件权限查看与修改

Linux文件的权限 linux文件的权限可以分为四类&#xff1a;可读、可写、可执行、没有权限。分别用字符r、w、x、- 表示。 2. 用户与用户组 Linux是一个多用户多任务的操作系统&#xff0c;可以通过用户和用户组来更好的控制文件的权限。 每个文件都有一个拥有者&#xff08;某一…

【C++入门】什么是内联函数?

目录 一、概念 为什么要有内联函数&#xff1f; 内联函数设计的初衷是为了替代部分 #define 宏定义 二、特性 1.空间换时间 2.编译器做主 3.声明定义放一起 总结 一、概念 以inline修饰的函数叫做内联函数&#xff0c;编译时C编译器会在调用函数的地方展开&#xff0c;没有…

CAD圆弧

import dxfgrabberinputFilePathe:/test3-11/Drawing2.dxf #输入文件的路径dxf dxfgrabber.readfile(inputFilePath)#按图层访问 # for layer in dxf.layers: # #print(layer.name,layer.color,layer.linetype) # print("图层名&#xff1a;",layer.name)l…

CAD电气制图中怎么用圆弧连接导线?

在CAD电气制图过程中经常会需要连接两条导线&#xff0c;那么你知道CAD软件中怎么用圆弧连接两条导线呢&#xff1f;浩辰CAD电气软件中提供了便捷的圆弧连接功能&#xff0c;接下来的CAD电气制图教程就和小编一起来看看浩辰CAD电气软件中圆弧连接功能的相关使用技巧吧&#xff…

php中划弧线,cad画弧形的快捷键是什么?如何画弧形?

cad画弧形的快捷键是A&#xff0c;画弧形的方法&#xff1a;首先确定圆心&#xff0c;并点击圆弧命令&#xff1b;接着继续输入C进行圆心的确定&#xff0c;并调整半径和位置&#xff1b;最后点击选择两个端点后exc退出即可完成。 cad画弧形的快捷键是A&#xff0c;画弧形的方法…

你知道CAD软件将圆弧与直线转换成多段线的方法吗?

CAD制图时想要对直线与圆弧构成的整体对象进行多段线编辑&#xff0c;首先需要将这个对象转换成多段线。本文将介绍CAD如何将直线与圆弧连接成多段线。 1.使用CAD软件打开需要进行操作的图纸文件&#xff0c;用夹点拾取一下&#xff0c;从右边的特性栏可以看到这个图形是由4条…

圆弧的绘制

圆弧的分类 圆心/起/终点画圆弧 其操作方法类似于圆命令&#xff0c;第一次单击鼠标左键&#xff0c;确定圆心&#xff0c;移动鼠标&#xff0c;再次单击鼠标左键确定半径&#xff0c;最后通过移动鼠标来确定圆弧长度&#xff08;若不移动就单击鼠标左键&#xff0c;则取消该次…

AUTOCAD——圆弧命令

创建圆弧。 执行方式 命令行&#xff1a;AEC 菜单栏&#xff1a;绘图→圆弧 工具栏&#xff1a;圆弧命令图标 “圆弧命令操作命令位置”界面 执行以上命令后&#xff0c;命令行会显示以下信息&#xff1a; 选项说明 &#xff08;1&#xff09;起点&#xff1a;指定第一个点…

chatgpt赋能python:Python中字符串的转换方法

Python中字符串的转换方法 作为一门非常强大的编程语言&#xff0c;Python在字符串的处理上也有着非常丰富的功能。在Python中&#xff0c;字符串是非常重要的数据类型之一&#xff0c;也是最常用的数据类型之一。字符串在Python中有着很多的用途&#xff0c;比如表示文本数据…