netty3.6.2 En el proceso de escritura de datos y cómo manejar los datos escritos después de escribirlos
Código Java
public void eventSunk(
Canalización de ChannelPipeline , ChannelEvent e) arroja una excepción {
if (e instancia de ChannelStateEvent) {
. .....
} else if (e instancia de MessageEvent) {
Evento MessageEvent = (MessageEvent) e;
Canal NioSocketChannel = (NioSocketChannel) evento .getChannel();
booleano ofrecido = canal.writeBufferQueue.offer(event);//Escribe el writeBufferQueue del canal
assert ofrecido;
canal .worker.writeFromUserCode(channel);
}
}
El método de oferta de WriteRequestQueue determina si se ha procesado en función del tamaño total del mensaje almacenado en caché. bytes. Se superó el nivel de agua alto. Si excede highWaterMark por primera vez, se producirá fireChannelInterestChanged; si los datos aún se colocan en la cola en el futuro y el tamaño de los mensajes almacenados en caché continúa excediendo la marca highWater, no se producirá fireChannelInterestChanged.
Código Java
oferta pública booleana(MessageEvent e) {
éxito booleano = queue.offer(e);
afirmar éxito ;
int messageSize = getMessageSize(e);
int newWriteBufferSize = writeBufferSize.
addAndGet(messageSize);
int highWaterMark = getConfig(). getWriteBufferHighWaterMark();
if (newWriteBufferSize >= highWaterMark) {
if (newWriteBufferSize - messageSize <.highWaterMark) {
highWaterMarkCounter.incrementAndGet();
if (!notifying.get()) {
notifying.set(Boolean.TRUE) ;
fireChannelInterestChanged(AbstractNioChannel.this);
notifying.set(Boolean.FALSE);
f (newWriteBuffSize - messageSize <.FALSE); p> p>
}
}
}
}
devuelve verdadero)
} p>
fireChannelInterestChanged Este método llamará a SimpleChannelUpstreamHandler.handleUpstream, activando SimpleChannelUpstreamHandler.channelInterestChanged. El valor alto de la marca de agua se puede establecer a través de Bootstrap, que eventualmente llamará al método DefaultNioSocketChannelConfig.setOption.writeBufferHighWaterMark.
El valor predeterminado de writeBufferHighWaterMark es 64K
Código Java
setOption público booleano (clave de cadena, valor del objeto) {
if (super.setOption(clave, valor ) ) {
return true;
}
if (" writeBufferHighWaterMark".equals(key)) {
setWriteBufferHighWaterMark0(ConversionUtil . toInt(valor));
} else if (" writeBufferLowWaterMark".equals(key)) {
setWriteBufferLowWaterMark0(ConversionUtil.toInt(valor));
} else if (" writeSpinCount".equals(key)) {
setWriteSpinCount(ConversionUtil.toInt(value));
} else if (" recibeBufferSizePredictorFactory".equals( clave )) {
setReceiveBufferSizePredictorFactory((ReceiveBufferSizePredictorFactory) valor);
} else if ("receiveBufferSizePredictor".equals(key)) {
setReceiveBufferSizePredictor(( RecibirBufferSizePredictor ) valor);
} else {
devuelve falso;
}
devuelve verdadero;
}
Luego, en write0, extraerá los datos de la cola y, mientras extrae los datos, si se descubre que la extracción actual hará que el tamaño de los datos almacenados en caché (en bytes) cambie desde más alto que Si el writeBufferLowWaterMark de la marca de nivel bajo cae por debajo de la marca de nivel bajo, es decir, cruza la marca de nivel bajo, el evento fireChannelInterestChanged se activará nuevamente.
El valor predeterminado de writeBufferLowWaterMark es 32K
Código Java
Código Java
public MessageEvent poll() {
MessageEvent e = queue.poll();
if (e != null) {
int messageSize = getMessageSize(e);
int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize); p >
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
if (newWriteBufferSize + messageSize >= lowWaterMark ) {// De esta manera, el tamaño de los datos almacenados en caché será menor que la marca de límite inferior
highWaterMarkCounter.decreaseAndGet();
Si el tamaño de los datos almacenados en caché es menor que el marca de agua baja, los datos almacenados en caché se eliminarán.
disminuirAndGet();
if (isConnected() && !notifying.get()) {
notificar.set(Boolean.TRUE);
fireChannelInterestChanged( AbstractNioChannel.
notificando.set(Boolean.FALSE);
}
}
}
}
}
return e;
}
Tanto exceder el nivel de agua alto como caer por debajo del nivel de agua bajo desencadenar fireChannelInterestChanged. ¿Cómo distinguir la tela de lana? A través de AbstractChannel.isWritable(), si OP_WRITE está registrado en interestOps del canal, entonces no se puede escribir; de lo contrario, se puede escribir
Código Java
public boolean isWritable() { p>
return (getInterestOps() & OP_WRITE) == 0;
}
public int getInterestOps() {
if(! isOpen( )) {
return Channel.OP_ WRITE;
}
int interestOps = getRawInterestOps();
int writeBufferSize = esto. writeBufferSize.get();
if ( writeBufferSize ! = 0) {
if (highWaterMarkCounter.get() > 0) {// Recuerda este valor cuando Cuando se ingresan datos la cola de envío, el valor es +=1, cuando los datos se extraen de la cola, el valor es -=1
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
if (writeBufferSize >= lowWaterMark) {//La cantidad de datos en la cola de caché que excede la marca de límite superior también excede la marca de límite inferior, lo que significa que la marca de límite superior > la marca de límite inferior es igual a que se realizó una operación de escritura registrado en este momento
interestOps |= Channel.OP_WRITE;
} else {
interestOps &= ~Channel.OP_WRITE;
} más {
interestOps &= ~Channel.OP_WRITE;
} más {
interestOps &= ~Channel.OP_WRITE;
}.OP_WRITE;// cola de caché La cantidad de datos excede el nivel máximo pero también excede el nivel bajo, lo que significa nivel bajo > nivel alto En este momento, significa que no se registra ninguna operación de escritura
}<. /p>
} else {//Excede el nivel máximo de agua Contador de agua <= 0, lo que significa que el volumen de datos actual es menor que el nivel máximo de agua
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
if (writeBufferSize >= highWaterMark) {// aquí, la cantidad de datos almacenados en caché todavía está por encima de la marca máxima..... ¿Simultaneidad? Por definición,
El procesamiento de canales es un procesamiento de un solo subproceso, lo que equivale a registrar operaciones de escritura
interestOps |= Channel.OP_WRITE;
} else {
interestOps &= ~ Canal .OP_WRITE;
}
}
} más {
interestOps &= ~Channel.OP_WRITE;//la cola de escritura es vacío, no se registró ninguna operación de escritura
}
return interestOps;
}
Significa que si se excede el nivel alto de agua, se puede escribir. ()== falso, más bajo que el nivel de agua bajo esWritable()==verdadero, el nivel de agua bajo tiene mayor prioridad que el nivel de agua alto, es decir, si el agua actual > el nivel de agua bajo no se puede escribir, de lo contrario se puede escribir
Si los datos se escriben en la máquina a través de netty, pero la velocidad de escritura es muy lenta, lo que hará que todos los datos se almacenen en caché en la cola de envío de netty. Si no se controla, puede provocar. gc/cms gc completo frecuente, e incluso OOM final Utilice AbstractChannel.isWritable Para controlar si se deben continuar escribiendo datos, descarte los datos si AbstractChannel.isWritable==false, o registre el estado de los datos enviados y envíelos nuevamente después del. El nivel de agua de la cola de datos de caché posterior cae a un nivel seguro.