diff --git a/htslib/thread_pool.h b/htslib/thread_pool.h index b13ccb73b..fe012030b 100644 --- a/htslib/thread_pool.h +++ b/htslib/thread_pool.h @@ -115,6 +115,14 @@ HTSLIB_EXPORT int hts_tpool_size(hts_tpool *p); +/// Return the worker ID index, from 0 to nthreads-1. +/** + * @param p Thread pool + * @return The worker index (0..ntheads-1) or -1 if not found + */ +HTSLIB_EXPORT +int hts_tpool_worker_id(hts_tpool *pool); + /// Add an item to the work pool. /** * @param p Thread pool diff --git a/thread_pool.c b/thread_pool.c index 252a9d24c..a5170a8da 100644 --- a/thread_pool.c +++ b/thread_pool.c @@ -51,10 +51,13 @@ static void hts_tpool_process_detach_locked(hts_tpool *p, //#define DEBUG -#ifdef DEBUG -static int worker_id(hts_tpool *p) { - int i; +// Return the worker ID index, from 0 to nthreads-1. +// Return <0 on error, but this shouldn't be possible +int hts_tpool_worker_id(hts_tpool *p) { + if (!p) + return -1; pthread_t s = pthread_self(); + int i; for (i = 0; i < p->tsize; i++) { if (pthread_equal(s, p->t[i].tid)) return i; @@ -62,6 +65,7 @@ static int worker_id(hts_tpool *p) { return -1; } +#ifdef DEBUG void DBG_OUT(FILE *fp, char *fmt, ...) { va_list args; va_start(args, fmt); @@ -95,7 +99,7 @@ static int hts_tpool_add_result(hts_tpool_job *j, void *data) { pthread_mutex_lock(&q->p->pool_m); DBG_OUT(stderr, "%d: Adding result to queue %p, serial %"PRId64", %d of %d\n", - worker_id(j->p), q, j->serial, q->n_output+1, q->qsize); + hts_tpool_worker_id(j->p), q, j->serial, q->n_output+1, q->qsize); if (--q->n_processing == 0) pthread_cond_signal(&q->none_processing_c); @@ -129,9 +133,9 @@ static int hts_tpool_add_result(hts_tpool_job *j, void *data) { || q->next_serial == INT_MAX); // ... unless flush in progress. if (r->serial == q->next_serial) { DBG_OUT(stderr, "%d: Broadcasting result_avail (id %"PRId64")\n", - worker_id(j->p), r->serial); + hts_tpool_worker_id(j->p), r->serial); pthread_cond_broadcast(&q->output_avail_c); - DBG_OUT(stderr, "%d: Broadcast complete\n", worker_id(j->p)); + DBG_OUT(stderr, "%d: Broadcast complete\n", hts_tpool_worker_id(j->p)); } pthread_mutex_unlock(&q->p->pool_m); @@ -603,7 +607,7 @@ static void *tpool_worker(void *arg) { pthread_mutex_unlock(&p->pool_m); DBG_OUT(stderr, "%d: Processing queue %p, serial %"PRId64"\n", - worker_id(j->p), q, j->serial); + hts_tpool_worker_id(j->p), q, j->serial); if (hts_tpool_add_result(j, j->func(j->arg)) < 0) goto err; @@ -625,13 +629,13 @@ static void *tpool_worker(void *arg) { shutdown: pthread_mutex_unlock(&p->pool_m); #ifdef DEBUG - fprintf(stderr, "%d: Shutting down\n", worker_id(p)); + fprintf(stderr, "%d: Shutting down\n", hts_tpool_worker_id(p)); #endif return NULL; err: #ifdef DEBUG - fprintf(stderr, "%d: Failed to add result\n", worker_id(p)); + fprintf(stderr, "%d: Failed to add result\n", hts_tpool_worker_id(p)); #endif // Hard failure, so shutdown all queues pthread_mutex_lock(&p->pool_m);