From 2ecd2d82fbbb56fa795bd41a03f80d02e1d071ed Mon Sep 17 00:00:00 2001 From: Lewis Westbury Date: Tue, 6 Dec 2022 16:11:51 +0000 Subject: [PATCH 1/4] Suggested modifications to increase streaming robustness --- Mastonet/TimelineHttpStreaming.cs | 141 +++++++++++++++---------- Mastonet/TimelineStreaming.cs | 2 +- Mastonet/TimelineWebSocketStreaming.cs | 51 ++++++--- 3 files changed, 125 insertions(+), 69 deletions(-) diff --git a/Mastonet/TimelineHttpStreaming.cs b/Mastonet/TimelineHttpStreaming.cs index 8c41c6e..62417e4 100644 --- a/Mastonet/TimelineHttpStreaming.cs +++ b/Mastonet/TimelineHttpStreaming.cs @@ -24,74 +24,107 @@ public TimelineHttpStreaming(StreamingType type, string? param, string instance, this.instance = instance; } - public override async Task Start() + public override async Task Start(TimeSpan? timeout = null, bool restart = true) { - string url = "https://" + instance; - switch (streamingType) + do { - case StreamingType.User: - url += "/api/v1/streaming/user"; - break; - case StreamingType.Public: - url += "/api/v1/streaming/public"; - break; - case StreamingType.PublicLocal: - url += "/api/v1/streaming/public/local"; - break; - case StreamingType.Hashtag: - url += "/api/v1/streaming/hashtag?tag=" + param; - break; - case StreamingType.HashtagLocal: - url += "/api/v1/streaming/hashtag/local?tag=" + param; - break; - case StreamingType.List: - url += "/api/v1/streaming/list?list=" + param; - break; - case StreamingType.Direct: - url += "/api/v1/streaming/direct"; - break; - default: - throw new NotImplementedException(); - } + string url = "https://" + instance; + switch (streamingType) + { + case StreamingType.User: + url += "/api/v1/streaming/user"; + break; + case StreamingType.Public: + url += "/api/v1/streaming/public"; + break; + case StreamingType.PublicLocal: + url += "/api/v1/streaming/public/local"; + break; + case StreamingType.Hashtag: + url += "/api/v1/streaming/hashtag?tag=" + param; + break; + case StreamingType.HashtagLocal: + url += "/api/v1/streaming/hashtag/local?tag=" + param; + break; + case StreamingType.List: + url += "/api/v1/streaming/list?list=" + param; + break; + case StreamingType.Direct: + url += "/api/v1/streaming/direct"; + break; + default: + throw new NotImplementedException(); + } - using (var request = new HttpRequestMessage(HttpMethod.Get, url)) - using (cts = new CancellationTokenSource()) - { - request.Headers.Add("Authorization", "Bearer " + accessToken); - using (var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cts.Token)) + try { - var stream = await response.Content.ReadAsStreamAsync(); - using (var reader = new StreamReader(stream)) + using (var request = new HttpRequestMessage(HttpMethod.Get, url)) + using (cts = new CancellationTokenSource()) { - string? eventName = null; - string? data = null; - - while (true) + request.Headers.Add("Authorization", "Bearer " + accessToken); + using (var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cts.Token)) { - var line = await reader.ReadLineAsync(); - - if (string.IsNullOrEmpty(line) || line.StartsWith(":")) + var stream = await response.Content.ReadAsStreamAsync(); + using (var reader = new StreamReader(stream)) { - eventName = data = null; - continue; - } + string? eventName = null; + string? data = null; - if (line.StartsWith("event: ")) - { - eventName = line.Substring("event: ".Length).Trim(); - } - else if (line.StartsWith("data: ")) - { - data = line.Substring("data: ".Length); - if (eventName != null) + DateTime lastReceivedValidLine = DateTime.Now; + while (true) { - SendEvent(eventName, data); + var line = await reader.ReadLineAsync(); + + if (string.IsNullOrEmpty(line)) + { + // reader returned without a line because of BaseStream.ReadTimeout + var timedOut = timeout != null && lastReceivedValidLine.Add(timeout.Value) < DateTime.Now; + if (reader.EndOfStream || timedOut) + { + // it has been too long since a valid line + var timeoutDuration = DateTime.Now.Subtract(lastReceivedValidLine); + throw new TimeoutException($"TimelineHttpStreaming timed out after: {timeoutDuration.ToString()}"); + } + else + { + // nothing to do here, we haven't timed out yet + eventName = data = null; + continue; + } + } + + if (line.StartsWith(":")) + { + lastReceivedValidLine = DateTime.Now; + eventName = data = null; + continue; + } + + if (line.StartsWith("event: ")) + { + lastReceivedValidLine = DateTime.Now; + eventName = line.Substring("event: ".Length).Trim(); + } + else if (line.StartsWith("data: ")) + { + lastReceivedValidLine = DateTime.Now; + data = line.Substring("data: ".Length); + if (eventName != null) + { + SendEvent(eventName, data); + } + } } } } } } - } + catch (TimeoutException) + { + if (!restart) + throw; + } + } while (restart); } public override void Stop() diff --git a/Mastonet/TimelineStreaming.cs b/Mastonet/TimelineStreaming.cs index d0ce382..1040399 100644 --- a/Mastonet/TimelineStreaming.cs +++ b/Mastonet/TimelineStreaming.cs @@ -24,7 +24,7 @@ protected TimelineStreaming(StreamingType type, string? param, string? accessTok this.accessToken = accessToken; } - public abstract Task Start(); + public abstract Task Start(TimeSpan? timeout = null, bool restart = true); public abstract void Stop(); protected void SendEvent(string eventName, string data) diff --git a/Mastonet/TimelineWebSocketStreaming.cs b/Mastonet/TimelineWebSocketStreaming.cs index de5d049..0b4efdc 100644 --- a/Mastonet/TimelineWebSocketStreaming.cs +++ b/Mastonet/TimelineWebSocketStreaming.cs @@ -25,7 +25,7 @@ public TimelineWebSocketStreaming(StreamingType type, string? param, string inst this.instanceGetter = instanceGetter; } - public override async Task Start() + public override async Task Start(TimeSpan? timeout = null, bool restart = true) { var instance = await instanceGetter; var url = instance?.Urls?.StreamingAPI; @@ -66,31 +66,54 @@ public override async Task Start() throw new NotImplementedException(); } - socket = new ClientWebSocket(); - await socket.ConnectAsync(new Uri(url), CancellationToken.None); byte[] buffer = new byte[receiveChunkSize]; MemoryStream ms = new MemoryStream(); - while (socket != null) + var lastValidMessage = DateTime.Now; + var timedOut = false; + do { - var result = await socket.ReceiveAsync(new ArraySegment(buffer), CancellationToken.None); + try + { + if (socket == null || socket.State != WebSocketState.Open || socket.CloseStatus != WebSocketCloseStatus.Empty) + { + if (socket != null) { socket.Dispose(); } + socket = new ClientWebSocket(); + await socket.ConnectAsync(new Uri(url), CancellationToken.None); + } - ms.Write(buffer, 0, result.Count); + var result = await socket.ReceiveAsync(new ArraySegment(buffer), CancellationToken.None); - if (result.EndOfMessage) - { - var messageStr = Encoding.UTF8.GetString(ms.ToArray()); + ms.Write(buffer, 0, result.Count); - var message = JsonConvert.DeserializeObject(messageStr); - if (message != null) + if (result.EndOfMessage) { - SendEvent(message.Event, message.Payload); + var messageStr = Encoding.UTF8.GetString(ms.ToArray()); + + var message = JsonConvert.DeserializeObject(messageStr); + if (message != null) + { + lastValidMessage = DateTime.Now; + SendEvent(message.Event, message.Payload); + } + ms.Dispose(); + ms = new MemoryStream(); } - ms.Dispose(); - ms = new MemoryStream(); + timedOut = timeout != null && lastValidMessage.Add(timeout.Value) < DateTime.Now; + if (timedOut) + { + var timeoutDuration = DateTime.Now.Subtract(lastValidMessage); + throw new TimeoutException($"TimelineWebSocketStreaming timed out after: {timeoutDuration.ToString()}"); + } + } catch (TimeoutException) + { + if (!restart) + throw; } } + while (restart); + ms.Dispose(); this.Stop(); From ceb55881456e04ea8f64c39aada0f1476b7b0b4e Mon Sep 17 00:00:00 2001 From: Lewis Westbury Date: Tue, 6 Dec 2022 16:15:48 +0000 Subject: [PATCH 2/4] Add a stream restarted notification event --- Mastonet/TimelineHttpStreaming.cs | 2 ++ Mastonet/TimelineStreaming.cs | 10 ++++++++++ Mastonet/TimelineWebSocketStreaming.cs | 2 ++ 3 files changed, 14 insertions(+) diff --git a/Mastonet/TimelineHttpStreaming.cs b/Mastonet/TimelineHttpStreaming.cs index 62417e4..cd98717 100644 --- a/Mastonet/TimelineHttpStreaming.cs +++ b/Mastonet/TimelineHttpStreaming.cs @@ -123,6 +123,8 @@ public override async Task Start(TimeSpan? timeout = null, bool restart = true) { if (!restart) throw; + else + NotifyStreamRestarted(); } } while (restart); } diff --git a/Mastonet/TimelineStreaming.cs b/Mastonet/TimelineStreaming.cs index 1040399..d1fc34b 100644 --- a/Mastonet/TimelineStreaming.cs +++ b/Mastonet/TimelineStreaming.cs @@ -17,6 +17,8 @@ public abstract class TimelineStreaming public event EventHandler? OnFiltersChanged; public event EventHandler? OnConversation; + public event Action OnStreamRestarted; + protected TimelineStreaming(StreamingType type, string? param, string? accessToken) { this.streamingType = type; @@ -27,6 +29,14 @@ protected TimelineStreaming(StreamingType type, string? param, string? accessTok public abstract Task Start(TimeSpan? timeout = null, bool restart = true); public abstract void Stop(); + protected void NotifyStreamRestarted() + { + if (OnStreamRestarted != null) + { + OnStreamRestarted.Invoke(); + } + } + protected void SendEvent(string eventName, string data) { switch (eventName) diff --git a/Mastonet/TimelineWebSocketStreaming.cs b/Mastonet/TimelineWebSocketStreaming.cs index 0b4efdc..5bf5ff4 100644 --- a/Mastonet/TimelineWebSocketStreaming.cs +++ b/Mastonet/TimelineWebSocketStreaming.cs @@ -110,6 +110,8 @@ public override async Task Start(TimeSpan? timeout = null, bool restart = true) { if (!restart) throw; + else + NotifyStreamRestarted(); } } while (restart); From 545995beff0eb3469d0648a37326e6bf28d716b6 Mon Sep 17 00:00:00 2001 From: Lewis Westbury Date: Tue, 6 Dec 2022 19:08:32 +0000 Subject: [PATCH 3/4] Indicate time of last valid activity in the stream when notifying of a restart --- .vscode/settings.json | 7 +++++++ Mastonet/TimelineHttpStreaming.cs | 4 ++-- Mastonet/TimelineStreaming.cs | 6 +++--- 3 files changed, 12 insertions(+), 5 deletions(-) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..921cf8b --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,7 @@ +{ + "workbench.colorCustomizations": { + "activityBar.background": "#5A1219", + "titleBar.activeBackground": "#7E1A24", + "titleBar.activeForeground": "#FEFBFB" + } +} \ No newline at end of file diff --git a/Mastonet/TimelineHttpStreaming.cs b/Mastonet/TimelineHttpStreaming.cs index cd98717..9417736 100644 --- a/Mastonet/TimelineHttpStreaming.cs +++ b/Mastonet/TimelineHttpStreaming.cs @@ -56,6 +56,7 @@ public override async Task Start(TimeSpan? timeout = null, bool restart = true) throw new NotImplementedException(); } + DateTime lastReceivedValidLine = DateTime.Now; try { using (var request = new HttpRequestMessage(HttpMethod.Get, url)) @@ -70,7 +71,6 @@ public override async Task Start(TimeSpan? timeout = null, bool restart = true) string? eventName = null; string? data = null; - DateTime lastReceivedValidLine = DateTime.Now; while (true) { var line = await reader.ReadLineAsync(); @@ -124,7 +124,7 @@ public override async Task Start(TimeSpan? timeout = null, bool restart = true) if (!restart) throw; else - NotifyStreamRestarted(); + NotifyStreamRestarted(lastReceivedValidLine); } } while (restart); } diff --git a/Mastonet/TimelineStreaming.cs b/Mastonet/TimelineStreaming.cs index d1fc34b..8dcbcda 100644 --- a/Mastonet/TimelineStreaming.cs +++ b/Mastonet/TimelineStreaming.cs @@ -17,7 +17,7 @@ public abstract class TimelineStreaming public event EventHandler? OnFiltersChanged; public event EventHandler? OnConversation; - public event Action OnStreamRestarted; + public event Action OnStreamRestarted; protected TimelineStreaming(StreamingType type, string? param, string? accessToken) { @@ -29,11 +29,11 @@ protected TimelineStreaming(StreamingType type, string? param, string? accessTok public abstract Task Start(TimeSpan? timeout = null, bool restart = true); public abstract void Stop(); - protected void NotifyStreamRestarted() + protected void NotifyStreamRestarted(DateTime lastKnownSuccess) { if (OnStreamRestarted != null) { - OnStreamRestarted.Invoke(); + OnStreamRestarted.Invoke(lastKnownSuccess); } } From ed7703240aaaeee58960e238b510c0343c5b1fec Mon Sep 17 00:00:00 2001 From: Lewis Westbury Date: Tue, 6 Dec 2022 19:12:04 +0000 Subject: [PATCH 4/4] Save changes --- Mastonet/TimelineWebSocketStreaming.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Mastonet/TimelineWebSocketStreaming.cs b/Mastonet/TimelineWebSocketStreaming.cs index 5bf5ff4..4f9d989 100644 --- a/Mastonet/TimelineWebSocketStreaming.cs +++ b/Mastonet/TimelineWebSocketStreaming.cs @@ -111,7 +111,7 @@ public override async Task Start(TimeSpan? timeout = null, bool restart = true) if (!restart) throw; else - NotifyStreamRestarted(); + NotifyStreamRestarted(lastValidMessage); } } while (restart);