Merge pull request #216 from zhujun98/dev
Fix race condition in DaqBuffer
zhujun98 authored Sep 26, 2023
2 parents eef89f0 + 4438221 commit dc3929e
Showing 4 changed files with 15 additions and 8 deletions.
10 changes: 6 additions & 4 deletions recon/include/recon/daq/daq_buffer.hpp
@@ -9,6 +9,7 @@
 #ifndef RECON_DAQBUFFER_H
 #define RECON_DAQBUFFER_H
 
+#include <atomic>
 #include <chrono>
 #include <limits>
 #include <memory>
@@ -31,7 +32,7 @@ class DaqBuffer {
     };
 
     size_t max_len_;
-    size_t len_ {0};
+    std::atomic<size_t> len_ {0};
 
     mutable std::mutex head_mtx_;
     std::unique_ptr<Item> head_;
@@ -49,7 +50,7 @@ class DaqBuffer {
     std::unique_ptr<Item> popHead() {
         std::unique_ptr<Item> old_head = std::move(head_);
         head_ = std::move(old_head->next);
-        --len_;
+        len_--;
         return old_head;
     }
 
@@ -80,7 +81,7 @@
             Item* const new_tail = new_item.get();
             tail_->next = std::move(new_item);
             tail_ = new_tail;
-            ++len_;
+            len_++;
         }
         cv_.notify_one();
         return true;
@@ -93,7 +94,8 @@
 
     bool waitAndPop(T& value) {
         std::unique_lock<std::mutex> lk(head_mtx_);
-        if (cv_.wait_for(lk, std::chrono::milliseconds(100), [&] { return head_.get() != tail(); })) {
+        if (cv_.wait_for(lk, std::chrono::milliseconds(100),
+                         [&] { return head_.get() != tail(); })) {
             value = std::move(head_->data);
             popHead();
             return true;
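Note on the fix: `len_` is modified from both ends of the buffer. `push()` increments it while holding only the tail-side lock, and `popHead()` decrements it under `head_mtx_`, so with a plain `size_t` the producer and consumer threads race on the counter; making it `std::atomic<size_t>` removes that data race without changing the locking scheme. Below is a minimal, self-contained sketch of the same two-lock-queue pattern; the class and function names are illustrative, and the capacity limit (`max_len_`) of the real DaqBuffer is omitted.

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>

// Minimal sketch, not the project's DaqBuffer: a two-lock queue whose length
// counter is touched by the producer (under tail_mtx_) and by the consumer
// (under head_mtx_), so it has to be atomic to avoid a data race.
template <typename T>
class TwoLockQueue {
    struct Node {
        T data;
        std::unique_ptr<Node> next;
    };

    std::atomic<std::size_t> len_ {0};  // shared between both locks -> atomic
    std::mutex head_mtx_;
    std::unique_ptr<Node> head_;
    std::mutex tail_mtx_;
    Node* tail_;
    std::condition_variable cv_;

    Node* tail() {
        std::lock_guard<std::mutex> lk(tail_mtx_);
        return tail_;
    }

  public:
    TwoLockQueue() : head_(std::make_unique<Node>()), tail_(head_.get()) {}

    void push(T value) {
        auto new_node = std::make_unique<Node>();  // becomes the new dummy tail
        {
            std::lock_guard<std::mutex> lk(tail_mtx_);
            tail_->data = std::move(value);
            tail_->next = std::move(new_node);
            tail_ = tail_->next.get();
            len_++;                                // producer side
        }
        cv_.notify_one();
    }

    bool waitAndPop(T& value) {
        std::unique_lock<std::mutex> lk(head_mtx_);
        if (cv_.wait_for(lk, std::chrono::milliseconds(100),
                         [&] { return head_.get() != tail(); })) {
            value = std::move(head_->data);
            head_ = std::move(head_->next);        // popHead()
            len_--;                                // consumer side
            return true;
        }
        return false;
    }

    std::size_t size() const { return len_.load(); }
};

int main() {
    TwoLockQueue<int> q;
    std::thread producer([&] { for (int i = 0; i < 1000; ++i) q.push(i); });
    std::thread consumer([&] {
        int v = 0, received = 0;
        while (received < 1000) if (q.waitAndPop(v)) ++received;
    });
    producer.join();
    consumer.join();
    std::cout << "items left in queue: " << q.size() << '\n';  // expected: 0
}

With a plain `std::size_t` counter, ThreadSanitizer should flag the unsynchronized increment/decrement in a queue like this; with the atomic it runs clean.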
2 changes: 1 addition & 1 deletion recon/include/recon/daq/zmq_daq_client.hpp
@@ -38,7 +38,7 @@ class ZmqDaqClient : public DaqClientInterface {
 public:
 
     static constexpr size_t K_BUFFER_SIZE = 1000;
-    static constexpr size_t K_MONITOR_EVERY = 100;
+    static constexpr size_t K_MONITOR_EVERY = 1000;
 
 protected:
 
2 changes: 1 addition & 1 deletion recon/src/daq/std_daq_client.cpp
@@ -35,7 +35,7 @@ StdDaqClient::parseData(const nlohmann::json& meta, const zmq::message_t& data)
     size_t num_cols = meta["shape"][1];
     if (!isDataShapeValid(num_rows, num_cols)) return std::nullopt;
 
-    assert(update.size() == sizeof(RawDtype) * n_rows * n_cols);
+    assert(data.size() == sizeof(RawDtype) * num_rows * num_cols);
     return Projection<>{proj_type, frame, num_cols, num_rows, data.data(), data.size()};
 }
 
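The corrected assertion checks the actual message against the shape parsed from the metadata: the payload must contain exactly `num_rows * num_cols` pixels of `RawDtype`. A minimal sketch of that consistency check, using plain types instead of the project's `zmq::message_t` and `nlohmann::json`; `RawDtype` being `uint16_t` is an assumption made only for this example.

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

using RawDtype = std::uint16_t;  // assumption for the sketch

std::optional<std::vector<RawDtype>>
parseFrame(const void* data, std::size_t byte_size,
           std::size_t num_rows, std::size_t num_cols) {
    // The payload must hold exactly num_rows * num_cols pixels of RawDtype.
    if (byte_size != sizeof(RawDtype) * num_rows * num_cols) return std::nullopt;
    const auto* pixels = static_cast<const RawDtype*>(data);
    return std::vector<RawDtype>(pixels, pixels + num_rows * num_cols);
}

int main() {
    std::vector<RawDtype> frame(4 * 3, 0);                                // 4 x 3 image
    auto ok  = parseFrame(frame.data(), frame.size() * sizeof(RawDtype), 4, 3);
    auto bad = parseFrame(frame.data(), frame.size() * sizeof(RawDtype), 5, 3);
    assert(ok.has_value() && !bad.has_value());
}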
9 changes: 7 additions & 2 deletions recon/src/monitor.cpp
@@ -49,6 +49,11 @@ void Monitor::countFlat() {
 void Monitor::countTomogram() {
     ++num_tomograms_;
 
+#if (VERBOSITY >= 1)
+    spdlog::info("{} tomograms reconstructed", num_tomograms_);
+#endif
+
+#if (VERBOSITY >= 2)
     if (num_tomograms_ % report_tomo_throughput_every_ == 0) {
         // The number for the first <report_tomo_throughput_every_> tomograms
         // underestimates the throughput!
@@ -57,11 +62,11 @@
             end - tomo_start_).count();
         double throughput = scan_byte_size_ * report_tomo_throughput_every_ / dt;
         double throughput_per_tomo = 1000000. * report_tomo_throughput_every_ / dt;
-        spdlog::info("{} tomograms reconstructed", num_tomograms_);
-        spdlog::info("[Bench] Throughput (averaged over the last {} tomograms): {:.1f} (MB/s) / {:.1f} (tomo/s)",
+        spdlog::info("Throughput (averaged over the last {} tomograms): {:.1f} (MB/s) / {:.1f} (tomo/s)",
                      report_tomo_throughput_every_, throughput, throughput_per_tomo);
         tomo_start_ = end;
     }
+#endif
 }
 
 void Monitor::summarize() const {
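With this change the per-tomogram progress message is printed at `VERBOSITY >= 1` for every reconstructed tomogram, while the throughput benchmark (now without the `[Bench]` prefix) is compiled in only at `VERBOSITY >= 2` and wrapped in its own `#if`/`#endif`. A minimal sketch of the gating pattern, assuming `VERBOSITY` is supplied by the build (e.g. `-DVERBOSITY=2`) and spdlog is available; the free function here is only a stand-in for `Monitor::countTomogram()`.

#include <cstddef>
#include <spdlog/spdlog.h>

#ifndef VERBOSITY
#define VERBOSITY 1  // assumed default when the build does not define it
#endif

void countTomogram(std::size_t& num_tomograms, std::size_t report_every) {
    ++num_tomograms;

#if (VERBOSITY >= 1)
    // Progress message: one line per reconstructed tomogram.
    spdlog::info("{} tomograms reconstructed", num_tomograms);
#endif

#if (VERBOSITY >= 2)
    // Benchmark message: only every <report_every> tomograms.
    if (num_tomograms % report_every == 0) {
        spdlog::info("throughput report after another {} tomograms", report_every);
    }
#endif
}

int main() {
    std::size_t n = 0;
    for (int i = 0; i < 10; ++i) countTomogram(n, 5);
}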
