Skip to content

Commit

Permalink
Log4j2 refactor cloud engine orchestration module (#8742)
Browse files Browse the repository at this point in the history
Co-authored-by: klaus.freitas.scclouds <[email protected]>
  • Loading branch information
KlausDornsbach and klaus.freitas.scclouds authored Apr 24, 2024
1 parent 4fe2b66 commit e330d76
Show file tree
Hide file tree
Showing 15 changed files with 745 additions and 1,174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ public int compare(final Object o1, final Object o2) {
}
};

protected static String LOG_SEQ_FORMATTED_STRING;

protected final long _id;
protected String _name = null;
protected final ConcurrentHashMap<Long, Listener> _waitForList;
Expand Down Expand Up @@ -137,6 +139,7 @@ protected AgentAttache(final AgentManagerImpl agentMgr, final long id, final Str
_requests = new LinkedList<Request>();
_agentMgr = agentMgr;
_nextSequence = new Long(s_rand.nextInt(Short.MAX_VALUE)).longValue() << 48;
LOG_SEQ_FORMATTED_STRING = String.format("Seq %d-{}: {}", _id);
}

public synchronized long getNextSequence() {
Expand Down Expand Up @@ -197,9 +200,7 @@ protected void cancel(final Request req) {
}

protected synchronized void cancel(final long seq) {
if (logger.isDebugEnabled()) {
logger.debug(log(seq, "Cancelling."));
}
logger.debug(LOG_SEQ_FORMATTED_STRING, seq, "Cancelling.");
final Listener listener = _waitForList.remove(seq);
if (listener != null) {
listener.processDisconnect(_id, Status.Disconnected);
Expand All @@ -218,24 +219,16 @@ protected synchronized int findRequest(final long seq) {
return Collections.binarySearch(_requests, seq, s_seqComparator);
}

protected String log(final long seq, final String msg) {
return "Seq " + _id + "-" + seq + ": " + msg;
}

protected void registerListener(final long seq, final Listener listener) {
if (logger.isTraceEnabled()) {
logger.trace(log(seq, "Registering listener"));
}
logger.trace(LOG_SEQ_FORMATTED_STRING, seq, "Registering listener");
if (listener.getTimeout() != -1) {
s_listenerExecutor.schedule(new Alarm(seq), listener.getTimeout(), TimeUnit.SECONDS);
}
_waitForList.put(seq, listener);
}

protected Listener unregisterListener(final long sequence) {
if (logger.isTraceEnabled()) {
logger.trace(log(sequence, "Unregistering listener"));
}
logger.trace(LOG_SEQ_FORMATTED_STRING, sequence, "Unregistering listener");
return _waitForList.remove(sequence);
}

Expand Down Expand Up @@ -267,7 +260,7 @@ public int getNonRecurringListenersSize() {
final Listener monitor = entry.getValue();
if (!monitor.isRecurring()) {
//TODO - remove this debug statement later
logger.debug("Listener is " + entry.getValue() + " waiting on " + entry.getKey());
logger.debug("Listener is {} waiting on {}", entry.getValue(), entry.getKey());
nonRecurringListenersList.add(monitor);
}
}
Expand All @@ -290,15 +283,10 @@ public boolean processAnswers(final long seq, final Response resp) {
if (answers[0] != null && answers[0].getResult()) {
processed = true;
}
if (logger.isDebugEnabled()) {
logger.debug(log(seq, "Unable to find listener."));
}
logger.debug(LOG_SEQ_FORMATTED_STRING, seq, "Unable to find listener.");
} else {
processed = monitor.processAnswers(_id, seq, answers);
if (logger.isTraceEnabled()) {
logger.trace(log(seq, (processed ? "" : " did not ") + " processed "));
}

logger.trace(LOG_SEQ_FORMATTED_STRING, seq, (processed ? "" : " did not ") + " processed ");
if (!monitor.isRecurring()) {
unregisterListener(seq);
}
Expand All @@ -324,9 +312,7 @@ protected void cancelAllCommands(final Status state, final boolean cancelActive)
final Map.Entry<Long, Listener> entry = it.next();
it.remove();
final Listener monitor = entry.getValue();
if (logger.isDebugEnabled()) {
logger.debug(log(entry.getKey(), "Sending disconnect to " + monitor.getClass()));
}
logger.debug(LOG_SEQ_FORMATTED_STRING, entry.getKey(), "Sending disconnect to " + monitor.getClass());
monitor.processDisconnect(_id, state);
}
}
Expand Down Expand Up @@ -357,9 +343,8 @@ public void send(final Request req, final Listener listener) throws AgentUnavail
long seq = req.getSequence();
if (listener != null) {
registerListener(seq, listener);
} else if (logger.isDebugEnabled()) {
logger.debug(log(seq, "Routed from " + req.getManagementServerId()));
}
logger.debug(LOG_SEQ_FORMATTED_STRING, seq, "Routed from " + req.getManagementServerId());

synchronized (this) {
try {
Expand All @@ -381,16 +366,14 @@ public void send(final Request req, final Listener listener) throws AgentUnavail

if (req.executeInSequence() && _currentSequence == null) {
_currentSequence = seq;
if (logger.isTraceEnabled()) {
logger.trace(log(seq, " is current sequence"));
}
logger.trace(LOG_SEQ_FORMATTED_STRING, seq, " is current sequence");
}
} catch (AgentUnavailableException e) {
logger.info(log(seq, "Unable to send due to " + e.getMessage()));
logger.info(LOG_SEQ_FORMATTED_STRING, seq, "Unable to send due to " + e.getMessage());
cancel(seq);
throw e;
} catch (Exception e) {
logger.warn(log(seq, "Unable to send due to "), e);
logger.warn(LOG_SEQ_FORMATTED_STRING, seq, "Unable to send due to " + e.getMessage(), e);
cancel(seq);
throw new AgentUnavailableException("Problem due to other exception " + e.getMessage(), _id);
}
Expand All @@ -409,50 +392,41 @@ public Answer[] send(final Request req, final int wait) throws AgentUnavailableE
try {
answers = sl.waitFor(wait);
} catch (final InterruptedException e) {
logger.debug(log(seq, "Interrupted"));
logger.debug(LOG_SEQ_FORMATTED_STRING, seq, "Interrupted");
}
if (answers != null) {
if (logger.isDebugEnabled()) {
new Response(req, answers).logD("Received: ", false);
}
new Response(req, answers).logD("Received: ", false);
return answers;
}

answers = sl.getAnswers(); // Try it again.
if (answers != null) {
if (logger.isDebugEnabled()) {
new Response(req, answers).logD("Received after timeout: ", true);
}
new Response(req, answers).logD("Received after timeout: ", true);

_agentMgr.notifyAnswersToMonitors(_id, seq, answers);
return answers;
}

final Long current = _currentSequence;
if (current != null && seq != current) {
if (logger.isDebugEnabled()) {
logger.debug(log(seq, "Waited too long."));
}
logger.debug(LOG_SEQ_FORMATTED_STRING, seq, "Waited too long.");

throw new OperationTimedoutException(req.getCommands(), _id, seq, wait, false);
}

if (logger.isDebugEnabled()) {
logger.debug(log(seq, "Waiting some more time because this is the current command"));
}
logger.debug(LOG_SEQ_FORMATTED_STRING, seq, "Waiting some more time because this is the current command");
}

throw new OperationTimedoutException(req.getCommands(), _id, seq, wait * 2, true);
} catch (OperationTimedoutException e) {
logger.warn(log(seq, "Timed out on " + req.toString()));
logger.warn(LOG_SEQ_FORMATTED_STRING, seq, "Timed out on " + req.toString());
cancel(seq);
final Long current = _currentSequence;
if (req.executeInSequence() && (current != null && current == seq)) {
sendNext(seq);
}
throw e;
} catch (Exception e) {
logger.warn(log(seq, "Exception while waiting for answer"), e);
logger.warn(LOG_SEQ_FORMATTED_STRING, seq, "Exception while waiting for answer", e);
cancel(seq);
final Long current = _currentSequence;
if (req.executeInSequence() && (current != null && current == seq)) {
Expand All @@ -467,22 +441,16 @@ public Answer[] send(final Request req, final int wait) throws AgentUnavailableE
protected synchronized void sendNext(final long seq) {
_currentSequence = null;
if (_requests.isEmpty()) {
if (logger.isDebugEnabled()) {
logger.debug(log(seq, "No more commands found"));
}
logger.debug(LOG_SEQ_FORMATTED_STRING, seq, "No more commands found");
return;
}

Request req = _requests.pop();
if (logger.isDebugEnabled()) {
logger.debug(log(req.getSequence(), "Sending now. is current sequence."));
}
logger.debug(LOG_SEQ_FORMATTED_STRING, req.getSequence(), "Sending now. is current sequence.");
try {
send(req);
} catch (AgentUnavailableException e) {
if (logger.isDebugEnabled()) {
logger.debug(log(req.getSequence(), "Unable to send the next sequence"));
}
logger.debug(LOG_SEQ_FORMATTED_STRING, req.getSequence(), "Unable to send the next sequence");
cancel(req.getSequence());
}
_currentSequence = req.getSequence();
Expand Down
Loading

0 comments on commit e330d76

Please sign in to comment.