Skip to content

Commit

Permalink
✨feature: implement receive progress
Browse files Browse the repository at this point in the history
  • Loading branch information
tom8zds committed Jul 14, 2024
1 parent 4370689 commit 1358ec8
Show file tree
Hide file tree
Showing 10 changed files with 235 additions and 29 deletions.
3 changes: 3 additions & 0 deletions lib/core/rust/bridge.dart
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ Stream<List<NodeDevice>> listenDevice() =>
Stream<MissionInfo?> listenMission() =>
RustLib.instance.api.crateBridgeListenMission();

Stream<BigInt> listenTaskProgress() =>
RustLib.instance.api.crateBridgeListenTaskProgress();

Future<void> clearMission() => RustLib.instance.api.crateBridgeClearMission();

Future<void> cancelPending({required String id}) =>
Expand Down
83 changes: 78 additions & 5 deletions lib/core/rust/frb_generated.dart
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class RustLib extends BaseEntrypoint<RustLibApi, RustLibApiImpl, RustLibWire> {
String get codegenVersion => '2.1.0';

@override
int get rustContentHash => -1293166539;
int get rustContentHash => 1509709400;

static const kDefaultExternalLibraryLoaderConfig =
ExternalLibraryLoaderConfig(
Expand Down Expand Up @@ -92,6 +92,8 @@ abstract class RustLibApi extends BaseApi {

Stream<bool> crateBridgeListenServerState();

Stream<BigInt> crateBridgeListenTaskProgress();

Future<void> crateBridgeRestartServer();

Future<void> crateBridgeSetup(
Expand Down Expand Up @@ -357,13 +359,40 @@ class RustLibApiImpl extends RustLibApiImplPlatform implements RustLibApi {
argNames: ["s"],
);

@override
Stream<BigInt> crateBridgeListenTaskProgress() {
final s = RustStreamSink<BigInt>();
unawaited(handler.executeNormal(NormalTask(
callFfi: (port_) {
final serializer = SseSerializer(generalizedFrbRustBinding);
sse_encode_StreamSink_usize_Sse(s, serializer);
pdeCallFfi(generalizedFrbRustBinding, serializer,
funcId: 11, port: port_);
},
codec: SseCodec(
decodeSuccessData: sse_decode_unit,
decodeErrorData: null,
),
constMeta: kCrateBridgeListenTaskProgressConstMeta,
argValues: [s],
apiImpl: this,
)));
return s.stream;
}

TaskConstMeta get kCrateBridgeListenTaskProgressConstMeta =>
const TaskConstMeta(
debugName: "listen_task_progress",
argNames: ["s"],
);

@override
Future<void> crateBridgeRestartServer() {
return handler.executeNormal(NormalTask(
callFfi: (port_) {
final serializer = SseSerializer(generalizedFrbRustBinding);
pdeCallFfi(generalizedFrbRustBinding, serializer,
funcId: 11, port: port_);
funcId: 12, port: port_);
},
codec: SseCodec(
decodeSuccessData: sse_decode_unit,
Expand All @@ -389,7 +418,7 @@ class RustLibApiImpl extends RustLibApiImplPlatform implements RustLibApi {
sse_encode_box_autoadd_node_device(device, serializer);
sse_encode_box_autoadd_core_config(config, serializer);
pdeCallFfi(generalizedFrbRustBinding, serializer,
funcId: 12, port: port_);
funcId: 13, port: port_);
},
codec: SseCodec(
decodeSuccessData: sse_decode_unit,
Expand All @@ -412,7 +441,7 @@ class RustLibApiImpl extends RustLibApiImplPlatform implements RustLibApi {
callFfi: (port_) {
final serializer = SseSerializer(generalizedFrbRustBinding);
pdeCallFfi(generalizedFrbRustBinding, serializer,
funcId: 13, port: port_);
funcId: 14, port: port_);
},
codec: SseCodec(
decodeSuccessData: sse_decode_unit,
Expand All @@ -435,7 +464,7 @@ class RustLibApiImpl extends RustLibApiImplPlatform implements RustLibApi {
callFfi: (port_) {
final serializer = SseSerializer(generalizedFrbRustBinding);
pdeCallFfi(generalizedFrbRustBinding, serializer,
funcId: 14, port: port_);
funcId: 15, port: port_);
},
codec: SseCodec(
decodeSuccessData: sse_decode_unit,
Expand Down Expand Up @@ -484,6 +513,12 @@ class RustLibApiImpl extends RustLibApiImplPlatform implements RustLibApi {
throw UnimplementedError();
}

@protected
RustStreamSink<BigInt> dco_decode_StreamSink_usize_Sse(dynamic raw) {
// Codec=Dco (DartCObject based), see doc to use other codecs
throw UnimplementedError();
}

@protected
String dco_decode_String(dynamic raw) {
// Codec=Dco (DartCObject based), see doc to use other codecs
Expand Down Expand Up @@ -699,6 +734,12 @@ class RustLibApiImpl extends RustLibApiImplPlatform implements RustLibApi {
return;
}

@protected
BigInt dco_decode_usize(dynamic raw) {
// Codec=Dco (DartCObject based), see doc to use other codecs
return dcoDecodeU64(raw);
}

@protected
AnyhowException sse_decode_AnyhowException(SseDeserializer deserializer) {
// Codec=Sse (Serialization based), see doc to use other codecs
Expand Down Expand Up @@ -735,6 +776,13 @@ class RustLibApiImpl extends RustLibApiImplPlatform implements RustLibApi {
throw UnimplementedError('Unreachable ()');
}

@protected
RustStreamSink<BigInt> sse_decode_StreamSink_usize_Sse(
SseDeserializer deserializer) {
// Codec=Sse (Serialization based), see doc to use other codecs
throw UnimplementedError('Unreachable ()');
}

@protected
String sse_decode_String(SseDeserializer deserializer) {
// Codec=Sse (Serialization based), see doc to use other codecs
Expand Down Expand Up @@ -986,6 +1034,12 @@ class RustLibApiImpl extends RustLibApiImplPlatform implements RustLibApi {
// Codec=Sse (Serialization based), see doc to use other codecs
}

@protected
BigInt sse_decode_usize(SseDeserializer deserializer) {
// Codec=Sse (Serialization based), see doc to use other codecs
return deserializer.buffer.getBigUint64();
}

@protected
void sse_encode_AnyhowException(
AnyhowException self, SseSerializer serializer) {
Expand Down Expand Up @@ -1045,6 +1099,19 @@ class RustLibApiImpl extends RustLibApiImplPlatform implements RustLibApi {
serializer);
}

@protected
void sse_encode_StreamSink_usize_Sse(
RustStreamSink<BigInt> self, SseSerializer serializer) {
// Codec=Sse (Serialization based), see doc to use other codecs
sse_encode_String(
self.setupAndSerialize(
codec: SseCodec(
decodeSuccessData: sse_decode_usize,
decodeErrorData: sse_decode_AnyhowException,
)),
serializer);
}

@protected
void sse_encode_String(String self, SseSerializer serializer) {
// Codec=Sse (Serialization based), see doc to use other codecs
Expand Down Expand Up @@ -1255,4 +1322,10 @@ class RustLibApiImpl extends RustLibApiImplPlatform implements RustLibApi {
void sse_encode_unit(void self, SseSerializer serializer) {
// Codec=Sse (Serialization based), see doc to use other codecs
}

@protected
void sse_encode_usize(BigInt self, SseSerializer serializer) {
// Codec=Sse (Serialization based), see doc to use other codecs
serializer.buffer.putBigUint64(self);
}
}
20 changes: 20 additions & 0 deletions lib/core/rust/frb_generated.io.dart
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ abstract class RustLibApiImplPlatform extends BaseApiImpl<RustLibWire> {
RustStreamSink<MissionInfo?>
dco_decode_StreamSink_opt_box_autoadd_mission_info_Sse(dynamic raw);

@protected
RustStreamSink<BigInt> dco_decode_StreamSink_usize_Sse(dynamic raw);

@protected
String dco_decode_String(dynamic raw);

Expand Down Expand Up @@ -112,6 +115,9 @@ abstract class RustLibApiImplPlatform extends BaseApiImpl<RustLibWire> {
@protected
void dco_decode_unit(dynamic raw);

@protected
BigInt dco_decode_usize(dynamic raw);

@protected
AnyhowException sse_decode_AnyhowException(SseDeserializer deserializer);

Expand All @@ -132,6 +138,10 @@ abstract class RustLibApiImplPlatform extends BaseApiImpl<RustLibWire> {
sse_decode_StreamSink_opt_box_autoadd_mission_info_Sse(
SseDeserializer deserializer);

@protected
RustStreamSink<BigInt> sse_decode_StreamSink_usize_Sse(
SseDeserializer deserializer);

@protected
String sse_decode_String(SseDeserializer deserializer);

Expand Down Expand Up @@ -206,6 +216,9 @@ abstract class RustLibApiImplPlatform extends BaseApiImpl<RustLibWire> {
@protected
void sse_decode_unit(SseDeserializer deserializer);

@protected
BigInt sse_decode_usize(SseDeserializer deserializer);

@protected
void sse_encode_AnyhowException(
AnyhowException self, SseSerializer serializer);
Expand All @@ -226,6 +239,10 @@ abstract class RustLibApiImplPlatform extends BaseApiImpl<RustLibWire> {
void sse_encode_StreamSink_opt_box_autoadd_mission_info_Sse(
RustStreamSink<MissionInfo?> self, SseSerializer serializer);

@protected
void sse_encode_StreamSink_usize_Sse(
RustStreamSink<BigInt> self, SseSerializer serializer);

@protected
void sse_encode_String(String self, SseSerializer serializer);

Expand Down Expand Up @@ -306,6 +323,9 @@ abstract class RustLibApiImplPlatform extends BaseApiImpl<RustLibWire> {

@protected
void sse_encode_unit(void self, SseSerializer serializer);

@protected
void sse_encode_usize(BigInt self, SseSerializer serializer);
}

// Section: wire_class
Expand Down
2 changes: 1 addition & 1 deletion lib/i18n/strings.g.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
/// Locales: 2
/// Strings: 60 (30 per locale)
///
/// Built on 2024-07-07 at 14:19 UTC
/// Built on 2024-07-14 at 06:18 UTC
// coverage:ignore-file
// ignore_for_file: type=lint
Expand Down
5 changes: 4 additions & 1 deletion lib/view/pages/mission_page.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import '../../core/rust/bridge.dart';
import '../../i18n/strings.g.dart';
import '../widget/common_widget.dart';
import '../widget/device_widget.dart';
import '../widget/mission_widget.dart';

class IdlePage extends StatelessWidget {
@override
Expand Down Expand Up @@ -119,7 +120,9 @@ class _TransferPageState extends State<TransferPage> {
),
),
if (file.state == const FileState.transfer())
LinearProgressIndicator(value: 0.3),
TaskProgress(
total: file.info.size,
),
],
),
),
Expand Down
23 changes: 23 additions & 0 deletions lib/view/widget/mission_widget.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,28 @@
import 'package:flutter/material.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:localsend_rs/core/rust/bridge.dart';

class TaskProgress extends StatelessWidget {
final progressStream = listenTaskProgress();
final int total;

TaskProgress({super.key, required this.total});

@override
Widget build(BuildContext context) {
return StreamBuilder(
stream: progressStream,
builder: (context, snapshot) {
if (snapshot.hasData) {
print("${snapshot.data} / ${total}");
return LinearProgressIndicator(
value: (snapshot.data?.toDouble() ?? 0) / total,
);
}
return Container();
});
}
}

class MissionWidget extends ConsumerWidget {
const MissionWidget({super.key});
Expand Down
19 changes: 4 additions & 15 deletions rust/src/actor/mission/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ enum Message {
respond_to: oneshot::Sender<Result<(), MissionState>>,
},
ListenTask {
token: String,
respond_to: oneshot::Sender<Result<watch::Receiver<usize>, String>>,
},
StartTask {
Expand Down Expand Up @@ -225,16 +224,12 @@ impl Actor {

let _ = respond_to.send(());
}
Message::ListenTask { token, respond_to } => match &self.store.mission {
Message::ListenTask { respond_to } => match &self.store.mission {
Some(_) => {
let task = self.store.task.clone();
match task {
Some(task) => {
if task.token == token {
let _ = respond_to.send(Ok(task.progress));
return;
}
let _ = respond_to.send(Err("task token not match".to_string()));
let _ = respond_to.send(Ok(task.progress));
}
None => {
let _ = respond_to.send(Err("task not found".to_string()));
Expand Down Expand Up @@ -283,15 +278,9 @@ impl Handle {
recv.await.expect("Actor task has been killed")
}

pub async fn listen_task_progress(
&self,
token: String,
) -> Result<watch::Receiver<usize>, String> {
pub async fn listen_task_progress(&self) -> Result<watch::Receiver<usize>, String> {
let (send, recv) = oneshot::channel();
let msg = Message::ListenTask {
token,
respond_to: send,
};
let msg = Message::ListenTask { respond_to: send };

let _ = self.sender.send(msg).await;

Expand Down
14 changes: 14 additions & 0 deletions rust/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,20 @@ pub async fn listen_mission(s: StreamSink<Option<MissionInfo>>) {
}
}

pub async fn listen_task_progress(s: StreamSink<usize>) {
let mut rx = _get_core()
.mission
.transfer
.listen_task_progress()
.await
.unwrap();
loop {
let _ = rx.changed().await;
let data = rx.borrow().clone();
let _ = s.add(data);
}
}

pub async fn clear_mission() {
MISSION_NOTIFY.clear().await;
}
Expand Down
Loading

0 comments on commit 1358ec8

Please sign in to comment.