Skip to content
This repository has been archived by the owner on Dec 14, 2021. It is now read-only.

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
htdvisser committed Aug 18, 2017
2 parents e9c4b03 + bdf5610 commit 788fbc3
Show file tree
Hide file tree
Showing 10 changed files with 230 additions and 118 deletions.
46 changes: 31 additions & 15 deletions backend/pktfwd/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,9 +431,9 @@ func (b *Backend) handleTXACK(addr *net.UDPAddr, data []byte) error {

// newGatewayStatsPacket transforms a Semtech Stat packet into a StatusMessage.
func newGatewayStatsPacket(mac lorawan.EUI64, stat Stat) *types.StatusMessage {
var gps *pb_gateway.GPSMetadata
var gps *pb_gateway.LocationMetadata
if stat.Lati != 0 || stat.Long != 0 || stat.Alti != 0 {
gps = &pb_gateway.GPSMetadata{
gps = &pb_gateway.LocationMetadata{
Latitude: float32(stat.Lati),
Longitude: float32(stat.Long),
Altitude: stat.Alti,
Expand All @@ -446,18 +446,31 @@ func newGatewayStatsPacket(mac lorawan.EUI64, stat Stat) *types.StatusMessage {
gatewayTime = time.Unix(0, 0)
}

bootTime := time.Time(stat.Boot)
if bootTime.IsZero() || bootTime.Before(time.Unix(0, 0)) {
bootTime = time.Unix(0, 0)
}

status := &types.StatusMessage{
GatewayID: getID(mac),
Message: &pb_gateway.Status{
Time: gatewayTime.UnixNano(),
Gps: gps,
RxIn: uint32(stat.RXNb),
RxOk: uint32(stat.RXOK),
TxIn: uint32(stat.DWNb),
TxOk: uint32(stat.TXNb),
BootTime: bootTime.UnixNano(),
Location: gps,
Platform: stat.Pfrm,
ContactEmail: stat.Mail,
Description: stat.Desc,
Fpga: stat.FPGA,
Dsp: stat.DSP,
Hal: stat.HAL,
RxIn: stat.RXNb,
RxOk: stat.RXOK,
TxIn: stat.DWNb,
TxOk: stat.TXNb,
LmOk: stat.LMOK,
LmSt: stat.LMST,
LmNw: stat.LMNW,
LPps: stat.LPPS,
},
}

Expand Down Expand Up @@ -530,22 +543,25 @@ func newRXPacketFromRXPK(mac lorawan.EUI64, rxpk RXPK) (*types.UplinkMessage, er
// Use LSNR and RSSI from RSig if present
if len(rxpk.RSig) > 0 {
rxPacket.Message.GatewayMetadata.Snr = float32(rxpk.RSig[0].LSNR)
rxPacket.Message.GatewayMetadata.Rssi = float32(rxpk.RSig[0].RSSIC)
rxPacket.Message.GatewayMetadata.Rssi = float32(rxpk.RSig[0].RSSIS)
}

if len(rxpk.RSig) > 1 {
for _, sig := range rxpk.RSig {
if float32(sig.LSNR) > rxPacket.Message.GatewayMetadata.Snr {
rxPacket.Message.GatewayMetadata.Snr = float32(sig.LSNR)
rxPacket.Message.GatewayMetadata.Rssi = float32(sig.RSSIC)
} else if float32(sig.LSNR) == rxPacket.Message.GatewayMetadata.Snr && float32(sig.RSSIC) > rxPacket.Message.GatewayMetadata.Rssi {
rxPacket.Message.GatewayMetadata.Rssi = float32(sig.RSSIC)
rxPacket.Message.GatewayMetadata.Rssi = float32(sig.RSSIS)
} else if float32(sig.LSNR) == rxPacket.Message.GatewayMetadata.Snr && float32(sig.RSSIS) > rxPacket.Message.GatewayMetadata.Rssi {
rxPacket.Message.GatewayMetadata.Rssi = float32(sig.RSSIS)
}
antenna := &pb_gateway.RxMetadata_Antenna{
Antenna: uint32(sig.Ant),
Channel: uint32(sig.Chan),
Rssi: float32(sig.RSSIC),
Snr: float32(sig.LSNR),
Antenna: uint32(sig.Ant),
Channel: uint32(sig.Chan),
ChannelRssi: float32(sig.RSSIC),
Rssi: float32(sig.RSSIS),
RssiStandardDeviation: float32(sig.RSSISD),
Snr: float32(sig.LSNR),
FrequencyOffset: int64(sig.FOff),
}
if eTime, err := base64.StdEncoding.DecodeString(sig.ETime); err == nil && len(eTime) > 0 {
antenna.EncryptedTime = eTime
Expand Down
6 changes: 3 additions & 3 deletions backend/pktfwd/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func TestNewGatewayStatPacket(t *testing.T) {
So(gwStats.GatewayID, ShouldEqual, "eui-0102030405060708")
So(gwStats.Message, ShouldResemble, &pb_gateway.Status{
Time: now.UnixNano(),
Gps: &pb_gateway.GPSMetadata{
Location: &pb_gateway.LocationMetadata{
Latitude: 1.234,
Longitude: 2.123,
Altitude: 234,
Expand Down Expand Up @@ -498,8 +498,8 @@ func TestNewRXPacketFromRXPK(t *testing.T) {
Size: 16,
Data: base64.StdEncoding.EncodeToString([]byte{1, 2, 3, 4}),
RSig: []RSig{
RSig{Ant: 0, Chan: 2, RSSIC: -54, LSNR: 6.5},
RSig{Ant: 1, Chan: 2, RSSIC: -51, LSNR: 7},
RSig{Ant: 0, Chan: 2, RSSIS: -54, LSNR: 6.5},
RSig{Ant: 1, Chan: 2, RSSIS: -51, LSNR: 7},
},
}
mac := [8]byte{1, 2, 3, 4, 5, 6, 7, 8}
Expand Down
7 changes: 7 additions & 0 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/TheThingsNetwork/gateway-connector-bridge/backend/ttn"
"github.com/TheThingsNetwork/gateway-connector-bridge/exchange"
"github.com/TheThingsNetwork/gateway-connector-bridge/middleware"
"github.com/TheThingsNetwork/gateway-connector-bridge/middleware/deduplicate"
"github.com/TheThingsNetwork/gateway-connector-bridge/middleware/gatewayinfo"
"github.com/TheThingsNetwork/gateway-connector-bridge/middleware/inject"
"github.com/TheThingsNetwork/gateway-connector-bridge/middleware/ratelimit"
Expand Down Expand Up @@ -91,6 +92,10 @@ func runBridge(cmd *cobra.Command, args []string) {

var middleware middleware.Chain

if viper.GetBool("deduplicate") {
middleware = append(middleware, deduplicate.NewDeduplicate())
}

id := fmt.Sprintf(
"%s %s-%s (%s)",
config.GetString("id"),
Expand Down Expand Up @@ -344,6 +349,8 @@ func init() {

BridgeCmd.Flags().String("inject-frequency-plan", "", "Inject a frequency plan field into status message that don't have one")

BridgeCmd.Flags().Bool("deduplicate", true, "Block duplicate messages")

BridgeCmd.Flags().Bool("ratelimit", false, "Rate-limit messages")
BridgeCmd.Flags().Uint("ratelimit-uplink", 600, "Uplink rate limit (per gateway per minute)")
BridgeCmd.Flags().Uint("ratelimit-downlink", 0, "Downlink rate limit (per gateway per minute)")
Expand Down
54 changes: 54 additions & 0 deletions middleware/deduplicate/deduplicate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright © 2017 The Things Network
// Use of this source code is governed by the MIT license that can be found in the LICENSE file.

package deduplicate

import (
"bytes"
"errors"
"sync"

"github.com/TheThingsNetwork/gateway-connector-bridge/middleware"
"github.com/TheThingsNetwork/gateway-connector-bridge/types"
"github.com/TheThingsNetwork/go-utils/log"
)

// NewDeduplicate returns a middleware that deduplicates duplicate uplink messages received from broken gateways
func NewDeduplicate() *Deduplicate {
return &Deduplicate{
log: log.Get(),
lastMessage: make(map[string]*types.UplinkMessage),
}
}

// Deduplicate middleware
type Deduplicate struct {
log log.Interface
mu sync.RWMutex
lastMessage map[string]*types.UplinkMessage
}

// HandleDisconnect cleans up
func (d *Deduplicate) HandleDisconnect(ctx middleware.Context, msg *types.DisconnectMessage) error {
d.mu.Lock()
defer d.mu.Unlock()
delete(d.lastMessage, msg.GatewayID)
return nil
}

// ErrDuplicateMessage is returned when an uplink message is received multiple times
var ErrDuplicateMessage = errors.New("deduplicate: already handled this message")

// HandleUplink blocks duplicate messages
func (d *Deduplicate) HandleUplink(_ middleware.Context, msg *types.UplinkMessage) error {
d.mu.Lock()
defer d.mu.Unlock()
if lastMessage, ok := d.lastMessage[msg.GatewayID]; ok {
if bytes.Equal(msg.Message.Payload, lastMessage.Message.Payload) && // length check on slice is fast
msg.Message.GetGatewayMetadata().GetTimestamp() == lastMessage.Message.GetGatewayMetadata().GetTimestamp() {
return ErrDuplicateMessage
}
}
d.lastMessage[msg.GatewayID] = msg
return nil
}
52 changes: 52 additions & 0 deletions middleware/deduplicate/deduplicate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright © 2017 The Things Network
// Use of this source code is governed by the MIT license that can be found in the LICENSE file.

package deduplicate

import (
"testing"

"github.com/TheThingsNetwork/gateway-connector-bridge/middleware"
"github.com/TheThingsNetwork/gateway-connector-bridge/types"
"github.com/TheThingsNetwork/ttn/api/router"
. "github.com/smartystreets/goconvey/convey"
)

func TestDeduplicate(t *testing.T) {
Convey("Given a new Deduplicate", t, func(c C) {
i := NewDeduplicate()

up := &types.UplinkMessage{GatewayID: "test", Message: &router.UplinkMessage{
Payload: []byte{1, 2, 3, 4},
}}
upDup := &types.UplinkMessage{GatewayID: "test", Message: &router.UplinkMessage{
Payload: []byte{1, 2, 3, 4},
}}
nextUp := &types.UplinkMessage{GatewayID: "test", Message: &router.UplinkMessage{
Payload: []byte{1, 2, 3, 4, 5},
}}

Convey("When sending an UplinkMessage", func() {
Reset(func() {
i.HandleDisconnect(middleware.NewContext(), &types.DisconnectMessage{GatewayID: "test"})
})
err := i.HandleUplink(middleware.NewContext(), up)
Convey("There should be no error", func() {
So(err, ShouldBeNil)
})
Convey("When sending a duplicate of that UplinkMessage", func() {
err := i.HandleUplink(middleware.NewContext(), upDup)
Convey("There should be an error", func() {
So(err, ShouldEqual, ErrDuplicateMessage)
})
})
Convey("When sending another UplinkMessage", func() {
err := i.HandleUplink(middleware.NewContext(), nextUp)
Convey("There should be no error", func() {
So(err, ShouldBeNil)
})
})

})
})
}
63 changes: 18 additions & 45 deletions middleware/gatewayinfo/gatewayinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,61 +201,32 @@ func (p *Public) HandleDisconnect(ctx middleware.Context, msg *types.DisconnectM
return nil
}

// HandleUplink inserts metadata if set in info, but not present in message
func (p *Public) HandleUplink(ctx middleware.Context, msg *types.UplinkMessage) error {
info, err := p.get(msg.GatewayID)
if err != nil {
msg.Message.Trace = msg.Message.Trace.WithEvent("unable to get gateway info", "error", err)
}
meta := msg.Message.GetGatewayMetadata()
if gps, changed := p.setGPS(meta.Gps, info.AntennaLocation); changed {
msg.Message.Trace = msg.Message.Trace.WithEvent("injecting gateway location")
meta.Gps = gps
// HandleStatus inserts metadata if set in info, but not present in message
func (p *Public) HandleStatus(ctx middleware.Context, msg *types.StatusMessage) error {
info, _ := p.get(msg.GatewayID)

if msg.Message.Location == nil || msg.Message.Location.Validate() != nil {
msg.Message.Location = nil
}
return nil
}

func (p *Public) setGPS(gps *gateway.GPSMetadata, location *account.Location) (_ *gateway.GPSMetadata, injected bool) {
if gps != nil {
// Unset GPS if invalid coordinates
if gps.Latitude > 90 || gps.Latitude < -90 || gps.Longitude > 180 || gps.Longitude < -180 {
gps.Latitude = 0
gps.Longitude = 0
if info.AntennaLocation != nil {
if msg.Message.Location == nil {
msg.Message.Location = new(gateway.LocationMetadata)
}
// Unset GPS if close enough to null island
if (gps.Latitude > -1 && gps.Latitude < 1) && (gps.Longitude > -1 && gps.Longitude < 1) {
gps.Latitude = 0
gps.Longitude = 0
if msg.Message.Location.IsZero() {
msg.Message.Location.Latitude = float32(info.AntennaLocation.Latitude)
msg.Message.Location.Longitude = float32(info.AntennaLocation.Longitude)
msg.Message.Location.Source = gateway.LocationMetadata_REGISTRY
}
if gps.Latitude == 0 && gps.Longitude == 0 && gps.Altitude == 0 {
gps = nil
if msg.Message.Location.Altitude == 0 {
msg.Message.Location.Altitude = int32(info.AntennaLocation.Altitude)
}
}
if location == nil {
return gps, injected
}
if gps == nil {
gps = new(gateway.GPSMetadata)
}
if gps.Latitude == 0 && gps.Longitude == 0 {
gps.Latitude = float32(location.Latitude)
gps.Longitude = float32(location.Longitude)
injected = true
}
if gps.Altitude == 0 {
gps.Altitude = int32(location.Altitude)
injected = true
}
return gps, injected
}

// HandleStatus inserts metadata if set in info, but not present in message
func (p *Public) HandleStatus(ctx middleware.Context, msg *types.StatusMessage) error {
info, _ := p.get(msg.GatewayID)
msg.Message.Gps, _ = p.setGPS(msg.Message.Gps, info.AntennaLocation)
if msg.Message.FrequencyPlan == "" && info.FrequencyPlan != "" {
msg.Message.FrequencyPlan = info.FrequencyPlan
}

if msg.Message.Platform == "" {
platform := []string{}
if info.Attributes.Brand != nil {
Expand All @@ -266,8 +237,10 @@ func (p *Public) HandleStatus(ctx middleware.Context, msg *types.StatusMessage)
}
msg.Message.Platform = strings.Join(platform, " ")
}

if msg.Message.Description == "" && info.Attributes.Description != nil {
msg.Message.Description = *info.Attributes.Description
}

return nil
}
22 changes: 2 additions & 20 deletions middleware/gatewayinfo/gatewayinfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/TheThingsNetwork/gateway-connector-bridge/types"
"github.com/TheThingsNetwork/go-account-lib/account"
"github.com/TheThingsNetwork/ttn/api/gateway"
"github.com/TheThingsNetwork/ttn/api/router"
. "github.com/smartystreets/goconvey/convey"
redis "gopkg.in/redis.v5"
)
Expand Down Expand Up @@ -116,23 +115,6 @@ func TestPublic(t *testing.T) {
})
})

Convey("When sending an UplinkMessage", func() {
uplink := &types.UplinkMessage{
GatewayID: gatewayID,
Message: &router.UplinkMessage{
GatewayMetadata: &gateway.RxMetadata{},
},
}
err := p.HandleUplink(middleware.NewContext(), uplink)
Convey("There should be no error", func() {
So(err, ShouldBeNil)
})
Convey("The UplinkMessage should have GPS Metadata", func() {
So(uplink.Message.GetGatewayMetadata().GetGps(), ShouldNotBeNil)
So(uplink.Message.GetGatewayMetadata().GetGps().Latitude, ShouldEqual, 12.34)
})
})

Convey("When sending a StatusMessage", func() {
status := &types.StatusMessage{
GatewayID: gatewayID,
Expand All @@ -143,8 +125,8 @@ func TestPublic(t *testing.T) {
So(err, ShouldBeNil)
})
Convey("The StatusMessage should have Metadata", func() {
So(status.Message.GetGps(), ShouldNotBeNil)
So(status.Message.GetGps().Latitude, ShouldEqual, 12.34)
So(status.Message.GetLocation(), ShouldNotBeNil)
So(status.Message.GetLocation().Latitude, ShouldEqual, 12.34)
So(status.Message.Description, ShouldEqual, "My Test Gateway")
So(status.Message.Platform, ShouldEqual, "Test Gateway")
So(status.Message.FrequencyPlan, ShouldEqual, "EU_868")
Expand Down
Loading

0 comments on commit 788fbc3

Please sign in to comment.