diff --git a/service/worker/indexer/esProcessor.go b/service/worker/indexer/esProcessor.go index 1692ff51b1c..bf5b59e0edd 100644 --- a/service/worker/indexer/esProcessor.go +++ b/service/worker/indexer/esProcessor.go @@ -144,12 +144,6 @@ func (p *ESProcessorImpl) bulkAfterAction(id int64, requests []bulk.GenericBulka continue } wid, rid, domainID := p.getMsgWithInfo(key) - p.logger.Error("ES request failed and is not retryable", - tag.ESResponseStatus(err.Status), - tag.ESRequest(request.String()), - tag.WorkflowID(wid), - tag.WorkflowRunID(rid), - tag.WorkflowDomainID(domainID)) // check if it is a delete request and status code // 404 means the document does not exist @@ -162,6 +156,7 @@ func (p *ESProcessorImpl) bulkAfterAction(id int64, requests []bulk.GenericBulka tag.WorkflowRunID(rid), tag.WorkflowDomainID(domainID)) p.ackKafkaMsg(key) + continue } else if err.Status == 404 { req, err := request.Source() if err == nil && p.isDeleteRequest(req) { @@ -171,15 +166,19 @@ func (p *ESProcessorImpl) bulkAfterAction(id int64, requests []bulk.GenericBulka tag.WorkflowRunID(rid), tag.WorkflowDomainID(domainID)) p.ackKafkaMsg(key) + continue } else { p.logger.Error("Get request source err.", tag.Error(err), tag.ESRequest(request.String())) p.scope.IncCounter(metrics.ESProcessorCorruptedData) - p.nackKafkaMsg(key) } - } else { - // For all other non-retryable errors, nack the message - p.nackKafkaMsg(key) } + p.logger.Error("ES request failed and is not retryable", + tag.ESResponseStatus(err.Status), + tag.ESRequest(request.String()), + tag.WorkflowID(wid), + tag.WorkflowRunID(rid), + tag.WorkflowDomainID(domainID)) + p.nackKafkaMsg(key) } p.scope.IncCounter(metrics.ESProcessorFailures) }