Sequentially queue processing #1446

Closed
LeonidEfremov opened this issue Aug 28, 2019 · 22 comments
Comments

@LeonidEfremov

Is it possible to consume one queue (single node) sequentially? Which Bull.QueueOptions enable this behavior?
For example, I add 10 tasks at the same time and want to process them one by one, without delay or parallelism.

@stansv
Contributor

stansv commented Aug 28, 2019

This is exactly the default behavior. Jobs are processed one by one, in the same order they were added to the queue.

@LeonidEfremov
Author

LeonidEfremov commented Aug 28, 2019

@stansv, let me provide more details.
I use Express as the API host; in a POST action I add a new named task to the queue.
When I call this POST action a few times, the named tasks still execute in parallel.
But I want to process them sequentially, not in parallel: each task should start only when the previous one has finished.

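Queue instance:

const bull = require('bull');

// `api` is the application's own config object (Redis connection, queue and job names).
const options = {
    redis: api.redis,
    defaultJobOptions: {
        attempts: 1,             // no retries
        timeout: 1000 * 60 * 5,  // fail the job with a timeout error after 5 minutes
        lifo: false,             // process in FIFO order
        removeOnComplete: true,
        removeOnFail: true
    }
};

const queue = new bull(api.queues.puppeteer, options);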

Adding a named task:

await queue.add(api.analyzers.cdp, { analyze: {} });

Processing:

queue.process(api.analyzers.cdp, 1, _onCdp);

What if I set different names for tasks in one queue: what is the default processing behavior then?

@stansv
Contributor

stansv commented Aug 28, 2019

You should create the queue and register the job processor outside the Express handler function. Only call queue.add() inside the handler.

...
const queue = new bull(api.queues.puppeteer, options);
queue.process(...);
...
app.post((req, res) => {
    queue.add(...);
});
...

Named processors are just the same as defining a single processor without a name and putting a switch block inside it to select a job-specific processor; if no matching processor is found, the next job will just fail.
You do not always have to use named processors; in your case you can probably avoid them.
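
As a rough illustration of the "single processor with a switch block" idea above, here is a minimal sketch. It assumes jobs are added without a name and carry a hypothetical `type` field in their data; the handler names (`cdp`, `screenshot`) and the `url` field are illustrative only and do not come from this thread. Bull's documentation also notes that the concurrency values of several named processors on the same queue stack up, so a single processor is one way to keep the total at 1.

```javascript
// Hypothetical per-type handlers; names and payload shape are assumptions.
const handlers = {
  cdp: async (job) => `analyzed ${job.data.url}`,
  screenshot: async (job) => `captured ${job.data.url}`,
};

// One unnamed processor that dispatches on a field in the job data.
async function processJob(job) {
  switch (job.data.type) {
    case 'cdp':
      return handlers.cdp(job);
    case 'screenshot':
      return handlers.screenshot(job);
    default:
      // Throwing (rejecting) marks the job as failed.
      throw new Error(`No handler for job type "${job.data.type}"`);
  }
}

// With Bull, this would be registered once at startup, outside any Express handler:
//   queue.process(1, processJob);
// and jobs would be added inside the handler without a name:
//   queue.add({ type: 'cdp', url: 'https://example.com' });
```

Because only one processor is registered, the concurrency argument `1` is the total for this queue within this Node process.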

@LeonidEfremov
Author

> Named processors are just the same as defining a single processor without a name and putting a switch block inside it to select a job-specific processor; if no matching processor is found, the next job will just fail.
> You do not always have to use named processors; in your case you can probably avoid them.

yes, will try, thank you

@skliarovartem

Did you get it to work? I see that Bull starts new jobs without finishing the previous one...

@LeonidEfremov
Author

> Did you get it to work? I see that Bull starts new jobs without finishing the previous one...

No =(

@skliarovartem

Yes, it is strange that a queue can't be sequential...

@manast
Member

manast commented Jun 4, 2022

It is actually possible to limit how many jobs are processed in parallel, but this requires the Pro version and the groups functionality (worst case, just have 1 group with max concurrency 1): https://docs.bullmq.io/bullmq-pro/groups/concurrency

@squalsoft

I have the same problem using NestJS.
This job processor tries to handle jobs in PARALLEL:

@Process({ name: 'test1', concurrency: 1 })
async handleJobTest1(job: Job<string>) {
  console.log('test1 queue thing ' + job.data);
  await new Promise((resolve) => setTimeout(resolve, 2000));
  console.log('test1 AFTER TIMEOUT queue thing ' + job.data);
}

@trsh

trsh commented Oct 10, 2022

Same problem here. Jobs run in parallel in the queue. How is this a queue in any way, if it just executes jobs by chance?

@manast
Member

manast commented Oct 10, 2022

@trsh running jobs in parallel is a feature, not a problem. I already told you that you can limit concurrency by limiting the number of workers and their concurrency factors.

@manast
Member

manast commented Oct 10, 2022

In fact, you can even process jobs manually if you want to have full control: https://github.com/OptimalBits/bull/blob/develop/PATTERNS.md#manually-fetching-jobs
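
For readers who want the manual route, the following is a minimal sketch of a strictly sequential loop. It assumes the queue object exposes `getNextJob()` and that each job exposes `moveToCompleted(returnValue, ignoreLock, notFetch)` and `moveToFailed(error, ignoreLock)`, as I understand the linked pattern; check the signatures against your Bull version. `drainSequentially` and `handler` are hypothetical names introduced here.

```javascript
// Pulls jobs one at a time and finishes each before fetching the next.
// `queue` is assumed to behave like a Bull queue (see the linked pattern);
// `handler` is a hypothetical async function that does the actual work.
async function drainSequentially(queue, handler) {
  let processed = 0;
  for (;;) {
    const job = await queue.getNextJob();
    if (!job) break; // nothing is waiting right now
    try {
      const result = await handler(job);
      // Assumed arguments: ignoreLock = true, notFetch = true, so that
      // completing a job does not implicitly fetch the next one.
      await job.moveToCompleted(result, true, true);
    } catch (err) {
      await job.moveToFailed({ message: err.message }, true);
    }
    processed += 1;
  }
  return processed;
}
```

The loop stops when no job is waiting, so a long-running service would need to call it again (for example on a timer) when new jobs arrive. Jobs fetched this way hold a lock for a limited time, so handlers that run longer than the lock duration need extra care.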

@trsh

trsh commented Oct 10, 2022

> @trsh running jobs in parallel is a feature, not a problem. I already told you that you can limit concurrency by limiting the number of workers and their concurrency factors.

@manast can you point me in the right direction on how to do that? I did not find anything: there is no match for "concurrency" or "workers" in the documentation. I don't want manual control, just sequential jobs.

P.S. Features should have an on/off switch, no?

@trsh

trsh commented Oct 10, 2022

I almost got happy about process(name: string, concurrency: number, callback: ProcessCallbackFunction<T>): Promise<void>;. But it still runs in parallel.

@manast
Member

manast commented Oct 10, 2022

You are probably not implementing the processor correctly. I suggest that you start using BullMQ instead; it has more documentation than Bull (https://docs.bullmq.io) and plenty of tutorials (https://blog.taskforce.sh).

@trsh

trsh commented Oct 10, 2022

@manast it's not an option for the time being. Just tell people who see this issue how to limit workers and concurrency properly. My processor is created together with the queue:

queue.process('process', 1, async (job: Job<EventJobData>) => {
  await something.to.compleye();
  return Promise.resolve();
});

@manast
Member

manast commented Oct 10, 2022

@trsh If you read the documentation, it is explained in several places. Your example should work, but I guess the reason you think the jobs are executing in parallel is that "something.to.compleye()" is not implemented correctly. It would be best if you wrote a failing test case, and then we can take it from there.
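
One way to build such a test case is to wrap the processor and record how many invocations overlap. The sketch below is only an illustration; `trackConcurrency` is a hypothetical helper, not part of Bull, and it only observes calls made inside a single Node process.

```javascript
// Wraps a processor so a test can observe how many invocations overlap.
function trackConcurrency(processor) {
  const stats = { active: 0, maxActive: 0, calls: 0 };
  const wrapped = async (job) => {
    stats.active += 1;
    stats.calls += 1;
    stats.maxActive = Math.max(stats.maxActive, stats.active);
    try {
      return await processor(job);
    } finally {
      // Runs whether the processor resolved or threw.
      stats.active -= 1;
    }
  };
  return { wrapped, stats };
}

// Possible use with Bull (not executed here):
//   const { wrapped, stats } = trackConcurrency(myProcessor);
//   queue.process('process', 1, wrapped);
//   ...add jobs, wait for them to finish, then check stats.maxActive === 1
```

If `stats.maxActive` stays at 1 while the work still appears to overlap, the processor is probably resolving before its asynchronous work has finished (for example, a promise that is started but not awaited). If it rises above 1, more than one processing slot is active for that queue in this process.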

@trsh

trsh commented Oct 10, 2022

> @trsh If you read the documentation, it is explained in several places. Your example should work, but I guess the reason you think the jobs are executing in parallel is that "something.to.compleye()" is not implemented correctly. It would be best if you wrote a failing test case, and then we can take it from there.

OK, I will. My await something.to.compleye(); return Promise.resolve(); is on point. There is no voodoo in it.

@itseramin

You have to set up the RateLimiter option for your queue...

Just set the max property to 1.

bull/index.d.ts, lines 51 to 60 in 60fa88f:

interface RateLimiter {
  /** Max numbers of jobs processed */
  max: number;
  /** Per duration in milliseconds */
  duration: number;
  /** When jobs get rate limited, they stay in the waiting queue and are not moved to the delayed queue */
  bounceBack?: boolean | undefined;
  /** Groups jobs with the specified key from the data object passed to the Queue#add ex. "network.handle" */
  groupKey?: string | undefined;
}

@stouch

stouch commented Jul 17, 2024

> You have to set up the RateLimiter option for your queue...
>
> Just set the max property to 1.

I'm not sure it would work: if you set max, you also have to set duration, so you would just be saying "[max] jobs per [duration] ms", and this does not resolve the issue here.
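
The objection can be illustrated with a small model. This is not Bull code; it is a simplified simulation under stated assumptions: the limiter lets at most one job start per `durationMs`, every job runs for `jobMs`, and at most `slots` jobs may run at once (the total concurrency across processors and workers). `peakOverlap` is a hypothetical helper.

```javascript
// Simplified model of "1 job per durationMs" with `slots` processing slots.
// Returns the largest number of jobs running at the same time.
function peakOverlap(jobCount, jobMs, durationMs, slots) {
  const slotFreeAt = new Array(slots).fill(0); // time at which each slot becomes idle
  const ends = [];                             // end time of every started job
  let lastStart = -Infinity;
  let peak = 0;
  for (let i = 0; i < jobCount; i += 1) {
    // Pick the slot that becomes idle first.
    const slot = slotFreeAt.indexOf(Math.min(...slotFreeAt));
    // A job starts once the limiter allows it AND a slot is idle.
    const start = Math.max(lastStart + durationMs, slotFreeAt[slot]);
    const end = start + jobMs;
    slotFreeAt[slot] = end;
    ends.push(end);
    lastStart = start;
    // Count jobs (including this one) still running at this start time.
    const running = ends.filter((e) => e > start).length;
    peak = Math.max(peak, running);
  }
  return peak;
}

console.log(peakOverlap(5, 2000, 1000, 2)); // 2: jobs outlast the limiter window and overlap
console.log(peakOverlap(5, 2000, 1000, 1)); // 1: a single slot is sequential by itself
console.log(peakOverlap(5, 500, 1000, 5));  // 1: sequential only because jobs are short
```

In this model the limiter only spaces out job starts. Whether jobs overlap depends on the number of slots and on how long each job runs, which matches the point that max/duration alone does not express "one at a time".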

@roggervalf
Collaborator

Hi guys, we recommend using BullMQ, as all new features will be implemented there. For sequential execution we have a PR. Now that we have also added a global concurrency feature in BullMQ, it's possible for us to bring this new functionality.

@braniubojni

If someone is using NestJS along with BullMQ: just adding the concurrency key in the Processor decorator will make the jobs run sequentially.

@Processor('task', {
  concurrency: 1,
})

Adding tasks

    let num = 0;
    const list = new Array(3).fill(0);
    for (const id of list) {
      num++;
      const last = list.length === num;
      this.queue.add('task', { id: id + num, last });
    }

And executing them.

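import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';

// Small helper used below to simulate a long-running job.
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

@Processor('task', {
  concurrency: 1,
})
export class TaskQueue extends WorkerHost {
  async process(job: Job<any, any, string>): Promise<any> {
    console.log('Processing job', job.data);
    await sleep(25_000);
    console.log('Job end');

    if (job.data.last) {
      console.log('Last job');
    }
  }
}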
