banner
RustyNail

RustyNail

coder. 【blog】https://rustynail.me 【nostr】wss://ts.relays.world/ wss://relays.world/nostr

Traffic Control of Netty Channel

Control or monitor the connection speed of each Channel. This can be achieved using ChannelTrafficShapingHandler.

ChannelTrafficShapingHandler#

ChannelTrafficShapingHandler is designed for a single Channel and cannot monitor multiple channels simultaneously.

Source Code Comment

Note that this handler has a Pipeline Coverage of "one" which means a new handler must be created
for each new channel as the counter cannot be shared among all channels

When the channel is established, create a ChannelTrafficShapingHandler and add it to the Pipeline's First. If you want to monitor globally, you can use GlobalTrafficShapingHandler.

Traffic Statistics#

By inheriting it, we can implement traffic statistics.

/**
 * @author dqn
 * created at 2019/5/3 0:52
 */
public class MyChannelCounter extends ChannelTrafficShapingHandler {
    Logger logger = LoggerFactory.getLogger(MyChannelCounter.class);
    private Channel channel;

    public MyChannelCounter(Channel channel, long writeLimit, long readLimit, long checkInterval, long maxTime) {
        super(writeLimit, readLimit, checkInterval, maxTime);
        this.channel = channel;
    }

    @Override
    protected void doAccounting(TrafficCounter counter) {
        logger.info("outer[{}] wrote bytes: {}, read bytes: {}", channel.id().asShortText(),
                counter.lastWrittenBytes(), counter.lastReadBytes());
        // do something 
        super.doAccounting(counter);
    }
}

///channel active

// 1 * 1024 * 1024 * 1024
long limit = 1073741824;
ctx.pipeline().addFirst(TRAFFIC_COUNTER, new MyChannelCounter(ctx.channel(), limit, limit, 1000, 30000));

You can limit read and write speeds, the interval for statistics, and the maximum congestion time for traffic.

Rate Limiting#

Traffic shaping calculates the existing data transmission time under rate limiting conditions. When exceeded, the channel is stopped and placed in a queue (wrapped as an internal class ToSend) for transmission.

Then, a delayed task is started to allow the channel to continue transmission after the exceeded time.

@Override
    public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
            throws Exception {
        long size = calculateSize(msg);
        long now = TrafficCounter.milliSecondFromNano();
        if (size > 0) {
            // Calculate write wait time
            long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
            if (wait >= MINIMAL_WAIT) {
                // This line will suspend the channel's write, then continue writing after the wait time
                submitWrite(ctx, msg, size, wait, now, promise);
                return;
            }
        }
        // to maintain order of write
        submitWrite(ctx, msg, size, 0, now, promise);
    }
@Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
        long size = calculateSize(msg);
        long now = TrafficCounter.milliSecondFromNano();
        if (size > 0) {
            // Continue transmission after wait time
            long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);
            wait = checkWaitReadTime(ctx, wait, now);
            if (wait >= MINIMAL_WAIT) {
                ////
                // omitted
                ////
                if (config.isAutoRead() && isHandlerActive(ctx)) {
                    // Suspend
                    channel.attr(READ_SUSPENDED).set(true);
                    // Reopen task
                    Attribute<Runnable> attr = channel.attr(REOPEN_TASK);
                    Runnable reopenTask = attr.get();
                    if (reopenTask == null) {
                        reopenTask = new ReopenReadTimerTask(ctx);
                        attr.set(reopenTask);
                    }
                    // Delayed start
                    ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
                    ////
                    // omitted
                    ////
                }
            }
        }
        ////
        // omitted
        ////
    }


    @Override
    void submitWrite(final ChannelHandlerContext ctx, final Object msg,
            final long size, final long delay, final long now,
            final ChannelPromise promise) {
        final ToSend newToSend;
        // write order control
        synchronized (this) {
            // Directly write, delay is zero
            if (delay == 0 && messagesQueue.isEmpty()) {
                trafficCounter.bytesRealWriteFlowControl(size);
                ctx.write(msg, promise);
                return;
            }
            newToSend = new ToSend(delay + now, msg, promise);
            messagesQueue.addLast(newToSend);
            queueSize += size;
            checkWriteSuspend(ctx, delay, queueSize);
        }
        final long futureNow = newToSend.relativeTimeAction;
        // A delayed task
        ctx.executor().schedule(new Runnable() {
            @Override
            public void run() {
                sendAllValid(ctx, futureNow);
            }
        }, delay, TimeUnit.MILLISECONDS);
    }

Wait Time Calculation#

This time calculation is quite large, and the read and write time calculations are similar.

public long readTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
        // CAS (compare and set) ensures the size is correct
        bytesRecvFlowControl(size);
        // No need to wait if the data amount is zero
        if (size == 0 || limitTraffic == 0) {
            return 0;
        }
        final long lastTimeCheck = lastTime.get();
        long sum = currentReadBytes.get();
        long localReadingTime = readingTime;
        long lastRB = lastReadBytes;
        final long interval = now - lastTimeCheck;
        long pastDelay = Math.max(lastReadingTime - lastTimeCheck, 0);
        if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
            // Enough interval time to compute shaping
            long time = sum * 1000 / limitTraffic - interval + pastDelay;
            if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
                if (time > maxTime && now + time - localReadingTime > maxTime) {
                    time = maxTime;
                }
                readingTime = Math.max(localReadingTime, now + time);
                return time;
            }
            readingTime = Math.max(localReadingTime, now);
            return 0;
        }
        // Take the last read interval check to get enough interval time
        long lastsum = sum + lastRB;
        long lastinterval = interval + checkInterval.get();
        long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
        if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
            
            if (time > maxTime && now + time - localReadingTime > maxTime) {
                time = maxTime;
            }
            readingTime = Math.max(localReadingTime, now + time);
            return time;
        }
        readingTime = Math.max(localReadingTime, now);
        return 0;
    }
Loading...
Ownership of this post data is guaranteed by blockchain and smart contracts to the creator alone.