Skip to content

Commit

Permalink
use bilibili client
Browse files Browse the repository at this point in the history
  • Loading branch information
4t145 committed Oct 23, 2023
1 parent c1544ac commit 177a845
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 118 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ serde-wasm-bindgen = { version = "0.5", optional = true }
log = "0.4"
reqwest = { version = "0.11", features = ["json"], optional = true }
byteorder = { version = "1.4.3", optional = true }
http = "0.2.9"

[dependencies.bincode]
version = "1.3.3"
Expand Down Expand Up @@ -61,6 +62,9 @@ optional = true
[dependencies.futures]
version = "0.3"

[dependencies.bilibili-client]
git = "https://github.com/4t145/bilibili-client.git"
rev = "32edfa6"

[features]
default = ["event"]
Expand Down
2 changes: 1 addition & 1 deletion examples/using-tokio/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async fn tokio_main() {
let roomid = std::env::var("room_id")
.map(|s| str::parse::<u64>(&s).expect("invalid room id"))
.unwrap_or(read_roomid());
let connector = Connector::init(roomid, None).await.unwrap();
let connector = Connector::init(roomid, Default::default()).await.unwrap();
let mut stream = connector.connect().await.unwrap();
while let Some(maybe_evt) = stream.next().await {
match maybe_evt {
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[toolchain]
channel="stable"
channel="nightly"
29 changes: 0 additions & 29 deletions src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,35 +45,6 @@ impl std::fmt::Display for EventStreamError {
}

impl std::error::Error for EventStreamError {}
// #[async_trait]
// pub trait Connector: Stream<Item = Result<Event, EventStreamError>> + StreamExt
// where
// Self: Sized,
// {
// /// 连接
// // async fn connect(url: String, auth: Auth) -> Result<Self, WsConnectError>;
// /// abort所有任务
// fn abort(self);
// }

#[derive(Debug, Clone)]
pub struct LoginInfo {
// SESSDATA
pub sessdata: String,
// uid
pub uid: u64,
}

impl LoginInfo {
pub fn new(uid: u64, sessdata: String) -> Self {
Self { sessdata, uid }
}
pub fn inject(&self, mut request: Request) -> Request {
let cookie = HeaderValue::from_str(&format!("SESSDATA={}", self.sessdata)).expect("invalid sessdata");
request.headers_mut().append(COOKIE, cookie);
request
}
}

#[cfg(feature = "rt_tokio")]
mod tokio_connection;
Expand Down
30 changes: 24 additions & 6 deletions src/connection/tokio_connection.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use super::*;
use bilibili_client::reqwest_client::LoginInfo;
use futures_util::{stream::SplitStream, SinkExt, Stream, StreamExt};
use reqwest::Url;
use reqwest::{Method, Url};
use std::collections::VecDeque;
// use tungstenite;
use crate::{
connection::WsConnectError,
event::Event,
packet::{Auth, Operation, RawPacket},
Connector,
};
use tokio_tungstenite as tokio_ws2;
use tokio_ws2::tungstenite::{self as ws2, client::IntoClientRequest};
Expand Down Expand Up @@ -69,12 +71,28 @@ use tokio::time::Duration;
const HB_RATE: Duration = Duration::from_secs(30);

impl TokioConnection {
pub async fn connect(url: Url, auth: Auth, login_info: Option<&LoginInfo>) -> Result<Self, WsConnectError> {
pub async fn connect(
url: Url,
auth: Auth,
connector: &Connector,
) -> Result<Self, WsConnectError> {
use ws2::Message::*;
let mut req = url.into_client_request()?;
if let Some(login_info) = login_info {
req = login_info.inject(req);
}
let reqwest_req = connector
.client
.inner()
.request(Method::GET, url)
.build()
.expect("shouldn't build fail");
let mut http_req_builder = http::Request::builder();
http_req_builder
.headers_mut()
.map(|h| *h = reqwest_req.headers().clone())
.expect("should have headers");
let req = http_req_builder
.uri(reqwest_req.url().as_str())
.method("GET")
.body(())
.expect("shouldn't fail to build ssh req body");
let (mut ws_stream, _resp) = tokio_ws2::connect_async(req).await?;
let authpack_bin = RawPacket::build(Operation::Auth, &auth.ser()).ser();
ws_stream.send(Binary(authpack_bin)).await?;
Expand Down
97 changes: 16 additions & 81 deletions src/connector.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use reqwest::Url;
use serde::Deserialize;

use bilibili_client::{reqwest_client::LoginInfo, api::live::{danmu_info::RoomPlayInfo, room_play_info::{DanmuInfoData, Host}}};
use crate::{connection::*, packet::*};

#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct Connector {
pub roomid: u64,
pub uid: u64,
pub token: String,
pub host_index: usize,
pub host_list: Vec<Host>,
pub login_info: Option<LoginInfo>,
pub login_info: LoginInfo,
pub client: bilibili_client::reqwest_client::Client,
}

#[derive(Debug)]
Expand Down Expand Up @@ -43,38 +42,17 @@ impl std::fmt::Display for InitError {
}

impl Connector {
pub async fn init(mut roomid: u64, login_info: Option<LoginInfo>) -> Result<Self, InitError> {
let client = reqwest::Client::new();
let room_info_url = format!(
"https://api.live.bilibili.com/xlive/web-room/v2/index/getRoomPlayInfo?room_id={}",
roomid
);
let RoomPlayInfoData {
room_id: real_room_id,
uid,
} = client
.get(room_info_url)
.send()
.await?
.json::<RoomPlayInfo>()
.await?
.data
.ok_or(InitError::ParseError("Fail to get room info".to_string()))?;
roomid = real_room_id;
let url = format!(
"https://api.live.bilibili.com/xlive/web-room/v1/index/getDanmuInfo?id={}&type=0",
roomid
);
let DanmuInfoData { token, host_list } = client
.get(url)
.send()
.await?
.json::<DanmuInfo>()
.await?
.data
.ok_or(InitError::ParseError("Fail to get danmu info".to_string()))?;
let uid = login_info.as_ref().map(|x| x.uid).unwrap_or(uid);
pub async fn init(mut roomid: u64, login_info: LoginInfo) -> bilibili_client::reqwest_client::ClientResult<Self> {
let client = bilibili_client::reqwest_client::Client::default();
client.set_login_info(&login_info);
let RoomPlayInfo { room_id, uid } = client.get_room_play_info(roomid).await?;
roomid = room_id;
let DanmuInfoData {
token,
host_list,
} = client.get_danmu_info(room_id).await?;
let connector = Connector {
client,
uid,
host_index: 0,
roomid,
Expand All @@ -86,7 +64,7 @@ impl Connector {
}

pub fn set_login_info(&mut self, login_info: LoginInfo) {
self.login_info = Some(login_info);
self.login_info = login_info;
}

pub fn use_host(&mut self, index: usize) -> Result<&'_ str, usize> {
Expand All @@ -106,7 +84,7 @@ impl Connector {
for host in &self.host_list {
let url = host.wss();
let auth = Auth::new(self.uid, self.roomid, Some(self.token.clone()));
match Connection::connect(url, auth, self.login_info.as_ref()).await {
match Connection::connect(url, auth, self).await {
Ok(stream) => return Ok(stream),
Err(e) => log::warn!("connect error: {:?}", e),
}
Expand All @@ -116,49 +94,6 @@ impl Connector {
}
}

#[derive(Debug, Deserialize)]
struct RoomPlayInfoData {
room_id: u64,
uid: u64,
}

///
/// api url:
/// https://api.live.bilibili.com/xlive/web-room/v2/index/getRoomPlayInfo?room_id=510
#[derive(Debug, Deserialize)]
struct RoomPlayInfo {
data: Option<RoomPlayInfoData>,
}

#[derive(Debug, Deserialize)]
struct DanmuInfo {
// code: i32,
// message: String,
// ttl: i32,
data: Option<DanmuInfoData>,
}
#[derive(Debug, Deserialize)]

struct DanmuInfoData {
// max_delay: i32,
token: String,
host_list: Vec<Host>,
}

#[derive(Debug, Deserialize, Clone)]
pub struct Host {
pub host: String,
pub wss_port: u16,
}

impl Host {
fn wss(&self) -> Url {
let host = &self.host;
let port = self.wss_port;
Url::parse(&format!("wss://{host}:{port}/sub")).expect("invalid url")
}
}

#[derive(Debug)]
pub enum ConnectError {
HostListIsEmpty,
Expand Down

0 comments on commit 177a845

Please sign in to comment.