package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.net.SocketOption;
import javax.annotation.Nullable;
import jdk.net.ExtendedSocketOptions;
import org.agrona.concurrent.BackoffIdleStrategy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
import org.apache.flink.shaded.netty4.io.netty.channel.epoll.Epoll;
import org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollChannelOption;
import org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioChannelOption;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/NettyClient.class */
public class NettyClient {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) NettyClient.class);

    @VisibleForTesting
    static final String NIO_TCP_KEEPIDLE_KEY = "TCP_KEEPIDLE";

    @VisibleForTesting
    static final String NIO_TCP_KEEPINTERVAL_KEY = "TCP_KEEPINTERVAL";

    @VisibleForTesting
    static final String NIO_TCP_KEEPCOUNT_KEY = "TCP_KEEPCOUNT";
    private final NettyConfig config;
    private NettyProtocol protocol;
    private Bootstrap bootstrap;

    @Nullable
    private SSLHandlerFactory clientSSLFactory;

    /* JADX INFO: Access modifiers changed from: package-private */
    public NettyClient(NettyConfig nettyConfig) {
        this.config = nettyConfig;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void init(NettyProtocol nettyProtocol, NettyBufferPool nettyBufferPool) throws IOException {
        Preconditions.checkState(this.bootstrap == null, "Netty client has already been initialized.");
        this.protocol = nettyProtocol;
        long nanoTime = System.nanoTime();
        this.bootstrap = new Bootstrap();
        switch (this.config.getTransportType()) {
            case NIO:
                initNioBootstrap();
                break;
            case EPOLL:
                initEpollBootstrap();
                break;
            case AUTO:
                if (!Epoll.isAvailable()) {
                    initNioBootstrap();
                    LOG.info("Transport type 'auto': using NIO.");
                    break;
                } else {
                    initEpollBootstrap();
                    LOG.info("Transport type 'auto': using EPOLL.");
                    break;
                }
        }
        this.bootstrap.option(ChannelOption.TCP_NODELAY, true);
        this.bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
        this.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf(this.config.getClientConnectTimeoutSeconds() * 1000));
        this.bootstrap.option(ChannelOption.ALLOCATOR, nettyBufferPool);
        int sendAndReceiveBufferSize = this.config.getSendAndReceiveBufferSize();
        if (sendAndReceiveBufferSize > 0) {
            this.bootstrap.option(ChannelOption.SO_SNDBUF, Integer.valueOf(sendAndReceiveBufferSize));
            this.bootstrap.option(ChannelOption.SO_RCVBUF, Integer.valueOf(sendAndReceiveBufferSize));
        }
        try {
            this.clientSSLFactory = this.config.createClientSSLEngineFactory();
            LOG.info("Successful initialization (took {} ms).", Long.valueOf((System.nanoTime() - nanoTime) / BackoffIdleStrategy.DEFAULT_MAX_PARK_PERIOD_NS));
        } catch (Exception e) {
            throw new IOException("Failed to initialize SSL Context for the Netty client", e);
        }
    }

    NettyConfig getConfig() {
        return this.config;
    }

    Bootstrap getBootstrap() {
        return this.bootstrap;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void shutdown() {
        long nanoTime = System.nanoTime();
        if (this.bootstrap != null) {
            if (this.bootstrap.config2().group() != null) {
                this.bootstrap.config2().group().shutdownGracefully();
            }
            this.bootstrap = null;
        }
        LOG.info("Successful shutdown (took {} ms).", Long.valueOf((System.nanoTime() - nanoTime) / BackoffIdleStrategy.DEFAULT_MAX_PARK_PERIOD_NS));
    }

    private void initNioBootstrap() {
        this.bootstrap.group(new NioEventLoopGroup(this.config.getClientNumThreads(), NettyServer.getNamedThreadFactory("Flink Netty Client (" + this.config.getServerPort() + ")"))).channel(NioSocketChannel.class);
        this.config.getTcpKeepIdleInSeconds().ifPresent(num -> {
            setNioKeepaliveOptions(NIO_TCP_KEEPIDLE_KEY, num.intValue());
        });
        this.config.getTcpKeepInternalInSeconds().ifPresent(num2 -> {
            setNioKeepaliveOptions(NIO_TCP_KEEPINTERVAL_KEY, num2.intValue());
        });
        this.config.getTcpKeepCount().ifPresent(num3 -> {
            setNioKeepaliveOptions(NIO_TCP_KEEPCOUNT_KEY, num3.intValue());
        });
    }

    private void setNioKeepaliveOptions(String str, int i) {
        try {
            this.bootstrap.option(NioChannelOption.of((SocketOption) ExtendedSocketOptions.class.getField(str).get(null)), Integer.valueOf(i));
        } catch (IllegalAccessException | NoSuchFieldException e) {
            LOG.error("Ignore keepalive option {}, this may be due to using netty transport type of nio and an older version of jdk 8, refer to https://bugs.openjdk.org/browse/JDK-8194298", str, e);
        }
    }

    private void initEpollBootstrap() {
        this.bootstrap.group(new EpollEventLoopGroup(this.config.getClientNumThreads(), NettyServer.getNamedThreadFactory("Flink Netty Client (" + this.config.getServerPort() + ")"))).channel(EpollSocketChannel.class);
        this.config.getTcpKeepIdleInSeconds().ifPresent(num -> {
            this.bootstrap.option(EpollChannelOption.TCP_KEEPIDLE, num);
        });
        this.config.getTcpKeepInternalInSeconds().ifPresent(num2 -> {
            this.bootstrap.option(EpollChannelOption.TCP_KEEPINTVL, num2);
        });
        this.config.getTcpKeepCount().ifPresent(num3 -> {
            this.bootstrap.option(EpollChannelOption.TCP_KEEPCNT, num3);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ChannelFuture connect(final InetSocketAddress inetSocketAddress) {
        Preconditions.checkState(this.bootstrap != null, "Client has not been initialized yet.");
        this.bootstrap.handler(new ChannelInitializer<SocketChannel>() { // from class: org.apache.flink.runtime.io.network.netty.NettyClient.1
            @Override // org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer
            public void initChannel(SocketChannel socketChannel) throws Exception {
                if (NettyClient.this.clientSSLFactory != null) {
                    socketChannel.pipeline().addLast("ssl", NettyClient.this.clientSSLFactory.createNettySSLHandler(socketChannel.alloc(), inetSocketAddress.getAddress().getCanonicalHostName(), inetSocketAddress.getPort()));
                }
                socketChannel.pipeline().addLast(NettyClient.this.protocol.getClientChannelHandlers());
            }
        });
        try {
            return this.bootstrap.connect(inetSocketAddress);
        } catch (ChannelException e) {
            if (((e.getCause() instanceof SocketException) && e.getCause().getMessage().equals("Too many open files")) || ((e.getCause() instanceof ChannelException) && (e.getCause().getCause() instanceof SocketException) && e.getCause().getCause().getMessage().equals("Too many open files"))) {
                throw new ChannelException("The operating system does not offer enough file handles to open the network connection. Please increase the number of available file handles.", e.getCause());
            }
            throw e;
        }
    }
}
