package org.apache.flink.connector.base.source.hybrid;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connector/base/source/hybrid/HybridSourceReader.class */
/**
 * {@link SourceReader} that delegates to the reader of the currently active underlying source
 * and swaps that reader whenever a {@link SwitchSourceEvent} arrives.
 *
 * <p>Until the first switch event is handled there is no underlying reader: polling reports
 * {@link InputStatus#NOTHING_AVAILABLE} and any splits handed to {@link #addSplits(List)} are
 * buffered and forwarded once the reader for their source index has been created.
 *
 * @param <T> type of the records emitted by the underlying readers
 */
public class HybridSourceReader<T> implements SourceReader<T, HybridSourceSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(HybridSourceReader.class);

    private final SourceReaderContext readerContext;
    /** Sources received through switch events, keyed by source index. */
    private final SwitchedSources switchedSources = new SwitchedSources();
    /** Splits received before any underlying reader existed. */
    private final List<HybridSourceSplit> restoredSplits = new ArrayList<>();
    /** Index of the active source; {@code -1} while no reader has been created yet. */
    private int currentSourceIndex = -1;
    private boolean isFinalSource;
    private SourceReader<T, ? extends SourceSplit> currentReader;
    private CompletableFuture<Void> availabilityFuture = new CompletableFuture<>();

    /**
     * Creates a reader without an underlying reader; one is created when the first
     * {@link SwitchSourceEvent} is handled.
     *
     * @param sourceReaderContext context passed to every underlying reader and used to send
     *     events to the coordinator
     */
    public HybridSourceReader(SourceReaderContext sourceReaderContext) {
        this.readerContext = sourceReaderContext;
    }

    /**
     * Reports the last finished source index to the coordinator.
     *
     * <p>Without buffered splits this is the current source index ({@code -1} initially);
     * otherwise it is the source index of the first buffered split minus one.
     */
    @Override // org.apache.flink.api.connector.source.SourceReader
    public void start() {
        int finishedSourceIndex = currentSourceIndex;
        if (!restoredSplits.isEmpty()) {
            finishedSourceIndex = restoredSplits.get(0).sourceIndex() - 1;
        }
        readerContext.sendSourceEventToCoordinator(
                new SourceReaderFinishedEvent(finishedSourceIndex));
    }

    /**
     * Polls the current underlying reader.
     *
     * <p>When that reader reports {@link InputStatus#END_OF_INPUT}, a
     * {@link SourceReaderFinishedEvent} is sent to the coordinator. Unless the current source
     * is the final one, the end of input is hidden from the caller and
     * {@link InputStatus#NOTHING_AVAILABLE} is returned instead.
     *
     * @param readerOutput output the underlying reader emits records to
     * @return the status of the underlying reader, or {@code NOTHING_AVAILABLE} if there is no
     *     reader yet or a non-final source has ended
     * @throws Exception if the underlying reader fails
     */
    @Override // org.apache.flink.api.connector.source.SourceReader
    public InputStatus pollNext(ReaderOutput<T> readerOutput) throws Exception {
        if (currentReader == null) {
            return InputStatus.NOTHING_AVAILABLE;
        }
        InputStatus status = currentReader.pollNext(readerOutput);
        if (status != InputStatus.END_OF_INPUT) {
            return status;
        }
        LOG.info(
                "End of input subtask={} sourceIndex={} {}",
                readerContext.getIndexOfSubtask(),
                currentSourceIndex,
                currentReader);
        readerContext.sendSourceEventToCoordinator(
                new SourceReaderFinishedEvent(currentSourceIndex));
        if (isFinalSource) {
            return status;
        }
        // Replace an already completed future so that callers wait until the next switch
        // event completes the new one.
        if (availabilityFuture.isDone()) {
            availabilityFuture = new CompletableFuture<>();
        }
        return InputStatus.NOTHING_AVAILABLE;
    }

    /**
     * Snapshots the splits of the current underlying reader, wrapped with the current source
     * index. An empty state is wrapped when no reader exists yet.
     *
     * @param checkpointId id of the checkpoint being taken
     * @return the wrapped splits of the current reader
     */
    @Override // org.apache.flink.api.connector.source.SourceReader
    public List<HybridSourceSplit> snapshotState(long checkpointId) {
        List<? extends SourceSplit> readerState;
        if (currentReader == null) {
            readerState = Collections.emptyList();
        } else {
            readerState = currentReader.snapshotState(checkpointId);
        }
        return HybridSourceSplit.wrapSplits(readerState, currentSourceIndex, switchedSources);
    }

    /** Forwards the notification to the current underlying reader, if there is one. */
    @Override // org.apache.flink.api.connector.source.SourceReader, org.apache.flink.api.common.state.CheckpointListener
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (currentReader != null) {
            currentReader.notifyCheckpointComplete(checkpointId);
        }
    }

    /** Forwards the notification to the current underlying reader, if there is one. */
    @Override // org.apache.flink.api.common.state.CheckpointListener
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        if (currentReader != null) {
            currentReader.notifyCheckpointAborted(checkpointId);
        }
    }

    /** Returns the availability future maintained by this reader. */
    @Override // org.apache.flink.api.connector.source.SourceReader
    public CompletableFuture<Void> isAvailable() {
        return availabilityFuture;
    }

    /**
     * Unwraps the given splits and hands them to the current underlying reader. While no
     * reader has been created yet, the splits are buffered instead.
     *
     * @param splits wrapped splits; each must belong to the current source once a reader exists
     * @throws IllegalStateException if a split belongs to a source other than the current one
     */
    @Override // org.apache.flink.api.connector.source.SourceReader
    public void addSplits(List<HybridSourceSplit> splits) {
        LOG.info(
                "Adding splits subtask={} sourceIndex={} currentReader={} {}",
                readerContext.getIndexOfSubtask(),
                currentSourceIndex,
                currentReader,
                splits);
        if (currentSourceIndex < 0) {
            // No reader yet: keep the splits until setCurrentReader picks them up.
            restoredSplits.addAll(splits);
            return;
        }
        List<SourceSplit> unwrappedSplits = new ArrayList<>(splits.size());
        for (HybridSourceSplit split : splits) {
            Preconditions.checkState(
                    split.sourceIndex() == currentSourceIndex,
                    "Split %s while current source is %s",
                    split,
                    currentSourceIndex);
            unwrappedSplits.add(HybridSourceSplit.unwrapSplit(split, switchedSources));
        }
        forwardSplits(unwrappedSplits);
    }

    /** Forwards the notification to the current underlying reader, if there is one. */
    @Override // org.apache.flink.api.connector.source.SourceReader
    public void notifyNoMoreSplits() {
        if (currentReader != null) {
            currentReader.notifyNoMoreSplits();
        }
        LOG.debug(
                "No more splits for subtask={} sourceIndex={} currentReader={}",
                readerContext.getIndexOfSubtask(),
                currentSourceIndex,
                currentReader);
    }

    /**
     * Handles a {@link SwitchSourceEvent} by registering the new source, switching to its
     * reader and completing the availability future; any other event is forwarded to the
     * current underlying reader.
     *
     * @param sourceEvent event received from the coordinator
     * @throws IllegalStateException if a non-switch event arrives before any reader exists
     */
    @Override // org.apache.flink.api.connector.source.SourceReader
    public void handleSourceEvents(SourceEvent sourceEvent) {
        if (sourceEvent instanceof SwitchSourceEvent) {
            SwitchSourceEvent switchEvent = (SwitchSourceEvent) sourceEvent;
            LOG.info(
                    "Switch source event: subtask={} sourceIndex={} source={}",
                    readerContext.getIndexOfSubtask(),
                    switchEvent.sourceIndex(),
                    switchEvent.source());
            switchedSources.put(switchEvent.sourceIndex(), switchEvent.source());
            setCurrentReader(switchEvent.sourceIndex());
            isFinalSource = switchEvent.isFinalSource();
            if (!availabilityFuture.isDone()) {
                availabilityFuture.complete(null);
            }
            return;
        }
        // Fail with a descriptive message instead of a bare NullPointerException.
        Preconditions.checkState(
                currentReader != null,
                "Received source event %s before any underlying reader was created",
                sourceEvent);
        currentReader.handleSourceEvents(sourceEvent);
    }

    /** Closes the current underlying reader, if there is one. */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (currentReader != null) {
            currentReader.close();
        }
        LOG.debug(
                "Reader closed: subtask={} sourceIndex={} currentReader={}",
                readerContext.getIndexOfSubtask(),
                currentSourceIndex,
                currentReader);
    }

    /**
     * Closes the current reader (if any), creates and starts the reader of the source with the
     * given index, and forwards the buffered splits that belong to that source.
     *
     * @param index index of the source to switch to; must differ from the current index
     * @throws RuntimeException if closing the old reader or setting up the new one fails
     */
    private void setCurrentReader(int index) {
        Preconditions.checkArgument(index != currentSourceIndex);
        if (currentReader != null) {
            try {
                currentReader.close();
                LOG.debug(
                        "Reader closed: subtask={} sourceIndex={} currentReader={}",
                        readerContext.getIndexOfSubtask(),
                        currentSourceIndex,
                        currentReader);
            } catch (Exception e) {
                throw new RuntimeException("Failed to close current reader", e);
            }
        }
        try {
            SourceReader<T, ? extends SourceSplit> reader =
                    switchedSources.sourceOf(index).createReader(readerContext);
            reader.start();
            currentSourceIndex = index;
            currentReader = reader;
            // Propagate the new reader's availability to whichever future this reader holds
            // at the time the callback fires.
            currentReader
                    .isAvailable()
                    .whenComplete(
                            (result, failure) -> {
                                if (failure == null) {
                                    availabilityFuture.complete(result);
                                } else {
                                    availabilityFuture.completeExceptionally(failure);
                                }
                            });
            LOG.debug(
                    "Reader started: subtask={} sourceIndex={} {}",
                    readerContext.getIndexOfSubtask(),
                    currentSourceIndex,
                    reader);
            if (restoredSplits.isEmpty()) {
                return;
            }
            // Move the buffered splits of the new source to the reader; splits of other
            // sources stay buffered.
            List<HybridSourceSplit> splitsForSource = new ArrayList<>(restoredSplits.size());
            Iterator<HybridSourceSplit> buffered = restoredSplits.iterator();
            while (buffered.hasNext()) {
                HybridSourceSplit split = buffered.next();
                if (split.sourceIndex() == index) {
                    splitsForSource.add(split);
                    buffered.remove();
                }
            }
            addSplits(splitsForSource);
        } catch (Exception e) {
            throw new RuntimeException("Failed to create reader", e);
        }
    }

    /**
     * Passes already unwrapped splits to the current underlying reader.
     *
     * <p>The split type of the reader is only known as a wildcard, so the list has to be
     * passed through a raw cast.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void forwardSplits(List<SourceSplit> unwrappedSplits) {
        currentReader.addSplits((List) unwrappedSplits);
    }
}
