diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 1bb399f..bed3582 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -203,6 +203,7 @@ jobs: run: | sudo mkdir -p /etc/taos sudo cp ./.github/workflows/taos.cfg /etc/taos/taos.cfg + sudo cp ./.github/workflows/taosadapter.toml /etc/taos/taosadapter.toml - name: Setup dotnet uses: actions/setup-dotnet@v4 diff --git a/.github/workflows/taosadapter.toml b/.github/workflows/taosadapter.toml new file mode 100644 index 0000000..a18addf --- /dev/null +++ b/.github/workflows/taosadapter.toml @@ -0,0 +1,115 @@ +debug = true +taosConfigDir = "" +port = 6041 +logLevel = "info" +httpCodeServerError = false +SMLAutoCreateDB = false + +[cors] +allowAllOrigins = true + +#[pool] +#maxConnect = 0 +#maxIdle = 0 +#idleTimeout = 0 + +[ssl] +enable = false +certFile = "" +keyFile = "" + +[log] +#path = "/var/log/taos" +rotationCount = 30 +rotationTime = "24h" +rotationSize = "1GB" +enableRecordHttpSql = false +sqlRotationCount = 2 +sqlRotationTime = "24h" +sqlRotationSize = "1GB" + +[monitor] +disable = false +collectDuration = "3s" +disableCollectClientIP = false +incgroup = false +pauseQueryMemoryThreshold = 70 +pauseAllMemoryThreshold = 80 +identity = "" +writeToTD = false +user = "root" +password = "taosdata" +writeInterval = "30s" + +[uploadKeeper] +enable = false +url = "http://127.0.0.1:6043/adapter_report" +interval = "15s" +timeout = "5s" +retryTimes = 3 +retryInterval = "5s" + +[opentsdb] +enable = true + +[influxdb] +enable = true + +[statsd] +enable = false +port = 6044 +db = "statsd" +user = "root" +password = "taosdata" +worker = 10 +gatherInterval = "5s" +protocol = "udp4" +maxTCPConnections = 250 +tcpKeepAlive = false +allowPendingMessages = 50000 +deleteCounters = true +deleteGauges = true +deleteSets = true +deleteTimings = true + +[collectd] +enable = false +port = 6045 +db = "collectd" +user = "root" +password = "taosdata" +worker = 10 + + +[opentsdb_telnet] +enable = false +maxTCPConnections = 250 +tcpKeepAlive = false +dbs = ["opentsdb_telnet", "collectd", "icinga2", "tcollector"] +ports = [6046, 6047, 6048, 6049] +user = "root" +password = "taosdata" +batchSize = 1 +flushInterval = "0s" + +[node_exporter] +enable = false +db = "node_exporter" +user = "root" +password = "taosdata" +urls = ["http://localhost:9100"] +responseTimeout = "5s" +httpUsername = "" +httpPassword = "" +httpBearerTokenString = "" +caCertFile = "" +certFile = "" +keyFile = "" +insecureSkipVerify = true +gatherDuration = "5s" + +[prometheus] +enable = true + +[tmq] +releaseIntervalMultiplierForAutocommit = 2 \ No newline at end of file diff --git a/src/Driver/Client/Websocket/WSClient.cs b/src/Driver/Client/Websocket/WSClient.cs index a063482..9713ece 100644 --- a/src/Driver/Client/Websocket/WSClient.cs +++ b/src/Driver/Client/Websocket/WSClient.cs @@ -1,5 +1,6 @@ using System; using System.Diagnostics; +using System.Net.WebSockets; using TDengine.Driver.Impl.WebSocketMethods; namespace TDengine.Driver.Client.Websocket @@ -7,7 +8,8 @@ namespace TDengine.Driver.Client.Websocket public class WSClient : ITDengineClient { private Connection _connection; - private TimeZoneInfo _tz; + private readonly TimeZoneInfo _tz; + private readonly ConnectionStringBuilder _builder; public WSClient(ConnectionStringBuilder builder) { @@ -18,6 +20,7 @@ public WSClient(ConnectionStringBuilder builder) builder.EnableCompression); _connection.Connect(); + _builder = builder; } private static string GetUrl(ConnectionStringBuilder builder) @@ -59,12 +62,72 @@ public void Dispose() } } + private void Reconnect() + { + if (!_builder.AutoReconnect) + return; + + Connection connection = null; + for (int i = 0; i < _builder.ReconnectRetryCount; i++) + { + try + { + // sleep + System.Threading.Thread.Sleep(_builder.ReconnectIntervalMs); + connection = new Connection(GetUrl(_builder), _builder.Username, _builder.Password, + _builder.Database, _builder.ConnTimeout, _builder.ReadTimeout, _builder.WriteTimeout, + _builder.EnableCompression); + connection.Connect(); + break; + } + catch (Exception) + { + if (connection != null) + { + connection.Close(); + connection = null; + } + } + } + + if (connection == null) + { + throw new TDengineError((int)TDengineError.InternalErrorCode.WS_RECONNECT_FAILED, + "websocket connection reconnect failed"); + } + + if (_connection != null) + { + _connection.Close(); + } + + _connection = connection; + } + public IStmt StmtInit() { return StmtInit(ReqId.GetReqId()); } public IStmt StmtInit(long reqId) + { + try + { + return DoStmtInit(reqId); + } + catch (Exception e) + { + if (_connection.IsAvailable(e)) + { + throw; + } + + Reconnect(); + return DoStmtInit(reqId); + } + } + + private IStmt DoStmtInit(long reqId) { var resp = _connection.StmtInit((ulong)reqId); return new WSStmt(resp.StmtId, _tz, _connection); @@ -76,6 +139,24 @@ public IRows Query(string query) } public IRows Query(string query, long reqId) + { + try + { + return DoQuery(query, reqId); + } + catch (Exception e) + { + if (_connection.IsAvailable(e)) + { + throw; + } + + Reconnect(); + return DoQuery(query, reqId); + } + } + + private IRows DoQuery(string query, long reqId) { var resp = _connection.Query(query, (ulong)reqId); if (resp.IsUpdate) @@ -92,6 +173,24 @@ public long Exec(string query) } public long Exec(string query, long reqId) + { + try + { + return DoExec(query, reqId); + } + catch (Exception e) + { + if (_connection.IsAvailable(e)) + { + throw; + } + + Reconnect(); + return DoExec(query, reqId); + } + } + + private long DoExec(string query, long reqId) { var resp = _connection.Query(query, (ulong)reqId); if (!resp.IsUpdate) @@ -105,6 +204,26 @@ public long Exec(string query, long reqId) public void SchemalessInsert(string[] lines, TDengineSchemalessProtocol protocol, TDengineSchemalessPrecision precision, int ttl, long reqId) + { + try + { + DoSchemalessInsert(lines, protocol, precision, ttl, reqId); + } + catch (Exception e) + { + if (_connection.IsAvailable(e)) + { + throw; + } + + Reconnect(); + DoSchemalessInsert(lines, protocol, precision, ttl, reqId); + } + } + + private void DoSchemalessInsert(string[] lines, TDengineSchemalessProtocol protocol, + TDengineSchemalessPrecision precision, + int ttl, long reqId) { var line = string.Join("\n", lines); _connection.SchemalessInsert(line, protocol, precision, ttl, reqId); diff --git a/src/Driver/Client/Websocket/WSRows.cs b/src/Driver/Client/Websocket/WSRows.cs index 60cc2e3..50804c1 100644 --- a/src/Driver/Client/Websocket/WSRows.cs +++ b/src/Driver/Client/Websocket/WSRows.cs @@ -103,7 +103,7 @@ public void Dispose() } _freed = true; - if (_connection != null) + if (_connection != null && _connection.IsAvailable()) { _connection.FreeResult(_resultId); } @@ -164,6 +164,7 @@ private void FetchBlock() { return; } + _block = _connection.FetchBlock(_resultId); _blockReader.SetBlock(_block, _blockSize); } diff --git a/src/Driver/ConnectionStringBuilder.cs b/src/Driver/ConnectionStringBuilder.cs index ef50d43..e3d7f60 100644 --- a/src/Driver/ConnectionStringBuilder.cs +++ b/src/Driver/ConnectionStringBuilder.cs @@ -21,6 +21,10 @@ public class ConnectionStringBuilder : DbConnectionStringBuilder private const string TokenKey = "token"; private const string UseSSLKey = "useSSL"; private const string EnableCompressionKey = "enableCompression"; + private const string AutoReconnectKey = "autoReconnect"; + private const string ReconnectRetryCountKey = "reconnectRetryCount"; + private const string ReconnectIntervalMsKey = "reconnectIntervalMs"; + private enum KeysEnum { @@ -37,6 +41,9 @@ private enum KeysEnum Token, UseSSL, EnableCompression, + AutoReconnect, + ReconnectRetryCount, + ReconnectIntervalMs, Total } @@ -53,6 +60,9 @@ private enum KeysEnum private string _token = string.Empty; private bool _useSSL = false; private bool _enableCompression = false; + private bool _autoReconnect = false; + private int _reconnectRetryCount = 3; + private int _reconnectIntervalMs = 2000; private static readonly IReadOnlyList KeysList; private static readonly IReadOnlyDictionary KeysDict; @@ -73,6 +83,9 @@ static ConnectionStringBuilder() list[(int)KeysEnum.Token] = TokenKey; list[(int)KeysEnum.UseSSL] = UseSSLKey; list[(int)KeysEnum.EnableCompression] = EnableCompressionKey; + list[(int)KeysEnum.AutoReconnect] = AutoReconnectKey; + list[(int)KeysEnum.ReconnectRetryCount] = ReconnectRetryCountKey; + list[(int)KeysEnum.ReconnectIntervalMs] = ReconnectIntervalMsKey; KeysList = list; KeysDict = new Dictionary((int)KeysEnum.Total, StringComparer.OrdinalIgnoreCase) @@ -89,7 +102,10 @@ static ConnectionStringBuilder() [WriteTimeoutKey] = KeysEnum.WriteTimeout, [TokenKey] = KeysEnum.Token, [UseSSLKey] = KeysEnum.UseSSL, - [EnableCompressionKey] = KeysEnum.EnableCompression + [EnableCompressionKey] = KeysEnum.EnableCompression, + [AutoReconnectKey] = KeysEnum.AutoReconnect, + [ReconnectRetryCountKey] = KeysEnum.ReconnectRetryCount, + [ReconnectIntervalMsKey] = KeysEnum.ReconnectIntervalMs }; } @@ -154,6 +170,15 @@ public ConnectionStringBuilder(string connectionString) case KeysEnum.EnableCompression: EnableCompression = Convert.ToBoolean(value); break; + case KeysEnum.AutoReconnect: + AutoReconnect = Convert.ToBoolean(value); + break; + case KeysEnum.ReconnectRetryCount: + ReconnectRetryCount = Convert.ToInt32(value); + break; + case KeysEnum.ReconnectIntervalMs: + ReconnectIntervalMs = Convert.ToInt32(value); + break; default: throw new ArgumentOutOfRangeException(nameof(index), index, "get value error"); } @@ -207,45 +232,89 @@ public string Protocol public TimeZoneInfo Timezone { get => _timezone; - set => base[TimezoneKey] = _timezone = value; + set + { + base[TimezoneKey] = value.Id; + _timezone = value; + } } public TimeSpan ConnTimeout { get => _connTimeout; - set => base[ConnTimeoutKey] = _connTimeout = value; + set + { + base[ConnTimeoutKey] = value.ToString(); + _connTimeout = value; + } } public TimeSpan ReadTimeout { get => _readTimeout; - set => base[ReadTimeoutKey] = _readTimeout = value; + set + { + base[ReadTimeoutKey] = value.ToString(); + _readTimeout = value; + } } public TimeSpan WriteTimeout { get => _writeTimeout; - set => base[WriteTimeoutKey] = _writeTimeout = value; + set + { + base[WriteTimeoutKey] = value.ToString(); + _writeTimeout = value; + } } - + public string Token { get => _token; set => base[TokenKey] = _token = value; } - + public bool UseSSL { get => _useSSL; set => base[UseSSLKey] = _useSSL = value; } - + public bool EnableCompression { get => _enableCompression; set => base[EnableCompressionKey] = _enableCompression = value; } + public bool AutoReconnect + { + get => _autoReconnect; + set => base[AutoReconnectKey] = _autoReconnect = value; + } + + public int ReconnectRetryCount + { + get => _reconnectRetryCount; + set + { + if (value < 0) + throw new ArgumentException("invalid reconnect retry count value", ReconnectRetryCountKey); + base[ReconnectRetryCountKey] = _reconnectRetryCount = value; + } + } + + public int ReconnectIntervalMs + { + get => _reconnectIntervalMs; + set + { + if (value < 0) + throw new ArgumentException("invalid reconnect interval value", ReconnectIntervalMsKey); + base[ReconnectIntervalMsKey] = _reconnectIntervalMs = value; + } + } + public override ICollection Keys => new ReadOnlyCollection((string[])KeysList); public override ICollection Values @@ -292,6 +361,12 @@ private object GetAt(KeysEnum index) return UseSSL; case KeysEnum.EnableCompression: return EnableCompression; + case KeysEnum.AutoReconnect: + return AutoReconnect; + case KeysEnum.ReconnectRetryCount: + return ReconnectRetryCount; + case KeysEnum.ReconnectIntervalMs: + return ReconnectIntervalMs; default: throw new ArgumentOutOfRangeException(nameof(index), index, "get value error"); } @@ -354,6 +429,15 @@ private void Reset(KeysEnum index) case KeysEnum.EnableCompression: _enableCompression = false; return; + case KeysEnum.AutoReconnect: + _autoReconnect = false; + return; + case KeysEnum.ReconnectRetryCount: + _reconnectRetryCount = 3; + return; + case KeysEnum.ReconnectIntervalMs: + _reconnectIntervalMs = 2000; + return; default: throw new ArgumentOutOfRangeException(nameof(index), index, null); } diff --git a/src/Driver/Impl/WebSocketMethods/BaseConnection.cs b/src/Driver/Impl/WebSocketMethods/BaseConnection.cs index 73516cd..7b0ac91 100644 --- a/src/Driver/Impl/WebSocketMethods/BaseConnection.cs +++ b/src/Driver/Impl/WebSocketMethods/BaseConnection.cs @@ -16,7 +16,6 @@ public class BaseConnection private readonly TimeSpan _readTimeout; private readonly TimeSpan _writeTimeout; - private const int InternalError = -1; private ulong _reqId; private readonly TimeSpan _defaultConnTimeout = TimeSpan.FromMinutes(1); private readonly TimeSpan _defaultReadTimeout = TimeSpan.FromMinutes(5); @@ -65,7 +64,8 @@ protected BaseConnection(string addr, TimeSpan connectTimeout = default, if (_client.State != WebSocketState.Open) { - throw new TDengineError(InternalError, $"connect to {addr} fail"); + throw new TDengineError((int)TDengineError.InternalErrorCode.WS_CONNEC_FAILED, + $"connect to {addr} fail"); } } @@ -94,7 +94,8 @@ protected T SendBinaryBackJson(byte[] request) where T : IWSBaseResp var respBytes = Receive(out var messageType); if (messageType != WebSocketMessageType.Text) { - throw new TDengineError(-1, "receive unexpected binary message"); + throw new TDengineError((int)TDengineError.InternalErrorCode.WS_UNEXPECTED_MESSAGE, + "receive unexpected binary message"); } var resp = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(respBytes)); @@ -108,14 +109,16 @@ protected T2 SendJsonBackJson(string action, T1 req) where T2 : IWSBaseR var respBytes = Receive(out var messageType); if (messageType != WebSocketMessageType.Text) { - throw new TDengineError(-1, "receive unexpected binary message", respBytes, reqStr); + throw new TDengineError((int)TDengineError.InternalErrorCode.WS_UNEXPECTED_MESSAGE, + "receive unexpected binary message", respBytes, reqStr); } var resp = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(respBytes)); // Console.WriteLine(Encoding.UTF8.GetString(respBytes)); if (resp.Action != action) { - throw new TDengineError(-1, $"receive unexpected action {resp.Action},req:{reqStr}", + throw new TDengineError((int)TDengineError.InternalErrorCode.WS_UNEXPECTED_MESSAGE, + $"receive unexpected action {resp.Action},req:{reqStr}", Encoding.UTF8.GetString(respBytes)); } @@ -149,6 +152,12 @@ protected string SendJson(string action, T req) private async Task SendAsync(ArraySegment data, WebSocketMessageType messageType) { + if (!IsAvailable()) + { + throw new TDengineError((int)TDengineError.InternalErrorCode.WS_CONNECTION_CLOSED, + "websocket connection is closed"); + } + using (var cts = new CancellationTokenSource()) { cts.CancelAfter(_writeTimeout); @@ -158,7 +167,8 @@ private async Task SendAsync(ArraySegment data, WebSocketMessageType messa } catch (OperationCanceledException) { - throw new TDengineError(InternalError, "write message timeout"); + throw new TDengineError((int)TDengineError.InternalErrorCode.WS_WRITE_TIMEOUT, + "write message timeout"); } } } @@ -166,21 +176,15 @@ private async Task SendAsync(ArraySegment data, WebSocketMessageType messa private void SendText(string request) { var data = new ArraySegment(Encoding.UTF8.GetBytes(request)); - Task.Run(async () => - { - await SendAsync(data, WebSocketMessageType.Text).ConfigureAwait(true); - }); + Task.Run(async () => { await SendAsync(data, WebSocketMessageType.Text).ConfigureAwait(true); }).Wait(); } private void SendBinary(byte[] request) { var data = new ArraySegment(request); - Task.Run(async () => - { - await SendAsync(data, WebSocketMessageType.Binary).ConfigureAwait(true); - }); + Task.Run(async () => { await SendAsync(data, WebSocketMessageType.Binary).ConfigureAwait(true); }).Wait(); } - + private byte[] Receive(out WebSocketMessageType messageType) { var task = Task.Run(async () => await ReceiveAsync().ConfigureAwait(true)); @@ -188,9 +192,15 @@ private byte[] Receive(out WebSocketMessageType messageType) messageType = task.Result.Item2; return task.Result.Item1; } - - private async Task> ReceiveAsync() + + private async Task> ReceiveAsync() { + if (!IsAvailable()) + { + throw new TDengineError((int)TDengineError.InternalErrorCode.WS_CONNECTION_CLOSED, + "websocket connection is closed"); + } + using (var cts = new CancellationTokenSource()) { cts.CancelAfter(_readTimeout); @@ -202,13 +212,18 @@ private async Task> ReceiveAsync() do { - result = await _client.ReceiveAsync(new ArraySegment(buffer), cts.Token).ConfigureAwait(false); + result = await _client.ReceiveAsync(new ArraySegment(buffer), cts.Token) + .ConfigureAwait(false); if (result.MessageType == WebSocketMessageType.Close) { - await _client.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None).ConfigureAwait(false); - throw new TDengineError(InternalError, "receive websocket close frame"); + await _client + .CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None) + .ConfigureAwait(false); + throw new TDengineError((int)TDengineError.InternalErrorCode.WS_RECEIVE_CLOSE_FRAME, + "receive websocket close frame"); } + memoryStream.Write(buffer, 0, result.Count); } while (!result.EndOfMessage); @@ -228,5 +243,27 @@ public void Close() // ignored } } + + public bool IsAvailable(Exception e = null) + { + if (_client.State != WebSocketState.Open) + return false; + + switch (e) + { + case null: + return true; + case WebSocketException _: + return false; + case AggregateException ae: + return !(ae.InnerException is WebSocketException); + case TDengineError te: + return te.Code != (int)TDengineError.InternalErrorCode.WS_CONNECTION_CLOSED && + te.Code != (int)TDengineError.InternalErrorCode.WS_RECEIVE_CLOSE_FRAME && + te.Code != (int)TDengineError.InternalErrorCode.WS_WRITE_TIMEOUT; + default: + return true; + } + } } } \ No newline at end of file diff --git a/src/Driver/Impl/WebSocketMethods/TMQ.cs b/src/Driver/Impl/WebSocketMethods/TMQ.cs index a88d4bd..d39f8dc 100644 --- a/src/Driver/Impl/WebSocketMethods/TMQ.cs +++ b/src/Driver/Impl/WebSocketMethods/TMQ.cs @@ -263,6 +263,12 @@ public class TMQOptions public string TDConnectType => Get("td.connect.type"); + public string TDReconnect => Get("ws.autoReconnect"); + + public string TDReconnectRetryCount => Get("ws.reconnect.retry.count"); + + public string TDReconnectIntervalMs => Get("ws.reconnect.interval.ms"); + public TMQOptions(IEnumerable> config) { this.properties = new Dictionary(); diff --git a/src/Driver/TDengineError.cs b/src/Driver/TDengineError.cs index 704272d..55384a4 100644 --- a/src/Driver/TDengineError.cs +++ b/src/Driver/TDengineError.cs @@ -10,25 +10,35 @@ public class TDengineError : Exception public byte[] ExtendedErrorBytes { get; } public string ExtendedErrorString { get; } - public TDengineError(int code, string error) : base($"code:[0x{(code&0xffff):x}],error:{error}") + public enum InternalErrorCode { - Code = code&0xffff; + WS_RECONNECT_FAILED = 0xf001, + WS_UNEXPECTED_MESSAGE = 0xf002, + WS_CONNECTION_CLOSED = 0xf003, + WS_WRITE_TIMEOUT = 0xf004, + WS_CONNEC_FAILED = 0xf005, + WS_RECEIVE_CLOSE_FRAME = 0xf006, + } + + public TDengineError(int code, string error) : base($"code:[0x{(code & 0xffff):x}],error:{error}") + { + Code = code & 0xffff; Error = error; } public TDengineError(int code, string error, byte[] extendedErrorBytes, string extendedErrorString) : base( - $"code:[0x{(code&0xffff):x}],error:{error},extendedBytes:{Format(extendedErrorBytes)},extendedString:{extendedErrorString}") + $"code:[0x{(code & 0xffff):x}],error:{error},extendedBytes:{Format(extendedErrorBytes)},extendedString:{extendedErrorString}") { - Code = code&0xffff; + Code = code & 0xffff; Error = error; ExtendedErrorBytes = extendedErrorBytes; ExtendedErrorString = extendedErrorString; } public TDengineError(int code, string error, string extendedErrorString) : base( - $"code:[0x{(code&0xffff):x}],error:{error},extendedString:{extendedErrorString}") + $"code:[0x{(code & 0xffff):x}],error:{error},extendedString:{extendedErrorString}") { - Code = code&0xffff; + Code = code & 0xffff; Error = error; ExtendedErrorString = extendedErrorString; } diff --git a/src/TMQ/ConsumerConfig.cs b/src/TMQ/ConsumerConfig.cs index 624d6ef..feea1fc 100644 --- a/src/TMQ/ConsumerConfig.cs +++ b/src/TMQ/ConsumerConfig.cs @@ -4,7 +4,9 @@ namespace TDengine.TMQ { public class ConsumerConfig : Config { - public ConsumerConfig() : base() { } + public ConsumerConfig() : base() + { + } public ConsumerConfig(Config config) : base(config) { @@ -69,7 +71,7 @@ public string TDToken get => Get("token"); set => this.SetObject("token", value); } - + public string TDEnableCompression { get => Get("ws.message.enableCompression"); @@ -103,7 +105,25 @@ public string TDDatabase public string TDConnectType { get => Get("td.connect.type"); - set => SetObject("td.connect.type",value); + set => SetObject("td.connect.type", value); + } + + public string TDReconnect + { + get => Get("ws.autoReconnect"); + set => SetObject("ws.autoReconnect", value); + } + + public string TDReconnectRetryCount + { + get => Get("ws.reconnect.retry.count"); + set => SetObject("ws.reconnect.retry.count", value); + } + + public string TDReconnectIntervalMs + { + get => Get("ws.reconnect.interval.ms"); + set => SetObject("ws.reconnect.interval.ms", value); } } } \ No newline at end of file diff --git a/src/TMQ/WebSocket/Consumer.cs b/src/TMQ/WebSocket/Consumer.cs index 2131c09..97f1265 100644 --- a/src/TMQ/WebSocket/Consumer.cs +++ b/src/TMQ/WebSocket/Consumer.cs @@ -10,10 +10,14 @@ namespace TDengine.TMQ.WebSocket public class Consumer : IConsumer { private readonly TMQOptions _options; - private readonly TMQConnection _connection; + private TMQConnection _connection; private readonly bool _autoCommit; private readonly int _autoCommitInterval; private DateTime _nextCommitTime; + private readonly bool _reconnect; + private readonly int _reconnectRetryCount; + private readonly int _reconnectRetryIntervalMs; + private List _topics; private IDeserializer valueDeserializer; @@ -41,23 +45,72 @@ public Consumer(ConsumerBuilder builder) this.valueDeserializer = builder.ValueDeserializer; } - if (_options.EnableAutoCommit != "true") return; - _autoCommit = true; - if (!string.IsNullOrEmpty(_options.AutoCommitIntervalMs)) + if (_options.EnableAutoCommit == "true") + { + _autoCommit = true; + if (!string.IsNullOrEmpty(_options.AutoCommitIntervalMs)) + { + if (!int.TryParse(_options.AutoCommitIntervalMs, out _autoCommitInterval)) + throw new ArgumentException($"Invalid auto commit interval {_options.AutoCommitIntervalMs}"); + } + else + _autoCommitInterval = 5000; + } + + if (_options.TDReconnect == "true") + { + _reconnect = true; + if (!int.TryParse(_options.TDReconnectRetryCount, out _reconnectRetryCount)) + throw new ArgumentException($"Invalid reconnect retry count {_options.TDReconnectRetryCount}"); + if (_reconnectRetryCount < 0) + throw new ArgumentException($"Invalid reconnect retry count {_options.TDReconnectRetryCount}"); + if (!int.TryParse(_options.TDReconnectIntervalMs, out _reconnectRetryIntervalMs)) + throw new ArgumentException($"Invalid reconnect retry intervalMs {_options.TDReconnectIntervalMs}"); + if (_reconnectRetryIntervalMs < 0) + throw new ArgumentException($"Invalid reconnect retry intervalMs {_options.TDReconnectIntervalMs}"); + } + } + + private void Reconnect() + { + if (!_reconnect) + return; + TMQConnection connection = null; + for (int i = 0; i < _reconnectRetryCount; i++) { try { - _autoCommitInterval = int.Parse(_options.AutoCommitIntervalMs); + System.Threading.Thread.Sleep(_reconnectRetryIntervalMs); + connection = new TMQConnection(_options); + if (_topics != null) + { + connection.Subscribe(_topics, _options); + } + + break; } - catch (Exception e) + catch (Exception) { - throw new ArgumentException("Invalid auto commit interval", e); + if (connection != null) + { + connection.Close(); + connection = null; + } } } - else + + if (connection == null) { - _autoCommitInterval = 5000; + throw new TDengineError((int)TDengineError.InternalErrorCode.WS_RECONNECT_FAILED, + "websocket connection reconnect failed"); } + + if (_connection != null) + { + _connection.Close(); + } + + _connection = connection; } public ConsumeResult Consume(int millisecondsTimeout) @@ -67,11 +120,29 @@ public ConsumeResult Consume(int millisecondsTimeout) var now = DateTime.Now; if (now >= _nextCommitTime) { - Commit(); + _connection.Commit(); _nextCommitTime = now.AddMilliseconds(_autoCommitInterval); } } + try + { + return DoConsume(millisecondsTimeout); + } + catch (Exception e) + { + if (_connection.IsAvailable(e)) + { + throw; + } + + Reconnect(); + return DoConsume(millisecondsTimeout); + } + } + + private ConsumeResult DoConsume(int millisecondsTimeout) + { var resp = _connection.Poll(millisecondsTimeout); if (!resp.HaveMessage) { @@ -118,12 +189,33 @@ public List Subscription() public void Subscribe(IEnumerable topic) { - _connection.Subscribe((List)topic, _options); + var topics = (List)topic; + DoSubscribe(topics); } public void Subscribe(string topic) { - _connection.Subscribe(new List { topic }, _options); + var topics = new List { topic }; + DoSubscribe(topics); + } + + private void DoSubscribe(List topics) + { + try + { + _connection.Subscribe(topics, _options); + _topics = topics; + } + catch (Exception e) + { + if (_connection.IsAvailable(e)) + { + throw; + } + + Reconnect(); + _connection.Subscribe(topics, _options); + } } public void Unsubscribe() @@ -216,7 +308,8 @@ public Offset Position(TopicPartition partition) public void Close() { - _connection.Close(); + if (_connection != null && _connection.IsAvailable()) + _connection.Close(); } private bool NeedGetData(TMQ_RES type) diff --git a/test/Data.Tests/TDengineConnectionStringBuilderTests.cs b/test/Data.Tests/TDengineConnectionStringBuilderTests.cs index 6de6c38..481a186 100644 --- a/test/Data.Tests/TDengineConnectionStringBuilderTests.cs +++ b/test/Data.Tests/TDengineConnectionStringBuilderTests.cs @@ -33,24 +33,64 @@ public void DefaultWebSocket_ShouldSetDefaultValues() [Fact] public void Parse() { - var builder = new TDengineConnectionStringBuilder("host=127.0.0.1;port=6030;username=root;password=taosdata;protocol=Native;db=test"); - Assert.Equal("127.0.0.1",builder.Host); - Assert.Equal(6030,builder.Port); - Assert.Equal("root",builder.Username); - Assert.Equal("taosdata",builder.Password); - Assert.Equal("test",builder.Database); - Assert.Equal(TDengineConstant.ProtocolNative,builder.Protocol); + var builder = + new TDengineConnectionStringBuilder( + "host=127.0.0.1;port=6030;username=root;password=taosdata;protocol=Native;db=test"); + Assert.Equal("127.0.0.1", builder.Host); + Assert.Equal(6030, builder.Port); + Assert.Equal("root", builder.Username); + Assert.Equal("taosdata", builder.Password); + Assert.Equal("test", builder.Database); + Assert.Equal(TDengineConstant.ProtocolNative, builder.Protocol); builder.Clear(); - Assert.Equal(string.Empty,builder.Host); - Assert.Equal(0,builder.Port); - Assert.Equal(string.Empty,builder.Username); - Assert.Equal(string.Empty,builder.Password); - Assert.Equal(string.Empty,builder.Database); - Assert.Equal(TDengineConstant.ProtocolNative,builder.Protocol); + Assert.Equal(string.Empty, builder.Host); + Assert.Equal(0, builder.Port); + Assert.Equal(string.Empty, builder.Username); + Assert.Equal(string.Empty, builder.Password); + Assert.Equal(string.Empty, builder.Database); + Assert.Equal(TDengineConstant.ProtocolNative, builder.Protocol); builder.Database = "test2"; - Assert.Equal("test2",builder.Database); + Assert.Equal("test2", builder.Database); builder.Remove("db"); - Assert.Equal(string.Empty,builder.Database); + Assert.Equal(string.Empty, builder.Database); + } + + [Fact] + public void ParseWebSocket() + { + var builder = new TDengineConnectionStringBuilder( + "host=127.0.0.1;" + + "port=6041;" + + "username=root;" + + "password=taosdata;" + + "protocol=WebSocket;" + + "db=test;" + + "enableCompression=true;" + + "connTimeout=00:00:10;" + + "readTimeout=00:00:20;" + + "writeTimeout=00:00:30;" + + "timezone=UTC;" + + "useSSL=true;" + + "token=123456;" + + "autoReconnect=true;" + + "reconnectIntervalMs=10;" + + "reconnectRetryCount=5"); + Assert.Equal("127.0.0.1", builder.Host); + Assert.Equal(6041, builder.Port); + Assert.Equal("root", builder.Username); + Assert.Equal("taosdata", builder.Password); + Assert.Equal("test", builder.Database); + Assert.Equal(TDengineConstant.ProtocolWebSocket, builder.Protocol); + Assert.True(builder.EnableCompression); + Assert.Equal(10, builder.ConnTimeout.TotalSeconds); + Assert.Equal(20, builder.ReadTimeout.TotalSeconds); + Assert.Equal(30, builder.WriteTimeout.TotalSeconds); + Assert.Equal("UTC", builder.Timezone.Id); + Assert.True(builder.UseSSL); + Assert.Equal("123456", builder.Token); + Assert.True(builder.AutoReconnect); + Assert.Equal(10, builder.ReconnectIntervalMs); + Assert.Equal(5, builder.ReconnectRetryCount); } } -} +} \ No newline at end of file diff --git a/test/Driver.Test/Client/Query/Reconnect.cs b/test/Driver.Test/Client/Query/Reconnect.cs new file mode 100644 index 0000000..7c3dd1e --- /dev/null +++ b/test/Driver.Test/Client/Query/Reconnect.cs @@ -0,0 +1,208 @@ +using System; +using System.Diagnostics; +using System.Net.Http; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using TDengine.Driver; +using TDengine.Driver.Client; +using Xunit; + +namespace Driver.Test.Client.Query +{ + public partial class Client + { + private Process NewTaosAdapter(string port) + { + string exec; + if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + { + exec = "C:\\TDengine\\taosadapter.exe"; + } + else + { + exec = "taosadapter"; + } + + ProcessStartInfo startInfo = new ProcessStartInfo(exec, $"--port {port}"); + Process process = new Process { StartInfo = startInfo }; + return process; + } + + private async Task Start(Process process, string port) + { + process.Start(); + await WaitForStart(port); + } + + private void Stop(Process process) + { + if (process.HasExited) + { + return; + } + + process.Kill(); + } + + private async Task WaitForStart(string port) + { + HttpClient client = new HttpClient(); + string url = $"http://127.0.0.1:{port}/-/ping"; + bool success = await WaitForPingSuccess(client, url); + if (!success) + { + throw new Exception("Failed to start taosadapter"); + } + } + + static async Task WaitForPingSuccess(HttpClient client, string url) + { + bool success = false; + int retryCount = 20; + int retryDelayMs = 100; + + for (int i = 0; i < retryCount; i++) + { + try + { + HttpResponseMessage response = await client.GetAsync(url); + if (response.IsSuccessStatusCode) + { + success = true; + break; + } + } + catch (Exception e) + { + // ignored + } + + await Task.Delay(retryDelayMs); + } + + return success; + } + + [Fact] + public void QueryReconnect() + { + var port = "36043"; + var process = NewTaosAdapter(port); + Start(process, port).Wait(); + Thread.Sleep(1000); + var connStr = + $"protocol=WebSocket;host=localhost;port={port};useSSL=false;username=root;password=taosdata;enableCompression=true;autoReconnect=true;"; + + var builder = new ConnectionStringBuilder(connStr); + using (var client = DbDriver.Open(builder)) + { + try + { + client.Exec("drop database if exists test_query_reconnect"); + client.Exec("create database test_query_reconnect"); + client.Exec("create table test_query_reconnect.t1 (ts timestamp, a int, b float, c binary(10))"); + Stop(process); + Task.Run(() => + { + Thread.Sleep(3000); + Start(process, port).Wait(); + }); + client.Exec("insert into test_query_reconnect.t1 values (now, 1, 1.1, 'abc')"); + using (var rows = client.Query("select * from test_query_reconnect.t1")) + { + var haveNext = rows.Read(); + Assert.True(haveNext); + } + + Stop(process); + Task.Run(() => + { + Thread.Sleep(3000); + Start(process, port).Wait(); + }); + using (var rows = client.Query("select * from test_query_reconnect.t1")) + { + var haveNext = rows.Read(); + Assert.True(haveNext); + } + } + finally + { + Stop(process); + } + } + } + + [Fact] + public void SchemalessReconnect() + { + var port = "36044"; + var process = NewTaosAdapter(port); + try + { + Start(process, port).Wait(); + var connStr = + $"protocol=WebSocket;host=localhost;port={port};useSSL=false;username=root;password=taosdata;enableCompression=true;autoReconnect=true;"; + + var builder = new ConnectionStringBuilder(connStr); + using (var client = DbDriver.Open(builder)) + { + client.Exec("drop database if exists test_sml_reconnect"); + client.Exec("create database test_sml_reconnect"); + } + + var data = new string[] + { + "sys_if_bytes_out 1479496100 1.3E3 host=web01 interface=eth0", + "sys_procs_running 1479496100 42 host=web01", + }; + builder.Database = "test_sml_reconnect"; + using (var client = DbDriver.Open(builder)) + { + Stop(process); + Task.Run(() => + { + Thread.Sleep(3000); + Start(process, port).Wait(); + }); + client.SchemalessInsert(data, TDengineSchemalessProtocol.TSDB_SML_TELNET_PROTOCOL, + TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_NOT_CONFIGURED, 0, ReqId.GetReqId()); + } + } + finally + { + Stop(process); + } + } + + [Fact] + public void StmtInitReconnect() + { + var port = "36045"; + var process = NewTaosAdapter(port); + try + { + Start(process, port).Wait(); + var connStr = + $"protocol=WebSocket;host=localhost;port={port};useSSL=false;username=root;password=taosdata;enableCompression=true;autoReconnect=true;"; + + var builder = new ConnectionStringBuilder(connStr); + using (var client = DbDriver.Open(builder)) + { + Stop(process); + Task.Run(() => + { + Thread.Sleep(3000); + Start(process, port).Wait(); + }); + client.StmtInit(); + } + } + finally + { + Stop(process); + } + } + } +} \ No newline at end of file diff --git a/test/Driver.Test/Client/TMQ/Reconnect.cs b/test/Driver.Test/Client/TMQ/Reconnect.cs new file mode 100644 index 0000000..691754d --- /dev/null +++ b/test/Driver.Test/Client/TMQ/Reconnect.cs @@ -0,0 +1,194 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Net.Http; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using TDengine.Driver; +using TDengine.Driver.Client; +using TDengine.TMQ; +using Xunit; + +namespace Driver.Test.Client.TMQ +{ + public partial class Consumer + { + private Process NewTaosAdapter(string port) + { + string exec; + if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + { + exec = "C:\\TDengine\\taosadapter.exe"; + } + else + { + exec = "taosadapter"; + } + + ProcessStartInfo startInfo = new ProcessStartInfo(exec, $"--port {port}"); + Process process = new Process { StartInfo = startInfo }; + return process; + } + + private async Task Start(Process process, string port) + { + process.Start(); + await WaitForStart(port); + } + + private void Stop(Process process) + { + if (process.HasExited) + { + return; + } + + process.Kill(); + } + + private async Task WaitForStart(string port) + { + HttpClient client = new HttpClient(); + string url = $"http://127.0.0.1:{port}/-/ping"; + bool success = await WaitForPingSuccess(client, url); + if (!success) + { + throw new Exception("Failed to start taosadapter"); + } + } + + static async Task WaitForPingSuccess(HttpClient client, string url) + { + bool success = false; + int retryCount = 20; + int retryDelayMs = 100; + + for (int i = 0; i < retryCount; i++) + { + try + { + HttpResponseMessage response = await client.GetAsync(url); + if (response.IsSuccessStatusCode) + { + success = true; + break; + } + } + catch (Exception e) + { + // ignored + } + + await Task.Delay(retryDelayMs); + } + + return success; + } + + [Fact] + public void SubscribeReconnect() + { + var port = "36041"; + var process = NewTaosAdapter(port); + Start(process, port).Wait(); + try + { + var connStr = + $"protocol=WebSocket;host=127.0.0.1;port={port};useSSL=false;username=root;password=taosdata;enableCompression=true"; + var builder = new ConnectionStringBuilder(connStr); + using (var client = DbDriver.Open(builder)) + { + client.Exec("create database if not exists test_subscribe_reconnect"); + client.Exec( + "create topic if not exists topic_subscribe_reconnect as database test_subscribe_reconnect"); + } + + var cfg = new Dictionary() + { + { "td.connect.type", "WebSocket" }, + { "group.id", "test" }, + { "auto.offset.reset", "earliest" }, + { "td.connect.ip", "127.0.0.1" }, + { "td.connect.user", "root" }, + { "td.connect.pass", "taosdata" }, + { "td.connect.port", $"{port}" }, + { "client.id", "test_tmq_c" }, + { "enable.auto.commit", "false" }, + { "msg.with.table.name", "true" }, + { "useSSL", "false" }, + { "ws.autoReconnect", "true" }, + { "ws.reconnect.retry.count", "3" }, + { "ws.reconnect.interval.ms", "2000" } + }; + var consumer = new ConsumerBuilder>(cfg).Build(); + Stop(process); + Task.Run(() => + { + Thread.Sleep(3000); + Start(process, port).Wait(); + }); + consumer.Subscribe("topic_subscribe_reconnect"); + } + finally + { + Stop(process); + } + } + + [Fact] + public void ConsumeReconnect() + { + var port = "36042"; + var process = NewTaosAdapter(port); + Start(process, port).Wait(); + try + { + var connStr = + $"protocol=WebSocket;host=127.0.0.1;port={port};useSSL=false;username=root;password=taosdata;enableCompression=true"; + var builder = new ConnectionStringBuilder(connStr); + using (var client = DbDriver.Open(builder)) + { + client.Exec("create database if not exists test_consume_reconnect"); + client.Exec( + "create table if not exists test_consume_reconnect.t1 (ts timestamp, a int, b float, c binary(10))"); + client.Exec( + "create topic if not exists topic_consume_reconnect as select * from test_consume_reconnect.t1"); + client.Exec("insert into test_consume_reconnect.t1 values (now, 1, 1.1, 'abc')"); + } + + var cfg = new Dictionary() + { + { "td.connect.type", "WebSocket" }, + { "group.id", "test" }, + { "auto.offset.reset", "earliest" }, + { "td.connect.ip", "127.0.0.1" }, + { "td.connect.user", "root" }, + { "td.connect.pass", "taosdata" }, + { "td.connect.port", $"{port}" }, + { "client.id", "test_tmq_c" }, + { "enable.auto.commit", "false" }, + { "msg.with.table.name", "true" }, + { "useSSL", "false" }, + { "ws.autoReconnect", "true" }, + { "ws.reconnect.retry.count", "3" }, + { "ws.reconnect.interval.ms", "2000" } + }; + var consumer = new ConsumerBuilder>(cfg).Build(); + consumer.Subscribe("topic_consume_reconnect"); + Stop(process); + Task.Run(() => + { + Thread.Sleep(3000); + Start(process, port).Wait(); + }); + var data = consumer.Consume(1000); + Assert.NotNull(data); + } + finally + { + Stop(process); + } + } + } +} \ No newline at end of file