Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lock singleton tasks #222

Merged
merged 3 commits into from
Jul 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/main/java/org/dependencytrack/common/ConfigKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,16 @@ public enum ConfigKey implements Config.Key {
KAFKA_TOPIC_PREFIX("api.topic.prefix", ""),
KAFKA_STREAMS_METRICS_RECORDING_LEVEL("kafka.streams.metrics.recording.level", "INFO"),
TASK_PORTFOLIO_LOCK_AT_MOST_FOR("task.metrics.portfolio.lockAtMostForInMillis", "900000"),
TASK_PORTFOLIO_LOCK_AT_LEAST_FOR("task.metrics.portfolio.lockAtLeastForInMillis", "3000");
TASK_PORTFOLIO_LOCK_AT_LEAST_FOR("task.metrics.portfolio.lockAtLeastForInMillis", "3000"),
TASK_METRICS_VULNERABILITY_LOCK_AT_MOST_FOR("task.metrics.vulnerability.lockAtMostForInMillis", "900000"),
TASK_METRICS_VULNERABILITY_LOCK_AT_LEAST_FOR("task.metrics.vulnerability.lockAtLeastForInMillis", "3000"),
TASK_MIRROR_EPSS_LOCK_AT_MOST_FOR("task.mirror.epss.lockAtMostForInMillis", "900000"),
TASK_MIRROR_EPSS_LOCK_AT_LEAST_FOR("task.mirror.epss.lockAtLeastForInMillis", "3000"),
TASK_COMPONENT_IDENTIFICATION_LOCK_AT_MOST_FOR("task.componentIdentification.lockAtMostForInMillis", "900000"),
TASK_COMPONENT_IDENTIFICATION_LOCK_AT_LEAST_FOR("task.componentIdentification.lockAtLeastForInMillis", "3000"),
TASK_LDAP_SYNC_LOCK_AT_MOST_FOR("task.ldapSync.lockAtMostForInMillis", "900000"),
TASK_LDAP_SYNC_LOCK_AT_LEAST_FOR("task.ldapSync.lockAtLeastForInMillis", "3000");

private final String propertyName;
private final Object defaultValue;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import alpine.event.LdapSyncEvent;
import alpine.event.framework.EventService;
import alpine.event.framework.SingleThreadedEventService;
import alpine.server.tasks.LdapSyncTask;
import org.dependencytrack.RequirementsVerifier;
import org.dependencytrack.tasks.BomUploadProcessingTask;
import org.dependencytrack.tasks.CallbackTask;
Expand All @@ -34,6 +33,7 @@
import org.dependencytrack.tasks.IndexTask;
import org.dependencytrack.tasks.InternalComponentIdentificationTask;
import org.dependencytrack.tasks.KennaSecurityUploadTask;
import org.dependencytrack.tasks.LdapSyncTaskWrapper;
import org.dependencytrack.tasks.NistMirrorTask;
import org.dependencytrack.tasks.OsvDownloadTask;
import org.dependencytrack.tasks.PolicyEvaluationTask;
Expand Down Expand Up @@ -78,7 +78,7 @@ public void contextInitialized(final ServletContextEvent event) {
}
EVENT_SERVICE.subscribe(BomUploadEvent.class, BomUploadProcessingTask.class);
EVENT_SERVICE.subscribe(VexUploadEvent.class, VexUploadProcessingTask.class);
EVENT_SERVICE.subscribe(LdapSyncEvent.class, LdapSyncTask.class);
EVENT_SERVICE.subscribe(LdapSyncEvent.class, LdapSyncTaskWrapper.class);
EVENT_SERVICE.subscribe(GitHubAdvisoryMirrorEvent.class, GitHubAdvisoryMirrorTask.class);
EVENT_SERVICE.subscribe(OsvMirrorEvent.class, OsvDownloadTask.class);
EVENT_SERVICE.subscribe(VulnDbSyncEvent.class, VulnDbSyncTask.class);
Expand Down Expand Up @@ -116,7 +116,7 @@ public void contextDestroyed(final ServletContextEvent event) {

EVENT_SERVICE.unsubscribe(BomUploadProcessingTask.class);
EVENT_SERVICE.unsubscribe(VexUploadProcessingTask.class);
EVENT_SERVICE.unsubscribe(LdapSyncTask.class);
EVENT_SERVICE.unsubscribe(LdapSyncTaskWrapper.class);
EVENT_SERVICE.unsubscribe(GitHubAdvisoryMirrorTask.class);
EVENT_SERVICE.unsubscribe(OsvDownloadTask.class);
EVENT_SERVICE.unsubscribe(VulnDbSyncTask.class);
Expand Down
20 changes: 14 additions & 6 deletions src/main/java/org/dependencytrack/tasks/EpssMirrorTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import alpine.model.ConfigProperty;
import alpine.notification.NotificationLevel;
import org.apache.commons.io.FileUtils;
import org.apache.http.HttpStatus;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
Expand All @@ -36,7 +37,7 @@
import org.dependencytrack.notification.NotificationScope;
import org.dependencytrack.parser.epss.EpssParser;
import org.dependencytrack.persistence.QueryManager;
import org.apache.http.HttpStatus;
import org.dependencytrack.util.LockProvider;
import org.dependencytrack.util.NotificationUtil;

import java.io.Closeable;
Expand All @@ -50,6 +51,7 @@

import static org.dependencytrack.model.ConfigPropertyConstants.VULNERABILITY_SOURCE_EPSS_ENABLED;
import static org.dependencytrack.model.ConfigPropertyConstants.VULNERABILITY_SOURCE_EPSS_FEEDS_URL;
import static org.dependencytrack.tasks.LockName.EPSS_MIRROR_TASK_LOCK;

public class EpssMirrorTask implements LoggableSubscriber {

Expand All @@ -70,7 +72,7 @@ public EpssMirrorTask() {
this.isEnabled = enabled != null && Boolean.valueOf(enabled.getPropertyValue());
this.feedUrl = qm.getConfigProperty(VULNERABILITY_SOURCE_EPSS_FEEDS_URL.getGroupName(), VULNERABILITY_SOURCE_EPSS_FEEDS_URL.getPropertyName()).getPropertyValue();
if (this.feedUrl.endsWith("/")) {
this.feedUrl = this.feedUrl.substring(0, this.feedUrl.length()-1);
this.feedUrl = this.feedUrl.substring(0, this.feedUrl.length() - 1);
}
}
}
Expand All @@ -84,7 +86,9 @@ public void inform(final Event e) {
LOGGER.info("Starting EPSS mirroring task");
final File mirrorPath = new File(MIRROR_DIR);
setOutputDir(mirrorPath.getAbsolutePath());
getAllFiles();

LockProvider.executeWithLock(EPSS_MIRROR_TASK_LOCK, (Runnable) () -> getAllFiles());

final long end = System.currentTimeMillis();
LOGGER.info("EPSS mirroring complete");
LOGGER.info("Time spent (d/l): " + metricDownloadTime + "ms");
Expand All @@ -96,6 +100,7 @@ public void inform(final Event e) {
/**
* Defines the output directory where the mirrored files will be stored.
* Creates the directory if non-existent.
*
* @param outputDirPath the target output directory path
*/
private void setOutputDir(final String outputDirPath) {
Expand All @@ -114,12 +119,13 @@ private void getAllFiles() {
doDownload(this.feedUrl + "/" + FILENAME);
if (mirroredWithoutErrors) {
String content = "Mirroring of the Exploit Prediction Scoring System completed successfully";
NotificationUtil.dispatchExceptionNotifications(NotificationScope.SYSTEM, NotificationGroup.DATASOURCE_MIRRORING, NotificationConstants.Title.EPSS_MIRROR, content , NotificationLevel.INFORMATIONAL);
NotificationUtil.dispatchExceptionNotifications(NotificationScope.SYSTEM, NotificationGroup.DATASOURCE_MIRRORING, NotificationConstants.Title.EPSS_MIRROR, content, NotificationLevel.INFORMATIONAL);
}
}

/**
* Performs a download of specified URL.
*
* @param urlString the URL contents to download
*/
private void doDownload(final String urlString) {
Expand Down Expand Up @@ -158,19 +164,20 @@ private void doDownload(final String urlString) {
mirroredWithoutErrors = false;
LOGGER.warn("Unable to download - HTTP Response " + status.getStatusCode() + ": " + status.getReasonPhrase());
String content = "An error occurred mirroring the contents of the Exploit Prediction Scoring System. Check log for details. HTTP Response: " + status.getStatusCode();
NotificationUtil.dispatchExceptionNotifications(NotificationScope.SYSTEM, NotificationGroup.DATASOURCE_MIRRORING, NotificationConstants.Title.EPSS_MIRROR, content , NotificationLevel.ERROR);
NotificationUtil.dispatchExceptionNotifications(NotificationScope.SYSTEM, NotificationGroup.DATASOURCE_MIRRORING, NotificationConstants.Title.EPSS_MIRROR, content, NotificationLevel.ERROR);
}
}
} catch (IOException e) {
mirroredWithoutErrors = false;
LOGGER.error("Download failed : " + e.getMessage());
String content = "An error occurred mirroring the contents of the Exploit Prediction Scoring System. Check log for details. " + e.getMessage();
NotificationUtil.dispatchExceptionNotifications(NotificationScope.SYSTEM, NotificationGroup.DATASOURCE_MIRRORING, NotificationConstants.Title.EPSS_MIRROR, content , NotificationLevel.ERROR);
NotificationUtil.dispatchExceptionNotifications(NotificationScope.SYSTEM, NotificationGroup.DATASOURCE_MIRRORING, NotificationConstants.Title.EPSS_MIRROR, content, NotificationLevel.ERROR);
}
}

/**
* Extracts a GZip file.
*
* @param file the file to extract
*/
private void uncompress(final File file) {
Expand Down Expand Up @@ -203,6 +210,7 @@ private void uncompress(final File file) {

/**
* Closes a closable object.
*
* @param object the object to close
*/
private void close(final Closeable object) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@
import alpine.common.logging.Logger;
import alpine.event.framework.Event;
import alpine.event.framework.Subscriber;
import net.javacrumbs.shedlock.core.LockExtender;
import net.javacrumbs.shedlock.core.LockingTaskExecutor;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.datanucleus.PropertyNames;
import org.dependencytrack.event.InternalComponentIdentificationEvent;
import org.dependencytrack.model.Component;
import org.dependencytrack.persistence.QueryManager;
import org.dependencytrack.util.InternalComponentIdentificationUtil;
import org.dependencytrack.util.LockProvider;

import javax.jdo.PersistenceManager;
import javax.jdo.Query;
Expand All @@ -36,6 +39,9 @@
import java.time.Instant;
import java.util.List;

import static java.time.Duration.ZERO;
import static org.dependencytrack.tasks.LockName.INTERNAL_COMPONENT_IDENTIFICATION_TASK_LOCK;

/**
* Subscriber task that identifies internal components throughout the entire portfolio.
*
Expand All @@ -52,9 +58,9 @@ public void inform(final Event e) {
LOGGER.info("Starting internal component identification");
final Instant startTime = Instant.now();
try {
analyze();
} catch (Exception ex) {
LOGGER.error("An unexpected error occurred while identifying internal components", ex);
LockProvider.executeWithLock(INTERNAL_COMPONENT_IDENTIFICATION_TASK_LOCK, (LockingTaskExecutor.Task) () -> analyze());
} catch (Throwable ex) {
LOGGER.error("Error in acquiring lock and executing internal component identification task", ex);
}
LOGGER.info("Internal component identification completed in "
+ DateFormatUtils.format(Duration.between(startTime, Instant.now()).toMillis(), "mm:ss:SS"));
Expand All @@ -72,6 +78,12 @@ private void analyze() throws Exception {

List<Component> components = fetchNextComponentsPage(pm, null);
while (!components.isEmpty()) {
//Extend the lock by 5 min everytime we have a page.
//We will get max 1000 components in a page
//Reason of not extending at the end of loop is if it does not have to do much,
//It might finish execution before lock could be extended resulting in error
LOGGER.debug("extending lock of internal component identification by 5 min");
LockExtender.extendActiveLock(Duration.ofMinutes(5), ZERO);
for (final Component component : components) {
String coordinates = component.getName();
if (StringUtils.isNotBlank(component.getGroup())) {
Expand Down Expand Up @@ -129,7 +141,7 @@ private List<Component> fetchNextComponentsPage(final PersistenceManager pm, fin
query.setParameters(lastId);
}
query.setOrdering("id DESC");
query.setRange(0, 500);
query.setRange(0, 1000);
query.getFetchPlan().setGroup(Component.FetchGroup.INTERNAL_IDENTIFICATION.name());
return List.copyOf(query.executeList());
}
Expand Down
30 changes: 30 additions & 0 deletions src/main/java/org/dependencytrack/tasks/LdapSyncTaskWrapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.dependencytrack.tasks;

import alpine.event.framework.Event;
import alpine.event.framework.Subscriber;
import alpine.server.tasks.LdapSyncTask;
import alpine.event.LdapSyncEvent;
import org.dependencytrack.util.LockProvider;

import static org.dependencytrack.tasks.LockName.LDAP_SYNC_TASK_LOCK;

public class LdapSyncTaskWrapper implements Subscriber {

private final LdapSyncTask ldapSyncTask;

public LdapSyncTaskWrapper() {
this(new LdapSyncTask());
}

LdapSyncTaskWrapper(LdapSyncTask ldapSyncTask) {
this.ldapSyncTask = ldapSyncTask;
}

@Override
public void inform(Event e) {
if (e instanceof LdapSyncEvent) {
LockProvider.executeWithLock(LDAP_SYNC_TASK_LOCK, (Runnable) () -> this.ldapSyncTask.inform(new LdapSyncEvent()));
}
}
}

5 changes: 4 additions & 1 deletion src/main/java/org/dependencytrack/tasks/LockName.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,8 @@

public enum LockName {
PORTFOLIO_METRICS_TASK_LOCK,
PROJECT_METRICS_TASK_LOCK,
LDAP_SYNC_TASK_LOCK,
EPSS_MIRROR_TASK_LOCK,
VULNERABILITY_METRICS_TASK_LOCK,
INTERNAL_COMPONENT_IDENTIFICATION_TASK_LOCK
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,32 @@
*/
package org.dependencytrack.tasks.metrics;

import alpine.Config;
import alpine.common.logging.Logger;
import alpine.common.util.SystemUtil;
import alpine.event.framework.Event;
import alpine.event.framework.Subscriber;
import io.micrometer.core.instrument.Timer;
import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.core.LockExtender;
import net.javacrumbs.shedlock.core.LockingTaskExecutor;
import net.javacrumbs.shedlock.provider.jdbc.JdbcLockProvider;
import org.apache.commons.collections4.ListUtils;
import org.dependencytrack.event.CallbackEvent;
import org.dependencytrack.event.PortfolioMetricsUpdateEvent;
import org.dependencytrack.event.ProjectMetricsUpdateEvent;
import org.dependencytrack.metrics.Metrics;
import org.dependencytrack.model.Project;
import org.dependencytrack.persistence.QueryManager;
import org.dependencytrack.util.LockProvider;

import javax.jdo.PersistenceManager;
import javax.jdo.Query;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static java.time.Duration.ZERO;
import static org.dependencytrack.common.ConfigKey.TASK_PORTFOLIO_LOCK_AT_LEAST_FOR;
import static org.dependencytrack.common.ConfigKey.TASK_PORTFOLIO_LOCK_AT_MOST_FOR;
import static org.dependencytrack.tasks.LockName.PORTFOLIO_METRICS_TASK_LOCK;
import static org.dependencytrack.util.LockProvider.getJdbcLockProviderInstance;
import static org.dependencytrack.util.LockProvider.getLockingTaskExecutorInstance;


/**
Expand All @@ -68,22 +61,9 @@ public class PortfolioMetricsUpdateTask implements Subscriber {
public void inform(final Event e) {
if (e instanceof final PortfolioMetricsUpdateEvent event) {
try {
JdbcLockProvider instance = getJdbcLockProviderInstance();
LockingTaskExecutor executor = getLockingTaskExecutorInstance(instance);
LockConfiguration lockConfiguration = new LockConfiguration(Instant.now(),
PORTFOLIO_METRICS_TASK_LOCK.name(),
Duration.ofMillis(Config.getInstance().getPropertyAsInt(TASK_PORTFOLIO_LOCK_AT_MOST_FOR)),
Duration.ofMillis(Config.getInstance().getPropertyAsInt(TASK_PORTFOLIO_LOCK_AT_LEAST_FOR)));

executor.executeWithLock((Runnable) () -> {
try {
updateMetrics(event.isForceRefresh());
} catch (Exception ex) {
throw new RuntimeException("Error in acquiring lock and executing metrics", ex);
}
}, lockConfiguration);
} catch (Exception ex) {
LOGGER.error("An unexpected error occurred while updating portfolio metrics", ex);
LockProvider.executeWithLock(PORTFOLIO_METRICS_TASK_LOCK, (LockingTaskExecutor.Task)() -> updateMetrics(event.isForceRefresh()));
} catch (Throwable ex) {
LOGGER.error("Error in acquiring lock and executing portfolio metrics task", ex);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import alpine.event.framework.Event;
import alpine.event.framework.Subscriber;
import io.micrometer.core.instrument.Timer;
import net.javacrumbs.shedlock.core.LockingTaskExecutor;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.dependencytrack.event.VulnerabilityMetricsUpdateEvent;
import org.dependencytrack.model.VulnerabilityMetrics;
import org.dependencytrack.persistence.QueryManager;
import org.dependencytrack.util.LockProvider;

import javax.jdo.PersistenceManager;
import javax.jdo.Query;
Expand All @@ -38,6 +40,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.dependencytrack.tasks.LockName.VULNERABILITY_METRICS_TASK_LOCK;
import static org.dependencytrack.tasks.metrics.VulnerabilityMetricsUpdateTask.VulnerabilityDateCounters.queryForMetrics;

/**
Expand All @@ -53,9 +56,9 @@ public class VulnerabilityMetricsUpdateTask implements Subscriber {
public void inform(final Event e) {
if (e instanceof VulnerabilityMetricsUpdateEvent) {
try {
updateMetrics();
} catch (Exception ex) {
LOGGER.error("An unexpected error occurred while updating vulnerability metrics", ex);
LockProvider.executeWithLock(VULNERABILITY_METRICS_TASK_LOCK, (LockingTaskExecutor.Task) () -> updateMetrics());
} catch (Throwable ex) {
LOGGER.error("Error in acquiring lock and executing vulnerability metrics update task", ex);
}
}
}
Expand Down Expand Up @@ -101,7 +104,7 @@ private void updateMetrics() throws Exception {

// Flatten another time, for the yearly counts
Stream<VulnerabilityMetrics> yearlyStream = metrics.entrySet().stream()
.map(e -> new VulnerabilityMetrics(e.getKey(), null, e.getValue().values().stream().mapToInt(d->d.intValue()).sum(), measuredAt));
.map(e -> new VulnerabilityMetrics(e.getKey(), null, e.getValue().values().stream().mapToInt(d -> d.intValue()).sum(), measuredAt));

qm.synchronizeVulnerabilityMetrics(Stream.concat(monthlyStream, yearlyStream).toList());

Expand Down
Loading
Loading