"""WeChat yearly chat-report generator.

Reads an exported ``messages.xlsx`` / ``contacts.xlsx`` pair, analyses each
conversation (message-type mix, word frequency, emoji usage, daily/hourly
distribution, send/receive balance) and renders a per-contact markdown report
from ``template.md``, plus JSON and Excel summaries of everything.
"""

# --- standard library ---
import _thread
import datetime
import json
import math
import os
import random
import re
import shutil
import string
import threading
import warnings
from collections import Counter
from enum import Enum

# --- data analysis ---
import numpy as np
import pandas as pd
from numpy import sort
from pylab import mpl
import matplotlib.pyplot as plt

# --- spreadsheets / NLP / imaging ---
import openpyxl
from openpyxl.workbook import Workbook
import jieba
import wordcloud
from wordcloud import WordCloud, STOPWORDS
from imageio.v2 import imread
from mplfonts import use_font

# warnings.filterwarnings('ignore')

# Let a notebook cell echo every expression, not only the last one.
from IPython.core.interactiveshell import InteractiveShell
# import aspose.words as aw
InteractiveShell.ast_node_interactivity = "all"

# --- pyecharts ---
from pyecharts.globals import CurrentConfig, OnlineHostType, ThemeType, ChartType, SymbolType
CurrentConfig.ONLINE_HOST = OnlineHostType.NOTEBOOK_HOST
from pyecharts.commons.utils import JsCode
from pyecharts.components import Table
from pyecharts.charts import *  # chart types (Calendar, Pie, ...)
from pyecharts import options as opts  # chart configuration
from pyecharts.charts import Bar

# Shared accumulators filled by person().  The "threads" are actually run
# serially (main_progress calls t.run(), not t.start()), so plain lists are
# safe here; switch to locking before ever calling t.start().
datas = []      # structured per-contact analysis, dumped to output.json
files = []      # generated random report file names
names = []      # contact display names, parallel to `files`
out_datas = []  # template-substitution dicts, dumped to output.xlsx


class Type(Enum):
    """WeChat message-type codes as found in column 2 of the export.

    The lowercase member name doubles as the key of the per-type counter
    in person() (TEXT -> count["text"], ...).
    """
    TEXT = 1
    IMAGE = 3
    EMOJI = 47
    VIDEO = 43
    SYSTEM = 10000
    VOICE = 34
    LOCATION = 48
    QUOTE = 49
    VOT = 50
    CARD = 42
    RECOMMEND = 37


def main_progress(file_path):
    """Load the message/contact workbooks and analyse every conversation.

    Parameters
    ----------
    file_path : str
        Path to the exported ``messages.xlsx`` (must contain a "messages"
        sheet).  ``./contacts.xlsx`` is loaded from the working directory.
    """
    print("进入主程序")
    workbook = openpyxl.load_workbook(file_path)
    sheet = workbook["messages"]
    print("表格已读入")
    workbook2 = openpyxl.load_workbook("./contacts.xlsx")
    print("联系人表格已载入")
    sheets = workbook.sheetnames
    contact_sheet = workbook2["contacts"]

    # Column 4 of "contacts" holds the nickname; skip the header row.
    contacts = []
    for row in contact_sheet.iter_rows():
        if not row[4].value == "NickName":
            contacts.append(row[4].value)

    # Count distinct conversation groups: column 1 carries a talker id and
    # rows of one conversation are contiguous, so count value changes.
    num = -1
    for row in sheet.iter_rows():
        if not num == row[1].value:
            num = num + 1
    print("一共有", num, "组聊天记录。")

    talkerId = 0
    talkerName = ""
    nick = ""
    ts = []
    print(contacts)
    for nickName in contacts:
        # NOTE(review): myThread(threadID, talkerId, rows) — the nickname is
        # passed as `talkerId` here.  Harmless today because person() has its
        # per-talker filter disabled, but confirm before re-enabling it.
        thread = myThread(talkerId, nickName, sheet.iter_rows())
        ts.append(thread)
        print("添加线程" + str(talkerId))
        talkerId += 1
    while talkerId <= num:
        # person(talkerId,sheet.iter_rows())
        thread = ""
        talkerId += 1
    for t in ts:
        # Deliberately t.run(), not t.start(): the analyses execute serially
        # in this thread (openpyxl iterators and the module-level result
        # lists are not synchronised).
        t.run()
    print("END ALL")


class myThread(threading.Thread):
    """Thread wrapper around person() for one conversation.

    Currently only ever executed via run() (serially); see main_progress.
    """

    def __init__(self, threadID, talkerId, rows):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.talkerId = talkerId  # in practice the contact's nickname — see caller
        self.rows = rows          # dedicated openpyxl row iterator

    def run(self):
        print("开始线程:" + str(self.talkerId))
        person(self.talkerId, self.rows)
        print("退出线程:" + str(self.talkerId))


def person(talkerId, rows):
    """Analyse one conversation and write its markdown report.

    Parameters
    ----------
    talkerId : object
        Talker id (or nickname, as currently passed) — only used by the
        per-talker row filter, which is presently disabled.
    rows : iterator
        openpyxl row iterator over the "messages" sheet.

    Returns
    -------
    dict | None
        The template-substitution dict, or None when the conversation has
        no analysable content.
    """
    text = ""  # concatenation of all text messages, for word-frequency analysis
    empty_time = datetime.datetime.strptime("1999-01-01 7:00:00", "%Y-%m-%d %H:%M:%S")
    latest_time = empty_time
    origin_datetime = datetime.datetime.now()
    count = {"text": 0, "voice": 0, "emoji": 0, "video": 0, "quote": 0,
             "system": 0, "vot": 0, "card": 0, "image": 0, "location": 0,
             "recommend": 0}
    date_format = "%Y-%m-%d %H:%M:%S"
    total_num = 0
    month_dict = [0] * 12   # messages per calendar month (currently informational)
    date_list = []          # one entry per timestamped message: its date
    time_list = []          # one entry per timestamped message: its hour
    lens_list = []          # character length of every text message
    send = 0
    receive = 0
    found = False
    talkerName = ""
    nick = ""
    rowcount = 0

    # Raw type-column value -> counter key, e.g. "1" -> "text".
    type_keys = {str(member.value): member.name.lower() for member in Type}

    for row in rows:
        # if row[1].value == talkerId or row[9] == talkerName:
        if True:  # NOTE(review): per-talker filter deliberately disabled (see line above)
            rowcount += 1
            found = True
            total_num += 1
            talkerName = row[9].value
            nick = row[10].value

            kind = type_keys.get(str(row[2].value))
            if kind == "text":
                # Aggregate every text message for later word analysis.
                text += str(row[7].value)
                lens_list.append(len(str(row[7].value)))
            if kind is not None:
                count[kind] += 1

            # Column 8 is "StrTime"; skip the header row and blank cells.
            if row[8].value != "StrTime" and row[8].value is not None:
                message_time = datetime.datetime.strptime(row[8].value, date_format)
            else:
                continue

            # Latest (night-owl) chat time.
            latest_time = later_time(message_time, latest_time)
            # Month distribution.
            month_dict[message_time.month - 1] += 1
            # Collect date / hour columns for the distribution statistics.
            date_list.append(message_time.date())
            time_list.append(message_time.hour)
            # Sent vs received: column 4 == "1" means sent by me.
            if str(row[4].value) == "1":
                send += 1
            else:
                receive += 1

    if not found:
        return
    # BUGFIX: the original called text.replace(...) and discarded the result.
    text = text.replace("\n", "")
    print(rowcount)

    # --- word frequency ---
    ls = [w for w in jieba.lcut(text) if len(w) > 1 and w != "x000D"]
    ls_str = ' '.join(ls)
    if len(ls) < 1:
        # Nothing to analyse (e.g. only images/voice, no usable text).
        return

    stopwords = STOPWORDS
    word_counter = Counter(ls)  # replaces the original O(n^2) list.count loops
    # Most frequent first; Counter.most_common is stable, so ties keep
    # first-seen order — same result as the original sorted() call.
    word_dict = dict(word_counter.most_common())

    # Built-in WeChat sticker names, excluded from the "top word" pick and
    # added to the word-cloud stopword list.
    emojis = ['微笑', '撇嘴', '色', '发呆', '得意', '流泪', '害羞', '闭嘴', '睡', '大哭', '尴尬', '发怒', '调皮', '呲牙', '惊讶', '难过', '囧', '抓狂', '吐', '偷笑', '愉快', '白眼', '傲慢', '困', '惊恐', '憨笑', '悠闲', '咒骂', '疑问', '嘘', '晕', '衰', '骷髅', '敲打', '再见', '擦汗', '抠鼻', '鼓掌', '坏笑', '右哼哼', '鄙视', '委屈', '快哭了', '阴险', '亲亲', '可怜', '笑脸', '生病', '脸红', '破涕为笑', '恐惧', '失望', '无语', '嘿哈', '捂脸', '奸笑', '机智', '皱眉', '耶', '吃瓜', '加油', '汗', '天啊', 'Emm', '社会社会', '旺柴', '好的', '打脸', '哇', '翻白眼', '666', '让我看看', '叹气', '苦涩', '裂开', '嘴唇', '爱心', '心碎', '拥抱', '强', '弱', '握手', '胜利', '抱拳', '勾引', '拳头', 'OK', '合十', '啤酒', '咖啡', '蛋糕', '玫瑰', '凋谢', '菜刀', '炸弹', '便便', '月亮', '太阳', '庆祝', '礼物', '红包', '發', '福', '烟花', '爆竹', '猪头', '跳跳', '发抖', '转圈']
    stopwords.update(emojis)
    emoji_words = set(emojis)  # O(1) membership tests below

    max_word = []
    word_max = 0
    for word, cnt in word_counter.items():
        if word in emoji_words:
            # BUGFIX: the original only excluded sticker names in the ">"
            # branch, so a sticker word could still be appended on a tie.
            continue
        if cnt > word_max:
            max_word = [word]
            word_max = cnt
        elif cnt == word_max:
            max_word.append(word)

    # Strip latin letters/digits before rendering the word cloud.
    ls_str = re.sub(r'[0-9a-zA-Z]', '', ls_str)
    background = imread('bg.jpg')
    wc = wordcloud.WordCloud(
        font_path="Deng.ttf", max_words=500, stopwords=stopwords,
        mask=background, background_color="white")

    # --- sticker / emoji analysis ---
    stickers = ['[微笑]', '[撇嘴]', '[色]', '[发呆]', '[得意]', '[流泪]', '[害羞]', '[闭嘴]', '[睡]', '[大哭]', '[尴尬]', '[发怒]', '[调皮]', '[呲牙]', '[惊讶]', '[难过]', '[囧]', '[抓狂]', '[吐]', '[偷笑]', '[愉快]', '[白眼]', '[傲慢]', '[困]', '[惊恐]', '[憨笑]', '[悠闲]', '[咒骂]', '[疑问]', '[嘘]', '[晕]', '[衰]', '[骷髅]', '[敲打]', '[再见]', '[擦汗]', '[抠鼻]', '[鼓掌]', '[坏笑]', '[右哼哼]', '[鄙视]', '[委屈]', '[快哭了]', '[阴险]', '[亲亲]', '[可怜]', '[笑脸]', '[生病]', '[脸红]', '[破涕为笑]', '[恐惧]', '[失望]', '[无语]', '[嘿哈]', '[捂脸]', '[奸笑]', '[机智]', '[皱眉]', '[耶]', '[吃瓜]', '[加油]', '[汗]', '[天啊]', '[Emm]', '[社会社会]', '[旺柴]', '[好的]', '[打脸]', '[哇]', '[翻白眼]', '[666]', '[让我看看]', '[叹气]', '[苦涩]', '[裂开]', '[嘴唇]', '[爱心]', '[心碎]', '[拥抱]', '[强]', '[弱]', '[握手]', '[胜利]', '[抱拳]', '[勾引]', '[拳头]', '[OK]', '[合十]', '[啤酒]', '[咖啡]', '[蛋糕]', '[玫瑰]', '[凋谢]', '[菜刀]', '[炸弹]', '[便便]', '[月亮]', '[太阳]', '[庆祝]', '[礼物]', '[红包]', '[發]', '[福]', '[烟花]', '[爆竹]', '[猪头]', '[跳跳]', '[发抖]', '[转圈]']
    # Besides the built-in stickers, add some common Unicode emoji (any other
    # token worth counting can be appended here too).
    stickers.extend(['😂', '🤣', '🥰', '😅', '🥹', '😘', '🤩', '🥺', '😓', '🙄', '🤡'])

    max_emoji = []
    emoji_max = 0
    for sticker in dict.fromkeys(stickers):  # de-duplicated, order-preserving
        occurrences = text.count(sticker)
        if occurrences > emoji_max:
            max_emoji = [sticker]
            emoji_max = occurrences
        elif occurrences == emoji_max and emoji_max > 0:
            # BUGFIX: `emoji_max > 0` guard — the original appended every
            # zero-count sticker while no emoji had been seen yet.
            max_emoji.append(sticker)

    # --- per-day distribution: busiest / quietest day ---
    if not date_list:
        # Every row lacked a parseable timestamp; nothing left to chart.
        return
    date_counter = Counter(date_list)
    date_dict = dict(date_counter)
    date_max = max(date_counter.values())
    max_date = [d for d, c in date_counter.items() if c == date_max]
    date_min = min(date_counter.values())
    # BUGFIX: the original seeded min_date with date_list[0] and then
    # appended it again on the first iteration, duplicating it.
    min_date = [d for d, c in date_counter.items() if c == date_min]

    # --- per-hour distribution ---
    time_counter = Counter(time_list)
    time_max = max(time_counter.values())
    max_time = [h for h, c in time_counter.items() if c == time_max]
    time_min = min(time_counter.values())
    min_time = [h for h, c in time_counter.items() if c == time_min]
    # Normalise to all 24 hours (0 where no message was seen).
    time_dict = {h: time_counter.get(h, 0) for h in range(24)}

    # Average text-message length (lens_list is non-empty here because the
    # empty-word-list early return above already covered the no-text case).
    avg = sum(lens_list) / len(lens_list) if lens_list else 0.0

    filename = generate_filename()
    base_path = os.path.join("./out", remove_invalid_chars(filename))
    os.makedirs(base_path, exist_ok=True)
    mpl.rcParams["font.sans-serif"] = ["SimHei"]

    # Render all charts (matplotlib PNGs + embeddable pyecharts HTML).
    heat_html = drawHeat(date_dict)
    time_bar_html = drawBar(
        [str(i) + "\n到\n" + str(i + 1) + "\n点" for i in time_dict.keys()],
        [int(i) for i in list(time_dict.values())],
        "时间段", "消息数", os.path.join(base_path, "timebar"))
    typepie_html = drawPie(list(count.keys()), list(count.values()),
                           "各类消息占比", os.path.join(base_path, "typepie"))
    send_pie_html = drawPie(["我发送的", "我收到的"], [send, receive],
                            "发送与收到消息占比", os.path.join(base_path, "pie"))
    wordcloud_html = drawWordCloud(ls_str)
    if ' ' in ls_str:
        wc.generate(ls_str)
        wc.to_file(os.path.join(base_path, "wordcloud.png"))

    # Values substituted into template.md as %key% placeholders.
    out_data = {
        "nick": nick,
        "origin_date": str(origin_datetime),
        "latest_time": str(latest_time),
        "time_comment": time_comment(latest_time),
        "total_num": total_num,
        "text_count": count["text"],
        "voice_count": count["voice"],
        "text_comment": text_comment(count["text"], count["voice"]),
        "avg": "{:.2f}".format(avg),
        "avg_comment": avg_comment(avg),
        "receive": receive,
        "send_comment": send_comment(send, receive),
        "note": remove_invalid_chars(talkerName),
        "max_date": list2str([str(i.month) + "月" + str(i.day) + "日" for i in max_date]),
        "date_max": date_max,
        "min_date": list2str([str(i.month) + "月" + str(i.day) + "日" for i in min_date]),
        "date_min": date_min,
        "max_time": list2str([str(i) + "时到" + str(i + 1) + "时" for i in max_time]),
        "time_max": time_max,
        "min_time": list2str([str(i) + "时到" + str(i + 1) + "时" for i in min_time]),
        "time_min": time_min,
        "max_word": list2str(max_word),
        "word_max": word_max,
        "sum": len(text),
        "text_percent": str("{:.2f}".format(count["text"] / total_num * 100)) + "%",
        "max_date_this": this(max_date),
        "min_date_this": this(min_date),
        "max_time_this": this(max_time),
        "min_time_this": this(min_time),
        "file": filename,
        "max_emoji": str(max_emoji),
        "emoji_max": emoji_max,
        "text": send_text(nick, filename),
        # BUGFIX: the original f-string had no placeholder, so every report
        # pointed at the same dead URL.
        "url": f"https://2024.peterzhong.site/report/{filename}.html",
        "time_bar": time_bar_html,
        "send_pie": send_pie_html,
        "type_pie": typepie_html,
        "heat": heat_html,
        "wordcloud_html": wordcloud_html,
    }

    # Instantiate template.md with the values above.
    md_path = os.path.join("./out", remove_invalid_chars(talkerName) + ".md")
    shutil.copy("./template.md", md_path)
    print(md_path)
    with open(md_path, encoding="utf-8", mode="r") as file:
        content = file.read()
    for key in out_data.keys():
        content = content.replace("%" + str(key) + "%", str(out_data[key]))
    with open(md_path, encoding="utf-8", mode="w+") as file:
        file.write(content)
    # Delete this line if the random ("password") file name is not wanted.
    shutil.copy(md_path, os.path.join("./out", filename + ".md"))
    files.append(filename)
    names.append(talkerName)
    os.remove(md_path)  # comment this out to keep the original-named file
    out_datas.append(out_data)
    update_toc()

    # Structured record for output.json.
    data = {
        "user": {"nickName": nick, "note": talkerName},
        "type": {
            "text": count["text"], "voice": count["voice"],
            "emoji": count["emoji"], "video": count["video"],
            "quote": count["quote"], "system": count["system"],
            "vot": count["vot"], "card": count["card"],
            "image": count["image"], "location": count["location"],
            "recommend": count["recommend"],
        },
        "count": {
            "total": total_num,
            "avgLen": avg,
            "word": {"popWord": max_word, "pop": word_max},
            "emoji": {"popEmoji": max_emoji, "pop": emoji_max},
            "date": {
                "dateMax": date_max, "dateMin": date_min,
                "maxDate": [date_handler(d) for d in max_date],
                "minDate": [date_handler(d) for d in min_date],
            },
            "time": {
                "timeMax": time_max, "timeMin": time_min,
                "maxtime": list(max_time),
                "mintime": list(min_time),
            },
        },
        "time": {
            "start": date_handler(origin_datetime),
            "latest": time_handler(latest_time),
        },
        "out": {"wordCloud": 0},
    }
    datas.append(data)
    plt.close('all')  # free matplotlib figures between conversations
    return out_data


def time_comment(dt):
    """Comment on the latest chat time (renamed param: no module shadowing)."""
    if 7 < dt.hour < 22:
        return "我们的作息好像还挺合理,或许只是消息的作息比较巧合吧。"
    else:
        return "夜猫子无疑了,聊这么晚不会是在聊工作吧?"


def avg_comment(avg):
    """Comment on the average text-message length."""
    if avg < 15:
        return "长话短说,这效率不错"
    else:
        return "到底是谁比较啰嗦一些呢[狗头]"


def send_comment(send, receive):
    """Comment on the sent/received balance (>20% skew either way)."""
    total = send + receive
    if (send - receive) / total > 0.2:
        return "看来我是个话痨。"
    elif (send - receive) / total < -0.2:
        return "你的话让我受益匪浅,常常仔细地听着你讲。"
    else:
        return "基本上是一来一回,有问有答了。"


def text_comment(text, voice):
    """Comment on text-vs-voice message counts."""
    if text > voice:
        return "看来还是文本消息比较方便彼此沟通"
    else:
        return "估计是有比较多紧急情况,来不及手输文字了"


def date_handler(d):
    """Format a date(/datetime) as YYYY-MM-DD."""
    return d.strftime('%Y-%m-%d')


def time_handler(d):
    """Format a datetime's time-of-day as HH-MM-SS."""
    return d.strftime('%H-%M-%S')


def later_time(time_new, time_old):
    """Return whichever time-of-day counts as "later at night".

    Hours in (00:00, 07:00) are treated as after-midnight chatting and beat
    any evening time; otherwise the plain later time-of-day wins.
    """
    time0 = datetime.datetime.strptime("1999-01-01 0:00:00", "%Y-%m-%d %H:%M:%S")
    time7 = datetime.datetime.strptime("1999-01-01 7:00:00", "%Y-%m-%d %H:%M:%S")
    if time0.time() < time_old.time() < time7.time():
        # Old value is already in the small hours: only a *later* small-hours
        # time can replace it.
        if time_new.time() > time7.time():
            return time_old
        elif time_old.time() > time_new.time():
            return time_old
        else:
            return time_new
    else:
        if time0.time() < time_new.time() < time7.time():
            return time_new  # new value is after midnight — always later
        else:
            if time_new.time() > time_old.time():
                return time_new
            else:
                return time_old


# x-axis (weekday) labels for the matplotlib calendar heat map
def label_days(ax, dates, i, j, calendar):
    ni, nj = calendar.shape
    day_of_month = np.nan * np.zeros((ni, 7))
    day_of_month[i, j] = [d.day for d in dates]
    for (i, j), day in np.ndenumerate(day_of_month):
        if np.isfinite(day):
            ax.text(j, i, int(day), ha='center', va='center')
    ax.set(xticks=np.arange(7),
           xticklabels=['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'])
    ax.xaxis.tick_top()


# y-axis (month) labels for the matplotlib calendar heat map
def label_months(ax, dates, i, j, calendar):
    month_labels = np.array(['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
                             'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'])
    months = np.array([d.month for d in dates])
    uniq_months = sorted(set(months))
    # `i` is the week-row index array; each month label sits at the mean row.
    yticks = [i[months == m].mean() for m in uniq_months]
    labels = [month_labels[m - 1] for m in uniq_months]
    ax.set(yticks=yticks)
    ax.set_yticklabels(labels, rotation=90)


# Pack per-date values into a (week, weekday) matrix for imshow.
def calendar_array(dates, data):
    i, j = zip(*[d.isocalendar()[1:] for d in dates])
    i = np.array(i) - min(i)   # week rows, zero-based
    j = np.array(j) - 1        # weekday columns, zero-based
    ni = max(i) + 1
    calendar = np.nan * np.zeros((ni, 7))
    calendar[i, j] = data
    return i, j, calendar


def remove_invalid_chars(text):
    """Replace characters that are illegal in file names with underscores."""
    rstr = r"[\/\\\:\*\?\"\<\>\|]"  # '/ \ : * ? " < > |'
    if text is not None:
        text = re.sub(rstr, "_", text)
    else:
        text = str(text) + "_无备注"  # "None_无备注" keeps missing notes visible
    return str(text)


# Given the per-date data, draw a matplotlib calendar heat map onto `ax`.
def calendar_heatmap(ax, dates, data):
    i, j, calendar = calendar_array(dates, data)
    im = ax.imshow(calendar, interpolation='none', cmap='summer')
    label_days(ax, dates, i, j, calendar)
    label_months(ax, dates, i, j, calendar)
    ax.figure.colorbar(im)


def drawBar(axis_x, axis_y, xlabel, ylabel, path):
    """Save a matplotlib bar chart to `path` and return pyecharts embed HTML."""
    plt.clf()
    # use_font('Heiti TC')
    plt.bar(axis_x, axis_y)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.savefig(path)
    bar = Bar()
    bar.add_xaxis(axis_x)
    bar.add_yaxis(ylabel, axis_y)
    bar.set_global_opts(title_opts=opts.TitleOpts(title="消息时间段分布"))
    return bar.render_embed()


def drawHeat(data):
    """Return embeddable HTML for a pyecharts calendar heat map.

    `data` maps date -> message count.
    """
    # plt.clf()
    # fig, ax = plt.subplots()
    # calendar_heatmap(ax, dates, data)
    # plt.savefig(path)
    data_list = [
        [datetime.datetime.strptime(str(date), "%Y-%m-%d"), freq]
        for date, freq in data.items()
    ]
    counts = [item[1] for item in data_list]
    calendar = (
        Calendar()
        .add(
            series_name="",
            yaxis_data=data_list,
            calendar_opts=opts.CalendarOpts(
                pos_top="50px",
                pos_left="30px",
                pos_right="30px",
                range_="2024",  # NOTE(review): hard-coded year — update annually
            ),
        )
        .set_global_opts(
            visualmap_opts=opts.VisualMapOpts(
                max_=max(counts),
                min_=min(counts),
                orient="horizontal",
                is_piecewise=True,
                range_color=["#b6e1bc", "#0d7000"],
            ),
            title_opts=opts.TitleOpts(title="消息热力图"),
        )
    )
    return calendar.render_embed()


def drawPie(labels, data, title, path):
    """Save a matplotlib pie chart to `path` and return pyecharts embed HTML."""
    plt.clf()
    plt.axes(aspect='equal')
    plt.pie(x=data, labels=labels)
    plt.title(title)
    plt.legend(loc='best')
    plt.savefig(path)
    pie = Pie()
    pie.add("", [list(z) for z in zip(labels, data)]) \
        .set_global_opts(title_opts=opts.TitleOpts(title=title)) \
        .set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {c}"))
    return pie.render_embed()


def drawWordCloud(text):
    """Return embeddable HTML for a pyecharts word cloud of `text`.

    `text` is expected to be space-separated tokens (jieba output).
    """
    words = list(Counter(text.split()).items())  # [(word, freq), ...]
    cloud = (
        WordCloud()
        .add("", words, word_size_range=[20, 100])
        .set_global_opts(title_opts=opts.TitleOpts(title="聊天关键词"))
    )
    return cloud.render_embed()


def this(arr):
    """Chinese demonstrative matching the list's size: 这 / 这两 / 这几."""
    arr = list(arr)
    if len(arr) == 1:
        return "这"
    elif len(arr) == 2:
        return "这两"
    else:
        return "这几"


def list2str(items):
    """Join items as `a`、`b`和`c`, backtick-quoted for markdown."""
    res = ""
    for item in items:
        res += "、`" + str(item) + "`"
    res = res[1:]  # drop the leading 、
    res = replace_last_string(res, "、", "和")
    return res


def replace_last_string(text, old_str, new_str):
    """Replace only the last occurrence of `old_str` in `text`."""
    last_index = text.rfind(old_str)
    if last_index != -1:
        return text[:last_index] + new_str + text[last_index + len(old_str):]
    else:
        return text


def generate_filename(length=15):
    """Return a random file name of lowercase letters and digits.

    NOTE(review): uses `random`, fine for obfuscation; switch to `secrets`
    if these names must be unguessable.
    """
    letters = string.ascii_lowercase + string.digits
    return ''.join(random.choice(letters) for _ in range(length))


def write_dict_to_excel(data_dict, filename):
    """Write a list of flat dicts to an Excel file, keys as the header row."""
    workbook = Workbook()
    worksheet = workbook.active
    if data_dict:  # BUGFIX: the original raised IndexError on an empty list
        worksheet.append(list(data_dict[0].keys()))
        for item in data_dict:
            worksheet.append(list(item.values()))
    workbook.save(filename)


def send_text(nick, filename):
    """Build the SMS-style invitation text pointing at the report URL."""
    # BUGFIX: the original f-string had no placeholder for the file name.
    text = f"尊敬的{nick}:你好!我们即将迎来2024年,首先祝你新年快乐,来年身体健康、一切顺心[加油]。回顾2023年,我为你做了一份我们的微信好友年度报告作为纪念,诚邀你来看看,挺有意思:https://peterzhong1219.site/report/{filename}.html。让2023一切不如意随风而去,多多希望诗和远方——说不定哪天就实现了呢,愿共勉。【回复TD退订】[吃瓜]"
    return text


def update_toc():
    """Rewrite ./out/SUMMARY.md listing every generated report."""
    md = "# 微信好友年度报告\n\n"
    for name, filename in zip(names, files):
        md += "- [" + str(name) + "](" + str(filename) + ".md)\n"
    with open("./out/SUMMARY.md", "w+", encoding="utf-8") as file:
        file.write(md)


if __name__ == '__main__':
    file_path = "./messages.xlsx"
    main_progress(file_path)
    with open("output.json", "w", encoding="utf-8") as file:
        json.dump(datas, file, indent=" ", ensure_ascii=False)
    write_dict_to_excel(out_datas, "output.xlsx")