package org.apache.flink.runtime.checkpoint.channel;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.FileSystemSafetyNet;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.RunnableWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Executes {@link ChannelStateWriteRequest}s on a single dedicated daemon thread.
 *
 * <p>Requests whose ready-future is already done are placed on the ready {@link #deque}; requests
 * that are not yet ready are parked in a per-subtask queue ({@link #unreadyQueues}) and moved to
 * the ready deque, in submission order, once their ready-future completes.
 *
 * <p>Two monitors are used. {@link #lock} guards the request queues, the failure cause and the
 * closed flag. {@link #registerLock} is supplied by the creator and guards the registration phase.
 * Whenever both are needed, {@code registerLock} is acquired before {@code lock}.
 */
@ThreadSafe
class ChannelStateWriteRequestExecutorImpl implements ChannelStateWriteRequestExecutor {

    private static final Logger LOG =
            LoggerFactory.getLogger(ChannelStateWriteRequestExecutorImpl.class);

    /** Monitor guarding the queues, {@link #thrown} and {@link #wasClosed}. */
    private final Object lock = new Object();

    private final ChannelStateWriteRequestDispatcher dispatcher;

    /** The writer thread; it runs {@link #run()}. */
    private final Thread thread;

    /** Registration is completed as soon as this many subtasks have registered. */
    private final int maxSubtasksPerChannelStateFile;

    /** Requests that are ready to be dispatched by the writer thread. */
    @GuardedBy("lock")
    private final Deque<ChannelStateWriteRequest> deque;

    /** First failure observed by the writer thread (later ones are attached as suppressed). */
    @GuardedBy("lock")
    private Exception thrown = null;

    @GuardedBy("lock")
    private boolean wasClosed = false;

    /** Per-subtask queues of requests whose ready-future has not completed yet. */
    @GuardedBy("lock")
    private final Map<SubtaskID, Queue<ChannelStateWriteRequest>> unreadyQueues = new HashMap<>();

    /** Subtasks currently registered with this executor. */
    @GuardedBy("lock")
    private final Set<SubtaskID> subtasks;

    /** Externally supplied monitor; must be held by callers of {@link #registerSubtask}. */
    private final Object registerLock;

    /** {@code true} until registration has been completed exactly once. */
    @GuardedBy("registerLock")
    private boolean isRegistering = true;

    /** Invoked with {@code this} at the moment registration is completed. */
    @GuardedBy("registerLock")
    private final Consumer<ChannelStateWriteRequestExecutor> onRegistered;

    /**
     * Creates an executor backed by a fresh, empty ready deque.
     *
     * @param dispatcher receives every request taken from the ready deque
     * @param maxSubtasksPerChannelStateFile number of registered subtasks at which registration is
     *     completed automatically
     * @param onRegistered callback invoked once when registration is completed
     * @param registerLock monitor guarding the registration phase
     */
    public ChannelStateWriteRequestExecutorImpl(
            ChannelStateWriteRequestDispatcher dispatcher,
            int maxSubtasksPerChannelStateFile,
            Consumer<ChannelStateWriteRequestExecutor> onRegistered,
            Object registerLock) {
        this(
                dispatcher,
                new ArrayDeque<>(),
                maxSubtasksPerChannelStateFile,
                registerLock,
                onRegistered);
    }

    /**
     * Creates an executor using the given deque as its ready queue. The writer thread is created
     * as a daemon thread but not started; call {@link #start()} for that.
     *
     * @param dispatcher receives every request taken from the ready deque
     * @param deque ready queue to use
     * @param maxSubtasksPerChannelStateFile number of registered subtasks at which registration is
     *     completed automatically
     * @param registerLock monitor guarding the registration phase
     * @param onRegistered callback invoked once when registration is completed
     */
    ChannelStateWriteRequestExecutorImpl(
            ChannelStateWriteRequestDispatcher dispatcher,
            Deque<ChannelStateWriteRequest> deque,
            int maxSubtasksPerChannelStateFile,
            Object registerLock,
            Consumer<ChannelStateWriteRequestExecutor> onRegistered) {
        this.dispatcher = dispatcher;
        this.deque = deque;
        this.maxSubtasksPerChannelStateFile = maxSubtasksPerChannelStateFile;
        this.registerLock = registerLock;
        this.onRegistered = onRegistered;
        this.thread = new Thread(this::run, "Channel state writer ");
        this.subtasks = new HashSet<>();
        this.thread.setDaemon(true);
    }

    /**
     * Body of the writer thread: runs {@link #loop()} and, however the loop ends, cancels all
     * outstanding requests, fails the dispatcher and closes the thread's file-system safety net.
     */
    @VisibleForTesting
    void run() {
        try {
            FileSystemSafetyNet.initializeSafetyNetForThread();
            loop();
        } catch (Exception ex) {
            // 'thrown' is read by other threads under 'lock', so publish it under the same lock.
            synchronized (lock) {
                thrown = ex;
            }
        } finally {
            try {
                IOUtils.closeAll(
                        this::cleanupRequests,
                        () -> {
                            Throwable cause;
                            synchronized (lock) {
                                cause = thrown == null ? new CancellationException() : thrown;
                            }
                            dispatcher.fail(cause);
                        });
            } catch (Exception e) {
                synchronized (lock) {
                    thrown = ExceptionUtils.firstOrSuppressed(e, thrown);
                }
            }
            FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
        }
        LOG.debug("loop terminated");
    }

    /**
     * Takes ready requests one at a time and hands them to the dispatcher until the executor is
     * closed.
     *
     * <p>An interrupt received while the executor is still open is logged and ignored; an
     * interrupt received after it was closed restores the interrupt flag and ends the loop.
     *
     * @throws Exception any non-interrupt failure raised while dispatching a request
     */
    private void loop() throws Exception {
        while (true) {
            try {
                ChannelStateWriteRequest request;
                synchronized (lock) {
                    request = waitAndTakeUnsafe();
                    if (request == null) {
                        // The executor was closed.
                        return;
                    }
                }
                // A checkpoint start request ends the registration phase if it is still open.
                if (request instanceof CheckpointStartRequest) {
                    synchronized (registerLock) {
                        if (completeRegister()) {
                            onRegistered.accept(this);
                        }
                    }
                }
                // Dispatch outside of 'lock' so that submitters are not blocked meanwhile.
                dispatcher.dispatch(request);
            } catch (InterruptedException e) {
                synchronized (lock) {
                    if (!wasClosed) {
                        LOG.debug(
                                "Channel state executor is interrupted while waiting for a request (continue waiting)",
                                e);
                    } else {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }
    }

    /**
     * Blocks until a ready request is available or the executor is closed. Must be called while
     * holding {@link #lock} (it waits on that monitor).
     *
     * @return the next ready request, or {@code null} if the executor was closed
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @Nullable
    private ChannelStateWriteRequest waitAndTakeUnsafe() throws InterruptedException {
        while (!wasClosed) {
            ChannelStateWriteRequest request = deque.pollFirst();
            if (request != null) {
                return request;
            }
            lock.wait();
        }
        return null;
    }

    /**
     * Drains the ready deque and all per-subtask unready queues and cancels every drained request
     * with the recorded failure, or with a {@link CancellationException} if none was recorded.
     *
     * @throws Exception if cancelling any of the drained requests fails
     */
    private void cleanupRequests() throws Exception {
        ArrayList<ChannelStateWriteRequest> drained;
        Throwable cause;
        synchronized (lock) {
            cause = thrown == null ? new CancellationException() : thrown;
            drained = new ArrayList<>(deque);
            deque.clear();
            for (Queue<ChannelStateWriteRequest> unreadyQueue : unreadyQueues.values()) {
                while (!unreadyQueue.isEmpty()) {
                    drained.add(unreadyQueue.poll());
                }
            }
        }
        LOG.info("discarding {} drained requests", drained.size());
        IOUtils.closeAll(
                drained.stream()
                        .<AutoCloseable>map(request -> () -> request.cancel(cause))
                        .collect(Collectors.toList()));
    }

    /** Starts the writer thread. */
    @Override
    public void start() throws IllegalStateException {
        thread.start();
    }

    /**
     * Submits a request for a registered subtask. A request that is already ready, and has no
     * unready predecessor of the same subtask, goes straight to the ready deque; otherwise it is
     * parked in the subtask's unready queue to preserve per-subtask ordering.
     *
     * @throws IllegalArgumentException if the request's subtask is not registered
     * @throws Exception if the executor is not running; the request is cancelled in that case
     */
    @Override
    public void submit(ChannelStateWriteRequest request) throws Exception {
        synchronized (lock) {
            SubtaskID subtaskID =
                    SubtaskID.of(request.getJobVertexID(), request.getSubtaskIndex());
            Queue<ChannelStateWriteRequest> unreadyQueue = unreadyQueues.get(subtaskID);
            Preconditions.checkArgument(
                    unreadyQueue != null, "The subtask %s is not yet registered.", subtaskID);
            submitInternal(
                    request,
                    () -> {
                        if (!unreadyQueue.isEmpty()) {
                            // Earlier requests of this subtask are still pending: keep the order.
                            unreadyQueue.add(request);
                        } else if (request.getReadyFuture().isDone()) {
                            deque.add(request);
                            lock.notifyAll();
                        } else {
                            unreadyQueue.add(request);
                            registerFirstRequestFuture(request, unreadyQueue);
                        }
                    });
        }
    }

    /**
     * Arranges for the head of an unready queue to be moved to the ready deque when its
     * ready-future completes, whether normally or exceptionally. Must be called while holding
     * {@link #lock}.
     *
     * @param firstRequest the request that is currently at the head of {@code unreadyQueue}
     * @param unreadyQueue the unready queue of the request's subtask
     */
    private void registerFirstRequestFuture(
            @Nonnull ChannelStateWriteRequest firstRequest,
            Queue<ChannelStateWriteRequest> unreadyQueue) {
        assert Thread.holdsLock(lock);
        Preconditions.checkState(
                firstRequest == unreadyQueue.peek(), "The request isn't the first request.");
        firstRequest
                .getReadyFuture()
                .thenAccept(
                        ignored -> {
                            synchronized (lock) {
                                moveReadyRequestToReadyQueue(unreadyQueue, firstRequest);
                            }
                        })
                .exceptionally(
                        throwable -> {
                            // The failure itself is not handled here; the request is only moved
                            // on so that it reaches the dispatcher.
                            synchronized (lock) {
                                moveReadyRequestToReadyQueue(unreadyQueue, firstRequest);
                            }
                            return null;
                        });
    }

    /**
     * Moves the head of {@code unreadyQueue}, and every directly following request whose
     * ready-future is done, to the ready deque. Stops at the first request that is not ready yet
     * and registers a completion callback for it. Must be called while holding {@link #lock}.
     */
    private void moveReadyRequestToReadyQueue(
            Queue<ChannelStateWriteRequest> unreadyQueue, ChannelStateWriteRequest firstRequest) {
        assert Thread.holdsLock(lock);
        Preconditions.checkState(firstRequest == unreadyQueue.peek());
        while (!unreadyQueue.isEmpty()) {
            ChannelStateWriteRequest head = unreadyQueue.peek();
            if (!head.getReadyFuture().isDone()) {
                registerFirstRequestFuture(head, unreadyQueue);
                return;
            }
            deque.add(Objects.requireNonNull(unreadyQueue.poll()));
            lock.notifyAll();
        }
    }

    /**
     * Submits an already-ready request to the front of the ready deque.
     *
     * @throws IllegalArgumentException if the request's subtask is not registered
     * @throws IllegalStateException if the request's ready-future is not done
     * @throws Exception if the executor is not running; the request is cancelled in that case
     */
    @Override
    public void submitPriority(ChannelStateWriteRequest request) throws Exception {
        synchronized (lock) {
            SubtaskID subtaskID =
                    SubtaskID.of(request.getJobVertexID(), request.getSubtaskIndex());
            Preconditions.checkArgument(
                    unreadyQueues.containsKey(subtaskID),
                    "The subtask %s is not yet registered.",
                    subtaskID);
            Preconditions.checkState(
                    request.getReadyFuture().isDone(), "The priority request must be ready.");
            submitInternal(
                    request,
                    () -> {
                        deque.addFirst(request);
                        lock.notifyAll();
                    });
        }
    }

    /**
     * Runs the enqueue action and then verifies that the executor is still running. On any
     * failure the request is cancelled with that failure, which is then rethrown.
     */
    private void submitInternal(ChannelStateWriteRequest request, RunnableWithException action)
            throws Exception {
        try {
            action.run();
            ensureRunning();
        } catch (Exception e) {
            request.cancel(e);
            throw e;
        }
    }

    /**
     * Verifies that the executor is open and its thread alive. Must be called while holding
     * {@link #lock}.
     *
     * @throws IllegalStateException if the executor is not running; all queued requests have been
     *     cancelled by then and a recorded failure, if any, is attached as suppressed
     * @throws Exception if cancelling the queued requests fails
     */
    private void ensureRunning() throws Exception {
        assert Thread.holdsLock(lock);
        if (wasClosed || !thread.isAlive()) {
            cleanupRequests();
            IllegalStateException notRunning = new IllegalStateException("not running");
            if (thrown != null) {
                notRunning.addSuppressed(thrown);
            }
            throw notRunning;
        }
    }

    /**
     * Registers a subtask with this executor and enqueues the corresponding register request.
     * Completes the registration phase once {@code maxSubtasksPerChannelStateFile} subtasks are
     * registered. The caller must hold {@code registerLock}.
     *
     * @throws IllegalStateException if registration was already completed or the subtask is
     *     already registered
     */
    @Override
    public void registerSubtask(JobVertexID jobVertexID, int subtaskIndex) {
        assert Thread.holdsLock(registerLock);
        SubtaskID subtaskID = SubtaskID.of(jobVertexID, subtaskIndex);
        synchronized (lock) {
            Preconditions.checkState(isRegistering(), "This executor has been registered.");
            Preconditions.checkState(
                    !subtasks.contains(subtaskID),
                    String.format("This subtask[%s] has already registered.", subtaskID));
            subtasks.add(subtaskID);
            deque.add(
                    ChannelStateWriteRequest.registerSubtask(
                            subtaskID.getJobVertexID(), subtaskID.getSubtaskIndex()));
            lock.notifyAll();
            unreadyQueues.put(subtaskID, new ArrayDeque<>());
            if (subtasks.size() == maxSubtasksPerChannelStateFile && completeRegister()) {
                onRegistered.accept(this);
            }
        }
    }

    /** Returns whether the registration phase is still open. */
    @VisibleForTesting
    public boolean isRegistering() {
        synchronized (registerLock) {
            return isRegistering;
        }
    }

    /**
     * Closes the registration phase. The caller must hold {@code registerLock}.
     *
     * @return {@code true} if this call closed the phase, {@code false} if it was already closed
     */
    private boolean completeRegister() {
        assert Thread.holdsLock(registerLock);
        if (!isRegistering) {
            return false;
        }
        isRegistering = false;
        return true;
    }

    /**
     * Releases a subtask. Releasing any subtask closes the registration phase. When the last
     * subtask is released the executor is closed and this call blocks until the writer thread has
     * terminated.
     *
     * @throws IOException wrapping the failure recorded by the writer thread, if there is one
     */
    @Override
    public void releaseSubtask(JobVertexID jobVertexID, int subtaskIndex) throws IOException {
        synchronized (registerLock) {
            synchronized (lock) {
                if (completeRegister()) {
                    onRegistered.accept(this);
                }
                subtasks.remove(SubtaskID.of(jobVertexID, subtaskIndex));
                if (!subtasks.isEmpty()) {
                    return;
                }
                wasClosed = true;
                lock.notifyAll();
            }
        }
        // Wait for the writer thread WITHOUT holding any monitor: the thread must acquire 'lock'
        // to leave lock.wait(), to handle the interrupt and to clean up its requests, so joining
        // it while holding 'lock' would deadlock.
        while (thread.isAlive()) {
            thread.interrupt();
            try {
                thread.join();
            } catch (InterruptedException e) {
                if (!thread.isAlive()) {
                    Thread.currentThread().interrupt();
                }
                LOG.debug(
                        "Channel state executor is interrupted while waiting for the writer thread to die",
                        e);
            }
        }
        synchronized (lock) {
            if (thrown != null) {
                throw new IOException(thrown);
            }
        }
    }

    @VisibleForTesting
    Thread getThread() {
        return thread;
    }
}
