Control or monitor the connection speed of each Channel. This can be achieved using ChannelTrafficShapingHandler
.
ChannelTrafficShapingHandler#
ChannelTrafficShapingHandler is designed for a single Channel and cannot monitor multiple channels simultaneously.
Source Code Comment
Note that this handler has a Pipeline Coverage of "one" which means a new handler must be created
for each new channel as the counter cannot be shared among all channels
When the channel is established, create a ChannelTrafficShapingHandler
and add it to the Pipeline's First. If you want to monitor globally, you can use GlobalTrafficShapingHandler
.
Traffic Statistics#
By inheriting it, we can implement traffic statistics.
/**
* @author dqn
* created at 2019/5/3 0:52
*/
public class MyChannelCounter extends ChannelTrafficShapingHandler {
Logger logger = LoggerFactory.getLogger(MyChannelCounter.class);
private Channel channel;
public MyChannelCounter(Channel channel, long writeLimit, long readLimit, long checkInterval, long maxTime) {
super(writeLimit, readLimit, checkInterval, maxTime);
this.channel = channel;
}
@Override
protected void doAccounting(TrafficCounter counter) {
logger.info("outer[{}] wrote bytes: {}, read bytes: {}", channel.id().asShortText(),
counter.lastWrittenBytes(), counter.lastReadBytes());
// do something
super.doAccounting(counter);
}
}
///channel active
// 1 * 1024 * 1024 * 1024
long limit = 1073741824;
ctx.pipeline().addFirst(TRAFFIC_COUNTER, new MyChannelCounter(ctx.channel(), limit, limit, 1000, 30000));
You can limit read and write speeds, the interval for statistics, and the maximum congestion time for traffic.
Rate Limiting#
Traffic shaping calculates the existing data transmission time under rate limiting conditions. When exceeded, the channel is stopped and placed in a queue (wrapped as an internal class ToSend
) for transmission.
Then, a delayed task is started to allow the channel to continue transmission after the exceeded time.
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
throws Exception {
long size = calculateSize(msg);
long now = TrafficCounter.milliSecondFromNano();
if (size > 0) {
// Calculate write wait time
long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
if (wait >= MINIMAL_WAIT) {
// This line will suspend the channel's write, then continue writing after the wait time
submitWrite(ctx, msg, size, wait, now, promise);
return;
}
}
// to maintain order of write
submitWrite(ctx, msg, size, 0, now, promise);
}
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
long size = calculateSize(msg);
long now = TrafficCounter.milliSecondFromNano();
if (size > 0) {
// Continue transmission after wait time
long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);
wait = checkWaitReadTime(ctx, wait, now);
if (wait >= MINIMAL_WAIT) {
////
// omitted
////
if (config.isAutoRead() && isHandlerActive(ctx)) {
// Suspend
channel.attr(READ_SUSPENDED).set(true);
// Reopen task
Attribute<Runnable> attr = channel.attr(REOPEN_TASK);
Runnable reopenTask = attr.get();
if (reopenTask == null) {
reopenTask = new ReopenReadTimerTask(ctx);
attr.set(reopenTask);
}
// Delayed start
ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
////
// omitted
////
}
}
}
////
// omitted
////
}
@Override
void submitWrite(final ChannelHandlerContext ctx, final Object msg,
final long size, final long delay, final long now,
final ChannelPromise promise) {
final ToSend newToSend;
// write order control
synchronized (this) {
// Directly write, delay is zero
if (delay == 0 && messagesQueue.isEmpty()) {
trafficCounter.bytesRealWriteFlowControl(size);
ctx.write(msg, promise);
return;
}
newToSend = new ToSend(delay + now, msg, promise);
messagesQueue.addLast(newToSend);
queueSize += size;
checkWriteSuspend(ctx, delay, queueSize);
}
final long futureNow = newToSend.relativeTimeAction;
// A delayed task
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
sendAllValid(ctx, futureNow);
}
}, delay, TimeUnit.MILLISECONDS);
}
Wait Time Calculation#
This time calculation is quite large, and the read and write time calculations are similar.
public long readTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
// CAS (compare and set) ensures the size is correct
bytesRecvFlowControl(size);
// No need to wait if the data amount is zero
if (size == 0 || limitTraffic == 0) {
return 0;
}
final long lastTimeCheck = lastTime.get();
long sum = currentReadBytes.get();
long localReadingTime = readingTime;
long lastRB = lastReadBytes;
final long interval = now - lastTimeCheck;
long pastDelay = Math.max(lastReadingTime - lastTimeCheck, 0);
if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
// Enough interval time to compute shaping
long time = sum * 1000 / limitTraffic - interval + pastDelay;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (time > maxTime && now + time - localReadingTime > maxTime) {
time = maxTime;
}
readingTime = Math.max(localReadingTime, now + time);
return time;
}
readingTime = Math.max(localReadingTime, now);
return 0;
}
// Take the last read interval check to get enough interval time
long lastsum = sum + lastRB;
long lastinterval = interval + checkInterval.get();
long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (time > maxTime && now + time - localReadingTime > maxTime) {
time = maxTime;
}
readingTime = Math.max(localReadingTime, now + time);
return time;
}
readingTime = Math.max(localReadingTime, now);
return 0;
}