forked from KarisAya/nonebot_plugin_game_collection
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAccount.py
606 lines (534 loc) · 20.5 KB
/
Account.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
from typing import Tuple
from nonebot.adapters.onebot.v11 import (
Bot,
MessageEvent,
GroupMessageEvent,
Message,
MessageSegment
)
import random
import time
import datetime
from .utils.chart import (
bar_chart,
my_info_head,
my_info_account,
my_exchange_head,
linecard,
info_splicing
)
from .data import Company, UserDict, GroupAccount
from .data import props_library, props_index
from .config import bot_name,sign_gold, revolt_gold, revolt_cd, revolt_gini, max_bet_gold
from . import Manager
data = Manager.data
user_data = data.user
group_data = data.group
company_index = Manager.company_index
def gold_create(event:MessageEvent,gold:int) -> str:
"""
获取金币
"""
user,group_account = Manager.locate_user(event)
if not group_account:
return "私聊未关联账户,请发送【关联账户】关联群内账户。"
user.gold += gold
group_account.gold += gold
return f"你获得了 {gold} 金币"
def props_create(event:MessageEvent, prop_name:str, count:int) -> str:
"""
获取道具
"""
prop_code = props_index.get(prop_name)
if not prop_code:
return f"没有【{prop_name}】这种道具。"
user,group_account = Manager.locate_user(event)
if not group_account:
return "私聊未关联账户,请发送【关联账户】关联群内账户。"
if prop_code[1] == "3":
account = user
else:
account = group_account
account.props.setdefault(prop_code,0)
account.props[prop_code] += count
return f"你获得了{count}个【{prop_name}】!"
def sign(event:MessageEvent) -> str:
"""
签到
"""
user,group_account = Manager.locate_user(event)
if not group_account:
return "私聊未关联账户,请发送【关联账户】关联群内账户。"
if group_account.is_sign:
return "你已经签过到了哦"
else:
gold = random.randint(sign_gold[0], sign_gold[1])
user.gold += gold
group_account.gold += gold
group_account.is_sign = True
return random.choice(["祝你好运~", "可别花光了哦~"]) + f"\n你获得了 {gold} 金币"
def revolt_sign(event:MessageEvent) -> str:
"""
重置签到
"""
user,group_account = Manager.locate_user(event)
if not group_account:
return "私聊未关联账户,请发送【关联账户】关联群内账户。"
if group_account.revolution:
return "你没有待领取的金币"
else:
gold = random.randint(revolt_gold[0], revolt_gold[1])
user.gold += gold
group_account.gold += gold
group_account.revolution = True
return f"这是你重置后获得的金币,你获得了 {gold} 金币"
def revolution(group_id:int) -> str:
"""
发动革命
group_id:群号
"""
if group_id not in group_data:
return None
group = group_data[group_id]
if time.time() - group.revolution_time < revolt_cd:
return f"重置正在冷却中,结束时间:{datetime.datetime.fromtimestamp(group.revolution_time + revolt_cd).strftime('%H:%M:%S')}"
if (gold := Manager.group_wealths(group_id)) < (limit := 15 * max_bet_gold):
return f"本群金币({round(gold,2)})小于{limit},未满足重置条件。"
if (gini := Manager.Gini(group_id)) < revolt_gini:
return f"当前基尼系数为{round(gini,3)},未满足重置条件。"
rank = Manager.group_ranklist(group_id,"资产")
user_id = rank[0][0]
group_account = user_data[user_id].group_accounts[group_id]
group_account.props.setdefault("02101",0)
group_account.props["02101"] += 1
group.revolution_time = time.time()
group.Achieve_revolution.setdefault(user_id,0)
group.Achieve_revolution[user_id] += 1
first_name = group_account.nickname
i = 0.0
j = i**2
for x in rank[:10]:
user = user_data[x[0]]
group_account = user.group_accounts[group_id]
gold = int(group_account.value*j - group_account.gold*(1-j))
user.gold += gold
group_account.gold += gold
company_ids = [company_id for company_id in group_account.stocks]
for company_id in company_ids:
company = group_data[company_id].company
if user_id in company.exchange:
del company.exchange[user_id]
company.stock += group_account.stocks[company_id]
del group_account.stocks[company_id]
i += 0.1
j = i**2
for user_id in group.namelist:
user_data[user_id].group_accounts[group_id].revolution = False
data.save()
return f"重置成功!恭喜{first_name}进入挂件榜☆!\n当前系数为:{round(gini,3)},重置签到已刷新。"
def transfer_gold(event:GroupMessageEvent, target:Tuple[UserDict,GroupAccount], gold:int) -> str:
"""
转账处理
param:
event: GroupMessageEvent
target:目标账户
gold: 转账金币
"""
self_user,self_group_account = Manager.locate_user(event)
target_user,target_group_account = target
if self_group_account.gold < gold:
return f"你没有足够的金币,无法完成结算。\n——你还有{self_group_account.gold}枚金币。"
if target_group_account.props.get("42001",0):
fee = 0
tips = f"『{props_library['42001']['name']}』免手续费"
else:
fee = int(gold * 0.02)
tips = f"扣除2%手续费:{fee},实际到账金额{gold - fee}"
self_user.gold -= gold
self_group_account.gold -= gold
target_user.gold += gold - fee
target_group_account.gold += gold - fee
return f"向 {target_group_account.nickname} 赠送{gold}金币\n" + tips
def transfer_props(event:GroupMessageEvent, target:Tuple[UserDict,GroupAccount], prop_name:str, count:int) -> str:
"""
赠送道具
param:
event: GroupMessageEvent
target:目标账户
props: 道具名
count: 数量
"""
prop_code = props_index.get(prop_name)
if not prop_code:
return f"没有【{prop_name}】这种道具。"
self_user,self_group_account = Manager.locate_user(event)
target_user,target_group_account = target
if prop_code[1] == "3":
self_account = self_user
target_account = target_user
else:
self_account = self_group_account
target_account = target_group_account
n = self_account.props.get(prop_code,0)
if n < count:
return f"你没有足够的道具,无法完成结算。\n——你有{n}个【{prop_name}】。"
self_account.props[prop_code] -= count
target_account.props.setdefault(prop_code,0)
target_account.props[prop_code] += count
return f"向 {target_group_account.nickname} 送出{count}个【{prop_name}】!"
def connect(event:MessageEvent, group_id:str = None) -> str:
"""
关联账户
param:
event: GroupMessageEvent
group_id:关联群号
"""
user = Manager.locate_user(event)[0]
if group_id:
try:
group_id = int(group_id)
except:
return f"无效输入:{group_id}"
if group_id in user.group_accounts:
user.connect = group_id
else:
return f"你在 {group_id} 无账户,关联失败。"
else:
user.connect = event.group_id
return f"私聊账户已关联到{group_id}"
def my_gold(event:MessageEvent) -> str:
"""
我的金币
"""
user,group_account = Manager.locate_user(event)
if group_account:
return f"你还有 {group_account.gold} 枚金币"
else:
group_accounts = user.group_accounts
msg = "你的账户\n"
for group_id in group_accounts:
msg += f'{group_id} 金币:{group_accounts[group_id].gold}枚\n'
return msg
async def my_info(event:MessageEvent) -> Message:
"""
我的资料卡
"""
user,group_account = Manager.locate_user(event)
if not group_account:
return "私聊未关联账户,请发送【关联账户】关联群内账户。"
info = []
# 加载全局信息
nickname = group_account.nickname
info.append(await my_info_head(user,nickname))
# 加载卡片
PropsCard = Manager.PropsCard_list((user,group_account))
msg = ""
for x in PropsCard:
msg += f"----\n[center]{x}\n"
if msg:
info.append(linecard(msg+"----", width = 880,padding = (0,-28),spacing = 1, font_size = 60))
# 加载成就卡片
Achieve = Manager.Achieve_list((user,group_account))[:2]
# 加载本群账户
gold = group_account.gold
value = group_account.value
is_sign = group_account.is_sign
if is_sign:
is_sign = ["已签到","green"]
else:
is_sign = ["未签到","red"]
security = 3 - group_account.security
if security:
security = [security,"green"]
else:
security = [security,"red"]
msg = ""
for x in Achieve:
msg += f"{x}\n"
msg += (2-len(Achieve))*"\n"
msg += (
f"金币 {'{:,}'.format(gold)}\n"
f"股票 {'{:,}'.format(round(value,2))}\n"
"签到 [nowrap]\n"
f"[color][{is_sign[1]}]{is_sign[0]}\n"
"补贴 还剩 [nowrap]\n"
f"[color][{security[1]}]{security[0]}[nowrap]\n 次"
)
# 加载资产分析
dist = []
for x in user.group_accounts:
account = user.group_accounts[x]
if not (group_name := group_data[x].company.company_name):
group_name = f"({str(x)[-4:]})"
dist.append([account.gold + account.value, group_name])
dist = [x for x in dist if x[0] > 0]
info.append(my_info_account(msg,dist))
# 加载股票信息
msg = ""
for stock in group_account.stocks:
company_name = group_data[stock].company.company_name
if i := group_account.stocks[stock]:
msg += f"{company_name}[nowrap]\n[right][color][green]{i}\n"
if msg:
info.append(linecard(msg, width = 880,endline = "股票信息"))
return MessageSegment.image(info_splicing(info,Manager.BG_path(event.user_id)))
async def my_exchange(event:MessageEvent) -> Message:
"""
我的交易信息
"""
user,group_account = Manager.locate_user(event)
if not group_account:
return "私聊未关联账户,请发送【关联账户】关联群内账户。"
info = []
# 加载股票信息
for company_id,stock in group_account.stocks.items():
msg = ""
if stock:
account_name = None
company = group_data[company_id].company
msg += f"[pixel][20]公司 {company.company_name}\n[pixel][20]结算 [nowrap]\n[color][green]{'{:,}'.format(round(company.float_gold/company.issuance,2))}[nowrap]\n[pixel][400]数量 [nowrap]\n[color][green]{stock}\n"
if exchange := company.exchange.get(user.user_id):
if exchange.group_id == group_account.group_id:
account_name = "本群"
else:
account_name = group_data[exchange.group_id].company.company_name
account_name = account_name if account_name else f"({str(exchange.group_id)[4]}...)"
msg += f"[pixel][20]报价 [nowrap]\n[color][green]{exchange.quote}[nowrap]\n[pixel][400]发布 [nowrap]\n[color][green]{exchange.n}\n"
info.append(linecard(msg, width = 880,endline = f"报价账户:{account_name}" if account_name else "无报价"))
if info:
info.insert(0,await my_exchange_head(group_account))
return MessageSegment.image(info_splicing(info,Manager.BG_path(event.user_id)))
else:
return "你的股票信息为空。"
def my_props(event:MessageEvent) -> Message:
"""
我的道具
"""
user,group_account = Manager.locate_user(event)
if not group_account:
return "私聊未关联账户,请发送【关联账户】关联群内账户。"
props = {}
props.update(user.props)
props.update(group_account.props)
props = sorted(props.items(), key = lambda x:int(x[0]), reverse = True)
info = []
for seg in props:
if (n := seg[1]) < 1:
continue
prop_code = seg[0]
quant = "天" if prop_code[2] == "0" else "个"
prop = props_library.get(prop_code,{"name": prop_code, "color": "black","rare": 1,"intro": "未知","des": "未知"})
color = prop['color']
name = prop['name']
rare = prop['rare']
msg = (
f"[font_big][color][{color}]【{name}】[nowrap][passport]\n[right]{n}{quant}\n"
"----\n"
+ prop['intro'] + f"\n[right]{prop['des']}\n"
)
info.append(
linecard(msg,
width = 880,
padding=(0,20),
endline = "特殊道具" if rare == 0 else rare*'☆',
bg_color = (255,255,255,153),
autowrap = True
))
if info:
return MessageSegment.image(info_splicing(info,Manager.BG_path(event.user_id),spacing = 5))
else:
return "您的仓库空空如也。"
async def info_profile(user_id:int) -> list:
"""
总览资料卡
"""
user = user_data[user_id]
info = []
# 加载全局信息
nickname = user.nickname
info.append(await my_info_head(user,nickname))
msg = ""
# 加载资产分析
dist = []
for x in user.group_accounts:
account = user.group_accounts[x]
if not (group_name := group_data[x].company.company_name):
group_name = f"({str(x)[-4:]})"
dist.append([account.gold + account.value, group_name])
dist = [x for x in dist if x[0] > 0]
if dist:
info.append(my_info_account(msg,dist))
return info
def format_ranktitle(x,title:str = "金币"):
"""
根据排行榜将数据格式化
"""
if title == "金币" or title == "总金币":
return '{:,}'.format(x)
elif title == "总资产" or title == "资产" or title == "财富":
return '{:,}'.format(round(x,2))
elif title == "胜率":
return f"{round(x*100,2)}%"
else:
return x
async def draw_rank(ranklist:list, title:str = "金币", top:int = 20, group_id = None) -> list:
"""
排名信息
"""
info = []
i = 1
first = ranklist[0][1]
if group_id:
nicname = lambda user:user.group_accounts[group_id].nickname
else:
nicname = lambda user:user.nickname
for x in ranklist[:top]:
user_id = x[0]
info.append(await bar_chart(user_id,f"{i}.{nicname(user_data[user_id])}:{format_ranktitle(x[1],title)}\n", x[1]/first))
i += 1
return info
async def group_rank(event:MessageEvent, title:str = "金币") -> str:
"""
生成群内排行榜
"""
user,group_account = Manager.locate_user(event)
if not group_account:
return "私聊未关联账户,请发送【关联账户】关联群内账户。"
user_id = user.user_id
group_id = group_account.group_id
if not (ranklist := Manager.group_ranklist(group_id, title)):
return "无数据。"
info = await draw_rank(ranklist, title, 20, group_id = group_id)
return MessageSegment.image(info_splicing(info,Manager.BG_path(user_id), spacing = 5))
async def All_rank(event:MessageEvent, title:str = "金币", top:int = 10) -> list:
"""
生成总排行榜
"""
if not (ranklist := Manager.All_ranklist(title)):
return None
info = await draw_rank(ranklist, title, 20)
return MessageSegment.image(info_splicing(info,Manager.BG_path(event.user_id), spacing = 5))
def transfer_fee(amount: int, limit: int) -> int:
levels = [[0.02, max_bet_gold * 10],[0.2, max_bet_gold * 40],[0.4, max_bet_gold * 50],[0.6, float('inf')]]
fee = 0
for n,(level_tax, level_step) in enumerate(levels):
if limit >= level_step:
limit -= level_step
continue
levels[n][1] -= limit
break
for level_tax, level_step in levels[n:]:
if amount <= level_step:
fee += amount * level_tax
break
fee += level_step * level_tax
amount -= level_step
return int(fee)
def intergroup_transfer_gold(event:MessageEvent, gold:int, company_name:str):
"""
跨群转移金币到自己的账户
"""
user,group_account = Manager.locate_user(event)
if not group_account:
return "私聊未关联账户,请发送【关联账户】关联群内账户。"
if gold > group_account.gold:
return f"你没有足够的金币,无法完成结算。\n——你还有{group_account.gold}枚金币。"
if company_name in company_index:
company_id = company_index[company_name]
else:
return f"没有 {company_name} 的注册信息"
if company_id in user.group_accounts:
target_group_account = user.group_accounts[company_id]
else:
return f"你在 {company_name} 没有创建账户"
group_account.gold -= gold
ExRate = group_data[group_account.group_id].company.level/group_data[company_id].company.level
ExRate = min(ExRate,10) if ExRate else 1.0
tgold = int(ExRate * gold + 0.5)
fee = transfer_fee(tgold, user.transfer_limit)
target_group_account.gold += tgold - fee
user.transfer_limit += tgold
user.gold -= fee
return f"向 {company_name} 转移{gold}金币。\n汇率:{round(ExRate,2)} 手续费:{fee}({round(100*fee/tgold,2)}%)\n实际到账金额:{tgold - fee}"
def freeze(target:UserDict):
target_id = target.user_id
gold = target.gold
value = 0.0
group_accounts = target.group_accounts
for group_id,group_account in group_accounts.items():
value += group_account.value
group = group_data[group_id]
for company_id in group_account.stocks:
company = group_data[company_id].company
company.Buyback(group_account)
group.namelist.remove(target_id)
target.gold = 0
target.group_accounts = {}
x = gold + value
if x > 500 * max_bet_gold:
count = 500
elif x > 50 * max_bet_gold:
count = int (x / max_bet_gold)
else:
count = int (x / max_bet_gold) + 1
target.props.setdefault("03101",0)
target.props["03101"] += count
data.save()
return f"【冻结】清算完成,总价值为 {round(x,2)}(金币 {gold} 股票 {round(value,2)})"
async def delist():
"""
清理无效账户
"""
mapping = {}
bot_list = Manager.driver.bots.values()
for bot in bot_list:
group_list = await bot.get_group_list(no_cache = True)
for group in group_list:
mapping[group["group_id"]] = bot
if not mapping:
return "群组获取失败"
# 存在的群
groups = set(mapping.keys())
# 注册过的群
login_groups= set(group_data.keys())
# 已注册但不存在的群
delist_group = login_groups - groups
log = ""
# 处理与不存在的群相关的群账户
for user_id in user_data:
user = user_data[user_id]
group_accounts = user.group_accounts
accountset = set(group_accounts.keys())
delist_group_accounts = accountset & delist_group
for group_id in accountset:
group_account = group_accounts[group_id]
if group_id in delist_group_accounts:
# 删除不存在的群账户
log += f'删除群账户:{user_id} - {group_id}\n'
del group_account
else:
# 删除不存在的股票
group_account.stocks = {stock:count for stock, count in group_account.stocks.items() if stock not in delist_group_accounts}
# 删除不存在的群
for group_id in delist_group:
log += f'删除群:{group_id}\n'
del group_data[group_id]
# 已注册且存在的群
normal_groups = login_groups & groups
for group_id in normal_groups:
users = await mapping[group_id].get_group_member_list(group_id = group_id, no_cache = True)
if users:
users = set(x["user_id"] for x in users)
else:
continue
# 删除已注册但不存在的用户
namelist = group_data[group_id].namelist
delist_users = namelist - users
for user_id in delist_users:
log += f'删除群账户:{user_id} - {group_id}\n'
del user_data[user_id].group_accounts[group_id]
namelist.discard(user_id)
Manager.update_company_index()
# 保存数据
data.save()
return log[:-1] if log else "没有要清理的数据!"