RustyNail

coder. 【blog】https://rustynail.me 【nostr】wss://ts.relays.world/ wss://relays.world/nostr

Traffic control for Netty channels

To limit or monitor the transfer rate of each individual Channel, use ChannelTrafficShapingHandler.

ChannelTrafficShapingHandler

ChannelTrafficShapingHandler works on a single Channel; one instance cannot monitor several channels at once.

From the source Javadoc:

Note that this handler has a Pipeline Coverage of "one" which means a new handler must be created
for each new channel as the counter cannot be shared among all channels

Once the channel is established, create a ChannelTrafficShapingHandler and add it at the head of the pipeline. To monitor traffic across all channels, use GlobalTrafficShapingHandler instead.

Traffic statistics

By subclassing it, we can collect traffic statistics.

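import io.netty.channel.Channel;
import io.netty.handler.traffic.ChannelTrafficShapingHandler;
import io.netty.handler.traffic.TrafficCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author dqn
 * created at 2019/5/3 0:52
 */
public class MyChannelCounter extends ChannelTrafficShapingHandler {
    private static final Logger logger = LoggerFactory.getLogger(MyChannelCounter.class);
    // The channel this handler instance is attached to (one handler per channel)
    private final Channel channel;

    public MyChannelCounter(Channel channel, long writeLimit, long readLimit, long checkInterval, long maxTime) {
        super(writeLimit, readLimit, checkInterval, maxTime);
        this.channel = channel;
    }

    // Called once per checkInterval with the counters of the interval that just ended
    @Override
    protected void doAccounting(TrafficCounter counter) {
        logger.info("outer[{}] wrote bytes: {}, read bytes: {}", channel.id().asShortText(),
                counter.lastWrittenBytes(), counter.lastReadBytes());
        // do something
        super.doAccounting(counter);
    }
}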

// In channelActive(ctx):

// 1 GiB/s
long limit = 1024L * 1024 * 1024;
ctx.pipeline().addFirst(TRAFFIC_COUNTER, new MyChannelCounter(ctx.channel(), limit, limit, 1000, 30000));

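The constructor parameters set the write and read limits (bytes per second), the accounting interval (milliseconds), and the maximum time (milliseconds) that traffic may be delayed when it exceeds the limit.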
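doAccounting runs once per check interval, and lastWrittenBytes() / lastReadBytes() report the bytes seen during the interval that just ended, so dividing by the interval length gives a rate. Below is a minimal sketch of that arithmetic. ThroughputMath and its methods are hypothetical helpers, not part of Netty (TrafficCounter itself also exposes lastWriteThroughput() and lastReadThroughput()).

```java
/** Hypothetical helper for turning per-interval byte counts into rates; not part of Netty. */
final class ThroughputMath {
    private ThroughputMath() {
    }

    /** Returns the rate in bytes per second for byteCount bytes observed over intervalMillis. */
    static long bytesPerSecond(long byteCount, long intervalMillis) {
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("intervalMillis must be positive");
        }
        return byteCount * 1000 / intervalMillis;
    }

    /** Returns how much of the configured limit was used, as a whole percentage (0 if unlimited). */
    static long percentOfLimit(long byteCount, long intervalMillis, long limitBytesPerSecond) {
        if (limitBytesPerSecond <= 0) {
            return 0; // a limit of 0 means "no limit", so utilisation is not meaningful
        }
        return bytesPerSecond(byteCount, intervalMillis) * 100 / limitBytesPerSecond;
    }
}
```

For example, 512 KiB written during a 1000 ms interval is ThroughputMath.bytesPerSecond(512 * 1024, 1000), which is 524288 bytes/s, or 50 percent of a 1 MiB/s limit.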

Rate limiting

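Traffic shaping works by computing, for the configured limit, how long the data seen so far should have taken to transfer. If the channel is ahead of that schedule, the transfer is suspended: on the write side the message is wrapped in the ToSend inner class and placed in a queue.

A delayed task is then scheduled so that the channel resumes transferring once the excess time has elapsed.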

    @Override
    public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
            throws Exception {
        long size = calculateSize(msg);
        long now = TrafficCounter.milliSecondFromNano();
        if (size > 0) {
            // Compute how long this write has to wait
            long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
            if (wait >= MINIMAL_WAIT) {
                // Suspend the write: the message is queued and written after `wait` ms
                submitWrite(ctx, msg, size, wait, now, promise);
                return;
            }
        }
        // to maintain order of write
        submitWrite(ctx, msg, size, 0, now, promise);
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
        long size = calculateSize(msg);
        long now = TrafficCounter.milliSecondFromNano();
        if (size > 0) {
            // Compute how long reading must pause before it resumes
            long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);
            wait = checkWaitReadTime(ctx, wait, now);
            if (wait >= MINIMAL_WAIT) {
                // ... (omitted) ...
                if (config.isAutoRead() && isHandlerActive(ctx)) {
                    // Mark reading as suspended
                    channel.attr(READ_SUSPENDED).set(true);
                    // Task that re-enables reading
                    Attribute<Runnable> attr = channel.attr(REOPEN_TASK);
                    Runnable reopenTask = attr.get();
                    if (reopenTask == null) {
                        reopenTask = new ReopenReadTimerTask(ctx);
                        attr.set(reopenTask);
                    }
                    // Schedule it to run after the wait
                    ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
                    // ... (omitted) ...
                }
            }
        }
        // ... (omitted) ...
    }


    @Override
    void submitWrite(final ChannelHandlerContext ctx, final Object msg,
            final long size, final long delay, final long now,
            final ChannelPromise promise) {
        final ToSend newToSend;
        // write order control
        synchronized (this) {
            // No delay and nothing queued: write immediately
            if (delay == 0 && messagesQueue.isEmpty()) {
                trafficCounter.bytesRealWriteFlowControl(size);
                ctx.write(msg, promise);
                return;
            }
            newToSend = new ToSend(delay + now, msg, promise);
            messagesQueue.addLast(newToSend);
            queueSize += size;
            checkWriteSuspend(ctx, delay, queueSize);
        }
        final long futureNow = newToSend.relativeTimeAction;
        // Schedule a delayed task that flushes the queued messages that are due
        ctx.executor().schedule(new Runnable() {
            @Override
            public void run() {
                sendAllValid(ctx, futureNow);
            }
        }, delay, TimeUnit.MILLISECONDS);
    }
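
The queue-and-delay behaviour of submitWrite above can be modelled without Netty. The sketch below is a simplified, single-threaded stand-in and not the Netty implementation: DelayedWriteQueue and its methods are hypothetical names, strings stand in for messages, and the caller passes the clock value instead of scheduling a real timer. It shows why a message with zero delay is still queued when earlier messages are pending, which is what keeps writes in order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Simplified model of a delayed-write queue (hypothetical, not Netty code). */
final class DelayedWriteQueue {
    /** A queued message and the earliest time (ms) at which it may be written. */
    private static final class Pending {
        final long releaseAt;
        final String msg;

        Pending(long releaseAt, String msg) {
            this.releaseAt = releaseAt;
            this.msg = msg;
        }
    }

    private final ArrayDeque<Pending> queue = new ArrayDeque<>();
    private final List<String> written = new ArrayList<>();

    /** Writes at once only if there is no delay and nothing is queued; otherwise enqueues. */
    void submit(String msg, long delayMillis, long now) {
        if (delayMillis == 0 && queue.isEmpty()) {
            written.add(msg);
            return;
        }
        queue.addLast(new Pending(now + delayMillis, msg));
    }

    /** Writes, in FIFO order, every queued message whose release time has passed. */
    void sendAllValid(long now) {
        while (!queue.isEmpty() && queue.peekFirst().releaseAt <= now) {
            written.add(queue.pollFirst().msg);
        }
    }

    /** Messages written so far, in write order. */
    List<String> written() {
        return written;
    }

    /** Number of messages still waiting in the queue. */
    int pending() {
        return queue.size();
    }
}
```

In this model a message submitted with delay 0 behind a delayed one waits until the head of the queue is released, so the write order always matches the submit order.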

Computing the wait time

The wait-time computation is a sizeable chunk of code; the read and write versions are almost identical.

public long readTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
        // Atomically add size to the read byte counters (AtomicLong, CAS-based)
        bytesRecvFlowControl(size);
        // Nothing to wait for if there is no data or no limit is set
        if (size == 0 || limitTraffic == 0) {
            return 0;
        }
        final long lastTimeCheck = lastTime.get();
        long sum = currentReadBytes.get();
        long localReadingTime = readingTime;
        long lastRB = lastReadBytes;
        final long interval = now - lastTimeCheck;
        long pastDelay = Math.max(lastReadingTime - lastTimeCheck, 0);
        if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
            // Enough interval time to compute shaping
            long time = sum * 1000 / limitTraffic - interval + pastDelay;
            if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
                if (time > maxTime && now + time - localReadingTime > maxTime) {
                    time = maxTime;
                }
                readingTime = Math.max(localReadingTime, now + time);
                return time;
            }
            readingTime = Math.max(localReadingTime, now);
            return 0;
        }
        // take the last read interval check to get enough interval time
        long lastsum = sum + lastRB;
        long lastinterval = interval + checkInterval.get();
        long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
        if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
            
            if (time > maxTime && now + time - localReadingTime > maxTime) {
                time = maxTime;
            }
            readingTime = Math.max(localReadingTime, now + time);
            return time;
        }
        readingTime = Math.max(localReadingTime, now);
        return 0;
    }
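
The core of readTimeToWait is the line `time = sum * 1000 / limitTraffic - interval + pastDelay`: the time the bytes should have taken at the limit, minus the time that has actually passed, plus any delay still owed. The sketch below isolates that arithmetic under simplifying assumptions: WaitTimeMath is a hypothetical class, it covers only the branch where enough time has elapsed in the current window, it always caps at maxTime instead of also checking the previously scheduled reading time, and it assumes a minimal wait of 10 ms.

```java
/** Worked version of the wait-time formula (hypothetical helper, not Netty code). */
final class WaitTimeMath {
    /** Waits at or below this many milliseconds are ignored; 10 ms is an assumption here. */
    static final long MINIMAL_WAIT = 10;

    private WaitTimeMath() {
    }

    /**
     * Returns how many milliseconds to pause so that bytesInWindow, transferred over
     * elapsedMillis, does not exceed limitBytesPerSec on average.
     */
    static long waitMillis(long bytesInWindow, long limitBytesPerSec, long elapsedMillis,
                           long pastDelayMillis, long maxTimeMillis) {
        if (bytesInWindow == 0 || limitBytesPerSec == 0) {
            return 0; // no data, or no limit configured
        }
        // Time the data should have taken at the limit, minus time already spent,
        // plus delay carried over from an earlier wait.
        long time = bytesInWindow * 1000 / limitBytesPerSec - elapsedMillis + pastDelayMillis;
        if (time <= MINIMAL_WAIT) {
            return 0; // under the limit, or too short a wait to bother with
        }
        return Math.min(time, maxTimeMillis); // simplified cap
    }
}
```

With a 1 MiB/s limit, 2 MiB read in the first 500 ms should have taken 2000 ms, so waitMillis(2097152, 1048576, 500, 0, 15000) yields 1500. Reading only 100 KiB in the same 500 ms yields 0, since the channel is under the limit.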