package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.StateMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.spec.JoinSpec;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.JoinUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator;
import org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator;
import org.apache.flink.table.runtime.operators.join.stream.StreamingSemiAntiJoinOperator;
import org.apache.flink.table.runtime.operators.join.stream.state.JoinInputSideSpec;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

@ExecNodeMetadata(name = "stream-exec-join", version = 1, producedTransformations = {StreamExecJoin.JOIN_TRANSFORMATION}, minPlanVersion = FlinkVersion.v1_15, minStateVersion = FlinkVersion.v1_15)
/* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.class */
public class StreamExecJoin extends ExecNodeBase<RowData> implements StreamExecNode<RowData>, SingleTransformationTranslator<RowData> {
    public static final String JOIN_TRANSFORMATION = "join";
    public static final String LEFT_STATE_NAME = "leftState";
    public static final String RIGHT_STATE_NAME = "rightState";
    public static final String FIELD_NAME_JOIN_SPEC = "joinSpec";
    public static final String FIELD_NAME_LEFT_UPSERT_KEYS = "leftUpsertKeys";
    public static final String FIELD_NAME_RIGHT_UPSERT_KEYS = "rightUpsertKeys";

    @JsonProperty("joinSpec")
    private final JoinSpec joinSpec;

    @JsonInclude(JsonInclude.Include.NON_DEFAULT)
    @JsonProperty(FIELD_NAME_LEFT_UPSERT_KEYS)
    private final List<int[]> leftUpsertKeys;

    @JsonInclude(JsonInclude.Include.NON_DEFAULT)
    @JsonProperty(FIELD_NAME_RIGHT_UPSERT_KEYS)
    private final List<int[]> rightUpsertKeys;

    @Nullable
    @JsonInclude(JsonInclude.Include.NON_NULL)
    @JsonProperty("state")
    private final List<StateMetadata> stateMetadataList;

    public StreamExecJoin(ReadableConfig readableConfig, JoinSpec joinSpec, List<int[]> list, List<int[]> list2, InputProperty inputProperty, InputProperty inputProperty2, RowType rowType, String str) {
        this(ExecNodeContext.newNodeId(), ExecNodeContext.newContext(StreamExecJoin.class), ExecNodeContext.newPersistedConfig(StreamExecJoin.class, readableConfig), joinSpec, list, list2, StateMetadata.getMultiInputOperatorDefaultMeta(readableConfig, LEFT_STATE_NAME, RIGHT_STATE_NAME), Lists.newArrayList(inputProperty, inputProperty2), rowType, str);
    }

    @JsonCreator
    public StreamExecJoin(@JsonProperty("id") int i, @JsonProperty("type") ExecNodeContext execNodeContext, @JsonProperty("configuration") ReadableConfig readableConfig, @JsonProperty("joinSpec") JoinSpec joinSpec, @JsonProperty("leftUpsertKeys") List<int[]> list, @JsonProperty("rightUpsertKeys") List<int[]> list2, @Nullable @JsonProperty("state") List<StateMetadata> list3, @JsonProperty("inputProperties") List<InputProperty> list4, @JsonProperty("outputType") RowType rowType, @JsonProperty("description") String str) {
        super(i, execNodeContext, readableConfig, list4, rowType, str);
        Preconditions.checkArgument(list4.size() == 2);
        this.joinSpec = (JoinSpec) Preconditions.checkNotNull(joinSpec);
        this.leftUpsertKeys = list;
        this.rightUpsertKeys = list2;
        this.stateMetadataList = list3;
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
    protected Transformation<RowData> translateToPlanInternal(PlannerBase plannerBase, ExecNodeConfig execNodeConfig) {
        AbstractStreamingJoinOperator streamingSemiAntiJoinOperator;
        ExecEdge execEdge = getInputEdges().get(0);
        ExecEdge execEdge2 = getInputEdges().get(1);
        Transformation<?> translateToPlan = execEdge.translateToPlan(plannerBase);
        Transformation<?> translateToPlan2 = execEdge2.translateToPlan(plannerBase);
        RowType rowType = (RowType) execEdge.getOutputType();
        RowType rowType2 = (RowType) execEdge2.getOutputType();
        JoinUtil.validateJoinSpec(this.joinSpec, rowType, rowType2, true);
        int[] leftKeys = this.joinSpec.getLeftKeys();
        int[] rightKeys = this.joinSpec.getRightKeys();
        InternalTypeInfo<RowData> of = InternalTypeInfo.of(rowType);
        JoinInputSideSpec analyzeJoinInput = JoinUtil.analyzeJoinInput(plannerBase.getFlinkContext().getClassLoader(), of, leftKeys, this.leftUpsertKeys);
        InternalTypeInfo<RowData> of2 = InternalTypeInfo.of(rowType2);
        JoinInputSideSpec analyzeJoinInput2 = JoinUtil.analyzeJoinInput(plannerBase.getFlinkContext().getClassLoader(), of2, rightKeys, this.rightUpsertKeys);
        GeneratedJoinCondition generateConditionFunction = JoinUtil.generateConditionFunction(execNodeConfig, plannerBase.getFlinkContext().getClassLoader(), this.joinSpec, rowType, rowType2);
        List<Long> stateTtlForMultiInputOperator = StateMetadata.getStateTtlForMultiInputOperator(execNodeConfig, 2, this.stateMetadataList);
        long longValue = stateTtlForMultiInputOperator.get(0).longValue();
        long longValue2 = stateTtlForMultiInputOperator.get(1).longValue();
        FlinkJoinType joinType = this.joinSpec.getJoinType();
        if (joinType == FlinkJoinType.ANTI || joinType == FlinkJoinType.SEMI) {
            streamingSemiAntiJoinOperator = new StreamingSemiAntiJoinOperator(joinType == FlinkJoinType.ANTI, of, of2, generateConditionFunction, analyzeJoinInput, analyzeJoinInput2, this.joinSpec.getFilterNulls(), longValue, longValue2);
        } else {
            streamingSemiAntiJoinOperator = new StreamingJoinOperator(of, of2, generateConditionFunction, analyzeJoinInput, analyzeJoinInput2, joinType == FlinkJoinType.LEFT || joinType == FlinkJoinType.FULL, joinType == FlinkJoinType.RIGHT || joinType == FlinkJoinType.FULL, this.joinSpec.getFilterNulls(), longValue, longValue2);
        }
        TwoInputTransformation createTwoInputTransformation = ExecNodeUtil.createTwoInputTransformation((Transformation) translateToPlan, (Transformation) translateToPlan2, createTransformationMeta(JOIN_TRANSFORMATION, execNodeConfig), (TwoInputStreamOperator) streamingSemiAntiJoinOperator, (TypeInformation) InternalTypeInfo.of((RowType) getOutputType()), translateToPlan.getParallelism(), false);
        RowDataKeySelector rowDataSelector = KeySelectorUtil.getRowDataSelector(plannerBase.getFlinkContext().getClassLoader(), leftKeys, of);
        createTwoInputTransformation.setStateKeySelectors(rowDataSelector, KeySelectorUtil.getRowDataSelector(plannerBase.getFlinkContext().getClassLoader(), rightKeys, of2));
        createTwoInputTransformation.setStateKeyType(rowDataSelector.getProducedType2());
        return createTwoInputTransformation;
    }
}
