WeChatReport_OS/main.py
PeterZhong a4cca18a42 2024
2025-01-01 01:23:29 +08:00

730 lines
25 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import datetime
import json
import re
import shutil
import string
import threading
from collections import Counter
from enum import Enum
import openpyxl
import jieba
import wordcloud
from numpy import sort
from imageio.v2 import imread
from openpyxl.workbook import Workbook
from wordcloud import WordCloud, STOPWORDS
import matplotlib.pyplot as plt
from mplfonts import use_font
# 系统库
import _thread
import datetime
import random
import math
# 数据分析库
import numpy as np
from pylab import mpl
import pandas as pd
import matplotlib.pyplot as plt
# 不展示警告信息
import warnings
# warnings.filterwarnings('ignore')
# 使一个单元格可以输出多次
from IPython.core.interactiveshell import InteractiveShell
# import aspose.words as aw
InteractiveShell.ast_node_interactivity = "all"
# pyecharts相关
from pyecharts.globals import CurrentConfig, OnlineHostType, ThemeType, ChartType, SymbolType
CurrentConfig.ONLINE_HOST = OnlineHostType.NOTEBOOK_HOST
from pyecharts.commons.utils import JsCode
from pyecharts.components import Table
from pyecharts.charts import * # 图表类型
from pyecharts import options as opts # 配置项
from pyecharts.charts import Bar
import os
# Module-level accumulators filled by person() while reports are generated.
datas = []  # nested statistics dicts; dumped to output.json by the __main__ block
files = []  # random report file names; read by update_toc(), parallel to `names`
names = []  # talkerName values; read by update_toc(), parallel to `files`
out_datas = []  # flat template-substitution dicts; written to output.xlsx
class Type(Enum):
    """Numeric message-type codes compared against the type column in person()."""

    TEXT = 1  # text content is accumulated for the word statistics
    IMAGE = 3
    EMOJI = 47
    VIDEO = 43
    SYSTEM = 10000
    VOICE = 34
    LOCATION = 48
    QUOTE = 49
    VOT = 50  # presumably a voice/video call record -- TODO confirm
    CARD = 42
    RECOMMEND = 37
def main_progress(file_path):
    """Load the exported chat workbook and generate one report per contact.

    Args:
        file_path: Path of the workbook whose ``messages`` sheet holds the
            chat records. The contact list is always read from the
            ``contacts`` sheet of ``./contacts.xlsx``.
    """
    print("进入主程序")
    workbook = openpyxl.load_workbook(file_path)
    sheet = workbook["messages"]
    print("表格已读入")
    contact_book = openpyxl.load_workbook("./contacts.xlsx")
    print("联系人表格已载入")

    # Column 4 of the contacts sheet is collected as the nickname; the
    # literal "NickName" is the header cell and is skipped.
    contacts = [
        row[4].value
        for row in contact_book["contacts"].iter_rows()
        if row[4].value != "NickName"
    ]

    # The counter advances on every row whose column-1 value differs from the
    # running counter. NOTE(review): presumably column 1 is a sequential
    # conversation id -- confirm against the export format.
    num = -1
    for row in sheet.iter_rows():
        if num != row[1].value:
            num += 1
    print("一共有", num, "组聊天记录。")
    print(contacts)

    workers = []
    for index, nick_name in enumerate(contacts):
        # Every worker receives its own fresh row iterator over the sheet.
        workers.append(myThread(index, nick_name, sheet.iter_rows()))
        print("添加线程" + str(index))

    # run() is invoked directly, so the reports are produced one after another
    # in the calling thread. NOTE(review): switching to start() would execute
    # person() concurrently, and person() appends to shared module lists and
    # draws through pyplot's global figure state -- keep sequential unless
    # that is made safe.
    for worker in workers:
        worker.run()
    print("END ALL")
class myThread(threading.Thread):
    """Thread subclass whose ``run`` delegates to ``person`` for one contact."""

    def __init__(self, threadID, talkerId, rows):
        """Store the worker index, the contact identifier and the row iterable."""
        super().__init__()
        self.threadID, self.talkerId, self.rows = threadID, talkerId, rows

    def run(self):
        """Log start, analyse the stored rows via ``person``, then log exit."""
        label = str(self.talkerId)
        print(f"开始线程:{label}")
        person(self.talkerId, self.rows)
        print(f"退出线程:{label}")
def _count_extremes(counts):
    """Return ``(max_count, max_keys, min_count, min_keys)`` for a Counter.

    Keys keep their first-seen order and every key is listed at most once.
    """
    highest = max(counts.values())
    lowest = min(counts.values())
    max_keys = [key for key, value in counts.items() if value == highest]
    min_keys = [key for key, value in counts.items() if value == lowest]
    return highest, max_keys, lowest, min_keys


def person(talkerId, rows):
    """Analyse a stream of message rows and write the report artefacts.

    The cells read from every row are: index 2 (message type code), 4 ("1"
    marks a message that was sent), 7 (content), 8 (timestamp string in
    ``%Y-%m-%d %H:%M:%S``; the literal ``"StrTime"`` header and empty cells
    are skipped for the time statistics), 9 (talker name) and 10 (nickname).

    Side effects: writes charts and the word cloud under ``./out/<random>/``,
    writes ``./out/<random>.md`` from ``./template.md``, appends to the module
    lists ``files``, ``names``, ``out_datas`` and ``datas`` and refreshes the
    table of contents through ``update_toc()``.

    Args:
        talkerId: Identifier of the conversation. Currently unused because the
            per-conversation row filter is disabled (see note in the loop).
        rows: Iterable of worksheet rows (sequences of cells with ``.value``).

    Returns:
        The flat substitution dict that was written into the template, or
        ``None`` when there are no rows, no usable words or no timestamps.
    """
    # Starting value for "latest message": later_time() only compares the
    # time of day, and 07:00 is where its late-night window ends.
    latest_time = datetime.datetime.strptime("1999-01-01 7:00:00", "%Y-%m-%d %H:%M:%S")
    # Lowered to the earliest message timestamp while scanning.
    origin_datetime = datetime.datetime.now()
    count = {"text": 0, "voice": 0, "emoji": 0, "video": 0, "quote": 0, "system": 0, "vot": 0, "card": 0,
             "image": 0, "location": 0, "recommend": 0}
    # Type cells are compared as strings so that int and str cells both match.
    kind_by_type = {
        str(Type.TEXT.value): "text",
        str(Type.IMAGE.value): "image",
        str(Type.EMOJI.value): "emoji",
        str(Type.VOICE.value): "voice",
        str(Type.VIDEO.value): "video",
        str(Type.VOT.value): "vot",
        str(Type.CARD.value): "card",
        str(Type.SYSTEM.value): "system",
        str(Type.LOCATION.value): "location",
        str(Type.QUOTE.value): "quote",
        str(Type.RECOMMEND.value): "recommend",
    }
    date_format = "%Y-%m-%d %H:%M:%S"
    total_num = 0
    text_parts = []  # content of every text message, for word statistics
    lens_list = []  # length of every text message
    date_list = []  # calendar date of every timestamped message
    time_list = []  # hour of every timestamped message
    send = 0
    receive = 0
    talkerName = ""
    nick = ""

    for row in rows:
        # NOTE(review): the filter on talkerId / talkerName was commented out
        # in the original (`if True:`), so every supplied row is analysed.
        total_num += 1
        talkerName = row[9].value
        nick = row[10].value

        kind = kind_by_type.get(str(row[2].value))
        if kind is not None:
            count[kind] += 1
        if kind == "text":
            content = str(row[7].value)
            text_parts.append(content)
            lens_list.append(len(content))

        raw_time = row[8].value
        if raw_time == "StrTime" or raw_time is None:
            continue
        message_time = datetime.datetime.strptime(raw_time, date_format)

        # Date of the first chat.
        if message_time < origin_datetime:
            origin_datetime = message_time
        # Latest time of day at which a message was exchanged.
        latest_time = later_time(message_time, latest_time)
        date_list.append(message_time.date())
        time_list.append(message_time.hour)
        # Sent versus received.
        if str(row[4].value) == "1":
            send += 1
        else:
            receive += 1

    if total_num == 0:
        return
    text = "".join(text_parts).replace("\n", "")
    print(total_num)

    # Word frequency: keep tokens longer than one character and drop the
    # "x000D" residue of escaped carriage returns.
    ls = [token for token in jieba.lcut(text) if len(token) > 1 and token != "x000D"]
    if not ls:
        return
    if not date_list:
        # Nothing carried a timestamp, so no date/time statistics exist.
        return
    ls_str = ' '.join(ls)

    # Names of the built-in stickers, treated as extra stop words.
    emojis = ['微笑', '撇嘴', '', '发呆', '得意', '流泪', '害羞', '闭嘴', '', '大哭', '尴尬', '发怒', '调皮', '呲牙',
              '惊讶', '难过', '', '抓狂', '', '偷笑', '愉快', '白眼', '傲慢', '', '惊恐', '憨笑', '悠闲', '咒骂',
              '疑问', '', '', '', '骷髅', '敲打', '再见', '擦汗', '抠鼻', '鼓掌', '坏笑', '右哼哼', '鄙视', '委屈',
              '快哭了', '阴险', '亲亲', '可怜', '笑脸', '生病', '脸红', '破涕为笑', '恐惧', '失望', '无语', '嘿哈',
              '捂脸', '奸笑', '机智', '皱眉', '', '吃瓜', '加油', '', '天啊', 'Emm', '社会社会', '旺柴', '好的',
              '打脸', '', '翻白眼', '666', '让我看看', '叹气', '苦涩', '裂开', '嘴唇', '爱心', '心碎', '拥抱', '',
              '', '握手', '胜利', '抱拳', '勾引', '拳头', 'OK', '合十', '啤酒', '咖啡', '蛋糕', '玫瑰', '凋谢', '菜刀',
              '炸弹', '便便', '月亮', '太阳', '庆祝', '礼物', '红包', '', '', '烟花', '爆竹', '猪头', '跳跳', '发抖',
              '转圈']
    emoji_names = set(emojis)
    # Work on a copy so the library-wide STOPWORDS set is not modified.
    stopwords = set(STOPWORDS) | emoji_names

    # Most frequent word(s); sticker names never qualify, not even on a tie.
    max_word = []
    word_max = 0
    for word, occurrences in Counter(ls).items():
        if word in emoji_names:
            continue
        if occurrences > word_max:
            max_word = [word]
            word_max = occurrences
        elif occurrences == word_max:
            max_word.append(word)

    # ASCII letters and digits are removed from the word-cloud input only.
    ls_str = re.sub(r'[0-9a-zA-Z]', '', ls_str)
    background = imread('bg.jpg')
    wc = wordcloud.WordCloud(
        font_path="Deng.ttf",
        max_words=500,
        stopwords=stopwords,
        mask=background,
        background_color="white")

    # Sticker analysis.
    stickers = ['[微笑]', '[撇嘴]', '[色]', '[发呆]', '[得意]', '[流泪]', '[害羞]', '[闭嘴]', '[睡]', '[大哭]',
                '[尴尬]', '[发怒]', '[调皮]', '[呲牙]', '[惊讶]', '[难过]', '[囧]', '[抓狂]', '[吐]', '[偷笑]',
                '[愉快]', '[白眼]', '[傲慢]', '[困]', '[惊恐]', '[憨笑]', '[悠闲]', '[咒骂]', '[疑问]', '[嘘]',
                '[晕]', '[衰]', '[骷髅]', '[敲打]', '[再见]', '[擦汗]', '[抠鼻]', '[鼓掌]', '[坏笑]', '[右哼哼]',
                '[鄙视]', '[委屈]', '[快哭了]', '[阴险]', '[亲亲]', '[可怜]', '[笑脸]', '[生病]', '[脸红]',
                '[破涕为笑]', '[恐惧]', '[失望]', '[无语]', '[嘿哈]', '[捂脸]', '[奸笑]', '[机智]', '[皱眉]',
                '[耶]', '[吃瓜]', '[加油]', '[汗]', '[天啊]', '[Emm]', '[社会社会]', '[旺柴]', '[好的]', '[打脸]',
                '[哇]', '[翻白眼]', '[666]', '[让我看看]', '[叹气]', '[苦涩]', '[裂开]', '[嘴唇]', '[爱心]',
                '[心碎]', '[拥抱]', '[强]', '[弱]', '[握手]', '[胜利]', '[抱拳]', '[勾引]', '[拳头]', '[OK]',
                '[合十]', '[啤酒]', '[咖啡]', '[蛋糕]', '[玫瑰]', '[凋谢]', '[菜刀]', '[炸弹]', '[便便]', '[月亮]',
                '[太阳]', '[庆祝]', '[礼物]', '[红包]', '[發]', '[福]', '[烟花]', '[爆竹]', '[猪头]', '[跳跳]',
                '[发抖]', '[转圈]']
    # Besides the built-in stickers, count a few common Unicode emoji; any
    # other substring worth counting can be appended here as well.
    stickers.extend(['😂', '🤣', '🥰', '😅', '🥹', '😘', '🤩', '🥺', '😓', '🙄', '🤡'])
    max_emoji = []
    emoji_max = 0
    for sticker in stickers:
        occurrences = text.count(sticker)
        if occurrences == 0:
            # Unused stickers must not tie for "most used".
            continue
        if occurrences > emoji_max:
            max_emoji = [sticker]
            emoji_max = occurrences
        elif occurrences == emoji_max:
            max_emoji.append(sticker)

    # Messages per day, plus the busiest and quietest day(s).
    date_counts = Counter(date_list)
    date_dict = dict(date_counts)
    date_max, max_date, date_min, min_date = _count_extremes(date_counts)

    # Messages per hour; extremes only consider hours that actually occur.
    time_counts = Counter(time_list)
    time_max, max_time, time_min, min_time = _count_extremes(time_counts)
    time_dict = {hour: time_counts.get(hour, 0) for hour in range(24)}

    # Non-empty here: at least one token exists, hence one text message.
    avg = sum(lens_list) / len(lens_list)

    filename = generate_filename()
    base_path = os.path.join("./out", remove_invalid_chars(filename))
    os.makedirs(base_path, exist_ok=True)
    mpl.rcParams["font.sans-serif"] = ["SimHei"]
    heat_html = drawHeat(date_dict)
    time_bar_html = drawBar([str(i) + "\n\n" + str(i + 1) + "\n" for i in time_dict.keys()],
                            [int(i) for i in list(time_dict.values())], "时间段", "消息数",
                            os.path.join(base_path, "timebar"))
    typepie_html = drawPie(list(count.keys()), list(count.values()), "各类消息占比",
                           os.path.join(base_path, "typepie"))
    send_pie_html = drawPie(["我发送的", "我收到的"], [send, receive], "发送与收到消息占比",
                            os.path.join(base_path, "pie"))
    wordcloud_html = drawWordCloud(ls_str)
    if ' ' in ls_str:
        # The PNG can only be saved after generate() has laid the words out.
        wc.generate(ls_str)
        wc.to_file(os.path.join(base_path, "wordcloud.png"))

    # NOTE(review): the empty "" pieces in the date/time labels below look
    # like unit characters lost from the source text -- confirm and restore.
    out_data = {
        "nick": nick,
        "origin_date": str(origin_datetime),
        "latest_time": str(latest_time),
        "time_comment": time_comment(latest_time),
        "total_num": total_num,
        "text_count": count["text"],
        "voice_count": count["voice"],
        "text_comment": text_comment(count["text"], count["voice"]),
        "avg": "{:.2f}".format(avg),
        "avg_comment": avg_comment(avg),
        "receive": receive,
        "send_comment": send_comment(send, receive),
        "note": remove_invalid_chars(talkerName),
        "max_date": list2str([str(i.month) + "" + str(i.day) + "" for i in max_date]),
        "date_max": date_max,
        "min_date": list2str([str(i.month) + "" + str(i.day) + "" for i in min_date]),
        "date_min": date_min,
        "max_time": list2str([str(i) + "时到" + str(i + 1) + "" for i in max_time]),
        "time_max": time_max,
        "min_time": list2str([str(i) + "时到" + str(i + 1) + "" for i in min_time]),
        "time_min": time_min,
        "max_word": list2str(max_word),
        "word_max": word_max,
        "sum": len(text),
        "text_percent": str("{:.2f}".format(count["text"] / total_num * 100)) + "%",
        "max_date_this": this(max_date),
        "min_date_this": this(min_date),
        "max_time_this": this(max_time),
        "min_time_this": this(min_time),
        "file": filename,
        "max_emoji": str(max_emoji),
        "emoji_max": emoji_max,
        "text": send_text(nick, filename),
        "url": f"https://2024.peterzhong.site/report/{filename}.html",
        "time_bar": time_bar_html,
        "send_pie": send_pie_html,
        "type_pie": typepie_html,
        "heat": heat_html,
        "wordcloud_html": wordcloud_html
    }

    # Fill the Markdown template: every %key% placeholder is replaced by the
    # matching out_data value.
    md_path = os.path.join("./out", remove_invalid_chars(talkerName) + ".md")
    shutil.copy("./template.md", md_path)
    print(md_path)
    with open(md_path, encoding="utf-8", mode="r") as file:
        content = file.read()
    for key, value in out_data.items():
        content = content.replace("%" + str(key) + "%", str(value))
    with open(md_path, encoding="utf-8", mode="w+") as file:
        file.write(content)
    # Delete this line if the random ("password") file name is not wanted.
    shutil.copy(md_path, os.path.join("./out", filename + ".md"))
    files.append(filename)
    names.append(talkerName)
    # Comment this line out to keep the copy named after the talker.
    os.remove(md_path)
    out_datas.append(out_data)
    update_toc()

    data = {
        "user": {
            "nickName": nick,
            "note": talkerName
        },
        "type": {
            "text": count["text"],
            "voice": count["voice"],
            "emoji": count["emoji"],
            "video": count["video"],
            "quote": count["quote"],
            "system": count["system"],
            "vot": count["vot"],
            "card": count["card"],
            "image": count["image"],
            "location": count["location"],
            "recommend": count["recommend"],
        },
        "count": {
            "total": total_num,
            "avgLen": avg,
            "word": {
                "popWord": max_word,
                "pop": word_max
            },
            "emoji": {
                "popEmoji": max_emoji,
                "pop": emoji_max
            },
            "date": {
                "dateMax": date_max,
                "dateMin": date_min,
                "maxDate": [date_handler(day) for day in max_date],
                "minDate": [date_handler(day) for day in min_date]
            },
            "time": {
                "timeMax": time_max,
                "timeMin": time_min,
                "maxtime": list(max_time),
                "mintime": list(min_time)
            },
        },
        "time": {
            "start": date_handler(origin_datetime),
            "latest": time_handler(latest_time)
        },
        "out": {
            "wordCloud": 0
        }
    }
    datas.append(data)
    plt.close('all')
    return out_data
def time_comment(datetime):
    """Return the remark for the latest chat time, chosen by its ``hour``.

    Hours strictly between 7 and 22 yield the "reasonable schedule" remark;
    every other hour yields the "night owl" remark.
    """
    daytime = 7 < datetime.hour < 22
    if not daytime:
        return "夜猫子无疑了,聊这么晚不会是在聊工作吧?"
    return "我们的作息好像还挺合理,或许只是消息的作息比较巧合吧。"
def avg_comment(avg):
    """Return the remark for the average text length (short below 15)."""
    is_short = avg < 15
    return "长话短说,这效率不错" if is_short else "到底是谁比较啰嗦一些呢[狗头]"
def send_comment(send, receive):
    """Return the remark describing who talks more.

    Args:
        send: Number of messages sent.
        receive: Number of messages received.

    Returns:
        The "chatterbox" remark when the sent share exceeds the received
        share by more than 20% of all messages, the "listener" remark in the
        mirrored case, otherwise the "balanced" remark. With no messages at
        all the balanced remark is returned.
    """
    total = send + receive
    if total == 0:
        # No messages: neither side dominates (and avoids dividing by zero).
        return "基本上是一来一回,有问有答了。"
    balance = (send - receive) / total
    if balance > 0.2:
        return "看来我是个话痨。"
    if balance < -0.2:
        return "你的话让我受益匪浅,常常仔细地听着你讲。"
    return "基本上是一来一回,有问有答了。"
def text_comment(text, voice):
    """Return the remark comparing the text count with the voice count."""
    if text <= voice:
        return "估计是有比较多紧急情况,来不及手输文字了"
    return "看来还是文本消息比较方便彼此沟通"
def date_handler(d):
    """Format a date or datetime as ``YYYY-MM-DD``."""
    iso_day = d.strftime('%Y-%m-%d')
    return iso_day
def time_handler(d):
    """Format the time of day of ``d`` as ``HH-MM-SS`` (dash separated)."""
    clock = d.strftime('%H-%M-%S')
    return clock
def later_time(time_new, time_old):
    """Return whichever datetime is "later in the night", by time of day only.

    A time of day strictly between 00:00 and 07:00 counts as later than any
    time after 07:00; otherwise the greater time of day wins. On an exact
    tie in the final comparison ``time_old`` is kept.
    """
    midnight = datetime.datetime.strptime("1999-01-01 0:00:00", "%Y-%m-%d %H:%M:%S").time()
    seven = datetime.datetime.strptime("1999-01-01 7:00:00", "%Y-%m-%d %H:%M:%S").time()
    new_clock = time_new.time()
    old_clock = time_old.time()

    # Current record already lies in the small hours.
    if midnight < old_clock < seven:
        if new_clock > seven or old_clock > new_clock:
            return time_old
        return time_new

    # Candidate lies in the small hours while the record does not.
    if midnight < new_clock < seven:
        return time_new

    # Neither is in the small hours: plain comparison.
    return time_new if new_clock > old_clock else time_old
# 横轴标签(星期)函数
def label_days(ax, dates, i, j, calendar):
    """Write the day-of-month into each calendar cell and label the weekdays.

    ``i`` / ``j`` are the row / column index arrays of ``dates`` inside the
    week-by-weekday grid; ``calendar`` supplies the number of rows.
    """
    week_count, _ = calendar.shape
    day_grid = np.full((week_count, 7), np.nan)
    day_grid[i, j] = [d.day for d in dates]

    for (row, col), value in np.ndenumerate(day_grid):
        if not np.isfinite(value):
            # Cell without a date stays blank.
            continue
        ax.text(col, row, int(value), ha='center', va='center')

    weekday_names = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
    ax.set(xticks=np.arange(7), xticklabels=weekday_names)
    ax.xaxis.tick_top()
# 纵轴标签(月份)函数
def label_months(ax, dates, i, j, calendar):
    """Put one month label on the y axis at the mean row of that month's dates."""
    month_labels = np.array(['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul',
                             'Aug', 'Sep', 'Oct', 'Nov', 'Dec'])
    month_of_date = np.array([d.month for d in dates])

    tick_positions = []
    tick_names = []
    for month in sorted(set(month_of_date)):
        # Mean row index of every date that falls into this month.
        tick_positions.append(i[month_of_date == month].mean())
        tick_names.append(month_labels[month - 1])

    ax.set(yticks=tick_positions)
    ax.set_yticklabels(tick_names, rotation=90)
# 数据封装函数
def calendar_array(dates, data):
    """Arrange ``data`` on a (week, weekday) grid derived from ``dates``.

    Returns:
        ``(i, j, calendar)``: zero-based row indices (ISO week minus the
        smallest ISO week present), column indices (ISO weekday minus one)
        and a float array with 7 columns holding ``data`` at those
        positions and NaN elsewhere.
    """
    iso_pairs = [d.isocalendar()[1:] for d in dates]
    week_numbers, weekdays = zip(*iso_pairs)

    week_rows = np.array(week_numbers) - min(week_numbers)
    weekday_cols = np.array(weekdays) - 1

    row_count = max(week_rows) + 1
    grid = np.nan * np.zeros((row_count, 7))
    grid[week_rows, weekday_cols] = data
    return week_rows, weekday_cols, grid
def remove_invalid_chars(text):
    """Make ``text`` usable as a file name.

    Args:
        text: Value to sanitise. Non-string values (for example a numeric
            spreadsheet cell) are converted with ``str`` first.

    Returns:
        The text with each of ``/ \\ : * ? " < > |`` replaced by an
        underscore, or ``"None_无备注"`` when ``text`` is ``None``.
    """
    if text is None:
        return str(text) + "_无备注"
    rstr = r"[\/\\\:\*\?\"\<\>\|]"  # '/ \ : * ? " < > |'
    return re.sub(rstr, "_", str(text))
# 传入日历数据和日期,输出日历图像
def calendar_heatmap(ax, dates, data):
    """Draw ``data`` as a calendar heat map on ``ax`` with day/month labels."""
    week_rows, weekday_cols, grid = calendar_array(dates, data)
    image = ax.imshow(grid, interpolation='none', cmap='summer')
    # Day numbers first, then the month ticks, on the same grid coordinates.
    for annotate in (label_days, label_months):
        annotate(ax, dates, week_rows, weekday_cols, grid)
    ax.figure.colorbar(image)
def drawBar(axis_x, axis_y, xlabel, ylabel, path):
    """Save a matplotlib bar chart to ``path`` and return an embeddable chart.

    The same data is rendered twice: once through pyplot (saved to ``path``)
    and once as a ``Bar`` chart whose ``render_embed()`` output is returned.
    """
    # Static image via pyplot's current figure.
    plt.clf()
    plt.bar(axis_x, axis_y)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.savefig(path)

    # Embeddable chart.
    chart = Bar()
    chart.add_xaxis(axis_x)
    chart.add_yaxis(ylabel, axis_y)
    title = opts.TitleOpts(title="消息时间段分布")
    chart.set_global_opts(title_opts=title)
    return chart.render_embed()
def drawHeat(data, year="2024"):
    """Render a calendar heat map of messages per day as embeddable HTML.

    Args:
        data: Mapping of date (anything whose ``str()`` is ``YYYY-MM-DD``)
            to the number of messages on that day.
        year: Year shown by the calendar. Defaults to ``"2024"``, the value
            that used to be hard-coded.

    Returns:
        The chart markup produced by ``render_embed()``.
    """
    data_list = [
        [datetime.datetime.strptime(str(date), "%Y-%m-%d"), freq]
        for date, freq in data.items()
    ]
    frequencies = [freq for _, freq in data_list]
    calendar = (
        Calendar()
        .add(
            series_name="",
            yaxis_data=data_list,
            calendar_opts=opts.CalendarOpts(
                pos_top="50px",
                pos_left="30px",
                pos_right="30px",
                range_=str(year),  # year range displayed by the calendar
            ),
        )
        .set_global_opts(
            visualmap_opts=opts.VisualMapOpts(
                # default=0 keeps an empty mapping from raising ValueError.
                max_=max(frequencies, default=0),
                min_=min(frequencies, default=0),
                orient="horizontal",
                is_piecewise=True,
                range_color=["#b6e1bc", "#0d7000"]
            ),
            title_opts=opts.TitleOpts(title="消息热力图"),
        )
    )
    return calendar.render_embed()
def drawPie(labels, data, title, path):
    """Save a matplotlib pie chart to ``path`` and return an embeddable chart."""
    # Static image via pyplot's current figure.
    plt.clf()
    plt.axes(aspect='equal')
    plt.pie(x=data, labels=labels)
    plt.title(title)
    plt.legend(loc='best')
    plt.savefig(path)

    # Embeddable chart built from [label, value] pairs.
    slices = [[label, value] for label, value in zip(labels, data)]
    chart = Pie()
    chart.add("", slices)
    chart.set_global_opts(title_opts=opts.TitleOpts(title=title))
    chart.set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {c}"))
    return chart.render_embed()
def drawWordCloud(text):
    """Build an embeddable word-cloud chart from whitespace-separated words.

    Args:
        text: Words separated by whitespace; each distinct word is weighted
            by how often it occurs.

    Returns:
        The chart markup produced by ``render_embed()``.
    """
    # (word, frequency) pairs in first-seen order.
    word_freqs = list(Counter(text.split()).items())
    # NOTE(review): ``WordCloud`` is whichever class the file's imports bind
    # last; the ``add`` / ``set_global_opts`` / ``render_embed`` calls are the
    # chart-style API -- confirm it is not the image-generating class.
    chart = (
        WordCloud()
        .add("", word_freqs, word_size_range=[20, 100])
        .set_global_opts(title_opts=opts.TitleOpts(title="聊天关键词"))
    )
    # Rendered exactly once.
    return chart.render_embed()
def this(arr):
    """Return the demonstrative prefix matching the number of items in ``arr``.

    One item gives an empty string, two items give "这两", anything else
    (including none) gives "这几".
    """
    by_size = {1: "", 2: "这两"}
    return by_size.get(len(list(arr)), "这几")
def list2str(list):
    """Join the items as back-quoted strings separated by "、"."""
    quoted = ["`" + str(item) + "`" for item in list]
    joined = "、".join(quoted)
    # Kept from the original: the last-occurrence replacement is applied with
    # two empty strings.
    return replace_last_string(joined, "", "")
def replace_last_string(text, old_str, new_str):
    """Replace only the last occurrence of ``old_str`` in ``text``.

    ``text`` is returned unchanged when ``old_str`` does not occur.
    """
    position = text.rfind(old_str)
    if position == -1:
        return text
    head = text[:position]
    tail = text[position + len(old_str):]
    return head + new_str + tail
def generate_filename(length=15):
    """Return a random name made of lowercase ASCII letters and digits.

    The name doubles as the hard-to-guess part of the published report URL,
    so the characters are drawn from the operating system's CSPRNG via
    ``secrets`` rather than from the predictable ``random`` generator.

    Args:
        length: Number of characters to generate.

    Returns:
        A string of exactly ``length`` characters.
    """
    import secrets

    alphabet = string.ascii_lowercase + string.digits
    return ''.join(secrets.choice(alphabet) for _ in range(length))
def write_dict_to_excel(data_dict, filename):
    """Write a list of dicts to an Excel workbook, one row per dict.

    Args:
        data_dict: Sequence of dicts. The keys of the first dict become the
            header row; each dict's values become one data row. An empty
            sequence produces a workbook with an empty sheet.
        filename: Path the workbook is saved to.
    """
    workbook = Workbook()
    worksheet = workbook.active
    if data_dict:
        # Header row comes from the first record.
        worksheet.append(list(data_dict[0].keys()))
        for item in data_dict:
            worksheet.append(list(item.values()))
    workbook.save(filename)
def send_text(nick, filename):
    """Return the greeting message containing the link to the contact's report."""
    template = "尊敬的{nick}你好我们即将迎来2024年首先祝你新年快乐来年身体健康、一切顺心[加油]。回顾2023年我为你做了一份我们的微信好友年度报告作为纪念诚邀你来看看挺有意思https://peterzhong1219.site/report/{filename}.html。让2023一切不如意随风而去多多希望诗和远方——说不定哪天就实现了呢愿共勉。【回复TD退订】[吃瓜]"
    return template.format(nick=nick, filename=filename)
def update_toc():
    """Rewrite ``./out/SUMMARY.md`` with one link per generated report.

    Entry ``k`` links the ``k``-th name in ``names`` to the ``k``-th file in
    ``files``.
    """
    pieces = ["# 微信好友年度报告\n\n"]
    for position, filename in enumerate(files):
        pieces.append("- [" + str(names[position]) + "](" + str(filename) + ".md)\n")
    with open("./out/SUMMARY.md", "w+", encoding="utf-8") as summary:
        summary.write("".join(pieces))
# 按绿色按钮运行
if __name__ == '__main__':
    # Generate every report, then dump the collected statistics.
    file_path = "./messages.xlsx"
    main_progress(file_path)
    with open("output.json", "w", encoding="utf-8") as file:
        json.dump(datas, file, ensure_ascii=False, indent=" ")
    write_dict_to_excel(out_datas, "output.xlsx")