package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.CodeGenUtils;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.ExprCodeGenerator;
import org.apache.flink.table.planner.codegen.FunctionCodeGenerator;
import org.apache.flink.table.planner.codegen.GeneratedExpression;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.JoinSpec;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.JoinUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.temporal.TemporalProcessTimeJoinOperator;
import org.apache.flink.table.runtime.operators.join.temporal.TemporalRowTimeJoinOperator;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

/**
 * Stream exec node that translates a temporal join between two inputs into a
 * {@link TwoInputTransformation}.
 *
 * <p>The runtime operator is selected in {@link #createJoinOperator}: a
 * {@link TemporalRowTimeJoinOperator} when {@code rightTimeAttributeIndex >= 0}, a
 * {@link TemporalProcessTimeJoinOperator} when the right time attribute index equals
 * {@link #FIELD_INDEX_FOR_PROC_TIME_ATTRIBUTE} and the join is a temporal table function join.
 * Any other combination is rejected with a {@link TableException}.
 */
@ExecNodeMetadata(name = "stream-exec-temporal-join", version = 1, producedTransformations = {StreamExecTemporalJoin.TEMPORAL_JOIN_TRANSFORMATION}, minPlanVersion = FlinkVersion.v1_15, minStateVersion = FlinkVersion.v1_15)
public class StreamExecTemporalJoin extends ExecNodeBase<RowData> implements StreamExecNode<RowData>, SingleTransformationTranslator<RowData> {
    public static final String TEMPORAL_JOIN_TRANSFORMATION = "temporal-join";
    public static final String FIELD_NAME_JOIN_SPEC = "joinSpec";
    public static final String FIELD_NAME_IS_TEMPORAL_FUNCTION_JOIN = "isTemporalFunctionJoin";
    public static final String FIELD_NAME_LEFT_TIME_ATTRIBUTE_INDEX = "leftTimeAttributeIndex";
    public static final String FIELD_NAME_RIGHT_TIME_ATTRIBUTE_INDEX = "rightTimeAttributeIndex";

    /** Value of {@code rightTimeAttributeIndex} that is accepted in place of a non-negative field index. */
    public static final int FIELD_INDEX_FOR_PROC_TIME_ATTRIBUTE = -1;

    @JsonProperty(FIELD_NAME_JOIN_SPEC)
    private final JoinSpec joinSpec;

    @JsonProperty(FIELD_NAME_IS_TEMPORAL_FUNCTION_JOIN)
    private final boolean isTemporalFunctionJoin;

    @JsonProperty(FIELD_NAME_LEFT_TIME_ATTRIBUTE_INDEX)
    private final int leftTimeAttributeIndex;

    @JsonProperty(FIELD_NAME_RIGHT_TIME_ATTRIBUTE_INDEX)
    private final int rightTimeAttributeIndex;

    /**
     * Creates a new node with a freshly allocated node id, context and persisted configuration,
     * then delegates to the {@link JsonCreator} constructor.
     *
     * @param tableConfig configuration from which the persisted node configuration is derived
     * @param joinSpec join specification; must not be null
     * @param isTemporalFunctionJoin whether this is a temporal table function join
     * @param leftTimeAttributeIndex time attribute index on the left input
     * @param rightTimeAttributeIndex time attribute index on the right input, or
     *     {@link #FIELD_INDEX_FOR_PROC_TIME_ATTRIBUTE}
     * @param leftInputProperty input property of the left (first) input
     * @param rightInputProperty input property of the right (second) input
     * @param outputType row type produced by this node
     * @param description node description
     */
    public StreamExecTemporalJoin(ReadableConfig tableConfig, JoinSpec joinSpec, boolean isTemporalFunctionJoin, int leftTimeAttributeIndex, int rightTimeAttributeIndex, InputProperty leftInputProperty, InputProperty rightInputProperty, RowType outputType, String description) {
        this(
                ExecNodeContext.newNodeId(),
                ExecNodeContext.newContext(StreamExecTemporalJoin.class),
                ExecNodeContext.newPersistedConfig(StreamExecTemporalJoin.class, tableConfig),
                joinSpec,
                isTemporalFunctionJoin,
                leftTimeAttributeIndex,
                rightTimeAttributeIndex,
                Arrays.asList(leftInputProperty, rightInputProperty),
                outputType,
                description);
    }

    /**
     * Creates the node from its serialized JSON properties.
     *
     * @throws IllegalArgumentException if {@code inputProperties} does not contain exactly two
     *     entries, or if {@code rightTimeAttributeIndex} is neither
     *     {@link #FIELD_INDEX_FOR_PROC_TIME_ATTRIBUTE} nor non-negative
     * @throws NullPointerException if {@code joinSpec} is null
     */
    @JsonCreator
    public StreamExecTemporalJoin(
            @JsonProperty("id") int id,
            @JsonProperty("type") ExecNodeContext context,
            @JsonProperty("configuration") ReadableConfig persistedConfig,
            @JsonProperty(FIELD_NAME_JOIN_SPEC) JoinSpec joinSpec,
            @JsonProperty(FIELD_NAME_IS_TEMPORAL_FUNCTION_JOIN) boolean isTemporalFunctionJoin,
            @JsonProperty(FIELD_NAME_LEFT_TIME_ATTRIBUTE_INDEX) int leftTimeAttributeIndex,
            @JsonProperty(FIELD_NAME_RIGHT_TIME_ATTRIBUTE_INDEX) int rightTimeAttributeIndex,
            @JsonProperty("inputProperties") List<InputProperty> inputProperties,
            @JsonProperty("outputType") RowType outputType,
            @JsonProperty("description") String description) {
        super(id, context, persistedConfig, inputProperties, outputType, description);
        Preconditions.checkArgument(inputProperties.size() == 2);
        // Use the named constant instead of a bare -1 so the check and the constant cannot drift.
        Preconditions.checkArgument(
                rightTimeAttributeIndex == FIELD_INDEX_FOR_PROC_TIME_ATTRIBUTE
                        || rightTimeAttributeIndex >= 0);
        this.joinSpec = Preconditions.checkNotNull(joinSpec);
        this.isTemporalFunctionJoin = isTemporalFunctionJoin;
        this.leftTimeAttributeIndex = leftTimeAttributeIndex;
        this.rightTimeAttributeIndex = rightTimeAttributeIndex;
    }

    /**
     * Validates the join, builds the join operator and wires both inputs into a keyed
     * two-input transformation that uses the left input's parallelism.
     *
     * @throws ValidationException if this is a temporal table function join and the join type
     *     is not INNER
     * @throws TableException if this is a temporal table join and the join type is neither
     *     INNER nor LEFT
     */
    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner, ExecNodeConfig config) {
        final ExecEdge leftInputEdge = getInputEdges().get(0);
        final ExecEdge rightInputEdge = getInputEdges().get(1);
        final RowType leftInputType = (RowType) leftInputEdge.getOutputType();
        final RowType rightInputType = (RowType) rightInputEdge.getOutputType();

        JoinUtil.validateJoinSpec(this.joinSpec, leftInputType, rightInputType, true);

        final FlinkJoinType joinType = this.joinSpec.getJoinType();
        if (this.isTemporalFunctionJoin) {
            if (joinType != FlinkJoinType.INNER) {
                throw new ValidationException("Temporal table function join currently only support INNER JOIN, but was " + joinType + " JOIN.");
            }
        } else if (joinType != FlinkJoinType.LEFT && joinType != FlinkJoinType.INNER) {
            throw new TableException("Temporal table join currently only support INNER JOIN and LEFT JOIN, but was " + joinType + " JOIN.");
        }

        final RowType returnType = (RowType) getOutputType();
        final ClassLoader classLoader = planner.getFlinkContext().getClassLoader();

        final TwoInputStreamOperator<RowData, RowData, RowData> joinOperator =
                getJoinOperator(config, classLoader, leftInputType, rightInputType);

        // ExecEdge#translateToPlan is used here as returning Transformation<?>; the casts rely
        // on this node being declared over RowData inputs and outputs.
        @SuppressWarnings("unchecked")
        final Transformation<RowData> leftTransform =
                (Transformation<RowData>) leftInputEdge.translateToPlan(planner);
        @SuppressWarnings("unchecked")
        final Transformation<RowData> rightTransform =
                (Transformation<RowData>) rightInputEdge.translateToPlan(planner);

        final TwoInputTransformation<RowData, RowData, RowData> transformation =
                ExecNodeUtil.createTwoInputTransformation(
                        leftTransform,
                        rightTransform,
                        createTransformationMeta(TEMPORAL_JOIN_TRANSFORMATION, config),
                        joinOperator,
                        InternalTypeInfo.of(returnType),
                        leftTransform.getParallelism());

        // Both inputs are keyed by their join keys; the state key type is taken from the left selector.
        final RowDataKeySelector leftKeySelector = getLeftKeySelector(classLoader, leftInputType);
        final RowDataKeySelector rightKeySelector = getRightKeySelector(classLoader, rightInputType);
        transformation.setStateKeySelectors(leftKeySelector, rightKeySelector);
        transformation.setStateKeyType(leftKeySelector.getProducedType());
        return transformation;
    }

    /** Builds the key selector over the join spec's left keys for the given left input type. */
    private RowDataKeySelector getLeftKeySelector(ClassLoader classLoader, RowType leftInputType) {
        return KeySelectorUtil.getRowDataSelector(classLoader, this.joinSpec.getLeftKeys(), InternalTypeInfo.of(leftInputType));
    }

    /** Builds the key selector over the join spec's right keys for the given right input type. */
    private RowDataKeySelector getRightKeySelector(ClassLoader classLoader, RowType rightInputType) {
        return KeySelectorUtil.getRowDataSelector(classLoader, this.joinSpec.getRightKeys(), InternalTypeInfo.of(rightInputType));
    }

    /**
     * Generates the join condition function and creates the matching join operator.
     *
     * <p>The generated body is {@code return true;} unless the join spec carries a non-equi
     * condition, in which case the code generated for that condition is emitted followed by a
     * return of its result term.
     */
    private TwoInputStreamOperator<RowData, RowData, RowData> getJoinOperator(ExecNodeConfig config, ClassLoader classLoader, RowType leftInputType, RowType rightInputType) {
        final CodeGeneratorContext ctx = new CodeGeneratorContext(config, classLoader);
        // NOTE(review): the 'false' argument is presumably the nullable-input flag of
        // ExprCodeGenerator - confirm against its constructor.
        final ExprCodeGenerator exprGenerator =
                new ExprCodeGenerator(ctx, false)
                        .bindInput(leftInputType, CodeGenUtils.DEFAULT_INPUT1_TERM(), JavaScalaConversionUtil.toScala(Optional.empty()))
                        .bindSecondInput(rightInputType, CodeGenUtils.DEFAULT_INPUT2_TERM(), JavaScalaConversionUtil.toScala(Optional.empty()));

        String body = "return true;";
        if (this.joinSpec.getNonEquiCondition().isPresent()) {
            final GeneratedExpression condition = exprGenerator.generateExpression(this.joinSpec.getNonEquiCondition().get());
            body = String.format("%s\nreturn %s;", condition.code(), condition.resultTerm());
        }

        final GeneratedJoinCondition generatedJoinCondition =
                FunctionCodeGenerator.generateJoinCondition(ctx, "ConditionFunction", body, CodeGenUtils.DEFAULT_INPUT1_TERM(), CodeGenUtils.DEFAULT_INPUT2_TERM());
        return createJoinOperator(config, leftInputType, rightInputType, generatedJoinCondition);
    }

    /**
     * Instantiates the runtime join operator for this node.
     *
     * @throws TableException if the right time attribute index is negative and this is not a
     *     temporal table function join
     */
    private TwoInputStreamOperator<RowData, RowData, RowData> createJoinOperator(ExecNodeConfig config, RowType leftInputType, RowType rightInputType, GeneratedJoinCondition generatedJoinCondition) {
        final boolean isLeftOuterJoin = this.joinSpec.getJoinType() == FlinkJoinType.LEFT;
        final long stateRetentionTime = config.getStateRetentionTime();
        final long maxIdleStateRetentionTime = TableConfigUtils.getMaxIdleStateRetentionTime(config);

        if (this.rightTimeAttributeIndex >= 0) {
            return new TemporalRowTimeJoinOperator(
                    InternalTypeInfo.of(leftInputType),
                    InternalTypeInfo.of(rightInputType),
                    generatedJoinCondition,
                    this.leftTimeAttributeIndex,
                    this.rightTimeAttributeIndex,
                    stateRetentionTime,
                    maxIdleStateRetentionTime,
                    isLeftOuterJoin);
        }
        if (this.isTemporalFunctionJoin) {
            return new TemporalProcessTimeJoinOperator(
                    InternalTypeInfo.of(rightInputType),
                    generatedJoinCondition,
                    stateRetentionTime,
                    maxIdleStateRetentionTime,
                    isLeftOuterJoin);
        }
        throw new TableException("Processing-time temporal join is not supported yet.");
    }
}
