Skip to content

Commit

Permalink
add put state batch
Browse files Browse the repository at this point in the history
Signed-off-by: Fedor Partanskiy <[email protected]>
  • Loading branch information
pfi79 committed Jan 15, 2025
1 parent 299e4f5 commit 836c07f
Show file tree
Hide file tree
Showing 11 changed files with 322 additions and 37 deletions.
2 changes: 1 addition & 1 deletion pkg/statebased/statebasedimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestAddOrg(t *testing.T) {

// bad role type
err = ep.AddOrgs("unknown", "Org1")
assert.Equal(t, &statebased.RoleTypeDoesNotExistError{RoleType: statebased.RoleType("unknown")}, err)
assert.Equal(t, &statebased.RoleTypeDoesNotExistError{RoleType: "unknown"}, err)
assert.EqualError(t, err, "role type unknown does not exist")

epBytes, err := ep.Policy()
Expand Down
3 changes: 1 addition & 2 deletions shim/chaincodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ import (

"github.com/hyperledger/fabric-chaincode-go/v2/shim/internal"
"github.com/hyperledger/fabric-protos-go-apiv2/peer"

"google.golang.org/grpc/keepalive"
)

// TLSProperties passed to ChaincodeServer
type TLSProperties struct {
//Disabled forces default to be TLS enabled
// Disabled forces default to be TLS enabled
Disabled bool
Key []byte
Cert []byte
Expand Down
213 changes: 212 additions & 1 deletion shim/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ const (
created state = "created" // start state
established state = "established" // connection established
ready state = "ready" // ready for requests

defaultMaxSizeWriteBatch = 100
prefixMetaDataWriteBatch = "m"
prefixStateDataWriteBatch = "s"
)

// PeerChaincodeStream is the common stream interface for Peer - chaincode communication.
Expand Down Expand Up @@ -46,6 +50,13 @@ type Handler struct {
cc Chaincode
// state holds the current state of this handler.
state state
// if you can send the changes in batches.
usePeerWriteBatch bool
maxSizeWriteBatch uint32
batchMutex sync.RWMutex
batch map[string]map[string]*peer.WriteRecord
startWriteBatchMutex sync.RWMutex
startWriteBatch map[string]bool

// Multiple queries (and one transaction) with different txids can be executing in parallel for this chaincode
// responseChannels is the channel on which responses are communicated by the shim to the chaincodeStub.
Expand Down Expand Up @@ -150,6 +161,8 @@ func newChaincodeHandler(peerChatStream PeerChaincodeStream, chaincode Chaincode
cc: chaincode,
responseChannels: map[string]chan *peer.ChaincodeMessage{},
state: created,
batch: map[string]map[string]*peer.WriteRecord{},
startWriteBatch: map[string]bool{},
}
}

Expand Down Expand Up @@ -188,6 +201,11 @@ func (h *Handler) handleInit(msg *peer.ChaincodeMessage) (*peer.ChaincodeMessage
return nil, fmt.Errorf("failed to marshal response: %s", err)
}

err = h.sendBatch(msg.ChannelId, msg.Txid)
if err != nil {
return nil, fmt.Errorf("failed send batch: %s", err)
}

return &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_COMPLETED, Payload: resBytes, Txid: msg.Txid, ChaincodeEvent: stub.chaincodeEvent, ChannelId: stub.ChannelID}, nil
}

Expand All @@ -214,6 +232,11 @@ func (h *Handler) handleTransaction(msg *peer.ChaincodeMessage) (*peer.Chaincode
return nil, fmt.Errorf("failed to marshal response: %s", err)
}

err = h.sendBatch(msg.ChannelId, msg.Txid)
if err != nil {
return nil, fmt.Errorf("failed send batch: %s", err)
}

return &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_COMPLETED, Payload: resBytes, Txid: msg.Txid, ChaincodeEvent: stub.chaincodeEvent, ChannelId: stub.ChannelID}, nil
}

Expand Down Expand Up @@ -312,6 +335,17 @@ func (h *Handler) handleGetStateMetadata(collection string, key string, channelI

// handlePutState communicates with the peer to put state information into the ledger.
func (h *Handler) handlePutState(collection string, key string, value []byte, channelID string, txid string) error {
if h.usePeerWriteBatch && h.isStartWriteBatch(channelID, txid) {
st := h.batchByID(channelID, txid)
st[prefixStateDataWriteBatch+collection+key] = &peer.WriteRecord{
Key: key,
Value: value,
Collection: collection,
Type: peer.WriteRecord_PUT_STATE,
}
return nil
}

// Construct payload for PUT_STATE
payloadBytes := marshalOrPanic(&peer.PutState{Collection: collection, Key: key, Value: value})

Expand Down Expand Up @@ -340,6 +374,19 @@ func (h *Handler) handlePutState(collection string, key string, value []byte, ch
func (h *Handler) handlePutStateMetadataEntry(collection string, key string, metakey string, metadata []byte, channelID string, txID string) error {
// Construct payload for PUT_STATE_METADATA
md := &peer.StateMetadata{Metakey: metakey, Value: metadata}

if h.usePeerWriteBatch && h.isStartWriteBatch(channelID, txID) {
st := h.batchByID(channelID, txID)
st[prefixMetaDataWriteBatch+collection+key] = &peer.WriteRecord{
Key: key,
Collection: collection,
Metadata: md,
Type: peer.WriteRecord_PUT_STATE_METADATA,
}

return nil
}

payloadBytes := marshalOrPanic(&peer.PutStateMetadata{Collection: collection, Key: key, Metadata: md})

msg := &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_PUT_STATE_METADATA, Payload: payloadBytes, Txid: txID, ChannelId: channelID}
Expand All @@ -365,6 +412,16 @@ func (h *Handler) handlePutStateMetadataEntry(collection string, key string, met

// handleDelState communicates with the peer to delete a key from the state in the ledger.
func (h *Handler) handleDelState(collection string, key string, channelID string, txid string) error {
if h.usePeerWriteBatch && h.isStartWriteBatch(channelID, txid) {
st := h.batchByID(channelID, txid)
st[prefixStateDataWriteBatch+collection+key] = &peer.WriteRecord{
Key: key,
Collection: collection,
Type: peer.WriteRecord_DEL_STATE,
}
return nil
}

payloadBytes := marshalOrPanic(&peer.DelState{Collection: collection, Key: key})
msg := &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_DEL_STATE, Payload: payloadBytes, Txid: txid, ChannelId: channelID}
// Execute the request and get response
Expand All @@ -388,6 +445,16 @@ func (h *Handler) handleDelState(collection string, key string, channelID string

// handlePurgeState communicates with the peer to purge a state from private data
func (h *Handler) handlePurgeState(collection string, key string, channelID string, txid string) error {
if h.usePeerWriteBatch && h.isStartWriteBatch(channelID, txid) {
st := h.batchByID(channelID, txid)
st[prefixStateDataWriteBatch+collection+key] = &peer.WriteRecord{
Key: key,
Collection: collection,
Type: peer.WriteRecord_PURGE_PRIVATE_DATA,
}
return nil
}

payloadBytes := marshalOrPanic(&peer.DelState{Collection: collection, Key: key})
msg := &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_PURGE_PRIVATE_DATA, Payload: payloadBytes, Txid: txid, ChannelId: channelID}
// Execute the request and get response
Expand All @@ -409,6 +476,111 @@ func (h *Handler) handlePurgeState(collection string, key string, channelID stri
return fmt.Errorf("[%s] incorrect chaincode message %s received. Expecting %s or %s", shorttxid(responseMsg.Txid), responseMsg.Type, peer.ChaincodeMessage_RESPONSE, peer.ChaincodeMessage_ERROR)
}

// handleWriteBatch communicates with the peer to write batch to state all changes information into the ledger.
func (h *Handler) handleWriteBatch(batch *peer.WriteBatchState, channelID string, txid string) error {
// Construct payload for PUT_STATE_BATCH
payloadBytes := marshalOrPanic(batch)

msg := &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_WRITE_BATCH_STATE, Payload: payloadBytes, Txid: txid, ChannelId: channelID}

// Execute the request and get response
responseMsg, err := h.callPeerWithChaincodeMsg(msg, channelID, txid)
if err != nil {
return fmt.Errorf("[%s] error sending %s: %s", msg.Txid, peer.ChaincodeMessage_WRITE_BATCH_STATE, err)
}

if responseMsg.Type == peer.ChaincodeMessage_RESPONSE {
// Success response
return nil
}

if responseMsg.Type == peer.ChaincodeMessage_ERROR {
// Error response
return fmt.Errorf("%s", responseMsg.Payload[:])
}

// Incorrect chaincode message received
return fmt.Errorf("[%s] incorrect chaincode message %s received. Expecting %s or %s", shorttxid(responseMsg.Txid), responseMsg.Type, peer.ChaincodeMessage_RESPONSE, peer.ChaincodeMessage_ERROR)
}

func (h *Handler) sendBatch(channelID string, txid string) error {
if !h.usePeerWriteBatch || !h.isStartWriteBatch(channelID, txid) {
return nil
}

st := h.batchByID(channelID, txid)
txCtxID := transactionContextID(channelID, txid)

defer func() {
h.batchMutex.Lock()
h.startWriteBatchMutex.Lock()

delete(h.batch, txCtxID)
delete(h.startWriteBatch, txCtxID)

h.startWriteBatchMutex.Unlock()
h.batchMutex.Unlock()
}()

batch := &peer.WriteBatchState{}
for _, kv := range st {
batch.Rec = append(batch.Rec, kv)
if len(batch.Rec) >= int(h.maxSizeWriteBatch) {
err := h.handleWriteBatch(batch, channelID, txid)
if err != nil {
return fmt.Errorf("failed send batch: %s", err)
}
batch.Rec = batch.Rec[:0]
}
}

if len(batch.Rec) != 0 {
err := h.handleWriteBatch(batch, channelID, txid)
if err != nil {
return fmt.Errorf("failed send batch: %s", err)
}
}

return nil
}

func (h *Handler) handleStartWriteBatch(channelID string, txID string) error {
if !h.usePeerWriteBatch {
return errors.New("peer does not support write batch")
}

txCtxID := transactionContextID(channelID, txID)
h.startWriteBatchMutex.Lock()
defer h.startWriteBatchMutex.Unlock()

h.startWriteBatch[txCtxID] = true
return nil
}

func (h *Handler) handleFinishWriteBatch(channelID string, txID string) error {
return h.sendBatch(channelID, txID)
}

func (h *Handler) handleResetWriteBatch(channelID string, txID string) {
if !h.usePeerWriteBatch || !h.isStartWriteBatch(channelID, txID) {
return
}

txCtxID := transactionContextID(channelID, txID)

h.batchMutex.Lock()
delete(h.batch, txCtxID)
h.batchMutex.Unlock()
}

func (h *Handler) isStartWriteBatch(channelID string, txID string) bool {
txCtxID := transactionContextID(channelID, txID)
h.startWriteBatchMutex.RLock()
defer h.startWriteBatchMutex.RUnlock()

return h.startWriteBatch[txCtxID]
}

func (h *Handler) handleGetStateByRange(collection, startKey, endKey string, metadata []byte,
channelID string, txid string) (*peer.QueryResponse, error) {
// Send GET_STATE_BY_RANGE message to peer chaincode support
Expand Down Expand Up @@ -655,6 +827,23 @@ func (h *Handler) handleEstablished(msg *peer.ChaincodeMessage) error {
}

h.state = ready
if len(msg.Payload) == 0 {
return nil
}

ccAdditionalParams := &peer.ChaincodeAdditionalParams{}
err := proto.Unmarshal(msg.Payload, ccAdditionalParams)
if err != nil {
return nil
}

h.usePeerWriteBatch = ccAdditionalParams.UseWriteBatch
h.maxSizeWriteBatch = ccAdditionalParams.MaxSizeWriteBatch

if h.usePeerWriteBatch && h.maxSizeWriteBatch < defaultMaxSizeWriteBatch {
h.maxSizeWriteBatch = defaultMaxSizeWriteBatch
}

return nil
}

Expand Down Expand Up @@ -697,7 +886,29 @@ func (h *Handler) handleMessage(msg *peer.ChaincodeMessage, errc chan error) err
return nil
}

// marshalOrPanic attempts to marshal the provided protobbuf message but will panic
func (h *Handler) batchByID(channelID string, txID string) map[string]*peer.WriteRecord {
txCtxID := transactionContextID(channelID, txID)

h.batchMutex.RLock()
st, ok := h.batch[txCtxID]
h.batchMutex.RUnlock()
if ok {
return st
}

h.batchMutex.Lock()
defer h.batchMutex.Unlock()
st, ok = h.batch[txCtxID]
if ok {
return st
}

st = make(map[string]*peer.WriteRecord)
h.batch[txCtxID] = st
return st
}

// marshalOrPanic attempts to marshal the provided protobuf message but will panic
// when marshaling fails instead of returning an error.
func marshalOrPanic(msg proto.Message) []byte {
bytes, err := proto.Marshal(msg)
Expand Down
Loading

0 comments on commit 836c07f

Please sign in to comment.