Skip to content

golang-queue/nsq

Repository files navigation

NSQ

Run Testing Go Report Card codecov

NSQ as backend with Queue package (A realtime distributed messaging platform)

screen

Setup

start the NSQ lookupd

nsqlookupd

start the NSQ server

nsqd --lookupd-tcp-address=localhost:4160

start the NSQ admin dashboard

nsqadmin --lookupd-http-address localhost:4161

Testing

go test -v ./...

Example

package main

import (
  "context"
  "encoding/json"
  "fmt"
  "log"
  "time"

  "github.com/golang-queue/nsq"
  "github.com/golang-queue/queue"
  "github.com/golang-queue/queue/core"
)

type job struct {
  Message string
}

func (j *job) Bytes() []byte {
  b, err := json.Marshal(j)
  if err != nil {
    panic(err)
  }
  return b
}

func main() {
  taskN := 100
  rets := make(chan string, taskN)

  // define the worker
  w := nsq.NewWorker(
    nsq.WithAddr("127.0.0.1:4150"),
    nsq.WithTopic("example"),
    nsq.WithChannel("foobar"),
    // concurrent job number
    nsq.WithMaxInFlight(10),
    nsq.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
      var v *job
      if err := json.Unmarshal(m.Payload(), &v); err != nil {
        return err
      }
      rets <- v.Message
      return nil
    }),
  )

  // define the queue
  q := queue.NewPool(
    10,
    queue.WithWorker(w),
  )
  defer q.Release()

  // assign tasks in queue
  for i := 0; i < taskN; i++ {
    go func(i int) {
      if err := q.Queue(&job{
        Message: fmt.Sprintf("handle the job: %d", i+1),
      }); err != nil {
        log.Fatal(err)
      }
    }(i)
  }

  // wait until all tasks done
  for i := 0; i < taskN; i++ {
    fmt.Println("message:", <-rets)
    time.Sleep(10 * time.Millisecond)
  }
}