banner
RustyNail

RustyNail

coder. 【blog】https://rustynail.me 【nostr】wss://ts.relays.world/ wss://relays.world/nostr

Netty channel的流量控制

實現對每一條 Channel 的連接速度的控制或監控。可以使用ChannelTrafficShapingHandler實現。

ChannelTrafficShapingHandler#

ChannelTrafficShapingHandler 面向的是一條獨立的 Channel,他是不能同時監控多個 channel 的。

源碼註釋

Note that this handler has a Pipeline Coverage of "one" which means a new handler must be created
for each new channel as the counter cannot be shared among all channels

在 channel 建立完畢的時候,創建一個ChannelTrafficShapingHandler扔到 Pipline 的 First 就好了。如果要檢測全局的可以使用GlobalTrafficShapingHandler

流量統計#

通過繼承它,我們可以實現流量統計。

/**
 * @author dqn
 * created at 2019/5/3 0:52
 */
public class MyChannelCounter extends ChannelTrafficShapingHandler {
    Logger logger = LoggerFactory.getLogger(MyChannelCounter.class);
    private Channel channel;

    public MyChannelCounter(Channel channel, long writeLimit, long readLimit, long checkInterval, long maxTime) {
        super(writeLimit, readLimit, checkInterval, maxTime);
        this.channel = channel;
    }

    @Override
    protected void doAccounting(TrafficCounter counter) {
        logger.info("outer[{}] wrote bytes: {}, read bytes: {}", channel.id().asShortText(),
                counter.lastWrittenBytes(), counter.lastReadBytes());
        // do something 
        super.doAccounting(counter);
    }
}

///channel active

// 1 * 1024 * 1024 * 1024
long limit = 1073741824;
ctx.pipeline().addFirst(TRAFFIC_COUNTER, new MyChannelCounter(ctx.channel(), limit, limit, 1000, 30000));

可以限制讀寫速度、統計的間隔、流量的最大擁塞時間。

限流#

流量整形就是計算在限流情況下,計算現有的數據傳輸時間。在超出的情況下把 channel 停止,放到一個隊列裡(包裝成ToSend內部類) 傳輸

然後啟動一個延時 Task, 在超出的時間之後讓 channel 繼續傳輸。

@Override
    public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
            throws Exception {
        long size = calculateSize(msg);
        long now = TrafficCounter.milliSecondFromNano();
        if (size > 0) {
            // 計算寫等待時間
            long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
            if (wait >= MINIMAL_WAIT) {
                // 這行會掛起channel的寫,然後wait時間之後繼續寫
                submitWrite(ctx, msg, size, wait, now, promise);
                return;
            }
        }
        // to maintain order of write
        submitWrite(ctx, msg, size, 0, now, promise);
    }
@Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
        long size = calculateSize(msg);
        long now = TrafficCounter.milliSecondFromNano();
        if (size > 0) {
            // 在wait時間之後繼續傳輸
            long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);
            wait = checkWaitReadTime(ctx, wait, now);
            if (wait >= MINIMAL_WAIT) {
                ////
                //略
                ////
                if (config.isAutoRead() && isHandlerActive(ctx)) {
                    // 掛起
                    channel.attr(READ_SUSPENDED).set(true);
                    // 重開的task
                    Attribute<Runnable> attr = channel.attr(REOPEN_TASK);
                    Runnable reopenTask = attr.get();
                    if (reopenTask == null) {
                        reopenTask = new ReopenReadTimerTask(ctx);
                        attr.set(reopenTask);
                    }
                    // 延時啟動
                    ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
                    ////
                    //略
                    ////
                }
            }
        }
        ////
        //略
        ////
    }


    @Override
    void submitWrite(final ChannelHandlerContext ctx, final Object msg,
            final long size, final long delay, final long now,
            final ChannelPromise promise) {
        final ToSend newToSend;
        // write order control
        synchronized (this) {
            // 直接寫,延時為零
            if (delay == 0 && messagesQueue.isEmpty()) {
                trafficCounter.bytesRealWriteFlowControl(size);
                ctx.write(msg, promise);
                return;
            }
            newToSend = new ToSend(delay + now, msg, promise);
            messagesQueue.addLast(newToSend);
            queueSize += size;
            checkWriteSuspend(ctx, delay, queueSize);
        }
        final long futureNow = newToSend.relativeTimeAction;
        // 一個延時任務
        ctx.executor().schedule(new Runnable() {
            @Override
            public void run() {
                sendAllValid(ctx, futureNow);
            }
        }, delay, TimeUnit.MILLISECONDS);
    }

等待時間計算#

這個時間計算很大一坨,讀取的和寫的時間計算差不多。

public long readTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
        // CAS(compare and set)確保size的大小正確
        bytesRecvFlowControl(size);
        // 數據量為零就不用等
        if (size == 0 || limitTraffic == 0) {
            return 0;
        }
        final long lastTimeCheck = lastTime.get();
        long sum = currentReadBytes.get();
        long localReadingTime = readingTime;
        long lastRB = lastReadBytes;
        final long interval = now - lastTimeCheck;
        long pastDelay = Math.max(lastReadingTime - lastTimeCheck, 0);
        if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
            // Enough interval time to compute shaping
            long time = sum * 1000 / limitTraffic - interval + pastDelay;
            if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
                if (time > maxTime && now + time - localReadingTime > maxTime) {
                    time = maxTime;
                }
                readingTime = Math.max(localReadingTime, now + time);
                return time;
            }
            readingTime = Math.max(localReadingTime, now);
            return 0;
        }
        // take the last read interval check to get enough interval time
        long lastsum = sum + lastRB;
        long lastinterval = interval + checkInterval.get();
        long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
        if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
            
            if (time > maxTime && now + time - localReadingTime > maxTime) {
                time = maxTime;
            }
            readingTime = Math.max(localReadingTime, now + time);
            return time;
        }
        readingTime = Math.max(localReadingTime, now);
        return 0;
    }
載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。