Skip to content

Commit

Permalink
Merge pull request #65 from taosdata/enh/xftan/TD-27500-3.0
Browse files Browse the repository at this point in the history
enh: websocket compression
  • Loading branch information
huskar-t authored Jan 24, 2024
2 parents b991896 + 7fbd021 commit 4191699
Show file tree
Hide file tree
Showing 14 changed files with 106 additions and 49 deletions.
2 changes: 2 additions & 0 deletions README-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ ConnectionStringBuilder 支持的参数如下:
* connTimeout: WebSocket 连接超时时间,仅当 protocol 为 WebSocket 时有效,默认为 1 分钟,使用 `TimeSpan.Parse` 方法解析字符串为 `TimeSpan` 对象。
* readTimeout: WebSocket 读超时时间,仅当 protocol 为 WebSocket 时有效,默认为 5 分钟,使用 `TimeSpan.Parse` 方法解析字符串为 `TimeSpan` 对象。
* writeTimeout: WebSocket 写超时时间,仅当 protocol 为 WebSocket 时有效,默认为 10 秒,使用 `TimeSpan.Parse` 方法解析字符串为 `TimeSpan` 对象。
* enableCompression: 是否启用 WebSocket 压缩(dotnet 版本 6 及以上,连接器版本 3.1.1 及以上生效),默认为 false

### 指定 URL 和 Properties 获取连接

Expand Down Expand Up @@ -747,6 +748,7 @@ consumer 支持的配置参数如下:
* auto.commit.interval.ms: 自动提交 offset 的间隔时间,默认为 5000 毫秒
* auto.offset.reset: 当 offset 不存在时,从哪里开始消费,可选值为 earliest 或 latest,默认为 latest
* msg.with.table.name: 消息是否包含表名
* ws.message.enableCompression: 是否启用 WebSocket 压缩(dotnet 版本 6 及以上,连接器版本 3.1.1 及以上生效),默认为 false

支持订阅结果集 `Dictionary<string, object>` key 为列名,value 为列值。

Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ The parameters supported by `ConnectionStringBuilder` are as follows:
* connTimeout: WebSocket connection timeout, only valid when the protocol is WebSocket, the default is 1 minute, use the `TimeSpan.Parse` method to parse the string into a `TimeSpan` object.
* readTimeout: WebSocket read timeout, only valid when the protocol is WebSocket, the default is 5 minutes, use the `TimeSpan.Parse` method to parse the string into a `TimeSpan` object.
* writeTimeout: WebSocket write timeout, only valid when the protocol is WebSocket, the default is 10 seconds, use the `TimeSpan.Parse` method to parse the string into a `TimeSpan` object.
* enableCompression: Whether to enable WebSocket compression (effective for dotnet version 6 and above, connector version 3.1.1 and above). The default is false.

### Specify the URL and Properties to get the connection

Expand Down Expand Up @@ -747,6 +748,7 @@ The configuration parameters supported by consumer are as follows:
* auto.commit.interval.ms: The interval for automatically submitting offsets, the default is 5000 milliseconds
* auto.offset.reset: When offset does not exist, where to start consumption, the optional value is earliest or latest, the default is latest
* msg.with.table.name: Whether the message contains the table name
* ws.message.enableCompression: Whether to enable WebSocket compression (effective for dotnet version 6 and above, connector version 3.1.1 and above). The default is false.

Supports subscribing to the result set `Dictionary<string, object>` where the key is the column name and the value is the column value.

Expand Down
4 changes: 3 additions & 1 deletion src/Driver/Client/Websocket/WSClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ public WSClient(ConnectionStringBuilder builder)
Debug.Assert(builder.Protocol == TDengineConstant.ProtocolWebSocket);
_tz = builder.Timezone;
_connection = new Connection(GetUrl(builder), builder.Username, builder.Password,
builder.Database, builder.ConnTimeout, builder.ReadTimeout, builder.WriteTimeout);
builder.Database, builder.ConnTimeout, builder.ReadTimeout, builder.WriteTimeout,
builder.EnableCompression);

_connection.Connect();
}
Expand Down Expand Up @@ -97,6 +98,7 @@ public long Exec(string query, long reqId)
{
_connection.FreeResult(resp.ResultId);
}

return resp.AffectedRows;
}

Expand Down
28 changes: 24 additions & 4 deletions src/Driver/ConnectionStringBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class ConnectionStringBuilder : DbConnectionStringBuilder
private const string WriteTimeoutKey = "writeTimeout";
private const string TokenKey = "token";
private const string UseSSLKey = "useSSL";
private const string EnableCompressionKey = "enableCompression";

private enum KeysEnum
{
Expand All @@ -34,7 +35,9 @@ private enum KeysEnum
ReadTimeout,
WriteTimeout,
Token,
UseSSL
UseSSL,
EnableCompression,
Total
}

private string _host = string.Empty;
Expand All @@ -49,13 +52,14 @@ private enum KeysEnum
private TimeSpan _writeTimeout = TimeSpan.Zero;
private string _token = string.Empty;
private bool _useSSL = false;
private bool _enableCompression = false;

private static readonly IReadOnlyList<string> KeysList;
private static readonly IReadOnlyDictionary<string, KeysEnum> KeysDict;

static ConnectionStringBuilder()
{
var list = new string[12];
var list = new string[(int)KeysEnum.Total];
list[(int)KeysEnum.Host] = HostKey;
list[(int)KeysEnum.Port] = PortKey;
list[(int)KeysEnum.Database] = DatabaseKey;
Expand All @@ -68,9 +72,10 @@ static ConnectionStringBuilder()
list[(int)KeysEnum.WriteTimeout] = WriteTimeoutKey;
list[(int)KeysEnum.Token] = TokenKey;
list[(int)KeysEnum.UseSSL] = UseSSLKey;
list[(int)KeysEnum.EnableCompression] = EnableCompressionKey;
KeysList = list;

KeysDict = new Dictionary<string, KeysEnum>(12, StringComparer.OrdinalIgnoreCase)
KeysDict = new Dictionary<string, KeysEnum>((int)KeysEnum.Total, StringComparer.OrdinalIgnoreCase)
{
[HostKey] = KeysEnum.Host,
[PortKey] = KeysEnum.Port,
Expand All @@ -83,7 +88,8 @@ static ConnectionStringBuilder()
[ReadTimeoutKey] = KeysEnum.ReadTimeout,
[WriteTimeoutKey] = KeysEnum.WriteTimeout,
[TokenKey] = KeysEnum.Token,
[UseSSLKey] = KeysEnum.UseSSL
[UseSSLKey] = KeysEnum.UseSSL,
[EnableCompressionKey] = KeysEnum.EnableCompression
};
}

Expand Down Expand Up @@ -145,6 +151,9 @@ public ConnectionStringBuilder(string connectionString)
case KeysEnum.UseSSL:
UseSSL = Convert.ToBoolean(value);
break;
case KeysEnum.EnableCompression:
EnableCompression = Convert.ToBoolean(value);
break;
default:
throw new ArgumentOutOfRangeException(nameof(index), index, "get value error");
}
Expand Down Expand Up @@ -230,6 +239,12 @@ public bool UseSSL
get => _useSSL;
set => base[UseSSLKey] = _useSSL = value;
}

public bool EnableCompression
{
get => _enableCompression;
set => base[EnableCompressionKey] = _enableCompression = value;
}

public override ICollection Keys => new ReadOnlyCollection<string>((string[])KeysList);

Expand Down Expand Up @@ -275,6 +290,8 @@ private object GetAt(KeysEnum index)
return Token;
case KeysEnum.UseSSL:
return UseSSL;
case KeysEnum.EnableCompression:
return EnableCompression;
default:
throw new ArgumentOutOfRangeException(nameof(index), index, "get value error");
}
Expand Down Expand Up @@ -334,6 +351,9 @@ private void Reset(KeysEnum index)
case KeysEnum.UseSSL:
_useSSL = false;
return;
case KeysEnum.EnableCompression:
_enableCompression = false;
return;
default:
throw new ArgumentOutOfRangeException(nameof(index), index, null);
}
Expand Down
19 changes: 16 additions & 3 deletions src/Driver/Impl/WebSocketMethods/BaseConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,22 @@ public class BaseConnection
private readonly TimeSpan _defaultWriteTimeout = TimeSpan.FromSeconds(10);

protected BaseConnection(string addr, TimeSpan connectTimeout = default,
TimeSpan readTimeout = default, TimeSpan writeTimeout = default)
TimeSpan readTimeout = default, TimeSpan writeTimeout = default, bool enableCompression = false)
{
_client = new ClientWebSocket();
_client.Options.KeepAliveInterval = TimeSpan.FromSeconds(30);

#if NET6_0_OR_GREATER
if (enableCompression)
{
_client.Options.DangerousDeflateOptions = new WebSocketDeflateOptions()
{
ClientMaxWindowBits = 15, // Default value
ServerMaxWindowBits = 15, // Default value
ClientContextTakeover = true, // Default value
ServerContextTakeover = true // Default value
};
}
#endif
if (connectTimeout == default)
{
connectTimeout = _defaultConnTimeout;
Expand Down Expand Up @@ -105,8 +116,10 @@ protected T2 SendJsonBackJson<T1, T2>(string action, T1 req) where T2 : IWSBaseR
// Console.WriteLine(Encoding.UTF8.GetString(respBytes));
if (resp.Action != action)
{
throw new TDengineError(-1, $"receive unexpected action {resp.Action},req:{reqStr}",Encoding.UTF8.GetString(respBytes));
throw new TDengineError(-1, $"receive unexpected action {resp.Action},req:{reqStr}",
Encoding.UTF8.GetString(respBytes));
}

if (resp.Code == 0) return resp;
throw new TDengineError(resp.Code, resp.Message);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Driver/Impl/WebSocketMethods/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public partial class Connection : BaseConnection
private readonly string _db = string.Empty;

public Connection(string addr, string user, string password, string db, TimeSpan connectTimeout = default,
TimeSpan readTimeout = default, TimeSpan writeTimeout = default) : base(addr, connectTimeout, readTimeout, writeTimeout)
TimeSpan readTimeout = default, TimeSpan writeTimeout = default, bool enableCompression = false) : base(addr, connectTimeout, readTimeout, writeTimeout,enableCompression)
{
_user = user;
_password = password;
Expand Down
22 changes: 14 additions & 8 deletions src/Driver/Impl/WebSocketMethods/TMQ.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ namespace TDengine.Driver.Impl.WebSocketMethods
public class TMQConnection : BaseConnection
{
public TMQConnection(TMQOptions options, TimeSpan connectTimeout = default,
TimeSpan readTimeout = default, TimeSpan writeTimeout = default) : base(GetUrl(options), connectTimeout, readTimeout,
writeTimeout)
TimeSpan readTimeout = default, TimeSpan writeTimeout = default) : base(
GetUrl(options), connectTimeout, readTimeout,
writeTimeout, options.TDEnableCompression == "true")
{
}

Expand Down Expand Up @@ -41,7 +42,7 @@ private static string GetUrl(TMQOptions options)
return $"{schema}://{options.TDConnectIp}:{port}/rest/tmq?token={options.TDToken}";
}
}

public WSTMQSubscribeResp Subscribe(List<string> topics, TMQOptions options)
{
return Subscribe(_GetReqId(), topics, options);
Expand Down Expand Up @@ -153,8 +154,9 @@ public WSTMQGetTopicAssignmentResp Assignment(ulong reqId, string topic)

public WSTMQOffsetSeekResp Seek(string topic, int vgroupId, long offset)
{
return Seek(_GetReqId(), topic,vgroupId, offset);
return Seek(_GetReqId(), topic, vgroupId, offset);
}

public WSTMQOffsetSeekResp Seek(ulong reqId, string topic, int vgroupId, long offset)
{
return SendJsonBackJson<WSTMQOffsetSeekReq, WSTMQOffsetSeekResp>(WSTMQAction.TMQSeek,
Expand Down Expand Up @@ -186,8 +188,9 @@ public WSTMQCommitOffsetResp CommitOffset(ulong reqId, string topic, int vgroupI

public WSTMQCommittedResp Committed(List<WSTopicVgroupId> tvIds)
{
return Committed(_GetReqId(),tvIds);
return Committed(_GetReqId(), tvIds);
}

public WSTMQCommittedResp Committed(ulong reqId, List<WSTopicVgroupId> tvIds)
{
return SendJsonBackJson<WSTMQCommittedReq, WSTMQCommittedResp>(WSTMQAction.TMQCommitted,
Expand All @@ -202,6 +205,7 @@ public WSTMQPositionResp Position(List<WSTopicVgroupId> tvIds)
{
return Position(_GetReqId(), tvIds);
}

public WSTMQPositionResp Position(ulong reqId, List<WSTopicVgroupId> tvIds)
{
return SendJsonBackJson<WSTMQPositionReq, WSTMQPositionResp>(WSTMQAction.TMQPosition,
Expand Down Expand Up @@ -243,10 +247,12 @@ public class TMQOptions
public string MsgWithTableName => Get("msg.with.table.name");

public string TDConnectIp => Get("td.connect.ip");

public string TDUseSSL => Get("useSSL");

public string TDToken => Get("token");

public string TDEnableCompression => Get("ws.message.enableCompression");

public string TDConnectUser => Get("td.connect.user");

Expand All @@ -255,7 +261,7 @@ public class TMQOptions
public string TDConnectPort => Get("td.connect.port");

public string TDDatabase => Get("td.connect.db");

public string TDConnectType => Get("td.connect.type");

public TMQOptions(IEnumerable<KeyValuePair<string, string>> config)
Expand Down
6 changes: 6 additions & 0 deletions src/TMQ/ConsumerConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ public string TDToken
get => Get("token");
set => this.SetObject("token", value);
}

public string TDEnableCompression
{
get => Get("ws.message.enableCompression");
set => this.SetObject("ws.message.enableCompression", value);
}

public string TDConnectUser
{
Expand Down
12 changes: 6 additions & 6 deletions test/Data.Tests/TDengineDataReaderTesting.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void CommonExec()
Assert.Equal((sbyte)7, (sbyte)reader.GetByte(7));

Assert.Equal(true, reader.GetValue(8));
Assert.Equal(true, reader.GetBoolean(8));
Assert.True(reader.GetBoolean(8));

Assert.Equal("9nchar", reader.GetValue(9));
Assert.Equal("9nchar", reader.GetString(9));
Expand Down Expand Up @@ -255,7 +255,7 @@ public void Statement()
Assert.Equal((sbyte)7, (sbyte)reader.GetByte(7));

Assert.Equal(true, reader.GetValue(8));
Assert.Equal(true, reader.GetBoolean(8));
Assert.True(reader.GetBoolean(8));

Assert.Equal("9nchar", reader.GetValue(9));
Assert.Equal("9nchar", reader.GetString(9));
Expand Down Expand Up @@ -385,7 +385,7 @@ public void StatementNano()
Assert.Equal((sbyte)7, (sbyte)reader.GetByte(7));

Assert.Equal(true, reader.GetValue(8));
Assert.Equal(true, reader.GetBoolean(8));
Assert.True(reader.GetBoolean(8));

Assert.Equal("9nchar", reader.GetValue(9));
Assert.Equal("9nchar", reader.GetString(9));
Expand Down Expand Up @@ -469,7 +469,7 @@ public void WSCommonExec()
Assert.Equal((sbyte)7, (sbyte)reader.GetByte(7));

Assert.Equal(true, reader.GetValue(8));
Assert.Equal(true, reader.GetBoolean(8));
Assert.True(reader.GetBoolean(8));

Assert.Equal("9nchar", reader.GetValue(9));
Assert.Equal("9nchar", reader.GetString(9));
Expand Down Expand Up @@ -598,7 +598,7 @@ public void WSStatement()
Assert.Equal((sbyte)7, (sbyte)reader.GetByte(7));

Assert.Equal(true, reader.GetValue(8));
Assert.Equal(true, reader.GetBoolean(8));
Assert.True(reader.GetBoolean(8));

Assert.Equal("9nchar", reader.GetValue(9));
Assert.Equal("9nchar", reader.GetString(9));
Expand Down Expand Up @@ -728,7 +728,7 @@ public void WSStatementNano()
Assert.Equal((sbyte)7, (sbyte)reader.GetByte(7));

Assert.Equal(true, reader.GetValue(8));
Assert.Equal(true, reader.GetBoolean(8));
Assert.True(reader.GetBoolean(8));

Assert.Equal("9nchar", reader.GetValue(9));
Assert.Equal("9nchar", reader.GetString(9));
Expand Down
14 changes: 7 additions & 7 deletions test/Driver.Test/Client/Native/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,9 @@ public void TDengineNativeStmtTest()
Assert.True(isInsert);
stmt.SetTableName("t1");
stmt.SetTags(new object[] { "{\"a\":\"b\"}" });
stmt.BindRow(new Object[]
stmt.BindRow(new object[]
{ now, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, "test_binary", "test_nchar" });
stmt.BindRow(new Object[]
stmt.BindRow(new object?[]
{ nextSecond, null, null, null, null, null, null, null, null, null, null, null, null, null });
stmt.AddBatch();
stmt.Exec();
Expand Down Expand Up @@ -494,9 +494,9 @@ public void TDengineNativeStmtNanoTest()
var tagFields = stmt.GetTagFields();
Assert.Single(tagFields);
stmt.SetTags(new object[] { "{\"a\":\"b\"}" });
stmt.BindRow(new Object[]
stmt.BindRow(new object[]
{ now, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, "test_binary", "test_nchar" });
stmt.BindRow(new Object[]
stmt.BindRow(new object?[]
{ nextSecond, null, null, null, null, null, null, null, null, null, null, null, null, null });
stmt.AddBatch();
stmt.Exec();
Expand Down Expand Up @@ -629,9 +629,9 @@ public void TDengineNativeStmtNanoReqTest()
Assert.True(isInsert);
stmt.SetTableName("t1");
stmt.SetTags(new object[] { "{\"a\":\"b\"}" });
stmt.BindRow(new Object[]
stmt.BindRow(new object[]
{ now, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, "test_binary", "test_nchar" });
stmt.BindRow(new Object[]
stmt.BindRow(new object?[]
{ nextSecond, null, null, null, null, null, null, null, null, null, null, null, null, null });
stmt.AddBatch();
stmt.Exec();
Expand Down Expand Up @@ -792,7 +792,7 @@ public void TDengineNativeStmtBindColumnsTest()
var a9 = new ulong?[] { v9, null, vv9 };
var a10 = new float?[] { v10, null, vv10 };
var a11 = new double?[]{v11,null,vv11};
stmt.BindColumn(fields,new DateTime[]{now,nextSecond,next2Second},a1,a2,a3,a4,a5,a6,a7,a8,a9,a10,a11,new string[]{ "test_binary",null,"b中文b"},new string[]{ "test_nchar",null,"n中文n"});
stmt.BindColumn(fields,new DateTime[]{now,nextSecond,next2Second},a1,a2,a3,a4,a5,a6,a7,a8,a9,a10,a11,new string?[]{ "test_binary",null,"b中文b"},new string?[]{ "test_nchar",null,"n中文n"});
stmt.AddBatch();
stmt.Exec();
var affected = stmt.Affected();
Expand Down
Loading

0 comments on commit 4191699

Please sign in to comment.