Skip to content

Commit

Permalink
Add QueryBlockByTxId, and QueryBlockByHash support
Browse files Browse the repository at this point in the history
Signed-off-by: Jim Zhang <[email protected]>
  • Loading branch information
jimthematrix committed Mar 9, 2022
1 parent e0cb0ad commit 4c661d1
Show file tree
Hide file tree
Showing 11 changed files with 171 additions and 24 deletions.
2 changes: 1 addition & 1 deletion internal/events/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (s *subscription) getEventTimestamp(evt *eventsapi.EventEntry) {
return
}
// we didn't find the timestamp in our cache, query the node for the block header where we can find the timestamp
_, block, err := s.client.QueryBlock(s.info.ChannelId, evt.BlockNumber, s.info.Signer)
_, block, err := s.client.QueryBlock(s.info.ChannelId, s.info.Signer, evt.BlockNumber, nil)
if err != nil {
log.Errorf("Unable to retrieve block[%s] timestamp: %s", blockNumber, err)
evt.Timestamp = 0 // set to 0, we were not able to retrieve the timestamp.
Expand Down
3 changes: 2 additions & 1 deletion internal/fabric/client/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ type RPCClient interface {
Invoke(channelId, signer, chaincodeName, method string, args []string, isInit bool) (*TxReceipt, error)
Query(channelId, signer, chaincodeName, method string, args []string, strongread bool) ([]byte, error)
QueryChainInfo(channelId, signer string) (*fab.BlockchainInfoResponse, error)
QueryBlock(channelId string, blockNumber uint64, signer string) (*utils.RawBlock, *utils.Block, error)
QueryBlock(channelId string, signer string, blocknumber uint64, blockhash []byte) (*utils.RawBlock, *utils.Block, error)
QueryBlockByTxId(channelId string, signer string, txId string) (*utils.RawBlock, *utils.Block, error)
QueryTransaction(channelId, signer, txId string) (map[string]interface{}, error)
SubscribeEvent(subInfo *eventsapi.SubscriptionInfo, since uint64) (*RegistrationWrapper, <-chan *fab.BlockEvent, <-chan *fab.CCEvent, error)
Unregister(*RegistrationWrapper)
Expand Down
19 changes: 16 additions & 3 deletions internal/fabric/client/client_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func convert(args []string) [][]byte {
}

func (w *commonRPCWrapper) QueryChainInfo(channelId, signer string) (*fab.BlockchainInfoResponse, error) {
log.Tracef("RPC [%s] --> ChainInfo", channelId)
log.Tracef("RPC [%s] --> QueryChainInfo", channelId)

result, err := w.ledgerClientWrapper.queryChainInfo(channelId, signer)
if err != nil {
Expand All @@ -100,10 +100,10 @@ func (w *commonRPCWrapper) QueryChainInfo(channelId, signer string) (*fab.Blockc
return result, nil
}

func (w *commonRPCWrapper) QueryBlock(channelId string, blockNumber uint64, signer string) (*utils.RawBlock, *utils.Block, error) {
func (w *commonRPCWrapper) QueryBlock(channelId string, signer string, blockNumber uint64, blockhash []byte) (*utils.RawBlock, *utils.Block, error) {
log.Tracef("RPC [%s] --> QueryBlock %v", channelId, blockNumber)

rawblock, block, err := w.ledgerClientWrapper.queryBlock(channelId, blockNumber, signer)
rawblock, block, err := w.ledgerClientWrapper.queryBlock(channelId, signer, blockNumber, blockhash)
if err != nil {
log.Errorf("Failed to query block %v on channel %s. %s", blockNumber, channelId, err)
return nil, nil, err
Expand All @@ -113,6 +113,19 @@ func (w *commonRPCWrapper) QueryBlock(channelId string, blockNumber uint64, sign
return rawblock, block, nil
}

func (w *commonRPCWrapper) QueryBlockByTxId(channelId string, signer string, txId string) (*utils.RawBlock, *utils.Block, error) {
log.Tracef("RPC [%s] --> QueryBlockByTxId %s", channelId, txId)

rawblock, block, err := w.ledgerClientWrapper.queryBlockByTxId(channelId, signer, txId)
if err != nil {
log.Errorf("Failed to query block by transaction Id %s on channel %s. %s", txId, channelId, err)
return nil, nil, err
}

log.Tracef("RPC [%s] <-- success", channelId)
return rawblock, block, nil
}

func (w *commonRPCWrapper) QueryTransaction(channelId, signer, txId string) (map[string]interface{}, error) {
log.Tracef("RPC [%s] --> QueryTransaction %s", channelId, txId)

Expand Down
24 changes: 22 additions & 2 deletions internal/fabric/client/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package client

import (
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-sdk-go/pkg/client/ledger"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/core"
Expand Down Expand Up @@ -58,12 +59,31 @@ func (l *ledgerClientWrapper) queryChainInfo(channelId, signer string) (*fab.Blo
return result, nil
}

func (l *ledgerClientWrapper) queryBlock(channelId string, blockNumber uint64, signer string) (*utils.RawBlock, *utils.Block, error) {
func (l *ledgerClientWrapper) queryBlock(channelId string, signer string, blockNumber uint64, blockhash []byte) (*utils.RawBlock, *utils.Block, error) {
client, err := l.getLedgerClient(channelId, signer)
if err != nil {
return nil, nil, errors.Errorf("Failed to get channel client. %s", err)
}
result, err := client.QueryBlock(blockNumber)
var result *common.Block
var err1 error
if blockhash == nil {
result, err1 = client.QueryBlock(blockNumber)
} else {
result, err1 = client.QueryBlockByHash(blockhash)
}
if err1 != nil {
return nil, nil, err1
}
rawblock, block, err := utils.DecodeBlock(result)
return rawblock, block, err
}

func (l *ledgerClientWrapper) queryBlockByTxId(channelId string, signer string, txId string) (*utils.RawBlock, *utils.Block, error) {
client, err := l.getLedgerClient(channelId, signer)
if err != nil {
return nil, nil, errors.Errorf("Failed to get channel client. %s", err)
}
result, err := client.QueryBlockByTxID(fab.TransactionID(txId))
if err != nil {
return nil, nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion internal/fabric/test/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ func MockRPCClient(fromBlock string, withReset ...bool) *mockfabric.RPCClient {
rpc.On("SubscribeEvent", mock.Anything, mock.Anything).Return(nil, roBlockEventChan, roCCEventChan, nil)
rpc.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(chaincodeResult, nil)
rpc.On("QueryChainInfo", mock.Anything, mock.Anything).Return(res, nil)
rpc.On("QueryBlock", mock.Anything, mock.Anything, mock.Anything).Return(rawBlock, block, nil)
rpc.On("QueryBlock", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(rawBlock, block, nil)
rpc.On("QueryBlockByTxId", mock.Anything, mock.Anything, mock.Anything).Return(rawBlock, block, nil)
rpc.On("QueryTransaction", mock.Anything, mock.Anything, mock.Anything).Return(txResult, nil)
rpc.On("Unregister", mock.Anything).Return()

Expand Down
8 changes: 7 additions & 1 deletion internal/messages/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,13 @@ type GetChainInfo struct {

type GetBlock struct {
RequestCommon
BlockNumber uint64 `json:"blockNumber"`
BlockNumber uint64
BlockHash []byte
}

type GetBlockByTxId struct {
RequestCommon
TxId string
}

// SendTransaction message instructs the bridge to install a contract
Expand Down
10 changes: 10 additions & 0 deletions internal/rest/restgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,16 @@ func TestQueryEndpoints(t *testing.T) {
block := rr["block"].(map[string]interface{})
assert.Equal(float64(20), block["block_number"])

url, _ = url.Parse(fmt.Sprintf("http://localhost:%d/blockByTxId/f008dbfcb393fd40fa14a26fc2a0aaa01327d9483576e277a0a91b042bf7612f?fly-channel=default-channel&fly-signer=user1", g.config.HTTP.Port))
req = &http.Request{URL: url, Method: http.MethodGet, Header: header}
resp, _ = http.DefaultClient.Do(req)
assert.Equal(200, resp.StatusCode)
bodyBytes, _ = io.ReadAll(resp.Body)
result = utils.DecodePayload(bodyBytes).(map[string]interface{})
rr = result["result"].(map[string]interface{})
block = rr["block"].(map[string]interface{})
assert.Equal(float64(20), block["block_number"])

url, _ = url.Parse(fmt.Sprintf("http://localhost:%d/query?fly-channel=default-channel&fly-signer=user1&fly-chaincode=asset_transfer", g.config.HTTP.Port))
req = &http.Request{
URL: url,
Expand Down
7 changes: 7 additions & 0 deletions internal/rest/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (r *router) addRoutes() {

r.httpRouter.GET("/chaininfo", r.queryChainInfo)
r.httpRouter.GET("/blocks/:blockNumber", r.queryBlock)
r.httpRouter.GET("/blockByTxId/:txId", r.queryBlockByTxId)

r.httpRouter.POST("/query", r.queryChaincode)
r.httpRouter.POST("/transactions", r.sendTransaction)
Expand Down Expand Up @@ -157,6 +158,12 @@ func (r *router) queryBlock(res http.ResponseWriter, req *http.Request, params h
r.syncDispatcher.GetBlock(res, req, params)
}

func (r *router) queryBlockByTxId(res http.ResponseWriter, req *http.Request, params httprouter.Params) {
log.Infof("--> %s %s", req.Method, req.URL)
// query requests are always synchronous
r.syncDispatcher.GetBlockByTxId(res, req, params)
}

func (r *router) queryChaincode(res http.ResponseWriter, req *http.Request, params httprouter.Params) {
log.Infof("--> %s %s", req.Method, req.URL)
// query requests are always synchronous
Expand Down
24 changes: 23 additions & 1 deletion internal/rest/sync/syncdispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type SyncDispatcher interface {
GetTxById(res http.ResponseWriter, req *http.Request, params httprouter.Params)
GetChainInfo(res http.ResponseWriter, req *http.Request, params httprouter.Params)
GetBlock(res http.ResponseWriter, req *http.Request, params httprouter.Params)
GetBlockByTxId(res http.ResponseWriter, req *http.Request, params httprouter.Params)
}

type syncDispatcher struct {
Expand Down Expand Up @@ -254,7 +255,28 @@ func (d *syncDispatcher) GetBlock(res http.ResponseWriter, req *http.Request, pa
return
}

rawblock, block, err1 := d.processor.GetRPCClient().QueryBlock(msg.Headers.ChannelID, msg.BlockNumber, msg.Headers.Signer)
rawblock, block, err1 := d.processor.GetRPCClient().QueryBlock(msg.Headers.ChannelID, msg.Headers.Signer, msg.BlockNumber, msg.BlockHash)
if err1 != nil {
errors.RestErrReply(res, req, err1, 500)
return
}
var reply messages.LedgerQueryResult
result := make(map[string]interface{})
result["raw"] = rawblock
result["block"] = block
reply.Result = result

sendReply(res, req, reply)
}

func (d *syncDispatcher) GetBlockByTxId(res http.ResponseWriter, req *http.Request, params httprouter.Params) {
msg, err := restutil.BuildGetBlockByTxIdMessage(res, req, params)
if err != nil {
errors.RestErrReply(res, req, err.Error, err.StatusCode)
return
}

rawblock, block, err1 := d.processor.GetRPCClient().QueryBlockByTxId(msg.Headers.ChannelID, msg.Headers.Signer, msg.TxId)
if err1 != nil {
errors.RestErrReply(res, req, err1, 500)
return
Expand Down
45 changes: 40 additions & 5 deletions internal/rest/utils/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package util

import (
"encoding/hex"
"encoding/json"
"fmt"
"net/http"
Expand Down Expand Up @@ -199,16 +200,50 @@ func BuildGetBlockMessage(res http.ResponseWriter, req *http.Request, params htt
if signer == "" {
return nil, NewRestError("Must specify the signer", 400)
}
blockNumber, err := strconv.ParseUint(params.ByName("blockNumber"), 10, 64)
if err != nil {
return nil, NewRestError("Invalid block number", 400)
}

msg := messages.GetBlock{}
msg.Headers.ID = msgId // this could be empty
msg.Headers.ChannelID = channel
msg.Headers.Signer = signer
msg.BlockNumber = blockNumber

blockNumberOrHash := params.ByName("blockNumber")
if len(blockNumberOrHash) == 64 {
// 32-byte hex string means this is a block hash
bytes, err := hex.DecodeString(blockNumberOrHash)
if err != nil {
return nil, NewRestError("Invalid block hash", 400)
}
msg.BlockHash = bytes
} else {
blockNumber, err := strconv.ParseUint(blockNumberOrHash, 10, 64)
if err != nil {
return nil, NewRestError("Invalid block number", 400)
}
msg.BlockNumber = blockNumber
}

return &msg, nil
}

func BuildGetBlockByTxIdMessage(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*messages.GetBlockByTxId, *RestError) {
var body map[string]interface{}
err := req.ParseForm()
if err != nil {
return nil, NewRestError(err.Error(), 400)
}
channel := getFlyParam("channel", body, req)
if channel == "" {
return nil, NewRestError("Must specify the channel", 400)
}
signer := getFlyParam("signer", body, req)
if signer == "" {
return nil, NewRestError("Must specify the signer", 400)
}

msg := messages.GetBlockByTxId{}
msg.Headers.ChannelID = channel
msg.Headers.Signer = signer
msg.TxId = params.ByName("txId")

return &msg, nil
}
Expand Down
50 changes: 41 additions & 9 deletions mocks/fabric/client/rpc_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 4c661d1

Please sign in to comment.