package com.amazon.communication;

import com.amazon.client.metrics.MetricEvent;
import com.amazon.communication.socket.ProtocolSocket;
import com.amazon.dp.logger.DPLogger;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: classes2.dex */
/**
 * Drives the transmission of one message, read from an {@link InputStream}, as a series of
 * encoded fragments handed to a {@link ByteBufferChainHandler}.
 *
 * <p>Fragments are extracted from the stream, encoded through the
 * {@link AlphaProtocolHandlerBase} and submitted to the handler. The handler reports back through
 * the notification sink created by {@link #createNotificationSink()}; those callbacks either
 * trigger further transmission, resubmit a rejected fragment, or tear the tracker down.
 *
 * <p>All state transitions in this class are performed while holding this instance's monitor.
 */
public class LongLivedMessageLifeCycleTracker {
    /** Delay, in milliseconds, before retrying a read when the stream has no data available. */
    private static final int DEFAULT_RETRY_READ_DATA_DELAY_IN_MS = 100;
    /**
     * {@link #transmitMessage(MetricEvent)} stops submitting new fragments once this many
     * {@code onByteBufferChain} calls are awaiting a handled/rejected notification.
     */
    private static final int MAX_UNACKNOWLEDGED_ON_BYTE_BUFFER_CHAIN_CALLS = 5;
    private static final DPLogger log = new DPLogger("TComm.LongLivedMessageLifeCycleTracker");
    private final AlphaProtocolHandlerBase mAlphaProtocolHandlerBase;
    private final ByteBufferChainHandler mByteBufferChainHandler;
    private final int mChannel;
    private final int mExpectedMessageFragmentSize;
    private final int mMessageId;
    private final InputStream mMessageStream;
    private final String mMessageType;
    private final ProtocolSocket mProtocolSocket;
    private final WorkExecutor mWorkExecutor;
    /** Sequence number passed to the encoder for the next fragment; starts at 1. */
    private int mNextSequenceNumber = 1;
    /** Fragments submitted to the handler that have not yet been reported handled or rejected. */
    private final AtomicInteger mNumUnacknowledgedOnByteBufferChainCalls = new AtomicInteger(0);
    /** Fragments reported rejected that have not yet been resubmitted. */
    private final AtomicInteger mNumRejectedChains = new AtomicInteger(0);
    /** Set once the final fragment of the stream has been read. */
    protected boolean mEndOfStream = false;
    /** Set when the handler reports that it will never accept another chain. */
    private boolean mHandlerIsDead = false;
    private final ByteBufferChainHandlerNotificationSink mNotificationSink = createNotificationSink();

    /**
     * Notification sink that reacts to the handler's verdict on each submitted fragment by
     * continuing, retrying or ending the transmission of the enclosing tracker's message.
     */
    public class MessageTransmittingNotificationSink implements ByteBufferChainHandlerNotificationSink {
        public MessageTransmittingNotificationSink() {
        }

        /**
         * Records that a fragment was handled. Destroys the tracker (as a success) when the
         * stream has ended and nothing is outstanding or rejected; otherwise tries to transmit
         * more fragments. Does nothing further if the handler has been marked dead.
         */
        @Override // com.amazon.communication.ByteBufferChainHandlerNotificationSink
        public void chainHandled(ByteBufferChain byteBufferChain, MetricEvent metricEvent) {
            final LongLivedMessageLifeCycleTracker tracker = LongLivedMessageLifeCycleTracker.this;
            log.verbose("chainHandled", "chain handled on large message", new Object[0]);
            synchronized (tracker) {
                metricEvent.addCounter("CountChainHandledOnLargeMessage", 1.0d);
                tracker.mNumUnacknowledgedOnByteBufferChainCalls.getAndDecrement();
                if (tracker.mHandlerIsDead) {
                    log.warn("chainHandled", "handler dead; directly return", new Object[0]);
                    return;
                }
                boolean allFragmentsAccepted = tracker.mEndOfStream
                        && tracker.mNumUnacknowledgedOnByteBufferChainCalls.get() == 0
                        && tracker.mNumRejectedChains.get() == 0;
                if (allFragmentsAccepted) {
                    log.verbose("chainHandled", "all fragments have been accepted", new Object[0]);
                    tracker.destroy(false);
                } else {
                    tracker.transmitMessage(metricEvent);
                }
            }
        }

        /**
         * Records that a fragment was rejected. When {@code handlerWillNeverAccept} is true the
         * handler is marked dead and the tracker is destroyed as a failure.
         */
        @Override // com.amazon.communication.ByteBufferChainHandlerNotificationSink
        public void chainRejected(ByteBufferChain byteBufferChain, MetricEvent metricEvent, boolean handlerWillNeverAccept) {
            final LongLivedMessageLifeCycleTracker tracker = LongLivedMessageLifeCycleTracker.this;
            log.warn("chainRejected", "chain rejected on large message", new Object[0]);
            synchronized (tracker) {
                metricEvent.addCounter("CountChainRejectedOnLargeMessage", 1.0d);
                tracker.mNumUnacknowledgedOnByteBufferChainCalls.getAndDecrement();
                tracker.mNumRejectedChains.incrementAndGet();
                if (handlerWillNeverAccept) {
                    log.info("chainRejected", "notified that ByteBufferChainHandler will never accept a ByteBufferChain; it cannot accept any more fragments of the current message; treating it as dead", new Object[0]);
                    tracker.mHandlerIsDead = true;
                    tracker.destroy(true);
                }
            }
        }

        /**
         * Resubmits a previously rejected fragment to the handler, unless the handler has been
         * marked dead.
         */
        @Override // com.amazon.communication.ByteBufferChainHandlerNotificationSink
        public void okToResubmitRejectedChain(ByteBufferChain byteBufferChain, MetricEvent metricEvent) {
            final LongLivedMessageLifeCycleTracker tracker = LongLivedMessageLifeCycleTracker.this;
            log.warn("okToResubmitRejectedChain", "notified to resubmit a rejected chain", new Object[0]);
            synchronized (tracker) {
                if (tracker.mHandlerIsDead) {
                    log.warn("okToResubmitRejectedChain", "not resubmitting a rejected chain, because handler is dead", new Object[0]);
                    return;
                }
                tracker.mNumRejectedChains.decrementAndGet();
                log.verbose("okToResubmitRejectedChain", "attempting to transmit previously rejected fragment", new Object[0]);
                metricEvent.addCounter("CountRetransmitFragment", 1.0d);
                tracker.transmitMessageFragment(metricEvent, byteBufferChain);
            }
        }
    }

    /**
     * Creates a tracker and notifies the socket that a large-message transaction is beginning.
     *
     * @param alphaProtocolHandlerBase used to extract and encode fragments and to stop tracking
     *        this message on destroy
     * @param byteBufferChainHandler receives each encoded fragment
     * @param workExecutor used to schedule a retry when the stream has no data available
     * @param protocolSocket notified when the transaction begins and ends
     * @param expectedMessageFragmentSize size requested for each fragment read from the stream
     * @param messageStream source of the message payload
     * @param messageType passed through to the fragment encoder
     * @param channel passed through to the fragment encoder
     * @param messageId passed through to the fragment encoder and used to stop tracking
     */
    public LongLivedMessageLifeCycleTracker(AlphaProtocolHandlerBase alphaProtocolHandlerBase, ByteBufferChainHandler byteBufferChainHandler, WorkExecutor workExecutor, ProtocolSocket protocolSocket, int expectedMessageFragmentSize, InputStream messageStream, String messageType, int channel, int messageId) {
        this.mAlphaProtocolHandlerBase = alphaProtocolHandlerBase;
        this.mByteBufferChainHandler = byteBufferChainHandler;
        this.mWorkExecutor = workExecutor;
        this.mProtocolSocket = protocolSocket;
        this.mExpectedMessageFragmentSize = expectedMessageFragmentSize;
        this.mMessageType = messageType;
        this.mChannel = channel;
        this.mMessageId = messageId;
        this.mMessageStream = messageStream;
        this.mProtocolSocket.largeMessageTransactionBeginning();
    }

    /**
     * Notifies the socket that the large-message transaction is ending and asks the protocol
     * handler to stop tracking this message.
     *
     * <p>NOTE(review): this method is not idempotent and does not close the message stream;
     * several paths in this class can reach it more than once for the same tracker. Confirm
     * that the socket and protocol handler tolerate repeated calls.
     *
     * @param isDueToFailure whether the tracker is being torn down because of an error; only
     *        used for logging here
     */
    public void destroy(boolean isDueToFailure) {
        log.verbose("destroy", "removing tracker from map", "isDueToFailure", Boolean.valueOf(isDueToFailure));
        this.mProtocolSocket.largeMessageTransactionEnding();
        this.mAlphaProtocolHandlerBase.stopTrackingLongLivedMessage(this.mMessageId);
    }

    /**
     * Reads, encodes and submits the next fragment of the message.
     *
     * <p>If the stream currently has nothing available, a retry of
     * {@link #transmitMessage(MetricEvent)} is scheduled and nothing is sent. Any failure while
     * reading or encoding destroys the tracker and aborts without submitting a fragment.
     *
     * @return {@code true} if a fragment was submitted to the handler without error;
     *         {@code false} if nothing was sent (end of stream, no data yet, or failure)
     */
    private synchronized boolean transmitNextMessageFragment(final MetricEvent metricEvent) {
        if (this.mEndOfStream) {
            log.info("transmitNextMessageFragment", "stream has come to the end", new Object[0]);
            return false;
        }

        try {
            if (this.mMessageStream.available() == 0) {
                log.info("transmitNextMessageFragment", "stream has no data to read without blocking; enqueue work to retry reading later", new Object[0]);
                this.mWorkExecutor.enqueueWorkAfter(this.mProtocolSocket, new Callable<Void>() { // from class: com.amazon.communication.LongLivedMessageLifeCycleTracker.1
                    @Override // java.util.concurrent.Callable
                    public Void call() throws Exception {
                        LongLivedMessageLifeCycleTracker.this.transmitMessage(metricEvent);
                        return null;
                    }
                }, DEFAULT_RETRY_READ_DATA_DELAY_IN_MS);
                return false;
            }
        } catch (IOException e) {
            log.error("transmitNextMessageFragment", "encountered exception while checking available bytes on message stream", e);
            destroy(true);
            return false;
        }

        ByteBufferChain fragment;
        boolean isLastFragment = false;
        try {
            fragment = this.mAlphaProtocolHandlerBase.extractByteBufferChainFromInputStream(this.mMessageStream, this.mExpectedMessageFragmentSize);
            if (fragment == null) {
                log.info("transmitNextMessageFragment", "end of stream", new Object[0]);
                isLastFragment = true;
                // An empty chain is still encoded and sent so the final fragment goes out.
                fragment = new ByteBufferChain();
            } else if (fragment.getDataSize() != this.mExpectedMessageFragmentSize && this.mMessageStream.available() == 1 && fragment.append(this.mMessageStream, 1) == -1) {
                log.info("transmitNextMessageFragment", "Nothing left on the message stream, this will be the last fragment.", new Object[0]);
                isLastFragment = true;
            }
        } catch (IOException e) {
            log.error("transmitNextMessageFragment", "encountered exception while extracting message fragment", e);
            destroy(true);
            return false;
        }

        ByteBufferChain encodedFragment;
        try {
            if (isLastFragment) {
                this.mEndOfStream = true;
                this.mMessageStream.close();
            }
            encodedFragment = this.mAlphaProtocolHandlerBase.encodeMessageFragment(fragment, !this.mEndOfStream, this.mMessageType, this.mChannel, this.mMessageId, this.mNextSequenceNumber, metricEvent);
        } catch (ProtocolException e) {
            // Abort here: the tracker has been destroyed, so no fragment may be sent.
            log.error("transmitNextMessageFragment", "encountered exception while encoding fragment", e);
            destroy(true);
            return false;
        } catch (IOException e) {
            log.error("transmitNextMessageFragment", "encountered exception while encoding fragment", e);
            destroy(true);
            return false;
        }

        this.mNextSequenceNumber++;
        return transmitMessageFragment(metricEvent, encodedFragment);
    }

    /**
     * Creates the sink that receives the handler's notifications for fragments submitted by
     * this tracker. Invoked from a field initializer, i.e. before the constructor body runs, so
     * overrides must not rely on state assigned in the constructor.
     */
    protected ByteBufferChainHandlerNotificationSink createNotificationSink() {
        return new MessageTransmittingNotificationSink();
    }

    /**
     * Submits fragments until the unacknowledged-call limit is reached, a rejected chain is
     * pending, the stream ends, the handler is dead, or a fragment could not be sent.
     */
    public synchronized void transmitMessage(MetricEvent metricEvent) {
        boolean attemptedTransmit = false;
        int numMessageFragmentsSent = 0;
        while (this.mNumUnacknowledgedOnByteBufferChainCalls.get() < MAX_UNACKNOWLEDGED_ON_BYTE_BUFFER_CHAIN_CALLS && this.mNumRejectedChains.get() == 0 && !this.mEndOfStream && !this.mHandlerIsDead) {
            try {
                if (log.isVerboseEnabled()) {
                    log.verbose("transmitMessage", "attempting to transmit next message fragment", "mMessageStream.available", Integer.valueOf(this.mMessageStream.available()), "mNumUnacknowledgedOnByteBufferChainCalls", Integer.valueOf(this.mNumUnacknowledgedOnByteBufferChainCalls.get()));
                }
            } catch (IOException e) {
                log.error("transmitMessage", "encountered IOException when checking how many bytes are available in the stream", e);
                destroy(true);
                // Stop looping: none of the loop conditions change on this path, so continuing
                // would retry (and destroy) indefinitely on a broken stream.
                break;
            }
            attemptedTransmit = true;
            if (!transmitNextMessageFragment(metricEvent)) {
                break;
            }
            numMessageFragmentsSent++;
        }
        if (log.isVerboseEnabled()) {
            if (attemptedTransmit) {
                log.verbose("transmitMessage", "sent message fragments", "numMessageFragmentsSent", Integer.valueOf(numMessageFragmentsSent), "mNumUnacknowledgedOnByteBufferChainCalls", Integer.valueOf(this.mNumUnacknowledgedOnByteBufferChainCalls.get()), "mNextSequenceNumber", Integer.valueOf(this.mNextSequenceNumber), "mNumRejectedChains", Integer.valueOf(this.mNumRejectedChains.get()), "mEndOfStream", Boolean.valueOf(this.mEndOfStream), "mHandlerIsDead", Boolean.valueOf(this.mHandlerIsDead));
            } else {
                log.verbose("transmitMessage", "did not attempt to send any message fragments", "mNumUnacknowledgedOnByteBufferChainCalls", Integer.valueOf(this.mNumUnacknowledgedOnByteBufferChainCalls.get()), "mNextSequenceNumber", Integer.valueOf(this.mNextSequenceNumber), "mNumRejectedChains", Integer.valueOf(this.mNumRejectedChains.get()), "mEndOfStream", Boolean.valueOf(this.mEndOfStream), "mHandlerIsDead", Boolean.valueOf(this.mHandlerIsDead));
            }
        }
    }

    /**
     * Submits one already-encoded fragment to the handler and counts it as unacknowledged.
     *
     * @param metricEvent receives the {@code CountTransmitFragment} counter
     * @param byteBufferChain the encoded fragment to submit
     * @return {@code true} if the handler call completed; {@code false} if it threw an
     *         {@link IOException}, in which case the tracker has been destroyed
     */
    public synchronized boolean transmitMessageFragment(MetricEvent metricEvent, ByteBufferChain byteBufferChain) {
        try {
            metricEvent.addCounter("CountTransmitFragment", 1.0d);
            this.mNumUnacknowledgedOnByteBufferChainCalls.getAndIncrement();
            this.mByteBufferChainHandler.onByteBufferChain(byteBufferChain, this.mNotificationSink, metricEvent);
            return true;
        } catch (IOException e) {
            log.error("transmitMessageFragment", "encountered exception upon transmitting fragment", e);
            destroy(true);
            return false;
        }
    }
}
