How to consume DynamoDB JSON and get map[string]types.AttributeValue using json.Unmarshal? #1652
-
When working with the AWS SDK for Go prior to v2, I had a solution for consuming DynamoDB Streams events.

// Item represents a DynamoDB record in the configurations table
type Item struct {
PK string `json:"pk"`
Version int `json:"version,string"`
ManifestVersion string `json:"manifestVersion"`
Region string `json:"region"`
K8sClusterPrefix string `json:"k8sClusterPrefix"`
AZDependent map[string]interface{} `json:"azDependent,omitempty"`
Parameters map[string]interface{} `json:"parameters"`
AuthoredBy string `json:"authoredBy"`
AuthoredWhen string `json:"authoredWhen"`
ItemState ItemState `json:"itemState"`
ItemStateWhen string `json:"itemStateWhen"`
ItemStateAZ string `json:"itemStateAZ,omitempty"`
Message string `json:"message,omitempty"`
}

I then had the following to consume a raw DynamoDB Streams event:

// Event represents a DynamoDB Streams Event written to SQS
type Event struct {
Records []*EventRecord `json:"Records"`
}
// EventRecord represents a DynamoDB Streams Event Record
type EventRecord struct {
Change StreamRecord `json:"dynamodb"`
EventName string `json:"eventName"`
}
// StreamRecord represents a DynamoDB Streams Record
type StreamRecord struct {
Keys map[string]*dynamodb.AttributeValue `json:"Keys,omitempty"`
NewImage map[string]*dynamodb.AttributeValue `json:"NewImage,omitempty"`
OldImage map[string]*dynamodb.AttributeValue `json:"OldImage,omitempty"`
SequenceNumber string `json:"SequenceNumber"`
SizeBytes int64 `json:"SizeBytes"`
}

And finally, to consume messages from the SQS queue:

for {
result, err := q.client.ReceiveMessageWithContext(q.ctx, &sqs.ReceiveMessageInput{
QueueUrl: sdkaws.String(env.SQSQueueURL()),
WaitTimeSeconds: sdkaws.Int64(maxLongPollSeconds),
MaxNumberOfMessages: sdkaws.Int64(1),
VisibilityTimeout: sdkaws.Int64(visibilityTimeoutSeconds),
})
if err != nil {
q.lasterr = err
return
}
for _, m := range result.Messages {
if m.Body == nil {
q.deleteMessage(m.ReceiptHandle)
continue
}
var event configurations.Event
if err = json.Unmarshal([]byte(*m.Body), &event); err != nil {
q.deleteMessage(m.ReceiptHandle)
continue
}
for _, eventRecord := range event.Records {
select {
case q.eventRecordCh <- eventRecord:
case <-q.ctx.Done():
logrus.Info("queues.processor: context terminated")
q.lasterr = q.ctx.Err()
return
}
}
q.deleteMessage(m.ReceiptHandle)
}
}

Then the consumer of eventRecordCh unmarshals the image, for example if a NewImage is present:

err := dynamodbattribute.UnmarshalMap(eventRecord.Change.NewImage, &item)

When porting over to the AWS SDK for Go v2, the Event structures became:

// Event represents a DynamoDB Streams Event written to SQS
type Event struct {
Records []*EventRecord `json:"Records"`
}
// EventRecord represents a DynamoDB Streams Event Record
type EventRecord struct {
Change StreamRecord `json:"dynamodb"`
EventName string `json:"eventName"`
}
// StreamRecord represents a DynamoDB Streams Record
type StreamRecord struct {
Keys map[string]types.AttributeValue `json:"Keys,omitempty"`
NewImage map[string]types.AttributeValue `json:"NewImage,omitempty"`
OldImage map[string]types.AttributeValue `json:"OldImage,omitempty"`
SequenceNumber string `json:"SequenceNumber"`
SizeBytes int64 `json:"SizeBytes"`
}

And with the following test driver:

result, err := client.ReceiveMessage(context.TODO(), &sqs.ReceiveMessageInput{
QueueUrl: aws.String(env.SQSQueueURL()),
WaitTimeSeconds: 20,
MaxNumberOfMessages: 1,
VisibilityTimeout: 30,
})
if err != nil {
panic(err)
}
for i, m := range result.Messages {
var event configurations.Event
if err = json.Unmarshal([]byte(*m.Body), &event); err != nil {
fmt.Printf("Error: %v\n", err)
continue
}
fmt.Printf("result.Message[%d]: %#v\n", i, event)
}
I get the following error:

Error: json: cannot unmarshal object into Go struct field StreamRecord.Records.dynamodb.Keys of type types.AttributeValue

Mind you, I have working ported DynamoDB code to read and write the table:

// Item represents a DynamoDB record in the configurations table
type Item struct {
Feed string `json:"feed" dynamodbav:"feed"`
Microservice string `json:"microservice" dynamodbav:"microservice"`
Version int `json:"version,string" dynamodbav:"version,string"`
ManifestVersion string `json:"manifestVersion" dynamodbav:"manifestVersion"`
Region string `json:"region" dynamodbav:"region"`
K8sCluster string `json:"k8sCluster" dynamodbav:"k8sCluster"`
AZDependent map[string]interface{} `json:"azDependent,omitempty" dynamodbav:"azDependent,omitempty"`
Parameters map[string]interface{} `json:"parameters" dynamodbav:"parameters"`
AuthoredBy string `json:"authoredBy" dynamodbav:"authoredBy"`
AuthoredWhen string `json:"authoredWhen" dynamodbav:"authoredWhen"`
ItemState ItemState `json:"itemState" dynamodbav:"itemState"`
ItemStateWhen string `json:"itemStateWhen" dynamodbav:"itemStateWhen"`
ItemStateAZ string `json:"itemStateAZ,omitempty" dynamodbav:"itemStateAZ,omitempty"`
Message string `json:"message,omitempty" dynamodbav:"message,omitempty"`
}
// ItemState is string
type ItemState string

The magic that …

In the AWS SDK for Go v2, is there a public way to go from DynamoDB JSON to JSON?
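For readers wondering why the decode fails: a minimal sketch, assuming the cause is that types.AttributeValue is an interface in v2, so encoding/json has no concrete type to build when it meets a JSON object. The AttributeValue and MemberS types below are local stand-ins written for this illustration, not the SDK types, and tryUnmarshal is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// AttributeValue is a local stand-in for the SDK's types.AttributeValue
// interface (an assumption for illustration, not the SDK type itself).
type AttributeValue interface{ isAttributeValue() }

// MemberS stands in for a concrete union member such as
// types.AttributeValueMemberS. encoding/json never learns about it.
type MemberS struct{ Value string }

func (*MemberS) isAttributeValue() {}

// tryUnmarshal attempts the same kind of decode as the question:
// DynamoDB JSON into a map whose values are a non-empty interface.
func tryUnmarshal(doc string) error {
	var keys map[string]AttributeValue
	return json.Unmarshal([]byte(doc), &keys)
}

func main() {
	// encoding/json cannot pick an implementation for the interface,
	// so this reports an unmarshal type error instead of filling the map.
	err := tryUnmarshal(`{"pk": {"S": "abc"}}`)
	fmt.Println("error:", err)
}
```

In v1 the map values were pointers to a concrete struct (*dynamodb.AttributeValue), which is why the same json.Unmarshal call worked there.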
Replies: 12 comments 7 replies
-
My solution was to port the AttributeValue struct and methods from api.go in aws-sdk-go. I also threw in the Prettify internal awsutil function and helper; everything else in api.go was stripped out. I created a dynamodbattribute directory and copied the following files from aws-sdk-go as well: decode.go, encode.go, field.go, fields_go1.9.go, and tag.go, modifying them to use the parent dynamodb package and the aws package from aws-sdk-go-v2.

dynamodbevent
├── dto.go
└── dynamodb
    ├── api.go
    ├── doc.go
    └── dynamodbattribute
        ├── decode.go
        ├── encode.go
        ├── field.go
        ├── fields_go1.9.go
        └── tag.go

I restructured, moving my Event, EventRecord, and StreamRecord structs to a dynamodbevent package which has the dynamodb subpackage as illustrated above. So with

// Event represents a DynamoDB Streams Event written to SQS
type Event struct {
Records []*EventRecord `json:"Records"`
}
// EventRecord represents a DynamoDB Streams Event Record
type EventRecord struct {
EventName string `json:"eventName"`
Change StreamRecord `json:"dynamodb"`
}
// StreamRecord represents a DynamoDB Streams Record
type StreamRecord struct {
Keys map[string]*dynamodb.AttributeValue `json:"Keys,omitempty"`
NewImage map[string]*dynamodb.AttributeValue `json:"NewImage,omitempty"`
OldImage map[string]*dynamodb.AttributeValue `json:"OldImage,omitempty"`
SequenceNumber string `json:"SequenceNumber"`
SizeBytes int64 `json:"SizeBytes"`
}

the following snippet from my test driver now works:

result, err := client.ReceiveMessage(context.TODO(), &sqs.ReceiveMessageInput{
QueueUrl: aws.String(env.SQSQueueURL()),
WaitTimeSeconds: 20,
MaxNumberOfMessages: 1,
VisibilityTimeout: 30,
})
if err != nil {
panic(err)
}
for _, m := range result.Messages {
fmt.Printf("\n\nMessage Body: %v\n\n", *m.Body)
var event dynamodbevent.Event
if err = json.Unmarshal([]byte(*m.Body), &event); err != nil {
fmt.Printf("Error: %v\n", err)
continue
}
for _, record := range event.Records {
if len(record.Change.NewImage) > 0 {
var item configurations.Item
err := dynamodbattribute.UnmarshalMap(record.Change.NewImage, &item)
if err != nil {
panic(err)
}
ij, err := json.Marshal(&item)
if err != nil {
panic(err)
}
fmt.Printf("NewImage: %v\n", string(ij))
}
if len(record.Change.OldImage) > 0 {
var item configurations.Item
err := dynamodbattribute.UnmarshalMap(record.Change.OldImage, &item)
if err != nil {
panic(err)
}
ij, err := json.Marshal(&item)
if err != nil {
panic(err)
}
fmt.Printf("OldImage: %v\n", string(ij))
}
}
}
-
When aws-sdk-go-v2 supports converting DynamoDB JSON to map[string]types.AttributeValue via |
-
Check out https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue. It looks like this functionality was just moved around in v2 (out of the service |
-
I can successfully Scan, Query, GetItem from DynamoDB with this new version of the AWS SDK for Go. The output of such queries are The issue is getting |
-
The conversion of DynamoDB JSON is hidden within v2's implementation of both DynamoDB and DynamoDB Streams. I'm not reading from shards. I have a Lambda that is called by DynamoDB Streams with an Event having one or more StreamRecords. My Lambda writes the Event to an SNS topic, which forwards to an SQS subscription. I'm reading these Events from SQS and am getting a mixture of JSON and DynamoDB JSON. At some point, I'm trying to consume NewImage and OldImage (both DynamoDB JSON) from the StreamRecord into a map[string]types.AttributeValue, which fails (using json.Unmarshal). This worked when calling json.Unmarshal into a map[string]*dynamodb.AttributeValue in v1 of the SDK. Once I have the map of attribute values, I can use the SDK's UnmarshalMap.
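One way to cope with that mixture, sketched under the assumption that the plain-JSON envelope should decode immediately while the DynamoDB JSON parts wait for a dedicated decoder: keep Keys, NewImage, and OldImage as json.RawMessage. The struct layout mirrors the Event types from the question; parseEvent and the sample message body are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event mirrors the SQS message body from the question.
type Event struct {
	Records []EventRecord `json:"Records"`
}

// EventRecord holds the plain-JSON envelope of one stream record.
type EventRecord struct {
	EventName string       `json:"eventName"`
	Change    StreamRecord `json:"dynamodb"`
}

// StreamRecord keeps the DynamoDB JSON parts as raw bytes so that
// json.Unmarshal never has to build an AttributeValue itself.
type StreamRecord struct {
	Keys           json.RawMessage `json:"Keys,omitempty"`
	NewImage       json.RawMessage `json:"NewImage,omitempty"`
	OldImage       json.RawMessage `json:"OldImage,omitempty"`
	SequenceNumber string          `json:"SequenceNumber"`
	SizeBytes      int64           `json:"SizeBytes"`
}

// parseEvent is a hypothetical helper that decodes only the envelope.
func parseEvent(body string) (Event, error) {
	var ev Event
	err := json.Unmarshal([]byte(body), &ev)
	return ev, err
}

func main() {
	// Made-up sample body in the shape of a DynamoDB Streams event.
	body := `{"Records":[{"eventName":"INSERT","dynamodb":{"Keys":{"pk":{"S":"abc"}},"NewImage":{"pk":{"S":"abc"}},"SequenceNumber":"1","SizeBytes":10}}]}`
	ev, err := parseEvent(body)
	if err != nil {
		panic(err)
	}
	for _, rec := range ev.Records {
		// The raw image bytes can be handed to whatever DynamoDB JSON
		// decoder is available later.
		fmt.Printf("%s NewImage=%s\n", rec.EventName, rec.Change.NewImage)
	}
}
```

This only defers the problem: the raw images still need a DynamoDB JSON decoder before attributevalue.UnmarshalMap can be used.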
-
My main use case is the same as the OP's (reading from a DynamoDB stream in a Lambda), but I ran into this issue when upgrading from one of the earlier versions of the v2 SDK. It's super frustrating, and my colleague spent quite a few hours pulling his hair out. I did some digging through past issues, and as far as I can tell there's no reason not to support JSON as before. Right now I'm trying to write a function to do
and it just seems to be needlessly convoluted.
-
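tl;dr: A solution is provided in the last paragraph.

I have the same use case as @marc-ostrow and @jspri: a Lambda triggered by a DynamoDB stream which then needs to interact with the SDK. I want to be able to use the NewImage/OldImage of the event for another PutItem call, and I also want the attributevalue.UnmarshalMap convenience that the SDK provides.

I was trying to use ("github.com/aws/aws-sdk-go-v2/service/dynamodbstreams/types").Record in the event struct I defined, but this has two issues. The first is the one highlighted by OP. The second is an issue that is also present in the v1 SDK: ApproximateCreationDateTime cannot be unmarshalled into *time.Time because the value is u…

There are no parse errors if you use … Essentially, the types in …

The idea of … In practice, this is inconvenient for anyone writing AWS Lambdas in Go. I think the following options should be explored: a) It would be great if a maintainer from each of …

For anyone who just wants a solution, I have adapted the code for …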
-
I would also like to see an SDK v2 DDB stream event type that can be used in Lambdas. It seems like such is missing from
But there is no analogue to the DynamoDBEvent type that holds a slice of Records. As a workaround, I adapted Luke-Davies's gist above ☝. Thank you @Luke-Davies!
-
I had the same problem and found a very simple solution based on converting between the types. Here is the conversion code:

package aws_lambda
import (
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)
func UnmarshalDynamoEventsMap(
record map[string]events.DynamoDBAttributeValue, out interface{}) error {
asTypesMap := DynamoDbEventsMapToTypesMap(record)
err := attributevalue.UnmarshalMap(asTypesMap, out)
if err != nil {
return err
}
return nil
}
func DynamoDbEventsMapToTypesMap(
record map[string]events.DynamoDBAttributeValue) map[string]types.AttributeValue {
resultMap := make(map[string]types.AttributeValue)
for key, rec := range record {
resultMap[key] = DynamoDbEventsToTypes(rec)
}
return resultMap
}
// DynamoDbEventsToTypes relates the dynamo event received by AWS Lambda with the data type that is
// used in the Amazon SDK V2 to deal with DynamoDB data.
// This function is necessary because Amazon does not provide any kind of solution to make this
// relationship between types of data.
func DynamoDbEventsToTypes(record events.DynamoDBAttributeValue) types.AttributeValue {
var val types.AttributeValue
switch record.DataType() {
case events.DataTypeBinary:
val = &types.AttributeValueMemberB{
Value: record.Binary(),
}
case events.DataTypeBinarySet:
val = &types.AttributeValueMemberBS{
Value: record.BinarySet(),
}
case events.DataTypeBoolean:
val = &types.AttributeValueMemberBOOL{
Value: record.Boolean(),
}
case events.DataTypeList:
var items []types.AttributeValue
for _, value := range record.List() {
items = append(items, DynamoDbEventsToTypes(value))
}
val = &types.AttributeValueMemberL{
Value: items,
}
case events.DataTypeMap:
items := make(map[string]types.AttributeValue)
for k, v := range record.Map() {
items[k] = DynamoDbEventsToTypes(v)
}
val = &types.AttributeValueMemberM{
Value: items,
}
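case events.DataTypeNull:
// Use the explicit NULL member rather than nil, which is not a valid AttributeValue union member.
val = &types.AttributeValueMemberNULL{
Value: true,
}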
case events.DataTypeNumber:
val = &types.AttributeValueMemberN{
Value: record.Number(),
}
case events.DataTypeNumberSet:
val = &types.AttributeValueMemberNS{
Value: record.NumberSet(),
}
case events.DataTypeString:
val = &types.AttributeValueMemberS{
Value: record.String(),
}
case events.DataTypeStringSet:
val = &types.AttributeValueMemberSS{
Value: record.StringSet(),
}
}
return val
}
-
I wanted to be able to copy/paste some DynamoDB JSON and use it for testing. The shortest code I could come up with to access the internal DynamoDB JSON deserializer in the AWS client was the following:

func TestStuff(t *testing.T) {
// Create a dummy DynamoDB client and request a DynamoDB item with the minimal required options.
c := &dynamodb.Client{}
x := "X"
o, err := c.GetItem(context.Background(), &dynamodb.GetItemInput{TableName: &x, Key: map[string]types.AttributeValue{}}, func(o *dynamodb.Options) {
o.Retryer = aws.NopRetryer{}
o.Region = "X"
// Pass in the DynamoDB JSON to be deserialized as the "Item" value.
o.HTTPClient = staticResponseClient{[]byte(`{"ConsumedCapacity": {}, "Item": {"Hi": {"S": "There"}}}`)}
})
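// Check the error; otherwise err is declared but unused and the test does not compile.
if err != nil {
t.Fatal(err)
}
// You can now access the values as the AttributeValue type:
fmt.Println(o.Item)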
}
type staticResponseClient struct {
data []byte
}
func (c staticResponseClient) Do(_ *http.Request) (*http.Response, error) {
return &http.Response{StatusCode: 200, Body: io.NopCloser(bytes.NewReader(c.data))}, nil
}
-
FTR we are currently migrating our v1 AWS SDK to v2 in advance of end of support and are hitting this particular issue. It's amazing that the powers that be do not understand the validity of the use case (one of ours is implementation of their best practice around large entries via the use of S3, where we want to make that a transparent client for the developer experience beyond initial wiring). |
-
Hi there,

Thanks everyone for the engagement on the thread. We can see that there is a valid use case for implementing this. I've created a feature request and added it to our backlog. I have marked @Luke-Davies's response as the answer as it seems to have addressed the OP's initial concern with a workaround. Since we don't monitor answered discussions as closely, I ask that if you have concerns or questions, please communicate with the dev team on the newly created feature request instead.

All the best,