package org.jgroups.protocols;

import groovy.inspect.Inspector;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToIntFunction;
import java.util.stream.Stream;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.infinispan.transaction.xa.recovery.RecoveryAdminOperations;
import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property;
import org.jgroups.logging.Log;
import org.jgroups.stack.Protocol;
import org.jgroups.util.AgeOutCache;
import org.jgroups.util.AsciiString;
import org.jgroups.util.AverageMinMax;
import org.jgroups.util.ExpiryCache;
import org.jgroups.util.LongTuple;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.SeqnoList;
import org.jgroups.util.Table;
import org.jgroups.util.TimeScheduler;
import org.jgroups.util.TimeService;
import org.jgroups.util.Util;
import org.python.icu.text.PluralRules;

@MBean(description = "Reliable unicast layer")
/* loaded from: input_file:WEB-INF/lib/jgroups-4.0.9.Final.jar:org/jgroups/protocols/UNICAST3.class */
public class UNICAST3 extends Protocol implements AgeOutCache.Handler<Address> {
    protected static final long DEFAULT_FIRST_SEQNO = 1;

    @Property(description = "Max number of messages to ask for in a retransmit request. 0 disables this and uses the max bundle size in the transport")
    protected int max_xmit_req_size;
    protected Future<?> xmit_task;
    protected Address local_addr;
    protected TimeScheduler timer;
    protected short last_conn_id;
    protected AgeOutCache<Address> cache;
    protected TimeService time_service;
    protected static final Message DUMMY_OOB_MSG = new Message().setFlag(Message.Flag.OOB);
    protected static final Predicate<Message> dont_loopback_filter = message -> {
        return message != null && message.isTransientFlagSet(Message.TransientFlag.DONT_LOOPBACK);
    };
    protected static final BiConsumer<MessageBatch, Message> BATCH_ACCUMULATOR = (v0, v1) -> {
        v0.add(v1);
    };

    @Property(description = "Time (in milliseconds) after which an idle incoming or outgoing connection is closed. The connection will get re-established when used again. 0 disables connection reaping")
    protected long conn_expiry_timeout = 120000;

    @Property(description = "Time (in ms) until a connection marked to be closed will get removed. 0 disables this")
    protected long conn_close_timeout = 240000;

    @Property(description = "Number of rows of the matrix in the retransmission table (only for experts)", writable = false)
    protected int xmit_table_num_rows = 100;

    @Property(description = "Number of elements of a row of the matrix in the retransmission table; gets rounded to the next power of 2 (only for experts). The capacity of the matrix is xmit_table_num_rows * xmit_table_msgs_per_row", writable = false)
    protected int xmit_table_msgs_per_row = 1024;

    @Property(description = "Resize factor of the matrix in the retransmission table (only for experts)", writable = false)
    protected double xmit_table_resize_factor = 1.2d;

    @Property(description = "Number of milliseconds after which the matrix in the retransmission table is compacted (only for experts)", writable = false)
    protected long xmit_table_max_compaction_time = 600000;
    protected long max_retransmit_time = 60000;

    @Property(description = "Interval (in milliseconds) at which messages in the send windows are resent")
    protected long xmit_interval = 500;

    @Property(description = "If true, trashes warnings about retransmission messages not found in the xmit_table (used for testing)")
    protected boolean log_not_found_msgs = true;

    @Property(description = "Send an ack immediately when a batch of ack_threshold (or more) messages is received. Otherwise send delayed acks. If 1, ack single messages (similar to UNICAST)")
    protected int ack_threshold = 5;

    @Property(description = "Min time (in ms) to elapse for successive SEND_FIRST_SEQNO messages to be sent to the same sender")
    protected long sync_min_interval = HdfsServerConstants.NAMENODE_LEASE_RECHECK_INTERVAL;
    protected long num_msgs_sent = 0;
    protected long num_msgs_received = 0;
    protected long num_acks_sent = 0;
    protected long num_acks_received = 0;
    protected long num_xmits = 0;

    @ManagedAttribute(description = "Number of retransmit requests received")
    protected final LongAdder xmit_reqs_received = new LongAdder();

    @ManagedAttribute(description = "Number of retransmit requests sent")
    protected final LongAdder xmit_reqs_sent = new LongAdder();

    @ManagedAttribute(description = "Number of retransmit responses sent")
    protected final LongAdder xmit_rsps_sent = new LongAdder();
    protected final AverageMinMax avg_delivery_batch_size = new AverageMinMax();

    @ManagedAttribute(description = "True if sending a message can block at the transport level")
    protected boolean sends_can_block = true;

    @ManagedAttribute(description = "tracing is enabled or disabled for the given log", writable = true)
    protected boolean is_trace = this.log.isTraceEnabled();
    protected final ConcurrentMap<Address, SenderEntry> send_table = Util.createConcurrentMap();
    protected final ConcurrentMap<Address, ReceiverEntry> recv_table = Util.createConcurrentMap();
    protected final ReentrantLock recv_table_lock = new ReentrantLock();
    protected final Map<Address, Long> xmit_task_map = new HashMap();
    protected volatile List<Address> members = new ArrayList(11);
    protected volatile boolean running = false;
    protected final AtomicInteger timestamper = new AtomicInteger(0);
    protected ExpiryCache<Address> last_sync_sent = null;
    protected final Predicate<Message> drop_oob_and_dont_loopback_msgs_filter = message -> {
        return (message == null || message == DUMMY_OOB_MSG || (message.isFlagSet(Message.Flag.OOB) && !message.setTransientFlagIfAbsent(Message.TransientFlag.OOB_DELIVERED)) || (message.isTransientFlagSet(Message.TransientFlag.DONT_LOOPBACK) && this.local_addr != null && this.local_addr.equals(message.src()))) ? false : true;
    };

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:WEB-INF/lib/jgroups-4.0.9.Final.jar:org/jgroups/protocols/UNICAST3$Entry.class */
    public abstract class Entry {
        protected final Table<Message> msgs;
        protected final short conn_id;
        protected final AtomicLong timestamp = new AtomicLong(0);
        protected volatile State state = State.OPEN;

        protected Entry(short s, Table<Message> table) {
            this.conn_id = s;
            this.msgs = table;
            update();
        }

        short connId() {
            return this.conn_id;
        }

        void update() {
            this.timestamp.set(UNICAST3.this.getTimestamp());
        }

        State state() {
            return this.state;
        }

        Entry state(State state) {
            if (this.state != state) {
                this.state = state;
                update();
            }
            return this;
        }

        long age() {
            return TimeUnit.MILLISECONDS.convert(UNICAST3.this.getTimestamp() - this.timestamp.longValue(), TimeUnit.NANOSECONDS);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:WEB-INF/lib/jgroups-4.0.9.Final.jar:org/jgroups/protocols/UNICAST3$ReceiverEntry.class */
    public final class ReceiverEntry extends Entry {
        protected volatile boolean send_ack;

        public ReceiverEntry(Table<Message> table, short s) {
            super(s, table);
        }

        ReceiverEntry sendAck(boolean z) {
            this.send_ack = z;
            return this;
        }

        boolean sendAck() {
            boolean z = this.send_ack;
            this.send_ack = false;
            return z;
        }

        public String toString() {
            StringBuilder sb = new StringBuilder();
            if (this.msgs != null) {
                sb.append(this.msgs).append(RecoveryAdminOperations.SEPARATOR);
            }
            sb.append("recv_conn_id=" + ((int) this.conn_id)).append(" (" + (age() / 1000) + " secs old) - " + this.state);
            if (this.send_ack) {
                sb.append(" [ack pending]");
            }
            return sb.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:WEB-INF/lib/jgroups-4.0.9.Final.jar:org/jgroups/protocols/UNICAST3$RetransmitTask.class */
    public class RetransmitTask implements Runnable {
        protected RetransmitTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            UNICAST3.this.triggerXmit();
        }

        public String toString() {
            return UNICAST3.class.getSimpleName() + ": RetransmitTask (interval=" + UNICAST3.this.xmit_interval + " ms)";
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:WEB-INF/lib/jgroups-4.0.9.Final.jar:org/jgroups/protocols/UNICAST3$SenderEntry.class */
    public final class SenderEntry extends Entry {
        final AtomicLong sent_msgs_seqno;
        protected final long[] watermark;
        protected int last_timestamp;

        public SenderEntry(short s) {
            super(s, new Table(UNICAST3.this.xmit_table_num_rows, UNICAST3.this.xmit_table_msgs_per_row, 0L, UNICAST3.this.xmit_table_resize_factor, UNICAST3.this.xmit_table_max_compaction_time));
            this.sent_msgs_seqno = new AtomicLong(1L);
            this.watermark = new long[]{0, 0};
        }

        long[] watermark() {
            return this.watermark;
        }

        SenderEntry watermark(long j, long j2) {
            this.watermark[0] = j;
            this.watermark[1] = j2;
            return this;
        }

        protected synchronized boolean updateLastTimestamp(int i) {
            if (this.last_timestamp == 0) {
                this.last_timestamp = i;
                return true;
            }
            boolean z = UNICAST3.compare(i, this.last_timestamp) > 0;
            if (z) {
                this.last_timestamp = i;
            }
            return z;
        }

        public String toString() {
            StringBuilder sb = new StringBuilder();
            if (this.msgs != null) {
                sb.append(this.msgs).append(RecoveryAdminOperations.SEPARATOR);
            }
            sb.append("send_conn_id=" + ((int) this.conn_id)).append(" (" + (age() / 1000) + " secs old) - " + this.state);
            if (this.last_timestamp != 0) {
                sb.append(", last-ts: ").append(this.last_timestamp);
            }
            return sb.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:WEB-INF/lib/jgroups-4.0.9.Final.jar:org/jgroups/protocols/UNICAST3$State.class */
    public enum State {
        OPEN,
        CLOSING,
        CLOSED
    }

    @ManagedAttribute
    public String getLocalAddress() {
        return this.local_addr != null ? this.local_addr.toString() : "null";
    }

    @ManagedAttribute(description = "Returns the number of outgoing (send) connections")
    public int getNumSendConnections() {
        return this.send_table.size();
    }

    @ManagedAttribute(description = "Returns the number of incoming (receive) connections")
    public int getNumReceiveConnections() {
        return this.recv_table.size();
    }

    @ManagedAttribute(description = "Returns the total number of outgoing (send) and incoming (receive) connections")
    public int getNumConnections() {
        return getNumReceiveConnections() + getNumSendConnections();
    }

    @ManagedAttribute(description = "Next seqno issued by the timestamper")
    public int getTimestamper() {
        return this.timestamper.get();
    }

    @ManagedAttribute(description = "Average batch size of messages removed from the table and delivered to the application")
    public String getAvgBatchDeliverySize() {
        return this.avg_delivery_batch_size != null ? this.avg_delivery_batch_size.toString() : Inspector.NOT_APPLICABLE;
    }

    @Override // org.jgroups.stack.Protocol
    public <T extends Protocol> T setLevel(String str) {
        T t = (T) super.setLevel(str);
        this.is_trace = this.log.isTraceEnabled();
        return t;
    }

    @ManagedOperation
    public String printConnections() {
        StringBuilder sb = new StringBuilder();
        if (!this.send_table.isEmpty()) {
            sb.append("\nsend connections:\n");
            for (Map.Entry<Address, SenderEntry> entry : this.send_table.entrySet()) {
                sb.append(entry.getKey()).append(PluralRules.KEYWORD_RULE_SEPARATOR).append(entry.getValue()).append("\n");
            }
        }
        if (!this.recv_table.isEmpty()) {
            sb.append("\nreceive connections:\n");
            for (Map.Entry<Address, ReceiverEntry> entry2 : this.recv_table.entrySet()) {
                sb.append(entry2.getKey()).append(PluralRules.KEYWORD_RULE_SEPARATOR).append(entry2.getValue()).append("\n");
            }
        }
        return sb.toString();
    }

    @ManagedAttribute
    public long getNumMessagesSent() {
        return this.num_msgs_sent;
    }

    @ManagedAttribute
    public long getNumMessagesReceived() {
        return this.num_msgs_received;
    }

    @ManagedAttribute
    public long getNumAcksSent() {
        return this.num_acks_sent;
    }

    @ManagedAttribute
    public long getNumAcksReceived() {
        return this.num_acks_received;
    }

    @ManagedAttribute
    public long getNumXmits() {
        return this.num_xmits;
    }

    public long getMaxRetransmitTime() {
        return this.max_retransmit_time;
    }

    @Property(description = "Max number of milliseconds we try to retransmit a message to any given member. After that, the connection is removed. Any new connection to that member will start with seqno #1 again. 0 disables this")
    public void setMaxRetransmitTime(long j) {
        this.max_retransmit_time = j;
        if (this.cache == null || j <= 0) {
            return;
        }
        this.cache.setTimeout(j);
    }

    @ManagedAttribute(description = "Is the retransmit task running")
    public boolean isXmitTaskRunning() {
        return (this.xmit_task == null || this.xmit_task.isDone()) ? false : true;
    }

    @ManagedAttribute
    public int getAgeOutCacheSize() {
        if (this.cache != null) {
            return this.cache.size();
        }
        return 0;
    }

    @ManagedOperation
    public String printAgeOutCache() {
        return this.cache != null ? this.cache.toString() : Inspector.NOT_APPLICABLE;
    }

    public AgeOutCache<Address> getAgeOutCache() {
        return this.cache;
    }

    public boolean hasSendConnectionTo(Address address) {
        SenderEntry senderEntry = this.send_table.get(address);
        return senderEntry != null && senderEntry.state() == State.OPEN;
    }

    @ManagedAttribute
    public int getNumUnackedMessages() {
        return accumulate((v0) -> {
            return v0.size();
        }, this.send_table.values());
    }

    @ManagedAttribute(description = "Total number of undelivered messages in all receive windows")
    public int getXmitTableUndeliveredMessages() {
        return accumulate((v0) -> {
            return v0.size();
        }, this.recv_table.values());
    }

    @ManagedAttribute(description = "Total number of missing messages in all receive windows")
    public int getXmitTableMissingMessages() {
        return accumulate((v0) -> {
            return v0.getNumMissing();
        }, this.recv_table.values());
    }

    @ManagedAttribute(description = "Total number of deliverable messages in all receive windows")
    public int getXmitTableDeliverableMessages() {
        return accumulate((v0) -> {
            return v0.getNumDeliverable();
        }, this.recv_table.values());
    }

    @ManagedAttribute(description = "Number of compactions in all (receive and send) windows")
    public int getXmitTableNumCompactions() {
        return accumulate((v0) -> {
            return v0.getNumCompactions();
        }, this.recv_table.values(), this.send_table.values());
    }

    @ManagedAttribute(description = "Number of moves in all (receive and send) windows")
    public int getXmitTableNumMoves() {
        return accumulate((v0) -> {
            return v0.getNumMoves();
        }, this.recv_table.values(), this.send_table.values());
    }

    @ManagedAttribute(description = "Number of resizes in all (receive and send) windows")
    public int getXmitTableNumResizes() {
        return accumulate((v0) -> {
            return v0.getNumResizes();
        }, this.recv_table.values(), this.send_table.values());
    }

    @ManagedAttribute(description = "Number of purges in all (receive and send) windows")
    public int getXmitTableNumPurges() {
        return accumulate((v0) -> {
            return v0.getNumPurges();
        }, this.recv_table.values(), this.send_table.values());
    }

    @ManagedOperation(description = "Prints the contents of the receive windows for all members")
    public String printReceiveWindowMessages() {
        StringBuilder sb = new StringBuilder(this.local_addr + ":\n");
        for (Map.Entry<Address, ReceiverEntry> entry : this.recv_table.entrySet()) {
            sb.append(entry.getKey()).append(PluralRules.KEYWORD_RULE_SEPARATOR).append(entry.getValue().msgs.toString()).append('\n');
        }
        return sb.toString();
    }

    @ManagedOperation(description = "Prints the contents of the send windows for all members")
    public String printSendWindowMessages() {
        StringBuilder sb = new StringBuilder(this.local_addr + ":\n");
        for (Map.Entry<Address, SenderEntry> entry : this.send_table.entrySet()) {
            sb.append(entry.getKey()).append(PluralRules.KEYWORD_RULE_SEPARATOR).append(entry.getValue().msgs.toString()).append('\n');
        }
        return sb.toString();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r5v0, types: [org.jgroups.protocols.UNICAST3] */
    @Override // org.jgroups.stack.Protocol
    public void resetStats() {
        ?? r5 = 0;
        this.num_xmits = 0L;
        this.num_acks_received = 0L;
        r5.num_acks_sent = this;
        this.num_msgs_received = this;
        this.num_msgs_sent = 0L;
        this.avg_delivery_batch_size.clear();
        Stream.of((Object[]) new LongAdder[]{this.xmit_reqs_received, this.xmit_reqs_sent, this.xmit_rsps_sent}).forEach((v0) -> {
            v0.reset();
        });
    }

    @Override // org.jgroups.stack.Protocol
    public void init() throws Exception {
        super.init();
        TP transport = getTransport();
        this.sends_can_block = transport instanceof TCP;
        this.time_service = transport.getTimeService();
        if (this.time_service == null) {
            throw new IllegalStateException("time service from transport is null");
        }
        this.last_sync_sent = new ExpiryCache<>(this.sync_min_interval);
        int maxBundleSize = (transport.getMaxBundleSize() - 50) * 8;
        int i = this.max_xmit_req_size;
        if (this.max_xmit_req_size <= 0) {
            this.max_xmit_req_size = maxBundleSize;
        } else {
            this.max_xmit_req_size = Math.min(this.max_xmit_req_size, maxBundleSize);
        }
        if (i != this.max_xmit_req_size) {
            this.log.trace("%s: set max_xmit_req_size from %d to %d", this.local_addr, Integer.valueOf(i), Integer.valueOf(this.max_xmit_req_size));
        }
        if (((Boolean) transport.getValue("thread_pool_enabled")).booleanValue()) {
            return;
        }
        this.log.info("the thread pool is disabled; %s could be removed (JGRP-2069)", getClass().getSimpleName());
    }

    @Override // org.jgroups.stack.Protocol
    public void start() throws Exception {
        this.timer = getTransport().getTimer();
        if (this.timer == null) {
            throw new Exception("timer is null");
        }
        if (this.max_retransmit_time > 0) {
            this.cache = new AgeOutCache<>(this.timer, this.max_retransmit_time, this);
        }
        this.running = true;
        startRetransmitTask();
    }

    @Override // org.jgroups.stack.Protocol
    public void stop() {
        sendPendingAcks();
        this.running = false;
        stopRetransmitTask();
        this.xmit_task_map.clear();
        removeAllConnections();
    }

    @Override // org.jgroups.stack.Protocol, org.jgroups.UpHandler
    public Object up(Message message) {
        if (message.getDest() == null || message.isFlagSet(Message.Flag.NO_RELIABILITY)) {
            return this.up_prot.up(message);
        }
        UnicastHeader3 unicastHeader3 = (UnicastHeader3) message.getHeader(this.id);
        if (unicastHeader3 == null) {
            return this.up_prot.up(message);
        }
        Address src = message.getSrc();
        switch (unicastHeader3.type) {
            case 0:
                if (this.is_trace) {
                    Log log = this.log;
                    Object[] objArr = new Object[5];
                    objArr[0] = this.local_addr;
                    objArr[1] = src;
                    objArr[2] = Long.valueOf(unicastHeader3.seqno);
                    objArr[3] = Short.valueOf(unicastHeader3.conn_id);
                    objArr[4] = unicastHeader3.first ? ", first" : "";
                    log.trace("%s <-- DATA(%s: #%d, conn_id=%d%s)", objArr);
                }
                if (Objects.equals(this.local_addr, src)) {
                    handleDataReceivedFromSelf(src, unicastHeader3.seqno, message);
                    return null;
                }
                handleDataReceived(src, unicastHeader3.seqno, unicastHeader3.conn_id, unicastHeader3.first, message);
                return null;
            default:
                handleUpEvent(src, message, unicastHeader3);
                return null;
        }
    }

    protected void handleUpEvent(Address address, Message message, UnicastHeader3 unicastHeader3) {
        try {
            switch (unicastHeader3.type) {
                case 0:
                    throw new IllegalStateException("header of type DATA is not supposed to be handled by this method");
                case 1:
                    handleAckReceived(address, unicastHeader3.seqno, unicastHeader3.conn_id, unicastHeader3.timestamp());
                    break;
                case 2:
                    handleResendingOfFirstMessage(address, unicastHeader3.timestamp());
                    break;
                case 3:
                    handleXmitRequest(address, (SeqnoList) Util.streamableFromBuffer(SeqnoList.class, message.getRawBuffer(), message.getOffset(), message.getLength()));
                    break;
                case 4:
                    this.log.trace(this.local_addr + "%s <-- CLOSE(%s: conn-id=%s)", this.local_addr, address, Short.valueOf(unicastHeader3.conn_id));
                    ReceiverEntry receiverEntry = this.recv_table.get(address);
                    if (receiverEntry != null && receiverEntry.connId() == unicastHeader3.conn_id) {
                        this.recv_table.remove(address, receiverEntry);
                        this.log.trace("%s: removed receive connection for %s", this.local_addr, address);
                        break;
                    }
                    break;
                default:
                    this.log.error(Util.getMessage("TypeNotKnown"), this.local_addr, Byte.valueOf(unicastHeader3.type));
                    break;
            }
        } catch (Throwable th) {
            this.log.error(Util.getMessage("FailedHandlingEvent"), this.local_addr, th);
        }
    }

    @Override // org.jgroups.stack.Protocol, org.jgroups.UpHandler
    public void up(MessageBatch messageBatch) {
        UnicastHeader3 unicastHeader3;
        if (messageBatch.dest() == null) {
            this.up_prot.up(messageBatch);
            return;
        }
        if (this.local_addr == null || this.local_addr.equals(messageBatch.sender())) {
            SenderEntry senderEntry = this.local_addr != null ? this.send_table.get(this.local_addr) : null;
            if (senderEntry != null) {
                handleBatchFromSelf(messageBatch, senderEntry);
                return;
            }
            return;
        }
        int size = messageBatch.size();
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        ReceiverEntry receiverEntry = this.recv_table.get(messageBatch.sender());
        Iterator<Message> it = messageBatch.iterator();
        while (it.hasNext()) {
            Message next = it.next();
            if (next != null && !next.isFlagSet(Message.Flag.NO_RELIABILITY) && (unicastHeader3 = (UnicastHeader3) next.getHeader(this.id)) != null) {
                it.remove();
                if (unicastHeader3.type != 0) {
                    handleUpEvent(next.getSrc(), next, unicastHeader3);
                } else {
                    ((List) linkedHashMap.computeIfAbsent(Short.valueOf(unicastHeader3.conn_id), sh -> {
                        return new ArrayList(size);
                    })).add(new LongTuple(unicastHeader3.seqno(), next));
                    if (unicastHeader3.first) {
                        receiverEntry = getReceiverEntry(messageBatch.sender(), unicastHeader3.seqno(), unicastHeader3.first, unicastHeader3.connId());
                    }
                }
            }
        }
        if (!linkedHashMap.isEmpty()) {
            if (receiverEntry == null) {
                sendRequestForFirstSeqno(messageBatch.sender());
            } else {
                if (linkedHashMap.keySet().retainAll(Collections.singletonList(Short.valueOf(receiverEntry.connId())))) {
                    sendRequestForFirstSeqno(messageBatch.sender());
                }
                List<LongTuple<Message>> list = (List) linkedHashMap.get(Short.valueOf(receiverEntry.connId()));
                if (list != null && !list.isEmpty()) {
                    handleBatchReceived(receiverEntry, messageBatch.sender(), list, messageBatch.mode() == MessageBatch.Mode.OOB);
                }
            }
        }
        if (messageBatch.isEmpty()) {
            return;
        }
        this.up_prot.up(messageBatch);
    }

    protected void handleBatchFromSelf(MessageBatch messageBatch, Entry entry) {
        UnicastHeader3 unicastHeader3;
        ArrayList arrayList = new ArrayList(messageBatch.size());
        Iterator<Message> it = messageBatch.iterator();
        while (it.hasNext()) {
            Message next = it.next();
            if (next != null && !next.isFlagSet(Message.Flag.NO_RELIABILITY) && (unicastHeader3 = (UnicastHeader3) next.getHeader(this.id)) != null) {
                it.remove();
                if (unicastHeader3.type != 0) {
                    handleUpEvent(next.getSrc(), next, unicastHeader3);
                } else if (entry.conn_id != unicastHeader3.conn_id) {
                    it.remove();
                } else {
                    arrayList.add(new LongTuple<>(unicastHeader3.seqno(), next));
                }
            }
        }
        if (!arrayList.isEmpty()) {
            if (this.is_trace) {
                this.log.trace("%s <-- DATA(%s: %s)", this.local_addr, messageBatch.sender(), printMessageList(arrayList));
            }
            int size = arrayList.size();
            Table<Message> table = entry.msgs;
            update(entry, size);
            if (messageBatch.mode() == MessageBatch.Mode.OOB) {
                MessageBatch messageBatch2 = new MessageBatch(this.local_addr, messageBatch.sender(), messageBatch.clusterName(), messageBatch.multicast(), MessageBatch.Mode.OOB, size);
                Iterator<LongTuple<Message>> it2 = arrayList.iterator();
                while (it2.hasNext()) {
                    Message message = table.get(it2.next().getVal1());
                    if (message != null && message.isFlagSet(Message.Flag.OOB) && message.setTransientFlagIfAbsent(Message.TransientFlag.OOB_DELIVERED)) {
                        messageBatch2.add(message);
                    }
                }
                deliverBatch(messageBatch2);
            }
            removeAndDeliver(table, messageBatch.sender());
        }
        if (messageBatch.isEmpty()) {
            return;
        }
        this.up_prot.up(messageBatch);
    }

    @Override // org.jgroups.stack.Protocol
    public Object down(Event event) {
        switch (event.getType()) {
            case 6:
                List<Address> members = ((View) event.getArg()).getMembers();
                HashSet hashSet = new HashSet(this.send_table.keySet());
                hashSet.addAll(this.recv_table.keySet());
                this.members = members;
                hashSet.removeAll(members);
                if (this.cache != null) {
                    this.cache.removeAll(members);
                }
                if (!hashSet.isEmpty()) {
                    this.log.trace("%s: closing connections of non members %s", this.local_addr, hashSet);
                    hashSet.forEach(this::closeConnection);
                }
                if (!members.isEmpty()) {
                    for (Address address : members) {
                        SenderEntry senderEntry = this.send_table.get(address);
                        if (senderEntry != null && senderEntry.state() == State.CLOSING) {
                            senderEntry.state(State.OPEN);
                        }
                        ReceiverEntry receiverEntry = this.recv_table.get(address);
                        if (receiverEntry != null && receiverEntry.state() == State.CLOSING) {
                            receiverEntry.state(State.OPEN);
                        }
                    }
                }
                this.xmit_task_map.keySet().retainAll(members);
                this.last_sync_sent.removeExpiredElements();
                break;
            case 8:
                this.local_addr = (Address) event.getArg();
                break;
        }
        return this.down_prot.down(event);
    }

    @Override // org.jgroups.stack.Protocol
    public Object down(Message message) {
        Address dest = message.getDest();
        if (dest == null || message.isFlagSet(Message.Flag.NO_RELIABILITY)) {
            return this.down_prot.down(message);
        }
        if (!this.running) {
            this.log.trace("%s: discarded message as start() has not yet been called, message: %s", this.local_addr, message);
            return null;
        }
        if (message.src() == null) {
            message.src(this.local_addr);
        }
        SenderEntry senderEntry = getSenderEntry(dest);
        boolean z = message.isTransientFlagSet(Message.TransientFlag.DONT_LOOPBACK) && dest.equals(this.local_addr);
        short connId = senderEntry.connId();
        long andIncrement = senderEntry.sent_msgs_seqno.getAndIncrement();
        long j = 10;
        do {
            try {
                message.putHeader(this.id, UnicastHeader3.createDataHeader(andIncrement, connId, andIncrement == 1));
                senderEntry.msgs.add(andIncrement, (long) message, (Predicate<long>) (z ? dont_loopback_filter : null));
                if (this.conn_expiry_timeout > 0) {
                    senderEntry.update();
                }
                if (z) {
                    senderEntry.msgs.purge(senderEntry.msgs.getHighestDeliverable());
                }
                break;
            } catch (Throwable th) {
                if (this.running) {
                    Util.sleep(j);
                    j = Math.min(5000L, j * 2);
                }
            }
        } while (this.running);
        if (this.is_trace) {
            StringBuilder sb = new StringBuilder();
            sb.append(this.local_addr).append(" --> DATA(").append(dest).append(": #").append(andIncrement).append(", conn_id=").append((int) connId);
            if (andIncrement == 1) {
                sb.append(", first");
            }
            sb.append(')');
            this.log.trace(sb);
        }
        this.num_msgs_sent++;
        return this.down_prot.down(message);
    }

    public void closeConnection(Address address) {
        closeSendConnection(address);
        closeReceiveConnection(address);
    }

    public void closeSendConnection(Address address) {
        SenderEntry senderEntry = this.send_table.get(address);
        if (senderEntry != null) {
            senderEntry.state(State.CLOSING);
        }
    }

    public void closeReceiveConnection(Address address) {
        ReceiverEntry receiverEntry = this.recv_table.get(address);
        if (receiverEntry != null) {
            receiverEntry.state(State.CLOSING);
        }
    }

    protected void removeSendConnection(Address address) {
        SenderEntry remove = this.send_table.remove(address);
        if (remove != null) {
            remove.state(State.CLOSED);
            if (this.members.contains(address)) {
                sendClose(address, remove.connId());
            }
        }
    }

    protected void removeReceiveConnection(Address address) {
        sendPendingAcks();
        ReceiverEntry remove = this.recv_table.remove(address);
        if (remove != null) {
            remove.state(State.CLOSED);
        }
    }

    @ManagedOperation(description = "Trashes all connections to other nodes. This is only used for testing")
    public void removeAllConnections() {
        this.send_table.clear();
        this.recv_table.clear();
    }

    protected void retransmit(SeqnoList seqnoList, Address address) {
        Message putHeader = new Message(address).setBuffer(Util.streamableToBuffer(seqnoList)).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL).putHeader(this.id, UnicastHeader3.createXmitReqHeader());
        if (this.is_trace) {
            this.log.trace("%s: sending XMIT_REQ (%s) to %s", this.local_addr, seqnoList, address);
        }
        this.down_prot.down(putHeader);
        this.xmit_reqs_sent.add(seqnoList.size());
    }

    protected void retransmit(Message message) {
        if (this.is_trace) {
            UnicastHeader3 unicastHeader3 = (UnicastHeader3) message.getHeader(this.id);
            this.log.trace("%s --> XMIT(%s: #%d)", this.local_addr, message.getDest(), Long.valueOf(unicastHeader3 != null ? unicastHeader3.seqno : -1L));
        }
        this.down_prot.down(message);
        this.num_xmits++;
    }

    @Override // org.jgroups.util.AgeOutCache.Handler
    public void expired(Address address) {
        if (address != null) {
            this.log.debug("%s: removing connection to %s because it expired", this.local_addr, address);
            closeConnection(address);
        }
    }

    protected void handleDataReceived(Address address, long j, short s, boolean z, Message message) {
        ReceiverEntry receiverEntry = getReceiverEntry(address, j, z, s);
        if (receiverEntry == null) {
            return;
        }
        update(receiverEntry, 1);
        boolean isFlagSet = message.isFlagSet(Message.Flag.OOB);
        Table<Message> table = receiverEntry.msgs;
        boolean add = table.add(j, (long) (isFlagSet ? DUMMY_OOB_MSG : message));
        if (this.ack_threshold <= 1) {
            sendAck(address, table.getHighestDeliverable(), receiverEntry.connId());
        } else {
            receiverEntry.sendAck(true);
        }
        if (isFlagSet) {
            if (add) {
                deliverMessage(message, address, j);
            }
            if (message.isFlagSet(Message.Flag.INTERNAL)) {
                processInternalMessage(table, address);
                return;
            }
        }
        removeAndDeliver(table, address);
    }

    protected void handleDataReceivedFromSelf(Address address, long j, Message message) {
        SenderEntry senderEntry = this.send_table.get(address);
        if (senderEntry == null || senderEntry.state() == State.CLOSED) {
            this.log.warn("%s: entry not found for %s; dropping message", this.local_addr, address);
            return;
        }
        update(senderEntry, 1);
        Table<Message> table = senderEntry.msgs;
        if (message.isFlagSet(Message.Flag.OOB)) {
            Message message2 = table.get(j);
            if (message2 != null && message2.isFlagSet(Message.Flag.OOB) && message2.setTransientFlagIfAbsent(Message.TransientFlag.OOB_DELIVERED)) {
                deliverMessage(message2, address, j);
            }
            if (message2 != null && message2.isFlagSet(Message.Flag.INTERNAL)) {
                processInternalMessage(table, address);
                return;
            }
        }
        removeAndDeliver(table, address);
    }

    protected void processInternalMessage(Table<Message> table, Address address) {
        if (table.isEmpty() || table.getAdders().get() != 0) {
            return;
        }
        getTransport().submitToThreadPool(() -> {
            removeAndDeliver(table, address);
        }, true);
    }

    protected void handleBatchReceived(ReceiverEntry receiverEntry, Address address, List<LongTuple<Message>> list, boolean z) {
        if (this.is_trace) {
            this.log.trace("%s <-- DATA(%s: %s)", this.local_addr, address, printMessageList(list));
        }
        int size = list.size();
        Table<Message> table = receiverEntry.msgs;
        boolean add = table.add((List<LongTuple<boolean>>) list, z, (boolean) (z ? DUMMY_OOB_MSG : null));
        update(receiverEntry, size);
        if (size >= this.ack_threshold) {
            sendAck(address, table.getHighestDeliverable(), receiverEntry.connId());
        } else {
            receiverEntry.sendAck(true);
        }
        if (add && z) {
            MessageBatch messageBatch = new MessageBatch(this.local_addr, address, (AsciiString) null, false, MessageBatch.Mode.OOB, list.size());
            Iterator<LongTuple<Message>> it = list.iterator();
            while (it.hasNext()) {
                messageBatch.add(it.next().getVal2());
            }
            deliverBatch(messageBatch);
        }
        removeAndDeliver(table, address);
    }

    protected void removeAndDeliver(Table<Message> table, Address address) {
        AtomicInteger adders = table.getAdders();
        if (adders.getAndIncrement() != 0) {
            return;
        }
        MessageBatch multicast = new MessageBatch(table.getNumDeliverable()).dest(this.local_addr).sender(address).multicast(false);
        Supplier<R> supplier = () -> {
            return multicast;
        };
        do {
            try {
                multicast.reset();
                table.removeMany(true, 0, this.drop_oob_and_dont_loopback_msgs_filter, supplier, BATCH_ACCUMULATOR);
            } catch (Throwable th) {
                this.log.error("failed removing messages from table for " + address, th);
            }
            if (!multicast.isEmpty()) {
                if (this.stats) {
                    this.avg_delivery_batch_size.add(multicast.size());
                }
                deliverBatch(multicast);
            }
        } while (adders.decrementAndGet() != 0);
    }

    protected String printMessageList(List<LongTuple<Message>> list) {
        UnicastHeader3 unicastHeader3;
        UnicastHeader3 unicastHeader32;
        StringBuilder sb = new StringBuilder();
        int size = list.size();
        Message val2 = size > 0 ? list.get(0).getVal2() : null;
        Message val22 = size > 1 ? list.get(size - 1).getVal2() : val2;
        if (val2 != null && (unicastHeader32 = (UnicastHeader3) val2.getHeader(this.id)) != null) {
            sb.append("#" + unicastHeader32.seqno);
        }
        if (val22 != null && (unicastHeader3 = (UnicastHeader3) val22.getHeader(this.id)) != null) {
            sb.append(" - #" + unicastHeader3.seqno);
        }
        return sb.toString();
    }

    protected ReceiverEntry getReceiverEntry(Address address, long j, boolean z, short s) {
        ReceiverEntry receiverEntry = this.recv_table.get(address);
        if (receiverEntry != null && receiverEntry.connId() == s) {
            return receiverEntry;
        }
        this.recv_table_lock.lock();
        try {
            ReceiverEntry receiverEntry2 = this.recv_table.get(address);
            if (z) {
                if (receiverEntry2 == null) {
                    receiverEntry2 = createReceiverEntry(address, j, s);
                } else if (s != receiverEntry2.connId()) {
                    this.log.trace("%s: conn_id=%d != %d; resetting receiver window", this.local_addr, Short.valueOf(s), Short.valueOf(receiverEntry2.connId()));
                    this.recv_table.remove(address);
                    receiverEntry2 = createReceiverEntry(address, j, s);
                }
            } else if (receiverEntry2 == null || receiverEntry2.connId() != s) {
                this.recv_table_lock.unlock();
                sendRequestForFirstSeqno(address);
                if (this.recv_table_lock.isHeldByCurrentThread()) {
                    this.recv_table_lock.unlock();
                }
                return null;
            }
            return receiverEntry2;
        } finally {
            if (this.recv_table_lock.isHeldByCurrentThread()) {
                this.recv_table_lock.unlock();
            }
        }
    }

    protected SenderEntry getSenderEntry(Address address) {
        SenderEntry senderEntry = this.send_table.get(address);
        if (senderEntry == null || senderEntry.state() == State.CLOSED) {
            if (senderEntry != null) {
                this.send_table.remove(address, senderEntry);
            }
            senderEntry = new SenderEntry(getNewConnectionId());
            SenderEntry putIfAbsent = this.send_table.putIfAbsent(address, senderEntry);
            if (putIfAbsent != null) {
                senderEntry = putIfAbsent;
            } else {
                this.log.trace("%s: created sender window for %s (conn-id=%s)", this.local_addr, address, Short.valueOf(senderEntry.connId()));
                if (this.cache != null && !this.members.contains(address)) {
                    this.cache.add(address);
                }
            }
        }
        if (senderEntry.state() == State.CLOSING) {
            senderEntry.state(State.OPEN);
        }
        return senderEntry;
    }

    protected ReceiverEntry createReceiverEntry(Address address, long j, short s) {
        ReceiverEntry receiverEntry = new ReceiverEntry(new Table(this.xmit_table_num_rows, this.xmit_table_msgs_per_row, j - 1, this.xmit_table_resize_factor, this.xmit_table_max_compaction_time), s);
        ReceiverEntry putIfAbsent = this.recv_table.putIfAbsent(address, receiverEntry);
        if (putIfAbsent != null) {
            return putIfAbsent;
        }
        this.log.trace("%s: created receiver window for %s at seqno=#%d for conn-id=%d", this.local_addr, address, Long.valueOf(j), Short.valueOf(s));
        return receiverEntry;
    }

    protected void handleAckReceived(Address address, long j, short s, int i) {
        if (this.is_trace) {
            this.log.trace("%s <-- ACK(%s: #%d, conn-id=%d, ts=%d)", this.local_addr, address, Long.valueOf(j), Short.valueOf(s), Integer.valueOf(i));
        }
        SenderEntry senderEntry = this.send_table.get(address);
        if (senderEntry != null && senderEntry.connId() != s) {
            this.log.trace("%s: my conn_id (%d) != received conn_id (%d); discarding ACK", this.local_addr, Short.valueOf(senderEntry.connId()), Short.valueOf(s));
            return;
        }
        Table<Message> table = senderEntry != null ? senderEntry.msgs : null;
        if (table == null || !senderEntry.updateLastTimestamp(i)) {
            return;
        }
        table.purge(j, true);
        this.num_acks_received++;
    }

    protected void handleResendingOfFirstMessage(Address address, int i) {
        Message message;
        this.log.trace("%s <-- SEND_FIRST_SEQNO(%s)", this.local_addr, address);
        SenderEntry senderEntry = this.send_table.get(address);
        Table<Message> table = senderEntry != null ? senderEntry.msgs : null;
        if (table == null) {
            this.log.warn(Util.getMessage("SenderNotFound"), this.local_addr, address);
            return;
        }
        if (senderEntry.updateLastTimestamp(i) && (message = table.get(table.getLow() + 1)) != null) {
            Message copy = message.copy();
            UnicastHeader3 copy2 = ((UnicastHeader3) copy.getHeader(this.id)).copy();
            copy2.first = true;
            copy.putHeader(this.id, copy2);
            this.down_prot.down(copy);
        }
    }

    protected void handleXmitRequest(Address address, SeqnoList seqnoList) {
        if (this.is_trace) {
            this.log.trace("%s <-- XMIT(%s: #%s)", this.local_addr, address, seqnoList);
        }
        SenderEntry senderEntry = this.send_table.get(address);
        this.xmit_reqs_received.add(seqnoList.size());
        Table<Message> table = senderEntry != null ? senderEntry.msgs : null;
        if (table != null) {
            Iterator<Long> it = seqnoList.iterator();
            while (it.hasNext()) {
                long longValue = it.next().longValue();
                Message message = table.get(longValue);
                if (message != null) {
                    this.down_prot.down(message);
                    this.xmit_rsps_sent.increment();
                } else if (this.log.isWarnEnabled() && this.log_not_found_msgs && !this.local_addr.equals(address) && longValue > table.getLow()) {
                    this.log.warn(Util.getMessage("MessageNotFound"), this.local_addr, address, Long.valueOf(longValue));
                }
            }
        }
    }

    protected void deliverMessage(Message message, Address address, long j) {
        if (this.is_trace) {
            this.log.trace("%s: delivering %s#%s", this.local_addr, address, Long.valueOf(j));
        }
        try {
            this.up_prot.up(message);
        } catch (Throwable th) {
            Log log = this.log;
            String message2 = Util.getMessage("FailedToDeliverMsg");
            Object[] objArr = new Object[4];
            objArr[0] = this.local_addr;
            objArr[1] = message.isFlagSet(Message.Flag.OOB) ? "OOB message" : "message";
            objArr[2] = message;
            objArr[3] = th;
            log.warn(message2, objArr);
        }
    }

    protected void deliverBatch(MessageBatch messageBatch) {
        try {
            if (messageBatch.isEmpty()) {
                return;
            }
            if (this.is_trace) {
                Message first = messageBatch.first();
                Message last = messageBatch.last();
                StringBuilder sb = new StringBuilder(this.local_addr + ": delivering");
                if (first != null && last != null) {
                    sb.append(" #").append(((UnicastHeader3) first.getHeader(this.id)).seqno).append(" - #").append(((UnicastHeader3) last.getHeader(this.id)).seqno);
                }
                sb.append(" (" + messageBatch.size()).append(" messages)");
                this.log.trace(sb);
            }
            this.up_prot.up(messageBatch);
        } catch (Throwable th) {
            this.log.warn(Util.getMessage("FailedToDeliverMsg"), this.local_addr, "batch", messageBatch, th);
        }
    }

    protected long getTimestamp() {
        return this.time_service.timestamp();
    }

    protected void startRetransmitTask() {
        if (this.xmit_task == null || this.xmit_task.isDone()) {
            this.xmit_task = this.timer.scheduleWithFixedDelay(new RetransmitTask(), 0L, this.xmit_interval, TimeUnit.MILLISECONDS, this.sends_can_block);
        }
    }

    protected void stopRetransmitTask() {
        if (this.xmit_task != null) {
            this.xmit_task.cancel(true);
            this.xmit_task = null;
        }
    }

    protected void sendAck(Address address, long j, short s) {
        if (this.running) {
            Message putHeader = new Message(address).setFlag(Message.Flag.INTERNAL).putHeader(this.id, UnicastHeader3.createAckHeader(j, s, this.timestamper.incrementAndGet()));
            if (this.is_trace) {
                this.log.trace("%s --> ACK(%s: #%d)", this.local_addr, address, Long.valueOf(j));
            }
            try {
                this.down_prot.down(putHeader);
                this.num_acks_sent++;
            } catch (Throwable th) {
                this.log.error(Util.getMessage("FailedSendingAck"), this.local_addr, Long.valueOf(j), address, th);
            }
        }
    }

    protected synchronized short getNewConnectionId() {
        short s = this.last_conn_id;
        if (this.last_conn_id >= Short.MAX_VALUE || this.last_conn_id < 0) {
            this.last_conn_id = (short) 0;
        } else {
            this.last_conn_id = (short) (this.last_conn_id + 1);
        }
        return s;
    }

    protected void sendRequestForFirstSeqno(Address address) {
        if (this.last_sync_sent.addIfAbsentOrExpired(address)) {
            Message putHeader = new Message(address).setFlag(Message.Flag.OOB).putHeader(this.id, UnicastHeader3.createSendFirstSeqnoHeader(this.timestamper.incrementAndGet()));
            this.log.trace("%s --> SEND_FIRST_SEQNO(%s)", this.local_addr, address);
            this.down_prot.down(putHeader);
        }
    }

    public void sendClose(Address address, short s) {
        Message putHeader = new Message(address).setFlag(Message.Flag.INTERNAL).putHeader(this.id, UnicastHeader3.createCloseHeader(s));
        this.log.trace("%s --> CLOSE(%s, conn-id=%d)", this.local_addr, address, Short.valueOf(s));
        this.down_prot.down(putHeader);
    }

    @ManagedOperation(description = "Closes connections that have been idle for more than conn_expiry_timeout ms")
    public void closeIdleConnections() {
        for (Map.Entry<Address, SenderEntry> entry : this.send_table.entrySet()) {
            SenderEntry value = entry.getValue();
            if (value.state() == State.OPEN) {
                long age = value.age();
                if (age >= this.conn_expiry_timeout) {
                    this.log.debug("%s: closing expired connection for %s (%d ms old) in send_table", this.local_addr, entry.getKey(), Long.valueOf(age));
                    closeSendConnection(entry.getKey());
                }
            }
        }
        for (Map.Entry<Address, ReceiverEntry> entry2 : this.recv_table.entrySet()) {
            ReceiverEntry value2 = entry2.getValue();
            if (value2.state() == State.OPEN) {
                long age2 = value2.age();
                if (age2 >= this.conn_expiry_timeout) {
                    this.log.debug("%s: closing expired connection for %s (%d ms old) in recv_table", this.local_addr, entry2.getKey(), Long.valueOf(age2));
                    closeReceiveConnection(entry2.getKey());
                }
            }
        }
    }

    @ManagedOperation(description = "Removes connections that have been closed for more than conn_close_timeout ms")
    public int removeExpiredConnections() {
        int i = 0;
        for (Map.Entry<Address, SenderEntry> entry : this.send_table.entrySet()) {
            SenderEntry value = entry.getValue();
            if (value.state() != State.OPEN) {
                long age = value.age();
                if (age >= this.conn_close_timeout) {
                    this.log.debug("%s: removing expired connection for %s (%d ms old) from send_table", this.local_addr, entry.getKey(), Long.valueOf(age));
                    removeSendConnection(entry.getKey());
                    i++;
                }
            }
        }
        for (Map.Entry<Address, ReceiverEntry> entry2 : this.recv_table.entrySet()) {
            ReceiverEntry value2 = entry2.getValue();
            if (value2.state() != State.OPEN) {
                long age2 = value2.age();
                if (age2 >= this.conn_close_timeout) {
                    this.log.debug("%s: removing expired connection for %s (%d ms old) from recv_table", this.local_addr, entry2.getKey(), Long.valueOf(age2));
                    removeReceiveConnection(entry2.getKey());
                    i++;
                }
            }
        }
        return i;
    }

    @ManagedOperation(description = "Removes send- and/or receive-connections whose state is not OPEN (CLOSING or CLOSED)")
    public int removeConnections(boolean z, boolean z2) {
        int i = 0;
        if (z) {
            for (Map.Entry<Address, SenderEntry> entry : this.send_table.entrySet()) {
                SenderEntry value = entry.getValue();
                if (value.state() != State.OPEN) {
                    this.log.debug("%s: removing connection for %s (%d ms old, state=%s) from send_table", this.local_addr, entry.getKey(), Long.valueOf(value.age()), value.state());
                    removeSendConnection(entry.getKey());
                    i++;
                }
            }
        }
        if (z2) {
            for (Map.Entry<Address, ReceiverEntry> entry2 : this.recv_table.entrySet()) {
                ReceiverEntry value2 = entry2.getValue();
                if (value2.state() != State.OPEN) {
                    this.log.debug("%s: removing expired connection for %s (%d ms old, state=%s) from recv_table", this.local_addr, entry2.getKey(), Long.valueOf(value2.age()), value2.state());
                    removeReceiveConnection(entry2.getKey());
                    i++;
                }
            }
        }
        return i;
    }

    protected void update(Entry entry, int i) {
        if (this.conn_expiry_timeout > 0) {
            entry.update();
        }
        if (entry.state() == State.CLOSING) {
            entry.state(State.OPEN);
        }
        this.num_msgs_received += i;
    }

    protected static int compare(int i, int i2) {
        int i3 = i - i2;
        if (i3 < 0) {
            return -1;
        }
        return i3 > 0 ? 1 : 0;
    }

    @SafeVarargs
    protected static int accumulate(ToIntFunction<Table> toIntFunction, Collection<? extends Entry>... collectionArr) {
        Stream filter = Stream.of((Object[]) collectionArr).flatMap((v0) -> {
            return v0.stream();
        }).map(entry -> {
            return entry.msgs;
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        });
        toIntFunction.getClass();
        return filter.mapToInt((v1) -> {
            return r1.applyAsInt(v1);
        }).sum();
    }

    @ManagedOperation(description = "Triggers the retransmission task")
    public void triggerXmit() {
        SeqnoList missing;
        for (Map.Entry<Address, ReceiverEntry> entry : this.recv_table.entrySet()) {
            Address key = entry.getKey();
            ReceiverEntry value = entry.getValue();
            Table<Message> table = value != null ? value.msgs : null;
            if (table != null && value.sendAck()) {
                sendAck(key, table.getHighestDeliverable(), value.connId());
            }
            if (table != null && table.getNumMissing() > 0 && (missing = table.getMissing(this.max_xmit_req_size)) != null) {
                long last = missing.getLast();
                Long l = this.xmit_task_map.get(key);
                if (l == null) {
                    this.xmit_task_map.put(key, Long.valueOf(last));
                } else {
                    missing.removeHigherThan(l.longValue());
                    if (last > l.longValue()) {
                        this.xmit_task_map.put(key, Long.valueOf(last));
                    }
                    if (!missing.isEmpty()) {
                        retransmit(missing, key);
                    }
                }
            } else if (!this.xmit_task_map.isEmpty()) {
                this.xmit_task_map.remove(key);
            }
        }
        Iterator<SenderEntry> it = this.send_table.values().iterator();
        while (it.hasNext()) {
            SenderEntry next = it.next();
            Table<Message> table2 = next != null ? next.msgs : null;
            if (table2 != null) {
                long highestDelivered = table2.getHighestDelivered();
                long highestReceived = table2.getHighestReceived();
                if (highestDelivered < highestReceived && next.watermark[0] == highestDelivered && next.watermark[1] == highestReceived) {
                    Message message = table2.get(highestReceived);
                    if (message != null) {
                        retransmit(message);
                    }
                } else {
                    next.watermark(highestDelivered, highestReceived);
                }
            }
        }
        if (this.conn_expiry_timeout > 0) {
            closeIdleConnections();
        }
        if (this.conn_close_timeout > 0) {
            removeExpiredConnections();
        }
    }

    @ManagedOperation(description = "Sends ACKs immediately for entries which are marked as pending (ACK hasn't been sent yet)")
    public void sendPendingAcks() {
        for (Map.Entry<Address, ReceiverEntry> entry : this.recv_table.entrySet()) {
            Address key = entry.getKey();
            ReceiverEntry value = entry.getValue();
            Table<Message> table = value != null ? value.msgs : null;
            if (table != null && value.sendAck()) {
                sendAck(key, table.getHighestDeliverable(), value.connId());
            }
        }
    }
}
