Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a shared_ptr when creating a child stream #2036

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/oboe/AudioStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class AudioStream : public AudioStreamBase {
*/
explicit AudioStream(const AudioStreamBuilder &builder);

virtual ~AudioStream() = default;
virtual ~AudioStream();

/**
* Open a stream based on the current settings.
Expand Down
22 changes: 10 additions & 12 deletions include/oboe/AudioStreamBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -609,14 +609,13 @@ class AudioStreamBuilder : public AudioStreamBase {
}

/**
* Create and open a stream object based on the current settings.
*
* The caller owns the pointer to the AudioStream object
* and must delete it when finished.
* WARNING - this method has been disabled and will fail.
* Opening a stream using this method led to crashes so
* it is no longer available.
*
* @deprecated Use openStream(std::shared_ptr<oboe::AudioStream> &stream) instead.
* @param stream pointer to a variable to receive the stream address
* @return OBOE_OK if successful or a negative error code
* @return Result::ErrorUnavailable
*/
Result openStream(AudioStream **stream);

Expand All @@ -633,15 +632,13 @@ class AudioStreamBuilder : public AudioStreamBase {
Result openStream(std::shared_ptr<oboe::AudioStream> &stream);

/**
* Create and open a ManagedStream object based on the current builder state.
*
* The caller must create a unique ptr, and pass by reference so it can be
* modified to point to an opened stream. The caller owns the unique ptr,
* and it will be automatically closed and deleted when going out of scope.
* WARNING - this method has been disabled and will fail.
* Opening a stream using this method led to crashes so
* it is no longer available.
*
* @deprecated Use openStream(std::shared_ptr<oboe::AudioStream> &stream) instead.
* @param stream Reference to the ManagedStream (uniqueptr) used to keep track of stream
* @return OBOE_OK if successful or a negative error code.
* @return Result::ErrorUnavailable
*/
Result openManagedStream(ManagedStream &stream);

Expand All @@ -653,14 +650,15 @@ class AudioStreamBuilder : public AudioStreamBase {
* @param stream pointer to a variable to receive the stream address
* @return OBOE_OK if successful or a negative error code.
*/
Result openStreamInternal(AudioStream **streamPP);
Result openStreamInternal(std::shared_ptr<oboe::AudioStream> &stream);

/**
* @param other
* @return true if channels, format and sample rate match
*/
bool isCompatible(AudioStreamBase &other);


/**
* Create an AudioStream object. The AudioStream must be opened before use.
*
Expand Down
2 changes: 2 additions & 0 deletions include/oboe/AudioStreamCallback.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ class AudioStreamErrorCallback {
*
* This callback could be used to reopen a new stream on another device.
*
* Do NOT delete the audioStream. It is managed by a shared_ptr.
*
* @param audioStream pointer to the associated stream
* @param error
*/
Expand Down
47 changes: 22 additions & 25 deletions src/aaudio/AudioStreamAAudio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,32 +60,28 @@ static aaudio_data_callback_result_t oboe_aaudio_data_callback_proc(
// This runs in its own thread.
// Only one of these threads will be launched from internalErrorCallback().
// It calls app error callbacks from a static function in case the stream gets deleted.
static void oboe_aaudio_error_thread_proc(AudioStreamAAudio *oboeStream,
Result error) {
static void oboe_aaudio_error_thread_proc_shared(std::shared_ptr<AudioStream> sharedStream,
Result error) {
LOGD("%s(,%d) - entering >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>", __func__, error);
AudioStreamErrorCallback *errorCallback = oboeStream->getErrorCallback();
#if 0
// This code is used to test for race conditions.
LOGE("%s() sleep for 5 seconds", __func__);
usleep(5*1000*1000);
LOGD("%s() - woke up ------------------------------------", __func__);
#endif
AudioStreamErrorCallback *errorCallback = sharedStream->getErrorCallback();
if (errorCallback == nullptr) return; // should be impossible
bool isErrorHandled = errorCallback->onError(oboeStream, error);

bool isErrorHandled = errorCallback->onError(sharedStream.get(), error);
if (!isErrorHandled) {
oboeStream->requestStop();
errorCallback->onErrorBeforeClose(oboeStream, error);
oboeStream->close();
// Warning, oboeStream may get deleted by this callback.
errorCallback->onErrorAfterClose(oboeStream, error);
sharedStream->requestStop();
errorCallback->onErrorBeforeClose(sharedStream.get(), error);
sharedStream->close();
errorCallback->onErrorAfterClose(sharedStream.get(), error);
}
LOGD("%s() - exiting <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<", __func__);
}

// This runs in its own thread.
// Only one of these threads will be launched from internalErrorCallback().
// Prevents deletion of the stream if the app is using AudioStreamBuilder::openSharedStream()
static void oboe_aaudio_error_thread_proc_shared(std::shared_ptr<AudioStream> sharedStream,
Result error) {
AudioStreamAAudio *oboeStream = reinterpret_cast<AudioStreamAAudio*>(sharedStream.get());
oboe_aaudio_error_thread_proc(oboeStream, error);
}

namespace oboe {

/*
Expand Down Expand Up @@ -126,23 +122,24 @@ void AudioStreamAAudio::internalErrorCallback(

oboeStream->mErrorCallbackResult = oboeResult;

// Prevents deletion of the stream if the app is using AudioStreamBuilder::openStream(shared_ptr)
std::shared_ptr<AudioStream> sharedStream = oboeStream->lockWeakThis();

// These checks should be enough because we assume that the stream close()
// will join() any active callback threads and will not allow new callbacks.
if (oboeStream->wasErrorCallbackCalled()) { // block extra error callbacks
LOGE("%s() multiple error callbacks called!", __func__);
return;
} else if (stream != oboeStream->getUnderlyingStream()) {
LOGW("%s() stream already closed or closing", __func__); // might happen if there are bugs
} else if (sharedStream) {
return;
}

// Prevents deletion of the stream if the app is using AudioStreamBuilder::openStream(shared_ptr)
std::shared_ptr<AudioStream> sharedStream = oboeStream->lockWeakThis();
if (sharedStream) {
// Handle error on a separate thread using shared pointer.
std::thread t(oboe_aaudio_error_thread_proc_shared, sharedStream, oboeResult);
t.detach();
} else {
// Handle error on a separate thread.
std::thread t(oboe_aaudio_error_thread_proc, oboeStream, oboeResult);
t.detach();
LOGE("%s() oboeStream not shared! Error callback ignored!", __func__);
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/common/AudioStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ namespace oboe {
*/
AudioStream::AudioStream(const AudioStreamBuilder &builder)
: AudioStreamBase(builder) {
LOGD("Constructor for AudioStream at %p", this);
}

AudioStream::~AudioStream() {
// This is to help debug use after free bugs.
LOGD("Destructor for AudioStream at %p", this);
}

Result AudioStream::close() {
Expand Down
63 changes: 26 additions & 37 deletions src/common/AudioStreamBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,13 @@ bool AudioStreamBuilder::isCompatible(AudioStreamBase &other) {
}

Result AudioStreamBuilder::openStream(AudioStream **streamPP) {
LOGW("Passing AudioStream pointer deprecated, Use openStream(std::shared_ptr<oboe::AudioStream> &stream) instead.");
return openStreamInternal(streamPP);
LOGE("Passing AudioStream pointer no longer supported."
" Use openStream(std::shared_ptr<oboe::AudioStream> &stream) instead.");
*streamPP = nullptr;
return Result(Result::ErrorUnavailable);
}

Result AudioStreamBuilder::openStreamInternal(AudioStream **streamPP) {
Result AudioStreamBuilder::openStream(std::shared_ptr<AudioStream> &stream) {
auto result = isValidConfig();
if (result != Result::OK) {
LOGW("%s() invalid config. Error %s", __func__, oboe::convertToText(result));
Expand All @@ -103,28 +105,23 @@ Result AudioStreamBuilder::openStreamInternal(AudioStream **streamPP) {
LOGI("%s() %s -------- %s --------",
__func__, getDirection() == Direction::Input ? "INPUT" : "OUTPUT", getVersionText());

if (streamPP == nullptr) {
return Result::ErrorNull;
}
*streamPP = nullptr;

AudioStream *streamP = nullptr;
stream.reset();

// Maybe make a FilterInputStream.
AudioStreamBuilder childBuilder(*this);
// Check need for conversion and modify childBuilder for optimal stream.
bool conversionNeeded = QuirksManager::getInstance().isConversionNeeded(*this, childBuilder);
// Do we need to make a child stream and convert.
if (conversionNeeded) {
AudioStream *tempStream;
result = childBuilder.openStream(&tempStream);
std::shared_ptr<AudioStream> tempStream;
result = childBuilder.openStream(tempStream);
if (result != Result::OK) {
return result;
}

if (isCompatible(*tempStream)) {
// The child stream would work as the requested stream so we can just use it directly.
*streamPP = tempStream;
stream = tempStream;
return result;
} else {
AudioStreamBuilder parentBuilder = *this;
Expand All @@ -146,21 +143,22 @@ Result AudioStreamBuilder::openStreamInternal(AudioStream **streamPP) {
LOGI("%s() create a FilterAudioStream for data conversion.", __func__);
FilterAudioStream *filterStream = new FilterAudioStream(parentBuilder, tempStream);
result = filterStream->configureFlowGraph();
if (result != Result::OK) {
if (result != Result::OK) {
filterStream->close();
delete filterStream;
// Just open streamP the old way.
} else {
streamP = static_cast<AudioStream *>(filterStream);
stream.reset(static_cast<AudioStream *>(filterStream));
}
}
}

if (streamP == nullptr) {
streamP = build();
if (stream == nullptr) {
AudioStream *streamP = build();
if (streamP == nullptr) {
return Result::ErrorNull;
}
stream.reset(streamP);
}

// If MMAP has a problem in this case then disable it temporarily.
Expand All @@ -173,35 +171,37 @@ Result AudioStreamBuilder::openStreamInternal(AudioStream **streamPP) {
wasMMapTemporarilyDisabled = true;
}
}
result = streamP->open();

result = stream->open();
if (wasMMapTemporarilyDisabled) {
AAudioExtensions::getInstance().setMMapEnabled(wasMMapOriginallyEnabled); // restore original
}
if (result == Result::OK) {

if (result == Result::OK) {
int32_t optimalBufferSize = -1;
// Use a reasonable default buffer size.
if (streamP->getDirection() == Direction::Input) {
if (stream->getDirection() == Direction::Input) {
// For input, small size does not improve latency because the stream is usually
// run close to empty. And a low size can result in XRuns so always use the maximum.
optimalBufferSize = streamP->getBufferCapacityInFrames();
} else if (streamP->getPerformanceMode() == PerformanceMode::LowLatency
&& streamP->getDirection() == Direction::Output) { // Output check is redundant.
optimalBufferSize = streamP->getFramesPerBurst() *
optimalBufferSize = stream->getBufferCapacityInFrames();
} else if (stream->getPerformanceMode() == PerformanceMode::LowLatency
&& stream->getDirection() == Direction::Output) { // Output check is redundant.
optimalBufferSize = stream->getFramesPerBurst() *
kBufferSizeInBurstsForLowLatencyStreams;
}
if (optimalBufferSize >= 0) {
auto setBufferResult = streamP->setBufferSizeInFrames(optimalBufferSize);
auto setBufferResult = stream->setBufferSizeInFrames(optimalBufferSize);
if (!setBufferResult) {
LOGW("Failed to setBufferSizeInFrames(%d). Error was %s",
optimalBufferSize,
convertToText(setBufferResult.error()));
}
}

*streamPP = streamP;
// Save a weak_ptr in the stream for use with callbacks.
stream->setWeakThis(stream);
} else {
delete streamP;
stream.reset();
}
return result;
}
Expand All @@ -215,16 +215,5 @@ Result AudioStreamBuilder::openManagedStream(oboe::ManagedStream &stream) {
return result;
}

Result AudioStreamBuilder::openStream(std::shared_ptr<AudioStream> &sharedStream) {
sharedStream.reset();
AudioStream *streamptr;
auto result = openStreamInternal(&streamptr);
if (result == Result::OK) {
sharedStream.reset(streamptr);
// Save a weak_ptr in the stream for use with callbacks.
streamptr->setWeakThis(sharedStream);
}
return result;
}

} // namespace oboe
9 changes: 3 additions & 6 deletions src/common/FilterAudioStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ class FilterAudioStream : public AudioStream, AudioStreamCallback {
*
* @param builder containing all the stream's attributes
*/
FilterAudioStream(const AudioStreamBuilder &builder, AudioStream *childStream)
FilterAudioStream(const AudioStreamBuilder &builder,
std::shared_ptr<AudioStream> childStream)
: AudioStream(builder)
, mChildStream(childStream) {
// Intercept the callback if used.
Expand Down Expand Up @@ -66,10 +67,6 @@ class FilterAudioStream : public AudioStream, AudioStreamCallback {

virtual ~FilterAudioStream() = default;

AudioStream *getChildStream() const {
return mChildStream.get();
}

Result configureFlowGraph();

// Close child and parent.
Expand Down Expand Up @@ -216,7 +213,7 @@ class FilterAudioStream : public AudioStream, AudioStreamCallback {

private:

std::unique_ptr<AudioStream> mChildStream; // this stream wraps the child stream
std::shared_ptr<AudioStream> mChildStream;
std::unique_ptr<DataConversionFlowGraph> mFlowGraph; // for converting data
std::unique_ptr<uint8_t[]> mBlockingBuffer; // temp buffer for write()
double mRateScaler = 1.0; // ratio parent/child sample rates
Expand Down
12 changes: 6 additions & 6 deletions tests/testFullDuplexStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class TestFullDuplexStream : public ::testing::Test,
mOutputBuilder.setFormat(AudioFormat::Float);
mOutputBuilder.setDataCallback(this);

Result r = mOutputBuilder.openStream(&mOutputStream);
Result r = mOutputBuilder.openStream(mOutputStream);
ASSERT_EQ(r, Result::OK) << "Failed to open output stream " << convertToText(r);

mInputBuilder.setDirection(Direction::Input);
Expand All @@ -68,11 +68,11 @@ class TestFullDuplexStream : public ::testing::Test,
mInputBuilder.setBufferCapacityInFrames(mOutputStream->getBufferCapacityInFrames() * 2);
mInputBuilder.setSampleRate(mOutputStream->getSampleRate());

r = mInputBuilder.openStream(&mInputStream);
r = mInputBuilder.openStream(mInputStream);
ASSERT_EQ(r, Result::OK) << "Failed to open input stream " << convertToText(r);

setInputStream(mInputStream);
setOutputStream(mOutputStream);
setInputStream(mInputStream.get());
setOutputStream(mOutputStream.get());
}

void startStream() {
Expand Down Expand Up @@ -107,8 +107,8 @@ class TestFullDuplexStream : public ::testing::Test,

AudioStreamBuilder mInputBuilder;
AudioStreamBuilder mOutputBuilder;
AudioStream *mInputStream = nullptr;
AudioStream *mOutputStream = nullptr;
std::shared_ptr<AudioStream> mInputStream;
std::shared_ptr<AudioStream> mOutputStream;
std::atomic<int32_t> mCallbackCount{0};
std::atomic<int32_t> mGoodCallbackCount{0};
};
Expand Down