Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enh: websocket reconnect #75

Merged
merged 1 commit into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ jobs:
run: |
sudo mkdir -p /etc/taos
sudo cp ./.github/workflows/taos.cfg /etc/taos/taos.cfg
sudo cp ./.github/workflows/taosadapter.toml /etc/taos/taosadapter.toml

- name: Setup dotnet
uses: actions/setup-dotnet@v4
Expand Down
115 changes: 115 additions & 0 deletions .github/workflows/taosadapter.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
debug = true
taosConfigDir = ""
port = 6041
logLevel = "info"
httpCodeServerError = false
SMLAutoCreateDB = false

[cors]
allowAllOrigins = true

#[pool]
#maxConnect = 0
#maxIdle = 0
#idleTimeout = 0

[ssl]
enable = false
certFile = ""
keyFile = ""

[log]
#path = "/var/log/taos"
rotationCount = 30
rotationTime = "24h"
rotationSize = "1GB"
enableRecordHttpSql = false
sqlRotationCount = 2
sqlRotationTime = "24h"
sqlRotationSize = "1GB"

[monitor]
disable = false
collectDuration = "3s"
disableCollectClientIP = false
incgroup = false
pauseQueryMemoryThreshold = 70
pauseAllMemoryThreshold = 80
identity = ""
writeToTD = false
user = "root"
password = "taosdata"
writeInterval = "30s"

[uploadKeeper]
enable = false
url = "http://127.0.0.1:6043/adapter_report"
interval = "15s"
timeout = "5s"
retryTimes = 3
retryInterval = "5s"

[opentsdb]
enable = true

[influxdb]
enable = true

[statsd]
enable = false
port = 6044
db = "statsd"
user = "root"
password = "taosdata"
worker = 10
gatherInterval = "5s"
protocol = "udp4"
maxTCPConnections = 250
tcpKeepAlive = false
allowPendingMessages = 50000
deleteCounters = true
deleteGauges = true
deleteSets = true
deleteTimings = true

[collectd]
enable = false
port = 6045
db = "collectd"
user = "root"
password = "taosdata"
worker = 10


[opentsdb_telnet]
enable = false
maxTCPConnections = 250
tcpKeepAlive = false
dbs = ["opentsdb_telnet", "collectd", "icinga2", "tcollector"]
ports = [6046, 6047, 6048, 6049]
user = "root"
password = "taosdata"
batchSize = 1
flushInterval = "0s"

[node_exporter]
enable = false
db = "node_exporter"
user = "root"
password = "taosdata"
urls = ["http://localhost:9100"]
responseTimeout = "5s"
httpUsername = ""
httpPassword = ""
httpBearerTokenString = ""
caCertFile = ""
certFile = ""
keyFile = ""
insecureSkipVerify = true
gatherDuration = "5s"

[prometheus]
enable = true

[tmq]
releaseIntervalMultiplierForAutocommit = 2
121 changes: 120 additions & 1 deletion src/Driver/Client/Websocket/WSClient.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
using System;
using System.Diagnostics;
using System.Net.WebSockets;
using TDengine.Driver.Impl.WebSocketMethods;

namespace TDengine.Driver.Client.Websocket
{
public class WSClient : ITDengineClient
{
private Connection _connection;
private TimeZoneInfo _tz;
private readonly TimeZoneInfo _tz;
private readonly ConnectionStringBuilder _builder;

public WSClient(ConnectionStringBuilder builder)
{
Expand All @@ -18,6 +20,7 @@ public WSClient(ConnectionStringBuilder builder)
builder.EnableCompression);

_connection.Connect();
_builder = builder;
}

private static string GetUrl(ConnectionStringBuilder builder)
Expand Down Expand Up @@ -59,12 +62,72 @@ public void Dispose()
}
}

private void Reconnect()
{
if (!_builder.AutoReconnect)
return;

Connection connection = null;
for (int i = 0; i < _builder.ReconnectRetryCount; i++)
{
try
{
// sleep
System.Threading.Thread.Sleep(_builder.ReconnectIntervalMs);
connection = new Connection(GetUrl(_builder), _builder.Username, _builder.Password,
_builder.Database, _builder.ConnTimeout, _builder.ReadTimeout, _builder.WriteTimeout,
_builder.EnableCompression);
connection.Connect();
break;
}
catch (Exception)
{
if (connection != null)
{
connection.Close();
connection = null;
}
}
}

if (connection == null)
{
throw new TDengineError((int)TDengineError.InternalErrorCode.WS_RECONNECT_FAILED,
"websocket connection reconnect failed");
}

if (_connection != null)
{
_connection.Close();
}

_connection = connection;
}

public IStmt StmtInit()
{
return StmtInit(ReqId.GetReqId());
}

public IStmt StmtInit(long reqId)
{
try
{
return DoStmtInit(reqId);
}
catch (Exception e)
{
if (_connection.IsAvailable(e))
{
throw;
}

Reconnect();
return DoStmtInit(reqId);
}
}

private IStmt DoStmtInit(long reqId)
{
var resp = _connection.StmtInit((ulong)reqId);
return new WSStmt(resp.StmtId, _tz, _connection);
Expand All @@ -76,6 +139,24 @@ public IRows Query(string query)
}

public IRows Query(string query, long reqId)
{
try
{
return DoQuery(query, reqId);
}
catch (Exception e)
{
if (_connection.IsAvailable(e))
{
throw;
}

Reconnect();
return DoQuery(query, reqId);
}
}

private IRows DoQuery(string query, long reqId)
{
var resp = _connection.Query(query, (ulong)reqId);
if (resp.IsUpdate)
Expand All @@ -92,6 +173,24 @@ public long Exec(string query)
}

public long Exec(string query, long reqId)
{
try
{
return DoExec(query, reqId);
}
catch (Exception e)
{
if (_connection.IsAvailable(e))
{
throw;
}

Reconnect();
return DoExec(query, reqId);
}
}

private long DoExec(string query, long reqId)
{
var resp = _connection.Query(query, (ulong)reqId);
if (!resp.IsUpdate)
Expand All @@ -105,6 +204,26 @@ public long Exec(string query, long reqId)
public void SchemalessInsert(string[] lines, TDengineSchemalessProtocol protocol,
TDengineSchemalessPrecision precision,
int ttl, long reqId)
{
try
{
DoSchemalessInsert(lines, protocol, precision, ttl, reqId);
}
catch (Exception e)
{
if (_connection.IsAvailable(e))
{
throw;
}

Reconnect();
DoSchemalessInsert(lines, protocol, precision, ttl, reqId);
}
}

private void DoSchemalessInsert(string[] lines, TDengineSchemalessProtocol protocol,
TDengineSchemalessPrecision precision,
int ttl, long reqId)
{
var line = string.Join("\n", lines);
_connection.SchemalessInsert(line, protocol, precision, ttl, reqId);
Expand Down
3 changes: 2 additions & 1 deletion src/Driver/Client/Websocket/WSRows.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void Dispose()
}

_freed = true;
if (_connection != null)
if (_connection != null && _connection.IsAvailable())
{
_connection.FreeResult(_resultId);
}
Expand Down Expand Up @@ -164,6 +164,7 @@ private void FetchBlock()
{
return;
}

_block = _connection.FetchBlock(_resultId);
_blockReader.SetBlock(_block, _blockSize);
}
Expand Down
Loading
Loading