# Source listing: 730 lines, 25 KiB, Python
# Standard library
import _thread
import datetime
import json
import math
import os
import random
import re
import secrets
import shutil
import string
import threading
import warnings
from collections import Counter
from enum import Enum

# Third-party: data handling, text segmentation and plotting
import jieba
import matplotlib.pyplot as plt
import numpy as np
import openpyxl
import pandas as pd
import wordcloud
from imageio.v2 import imread
from IPython.core.interactiveshell import InteractiveShell
from mplfonts import use_font
from numpy import sort
from openpyxl.workbook import Workbook
from pylab import mpl
from wordcloud import WordCloud, STOPWORDS

# Suppress warnings (currently disabled)
# warnings.filterwarnings('ignore')

# import aspose.words as aw

# Let a single notebook cell display more than one result
InteractiveShell.ast_node_interactivity = "all"

# pyecharts
from pyecharts.globals import CurrentConfig, OnlineHostType, ThemeType, ChartType, SymbolType

CurrentConfig.ONLINE_HOST = OnlineHostType.NOTEBOOK_HOST

from pyecharts.commons.utils import JsCode
from pyecharts.components import Table
# NOTE: this star import must stay AFTER ``from wordcloud import WordCloud``:
# it rebinds the bare name ``WordCloud`` to the pyecharts chart class, which is
# what drawWordCloud() calls. The wordcloud package's class is reached through
# the ``wordcloud`` module name instead.
from pyecharts.charts import *  # chart types
from pyecharts import options as opts  # chart options
from pyecharts.charts import Bar

# Module-level accumulators. person() appends one entry per generated report:
#   datas     -> nested statistics dict, dumped to output.json by the __main__ block
#   files     -> random report file names, read by update_toc()
#   names     -> contact remark names, read by update_toc()
#   out_datas -> flat template dicts, written to output.xlsx by the __main__ block
datas, files, names, out_datas = [], [], [], []

class Type(Enum):
    """Numeric message-type codes found in the chat export.

    person() compares ``str(member.value)`` against the cell at column
    index 2 of every message row to decide which counter to increment.

    NOTE(review): the values look like WeChat's internal message-type
    identifiers -- confirm against the export tool before relying on that.
    """

    TEXT = 1
    IMAGE = 3
    EMOJI = 47
    VIDEO = 43
    SYSTEM = 10000
    VOICE = 34
    LOCATION = 48
    QUOTE = 49
    VOT = 50
    CARD = 42
    RECOMMEND = 37


def main_progress(file_path):
    """Load the message and contact workbooks and build one report per contact.

    Args:
        file_path: Path of the workbook holding the ``messages`` sheet.

    Reads ``./contacts.xlsx`` (sheet ``contacts``) for the contact list, using
    the cell at column index 4 of every row except the ``NickName`` header.
    For each contact a ``myThread`` job is created over a fresh
    ``sheet.iter_rows()`` iterator and then executed.
    """
    print("进入主程序")
    workbook = openpyxl.load_workbook(file_path)
    sheet = workbook["messages"]
    print("表格已读入")
    contact_book = openpyxl.load_workbook("./contacts.xlsx")
    print("联系人表格已载入")
    contact_sheet = contact_book["contacts"]

    contacts = [
        row[4].value
        for row in contact_sheet.iter_rows()
        if row[4].value != "NickName"
    ]

    # Counts how often the running counter differs from the cell at column
    # index 1; the result is only used for the progress message below.
    num = -1
    for row in sheet.iter_rows():
        if num != row[1].value:
            num += 1
    print("一共有", num, "组聊天记录。")

    print(contacts)
    jobs = []
    for job_id, nick_name in enumerate(contacts):
        jobs.append(myThread(job_id, nick_name, sheet.iter_rows()))
        print("添加线程" + str(job_id))

    # NOTE(review): run() executes each job synchronously in the calling
    # thread; start()/join() would be needed for real concurrency. Kept as-is
    # because person() draws through the global pyplot state and appends to
    # module-level lists -- confirm that is safe before parallelising.
    for job in jobs:
        job.run()
    print("END ALL")


class myThread(threading.Thread):
    """Thread subclass that bundles one person() analysis job."""

    def __init__(self, threadID, talkerId, rows):
        """Store the job id, the contact identifier and the row iterator."""
        super().__init__()
        self.threadID = threadID
        self.talkerId = talkerId
        self.rows = rows

    def run(self):
        """Run person() for this contact, logging start and end."""
        label = str(self.talkerId)
        print("开始线程:" + label)
        person(self.talkerId, self.rows)
        print("退出线程:" + label)


# Names of the built-in stickers. In message text they appear wrapped in
# square brackets (e.g. "[微笑]"); after word segmentation the bare names are
# treated as stop words.
_STICKER_NAMES = [
    '微笑', '撇嘴', '色', '发呆', '得意', '流泪', '害羞', '闭嘴', '睡', '大哭', '尴尬', '发怒', '调皮', '呲牙',
    '惊讶', '难过', '囧', '抓狂', '吐', '偷笑', '愉快', '白眼', '傲慢', '困', '惊恐', '憨笑', '悠闲', '咒骂',
    '疑问', '嘘', '晕', '衰', '骷髅', '敲打', '再见', '擦汗', '抠鼻', '鼓掌', '坏笑', '右哼哼', '鄙视', '委屈',
    '快哭了', '阴险', '亲亲', '可怜', '笑脸', '生病', '脸红', '破涕为笑', '恐惧', '失望', '无语', '嘿哈',
    '捂脸', '奸笑', '机智', '皱眉', '耶', '吃瓜', '加油', '汗', '天啊', 'Emm', '社会社会', '旺柴', '好的',
    '打脸', '哇', '翻白眼', '666', '让我看看', '叹气', '苦涩', '裂开', '嘴唇', '爱心', '心碎', '拥抱', '强',
    '弱', '握手', '胜利', '抱拳', '勾引', '拳头', 'OK', '合十', '啤酒', '咖啡', '蛋糕', '玫瑰', '凋谢', '菜刀',
    '炸弹', '便便', '月亮', '太阳', '庆祝', '礼物', '红包', '發', '福', '烟花', '爆竹', '猪头', '跳跳', '发抖',
    '转圈',
]

# Unicode emoji counted alongside the bracketed stickers; extend this list to
# track additional emoji or words.
_EXTRA_EMOJI = ['😂', '🤣', '🥰', '😅', '🥹', '😘', '🤩', '🥺', '😓', '🙄', '🤡']


def _count_extremes(values):
    """Count *values* and return the most and least frequent keys.

    Returns ``(counts, max_count, max_keys, min_count, min_keys)``. ``counts``
    keeps first-occurrence order and every key appears at most once in the key
    lists. Empty input yields ``({}, 0, [], 0, [])``.
    """
    counts = dict(Counter(values))
    if not counts:
        return counts, 0, [], 0, []
    max_count = max(counts.values())
    min_count = min(counts.values())
    max_keys = [key for key, n in counts.items() if n == max_count]
    min_keys = [key for key, n in counts.items() if n == min_count]
    return counts, max_count, max_keys, min_count, min_keys


def person(talkerId, rows):
    """Analyse one set of chat rows and write the report files for it.

    Args:
        talkerId: Identifier of the contact. Currently unused, see the
            NOTE(review) inside the row loop.
        rows: Iterable of worksheet rows. The cells read are column index
            2 (message type code), 4 (``"1"`` marks a sent message),
            7 (message content), 8 (timestamp string ``%Y-%m-%d %H:%M:%S``),
            9 (stored as ``talkerName``) and 10 (stored as ``nick``).

    Returns:
        The flat ``out_data`` dict used to fill the Markdown template, or
        ``None`` when there is nothing to report (no message rows, no
        multi-character words, or no row with a timestamp).

    Side effects:
        Writes chart images and ``wordcloud.png`` under ``./out/<file>/``,
        renders ``./template.md`` into ``./out/<file>.md``, appends to the
        module-level ``files``, ``names``, ``out_datas`` and ``datas`` lists
        and calls ``update_toc()``.
    """
    date_format = "%Y-%m-%d %H:%M:%S"
    type_keys = {
        str(Type.TEXT.value): "text",
        str(Type.IMAGE.value): "image",
        str(Type.EMOJI.value): "emoji",
        str(Type.VOICE.value): "voice",
        str(Type.VIDEO.value): "video",
        str(Type.VOT.value): "vot",
        str(Type.CARD.value): "card",
        str(Type.SYSTEM.value): "system",
        str(Type.LOCATION.value): "location",
        str(Type.QUOTE.value): "quote",
        str(Type.RECOMMEND.value): "recommend",
    }
    # Key order is kept because it becomes the slice order of the type pie.
    count = {"text": 0, "voice": 0, "emoji": 0, "video": 0, "quote": 0, "system": 0, "vot": 0, "card": 0,
             "image": 0, "location": 0, "recommend": 0}

    # 07:00 is the earliest "time of night" later_time() can rank, so it
    # serves as the starting value for the latest-message search.
    latest_time = datetime.datetime.strptime("1999-01-01 7:00:00", date_format)
    # NOTE(review): reported as the date of the first chat ("origin_date" /
    # "start"), but it is never updated from the messages, so it is always the
    # time of this run -- confirm whether the earliest message time was intended.
    origin_datetime = datetime.datetime.now()

    text_parts = []   # content of every text message, for word statistics
    lens_list = []    # length of every text message
    date_list = []    # date of every timestamped message
    time_list = []    # hour of every timestamped message
    total_num = 0
    send = 0
    receive = 0
    talkerName = ""
    nick = ""

    for row in rows:
        # NOTE(review): the original per-contact filter
        # (`row[1].value == talkerId or row[9] == talkerName`) is disabled, so
        # every row in `rows` is attributed to this report -- confirm intent.
        raw_time = row[8].value
        if raw_time == "StrTime":
            # Header row: must not be counted as a message.
            continue

        total_num += 1
        talkerName = row[9].value
        nick = row[10].value

        kind = type_keys.get(str(row[2].value))
        if kind is not None:
            count[kind] += 1
        if kind == "text":
            body = str(row[7].value)
            text_parts.append(body)
            lens_list.append(len(body))

        if raw_time is None:
            # No timestamp: counted by type above, excluded from time stats.
            continue
        message_time = datetime.datetime.strptime(raw_time, date_format)

        latest_time = later_time(message_time, latest_time)
        date_list.append(message_time.date())
        time_list.append(message_time.hour)

        if str(row[4].value) == "1":
            send += 1
        else:
            receive += 1

    if not total_num:
        return None
    # The original discarded the result of replace(), leaving newlines in.
    text = "".join(text_parts).replace("\n", "")
    print(total_num)

    # Word frequencies: keep multi-character tokens and drop "x000D", the
    # token left over from Excel's escaped carriage return.
    words = [w for w in jieba.lcut(text) if len(w) > 1 and w != "x000D"]
    if not words:
        return None
    if not date_list:
        # Without any timestamped message the date/time statistics and the
        # charts below cannot be built.
        return None
    ls_str = ' '.join(words)

    sticker_names = set(_STICKER_NAMES)
    # Copy instead of updating the wordcloud module's shared STOPWORDS set.
    stopwords = set(STOPWORDS) | sticker_names

    # Most frequent word(s), never a sticker name (also not on ties).
    word_counts = {w: n for w, n in Counter(words).items() if w not in sticker_names}
    word_max = max(word_counts.values(), default=0)
    max_word = [w for w, n in word_counts.items() if n == word_max]

    ls_str = re.sub(r'[0-9a-zA-Z]', '', ls_str)
    background = imread('bg.jpg')
    wc = wordcloud.WordCloud(
        font_path="Deng.ttf",
        max_words=500,
        stopwords=stopwords,
        mask=background,
        background_color="white")

    # Sticker / emoji statistics on the raw text.
    stickers = ["[" + name + "]" for name in _STICKER_NAMES] + _EXTRA_EMOJI
    sticker_counts = {sticker: text.count(sticker) for sticker in stickers}
    emoji_max = max(sticker_counts.values(), default=0)
    # When nothing was used, report no favourite instead of every sticker.
    max_emoji = [s for s, n in sticker_counts.items() if n == emoji_max] if emoji_max else []

    # Busiest / quietest days and hours (only days and hours that occur).
    date_dict, date_max, max_date, date_min, min_date = _count_extremes(date_list)
    hour_counts, time_max, max_time, time_min, min_time = _count_extremes(time_list)
    time_dict = {hour: hour_counts.get(hour, 0) for hour in range(24)}

    # lens_list is non-empty here: `words` can only come from text messages.
    avg = sum(lens_list) / len(lens_list)

    filename = generate_filename()

    base_path = os.path.join("./out", remove_invalid_chars(filename))
    os.makedirs(base_path, exist_ok=True)  # also creates ./out when missing
    mpl.rcParams["font.sans-serif"] = ["SimHei"]
    heat_html = drawHeat(date_dict)
    time_bar_html = drawBar([str(i) + "\n到\n" + str(i + 1) + "\n点" for i in time_dict.keys()],
                            [int(i) for i in time_dict.values()], "时间段", "消息数",
                            os.path.join(base_path, "timebar"))
    typepie_html = drawPie(list(count.keys()), list(count.values()), "各类消息占比",
                           os.path.join(base_path, "typepie"))
    send_pie_html = drawPie(["我发送的", "我收到的"], [send, receive], "发送与收到消息占比",
                            os.path.join(base_path, "pie"))
    wordcloud_html = drawWordCloud(ls_str)
    if ' ' in ls_str:
        wc.generate(ls_str)
        wc.to_file(os.path.join(base_path, "wordcloud.png"))

    # Key order matters: it is the column order of output.xlsx and the order
    # in which template placeholders are substituted.
    out_data = {
        "nick": nick,
        "origin_date": str(origin_datetime),
        "latest_time": str(latest_time),
        "time_comment": time_comment(latest_time),
        "total_num": total_num,
        "text_count": count["text"],
        "voice_count": count["voice"],
        "text_comment": text_comment(count["text"], count["voice"]),
        "avg": "{:.2f}".format(avg),
        "avg_comment": avg_comment(avg),
        "receive": receive,
        "send_comment": send_comment(send, receive),
        "note": remove_invalid_chars(talkerName),
        "max_date": list2str([str(i.month) + "月" + str(i.day) + "日" for i in max_date]),
        "date_max": date_max,
        "min_date": list2str([str(i.month) + "月" + str(i.day) + "日" for i in min_date]),
        "date_min": date_min,
        "max_time": list2str([str(i) + "时到" + str(i + 1) + "时" for i in max_time]),
        "time_max": time_max,
        "min_time": list2str([str(i) + "时到" + str(i + 1) + "时" for i in min_time]),
        "time_min": time_min,
        "max_word": list2str(max_word),
        "word_max": word_max,
        "sum": len(text),
        "text_percent": str("{:.2f}".format(count["text"] / total_num * 100)) + "%",
        "max_date_this": this(max_date),
        "min_date_this": this(min_date),
        "max_time_this": this(max_time),
        "min_time_this": this(min_time),
        "file": filename,
        "max_emoji": str(max_emoji),
        "emoji_max": emoji_max,
        "text": send_text(nick, filename),
        "url": f"https://2024.peterzhong.site/report/{filename}.html",
        "time_bar": time_bar_html,
        "send_pie": send_pie_html,
        "type_pie": typepie_html,
        "heat": heat_html,
        "wordcloud_html": wordcloud_html
    }

    # Render the template under the contact's name, then copy it to the random
    # file name and delete the named copy.
    md_path = os.path.join("./out", remove_invalid_chars(talkerName) + ".md")
    shutil.copy("./template.md", md_path)
    print(md_path)
    with open(md_path, encoding="utf-8", mode="r") as file:
        content = file.read()
    for key in out_data.keys():
        content = content.replace("%" + str(key) + "%", str(out_data[key]))
    with open(md_path, encoding="utf-8", mode="w+") as file:
        file.write(content)
    shutil.copy(md_path, os.path.join("./out", filename + ".md"))  # delete this line if the random file name is not needed
    files.append(filename)
    names.append(talkerName)
    os.remove(md_path)  # comment this line out to keep the file named after the contact
    out_datas.append(out_data)
    update_toc()

    data = {
        "user": {
            "nickName": nick,
            "note": talkerName
        },
        "type": dict(count),
        "count": {
            "total": total_num,
            "avgLen": avg,
            "word": {
                "popWord": max_word,
                "pop": word_max
            },
            "emoji": {
                "popEmoji": max_emoji,
                "pop": emoji_max
            },
            "date": {
                "dateMax": date_max,
                "dateMin": date_min,
                "maxDate": [date_handler(day) for day in max_date],
                "minDate": [date_handler(day) for day in min_date]
            },
            "time": {
                "timeMax": time_max,
                "timeMin": time_min,
                "maxtime": list(max_time),
                "mintime": list(min_time)
            },
        },
        "time": {
            "start": date_handler(origin_datetime),
            "latest": time_handler(latest_time)
        },
        "out": {
            "wordCloud": 0
        }
    }
    datas.append(data)
    plt.close('all')
    return out_data


def time_comment(datetime):
    """Return a remark about how late the latest message was sent.

    Args:
        datetime: Object with an ``hour`` attribute (the parameter name
            shadows the ``datetime`` module inside this function).

    Returns:
        The "reasonable schedule" remark for hours 8 to 21 inclusive,
        otherwise the "night owl" remark.
    """
    is_daytime = 7 < datetime.hour < 22
    if not is_daytime:
        return "夜猫子无疑了,聊这么晚不会是在聊工作吧?"
    return "我们的作息好像还挺合理,或许只是消息的作息比较巧合吧。"


def avg_comment(avg):
    """Return a remark on the average text-message length.

    Averages below 15 characters get the "short and efficient" remark, all
    others the "who is the wordy one" remark.
    """
    return "长话短说,这效率不错" if avg < 15 else "到底是谁比较啰嗦一些呢[狗头]"


def send_comment(send, receive):
    """Return a remark on the balance between sent and received messages.

    Args:
        send: Number of messages sent.
        receive: Number of messages received.

    Returns:
        The "chatterbox" remark when the sent share exceeds the received share
        by more than 20 % of all messages, the "listener" remark in the
        opposite case, and the "balanced" remark otherwise. With no messages
        at all the balanced remark is returned instead of dividing by zero.
    """
    balanced = "基本上是一来一回,有问有答了。"
    total = send + receive
    if total == 0:
        return balanced

    balance = (send - receive) / total
    if balance > 0.2:
        return "看来我是个话痨。"
    if balance < -0.2:
        return "你的话让我受益匪浅,常常仔细地听着你讲。"
    return balanced


def text_comment(text, voice):
    """Return a remark comparing the text and voice message counts.

    Args:
        text: Number of text messages.
        voice: Number of voice messages.
    """
    prefers_text = text > voice
    return (
        "看来还是文本消息比较方便彼此沟通"
        if prefers_text
        else "估计是有比较多紧急情况,来不及手输文字了"
    )


def date_handler(d):
    """Format a date or datetime as ``YYYY-MM-DD``."""
    return f"{d:%Y-%m-%d}"


def time_handler(d):
    """Format the time part of a datetime as ``HH-MM-SS`` (dash separated)."""
    return f"{d:%H-%M-%S}"


def later_time(time_new, time_old):
    """Return whichever datetime is "later into the night", ignoring the date.

    A chat day runs from 07:00 to 06:59:59 of the next morning, so any time in
    the window [00:00, 07:00) counts as later than any time from 07:00 to
    23:59.

    The window bounds are inclusive at midnight and exclusive at 07:00. This
    fixes the earlier exclusive comparison, which ranked a message at exactly
    00:00:00 as the earliest of the day and let exactly 07:00:00 win over a
    small-hours message.

    Args:
        time_new: Candidate datetime.
        time_old: Current latest datetime.

    Returns:
        ``time_new`` when it is strictly later by the rule above, otherwise
        ``time_old`` (ties keep ``time_old``).
    """
    day_start = datetime.time(7, 0, 0)

    def lateness(moment):
        """Seconds since 07:00-based day start ordering key for *moment*."""
        clock = moment.time()
        seconds = (clock.hour * 3600 + clock.minute * 60 + clock.second
                   + clock.microsecond / 1_000_000)
        # Small hours belong to the previous evening: rank them after 23:59.
        return seconds + 24 * 3600 if clock < day_start else seconds

    return time_new if lateness(time_new) > lateness(time_old) else time_old


# X-axis labels (weekdays)
def label_days(ax, dates, i, j, calendar):
    """Write the day of the month into each calendar cell and label weekdays.

    Args:
        ax: Axes-like object providing ``text``, ``set`` and ``xaxis``.
        dates: Dates being plotted; only ``.day`` is read.
        i: Row index (week) of every date.
        j: Column index (weekday) of every date.
        calendar: 2-D grid; only its row count is used.
    """
    week_count, _ = calendar.shape
    day_grid = np.full((week_count, 7), np.nan)
    day_grid[i, j] = [d.day for d in dates]

    for (row, col), day in np.ndenumerate(day_grid):
        if not np.isfinite(day):
            continue  # cell without a date
        ax.text(col, row, int(day), ha='center', va='center')

    weekday_names = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
    ax.set(xticks=np.arange(7), xticklabels=weekday_names)
    ax.xaxis.tick_top()


# Y-axis labels (months)
def label_months(ax, dates, i, j, calendar):
    """Put one month label on the y axis at the mean row of that month.

    Args:
        ax: Axes-like object providing ``set`` and ``set_yticklabels``.
        dates: Dates being plotted; only ``.month`` is read.
        i: NumPy array with the row index (week) of every date.
        j: Unused.
        calendar: Unused.
    """
    month_names = np.array(['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul',
                            'Aug', 'Sep', 'Oct', 'Nov', 'Dec'])
    month_of_date = np.array([d.month for d in dates])

    tick_positions = []
    tick_labels = []
    for month in sorted(set(month_of_date)):
        tick_positions.append(i[month_of_date == month].mean())
        tick_labels.append(month_names[month - 1])

    ax.set(yticks=tick_positions)
    ax.set_yticklabels(tick_labels, rotation=90)


# Data packing function
def calendar_array(dates, data):
    """Arrange per-date values in a weeks x 7 grid (Monday in column 0).

    Rows are counted in whole weeks from the week of the earliest date, so a
    range that crosses New Year stays contiguous (ISO week numbers would jump
    from 52/53 back to 1).

    Args:
        dates: Sequence of ``date``/``datetime`` objects.
        data: One value per date, in the same order.

    Returns:
        ``(i, j, calendar)``: integer arrays with the row and column of every
        date, and a float array of shape ``(weeks, 7)`` holding ``data`` with
        NaN in unused cells. Empty input yields empty index arrays and a
        ``(0, 7)`` grid.
    """
    dates = list(dates)
    if not dates:
        empty = np.array([], dtype=int)
        return empty, empty.copy(), np.full((0, 7), np.nan)

    j = np.array([d.weekday() for d in dates])
    # Ordinal of the Monday that starts each date's week.
    week_starts = np.array([d.toordinal() for d in dates]) - j
    i = (week_starts - week_starts.min()) // 7

    calendar = np.full((i.max() + 1, 7), np.nan)
    calendar[i, j] = data
    return i, j, calendar


def remove_invalid_chars(text):
    """Make *text* safe to use as a file name.

    Each of ``/ \\ : * ? " < > |`` is replaced by an underscore. Non-string
    values (for example a numeric cell) are converted with ``str`` first.

    Args:
        text: Value to sanitise, or ``None``.

    Returns:
        The sanitised string; ``"None_无备注"`` when *text* is ``None``.
    """
    if text is None:
        return "None_无备注"
    rstr = r"[\/\\\:\*\?\"\<\>\|]"  # '/ \ : * ? " < > |'
    return re.sub(rstr, "_", str(text))


# Given calendar data and dates, draw the calendar image
def calendar_heatmap(ax, dates, data):
    """Draw *data* as a calendar heat map on *ax* with day and month labels.

    Args:
        ax: Axes-like object to draw on.
        dates: Dates, one per data value.
        data: Values to colour the cells by.
    """
    rows, cols, grid = calendar_array(dates, data)
    image = ax.imshow(grid, interpolation='none', cmap='summer')
    for annotate in (label_days, label_months):
        annotate(ax, dates, rows, cols, grid)
    ax.figure.colorbar(image)


def drawBar(axis_x, axis_y, xlabel, ylabel, path):
    """Save a matplotlib bar chart to *path* and return a pyecharts bar as HTML.

    Args:
        axis_x: Category labels.
        axis_y: One value per label.
        xlabel: X-axis label of the saved image.
        ylabel: Y-axis label of the saved image, also the pyecharts series name.
        path: Target passed to ``plt.savefig``.

    Returns:
        The embeddable HTML produced by ``Bar.render_embed()``.
    """
    # Static image
    plt.clf()
    # use_font('Heiti TC')
    plt.bar(axis_x, axis_y)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.savefig(path)

    # Interactive chart
    chart = (
        Bar()
        .add_xaxis(axis_x)
        .add_yaxis(ylabel, axis_y)
        .set_global_opts(title_opts=opts.TitleOpts(title="消息时间段分布"))
    )
    return chart.render_embed()


def drawHeat(data, year="2024"):
    """Render a pyecharts calendar heat map of messages per day.

    Args:
        data: Mapping of date (anything whose ``str()`` is ``YYYY-MM-DD``) to
            message count.
        year: Calendar range shown; defaults to the previously hard-coded
            ``"2024"``.

    Returns:
        The embeddable HTML produced by ``Calendar.render_embed()``. Empty
        *data* renders an empty calendar instead of raising.
    """
    data_list = [
        [datetime.datetime.strptime(str(day), "%Y-%m-%d"), freq]
        for day, freq in data.items()
    ]
    freqs = [freq for _, freq in data_list]
    calendar = (
        Calendar()
        .add(
            series_name="",
            yaxis_data=data_list,
            calendar_opts=opts.CalendarOpts(
                pos_top="50px",
                pos_left="30px",
                pos_right="30px",
                range_=str(year),  # year shown by the calendar
            ),
        )
        .set_global_opts(
            visualmap_opts=opts.VisualMapOpts(
                # default=0 keeps max()/min() from raising on empty data
                max_=max(freqs, default=0),
                min_=min(freqs, default=0),
                orient="horizontal",
                is_piecewise=True,
                range_color=["#b6e1bc", "#0d7000"]
            ),
            title_opts=opts.TitleOpts(title="消息热力图"),
        )
    )
    return calendar.render_embed()


def drawPie(labels, data, title, path):
    """Save a matplotlib pie chart to *path* and return a pyecharts pie as HTML.

    Args:
        labels: Slice labels.
        data: One value per label.
        title: Title used for both charts.
        path: Target passed to ``plt.savefig``.

    Returns:
        The embeddable HTML produced by ``Pie.render_embed()``.
    """
    # Static image
    plt.clf()
    plt.axes(aspect='equal')
    plt.pie(x=data, labels=labels)
    plt.title(title)
    plt.legend(loc='best')
    plt.savefig(path)

    # Interactive chart
    slices = [list(pair) for pair in zip(labels, data)]
    chart = Pie()
    chart.add("", slices)
    chart.set_global_opts(title_opts=opts.TitleOpts(title=title))
    chart.set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {c}"))
    return chart.render_embed()


def drawWordCloud(text):
    """Render a pyecharts word cloud from whitespace-separated words.

    Args:
        text: Words separated by whitespace; each word's weight is the number
            of times it occurs.

    Returns:
        The embeddable HTML produced by ``render_embed()``.
    """
    # [(word, frequency), ...] as expected by the chart's add()
    word_freqs = list(Counter(text.split()).items())

    # ``WordCloud`` here is the pyecharts chart class (rebound by the
    # ``from pyecharts.charts import *`` at the top of the file).
    chart = (
        WordCloud()
        .add("", word_freqs, word_size_range=[20, 100])
        .set_global_opts(title_opts=opts.TitleOpts(title="聊天关键词"))
    )
    # Render once; the previous version rendered twice and dropped the first result.
    return chart.render_embed()


def this(arr):
    """Return the Chinese demonstrative that matches the number of items.

    One item gives "这", two give "这两", any other count (including zero)
    gives "这几".
    """
    phrases = {1: "这", 2: "这两"}
    return phrases.get(len(list(arr)), "这几")


def list2str(list):
    """Join items into a Chinese enumeration with each item in backticks.

    Items are separated by "、" and the last two by "和", e.g.
    ``["a", "b", "c"]`` gives "`a`、`b`和`c`". The separators are inserted
    between items, so a "、" inside an item is left untouched.

    Args:
        list: Iterable of items; each is converted with ``str``. (The
            parameter name shadows the builtin and is kept for compatibility.)

    Returns:
        The joined string; an empty string for no items.
    """
    quoted = ["`" + str(item) + "`" for item in list]
    if len(quoted) < 2:
        return "".join(quoted)
    return "、".join(quoted[:-1]) + "和" + quoted[-1]


def replace_last_string(text, old_str, new_str):
    """Replace the last occurrence of *old_str* in *text* with *new_str*.

    Returns *text* unchanged when *old_str* does not occur.
    """
    position = text.rfind(old_str)
    if position == -1:
        return text
    before = text[:position]
    after = text[position + len(old_str):]
    return before + new_str + after


def generate_filename(length=15):
    """Return a random name of lowercase letters and digits.

    The name becomes part of the report URL and is the only thing keeping a
    report private, so it is drawn from ``secrets`` (cryptographically strong)
    rather than ``random``.

    Args:
        length: Number of characters to generate.
    """
    letters = string.ascii_lowercase + string.digits
    return ''.join(secrets.choice(letters) for _ in range(length))


def write_dict_to_excel(data_dict, filename):
    """Write a list of dicts to a new Excel workbook, one dict per row.

    The keys of the first dict become the header row; every dict's values are
    appended in its own key order.

    Args:
        data_dict: List of dicts (despite the name). May be empty, in which
            case a workbook without rows is saved instead of raising.
        filename: Path the workbook is saved to.
    """
    workbook = Workbook()
    worksheet = workbook.active
    if data_dict:
        worksheet.append(list(data_dict[0].keys()))
        for item in data_dict:
            worksheet.append(list(item.values()))
    workbook.save(filename)


def send_text(nick, filename):
    """Build the greeting message that invites a contact to open their report.

    Args:
        nick: Name used in the salutation.
        filename: Report file name inserted into the report URL.
    """
    return (
        f"尊敬的{nick}:你好!我们即将迎来2024年,首先祝你新年快乐,来年身体健康、一切顺心[加油]。"
        "回顾2023年,我为你做了一份我们的微信好友年度报告作为纪念,诚邀你来看看,挺有意思:"
        f"https://peterzhong1219.site/report/{filename}.html。"
        "让2023一切不如意随风而去,多多希望诗和远方——说不定哪天就实现了呢,愿共勉。【回复TD退订】[吃瓜]"
    )


def update_toc():
    """Rewrite ``./out/SUMMARY.md`` with one link per generated report.

    Reads the module-level ``files`` and ``names`` lists; entry *n* links the
    n-th name to ``<n-th file>.md``.
    """
    entries = [
        "- [" + str(names[index]) + "](" + str(filename) + ".md)\n"
        for index, filename in enumerate(files)
    ]
    md = "# 微信好友年度报告\n\n" + "".join(entries)
    with open("./out/SUMMARY.md", "w+", encoding="utf-8") as summary:
        summary.write(md)


# Script entry point: build all reports, then dump the collected statistics
# to output.json and the template values to output.xlsx.
if __name__ == '__main__':
    file_path = "./messages.xlsx"
    main_progress(file_path)

    with open("output.json", "w", encoding="utf-8") as json_file:
        json.dump(datas, json_file, indent=" ", ensure_ascii=False)

    write_dict_to_excel(out_datas, "output.xlsx")