Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ampFactor in the BinaryReplayGenerator #340

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cachelib/cachebench/cache/CacheValue.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class CACHELIB_PACKED_ATTR CacheValue {
// static function to make sure the item size is at least sizeof CacheValue
// so we have enough space for fields used by sanity checks
static size_t getSize(size_t size) {
return std::max<size_t>(size, sizeof(CacheValue));
return std::max<size_t>((uint32_t)size, sizeof(CacheValue));
}
// static function to help to initialize CacheValue with given item's data
// pointer.
Expand Down
2 changes: 2 additions & 0 deletions cachelib/cachebench/util/Request.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ struct Request {
OpType getOp() const noexcept { return op.load(); }
void setOp(OpType o) noexcept { op = o; }

inline void updateKey(std::string_view k) { key = k; }

std::string_view key;

// size iterators in case this request is
Expand Down
63 changes: 51 additions & 12 deletions cachelib/cachebench/workload/BinaryKVReplayGenerator.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ class BinaryKVReplayGenerator : public ReplayGeneratorBase {
// per thread: the number of requests to run through
// the trace before jumping to next offset
static constexpr size_t kRunLength = 10000;
static constexpr size_t kMinKeySize = 16;
static constexpr size_t maxAmpFactor = 10000;

// StressorCtx keeps track of the state including the submission queues
// per stressor thread. Since there is only one request generator thread,
Expand All @@ -94,7 +96,11 @@ class BinaryKVReplayGenerator : public ReplayGeneratorBase {
reinterpret_cast<size_t*>(0),
OpType::kGet,
0),
runIdx_(0) {}
runIdx_(0) {
for (int i = 0; i < maxAmpFactor; i++) {
suffixes.push_back(folly::sformat("{:05d}", i));
}
}

bool isFinished() { return finished_.load(std::memory_order_relaxed); }
void markFinish() { finished_.store(true, std::memory_order_relaxed); }
Expand All @@ -103,10 +109,39 @@ class BinaryKVReplayGenerator : public ReplayGeneratorBase {
reqIdx_ = id_ * kRunLength;
}

uint32_t id_{0};
uint64_t reqIdx_{0};
std::string_view updateKeyWithAmpFactor(std::string_view key) {
if (ampFactor_ > 0) {
// trunkcate the key so we don't overflow
// max ampFactor is 10000, so we reserve at least 5 bytes
size_t keySize = key.size() > kMinKeySize ?
std::max<size_t>(key.size() - 5, kMinKeySize) :
key.size();
// copy the key into the currKey memory
std::memcpy(currKey_, key.data(), keySize);
std::memcpy(currKey_ + keySize, suffixes[ampFactor_].data(),
suffixes[ampFactor_].size());
// add null terminating
currKey_[keySize + suffixes[ampFactor_].size()] = '\0';
ampFactor_--;
} else {
// copy the key into the currKey memory
std::memcpy(currKey_, key.data(), key.size());
// add null terminating
currKey_[key.size()] = '\0';
}
return std::string_view(currKey_);
}

Request request_;
uint64_t reqIdx_{0};
uint64_t runIdx_{0};
uint32_t id_{0};
uint32_t ampFactor_{0};
std::vector<std::string> suffixes;
// space for the current key, with the
// ampFactor suffix
char currKey_[256];

// Thread that finish its operations mark it here, so we will skip
// further request on its shard
std::atomic<bool> finished_{false};
Expand Down Expand Up @@ -159,17 +194,26 @@ const Request& BinaryKVReplayGenerator::getReq(uint8_t,
BinaryRequest* prevReq = reinterpret_cast<BinaryRequest*>(*(r.requestId));
if (prevReq != nullptr && prevReq->repeats_ > 1) {
prevReq->repeats_ = prevReq->repeats_ - 1;
} else {
} else if (stressorCtx.ampFactor_ == 0) {
BinaryRequest* req = nullptr;
try {
req = binaryStream_.getNextPtr(stressorCtx.reqIdx_ + stressorCtx.runIdx_);
stressorCtx.ampFactor_ = ampFactor_;
// update the binary request index
if (stressorCtx.runIdx_ < kRunLength) {
stressorCtx.runIdx_++;
} else {
stressorCtx.runIdx_ = 0;
stressorCtx.reqIdx_ += numShards_ * kRunLength;
}
} catch (const EndOfTrace& e) {
if (config_.repeatTraceReplay) {
XLOGF_EVERY_MS(
INFO, 60'000,
"{} Reached the end of trace files. Restarting from beginning.",
stressorCtx.id_);
stressorCtx.resetIdx();
stressorCtx.ampFactor_ = ampFactor_;
req =
binaryStream_.getNextPtr(stressorCtx.reqIdx_ + stressorCtx.runIdx_);
} else {
Expand All @@ -180,20 +224,15 @@ const Request& BinaryKVReplayGenerator::getReq(uint8_t,

XDCHECK_NE(req, nullptr);
XDCHECK_LT(req->op_, 12);
auto key = req->getKey();
OpType op = static_cast<OpType>(req->op_);
req->valueSize_ = (req->valueSize_) * ampSizeFactor_;
r.update(key,
r.update(stressorCtx.updateKeyWithAmpFactor(req->getKey()),
const_cast<size_t*>(reinterpret_cast<size_t*>(&req->valueSize_)),
op,
req->ttl_,
reinterpret_cast<uint64_t>(req));
if (stressorCtx.runIdx_ < kRunLength) {
stressorCtx.runIdx_++;
} else {
stressorCtx.runIdx_ = 0;
stressorCtx.reqIdx_ += numShards_ * kRunLength;
}
} else {
r.updateKey(stressorCtx.updateKeyWithAmpFactor(prevReq->getKey()));
}
return r;
}
Expand Down
Loading