From d4030dd67fc5335537dc684f9fdd83a7c5fbf4ff Mon Sep 17 00:00:00 2001 From: Open592 Developer Date: Sun, 27 Oct 2024 03:03:42 -0700 Subject: [PATCH 1/6] protocol: Second version of this Currently working on implementation Signed-off-by: Open592 Developer --- .../net/NetworkChannelInitializer.kt | 31 ++++- .../fileserver/net/js5/Js5ChannelHandler.kt | 70 ++++++++++ .../open592/fileserver/net/js5/Js5Client.kt | 44 ++++++ .../open592/fileserver/net/js5/Js5Service.kt | 131 ++++++++++++++++++ .../protocol/inbound/Js5InboundMessage.kt | 18 +++ .../inbound/Js5InboundMessageDecoder.kt | 97 +++++++++++++ .../Js5ClientIsOutOfDateMessageEncoder.kt | 10 ++ .../outbound/Js5GroupMessageEncoder.kt | 55 ++++++++ .../outbound/Js5IpLimitMessageEncoder.kt | 8 ++ .../protocol/outbound/Js5OkMessageEncoder.kt | 8 ++ .../protocol/outbound/Js5OutboundMessage.kt | 16 +++ .../outbound/Js5ServerFullMessageEncoder.kt | 8 ++ .../SimpleOpcodeMessageToByteEncoder.kt | 26 ++++ .../fileserver/server/FileServerModule.kt | 2 + 14 files changed, 522 insertions(+), 2 deletions(-) create mode 100644 src/main/kotlin/com/open592/fileserver/net/js5/Js5ChannelHandler.kt create mode 100644 src/main/kotlin/com/open592/fileserver/net/js5/Js5Client.kt create mode 100644 src/main/kotlin/com/open592/fileserver/net/js5/Js5Service.kt create mode 100644 src/main/kotlin/com/open592/fileserver/protocol/inbound/Js5InboundMessage.kt create mode 100644 src/main/kotlin/com/open592/fileserver/protocol/inbound/Js5InboundMessageDecoder.kt create mode 100644 src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5ClientIsOutOfDateMessageEncoder.kt create mode 100644 src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5GroupMessageEncoder.kt create mode 100644 src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5IpLimitMessageEncoder.kt create mode 100644 src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OkMessageEncoder.kt create mode 100644 src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundMessage.kt create mode 100644 src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5ServerFullMessageEncoder.kt create mode 100644 src/main/kotlin/com/open592/fileserver/protocol/outbound/SimpleOpcodeMessageToByteEncoder.kt diff --git a/src/main/kotlin/com/open592/fileserver/net/NetworkChannelInitializer.kt b/src/main/kotlin/com/open592/fileserver/net/NetworkChannelInitializer.kt index 0293851..d6ddf85 100644 --- a/src/main/kotlin/com/open592/fileserver/net/NetworkChannelInitializer.kt +++ b/src/main/kotlin/com/open592/fileserver/net/NetworkChannelInitializer.kt @@ -1,22 +1,49 @@ package com.open592.fileserver.net +import com.github.michaelbull.logging.InlineLogger +import com.open592.fileserver.net.js5.Js5ChannelHandler +import com.open592.fileserver.protocol.inbound.Js5InboundMessageDecoder +import com.open592.fileserver.protocol.outbound.Js5ClientIsOutOfDateMessageEncoder +import com.open592.fileserver.protocol.outbound.Js5GroupMessageEncoder +import com.open592.fileserver.protocol.outbound.Js5IpLimitMessageEncoder +import com.open592.fileserver.protocol.outbound.Js5OkMessageEncoder +import com.open592.fileserver.protocol.outbound.Js5ServerFullMessageEncoder import io.netty.channel.Channel +import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelInitializer import io.netty.handler.timeout.IdleStateHandler +import jakarta.inject.Inject +import jakarta.inject.Provider import jakarta.inject.Singleton import java.util.concurrent.TimeUnit @Singleton -class NetworkChannelInitializer : ChannelInitializer() { +class NetworkChannelInitializer +@Inject +constructor(private val handlerProvider: Provider) : + ChannelInitializer() { override fun initChannel(channel: Channel) { channel .pipeline() .addLast( IdleStateHandler( - true, TIMEOUT_SECONDS, TIMEOUT_SECONDS, TIMEOUT_SECONDS, TimeUnit.SECONDS)) + true, TIMEOUT_SECONDS, TIMEOUT_SECONDS, TIMEOUT_SECONDS, TimeUnit.SECONDS), + Js5InboundMessageDecoder(), + Js5OkMessageEncoder(), + Js5GroupMessageEncoder(), + Js5ClientIsOutOfDateMessageEncoder(), + Js5ServerFullMessageEncoder(), + Js5IpLimitMessageEncoder(), + handlerProvider.get(), + ) + } + + override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { + logger.error { "Caught exception: ${cause.message}" } } private companion object { private const val TIMEOUT_SECONDS: Long = 30 + private val logger = InlineLogger() } } diff --git a/src/main/kotlin/com/open592/fileserver/net/js5/Js5ChannelHandler.kt b/src/main/kotlin/com/open592/fileserver/net/js5/Js5ChannelHandler.kt new file mode 100644 index 0000000..7b5a31d --- /dev/null +++ b/src/main/kotlin/com/open592/fileserver/net/js5/Js5ChannelHandler.kt @@ -0,0 +1,70 @@ +package com.open592.fileserver.net.js5 + +import com.github.michaelbull.logging.InlineLogger +import com.open592.fileserver.configuration.ServerConfiguration +import com.open592.fileserver.protocol.inbound.Js5InboundMessage +import com.open592.fileserver.protocol.outbound.Js5OutboundMessage +import io.netty.channel.ChannelHandlerContext +import io.netty.channel.SimpleChannelInboundHandler +import io.netty.handler.timeout.IdleStateEvent +import jakarta.inject.Inject + +class Js5ChannelHandler +@Inject +constructor( + private val service: Js5Service, + private val serverConfiguration: ServerConfiguration, +) : SimpleChannelInboundHandler(Js5InboundMessage::class.java) { + private lateinit var client: Js5Client + + override fun handlerAdded(ctx: ChannelHandlerContext) { + client = Js5Client(ctx.read()) + } + + override fun channelRead0(ctx: ChannelHandlerContext, message: Js5InboundMessage) { + when (message) { + is Js5InboundMessage.InitializeJs5RemoteConnection -> + handleInitializeJs5RemoteConnection(ctx, message) + is Js5InboundMessage.RequestGroup -> service.push(client, message) + is Js5InboundMessage.ExchangeObfuscationKey -> handleExchangeObfuscationKey(message) + is Js5InboundMessage.RequestConnectionDisconnect -> ctx.close() + else -> Unit + } + } + + private fun handleInitializeJs5RemoteConnection( + ctx: ChannelHandlerContext, + message: Js5InboundMessage.InitializeJs5RemoteConnection + ) { + if (message.build != serverConfiguration.getBuildNumber()) { + ctx.write(Js5OutboundMessage.ClientIsOutOfDate) + } else { + ctx.write(Js5OutboundMessage.Ok) + } + } + + private fun handleExchangeObfuscationKey(message: Js5InboundMessage.ExchangeObfuscationKey) { + logger.info { "Handle Exchange Obfuscation Key with value = ${message.key}" } + } + + override fun channelReadComplete(ctx: ChannelHandlerContext) { + service.readIfNotFull(client) + ctx.flush() + } + + override fun channelWritabilityChanged(ctx: ChannelHandlerContext) { + if (ctx.channel().isWritable) { + service.notifyIfNotEmpty(client) + } + } + + override fun userEventTriggered(ctx: ChannelHandlerContext, event: Any) { + if (event is IdleStateEvent) { + ctx.close() + } + } + + private companion object { + private val logger = InlineLogger() + } +} diff --git a/src/main/kotlin/com/open592/fileserver/net/js5/Js5Client.kt b/src/main/kotlin/com/open592/fileserver/net/js5/Js5Client.kt new file mode 100644 index 0000000..37f9e49 --- /dev/null +++ b/src/main/kotlin/com/open592/fileserver/net/js5/Js5Client.kt @@ -0,0 +1,44 @@ +package com.open592.fileserver.net.js5 + +import com.open592.fileserver.protocol.inbound.Js5InboundMessage +import io.netty.channel.ChannelHandlerContext + +class Js5Client(val ctx: ChannelHandlerContext) { + private val urgent = ArrayDeque() + private val prefetch = ArrayDeque() + + fun push(request: Js5InboundMessage.RequestGroup) { + if (request.isPrefetch) { + prefetch += request + } else { + urgent += request + prefetch -= request + } + } + + fun pop(): Js5InboundMessage.RequestGroup? { + val request = urgent.removeFirstOrNull() + + if (request != null) { + return request + } + + return prefetch.removeFirstOrNull() + } + + fun isNotFull(): Boolean { + return urgent.size < MAX_QUEUE_SIZE && prefetch.size < MAX_QUEUE_SIZE + } + + fun isNotEmpty(): Boolean { + return urgent.isNotEmpty() || prefetch.isNotEmpty() + } + + fun isReady(): Boolean { + return ctx.channel().isWritable && isNotEmpty() + } + + private companion object { + private const val MAX_QUEUE_SIZE = 20 + } +} diff --git a/src/main/kotlin/com/open592/fileserver/net/js5/Js5Service.kt b/src/main/kotlin/com/open592/fileserver/net/js5/Js5Service.kt new file mode 100644 index 0000000..47aa976 --- /dev/null +++ b/src/main/kotlin/com/open592/fileserver/net/js5/Js5Service.kt @@ -0,0 +1,131 @@ +package com.open592.fileserver.net.js5 + +import com.displee.cache.CacheLibrary +import com.displee.compress.CompressionType +import com.displee.compress.compress +import com.displee.compress.type.EmptyCompressor +import com.google.common.util.concurrent.AbstractExecutionThreadService +import com.open592.fileserver.buffer.use +import com.open592.fileserver.collections.UniqueQueue +import com.open592.fileserver.protocol.inbound.Js5InboundMessage +import com.open592.fileserver.protocol.outbound.Js5OutboundMessage +import io.netty.buffer.ByteBufAllocator +import jakarta.inject.Inject + +class Js5Service +@Inject +constructor( + private val allocator: ByteBufAllocator, + private val cacheLibrary: CacheLibrary, +) : AbstractExecutionThreadService() { + private val lock = Object() + private val clients = UniqueQueue() + + override fun run() { + while (true) { + var client: Js5Client + var request: Js5InboundMessage.RequestGroup + + synchronized(lock) { + while (true) { + if (!isRunning) { + return + } + + val next = clients.removeFirstOrNull() + + if (next == null) { + lock.wait() + continue + } + + client = next + request = client.pop() ?: continue + + break + } + } + } + } + + private fun serve(client: Js5Client, request: Js5InboundMessage.RequestGroup) { + val ctx = client.ctx + + if (!ctx.channel().isActive) { + return + } + + val buf = + if (request.archive == ARCHIVE_SET && request.group == ARCHIVE_SET) { + allocator.buffer().use { buffer -> + val masterIndex = + cacheLibrary.generateUkeys(false).compress(CompressionType.NONE, EmptyCompressor) + + buffer.writeBytes(masterIndex) + buffer.retain() + } + } else { + allocator.buffer().use { buffer -> + val data = cacheLibrary.data(request.group, request.archive) + + buffer.writeBytes(data) + buffer.retain() + } + } + + val response = + Js5OutboundMessage.Group(request.isPrefetch, request.archive, request.group, data = buf) + + ctx.writeAndFlush(response) + + synchronized(lock) { + if (client.isReady()) { + clients.add(client) + } + + if (client.isNotFull()) { + ctx.read() + } + } + } + + fun push(client: Js5Client, request: Js5InboundMessage.RequestGroup) { + synchronized(lock) { + client.push(request) + + if (client.isReady()) { + clients.add(client) + + lock.notifyAll() + } + + if (client.isNotFull()) { + client.ctx.read() + } + } + } + + fun readIfNotFull(client: Js5Client) { + synchronized(lock) { + if (client.isNotFull()) { + client.ctx.read() + } + } + } + + fun notifyIfNotEmpty(client: Js5Client) { + synchronized(lock) { + if (client.isNotEmpty()) { + lock.notifyAll() + } + } + } + + override fun triggerShutdown() { + synchronized(lock) { lock.notifyAll() } + } + + private companion object { + private const val ARCHIVE_SET = 255 + } +} diff --git a/src/main/kotlin/com/open592/fileserver/protocol/inbound/Js5InboundMessage.kt b/src/main/kotlin/com/open592/fileserver/protocol/inbound/Js5InboundMessage.kt new file mode 100644 index 0000000..42c5c23 --- /dev/null +++ b/src/main/kotlin/com/open592/fileserver/protocol/inbound/Js5InboundMessage.kt @@ -0,0 +1,18 @@ +package com.open592.fileserver.protocol.inbound + +sealed class Js5InboundMessage { + data class InitializeJs5RemoteConnection(val build: Int) : Js5InboundMessage() + + data class RequestGroup(val archive: Int, val group: Int, val isPrefetch: Boolean) : + Js5InboundMessage() + + object InformUserIsLoggedIn : Js5InboundMessage() + + object InformUserIsLoggedOut : Js5InboundMessage() + + object InformClientIsReady : Js5InboundMessage() + + object RequestConnectionDisconnect : Js5InboundMessage() + + data class ExchangeObfuscationKey(val key: Int) : Js5InboundMessage() +} diff --git a/src/main/kotlin/com/open592/fileserver/protocol/inbound/Js5InboundMessageDecoder.kt b/src/main/kotlin/com/open592/fileserver/protocol/inbound/Js5InboundMessageDecoder.kt new file mode 100644 index 0000000..f1f1ed1 --- /dev/null +++ b/src/main/kotlin/com/open592/fileserver/protocol/inbound/Js5InboundMessageDecoder.kt @@ -0,0 +1,97 @@ +package com.open592.fileserver.protocol.inbound + +import io.netty.buffer.ByteBuf +import io.netty.channel.ChannelHandlerContext +import io.netty.handler.codec.ByteToMessageDecoder +import io.netty.handler.codec.DecoderException + +class Js5InboundMessageDecoder : ByteToMessageDecoder() { + override fun decode(ctx: ChannelHandlerContext, input: ByteBuf, output: MutableList) { + if (input.readableBytes() < 4) { + return + } + + val opcode = input.readUnsignedByte().toInt() + val message = decodeOpcode(input, opcode) + + output += message + } + + private fun decodeOpcode(input: ByteBuf, opcode: Int): Js5InboundMessage { + return when (opcode) { + 0 -> decodeRequestGroupPacket(input, isPrefetch = true) + 1 -> decodeRequestGroupPacket(input, isPrefetch = false) + 2 -> decodeInformUserIsLoggedInPacket(input) + 3 -> decodeInformUserIsLoggedOutPacket(input) + 4 -> decodeExchangeObfuscationKeyPacket(input) + 6 -> decodeInformClientIsReadyPacket(input) + 7 -> decodeRequestConnectionDisconnectPacket(input) + 15 -> decodeInitializeJs5RemoteConnectionPacket(input) + else -> throw DecoderException("Unknown Js5 inbound message opcode: $opcode") + } + } + + private fun decodeRequestGroupPacket( + input: ByteBuf, + isPrefetch: Boolean + ): Js5InboundMessage.RequestGroup { + val archive = input.readUnsignedByte().toInt() + val group = input.readUnsignedShort().toInt() + + return Js5InboundMessage.RequestGroup(archive, group, isPrefetch) + } + + private fun decodeInformUserIsLoggedInPacket( + input: ByteBuf + ): Js5InboundMessage.InformUserIsLoggedIn { + // Skip padding bytes + input.skipBytes(3) + + return Js5InboundMessage.InformUserIsLoggedIn + } + + private fun decodeInformUserIsLoggedOutPacket( + input: ByteBuf + ): Js5InboundMessage.InformUserIsLoggedOut { + // Skip padding bytes + input.skipBytes(3) + + return Js5InboundMessage.InformUserIsLoggedOut + } + + private fun decodeRequestConnectionDisconnectPacket( + input: ByteBuf + ): Js5InboundMessage.RequestConnectionDisconnect { + // Skip padding bytes + input.skipBytes(3) + + return Js5InboundMessage.RequestConnectionDisconnect + } + + private fun decodeExchangeObfuscationKeyPacket( + input: ByteBuf + ): Js5InboundMessage.ExchangeObfuscationKey { + val key = input.readUnsignedByte().toInt() + + return Js5InboundMessage.ExchangeObfuscationKey(key) + } + + private fun decodeInformClientIsReadyPacket( + input: ByteBuf + ): Js5InboundMessage.InformClientIsReady { + // Skip padding bytes + // NOTE: The client usually sends along padding bytes with `0` as the value, + // but for this message it's using `p3(3)`. + input.skipBytes(3) + + return Js5InboundMessage.InformClientIsReady + } + + private fun decodeInitializeJs5RemoteConnectionPacket( + input: ByteBuf + ): Js5InboundMessage.InitializeJs5RemoteConnection { + val build = input.readInt() + + return Js5InboundMessage.InitializeJs5RemoteConnection(build) + } +} diff --git a/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5ClientIsOutOfDateMessageEncoder.kt b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5ClientIsOutOfDateMessageEncoder.kt new file mode 100644 index 0000000..33d4727 --- /dev/null +++ b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5ClientIsOutOfDateMessageEncoder.kt @@ -0,0 +1,10 @@ +package com.open592.fileserver.protocol.outbound + +import io.netty.channel.ChannelHandler + +@ChannelHandler.Sharable +class Js5ClientIsOutOfDateMessageEncoder : + SimpleOpcodeMessageToByteEncoder( + opcode = 6, + klass = Js5OutboundMessage.ClientIsOutOfDate::class.java, + ) diff --git a/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5GroupMessageEncoder.kt b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5GroupMessageEncoder.kt new file mode 100644 index 0000000..98da4f2 --- /dev/null +++ b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5GroupMessageEncoder.kt @@ -0,0 +1,55 @@ +package com.open592.fileserver.protocol.outbound + +import io.netty.buffer.ByteBuf +import io.netty.channel.ChannelHandler +import io.netty.channel.ChannelHandlerContext +import io.netty.handler.codec.EncoderException +import io.netty.handler.codec.MessageToByteEncoder +import kotlin.math.min + +@ChannelHandler.Sharable +class Js5GroupMessageEncoder : + MessageToByteEncoder(Js5OutboundMessage.Group::class.java) { + override fun encode( + ctx: ChannelHandlerContext, + message: Js5OutboundMessage.Group, + output: ByteBuf + ) { + output.writeByte(message.archive) + output.writeShort(message.group) + + if (!message.data.isReadable) { + throw EncoderException("Missing compression byte") + } + + var compression = message.data.readUnsignedByte().toInt() + + if (message.isPrefetch) { + compression = compression or 0x80 + } + + output.writeByte(compression) + + output.writeBytes(message.data, min(message.data.readableBytes(), 508)) + + while (message.data.isReadable) { + output.writeByte(0xFF) + output.writeBytes(message.data, min(message.data.readableBytes(), 511)) + } + } + + override fun allocateBuffer( + ctx: ChannelHandlerContext, + message: Js5OutboundMessage.Group, + preferDirect: Boolean + ): ByteBuf { + val dataLength = message.data.readableBytes() + val bufferLength = 2 + dataLength + (512 + dataLength) / 511 + + return if (preferDirect) { + ctx.alloc().ioBuffer(bufferLength, bufferLength) + } else { + ctx.alloc().heapBuffer(bufferLength, bufferLength) + } + } +} diff --git a/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5IpLimitMessageEncoder.kt b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5IpLimitMessageEncoder.kt new file mode 100644 index 0000000..a193a2b --- /dev/null +++ b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5IpLimitMessageEncoder.kt @@ -0,0 +1,8 @@ +package com.open592.fileserver.protocol.outbound + +import io.netty.channel.ChannelHandler + +@ChannelHandler.Sharable +class Js5IpLimitMessageEncoder : + SimpleOpcodeMessageToByteEncoder( + opcode = 9, klass = Js5OutboundMessage.IpLimit::class.java) diff --git a/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OkMessageEncoder.kt b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OkMessageEncoder.kt new file mode 100644 index 0000000..004875f --- /dev/null +++ b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OkMessageEncoder.kt @@ -0,0 +1,8 @@ +package com.open592.fileserver.protocol.outbound + +import io.netty.channel.ChannelHandler + +@ChannelHandler.Sharable +class Js5OkMessageEncoder : + SimpleOpcodeMessageToByteEncoder( + opcode = 0, klass = Js5OutboundMessage.Ok::class.java) diff --git a/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundMessage.kt b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundMessage.kt new file mode 100644 index 0000000..c582d1a --- /dev/null +++ b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundMessage.kt @@ -0,0 +1,16 @@ +package com.open592.fileserver.protocol.outbound + +import io.netty.buffer.ByteBuf + +sealed class Js5OutboundMessage { + object Ok : Js5OutboundMessage() + + object ClientIsOutOfDate : Js5OutboundMessage() + + object ServerIsFull : Js5OutboundMessage() + + object IpLimit : Js5OutboundMessage() + + data class Group(val isPrefetch: Boolean, val archive: Int, val group: Int, val data: ByteBuf) : + Js5OutboundMessage() +} diff --git a/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5ServerFullMessageEncoder.kt b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5ServerFullMessageEncoder.kt new file mode 100644 index 0000000..c2a629d --- /dev/null +++ b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5ServerFullMessageEncoder.kt @@ -0,0 +1,8 @@ +package com.open592.fileserver.protocol.outbound + +import io.netty.channel.ChannelHandler + +@ChannelHandler.Sharable +class Js5ServerFullMessageEncoder : + SimpleOpcodeMessageToByteEncoder( + opcode = 7, klass = Js5OutboundMessage.ServerIsFull::class.java) diff --git a/src/main/kotlin/com/open592/fileserver/protocol/outbound/SimpleOpcodeMessageToByteEncoder.kt b/src/main/kotlin/com/open592/fileserver/protocol/outbound/SimpleOpcodeMessageToByteEncoder.kt new file mode 100644 index 0000000..19794cb --- /dev/null +++ b/src/main/kotlin/com/open592/fileserver/protocol/outbound/SimpleOpcodeMessageToByteEncoder.kt @@ -0,0 +1,26 @@ +package com.open592.fileserver.protocol.outbound + +import io.netty.buffer.ByteBuf +import io.netty.channel.ChannelHandlerContext +import io.netty.handler.codec.MessageToByteEncoder + +abstract class SimpleOpcodeMessageToByteEncoder( + protected val opcode: Int, + klass: Class +) : MessageToByteEncoder(klass) { + override fun encode(ctx: ChannelHandlerContext, message: T, output: ByteBuf) { + output.writeByte(opcode) + } + + override fun allocateBuffer(ctx: ChannelHandlerContext, msg: T, preferDirect: Boolean): ByteBuf { + return if (preferDirect) { + ctx.alloc().ioBuffer(BUFFER_LENGTH, BUFFER_LENGTH) + } else { + ctx.alloc().heapBuffer(BUFFER_LENGTH, BUFFER_LENGTH) + } + } + + private companion object { + private const val BUFFER_LENGTH = 4 + } +} diff --git a/src/main/kotlin/com/open592/fileserver/server/FileServerModule.kt b/src/main/kotlin/com/open592/fileserver/server/FileServerModule.kt index 978c666..47752cf 100644 --- a/src/main/kotlin/com/open592/fileserver/server/FileServerModule.kt +++ b/src/main/kotlin/com/open592/fileserver/server/FileServerModule.kt @@ -7,6 +7,7 @@ import com.open592.fileserver.buffer.BufferModule import com.open592.fileserver.cache.CacheModule import com.open592.fileserver.configuration.ServerConfigurationModule import com.open592.fileserver.net.NetworkService +import com.open592.fileserver.net.js5.Js5Service object FileServerModule : AbstractModule() { override fun configure() { @@ -16,5 +17,6 @@ object FileServerModule : AbstractModule() { val binder = Multibinder.newSetBinder(binder(), Service::class.java) binder.addBinding().to(NetworkService::class.java) + binder.addBinding().to(Js5Service::class.java) } } From ef2d60d8605da4a07b59aeb515d9d99416ee67c4 Mon Sep 17 00:00:00 2001 From: Open592 Developer Date: Sun, 27 Oct 2024 08:42:05 -0700 Subject: [PATCH 2/6] protocol: Move Js5InboundChannelHandler to protocol package Matches it's function within the pipeline Signed-off-by: Open592 Developer --- .../open592/fileserver/net/NetworkChannelInitializer.kt | 6 +++--- .../inbound/Js5InboundChannelHandler.kt} | 7 ++++--- 2 files changed, 7 insertions(+), 6 deletions(-) rename src/main/kotlin/com/open592/fileserver/{net/js5/Js5ChannelHandler.kt => protocol/inbound/Js5InboundChannelHandler.kt} (92%) diff --git a/src/main/kotlin/com/open592/fileserver/net/NetworkChannelInitializer.kt b/src/main/kotlin/com/open592/fileserver/net/NetworkChannelInitializer.kt index d6ddf85..04bfc75 100644 --- a/src/main/kotlin/com/open592/fileserver/net/NetworkChannelInitializer.kt +++ b/src/main/kotlin/com/open592/fileserver/net/NetworkChannelInitializer.kt @@ -1,7 +1,7 @@ package com.open592.fileserver.net import com.github.michaelbull.logging.InlineLogger -import com.open592.fileserver.net.js5.Js5ChannelHandler +import com.open592.fileserver.protocol.inbound.Js5InboundChannelHandler import com.open592.fileserver.protocol.inbound.Js5InboundMessageDecoder import com.open592.fileserver.protocol.outbound.Js5ClientIsOutOfDateMessageEncoder import com.open592.fileserver.protocol.outbound.Js5GroupMessageEncoder @@ -20,7 +20,7 @@ import java.util.concurrent.TimeUnit @Singleton class NetworkChannelInitializer @Inject -constructor(private val handlerProvider: Provider) : +constructor(private val js5InboundChannelHandler: Provider) : ChannelInitializer() { override fun initChannel(channel: Channel) { channel @@ -34,7 +34,7 @@ constructor(private val handlerProvider: Provider) : Js5ClientIsOutOfDateMessageEncoder(), Js5ServerFullMessageEncoder(), Js5IpLimitMessageEncoder(), - handlerProvider.get(), + js5InboundChannelHandler.get(), ) } diff --git a/src/main/kotlin/com/open592/fileserver/net/js5/Js5ChannelHandler.kt b/src/main/kotlin/com/open592/fileserver/protocol/inbound/Js5InboundChannelHandler.kt similarity index 92% rename from src/main/kotlin/com/open592/fileserver/net/js5/Js5ChannelHandler.kt rename to src/main/kotlin/com/open592/fileserver/protocol/inbound/Js5InboundChannelHandler.kt index 7b5a31d..1816abc 100644 --- a/src/main/kotlin/com/open592/fileserver/net/js5/Js5ChannelHandler.kt +++ b/src/main/kotlin/com/open592/fileserver/protocol/inbound/Js5InboundChannelHandler.kt @@ -1,15 +1,16 @@ -package com.open592.fileserver.net.js5 +package com.open592.fileserver.protocol.inbound import com.github.michaelbull.logging.InlineLogger import com.open592.fileserver.configuration.ServerConfiguration -import com.open592.fileserver.protocol.inbound.Js5InboundMessage +import com.open592.fileserver.net.js5.Js5Client +import com.open592.fileserver.net.js5.Js5Service import com.open592.fileserver.protocol.outbound.Js5OutboundMessage import io.netty.channel.ChannelHandlerContext import io.netty.channel.SimpleChannelInboundHandler import io.netty.handler.timeout.IdleStateEvent import jakarta.inject.Inject -class Js5ChannelHandler +class Js5InboundChannelHandler @Inject constructor( private val service: Js5Service, From d6b003bfa60e598693b21021a63280d001fab823 Mon Sep 17 00:00:00 2001 From: Open592 Developer Date: Thu, 31 Oct 2024 22:09:30 -0700 Subject: [PATCH 3/6] collections: Add plusAssign operator to UniqueQueue Signed-off-by: Open592 Developer --- .../kotlin/com/open592/fileserver/collections/UniqueQueue.kt | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main/kotlin/com/open592/fileserver/collections/UniqueQueue.kt b/src/main/kotlin/com/open592/fileserver/collections/UniqueQueue.kt index 12d17eb..222086d 100644 --- a/src/main/kotlin/com/open592/fileserver/collections/UniqueQueue.kt +++ b/src/main/kotlin/com/open592/fileserver/collections/UniqueQueue.kt @@ -17,6 +17,10 @@ class UniqueQueue { return false } + operator fun plusAssign(value: T) { + add(value) + } + fun removeFirstOrNull(): T? { val value = queue.removeFirstOrNull() From 76ff7c58ad6140bbd98c506349935bf453fcd48a Mon Sep 17 00:00:00 2001 From: Open592 Developer Date: Thu, 31 Oct 2024 22:11:52 -0700 Subject: [PATCH 4/6] protocol: Group outbound messages under single interface Signed-off-by: Open592 Developer --- .../net/NetworkChannelInitializer.kt | 14 +++----- .../open592/fileserver/net/js5/Js5Service.kt | 6 ++-- .../inbound/Js5InboundChannelHandler.kt | 6 ++-- .../Js5ClientIsOutOfDateMessageEncoder.kt | 10 ------ .../outbound/Js5IpLimitMessageEncoder.kt | 8 ----- .../protocol/outbound/Js5OkMessageEncoder.kt | 8 ----- .../outbound/Js5OutboundGroupMessage.kt | 10 ++++++ ...r.kt => Js5OutboundGroupMessageEncoder.kt} | 8 ++--- .../protocol/outbound/Js5OutboundMessage.kt | 15 +-------- .../outbound/Js5OutboundStatusMessage.kt | 11 +++++++ .../Js5OutboundStatusMessageEncoder.kt | 32 +++++++++++++++++++ .../outbound/Js5ServerFullMessageEncoder.kt | 8 ----- .../SimpleOpcodeMessageToByteEncoder.kt | 26 --------------- .../open592/fileserver/server/FileServer.kt | 2 -- 14 files changed, 69 insertions(+), 95 deletions(-) delete mode 100644 src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5ClientIsOutOfDateMessageEncoder.kt delete mode 100644 src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5IpLimitMessageEncoder.kt delete mode 100644 src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OkMessageEncoder.kt create mode 100644 src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundGroupMessage.kt rename src/main/kotlin/com/open592/fileserver/protocol/outbound/{Js5GroupMessageEncoder.kt => Js5OutboundGroupMessageEncoder.kt} (86%) create mode 100644 src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundStatusMessage.kt create mode 100644 src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundStatusMessageEncoder.kt delete mode 100644 src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5ServerFullMessageEncoder.kt delete mode 100644 src/main/kotlin/com/open592/fileserver/protocol/outbound/SimpleOpcodeMessageToByteEncoder.kt diff --git a/src/main/kotlin/com/open592/fileserver/net/NetworkChannelInitializer.kt b/src/main/kotlin/com/open592/fileserver/net/NetworkChannelInitializer.kt index 04bfc75..0ead7ba 100644 --- a/src/main/kotlin/com/open592/fileserver/net/NetworkChannelInitializer.kt +++ b/src/main/kotlin/com/open592/fileserver/net/NetworkChannelInitializer.kt @@ -3,11 +3,8 @@ package com.open592.fileserver.net import com.github.michaelbull.logging.InlineLogger import com.open592.fileserver.protocol.inbound.Js5InboundChannelHandler import com.open592.fileserver.protocol.inbound.Js5InboundMessageDecoder -import com.open592.fileserver.protocol.outbound.Js5ClientIsOutOfDateMessageEncoder -import com.open592.fileserver.protocol.outbound.Js5GroupMessageEncoder -import com.open592.fileserver.protocol.outbound.Js5IpLimitMessageEncoder -import com.open592.fileserver.protocol.outbound.Js5OkMessageEncoder -import com.open592.fileserver.protocol.outbound.Js5ServerFullMessageEncoder +import com.open592.fileserver.protocol.outbound.Js5OutboundGroupMessageEncoder +import com.open592.fileserver.protocol.outbound.Js5OutboundStatusMessageEncoder import io.netty.channel.Channel import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelInitializer @@ -29,11 +26,8 @@ constructor(private val js5InboundChannelHandler: Provider( - opcode = 6, - klass = Js5OutboundMessage.ClientIsOutOfDate::class.java, - ) diff --git a/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5IpLimitMessageEncoder.kt b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5IpLimitMessageEncoder.kt deleted file mode 100644 index a193a2b..0000000 --- a/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5IpLimitMessageEncoder.kt +++ /dev/null @@ -1,8 +0,0 @@ -package com.open592.fileserver.protocol.outbound - -import io.netty.channel.ChannelHandler - -@ChannelHandler.Sharable -class Js5IpLimitMessageEncoder : - SimpleOpcodeMessageToByteEncoder( - opcode = 9, klass = Js5OutboundMessage.IpLimit::class.java) diff --git a/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OkMessageEncoder.kt b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OkMessageEncoder.kt deleted file mode 100644 index 004875f..0000000 --- a/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OkMessageEncoder.kt +++ /dev/null @@ -1,8 +0,0 @@ -package com.open592.fileserver.protocol.outbound - -import io.netty.channel.ChannelHandler - -@ChannelHandler.Sharable -class Js5OkMessageEncoder : - SimpleOpcodeMessageToByteEncoder( - opcode = 0, klass = Js5OutboundMessage.Ok::class.java) diff --git a/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundGroupMessage.kt b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundGroupMessage.kt new file mode 100644 index 0000000..a51191a --- /dev/null +++ b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundGroupMessage.kt @@ -0,0 +1,10 @@ +package com.open592.fileserver.protocol.outbound + +import io.netty.buffer.ByteBuf + +data class Js5OutboundGroupMessage( + val isPrefetch: Boolean, + val archive: Int, + val group: Int, + val data: ByteBuf +) : Js5OutboundMessage diff --git a/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5GroupMessageEncoder.kt b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundGroupMessageEncoder.kt similarity index 86% rename from src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5GroupMessageEncoder.kt rename to src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundGroupMessageEncoder.kt index 98da4f2..a962ced 100644 --- a/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5GroupMessageEncoder.kt +++ b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundGroupMessageEncoder.kt @@ -8,11 +8,11 @@ import io.netty.handler.codec.MessageToByteEncoder import kotlin.math.min @ChannelHandler.Sharable -class Js5GroupMessageEncoder : - MessageToByteEncoder(Js5OutboundMessage.Group::class.java) { +class Js5OutboundGroupMessageEncoder : + MessageToByteEncoder(Js5OutboundGroupMessage::class.java) { override fun encode( ctx: ChannelHandlerContext, - message: Js5OutboundMessage.Group, + message: Js5OutboundGroupMessage, output: ByteBuf ) { output.writeByte(message.archive) @@ -40,7 +40,7 @@ class Js5GroupMessageEncoder : override fun allocateBuffer( ctx: ChannelHandlerContext, - message: Js5OutboundMessage.Group, + message: Js5OutboundGroupMessage, preferDirect: Boolean ): ByteBuf { val dataLength = message.data.readableBytes() diff --git a/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundMessage.kt b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundMessage.kt index c582d1a..26dde23 100644 --- a/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundMessage.kt +++ b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundMessage.kt @@ -1,16 +1,3 @@ package com.open592.fileserver.protocol.outbound -import io.netty.buffer.ByteBuf - -sealed class Js5OutboundMessage { - object Ok : Js5OutboundMessage() - - object ClientIsOutOfDate : Js5OutboundMessage() - - object ServerIsFull : Js5OutboundMessage() - - object IpLimit : Js5OutboundMessage() - - data class Group(val isPrefetch: Boolean, val archive: Int, val group: Int, val data: ByteBuf) : - Js5OutboundMessage() -} +interface Js5OutboundMessage diff --git a/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundStatusMessage.kt b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundStatusMessage.kt new file mode 100644 index 0000000..292dcaf --- /dev/null +++ b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundStatusMessage.kt @@ -0,0 +1,11 @@ +package com.open592.fileserver.protocol.outbound + +sealed class Js5OutboundStatusMessage(val opcode: Int) : Js5OutboundMessage { + object Ok : Js5OutboundStatusMessage(opcode = 0) + + object ClientIsOutOfDate : Js5OutboundStatusMessage(opcode = 6) + + object ServerIsFull : Js5OutboundStatusMessage(opcode = 7) + + object IpIsLimited : Js5OutboundStatusMessage(opcode = 9) +} diff --git a/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundStatusMessageEncoder.kt b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundStatusMessageEncoder.kt new file mode 100644 index 0000000..7dfee83 --- /dev/null +++ b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5OutboundStatusMessageEncoder.kt @@ -0,0 +1,32 @@ +package com.open592.fileserver.protocol.outbound + +import io.netty.buffer.ByteBuf +import io.netty.channel.ChannelHandlerContext +import io.netty.handler.codec.MessageToByteEncoder + +class Js5OutboundStatusMessageEncoder : + MessageToByteEncoder(Js5OutboundStatusMessage::class.java) { + override fun encode( + ctx: ChannelHandlerContext, + message: Js5OutboundStatusMessage, + output: ByteBuf + ) { + output.writeByte(message.opcode) + } + + override fun allocateBuffer( + ctx: ChannelHandlerContext, + msg: Js5OutboundStatusMessage, + preferDirect: Boolean + ): ByteBuf { + return if (preferDirect) { + ctx.alloc().ioBuffer(BUFFER_LENGTH, BUFFER_LENGTH) + } else { + ctx.alloc().heapBuffer(BUFFER_LENGTH, BUFFER_LENGTH) + } + } + + private companion object { + private const val BUFFER_LENGTH = 4 + } +} diff --git a/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5ServerFullMessageEncoder.kt b/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5ServerFullMessageEncoder.kt deleted file mode 100644 index c2a629d..0000000 --- a/src/main/kotlin/com/open592/fileserver/protocol/outbound/Js5ServerFullMessageEncoder.kt +++ /dev/null @@ -1,8 +0,0 @@ -package com.open592.fileserver.protocol.outbound - -import io.netty.channel.ChannelHandler - -@ChannelHandler.Sharable -class Js5ServerFullMessageEncoder : - SimpleOpcodeMessageToByteEncoder( - opcode = 7, klass = Js5OutboundMessage.ServerIsFull::class.java) diff --git a/src/main/kotlin/com/open592/fileserver/protocol/outbound/SimpleOpcodeMessageToByteEncoder.kt b/src/main/kotlin/com/open592/fileserver/protocol/outbound/SimpleOpcodeMessageToByteEncoder.kt deleted file mode 100644 index 19794cb..0000000 --- a/src/main/kotlin/com/open592/fileserver/protocol/outbound/SimpleOpcodeMessageToByteEncoder.kt +++ /dev/null @@ -1,26 +0,0 @@ -package com.open592.fileserver.protocol.outbound - -import io.netty.buffer.ByteBuf -import io.netty.channel.ChannelHandlerContext -import io.netty.handler.codec.MessageToByteEncoder - -abstract class SimpleOpcodeMessageToByteEncoder( - protected val opcode: Int, - klass: Class -) : MessageToByteEncoder(klass) { - override fun encode(ctx: ChannelHandlerContext, message: T, output: ByteBuf) { - output.writeByte(opcode) - } - - override fun allocateBuffer(ctx: ChannelHandlerContext, msg: T, preferDirect: Boolean): ByteBuf { - return if (preferDirect) { - ctx.alloc().ioBuffer(BUFFER_LENGTH, BUFFER_LENGTH) - } else { - ctx.alloc().heapBuffer(BUFFER_LENGTH, BUFFER_LENGTH) - } - } - - private companion object { - private const val BUFFER_LENGTH = 4 - } -} diff --git a/src/main/kotlin/com/open592/fileserver/server/FileServer.kt b/src/main/kotlin/com/open592/fileserver/server/FileServer.kt index 8bdcd2c..47e9a86 100644 --- a/src/main/kotlin/com/open592/fileserver/server/FileServer.kt +++ b/src/main/kotlin/com/open592/fileserver/server/FileServer.kt @@ -3,7 +3,6 @@ package com.open592.fileserver.server import com.github.michaelbull.logging.InlineLogger import com.google.common.util.concurrent.Service import com.google.common.util.concurrent.ServiceManager -import com.open592.fileserver.configuration.ServerConfiguration import jakarta.inject.Inject import jakarta.inject.Singleton @@ -12,7 +11,6 @@ class FileServer @Inject constructor( services: Set, - private val serverConfiguration: ServerConfiguration, ) { private val serviceManager = ServiceManager(services) From 51dc506221133a37e1d7882d02b67cc84aa73cac Mon Sep 17 00:00:00 2001 From: Open592 Developer Date: Thu, 31 Oct 2024 23:01:49 -0700 Subject: [PATCH 5/6] js5: Mark Js5Service as a Singleton Fixes a bug where we weren't serving groups Signed-off-by: Open592 Developer --- src/main/kotlin/com/open592/fileserver/net/js5/Js5Service.kt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/com/open592/fileserver/net/js5/Js5Service.kt b/src/main/kotlin/com/open592/fileserver/net/js5/Js5Service.kt index 53e5e10..87768e6 100644 --- a/src/main/kotlin/com/open592/fileserver/net/js5/Js5Service.kt +++ b/src/main/kotlin/com/open592/fileserver/net/js5/Js5Service.kt @@ -11,7 +11,9 @@ import com.open592.fileserver.protocol.inbound.Js5InboundMessage import com.open592.fileserver.protocol.outbound.Js5OutboundGroupMessage import io.netty.buffer.ByteBufAllocator import jakarta.inject.Inject +import jakarta.inject.Singleton +@Singleton class Js5Service @Inject constructor( @@ -78,7 +80,7 @@ constructor( val response = Js5OutboundGroupMessage(request.isPrefetch, request.archive, request.group, data = buf) - ctx.writeAndFlush(response) + ctx.writeAndFlush(response, ctx.voidPromise()) synchronized(lock) { if (client.isReady()) { From ea9c5762b5f12d5a694f4d4c5839e96de9c659fb Mon Sep 17 00:00:00 2001 From: Open592 Developer Date: Mon, 18 Nov 2024 01:09:06 -0800 Subject: [PATCH 6/6] js5: Read from index255 if group == 255 cache library doesn't seem to do this check in it's root `data` method. So we will do it explicitly ourselves. I might be missing something, but it's what greg's fileserver is doing so I don't feel too bad about it. Signed-off-by: Open592 Developer --- .../com/open592/fileserver/net/js5/Js5Service.kt | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/main/kotlin/com/open592/fileserver/net/js5/Js5Service.kt b/src/main/kotlin/com/open592/fileserver/net/js5/Js5Service.kt index 87768e6..c8e8f22 100644 --- a/src/main/kotlin/com/open592/fileserver/net/js5/Js5Service.kt +++ b/src/main/kotlin/com/open592/fileserver/net/js5/Js5Service.kt @@ -4,6 +4,7 @@ import com.displee.cache.CacheLibrary import com.displee.compress.CompressionType import com.displee.compress.compress import com.displee.compress.type.EmptyCompressor +import com.github.michaelbull.logging.InlineLogger import com.google.common.util.concurrent.AbstractExecutionThreadService import com.open592.fileserver.buffer.use import com.open592.fileserver.collections.UniqueQueue @@ -70,9 +71,14 @@ constructor( } } else { allocator.buffer().use { buffer -> - val data = cacheLibrary.data(request.group, request.archive) - - buffer.writeBytes(data) + val archiveSector = + if (request.group == 255) { + cacheLibrary.index255?.readArchiveSector(request.archive) + } else { + cacheLibrary.index(request.group).readArchiveSector(request.archive) + } ?: return + + buffer.writeBytes(archiveSector.data) buffer.retain() } } @@ -131,5 +137,6 @@ constructor( private companion object { private const val ARCHIVE_SET = 255 + private val logger = InlineLogger() } }