diff --git a/concurrency/constants.go b/concurrency/constants.go
index 9fb9838..06d5ce7 100644
--- a/concurrency/constants.go
+++ b/concurrency/constants.go
@@ -57,6 +57,6 @@ const (
 
 	debounceScaleDownThreshold = 5 // Number of consecutive triggers before scaling down
 
-	//
+	// AcceptableAverageResponseTime is the average response time above which scaling down is considered.
 	AcceptableAverageResponseTime = 100 * time.Millisecond
 )
diff --git a/concurrency/metrics.go b/concurrency/metrics.go
index 0e9d18f..fe39551 100644
--- a/concurrency/metrics.go
+++ b/concurrency/metrics.go
@@ -215,39 +215,37 @@ func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.D
 	ch.Metrics.ResponseTimeVariability.Lock()
 	defer ch.Metrics.ResponseTimeVariability.Unlock()
 
-	responseTimesLock.Lock() // Ensure safe concurrent access
+	responseTimesLock.Lock()
 	responseTimes = append(responseTimes, responseTime)
 	if len(responseTimes) > 10 {
-		responseTimes = responseTimes[1:] // Maintain last 10 measurements
+		responseTimes = responseTimes[1:]
 	}
 	responseTimesLock.Unlock()
 
 	stdDev := calculateStdDev(responseTimes)
 	averageResponseTime := calculateAverage(responseTimes)
 
-	// Check if conditions suggest a need to scale down
 	if stdDev > ch.Metrics.ResponseTimeVariability.StdDevThreshold && averageResponseTime > AcceptableAverageResponseTime {
 		ch.Metrics.ResponseTimeVariability.DebounceScaleDownCount++
 		if ch.Metrics.ResponseTimeVariability.DebounceScaleDownCount >= debounceScaleDownThreshold {
 			ch.Metrics.ResponseTimeVariability.DebounceScaleDownCount = 0
-			return -1 // Suggest decrease concurrency
+			return -1
 		}
 	} else {
-		ch.Metrics.ResponseTimeVariability.DebounceScaleDownCount = 0 // Reset counter if conditions are not met
+		ch.Metrics.ResponseTimeVariability.DebounceScaleDownCount = 0
 	}
 
-	// Check if conditions suggest a need to scale up
 	if stdDev <= ch.Metrics.ResponseTimeVariability.StdDevThreshold && averageResponseTime <= AcceptableAverageResponseTime {
 		ch.Metrics.ResponseTimeVariability.DebounceScaleUpCount++
 		if ch.Metrics.ResponseTimeVariability.DebounceScaleUpCount >= debounceScaleDownThreshold {
 			ch.Metrics.ResponseTimeVariability.DebounceScaleUpCount = 0
-			return 1 // Suggest increase concurrency
+			return 1
 		}
 	} else {
-		ch.Metrics.ResponseTimeVariability.DebounceScaleUpCount = 0 // Reset counter if conditions are not met
+		ch.Metrics.ResponseTimeVariability.DebounceScaleUpCount = 0
 	}
 
-	return 0 // Default to no change
+	return 0
 }
 
 // calculateAverage computes the average response time from a slice of time.Duration values.
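For context on how the -1/0/+1 value returned by MonitorResponseTimeVariability is meant to be consumed, a minimal sketch of a caller wiring it to ScaleDown/ScaleUp follows; the EvaluateAndAdjustConcurrency name and the wiring itself are assumptions for illustration, not part of this change.

package concurrency

import "time"

// EvaluateAndAdjustConcurrency is a hypothetical caller (not in this diff): it feeds
// each observed response time into the variability monitor and acts on its suggestion.
func (ch *ConcurrencyHandler) EvaluateAndAdjustConcurrency(responseTime time.Duration) {
	switch ch.MonitorResponseTimeVariability(responseTime) {
	case -1:
		ch.ScaleDown() // high std dev and slow averages sustained for debounceScaleDownThreshold checks
	case 1:
		ch.ScaleUp() // stable, fast responses sustained for the same number of checks
	default:
		// 0: still inside the debounce window or signals are mixed; leave concurrency unchanged
	}
}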
diff --git a/concurrency/scale.go b/concurrency/scale.go
index ee8b05a..b03a20a 100644
--- a/concurrency/scale.go
+++ b/concurrency/scale.go
@@ -5,11 +5,9 @@ import "go.uber.org/zap"
 
 // ScaleDown reduces the concurrency level by one, down to the minimum limit.
 func (ch *ConcurrencyHandler) ScaleDown() {
-	// Lock to ensure thread safety
 	ch.Lock()
 	defer ch.Unlock()
 
-	// We must consider the capacity rather than the length of the semaphore channel
 	currentSize := cap(ch.sem)
 	if currentSize > MinConcurrency {
 		newSize := currentSize - 1
@@ -22,7 +20,6 @@ func (ch *ConcurrencyHandler) ScaleDown() {
 
 // ScaleUp increases the concurrency level by one, up to the maximum limit.
 func (ch *ConcurrencyHandler) ScaleUp() {
-	// Lock to ensure thread safety
 	ch.Lock()
 	defer ch.Unlock()
 
@@ -49,19 +46,15 @@ func (ch *ConcurrencyHandler) ScaleUp() {
 func (ch *ConcurrencyHandler) ResizeSemaphore(newSize int) {
 	newSem := make(chan struct{}, newSize)
 
-	// Transfer tokens from the old semaphore to the new one.
 	for {
 		select {
 		case token := <-ch.sem:
 			select {
 			case newSem <- token:
-				// Token transferred to new semaphore.
 			default:
-				// New semaphore is full, put token back to the old one to allow ongoing operations to complete.
 				ch.sem <- token
 			}
 		default:
-			// No more tokens to transfer.
 			close(ch.sem)
 			ch.sem = newSem
 			return
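Before the semaphore.go changes below, a minimal usage sketch of the acquire/release pair those doc comments describe; processWithPermit and its doWork parameter are illustrative assumptions, not part of this change. The deferred release mirrors the documented pattern of returning the permit on every exit path.

package concurrency

import (
	"context"
	"fmt"
)

// processWithPermit is a hypothetical caller (not in this diff): it acquires a permit,
// defers the release so the permit is returned however the operation exits, then runs
// the caller-supplied work with the request-scoped context.
func (ch *ConcurrencyHandler) processWithPermit(ctx context.Context, doWork func(context.Context) error) error {
	ctx, requestID, err := ch.AcquireConcurrencyPermit(ctx)
	if err != nil {
		return fmt.Errorf("acquiring concurrency permit: %w", err)
	}
	defer ch.ReleaseConcurrencyPermit(requestID)

	return doWork(ctx)
}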
ch.logger.Error("Attempted to release a non-existent concurrency permit", zap.String("RequestID", requestID.String())) return } @@ -118,15 +106,13 @@ func (ch *ConcurrencyHandler) ReleaseConcurrencyPermit(requestID uuid.UUID) { ch.Lock() defer ch.Unlock() - // Update metrics related to permit release. ch.Metrics.Lock() - ch.Metrics.TotalRequests-- // Decrement the count of total requests handled, if applicable. + ch.Metrics.TotalRequests-- ch.Metrics.Unlock() - utilizedPermits := len(ch.sem) // Calculate tokens currently in use. - availablePermits := cap(ch.sem) - utilizedPermits // Calculate tokens that are available for use. + utilizedPermits := len(ch.sem) + availablePermits := cap(ch.sem) - utilizedPermits - // Log the release of the concurrency permit for auditing and debugging purposes. ch.logger.Debug("Released concurrency permit", zap.String("RequestID", requestID.String()), zap.Int("UtilizedPermits", utilizedPermits),