package org.apache.flink.table.planner.plan.nodes.exec.batch;

import java.util.Arrays;
import java.util.stream.IntStream;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.LongHashJoinGenerator;
import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.fusion.OpFusionCodegenSpecGenerator;
import org.apache.flink.table.planner.plan.fusion.generator.TwoInputOpFusionCodegenSpecGenerator;
import org.apache.flink.table.planner.plan.fusion.spec.HashJoinFusionCodegenSpec;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.JoinSpec;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.JoinUtil;
import org.apache.flink.table.planner.plan.utils.SorMergeJoinOperatorUtil;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.HashJoinOperator;
import org.apache.flink.table.runtime.operators.join.HashJoinType;
import org.apache.flink.table.runtime.operators.join.SortMergeJoinFunction;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

/* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.class */
public class BatchExecHashJoin extends ExecNodeBase<RowData> implements BatchExecNode<RowData>, SingleTransformationTranslator<RowData> {
    private final JoinSpec joinSpec;
    private final boolean isBroadcast;
    private final boolean leftIsBuild;
    private final int estimatedLeftAvgRowSize;
    private final int estimatedRightAvgRowSize;
    private final long estimatedLeftRowCount;
    private final long estimatedRightRowCount;
    private final boolean tryDistinctBuildRow;

    public BatchExecHashJoin(ReadableConfig readableConfig, JoinSpec joinSpec, int i, int i2, long j, long j2, boolean z, boolean z2, boolean z3, InputProperty inputProperty, InputProperty inputProperty2, RowType rowType, String str) {
        super(ExecNodeContext.newNodeId(), ExecNodeContext.newContext(BatchExecHashJoin.class), ExecNodeContext.newPersistedConfig(BatchExecHashJoin.class, readableConfig), Arrays.asList(inputProperty, inputProperty2), rowType, str);
        this.joinSpec = joinSpec;
        this.isBroadcast = z;
        this.leftIsBuild = z2;
        this.estimatedLeftAvgRowSize = i;
        this.estimatedRightAvgRowSize = i2;
        this.estimatedLeftRowCount = j;
        this.estimatedRightRowCount = j2;
        this.tryDistinctBuildRow = z3;
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
    protected Transformation<RowData> translateToPlanInternal(PlannerBase plannerBase, ExecNodeConfig execNodeConfig) {
        Transformation<?> transformation;
        GeneratedProjection generatedProjection;
        RowType rowType;
        int i;
        long j;
        int[] iArr;
        Transformation<?> transformation2;
        GeneratedProjection generatedProjection2;
        RowType rowType2;
        long j2;
        int[] iArr2;
        ExecEdge execEdge = getInputEdges().get(0);
        ExecEdge execEdge2 = getInputEdges().get(1);
        Transformation<?> translateToPlan = execEdge.translateToPlan(plannerBase);
        Transformation<?> translateToPlan2 = execEdge2.translateToPlan(plannerBase);
        RowType rowType3 = (RowType) execEdge.getOutputType();
        RowType rowType4 = (RowType) execEdge2.getOutputType();
        JoinUtil.validateJoinSpec(this.joinSpec, rowType3, rowType4, false);
        int[] leftKeys = this.joinSpec.getLeftKeys();
        int[] rightKeys = this.joinSpec.getRightKeys();
        IntStream of = IntStream.of(leftKeys);
        rowType3.getClass();
        RowType of2 = RowType.of((LogicalType[]) of.mapToObj(rowType3::getTypeAt).toArray(i2 -> {
            return new LogicalType[i2];
        }));
        GeneratedJoinCondition generateConditionFunction = JoinUtil.generateConditionFunction(execNodeConfig, plannerBase.getFlinkContext().getClassLoader(), this.joinSpec.getNonEquiCondition().orElse(null), rowType3, rowType4);
        GeneratedProjection generateProjection = ProjectionCodeGenerator.generateProjection(new CodeGeneratorContext(execNodeConfig, plannerBase.getFlinkContext().getClassLoader()), "HashJoinLeftProjection", rowType3, of2, leftKeys);
        GeneratedProjection generateProjection2 = ProjectionCodeGenerator.generateProjection(new CodeGeneratorContext(execNodeConfig, plannerBase.getFlinkContext().getClassLoader()), "HashJoinRightProjection", rowType4, of2, rightKeys);
        boolean z = !this.leftIsBuild;
        if (this.leftIsBuild) {
            transformation = translateToPlan;
            generatedProjection = generateProjection;
            rowType = rowType3;
            i = this.estimatedLeftAvgRowSize;
            j = this.estimatedLeftRowCount;
            iArr = leftKeys;
            transformation2 = translateToPlan2;
            generatedProjection2 = generateProjection2;
            rowType2 = rowType4;
            j2 = this.estimatedRightRowCount;
            iArr2 = rightKeys;
        } else {
            transformation = translateToPlan2;
            generatedProjection = generateProjection2;
            rowType = rowType4;
            i = this.estimatedRightAvgRowSize;
            j = this.estimatedRightRowCount;
            iArr = rightKeys;
            transformation2 = translateToPlan;
            generatedProjection2 = generateProjection;
            rowType2 = rowType3;
            j2 = this.estimatedLeftRowCount;
            iArr2 = leftKeys;
        }
        FlinkJoinType joinType = this.joinSpec.getJoinType();
        HashJoinType of3 = HashJoinType.of(this.leftIsBuild, joinType.isLeftOuter(), joinType.isRightOuter(), joinType == FlinkJoinType.SEMI, joinType == FlinkJoinType.ANTI);
        long bytes = ((MemorySize) execNodeConfig.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY)).getBytes();
        long largeManagedMemory = getLargeManagedMemory(joinType, execNodeConfig);
        SortMergeJoinFunction sortMergeJoinFunction = SorMergeJoinOperatorUtil.getSortMergeJoinFunction(plannerBase.getFlinkContext().getClassLoader(), execNodeConfig, joinType, rowType3, rowType4, leftKeys, rightKeys, of2, this.leftIsBuild, this.joinSpec.getFilterNulls(), generateConditionFunction, (1.0d * bytes) / largeManagedMemory);
        boolean booleanValue = ((Boolean) execNodeConfig.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED)).booleanValue();
        int bytes2 = (int) ((MemorySize) execNodeConfig.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)).getBytes();
        return ExecNodeUtil.createTwoInputTransformation(transformation, transformation2, createTransformationName(execNodeConfig), createTransformationDescription(execNodeConfig), LongHashJoinGenerator.support(of3, of2, this.joinSpec.getFilterNulls()) ? LongHashJoinGenerator.gen(execNodeConfig, plannerBase.getFlinkContext().getClassLoader(), of3, of2, rowType, rowType2, iArr, iArr2, i, j, z, generateConditionFunction, this.leftIsBuild, booleanValue, bytes2, sortMergeJoinFunction) : SimpleOperatorFactory.of(HashJoinOperator.newHashJoinOperator(of3, this.leftIsBuild, booleanValue, bytes2, generateConditionFunction, z, this.joinSpec.getFilterNulls(), generatedProjection, generatedProjection2, this.tryDistinctBuildRow, i, j, j2, of2, sortMergeJoinFunction)), InternalTypeInfo.of(getOutputType()), transformation2.getParallelism(), largeManagedMemory, false);
    }

    private long getLargeManagedMemory(FlinkJoinType flinkJoinType, ExecNodeConfig execNodeConfig) {
        long bytes = ((MemorySize) execNodeConfig.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY)).getBytes();
        long bytes2 = ((MemorySize) execNodeConfig.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY)).getBytes();
        long bytes3 = ((MemorySize) execNodeConfig.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SORT_MEMORY)).getBytes();
        int i = 1;
        if (flinkJoinType == FlinkJoinType.FULL) {
            i = 2;
        }
        return Math.max(bytes, (bytes2 * i) + (bytes3 * 2));
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase, org.apache.flink.table.planner.plan.nodes.exec.FusionCodegenExecNode
    public boolean supportFusionCodegen() {
        RowType rowType = (RowType) getInputEdges().get(0).getOutputType();
        IntStream of = IntStream.of(this.joinSpec.getLeftKeys());
        rowType.getClass();
        RowType of2 = RowType.of((LogicalType[]) of.mapToObj(rowType::getTypeAt).toArray(i -> {
            return new LogicalType[i];
        }));
        FlinkJoinType joinType = this.joinSpec.getJoinType();
        return LongHashJoinGenerator.support(HashJoinType.of(this.leftIsBuild, joinType.isLeftOuter(), joinType.isRightOuter(), joinType == FlinkJoinType.SEMI, joinType == FlinkJoinType.ANTI), of2, this.joinSpec.getFilterNulls());
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
    protected OpFusionCodegenSpecGenerator translateToFusionCodegenSpecInternal(PlannerBase plannerBase, ExecNodeConfig execNodeConfig) {
        OpFusionCodegenSpecGenerator translateToFusionCodegenSpec = getInputEdges().get(0).translateToFusionCodegenSpec(plannerBase);
        OpFusionCodegenSpecGenerator translateToFusionCodegenSpec2 = getInputEdges().get(1).translateToFusionCodegenSpec(plannerBase);
        TwoInputOpFusionCodegenSpecGenerator twoInputOpFusionCodegenSpecGenerator = new TwoInputOpFusionCodegenSpecGenerator(translateToFusionCodegenSpec, translateToFusionCodegenSpec2, getLargeManagedMemory(this.joinSpec.getJoinType(), execNodeConfig), (RowType) getOutputType(), new HashJoinFusionCodegenSpec(new CodeGeneratorContext(execNodeConfig, plannerBase.getFlinkContext().getClassLoader()), this.isBroadcast, this.leftIsBuild, this.joinSpec, this.estimatedLeftAvgRowSize, this.estimatedRightAvgRowSize, this.estimatedLeftRowCount, this.estimatedRightRowCount, ((Boolean) execNodeConfig.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED)).booleanValue(), (int) ((MemorySize) execNodeConfig.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)).getBytes()));
        translateToFusionCodegenSpec.addOutput(1, twoInputOpFusionCodegenSpecGenerator);
        translateToFusionCodegenSpec2.addOutput(2, twoInputOpFusionCodegenSpecGenerator);
        return twoInputOpFusionCodegenSpecGenerator;
    }
}
