package org.apache.flink.runtime.operators;

import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator;
import org.apache.flink.runtime.operators.resettable.SpillingResettableMutableObjectIterator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
import org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/operators/CrossDriver.class */
public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT>, OT> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) CrossDriver.class);
    private TaskContext<CrossFunction<T1, T2, OT>, OT> taskContext;
    private MemoryManager memManager;
    private SpillingResettableMutableObjectIterator<?> spillIter;
    private BlockResettableMutableObjectIterator<?> blockIter;
    private int memPagesForBlockSide;
    private int memPagesForSpillingSide;
    private boolean blocked;
    private boolean firstIsOuter;
    private volatile boolean running;
    private boolean objectReuseEnabled = false;

    @Override // org.apache.flink.runtime.operators.Driver
    public void setup(TaskContext<CrossFunction<T1, T2, OT>, OT> taskContext) {
        this.taskContext = taskContext;
        this.running = true;
    }

    @Override // org.apache.flink.runtime.operators.Driver
    public int getNumberOfInputs() {
        return 2;
    }

    @Override // org.apache.flink.runtime.operators.Driver
    public Class<CrossFunction<T1, T2, OT>> getStubType() {
        return CrossFunction.class;
    }

    @Override // org.apache.flink.runtime.operators.Driver
    public int getNumberOfDriverComparators() {
        return 0;
    }

    @Override // org.apache.flink.runtime.operators.Driver
    public void prepare() throws Exception {
        TaskConfig taskConfig = this.taskContext.getTaskConfig();
        DriverStrategy driverStrategy = taskConfig.getDriverStrategy();
        switch (driverStrategy) {
            case NESTEDLOOP_BLOCKED_OUTER_FIRST:
                this.blocked = true;
                this.firstIsOuter = true;
                break;
            case NESTEDLOOP_BLOCKED_OUTER_SECOND:
                this.blocked = true;
                this.firstIsOuter = false;
                break;
            case NESTEDLOOP_STREAMED_OUTER_FIRST:
                this.blocked = false;
                this.firstIsOuter = true;
                break;
            case NESTEDLOOP_STREAMED_OUTER_SECOND:
                this.blocked = false;
                this.firstIsOuter = false;
                break;
            default:
                throw new RuntimeException("Invalid local strategy for CROSS: " + driverStrategy);
        }
        this.memManager = this.taskContext.getMemoryManager();
        int computeNumberOfPages = this.memManager.computeNumberOfPages(taskConfig.getRelativeMemoryDriver());
        if (computeNumberOfPages < 2) {
            throw new RuntimeException("The Cross task was initialized with too little memory. Cross requires at least 2 memory pages.");
        }
        if (driverStrategy == DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST || driverStrategy == DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND) {
            this.memPagesForSpillingSide = computeNumberOfPages;
            this.memPagesForBlockSide = 0;
        } else {
            if (computeNumberOfPages > 32) {
                this.memPagesForSpillingSide = 2;
            } else {
                this.memPagesForSpillingSide = 1;
            }
            this.memPagesForBlockSide = computeNumberOfPages - this.memPagesForSpillingSide;
        }
        this.objectReuseEnabled = this.taskContext.getExecutionConfig().isObjectReuseEnabled();
        if (LOG.isDebugEnabled()) {
            LOG.debug("CrossDriver object reuse: " + (this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
        }
    }

    @Override // org.apache.flink.runtime.operators.Driver
    public void run() throws Exception {
        if (this.blocked) {
            if (this.firstIsOuter) {
                runBlockedOuterFirst();
                return;
            } else {
                runBlockedOuterSecond();
                return;
            }
        }
        if (this.firstIsOuter) {
            runStreamedOuterFirst();
        } else {
            runStreamedOuterSecond();
        }
    }

    @Override // org.apache.flink.runtime.operators.Driver
    public void cleanup() throws Exception {
        if (this.spillIter != null) {
            this.spillIter.close();
            this.spillIter = null;
        }
        if (this.blockIter != null) {
            this.blockIter.close();
            this.blockIter = null;
        }
    }

    @Override // org.apache.flink.runtime.operators.Driver
    public void cancel() {
        this.running = false;
    }

    private void runBlockedOuterFirst() throws Exception {
        Object next;
        Object next2;
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.taskContext.formatLogString("Running Cross with Block-Nested-Loops: First input is outer (blocking) side, second input is inner (spilling) side."));
        }
        Counter numRecordsInCounter = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
        Counter numRecordsOutCounter = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
        CountingMutableObjectIterator countingMutableObjectIterator = new CountingMutableObjectIterator(this.taskContext.getInput(0), numRecordsInCounter);
        CountingMutableObjectIterator countingMutableObjectIterator2 = new CountingMutableObjectIterator(this.taskContext.getInput(1), numRecordsInCounter);
        TypeSerializer serializer = this.taskContext.getInputSerializer(0).getSerializer();
        TypeSerializer serializer2 = this.taskContext.getInputSerializer(1).getSerializer();
        BlockResettableMutableObjectIterator<?> blockResettableMutableObjectIterator = new BlockResettableMutableObjectIterator<>(this.memManager, countingMutableObjectIterator, serializer, this.memPagesForBlockSide, this.taskContext.getContainingTask());
        this.blockIter = blockResettableMutableObjectIterator;
        SpillingResettableMutableObjectIterator<?> spillingResettableMutableObjectIterator = new SpillingResettableMutableObjectIterator<>(countingMutableObjectIterator2, (TypeSerializer<?>) serializer2, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide, this.taskContext.getContainingTask());
        this.spillIter = spillingResettableMutableObjectIterator;
        CrossFunction stub = this.taskContext.getStub();
        CountingCollector countingCollector = new CountingCollector(this.taskContext.getOutputCollector(), numRecordsOutCounter);
        if (this.objectReuseEnabled) {
            Object mo12375createInstance = serializer.mo12375createInstance();
            Object mo12375createInstance2 = serializer2.mo12375createInstance();
            while (true) {
                if (!this.running || (next2 = spillingResettableMutableObjectIterator.next(mo12375createInstance2)) == null) {
                    spillingResettableMutableObjectIterator.reset();
                    if (!this.running || !blockResettableMutableObjectIterator.nextBlock()) {
                        return;
                    }
                } else {
                    while (true) {
                        Object next3 = blockResettableMutableObjectIterator.next(mo12375createInstance);
                        if (next3 == null) {
                            break;
                        } else {
                            countingCollector.collect(stub.cross(next3, next2));
                        }
                    }
                    blockResettableMutableObjectIterator.reset();
                }
            }
        } else {
            while (true) {
                if (!this.running || (next = spillingResettableMutableObjectIterator.next()) == null) {
                    spillingResettableMutableObjectIterator.reset();
                    if (!this.running || !blockResettableMutableObjectIterator.nextBlock()) {
                        return;
                    }
                } else {
                    while (true) {
                        Object next4 = blockResettableMutableObjectIterator.next();
                        if (next4 == null) {
                            break;
                        } else {
                            countingCollector.collect(stub.cross(next4, serializer2.copy(next)));
                        }
                    }
                    blockResettableMutableObjectIterator.reset();
                }
            }
        }
    }

    private void runBlockedOuterSecond() throws Exception {
        Object next;
        Object next2;
        Object next3;
        Object next4;
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.taskContext.formatLogString("Running Cross with Block-Nested-Loops: First input is inner (spilling) side, second input is outer (blocking) side."));
        }
        Counter numRecordsInCounter = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
        Counter numRecordsOutCounter = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
        CountingMutableObjectIterator countingMutableObjectIterator = new CountingMutableObjectIterator(this.taskContext.getInput(0), numRecordsInCounter);
        CountingMutableObjectIterator countingMutableObjectIterator2 = new CountingMutableObjectIterator(this.taskContext.getInput(1), numRecordsInCounter);
        TypeSerializer serializer = this.taskContext.getInputSerializer(0).getSerializer();
        TypeSerializer serializer2 = this.taskContext.getInputSerializer(1).getSerializer();
        SpillingResettableMutableObjectIterator<?> spillingResettableMutableObjectIterator = new SpillingResettableMutableObjectIterator<>(countingMutableObjectIterator, (TypeSerializer<?>) serializer, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide, this.taskContext.getContainingTask());
        this.spillIter = spillingResettableMutableObjectIterator;
        BlockResettableMutableObjectIterator<?> blockResettableMutableObjectIterator = new BlockResettableMutableObjectIterator<>(this.memManager, countingMutableObjectIterator2, serializer2, this.memPagesForBlockSide, this.taskContext.getContainingTask());
        this.blockIter = blockResettableMutableObjectIterator;
        CrossFunction stub = this.taskContext.getStub();
        CountingCollector countingCollector = new CountingCollector(this.taskContext.getOutputCollector(), numRecordsOutCounter);
        if (this.objectReuseEnabled) {
            Object mo12375createInstance = serializer.mo12375createInstance();
            Object mo12375createInstance2 = serializer2.mo12375createInstance();
            while (true) {
                if (!this.running || (next3 = spillingResettableMutableObjectIterator.next(mo12375createInstance)) == null) {
                    spillingResettableMutableObjectIterator.reset();
                    if (!this.running || !blockResettableMutableObjectIterator.nextBlock()) {
                        return;
                    }
                } else {
                    while (this.running && (next4 = blockResettableMutableObjectIterator.next(mo12375createInstance2)) != null) {
                        countingCollector.collect(stub.cross(next3, next4));
                    }
                    blockResettableMutableObjectIterator.reset();
                }
            }
        } else {
            while (true) {
                if (!this.running || (next = spillingResettableMutableObjectIterator.next()) == null) {
                    spillingResettableMutableObjectIterator.reset();
                    if (!this.running || !blockResettableMutableObjectIterator.nextBlock()) {
                        return;
                    }
                } else {
                    while (this.running && (next2 = blockResettableMutableObjectIterator.next()) != null) {
                        countingCollector.collect(stub.cross(serializer.copy(next), next2));
                    }
                    blockResettableMutableObjectIterator.reset();
                }
            }
        }
    }

    private void runStreamedOuterFirst() throws Exception {
        IN next;
        Object next2;
        IN next3;
        Object next4;
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.taskContext.formatLogString("Running Cross with Nested-Loops: First input is outer side, second input is inner (spilling) side."));
        }
        Counter numRecordsInCounter = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
        Counter numRecordsOutCounter = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
        CountingMutableObjectIterator countingMutableObjectIterator = new CountingMutableObjectIterator(this.taskContext.getInput(0), numRecordsInCounter);
        CountingMutableObjectIterator countingMutableObjectIterator2 = new CountingMutableObjectIterator(this.taskContext.getInput(1), numRecordsInCounter);
        TypeSerializer serializer = this.taskContext.getInputSerializer(0).getSerializer();
        TypeSerializer serializer2 = this.taskContext.getInputSerializer(1).getSerializer();
        SpillingResettableMutableObjectIterator<?> spillingResettableMutableObjectIterator = new SpillingResettableMutableObjectIterator<>(countingMutableObjectIterator2, (TypeSerializer<?>) serializer2, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide, this.taskContext.getContainingTask());
        this.spillIter = spillingResettableMutableObjectIterator;
        CrossFunction stub = this.taskContext.getStub();
        CountingCollector countingCollector = new CountingCollector(this.taskContext.getOutputCollector(), numRecordsOutCounter);
        if (!this.objectReuseEnabled) {
            while (this.running && (next = countingMutableObjectIterator.next()) != 0) {
                while (this.running && (next2 = spillingResettableMutableObjectIterator.next()) != null) {
                    countingCollector.collect(stub.cross(serializer.copy(next), next2));
                }
                spillingResettableMutableObjectIterator.reset();
            }
            return;
        }
        Object mo12375createInstance = serializer.mo12375createInstance();
        Object mo12375createInstance2 = serializer2.mo12375createInstance();
        while (this.running && (next3 = countingMutableObjectIterator.next(mo12375createInstance)) != 0) {
            while (this.running && (next4 = spillingResettableMutableObjectIterator.next(mo12375createInstance2)) != null) {
                countingCollector.collect(stub.cross(next3, next4));
            }
            spillingResettableMutableObjectIterator.reset();
        }
    }

    private void runStreamedOuterSecond() throws Exception {
        IN next;
        Object next2;
        IN next3;
        Object next4;
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.taskContext.formatLogString("Running Cross with Nested-Loops: First input is inner (spilling) side, second input is outer side."));
        }
        Counter numRecordsInCounter = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
        Counter numRecordsOutCounter = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
        CountingMutableObjectIterator countingMutableObjectIterator = new CountingMutableObjectIterator(this.taskContext.getInput(0), numRecordsInCounter);
        CountingMutableObjectIterator countingMutableObjectIterator2 = new CountingMutableObjectIterator(this.taskContext.getInput(1), numRecordsInCounter);
        TypeSerializer serializer = this.taskContext.getInputSerializer(0).getSerializer();
        TypeSerializer serializer2 = this.taskContext.getInputSerializer(1).getSerializer();
        SpillingResettableMutableObjectIterator<?> spillingResettableMutableObjectIterator = new SpillingResettableMutableObjectIterator<>(countingMutableObjectIterator, (TypeSerializer<?>) serializer, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide, this.taskContext.getContainingTask());
        this.spillIter = spillingResettableMutableObjectIterator;
        CrossFunction stub = this.taskContext.getStub();
        CountingCollector countingCollector = new CountingCollector(this.taskContext.getOutputCollector(), numRecordsOutCounter);
        if (!this.objectReuseEnabled) {
            while (this.running && (next = countingMutableObjectIterator2.next()) != 0) {
                while (this.running && (next2 = spillingResettableMutableObjectIterator.next()) != null) {
                    countingCollector.collect(stub.cross(next2, serializer2.copy(next)));
                }
                spillingResettableMutableObjectIterator.reset();
            }
            return;
        }
        Object mo12375createInstance = serializer.mo12375createInstance();
        Object mo12375createInstance2 = serializer2.mo12375createInstance();
        while (this.running && (next3 = countingMutableObjectIterator2.next(mo12375createInstance2)) != 0) {
            while (this.running && (next4 = spillingResettableMutableObjectIterator.next(mo12375createInstance)) != null) {
                countingCollector.collect(stub.cross(next4, next3));
            }
            spillingResettableMutableObjectIterator.reset();
        }
    }
}
