Skip to content

Commit

Permalink
Add hts_tpool_worker_id() API
Browse files Browse the repository at this point in the history
This returns a numeric value from 0 to nthreads-1 corresponding to the
current running thread, or -1 if unthreaded or the thread does not
correspond to one allocated to this pool.

It may be used to associate data to a thread rather than to a job.
For example maintaining one open file handle per thread spawned.
  • Loading branch information
jkbonfield authored and whitwham committed Jan 28, 2025
1 parent c6c1d19 commit 2578c89
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 9 deletions.
8 changes: 8 additions & 0 deletions htslib/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,14 @@ HTSLIB_EXPORT
int hts_tpool_size(hts_tpool *p);


/// Return the worker ID index, from 0 to nthreads-1.
/**
* @param p Thread pool
* @return The worker index (0..ntheads-1) or -1 if not found
*/
HTSLIB_EXPORT
int hts_tpool_worker_id(hts_tpool *pool);

/// Add an item to the work pool.
/**
* @param p Thread pool
Expand Down
22 changes: 13 additions & 9 deletions thread_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,21 @@ static void hts_tpool_process_detach_locked(hts_tpool *p,

//#define DEBUG

#ifdef DEBUG
static int worker_id(hts_tpool *p) {
int i;
// Return the worker ID index, from 0 to nthreads-1.
// Return <0 on error, but this shouldn't be possible
int hts_tpool_worker_id(hts_tpool *p) {
if (!p)
return -1;
pthread_t s = pthread_self();
int i;
for (i = 0; i < p->tsize; i++) {
if (pthread_equal(s, p->t[i].tid))
return i;
}
return -1;
}

#ifdef DEBUG
void DBG_OUT(FILE *fp, char *fmt, ...) {
va_list args;
va_start(args, fmt);
Expand Down Expand Up @@ -95,7 +99,7 @@ static int hts_tpool_add_result(hts_tpool_job *j, void *data) {
pthread_mutex_lock(&q->p->pool_m);

DBG_OUT(stderr, "%d: Adding result to queue %p, serial %"PRId64", %d of %d\n",
worker_id(j->p), q, j->serial, q->n_output+1, q->qsize);
hts_tpool_worker_id(j->p), q, j->serial, q->n_output+1, q->qsize);

if (--q->n_processing == 0)
pthread_cond_signal(&q->none_processing_c);
Expand Down Expand Up @@ -129,9 +133,9 @@ static int hts_tpool_add_result(hts_tpool_job *j, void *data) {
|| q->next_serial == INT_MAX); // ... unless flush in progress.
if (r->serial == q->next_serial) {
DBG_OUT(stderr, "%d: Broadcasting result_avail (id %"PRId64")\n",
worker_id(j->p), r->serial);
hts_tpool_worker_id(j->p), r->serial);
pthread_cond_broadcast(&q->output_avail_c);
DBG_OUT(stderr, "%d: Broadcast complete\n", worker_id(j->p));
DBG_OUT(stderr, "%d: Broadcast complete\n", hts_tpool_worker_id(j->p));
}

pthread_mutex_unlock(&q->p->pool_m);
Expand Down Expand Up @@ -603,7 +607,7 @@ static void *tpool_worker(void *arg) {
pthread_mutex_unlock(&p->pool_m);

DBG_OUT(stderr, "%d: Processing queue %p, serial %"PRId64"\n",
worker_id(j->p), q, j->serial);
hts_tpool_worker_id(j->p), q, j->serial);

if (hts_tpool_add_result(j, j->func(j->arg)) < 0)
goto err;
Expand All @@ -625,13 +629,13 @@ static void *tpool_worker(void *arg) {
shutdown:
pthread_mutex_unlock(&p->pool_m);
#ifdef DEBUG
fprintf(stderr, "%d: Shutting down\n", worker_id(p));
fprintf(stderr, "%d: Shutting down\n", hts_tpool_worker_id(p));
#endif
return NULL;

err:
#ifdef DEBUG
fprintf(stderr, "%d: Failed to add result\n", worker_id(p));
fprintf(stderr, "%d: Failed to add result\n", hts_tpool_worker_id(p));
#endif
// Hard failure, so shutdown all queues
pthread_mutex_lock(&p->pool_m);
Expand Down

0 comments on commit 2578c89

Please sign in to comment.