package org.apache.flink.streaming.api.operators.async.queue;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link StreamElementQueue} implementation that emits completed elements without preserving
 * the order in which they were added, except across watermarks.
 *
 * <p>The queue is organized as a sequence of {@link Segment segments}. Records are appended to
 * the last segment; a watermark is placed into a segment of its own and is followed by a fresh
 * segment for subsequent records. Only elements of the first segment are emitted, and a segment
 * is dropped once it is drained and a later segment exists. Consequently, entries of one segment
 * are emitted in completion order, while entries of different segments never overtake each
 * other.
 *
 * @param <OUT> type of the elements emitted by the queue entries
 */
@Internal
public final class UnorderedStreamElementQueue<OUT> implements StreamElementQueue<OUT> {

    private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class);

    /** Maximum number of entries this queue accepts. */
    private final int capacity;

    /** Segments in insertion order; the first one is the only one elements are emitted from. */
    private final Deque<Segment<OUT>> segments;

    /** Number of entries currently held across all segments. */
    private int numberOfEntries;

    /**
     * A group of queue entries that may be emitted in any order relative to each other.
     *
     * @param <OUT> type of the elements emitted by the queue entries
     */
    public static class Segment<OUT> {

        /** Entries of this segment that have not been completed yet. */
        private final Set<StreamElementQueueEntry<OUT>> incompleteElements;

        /** Entries of this segment that are completed, in the order they became completed. */
        private final Queue<StreamElementQueueEntry<OUT>> completedElements;

        /**
         * Creates an empty segment.
         *
         * @param initialCapacity sizing hint used for both internal collections
         */
        Segment(int initialCapacity) {
            this.incompleteElements = CollectionUtil.newHashSetWithExpectedSize(initialCapacity);
            this.completedElements = new ArrayDeque<>(initialCapacity);
        }

        /**
         * Moves the given entry from the incomplete set to the completed queue.
         *
         * <p>Entries that are not (or no longer) tracked as incomplete are ignored, so signalling
         * the same entry repeatedly does not enqueue it more than once.
         *
         * @param elementQueueEntry the entry that has been completed
         */
        void completed(StreamElementQueueEntry<OUT> elementQueueEntry) {
            if (incompleteElements.remove(elementQueueEntry)) {
                completedElements.add(elementQueueEntry);
            }
        }

        /**
         * @return {@code true} if this segment holds neither incomplete nor completed entries
         */
        boolean isEmpty() {
            return incompleteElements.isEmpty() && completedElements.isEmpty();
        }

        /**
         * @return {@code true} if at least one completed entry is waiting to be emitted
         */
        boolean hasCompleted() {
            return !completedElements.isEmpty();
        }

        /**
         * Appends the input elements of all entries of this segment to the given list, completed
         * entries first, followed by the incomplete ones.
         *
         * @param results list receiving the input elements
         */
        void addPendingElements(List<StreamElement> results) {
            for (StreamElementQueueEntry<OUT> completedEntry : completedElements) {
                results.add(completedEntry.getInputElement());
            }
            for (StreamElementQueueEntry<OUT> incompleteEntry : incompleteElements) {
                results.add(incompleteEntry.getInputElement());
            }
        }

        /**
         * Removes the oldest completed entry, if any, and emits its result.
         *
         * @param output collector receiving the result
         * @return the number of emitted entries, i.e. {@code 1} or {@code 0}
         */
        int emitCompleted(TimestampedCollector<OUT> output) {
            final StreamElementQueueEntry<OUT> completedEntry = completedElements.poll();
            if (completedEntry == null) {
                return 0;
            }
            completedEntry.emitResult(output);
            return 1;
        }

        /**
         * Adds an entry to this segment, filing it as completed or incomplete depending on
         * {@link StreamElementQueueEntry#isDone()} at the time of the call.
         *
         * @param queueEntry the entry to add
         */
        void add(StreamElementQueueEntry<OUT> queueEntry) {
            if (queueEntry.isDone()) {
                completedElements.add(queueEntry);
            } else {
                incompleteElements.add(queueEntry);
            }
        }
    }

    /**
     * A record queue entry that remembers its owning {@link Segment} and notifies it upon
     * completion.
     *
     * @param <OUT> type of the elements emitted by the queue entry
     */
    public static class SegmentedStreamRecordQueueEntry<OUT> extends StreamRecordQueueEntry<OUT> {

        /** Segment this entry was added to. */
        private final Segment<OUT> segment;

        SegmentedStreamRecordQueueEntry(StreamRecord<?> inputRecord, Segment<OUT> segment) {
            super(inputRecord);
            this.segment = segment;
        }

        /**
         * Completes this entry and then reports the completion to the owning segment.
         *
         * @param result the result passed on to the superclass implementation
         */
        @Override
        public void complete(Collection<OUT> result) {
            super.complete(result);
            segment.completed(this);
        }
    }

    /**
     * Creates an empty queue.
     *
     * @param capacity maximum number of entries; must be larger than 0
     */
    public UnorderedStreamElementQueue(int capacity) {
        Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");
        this.capacity = capacity;
        this.segments = new ArrayDeque<>(4);
        this.numberOfEntries = 0;
    }

    /**
     * Tries to add the given element to the queue.
     *
     * @param streamElement the record or watermark to enqueue
     * @return the entry created for the element, or an empty {@link Optional} if the queue
     *     already holds {@code capacity} entries
     * @throws UnsupportedOperationException if the element is neither a record nor a watermark
     */
    @Override
    public Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement) {
        if (size() >= capacity) {
            LOG.debug(
                    "Failed to put element into unordered stream element queue because it was full ({}/{}).",
                    size(),
                    capacity);
            return Optional.empty();
        }

        final StreamElementQueueEntry<OUT> queueEntry;
        if (streamElement.isRecord()) {
            queueEntry = addRecord((StreamRecord<?>) streamElement);
        } else if (streamElement.isWatermark()) {
            queueEntry = addWatermark((Watermark) streamElement);
        } else {
            throw new UnsupportedOperationException("Cannot enqueue " + streamElement);
        }

        numberOfEntries++;
        LOG.debug(
                "Put element into unordered stream element queue. New filling degree ({}/{}).",
                size(),
                capacity);
        return Optional.of(queueEntry);
    }

    /** Appends a record entry to the last segment, creating the first segment if needed. */
    private StreamElementQueueEntry<OUT> addRecord(StreamRecord<?> streamRecord) {
        final Segment<OUT> lastSegment;
        if (segments.isEmpty()) {
            lastSegment = addSegment(capacity);
        } else {
            lastSegment = segments.getLast();
        }

        // The entry keeps a reference to its segment so it can report its own completion.
        final StreamElementQueueEntry<OUT> queueEntry =
                new SegmentedStreamRecordQueueEntry<>(streamRecord, lastSegment);
        lastSegment.add(queueEntry);
        return queueEntry;
    }

    /** Creates a segment with the given sizing hint and appends it to the segment list. */
    private Segment<OUT> addSegment(int segmentCapacity) {
        final Segment<OUT> newSegment = new Segment<>(segmentCapacity);
        segments.addLast(newSegment);
        return newSegment;
    }

    /**
     * Places a watermark entry into a segment of its own and opens a new segment for the
     * records that follow it.
     */
    private StreamElementQueueEntry<OUT> addWatermark(Watermark watermark) {
        final Segment<OUT> watermarkSegment;
        if (!segments.isEmpty() && segments.getLast().isEmpty()) {
            // An empty trailing segment is reused instead of stacking up empty segments.
            watermarkSegment = segments.getLast();
        } else {
            watermarkSegment = addSegment(1);
        }

        final StreamElementQueueEntry<OUT> watermarkEntry = new WatermarkQueueEntry<>(watermark);
        watermarkSegment.add(watermarkEntry);

        // Elements added after the watermark must go into a separate segment.
        addSegment(capacity);
        return watermarkEntry;
    }

    /**
     * @return {@code true} if the first segment has a completed entry ready to be emitted
     */
    @Override
    public boolean hasCompletedElements() {
        return !segments.isEmpty() && segments.getFirst().hasCompleted();
    }

    /**
     * Emits at most one completed entry of the first segment and drops that segment if it
     * became empty and is not the only one left.
     *
     * @param output collector receiving the result
     */
    @Override
    public void emitCompletedElement(TimestampedCollector<OUT> output) {
        if (segments.isEmpty()) {
            return;
        }

        final Segment<OUT> currentSegment = segments.getFirst();
        numberOfEntries -= currentSegment.emitCompleted(output);

        // The last remaining segment is kept even when empty so that it can be reused.
        if (segments.size() > 1 && currentSegment.isEmpty()) {
            segments.pop();
        }
    }

    /**
     * @return the input elements of all entries currently in the queue, segment by segment
     */
    @Override
    public List<StreamElement> values() {
        final List<StreamElement> pendingElements = new ArrayList<>();
        for (Segment<OUT> segment : segments) {
            segment.addPendingElements(pendingElements);
        }
        return pendingElements;
    }

    /**
     * @return {@code true} if the queue holds no entries
     */
    @Override
    public boolean isEmpty() {
        return numberOfEntries == 0;
    }

    /**
     * @return the number of entries currently held by the queue
     */
    @Override
    public int size() {
        return numberOfEntries;
    }
}
