各 Channel の接続速度の制御または監視を実現します。ChannelTrafficShapingHandler
を使用して実現できます。
ChannelTrafficShapingHandler#
ChannelTrafficShapingHandler は独立した Channel を対象としており、複数の channel を同時に監視することはできません。
ソースコードのコメント
このハンドラは「1」のパイプラインカバレッジを持っているため、カウンタはすべてのチャネルで共有できないため、新しいチャネルごとに新しいハンドラを作成する必要があります。
channel が作成されたときに、ChannelTrafficShapingHandler
を作成し、Pipeline の最初に追加するだけで済みます。全体を監視したい場合は、GlobalTrafficShapingHandler
を使用できます。
流量統計#
それを継承することで、流量統計を実現できます。
/**
* @author dqn
* created at 2019/5/3 0:52
*/
public class MyChannelCounter extends ChannelTrafficShapingHandler {
Logger logger = LoggerFactory.getLogger(MyChannelCounter.class);
private Channel channel;
public MyChannelCounter(Channel channel, long writeLimit, long readLimit, long checkInterval, long maxTime) {
super(writeLimit, readLimit, checkInterval, maxTime);
this.channel = channel;
}
@Override
protected void doAccounting(TrafficCounter counter) {
logger.info("outer[{}] wrote bytes: {}, read bytes: {}", channel.id().asShortText(),
counter.lastWrittenBytes(), counter.lastReadBytes());
// 何かをする
super.doAccounting(counter);
}
}
///channel active
// 1 * 1024 * 1024 * 1024
long limit = 1073741824;
ctx.pipeline().addFirst(TRAFFIC_COUNTER, new MyChannelCounter(ctx.channel(), limit, limit, 1000, 30000));
読み書き速度、統計の間隔、流量の最大混雑時間を制限できます。
限流#
流量整形は、限流の状況下で既存のデータ転送時間を計算することです。超過した場合、channel を停止し、キューに入れます(ToSend
内部クラスにラップ)。
その後、遅延タスクを起動し、超過時間の後に channel が転送を再開できるようにします。
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
throws Exception {
long size = calculateSize(msg);
long now = TrafficCounter.milliSecondFromNano();
if (size > 0) {
// 書き込み待機時間を計算
long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
if (wait >= MINIMAL_WAIT) {
// この行はchannelの書き込みを一時停止し、wait時間後に再開します
submitWrite(ctx, msg, size, wait, now, promise);
return;
}
}
// 書き込みの順序を維持するため
submitWrite(ctx, msg, size, 0, now, promise);
}
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
long size = calculateSize(msg);
long now = TrafficCounter.milliSecondFromNano();
if (size > 0) {
// wait時間後に転送を再開
long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);
wait = checkWaitReadTime(ctx, wait, now);
if (wait >= MINIMAL_WAIT) {
////
//省略
////
if (config.isAutoRead() && isHandlerActive(ctx)) {
// 一時停止
channel.attr(READ_SUSPENDED).set(true);
// 再開タスク
Attribute<Runnable> attr = channel.attr(REOPEN_TASK);
Runnable reopenTask = attr.get();
if (reopenTask == null) {
reopenTask = new ReopenReadTimerTask(ctx);
attr.set(reopenTask);
}
// 遅延起動
ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
////
//省略
////
}
}
}
////
//省略
////
}
@Override
void submitWrite(final ChannelHandlerContext ctx, final Object msg,
final long size, final long delay, final long now,
final ChannelPromise promise) {
final ToSend newToSend;
// 書き込み順序制御
synchronized (this) {
// 直接書き込み、遅延はゼロ
if (delay == 0 && messagesQueue.isEmpty()) {
trafficCounter.bytesRealWriteFlowControl(size);
ctx.write(msg, promise);
return;
}
newToSend = new ToSend(delay + now, msg, promise);
messagesQueue.addLast(newToSend);
queueSize += size;
checkWriteSuspend(ctx, delay, queueSize);
}
final long futureNow = newToSend.relativeTimeAction;
// 遅延タスク
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
sendAllValid(ctx, futureNow);
}
}, delay, TimeUnit.MILLISECONDS);
}
待機時間計算#
この時間計算は非常に大きな塊で、読み取りと書き込みの時間計算はほぼ同じです。
public long readTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
// CAS(compare and set)でsizeの大きさを正しく保つ
bytesRecvFlowControl(size);
// データ量がゼロの場合は待つ必要はありません
if (size == 0 || limitTraffic == 0) {
return 0;
}
final long lastTimeCheck = lastTime.get();
long sum = currentReadBytes.get();
long localReadingTime = readingTime;
long lastRB = lastReadBytes;
final long interval = now - lastTimeCheck;
long pastDelay = Math.max(lastReadingTime - lastTimeCheck, 0);
if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
// 整形を計算するのに十分な間隔時間
long time = sum * 1000 / limitTraffic - interval + pastDelay;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (time > maxTime && now + time - localReadingTime > maxTime) {
time = maxTime;
}
readingTime = Math.max(localReadingTime, now + time);
return time;
}
readingTime = Math.max(localReadingTime, now);
return 0;
}
// 最後の読み取り間隔チェックを利用して十分な間隔時間を取得
long lastsum = sum + lastRB;
long lastinterval = interval + checkInterval.get();
long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (time > maxTime && now + time - localReadingTime > maxTime) {
time = maxTime;
}
readingTime = Math.max(localReadingTime, now + time);
return time;
}
readingTime = Math.max(localReadingTime, now);
return 0;
}