要对每一条 Channel 的连接速度进行控制或监控,可以使用 ChannelTrafficShapingHandler
来实现。
ChannelTrafficShapingHandler#
ChannelTrafficShapingHandler 面向的是一条独立的 Channel,它不能同时监控多个 channel。
源码注释
Note that this handler has a Pipeline Coverage of "one" which means a new handler must be created
for each new channel as the counter cannot be shared among all channels
在 channel 建立完毕的时候,创建一个 ChannelTrafficShapingHandler,
添加到 Pipeline 的最前面(addFirst)就好了。如果要监控全局流量,可以使用 GlobalTrafficShapingHandler。
流量统计#
通过继承它,我们可以实现流量统计。
/**
 * Per-channel traffic shaping handler that additionally logs the bytes written and read
 * each time the accounting callback fires.
 *
 * <p>One instance must be created per channel; the channel is kept only so that its
 * short id can be included in the log line.
 *
 * @author dqn
 * created at 2019/5/3 0:52
 */
public class MyChannelCounter extends ChannelTrafficShapingHandler {
    // Shared by all instances: a handler is created for every channel, so a per-instance
    // logger field would only add one redundant reference per connection.
    private static final Logger logger = LoggerFactory.getLogger(MyChannelCounter.class);

    // Assigned once in the constructor and never changed afterwards.
    private final Channel channel;

    /**
     * Creates a counter bound to the given channel.
     *
     * @param channel       the channel this handler is installed on; used for logging only
     * @param writeLimit    write limit forwarded unchanged to the superclass
     * @param readLimit     read limit forwarded unchanged to the superclass
     * @param checkInterval accounting interval forwarded unchanged to the superclass
     * @param maxTime       maximum wait time forwarded unchanged to the superclass
     */
    public MyChannelCounter(Channel channel, long writeLimit, long readLimit, long checkInterval, long maxTime) {
        super(writeLimit, readLimit, checkInterval, maxTime);
        this.channel = channel;
    }

    /**
     * Logs the counter's last written and last read byte counts for this channel, then
     * delegates to the superclass implementation.
     *
     * @param counter the traffic counter supplied by the shaping handler
     */
    @Override
    protected void doAccounting(TrafficCounter counter) {
        logger.info("outer[{}] wrote bytes: {}, read bytes: {}", channel.id().asShortText(),
                counter.lastWrittenBytes(), counter.lastReadBytes());
        // do something
        super.doAccounting(counter);
    }
}
// On channel active: install the per-channel counter at the head of the pipeline.
// The same limit (1 * 1024 * 1024 * 1024) is used for both writing and reading.
final long limit = 1024L * 1024 * 1024;
ctx.pipeline().addFirst(
        TRAFFIC_COUNTER,
        new MyChannelCounter(ctx.channel(), limit, limit, 1000, 30000));
可以限制读写速度、统计的间隔、流量的最大拥塞时间。
限流#
流量整形就是在限流的情况下,计算当前数据按限速传输所需要的等待时间。在超出限速的情况下先把 channel 暂停,
把待发送的消息包装成内部类 ToSend 放到一个队列里,
然后启动一个延时 Task,在等待时间结束之后让 channel 继续传输。
/**
 * Intercepts an outbound write and hands it to {@code submitWrite} together with the
 * delay that should be applied to it.
 *
 * <p>For a non-empty message the traffic counter is asked how long the write has to
 * wait; only a wait of at least {@code MINIMAL_WAIT} is honoured. Every message,
 * delayed or not, goes through {@code submitWrite} so that write order is maintained.
 *
 * @param ctx     the handler context of the channel being written to
 * @param msg     the outbound message
 * @param promise the promise associated with this write
 * @throws Exception declared by the overridden method
 */
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
        throws Exception {
    final long size = calculateSize(msg);
    final long now = TrafficCounter.milliSecondFromNano();

    // Zero means "no delay requested"; replaced below only when the computed wait
    // reaches the minimal threshold.
    long delay = 0;
    if (size > 0) {
        final long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
        if (wait >= MINIMAL_WAIT) {
            delay = wait;
        }
    }

    // Single exit point: delayed and undelayed writes take the same path to keep ordering.
    submitWrite(ctx, msg, size, delay, now, promise);
}
/**
 * Read-side counterpart of the shaping logic (abridged excerpt; omitted parts are marked).
 *
 * For a non-empty message it obtains a wait time from the traffic counter, adjusts it via
 * checkWaitReadTime, and when that wait is at least MINIMAL_WAIT and auto-read is enabled
 * on an active handler, marks the channel as read-suspended and schedules a reopen task.
 *
 * NOTE(review): `config` and `channel` are not declared in the visible lines; presumably
 * they are locals set up in the omitted section - confirm against the full source.
 */
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
long size = calculateSize(msg);
long now = TrafficCounter.milliSecondFromNano();
if (size > 0) {
// Ask the counter how long reading should pause; transfer continues after this wait.
long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);
wait = checkWaitReadTime(ctx, wait, now);
if (wait >= MINIMAL_WAIT) {
// ... (omitted in this excerpt) ...
if (config.isAutoRead() && isHandlerActive(ctx)) {
// Mark the channel as read-suspended.
channel.attr(READ_SUSPENDED).set(true);
// Fetch the reopen task cached in the channel attribute, creating it on first use.
Attribute<Runnable> attr = channel.attr(REOPEN_TASK);
Runnable reopenTask = attr.get();
if (reopenTask == null) {
reopenTask = new ReopenReadTimerTask(ctx);
attr.set(reopenTask);
}
// Run the reopen task after `wait` milliseconds.
ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
// ... (omitted in this excerpt) ...
}
}
}
// ... (omitted in this excerpt) ...
}
/**
 * Writes the message immediately when no delay is requested and nothing is queued;
 * otherwise enqueues it and schedules a task that flushes the queue after the delay.
 *
 * @param ctx     the handler context used for writing and for scheduling
 * @param msg     the outbound message
 * @param size    the size of the message as computed by the caller
 * @param delay   how long to wait before sending, in milliseconds; 0 for no delay
 * @param now     the caller's current timestamp, used to derive the send time
 * @param promise the promise associated with this write
 */
@Override
void submitWrite(final ChannelHandlerContext ctx, final Object msg,
                 final long size, final long delay, final long now,
                 final ChannelPromise promise) {
    final ToSend pending;
    // The lock guards the queue and its size so that write order is preserved.
    synchronized (this) {
        final boolean canWriteNow = delay == 0 && messagesQueue.isEmpty();
        if (canWriteNow) {
            // Fast path: no delay and nothing ahead of this message.
            trafficCounter.bytesRealWriteFlowControl(size);
            ctx.write(msg, promise);
            return;
        }
        pending = new ToSend(delay + now, msg, promise);
        messagesQueue.addLast(pending);
        queueSize += size;
        checkWriteSuspend(ctx, delay, queueSize);
    }

    // Scheduling happens outside the lock; the task sends whatever is due at that time.
    final long dueTime = pending.relativeTimeAction;
    final Runnable flushTask = new Runnable() {
        @Override
        public void run() {
            sendAllValid(ctx, dueTime);
        }
    };
    ctx.executor().schedule(flushTask, delay, TimeUnit.MILLISECONDS);
}
等待时间计算#
这个时间计算很大一坨,读取的和写的时间计算差不多。
/**
 * Records {@code size} incoming bytes and returns how long reading should wait so that
 * the traffic stays within {@code limitTraffic}.
 *
 * <p>If enough time has elapsed since the last check, only the current window is used;
 * otherwise the previous window's bytes and the check interval are added in to obtain a
 * long enough interval. The result is capped at {@code maxTime} and {@code readingTime}
 * is advanced accordingly.
 *
 * @param size         number of bytes just received
 * @param limitTraffic the traffic limit; 0 yields no wait
 * @param maxTime      upper bound applied to the computed wait
 * @param now          the caller's current timestamp
 * @return the wait time, or 0 when no wait is required
 */
public long readTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
    // Always account for the received bytes, even when no wait ends up being needed.
    bytesRecvFlowControl(size);
    if (size == 0 || limitTraffic == 0) {
        return 0;
    }

    final long lastCheck = lastTime.get();
    final long currentBytes = currentReadBytes.get();
    final long previousReadingTime = readingTime;
    final long previousBytes = lastReadBytes;
    final long elapsed = now - lastCheck;
    final long pastDelay = Math.max(lastReadingTime - lastCheck, 0);

    final long bytes;
    final long window;
    if (elapsed > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
        // Enough interval time to compute shaping from the current window alone.
        bytes = currentBytes;
        window = elapsed;
    } else {
        // Take the last read interval check as well to get enough interval time.
        bytes = currentBytes + previousBytes;
        window = elapsed + checkInterval.get();
    }

    long delay = bytes * 1000 / limitTraffic - window + pastDelay;
    if (delay <= AbstractTrafficShapingHandler.MINIMAL_WAIT) {
        // Below the threshold: no wait, but never move readingTime backwards.
        readingTime = Math.max(previousReadingTime, now);
        return 0;
    }

    if (delay > maxTime && now + delay - previousReadingTime > maxTime) {
        delay = maxTime;
    }
    readingTime = Math.max(previousReadingTime, now + delay);
    return delay;
}