banner
RustyNail

RustyNail

coder. 【blog】https://rustynail.me 【nostr】wss://ts.relays.world/ wss://relays.world/nostr

Nettyチャネルの流量制御

各 Channel の接続速度の制御または監視を実現します。ChannelTrafficShapingHandlerを使用して実現できます。

ChannelTrafficShapingHandler#

ChannelTrafficShapingHandler は独立した Channel を対象としており、複数の channel を同時に監視することはできません。

ソースコードのコメント

このハンドラは「1」のパイプラインカバレッジを持っているため、カウンタはすべてのチャネルで共有できないため、新しいチャネルごとに新しいハンドラを作成する必要があります。

channel が作成されたときに、ChannelTrafficShapingHandlerを作成し、Pipeline の最初に追加するだけで済みます。全体を監視したい場合は、GlobalTrafficShapingHandlerを使用できます。

流量統計#

それを継承することで、流量統計を実現できます。

/**
 * @author dqn
 * created at 2019/5/3 0:52
 */
public class MyChannelCounter extends ChannelTrafficShapingHandler {
    Logger logger = LoggerFactory.getLogger(MyChannelCounter.class);
    private Channel channel;

    public MyChannelCounter(Channel channel, long writeLimit, long readLimit, long checkInterval, long maxTime) {
        super(writeLimit, readLimit, checkInterval, maxTime);
        this.channel = channel;
    }

    @Override
    protected void doAccounting(TrafficCounter counter) {
        logger.info("outer[{}] wrote bytes: {}, read bytes: {}", channel.id().asShortText(),
                counter.lastWrittenBytes(), counter.lastReadBytes());
        // 何かをする 
        super.doAccounting(counter);
    }
}

///channel active

// 1 * 1024 * 1024 * 1024
long limit = 1073741824;
ctx.pipeline().addFirst(TRAFFIC_COUNTER, new MyChannelCounter(ctx.channel(), limit, limit, 1000, 30000));

読み書き速度、統計の間隔、流量の最大混雑時間を制限できます。

限流#

流量整形は、限流の状況下で既存のデータ転送時間を計算することです。超過した場合、channel を停止し、キューに入れます(ToSend内部クラスにラップ)。

その後、遅延タスクを起動し、超過時間の後に channel が転送を再開できるようにします。

@Override
    public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
            throws Exception {
        long size = calculateSize(msg);
        long now = TrafficCounter.milliSecondFromNano();
        if (size > 0) {
            // 書き込み待機時間を計算
            long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
            if (wait >= MINIMAL_WAIT) {
                // この行はchannelの書き込みを一時停止し、wait時間後に再開します
                submitWrite(ctx, msg, size, wait, now, promise);
                return;
            }
        }
        // 書き込みの順序を維持するため
        submitWrite(ctx, msg, size, 0, now, promise);
    }
@Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
        long size = calculateSize(msg);
        long now = TrafficCounter.milliSecondFromNano();
        if (size > 0) {
            // wait時間後に転送を再開
            long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);
            wait = checkWaitReadTime(ctx, wait, now);
            if (wait >= MINIMAL_WAIT) {
                ////
                //省略
                ////
                if (config.isAutoRead() && isHandlerActive(ctx)) {
                    // 一時停止
                    channel.attr(READ_SUSPENDED).set(true);
                    // 再開タスク
                    Attribute<Runnable> attr = channel.attr(REOPEN_TASK);
                    Runnable reopenTask = attr.get();
                    if (reopenTask == null) {
                        reopenTask = new ReopenReadTimerTask(ctx);
                        attr.set(reopenTask);
                    }
                    // 遅延起動
                    ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
                    ////
                    //省略
                    ////
                }
            }
        }
        ////
        //省略
        ////
    }


    @Override
    void submitWrite(final ChannelHandlerContext ctx, final Object msg,
            final long size, final long delay, final long now,
            final ChannelPromise promise) {
        final ToSend newToSend;
        // 書き込み順序制御
        synchronized (this) {
            // 直接書き込み、遅延はゼロ
            if (delay == 0 && messagesQueue.isEmpty()) {
                trafficCounter.bytesRealWriteFlowControl(size);
                ctx.write(msg, promise);
                return;
            }
            newToSend = new ToSend(delay + now, msg, promise);
            messagesQueue.addLast(newToSend);
            queueSize += size;
            checkWriteSuspend(ctx, delay, queueSize);
        }
        final long futureNow = newToSend.relativeTimeAction;
        // 遅延タスク
        ctx.executor().schedule(new Runnable() {
            @Override
            public void run() {
                sendAllValid(ctx, futureNow);
            }
        }, delay, TimeUnit.MILLISECONDS);
    }

待機時間計算#

この時間計算は非常に大きな塊で、読み取りと書き込みの時間計算はほぼ同じです。

public long readTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
        // CAS(compare and set)でsizeの大きさを正しく保つ
        bytesRecvFlowControl(size);
        // データ量がゼロの場合は待つ必要はありません
        if (size == 0 || limitTraffic == 0) {
            return 0;
        }
        final long lastTimeCheck = lastTime.get();
        long sum = currentReadBytes.get();
        long localReadingTime = readingTime;
        long lastRB = lastReadBytes;
        final long interval = now - lastTimeCheck;
        long pastDelay = Math.max(lastReadingTime - lastTimeCheck, 0);
        if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
            // 整形を計算するのに十分な間隔時間
            long time = sum * 1000 / limitTraffic - interval + pastDelay;
            if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
                if (time > maxTime && now + time - localReadingTime > maxTime) {
                    time = maxTime;
                }
                readingTime = Math.max(localReadingTime, now + time);
                return time;
            }
            readingTime = Math.max(localReadingTime, now);
            return 0;
        }
        // 最後の読み取り間隔チェックを利用して十分な間隔時間を取得
        long lastsum = sum + lastRB;
        long lastinterval = interval + checkInterval.get();
        long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
        if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
            
            if (time > maxTime && now + time - localReadingTime > maxTime) {
                time = maxTime;
            }
            readingTime = Math.max(localReadingTime, now + time);
            return time;
        }
        readingTime = Math.max(localReadingTime, now);
        return 0;
    }
読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。