package com.hazelcast.jet.impl.connector;

import com.hazelcast.internal.util.JVMUtil;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.security.impl.function.SecuredFunctions;
import com.hazelcast.security.permission.ActionConstants;
import com.hazelcast.security.permission.ConnectorPermission;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import javax.annotation.Nonnull;

/* loaded from: input_file:lib/hazelcast-5.3.7.jar:com/hazelcast/jet/impl/connector/StreamSocketP.class */
/**
 * Processor that opens a non-blocking TCP connection to {@code host:port},
 * decodes the received bytes with the given charset and emits every line of
 * text as a {@code String} item.
 * <p>
 * Any of {@code \n}, {@code \r} and {@code \r\n} terminates a line; the
 * terminator itself is not part of the emitted item. The processor completes
 * once the remote side has closed the connection and the last parked line was
 * accepted by the outbox. Text received after the last line terminator is
 * <em>not</em> emitted when the socket closes.
 */
public final class StreamSocketP extends AbstractProcessor {
    private static final int BUFFER_SIZE = 4096;
    private static final int MAX_BYTES_PER_CHAR = 4;

    private final String host;
    private final int port;
    private final CharsetDecoder charsetDecoder;

    /** Characters of the line currently being assembled (no terminator seen yet). */
    private final StringBuilder lineBuilder = new StringBuilder();
    private final ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
    private final CharBuffer charBuffer = CharBuffer.allocate(BUFFER_SIZE);

    /** A complete line that the outbox refused; retried on the next {@link #complete()} call. */
    private String pendingLine;
    private SocketChannel socketChannel;
    /** Set once a read from the channel reported end-of-stream. */
    private boolean socketDone;
    /** {@code true} right after a {@code \r}, so that a directly following {@code \n} is swallowed. */
    private boolean maybeLfExpected;

    /**
     * @param host    host name or address to connect to
     * @param port    TCP port to connect to
     * @param charset charset used to decode the received bytes
     */
    public StreamSocketP(String host, int port, Charset charset) {
        this.host = host;
        this.port = port;
        this.charsetDecoder = charset.newDecoder();
    }

    /**
     * Opens the socket channel in non-blocking mode and waits until the
     * connection is established.
     *
     * @throws Exception if the channel cannot be opened or connected
     */
    @Override // com.hazelcast.jet.core.AbstractProcessor
    public void init(@Nonnull Processor.Context context) throws Exception {
        getLogger().info("Connecting to socket " + hostAndPort());
        socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        socketChannel.connect(new InetSocketAddress(host, port));
        // Non-blocking connect: poll for completion, parking 1 ms between attempts.
        while (!socketChannel.finishConnect()) {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1L));
        }
        getLogger().info("Connected to socket " + hostAndPort());
        // Both buffers start with nothing remaining, so the first fillCharBuffer()
        // call does not treat the freshly allocated char buffer as undelivered data.
        JVMUtil.upcast(byteBuffer).limit(0);
        JVMUtil.upcast(charBuffer).limit(0);
    }

    /**
     * Reads whatever is available from the socket and emits the complete lines.
     *
     * @return {@code true} once the socket reached end-of-stream and no line is
     *         waiting to be emitted
     */
    @Override // com.hazelcast.jet.core.Processor
    public boolean complete() {
        try {
            return tryComplete();
        } catch (Exception e) {
            throw ExceptionUtil.sneakyThrow(e);
        }
    }

    private boolean tryComplete() throws IOException {
        fillCharBuffer();
        emitFromCharBuffer();
        return socketDone && pendingLine == null;
    }

    /**
     * Refills {@link #charBuffer} from the socket, but only after all previously
     * decoded characters were consumed and while the socket is still open.
     *
     * @throws IOException if reading fails or the received bytes cannot be
     *                     decoded with the configured charset
     */
    private void fillCharBuffer() throws IOException {
        if (socketDone || charBuffer.hasRemaining()) {
            return;
        }
        socketDone = socketChannel.read(byteBuffer) < 0;
        JVMUtil.upcast(byteBuffer).flip();
        JVMUtil.upcast(charBuffer).clear();
        CoderResult result = charsetDecoder.decode(byteBuffer, charBuffer, socketDone);
        if (result.isError()) {
            // The decoder stops at malformed/unmappable input without consuming it.
            // Ignoring that would re-decode the same bytes on every call and never
            // make progress, so fail with the matching CharacterCodingException.
            result.throwException();
        }
        JVMUtil.upcast(charBuffer).flip();
        // Keep the bytes of an incomplete trailing character for the next read.
        byteBuffer.compact();
        assert byteBuffer.position() < MAX_BYTES_PER_CHAR - 1 : "position=" + byteBuffer.position();
    }

    /**
     * Emits the parked line, if any, followed by every complete line available
     * in {@link #charBuffer}. Stops as soon as the outbox refuses an item; the
     * refused line stays in {@link #pendingLine}.
     */
    private void emitFromCharBuffer() {
        while (true) {
            if (pendingLine == null) {
                if (!charBuffer.hasRemaining()) {
                    return;
                }
                pendingLine = tryReadLineFromBuffer();
                if (pendingLine == null) {
                    // Buffer ran out in the middle of a line.
                    return;
                }
            }
            // The parked line is retried even when the char buffer is empty:
            // otherwise a line refused right at the end of the buffer would be
            // stuck forever once the socket has closed.
            if (!tryEmit(pendingLine)) {
                return;
            }
            pendingLine = null;
        }
    }

    /**
     * Consumes characters from {@link #charBuffer} up to and including the next
     * line terminator.
     *
     * @return the completed line without its terminator, or {@code null} if the
     *         buffer was exhausted before a terminator was found (the partial
     *         line is kept in {@link #lineBuilder})
     */
    private String tryReadLineFromBuffer() {
        while (charBuffer.hasRemaining()) {
            char c = charBuffer.get();
            if (c != '\r' && c != '\n') {
                lineBuilder.append(c);
                maybeLfExpected = false;
                continue;
            }
            if (maybeLfExpected && c == '\n') {
                // Second half of a CRLF pair; the line was already returned at the CR.
                maybeLfExpected = false;
                continue;
            }
            maybeLfExpected = c == '\r';
            String line = lineBuilder.toString();
            lineBuilder.setLength(0);
            return line;
        }
        return null;
    }

    /**
     * Closes the socket channel if it was opened.
     *
     * @throws IOException if closing the channel fails
     */
    @Override // com.hazelcast.jet.core.Processor
    public void close() throws IOException {
        if (socketChannel != null) {
            getLogger().info("Closing socket " + hostAndPort());
            socketChannel.close();
        }
    }

    private String hostAndPort() {
        return host + ':' + port;
    }

    /**
     * Creates the meta-supplier for this processor, guarded by a socket read
     * permission for the given host and port.
     *
     * @param host    host name or address to connect to
     * @param port    TCP port to connect to
     * @param charset name of the charset used to decode the received bytes
     */
    public static ProcessorMetaSupplier supplier(String host, int port, @Nonnull String charset) {
        return ProcessorMetaSupplier.preferLocalParallelismOne(
                ConnectorPermission.socket(host, port, ActionConstants.ACTION_READ),
                SecuredFunctions.streamSocketProcessorFn(host, port, charset));
    }
}
