package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.calcite.rex.RexNode;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
import org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorFactory;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

@ExecNodeMetadata(name = "stream-exec-watermark-assigner", version = 1, producedTransformations = {StreamExecWatermarkAssigner.WATERMARK_ASSIGNER_TRANSFORMATION}, minPlanVersion = FlinkVersion.v1_15, minStateVersion = FlinkVersion.v1_15)
/* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.class */
public class StreamExecWatermarkAssigner extends ExecNodeBase<RowData> implements StreamExecNode<RowData>, SingleTransformationTranslator<RowData> {
    public static final String WATERMARK_ASSIGNER_TRANSFORMATION = "watermark-assigner";
    public static final String FIELD_NAME_WATERMARK_EXPR = "watermarkExpr";
    public static final String FIELD_NAME_ROWTIME_FIELD_INDEX = "rowtimeFieldIndex";

    @JsonProperty("watermarkExpr")
    private final RexNode watermarkExpr;

    @JsonProperty(FIELD_NAME_ROWTIME_FIELD_INDEX)
    private final int rowtimeFieldIndex;

    public StreamExecWatermarkAssigner(ReadableConfig readableConfig, RexNode rexNode, int i, InputProperty inputProperty, RowType rowType, String str) {
        this(ExecNodeContext.newNodeId(), ExecNodeContext.newContext(StreamExecWatermarkAssigner.class), ExecNodeContext.newPersistedConfig(StreamExecWatermarkAssigner.class, readableConfig), rexNode, i, Collections.singletonList(inputProperty), rowType, str);
    }

    @JsonCreator
    public StreamExecWatermarkAssigner(@JsonProperty("id") int i, @JsonProperty("type") ExecNodeContext execNodeContext, @JsonProperty("configuration") ReadableConfig readableConfig, @JsonProperty("watermarkExpr") RexNode rexNode, @JsonProperty("rowtimeFieldIndex") int i2, @JsonProperty("inputProperties") List<InputProperty> list, @JsonProperty("outputType") RowType rowType, @JsonProperty("description") String str) {
        super(i, execNodeContext, readableConfig, list, rowType, str);
        Preconditions.checkArgument(list.size() == 1);
        this.watermarkExpr = (RexNode) Preconditions.checkNotNull(rexNode);
        this.rowtimeFieldIndex = i2;
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
    protected Transformation<RowData> translateToPlanInternal(PlannerBase plannerBase, ExecNodeConfig execNodeConfig) {
        ExecEdge execEdge = getInputEdges().get(0);
        Transformation<?> translateToPlan = execEdge.translateToPlan(plannerBase);
        GeneratedWatermarkGenerator generateWatermarkGenerator = WatermarkGeneratorCodeGenerator.generateWatermarkGenerator(execNodeConfig, plannerBase.getFlinkContext().getClassLoader(), (RowType) execEdge.getOutputType(), this.watermarkExpr, JavaScalaConversionUtil.toScala(Optional.empty()));
        return ExecNodeUtil.createOneInputTransformation(translateToPlan, createTransformationMeta(WATERMARK_ASSIGNER_TRANSFORMATION, execNodeConfig), new WatermarkAssignerOperatorFactory(this.rowtimeFieldIndex, ((Duration) execNodeConfig.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT)).toMillis(), generateWatermarkGenerator), InternalTypeInfo.of(getOutputType()), translateToPlan.getParallelism());
    }
}
