diff --git a/concurrency/handler.go b/concurrency/handler.go index b917df1..56e6e83 100644 --- a/concurrency/handler.go +++ b/concurrency/handler.go @@ -13,42 +13,42 @@ type ConcurrencyHandler struct { sem chan struct{} logger *zap.SugaredLogger AcquisitionTimes []time.Duration - lock sync.Mutex lastTokenAcquisitionTime time.Time Metrics *ConcurrencyMetrics + sync.Mutex } // ConcurrencyMetrics captures various metrics related to managing concurrency for the client's interactions with the API. type ConcurrencyMetrics struct { - TotalRequests int64 // Total number of requests made - TotalRetries int64 // Total number of retry attempts - TotalRateLimitErrors int64 // Total number of rate limit errors encountered - PermitWaitTime time.Duration // Total time spent waiting for tokens - TTFB struct { // Metrics related to Time to First Byte (TTFB) - Total time.Duration // Total Time to First Byte (TTFB) for all requests - Count int64 // Count of requests used for calculating TTFB - Lock sync.Mutex // Lock for TTFB metrics + TotalRequests int64 + TotalRetries int64 + TotalRateLimitErrors int64 + PermitWaitTime time.Duration + sync.Mutex + TTFB struct { + Total time.Duration + Count int64 + sync.Mutex } - Throughput struct { // Metrics related to network throughput - Total float64 // Total network throughput for all requests - Count int64 // Count of requests used for calculating throughput - Lock sync.Mutex // Lock for throughput metrics/ + Throughput struct { + Total float64 + Count int64 + sync.Mutex } - ResponseTimeVariability struct { // Metrics related to response time variability - Total time.Duration // Total response time for all requests - Average time.Duration // Average response time across all requests - Variance float64 // Variance of response times - Count int64 // Count of responses used for calculating response time variability - Lock sync.Mutex // Lock for response time variability metrics - StdDevThreshold float64 // Maximum acceptable standard deviation for adjusting concurrency - DebounceScaleDownCount int // Counter to manage scale down actions after consecutive triggers - DebounceScaleUpCount int // Counter to manage scale up actions after consecutive triggers + ResponseTimeVariability struct { + Total time.Duration + Average time.Duration + Variance float64 + Count int64 + sync.Mutex + StdDevThreshold float64 + DebounceScaleDownCount int + DebounceScaleUpCount int } ResponseCodeMetrics struct { - ErrorRate float64 // Error rate calculated as (TotalRateLimitErrors + 5xxErrors) / TotalRequests - Lock sync.Mutex // Lock for response code metrics + ErrorRate float64 + sync.Mutex } - Lock sync.Mutex // Lock for overall metrics fields } // NewConcurrencyHandler initializes a new ConcurrencyHandler with the given diff --git a/concurrency/metrics.go b/concurrency/metrics.go index f1b9843..0e9d18f 100644 --- a/concurrency/metrics.go +++ b/concurrency/metrics.go @@ -156,8 +156,8 @@ func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) int { func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) int { statusCode := resp.StatusCode - ch.Metrics.Lock.Lock() - defer ch.Metrics.Lock.Unlock() + ch.Metrics.Lock() + defer ch.Metrics.Unlock() if statusCode >= 200 && statusCode < 300 { // Reset error counts for successful responses @@ -212,8 +212,8 @@ var responseTimesLock sync.Mutex // - (1) to suggest an increase in concurrency, // - (0) to indicate no change needed. func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) int { - ch.Metrics.ResponseTimeVariability.Lock.Lock() - defer ch.Metrics.ResponseTimeVariability.Lock.Unlock() + ch.Metrics.ResponseTimeVariability.Lock() + defer ch.Metrics.ResponseTimeVariability.Unlock() responseTimesLock.Lock() // Ensure safe concurrent access responseTimes = append(responseTimes, responseTime) diff --git a/concurrency/scale.go b/concurrency/scale.go index 73a8105..97fe811 100644 --- a/concurrency/scale.go +++ b/concurrency/scale.go @@ -6,8 +6,8 @@ import "go.uber.org/zap" // ScaleDown reduces the concurrency level by one, down to the minimum limit. func (ch *ConcurrencyHandler) ScaleDown() { // Lock to ensure thread safety - ch.lock.Lock() - defer ch.lock.Unlock() + ch.Lock() + defer ch.Unlock() // We must consider the capacity rather than the length of the semaphore channel currentSize := cap(ch.sem) @@ -23,8 +23,8 @@ func (ch *ConcurrencyHandler) ScaleDown() { // ScaleUp increases the concurrency level by one, up to the maximum limit. func (ch *ConcurrencyHandler) ScaleUp() { // Lock to ensure thread safety - ch.lock.Lock() - defer ch.lock.Unlock() + ch.Lock() + defer ch.Unlock() currentSize := cap(ch.sem) if currentSize < MaxConcurrency { diff --git a/concurrency/semaphore.go b/concurrency/semaphore.go index d0e7ec1..d787b35 100644 --- a/concurrency/semaphore.go +++ b/concurrency/semaphore.go @@ -71,15 +71,15 @@ func (ch *ConcurrencyHandler) AcquireConcurrencyPermit(ctx context.Context) (con // This method locks the concurrency handler to safely update shared metrics and logs detailed // information about the permit acquisition for debugging and monitoring purposes. func (ch *ConcurrencyHandler) trackResourceAcquisition(duration time.Duration, requestID uuid.UUID) { - ch.lock.Lock() - defer ch.lock.Unlock() + ch.Lock() + defer ch.Unlock() // Record the time taken to acquire the permit and update related metrics. ch.AcquisitionTimes = append(ch.AcquisitionTimes, duration) - ch.Metrics.Lock.Lock() + ch.Metrics.Lock() ch.Metrics.PermitWaitTime += duration ch.Metrics.TotalRequests++ // Increment the count of total requests handled. - ch.Metrics.Lock.Unlock() + ch.Metrics.Unlock() // Calculate and log the current state of permit utilization. utilizedPermits := len(ch.sem) @@ -115,13 +115,13 @@ func (ch *ConcurrencyHandler) ReleaseConcurrencyPermit(requestID uuid.UUID) { return } - ch.lock.Lock() - defer ch.lock.Unlock() + ch.Lock() + defer ch.Unlock() // Update metrics related to permit release. - ch.Metrics.Lock.Lock() + ch.Metrics.Lock() ch.Metrics.TotalRequests-- // Decrement the count of total requests handled, if applicable. - ch.Metrics.Lock.Unlock() + ch.Metrics.Unlock() utilizedPermits := len(ch.sem) // Calculate tokens currently in use. availablePermits := cap(ch.sem) - utilizedPermits // Calculate tokens that are available for use.