package org.apache.flink.runtime.dispatcher.cleanup;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.RetryStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ResourceCleaner} that runs a set of labeled cleanup steps for a job.
 *
 * <p>Cleanup steps are registered through a {@link Builder} in one of two groups:
 *
 * <ul>
 *   <li><em>prioritized</em> steps are chained one after another in registration order;
 *   <li><em>regular</em> steps are all triggered once the prioritized chain has completed and are
 *       combined into a single future via {@code FutureUtils.completeAll}.
 * </ul>
 *
 * <p>Every step is submitted through {@code FutureUtils.retryWithDelay} using the configured
 * {@link RetryStrategy}.
 *
 * @param <T> type of the resource a cleanup step operates on
 */
public class DefaultResourceCleaner<T> implements ResourceCleaner {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultResourceCleaner.class);

    private final ComponentMainThreadExecutor mainThreadExecutor;
    private final Executor cleanupExecutor;
    private final CleanupFn<T> cleanupFn;
    private final Collection<CleanupWithLabel<T>> prioritizedCleanup;
    private final Collection<CleanupWithLabel<T>> regularCleanup;
    private final RetryStrategy retryStrategy;

    /**
     * Builder that collects the cleanup steps of a {@link DefaultResourceCleaner}.
     *
     * @param <T> type of the resource a cleanup step operates on
     */
    public static class Builder<T> {
        private final ComponentMainThreadExecutor mainThreadExecutor;
        private final Executor cleanupExecutor;
        private final CleanupFn<T> cleanupFn;
        private final RetryStrategy retryStrategy;
        private final Collection<CleanupWithLabel<T>> prioritizedCleanup = new ArrayList<>();
        private final Collection<CleanupWithLabel<T>> regularCleanup = new ArrayList<>();

        private Builder(
                ComponentMainThreadExecutor mainThreadExecutor,
                Executor cleanupExecutor,
                CleanupFn<T> cleanupFn,
                RetryStrategy retryStrategy) {
            this.mainThreadExecutor = mainThreadExecutor;
            this.cleanupExecutor = cleanupExecutor;
            this.cleanupFn = cleanupFn;
            this.retryStrategy = retryStrategy;
        }

        /**
         * Registers a cleanup step in the prioritized group.
         *
         * @param str label used to identify the step in log messages
         * @param t resource to clean up
         * @return this builder
         */
        public Builder<T> withPrioritizedCleanup(String str, T t) {
            this.prioritizedCleanup.add(new CleanupWithLabel<>(t, str));
            return this;
        }

        /**
         * Registers a cleanup step in the regular group.
         *
         * @param str label used to identify the step in log messages
         * @param t resource to clean up
         * @return this builder
         */
        public Builder<T> withRegularCleanup(String str, T t) {
            this.regularCleanup.add(new CleanupWithLabel<>(t, str));
            return this;
        }

        /**
         * Creates the cleaner from the steps registered so far. The created instance takes a
         * snapshot of the registered steps, so later calls on this builder do not affect it.
         *
         * @return a new {@link DefaultResourceCleaner}
         */
        public DefaultResourceCleaner<T> build() {
            return new DefaultResourceCleaner<>(
                    this.mainThreadExecutor,
                    this.cleanupExecutor,
                    this.cleanupFn,
                    this.prioritizedCleanup,
                    this.regularCleanup,
                    this.retryStrategy);
        }
    }

    /**
     * Function that triggers the asynchronous cleanup of a single resource for a given job.
     *
     * @param <T> type of the resource being cleaned up
     */
    @VisibleForTesting
    @FunctionalInterface
    public interface CleanupFn<T> {
        /**
         * Triggers the cleanup of {@code t} for the given job.
         *
         * @param t resource to clean up
         * @param jobID job whose artifacts should be cleaned up
         * @param executor executor handed to the resource for its cleanup work
         * @return future representing the cleanup
         */
        CompletableFuture<Void> cleanupAsync(T t, JobID jobID, Executor executor);
    }

    /** Pairs a cleanup resource with the label that is used for it in log messages. */
    private static class CleanupWithLabel<CLEANUP_TYPE> {
        private final CLEANUP_TYPE cleanup;
        private final String label;

        public CleanupWithLabel(CLEANUP_TYPE cleanup, String label) {
            this.cleanup = cleanup;
            this.label = label;
        }

        public CLEANUP_TYPE getCleanup() {
            return this.cleanup;
        }

        public String getLabel() {
            return this.label;
        }
    }

    /**
     * Creates a builder whose steps call {@code localCleanupAsync} on each registered {@link
     * LocallyCleanableResource}.
     */
    public static Builder<LocallyCleanableResource> forLocallyCleanableResources(
            ComponentMainThreadExecutor componentMainThreadExecutor,
            Executor executor,
            RetryStrategy retryStrategy) {
        return forCleanableResources(
                componentMainThreadExecutor,
                executor,
                LocallyCleanableResource::localCleanupAsync,
                retryStrategy);
    }

    /**
     * Creates a builder whose steps call {@code globalCleanupAsync} on each registered {@link
     * GloballyCleanableResource}.
     */
    public static Builder<GloballyCleanableResource> forGloballyCleanableResources(
            ComponentMainThreadExecutor componentMainThreadExecutor,
            Executor executor,
            RetryStrategy retryStrategy) {
        return forCleanableResources(
                componentMainThreadExecutor,
                executor,
                GloballyCleanableResource::globalCleanupAsync,
                retryStrategy);
    }

    /** Creates a builder that uses an arbitrary {@link CleanupFn}. */
    @VisibleForTesting
    static <T> Builder<T> forCleanableResources(
            ComponentMainThreadExecutor componentMainThreadExecutor,
            Executor executor,
            CleanupFn<T> cleanupFn,
            RetryStrategy retryStrategy) {
        return new Builder<>(componentMainThreadExecutor, executor, cleanupFn, retryStrategy);
    }

    private DefaultResourceCleaner(
            ComponentMainThreadExecutor mainThreadExecutor,
            Executor cleanupExecutor,
            CleanupFn<T> cleanupFn,
            Collection<CleanupWithLabel<T>> prioritizedCleanup,
            Collection<CleanupWithLabel<T>> regularCleanup,
            RetryStrategy retryStrategy) {
        this.mainThreadExecutor = mainThreadExecutor;
        this.cleanupExecutor = cleanupExecutor;
        this.cleanupFn = cleanupFn;
        // Defensive copies: the builder keeps its own lists and may still be modified (or reused
        // for another build) after this instance was created.
        this.prioritizedCleanup = new ArrayList<>(prioritizedCleanup);
        this.regularCleanup = new ArrayList<>(regularCleanup);
        this.retryStrategy = retryStrategy;
    }

    /**
     * Triggers all registered cleanup steps for the given job.
     *
     * <p>The call asserts that it runs in the main thread of the configured {@link
     * ComponentMainThreadExecutor}.
     *
     * @param jobID job whose artifacts should be cleaned up
     * @return future for the prioritized chain followed by the combined regular steps
     */
    @Override // org.apache.flink.runtime.dispatcher.cleanup.ResourceCleaner
    public CompletableFuture<Void> cleanupAsync(JobID jobID) {
        this.mainThreadExecutor.assertRunningInMainThread();

        // Prioritized steps: each one is composed onto the future of its predecessor.
        CompletableFuture<Void> prioritizedChain = FutureUtils.completedVoidFuture();
        for (CleanupWithLabel<T> step : this.prioritizedCleanup) {
            prioritizedChain =
                    prioritizedChain.thenCompose(
                            ignored -> withRetry(jobID, step.getLabel(), step.getCleanup()));
        }

        // Regular steps: all of them are triggered once the prioritized chain is done.
        return prioritizedChain.thenCompose(
                ignored -> {
                    Collection<CompletableFuture<Void>> regularFutures =
                            this.regularCleanup.stream()
                                    .map(
                                            step ->
                                                    withRetry(
                                                            jobID,
                                                            step.getLabel(),
                                                            step.getCleanup()))
                                    .collect(Collectors.toList());
                    return FutureUtils.completeAll(regularFutures);
                });
    }

    /**
     * Runs a single cleanup step through {@code FutureUtils.retryWithDelay}, logging a warning for
     * every failed attempt.
     *
     * @param jobID job whose artifacts should be cleaned up
     * @param str label of the step, used in the log message
     * @param t resource to clean up
     * @return future returned by {@code FutureUtils.retryWithDelay}
     */
    private CompletableFuture<Void> withRetry(JobID jobID, String str, T t) {
        return FutureUtils.retryWithDelay(
                () ->
                        this.cleanupFn
                                .cleanupAsync(t, jobID, this.cleanupExecutor)
                                .whenComplete(
                                        (ignored, failure) -> {
                                            if (failure == null) {
                                                return;
                                            }
                                            String message =
                                                    String.format(
                                                            "Cleanup of %s failed for job %s due to a %s: %s",
                                                            str,
                                                            jobID,
                                                            failure.getClass().getSimpleName(),
                                                            failure.getMessage());
                                            // The failure is always reported as a warning; the
                                            // stack trace is only attached when trace logging is
                                            // enabled.
                                            if (LOG.isTraceEnabled()) {
                                                LOG.warn(message, failure);
                                            } else {
                                                LOG.warn(message);
                                            }
                                        }),
                this.retryStrategy,
                this.mainThreadExecutor);
    }
}
