Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pick-6.5: Support adaptive update interval for low resolution ts (#1484) #1531

Merged
merged 10 commits into from
Jan 9, 2025
24 changes: 12 additions & 12 deletions oracle/oracles/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,10 @@ type pdOracle struct {
lastTSMap sync.Map
quit chan struct{}
// The configured interval to update the low resolution ts. Set by SetLowResolutionTimestampUpdateInterval.
// For TiDB, this is directly controlled by the system variable `tidb_low_resolution_tso_update_interval`.
// For TiDB >=v8.0.0, this is directly controlled by the system variable `tidb_low_resolution_tso_update_interval`.
// For versions before v8.0.0, this value is fixed to 2s.
lastTSUpdateInterval atomic.Int64

// The actual interval to update the low resolution ts. If the configured one is too large to satisfy the
// requirement of the stale read or snapshot read, the actual interval can be automatically set to a shorter
// value than lastTSUpdateInterval.
Expand Down Expand Up @@ -351,13 +353,6 @@ func (o *pdOracle) getLastTSWithArrivalTS(txnScope string) (*lastTSO, bool) {
return last, true
}

func max(x, y time.Duration) time.Duration {
if x > y {
return x
}
return y
}

func (o *pdOracle) nextUpdateInterval(now time.Time, requiredStaleness time.Duration) time.Duration {
o.adaptiveUpdateIntervalState.mu.Lock()
defer o.adaptiveUpdateIntervalState.mu.Unlock()
Expand Down Expand Up @@ -402,7 +397,11 @@ func (o *pdOracle) nextUpdateInterval(now time.Time, requiredStaleness time.Dura
// update interval immediately to adapt to it.
// We shrink the update interval to a value slightly lower than the requested staleness to avoid potential
// frequent shrinking operations. But there's a lower bound to prevent loading ts too frequently.
newInterval := max(requiredStaleness-adaptiveUpdateTSIntervalShrinkingPreserve, minAllowedAdaptiveUpdateTSInterval)
// newInterval := max(requiredStaleness-adaptiveUpdateTSIntervalShrinkingPreserve, minAllowedAdaptiveUpdateTSInterval)
newInterval := requiredStaleness - adaptiveUpdateTSIntervalShrinkingPreserve
if newInterval < minAllowedAdaptiveUpdateTSInterval {
newInterval = minAllowedAdaptiveUpdateTSInterval
}
return adaptiveUpdateTSIntervalStateAdapting, newInterval
}

Expand Down Expand Up @@ -486,8 +485,6 @@ func (o *pdOracle) updateTS(ctx context.Context) {
ticker := time.NewTicker(currentInterval)
defer ticker.Stop()

// Note that as `doUpdate` updates last tick time while `nextUpdateInterval` may perform calculation depending on the
// last tick time, `doUpdate` should be called after finishing calculating the next interval.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks this is part of #1502 and don't need to be removed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes

doUpdate := func(now time.Time) {
// Update the timestamp for each txnScope
o.lastTSMap.Range(func(key, _ interface{}) bool {
Expand Down Expand Up @@ -694,7 +691,10 @@ func (o *pdOracle) adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(rea
// calculation. Make sure it's always positive before passing it to the updateTS goroutine.
// Note that `nextUpdateInterval` method expects the requiredStaleness is always non-zero when triggerred
// by this path.
requiredStaleness = max(requiredStaleness, time.Millisecond)
// requiredStaleness = max(requiredStaleness, time.Millisecond)
if requiredStaleness < time.Millisecond {
requiredStaleness = time.Millisecond
}
// Try to non-blocking send a signal to notify it to change the interval immediately. But if the channel is
// busy, it means that there's another concurrent call trying to update it. Just skip it in this case.
select {
Expand Down
3 changes: 2 additions & 1 deletion oracle/oracles/pd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,9 @@
getCtx := func(index int) context.Context {
if cancelIndex == index {
return ctx
} else {

Check failure on line 338 in oracle/oracles/pd_test.go

View workflow job for this annotation

GitHub Actions / golangci

`if` block ends with a `return` statement, so drop this `else` and outdent its block (golint)
return context.Background()
}
return context.Background()
}

results = append(results, asyncValidate(getCtx(0), ts-2))
Expand Down
Loading