/*
 * Decompiled with CFR 0.152.
 */
package org.openehealth.ipf.commons.audit.protocol;

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.ssl.SslContext;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.openehealth.ipf.commons.audit.AuditException;
import org.openehealth.ipf.commons.audit.NettyUtils;
import org.openehealth.ipf.commons.audit.TlsParameters;
import org.openehealth.ipf.commons.audit.protocol.AuditTransmissionChannel;
import org.openehealth.ipf.commons.audit.protocol.NioTLSSyslogSenderImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.netty.Connection;
import reactor.netty.internal.util.Metrics;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpClient;

public class ReactorNettyTLSSyslogSenderImpl
extends NioTLSSyslogSenderImpl<Connection, ReactorNettyDestination> {
    private static final Logger log = LoggerFactory.getLogger(ReactorNettyTLSSyslogSenderImpl.class);
    private int workerThreads = 1;
    private long connectTimeoutMillis = 5000L;
    private long sendTimeoutMillis = 5000L;

    public ReactorNettyTLSSyslogSenderImpl(TlsParameters tlsParameters) {
        super(tlsParameters);
    }

    @Override
    public String getTransportName() {
        return AuditTransmissionChannel.REACTOR_NETTY_TLS.getProtocolName();
    }

    @Override
    protected ReactorNettyDestination makeDestination(TlsParameters tlsParameters, String host, int port, boolean logging) {
        return new ReactorNettyDestination(tlsParameters, host, port, this.workerThreads, this.connectTimeoutMillis, this.sendTimeoutMillis);
    }

    public void setConnectTimeout(int value, TimeUnit timeUnit) {
        this.connectTimeoutMillis = timeUnit.toMillis(value);
    }

    public void setSendTimeout(int value, TimeUnit timeUnit) {
        this.sendTimeoutMillis = timeUnit.toMillis(value);
    }

    public void setWorkerThreads(int workerThreads) {
        this.workerThreads = workerThreads;
    }

    public static final class ReactorNettyDestination
    implements NioTLSSyslogSenderImpl.Destination<Connection> {
        private final long sendTimeout;
        private final TcpClient tcpClient;
        private Connection connection;
        private final String host;
        private final int port;

        ReactorNettyDestination(TlsParameters tlsParameters, String host, int port, int workerThreads, long connectTimeout, long sendTimeout) {
            this.sendTimeout = sendTimeout;
            this.host = host;
            this.port = port;
            LoopResources loop = LoopResources.create((String)"event-loop", (int)1, (int)workerThreads, (boolean)true);
            SslContext sslContext = NettyUtils.initSslContext(tlsParameters, false);
            this.tcpClient = TcpClient.create().host(host).port(port).runOn(loop).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)((int)connectTimeout)).option(ChannelOption.SO_KEEPALIVE, (Object)true).wiretap(this.getClass().getName(), LogLevel.TRACE).metrics(Metrics.isMicrometerAvailable()).secure(spec -> spec.sslContext(sslContext)).doOnConnect(config -> log.info("TLS Syslog Client is about to be started")).doOnConnected(connection -> log.info("TLS Syslog Client connected to {}", (Object)connection.address())).doOnDisconnected(connection -> log.info("TLS Syslog Client disconnected from {}", (Object)connection.address()));
        }

        @Override
        public void shutdown() {
            if (this.connection != null) {
                this.connection.disposeNow(Duration.ofSeconds(10L));
            }
        }

        @Override
        public Connection getHandle() {
            if (this.connection == null || !this.connection.channel().isActive()) {
                try {
                    this.connection = this.tcpClient.connectNow(Duration.ofSeconds(10L));
                }
                catch (Exception e) {
                    throw new AuditException("Interrupted while establishing TLS connection to " + this.host + ":" + this.port, e);
                }
            }
            return this.connection;
        }

        @Override
        public void write(byte[] bytes) {
            Channel channel = this.getHandle().channel();
            log.trace("Writing {} bytes using session: {}", (Object)bytes.length, (Object)channel);
            try {
                if (!channel.writeAndFlush((Object)Unpooled.wrappedBuffer((byte[])bytes)).await(this.sendTimeout)) {
                    throw new AuditException("Could not send audit message to " + this.host + ":" + this.port);
                }
            }
            catch (InterruptedException e) {
                throw new AuditException("Interrupted during sending audit message to " + this.host + ":" + this.port, e);
            }
        }
    }
}

