package org.apache.flink.queryablestate.network;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.queryablestate.network.messages.MessageBody;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/queryablestate/network/Client.class */
public class Client<REQ extends MessageBody, RESP extends MessageBody> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) Client.class);
    private final String clientName;
    private final Bootstrap bootstrap;
    private final MessageSerializer<REQ, RESP> messageSerializer;
    private final KvStateRequestStats stats;
    private final Map<InetSocketAddress, ServerConnection<REQ, RESP>> connections = new ConcurrentHashMap();
    private final AtomicReference<CompletableFuture<Void>> clientShutdownFuture = new AtomicReference<>(null);

    public Client(String str, int i, MessageSerializer<REQ, RESP> messageSerializer, KvStateRequestStats kvStateRequestStats) {
        Preconditions.checkArgument(i >= 1, "Non-positive number of event loop threads.");
        this.clientName = (String) Preconditions.checkNotNull(str);
        this.messageSerializer = (MessageSerializer) Preconditions.checkNotNull(messageSerializer);
        this.stats = (KvStateRequestStats) Preconditions.checkNotNull(kvStateRequestStats);
        this.bootstrap = new Bootstrap().group(new NioEventLoopGroup(i, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flink " + str + " Event Loop Thread %d").build())).channel(NioSocketChannel.class).option(ChannelOption.ALLOCATOR, new NettyBufferPool(i)).handler(new ChannelInitializer<SocketChannel>() { // from class: org.apache.flink.queryablestate.network.Client.1
            /* JADX INFO: Access modifiers changed from: protected */
            @Override // org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer
            public void initChannel(SocketChannel socketChannel) {
                socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)).addLast(new ChunkedWriteHandler());
            }
        });
    }

    public String getClientName() {
        return this.clientName;
    }

    public CompletableFuture<RESP> sendRequest(InetSocketAddress inetSocketAddress, REQ req) {
        return this.clientShutdownFuture.get() != null ? FutureUtils.completedExceptionally(new IllegalStateException(this.clientName + " is already shut down.")) : this.connections.computeIfAbsent(inetSocketAddress, inetSocketAddress2 -> {
            ServerConnection createPendingConnection = ServerConnection.createPendingConnection(this.clientName, this.messageSerializer, this.stats);
            ChannelFuture connect = this.bootstrap.connect(inetSocketAddress.getAddress(), inetSocketAddress.getPort());
            createPendingConnection.getClass();
            connect.addListener2((GenericFutureListener<? extends Future<? super Void>>) createPendingConnection::establishConnection);
            createPendingConnection.getCloseFuture().handle((r7, th) -> {
                return Boolean.valueOf(this.connections.remove(inetSocketAddress, createPendingConnection));
            });
            return createPendingConnection;
        }).sendRequest(req);
    }

    public CompletableFuture<Void> shutdown() {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        if (!this.clientShutdownFuture.compareAndSet(null, completableFuture)) {
            return this.clientShutdownFuture.get();
        }
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<InetSocketAddress, ServerConnection<REQ, RESP>> entry : this.connections.entrySet()) {
            if (this.connections.remove(entry.getKey(), entry.getValue())) {
                arrayList.add(entry.getValue().close());
            }
        }
        CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[arrayList.size()])).whenComplete((r9, th) -> {
            if (th != null) {
                LOG.warn("Problem while shutting down the connections at the {}: {}", this.clientName, th);
            }
            if (this.bootstrap == null) {
                completableFuture.complete(null);
                return;
            }
            EventLoopGroup group = this.bootstrap.config2().group();
            if (group == null || group.isShutdown()) {
                completableFuture.complete(null);
            } else {
                group.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS).addListener2(future -> {
                    if (future.isSuccess()) {
                        completableFuture.complete(null);
                    } else {
                        completableFuture.completeExceptionally(future.cause());
                    }
                });
            }
        });
        return completableFuture;
    }

    @VisibleForTesting
    public boolean isEventGroupShutdown() {
        return this.bootstrap == null || this.bootstrap.config2().group().isTerminated();
    }
}
