Skip to content

Commit

Permalink
improve reconnect issue when multiple bridges running
Browse files Browse the repository at this point in the history
  • Loading branch information
tocsoft committed Mar 31, 2023
1 parent ce45b60 commit 02439f6
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 18 deletions.
4 changes: 4 additions & 0 deletions KubeConnect/Args.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ public Args(string[] args)
case "working-directory":
WorkingDirectory = Path.GetFullPath(argNext);
break;
case "trace-logs":
EnableTraceLogs = true;
break;
default:
// capture unknown args
if (!string.IsNullOrWhiteSpace(arg))
Expand Down Expand Up @@ -180,6 +183,7 @@ private void ProcessArgs(string[] args, Func<string, string, string, bool> proce
}
}

public bool EnableTraceLogs { get; set; } = false;
public string? Namespace { get; set; }
public string? KubeconfigFile { get; private set; }
public string? Context { get; private set; }
Expand Down
12 changes: 9 additions & 3 deletions KubeConnect/Hubs/BridgeHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Hosting;

namespace KubeConnect.Hubs
{
public class BridgeHub : Hub
{
private readonly ServiceManager serviceManager;
private readonly IHostApplicationLifetime hostApplicationLifetime;

public BridgeHub(ServiceManager serviceManager)
public BridgeHub(ServiceManager serviceManager, IHostApplicationLifetime hostApplicationLifetime)
{
this.serviceManager = serviceManager;
this.hostApplicationLifetime = hostApplicationLifetime;
}

public override Task OnConnectedAsync()
Expand All @@ -22,7 +25,10 @@ public override Task OnConnectedAsync()

public override async Task OnDisconnectedAsync(Exception? exception)
{
await serviceManager.Release(Context.ConnectionId);
if (!hostApplicationLifetime.ApplicationStopping.IsCancellationRequested)
{
await serviceManager.Release(Context.ConnectionId);
}

// when disconnecting kill bridge
await base.OnDisconnectedAsync(exception);
Expand Down Expand Up @@ -53,7 +59,7 @@ public async Task StartServiceBridge(string serviceName, Dictionary<int, int> po
{
throw new Exception($"Unable to find the service '{serviceName}'");
}

// bridge logging should be a write to the group `$"Bridge:{service.ServiceName}:{service.Namespace}"`
await serviceManager.Intercept(service, ports.Select(x => (x.Key, x.Value)).ToList(), this.Context.ConnectionId, this.Clients.Client(this.Context.ConnectionId));
}
Expand Down
16 changes: 13 additions & 3 deletions KubeConnect/IConsoleLogProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,33 @@ namespace KubeConnect
public partial class IConsoleLogProvider : ILoggerProvider
{
private readonly IConsole console;
private readonly Args args;

public IConsoleLogProvider(IConsole console)
public IConsoleLogProvider(IConsole console, Args args)
{
this.console = console;
this.args = args;
}

public ILogger CreateLogger(string categoryName)
=> new IConsoleLoggger(categoryName, console);
=> new IConsoleLoggger(categoryName, console, this.args.EnableTraceLogs);

public void Dispose()
{
}

public class IConsoleLoggger : ILogger, IDisposable
{
public IConsoleLoggger(string category, IConsole console)
public IConsoleLoggger(string category, IConsole console, bool traceLogs)
{
this.traceLogs = traceLogs;
Category = category;
this.console = console;
this.internalLogs = category.StartsWith("KubeConnect");
}

private readonly bool traceLogs;

public string Category { get; }

private bool internalLogs;
Expand All @@ -45,6 +50,11 @@ public void Dispose()

public bool IsEnabled(LogLevel logLevel)
{
if (traceLogs)
{
return true;
}

if (internalLogs)
{
return logLevel >= LogLevel.Debug;
Expand Down
7 changes: 4 additions & 3 deletions KubeConnect/PortForwarding/PortForwardingConnectionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public override async Task OnConnectedAsync(ConnectionContext connection)
{
var input = connection.Transport.Input;
var output = connection.Transport.Output;

var binding = connection.Features.Get<PortBinding>();
if (binding == null)
{
Expand All @@ -43,6 +43,7 @@ public override async Task OnConnectedAsync(ConnectionContext connection)
if (pod == null)
{
connection.Abort();
return;
}

logger.LogInformation("[{ConnectionID}] Opening connection for {ServiceName}:{ServicePort} to {PodName}:{PodPort}", connection.ConnectionId, binding.Name, (connection.LocalEndPoint as IPEndPoint)?.Port, pod.Name(), binding.TargetPort);
Expand All @@ -67,7 +68,7 @@ async Task PushToPod()
{
while (true)
{
var result = await input.ReadAsync();
var result = await input.ReadAsync(connection.ConnectionClosed);

foreach (var buffer in result.Buffer)
{
Expand All @@ -82,7 +83,7 @@ async Task PushToClient()
{
while (true)
{
var result = await podOutput.ReadAsync();
var result = await podOutput.ReadAsync(connection.ConnectionClosed);

foreach (var buffer in result.Buffer)
{
Expand Down
48 changes: 48 additions & 0 deletions KubeConnect/ServiceDetails.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;

namespace KubeConnect
Expand All @@ -17,6 +20,51 @@ public class BridgeDetails
public IReadOnlyList<(int remotePort, int localPort)> BridgedPorts { get; init; } = Array.Empty<(int, int)>();

public IClientProxy Client { get; internal set; }

object locker = new object();
StringBuilder builder = new StringBuilder();
StringBuilder builderAlt = new StringBuilder();
internal void Log(string msg)
{
lock (locker)
{
if (builder.Length > 0)
{
builder.AppendLine();
}
builder.Append(msg);
}

}
SemaphoreSlim semaphore = new SemaphoreSlim(1);
public async Task FlushLogs()
{
StringBuilder RotateLogBuffers()
{
lock (locker)
{
var current = builder;
builder = builderAlt;
builderAlt = current;
return current;
}
}

await semaphore.WaitAsync();
try
{
var currentBuffers = RotateLogBuffers();
if (currentBuffers.Length > 0)
{
await Client.SendAsync("log", currentBuffers.ToString(), false);
currentBuffers.Clear();
}
}
finally
{
semaphore.Release();
}
}
}

public class ServiceDetails
Expand Down
80 changes: 71 additions & 9 deletions KubeConnect/ServiceManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,52 @@ private async Task EnableDeployment(V1Deployment deployment)
}
}

Task logWriter;
private void EnsureLogWriterRunning()
{
if (!BridgedServices.Any())
{
return;
}

if (logWriter == null)
{
logWriter = Task.Run(async () =>
{
while (true)
{
try
{
foreach (var s in BridgedServices.ToList())
{
try
{
await s.FlushLogs();
}
catch
{
console.WriteErrorLine($"Failed to flush logs for {s.ServiceName} bridge");
}
}
}
catch
{
}
await Task.Delay(250);
}
});
}
}

private async Task StartSshForward(BridgeDetails bridgeDetails, ServiceDetails service)
{
EnsureLogWriterRunning();

var logger = (string msg) =>
{
console.WriteLine(msg);
_ = bridgeDetails.Client.SendAsync("log", msg, false);

bridgeDetails.Log(msg);
};

var pod = await FindInterceptionPod(service);
Expand All @@ -221,6 +261,7 @@ private async Task StartSshForward(BridgeDetails bridgeDetails, ServiceDetails s
ContainerPort = X.remotePort,
Name = $"port-{X.remotePort}"
}).ToList();

ports.Add(new V1ContainerPort
{
ContainerPort = 2222,
Expand Down Expand Up @@ -279,7 +320,14 @@ private async Task StartSshForward(BridgeDetails bridgeDetails, ServiceDetails s
client.AddForwardedPort(port);
port.RequestReceived += (object? sender, Renci.SshNet.Common.PortForwardEventArgs e) =>
{
logger($"Traffic redirected from {service.ServiceName}:{mappings.remotePort} to localhost:{mappings.localPort}");
try
{
logger($"Traffic redirected from {service.ServiceName}:{mappings.remotePort} to localhost:{mappings.localPort}");
}
catch
{

}
};
port.Start();
}
Expand Down Expand Up @@ -368,12 +416,21 @@ public async Task Intercept(ServiceDetails service, IReadOnlyList<(int remotePor
await StartSshForward(bridgeDetails, service);
}

private SemaphoreSlim semaphore = new SemaphoreSlim(1);
public async Task Release(string connectionId)
{
var services = this.BridgedServices.Where(x => x.ConnectionId == connectionId);
foreach (var service in services)
await semaphore.WaitAsync();
try
{
var services = this.BridgedServices.Where(x => x.ConnectionId == connectionId).ToList();
foreach (var service in services)
{
await Release(service);
}
}
finally
{
await Release(service);
semaphore.Release();
}
}

Expand All @@ -390,11 +447,12 @@ public Task Release(BridgeDetails bridgeDetails)

public async Task Release(ServiceDetails service)
{
var details = BridgedServices.FirstOrDefault(x => x.ServiceName.Equals(service.ServiceName, StringComparison.OrdinalIgnoreCase) && x.Namespace.Equals(service.Namespace, StringComparison.OrdinalIgnoreCase));
if (details != null)
var details = BridgedServices.Where(x => x.ServiceName.Equals(service.ServiceName, StringComparison.OrdinalIgnoreCase) && x.Namespace.Equals(service.Namespace, StringComparison.OrdinalIgnoreCase)).ToList();

if (details.Any())
{
BridgedServices = BridgedServices.Where(x => x != details).ToArray();
OnBridgedServicesChanged?.Invoke(this, BridgedServices);
BridgedServices = BridgedServices.Where(x => !details.Contains(x)).ToArray();
console.WriteLine($"Shutting down bridge for {service.ServiceName}");
}

var pod = await FindInterceptionPod(service);
Expand All @@ -408,6 +466,10 @@ public async Task Release(ServiceDetails service)
{
await EnableDeployment(dep);
}
if (details != null)
{
OnBridgedServicesChanged?.Invoke(this, BridgedServices);
}
}

public async Task ReleaseAll()
Expand Down

0 comments on commit 02439f6

Please sign in to comment.