Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(stream): log error and continue running #29340

Merged
Merged 2 commits on Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions source/libs/stream/src/streamDispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -954,7 +954,6 @@ static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, void* param, int32_t n
int32_t vgId = pTask->pMeta->vgId;

if (pTmrInfo->launchChkptId != pActiveInfo->activeId) {
streamCleanBeforeQuitTmr(pTmrInfo, param);
stWarn("s-task:%s vgId:%d ready-msg send tmr launched by previous checkpoint procedure, checkpointId:%" PRId64
", quit",
id, vgId, pTmrInfo->launchChkptId);
Expand All @@ -963,13 +962,11 @@ static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, void* param, int32_t n

// active checkpoint info is cleared for now
if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (num == 0) || (pTask->chkInfo.startTs == 0)) {
streamCleanBeforeQuitTmr(pTmrInfo, param);
stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from readyMsg send tmr", id, vgId);
return -1;
}

if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) {
streamCleanBeforeQuitTmr(pTmrInfo, param);
stWarn("s-task:%s vgId:%d upstream number:%d not equals sent readyMsg:%d, quit from readyMsg send tmr", id,
vgId, (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), num);
return -1;
Expand Down Expand Up @@ -998,6 +995,7 @@ static int32_t doFindNotConfirmUpstream(SArray** ppNotRspList, SArray* pList, in
void* p = taosArrayPush(pTmp, &pInfo->upstreamTaskId);
if (p == NULL) {
stError("s-task:%s vgId:%d failed to record not rsp task, code: out of memory", id, vgId);
taosArrayDestroy(pTmp);
return terrno;
} else {
stDebug("s-task:%s vgId:%d level:%d checkpoint-ready rsp from upstream:0x%x not confirmed yet", id, vgId, level,
Expand Down Expand Up @@ -1047,13 +1045,13 @@ static void doSendChkptReadyMsg(SStreamTask* pTask, SArray* pNotRspList, int64_t
}
}

static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, void* param, SArray* pNotRspList) {
static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, void* param, SArray** pNotRspList) {
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr;
SArray* pList = pActiveInfo->pReadyMsgList;
int32_t num = taosArrayGetSize(pList);
int32_t vgId = pTask->pMeta->vgId;
int32_t checkpointId = pActiveInfo->activeId;
int64_t checkpointId = pActiveInfo->activeId;
const char* id = pTask->id.idStr;
int32_t notRsp = 0;

Expand All @@ -1062,18 +1060,17 @@ static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, void* param, SArray*
return code;
}

code = doFindNotConfirmUpstream(&pNotRspList, pList, num, vgId, pTask->info.taskLevel, id);
code = doFindNotConfirmUpstream(pNotRspList, pList, num, vgId, pTask->info.taskLevel, id);
if (code) {
streamCleanBeforeQuitTmr(pTmrInfo, param);
stError("s-task:%s failed to find not rsp checkpoint-ready downstream, code:%s, out of tmr", id, tstrerror(code));
return code;
}

notRsp = taosArrayGetSize(pNotRspList);
notRsp = taosArrayGetSize(*pNotRspList);
if (notRsp == 0) {
streamClearChkptReadyMsg(pActiveInfo);
} else {
doSendChkptReadyMsg(pTask, pNotRspList, checkpointId, pList);
doSendChkptReadyMsg(pTask, *pNotRspList, checkpointId, pList);
}

return code;
Expand Down Expand Up @@ -1137,10 +1134,12 @@ static void chkptReadyMsgSendMonitorFn(void* param, void* tmrId) {
}

streamMutexLock(&pActiveInfo->lock);
code = chkptReadyMsgSendHelper(pTask, param, pNotRspList);
code = chkptReadyMsgSendHelper(pTask, param, &pNotRspList);
streamMutexUnlock(&pActiveInfo->lock);

if (code != TSDB_CODE_SUCCESS) {
streamCleanBeforeQuitTmr(pTmrInfo, param);

streamMetaReleaseTask(pTask->pMeta, pTask);
taosArrayDestroy(pNotRspList);
return;
Expand Down Expand Up @@ -1176,7 +1175,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {

int32_t num = taosArrayGetSize(pList);
if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) {
stError("s-task:%s invalid number of sent readyMsg:%d to upstream:%d", id, num,
stError("s-task:%s invalid number of sent readyMsg:%d to upstream:%d not send chkpt-ready msg", id, num,
(int32_t)taosArrayGetSize(pTask->upstreamInfo.pList));
streamMutexUnlock(&pActiveInfo->lock);
return TSDB_CODE_STREAM_INTERNAL_ERROR;
Expand All @@ -1200,7 +1199,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
stError("s-task:%s failed to send checkpoint-ready msg, try nex time in 10s", id);
}
} else {
stError("s-task:%s failed to prepare the checkpoint-ready msg, try nex time in 10s", id);
stError("s-task:%s failed to prepare the checkpoint-ready msg, try next time in 10s", id);
}
}

Expand Down
3 changes: 1 addition & 2 deletions source/libs/stream/src/streamExec.c
Original file line number Diff line number Diff line change
Expand Up @@ -915,8 +915,7 @@ int32_t streamResumeTask(SStreamTask* pTask) {
while (1) {
code = doStreamExecTask(pTask);
if (code) {
stError("s-task:%s failed to exec stream task, code:%s", id, tstrerror(code));
return code;
stError("s-task:%s failed to exec stream task, code:%s, continue", id, tstrerror(code));
}
// check if continue
streamMutexLock(&pTask->lock);
Expand Down