Skip to content

Commit

Permalink
Migrate gocoap to v3
Browse files Browse the repository at this point in the history
Signed-off-by: 1998-felix <[email protected]>
  • Loading branch information
felixgateru committed Apr 17, 2024
1 parent 3cfcf14 commit 9c5c0ab
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 143 deletions.
59 changes: 35 additions & 24 deletions coap/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package api

import (
"bytes"
"context"
"fmt"
"io"
Expand All @@ -20,9 +21,9 @@ import (
svcerr "github.com/absmach/magistrala/pkg/errors/service"
"github.com/absmach/magistrala/pkg/messaging"
"github.com/go-chi/chi/v5"
"github.com/plgd-dev/go-coap/v2/message"
"github.com/plgd-dev/go-coap/v2/message/codes"
"github.com/plgd-dev/go-coap/v2/mux"
"github.com/plgd-dev/go-coap/v3/message"
"github.com/plgd-dev/go-coap/v3/message/codes"
"github.com/plgd-dev/go-coap/v3/mux"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

Expand Down Expand Up @@ -66,20 +67,28 @@ func MakeCoAPHandler(svc coap.Service, l *slog.Logger) mux.HandlerFunc {
return handler
}

func sendResp(w mux.ResponseWriter, resp *message.Message) {
if err := w.Client().WriteMessage(resp); err != nil {
logger.Warn(fmt.Sprintf("Can't set response: %s", err))
func sendResp(w mux.ResponseWriter, resp *message.Message, isObs bool) {
pm := w.Conn().AcquireMessage(w.Conn().Context())
defer w.Conn().ReleaseMessage(pm)
pm.SetCode(resp.Code)
pm.SetToken(resp.Token)
pm.SetBody(bytes.NewReader(resp.Payload))
for _, opt := range resp.Options {
pm.SetOptionBytes(opt.ID, opt.Value)
}
if !isObs {
if err := w.Conn().WriteMessage(pm); err != nil {
logger.Warn(fmt.Sprintf("Can't set response: %s", err))
}
}
}

func handler(w mux.ResponseWriter, m *mux.Message) {
resp := message.Message{
Code: codes.Content,
Token: m.Token,
Context: m.Context,
Token: m.Token(),
Options: make(message.Options, 0, 16),
}
defer sendResp(w, &resp)
msg, err := decodeMessage(m)
if err != nil {
logger.Warn(fmt.Sprintf("Error decoding message: %s", err))
Expand All @@ -92,15 +101,17 @@ func handler(w mux.ResponseWriter, m *mux.Message) {
resp.Code = codes.Unauthorized
return
}
switch m.Code {
var isObs bool = false
switch m.Code() {
case codes.GET:
err = handleGet(m.Context, m, w.Client(), msg, key)
isObs, err = handleGet(context.Background(), m, w, msg, key)
case codes.POST:
resp.Code = codes.Created
err = service.Publish(m.Context, key, msg)
err = nil
default:
err = svcerr.ErrNotFound
}
defer sendResp(w, &resp, isObs)
if err != nil {
switch {
case err == errBadOptions:
Expand All @@ -116,25 +127,25 @@ func handler(w mux.ResponseWriter, m *mux.Message) {
}
}

func handleGet(ctx context.Context, m *mux.Message, c mux.Client, msg *messaging.Message, key string) error {
func handleGet(ctx context.Context, m *mux.Message, c mux.ResponseWriter, msg *messaging.Message, key string) (bool, error) {
var obs uint32
obs, err := m.Options.Observe()
obs, err := m.Observe()
if err != nil {
logger.Warn(fmt.Sprintf("Error reading observe option: %s", err))
return errBadOptions
return false, errBadOptions
}
if obs == startObserve {
c := coap.NewClient(c, m.Token, logger)
return service.Subscribe(ctx, key, msg.GetChannel(), msg.GetSubtopic(), c)
c := coap.NewClient(c, m.Token(), logger)
return true, service.Subscribe(ctx, key, msg.GetChannel(), msg.GetSubtopic(), c)
}
return service.Unsubscribe(ctx, key, msg.GetChannel(), msg.GetSubtopic(), m.Token.String())
return false, service.Unsubscribe(ctx, key, msg.GetChannel(), msg.GetSubtopic(), m.Token().String())
}

func decodeMessage(msg *mux.Message) (*messaging.Message, error) {
if msg.Options == nil {
if msg.Options() == nil {
return &messaging.Message{}, errBadOptions
}
path, err := msg.Options.Path()
path, err := msg.Path()
if err != nil {
return &messaging.Message{}, err
}
Expand All @@ -155,8 +166,8 @@ func decodeMessage(msg *mux.Message) (*messaging.Message, error) {
Created: time.Now().UnixNano(),
}

if msg.Body != nil {
buff, err := io.ReadAll(msg.Body)
if msg.Body() != nil {
buff, err := io.ReadAll(msg.Body())
if err != nil {
return ret, err
}
Expand All @@ -166,10 +177,10 @@ func decodeMessage(msg *mux.Message) (*messaging.Message, error) {
}

func parseKey(msg *mux.Message) (string, error) {
if obs, _ := msg.Options.Observe(); obs != 0 && msg.Code == codes.GET {
if obs, _ := msg.Observe(); obs != 0 && msg.Code() == codes.GET {
return "", nil
}
authKey, err := msg.Options.GetString(message.URIQuery)
authKey, err := msg.Options().GetString(message.URIQuery)
if err != nil {
return "", err
}
Expand Down
42 changes: 20 additions & 22 deletions coap/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import (

"github.com/absmach/magistrala/pkg/errors"
"github.com/absmach/magistrala/pkg/messaging"
"github.com/plgd-dev/go-coap/v2/message"
"github.com/plgd-dev/go-coap/v2/message/codes"
mux "github.com/plgd-dev/go-coap/v2/mux"
"github.com/plgd-dev/go-coap/v3/message"
"github.com/plgd-dev/go-coap/v3/message/codes"
"github.com/plgd-dev/go-coap/v3/message/pool"
mux "github.com/plgd-dev/go-coap/v3/mux"
)

// Client wraps CoAP client.
Expand All @@ -36,14 +37,14 @@ type Client interface {
var ErrOption = errors.New("unable to set option")

type client struct {
client mux.Client
client mux.ResponseWriter
token message.Token
observe uint32
logger *slog.Logger
}

// NewClient instantiates a new Observer.
func NewClient(c mux.Client, tkn message.Token, l *slog.Logger) Client {
func NewClient(c mux.ResponseWriter, tkn message.Token, l *slog.Logger) Client {
return &client{
client: c,
token: tkn,
Expand All @@ -53,33 +54,28 @@ func NewClient(c mux.Client, tkn message.Token, l *slog.Logger) Client {
}

func (c *client) Done() <-chan struct{} {
return c.client.Done()
return c.client.Conn().Done()
}

func (c *client) Cancel() error {
m := message.Message{
Code: codes.Content,
Token: c.token,
Context: context.Background(),
Options: make(message.Options, 0, 16),
}
if err := c.client.WriteMessage(&m); err != nil {
pm := c.client.Conn().AcquireMessage(context.Background())
pm.SetCode(codes.Content)
pm.SetToken(c.token)
if err := c.client.Conn().WriteMessage(pm); err != nil {
c.logger.Error(fmt.Sprintf("Error sending message: %s.", err))
}
return c.client.Close()
return c.client.Conn().Close()
}

func (c *client) Token() string {
return c.token.String()
}

func (c *client) Handle(msg *messaging.Message) error {
m := message.Message{
Code: codes.Content,
Token: c.token,
Context: c.client.Context(),
Body: bytes.NewReader(msg.GetPayload()),
}
pm := pool.NewMessage(context.Background())
pm.SetCode(codes.Content)
pm.SetToken(c.token)
pm.SetBody(bytes.NewReader(msg.Payload))

atomic.AddUint32(&c.observe, 1)
var opts message.Options
Expand All @@ -103,6 +99,8 @@ func (c *client) Handle(msg *messaging.Message) error {
return fmt.Errorf("cannot set options to response: %w", err)
}

m.Options = opts
return c.client.WriteMessage(&m)
for _, option := range opts {
pm.SetOptionBytes(option.ID, option.Value)
}
return c.client.Conn().WriteMessage(pm)
}
7 changes: 3 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ require (
github.com/oklog/ulid/v2 v2.1.0
github.com/ory/dockertest/v3 v3.10.0
github.com/pelletier/go-toml v1.9.5
github.com/plgd-dev/go-coap/v2 v2.6.0
github.com/plgd-dev/go-coap/v3 v3.3.3
github.com/prometheus/client_golang v1.19.0
github.com/rabbitmq/amqp091-go v1.9.0
github.com/rubenv/sql-migrate v1.6.1
Expand Down Expand Up @@ -143,11 +143,10 @@ require (
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/opencontainers/runc v1.1.12 // indirect
github.com/pelletier/go-toml/v2 v2.2.0 // indirect
github.com/pion/dtls/v2 v2.2.10 // indirect
github.com/pion/dtls/v2 v2.2.8-0.20240201071732-2597464081c8 // indirect
github.com/pion/logging v0.2.2 // indirect
github.com/pion/transport/v2 v2.2.4 // indirect
github.com/pion/transport/v3 v3.0.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/plgd-dev/kit/v2 v2.0.0-20211006190727-057b33161b90 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.52.2 // indirect
Expand Down
Loading

0 comments on commit 9c5c0ab

Please sign in to comment.