實現對每一條 Channel 的連接速度的控制或監控。可以使用ChannelTrafficShapingHandler
實現。
ChannelTrafficShapingHandler#
ChannelTrafficShapingHandler 面向的是一條獨立的 Channel,他是不能同時監控多個 channel 的。
源碼註釋
Note that this handler has a Pipeline Coverage of "one" which means a new handler must be created
for each new channel as the counter cannot be shared among all channels
在 channel 建立完畢的時候,創建一個ChannelTrafficShapingHandler
扔到 Pipline 的 First 就好了。如果要檢測全局的可以使用GlobalTrafficShapingHandler
流量統計#
通過繼承它,我們可以實現流量統計。
/**
* @author dqn
* created at 2019/5/3 0:52
*/
public class MyChannelCounter extends ChannelTrafficShapingHandler {
Logger logger = LoggerFactory.getLogger(MyChannelCounter.class);
private Channel channel;
public MyChannelCounter(Channel channel, long writeLimit, long readLimit, long checkInterval, long maxTime) {
super(writeLimit, readLimit, checkInterval, maxTime);
this.channel = channel;
}
@Override
protected void doAccounting(TrafficCounter counter) {
logger.info("outer[{}] wrote bytes: {}, read bytes: {}", channel.id().asShortText(),
counter.lastWrittenBytes(), counter.lastReadBytes());
// do something
super.doAccounting(counter);
}
}
///channel active
// 1 * 1024 * 1024 * 1024
long limit = 1073741824;
ctx.pipeline().addFirst(TRAFFIC_COUNTER, new MyChannelCounter(ctx.channel(), limit, limit, 1000, 30000));
可以限制讀寫速度、統計的間隔、流量的最大擁塞時間。
限流#
流量整形就是計算在限流情況下,計算現有的數據傳輸時間。在超出的情況下把 channel 停止,放到一個隊列裡(包裝成ToSend
內部類) 傳輸
然後啟動一個延時 Task, 在超出的時間之後讓 channel 繼續傳輸。
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
throws Exception {
long size = calculateSize(msg);
long now = TrafficCounter.milliSecondFromNano();
if (size > 0) {
// 計算寫等待時間
long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
if (wait >= MINIMAL_WAIT) {
// 這行會掛起channel的寫,然後wait時間之後繼續寫
submitWrite(ctx, msg, size, wait, now, promise);
return;
}
}
// to maintain order of write
submitWrite(ctx, msg, size, 0, now, promise);
}
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
long size = calculateSize(msg);
long now = TrafficCounter.milliSecondFromNano();
if (size > 0) {
// 在wait時間之後繼續傳輸
long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);
wait = checkWaitReadTime(ctx, wait, now);
if (wait >= MINIMAL_WAIT) {
////
//略
////
if (config.isAutoRead() && isHandlerActive(ctx)) {
// 掛起
channel.attr(READ_SUSPENDED).set(true);
// 重開的task
Attribute<Runnable> attr = channel.attr(REOPEN_TASK);
Runnable reopenTask = attr.get();
if (reopenTask == null) {
reopenTask = new ReopenReadTimerTask(ctx);
attr.set(reopenTask);
}
// 延時啟動
ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
////
//略
////
}
}
}
////
//略
////
}
@Override
void submitWrite(final ChannelHandlerContext ctx, final Object msg,
final long size, final long delay, final long now,
final ChannelPromise promise) {
final ToSend newToSend;
// write order control
synchronized (this) {
// 直接寫,延時為零
if (delay == 0 && messagesQueue.isEmpty()) {
trafficCounter.bytesRealWriteFlowControl(size);
ctx.write(msg, promise);
return;
}
newToSend = new ToSend(delay + now, msg, promise);
messagesQueue.addLast(newToSend);
queueSize += size;
checkWriteSuspend(ctx, delay, queueSize);
}
final long futureNow = newToSend.relativeTimeAction;
// 一個延時任務
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
sendAllValid(ctx, futureNow);
}
}, delay, TimeUnit.MILLISECONDS);
}
等待時間計算#
這個時間計算很大一坨,讀取的和寫的時間計算差不多。
public long readTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
// CAS(compare and set)確保size的大小正確
bytesRecvFlowControl(size);
// 數據量為零就不用等
if (size == 0 || limitTraffic == 0) {
return 0;
}
final long lastTimeCheck = lastTime.get();
long sum = currentReadBytes.get();
long localReadingTime = readingTime;
long lastRB = lastReadBytes;
final long interval = now - lastTimeCheck;
long pastDelay = Math.max(lastReadingTime - lastTimeCheck, 0);
if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
// Enough interval time to compute shaping
long time = sum * 1000 / limitTraffic - interval + pastDelay;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (time > maxTime && now + time - localReadingTime > maxTime) {
time = maxTime;
}
readingTime = Math.max(localReadingTime, now + time);
return time;
}
readingTime = Math.max(localReadingTime, now);
return 0;
}
// take the last read interval check to get enough interval time
long lastsum = sum + lastRB;
long lastinterval = interval + checkInterval.get();
long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (time > maxTime && now + time - localReadingTime > maxTime) {
time = maxTime;
}
readingTime = Math.max(localReadingTime, now + time);
return time;
}
readingTime = Math.max(localReadingTime, now);
return 0;
}