Skip to content

Commit

Permalink
support connect with cookie
Browse files Browse the repository at this point in the history
  • Loading branch information
4t145 committed Oct 10, 2023
1 parent d6dcfc7 commit dcf1e50
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ impl Cmd {
"DANMU_MSG" => {
// 如果这里出问题,可能是b站协议发生变更了,所以panic一下无可厚非吧
let info = val["info"].as_array().expect(PROTOCOL_ERROR);
let message = info[1].as_str().expect(PROTOCOL_ERROR).clone();
let message = info[1].as_str().expect(PROTOCOL_ERROR);
let user = info[2].as_array().expect(PROTOCOL_ERROR);
let uid = user[0].as_u64().expect(PROTOCOL_ERROR);
let name = user[1].as_str().expect(PROTOCOL_ERROR);
Expand Down
19 changes: 19 additions & 0 deletions src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,31 @@ impl std::error::Error for EventStreamError {}
// fn abort(self);
// }

#[derive(Debug, Clone)]
pub struct LoginInfo {
// SESSDATA
sessdata: String,
}

impl LoginInfo {
pub fn new(sessdata: String) -> Self {
Self { sessdata }
}
pub fn inject(&self, mut request: Request) -> Request {
let cookie = HeaderValue::from_str(&format!("SESSDATA={}", self.sessdata)).expect("invalid sessdata");
request.headers_mut().append(COOKIE, cookie);
request
}
}

#[cfg(feature = "rt_tokio")]
mod tokio_connection;
use reqwest::header::{COOKIE, HeaderValue};
#[cfg(feature = "rt_tokio")]
pub use tokio_connection::TokioConnection as Connection;

#[cfg(feature = "rt_wasm")]
mod wasm_connection;
use tokio_tungstenite::tungstenite::handshake::client::Request;
#[cfg(feature = "rt_wasm")]
pub use wasm_connection::WasmConnection as Connection;
23 changes: 18 additions & 5 deletions src/connection/tokio_connection.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::*;
use futures_util::{stream::SplitStream, SinkExt, Stream, StreamExt};
use reqwest::Url;
use std::collections::VecDeque;
// use tungstenite;
use crate::{
Expand All @@ -8,7 +9,7 @@ use crate::{
packet::{Auth, Operation, RawPacket},
};
use tokio_tungstenite as tokio_ws2;
use tokio_ws2::tungstenite as ws2;
use tokio_ws2::tungstenite::{self as ws2, client::IntoClientRequest};
type WsStream = tokio_ws2::WebSocketStream<tokio_ws2::MaybeTlsStream<tokio::net::TcpStream>>;
type WsRx = SplitStream<WsStream>;

Expand Down Expand Up @@ -63,11 +64,18 @@ impl From<ws2::Error> for WsConnectError {
WsConnectError::WsError(val)
}
}
use tokio::time::Duration;
// 30s 发送一次心跳包
const HB_RATE: Duration = Duration::from_secs(30);

impl TokioConnection {
pub async fn connect(url: String, auth: Auth) -> Result<Self, WsConnectError> {
pub async fn connect(url: Url, auth: Auth, login_info: Option<&LoginInfo>) -> Result<Self, WsConnectError> {
use ws2::Message::*;
let (mut ws_stream, _resp) = tokio_ws2::connect_async(url).await?;
let mut req = url.into_client_request()?;
if let Some(login_info) = login_info {
req = login_info.inject(req);
}
let (mut ws_stream, _resp) = tokio_ws2::connect_async(req).await?;
let authpack_bin = RawPacket::build(Operation::Auth, &auth.ser()).ser();
ws_stream.send(Binary(authpack_bin)).await?;
let resp = ws_stream.next().await.ok_or_else(|| {
Expand All @@ -87,8 +95,7 @@ impl TokioConnection {
// hb task
let hb = async move {
use tokio::time::*;
// 30s 发送一次
let mut interval = interval(Duration::from_secs(30));
let mut interval = interval(HB_RATE);
loop {
interval.tick().await;
tx.send(ws2::Message::Binary(RawPacket::heartbeat().ser()))
Expand All @@ -104,6 +111,12 @@ impl TokioConnection {
}

pub fn abort(self) {
drop(self)
}
}

impl Drop for TokioConnection {
fn drop(&mut self) {
self.hb_handle.abort();
}
}
Expand Down
13 changes: 10 additions & 3 deletions src/connector.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use reqwest::Url;
use serde::Deserialize;

use crate::{connection::*, packet::*};
Expand All @@ -9,6 +10,7 @@ pub struct Connector {
pub token: String,
pub host_index: usize,
pub host_list: Vec<Host>,
pub login_info: Option<LoginInfo>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -77,10 +79,15 @@ impl Connector {
roomid,
token,
host_list,
login_info: None,
};
Ok(connector)
}

pub fn set_login_info(&mut self, login_info: LoginInfo) {
self.login_info = Some(login_info);
}

pub fn use_host(&mut self, index: usize) -> Result<&'_ str, usize> {
if self.host_list.len() > index {
self.host_index = index;
Expand All @@ -98,7 +105,7 @@ impl Connector {
for host in &self.host_list {
let url = host.wss();
let auth = Auth::new(self.uid, self.roomid, Some(self.token.clone()));
match Connection::connect(url, auth).await {
match Connection::connect(url, auth, self.login_info.as_ref()).await {
Ok(stream) => return Ok(stream),
Err(e) => log::warn!("connect error: {:?}", e),
}
Expand Down Expand Up @@ -144,10 +151,10 @@ pub struct Host {
}

impl Host {
fn wss(&self) -> String {
fn wss(&self) -> Url {
let host = &self.host;
let port = self.wss_port;
format!("wss://{host}:{port}/sub")
Url::parse(&format!("wss://{host}:{port}/sub")).expect("invalid url")
}
}

Expand Down

0 comments on commit dcf1e50

Please sign in to comment.