Skip to content

Commit

Permalink
Merge pull request #82 from kaleido-io/query-target
Browse files Browse the repository at this point in the history
Configurable Chaincode Query targets; new Query endpoints
  • Loading branch information
peterbroadhurst authored Mar 11, 2022
2 parents f54ba80 + 0b0c834 commit 180b383
Show file tree
Hide file tree
Showing 15 changed files with 517 additions and 233 deletions.
2 changes: 1 addition & 1 deletion internal/events/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (s *subscription) getEventTimestamp(evt *eventsapi.EventEntry) {
return
}
// we didn't find the timestamp in our cache, query the node for the block header where we can find the timestamp
_, block, err := s.client.QueryBlock(s.info.ChannelId, evt.BlockNumber, s.info.Signer)
_, block, err := s.client.QueryBlock(s.info.ChannelId, s.info.Signer, evt.BlockNumber, nil)
if err != nil {
log.Errorf("Unable to retrieve block[%s] timestamp: %s", blockNumber, err)
evt.Timestamp = 0 // set to 0, we were not able to retrieve the timestamp.
Expand Down
5 changes: 3 additions & 2 deletions internal/fabric/client/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ type RegistrationWrapper struct {

type RPCClient interface {
Invoke(channelId, signer, chaincodeName, method string, args []string, isInit bool) (*TxReceipt, error)
Query(channelId, signer, chaincodeName, method string, args []string) ([]byte, error)
Query(channelId, signer, chaincodeName, method string, args []string, strongread bool) ([]byte, error)
QueryChainInfo(channelId, signer string) (*fab.BlockchainInfoResponse, error)
QueryBlock(channelId string, blockNumber uint64, signer string) (*utils.RawBlock, *utils.Block, error)
QueryBlock(channelId string, signer string, blocknumber uint64, blockhash []byte) (*utils.RawBlock, *utils.Block, error)
QueryBlockByTxId(channelId string, signer string, txId string) (*utils.RawBlock, *utils.Block, error)
QueryTransaction(channelId, signer, txId string) (map[string]interface{}, error)
SubscribeEvent(subInfo *eventsapi.SubscriptionInfo, since uint64) (*RegistrationWrapper, <-chan *fab.BlockEvent, <-chan *fab.CCEvent, error)
Unregister(*RegistrationWrapper)
Expand Down
146 changes: 37 additions & 109 deletions internal/fabric/client/client_ccp.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import (
"github.com/hyperledger/fabric-sdk-go/pkg/fabsdk"
mspImpl "github.com/hyperledger/fabric-sdk-go/pkg/msp"
"github.com/hyperledger/firefly-fabconnect/internal/errors"
eventsapi "github.com/hyperledger/firefly-fabconnect/internal/events/api"
"github.com/hyperledger/firefly-fabconnect/internal/fabric/utils"
log "github.com/sirupsen/logrus"
)

Expand All @@ -41,18 +39,10 @@ type ccpClientWrapper struct {
signer *msp.IdentityIdentifier
}

// defined to allow mocking in tests
type channelCreator func(context.ChannelProvider) (*channel.Client, error)

type ccpRPCWrapper struct {
txTimeout int
sdk *fabsdk.FabricSDK
cryptoSuiteConfig core.CryptoSuiteConfig
userStore msp.UserStore
idClient IdentityClient
ledgerClientWrapper *ledgerClientWrapper
eventClientWrapper *eventClientWrapper
channelCreator channelCreator
*commonRPCWrapper
cryptoSuiteConfig core.CryptoSuiteConfig
userStore msp.UserStore
// one channel client per channel ID, per signer ID
channelClients map[string](map[string]*ccpClientWrapper)
}
Expand All @@ -71,15 +61,18 @@ func newRPCClientFromCCP(configProvider core.ConfigProvider, txTimeout int, user

log.Infof("New gRPC connection established")
w := &ccpRPCWrapper{
sdk: ledgerClientWrapper.sdk,
cryptoSuiteConfig: cryptoConfig,
userStore: userStore,
idClient: idClient,
ledgerClientWrapper: ledgerClientWrapper,
eventClientWrapper: eventClientWrapper,
channelClients: make(map[string]map[string]*ccpClientWrapper),
channelCreator: createChannelClient,
txTimeout: txTimeout,
commonRPCWrapper: &commonRPCWrapper{
sdk: ledgerClientWrapper.sdk,
configProvider: configProvider,
idClient: idClient,
ledgerClientWrapper: ledgerClientWrapper,
eventClientWrapper: eventClientWrapper,
channelCreator: createChannelClient,
txTimeout: txTimeout,
},
cryptoSuiteConfig: cryptoConfig,
userStore: userStore,
channelClients: make(map[string]map[string]*ccpClientWrapper),
}
return w, nil
}
Expand All @@ -97,7 +90,7 @@ func (w *ccpRPCWrapper) Invoke(channelId, signer, chaincodeName, method string,
return newReceipt(result, txStatus, signerID), err
}

func (w *ccpRPCWrapper) Query(channelId, signer, chaincodeName, method string, args []string) ([]byte, error) {
func (w *ccpRPCWrapper) Query(channelId, signer, chaincodeName, method string, args []string, strongread bool) ([]byte, error) {
log.Tracef("RPC [%s:%s:%s] --> %+v", channelId, chaincodeName, method, args)

client, err := w.getChannelClient(channelId, signer)
Expand All @@ -106,74 +99,32 @@ func (w *ccpRPCWrapper) Query(channelId, signer, chaincodeName, method string, a
return nil, errors.Errorf("Failed to get channel client. %s", err)
}

result, err := client.channelClient.Query(
channel.Request{
ChaincodeID: chaincodeName,
Fcn: method,
Args: convert(args),
},
channel.WithRetry(retry.DefaultChannelOpts),
)
if err != nil {
log.Errorf("Failed to send query [%s:%s:%s]. %s", channelId, chaincodeName, method, err)
return nil, err
}

log.Tracef("RPC [%s:%s:%s] <-- %+v", channelId, chaincodeName, method, result)
return result.Payload, nil
}

func (w *ccpRPCWrapper) QueryTransaction(channelId, signer, txId string) (map[string]interface{}, error) {
log.Tracef("RPC [%s] --> QueryTransaction %s", channelId, txId)

result, err := w.ledgerClientWrapper.queryTransaction(channelId, signer, txId)
if err != nil {
log.Errorf("Failed to query transaction on channel %s. %s", channelId, err)
return nil, err
}

log.Tracef("RPC [%s] <-- %+v", channelId, result)
return result, nil
}

func (w *ccpRPCWrapper) QueryChainInfo(channelId, signer string) (*fab.BlockchainInfoResponse, error) {
log.Tracef("RPC [%s] --> ChainInfo", channelId)

result, err := w.ledgerClientWrapper.queryChainInfo(channelId, signer)
if err != nil {
log.Errorf("Failed to query chain info on channel %s. %s", channelId, err)
return nil, err
req := channel.Request{
ChaincodeID: chaincodeName,
Fcn: method,
Args: convert(args),
}

log.Tracef("RPC [%s] <-- %+v", channelId, result)
return result, nil
}

func (w *ccpRPCWrapper) QueryBlock(channelId string, blockNumber uint64, signer string) (*utils.RawBlock, *utils.Block, error) {
log.Tracef("RPC [%s] --> QueryBlock %v", channelId, blockNumber)

rawblock, block, err := w.ledgerClientWrapper.queryBlock(channelId, blockNumber, signer)
if err != nil {
log.Errorf("Failed to query block %v on channel %s. %s", blockNumber, channelId, err)
return nil, nil, err
var result channel.Response
var err1 error
if strongread {
// strongread means querying a set of peers that would have fulfilled the
// endorsement policies and make sure they all have the same results
result, err1 = client.channelClient.Query(req, channel.WithRetry(retry.DefaultChannelOpts))
} else {
peerEndpoint, err := getFirstPeerEndpointFromConfig(w.configProvider)
if err != nil {
return nil, err
}
result, err1 = client.channelClient.Query(req, channel.WithRetry(retry.DefaultChannelOpts), channel.WithTargetEndpoints(peerEndpoint))
}

log.Tracef("RPC [%s] <-- success", channelId)
return rawblock, block, nil
}

// The returned registration must be closed when done
func (w *ccpRPCWrapper) SubscribeEvent(subInfo *eventsapi.SubscriptionInfo, since uint64) (*RegistrationWrapper, <-chan *fab.BlockEvent, <-chan *fab.CCEvent, error) {
reg, blockEventCh, ccEventCh, err := w.eventClientWrapper.subscribeEvent(subInfo, since)
if err != nil {
log.Errorf("Failed to subscribe to event [%s:%s:%s]. %s", subInfo.Stream, subInfo.ChannelId, subInfo.Filter.ChaincodeId, err)
return nil, nil, nil, err
if err1 != nil {
log.Errorf("Failed to send query [%s:%s:%s]. %s", channelId, chaincodeName, method, err)
return nil, err1
}
return reg, blockEventCh, ccEventCh, nil
}

func (w *ccpRPCWrapper) Unregister(regWrapper *RegistrationWrapper) {
regWrapper.eventClient.Unregister(regWrapper.registration)
log.Tracef("RPC [%s:%s:%s] <-- %+v", channelId, chaincodeName, method, result)
return result.Payload, nil
}

func (w *ccpRPCWrapper) getChannelClient(channelId string, signer string) (*ccpClientWrapper, error) {
Expand Down Expand Up @@ -212,10 +163,6 @@ func (w *ccpRPCWrapper) Close() error {
return nil
}

func createChannelClient(channelProvider context.ChannelProvider) (*channel.Client, error) {
return channel.New(channelProvider)
}

func (w *ccpRPCWrapper) sendTransaction(channelId, signer, chaincodeName, method string, args []string, isInit bool) (*msp.IdentityIdentifier, []byte, *fab.TxStatusEvent, error) {
client, err := w.getChannelClient(channelId, signer)
if err != nil {
Expand Down Expand Up @@ -247,22 +194,3 @@ func (w *ccpRPCWrapper) sendTransaction(channelId, signer, chaincodeName, method
}
return client.signer, result.Payload, &txStatus, nil
}

func convert(args []string) [][]byte {
result := [][]byte{}
for _, v := range args {
result = append(result, []byte(v))
}
return result
}

func newReceipt(responsePayload []byte, status *fab.TxStatusEvent, signerID *msp.IdentityIdentifier) *TxReceipt {
return &TxReceipt{
SignerMSP: signerID.MSPID,
Signer: signerID.ID,
TransactionID: status.TxID,
Status: status.TxValidationCode,
BlockNumber: status.BlockNumber,
SourcePeer: status.SourceURL,
}
}
154 changes: 154 additions & 0 deletions internal/fabric/client/client_common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package client

import (
"fmt"

"github.com/hyperledger/fabric-sdk-go/pkg/client/channel"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/core"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/msp"
"github.com/hyperledger/fabric-sdk-go/pkg/fabsdk"
"github.com/hyperledger/firefly-fabconnect/internal/errors"
eventsapi "github.com/hyperledger/firefly-fabconnect/internal/events/api"
"github.com/hyperledger/firefly-fabconnect/internal/fabric/utils"
log "github.com/sirupsen/logrus"
)

type commonRPCWrapper struct {
txTimeout int
configProvider core.ConfigProvider
sdk *fabsdk.FabricSDK
idClient IdentityClient
ledgerClientWrapper *ledgerClientWrapper
eventClientWrapper *eventClientWrapper
channelCreator channelCreator
}

func getOrgFromConfig(config core.ConfigProvider) (string, error) {
configBackend, err := config()
if err != nil {
return "", err
}
if len(configBackend) != 1 {
return "", errors.Errorf("Invalid config file")
}

cfg := configBackend[0]
value, ok := cfg.Lookup("client.organization")
if !ok {
return "", errors.Errorf("No client organization defined in the config")
}

return value.(string), nil
}

func getFirstPeerEndpointFromConfig(config core.ConfigProvider) (string, error) {
org, err := getOrgFromConfig(config)
if err != nil {
return "", err
}
configBackend, _ := config()
cfg := configBackend[0]
value, ok := cfg.Lookup(fmt.Sprintf("organizations.%s.peers", org))
if !ok {
return "", errors.Errorf("No peers list found in the organization %s", org)
}
peers := value.([]interface{})
if len(peers) < 1 {
return "", errors.Errorf("Peers list for organization %s is empty", org)
}
return peers[0].(string), nil
}

// defined to allow mocking in tests
type channelCreator func(context.ChannelProvider) (*channel.Client, error)

func createChannelClient(channelProvider context.ChannelProvider) (*channel.Client, error) {
return channel.New(channelProvider)
}

func newReceipt(responsePayload []byte, status *fab.TxStatusEvent, signerID *msp.IdentityIdentifier) *TxReceipt {
return &TxReceipt{
SignerMSP: signerID.MSPID,
Signer: signerID.ID,
TransactionID: status.TxID,
Status: status.TxValidationCode,
BlockNumber: status.BlockNumber,
SourcePeer: status.SourceURL,
}
}

func convert(args []string) [][]byte {
result := [][]byte{}
for _, v := range args {
result = append(result, []byte(v))
}
return result
}

func (w *commonRPCWrapper) QueryChainInfo(channelId, signer string) (*fab.BlockchainInfoResponse, error) {
log.Tracef("RPC [%s] --> QueryChainInfo", channelId)

result, err := w.ledgerClientWrapper.queryChainInfo(channelId, signer)
if err != nil {
log.Errorf("Failed to query chain info on channel %s. %s", channelId, err)
return nil, err
}

log.Tracef("RPC [%s] <-- %+v", channelId, result)
return result, nil
}

func (w *commonRPCWrapper) QueryBlock(channelId string, signer string, blockNumber uint64, blockhash []byte) (*utils.RawBlock, *utils.Block, error) {
log.Tracef("RPC [%s] --> QueryBlock %v", channelId, blockNumber)

rawblock, block, err := w.ledgerClientWrapper.queryBlock(channelId, signer, blockNumber, blockhash)
if err != nil {
log.Errorf("Failed to query block %v on channel %s. %s", blockNumber, channelId, err)
return nil, nil, err
}

log.Tracef("RPC [%s] <-- success", channelId)
return rawblock, block, nil
}

func (w *commonRPCWrapper) QueryBlockByTxId(channelId string, signer string, txId string) (*utils.RawBlock, *utils.Block, error) {
log.Tracef("RPC [%s] --> QueryBlockByTxId %s", channelId, txId)

rawblock, block, err := w.ledgerClientWrapper.queryBlockByTxId(channelId, signer, txId)
if err != nil {
log.Errorf("Failed to query block by transaction Id %s on channel %s. %s", txId, channelId, err)
return nil, nil, err
}

log.Tracef("RPC [%s] <-- success", channelId)
return rawblock, block, nil
}

func (w *commonRPCWrapper) QueryTransaction(channelId, signer, txId string) (map[string]interface{}, error) {
log.Tracef("RPC [%s] --> QueryTransaction %s", channelId, txId)

result, err := w.ledgerClientWrapper.queryTransaction(channelId, signer, txId)
if err != nil {
log.Errorf("Failed to query transaction on channel %s. %s", channelId, err)
return nil, err
}

log.Tracef("RPC [%s] <-- %+v", channelId, result)
return result, nil
}

// The returned registration must be closed when done
func (w *commonRPCWrapper) SubscribeEvent(subInfo *eventsapi.SubscriptionInfo, since uint64) (*RegistrationWrapper, <-chan *fab.BlockEvent, <-chan *fab.CCEvent, error) {
reg, blockEventCh, ccEventCh, err := w.eventClientWrapper.subscribeEvent(subInfo, since)
if err != nil {
log.Errorf("Failed to subscribe to event [%s:%s:%s]. %s", subInfo.Stream, subInfo.ChannelId, subInfo.Filter.ChaincodeId, err)
return nil, nil, nil, err
}
return reg, blockEventCh, ccEventCh, nil
}

func (w *commonRPCWrapper) Unregister(regWrapper *RegistrationWrapper) {
regWrapper.eventClient.Unregister(regWrapper.registration)
}
Loading

0 comments on commit 180b383

Please sign in to comment.