This repository has been archived by the owner on Jun 6, 2023. It is now read-only.

Surb based publish subscribe #145

Open · wants to merge 19 commits into master
65 changes: 64 additions & 1 deletion config/config.go
@@ -372,8 +372,12 @@ type Provider struct {
Kaetzchen []*Kaetzchen

// CBORPluginKaetzchen is the list of configured external CBOR Kaetzchen plugins
// for this provider.
// for this Provider.
CBORPluginKaetzchen []*CBORPluginKaetzchen

// PubsubPlugin is the list of configured external Publish Subscribe service plugins
// for this Provider.
PubsubPlugin []*PubsubPlugin
}

// SQLDB is the SQL database backend configuration.
@@ -535,6 +539,56 @@ func (kCfg *CBORPluginKaetzchen) validate() error {
return nil
}

// PubsubPlugin is a Provider Publish Subscribe service agent.
type PubsubPlugin struct {
// Capability is the capability exposed by the agent.
Capability string

// Endpoint is the provider side endpoint that the agent will accept
// requests at. While not required by the spec, this server only
// supports Endpoints that are lower-case local-parts of an e-mail
// address.
Endpoint string

// Config is the extra per agent arguments to be passed to the agent's
// initialization routine.
Config map[string]interface{}

// Command is the full file path to the external plugin program
// that implements this Pubsub service.
Command string

// MaxConcurrency is the number of worker goroutines to start
// for this service.
MaxConcurrency int

// Disable disables a configured agent.
Disable bool
}

func (kCfg *PubsubPlugin) validate() error {
if kCfg.Capability == "" {
return fmt.Errorf("config: Pubsub: Capability is invalid")
}

// Ensure the endpoint is normalized.
epNorm, err := precis.UsernameCaseMapped.String(kCfg.Endpoint)
if err != nil {
return fmt.Errorf("config: Pubsub: '%v' has invalid endpoint: %v", kCfg.Capability, err)
}
if epNorm != kCfg.Endpoint {
return fmt.Errorf("config: Pubsub: '%v' has non-normalized endpoint %v", kCfg.Capability, kCfg.Endpoint)
}
if kCfg.Command == "" {
return fmt.Errorf("config: Pubsub: Command is invalid")
}
if _, err = mail.ParseAddress(kCfg.Endpoint + "@test.invalid"); err != nil {
return fmt.Errorf("config: Pubsub: '%v' has non local-part endpoint '%v': %v", kCfg.Capability, kCfg.Endpoint, err)
}

return nil
}
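
// A hedged, in-package illustration (not part of this change set) of what a
// configured PubsubPlugin entry and its validation might look like; the
// capability, endpoint, command, and config values below are made up.
func examplePubsubPluginConfig() error {
	p := &PubsubPlugin{
		Capability:     "pubsub",
		Endpoint:       "pubsub",
		Command:        "/usr/local/bin/pubsub_plugin",
		MaxConcurrency: 1,
		Config:         map[string]interface{}{"log_dir": "/var/log/pubsub"},
	}
	// validate enforces a non-empty Capability and Command, plus a
	// PRECIS-normalized Endpoint that is a valid e-mail local-part.
	return p.validate()
}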

func (pCfg *Provider) applyDefaults(sCfg *Server) {
if pCfg.UserDB == nil {
pCfg.UserDB = &UserDB{}
@@ -656,6 +710,15 @@ func (pCfg *Provider) validate() error {
}
capaMap[v.Capability] = true
}
for _, v := range pCfg.PubsubPlugin {
if err := v.validate(); err != nil {
return err
}
if capaMap[v.Capability] {
return fmt.Errorf("config: Kaetzchen: '%v' configured multiple times", v.Capability)
}
capaMap[v.Capability] = true
}

return nil
}
1 change: 1 addition & 0 deletions internal/constants/constants.go
@@ -34,6 +34,7 @@ const (
DecoySubsystem = "decoy"
IncomingConnSubsystem = "incoming_conn"
KaetzchenSubsystem = "kaetzchen"
PubsubPluginSubsystem = "pubsub_plugin"
OutgoingConnSubsystem = "outgoing_conn"
PKISubsystem = "pki"
ProviderSubsystem = "provider"
1 change: 1 addition & 0 deletions internal/glue/glue.go
@@ -66,6 +66,7 @@ type PKI interface {
StartWorker()
OutgoingDestinations() map[[constants.NodeIDLength]byte]*pki.MixDescriptor
AuthenticateConnection(*wire.PeerCredentials, bool) (*pki.MixDescriptor, bool, bool)
GetCachedConsensusDoc(uint64) (*pki.Document, error)
GetRawConsensus(uint64) ([]byte, error)
}

93 changes: 73 additions & 20 deletions internal/packet/packet.go
@@ -19,11 +19,14 @@ package packet

import (
"fmt"
mRand "math/rand"
"sync"
"sync/atomic"
"time"

"github.com/katzenpost/core/constants"
"github.com/katzenpost/core/crypto/rand"
"github.com/katzenpost/core/pki"
"github.com/katzenpost/core/sphinx"
"github.com/katzenpost/core/sphinx/commands"
"github.com/katzenpost/core/utils"
@@ -215,12 +218,10 @@ func newRedundantError(cmd commands.RoutingCommand) error {
return fmt.Errorf("redundant command: %T", cmd)
}

func ParseForwardPacket(pkt *Packet) ([]byte, []byte, error) {
func ParseForwardPacket(pkt *Packet) ([]byte, [][]byte, error) {
const (
hdrLength = constants.SphinxPlaintextHeaderLength + sphinx.SURBLength
flagsPadding = 0
flagsSURB = 1
reserved = 0
hdrLength = constants.SphinxPlaintextHeaderLength
reserved = 0
)

// Sanity check the forward packet payload length.
@@ -230,26 +231,21 @@ func ParseForwardPacket(pkt *Packet) ([]byte, []byte, error) {

// Parse the payload, which should be a valid BlockSphinxPlaintext.
b := pkt.Payload
if len(b) < hdrLength {
return nil, nil, fmt.Errorf("truncated message block")
}
if b[1] != reserved {
return nil, nil, fmt.Errorf("invalid message reserved: 0x%02x", b[1])
}
ct := b[hdrLength:]
var surb []byte
switch b[0] {
case flagsPadding:
case flagsSURB:
surb = b[constants.SphinxPlaintextHeaderLength:hdrLength]
default:
return nil, nil, fmt.Errorf("invalid message flags: 0x%02x", b[0])
surbCount := int(b[0])
if (surbCount * sphinx.SURBLength) >= (constants.ForwardPayloadLength - hdrLength) {
return nil, nil, fmt.Errorf("invalid message SURB count: %d", uint8(b[0]))
}
if len(ct) != constants.UserForwardPayloadLength {
return nil, nil, fmt.Errorf("mis-sized user payload: %v", len(ct))
surbs := make([][]byte, surbCount)
startOffset := 2
for i := 0; i < surbCount; i++ {
surbs[i] = b[startOffset : startOffset+sphinx.SURBLength]
startOffset += sphinx.SURBLength
}

return ct, surb, nil
ct := b[hdrLength+(surbCount*sphinx.SURBLength):]
return ct, surbs, nil
}
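
// A minimal sketch (not part of this change set) of the inverse operation:
// packing SURBs and user ciphertext into the plaintext layout that
// ParseForwardPacket expects. It assumes the imports already present in
// packet.go (fmt, constants, sphinx); the helper name is hypothetical.
func buildForwardPayload(surbs [][]byte, ct []byte) ([]byte, error) {
	if len(surbs) > 255 {
		return nil, fmt.Errorf("too many SURBs: %d", len(surbs))
	}
	// Byte 0 carries the SURB count, byte 1 is the reserved byte (left zero).
	b := make([]byte, constants.ForwardPayloadLength)
	b[0] = byte(len(surbs))
	off := constants.SphinxPlaintextHeaderLength
	for _, s := range surbs {
		if len(s) != sphinx.SURBLength {
			return nil, fmt.Errorf("invalid SURB length: %d", len(s))
		}
		copy(b[off:], s)
		off += sphinx.SURBLength
	}
	if len(ct) > len(b)-off {
		return nil, fmt.Errorf("oversized ciphertext: %d", len(ct))
	}
	// The ciphertext follows the SURBs; the remainder stays zero padding.
	copy(b[off:], ct)
	return b, nil
}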

func NewPacketFromSURB(pkt *Packet, surb, payload []byte) (*Packet, error) {
@@ -303,3 +299,60 @@ func NewPacketFromSURB(pkt *Packet, surb, payload []byte) (*Packet, error) {

return respPkt, nil
}

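// NewProviderDelay samples a Provider-hop delay in milliseconds from the
// exponential distribution parameterized by the PKI document's Mu, clamped to
// MuMaxDelay when that bound is set.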
func NewProviderDelay(rng *mRand.Rand, doc *pki.Document) uint32 {
delay := uint64(rand.Exp(rng, doc.Mu)) + 1
if doc.MuMaxDelay > 0 && delay > doc.MuMaxDelay {
delay = doc.MuMaxDelay
}
return uint32(delay)
}

// NewDelayedPacketFromSURB creates a new Packet given a SURB, a payload, and a
// delay, where the specified delay applies to the first hop, the Provider.
func NewDelayedPacketFromSURB(delay uint32, surb, payload []byte) (*Packet, error) {
// Pad out payloads to the full packet size.
var respPayload [constants.ForwardPayloadLength]byte
switch {
case len(payload) == 0:
case len(payload) > constants.ForwardPayloadLength:
return nil, fmt.Errorf("oversized response payload: %v", len(payload))
default:
copy(respPayload[:], payload)
}

// Build a response packet using a SURB.
//
// TODO/perf: This is a crypto operation that is parallelizable, and
// could be handled by the crypto worker(s), since those are allocated
// based on hardware acceleration considerations. However, the forward
// packet processing doesn't constantly utilize the AES-NI units due
// to the non-AEZ components of a Sphinx Unwrap operation.
rawRespPkt, firstHop, err := sphinx.NewPacketFromSURB(surb, respPayload[:])
if err != nil {
return nil, err
}

// Build the command vector for the SURB-based response packet.
cmds := make([]commands.RoutingCommand, 0, 2)

nextHopCmd := new(commands.NextNodeHop)
copy(nextHopCmd.ID[:], firstHop[:])
cmds = append(cmds, nextHopCmd)

nodeDelayCmd := new(commands.NodeDelay)
nodeDelayCmd.Delay = delay
cmds = append(cmds, nodeDelayCmd)

// Assemble the response packet.
respPkt, _ := New(rawRespPkt)
respPkt.Set(nil, cmds)

respPkt.Delay = time.Duration(nodeDelayCmd.Delay) * time.Millisecond
respPkt.MustForward = true

// XXX: This should probably fudge the delay to account for processing
// time.

return respPkt, nil
}
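
// A hypothetical sketch (not part of this change set) tying the new pieces
// together: a provider-side worker answering every SURB carried in a forward
// packet with an individually delayed SURB-Reply. The schedule callback
// stands in for however the real worker enqueues packets; the imports are
// those already used by packet.go (math/rand as mRand, the core pki package).
func replyToSURBs(rng *mRand.Rand, doc *pki.Document, pkt *Packet, payload []byte, schedule func(*Packet)) error {
	_, surbs, err := ParseForwardPacket(pkt)
	if err != nil {
		return err
	}
	for _, surb := range surbs {
		// Sample a fresh Provider-hop delay per reply so responses are
		// not trivially correlated in time.
		delay := NewProviderDelay(rng, doc)
		respPkt, err := NewDelayedPacketFromSURB(delay, surb, payload)
		if err != nil {
			return err
		}
		schedule(respPkt)
	}
	return nil
}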
104 changes: 104 additions & 0 deletions internal/packet/packet_test.go
@@ -0,0 +1,104 @@
// packet_test.go - Katzenpost server packet structure tests.
// Copyright (C) 2020 David Stainton.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

// Package packet implements the Katzenpost server side packet structure.
package packet

import (
"testing"

"github.com/katzenpost/core/constants"
"github.com/katzenpost/core/sphinx"
"github.com/stretchr/testify/require"
)

func TestParseForwardPacket(t *testing.T) {
require := require.New(t)

// test that wrong payload size is an error
wrongPayload := [constants.ForwardPayloadLength + 123]byte{}
pkt := &Packet{
Payload: wrongPayload[:],
}
_, _, err := ParseForwardPacket(pkt)
require.Error(err)

// test that the wrong reserved value is an error
payload := [constants.ForwardPayloadLength]byte{}
pkt = &Packet{
Payload: payload[:],
}
pkt.Payload[1] = byte(1)
_, _, err = ParseForwardPacket(pkt)
require.Error(err)

// test that an invalid SURB count is an error
payload = [constants.ForwardPayloadLength]byte{}
pkt = &Packet{
Payload: payload[:],
}
pkt.Payload[0] = byte(255)
_, _, err = ParseForwardPacket(pkt)
require.Error(err)

// test that a SURB count just over the maximum is an error
payload = [constants.ForwardPayloadLength]byte{}
pkt = &Packet{
Payload: payload[:],
}
pkt.Payload[0] = byte(93)
_, _, err = ParseForwardPacket(pkt)
require.Error(err)

// test that the 1 SURB case is handled properly
payload = [constants.ForwardPayloadLength]byte{}
pkt = &Packet{
Payload: payload[:],
}
pkt.Payload[0] = byte(1)
pkt.Payload[constants.SphinxPlaintextHeaderLength+sphinx.SURBLength] = 1
ct, surbs, err := ParseForwardPacket(pkt)
require.NoError(err)
require.Equal(1, len(surbs))
require.Equal(constants.UserForwardPayloadLength, len(ct))
require.Equal(1, int(ct[0]))
require.Equal(0, int(ct[1]))

// test that the 2 SURB case is handled properly
payload = [constants.ForwardPayloadLength]byte{}
pkt = &Packet{
Payload: payload[:],
}
pkt.Payload[0] = byte(2)
pkt.Payload[constants.SphinxPlaintextHeaderLength+sphinx.SURBLength+sphinx.SURBLength] = 1
ct, surbs, err = ParseForwardPacket(pkt)
require.NoError(err)
require.Equal(2, len(surbs))
require.NotEqual(constants.UserForwardPayloadLength, len(ct))
require.Equal(1, int(ct[0]))
require.Equal(0, int(ct[1]))

// test that a large SURB count is OK
payload = [constants.ForwardPayloadLength]byte{}
pkt = &Packet{
Payload: payload[:],
}
pkt.Payload[0] = byte(92)
ct, surbs, err = ParseForwardPacket(pkt)
require.NoError(err)
require.Equal(92, len(surbs))
require.Equal((constants.ForwardPayloadLength-constants.SphinxPlaintextHeaderLength)-(92*sphinx.SURBLength), len(ct))
}
11 changes: 11 additions & 0 deletions internal/pki/pki.go
@@ -634,6 +634,17 @@ func (p *pki) OutgoingDestinations() map[[sConstants.NodeIDLength]byte]*cpki.Mix
return descMap
}

// GetCachedConsensusDoc returns a cached PKI document for the given epoch.
func (p *pki) GetCachedConsensusDoc(epoch uint64) (*cpki.Document, error) {
p.RLock()
defer p.RUnlock()
entry, ok := p.docs[epoch]
if !ok {
return nil, errors.New("failed to retrieve cached pki doc")
}
return entry.Document(), nil
}
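
// A hypothetical caller-side sketch (not part of this change set): a provider
// component holding a glue.Glue handle fetching the cached consensus for the
// current epoch, e.g. to parameterize packet.NewProviderDelay. epochtime is
// the core epoch clock; the helper name and usage are illustrative only.
func currentConsensusDoc(g glue.Glue) (*cpki.Document, error) {
	epoch, _, _ := epochtime.Now()
	return g.PKI().GetCachedConsensusDoc(epoch)
}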

func (p *pki) GetRawConsensus(epoch uint64) ([]byte, error) {
if ok, err := p.getFailedFetch(epoch); ok {
p.log.Debugf("GetRawConsensus failure: no cached PKI document for epoch %v: %v", epoch, err)
18 changes: 13 additions & 5 deletions internal/provider/kaetzchen/cbor_plugins.go
@@ -123,17 +123,25 @@ func (k *CBORPluginWorker) processKaetzchen(pkt *packet.Packet, pluginClient cbo
defer kaetzchenRequestsTimer.ObserveDuration()
defer pkt.Dispose()

ct, surb, err := packet.ParseForwardPacket(pkt)
ct, surbs, err := packet.ParseForwardPacket(pkt)
if err != nil {
k.log.Debugf("Dropping Kaetzchen request: %v (%v)", pkt.ID, err)
kaetzchenRequestsDropped.Inc()
return
}

if len(surbs) > 1 {
k.log.Debugf("Received multi-SURB payload, dropping Kaetzchen request: %v (%v)", pkt.ID, err)
kaetzchenRequestsDropped.Inc()
return
}
hasSURB := len(surbs) == 1
resp, err := pluginClient.OnRequest(&cborplugin.Request{
ID: pkt.ID,
Payload: ct,
HasSURB: surb != nil,
HasSURB: hasSURB,
})
switch err {
case nil:
@@ -151,10 +159,10 @@ func (k *CBORPluginWorker) processKaetzchen(pkt *packet.Packet, pluginClient cbo
}

// Iff there is a SURB, generate a SURB-Reply and schedule.
if surb != nil {
if hasSURB {
// Prepend the response header.
resp = append([]byte{0x01, 0x00}, resp...)

surb := surbs[0]
respPkt, err := packet.NewPacketFromSURB(pkt, surb, resp)
if err != nil {
k.log.Debugf("Failed to generate SURB-Reply: %v (%v)", pkt.ID, err)