Skip to content

Commit

Permalink
feat(error-handler): Introduce error handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
pandomic committed Apr 26, 2019
1 parent 249af87 commit 2a1551b
Show file tree
Hide file tree
Showing 13 changed files with 219 additions and 20 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Global hooks for ActiveRecord 4.2.10.
- Overrides to ActiveRecord::Base, ActiveRecord::Relation, ActiveRecord::Persistence to serve callbacks.
- SNS, Kinesis and HTTP stream types.

## [0.1.1] - 2019-04-26
### Added
- Error handlers for streams
- Documentation for error handlers
- Tests for error handlers
- New method to build Message from json
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
active_record_streams (0.1.0)
active_record_streams (0.1.1)
activerecord (~> 4.2.10)
aws-sdk (~> 2.11.9)

Expand Down
62 changes: 60 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,25 @@
A small library to stream ActiveRecord's create/update/delete
events to AWS SNS topics, Kinesis streams or HTTP listeners.

# Table of contents

* [Version mappings](#version-mappings)
* [Warning](#warning)
* [How does it work](#how-does-it-work)
* [Installation](#installation)
* [Usage](#usage)
* [Setting up for AWS](#setting-up-for-aws)
* [Enabling streams](#enabling-streams)
* [Error handling](#error-handling)
* [Supported targets](#supported-targets)
* [License](#license)
* [Development](#development)
* [Contributing](#contributing)

## Version mappings

```
1.0.X - ActiveRecord 4.2.10
0.1.X - ActiveRecord 4.2.10
```

## Warning
Expand Down Expand Up @@ -96,7 +111,50 @@ ActiveRecordStreams.configure do |config|
end
```

## Supported targets:
### Error handling

It might happen that the message delivery fails. In such case you might
want to retry sending the message or even use a background processor like
`Sidekiq` to perform retires in automated way.

Every stream has an `error_handler` option which accepts `lambda/proc`
with `error_handler :: (Stream, TableName, Message, Error) -> *` signature.

**NOTE** that your consumer has to take care of duplicated messages.
Each message contains `EventID` which may help to identify duplications.

```ruby
# config/initializers/active_record_streams.rb

require 'active_record_streams'

class SampleHttpReSender
include Sidekiq::Worker

def perform(table_name, message_json)
message = ActiveRecordStreams::Message.from_json(message_json)
ActiveRecordStreams.config.streams.each do |stream|
stream.publish(table_name, message)
end
end
end

ActiveRecordStreams.configure do |config|
config.streams << ActiveRecordStreams::Publishers::HttpStream.new(
url: 'https://posteventshere.test',
error_handler: lambda do |stream, table_name, message, error|
# Do whatever you want here, you may also try to start a new
# thread or re-try publishing directly to stream using
# stream.publish(table_name, message)

# Try to schedule re-publishing with Sidekiq
SampleHttpReSender.perform_async(table_name, message.json)
end
)
end
```

## Supported targets

### ActiveRecordStreams::Publishers::SnsStream

Expand Down
14 changes: 14 additions & 0 deletions lib/active_record_streams/message.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# frozen_string_literal: true

require 'securerandom'

module ActiveRecordStreams
class Message
def initialize(table_name, action_type, old_image, new_image)
Expand All @@ -9,8 +11,20 @@ def initialize(table_name, action_type, old_image, new_image)
@new_image = new_image
end

def self.from_json(json)
parsed_json = JSON.parse(json)

new(
parsed_json['TableName'],
parsed_json['ActionType'],
parsed_json['OldImage'],
parsed_json['NewImage']
)
end

def json
{
EventID: SecureRandom.uuid,
TableName: @table_name,
ActionType: @action_type,
OldImage: @old_image,
Expand Down
28 changes: 28 additions & 0 deletions lib/active_record_streams/message_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,42 @@
let(:action_type) { :create }
let(:old_image) { { 'hello' => 'world' } }
let(:new_image) { { 'hello' => 'new world' } }
let(:uuid) { '26f4ee2c-20ce-481f-b9f2-833bf7e51c5e' }

subject do
described_class.new(table_name, action_type, old_image, new_image)
end

before do
allow(SecureRandom).to receive(:uuid).and_return(uuid)
end

describe '.from_json' do
let(:expected_value) do
{
EventID: uuid,
TableName: table_name,
ActionType: action_type,
OldImage: old_image,
NewImage: new_image
}
end

it 'creates a message from jsom' do
expect(described_class.from_json(expected_value.to_json))
.to be_a(described_class)
end

it 'creates a right schema from json' do
expect(described_class.from_json(expected_value.to_json).json)
.to eq(expected_value.to_json)
end
end

describe '#json' do
let(:expected_value) do
{
EventID: uuid,
TableName: table_name,
ActionType: action_type,
OldImage: old_image,
Expand Down
25 changes: 23 additions & 2 deletions lib/active_record_streams/publishers/http_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,41 @@ module ActiveRecordStreams
module Publishers
class HttpStream
ANY_TABLE = '*'
SUCCESSFUL_CODE_REGEX = /\A2\d{2}\z/.freeze
DEFAULT_CONTENT_TYPE = 'application/json'

##
# @param [String] url
# @param [Hash] headers
# @param [String] table_name
# @param [Enumerable<String>] ignored_tables
# @param [Proc] error_handler

def initialize(
url:,
headers: {},
table_name: ANY_TABLE,
ignored_tables: []
ignored_tables: [],
error_handler: nil
)
@url = url
@headers = headers
@table_name = table_name
@ignored_tables = ignored_tables
@error_handler = error_handler
end

def publish(table_name, message)
return unless (any_table? && allowed_table?(table_name)) ||
table_name == @table_name

request.body = message.json
http.request(request)
response = http.request(request)
assert_response_code(response)
rescue StandardError => e
raise e unless @error_handler.is_a?(Proc)

@error_handler.call(self, table_name, message, e)
end

private
Expand Down Expand Up @@ -53,6 +68,12 @@ def uri
def headers
{ 'Content-Type': DEFAULT_CONTENT_TYPE }.merge(@headers)
end

def assert_response_code(response)
return if response.code.to_s.match(SUCCESSFUL_CODE_REGEX)

raise StandardError, response.body
end
end
end
end
25 changes: 22 additions & 3 deletions lib/active_record_streams/publishers/http_stream_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,20 @@
let(:desired_table_name) { '*' }
let(:actual_table_name) { 'lovely_records' }
let(:ignored_tables) { [] }
let(:error_handler) { nil }

let(:request) { double('body=': nil, request: nil) }
let(:http_client) { double('body=': nil, request: nil) }
let(:request) { double('body=': nil) }
let(:response) { double(code: 200) }
let(:http_client) { double('body=': nil, request: response) }
let(:message) { double(json: '{}') }

subject do
described_class.new(
url: url,
headers: headers,
table_name: desired_table_name,
ignored_tables: ignored_tables
ignored_tables: ignored_tables,
error_handler: error_handler
)
end

Expand Down Expand Up @@ -69,5 +72,21 @@
expect(http_client).not_to have_received(:request).with(request)
end
end

context 'error response' do
let(:response) { double(body: 'Error', code: 400) }
let(:error_handler) { Proc.new {} }

it 'calls error handler' do
expect(error_handler).to receive(:call) do |stream, table_name, message, error|
expect(stream).to eq(subject)
expect(table_name).to eq(actual_table_name)
expect(message).to eq(message)
expect(error.message).to eq('Error')
end

subject.publish(actual_table_name, message)
end
end
end
end
17 changes: 10 additions & 7 deletions lib/active_record_streams/publishers/kinesis_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,20 @@ class KinesisStream
# @param [String] table_name
# @param [Enumerable<String>] ignored_tables
# @param [Hash] overrides
# @param [Proc] error_handler

def initialize(
stream_name:,
table_name: ANY_TABLE,
ignored_tables: [],
overrides: {}
overrides: {},
error_handler: nil
)
@stream_name = stream_name
@table_name = table_name
@ignored_tables = ignored_tables
@overrides = overrides
@error_handler = error_handler
end

##
Expand All @@ -34,12 +37,12 @@ def publish(table_name, message)
return unless (any_table? && allowed_table?(table_name)) ||
table_name == @table_name

client.publish(
@stream_name,
partition_key(table_name),
message.json,
@overrides
)
client.publish(@stream_name, partition_key(table_name),
message.json, @overrides)
rescue StandardError => e
raise e unless @error_handler.is_a?(Proc)

@error_handler.call(self, table_name, message, e)
end

private
Expand Down
23 changes: 22 additions & 1 deletion lib/active_record_streams/publishers/kinesis_stream_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
let(:actual_table_name) { 'lovely_records' }
let(:ignored_tables) { [] }
let(:overrides) { {} }
let(:error_handler) { nil }

let(:message) { double(json: '{}') }
let(:kinesis_client) { double(publish: nil) }
Expand All @@ -18,7 +19,8 @@
stream_name: stream_name,
table_name: desired_table_name,
ignored_tables: ignored_tables,
overrides: overrides
overrides: overrides,
error_handler: error_handler
)
end

Expand Down Expand Up @@ -89,5 +91,24 @@
subject.publish(actual_table_name, message)
end
end

context 'delivery error' do
let(:error_handler) { Proc.new {} }

it 'calls error handler' do
expect(kinesis_client).to receive(:publish) do
raise StandardError, 'Delivery error'
end

expect(error_handler).to receive(:call) do |stream, table_name, message, error|
expect(stream).to eq(subject)
expect(table_name).to eq(actual_table_name)
expect(message).to eq(message)
expect(error.message).to eq('Delivery error')
end

subject.publish(actual_table_name, message)
end
end
end
end
9 changes: 8 additions & 1 deletion lib/active_record_streams/publishers/sns_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,20 @@ class SnsStream
# @param [String] table_name
# @param [Enumerable<String>] ignored_tables
# @param [Hash] overrides
# @param [Proc] error_handler

def initialize(
topic_arn:,
table_name: ANY_TABLE,
ignored_tables: [],
overrides: {}
overrides: {},
error_handler: nil
)
@topic_arn = topic_arn
@table_name = table_name
@ignored_tables = ignored_tables
@overrides = overrides
@error_handler = error_handler
end

##
Expand All @@ -32,6 +35,10 @@ def publish(table_name, message)
table_name == @table_name

client.publish(@topic_arn, message.json, @overrides)
rescue StandardError => e
raise e unless @error_handler.is_a?(Proc)

@error_handler.call(self, table_name, message, e)
end

private
Expand Down
Loading

0 comments on commit 2a1551b

Please sign in to comment.