Skip to content

Commit

Permalink
deleting comments
Browse files Browse the repository at this point in the history
  • Loading branch information
thejoeker12 committed Jul 10, 2024
1 parent c92ef82 commit 508db5c
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 39 deletions.
2 changes: 1 addition & 1 deletion concurrency/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,6 @@ const (

debounceScaleDownThreshold = 5 // Number of consecutive triggers before scaling down

//
// TODO this comment
AcceptableAverageResponseTime = 100 * time.Millisecond
)
16 changes: 7 additions & 9 deletions concurrency/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,39 +215,37 @@ func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.D
ch.Metrics.ResponseTimeVariability.Lock()
defer ch.Metrics.ResponseTimeVariability.Unlock()

responseTimesLock.Lock() // Ensure safe concurrent access
responseTimesLock.Lock()
responseTimes = append(responseTimes, responseTime)
if len(responseTimes) > 10 {
responseTimes = responseTimes[1:] // Maintain last 10 measurements
responseTimes = responseTimes[1:]
}
responseTimesLock.Unlock()

stdDev := calculateStdDev(responseTimes)
averageResponseTime := calculateAverage(responseTimes)

// Check if conditions suggest a need to scale down
if stdDev > ch.Metrics.ResponseTimeVariability.StdDevThreshold && averageResponseTime > AcceptableAverageResponseTime {
ch.Metrics.ResponseTimeVariability.DebounceScaleDownCount++
if ch.Metrics.ResponseTimeVariability.DebounceScaleDownCount >= debounceScaleDownThreshold {
ch.Metrics.ResponseTimeVariability.DebounceScaleDownCount = 0
return -1 // Suggest decrease concurrency
return -1
}
} else {
ch.Metrics.ResponseTimeVariability.DebounceScaleDownCount = 0 // Reset counter if conditions are not met
ch.Metrics.ResponseTimeVariability.DebounceScaleDownCount = 0
}

// Check if conditions suggest a need to scale up
if stdDev <= ch.Metrics.ResponseTimeVariability.StdDevThreshold && averageResponseTime <= AcceptableAverageResponseTime {
ch.Metrics.ResponseTimeVariability.DebounceScaleUpCount++
if ch.Metrics.ResponseTimeVariability.DebounceScaleUpCount >= debounceScaleDownThreshold {
ch.Metrics.ResponseTimeVariability.DebounceScaleUpCount = 0
return 1 // Suggest increase concurrency
return 1
}
} else {
ch.Metrics.ResponseTimeVariability.DebounceScaleUpCount = 0 // Reset counter if conditions are not met
ch.Metrics.ResponseTimeVariability.DebounceScaleUpCount = 0
}

return 0 // Default to no change
return 0
}

// calculateAverage computes the average response time from a slice of time.Duration values.
Expand Down
7 changes: 0 additions & 7 deletions concurrency/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ import "go.uber.org/zap"

// ScaleDown reduces the concurrency level by one, down to the minimum limit.
func (ch *ConcurrencyHandler) ScaleDown() {
// Lock to ensure thread safety
ch.Lock()
defer ch.Unlock()

// We must consider the capacity rather than the length of the semaphore channel
currentSize := cap(ch.sem)
if currentSize > MinConcurrency {
newSize := currentSize - 1
Expand All @@ -22,7 +20,6 @@ func (ch *ConcurrencyHandler) ScaleDown() {

// ScaleUp increases the concurrency level by one, up to the maximum limit.
func (ch *ConcurrencyHandler) ScaleUp() {
// Lock to ensure thread safety
ch.Lock()
defer ch.Unlock()

Expand All @@ -49,19 +46,15 @@ func (ch *ConcurrencyHandler) ScaleUp() {
func (ch *ConcurrencyHandler) ResizeSemaphore(newSize int) {
newSem := make(chan struct{}, newSize)

// Transfer tokens from the old semaphore to the new one.
for {
select {
case token := <-ch.sem:
select {
case newSem <- token:
// Token transferred to new semaphore.
default:
// New semaphore is full, put token back to the old one to allow ongoing operations to complete.
ch.sem <- token
}
default:
// No more tokens to transfer.
close(ch.sem)
ch.sem = newSem
return
Expand Down
30 changes: 8 additions & 22 deletions concurrency/semaphore.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,28 +33,21 @@ import (
// concurrency tracking.
func (ch *ConcurrencyHandler) AcquireConcurrencyPermit(ctx context.Context) (context.Context, uuid.UUID, error) {
log := ch.logger

// Start measuring the permit acquisition time.
tokenAcquisitionStart := time.Now()

// Generate a unique request ID for this permit acquisition.
requestID := uuid.New()

// Create a new context with a specified timeout for acquiring the permit.
ctxWithTimeout, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel() // Ensure to free up resources by cancelling the context after use.
defer cancel()

select {
case ch.sem <- struct{}{}: // permit acquisition was successful.
// Record the time taken to acquire the permit.
case ch.sem <- struct{}{}:
tokenAcquisitionDuration := time.Since(tokenAcquisitionStart)
ch.trackResourceAcquisition(tokenAcquisitionDuration, requestID) // Track and log metrics.
ch.trackResourceAcquisition(tokenAcquisitionDuration, requestID)

// Create a new context that includes the unique request ID.
ctxWithRequestID := context.WithValue(ctx, RequestIDKey{}, requestID)
return ctxWithRequestID, requestID, nil

case <-ctxWithTimeout.Done(): // Timeout occurred before a permit could be acquired.
case <-ctxWithTimeout.Done():
log.Error("Failed to acquire concurrency permit", zap.Error(ctxWithTimeout.Err()))
return ctx, requestID, ctxWithTimeout.Err()
}
Expand All @@ -74,14 +67,12 @@ func (ch *ConcurrencyHandler) trackResourceAcquisition(duration time.Duration, r
ch.Lock()
defer ch.Unlock()

// Record the time taken to acquire the permit and update related metrics.
ch.AcquisitionTimes = append(ch.AcquisitionTimes, duration)
ch.Metrics.Lock()
ch.Metrics.PermitWaitTime += duration
ch.Metrics.TotalRequests++ // Increment the count of total requests handled.
ch.Metrics.TotalRequests++
ch.Metrics.Unlock()

// Calculate and log the current state of permit utilization.
utilizedPermits := len(ch.sem)
availablePermits := cap(ch.sem) - utilizedPermits
ch.logger.Debug("Resource acquired", zap.String("RequestID", requestID.String()), zap.Duration("Duration", duration), zap.Int("UtilizedPermits", utilizedPermits), zap.Int("AvailablePermits", availablePermits))
Expand All @@ -105,28 +96,23 @@ func (ch *ConcurrencyHandler) trackResourceAcquisition(duration time.Duration, r
// This usage ensures that the permit is released in a deferred manner at the end of the operation, regardless of
// how the operation exits (normal completion or error path).
func (ch *ConcurrencyHandler) ReleaseConcurrencyPermit(requestID uuid.UUID) {
// Safely remove a permit from the semaphore to make it available for other operations.
select {
case <-ch.sem:
// Continue to process after successfully retrieving a permit from the semaphore.
default:
// Log an error if no permit was available to release, indicating a potential synchronization issue.
ch.logger.Error("Attempted to release a non-existent concurrency permit", zap.String("RequestID", requestID.String()))
return
}

ch.Lock()
defer ch.Unlock()

// Update metrics related to permit release.
ch.Metrics.Lock()
ch.Metrics.TotalRequests-- // Decrement the count of total requests handled, if applicable.
ch.Metrics.TotalRequests--
ch.Metrics.Unlock()

utilizedPermits := len(ch.sem) // Calculate tokens currently in use.
availablePermits := cap(ch.sem) - utilizedPermits // Calculate tokens that are available for use.
utilizedPermits := len(ch.sem)
availablePermits := cap(ch.sem) - utilizedPermits

// Log the release of the concurrency permit for auditing and debugging purposes.
ch.logger.Debug("Released concurrency permit",
zap.String("RequestID", requestID.String()),
zap.Int("UtilizedPermits", utilizedPermits),
Expand Down

0 comments on commit 508db5c

Please sign in to comment.