package com.huawei.ecs.mtk.tcp;

import com.huawei.ecs.mtk.log.LogRecord;
import com.huawei.ecs.mtk.log.Logger;
import com.huawei.ecs.mtk.mt.SafeLong;
import com.huawei.ecs.mtk.mt.SafeQueue;
import com.huawei.ecs.mtk.util.CryptInterface;
import com.huawei.ecs.mtk.util.SimpleThread;
import com.huawei.ecs.mtk.util.Util;
import java.net.SocketAddress;

/* loaded from: classes.dex */
/**
 * One TCP connection, serviced by three cooperating threads.
 *
 * <ul>
 *   <li>This object itself (a {@link SimpleThread} named {@code "tcp.recv" + socket}) reads
 *       packets from the socket and places them on {@code recvQ_}.</li>
 *   <li>{@link TcpReceiverThread} ({@code "tcp.call" + socket}) drains {@code recvQ_} and hands
 *       non-empty packets to the {@link TcpCallback}; it also detects a missing heartbeat.</li>
 *   <li>{@link TcpSenderThread} ({@code "tcp.send" + socket}) drains {@code sendQ_}, writes to
 *       the socket, and emits a zero-length packet as a heartbeat when nothing was sent for
 *       roughly {@code TCP_HEARTBEAT_INTERVAL_MILLISECONDS}.</li>
 * </ul>
 *
 * <p>{@link #open(TcpCallback, TcpEnv)} must be called before the thread is started: it is the
 * only place where the callback, environment, queues and worker threads are assigned, and
 * {@link #onBirth()} dereferences them unconditionally.
 *
 * <p>The lifecycle hooks {@code onBirth}/{@code onLoop}/{@code onStop}/{@code onDeath} are
 * overrides of {@link SimpleThread}; presumably that base class invokes them, its source is not
 * part of this file.
 */
public class TcpChannel extends SimpleThread implements TcpPeer {
    /** Zero-length packet; written to the socket as a heartbeat and by {@link #open}. */
    private static final byte[] EMPTY_BYTES = new byte[0];

    /** Log tag used for every log record emitted by this class. */
    public static final String TAG = "TCP";

    /**
     * The sender emits a heartbeat this many milliseconds before the full heartbeat interval has
     * elapsed since the last write (presumably to absorb wake-up jitter of the queue wait).
     */
    private static final long HEARTBEAT_EARLY_MILLIS = 10;

    private TcpCallback cb_;
    private Object channelId_;
    private CryptInterface crypt_;
    private TcpEnv env_;
    private final SafeLong lastPauseRecvTime_;
    private final SafeLong lastPauseSendTime_;
    private final SafeLong lastPrintRecvTime_;
    private final SafeLong lastPrintSendTime_;
    private final SocketAddress localAddress_;
    private TcpReceiverThread receiver_;
    private SafeQueue<byte[]> recvQ_;
    private final SocketAddress remoteAddress_;
    private SafeQueue<byte[]> sendQ_;
    private TcpSenderThread sender_;
    private final TcpSocket sock_;

    /**
     * Delivery thread: takes packets off {@code recvQ_} and passes them to the callback.
     * A wait on the queue that yields nothing while the thread is still running is treated as a
     * heartbeat timeout.
     */
    class TcpReceiverThread extends SimpleThread {
        public TcpReceiverThread() {
            super("tcp.call" + TcpChannel.this.sock_);
        }

        /**
         * Processes at most one queued packet.
         *
         * @return always {@code true}
         */
        @Override // com.huawei.ecs.mtk.util.SimpleThread
        protected boolean onLoop() {
            byte[] packet = (byte[]) TcpChannel.this.recvQ_.get(TcpChannel.this.env_.TCP_HEARTBEAT_TIMEOUT_MILLISECONDS);
            if (packet != null) {
                // Zero-length packets are not counted and not delivered; they only end the wait.
                if (packet.length > 0) {
                    TcpChannel.this.env_.statistics.info.recvWaiting.update(-1L, -packet.length);
                    TcpChannel.this.onRecv(packet);
                }
                return true;
            }
            if (!running()) {
                // Nothing dequeued because the thread is stopping (see onStop), not a timeout.
                return true;
            }
            Logger.beginInfo(TcpChannel.TAG).p("connection heartbeat timed out").end();
            // The callback decides whether the timeout should tear the connection down.
            if (TcpChannel.this.onUnavailable()) {
                TcpChannel.this.closeSocket();
            }
            return true;
        }

        /** Wakes up a pending wait on {@code recvQ_} so the thread can notice the stop. */
        @Override // com.huawei.ecs.mtk.util.SimpleThread
        protected void onStop() {
            TcpChannel.this.recvQ_.wakeup();
        }
    }

    /**
     * Writer thread: takes packets off {@code sendQ_} and writes them to the socket, inserting
     * zero-length heartbeat packets when the connection has been idle.
     */
    class TcpSenderThread extends SimpleThread {
        /** Wall-clock time (ms) sampled in the loop iteration of the last successful write. */
        private long lastSendTime_;

        public TcpSenderThread() {
            super("tcp.send" + TcpChannel.this.sock_);
            this.lastSendTime_ = System.currentTimeMillis();
        }

        /**
         * Writes at most one packet (queued data or a heartbeat) to the socket.
         *
         * @return {@code false} when this thread is no longer running after the queue wait,
         *         {@code true} otherwise
         */
        @Override // com.huawei.ecs.mtk.util.SimpleThread
        protected boolean onLoop() {
            byte[] packet = (byte[]) TcpChannel.this.sendQ_.get(TcpChannel.this.env_.TCP_HEARTBEAT_INTERVAL_MILLISECONDS);
            if (!running()) {
                return false;
            }
            long now = System.currentTimeMillis();
            if (packet != null) {
                // Producers blocked in TcpChannel.send() wait on sendQ_; signal them that a
                // slot has been freed.
                Util.sync_notify_all(TcpChannel.this.sendQ_);
            } else if (now >= (this.lastSendTime_ + TcpChannel.this.env_.TCP_HEARTBEAT_INTERVAL_MILLISECONDS) - TcpChannel.HEARTBEAT_EARLY_MILLIS) {
                // Idle long enough: send an empty packet as heartbeat.
                packet = TcpChannel.EMPTY_BYTES;
            }
            if (packet == null) {
                return true;
            }
            if (packet.length > 0) {
                TcpChannel.this.env_.statistics.info.sendWaiting.update(-1L, -packet.length);
                TcpChannel.this.onSend(packet);
            }
            if (!TcpChannel.this.writeSocket(packet)) {
                TcpChannel.this.closeSocket();
                return true;
            }
            this.lastSendTime_ = now;
            if (packet.length > 0) {
                TcpChannel.this.env_.statistics.info.sent.update(1L, packet.length);
            }
            return true;
        }

        /** Wakes up a pending wait on {@code sendQ_} so the thread can notice the stop. */
        @Override // com.huawei.ecs.mtk.util.SimpleThread
        protected void onStop() {
            TcpChannel.this.sendQ_.wakeup();
        }
    }

    /**
     * Creates a channel around an existing socket and captures its local and remote addresses.
     * The channel is not usable until {@link #open(TcpCallback, TcpEnv)} has been called.
     *
     * @param tcpSocket the socket this channel reads from and writes to
     */
    public TcpChannel(TcpSocket tcpSocket) {
        super("tcp.recv" + tcpSocket);
        this.lastPrintSendTime_ = new SafeLong();
        this.lastPrintRecvTime_ = new SafeLong();
        this.lastPauseSendTime_ = new SafeLong();
        this.lastPauseRecvTime_ = new SafeLong();
        this.sock_ = tcpSocket;
        this.localAddress_ = tcpSocket.getLocalSocketAddress();
        this.remoteAddress_ = tcpSocket.getRemoteSocketAddress();
    }

    /**
     * Requests this thread to stop and closes the socket.
     * (Originally private; the decompiler widened the modifier for inner-class access.)
     */
    public void closeSocket() {
        stopRunning();
        this.sock_.close();
    }

    /**
     * Logs a received packet at debug level and forwards it to the callback.
     * (Originally private; the decompiler widened the modifier for inner-class access.)
     *
     * @param bArr the received packet
     */
    public void onRecv(byte[] bArr) {
        Logger.beginDebug(TAG).p(bArr).end();
        this.cb_.onRecv(this, bArr);
    }

    /**
     * Logs a packet that is about to be written at debug level.
     * (Originally private; the decompiler widened the modifier for inner-class access.)
     *
     * @param bArr the packet about to be sent
     */
    public void onSend(byte[] bArr) {
        Logger.beginDebug(TAG).p(bArr).end();
    }

    /**
     * Asks the callback how to react to a heartbeat timeout.
     * (Originally private; the decompiler widened the modifier for inner-class access.)
     *
     * @return the callback's answer; {@code true} makes the receiver thread close the socket
     */
    public boolean onUnavailable() {
        return this.cb_.onUnavailable(this);
    }

    /** Reads one packet from the socket; the result may be {@code null} (see {@link #onLoop()}). */
    private byte[] readSocket() {
        return this.sock_.recv(false);
    }

    /**
     * Writes one packet to the socket.
     * (Originally private; the decompiler widened the modifier for inner-class access.)
     *
     * @param bArr the bytes to write; may be zero-length
     * @return the result reported by the socket; {@code false} is treated as failure by callers
     */
    public boolean writeSocket(byte[] bArr) {
        return this.sock_.send(bArr);
    }

    /**
     * Throttles reading when too many received packets are waiting for delivery.
     *
     * <p>NOTE(review): unlike the sender thread, the delivery thread never calls
     * {@code Util.sync_notify_all(recvQ_)}, so unless something else signals the queue this wait
     * presumably always lasts the full {@code TCP_CHANNEL_PAUSE_TIME} - confirm this is intended.
     *
     * @return {@code true} if the queue was over {@code TCP_CHANNEL_PAUSE_SIZE} and this call
     *         paused; {@code false} if reading may proceed
     */
    boolean checkRecvQ() {
        if (this.recvQ_.size() <= this.env_.TCP_CHANNEL_PAUSE_SIZE) {
            return false;
        }
        if (this.lastPauseRecvTime_.setTimeIfElapsed(this.env_.TCP_CHANNEL_ALERT_TIME)) {
            Logger.beginError(TAG)
                    .p("TOO many recv packets ")
                    .p(Integer.valueOf(this.recvQ_.size()))
                    .p(", pause recvQ ")
                    .p(Long.valueOf(this.env_.TCP_CHANNEL_PAUSE_TIME))
                    .p("ms")
                    .end();
        }
        Util.sync_wait(this.recvQ_, this.env_.TCP_CHANNEL_PAUSE_TIME);
        return true;
    }

    /** @return whether the underlying socket reports itself as connected */
    public boolean connected() {
        return this.sock_.isConnected();
    }

    @Override // com.huawei.ecs.mtk.tcp.TcpPeer
    public Object getChannelId() {
        return this.channelId_;
    }

    @Override // com.huawei.ecs.mtk.tcp.TcpPeer
    public CryptInterface getCrypt() {
        return this.crypt_;
    }

    /** @return the local port reported by the underlying socket */
    public int getLocalPort() {
        return this.sock_.getLocalPort();
    }

    /** @return the local address captured when this channel was constructed */
    @Override // com.huawei.ecs.mtk.tcp.TcpPeer
    public SocketAddress getLocalSocketAddress() {
        return this.localAddress_;
    }

    /** @return the remote address captured when this channel was constructed */
    @Override // com.huawei.ecs.mtk.tcp.TcpPeer
    public SocketAddress getRemoteSocketAddress() {
        return this.remoteAddress_;
    }

    /**
     * Starts the sender and delivery threads and notifies the callback that the channel is open.
     * (Originally protected; the decompiler widened the modifier.)
     *
     * @return always {@code true}
     */
    @Override // com.huawei.ecs.mtk.util.SimpleThread
    public boolean onBirth() {
        Logger.beginInfo(TAG).p("---- CONNECTED ----").end();
        this.sender_.start();
        this.receiver_.start();
        this.cb_.onOpen(this);
        return true;
    }

    /**
     * Notifies the callback that the channel is closing, then shuts down both worker threads and
     * the socket.
     */
    @Override // com.huawei.ecs.mtk.util.SimpleThread
    protected void onDeath() {
        try {
            this.cb_.onClose(this);
        } finally {
            // The callback is foreign code: release the worker threads and the socket even if
            // it throws, otherwise they would be leaked.
            this.sender_.close();
            this.receiver_.close();
            closeSocket();
        }
        Logger.beginInfo(TAG).p("---- DISCONNECTED ----").end();
    }

    /**
     * Reads one packet from the socket and queues it for delivery, unless the receive queue is
     * currently over its pause threshold.
     *
     * @return {@code false} when nothing was read and the socket is closed, {@code true} otherwise
     */
    @Override // com.huawei.ecs.mtk.util.SimpleThread
    protected boolean onLoop() {
        if (checkRecvQ()) {
            // Back-pressure: skip reading in this iteration.
            return true;
        }
        byte[] packet = readSocket();
        if (packet != null) {
            putRecvQ(packet);
            return true;
        }
        return !this.sock_.isClosed();
    }

    /** Closes the socket when this thread is asked to stop. */
    @Override // com.huawei.ecs.mtk.util.SimpleThread
    protected void onStop() {
        closeSocket();
    }

    /**
     * Binds the channel to its callback and environment, creates the queues and worker threads
     * (without starting them), and writes one zero-length packet to the socket.
     *
     * @param tcpCallback receiver of open/close/recv/unavailable notifications
     * @param tcpEnv      configuration values and statistics counters used by this channel
     */
    public void open(TcpCallback tcpCallback, TcpEnv tcpEnv) {
        this.cb_ = tcpCallback;
        this.env_ = tcpEnv;
        this.sendQ_ = new SafeQueue<>();
        this.recvQ_ = new SafeQueue<>();
        this.sender_ = new TcpSenderThread();
        this.receiver_ = new TcpReceiverThread();
        writeSocket(EMPTY_BYTES);
    }

    /**
     * Queues a received packet for the delivery thread and records it in the statistics.
     * Zero-length packets are queued but not counted.
     *
     * @param bArr the received packet; must not be {@code null}
     * @throws NullPointerException if {@code bArr} is {@code null} (nothing is queued then)
     */
    public void putRecvQ(byte[] bArr) {
        if (this.recvQ_.size() > this.env_.TCP_CHANNEL_ALERT_SIZE && this.lastPrintRecvTime_.setTimeIfElapsed(this.env_.TCP_CHANNEL_ALERT_TIME)) {
            Logger.beginInfo(TAG).p("many recv packets ").p(Integer.valueOf(this.recvQ_.size())).end();
        }
        // Count before enqueuing: the delivery thread decrements recvWaiting as soon as it
        // dequeues, so counting afterwards could make the counter transiently negative.
        if (bArr.length > 0) {
            this.env_.statistics.info.recv.update(1L, bArr.length);
            this.env_.statistics.info.recvWaiting.update(1L, bArr.length);
        }
        this.recvQ_.put(bArr);
    }

    /**
     * Queues a packet for the sender thread. While the send queue is over
     * {@code TCP_CHANNEL_PAUSE_SIZE} and this channel is running, the caller is blocked in steps
     * of at most {@code TCP_CHANNEL_PAUSE_TIME}; the sender thread signals the queue after each
     * dequeue.
     *
     * @param bArr the packet to send; must not be {@code null}
     * @throws NullPointerException if {@code bArr} is {@code null} (nothing is queued then)
     */
    @Override // com.huawei.ecs.mtk.tcp.TcpPeer
    public void send(byte[] bArr) {
        while (this.sendQ_.size() > this.env_.TCP_CHANNEL_PAUSE_SIZE && running()) {
            if (this.lastPauseSendTime_.setTimeIfElapsed(this.env_.TCP_CHANNEL_ALERT_TIME)) {
                Logger.beginError(TAG)
                        .p("TOO many send packets ")
                        .p(Integer.valueOf(this.sendQ_.size()))
                        .p(", pause sendQ ")
                        .p(Long.valueOf(this.env_.TCP_CHANNEL_PAUSE_TIME))
                        .p("ms")
                        .end();
            }
            Util.sync_wait(this.sendQ_, this.env_.TCP_CHANNEL_PAUSE_TIME);
        }
        if (this.sendQ_.size() > this.env_.TCP_CHANNEL_ALERT_SIZE && this.lastPrintSendTime_.setTimeIfElapsed(this.env_.TCP_CHANNEL_ALERT_TIME)) {
            Logger.beginInfo(TAG).p("many send packets ").p(Integer.valueOf(this.sendQ_.size())).end();
        }
        // Count before enqueuing: the sender thread decrements sendWaiting as soon as it
        // dequeues, so counting afterwards could make the counter transiently negative.
        if (bArr.length > 0) {
            this.env_.statistics.info.send.update(1L, bArr.length);
            this.env_.statistics.info.sendWaiting.update(1L, bArr.length);
        }
        this.sendQ_.put(bArr);
    }

    @Override // com.huawei.ecs.mtk.tcp.TcpPeer
    public void setChannelId(Object obj) {
        this.channelId_ = obj;
    }

    @Override // com.huawei.ecs.mtk.tcp.TcpPeer
    public void setCrypt(CryptInterface cryptInterface) {
        this.crypt_ = cryptInterface;
    }
}
