项目演示地址:https://www.szyfd.xyz/app/threewall/3dwall
新建django 项目,如刚接触微信请移步微信开发入门篇第一章:https://www.cnblogs.com/wangcongxing/p/11546780.html
1.新建app threewall
python manage.py startapp threewall
2.新建static 文件夹,配置静态资源
配置settings.py
# URL prefix under which static assets are served.
STATIC_URL = '/static/'
# Additional directories searched for static files: the project-level
# "static" folder created in step 2.
STATICFILES_DIRS = (os.path.join(BASE_DIR, 'static'),)
3.安装 channels、channels_redis(用于处理 WebSocket)、pyCryptodome(用于二维码的对称加密)以及 django-simpleui(后台管理界面主题)
pip install -U channels==2.0.2 channels_redis==2.1.1
pip install pyCryptodome
pip install django-simpleui
4.安装完成后配置如下 settings.py
INSTALLED_APPS = [
    # Third-party admin add-ons, listed ahead of django.contrib.admin.
    # NOTE(review): 'import_export' is not installed by the pip commands in
    # step 3 - presumably it needs `pip install django-import-export`; confirm.
    "simpleui",
    "import_export",
    # Django built-in apps.
    "django.contrib.admin",
    "django.contrib.auth",
    "django.contrib.contenttypes",
    "django.contrib.sessions",
    "django.contrib.messages",
    "django.contrib.staticfiles",
    # Project apps: 'threewall' is the check-in wall app created in step 1.
    "app",
    "threewall",
    # Channels, used for the WebSocket support configured in the next steps.
    "channels",
]
5.threewall是我们准备建立的签到墙应用,接着就建立我们的 asgi 应用,并指定其要使用的路由。在 settings 同级目录下新建一个 routing.py 的文件:
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
import threewall.routing
# Root ASGI application: routes each incoming connection by protocol type.
# Plain HTTP needs no entry here; per the original note, http -> Django views
# is added by default.
# WebSocket connections pass through AuthMiddlewareStack and are then matched
# against the URL patterns declared in threewall/routing.py.
_websocket_routes = URLRouter(threewall.routing.websocket_urlpatterns)
application = ProtocolTypeRouter({
    'websocket': AuthMiddlewareStack(_websocket_routes),
})
6.threewall.routing 以及 threewall.routing.websocket_urlpatterns 是我们后面会自己建立的模块。
紧接着,我们需要在 Django 的配置文件中继续配置 Channels 的 asgi 应用和通道层的信息:
# The WSGI entry point is commented out in favour of the ASGI application.
# WSGI_APPLICATION = 'wechatDemo.wsgi.application'
# Dotted path to the ASGI application created in routing.py above.
ASGI_APPLICATION = "wechatDemo.routing.application"
# Channel layer configuration; the backend comes from the channels_redis package.
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        # Replace host/port with your own Redis server.
        'CONFIG': {'hosts': [('127.0.0.1', 6379)]},
    },
}
7.启动redis
windows 安装redis地址:https://www.jianshu.com/p/e16d23e358c0
8.设计签到表:threewall>models.py
from django.contrib.auth.models import User
from django.db import models
from django.utils.crypto import random
# Create your models here.
from django.utils.html import format_html
# Create your models here.
def rename(newname):
    """Return a decorator that overwrites a function's ``__name__``.

    The decorated function object is returned as-is; only its ``__name__``
    attribute is replaced with *newname*.
    """
    def _apply(func):
        setattr(func, "__name__", newname)
        return func
    return _apply
# Check-in table
class checkin(models.Model):
    """Check-in record holding a user's WeChat profile fields and timestamps."""

    # Avatar image URL.
    headimgurl = models.URLField(max_length=256, default="", null=True, blank=True)
    openid = models.CharField(verbose_name="openid", max_length=225, blank=True, default="")
    nickname = models.CharField(verbose_name="昵称", max_length=225, blank=True, default="")
    sex = models.CharField(verbose_name="性别", max_length=225, blank=True, default="")
    language = models.CharField(verbose_name="语言", max_length=225, blank=True, default="")
    city = models.CharField(verbose_name="城市", max_length=225, blank=True, default="")
    # Set once when the row is created.
    createTime = models.DateTimeField(verbose_name="签到时间", auto_now_add=True)
    # Refreshed on every save().
    lastTime = models.DateTimeField(verbose_name="修改时间", auto_now=True)

    class Meta:
        verbose_name_plural = "签到表"

    def __str__(self):
        return self.nickname

    # The decorator replaces this method's __name__ with the given label
    # (presumably so the admin column header shows it - confirm).
    @rename("模板头像")
    def showheadimgurl(self):
        """Return an HTML <img> tag (50px wide) for the stored avatar URL."""
        avatar_url = self.headimgurl
        return format_html("<img src='{}' style='width:50px'/>", avatar_url)
9.threewall>admin.py
from django.contrib import admin
from threewall import models
# Register your models here.
@admin.register(models.checkin)
class orderAdmin(admin.ModelAdmin):
    """Admin configuration for the check-in model."""

    # Columns of the change list; "showheadimgurl" is the model method that
    # renders the avatar as an <img> tag.
    list_display = (
        "showheadimgurl",
        "openid",
        "nickname",
        "sex",
        "language",
        "city",
        "createTime",
        "lastTime",
    )
    # Columns that link to the edit page.
    list_display_links = ("openid", "nickname")
    # Fields covered by the admin search box.
    search_fields = ('nickname', "openid")
    # Rows per page.
    list_per_page = 50
10.threewall>views.py
from django.shortcuts import render
from wechatpy.oauth import WeChatOAuth
from django.shortcuts import render, redirect
from django.http import JsonResponse, HttpResponse, HttpResponseRedirect
import time
import datetime
from django.conf import settings
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.shortcuts import render
import uuid
from wechatpy import WeChatClient
import os
import json
from wechatpy import WeChatPay
from threewall import models
from wechatpy.pay import dict_to_xml
import base64
from Crypto.Cipher import AES
# Create your views here.
# WeChat official-account AppID (placeholder value).
AppID = "xxx"
# WeChat official-account AppSecret (placeholder value).
AppSecret = "xxx"
# Secret key; the views below pad it with add_to_16() and use it as the AES key.
# NOTE(review): credentials are hard-coded in source here - consider loading
# them from Django settings or environment variables instead.
key = "xxxx"
# API client for the service account, created once at import time.
client = WeChatClient(AppID, AppSecret)
# Channel layer used by the views to push messages to WebSocket groups.
from channels.layers import get_channel_layer
channel_layer = get_channel_layer()
from asgiref.sync import async_to_sync
# Create your views here.
def dwall(request):
    """Render the 3D check-in wall page.

    Loads up to 200 check-in avatars, pads the list with a placeholder image
    so the wall always receives at least 199 tiles, and passes a freshly
    encrypted timestamp to the template as ``encrypted_text`` (presumably used
    by the page to build the check-in QR code - confirm against 3dwall.html).

    Template context: ``checkUserInfo`` (list of ``{"headimgurl": ...}``),
    ``CurPersonNum`` and ``encrypted_text``.
    """
    wall_slots = 199
    placeholder = "/static/3dwall/img/a.png"

    checkins = models.checkin.objects.values("headimgurl")[0:200]
    print(checkins.query)
    # NOTE: counted on the sliced queryset, so this value never exceeds 200.
    CurPersonNum = checkins.count()
    checkUserInfo = [{"headimgurl": item["headimgurl"]} for item in checkins]

    # Compute the shortfall once. The previous loop re-read the growing list
    # length in its condition and therefore stopped about halfway (an empty
    # list was padded to 100 entries instead of 199).
    missing = wall_slots - len(checkUserInfo)
    if missing > 0:
        checkUserInfo.extend({"headimgurl": placeholder} for _ in range(missing))

    aes = AES.new(add_to_16(key), AES.MODE_ECB)
    # Encrypt the current epoch time first ...
    ticks = str(time.time())
    encrypt_aes = aes.encrypt(add_to_16(ticks))
    # ... then base64-encode the ciphertext and convert it to str.
    encrypted_text = str(base64.encodebytes(encrypt_aes), encoding='utf-8')
    return render(request, "3dwall.html",
                  {"checkUserInfo": checkUserInfo, "CurPersonNum": CurPersonNum,
                   "encrypted_text": encrypted_text})
# 定义授权装饰器
def getWeChatOAuth(redirect_url):
    """Build a WeChatOAuth helper for *redirect_url*.

    Uses the module-level AppID/AppSecret and the ``snsapi_userinfo`` scope.
    """
    oauth_client = WeChatOAuth(AppID, AppSecret, redirect_url, 'snsapi_userinfo')
    return oauth_client
def oauth(method):
    """Decorator that requires WeChat OAuth user info in the session.

    If ``request.session['user_info']`` is already set, the view runs directly.
    Otherwise:

    * without a ``code`` query parameter the browser is redirected to the
      WeChat authorize URL, with the current URL as the redirect target;
    * with a ``code`` the access token and user info are fetched and stored
      in the session, then the view runs;
    * if that exchange fails (for example an invalid or reused code) a 403
      response is returned instead of calling the view without a user.

    Extra positional and keyword arguments are forwarded to the view.
    """
    import functools  # local import keeps this block self-contained

    @functools.wraps(method)
    def warpper(request, *args, **kwargs):
        if request.session.get('user_info', None) is None:
            code = request.GET.get('code', None)
            # build_absolute_uri() is the documented way to get the full
            # current URL; get_raw_uri() was undocumented and is gone in
            # Django 4.0.
            wechat_oauth = getWeChatOAuth(request.build_absolute_uri())
            if not code:
                return redirect(wechat_oauth.authorize_url)
            try:
                wechat_oauth.fetch_access_token(code)
                user_info = wechat_oauth.get_user_info()
            except Exception as e:
                # Boundary with the WeChat API: any failure here means the
                # code could not be exchanged for a user.
                print(str(e))
                return HttpResponse(status=403)
            # Storing this in a user table is recommended.
            request.session['user_info'] = user_info
        return method(request, *args, **kwargs)
    return warpper
@oauth
def checkin(request):
    """Handle a check-in request carrying an encrypted ``signature``.

    The ``signature`` query parameter is the base64 text of an AES-ECB
    encrypted epoch timestamp. The request is rejected with ``expired.html``
    when the signature cannot be decoded or is more than 10 seconds old, with
    ``checkerror.html`` when the signature or the session user is missing,
    and with ``isfollow.html`` when the user does not follow the account.
    Otherwise a check-in row is created (or its ``lastTime`` refreshed), the
    avatar and nickname are broadcast to the ``chat_roomName`` group, and
    ``checkin.html`` is rendered.
    """
    signature = request.GET.get("signature", None)
    if signature is None:
        return render(request, "checkerror.html")
    try:
        aes = AES.new(add_to_16(key), AES.MODE_ECB)
        # A '+' in the base64 text arrives as a space in the query string;
        # restore it before decoding base64 back to bytes.
        base64_decrypted = base64.decodebytes(signature.replace(' ', '+').encode(encoding='utf-8'))
        # Decrypt, decode to str and strip the NUL padding.
        decrypted_text = str(aes.decrypt(base64_decrypted), encoding='utf-8').replace('\0', '')
        # Parsed inside the try so that non-numeric plaintext is treated as an
        # invalid code instead of raising out of the view.
        qr_timestamp = float(decrypted_text)
    except (ValueError, TypeError):
        print("signature=" + signature)
        return render(request, "expired.html")  # QR code expired / invalid
    # The QR code is valid for 10 seconds after it was generated.
    if qr_timestamp < time.time() - 10:
        return render(request, "expired.html")  # QR code expired

    session_info = request.session.get('user_info')
    if not session_info:
        # No authenticated WeChat user in the session: nothing to check in.
        return render(request, "checkerror.html")
    user_info = client.user.get(session_info["openid"])
    if user_info["subscribe"] == 0:
        return render(request, "isfollow.html")

    openid = user_info["openid"]
    headimgurl = user_info["headimgurl"]
    nickname = user_info["nickname"]
    ischeck = models.checkin.objects.filter(openid=openid).first()
    if ischeck is None:
        models.checkin.objects.create(
            headimgurl=headimgurl, openid=openid,
            nickname=nickname, sex=user_info["sex"],
            language=user_info["language"],
            city=user_info["country"] + "-" + user_info["province"] + "-" + user_info["city"])
    else:
        # Saving an existing row only refreshes its auto_now ``lastTime``.
        ischeck.save()
    async_to_sync(channel_layer.group_send)("chat_roomName", {'type': 'chat_message',
                                                              'message': {"type": "message",
                                                                          "headimgurl": headimgurl,
                                                                          "nickname": nickname}})
    return render(request, "checkin.html")
# Requires: pip install pyCryptodome
def add_to_16(value):
    """Encode *value* as UTF-8 and NUL-pad it to a multiple of 16 bytes.

    Padding is computed on the encoded byte length, so non-ASCII text also
    ends up block-aligned (padding by character count does not guarantee
    that). For ASCII input the result is the text followed by ``\\0`` bytes
    up to the next multiple of 16; input already aligned is returned
    unpadded.

    Returns:
        bytes whose length is a multiple of 16 (empty input gives ``b""``).
    """
    data = str.encode(value)
    remainder = len(data) % 16
    if remainder:
        data += b'\0' * (16 - remainder)
    return data
@csrf_exempt
def signatureCheck(request):
    """Broadcast a freshly encrypted timestamp to the wall group every 5 seconds.

    Each round sends a ``signatureCheck`` message to the ``chat_roomName``
    group.

    NOTE(review): the loop has no exit, so this view never returns a response
    and keeps running for as long as the process lives; every request to this
    endpoint starts another such loop. Confirm that this is intended, or move
    the broadcast into a background job.
    """
    while True:
        time.sleep(5)
        # Build the cipher from the NUL-padded shared key.
        cipher = AES.new(add_to_16(key), AES.MODE_ECB)
        # Encrypt the current epoch time (as text, padded to the block size).
        now_text = str(time.time())
        cipher_bytes = cipher.encrypt(add_to_16(now_text))
        # Base64-encode the ciphertext and convert it to str.
        encrypted_text = str(base64.encodebytes(cipher_bytes), encoding='utf-8')
        payload = {"type": "signatureCheck", "signatureCheck": encrypted_text}
        event = {'type': 'chat_message', 'message': payload}
        async_to_sync(channel_layer.group_send)("chat_roomName", event)
11.threewall>consumers.py
import json,uuid
import datetime
import time
from asgiref.sync import async_to_sync
import multiprocessing
from channels.generic.websocket import WebsocketConsumer
class ChatConsumer(WebsocketConsumer):
    """WebSocket consumer that relays messages within a named room group.

    The group name is ``chat_<room_name>``, where ``room_name`` comes from
    the URL route.
    """

    def connect(self):
        """Join the room group and accept the connection."""
        # Runs as soon as the WebSocket is connected.
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = 'chat_%s' % self.room_name
        print(self.room_group_name)
        # The channel-layer methods are async, so they are wrapped with
        # async_to_sync to be called from this synchronous consumer.
        async_to_sync(self.channel_layer.group_add)(
            self.room_group_name,
            self.channel_name
        )
        # Accept the connection.
        self.accept()

    def disconnect(self, close_code):
        """Leave the room group when the connection is closed."""
        async_to_sync(self.channel_layer.group_discard)(
            self.room_group_name,
            self.channel_name
        )

    def receive(self, text_data=None, bytes_data=None):
        """Forward a client message to every member of the room group.

        Expects a JSON text frame of the form ``{"message": "<text>"}``; the
        current date and time is appended to the text before broadcasting.
        Binary frames and frames that are not valid JSON, lack a ``message``
        key, or carry a non-string message are ignored rather than raising.
        """
        if text_data is None:
            # Binary frame: the base class passes it as bytes_data only.
            return
        try:
            message = json.loads(text_data)['message']
        except (ValueError, KeyError, TypeError):
            return
        if not isinstance(message, str):
            return
        async_to_sync(self.channel_layer.group_send)(
            self.room_group_name,
            {
                # 'type' names the handler method on this class that will
                # receive the event; per the original note, 'chat.message'
                # would map to the same method.
                'type': 'chat_message',
                'message': message + str(datetime.datetime.today())
            }
        )

    def chat_message(self, event):
        """Send a group event's message back to this WebSocket client."""
        message = event['message']
        # Send message to WebSocket
        self.send(text_data=json.dumps({
            'message': message
        }))
12.threewall>routing.py
from django.urls import path
from threewall import consumers
# Route table: maps a WebSocket connection path to the consumer handling it.
# The <str:room_name> segment is what ChatConsumer reads from its url_route kwargs.
# NOTE(review): newer Channels releases (3.0+) expect
# ``consumers.ChatConsumer.as_asgi()`` here - confirm against the installed version.
_chat_route = path('ws/chat/<str:room_name>/', consumers.ChatConsumer)
websocket_urlpatterns = [_chat_route]
13.wechatDemo>routing.py
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
import threewall.routing
# Root ASGI application: routes each incoming connection by protocol type.
# Plain HTTP needs no entry here; per the original note, http -> Django views
# is added by default.
# WebSocket connections pass through AuthMiddlewareStack and are then matched
# against the URL patterns declared in threewall/routing.py.
_websocket_routes = URLRouter(threewall.routing.websocket_urlpatterns)
application = ProtocolTypeRouter({
    'websocket': AuthMiddlewareStack(_websocket_routes),
})
14.后台统计
15.项目完整结构
注:需要静态资源,请私下联系