Skip to content

Commit

Permalink
Merge pull request #74 from taosdata/enh/xftan/TS-4194
Browse files Browse the repository at this point in the history
enh: tmq commit without messageId
  • Loading branch information
huskar-t authored Apr 22, 2024
2 parents e7782e9 + 34a2148 commit 2b15661
Show file tree
Hide file tree
Showing 8 changed files with 237 additions and 47 deletions.
68 changes: 36 additions & 32 deletions .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ on:
- 'main'
- '3.0'
- '3.1'

push:
branches:
- 'main'
Expand All @@ -29,23 +29,23 @@ jobs:
steps:
- name: checkout TDengine by pr
if: github.event_name == 'pull_request'
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
repository: 'taosdata/TDengine'
path: 'TDengine'
ref: ${{ github.base_ref }}

- name: checkout TDengine by push
if: github.event_name == 'push'
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
repository: 'taosdata/TDengine'
path: 'TDengine'
ref: ${{ github.ref_name }}

- name: checkout TDengine manually
if: github.event_name == 'workflow_dispatch'
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
repository: 'taosdata/TDengine'
path: 'TDengine'
Expand All @@ -60,34 +60,27 @@ jobs:
- name: Cache server by pr
if: github.event_name == 'pull_request'
id: cache-server-pr
uses: actions/cache@v3
uses: actions/cache@v4
with:
path: server.tar.gz
key: ${{ runner.os }}-build-${{ github.base_ref }}-${{ steps.get_commit_id.outputs.commit_id }}

- name: Cache server by push
if: github.event_name == 'push'
id: cache-server-push
uses: actions/cache@v3
uses: actions/cache@v4
with:
path: server.tar.gz
key: ${{ runner.os }}-build-${{ github.ref_name }}-${{ steps.get_commit_id.outputs.commit_id }}

- name: Cache server manually
if: github.event_name == 'workflow_dispatch'
id: cache-server-manually
uses: actions/cache@v3
uses: actions/cache@v4
with:
path: server.tar.gz
key: ${{ runner.os }}-build-${{ inputs.tbBranch }}-${{ steps.get_commit_id.outputs.commit_id }}

- name: prepare install
if: >
(github.event_name == 'workflow_dispatch' && steps.cache-server-manually.outputs.cache-hit != 'true') ||
(github.event_name == 'pull_request' && steps.cache-server-pr.outputs.cache-hit != 'true') ||
(github.event_name == 'push' && steps.cache-server-push.outputs.cache-hit != 'true')
run: sudo apt install -y libgeos-dev

- name: install TDengine
if: >
(github.event_name == 'workflow_dispatch' && steps.cache-server-manually.outputs.cache-hit != 'true') ||
Expand Down Expand Up @@ -137,12 +130,12 @@ jobs:
name: Build C# ${{ matrix.dotnet }}
steps:
- name: Setup dotnet
uses: actions/setup-dotnet@v3
uses: actions/setup-dotnet@v4
with:
dotnet-version: ${{ matrix.dotnet }}

- name: checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
path: 'csharp-driver'

Expand All @@ -152,7 +145,7 @@ jobs:
echo `pwd`
ls -al
dotnet restore
- name: Build C# Driver
run: |
cd csharp-driver
Expand All @@ -171,17 +164,17 @@ jobs:
- name: get cache server by pr
if: github.event_name == 'pull_request'
id: get-cache-server-pr
uses: actions/cache@v3
uses: actions/cache@v4
with:
path: server.tar.gz
key: ${{ runner.os }}-build-${{ github.base_ref }}-${{ needs.build.outputs.commit_id }}
restore-keys: |
${{ runner.os }}-build-${{ github.base_ref }}-
- name: get cache server by push
if: github.event_name == 'push'
id: get-cache-server-push
uses: actions/cache@v3
uses: actions/cache@v4
with:
path: server.tar.gz
key: ${{ runner.os }}-build-${{ github.ref_name }}-${{ needs.build.outputs.commit_id }}
Expand All @@ -191,28 +184,31 @@ jobs:
- name: get cache server manually
if: github.event_name == 'workflow_dispatch'
id: get-cache-server-manually
uses: actions/cache@v3
uses: actions/cache@v4
with:
path: server.tar.gz
key: ${{ runner.os }}-build-${{ inputs.tbBranch }}-${{ needs.build.outputs.commit_id }}
restore-keys: |
${{ runner.os }}-build-${{ inputs.tbBranch }}-
- name: prepare install
run: sudo apt install -y libgeos-dev

- name: install
run: |
tar -zxvf server.tar.gz
cd release && sudo sh install.sh
- name: checkout
uses: actions/checkout@v4

- name: copy taos cfg
run: |
sudo mkdir -p /etc/taos
sudo cp ./.github/workflows/taos.cfg /etc/taos/taos.cfg
- name: Setup dotnet
uses: actions/setup-dotnet@v3
uses: actions/setup-dotnet@v4
with:
dotnet-version: ${{ matrix.dotnet }}

- name: checkout
uses: actions/checkout@v3

- name: Restore dependencies
run: |
Expand All @@ -227,25 +223,33 @@ jobs:
- name: start taosd
run: nohup sudo sh ./start.sh &

- name: start taosadapter
run: sudo taosadapter &

- name: test
id: test
run: |
sudo dotnet test --logger "console;verbosity=detailed" --collect:"XPlat Code Coverage" --results-directory:./testresults
timeout-minutes: 15

- uses: actions/upload-artifact@v3
if: always() && (steps.test.outcome == 'failure' || steps.test.outcome == 'cancelled')
with:
name: ${{ runner.os }}-${{ matrix.go }}-log
path: /var/log/taos/

- name: get coverage files
id: get_coverage_files
run: |
COVERAGE_FILES=$(find ./testresults -name 'coverage.cobertura.xml' -exec printf "%s;" {} +)
COVERAGE_FILES=${COVERAGE_FILES%;}
echo "files=${COVERAGE_FILES}" >> $GITHUB_OUTPUT
- name: Upload coverage to Codecov
if: ${{ matrix.go }} == '6.0.x'
uses: codecov/codecov-action@v4-beta
with:
files: ${{ steps.get_coverage_files.outputs.files }}
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_ORG_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_ORG_TOKEN }}
5 changes: 5 additions & 0 deletions .github/workflows/taos.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
fqdn localhost
firstEp localhost:6030
asyncLog 0
debugFlag 143
supportVnodes 256
2 changes: 0 additions & 2 deletions src/Driver/Impl/WebSocketMethods/Protocol/WSTMQCommitReq.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,5 @@ namespace TDengine.Driver.Impl.WebSocketMethods.Protocol
public class WSTMQCommitReq
{
[JsonProperty("req_id")] public ulong ReqId { get; set; }

[JsonProperty("message_id")] public ulong MessageId { get; set; }
}
}
11 changes: 5 additions & 6 deletions src/Driver/Impl/WebSocketMethods/TMQ.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public WSTMQSubscribeResp Subscribe(ulong reqId, List<string> topics, TMQOptions
ClientId = options.ClientId,
OffsetRest = options.AutoOffsetReset,
Topics = topics,
AutoCommit = options.EnableAutoCommit,
AutoCommit = "false",
AutoCommitIntervalMs = options.AutoCommitIntervalMs,
WithTableName = options.MsgWithTableName
});
Expand Down Expand Up @@ -109,17 +109,16 @@ public byte[] FetchBlock(ulong reqId, ulong messageId)
});
}

public WSTMQCommitResp Commit(ulong messageId)
public WSTMQCommitResp Commit()
{
return Commit(_GetReqId(), messageId);
return Commit(_GetReqId());
}

public WSTMQCommitResp Commit(ulong reqId, ulong messageId)
public WSTMQCommitResp Commit(ulong reqId)
{
return SendJsonBackJson<WSTMQCommitReq, WSTMQCommitResp>(WSTMQAction.TMQCommit, new WSTMQCommitReq
{
ReqId = reqId,
MessageId = messageId
});
}

Expand Down Expand Up @@ -251,7 +250,7 @@ public class TMQOptions
public string TDUseSSL => Get("useSSL");

public string TDToken => Get("token");

public string TDEnableCompression => Get("ws.message.enableCompression");

public string TDConnectUser => Get("td.connect.user");
Expand Down
37 changes: 33 additions & 4 deletions src/TMQ/WebSocket/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ public class Consumer<TValue> : IConsumer<TValue>
{
private readonly TMQOptions _options;
private readonly TMQConnection _connection;
private ulong _lastMessageId;
private readonly bool _autoCommit;
private readonly int _autoCommitInterval;
private DateTime _nextCommitTime;

private IDeserializer<TValue> valueDeserializer;

Expand All @@ -38,10 +40,38 @@ public Consumer(ConsumerBuilder<TValue> builder)
{
this.valueDeserializer = builder.ValueDeserializer;
}

if (_options.EnableAutoCommit != "true") return;
_autoCommit = true;
if (!string.IsNullOrEmpty(_options.AutoCommitIntervalMs))
{
try
{
_autoCommitInterval = int.Parse(_options.AutoCommitIntervalMs);
}
catch (Exception e)
{
throw new ArgumentException("Invalid auto commit interval", e);
}
}
else
{
_autoCommitInterval = 5000;
}
}

public ConsumeResult<TValue> Consume(int millisecondsTimeout)
{
if (_autoCommit)
{
var now = DateTime.Now;
if (now >= _nextCommitTime)
{
Commit();
_nextCommitTime = now.AddMilliseconds(_autoCommitInterval);
}
}

var resp = _connection.Poll(millisecondsTimeout);
if (!resp.HaveMessage)
{
Expand All @@ -50,7 +80,6 @@ public ConsumeResult<TValue> Consume(int millisecondsTimeout)

var consumeResult = new ConsumeResult<TValue>(resp.MessageId, resp.Topic, resp.VgroupId, resp.Offset,
(TMQ_RES)resp.MessageType);
_lastMessageId = resp.MessageId;
if (!NeedGetData((TMQ_RES)resp.MessageType)) return null;
var result = new TMQWSRows(resp, _connection, TimeZoneInfo.Local);
while (result.Read())
Expand Down Expand Up @@ -104,12 +133,12 @@ public void Unsubscribe()

public void Commit(ConsumeResult<TValue> consumerResult)
{
_connection.Commit(consumerResult.MessageId);
_connection.CommitOffset(consumerResult.Topic, consumerResult.Partition, consumerResult.Offset);
}

public List<TopicPartitionOffset> Commit()
{
_connection.Commit(_lastMessageId);
_connection.Commit();
return Committed(TimeSpan.Zero);
}

Expand Down
Loading

0 comments on commit 2b15661

Please sign in to comment.