哔哩哔哩漫画签到与福利券兑换
前置说明
既然都搭建博客了,感觉可以在这里把写过的一些杂七杂八的简单代码放上来,都比较简单就不打算在Github上给每一个都单独建一个仓库了。
这里先更新一个“哔哩哔哩漫画签到与福利券兑换”,未来有空还可以再写一些。
动机
哔哩哔哩漫画每天签到能领取一定的积分,积分可以兑换成可购买漫画的福利券。
不过最优惠的100积分只在每天0点发放一定数量。
为此我基于NoneBot2和bilibili-api,并参考SocialSisterYi/bilibili-API-collect写了两个插件,分别用于每天签到领积分和每天0点抢购福利券,目前用下来稳定性极高。
NoneBot2用起来还是比较简单的,我也在上面开发了一些插件,有空也可以分享出来。
实现的效果如下:
似乎成功率太高了完全不需要推送
以防万一还是推送一下吧
代码
get_sessdata.py
用于获取并刷新登录信息
from bilibili_api import Credential, sync
import json
import logging
def get_sessdata(cookie_path: str) -> str:
with open(cookie_path, 'r') as f:
cookie = json.load(f)
credential = Credential(**cookie)
if sync(credential.check_refresh()):
logging.info('Cookie needs to be refreshed.')
sync(credential.refresh())
cookie = credential.get_cookies()
cookie = {k.lower(): v for k, v in cookie.items()}
with open(cookie_path, 'w') as f:
json.dump(cookie, f, indent=4)
logging.info(f'Cookie refreshed.\n\t{cookie}')
return cookie['sessdata']
其中cookie_path是一个json文件,包含以下5个key:
- sessdata
- buvid3
- bili_jct
- ac_time_value
- dedeuserid
可以使用浏览器InPrivate模式登录后在cookie中获取,详见bilibili-api文档
checkin.py
用于简单的NoneBot2插件,可以在给QQ机器人发送/漫画签到
手动执行,也会在每天指定的时间定时运行并将结果反馈。
我这里没有开发成通用的形式,让所有的QQ机器人好友使用,因此仅支持发送给QQ机器人管理员。
当然你要用的话也可以考虑改为直接使用cron
定时运行,再想别的办法进行消息推送或者不推送。
from nonebot import get_driver, on_command, get_bot, require
from nonebot.log import logger
from nonebot.adapters.onebot.v11 import Bot, Event
from nonebot.typing import T_State
from nonebot.permission import SUPERUSER
require("nonebot_plugin_apscheduler")
from nonebot_plugin_apscheduler import scheduler
import requests
import time
import traceback
from .get_sessdata import get_sessdata
checkin = on_command("漫画签到", priority=5, permission=SUPERUSER) # 指定仅允许管理员使用
async def checkin_func():
url = "https://manga.bilibili.com/twirp/activity.v1.Activity/ClockIn?platform=android"
headers = {
'user-agent': "Dalvik/2.1.0 (Linux; U; Android 14; 23046RP50C Build/UKQ1.230804.001) 6.8.5 os/android model/23046RP50C mobi_app/android_comic build/36608060 channel/xiaomi innerVer/36608060 osVer/14 network/2",
'accept': 'application/json, text/plain, */*',
'accept-encoding': 'gzip, deflate, br',
}
# 此处隐去我的cookie_path
headers.update({'cookie': f'SESSDATA={get_sessdata(config.cookie_path)}'})
logger.info(f'headers: {headers}')
try:
resp = requests.post(url, headers=headers)
resp.raise_for_status()
retdata = resp.json()
if retdata['code'] == 0:
msg = 'Bilibili漫画签到成功'
logger.success(msg)
else:
msg = 'Bilibili漫画签到失败,' + retdata['msg']
logger.error(msg)
except Exception as e:
msg = 'Bilibili漫画签到失败,' + str(e)
logger.error(traceback.format_exc())
return msg
@checkin.handle()
async def handle_checkin(bot: Bot, event: Event, state: T_State):
msg = await checkin_func()
await checkin.finish(msg)
# 此处设置每天签到的时间
@scheduler.scheduled_job('cron', hour=14, minute=57, id='manga_checkin')
async def handle_checkin_scheduler():
msg = await checkin_func()
bot = get_bot()
# 定时消息仅发送给管理员
await bot.send_private_msg(user_id=next(iter(bot.config.superusers)), message=msg)
exchange.py
用于在每天0点兑换的NoneBot2插件。实际上在23点59分开始运行并进行预处理,之后检测系统时间,于0点准时兑换,因此请确保系统时间准确,定时同步时间:
timedatectl set-ntp true
NoneBot2逻辑和上面的签到插件类似,查看exchange_func
函数了解兑换逻辑即可,代码如下:
from nonebot import get_driver, on_command, get_bot, require
from nonebot.log import logger
from nonebot.adapters.onebot.v11 import Bot, Event
from nonebot.typing import T_State
from nonebot.permission import SUPERUSER
require("nonebot_plugin_apscheduler")
from nonebot_plugin_apscheduler import scheduler
import requests
import time
import traceback
from .get_sessdata import get_sessdata
exchange = on_command("漫画兑换", priority=5, permission=SUPERUSER) # 指定仅允许管理员使用
async def exchange_func():
point_of_target = 100 # 仅兑换100积分的福利券
type_of_target = 7 # 仅兑换漫画福利券,避免其他类型的商品
get_point_url = 'https://manga.bilibili.com/twirp/pointshop.v1.Pointshop/GetUserPoint'
list_url = 'https://manga.bilibili.com/twirp/pointshop.v1.Pointshop/ListProduct'
exchange_url = 'https://manga.bilibili.com/twirp/pointshop.v1.Pointshop/Exchange'
headers = {
'user-agent': "Dalvik/2.1.0 (Linux; U; Android 14; 23046RP50C Build/UKQ1.230804.001) 6.8.5 os/android model/23046RP50C mobi_app/android_comic build/36608060 channel/xiaomi innerVer/36608060 osVer/14 network/2",
'accept': 'application/json, text/plain, */*',
'accept-encoding': 'gzip, deflate, br',
}
# 此处隐去我的cookie_path
headers.update({'cookie': f'SESSDATA={get_sessdata(config.cookie_path)}'})
logger.info(f'headers: {headers}')
def get_point(session):
try:
resp = session.post(get_point_url)
resp.raise_for_status()
return resp.json()['data']['point']
except Exception as e:
logger.error(traceback.format_exc())
return None
def get_target_id(session):
try:
resp = session.post(list_url)
resp.raise_for_status()
if resp.json()['code'] != 0:
raise RuntimeError('Failed to get product list.')
products = resp.json()['data']
for product in products:
if product['real_cost'] == point_of_target and product['type'] == type_of_target:
return product['id']
return -1
except Exception as e:
logger.error(traceback.format_exc())
return None
def exchange(session, product_id):
try:
resp = session.post(exchange_url, json={'product_id': product_id, 'product_num': 1, 'point': point_of_target})
resp.raise_for_status()
return resp.json()
except Exception as e:
logger.error(traceback.format_exc())
return None
session = requests.Session()
session.headers.update(headers)
# 获取剩余点数,小于目标点数则退出
point, retry = None, 10
while point is None and retry > 0:
point = get_point(session)
retry -= 1
if point is None:
msg = '获取剩余点数失败'
logger.error(msg)
return msg
point = int(point)
logger.info(f'当前剩余点数: {point}')
if point < point_of_target:
msg = f'剩余{point},点数不足'
logger.error(msg)
return msg
# 获取目标商品id
product_id, retry = None, 10
while product_id is None and retry > 0:
product_id = get_target_id(session)
retry -= 1
if product_id is None:
msg = '获取目标商品id失败'
logger.error(msg)
return msg
logger.info(f'目标商品id: {product_id}')
# 兑换
retry, success_times = 100, 0
while retry > 0:
if time.strftime("%H", time.localtime()) == '23':
while time.strftime("%H", time.localtime()) != '00':
pass # 等待到0点
resp = exchange(session, product_id)
if resp is None:
retry -= 1
continue
if resp['code'] == 2:
msg = '兑换失败,' + resp['msg']
break
if resp['code'] == 1 and resp['msg'] == '积分不足':
msg = '兑换失败,' + resp['msg']
break
if resp['code'] != 0:
msg = '兑换失败,' + resp['msg']
retry -= 1
continue
else:
success_times += 1
msg = f'兑换成功{success_times}次'
time.sleep(0.05)
if success_times > 0:
msg = f'兑换成功{success_times}次'
logger.info(msg)
return msg
@exchange.handle()
async def handle_exchange(bot: Bot, event: Event, state: T_State):
msg = await exchange_func()
await exchange.finish(msg)
@scheduler.scheduled_job('cron', hour='23', minute='59', id='manga_exchange')
async def handle_exchange_scheduler():
msg = await exchange_func()
bot = get_bot()
await bot.send_private_msg(user_id=next(iter(bot.config.superusers)), message=msg)