package com.newcapec.mobile.ncp.im;

import com.walker.cheetah.client.transport.LongConnector;
import com.walker.cheetah.core.DataProtocolsException;
import com.walker.cheetah.core.Protocols;
import com.walker.cheetah.core.ProtocolsFactory;
import com.walker.cheetah.core.RemoteException;
import com.walkersoft.mobile.core.util.LogUtils;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/* loaded from: classes.dex */
public class AbstractTcpLongConnector extends AbstractConnector implements LongConnector {
    static final /* synthetic */ boolean $assertionsDisabled = false;
    public static final int DEFAULT_MAP_CAPACITY = 8;
    public static final int MAX_ONCE_READ = 8192;
    private static final int MAX_SAVED_PUSH_SIZE = 128;
    private static final int THREAD_WAIT_TIME = 1;
    protected Protocols protocols = ProtocolsFactory.getInstance();
    private BlockingQueue<ByteBuffer> pushedFromServerData = new ArrayBlockingQueue(128);
    private BlockingQueue<ByteBuffer> sendToServerData = new ArrayBlockingQueue(128);
    private volatile boolean connectThreadStoped = false;
    private ListeningServerThread connectThread = null;
    private final Map<Integer, ByteDataContainer> readReference = new HashMap(8);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes.dex */
    public static final class ByteDataContainer {
        private int currentSize = 0;
        private ByteBuffer resultBytes;
        private int totalBytes;

        public ByteDataContainer(int i) {
            this.totalBytes = 0;
            this.resultBytes = null;
            this.totalBytes = i;
            this.resultBytes = ByteBuffer.allocate(i);
        }

        public void addBytes(ByteBuffer byteBuffer, int i) {
            byteBuffer.flip();
            this.resultBytes.put(byteBuffer);
            byteBuffer.flip();
            this.currentSize += i;
        }

        public int getLeftCount() {
            return this.totalBytes - this.currentSize;
        }

        public ByteBuffer getResultBuffer() {
            this.resultBytes.flip();
            return this.resultBytes;
        }

        public String toString() {
            StringBuilder sb = new StringBuilder(128);
            sb.append("[currentSize=");
            sb.append(this.currentSize);
            sb.append(", totalBytes=");
            sb.append(this.totalBytes);
            sb.append(", resultBytes=");
            sb.append(this.resultBytes);
            sb.append("]");
            return sb.toString();
        }
    }

    /* loaded from: classes.dex */
    private class ListeningServerThread extends Thread {
        private static final String T_NAME = "ListeningServerThread";
        private volatile boolean isSending = false;
        private Thread parentThread;

        public ListeningServerThread() {
            setName(T_NAME);
        }

        private void processInterrupted() throws IOException {
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            AbstractTcpLongConnector abstractTcpLongConnector;
            LogUtils.g("======= connect thread is runing!");
            while (!AbstractTcpLongConnector.this.connectThreadStoped) {
                try {
                    try {
                        try {
                            try {
                                LogUtils.g("count = " + AbstractTcpLongConnector.this.selector.select());
                                Iterator<SelectionKey> it = AbstractTcpLongConnector.this.selector.selectedKeys().iterator();
                                if (AbstractTcpLongConnector.this.sendToServerData.peek() != null) {
                                    LogUtils.g("++发现发送数据，进入发送模式");
                                    AbstractTcpLongConnector abstractTcpLongConnector2 = AbstractTcpLongConnector.this;
                                    abstractTcpLongConnector2.client.register(abstractTcpLongConnector2.selector, 4);
                                    this.isSending = true;
                                } else {
                                    this.isSending = false;
                                }
                                while (it.hasNext()) {
                                    SelectionKey next = it.next();
                                    it.remove();
                                    if (next.isConnectable()) {
                                        if (AbstractTcpLongConnector.this.client.isConnectionPending()) {
                                            AbstractTcpLongConnector.this.client.finishConnect();
                                        }
                                        LogUtils.g("is connected...");
                                        if (this.isSending) {
                                            AbstractTcpLongConnector abstractTcpLongConnector3 = AbstractTcpLongConnector.this;
                                            abstractTcpLongConnector3.client.register(abstractTcpLongConnector3.selector, 4);
                                        } else {
                                            AbstractTcpLongConnector abstractTcpLongConnector4 = AbstractTcpLongConnector.this;
                                            abstractTcpLongConnector4.client.register(abstractTcpLongConnector4.selector, 1);
                                        }
                                    } else if (next.isWritable()) {
                                        try {
                                            AbstractTcpLongConnector abstractTcpLongConnector5 = AbstractTcpLongConnector.this;
                                            abstractTcpLongConnector5.write0(abstractTcpLongConnector5.sendToServerData.take());
                                            AbstractTcpLongConnector abstractTcpLongConnector6 = AbstractTcpLongConnector.this;
                                            abstractTcpLongConnector6.client.register(abstractTcpLongConnector6.selector, 1);
                                            LogUtils.g("已发送数据");
                                        } catch (InterruptedException unused) {
                                            LogUtils.g("从队列中获得发送数据时阻塞，当前已被中断");
                                            processInterrupted();
                                        }
                                    } else if (next.isReadable()) {
                                        ByteBuffer byteBuffer = null;
                                        try {
                                            byteBuffer = AbstractTcpLongConnector.this.read0();
                                            if (byteBuffer != null) {
                                                LogUtils.g("接收到数据: " + byteBuffer);
                                                try {
                                                    AbstractTcpLongConnector.this.pushedFromServerData.put(byteBuffer);
                                                } catch (InterruptedException unused2) {
                                                    LogUtils.g("接收服务端消息时，线程中断，接收到的数据可以在此处理，如：保存数据库等");
                                                }
                                            }
                                        } catch (DataProtocolsException e2) {
                                            e2.printStackTrace();
                                            LogUtils.g("接收到的数据不符合协议: " + e2.getMessage());
                                            LogUtils.g("错误的数据: " + byteBuffer);
                                            AbstractTcpLongConnector.this.connectThreadStoped = true;
                                        }
                                    } else {
                                        continue;
                                    }
                                }
                            } catch (ClosedSelectorException e3) {
                                LogUtils.g(".........selector is closed: " + e3.getMessage());
                                if (!AbstractTcpLongConnector.this.connectThreadStoped) {
                                    abstractTcpLongConnector = AbstractTcpLongConnector.this;
                                }
                            }
                        } catch (IOException e4) {
                            LogUtils.g("connect thread stoped: " + e4.getMessage());
                            if (!AbstractTcpLongConnector.this.connectThreadStoped) {
                                abstractTcpLongConnector = AbstractTcpLongConnector.this;
                            }
                        }
                    } catch (Throwable th) {
                        try {
                            if (!AbstractTcpLongConnector.this.connectThreadStoped) {
                                AbstractTcpLongConnector.this.close();
                            }
                        } catch (RemoteException unused3) {
                        }
                        LogUtils.g("客户端监听线程结束...");
                        terminate();
                        throw th;
                    }
                } catch (RemoteException unused4) {
                }
            }
            LogUtils.g(getName() + ": terminated because of connectThreadStoped = " + AbstractTcpLongConnector.this.connectThreadStoped);
            if (!AbstractTcpLongConnector.this.connectThreadStoped) {
                abstractTcpLongConnector = AbstractTcpLongConnector.this;
                abstractTcpLongConnector.close();
            }
            LogUtils.g("客户端监听线程结束...");
            terminate();
        }

        public void setParentThread(Thread thread) {
            this.parentThread = thread;
        }

        public void setStop() {
            AbstractTcpLongConnector.this.connectThreadStoped = true;
        }

        public void terminate() {
            AbstractTcpLongConnector.this.connectThreadStoped = true;
            Thread thread = this.parentThread;
            if (thread != null) {
                thread.interrupt();
            }
        }
    }

    private void checkConnection0() {
        if (this.client == null) {
            throw new IllegalStateException("客户端还未建立连接,必须先调用start()方法。");
        }
    }

    private ByteBuffer doGetFixsizeBuffer(ByteBuffer byteBuffer) {
        ByteBuffer allocate = ByteBuffer.allocate(byteBuffer.remaining());
        allocate.put(byteBuffer);
        return allocate;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ByteBuffer read0() throws IOException {
        int hashCode = this.client.hashCode();
        ByteDataContainer byteDataContainer = this.readReference.get(Integer.valueOf(hashCode));
        ByteBuffer allocate = ByteBuffer.allocate(8192);
        if (byteDataContainer != null) {
            int leftCount = byteDataContainer.getLeftCount();
            if (leftCount <= 0) {
                LogUtils.g("Processor.read()-没有剩余内容可以读取了");
                this.readReference.remove(Integer.valueOf(hashCode));
                return byteDataContainer.getResultBuffer();
            }
            int read = this.client.read(allocate);
            if (read < leftCount) {
                byteDataContainer.addBytes(allocate, read);
                LogUtils.g("++++++++++ 接着读数据，length: " + read);
                return null;
            }
            if (read != leftCount) {
                return null;
            }
            byteDataContainer.addBytes(allocate, read);
            LogUtils.g("++++++++++ 剩余内容读完，length: " + read);
            this.readReference.remove(Integer.valueOf(hashCode));
            return byteDataContainer.getResultBuffer();
        }
        ByteBuffer allocate2 = ByteBuffer.allocate(4);
        int read2 = this.client.read(allocate2);
        if (read2 > 0) {
            allocate2.flip();
            if (read2 != 4) {
                throw new IllegalArgumentException("The four bytes ahead must be type of Integer(size of content)!");
            }
            int i = allocate2.getInt();
            if (i > 8192) {
                LogUtils.g("++++++++++ 第一次读取，超过最大限制，total: " + i);
                readFirstOverMax(this.client, hashCode, i, allocate);
                return null;
            }
            ByteBuffer allocate3 = ByteBuffer.allocate(i);
            if (!readOnce(this.client, allocate3, i)) {
                return null;
            }
            LogUtils.g("++++++++++ 第一次读取，一次读完，total: " + allocate3.limit());
            this.readReference.remove(Integer.valueOf(hashCode));
            return allocate3;
        }
        LogUtils.g("xxxxxxxxxxx 读入数据格式错误，返回");
        int read3 = this.client.read(allocate);
        if (read3 <= 0) {
            LogUtils.g("xxxxxxxxxxx 尝试在读，但仍未读到消息内容");
            LogUtils.g("xxxxxxxxxxx 因为读取到空数据，可能存在链接异常，系统尝试关闭该连接 ============");
            close();
            return null;
        }
        LogUtils.g("xxxxxxxxxxx 尝试在读，消息字节数：" + read3);
        allocate.flip();
        String str = new String(allocate.array());
        if (read3 <= 8192) {
            LogUtils.g("xxxxxxxxxxx 尝试在读，一次读完：" + str);
        } else {
            LogUtils.g("xxxxxxxxxxx 尝试在读，内容过多：" + str);
        }
        return null;
    }

    private boolean readOnce(SocketChannel socketChannel, ByteBuffer byteBuffer, int i) throws IOException {
        int read = socketChannel.read(byteBuffer);
        LogUtils.g("============= total: " + i);
        if (read < i) {
            LogUtils.g(">>>>>>>>>>> 读入的数据，比宣称的要少: " + read);
            ByteDataContainer byteDataContainer = new ByteDataContainer(i);
            byteDataContainer.addBytes(byteBuffer, read);
            this.readReference.put(Integer.valueOf(socketChannel.hashCode()), byteDataContainer);
            return false;
        }
        if (read > i) {
            LogUtils.g(">>>>>>>>>>> 读入的数据，比宣称的要多: " + read + ",剩余的是另一个数据包");
            throw new UnsupportedOperationException();
        }
        if (read != i) {
            return false;
        }
        LogUtils.g(">>>>>>>>>>> 读入的数据，和宣称的一样: " + read);
        byteBuffer.flip();
        return true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void write0(Object obj) throws IOException {
        if (obj instanceof byte[]) {
            byte[] encode = this.protocols.encode((byte[]) obj);
            LogUtils.g("发送的是字节，共: " + encode.length);
            this.client.write(ByteBuffer.wrap(encode));
            return;
        }
        if (!(obj instanceof ByteBuffer)) {
            throw new UnsupportedOperationException();
        }
        ByteBuffer encode2 = this.protocols.encode((ByteBuffer) obj);
        LogUtils.g("发送的是字节缓冲，共: " + encode2.remaining() + ", " + encode2);
        this.client.write(encode2);
    }

    @Override // com.walker.cheetah.client.transport.LongConnector
    public void close() throws RemoteException {
        try {
            LogUtils.g("客户端长连接被调用close()");
            stop();
        } catch (Exception e2) {
            throw new RemoteException("close connector failed!", e2);
        }
    }

    @Override // com.walker.cheetah.client.transport.LongConnector
    public SocketChannel connect() throws RemoteException {
        checkConnection0();
        if (this.connectThreadStoped) {
            LogUtils.g("长连接客户端线程重新连接，设置connectThreadStoped = false;");
            this.connectThreadStoped = false;
        }
        this.connectThread.setDaemon(true);
        this.connectThread.start();
        try {
            LogUtils.g("connectThread正在启动...");
            TimeUnit.SECONDS.sleep(1L);
        } catch (InterruptedException e2) {
            e2.printStackTrace();
        }
        return this.client;
    }

    @Override // com.walker.cheetah.client.transport.LongConnector
    public boolean isConnected() {
        SocketChannel socketChannel;
        return (this.connectThreadStoped || (socketChannel = this.client) == null || !socketChannel.isConnected()) ? false : true;
    }

    void readFirstOverMax(SocketChannel socketChannel, int i, int i2, ByteBuffer byteBuffer) throws IOException {
        int read = socketChannel.read(byteBuffer);
        ByteDataContainer byteDataContainer = new ByteDataContainer(i2);
        byteDataContainer.addBytes(byteBuffer, read);
        this.readReference.put(Integer.valueOf(i), byteDataContainer);
        LogUtils.g("...... 本次读入一次完成，等待下次，this size = " + read);
    }

    @Override // com.walker.cheetah.client.transport.LongConnector
    public byte[] receivePushed() throws RemoteException, InterruptedException {
        if (this.connectThreadStoped) {
            throw new RemoteException();
        }
        try {
            return this.pushedFromServerData.take().array();
        } catch (InterruptedException e2) {
            LogUtils.g("调用线程被中断，停止从'推送队列'获取数据");
            throw e2;
        }
    }

    @Override // com.walker.cheetah.client.transport.Connector
    public Object send(Object obj) throws RemoteException {
        checkConnection0();
        if (this.connectThreadStoped) {
            throw new RemoteException();
        }
        this.sendToServerData.add(obj instanceof byte[] ? ByteBuffer.wrap((byte[]) obj) : obj instanceof ByteBuffer ? (ByteBuffer) obj : null);
        this.selector.wakeup();
        return null;
    }

    @Override // com.newcapec.mobile.ncp.im.AbstractConnector
    protected void startConnector() throws Exception {
        Objects.requireNonNull(this.ip, "remote adress is required!");
        SocketChannel open = SocketChannel.open();
        this.client = open;
        open.configureBlocking(false);
        this.client.socket().setSoTimeout(0);
        Selector open2 = Selector.open();
        this.selector = open2;
        this.client.register(open2, 8);
        if (this.client.connect(this.ip)) {
            LogUtils.g("客户端连接成功!");
        } else {
            LogUtils.g("客户端正在连接...");
        }
        if (this.connectThread == null) {
            this.connectThread = new ListeningServerThread();
            LogUtils.g("创建了一个客户端线程:connectThread = " + this.connectThread);
        }
        this.connectThread.setParentThread(Thread.currentThread());
    }

    @Override // com.newcapec.mobile.ncp.im.AbstractConnector
    protected void stopConnector() throws Exception {
        ListeningServerThread listeningServerThread = this.connectThread;
        if (listeningServerThread != null) {
            listeningServerThread.setStop();
            LogUtils.g("设置connectThread.setStop()...");
            this.connectThread.interrupt();
        }
        this.connectThread = null;
        SocketChannel socketChannel = this.client;
        if (socketChannel != null) {
            closeChannel(socketChannel, this.selector);
            this.client = null;
            this.selector = null;
            LogUtils.g("客户端连接已被关闭：stopConnector");
        } else {
            LogUtils.g("客户端已经关闭，调用无效: stopConnector");
        }
        this.pushedFromServerData.clear();
        this.sendToServerData.clear();
    }
}
