package org.apache.flink.streaming.api.functions.source;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;

@PublicEvolving
@Deprecated
/* loaded from: input_file:org/apache/flink/streaming/api/functions/source/FromElementsFunction.class */
public class FromElementsFunction<T> implements SourceFunction<T>, CheckpointedFunction, OutputTypeConfigurable<T> {
    private static final long serialVersionUID = 1;

    @Nullable
    private TypeSerializer<T> serializer;
    private byte[] elementsSerialized;
    private final int numElements;
    private volatile int numElementsEmitted;
    private volatile int numElementsToSkip;
    private volatile boolean isRunning;
    private final transient Iterable<T> elements;
    private transient ListState<Integer> checkpointedState;

    @SafeVarargs
    public FromElementsFunction(TypeSerializer<T> typeSerializer, T... tArr) throws IOException {
        this(typeSerializer, Arrays.asList(tArr));
    }

    public FromElementsFunction(TypeSerializer<T> typeSerializer, Iterable<T> iterable) throws IOException {
        this.isRunning = true;
        this.serializer = (TypeSerializer) Preconditions.checkNotNull(typeSerializer);
        this.elements = iterable;
        this.numElements = iterable instanceof Collection ? ((Collection) iterable).size() : (int) IterableUtils.toStream(iterable).count();
        serializeElements();
    }

    @SafeVarargs
    public FromElementsFunction(T... tArr) {
        this(Arrays.asList(tArr));
    }

    public FromElementsFunction(Iterable<T> iterable) {
        this.isRunning = true;
        this.serializer = null;
        this.elements = iterable;
        this.numElements = iterable instanceof Collection ? ((Collection) iterable).size() : (int) IterableUtils.toStream(iterable).count();
        checkIterable(iterable, Object.class);
    }

    @VisibleForTesting
    @Nullable
    public TypeSerializer<T> getSerializer() {
        return this.serializer;
    }

    private void serializeElements() throws IOException {
        Preconditions.checkState(this.serializer != null, "serializer not set");
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper(byteArrayOutputStream);
        try {
            Iterator<T> it = this.elements.iterator();
            while (it.hasNext()) {
                this.serializer.serialize(it.next(), dataOutputViewStreamWrapper);
            }
            this.elementsSerialized = byteArrayOutputStream.toByteArray();
        } catch (Exception e) {
            throw new IOException("Serializing the source elements failed: " + e.getMessage(), e);
        }
    }

    @Override // org.apache.flink.streaming.api.operators.OutputTypeConfigurable
    public void setOutputType(TypeInformation<T> typeInformation, ExecutionConfig executionConfig) {
        Preconditions.checkState(this.elements != null, "The output type should've been specified before shipping the graph to the cluster");
        checkIterable(this.elements, typeInformation.getTypeClass());
        TypeSerializer<T> createSerializer = typeInformation.createSerializer(executionConfig);
        if (Objects.equals(this.serializer, createSerializer)) {
            return;
        }
        this.serializer = createSerializer;
        try {
            serializeElements();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override // org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        Preconditions.checkState(this.checkpointedState == null, "The " + getClass().getSimpleName() + " has already been initialized.");
        this.checkpointedState = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("from-elements-state", IntSerializer.INSTANCE));
        if (functionInitializationContext.isRestored()) {
            ArrayList arrayList = new ArrayList();
            Iterator<T> it = this.checkpointedState.get().iterator();
            while (it.hasNext()) {
                arrayList.add((Integer) it.next());
            }
            Preconditions.checkArgument(arrayList.size() == 1, getClass().getSimpleName() + " retrieved invalid state.");
            this.numElementsToSkip = ((Integer) arrayList.get(0)).intValue();
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:12:0x0048, code lost:
    
        r10 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:14:0x0069, code lost:
    
        throw new java.io.IOException("Failed to deserialize an element from the source. If you are using user-defined serialization (Value and Writable types), check the serialization functions.\nSerializer is " + r5.serializer, r10);
     */
    /* JADX WARN: Code restructure failed: missing block: B:18:0x006a, code lost:
    
        r5.numElementsEmitted = r5.numElementsToSkip;
     */
    /* JADX WARN: Code restructure failed: missing block: B:19:0x0072, code lost:
    
        r0 = r6.getCheckpointLock();
     */
    /* JADX WARN: Code restructure failed: missing block: B:21:0x007e, code lost:
    
        if (r5.isRunning == false) goto L46;
     */
    /* JADX WARN: Code restructure failed: missing block: B:23:0x0089, code lost:
    
        if (r5.numElementsEmitted >= r5.numElements) goto L45;
     */
    /* JADX WARN: Code restructure failed: missing block: B:25:0x008c, code lost:
    
        r0 = r5.serializer.mo7279deserialize(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:27:0x00c0, code lost:
    
        monitor-enter(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:29:0x00c1, code lost:
    
        r6.collect(r0);
        r5.numElementsEmitted++;
     */
    /* JADX WARN: Code restructure failed: missing block: B:30:0x00d5, code lost:
    
        monitor-exit(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:39:0x0099, code lost:
    
        r12 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:41:0x00ba, code lost:
    
        throw new java.io.IOException("Failed to deserialize an element from the source. If you are using user-defined serialization (Value and Writable types), check the serialization functions.\nSerializer is " + r5.serializer, r12);
     */
    /* JADX WARN: Code restructure failed: missing block: B:43:0x00e4, code lost:
    
        return;
     */
    /* JADX WARN: Code restructure failed: missing block: B:45:?, code lost:
    
        return;
     */
    /* JADX WARN: Code restructure failed: missing block: B:5:0x002e, code lost:
    
        if (r9 > 0) goto L8;
     */
    /* JADX WARN: Code restructure failed: missing block: B:7:0x0033, code lost:
    
        if (r9 <= 0) goto L43;
     */
    /* JADX WARN: Code restructure failed: missing block: B:9:0x0036, code lost:
    
        r5.serializer.mo7279deserialize(r0);
        r9 = r9 - 1;
     */
    @Override // org.apache.flink.streaming.api.functions.source.SourceFunction
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<T> r6) throws java.lang.Exception {
        /*
            Method dump skipped, instructions count: 229
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(org.apache.flink.streaming.api.functions.source.SourceFunction$SourceContext):void");
    }

    @Override // org.apache.flink.streaming.api.functions.source.SourceFunction
    public void cancel() {
        this.isRunning = false;
    }

    public int getNumElements() {
        return this.numElements;
    }

    public int getNumElementsEmitted() {
        return this.numElementsEmitted;
    }

    @Override // org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        Preconditions.checkState(this.checkpointedState != null, "The " + getClass().getSimpleName() + " has not been properly initialized.");
        this.checkpointedState.clear();
        this.checkpointedState.add(Integer.valueOf(this.numElementsEmitted));
    }

    public static <OUT> void checkCollection(Collection<OUT> collection, Class<OUT> cls) {
        checkIterable(collection, cls);
    }

    private static <OUT> void checkIterable(Iterable<OUT> iterable, Class<?> cls) {
        for (OUT out : iterable) {
            if (out == null) {
                throw new IllegalArgumentException("The collection contains a null element");
            }
            if (!cls.isAssignableFrom(out.getClass())) {
                throw new IllegalArgumentException("The elements in the collection are not all subclasses of " + cls.getCanonicalName());
            }
        }
    }
}
